Skip to content

roboherd.herd

RoboHerd dataclass

RoboHerd(name: str = ‘RoboHerd’, base_url: str = ‘http://abel’, manager: roboherd.herd.manager.HerdManager = , cows: List[roboherd.cow.RoboCow] = )

Parameters:

Name Type Description Default
name str
'RoboHerd'
base_url str
'http://abel'
manager HerdManager

HerdManager(prefix: str = ‘bot:’, herd_config: roboherd.herd.manager.config.HerdConfig = )

<dynamic>
cows List[RoboCow]

Built-in mutable sequence.

If no argument is given, the constructor creates a new empty list. The argument must be an iterable if specified.

<dynamic>
Source code in roboherd/herd/__init__.py
@dataclass
class RoboHerd:
    name: str = "RoboHerd"
    base_url: str = "http://abel"

    manager: HerdManager = field(default_factory=HerdManager)
    cows: List[RoboCow] = field(default_factory=list)

    async def run(self, connection: Almabtrieb):
        async with connection:
            self.validate(connection)
            await self.startup(connection)
            await self.process(connection)

    async def startup(self, connection: Almabtrieb):
        if not connection.information:
            raise Exception("Could not get information from server")

        self.cows = self.manager.existing_cows(connection.information.actors)

        cows_to_create = self.manager.cows_to_create(connection.information.actors)

        for cow_config in cows_to_create:
            logger.info("Creating cow with name %s", cow_config.name)
            cow = cow_config.load()
            result = await connection.create_actor(
                name=f"{self.manager.prefix}{cow_config.name}",
                base_url=cow.internals.base_url or self.base_url,
                preferred_username=cow.information.handle,
                profile={"type": "Service"},
                automatically_accept_followers=True,
            )
            cow.internals.actor_id = result.get("id")

            self.cows.append(cow)

        for cow in self.cows:
            await cow.run_startup(connection)

    async def process(self, connection: Almabtrieb):
        async with asyncio.TaskGroup() as tg:
            logger.info("Starting processing tasks")

            processor = HerdProcessor(connection, self.incoming_handlers())
            processor.create_tasks(tg)

            scheduler = HerdScheduler(self.cron_entries(), connection)
            scheduler.create_task(tg)

    def validate(self, connection):
        result = connection.information

        logger.info("Got base urls: %s", ",".join(result.base_urls))

        if self.base_url not in result.base_urls:
            logger.error(
                "Configure base url %s not in base urls %s of server",
                self.base_url,
                ", ".join(result.base_urls),
            )
            raise ValueError("Incorrectly configured base url")

    def cron_entries(self) -> List[Tuple[RoboCow, CronEntry]]:
        """Returns the cron entries of all cows"""

        result = []
        for cow in self.cows:
            for cron_entry in cow.internals.cron_entries:
                result.append((cow, cron_entry))

        return result

    def incoming_handlers(self) -> List[RoboCow]:
        result = []
        for cow in self.cows:
            if cow.internals.handlers.has_handlers:
                result.append(cow)
        return result

cron_entries()

Returns the cron entries of all cows

Source code in roboherd/herd/__init__.py
def cron_entries(self) -> List[Tuple[RoboCow, CronEntry]]:
    """Returns the cron entries of all cows"""

    result = []
    for cow in self.cows:
        for cron_entry in cow.internals.cron_entries:
            result.append((cow, cron_entry))

    return result

builder

create_actor_message_for_cow(cow, base_url)

>>> create_actor_message_for_cow(moocow, "http://domain.example/")
{'baseUrl': 'http://domain.example/',
    'preferredUsername': 'moocow',
    'automaticallyAcceptFollowers': True,
    'profile': {'type': 'Service'}}
Source code in roboherd/herd/builder.py
def create_actor_message_for_cow(cow: RoboCow, base_url):
    """
    ```pycon
    >>> create_actor_message_for_cow(moocow, "http://domain.example/")
    {'baseUrl': 'http://domain.example/',
        'preferredUsername': 'moocow',
        'automaticallyAcceptFollowers': True,
        'profile': {'type': 'Service'}}

    ```
    """
    return {
        "baseUrl": base_url,
        "preferredUsername": cow.information.handle,
        "automaticallyAcceptFollowers": cow.auto_follow,
        "profile": {"type": "Service"},
    }

manager

HerdManager dataclass

HerdManager(prefix: str = ‘bot:’, herd_config: roboherd.herd.manager.config.HerdConfig = )

Parameters:

Name Type Description Default
prefix str
'bot:'
herd_config HerdConfig

HerdConfig(cows: list[roboherd.herd.manager.config.CowConfig] = )

<dynamic>
Source code in roboherd/herd/manager/__init__.py
@dataclass
class HerdManager:
    prefix: str = "bot:"
    herd_config: HerdConfig = field(default_factory=HerdConfig)

    @staticmethod
    def from_settings(settings):
        return HerdManager(herd_config=HerdConfig.from_settings(settings))

    def existing_cows(self, actors: List[ActorInformation]) -> List[RoboCow]:
        existing_cows = []

        for info in actors:
            if info.name.startswith(self.prefix):
                cow_name = info.name.removeprefix(self.prefix)
                cow_config = self.herd_config.for_name(cow_name)
                if cow_config:
                    cow = cow_config.load()
                    cow.internals.actor_id = info.id
                    existing_cows.append(cow)

        return existing_cows

    def cows_to_create(self, existing_actors: list[ActorInformation]) -> set[CowConfig]:
        existing_names = {
            actor.name.removeprefix(self.prefix)
            for actor in existing_actors
            if actor.name.startswith(self.prefix)
        }
        names_to_create = self.herd_config.names - existing_names

        cows = {self.herd_config.for_name(name) for name in names_to_create}

        return {cow for cow in cows if cow}

config

ConfigOverrides

Bases: BaseModel

Values used in roboherd.toml to overide the default values in the imported cow. This class is meant as a reference, and not meant to be directly used.

Parameters:

Name Type Description Default
name str | None

set to override the name

None
handle str | None

set to override the handle

None
base_url str | None

set to override the base url

None
skip_profile_update bool | None

set to skip updating the profile

None
Source code in roboherd/herd/manager/config.py
class ConfigOverrides(BaseModel):
    """Values used in `roboherd.toml` to overide the default
    values in the imported cow. This class is meant as a
    reference, and not meant to be directly used."""

    name: str | None = Field(
        default=None, description="set to override the name", examples=["New name"]
    )
    handle: str | None = Field(
        default=None, description="set to override the handle", examples=["new-handle"]
    )
    base_url: str | None = Field(
        default=None,
        description="set to override the base url",
        examples=["https://other.example"],
    )
    skip_profile_update: bool | None = Field(
        default=None, description="set to skip updating the profile", examples=[True]
    )

CowConfig dataclass

CowConfig(name: str, module: str, attribute: str, config: dict)

Parameters:

Name Type Description Default
name str

Name of the cow, must be unique

required
module str
required
attribute str
required
config dict
required
Source code in roboherd/herd/manager/config.py
@dataclass
class CowConfig:
    name: str = field(metadata={"description": "Name of the cow, must be unique"})
    module: str
    attribute: str
    config: dict

    @staticmethod
    def from_name_and_dict(name, cow: dict) -> "CowConfig":
        module, attribute = cow["bot"].split(":")

        return CowConfig(name=name, module=module, attribute=attribute, config=cow)

    def load(self) -> RoboCow:
        cow = load_cow(self.module, self.attribute)

        overrides = ConfigOverrides(**self.config)

        for value in ["name", "handle", "base_url"]:
            if getattr(overrides, value):
                setattr(cow.information, value, getattr(overrides, value))

        if overrides.skip_profile_update:
            cow.skip_profile_update = overrides.skip_profile_update

        return cow

    def __hash__(self):
        return hash(self.name)

HerdConfig dataclass

HerdConfig(cows: list[roboherd.herd.manager.config.CowConfig] = )

Parameters:

Name Type Description Default
cows list[CowConfig]

Built-in mutable sequence.

If no argument is given, the constructor creates a new empty list. The argument must be an iterable if specified.

<dynamic>
Source code in roboherd/herd/manager/config.py
@dataclass
class HerdConfig:
    cows: list[CowConfig] = field(default_factory=list)

    def for_name(self, name: str) -> CowConfig | None:
        for cow in self.cows:
            if cow.name == name:
                return cow
        return None

    @property
    def names(self) -> set[str]:
        return {cow.name for cow in self.cows}

    @staticmethod
    def from_settings(settings):
        cows = [
            CowConfig.from_name_and_dict(name, config)
            for name, config in settings.cow.items()
        ]

        return HerdConfig(cows=cows)

    def load_herd(self) -> list[RoboCow]:
        return [cow.load() for cow in self.cows]

load

load_cow(module_name, attribute)

Loads a cow from module name and attribute

Source code in roboherd/herd/manager/load.py
def load_cow(module_name: str, attribute: str) -> RoboCow:
    """Loads a cow from module name and attribute"""
    module = import_module(module_name)
    importlib.reload(module)

    cow = getattr(module, attribute)

    return copy.deepcopy(cow)

processor

HerdProcessor dataclass

HerdProcessor(connection: almabtrieb.Almabtrieb, incoming_handlers: List[roboherd.cow.RoboCow])

Parameters:

Name Type Description Default
connection Almabtrieb
required
incoming_handlers List[RoboCow]
required
Source code in roboherd/herd/processor.py
@dataclass
class HerdProcessor:
    connection: Almabtrieb
    incoming_handlers: List[RoboCow]

    def create_tasks(self, task_group: asyncio.TaskGroup):
        tasks = []
        if len(self.incoming_handlers) > 0:
            tasks.append(task_group.create_task(self.process_incoming(self.connection)))

        return tasks

    async def process_incoming(self, connection):
        actor_id_to_cow_map = {}
        for cow in self.incoming_handlers:
            actor_id_to_cow_map[cow.internals.actor_id] = cow

        async for msg in connection.incoming():
            actor_id = msg["actor"]

            cow = actor_id_to_cow_map.get(actor_id)
            logger.info(cow)
            if cow:
                await cow.internals.handlers.handle(
                    msg, "incoming", connection, actor_id, cow=cow
                )

scheduler

HerdScheduler dataclass

HerdScheduler(entries: List[Tuple[roboherd.cow.RoboCow, roboherd.cow.CronEntry]], connection: almabtrieb.Almabtrieb)

Parameters:

Name Type Description Default
entries List[Tuple[RoboCow, CronEntry]]
required
connection Almabtrieb
required
Source code in roboherd/herd/scheduler.py
@dataclass
class HerdScheduler:
    entries: List[Tuple[RoboCow, CronEntry]]
    connection: Almabtrieb

    def create_task(self, task_group: asyncio.TaskGroup):
        if len(self.entries) == 0:
            logger.info("No tasks to schedule")
            return
        task_group.create_task(self.run())

    async def run(self):
        if len(self.entries) == 0:
            return

        scheduler = AsyncIOScheduler()

        for cow, entry in self.entries:
            trigger = CronTrigger.from_crontab(entry.crontab)
            scheduler.add_job(
                inject(entry.func),
                trigger=trigger,
                kwargs={
                    "actor_id": cow.internals.actor_id,
                    "connection": self.connection,
                    "cow": cow,
                },
            )

        scheduler.start()

        while True:
            await asyncio.sleep(60 * 60)

types

HandlerInformation dataclass

HandlerInformation(action: str, activity_type: str, func: Callable, cow: roboherd.cow.RoboCow)

Parameters:

Name Type Description Default
action str
required
activity_type str
required
func Callable
required
cow RoboCow
required
Source code in roboherd/herd/types.py
@dataclass
class HandlerInformation:
    action: str
    activity_type: str
    func: Callable
    cow: RoboCow