Skip to content

.exchange

cattle_grid.exchange

create_router

create_router(
    main_exchange: RabbitExchange | None = None,
    include_shovels=True,
) -> RabbitRouter

Creates a router to be used to manage users

Source code in cattle_grid/exchange/__init__.py
def create_router(
    main_exchange: RabbitExchange | None = None, include_shovels=True
) -> RabbitRouter:
    """Creates a router to be used to manage users"""

    if main_exchange is None:
        main_exchange = global_container.exchange

    router = RabbitRouter(middlewares=[exception_middleware])

    if include_shovels:
        router.subscriber(
            RabbitQueue("incoming_shovel", routing_key="incoming.#", durable=True),
            global_container.internal_exchange,
        )(incoming_shovel)
        router.subscriber(
            RabbitQueue("outgoing_shovel", routing_key="outgoing.#", durable=True),
            global_container.internal_exchange,
        )(outgoing_shovel)

    routing_config: List[Tuple[str, Any]] = [
        ("update_actor", update_actor),
        ("delete_actor", delete_actor_handler),
        ("send_message", send_message),
        ("fetch_object", fetch_object),
        ("fetch", fetch),
        ("add_method_information", add_method_information),
    ]

    prefix = "cattle_grid_"

    for routing_key, coroutine in routing_config:
        router.subscriber(
            RabbitQueue(prefix + routing_key, routing_key=routing_key, durable=True),
            exchange=main_exchange,
            title=routing_key,
        )(coroutine)

    return router

actor_update

handle_actor_action async

handle_actor_action(
    actor: Actor,
    session: AsyncSession,
    action: UpdateAction,
) -> bool

Handles individual actions from the UpdateActor method

Source code in cattle_grid/exchange/actor_update/__init__.py
async def handle_actor_action(
    actor: Actor, session: AsyncSession, action: UpdateAction
) -> bool:
    """Handles individual actions from the UpdateActor
    method"""
    match action.action:
        case UpdateActionType.add_identifier:
            await handle_add_identifier(actor, session, action)
            return True
        case UpdateActionType.create_identifier:
            await handle_create_identifier(actor, session, action)
            return True
        case UpdateActionType.update_identifier:
            await handle_update_identifier(actor, session, action)
            return True

        case UpdateActionType.rename:
            await handle_rename_actor(session, actor, action)
            return False

        case UpdateActionType.update_property_value:
            handle_update_property_value(actor, action)
            return True
        case UpdateActionType.remove_property_value:
            handle_delete_property_value(actor, action)
            return True

        case UpdateActionType.add_url:
            add_url(actor, action)
            return True
        case UpdateActionType.remove_url:
            remove_url(actor, action)
            return True

    return False

identifiers

handle_add_identifier async
handle_add_identifier(
    actor: Actor,
    session: AsyncSession,
    action: UpdateIdentifierAction,
)

Adds an identifier to the actor

FIXME: Currently missing logic to verify identifier

Source code in cattle_grid/exchange/actor_update/identifiers.py
async def handle_add_identifier(
    actor: Actor, session: AsyncSession, action: UpdateIdentifierAction
):
    """Adds an identifier to the actor

    FIXME: Currently missing logic to verify identifier"""

    base_urls = await base_urls_for_actor(session, actor)
    identifier = action.identifier
    if is_identifier_for_a_base_url(identifier, base_urls):
        raise ValueError(
            "Cannot add an identifier for a controlled base_url use create instead"
        )
    await session.refresh(actor, attribute_names=["identifiers"])

    preference = 0

    if action.primary:
        preference = new_primary_preference(actor)

    session.add(
        PublicIdentifier(
            actor=actor,
            identifier=identifier,
            name="through_exchange",
            preference=preference,
            status=PublicIdentifierStatus.unverified,
        )
    )

urls

url_matches
url_matches(item: str | dict, url: str) -> bool

Checks if the item represents the url

>>> url_matches("http://remote.test", "http://remote.test")
True

>>> url_matches({"href": "http://remote.test"}, "http://remote.test")
True

Source code in cattle_grid/exchange/actor_update/urls.py
def url_matches(item: str | dict, url: str) -> bool:
    """Checks if the item represents the url
    ```
    >>> url_matches("http://remote.test", "http://remote.test")
    True

    >>> url_matches({"href": "http://remote.test"}, "http://remote.test")
    True

    ```
    """
    if isinstance(item, dict):
        return item.get("href") == url

    return item == url

