Skip to content

.exchange

cattle_grid.exchange

create_router

create_router(
    main_exchange: RabbitExchange | None = None,
    include_shovels=True,
) -> RabbitRouter

Creates a router to be used to manage users

Source code in cattle_grid/exchange/__init__.py
def create_router(
    main_exchange: RabbitExchange | None = None, include_shovels=True
) -> RabbitRouter:
    """Creates a router to be used to manage users

    The handlers for the routing keys `update_actor`, `delete_actor`,
    `send_message`, `fetch_object`, and `add_method_information` are
    subscribed on `main_exchange`. Each handler gets its own queue named
    `cattle_grid_<routing_key>`.

    :param main_exchange: exchange the handlers are bound to; when `None`,
        `global_container.exchange` is used
    :param include_shovels: if true, `incoming_shovel` and `outgoing_shovel`
        are additionally subscribed to `incoming.#` and `outgoing.#` on
        `global_container.internal_exchange`
    :returns: the configured router
    """

    if main_exchange is None:
        main_exchange = global_container.exchange

    router = RabbitRouter()

    if include_shovels:
        router.subscriber(
            RabbitQueue("incoming_shovel", routing_key="incoming.#"),
            global_container.internal_exchange,
        )(incoming_shovel)
        router.subscriber(
            RabbitQueue("outgoing_shovel", routing_key="outgoing.#"),
            global_container.internal_exchange,
        )(outgoing_shovel)

    # Each routing key equals the name of the method it exposes; the key is
    # reused for the queue name (with prefix) and the subscriber title.
    routing_config: List[Tuple[str, Any]] = [
        ("update_actor", update_actor),
        ("delete_actor", delete_actor_handler),
        ("send_message", send_message),
        ("fetch_object", fetch_object),
        ("add_method_information", add_method_information),
    ]

    prefix = "cattle_grid_"

    for routing_key, coroutine in routing_config:
        router.subscriber(
            RabbitQueue(prefix + routing_key, routing_key=routing_key),
            exchange=main_exchange,
            title=routing_key,
        )(coroutine)

    return router

actor_update

handle_actor_action async

handle_actor_action(
    actor: Actor, action: UpdateAction
) -> bool

Handles individual actions from the UpdateActor method

Source code in cattle_grid/exchange/actor_update/__init__.py
async def handle_actor_action(actor: Actor, action: UpdateAction) -> bool:
    """Dispatches one action of an UpdateActor request to its handler.

    The generic action is dumped and re-validated as the model expected
    by the handler for its action type before the handler is awaited.

    :param actor: the actor the action is applied to
    :param action: the action to perform
    :returns: True for the identifier and property value actions, False
        for a rename and for action types without a handler
    """
    kind = action.action

    if kind == UpdateActionType.add_identifier:
        await handle_add_identifier(
            actor, UpdateIdentifierAction.model_validate(action.model_dump())
        )
        return True

    if kind == UpdateActionType.create_identifier:
        await handle_create_identifier(
            actor, UpdateIdentifierAction.model_validate(action.model_dump())
        )
        return True

    if kind == UpdateActionType.update_identifier:
        await handle_update_identifier(
            actor, UpdateIdentifierAction.model_validate(action.model_dump())
        )
        return True

    if kind == UpdateActionType.rename:
        await handle_rename_actor(
            actor, RenameActorAction.model_validate(action.model_dump())
        )
        # NOTE(review): rename is the only handled action reporting False;
        # presumably intentional -- confirm.
        return False

    if kind == UpdateActionType.update_property_value:
        await handle_update_property_value(
            actor, UpdatePropertyValueAction.model_validate(action.model_dump())
        )
        return True

    if kind == UpdateActionType.remove_property_value:
        await handle_delete_property_value(
            actor, UpdatePropertyValueAction.model_validate(action.model_dump())
        )
        return True

    return False

identifiers

handle_add_identifier async
handle_add_identifier(
    actor: Actor, action: UpdateIdentifierAction
)

