Parametros adicionais para o decorator app.amqp.consume()
Contents
Parametros adicionais para o decorator app.amqp.consume()#
Para um handler RabbitMQ o decorator @app.amqp.consume()
pode receber alguns parametros adicionais.
queues
: Lista de filas de onde esse handler receberá mensagens
vhost
: Indica em qual vhost as filas estatão definidas. Se não passarmos nada será usadovhost="/"
connection
: Serve para passar manualmente um objetoAMQPConnection
para esse handler. Isso é útil quando sua app se conecta a mais de um broker simultaneamente;
options
: É uma instância do objetoasyncworker.rabbitmq.AMQPRouteOptions
.
Exemplo de valores para o campo options#
O objeto AMQPRouteOptions
pode ter os seguintes atributos:
bulk_size
: Esse valor é um inteiro e diz qual será o tamanho máximo da lista que o handler vai receber, a cada vez que for chamado.
bulk_flush_interval
: Inteiro e diz o tempo máximo que o bulk de mensagens poderá ficar com tamanho menor do quebulk_size
. Exemplo: Se seu handler tem um bulk_size de 4096 mensagens mas você recebe apenas 100 msg/min na fila em alguns momentos seu handler será chamado recebendo uma lista de mensagens menor do que 4096.
connection_fail_callback
: Função assíncrona que é chamada caso haja uma falha durante a conexão com o broker. Essa função recebe a exceção que ocorreu e o número de retentativas que falharam até então. O número de retentativas é zerado quando o app consegue se conectar com o broker.
on_success
: Diz qual será a ação tomada pelo asyncworker quando uma chamada a um handler for concluída com sucesso, ou seja, o handler não lançar nenhuma exception. O Valor padrão éActions.ACK
on_exception
: Diz qual será a ação padrão quando a chamada a um handler lançar uma excação não tratada. O valor padrão éActions.REQUEUE
Exemplo de um código que usa essas opções:
from asyncworker import App
from asyncworker.options import Actions
from asyncworker.rabbitmq.AMQPRouteOptions
async def fail_handler(e: Exception, n: int):
print(f"error: {e}, retries {n}")
@app.amqp.consume(
["queue"],
options=AMQPRouteOptions(
bulk_size=60,
bulk_flush_interval=10,
on_success=Actions.ACK,
on_exception=Actions.REJECT,
),
)
async def _handler(messages):
pass
Consumindo de filas de outros virtualhosts#
É possível consumir de filas que estão em outros vistualhosts do mesmo broker. Para isso basta passar o parametro vhost
para do decorator @app.amqp.consume()
. Exemplo:
from asyncworker import App
from asyncworker.options import Options, Actions, Events
from asyncworker.rabbitmq.AMQPRouteOptions
async def fail_handler(e: Exception, n: int):
print(f"error: {e}, retries {n}")
@app.amqp.consume(
["queue"],
vhost="logs",
options=AMQPRouteOptions(
bulk_size=60,
bulk_flush_interval=10,
on_success=Actions.ACK,
on_exception=Actions.REJECT,
),
)
async def _handler(messages):
pass
Nesse caso esse handler consome a fila queue
do virtualhost logs
.
Consumindo de filas de brokers diferentes#
É possível consumir, de forma concorrente, de brokers diferentes. Basta que pra isso tenhamos duas conexões distintas e que passemos uma das conexões na hora de regisgtrar nossos handlers. Assim:
from datetime import datetime
from typing import List
from asyncworker import App
from asyncworker.connections import AMQPConnection
from asyncworker.http.methods import HTTPMethods
from asyncworker.options import Actions, Events, Options
from asyncworker.rabbitmq import RabbitMQMessage, AMQPRouteOptions
amqp_conn = AMQPConnection(
hostname="127.0.0.1:5672", username="guest", password="guest", prefetch=1024
)
amqp_conn_2 = AMQPConnection(
hostname="127.0.0.1:5673", username="guest", password="guest", prefetch=128
)
app = App(connections=[amqp_conn, amqp_conn_2])
@app.amqp.consume(["queue"], connection=amqp_conn)
async def _handler_broker_1(msgs: List[RabbitMQMessage]):
print(f"Broker 1 ({amqp_conn.hostname}): Recv: {len(msgs)}")
@app.amqp.consume(["queue"], connection=amqp_conn_2)
async def _handler_roker_2(msgs: List[RabbitMQMessage]):
print(f"Broker 2 ({amqp_conn_2.hostname}): Recv: {len(msgs)}")
@app.run_every(1)
async def produce(app: App):
await amqp_conn.put(data={"msg": "Broker 1"}, routing_key="queue")
await amqp_conn_2.put(data={"msg": "broker 2"}, routing_key="queue")
app.run()
Uma nota sobre bulk_size e prefetch#
O valor do BULK_SIZE
sempre é escolhido com a fórmula: min(BULK_SIZE, PREFRETCH)
. Isso para evitar que o código fique em um deadlock, onde ao mesmo tempo que ele aguarda o bulk encher para poder pegar mais mensagens da fila, ele está aguardando o bulk esvaziar para pegar mais mensagens da fila.
Flush timeout#
Com o flush timeout a app
não necessita ficar presa esperando o bucket encher para conseguir processar as mensagens.
Após o tempo do FLUSH_TIMEOUT
(que são DefaultValues.BULK_FLUSH_INTERVAL
segundos por default) a app
irá enviar todas as mensagens que já possui para o _handler
.
Por exemplo, se tivermos um handler
que possui:
Um
BULK_SIZE
de 1.000As mensagens para esse handles são publicadas diariamente
E o bucket desse handler ficou com 500 mensagens
Nesse caso a app
irá esperar o timeout do flush para liberar essas mensagens para o handler
.
Caso queira alterar o tempo default do timeout do flush basta definir env ASYNCWORKER_FLUSH_TIMEOUT
com um número que representará os segundos em que a app irá esperar para realizar o flush.
Também é possível alterar o tempo do timeout do flush definindo o campo Options.BULK_FLUSH_INTERVAL
do dicionário options
passado como parâmetro na criação da rota.
O valor passado para o parametro options
tem precedência sobre a variável de ambiente ASYNCWORKER_FLUSH_TIMEOUT
.
Exemplo de um código mais completo#
from typing import List
from asyncworker import App
from asyncworker.connections import AMQPConnection
from asyncworker.rabbitmq import RabbitMQMessage, AMQPRouteOptions
amqp_conn = AMQPConnection(
host="127.0.0.1",
user="guest",
password="guest",
prefetch_count=256
)
app = App(connections=[amqp_conn])
@app.amqp.consume(
["asgard/counts", "asgard/counts/errors"],
vhost="fluentd",
options=AMQPRouteOptions(bulk_size=60, bulk_flush_interval=10),
)
async def drain_handler(messages: List[RabbitMQMessage]):
for m in messages:
print(m)
Nesse exemplo, o handler drain_handler()
recebe mensagens de ambas as filas: asgard/counts
e asgard/counts/errors
, que estão no virtualhost fluentd
.
Se o handler lançar alguma exception, a mensagem é automaticamente devolvida para a fila (reject com requeue=True); Se o handler rodar sem erros, a mensagem é automaticamente confirmada (ack).
Escolhendo, individualmente, qual ação será dada a cada mensgem recebida#
Existem situações onde você precisa que algumas as mensagens recebidas pelo handler teham tratamentos diferentes das outras mensagens. Ou seja, nem sempre você quer que todas recebam ack
ou requeue
.
Para isso o objeto recebido por um handler (RabbitMQMessage
) possui alguns métodos:
- class asyncworker.rabbitmq.message.RabbitMQMessage(delivery_tag: int, amqp_message: asyncworker.easyqueue.message.AMQPMessage, on_success: asyncworker.options.Actions = Actions.ACK, on_exception: asyncworker.options.Actions = Actions.REQUEUE)[código fonte]
- accept()[código fonte]
Marca essa mensagem para ser confirmada (
ACK
) ao fim da execução do handler.
- reject(requeue=True)[código fonte]
Marca essa mensagem para ser rejeitada. O parametro
requeue
indica se a mensagem será recolocada na fila original (requeue=True
) ou será descartada (requeue=False
).
Opcionalmente, caso seja necessário rejeitar uma mensagem e ao mesmo tempo não devolver essa mensagem pra fila,
podemos chamar message.reject(requeue=False)
. O valor default do requeue
é True
.
Sobre AMQPConnection#
Esse objeto é o ponto de comunicação principal com um broker RabbitMQ. Aqui temos um método put
onde podemos enviar novas mensagens ao broker.
Essa classe é um modelo pydantic e pode receber alguns parametros no construtor. Esses parametros estão na declaração dessa classe. asyncworker.connections.AMQPConnection
.