exception

exception_handler async

exception_handler(
    exception: Exception,
    routing_key: RoutingKey,
    session: SqlSession,
    publisher: AccountExchangePublisher,
    message=Context("message.body"),
)

When an exception occurs in processing, this handler will create an appropriate entry in receive.NAME.error in the account exchange

Source code in cattle_grid/exchange/exception.py
@exception_middleware.add_handler(Exception)
async def exception_handler(
    exception: Exception,
    routing_key: RoutingKey,
    session: SqlSession,
    publisher: AccountExchangePublisher,
    message=Context("message.body"),
):
    """When an exception occurs in processing, this handler will create
    an appropriate entry in `receive.NAME.error` in the account exchange"""
    try:
        data = json.loads(message)
        actor_id = data["actor"]
    except Exception:
        logger.exception(exception)
        logger.exception(message)
        return

    logger.info("Exception for %s", actor_id)
    logger.debug(exception, exc_info=True)
    account = await account_for_actor(session, actor_id)
    if not account:
        return
    name = account.name

    logger.info("Processing error occurred in exchange for account %s", name)

    await publisher(
        ErrorMessage(message=str(exception).split("\n"), routing_key=routing_key),
        routing_key=f"error.{name}",
    )

handlers

delete_actor_handler async

delete_actor_handler(
    message: DeleteActorMessage,
    actor: MessageActor,
    session: SqlSession,
    publisher: InternalExchangePublisher,
) -> None

Deletes the actor by id. Should be used asynchronously.

Source code in cattle_grid/exchange/handlers.py
async def delete_actor_handler(
    message: DeleteActorMessage,
    actor: MessageActor,
    session: SqlSession,
    publisher: InternalExchangePublisher,
) -> None:
    """
    Deletes the actor by id. Should be used asynchronously.
    """

    logger.info("Deleting actor %s", message.actor)
    actor_for_account = await session.scalar(
        select(ActorForAccount).where(ActorForAccount.actor == message.actor)
    )
    if actor_for_account:
        logger.info("setting account to deleted")
        actor_for_account.status = ActorStatus.deleted

    await session.refresh(actor, attribute_names=["identifiers"])

    await publisher(
        StoreActivityMessage(
            actor=actor.actor_id, data=delete_for_actor_profile(actor)
        ),
        routing_key="store_activity",
    )

    await delete_actor(session, actor)
    await session.commit()

update_actor async

update_actor(
    message: UpdateActorMessage,
    actor: MessageActor,
    session: SqlSession,
    publisher: InternalExchangePublisher,
) -> None

Should be used asynchronously

Source code in cattle_grid/exchange/handlers.py
async def update_actor(
    message: UpdateActorMessage,
    actor: MessageActor,
    session: SqlSession,
    publisher: InternalExchangePublisher,
) -> None:
    """Should be used asynchronously"""
    send_update = False

    for action in message.actions:
        try:
            if await handle_actor_action(actor, session, action):
                await session.commit()
                await session.refresh(actor, attribute_names=["identifiers"])
                send_update = True
        except Exception as e:
            logger.error(
                "Something went wrong when handling action of type %s",
                action.action.value,
            )
            logger.exception(e)
            raise e
    if message.profile:
        actor.profile.update(message.profile)
        flag_modified(actor, "profile")

        logger.info("Updating actor %s", actor.actor_id)
        await session.commit()

        send_update = True

    if message.autoFollow is not None:
        actor.automatically_accept_followers = message.autoFollow
        await session.commit()

    await session.refresh(actor)

    if send_update:
        await publisher(
            StoreActivityMessage(
                actor=message.actor, data=update_for_actor_profile(actor)
            ),
            routing_key="store_activity",
        )

info

exchange_method_information module-attribute

exchange_method_information = [
    MethodInformationModel(
        module="cattle_grid.exchange",
        routing_key="send_message",
        description="Takes an activity and sends it to its recipients",
    ),
    MethodInformationModel(
        module="cattle_grid.exchange",
        routing_key="update_actor",
        description="Updates an actor",
    ),
    MethodInformationModel(
        module="cattle_grid.exchange",
        routing_key="delete_actor",
        description="Deletes an actor",
    ),
]

Information about the methods defined on the ActivityExchange by default

add_method_information async

add_method_information(
    message: AddMethodInformationMessage,
)

Adds information about methods defined by an extension

