
aimu.aio

Async surface. It mirrors the sync API one-for-one: same class names, different namespace. See "How-to: use async" for usage patterns and "Explanation: async design" for why the surface is shaped this way.

Differences from the sync surface:

  • Every run(), chat(), and generate() method is an async def.
  • Streaming returns AsyncIterator[StreamChunk] (consume with async for).
  • Parallel and concurrent_tool_calls=True use asyncio.TaskGroup instead of ThreadPoolExecutor.
  • In-process providers (AsyncHuggingFaceClient, AsyncLlamaCppClient) wrap an existing sync client; passing the model enum directly, as in aio.client(HuggingFaceModel.X), raises an error.

Top-level

aimu.aio.chat async

chat(user_message: str, *, model: Union[str, Model], system: Optional[str] = None, generate_kwargs: Optional[dict] = None, stream: bool = False, images: Optional[list] = None, include: Optional[Iterable[Union[str, StreamingContentType]]] = None) -> Union[str, AsyncIterator[StreamChunk]]

One-shot async chat: builds a fresh client, sends one message, and returns the response.

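Example:

from aimu import aio

# Top-level await: run inside an async function, a notebook, or `python -m asyncio`.
text = await aio.chat("Summarize this", model="anthropic:claude-sonnet-4-6")

# Streaming: await the call first, then iterate the AsyncIterator it returns.
async for chunk in await aio.chat("Tell me a story", model="ollama:qwen3.5:9b", stream=True):
    if chunk.is_text():
        print(chunk.content, end="")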
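Because chat() is itself a coroutine, the streaming form needs both keywords: await resolves the call, and async for drains the AsyncIterator it returns. The sketch below reproduces that return shape with a hypothetical fake_chat() and a minimal Chunk class standing in for StreamChunk; neither is part of aimu, and the echo behavior is purely illustrative.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Union


@dataclass
class Chunk:
    """Hypothetical stand-in for StreamChunk (text chunks only)."""

    content: str

    def is_text(self) -> bool:
        return True


async def _stream(words: list[str]) -> AsyncIterator[Chunk]:
    # Async generator: chunks are produced one at a time as the caller iterates.
    for word in words:
        await asyncio.sleep(0)  # yield to the event loop, as a network read would
        yield Chunk(word + " ")


async def fake_chat(message: str, *, stream: bool = False) -> Union[str, AsyncIterator[Chunk]]:
    """Hypothetical one-shot chat with the same return shape as aio.chat()."""
    words = ["echo:", message]
    if stream:
        # Return the iterator itself; the caller consumes it with `async for`.
        return _stream(words)
    # Non-streaming: the whole response as one string.
    return " ".join(words)


async def main() -> tuple[str, str]:
    text = await fake_chat("hi")
    pieces: list[str] = []
    # `await` resolves the coroutine; `async for` drains the iterator it returns.
    async for chunk in await fake_chat("hi", stream=True):
        if chunk.is_text():
            pieces.append(chunk.content)
    return text, "".join(pieces)
```

Running asyncio.run(main()) gives ('echo: hi', 'echo: hi '); the streamed text ends with a space because every chunk appends one.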

aimu.aio.client

client(model: Union[str, Model, Any], *, system: Optional[str] = None, **kwargs: Any) -> AsyncModelClient

Construct an AsyncModelClient from a model string, enum, or existing sync client.

For in-process providers (HuggingFace, LlamaCpp), pass an existing sync client to avoid loading model weights twice:

sync_client = aimu.client(HuggingFaceModel.LLAMA_70B)
async_client = aio.client(sync_client)
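Wrapping avoids a second load because the async client keeps a reference to the same sync object instead of constructing its own. A common way to build such a wrapper is to push each blocking call onto a worker thread with asyncio.to_thread; the sketch below assumes that approach and uses hypothetical SyncModel and AsyncWrapper classes, so it shows the idea rather than aimu's implementation.

```python
import asyncio
import time


class SyncModel:
    """Hypothetical in-process model; loading weights is the expensive step."""

    def __init__(self) -> None:
        self.weights = [0.1, 0.2, 0.3]  # placeholder for gigabytes of weights

    def chat(self, message: str) -> str:
        time.sleep(0.05)  # simulate blocking local inference
        return f"reply to {message!r}"


class AsyncWrapper:
    """Hypothetical async view over an existing SyncModel."""

    def __init__(self, sync_client: SyncModel) -> None:
        self._sync = sync_client  # keep a reference; nothing is reloaded

    @property
    def weights(self) -> list[float]:
        return self._sync.weights

    async def chat(self, message: str) -> str:
        # Run the blocking call in a worker thread so the event loop stays free.
        return await asyncio.to_thread(self._sync.chat, message)


async def main() -> tuple[str, bool]:
    sync_client = SyncModel()
    async_client = AsyncWrapper(sync_client)
    reply = await async_client.chat("hello")
    # Same weights object: the wrapper shares state instead of copying it.
    return reply, async_client.weights is sync_client.weights
```

Whether concurrent calls into one in-process model are safe depends on the backend; a real wrapper may need to serialize them, for example with an asyncio.Lock.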

aimu.aio.AsyncModelClient

AsyncModelClient(model: Union[Model, ModelSpec, str, Any], **kwargs: Any)

Bases: AsyncBaseModelClient

Public factory for async provider-backed model clients.

Accepts a provider Model enum member, a "provider:model_id" string, or — for in-process providers — an existing sync client to wrap.

Examples:

# Cloud providers (separate sync/async clients are cheap)
client = AsyncModelClient("anthropic:claude-sonnet-4-6")
client = AsyncModelClient(OllamaModel.QWEN_3_8B)

# In-process providers — wrap an existing sync client to share weights
sync_client = aimu.client(HuggingFaceModel.LLAMA_70B)
async_client = AsyncModelClient(sync_client)

Hierarchy

aimu.aio.AsyncRunner

Bases: ABC

Abstract base for every concrete async agent and workflow.

messages abstractmethod property

messages: MessageHistory

Message histories of all sub-runners, keyed by runner name.

run abstractmethod async

run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None) -> Union[str, AsyncIterator[StreamChunk]]

Run the task. With stream=False, return the complete response as a str; with stream=True, return an AsyncIterator[StreamChunk] to consume with async for.

Agents

aimu.aio.Agent dataclass

Agent(model_client: AsyncBaseModelClient, system_message: Optional[str] = None, name: Optional[str] = None, tools: list[Callable] = list(), max_iterations: int = 10, continuation_prompt: str = DEFAULT_CONTINUATION_PROMPT, reset_messages_on_run: bool = False)

Bases: AsyncRunner

Async equivalent of aimu.agents.Agent.

Calls await model_client.chat() repeatedly until the model produces a turn without invoking tools, or max_iterations is reached.

Quick start:

from aimu.tools import tool
from aimu import aio

@tool
async def fetch(url: str) -> str:
    """Fetch the contents of a URL."""
    import httpx
    async with httpx.AsyncClient() as c:
        return (await c.get(url)).text[:500]

client = aio.client("anthropic:claude-sonnet-4-6")
agent = aio.Agent(client, "You are a helpful assistant.", tools=[fetch])
print(await agent.run("Fetch example.com"))
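The loop that run() drives (call the model, run any requested tools, feed the results back, stop on a turn without tool calls or at max_iterations) can be sketched with a scripted stand-in for the model client. ScriptedModel, run_agent(), and the dict-based message format are hypothetical and do not reflect aimu's internal types; returning the last turn's text when max_iterations is hit is an assumption.

```python
import asyncio
from typing import Any, Awaitable, Callable

Tool = Callable[..., Awaitable[str]]


class ScriptedModel:
    """Hypothetical model client that replays a fixed list of turns."""

    def __init__(self, turns: list[dict[str, Any]]) -> None:
        self._turns = iter(turns)

    async def chat(self, messages: list[dict[str, Any]]) -> dict[str, Any]:
        await asyncio.sleep(0)  # stand-in for a network round trip
        return next(self._turns)


async def run_agent(model: ScriptedModel, task: str, tools: dict[str, Tool], max_iterations: int = 10) -> str:
    messages: list[dict[str, Any]] = [{"role": "user", "content": task}]
    reply: dict[str, Any] = {}
    for _ in range(max_iterations):
        reply = await model.chat(messages)
        calls = reply.get("tool_calls", [])
        if not calls:
            return reply["content"]  # a turn with no tool calls ends the loop
        messages.append({"role": "assistant", "tool_calls": calls})
        for call in calls:
            # Run each requested tool and feed its result back to the model.
            result = await tools[call["name"]](**call["args"])
            messages.append({"role": "tool", "name": call["name"], "content": result})
    return reply.get("content", "")  # max_iterations reached


async def add(a: int, b: int) -> str:
    return str(a + b)


async def main() -> str:
    model = ScriptedModel([
        {"tool_calls": [{"name": "add", "args": {"a": 2, "b": 3}}]},
        {"content": "2 + 3 = 5"},
    ])
    return await run_agent(model, "What is 2 + 3?", {"add": add})
```

asyncio.run(main()) returns '2 + 3 = 5': the first turn requests the add tool, and the second turn, having no tool calls, ends the loop.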

run async

run(task: str, generate_kwargs: Optional[dict[str, Any]] = None, stream: bool = False, images: Optional[list] = None) -> Union[str, AsyncIterator[StreamChunk]]

Run the async agentic loop. Any images are attached to the initial turn only.

as_model_client

as_model_client() -> AsyncBaseModelClient

Return an AsyncBaseModelClient view of this agent.

Each await client.chat() runs the full agent loop.

aimu.aio.SkillAgent dataclass

SkillAgent(model_client: AsyncBaseModelClient, system_message: Optional[str] = None, name: Optional[str] = None, tools: list[Callable] = list(), max_iterations: int = 10, continuation_prompt: str = DEFAULT_CONTINUATION_PROMPT, reset_messages_on_run: bool = False, skill_manager: SkillManager = SkillManager())

Bases: Agent

Async Agent with filesystem-discovered skill injection.

On first run (or after a message reset), the SkillAgent appends the skill catalog to its system message and attaches an async skills MCPClient so the model can call activate_skill to load full skill instructions on demand.

aimu.aio.OrchestratorAgent

Bases: AsyncRunner, ABC

Async base for the orchestrator + worker-tools pattern.

Subclasses define worker Agent instances and @tool-decorated dispatch functions in __init__, then call _init_orchestrator() to wire everything up. Worker dispatch functions should be async def so that concurrent_tool_calls=True actually overlaps their work on the event loop.

For the simple case of dispatching to a fixed list of workers, use assemble() to skip subclassing entirely.

assemble classmethod

assemble(model_client: AsyncBaseModelClient, system_message: str, *, workers: list[Agent], name: str = 'orchestrator', concurrent_tool_calls: bool = True) -> 'OrchestratorAgent'

Build a ready-to-run async orchestrator from a list of worker agents.

Workflows

aimu.aio.Chain dataclass

Chain(agents: list, name: str = 'chain')

Bases: AsyncRunner

Async prompt-chaining: each step's output feeds the next step's input.

Steps run sequentially (the pattern requires it). Each step may be an aimu.aio.Agent or a nested async workflow.
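Since each step consumes the previous step's output, a chain is a sequential loop over its steps. In this sketch, plain async functions (the hypothetical outline() and draft()) stand in for aimu.aio.Agent instances.

```python
import asyncio
from typing import Awaitable, Callable

Step = Callable[[str], Awaitable[str]]


async def run_chain(steps: list[Step], task: str) -> str:
    output = task
    for step in steps:
        # Sequential by design: each step needs the previous step's output.
        output = await step(output)
    return output


async def outline(topic: str) -> str:
    return f"outline of {topic}"


async def draft(outline_text: str) -> str:
    return f"draft from {outline_text}"


async def main() -> str:
    return await run_chain([outline, draft], "async Python")
```

asyncio.run(main()) returns 'draft from outline of async Python'.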

from_client classmethod

from_client(client: AsyncBaseModelClient, prompts: list[str], *, name: str = 'chain') -> Chain

Build a Chain from a single client and a list of prompts, each used as one step's system message.

aimu.aio.Router dataclass

Router(routing_agent: Agent, handlers: dict[str, AsyncRunner], name: str = 'router', fallback: Optional[AsyncRunner] = None)

Bases: AsyncRunner

Async routing: classify the task, dispatch to a specialist handler.
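Routing amounts to a lookup keyed by the classifier's label, falling back to a default handler when the label has none. Here classify() is a hypothetical keyword rule standing in for the routing agent's model call, the handlers are plain async functions, and raising KeyError when no fallback is configured is an assumption rather than documented behavior.

```python
import asyncio
from typing import Awaitable, Callable, Optional

Handler = Callable[[str], Awaitable[str]]


async def classify(task: str) -> str:
    # Stand-in for the routing agent: returns a label for the task.
    return "billing" if "invoice" in task.lower() else "other"


async def route(task: str, handlers: dict[str, Handler], fallback: Optional[Handler] = None) -> str:
    label = (await classify(task)).strip()
    handler = handlers.get(label, fallback)
    if handler is None:
        raise KeyError(f"no handler for label {label!r}")
    return await handler(task)


async def billing(task: str) -> str:
    return "billing team: " + task


async def general(task: str) -> str:
    return "general desk: " + task


async def main() -> tuple[str, str]:
    handlers = {"billing": billing}
    return (
        await route("Where is my invoice?", handlers, fallback=general),
        await route("Hello", handlers, fallback=general),
    )
```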

aimu.aio.Parallel dataclass

Parallel(workers: list, name: str = 'parallel', aggregator: Optional[AsyncRunner] = None, separator: str = '\n\n---\n\n')

Bases: AsyncRunner

Async parallelization: run workers concurrently via asyncio.TaskGroup, aggregate.

Each worker receives the same task. An optional aggregator receives all worker outputs joined by separator. Without an aggregator, the joined output is returned.

Structured concurrency: if one worker raises, in-flight siblings are cancelled and an ExceptionGroup surfaces with all errors.

aimu.aio.EvaluatorOptimizer dataclass

EvaluatorOptimizer(generator: Agent, evaluator: Agent, name: str = 'evaluator_optimizer', max_rounds: int = 3, pass_keyword: str = 'PASS')

Bases: AsyncRunner

Async generate-evaluate-revise loop with the same semantics as the sync version.
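A minimal sketch of the generate-evaluate-revise cycle, under stated assumptions: a draft passes when the evaluator's reply contains pass_keyword (a plain substring check), the evaluator's feedback is appended to the next generation prompt, and the last draft is returned if max_rounds runs out. generate() and evaluate() are hypothetical toy functions, and the feedback wording is illustrative only.

```python
import asyncio
from typing import Awaitable, Callable

Gen = Callable[[str], Awaitable[str]]


async def evaluator_optimizer(
    generate: Gen, evaluate: Gen, task: str, max_rounds: int = 3, pass_keyword: str = "PASS"
) -> tuple[str, int]:
    prompt = task
    draft = ""
    for round_no in range(1, max_rounds + 1):
        draft = await generate(prompt)
        verdict = await evaluate(draft)
        if pass_keyword in verdict:
            return draft, round_no  # accepted: stop early
        # Revise: feed the evaluator's critique into the next generation.
        prompt = f"{task}\n\nFeedback on previous draft: {verdict}"
    return draft, max_rounds  # rounds exhausted: return the last draft


async def generate(prompt: str) -> str:
    text = "hello world"
    return text.upper() if "Feedback" in prompt else text


async def evaluate(draft: str) -> str:
    return "PASS" if draft.isupper() else "Use capital letters."


async def main() -> tuple[str, int]:
    return await evaluator_optimizer(generate, evaluate, "Greet the world")
```

asyncio.run(main()) returns ('HELLO WORLD', 2): the first draft fails review, and the revision that sees the feedback passes.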

aimu.aio.PlanExecuteEvaluator dataclass

PlanExecuteEvaluator(planner: SkillAgent, executor: Agent, scorer: Scorer, criteria: Optional[str] = None, name: str = 'plan_execute_evaluator', max_rounds: int = 3, pass_threshold: float = 0.7, pass_keyword: Optional[str] = None)

Bases: AsyncRunner

Async plan → execute → evaluate → replan-on-fail loop.

Same semantics as aimu.agents.PlanExecuteEvaluator, but with async def run() and awaited delegation to the planner and executor.

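The scorer's score() is synchronous: scoring is a CPU or judge-call concern, not an AIMU async concern. If you wire in an LLM judge, the judge call blocks the event loop for as long as it runs. asyncio.to_thread cannot fix this from inside score(), because it returns an awaitable that a sync function cannot await; offloading only works where the call is awaited from async code.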
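The blocking effect is easy to observe with a heartbeat task that ticks while another coroutine waits on a judge call. judge() is a hypothetical sync function standing in for an LLM judge request: calling it directly stalls the heartbeat, while awaiting asyncio.to_thread(judge, ...) from async code lets it keep ticking. This illustrates general asyncio behavior, not how PlanExecuteEvaluator invokes its scorer.

```python
import asyncio
import time


def judge(output: str) -> float:
    time.sleep(0.3)  # simulate a slow, blocking request to a judge model
    return 1.0 if output else 0.0


async def heartbeat(ticks: list[float], stop: asyncio.Event) -> None:
    while not stop.is_set():
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.05)


async def count_ticks(offload: bool) -> int:
    ticks: list[float] = []
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    if offload:
        await asyncio.to_thread(judge, "answer")  # loop stays free meanwhile
    else:
        judge("answer")  # loop is frozen for the full call
    stop.set()
    await hb
    return len(ticks)


async def main() -> tuple[int, int]:
    return await count_ticks(offload=False), await count_ticks(offload=True)
```

The first count should be 1 (only the tick recorded before the call) and the second several, since the heartbeat runs every 50 ms while the worker thread waits.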

from_client classmethod

from_client(client: AsyncBaseModelClient, *, judge_client: Optional[Any] = None, criteria: Optional[str] = None, executor_tools: Optional[list[Callable]] = None, skill_manager: Optional[SkillManager] = None, planner_system_message: Optional[str] = None, executor_system_message: Optional[str] = None, max_rounds: int = 3, pass_threshold: float = 0.7, pass_keyword: Optional[str] = None, name: str = 'plan_execute_evaluator') -> PlanExecuteEvaluator

Build a PlanExecuteEvaluator from a single async client.

The judge_client is handed to the scorer, which is sync. It may be a sync or async client, but an LLMJudgeScorer expects a sync client.

Tools

aimu.aio.MCPClient

MCPClient(*, config: Optional[dict] = None, server: Optional[FastMCP] = None, file: Optional[str] = None)

Async wrapper around a FastMCP Client.

Use the connect() classmethod factory to construct and connect in one await:

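from aimu.aio import MCPClient

# my_fastmcp_server: an existing FastMCP server instance
mcp = await MCPClient.connect(server=my_fastmcp_server)
try:
    tools = await mcp.get_tools()  # OpenAI function-calling format
    result = await mcp.call_tool("foo", {"x": 1})
finally:
    await mcp.aclose()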
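connect() follows the async factory pattern: __init__ cannot await, so an async classmethod builds the instance and awaits its setup before handing it back. The sketch shows that pattern together with an idempotent aclose() on a hypothetical FakeConnection class; it does not use FastMCP, and its fields are illustrative.

```python
import asyncio
from typing import Optional


class FakeConnection:
    """Hypothetical client illustrating connect()/aclose() semantics."""

    def __init__(self, server: Optional[str] = None) -> None:
        # Plain constructor: stores configuration only, performs no I/O.
        self.server = server
        self.connected = False
        self.close_calls = 0

    @classmethod
    async def connect(cls, *, server: Optional[str] = None) -> "FakeConnection":
        instance = cls(server=server)
        await instance._open()  # the awaitable setup __init__ cannot do
        return instance

    async def _open(self) -> None:
        await asyncio.sleep(0)  # stand-in for a network handshake
        self.connected = True

    async def aclose(self) -> None:
        if not self.connected:
            return  # already closed: a repeat call is a no-op
        await asyncio.sleep(0)
        self.connected = False
        self.close_calls += 1


async def main() -> tuple[bool, int]:
    conn = await FakeConnection.connect(server="demo")
    try:
        assert conn.connected
    finally:
        await conn.aclose()
        await conn.aclose()  # idempotent: second call does nothing
    return conn.connected, conn.close_calls
```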

connect async classmethod

connect(*, config: Optional[dict] = None, server: Optional[FastMCP] = None, file: Optional[str] = None) -> MCPClient

Construct and connect in one await. Returns the live instance.

ping async

ping() -> list

Verify the connection is alive by listing tools.

get_tools async

get_tools() -> list[dict]

Return tools in OpenAI function-calling format.

aclose async

aclose() -> None

Close the underlying connection. Idempotent.