Início rápido#

Um exemplo rápido para mostrar a ideia geral do asynworker.

from asyncworker import App
from asyncworker.http.wrapper import RequestWrapper


app = App()

@app.http.get(["/"])
async def handler(wrapper: RequestWrapper):
  return web.json_response({})

Esse código é possível ser rodado na linha de comando e é capaz de atendar a uma requisição HTTP assim:

curl http://127.0.0.1:8080/

Consumindo de uma fila no RabbitMQ#

from asyncworker import App
from asyncworker.connections import AMQPConnection


amqp_conn = AMQPConnection(host="127.0.0.1", user="guest", password="guest", prefetch_count=256)
app = App(connections=[amqp_conn])

@app.amqp.consume(["asgard/counts", "asgard/counts/errors"],
           vhost="fluentd")
async def drain_handler(message):
    print(message)

Nesse exemplo, o handler drain_handler() recebe mensagens de ambas as filas: asgard/counts e asgard/counts/errors, que estão no virtualhost fluentd.

Se o handler lançar alguma exception, a mensagem é automaticamente devolvida para a fila (reject com requeue=True); Se o handler rodar sem erros, a mensagem é automaticamente confirmada (ack).

Lendo dados de um endpoint Server Side Events#

import json
from asyncworker import App, RouteTypes, Options
from asyncworker.connections import SSEConnection


sse_conn = SSEConnection(url="http://172.18.0.31:8080/")
app = App(connections=[sse_conn])

@app.route(["/v2/events"], type=RouteTypes.SSE, options={Options.BULK_SIZE: 2})
async def _on_event(events):
    event_names = [e.name for e in events]
    print(f"Events received: {len(events)} {event_names}")
    for event in events:
        data = ""
        if event.name == 'deployment_info':
            data = event.body['plan']['id']
        if event.name == 'deployment_success':
            data = event.body['id']
        if event.name == 'status_update_event':
            data = f"app={event.body['appId']}, task={event.body['taskId']} ({event.body['taskStatus']})"

        print(f"Event Received: {event.name} {data}")

Nesse exemplo, o handler _on_event() recebe os eventos enviados pelo servidor. O objeto events é sempre uma lista, mesmo quando estamos usando BULK_SIZE=1 (Falaremos sobre isso mais a frente)

Recebendo dados através de requisições HTTP#

from asyncworker import App
from asynworker.http.wrapper import RequestWrapper

# ...

@app.http.get(routes=['/', '/hello'])
async def index(wrapper: RequestWrapper) -> web.Response:
    return web.Response(body="Hello world")

Nesse exemplo, declaramos um handler index, que receberá uma instância de asyncworker.http.wrapper.RequestWrapper para cada acesso às rotas GET / e GET /hello.

Rodando seu worker#

Ambos os exemplos precisam de um main() para poderem rodar. Um exemplo de main seria o seguinte, assumindo que o objeto app está no módulo myworker:

from myworker import app

app.run()

Nesse ponto sua app já estará rodando e caso você seja desconectado, um loop ficará tentanto reconectar. A cada erro de conexão um log de exception é gerado.

No momento que você roda esse código (app.run()) todos os seus handlers registrados começam a funcionar.