Source code in cattle_grid/exchange/info.py
async def add_method_information(message: AddMethodInformationMessage):
    """Adds information about methods defined by an extension"""
    current_information = global_container.method_information or []

    global_container.method_information = (
        current_information + message.method_information
    )

message_handlers

fetch async

fetch(
    message: FetchMessage,
    transformer: Transformer,
    lookup: LookupAnnotation,
    actor: MessageBovineActor,
) -> dict

Used to fetch an object as an RPC method. In difference to fetch_object, this method applies the transformer.

Source code in cattle_grid/exchange/message_handlers.py
async def fetch(
    message: FetchMessage,
    transformer: Transformer,
    lookup: LookupAnnotation,
    actor: MessageBovineActor,
) -> dict:
    """Used to fetch an object as an RPC method. In difference to `fetch_object`,
    this method applies the transformer."""

    from cattle_grid.activity_pub.processing.remote import fetch_object

    logger.info("fetch started")
    result = await fetch_object(
        message,
        actor,
        lookup,
    )
    logger.info("fetch done", result)

    transformed = await transformer({"raw": result}, actor_id=message.actor)
    return transformed

fetch_object async

fetch_object(
    msg: FetchMessage, requester: InternalExchangeRequester
) -> dict

Used to fetch an object as an RPC method

Source code in cattle_grid/exchange/message_handlers.py
async def fetch_object(msg: FetchMessage, requester: InternalExchangeRequester) -> dict:
    """Used to fetch an object as an RPC method"""
    result = await requester(
        msg,
        routing_key="fetch_object",
    )
    return result

send_message async

send_message(
    msg: ActivityMessage,
    publisher: InternalExchangePublisher,
) -> None

Takes a message and ensure it is distributed appropriately

Source code in cattle_grid/exchange/message_handlers.py
async def send_message(
    msg: ActivityMessage,
    publisher: InternalExchangePublisher,
) -> None:
    """Takes a message and ensure it is distributed appropriately"""

    content = msg.data
    activity_type = determine_activity_type(content)

    if not activity_type:
        return

    to_send = ActivityMessage(actor=msg.actor, data=content)

    await publisher(
        to_send,
        routing_key=f"outgoing.{activity_type}",
    )

shovel

incoming_shovel async

incoming_shovel(
    msg: ActivityMessage,
    session: SqlSession,
    transformer: Transformer,
    publisher: ActivityExchangePublisher,
    account_publisher: AccountExchangePublisher,
) -> None

Transfers the message from the RawExchange to the Activity- and Account one.

The message is passed through the transformer.

Source code in cattle_grid/exchange/shovel.py
async def incoming_shovel(
    msg: ActivityMessage,
    session: SqlSession,
    transformer: Transformer,
    publisher: ActivityExchangePublisher,
    account_publisher: AccountExchangePublisher,
) -> None:
    """Transfers the message from the RawExchange to the
    Activity- and Account one.

    The message is passed through the transformer.
    """
    logger.info("incoming shovel")

    if not await should_shovel_activity(session, msg.data):
        return

    # FIXME: Use join to combine the next two queries ...

    db_actor = await session.scalar(select(Actor).where(Actor.actor_id == msg.actor))
    if not db_actor:
        raise ValueError("Actor not found in database")

    blocking = await session.scalar(
        select(Blocking)
        .where(Blocking.actor == db_actor)
        .where(Blocking.blocking == msg.data.get("actor"))
        .where(Blocking.active)
    )
    if blocking:
        return

    return await shovel(
        msg.actor,
        msg.data,
        direction=EventType.incoming,
        transformer=transformer,
        publisher=publisher,
        account_publisher=account_publisher,
        session=session,
    )

should_shovel_activity async

should_shovel_activity(
    session: AsyncSession, activity: dict
) -> bool

Some activities like Block or Undo Block should not be visible to the user. This method returns False if this is the case.

Source code in cattle_grid/exchange/shovel.py
async def should_shovel_activity(session: AsyncSession, activity: dict) -> bool:
    """Some activities like Block or Undo Block should not be visible to the user. This method
    returns False if this is the case."""

    activity_type = activity.get("type")

    if activity_type == "Block":
        return False

    if activity_type == "Undo":
        object_id = id_for_object(activity.get("object"))
        blocking = await session.scalar(
            func.count(
                select(Blocking.id)
                .where(Blocking.request == object_id)
                .scalar_subquery()
            )
        )

        if blocking:
            return False

    return True