Skip to content

cattle_grid.dependencies

cattle_grid.dependencies

Dependencies injected by fast_depends

AccountExchange module-attribute

# Injects the account exchange, resolved through get_account_exchange.
AccountExchange = Annotated[
    RabbitExchange,
    Depends(get_account_exchange),
]

The account exchange

ActivityExchange module-attribute

# Injects the activity exchange, resolved through get_exchange.
ActivityExchange = Annotated[
    RabbitExchange,
    Depends(get_exchange),
]

The activity exchange

ClientSession module-attribute

# Injects the client session used by the application, resolved through
# get_client_session.
# NOTE(review): the inner ClientSession is presumably aiohttp.ClientSession;
# the alias rebinds the same name -- confirm against the module imports.
ClientSession = Annotated[
    ClientSession,
    Depends(get_client_session),
]

The aiohttp.ClientSession used by the application

CommittingSession module-attribute

# Injects an AsyncSession through with_session_commit; per the module
# documentation this session commits the transaction.
CommittingSession = Annotated[
    AsyncSession,
    Depends(with_session_commit),
]

Session that commits the transaction

Config module-attribute

# Injects the configuration, resolved through get_config.
Config = Annotated[
    Dynaconf,
    Depends(get_config),
]

Returns the configuration

CorrelationId module-attribute

# Injects the correlation id of the message, read from the
# "message.correlation_id" context entry.
CorrelationId = Annotated[
    str,
    Context("message.correlation_id"),
]

The correlation id of the message

InternalExchange module-attribute

# Injects the internal activity exchange, resolved through
# get_internal_exchange.
InternalExchange = Annotated[
    RabbitExchange,
    Depends(get_internal_exchange),
]

The internal activity exchange

LookupAnnotation module-attribute

# Injects the lookup method loaded from extensions, resolved through
# get_lookup.
LookupAnnotation = Annotated[
    LookupMethod,
    Depends(get_lookup),
]

The lookup method loaded from extensions

MethodInformation module-attribute

# Injects the information about the methods that are part of the exchange,
# resolved through get_method_information.
MethodInformation = Annotated[
    List[MethodInformationModel], Depends(get_method_information)
]

Returns the information about the methods that are a part of the exchange

SqlAsyncEngine module-attribute

# Injects the SQLAlchemy AsyncEngine, resolved through get_engine.
SqlAsyncEngine = Annotated[
    AsyncEngine,
    Depends(get_engine),
]

Returns the SqlAlchemy AsyncEngine

SqlSession module-attribute

# Injects an AsyncSession through with_sql_session; per the module
# documentation this session does not commit afterwards.
SqlSession = Annotated[
    AsyncSession,
    Depends(with_sql_session),
]

SQL session that does not commit afterwards

Transformer module-attribute

# Injects the transformer loaded from extensions (an async callable taking
# and returning a Dict), resolved through get_transformer.
Transformer = Annotated[
    Callable[[Dict], Awaitable[Dict]], Depends(get_transformer)
]

The transformer loaded from extensions

fastapi

ActivityExchange module-attribute

# FastAPI variant: injects the activity exchange, resolved through
# get_exchange.
ActivityExchange = Annotated[
    RabbitExchange,
    Depends(get_exchange),
]

The Activity Exchange

Broker module-attribute

# Injects the RabbitMQ broker, resolved through get_broker.
Broker = Annotated[
    RabbitBroker,
    Depends(get_broker),
]

The RabbitMQ broker

CommittingSqlSession module-attribute

# Session annotation for FastAPI, resolved through
# with_committing_sql_session; per the module documentation a commit is
# performed after processing the request.
CommittingSqlSession = Annotated[
    AsyncSession,
    Depends(with_committing_sql_session),
]

Session annotation to be used with FastAPI. A commit is performed after processing the request.

Config module-attribute

# FastAPI variant: injects the configuration, resolved through get_config.
Config = Annotated[
    Dynaconf,
    Depends(get_config),
]

Returns the configuration

MethodInformation module-attribute

# FastAPI variant: injects the information about the methods that are part
# of the exchange, resolved through get_method_information.
MethodInformation = Annotated[
    List[MethodInformationModel], Depends(get_method_information)
]

Returns the information about the methods that are a part of the exchange

SqlAsyncEngine module-attribute

# FastAPI variant: injects the SQLAlchemy AsyncEngine, resolved through
# get_engine.
SqlAsyncEngine = Annotated[
    AsyncEngine,
    Depends(get_engine),
]

Returns the SqlAlchemy AsyncEngine

SqlSession module-attribute

# Session annotation for FastAPI, resolved through with_fast_api_session.
SqlSession = Annotated[
    AsyncSession,
    Depends(with_fast_api_session),
]

Session annotation to be used with FastAPI

Transformer module-attribute

# FastAPI variant: injects the transformer loaded from extensions (an async
# callable taking and returning a Dict), resolved through get_transformer.
Transformer = Annotated[
    Callable[[Dict], Awaitable[Dict]], Depends(get_transformer)
]

