Skip to content

.auth

cattle_grid.auth

auth_router module-attribute

auth_router = APIRouter(tags=['auth'])

The authentication router

create_app

create_app()

Allows running just the auth endpoint

Source code in cattle_grid/auth/__init__.py
def create_app():
    """Build a standalone FastAPI application that serves only the auth router.

    The application's lifespan handler creates all tables registered on
    ``Base.metadata`` before the application starts handling requests.
    """

    @asynccontextmanager
    async def lifespan(app):
        # The database context stays open while the application runs; only
        # the schema-creation transaction is finished before yielding.
        async with global_container.alchemy_database() as engine:
            async with engine.begin() as connection:
                await connection.run_sync(Base.metadata.create_all)

            yield

    application = FastAPI(
        title="cattle_grid.auth",
        description="Authorization server for Fediverse applications. It basically checks HTTP Signatures for you.",
        version=__version__,
        lifespan=lifespan,
    )
    application.include_router(auth_router)

    return application

dependencies

AuthConfig module-attribute

AuthConfig = Annotated[
    AuthConfig, Depends(provide_auth_config)
]

Provides the configuration for the auth module

BovineActor module-attribute

BovineActor = Annotated[
    BovineActor, Depends(create_bovine_actor)
]

Returns the bovine actor

http_util

AcceptEntry dataclass

AcceptEntry(content_type: str, profile: str | None = None, quality: float = 1.0)

Parameters:

Name Type Description Default
content_type str
required
profile str | None
None
quality float
1.0
Source code in cattle_grid/auth/http_util.py
@dataclass
class AcceptEntry:
    """One entry of an ``Accept`` header.

    Stores the media type, its optional ``profile`` parameter and its
    quality value, which defaults to ``1.0``.
    """

    content_type: str
    profile: str | None = None
    quality: float = 1.0

    @staticmethod
    def from_header(piece):
        """Build an entry from one parsed header item.

        ``piece[0]`` is taken as the media type; ``piece[1]`` must provide a
        dict-style ``get`` from which ``profile`` and ``q`` are read.
        """
        media_type = str(piece[0])
        parameters = piece[1]
        return AcceptEntry(
            content_type=media_type,
            profile=parameters.get("profile", None),
            quality=float(parameters.get("q", 1.0)),
        )

    def to_content_type(self) -> ContentType:
        """Map this entry to the `ContentType` it stands for."""
        media_type = self.content_type
        if media_type == "text/html":
            return ContentType.html

        is_activity_json = media_type == "application/activity+json"
        # application/ld+json only counts when it carries the ActivityStreams profile
        is_activity_streams_ld = (
            media_type == "application/ld+json"
            and self.profile == "https://www.w3.org/ns/activitystreams"
        )
        if is_activity_json or is_activity_streams_ld:
            return ContentType.activity_pub

        return ContentType.other

ContentType

Bases: StrEnum

The content type of the response

Source code in cattle_grid/auth/http_util.py
class ContentType(StrEnum):
    """The content type of the response"""

    activity_pub = auto()
    html = auto()
    other = auto()

parse_accept_header

parse_accept_header(header)
>>> header = 'application/activity+json, application/ld+json; profile="https://www.w3.org/ns/activitystreams", text/html;q=0.1'
>>> parse_accept_header(header)
[AcceptEntry(content_type='application/activity+json', profile=None, quality=1.0),
    AcceptEntry(content_type='application/ld+json', profile='https://www.w3.org/ns/activitystreams', quality=1.0),
    AcceptEntry(content_type='text/html', profile=None, quality=0.1)]
Source code in cattle_grid/auth/http_util.py
def parse_accept_header(header):
    """
    Parse an ``Accept`` header into a list of `AcceptEntry` objects.

    The encoded header is passed to ``http_sf.parse`` as a top-level list and
    every resulting item is converted with `AcceptEntry.from_header`.

    ```pycon
    >>> header = 'application/activity+json, application/ld+json; profile="https://www.w3.org/ns/activitystreams", text/html;q=0.1'
    >>> parse_accept_header(header)
    [AcceptEntry(content_type='application/activity+json', profile=None, quality=1.0),
        AcceptEntry(content_type='application/ld+json', profile='https://www.w3.org/ns/activitystreams', quality=1.0),
        AcceptEntry(content_type='text/html', profile=None, quality=0.1)]

    ```
    """

    items = http_sf.parse(header.encode(), tltype="list")

    return list(map(AcceptEntry.from_header, items))  # type:ignore

should_serve

should_serve(header: str | None) -> List[ContentType]

Determines what content to serve

>>> should_serve("application/activity+json")
[<ContentType.activity_pub: 'activity_pub'>]

