Skip to content

.activity_pub

cattle_grid.activity_pub

get_async_api_schema

get_async_api_schema() -> Schema

Returns the async api schema for cattle_grid ActivityPub processing

Source code in cattle_grid/activity_pub/__init__.py
def get_async_api_schema() -> Schema:
    """Returns the async api schema for cattle_grid ActivityPub processing"""
    from faststream.asyncapi import get_app_schema

    app = get_mock_faststream_app()

    return get_app_schema(app)

get_fastapi_app

get_fastapi_app() -> FastAPI

Returns the fast api app for ActivityPub processing

Source code in cattle_grid/activity_pub/__init__.py
def get_fastapi_app() -> FastAPI:
    """Returns the fast api app for ActivityPub processing"""

    app = FastAPI(title="cattle_grid ActivityPub routes", version=__version__)
    app.include_router(router)

    return app

get_mock_faststream_app

get_mock_faststream_app() -> FastStream

Creates a mock faststream app for ActivityPub processing

Source code in cattle_grid/activity_pub/__init__.py
def get_mock_faststream_app() -> FastStream:
    """Creates a mock faststream app for ActivityPub processing"""

    from faststream.rabbit import RabbitBroker

    broker = RabbitBroker()
    broker.include_router(create_processing_router())

    return FastStream(
        broker,
        title="cattle_grid ActivityPub processing",
        version=__version__,
        description="Illustrates how cattle grid processes ActivityPub",
    )

cattle_grid.activity_pub.activity

actor_deletes_themselves

actor_deletes_themselves(activity: Dict[str, Any]) -> bool

Checks if activity is self delete of actor

>>> actor_deletes_themselves({"type": "Delete",
...     "actor": "http://actor.test/",
...     "object": "http://actor.test/"})
True

>>> actor_deletes_themselves({"type": "Delete",
...     "actor": "http://actor.test/",
...     "object": "http://other.test/"})
False
Source code in cattle_grid/activity_pub/activity.py
def actor_deletes_themselves(activity: Dict[str, Any]) -> bool:
    """
    Checks if activity is self delete of actor

    ```pycon
    >>> actor_deletes_themselves({"type": "Delete",
    ...     "actor": "http://actor.test/",
    ...     "object": "http://actor.test/"})
    True

    >>> actor_deletes_themselves({"type": "Delete",
    ...     "actor": "http://actor.test/",
    ...     "object": "http://other.test/"})
    False

    ```
    """

    activity_type = activity.get("type")
    if activity_type != "Delete":
        return False

    actor_id = activity.get("actor")
    object_id = id_for_object(activity.get("object"))

    if actor_id is None or object_id is None:
        return False

    return actor_id == object_id

cattle_grid.activity_pub.actor

ActorNotFound

Bases: Exception

Raised if an actor is not found

Source code in cattle_grid/activity_pub/actor/__init__.py
class ActorNotFound(Exception):
    """Raised if an actor is not found"""

DuplicateIdentifierException

Bases: Exception

Raised if an identifier already exists and one tries to create an actor with it

Source code in cattle_grid/activity_pub/actor/__init__.py
class DuplicateIdentifierException(Exception):
    """Raised if an identifier already exists and one tries to create an actor with it"""

actor_to_object

actor_to_object(actor: Actor) -> dict

Transform the actor to an object

Parameters:

Name Type Description Default
actor Actor
required

Returns:

