Skip to content

cattle_grid.extensions.examples.simple_storage

This extension is an example of storing activities and then serving them through an HTTP API.

It also stores objects (wrapping each published object in a Create activity), but it is not intended to grow much beyond that.

A real storage mechanism should have several features that this simple API lacks, e.g.

  • Serving an HTML page through content negotiation
  • Allowing one to update the content of the database
  • Collecting and adding metadata, e.g. a replies collection for objects

Usage:

[[extensions]]
module = "cattle_grid.extensions.examples.simple_storage"
api_prefix = "/simple_storage"

config = { prefix = "/simple_storage/" }

extension module-attribute

# Extension object of this module. The HTTP routes (`extension.get`) and
# message subscriptions (`extension.subscribe`) defined in this file are
# registered on it; `lifespan` and `SimpleStorageConfiguration` are passed in
# as its lifespan handler and configuration class.
extension = Extension(
    name="simple storage",
    module=__name__,
    lifespan=lifespan,
    config_class=SimpleStorageConfiguration,
)

The simple storage extension

get_activity_or_object async

get_activity_or_object(
    uuid: UUID,
    headers: ActivityPubHeaders,
    session: SqlSession,
)

Returns the activity or object

Source code in cattle_grid/extensions/examples/simple_storage/__init__.py
@extension.get("/{uuid}")
async def get_activity_or_object(
    uuid: uuid.UUID, headers: ActivityPubHeaders, session: FastApiSession
):
    """Return the stored activity or object with the given uuid.

    Stored activities are searched first, then stored objects.

    Raises:
        HTTPException: 404 if nothing is stored under the uuid, 410 if the
            requester check raises ``ActorNotFound``, 401 if the requester
            is not valid for the stored data, and 400 if the stored ``id``
            differs from the ``x_ap_location`` header.
    """
    stored = None
    for model in (StoredActivity, StoredObject):
        stored = await session.scalar(select(model).where(model.id == uuid))
        if stored is not None:
            break

    if stored is None:
        raise HTTPException(status_code=404, detail="Activity not found")

    payload = stored.data

    # Only the requester check can raise ActorNotFound, so only it is guarded.
    try:
        requester_ok = await is_valid_requester_for_obj(
            headers.x_cattle_grid_requester, payload
        )
    except ActorNotFound:
        raise HTTPException(status_code=410, detail="Activity no longer available")

    if not requester_ok:
        raise HTTPException(status_code=401)

    if payload.get("id") != headers.x_ap_location:
        raise HTTPException(status_code=400, detail="Location header does not match id")

    return payload

lifespan async

lifespan(engine: SqlAsyncEngine)

The lifespan ensures that the necessary database tables are created.

Source code in cattle_grid/extensions/examples/simple_storage/__init__.py
@asynccontextmanager
async def lifespan(engine: SqlAsyncEngine):
    """Create the tables declared on ``Base`` and then yield.

    ``Base.metadata.create_all`` is run synchronously on a connection
    obtained from ``engine.begin()``.
    """
    create_tables = Base.metadata.create_all

    async with engine.begin() as connection:
        await connection.run_sync(create_tables)

    yield

main async

main()

Basic endpoint that just returns a string, so requesting with a uuid doesn’t return an error.

Source code in cattle_grid/extensions/examples/simple_storage/__init__.py
@extension.get("/")
async def main():
    """Basic endpoint at the extension root.

    It only returns a fixed string describing the extension.
    """
    description = "simple storage cattle grid sample extension"
    return description

simple_storage_publish_activity async

simple_storage_publish_activity(
    message: PublishActivity,
    config: Config,
    session: CommittingSession,
    broker: RabbitBroker = Context(),
)

Method to subscribe to the publish_activity routing_key.

An activity sent to this endpoint will be stored in the database, and then published through the send_message mechanism.

The stored activity can then be retrieved through the HTTP API.

Source code in cattle_grid/extensions/examples/simple_storage/__init__.py
@extension.subscribe("publish_activity")
async def simple_storage_publish_activity(
    message: PublishActivity,
    config: extension.Config,  # type: ignore
    session: CommittingSession,
    broker: RabbitBroker = Context(),
):
    """Method to subscribe to the `publish_activity` routing_key.

    An activity sent to this endpoint is assigned a newly generated id,
    stored in the database, and then published through the `send_message`
    mechanism.

    The stored activity can then be retrieved through the
    HTTP API.

    Raises:
        ValueError: If the activity already carries an `id`, or if its
            `actor` differs from `message.actor`.
    """
    if message.data.get("id"):
        raise ValueError("Activity ID must not be set")

    if message.data.get("actor") != message.actor:
        raise ValueError("Actor must match message actor")

    # `activity` aliases `message.data`, so the id is written into that dict.
    activity = message.data
    # Named `activity_uuid` so the local does not shadow the `uuid` module.
    activity["id"], activity_uuid = config.make_id(message.actor)

    # Argument order follows the format string: id first, then the actor.
    logger.info("Publishing activity with id %s for %s", activity["id"], message.actor)

    session.add(
        StoredActivity(
            id=activity_uuid,
            data=activity,
            actor=message.actor,
        )
    )

    await broker.publish(
        ActivityMessage(actor=message.actor, data=activity),
        routing_key="send_message",
        exchange=global_container.exchange,
    )

