Skip to content

.auth

cattle_grid.auth

auth_router module-attribute

auth_router = APIRouter(tags=['auth'])

The authentication router

create_app

create_app()

Allows running just the auth endpoint

Source code in cattle_grid/auth/__init__.py
def create_app():
    """Build a FastAPI application that serves only the auth router."""

    @asynccontextmanager
    async def lifespan(app):
        # The database context is entered before the application starts
        # serving and is only left once the application shuts down, because
        # the ``yield`` sits inside of it.
        async with global_container.alchemy_database() as engine:
            # Run ``Base.metadata.create_all`` once at startup.
            async with engine.begin() as connection:
                await connection.run_sync(Base.metadata.create_all)

            yield

    auth_app = FastAPI(
        lifespan=lifespan,
        title="cattle_grid.auth",
        description="""Authorization server for Fediverse applications. It basically checks HTTP Signatures for you.""",
        version=__version__,
    )
    auth_app.include_router(auth_router)

    return auth_app

dependencies

AuthConfig module-attribute

AuthConfig = Annotated[
    AuthConfig, Depends(provide_auth_config)
]

Provides the configuration for the auth module

BovineActor module-attribute

BovineActor = Annotated[
    BovineActor, Depends(create_bovine_actor)
]

Returns the bovine actor

public_key_cache

PublicKeyCache dataclass

Caches public keys in the database and fetches them using bovine_actor

Parameters:

Name Type Description Default
bovine_actor BovineActor

used to fetch the public key

required
session_maker Callable[[], AsyncSession] | None

sql session maker

None
Source code in cattle_grid/auth/public_key_cache.py
@dataclass
class PublicKeyCache:
    """Caches public keys in the database and fetches them
    using bovine_actor"""

    bovine_actor: BovineActor
    """used to fetch the public key"""

    session_maker: Callable[[], AsyncSession] | None = None
    """sql session maker"""

    async def cryptographic_identifier(
        self, key_id: str
    ) -> CryptographicIdentifier | Literal["gone"] | None:
        """Fetches the public key behind ``key_id`` using the bovine actor.

        :param key_id: URI of the public key to fetch
        :returns: ``"gone"`` if the fetch yielded nothing or a Tombstone;
            ``None`` if no public key or owner could be extracted or if
            anything raised along the way; otherwise the identifier built
            from the PEM encoded key and its owner
        """

        try:
            result = await self.bovine_actor.get(key_id)

            if result is None:
                return "gone"

            if result.get("type") == "Tombstone":
                logger.info("Got Tombstone for %s", key_id)
                return "gone"

            public_key, owner = public_key_owner_from_dict(result, key_id)

            if public_key is None or owner is None:
                return None

            return CryptographicIdentifier.from_pem(public_key, owner)
        except Exception as e:
            # Deliberately broad: any failure while fetching or parsing the
            # remote document is reported as "no identifier".
            logger.info("Failed to fetch public key for %s with %s", key_id, repr(e))
            logger.debug(e)
            return None

    async def from_cache(self, key_id: str) -> CryptographicIdentifier | None:
        """Returns the identifier for ``key_id``, consulting the database first.

        If a ``RemoteIdentity`` row with this ``key_id`` exists, it is
        converted with ``result_for_identity`` and returned. Otherwise the
        key is fetched through :meth:`cryptographic_identifier` and the
        outcome (a key or the ``"gone"`` marker) is stored as a new row.
        Storing is best effort: a failing write is logged and does not
        change the returned value.

        :param key_id: URI of the public key
        :returns: the identifier, or ``None`` if the key is gone or could
            not be retrieved
        :raises Exception: if no ``session_maker`` was configured
        """
        if not self.session_maker:
            raise Exception("Sql session must be set")

        async with self.session_maker() as session:
            try:
                identity = await session.scalar(
                    select(RemoteIdentity).where(RemoteIdentity.key_id == key_id)
                )
            except Exception as e:
                # A failing lookup is treated like a cache miss.
                logger.debug(e)
                identity = None

            if identity:
                return result_for_identity(identity)

            identifier = await self.cryptographic_identifier(key_id)
            if identifier is None:
                return None

            if identifier == "gone":
                # Remember the tombstone. The write is guarded exactly like
                # the one for successfully fetched keys below, so that a
                # failing commit (for example a concurrent insert of the
                # same key) cannot turn the lookup into an unhandled error.
                try:
                    session.add(
                        RemoteIdentity(
                            key_id=key_id, public_key="gone", controller="gone"
                        )
                    )
                    await session.commit()
                except Exception as e:
                    logger.exception(e)
                return None

            try:
                controller, public_key = identifier.as_tuple()

                session.add(
                    RemoteIdentity(
                        key_id=key_id, public_key=public_key, controller=controller
                    )
                )
                await session.commit()
            except Exception as e:
                logger.exception(e)
            return identifier