Type Description
dict
Source code in cattle_grid/activity_pub/actor/__init__.py
def actor_to_object(actor: Actor) -> dict:
    """Transform the actor to an object

    :params actor:
    :returns:
    """

    sorted_identifiers = collect_identifiers_for_actor(actor)

    preferred_username = determine_preferred_username(
        sorted_identifiers, actor.actor_id
    )
    attachments = actor.profile.get("attachment")
    result = AsActor(
        id=actor.actor_id,
        outbox=actor.outbox_uri,
        inbox=actor.inbox_uri,
        followers=actor.followers_uri,
        following=actor.following_uri,
        public_key=actor.public_key,
        public_key_name=actor.public_key_name,
        preferred_username=preferred_username,
        type=actor.profile.get("type", "Person"),
        name=actor.profile.get("name"),
        summary=actor.profile.get("summary"),
        url=actor.profile.get("url"),
        icon=actor.profile.get("image", actor.profile.get("icon")),
        properties={
            "attachment": attachments,
            "published": actor.created_at.isoformat(),
        },
    ).build(visibility=Visibility.OWNER)

    result["identifiers"] = sorted_identifiers
    result["endpoints"] = endpoints_object_from_actor_id(actor.actor_id)

    if attachments:
        result["@context"].append(property_value_context)

    return result

bovine_actor_for_actor_id async

bovine_actor_for_actor_id(
    actor_id: str,
) -> BovineActor | None

Uses the information stored in Credential to construct a bovine actor

Parameters:

Name Type Description Default
actor_id str
required

Returns:

Type Description
BovineActor | None
Source code in cattle_grid/activity_pub/actor/__init__.py
async def bovine_actor_for_actor_id(actor_id: str) -> BovineActor | None:
    """Uses the information stored in [Credential][cattle_grid.activity_pub.models.Credential] to construct a bovine actor

    :params actor_id:
    :returns:
    """
    credential = await Credential.get_or_none(actor_id=actor_id)

    if credential is None:
        return None

    return BovineActor(
        public_key_url=credential.identifier,
        actor_id=actor_id,
        secret=credential.secret,
    )

compute_acct_uri

compute_acct_uri(base_url: str, preferred_username: str)

Computes the acct uri

>>> compute_acct_uri("http://host.example/somewhere", "alice")
'acct:alice@host.example'
Source code in cattle_grid/activity_pub/actor/__init__.py
def compute_acct_uri(base_url: str, preferred_username: str):
    """Computes the acct uri

    ```pycon
    >>> compute_acct_uri("http://host.example/somewhere", "alice")
    'acct:alice@host.example'

    ```

    """
    host = urlparse(base_url).hostname

    return f"acct:{preferred_username}@{host}"

create_actor async

create_actor(
    base_url: str,
    preferred_username: str | None = None,
    identifiers: dict = {},
    profile: dict = {},
)

Creates a new actor in the database

Source code in cattle_grid/activity_pub/actor/__init__.py
async def create_actor(
    base_url: str,
    preferred_username: str | None = None,
    identifiers: dict = {},
    profile: dict = {},
):
    """Creates a new actor in the database"""

    public_key, private_key = generate_rsa_public_private_key()
    public_key_name = "legacy-key-1"
    actor_id = new_url(base_url, "actor")

    if preferred_username:
        if "webfinger" in identifiers:
            raise ValueError("webfinger key set in identifiers")
        identifiers = {
            **identifiers,
            "webfinger": compute_acct_uri(base_url, preferred_username),
        }

    if "activitypub_id" not in identifiers:
        identifiers = {**identifiers, "activitypub_id": actor_id}

    identifier_already_exists = await identifier_in_list_exists(
        list(identifiers.values())
    )

    if identifier_already_exists:
        raise DuplicateIdentifierException("identifier already exists")

    actor = await Actor.create(
        actor_id=actor_id,
        inbox_uri=new_url(base_url, "inbox"),
        outbox_uri=new_url(base_url, "outbox"),
        following_uri=new_url(base_url, "following"),
        followers_uri=new_url(base_url, "followers"),
        public_key_name=public_key_name,
        public_key=public_key,
        profile=profile,
        automatically_accept_followers=False,
    )
    await Credential.create(
        actor_id=actor_id,
        identifier=f"{actor_id}#{public_key_name}",
        secret=private_key,
    )

    for name, identifier in identifiers.items():
        await PublicIdentifier.create(actor=actor, name=name, identifier=identifier)

    logging.info("Created actor with id '%s'", actor_id)

    await actor.fetch_related("identifiers")

    return actor

