Skip to content

Extensions

cattle_grid is not meant as one size fits it all Fediverse application, but something customizable.

Features

Extensions allow you to

  • Transform


    You got incoming data. Transformers allow you to process the data and either perform actions, or pass the result on to subsequent steps.

    The information generated by transformers is passed to all subsequent steps.

  • Process


    This allows you to perform an action on receiving an activity, or someone triggering a method.

  • Lookup


    Adjust how content is looked up. This can enable caching or allow you to lookup non https objects.

  • Serve Content


    Allows you to answer HTTP requests using methods provided by cattle_grid, e.g. check that the actor should access this object.

Examples

  • simple_storage provides the capability to use the methods publish_activity and publish_object to publish ActivityPub activities and objects to the Fediverse. This entails adding an id, storing them, and then sending to the appropriate recipients for an activity. For an object, it is wrapped with a Create activity before being send to publish_activity. This extension has the caveat of not providing a HTML version of the objects. This is achieved by.

  • html display provides a version of publish_object that generates HTML versions. See https://dev.bovine.social/@release for a demo.

You can find more sample extensions on the right under Example Extensions.

  • muck_out is a transformer type extension. It takes the raw objects coming from the Fediverse, and turns them into opinionated normalized versions of themselves. It also provides access methods for objects.

  • moo illustrates how to build a simple thin just relying on a single activity.

Using extensions

Installing extensions

One can install extensions via pypi, or read them as a file package, e.g.

pip install muck_out[cattle_grid]

to install muck_out. Then one needs to tell cattle_grid to load them. This is done via the configuration, e.g.

config/extensions.toml
[[extensions]]
module = "muck_out.extension"

in this case.

Configuring extensions

Extensions are loaded with the load_extension method. There are 4 values being looked at:

  • module_name specifies the module the extension is looked at
  • config is a dictionary that is passed to the extension as configuration parameters
  • lookup_order when the extension is used to lookup an object.
  • api_prefix if your extension defines API endpoints, this prefix will be used.

Example

config/extensions.toml
[[extensions]]

module_name = "your.extension"
config = { var = 1}

lookup_order = 2
api_prefix = "/your/api"

Writing an extension

The basic implementation will be

from cattle_grid.extensions import Extension

extension = Extension("some name", __name__)

...

By writing something as a cattle_grid extension, you can first through the lookup and transform method influence cattle_grid’s behavior to e.g.

  • serve archived activities (e.g. from a migrated account)
  • add information to activities, e.g. label them

Writing a configuration

The config object of an extension is a pydantic model, e.g.

from pydantic import BaseModel, Field

class MyExtensionConfiguration(BaseModel):
    key: str = Field(default="value")

One registers this call to an extension via

extension = Extension(
    name="my_extension",
    module=__name__,
    config_class=MyExtensionConfiguration
)

Then one can use it in a function via injection

@extension.subscribe("incoming.*")
async def print_config_value(config: extension.Config):
    print(config.key)

When configured via

config/extensions.toml
[[extensions]]
module_name = "my.extension"
config = { key = "other" }

the extension should now print other whenever an incoming message is processed.

Serving content

By adding

@extension.get("/path/{parameter}")
async def serve_content(parameter):
    return {}

one can use a cattle_grid extension as one would use a FastAPI router. By using the dependency injection, one can access various object. (See FIXME) For example, to access the database, one would use

from cattle_grid.dependencies.fastapi import SqlSession

@extension.get("/path/{parameter}")
async def serve_content(parameter, session: SqlSession):
    await session.scalar(select(MyModel.parameter == parameter))

    return serialize_to_pydantic(MyModel)

Serving ActivityPub

Serving ActivityPub content isn’t as simple as just dropping a JSON somewhere. First, one needs to ensure that one sets the correct media type, we can do this with the ActivityResponese response class

from cattle_grid.tools.fastapi import ActivityResponse,

@extension.get("/activity_pub", response_class=ActivityResponse)
async def serve_something():
    return activity_pub_object

This is nice, but misses the authorization aspects. To do this, we need two additional helpers. is_valid_requester_for_obj allows checking if the requester should have access to the object. ActivityPubHeaders contain the authentication information gathered by cattle_grid.auth.

With these components, we are ready to write the code

from cattle_grid.activity_pub import is_valid_requester_for_obj
from cattle_grid.tools.fastapi import ActivityPubHeaders, ActivityResponse

