asyncworker package#

Subpackages#

Submodules#

asyncworker.app module#

class asyncworker.app.App(connections: Optional[Iterable[asyncworker.connections.Connection]] = None)[código fonte]#

Base: collections.abc.MutableMapping, asyncworker.signals.base.Freezable

property amqp: asyncworker.rabbitmq.entrypoints.AMQPRouteEntryPointImpl#
async freeze()[código fonte]#
get_connection(name: str) asyncworker.connections.Connection[código fonte]#
get_connection_for_route(route_info: asyncworker.routes.Route)[código fonte]#
handlers = (<asyncworker.signals.handlers.rabbitmq.RabbitMQ object>, <asyncworker.signals.handlers.http.HTTPServer object>)#
property http: asyncworker.http.entrypoints.HTTPEntryPointImpl#
route(routes: Iterable[str], type: asyncworker.options.RouteTypes, options: Optional[dict] = None, **kwargs)[código fonte]#
run()[código fonte]#
run_every(seconds: int, options: Optional[Dict] = None)[código fonte]#

Registers a coroutine to be called with a given interval

run_on_shutdown(coro: Callable[[asyncworker.app.App], Coroutine]) None[código fonte]#

Registers a coroutine to be awaited for during app shutdown

run_on_startup(coro: Callable[[asyncworker.app.App], Coroutine]) None[código fonte]#

Registers a coroutine to be awaited for during app startup

shutdown() _asyncio.Future[código fonte]#

Schedules an on_startup signal

Is called automatically when the application receives a SIGINT or SIGTERM

shutdown_os_signals = (<Signals.SIGINT: 2>, <Signals.SIGTERM: 15>)#
async startup()[código fonte]#

Causes on_startup signal

Should be called in the event loop along with the request handler.

asyncworker.bucket module#

class asyncworker.bucket.Bucket(size: int)[código fonte]#

Base: Generic[asyncworker.bucket.T]

is_empty() bool[código fonte]#
is_full() bool[código fonte]#
pop_all() List[asyncworker.bucket.T][código fonte]#
put(item: asyncworker.bucket.T)[código fonte]#
property used: int#
exception asyncworker.bucket.BucketFullException[código fonte]#

Base: Exception

asyncworker.conf module#

class asyncworker.conf.Settings(_env_file: Optional[Union[pathlib.Path, str]] = '<object object>', _env_file_encoding: Optional[str] = None, _secrets_dir: Optional[Union[pathlib.Path, str]] = None, *, LOGLEVEL: str = 'ERROR', AMQP_DEFAULT_VHOST: str = '/', AMQP_DEFAULT_PORT: int = 5672, AMQP_DEFAULT_PREFETCH_COUNT: int = 128, AMQP_DEFAULT_HEARTBEAT: int = 60, HTTP_HOST: str = '127.0.0.1', HTTP_PORT: int = 8080, FLUSH_TIMEOUT: int = 60, METRICS_NAMESPACE: str = 'asyncworker', METRICS_APPPREFIX: str = None, METRICS_ROUTE_PATH: str = '/metrics', METRICS_ROUTE_ENABLED: bool = True, METRICS_DEFAULT_HISTOGRAM_BUCKETS_IN_MS: List[float] = [10, 50, 100, 200, 500, 1000, 5000, inf])[código fonte]#

Base: pydantic.env_settings.BaseSettings

AMQP_DEFAULT_HEARTBEAT: int#
AMQP_DEFAULT_PORT: int#
AMQP_DEFAULT_PREFETCH_COUNT: int#
AMQP_DEFAULT_VHOST: str#
class Config[código fonte]#

Base: object

allow_mutation = False#
env_prefix = 'ASYNCWORKER_'#
FLUSH_TIMEOUT: int#
HTTP_HOST: str#
HTTP_PORT: int#
LOGLEVEL: str#
METRICS_APPPREFIX: Optional[str]#
METRICS_DEFAULT_HISTOGRAM_BUCKETS_IN_MS: List[float]#
METRICS_NAMESPACE: str#
METRICS_ROUTE_ENABLED: bool#
METRICS_ROUTE_PATH: str#

