Skip to content

cattle_grid.account.processing

cattle_grid.account.processing

The exchanges used by cattle_grid use routing keys to make processing easier. The cattle_grid gateway takes these messages and readdresses them with routing keys based on a user. Here a user can have multiple actors.

Furthermore, convenience methods are provided to manage users and actors through an HTTP API. This is in contrast to interacting with the Fediverse, which is done through a message queue.

create_gateway_router

create_gateway_router() -> RabbitRouter

Creates a router that moves messages to be routed by user.

Source code in cattle_grid/account/processing/__init__.py
def create_gateway_router() -> RabbitRouter:
    """Build the router that moves messages so they are routed by user.

    Returns:
        The router produced by ``create_router()``.
    """
    gateway_router = create_router()
    return gateway_router

annotations

AccountFromRoutingKey module-attribute

# Annotated alias for handler parameters: an Account supplied by the
# `account` dependency.
AccountFromRoutingKey = Annotated[Account, Depends(account)]

Returns the account from the routing key

ActorForAccountFromMessage module-attribute

# Annotated alias for handler parameters: an ActorForAccount supplied by the
# `actor_for_account_from_account` dependency.
ActorForAccountFromMessage = Annotated[
    ActorForAccount, Depends(actor_for_account_from_account)
]

The actor provided in the send message

ActorFromMessage module-attribute

# Annotated alias for handler parameters: an Actor supplied by the
# `actor` dependency.
ActorFromMessage = Annotated[Actor, Depends(actor)]

The actor provided in the send message

MethodFromRoutingKey module-attribute

# Annotated alias for handler parameters: the trigger method as a str,
# supplied by the `method_from_routing_key` dependency.
MethodFromRoutingKey = Annotated[
    str, Depends(method_from_routing_key)
]

Returns the method of a trigger message

ResponderClass dataclass

ResponderClass(name: typing.Annotated[str, Dependant(name_from_routing_key)], publisher: Annotated[Callable, Dependant(AccountExchangePublisherClass)], reply_to: str | None = Context(required=True, cast=False))

Parameters:

Name Type Description Default
name str
required
publisher Callable
required
reply_to str | None
Context(required=True, cast=False)
Source code in cattle_grid/account/processing/annotations.py
@dataclass
class ResponderClass:
    """Delivers the outcome of a request back to the requesting account.

    If the incoming message carries a ``reply_to`` value, the methods hand
    the payload back to the caller instead of publishing it (presumably so
    the messaging framework can send it as a direct reply -- TODO confirm).
    Otherwise the payload is published through ``publisher`` with a routing
    key built from the account name.
    """

    # Account name; interpolated into the routing keys below.
    name: AccountName
    # Awaited as ``publisher(payload, routing_key=...)``.
    publisher: AccountExchangePublisher
    # Populated from the "message.reply_to" context value.
    reply_to: str | None = Context("message.reply_to")

    async def respond(self, method: str, response):
        """Return or publish a successful response.

        Args:
            method: Name of the method being answered; becomes the last
                part of the routing key.
            response: Payload to hand back.

        Returns:
            ``response`` when ``reply_to`` is set, otherwise ``None`` after
            publishing to ``receive.<name>.response.<method>``.
        """
        if not self.reply_to:
            await self.publisher(
                response,
                routing_key=f"receive.{self.name}.response.{method}",
            )
            return None
        return response

    async def error(self):
        """Return or publish a generic error.

        Returns:
            An empty dict when ``reply_to`` is set, otherwise ``None`` after
            publishing an error payload to ``error.<name>``.
        """
        if not self.reply_to:
            await self.publisher(
                {"error": "Something went wrong"},
                routing_key=f"error.{self.name}",
            )
            return None
        return {}

method_from_routing_key

method_from_routing_key(
    name: AccountName, routing_key: RoutingKey
) -> str

Extracts the method from the routing key

>>> method_from_routing_key("alice", "send.alice.trigger.method.first")
'method.first'
Source code in cattle_grid/account/processing/annotations.py
def method_from_routing_key(
    name: AccountName,
    routing_key: RoutingKey,
) -> str:
    """
    Return the method part of a trigger routing key.

    The routing key must start with ``send.<name>.trigger.``; everything
    after that prefix is returned unchanged.

    ```pycon
    >>> method_from_routing_key("alice", "send.alice.trigger.method.first")
    'method.first'

    ```

    Raises:
        ValueError: If the routing key does not start with the expected
            prefix for this account name.
    """
    expected_prefix = f"send.{name}.trigger."
    if not routing_key.startswith(expected_prefix):
        raise ValueError("Invalid routing key for trigger")
    # The prefix is known to be present, so slicing it off is safe.
    return routing_key[len(expected_prefix):]

info

cattle_drive_version

cattle_drive_version()

Gives the current cattle drive version

>>> print(cattle_drive_version().model_dump_json(indent=2))
{
  "name": "CattleDrive",
  "version": "0.1.1"
}
Source code in cattle_grid/account/processing/info.py
def cattle_drive_version():
    """
    Report the name and version of the current cattle drive.

    ```python
    >>> print(cattle_drive_version().model_dump_json(indent=2))
    {
      "name": "CattleDrive",
      "version": "0.1.1"
    }

    ```

    Returns:
        A ``NameAndVersion`` with name ``"CattleDrive"`` and version
        ``"0.1.1"``.
    """
    version_info = NameAndVersion(name="CattleDrive", version="0.1.1")
    return version_info

router

create_actor_handler async

create_actor_handler(
    create_message: CreateActorRequest,
    account: AccountFromRoutingKey,
    session: CommittingSession,
    responder: Responder,
) -> None

Creates an actor associated with the account.

Updating and deleting actors is done through trigger events.

