
cattle_grid.dependencies.fastapi

FastAPI resolves dependencies declared with fastapi.Depends, not fast_depends.Depends. Handlers attached to a fastapi.APIRouter therefore need annotations built on fastapi.Depends; this module provides them.
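The following is a minimal, stdlib-only sketch of why the marker class matters. It is not FastAPI's actual resolution code: `Depends` here is a hypothetical stand-in for fastapi.Depends, `get_config` returns a plain dict rather than a Dynaconf object, and `resolve_and_call` is an illustrative helper. It shows that an `Annotated` alias carries its marker as metadata, and that a resolver only acts on markers of the class it looks for.

```python
from dataclasses import dataclass
from typing import Annotated, Callable, get_type_hints


@dataclass(frozen=True)
class Depends:
    """Hypothetical stand-in for fastapi.Depends, for illustration only."""

    dependency: Callable


def get_config() -> dict:
    # Stand-in provider; the documented get_config returns a Dynaconf object.
    return {"debug": True}


# Same shape as the aliases documented on this page.
Config = Annotated[dict, Depends(get_config)]


def handler(config: Config) -> bool:
    return config["debug"]


def resolve_and_call(func):
    """Resolve every Annotated[..., Depends(...)] parameter, then call func."""
    hints = get_type_hints(func, include_extras=True)
    hints.pop("return", None)
    kwargs = {}
    for name, hint in hints.items():
        # Annotated types expose their extra arguments as __metadata__.
        for meta in getattr(hint, "__metadata__", ()):
            # Only markers of this resolver's own class are recognised.
            if isinstance(meta, Depends):
                kwargs[name] = meta.dependency()
    return func(**kwargs)


result = resolve_and_call(handler)
```

A marker of a different class would fail the `isinstance` check in this sketch, which is presumably the reason separate FastAPI-specific aliases are provided.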

ActivityExchange module-attribute

ActivityExchange = Annotated[
    RabbitExchange, Depends(get_exchange)
]

The activity exchange

ActivityExchangePublisher module-attribute

ActivityExchangePublisher = Annotated[
    Callable, Depends(ActivityExchangePublisherClass)
]

Publisher to the activity exchange

ActivityExchangeRequester module-attribute

ActivityExchangeRequester = Annotated[
    Callable, Depends(ActivityExchangeRequesterClass)
]

Requester to the activity exchange

Broker module-attribute

Broker = Annotated[RabbitBroker, Depends(get_broker)]

The RabbitMQ broker

ClientSession module-attribute

ClientSession = Annotated[
    ClientSession, Depends(get_client_session)
]

The aiohttp.ClientSession used by the application

CommittingSqlSession module-attribute

CommittingSqlSession = Annotated[
    AsyncSession, Depends(with_committing_sql_session)
]

Session annotation to be used with FastAPI. A commit is performed after the request has been processed.
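The implementation of with_committing_sql_session is not shown on this page. As a rough sketch of the "commit after the request" behaviour, assuming a yield-style dependency, the pattern could look like the following. `FakeSession`, `committing_session_sketch` and `simulate_request` are hypothetical names used only for illustration.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for SQLAlchemy's AsyncSession."""

    def __init__(self):
        self.events = []

    async def commit(self):
        self.events.append("commit")


async def committing_session_sketch(session):
    # Code before the yield runs before the handler;
    # code after the yield runs once the handler has finished.
    yield session
    await session.commit()


async def simulate_request():
    session = FakeSession()
    # asynccontextmanager plays the role of the framework driving the generator.
    async with asynccontextmanager(committing_session_sketch)(session) as s:
        s.events.append("handler")
    return session.events


events = asyncio.run(simulate_request())
```

In this sketch the handler's work is recorded first and the commit second, matching the ordering described above.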

Config module-attribute

Config = Annotated[Dynaconf, Depends(get_config)]

Returns the configuration

MethodInformation module-attribute

MethodInformation = Annotated[
    List[MethodInformationModel],
    Depends(get_method_information),
]

Information about the methods that are part of the exchange

SqlAsyncEngine module-attribute

SqlAsyncEngine = Annotated[AsyncEngine, Depends(get_engine)]

Returns the SQLAlchemy AsyncEngine

SqlSession module-attribute

SqlSession = Annotated[
    AsyncSession, Depends(with_fast_api_session)
]

Session annotation to be used with FastAPI

Transformer module-attribute

Transformer = Annotated[
    Callable[[Dict], Awaitable[Dict]],
    Depends(get_transformer),
]

The transformer loaded from extensions
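What the loaded transformer actually does depends on the installed extensions and is not described here. The sketch below only illustrates a function that satisfies the `Callable[[Dict], Awaitable[Dict]]` type; `add_marker` and the `"seen"` key are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, Dict

TransformerFn = Callable[[Dict], Awaitable[Dict]]


async def add_marker(data: Dict) -> Dict:
    # Return a new dict rather than mutating the input.
    return {**data, "seen": True}


# Any async function taking and returning a dict matches the type.
transformer: TransformerFn = add_marker
transformed = asyncio.run(transformer({"type": "Note"}))
```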

ActivityExchangePublisherClass dataclass

ActivityExchangePublisherClass(exchange: typing.Annotated[faststream.rabbit.schemas.exchange.RabbitExchange, Depends(get_exchange)], broker: typing.Annotated[faststream.rabbit.broker.broker.RabbitBroker, Depends(get_broker)])

Parameters:

Name      Type            Description  Default
exchange  RabbitExchange               required
broker    RabbitBroker                 required
Source code in cattle_grid/dependencies/fastapi.py
@dataclass
class ActivityExchangePublisherClass:
    exchange: ActivityExchange
    broker: Broker

    async def __call__(self, *args, **kwargs):
        kwargs_updated = {**kwargs}
        kwargs_updated.update(dict(exchange=self.exchange))
        return await self.broker.publish(*args, **kwargs_updated)
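The effect of the keyword merge in `__call__` above can be checked without a running RabbitMQ. The sketch below reproduces the same merge against a hypothetical `RecordingBroker`; `PublisherSketch`, the exchange name `"activity"` and the routing key `"send_message"` are illustrative assumptions, not values taken from cattle_grid.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class RecordingBroker:
    """Hypothetical stand-in for RabbitBroker that records publish calls."""

    calls: list = field(default_factory=list)

    async def publish(self, *args, **kwargs):
        self.calls.append((args, kwargs))


@dataclass
class PublisherSketch:
    exchange: str
    broker: RecordingBroker

    async def __call__(self, *args, **kwargs):
        # Same merge as in the source above: the stored exchange
        # replaces any exchange passed by the caller.
        merged = {**kwargs, "exchange": self.exchange}
        return await self.broker.publish(*args, **merged)


broker = RecordingBroker()
publish = PublisherSketch(exchange="activity", broker=broker)
asyncio.run(publish({"id": 1}, routing_key="send_message", exchange="other"))
```

Because the stored exchange is written last, a caller cannot redirect the message to a different exchange through this publisher.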

ActivityExchangeRequesterClass dataclass

ActivityExchangeRequesterClass(exchange: typing.Annotated[faststream.rabbit.schemas.exchange.RabbitExchange, Depends(get_exchange)], broker: typing.Annotated[faststream.rabbit.broker.broker.RabbitBroker, Depends(get_broker)])

Parameters:

Name      Type            Description  Default
exchange  RabbitExchange               required
broker    RabbitBroker                 required
Source code in cattle_grid/dependencies/fastapi.py
@dataclass
class ActivityExchangeRequesterClass:
    exchange: ActivityExchange
    broker: Broker

    async def __call__(self, *args, **kwargs):
        kwargs_updated = {**kwargs}
        kwargs_updated.update(dict(exchange=self.exchange))
        result = await self.broker.request(*args, **kwargs_updated)
        return json.loads(result.body)
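The requester differs from the publisher in that it waits for a reply and decodes the reply body as JSON. A minimal sketch of that round trip, using a hypothetical `EchoBroker` in place of RabbitBroker (the names `RequesterSketch`, `FakeResponse` and the routing key `"lookup"` are assumptions for illustration):

```python
import asyncio
import json
from dataclasses import dataclass


@dataclass
class FakeResponse:
    body: bytes


class EchoBroker:
    """Hypothetical stand-in for RabbitBroker: echoes the request as JSON bytes."""

    async def request(self, message, **kwargs):
        payload = {"echo": message, "exchange": kwargs["exchange"]}
        return FakeResponse(body=json.dumps(payload).encode())


@dataclass
class RequesterSketch:
    exchange: str
    broker: EchoBroker

    async def __call__(self, *args, **kwargs):
        # Force the stored exchange, then decode the JSON reply body.
        merged = {**kwargs, "exchange": self.exchange}
        result = await self.broker.request(*args, **merged)
        return json.loads(result.body)


request = RequesterSketch(exchange="activity", broker=EchoBroker())
reply = asyncio.run(request({"actor": "alice"}, routing_key="lookup"))
```

Callers receive a decoded Python object rather than the raw reply, so a non-JSON reply body would raise in `json.loads`.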