Skip to content

cattle_grid.account.processing

cattle_grid.account.processing

The exchanges used by cattle_grid are using routing keys to make processing easier. On the cattle_ground account router messages are also addressed by account. An account consists of a grouping of multiple actors.

create_account_router

create_account_router() -> RabbitRouter

Creates a router that moves messages to be routed by user.

Source code in cattle_grid/account/processing/__init__.py
def create_account_router() -> RabbitRouter:
    """Creates a router that moves messages to be routed by user."""
    return create_router()

annotations

AccountFromRoutingKey module-attribute

AccountFromRoutingKey = Annotated[Account, Depends(account)]

Returns the account from the routing key

ActorForAccountFromMessage module-attribute

ActorForAccountFromMessage = Annotated[
    ActorForAccount, Depends(actor_for_account_from_account)
]

The actor provided in the send message

ActorFromMessage module-attribute

ActorFromMessage = Annotated[Actor, Depends(actor)]

The actor provided in the send message

MethodFromRoutingKey module-attribute

MethodFromRoutingKey = Annotated[
    str, Depends(method_from_routing_key)
]

Returns the method of a trigger message

ResponderClass dataclass

ResponderClass(name: typing.Annotated[str, Dependant(name_from_routing_key)], publisher: Annotated[Callable, Dependant(AccountExchangePublisherClass)], reply_to: str | None = Context(required=True, cast=False))

Parameters:

Name Type Description Default
name str
required
publisher Callable
required
reply_to str | None
Context(required=True, cast=False)
Source code in cattle_grid/account/processing/annotations.py
@dataclass
class ResponderClass:
    name: AccountName
    publisher: AccountExchangePublisher
    reply_to: str | None = Context("message.reply_to")

    async def respond(self, method: str, response):
        if self.reply_to:
            return response
        await self.publisher(
            response,
            routing_key=f"receive.{self.name}.response.{method}",
        )

    async def error(self):
        if self.reply_to:
            return {}
        await self.publisher(
            {"error": "Something went wrong"},
            routing_key=f"error.{self.name}",
        )

method_from_routing_key

method_from_routing_key(
    name: AccountName, routing_key: RoutingKey
) -> str

Extracts the method from the routing key

>>> method_from_routing_key("alice", "send.alice.trigger.method.first")
'method.first'
Source code in cattle_grid/account/processing/annotations.py
def method_from_routing_key(
    name: AccountName,
    routing_key: RoutingKey,
) -> str:
    """
    Extracts the method from the routing key

    ```pycon
    >>> method_from_routing_key("alice", "send.alice.trigger.method.first")
    'method.first'

    ```
    """
    start_string = f"send.{name}.trigger."
    if routing_key.startswith(start_string):
        return routing_key.removeprefix(start_string)
    else:
        raise ValueError("Invalid routing key for trigger")

history

Handles creating the history for the actor

info

cattle_drive_version

cattle_drive_version()

Gives the current cattle drive version

>>> print(cattle_drive_version().model_dump_json(indent=2))
{
  "name": "CattleDrive",
  "version": "0.1.1"
}
Source code in cattle_grid/account/processing/info.py
def cattle_drive_version():
    """
    Gives the current cattle drive version

    ```python
    >>> print(cattle_drive_version().model_dump_json(indent=2))
    {
      "name": "CattleDrive",
      "version": "0.1.1"
    }

    ```
    """
    return NameAndVersion(name="CattleDrive", version="0.1.1")

router

create_actor_handler async

create_actor_handler(
    message: CreateActorRequest,
    account: AccountFromRoutingKey,
    session: CommittingSession,
    responder: Responder,
)

Creates an actor associated with the account.

Updating and deleting actors is done through trigger events.

Source code in cattle_grid/account/processing/router.py
async def create_actor_handler(
    message: CreateActorRequest,
    account: AccountFromRoutingKey,
    session: CommittingSession,
    responder: Responder,
):
    """Creates an actor associated with the account.

    Updating and deleting actors is done through trigger events."""

    if not await can_create_actor_at_base_url(session, account, message.base_url):
        raise ValueError(f"Base URL {message.base_url} not in allowed base urls")

    actor = await create_actor(
        session,
        message.base_url,
        preferred_username=message.preferred_username,
        profile=message.profile,
    )

    session.add(
        ActorForAccount(
            account=account,
            actor=actor.actor_id,
            name=message.name or "from drive",
        )
    )

    if message.automatically_accept_followers:
        actor.automatically_accept_followers = True

    result = actor_to_object(actor)

    await session.refresh(account)

    logger.info("Created actor %s for %s", actor.actor_id, account.name)

    return await responder.respond("create_actor", result)

handle_fetch async

handle_fetch(
    msg: FetchMessage,
    actor: ActorFromMessage,
    internal_requester: InternalExchangeRequester,
    responder: Responder,
)

Used to retrieve an object

Source code in cattle_grid/account/processing/router.py
async def handle_fetch(
    msg: FetchMessage,
    actor: ActorFromMessage,
    internal_requester: InternalExchangeRequester,
    responder: Responder,
):
    """Used to retrieve an object"""

    try:
        async with asyncio.timeout(0.5):
            result = await internal_requester(
                {"actor": actor.actor_id, "uri": msg.uri},
                routing_key="fetch_object",
            )
        logger.info("GOT result %s", result)
    except TimeoutError as e:
        logger.error("Request ran into timeout %s", e)
        result = None

    if not result:
        return await responder.error()

    response = FetchResponse(
        uri=msg.uri,
        actor=actor.actor_id,
        data=result,
    )
    return await responder.respond("fetch", response)

