Skip to content

.exchange

cattle_grid.exchange

create_router

create_router(
    main_exchange: RabbitExchange | None = None,
    include_shovels=True,
) -> RabbitRouter

Creates a router to be used to manage users

Source code in cattle_grid/exchange/__init__.py
def create_router(
    main_exchange: RabbitExchange | None = None, include_shovels=True
) -> RabbitRouter:
    """Build the router that exposes the user-management methods.

    Args:
        main_exchange: Exchange the method queues are bound to. When
            ``None``, ``global_container.exchange`` is used.
        include_shovels: When true, the incoming and outgoing shovels are
            additionally subscribed on ``global_container.internal_exchange``.

    Returns:
        The router with all subscribers registered.
    """
    target_exchange = (
        global_container.exchange if main_exchange is None else main_exchange
    )

    router = RabbitRouter(middlewares=[exception_middleware])

    if include_shovels:
        # (queue name, routing key pattern, handler) for each shovel
        shovels = (
            ("incoming_shovel", "incoming.#", incoming_shovel),
            ("outgoing_shovel", "outgoing.#", outgoing_shovel),
        )
        for queue_name, pattern, shovel in shovels:
            register = router.subscriber(
                RabbitQueue(queue_name, routing_key=pattern),
                global_container.internal_exchange,
            )
            register(shovel)

    # Each method is served from the queue "cattle_grid_<routing key>".
    methods = (
        ("update_actor", update_actor),
        ("delete_actor", delete_actor_handler),
        ("send_message", send_message),
        ("fetch_object", fetch_object),
        ("add_method_information", add_method_information),
    )

    for method_name, handler in methods:
        queue = RabbitQueue(f"cattle_grid_{method_name}", routing_key=method_name)
        register = router.subscriber(
            queue,
            exchange=target_exchange,
            title=method_name,
        )
        register(handler)

    return router

actor_update

handle_actor_action async

handle_actor_action(
    actor: Actor,
    session: AsyncSession,
    action: UpdateAction,
) -> bool

Handles individual actions from the UpdateActor method

Source code in cattle_grid/exchange/actor_update/__init__.py
async def handle_actor_action(
    actor: Actor, session: AsyncSession, action: UpdateAction
) -> bool:
    """Apply a single action of an UpdateActor request to the actor.

    Args:
        actor: The actor being modified.
        session: Database session handed to the individual handlers.
        action: The generic action; it is re-validated into the specific
            action model for its type before being handled.

    Returns:
        True after an identifier or property value action was handled,
        False after a rename and for action types not handled here.
    """
    kind = action.action

    identifier_handlers = (
        (UpdateActionType.add_identifier, handle_add_identifier),
        (UpdateActionType.create_identifier, handle_create_identifier),
        (UpdateActionType.update_identifier, handle_update_identifier),
    )
    for action_type, identifier_handler in identifier_handlers:
        if kind == action_type:
            identifier_action = UpdateIdentifierAction.model_validate(
                action.model_dump()
            )
            await identifier_handler(actor, session, identifier_action)
            return True

    if kind == UpdateActionType.rename:
        rename_action = RenameActorAction.model_validate(action.model_dump())
        await handle_rename_actor(session, actor, rename_action)
        return False

    if kind == UpdateActionType.update_property_value:
        property_handler = handle_update_property_value
    elif kind == UpdateActionType.remove_property_value:
        property_handler = handle_delete_property_value
    else:
        # Not an action type this function knows about.
        return False

    # The property value handlers are synchronous; the commit happens here.
    property_action = UpdatePropertyValueAction.model_validate(action.model_dump())
    property_handler(actor, property_action)
    await session.commit()

    return True

identifiers

handle_add_identifier async
handle_add_identifier(
    actor: Actor,
    session: AsyncSession,
    action: UpdateIdentifierAction,
)

Adds an identifier to the actor

FIXME: Currently missing logic to verify identifier

Source code in cattle_grid/exchange/actor_update/identifiers.py
async def handle_add_identifier(
    actor: Actor, session: AsyncSession, action: UpdateIdentifierAction
):
    """Store a new, unverified public identifier for the actor.

    FIXME: Currently missing logic to verify identifier

    Args:
        actor: The actor receiving the identifier.
        session: Database session; committed once the identifier is added.
        action: Carries the identifier and whether it should be primary.

    Raises:
        ValueError: If the identifier matches one of the base urls returned
            by ``base_urls_for_actor``.
    """
    controlled_base_urls = await base_urls_for_actor(session, actor)
    new_identifier = action.identifier

    if is_identifier_for_a_base_url(new_identifier, controlled_base_urls):
        raise ValueError(
            "Cannot add an identifier for a controlled base_url use create instead"
        )

    # Load the current identifiers before computing the preference.
    await session.refresh(actor, attribute_names=["identifiers"])

    if action.primary:
        preference = new_primary_preference(actor)
    else:
        preference = 0

    record = PublicIdentifier(
        actor=actor,
        identifier=new_identifier,
        name="through_exchange",
        preference=preference,
        status=PublicIdentifierStatus.unverified,
    )
    session.add(record)
    await session.commit()

