
RAG in Practice: Building Systems That Actually Work

A hands-on guide to building Retrieval-Augmented Generation systems that go beyond toy demos. Covering architecture, chunking strategies, embedding models, vector databases, and the real challenges nobody talks about.

Tags: AI, RAG, LLMs, software engineering, architecture

Introduction

Everyone is building RAG systems right now. The concept is simple: instead of relying purely on what a large language model learned during training, you retrieve relevant documents at query time and inject them into the prompt as context. The LLM then generates answers grounded in your actual data. Simple, right?

In theory, yes. In practice, I have spent months debugging RAG pipelines that returned garbage, hallucinated confidently, or simply ignored the retrieved context altogether. The gap between a working demo and a production-grade RAG system is enormous, and most tutorials skip over the hard parts entirely.

In this post, I want to share what I have learned building RAG systems for real use cases. Not the “hello world” version where you load a PDF and ask questions about it. I am talking about systems that handle thousands of documents, serve concurrent users, and need to be accurate enough that people trust them for actual decisions.

What RAG Actually Is (And What It Is Not)

RAG stands for Retrieval-Augmented Generation. The original paper by Lewis et al. (2020) introduced it as a way to combine parametric memory (the LLM’s weights) with non-parametric memory (an external knowledge base). The idea is elegant: instead of fine-tuning a model every time your data changes, you keep the knowledge external and retrieve it on demand.

Here is the basic flow:

  1. User sends a query
  2. The query is converted into an embedding (a dense vector representation)
  3. The embedding is used to search a vector database for similar documents
  4. The top-k most relevant documents are retrieved
  5. These documents are injected into the LLM prompt as context
  6. The LLM generates an answer based on the retrieved context
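The six steps above can be sketched end to end in a few lines. This is a toy illustration with stubbed components: the `embed` and `vector_search` helpers here are stand-ins, not a real embedding model or vector database.

```python
def embed(text: str) -> list[float]:
    # Stand-in for a real embedding model: hash characters into a tiny vector.
    vec = [0.0] * 8
    for i, ch in enumerate(text.lower()):
        vec[i % 8] += ord(ch)
    norm = sum(v * v for v in vec) ** 0.5
    return [v / norm for v in vec]

def vector_search(query_vec: list[float], corpus: list[str], top_k: int = 2) -> list[str]:
    # Brute-force cosine similarity against every stored document.
    scored = [
        (sum(q * d for q, d in zip(query_vec, embed(doc))), doc)
        for doc in corpus
    ]
    return [doc for _, doc in sorted(scored, reverse=True)[:top_k]]

def answer(query: str, corpus: list[str]) -> str:
    # Steps 1-5: embed the query, retrieve top-k, inject into the prompt.
    context = "\n".join(vector_search(embed(query), corpus))
    # Step 6 would call an LLM on this prompt; we just return it here.
    return f"Context:\n{context}\n\nQuestion: {query}"

corpus = [
    "Refunds are processed within 5 days.",
    "Standard shipping takes 2 weeks.",
]
prompt = answer("How long do refunds take?", corpus)
```

Everything that follows in this post is about making each of these stubs production-grade.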

What RAG is not: it is not a magic solution that makes LLMs stop hallucinating. It is not a replacement for fine-tuning in every scenario. And it is definitely not as simple as “just throw your documents into a vector database and call it a day.”

The reality is that RAG introduces an entirely new set of failure modes. Your retrieval can fail (wrong documents), your context window can overflow (too many documents), your chunking can split critical information across chunks, and your LLM can still hallucinate even with the right context sitting right there in the prompt.

Architecture: The Components You Need

Let me walk through the architecture of a production RAG system. I will use a concrete example: a customer support system that answers questions based on internal documentation, knowledge base articles, and past ticket resolutions.

Ingestion Pipeline

Before you can retrieve anything, you need to get your documents into the system. This is the ingestion pipeline, and it is where most of the complexity hides.

from langchain.document_loaders import (
    PyPDFLoader,
    UnstructuredHTMLLoader,
    CSVLoader
)
from langchain.text_splitter import RecursiveCharacterTextSplitter

class IngestionPipeline:
    def __init__(self, embedding_model, vector_store):
        self.embedding_model = embedding_model
        self.vector_store = vector_store
        self.splitter = RecursiveCharacterTextSplitter(
            chunk_size=512,
            chunk_overlap=64,
            separators=["\n\n", "\n", ". ", " ", ""]
        )

    def ingest_document(self, file_path: str, metadata: dict):
        # Load based on file type
        loader = self._get_loader(file_path)
        documents = loader.load()

        # Clean and normalize text
        for doc in documents:
            doc.page_content = self._clean_text(doc.page_content)
            doc.metadata.update(metadata)

        # Split into chunks
        chunks = self.splitter.split_documents(documents)

        # Generate embeddings and store
        self.vector_store.add_documents(chunks)

        return len(chunks)

    def _clean_text(self, text: str) -> str:
        # Remove excessive whitespace
        text = " ".join(text.split())
        # Remove special characters that confuse embeddings
        text = text.replace("\x00", "")
        return text.strip()

    def _get_loader(self, file_path: str):
        if file_path.endswith(".pdf"):
            return PyPDFLoader(file_path)
        elif file_path.endswith(".html"):
            return UnstructuredHTMLLoader(file_path)
        elif file_path.endswith(".csv"):
            return CSVLoader(file_path)
        else:
            raise ValueError(f"Unsupported file type: {file_path}")

A few things to note here. First, the RecursiveCharacterTextSplitter is my go-to choice for most use cases. It tries to split on natural boundaries (paragraphs, sentences) before falling back to character-level splits. Second, I always add metadata to chunks. This metadata becomes critical later for filtering, attribution, and debugging.

Embedding Model Selection

Choosing the right embedding model matters more than most people think. I have tested several options extensively, and here is my honest take.

OpenAI text-embedding-3-small: Good default choice. Affordable, fast, and produces 1536-dimensional vectors. For most use cases, this is where I start.

OpenAI text-embedding-3-large: Better quality, especially for complex technical content. 3072 dimensions by default, but you can reduce dimensionality without losing much quality. More expensive per token.

Open-source alternatives (e5-large-v2, bge-large-en): If you need to run embeddings on-premise or want to avoid API costs at scale, these are solid choices. I have found that bge-large-en performs surprisingly well for English content.

Cohere embed-v3: Excellent for multilingual content. If your documents are in multiple languages, this is worth testing.

The key insight I want to share: always benchmark on YOUR data. I have seen cases where a smaller model outperformed a larger one because the domain vocabulary aligned better with its training data. Create a small evaluation set with queries and expected relevant documents, then measure retrieval accuracy across models.

from sentence_transformers import SentenceTransformer
import numpy as np

def benchmark_embedding_models(queries, relevant_docs, corpus, models):
    results = {}

    for model_name in models:
        model = SentenceTransformer(model_name)

        # Encode the corpus once per model, not once per query
        doc_embeddings = model.encode(
            [doc.page_content for doc in corpus],
            normalize_embeddings=True,
        )

        hits_at_5 = 0
        hits_at_10 = 0

        for query, expected_docs in zip(queries, relevant_docs):
            query_embedding = model.encode(
                query, normalize_embeddings=True
            )

            # With normalized vectors, dot product equals cosine similarity
            similarities = np.dot(doc_embeddings, query_embedding)
            top_10_indices = np.argsort(similarities)[-10:][::-1]
            top_5_indices = top_10_indices[:5]

            if any(idx in expected_docs for idx in top_5_indices):
                hits_at_5 += 1
            if any(idx in expected_docs for idx in top_10_indices):
                hits_at_10 += 1

        results[model_name] = {
            "hits@5": hits_at_5 / len(queries),
            "hits@10": hits_at_10 / len(queries),
        }

    return results

Vector Database

For the vector store, you have several options. Here is what I have used in production.

Pinecone: Managed service, easy to set up, scales well. I use it when the team does not want to manage infrastructure. The downside is cost at scale and vendor lock-in.

Qdrant: My current favorite for self-hosted deployments. Written in Rust, fast, and has excellent filtering capabilities. The payload filtering is particularly useful when you need to scope searches to specific document categories, tenants, or time ranges.

pgvector (PostgreSQL extension): If you already have PostgreSQL in your stack, this is the simplest option. Performance is acceptable for up to a few million vectors. Beyond that, you will want a dedicated solution.

ChromaDB: Great for prototyping and small-scale applications. I would not use it in production for anything critical, but it is perfect for getting started quickly.

Here is how I typically set up Qdrant:

from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance, VectorParams, PointStruct, Filter,
    FieldCondition, MatchValue
)

client = QdrantClient(host="localhost", port=6333)

# Create collection
client.create_collection(
    collection_name="knowledge_base",
    vectors_config=VectorParams(
        size=1536,
        distance=Distance.COSINE
    )
)

# Upsert with metadata
def store_chunks(chunks, embeddings):
    points = [
        PointStruct(
            id=idx,
            vector=embedding,
            payload={
                "text": chunk.page_content,
                "source": chunk.metadata.get("source"),
                "category": chunk.metadata.get("category"),
                "created_at": chunk.metadata.get("created_at"),
            }
        )
        for idx, (chunk, embedding) in enumerate(
            zip(chunks, embeddings)
        )
    ]
    client.upsert(
        collection_name="knowledge_base",
        points=points
    )

# Search with filtering
def search(query_embedding, category=None, top_k=5):
    search_filter = None
    if category:
        search_filter = Filter(
            must=[
                FieldCondition(
                    key="category",
                    match=MatchValue(value=category)
                )
            ]
        )

    results = client.search(
        collection_name="knowledge_base",
        query_vector=query_embedding,
        query_filter=search_filter,
        limit=top_k
    )
    return results

Chunking Strategies: The Most Underrated Problem

If I had to pick one thing that makes or breaks a RAG system, it would be chunking. How you split your documents into chunks determines what can be retrieved, and you cannot retrieve information that was split across chunks in a way that loses its meaning.

Fixed-Size Chunking

The simplest approach: split every N characters (or tokens) with some overlap. This is what most tutorials show, and it works surprisingly well as a baseline.

splitter = RecursiveCharacterTextSplitter(
    chunk_size=512,
    chunk_overlap=64,
)

I typically start with 512 tokens and 64 tokens of overlap. The overlap helps ensure that information at chunk boundaries is not lost. But fixed-size chunking has a fundamental problem: it does not respect document structure. A chunk might start in the middle of a paragraph and end in the middle of another.

Semantic Chunking

A more sophisticated approach: split documents based on semantic similarity between consecutive sentences. When the topic shifts, you create a new chunk.

from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import OpenAIEmbeddings

splitter = SemanticChunker(
    OpenAIEmbeddings(),
    breakpoint_threshold_type="percentile",
    breakpoint_threshold_amount=85,
)

Semantic chunking produces more coherent chunks, but it is slower (requires embedding each sentence) and more expensive. I use it when document quality is critical and the corpus is not too large.

Hierarchical Chunking

This is the approach I have landed on for most production systems. You create chunks at multiple granularity levels: sections, paragraphs, and sentences. Each chunk stores a reference to its parent, allowing you to retrieve at the paragraph level but expand to the full section if needed.

class HierarchicalChunker:
    def __init__(self):
        self.section_splitter = RecursiveCharacterTextSplitter(
            chunk_size=2048,
            chunk_overlap=0,
            separators=["\n## ", "\n# "],
        )
        self.paragraph_splitter = RecursiveCharacterTextSplitter(
            chunk_size=512,
            chunk_overlap=64,
            separators=["\n\n", "\n"],
        )

    def chunk_document(self, document):
        sections = self.section_splitter.split_documents([document])
        all_chunks = []

        for section_idx, section in enumerate(sections):
            # Store the section itself
            section.metadata["chunk_type"] = "section"
            section.metadata["section_idx"] = section_idx
            all_chunks.append(section)

            # Split section into paragraphs
            paragraphs = self.paragraph_splitter.split_documents(
                [section]
            )
            for para_idx, para in enumerate(paragraphs):
                para.metadata["chunk_type"] = "paragraph"
                para.metadata["section_idx"] = section_idx
                para.metadata["paragraph_idx"] = para_idx
                all_chunks.append(para)

        return all_chunks

The retrieval logic then searches at the paragraph level for precision, but if the top result is a paragraph chunk, it can optionally expand to include the full section for more context.
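As a sketch, that expansion step can work directly off the metadata fields set by `HierarchicalChunker` above. Plain dicts stand in for document objects here; the `expand_to_section` helper is illustrative, not part of any library.

```python
def expand_to_section(hit: dict, chunks: list[dict]) -> dict:
    # If the best hit is a paragraph, swap it for its parent section.
    if hit["metadata"]["chunk_type"] != "paragraph":
        return hit
    section_idx = hit["metadata"]["section_idx"]
    for chunk in chunks:
        meta = chunk["metadata"]
        if meta["chunk_type"] == "section" and meta["section_idx"] == section_idx:
            return chunk
    return hit  # parent missing; fall back to the paragraph itself

chunks = [
    {"text": "Full section text.",
     "metadata": {"chunk_type": "section", "section_idx": 0}},
    {"text": "One paragraph.",
     "metadata": {"chunk_type": "paragraph", "section_idx": 0, "paragraph_idx": 0}},
]
expanded = expand_to_section(chunks[1], chunks)
```

In practice I gate the expansion on available prompt budget: expand only while the total context stays under the token limit.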

What Chunk Size Should You Use?

I get asked this question constantly, and the honest answer is: it depends on your use case, your embedding model, and your LLM’s context window. But here are some guidelines I follow.

For question-answering over technical docs: 256 to 512 tokens. Smaller chunks mean more precise retrieval, which matters when answers are usually contained in a single paragraph.

For summarization or analysis tasks: 1024 to 2048 tokens. Larger chunks preserve more context, which helps the LLM generate coherent summaries.

For conversational agents: 512 tokens is a good sweet spot. It is enough context to answer most questions but small enough to fit several chunks in the prompt.

The Retrieval Problem: Getting the Right Documents

Retrieval is where most RAG systems fail silently. The LLM generates a plausible-sounding answer, but it is based on irrelevant documents because the retrieval step returned the wrong results. Users see a confident answer and have no way of knowing it is wrong.

Pure vector search (semantic similarity) has a well-known weakness: it can miss documents that are relevant because of specific keywords rather than semantic meaning. For example, searching for “error code E-4521” might not match a document about that error code if the embedding model does not capture the specific code well.

The solution is hybrid search, combining vector search with traditional keyword search (BM25). I have found that a 70/30 split (70% semantic, 30% keyword) works well as a default.

from rank_bm25 import BM25Okapi
import numpy as np

class HybridRetriever:
    def __init__(self, vector_store, documents):
        self.vector_store = vector_store
        self.documents = documents

        # Build BM25 index
        tokenized = [
            doc.page_content.lower().split()
            for doc in documents
        ]
        self.bm25 = BM25Okapi(tokenized)

    def search(
        self,
        query: str,
        query_embedding: list,
        top_k: int = 5,
        alpha: float = 0.7,
    ):
        # Vector search
        vector_results = self.vector_store.search(
            query_embedding, top_k=top_k * 2
        )

        # BM25 search
        tokenized_query = query.lower().split()
        bm25_scores = self.bm25.get_scores(tokenized_query)
        bm25_top = np.argsort(bm25_scores)[-top_k * 2:][::-1]

        # Normalize scores
        vector_scores = self._normalize(
            [r.score for r in vector_results]
        )
        bm25_top_scores = self._normalize(
            [bm25_scores[i] for i in bm25_top]
        )

        # Combine with a weighted sum of normalized scores
        # (assumes vector-store ids and BM25 corpus indices
        # share the same id space)
        combined = {}
        for result, score in zip(vector_results, vector_scores):
            combined[result.id] = combined.get(result.id, 0) + (
                alpha * score
            )

        for doc_idx, score in zip(bm25_top, bm25_top_scores):
            combined[doc_idx] = combined.get(doc_idx, 0) + (
                (1 - alpha) * score
            )

        # Sort by combined score
        ranked = sorted(
            combined.items(), key=lambda x: x[1], reverse=True
        )
        return ranked[:top_k]

    def _normalize(self, scores):
        if not scores:
            return scores
        min_s, max_s = min(scores), max(scores)
        if max_s == min_s:
            return [1.0] * len(scores)
        return [(s - min_s) / (max_s - min_s) for s in scores]

Re-ranking

After initial retrieval, I always add a re-ranking step. The initial retrieval (whether vector, keyword, or hybrid) is optimized for recall. The re-ranker then optimizes for precision by scoring each candidate document against the original query with a more powerful model.

Cohere’s reranker and cross-encoder models from Sentence Transformers are both excellent options here. The performance improvement is significant, especially when your initial retrieval returns many candidates.

from sentence_transformers import CrossEncoder

reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

def rerank(query, documents, top_k=5):
    pairs = [(query, doc.page_content) for doc in documents]
    scores = reranker.predict(pairs)

    ranked = sorted(
        zip(documents, scores),
        key=lambda x: x[1],
        reverse=True,
    )
    return [doc for doc, score in ranked[:top_k]]

Query Transformation

Sometimes the user’s query is not well-suited for retrieval as-is. Consider a conversational agent where the user says “What about the pricing?” after asking about a specific product. The query “What about the pricing?” lacks context. You need to transform it into something like “What is the pricing for Product X?”

I use two techniques for query transformation.

Query rewriting: Use the LLM to rewrite the query with full context from the conversation history. This is cheap and effective.

HyDE (Hypothetical Document Embeddings): Generate a hypothetical answer to the query using the LLM, then use that hypothetical answer as the search query. The idea is that a hypothetical answer will be more similar to the actual answer in embedding space than the original question. This sounds counterintuitive, but it works remarkably well in practice.

def hyde_search(query, llm, embedding_model, vector_store):
    # Generate hypothetical answer
    hypothetical = llm.generate(
        f"Write a short paragraph that answers this question: "
        f"{query}"
    )

    # Embed the hypothetical answer
    embedding = embedding_model.encode(hypothetical)

    # Search with the hypothetical embedding
    results = vector_store.search(embedding, top_k=5)
    return results

Real Challenges Nobody Talks About

Stale Data

Documents change. Knowledge bases get updated. Products get new features. If your RAG system ingested a document six months ago and the content has since changed, you are serving stale information. You need a strategy for keeping your index fresh.

I handle this with a simple versioning system. Each document gets a hash of its content. A background job periodically checks for changes and re-ingests updated documents. For critical documents, I use webhooks to trigger re-ingestion immediately on changes.
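The versioning check itself is small. Here is a minimal sketch: the `stored_hashes` dict stands in for whatever database the background job reads its last-seen hashes from.

```python
import hashlib

def content_hash(text: str) -> str:
    # Stable fingerprint of a document's content
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

def docs_to_reingest(
    documents: dict[str, str], stored_hashes: dict[str, str]
) -> list[str]:
    # Return ids of documents that are new or whose content changed
    return [
        doc_id
        for doc_id, text in documents.items()
        if stored_hashes.get(doc_id) != content_hash(text)
    ]

stored = {"faq": content_hash("Refunds take 5 days.")}
current = {
    "faq": "Refunds take 3 days.",       # changed since last ingestion
    "shipping": "Ships in 2 weeks.",     # never ingested
}
stale = docs_to_reingest(current, stored)
```

Remember to delete the old chunks for a changed document before re-ingesting, or you will serve both versions side by side.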

Multi-Tenant Isolation

If you are building a RAG system for multiple customers, you need to ensure that Customer A never sees Customer B’s documents. This sounds obvious, but it is surprisingly easy to mess up, especially with metadata filtering.

My approach: use separate collections (or namespaces) per tenant. Metadata filtering works, but a bug in your filter logic could leak data across tenants. Separate collections make data leakage structurally impossible.
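One way to enforce this is to route every read and write through a single helper that derives the collection name, so no code path can touch a collection without naming a tenant. The `kb_<tenant>` naming scheme below is my own convention, not anything Qdrant prescribes.

```python
def tenant_collection(tenant_id: str) -> str:
    # One collection per tenant: a bug in filter logic cannot
    # leak data across a collection boundary.
    if not tenant_id.isalnum():
        raise ValueError(f"Invalid tenant id: {tenant_id!r}")
    return f"kb_{tenant_id}"

# All vector-store calls take the derived name, never a raw string:
#   client.search(collection_name=tenant_collection(tenant_id), ...)
name = tenant_collection("acme42")
```

Validating the tenant id also prevents a malformed id from accidentally mapping two tenants onto one collection name.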

Evaluation and Monitoring

How do you know if your RAG system is working well? You need metrics, and you need to track them over time.

I track three key metrics:

  1. Retrieval relevance: Are the retrieved documents actually relevant to the query? I use an LLM-as-judge approach, where a separate LLM call evaluates whether each retrieved document is relevant.

  2. Answer faithfulness: Is the generated answer supported by the retrieved documents? Again, LLM-as-judge works well here.

  3. Answer correctness: Is the answer actually correct? This requires a ground truth dataset, which is expensive to create but invaluable.

import json

def evaluate_retrieval(query, retrieved_docs, llm):
    docs_text = "\n".join(
        f"Doc {i}: {doc.page_content[:200]}"
        for i, doc in enumerate(retrieved_docs)
    )

    prompt = f"""Given the query: "{query}"

Rate the relevance of each document on a scale of 0-2:
0 = Not relevant
1 = Partially relevant
2 = Highly relevant

Documents:
{docs_text}

Return a JSON array of scores."""

    response = llm.generate(prompt)
    return json.loads(response)

Cost Management

RAG systems can get expensive quickly. Every query involves at least one embedding call (for the query), a vector search, and an LLM call with potentially large context. Multiply that by thousands of queries per day, and costs add up.

Strategies I use to manage costs:

  • Cache frequent queries and their results (with a TTL based on how often the underlying data changes)
  • Use smaller embedding models for initial retrieval, then re-rank with a more powerful model only for the top candidates
  • Implement token budgets per query, limiting the amount of context injected into the LLM prompt
  • Use streaming responses to reduce perceived latency even when processing time is longer
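The first strategy, caching with a TTL, is simple enough to sketch in full. This is an in-memory illustration; a production system would typically use Redis or similar, keyed on the query plus any filters.

```python
import time

class TTLCache:
    """Query-result cache whose entries expire after a fixed TTL."""

    def __init__(self, ttl_seconds: float):
        self.ttl = ttl_seconds
        self._store: dict[str, tuple[float, object]] = {}

    def get(self, key: str):
        entry = self._store.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        if time.monotonic() - stored_at > self.ttl:
            del self._store[key]  # expired: evict and report a miss
            return None
        return value

    def set(self, key: str, value) -> None:
        self._store[key] = (time.monotonic(), value)

cache = TTLCache(ttl_seconds=0.05)
cache.set("q1", {"answer": "5 days"})
hit = cache.get("q1")    # fresh entry: returned
time.sleep(0.1)
miss = cache.get("q1")   # past the TTL: evicted, returns None
```

Tune the TTL to how fast the underlying documents change; a support knowledge base can tolerate hours, a pricing page cannot.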

Putting It All Together

Here is a simplified but complete RAG pipeline that incorporates the patterns I have discussed:

import json

class RAGPipeline:
    def __init__(
        self,
        embedding_model,
        vector_store,
        reranker,
        llm,
    ):
        self.embedding_model = embedding_model
        self.vector_store = vector_store
        self.reranker = reranker
        self.llm = llm
        self.cache = {}

    def answer(
        self,
        query: str,
        conversation_history: list = None,
        filters: dict = None,
    ) -> dict:
        # Rewrite query with conversation context
        if conversation_history:
            query = self._rewrite_query(
                query, conversation_history
            )

        # Check cache (after rewriting, so the same ambiguous
        # query from different conversations cannot collide)
        cache_key = self._cache_key(query, filters)
        if cache_key in self.cache:
            return self.cache[cache_key]

        # Embed query
        query_embedding = self.embedding_model.encode(query)

        # Retrieve candidates (hybrid search)
        candidates = self.vector_store.hybrid_search(
            query_embedding=query_embedding,
            query_text=query,
            filters=filters,
            top_k=20,
        )

        # Re-rank
        reranked = self.reranker.rerank(
            query, candidates, top_k=5
        )

        # Build prompt with retrieved context
        context = "\n\n---\n\n".join(
            [doc.page_content for doc in reranked]
        )

        prompt = f"""Answer the question based on the context.
If the context does not contain enough information, say so.
Do not make up information.

Context:
{context}

Question: {query}

Answer:"""

        # Generate answer
        answer = self.llm.generate(prompt)

        # Build response with sources
        result = {
            "answer": answer,
            "sources": [
                {
                    "text": doc.page_content[:200],
                    "source": doc.metadata.get("source"),
                }
                for doc in reranked
            ],
        }

        # Cache result
        self.cache[cache_key] = result
        return result

    def _rewrite_query(self, query, history):
        history_text = "\n".join(
            [f"{msg['role']}: {msg['content']}" for msg in history]
        )
        rewritten = self.llm.generate(
            f"Rewrite this query to be self-contained, given "
            f"the conversation history.\n\n"
            f"History:\n{history_text}\n\n"
            f"Query: {query}\n\n"
            f"Rewritten query:"
        )
        return rewritten

    def _cache_key(self, query, filters):
        return f"{query}:{json.dumps(filters, sort_keys=True)}"

Lessons Learned

After building several RAG systems, here are the lessons I keep coming back to.

Start simple and iterate. Do not build a complex pipeline on day one. Start with basic chunking, a single embedding model, and simple vector search. Measure performance, identify where it fails, and then add complexity where it is needed.

Chunking is everything. Spend time understanding your documents and how information is structured. The best retrieval model in the world cannot help if your chunks do not contain coherent, complete information.

Always show sources. Users need to verify answers. Always return the source documents alongside the generated answer. This builds trust and makes debugging easier.

Test with real queries. Synthetic evaluation datasets are useful, but nothing beats testing with the actual queries your users will ask. Collect real queries from your users (with appropriate privacy measures) and use them for evaluation.

Monitor in production. RAG performance can degrade over time as your document corpus grows or changes. Set up monitoring and alerting for retrieval quality, answer quality, and latency.

Do not ignore the “R” in RAG. Most people focus on the generation part (the LLM) and treat retrieval as an afterthought. In my experience, 80% of RAG failures are retrieval failures. Invest accordingly.

Conclusion

RAG is one of the most practical patterns in the AI toolbox right now. It lets you build systems that are grounded in your actual data, without the cost and complexity of fine-tuning. But building a RAG system that works reliably in production requires careful attention to every component in the pipeline: ingestion, chunking, embedding, retrieval, re-ranking, and generation.

The good news is that the tooling is maturing rapidly. A year ago, building a production RAG system required stitching together half a dozen libraries and writing a lot of glue code. Today, frameworks like LangChain, LlamaIndex, and Haystack provide solid abstractions that handle much of the complexity. But you still need to understand what is happening under the hood, because when things go wrong (and they will), you need to know where to look.

Start simple, measure everything, and iterate based on real user feedback. That is the formula that has worked for me, and I believe it will work for you too.
