aimu.aio
Async surface. Mirrors the sync API one-for-one: same class names, different namespace. See "how-to: use async" for usage patterns and "explanation: async design" for why the surface is shaped this way.
Differences from the sync surface:
- Every run(), chat(), generate() is async def.
- Streaming returns AsyncIterator[StreamChunk] (consume with async for).
- Parallel and concurrent_tool_calls=True use asyncio.TaskGroup instead of ThreadPoolExecutor.
- In-process providers (AsyncHuggingFaceClient, AsyncLlamaCppClient) wrap an existing sync client; calling aio.client(HuggingFaceModel.X) directly raises.
Top-level
aimu.aio.chat
async
chat(user_message: str, *, model: Union[str, Model, None] = None, system: Optional[str] = None, generate_kwargs: Optional[dict] = None, stream: bool = False, images: Optional[list] = None, include: Optional[Iterable[Union[str, StreamingContentType]]] = None) -> Union[str, AsyncIterator[StreamChunk]]
One-shot async chat: builds a fresh client, sends one message, returns the response.
Example::
    text = await aio.chat("Summarize this", model="anthropic:claude-sonnet-4-6")

    async for chunk in await aio.chat("Tell me a story", model="ollama:qwen3.5:9b", stream=True):
        if chunk.is_text():
            print(chunk.content, end="")
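The `async for chunk in await ...` shape follows from chat() being a coroutine whose result, when stream=True, is itself an async iterator. A minimal stdlib-only sketch of that shape, where Chunk and fake_chat are hypothetical stand-ins and not aimu APIs:

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Union


@dataclass
class Chunk:
    """Hypothetical stand-in for StreamChunk: a kind tag plus a payload."""
    kind: str
    content: str

    def is_text(self) -> bool:
        return self.kind == "text"


async def fake_chat(message: str, *, stream: bool = False) -> Union[str, AsyncIterator[Chunk]]:
    """Stand-in chat(): returns a string, or an async iterator when stream=True."""
    words = message.upper().split()

    async def chunks() -> AsyncIterator[Chunk]:
        yield Chunk("thinking", "(planning)")
        for word in words:
            await asyncio.sleep(0)  # yield control, as a real network read would
            yield Chunk("text", word + " ")

    if stream:
        return chunks()  # the caller awaits the coroutine, then iterates the result
    return " ".join(words)


async def collect_text(message: str) -> str:
    parts = []
    # `await` resolves the coroutine to an async iterator; `async for` consumes it.
    async for chunk in await fake_chat(message, stream=True):
        if chunk.is_text():  # skip non-text chunks such as "thinking"
            parts.append(chunk.content)
    return "".join(parts).strip()


streamed = asyncio.run(collect_text("hello async world"))
plain = asyncio.run(fake_chat("hello async world"))
```

Both calls produce the same text here; only the delivery differs.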
aimu.aio.client
client(model: Union[str, Model, Any, None] = None, *, system: Optional[str] = None, **kwargs: Any) -> AsyncModelClient
Construct an :class:AsyncModelClient from a model string, enum, or existing sync client.
For in-process providers (HuggingFace, LlamaCpp), pass an existing sync client to avoid loading model weights twice::
    sync_client = aimu.client(HuggingFaceModel.LLAMA_70B)
    async_client = aio.client(sync_client)
When model is omitted, a default is resolved from AIMU_LANGUAGE_MODEL or an
already-available local model. The async path probes only Ollama and local
OpenAI-compatible servers (an hf: default would need an explicit sync-client wrap).
aimu.aio.AsyncModelClient
Bases: AsyncBaseModelClient
Public factory for async provider-backed model clients.
Accepts a provider Model enum member, a "provider:model_id" string, or,
for in-process providers, an existing sync client to wrap.
Examples::
    # Cloud providers (separate sync/async clients are cheap)
    client = AsyncModelClient("anthropic:claude-sonnet-4-6")
    client = AsyncModelClient(OllamaModel.QWEN_3_8B)

    # In-process providers: wrap an existing sync client to share weights
    sync_client = aimu.client(HuggingFaceModel.LLAMA_70B)
    async_client = AsyncModelClient(sync_client)
last_usage
property
writable
Token usage of the most recent non-streaming response, or None.
Hierarchy
aimu.aio.AsyncRunner
Bases: ABC
Abstract base for every concrete async agent and workflow.
messages
abstractmethod
property
Message histories of all sub-runners, keyed by runner name.
run
abstractmethod
async
run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None) -> Union[str, AsyncIterator[StreamChunk]]
Run to completion and return the final text (stream=False), or return an async iterator of chunks (stream=True).
as_tool
Wrap this async runner as an async @tool-style callable: await tool(task).
Async mirror of :meth:aimu.agents.base.Runner.as_tool. The returned callable is an
async def delegating to await self.run(task), so the @tool decorator marks
it __tool_is_async__ = True and the async agent loop awaits it directly.
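The wrapping described above can be sketched without the library. EchoRunner and dispatch are hypothetical stand-ins, and the marker attribute is set by hand here, whereas in aimu it is the @tool decorator that sets it:

```python
import asyncio
from typing import Awaitable, Callable


class EchoRunner:
    """Hypothetical stand-in for an AsyncRunner; only run() matters here."""

    def __init__(self, name: str) -> None:
        self.name = name

    async def run(self, task: str) -> str:
        await asyncio.sleep(0)
        return f"{self.name} handled: {task}"

    def as_tool(self) -> Callable[[str], Awaitable[str]]:
        async def runner_tool(task: str) -> str:
            # Delegate straight to the runner; the caller awaits the result.
            return await self.run(task)

        runner_tool.__name__ = self.name
        # Set manually in this sketch; assumed to come from @tool in the real library.
        runner_tool.__tool_is_async__ = True
        return runner_tool


async def dispatch(tool: Callable, task: str) -> str:
    """Simplified dispatcher: await async tools, call sync tools in place."""
    if getattr(tool, "__tool_is_async__", False):
        return await tool(task)
    return tool(task)


tool = EchoRunner("researcher").as_tool()
result = asyncio.run(dispatch(tool, "find sources"))
```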
Agents
aimu.aio.Agent
dataclass
Agent(model_client: AsyncBaseModelClient, system_message: Optional[str] = None, name: Optional[str] = None, tools: list[Callable] = list(), max_iterations: int = 10, continuation_prompt: str = DEFAULT_CONTINUATION_PROMPT, reset_messages_on_run: bool = False, final_answer_prompt: Optional[str] = None, deps: Optional[Any] = None)
Bases: _AgentLoopMixin, AsyncRunner
Async equivalent of :class:aimu.agents.Agent.
Calls await model_client.chat() repeatedly until the model produces a turn
without invoking tools, or max_iterations is reached.
Quick start::
    from aimu.tools import tool
    from aimu import aio

    @tool
    async def fetch(url: str) -> str:
        """Fetch the contents of a URL."""
        import httpx
        async with httpx.AsyncClient() as c:
            return (await c.get(url)).text[:500]

    client = aio.client("anthropic:claude-sonnet-4-6")
    agent = aio.Agent(client, "You are a helpful assistant.", tools=[fetch])
    print(await agent.run("Fetch example.com"))
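The stopping rule stated for the agent (repeat until a turn has no tool calls, or max_iterations is reached) can be illustrated with a scripted stand-in client. This is a simplified sketch, not the library's loop: Turn, ScriptedClient, and agent_loop are hypothetical, and details such as continuation_prompt and final_answer_prompt are left out.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class Turn:
    """Hypothetical model turn: text plus requested tool calls as (name, argument)."""
    text: str
    tool_calls: list = field(default_factory=list)


class ScriptedClient:
    """Stand-in model client that replays a fixed script of turns."""

    def __init__(self, script: list) -> None:
        self._script = list(script)
        self.calls = 0

    async def chat(self, message: str) -> Turn:
        self.calls += 1
        return self._script.pop(0) if self._script else Turn("done")


async def agent_loop(client, task: str, tools: dict, max_iterations: int = 10) -> str:
    message = task
    turn = Turn("")
    for _ in range(max_iterations):
        turn = await client.chat(message)
        if not turn.tool_calls:
            break  # a turn without tool calls ends the loop
        results = [await tools[name](arg) for name, arg in turn.tool_calls]
        message = "\n".join(results)  # feed tool output back as the next message
    return turn.text


async def lookup(term: str) -> str:
    return f"definition of {term}"


script = [Turn("", [("lookup", "asyncio")]), Turn("asyncio is a concurrency library")]
client = ScriptedClient(script)
answer = asyncio.run(agent_loop(client, "What is asyncio?", {"lookup": lookup}))

# With max_iterations=1 the loop stops after the first turn, although it asked for a tool.
capped = ScriptedClient(script)
capped_answer = asyncio.run(
    agent_loop(capped, "What is asyncio?", {"lookup": lookup}, max_iterations=1)
)
```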
run
async
run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None, tools: Optional[list[Callable]] = None, deps: Optional[Any] = None, schema: Optional[type] = None) -> Union[str, Any, AsyncIterator[StreamChunk]]
Run the async agentic loop. images attach only to the initial turn.
tools is a per-run override of the agent's configured self.tools; deps is a
per-run override of the agent's self.deps (injected as ctx.deps into tools that
declare a :class:~aimu.tools.ToolContext parameter); schema makes the run a single
structured-output turn returning a validated instance. See the sync
:meth:aimu.agents.Agent.run for full semantics.
as_model_client
Return an :class:AsyncBaseModelClient view of this agent.
Each await client.chat() runs the full agent loop.
aimu.aio.SkillAgent
dataclass
SkillAgent(model_client: AsyncBaseModelClient, system_message: Optional[str] = None, name: Optional[str] = None, tools: list[Callable] = list(), max_iterations: int = 10, continuation_prompt: str = DEFAULT_CONTINUATION_PROMPT, reset_messages_on_run: bool = False, final_answer_prompt: Optional[str] = None, deps: Optional[Any] = None, skill_manager: SkillManager = SkillManager())
Bases: Agent
Async :class:Agent with filesystem-discovered skill injection.
On first run (or after a message reset) the SkillAgent appends the skill catalog
to its system message and adds the async skills server's tools (via
aio.MCPClient.as_tools()) to model_client.tools so the model can call
activate_skill to load full skill instructions on demand.
aimu.aio.OrchestratorAgent
Bases: AsyncRunner, ABC
Async base for the orchestrator + worker-tools pattern.
Subclasses define worker :class:Agent instances and @tool-decorated dispatch
functions in __init__, then call :meth:_init_orchestrator to wire everything up.
Worker dispatch functions should be async def so the orchestrator's
concurrent_tool_calls=True actually overlaps work.
For the simple case of dispatching to a fixed list of workers, use
:meth:assemble to skip subclassing entirely.
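Why worker dispatch functions need to yield to the event loop can be shown with plain asyncio. The sketch below uses asyncio.gather rather than the orchestrator itself, and both workers are hypothetical: one awaits, the other blocks with time.sleep.

```python
import asyncio
import time


async def async_worker(task: str) -> str:
    await asyncio.sleep(0.1)  # awaiting hands control back to the event loop
    return f"async:{task}"


async def blocking_worker(task: str) -> str:
    time.sleep(0.1)  # blocks the event loop: nothing else can run meanwhile
    return f"blocking:{task}"


async def timed(worker, tasks):
    """Run one worker per task concurrently and measure the wall-clock time."""
    start = time.perf_counter()
    results = await asyncio.gather(*(worker(t) for t in tasks))
    return results, time.perf_counter() - start


async def main():
    overlapped = await timed(async_worker, ["a", "b", "c"])
    serialized = await timed(blocking_worker, ["a", "b", "c"])
    return overlapped, serialized


(overlap_results, overlap_seconds), (serial_results, serial_seconds) = asyncio.run(main())
```

The awaiting workers finish in roughly the time of one call, while the blocking ones take roughly the sum of all three.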
assemble
classmethod
assemble(model_client: AsyncBaseModelClient, system_message: str, *, workers: list[AsyncRunner], name: str = 'orchestrator', concurrent_tool_calls: bool = True, final_answer_prompt: Optional[str] = None) -> 'OrchestratorAgent'
Build a ready-to-run async orchestrator from a list of worker runners.
Each worker becomes an async callable tool via :meth:AsyncRunner.as_tool. Workers
may be any :class:AsyncRunner (an async Agent, a workflow, or a remote A2A
agent), not just Agent instances.
restore
Restore the inner orchestrator agent's state from a saved message list.
Workers are invoked as tools, so their own state is not part of the orchestrator's
history; restore a worker directly if it needs resuming. See
:meth:aimu.aio.Agent.restore for the full save/restore pattern.
Workflows
aimu.aio.Chain
dataclass
Bases: AsyncRunner
Async prompt-chaining: each step's output feeds the next step's input.
Steps run sequentially (the pattern requires it). Each step may be an
:class:aimu.aio.Agent or a nested async workflow.
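The data flow of prompt chaining is small enough to sketch directly. run_chain and the three step functions are hypothetical stand-ins for agents; the point is only that each step awaits the previous step's output.

```python
import asyncio
from typing import Awaitable, Callable

Step = Callable[[str], Awaitable[str]]


async def run_chain(steps: list[Step], task: str) -> str:
    """Run steps strictly in order; each receives the previous step's output."""
    current = task
    for step in steps:
        current = await step(current)
    return current


async def outline(text: str) -> str:
    return f"outline({text})"


async def draft(text: str) -> str:
    return f"draft({text})"


async def polish(text: str) -> str:
    return f"polish({text})"


chained = asyncio.run(run_chain([outline, draft, polish], "topic"))
```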
from_client
classmethod
Build a Chain from a single client and a list of step system_messages.
restore
Restore a chain step's state from a saved message list.
step (keyword-only) selects which step's agent to restore (default 0); subsequent
steps start fresh on the next run(). Raises IndexError if step is out of
range. See :meth:aimu.aio.Agent.restore for the pattern.
aimu.aio.Router
dataclass
Router(routing_agent: Agent, handlers: dict[str, AsyncRunner], name: str = 'router', fallback: Optional[AsyncRunner] = None)
Bases: AsyncRunner
Async routing: classify the task, dispatch to a specialist handler.
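A minimal sketch of classify-then-dispatch, using a keyword check in place of the routing agent. The names classify and route are hypothetical, and raising KeyError when no handler or fallback matches is an assumption of this sketch, not documented Router behavior.

```python
import asyncio
from typing import Awaitable, Callable, Optional

Handler = Callable[[str], Awaitable[str]]


async def classify(task: str) -> str:
    """Stand-in for the routing agent: a keyword check instead of a model call."""
    return "math" if any(ch.isdigit() for ch in task) else "prose"


async def math_handler(task: str) -> str:
    return f"math:{task}"


async def general_handler(task: str) -> str:
    return f"general:{task}"


async def route(task: str, handlers: dict[str, Handler], fallback: Optional[Handler] = None) -> str:
    label = (await classify(task)).strip().lower()
    handler = handlers.get(label, fallback)  # unknown labels go to the fallback
    if handler is None:
        raise KeyError(f"no handler for route {label!r}; known routes: {sorted(handlers)}")
    return await handler(task)


handlers = {"math": math_handler}
routed = asyncio.run(route("add 2 and 3", handlers, fallback=general_handler))
fell_back = asyncio.run(route("write a haiku", handlers, fallback=general_handler))
```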
restore
Restore one sub-runner's state from a saved message list.
route=None (default, keyword-only) restores the routing classifier; a route key
restores that handler (raises KeyError listing the routes on a miss). Other
sub-runners start fresh on the next run(). See :meth:aimu.aio.Agent.restore.
aimu.aio.Parallel
dataclass
Parallel(workers: list, name: str = 'parallel', aggregator: Optional[AsyncRunner] = None, separator: str = '\n\n---\n\n')
Bases: AsyncRunner
Async parallelization: run workers concurrently via asyncio.TaskGroup, then aggregate their outputs.
Each worker receives the same task. An optional aggregator receives all worker
outputs joined by separator. Without an aggregator, the joined output is returned.
Structured concurrency: if one worker raises, in-flight siblings are cancelled
and an ExceptionGroup surfaces with all errors.
restore
Restore one worker's state from a saved message list.
worker (keyword-only) selects which worker by index (default 0). Other workers and
the aggregator start fresh on the next run(). Raises IndexError if worker
is out of range. See :meth:aimu.aio.Agent.restore.
aimu.aio.EvaluatorOptimizer
dataclass
EvaluatorOptimizer(generator: Agent, evaluator: Agent, name: str = 'evaluator_optimizer', max_rounds: int = 3, pass_keyword: str = 'PASS')
Bases: AsyncRunner
Async generate-evaluate-revise loop, with semantics identical to the sync version.
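A generate-evaluate-revise loop bounded by max_rounds and ended by pass_keyword might look like the sketch below. DraftWriter, LengthCritic, and evaluate_optimize are hypothetical, and the way feedback is folded into the next prompt is an assumption, not the library's prompt format.

```python
import asyncio


class DraftWriter:
    """Stand-in generator: each revision appends one more detail."""

    def __init__(self) -> None:
        self.revisions = 0

    async def run(self, prompt: str) -> str:
        self.revisions += 1
        return "draft" + " +detail" * self.revisions


class LengthCritic:
    """Stand-in evaluator: replies PASS once the draft has enough details."""

    def __init__(self, required_details: int) -> None:
        self.required = required_details

    async def run(self, draft: str) -> str:
        if draft.count("+detail") >= self.required:
            return "PASS"
        return "Needs more detail."


async def evaluate_optimize(generator, evaluator, task, max_rounds=3, pass_keyword="PASS"):
    prompt = task
    draft = ""
    for round_number in range(1, max_rounds + 1):
        draft = await generator.run(prompt)
        verdict = await evaluator.run(draft)
        if pass_keyword in verdict:
            return draft, round_number
        # Fold the critique into the next generation prompt (assumed format).
        prompt = f"{task}\n\nPrevious draft:\n{draft}\n\nFeedback:\n{verdict}"
    return draft, max_rounds  # out of rounds: return the last draft as-is


accepted, rounds_used = asyncio.run(
    evaluate_optimize(DraftWriter(), LengthCritic(2), "Describe TaskGroup")
)
capped, capped_rounds = asyncio.run(
    evaluate_optimize(DraftWriter(), LengthCritic(9), "Describe TaskGroup")
)
```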
restore
Restore the generator's state from a saved message list.
The evaluator starts fresh on the next round. See :meth:aimu.aio.Agent.restore.
aimu.aio.PlanExecuteEvaluator
dataclass
PlanExecuteEvaluator(planner: SkillAgent, executor: Agent, scorer: Scorer, criteria: Optional[str] = None, name: str = 'plan_execute_evaluator', max_rounds: int = 3, pass_threshold: float = 0.7, pass_keyword: Optional[str] = None)
Bases: AsyncRunner
Async plan → execute → evaluate → replan-on-fail loop.
Same semantics as :class:aimu.agents.PlanExecuteEvaluator but with
async def run() and awaited delegation to planner/executor.
The scorer's score() is sync (it's a CPU/judge-call concern, not an
AIMU-async concern). If you wire an LLM judge, that judge call blocks the
event loop unless wrapped; use asyncio.to_thread from your scorer's
score() if needed.
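asyncio.to_thread runs a sync callable in a worker thread and is awaited from async code. The sketch below shows the event loop staying responsive while a blocking score runs. SlowScorer and its score(output) signature are hypothetical, and the sketch does not show where aimu itself calls score().

```python
import asyncio
import time


class SlowScorer:
    """Hypothetical sync scorer; time.sleep stands in for a blocking judge call."""

    def score(self, output: str) -> float:
        time.sleep(0.2)
        return 1.0 if "done" in output else 0.0


async def heartbeat(ticks: list, stop: asyncio.Event) -> None:
    """Record event-loop turns; this only advances while the loop is not blocked."""
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def score_without_blocking(scorer: SlowScorer, output: str):
    ticks = []
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(ticks, stop))
    # Run the sync call in a worker thread so the loop keeps serving other tasks.
    value = await asyncio.to_thread(scorer.score, output)
    stop.set()
    await beat
    return value, len(ticks)


score, ticks_during_scoring = asyncio.run(score_without_blocking(SlowScorer(), "task done"))
```

Calling scorer.score(output) directly inside the coroutine would instead freeze the heartbeat for the full duration of the call.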
from_client
classmethod
from_client(client: AsyncBaseModelClient, *, judge_client: Optional[Any] = None, criteria: Optional[str] = None, executor_tools: Optional[list[Callable]] = None, skill_manager: Optional[SkillManager] = None, planner_system_message: Optional[str] = None, executor_system_message: Optional[str] = None, max_rounds: int = 3, pass_threshold: float = 0.7, pass_keyword: Optional[str] = None, name: str = 'plan_execute_evaluator') -> PlanExecuteEvaluator
Build a PlanExecuteEvaluator from a single async client.
The judge_client for the scorer may be sync or async (the scorer is
sync; if it's an :class:LLMJudgeScorer it expects a sync client).
Tools
aimu.aio.MCPClient
MCPClient(*, config: Optional[dict] = None, server: Optional[FastMCP] = None, file: Optional[str] = None)
Async wrapper around a FastMCP Client.
Use the connect() classmethod factory to construct and connect in one await::
    mcp = await MCPClient.connect(server=my_fastmcp_server)
    try:
        tools = await mcp.get_tools()
        result = await mcp.call_tool("foo", {"x": 1})
    finally:
        await mcp.aclose()
connect
async
classmethod
connect(*, config: Optional[dict] = None, server: Optional[FastMCP] = None, file: Optional[str] = None) -> MCPClient
Construct and connect in one await. Returns the live instance.
as_tools
async
Return this server's tools as async @tool-style callables.
Async mirror of :meth:aimu.tools.MCPClient.as_tools. Each callable is an
async def that awaits :meth:call_tool and returns the result's text content;
it carries __tool_spec__, __tool_is_async__ = True, and
__tool_is_streaming__ = False, so it drops into client.tools /
aio.Agent(tools=...) and the async dispatcher awaits it directly::
    mcp = await aio.MCPClient.connect(server=my_server)
    agent = aio.Agent(client, tools=await mcp.as_tools())
The list is a snapshot (one list_tools() round-trip); call again to refresh.
A2A interop
Async twin of aimu.agents.a2a (requires the a2a extra). aimu.aio.a2a.RemoteAgent uses the
a2a-sdk async client natively (no anyio portal) and supports incremental message/stream streaming.
aimu.aio.RemoteAgent
Bases: AsyncRunner
A remote A2A agent presented as a local asynchronous AsyncRunner.
Construct via the async :meth:connect::
    remote = await RemoteAgent.connect("http://localhost:9000")
    print(await remote.run("Summarise the news"))
    async for chunk in await remote.run("Summarise", stream=True):
        ...
connect
async
classmethod
connect(url: str, *, name: Optional[str] = None, agent_card_path: str = DEFAULT_AGENT_CARD_PATH, timeout: float = 60.0) -> 'RemoteAgent'
Resolve the remote agent card at url and return a connected RemoteAgent.
run
async
run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None) -> Union[str, AsyncIterator[StreamChunk]]
Send task to the remote agent; return its text (or a chunk stream).
aimu.aio.serve_a2a
serve_a2a(runner: AsyncRunner, *, host: str = '127.0.0.1', port: int = 9000, url: Optional[str] = None, name: Optional[str] = None, description: Optional[str] = None, skills: Optional[list[AgentSkill]] = None, **uvicorn_kwargs: Any) -> None
Serve an async runner as an A2A agent over HTTP (blocking).
aimu.aio.build_a2a_app
build_a2a_app(runner: AsyncRunner, *, url: str, name: Optional[str] = None, description: Optional[str] = None, skills: Optional[list[AgentSkill]] = None, agent_card_path: str = DEFAULT_AGENT_CARD_PATH)
Build the Starlette ASGI app that serves an async runner over A2A (does not run it).