How to Build a RAG Pipeline with Embeddings and Vector Databases: A Production‑Ready Guide

A production‑grade RAG pipeline combines document chunking, dense vector embeddings stored in a vector database like PostgreSQL with pgvector, and hybrid retrieval with reranking to feed relevant context into an LLM prompt.

To build a RAG pipeline with embeddings and vector databases that can handle production traffic, you need more than a simple similarity search. The rohitg00/ai-engineering-from-scratch repository provides a complete curriculum showing how to implement semantic chunking, hybrid retrieval, and cache‑aware prompt assembly for domain‑specific question answering.

Stage 1: Document Ingestion and Semantic Chunking

Every RAG pipeline starts by breaking raw sources into semantically meaningful units. The curriculum demonstrates two specialized approaches: an AST‑aware chunker for codebases in phases/19-capstone-projects/02-rag-over-codebase/code/main.py, and a policy‑aware chunker for regulated documents in phases/19-capstone-projects/08-production-rag-chatbot/code/main.py.
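
To make the AST‑aware idea concrete, here is a minimal sketch using Python's standard ast module. It is not the repository's chunker: the function name ast_chunks and the sample source are assumptions for illustration, and it only splits on top‑level functions and classes.

```python
import ast
from typing import List, Tuple


def ast_chunks(source: str) -> List[Tuple[str, str]]:
    """Split Python source into (name, code) chunks, one per top-level def or class."""
    tree = ast.parse(source)
    chunks = []
    for node in tree.body:
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
            # get_source_segment returns the exact source text spanned by the node.
            code = ast.get_source_segment(source, node)
            chunks.append((node.name, code))
    return chunks


# Hypothetical sample input, not taken from the repository.
SAMPLE = '''
def add(a, b):
    return a + b

class Greeter:
    def hello(self):
        return "hi"
'''

for name, code in ast_chunks(SAMPLE):
    print(name, len(code.splitlines()))
```

Splitting on syntax boundaries keeps each function or class intact, so a retrieved chunk is never cut off mid‑definition the way a fixed‑size character window can be.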

Each chunk carries metadata that drives later access control. In the production chatbot scaffold, the Chunk dataclass includes role and jurisdiction fields, allowing the system to filter results based on user permissions during retrieval.
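
The permission check can be sketched in plain Python before any database is involved. The field names role and jurisdiction come from the article; the helper visible_to and the wildcard values "public" and "any" are assumptions that mirror the article's later example, not code from the repository.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Chunk:
    doc_id: str
    text: str
    role: str          # who may read this chunk, e.g. "analyst", "public"
    jurisdiction: str  # regime the chunk belongs to, e.g. "GDPR", "any"


def visible_to(chunks: List[Chunk], role: str, jurisdiction: str) -> List[Chunk]:
    """Keep chunks the caller may see: own role or public, own jurisdiction or any."""
    return [
        c for c in chunks
        if c.role in (role, "public") and c.jurisdiction in (jurisdiction, "any")
    ]


corpus = [
    Chunk("GDPR-2024", "Right to erasure.", "analyst", "GDPR"),
    Chunk("HIPAA-2024", "PHI destruction rules.", "counsel", "HIPAA"),
    Chunk("PUBLIC-FAQ", "Export via the portal.", "public", "any"),
]
print([c.doc_id for c in visible_to(corpus, "analyst", "GDPR")])
```

Applying the filter before ranking, rather than after, means restricted chunks never enter the candidate set at all.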

Stage 2: Generate Dense Embeddings and Store in a Vector Database

After chunking, each segment is transformed into a dense vector using a pretrained embedding model. The repository’s glossary in site/data.js defines embeddings as "dense vector representations of text." The production capstone recommends Voyage‑3 or Nomic‑embed‑v2 as embedding models and PostgreSQL with pgvector as the vector store, as noted in phases/19-capstone-projects/08-production-rag-chatbot/docs/en.md (line 75).

Alternative vector stores mentioned include Qdrant, Vespa, and AstraDB. The key requirement is support for approximate nearest‑neighbor search with metadata filtering.
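
What a vector store computes can be illustrated with an exact, brute‑force version: filter by metadata, score by cosine similarity, keep the top k. This is a sketch under stated assumptions (the row layout, the function names, and the two‑dimensional toy vectors are all hypothetical); a real store replaces the linear scan with an approximate index.

```python
import math
from typing import Dict, List, Sequence, Set, Tuple


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity of two equal-length vectors; 0.0 if either has zero norm."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def filtered_top_k(query_vec: Sequence[float], rows: List[Dict],
                   allowed_roles: Set[str], k: int = 2) -> List[Tuple[str, float]]:
    """Exact nearest-neighbor search restricted to rows the caller may read."""
    candidates = [r for r in rows if r["role"] in allowed_roles]
    scored = [(r["id"], cosine(query_vec, r["vec"])) for r in candidates]
    return sorted(scored, key=lambda p: p[1], reverse=True)[:k]


# Toy rows with hypothetical 2-d embeddings.
ROWS = [
    {"id": "a", "role": "public", "vec": [1.0, 0.0]},
    {"id": "b", "role": "counsel", "vec": [0.9, 0.1]},
    {"id": "c", "role": "public", "vec": [0.0, 1.0]},
]
print(filtered_top_k([1.0, 0.1], ROWS, {"public"}))
```

The scan is linear in corpus size, which is why production systems trade a little recall for an approximate index once the corpus grows.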

Stage 3: Hybrid Retrieval with Reciprocal Rank Fusion

At query time, the system embeds the user’s question and executes a hybrid search. The scaffold in phases/19-capstone-projects/08-production-rag-chatbot/code/main.py (lines 81‑95) combines:

  • Dense similarity using cosine distance via pgvector’s <=> operator
  • BM25 lexical scores for keyword matching
  • RRF (Reciprocal Rank Fusion) to merge the two ranked lists
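
The fusion step in the list above can be sketched in a few lines. Each document's fused score is the sum of 1 / (k + rank) over every ranked list it appears in, with ranks starting at 1. The function name rrf and the sample document ids are assumptions; k = 60 matches the constant used in this article's full example.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Sequence


def rrf(rankings: Iterable[Sequence[str]], k: int = 60) -> List[str]:
    """Fuse best-first rankings; return doc ids ordered by summed reciprocal rank."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)


dense = ["d1", "d2", "d3"]     # hypothetical dense ranking, best first
lexical = ["d3", "d1", "d4"]   # hypothetical lexical ranking, best first
print(rrf([dense, lexical]))   # ['d1', 'd3', 'd2', 'd4']
```

Because RRF uses only ranks, it needs no calibration between cosine similarities and BM25 scores, which live on unrelated scales.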

After initial retrieval, a reranker—such as bge‑reranker‑v2‑gemma—reorders the top‑k chunks to maximize relevance before prompt assembly.
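
The reranking step has a simple shape regardless of the model behind it: score every (query, chunk) pair, then sort. The sketch below keeps the scorer pluggable. The toy_overlap_score function is a hypothetical stand‑in so the example runs without a model download; in practice the scorer would wrap a cross‑encoder such as the one named above.

```python
from typing import Callable, List, Tuple


def rerank(query: str, candidates: List[str],
           score_fn: Callable[[str, str], float], top_n: int = 3) -> List[Tuple[str, float]]:
    """Score each candidate against the query and return the top_n, best first."""
    scored = [(text, score_fn(query, text)) for text in candidates]
    return sorted(scored, key=lambda p: p[1], reverse=True)[:top_n]


def toy_overlap_score(query: str, text: str) -> float:
    """Hypothetical stand-in for a cross-encoder: fraction of query words found in text."""
    q = set(query.lower().split())
    return len(q & set(text.lower().split())) / len(q) if q else 0.0


candidates = [
    "export data via portal",
    "erasure within 30 days",
    "right to erasure of data",
]
print(rerank("data erasure", candidates, toy_overlap_score, top_n=1))
```

Rerankers are slower per pair than vector search, so they are usually applied only to the short fused list rather than the whole corpus.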

Stage 4: Prompt Assembly and Semantic Caching

The retrieved chunks are injected into a structured prompt. The PromptLayout class in phases/19-capstone-projects/08-production-rag-chatbot/code/main.py (lines 102‑128) constructs a cache‑key from the stable system prompt, policy block, and retrieved context.

Because the prefix is byte‑identical whenever the same system prompt, policy block, and retrieved chunks recur, LLM providers can reuse the KV‑cache for that prefix instead of recomputing it. The curriculum reports approximately 70% cost savings from this technique in phases/17-infrastructure-and-production/14-prompt-semantic-caching/docs/en.md.
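
A small sketch shows why the key must be derived from the prefix only. The names build_prompt and prefix_key are assumptions for illustration and do not reproduce the repository's PromptLayout class; the point is that two different user questions over the same context share one key.

```python
import hashlib
from typing import List, Tuple


def build_prompt(system: str, policy: str, context: List[str], query: str) -> Tuple[str, str]:
    """Return (cacheable prefix, full prompt). The user turn always comes last."""
    prefix = "\n".join([system, policy, *context])
    return prefix, f"{prefix}\nUser: {query}"


def prefix_key(prefix: str) -> str:
    """Short, stable identifier for a prompt prefix."""
    return hashlib.sha256(prefix.encode("utf-8")).hexdigest()[:16]


system = "Cite every claim."
policy = "role=analyst jurisdiction=GDPR"
context = ["[GDPR-2024 s1] Erasure within 30 days."]

prefix_a, _ = build_prompt(system, policy, context, "How long for erasure?")
prefix_b, _ = build_prompt(system, policy, context, "What is the deadline?")
print(prefix_key(prefix_a) == prefix_key(prefix_b))  # True: same prefix, different question
```

Any change to the retrieved chunks, including their order, produces a different prefix and therefore a cache miss, so keeping chunk order deterministic matters.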

Stage 5: Safety Guardrails and PII Handling

Before generation, the pipeline applies safety checks. The scaffold in phases/19-capstone-projects/08-production-rag-chatbot/code/main.py (lines 61‑72) implements:

  • Input guardrails using Llama Guard 4 to detect jailbreak attempts
  • Output scrubbing with a PII detection model to prevent data leakage

These components mirror the constitutional safety harness evaluated in phases/19-capstone-projects/15-constitutional-safety-harness/outputs/skill-safety-harness.md.
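
The output‑scrubbing step can be illustrated with a deliberately simple regex pass. This is a stand‑in, not the PII detection model the scaffold refers to: the two patterns (email addresses and US Social Security numbers) and the function name scrub_pii are assumptions, and a regex will miss many real‑world PII forms.

```python
import re

# Illustrative patterns only; a production system would use a trained PII detector.
EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
US_SSN = re.compile(r"\b\d{3}-\d{2}-\d{4}\b")


def scrub_pii(text: str) -> str:
    """Replace matched emails and SSN-shaped numbers with placeholder tags."""
    text = EMAIL.sub("[EMAIL]", text)
    return US_SSN.sub("[SSN]", text)


print(scrub_pii("Contact jane.doe@example.com, SSN 123-45-6789."))
# Contact [EMAIL], SSN [SSN].
```

Running the scrubber on model output, not just on user input, catches PII that entered through retrieved documents.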

Complete Implementation Example

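Below is a minimal Python sketch that follows the repository’s production scaffold. It uses sentence‑transformers for embeddings and pgvector for storage, with role‑based access control and hybrid retrieval. It assumes a reachable PostgreSQL instance with the pgvector extension enabled and a documents table with columns anchor, text, role, jurisdiction, and embedding vector(384), matching the 384‑dimensional output of all-MiniLM-L6-v2. The lexical side is a naive term‑overlap score standing in for BM25: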


# example_rag.py – minimal production RAG pipeline

import hashlib
import re
from dataclasses import dataclass
from typing import List, Tuple

import psycopg2
from sentence_transformers import SentenceTransformer
import numpy as np

@dataclass
class Chunk:
    doc_id: str
    section: str
    text: str
    role: str          # e.g. "analyst", "counsel", "public"
    jurisdiction: str  # e.g. "GDPR", "HIPAA", "any"

    def anchor(self) -> str:
        return f"{self.doc_id} {self.section}"

# Initialize embedder (swap for Voyage-3 or Nomic-embed-v2 in production)
EMBEDDER = SentenceTransformer("all-MiniLM-L6-v2")

def embed(text: str) -> np.ndarray:
    return EMBEDDER.encode(text, normalize_embeddings=True)

def ingest(chunks: List[Chunk], conn) -> None:
    """Store embeddings in pgvector with metadata."""
    with conn.cursor() as cur:
        for c in chunks:
            vec = embed(c.text).tolist()
            # psycopg2 sends a Python list as a SQL array; cast it to pgvector's type.
            cur.execute(
                """
                INSERT INTO documents (anchor, text, role, jurisdiction, embedding)
                VALUES (%s, %s, %s, %s, %s::vector)
                """,
                (c.anchor(), c.text, c.role, c.jurisdiction, vec),
            )
    conn.commit()

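def dense_hits(query: str, role: str, jurisdiction: str, conn, k: int = 5):
    """Retrieve using pgvector cosine distance with metadata filtering."""
    q_vec = embed(query).tolist()
    cur = conn.cursor()
    # <=> is cosine distance, so similarity is 1 - distance. The list parameter
    # arrives as a SQL array and must be cast to vector for the operator to resolve.
    # Ordering by the raw distance (ascending) lets pgvector use an ANN index.
    cur.execute(
        """
        SELECT anchor, text, 1 - (embedding <=> %s::vector) AS score
        FROM documents
        WHERE (role = %s OR role = 'public')
          AND (jurisdiction = %s OR jurisdiction = 'any')
        ORDER BY embedding <=> %s::vector
        LIMIT %s
        """,
        (q_vec, role, jurisdiction, q_vec, k),
    )
    return cur.fetchall()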

def hybrid_retrieve(query: str, role: str, jurisdiction: str, conn, k: int = 5):
    """Fuse dense and lexical rankings via Reciprocal Rank Fusion (RRF)."""
    rrf_k = 60  # RRF smoothing constant
    scores = {}

    # Dense ranking: dense_hits returns rows best-first, so position is the rank.
    dense = dense_hits(query, role, jurisdiction, conn, k * 2)
    for rank, (anchor, _text, _score) in enumerate(dense, start=1):
        scores[anchor] = scores.get(anchor, 0.0) + 1 / (rrf_k + rank)

    # Lexical ranking: a naive term-overlap score standing in for BM25.
    # The same role/jurisdiction filter applies so restricted chunks cannot leak in.
    # Loading every permitted row is for illustration only.
    q_terms = set(re.findall(r"\w+", query.lower()))
    cur = conn.cursor()
    cur.execute(
        """
        SELECT anchor, text, role, jurisdiction
        FROM documents
        WHERE (role = %s OR role = 'public')
          AND (jurisdiction = %s OR jurisdiction = 'any')
        """,
        (role, jurisdiction),
    )
    rows = {row[0]: row for row in cur.fetchall()}
    lexical = []
    for anchor, row in rows.items():
        d_terms = set(re.findall(r"\w+", row[1].lower()))
        overlap = len(q_terms & d_terms) / (1 + len(d_terms) / 20)
        if overlap > 0:
            lexical.append((overlap, anchor))
    lexical.sort(reverse=True)  # best lexical match first
    for rank, (_overlap, anchor) in enumerate(lexical[: k * 2], start=1):
        scores[anchor] = scores.get(anchor, 0.0) + 1 / (rrf_k + rank)

    # Return the top-k rows in fused-score order.
    ordered = sorted(scores, key=scores.get, reverse=True)[:k]
    return [rows[a] for a in ordered]

def make_prompt(query: str, chunks: List[Tuple], role: str, jurisdiction: str) -> str:
    """Assemble cache-aware prompt with citations."""
    system = (
        "You are a regulated-domain assistant. Cite every claim by (doc_id section). "
        "Do not answer outside provided context."
    )
    policy = f"role={role} jurisdiction={jurisdiction}"
    context = "\n".join(f"[{a}] {t}" for a, t, *_ in chunks)
    return f"{system}\n{policy}\n{context}\nUser: {query}"

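def cache_key(prompt: str) -> str:
    """Generate stable cache key from prefix (system + policy + context)."""
    # Drop the trailing user turn; everything before it is the cacheable prefix,
    # including every context line rather than only the first one.
    prefix = prompt.rsplit("\nUser: ", 1)[0]
    return hashlib.sha256(prefix.encode()).hexdigest()[:16]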

# Example usage
if __name__ == "__main__":
    corpus = [
        Chunk("GDPR-2024", "s1", "Data subjects have the right to erasure within 30 days.", "analyst", "GDPR"),
        Chunk("HIPAA-2024", "s2", "PHI must be destroyed within 60 days after contract termination.", "counsel", "HIPAA"),
        Chunk("PUBLIC-FAQ", "q1", "Users can export their data via the portal.", "public", "any"),
    ]
    
    conn = psycopg2.connect(dbname="rag_demo", user="postgres", password="postgres")
    ingest(corpus, conn)
    
    query = "How long do we have to delete user data after termination?"
    hits = hybrid_retrieve(query, "analyst", "GDPR", conn, k=3)
    prompt = make_prompt(query, hits, "analyst", "GDPR")
    
    print("Cache key:", cache_key(prompt))
    print("Prompt:\n", prompt)

Summary

  • Semantic chunking with metadata (role, jurisdiction) enables fine‑grained access control during retrieval.
  • Dense embeddings from models like Voyage‑3 or Nomic‑embed‑v2 should be stored in a vector database such as PostgreSQL + pgvector for scalable similarity search.
  • Hybrid retrieval combining vector similarity and BM25 via Reciprocal Rank Fusion improves recall over dense‑only approaches.
  • Cache‑aware prompt assembly lets providers reuse the KV‑cache when the system prompt and context prefix remain stable; the curriculum reports approximately 70% cost savings.
  • Safety guardrails including input validation (Llama Guard) and PII scrubbing are essential for production deployments.

Frequently Asked Questions

What embedding model should I use for a production RAG pipeline?

The curriculum recommends Voyage‑3 or Nomic‑embed‑v2 for dense retrieval, as cited in phases/19-capstone-projects/08-production-rag-chatbot/docs/en.md (line 75). Smaller sentence-transformers models such as all-MiniLM-L6-v2, which produces 384‑dimensional vectors, are adequate for prototypes; whichever model you choose, evaluate it on your own domain's queries before committing.

Why use a vector database instead of storing embeddings in memory?

Vector databases like PostgreSQL with pgvector, Qdrant, or Vespa provide persistent storage, metadata filtering, and approximate nearest‑neighbor indices (e.g., IVF, HNSW) that scale to millions of vectors—capabilities that in‑memory numpy arrays cannot match for production workloads.

What is the advantage of hybrid retrieval over dense embedding search alone?

Hybrid retrieval combines dense semantic similarity (capturing meaning and paraphrases) with BM25 lexical matching (exact keyword matches). As implemented in phases/19-capstone-projects/08-production-rag-chatbot/code/main.py (lines 81‑95), Reciprocal Rank Fusion merges these signals to improve recall on rare terms and out‑of‑domain queries that pure vector search might miss.
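
For readers who want the lexical half spelled out, here is a compact sketch of Okapi BM25 scoring in pure Python. It is not the repository's implementation: the function names, the regex tokenizer, and the defaults k1 = 1.5 and b = 0.75 are assumptions (commonly used values), and the IDF uses the non‑negative log(1 + ...) variant.

```python
import math
import re
from collections import Counter
from typing import List


def tokenize(text: str) -> List[str]:
    return re.findall(r"\w+", text.lower())


def bm25_scores(query: str, docs: List[str], k1: float = 1.5, b: float = 0.75) -> List[float]:
    """Return one BM25 score per document for the given query."""
    tokenized = [tokenize(d) for d in docs]
    n = len(tokenized)
    avgdl = sum(len(t) for t in tokenized) / n if n else 0.0
    # Document frequency: number of documents containing each term.
    df = Counter(term for toks in tokenized for term in set(toks))
    scores = []
    for toks in tokenized:
        tf = Counter(toks)
        score = 0.0
        for term in set(tokenize(query)):
            if term not in tf:
                continue
            idf = math.log(1 + (n - df[term] + 0.5) / (df[term] + 0.5))
            # Term frequency saturates via k1 and is normalized by document length via b.
            norm = tf[term] + k1 * (1 - b + b * len(toks) / avgdl)
            score += idf * tf[term] * (k1 + 1) / norm
        scores.append(score)
    return scores


docs = [
    "PHI must be destroyed within 60 days",
    "Users can export their data via the portal",
    "Data subjects have the right to erasure",
]
print(bm25_scores("destroyed PHI", docs))
```

A rare exact term such as "PHI" gets a high IDF weight here, which is the kind of match a dense model can blur and the reason the two rankings are fused.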

How does prompt caching reduce LLM inference costs?

When you generate a stable cache key from the system prompt, policy context, and retrieved documents—as shown in phases/19-capstone-projects/08-production-rag-chatbot/code/main.py (lines 102‑128)—LLM providers can reuse precomputed KV‑cache for identical prefixes. According to phases/17-infrastructure-and-production/14-prompt-semantic-caching/docs/en.md, this optimization yields approximately 70 % cost savings on repeated queries with similar context.