bovine_actor instance-attribute
bovine_actor: BovineActor

used to fetch the public key

session_maker class-attribute instance-attribute
session_maker: Callable[[], AsyncSession] | None = None

sql session maker

cryptographic_identifier async
cryptographic_identifier(
    key_id: str,
) -> CryptographicIdentifier | Literal["gone"] | None

Returns “gone” if Tombstone

Parameters:

Name Type Description Default
key_id str

URI of the public key to fetch

required

Returns:

Type Description
CryptographicIdentifier | Literal['gone'] | None
Source code in cattle_grid/auth/public_key_cache.py
async def cryptographic_identifier(
    self, key_id: str
) -> CryptographicIdentifier | Literal["gone"] | None:
    """Look up the public key behind ``key_id`` through the bovine actor.

    :param key_id: URI of the public key to fetch
    :returns: ``"gone"`` when the fetch yields nothing or a Tombstone,
        ``None`` when key or owner are missing or an error occurred,
        otherwise the identifier created from the key and its owner
    """

    try:
        document = await self.bovine_actor.get(key_id)

        if document is None:
            return "gone"

        if document.get("type") == "Tombstone":
            logger.info("Got Tombstone for %s", key_id)
            return "gone"

        pem, owner = public_key_owner_from_dict(document, key_id)

        if pem is not None and owner is not None:
            return CryptographicIdentifier.from_pem(pem, owner)

        return None
    except Exception as error:
        # Every failure above ends up here and is reported as "no result".
        logger.info("Failed to fetch public key for %s with %s", key_id, repr(error))
        logger.debug(error)
        return None

find_with_item

find_with_item(dict_list, key_id)

Given a list of dictionaries, finds the dictionary with id = key_id

Source code in cattle_grid/auth/public_key_cache.py
def find_with_item(dict_list, key_id):
    """Given a list of dictionaries, finds the dictionary with
    id = key_id

    Entries that are not dictionaries (e.g. bare URI strings or ``None``)
    are skipped instead of raising, so one malformed entry does not hide a
    valid one further down the list.

    :param dict_list: the entries to search
    :param key_id: the value the ``id`` key has to match
    :returns: the first matching dictionary or ``None`` if there is none
    """
    for entry in dict_list:
        if isinstance(entry, dict) and entry.get("id") == key_id:
            return entry
    return None

public_key_owner_from_dict

public_key_owner_from_dict(
    actor: dict, key_id: str
) -> tuple[str | None, str | None]

Given an actor and key_id returns the public_key and the owner. This method directly checks the key publicKey

Source code in cattle_grid/auth/public_key_cache.py
def public_key_owner_from_dict(
    actor: dict, key_id: str
) -> tuple[str | None, str | None]:
    """Given an actor and key_id returns the public_key and the owner. This method directly checks the key `publicKey`"""

    public_key_data = actor.get("publicKey", {})

    if isinstance(public_key_data, list):
        if len(public_key_data) == 1:
            public_key_data = public_key_data[0]
        else:
            public_key_data = find_with_item(public_key_data, key_id)

    if not public_key_data:
        return None, None

    public_key = public_key_data.get("publicKeyPem")
    owner = public_key_data.get("owner")

    return public_key, owner

router

auth_router module-attribute

