Async Runtime
AsyncAgentRuntime provides an async interface for running agents inside asyncio-based Python applications such as FastAPI, Starlette, and aiohttp. It supports the same features as the synchronous runtime: parallel tool execution, hooks, graceful failure recovery, context window management, and mid-run re-planning.
Quick start

import asyncio

from shipit_agent import AsyncAgentRuntime
from shipit_agent.llms import OpenAIChatLLM


async def main():
    runtime = AsyncAgentRuntime(
        llm=OpenAIChatLLM(model="gpt-4o-mini"),
        prompt="You are a helpful assistant.",
    )
    # run() returns the final run state together with the model's response
    state, response = await runtime.run("What is 2 + 2?")
    print(response.content)


asyncio.run(main())
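Streaming events

import asyncio

from shipit_agent import AsyncAgentRuntime
from shipit_agent.llms import OpenAIChatLLM


async def stream_example():
    runtime = AsyncAgentRuntime(
        llm=OpenAIChatLLM(model="gpt-4o-mini"),
        prompt="You are a helpful assistant.",
    )
    # Events are yielded while the run is still in progress
    async for event in runtime.stream("Search for Python news"):
        print(f"{event.type:22s} {event.message}")


asyncio.run(stream_example())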
FastAPI integration

import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

from shipit_agent import AsyncAgentRuntime
from shipit_agent.llms import OpenAIChatLLM

app = FastAPI()


# FastAPI reads a bare `prompt: str` argument as a query parameter
@app.post("/chat")
async def chat(prompt: str):
    runtime = AsyncAgentRuntime(
        llm=OpenAIChatLLM(model="gpt-4o-mini"),
        prompt="You are a helpful assistant.",
    )
    state, response = await runtime.run(prompt)
    return {"output": response.content}


@app.post("/chat/stream")
async def chat_stream(prompt: str):
    runtime = AsyncAgentRuntime(
        llm=OpenAIChatLLM(model="gpt-4o-mini"),
        prompt="You are a helpful assistant.",
    )

    async def event_generator():
        # One JSON object per line (NDJSON), sent as each event is produced
        async for event in runtime.stream(prompt):
            yield json.dumps(event.to_dict()) + "\n"

    return StreamingResponse(event_generator(), media_type="application/x-ndjson")
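
A client of the /chat/stream endpoint receives newline-delimited JSON, and network chunks will not necessarily line up with line boundaries. The following is a minimal sketch of incremental parsing on the client side, using only the standard library. The helper name `iter_ndjson` is hypothetical, and the "type" and "message" keys in the sample data are an assumption about what `event.to_dict()` produces, based on the attributes used in the streaming example.

```python
import json
from typing import Iterable, Iterator


def iter_ndjson(chunks: Iterable[bytes]) -> Iterator[dict]:
    """Yield one decoded JSON object per newline-terminated line."""
    buffer = b""
    for chunk in chunks:
        buffer += chunk
        # A chunk may contain several lines, or only part of one
        while b"\n" in buffer:
            line, buffer = buffer.split(b"\n", 1)
            if line.strip():
                yield json.loads(line)
    if buffer.strip():
        # Final line that arrived without a trailing newline
        yield json.loads(buffer)


# Simulated network chunks that split events at arbitrary byte offsets.
# The event shape here is assumed for illustration only.
chunks = [
    b'{"type": "run_started", "mess',
    b'age": "hi"}\n{"type": "run_',
    b'completed", "message": "done"}\n',
]
events = list(iter_ndjson(chunks))
for event in events:
    print(event["type"], event["message"])
```

Buffering bytes and splitting on the newline byte is safe for UTF-8 payloads, because the newline byte never occurs inside a multi-byte character.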
With tools and parallel execution

import asyncio

from shipit_agent import AsyncAgentRuntime, FunctionTool, AgentHooks
from shipit_agent.llms import OpenAIChatLLM


def search_web(query: str) -> str:
    return f"Results for: {query}"


def fetch_url(url: str) -> str:
    return f"Content of {url}"


async def main():
    runtime = AsyncAgentRuntime(
        llm=OpenAIChatLLM(model="gpt-4o-mini"),
        prompt="You are a research assistant.",
        tools=[
            FunctionTool.from_callable(search_web),
            FunctionTool.from_callable(fetch_url),
        ],
        parallel_tool_execution=True,  # run tools concurrently
        hooks=AgentHooks(),            # attach hooks
        context_window_tokens=128000,  # enable context management
    )
    state, response = await runtime.run("Research quantum computing advances")
    print(response.content)


asyncio.run(main())
How it works
The async runtime wraps synchronous LLM and tool calls in loop.run_in_executor(), so they run in thread pool workers without blocking the event loop. When parallel tool execution is enabled, multiple tool calls run concurrently as asyncio.Task objects via asyncio.gather().
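
The pattern described above can be sketched with the standard library alone. This is an illustration of the general technique, not the runtime's actual source: `slow_tool`, `call_in_thread`, and `run_tools_concurrently` are hypothetical names, and `time.sleep` stands in for a blocking LLM or tool call.

```python
import asyncio
import time


def slow_tool(name: str, seconds: float) -> str:
    """Stand-in for a blocking tool call (hypothetical, not part of shipit_agent)."""
    time.sleep(seconds)  # blocks the worker thread, like a synchronous HTTP request
    return f"{name} done"


async def call_in_thread(func, *args):
    """Run a blocking callable in the default thread pool without blocking the loop."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, func, *args)


async def run_tools_concurrently() -> list:
    # gather() runs both awaitables concurrently and returns results in call order
    return await asyncio.gather(
        call_in_thread(slow_tool, "search_web", 0.2),
        call_in_thread(slow_tool, "fetch_url", 0.2),
    )


if __name__ == "__main__":
    start = time.perf_counter()
    results = asyncio.run(run_tools_concurrently())
    print(results, f"{time.perf_counter() - start:.2f}s")
```

Because each blocking call occupies its own worker thread, the two calls overlap rather than run back to back, and the event loop stays free to serve other coroutines in the meantime.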
Sync runtime                   Async runtime
──────────────                 ─────────────
threading.Thread               asyncio.Task
 └─ run()                       └─ await run()
     └─ llm.complete()              └─ await run_in_executor(llm.complete)
     └─ tool.run()                  └─ await run_in_executor(tool.run)
stream() → queue.Queue         stream() → asyncio.Queue
 └─ yield from queue            └─ async for event in queue
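
The queue-backed streaming shown in the right-hand column can be approximated as follows. This is a sketch under stated assumptions rather than the library's implementation: the `produce`, `stream`, and `collect` functions, the sentinel object, and the event names are all hypothetical.

```python
import asyncio
from typing import AsyncIterator

_DONE = object()  # sentinel marking the end of the stream


async def produce(queue: asyncio.Queue) -> None:
    """Hypothetical agent loop: emits a few events, then the sentinel."""
    for event in ("run_started", "tool_called", "run_completed"):
        await queue.put(event)
        await asyncio.sleep(0)  # yield control so the consumer can interleave
    await queue.put(_DONE)


async def stream() -> AsyncIterator[str]:
    """Async generator that relays events from a background task via a queue."""
    queue: asyncio.Queue = asyncio.Queue()
    producer = asyncio.create_task(produce(queue))
    try:
        while True:
            event = await queue.get()
            if event is _DONE:
                break
            yield event
    finally:
        # Stop the producer if the consumer stops iterating early
        producer.cancel()


async def collect() -> list:
    return [event async for event in stream()]


if __name__ == "__main__":
    print(asyncio.run(collect()))
```

The sentinel lets the consumer distinguish "no event yet" from "stream finished" without polling, and the `finally` block keeps the background task from outliving an abandoned stream.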
Constructor parameters
AsyncAgentRuntime accepts the same parameters as AgentRuntime:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `llm` | `LLM` | required | The LLM adapter to use |
| `prompt` | `str` | required | System prompt |
| `tools` | `list[Tool]` | `[]` | Tools available to the agent |
| `mcps` | `list[MCPServer]` | `[]` | MCP servers to attach |
| `max_iterations` | `int` | `4` | Maximum tool-calling iterations |
| `parallel_tool_execution` | `bool` | `False` | Run tools concurrently |
| `hooks` | `AgentHooks` | `None` | Lifecycle hooks |
| `context_window_tokens` | `int` | `0` | Enable context compaction (0 = disabled) |
| `replan_interval` | `int` | `0` | Re-plan every N iterations (0 = disabled) |
| `retry_policy` | `RetryPolicy` | default | Retry configuration |
| `memory_store` | `MemoryStore` | in-memory | Persistent memory |
| `session_store` | `SessionStore` | in-memory | Session persistence |
| `trace_store` | `TraceStore` | in-memory | Audit logging |
Note
The synchronous Agent class does not have an async mode. Use AsyncAgentRuntime directly in async applications. AsyncAgentRuntime is intentionally a runtime-level primitive rather than a high-level wrapper.