# Source code for asyncworker.easyqueue.message

from typing import Callable, Generic, Optional, TypeVar

from aioamqp.channel import Channel
from aioamqp.envelope import Envelope
from aioamqp.properties import Properties

from asyncworker.easyqueue.connection import AMQPConnection
from asyncworker.easyqueue.exceptions import UndecodableMessageException

T = TypeVar("T")


class AMQPMessage(Generic[T]):
    """A single message delivered from an AMQP queue.

    Holds the raw payload together with the connection, channel and
    delivery metadata it arrived with, and exposes the payload lazily
    decoded through the ``deserialization_method`` given at construction.
    """

    __slots__ = (
        "connection",
        "channel",
        "queue_name",
        "serialized_data",
        "delivery_tag",
        "_envelope",
        "_properties",
        "_deserialization_method",
        "_deserialized_data",
        "_queue",
    )

    def __init__(
        self,
        connection: AMQPConnection,
        channel: Channel,
        queue_name: str,
        serialized_data: bytes,
        delivery_tag: int,
        envelope: Envelope,
        properties: Properties,
        deserialization_method: Callable[[bytes], T],
        queue,
    ) -> None:
        """Store the delivery data; the payload is not decoded here.

        :param connection: connection the message was received on.
        :param channel: channel used by :meth:`ack` and :meth:`reject`.
        :param queue_name: name of the queue the message belongs to.
        :param serialized_data: raw, still-encoded message body.
        :param delivery_tag: tag passed back to the channel on
            :meth:`ack` / :meth:`reject`.
        :param envelope: delivery envelope, stored as-is.
        :param properties: message properties, stored as-is.
        :param deserialization_method: callable that turns
            ``serialized_data`` into the decoded value.
        :param queue: queue object, stored as-is.
        """
        self.queue_name = queue_name
        self.serialized_data = serialized_data
        self.delivery_tag = delivery_tag
        self.connection = connection
        self.channel = channel
        self._envelope = envelope
        self._properties = properties
        self._deserialization_method = deserialization_method
        # ``None`` means "not decoded yet"; filled on first access of
        # ``deserialized_data``.
        self._deserialized_data: Optional[T] = None
        self._queue = queue

    @property
    def deserialized_data(self) -> T:
        """Return the decoded payload, decoding it on first access.

        The decoded value is cached. The cache check is ``is not None``
        rather than a truthiness test, so falsy payloads (``{}``, ``[]``,
        ``0``, ``""``) are decoded only once. A payload that decodes to
        ``None`` cannot be told apart from "not decoded yet" and is
        decoded again on every access.

        :raises UndecodableMessageException: if the deserialization
            method raises ``ValueError``.
        """
        if self._deserialized_data is not None:
            return self._deserialized_data

        try:
            self._deserialized_data = self._deserialization_method(
                self.serialized_data
            )
        except ValueError as e:
            raise UndecodableMessageException(
                "msg couldn't be decoded as JSON"
            ) from e

        return self._deserialized_data

    def __eq__(self, other):
        """Compare two messages slot by slot.

        Returns ``False`` for objects that are not instances of this
        message's class; otherwise every slot listed in ``__slots__``
        must compare equal.
        """
        if not isinstance(other, self.__class__):
            return False

        for attr in self.__slots__:
            # Slots whose name starts with a double underscore are skipped
            # (presumably because name mangling would break ``getattr``).
            if attr.startswith("__"):
                continue
            if getattr(self, attr) != getattr(other, attr):
                return False

        return True

    async def ack(self):
        """Acknowledge this message on its channel using its delivery tag."""
        return await self.channel.basic_client_ack(self.delivery_tag)

    async def reject(self, requeue=False):
        """Reject this message on its channel.

        :param requeue: forwarded to the channel's ``basic_reject`` as
            ``requeue``; defaults to ``False``.
        """
        return await self.channel.basic_reject(
            delivery_tag=self.delivery_tag, requeue=requeue
        )