The transformer loaded from extensions

globals

GlobalContainer dataclass

GlobalContainer(session: aiohttp.client.ClientSession | None = None, engine: sqlalchemy.ext.asyncio.engine.AsyncEngine | None = None, method_information: Optional[List[cattle_grid.model.extension.MethodInformationModel]] = None, transformer: Optional[Callable[[Dict], Awaitable[Dict]]] = None, lookup: Optional[Callable[[cattle_grid.model.lookup.Lookup], Awaitable[cattle_grid.model.lookup.Lookup]]] = None, _config: dynaconf.base.LazySettings | dynaconf.utils.DynaconfDict | None = None, async_session_maker: Optional[Callable[[], sqlalchemy.ext.asyncio.session.AsyncSession]] = None)

Parameters:

Name Type Description Default
session ClientSession | None
None
engine AsyncEngine | None
None
method_information List[MethodInformationModel] | None
None
transformer Callable[[Dict], Awaitable[Dict]] | None
None
lookup Callable[[Lookup], Awaitable[Lookup]] | None
None
_config LazySettings | DynaconfDict | None
None
async_session_maker Callable[[], AsyncSession] | None
None
Source code in cattle_grid/dependencies/globals.py
@dataclass
class GlobalContainer:
    """Container for the objects shared across the application.

    Holds the aiohttp client session, the SQLAlchemy engine and session
    maker, the loaded configuration, the extension-provided transformer and
    lookup, and lazily created RabbitMQ broker/exchange objects.

    The configuration is loaded as soon as an instance is created
    (see ``__post_init__``).
    """

    # aiohttp session; only set while inside ``session_lifecycle``
    session: aiohttp.ClientSession | None = None
    # SQLAlchemy engine; only set while inside ``alchemy_database``
    engine: AsyncEngine | None = None
    method_information: List[MethodInformationModel] | None = None

    transformer: Callable[[Dict], Awaitable[Dict]] | None = None
    lookup: LookupMethod | None = None

    # Loaded settings; populated by ``load_config``
    _config: Dynaconf | DynaconfDict | None = None

    # Session factory bound to ``engine``; only set inside ``alchemy_database``
    async_session_maker: Callable[[], AsyncSession] | None = None

    def __post_init__(self):
        """Loads the configuration from the default filenames."""
        self.load_config()

    def get_config(self):
        """Returns the loaded configuration.

        Callable counterpart of the ``config`` property.

        :raises ValueError: if no configuration has been loaded
        """
        return self.config

    @property
    def config(self):
        """The loaded configuration.

        :raises ValueError: if no configuration has been loaded
        """
        if self._config is None:
            raise ValueError("Config not loaded")
        return self._config

    def load_config(self, filenames: list[str] = default_filenames):
        """Loads the settings from ``filenames`` and stores them.

        :param filenames: files passed to ``load_settings``
        """
        self._config = load_settings(filenames)

    @asynccontextmanager
    async def session_lifecycle(self):
        """Opens an ``aiohttp.ClientSession`` and exposes it as ``self.session``.

        The attribute is reset to ``None`` on exit, also when the body of
        the ``async with`` block raises.
        """
        async with aiohttp.ClientSession() as session:
            self.session = session
            try:
                yield session
            finally:
                # Never leave a reference to a closed session behind
                self.session = None

    @asynccontextmanager
    async def alchemy_database(self, db_uri: str | None = None, echo: bool = False):
        """Initializes the sql alchemy engine

        Yields the engine. On exit the engine is disposed and ``engine`` /
        ``async_session_maker`` are reset, also when the body of the
        ``async with`` block raises, so that the database can be
        initialized again afterwards.

        :param db_uri: database URI; defaults to ``config.db_uri``
        :param echo: forwarded to ``create_async_engine``
        :raises ValueError: if no database URI is available or the database
            is already initialized
        """

        if db_uri is None:
            db_uri = self.config.db_uri  # type:ignore

        if db_uri is None:
            raise ValueError("Database URI not set")

        # Rewrite the plain postgres scheme to the asyncpg driver URI
        if "postgres://" in db_uri:
            db_uri = db_uri.replace("postgres://", "postgresql+asyncpg://")

        if self.engine or self.async_session_maker:
            raise ValueError("Database already initialized")

        engine = create_async_engine(db_uri, echo=echo)
        self.engine = engine
        try:
            self.async_session_maker = async_sessionmaker(engine)
            # Mask the credentials part of the URI before logging it
            logger.info(
                "Connected to %s with sqlalchemy",
                re.sub(r"://.*@", "://***:***@", db_uri),
            )

            yield engine
        finally:
            await engine.dispose()

            self.engine = None
            self.async_session_maker = None

    @asynccontextmanager
    async def common_lifecycle(self, config=None):
        """Enters ``database``, ``session_lifecycle`` and ``alchemy_database``.

        :param config: configuration whose ``db_uri`` is passed to
            ``database``; defaults to ``self.config``
        """
        if config is None:
            config = self.config

        # NOTE(review): ``alchemy_database()`` reads ``self.config.db_uri``,
        # not ``config.db_uri`` -- confirm this is intended when a custom
        # ``config`` is passed.
        async with database(config.db_uri, generate_schemas=True):  # type: ignore
            async with self.session_lifecycle():
                async with self.alchemy_database():
                    yield

    @cached_property
    def internal_exchange(self) -> RabbitExchange:
        """The internal exchange used to process
        ActivityPub messages related to the social graph

        :returns: topic exchange named by ``activity_pub.internal_exchange``
        """
        return RabbitExchange(
            self.config.activity_pub.internal_exchange,  # type: ignore
            type=ExchangeType.TOPIC,
        )

    def get_internal_exchange(self) -> RabbitExchange:
        """Callable counterpart of ``internal_exchange``."""
        return self.internal_exchange

    @cached_property
    def exchange(self) -> RabbitExchange:
        """Returns the public exchange

        :returns: topic exchange named by ``activity_pub.exchange``
        """
        return RabbitExchange(
            self.config.activity_pub.exchange,  # type: ignore
            type=ExchangeType.TOPIC,
        )

    def get_exchange(self) -> RabbitExchange:
        """Callable counterpart of ``exchange``."""
        return self.exchange

    @cached_property
    def account_exchange(self) -> RabbitExchange:
        """Returns the exchange used to distribute messages between accounts

        :returns: topic exchange named by ``activity_pub.account_exchange``
        """
        exchange_name = self.config.activity_pub.account_exchange  # type: ignore

        # Only "amq.topic" is declared durable -- presumably because RabbitMQ
        # pre-declares it that way; TODO confirm.
        durable = exchange_name == "amq.topic"

        return RabbitExchange(exchange_name, type=ExchangeType.TOPIC, durable=durable)  # type: ignore

    def get_account_exchange(self) -> RabbitExchange:
        """Callable counterpart of ``account_exchange``."""
        return self.account_exchange

    @cached_property
    def broker(self) -> RabbitBroker:
        """The RabbitMQ broker created from ``config.amqp_uri``."""
        return RabbitBroker(self.config.amqp_uri)  # type: ignore

    def get_broker(self) -> RabbitBroker:
        """Callable counterpart of ``broker``."""
        return self.broker

    def get_session_maker(self) -> Callable[[], AsyncSession]:
        """Returns the session factory set up by ``alchemy_database``.

        :raises ValueError: if the database has not been initialized
        """
        if not self.async_session_maker:
            raise ValueError("Alchemy database not initialized")
        return self.async_session_maker
