almabtrieb.stream

Stream dataclass

Transforms an asyncio.Queue into an asynchronous iterator with two helper functions, next and clear.

Usage

queue = asyncio.Queue()
async for x in Stream(queue):
    await do_something_with(x)
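
The source listing for `__anext__` on this page shows that iteration ends only when `None` is taken from the queue, so a producer has to enqueue `None` to stop the loop. The following is a minimal sketch of that behaviour, not the library's own example. It assumes a local stand-in `Stream` class copied from the listing so that it runs without almabtrieb installed, and `collect_until_sentinel` is a hypothetical helper name.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Stream:
    # Local stand-in mirroring __aiter__/__anext__ from the source listing;
    # it is an assumption used to keep this example self-contained.
    queue: asyncio.Queue

    def __aiter__(self):
        return self

    async def __anext__(self):
        result = await self.queue.get()
        if result is None:
            # None acts as the end-of-stream sentinel
            raise StopAsyncIteration()
        return result


async def collect_until_sentinel():
    # Hypothetical helper: fill a queue, then iterate until the sentinel
    queue = asyncio.Queue()
    for item in ("a", "b", None, "c"):
        queue.put_nowait(item)

    received = []
    async for x in Stream(queue):
        received.append(x)
    return received


received = asyncio.run(collect_until_sentinel())
print(received)
```

In this sketch the item placed after `None` is never seen by the loop; it stays in the queue.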

Parameters:

Name   Type   Description  Default
queue  Queue               required
Source code in almabtrieb/stream.py
@dataclass
class Stream:
    """Transforms an asyncio.Queue into an asynchronous iterator with two helper functions

    Usage
    ```python
    queue = asyncio.Queue()
    async for x in Stream(queue):
        await do_something_with(x)
    ```
    """

    queue: asyncio.Queue

    def __aiter__(self):
        return self

    async def next(self, timeout: float = 1):
        """The next element, if the wait time is longer than timeout, an
        [almabtrieb.stream.StreamNoNewItemException][] is raised."""
        try:
            async with asyncio.timeout(timeout):
                return await self.queue.get()
        except asyncio.TimeoutError:
            raise StreamNoNewItemException("No new message in stream")

    async def __anext__(self):
        result = await self.queue.get()
        if result is None:
            raise StopAsyncIteration()

        return result

    async def clear(self):
        """Removes all items from the queue"""
        while not self.queue.empty():
            await self.queue.get()

clear async

clear()

Removes all items from the queue
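
As a rough illustration of the description above, the sketch below fills a queue, calls `clear`, and compares the queue size before and after. The `Stream` class here is a local stand-in containing only the `clear` method as shown in the source listing, and `demo_clear` is a hypothetical name.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Stream:
    # Local stand-in with only the clear method from the source listing
    queue: asyncio.Queue

    async def clear(self):
        """Removes all items from the queue"""
        while not self.queue.empty():
            await self.queue.get()


async def demo_clear():
    # Hypothetical demo: enqueue three items, then discard them all
    queue = asyncio.Queue()
    for item in range(3):
        queue.put_nowait(item)

    stream = Stream(queue)
    size_before = queue.qsize()
    await stream.clear()
    return size_before, queue.qsize()


size_before, size_after = asyncio.run(demo_clear())
print(size_before, size_after)
```

Note that `clear` as listed takes items with `get()` and does not call `task_done()`, so code that relies on `queue.join()` would likely need to account for the discarded items itself.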

next async

next(timeout: float = 1)

Returns the next element from the queue. If no element arrives within timeout seconds, an almabtrieb.stream.StreamNoNewItemException is raised.
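
The timeout behaviour can be sketched as follows. This is an illustration under stated assumptions, not the library's code: the `Stream` and `StreamNoNewItemException` classes are local stand-ins, the stand-in uses `asyncio.wait_for` in place of the `asyncio.timeout` call in the source listing so that it also runs on Python versions before 3.11, and `demo_next` is a hypothetical name.

```python
import asyncio
from dataclasses import dataclass


class StreamNoNewItemException(Exception):
    """Raised if no new item is present in stream"""


@dataclass
class Stream:
    # Local stand-in for the next method in the source listing
    queue: asyncio.Queue

    async def next(self, timeout: float = 1):
        try:
            # Assumption: wait_for replaces asyncio.timeout for portability
            return await asyncio.wait_for(self.queue.get(), timeout)
        except asyncio.TimeoutError:
            raise StreamNoNewItemException("No new message in stream")


async def demo_next():
    # Hypothetical demo: one item is available, the second call times out
    queue = asyncio.Queue()
    queue.put_nowait("hello")
    stream = Stream(queue)

    first = await stream.next(timeout=0.1)
    try:
        await stream.next(timeout=0.1)
        timed_out = False
    except StreamNoNewItemException:
        timed_out = True
    return first, timed_out


first, timed_out = asyncio.run(demo_next())
print(first, timed_out)
```

Unlike iteration with `async for`, `next` as listed does not treat `None` specially; it returns whatever `queue.get()` yields.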

StreamNoNewItemException

Bases: Exception

Raised by Stream.next if no new item arrives in the stream within the timeout.

Source code in almabtrieb/stream.py
class StreamNoNewItemException(Exception):
    """Raised if no new item is present in stream"""

    ...