@extension.get("/activity_pub", response_class=ActivityResponse)
async def serve_something(    
    ap_headers: ActivityPubHeaders,
    session: SqlSession,
):
    if not ap_headers.x_cattle_grid_requester:
        raise HTTPException(401)

    if not await is_valid_requester_for_obj(
        session, ap_headers.x_cattle_grid_requester, activity_pub_object
    ):
        raise HTTPException(401)

    return activity_pub_object

For public objects, this ensures that the request has a valid signature, and is not being blocked by the owner of the object. For private objects, it is checked that the requester is in the recipients of the object.

Content Negotiation

Fediverse content often exists in two version: human readable HTML and ActivityPub JSON. It is often useful to be able to distinguish between these two. cattle_grid offers the cattle_grid.tools.fastapi.ShouldServe annotation to help with this. The next example shows how to redirect to the ActivityPub object.

from cattle_grid.tools.fastapi import ShouldServe, ContentType

@extension.get("/html")
async def serve_html(should_serve: ShouldServe):
    if (
        ContentType.html not in should_serve
        and ContentType.activity_pub in should_serve
    ):
        return RedirectResponse(activity_pub_id)

    return "<html>...</html>"

Testing

One can obtain a TestClient via

from fastapi.testclient import TestClient

@pytest.fixture
def test_client():
    app = FastAPI()
    app.include_router(extension.api_router)

    return TestClient(app)

then proceed to write tests as usual, e.g. test_client.get("/url").

Creating new methods

By defining a subscriber

@extension.subscribe("method_name")
async def my_subscriber(message: dict):
    ...

you can create a subscribtion to the ActivityExchange. The subscribtion should either be a method name defined by your extension or a subscribtion on a topic for processing incoming or outgoing messages, i.e. incoming.* or outgoing.*, see processing activities

Dependencies

In these methods one can use the dependencies provided by cattle_grid.dependencies and cattle_grid.dependencies.processing. For example to modify the database use

from cattle_grid.dependencies import CommittingSession

@extension.subsribe("method_name")
async def my_committing_subscriber(message: dict, session: CommittingSession):
    session.add(SqlAlchemyObject(message=message))

The session will be committed automatically. Similarly you can use

from cattle_grid.dependencies import ActivityExchangePublisher
from cattle_grid.dependencies.processing import FactoriesForActor
from cattle_grid.model import ActivityMessage

@extension.subsribe("method_name")
async def my_committing_subscriber(
    message: dict, 
    publisher: ActivityExchangePublisher, 
    factories: FactoriesForActor
):
    await publisher(
        ActivityMessage(
            actor=message.get("actor"),
            data=factories[0].like("http://server.example/object/id").build()
        ),
        routing_key="send_message",
    )

to publish a like activity. We note that the convention for the factories is

activity_factory, object_factory = factories

Method Information

Subscribers to a method are automatically added to method information. The description is either the docstring or can be specified by adding a description argument, i.e.

@extension.subscribe("method_name", description="my description")
async def my_subscriber(msg: dict):
    ...

# or

@extension.subscribe("method_name")
async def my_subscriber(msg: dict):
    """My description"""
    ...

The description passed as an argument takes precedence.

Testing

One can unit test a subscriber by just importing and calling it, e.g.

from . import my_subscriber

async def test_my_subscriber():
    await my_subscriber({"some": "data"})

If you wish to test using the TestRabbitBroker following the faststream guide, then one can use with_test_broker_for_extension. For this one can define the broker as a fixture

import pytest

from unittest.mock import AsyncMock
from cattle_grid.extensions.testing import with_test_broker_for_extension
from cattle_grid.testing.fixtures import * # type: ignore  # noqa: F403

from . import extension

@pytest.fixture
async def send_message_mock():
    yield AsyncMock()

@pytest.fixture
async def test_broker(send_message_mock):
    extension.configure({"var": "value"})

    async with with_test_broker_for_extension(
        [extension], {"send_message": send_message_mock}
    ) as tbr:
        yield tbr

and then write a test as

from cattle_grid.app import app_globals

async def test_message_send(test_broker, send_message_mock):
    await broker.publish(
        {"my": "message"}, 
        routing_key="my_routing_key", 
        exchange=app_globals.activity_exchange
    )

    send_message_mock.assert_awaited_once()

Processing Activities

By defining subscriptions to the routing keys incoming.* or outgoing.* one can process either all incoming and outgoing activities. By subscribing to a specific activity type, e.g. incoming.Create one can subscribe to only one type of activities.

from cattle_grid.model import ActivityMessage

@extension.subscribe("incoming.Create")
async def incoming_create(message: ActivityMessage):
    raw_data = message.data["raw"]
    ...

