Skip to content

.activity_pub.processing

cattle_grid.activity_pub.processing

incoming

incoming_accept_activity async

incoming_accept_activity(
    message: ActivityMessage,
    actor: MessageActor,
    broker=Context(),
)

Handles an incoming Accept activity

Source code in cattle_grid/activity_pub/processing/incoming.py
async def incoming_accept_activity(
    message: ActivityMessage, actor: MessageActor, broker=Context()
):
    """Mark a previously sent follow request as accepted.

    The follow request is identified by the id of the Accept activity's
    `object`. When no `Following` entry is stored for that id, a warning
    is logged and nothing is changed.
    """
    follow_id = id_for_object(message.data.get("object"))
    entry = await Following.get_or_none(request=follow_id)

    if entry:
        entry.accepted = True
        await entry.save()

        logger.info("Processed follow request %s (following)", follow_id)
        return

    logger.warning("Follow request with id '%s' not found", follow_id)

incoming_block_activity async

incoming_block_activity(
    message: ActivityMessage,
    actor: MessageActor,
    broker=Context(),
)

Handles an incoming Block activity

Source code in cattle_grid/activity_pub/processing/incoming.py
async def incoming_block_activity(
    message: ActivityMessage, actor: MessageActor, broker=Context()
):
    """Handles an incoming Block activity

    When the actor sending the Block is one the receiving actor follows,
    the receiving actor's `Following` entry for it is deleted.

    The activity is ignored, with a warning, when its `object` is not the
    receiving actor.
    """
    current_actor_id = message.data.get("object")
    if current_actor_id != actor.actor_id:
        logger.warning("Mismatch of actor and target of block")
        return

    actor_blocking = message.data.get("actor")

    # Restrict the lookup to the receiving actor's own entry. Without the
    # `actor` filter the query could match (and delete) the entry of a
    # different local actor following the same remote actor, or fail
    # because several entries match.
    following = await Following.get_or_none(actor=actor, following=actor_blocking)

    if not following:
        return

    await following.delete()

incoming_follow_request async

incoming_follow_request(
    message: ActivityMessage,
    actor: MessageActor,
    factories: FactoriesForActor,
    broker=Context(),
)

For an incoming Follow request, an entry is created in the Follower table with accepted set to False.

If the actor automatically accepts followers, the actor sends an Accept activity to the actor requesting to follow it.

Source code in cattle_grid/activity_pub/processing/incoming.py
async def incoming_follow_request(
    message: ActivityMessage,
    actor: MessageActor,
    factories: FactoriesForActor,
    broker=Context(),
):
    """
    Record an incoming Follow request in the Follower table with
    `accepted` set to False.

    Requests whose `object` is missing or is not the receiving actor are
    ignored. If the actor automatically accepts followers, an Accept
    activity is built and published with the `store_activity` routing key.
    """
    activity = message.data

    target = activity.get("object")
    if isinstance(target, dict):
        # Embedded object: only its id matters here.
        target = target.get("id")

    addressed_to_actor = target is not None and target == actor.actor_id
    if not addressed_to_actor:
        return

    request_id = activity.get("id")
    follower = activity.get("actor")

    await Follower.update_or_create(
        actor=actor,
        follower=follower,
        defaults={"request": request_id, "accepted": False},
    )

    if not actor.automatically_accept_followers:
        logger.info("Got follow request from %s with id %s", follower, request_id)
        return

    activity_factory = factories[0]
    accept = activity_factory.accept(activity).build()

    await broker.publish(
        StoreActivityMessage(actor=actor.actor_id, data=accept),
        routing_key="store_activity",
        exchange=global_container.internal_exchange,
    )
    logger.info(
        "Got follow request from %s with id %s (auto accepted)",
        follower,
        request_id,
    )

incoming_reject_activity async

incoming_reject_activity(
    message: ActivityMessage,
    actor: MessageActor,
    broker=Context(),
)

Handles an incoming Reject activity

Source code in cattle_grid/activity_pub/processing/incoming.py
async def incoming_reject_activity(
    message: ActivityMessage, actor: MessageActor, broker=Context()
):
    """Handles an incoming Reject activity

    Deletes the `Following` entry whose follow request is being rejected.
    Nothing happens when the rejected request cannot be identified or no
    matching entry is stored.
    """
    reject_request = message.data

    # The rejected Follow may be given as a plain id or as an embedded
    # object; normalize it to an id, as the Accept and Undo handlers do.
    request_being_rejected = id_for_object(reject_request.get("object"))
    if request_being_rejected is None:
        # Without an id the lookup could match an entry stored without a
        # request id, which is unrelated to this activity.
        return

    following = await Following.get_or_none(request=request_being_rejected)

    if not following:
        return

    await following.delete()

