Skip to content

cattle_grid.activity_pub

These are the methods from the cattle_grid.activity_pub package that are hopefully safe to use when building stuff, and not subject to change.

DuplicateIdentifierException

Bases: Exception

Raised if an identifier already exists and one tries to create an actor with it

Source code in cattle_grid/activity_pub/actor/__init__.py
class DuplicateIdentifierException(Exception):
    """Raised if an identifier already exists and one tries to create an actor with it"""

actor_to_object

actor_to_object(actor: Actor) -> dict

Transform the actor to an object

Warning

The actor.identifiers needs to be fetched from the database for this to work properly

Source code in cattle_grid/activity_pub/actor/transform.py
def actor_to_object(actor: Actor) -> dict:
    """Transform the actor to an object

    !!! warning
        The `actor.identifiers` needs to be fetched from the database for this to work properly

    """

    sorted_identifiers = collect_identifiers_for_actor(actor)

    preferred_username = determine_preferred_username(
        sorted_identifiers, actor.actor_id
    )
    attachments = actor.profile.get("attachment")
    result = AsActor(
        id=actor.actor_id,
        outbox=actor.outbox_uri,
        inbox=actor.inbox_uri,
        followers=actor.followers_uri,
        following=actor.following_uri,
        public_key=actor.public_key,
        public_key_name=actor.public_key_name,
        preferred_username=preferred_username,
        type=actor.profile.get("type", "Person"),
        name=actor.profile.get("name"),
        summary=actor.profile.get("summary"),
        url=actor.profile.get("url"),
        icon=actor.profile.get("image", actor.profile.get("icon")),
        properties={
            "attachment": attachments,
            "published": actor.created_at.isoformat(),
        },
    ).build(visibility=Visibility.OWNER)

    result["identifiers"] = sorted_identifiers
    result["endpoints"] = endpoints_object_from_actor_id(actor.actor_id)

    result["@context"].append(
        {"manuallyApprovesFollowers": "as:manuallyApprovesFollowers"}
    )
    result["manuallyApprovesFollowers"] = not actor.automatically_accept_followers

    if attachments:
        result["@context"].append(property_value_context)

    return result

compute_acct_uri

compute_acct_uri(base_url: str, preferred_username: str)

Computes the acct uri (see RFC 7565)

>>> compute_acct_uri("http://host.example/somewhere", "alice")
'acct:alice@host.example'
Source code in cattle_grid/activity_pub/actor/helper.py
def compute_acct_uri(base_url: str, preferred_username: str):
    """Computes the acct uri (see [RFC 7565](https://www.rfc-editor.org/rfc/rfc7565))

    ```pycon
    >>> compute_acct_uri("http://host.example/somewhere", "alice")
    'acct:alice@host.example'

    ```

    """
    host = urlparse(base_url).hostname

    return f"acct:{preferred_username}@{host}"

create_actor async

create_actor(
    session: AsyncSession,
    base_url: str,
    preferred_username: str | None = None,
    identifiers: dict = {},
    profile: dict = {},
    automatically_accept_followers: bool = False,
)

Creates a new actor in the database

Source code in cattle_grid/activity_pub/actor/__init__.py
async def create_actor(
    session: AsyncSession,
    base_url: str,
    preferred_username: str | None = None,
    identifiers: dict = {},
    profile: dict = {},
    automatically_accept_followers: bool = False,
):
    """Creates a new actor in the database"""

    public_key, private_key = generate_rsa_public_private_key()
    public_key_name = "legacy-key-1"
    actor_id = new_url(base_url, "actor")

    if preferred_username:
        if "webfinger" in identifiers:
            raise ValueError("webfinger key set in identifiers")
        identifiers = {
            **identifiers,
            "webfinger": compute_acct_uri(base_url, preferred_username),
        }

    if "activitypub_id" not in identifiers:
        identifiers = {**identifiers, "activitypub_id": actor_id}

    identifier_already_exists = await identifier_in_list_exists(
        session, list(identifiers.values())
    )

    if identifier_already_exists:
        raise DuplicateIdentifierException("identifier already exists")

    actor = Actor(
        actor_id=actor_id,
        inbox_uri=new_url(base_url, "inbox"),
        outbox_uri=f"{actor_id}/outbox",
        following_uri=f"{actor_id}/following",
        followers_uri=f"{actor_id}/followers",
        public_key_name=public_key_name,
        public_key=public_key,
        profile={**profile},
        automatically_accept_followers=automatically_accept_followers,
        status=ActorStatus.active,
    )
    session.add(actor)

    for name, identifier in identifiers.items():
        session.add(
            PublicIdentifier(
                actor=actor,
                name=name,
                identifier=identifier,
                status=PublicIdentifierStatus.verified,
            )
        )

    credential = Credential(
        actor_id=actor_id,
        identifier=f"{actor_id}#{public_key_name}",
        secret=private_key,
    )
    session.add(credential)

    logging.info("Created actor with id '%s'", actor_id)

    await session.commit()
    await session.refresh(actor, attribute_names=["identifiers"])

    return actor

identifier_exists async

identifier_exists(
    session: AsyncSession, identifier: str
) -> bool

Checks if the identifier already exists

Source code in cattle_grid/activity_pub/actor/identifiers.py
async def identifier_exists(session: AsyncSession, identifier: str) -> bool:
    """Checks if the identifier already exists"""

    return await identifier_in_list_exists(session, [identifier])

is_valid_requester_for_obj async

is_valid_requester_for_obj(
    session: AsyncSession, requester: str, obj: dict
)

Checks if the requested is allowed to view the object

Source code in cattle_grid/activity_pub/actor/requester.py
async def is_valid_requester_for_obj(session: AsyncSession, requester: str, obj: dict):
    """Checks if the requested is allowed to view the object"""

    actor_id = obj.get("attributedTo")
    if actor_id is None:
        actor_id = obj.get("actor")
    if actor_id is None:
        raise ActorNotFound("Object does not have an actor or attributedTo")

    actor = await session.scalar(select(Actor).where(Actor.actor_id == actor_id))
    if actor is None:
        raise ActorNotFound("Actor not found")

    return await is_valid_requester(session, requester, actor, obj)