Skip to content

aimu.agents

Agents and code-controlled workflows.

Hierarchy

aimu.agents.Runner

Bases: ABC

Abstract base for every concrete agent and workflow in AIMU.

Concrete subclasses implement :meth:run and :attr:messages.

messages abstractmethod property

messages: MessageHistory

Message histories of all sub-runners, keyed by runner name.

For a leaf agent: {agent.name: model_client.messages}. For composite workflows: dicts from every constituent runner merged into one, so the result spans the full sub-tree regardless of nesting depth.

Note: when agents share a single ModelClient (e.g. via Chain.from_config), all agents reference the same messages list. After the run every key in the returned dict points at the last step's messages.

run abstractmethod

run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None) -> Union[str, Iterator[StreamChunk]]

Run synchronously (stream=False) or streaming (stream=True).

aimu.agents.MessageHistory module-attribute

MessageHistory = dict[str, list[dict]]

Agents

aimu.agents.Agent dataclass

Agent(model_client: BaseModelClient, system_message: Optional[str] = None, name: Optional[str] = None, tools: list[Callable] = list(), max_iterations: int = 10, continuation_prompt: str = DEFAULT_CONTINUATION_PROMPT, reset_messages_on_run: bool = False)

Bases: Runner

A model client wrapped in an agentic loop.

Calls model_client.chat() repeatedly until the model produces a turn without invoking tools, or max_iterations is reached. The stop condition scans model_client.messages in reverse for a "tool" role message after the last "user" role — if found, the agent sends continuation_prompt and loops.

Tools may be supplied two ways and combined:

  • tools=[fn1, fn2] — Python functions decorated with @aimu.tools.tool.
  • model_client.mcp_client = MCPClient(...) — a FastMCP-backed tool server.

When system_message is set or reset_messages_on_run is True, the agent clears model_client.messages and re-applies system_message before every run. This isolates state when a client is shared (e.g. inside a :class:Chain).

Quick start::

from aimu.tools import tool
from aimu.agents import Agent
import aimu

@tool
def letter_counter(word: str, letter: str) -> int:
    """Count occurrences of a letter in a word."""
    return word.lower().count(letter.lower())

client = aimu.client("ollama:qwen3.5:9b")
agent = Agent(client, "You are a helpful assistant.", tools=[letter_counter])
print(agent.run("How many r's in strawberry?"))

run

run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None) -> Union[str, Iterator[StreamChunk]]

Run the agentic loop. images attach only to the initial turn.

as_model_client

as_model_client() -> BaseModelClient

Return a :class:BaseModelClient view of this agent.

Each chat() call on the returned object runs the full agent loop, looping until the model stops calling tools. Use this only where an API expects a BaseModelClient — for direct use, call :meth:run instead.

from_config classmethod

from_config(config: dict[str, Any], model_client: BaseModelClient) -> Agent

Create an Agent from a plain dict config.

Recognised keys: name, system_message, max_iterations, continuation_prompt.

aimu.agents.SkillAgent dataclass

SkillAgent(model_client: BaseModelClient, system_message: Optional[str] = None, name: Optional[str] = None, tools: list[Callable] = list(), max_iterations: int = 10, continuation_prompt: str = DEFAULT_CONTINUATION_PROMPT, reset_messages_on_run: bool = False, skill_manager: SkillManager = SkillManager())

Bases: Agent

An :class:Agent extended with filesystem-discovered skill injection.

On first run (or after a message reset) the SkillAgent appends the skill catalog to its system message and attaches a skills MCPClient so the model can call activate_skill to load full skill instructions before proceeding.

By default a fresh :class:SkillManager is created, scanning the standard search paths (.agents/skills/, .claude/skills/, ~/.agents/skills/, ~/.claude/skills/). Pass an explicit SkillManager to override.

Usage::

agent = SkillAgent(client, "Use available skills as needed.")
result = agent.run("Use the pdf-processing skill to extract pages.")

With explicit skill dirs::

agent = SkillAgent(client, skill_manager=SkillManager(skill_dirs=["./skills"]))

from_config classmethod

from_config(config: dict[str, Any], model_client: BaseModelClient) -> SkillAgent

Create a SkillAgent from a plain dict config.

Recognised keys: name, system_message, max_iterations, continuation_prompt, skill_dirs (omit to auto-discover).

aimu.agents.OrchestratorAgent

Bases: Runner, ABC

Base class for the orchestrator + worker-tools pattern.

Subclasses define worker :class:Agent instances and @tool-decorated dispatch functions in __init__, then call :meth:_init_orchestrator to wire everything up::

from aimu.tools import tool