auth_router = APIRouter(tags=['auth'])

The authentication router

ReverseProxyHeaders

Bases: BaseModel

Headers set by the reverse proxy

Parameters:

Name Type Description Default
x_original_method str

The original used method

'get'
x_original_uri str | None

The original request uri

None
x_original_host str | None

The original used host

None
x_forwarded_proto str

The protocol being used

'http'
Source code in cattle_grid/auth/router.py
class ReverseProxyHeaders(BaseModel):
    """Headers set by the reverse proxy"""

    # Every field has a default, so a request lacking one of these headers
    # still produces a model instance.
    x_original_method: str = Field(
        default="get", description="The original used method"
    )
    x_original_uri: str | None = Field(
        default=None, description="The original request uri"
    )
    x_original_host: str | None = Field(
        default=None, description="The original used host"
    )
    x_forwarded_proto: str = Field(
        default="http", description="The protocol being used"
    )

handle_get_actor async

handle_get_actor(actor_object: ActorObject)

Returns the actor profile of the fetch actor used to retrieve public keys, e.g.

{
    "type": "Service",
    "id": "https://your-domain.example/cattle_grid_actor",
    ...
}
Source code in cattle_grid/auth/router.py
@auth_router.get(
    "/cattle_grid_actor",
    response_class=ActivityResponse,
)
async def handle_get_actor(actor_object: ActorObject):
    """Returns the actor profile of the
    fetch actor used to retrieve public keys, e.g.

    ```json
    {
        "type": "Service",
        "id": "https://your-domain.example/cattle_grid_actor",
        ...
    }
    ```
    """
    # No logic of its own: the value received for ``actor_object`` is handed
    # back unchanged and rendered with ``ActivityResponse``.
    # NOTE(review): ``ActorObject`` is defined elsewhere — presumably a
    # FastAPI dependency that builds the actor profile; confirm there.
    return actor_object

verify_signature async

verify_signature(
    request: Request,
    response: Response,
    config: AuthConfig,
    signature_checker: SignatureCheckWithCache,
    reverse_proxy_headers: Annotated[
        ReverseProxyHeaders, Header()
    ],
    servable_content_types: ShouldServe,
) -> str

Takes the request and checks signature. If signature check fails a 401 is returned. If the domain the public key belongs to is blocked, a 403 is returned.

If the request is valid, the controller corresponding to the signature is set in the response header X-CATTLE-GRID-REQUESTER.

The header X-CATTLE-GRID-SHOULD-SERVE is set to html if one should redirect to the HTML resource. It is set to other if the resource to serve cannot be determined. This is only used for unsigned requests.

Note: More headers than the ones listed below can be used to verify a signature.

