import asyncio
import traceback
from ssl import SSLContext
from typing import Dict, List, Type, Union, Optional
from aioamqp.exceptions import AioamqpException
from asyncworker import conf
from asyncworker.easyqueue.message import AMQPMessage
from asyncworker.easyqueue.queue import JsonQueue, QueueConsumerDelegate
from asyncworker.options import DefaultValues, Events, Options
from asyncworker.routes import AMQPRoute
from asyncworker.time import ClockTicker
from .bucket import Bucket
from .conf import settings
from .rabbitmq import RabbitMQMessage
class Consumer(QueueConsumerDelegate):
    """
    Queue delegate that buffers incoming messages in a bucket and hands them
    to the route handler in bulk.

    The bucket is flushed when it becomes full (see ``on_queue_message``) and
    on every tick of ``self.clock`` (see ``_flush_clocked``).
    """

    def __init__(
        self,
        route_info: Union[Dict, AMQPRoute],
        host: str,
        username: str,
        password: str,
        prefetch_count: int = 128,
        port: int = settings.AMQP_DEFAULT_PORT,
        ssl: Optional[SSLContext] = None,
        verify_ssl: bool = True,
        bucket_class: Type[Bucket] = Bucket[RabbitMQMessage],
    ) -> None:
        """
        Build the bucket, the underlying ``JsonQueue`` and the flush clock.

        :param route_info: Route description. The keys ``handler``,
            ``routes`` and ``options`` are read with ``[]`` and must exist;
            ``vhost`` is optional and defaults to ``"/"``. ``options`` must
            contain ``bulk_size``.
        :param host: Broker host, forwarded to ``JsonQueue``
        :param username: Broker username, forwarded to ``JsonQueue``
        :param password: Broker password, forwarded to ``JsonQueue``
        :param prefetch_count: Forwarded to ``JsonQueue``; also caps the
            bucket size
        :param port: Broker port, forwarded to ``JsonQueue``
        :param ssl: Optional SSL context, forwarded to ``JsonQueue``
        :param verify_ssl: Forwarded to ``JsonQueue``
        :param bucket_class: Class used to instantiate the message bucket
        """
        self.route = route_info
        self._handler = route_info["handler"]
        self._queue_name = route_info["routes"]
        self._route_options = route_info["options"]
        self.host = host
        self.ssl = ssl
        self.verify_ssl = verify_ssl
        self.vhost = route_info.get("vhost", "/")
        # The bucket never holds more than the smaller of bulk_size and
        # prefetch_count.
        self.bucket = bucket_class(
            size=min(self._route_options["bulk_size"], prefetch_count)
        )
        self.queue: JsonQueue = JsonQueue(
            host,
            username,
            password,
            port=port,
            ssl=ssl,
            verify_ssl=verify_ssl,
            virtual_host=self.vhost,
            delegate=self,
            prefetch_count=prefetch_count,
            logger=conf.logger,
            connection_fail_callback=self._route_options.get(
                Options.CONNECTION_FAIL_CALLBACK, None
            ),
        )
        self.clock = ClockTicker(
            seconds=self._route_options.get(
                Options.BULK_FLUSH_INTERVAL, conf.settings.FLUSH_TIMEOUT
            )
        )
        # Coroutine object of the periodic flush loop; set by ``start``.
        self.clock_task = None
        # Strong reference to the scheduled asyncio Task; set by ``start``.
        self._clock_task_handle = None

    @property
    def queue_name(self) -> str:
        """
        Return the value stored under ``route_info["routes"]``.

        NOTE(review): annotated as ``str`` but ``consume_all_queues``
        iterates this value as a collection of queue names - confirm.
        """
        return self._queue_name

    async def on_before_start_consumption(
        self, queue_name: str, queue: "JsonQueue"
    ):
        """
        Coroutine called before queue consumption starts. May be overwritten
        to implement further custom initialization.

        :param queue_name: Queue name that will be consumed
        :type queue_name: str
        :param queue: JsonQueue instance
        :type queue: JsonQueue
        """
        pass

    async def on_consumption_start(self, consumer_tag: str, queue: "JsonQueue"):
        """
        Coroutine called once consumption started.
        """
        pass

    async def on_queue_message(self, msg: AMQPMessage):
        """
        Callback called every time that a new, valid and deserialized message
        is ready to be handled.

        The message is wrapped in a ``RabbitMQMessage`` and stored in the
        bucket; when the bucket is full afterwards it is flushed to the
        handler. If the bucket is already full on entry the message is not
        stored.

        :param msg: The received message
        :return: The handler result when a flush happened, otherwise None
        """
        if not self.bucket.is_full():
            message = RabbitMQMessage(
                delivery_tag=msg.delivery_tag,
                amqp_message=msg,
                on_success=self._route_options[Events.ON_SUCCESS],
                on_exception=self._route_options[Events.ON_EXCEPTION],
            )
            self.bucket.put(message)

        if self.bucket.is_full():
            return await self._flush_bucket_if_needed()

    async def _flush_clocked(self):
        """
        Flush the bucket on every tick of ``self.clock``.

        Any exception raised by a flush is logged and the loop keeps going.
        """
        async for _ in self.clock:
            try:
                await self._flush_bucket_if_needed()
            except Exception:
                await conf.logger.error(
                    {
                        "type": "flush-bucket-failed",
                        "dest": self.host,
                        "retry": True,
                        "exc_traceback": traceback.format_exc(),
                    }
                )

    async def _flush_bucket_if_needed(self):
        """
        Hand every buffered message to the handler, if there is any.

        On success ``process_success()`` is awaited for each message. On a
        failure other than ``AioamqpException``, ``process_exception()`` is
        awaited for each message before the error is re-raised.

        :return: The handler result, or None when the bucket was empty
        :raises AioamqpException: re-raised untouched
        :raises Exception: any other error raised while flushing
        """
        if self.bucket.is_empty():
            return None

        # Pop before entering the try block so the error handler below can
        # always reference the batch; errors raised by the bucket itself
        # propagate unchanged instead of being masked by an unbound name.
        all_messages = self.bucket.pop_all()
        try:
            await conf.logger.debug(
                {
                    "event": "bucket-flush",
                    "bucket-size": len(all_messages),
                    "handler": self._handler.__name__,
                }
            )
            rv = await self._handler(all_messages)
            await asyncio.gather(
                *(m.process_success() for m in all_messages)
            )
            return rv
        except AioamqpException:
            raise
        except Exception:
            await asyncio.gather(
                *(m.process_exception() for m in all_messages)
            )
            raise

    async def on_queue_error(self, body, delivery_tag, error, queue):
        """
        Callback called every time that an error occurred during the
        validation or deserialization stage.

        The raw body is logged and the message is acked; an
        ``AioamqpException`` raised by the ack is logged, not propagated.

        :param body: unparsed, raw message content
        :type body: Any
        :param delivery_tag: delivery_tag of the consumed message
        :type delivery_tag: int
        :param error: The error that caused the callback to be called
        :type error: MessageError
        :param queue: Queue used to ack the message
        :type queue: JsonQueue
        """
        await conf.logger.error(
            {
                "parse-error": True,
                "exception": "Error: not a JSON",
                "original_msg": body,
            }
        )
        try:
            await queue.ack(delivery_tag=delivery_tag)
        except AioamqpException as e:
            await self._log_exception(e)

    async def on_message_handle_error(self, handler_error: Exception, **kwargs):
        """
        Callback called when an uncaught exception was raised during message
        handling stage.

        :param handler_error: The exception that triggered
        :param kwargs: arguments used to call the coroutine that handled
            the message
        """
        await self._log_exception(handler_error)

    async def on_connection_error(self, exception: Exception):
        """
        Called when the connection fails
        """
        await self._log_exception(exception)

    async def _log_exception(self, exception):
        """
        Log the message and the traceback of ``exception``.

        The traceback is taken from the exception object itself, so the log
        entry is correct even when this coroutine is not called from inside
        the ``except`` block that caught it.

        :param exception: The exception to be logged
        """
        formatted_traceback = "".join(
            traceback.format_exception(
                type(exception), exception, exception.__traceback__
            )
        )
        await conf.logger.error(
            {
                "exc_message": str(exception),
                "exc_traceback": formatted_traceback,
            }
        )

    async def consume_all_queues(self, queue: JsonQueue):
        """
        Start consuming every queue name of this route, with this object as
        the delegate.

        :param queue: Queue object used to start each consumption
        """
        for queue_name in self._queue_name:
            # For now the returned consumer_tag is not stored;
            # if it is ever needed, we can start keeping it.
            await conf.logger.debug(
                {"queue": queue_name, "event": "start-consume"}
            )
            await queue.consume(queue_name=queue_name, delegate=self)

    async def start(self):
        """
        Keep the consumer running.

        Once per second, while ``keep_runnig()`` is true, (re)start the
        consumption of all queues if the connection has no channel ready.
        The periodic flush loop is scheduled only once. Failures are logged
        and retried on the next iteration.

        NOTE(review): ``keep_runnig`` is not defined in the visible part of
        this class - presumably provided elsewhere; confirm.
        """
        while self.keep_runnig():
            if not self.queue.connection.has_channel_ready():
                try:
                    await self.consume_all_queues(self.queue)
                    if not self.clock_task:
                        self.clock_task = self._flush_clocked()
                        # The event loop only keeps weak references to
                        # tasks, so hold on to the task object.
                        self._clock_task_handle = (
                            asyncio.get_event_loop().create_task(
                                self.clock_task
                            )
                        )
                except Exception:
                    await conf.logger.error(
                        {
                            "type": "connection-failed",
                            "dest": self.host,
                            "retry": True,
                            "exc_traceback": traceback.format_exc(),
                        }
                    )
            await asyncio.sleep(1)