import abc
from collections import UserDict
from typing import (
Any,
Callable,
Coroutine,
Dict,
Iterable,
List,
Optional,
Type,
Union,
)
from aiohttp import web
from aiohttp.hdrs import METH_ALL
from aiohttp.web_routedef import RouteDef
from cached_property import cached_property
from pydantic import BaseModel, Extra, root_validator, validator
from asyncworker import conf
from asyncworker.connections import AMQPConnection, Connection
from asyncworker.http.wrapper import RequestWrapper
from asyncworker.options import Actions, DefaultValues, RouteTypes
from asyncworker.types.registry import TypesRegistry
from asyncworker.types.resolver import ArgResolver, MissingTypeAnnotationError
RouteHandler = Callable[..., Coroutine]
[documentos]class Model(BaseModel, abc.ABC):
"""
An abstract pydantic BaseModel that also behaves like a Mapping
"""
def __getitem__(self, item):
try:
return getattr(self, item)
except AttributeError as e:
raise KeyError from e
def __setitem__(self, key, value):
try:
return self.__setattr__(key, value)
except (AttributeError, ValueError) as e:
raise KeyError from e
def __eq__(self, other):
if isinstance(other, dict):
return self.dict() == other
return super(Model, self).__eq__(other)
def __len__(self):
return len(self.__fields__)
[documentos] def get(self, key, default=None):
try:
return self[key]
except KeyError:
return default
class _RouteOptions(Model):
pass
[documentos]class Route(Model, abc.ABC):
"""
An abstract Model that acts like a route factory
"""
type: RouteTypes
handler: Any
routes: List[str]
connection: Optional[Connection]
options: _RouteOptions = _RouteOptions()
[documentos] @staticmethod
def factory(data: Dict) -> "Route":
try:
type_ = data.pop("type")
except KeyError as e:
raise ValueError("Routes must have a type") from e
if type_ == RouteTypes.HTTP:
return HTTPRoute(**data)
if type_ == RouteTypes.AMQP_RABBITMQ:
return AMQPRoute(**data)
raise ValueError(f"'{type_}' is an invalid RouteType.")
[documentos]class HTTPRoute(Route):
type: RouteTypes = RouteTypes.HTTP
methods: List[str]
options: _RouteOptions = _RouteOptions()
@classmethod
def _validate_method(cls, method: str) -> str:
method = method.upper()
if method not in METH_ALL:
raise ValueError(f"'{method}' isn't a valid supported HTTP method.")
return method
[documentos] @validator("methods")
def validate_method(cls, v: Union[str, List[str]]):
# compatibility with older versions of pydantic
if isinstance(v, str): # pragma: no cover
return cls._validate_method(v)
return [cls._validate_method(method) for method in v]
@root_validator
def _validate_metrics_route(cls, values: dict) -> dict:
if not conf.settings.METRICS_ROUTE_ENABLED:
return values
if "methods" not in values:
return values
if "GET" not in values["methods"]:
return values
if conf.settings.METRICS_ROUTE_PATH in values["routes"]:
raise ValueError(
f"Conflicting HTTP routes."
f"Defining a `{conf.settings.METRICS_ROUTE_PATH}` "
f"conflicts with asyncworker's metrics path. Consider the "
f"following options: a) Remove your route and use asyncworker "
f"metrics; b) disable asyncworker's metrics route "
)
return values
[documentos] def aiohttp_routes(self) -> Iterable[RouteDef]:
for route in self.routes:
for method in self.methods:
kwargs = {"allow_head": False} if method == "GET" else {}
yield RouteDef(
method=method,
path=route,
handler=self.handler,
kwargs=kwargs,
)
[documentos]class AMQPRouteOptions(_RouteOptions):
bulk_size: int = DefaultValues.BULK_SIZE
bulk_flush_interval: int = DefaultValues.BULK_FLUSH_INTERVAL
on_success: Actions = DefaultValues.ON_SUCCESS
on_exception: Actions = DefaultValues.ON_EXCEPTION
connection_fail_callback: Optional[
Callable[[Exception, int], Coroutine]
] = None
connection: Optional[Union[AMQPConnection, str]]
[documentos] class Config:
arbitrary_types_allowed = False
extra = Extra.forbid
[documentos]class AMQPRoute(Route):
type: RouteTypes = RouteTypes.AMQP_RABBITMQ
vhost: str = conf.settings.AMQP_DEFAULT_VHOST
connection: Optional[AMQPConnection]
options: AMQPRouteOptions
[documentos]async def call_http_handler(request: RequestWrapper, handler: RouteHandler):
arg_resolver = ArgResolver(registry=request.types_registry)
try:
return await arg_resolver.wrap(handler)
except MissingTypeAnnotationError:
raise
[documentos]def http_handler_wrapper(handler):
async def _insert_types_registry(request: web.Request):
"""
Esse é o único ponto que tem contato direto com o aiohttp. É essa corotina que é efetivament registrada nas rotas do aiohttp. Daqui pra frente tudo é chamado através do ccall_http_handler().
"""
registry = TypesRegistry()
request["types_registry"] = registry
registry.set(request)
r_wrapper = RequestWrapper(
http_request=request, types_registry=registry
)
registry.set(r_wrapper)
return await call_http_handler(r_wrapper, handler)
return _insert_types_registry
[documentos]class RoutesRegistry(UserDict):
def _get_routes_for_type(self, route_type: Type) -> Iterable:
return tuple((r for r in self.values() if isinstance(r, route_type)))
@cached_property
def http_routes(self) -> Iterable[HTTPRoute]:
return self._get_routes_for_type(HTTPRoute)
@cached_property
def amqp_routes(self) -> Iterable[AMQPRoute]:
return self._get_routes_for_type(AMQPRoute)
def __setitem__(self, key: RouteHandler, value: Union[Dict, Route]):
if not isinstance(value, Route):
route = Route.factory({"handler": key, **value})
else:
route = value
if route.type == RouteTypes.HTTP:
route.handler = http_handler_wrapper(key)
super(RoutesRegistry, self).__setitem__(key, route)
[documentos] def add_route(self, route: Route) -> None:
self[route.handler] = route
[documentos] def add_http_route(self, route: HTTPRoute) -> None:
self[route.handler] = route
[documentos] def add_amqp_route(self, route: AMQPRoute) -> None:
self[route.handler] = route
[documentos] def route_for(self, handler: RouteHandler) -> Route:
return self[handler]