Skip to content

cattle_grid.account.router

cattle_grid.account.router

The exchanges used by cattle_grid rely on routing keys to make processing easier. The cattle_grid gateway takes these messages and readdresses them with routing keys based on a user. Here a user can have multiple actors.

Furthermore, convenience methods are provided to manage users and actors through an HTTP API. This is in contrast to interacting with the Fediverse, which is done through a message queue.

create_gateway_router

create_gateway_router() -> RabbitRouter

Creates a router that moves messages to be routed by user.

Parameters:

Name Type Description Default
config
required

Returns:

Type Description
RabbitRouter
Source code in cattle_grid/account/router/__init__.py
def create_gateway_router() -> RabbitRouter:
    """Creates a router that moves messages to be routed by user.

    A new ``RabbitRouter`` is instantiated and the router produced by
    ``create_router()`` is included in it. The function takes no arguments.

    :returns: the ``RabbitRouter`` with the result of ``create_router()``
        included
    """
    router = RabbitRouter()
    router.include_router(create_router())
    return router

annotations

AccountFromRoutingKey module-attribute

AccountFromRoutingKey = Annotated[Account, Depends(account)]

Returns the account from the routing key

AccountName module-attribute

Assigns the account name extracted from the routing key

ActorForAccountFromMessage module-attribute

ActorForAccountFromMessage = Annotated[
    ActorForAccount, Depends(actor_for_account_from_account)
]

The actor provided in the send message

ActorFromMessage module-attribute

ActorFromMessage = Annotated[Actor, Depends(actor)]

The actor provided in the send message

MethodFromRoutingKey module-attribute

MethodFromRoutingKey = Annotated[
    str, Depends(method_from_routing_key)
]

Returns the method of a trigger message

RoutingKey module-attribute

RoutingKey = Annotated[
    str, Context("message.raw_message.routing_key")
]

The AMQP routing key

method_from_routing_key

method_from_routing_key(
    name: AccountName, routing_key: RoutingKey
) -> str

Extracts the method from the routing key

>>> method_from_routing_key("alice", "send.alice.trigger.method.first")
'method.first'
Source code in cattle_grid/account/router/annotations.py
def method_from_routing_key(
    name: AccountName,
    routing_key: RoutingKey,
) -> str:
    """
    Returns the method part of a trigger routing key, i.e. everything
    that follows ``send.{name}.trigger.``.

    ```pycon
    >>> method_from_routing_key("alice", "send.alice.trigger.method.first")
    'method.first'

    ```

    :param name: account name expected inside the routing key
    :param routing_key: the routing key of the message
    :returns: the remainder of the routing key after the trigger prefix
    :raises ValueError: if the routing key does not start with
        ``send.{name}.trigger.``
    """
    prefix = f"send.{name}.trigger."

    # Guard clause: anything not addressed as a trigger of this account is rejected.
    if not routing_key.startswith(prefix):
        raise ValueError("Invalid routing key for trigger")

    return routing_key[len(prefix) :]

name_from_routing_key

name_from_routing_key(routing_key: RoutingKey) -> str
>>> name_from_routing_key("receiving.alice")
'alice'

>>> name_from_routing_key("receiving.alice.action.fetch")
'alice'
Source code in cattle_grid/account/router/annotations.py
def name_from_routing_key(
    routing_key: RoutingKey,
) -> str:
    """
    Returns the second dot-separated segment of the routing key.

    ```pycon
    >>> name_from_routing_key("receiving.alice")
    'alice'

    >>> name_from_routing_key("receiving.alice.action.fetch")
    'alice'

    ```

    :param routing_key: the routing key of the message
    :returns: the segment between the first and the second dot (or the
        end of the key)
    :raises IndexError: if the routing key contains no dot
    """
    # Only the first two separators matter; the tail stays unsplit.
    segments = routing_key.split(".", 2)
    return segments[1]

router

create_actor_handler async

create_actor_handler(
    create_message: CreateActorRequest,
    correlation_id: CorrelationId,
    account: AccountFromRoutingKey,
    broker: RabbitBroker = Context(),
) -> None

Creates an actor associated with the account.

Updating and deleting actors is done through trigger events.