The message has passed through the transformer at this point. This means that the data attribute of the ActivityMessage has the form

{
    "raw": {"original": "activity"},
    "...": "..."
}

where the ... are provided by transformers. This means that with the muck_out extension and its cattle_grid magic, one can write:

from muck_out.cattle_grid import ParsedEmbeddedObject

@extension.subscribe("incoming.Create")
async def incoming_create(obj: ParsedEmbeddedObject):
    if not obj.in_reply_to:
        return
    handle_reply()

to write code that only runs in case the message is a reply.

Handling actor deletions

It might be that you want to run some clean up, when an actor deleted by cattle_grid is being deleted. This can be done with the following code snippet.

from cattle_grid.activity_pub.activity import actor_deletes_themselves

@extension.subscribe("outgoing.Delete")
async def outgoing_delete(message: ActivityMessage):
    if not actor_deletes_themselves(message.data.get("raw")):
        return

    # handle delete here

Transformers

Transformers are applied when shovelling activities from the internal to the activity and account exchanges.

flowchart LR
    I -->|transformer| A1
    I -->|transformer| A2

I["Internal Exchange"]
A1["Account Exchange"]
A2["Account Exchange"]

That means all activities on the incoming.* or outgoing.* routing keys are affected by the transformer. Similarly all activities for the receive.NAME.incoming and receive.NAME.outgoing in the Cattle Drive Protocol.

How transformers work

Transformer functions take the form

@extension.transform(inputs=["one", "two"], outputs=["three"])
async def transform(data: dict[str, dict]) -> dict[str, dict]:
    ...

where the inputs specify the keys that are assumed to exist on data, e.g.

data = {
    "one": { ... },
    "two": { ... }
}

and outputs specifies the keys that will be added

await transform(data) = {
    "three": { ... }
}

Doing nothing

The case of the transformer not doing anything can be handled as follows.

@extension.transform(inputs=["one", "two"], outputs=["three"])
async def transform(data: dict[str, dict]) -> dict[str, dict]:
    if transformer_does_nothing(data):
        return {"three": {}}

    ...

Testing transformers

To test a transformer, I recommend creating a pytest fixture that builds a transformer, e.g.

import pytest
from cattle_grid.extensions.load import build_transformer

from . import extension

@pytest.fixture
def transformer():
    return build_transformer([extension])

The reason for doing so is that it takes care of injecting possible dependencies of your transformer. Furthermore, if your transformer depends on other extensions, you can include them in the list of extensions. A test can then look like

async def test_transformer(transformer):
    result = await transformer({"raw": some_data})
    assert result == expected

Database access

cattle_grid uses SQLAlchemy. In this section, we discuss how you can use this from your extension.

Definition models

We define ORM models as follows. Subsequent database objects inherit from Base

models.py
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(AsyncAttrs, DeclarativeBase): ...

class MyModel(Base):
    key: Mapped[str] = maped_column

Creating the database objects

If you wish to create the database objects on startup, you can use lifespan_for_sql_alchemy_base_class to achieve this like

from cattle_grid.extensions.util import lifespan_for_sql_alchemy_base_class

from .models import Base

extension = Extension(
    name="database example",
    module=__name__,
    lifespan=lifespan_for_sql_alchemy_base_class(Base))

Accessing the SQL session

cattle_grid uses FastDepends to inject dependencies, or the FastAPI equivalent. This means you can access a AsyncSession by including

from the processing code, and

from the API code.

Testing

In order to test with pytest, one can now use

from cattle_grid.testing.fixtures import *  # noqa
from cattle_grid.extensions.util import lifespan_for_sql_alchemy_base_class

from .models import Base


@pytest.fixture(autouse=True)
async def create_tables(sql_engine_for_tests):
    lifespan = lifespan_for_sql_alchemy_base_class(Base)
    async with lifespan(sql_engine_for_tests):
        yield

Then the fixture sql_engine_for_tests creates an sql_engine. And the create_tables fixtures creates the tables. The wildcard import is necessary to import the dependent fixtures.

Running extensions

In order to test extensions, one might want to run these using a separate process. This can be achieved by running

python -m cattle_grid.extensions run your.extension.module

See here for further details on this command.

Tip

To run in your host environment change the port with --port 8000.

Warning

This only works for processing and API extensions. Transformation and lookup extensions are called by cattle_grid directly.

We note here that the configuration will be loaded through the same mechanism as cattle_grid does. This is in particular relevant for accessing the database and the RabbitMQ router.