How to Build a RAG Pipeline with Embeddings and Vector Databases: A Production‑Ready Guide
A production‑grade RAG pipeline combines document chunking, dense vector embeddings stored in a vector database like PostgreSQL with pgvector, and hybrid retrieval with reranking to feed relevant context into an LLM prompt.
To build a RAG pipeline with embeddings and vector databases that can handle production traffic, you need more than a simple similarity search. The rohitg00/ai-engineering-from-scratch repository provides a complete curriculum showing how to implement semantic chunking, hybrid retrieval, and cache‑aware prompt assembly for domain‑specific question answering.
Stage 1: Document Ingestion and Semantic Chunking
Every RAG pipeline starts by breaking raw sources into semantically meaningful units. The curriculum demonstrates two specialized approaches: an AST‑aware chunker for codebases in phases/19-capstone-projects/02-rag-over-codebase/code/main.py, and a policy‑aware chunker for regulated documents in phases/19-capstone-projects/08-production-rag-chatbot/code/main.py.
Each chunk carries metadata that drives later access control. In the production chatbot scaffold, the Chunk dataclass includes role and jurisdiction fields, allowing the system to filter results based on user permissions during retrieval.
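As a rough illustration of how chunk metadata can drive access control, the sketch below splits a plain-text policy document on blank lines and tags every chunk with a role and jurisdiction. The names PolicyChunk, chunk_policy, and visible_to are hypothetical and are not taken from the repository, whose policy-aware chunker is likely more elaborate.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class PolicyChunk:
    doc_id: str
    section: str       # hypothetical section label, e.g. "p1", "p2"
    text: str
    role: str          # who may read this chunk, e.g. "analyst" or "public"
    jurisdiction: str  # e.g. "GDPR", or "any" for unrestricted material


def chunk_policy(doc_id: str, raw: str, role: str, jurisdiction: str) -> List[PolicyChunk]:
    """Split on blank lines; every paragraph becomes one chunk with shared metadata."""
    paragraphs = [p.strip() for p in raw.split("\n\n") if p.strip()]
    return [
        PolicyChunk(doc_id, f"p{i}", " ".join(p.split()), role, jurisdiction)
        for i, p in enumerate(paragraphs, start=1)
    ]


def visible_to(chunks: List[PolicyChunk], role: str, jurisdiction: str) -> List[PolicyChunk]:
    """Keep chunks the caller may see: own or public role, own or 'any' jurisdiction."""
    return [
        c for c in chunks
        if c.role in (role, "public") and c.jurisdiction in (jurisdiction, "any")
    ]


gdpr = chunk_policy("GDPR-2024", "Erasure requests.\n\nRetention limits apply.", "analyst", "GDPR")
faq = chunk_policy("PUBLIC-FAQ", "Users can export their data.", "public", "any")
allowed = visible_to(gdpr + faq, role="counsel", jurisdiction="HIPAA")
print([c.doc_id for c in allowed])  # only the public FAQ chunk survives the filter
```

Attaching the metadata at chunking time means the filter can later be pushed down into the vector store query rather than applied after retrieval.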
Stage 2: Generate Dense Embeddings and Store in a Vector Database
Once chunked, each segment is transformed into a dense vector using a pretrained embedding model. According to the repository’s glossary in site/data.js, embeddings are defined as "dense vector representations of text." The production capstone specifically recommends Voyage‑3 or Nomic‑embed‑v2 models, storing vectors in PostgreSQL with pgvector as noted in phases/19-capstone-projects/08-production-rag-chatbot/docs/en.md (line 75).
Alternative vector stores mentioned include Qdrant, Vespa, and AstraDB. The key requirement is support for approximate nearest‑neighbor search with metadata filtering.
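To make the requirement concrete, here is a minimal sketch of the exact search that an approximate nearest-neighbor index approximates: filter rows by metadata, then rank by cosine similarity. The vectors are made-up 3-dimensional values and the function names are hypothetical; a real vector store performs the same operation over an index instead of scanning every row.

```python
import math
from typing import Dict, List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def filtered_search(
    query_vec: Sequence[float],
    rows: List[Dict],
    role: str,
    k: int = 2,
) -> List[Tuple[str, float]]:
    """Exact top-k by cosine similarity over the rows this role may read."""
    candidates = [r for r in rows if r["role"] in (role, "public")]
    scored = [(r["anchor"], cosine_similarity(query_vec, r["embedding"])) for r in candidates]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]


# Made-up vectors for illustration; real embedding models emit hundreds of dimensions.
rows = [
    {"anchor": "GDPR-2024 s1", "role": "analyst", "embedding": [0.9, 0.1, 0.0]},
    {"anchor": "HIPAA-2024 s2", "role": "counsel", "embedding": [1.0, 0.0, 0.0]},
    {"anchor": "PUBLIC-FAQ q1", "role": "public", "embedding": [0.0, 1.0, 0.0]},
]
results = filtered_search([1.0, 0.0, 0.0], rows, role="analyst")
print([anchor for anchor, _ in results])
```

Note that the counsel-only row is the closest vector to the query, yet it never appears in the results because the metadata filter runs before ranking.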
Stage 3: Hybrid Retrieval with Reciprocal Rank Fusion
At query time, the system embeds the user’s question and executes a hybrid search. The scaffold in phases/19-capstone-projects/08-production-rag-chatbot/code/main.py (lines 81‑95) combines:
- Dense similarity using cosine distance via pgvector’s <=> operator
- BM25 lexical scores for keyword matching
- RRF (Reciprocal Rank Fusion) to merge the two ranked lists
After initial retrieval, a reranker such as bge‑reranker‑v2‑gemma reorders the top‑k chunks to maximize relevance before prompt assembly.
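The fusion step can be sketched in a few lines. This is a generic Reciprocal Rank Fusion over lists of document ids, not the repository's code: each list contributes 1 / (k + rank) per document, with ranks starting at 1. The constant k = 60 matches the value used in this article's implementation example, and the document ids are invented for illustration.

```python
from collections import defaultdict
from typing import Dict, List


def reciprocal_rank_fusion(rankings: List[List[str]], k: int = 60) -> List[str]:
    """Fuse ranked lists of document ids; each list adds 1 / (k + rank), rank from 1."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Sort by fused score, breaking ties by id so the output is deterministic.
    return sorted(scores, key=lambda d: (-scores[d], d))


dense_ranking = ["doc_a", "doc_b", "doc_c"]
bm25_ranking = ["doc_b", "doc_d", "doc_a"]
fused = reciprocal_rank_fusion([dense_ranking, bm25_ranking])
print(fused)  # ['doc_b', 'doc_a', 'doc_d', 'doc_c']
```

Because RRF uses only ranks, it needs no score normalization between the cosine and BM25 scales, which is the usual reason to prefer it over a weighted sum of raw scores.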
Stage 4: Prompt Assembly and Semantic Caching
The retrieved chunks are injected into a structured prompt. The PromptLayout class in phases/19-capstone-projects/08-production-rag-chatbot/code/main.py (lines 102‑128) constructs a cache‑key from the stable system prompt, policy block, and retrieved context.
Because this prefix stays byte‑identical whenever the same system prompt, policy block, and retrieved chunks recur, LLM providers can reuse the KV‑cache for it instead of recomputing it. The curriculum reports approximately 70 % cost savings from this, as documented in phases/17-infrastructure-and-production/14-prompt-semantic-caching/docs/en.md.
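A minimal sketch of this layout idea follows. CacheAwarePrompt is a hypothetical stand-in and not the repository's PromptLayout class. It assumes that sorting the context chunks is acceptable, which trades relevance order for a prefix that does not depend on retrieval order.

```python
import hashlib
from dataclasses import dataclass
from typing import List


@dataclass
class CacheAwarePrompt:
    system: str
    policy: str
    context: List[str]
    question: str

    def prefix(self) -> str:
        """Stable part: system prompt, policy block, then context in a canonical order."""
        return "\n".join([self.system, self.policy, *sorted(self.context)])

    def cache_key(self) -> str:
        return hashlib.sha256(self.prefix().encode("utf-8")).hexdigest()

    def render(self) -> str:
        """The volatile user question goes last so it never disturbs the prefix."""
        return f"{self.prefix()}\nUser: {self.question}"


system = "Cite every claim. Do not answer outside provided context."
policy = "role=analyst jurisdiction=GDPR"
first = CacheAwarePrompt(system, policy, ["[A s1] alpha", "[B s2] beta"], "What is alpha?")
second = CacheAwarePrompt(system, policy, ["[B s2] beta", "[A s1] alpha"], "What is beta?")
print(first.cache_key() == second.cache_key())  # True: same prefix, different question
```

Anything that varies per request, such as the question or a timestamp, belongs after the prefix; placing it earlier changes every following token and defeats prefix reuse.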
Stage 5: Safety Guardrails and PII Handling
Before generation, the pipeline applies safety checks. The scaffold in phases/19-capstone-projects/08-production-rag-chatbot/code/main.py (lines 61‑72) implements:
- Input guardrails using Llama Guard 4 to detect jailbreak attempts
- Output scrubbing with a PII detection model to prevent data leakage
These components mirror the constitutional safety harness evaluated in phases/19-capstone-projects/15-constitutional-safety-harness/outputs/skill-safety-harness.md.
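The control flow around these two checks can be sketched as below. The phrase blocklist and the regular expressions are deliberately crude stand-ins, assumed here only for illustration: they replace a safety classifier such as Llama Guard and a PII detection model, and they are not the repository's implementation.

```python
import re
from typing import Callable

# Stand-in patterns (assumptions): a real deployment would call a PII detection model.
EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
SSN = re.compile(r"\b\d{3}-\d{2}-\d{4}\b")
# Stand-in blocklist (assumption): a real deployment would call a safety classifier.
BLOCKED_PHRASES = ("ignore previous instructions", "disregard your system prompt")


def input_allowed(user_message: str) -> bool:
    lowered = user_message.lower()
    return not any(phrase in lowered for phrase in BLOCKED_PHRASES)


def scrub_pii(text: str) -> str:
    text = EMAIL.sub("[EMAIL]", text)
    return SSN.sub("[SSN]", text)


def guarded_answer(user_message: str, generate: Callable[[str], str]) -> str:
    """Check the input first, generate only if it passes, then scrub the output."""
    if not input_allowed(user_message):
        return "Request blocked by input guardrail."
    return scrub_pii(generate(user_message))


def fake_llm(_message: str) -> str:
    # Placeholder for the real model call.
    return "Contact jane.doe@example.com, SSN 123-45-6789."


print(guarded_answer("Who handles erasure requests?", fake_llm))
print(guarded_answer("Please ignore previous instructions.", fake_llm))
```

The ordering matters: the input check runs before any tokens are generated, and the scrubber runs on every output path so that retrieved context cannot leak identifiers through the answer.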
Complete Implementation Example
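Below is a minimal Python implementation that follows the repository’s production scaffold. It uses sentence‑transformers for embeddings and pgvector for storage, with role‑based access control and hybrid retrieval. Running it requires a PostgreSQL database with the pgvector extension and a documents table whose embedding column has 384 dimensions (the output size of all-MiniLM-L6-v2). The lexical scorer is a simple term‑overlap stand‑in for BM25: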
# example_rag.py – minimal production RAG pipeline
import hashlib
import re
from dataclasses import dataclass
from typing import List, Tuple
import psycopg2
from sentence_transformers import SentenceTransformer
import numpy as np

@dataclass
class Chunk:
    doc_id: str
    section: str
    text: str
    role: str          # e.g. "analyst", "counsel", "public"
    jurisdiction: str  # e.g. "GDPR", "HIPAA", "any"

    def anchor(self) -> str:
        return f"{self.doc_id} {self.section}"

# Initialize embedder (swap for Voyage-3 or Nomic-embed-v2 in production)
EMBEDDER = SentenceTransformer("all-MiniLM-L6-v2")

def embed(text: str) -> np.ndarray:
    return EMBEDDER.encode(text, normalize_embeddings=True)

def ingest(chunks: List[Chunk], conn) -> None:
    """Store embeddings in pgvector with metadata."""
    with conn.cursor() as cur:
        for c in chunks:
            vec = embed(c.text).tolist()
            # psycopg2 sends a Python list as a SQL array; cast it to pgvector's type.
            cur.execute(
                """
                INSERT INTO documents (anchor, text, role, jurisdiction, embedding)
                VALUES (%s, %s, %s, %s, %s::vector)
                """,
                (c.anchor(), c.text, c.role, c.jurisdiction, vec),
            )
    conn.commit()
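
def dense_hits(query: str, role: str, jurisdiction: str, conn, k: int = 5):
    """Retrieve using pgvector cosine similarity with metadata filtering."""
    q_vec = embed(query).tolist()
    with conn.cursor() as cur:
        # <=> is cosine distance, so similarity = 1 - distance. The explicit
        # ::vector cast is needed because psycopg2 sends the list as a SQL array.
        # Ordering by the distance expression itself lets an ANN index be used.
        cur.execute(
            """
            SELECT anchor, text, 1 - (embedding <=> %s::vector) AS score
            FROM documents
            WHERE (role = %s OR role = 'public')
              AND (jurisdiction = %s OR jurisdiction = 'any')
            ORDER BY embedding <=> %s::vector
            LIMIT %s
            """,
            (q_vec, role, jurisdiction, q_vec, k),
        )
        return cur.fetchall()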

def hybrid_retrieve(query: str, role: str, jurisdiction: str, conn, k: int = 5):
    """Combine dense and lexical rankings via Reciprocal Rank Fusion (RRF)."""
    rrf_k = 60  # RRF smoothing constant
    scores = {}

    # Dense ranking: dense_hits already applies the access-control filter.
    dense = dense_hits(query, role, jurisdiction, conn, k * 2)
    for rank, (anchor, _text, _score) in enumerate(dense, start=1):
        scores[anchor] = scores.get(anchor, 0.0) + 1 / (rrf_k + rank)

    # Naive term-overlap scoring as a stand-in for BM25 (illustration only).
    # The same role/jurisdiction filter applies so restricted chunks cannot leak in.
    q_terms = set(re.findall(r"\w+", query.lower()))
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT anchor, text, role, jurisdiction FROM documents
            WHERE (role = %s OR role = 'public')
              AND (jurisdiction = %s OR jurisdiction = 'any')
            """,
            (role, jurisdiction),
        )
        rows = {row[0]: row for row in cur.fetchall()}

    lexical = []
    for anchor, row in rows.items():
        d_terms = set(re.findall(r"\w+", row[1].lower()))
        overlap = len(q_terms & d_terms) / (1 + len(d_terms) / 20)
        if overlap > 0:
            lexical.append((overlap, anchor))
    # RRF needs a ranking, so sort by lexical score before assigning ranks.
    lexical.sort(reverse=True)
    for rank, (_overlap, anchor) in enumerate(lexical[: k * 2], start=1):
        scores[anchor] = scores.get(anchor, 0.0) + 1 / (rrf_k + rank)

    # Return the top-k rows in fused-score order.
    ordered = sorted(scores, key=scores.get, reverse=True)[:k]
    return [rows[a] for a in ordered]

def make_prompt(query: str, chunks: List[Tuple], role: str, jurisdiction: str) -> str:
    """Assemble cache-aware prompt with citations."""
    system = (
        "You are a regulated-domain assistant. Cite every claim by (doc_id section). "
        "Do not answer outside provided context."
    )
    policy = f"role={role} jurisdiction={jurisdiction}"
    context = "\n".join(f"[{a}] {t}" for a, t, *_ in chunks)
    return f"{system}\n{policy}\n{context}\nUser: {query}"
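
def cache_key(prompt: str) -> str:
    """Generate a stable cache key from the prefix (system + policy + context)."""
    # The context can span many lines, so cut at the final "User:" line
    # instead of taking a fixed number of lines.
    prefix = prompt.rsplit("\nUser: ", 1)[0]
    return hashlib.sha256(prefix.encode()).hexdigest()[:16]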

# Example usage
if __name__ == "__main__":
    corpus = [
        Chunk("GDPR-2024", "s1", "Data subjects have the right to erasure within 30 days.", "analyst", "GDPR"),
        Chunk("HIPAA-2024", "s2", "PHI must be destroyed within 60 days after contract termination.", "counsel", "HIPAA"),
        Chunk("PUBLIC-FAQ", "q1", "Users can export their data via the portal.", "public", "any"),
    ]
    conn = psycopg2.connect(dbname="rag_demo", user="postgres", password="postgres")
    ingest(corpus, conn)
    query = "How long do we have to delete user data after termination?"
    hits = hybrid_retrieve(query, "analyst", "GDPR", conn, k=3)
    prompt = make_prompt(query, hits, "analyst", "GDPR")
    print("Cache key:", cache_key(prompt))
    print("Prompt:\n", prompt)
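The example above reads from and writes to a documents table that it never creates. One possible schema is sketched below. The column types, the id column, and the index name are assumptions inferred from the INSERT and SELECT statements; the dimension 384 matches all-MiniLM-L6-v2, and the HNSW index type requires pgvector 0.5.0 or later.

```python
EMBEDDING_DIM = 384  # output size of all-MiniLM-L6-v2; change it if you swap models

SCHEMA_STATEMENTS = [
    "CREATE EXTENSION IF NOT EXISTS vector",
    f"""
    CREATE TABLE IF NOT EXISTS documents (
        id           bigserial PRIMARY KEY,
        anchor       text NOT NULL,
        text         text NOT NULL,
        role         text NOT NULL,
        jurisdiction text NOT NULL,
        embedding    vector({EMBEDDING_DIM}) NOT NULL
    )
    """,
    # HNSW index for cosine distance, matching the <=> operator used for retrieval.
    """
    CREATE INDEX IF NOT EXISTS documents_embedding_hnsw
    ON documents USING hnsw (embedding vector_cosine_ops)
    """,
]


def create_schema(conn) -> None:
    """Run the DDL on an open DB-API connection, e.g. one from psycopg2.connect()."""
    cur = conn.cursor()
    for statement in SCHEMA_STATEMENTS:
        cur.execute(statement)
    conn.commit()
```

Calling create_schema(conn) once before ingest(corpus, conn) is enough. Every statement uses IF NOT EXISTS, so repeated runs are harmless.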
Summary
- Semantic chunking with metadata (role, jurisdiction) enables fine‑grained access control during retrieval.
- Dense embeddings from models like Voyage‑3 or Nomic‑embed‑v2 should be stored in a vector database such as PostgreSQL + pgvector for scalable similarity search.
- Hybrid retrieval combining vector similarity and BM25 via Reciprocal Rank Fusion improves recall over dense‑only approaches.
- Cache‑aware prompt assembly reduces inference costs by approximately 70 % (the curriculum’s figure) through KV‑cache reuse when system prompts and context prefixes remain stable.
- Safety guardrails including input validation (Llama Guard) and PII scrubbing are essential for production deployments.
Frequently Asked Questions
What embedding model should I use for a production RAG pipeline?
The curriculum recommends Voyage‑3 or Nomic‑embed‑v2 for dense retrieval, as cited in phases/19-capstone-projects/08-production-rag-chatbot/docs/en.md (line 75). It presents these as strong choices for domain‑specific retrieval; smaller sentence-transformers models like all-MiniLM-L6-v2 are adequate for prototypes.
Why use a vector database instead of storing embeddings in memory?
Vector databases like PostgreSQL with pgvector, Qdrant, or Vespa provide persistent storage, metadata filtering, and approximate nearest‑neighbor indices (e.g., IVF, HNSW) that scale to millions of vectors. In‑memory numpy arrays cannot match these capabilities for production workloads.
What is the advantage of hybrid retrieval over dense embedding search alone?
Hybrid retrieval combines dense semantic similarity (capturing meaning and paraphrases) with BM25 lexical matching (exact keyword matches). As implemented in phases/19-capstone-projects/08-production-rag-chatbot/code/main.py (lines 81‑95), Reciprocal Rank Fusion merges these signals to improve recall on rare terms and out‑of‑domain queries that pure vector search might miss.
How does prompt caching reduce LLM inference costs?
When the system prompt, policy context, and retrieved documents form an identical prefix across requests, LLM providers can reuse the precomputed KV‑cache for that prefix; the stable cache key built in phases/19-capstone-projects/08-production-rag-chatbot/code/main.py (lines 102‑128) identifies such prefixes. According to phases/17-infrastructure-and-production/14-prompt-semantic-caching/docs/en.md, this optimization yields approximately 70 % cost savings on repeated queries with similar context.