
aimu.aio

Async surface. Mirrors the sync API one-for-one: same class names, different namespace. See the how-to guide "Use async" for usage patterns and the explanation page "Async design" for why the surface is shaped this way.

Differences from the sync surface:

  • Every run(), chat(), and generate() method is an async def.
  • Streaming returns AsyncIterator[StreamChunk] (consume with async for).
  • Parallel and concurrent_tool_calls=True use asyncio.TaskGroup instead of ThreadPoolExecutor.
  • In-process providers (AsyncHuggingFaceClient, AsyncLlamaCppClient) wrap an existing sync client; calling aio.client(HuggingFaceModel.X) directly raises.

Top-level

aimu.aio.chat async

chat(user_message: str, *, model: Union[str, Model, None] = None, system: Optional[str] = None, generate_kwargs: Optional[dict] = None, stream: bool = False, images: Optional[list] = None, include: Optional[Iterable[Union[str, StreamingContentType]]] = None) -> Union[str, AsyncIterator[StreamChunk]]

One-shot async chat: builds a fresh client, sends one message, returns the response.

Example::

from aimu import aio

text = await aio.chat("Summarize this", model="anthropic:claude-sonnet-4-6")

async for chunk in await aio.chat("Tell me a story", model="ollama:qwen3.5:9b", stream=True):
    if chunk.is_text():
        print(chunk.content, end="")
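The `await` before `async for` in the streaming example can look odd. chat() is an async def, so calling it produces a coroutine, and awaiting that coroutine yields either the full string or an async iterator. The stdlib-only sketch below mimics that shape; fake_chat and the word-by-word "stream" are hypothetical stand-ins, not AIMU's implementation.

```python
import asyncio
from collections.abc import AsyncIterator
from typing import Union


async def _stream_words(text: str) -> AsyncIterator[str]:
    # Hypothetical stand-in for a provider stream: one "chunk" per word.
    for word in text.split():
        await asyncio.sleep(0)  # yield control, as a real network read would
        yield word + " "


async def fake_chat(message: str, stream: bool = False) -> Union[str, AsyncIterator[str]]:
    # Mirrors the documented return type: awaiting the call gives a str,
    # or (with stream=True) an async iterator consumed with `async for`.
    reply = f"echo: {message}"
    if stream:
        return _stream_words(reply)
    return reply


async def demo() -> tuple[str, str]:
    text = await fake_chat("hello world")
    pieces = []
    async for chunk in await fake_chat("hello world", stream=True):
        pieces.append(chunk)
    return text, "".join(pieces)
```

Run it with asyncio.run(demo()).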

aimu.aio.client

client(model: Union[str, Model, Any, None] = None, *, system: Optional[str] = None, **kwargs: Any) -> AsyncModelClient

Construct an AsyncModelClient from a model string, enum, or existing sync client.

For in-process providers (HuggingFace, LlamaCpp), pass an existing sync client to avoid loading model weights twice::

sync_client = aimu.client(HuggingFaceModel.LLAMA_70B)
async_client = aio.client(sync_client)

When model is omitted, a default is resolved from AIMU_LANGUAGE_MODEL or from an already-available local model. The async path probes only Ollama and local OpenAI-compatible servers; an hf: default would have to be wrapped explicitly in a sync client.
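The resolution order can be sketched as a small function. This is a stdlib-only illustration, assuming AIMU_LANGUAGE_MODEL is read from an environment-style mapping and checked before any local server is probed; the probe callables and the "local"/"ollama" prefixes in the demo are hypothetical, not AIMU's probing code.

```python
from collections.abc import Callable, Mapping
from typing import Optional

# A probe returns a model id if its server is reachable, else None (hypothetical).
Probe = Callable[[], Optional[str]]


def resolve_default_model(env: Mapping[str, str], probes: list[tuple[str, Probe]]) -> str:
    # 1. An explicit setting wins (assumed precedence).
    explicit = env.get("AIMU_LANGUAGE_MODEL")
    if explicit:
        return explicit
    # 2. Otherwise take the first local server that reports an available model.
    for provider, probe in probes:
        model_id = probe()
        if model_id is not None:
            return f"{provider}:{model_id}"
    raise LookupError("no model given and no local model available")


def ollama_probe() -> Optional[str]:
    return "qwen3.5:9b"  # pretend an Ollama server is running with this model


def offline_probe() -> Optional[str]:
    return None  # pretend the OpenAI-compatible server is not running
```

In real code the mapping would typically be os.environ.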

aimu.aio.AsyncModelClient

AsyncModelClient(model: Union[Model, ModelSpec, str, Any], **kwargs: Any)

Bases: AsyncBaseModelClient

Public factory for async provider-backed model clients.

Accepts a provider Model enum member, a "provider:model_id" string, or, for in-process providers, an existing sync client to wrap.

Examples::

# Cloud providers (separate sync/async clients are cheap)
client = AsyncModelClient("anthropic:claude-sonnet-4-6")
client = AsyncModelClient(OllamaModel.QWEN_3_8B)

# In-process providers: wrap an existing sync client to share weights
sync_client = aimu.client(HuggingFaceModel.LLAMA_70B)
async_client = AsyncModelClient(sync_client)
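One way an async view can share a sync client's weights is to keep a reference to the sync client and push each blocking call onto a worker thread. The sketch below assumes that approach (asyncio.to_thread plus a lock, on the assumption that one in-process model is not safe to call from several threads at once); ThreadedAsyncView and EchoSyncClient are hypothetical, not AIMU classes.

```python
import asyncio
import threading
from typing import Any


class ThreadedAsyncView:
    """Hypothetical async view over a sync client; not AIMU's AsyncModelClient.

    The wrapped sync client (and any weights it has loaded) is reused, never
    reloaded. Each blocking chat() call runs in a worker thread via
    asyncio.to_thread so the event loop keeps serving other tasks.
    """

    def __init__(self, sync_client: Any) -> None:
        self._sync = sync_client
        # Serialize calls: a single in-process model is assumed not to be thread-safe.
        self._lock = asyncio.Lock()

    async def chat(self, message: str, **kwargs: Any) -> str:
        async with self._lock:
            return await asyncio.to_thread(self._sync.chat, message, **kwargs)


class EchoSyncClient:
    """Stand-in sync client that records which thread served each call."""

    def __init__(self) -> None:
        self.threads: list[threading.Thread] = []

    def chat(self, message: str, **kwargs: Any) -> str:
        self.threads.append(threading.current_thread())
        return f"sync reply to {message!r}"
```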

last_usage property writable

last_usage: Optional[dict]

Token usage of the most recent non-streaming response, or None.

Hierarchy

aimu.aio.AsyncRunner

Bases: ABC

Abstract base for every concrete async agent and workflow.

messages abstractmethod property

messages: MessageHistory

Message histories of all sub-runners, keyed by runner name.

run abstractmethod async

run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None) -> Union[str, AsyncIterator[StreamChunk]]

Run the task and return the final text (stream=False) or an async iterator of StreamChunk objects (stream=True).

as_tool

as_tool(*, name: Optional[str] = None, description: Optional[str] = None) -> Callable

Wrap this async runner as an async @tool-style callable: await tool(task).

Async mirror of aimu.agents.base.Runner.as_tool. The returned callable is an async def that delegates to await self.run(task), so the @tool decorator marks it __tool_is_async__ = True and the async agent loop awaits it directly.
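A minimal sketch of that delegation, using a toy EchoRunner in place of a real AsyncRunner. It shows why the wrapper counts as async: the closure returned by as_tool is itself an async def, which inspect.iscoroutinefunction recognizes. Whether AIMU's @tool uses exactly this check is an assumption; is_async_tool is hypothetical.

```python
import asyncio
import inspect
from collections.abc import Awaitable, Callable
from typing import Optional


class EchoRunner:
    """Hypothetical stand-in for an AsyncRunner; only run() and as_tool() are modelled."""

    name = "echo"

    async def run(self, task: str) -> str:
        await asyncio.sleep(0)  # a real runner would await a model call here
        return f"done: {task}"

    def as_tool(
        self, *, name: Optional[str] = None, description: Optional[str] = None
    ) -> Callable[[str], Awaitable[str]]:
        async def tool(task: str) -> str:
            # Delegate to the runner; the agent loop awaits this coroutine.
            return await self.run(task)

        tool.__name__ = name or self.name
        tool.__doc__ = description or f"Send a task to the {self.name} runner."
        return tool


def is_async_tool(fn: Callable[..., object]) -> bool:
    # The kind of check a decorator can use to decide a tool must be awaited.
    return inspect.iscoroutinefunction(fn)
```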

Agents

aimu.aio.Agent dataclass

Agent(model_client: AsyncBaseModelClient, system_message: Optional[str] = None, name: Optional[str] = None, tools: list[Callable] = list(), max_iterations: int = 10, continuation_prompt: str = DEFAULT_CONTINUATION_PROMPT, reset_messages_on_run: bool = False, final_answer_prompt: Optional[str] = None, deps: Optional[Any] = None)

Bases: _AgentLoopMixin, AsyncRunner

Async equivalent of aimu.agents.Agent.

Calls await model_client.chat() repeatedly until the model produces a turn without invoking tools, or max_iterations is reached.

Quick start::

from aimu.tools import tool
from aimu import aio

@tool
async def fetch(url: str) -> str:
    """Fetch the contents of a URL."""
    import httpx
    async with httpx.AsyncClient() as c:
        return (await c.get(url)).text[:500]

client = aio.client("anthropic:claude-sonnet-4-6")
agent = aio.Agent(client, "You are a helpful assistant.", tools=[fetch])
print(await agent.run("Fetch example.com"))
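The loop itself (chat, run any requested tools, feed the results back, stop on a turn without tool calls or at max_iterations) can be sketched without AIMU. This is a simplified illustration: Turn, agent_loop, and the scripted chat are hypothetical, and the real Agent also keeps a message history and uses continuation_prompt and final_answer_prompt, which are omitted here.

```python
import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Turn:
    """Hypothetical model turn: reply text plus requested (tool_name, kwargs) calls."""

    text: str
    tool_calls: list[tuple[str, dict[str, Any]]] = field(default_factory=list)


async def agent_loop(
    chat: Callable[[str], Awaitable[Turn]],
    tools: dict[str, Callable[..., Awaitable[Any]]],
    task: str,
    max_iterations: int = 10,
) -> str:
    message = task
    turn = Turn(text="")
    for _ in range(max_iterations):
        turn = await chat(message)
        if not turn.tool_calls:
            return turn.text  # a turn with no tool calls ends the loop
        # Run the requested tools and feed their results back as the next message.
        results = [await tools[name](**kwargs) for name, kwargs in turn.tool_calls]
        message = "\n".join(str(r) for r in results)
    return turn.text  # max_iterations reached; return the last reply


async def add(a: int, b: int) -> int:
    return a + b


async def demo() -> tuple[str, list[str]]:
    seen: list[str] = []
    script = iter([Turn("", [("add", {"a": 2, "b": 3})]), Turn("The sum is 5.")])

    async def scripted_chat(message: str) -> Turn:
        seen.append(message)  # record what the "model" was sent
        return next(script)

    answer = await agent_loop(scripted_chat, {"add": add}, "What is 2 + 3?")
    return answer, seen
```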

run async

run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None, tools: Optional[list[Callable]] = None, deps: Optional[Any] = None, schema: Optional[type] = None) -> Union[str, Any, AsyncIterator[StreamChunk]]

Run the async agentic loop. images are attached only to the initial turn.

tools is a per-run override of the agent's configured self.tools. deps is a per-run override of self.deps and is injected as ctx.deps into tools that declare an aimu.tools.ToolContext parameter. schema makes the run a single structured-output turn that returns a validated instance. See the sync aimu.agents.Agent.run for full semantics.
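The ctx.deps injection can be pictured as a signature check at dispatch time. The sketch below assumes that only tools with a parameter annotated as the context type receive one, and that a per-run deps value replaces the agent-level one when it is not None; ToolContext here is a hypothetical stand-in for aimu.tools.ToolContext, and call_tool and demo are illustrative helpers.

```python
import asyncio
import inspect
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any


@dataclass
class ToolContext:
    """Hypothetical stand-in for aimu.tools.ToolContext; only .deps is modelled."""

    deps: Any


async def call_tool(fn: Callable[..., Any], kwargs: dict[str, Any], deps: Any) -> Any:
    # Inject a context only into tools that declare a ToolContext-annotated parameter.
    for pname, param in inspect.signature(fn).parameters.items():
        if param.annotation is ToolContext:
            kwargs = {**kwargs, pname: ToolContext(deps=deps)}
            break
    result = fn(**kwargs)
    # Support both sync and async tools.
    return await result if inspect.isawaitable(result) else result


async def lookup_user(ctx: ToolContext, user_id: int) -> str:
    return ctx.deps["users"][user_id]


def shout(text: str) -> str:
    return text.upper()


async def demo(agent_deps: Any, run_deps: Any = None) -> tuple[str, str]:
    # A per-run deps override, when given, replaces the agent-level deps.
    deps = run_deps if run_deps is not None else agent_deps
    user = await call_tool(lookup_user, {"user_id": 1}, deps)
    loud = await call_tool(shout, {"text": "hi"}, deps)
    return user, loud
```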

as_model_client

as_model_client() -> AsyncBaseModelClient

Return an AsyncBaseModelClient view of this agent.

Each await client.chat() runs the full agent loop.

aimu.aio.SkillAgent dataclass

SkillAgent(model_client: AsyncBaseModelClient, system_message: Optional[str] = None, name: Optional[str] = None, tools: list[Callable] = list(), max_iterations: int = 10, continuation_prompt: str = DEFAULT_CONTINUATION_PROMPT, reset_messages_on_run: bool = False, final_answer_prompt: Optional[str] = None, deps: Optional[Any] = None, skill_manager: SkillManager = SkillManager())

Bases: Agent

Async Agent with filesystem-discovered skill injection.

On the first run (or after a message reset), the SkillAgent appends the skill catalog to its system message and adds the async skills server's tools (via aio.MCPClient.as_tools()) to model_client.tools, so the model can call activate_skill to load full skill instructions on demand.

aimu.aio.OrchestratorAgent

Bases: AsyncRunner, ABC

Async base for the orchestrator + worker-tools pattern.

Subclasses define worker Agent instances and @tool-decorated dispatch functions in __init__, then call _init_orchestrator to wire everything up. Worker dispatch functions should be async def so that concurrent_tool_calls=True actually overlaps their work.

For the simple case of dispatching to a fixed list of workers, use assemble to skip subclassing entirely.

assemble classmethod

assemble(model_client: AsyncBaseModelClient, system_message: str, *, workers: list[AsyncRunner], name: str = 'orchestrator', concurrent_tool_calls: bool = True, final_answer_prompt: Optional[str] = None) -> 'OrchestratorAgent'

Build a ready-to-run async orchestrator from a list of worker runners.

Each worker becomes an async callable tool via AsyncRunner.as_tool. Workers may be any AsyncRunner (an async Agent, a workflow, or a remote A2A agent), not just Agent instances.

restore

restore(messages: list[dict]) -> None

Restore the inner orchestrator agent's state from a saved message list.

Workers are invoked as tools, so their own state is not part of the orchestrator's history; restore a worker directly if it needs resuming. See aimu.aio.Agent.restore for the full save/restore pattern.

Workflows

aimu.aio.Chain dataclass

Chain(agents: list, name: str = 'chain')

Bases: AsyncRunner

Async prompt-chaining: each step's output feeds the next step's input.

Steps run sequentially, as the pattern requires. Each step may be an aimu.aio.Agent or a nested async workflow.
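The data flow of a chain reduces to a loop where each awaited step consumes the previous output. A minimal sketch, with plain async functions (outline and draft, both hypothetical) standing in for agents:

```python
import asyncio
from collections.abc import Awaitable, Callable

Step = Callable[[str], Awaitable[str]]


async def run_chain(steps: list[Step], task: str) -> str:
    # Each step awaits the previous step's output, so the steps cannot overlap.
    current = task
    for step in steps:
        current = await step(current)
    return current


async def outline(text: str) -> str:
    return f"outline({text})"


async def draft(text: str) -> str:
    return f"draft({text})"
```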

from_client classmethod

from_client(client: AsyncBaseModelClient, prompts: list[str], *, name: str = 'chain') -> Chain

Build a Chain from a single client and a list of prompts, each used as one step's system_message.

restore

restore(messages: list[dict], *, step: int = 0) -> None

Restore a chain step's state from a saved message list.

step (keyword-only) selects which step's agent to restore (default 0); subsequent steps start fresh on the next run(). Raises IndexError if step is out of range. See aimu.aio.Agent.restore for the pattern.

aimu.aio.Router dataclass

Router(routing_agent: Agent, handlers: dict[str, AsyncRunner], name: str = 'router', fallback: Optional[AsyncRunner] = None)

Bases: AsyncRunner

Async routing: classify the task, dispatch to a specialist handler.
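A sketch of the classify-then-dispatch step, with a keyword function standing in for the routing agent. The label normalization (strip and lowercase) and raising KeyError when no handler or fallback matches are assumptions for illustration; route, keyword_classifier, billing, and general are hypothetical.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Optional

Handler = Callable[[str], Awaitable[str]]


async def route(
    classify: Handler,
    handlers: dict[str, Handler],
    task: str,
    fallback: Optional[Handler] = None,
) -> str:
    # Ask the classifier for a route label, normalized before lookup (assumption).
    label = (await classify(task)).strip().lower()
    handler = handlers.get(label, fallback)
    if handler is None:
        raise KeyError(f"no handler for {label!r}; routes: {sorted(handlers)}")
    return await handler(task)


async def keyword_classifier(task: str) -> str:
    return "billing" if "invoice" in task.lower() else "other"


async def billing(task: str) -> str:
    return f"billing handled: {task}"


async def general(task: str) -> str:
    return f"general handled: {task}"
```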

restore

restore(messages: list[dict], *, route: Optional[str] = None) -> None

Restore one sub-runner's state from a saved message list.

route=None (the keyword-only default) restores the routing classifier; a route key restores that handler, and a key with no match raises KeyError listing the available routes. Other sub-runners start fresh on the next run(). See aimu.aio.Agent.restore.

aimu.aio.Parallel dataclass

Parallel(workers: list, name: str = 'parallel', aggregator: Optional[AsyncRunner] = None, separator: str = '\n\n---\n\n')

Bases: AsyncRunner

Async parallelization: run the workers concurrently via asyncio.TaskGroup, then aggregate their outputs.

Each worker receives the same task. An optional aggregator receives all worker outputs joined by separator. Without an aggregator, the joined output is returned.

Structured concurrency: if one worker raises, in-flight siblings are cancelled and an ExceptionGroup surfaces with all errors.

restore

restore(messages: list[dict], *, worker: int = 0) -> None

Restore one worker's state from a saved message list.

worker (keyword-only) selects the worker by index (default 0). Other workers and the aggregator start fresh on the next run(). Raises IndexError if worker is out of range. See aimu.aio.Agent.restore.

aimu.aio.EvaluatorOptimizer dataclass

EvaluatorOptimizer(generator: Agent, evaluator: Agent, name: str = 'evaluator_optimizer', max_rounds: int = 3, pass_keyword: str = 'PASS')

Bases: AsyncRunner

Async generate-evaluate-revise loop with the same semantics as the sync version.
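A minimal sketch of a generate-evaluate-revise loop, reading pass_keyword as a substring the evaluator emits on approval. How max_rounds is counted, the revision prompt wording, and returning the last draft when no round passes are assumptions, not confirmed behaviour of EvaluatorOptimizer; evaluate_optimize, CountingGenerator, and picky_evaluator are hypothetical.

```python
import asyncio
from collections.abc import Awaitable, Callable

Agentish = Callable[[str], Awaitable[str]]


async def evaluate_optimize(
    generate: Agentish,
    evaluate: Agentish,
    task: str,
    max_rounds: int = 3,
    pass_keyword: str = "PASS",
) -> str:
    prompt, draft = task, ""
    for _ in range(max_rounds):
        draft = await generate(prompt)
        verdict = await evaluate(draft)
        if pass_keyword in verdict:
            return draft  # evaluator accepted this draft
        # Otherwise revise: resend the task together with the evaluator's feedback.
        prompt = f"{task}\n\nRevise your previous answer using this feedback:\n{verdict}"
    return draft  # rounds exhausted; return the last draft (assumed behaviour)


class CountingGenerator:
    """Stand-in generator that returns "v1", "v2", ... on successive calls."""

    def __init__(self) -> None:
        self.calls = 0

    async def __call__(self, prompt: str) -> str:
        self.calls += 1
        return f"v{self.calls}"


async def picky_evaluator(draft: str) -> str:
    return "PASS" if draft == "v2" else "FAIL: add more detail"
```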

restore

restore(messages: list[dict]) -> None

Restore the generator's state from a saved message list.

The evaluator starts fresh on the next round. See aimu.aio.Agent.restore.

aimu.aio.PlanExecuteEvaluator dataclass

PlanExecuteEvaluator(planner: SkillAgent, executor: Agent, scorer: Scorer, criteria: Optional[str] = None, name: str = 'plan_execute_evaluator', max_rounds: int = 3, pass_threshold: float = 0.7, pass_keyword: Optional[str] = None)

Bases: AsyncRunner

Async plan → execute → evaluate → replan-on-fail loop.

Same semantics as aimu.agents.PlanExecuteEvaluator, but with async def run() and awaited delegation to the planner and executor.

The scorer's score() is sync: scoring is a CPU or judge-call concern, not an AIMU async concern. If you wire in an LLM judge, that judge call blocks the event loop unless it is wrapped; use asyncio.to_thread from your scorer's score() if needed.

from_client classmethod

from_client(client: AsyncBaseModelClient, *, judge_client: Optional[Any] = None, criteria: Optional[str] = None, executor_tools: Optional[list[Callable]] = None, skill_manager: Optional[SkillManager] = None, planner_system_message: Optional[str] = None, executor_system_message: Optional[str] = None, max_rounds: int = 3, pass_threshold: float = 0.7, pass_keyword: Optional[str] = None, name: str = 'plan_execute_evaluator') -> PlanExecuteEvaluator

Build a PlanExecuteEvaluator from a single async client.

The judge_client for the scorer may be sync or async. Note that the scorer itself is sync, and an LLMJudgeScorer expects a sync client.

Tools

aimu.aio.MCPClient

MCPClient(*, config: Optional[dict] = None, server: Optional[FastMCP] = None, file: Optional[str] = None)

Async wrapper around a FastMCP Client.

Use the connect() classmethod factory to construct and connect in one await::

mcp = await MCPClient.connect(server=my_fastmcp_server)
try:
    tools = await mcp.get_tools()
    result = await mcp.call_tool("foo", {"x": 1})
finally:
    await mcp.aclose()

connect async classmethod

connect(*, config: Optional[dict] = None, server: Optional[FastMCP] = None, file: Optional[str] = None) -> MCPClient

Construct and connect in one await. Returns the live instance.

ping async

ping() -> list

Verify the connection is alive by listing tools.

get_tools async

get_tools() -> list[dict]

Return tools in OpenAI function-calling format.

as_tools async

as_tools() -> list

Return this server's tools as async @tool-style callables.

Async mirror of aimu.tools.MCPClient.as_tools. Each callable is an async def that awaits call_tool and returns the result's text content. It carries __tool_spec__, __tool_is_async__ = True, and __tool_is_streaming__ = False, so it drops into client.tools or aio.Agent(tools=...), and the async dispatcher awaits it directly::

mcp = await aio.MCPClient.connect(server=my_server)
agent = aio.Agent(client, tools=await mcp.as_tools())

The list is a snapshot (one list_tools() round-trip); call again to refresh.
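Turning tool specs into async callables can be sketched with a fake transport in place of the MCP round-trip. The spec dicts follow the OpenAI function-calling shape that get_tools() returns; whether __tool_spec__ holds exactly that dict is an assumption. make_tools, fake_call_tool, and SPECS are hypothetical.

```python
import asyncio
import json
from collections.abc import Awaitable, Callable
from typing import Any

CallTool = Callable[[str, dict[str, Any]], Awaitable[str]]


def _wrap(spec: dict[str, Any], call_tool: CallTool) -> Callable[..., Awaitable[str]]:
    name = spec["function"]["name"]

    async def tool(**kwargs: Any) -> str:
        # Forward the keyword arguments to the server as the tool's argument dict.
        return await call_tool(name, kwargs)

    tool.__name__ = name
    tool.__doc__ = spec["function"].get("description", "")
    # Marker attributes documented for as_tools(); the shape of __tool_spec__ is assumed.
    tool.__tool_spec__ = spec
    tool.__tool_is_async__ = True
    tool.__tool_is_streaming__ = False
    return tool


def make_tools(specs: list[dict[str, Any]], call_tool: CallTool) -> list[Callable[..., Awaitable[str]]]:
    # A helper per spec avoids the late-binding bug of defining closures in a loop.
    return [_wrap(spec, call_tool) for spec in specs]


async def fake_call_tool(name: str, args: dict[str, Any]) -> str:
    # Stand-in for the MCP round-trip; a real client returns the result's text content.
    return f"{name} {json.dumps(args, sort_keys=True)}"


SPECS = [
    {
        "type": "function",
        "function": {
            "name": "foo",
            "description": "Demo tool.",
            "parameters": {"type": "object", "properties": {"x": {"type": "integer"}}},
        },
    }
]
```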

aclose async

aclose() -> None

Close the underlying connection. Idempotent.

A2A interop

Async twin of aimu.agents.a2a (requires the a2a extra). aimu.aio.a2a.RemoteAgent uses the a2a-sdk async client natively (no anyio portal) and supports incremental streaming over the A2A message/stream method.

aimu.aio.RemoteAgent

RemoteAgent(client: A2AClient, httpx_client: AsyncClient, name: str, card: Any)

Bases: AsyncRunner

A remote A2A agent presented as a local AsyncRunner.

Construct it via the async connect() classmethod::

remote = await RemoteAgent.connect("http://localhost:9000")
print(await remote.run("Summarise the news"))
async for chunk in await remote.run("Summarise", stream=True):
    ...

connect async classmethod

connect(url: str, *, name: Optional[str] = None, agent_card_path: str = DEFAULT_AGENT_CARD_PATH, timeout: float = 60.0) -> 'RemoteAgent'

Resolve the remote agent card at url and return a connected RemoteAgent.

run async

run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None) -> Union[str, AsyncIterator[StreamChunk]]

Send task to the remote agent; return its text (or a chunk stream).

aimu.aio.serve_a2a

serve_a2a(runner: AsyncRunner, *, host: str = '127.0.0.1', port: int = 9000, url: Optional[str] = None, name: Optional[str] = None, description: Optional[str] = None, skills: Optional[list[AgentSkill]] = None, **uvicorn_kwargs: Any) -> None

Serve an async runner as an A2A agent over HTTP (blocking).

aimu.aio.build_a2a_app

build_a2a_app(runner: AsyncRunner, *, url: str, name: Optional[str] = None, description: Optional[str] = None, skills: Optional[list[AgentSkill]] = None, agent_card_path: str = DEFAULT_AGENT_CARD_PATH)

Build the Starlette ASGI app that serves an async runner over A2A (does not run it).