Skip to content

.activity_pub.server

cattle_grid.activity_pub.server

This package contains the top-level router for everything needed to connect to the Fediverse, in particular the .well-known endpoints.

JrdResponse

Bases: JSONResponse

Response that ensures the content-type is “application/jrd+json”

Source code in cattle_grid/activity_pub/server/__init__.py
class JrdResponse(JSONResponse):
    """JSON response whose media type is fixed to
    "application/jrd+json".

    Pass it as `response_class` to a route so that the route's
    response is sent with that content-type.
    """

    media_type = "application/jrd+json"

nodeinfo_data_responder async

nodeinfo_data_responder() -> NodeInfo

Returns the information according to the nodeinfo spec

Source code in cattle_grid/activity_pub/server/__init__.py
@router.get("/.well-known/nodeinfo_2.0", response_class=JrdResponse)
async def nodeinfo_data_responder() -> NodeInfo:
    """Builds the nodeinfo document for this server.

    The software entry is named "cattle-grid" and carries the
    package's `__version__`; no other field is set explicitly.
    """
    software = Software(name="cattle-grid", version=__version__)
    return NodeInfo(software=software)  # type: ignore

webfinger_responder async

webfinger_responder(
    resource: str, session: SqlSession
) -> JrdData

Handles requests to .well-known/webfinger. Results are determined by the identifier property of PublicIdentifier matching the resource parameter.

See RFC 7033 WebFinger.

Source code in cattle_grid/activity_pub/server/__init__.py
@router.get("/.well-known/webfinger", response_class=JrdResponse)
async def webfinger_responder(resource: str, session: SqlSession) -> JrdData:
    """Handles requests to .well-known/webfinger. Results are determined
    by the identifier property of [PublicIdentifier][cattle_grid.database.activity_pub_actor.PublicIdentifier] matching the resource
    parameter.

    The response is sent with the content-type "application/jrd+json",
    the media type specified for JRD documents in
    [RFC 7033 WebFinger](https://www.rfc-editor.org/rfc/rfc7033).

    Args:
        resource: Value of the `resource` query parameter; compared for
            exact equality with the stored identifier.
        session: Database session used for the lookup.

    Returns:
        The result of `webfinger_response` for the matching identifier
        and the actor id of the actor it belongs to.

    Raises:
        HTTPException: with status 404 if no PublicIdentifier matches
            the resource.
    """

    logger.info("looking up web finger for resource '%s'", resource)

    pi = await session.scalar(
        select(PublicIdentifier).where(PublicIdentifier.identifier == resource)
    )

    # scalar() yields None when the query produced no row
    if pi is None:
        raise HTTPException(status_code=404, detail="Item not found")

    return webfinger_response(pi.identifier, pi.actor.actor_id)

router

ActivityPub related functionality

actor_profile async

actor_profile(
    headers: ActivityPubHeaders, session: SqlSession
)

Returns the actor

Source code in cattle_grid/activity_pub/server/router.py
@ap_router.get("/actor/{id_str}", response_class=ActivityResponse)
async def actor_profile(headers: ActivityPubHeaders, session: SqlSession):
    """Serves the actor whose actor_id equals the x_ap_location header.

    If x_cattle_grid_should_serve is "html", the request is answered
    with a redirect to the actor's html url: 404 if there is no such
    actor, 406 if no html url could be extracted. Otherwise the
    request is passed through `validate_request` and the actor is
    returned as an object.
    """
    location = headers.x_ap_location
    logger.debug("Request for actor at %s", location)

    query = (
        select(Actor)
        .where(Actor.actor_id == location)
        .options(joinedload(Actor.identifiers))
    )
    actor = await session.scalar(query)

    if headers.x_cattle_grid_should_serve != "html":
        validated = await validate_request(
            session, actor, headers.x_cattle_grid_requester
        )
        return actor_to_object(validated)

    # html was requested: redirect or fail, never serve the object
    if not actor:
        raise HTTPException(404)

    redirect_target = extract_html_url(actor)
    if not redirect_target:
        raise HTTPException(406)

    return RedirectResponse(redirect_target)

followers async

followers(
    id_str, headers: ActivityPubHeaders, session: SqlSession
)

Returns the followers

Source code in cattle_grid/activity_pub/server/router.py
@ap_router.get("/followers/{id_str}", response_class=ActivityResponse)
async def followers(id_str, headers: ActivityPubHeaders, session: SqlSession):
    """Serves the followers collection found at x_ap_location.

    The actor is looked up by its followers_uri and passed through
    `validate_request` together with the requester. The result of
    `followers_for_actor` is returned as the items of an ordered
    collection whose id is x_ap_location.
    """
    collection_uri = headers.x_ap_location
    owner_query = select(Actor).where(Actor.followers_uri == collection_uri)

    owner = await validate_request(
        session,
        await session.scalar(owner_query),
        headers.x_cattle_grid_requester,
    )

    follower_items = list(await followers_for_actor(session, owner))
    return OrderedCollection(id=collection_uri, items=follower_items).build()

following async

following(
    id_str, headers: ActivityPubHeaders, session: SqlSession
)

Returns the following

Source code in cattle_grid/activity_pub/server/router.py
@ap_router.get("/following/{id_str}", response_class=ActivityResponse)
async def following(id_str, headers: ActivityPubHeaders, session: SqlSession):
    """Serves the following collection found at x_ap_location.

    The actor is looked up by its following_uri and passed through
    `validate_request` together with the requester. The result of
    `following_for_actor` is returned as the items of an ordered
    collection whose id is x_ap_location.
    """
    collection_uri = headers.x_ap_location
    owner_query = select(Actor).where(Actor.following_uri == collection_uri)

    owner = await validate_request(
        session,
        await session.scalar(owner_query),
        headers.x_cattle_grid_requester,
    )

    followed_items = list(await following_for_actor(session, owner))
    return OrderedCollection(id=collection_uri, items=followed_items).build()