simple_storage_publish_object async

simple_storage_publish_object(
    message: PublishObject,
    config: Config,
    session: CommittingSession,
    actor: MessageActor,
    broker: RabbitBroker = Context(),
)

Publishes an object, subscribed to the routing key publish_object.

We note that this routine creates a Create activity for the object.

Source code in cattle_grid/extensions/examples/simple_storage/__init__.py
@extension.subscribe("publish_object")
async def simple_storage_publish_object(
    message: PublishObject,
    config: extension.Config,  # type: ignore
    session: CommittingSession,
    actor: MessageActor,
    broker: RabbitBroker = Context(),
):
    """Publishes an object, subscribed to the routing key
    `publish_object`.

    The object is assigned a newly generated id and wrapped in a `Create`
    activity, which gets its own generated id. The activity is published
    through `send_message`, and both the activity and the object are added
    to the session for storage.

    Raises:
        ValueError: If the object already carries an `id`, or if its
            `attributedTo` differs from `message.actor`.
    """

    if message.data.get("id"):
        raise ValueError("Object ID must not be set")

    if message.data.get("attributedTo") != message.actor:
        raise ValueError("Actor must match object attributedTo")

    # `obj` aliases `message.data`, so the id is written into that dict.
    obj = message.data
    obj["id"], obj_uuid = config.make_id(message.actor)

    # Argument order follows the format string: id first, then the actor.
    logger.info("Publishing object with id %s for %s", obj["id"], message.actor)

    actor_profile = actor_to_object(actor)
    activity_factory, _ = factories_for_actor_object(actor_profile)

    activity_id, activity_uuid = config.make_id(message.actor)

    activity = activity_factory.create(obj, id=activity_id).build()

    logger.info(activity)

    await broker.publish(
        ActivityMessage(actor=message.actor, data=activity),
        routing_key="send_message",
        exchange=global_container.exchange,
    )

    session.add(
        StoredActivity(
            id=activity_uuid,
            data=activity,
            actor=message.actor,
        )
    )
    session.add(
        StoredObject(
            id=obj_uuid,
            data=obj,
            actor=message.actor,
        )
    )

config

SimpleStorageConfiguration

Bases: BaseModel

Configuration of the simple storage extension

Parameters:

Name Type Description Default
prefix str
'/simple_storage/'
Source code in cattle_grid/extensions/examples/simple_storage/config.py
class SimpleStorageConfiguration(BaseModel):
    """Settings of the simple storage extension"""

    prefix: str = "/simple_storage/"
    """
    Path placed in front of the generated uuid. Protocol and domain are taken from the actor id. See [cattle_grid.extensions.examples.simple_storage.config.determine_url_start][].
    """

    def url_start(self, actor_id):
        """Return the url start for ``actor_id`` using the configured prefix."""
        start = determine_url_start(actor_id, self.prefix)
        return start

    def make_id(self, actor_id):
        """Generate a new id for ``actor_id``.

        Returns a tuple ``(url, uuid)``: the url is the url start followed
        by the string form of a newly generated ``uuid6.uuid7()`` value, and
        that uuid value itself is the second element.
        """
        generated = uuid6.uuid7()
        start = self.url_start(actor_id)
        return start + str(generated), generated

prefix class-attribute instance-attribute

prefix: str = '/simple_storage/'

Path to use before the generated uuid. The protocol and domain will be extracted from the actor id. See cattle_grid.extensions.examples.simple_storage.config.determine_url_start.

determine_url_start

determine_url_start(actor_id, prefix)

Used to determine the url of a stored object

>>> determine_url_start("http://abel.example/actor/alice",
...     "/simple/storage/")
'http://abel.example/simple/storage/'
Source code in cattle_grid/extensions/examples/simple_storage/config.py
def determine_url_start(actor_id, prefix):
    """
    Build the leading part of the url of a stored object: the scheme and
    network location of ``actor_id`` followed by ``prefix``.

    ```pycon
    >>> determine_url_start("http://abel.example/actor/alice",
    ...     "/simple/storage/")
    'http://abel.example/simple/storage/'

    ```
    """
    # Only the first two components of the parse result are needed.
    scheme, netloc, *_ = urlparse(actor_id)

    return "{}://{}{}".format(scheme, netloc, prefix)

