Skip to content

cattle_grid.dependencies.internals

AccountExchange module-attribute

AccountExchange = Annotated[
    RabbitExchange, Depends(get_account_exchange)
]

The account exchange

ActivityExchange module-attribute

ActivityExchange = Annotated[
    RabbitExchange, Depends(get_activity_exchange)
]

The activity exchange

CorrelationId module-attribute

CorrelationId = Annotated[
    str, Context("message.correlation_id")
]

The correlation id of the message

InternalExchange module-attribute

InternalExchange = Annotated[
    RabbitExchange, Depends(get_internal_exchange)
]

The internal activity exchange

LookupAnnotation module-attribute

LookupAnnotation = Annotated[
    LookupMethod, Depends(get_lookup)
]

The lookup method loaded from extensions

MethodInformation module-attribute

MethodInformation = Annotated[
    list[MethodInformationModel],
    Depends(get_method_information),
]

Returns the information about the methods that are a part of the exchange

RewriteRules module-attribute

RewriteRules = Annotated[
    RewriteConfiguration, Depends(get_rewrite_rules)
]

Returns the rewrite configuration

Transformer module-attribute

Transformer = Annotated[
    Callable[..., Awaitable[dict]], Depends(get_transformer)
]

The transformer loaded from extensions

AccountExchangePublisherClass dataclass

Bases: BasePublisherClass

AccountExchangePublisherClass(correlation_id: typing.Annotated[str, Context(required=True, cast=False)], exchange: typing.Annotated[faststream.rabbit.schemas.exchange.RabbitExchange, Dependant(get_account_exchange)], broker: faststream.rabbit.broker.broker.RabbitBroker = Context(required=True, cast=False))

Parameters:

Name Type Description Default
correlation_id str
required
exchange RabbitExchange
required
broker RabbitBroker
Context(required=True, cast=False)
Source code in cattle_grid/dependencies/internals.py
# Dependency-injected dataclass that bundles the account exchange, the broker
# and the current message's correlation id. All behaviour comes from
# BasePublisherClass, which is defined outside this excerpt.
@dataclass
class AccountExchangePublisherClass(BasePublisherClass):
    # Correlation id of the message (context key "message.correlation_id").
    correlation_id: CorrelationId
    # The account exchange, resolved through Depends(get_account_exchange).
    exchange: AccountExchange
    # Broker taken from the FastStream context; presumably looked up under the
    # field name "broker" since Context() is given no explicit key -- confirm.
    broker: RabbitBroker = Context()

ActivityExchangePublisherClass dataclass

Bases: BasePublisherClass

ActivityExchangePublisherClass(correlation_id: typing.Annotated[str, Context(required=True, cast=False)], exchange: typing.Annotated[faststream.rabbit.schemas.exchange.RabbitExchange, Dependant(get_activity_exchange)], broker: faststream.rabbit.broker.broker.RabbitBroker = Context(required=True, cast=False))

Parameters:

Name Type Description Default
correlation_id str
required
exchange RabbitExchange
required
broker RabbitBroker
Context(required=True, cast=False)
Source code in cattle_grid/dependencies/internals.py
# Dependency-injected dataclass that bundles the activity exchange, the broker
# and the current message's correlation id. All behaviour comes from
# BasePublisherClass, which is defined outside this excerpt.
@dataclass
class ActivityExchangePublisherClass(BasePublisherClass):
    # Correlation id of the message (context key "message.correlation_id").
    correlation_id: CorrelationId
    # The activity exchange, resolved through Depends(get_activity_exchange).
    exchange: ActivityExchange
    # Broker taken from the FastStream context; presumably looked up under the
    # field name "broker" since Context() is given no explicit key -- confirm.
    broker: RabbitBroker = Context()

ActivityExchangeRequesterClass dataclass

Bases: BaseRequesterClass

ActivityExchangeRequesterClass(correlation_id: typing.Annotated[str, Context(required=True, cast=False)], exchange: typing.Annotated[faststream.rabbit.schemas.exchange.RabbitExchange, Dependant(get_activity_exchange)], broker: faststream.rabbit.broker.broker.RabbitBroker = Context(required=True, cast=False))

Parameters:

Name Type Description Default
correlation_id str
required
exchange RabbitExchange
required
broker RabbitBroker
Context(required=True, cast=False)
Source code in cattle_grid/dependencies/internals.py
# Dependency-injected dataclass that bundles the activity exchange, the broker
# and the current message's correlation id. All behaviour comes from
# BaseRequesterClass, which is defined outside this excerpt.
@dataclass
class ActivityExchangeRequesterClass(BaseRequesterClass):
    # Correlation id of the message (context key "message.correlation_id").
    correlation_id: CorrelationId
    # The activity exchange, resolved through Depends(get_activity_exchange).
    exchange: ActivityExchange
    # Broker taken from the FastStream context; presumably looked up under the
    # field name "broker" since Context() is given no explicit key -- confirm.
    broker: RabbitBroker = Context()

InternalExchangePublisherClass dataclass

Bases: BasePublisherClass

InternalExchangePublisherClass(correlation_id: typing.Annotated[str, Context(required=True, cast=False)], exchange: typing.Annotated[faststream.rabbit.schemas.exchange.RabbitExchange, Dependant(get_internal_exchange)], broker: faststream.rabbit.broker.broker.RabbitBroker = Context(required=True, cast=False))

Parameters:

Name Type Description Default
correlation_id str
required
exchange RabbitExchange
required
broker RabbitBroker
Context(required=True, cast=False)
Source code in cattle_grid/dependencies/internals.py
# Dependency-injected dataclass that bundles the internal exchange, the broker
# and the current message's correlation id. All behaviour comes from
# BasePublisherClass, which is defined outside this excerpt.
@dataclass
class InternalExchangePublisherClass(BasePublisherClass):
    # Correlation id of the message (context key "message.correlation_id").
    correlation_id: CorrelationId
    # The internal exchange, resolved through Depends(get_internal_exchange).
    exchange: InternalExchange
    # Broker taken from the FastStream context; presumably looked up under the
    # field name "broker" since Context() is given no explicit key -- confirm.
    broker: RabbitBroker = Context()

InternalExchangeRequesterClass dataclass

Bases: BaseRequesterClass

InternalExchangeRequesterClass(correlation_id: typing.Annotated[str, Context(required=True, cast=False)], exchange: typing.Annotated[faststream.rabbit.schemas.exchange.RabbitExchange, Dependant(get_internal_exchange)], broker: faststream.rabbit.broker.broker.RabbitBroker = Context(required=True, cast=False))

Parameters:

Name Type Description Default
correlation_id str
required
exchange RabbitExchange
required
broker RabbitBroker
Context(required=True, cast=False)
Source code in cattle_grid/dependencies/internals.py
# Dependency-injected dataclass that bundles the internal exchange, the broker
# and the current message's correlation id. All behaviour comes from
# BaseRequesterClass, which is defined outside this excerpt.
@dataclass
class InternalExchangeRequesterClass(BaseRequesterClass):
    # Correlation id of the message (context key "message.correlation_id").
    correlation_id: CorrelationId
    # The internal exchange, resolved through Depends(get_internal_exchange).
    exchange: InternalExchange
    # Broker taken from the FastStream context; presumably looked up under the
    # field name "broker" since Context() is given no explicit key -- confirm.
    broker: RabbitBroker = Context()