incoming_undo_activity async

incoming_undo_activity(
    message: ActivityMessage,
    actor: MessageActor,
    broker=Context(),
)

Handles an incoming Undo activity

Source code in cattle_grid/activity_pub/processing/incoming.py
async def incoming_undo_activity(
    message: ActivityMessage, actor: MessageActor, broker=Context()
):
    """Remove the follower whose follow request is being undone.

    The request is identified by the id of the Undo activity's `object`.
    Nothing happens when no `Follower` entry is stored for that id.
    """
    undone_id = id_for_object(message.data.get("object"))
    entry = await Follower.get_or_none(request=undone_id)

    if entry:
        await entry.delete()

outgoing

outgoing_accept_request async

outgoing_accept_request(
    message: ActivityMessage,
    actor: MessageActor,
    broker=Context(),
)

Handles an outgoing Accept activity

Source code in cattle_grid/activity_pub/processing/outgoing.py
async def outgoing_accept_request(
    message: ActivityMessage, actor: MessageActor, broker=Context()
):
    """Mark a received follow request as accepted.

    The follow request is identified by the id of the Accept activity's
    `object`. When no `Follower` entry is stored for that id, a warning
    is logged and nothing is changed.
    """
    follow_id = id_for_object(message.data.get("object"))
    entry = await Follower.get_or_none(request=follow_id)

    if entry:
        entry.accepted = True
        await entry.save()

        logger.info("Accepted follow request %s", follow_id)
        return

    logger.warning("Follow request with id '%s' not found", follow_id)

outgoing_block_activity async

outgoing_block_activity(
    message: ActivityMessage,
    actor: MessageActor,
    broker=Context(),
)

Handles an outgoing Block activity

Source code in cattle_grid/activity_pub/processing/outgoing.py
async def outgoing_block_activity(
    message: ActivityMessage, actor: MessageActor, broker=Context()
):
    """Handles an outgoing Block activity

    Removes the blocked actor from the blocking actor's followers, if
    present, and records the block in the `Blocking` table. A Block
    activity without an `id` is stored with the request id `"permanent"`
    and a warning is logged.
    """
    block_request = message.data
    actor_being_blocked = block_request.get("object")

    # Restrict the lookup to the blocking actor's own followers. Without
    # the `actor` filter the query could match (and delete) the follower
    # entry of a different local actor, or fail because several entries
    # match.
    follower = await Follower.get_or_none(actor=actor, follower=actor_being_blocked)

    if follower:
        await follower.delete()

    block_id = block_request.get("id", "permanent")

    if block_id == "permanent":
        logger.warning("%s permanently blocked %s", actor.actor_id, actor_being_blocked)

    await Blocking.create(
        actor=actor, blocking=actor_being_blocked, request=block_id, active=True
    )
    logger.info("Created block")
    logger.info("%s blocked %s", actor.actor_id, actor_being_blocked)

outgoing_follow_request async

outgoing_follow_request(
    message: ActivityMessage,
    actor: MessageActor,
    broker=Context(),
)

Handles an outgoing Follow request

Source code in cattle_grid/activity_pub/processing/outgoing.py
async def outgoing_follow_request(
    message: ActivityMessage, actor: MessageActor, broker=Context()
):
    """Record an outgoing Follow request as a not yet accepted `Following`.

    The actor to follow is taken from the activity's `object`, using its
    `id` when the object is embedded. Activities without a target are
    ignored.
    """
    activity = message.data

    target = activity.get("object")
    if isinstance(target, dict):
        target = target.get("id")

    if target is not None:
        logger.info("Send follow request to %s", target)

        pending = {"request": activity.get("id"), "accepted": False}
        await Following.update_or_create(
            actor=actor, following=target, defaults=pending
        )

outgoing_message_distribution async

outgoing_message_distribution(
    message: ActivityMessage, broker=Context()
)

Distributes the message to its recipients

