Agent — Streaming¶
agent.stream(prompt) is a generator that yields AgentEvents the
instant they're emitted. There is no buffering: each tool_called
event arrives before the tool runs, each tool_completed arrives the
moment the tool returns, and run_completed is the very last event.
Event types¶
| Type | Emitted when | Useful payload fields |
|---|---|---|
run_started |
The agent receives a user prompt | prompt |
step_started |
The runtime begins an LLM iteration | iteration, tool_count |
reasoning_started |
The model surfaces a thinking block | iteration |
reasoning_completed |
The thinking block is finalised | content |
planning_started |
Auto-planner is invoked | — |
planning_completed |
Planner output is ready | plan |
tool_called |
A tool is about to run | tool_name, arguments |
tool_completed |
A tool returned successfully | tool_name, metadata, output |
tool_failed |
A tool raised | tool_name, error |
interactive_request |
The agent needs the human to answer | question, options |
mcp_attached |
An MCP server has been wired in | server_name |
llm_retry |
The LLM call is being retried | attempt, error |
tool_retry |
A tool call is being retried | attempt, error |
context_snapshot |
Token usage update | usage, compaction_ratio |
rag_sources |
RAG sources captured during the run | sources |
run_completed |
The run is over | output, iterations |
See the Event Types reference for the complete schema.
Minimal example¶
for event in agent.stream("Search the web for SQLite news"):
print(f"[{event.type}] {event.message}")
Coloured terminal renderer¶
RESET = "\033[0m"
DIM = "\033[2m"
BOLD = "\033[1m"
CYAN = "\033[36m"
GREEN = "\033[32m"
YELL = "\033[33m"
for event in agent.stream("Find today's BTC price"):
if event.type == "run_started":
print(BOLD + "🚀 run started" + RESET)
elif event.type == "step_started":
print(DIM + f" · iter {event.payload.get('iteration')}" + RESET)
elif event.type == "reasoning_started":
print(YELL + " 🧠 thinking…" + RESET)
elif event.type == "reasoning_completed":
print(YELL + " 🧠 " + event.payload.get('content', '')[:80] + RESET)
elif event.type == "tool_called":
print(CYAN + " ▶ " + event.message + RESET)
elif event.type == "tool_completed":
print(GREEN + " ✓ " + event.message + RESET)
elif event.type == "rag_sources":
for s in event.payload.get("sources", []):
print(DIM + f" 📎 [{s['index']}] {s['source']}" + RESET)
elif event.type == "run_completed":
print(BOLD + "✅ done" + RESET)
print((event.payload.get('output') or '')[:300])
examples/02_streaming_with_reasoning.py ships a more polished
version of this you can copy verbatim.
Server-Sent Events (SSE)¶
For web UIs, every event has a built-in SSE encoder
(shipit_agent.packets.sse_event_packet):
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from shipit_agent import Agent
from shipit_agent.packets import sse_event_packet, sse_result_packet
app = FastAPI()
agent = Agent.with_builtins(llm=llm)
@app.get("/stream")
async def stream(q: str):
def gen():
for event in agent.stream(q):
yield sse_event_packet(event)
# Final marker — useful for clients that watch for `event: done`
yield "event: done\ndata: {}\n\n"
return StreamingResponse(gen(), media_type="text/event-stream")
The browser side reads it with EventSource("/stream?q=…") and renders
events as they arrive.
WebSocket¶
from shipit_agent.packets import websocket_event_packet
@app.websocket("/ws")
async def ws(websocket):
await websocket.accept()
user_msg = await websocket.receive_text()
for event in agent.stream(user_msg):
await websocket.send_json(websocket_event_packet(event))
websocket_event_packet returns a JSON-friendly dict; pair it with
send_json for a clean transport.
Streaming inside a chat session¶
AgentChatSession.stream mirrors Agent.stream but also persists each
turn to the session store:
session = agent.chat_session(session_id="user-42")
for event in session.stream("Hi, what can you do?"):
print(event.message)
# Next turn — same session, same history
for event in session.stream("Search the web for SQLite news"):
print(event.message)
Subscribe to events programmatically with session.add_event_callback
or session.add_packet_callback if you want a callback API instead of
a generator.
Stopping a stream¶
A for event in agent.stream(...): loop can be exited with break —
the runtime cleans up the background thread automatically. For
explicit cancellation from another thread, raise StopIteration or
close the generator (stream.close()).
See also¶
- Examples — every snippet you can copy
- Event Types reference
- Packets module — SSE / WebSocket helpers
- Streaming guide — deeper background on the runtime