>>> should_serve("text/html")
[<ContentType.html: 'html'>]
Source code in cattle_grid/auth/http_util.py
def should_serve(header: str | None) -> List[ContentType]:
    """
    Determines what content to serve

    The entries of the ``Accept`` header are ordered by descending quality
    value and each one is mapped to its `ContentType`. Entries with the same
    quality keep the order they have in the header. If no header is given,
    ``[ContentType.other]`` is returned.

    ```python
    >>> should_serve("application/activity+json")
    [<ContentType.activity_pub: 'activity_pub'>]

    >>> should_serve("text/html")
    [<ContentType.html: 'html'>]

    ```
    """
    if header is None:
        return [ContentType.other]

    by_preference = sorted(
        parse_accept_header(header), key=lambda entry: entry.quality, reverse=True
    )

    # to_content_type() always yields a ContentType member (unknown media
    # types map to ContentType.other), so each entry is converted exactly once
    # and no None filtering is needed.
    return [entry.to_content_type() for entry in by_preference]

public_key_cache

PublicKeyCache dataclass

Caches public keys in the database and fetches them using bovine_actor

Parameters:

Name Type Description Default
bovine_actor BovineActor

used to fetch the public key

required
session_maker Callable[[], AsyncSession] | None

sql session maker

None
Source code in cattle_grid/auth/public_key_cache.py
@dataclass
class PublicKeyCache:
    """Caches public keys in the database and fetches them
    using bovine_actor"""

    bovine_actor: BovineActor
    """used to fetch the public key"""

    session_maker: Callable[[], AsyncSession] | None = None
    """sql session maker"""

    async def cryptographic_identifier(
        self, key_id: str
    ) -> CryptographicIdentifier | Literal["gone"] | None:
        """Fetches the public key for ``key_id`` using ``bovine_actor``.

        Any exception raised while fetching or converting the key is
        logged and leads to ``None`` being returned.

        :param key_id: URI of the public key to fetch
        :returns: the identifier built from the fetched key; ``"gone"`` if
            nothing was returned or the fetched object is a ``Tombstone``;
            ``None`` if no key or owner could be determined or an error
            occurred
        """

        try:
            fetched = await self.bovine_actor.get(key_id)

            if fetched is None:
                return "gone"

            if fetched.get("type") == "Tombstone":
                logger.info("Got Tombstone for %s", key_id)
                return "gone"

            public_key, owner = actor_object_to_public_key(fetched, key_id)

            if public_key is not None and owner is not None:
                return CryptographicIdentifier.from_pem(public_key, owner)

            return None
        except Exception as error:
            logger.info(
                "Failed to fetch public key for %s with %s", key_id, repr(error)
            )
            logger.exception(error)
            return None

    async def from_cache(self, key_id: str) -> CryptographicIdentifier | None:
        """Returns the identifier for ``key_id``, using the database as cache.

        If a row for ``key_id`` is stored, the outcome of
        ``result_for_identity`` for that row is returned. Otherwise the key
        is fetched with `cryptographic_identifier`: a ``"gone"`` answer is
        recorded in the database and ``None`` is returned, while a fetched
        identifier is stored and returned. Errors while reading from or
        writing to the database are logged and do not abort the lookup.

        :param key_id: URI of the public key
        :returns: the identifier, or ``None`` if the key is gone or could
            not be fetched
        :raises Exception: if ``session_maker`` is not set
        """
        if not self.session_maker:
            raise Exception("Sql session must be set")

        async with self.session_maker() as session:
            try:
                identity = await session.scalar(
                    select(RemoteIdentity).where(RemoteIdentity.key_id == key_id)
                )
            except Exception as e:
                # A failing lookup is treated like a cache miss.
                logger.exception(e)
                identity = None

            if identity:
                return result_for_identity(identity)

            identifier = await self.cryptographic_identifier(key_id)
            if identifier is None:
                # Nothing is stored here, so the next call fetches again.
                return None

            if identifier == "gone":
                # Remember that the key is gone. As with the regular store
                # below, a failed write is logged instead of propagated: the
                # answer for this request does not depend on the write.
                try:
                    session.add(
                        RemoteIdentity(
                            key_id=key_id, public_key="gone", controller="gone"
                        )
                    )
                    await session.commit()
                except Exception as e:
                    logger.exception(e)
                return None

            try:
                controller, public_key = identifier.as_tuple()

                session.add(
                    RemoteIdentity(
                        key_id=key_id, public_key=public_key, controller=controller
                    )
                )
                await session.commit()
            except Exception as e:
                # The fetched identifier is still usable if storing it fails.
                logger.exception(e)
            return identifier
bovine_actor instance-attribute
bovine_actor: BovineActor

used to fetch the public key

session_maker class-attribute instance-attribute
session_maker: Callable[[], AsyncSession] | None = None

sql session maker