Source code in cattle_grid/auth/router.py
@auth_router.get(
    "/auth",
    responses={
        200: {"description": "Request is valid", "content": {"text/plain": ""}},
        401: {"description": "The signature was invalid"},
        403: {"description": "Request was blocked"},
    },
    response_class=PlainTextResponse,
)
async def verify_signature(
    request: Request,
    response: Response,
    config: AuthConfigDependency,
    signature_checker: SignatureCheckWithCache,
    reverse_proxy_headers: Annotated[ReverseProxyHeaders, Header()],
    servable_content_types: ShouldServe,
) -> str:
    """Takes the request and checks signature. If signature check
    fails a 401 is returned. If the domain the public key belongs
    to is blocked, a 403 is returned.

    If the request is valid. The controller corresponding to
    the signature is set in the response header `X-CATTLE-GRID-REQUESTER`.

    The header `X-CATTLE-GRID-SHOULD-SERVE` is set to `html`
    if one should redirect to the HTML resource. It is set to `other` if the resource to serve cannot be determined.
    This is only used for unsigned requests.

    Note: More headers than the ones listed below can be used
    to verify a signature.
    """
    # Work on a mutable copy, as the Host header may get replaced below.
    request_headers = MutableHeaders(request.headers)

    logger.debug("Got auth request with headers:")
    logger.debug(request_headers)

    if "signature" not in request_headers:
        # Unsigned request: tell the proxy what to serve, html taking
        # precedence over other.
        for content_type, hint in (
            (ContentType.html, "html"),
            (ContentType.other, "other"),
        ):
            if content_type in servable_content_types:
                response.headers["x-cattle-grid-should-serve"] = hint
                return ""

        # Nothing servable was determined; only reject if the configuration
        # demands signatures.
        if config.require_signature_for_activity_pub:
            raise HTTPException(401)
        return ""

    # The signature has to be checked against the host the client used,
    # as reported by the reverse proxy, not the one of this service.
    if reverse_proxy_headers.x_original_host:
        request_headers["Host"] = reverse_proxy_headers.x_original_host

    target_url = f"{reverse_proxy_headers.x_forwarded_proto}://{reverse_proxy_headers.x_original_host}{reverse_proxy_headers.x_original_uri}"

    logger.debug("Treating request as to url %s", target_url)

    original_method = reverse_proxy_headers.x_original_method.lower()
    controller = await signature_checker.validate_signature(
        original_method,
        target_url,
        dict(request_headers.items()),
        None,
    )

    logger.debug("Got controller %s", controller)

    if not controller:
        logger.info(
            "invalid signature for request to %s => access denied",
            request.headers.get("X-Original-Uri", ""),
        )
        raise HTTPException(401)

    if check_block(config.domain_blocks, controller):
        logger.info("Blocked a request by %s", controller)
        raise HTTPException(403)

    response.headers["x-cattle-grid-requester"] = controller

    logger.debug("Got requester %s", controller)

    return ""

webfinger async

webfinger(resource: str, config: AuthConfig) -> JrdData

If resource is the actor corresponding to the actor fetching public keys, returns the corresponding Jrd. Otherwise returns not found

Source code in cattle_grid/auth/router.py
@auth_router.get("/.well-known/webfinger")
async def webfinger(resource: str, config: AuthConfigDependency) -> JrdData:
    """If resource is the actor corresponding to the actor fetching
    public keys, returns the corresponding Jrd. Otherwise returns
    not found"""
    # NOTE(review): the whole config object is logged at INFO level on every
    # lookup — confirm that it contains nothing sensitive.
    logger.info(config)

    # Only the configured acct id is known here; anything else is a 404.
    if resource == config.actor_acct_id:
        return webfinger_response(config.actor_acct_id, config.actor_id)

    raise HTTPException(404)

util

check_block

check_block(
    domain_blocks: Set[str], controller: str
) -> bool

Checks if a controller’s domain is in block list

>>> check_block({"blocked.example"}, "http://actor.example/path")
False

>>> check_block({"blocked.example"}, "http://blocked.example/path")
True
Source code in cattle_grid/auth/util.py
def check_block(domain_blocks: Set[str], controller: str) -> bool:
    """Checks if a controller's domain is in block list

    The controller is parsed as a URL. It counts as blocked if either its
    network location exactly as written (``netloc``) or its normalised host
    name (lower case, without port and user information) is contained in
    ``domain_blocks``. Checking the host name keeps a block from being
    circumvented by adding a port, user information or upper case letters
    to the controller URI.

    ```pycon
    >>> check_block({"blocked.example"}, "http://actor.example/path")
    False

    >>> check_block({"blocked.example"}, "http://blocked.example/path")
    True

    >>> check_block({"blocked.example"}, "http://blocked.example:8443/path")
    True

    >>> check_block({"blocked.example"}, "http://BLOCKED.example/path")
    True

    ```

    :param domain_blocks: the blocked domains
    :param controller: URI of the controller to check
    :returns: ``True`` if the controller is blocked or could not be parsed
    """
    try:
        parsed = urlparse(controller)
        # ``netloc`` is still matched so that existing block list entries
        # written with a port (e.g. "localhost:8000") keep working.
        return parsed.netloc in domain_blocks or parsed.hostname in domain_blocks
    except Exception as e:
        # Fail closed: something that cannot be parsed is treated as blocked.
        logger.warning("Something went wrong with %s", repr(e))
        return True