delete_actor async

delete_actor(actor: Actor)

Deletes an actor

Parameters:

Name Type Description Default
actor Actor

Actor to be deleted

required
Source code in cattle_grid/activity_pub/actor/__init__.py
async def delete_actor(actor: Actor):
    """Deletes an actor

    :param actor: Actor to be deleted
    """

    # await Credential.filter(actor_id=actor.actor_id).delete()
    await PublicIdentifier.filter(actor=actor).delete()

    actor.status = ActorStatus.deleted
    await actor.save()

delete_for_actor_profile

delete_for_actor_profile(actor: Actor) -> dict

Creates a delete activity for the Actor

Source code in cattle_grid/activity_pub/actor/__init__.py
def delete_for_actor_profile(actor: Actor) -> dict:
    """Creates a delete activity for the Actor"""

    actor_profile = actor_to_object(actor)
    activity_factory, _ = factories_for_actor_object(actor_profile)

    result = (
        activity_factory.delete(
            actor_profile.get("id"), followers=actor_profile["followers"]
        )
        .as_public()
        .build()
    )

    result["cc"].append(actor_profile["following"])

    return result

followers_for_actor async

followers_for_actor(actor: Actor) -> List[str]

Returns the list of accepted followers

Parameters:

Name Type Description Default
actor Actor
required

Returns:

Type Description
List[str]
Source code in cattle_grid/activity_pub/actor/__init__.py
async def followers_for_actor(actor: Actor) -> List[str]:
    """Returns the list of accepted followers

    :param actor:
    :returns:
    """

    await actor.fetch_related("followers")
    return [x.follower for x in actor.followers if x.accepted]

following_for_actor async

following_for_actor(actor: Actor) -> List[str]

Returns the list of accepted people to follow said actor. This is the following table.

Parameters:

Name Type Description Default
actor Actor
required

Returns:

Type Description
List[str]
Source code in cattle_grid/activity_pub/actor/__init__.py
async def following_for_actor(actor: Actor) -> List[str]:
    """Returns the list of accepted people to follow said actor.
    This is the following table.

    :param actor:
    :returns:
    """

    await actor.fetch_related("following")
    return [x.following for x in actor.following if x.accepted]

is_valid_requester async

is_valid_requester(requester: str, actor: Actor, obj: dict)

Checks if the requested is allowed to view the object

Source code in cattle_grid/activity_pub/actor/__init__.py
async def is_valid_requester(requester: str, actor: Actor, obj: dict):
    """Checks if the requested is allowed to view the object"""

    blocked = await Blocking.get_or_none(blocking=requester, actor=actor, active=True)

    if blocked:
        return False

    if is_public(obj):
        return True

    recipients = recipients_for_object(obj)
    self_delete = actor_deletes_themselves(obj)

    recipients = await update_recipients_for_actor(actor, recipients, self_delete)

    valid_requesters = recipients

    if "actor" in obj:
        valid_requesters = valid_requesters | {obj["actor"]}
    if "attributedTo" in obj:
        valid_requesters = valid_requesters | {obj["attributedTo"]}

    return requester in valid_requesters

is_valid_requester_for_obj async

is_valid_requester_for_obj(requester: str, obj: dict)

Checks if the requested is allowed to view the object

Source code in cattle_grid/activity_pub/actor/__init__.py
async def is_valid_requester_for_obj(requester: str, obj: dict):
    """Checks if the requested is allowed to view the object"""

    actor_id = obj.get("attributedTo")
    if actor_id is None:
        actor_id = obj.get("actor")
    if actor_id is None:
        raise ActorNotFound("Object does not have an actor or attributedTo")

    actor = await Actor.get_or_none(actor_id=actor_id)
    if actor is None:
        raise ActorNotFound("Actor not found")

    blocked = await Blocking.get_or_none(blocking=requester, actor=actor, active=True)

    if blocked:
        return False

    if is_public(obj):
        return True

    recipients = recipients_for_object(obj)
    self_delete = actor_deletes_themselves(obj)

    recipients = await update_recipients_for_actor(actor, recipients, self_delete)

    valid_requesters = recipients

    if "actor" in obj:
        valid_requesters = valid_requesters | {obj["actor"]}
    if "attributedTo" in obj:
        valid_requesters = valid_requesters | {obj["attributedTo"]}

    return requester in valid_requesters

