import asyncio
from ssl import SSLContext
from typing import Callable, Coroutine, Optional, Union
import aioamqp
from aioamqp import AmqpProtocol
from aioamqp.channel import Channel
from aioamqp.exceptions import AioamqpException
from aioamqp.protocol import OPEN
from asyncworker.conf import settings
OnErrorCallback = Union[
None, Callable[[Exception], None], Callable[[Exception], Coroutine]
]
[documentos]class AMQPConnection:
def __init__(
self,
host: str,
username: str,
password: str,
port: int = settings.AMQP_DEFAULT_PORT,
ssl: Optional[SSLContext] = None,
verify_ssl: bool = True,
heartbeat: int = 60,
virtual_host: str = "/",
loop: Optional[asyncio.AbstractEventLoop] = None,
on_error: OnErrorCallback = None,
) -> None:
self.host = host
self.port = port
self.ssl = ssl
self.verify_ssl = verify_ssl
self.username = username
self.password = password
self.virtual_host = virtual_host
self.heartbeat = heartbeat
self.loop = loop
self._on_error = on_error
self._connection_lock = asyncio.Lock()
self.channel: Channel = None
self._transport: Optional[asyncio.BaseTransport] = None
self._protocol: AmqpProtocol = None
@property
def connection_parameters(self):
return {
"host": self.host,
"port": self.port,
"ssl": self.ssl,
"verify_ssl": self.verify_ssl,
"login": self.username,
"password": self.password,
"virtualhost": self.virtual_host,
"loop": self.loop,
"on_error": self._on_error,
"heartbeat": self.heartbeat,
}
@property
def is_connected(self) -> bool:
procotol_ok = self._protocol and self._protocol.state == OPEN
return procotol_ok
[documentos] def has_channel_ready(self):
channel_ok = self.channel and self.channel.is_open
return channel_ok
[documentos] async def close(self) -> None:
if not self.is_connected:
return None
await self._protocol.close()
self._transport.close() # type: ignore
async def _connect(self) -> None:
async with self._connection_lock:
if self.is_connected and self.has_channel_ready():
return
try:
if self._protocol:
self.channel = await self._protocol.channel()
return
except AioamqpException as e:
# Se não conseguirmos pegar um channel novo
# a conexão atual deve mesmo ser renovada e isso
# será feito logo abaixo.
pass
conn = await aioamqp.connect(**self.connection_parameters)
self._transport, self._protocol = conn
self.channel = await self._protocol.channel()