asyncworker.connections module#

class asyncworker.connections.AMQPConnection(*, route_type: asyncworker.options.RouteTypes = RouteTypes.AMQP_RABBITMQ, name: str = None, hostname: str, username: str, password: str, port: int = 5672, ssl: ssl.SSLContext = None, verify_ssl: bool = True, prefetch: int = 128, heartbeat: int = 60, connections: Dict[str, asyncworker.easyqueue.queue.JsonQueue] = {})[código fonte]#

Base: asyncworker.connections.Connection

class Config[código fonte]#

Base: object

arbitrary_types_allowed = True#
connections: Dict[str, asyncworker.easyqueue.queue.JsonQueue]#
heartbeat: int#
hostname: str#
items()[código fonte]#
keys()[código fonte]#
name: Optional[str]#
password: str#
port: int#
prefetch: int#
async put(routing_key: str, data: Any = None, serialized_data: Optional[Union[str, bytes]] = None, exchange: str = '', vhost: str = '/', properties: Optional[dict] = None, mandatory: bool = False, immediate: bool = False)[código fonte]#
register(queue: asyncworker.easyqueue.queue.JsonQueue) None[código fonte]#

Register a virtual subclass of an ABC.

Returns the subclass, to allow usage as a class decorator.

classmethod set_connections(v)[código fonte]#
ssl: Optional[ssl.SSLContext]#
username: str#
values()[código fonte]#
verify_ssl: bool#
class asyncworker.connections.Connection(*, route_type: asyncworker.options.RouteTypes, name: str = None)[código fonte]#

Base: pydantic.main.BaseModel, abc.ABC

Common ancestral for all Connection classes that auto generates a connection name and is responsible for keeping track of new connections on the ConnectionsMapping

name: Optional[str]#
route_type: asyncworker.options.RouteTypes#
class asyncworker.connections.ConnectionsMapping[código fonte]#

Base: Mapping[str, asyncworker.connections.Connection], asyncworker.signals.base.Freezable

A mapping (Connection.name->Connection) of all available connections that also keeps a counter for each connection type

add(connections: Iterable[asyncworker.connections.Connection]) None[código fonte]#
with_type(route_type: asyncworker.options.RouteTypes) List[asyncworker.connections.Connection][código fonte]#

asyncworker.consumer module#

class asyncworker.consumer.Consumer(route_info: Union[Dict, asyncworker.routes.AMQPRoute], host: str, username: str, password: str, prefetch_count: int = 128, port: int = 5672, ssl: Optional[ssl.SSLContext] = None, verify_ssl: bool = True, bucket_class: Type[asyncworker.bucket.Bucket] = asyncworker.bucket.Bucket[asyncworker.rabbitmq.message.RabbitMQMessage])[código fonte]#

Base: asyncworker.easyqueue.queue.QueueConsumerDelegate

async consume_all_queues(queue: asyncworker.easyqueue.queue.JsonQueue)[código fonte]#
keep_runnig()[código fonte]#
async on_before_start_consumption(queue_name: str, queue: asyncworker.easyqueue.queue.JsonQueue)[código fonte]#

Coroutine called before queue consumption starts. May be overwritten to implement further custom initialization.

Parâmetros
  • queue_name (str) – Queue name that will be consumed

  • queue (JsonQueue) – AsynQueue instanced

async on_connection_error(exception: Exception)[código fonte]#

Called when the connection fails

async on_consumption_start(consumer_tag: str, queue: asyncworker.easyqueue.queue.JsonQueue)[código fonte]#

Coroutine called once consumption started.

async on_message_handle_error(handler_error: Exception, **kwargs)[código fonte]#

Callback called when an uncaught exception was raised during message handling stage.

Parâmetros
  • handler_error – The exception that triggered

  • kwargs – arguments used to call the coroutine that handled

the message

async on_queue_error(body, delivery_tag, error, queue)[código fonte]#

Callback called every time that an error occurred during the validation or deserialization stage.

