Código fonte para asyncworker.signals.handlers.rabbitmq
from asyncio import Task
from typing import TYPE_CHECKING, List
from asyncworker.connections import AMQPConnection
from asyncworker.consumer import Consumer
from asyncworker.options import RouteTypes
from asyncworker.signals.handlers.base import SignalHandler
if TYPE_CHECKING: # pragma: no cover
from asyncworker.app import App # noqa: F401
[documentos]class RabbitMQ(SignalHandler):
[documentos] async def startup(self, app: "App") -> List[Task]:
tasks = []
app[RouteTypes.AMQP_RABBITMQ]["consumers"] = []
for route_info in app.routes_registry.amqp_routes:
conn: AMQPConnection = app.get_connection_for_route(route_info)
consumer = Consumer(
route_info=route_info,
host=conn.hostname,
port=conn.port,
ssl=conn.ssl,
verify_ssl=conn.verify_ssl,
username=conn.username,
password=conn.password,
prefetch_count=conn.prefetch,
)
app[RouteTypes.AMQP_RABBITMQ]["consumers"].append(consumer)
conn.register(consumer.queue)
task = app.loop.create_task(consumer.start())
tasks.append(task)
return tasks