Adds an identifier to the actor

FIXME: Currently missing logic to verify identifier

Source code in cattle_grid/exchange/actor_update/identifiers.py
async def handle_add_identifier(actor: Actor, action: UpdateIdentifierAction):
    """Attaches an identifier to the actor.

    The identifier is stored with the name "through_exchange" and the
    status unverified. Its preference is 0 unless the action is flagged
    as primary, in which case `new_primary_preference` decides.

    FIXME: Currently missing logic to verify identifier

    :param actor: the actor receiving the identifier
    :param action: carries the identifier and the primary flag
    :raises ValueError: if the identifier belongs to one of the base urls
        returned by `base_urls_for_actor`
    """

    controlled_base_urls = await base_urls_for_actor(actor)
    new_identifier = action.identifier

    if is_identifier_for_a_base_url(new_identifier, controlled_base_urls):
        raise ValueError(
            "Cannot add an identifier for a controlled base_url use create instead"
        )

    # The identifiers relation is loaded before the preference is computed.
    await actor.fetch_related("identifiers")

    chosen_preference = new_primary_preference(actor) if action.primary else 0

    await PublicIdentifier.create(
        actor=actor,
        identifier=new_identifier,
        name="through_exchange",
        preference=chosen_preference,
        status=PublicIdentifierStatus.unverified,
    )

handlers

delete_actor_handler async

delete_actor_handler(
    message: DeleteActorMessage,
    actor: MessageActor,
    broker: Annotated[RabbitBroker, Context()],
) -> None

Deletes the actor by id. Should be used asynchronously.

Source code in cattle_grid/exchange/handlers.py
async def delete_actor_handler(
    message: DeleteActorMessage,
    actor: MessageActor,
    broker: Annotated[RabbitBroker, Context()],
) -> None:
    """
    Deletes the actor by id. Should be used asynchronously.

    If an ActorForAccount entry exists for `message.actor`, its status is
    set to deleted. Afterwards the delete activity for the actor profile
    is published with routing key `store_activity` to the internal
    exchange, and finally `delete_actor` is awaited.

    :param message: names the actor to delete
    :param actor: the resolved actor
    :param broker: broker used to publish the delete activity
    """

    logger.info("Deleting actor %s", message.actor)

    account_entry = await ActorForAccount.get_or_none(actor=message.actor)
    if account_entry:
        logger.info("setting account to deleted")
        account_entry.status = ActorStatus.deleted
        await account_entry.save()

    # The delete activity is built and published before the actor is removed.
    delete_notice = StoreActivityMessage(
        actor=actor.actor_id, data=delete_for_actor_profile(actor)
    )
    await broker.publish(
        delete_notice,
        routing_key="store_activity",
        exchange=global_container.internal_exchange,
    )

    await delete_actor(actor)

update_actor async

update_actor(
    message: UpdateActorMessage,
    actor: MessageActor,
    broker: RabbitBroker = Context(),
) -> None

Should be used asynchronously

Source code in cattle_grid/exchange/handlers.py
async def update_actor(
    message: UpdateActorMessage, actor: MessageActor, broker: RabbitBroker = Context()
) -> None:
    """Applies an update request to the actor. Should be used asynchronously

    Steps, in order:

    1. Every action in `message.actions` is passed to
       `handle_actor_action`. A failing action is logged and skipped;
       the remaining actions are still processed.
    2. If `message.profile` is set, it is merged into the actor's profile
       and the actor is saved.
    3. If `message.autoFollow` is not None, it is stored on the actor.

    An update activity is published with routing key `store_activity` to
    the internal exchange when an action reported a change or the profile
    was updated. Changing autoFollow alone does not publish anything.
    """
    needs_broadcast = False

    for step in message.actions:
        try:
            changed = await handle_actor_action(actor, step)
            if changed:
                # Reload so the published profile reflects the action.
                await actor.refresh_from_db()
                await actor.fetch_related("identifiers")
                needs_broadcast = True
        except Exception as error:
            logger.error(
                "Something went wrong when handling action of type %s",
                step.action.value,
            )
            logger.exception(error)

            # FIXME publish to error? How?

    if message.profile:
        actor.profile.update(message.profile)

        logger.info("Updating actor %s", actor.actor_id)
        await actor.save()
        await actor.fetch_related("identifiers")
        needs_broadcast = True

    auto_follow = message.autoFollow
    if auto_follow is not None:
        actor.automatically_accept_followers = auto_follow
        await actor.save()

    if not needs_broadcast:
        return

    update_notice = StoreActivityMessage(
        actor=message.actor, data=update_for_actor_profile(actor)
    )
    await broker.publish(
        update_notice,
        routing_key="store_activity",
        exchange=global_container.internal_exchange,
    )