class ResearchAgent(OrchestratorAgent):
    def __init__(self, client):
        researcher = Agent(client, "Research the topic.", name="researcher")

        @tool
        def research(topic: str) -> str:
            """Run the researcher on a topic."""
            return researcher.run(topic)

        self._init_orchestrator(
            client,
            name="research-orchestrator",
            system_message="Use the research tool to investigate.",
            tools=[research],
        )

For the simple case of dispatching to a fixed list of workers, use :meth:assemble to skip subclassing entirely.

assemble classmethod

assemble(model_client: BaseModelClient, system_message: str, *, workers: list[Agent], name: str = 'orchestrator', concurrent_tool_calls: bool = True) -> 'OrchestratorAgent'

Build a ready-to-run orchestrator from a list of worker agents.

Each worker becomes a callable tool — the orchestrator dispatches by name. Tool descriptions are taken from the worker's system_message (truncated to one line) so callers don't need to write @tool wrappers manually.

Example::

researcher = Agent(client, "Research the topic.", name="researcher")
critic = Agent(client, "Critique the response.", name="critic")
orch = OrchestratorAgent.assemble(client, "Use both workers.",
                                  workers=[researcher, critic])
print(orch.run("Quantum computing"))

Workflows

aimu.agents.Chain dataclass

Chain(agents: list, name: str = 'chain')

Bases: Runner

Prompt Chaining pattern: agents run sequentially, output → input.

Each step's text output (concatenated GENERATING chunks) becomes the next step's task input. Steps may be :class:Agent instances or nested workflows.

Quick start::

chain = Chain.from_client(client, [
    "Break the task into 3 concrete steps.",
    "Execute each step and collect results.",
    "Polish the result into a final report.",
])
result = chain.run("Research top Python web frameworks.")

Direct construction (each step owns its own client/agent)::

chain = Chain(agents=[
    Agent(client_a, "Planner", name="planner"),
    Agent(client_b, "Executor", name="executor"),
])

from_client classmethod

from_client(client: BaseModelClient, prompts: list[str], *, name: str = 'chain') -> Chain

Build a Chain from a single client and a list of step system_messages.

Each step gets its own :class:Agent with reset_messages_on_run=True so it clears the shared client's history and applies its own system_message.

run

run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None) -> Union[str, Iterator[StreamChunk]]

Run all agents sequentially. images are forwarded only to the first step.

from_config classmethod

from_config(configs: list[dict[str, Any]], client: BaseModelClient) -> Chain

Build a Chain from a list of agent config dicts and a single client.

Each step's Agent gets reset_messages_on_run=True so it clears the client's messages and applies its own system_message before running.

aimu.agents.Router dataclass

Router(routing_agent: Agent, handlers: dict[str, Runner], name: str = 'router', fallback: Optional[Runner] = None)

Bases: Runner

Routing pattern: classify the task, dispatch to a specialist.

The routing_agent receives the task and must respond with a single route name. The Router dispatches to the matching handler (case-insensitive, whitespace-stripped). Handlers may be any :class:Runner (agent or nested workflow).

Quick start::

router = Router.from_client(
    client,
    classifier_prompt=(
        "Classify the task as one of: code, writing, math. "
        "Reply with only the category name."
    ),
    handlers={
        "code":    Agent(client, "You are a coder.", name="coder"),
        "writing": Agent(client, "You are a writer.", name="writer"),
        "math":    Agent(client, "You are a mathematician.", name="math"),
    },
    fallback=Agent(client, "Be helpful.", name="general"),
)

from_client classmethod

from_client(client: BaseModelClient, classifier_prompt: str, handlers: dict[str, Runner], *, fallback: Optional[Runner] = None, name: str = 'router') -> Router

Build a Router using client as the classifier with the given prompt.

run

run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None) -> Union[str, Iterator[StreamChunk]]

Classify the task and dispatch to the matched handler.

images are forwarded to the handler; the routing agent classifies on text only.

from_config classmethod

from_config(routing_config: dict[str, Any], handler_configs: dict[str, dict[str, Any]], client: BaseModelClient, fallback_config: Optional[dict[str, Any]] = None) -> Router

Build a Router from config dicts and a single client.

aimu.agents.Parallel dataclass

Parallel(workers: list[Runner], name: str = 'parallel', aggregator: Optional[Runner] = None, separator: str = '\n\n---\n\n', max_workers: Optional[int] = None)

Bases: Runner

Parallelization pattern: run workers concurrently, aggregate.

Each worker receives the same task. An optional aggregator receives all worker outputs joined by separator and produces the final result. Without an aggregator, the joined worker outputs are returned directly.

Workers run concurrently via ThreadPoolExecutor; results are collected in submission order. Workers may be any :class:Runner.

