cattle_grid.dependencies

Dependencies injected by fast_depends

cattle_grid uses dependencies to manage the objects that code needs access to. A dependency is declared with fast_depends.Depends and injected with fast_depends.inject.

For example, to make a web request using the aiohttp.ClientSession, you could write

from cattle_grid.dependencies import ClientSession

async def web_request(session: ClientSession):
    response = await session.get("...")

This function can then be called via

from fast_depends import inject

await inject(web_request)()
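
The declare-then-inject mechanism can be illustrated with the standard library alone. The following is a toy sketch, not the fast_depends implementation: ToyDepends, toy_inject, FakeSession, and the example URL are hypothetical stand-ins, and the real library handles far more cases (nested dependencies, generators, validation).

```python
import asyncio
from typing import Annotated, Any, Callable, get_type_hints


class ToyDepends:
    """Toy marker recording which provider builds a value (stand-in for Depends)."""

    def __init__(self, provider: Callable[[], Any]) -> None:
        self.provider = provider


def toy_inject(func: Callable) -> Callable:
    """Toy injector: fills parameters whose annotation carries a ToyDepends marker."""
    hints = get_type_hints(func, include_extras=True)

    async def wrapper(**kwargs: Any) -> Any:
        for name, hint in hints.items():
            if name in kwargs:
                continue  # the caller supplied this argument explicitly
            # Annotated[...] stores its extra arguments in __metadata__
            for meta in getattr(hint, "__metadata__", ()):
                if isinstance(meta, ToyDepends):
                    kwargs[name] = meta.provider()
        return await func(**kwargs)

    return wrapper


class FakeSession:
    """Hypothetical stand-in for aiohttp.ClientSession."""

    async def get(self, url: str) -> str:
        return f"response from {url}"


def get_client_session() -> FakeSession:
    return FakeSession()


# Same shape as the annotations in this package: a type plus a provider
ClientSession = Annotated[FakeSession, ToyDepends(get_client_session)]


async def web_request(session: ClientSession) -> str:
    return await session.get("https://example.invalid/")


# No session is passed; the injector builds it from the annotation
result = asyncio.run(toy_inject(web_request)())
```

Because the dependency is an ordinary parameter, a caller can still pass its own object, for example `toy_inject(web_request)(session=FakeSession())`.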

This package contains the annotations that should be available in all code using cattle_grid, i.e. extensions. The subpackages contain methods for more specific use cases.

AccountExchangePublisher module-attribute

AccountExchangePublisher = Annotated[
    Callable, Depends(AccountExchangePublisherClass)
]

Publishes a message to the account exchange

ActivityExchangePublisher module-attribute

ActivityExchangePublisher = Annotated[
    Callable, Depends(ActivityExchangePublisherClass)
]

Publishes a message to the activity exchange. Sample usage:

async def my_method(publisher: ActivityExchangePublisher):
    message = {"actor": my_actor_id, "data": {"type": "Activity"}}
    await publisher(message, routing_key="send_message")
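
Since the publisher is passed in as a plain callable, a function like the one above can be exercised without a message broker by handing it a test double. A minimal sketch, assuming the publisher is awaited with a message and a routing_key keyword as in the sample; RecordingPublisher, announce, and the actor URL are hypothetical names introduced for illustration.

```python
import asyncio
from typing import Any


class RecordingPublisher:
    """Hypothetical test double: records calls instead of publishing them."""

    def __init__(self) -> None:
        self.calls: list[tuple[dict[str, Any], str]] = []

    async def __call__(self, message: dict[str, Any], routing_key: str) -> None:
        self.calls.append((message, routing_key))


async def announce(publisher, actor_id: str) -> None:
    # In cattle_grid code, `publisher` would be annotated as ActivityExchangePublisher
    message = {"actor": actor_id, "data": {"type": "Activity"}}
    await publisher(message, routing_key="send_message")


fake = RecordingPublisher()
asyncio.run(announce(fake, "http://actor.example/alice"))
```

After the call, `fake.calls` holds the single message together with its routing key, which a test can assert on.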

ActivityExchangeRequester module-attribute

ActivityExchangeRequester = Annotated[
    Callable, Depends(ActivityExchangeRequesterClass)
]

Sends a request to the activity exchange. Sample usage:

async def my_method(requester: ActivityExchangeRequester):
    message = {"actor": my_actor_id, "data": {"type": "Activity"}}
    await requester(message, routing_key="fetch")

ClientSession module-attribute

ClientSession = Annotated[
    ClientSession, Depends(get_client_session)
]

The aiohttp.ClientSession used by the application

CommittingSession module-attribute

CommittingSession = Annotated[
    AsyncSession, Depends(with_session_commit)
]

Session that commits the transaction

Config module-attribute

Config = Annotated[Dynaconf, Depends(get_config)]

Returns the configuration

InternalExchangePublisher module-attribute

InternalExchangePublisher = Annotated[
    Callable, Depends(InternalExchangePublisherClass)
]

Publishes a message to the internal exchange

InternalExchangeRequester module-attribute

InternalExchangeRequester = Annotated[
    Callable, Depends(InternalExchangeRequesterClass)
]

Sends a request to the internal exchange.

SqlAsyncEngine module-attribute

SqlAsyncEngine = Annotated[AsyncEngine, Depends(get_engine)]

Returns the SQLAlchemy AsyncEngine

SqlSession module-attribute

SqlSession = Annotated[
    AsyncSession, Depends(with_sql_session)
]

SQL session that does not commit afterwards
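
The difference between SqlSession and CommittingSession can be sketched with a toy model. This is an illustration only, assuming that CommittingSession commits once the function using it has finished; FakeSession, non_committing, committing, and store are hypothetical stand-ins and do not reflect how with_sql_session or with_session_commit are actually implemented.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for an AsyncSession; tracks pending and committed items."""

    def __init__(self) -> None:
        self.pending: list[str] = []
        self.committed: list[str] = []

    def add(self, item: str) -> None:
        self.pending.append(item)

    async def commit(self) -> None:
        self.committed.extend(self.pending)
        self.pending.clear()


@asynccontextmanager
async def non_committing(session: FakeSession):
    # Models SqlSession: hands out the session and never commits on exit
    yield session


@asynccontextmanager
async def committing(session: FakeSession):
    # Models CommittingSession (assumed): commits after the caller's block succeeds
    yield session
    await session.commit()


async def store(session: FakeSession, item: str) -> None:
    session.add(item)


async def main() -> tuple[FakeSession, FakeSession]:
    plain, auto = FakeSession(), FakeSession()
    async with non_committing(plain) as session:
        await store(session, "note")
    async with committing(auto) as session:
        await store(session, "note")
    return plain, auto


plain, auto = asyncio.run(main())
```

In this model the item stays pending in `plain` until the code commits explicitly, while `auto` has already committed it.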