Wesichain

Quick Start - RAG Pipeline

Build a streaming RAG pipeline that ingests documents and answers questions with context.

What You’ll Build

A RAG application that:

  1. Ingests documents from files or inline content
  2. Stores embeddings in a vector database
  3. Retrieves relevant context for queries
  4. Streams responses with Server-Sent Events (SSE)
  5. Persists conversation state with SQLite checkpointing

Prerequisites

  • Rust 1.75+ installed
  • OpenAI API key (for embeddings and LLM)
  • Optional: Document files in a fixtures/ directory

Setup

Create a new Rust project:

cargo new my-rag-app
cd my-rag-app

Add dependencies to Cargo.toml:

[dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
wesichain-core = "0.3.0"
wesichain-rag = "0.3.0"
wesichain-checkpoint-sqlite = "0.3.0"
futures = "0.3"

The Code

Create src/main.rs:

use std::error::Error;
use std::path::Path;

use futures::StreamExt;
use wesichain_checkpoint_sqlite::SqliteCheckpointer;
use wesichain_core::AgentEvent;
use wesichain_rag::adapters::sse::{done_event, ping_event, to_sse_event};
use wesichain_rag::{RagQueryRequest, WesichainRag};

async fn ingest_and_query(
    rag: &WesichainRag,
    query: &str,
    thread_id: &str,
) -> Result<String, Box<dyn Error>> {
    let stream = rag
        .query_stream(RagQueryRequest {
            query: query.to_string(),
            thread_id: Some(thread_id.to_string()),
        })
        .await?;

    tokio::pin!(stream);
    let mut answer = String::new();

    while let Some(item) = stream.next().await {
        match item {
            Ok(AgentEvent::Final { content, .. }) => answer = content,
            Ok(event) => print!("{}", to_sse_event(&event)),
            Err(error) => {
                print!(
                    "{}",
                    to_sse_event(&AgentEvent::Error {
                        message: error.to_string(),
                        step: 999,
                        recoverable: false,
                        source: Some("simple-rag-stream".to_string()),
                    })
                );
                break;
            }
        }
    }

    Ok(answer)
}

async fn async_main() -> Result<(), Box<dyn Error>> {
    // Initialize SQLite checkpointer for session persistence
    let db_path = std::env::temp_dir().join("wesichain-rag-sessions.db");
    let database_url = format!("sqlite://{}", db_path.display());

    let checkpointer = SqliteCheckpointer::builder(database_url)
        .max_connections(1)
        .build()
        .await?;

    // Build RAG pipeline
    let rag = WesichainRag::builder()
        .with_checkpointer(checkpointer)
        .with_max_retries(2)
        .build()?;

    // Ingest documents from fixtures/ or use inline documents
    let fixture_dir = Path::new("fixtures");
    if fixture_dir.exists() {
        println!("Discovering documents in fixtures/...");
        for entry in std::fs::read_dir(fixture_dir)? {
            let entry = entry?;
            let path = entry.path();
            if path.is_file() {
                print!("Ingesting: {} ... ", path.display());
                match rag.process_file(&path).await {
                    Ok(_) => println!("OK"),
                    Err(e) => println!("ERROR: {e}"),
                }
            }
        }
    } else {
        // Fallback: ingest inline documents
        rag.add_documents(vec![
            wesichain_core::Document {
                id: "demo-1".to_string(),
                content: "Paris is the capital of France. It has a population of over 2 million."
                    .to_string(),
                metadata: Default::default(),
                embedding: None,
            },
            wesichain_core::Document {
                id: "demo-2".to_string(),
                content: "France is known for its cuisine, wine, and the Eiffel Tower in Paris."
                    .to_string(),
                metadata: Default::default(),
                embedding: None,
            },
        ]).await?;
    }

    // Query with session persistence
    let thread_id = "demo-session-001";

    println!("Query 1: What is the capital of France?");
    let answer1 = ingest_and_query(&rag, "What is the capital of France?", thread_id).await?;
    println!("Answer: {}\n", answer1);

    // Follow-up query (demonstrates session resumption)
    println!("Query 2: What else is it known for? (follow-up)");
    let answer2 = ingest_and_query(&rag, "What else is it known for?", thread_id).await?;
    println!("Answer: {}\n", answer2);

    // New session (fresh context)
    println!("Query 3: What is the population? (new session)");
    let answer3 = ingest_and_query(&rag, "What is the population?", "demo-session-002").await?;
    println!("Answer: {}\n", answer3);

    Ok(())
}

fn main() -> Result<(), Box<dyn Error>> {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()?;

    runtime.block_on(async_main())
}

Run It

export OPENAI_API_KEY="your-key"
cargo run

Key Concepts

Document Ingestion

Wesichain RAG supports multiple document sources:

// From files (txt, docx, pdf)
rag.process_file(Path::new("document.pdf")).await?;

// From inline content
rag.add_documents(vec![
    Document { id: "1".into(), content: "...".into(), metadata, embedding: None },
]).await?;

Streaming Responses

The query_stream method returns a stream of AgentEvent:

use wesichain_core::AgentEvent;

while let Some(item) = stream.next().await {
    match item {
        Ok(AgentEvent::Final { content, .. }) => {
            // Final answer
        }
        Ok(AgentEvent::Trace { message, .. }) => {
            // Intermediate trace
        }
        Ok(event) => {
            // Other events: Token, ToolCall, ToolResult, Error
        }
        Err(error) => {
            // Handle error
        }
    }
}

SSE Adapter

Convert events to Server-Sent Events for web APIs:

use wesichain_rag::adapters::sse::to_sse_event;

// In a web handler (e.g., axum)
let sse_stream = stream.map(|event| {
    to_sse_event(&event?)
});

Session Persistence

Use thread_id to maintain conversation context:

// Same thread_id = resumed session with previous context
let answer1 = rag.query_stream(RagQueryRequest {
    query: "First question".into(),
    thread_id: Some("session-1".into()),
}).await?;

// Follow-up uses same thread_id
let answer2 = rag.query_stream(RagQueryRequest {
    query: "Follow-up question".into(),
    thread_id: Some("session-1".into()), // Same session
}).await?;

Next Steps

Updated Edit on GitHub