Drop in Some Sample Data

For testing we'll use any plain-text source. Two options:

# Option A — use anything you have
cp ~/some-doc.txt data/
 
# Option B — grab the Wikipedia article on RAG (pure illustration)
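# (uses the MediaWiki action API's plain-text extract; -f makes curl fail loudly on HTTP errors)
curl -sf "https://en.wikipedia.org/w/api.php?action=query&prop=extracts&explaintext=1&format=json&formatversion=2&titles=Retrieval-augmented_generation" \
  | python3 -c "import json, sys; print(json.load(sys.stdin)['query']['pages'][0]['extract'])" \
  > data/rag-wikipedia.txt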

Two or three short files are enough to verify the pipeline. We'll add more once we know it works.

The Plan

The ingest script is ~70 lines and does exactly three things:

  1. Chunk — walk data/, split each file into overlapping ~400-token chunks
  2. Embed — batch-call text-embedding-005 for the chunks
  3. Insert — write (source, chunk_index, content, embedding) rows into Postgres

We'll keep it dependency-light: no LlamaIndex, no LangChain. The chunker is about 20 lines of plain Python.

A Tiny Chunker

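Most chunkers do one job: split text on natural boundaries (sentences or paragraphs) and pack the pieces until a size budget is full, with a small overlap between chunks so context isn't lost at the seams. Ours splits on blank-line paragraph breaks and budgets in characters rather than tokens.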

# src/chunker.py
"""Split text into overlapping chunks of roughly N characters."""
 
import re
from typing import Iterable
 
# Roughly: 400 tokens ≈ 1600 characters for English prose.
TARGET_CHARS = 1600
OVERLAP_CHARS = 200
 
 
def _split_paragraphs(text: str) -> list[str]:
    return [p.strip() for p in re.split(r"\n\s*\n", text) if p.strip()]
 
 
def chunk_text(text: str) -> Iterable[str]:
    paragraphs = _split_paragraphs(text)
 
    buf = ""
    for p in paragraphs:
        if len(buf) + len(p) + 2 <= TARGET_CHARS:
            buf = f"{buf}\n\n{p}".strip()
            continue
 
        if buf:
            yield buf
 
        # Start the new buffer with the tail of the previous one for overlap.
        tail = buf[-OVERLAP_CHARS:] if buf else ""
        buf = f"{tail}\n\n{p}".strip() if tail else p
 
    if buf:
        yield buf

That's it. Paragraph-aware, overlap-aware, no external dependency. It's not the smartest chunker on earth, but for prose it's perfectly fine — and you understand every line.
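
One case the chunker above passes through untouched is a single paragraph longer than TARGET_CHARS: it is emitted whole, so that chunk exceeds the target. If your data has such paragraphs, a fallback along these lines could pre-split them before packing. This is only a sketch and not part of the script as shown; split_long_paragraph is a hypothetical helper name, and its defaults simply mirror TARGET_CHARS and OVERLAP_CHARS.

```python
def split_long_paragraph(p: str, target: int = 1600, overlap: int = 200) -> list[str]:
    """Hard-split one oversized paragraph into overlapping character windows.

    Hypothetical helper: not part of src/chunker.py as shown above.
    """
    if not 0 <= overlap < target:
        raise ValueError("overlap must be non-negative and smaller than target")
    if len(p) <= target:
        return [p]

    step = target - overlap  # each window starts this far after the previous one
    pieces: list[str] = []
    for start in range(0, len(p), step):
        pieces.append(p[start : start + target])
        if start + target >= len(p):
            break  # this window already reached the end of the paragraph
    return pieces


if __name__ == "__main__":
    long_paragraph = "".join(str(i % 10) for i in range(4000))
    for piece in split_long_paragraph(long_paragraph):
        print(len(piece))
```

Applying it to each paragraph before packing would bound chunk size at roughly TARGET_CHARS plus the overlap tail, at the cost of occasionally cutting mid-sentence.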

The Embedding Call

Vertex AI's text-embedding-005 takes a list of strings (up to 250 per call) and returns a list of vectors. We use it through the official google-cloud-aiplatform SDK:

# src/embeddings.py
"""Wrap Vertex AI text-embedding-005."""
 
from functools import lru_cache
from vertexai.language_models import TextEmbeddingInput, TextEmbeddingModel
import vertexai
 
from .config import load_config
 
EMBED_MODEL = "text-embedding-005"
EMBED_DIM = 768
 
 
@lru_cache(maxsize=1)
def _model() -> TextEmbeddingModel:
    cfg = load_config()
    vertexai.init(project=cfg.project, location=cfg.region)
    return TextEmbeddingModel.from_pretrained(EMBED_MODEL)
 
 
def embed(texts: list[str], task: str = "RETRIEVAL_DOCUMENT") -> list[list[float]]:
    """Embed a batch of texts. Use task='RETRIEVAL_QUERY' for question embedding."""
    inputs = [TextEmbeddingInput(text=t, task_type=task) for t in texts]
    response = _model().get_embeddings(inputs)
    return [r.values for r in response]

A couple of things to notice:

  • task_type matters. Vertex AI embedding models produce slightly different vectors depending on whether the text is a document being indexed (RETRIEVAL_DOCUMENT) or a query being asked (RETRIEVAL_QUERY). Always pass RETRIEVAL_DOCUMENT during ingest and RETRIEVAL_QUERY at query time. Using the same task type for both is a common silent quality regression.
  • lru_cache keeps the model object alive across calls so we don't reinitialize Vertex on every batch.
  • EMBED_DIM = 768 matches the vector(768) column we created. If you ever change embedding models, you change this constant and migrate the column.
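
The 250-inputs-per-call ceiling mentioned above is easy to enforce with a small batching helper. The sketch below is illustrative only: batched_texts and MAX_INPUTS_PER_CALL are hypothetical names, and it deliberately makes no Vertex AI call so it can run anywhere.

```python
from typing import Iterator

# Per-call cap quoted in the text above; treat it as an assumption and check current quotas.
MAX_INPUTS_PER_CALL = 250


def batched_texts(texts: list[str], size: int = 50) -> Iterator[list[str]]:
    """Yield consecutive slices of `texts`, each at most `size` items long."""
    if not 1 <= size <= MAX_INPUTS_PER_CALL:
        raise ValueError(f"size must be between 1 and {MAX_INPUTS_PER_CALL}")
    for start in range(0, len(texts), size):
        yield texts[start : start + size]


if __name__ == "__main__":
    chunks = [f"chunk {i}" for i in range(120)]
    print([len(batch) for batch in batched_texts(chunks, size=50)])
```

Each yielded batch could then be passed to embed(...) with the appropriate task type.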

The Ingest Script

Glue it together:

# src/ingest.py
"""Walk data/, chunk, embed, insert."""
 
from pathlib import Path
 
from .chunker import chunk_text
from .db import get_conn
from .embeddings import embed
 
DATA_DIR = Path("data")
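# Chunks per Vertex AI call. The API accepts up to 250 inputs per call; lower this
# if a request is rejected for exceeding the per-request token limit.
BATCH_SIZE = 50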
 
 
def gather_chunks() -> list[tuple[str, int, str]]:
    """Return a flat list of (source, chunk_index, content)."""
    out: list[tuple[str, int, str]] = []
    for path in sorted(DATA_DIR.glob("*.txt")):
        text = path.read_text(encoding="utf-8")
        for i, content in enumerate(chunk_text(text)):
            out.append((path.name, i, content))
    return out
 
 
def insert_chunks(rows: list[tuple[str, int, str, list[float]]]) -> None:
    """Insert (source, chunk_index, content, embedding) rows."""
    with get_conn() as conn:
        cur = conn.cursor()
        cur.executemany(
            """
            INSERT INTO chunks (source, chunk_index, content, embedding)
            VALUES (%s, %s, %s, %s)
            """,
            # pgvector accepts the standard string format '[1.0,2.0,...]'
            [(s, i, c, _vec_to_pg(v)) for s, i, c, v in rows],
        )
        conn.commit()
 
 
def _vec_to_pg(vec: list[float]) -> str:
    return "[" + ",".join(f"{x:.7f}" for x in vec) + "]"
 
 
def main() -> None:
    chunks = gather_chunks()
    print(f"Found {len(chunks)} chunks across {len(set(c[0] for c in chunks))} files.")
 
    rows: list[tuple[str, int, str, list[float]]] = []
    for start in range(0, len(chunks), BATCH_SIZE):
        batch = chunks[start : start + BATCH_SIZE]
        texts = [c[2] for c in batch]
        vectors = embed(texts, task="RETRIEVAL_DOCUMENT")
        rows.extend((s, i, c, v) for (s, i, c), v in zip(batch, vectors))
        print(f"Embedded {min(start + BATCH_SIZE, len(chunks))}/{len(chunks)}")
 
    insert_chunks(rows)
    print(f"Inserted {len(rows)} chunks into Postgres.")
 
 
if __name__ == "__main__":
    main()

Why _vec_to_pg?

pgvector accepts vectors either in its binary format or as the text literal '[1.0,2.0,3.0,...]'. The text form is fine for our throughput, needs no extra adapter registration with pg8000, and works with essentially any Postgres driver. Don't optimize until you have a reason to.
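
To make the text format concrete, here is the same formatting logic next to an inverse parser. The parser, pg_to_vec, is a hypothetical helper added purely for illustration; the ingest script only needs the forward direction.

```python
def vec_to_pg(vec: list[float]) -> str:
    """Format a vector as a '[x,y,...]' text literal with 7 decimal places."""
    return "[" + ",".join(f"{x:.7f}" for x in vec) + "]"


def pg_to_vec(literal: str) -> list[float]:
    """Parse a '[x,y,...]' literal back into floats (hypothetical inverse helper)."""
    body = literal.strip()
    if not (body.startswith("[") and body.endswith("]")):
        raise ValueError("expected a literal of the form '[x,y,...]'")
    inner = body[1:-1].strip()
    return [float(x) for x in inner.split(",")] if inner else []


if __name__ == "__main__":
    literal = vec_to_pg([1.0, 2.5, -0.125])
    print(literal)             # [1.0000000,2.5000000,-0.1250000]
    print(pg_to_vec(literal))  # [1.0, 2.5, -0.125]
```

Seven decimal places is a rounding choice: each component is stored to within about 5e-8 of the value the model returned.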

Run It

uv run python -m src.ingest

Expected output:

Found 17 chunks across 2 files.
Embedded 17/17
Inserted 17 chunks into Postgres.

Verify in psql

Connect back to the database and look:

gcloud sql connect rag-db --user=$USER_EMAIL --database=rag --quiet

Then, at the psql prompt:

-- Count rows
SELECT COUNT(*), COUNT(DISTINCT source) FROM chunks;
 
-- Peek at one
SELECT source, chunk_index, LEFT(content, 80) AS preview
FROM chunks
LIMIT 3;
 
-- Spot-check the embedding dim
SELECT vector_dims(embedding) FROM chunks LIMIT 1;

You should see your chunk count, your source filenames, the start of the chunks, and 768 for the dimension. If vector_dims returns anything other than 768, your embedding model and your column type are out of sync — fix one or the other before going further.
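
The same mismatch can be caught on the Python side before anything is written. The sketch below assumes the 768-dimension column described above; check_vectors and EXPECTED_DIM are hypothetical names, not part of the ingest script as shown.

```python
EXPECTED_DIM = 768  # assumed to match the vector(768) column


def check_vectors(
    texts: list[str], vectors: list[list[float]], dim: int = EXPECTED_DIM
) -> None:
    """Raise ValueError if an embedding batch does not line up with its inputs."""
    if len(vectors) != len(texts):
        raise ValueError(f"expected {len(texts)} vectors, got {len(vectors)}")
    for i, vec in enumerate(vectors):
        if len(vec) != dim:
            raise ValueError(f"vector {i} has {len(vec)} dimensions, expected {dim}")


if __name__ == "__main__":
    fake_texts = ["first chunk", "second chunk"]
    fake_vectors = [[0.0] * 768, [0.0] * 768]  # stand-ins for real embeddings
    check_vectors(fake_texts, fake_vectors)
    print("batch looks consistent")
```

Calling something like this on each batch right after embedding would fail fast instead of surfacing the problem as an insert error.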

On Re-Ingest

This script appends. Running it twice doubles the rows. For a real pipeline you'd want either:

  • Delete-then-insert per source: DELETE FROM chunks WHERE source = %s before inserting that file's chunks.
  • Upsert via a content hash: add a content_hash TEXT UNIQUE column and INSERT ... ON CONFLICT DO NOTHING.

For the blueprint we're going to keep it simple — if you re-ingest, run TRUNCATE chunks first. The companion repo includes a scripts/reset.py if you don't want to type it.
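
For reference, the content-hash option could look roughly like this. It assumes a content_hash TEXT UNIQUE column has been added to chunks, which the table as created earlier does not have; content_hash, with_hashes, and UPSERT_SQL are illustrative names.

```python
import hashlib

# Assumes the column was added first, e.g.:
#   ALTER TABLE chunks ADD COLUMN content_hash TEXT UNIQUE;
UPSERT_SQL = """
INSERT INTO chunks (source, chunk_index, content, embedding, content_hash)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (content_hash) DO NOTHING
"""


def content_hash(content: str) -> str:
    """Return a stable SHA-256 hex digest of the chunk text."""
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


def with_hashes(
    rows: list[tuple[str, int, str, str]],
) -> list[tuple[str, int, str, str, str]]:
    """Append a content hash to each (source, chunk_index, content, embedding_literal) row."""
    return [(s, i, c, e, content_hash(c)) for s, i, c, e in rows]


if __name__ == "__main__":
    demo_rows = [("a.txt", 0, "hello world", "[0.1000000,0.2000000]")]
    for row in with_hashes(demo_rows):
        print(row[0], row[1], row[4][:12])
```

Because the hash covers only the text, identical chunks from two different files collapse into a single row; hash the source and content together if that is not what you want.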

What You Have Now

  • A working ingest pipeline that turns text files into searchable vectors
  • A chunks table with real content and real embeddings inside Cloud SQL
  • A clear separation: chunker, embedder, inserter — each replaceable on its own

Next: query.

