asyncworker.easyqueue package#

Submodules#

asyncworker.easyqueue.connection module#

class asyncworker.easyqueue.connection.AMQPConnection(host: str, username: str, password: str, port: int = 5672, ssl: Optional[ssl.SSLContext] = None, verify_ssl: bool = True, heartbeat: int = 60, virtual_host: str = '/', loop: Optional[asyncio.events.AbstractEventLoop] = None, on_error: Union[None, Callable[[Exception], None], Callable[[Exception], Coroutine]] = None)[código fonte]#

Base: object

async close() None[código fonte]#
property connection_parameters#
has_channel_ready()[código fonte]#
property is_connected: bool#

asyncworker.easyqueue.exceptions module#

exception asyncworker.easyqueue.exceptions.EmptyQueueException[código fonte]#

Base: Exception

No message to get

exception asyncworker.easyqueue.exceptions.InvalidMessageSizeException(message=None)[código fonte]#

Base: asyncworker.easyqueue.exceptions.MessageError

exception asyncworker.easyqueue.exceptions.MessageError[código fonte]#

Base: ValueError

Base for all message exceptions

exception asyncworker.easyqueue.exceptions.UndecodableMessageException[código fonte]#

Base: asyncworker.easyqueue.exceptions.MessageError

Can’t decode as JSON

asyncworker.easyqueue.message module#

class asyncworker.easyqueue.message.AMQPMessage(connection: asyncworker.easyqueue.connection.AMQPConnection, channel: aioamqp.channel.Channel, queue_name: str, serialized_data: bytes, delivery_tag: int, envelope: aioamqp.envelope.Envelope, properties: aioamqp.properties.Properties, deserialization_method: Callable[[bytes], asyncworker.easyqueue.message.T], queue)[código fonte]#

Base: Generic[asyncworker.easyqueue.message.T]

async ack()[código fonte]#
channel#
connection#
delivery_tag#
property deserialized_data: asyncworker.easyqueue.message.T#
queue_name#
async reject(requeue=False)[código fonte]#
serialized_data#

asyncworker.easyqueue.queue module#

class asyncworker.easyqueue.queue.BaseJsonQueue(host: str, username: str, password: str, port: int = 5672, ssl: Optional[ssl.SSLContext] = None, verify_ssl: bool = True, virtual_host: str = '/', heartbeat: int = 60)[código fonte]#

Base: asyncworker.easyqueue.queue.BaseQueue

content_type = 'application/json'#
deserialize(body: bytes) Any[código fonte]#
serialize(body: Any, **kwargs) str[código fonte]#
class asyncworker.easyqueue.queue.BaseQueue(host: str, username: str, password: str, port: int = 5672, ssl: Optional[ssl.SSLContext] = None, verify_ssl: bool = True, virtual_host: str = '/', heartbeat: int = 60)[código fonte]#

Base: object

abstract deserialize(body: bytes) Any[código fonte]#
abstract serialize(body: Any, **kwargs) str[código fonte]#
class asyncworker.easyqueue.queue.ConnType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[código fonte]#

Base: enum.Enum

CONSUME = 1#
WRITE = 2#
class asyncworker.easyqueue.queue.DeliveryModes[código fonte]#

Base: object

NON_PERSISTENT = 1#
PERSISTENT = 2#
class asyncworker.easyqueue.queue.JsonQueue(host: str, username: str, password: str, port: int = 5672, ssl: Optional[ssl.SSLContext] = None, verify_ssl: bool = True, delegate_class: Optional[Type[asyncworker.easyqueue.queue.QueueConsumerDelegate]] = None, delegate: Optional[asyncworker.easyqueue.queue.QueueConsumerDelegate] = None, virtual_host: str = '/', heartbeat: int = 60, prefetch_count: int = 100, loop: Optional[asyncio.events.AbstractEventLoop] = None, seconds_between_conn_retry: int = 1, logger: Optional[logging.Logger] = None, connection_fail_callback: Optional[Callable[[Exception, int], Coroutine]] = None)[código fonte]#

Base: asyncworker.easyqueue.queue.BaseQueue, Generic[asyncworker.easyqueue.queue.T]

conn_for(type: asyncworker.easyqueue.queue.ConnType) asyncworker.easyqueue.connection.AMQPConnection[código fonte]#
async consume(queue_name: str, delegate: asyncworker.easyqueue.queue.QueueConsumerDelegate, consumer_name: str = '') str[código fonte]#

Connects the client if needed and starts queue consumption, sending on_before_start_consumption and on_consumption_start notifications to the delegate object

Parâmetros
  • queue_name – queue name to consume from

  • consumer_name – An optional name to be used as a consumer

identifier. If one isn’t provided, a random one is generated by the broker :return: The consumer tag. Useful for cancelling/stopping consumption

deserialize(body: bytes) asyncworker.easyqueue.queue.T[código fonte]#
async put(routing_key: str, data: Any = None, serialized_data: Union[str, bytes] = '', exchange: str = '', properties: Optional[dict] = None, mandatory: bool = False, immediate: bool = False)[código fonte]#
Parâmetros

data – A serializable data that should be serialized before

publishing :param serialized_data: A payload to be published as is :param exchange: The exchange to publish the message :param routing_key: The routing key to publish the message

serialize(body: asyncworker.easyqueue.queue.T, **kwargs) str[código fonte]#
class asyncworker.easyqueue.queue.QueueConsumerDelegate[código fonte]#

Base: object

async on_before_start_consumption(queue_name: str, queue: asyncworker.easyqueue.queue.JsonQueue)[código fonte]#

Coroutine called before queue consumption starts. May be overwritten to implement further custom initialization.

Parâmetros
  • queue_name (str) – Queue name that will be consumed

  • queue (JsonQueue) – AsynQueue instanced

async on_connection_error(exception: Exception)[código fonte]#

Called when the connection fails

async on_consumption_start(consumer_tag: str, queue: asyncworker.easyqueue.queue.JsonQueue)[código fonte]#

Coroutine called once consumption started.

async on_message_handle_error(handler_error: Exception, **kwargs)[código fonte]#

Callback called when an uncaught exception was raised during message handling stage.

Parâmetros
  • handler_error – The exception that triggered

  • kwargs – arguments used to call the coroutine that handled

the message :return:

abstract async on_queue_message(msg: asyncworker.easyqueue.message.AMQPMessage[Any])[código fonte]#

Callback called every time that a new, valid and deserialized message is ready to be handled.

Parâmetros

msg – the consumed message

Module contents#