from asyncworker.easyqueue.message import AMQPMessage
from asyncworker.options import Actions
class RabbitMQMessage:
    """Wrap an ``AMQPMessage`` and decide how it is settled with the broker.

    A handler may call :meth:`accept` or :meth:`reject` to choose the final
    action explicitly. When it does not, the ``on_success`` / ``on_exception``
    defaults given to the constructor are used by :meth:`process_success` and
    :meth:`process_exception` respectively.
    """

    def __init__(
        self,
        delivery_tag: int,
        amqp_message: AMQPMessage,
        on_success: Actions = Actions.ACK,
        on_exception: Actions = Actions.REQUEUE,
    ) -> None:
        """Store the wrapped message and the default settlement actions.

        Args:
            delivery_tag: Delivery tag of the message. It is only stored;
                nothing else in this class reads it.
            amqp_message: Underlying message whose ``ack()`` / ``reject()``
                coroutines are awaited when an action is applied.
            on_success: Action applied by :meth:`process_success` when the
                handler did not pick one explicitly.
            on_exception: Action applied by :meth:`process_exception` when
                the handler did not pick one explicitly.
        """
        self._delivery_tag = delivery_tag
        self._on_success_action = on_success
        self._on_exception_action = on_exception
        # ``None`` means "no explicit choice yet"; set by accept()/reject().
        self._final_action = None
        self._amqp_message = amqp_message

    @property
    def body(self):
        """Return ``deserialized_data`` of the wrapped AMQP message."""
        return self._amqp_message.deserialized_data

    @property
    def serialized_data(self):
        """Return ``serialized_data`` of the wrapped AMQP message."""
        return self._amqp_message.serialized_data

    def reject(self, requeue: bool = True) -> None:
        """Mark this message to be rejected.

        The ``requeue`` parameter indicates whether the message will be put
        back on the original queue (``requeue=True``) or discarded
        (``requeue=False``).
        """
        self._final_action = Actions.REQUEUE if requeue else Actions.REJECT

    def accept(self) -> None:
        """Mark this message to be acknowledged (``ACK``).

        The acknowledgement is sent at the end of the handler execution.
        """
        self._final_action = Actions.ACK

    def _resolve_action(self, default: Actions) -> Actions:
        """Return the explicitly chosen action, or ``default`` if none was set."""
        # Compare against the ``None`` sentinel rather than relying on
        # truthiness, so an explicitly chosen action is never discarded just
        # because its value happens to be falsy.
        if self._final_action is None:
            return default
        return self._final_action

    async def _process_action(self, action: Actions) -> None:
        """Apply ``action`` to the wrapped AMQP message.

        ``REJECT`` and ``REQUEUE`` both await ``reject()`` on the wrapped
        message (with ``requeue=False`` and ``requeue=True`` respectively);
        ``ACK`` awaits ``ack()``. Any other value is ignored.
        """
        if action == Actions.REJECT:
            await self._amqp_message.reject(requeue=False)
        elif action == Actions.REQUEUE:
            await self._amqp_message.reject(requeue=True)
        elif action == Actions.ACK:
            await self._amqp_message.ack()

    async def process_success(self):
        """Apply the explicitly chosen action, or else the ``on_success`` default."""
        action = self._resolve_action(self._on_success_action)
        return await self._process_action(action)

    async def process_exception(self):
        """Apply the explicitly chosen action, or else the ``on_exception`` default."""
        action = self._resolve_action(self._on_exception_action)
        return await self._process_action(action)