
aimu.agents

Agents and code-controlled workflows.

Hierarchy

aimu.agents.Runner

Bases: ABC

Abstract base for every concrete agent and workflow in AIMU.

Concrete subclasses implement run() and the messages property.

messages abstractmethod property

messages: MessageHistory

Message histories of all sub-runners, keyed by runner name.

For a leaf agent: {agent.name: model_client.messages}. For composite workflows: the per-runner dicts merged into one (via dict.update), so the result spans the full sub-tree regardless of nesting depth. Example:

router.messages
# {"router-classifier": [...], "code-handler": [...], "prose-handler": [...]}

Merge is by runner name: two sub-runners sharing a name collide, and the one merged later wins (last write wins). Give sub-runners distinct name= values if you need every history to be addressable.

Separately, when sub-runners share a single ModelClient (e.g. via Chain.from_client or Chain.from_config), they all reference the same messages list, so after a run every key points at that one shared list, which holds the last step's history. Use distinct clients per sub-runner to keep histories separate.
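Both the name-collision rule and the shared-list aliasing can be reproduced in plain Python. This is a minimal sketch, not AIMU's code: merge_histories is a hypothetical helper, and the runner names and messages are invented for illustration.

```python
# Hypothetical helper mirroring the documented rule: per-runner dicts are
# combined with dict.update, so a later runner with the same name wins.
MessageHistory = dict[str, list[dict]]


def merge_histories(parts: list[MessageHistory]) -> MessageHistory:
    merged: MessageHistory = {}
    for part in parts:
        merged.update(part)  # last write wins on a name collision
    return merged


# Name collision: two sub-runners both called "handler".
classifier = {"router-classifier": [{"role": "user", "content": "classify"}]}
first = {"handler": [{"role": "user", "content": "first"}]}
second = {"handler": [{"role": "user", "content": "second"}]}
merged = merge_histories([classifier, first, second])
print(sorted(merged))                   # ['handler', 'router-classifier']
print(merged["handler"][0]["content"])  # second

# Shared client: both keys hold the very same list object.
shared: list[dict] = []
aliased = merge_histories([{"step-1": shared}, {"step-2": shared}])
shared.append({"role": "assistant", "content": "last step"})
print(aliased["step-1"] is aliased["step-2"])  # True
```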

run abstractmethod

run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None) -> Union[str, Iterator[StreamChunk]]

Run synchronously (stream=False), returning the result as a str, or streaming (stream=True), returning an iterator of StreamChunk.

as_tool

as_tool(*, name: Optional[str] = None, description: Optional[str] = None) -> Callable

Wrap this runner as a @tool-style callable: tool(task: str) -> str.

The returned callable runs run() and returns its string result, so any agent or workflow can call this runner as a tool: drop it into Agent(tools=[...]) or an OrchestratorAgent's worker list. Works for every concrete Runner (Agent, Chain, Router, a remote A2A agent, ...) since it only relies on run().

name defaults to self.name (sanitised to a valid identifier) or "runner". description defaults to the first line of self.system_message when present (Agent / SkillAgent), else a generic delegation string (workflows have no system_message).
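A rough sketch of the wrapping and its defaults, assuming a runner is any object with a run(task) -> str method plus optional name and system_message attributes. FakeRunner, runner_as_tool and sanitise are hypothetical names, and the sanitisation rule used here (non-word characters become underscores, no leading digit) is an assumption; AIMU's as_tool may differ in detail.

```python
import re
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class FakeRunner:  # hypothetical stand-in for an Agent or workflow
    name: Optional[str] = None
    system_message: Optional[str] = None

    def run(self, task: str) -> str:
        return f"{self.name or 'runner'} handled: {task}"


def sanitise(name: str) -> str:
    # Assumed rule: non-word characters become "_", and a leading digit is prefixed.
    ident = re.sub(r"\W", "_", name)
    return ident if ident and not ident[0].isdigit() else "_" + ident


def runner_as_tool(runner, *, name: Optional[str] = None,
                   description: Optional[str] = None) -> Callable[[str], str]:
    def tool(task: str) -> str:
        return runner.run(task)  # only relies on run()

    tool.__name__ = name or (sanitise(runner.name) if runner.name else "runner")
    # Workflows may have no system_message attribute at all.
    system = (getattr(runner, "system_message", None) or "").strip()
    first_line = system.splitlines()[0] if system else None
    tool.__doc__ = description or first_line or f"Delegate a task to {tool.__name__}."
    return tool


coder = runner_as_tool(FakeRunner("code-handler", "You are a coder.\nWrite Python only."))
print(coder.__name__, "|", coder.__doc__)  # code_handler | You are a coder.
print(coder("fix bug"))                    # code-handler handled: fix bug

workflow = runner_as_tool(FakeRunner())    # no name, no system_message
print(workflow.__name__, "|", workflow.__doc__)  # runner | Delegate a task to runner.
```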

aimu.agents.MessageHistory module-attribute

MessageHistory = dict[str, list[dict]]

Agents

aimu.agents.Agent dataclass

Agent(model_client: BaseModelClient, system_message: Optional[str] = None, name: Optional[str] = None, tools: list[Callable] = list(), max_iterations: int = 10, continuation_prompt: str = DEFAULT_CONTINUATION_PROMPT, reset_messages_on_run: bool = False, final_answer_prompt: Optional[str] = None, deps: Optional[Any] = None)

Bases: _AgentLoopMixin, Runner

A model client wrapped in an agentic loop.

Calls model_client.chat() repeatedly until the model produces a turn without invoking tools, or max_iterations is reached. The stop condition scans model_client.messages in reverse for a "tool" role message that follows the last "user" role message. If one is found, the agent sends continuation_prompt and loops.
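The stop condition can be sketched as a small function over the message list. This assumes OpenAI-style message dicts with a "role" key; has_pending_tool_results is a hypothetical name, not part of AIMU's public API.

```python
def has_pending_tool_results(messages: list[dict]) -> bool:
    """Return True if a "tool" message follows the last "user" message."""
    for message in reversed(messages):
        role = message.get("role")
        if role == "tool":
            return True   # tools ran this turn: send continuation_prompt and loop
        if role == "user":
            return False  # reached the last user turn without a tool result
    return False


turn_with_tools = [
    {"role": "user", "content": "How many r's in strawberry?"},
    {"role": "assistant", "content": "", "tool_calls": [{"name": "letter_counter"}]},
    {"role": "tool", "content": "3"},
    {"role": "assistant", "content": "There are 3."},
]
plain_turn = [
    {"role": "user", "content": "Hi"},
    {"role": "assistant", "content": "Hello!"},
]
print(has_pending_tool_results(turn_with_tools))  # True
print(has_pending_tool_results(plain_turn))       # False
```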

Tools are plain callables in tools=: functions decorated with @aimu.tools.tool for in-process tools, and/or MCPClient(...).as_tools() for cross-process FastMCP tools (each MCP tool becomes a callable). Mix them freely in one list, e.g. tools=builtin.web + mcp.as_tools().

When system_message is set or reset_messages_on_run is True, the agent clears model_client.messages and re-applies system_message before every run. This isolates state when a client is shared (e.g. inside a Chain).

final_answer_prompt (opt-in, default None) guarantees a final answer when the loop exhausts max_iterations while the model is still calling tools. Instead of returning whatever the last (possibly tool-only) turn produced, the agent sends this prompt once with tools disabled, forcing the model to synthesize an answer from the context it has gathered. This wrap-up turn is not counted against max_iterations (the cap bounds tool-using turns; this is the guaranteed finish). It fires only on the cap-with-pending-tools path; a natural finish (a turn with no tool calls) is unaffected.
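The interaction between max_iterations and final_answer_prompt can be simulated with a scripted stand-in for model_client.chat(). This is a simplified sketch under stated assumptions: run_loop and the chat(prompt, use_tools) -> (text, called_tools) signature are hypothetical, and the real loop also manages messages and executes tools.

```python
from typing import Callable, Optional

# Hypothetical stand-in for model_client.chat(): returns (text, called_tools).
Chat = Callable[[str, bool], tuple[str, bool]]


def run_loop(chat: Chat, task: str, max_iterations: int,
             continuation_prompt: str, final_answer_prompt: Optional[str]) -> str:
    text, called_tools = chat(task, True)
    iterations = 1
    while called_tools and iterations < max_iterations:
        text, called_tools = chat(continuation_prompt, True)
        iterations += 1
    if called_tools and final_answer_prompt is not None:
        # Cap reached with tools still pending: one extra, tools-disabled wrap-up turn.
        text, _ = chat(final_answer_prompt, False)
    return text


calls: list[bool] = []  # records use_tools for each chat() call


def always_calls_tools(prompt: str, use_tools: bool) -> tuple[str, bool]:
    # Hypothetical model that keeps calling tools whenever tools are enabled.
    calls.append(use_tools)
    if use_tools:
        return "", True
    return "Final answer from gathered context.", False


without = run_loop(always_calls_tools, "task", 3, "Continue.", None)
calls.clear()
with_prompt = run_loop(always_calls_tools, "task", 3, "Continue.", "Answer now.")
print(repr(without))  # ''
print(with_prompt)    # Final answer from gathered context.
print(calls)          # [True, True, True, False]: 3 tool turns + 1 wrap-up
```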

Quick start:

from aimu.tools import tool
from aimu.agents import Agent
import aimu

@tool
def letter_counter(word: str, letter: str) -> int:
    """Count occurrences of a letter in a word."""
    return word.lower().count(letter.lower())

client = aimu.client("ollama:qwen3.5:9b")
agent = Agent(client, "You are a helpful assistant.", tools=[letter_counter])
print(agent.run("How many r's in strawberry?"))

run

run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None, tools: Optional[list[Callable]] = None, deps: Optional[Any] = None, schema: Optional[type] = None) -> Union[str, Any, Iterator[StreamChunk]]

Run the agentic loop. images attach only to the initial turn.

tools is a per-run override of the agent's configured self.tools. None (the default) uses them; any other value (including [] to disable Python tools for this run) replaces them for every chat() call in the loop, and the configured tools are restored afterward.
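One way to get "replace for this run, restore afterward" is a try/finally around the loop. ToyAgent is a hypothetical stand-in rather than aimu.agents.Agent, and returning tool names in place of running chat() is purely for illustration.

```python
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Callable, Iterator, Optional


@dataclass
class ToyAgent:  # hypothetical stand-in, not aimu.agents.Agent
    tools: list[Callable] = field(default_factory=list)

    @contextmanager
    def _tools_for_run(self, tools: Optional[list[Callable]]) -> Iterator[None]:
        if tools is None:  # None: keep the configured tools
            yield
            return
        saved = self.tools
        self.tools = tools  # any other value, including [], replaces them
        try:
            yield
        finally:
            self.tools = saved  # restored even if the run raises

    def run(self, task: str, tools: Optional[list[Callable]] = None) -> list[str]:
        with self._tools_for_run(tools):
            # Stand-in for the chat() loop: report which tools it would see.
            return [t.__name__ for t in self.tools]


def search(query: str) -> str:
    return query


def fetch(url: str) -> str:
    return url


agent = ToyAgent(tools=[search])
print(agent.run("t"))                     # ['search']
print(agent.run("t", tools=[]))           # []
print(agent.run("t", tools=[fetch]))      # ['fetch']
print([t.__name__ for t in agent.tools])  # ['search']
```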

deps is a per-run override of the agent's self.deps field, the value injected as ctx.deps into tools that declare a ToolContext (aimu.tools.ToolContext) parameter.

schema (a dataclass or Pydantic v2 model) makes the run a single structured-output turn that returns a validated instance instead of looping with tools. Use it for an agent whose job is to return a typed object (e.g. a critic's verdict). It is mutually exclusive with stream=True and with the tool-calling loop.

as_model_client

as_model_client() -> BaseModelClient

Return a BaseModelClient view of this agent.

Each chat() call on the returned object runs the full agent loop, looping until the model stops calling tools. Use this only where an API expects a BaseModelClient. For direct use, call run() instead.

from_config classmethod

from_config(config: dict[str, Any], model_client: BaseModelClient) -> Agent

Create an Agent from a plain dict config.

Recognised keys: name, system_message, max_iterations, continuation_prompt, final_answer_prompt.

aimu.agents.SkillAgent dataclass

SkillAgent(model_client: BaseModelClient, system_message: Optional[str] = None, name: Optional[str] = None, tools: list[Callable] = list(), max_iterations: int = 10, continuation_prompt: str = DEFAULT_CONTINUATION_PROMPT, reset_messages_on_run: bool = False, final_answer_prompt: Optional[str] = None, deps: Optional[Any] = None, skill_manager: SkillManager = SkillManager())

Bases: Agent

An Agent extended with filesystem-discovered skill injection.

On first run (or after a message reset) the SkillAgent appends the skill catalog to its system message and adds the skills server's tools (via MCPClient.as_tools()) to model_client.tools so the model can call activate_skill to load full skill instructions before proceeding.

By default a fresh SkillManager is created, scanning the standard search paths (.agents/skills/, .claude/skills/, ~/.agents/skills/, ~/.claude/skills/). Pass an explicit SkillManager to override.

Usage:

from aimu.agents import SkillAgent

agent = SkillAgent(client, "Use available skills as needed.")
result = agent.run("Use the pdf-processing skill to extract pages.")

With explicit skill dirs:

agent = SkillAgent(client, skill_manager=SkillManager(skill_dirs=["./skills"]))

from_config classmethod

from_config(config: dict[str, Any], model_client: BaseModelClient) -> SkillAgent

Create a SkillAgent from a plain dict config.

Recognised keys: name, system_message, max_iterations, continuation_prompt, skill_dirs (omit to auto-discover).

aimu.agents.OrchestratorAgent

Bases: Runner, ABC

Base class for the orchestrator + worker-tools pattern.

Subclasses define worker Agent instances and @tool-decorated dispatch functions in __init__, then call _init_orchestrator() to wire everything up:

from aimu.agents import Agent, OrchestratorAgent
from aimu.tools import tool

class ResearchAgent(OrchestratorAgent):
    def __init__(self, client):
        researcher = Agent(client, "Research the topic.", name="researcher")

        @tool
        def research(topic: str) -> str:
            """Run the researcher on a topic."""
            return researcher.run(topic)

        self._init_orchestrator(
            client,
            name="research-orchestrator",
            system_message="Use the research tool to investigate.",
            tools=[research],
        )

For the simple case of dispatching to a fixed list of workers, use assemble() to skip subclassing entirely.

assemble classmethod

assemble(model_client: BaseModelClient, system_message: str, *, workers: list[Runner], name: str = 'orchestrator', concurrent_tool_calls: bool = True, final_answer_prompt: Optional[str] = None) -> 'OrchestratorAgent'

Build a ready-to-run orchestrator from a list of worker runners.

Each worker becomes a callable tool via Runner.as_tool(); the orchestrator dispatches by name. Workers may be any Runner (an Agent, a Chain/Router/Parallel workflow, or a remote A2A agent), not just Agent instances; tool names and descriptions come from each worker's name and (when present) system_message.

Example:

from aimu.agents import Agent, OrchestratorAgent

researcher = Agent(client, "Research the topic.", name="researcher")
critic = Agent(client, "Critique the response.", name="critic")
orch = OrchestratorAgent.assemble(client, "Use both workers.",
                                  workers=[researcher, critic])
print(orch.run("Quantum computing"))

restore

restore(messages: list[dict]) -> None

Restore the inner orchestrator agent's state from a saved message list.

Workers are invoked as tools, so their own state is not part of the orchestrator's history; restore a worker directly if it needs resuming. See Agent.restore() for the full save/restore pattern.

Workflows

aimu.agents.Chain dataclass

Chain(agents: list, name: str = 'chain')

Bases: Runner

Prompt Chaining pattern: agents run sequentially, output → input.

Each step's text output (concatenated GENERATING chunks) becomes the next step's task input. Steps may be Agent instances or nested workflows.
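Stripped of models, the data flow is a left fold over the steps. The sketch below uses plain functions in place of Agent instances; run_chain, text_output and the (phase, text) tuples are hypothetical simplifications, not AIMU types.

```python
from typing import Callable

Step = Callable[[str], str]  # anything with run(task) -> str would do


def run_chain(steps: list[Step], task: str) -> str:
    current = task
    for step in steps:
        current = step(current)  # output of one step is the next step's task
    return current


def text_output(chunks: list[tuple[str, str]]) -> str:
    # Hypothetical (phase, text) pairs standing in for StreamChunk objects.
    return "".join(text for phase, text in chunks if phase == "GENERATING")


def plan(task: str) -> str:
    return f"plan({task})"


def execute(task: str) -> str:
    return f"execute({task})"


def polish(task: str) -> str:
    return f"polish({task})"


print(run_chain([plan, execute, polish], "task"))  # polish(execute(plan(task)))
print(text_output([("GENERATING", "Hello, "), ("GENERATING", "world"), ("DONE", "")]))  # Hello, world
```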

Quick start:

from aimu.agents import Chain

chain = Chain.from_client(client, [
    "Break the task into 3 concrete steps.",
    "Execute each step and collect results.",
    "Polish the result into a final report.",
])
result = chain.run("Research top Python web frameworks.")

Direct construction (each step owns its own client/agent):

chain = Chain(agents=[
    Agent(client_a, "Planner", name="planner"),
    Agent(client_b, "Executor", name="executor"),
])

from_client classmethod

from_client(client: BaseModelClient, prompts: list[str], *, name: str = 'chain') -> Chain

Build a Chain from a single client and a list of step system_messages.

Each step gets its own Agent with reset_messages_on_run=True so it clears the shared client's history and applies its own system_message.

run

run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None) -> Union[str, Iterator[StreamChunk]]

Run all agents sequentially. images are forwarded only to the first step.

restore

restore(messages: list[dict], *, step: int = 0) -> None

Restore a chain step's state from a saved message list.

step (keyword-only) selects which step's agent to restore (default 0, the first step); subsequent steps start fresh on the next run(). Raises IndexError if step is out of range. See Agent.restore() for the full save/restore pattern.

from_config classmethod

from_config(configs: list[dict[str, Any]], client: BaseModelClient) -> Chain

Build a Chain from a list of agent config dicts and a single client.

Each step's Agent gets reset_messages_on_run=True so it clears the client's messages and applies its own system_message before running.

aimu.agents.Router dataclass

Router(routing_agent: Agent, handlers: dict[str, Runner], name: str = 'router', fallback: Optional[Runner] = None)

Bases: Runner

Routing pattern: classify the task, dispatch to a specialist.

The routing_agent receives the task and must respond with a single route name. The Router dispatches to the matching handler (case-insensitive, whitespace-stripped); an unmatched route goes to fallback when one is set. Handlers may be any Runner (agent or nested workflow).
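The matching rule can be sketched as a lookup on a normalised key. dispatch is a hypothetical helper, and raising KeyError when nothing matches and no fallback is set is an assumption of this sketch; the documentation does not specify that case.

```python
from typing import Callable, Optional

Handler = Callable[[str], str]


def dispatch(route: str, task: str, handlers: dict[str, Handler],
             fallback: Optional[Handler] = None) -> str:
    key = route.strip().lower()  # whitespace-stripped, case-insensitive
    table = {name.lower(): handler for name, handler in handlers.items()}
    if key in table:
        return table[key](task)
    if fallback is not None:
        return fallback(task)
    # Assumption: the sketch raises; Router's own no-fallback behaviour may differ.
    raise KeyError(f"no handler for route {route!r}; routes: {sorted(handlers)}")


handlers: dict[str, Handler] = {
    "code": lambda t: f"coder: {t}",
    "writing": lambda t: f"writer: {t}",
}
print(dispatch("  Code\n", "fix the bug", handlers))  # coder: fix the bug
print(dispatch("poetry", "a haiku", handlers, fallback=lambda t: f"general: {t}"))  # general: a haiku
```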

Quick start:

from aimu.agents import Agent, Router

router = Router.from_client(
    client,
    classifier_prompt=(
        "Classify the task as one of: code, writing, math. "
        "Reply with only the category name."
    ),
    handlers={
        "code":    Agent(client, "You are a coder.", name="coder"),
        "writing": Agent(client, "You are a writer.", name="writer"),
        "math":    Agent(client, "You are a mathematician.", name="math"),
    },
    fallback=Agent(client, "Be helpful.", name="general"),
)

from_client classmethod

from_client(client: BaseModelClient, classifier_prompt: str, handlers: dict[str, Runner], *, fallback: Optional[Runner] = None, name: str = 'router') -> Router

Build a Router using client as the classifier with the given prompt.

run

run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None) -> Union[str, Iterator[StreamChunk]]

Classify the task and dispatch to the matched handler.

images are forwarded to the handler; the routing agent classifies on text only.

restore

restore(messages: list[dict], *, route: Optional[str] = None) -> None

Restore one sub-runner's state from a saved message list.

route=None (default, keyword-only) restores the routing classifier; a route key restores that handler (raises KeyError listing the routes on a miss). Other sub-runners start fresh on the next run(). See Agent.restore() for the pattern.

from_config classmethod

from_config(routing_config: dict[str, Any], handler_configs: dict[str, dict[str, Any]], client: BaseModelClient, fallback_config: Optional[dict[str, Any]] = None) -> Router

Build a Router from config dicts and a single client.

aimu.agents.Parallel dataclass

Parallel(workers: list[Runner], name: str = 'parallel', aggregator: Optional[Runner] = None, separator: str = '\n\n---\n\n', max_workers: Optional[int] = None)

Bases: Runner

Parallelization pattern: run workers concurrently, aggregate.

Each worker receives the same task. An optional aggregator receives all worker outputs joined by separator and produces the final result. Without an aggregator, the joined worker outputs are returned directly.

Workers run concurrently via ThreadPoolExecutor; results are collected in submission order. Workers may be any Runner.
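The fan-out and ordering guarantees map directly onto concurrent.futures. This is a minimal sketch with plain functions as workers; run_parallel is a hypothetical name, and only the documented behaviour (same task to every worker, submission-order results, separator join, optional aggregator) is modelled.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Optional

Worker = Callable[[str], str]


def run_parallel(workers: list[Worker], task: str,
                 aggregator: Optional[Worker] = None,
                 separator: str = "\n\n---\n\n",
                 max_workers: Optional[int] = None) -> str:
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(worker, task) for worker in workers]  # same task for all
        outputs = [future.result() for future in futures]  # submission order, not finish order
    joined = separator.join(outputs)
    return aggregator(joined) if aggregator is not None else joined


def security(task: str) -> str:
    time.sleep(0.05)  # likely finishes last, but its output still comes first
    return f"security: {task}"


def performance(task: str) -> str:
    return f"performance: {task}"


print(run_parallel([security, performance], "review", separator=" | "))
# security: review | performance: review
print(run_parallel([security, performance], "review",
                   aggregator=lambda joined: f"{joined.count(':')} perspectives"))
# 2 perspectives
```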

Quick start:

from aimu.agents import Parallel

parallel = Parallel.from_client(
    client,
    worker_prompts=[
        "Analyze this from a security perspective.",
        "Analyze this from a performance perspective.",
        "Analyze this from a readability perspective.",
    ],
    aggregator_prompt="Synthesize the perspectives into one concise review.",
)

from_client classmethod

from_client(client: BaseModelClient, worker_prompts: list[str], *, aggregator_prompt: Optional[str] = None, separator: str = '\n\n---\n\n', name: str = 'parallel') -> Parallel

Build a Parallel using client for all workers (and aggregator).

restore

restore(messages: list[dict], *, worker: int = 0) -> None

Restore one worker's state from a saved message list.

worker (keyword-only) selects which worker by index (default 0), mirroring Chain.restore's step. Other workers and the aggregator start fresh on the next run(). Raises IndexError if worker is out of range. See Agent.restore() for the full save/restore pattern.

run

run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None) -> Union[str, Iterator[StreamChunk]]

Run workers concurrently then aggregate.

images are forwarded to every worker; the aggregator runs on text only.

aimu.agents.EvaluatorOptimizer dataclass

EvaluatorOptimizer(generator: Agent, evaluator: Agent, name: str = 'evaluator_optimizer', max_rounds: int = 3, pass_keyword: str = 'PASS', stop_when: Optional[Callable[[Any], bool]] = None, verdict_schema: Optional[type] = None, passed_attr: str = 'passed', feedback_attr: str = 'feedback')

Bases: Runner

Evaluator-Optimizer pattern: iteratively improve via critic feedback.

The generator produces an initial response. The evaluator reviews it against the original task and decides whether it is accepted; if not, its feedback drives a revision. The loop stops when the evaluator accepts or max_rounds is reached.

Acceptance is decided by one of three mechanisms, in priority order:

  • stop_when: a predicate over the evaluator's output (the raw text, or the typed verdict when verdict_schema is set). The most flexible and robust option.
  • verdict_schema: a dataclass / Pydantic model the evaluator must return (via structured output). Acceptance reads the bool attribute named by passed_attr (default passed) and revision uses the str attribute named by feedback_attr (default feedback). Requires a structured-output-capable model; it raises on a malformed verdict rather than silently continuing.
  • pass_keyword (default): accept when this substring appears in the evaluator's text.
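The priority order reads naturally as a chain of early returns. This sketch is an illustration, not EvaluatorOptimizer's code: is_accepted is hypothetical, and raising TypeError for a malformed verdict is an assumed stand-in for whatever error AIMU raises. The third call shows why the substring default is the least robust option.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Verdict:
    passed: bool
    feedback: str = ""


def is_accepted(evaluation: Any, *,
                stop_when: Optional[Callable[[Any], bool]] = None,
                verdict_schema: Optional[type] = None,
                passed_attr: str = "passed",
                pass_keyword: str = "PASS") -> bool:
    if stop_when is not None:  # 1. predicate over text or typed verdict
        return bool(stop_when(evaluation))
    if verdict_schema is not None:  # 2. typed verdict
        if not isinstance(evaluation, verdict_schema):
            raise TypeError(f"malformed verdict: {evaluation!r}")  # assumed error type
        return bool(getattr(evaluation, passed_attr))
    return pass_keyword in evaluation  # 3. default substring match


print(is_accepted("PASS"))                               # True
print(is_accepted("REVISE: define the learning rate."))  # False
print(is_accepted("REVISE: this does not PASS yet."))    # True (substring false positive)
print(is_accepted(Verdict(False, "add an example"), verdict_schema=Verdict))  # False
print(is_accepted("Looks good.", stop_when=lambda text: text.startswith("Looks good")))  # True
```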

Usage:

from pydantic import BaseModel
from aimu.agents import Agent, EvaluatorOptimizer

writer = Agent(client, "Write a clear, accurate explanation.", name="writer")
critic = Agent(client, "Reply PASS or REVISE: <feedback>.", name="critic")
eo = EvaluatorOptimizer(generator=writer, evaluator=critic, max_rounds=4)
result = eo.run("Explain gradient descent.")

# Typed verdict instead of substring matching:
class Verdict(BaseModel):
    passed: bool
    feedback: str = ""

eo = EvaluatorOptimizer(generator=writer, evaluator=critic, verdict_schema=Verdict)

restore

restore(messages: list[dict]) -> None

Restore the generator's state from a saved message list.

The evaluator starts fresh on the next round. See Agent.restore() for the full save/restore pattern and system-message handling details.

run

run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None) -> Union[str, Iterator[StreamChunk]]

Run the generate-evaluate loop. Streaming yields only the final output.

aimu.agents.PlanExecuteEvaluator dataclass

PlanExecuteEvaluator(planner: SkillAgent, executor: Agent, scorer: Scorer, criteria: Optional[str] = None, name: str = 'plan_execute_evaluator', max_rounds: int = 3, pass_threshold: float = 0.7, pass_keyword: Optional[str] = None)

Bases: Runner

Plan → execute → evaluate → replan-on-fail loop.

A planner agent produces a plan for the task; an executor agent runs the plan with its tools; a pluggable Scorer judges the output. On failure, the planner is invoked again with the prior round's feedback to produce a new plan (not a revision). The loop is bounded by max_rounds and, on exhaustion, returns the highest-scoring attempt.
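The control flow can be reduced to plain callables. This is a sketch under assumptions: plan_execute_evaluate is a hypothetical function, an attempt is treated as passing when score >= pass_threshold, and the planner is simply handed the previous round's feedback.

```python
from typing import Callable, Optional


def plan_execute_evaluate(
    task: str,
    plan: Callable[[str, Optional[str]], str],
    execute: Callable[[str], str],
    score: Callable[[str], tuple[float, str]],
    max_rounds: int = 3,
    pass_threshold: float = 0.7,
) -> tuple[str, list[dict]]:
    attempts: list[dict] = []
    feedback: Optional[str] = None
    for round_no in range(1, max_rounds + 1):
        plan_text = plan(task, feedback)  # a fresh plan, informed by the last feedback
        output = execute(plan_text)
        value, feedback = score(output)
        attempts.append({"round": round_no, "plan": plan_text, "output": output,
                         "score": value, "feedback": feedback})
        if value >= pass_threshold:
            return output, attempts
    best = max(attempts, key=lambda a: a["score"])  # exhausted: best attempt wins
    return best["output"], attempts


scores = iter([0.4, 0.6, 0.5])  # never reaches the 0.7 threshold


def make_plan(task: str, feedback: Optional[str]) -> str:
    return f"plan for {task!r} (feedback: {feedback})"


def run_plan(plan_text: str) -> str:
    return f"output of {plan_text}"


def judge(output: str) -> tuple[float, str]:
    return next(scores), "cite more sources"


result, attempts = plan_execute_evaluate("news summary", make_plan, run_plan, judge)
print([a["score"] for a in attempts])   # [0.4, 0.6, 0.5]
print(result == attempts[1]["output"])  # True
```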

Two criteria modes:

  • User-supplied (criteria="..." at construction): the criteria is passed to the planner's prompt every round, and to the scorer as the row's reference field.
  • Planner-invented (criteria=None): the planner is asked to emit ## Evaluation criteria + ## Plan sections; the workflow parses both. The parsed criteria is passed to the scorer as reference.
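In planner-invented mode the workflow has to split the planner's text into its two sections. Below is a minimal sketch of such a split, assuming the headings appear on their own lines as "## Evaluation criteria" and "## Plan"; parse_planner_output is a hypothetical helper, and AIMU's own parser may be stricter or more lenient.

```python
import re


def parse_planner_output(text: str) -> tuple[str, str]:
    # re.split with a capturing group yields [preamble, heading1, body1, heading2, body2, ...]
    parts = re.split(r"^##[ \t]+(.+?)[ \t]*$", text, flags=re.MULTILINE)
    sections = {heading.lower(): body.strip()
                for heading, body in zip(parts[1::2], parts[2::2])}
    return sections.get("evaluation criteria", ""), sections.get("plan", "")


planner_text = """## Evaluation criteria
- Cites 3+ sources
- Under 200 words

## Plan
1. Search recent quantum computing news.
2. Summarise the top stories with citations.
"""
criteria, plan = parse_planner_output(planner_text)
print(criteria.splitlines())  # ['- Cites 3+ sources', '- Under 200 words']
print(plan.splitlines()[0])   # 1. Search recent quantum computing news.
```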

Usage:

import aimu
from aimu.agents import PlanExecuteEvaluator

wf = PlanExecuteEvaluator.from_client(
    client=aimu.client("anthropic:claude-sonnet-4-6"),
    judge_client=aimu.client("openai:gpt-4o"),
    executor_tools=builtin.web + builtin.fs,
    criteria="The summary cites 3+ sources and is under 200 words.",
    max_rounds=3,
)
result = wf.run("Summarise the latest news on quantum computing.")

last_attempts property

last_attempts: list[dict]

Per-round attempts from the most recent run().

Each entry is a dict with the keys round, plan, criteria, output, score and feedback, useful for introspection or post-hoc analysis.

run

run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None) -> Union[str, Iterator[StreamChunk]]

Run the plan-execute-evaluate loop. images are forwarded to the executor each round.

from_client classmethod

from_client(client: BaseModelClient, *, judge_client: Optional[BaseModelClient] = None, criteria: Optional[str] = None, executor_tools: Optional[list[Callable]] = None, skill_manager: Optional[SkillManager] = None, planner_system_message: Optional[str] = None, executor_system_message: Optional[str] = None, max_rounds: int = 3, pass_threshold: float = 0.7, pass_keyword: Optional[str] = None, name: str = 'plan_execute_evaluator') -> PlanExecuteEvaluator

Build a PlanExecuteEvaluator from a single client and the common knobs.

  • planner is a SkillAgent over client; pass skill_manager= to use task-type planning skills from disk.
  • executor is an Agent over client with the given executor_tools.
  • scorer is an LLMJudgeScorer over judge_client (defaults to client if not provided; a stronger separate model is recommended). Its criteria is the workflow's criteria if supplied, otherwise a generic fallback.

For full control (e.g. a custom Scorer), construct the dataclass directly.

A2A interop

Optional Agent2Agent protocol support (requires the a2a extra: pip install 'aimu[a2a]'). Available only when aimu.agents.HAS_A2A is True. See the "A2A vs MCP" and "Connect agents (A2A)" guides.

aimu.agents.RemoteAgent

RemoteAgent(portal_cm, portal, client: A2AClient, httpx_client: AsyncClient, name: str, card: Any)

Bases: Runner

A remote A2A agent presented as a local synchronous Runner.

Construct via connect(), which resolves the remote agent card first:

from aimu.agents import RemoteAgent

remote = RemoteAgent.connect("http://localhost:9000")
print(remote.run("Summarise the news"))

# Composes like any other runner:
chain = Chain(agents=[local_agent, remote])
local = Agent(client, tools=[remote.as_tool()])

Holds a live async connection (an httpx.AsyncClient driven through an anyio portal). Keep the instance alive for as long as you need the connection, then call close() (or let it be garbage collected) to tear the portal down.

connect classmethod

connect(url: str, *, name: Optional[str] = None, agent_card_path: str = DEFAULT_AGENT_CARD_PATH, timeout: float = 60.0) -> 'RemoteAgent'

Resolve the remote agent card at url and return a connected RemoteAgent.

name defaults to the remote agent card's name. Connection failures raise A2AConnectionError with the original exception chained.

run

run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None) -> Union[str, Iterator[StreamChunk]]

Send task to the remote agent and return its text response.

stream=True yields the full response as a single GENERATING chunk followed by DONE; incremental token streaming over the portal is not supported on the sync surface (use aimu.aio.a2a.RemoteAgent for real streaming).
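What a caller sees with stream=True can be modelled as a two-item generator. Phase and Chunk are hypothetical stand-ins for AIMU's StreamChunk types (only the GENERATING and DONE phase names come from the text above), so treat this as a shape sketch rather than the real classes.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Iterator


class Phase(Enum):  # hypothetical stand-in for AIMU's stream phases
    GENERATING = "generating"
    DONE = "done"


@dataclass
class Chunk:  # hypothetical stand-in for StreamChunk
    phase: Phase
    content: str = ""


def single_chunk_stream(full_text: str) -> Iterator[Chunk]:
    yield Chunk(Phase.GENERATING, full_text)  # the whole response at once
    yield Chunk(Phase.DONE)


chunks = list(single_chunk_stream("Quantum news summary."))
print([c.phase.name for c in chunks])  # ['GENERATING', 'DONE']
# Code written for token streams still works: join the GENERATING chunks.
print("".join(c.content for c in chunks if c.phase is Phase.GENERATING))  # Quantum news summary.
```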

aimu.agents.serve_a2a

serve_a2a(runner: Runner, *, host: str = '127.0.0.1', port: int = 9000, url: Optional[str] = None, name: Optional[str] = None, description: Optional[str] = None, skills: Optional[list[AgentSkill]] = None, **uvicorn_kwargs: Any) -> None

Serve runner as an A2A agent over HTTP (blocking).

url is the externally reachable base URL advertised in the agent card; it defaults to http://{host}:{port}/. Extra kwargs pass through to uvicorn.run.

aimu.agents.build_a2a_app

build_a2a_app(runner: Runner, *, url: str, name: Optional[str] = None, description: Optional[str] = None, skills: Optional[list[AgentSkill]] = None, agent_card_path: str = DEFAULT_AGENT_CARD_PATH)

Build the Starlette ASGI app that serves runner over A2A (does not run it).

aimu.agents.A2AConnectionError

Bases: RuntimeError

Raised when a RemoteAgent fails to connect to or call a remote A2A agent.