message_types

PublishActivity

Bases: BaseModel

Used when publishing an activity

Parameters:

Name Type Description Default
actor str

The actor performing the activity

required
data dict

Activity to publish

required
Source code in cattle_grid/extensions/examples/simple_storage/message_types.py
class PublishActivity(BaseModel):
    """Message used to request publishing an activity"""

    actor: str = Field(
        description="The actor performing the activity",
        examples=["http://alice.example"],
    )

    data: dict = Field(
        description="Activity to publish",
        examples=[
            {
                "@context": "https://www.w3.org/ns/activitystreams",
                "type": "AnimalSound",
                "actor": "http://alice.example",
                "to": ["http://bob.example"],
                "content": "moo",
            }
        ],
    )

PublishObject

Bases: BaseModel

Used when publishing an object

Parameters:

Name Type Description Default
actor str

The actor performing the activity

required
data dict

Object to publish

required
Source code in cattle_grid/extensions/examples/simple_storage/message_types.py
class PublishObject(BaseModel):
    """Message used to request publishing an object"""

    actor: str = Field(
        description="The actor performing the activity",
        examples=["http://alice.example"],
    )

    data: dict = Field(
        description="Object to publish",
        examples=[
            {
                "@context": "https://www.w3.org/ns/activitystreams",
                "type": "Note",
                "attributedTo": "http://alice.example",
                "to": ["http://bob.example"],
                "content": "moo",
            }
        ],
    )

models

Base

Bases: AsyncAttrs, DeclarativeBase

Base model

Source code in cattle_grid/extensions/examples/simple_storage/models.py
class Base(AsyncAttrs, DeclarativeBase):
    """Declarative base class shared by the simple storage models"""

StoredActivity

Bases: Base

Stored activity in the database

Source code in cattle_grid/extensions/examples/simple_storage/models.py
class StoredActivity(Base):
    """Database model for a stored activity"""

    __tablename__ = "simple_storage_stored_activity"
    """Table name"""

    id: Mapped[bytes] = mapped_column(
        UUIDType(binary=True),
        primary_key=True,
    )
    """Primary key: the uuid of the activity, held in a binary ``UUIDType`` column"""
    data: Mapped[dict] = mapped_column(JSON)
    """JSON content of the activity"""
    actor: Mapped[str] = mapped_column()
    """Actor the activity was stored for"""
    create_date: Mapped[datetime.datetime] = mapped_column(
        server_default=func.now(),
    )
    """Creation timestamp, defaulted by the database server"""

__tablename__ class-attribute instance-attribute

__tablename__ = 'simple_storage_stored_activity'

name of the table

actor class-attribute instance-attribute

actor: Mapped[str] = mapped_column()

The actor that created the activity

create_date class-attribute instance-attribute

create_date: Mapped[datetime] = mapped_column(
    server_default=now()
)

The create timestamp

data class-attribute instance-attribute

data: Mapped[dict] = mapped_column(JSON)

The activity as JSON

id class-attribute instance-attribute

id: Mapped[bytes] = mapped_column(
    UUIDType(binary=True), primary_key=True
)

The id (uuid as bytes)

StoredObject

Bases: Base

Stored object in the database

Source code in cattle_grid/extensions/examples/simple_storage/models.py
class StoredObject(Base):
    """Database model for a stored object"""

    __tablename__ = "simple_storage_stored_object"
    """Table name"""

    id: Mapped[bytes] = mapped_column(
        UUIDType(binary=True),
        primary_key=True,
    )
    """Primary key: the uuid of the object, held in a binary ``UUIDType`` column"""
    data: Mapped[dict] = mapped_column(JSON)
    """JSON content of the object"""
    actor: Mapped[str] = mapped_column()
    """Actor the object was stored for"""
    create_date: Mapped[datetime.datetime] = mapped_column(
        server_default=func.now(),
    )
    """Creation timestamp, defaulted by the database server"""

__tablename__ class-attribute instance-attribute

__tablename__ = 'simple_storage_stored_object'

name of the table

actor class-attribute instance-attribute

actor: Mapped[str] = mapped_column()

The actor that created the object

create_date class-attribute instance-attribute

create_date: Mapped[datetime] = mapped_column(
    server_default=now()
)

The create timestamp

data class-attribute instance-attribute

data: Mapped[dict] = mapped_column(JSON)

The object as JSON

id class-attribute instance-attribute

id: Mapped[bytes] = mapped_column(
    UUIDType(binary=True), primary_key=True
)

The id (uuid as bytes)