cryptographic_identifier async
cryptographic_identifier(
    key_id: str,
) -> CryptographicIdentifier | Literal["gone"] | None

Returns “gone” if Tombstone

Parameters:

Name Type Description Default
key_id str

URI of the public key to fetch

required

Returns:

Type Description
CryptographicIdentifier | Literal['gone'] | None
Source code in cattle_grid/auth/public_key_cache.py
async def cryptographic_identifier(
    self, key_id: str
) -> CryptographicIdentifier | Literal["gone"] | None:
    """Fetches the public key for ``key_id`` using ``bovine_actor``.

    Any exception raised while fetching or converting the key is
    logged and leads to ``None`` being returned.

    :param key_id: URI of the public key to fetch
    :returns: the identifier built from the fetched key; ``"gone"`` if
        nothing was returned or the fetched object is a ``Tombstone``;
        ``None`` if no key or owner could be determined or an error
        occurred
    """

    try:
        fetched = await self.bovine_actor.get(key_id)

        if fetched is None:
            return "gone"

        if fetched.get("type") == "Tombstone":
            logger.info("Got Tombstone for %s", key_id)
            return "gone"

        public_key, owner = actor_object_to_public_key(fetched, key_id)

        if public_key is not None and owner is not None:
            return CryptographicIdentifier.from_pem(public_key, owner)

        return None
    except Exception as error:
        logger.info("Failed to fetch public key for %s with %s", key_id, repr(error))
        logger.exception(error)
        return None

router

auth_router module-attribute

auth_router = APIRouter(tags=['auth'])

The authentication router

ReverseProxyHeaders

Bases: BaseModel

Headers set by the reverse proxy

Parameters:

Name Type Description Default
x_original_method str

The original used method

'get'
x_original_uri str | None

The original request uri

None
x_original_host str | None

The original used host

None
x_forwarded_proto str

The protocol being used

'http'
Source code in cattle_grid/auth/router.py
class ReverseProxyHeaders(BaseModel):
    """Headers set by the reverse proxy"""

    # Every field carries a default value; host and uri default to None.
    x_original_method: str = Field(
        default="get",
        description="The original used method",
    )
    x_original_uri: str | None = Field(
        default=None,
        description="The original request uri",
    )
    x_original_host: str | None = Field(
        default=None,
        description="The original used host",
    )
    x_forwarded_proto: str = Field(
        default="http",
        description="The protocol being used",
    )

handle_get_actor async

handle_get_actor(actor_object: ActorObject)

Returns the actor profile of the fetch actor used to retrieve public keys, e.g.

{
    "type": "Service",
    "id": "https://your-domain.example/cattle_grid_actor",
    ...
}
Source code in cattle_grid/auth/router.py
@auth_router.get(
    "/cattle_grid_actor",
    response_class=ActivityResponse,
)
async def handle_get_actor(actor_object: ActorObject):
    """Returns the actor profile of the
    fetch actor used to retrieve public keys, e.g.

    ```json
    {
        "type": "Service",
        "id": "https://your-domain.example/cattle_grid_actor",
        ...
    }
    ```
    """
    # The handler adds nothing of its own: the object received through the
    # ActorObject parameter is returned unchanged, and the route declares
    # ActivityResponse as its response class.
    # NOTE(review): ActorObject is presumably a dependency alias that builds
    # the profile — confirm where it is defined.
    return actor_object

verify_signature async

verify_signature(
    request: Request,
    response: Response,
    config: AuthConfig,
    signature_checker: SignatureCheckWithCache,
    reverse_proxy_headers: Annotated[
        ReverseProxyHeaders, Header()
    ],
) -> str

Takes the request and checks signature. If signature check fails a 401 is returned. If the domain the public key belongs to is blocked, a 403 is returned.

If the request is valid, the controller corresponding to the signature is set in the response header X-CATTLE-GRID-REQUESTER.

The header X-CATTLE-GRID-SHOULD-SERVE is set to html if one should redirect to the HTML resource. It is set to other if the resource to serve cannot be determined. This is only used for unsigned requests.

Note: More headers than the ones listed below can be used to verify a signature.

