# Source code for asyncworker.entrypoints

from abc import ABC
from asyncio import iscoroutinefunction
from typing import Generic, TypeVar

import asyncworker
from asyncworker.routes import RouteHandler

# Type variable used to parameterize the generic EntrypointInterface class.
T = TypeVar("T")


def _extract_async_callable(handler) -> RouteHandler:
    cb = handler

    if not iscoroutinefunction(cb):
        try:
            cb = handler.__call__
            if not iscoroutinefunction(cb):
                raise TypeError(f"handler must be async: {cb}")
        except AttributeError as e:
            raise TypeError(f"Object passed as handler is not callable: {cb}")

    return cb


class EntrypointInterface(ABC, Generic[T]):
    """Generic base class for entrypoints bound to an application.

    The only behavior defined here is storing the application object passed
    to the constructor on ``self.app``.
    """

    # NOTE(review): the meaning of the type parameter ``T`` is not visible in
    # this module — presumably it is fixed by concrete subclasses; confirm.

    def __init__(self, app: "asyncworker.App") -> None:
        """Keep a reference to *app* on the instance as ``self.app``."""
        self.app = app