asyncworker package
Contents
asyncworker package#
Subpackages#
- asyncworker.easyqueue package
- asyncworker.http package
- asyncworker.metrics package
- asyncworker.rabbitmq package
- asyncworker.signals package
- asyncworker.testing package
- asyncworker.types package
- asyncworker.typing package
Submodules#
asyncworker.app module#
- class asyncworker.app.App(connections: Optional[Iterable[asyncworker.connections.Connection]] = None)[código fonte]#
Base:
collections.abc.MutableMapping, asyncworker.signals.base.Freezable
- property amqp: asyncworker.rabbitmq.entrypoints.AMQPRouteEntryPointImpl#
- async freeze()[código fonte]#
- get_connection(name: str) asyncworker.connections.Connection [código fonte]#
- get_connection_for_route(route_info: asyncworker.routes.Route)[código fonte]#
- handlers = (<asyncworker.signals.handlers.rabbitmq.RabbitMQ object>, <asyncworker.signals.handlers.http.HTTPServer object>)#
- property http: asyncworker.http.entrypoints.HTTPEntryPointImpl#
- route(routes: Iterable[str], type: asyncworker.options.RouteTypes, options: Optional[dict] = None, **kwargs)[código fonte]#
- run()[código fonte]#
- run_every(seconds: int, options: Optional[Dict] = None)[código fonte]#
Registers a coroutine to be called at a given interval
- run_on_shutdown(coro: Callable[[asyncworker.app.App], Coroutine]) None [código fonte]#
Registers a coroutine to be awaited during app shutdown
- run_on_startup(coro: Callable[[asyncworker.app.App], Coroutine]) None [código fonte]#
Registers a coroutine to be awaited during app startup
- shutdown() _asyncio.Future [código fonte]#
Schedules an on_shutdown signal
Is called automatically when the application receives a SIGINT or SIGTERM
- shutdown_os_signals = (<Signals.SIGINT: 2>, <Signals.SIGTERM: 15>)#
- async startup()[código fonte]#
Triggers the on_startup signal
Should be called in the event loop along with the request handler.
asyncworker.bucket module#
- class asyncworker.bucket.Bucket(size: int)[código fonte]#
Base:
Generic[asyncworker.bucket.T]
- is_empty() bool [código fonte]#
- is_full() bool [código fonte]#
- pop_all() List[asyncworker.bucket.T] [código fonte]#
- put(item: asyncworker.bucket.T)[código fonte]#
- property used: int#
- exception asyncworker.bucket.BucketFullException[código fonte]#
Base:
Exception
asyncworker.conf module#
- class asyncworker.conf.Settings(_env_file: Optional[Union[pathlib.Path, str]] = '<object object>', _env_file_encoding: Optional[str] = None, _secrets_dir: Optional[Union[pathlib.Path, str]] = None, *, LOGLEVEL: str = 'ERROR', AMQP_DEFAULT_VHOST: str = '/', AMQP_DEFAULT_PORT: int = 5672, AMQP_DEFAULT_PREFETCH_COUNT: int = 128, AMQP_DEFAULT_HEARTBEAT: int = 60, HTTP_HOST: str = '127.0.0.1', HTTP_PORT: int = 8080, FLUSH_TIMEOUT: int = 60, METRICS_NAMESPACE: str = 'asyncworker', METRICS_APPPREFIX: str = None, METRICS_ROUTE_PATH: str = '/metrics', METRICS_ROUTE_ENABLED: bool = True, METRICS_DEFAULT_HISTOGRAM_BUCKETS_IN_MS: List[float] = [10, 50, 100, 200, 500, 1000, 5000, inf])[código fonte]#
Base:
pydantic.env_settings.BaseSettings
- AMQP_DEFAULT_HEARTBEAT: int#
- AMQP_DEFAULT_PORT: int#
- AMQP_DEFAULT_PREFETCH_COUNT: int#
- AMQP_DEFAULT_VHOST: str#
- class Config[código fonte]#
Base:
object
- allow_mutation = False#
- env_prefix = 'ASYNCWORKER_'#
- FLUSH_TIMEOUT: int#
- HTTP_HOST: str#
- HTTP_PORT: int#
- LOGLEVEL: str#
- METRICS_APPPREFIX: Optional[str]#
- METRICS_DEFAULT_HISTOGRAM_BUCKETS_IN_MS: List[float]#
- METRICS_NAMESPACE: str#
- METRICS_ROUTE_ENABLED: bool#
- METRICS_ROUTE_PATH: str#
asyncworker.connections module#
- class asyncworker.connections.AMQPConnection(*, route_type: asyncworker.options.RouteTypes = RouteTypes.AMQP_RABBITMQ, name: str = None, hostname: str, username: str, password: str, port: int = 5672, ssl: ssl.SSLContext = None, verify_ssl: bool = True, prefetch: int = 128, heartbeat: int = 60, connections: Dict[str, asyncworker.easyqueue.queue.JsonQueue] = {})[código fonte]#
Base:
asyncworker.connections.Connection
- class Config[código fonte]#
Base:
object
- arbitrary_types_allowed = True#
- connections: Dict[str, asyncworker.easyqueue.queue.JsonQueue]#
- heartbeat: int#
- hostname: str#
- items()[código fonte]#
- keys()[código fonte]#
- name: Optional[str]#
- password: str#
- port: int#
- prefetch: int#
- async put(routing_key: str, data: Any = None, serialized_data: Optional[Union[str, bytes]] = None, exchange: str = '', vhost: str = '/', properties: Optional[dict] = None, mandatory: bool = False, immediate: bool = False)[código fonte]#
- register(queue: asyncworker.easyqueue.queue.JsonQueue) None [código fonte]#
Register a virtual subclass of an ABC.
Returns the subclass, to allow usage as a class decorator.
- classmethod set_connections(v)[código fonte]#
- ssl: Optional[ssl.SSLContext]#
- username: str#
- values()[código fonte]#
- verify_ssl: bool#
- class asyncworker.connections.Connection(*, route_type: asyncworker.options.RouteTypes, name: str = None)[código fonte]#
Base:
pydantic.main.BaseModel, abc.ABC
Common ancestor for all Connection classes that auto-generates a connection name and is responsible for keeping track of new connections on the ConnectionsMapping
- name: Optional[str]#
- route_type: asyncworker.options.RouteTypes#
- class asyncworker.connections.ConnectionsMapping[código fonte]#
Base:
Mapping[str, asyncworker.connections.Connection], asyncworker.signals.base.Freezable
A mapping (Connection.name->Connection) of all available connections that also keeps a counter for each connection type
- add(connections: Iterable[asyncworker.connections.Connection]) None [código fonte]#
- with_type(route_type: asyncworker.options.RouteTypes) List[asyncworker.connections.Connection] [código fonte]#
asyncworker.consumer module#
- class asyncworker.consumer.Consumer(route_info: Union[Dict, asyncworker.routes.AMQPRoute], host: str, username: str, password: str, prefetch_count: int = 128, port: int = 5672, ssl: Optional[ssl.SSLContext] = None, verify_ssl: bool = True, bucket_class: Type[asyncworker.bucket.Bucket] = asyncworker.bucket.Bucket[asyncworker.rabbitmq.message.RabbitMQMessage])[código fonte]#
Base:
asyncworker.easyqueue.queue.QueueConsumerDelegate
- async consume_all_queues(queue: asyncworker.easyqueue.queue.JsonQueue)[código fonte]#
- keep_runnig()[código fonte]#
- async on_before_start_consumption(queue_name: str, queue: asyncworker.easyqueue.queue.JsonQueue)[código fonte]#
Coroutine called before queue consumption starts. May be overridden to implement further custom initialization.
- Parâmetros
queue_name (str) – Queue name that will be consumed
queue (JsonQueue) – AsyncQueue instance
- async on_connection_error(exception: Exception)[código fonte]#
Called when the connection fails
- async on_consumption_start(consumer_tag: str, queue: asyncworker.easyqueue.queue.JsonQueue)[código fonte]#
Coroutine called once consumption started.
- async on_message_handle_error(handler_error: Exception, **kwargs)[código fonte]#
Callback called when an uncaught exception is raised during the message handling stage.
- Parâmetros
handler_error – The exception that triggered the callback
kwargs – arguments used to call the coroutine that handled the message
- async on_queue_error(body, delivery_tag, error, queue)[código fonte]#
Callback called every time that an error occurred during the validation or deserialization stage.
- Parâmetros
body (Any) – unparsed, raw message content
delivery_tag (int) – delivery_tag of the consumed message
error (MessageError) – The error that caused the callback to be called
- async on_queue_message(msg: asyncworker.easyqueue.message.AMQPMessage)[código fonte]#
Callback called every time that a new, valid and deserialized message is ready to be handled.
- property queue_name: str#
- async start()[código fonte]#
asyncworker.decorators module#
- asyncworker.decorators.wraps(original_handler)[código fonte]#
Esse decorator faz com que a assinatura da função original “suba” até o último decorator, que deverá ser sempre um registrador do próprio asyncworker. Ex.:
@app.http.get(…)
@deco1
@deco2
async def handler(…):
    pass