Source code in cattle_grid/auth/router.py
@auth_router.get(
    "/auth",
    responses={
        200: {"description": "Request is valid", "content": {"text/plain": ""}},
        401: {"description": "The signature was invalid"},
        403: {"description": "Request was blocked"},
    },
    response_class=PlainTextResponse,
)
async def verify_signature(
    request: Request,
    response: Response,
    config: AuthConfigDependency,
    signature_checker: SignatureCheckWithCache,
    reverse_proxy_headers: Annotated[ReverseProxyHeaders, Header()],
) -> str:
    """Takes the request and checks signature. If signature check
    fails a 401 is returned. If the domain the public key belongs
    to is blocked, a 403 is returned.

    If the request is valid. The controller corresponding to
    the signature is set in the response header `X-CATTLE-GRID-REQUESTER`.

    The header `X-CATTLE-GRID-SHOULD-SERVE` is set to `html`
    if one should redirect to the HTML resource. It is set to `other` if the resource to serve cannot be determined.
    This is only used for unsigned requests.

    Note: More headers than the ones listed below can be used
    to verify a signature.
    """
    # Wrapped in MutableHeaders so the Host header can be overridden below.
    headers = MutableHeaders(request.headers)
    logger.debug(headers)

    accept = headers.get("accept")
    logger.info("Got accept header %s", accept)
    accepted_types = should_serve(accept)

    if "signature" not in headers:
        # Unsigned request: html takes precedence over other; a request for
        # neither is rejected only if the configuration demands a signature.
        if ContentType.html in accepted_types:
            response.headers["x-cattle-grid-should-serve"] = "html"
        elif ContentType.other in accepted_types:
            response.headers["x-cattle-grid-should-serve"] = "other"
        elif config.require_signature_for_activity_pub:
            raise HTTPException(401)
        return ""

    proxy = reverse_proxy_headers

    # Verify against the host the client originally addressed, if known.
    if proxy.x_original_host:
        headers["Host"] = proxy.x_original_host

    # NOTE(review): x_original_host and x_original_uri default to None and
    # would be interpolated as the text "None" here — confirm the reverse
    # proxy always sets both.
    url = f"{proxy.x_forwarded_proto}://{proxy.x_original_host}{proxy.x_original_uri}"
    logger.debug("Treating request as to url %s", url)

    controller = await signature_checker.validate_signature(
        proxy.x_original_method.lower(),
        url,
        dict(headers.items()),
        None,
    )
    logger.debug("Got controller %s", controller)

    if not controller:
        logger.info(
            "invalid signature for request to %s => access denied",
            request.headers.get("X-Original-Uri", ""),
        )
        raise HTTPException(401)

    if check_block(config.domain_blocks, controller):
        logger.info("Blocked a request by %s", controller)
        raise HTTPException(403)

    response.headers["x-cattle-grid-requester"] = controller
    logger.debug("Got requester %s", controller)

    return ""

webfinger async

webfinger(resource: str, config: AuthConfig) -> JrdData

If resource is the actor corresponding to the actor fetching public keys, returns the corresponding Jrd. Otherwise returns not found

Source code in cattle_grid/auth/router.py
@auth_router.get("/.well-known/webfinger")
async def webfinger(resource: str, config: AuthConfigDependency) -> JrdData:
    """If resource is the actor corresponding to the actor fetching
    public keys, returns the corresponding Jrd. Otherwise returns
    not found"""
    # Log the requested resource only. The config object is deliberately not
    # logged: it is the complete auth configuration and would be written to
    # the log on every request.
    # NOTE(review): it presumably also holds key material — confirm against
    # the AuthConfig model.
    logger.info("Webfinger lookup for %s", resource)

    # Only the fetch actor's own acct id is known to this endpoint.
    if resource != config.actor_acct_id:
        raise HTTPException(404)

    return webfinger_response(config.actor_acct_id, config.actor_id)

util

check_block

check_block(
    domain_blocks: Set[str], controller: str
) -> bool

Checks if a controller’s domain is in block list

>>> check_block({"blocked.example"}, "http://actor.example/path")
False

>>> check_block({"blocked.example"}, "http://blocked.example/path")
True
Source code in cattle_grid/auth/util.py
def check_block(domain_blocks: Set[str], controller: str) -> bool:
    """Checks if a controller's domain is in block list

    The controller is matched by its network location as written and by its
    normalised host name (lower-cased, without port or user information), so
    a blocked domain cannot be bypassed by adding a port or changing case.
    If the controller cannot be evaluated, it is treated as blocked.

    ```pycon
    >>> check_block({"blocked.example"}, "http://actor.example/path")
    False

    >>> check_block({"blocked.example"}, "http://blocked.example/path")
    True

    >>> check_block({"blocked.example"}, "https://Blocked.Example:8443/path")
    True

    ```

    :param domain_blocks: the blocked domains
    :param controller: URI of the controller to check
    :returns: ``True`` if the controller is blocked
    """
    try:
        parsed = urlparse(controller)

        # netloc is kept so block list entries that include a port still match
        candidates = {parsed.netloc, parsed.netloc.lower()}
        # hostname is already lower-cased and has port and userinfo removed
        if parsed.hostname:
            candidates.add(parsed.hostname)

        return any(candidate in domain_blocks for candidate in candidates)
    except Exception as e:
        # Fail closed: anything that cannot be checked counts as blocked.
        logger.warning("Something went wrong with %s", repr(e))
        return True