aimu.agents
Agents and code-controlled workflows.
Hierarchy
aimu.agents.Runner
Bases: ABC
Abstract base for every concrete agent and workflow in AIMU.
Concrete subclasses implement run() and messages.
messages
abstractmethod
property
Message histories of all sub-runners, keyed by runner name.
For a leaf agent: {agent.name: model_client.messages}. For composite
workflows: the per-runner dicts merged into one (via dict.update), so the
result spans the full sub-tree regardless of nesting depth. Example::
    router.messages
    # {"router-classifier": [...], "code-handler": [...], "prose-handler": [...]}
Merge is by runner name: two sub-runners sharing a name collide, and the one
merged later wins (last write wins). Give sub-runners distinct names (name=...) if you
need every history to be addressable.
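To make the last-write rule concrete, here is a minimal sketch that reproduces the merge with plain Python stand-ins. Leaf and Composite are hypothetical classes, not AIMU types; only the dict.update merge mirrors the behaviour described above.

```python
# Hypothetical stand-ins: each object exposes a {name: history} dict,
# mirroring the shape of Runner.messages (these are not AIMU classes).
class Leaf:
    def __init__(self, name, history):
        self.name = name
        self._history = history

    @property
    def messages(self):
        return {self.name: self._history}


class Composite:
    def __init__(self, children):
        self.children = children

    @property
    def messages(self):
        merged = {}
        for child in self.children:
            merged.update(child.messages)  # later children overwrite same-named keys
        return merged


inner = Composite([Leaf("classifier", ["c"]), Leaf("handler", ["h1"])])
outer = Composite([inner, Leaf("handler", ["h2"])])  # nested, with a name collision
print(outer.messages)  # {'classifier': ['c'], 'handler': ['h2']}
```

The nested "handler" history (["h1"]) is lost because the outer leaf with the same name is merged after it.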
Separately, when sub-runners share a single ModelClient (e.g. via
Chain.from_config), they all reference the same messages list, so after a run
every key points at that one shared (last step's) history. Use distinct clients per
sub-runner to keep histories separate.
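The shared-client case is ordinary Python aliasing. In this sketch, FakeClient and FakeAgent are hypothetical stand-ins (not AIMU classes) that expose the client's list by reference, as the paragraph above describes; after the run, both keys point at the same list.

```python
# Hypothetical stand-ins for a model client and two agents sharing it.
class FakeClient:
    def __init__(self):
        self.messages = []


class FakeAgent:
    def __init__(self, client, name):
        self.client, self.name = client, name

    def run(self, task):
        self.client.messages.clear()  # reset_messages_on_run-style reset
        self.client.messages.append({"role": "user", "content": task})
        return task.upper()

    @property
    def messages(self):
        return {self.name: self.client.messages}  # a reference, not a copy


shared = FakeClient()
steps = [FakeAgent(shared, "planner"), FakeAgent(shared, "writer")]
out = "draft"
for step in steps:
    out = step.run(out)

merged = {}
for step in steps:
    merged.update(step.messages)
print(merged["planner"] is merged["writer"])  # True: both keys alias one list
print(merged["planner"])                      # only the last step's history
```

Giving each agent its own FakeClient would make the two keys point at separate lists.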
run
abstractmethod
run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None) -> Union[str, Iterator[StreamChunk]]
Run synchronously (stream=False) or streaming (stream=True).
as_tool
Wrap this runner as a @tool-style callable: tool(task: str) -> str.
The returned callable runs run() and returns its string result, so any agent
or workflow can call this runner as a tool: drop it into Agent(tools=[...]) or
an OrchestratorAgent's worker list. This works for every concrete Runner
(Agent, Chain, Router, a remote A2A agent, ...) since it relies only on
run().
name defaults to self.name (sanitised to a valid identifier) or "runner".
description defaults to the first line of self.system_message when present
(Agent / SkillAgent), else a generic delegation string (workflows have no
system_message).
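A minimal sketch of the wrapping described above, assuming that a @tool-style callable is identified by its __name__ and __doc__. The exact sanitisation rule and the generic description wording in AIMU are not documented here, so both are assumptions (non-word characters replaced by underscores, and a made-up fallback sentence).

```python
import re
from typing import Any, Callable


def as_tool(runner: Any) -> Callable[[str], str]:
    # Name: runner.name made into a valid identifier, else "runner" (assumed rule).
    raw_name = getattr(runner, "name", None)
    name = re.sub(r"\W", "_", raw_name) if raw_name else "runner"
    if name[0].isdigit():
        name = "_" + name  # identifiers cannot start with a digit
    # Description: first line of system_message when present, else generic text.
    lines = (getattr(runner, "system_message", None) or "").strip().splitlines()
    description = lines[0] if lines else f"Delegate a task to the {name} runner."

    def tool(task: str) -> str:
        return runner.run(task)  # relies on nothing but run()

    tool.__name__ = name
    tool.__doc__ = description
    return tool


class EchoRunner:  # hypothetical runner used only for illustration
    name = "code-handler"
    system_message = "Answer coding questions.\nBe concise."

    def run(self, task: str) -> str:
        return f"echo: {task}"


t = as_tool(EchoRunner())
print(t.__name__, "|", t.__doc__, "|", t("hi"))
# code_handler | Answer coding questions. | echo: hi
```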
Agents
aimu.agents.Agent
dataclass
Agent(model_client: BaseModelClient, system_message: Optional[str] = None, name: Optional[str] = None, tools: list[Callable] = list(), max_iterations: int = 10, continuation_prompt: str = DEFAULT_CONTINUATION_PROMPT, reset_messages_on_run: bool = False, final_answer_prompt: Optional[str] = None, deps: Optional[Any] = None)
Bases: _AgentLoopMixin, Runner
A model client wrapped in an agentic loop.
Calls model_client.chat() repeatedly until the model produces a turn without
invoking tools, or max_iterations is reached. The stop condition scans
model_client.messages in reverse for a "tool" role message after the last
"user" role message. If one is found, the agent sends continuation_prompt and loops.
Tools are plain callables in tools=: functions decorated with
@aimu.tools.tool for in-process tools, and/or MCPClient(...).as_tools() for
cross-process FastMCP tools (each MCP tool becomes a callable). Mix them freely in
one list, e.g. tools=builtin.web + mcp.as_tools().
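The stop condition described earlier in this section (a reverse scan for a "tool" message after the last "user" message) fits in a few lines. This sketch assumes OpenAI-style message dicts with a role key; the tool_calls field in the sample history is illustrative only.

```python
def tools_called_since_last_user(messages: list[dict]) -> bool:
    """Reverse scan: True if a "tool" message appears after the last "user" message."""
    for message in reversed(messages):
        role = message.get("role")
        if role == "tool":
            return True   # tools ran in the current turn: send continuation_prompt
        if role == "user":
            return False  # reached the last user turn without seeing a tool message
    return False


history = [
    {"role": "user", "content": "How many r's in strawberry?"},
    {"role": "assistant", "content": "", "tool_calls": ["letter_counter"]},
    {"role": "tool", "content": "3"},
]
print(tools_called_since_last_user(history))  # True -> loop again
history.append({"role": "user", "content": "Continue."})  # stand-in continuation_prompt
history.append({"role": "assistant", "content": "There are 3."})
print(tools_called_since_last_user(history))  # False -> stop
```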
When system_message is set or reset_messages_on_run is True, the agent
clears model_client.messages and re-applies system_message before every
run. This isolates state when a client is shared (e.g. inside a Chain).
final_answer_prompt (opt-in, default None) guarantees a final answer when the
loop exhausts max_iterations while the model is still calling tools. Instead of
returning whatever the last (possibly tool-only) turn produced, the agent sends this
prompt once with tools disabled, forcing the model to synthesize an answer from the
context it has gathered. This wrap-up turn is not counted against max_iterations
(the cap bounds tool-using turns; this is the guaranteed finish). It fires only on the
cap-with-pending-tools path; a natural finish (a turn with no tool calls) is unaffected.
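A sketch of how the iteration cap and the uncounted wrap-up turn interact. chat is a hypothetical stand-in for model_client.chat() that takes a tools_enabled flag; whether AIMU counts the first turn toward max_iterations is an assumption here (this sketch counts it).

```python
from typing import Callable, Optional

# chat(prompt, tools_enabled) -> reply text; it must append the turn to `messages`.
ChatFn = Callable[[str, bool], str]


def run_loop(chat: ChatFn, messages: list[dict], task: str,
             max_iterations: int = 10,
             continuation_prompt: str = "Continue.",
             final_answer_prompt: Optional[str] = None) -> str:
    def tools_pending() -> bool:
        # Same reverse scan as the stop condition: a "tool" message after the last "user".
        for m in reversed(messages):
            if m["role"] == "tool":
                return True
            if m["role"] == "user":
                return False
        return False

    reply = chat(task, True)  # iteration 1 (assumed: the first turn counts)
    iterations = 1
    while tools_pending() and iterations < max_iterations:
        reply = chat(continuation_prompt, True)
        iterations += 1
    if tools_pending() and final_answer_prompt is not None:
        # Cap hit with tools still pending: one extra, uncounted turn with tools disabled.
        reply = chat(final_answer_prompt, False)
    return reply


def make_tool_happy_model(messages: list[dict]):
    """Fake model that calls a tool whenever tools are enabled."""
    calls: list[bool] = []

    def chat(prompt: str, tools_enabled: bool) -> str:
        calls.append(tools_enabled)
        messages.append({"role": "user", "content": prompt})
        if tools_enabled:
            messages.append({"role": "assistant", "content": ""})
            messages.append({"role": "tool", "content": "result"})
            return ""
        messages.append({"role": "assistant", "content": "final answer"})
        return "final answer"

    return chat, calls


history: list[dict] = []
chat, calls = make_tool_happy_model(history)
print(run_loop(chat, history, "q", max_iterations=3, final_answer_prompt="Answer now."))
print(calls)  # [True, True, True, False]
```

Without final_answer_prompt the same run would stop after three tool-using turns and return the last (empty) reply.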
Quick start::
    import aimu
    from aimu.agents import Agent
    from aimu.tools import tool

    @tool
    def letter_counter(word: str, letter: str) -> int:
        """Count occurrences of a letter in a word."""
        return word.lower().count(letter.lower())

    client = aimu.client("ollama:qwen3.5:9b")
    agent = Agent(client, "You are a helpful assistant.", tools=[letter_counter])
    print(agent.run("How many r's in strawberry?"))
run
run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None, tools: Optional[list[Callable]] = None, deps: Optional[Any] = None, schema: Optional[type] = None) -> Union[str, Any, Iterator[StreamChunk]]
Run the agentic loop. images attach only to the initial turn.
tools is a per-run override of the agent's configured self.tools. None
(the default) uses the configured tools; any other value (including [] to disable
Python tools for this run) replaces them for every chat() call in the loop, and the
configured tools are restored afterward.
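The override-then-restore behaviour is the standard try/finally swap. SketchAgent below is a hypothetical stand-in, not AIMU's Agent; the point is the is-not-None check, which lets [] disable tools while None keeps the configured ones.

```python
from typing import Callable, Optional


class SketchAgent:
    """Hypothetical stand-in: per-run tools override that is always restored."""

    def __init__(self, tools: list[Callable]):
        self.tools = tools
        self.seen: list[list[Callable]] = []  # tools visible to each chat() call

    def _chat(self) -> None:
        self.seen.append(list(self.tools))

    def run(self, task: str, tools: Optional[list[Callable]] = None) -> str:
        saved = self.tools
        if tools is not None:  # `is not None`, so [] still counts as an override
            self.tools = tools
        try:
            for _ in range(2):  # pretend the loop made two chat() calls
                self._chat()
            return task
        finally:
            self.tools = saved  # restored even if the loop raises


def lookup(query: str) -> str:
    return query


agent = SketchAgent([lookup])
agent.run("answer from memory", tools=[])
agent.run("use the configured tools")
print([len(t) for t in agent.seen])  # [0, 0, 1, 1]
```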
deps is a per-run override of the agent's self.deps field, the value injected
as ctx.deps into tools that declare an aimu.tools.ToolContext parameter.
schema (a dataclass or Pydantic v2 model) makes the run a single structured-output
turn that returns a validated instance instead of looping with tools. Use it for an
agent whose job is to return a typed object (e.g. a critic's verdict). It is mutually
exclusive with stream=True and with the tool-calling loop.
as_model_client
Return a BaseModelClient view of this agent.
Each chat() call on the returned object runs the full agent loop, looping
until the model stops calling tools. Use this only where an API expects a
BaseModelClient. For direct use, call run() instead.
from_config
classmethod
Create an Agent from a plain dict config.
Recognised keys: name, system_message, max_iterations,
continuation_prompt, final_answer_prompt.
aimu.agents.SkillAgent
dataclass
SkillAgent(model_client: BaseModelClient, system_message: Optional[str] = None, name: Optional[str] = None, tools: list[Callable] = list(), max_iterations: int = 10, continuation_prompt: str = DEFAULT_CONTINUATION_PROMPT, reset_messages_on_run: bool = False, final_answer_prompt: Optional[str] = None, deps: Optional[Any] = None, skill_manager: SkillManager = SkillManager())
Bases: Agent
An Agent extended with filesystem-discovered skill injection.
On first run (or after a message reset) the SkillAgent appends the skill catalog
to its system message and adds the skills server's tools (via MCPClient.as_tools())
to model_client.tools so the model can call activate_skill to load full skill
instructions before proceeding.
By default a fresh SkillManager is created, scanning the standard search
paths (.agents/skills/, .claude/skills/, ~/.agents/skills/,
~/.claude/skills/). Pass an explicit SkillManager to override.
Usage::
    agent = SkillAgent(client, "Use available skills as needed.")
    result = agent.run("Use the pdf-processing skill to extract pages.")
With explicit skill dirs::
    agent = SkillAgent(client, skill_manager=SkillManager(skill_dirs=["./skills"]))
from_config
classmethod
Create a SkillAgent from a plain dict config.
Recognised keys: name, system_message, max_iterations,
continuation_prompt, skill_dirs (omit to auto-discover).
aimu.agents.OrchestratorAgent
Bases: Runner, ABC
Base class for the orchestrator + worker-tools pattern.
Subclasses define worker Agent instances and @tool-decorated dispatch
functions in __init__, then call _init_orchestrator() to wire everything up::
    from aimu.agents import Agent, OrchestratorAgent
    from aimu.tools import tool

    class ResearchAgent(OrchestratorAgent):
        def __init__(self, client):
            researcher = Agent(client, "Research the topic.", name="researcher")

            @tool
            def research(topic: str) -> str:
                """Run the researcher on a topic."""
                return researcher.run(topic)

            self._init_orchestrator(
                client,
                name="research-orchestrator",
                system_message="Use the research tool to investigate.",
                tools=[research],
            )
For the simple case of dispatching to a fixed list of workers, use
assemble() to skip subclassing entirely.
assemble
classmethod
assemble(model_client: BaseModelClient, system_message: str, *, workers: list[Runner], name: str = 'orchestrator', concurrent_tool_calls: bool = True, final_answer_prompt: Optional[str] = None) -> 'OrchestratorAgent'
Build a ready-to-run orchestrator from a list of worker runners.
Each worker becomes a callable tool via Runner.as_tool(); the orchestrator
dispatches by name. Workers may be any Runner (an Agent, a
Chain/Router/Parallel workflow, or a remote A2A agent), not just
Agent instances; tool names and descriptions come from each worker's name and
(when present) system_message.
Example::
    researcher = Agent(client, "Research the topic.", name="researcher")
    critic = Agent(client, "Critique the response.", name="critic")
    orch = OrchestratorAgent.assemble(client, "Use both workers.",
                                      workers=[researcher, critic])
    print(orch.run("Quantum computing"))
restore
Restore the inner orchestrator agent's state from a saved message list.
Workers are invoked as tools, so their own state is not part of the orchestrator's
history; restore a worker directly if it needs resuming. See Agent.restore()
for the full save/restore pattern.
Workflows
aimu.agents.Chain
dataclass
Bases: Runner
Prompt Chaining pattern: agents run sequentially, output → input.
Each step's text output (concatenated GENERATING chunks) becomes the next step's
task input. Steps may be Agent instances or nested workflows.
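A sketch of the output-to-input piping, assuming each streamed chunk can be modelled as a (phase, text) tuple; only GENERATING chunks are concatenated into the next step's task. The tuple shape and the step functions are hypothetical, not AIMU's StreamChunk or Agent.

```python
from typing import Callable, Iterator

Chunk = tuple[str, str]                  # (phase, text): hypothetical StreamChunk stand-in
Step = Callable[[str], Iterator[Chunk]]  # stand-in for a streaming runner


def collect_text(chunks: Iterator[Chunk]) -> str:
    # Only GENERATING chunks become the next step's task.
    return "".join(text for phase, text in chunks if phase == "GENERATING")


def run_chain(steps: list[Step], task: str) -> str:
    text = task
    for step in steps:  # sequential: output -> input
        text = collect_text(step(text))
    return text


def shout(task: str) -> Iterator[Chunk]:
    yield ("GENERATING", task.upper())
    yield ("DONE", "")


def exclaim(task: str) -> Iterator[Chunk]:
    for word in task.split():
        yield ("GENERATING", word + "! ")
    yield ("DONE", "")


print(repr(run_chain([shout, exclaim], "plan then act")))  # 'PLAN! THEN! ACT! '
```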
Quick start::
    chain = Chain.from_client(client, [
        "Break the task into 3 concrete steps.",
        "Execute each step and collect results.",
        "Polish the result into a final report.",
    ])
    result = chain.run("Research top Python web frameworks.")
Direct construction (each step owns its own client/agent)::
    chain = Chain(agents=[
        Agent(client_a, "Planner", name="planner"),
        Agent(client_b, "Executor", name="executor"),
    ])
from_client
classmethod
Build a Chain from a single client and a list of step system_messages.
Each step gets its own Agent with reset_messages_on_run=True so it
clears the shared client's history and applies its own system_message.
run
run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None) -> Union[str, Iterator[StreamChunk]]
Run all agents sequentially. images are forwarded only to the first step.
restore
Restore a chain step's state from a saved message list.
step (keyword-only) selects which step's agent to restore (default 0, the first
step); subsequent steps start fresh on the next run(). Raises IndexError if
step is out of range. See Agent.restore() for the full save/restore pattern.
from_config
classmethod
Build a Chain from a list of agent config dicts and a single client.
Each step's Agent gets reset_messages_on_run=True so it clears the client's
messages and applies its own system_message before running.
aimu.agents.Router
dataclass
Router(routing_agent: Agent, handlers: dict[str, Runner], name: str = 'router', fallback: Optional[Runner] = None)
Bases: Runner
Routing pattern: classify the task, dispatch to a specialist.
The routing_agent receives the task and must respond with a single route name.
The Router dispatches to the matching handler (case-insensitive, whitespace-stripped).
Handlers may be any Runner (agent or nested workflow).
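Route matching as described (case-insensitive, whitespace-stripped) is a normalised dictionary lookup. This sketch uses plain functions as handlers. Raising KeyError when there is no match and no fallback is an assumption; what Router does in that case is not specified here.

```python
from typing import Callable, Optional

Handler = Callable[[str], str]  # stand-in for a handler runner's run(task) -> str


def dispatch(route_reply: str, handlers: dict[str, Handler], task: str,
             fallback: Optional[Handler] = None) -> str:
    # Normalise both sides: case-insensitive, surrounding whitespace stripped.
    wanted = route_reply.strip().lower()
    table = {key.strip().lower(): handler for key, handler in handlers.items()}
    handler = table.get(wanted, fallback)
    if handler is None:  # assumed behaviour on a miss with no fallback
        raise KeyError(f"no route {wanted!r}; known routes: {sorted(table)}")
    return handler(task)


def coder(task: str) -> str:
    return f"coder: {task}"


def writer(task: str) -> str:
    return f"writer: {task}"


def general(task: str) -> str:
    return f"general: {task}"


handlers = {"code": coder, "writing": writer}
print(dispatch(" Code\n", handlers, "fix my loop"))            # coder: fix my loop
print(dispatch("poetry", handlers, "a haiku", fallback=general))  # general: a haiku
```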
Quick start::
    router = Router.from_client(
        client,
        classifier_prompt=(
            "Classify the task as one of: code, writing, math. "
            "Reply with only the category name."
        ),
        handlers={
            "code": Agent(client, "You are a coder.", name="coder"),
            "writing": Agent(client, "You are a writer.", name="writer"),
            "math": Agent(client, "You are a mathematician.", name="math"),
        },
        fallback=Agent(client, "Be helpful.", name="general"),
    )
from_client
classmethod
from_client(client: BaseModelClient, classifier_prompt: str, handlers: dict[str, Runner], *, fallback: Optional[Runner] = None, name: str = 'router') -> Router
Build a Router using client as the classifier with the given prompt.
run
run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None) -> Union[str, Iterator[StreamChunk]]
Classify the task and dispatch to the matched handler.
images are forwarded to the handler; the routing agent classifies on text only.
restore
Restore one sub-runner's state from a saved message list.
route=None (default, keyword-only) restores the routing classifier; a route key
restores that handler (raises KeyError listing the routes on a miss). Other
sub-runners start fresh on the next run(). See Agent.restore() for the pattern.
from_config
classmethod
from_config(routing_config: dict[str, Any], handler_configs: dict[str, dict[str, Any]], client: BaseModelClient, fallback_config: Optional[dict[str, Any]] = None) -> Router
Build a Router from config dicts and a single client.
aimu.agents.Parallel
dataclass
Parallel(workers: list[Runner], name: str = 'parallel', aggregator: Optional[Runner] = None, separator: str = '\n\n---\n\n', max_workers: Optional[int] = None)
Bases: Runner
Parallelization pattern: run workers concurrently, aggregate.
Each worker receives the same task. An optional aggregator receives all worker
outputs joined by separator and produces the final result. Without an
aggregator, the joined worker outputs are returned directly.
Workers run concurrently via ThreadPoolExecutor; results are collected in
submission order. Workers may be any Runner.
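The concurrency contract above (same task to every worker, results kept in submission order, outputs joined by separator) can be sketched with the standard library's ThreadPoolExecutor. Workers here are plain functions standing in for runners, and run_parallel is a hypothetical helper, not AIMU's Parallel.run; the first worker sleeps so that it finishes last, yet its output still comes first.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Optional

Worker = Callable[[str], str]  # stand-in for a runner's run(task) -> str


def run_parallel(workers: list[Worker], task: str,
                 aggregator: Optional[Worker] = None,
                 separator: str = "\n\n---\n\n",
                 max_workers: Optional[int] = None) -> str:
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(worker, task) for worker in workers]  # same task for all
        outputs = [future.result() for future in futures]            # submission order
    joined = separator.join(outputs)
    return aggregator(joined) if aggregator is not None else joined


def security(task: str) -> str:
    time.sleep(0.05)  # finishes last...
    return f"security: {task}"


def performance(task: str) -> str:
    return f"performance: {task}"


# ...but its output still comes first, followed by the separator and "performance: ...".
print(run_parallel([security, performance], "review this diff"))
```

Collecting results by iterating the futures list, rather than with as_completed, is what keeps submission order.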
Quick start::
    parallel = Parallel.from_client(
        client,
        worker_prompts=[
            "Analyze this from a security perspective.",
            "Analyze this from a performance perspective.",
            "Analyze this from a readability perspective.",
        ],
        aggregator_prompt="Synthesize the perspectives into one concise review.",
    )
from_client
classmethod
from_client(client: BaseModelClient, worker_prompts: list[str], *, aggregator_prompt: Optional[str] = None, separator: str = '\n\n---\n\n', name: str = 'parallel') -> Parallel
Build a Parallel using client for all workers (and aggregator).
restore
Restore one worker's state from a saved message list.
worker (keyword-only) selects which worker to restore by index (default 0), mirroring
the step argument of Chain.restore(). Other workers and the aggregator start fresh on
the next run(). Raises IndexError if worker is out of range. See
Agent.restore() for the full save/restore pattern.
run
run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None) -> Union[str, Iterator[StreamChunk]]
Run workers concurrently then aggregate.
images are forwarded to every worker; the aggregator runs on text only.
aimu.agents.EvaluatorOptimizer
dataclass
EvaluatorOptimizer(generator: Agent, evaluator: Agent, name: str = 'evaluator_optimizer', max_rounds: int = 3, pass_keyword: str = 'PASS', stop_when: Optional[Callable[[Any], bool]] = None, verdict_schema: Optional[type] = None, passed_attr: str = 'passed', feedback_attr: str = 'feedback')
Bases: Runner
Evaluator-Optimizer pattern: iteratively improve via critic feedback.
The generator produces an initial response. The evaluator reviews it against the
original task and decides whether it is accepted; if not, its feedback drives a revision.
The loop stops when the evaluator accepts or max_rounds is reached.
Acceptance is decided by one of three mechanisms, in priority order:
- stop_when: a predicate over the evaluator's output (the raw text, or the typed verdict when verdict_schema is set). The most flexible and robust option.
- verdict_schema: a dataclass / Pydantic model the evaluator must return (via structured output). Acceptance reads its passed (bool) attribute and revision uses its feedback (str) attribute (attribute names set by passed_attr and feedback_attr). Requires a structured-output-capable model; it raises on a malformed verdict rather than silently continuing.
- pass_keyword (default): accept when this substring appears in the evaluator's text.
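A sketch of the priority order, written as a standalone function. It is not AIMU's implementation: the accepted helper is hypothetical, raising TypeError for a malformed verdict is an assumption, and the third example shows why plain substring matching on PASS can accept a rejection.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


def accepted(evaluation: Any, *,
             stop_when: Optional[Callable[[Any], bool]] = None,
             verdict_schema: Optional[type] = None,
             pass_keyword: str = "PASS",
             passed_attr: str = "passed") -> bool:
    """Decide acceptance in the documented priority order (hypothetical sketch)."""
    if stop_when is not None:
        # 1. Predicate over raw text, or over the typed verdict if a schema is set.
        return bool(stop_when(evaluation))
    if verdict_schema is not None:
        # 2. Typed verdict: a malformed one raises instead of being skipped.
        if not isinstance(evaluation, verdict_schema):
            raise TypeError(f"expected {verdict_schema.__name__}, got {type(evaluation).__name__}")
        return bool(getattr(evaluation, passed_attr))
    # 3. Default: plain substring match on the evaluator's text.
    return pass_keyword in evaluation


@dataclass
class Verdict:
    passed: bool
    feedback: str = ""


print(accepted("PASS"))                                              # True
print(accepted("REVISE: add an example"))                            # False
print(accepted("REVISE: this does not PASS yet"))                    # True (false positive)
print(accepted(Verdict(False, "too long"), verdict_schema=Verdict))  # False
```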
Usage::
    from pydantic import BaseModel

    writer = Agent(client, "Write a clear, accurate explanation.", name="writer")
    critic = Agent(client, "Reply PASS or REVISE: <feedback>.", name="critic")

    eo = EvaluatorOptimizer(generator=writer, evaluator=critic, max_rounds=4)
    result = eo.run("Explain gradient descent.")

    # Typed verdict instead of substring matching:
    class Verdict(BaseModel):
        passed: bool
        feedback: str = ""

    eo = EvaluatorOptimizer(generator=writer, evaluator=critic, verdict_schema=Verdict)
restore
Restore the generator's state from a saved message list.
The evaluator starts fresh on the next round. See Agent.restore()
for the full save/restore pattern and system-message handling details.
run
run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None) -> Union[str, Iterator[StreamChunk]]
Run the generate-evaluate loop. Streaming yields only the final output.
aimu.agents.PlanExecuteEvaluator
dataclass
PlanExecuteEvaluator(planner: SkillAgent, executor: Agent, scorer: Scorer, criteria: Optional[str] = None, name: str = 'plan_execute_evaluator', max_rounds: int = 3, pass_threshold: float = 0.7, pass_keyword: Optional[str] = None)
Bases: Runner
Plan → execute → evaluate → replan-on-fail loop.
A planner agent produces a plan for the task; an executor agent runs the
plan with its tools; a pluggable Scorer judges the output. On
failure, the planner is invoked again with the prior round's feedback to
produce a new plan (not a revision). The loop runs for at most max_rounds rounds
and, on exhaustion, returns the highest-scoring attempt.
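The replan-and-keep-best loop can be sketched with three plain callables standing in for the planner, executor, and Scorer (all hypothetical here, as is the plan_execute_evaluate helper). Treating score >= pass_threshold as a pass is an assumption; the attempt dicts mirror some of the fields listed under last_attempts.

```python
from typing import Callable, Optional

PlanFn = Callable[[str, Optional[str]], str]  # (task, prior feedback) -> plan
ExecFn = Callable[[str], str]                 # plan -> output
ScoreFn = Callable[[str], tuple[float, str]]  # output -> (score, feedback)


def plan_execute_evaluate(task: str, plan: PlanFn, execute: ExecFn, score: ScoreFn,
                          max_rounds: int = 3,
                          pass_threshold: float = 0.7) -> tuple[str, list[dict]]:
    attempts: list[dict] = []
    feedback: Optional[str] = None
    for round_no in range(1, max_rounds + 1):
        plan_text = plan(task, feedback)  # a fresh plan each round, informed by feedback
        output = execute(plan_text)
        value, feedback = score(output)
        attempts.append({"round": round_no, "plan": plan_text, "output": output,
                         "score": value, "feedback": feedback})
        if value >= pass_threshold:  # assumed: meeting the threshold passes
            return output, attempts
    # Rounds exhausted: fall back to the highest-scoring attempt.
    best = max(attempts, key=lambda a: a["score"])
    return best["output"], attempts


scores = iter([0.4, 0.6, 0.5])
runs: list[str] = []


def execute(plan_text: str) -> str:
    runs.append(plan_text)
    return f"output {len(runs)}"


def make_plan(task: str, feedback: Optional[str]) -> str:
    return f"plan for {task!r} (feedback: {feedback})"


result, attempts = plan_execute_evaluate(
    "summarise", make_plan, execute, lambda out: (next(scores), "cite more sources"))
print(result)  # output 2 (best of 0.4, 0.6, 0.5)
```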
Two criteria modes:
- User-supplied (criteria="..." at construction): the criteria text is passed to the planner's prompt every round, and to the scorer as the row's reference field.
- Planner-invented (criteria=None): the planner is asked to emit "## Evaluation criteria" and "## Plan" sections, and the workflow parses both. The parsed criteria text is passed to the scorer as reference.
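For the planner-invented mode, splitting the planner's reply on level-2 headings is enough to recover both sections. This is a sketch of one way to do it with re.split; parse_sections is a hypothetical helper, and AIMU's own parser may handle edge cases differently.

```python
import re


def parse_sections(text: str) -> dict[str, str]:
    """Map each level-2 markdown heading (lower-cased) to the text under it."""
    # With a capturing group, re.split returns [preamble, heading, body, heading, body, ...].
    parts = re.split(r"^##[ \t]+(.+?)[ \t]*$", text, flags=re.MULTILINE)
    return {heading.strip().lower(): body.strip()
            for heading, body in zip(parts[1::2], parts[2::2])}


reply = """I'll approach this in two parts.
## Evaluation criteria
- Cites 3+ sources
- Under 200 words
## Plan
1. Search for recent articles
2. Summarise the key points
"""
sections = parse_sections(reply)
criteria = sections["evaluation criteria"]
plan = sections["plan"]
print(criteria)
print(plan)
```

Requiring whitespace after "##" keeps deeper headings such as "### Notes" from being treated as section breaks.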
Usage::
    wf = PlanExecuteEvaluator.from_client(
        client=aimu.client("anthropic:claude-sonnet-4-6"),
        judge_client=aimu.client("openai:gpt-4o"),
        executor_tools=builtin.web + builtin.fs,
        criteria="The summary cites 3+ sources and is under 200 words.",
        max_rounds=3,
    )
    result = wf.run("Summarise the latest news on quantum computing.")
last_attempts
property
Per-round attempts from the most recent run().
Each entry has round, plan, criteria, output, score,
feedback. Useful for introspection or post-hoc analysis.
run
run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None) -> Union[str, Iterator[StreamChunk]]
Run the plan-execute-evaluate loop. images are forwarded to the executor each round.
from_client
classmethod
from_client(client: BaseModelClient, *, judge_client: Optional[BaseModelClient] = None, criteria: Optional[str] = None, executor_tools: Optional[list[Callable]] = None, skill_manager: Optional[SkillManager] = None, planner_system_message: Optional[str] = None, executor_system_message: Optional[str] = None, max_rounds: int = 3, pass_threshold: float = 0.7, pass_keyword: Optional[str] = None, name: str = 'plan_execute_evaluator') -> PlanExecuteEvaluator
Build a PlanExecuteEvaluator from a single client and the common knobs.
- planner is a SkillAgent over client; pass skill_manager= to use task-type planning skills from disk.
- executor is an Agent over client with the given executor_tools.
- scorer is an LLMJudgeScorer over judge_client (defaults to client if not provided; a stronger, separate model is recommended). Its criteria is the workflow's criteria if supplied, otherwise a generic fallback.
For full control (e.g. a custom Scorer), construct the
dataclass directly.
A2A interop
Optional Agent2Agent protocol support (requires the a2a extra: pip install 'aimu[a2a]').
Available only when aimu.agents.HAS_A2A is True. See A2A vs MCP
and Connect agents (A2A).
aimu.agents.RemoteAgent
Bases: Runner
A remote A2A agent presented as a local synchronous Runner.
Construct via connect() (it resolves the remote agent card first)::
    remote = RemoteAgent.connect("http://localhost:9000")
    print(remote.run("Summarise the news"))
    # Composes like any other runner:
    chain = Chain(agents=[local_agent, remote])
    local = Agent(client, tools=[remote.as_tool()])
Holds a live async connection (an httpx.AsyncClient driven through an anyio
portal). Keep the instance alive for as long as you need the connection; call close()
(or let the instance be garbage-collected) to tear the portal down.
connect
classmethod
connect(url: str, *, name: Optional[str] = None, agent_card_path: str = DEFAULT_AGENT_CARD_PATH, timeout: float = 60.0) -> 'RemoteAgent'
Resolve the remote agent card at url and return a connected RemoteAgent.
name defaults to the remote agent card's name. Connection failures raise
A2AConnectionError with the original exception chained.
run
run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None) -> Union[str, Iterator[StreamChunk]]
Send task to the remote agent and return its text response.
stream=True yields the full response as a single GENERATING chunk followed
by DONE; incremental token streaming over the portal is not supported on the
sync surface (use aimu.aio.a2a.RemoteAgent for real streaming).
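What a stream=True caller receives is therefore a two-item iterator. In this sketch, Chunk is a hypothetical stand-in for StreamChunk (its phase and content field names are assumptions) and pseudo_stream is a hypothetical helper; consuming it the usual way still yields the full text.

```python
from dataclasses import dataclass
from typing import Iterator


@dataclass
class Chunk:
    """Hypothetical stand-in for StreamChunk; field names are assumptions."""
    phase: str
    content: str = ""


def pseudo_stream(full_text: str) -> Iterator[Chunk]:
    # The whole response arrives at once, so it is yielded as one GENERATING chunk.
    yield Chunk("GENERATING", full_text)
    yield Chunk("DONE")


chunks = list(pseudo_stream("Here is the summary."))
text = "".join(c.content for c in chunks if c.phase == "GENERATING")
print(len(chunks), text)  # 2 Here is the summary.
```

Code written against incremental streams keeps working; it simply sees a single large chunk instead of many small ones.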
aimu.agents.serve_a2a
serve_a2a(runner: Runner, *, host: str = '127.0.0.1', port: int = 9000, url: Optional[str] = None, name: Optional[str] = None, description: Optional[str] = None, skills: Optional[list[AgentSkill]] = None, **uvicorn_kwargs: Any) -> None
Serve runner as an A2A agent over HTTP (blocking).
url is the externally reachable base URL advertised in the agent card; it defaults
to http://{host}:{port}/. Extra kwargs pass through to uvicorn.run.
aimu.agents.build_a2a_app
build_a2a_app(runner: Runner, *, url: str, name: Optional[str] = None, description: Optional[str] = None, skills: Optional[list[AgentSkill]] = None, agent_card_path: str = DEFAULT_AGENT_CARD_PATH)
Build the Starlette ASGI app that serves runner over A2A (does not run it).
aimu.agents.A2AConnectionError
Bases: RuntimeError
Raised when a RemoteAgent fails to connect to or call a remote A2A agent.