Skip to content

aiohttp_signer

aiohttp_signer

DateMiddleware dataclass

Adds the date header, see RFC 9110

Parameters:

Name Type Description Default
date_maker Callable[list, datetime]
<function default_date_maker at 0x7fe98ea96980>
Source code in aiohttp_signer/date.py
@dataclass
class DateMiddleware:
    """Adds the date header, see [RFC 9110](https://httpwg.org/specs/rfc9110.html#field.date)"""

    date_maker: Callable[[], datetime] = field(default=default_date_maker)

    async def __call__(self, req: ClientRequest, handler: ClientHandlerType):
        req.headers.add("date", self.date_maker().strftime(GMT_STRING))

        return await handler(req)

DigestAlgorithm

Bases: StrEnum

Algorithm to use for digest

Source code in aiohttp_signer/digest.py
class DigestAlgorithm(StrEnum):
    """Algorithm to use for digest"""

    sha256 = auto()
    sha512 = auto()

DigestMiddleware dataclass

Implements adding the digest header. By default RFC 9530 Digest Fields is used with sha256.

Parameters:

Name Type Description Default
mode DigestMode
<DigestMode.rfc9430: 'rfc9430'>
algorithm DigestAlgorithm
<DigestAlgorithm.sha256: 'sha256'>
Source code in aiohttp_signer/digest.py
@dataclass
class DigestMiddleware:
    """Implements adding the digest header. By default
    [RFC 9530 Digest Fields](https://www.rfc-editor.org/rfc/rfc9530.html)
    is used with sha256."""

    mode: DigestMode = DigestMode.rfc9430
    algorithm: DigestAlgorithm = DigestAlgorithm.sha256

    async def __call__(self, req: ClientRequest, handler: ClientHandlerType):
        if req.body == b"":
            return await handler(req)

        value = compute_digest(await req.body.as_bytes(), self.mode, self.algorithm)
        req.headers.add(mode_to_header_name(self.mode), value)

        return await handler(req)

DigestMode

Bases: StrEnum

Mode to display the digest

Source code in aiohttp_signer/digest.py
class DigestMode(StrEnum):
    """Mode to display the digest"""

    rfc9430 = auto()
    rfc3230 = auto()
    rfc3230_upper = auto()

DraftCavageSigner dataclass

Signs using draft cavage

Parameters:

Name Type Description Default
signer Callable[list, bytes]
required
key_id str
required
params list[str]
['(request-target)', 'host', 'date', 'digest']
Source code in aiohttp_signer/draft_cavage/__init__.py
@dataclass
class DraftCavageSigner:
    """Signs using draft cavage"""

    signer: Callable[[bytes], bytes]
    key_id: str
    params: list[str] = field(default_factory=default_params)

    async def __call__(self, req: ClientRequest, handler: ClientHandlerType):
        fields = DraftCavageFields(req.method, str(req.url), headers=req.headers)
        message = self._message(fields).encode()
        signature = b64encode(self.signer(message)).decode()
        req.headers["signature"] = self._build_signature(signature)

        return await handler(req)

    def _build_signature(self, signature: str):
        headers = " ".join(self.params)

        signature_parts = [
            f'keyId="{self.key_id}"',
            'algorithm="rsa-sha256"',
            f'headers="{headers}"',
            f'signature="{signature}"',
        ]

        return ",".join(signature_parts)

    def _message(self, fields: DraftCavageFields):
        lines = []
        for param in self.params:
            if fields.has_component(param):
                lines.append(f"{param}: {fields.value_for_component(param)}")

        return "\n".join(lines)

Rfc9421Signer dataclass

Signs using RFC-9421

Parameters:

Name Type Description Default
signer Callable[list, bytes]
required
key_id str
required
created_maker Callable[list, int]
<function default_created_maker at 0x7fe98ea96e50>
params list[str]
['@method', '@target-uri', 'content-digest']
nonce str | None
None
name str

Name to use for the signature

'sig-b1'
Source code in aiohttp_signer/rfc9421/__init__.py
@dataclass
class Rfc9421Signer:
    """Signs using [RFC-9421](https://www.rfc-editor.org/rfc/rfc9421.html)"""

    signer: Callable[[bytes], bytes]
    key_id: str
    created_maker: Callable[[], int] = default_created_maker
    params: list[str] = field(default_factory=default_params)
    nonce: str | None = None
    name: str = field(
        default="sig-b1", metadata={"description": "Name to use for the signature"}
    )

    @property
    def quoted_params(self):
        return [f'"{x}"' for x in self.params]

    async def __call__(self, req: ClientRequest, handler: ClientHandlerType):
        fields = Rfc9421Fields(req.method, str(req.url), req.headers)
        req.headers.add("signature-input", self._input_header())
        req.headers.add("signature", self._signature_header(fields))

        return await handler(req)

    def _signature_params(self):
        parts = [
            "(" + " ".join(self.quoted_params) + ")",
            f"created={self.created_maker()}",
            f'keyid="{self.key_id}"',
        ]
        if self.nonce:
            parts.append(f'nonce="{self.nonce}"')

        return ";".join(parts)

    def _message(self, fields: Rfc9421Fields):
        lines = []
        for param in self.params:
            value = fields.value_for_component(param)
            lines.append(f'"{param}": {value}')

        lines.append(f'"@signature-params": {self._signature_params()}')
        result = "\n".join(lines)

        return result

    def _input_header(self):
        return f"{self.name}={self._signature_params()}"

    def _signature_header(self, fields: Rfc9421Fields):
        message = self._message(fields)
        signature = self.signer(message.encode())
        return f"{self.name}=:{b64encode(signature).decode()}:"