Parâmetros
  • body (Any) – unparsed, raw message content

  • delivery_tag (int) – delivery_tag of the consumed message

  • error (MessageError) – THe error that caused the callback to be called

async on_queue_message(msg: asyncworker.easyqueue.message.AMQPMessage)[código fonte]#

Callback called every time that a new, valid and deserialized message is ready to be handled.

property queue_name: str#
async start()[código fonte]#

asyncworker.decorators module#

asyncworker.decorators.wraps(original_handler)[código fonte]#

Esse decorator faz com que a assinatura da função original “suba” até o último decorator, que deverá ser sempre um registrador do próprio asyncworker. ex: @app.http.get(…) @deco1 @deco2 async def handler(…)

pass

Nesse caso, os decorators @deco1 e @deco2 devem, necessariamente fazer uso desse @wraps()

asyncworker.entrypoints module#

class asyncworker.entrypoints.EntrypointInterface(app: asyncworker.app.App)[código fonte]#

Base: abc.ABC, Generic[asyncworker.entrypoints.T]

asyncworker.exceptions module#

exception asyncworker.exceptions.InvalidConnection[código fonte]#

Base: ValueError

Defines an invalid connection definition condition.

exception asyncworker.exceptions.InvalidRoute[código fonte]#

Base: ValueError

Defines an invalid route definition condition.

asyncworker.options module#