remove_from_followers_following async

remove_from_followers_following(actor_id_to_remove: str)

Removes actor_id from all occurring followers and following

Source code in cattle_grid/activity_pub/actor/__init__.py
async def remove_from_followers_following(actor_id_to_remove: str):
    """Removes actor_id from all occurring followers and following"""

    await Follower.filter(follower=actor_id_to_remove).delete()
    await Following.filter(following=actor_id_to_remove).delete()

update_for_actor_profile

update_for_actor_profile(actor: Actor) -> dict

Creates an update for the Actor

Source code in cattle_grid/activity_pub/actor/__init__.py
def update_for_actor_profile(actor: Actor) -> dict:
    """Creates an update for the Actor"""

    actor_profile = actor_to_object(actor)
    activity_factory, _ = factories_for_actor_object(actor_profile)

    return (
        activity_factory.update(actor_profile, followers=actor_profile["followers"])
        .as_public()
        .build()
    )

update_recipients_for_actor async

update_recipients_for_actor(
    actor, recipients, self_delete=False
)

Updates set of recipients by removing the followers and following collections, and replacing them with the actual sets.

The following collecting is only allowed for self delete activities.

Source code in cattle_grid/activity_pub/actor/__init__.py
async def update_recipients_for_actor(actor, recipients, self_delete=False):
    """Updates set of recipients by removing the followers and following collections, and replacing
    them with the actual sets.

    The following collecting is only allowed for self delete activities.
    """
    if actor.followers_uri in recipients:
        recipients = recipients - {actor.followers_uri} | set(
            await followers_for_actor(actor)
        )

        logger.info("Got recipients %s after handling followers", ", ".join(recipients))

    if actor.following_uri in recipients:
        recipients = recipients - {actor.following_uri}

        if self_delete:
            recipients = recipients | set(await following_for_actor(actor))
        else:
            logger.warning(
                "Actor '%s' included following collection in recipients where not allowed",
                actor.actor_id,
            )

    return recipients

helper

endpoints_object_from_actor_id

endpoints_object_from_actor_id(actor_id: str) -> dict

Returns the endpoints object of the actor identified by actor_id

>>> endpoints_object_from_actor_id("http://host.test/actor/someId")
{'sharedInbox': 'http://host.test/shared_inbox'}
Source code in cattle_grid/activity_pub/actor/helper.py
def endpoints_object_from_actor_id(actor_id: str) -> dict:
    """Returns the endpoints object of the actor identified by actor_id

    ```pycon
    >>> endpoints_object_from_actor_id("http://host.test/actor/someId")
    {'sharedInbox': 'http://host.test/shared_inbox'}

    ```
    """
    return {"sharedInbox": shared_inbox_from_actor_id(actor_id)}

shared_inbox_from_actor_id

shared_inbox_from_actor_id(actor_id: str) -> str

Returns the shared inbox of the actor identified by actor_id

>>> shared_inbox_from_actor_id("http://host.test/actor/someId")
'http://host.test/shared_inbox'
Source code in cattle_grid/activity_pub/actor/helper.py
def shared_inbox_from_actor_id(actor_id: str) -> str:
    """Returns the shared inbox of the actor identified by actor_id

    ```pycon
    >>> shared_inbox_from_actor_id("http://host.test/actor/someId")
    'http://host.test/shared_inbox'

    ```
    """

    return urljoin(actor_id, "/shared_inbox")

identifiers

determine_preferred_username

determine_preferred_username(
    identifiers: List[str], actor_id: str
) -> str | None

