Extensions
Warning
Still experimental
Types of extensions
cattle_grid supports different types of extensions:
- Lookup extensions: Retrieve data, e.g. archived activities
- Transform extensions: Take data and harmonize the format or add information
- Processing extensions: Do something when an activity is received or sent
- API extensions: Provide something accessible via HTTP
Combining these types of extension is possible.
Info
I might add a transform extension for outgoing messages, and similarly one for messages that are just about to be sent. This would allow transformations specific to the remote instance.
Types of subscriptions
Extensions can define new topics and then perform an action when a message is received. These actions should either change the state of the extension, e.g. update its database, or send new messages. These messages should be sent to existing topics. Extensions should not send to incoming.# (as it is reserved for messages received from the Fediverse), nor should they send to outgoing.#. Instead, they should send to send_message, which will ensure proper distribution to outgoing.#.
However, extensions should subscribe to outgoing.# and incoming.# to process messages.
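Topics such as incoming.# follow the AMQP topic conventions used by RabbitMQ: words are separated by dots, * stands for exactly one word, and # stands for zero or more words. The following is a minimal, standalone sketch of that matching rule. It is not cattle_grid's or RabbitMQ's implementation, and the function name topic_matches and the routing keys in the usage note are made up for illustration.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if an AMQP-style topic pattern matches a routing key."""

    def match(p: list, k: list) -> bool:
        if not p:
            # Pattern exhausted: match only if the key is exhausted too
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may consume zero or more words of the key
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == "*" or head == k[0]:
            # '*' consumes exactly one word; a literal must be equal
            return match(rest, k[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))
```

With this rule, incoming.# matches both incoming.Follow and incoming.Follow.extra, whereas incoming.* matches only the former.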
Writing an extension
The basic implementation will be
By writing something as a cattle_grid extension, you can use the lookup and transform methods to influence cattle_grid’s behavior, e.g. to
- serve archived activities (e.g. from a migrated account)
- add information to activities, e.g. label them
Serving content
By adding
one can use a cattle_grid extension as one would use a FastAPI router. By using dependency injection, one can access various objects. (See FIXME) For example, to access the database, one would use
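from sqlalchemy import select
from cattle_grid.dependencies.fastapi import SqlSession

@extension.get("/path/{parameter}")
async def serve_content(parameter: str, session: SqlSession):
    # Fetch the row matching the path parameter (None if there is none)
    result = await session.scalar(
        select(MyModel).where(MyModel.parameter == parameter)
    )
    return serialize_to_pydantic(result)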
Testing
One can obtain a TestClient via
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient

@pytest.fixture
def test_client():
    app = FastAPI()
    app.include_router(extension.api_router)
    return TestClient(app)
then proceed to write tests as usual, e.g. test_client.get("/url").
Processing activities
By defining a subscriber
you can create a subscription to the ActivityExchange. The subscription should be either a method name defined by your extension or a subscription on a topic for processing incoming or outgoing messages, i.e. incoming.* or outgoing.*.
Method Information
Subscribers to a method are automatically added to method information. The description is either the docstring or can be specified by adding a description argument, i.e.
@extension.subscribe("method_name", description="my description")
async def my_subscriber(msg: dict):
    ...

# or

@extension.subscribe("method_name")
async def my_subscriber(msg: dict):
    """My description"""
    ...
The description passed as an argument takes precedence.
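The precedence rule can be illustrated with a small standalone sketch. This is not cattle_grid's implementation: the subscribe decorator and the method_information dictionary below are hypothetical stand-ins that only mimic the documented behavior (explicit description first, docstring as fallback).

```python
import inspect
from typing import Callable, Dict, Optional

# Hypothetical stand-in for the method information an extension collects
method_information: Dict[str, str] = {}


def subscribe(method_name: str, description: Optional[str] = None) -> Callable:
    def decorator(func: Callable) -> Callable:
        # An explicit description wins; otherwise fall back to the docstring
        if description is not None:
            resolved = description
        else:
            resolved = inspect.getdoc(func) or ""
        method_information[method_name] = resolved
        return func

    return decorator


@subscribe("with_argument", description="my description")
async def first_subscriber(msg: dict):
    """This docstring is not used as the description"""


@subscribe("with_docstring")
async def second_subscriber(msg: dict):
    """My description"""
```

After import, method_information maps with_argument to the explicit argument and with_docstring to the docstring.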
Testing
One can unit test a subscriber by just importing and calling it, e.g.
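A minimal sketch of such a direct call, using only the standard library. The subscriber my_subscriber, its publisher parameter, and the message shape are hypothetical; in a real extension the publisher would typically be supplied through dependency injection rather than passed by hand.

```python
import asyncio
from unittest.mock import AsyncMock


async def my_subscriber(msg: dict, publisher) -> None:
    """Hypothetical subscriber: forwards the content via send_message"""
    await publisher({"content": msg["content"]}, routing_key="send_message")


def test_my_subscriber() -> None:
    # AsyncMock records awaits, so no broker is needed for this test
    publisher = AsyncMock()

    asyncio.run(my_subscriber({"content": "hello"}, publisher))

    publisher.assert_awaited_once_with(
        {"content": "hello"}, routing_key="send_message"
    )
```

With pytest and an async test plugin, the test function could itself be a coroutine and await the subscriber directly instead of going through asyncio.run.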
If you wish to test using the TestRabbitBroker, following the faststream guide, you can use with_test_broker_for_extension. For this, define the broker as a fixture
import pytest
from unittest.mock import AsyncMock

@pytest.fixture
async def send_message_mock():
    yield AsyncMock()

@pytest.fixture
async def test_broker(send_message_mock):
    extension.configure({"var": "value"})

    async with with_test_broker_for_extension(
        [extension], {"send_message": send_message_mock}
    ) as tbr:
        yield tbr
and then write a test as
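async def test_message_send(test_broker, send_message_mock):
    # Publish through the broker provided by the test_broker fixture;
    # exchange is the exchange your extension subscribes on
    await test_broker.publish(
        {"my": "message"},
        routing_key="my_routing_key",
        exchange=exchange,
    )

    send_message_mock.assert_awaited_once()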
Initializing the database
Define your models using SQLAlchemy, e.g.
You can then obtain an SQL session via
from cattle_grid.dependencies import SqlSession, CommittingSession
from cattle_grid.dependencies.fastapi import SqlSession, CommittingSession
for use with faststream or fastapi respectively, e.g.
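from sqlalchemy import select
from cattle_grid.dependencies.fastapi import SqlSession

@extension.get("/path/{parameter}")
async def serve_content(parameter: str, session: SqlSession):
    # Fetch the row matching the path parameter (None if there is none)
    result = await session.scalar(
        select(MyModel).where(MyModel.parameter == parameter)
    )
    return serialize_to_pydantic(result)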
If you wish to create the database objects on startup, you can use
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(engine: SqlAsyncEngine):
    """The lifespan ensures that the necessary database table is
    created."""
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    yield

extension = Extension(
    name="database example",
    module=__name__,
    lifespan=lifespan,
)
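The table creation sits before the yield because, in an async context manager, everything before yield runs on entry (startup) and everything after it runs on exit (shutdown). A minimal standalone sketch of that ordering, where the events list is a hypothetical stand-in for real database work:

```python
import asyncio
from contextlib import asynccontextmanager

# Records the order in which the phases run
events = []


@asynccontextmanager
async def lifespan():
    events.append("startup")  # e.g. create tables here
    yield
    events.append("shutdown")  # e.g. release resources here


async def main() -> None:
    async with lifespan():
        # The extension would be serving requests at this point
        events.append("running")


asyncio.run(main())
```

After the run, events holds the three phases in the order startup, running, shutdown.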
Running extensions
To test extensions, one might want to run them in a separate process. This can be achieved by running
See below for further details on this command.
Tip
To run in your host environment, change the port with --port 8000.
Warning
This only works for processing and API extensions. Transformation and lookup extensions are called by cattle_grid directly.
Note that the configuration is loaded through the same mechanism that cattle_grid uses. This is particularly relevant for accessing the database and the RabbitMQ router.
Configuring extensions
Extensions are configured in cattle_grid.toml by adding an entry of the form
The factory method in the Python module your.extension will be called with the contents of config as an argument.
python -m cattle_grid.extensions
Tooling for managing extensions
Usage:
Options:
Name | Type | Description | Default |
---|---|---|---|
--help | boolean | Show this message and exit. | False |
Subcommands
- async-api: Generates the async api schema for the extension
- load: Loads an extension
- openapi: Generates the openapi schema for the extension
- run: Runs the extension as an independent server process.
python -m cattle_grid.extensions async-api
Generates the async api schema for the extension
Usage:
Options:
Name | Type | Description | Default |
---|---|---|---|
--help | boolean | Show this message and exit. | False |
python -m cattle_grid.extensions load
Loads an extension
Usage:
Options:
Name | Type | Description | Default |
---|---|---|---|
--help | boolean | Show this message and exit. | False |
python -m cattle_grid.extensions openapi
Generates the openapi schema for the extension
Usage:
Options:
Name | Type | Description | Default |
---|---|---|---|
--filename | text | Filename to write to | None |
--help | boolean | Show this message and exit. | False |
python -m cattle_grid.extensions run
Runs the extension as an independent server process. The configuration is taken from the same files as cattle_grid. Thus these must be present.
Usage:
Options:
Name | Type | Description | Default |
---|---|---|---|
--host | text | Host to run on | 0.0.0.0 |
--port | integer | Port to run on | 80 |
--reload_dir | text | Reload on changes in directory | None |
--no_broker | boolean | Set to run without included broker | False |
--help | boolean | Show this message and exit. | False |