handlers

delete_actor_handler async

delete_actor_handler(
    message: DeleteActorMessage,
    actor: MessageActor,
    session: SqlSession,
    broker: Annotated[RabbitBroker, Context()],
) -> None

Deletes the actor by id. Should be used asynchronously.

Source code in cattle_grid/exchange/handlers.py
async def delete_actor_handler(
    message: DeleteActorMessage,
    actor: MessageActor,
    session: SqlSession,
    broker: Annotated[RabbitBroker, Context()],
) -> None:
    """Delete the actor named in the message. Should be used asynchronously.

    If an account entry exists for the actor it is marked as deleted. The
    delete activity for the actor profile is published to the
    ``store_activity`` routing key of the internal exchange before the actor
    itself is deleted; everything is committed at the end.
    """
    logger.info("Deleting actor %s", message.actor)

    account_entry_query = select(ActorForAccount).where(
        ActorForAccount.actor == message.actor
    )
    account_entry = await session.scalar(account_entry_query)
    if account_entry:
        logger.info("setting account to deleted")
        account_entry.status = ActorStatus.deleted

    await session.refresh(actor, attribute_names=["identifiers"])

    delete_activity = StoreActivityMessage(
        actor=actor.actor_id, data=delete_for_actor_profile(actor)
    )
    await broker.publish(
        delete_activity,
        routing_key="store_activity",
        exchange=global_container.internal_exchange,
    )

    await delete_actor(session, actor)
    await session.commit()

update_actor async

update_actor(
    message: UpdateActorMessage,
    actor: MessageActor,
    session: SqlSession,
    broker: RabbitBroker = Context(),
) -> None

Should be used asynchronously

Source code in cattle_grid/exchange/handlers.py
async def update_actor(
    message: UpdateActorMessage,
    actor: MessageActor,
    session: SqlSession,
    broker: RabbitBroker = Context(),
) -> None:
    """Apply an UpdateActor request. Should be used asynchronously.

    Each action in the message is handled in turn; a failing action is
    logged and skipped. Afterwards the profile is merged and the auto follow
    setting is stored if they are present in the message. When an action
    reported a change or the profile was updated, the update activity for
    the actor profile is published to the ``store_activity`` routing key of
    the internal exchange.
    """
    profile_changed = False

    for step in message.actions:
        try:
            handled = await handle_actor_action(actor, session, step)
            if handled:
                await session.refresh(actor, attribute_names=["identifiers"])
                profile_changed = True
        except Exception as error:
            logger.error(
                "Something went wrong when handling action of type %s",
                step.action.value,
            )
            logger.exception(error)

            # FIXME publish to error? How?

    if message.profile:
        actor.profile.update(message.profile)
        # The profile is mutated in place, so the change is flagged explicitly.
        flag_modified(actor, "profile")

        logger.info("Updating actor %s", actor.actor_id)
        await session.commit()

        profile_changed = True

    auto_follow = message.autoFollow
    if auto_follow is not None:
        actor.automatically_accept_followers = auto_follow
        await session.commit()

    await session.refresh(actor)

    if not profile_changed:
        return

    update_activity = StoreActivityMessage(
        actor=message.actor, data=update_for_actor_profile(actor)
    )
    await broker.publish(
        update_activity,
        routing_key="store_activity",
        exchange=global_container.internal_exchange,
    )

info

exchange_method_information module-attribute

# One entry per default method, built from (routing key, description) pairs;
# all of them belong to the module "cattle_grid.exchange".
exchange_method_information = [
    MethodInformationModel(
        module="cattle_grid.exchange",
        routing_key=routing_key,
        description=description,
    )
    for routing_key, description in (
        ("send_message", "Takes an activity and sends it to its recipients"),
        ("update_actor", "Updates an actor"),
        ("delete_actor", "Deletes an actor"),
    )
]

Information about the methods defined on the ActivityExchange by default

add_method_information async

add_method_information(
    message: AddMethodInformationMessage,
)

Adds information about methods defined by an extension

Source code in cattle_grid/exchange/info.py
async def add_method_information(message: AddMethodInformationMessage):
    """Append the method information sent by an extension.

    The entries in ``message.method_information`` are added after whatever
    is already stored in ``global_container.method_information``; a new list
    is assigned rather than extending the stored one in place.
    """
    known_methods = global_container.method_information
    if not known_methods:
        known_methods = []

    global_container.method_information = known_methods + message.method_information

message_handlers

fetch_object async

fetch_object(
    msg: FetchMessage, broker: RabbitBroker = Context()
) -> dict

Used to fetch an object as an RPC method

Source code in cattle_grid/exchange/message_handlers.py
async def fetch_object(msg: FetchMessage, broker: RabbitBroker = Context()) -> dict:
    """Fetch an object as an RPC method.

    The request is forwarded to the ``fetch_object`` routing key of the
    internal exchange and the reply is returned. An empty reply (``None``
    or ``b""``) is returned as an empty dict.
    """
    reply = await broker.publish(
        msg,
        routing_key="fetch_object",
        exchange=global_container.internal_exchange,
        rpc=True,
    )

    nothing_fetched = reply is None or reply == b""
    return {} if nothing_fetched else reply

