Wesichain

Checkpointing & Persistence

Save and resume agent execution state with Wesichain’s checkpointing system.

Overview

Checkpointing enables:

  • Resumable workflows: Pick up where you left off after crashes or restarts
  • Long-running agents: Persist state between interactions
  • Debugging: Inspect and replay agent execution
  • Human-in-the-loop: Pause and resume for human review

Checkpoint Implementations

Backend    | Crate                         | Use case
-----------|-------------------------------|-------------------------
In-Memory  | wesichain-graph               | Development, testing
SQLite     | wesichain-checkpoint-sqlite   | Local apps, single-node
PostgreSQL | wesichain-checkpoint-postgres | Production, distributed
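To make the backend a deployment setting, an application can choose it from its connection string. The sketch below is hypothetical: `Backend` and `backend_for` are illustrative names, not part of any Wesichain crate, and the URL prefixes only mirror the ones used on this page. It decides which backend to use. You still build the checkpointer itself with the crate-specific builders.

```rust
/// Hypothetical enum naming the three backends from the table above.
#[derive(Debug, PartialEq)]
enum Backend {
    InMemory,
    Sqlite,
    Postgres,
}

/// Hypothetical helper: maps a connection string to a backend.
/// "memory" (or an empty string) selects the in-memory checkpointer.
fn backend_for(url: &str) -> Option<Backend> {
    if url.is_empty() || url == "memory" {
        Some(Backend::InMemory)
    } else if url.starts_with("sqlite:") {
        // Covers both file URLs and "sqlite::memory:"
        Some(Backend::Sqlite)
    } else if url.starts_with("postgres://") || url.starts_with("postgresql://") {
        Some(Backend::Postgres)
    } else {
        None
    }
}

fn main() {
    for url in [
        "memory",
        "sqlite://./checkpoints.db",
        "postgres://user:pass@localhost/wesichain",
        "mysql://localhost/db",
    ] {
        println!("{url} -> {:?}", backend_for(url));
    }
}
```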

Quick Start

1. Add Dependency

[dependencies]
wesichain-checkpoint-sqlite = "0.2.1"

2. Initialize Checkpointer

use wesichain_checkpoint_sqlite::SqliteCheckpointer;

let checkpointer = SqliteCheckpointer::builder("sqlite://./checkpoints.db")
    .max_connections(5)
    .build()
    .await?;

3. Attach to Graph

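use wesichain_graph::GraphBuilder;

// `agent_node` is your own node (a Runnable over GraphState/StateUpdate,
// like `Reply` in the example below); `checkpointer` comes from step 2.
let graph = GraphBuilder::new()
    .add_node("agent", agent_node)
    .set_entry("agent")
    .with_checkpointer(checkpointer.clone(), "thread-123")
    .build();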

Example: Persistent Conversation

// Run: cargo run -p wesichain-graph --example persistent_conversation

use async_trait::async_trait;
use futures::stream::{self, StreamExt};
use serde::{Deserialize, Serialize};
use wesichain_core::{Runnable, StreamEvent, WesichainError};
use wesichain_graph::{
    Checkpointer, GraphBuilder, GraphError, GraphState, InMemoryCheckpointer, StateSchema,
    StateUpdate,
};

#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq)]
struct ConversationState {
    messages: Vec<String>,
    turn: u32,
}

impl StateSchema for ConversationState {
    type Update = Self;

    fn apply(current: &Self, update: Self::Update) -> Self {
        let mut messages = current.messages.clone();
        messages.extend(update.messages);
        let turn = if update.turn == 0 {
            current.turn
        } else {
            update.turn
        };
        Self { messages, turn }
    }
}

struct Reply;

#[async_trait]
impl Runnable<GraphState<ConversationState>, StateUpdate<ConversationState>> for Reply {
    async fn invoke(
        &self,
        input: GraphState<ConversationState>,
    ) -> Result<StateUpdate<ConversationState>, WesichainError> {
        let next_turn = input.data.turn + 1;
        let last = input.data.messages.last().map(String::as_str).unwrap_or("");
        let reply = format!("Turn {next_turn}: got '{last}'");
        Ok(StateUpdate::new(ConversationState {
            messages: vec![reply],
            turn: next_turn,
        }))
    }

    fn stream(
        &self,
        _input: GraphState<ConversationState>,
    ) -> futures::stream::BoxStream<'_, Result<StreamEvent, WesichainError>> {
        stream::empty().boxed()
    }
}

#[tokio::main]
async fn main() -> Result<(), GraphError> {
    let checkpointer = InMemoryCheckpointer::default();

    let graph = GraphBuilder::new()
        .add_node("reply", Reply)
        .set_entry("reply")
        .with_checkpointer(checkpointer.clone(), "thread-1")
        .build();

    // First interaction
    let state = GraphState::new(ConversationState {
        messages: vec!["Hello".to_string()],
        turn: 0,
    });

    let out = graph.invoke_graph(state).await?;
    println!("After first turn: {}", out.data.messages.last().unwrap());

    // Load checkpoint and resume
    let checkpoint = checkpointer.load("thread-1").await?.expect("checkpoint");
    println!("Resuming from step {}", checkpoint.step);

    let resumed = graph.invoke_graph(checkpoint.state).await?;
    println!("After resume: {}", resumed.data.messages.last().unwrap());

    Ok(())
}

Checkpoint Operations

Save

Checkpoints are saved automatically during graph execution when:

  • A node completes successfully
  • An interrupt is triggered

A checkpoint can also be saved explicitly through the Checkpointer trait.
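The save-after-success behavior can be pictured with a toy runner that uses only the standard library. `Checkpoint`, `ToyStore`, `Node`, and `run` are hypothetical stand-ins, not Wesichain types, and interrupts are not modeled. It shows the property that makes resuming useful: when a node fails, the store still holds the checkpoint written after the last node that succeeded.

```rust
use std::collections::HashMap;

/// Hypothetical checkpoint record; the fields mirror the ones this page
/// reads from real checkpoints (thread_id, step, node, state).
#[derive(Clone, Debug)]
struct Checkpoint {
    thread_id: String,
    step: u32,
    node: String,
    state: Vec<String>,
}

/// Toy store that keeps only the latest checkpoint per thread.
#[derive(Default)]
struct ToyStore {
    latest: HashMap<String, Checkpoint>,
}

impl ToyStore {
    fn save(&mut self, cp: Checkpoint) {
        self.latest.insert(cp.thread_id.clone(), cp);
    }
}

/// A node mutates the state and may fail.
type Node = fn(&mut Vec<String>) -> Result<(), String>;

/// Runs `nodes` in order, saving a checkpoint after each one succeeds.
/// If a node fails, the run stops and the last good checkpoint remains.
fn run(
    store: &mut ToyStore,
    thread_id: &str,
    nodes: &[(&str, Node)],
    mut state: Vec<String>,
) -> Result<Vec<String>, String> {
    for (i, (name, node)) in nodes.iter().enumerate() {
        node(&mut state)?;
        store.save(Checkpoint {
            thread_id: thread_id.to_string(),
            step: i as u32 + 1,
            node: name.to_string(),
            state: state.clone(),
        });
    }
    Ok(state)
}

fn greet(state: &mut Vec<String>) -> Result<(), String> {
    state.push("hello".to_string());
    Ok(())
}

fn crash(_state: &mut Vec<String>) -> Result<(), String> {
    Err("node crashed".to_string())
}

fn main() {
    let mut store = ToyStore::default();
    let nodes: [(&str, Node); 2] = [("greet", greet), ("crash", crash)];
    let result = run(&mut store, "thread-1", &nodes, Vec::new());
    let cp = &store.latest["thread-1"];
    println!(
        "failed: {}; resume from step {} at node {} with {} message(s)",
        result.is_err(),
        cp.step,
        cp.node,
        cp.state.len()
    );
}
```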

Load

// Load checkpoint by thread ID
let checkpoint = checkpointer.load("thread-123").await?;

if let Some(cp) = checkpoint {
    println!("Resuming from step {} at node {}", cp.step, cp.node);
    let state = cp.state; // GraphState<S>
}
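Because `load` returns an `Option`, callers need a fallback for threads that were never saved or have since been deleted. Here is a minimal resume-or-start sketch, assuming a hypothetical `Checkpoint` struct. A plain closure stands in for the real checkpointer's async `load`, so this is an illustration of the control flow rather than Wesichain's API.

```rust
use std::collections::HashMap;

#[derive(Clone, Debug, Default, PartialEq)]
struct ConversationState {
    messages: Vec<String>,
    turn: u32,
}

/// Hypothetical checkpoint shape with the fields used above.
#[derive(Clone, Debug)]
struct Checkpoint {
    step: u32,
    node: String,
    state: ConversationState,
}

/// Resumes from a saved checkpoint if one exists, otherwise starts fresh.
/// `load` stands in for the checkpointer's `load`, which yields an Option.
fn resume_or_start(
    load: impl Fn(&str) -> Option<Checkpoint>,
    thread_id: &str,
    first_message: &str,
) -> ConversationState {
    match load(thread_id) {
        Some(cp) => {
            println!("Resuming from step {} at node {}", cp.step, cp.node);
            cp.state
        }
        // No checkpoint: a new thread, or one that was deleted or expired.
        None => ConversationState {
            messages: vec![first_message.to_string()],
            turn: 0,
        },
    }
}

fn main() {
    let mut saved = HashMap::new();
    saved.insert(
        "thread-123".to_string(),
        Checkpoint {
            step: 1,
            node: "reply".to_string(),
            state: ConversationState {
                messages: vec!["Hello".to_string(), "Turn 1: got 'Hello'".to_string()],
                turn: 1,
            },
        },
    );
    let load = |id: &str| saved.get(id).cloned();

    let resumed = resume_or_start(&load, "thread-123", "Hello");
    let fresh = resume_or_start(&load, "thread-456", "Hi");
    println!("resumed turn {}, fresh turn {}", resumed.turn, fresh.turn);
}
```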

List

// List all saved checkpoints
let checkpoints = checkpointer.list().await?;
for cp in checkpoints {
    println!("Thread: {}, Step: {}, Node: {}",
        cp.thread_id, cp.step, cp.node);
}

Delete

// Delete specific checkpoint
checkpointer.delete("thread-123").await?;

// Delete all checkpoints (use with caution!)
checkpointer.clear().await?;
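The list, delete and clear operations can be modeled with a small in-memory map. This is a hypothetical sketch: it keeps only the latest checkpoint per thread and has `delete` report whether anything was removed. The real backends may keep history or handle a missing thread differently.

```rust
use std::collections::BTreeMap;

/// Hypothetical summary row, mirroring the fields printed by `list()` above.
#[derive(Clone, Debug, PartialEq)]
struct CheckpointInfo {
    thread_id: String,
    step: u32,
    node: String,
}

/// Toy store keyed by thread ID; BTreeMap keeps `list()` output sorted.
#[derive(Default)]
struct ToyStore {
    threads: BTreeMap<String, CheckpointInfo>,
}

impl ToyStore {
    fn save(&mut self, thread_id: &str, step: u32, node: &str) {
        let info = CheckpointInfo {
            thread_id: thread_id.to_string(),
            step,
            node: node.to_string(),
        };
        self.threads.insert(thread_id.to_string(), info);
    }

    fn list(&self) -> Vec<CheckpointInfo> {
        self.threads.values().cloned().collect()
    }

    /// Returns true if a checkpoint existed for `thread_id`.
    fn delete(&mut self, thread_id: &str) -> bool {
        self.threads.remove(thread_id).is_some()
    }

    /// Removes every thread's checkpoint.
    fn clear(&mut self) {
        self.threads.clear();
    }
}

fn main() {
    let mut store = ToyStore::default();
    store.save("conv-a", 4, "agent");
    store.save("conv-b", 2, "tools");
    store.delete("conv-a");
    for cp in store.list() {
        println!("Thread: {}, Step: {}, Node: {}", cp.thread_id, cp.step, cp.node);
    }
    store.clear();
    println!("after clear: {} checkpoint(s)", store.list().len());
}
```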

State Merging

Each update a node returns is merged into the current state by your StateSchema::apply implementation, and the same rules apply when a run resumes from a checkpoint:

impl StateSchema for MyState {
    type Update = Self;

    fn apply(current: &Self, update: Self::Update) -> Self {
        // Merge logic determines how updates combine
        Self {
            // Messages accumulate
            messages: {
                let mut msgs = current.messages.clone();
                msgs.extend(update.messages);
                msgs
            },
            // Answer replaces if present
            answer: update.answer.or_else(|| current.answer.clone()),
            // Other fields...
            ..current.clone()
        }
    }
}
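Here is a self-contained version of the merge above. It assumes a concrete `MyState` with a hypothetical extra field `step_count`, and it uses a local stand-in for the `StateSchema` trait so that it compiles without Wesichain. It shows three behaviors: messages accumulate, `answer` is replaced only when the update carries one, and fields filled by `..current.clone()` ignore the update entirely.

```rust
/// Local stand-in for wesichain_graph's StateSchema, reduced to the
/// parts used here so the sketch compiles on its own.
trait StateSchema: Sized {
    type Update;
    fn apply(current: &Self, update: Self::Update) -> Self;
}

#[derive(Clone, Debug, Default, PartialEq)]
struct MyState {
    messages: Vec<String>,
    answer: Option<String>,
    step_count: u32, // hypothetical "other field"
}

impl StateSchema for MyState {
    type Update = Self;

    fn apply(current: &Self, update: Self::Update) -> Self {
        let mut messages = current.messages.clone();
        messages.extend(update.messages); // messages accumulate
        Self {
            messages,
            // answer replaces the current one only if the update has one
            answer: update.answer.or_else(|| current.answer.clone()),
            // every remaining field keeps its current value
            ..current.clone()
        }
    }
}

fn main() {
    let start = MyState {
        messages: vec!["Hi".into()],
        answer: Some("draft".into()),
        step_count: 3,
    };
    // No answer in the update: the current answer is kept.
    let a = MyState::apply(&start, MyState { messages: vec!["More".into()], ..Default::default() });
    // An answer and a step_count in the update: only the answer changes.
    let b = MyState::apply(&a, MyState { answer: Some("final".into()), step_count: 99, ..Default::default() });
    println!("{b:?}");
    // prints: MyState { messages: ["Hi", "More"], answer: Some("final"), step_count: 3 }
}
```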

SQLite Configuration

Basic Setup

use wesichain_checkpoint_sqlite::SqliteCheckpointer;

let checkpointer = SqliteCheckpointer::builder("sqlite://./app.db")
    .build()
    .await?;

With Connection Pool

let checkpointer = SqliteCheckpointer::builder("sqlite://./app.db")
    .max_connections(10)
    .min_connections(2)
    .build()
    .await?;

In-Memory (Testing)

let checkpointer = SqliteCheckpointer::builder("sqlite::memory:")
    .build()
    .await?;

PostgreSQL Configuration

use wesichain_checkpoint_postgres::PostgresCheckpointer;

let checkpointer = PostgresCheckpointer::builder(
    "postgres://user:pass@localhost/wesichain"
)
.max_connections(20)
.build()
.await?;

Thread ID Patterns

Use consistent thread ID patterns for different use cases:

// Per-user sessions
format!("user-{}", user_id)

// Per-conversation
format!("conv-{}", conversation_uuid)

// Per-workflow run
format!("run-{}", uuid::Uuid::new_v4())

// Time-bucketed (for cleanup)
format!("session-{}-{}", date, user_id)
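Small helpers keep these IDs consistent across a codebase. Time bucketing also pays off when you clean up: if the date is an ISO `YYYY-MM-DD` string, you can parse the bucket back out and compare it as plain text. The ISO date format is an assumption, since the pattern above does not fix one. All function names here are hypothetical.

```rust
/// Hypothetical helpers for the thread-ID patterns above.
fn user_thread(user_id: u64) -> String {
    format!("user-{user_id}")
}

fn conversation_thread(conversation_id: &str) -> String {
    format!("conv-{conversation_id}")
}

/// Time-bucketed ID; `date` is assumed to be an ISO YYYY-MM-DD string.
fn session_thread(date: &str, user_id: u64) -> String {
    format!("session-{date}-{user_id}")
}

/// Extracts the date bucket from a "session-YYYY-MM-DD-<user>" ID.
fn session_date(thread_id: &str) -> Option<&str> {
    let rest = thread_id.strip_prefix("session-")?;
    // The date occupies the first 10 bytes and must be followed by '-'.
    if rest.as_bytes().get(10) != Some(&b'-') {
        return None;
    }
    rest.get(..10)
}

/// Zero-padded ISO dates sort chronologically as plain strings,
/// so "older than the cutoff" is a string comparison.
fn is_older_than(thread_id: &str, cutoff: &str) -> bool {
    session_date(thread_id).is_some_and(|date| date < cutoff)
}

fn main() {
    let ids = [
        session_thread("2024-01-05", 42),
        session_thread("2024-03-01", 42),
        user_thread(42),
        conversation_thread("7f3c"),
    ];
    for id in &ids {
        println!("{id}: older than cutoff = {}", is_older_than(id, "2024-02-01"));
    }
}
```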

Cleanup Strategies

Automatic TTL

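// Implement cleanup in a wrapper around your checkpointer
// (assumes chrono::{Duration, Utc} and an anyhow-style Result are in scope)
async fn cleanup_old_checkpoints(&self, days: i64) -> Result<()> {
    let cutoff = Utc::now() - Duration::days(days);
    // Delete checkpoints saved before `cutoff` using your backend's own queries
    Ok(())
}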
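At its core, a TTL pass filters checkpoints by when they were saved. Here is a std-only sketch, assuming a hypothetical `SavedCheckpoint` record with a `saved_at` timestamp. The checkpoint fields shown on this page do not include one, so a real wrapper would have to track it itself or read it from the backend. The sketch passes `now` in as a parameter, which keeps the logic deterministic in tests.

```rust
use std::collections::HashMap;
use std::time::{Duration, SystemTime};

/// Hypothetical record: only the save time matters for TTL cleanup.
struct SavedCheckpoint {
    saved_at: SystemTime,
}

const DAY: Duration = Duration::from_secs(24 * 60 * 60);

/// Removes checkpoints saved more than `ttl` before `now` and returns
/// how many were deleted.
fn cleanup_older_than(
    store: &mut HashMap<String, SavedCheckpoint>,
    ttl: Duration,
    now: SystemTime,
) -> usize {
    let before = store.len();
    store.retain(|_, cp| match now.duration_since(cp.saved_at) {
        Ok(age) => age <= ttl,
        // Saved "in the future" (clock skew): keep it rather than delete it.
        Err(_) => true,
    });
    before - store.len()
}

fn main() {
    let now = SystemTime::now();
    let mut store = HashMap::new();
    store.insert("old".to_string(), SavedCheckpoint { saved_at: now - DAY * 40 });
    store.insert("recent".to_string(), SavedCheckpoint { saved_at: now - DAY * 2 });

    let removed = cleanup_older_than(&mut store, DAY * 30, now);
    println!("removed {removed}, kept {:?}", store.keys().collect::<Vec<_>>());
}
```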

Scheduled Cleanup

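// Run cleanup periodically (`checkpointer` must be owned by the task,
// e.g. a clone or an Arc of the wrapper that defines cleanup_old_checkpoints)
tokio::spawn(async move {
    // tokio's interval takes a std::time::Duration; the first tick fires immediately
    let mut interval = tokio::time::interval(std::time::Duration::from_secs(24 * 60 * 60));
    loop {
        interval.tick().await;
        // Keep 30 days; log failures instead of silently discarding them
        if let Err(err) = checkpointer.cleanup_old_checkpoints(30).await {
            eprintln!("checkpoint cleanup failed: {err:?}");
        }
    }
});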

Best Practices

  1. Use descriptive thread IDs - Include user/conversation context
  2. Handle missing checkpoints - Always check Option<Checkpoint>
  3. Implement proper merge - Ensure state updates combine correctly
  4. Consider state size - Large states impact checkpoint performance
  5. Clean up old checkpoints - Prevent unbounded storage growth