account_exchange cached property
account_exchange: RabbitExchange

Returns the exchange used to distribute messages between accounts

Returns:

Type Description
RabbitExchange
exchange cached property
exchange: RabbitExchange

Returns the public exchange used to process activities

Returns:

Type Description
RabbitExchange
internal_exchange cached property
internal_exchange: RabbitExchange

The internal exchange used to process ActivityPub messages related to the social graph

Returns:

Type Description
RabbitExchange
alchemy_database async
alchemy_database(
    db_uri: str | None = None, echo: bool = False
)

Initializes the sql alchemy engine

Source code in cattle_grid/dependencies/globals.py
@asynccontextmanager
async def alchemy_database(self, db_uri: str | None = None, echo: bool = False):
    """Initializes the sql alchemy engine"""

    if db_uri is None:
        db_uri = self.config.db_uri  # type:ignore

    if db_uri is None:
        raise ValueError("Database URI not set")

    if "postgres://" in db_uri:
        db_uri = db_uri.replace("postgres://", "postgresql+asyncpg://")

    if self.engine or self.async_session_maker:
        raise ValueError("Database already initialized")

    self.engine = create_async_engine(db_uri, echo=echo)
    self.async_session_maker = async_sessionmaker(self.engine)
    logger.info(
        "Connected to %s with sqlalchemy", re.sub("://.*@", "://***:***@", db_uri)
    )

    yield self.engine

    await self.engine.dispose()

    self.engine = None
    self.async_session_maker = None

processing

ActorProfile module-attribute

# Injects the actor profile of the actor processing the message, resolved
# through get_actor_profile.
ActorProfile = Annotated[
    dict,
    Depends(get_actor_profile),
]

Returns the actor profile of the actor processing the message

FactoriesForActor module-attribute

# Injects the (activity factory, object factory) pair for the actor,
# resolved through get_factories_for_actor.
FactoriesForActor = Annotated[
    tuple[ActivityFactory, ObjectFactory], Depends(get_factories_for_actor)
]

Returns the activity and object factories for the actor

MessageActor module-attribute

# Injects the actor for the message, resolved through actor_for_message.
MessageActor = Annotated[
    Actor,
    Depends(actor_for_message),
]

Returns the actor for the message