Código fonte para asyncworker.rabbitmq.entrypoints

from typing import List, Optional

from asyncworker import conf
from asyncworker.connections import AMQPConnection
from asyncworker.entrypoints import EntrypointInterface, _extract_async_callable
from asyncworker.routes import AMQPRoute, AMQPRouteOptions, RoutesRegistry


def _register_amqp_handler(
    registry: RoutesRegistry,
    routes: List[str],
    vhost: str,
    connection: Optional[AMQPConnection],
    options: Optional[AMQPRouteOptions],
):
    def _wrap(f):
        cb = _extract_async_callable(f)
        route = AMQPRoute(
            handler=cb,
            routes=routes,
            vhost=vhost,
            connection=connection,
            options=options,
        )
        registry.add_amqp_route(route)

        return f

    return _wrap


[documentos]class AMQPRouteEntryPointImpl(EntrypointInterface):
[documentos] def consume( self, routes: List[str], vhost: str = conf.settings.AMQP_DEFAULT_VHOST, connection: Optional[AMQPConnection] = None, options: Optional[AMQPRouteOptions] = AMQPRouteOptions(), ): return _register_amqp_handler( self.app.routes_registry, routes, vhost, connection, options )