Checkpointing & Persistence
Save and resume agent execution state with Wesichain’s checkpointing system.
Overview
Checkpointing enables:
- Resumable workflows: Pick up where you left off after crashes or restarts
- Long-running agents: Persist state between interactions
- Debugging: Inspect and replay agent execution
- Human-in-the-loop: Pause and resume for human review
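To make the idea concrete, here is a minimal, self-contained sketch of what a checkpoint store does conceptually: save a snapshot per thread after every step, then start from the latest snapshot on the next run. It deliberately does not use Wesichain's types; `ToyCheckpoint`, `ToyStore`, and `run_steps` are hypothetical names invented for this illustration.

```rust
use std::collections::HashMap;

/// Hypothetical snapshot: which step finished and the state at that point.
#[derive(Clone, Debug, PartialEq)]
struct ToyCheckpoint {
    step: u32,
    state: Vec<String>,
}

/// Hypothetical in-memory store keyed by thread ID (not Wesichain's API).
#[derive(Default)]
struct ToyStore {
    by_thread: HashMap<String, ToyCheckpoint>,
}

impl ToyStore {
    /// Overwrites the latest checkpoint for this thread.
    fn save(&mut self, thread_id: &str, checkpoint: ToyCheckpoint) {
        self.by_thread.insert(thread_id.to_string(), checkpoint);
    }

    /// Returns a copy of the latest checkpoint, if the thread has one.
    fn load(&self, thread_id: &str) -> Option<ToyCheckpoint> {
        self.by_thread.get(thread_id).cloned()
    }
}

/// Runs until `target_step` steps have completed, saving after each one.
/// Starts from the saved checkpoint if one exists, otherwise from step 0.
fn run_steps(store: &mut ToyStore, thread_id: &str, target_step: u32) -> ToyCheckpoint {
    let mut current = store.load(thread_id).unwrap_or(ToyCheckpoint {
        step: 0,
        state: Vec::new(),
    });
    while current.step < target_step {
        current.step += 1;
        current.state.push(format!("step {} done", current.step));
        store.save(thread_id, current.clone());
    }
    current
}
```

Calling `run_steps` twice with the same thread ID and a larger target the second time continues from the saved step instead of starting over, which is the behavior the features above rely on.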
Checkpoint Implementations
| Backend | Crate | Use Case |
|---|---|---|
| In-Memory | wesichain-graph | Development, testing |
| SQLite | wesichain-checkpoint-sqlite | Local apps, single-node |
| PostgreSQL | wesichain-checkpoint-postgres | Production, distributed |
Quick Start
1. Add Dependency
[dependencies]
wesichain-checkpoint-sqlite = "0.2.1"
2. Initialize Checkpointer
use wesichain_checkpoint_sqlite::SqliteCheckpointer;

let checkpointer = SqliteCheckpointer::builder("sqlite://./checkpoints.db")
    .max_connections(5)
    .build()
    .await?;
3. Attach to Graph
use wesichain_graph::GraphBuilder;

// `checkpointer` is the SqliteCheckpointer from step 2; `agent_node` is your own node.
let graph = GraphBuilder::new()
    .add_node("agent", agent_node)
    .set_entry("agent")
    .with_checkpointer(checkpointer.clone(), "thread-123")
    .build();
Example: Persistent Conversation
// Run: cargo run -p wesichain-graph --example persistent_conversation
use async_trait::async_trait;
use futures::stream::{self, StreamExt};
use serde::{Deserialize, Serialize};
use wesichain_core::{Runnable, StreamEvent, WesichainError};
use wesichain_graph::{
    Checkpointer, GraphBuilder, GraphError, GraphState, InMemoryCheckpointer, StateSchema,
    StateUpdate,
};

#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq)]
struct ConversationState {
    messages: Vec<String>,
    turn: u32,
}

impl StateSchema for ConversationState {
    type Update = Self;

    fn apply(current: &Self, update: Self::Update) -> Self {
        let mut messages = current.messages.clone();
        messages.extend(update.messages);
        let turn = if update.turn == 0 {
            current.turn
        } else {
            update.turn
        };
        Self { messages, turn }
    }
}

struct Reply;

#[async_trait]
impl Runnable<GraphState<ConversationState>, StateUpdate<ConversationState>> for Reply {
    async fn invoke(
        &self,
        input: GraphState<ConversationState>,
    ) -> Result<StateUpdate<ConversationState>, WesichainError> {
        let next_turn = input.data.turn + 1;
        let last = input.data.messages.last().map(String::as_str).unwrap_or("");
        let reply = format!("Turn {next_turn}: got '{last}'");
        Ok(StateUpdate::new(ConversationState {
            messages: vec![reply],
            turn: next_turn,
        }))
    }

    fn stream(
        &self,
        _input: GraphState<ConversationState>,
    ) -> futures::stream::BoxStream<'_, Result<StreamEvent, WesichainError>> {
        stream::empty().boxed()
    }
}

#[tokio::main]
async fn main() -> Result<(), GraphError> {
    let checkpointer = InMemoryCheckpointer::default();
    let graph = GraphBuilder::new()
        .add_node("reply", Reply)
        .set_entry("reply")
        .with_checkpointer(checkpointer.clone(), "thread-1")
        .build();

    // First interaction
    let state = GraphState::new(ConversationState {
        messages: vec!["Hello".to_string()],
        turn: 0,
    });
    let out = graph.invoke_graph(state).await?;
    println!("After first turn: {}", out.data.messages.last().unwrap());

    // Load checkpoint and resume
    let checkpoint = checkpointer.load("thread-1").await?.expect("checkpoint");
    println!("Resuming from step {}", checkpoint.step);
    let resumed = graph.invoke_graph(checkpoint.state).await?;
    println!("After resume: {}", resumed.data.messages.last().unwrap());

    Ok(())
}
Checkpoint Operations
Save
Checkpoints are saved automatically during graph execution when:
- A node completes successfully
- An interrupt is triggered
- Explicitly requested via the CheckpointSaver trait
Load
// Load checkpoint by thread ID
let checkpoint = checkpointer.load("thread-123").await?;
if let Some(cp) = checkpoint {
    println!("Resuming from step {} at node {}", cp.step, cp.node);
    let state = cp.state; // GraphState<S>
}
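The snippet above only covers the case where a checkpoint exists. A minimal sketch of handling both branches follows, assuming you want a fresh default state when nothing has been saved yet. `LoadedCheckpoint` and `state_or_default` are hypothetical stand-ins written for this example, not Wesichain types.

```rust
/// Hypothetical stand-in for a loaded checkpoint (not Wesichain's type).
#[derive(Clone, Debug, PartialEq)]
struct LoadedCheckpoint<S> {
    step: u64,
    state: S,
}

/// Returns the saved step and state if a checkpoint exists,
/// otherwise step 0 with a default-constructed state.
fn state_or_default<S: Default>(loaded: Option<LoadedCheckpoint<S>>) -> (u64, S) {
    match loaded {
        Some(cp) => (cp.step, cp.state),
        None => (0, S::default()),
    }
}
```

Keeping the fallback in one function means every call site treats a missing checkpoint the same way, rather than each one deciding separately whether to unwrap or start over.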
List
// List all saved checkpoints
let checkpoints = checkpointer.list().await?;
for cp in checkpoints {
    println!("Thread: {}, Step: {}, Node: {}",
        cp.thread_id, cp.step, cp.node);
}
Delete
// Delete specific checkpoint
checkpointer.delete("thread-123").await?;
// Delete all checkpoints (use with caution!)
checkpointer.clear().await?;
State Merging
When resuming, state updates are merged using your StateSchema::apply implementation:
impl StateSchema for MyState {
    type Update = Self;

    fn apply(current: &Self, update: Self::Update) -> Self {
        // Merge logic determines how updates combine
        Self {
            // Messages accumulate
            messages: {
                let mut msgs = current.messages.clone();
                msgs.extend(update.messages);
                msgs
            },
            // Answer replaces if present
            answer: update.answer.or_else(|| current.answer.clone()),
            // Other fields...
            ..current.clone()
        }
    }
}
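The effect of these merge rules is easiest to see with concrete values. The sketch below repeats the same two rules (messages accumulate, `answer` is replaced only when the update carries one) as a free function, so it compiles without the `StateSchema` trait. The two-field `MyState` here is an assumed shape for illustration.

```rust
/// Assumed shape of the state; a real state will likely have more fields.
#[derive(Clone, Debug, Default, PartialEq)]
struct MyState {
    messages: Vec<String>,
    answer: Option<String>,
}

/// Same merge rules as the trait implementation above, as a plain function.
fn apply(current: &MyState, update: MyState) -> MyState {
    // Messages accumulate: existing ones first, then the update's.
    let mut messages = current.messages.clone();
    messages.extend(update.messages);
    MyState {
        messages,
        // Answer replaces if present, otherwise the current one is kept.
        answer: update.answer.or_else(|| current.answer.clone()),
    }
}
```

With a current state holding one message and `answer: Some("old")`, an update with `answer: None` keeps `"old"` and appends its messages, while an update with `answer: Some("new")` overwrites it. Note that with this scheme an update cannot clear `answer` back to `None`; if you need that, the update type has to distinguish "unchanged" from "cleared".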
SQLite Configuration
Basic Setup
use wesichain_checkpoint_sqlite::SqliteCheckpointer;

let checkpointer = SqliteCheckpointer::builder("sqlite://./app.db")
    .build()
    .await?;
With Connection Pool
let checkpointer = SqliteCheckpointer::builder("sqlite://./app.db")
    .max_connections(10)
    .min_connections(2)
    .build()
    .await?;
In-Memory (Testing)
let checkpointer = SqliteCheckpointer::builder("sqlite::memory:")
    .build()
    .await?;
PostgreSQL Configuration
use wesichain_checkpoint_postgres::PostgresCheckpointer;

let checkpointer = PostgresCheckpointer::builder(
    "postgres://user:pass@localhost/wesichain"
)
    .max_connections(20)
    .build()
    .await?;
Thread ID Patterns
Use consistent thread ID patterns for different use cases:
// Per-user sessions
format!("user-{}", user_id)
// Per-conversation
format!("conv-{}", conversation_uuid)
// Per-workflow run
format!("run-{}", uuid::Uuid::new_v4())
// Time-bucketed (for cleanup)
format!("session-{}-{}", date, user_id)
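One way to keep these patterns consistent across a codebase is to build every thread ID in a single place. The sketch below is one possible approach; `ThreadScope` and its variants are hypothetical names, and the ID types (`u64`, `&str`) are assumptions you would replace with your own.

```rust
/// Hypothetical helper enum; the variants mirror the patterns listed above.
enum ThreadScope<'a> {
    User { user_id: u64 },
    Conversation { conversation_id: &'a str },
    DailySession { date: &'a str, user_id: u64 },
}

impl ThreadScope<'_> {
    /// Formats the thread ID string for this scope.
    fn thread_id(&self) -> String {
        match self {
            ThreadScope::User { user_id } => format!("user-{user_id}"),
            ThreadScope::Conversation { conversation_id } => {
                format!("conv-{conversation_id}")
            }
            ThreadScope::DailySession { date, user_id } => {
                format!("session-{date}-{user_id}")
            }
        }
    }
}
```

Because the prefix is fixed per variant, a cleanup job can select a whole category of threads by prefix, for example everything starting with `session-2024-01-`.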
Cleanup Strategies
Automatic TTL
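// Implement cleanup as a method on your checkpointer wrapper
use chrono::{Duration, Utc};

async fn cleanup_old_checkpoints(&self, days: i64) -> Result<()> {
    let cutoff = Utc::now() - Duration::days(days);
    // Delete checkpoints older than `cutoff`; the query depends on your backend
    todo!("delete checkpoints saved before {cutoff}")
}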
Scheduled Cleanup
// Run cleanup every 24 hours (the first tick completes immediately)
tokio::spawn(async move {
    let mut interval = tokio::time::interval(std::time::Duration::from_secs(24 * 60 * 60));
    loop {
        interval.tick().await;
        checkpointer.cleanup_old_checkpoints(30).await.ok(); // Keep 30 days
    }
});
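The cutoff logic itself can be sketched without any database. The example below assumes each checkpoint records when it was saved and applies the TTL rule to an in-memory map. `StoredCheckpoint` and `cleanup_older_than` are hypothetical names; a SQLite or PostgreSQL backend would express the same rule as a `DELETE` with a timestamp condition.

```rust
use std::collections::HashMap;
use std::time::{Duration, SystemTime};

/// Hypothetical record: only the save time matters for this sketch.
struct StoredCheckpoint {
    saved_at: SystemTime,
}

/// Removes every checkpoint saved before `now - max_age`
/// and returns how many entries were dropped.
fn cleanup_older_than(
    checkpoints: &mut HashMap<String, StoredCheckpoint>,
    now: SystemTime,
    max_age: Duration,
) -> usize {
    let before = checkpoints.len();
    // If the subtraction is not representable, keep everything.
    if let Some(cutoff) = now.checked_sub(max_age) {
        checkpoints.retain(|_, cp| cp.saved_at >= cutoff);
    }
    before - checkpoints.len()
}
```

Passing `now` as a parameter rather than calling `SystemTime::now()` inside the function keeps the rule deterministic and easy to test.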
Best Practices
- Use descriptive thread IDs - Include user/conversation context
- Handle missing checkpoints - Always check Option<Checkpoint>
- Implement proper merge - Ensure state updates combine correctly
- Consider state size - Large states impact checkpoint performance
- Clean up old checkpoints - Prevent unbounded storage growth