outbox async

outbox(headers: ActivityPubHeaders, session: SqlSession)

Returns an empty ordered collection as outbox

Source code in cattle_grid/activity_pub/server/router.py
@ap_router.get("/outbox/{id_str}", response_class=ActivityResponse)
async def outbox(headers: ActivityPubHeaders, session: SqlSession):
    """Serves the outbox found at x_ap_location as an ordered
    collection without items.

    The actor is looked up by its outbox_uri and passed through
    `validate_request` together with the requester before the
    collection is built.
    """
    location = headers.x_ap_location
    candidate = await session.scalar(
        select(Actor).where(Actor.outbox_uri == location)
    )

    # The returned actor is not needed for building the collection
    await validate_request(session, candidate, headers.x_cattle_grid_requester)

    return OrderedCollection(id=location).build()

router_inbox

ActivityPub related functionality

APHeadersWithDigest

Bases: APHeaders

Extends APHeaders with the digest headers

Parameters:

Name Type Description Default
x_cattle_grid_requester str | None

URI of the actor making the request

None
x_cattle_grid_should_serve str | None

Type of content cattle_grid should serve

None
x_ap_location str

URI of the resource being retrieved

required
digest str | None

Legacy digest

None
content_digest str | None

Digest according to RFC 9530 Digest Fields

None
Source code in cattle_grid/activity_pub/server/router_inbox.py
class APHeadersWithDigest(APHeaders):
    """Extends APHeaders with the two digest headers.

    Both fields are optional and default to None.
    """

    digest: str | None = None
    """Legacy digest"""
    content_digest: str | None = None
    """Digest according to [RFC 9530 Digest Fields](https://www.rfc-editor.org/rfc/rfc9530.html)"""
content_digest class-attribute instance-attribute
content_digest: str | None = None

Digest according to RFC 9530 Digest Fields

digest class-attribute instance-attribute
digest: str | None = None

Legacy digest

inbox async

inbox(
    id_str,
    request: Request,
    headers: Annotated[APHeadersWithDigest, Header()],
    broker: Broker,
    exchange: InternalExchange,
    session: SqlSession,
)

Processes an inbox message

Source code in cattle_grid/activity_pub/server/router_inbox.py
@ap_router_inbox.post("/inbox/{id_str}", status_code=202)
async def inbox(
    id_str,
    request: Request,
    headers: Annotated[APHeadersWithDigest, Header()],
    broker: Broker,
    exchange: InternalExchange,
    session: SqlSession,
):
    """Processes an inbox message

    The receiving actor is looked up by its inbox_uri matching
    x_ap_location and passed through `validate_request` together with
    the requester. The raw body is then checked with
    `validate_digest_header`, parsed with `parse_body`, and handed to
    `enqueue_from_inbox` for the receiving actor.

    Args:
        id_str: Path parameter of the inbox route; not used in the body.
        request: Incoming request, its raw body is read.
        headers: Request headers including the digest headers.
        broker: Passed on to `enqueue_from_inbox`.
        exchange: Passed on to `enqueue_from_inbox`.
        session: Database session used for the actor lookup.

    Returns:
        An empty string; the route answers with status code 202.

    Raises:
        HTTPException: with status 401 if the "actor" of the message
            differs from x_cattle_grid_requester, with status 422 if
            processing fails with any non-HTTP exception. HTTP
            exceptions raised while processing are passed through
            unchanged.
    """
    logger.info("Got incoming request")
    actor = await session.scalar(
        select(Actor).where(Actor.inbox_uri == headers.x_ap_location)
    )
    actor = await validate_request(session, actor, headers.x_cattle_grid_requester)

    try:
        raw_body = await request.body()
        validate_digest_header(headers, raw_body)
        data = parse_body(raw_body)

        # The message must be sent on behalf of the requester itself
        if data.get("actor") != headers.x_cattle_grid_requester:
            raise HTTPException(401)

        await enqueue_from_inbox(broker, exchange, actor.actor_id, data)
    except HTTPException:
        # Deliberate HTTP errors keep their status code
        raise
    except Exception as e:
        # Logs the message together with the traceback in one record
        logger.exception("Processing post request failed with %s", e)
        raise HTTPException(422) from e

    return ""

router_object

ActivityPub related functionality

return_object async

return_object(
    obj_id, headers: ActivityPubHeaders, session: SqlSession
)

Returns the stored activities

Source code in cattle_grid/activity_pub/server/router_object.py
@ap_router_object.get("/object/{obj_id}", response_class=ActivityResponse)
async def return_object(obj_id, headers: ActivityPubHeaders, session: SqlSession):
    """Serves the data of the stored activity with id `obj_id`.

    Answers with 404 if there is no such stored activity, if its data
    is not a dict, or if the "id" in the data differs from
    x_ap_location. Answers with 401 if no requester is given or
    `is_valid_requester` rejects the requester.
    """
    stored = await session.scalar(
        select(StoredActivity).where(StoredActivity.id == obj_id)
    )

    # A missing row leaves payload as None, which fails the dict check
    payload = None if stored is None else stored.data
    if not isinstance(payload, dict) or payload.get("id") != headers.x_ap_location:
        raise HTTPException(404)

    requester = headers.x_cattle_grid_requester
    # Short-circuit: is_valid_requester only runs when a requester is given
    if requester is None or not await is_valid_requester(
        session, requester, stored.actor, payload
    ):
        raise HTTPException(401)

    return payload