Source code in cattle_grid/account/processing/router.py
async def create_actor_handler(
    create_message: CreateActorRequest,
    account: AccountFromRoutingKey,
    session: CommittingSession,
    responder: Responder,
) -> None:
    """Creates an actor associated with the account.

    Updating and deleting actors is done through trigger events.

    Args:
        create_message: Request with the base URL, preferred username,
            profile, optional display name and the follower auto-accept flag.
        account: Account the new actor is attached to.
        session: Database session used to create and link the actor.
        responder: Used to hand the created actor back to the requester.

    Raises:
        ValueError: If the account may not create actors at the requested
            base URL.
    """
    # NOTE(review): the annotation says None, but the value of
    # responder.respond(...) is returned below -- confirm which is intended.

    base_url_allowed = await can_create_actor_at_base_url(
        session, account, create_message.base_url
    )
    if not base_url_allowed:
        raise ValueError(f"Base URL {create_message.base_url} not in allowed base urls")

    new_actor = await create_actor(
        session,
        create_message.base_url,
        preferred_username=create_message.preferred_username,
        profile=create_message.profile,
    )

    # Link the actor to the account; the name falls back to "from drive".
    account_link = ActorForAccount(
        account=account,
        actor=new_actor.actor_id,
        name=create_message.name or "from drive",
    )
    session.add(account_link)

    if create_message.automatically_accept_followers:
        new_actor.automatically_accept_followers = True

    actor_object = actor_to_object(new_actor)

    await session.refresh(account)

    logger.info("Created actor %s for %s", new_actor.actor_id, account.name)

    return await responder.respond("create_actor", actor_object)

handle_fetch async

handle_fetch(
    msg: FetchMessage,
    actor: ActorFromMessage,
    internal_requester: InternalExchangeRequester,
    responder: Responder,
)

Used to retrieve an object

Source code in cattle_grid/account/processing/router.py
async def handle_fetch(
    msg: FetchMessage,
    actor: ActorFromMessage,
    internal_requester: InternalExchangeRequester,
    responder: Responder,
):
    """Used to retrieve an object

    The object at ``msg.uri`` is requested on behalf of ``actor`` via the
    ``fetch_object`` routing key. If the request times out or yields a
    falsy result, the responder's error path is used; otherwise a
    ``FetchResponse`` is sent under the ``fetch`` method.
    """

    fetch_request = {"actor": actor.actor_id, "uri": msg.uri}

    try:
        # Wait at most 0.5 seconds for the internal request.
        async with asyncio.timeout(0.5):
            fetched = await internal_requester(
                fetch_request,
                routing_key="fetch_object",
            )
        logger.info("GOT result %s", fetched)
    except TimeoutError as timeout_error:
        logger.error("Request ran into timeout %s", timeout_error)
        fetched = None

    if fetched:
        return await responder.respond(
            "fetch",
            FetchResponse(
                uri=msg.uri,
                actor=actor.actor_id,
                data=fetched,
            ),
        )

    # A timeout or an empty result is reported as an error.
    return await responder.error()

schema

get_async_api_schema

get_async_api_schema() -> AsyncAPI

Returns the async api schema for cattle_grid Account processing

Source code in cattle_grid/account/processing/schema.py
def get_async_api_schema() -> AsyncAPI:
    """Returns the async api schema for cattle_grid Account processing

    A throwaway ``RabbitBroker`` is populated with the router from
    ``create_router()`` and two extra publishers, then wrapped in an
    ``AsyncAPI`` object.
    """

    from faststream.rabbit import RabbitBroker

    schema_broker = RabbitBroker()
    schema_broker.include_router(create_router())

    # Publishers registered here in addition to those of the router.
    extra_publishers = (
        (
            "incoming",
            "receive.NAME.incoming",
            "Incoming messages from the Fediverse",
        ),
        (
            "outgoing",
            "receive.NAME.outgoing",
            "Messages being sent towards the Fediverse",
        ),
    )
    for queue_name, title, description in extra_publishers:
        schema_broker.publisher(
            queue_name,
            title=title,
            schema=EventInformation,
            description=description,
        )

    return AsyncAPI(
        schema_broker,
        title="cattle_grid Cattle Drive Implementation",
        version=__version__,
        description="Illustrates how cattle grid processes ActivityPub",
    )

trigger

handle_trigger async

handle_trigger(
    msg: TriggerMessage,
    actor: ActorForAccountFromMessage,
    method: MethodFromRoutingKey,
    session: SqlSession,
    rewrite_rules: RewriteRules,
    publisher: ActivityExchangePublisher,
)

Used to trigger a method performed by the actor.

The main thing an actor can do is send activities to the Fediverse. This can be done with send_message. This can be extended in cattle_grid through extensions.

However the methods to update the actor profile and delete the actor are also called via a trigger.

Source code in cattle_grid/account/processing/trigger.py
async def handle_trigger(
    msg: TriggerMessage,
    actor: ActorForAccountFromMessage,
    method: MethodFromRoutingKey,
    session: SqlSession,
    rewrite_rules: RewriteRules,
    publisher: ActivityExchangePublisher,
):
    """Used to trigger a method performed by the actor.

    The main thing an actor can do is send activities to
    the Fediverse. This can be done with `send_message`.
    This can be extended in cattle_grid through extensions.

    However the methods to update the actor profile and delete
    the actor are also called via a trigger.

    The message is published with the method as routing key. When
    rewrite rules are present, the method is first rewritten based on
    the group names of the actor.
    """

    target_routing_key = method

    if rewrite_rules:
        actor_groups = await group_names_for_actor(session, actor)
        target_routing_key = rewrite_rules.rewrite(method, actor_groups)

    await publisher(msg, routing_key=target_routing_key)