RabbitMQ
RabbitMQ#
Aqui você verá como escrever um handler que recebe mensagens de um broker RabbitMQ
Todo handler desse tipo recebe o mesmo parametro, que é uma lista de objetos asyncworker.rabbitmq.message.RabbitMQMessage
.
Isso significa que a assinatura dos seus handlers são fixas, ou seja, todos eles possuem essa assinatura:
from asyncworker.rabbitmq.message import RabbitMQMessage
from typing import List
async def handler(msgs: List[RabbitMQMessage]):
...
Como no caso de handlers RabitMQ é preciso ter uma conexão prévia com o servidor de filas, precisamos criar uma instância de asyncworker.connections.AMQPConnection
. Essa instância deve ser passada no momento da criação de sua Asyncworker App.
Essa instância de conexão pode também ser usada dentro do handler, caso necessário.
Um exemplo disso é quando precisamos de um handler que lê mensagens de um fila e publica em outra. Esse exemplo pode ser escrito assim:
from typing import List
from asyncworker import App
from asyncworker.connections import AMQPConnection
from asyncworker.rabbitmq import RabbitMQMessage
amqp_conn = AMQPConnection(
hostname="127.0.0.1",
username="guest",
password="guest",
prefetch_count=256
)
app = App(connections=[amqp_conn])
@app.amqp.consume(["original_queue"])
async def handler(messages: List[RabbitMQMessage]):
await amqp_conn.put(
data={"dogs": ["Xablau", "Xena"]},
exchange="ANOTHER_EXCHANGE",
routing_key="another-routing-key"
)
se a fila de destino estiver um outro virtual host, basta pegar uma nova conexão com esse virtual host acessando o atributo (dict like) com o nome do virtual host desejado, no objeto da conexão, assim:
@app.amqp.consume(["original_queue"])
async def handler(messages: List[RabbitMQMessage]):
await amqp_conn["other-vhost"].put(
data={"dogs": ["Xablau", "Xena"]},
exchange="ANOTHER_EXCHANGE",
routing_key="another-routing-key"
)