Quick start::

parallel = Parallel.from_client(
    client,
    worker_prompts=[
        "Analyze this from a security perspective.",
        "Analyze this from a performance perspective.",
        "Analyze this from a readability perspective.",
    ],
    aggregator_prompt="Synthesize the perspectives into one concise review.",
)

from_client classmethod

from_client(client: BaseModelClient, worker_prompts: list[str], *, aggregator_prompt: Optional[str] = None, separator: str = '\n\n---\n\n', name: str = 'parallel') -> Parallel

Build a Parallel using client for all workers (and aggregator).

run

run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None) -> Union[str, Iterator[StreamChunk]]

Run workers concurrently then aggregate.

images are forwarded to every worker; the aggregator runs on text only.

aimu.agents.EvaluatorOptimizer dataclass

EvaluatorOptimizer(generator: Agent, evaluator: Agent, name: str = 'evaluator_optimizer', max_rounds: int = 3, pass_keyword: str = 'PASS')

Bases: Runner

Evaluator-Optimizer pattern: iteratively improve via critic feedback.

The generator produces an initial response. The evaluator reviews it against the original task and either returns pass_keyword (accepted) or revision feedback. The loop stops when the evaluator accepts or max_rounds is reached.

Prompt the evaluator to respond with the exact pass_keyword for acceptance and specific revision feedback otherwise.

Usage::

eo = EvaluatorOptimizer(
    generator=Agent(client, "Write a clear, accurate explanation.", name="writer"),
    evaluator=Agent(
        client,
        "Review for accuracy and clarity. If acceptable, reply PASS. "
        "Otherwise reply REVISE: <specific feedback>.",
        name="critic",
    ),
    max_rounds=4,
)
result = eo.run("Explain gradient descent.")

run

run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None) -> Union[str, Iterator[StreamChunk]]

Run the generate-evaluate loop. Streaming yields only the final output.

aimu.agents.PlanExecuteEvaluator dataclass

PlanExecuteEvaluator(planner: SkillAgent, executor: Agent, scorer: Scorer, criteria: Optional[str] = None, name: str = 'plan_execute_evaluator', max_rounds: int = 3, pass_threshold: float = 0.7, pass_keyword: Optional[str] = None)

Bases: Runner

Plan → execute → evaluate → replan-on-fail loop.

A planner agent produces a plan for the task; an executor agent runs the plan with its tools; a pluggable :class:Scorer judges the output. On failure, the planner is invoked again with the prior round's feedback to produce a new plan (not a revision). The loop bounds at max_rounds and on exhaustion returns the highest-scoring attempt.

Two criteria modes:

  • User-supplied (criteria="..." at construction): the criteria is passed to the planner's prompt every round, and to the scorer as the row's reference field.
  • Planner-invented (criteria=None): the planner is asked to emit ## Evaluation criteria + ## Plan sections; the workflow parses both. The parsed criteria is passed to the scorer as reference.

Usage::

wf = PlanExecuteEvaluator.from_client(
    client=aimu.client("anthropic:claude-sonnet-4-6"),
    judge_client=aimu.client("openai:gpt-4o"),
    executor_tools=builtin.web + builtin.fs,
    criteria="The summary cites 3+ sources and is under 200 words.",
    max_rounds=3,
)
result = wf.run("Summarise the latest news on quantum computing.")

last_attempts property

last_attempts: list[dict]

Per-round attempts from the most recent run().

Each entry has round, plan, criteria, output, score, feedback. Useful for introspection or post-hoc analysis.

run

run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None) -> Union[str, Iterator[StreamChunk]]

Run the plan-execute-evaluate loop. images are forwarded to the executor each round.

from_client classmethod

from_client(client: BaseModelClient, *, judge_client: Optional[BaseModelClient] = None, criteria: Optional[str] = None, executor_tools: Optional[list[Callable]] = None, skill_manager: Optional[SkillManager] = None, planner_system_message: Optional[str] = None, executor_system_message: Optional[str] = None, max_rounds: int = 3, pass_threshold: float = 0.7, pass_keyword: Optional[str] = None, name: str = 'plan_execute_evaluator') -> PlanExecuteEvaluator

Build a PlanExecuteEvaluator from a single client and the common knobs.

  • planner is a :class:SkillAgent over client; pass skill_manager= to use task-type planning skills from disk.
  • executor is an :class:Agent over client with the given executor_tools.
  • scorer is an :class:LLMJudgeScorer over judge_client (defaults to client if not provided; a stronger separate model is recommended). Its criteria is the workflow's criteria if supplied, otherwise a generic fallback.

For full control (e.g. a custom :class:Scorer), construct the dataclass directly.