Skip to content

cattle_grid.tools

cattle_grid.tools

rabbitmq

enqueue_to_routing_key_and_connection async

enqueue_to_routing_key_and_connection(
    connection: RobustConnection,
    asyncio_queue: Queue,
    routing_key: str,
    name: str = "cattle-grid",
    exchange_name: str = "amq.topic",
)

Subscribes to the routing key and adds each received message to asyncio_queue as a string

Source code in cattle_grid/tools/rabbitmq.py
async def enqueue_to_routing_key_and_connection(
    connection: aio_pika.RobustConnection,
    asyncio_queue: asyncio.Queue,
    routing_key: str,
    name: str = "cattle-grid",
    exchange_name: str = "amq.topic",
):
    """Subscribe to `routing_key` and forward every received message
    to `asyncio_queue` as a string.

    The broker queue is obtained from `queue_for_connection`, to which
    `connection`, `routing_key`, `name` and `exchange_name` are passed
    through unchanged. Each message body is converted with
    `body.decode()` and then put on `asyncio_queue` while the message
    is inside its `process()` context.

    An `Exception` raised while consuming is logged with
    `logger.exception` and not re-raised, so the coroutine then
    returns after leaving both context managers.
    """

    subscription = queue_for_connection(
        connection,
        routing_key=routing_key,
        name=name,
        exchange_name=exchange_name,
    )

    async with subscription as queue, queue.iterator() as messages:
        try:
            async for incoming in messages:
                # The put happens inside `process()`, so a failure to
                # decode or enqueue surfaces within that context.
                async with incoming.process():
                    await asyncio_queue.put(incoming.body.decode())
        except Exception as error:
            logger.exception(error)

server_sent_events

ServerSentEventFromQueueAndTask

ServerSentEventFromQueueAndTask(
    request: Request,
    queue: Queue,
    task: Task,
    timeout: float = 50,
)

The task is supposed to add strings to queue via await queue.put(some_string).

from fastapi import Request

@fastapi.get("/stream")
async def sample_stream(request: Request):
    queue = asyncio.Queue()
    task = asyncio.create_task(method_to_add_elements_to_queue(queue))

    return ServerSentEventFromQueueAndTask(request, queue, task)
Source code in cattle_grid/tools/server_sent_events.py
def ServerSentEventFromQueueAndTask(
    request: Request,
    queue: asyncio.Queue,
    task: asyncio.Task,
    timeout: float = 50,
):
    """Create a server sent event response from a queue and the task
    filling it.

    The task is supposed to add strings to queue via
    `await queue.put(some_string)`.

    `queue`, `task` and `timeout` are handed to
    `async_iterator_for_queue_and_task`; the resulting async iterator
    and `request` are then passed to `ServerSentEventFromAsyncIterator`,
    whose result is returned.

    The endpoint in the example is declared with `async def` because
    `asyncio.create_task` requires a running event loop.

    ```python
    from fastapi import Request

    @fastapi.get("/stream")
    async def sample_stream(request: Request):
        queue = asyncio.Queue()
        task = asyncio.create_task(method_to_add_elements_to_queue(queue))

        return ServerSentEventFromQueueAndTask(request, queue, task)
    ```
    """

    return ServerSentEventFromAsyncIterator(
        request,
        async_iterator_for_queue_and_task(queue, task, timeout=timeout),
    )