aimu.aio
Async surface. Mirrors the sync API one-for-one: same class names, different namespace. See How-to: Use async for usage patterns and Explanation: Async design for why the surface is shaped this way.
Differences from the sync surface:
- Every run(), chat(), and generate() is async def.
- Streaming returns AsyncIterator[StreamChunk]; consume it with async for.
- Parallel and concurrent_tool_calls=True use asyncio.TaskGroup instead of ThreadPoolExecutor.
- In-process providers (AsyncHuggingFaceClient, AsyncLlamaCppClient) wrap an existing sync client; calling aio.client(HuggingFaceModel.X) directly raises.
Top-level
aimu.aio.chat (async)
chat(user_message: str, *, model: Union[str, Model], system: Optional[str] = None, generate_kwargs: Optional[dict] = None, stream: bool = False, images: Optional[list] = None, include: Optional[Iterable[Union[str, StreamingContentType]]] = None) -> Union[str, AsyncIterator[StreamChunk]]
One-shot async chat — builds a fresh client, sends one message, returns the response.
Example::

    from aimu import aio

    text = await aio.chat("Summarize this", model="anthropic:claude-sonnet-4-6")

    async for chunk in await aio.chat("Tell me a story", model="ollama:qwen3.5:9b", stream=True):
        if chunk.is_text():
            print(chunk.content, end="")
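The streaming form needs both await and async for: chat() is a coroutine, and when stream=True the value it returns is itself an async iterator. The sketch below reproduces that shape with the standard library only; Chunk and toy_chat are hypothetical stand-ins for StreamChunk and aio.chat, not AIMU code.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Union


@dataclass
class Chunk:
    # Hypothetical stand-in for StreamChunk; models only the members used above.
    kind: str
    content: str

    def is_text(self) -> bool:
        return self.kind == "text"


async def _stream(pieces: list[str]) -> AsyncIterator[Chunk]:
    for piece in pieces:
        await asyncio.sleep(0)  # yield control, as a network read would
        yield Chunk("text", piece)


async def toy_chat(message: str, *, stream: bool = False) -> Union[str, AsyncIterator[Chunk]]:
    pieces = ["Once ", "upon ", "a time."]
    if stream:
        # Return the generator without iterating it: the caller awaits
        # toy_chat() first, then iterates the result with async for.
        return _stream(pieces)
    return "".join(pieces)


async def main() -> tuple[str, str]:
    full = await toy_chat("Tell me a story")
    streamed = ""
    async for chunk in await toy_chat("Tell me a story", stream=True):
        if chunk.is_text():
            streamed += chunk.content
    return full, streamed
```

Forgetting the await makes async for fail with a TypeError, because a coroutine object is not an async iterator.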
aimu.aio.client
client(model: Union[str, Model, Any], *, system: Optional[str] = None, **kwargs: Any) -> AsyncModelClient
Construct an AsyncModelClient from a model string, enum, or existing sync client.
For in-process providers (HuggingFace, LlamaCpp), pass an existing sync client to avoid loading model weights twice::

    sync_client = aimu.client(HuggingFaceModel.LLAMA_70B)
    async_client = aio.client(sync_client)
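Wrapping rather than constructing matters because an in-process client owns the loaded weights. One common way to put an async face on such a client is to keep a reference to it and run its blocking calls on a worker thread with asyncio.to_thread. This is a minimal sketch under that assumption; SlowSyncClient and AsyncWrapper are hypothetical, and AIMU's actual wrappers may offload work differently.

```python
import asyncio
import threading


class SlowSyncClient:
    """Hypothetical in-process client: expensive to construct (think model weights)."""

    instances = 0

    def __init__(self) -> None:
        SlowSyncClient.instances += 1
        self._lock = threading.Lock()  # serialize access to the shared model

    def chat(self, message: str) -> str:
        with self._lock:
            return f"echo: {message}"


class AsyncWrapper:
    """Hypothetical async facade that reuses a sync client instead of reloading it."""

    def __init__(self, sync_client: SlowSyncClient) -> None:
        self._sync = sync_client

    async def chat(self, message: str) -> str:
        # Run the blocking call on a worker thread so the event loop stays free.
        return await asyncio.to_thread(self._sync.chat, message)


async def main() -> list[str]:
    sync_client = SlowSyncClient()
    async_client = AsyncWrapper(sync_client)  # no second set of weights
    return list(await asyncio.gather(async_client.chat("a"), async_client.chat("b")))
```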
aimu.aio.AsyncModelClient
Bases: AsyncBaseModelClient
Public factory for async provider-backed model clients.
Accepts a provider Model enum member, a "provider:model_id" string, or (for in-process providers) an existing sync client to wrap.
Examples::

    # Cloud providers (separate sync/async clients are cheap)
    client = AsyncModelClient("anthropic:claude-sonnet-4-6")
    client = AsyncModelClient(OllamaModel.QWEN_3_8B)

    # In-process providers: wrap an existing sync client to share weights
    sync_client = aimu.client(HuggingFaceModel.LLAMA_70B)
    async_client = AsyncModelClient(sync_client)
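Model ids can themselves contain colons (the chat example above uses "ollama:qwen3.5:9b"), so a "provider:model_id" string has to be split on the first colon only. The helper below is a hypothetical illustration of that rule, not AIMU's parser.

```python
def split_model_string(spec: str) -> tuple[str, str]:
    """Hypothetical helper: split 'provider:model_id' on the first colon only."""
    provider, sep, model_id = spec.partition(":")
    if not sep or not provider or not model_id:
        raise ValueError(f"expected 'provider:model_id', got {spec!r}")
    return provider, model_id
```

str.partition stops at the first separator, which keeps everything after it (including later colons) in model_id.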
Hierarchy
aimu.aio.AsyncRunner
Bases: ABC
Abstract base for every concrete async agent and workflow.
messages (abstract property)
Message histories of all sub-runners, keyed by runner name.
run (abstract, async)
run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None) -> Union[str, AsyncIterator[StreamChunk]]
Run the task. With stream=False, return the final response as a str; with stream=True, return an AsyncIterator[StreamChunk] to consume with async for.
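The contract above (an abstract messages property keyed by runner name plus an abstract async run that either returns text or an async iterator) can be sketched with abc alone. MiniRunner and Upper are hypothetical reductions for illustration; the real AsyncRunner.run also takes generate_kwargs and images, and streams StreamChunk objects rather than plain strings.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, AsyncIterator, Union


class MiniRunner(ABC):
    """Hypothetical reduction of the AsyncRunner contract (not AIMU's class)."""

    @property
    @abstractmethod
    def messages(self) -> dict[str, list[dict[str, Any]]]: ...

    @abstractmethod
    async def run(self, task: str, stream: bool = False) -> Union[str, AsyncIterator[str]]: ...


class Upper(MiniRunner):
    def __init__(self, name: str = "upper") -> None:
        self.name = name
        self._history: list[dict[str, Any]] = []

    @property
    def messages(self) -> dict[str, list[dict[str, Any]]]:
        # Keyed by runner name, mirroring the documented shape.
        return {self.name: list(self._history)}

    async def run(self, task: str, stream: bool = False) -> Union[str, AsyncIterator[str]]:
        self._history.append({"role": "user", "content": task})
        result = task.upper()
        self._history.append({"role": "assistant", "content": result})
        if stream:
            async def gen() -> AsyncIterator[str]:
                for word in result.split():
                    yield word
            return gen()
        return result
```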
Agents
aimu.aio.Agent (dataclass)
Agent(model_client: AsyncBaseModelClient, system_message: Optional[str] = None, name: Optional[str] = None, tools: list[Callable] = list(), max_iterations: int = 10, continuation_prompt: str = DEFAULT_CONTINUATION_PROMPT, reset_messages_on_run: bool = False)
Bases: AsyncRunner
Async equivalent of aimu.agents.Agent.
Calls await model_client.chat() repeatedly until the model produces a turn
without invoking tools, or max_iterations is reached.
Quick start::

    from aimu.tools import tool
    from aimu import aio

    @tool
    async def fetch(url: str) -> str:
        """Fetch the contents of a URL."""
        import httpx
        async with httpx.AsyncClient() as c:
            return (await c.get(url)).text[:500]

    client = aio.client("anthropic:claude-sonnet-4-6")
    agent = aio.Agent(client, "You are a helpful assistant.", tools=[fetch])
    print(await agent.run("Fetch example.com"))
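The loop described above (call the model, run any requested tool, repeat until a turn has no tool call or max_iterations is hit) can be sketched without a real model. Turn, agent_loop, and the scripted fake_chat in the test are hypothetical; the sketch does not model continuation_prompt, reset_messages_on_run, or concurrent tool calls.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class Turn:
    """Hypothetical model reply: text plus an optional tool request."""
    text: str
    tool_call: Optional[tuple[str, str]] = None  # (tool name, single argument)


async def agent_loop(
    chat: Callable[[list[dict]], Awaitable[Turn]],
    tools: dict[str, Callable[[str], Awaitable[str]]],
    task: str,
    max_iterations: int = 10,
) -> str:
    messages: list[dict] = [{"role": "user", "content": task}]
    turn = Turn(text="")
    for _ in range(max_iterations):
        turn = await chat(messages)
        messages.append({"role": "assistant", "content": turn.text})
        if turn.tool_call is None:
            return turn.text  # the model answered without tools: stop
        name, arg = turn.tool_call
        result = await tools[name](arg)  # await the async tool
        messages.append({"role": "tool", "content": result})
    return turn.text  # iteration budget exhausted: return the last turn's text
```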
run (async)
run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None) -> Union[str, AsyncIterator[StreamChunk]]
Run the async agentic loop. Images passed via images are attached to the initial turn only.
as_model_client
Return an AsyncBaseModelClient view of this agent.
Each await client.chat() runs the full agent loop.
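Exposing an agent as a client lets anything that only knows how to call chat() (for example, another agent's model_client) drive a full agent run. The adapter below is a hypothetical sketch of that idea, not AIMU's implementation; it takes any async run(task) callable.

```python
import asyncio
from typing import Awaitable, Callable


class AgentAsClient:
    """Hypothetical adapter: exposes an agent's run() behind a chat() method."""

    def __init__(self, run: Callable[[str], Awaitable[str]]) -> None:
        self._run = run

    async def chat(self, message: str) -> str:
        # Each chat() call executes one complete agent run.
        return await self._run(message)
```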
aimu.aio.SkillAgent (dataclass)
SkillAgent(model_client: AsyncBaseModelClient, system_message: Optional[str] = None, name: Optional[str] = None, tools: list[Callable] = list(), max_iterations: int = 10, continuation_prompt: str = DEFAULT_CONTINUATION_PROMPT, reset_messages_on_run: bool = False, skill_manager: SkillManager = SkillManager())
Bases: Agent
Async Agent with filesystem-discovered skill injection.
On first run (or after a message reset), the SkillAgent appends the skill catalog to its system message and attaches an async skills MCPClient so the model can call activate_skill to load full skill instructions on demand.
aimu.aio.OrchestratorAgent
Bases: AsyncRunner, ABC
Async base for the orchestrator + worker-tools pattern.
Subclasses define worker Agent instances and @tool-decorated dispatch functions in __init__, then call _init_orchestrator to wire everything up.
Worker dispatch functions should be async def so the orchestrator's
concurrent_tool_calls=True actually overlaps work.
For the simple case of dispatching to a fixed list of workers, use assemble to skip subclassing entirely.
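Why dispatch functions should be async def, and why that alone is not enough: concurrent awaits only overlap when the function body actually awaits instead of blocking. The timing sketch below contrasts two awaiting workers with two workers that call time.sleep inside async def. All function names are hypothetical, and asyncio.gather stands in for whatever mechanism the orchestrator uses to run tool calls concurrently.

```python
import asyncio
import time


async def research_worker(task: str) -> str:
    await asyncio.sleep(0.2)  # stands in for an awaited model call
    return f"research:{task}"


async def writing_worker(task: str) -> str:
    await asyncio.sleep(0.2)
    return f"writing:{task}"


async def blocking_worker(task: str) -> str:
    time.sleep(0.2)  # blocks the whole event loop: nothing else can run
    return f"blocking:{task}"


async def timed_gather(*coros) -> tuple[list[str], float]:
    start = time.perf_counter()
    results = await asyncio.gather(*coros)  # run the dispatch calls concurrently
    return list(results), time.perf_counter() - start
```

Two awaiting workers finish in about 0.2 s total; two blocking ones take about 0.4 s because each time.sleep holds the event loop.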
assemble (classmethod)
assemble(model_client: AsyncBaseModelClient, system_message: str, *, workers: list[Agent], name: str = 'orchestrator', concurrent_tool_calls: bool = True) -> 'OrchestratorAgent'
Build a ready-to-run async orchestrator from a list of worker agents.
Workflows
aimu.aio.Chain (dataclass)
Bases: AsyncRunner
Async prompt-chaining: each step's output feeds the next step's input.
Steps run sequentially (the pattern requires it). Each step may be an
aimu.aio.Agent or a nested async workflow.
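Prompt chaining reduces to a sequential fold: each awaited step receives the previous step's output. The sketch below shows that data flow with plain async functions; run_chain, outline, and draft are hypothetical stand-ins for Chain and its agent steps.

```python
import asyncio
from typing import Awaitable, Callable

Step = Callable[[str], Awaitable[str]]


async def run_chain(steps: list[Step], task: str) -> str:
    """Hypothetical prompt chain: each step's output becomes the next step's input."""
    current = task
    for step in steps:  # strictly sequential: step n+1 needs step n's output
        current = await step(current)
    return current


async def outline(text: str) -> str:
    return f"outline({text})"


async def draft(text: str) -> str:
    return f"draft({text})"
```

Unlike Parallel, there is nothing to overlap here, which is why the steps run one after another even in the async surface.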
from_client (classmethod)
Build a Chain from a single client and a list of step system_messages.
aimu.aio.Router (dataclass)
aimu.aio.Parallel (dataclass)
Parallel(workers: list, name: str = 'parallel', aggregator: Optional[AsyncRunner] = None, separator: str = '\n\n---\n\n')
Bases: AsyncRunner
Async parallelization: run workers concurrently via asyncio.TaskGroup, then aggregate their outputs.
Each worker receives the same task. An optional aggregator receives all worker
outputs joined by separator. Without an aggregator, the joined output is returned.
Structured concurrency: if one worker raises, in-flight siblings are cancelled
and an ExceptionGroup surfaces with all errors.
aimu.aio.EvaluatorOptimizer (dataclass)
aimu.aio.PlanExecuteEvaluator (dataclass)
PlanExecuteEvaluator(planner: SkillAgent, executor: Agent, scorer: Scorer, criteria: Optional[str] = None, name: str = 'plan_execute_evaluator', max_rounds: int = 3, pass_threshold: float = 0.7, pass_keyword: Optional[str] = None)
Bases: AsyncRunner
Async plan → execute → evaluate → replan-on-fail loop.
Same semantics as aimu.agents.PlanExecuteEvaluator, but with async def run() and awaited delegation to planner/executor.
The scorer's score() is sync (it is a CPU/judge-call concern, not an AIMU-async concern). If you wire in an LLM judge, that judge call blocks the event loop while it runs. Note that asyncio.to_thread returns a coroutine that must be awaited, so it can offload the call only from async code, not from inside a sync score().
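The sketch below shows where asyncio.to_thread has to sit to keep a blocking sync scorer off the event loop: at the async call site that awaits the score. It also walks the plan, execute, evaluate, replan-on-fail loop with max_rounds and pass_threshold. Everything here is hypothetical (plan_execute_evaluate, slow_sync_score, and the plan/execute callables in the test); whether AIMU's run() offloads score() this way is not stated in this reference, and pass_keyword is not modeled.

```python
import asyncio
import time
from typing import Awaitable, Callable


def slow_sync_score(output: str) -> float:
    """Hypothetical sync scorer, e.g. one that makes a blocking judge call."""
    time.sleep(0.1)  # stands in for the blocking judge request
    return 1.0 if "final" in output else 0.3


async def plan_execute_evaluate(
    plan: Callable[[str, str], Awaitable[str]],
    execute: Callable[[str], Awaitable[str]],
    score: Callable[[str], float],
    task: str,
    max_rounds: int = 3,
    pass_threshold: float = 0.7,
) -> tuple[str, float, int]:
    output, value, rounds = "", 0.0, 0
    for rounds in range(1, max_rounds + 1):
        # Round 1 plans from scratch (output is empty); later rounds replan
        # with the failed output as feedback.
        steps = await plan(task, output)
        output = await execute(steps)
        # Offload the blocking sync scorer so other tasks on the loop keep running.
        value = await asyncio.to_thread(score, output)
        if value >= pass_threshold:
            break
    return output, value, rounds
```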
from_client (classmethod)
from_client(client: AsyncBaseModelClient, *, judge_client: Optional[Any] = None, criteria: Optional[str] = None, executor_tools: Optional[list[Callable]] = None, skill_manager: Optional[SkillManager] = None, planner_system_message: Optional[str] = None, executor_system_message: Optional[str] = None, max_rounds: int = 3, pass_threshold: float = 0.7, pass_keyword: Optional[str] = None, name: str = 'plan_execute_evaluator') -> PlanExecuteEvaluator
Build a PlanExecuteEvaluator from a single async client.
The judge_client for the scorer may be sync or async, but the scorer itself is sync: if it is an LLMJudgeScorer, it expects a sync client.
Tools
aimu.aio.MCPClient
MCPClient(*, config: Optional[dict] = None, server: Optional[FastMCP] = None, file: Optional[str] = None)
Async wrapper around a FastMCP Client.
Use the connect() classmethod factory to construct and connect in one await::

    mcp = await MCPClient.connect(server=my_fastmcp_server)
    try:
        tools = await mcp.get_tools()
        result = await mcp.call_tool("foo", {"x": 1})
    finally:
        await mcp.aclose()
connect (async classmethod)
connect(*, config: Optional[dict] = None, server: Optional[FastMCP] = None, file: Optional[str] = None) -> MCPClient
Construct and connect in one await. Returns the live instance.
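An async classmethod factory exists because __init__ cannot await: construction stores state, and the awaited handshake happens in the factory before a live instance is returned. The sketch below shows that pattern with a hypothetical FakeTransport standing in for the FastMCP client session; MiniMCP is not AIMU's MCPClient.

```python
import asyncio
from typing import Any


class FakeTransport:
    """Hypothetical connection; stands in for a FastMCP client session."""

    def __init__(self) -> None:
        self.open = False

    async def start(self) -> None:
        await asyncio.sleep(0)  # stands in for the network handshake
        self.open = True

    async def stop(self) -> None:
        self.open = False


class MiniMCP:
    def __init__(self) -> None:
        # __init__ cannot await, so it only stores state.
        self._transport = FakeTransport()

    @classmethod
    async def connect(cls) -> "MiniMCP":
        # Construct, await the handshake, then hand back a live instance.
        instance = cls()
        await instance._transport.start()
        return instance

    async def call_tool(self, name: str, args: dict[str, Any]) -> str:
        if not self._transport.open:
            raise RuntimeError("not connected")
        return f"{name}({args})"

    async def aclose(self) -> None:
        await self._transport.stop()
```

Pairing the factory with try/finally and aclose(), as in the MCPClient example, guarantees the connection is closed even if a tool call raises.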