Nesse caso, os decorators @deco1 e @deco2 devem, necessariamente, fazer uso desse @wraps().
asyncworker.entrypoints module#
- class asyncworker.entrypoints.EntrypointInterface(app: asyncworker.app.App)[código fonte]#
Base:
abc.ABC, Generic[asyncworker.entrypoints.T]
asyncworker.exceptions module#
- exception asyncworker.exceptions.InvalidConnection[código fonte]#
Base:
ValueError
Defines an invalid connection definition condition.
- exception asyncworker.exceptions.InvalidRoute[código fonte]#
Base:
ValueError
Defines an invalid route definition condition.
asyncworker.options module#
- class asyncworker.options.Actions(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[código fonte]#
Base:
asyncworker.options.AutoNameEnum
- ACK = 'ack'#
- REJECT = 'reject'#
- REQUEUE = 'requeue'#
- class asyncworker.options.AutoNameEnum(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[código fonte]#
Base:
str, enum.Enum
- class asyncworker.options.DefaultValues[código fonte]#
Base:
object
- BULK_FLUSH_INTERVAL = 60#
- BULK_SIZE = 1#
- ON_EXCEPTION = 'requeue'#
- ON_SUCCESS = 'ack'#
- RUN_EVERY_MAX_CONCURRENCY = 1#
- class asyncworker.options.Events(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[código fonte]#
Base:
asyncworker.options.AutoNameEnum
- ON_EXCEPTION = 'on_exception'#
- ON_SUCCESS = 'on_success'#
- class asyncworker.options.Options(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[código fonte]#
Base:
asyncworker.options.AutoNameEnum
- BULK_FLUSH_INTERVAL = 'bulk_flush_interval'#
- BULK_SIZE = 'bulk_size'#
- CONNECTION_FAIL_CALLBACK = 'connection_fail_callback'#
- MAX_CONCURRENCY = 'max_concurrency'#
- class asyncworker.options.RouteTypes(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[código fonte]#
Base:
asyncworker.options.AutoNameEnum
- AMQP_RABBITMQ = 'amqp_rabbitmq'#
- HTTP = 'http'#
asyncworker.routes module#
- class asyncworker.routes.AMQPRoute(*, type: asyncworker.options.RouteTypes = RouteTypes.AMQP_RABBITMQ, handler: Any = None, routes: List[str], connection: asyncworker.connections.AMQPConnection = None, options: asyncworker.routes.AMQPRouteOptions, vhost: str = '/')[código fonte]#
Base:
asyncworker.routes.Route
- connection: Optional[asyncworker.connections.AMQPConnection]#
- options: asyncworker.routes.AMQPRouteOptions#
- vhost: str#
- class asyncworker.routes.AMQPRouteOptions(*, bulk_size: int = 1, bulk_flush_interval: int = 60, on_success: asyncworker.options.Actions = Actions.ACK, on_exception: asyncworker.options.Actions = Actions.REQUEUE, connection_fail_callback: Callable[[Exception, int], Coroutine] = None, connection: Optional[Union[asyncworker.connections.AMQPConnection, str]] = None)[código fonte]#
Base:
asyncworker.routes._RouteOptions
- class Config[código fonte]#
Base:
object
- arbitrary_types_allowed = False#
- extra = 'forbid'#
- bulk_flush_interval: int#
- bulk_size: int#
- connection: Optional[Union[asyncworker.connections.AMQPConnection, str]]#
- connection_fail_callback: Optional[Callable[[Exception, int], Coroutine]]#
- on_exception: asyncworker.options.Actions#
- on_success: asyncworker.options.Actions#
- class asyncworker.routes.HTTPRoute(*, type: asyncworker.options.RouteTypes = RouteTypes.HTTP, handler: Any = None, routes: List[str], connection: asyncworker.connections.Connection = None, options: asyncworker.routes._RouteOptions = _RouteOptions(), methods: List[str])[código fonte]#
Base:
asyncworker.routes.Route
- aiohttp_routes() Iterable[aiohttp.web_routedef.RouteDef] [código fonte]#
- methods: List[str]#
- options: asyncworker.routes._RouteOptions#
- classmethod validate_method(v: Union[str, List[str]])[código fonte]#
- class asyncworker.routes.Model[código fonte]#
Base:
pydantic.main.BaseModel, abc.ABC
An abstract pydantic BaseModel that also behaves like a Mapping
- get(key, default=None)[código fonte]#
- keys()[código fonte]#
- class asyncworker.routes.Route(*, type: asyncworker.options.RouteTypes, handler: Any = None, routes: List[str], connection: asyncworker.connections.Connection = None, options: asyncworker.routes._RouteOptions = _RouteOptions())[código fonte]#
Base:
asyncworker.routes.Model, abc.ABC
An abstract Model that acts like a route factory
- connection: Optional[asyncworker.connections.Connection]#
- static factory(data: Dict) asyncworker.routes.Route [código fonte]#
- handler: Any#
- options: asyncworker.routes._RouteOptions#
- routes: List[str]#
- class asyncworker.routes.RoutesRegistry(dict=None, /, **kwargs)[código fonte]#
Base:
collections.UserDict
- add_amqp_route(route: asyncworker.routes.AMQPRoute) None [código fonte]#
- add_http_route(route: asyncworker.routes.HTTPRoute) None [código fonte]#
- add_route(route: asyncworker.routes.Route) None [código fonte]#
- amqp_routes#
- http_routes#
- route_for(handler: Callable[[...], Coroutine]) asyncworker.routes.Route [código fonte]#
- async asyncworker.routes.call_http_handler(request: asyncworker.http.wrapper.RequestWrapper, handler: Callable[[...], Coroutine])[código fonte]#
- asyncworker.routes.http_handler_wrapper(handler)[código fonte]#
asyncworker.task_runners module#
- class asyncworker.task_runners.ScheduledTaskRunner(seconds: int, task: Callable[[App], Coroutine], app: App, max_concurrency: int)[código fonte]#
Base:
object
- async can_dispatch_task() bool [código fonte]#
- async start(app: App) _asyncio.Future [código fonte]#
- async stop(app: App) None [código fonte]#
asyncworker.time module#
- class asyncworker.time.ClockTicker(seconds: Union[float, int])[código fonte]#
Base:
collections.abc.AsyncIterator
T - A clock tick
F - Something that happens inside an iteration ("x" = running, "-" = waiting)
I - A clock iteration
E.g.:
async for tick in Clock(seconds=2):
    await asyncio.sleep(3)
T: 15------17------19------21------23------25------27------29------
F: xxxxxxxxxxxxx---xxxxxxxxxxxxx---xxxxxxxxxxxxx---xxxxxxxxxxxxx---
I: x---------------x---------------x---------------x---------------
- async stop() None [código fonte]#
- asyncworker.time.perf_counter_ms() float [código fonte]#
Return the value (in fractional milliseconds) of a performance counter, i.e. a clock with the highest available resolution to measure a short duration. It does include time elapsed during sleep and is system-wide. The reference point of the returned value is undefined, so that only the difference between the results of consecutive calls is valid.
asyncworker.utils module#
- class asyncworker.utils.Timeit(name: str, callback: Optional[Callable[[...], Coroutine]] = None)[código fonte]#
Base:
object
- TRANSACTIONS_KEY = 'transactions'#
- asyncworker.utils.entrypoint(f)[código fonte]#