Quick Start - RAG Pipeline
Build a streaming RAG pipeline that ingests documents and answers questions with context.
What You’ll Build
A RAG application that:
- Ingests documents from files or inline content
- Stores embeddings in a vector database
- Retrieves relevant context for queries
- Streams responses with Server-Sent Events (SSE)
- Persists conversation state with SQLite checkpointing
Prerequisites
- Rust 1.75+ installed
- OpenAI API key (for embeddings and LLM)
- Optional: document files in a fixtures/ directory
Setup
Create a new Rust project:
cargo new my-rag-app
cd my-rag-app
Add dependencies to Cargo.toml:
[dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
wesichain-core = "0.3.0"
wesichain-rag = "0.3.0"
wesichain-checkpoint-sqlite = "0.3.0"
futures = "0.3"
The Code
Create src/main.rs:
use std::error::Error;
use std::path::Path;
use futures::StreamExt;
use wesichain_checkpoint_sqlite::SqliteCheckpointer;
use wesichain_core::AgentEvent;
use wesichain_rag::adapters::sse::{done_event, ping_event, to_sse_event};
use wesichain_rag::{RagQueryRequest, WesichainRag};
async fn ingest_and_query(
    rag: &WesichainRag,
    query: &str,
    thread_id: &str,
) -> Result<String, Box<dyn Error>> {
    let stream = rag
        .query_stream(RagQueryRequest {
            query: query.to_string(),
            thread_id: Some(thread_id.to_string()),
        })
        .await?;
    tokio::pin!(stream);
    let mut answer = String::new();
    while let Some(item) = stream.next().await {
        match item {
            Ok(AgentEvent::Final { content, .. }) => answer = content,
            Ok(event) => print!("{}", to_sse_event(&event)),
            Err(error) => {
                print!(
                    "{}",
                    to_sse_event(&AgentEvent::Error {
                        message: error.to_string(),
                        step: 999,
                        recoverable: false,
                        source: Some("simple-rag-stream".to_string()),
                    })
                );
                break;
            }
        }
    }
    Ok(answer)
}
async fn async_main() -> Result<(), Box<dyn Error>> {
    // Initialize SQLite checkpointer for session persistence
    let db_path = std::env::temp_dir().join("wesichain-rag-sessions.db");
    let database_url = format!("sqlite://{}", db_path.display());
    let checkpointer = SqliteCheckpointer::builder(database_url)
        .max_connections(1)
        .build()
        .await?;

    // Build RAG pipeline
    let rag = WesichainRag::builder()
        .with_checkpointer(checkpointer)
        .with_max_retries(2)
        .build()?;

    // Ingest documents from fixtures/ or fall back to inline documents
    let fixture_dir = Path::new("fixtures");
    if fixture_dir.exists() {
        println!("Discovering documents in fixtures/...");
        for entry in std::fs::read_dir(fixture_dir)? {
            let entry = entry?;
            let path = entry.path();
            if path.is_file() {
                print!("Ingesting: {} ... ", path.display());
                match rag.process_file(&path).await {
                    Ok(_) => println!("OK"),
                    Err(e) => println!("ERROR: {e}"),
                }
            }
        }
    } else {
        // Fallback: ingest inline documents
        rag.add_documents(vec![
            wesichain_core::Document {
                id: "demo-1".to_string(),
                content: "Paris is the capital of France. It has a population of over 2 million."
                    .to_string(),
                metadata: Default::default(),
                embedding: None,
            },
            wesichain_core::Document {
                id: "demo-2".to_string(),
                content: "France is known for its cuisine, wine, and the Eiffel Tower in Paris."
                    .to_string(),
                metadata: Default::default(),
                embedding: None,
            },
        ]).await?;
    }

    // Query with session persistence
    let thread_id = "demo-session-001";
    println!("Query 1: What is the capital of France?");
    let answer1 = ingest_and_query(&rag, "What is the capital of France?", thread_id).await?;
    println!("Answer: {}\n", answer1);

    // Follow-up query on the same thread (demonstrates session resumption)
    println!("Query 2: What else is it known for? (follow-up)");
    let answer2 = ingest_and_query(&rag, "What else is it known for?", thread_id).await?;
    println!("Answer: {}\n", answer2);

    // New session (fresh context)
    println!("Query 3: What is the population? (new session)");
    let answer3 = ingest_and_query(&rag, "What is the population?", "demo-session-002").await?;
    println!("Answer: {}\n", answer3);

    Ok(())
}
fn main() -> Result<(), Box<dyn Error>> {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()?;
    runtime.block_on(async_main())
}
Run It
export OPENAI_API_KEY="your-key"
cargo run
Key Concepts
Document Ingestion
Wesichain RAG supports multiple document sources:
// From files (txt, docx, pdf)
rag.process_file(Path::new("document.pdf")).await?;

// From inline content
rag.add_documents(vec![
    Document { id: "1".into(), content: "...".into(), metadata: Default::default(), embedding: None },
]).await?;
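Under the hood, ingestion typically splits large documents into chunks before embedding. The chunker below is only an illustrative sketch of that step (wesichain-rag's actual chunking strategy and sizes are its own); it breaks text on whitespace so no chunk exceeds a character budget:

```rust
/// Split text into chunks of at most `max_chars` characters,
/// breaking on whitespace so words stay intact.
fn chunk_text(text: &str, max_chars: usize) -> Vec<String> {
    let mut chunks = Vec::new();
    let mut current = String::new();
    for word in text.split_whitespace() {
        // The +1 accounts for the joining space.
        if !current.is_empty() && current.len() + word.len() + 1 > max_chars {
            chunks.push(std::mem::take(&mut current));
        }
        if !current.is_empty() {
            current.push(' ');
        }
        current.push_str(word);
    }
    if !current.is_empty() {
        chunks.push(current);
    }
    chunks
}

fn main() {
    let chunks = chunk_text("Paris is the capital of France", 10);
    // No chunk exceeds the 10-character budget.
    assert!(chunks.iter().all(|c| c.len() <= 10));
    println!("{chunks:?}");
}
```

Real pipelines usually chunk with overlap and on semantic boundaries (sentences, headings), but the budget-per-chunk idea is the same.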
Streaming Responses
The query_stream method returns a stream of AgentEvent results:
use wesichain_core::AgentEvent;
while let Some(item) = stream.next().await {
    match item {
        Ok(AgentEvent::Final { content, .. }) => {
            // Final answer
        }
        Ok(AgentEvent::Trace { message, .. }) => {
            // Intermediate trace
        }
        Ok(event) => {
            // Other events: Token, ToolCall, ToolResult, Error
        }
        Err(error) => {
            // Handle error
        }
    }
}
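The same match structure can be exercised without the library by substituting a plain iterator of Result items for the async stream — a stand-in sketch, not the real AgentEvent type:

```rust
/// Accumulate streamed tokens into a final answer, stopping at the
/// first error -- the same match shape as the async loop above, with
/// a synchronous iterator standing in for the event stream.
fn collect_answer(events: Vec<Result<&str, &str>>) -> String {
    let mut answer = String::new();
    for item in events {
        match item {
            // Append each streamed token to the running answer.
            Ok(token) => answer.push_str(token),
            // Surface the error and stop consuming the stream.
            Err(error) => {
                eprintln!("stream error: {error}");
                break;
            }
        }
    }
    answer
}

fn main() {
    let events = vec![Ok("The capital"), Ok(" of France"), Ok(" is Paris.")];
    assert_eq!(collect_answer(events), "The capital of France is Paris.");
}
```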
SSE Adapter
Convert events to Server-Sent Events for web APIs:
use wesichain_rag::adapters::sse::to_sse_event;
// In a web handler (e.g., axum)
let sse_stream = stream.map(|event| {
    event.map(|e| to_sse_event(&e))
});
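On the wire, each Server-Sent Event is one or more data: lines, optionally preceded by an event: name, terminated by a blank line. A minimal formatter showing that frame shape (the exact fields to_sse_event emits are the library's choice; this is only the standard framing):

```rust
/// Format a payload as a Server-Sent Events frame:
/// an optional `event:` line, a `data:` line, and the
/// blank-line terminator that separates frames.
fn sse_frame(event: Option<&str>, data: &str) -> String {
    let mut frame = String::new();
    if let Some(name) = event {
        frame.push_str(&format!("event: {name}\n"));
    }
    frame.push_str(&format!("data: {data}\n\n"));
    frame
}

fn main() {
    let frame = sse_frame(Some("token"), "{\"content\":\"Paris\"}");
    assert_eq!(frame, "event: token\ndata: {\"content\":\"Paris\"}\n\n");
    print!("{frame}");
}
```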
Session Persistence
Use thread_id to maintain conversation context:
// Same thread_id = resumed session with previous context
let answer1 = rag.query_stream(RagQueryRequest {
query: "First question".into(),
thread_id: Some("session-1".into()),
}).await?;
// Follow-up uses same thread_id
let answer2 = rag.query_stream(RagQueryRequest {
query: "Follow-up question".into(),
thread_id: Some("session-1".into()), // Same session
}).await?;
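Conceptually, a checkpointer is a key-value store from thread_id to conversation state. The in-memory stand-in below illustrates only the resumption semantics; SqliteCheckpointer persists this state to SQLite so sessions survive restarts:

```rust
use std::collections::HashMap;

/// In-memory stand-in for a checkpointer: maps a thread_id
/// to that session's message history.
#[derive(Default)]
struct MemoryCheckpointer {
    sessions: HashMap<String, Vec<String>>,
}

impl MemoryCheckpointer {
    /// Append a message to the session, creating it on first use.
    fn save(&mut self, thread_id: &str, message: &str) {
        self.sessions
            .entry(thread_id.to_string())
            .or_default()
            .push(message.to_string());
    }

    /// Load prior messages for a thread; a new thread_id starts empty.
    fn load(&self, thread_id: &str) -> &[String] {
        self.sessions.get(thread_id).map(Vec::as_slice).unwrap_or(&[])
    }
}

fn main() {
    let mut cp = MemoryCheckpointer::default();
    cp.save("session-1", "What is the capital of France?");
    cp.save("session-1", "What else is it known for?");

    // Same thread_id resumes with prior context...
    assert_eq!(cp.load("session-1").len(), 2);
    // ...while a new thread_id starts fresh.
    assert!(cp.load("session-2").is_empty());
}
```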