Skip to content

.auth

cattle_grid.auth

create_app

create_app(filenames)

Allows running just the auth endpoint

Source code in cattle_grid/auth/__init__.py
def create_app(filenames):
    """Allows running just the auth endpoint"""

    config = load_settings(filenames)

    @asynccontextmanager
    async def lifespan(app):
        async with database(config.db_uri):  # type:ignore
            await tortoise.Tortoise.generate_schemas()
            yield

    app = FastAPI(
        lifespan=lifespan,
        title="cattle_grid.auth",
        description="""Authorization server for Fediverse applications. It basically checks HTTP Signatures for you.""",
        version=__version__,
    )

    app.include_router(create_auth_router(get_auth_config(config), tags=[]))

    return app

http_util

AcceptEntry dataclass

AcceptEntry(content_type: str, profile: str | None = None, quality: float = 1.0)

Parameters:

Name Type Description Default
content_type str
required
profile str | None
None
quality float
1.0
Source code in cattle_grid/auth/http_util.py
@dataclass
class AcceptEntry:
    content_type: str
    profile: str | None = None
    quality: float = 1.0

    @staticmethod
    def from_header(piece):
        return AcceptEntry(
            content_type=str(piece[0]),
            profile=piece[1].get("profile", None),
            quality=float(piece[1].get("q", 1.0)),
        )

    def to_content_type(self) -> ContentType:
        if self.content_type == "text/html":
            return ContentType.html
        elif self.content_type == "application/activity+json":
            return ContentType.activity_pub
        elif (
            self.content_type == "application/ld+json"
            and self.profile == "https://www.w3.org/ns/activitystreams"
        ):
            return ContentType.activity_pub
        else:
            return ContentType.other

ContentType

Bases: StrEnum

The content type of the response

Source code in cattle_grid/auth/http_util.py
class ContentType(StrEnum):
    """The content type of the response"""

    activity_pub = auto()
    html = auto()
    other = auto()

parse_accept_header

parse_accept_header(header)
>>> header = 'application/activity+json, application/ld+json; profile="https://www.w3.org/ns/activitystreams", text/html;q=0.1'
>>> parse_accept_header(header)
[AcceptEntry(content_type='application/activity+json', profile=None, quality=1.0),
    AcceptEntry(content_type='application/ld+json', profile='https://www.w3.org/ns/activitystreams', quality=1.0),
    AcceptEntry(content_type='text/html', profile=None, quality=0.1)]
Source code in cattle_grid/auth/http_util.py
def parse_accept_header(header):
    """
    ```pycon
    >>> header = 'application/activity+json, application/ld+json; profile="https://www.w3.org/ns/activitystreams", text/html;q=0.1'
    >>> parse_accept_header(header)
    [AcceptEntry(content_type='application/activity+json', profile=None, quality=1.0),
        AcceptEntry(content_type='application/ld+json', profile='https://www.w3.org/ns/activitystreams', quality=1.0),
        AcceptEntry(content_type='text/html', profile=None, quality=0.1)]

    ```
    """

    parsed = http_sf.parse(header.encode(), tltype="list")

    return [AcceptEntry.from_header(piece) for piece in parsed]  # type:ignore

model

RemoteIdentity

Bases: Model

Represents the information about a remote identity

Source code in cattle_grid/auth/model.py
class RemoteIdentity(Model):
    """Represents the information about a remote identity"""

    id = fields.IntField(primary_key=True)
    key_id = fields.CharField(max_length=512, unique=True)
    controller = fields.CharField(max_length=512)
    public_key = fields.CharField(max_length=1024)

public_key_cache

PublicKeyCache dataclass

Caches public keys in the database and fetches them using bovine_actor

Parameters:

Name Type Description Default
bovine_actor BovineActor
required
Source code in cattle_grid/auth/public_key_cache.py
@dataclass
class PublicKeyCache:
    """Caches public keys in the database and fetches them
    using bovine_actor"""

    bovine_actor: BovineActor
    """used to fetch the public key"""

    async def cryptographic_identifier(
        self, key_id: str
    ) -> CryptographicIdentifier | Literal["gone"] | None:
        """Returns "gone" if Tombstone

        :param key_id: URI of the public key to fetch
        :returns:
        """

        try:
            result = await self.bovine_actor.get(key_id)

            if result is None:
                return "gone"

            if result.get("type") == "Tombstone":
                logger.info("Got Tombstone for %s", key_id)
                return "gone"

            public_key, owner = actor_object_to_public_key(result, key_id)

            if public_key is None or owner is None:
                return None

            return CryptographicIdentifier.from_pem(public_key, owner)
        except Exception as e:
            logger.info("Failed to fetch public key for %s with %s", key_id, repr(e))
            # logger.exception(e)
            return None

    async def from_cache(self, key_id: str) -> CryptographicIdentifier | None:
        identity = await RemoteIdentity.get_or_none(key_id=key_id)

        if identity is None:
            identifier = await self.cryptographic_identifier(key_id)
            if identifier is None:
                return None

            if identifier == "gone":
                await RemoteIdentity.create(
                    key_id=key_id, public_key="gone", controller="gone"
                )
                return None

            try:
                controller, public_key = identifier.as_tuple()
                identity = await RemoteIdentity.create(
                    key_id=key_id, public_key=public_key, controller=controller
                )
                await identity.save()
            except Exception as e:
                logger.exception(e)
            return identifier

        if identity.controller == "gone":
            return None

        return CryptographicIdentifier.from_tuple(
            identity.controller, identity.public_key
        )
bovine_actor instance-attribute
bovine_actor: BovineActor

used to fetch the public key

cryptographic_identifier async
cryptographic_identifier(
    key_id: str,
) -> CryptographicIdentifier | Literal["gone"] | None

Returns “gone” if Tombstone

Parameters:

Name Type Description Default
key_id str

URI of the public key to fetch

required

Returns:

Type Description
CryptographicIdentifier | Literal['gone'] | None
Source code in cattle_grid/auth/public_key_cache.py
async def cryptographic_identifier(
    self, key_id: str
) -> CryptographicIdentifier | Literal["gone"] | None:
    """Returns "gone" if Tombstone

    :param key_id: URI of the public key to fetch
    :returns:
    """

    try:
        result = await self.bovine_actor.get(key_id)

        if result is None:
            return "gone"

        if result.get("type") == "Tombstone":
            logger.info("Got Tombstone for %s", key_id)
            return "gone"

        public_key, owner = actor_object_to_public_key(result, key_id)

        if public_key is None or owner is None:
            return None

        return CryptographicIdentifier.from_pem(public_key, owner)
    except Exception as e:
        logger.info("Failed to fetch public key for %s with %s", key_id, repr(e))
        # logger.exception(e)
        return None

router

ReverseProxyHeaders

Bases: BaseModel

Headers set by the reverse proxy

Parameters:

Name Type Description Default
x_original_method str
'get'
x_original_uri str | None
None
x_original_host str | None
None
x_forwarded_proto str
'http'
Source code in cattle_grid/auth/router.py
class ReverseProxyHeaders(BaseModel):
    """Headers set by the reverse proxy"""

    x_original_method: str = "get"
    """The original used method"""

    x_original_uri: str | None = None
    """The original request uri"""

    x_original_host: str | None = None
    """The original used host"""

    x_forwarded_proto: str = "http"
    """The protocol being used"""
x_forwarded_proto class-attribute instance-attribute
x_forwarded_proto: str = 'http'

The protocol being used

x_original_host class-attribute instance-attribute
x_original_host: str | None = None

The original used host

x_original_method class-attribute instance-attribute
x_original_method: str = 'get'

The original used method

x_original_uri class-attribute instance-attribute
x_original_uri: str | None = None

The original request uri

create_auth_router

create_auth_router(
    config: AuthConfig, tags: List[str | Enum] = ["auth"]
) -> APIRouter

Adds the authorization endpoint to the app

