Código fonte para asyncworker.task_runners

import asyncio
from typing import TYPE_CHECKING, Callable, Coroutine, Optional, Set

from asyncworker.time import ClockTicker

if TYPE_CHECKING:
    from asyncworker.app import App  # pragma: nocover


[documentos]class ScheduledTaskRunner: def __init__( self, seconds: int, task: Callable[["App"], Coroutine], app: "App", max_concurrency: int, ) -> None: self.seconds = seconds self.max_concurrency = max_concurrency self.task = task self.app = app self.running_tasks: Set[asyncio.Future] = set() self.task_is_done_event = asyncio.Event() self._started = False self.clock = ClockTicker(seconds=self.seconds)
[documentos] async def can_dispatch_task(self) -> bool: if len(self.running_tasks) < self.max_concurrency: return True if await self.task_is_done_event.wait(): return True return False
async def _wrapped_task(self) -> None: """ Wraps the future task on a coroutine that's responsible for unregistering itself from the "running tasks" and emitting an "task is done" event """ try: await self.task(self.app) finally: self.task_is_done_event.set() self.running_tasks.remove(asyncio.current_task()) # type: ignore
[documentos] async def start(self, app: "App") -> asyncio.Future: self._started = True return asyncio.ensure_future(self._run())
[documentos] async def stop(self, app: "App") -> None: await self.clock.stop() await asyncio.gather(*self.running_tasks)
async def _run(self) -> None: async for tick in self.clock: if await self.can_dispatch_task(): task = asyncio.ensure_future(self._wrapped_task()) self.running_tasks.add(task) self.task_is_done_event.clear()