Código fonte para asyncworker.bucket
from typing import Generic, List, TypeVar
T = TypeVar("T")
[documentos]class Bucket(Generic[T]):
def __init__(self, size: int) -> None:
self.size = size
# fixme: Criar uma interface comum para as *Message
# para substituir esse Any
self._items: List[T] = []
[documentos] def put(self, item: T):
if self.is_full():
error_msg = f"Bucket is at full capacity: {self.size}"
raise BucketFullException(error_msg)
self._items.append(item)
@property
def used(self) -> int:
return len(self._items)