Extensions¶
cattle_grid is not meant as a one-size-fits-all Fediverse application, but as something customizable.
Features¶
Extensions allow you to
-
You have incoming data. Transformers allow you to process the data and either perform actions or pass the result on to subsequent steps.
The information generated by transformers is passed to all subsequent steps.
-
This allows you to perform an action on receiving an activity, or someone triggering a method.
-
Lookup
Adjust how content is looked up. This can enable caching or allow you to look up non-HTTPS objects.
-
Allows you to answer HTTP requests using methods provided by cattle_grid, e.g. check that the actor should access this object.
Examples¶
-
simple_storage provides the capability to use the methods
publish_activity and publish_object to publish ActivityPub activities and objects to the Fediverse. This entails adding an id, storing them, and then sending them to the appropriate recipients for an activity. An object is wrapped in a Create activity before being sent to publish_activity. This extension has the caveat of not providing an HTML version of the objects; the html display extension provides one.
-
html display provides a version of
publish_object that generates HTML versions. See https://dev.bovine.social/@release for a demo.
You can find more sample extensions on the right under Example Extensions.
-
muck_out is a transformer type extension. It takes the raw objects coming from the Fediverse, and turns them into opinionated normalized versions of themselves. It also provides access methods for objects.
-
moo illustrates how to build a simple thing that relies on just a single activity.
Using extensions¶
Installing extensions¶
One can install extensions from PyPI, or read them as a file package, e.g.
to install muck_out. Then one needs to tell cattle_grid to load them. This is done via the configuration, e.g.
in this case.
Configuring extensions¶
Extensions are loaded with the load_extension method. Four values are looked at:
- module_name specifies the module in which the extension is looked up
- config is a dictionary that is passed to the extension as configuration parameters
- lookup_order is used when the extension is used to look up an object
- api_prefix is the prefix used if your extension defines API endpoints
Example
[[extensions]]
module_name = "your.extension"
config = { var = 1}
lookup_order = 2
api_prefix = "/your/api"
Writing an extension¶
The basic implementation will be
By writing something as a cattle_grid extension, you can use the lookup and transform methods to influence cattle_grid's behavior, e.g. to
- serve archived activities (e.g. from a migrated account)
- add information to activities, e.g. label them
Writing a configuration¶
The config object of an extension is a pydantic model, e.g.
from pydantic import BaseModel, Field
class MyExtensionConfiguration(BaseModel):
    key: str = Field(default="value")
One registers this class with an extension via
extension = Extension(
    name="my_extension",
    module=__name__,
    config_class=MyExtensionConfiguration,
)
Then one can use it in a function via injection
@extension.subscribe("incoming.*")
async def print_config_value(config: extension.Config):
    print(config.key)
When configured via
the extension should now print other whenever an incoming
message is processed.
Serving content¶
By adding
one can use a cattle_grid extension as one would use a FastAPI router. Through dependency injection, one can access various objects. For example, to access the database, one would use
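from sqlalchemy import select
from cattle_grid.dependencies.fastapi import SqlSession
@extension.get("/path/{parameter}")
async def serve_content(parameter: str, session: SqlSession):
    # MyModel and serialize_to_pydantic are placeholders for your own code
    result = await session.scalar(select(MyModel).where(MyModel.parameter == parameter))
    return serialize_to_pydantic(result)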
Serving ActivityPub¶
Serving ActivityPub content isn't as simple as just dropping a JSON file somewhere. First, one needs to set the correct media type. This can be done with the ActivityResponse response class
from cattle_grid.tools.fastapi import ActivityResponse
@extension.get("/activity_pub", response_class=ActivityResponse)
async def serve_something():
    return activity_pub_object
This is nice, but misses the authorization aspects. To add them, we need two additional helpers. is_valid_requester_for_obj allows checking if the requester should have access to the object. ActivityPubHeaders contains the authentication information gathered by cattle_grid.auth.
With these components, we are ready to write the code
from fastapi import HTTPException
from cattle_grid.activity_pub import is_valid_requester_for_obj
from cattle_grid.dependencies.fastapi import SqlSession
from cattle_grid.tools.fastapi import ActivityPubHeaders, ActivityResponse
@extension.get("/activity_pub", response_class=ActivityResponse)
async def serve_something(
    ap_headers: ActivityPubHeaders,
    session: SqlSession,
):
    # Reject requests without an authenticated requester
    if not ap_headers.x_cattle_grid_requester:
        raise HTTPException(401)
    # Reject requesters that are not allowed to see the object
    if not await is_valid_requester_for_obj(
        session, ap_headers.x_cattle_grid_requester, activity_pub_object
    ):
        raise HTTPException(401)
    return activity_pub_object
For public objects, this ensures that the request has a valid signature, and is not being blocked by the owner of the object. For private objects, it is checked that the requester is in the recipients of the object.
Content Negotiation¶
Fediverse content often exists in two versions: human-readable HTML and ActivityPub JSON. It is often useful to be able to distinguish between these two. cattle_grid offers the cattle_grid.tools.fastapi.ShouldServe annotation to help with this. The next example shows how to redirect to the ActivityPub object.
from fastapi.responses import RedirectResponse
from cattle_grid.tools.fastapi import ShouldServe, ContentType
@extension.get("/html")
async def serve_html(should_serve: ShouldServe):
    # Redirect clients that only ask for ActivityPub content
    if (
        ContentType.html not in should_serve
        and ContentType.activity_pub in should_serve
    ):
        return RedirectResponse(activity_pub_id)
    return "<html>...</html>"
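The decision above depends on the Accept header of the request. As a rough illustration of the idea, and not of how ShouldServe is implemented, the following sketch classifies an Accept header into the two kinds of content. The function wanted_content_types and its string labels are hypothetical; quality values and wildcards are ignored.

```python
HTML_TYPES = ("text/html", "application/xhtml+xml")
ACTIVITY_PUB_TYPES = ("application/activity+json", "application/ld+json")


def wanted_content_types(accept: str) -> set[str]:
    """Return which of 'html' and 'activity_pub' an Accept header asks for."""
    wanted: set[str] = set()
    for part in accept.split(","):
        # Drop parameters such as ;q=0.9 or ;profile="..."
        media_type = part.split(";")[0].strip().lower()
        if media_type in HTML_TYPES:
            wanted.add("html")
        elif media_type in ACTIVITY_PUB_TYPES:
            wanted.add("activity_pub")
    return wanted


print(wanted_content_types("application/activity+json"))
```

A browser typically sends text/html first, while Fediverse servers ask for application/activity+json or application/ld+json with the ActivityStreams profile.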
Testing¶
One can obtain a TestClient via
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
@pytest.fixture
def test_client():
    app = FastAPI()
    app.include_router(extension.api_router)
    return TestClient(app)
then proceed to write tests as usual, e.g.
test_client.get("/url").
Creating new methods¶
By defining a subscriber
you can create a subscription to the ActivityExchange. The subscription should either be
a method name defined by your extension or a subscription
on a topic for processing incoming or outgoing messages, i.e.
incoming.* or outgoing.*, see processing activities.
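The wildcard in incoming.* can be illustrated with a small matcher. The sketch below assumes the topic semantics of a RabbitMQ topic exchange, where * stands for exactly one dot-separated word and # for zero or more words; that cattle_grid's exchanges behave this way is an assumption here, and the function matches is purely illustrative.

```python
def matches(pattern: str, routing_key: str) -> bool:
    """Check a routing key against a topic pattern (* = one word, # = any number)."""
    p = pattern.split(".")
    k = routing_key.split(".")

    def match(i: int, j: int) -> bool:
        if i == len(p):
            # Pattern exhausted: the key must be exhausted too.
            return j == len(k)
        if p[i] == "#":
            # '#' may consume zero or more words.
            return any(match(i + 1, n) for n in range(j, len(k) + 1))
        if j == len(k):
            return False
        if p[i] == "*" or p[i] == k[j]:
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


print(matches("incoming.*", "incoming.Create"))
print(matches("incoming.*", "outgoing.Create"))
```

Under these semantics a subscription on incoming.* receives incoming.Create and incoming.Follow alike, while a subscription on incoming.Create receives only the former.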
Dependencies¶
In these methods one can use the dependencies provided by cattle_grid.dependencies and cattle_grid.dependencies.processing. For example, to modify the database use
from cattle_grid.dependencies import CommittingSession
@extension.subscribe("method_name")
async def my_committing_subscriber(message: dict, session: CommittingSession):
    session.add(SqlAlchemyObject(message=message))
The session will be committed automatically. Similarly you can use
from cattle_grid.dependencies import ActivityExchangePublisher
from cattle_grid.dependencies.processing import FactoriesForActor
from cattle_grid.model import ActivityMessage
@extension.subscribe("method_name")
async def my_publishing_subscriber(
    message: dict,
    publisher: ActivityExchangePublisher,
    factories: FactoriesForActor,
):
    await publisher(
        ActivityMessage(
            actor=message.get("actor"),
            data=factories[0].like("http://server.example/object/id").build(),
        ),
        routing_key="send_message",
    )
to publish a like activity. We note that the convention for the factories is
Method Information¶
Subscribers to a method are automatically added to method information. The description is either the docstring or can be specified by adding a description argument, i.e.
@extension.subscribe("method_name", description="my description")
async def my_subscriber(msg: dict):
    ...
# or
@extension.subscribe("method_name")
async def my_subscriber(msg: dict):
    """My description"""
    ...
The description passed as an argument takes precedence.
Testing¶
One can unit test a subscriber by just importing and calling it, e.g.
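A minimal sketch of such a direct call, using only the standard library. The subscriber my_subscriber is hypothetical and defined inline so that the example is self-contained; in practice it would be imported from your extension, and its dependencies would be replaced by mocks in the same way.

```python
import asyncio
from unittest.mock import AsyncMock


async def my_subscriber(message: dict, publisher) -> None:
    """Hypothetical subscriber that forwards the actor of a message."""
    await publisher({"actor": message.get("actor")}, routing_key="send_message")


def test_my_subscriber() -> None:
    # The publisher dependency is replaced by a mock that records its calls.
    publisher = AsyncMock()
    asyncio.run(my_subscriber({"actor": "http://server.example/actor"}, publisher))
    publisher.assert_awaited_once_with(
        {"actor": "http://server.example/actor"}, routing_key="send_message"
    )


test_my_subscriber()
```

With an async test plugin for pytest, the test function can itself be async and await the subscriber directly.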
If you wish to test using the TestRabbitBroker, following the faststream guide, you can use with_test_broker_for_extension. For this, define the broker as a fixture
import pytest
from unittest.mock import AsyncMock
from cattle_grid.extensions.testing import with_test_broker_for_extension
from cattle_grid.testing.fixtures import *  # type: ignore # noqa: F403
from . import extension
@pytest.fixture
async def send_message_mock():
    yield AsyncMock()
@pytest.fixture
async def test_broker(send_message_mock):
    extension.configure({"var": "value"})
    async with with_test_broker_for_extension(
        [extension], {"send_message": send_message_mock}
    ) as tbr:
        yield tbr
and then write a test as
from cattle_grid.app import app_globals
async def test_message_send(test_broker, send_message_mock):
    # Publish through the test_broker fixture defined above
    await test_broker.publish(
        {"my": "message"},
        routing_key="my_routing_key",
        exchange=app_globals.activity_exchange,
    )
    send_message_mock.assert_awaited_once()
Processing Activities¶
By defining subscriptions to the routing keys incoming.* or outgoing.*
one can process either all incoming or all outgoing activities. By subscribing
to a specific activity type, e.g. incoming.Create, one can subscribe to only
one type of activity.
from cattle_grid.model import ActivityMessage
@extension.subscribe("incoming.Create")
async def incoming_create(message: ActivityMessage):
    raw_data = message.data["raw"]
    ...
The message has passed through the transformer at this point. This means that
the data attribute of the ActivityMessage
has the form
where the ... are provided by transformers. This means that with the
muck_out extension and its cattle_grid magic,
one can write:
from muck_out.cattle_grid import ParsedEmbeddedObject
@extension.subscribe("incoming.Create")
async def incoming_create(obj: ParsedEmbeddedObject):
    if not obj.in_reply_to:
        return
    handle_reply()
to write code that only runs in case the message is a reply.
Handling actor deletions¶
It might be that you want to run some cleanup when an actor managed by cattle_grid is being deleted. This can be done with the following code snippet.
from cattle_grid.activity_pub.activity import actor_deletes_themselves
from cattle_grid.model import ActivityMessage
@extension.subscribe("outgoing.Delete")
async def outgoing_delete(message: ActivityMessage):
    if not actor_deletes_themselves(message.data.get("raw")):
        return
    # handle delete here
Transformers¶
Transformers are applied when shovelling activities from the internal to the activity and account exchanges.
flowchart LR
    I -->|transformer| A1
    I -->|transformer| A2
    I["Internal Exchange"]
    A1["Activity Exchange"]
    A2["Account Exchange"]
That means all activities on the incoming.* or outgoing.* routing
keys are affected by the transformer. Similarly, all activities for
receive.NAME.incoming and receive.NAME.outgoing in the Cattle Drive Protocol are affected.
How transformers work¶
Transformer functions take the form
@extension.transform(inputs=["one", "two"], outputs=["three"])
async def transform(data: dict[str, dict]) -> dict[str, dict]:
    ...
where the inputs specify the keys that are assumed to exist
on data, e.g.
and outputs specifies the keys that will be added.
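How declared inputs and outputs let several transformers build on each other can be sketched with plain functions. This is an illustration of the idea only, not cattle_grid's implementation: the step decorator, run_chain, and the keys parsed and label are hypothetical; only the raw key appears elsewhere in this document.

```python
import asyncio

Data = dict[str, dict]


def step(inputs: list[str], outputs: list[str]):
    """Attach the declared input and output keys to a transformer function."""

    def decorator(func):
        func.inputs, func.outputs = inputs, outputs
        return func

    return decorator


@step(inputs=["raw"], outputs=["parsed"])
async def parse(data: Data) -> Data:
    return {"parsed": {"type": data["raw"].get("type")}}


@step(inputs=["raw", "parsed"], outputs=["label"])
async def label(data: Data) -> Data:
    return {"label": {"is_create": data["parsed"]["type"] == "Create"}}


async def run_chain(steps, data: Data) -> Data:
    """Run the steps in order; each one sees the keys added by earlier ones."""
    result = dict(data)
    for func in steps:
        missing = [key for key in func.inputs if key not in result]
        if missing:
            raise KeyError(f"{func.__name__} is missing inputs {missing}")
        produced = await func(result)
        # Only the declared outputs are merged into the result.
        result.update({key: produced[key] for key in func.outputs})
    return result


result = asyncio.run(run_chain([parse, label], {"raw": {"type": "Create"}}))
print(result)
```

Because every declared output key is looked up in the returned dictionary, a step in this sketch must always return all of its outputs, even if some are empty.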
Doing nothing¶
The case of the transformer not doing anything can be handled as follows.
@extension.transform(inputs=["one", "two"], outputs=["three"])
async def transform(data: dict[str, dict]) -> dict[str, dict]:
    if transformer_does_nothing(data):
        return {"three": {}}
    ...
Testing transformers¶
To test a transformer, I recommend creating a pytest fixture that builds a transformer, e.g.
import pytest
from cattle_grid.extensions.load import build_transformer
from . import extension
@pytest.fixture
def transformer():
    return build_transformer([extension])
The reason for doing so is that it takes care of injecting possible dependencies of your transformer. Furthermore, if your transformer depends on other extensions, you can include them in the list of extensions. A test can then look like
async def test_transformer(transformer):
    result = await transformer({"raw": some_data})
    assert result == expected
Database access¶
cattle_grid uses SQLAlchemy. In this section, we discuss how you can use this from your extension.
Defining models¶
We define ORM models as follows. Subsequent database objects inherit from Base.
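from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(AsyncAttrs, DeclarativeBase): ...
class MyModel(Base):
    # SQLAlchemy needs a table name and a primary key; both are chosen for illustration
    __tablename__ = "my_model"
    key: Mapped[str] = mapped_column(primary_key=True)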
Creating the database objects¶
If you wish to create the database objects on startup, you can use lifespan_for_sql_alchemy_base_class as follows
from cattle_grid.extensions.util import lifespan_for_sql_alchemy_base_class
from .models import Base
extension = Extension(
    name="database example",
    module=__name__,
    lifespan=lifespan_for_sql_alchemy_base_class(Base),
)
Accessing the SQL session¶
cattle_grid uses FastDepends, or the FastAPI equivalent, to inject dependencies. This means you can access an AsyncSession by including
from the processing code, and
from the API code.
Testing¶
In order to test with pytest, one can now use
import pytest
from cattle_grid.testing.fixtures import *  # noqa
from cattle_grid.extensions.util import lifespan_for_sql_alchemy_base_class
from .models import Base
@pytest.fixture(autouse=True)
async def create_tables(sql_engine_for_tests):
    lifespan = lifespan_for_sql_alchemy_base_class(Base)
    async with lifespan(sql_engine_for_tests):
        yield
Here the fixture sql_engine_for_tests creates an SQL engine, and the
create_tables fixture creates the tables. The wildcard import is necessary
to import the dependent fixtures.
Running extensions¶
In order to test extensions, one might want to run these using a separate process. This can be achieved by running
See here for further details on this command.
Tip
To run in your host environment change the port with --port 8000.
Warning
This only works for processing and API extensions. Transformation and lookup extensions are called by cattle_grid directly.
We note here that the configuration will be loaded through the same mechanism as cattle_grid does. This is in particular relevant for accessing the database and the RabbitMQ router.