Human-in-the-Loop
Implement human review and approval checkpoints in your agent workflows using interrupt capabilities.
Overview
Wesichain graphs support pausing execution at specific nodes to allow human review. This is useful for:
- Approval workflows: Require human sign-off before critical actions
- Review loops: Inspect and modify agent outputs before continuation
- Debugging: Pause to inspect state during development
Basic Pattern
The human-in-the-loop pattern uses:
- with_interrupt_before() or with_interrupt_after() to pause execution
- Checkpointer to save and resume state
- Graph rebuilding and state restoration for continuation
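The three pieces above can be modelled without the library to see how they fit together. The following is a minimal sketch using only the standard library; `ToyCheckpoint`, `RunOutcome`, and `run` are hypothetical names invented for illustration and are not part of the Wesichain API. It pauses before a named node, stores the position and state under a thread ID, and later resumes from that stored state with no interrupts configured.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a checkpoint: the node to run next and the state so far.
#[derive(Clone, Debug, PartialEq)]
struct ToyCheckpoint {
    next_node: usize,
    value: i32,
}

/// Outcome of one run: finished, or paused before a node.
#[derive(Debug, PartialEq)]
enum RunOutcome {
    Completed(i32),
    Interrupted,
}

/// Run `nodes` in order from index `start`. If a node is listed in
/// `interrupt_before`, save a checkpoint under `thread_id` and stop.
fn run(
    nodes: &[(&str, fn(i32) -> i32)],
    start: usize,
    mut value: i32,
    interrupt_before: &[&str],
    store: &mut HashMap<String, ToyCheckpoint>,
    thread_id: &str,
) -> RunOutcome {
    for (index, &(name, step)) in nodes.iter().enumerate().skip(start) {
        if interrupt_before.iter().any(|n| *n == name) {
            store.insert(thread_id.to_string(), ToyCheckpoint { next_node: index, value });
            return RunOutcome::Interrupted;
        }
        value = step(value);
    }
    RunOutcome::Completed(value)
}

fn prepare(v: i32) -> i32 {
    v + 1
}

fn review(v: i32) -> i32 {
    v * 10
}

/// First run pauses before "review"; second run resumes from the checkpoint.
fn demo() -> (RunOutcome, RunOutcome) {
    let nodes = [
        ("prepare", prepare as fn(i32) -> i32),
        ("review", review as fn(i32) -> i32),
    ];
    let mut store = HashMap::new();
    let first = run(&nodes, 0, 0, &["review"], &mut store, "thread-42");
    let cp = store.get("thread-42").cloned().expect("checkpoint saved on interrupt");
    // Resume with no interrupts configured so the run can continue.
    let second = run(&nodes, cp.next_node, cp.value, &[], &mut store, "thread-42");
    (first, second)
}

fn main() {
    let (first, second) = demo();
    println!("first run: {:?}, resumed run: {:?}", first, second);
}
```

In this toy model the resumed run starts at the stored node index. How Wesichain itself decides where a restored state re-enters the graph is not covered by this sketch.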
Example: Review Workflow
use async_trait::async_trait;
use futures::stream::{self, StreamExt};
use serde::{Deserialize, Serialize};
use wesichain_core::{Runnable, StreamEvent, WesichainError};
use wesichain_graph::{
    Checkpointer, GraphBuilder, GraphError, GraphState, InMemoryCheckpointer, StateSchema,
    StateUpdate,
};

#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq)]
struct ReviewState {
    value: i32,
    approved: bool,
}

impl StateSchema for ReviewState {
    type Update = Self;

    fn apply(current: &Self, update: Self::Update) -> Self {
        Self {
            value: if update.value == 0 { current.value } else { update.value },
            approved: update.approved || current.approved,
        }
    }
}

struct Prepare;

#[async_trait]
impl Runnable<GraphState<ReviewState>, StateUpdate<ReviewState>> for Prepare {
    async fn invoke(
        &self,
        input: GraphState<ReviewState>,
    ) -> Result<StateUpdate<ReviewState>, WesichainError> {
        Ok(StateUpdate::new(ReviewState {
            value: input.data.value + 1,
            approved: false,
        }))
    }

    fn stream(
        &self,
        _input: GraphState<ReviewState>,
    ) -> futures::stream::BoxStream<'_, Result<StreamEvent, WesichainError>> {
        stream::empty().boxed()
    }
}

struct Review;

#[async_trait]
impl Runnable<GraphState<ReviewState>, StateUpdate<ReviewState>> for Review {
    async fn invoke(
        &self,
        input: GraphState<ReviewState>,
    ) -> Result<StateUpdate<ReviewState>, WesichainError> {
        Ok(StateUpdate::new(ReviewState {
            value: input.data.value,
            approved: true,
        }))
    }

    fn stream(
        &self,
        _input: GraphState<ReviewState>,
    ) -> futures::stream::BoxStream<'_, Result<StreamEvent, WesichainError>> {
        stream::empty().boxed()
    }
}

#[tokio::main]
async fn main() -> Result<(), GraphError> {
    let checkpointer = InMemoryCheckpointer::default();

    // Build graph with interrupt before "review" node
    let graph = GraphBuilder::new()
        .add_node("prepare", Prepare)
        .add_node("review", Review)
        .add_edge("prepare", "review")
        .set_entry("prepare")
        .with_checkpointer(checkpointer.clone(), "thread-42")
        .with_interrupt_before(["review"]) // Pause before review
        .build();

    let state = GraphState::new(ReviewState {
        value: 0,
        approved: false,
    });

    match graph.invoke_graph(state).await {
        Err(GraphError::Interrupted) => {
            // Graph paused for human review
            let checkpoint = checkpointer.load("thread-42").await?.expect("checkpoint");
            println!(
                "Paused at step {} (node: {})",
                checkpoint.step, checkpoint.node
            );

            // Human review happens here...
            println!("Value prepared: {}", checkpoint.state.data.value);

            // Resume execution
            let resume_graph = GraphBuilder::new()
                .add_node("prepare", Prepare)
                .add_node("review", Review)
                .add_edge("prepare", "review")
                .set_entry("prepare")
                .build();
            let out = resume_graph.invoke_graph(checkpoint.state).await?;
            println!("Approved: {}", out.data.approved);
        }
        Ok(out) => println!("Completed without interrupt: {}", out.data.approved),
        Err(err) => return Err(err),
    }

    Ok(())
}
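The `apply` implementation in the example merges updates with two rules that are easy to miss: an update whose `value` is 0 leaves the current value untouched, and `approved` can be set but never cleared. The sketch below reproduces only that merge rule as a free function (`merge` is a name chosen for illustration, and the struct omits the serde derives) so the behaviour can be checked in isolation.

```rust
#[derive(Clone, Copy, Debug, Default, PartialEq)]
struct ReviewState {
    value: i32,
    approved: bool,
}

/// Same merge rule as `apply` in the example, written as a free function.
fn merge(current: ReviewState, update: ReviewState) -> ReviewState {
    ReviewState {
        // A zero value in the update is treated as "no change".
        value: if update.value == 0 { current.value } else { update.value },
        // Once approved, later updates cannot revoke approval.
        approved: update.approved || current.approved,
    }
}

fn main() {
    let current = ReviewState { value: 5, approved: true };
    let merged = merge(current, ReviewState { value: 0, approved: false });
    println!("{:?}", merged);
}
```

One consequence of this rule is that an update can neither reset `value` to 0 nor withdraw an approval. If a workflow needs either, the update type would have to distinguish "unset" from "zero" or "false", for example with `Option` fields.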
Run the Example
cargo run -p wesichain-graph --example human_in_loop_review
Key API Methods
Interrupt Configuration
// Interrupt before specific nodes
GraphBuilder::new()
    .with_interrupt_before(["review", "approve"])
    .build();

// Interrupt after specific nodes
GraphBuilder::new()
    .with_interrupt_after(["generate"])
    .build();

// Combine with checkpointing
GraphBuilder::new()
    .with_checkpointer(checkpointer.clone(), "thread-id")
    .with_interrupt_before(["critical_action"])
    .build();
Handling Interruptions
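match graph.invoke_graph(state).await {
    Err(GraphError::Interrupted) => {
        // Execution paused - inspect checkpoint.
        // `load` returns an Option, so unwrap it before reading the state.
        let checkpoint = checkpointer
            .load("thread-id")
            .await?
            .expect("checkpoint saved on interrupt");

        // ... human review logic ...

        // Resume from saved state. `resumed_graph` is a graph rebuilt
        // without the interrupt, as in the review workflow example.
        let out = resumed_graph.invoke_graph(checkpoint.state).await?;
    }
    Ok(out) => { /* Completed without interrupt */ }
    Err(err) => { /* Other error */ }
}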
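The "human review logic" step usually ends in one of a few outcomes: approve as-is, edit and approve, or reject. A minimal sketch of applying such an outcome to the saved state before resuming is shown below. `Draft`, `Decision`, and `apply_decision` are hypothetical names for illustration only; in a real graph the state would be the `data` field of the loaded checkpoint's state.

```rust
/// Hypothetical review state; stands in for the `data` of a saved checkpoint.
#[derive(Clone, Debug, PartialEq)]
struct Draft {
    text: String,
    approved: bool,
}

/// Hypothetical outcomes of a human review.
enum Decision {
    Approve,
    Edit(String),
    Reject(String),
}

/// Apply the reviewer's decision. `Ok` means the state is ready to resume with;
/// `Err` carries the rejection reason and the run should not be resumed.
fn apply_decision(mut draft: Draft, decision: Decision) -> Result<Draft, String> {
    match decision {
        Decision::Approve => {
            draft.approved = true;
            Ok(draft)
        }
        Decision::Edit(new_text) => {
            // The reviewer's edited text replaces the agent's output.
            draft.text = new_text;
            draft.approved = true;
            Ok(draft)
        }
        Decision::Reject(reason) => Err(reason),
    }
}

fn main() {
    let draft = Draft { text: "agent output".to_string(), approved: false };
    match apply_decision(draft, Decision::Edit("reviewed output".to_string())) {
        Ok(ready) => println!("resume with: {:?}", ready),
        Err(reason) => println!("rejected: {}", reason),
    }
}
```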
Checkpoint Structure
The checkpoint contains:
pub struct Checkpoint<S: StateSchema> {
    pub thread_id: String,
    pub node: String,         // Current node name
    pub step: u32,            // Execution step number
    pub state: GraphState<S>, // Full state snapshot
}
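To make the role of `thread_id` concrete, here is a small in-memory store keyed by thread ID, written with the standard library only. `ToyCheckpoint` and `ToyStore` are hypothetical and synchronous; they mirror the fields above with a plain `i32` in place of `GraphState<S>` and are not the library's `InMemoryCheckpointer`. Clones of the store share one map, which is the property the review example relies on when it passes `checkpointer.clone()` to the builder and later calls `load` on the original.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical checkpoint with the same field names as above,
/// using a plain `i32` in place of `GraphState<S>`.
#[derive(Clone, Debug, PartialEq)]
struct ToyCheckpoint {
    thread_id: String,
    node: String,
    step: u32,
    state: i32,
}

/// Hypothetical store: clones share one map through `Arc<Mutex<..>>`.
#[derive(Clone, Default)]
struct ToyStore {
    inner: Arc<Mutex<HashMap<String, ToyCheckpoint>>>,
}

impl ToyStore {
    /// Save a checkpoint, replacing any earlier one for the same thread.
    fn save(&self, checkpoint: ToyCheckpoint) {
        let mut map = self.inner.lock().expect("store lock poisoned");
        map.insert(checkpoint.thread_id.clone(), checkpoint);
    }

    /// Load the latest checkpoint for a thread, or `None` if nothing was saved.
    fn load(&self, thread_id: &str) -> Option<ToyCheckpoint> {
        let map = self.inner.lock().expect("store lock poisoned");
        map.get(thread_id).cloned()
    }
}

fn main() {
    let store = ToyStore::default();
    let writer = store.clone(); // e.g. the handle given to the graph
    writer.save(ToyCheckpoint {
        thread_id: "thread-42".to_string(),
        node: "review".to_string(),
        step: 1,
        state: 1,
    });
    println!("{:?}", store.load("thread-42"));
    println!("{:?}", store.load("unknown-thread"));
}
```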
Real-World Use Cases
Content Moderation
// Pause before publishing to allow moderation
GraphBuilder::new()
    .add_node("generate", GenerateContent)
    .add_node("moderate", HumanModeration)
    .add_node("publish", PublishContent)
    .add_edge("generate", "moderate")
    .add_edge("moderate", "publish")
    .with_interrupt_before(["publish"])
    .build();
Financial Approval
// Require approval for transactions above threshold
GraphBuilder::new()
    .add_node("prepare_tx", PrepareTransaction)
    .add_node("execute", ExecuteTransaction)
    .add_conditional_edge("prepare_tx", |state| {
        if state.data.amount > 10000 {
            "approve" // Route to approval
        } else {
            "execute" // Auto-execute small amounts
        }
    })
    .add_node("approve", HumanApproval)
    .add_edge("approve", "execute")
    .with_interrupt_before(["approve"])
    .build();
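Because the routing decision gates whether a human sees the transaction, it is worth pulling the closure body into a plain function and testing its boundary. The sketch below assumes the amount is a `u64`; `APPROVAL_THRESHOLD` and `route` are names chosen for illustration. With a strict `>` comparison, an amount of exactly 10000 is auto-executed.

```rust
/// Assumed threshold, matching the literal used in the snippet above.
const APPROVAL_THRESHOLD: u64 = 10_000;

/// Return the name of the next node for a given transaction amount.
fn route(amount: u64) -> &'static str {
    if amount > APPROVAL_THRESHOLD {
        "approve" // Strictly above the threshold: needs human approval
    } else {
        "execute" // At or below the threshold: auto-execute
    }
}

fn main() {
    for amount in [9_999, 10_000, 10_001] {
        println!("{} -> {}", amount, route(amount));
    }
}
```

If the threshold itself should require approval, the comparison would need to be `>=` instead.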
Multi-Stage Review
// Multiple review stages with different reviewers
GraphBuilder::new()
    .add_node("draft", DraftDocument)
    .add_node("technical_review", TechnicalReview)
    .add_node("legal_review", LegalReview)
    .add_node("publish", Publish)
    .add_edge("draft", "technical_review")
    .add_edge("technical_review", "legal_review")
    .add_edge("legal_review", "publish")
    .with_interrupt_before(["technical_review", "legal_review"])
    .build();
Best Practices
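- Always pair interrupts with a checkpointer; without a saved checkpoint there is no state to resume from
- Use descriptive thread IDs so paused runs are easy to find in a review queue
- Handle both outcomes of invoke_graph: Err(GraphError::Interrupted) and normal completion
- Consider timeouts for human review stages so paused runs do not wait indefinitely
- Log review decisions for audit trails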
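The timeout practice can be sketched with a standard-library channel: the reviewer's decision arrives on a channel, and the waiting side gives up after a deadline. This is one possible approach under stated assumptions, not a Wesichain feature; `ReviewResult` and `wait_for_review` are hypothetical names, and a disconnected reviewer is treated the same as a timeout.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Hypothetical result of waiting for a human decision.
#[derive(Debug, PartialEq)]
enum ReviewResult {
    Approved,
    Rejected,
    NoDecision,
}

/// Wait up to `timeout` for a decision (`true` = approve, `false` = reject).
/// A timeout or a dropped sender both count as "no decision".
fn wait_for_review(rx: &mpsc::Receiver<bool>, timeout: Duration) -> ReviewResult {
    match rx.recv_timeout(timeout) {
        Ok(true) => ReviewResult::Approved,
        Ok(false) => ReviewResult::Rejected,
        Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => {
            ReviewResult::NoDecision
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    // Simulated reviewer approving from another thread.
    let reviewer = thread::spawn(move || tx.send(true).expect("receiver alive"));
    println!("{:?}", wait_for_review(&rx, Duration::from_secs(1)));
    reviewer.join().expect("reviewer thread panicked");
}
```

On `NoDecision`, the caller can escalate, notify another reviewer, or leave the checkpoint in place for a later resume.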