Source code in cattle_grid/account/router/router.py
async def create_actor_handler(
    create_message: CreateActorRequest,
    correlation_id: CorrelationId,
    account: AccountFromRoutingKey,
    broker: RabbitBroker = Context(),
) -> None:
    """Creates an actor associated with the account.

    Steps performed:

    1. check via ``can_create_actor_at_base_url`` that the account may
       create actors at the requested base url,
    2. create the actor and an ``ActorForAccount`` entry linking it to
       the account,
    3. optionally enable automatic acceptance of followers,
    4. publish the actor object with routing key
       ``receive.{account.name}.response.create_actor`` on the account
       exchange, reusing the incoming correlation id.

    Updating and deleting actors is done through trigger events.

    :param create_message: the request describing the actor to create
    :param correlation_id: correlation id attached to the response
    :param account: the account the actor is created for
    :param broker: broker used to publish the response
    :raises ValueError: if the base url is not allowed for the account
    """

    allowed = await can_create_actor_at_base_url(account, create_message.base_url)
    if not allowed:
        raise ValueError(f"Base URL {create_message.base_url} not in allowed base urls")

    new_actor = await create_actor(
        create_message.base_url,
        preferred_username=create_message.preferred_username,
        profile=create_message.profile,
    )

    # Fall back to a fixed label when the request carries no name.
    display_name = create_message.name or "from drive"
    await ActorForAccount.create(
        account=account,
        actor=new_actor.actor_id,
        name=display_name,
    )

    if create_message.automatically_accept_followers:
        new_actor.automatically_accept_followers = True
        await new_actor.save()

    response = actor_to_object(new_actor)

    logger.info("Created actor %s for %s", new_actor.actor_id, account.name)

    response_routing_key = f"receive.{account.name}.response.create_actor"
    await broker.publish(
        response,
        routing_key=response_routing_key,
        exchange=global_container.account_exchange,
        correlation_id=correlation_id,
    )

handle_fetch async

handle_fetch(
    msg: FetchMessage,
    correlation_id: CorrelationId,
    name: AccountName,
    actor: ActorFromMessage,
    broker: RabbitBroker = Context(),
)

Used to retrieve an object

Source code in cattle_grid/account/router/router.py
async def handle_fetch(
    msg: FetchMessage,
    correlation_id: CorrelationId,
    name: AccountName,
    actor: ActorFromMessage,
    broker: RabbitBroker = Context(),
):
    """Used to retrieve an object

    The request is forwarded as an RPC call with routing key
    ``fetch_object`` on the internal exchange, with a time limit of
    0.5 seconds. A timeout, an empty byte string, or any other falsy
    answer counts as a failure.

    On success a ``FetchResponse`` is published with routing key
    ``receive.{name}.response.fetch``; on failure an error message is
    published with routing key ``error.{name}``. Both go to the account
    exchange and carry the incoming correlation id.

    :param msg: the fetch request; its ``uri`` is the object to retrieve
    :param correlation_id: correlation id attached to the answer
    :param name: name of the account the answer is addressed to
    :param actor: the actor on whose behalf the object is fetched
    :param broker: broker used for the RPC call and the answer
    """
    try:
        async with asyncio.timeout(0.5):
            fetched = await broker.publish(
                {"actor": actor.actor_id, "uri": msg.uri},
                routing_key="fetch_object",
                exchange=global_container.internal_exchange,
                rpc=True,
            )
        # An empty payload is treated the same as no answer at all.
        if fetched == b"":
            fetched = None
        logger.info("GOT result %s", fetched)
    except TimeoutError as e:
        logger.error("Request ran into timeout %s", e)
        fetched = None

    if not fetched:
        # Failure path: report the problem on the account's error key.
        await broker.publish(
            {
                "message": "Could not fetch object",
            },
            routing_key=f"error.{name}",
            exchange=global_container.account_exchange,
            correlation_id=correlation_id,
        )
        return

    response = FetchResponse(
        uri=msg.uri,
        actor=actor.actor_id,
        data=fetched,
    )
    await broker.publish(
        response,
        routing_key=f"receive.{name}.response.fetch",
        exchange=global_container.account_exchange,
        correlation_id=correlation_id,
    )