Source code in cattle_grid/auth/router.py
def create_auth_router(
    config: AuthConfig, tags: List[str | Enum] = ["auth"]
) -> APIRouter:
    """Adds the authorization endpoint to the app"""

    bovine_actor = config_to_bovine_actor(config)
    public_key_cache = PublicKeyCache(bovine_actor)
    signature_checker = SignatureChecker(
        public_key_cache.from_cache, skip_digest_check=True
    )

    actor_path = str(urlparse(config.actor_id).path)
    webfinger_jrd = webfinger_response(config.actor_acct_id, config.actor_id)

    username, _ = parse_fediverse_handle(config.actor_acct_id.removeprefix("acct:"))
    actor_object = Actor(
        id=config.actor_id,
        type="Service",
        public_key=config.public_key,
        preferred_username=username,
        public_key_name="mykey",
    ).build()

    @asynccontextmanager
    async def lifespan(app):
        if global_container.session:
            await bovine_actor.init(session=global_container.session)
            yield
        else:
            async with global_container.session_lifecycle():
                await bovine_actor.init(session=global_container.session)
                yield

    router = APIRouter(lifespan=lifespan, tags=tags)

    @router.get(
        actor_path,
        response_class=ActivityResponse,
    )
    async def handle_get_actor():
        """Returns the actor profile of the
        fetch actor used to retrieve public keys, e.g.

        ```json
        {
            "type": "Service",
            "id": "https://your-domain.example/cattle_grid_actor",
            ...
        }
        ```
        """
        return actor_object

    @router.get("/.well-known/webfinger")
    async def webfinger(resource: str) -> JrdData:
        """If resource is the actor corresponding to the actor fetching
        public keys, returns the corresponding Jrd. Otherwise returns
        not found"""
        if resource != config.actor_acct_id:
            raise HTTPException(404)
        return webfinger_jrd

    @router.get(
        "/auth",
        responses={
            401: {"description": "The signature was invalid"},
            403: {"description": "Request was blocked"},
        },
    )
    async def verify_signature(
        request: Request,
        response: Response,
        reverse_proxy_headers: Annotated[ReverseProxyHeaders, Header()],
    ):
        """Takes the request and checks signature. If signature check
        fails a 401 is returned. If the domain the public key belongs
        to is blocked, a 403 is returned.

        If the request is valid. The controller corresponding to
        the signature is set in the response header `X-CATTLE-GRID-REQUESTER`.

        Note: More headers than the ones listed below can be used
        to verify a signature.
        """
        headers = MutableHeaders(request.headers)

        logger.debug(headers)

        servable_content_types = should_serve(headers.get("accept"))

        if "signature" not in headers:
            if ContentType.html in servable_content_types:
                response.headers["x-cattle-grid-should-serve"] = "html"
                return ""
            elif ContentType.other in servable_content_types:
                response.headers["x-cattle-grid-should-serve"] = "other"
                return ""
            elif config.require_signature_for_activity_pub:
                raise HTTPException(401)
            else:
                return ""

        if reverse_proxy_headers.x_original_host:
            headers["Host"] = reverse_proxy_headers.x_original_host

        url = f"{reverse_proxy_headers.x_forwarded_proto}://{reverse_proxy_headers.x_original_host}{reverse_proxy_headers.x_original_uri}"

        logger.debug("Treating request as to url %s", url)

        controller = await signature_checker.validate_signature(
            reverse_proxy_headers.x_original_method.lower(),
            url,
            dict(headers.items()),
            None,
        )

        logger.debug("Got controller %s", controller)

        if controller:
            if check_block(config.domain_blocks, controller):
                logger.info("Blocked a request by %s", controller)
                raise HTTPException(403)
            response.headers["x-cattle-grid-requester"] = controller

            logger.debug("Got requester %s", controller)

            return ""

        logger.info(
            "invalid signature for request to %s => access denied",
            request.headers.get("X-Original-Uri", ""),
        )

        raise HTTPException(401)

    return router

util

check_block

check_block(
    domain_blocks: Set[str], controller: str
) -> bool

Checks if a controller’s domain is in block list

>>> check_block({"blocked.example"}, "http://actor.example/path")
False

>>> check_block({"blocked.example"}, "http://blocked.example/path")
True
Source code in cattle_grid/auth/util.py
def check_block(domain_blocks: Set[str], controller: str) -> bool:
    """Checks if a controller's domain is in block list

    ```pycon
    >>> check_block({"blocked.example"}, "http://actor.example/path")
    False

    >>> check_block({"blocked.example"}, "http://blocked.example/path")
    True

    ```
    """
    try:
        domain = urlparse(controller).netloc
        return domain in domain_blocks
    except Exception as e:
        logger.warning("Something went wrong with %s", repr(e))
        return True