asyncworker.easyqueue package
Submodules
asyncworker.easyqueue.connection module
- class asyncworker.easyqueue.connection.AMQPConnection(host: str, username: str, password: str, port: int = 5672, ssl: Optional[ssl.SSLContext] = None, verify_ssl: bool = True, heartbeat: int = 60, virtual_host: str = '/', loop: Optional[asyncio.events.AbstractEventLoop] = None, on_error: Union[None, Callable[[Exception], None], Callable[[Exception], Coroutine]] = None)
  Bases: object
  - async close() -> None
  - property connection_parameters
  - has_channel_ready()
  - property is_connected: bool
asyncworker.easyqueue.exceptions module
- exception asyncworker.easyqueue.exceptions.EmptyQueueException
  Bases: Exception
  No message to get.
- exception asyncworker.easyqueue.exceptions.InvalidMessageSizeException(message=None)
- exception asyncworker.easyqueue.exceptions.MessageError
  Bases: ValueError
  Base for all message exceptions.
- exception asyncworker.easyqueue.exceptions.UndecodableMessageException
  Bases: asyncworker.easyqueue.exceptions.MessageError
  Can't decode as JSON.
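Because the documented hierarchy places UndecodableMessageException under MessageError, which in turn derives from ValueError, callers can catch at whichever level suits them. The sketch below illustrates this with locally defined stand-in classes so that it runs without asyncworker installed; `classify` is a hypothetical helper, not part of the package.

```python
# Stand-in classes mirroring the documented hierarchy. In real code they would
# be imported from asyncworker.easyqueue.exceptions instead.
class MessageError(ValueError):
    """Base for all message exceptions."""


class UndecodableMessageException(MessageError):
    """Can't decode as JSON."""


def classify(exc: Exception) -> str:
    """Hypothetical helper: name the most specific handler that matches."""
    try:
        raise exc
    except UndecodableMessageException:
        return "undecodable"
    except MessageError:
        return "message-error"
    except ValueError:
        return "value-error"
```

Handlers are listed from most to least specific; reversing the order would let the ValueError clause swallow every message exception.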
asyncworker.easyqueue.message module
- class asyncworker.easyqueue.message.AMQPMessage(connection: asyncworker.easyqueue.connection.AMQPConnection, channel: aioamqp.channel.Channel, queue_name: str, serialized_data: bytes, delivery_tag: int, envelope: aioamqp.envelope.Envelope, properties: aioamqp.properties.Properties, deserialization_method: Callable[[bytes], asyncworker.easyqueue.message.T], queue)
  Bases: Generic[asyncworker.easyqueue.message.T]
  - async ack()
  - channel
  - connection
  - delivery_tag
  - property deserialized_data: asyncworker.easyqueue.message.T
  - queue_name
  - async reject(requeue=False)
  - serialized_data
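The constructor takes both `serialized_data` and a `deserialization_method`, while `deserialized_data` is exposed as a property, which suggests that decoding happens on access. The following is a minimal sketch of that pattern, assuming the result is cached and that decoding failures surface as UndecodableMessageException; `LazyMessage` and its private attributes are hypothetical stand-ins, not the library's implementation.

```python
import json
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class UndecodableMessageException(ValueError):
    """Stand-in for asyncworker.easyqueue.exceptions.UndecodableMessageException."""


class LazyMessage(Generic[T]):
    """Hypothetical stand-in for AMQPMessage: keeps raw bytes, decodes on demand."""

    def __init__(self, serialized_data: bytes,
                 deserialization_method: Callable[[bytes], T]) -> None:
        self.serialized_data = serialized_data
        self._deserialization_method = deserialization_method
        self._decoded = False
        self._deserialized_data: Optional[T] = None

    @property
    def deserialized_data(self) -> T:
        # Decode only on first access, then reuse the cached value.
        if not self._decoded:
            try:
                self._deserialized_data = self._deserialization_method(
                    self.serialized_data
                )
            except ValueError as e:  # json.JSONDecodeError is a ValueError
                raise UndecodableMessageException("body is not decodable") from e
            self._decoded = True
        return self._deserialized_data
```

A consumer that only forwards raw bytes never pays the decoding cost, and a malformed body raises only when the decoded value is actually requested.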
asyncworker.easyqueue.queue module
- class asyncworker.easyqueue.queue.BaseJsonQueue(host: str, username: str, password: str, port: int = 5672, ssl: Optional[ssl.SSLContext] = None, verify_ssl: bool = True, virtual_host: str = '/', heartbeat: int = 60)
  Bases: asyncworker.easyqueue.queue.BaseQueue
  - content_type = 'application/json'
  - deserialize(body: bytes) -> Any
  - serialize(body: Any, **kwargs) -> str
- class asyncworker.easyqueue.queue.BaseQueue(host: str, username: str, password: str, port: int = 5672, ssl: Optional[ssl.SSLContext] = None, verify_ssl: bool = True, virtual_host: str = '/', heartbeat: int = 60)
  Bases: object
  - abstract deserialize(body: bytes) -> Any
  - abstract serialize(body: Any, **kwargs) -> str
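BaseQueue declares `serialize` and `deserialize` as abstract, and BaseJsonQueue supplies a JSON flavour of both. The sketch below shows one way such a pair could look, using the documented signatures. `QueueCodec` and `JsonCodec` are hypothetical stand-ins that skip the connection arguments, and forwarding `**kwargs` to `json.dumps` is an assumption rather than confirmed library behaviour.

```python
import json
from abc import ABC, abstractmethod
from typing import Any


class QueueCodec(ABC):
    """Hypothetical stand-in for BaseQueue's abstract serialization pair."""

    @abstractmethod
    def serialize(self, body: Any, **kwargs) -> str:
        ...

    @abstractmethod
    def deserialize(self, body: bytes) -> Any:
        ...


class JsonCodec(QueueCodec):
    """Hypothetical stand-in for BaseJsonQueue."""

    content_type = "application/json"

    def serialize(self, body: Any, **kwargs) -> str:
        # Assumption: extra keyword arguments are passed through to json.dumps.
        return json.dumps(body, **kwargs)

    def deserialize(self, body: bytes) -> Any:
        # The broker delivers bytes, so decode before parsing.
        return json.loads(body.decode("utf-8"))
```

A round trip such as `JsonCodec().deserialize(JsonCodec().serialize(obj).encode("utf-8"))` should give back an equal object for any JSON-compatible `obj`.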
- class asyncworker.easyqueue.queue.ConnType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)
  Bases: enum.Enum
  - CONSUME = 1
  - WRITE = 2
- class asyncworker.easyqueue.queue.DeliveryModes
  Bases: object
  - NON_PERSISTENT = 1
  - PERSISTENT = 2
- class asyncworker.easyqueue.queue.JsonQueue(host: str, username: str, password: str, port: int = 5672, ssl: Optional[ssl.SSLContext] = None, verify_ssl: bool = True, delegate_class: Optional[Type[asyncworker.easyqueue.queue.QueueConsumerDelegate]] = None, delegate: Optional[asyncworker.easyqueue.queue.QueueConsumerDelegate] = None, virtual_host: str = '/', heartbeat: int = 60, prefetch_count: int = 100, loop: Optional[asyncio.events.AbstractEventLoop] = None, seconds_between_conn_retry: int = 1, logger: Optional[logging.Logger] = None, connection_fail_callback: Optional[Callable[[Exception, int], Coroutine]] = None)
  Bases: asyncworker.easyqueue.queue.BaseQueue, Generic[asyncworker.easyqueue.queue.T]
  - conn_for(type: asyncworker.easyqueue.queue.ConnType) -> asyncworker.easyqueue.connection.AMQPConnection
  - async consume(queue_name: str, delegate: asyncworker.easyqueue.queue.QueueConsumerDelegate, consumer_name: str = '') -> str
    Connects the client if needed and starts queue consumption, sending on_before_start_consumption and on_consumption_start notifications to the delegate object.
    - Parameters
      queue_name – name of the queue to consume from
      consumer_name – an optional name to use as the consumer identifier. If none is provided, the broker generates a random one.
    - Returns
      The consumer tag, useful for cancelling or stopping consumption.
  - deserialize(body: bytes) -> asyncworker.easyqueue.queue.T
  - async put(routing_key: str, data: Any = None, serialized_data: Union[str, bytes] = '', exchange: str = '', properties: Optional[dict] = None, mandatory: bool = False, immediate: bool = False)
    - Parameters
      data – serializable data that is serialized before publishing
      serialized_data – a payload to be published as is
      exchange – the exchange to publish the message to
      routing_key – the routing key to publish the message with
  - serialize(body: asyncworker.easyqueue.queue.T, **kwargs) -> str
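The `put` parameters describe two alternative ways of supplying a body: `data`, which is serialized first, and `serialized_data`, which is published as is. A rough sketch of how a caller-side helper might choose between them follows. `resolve_payload` is hypothetical, and treating "both supplied" as an error is an assumption that the parameter list above does not state.

```python
import json
from typing import Any, Union


def resolve_payload(data: Any = None,
                    serialized_data: Union[str, bytes] = "") -> Union[str, bytes]:
    """Hypothetical helper: pick the body to publish from put()-style arguments."""
    if data is not None and serialized_data:
        # Assumption: the two arguments are mutually exclusive.
        raise ValueError("pass either data or serialized_data, not both")
    if data is not None:
        # Serialize structured data before publishing.
        return json.dumps(data)
    # Already-serialized payloads are passed through untouched.
    return serialized_data
```

With this helper, `resolve_payload(data={"a": 1})` yields a JSON string, while `resolve_payload(serialized_data=b"raw")` returns the bytes unchanged.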
- class asyncworker.easyqueue.queue.QueueConsumerDelegate
  Bases: object
  - async on_before_start_consumption(queue_name: str, queue: asyncworker.easyqueue.queue.JsonQueue)
    Coroutine called before queue consumption starts. May be overridden to implement further custom initialization.
    - Parameters
      queue_name (str) – name of the queue that will be consumed
      queue (JsonQueue) – the queue instance
  - async on_connection_error(exception: Exception)
    Called when the connection fails.
  - async on_consumption_start(consumer_tag: str, queue: asyncworker.easyqueue.queue.JsonQueue)
    Coroutine called once consumption has started.
  - async on_message_handle_error(handler_error: Exception, **kwargs)
    Callback called when an uncaught exception is raised during the message handling stage.
    - Parameters
      handler_error – the exception that triggered this callback
      kwargs – arguments used to call the coroutine that handled the message
  - abstract async on_queue_message(msg: asyncworker.easyqueue.message.AMQPMessage[Any])
    Callback called every time a new, valid and deserialized message is ready to be handled.
    - Parameters
      msg – the consumed message
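The callbacks above can be pictured with a small delegate that acknowledges a message when handling succeeds and rejects it otherwise. This is only a sketch under stated assumptions: `FakeMessage` is a hypothetical stand-in exposing the documented `ack`, `reject` and `deserialized_data` members, `OrderDelegate` would subclass QueueConsumerDelegate in real code, and whether the delegate itself should call `ack` or `reject` depends on how the consumer is wired.

```python
import asyncio
from typing import Any, List


class FakeMessage:
    """Hypothetical stand-in for AMQPMessage that records ack/reject calls."""

    def __init__(self, deserialized_data: Any) -> None:
        self.deserialized_data = deserialized_data
        self.calls: List[str] = []

    async def ack(self) -> None:
        self.calls.append("ack")

    async def reject(self, requeue: bool = False) -> None:
        self.calls.append(f"reject(requeue={requeue})")


class OrderDelegate:
    """Shaped like a QueueConsumerDelegate subclass (assumed usage)."""

    def __init__(self) -> None:
        self.seen: List[int] = []

    async def on_queue_message(self, msg: FakeMessage) -> None:
        try:
            # Handling step: a missing "id" key raises KeyError.
            self.seen.append(msg.deserialized_data["id"])
        except Exception:
            # Drop messages that cannot be handled instead of requeueing them.
            await msg.reject(requeue=False)
        else:
            await msg.ack()


async def demo() -> List[FakeMessage]:
    delegate = OrderDelegate()
    messages = [FakeMessage({"id": 1}), FakeMessage({})]
    for message in messages:
        await delegate.on_queue_message(message)
    return messages
```

Running `asyncio.run(demo())` leaves the first message acknowledged and the second rejected without requeue, so a malformed body is not redelivered in a loop.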