info

exchange_method_information module-attribute

# Method descriptions for the module "cattle_grid.exchange", one entry per
# (routing_key, description) pair listed below.
exchange_method_information = [
    MethodInformationModel(
        module="cattle_grid.exchange",
        routing_key=routing_key,
        description=description,
    )
    for routing_key, description in (
        ("send_message", "Takes an activity and sends it to its recipients"),
        ("update_actor", "Updates an actor"),
        ("delete_actor", "Deletes an actor"),
    )
]

Information about the methods defined on the ActivityExchange by default

add_method_information async

add_method_information(
    message: AddMethodInformationMessage,
)

Adds information about methods defined by an extension

Source code in cattle_grid/exchange/info.py
async def add_method_information(message: AddMethodInformationMessage):
    """Registers method information supplied by an extension.

    The entries of `message.method_information` are appended to what is
    already stored in `global_container.method_information`. A new list
    is assigned; the previously stored list is not modified in place.
    """
    known_information = global_container.method_information
    if not known_information:
        # Nothing stored yet (or an empty value): start from an empty list.
        known_information = []

    global_container.method_information = (
        known_information + message.method_information
    )

message_handlers

send_message async

send_message(
    msg: ActivityMessage, broker: RabbitBroker = Context()
) -> None

Takes a message and ensures it is distributed appropriately

Source code in cattle_grid/exchange/message_handlers.py
async def send_message(
    msg: ActivityMessage,
    broker: RabbitBroker = Context(),
) -> None:
    """Takes a message and ensures it is distributed appropriately

    The activity type is determined from `msg.data`. If none can be
    determined, nothing is published. Otherwise the message is published
    with routing key `outgoing.<activity_type>`, first to
    `global_container.exchange` and then to
    `global_container.internal_exchange`.
    """

    payload = msg.data
    kind = determine_activity_type(payload)

    if not kind:
        return

    outgoing = ActivityMessage(actor=msg.actor, data=payload)
    outgoing_key = f"outgoing.{kind}"

    await broker.publish(
        outgoing,
        exchange=global_container.exchange,
        routing_key=outgoing_key,
    )
    await broker.publish(
        outgoing,
        exchange=global_container.internal_exchange,
        routing_key=outgoing_key,
    )

shovel

should_shovel_activity async

should_shovel_activity(activity: dict) -> bool

Some activities like Block or Undo Block should not be visible to the user. This method returns False if this is the case.

Source code in cattle_grid/exchange/shovel.py
async def should_shovel_activity(activity: dict) -> bool:
    """Some activities like Block or Undo Block should not be visible to the user. This method
    returns False if this is the case.

    :param activity: the activity; its "type" and, for an Undo, its
        "object" are inspected
    :returns: False for a Block, and for an Undo whose object id matches
        the request of a stored Blocking; True otherwise
    """

    activity_type = activity.get("type")

    if activity_type == "Block":
        return False

    # Only an Undo can refer to an earlier Block, so every other type is
    # decided without touching the database.
    if activity_type != "Undo":
        return True

    object_id = id_for_object(activity.get("object"))
    blocking = await Blocking.get_or_none(request=object_id)

    return not blocking