Wesichain

Human-in-the-Loop

Implement human review and approval checkpoints in your agent workflows using interrupt capabilities.

Overview

Wesichain graphs support pausing execution at specific nodes to allow human review. This is useful for:

  • Approval workflows: Require human sign-off before critical actions
  • Review loops: Inspect and modify agent outputs before continuation
  • Debugging: Pause to inspect state during development

Basic Pattern

The human-in-the-loop pattern uses:

  1. with_interrupt_before() or with_interrupt_after() to pause execution
  2. Checkpointer to save and resume state
  3. Graph rebuilding and state restoration for continuation

Example: Review Workflow

use async_trait::async_trait;
use futures::stream::{self, StreamExt};
use serde::{Deserialize, Serialize};
use wesichain_core::{Runnable, StreamEvent, WesichainError};
use wesichain_graph::{
    Checkpointer, GraphBuilder, GraphError, GraphState, InMemoryCheckpointer, StateSchema,
    StateUpdate,
};

#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq)]
struct ReviewState {
    value: i32,
    approved: bool,
}

impl StateSchema for ReviewState {
    type Update = Self;

    fn apply(current: &Self, update: Self::Update) -> Self {
        Self {
            value: if update.value == 0 { current.value } else { update.value },
            approved: update.approved || current.approved,
        }
    }
}

struct Prepare;

#[async_trait]
impl Runnable<GraphState<ReviewState>, StateUpdate<ReviewState>> for Prepare {
    async fn invoke(
        &self,
        input: GraphState<ReviewState>,
    ) -> Result<StateUpdate<ReviewState>, WesichainError> {
        Ok(StateUpdate::new(ReviewState {
            value: input.data.value + 1,
            approved: false,
        }))
    }

    fn stream(
        &self,
        _input: GraphState<ReviewState>,
    ) -> futures::stream::BoxStream<'_, Result<StreamEvent, WesichainError>> {
        stream::empty().boxed()
    }
}

struct Review;

#[async_trait]
impl Runnable<GraphState<ReviewState>, StateUpdate<ReviewState>> for Review {
    async fn invoke(
        &self,
        input: GraphState<ReviewState>,
    ) -> Result<StateUpdate<ReviewState>, WesichainError> {
        Ok(StateUpdate::new(ReviewState {
            value: input.data.value,
            approved: true,
        }))
    }

    fn stream(
        &self,
        _input: GraphState<ReviewState>,
    ) -> futures::stream::BoxStream<'_, Result<StreamEvent, WesichainError>> {
        stream::empty().boxed()
    }
}

#[tokio::main]
async fn main() -> Result<(), GraphError> {
    let checkpointer = InMemoryCheckpointer::default();

    // Build graph with interrupt before "review" node
    let graph = GraphBuilder::new()
        .add_node("prepare", Prepare)
        .add_node("review", Review)
        .add_edge("prepare", "review")
        .set_entry("prepare")
        .with_checkpointer(checkpointer.clone(), "thread-42")
        .with_interrupt_before(["review"])  // Pause before review
        .build();

    let state = GraphState::new(ReviewState {
        value: 0,
        approved: false,
    });

    match graph.invoke_graph(state).await {
        Err(GraphError::Interrupted) => {
            // Graph paused for human review
            let checkpoint = checkpointer.load("thread-42").await?.expect("checkpoint");
            println!(
                "Paused at step {} (node: {})",
                checkpoint.step, checkpoint.node
            );

            // Human review happens here...
            println!("Value prepared: {}", checkpoint.state.data.value);

            // Resume execution
            let resume_graph = GraphBuilder::new()
                .add_node("prepare", Prepare)
                .add_node("review", Review)
                .add_edge("prepare", "review")
                .set_entry("prepare")
                .build();

            let out = resume_graph.invoke_graph(checkpoint.state).await?;
            println!("Approved: {}", out.data.approved);
        }
        Ok(out) => println!("Completed without interrupt: {}", out.data.approved),
        Err(err) => return Err(err),
    }

    Ok(())
}

Run the Example

cargo run -p wesichain-graph --example human_in_loop_review

Key API Methods

Interrupt Configuration

// Interrupt before specific nodes
GraphBuilder::new()
    .with_interrupt_before(["review", "approve"])
    .build();

// Interrupt after specific nodes
GraphBuilder::new()
    .with_interrupt_after(["generate"])
    .build();

// Combine with checkpointing
GraphBuilder::new()
    .with_checkpointer(checkpointer.clone(), "thread-id")
    .with_interrupt_before(["critical_action"])
    .build();

Handling Interruptions

match graph.invoke_graph(state).await {
    Err(GraphError::Interrupted) => {
        // Execution paused - inspect checkpoint
        let checkpoint = checkpointer.load("thread-id").await?;

        // ... human review logic ...

        // Resume from saved state
        let out = resumed_graph.invoke_graph(checkpoint.state).await?;
    }
    Ok(out) => { /* Completed without interrupt */ }
    Err(err) => { /* Other error */ }
}

Checkpoint Structure

The checkpoint contains:

pub struct Checkpoint<S: StateSchema> {
    pub thread_id: String,
    pub node: String,        // Current node name
    pub step: u32,           // Execution step number
    pub state: GraphState<S>, // Full state snapshot
}

Real-World Use Cases

Content Moderation

// Pause before publishing to allow moderation
GraphBuilder::new()
    .add_node("generate", GenerateContent)
    .add_node("moderate", HumanModeration)
    .add_node("publish", PublishContent)
    .add_edge("generate", "moderate")
    .add_edge("moderate", "publish")
    .with_interrupt_before(["publish"])
    .build();

Financial Approval

// Require approval for transactions above threshold
GraphBuilder::new()
    .add_node("prepare_tx", PrepareTransaction)
    .add_node("execute", ExecuteTransaction)
    .add_conditional_edge("prepare_tx", |state| {
        if state.data.amount > 10000 {
            "approve"  // Route to approval
        } else {
            "execute"  // Auto-execute small amounts
        }
    })
    .add_node("approve", HumanApproval)
    .add_edge("approve", "execute")
    .with_interrupt_before(["approve"])
    .build();

Multi-Stage Review

// Multiple review stages with different reviewers
GraphBuilder::new()
    .add_node("draft", DraftDocument)
    .add_node("technical_review", TechnicalReview)
    .add_node("legal_review", LegalReview)
    .add_node("publish", Publish)
    .add_edge("draft", "technical_review")
    .add_edge("technical_review", "legal_review")
    .add_edge("legal_review", "publish")
    .with_interrupt_before(["technical_review", "legal_review"])
    .build();

Best Practices

  1. Always use checkpointing with interrupts to enable resumption
  2. Use descriptive thread IDs to track review queues
  3. Handle both paths - with and without interrupts
  4. Consider timeouts for human review stages
  5. Log review decisions for audit trails

Next Steps

Updated Edit on GitHub