Source code in cattle_grid/activity_pub/processing/outgoing.py
async def outgoing_message_distribution(message: ActivityMessage, broker=Context()):
    """Publish one `to_send` message per recipient of the activity.

    Recipients are read from the activity, the public addresses are
    removed, and the remaining set is passed through
    `update_recipients_for_collections` before publishing.
    """
    addressed = remove_public(recipients_for_object(message.data))

    logger.debug("Got recipients %s", ", ".join(addressed))

    targets = await update_recipients_for_collections(message, addressed)

    for target in targets:
        outgoing = ToSendMessage(actor=message.actor, data=message.data, target=target)
        await broker.publish(
            outgoing,
            exchange=global_container.internal_exchange,
            routing_key="to_send",
        )

outgoing_reject_activity async

outgoing_reject_activity(
    message: ActivityMessage,
    actor: MessageActor,
    broker=Context(),
)

Handles an outgoing Reject activity

Source code in cattle_grid/activity_pub/processing/outgoing.py
async def outgoing_reject_activity(
    message: ActivityMessage, actor: MessageActor, broker=Context()
):
    """Handles an outgoing Reject activity

    Deletes the `Follower` entry whose follow request is being rejected.
    Nothing happens when the rejected request cannot be identified or no
    matching entry is stored.
    """
    reject_request = message.data

    # The rejected Follow may be given as a plain id or as an embedded
    # object; normalize it to an id, as outgoing_accept_request does.
    request_being_rejected = id_for_object(reject_request.get("object"))
    if request_being_rejected is None:
        # Without an id the lookup could match an entry stored without a
        # request id, which is unrelated to this activity.
        return

    follower = await Follower.get_or_none(request=request_being_rejected)
    if follower:
        await follower.delete()

outgoing_undo_request async

outgoing_undo_request(
    message: ActivityMessage,
    actor: MessageActor,
    broker=Context(),
)

Handles an outgoing Undo activity

Source code in cattle_grid/activity_pub/processing/outgoing.py
async def outgoing_undo_request(
    message: ActivityMessage, actor: MessageActor, broker=Context()
):
    """Handles an outgoing Undo activity

    If the undone activity is a stored follow request, the corresponding
    `Following` entry is deleted. Otherwise, if it is a stored block, the
    `Blocking` entry is kept but marked as inactive.
    """
    undo_request = message.data

    # The undone activity may be given as a plain id or as an embedded
    # object; normalize it to an id, as outgoing_accept_request does.
    request_being_undone = id_for_object(undo_request.get("object"))
    if request_being_undone is None:
        # Without an id the lookups could match entries stored without a
        # request id, which are unrelated to this activity.
        return

    following = await Following.get_or_none(request=request_being_undone)
    if following:
        await following.delete()
        return

    blocking = await Blocking.get_or_none(request=request_being_undone)
    if blocking:
        blocking.active = False
        await blocking.save()

remote

fetch_object async

fetch_object(
    message: FetchMessage,
    actor: MessageBovineActor,
    lookup: LookupAnnotation,
)

Handles retrieving a remote object

Source code in cattle_grid/activity_pub/processing/remote.py
async def fetch_object(
    message: FetchMessage,
    actor: MessageBovineActor,
    lookup: LookupAnnotation,
):
    """Retrieve a remote object, preferring the result of `lookup`.

    When the lookup yields a result, it is returned directly. Otherwise
    the URI held by the lookup response is fetched through `actor`. If
    the lookup raises, the error is logged and the originally requested
    URI is fetched instead.
    """
    try:
        resolved = await lookup(Lookup(uri=message.uri, actor=message.actor))
        if resolved.result:
            return resolved.result
    except Exception as error:
        logger.error("Something went up with lookup")
        logger.exception(error)

        # Fall back to the URI that was originally requested.
        resolved = Lookup(uri=message.uri, actor=message.actor)

    return await actor.get(resolved.uri, fail_silently=True)

resolve_inbox async

resolve_inbox(actor, target)

Resolves the inbox of target for actor using a cache

Source code in cattle_grid/activity_pub/processing/remote.py
async def resolve_inbox(actor, target):
    """Return the inbox of `target`, consulting the `InboxLocation` cache.

    On a cache miss the target is fetched through `actor`; a discovered
    inbox is stored in the cache before it is returned. Returns None when
    the target cannot be fetched or has no `inbox` entry.
    """
    known = await InboxLocation.get_or_none(actor=target)
    if known:
        return known.inbox

    profile = await actor.get(target)
    inbox = profile.get("inbox") if profile else None

    if inbox is not None:
        await InboxLocation.update_or_create(actor=target, defaults={"inbox": inbox})

    return inbox

