Why This Step Exists
The retriever from Step 6 always considers every chunk in the database. That's fine when "all of your docs" is the right scope for every question. The moment it isn't — "only billing-team docs," "only the v2 API reference," "only documents updated since the migration" — pure cosine ranking starts mixing relevant and irrelevant chunks together. You get correct-shaped answers grounded in the wrong material.
Context-aware retrieval fixes this by handing the database two pieces of information per query: the question's embedding (similarity) and the filters that scope the answer (relevance). The DB pre-filters with the WHERE clause, then ranks the survivors by distance, then returns the top K. No extra LLM call, no extra service, no re-ranking pass.
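To make the filter-then-rank order concrete, here is a minimal in-memory sketch in plain Python. It is not how Postgres or pgvector executes the query; the Chunk shape, the toy 2-D vectors, and the filtered_top_k helper are all assumptions made for illustration.

```python
import math
from dataclasses import dataclass


@dataclass
class Chunk:
    source: str
    category: str
    embedding: list[float]


def cosine_distance(a: list[float], b: list[float]) -> float:
    """1 - cosine similarity; ranges from 0 (same direction) to 2 (opposite)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norm


def filtered_top_k(chunks, query_vec, k, category=None):
    # 1. Scope: keep only chunks that pass the metadata filter.
    survivors = [c for c in chunks if category is None or c.category == category]
    # 2. Rank: order the survivors by distance to the query.
    ranked = sorted(survivors, key=lambda c: cosine_distance(c.embedding, query_vec))
    # 3. Cut: return the K closest.
    return ranked[:k]


# Hypothetical sample data: one engineering chunk, two billing chunks.
chunks = [
    Chunk("eng-notes.txt", "engineering", [1.0, 0.0]),
    Chunk("billing-faq.txt", "billing", [0.8, 0.6]),
    Chunk("billing-terms.txt", "billing", [0.0, 1.0]),
]
query = [1.0, 0.0]

unscoped = filtered_top_k(chunks, query, k=2)
scoped = filtered_top_k(chunks, query, k=2, category="billing")
print([c.source for c in unscoped])
print([c.source for c in scoped])
```

Without a filter, the engineering chunk ranks first because it is closest to the query; with category="billing" it never enters the candidate set, so the top K comes entirely from billing material.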
The Three Layers (Recap)
We already do layer 1; this step builds layer 2; the optional sidebar covers layer 3.
| Layer | What it is | Where it lives in this blueprint |
|---|---|---|
| 1. Task-typed embeddings | RETRIEVAL_DOCUMENT at ingest vs. RETRIEVAL_QUERY at query time | Step 5 (embed(..., task=...)) |
| 2. Metadata-filtered retrieval | Structured columns + WHERE clause that the index pushes through | This step |
| 3. Contextual chunking | Prepend a per-doc context line to each chunk before embedding | Sidebar at the bottom of this step |
1. Extend the Schema
Three columns cover ~90% of real filtering needs. Apply this against the existing table — it's additive and idempotent.
-- sql/002_add_metadata.sql
ALTER TABLE chunks
    ADD COLUMN IF NOT EXISTS category TEXT,
    ADD COLUMN IF NOT EXISTS tags TEXT[] NOT NULL DEFAULT '{}',
    ADD COLUMN IF NOT EXISTS published_at TIMESTAMPTZ;

-- B-tree indexes for hot single-column filters
CREATE INDEX IF NOT EXISTS chunks_category_idx ON chunks (category);
CREATE INDEX IF NOT EXISTS chunks_published_at_idx ON chunks (published_at DESC);

-- GIN index for tag containment / overlap queries (tags @> ARRAY['x'])
CREATE INDEX IF NOT EXISTS chunks_tags_gin_idx ON chunks USING gin (tags);
Apply it:
gcloud sql connect rag-db --user=$USER_EMAIL --database=rag --quiet < sql/002_add_metadata.sql
Why these three columns:
- category TEXT: a single discriminator ("billing", "engineering", "v2-api") is what 80% of filters look like in practice. One value per chunk, B-tree index, fast.
- tags TEXT[]: Postgres native arrays. Multiple labels per chunk ({"v2", "rate-limits"}). The GIN index makes tags @> ARRAY['v2'] a millisecond operation even at scale.
- published_at TIMESTAMPTZ: for "recent first" / date-bounded filters. We keep created_at separately (when the row was inserted); published_at is when the source document was published.
Skip anything you don't need. Adding a column is cheap; building a feature on one you didn't need is not.
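The two array operators that the GIN index serves have simple set semantics. A rough Python analogue, assuming tags are compared as plain strings; the helper names contains_all and overlaps are made up for this sketch (in Postgres the operators are @> and &&):

```python
def contains_all(row_tags: list[str], wanted: list[str]) -> bool:
    """Analogue of `tags @> wanted`: the row must carry every wanted tag."""
    return set(wanted) <= set(row_tags)


def overlaps(row_tags: list[str], wanted: list[str]) -> bool:
    """Analogue of `tags && wanted`: the row must share at least one tag."""
    return bool(set(wanted) & set(row_tags))


row = ["v2", "rate-limits"]
print(contains_all(row, ["v2"]))             # every wanted tag is present
print(contains_all(row, ["v2", "billing"]))  # "billing" is missing
print(overlaps(row, ["v2", "billing"]))      # "v2" is shared
```

Containment is the stricter of the two: adding tags to the filter can only shrink the result set, while overlap can only grow it.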
2. Capture Metadata at Ingest Time
The cleanest pattern: a sidecar JSON file next to each text file. data/billing-faq.txt gets data/billing-faq.meta.json:
{
  "category": "billing",
  "tags": ["faq", "refunds"],
  "published_at": "2026-03-12T00:00:00Z"
}
Update the ingest to read sidecars when they exist, and to write the metadata into the new columns:
# src/ingest.py (diff against Step 5)
import json
from datetime import datetime
from pathlib import Path

from .chunker import chunk_text
from .db import get_conn
from .embeddings import embed, vec_to_pg

DATA_DIR = Path("data")
BATCH_SIZE = 50


def _load_meta(text_path: Path) -> dict:
    """Look for <name>.meta.json next to <name>.txt. Return {} if missing."""
    meta_path = text_path.with_suffix(".meta.json")
    if not meta_path.exists():
        return {}
    raw = json.loads(meta_path.read_text(encoding="utf-8"))
    if raw.get("published_at"):
        # fromisoformat() only accepts a trailing "Z" on Python 3.11+,
        # so normalize it to an explicit UTC offset first.
        raw["published_at"] = datetime.fromisoformat(
            raw["published_at"].replace("Z", "+00:00")
        )
    raw.setdefault("category", None)
    raw.setdefault("tags", [])
    raw.setdefault("published_at", None)
    return raw


def gather_chunks() -> list[tuple[str, int, str, dict]]:
    """Return (source, chunk_index, content, meta) for every chunk."""
    out: list[tuple[str, int, str, dict]] = []
    for path in sorted(DATA_DIR.glob("*.txt")):
        meta = _load_meta(path)
        text = path.read_text(encoding="utf-8")
        for i, content in enumerate(chunk_text(text)):
            out.append((path.name, i, content, meta))
    return out


def insert_chunks(rows):
    """rows: list of (source, chunk_index, content, embedding, meta)."""
    with get_conn() as conn:
        cur = conn.cursor()
        cur.executemany(
            """
            INSERT INTO chunks
                (source, chunk_index, content, embedding,
                 category, tags, published_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
            """,
            [
                # "or []" also covers a sidecar with "tags": null,
                # which would otherwise violate the NOT NULL column.
                (s, i, c, vec_to_pg(v),
                 m.get("category"), m.get("tags") or [], m.get("published_at"))
                for s, i, c, v, m in rows
            ],
        )
        conn.commit()
The main() driver changes only to pass meta through:
def main() -> None:
    chunks = gather_chunks()
    rows = []
    for start in range(0, len(chunks), BATCH_SIZE):
        batch = chunks[start : start + BATCH_SIZE]
        vectors = embed([c[2] for c in batch], task="RETRIEVAL_DOCUMENT")
        rows.extend(
            (s, i, c, v, m) for (s, i, c, m), v in zip(batch, vectors)
        )
    insert_chunks(rows)
    print(f"Inserted {len(rows)} chunks.")
Re-run ingest (truncate first so you don't double-up):
uv run python -m scripts.reset   # TRUNCATE chunks RESTART IDENTITY
uv run python -m src.ingest
3. Filtered Retrieval
This is where the work pays off. One SQL statement; the database does the rest.
# src/retrieve.py (new function alongside the existing retrieve())
from datetime import datetime
from typing import Optional

from .db import get_conn
from .embeddings import embed, vec_to_pg


def retrieve_filtered(
    question: str,
    k: int = 5,
    *,
    category: Optional[str] = None,
    tags: Optional[list[str]] = None,
    after: Optional[datetime] = None,
    max_distance: Optional[float] = None,
) -> list[dict]:
    """Top-K chunks closest to the question, scoped by metadata.

    - category: exact match
    - tags: ARRAY @> match, chunks must include every tag listed
    - after: published_at >= after
    - max_distance: drop chunks whose cosine distance exceeds this threshold
    """
    [query_vec] = embed([question], task="RETRIEVAL_QUERY")
    query_str = vec_to_pg(query_vec)

    clauses = []
    params: list = []
    if category is not None:
        clauses.append("category = %s")
        params.append(category)
    if tags:
        clauses.append("tags @> %s")
        params.append(tags)
    if after is not None:
        clauses.append("published_at >= %s")
        params.append(after)
    if max_distance is not None:
        clauses.append("embedding <=> %s <= %s")
        params.append(query_str)
        params.append(max_distance)
    where = ("WHERE " + " AND ".join(clauses)) if clauses else ""

    sql = f"""
        SELECT id, source, chunk_index, content,
               category, tags, published_at,
               embedding <=> %s AS distance
        FROM chunks
        {where}
        ORDER BY embedding <=> %s
        LIMIT %s
    """
    # Parameter order mirrors placeholder order: the vector for the SELECT
    # distance, then any filter params, then the vector again for ORDER BY,
    # then the limit.
    final_params = [query_str, *params, query_str, k]

    with get_conn() as conn:
        cur = conn.cursor()
        cur.execute(sql, final_params)
        rows = cur.fetchall()

    return [
        {
            "id": r[0],
            "source": r[1],
            "chunk_index": r[2],
            "content": r[3],
            "category": r[4],
            "tags": list(r[5]) if r[5] is not None else [],
            "published_at": r[6].isoformat() if r[6] is not None else None,
            "distance": float(r[7]),
        }
        for r in rows
    ]
A few things worth flagging:
- Filtering in the same statement is the right shape. When the planner applies the WHERE clause through the B-tree or GIN indexes (or a sequential scan), it ranks the survivors exactly. When it uses the HNSW index, pgvector applies the filter after the index scan, so a selective filter can return fewer than K rows. On pgvector 0.8.0 or later, run SET LOCAL hnsw.iterative_scan = relaxed_order inside the query's transaction (SET LOCAL lasts until the transaction ends; a plain SET lasts for the session) and pgvector will pull more candidates from the index until enough pass the filter. No application-side mask, no two-stage query.
- tags @> %s requires all the listed tags. Use tags && %s for "any of these tags" (set overlap).
- max_distance is the cosine equivalent of min_score. Lower distance = more similar. 0.5 is a generous cutoff; 0.35 is strict. Tune against your data: log the distances for a few real questions before picking a number.
- The embedding <=> %s expression appears in both the SELECT (to return the distance) and the ORDER BY (to drive the index). That isn't a copy-paste mistake; with positional %s placeholders each occurrence needs its own parameter, so you pass the vector twice.
4. Try It
A quick check that filters actually do what you expect:
# scripts/test_filtered.py
from datetime import datetime, timezone

from src.retrieve import retrieve_filtered

# All categories, no filters: should match Step 6 behavior
print("\n--- no filter ---")
for r in retrieve_filtered("What is RAG?", k=3):
    print(f"[{r['distance']:.3f}] {r['source']} cat={r['category']} tags={r['tags']}")

# Only billing
print("\n--- category=billing ---")
for r in retrieve_filtered("What is RAG?", k=3, category="billing"):
    print(f"[{r['distance']:.3f}] {r['source']} cat={r['category']}")

# Only docs published in 2026
print("\n--- after 2026-01-01 ---")
for r in retrieve_filtered(
    "What is RAG?", k=3, after=datetime(2026, 1, 1, tzinfo=timezone.utc)
):
    print(f"[{r['distance']:.3f}] {r['source']} pub={r['published_at']}")
Run it:
uv run python -m scripts.test_filtered
You should see the category and date filters narrowing the result set. Top-K chunks change as you change the scope.
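Eyeballing printed output works for a first pass; a small assertion helper makes the same check repeatable. This is a sketch that assumes result dicts shaped like the ones retrieve_filtered() returns (with published_at as an ISO 8601 string). check_scope and the sample rows are hypothetical, so swap in a real retrieve_filtered() call once your data is loaded.

```python
from datetime import datetime, timezone
from typing import Optional


def check_scope(
    results: list[dict],
    *,
    category: Optional[str] = None,
    tags: Optional[list[str]] = None,
    after: Optional[datetime] = None,
) -> None:
    """Raise AssertionError if any result falls outside the requested scope."""
    for r in results:
        if category is not None:
            assert r["category"] == category, f"wrong category: {r}"
        if tags:
            assert set(tags) <= set(r["tags"]), f"missing tags: {r}"
        if after is not None:
            published = datetime.fromisoformat(r["published_at"])
            assert published >= after, f"too old: {r}"
    # Results should already be sorted by ascending distance.
    distances = [r["distance"] for r in results]
    assert distances == sorted(distances), "results are not ordered by distance"


# Stand-in rows; replace with retrieve_filtered("...", category="billing").
sample = [
    {"category": "billing", "tags": ["faq", "refunds"],
     "published_at": "2026-03-12T00:00:00+00:00", "distance": 0.21},
    {"category": "billing", "tags": ["refunds"],
     "published_at": "2026-02-01T00:00:00+00:00", "distance": 0.34},
]
check_scope(
    sample,
    category="billing",
    tags=["refunds"],
    after=datetime(2026, 1, 1, tzinfo=timezone.utc),
)
print("scope checks passed")
```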
5. Expose Filters Through the API
Make the /ask endpoint accept the same filters:
# src/api.py (extended AskRequest)
from datetime import datetime
from typing import Optional

from pydantic import BaseModel, Field


class AskRequest(BaseModel):
    question: str = Field(..., min_length=1, max_length=2000)
    k: int = Field(5, ge=1, le=20)
    category: Optional[str] = None
    tags: Optional[list[str]] = None
    after: Optional[datetime] = None
    max_distance: Optional[float] = Field(None, ge=0.0, le=2.0)
And the handler delegates to the filtered retriever when any filter is present, falling back to the simpler one otherwise:
from .retrieve import retrieve, retrieve_filtered
from .generate import generate_answer


@app.post("/ask")
def ask(req: AskRequest) -> dict:
    if req.category or req.tags or req.after or req.max_distance is not None:
        chunks = retrieve_filtered(
            req.question,
            k=req.k,
            category=req.category,
            tags=req.tags,
            after=req.after,
            max_distance=req.max_distance,
        )
    else:
        chunks = retrieve(req.question, k=req.k)
    return {
        "question": req.question,
        "answer": generate_answer(req.question, chunks),
        "sources": [
            # published_at is included so callers can show the publication date;
            # it is None for chunks that come from the unfiltered retriever.
            {"source": c["source"], "chunk_index": c["chunk_index"],
             "category": c.get("category"), "tags": c.get("tags", []),
             "published_at": c.get("published_at"),
             "distance": c["distance"]}
            for c in chunks
        ],
    }
A scoped call now looks like:
curl -X POST $SERVICE_URL/ask \
  -H "Content-Type: application/json" \
  -d '{
    "question": "What is the refund policy?",
    "category": "billing",
    "tags": ["refunds"],
    "after": "2026-01-01T00:00:00Z",
    "max_distance": 0.5
  }'
The unfiltered call still works exactly like before. Filters are additive and optional, and the answer payload now carries enough metadata back that callers can show "answered from billing/refunds, published 2026-03-12."
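The same request body can be assembled from Python. The sketch below uses only the standard library and leaves unset filters out of the body so the endpoint's defaults apply. build_ask_payload is a hypothetical client-side helper, and actually sending the body (with urllib.request or an HTTP client of your choice) is left out.

```python
import json
from datetime import datetime, timezone
from typing import Optional


def build_ask_payload(
    question: str,
    *,
    k: int = 5,
    category: Optional[str] = None,
    tags: Optional[list[str]] = None,
    after: Optional[datetime] = None,
    max_distance: Optional[float] = None,
) -> str:
    """Serialize an /ask request body, omitting filters that were not set."""
    body: dict = {"question": question, "k": k}
    if category is not None:
        body["category"] = category
    if tags:
        body["tags"] = tags
    if after is not None:
        # ISO 8601 with an explicit UTC offset.
        body["after"] = after.isoformat()
    if max_distance is not None:
        body["max_distance"] = max_distance
    return json.dumps(body)


payload = build_ask_payload(
    "What is the refund policy?",
    category="billing",
    tags=["refunds"],
    after=datetime(2026, 1, 1, tzinfo=timezone.utc),
    max_distance=0.5,
)
print(payload)
```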
Sidebar: Contextual Chunking
Layer 3, optional but striking when it works. The premise: a chunk in isolation often lacks the context that the surrounding document gives it. "The limit is 30 per minute" embedded on its own is ambiguous; embedded as "[Rate Limiting → Free Tier] The limit is 30 per minute" is not.
Anthropic's "contextual retrieval" technique formalizes this — before embedding each chunk, ask a small LLM to write a one-line summary of where the chunk sits in the document, and prepend that summary to the chunk's text. The embedding now encodes both the chunk's content and its document context.
A minimal implementation, slotted into the ingest pipeline:
# src/contextual_chunker.py (optional)
from google import genai
from google.genai.types import GenerateContentConfig

from .config import load_config

_cfg = load_config()
_client = genai.Client(vertexai=True, project=_cfg.project, location=_cfg.region)

CONTEXT_PROMPT = """Write one short sentence describing where the chunk
below sits in the broader document. The sentence will be prepended to the
chunk before embedding, to help retrieval. Be terse and specific.

<document>
{doc}
</document>

<chunk>
{chunk}
</chunk>

One sentence:"""


def contextualize(doc: str, chunk: str) -> str:
    response = _client.models.generate_content(
        model="gemini-flash-latest",
        contents=CONTEXT_PROMPT.format(doc=doc[:8000], chunk=chunk),
        config=GenerateContentConfig(temperature=0.0),
    )
    # An empty response yields no lines, so guard before taking the first one.
    lines = (response.text or "").strip().splitlines()
    summary = lines[0].strip() if lines else ""
    return f"[{summary}]\n\n{chunk}" if summary else chunk
Wire it into ingest (one extra call per chunk):
# In gather_chunks or alongside the embed() call:
content = contextualize(text, content)
Trade-offs to flag:
- Cost: one Gemini Flash call per chunk at ingest. Cheap (fractions of a cent each) but not free. Worth caching by content hash.
- Quality: Anthropic reports a 35% reduction in top-20 retrieval failure rate on their RAG eval set with contextual embeddings alone, and 49% when combined with contextual BM25 (hybrid search). Your mileage will vary.
- Re-embedding: changing the chunk text means re-embedding. Treat contextual chunking as an ingest-time decision; don't toggle it on existing data without TRUNCATE + re-ingest.
If you turn this on, leave a note in the row so you can A/B test. Add a chunking_strategy TEXT column and set it to "contextual" vs "plain", then compare retrieval quality with the same eval set across both strategies. Don't enable it everywhere without checking — for short, self-contained docs it rarely helps.
What You Have Now
- A chunks table with three new searchable metadata columns and the right indexes
- An ingest pipeline that picks up sidecar .meta.json files automatically
- A retrieve_filtered() function that scopes the candidate set before scoring
- A /ask endpoint that takes optional filters in the request body
- An optional contextual-chunking pattern you can layer on top when you need higher recall
The retriever still falls back to the simple top-K when no filters are passed, so nothing breaks on day one.
Next: ship it to Cloud Run.
Reference: pgvector iterative scan · Postgres array operators · Anthropic — Contextual Retrieval · GIN indexes for tags