schema

get_async_api_schema

get_async_api_schema() -> AsyncAPI

Returns the async api schema for cattle_grid Account processing

Source code in cattle_grid/account/processing/schema.py
def get_async_api_schema() -> AsyncAPI:
    """Returns the async api schema for cattle_grid Account processing"""

    from faststream.rabbit import RabbitBroker

    broker = RabbitBroker()
    broker.include_router(create_router(for_async_api=True))

    broker.publisher(
        "incoming",
        title="receive.NAME.incoming.ACTIVITY_TYPE",
        schema=EventInformation,
        description="""Incoming messages from the Fediverse""",
    )

    broker.publisher(
        "outgoing",
        title="receive.NAME.outgoing.ACTIVITY_TYPE",
        schema=EventInformation,
        description="""Messages being sent towards the Fediverse""",
    )

    return AsyncAPI(
        broker,
        title="cattle_grid Cattle Drive Implementation",
        version=__version__,
        description="Illustrates how cattle grid processes ActivityPub",
    )

testing

account_for_test async

account_for_test(sql_session) -> Account

Fixture to create an account

Source code in cattle_grid/testing/fixtures.py
@pytest.fixture
async def account_for_test(sql_session) -> Account:
    """Fixture to create an account"""
    result = await create_account(sql_session, "alice", "alice", permissions=["admin"])
    assert result
    return result

actor_for_test async

actor_for_test(sql_session) -> Actor

Fixture to create an actor

Source code in cattle_grid/testing/fixtures.py
@pytest.fixture
async def actor_for_test(sql_session) -> Actor:
    """Fixture to create an actor"""
    actor = await create_actor(sql_session, "http://localhost/ap")

    return actor

actor_with_account async

actor_with_account(sql_session, account_for_test) -> Actor

Fixture to create an actor with an account

Source code in cattle_grid/testing/fixtures.py
@pytest.fixture
async def actor_with_account(sql_session, account_for_test) -> Actor:
    """Fixture to create an actor with an account"""
    actor = await create_actor(
        sql_session, "http://localhost/ap", preferred_username="test_actor"
    )
    await add_actor_to_account(
        sql_session, account_for_test, actor, name="test_fixture"
    )

    await sql_session.refresh(actor)

    return actor

loaded_config

loaded_config()

Ensures the configuration variables are loaded

Source code in cattle_grid/testing/fixtures.py
@pytest.fixture(autouse=True, scope="session")
def loaded_config():
    """Ensures the configuration variables are loaded"""
    global_container.load_config()

sql_engine_for_tests async

sql_engine_for_tests()

Provides the sql engine (as in memory sqlite) for tests

This fixture has autouse=True, meaning that by importing

from cattle_grid.testing.fixtures import sql_engine_for_tests

it will run automatically. The engine is initialized in the place cattle_grid expects it.

Source code in cattle_grid/testing/fixtures.py
@pytest.fixture(autouse=True)
async def sql_engine_for_tests():
    """Provides the sql engine (as in memory sqlite) for tests

    This fixture has autouse=True, meaning that by importing

    ```python
    from cattle_grid.testing.fixtures import sql_engine_for_tests
    ```

    it will run automatically. The engine is initialized in the
    place cattle_grid expects it.
    """
    async with alchemy_database("sqlite+aiosqlite:///:memory:", echo=False) as engine:
        async with engine.begin() as conn:
            await conn.run_sync(APBase.metadata.create_all)

        yield engine

sql_session async

sql_session(session_maker_for_tests)

Returns an AsyncSession to be used by tests

Source code in cattle_grid/testing/fixtures.py
@pytest.fixture()
async def sql_session(session_maker_for_tests):
    """Returns an [AsyncSession][sqlalchemy.ext.asyncio.AsyncSession] to be used by tests"""
    async with session_maker_for_tests() as session:
        yield session

trigger

handle_trigger async

handle_trigger(
    msg: TriggerMessage,
    actor: ActorForAccountFromMessage,
    method: MethodFromRoutingKey,
    session: SqlSession,
    rewrite_rules: RewriteRules,
    publisher: ActivityExchangePublisher,
)

Used to trigger a method performed by the actor.

The main thing an actor can do is send activities to the Fediverse. This can be done with send_message. This can be extended in cattle_grid through extensions.

However the methods to update the actor profile and delete the actor are also called via a trigger.

Source code in cattle_grid/account/processing/trigger.py
async def handle_trigger(
    msg: TriggerMessage,
    actor: ActorForAccountFromMessage,
    method: MethodFromRoutingKey,
    session: SqlSession,
    rewrite_rules: RewriteRules,
    publisher: ActivityExchangePublisher,
):
    """Used to trigger a method performed by the actor.

    The main thing an actor can do is send activities to
    the Fediverse. This can be done with `send_message`.
    This can be extended in cattle_grid through extensions.

    However the methods to update the actor profile and delete
    the actor are also called via a trigger.
    """

    if rewrite_rules:
        group_names = await group_names_for_actor(session, actor)
        method = rewrite_rules.rewrite(method, group_names)

    await publisher(msg, routing_key=method)