Knowledge base ingestion

Retrieval quality is capped by ingest quality. Production RAG separates offline ingestion (load → extract → chunk → embed → upsert) from online query paths. This guide covers source connectors, incremental sync, orchestration, multi-format parsing, ACL metadata, observability, and golden-document testing—the pipeline engineering that keeps indexes fresh without blocking user queries.

After reading, you should be able to: diagram an end-to-end ingest architecture from S3, Google Drive, or Notion through vector upsert; choose among LangChain, Unstructured, and LlamaHub loaders; implement hash-based incremental updates; schedule pipelines in Airflow, Prefect, or Temporal; handle PDF, DOCX, HTML, Markdown, CSV, and code repos; enforce permissions via chunk metadata filters; monitor ingest latency and failure rates; validate chunking with golden documents; and complete the Track 2 production checklist.

Pipeline architecture: source to vector index

Standard production flow: S3 / GDrive / Notion / SharePoint → loader → extract text → chunk → embed → upsert vector DB → attach metadata. Queries never touch source systems—they read the active index version only.

Treat ingestion as an ETL pipeline with idempotent stages. Each document gets a stable doc_id; each chunk a deterministic chunk_id = hash(doc_id, index, chunk_policy_version). Upserts replace rows in place; deletes remove stale chunk IDs when source files disappear or hashes change.

Stage breakdown

| Stage | Input | Output | Failure handling |
|---|---|---|---|
| Discover | Bucket prefix, Drive folder, Notion DB | File manifest with etag/mtime | Retry listing; alert on auth expiry |
| Load | Raw bytes | Normalized text + structure hints | Quarantine corrupt PDFs |
| Extract | PDF/DOCX/HTML | Clean markdown-like text, tables | OCR fallback queue |
| Chunk | Full document text | Chunk records with offsets | Policy version in chunk_id |
| Embed | Chunk texts (batched) | Vectors + model id | Rate limit backoff |
| Upsert | Vectors + metadata | Index rows | Dual-write to staging collection |
| Metadata | Source ACL, tenant, tags | Filterable index fields | Must match auth at query time |

Active collection pointer

Build kb_v{N+1} in staging, run eval queries against it, then flip the config pointer ACTIVE_COLLECTION=kb_v{N+1}. Keep the previous version for 24–72 hours as a rollback target. User queries never wait on a full corpus re-embed.
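A minimal sketch of the pointer flip with a rollback path, assuming a hypothetical in-process `CollectionPointer` class standing in for wherever ACTIVE_COLLECTION actually lives (a config service, feature flag, or database row):

```python
from dataclasses import dataclass, field


@dataclass
class CollectionPointer:
    """Hypothetical stand-in for the ACTIVE_COLLECTION config value."""
    active: str
    history: list = field(default_factory=list)  # retired collections, newest last

    def flip(self, candidate: str, eval_passed: bool) -> bool:
        # Promote the staging collection only when its eval queries passed
        if not eval_passed:
            return False
        self.history.append(self.active)  # keep the previous version for rollback
        self.active = candidate
        return True

    def rollback(self) -> str:
        # Restore the most recently retired collection
        if not self.history:
            raise RuntimeError("no previous collection to roll back to")
        self.active = self.history.pop()
        return self.active


ptr = CollectionPointer(active="kb_v41")
ptr.flip("kb_v42", eval_passed=True)  # queries now read kb_v42
ptr.rollback()                        # back to kb_v41
```

In production the pointer should live somewhere every query pod reads atomically; pruning retired collections after the 24–72 hour window would be a separate cleanup job.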

Source connectors in detail

Amazon S3

List objects under a prefix; use ETag and LastModified for change detection. EventBridge S3 notifications enqueue ingest jobs on ObjectCreated. Store s3://bucket/key as source_uri in chunk metadata for citation links.
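One way to turn a listing into ingest work is to diff ETags against the last indexed state. This sketch assumes each entry carries the `Key` and `ETag` fields that ListObjectsV2 returns (for example, the `Contents` of each page from boto3's `list_objects_v2` paginator) and that `previous` is a hypothetical key-to-ETag map loaded from your ingest state store:

```python
def diff_s3_listing(contents: list, previous: dict) -> dict:
    """Classify keys as new, changed, or deleted relative to the last indexed ETags."""
    current = {obj["Key"]: obj["ETag"] for obj in contents}
    new = sorted(k for k in current if k not in previous)
    # Same key, different ETag: object bytes were rewritten
    changed = sorted(k for k in current if k in previous and previous[k] != current[k])
    # Indexed before but no longer listed: chunks must be deleted
    deleted = sorted(k for k in previous if k not in current)
    return {"new": new, "changed": changed, "deleted": deleted}
```

New and changed keys go to the ingest queue with source_uri set to f"s3://{bucket}/{key}"; deleted keys trigger stale chunk deletion.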

Google Drive

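Drive API v3 with domain-wide delegation or OAuth per workspace. Track modifiedTime for every file and md5Checksum for binary uploads; native Google Docs, Sheets, and Slides have no md5Checksum and must be exported to PDF/DOCX (files.export) before extract. Shared drives need supportsAllDrives=true and includeItemsFromAllDrives=true on files.list calls; missing them silently drops every shared-drive file from the corpus.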
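A small sketch of per-file change detection, assuming `file` dicts are Drive API v3 `files` resources requested with `fields="files(id,mimeType,modifiedTime,md5Checksum)"`; the helper names are hypothetical:

```python
GOOGLE_NATIVE_PREFIX = "application/vnd.google-apps."
FOLDER_MIME = "application/vnd.google-apps.folder"


def drive_fingerprint(file: dict) -> str:
    # Binary uploads expose md5Checksum; native Google formats do not
    if file.get("md5Checksum"):
        return f"md5:{file['md5Checksum']}"
    return f"mtime:{file['id']}:{file['modifiedTime']}"


def needs_export(file: dict) -> bool:
    # Native Docs/Sheets/Slides go through files.export (e.g. to PDF/DOCX) first
    mime = file.get("mimeType", "")
    return mime.startswith(GOOGLE_NATIVE_PREFIX) and mime != FOLDER_MIME
```

Storing the fingerprint next to the content hash lets a touch without content change (new modifiedTime, same exported text) still be skipped at the hash check.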

Notion

Notion API returns block trees—not flat files. Walk blocks recursively; flatten to markdown-like text preserving headings. Webhooks (or 5-minute poll on last_edited_time) trigger incremental re-ingest per page_id. Map Notion workspace + page permissions to your allowed_group_ids at extract time.
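A minimal flattening sketch. It assumes block dicts shaped like Notion API block objects (`type`, `has_children`, and `rich_text` items with `plain_text`) and a hypothetical `fetch_children(block_id)` callable that wraps the block-children endpoint and handles pagination:

```python
from typing import Callable

HEADING_PREFIX = {"heading_1": "# ", "heading_2": "## ", "heading_3": "### "}
LIST_TYPES = ("bulleted_list_item", "numbered_list_item", "to_do")


def rich_text(block: dict) -> str:
    # Text-bearing blocks keep their content under block[<type>]["rich_text"]
    body = block.get(block["type"], {})
    return "".join(part.get("plain_text", "") for part in body.get("rich_text", []))


def flatten_blocks(blocks: list, fetch_children: Callable[[str], list], depth: int = 0) -> list:
    """Walk a block tree depth-first and emit markdown-like lines."""
    lines = []
    for block in blocks:
        text, btype = rich_text(block), block["type"]
        if btype in HEADING_PREFIX:
            lines.append(HEADING_PREFIX[btype] + text)  # preserve heading levels
        elif btype in LIST_TYPES:
            lines.append("  " * depth + "- " + text)
        elif text:
            lines.append(text)
        if block.get("has_children"):
            # Nested blocks (toggles, sub-bullets) come from a separate children call
            lines.extend(flatten_blocks(fetch_children(block["id"]), fetch_children, depth + 1))
    return lines
```

Joining the result with newlines gives heading-aware text that the Markdown chunker can split the same way as any other source.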

| Source | Discover | Change signal | ACL source |
|---|---|---|---|
| S3 | ListObjectsV2 | ETag, EventBridge | IAM prefix + bucket policy |
| Google Drive | files.list | modifiedTime, md5Checksum | Drive permissions API |
| Notion | search / database query | last_edited_time | Page sharing + workspace |
| SharePoint | Graph API delta | delta token | Microsoft Graph permissions |
Manifest-driven ingest worker
import hashlib, json
from pathlib import Path

def chunk_id(doc_id: str, index: int, policy: str) -> str:
    return hashlib.sha256(f"{doc_id}:{index}:{policy}".encode()).hexdigest()[:24]

def ingest_manifest(manifest_path: str, vector_store, embed_fn, chunk_fn):
    manifest = json.loads(Path(manifest_path).read_text())
    policy = manifest["chunk_policy"]
    collection = manifest["collection"]
    for doc in manifest["documents"]:
        text = Path(doc["path"]).read_text(encoding="utf-8")
        chunks = chunk_fn(text, doc["doc_id"])
        vectors = embed_fn([c["text"] for c in chunks])
        for i, (c, v) in enumerate(zip(chunks, vectors)):
            vector_store.upsert(collection, id=chunk_id(doc["doc_id"], i, policy), vector=v,
                metadata={"doc_id": doc["doc_id"], "tenant_id": doc.get("tenant_id"), "text": c["text"]})
public void ingestManifest(Path manifestPath, VectorStore store, Embedder embedder, Chunker chunker) throws IOException {
  var manifest = parseManifest(manifestPath);
  for (var doc : manifest.documents()) {
    var text = Files.readString(Path.of(doc.path()));
    var chunks = chunker.chunk(text, doc.docId());
    var vectors = embedder.embed(chunks.stream().map(Chunk::text).toList());
    for (int i = 0; i < chunks.size(); i++) {
      var id = chunkId(doc.docId(), i, manifest.chunkPolicy());
      // Map.of rejects null values, so add tenant_id only when the manifest provides one
      var metadata = new HashMap<String, Object>(Map.of("doc_id", doc.docId(), "text", chunks.get(i).text()));
      if (doc.tenantId() != null) metadata.put("tenant_id", doc.tenantId());
      store.upsert(manifest.collection(), id, vectors.get(i), metadata);
    }
  }
}
📦 Real World

Notion → webhook → SQS → worker is a common pattern: edit events enqueue a doc_id; the worker fetches the page via the API, re-chunks, and upserts. Lag is minutes rather than waiting for a nightly batch.

⚠️ Pitfall

Embedding during user-facing requests. Ingest must be async: blocking an API request on a 200-page PDF parse is how demos become outages.

💡 Pro Tip

Store raw extracted text in object storage (S3) keyed by doc_id + content_hash, so you can re-chunk without re-fetching the source when the chunk policy changes.
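A sketch of that key layout, using a local directory as a stand-in for an S3 bucket and assuming `doc_id` is already path-safe; the function names are illustrative:

```python
import hashlib
from pathlib import Path


def cache_key(doc_id: str, text: str) -> str:
    content_hash = hashlib.sha256(text.encode("utf-8")).hexdigest()
    return f"{doc_id}/{content_hash}.txt"


def store_extracted(root: Path, doc_id: str, text: str) -> str:
    # Local directory stands in for an S3 bucket; the key layout is the point
    key = cache_key(doc_id, text)
    path = root / key
    path.parent.mkdir(parents=True, exist_ok=True)
    if not path.exists():  # identical content is written only once
        path.write_text(text, encoding="utf-8")
    return key


def load_for_rechunk(root: Path, key: str) -> str:
    # Re-chunk under a new policy without re-fetching or re-extracting the source
    return (root / key).read_text(encoding="utf-8")
```

Because the key includes the content hash, a new extraction never overwrites the old one, which also gives you a cheap history for debugging extraction regressions.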

Document loaders: LangChain, Unstructured, LlamaHub

Loaders normalize heterogeneous sources into Document objects (text + metadata). LangChain offers broad integrations; Unstructured excels at layout-aware PDF/DOCX; LlamaHub (LlamaIndex) provides community connectors, and LlamaIndex's LlamaParse service handles complex PDFs.

| Library | Strengths | Watch-outs |
|---|---|---|
| LangChain loaders | S3, GDrive, Notion, Confluence, GitHub | Abstraction leaks; pin versions |
| Unstructured | PDF tables, OCR pipeline, partition strategies | Heavier deps; self-host vs API |
| LlamaHub | 100+ readers, LlamaParse cloud PDF | Vendor tie-in for LlamaParse |

Selection guide

  • Mostly Markdown/HTML in S3 — simple custom loader or LangChain S3DirectoryLoader.
  • Scanned PDFs and forms — Unstructured hi_res strategy or LlamaParse.
  • Notion + Slack + Jira — LlamaHub connectors or native APIs with shared metadata schema.

Connector auth patterns

Run loaders from a worker role, not user laptops. S3 uses an IAM role; Drive uses a service account JSON key with domain-wide delegation; Notion uses an integration token scoped to shared pages. Rotate secrets via a vault and alert 7 days before expiry. Failed auth should increment ingest_failures_total{reason="auth"} and page on-call; otherwise silent empty indexes follow.

Metadata normalization

Every loader should emit a common schema before chunking:

  • doc_id — stable across renames (Notion page_id, S3 key hash).
  • source_uri — deep link for citations.
  • title, updated_at, mime_type.
  • acl — structured object copied to every chunk.
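One way to pin this schema down in code is a shared dataclass that every loader must produce. The class and helper names below are assumptions, not part of any loader library:

```python
import hashlib
from dataclasses import dataclass, field


@dataclass(frozen=True)
class NormalizedDoc:
    doc_id: str        # stable identifier (Notion page_id, S3 key hash, ...)
    source_uri: str    # deep link used for citations
    title: str
    updated_at: str    # ISO-8601 timestamp from the source
    mime_type: str
    acl: dict = field(default_factory=dict)  # copied onto every chunk
    text: str = ""


def s3_doc_id(bucket: str, key: str) -> str:
    # S3 key hash variant; it stays stable only while the object is not moved
    return hashlib.sha256(f"s3://{bucket}/{key}".encode("utf-8")).hexdigest()[:24]


def chunk_metadata(doc: NormalizedDoc, index: int) -> dict:
    # Every chunk inherits the doc-level fields, including a copy of the ACL
    return {"doc_id": doc.doc_id, "source_uri": doc.source_uri, "title": doc.title,
            "updated_at": doc.updated_at, "mime_type": doc.mime_type,
            "chunk_index": index, "acl": dict(doc.acl)}
```

Making chunking accept only `NormalizedDoc` means a loader that forgets ACLs or citation links fails at construction time instead of producing unfilterable chunks.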
LangChain + Unstructured loader chain
from langchain_community.document_loaders import S3DirectoryLoader
from unstructured.partition.auto import partition

def load_s3_prefix(bucket: str, prefix: str):
    loader = S3DirectoryLoader(bucket, prefix=prefix)
    return loader.load()

def partition_pdf(path: str):
    elements = partition(filename=path, strategy="hi_res")
    return "\n\n".join(el.text for el in elements if el.text)
// Spring AI + AWS SDK — list S3 prefix, load text objects
public List<Document> loadS3Prefix(S3Client s3, String bucket, String prefix) {
  var docs = new ArrayList<Document>();
  s3.listObjectsV2Paginator(b -> b.bucket(bucket).prefix(prefix)).contents().forEach(obj -> {
    var text = s3.getObjectAsBytes(b -> b.bucket(bucket).key(obj.key())).asUtf8String();
    docs.add(new Document(text, Map.of("source", obj.key(), "bucket", bucket)));
  });
  return docs;
}
⚖️ Trade-off

Unified loader frameworks speed up the MVP; at scale, teams often fork loaders to control retries, ACL mapping, and cost per format.

🔬 Under the Hood

Unstructured's hi_res strategy runs layout-detection models: better table recovery, but slower and more compute-hungry (a GPU helps) than the fast strategy's plain text extraction.

Incremental sync: change detection and stale deletion

Full re-index nightly does not scale. Use content hashes, etags, or source revision IDs to re-embed only changed documents, delete orphaned chunks, and keep vector count proportional to live corpus.

Change detection strategies

| Signal | Source | Notes |
|---|---|---|
| Content SHA-256 | After extract | Best for file stores; ignores mtime-only touches |
| S3 ETag | Object metadata | Cheap to list; multipart-upload ETags are not content MD5s and differ with part size |
| Notion last_edited_time | API | Per-page poll or webhook |
| Git commit SHA | Code repos | Re-index changed files only in CI |

Stale chunk cleanup

After re-chunking doc D with new policy or content:

  1. Compute new chunk_id set S_new.
  2. Load previous IDs S_old from metadata store.
  3. Delete S_old - S_new from vector index.
  4. Upsert all chunks in S_new.
Hash-based incremental ingest
import hashlib

def doc_content_hash(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

def sync_document(doc_id: str, text: str, state_db, vector_store, policy: str):
    h = doc_content_hash(text)
    prev = state_db.get(doc_id)
    # Unchanged content under the same chunk policy: nothing to do
    if prev and prev["hash"] == h and prev["policy"] == policy:
        return {"status": "skipped"}
    chunks = chunk_document(text, doc_id, policy)
    new_ids = [chunk_id(doc_id, i, policy) for i in range(len(chunks))]
    # Delete only S_old - S_new; upsert overwrites surviving IDs in place
    stale = set(prev["chunk_ids"]) - set(new_ids) if prev else set()
    if stale:
        vector_store.delete(sorted(stale))
    upsert_chunks(vector_store, chunks, new_ids)
    state_db.put(doc_id, {"hash": h, "policy": policy, "chunk_ids": new_ids})
    return {"status": "updated", "chunks": len(chunks)}
public SyncResult syncDocument(String docId, String text, DocStateRepository state, VectorStore vectors, String policy) {
  var hash = sha256(text);
  var prev = state.find(docId);
  // Unchanged content under the same chunk policy: nothing to do
  if (prev != null && prev.hash().equals(hash) && prev.policy().equals(policy))
    return SyncResult.skipped();
  var chunks = chunker.chunk(text, docId, policy);
  var newIds = IntStream.range(0, chunks.size()).mapToObj(i -> chunkId(docId, i, policy)).toList();
  // Delete only S_old - S_new; upsert overwrites surviving IDs in place
  if (prev != null) {
    var stale = new HashSet<>(prev.chunkIds());
    stale.removeAll(newIds);
    if (!stale.isEmpty()) vectors.delete(List.copyOf(stale));
  }
  vectors.upsert(chunks, newIds);
  state.save(docId, hash, policy, newIds);
  return SyncResult.updated(chunks.size());
}
💡 Pro Tip

Persist ingest state in Postgres, not only Redis, so it survives restarts and you can audit which version indexed each doc.

⚠️ Pitfall

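Changing the embedding model without re-embedding all docs breaks retrieval, because query vectors and stored document vectors no longer share a vector space. Treat an embed_model change like a schema migration: build a new collection.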

Orchestration: Airflow, Prefect, Temporal

Ingestion jobs need retries, SLAs, and visibility. Airflow fits batch DAGs; Prefect simplifies Python-native flows; Temporal excels at durable long-running workflows with per-document activities and human approval steps.

| Engine | Best for | RAG ingest pattern |
|---|---|---|
| Airflow | Nightly batch, ops familiarity | DAG: list → map embed → validate → flip pointer |
| Prefect | Python teams, dynamic tasks | @flow with mapped document tasks |
| Temporal | Durable retries, per-tenant fairness | Workflow per corpus; activities for load/chunk/embed |

Scheduling considerations

  • Rate-limit embedding API—shard work across hours if corpus is huge.
  • Separate full rebuild (weekly) from incremental (15 min cron).
  • Gate collection flip on eval job success (golden docs pass).
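A sketch of batched embedding with exponential backoff and jitter. `embed_fn` stands in for your provider's batch embedding call, and `RateLimited` is a hypothetical exception your client wrapper would raise on HTTP 429:

```python
import random
import time
from typing import Callable, Sequence


class RateLimited(Exception):
    """Hypothetical exception raised by your embedding client wrapper on HTTP 429."""


def embed_in_batches(texts: Sequence[str], embed_fn: Callable[[list], list],
                     batch_size: int = 64, max_retries: int = 5, base_delay: float = 1.0,
                     sleep: Callable[[float], None] = time.sleep) -> list:
    vectors = []
    for start in range(0, len(texts), batch_size):
        batch = list(texts[start:start + batch_size])
        for attempt in range(max_retries + 1):
            try:
                vectors.extend(embed_fn(batch))
                break
            except RateLimited:
                if attempt == max_retries:
                    raise  # give up; the orchestrator retries the task later
                # Exponential backoff with jitter: 1s, 2s, 4s, ... plus up to 1s of noise
                sleep(base_delay * 2 ** attempt + random.uniform(0, 1))
    return vectors
```

Injecting `sleep` keeps the function testable; in a Temporal or Prefect setup, the orchestrator's own retry policy sits above this loop for failures that outlast it.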

Airflow DAG sketch

Nightly full reconcile plus hourly incremental is a common split:

  1. list_sources — pull manifest from S3 or build from API.
  2. map_ingest — dynamic task per doc or batch of 50.
  3. validate_golden — pytest task; fail DAG if regression.
  4. flip_pointer — only when staging collection chunk count within 5% of expected.
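The flip_pointer gate from step 4 can be a pure function, which makes it easy to unit test. This sketch assumes `expected_count` comes from somewhere like the active collection's chunk count adjusted for documents added or removed in this run:

```python
def should_flip(staging_count: int, expected_count: int, golden_passed: bool,
                tolerance: float = 0.05) -> bool:
    """Flip only when golden tests pass and chunk count is within tolerance of expected."""
    if not golden_passed or expected_count <= 0:
        return False
    drift = abs(staging_count - expected_count) / expected_count
    return drift <= tolerance
```

Checking drift in both directions matters: a large jump up often means duplicate chunks from a broken chunk_id, not just missing ones.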

Temporal vs cron

Cron re-runs failed jobs from scratch; Temporal resumes per-document workflows after worker crash. For 100K-document corpora, Temporal’s activity heartbeats and retry policies avoid duplicate embed charges from blind full retries.

Airflow DAG with dynamic task mapping
from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule="0 * * * *", start_date=datetime(2026, 1, 1), catchup=False)
def kb_ingest():
    @task
    def list_docs(): return load_manifest("s3://kb/manifest.json")

    @task
    def ingest_doc(doc: dict):
        return sync_document(doc["doc_id"], fetch_text(doc), state_db, vector_store, "v3")

    @task
    def flip_if_ok(results: list):
        if all(r.get("status") != "error" for r in results):
            flip_active_collection("kb_v43")

    docs = list_docs()
    results = ingest_doc.expand(doc=docs)
    flip_if_ok(results)

kb_ingest()
@Scheduled(cron = "0 0 * * * *")
public void hourlyIngest() {
  var docs = manifestLoader.load("s3://kb/manifest.json");
  var results = docs.parallelStream().map(this::syncDocument).toList();
  if (results.stream().noneMatch(r -> r.status() == Status.ERROR)) {
    collectionPointer.flip("kb_v43");
  }
}
💰 Cost

Batch-embed during off-peak hours; some providers offer batch embedding APIs at a 50% discount, so queue chunks overnight if your latency SLO allows.

Prefect ingest flow sketch
from prefect import flow, task

@task(retries=3, retry_delay_seconds=30)
def ingest_one(doc: dict) -> dict:
    text = load_and_extract(doc)
    return sync_document(doc["doc_id"], text, state_db, vector_store, policy="v2")

@flow
def nightly_kb_sync(manifest: list[dict]):
    # .map() returns futures; resolve them before inspecting results
    results = [f.result() for f in ingest_one.map(manifest)]
    failed = [r for r in results if r.get("status") == "error"]
    if failed:
        raise RuntimeError(f"{len(failed)} docs failed")
    flip_active_collection("kb_v42")
// Temporal workflow interface (conceptual)
@WorkflowInterface
public interface IngestWorkflow {
  @WorkflowMethod
  void runIngest(String manifestUri, String targetCollection);
}

@ActivityInterface
public interface IngestActivities {
  List<DocManifest> listDocuments(String manifestUri);
  IngestResult ingestDocument(DocManifest doc, String policy);
  void flipCollection(String name);
}
📦 Real World

Multi-tenant SaaS uses Temporal per-tenant task queues so one customer’s 50K-doc upload doesn’t starve others’ incremental syncs.

🎯 Interview Tip

“How do you re-index without downtime?” — Blue/green collections, background ingest, eval gate, atomic pointer flip, rollback path.

Multi-format parsing: PDF through code repos

Each format needs tuned extraction before shared chunking. PDFs need layout awareness; code needs AST or function-level splits; CSV rows may become one chunk per row or schema-aware summaries.

| Format | Extractor | Chunking hint |
|---|---|---|
| PDF | Unstructured hi_res, LlamaParse | Page + heading boundaries; preserve tables as markdown |
| DOCX | python-docx, Unstructured | Heading styles → section chunks |
| HTML | Trafilatura, BeautifulSoup | Strip nav; main content selector |
| Markdown | Header-based split | Keep code fences intact |
| CSV | pandas row reader | Row or group chunks + column schema in metadata |
| Code | tree-sitter, LangChain LanguageParser | Function/class chunks with path metadata |

Store mime_type, page_number, and section_title in metadata—retrieval UI uses them for citations.
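For the Markdown row of the table, a header-based splitter only needs to track whether it is inside a code fence. This is a minimal sketch of what the router's `chunk_markdown(text, path)` could look like; it ignores chunk size limits, which a real policy would add:

```python
import re

HEADING = re.compile(r"^(#{1,6})\s+(.*)$")
FENCE = "`" * 3  # triple backtick


def chunk_markdown(text: str, source: str) -> list:
    """Split Markdown at headings, never inside code fences; record section_title."""
    chunks, buf, title, in_fence = [], [], "", False

    def flush():
        body = "\n".join(buf).strip()
        if body:
            chunks.append({"text": body, "source": source, "section_title": title})

    for line in text.splitlines():
        if line.lstrip().startswith(FENCE):
            in_fence = not in_fence  # toggle on opening and closing fences
        match = None if in_fence else HEADING.match(line)
        if match:
            flush()  # close the previous section before starting a new one
            buf, title = [], match.group(2).strip()
        buf.append(line)
    flush()
    return chunks
```

A `# comment` line inside a Python fence would split the section with a naive heading regex; the fence flag is what keeps the code block intact.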

Format-aware chunk router
def route_and_chunk(path: str, raw_bytes: bytes) -> list[dict]:
    ext = path.rsplit(".", 1)[-1].lower()
    if ext == "pdf":
        text = partition_pdf_bytes(raw_bytes)
        return chunk_by_page(text, path)
    if ext in ("md", "markdown"):
        return chunk_markdown(raw_bytes.decode(), path)
    if ext in ("py", "java", "ts"):
        return chunk_code(raw_bytes.decode(), path, language=ext)
    return chunk_fixed(raw_bytes.decode(), path)
public List<Chunk> routeAndChunk(String path, byte[] raw) {
  var ext = path.substring(path.lastIndexOf('.') + 1).toLowerCase();
  var text = new String(raw, StandardCharsets.UTF_8); // explicit charset, not the platform default
  return switch (ext) {
    case "pdf" -> chunkByPage(pdfExtractor.extract(raw), path);
    case "md", "markdown" -> chunkMarkdown(text, path);
    case "py", "java", "ts" -> chunkCode(text, path, ext);
    default -> chunkFixed(text, path);
  };
}
⚠️ Pitfall

PDF “text extraction” that drops tables destroys finance and spec docs—always sample 20 random PDFs manually before launch.

🔬 Under the Hood

Code chunking with tree-sitter respects syntax boundaries—better than 512-token splits mid-function that lose signature context.
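tree-sitter needs per-language grammars, so as a dependency-free stand-in this sketch uses Python's built-in `ast` module to cut Python files at top-level function and class boundaries. It only handles Python and drops module-level statements between definitions:

```python
import ast


def chunk_python_functions(source: str, path: str) -> list:
    """Function/class-level chunks for Python files via the stdlib ast module."""
    tree = ast.parse(source)
    lines = source.splitlines()
    chunks = []
    for node in tree.body:  # top-level definitions only
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
            # Start at the first decorator so the chunk keeps full signature context
            start = min([node.lineno] + [d.lineno for d in node.decorator_list])
            text = "\n".join(lines[start - 1:node.end_lineno])
            chunks.append({"text": text, "path": path, "symbol": node.name,
                           "start_line": start, "end_line": node.end_lineno})
    return chunks
```

The `start_line`/`end_line` fields double as citation anchors, the code equivalent of page_number for PDFs.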

Access control: metadata filters at retrieval

Ingest must copy source permissions into chunk metadata—tenant_id, group_ids, classification. Query path applies mandatory filters so users never retrieve unauthorized chunks, even if embedding similarity is high.

Metadata fields to ingest

  • tenant_id — hard filter for multi-tenant products.
  • allowed_group_ids — from IdP groups or SharePoint ACLs.
  • classification — public, internal, confidential.
  • doc_owner — for audit and optional owner-only docs.

Never rely on prompt instructions (“only use allowed docs”); vectors must be filtered in the database query. A post-filter in application code is acceptable only as defense in depth, after the index-level filter.

Retrieval with ACL filter
def retrieve_for_user(query: str, user: User, k: int = 10):
    embedding = embed(query)
    # Mandatory index-level filter; named acl_filter to avoid shadowing built-in filter()
    acl_filter = {
        "tenant_id": user.tenant_id,
        "allowed_group_ids": {"$in": user.group_ids + ["public"]},
    }
    return vector_store.search(embedding, k=k, filter=acl_filter)
public List<Document> retrieveForUser(String query, User user, int k) {
  var embedding = embedder.embed(query);
  // The user's groups plus "public"
  var groups = Stream.concat(user.groupIds().stream(), Stream.of("public")).toList();
  var filter = Map.of(
      "tenant_id", user.tenantId(),
      "allowed_group_ids", Map.of("$in", groups)
  );
  return vectorStore.search(embedding, k, filter);
}
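For the defense-in-depth post-filter, a sketch that re-checks each hit against the same rule the index filter encodes. It assumes hits are dicts with a `metadata` field carrying `tenant_id` and `allowed_group_ids`; any drop is logged because it means the index filter is misconfigured:

```python
import logging

log = logging.getLogger("retrieval.acl")


def authorized(meta: dict, tenant_id: str, group_ids: list) -> bool:
    # Same rule as the index filter: same tenant AND a shared group (or "public")
    if meta.get("tenant_id") != tenant_id:
        return False
    allowed = set(meta.get("allowed_group_ids", []))
    return bool(allowed & (set(group_ids) | {"public"}))


def enforce_acl(hits: list, tenant_id: str, group_ids: list) -> list:
    """Drop hits the index filter should have excluded and surface the mismatch."""
    kept = [h for h in hits if authorized(h["metadata"], tenant_id, group_ids)]
    dropped = len(hits) - len(kept)
    if dropped:
        # Should never happen with a correct index filter; log it so alerting can fire
        log.error("acl_post_filter_dropped dropped=%d tenant_id=%s", dropped, tenant_id)
    return kept
```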
🔒 Security

ACL sync lag is a data leak vector. Re-ingest on permission change events; delete chunks within minutes when access revoked—not next nightly batch.

💰 Cost

Filtered ANN search needs index support for the filter (tenant + vector), so plan index design early; brute-force post-filtering over 10K hits is slow, and unsafe as the only filter.

Observability: latency, failures, and index health

Instrument every ingest stage. Operators should see documents processed/hour, p95 stage latency, failure rate by format, and total chunk count per collection—correlated with retrieval empty-hit rate downstream.

| Metric | Type | Alert threshold (example) |
|---|---|---|
| ingest_docs_total | Counter | N/A (dashboard) |
| ingest_failures_total | Counter by reason | >2% of batch |
| ingest_stage_latency_seconds | Histogram | p95 embed > 30s/doc |
| index_chunk_count | Gauge | Drop >10% day-over-day |
| ingest_lag_seconds | Gauge | Source mtime vs indexed > 1h |

Log structured fields: doc_id, stage, duration_ms, chunk_count, embed_model, collection.
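A sketch of emitting those fields as one JSON log line per stage with the standard `logging` module; the `logged_stage` helper is hypothetical, and it records ids and counts only, never document text:

```python
import json
import logging
import time
from contextlib import contextmanager

log = logging.getLogger("ingest")


@contextmanager
def logged_stage(doc_id: str, stage: str, collection: str, embed_model: str):
    """Emit one JSON log line per stage, even when the stage raises."""
    record = {"doc_id": doc_id, "stage": stage, "collection": collection,
              "embed_model": embed_model, "chunk_count": None}
    start = time.perf_counter()
    try:
        yield record  # caller fills in chunk_count once it is known
    finally:
        record["duration_ms"] = round((time.perf_counter() - start) * 1000, 1)
        log.info(json.dumps(record))
```

Usage: `with logged_stage(doc_id, "chunk", "kb_v42", model_id) as rec: rec["chunk_count"] = len(chunks)`.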

Prometheus metrics for ingest
from prometheus_client import Counter, Histogram, Gauge

INGEST_DOCS = Counter("ingest_docs_total", "Documents processed", ["status", "format"])
INGEST_LATENCY = Histogram("ingest_stage_latency_seconds", "Stage latency", ["stage"])
CHUNK_COUNT = Gauge("index_chunk_count", "Chunks in collection", ["collection"])

def timed_stage(stage: str):
    return INGEST_LATENCY.labels(stage=stage).time()

with timed_stage("embed"):
    vectors = embed_batch(chunks)
INGEST_DOCS.labels(status="ok", format="pdf").inc()
CHUNK_COUNT.labels(collection="kb_v42").set(vector_store.count())
meterRegistry.counter("ingest.docs", Tags.of("status", "ok", "format", "pdf")).increment();
meterRegistry.timer("ingest.stage", Tags.of("stage", "embed")).record(() -> embedBatch(chunks));
meterRegistry.gauge("index.chunk.count", Tags.of("collection", "kb_v42"), vectorStore, VectorStore::count);
🔬 Under the Hood

Correlate ingest lag with OpenTelemetry trace IDs propagated from webhook → queue → worker—speeds RCA when one tenant’s PDFs hang OCR.

⚠️ Pitfall

Alerting only on failure count misses slow degradation—watch p95 embed latency and chunk count drift, not just errors.

📦 Real World

Platform teams page when ingest lag exceeds SLA and retrieval empty-hit rate spikes—often the same root cause (connector auth expired).

Golden documents: expected chunk validation

Maintain a golden doc set with expected chunk counts, boundary strings, and metadata. Run it after every chunk-policy or embed-model change as a CI gate before flipping the active collection.

Golden test structure

{
  "doc_id": "refund-policy-v3",
  "source": "fixtures/refund-policy.pdf",
  "chunk_policy": "heading_v2",
  "expect": {
    "min_chunks": 8,
    "max_chunks": 14,
    "must_contain": ["30-day window", "pro-rated refund"],
    "metadata": {"classification": "public"}
  }
}

Tests load fixture → extract → chunk → assert bounds and substring presence; no LLM is needed for these deterministic checks. Optionally add an embedding smoke test: each chunk's nearest neighbor in the index should be itself.

Retrieval smoke on golden set

After index flip, run fixed queries against staging:

| Query | Expected doc_id in top-3 | Min score |
|---|---|---|
| "30-day refund window" | refund-policy-v3 | 0.75 cosine |
| "reset MFA device" | security-mfa | 0.70 |

CI pipeline integration

  1. PR changes chunker → run golden chunk tests.
  2. Merge to main → deploy worker → ingest staging collection.
  3. Golden retrieval + RAGAS sample → auto-flip or hold for human.
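The golden retrieval smoke table translates directly into a check that can gate the auto-flip. This sketch assumes a hypothetical `search(query, k)` callable returning hits with `doc_id` and cosine `score` from the staging collection:

```python
from typing import Callable

# Fixed queries from the smoke table: (query, expected doc_id, minimum score)
SMOKE_CASES = [
    ("30-day refund window", "refund-policy-v3", 0.75),
    ("reset MFA device", "security-mfa", 0.70),
]


def run_smoke(search: Callable[[str, int], list], k: int = 3) -> list:
    """Return failure messages; an empty list means the staging index passes."""
    failures = []
    for query, expected_doc, min_score in SMOKE_CASES:
        hits = search(query, k)  # assumed shape: [{"doc_id": ..., "score": ...}, ...]
        match = next((h for h in hits[:k] if h["doc_id"] == expected_doc), None)
        if match is None:
            failures.append(f"{query!r}: {expected_doc} not in top-{k}")
        elif match["score"] < min_score:
            failures.append(f"{query!r}: score {match['score']:.2f} < {min_score}")
    return failures
```

Returning messages instead of raising lets the pipeline decide between auto-flip (empty list) and holding for a human with the failures attached.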
Golden doc pytest
def test_refund_policy_chunks(golden_manifest, chunk_pipeline):
    spec = golden_manifest["refund-policy-v3"]
    chunks = chunk_pipeline(spec["source"], spec["chunk_policy"])
    assert spec["expect"]["min_chunks"] <= len(chunks) <= spec["expect"]["max_chunks"]
    blob = "\n".join(c["text"] for c in chunks)
    for phrase in spec["expect"]["must_contain"]:
        assert phrase in blob
@Test
void refundPolicyChunksMatchGolden() {
  var spec = goldenManifest.get("refund-policy-v3");
  var chunks = chunkPipeline.run(spec.source(), spec.chunkPolicy());
  assertThat(chunks.size()).isBetween(spec.expect().minChunks(), spec.expect().maxChunks());
  var blob = chunks.stream().map(Chunk::text).collect(Collectors.joining("\n"));
  spec.expect().mustContain().forEach(phrase -> assertThat(blob).contains(phrase));
}
💡 Pro Tip

Compare chunk count per doc before/after chunk policy change—silent 0-chunk extracts are common when HTML selectors break.

🎯 Interview Tip

“How do you test RAG?” — Golden docs for ingest, retrieval recall@k fixtures for search, LLM-judge or RAGAS offline for end-to-end—run in CI on prompt/index changes.

⚖️ Trade-off

Golden tests catch regressions but don’t cover all formats—invest in stratified fixtures (PDF table, scanned, code, CSV).

Production: Track 2 checklist and next steps

Track 2 closes when ingestion and retrieval run as versioned, observable pipelines with ACL-safe metadata and regression tests—not when a notebook indexes three files.

Track 2 production checklist

  • ☐ Offline ingest separated from online query API
  • ☐ Stable doc_id and chunk_id with policy version in hash
  • ☐ Incremental sync with hash/etag; stale chunk deletion
  • ☐ Blue/green or pointer-flip index versioning
  • ☐ Source ACLs copied to chunk metadata; filtered retrieval
  • ☐ Hybrid + rerank (or documented exception) on query path
  • ☐ Advanced pattern chosen from matrix with eval evidence
  • ☐ Ingest + retrieval metrics dashboards and alerts
  • ☐ Golden document CI on chunk/embed changes
  • ☐ Runbook: connector auth failure, embed rate limit, rollback collection

You now have the RAG stack: retrieval strategies, advanced patterns (GraphRAG, Self-RAG, CRAG, RAPTOR), and ingestion engineering. Track 3 shifts to prompt and context engineering—how you assemble retrieved chunks into reliable instructions without injection and context bloat.

📦 Production checklist

Track 2 is complete when changed docs are re-indexed within your published SLA, unauthorized retrieval is impossible by construction, and you can roll back a bad embed deploy in under 15 minutes.

📦 Real World

Teams that skip ingest observability debug retrieval in production for weeks—when the root cause was broken incremental sync after a credential rotation.

🔒 Security

Before Track 3, audit that no ingest logs write full document bodies for confidential classification—log ids and hashes only.