schema

get_async_api_schema

get_async_api_schema() -> Schema

Returns the async api schema for cattle_grid Account processing

Source code in cattle_grid/account/router/schema.py
def get_async_api_schema() -> Schema:
    """Returns the async api schema for cattle_grid Account processing

    The schema is obtained by passing the app built by
    ``get_mock_faststream_app()`` to faststream's ``get_app_schema``.

    :returns: the schema produced by ``get_app_schema``
    """
    # Imported inside the function, as in the original implementation.
    from faststream.asyncapi import get_app_schema

    return get_app_schema(get_mock_faststream_app())

get_mock_faststream_app

get_mock_faststream_app() -> FastStream

Creates a mock faststream app for Account processing

Source code in cattle_grid/account/router/schema.py
def get_mock_faststream_app() -> FastStream:
    """Creates a mock faststream app for Account processing

    A ``RabbitBroker`` is created, the router from ``create_router()``
    is included, and two publishers (``incoming`` and ``outgoing``) with
    ``EventInformation`` as schema are declared on it. The broker is then
    wrapped in a ``FastStream`` app carrying title, version and description.

    :returns: the ``FastStream`` app wrapping the configured broker
    """

    from faststream.rabbit import RabbitBroker

    broker = RabbitBroker()
    broker.include_router(create_router())

    # (queue, title, description) of the publishers declared on the broker;
    # their return values are unused — presumably they are declared only so
    # that they show up in the generated schema.
    documented_publishers = (
        (
            "incoming",
            "receive.NAME.incoming",
            "Incoming messages from the Fediverse",
        ),
        (
            "outgoing",
            "receive.NAME.outgoing",
            "Messages being sent towards the Fediverse",
        ),
    )
    for queue, title, description in documented_publishers:
        broker.publisher(
            queue,
            title=title,
            schema=EventInformation,
            description=description,
        )

    app = FastStream(
        broker,
        title="cattle_grid Cattle Drive Implementation",
        version=__version__,
        description="Illustrates how cattle grid processes ActivityPub",
    )
    return app

trigger

handle_trigger async

handle_trigger(
    msg: TriggerMessage,
    actor: ActorForAccountFromMessage,
    correlation_id: CorrelationId,
    method: MethodFromRoutingKey,
    broker: RabbitBroker = Context(),
)

Used to trigger a method performed by the actor.

The main thing an actor can do is send activities to the Fediverse. This can be done with send_message. This can be extended in cattle_grid through extensions.

However the methods to update the actor profile and delete the actor are also called via a trigger.

Source code in cattle_grid/account/router/trigger.py
async def handle_trigger(
    msg: TriggerMessage,
    actor: ActorForAccountFromMessage,
    correlation_id: CorrelationId,
    method: MethodFromRoutingKey,
    broker: RabbitBroker = Context(),
):
    """Used to trigger a method performed by the actor.

    The main thing an actor can do is send activities to
    the Fediverse. This can be done with `send_message`.
    This can be extended in cattle_grid through extensions.

    However, the methods to update the actor profile and delete
    the actor are also called via a trigger.

    Before the message is republished, the ``rewrite`` entry of the
    configuration is consulted: for the first of the actor's groups
    that has a rule containing ``method``, the method is replaced by
    the value of that rule. The message is then published with the
    resulting method as routing key, keeping the correlation id.

    :param msg: the trigger message, republished unchanged
    :param actor: the actor performing the method
    :param correlation_id: correlation id passed on with the message
    :param method: the method taken from the routing key
    :param broker: broker used to republish the message
    """

    group_names = await group_names_for_actor(actor)
    rewrite_rules = global_container.config.get("rewrite")  # type: ignore

    # Routing key the message ends up with; stays `method` unless rewritten.
    target = method

    if rewrite_rules:
        for group_name in group_names:
            if group_name not in rewrite_rules:
                continue

            rule = rewrite_rules[group_name]
            if method not in rule:
                continue

            # First matching group wins.
            target = rule[method]
            break

    await broker.publish(
        msg,
        routing_key=target,
        exchange=global_container.exchange,
        correlation_id=correlation_id,
    )