sending_message async

sending_message(
    message: ToSendMessage, actor: MessageBovineActor
)

Handles sending a message

Source code in cattle_grid/activity_pub/processing/remote.py
async def sending_message(
    message: ToSendMessage,
    actor: MessageBovineActor,
):
    """Post the message data to the inbox of the message's target.

    Nothing is sent when no inbox can be resolved for the target.
    """
    inbox = await resolve_inbox(actor, message.target)
    if not inbox:
        return

    response = await actor.post(inbox, message.data)
    logger.info("Got %s for sending to %s", str(response), inbox)

shared_inbox

handle_shared_inbox_message async

handle_shared_inbox_message(
    message: SharedInboxMessage,
    broker: RabbitBroker = Context(),
)

This method is used to handle incoming messages from the shared inbox.

Source code in cattle_grid/activity_pub/processing/shared_inbox.py
async def handle_shared_inbox_message(
    message: SharedInboxMessage,
    broker: RabbitBroker = Context(),
):
    """
    Distribute a message received on the shared inbox.

    The message is enqueued once for every actor that is either listed
    among the activity's recipients and present in the `Actor` table, or
    has an accepted `Following` entry for the sender. Messages without an
    `actor` are dropped.
    """
    recipients = recipients_for_object(message.data)

    sender = message.data.get("actor")
    if sender is None:
        return

    addressed_actors = await Actor.filter(actor_id__in=recipients).all()
    addressed_ids = {entry.actor_id for entry in addressed_actors}

    accepted_followings = await (
        Following.filter(following=sender, accepted=True)
        .prefetch_related("actor")
        .all()
    )
    following_ids = {entry.actor.actor_id for entry in accepted_followings}

    for target in addressed_ids | following_ids:
        await enqueue_from_inbox(
            broker, global_container.internal_exchange, target, message.data
        )

store_activity

store_activity_subscriber async

store_activity_subscriber(
    message: StoreActivityMessage,
    actor: MessageActor,
    broker=Context(),
)

This method is used internally to store activities generated by cattle_grid, e.g. an automatically generated Accept activity. Note that it assigns an id to the activity.

After storing the activity, it is published to the appropriate outgoing.* topic, which triggers sending the activity.

Source code in cattle_grid/activity_pub/processing/store_activity.py
async def store_activity_subscriber(
    message: StoreActivityMessage,
    actor: MessageActor,
    broker=Context(),
):
    """Store an activity generated by `cattle_grid` and trigger sending it.

    This is used internally, e.g. for an automatically generated Accept
    activity. A fresh id is assigned to the activity; it is built from
    the actor id without its last two path segments, followed by
    `/object/` and a new UUID.

    After storing the activity, it is published to the
    `outgoing.<type>` routing key, which triggers sending
    the activity.
    """
    prefix = "/".join(actor.actor_id.split("/")[:-2])
    identifier = str(uuid7())

    # The id is written into the message's own data dict.
    activity = message.data
    activity["id"] = f"{prefix}/object/{identifier}"
    routing_key = f"outgoing.{activity.get('type')}"

    await StoredActivity.create(
        id=identifier, actor=actor, data=activity, published=datetime.now()
    )

    payload = {"actor": actor.actor_id, "data": activity}
    await broker.publish(
        payload,
        routing_key=routing_key,
        exchange=global_container.internal_exchange,
    )

util

update_recipients_for_collections async

update_recipients_for_collections(
    msg: ActivityMessage, recipients: Set[str]
) -> Set[str]

Updates recipients with followers and following collection.

Source code in cattle_grid/activity_pub/processing/util.py
async def update_recipients_for_collections(
    msg: ActivityMessage, recipients: Set[str]
) -> Set[str]:
    """Update the recipients on behalf of the actor sending the message.

    Loads the sending actor from the database and delegates to
    `update_recipients_for_actor`, passing along whether the activity is
    the actor deleting themselves.
    """
    sender = await Actor.get(actor_id=msg.actor)

    return await update_recipients_for_actor(
        sender, recipients, self_delete=actor_deletes_themselves(msg.data)
    )