send_message async

send_message(
    msg: ActivityMessage,
    correlation_id: CorrelationId,
    broker: RabbitBroker = Context(),
) -> None

Takes a message and ensures it is distributed appropriately

FIXME: out_transformer?

Source code in cattle_grid/exchange/message_handlers.py
async def send_message(
    msg: ActivityMessage,
    correlation_id: CorrelationId,
    broker: RabbitBroker = Context(),
) -> None:
    """Distribute an activity to the main and the internal exchange.

    The activity is published under ``outgoing.<activity type>`` on both
    exchanges, carrying the correlation id of the incoming message. If no
    activity type can be determined, nothing is published.

    FIXME: out_transformer?"""

    payload = msg.data
    activity_type = determine_activity_type(payload)

    if not activity_type:
        return

    outgoing = ActivityMessage(actor=msg.actor, data=payload)
    routing_key = f"outgoing.{activity_type}"

    # Main exchange first, then the internal one.
    for target_exchange in (
        global_container.exchange,
        global_container.internal_exchange,
    ):
        await broker.publish(
            outgoing,
            exchange=target_exchange,
            routing_key=routing_key,
            correlation_id=correlation_id,
        )

shovel

incoming_shovel async

incoming_shovel(
    msg: ActivityMessage,
    transformer: Transformer,
    correlation_id: CorrelationId,
    session: SqlSession,
    broker: RabbitBroker = Context(),
) -> None

Transfers the message from the RawExchange to the Activity and Account exchanges.

The message is passed through the transformer.

Source code in cattle_grid/exchange/shovel.py
async def incoming_shovel(
    msg: RawActivityMessage,
    transformer: Transformer,
    correlation_id: CorrelationId,
    session: SqlSession,
    broker: RabbitBroker = Context(),
) -> None:
    """Forward an incoming raw activity to the account and main exchange.

    The message is dropped when the actor has no account, when
    ``should_shovel_activity`` rejects the activity, or when the receiving
    actor has an active Blocking for the activity's actor. Otherwise the
    activity is passed through the transformer and the result is published.

    Raises:
        ValueError: If the receiving actor is not found in the database.
    """
    recipient = msg.actor

    account_entry = await session.scalar(
        select(ActorForAccount).where(ActorForAccount.actor == recipient)
    )
    if account_entry is None:
        logger.warning("Got actor without account %s", msg.actor)
        return

    raw_activity = msg.data

    visible = await should_shovel_activity(session, raw_activity)
    if not visible:
        return

    # FIXME: Use join to combine the next two queries ...

    stored_actor = await session.scalar(
        select(Actor).where(Actor.actor_id == recipient)
    )
    if not stored_actor:
        raise ValueError("Actor not found in database")

    active_block_query = (
        select(Blocking)
        .where(Blocking.actor == stored_actor)
        .where(Blocking.blocking == raw_activity.get("actor"))
        .where(Blocking.active)
    )
    if await session.scalar(active_block_query):
        return

    account_name = account_entry.account.name
    transformed = await transformer({"raw": msg.data})
    # Read the type only after the transformer has run, as the original does.
    activity_type = msg.data.get("type")

    await shovel_to_account_exchange(
        msg.actor, account_name, EventType.incoming, transformed, broker, correlation_id
    )

    outgoing = {
        "actor": msg.actor,
        "data": transformed,
    }
    await broker.publish(
        outgoing,
        routing_key=f"incoming.{activity_type}",
        exchange=global_container.exchange,
        correlation_id=correlation_id,
    )

should_shovel_activity async

should_shovel_activity(
    session: AsyncSession, activity: dict
) -> bool

Some activities like Block or Undo Block should not be visible to the user. This method returns False if this is the case.

Source code in cattle_grid/exchange/shovel.py
async def should_shovel_activity(session: AsyncSession, activity: dict) -> bool:
    """Decide whether an activity should be made visible to the user.

    Some activities like Block or Undo Block should not be visible to the
    user. This method returns False if this is the case.

    Args:
        session: Database session used to look up Blocking entries.
        activity: The activity; its ``type`` and ``object`` keys are read.

    Returns:
        False for a Block activity and for an Undo whose object id matches
        the request of a stored Blocking; True otherwise.
    """

    activity_type = activity.get("type")

    if activity_type == "Block":
        return False

    if activity_type == "Undo":
        object_id = id_for_object(activity.get("object"))
        # Count the matching rows directly. Wrapping the lookup in a scalar
        # subquery would count a single scalar value and fails on databases
        # that reject scalar subqueries returning more than one row.
        matching_blocks = await session.scalar(
            select(func.count(Blocking.id)).where(Blocking.request == object_id)
        )

        if matching_blocks:
            return False

    return True