Determine the preferred username from the sorted identifiers. The result is the name of the first acct-uri whose domain matches the actor id.

>>> determine_preferred_username(["acct:alice@other.example",
...     "acct:alice@actor.example"], "http://actor.example/actor")
'alice'
Source code in cattle_grid/activity_pub/actor/identifiers.py
def determine_preferred_username(identifiers: List[str], actor_id: str) -> str | None:
    """Determine the preferred username from the sorted identifiers.
    The result is the name of the first acct-uri whose domain matches
    the actor id.

    ```pycon
    >>> determine_preferred_username(["acct:alice@other.example",
    ...     "acct:alice@actor.example"], "http://actor.example/actor")
    'alice'

    ```
    """
    actor_domain = urlparse(actor_id).netloc
    for identifier in identifiers:
        if identifier.startswith("acct:"):
            handle, domain = identifier.removeprefix("acct:").split("@")
            if domain == actor_domain:
                return handle

    return None

cattle_grid.activity_pub.enqueuer

determine_activity_type

determine_activity_type(activity: dict) -> str | None

Determines the type of an activity

>>> determine_activity_type({"type": "Follow"})
'Follow'
>>> determine_activity_type({}) is None
True

In the case of multiple types, these are concatenated. This means that they are probably missed by processing, but don’t get ignored.

>>> determine_activity_type({"type": ["Follow", "WhileSkipping"]})
'FollowWhileSkipping'

Parameters:

Name Type Description Default
activity dict
required

Returns:

Type Description
str | None
Source code in cattle_grid/activity_pub/enqueuer.py
def determine_activity_type(activity: dict) -> str | None:
    """Determines the type of an activity

    ```pycon
    >>> determine_activity_type({"type": "Follow"})
    'Follow'

    ```


    ```pycon
    >>> determine_activity_type({}) is None
    True

    ```

    In the case of multiple types, these are concatenated. This means
    that they are probably missed by processing, but don't get ignored.

    ```pycon
    >>> determine_activity_type({"type": ["Follow", "WhileSkipping"]})
    'FollowWhileSkipping'

    ```

    :params activity:
    :returns:

    """

    activity_type = activity.get("type")
    if activity_type is None:
        return None
    if isinstance(activity_type, list):
        activity_type = "".join(activity_type)

    return activity_type

enqueue_from_inbox async

enqueue_from_inbox(
    broker: RabbitBroker,
    exchange: RabbitExchange,
    receiving_actor_id: str,
    content: dict,
)

Enqueues a new message arrived from the inbox

The routing key will be incoming.${activity_type}

Source code in cattle_grid/activity_pub/enqueuer.py
async def enqueue_from_inbox(
    broker: RabbitBroker,
    exchange: RabbitExchange,
    receiving_actor_id: str,
    content: dict,
):
    """Enqueues a new message arrived from the inbox

    The routing key will be `incoming.${activity_type}`
    """
    activity_type = determine_activity_type(content)
    if activity_type is None:
        return

    msg = ActivityMessage(actor=receiving_actor_id, data=content)

    await broker.publish(
        msg, exchange=exchange, routing_key=f"incoming.{activity_type}"
    )

enqueue_from_shared_inbox async

enqueue_from_shared_inbox(
    broker: RabbitBroker,
    exchange: RabbitExchange,
    content: dict,
)

Enqueues a new message arrived from the inbox

The routing key will be incoming.${activity_type}

Source code in cattle_grid/activity_pub/enqueuer.py
async def enqueue_from_shared_inbox(
    broker: RabbitBroker,
    exchange: RabbitExchange,
    content: dict,
):
    """Enqueues a new message arrived from the inbox

    The routing key will be `incoming.${activity_type}`
    """
    activity_type = determine_activity_type(content)
    if activity_type is None:
        return

    msg = SharedInboxMessage(data=content)

    await broker.publish(msg, exchange=exchange, routing_key="shared_inbox")