class asyncworker.options.Actions(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[código fonte]#

Base: asyncworker.options.AutoNameEnum

ACK = 'ack'#
REJECT = 'reject'#
REQUEUE = 'requeue'#
class asyncworker.options.AutoNameEnum(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[código fonte]#

Base: str, enum.Enum

class asyncworker.options.DefaultValues[código fonte]#

Base: object

BULK_FLUSH_INTERVAL = 60#
BULK_SIZE = 1#
ON_EXCEPTION = 'requeue'#
ON_SUCCESS = 'ack'#
RUN_EVERY_MAX_CONCURRENCY = 1#
class asyncworker.options.Events(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[código fonte]#

Base: asyncworker.options.AutoNameEnum

ON_EXCEPTION = 'on_exception'#
ON_SUCCESS = 'on_success'#
class asyncworker.options.Options(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[código fonte]#

Base: asyncworker.options.AutoNameEnum

BULK_FLUSH_INTERVAL = 'bulk_flush_interval'#
BULK_SIZE = 'bulk_size'#
CONNECTION_FAIL_CALLBACK = 'connection_fail_callback'#
MAX_CONCURRENCY = 'max_concurrency'#
class asyncworker.options.RouteTypes(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[código fonte]#

Base: asyncworker.options.AutoNameEnum

AMQP_RABBITMQ = 'amqp_rabbitmq'#
HTTP = 'http'#

asyncworker.routes module#

class asyncworker.routes.AMQPRoute(*, type: asyncworker.options.RouteTypes = RouteTypes.AMQP_RABBITMQ, handler: Any = None, routes: List[str], connection: asyncworker.connections.AMQPConnection = None, options: asyncworker.routes.AMQPRouteOptions, vhost: str = '/')[código fonte]#

Base: asyncworker.routes.Route

connection: Optional[asyncworker.connections.AMQPConnection]#
options: asyncworker.routes.AMQPRouteOptions#
type: asyncworker.options.RouteTypes#
vhost: str#
class asyncworker.routes.AMQPRouteOptions(*, bulk_size: int = 1, bulk_flush_interval: int = 60, on_success: asyncworker.options.Actions = Actions.ACK, on_exception: asyncworker.options.Actions = Actions.REQUEUE, connection_fail_callback: Callable[[Exception, int], Coroutine] = None, connection: Optional[Union[asyncworker.connections.AMQPConnection, str]] = None)[código fonte]#

Base: asyncworker.routes._RouteOptions

class Config[código fonte]#

Base: object

arbitrary_types_allowed = False#
extra = 'forbid'#
bulk_flush_interval: int#
bulk_size: int#
connection: Optional[Union[asyncworker.connections.AMQPConnection, str]]#
connection_fail_callback: Optional[Callable[[Exception, int], Coroutine]]#
on_exception: asyncworker.options.Actions#
on_success: asyncworker.options.Actions#
class asyncworker.routes.HTTPRoute(*, type: asyncworker.options.RouteTypes = RouteTypes.HTTP, handler: Any = None, routes: List[str], connection: asyncworker.connections.Connection = None, options: asyncworker.routes._RouteOptions = _RouteOptions(), methods: List[str])[código fonte]#

Base: asyncworker.routes.Route

aiohttp_routes() Iterable[aiohttp.web_routedef.RouteDef][código fonte]#
methods: List[str]#
options: asyncworker.routes._RouteOptions#
type: asyncworker.options.RouteTypes#
classmethod validate_method(v: Union[str, List[str]])[código fonte]#
class asyncworker.routes.Model[código fonte]#

Base: pydantic.main.BaseModel, abc.ABC

An abstract pydantic BaseModel that also behaves like a Mapping

get(key, default=None)[código fonte]#
keys()[código fonte]#
class asyncworker.routes.Route(*, type: asyncworker.options.RouteTypes, handler: Any = None, routes: List[str], connection: asyncworker.connections.Connection = None, options: asyncworker.routes._RouteOptions = _RouteOptions())[código fonte]#

Base: asyncworker.routes.Model, abc.ABC

An abstract Model that acts like a route factory

connection: Optional[asyncworker.connections.Connection]#
static factory(data: Dict) asyncworker.routes.Route[código fonte]#
handler: Any#
options: asyncworker.routes._RouteOptions#
routes: List[str]#
type: asyncworker.options.RouteTypes#
class asyncworker.routes.RoutesRegistry(dict=None, /, **kwargs)[código fonte]#

Base: collections.UserDict

add_amqp_route(route: asyncworker.routes.AMQPRoute) None[código fonte]#
add_http_route(route: asyncworker.routes.HTTPRoute) None[código fonte]#
add_route(route: asyncworker.routes.Route) None[código fonte]#
amqp_routes#
http_routes#
route_for(handler: Callable[[...], Coroutine]) asyncworker.routes.Route[código fonte]#
async asyncworker.routes.call_http_handler(request: asyncworker.http.wrapper.RequestWrapper, handler: Callable[[...], Coroutine])[código fonte]#
asyncworker.routes.http_handler_wrapper(handler)[código fonte]#

asyncworker.task_runners module#

class asyncworker.task_runners.ScheduledTaskRunner(seconds: int, task: Callable[[App], Coroutine], app: App, max_concurrency: int)[código fonte]#

Base: object

async can_dispatch_task() bool[código fonte]#
async start(app: App) _asyncio.Future[código fonte]#
async stop(app: App) None[código fonte]#

asyncworker.time module#

class asyncworker.time.ClockTicker(seconds: Union[float, int])[código fonte]#

Base: collections.abc.AsyncIterator

T - A clock tick F - Something that happens inside an iteration (“x” = running “-” = waiting) I - A clock iteration

E.g:

async for tick in Clock(seconds=2):

await asyncio.sleep(3)

T: 15——17——19——21——23——25——27——29—— F: xxxxxxxxxxxxx—xxxxxxxxxxxxx—xxxxxxxxxxxxx—xxxxxxxxxxxxx— I: x—————x—————x—————x—————

async stop() None[código fonte]#
asyncworker.time.perf_counter_ms() float[código fonte]#

Return the value (in fractional milliseconds) of a performance counter, i.e. a clock with the highest available resolution to measure a short duration. It does include time elapsed during sleep and is system-wide. The reference point of the returned value is undefined, so that only the difference between the results of consecutive calls is valid.

asyncworker.utils module#

class asyncworker.utils.Timeit(name: str, callback: Optional[Callable[[...], Coroutine]] = None)[código fonte]#

Base: object

TRANSACTIONS_KEY = 'transactions'#
asyncworker.utils.entrypoint(f)[código fonte]#

Module contents#