PersistentAgent¶
Agent that checkpoints progress periodically so long-running tasks survive crashes, timeouts, or interruptions. Resume exactly where you left off.
Quick start¶
from shipit_agent.deep import PersistentAgent
agent = PersistentAgent(
llm=llm,
tools=[...],
checkpoint_dir="./checkpoints",
checkpoint_interval=5,
max_steps=50,
)
# Start a long task
result = agent.run("Write a comprehensive report", agent_id="report-1")
# If interrupted, resume from last checkpoint
result = agent.resume(agent_id="report-1")
# Check progress without resuming
status = agent.status("report-1")
# {"state": "paused", "steps_done": 20, "outputs_count": 20}
Parameters¶
| Parameter | Default | Description |
|---|---|---|
checkpoint_dir |
.shipit_checkpoints |
Where to save checkpoints |
checkpoint_interval |
5 |
Save every N steps |
max_steps |
50 |
Maximum total steps |
rag |
None |
Optional RAG instance forwarded to every inner Agent |
Streaming with checkpoints¶
PersistentAgent.stream(task, agent_id=...) yields one event flow per
step plus a final run_completed event. Checkpoints are still written
every checkpoint_interval steps so the run can be resumed if
interrupted mid-stream.
agent = PersistentAgent(
llm=llm,
rag=rag,
checkpoint_interval=2,
max_steps=10,
)
for event in agent.stream("Long research task", agent_id="task-42"):
if event.type == "step_started":
print(f"📍 step {event.payload['step']}")
elif event.type == "tool_called":
print(f" ▶ {event.message}")
elif event.type == "rag_sources":
for s in event.payload.get("sources", []):
print(f" 📎 [{s['index']}] {s['source']}")
elif event.type == "run_completed":
print(f"✓ {event.message} ({event.payload.get('steps')} steps)")
With Super RAG¶
from shipit_agent.rag import RAG, HashingEmbedder
rag = RAG.default(embedder=HashingEmbedder(dimension=512))
rag.index_file("docs/research_brief.md")
agent = PersistentAgent(llm=llm, rag=rag, checkpoint_interval=3, max_steps=20)
result = agent.run("Long research task", agent_id="task-1")
print("sources cited across the run:")
for src in result.rag_sources:
print(f" [{src.index}] {src.source}")