Knowledge base ingestion
Retrieval quality is capped by ingest quality. Production RAG separates offline ingestion (load → extract → chunk → embed → upsert) from online query paths. This guide covers source connectors, incremental sync, orchestration, multi-format parsing, ACL metadata, observability, and golden-document testing—the pipeline engineering that keeps indexes fresh without blocking user queries.
After reading, you should be able to: diagram an end-to-end ingest architecture from S3, Google Drive, or Notion through vector upsert; choose among LangChain, Unstructured, and LlamaHub loaders; implement hash-based incremental updates; schedule pipelines in Airflow, Prefect, or Temporal; handle PDF, DOCX, HTML, Markdown, CSV, and code repos; enforce permissions via chunk metadata filters; monitor ingest latency and failure rates; validate chunking with golden documents; and complete the Track 2 production checklist.
Pipeline architecture: source to vector index
Standard production flow: S3 / GDrive / Notion / SharePoint → loader → extract text → chunk → embed → upsert vector DB → attach metadata. Queries never touch source systems—they read the active index version only.
Treat ingestion as an ETL pipeline with idempotent stages. Each document gets a stable doc_id; each chunk a deterministic chunk_id = hash(doc_id, index, chunk_policy_version). Upserts replace rows in place; deletes remove stale chunk IDs when source files disappear or hashes change.
Stage breakdown
| Stage | Input | Output | Failure handling |
|---|---|---|---|
| Discover | Bucket prefix, Drive folder, Notion DB | File manifest with etag/mtime | Retry listing; alert on auth expiry |
| Load | Raw bytes | Normalized text + structure hints | Quarantine corrupt PDFs |
| Extract | PDF/DOCX/HTML | Clean markdown-like text, tables | OCR fallback queue |
| Chunk | Full document text | Chunk records with offsets | Policy version in chunk_id |
| Embed | Chunk texts (batched) | Vectors + model id | Rate limit backoff |
| Upsert | Vectors + metadata | Index rows | Dual-write to staging collection |
| Metadata | Source ACL, tenant, tags | Filterable index fields | Must match auth at query time |
Active collection pointer
Build kb_v{N+1} in staging; run eval queries; flip the config pointer ACTIVE_COLLECTION=kb_v{N+1}; keep the previous version for 24–72 hours for rollback. User queries never wait on a full-corpus re-embed.
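A minimal sketch of the pointer flip and rollback, assuming the pointer lives in a small JSON file; in practice it might be a config service, a database row, or a feature flag. The file layout and the function names (flip_active_collection, rollback, read_active) are illustrative. Writing a temp file and calling os.replace keeps the switch atomic on the same filesystem, so the query path never reads a half-written pointer.

```python
import json
import os
import tempfile
import time
from pathlib import Path


def _atomic_write(path: Path, data: dict) -> None:
    # Write a temp file next to the target, then os.replace() it into place so
    # readers see either the old pointer or the new one, never a partial file.
    fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        json.dump(data, f)
    os.replace(tmp, path)


def read_active(pointer_path: Path) -> str:
    """Query path: resolve which collection to search."""
    return json.loads(pointer_path.read_text())["active"]


def flip_active_collection(pointer_path: Path, new_collection: str) -> None:
    state = json.loads(pointer_path.read_text()) if pointer_path.exists() else {}
    _atomic_write(pointer_path, {
        "active": new_collection,
        "previous": state.get("active"),  # retained for the rollback window
        "flipped_at": time.time(),
    })


def rollback(pointer_path: Path) -> str:
    state = json.loads(pointer_path.read_text())
    if not state.get("previous"):
        raise RuntimeError("no previous collection to roll back to")
    _atomic_write(pointer_path, {
        "active": state["previous"],
        "previous": state["active"],
        "flipped_at": time.time(),
    })
    return state["previous"]
```

Call flip_active_collection only after the staging evals pass; rollback works for as long as the previous collection is still retained.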
Source connectors in detail
Amazon S3
List objects under a prefix; use ETag and LastModified for change detection. EventBridge S3 notifications enqueue ingest jobs on ObjectCreated. Store s3://bucket/key as source_uri in chunk metadata for citation links.
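A sketch of ETag-based change detection, assuming s3_client is a boto3 S3 client and the previous listing was persisted as a {key: etag} map by an earlier run. The list_objects_v2 paginator is boto3's; list_prefix and diff_listing are hypothetical names. Keys that disappear from the listing are returned separately so their chunks can be deleted.

```python
def list_prefix(s3_client, bucket: str, prefix: str) -> dict[str, str]:
    """Return {key: etag} for every object under the prefix (s3_client = boto3.client("s3"))."""
    listing: dict[str, str] = {}
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):  # empty pages have no "Contents" key
            listing[obj["Key"]] = obj["ETag"]
    return listing


def diff_listing(current: dict[str, str], previous: dict[str, str]) -> tuple[list[str], list[str]]:
    """Return (new_or_changed_keys, deleted_keys) between two {key: etag} snapshots."""
    changed = sorted(k for k, etag in current.items() if previous.get(k) != etag)
    deleted = sorted(k for k in previous if k not in current)
    return changed, deleted
```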
Google Drive
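Drive API v3 with domain-wide delegation or OAuth per workspace. Track modifiedTime for every file and md5Checksum where Drive provides it (binary uploads such as PDF or DOCX); native Google Docs have no md5Checksum and must be exported to PDF or DOCX with files.export before extraction. Shared drives need supportsAllDrives=true and includeItemsFromAllDrives=true on files.list calls; missing them silently drops every shared-drive file from the corpus.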
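A sketch of per-file change signals for Drive, assuming entries come from a Drive API v3 files.list response with the fields requested in LIST_PARAMS. The parameter names (corpora, supportsAllDrives, includeItemsFromAllDrives, fields, pageSize) belong to files.list; LIST_PARAMS, change_signal, and needs_export are illustrative. Drive fills md5Checksum only for binary content stored in Drive, so native Google Docs fall back to modifiedTime and go through files.export before extraction.

```python
GOOGLE_APPS_PREFIX = "application/vnd.google-apps."
FOLDER_MIME = "application/vnd.google-apps.folder"

# Keyword arguments for a Drive API v3 files.list call that includes shared drives.
LIST_PARAMS = {
    "corpora": "allDrives",
    "supportsAllDrives": True,
    "includeItemsFromAllDrives": True,
    "fields": "nextPageToken, files(id, name, mimeType, modifiedTime, md5Checksum)",
    "pageSize": 1000,
}


def change_signal(file: dict) -> str:
    """Fingerprint one files.list entry for change detection."""
    if file.get("md5Checksum"):
        # Binary content stored in Drive (PDF, DOCX, ...) has a checksum.
        return "md5:" + file["md5Checksum"]
    # Native Google Docs/Sheets/Slides have no md5Checksum; use modifiedTime.
    return "mtime:" + file["modifiedTime"]


def needs_export(file: dict) -> bool:
    """Native Google Workspace files must go through files.export before extraction."""
    mime = file["mimeType"]
    return mime.startswith(GOOGLE_APPS_PREFIX) and mime != FOLDER_MIME
```

With google-api-python-client, the dict can be passed as keyword arguments, for example service.files().list(**LIST_PARAMS, pageToken=token).execute(), looping until nextPageToken is absent.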
Notion
Notion API returns block trees—not flat files. Walk blocks recursively; flatten to markdown-like text preserving headings. Webhooks (or 5-minute poll on last_edited_time) trigger incremental re-ingest per page_id. Map Notion workspace + page permissions to your allowed_group_ids at extract time.
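A minimal flattener, assuming the block tree was already fetched (by calling the block-children endpoint for every block with has_children) and each child list stored under a hypothetical "children" key. The type and rich_text/plain_text fields follow the Notion block object shape; only headings, list items, and other text-bearing blocks are emitted, and blocks without rich_text (dividers, images) are skipped.

```python
HEADING_PREFIX = {"heading_1": "# ", "heading_2": "## ", "heading_3": "### "}
LIST_PREFIX = {"bulleted_list_item": "- ", "numbered_list_item": "1. "}


def rich_text_to_str(rich_text: list[dict]) -> str:
    # Notion splits styled runs into separate rich_text items; join their plain_text.
    return "".join(rt.get("plain_text", "") for rt in rich_text)


def flatten_blocks(blocks: list[dict], depth: int = 0) -> list[str]:
    """Flatten a pre-fetched Notion block tree into markdown-like lines.

    Assumes children were fetched ahead of time and stored under a
    hypothetical "children" key on each block dict.
    """
    lines: list[str] = []
    indent = "  " * depth
    for block in blocks:
        btype = block["type"]
        payload = block.get(btype, {})
        text = rich_text_to_str(payload.get("rich_text", []))
        if btype in HEADING_PREFIX:
            lines.append(HEADING_PREFIX[btype] + text)  # keep headings for chunk boundaries
        elif btype in LIST_PREFIX:
            lines.append(indent + LIST_PREFIX[btype] + text)
        elif text:
            lines.append(indent + text)  # paragraph, quote, callout, toggle, ...
        # Recurse so nested list items and toggle contents are not lost.
        lines.extend(flatten_blocks(block.get("children", []), depth + 1))
    return lines
```

Joining the result with newlines gives text that a header-based chunker can split on the preserved headings.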
| Source | Discover | Change signal | ACL source |
|---|---|---|---|
| S3 | ListObjectsV2 | ETag, EventBridge | IAM prefix + bucket policy |
| Google Drive | files.list | modifiedTime, md5 | Drive permissions API |
| Notion | search / database query | last_edited_time | Page sharing + workspace |
| SharePoint | Graph API delta | delta token | Microsoft Graph permissions |
```python
import hashlib
import json
from pathlib import Path


def chunk_id(doc_id: str, index: int, policy: str) -> str:
    return hashlib.sha256(f"{doc_id}:{index}:{policy}".encode()).hexdigest()[:24]


def ingest_manifest(manifest_path: str, vector_store, embed_fn, chunk_fn):
    manifest = json.loads(Path(manifest_path).read_text())
    policy = manifest["chunk_policy"]
    collection = manifest["collection"]
    for doc in manifest["documents"]:
        text = Path(doc["path"]).read_text(encoding="utf-8")
        chunks = chunk_fn(text, doc["doc_id"])
        vectors = embed_fn([c["text"] for c in chunks])
        for i, (c, v) in enumerate(zip(chunks, vectors)):
            vector_store.upsert(collection, id=chunk_id(doc["doc_id"], i, policy), vector=v,
                                metadata={"doc_id": doc["doc_id"], "tenant_id": doc.get("tenant_id"), "text": c["text"]})
```
```java
public void ingestManifest(Path manifestPath, VectorStore store, Embedder embedder, Chunker chunker) throws IOException {
    var manifest = parseManifest(manifestPath);
    for (var doc : manifest.documents()) {
        var text = Files.readString(Path.of(doc.path()));
        var chunks = chunker.chunk(text, doc.docId());
        var vectors = embedder.embed(chunks.stream().map(Chunk::text).toList());
        for (int i = 0; i < chunks.size(); i++) {
            var id = chunkId(doc.docId(), i, manifest.chunkPolicy());
            // Map.of rejects null values, so every doc must carry a tenantId.
            store.upsert(manifest.collection(), id, vectors.get(i),
                    Map.of("doc_id", doc.docId(), "tenant_id", doc.tenantId(), "text", chunks.get(i).text()));
        }
    }
}
```
Notion → webhook → SQS → worker is a common pattern: edit events enqueue the doc_id, and a worker fetches the page via the API, re-chunks, and upserts. Lag drops to minutes instead of depending on a nightly batch.
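A sketch of the worker side, with the standard-library queue.Queue standing in for SQS (with SQS you would call receive_message with MaxNumberOfMessages up to 10 and delete each message only after a successful re-ingest). Coalescing duplicate page IDs within a batch is a design choice assumed here, so a burst of edits to one page costs one re-ingest. reingest is a hypothetical callback that fetches, re-chunks, and upserts.

```python
import queue
from typing import Callable


def drain_batch(q: queue.Queue, max_messages: int = 10, wait_s: float = 1.0) -> list[str]:
    """Block up to wait_s for the first doc_id, then take whatever else is ready."""
    batch: list[str] = []
    try:
        batch.append(q.get(timeout=wait_s))
        while len(batch) < max_messages:
            batch.append(q.get_nowait())
    except queue.Empty:
        pass
    return batch


def run_once(q: queue.Queue, reingest: Callable[[str], None], max_messages: int = 10) -> list[str]:
    """One worker iteration: coalesce duplicate edit events, then re-ingest each page once."""
    batch = drain_batch(q, max_messages)
    unique = list(dict.fromkeys(batch))  # de-duplicate while keeping arrival order
    for doc_id in unique:
        reingest(doc_id)  # hypothetical: fetch page via API, re-chunk, upsert
    return unique
```

A long-running worker simply calls run_once in a loop; the webhook handler only enqueues and returns.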
Avoid embedding during user-facing requests. Ingest must be async; blocking an API request on a 200-page PDF parse is how demos become outages.
Store raw extracted text in object storage (S3) keyed by doc_id + content_hash. Re-chunk without re-fetching source when policy changes.
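A sketch of that content-addressed cache, with a dict standing in for the S3 bucket. The raw/<doc_id>/<sha256>.txt key layout and the RawTextStore, raw_text_key, and rechunk_all names are assumptions. Because the key includes the content hash, re-putting unchanged text is a no-op, and a chunk-policy change needs only rechunk_all, with no calls to the source APIs.

```python
import hashlib


def raw_text_key(doc_id: str, text: str) -> str:
    """Object key for an extract: raw/<doc_id>/<sha256 of content>.txt (illustrative layout)."""
    digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
    return f"raw/{doc_id}/{digest}.txt"


class RawTextStore:
    """Dict-backed stand-in for an S3 bucket holding extracted text."""

    def __init__(self):
        self.objects: dict[str, str] = {}
        self.latest: dict[str, str] = {}  # doc_id -> key of the newest extract

    def put(self, doc_id: str, text: str) -> str:
        key = raw_text_key(doc_id, text)
        self.objects[key] = text  # same content -> same key, so rewrites are idempotent
        self.latest[doc_id] = key
        return key

    def get_latest(self, doc_id: str) -> str:
        return self.objects[self.latest[doc_id]]


def rechunk_all(store: RawTextStore, chunk_fn) -> dict[str, list]:
    """Re-run chunking for every doc from stored text; the sources are never re-fetched."""
    return {doc_id: chunk_fn(store.get_latest(doc_id)) for doc_id in store.latest}
```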
Document loaders: LangChain, Unstructured, LlamaHub
Loaders normalize heterogeneous sources into Document objects (text + metadata). LangChain offers broad integrations; Unstructured excels at layout-aware PDF/DOCX; LlamaHub (LlamaIndex) provides community connectors and LlamaParse for complex PDFs.
| Library | Strengths | Watch outs |
|---|---|---|
| LangChain loaders | S3, GDrive, Notion, Confluence, GitHub | Abstraction leaks; pin versions |
| Unstructured | PDF tables, OCR pipeline, partition strategies | Heavier deps; self-host vs API |
| LlamaHub | 100+ readers, LlamaParse cloud PDF | Vendor tie-in for LlamaParse |
Selection guide
- Mostly Markdown/HTML in S3 — simple custom loader or LangChain S3DirectoryLoader.
- Scanned PDFs and forms — Unstructured hi_res strategy or LlamaParse.
- Notion + Slack + Jira — LlamaHub connectors or native APIs with shared metadata schema.
Connector auth patterns
Run loaders from a worker role, not from user laptops. S3 uses an IAM role; Drive uses a service-account JSON key with domain-wide delegation; Notion uses an integration token scoped to the pages shared with it. Rotate secrets via a vault and alert 7 days before expiry. Failed auth should increment ingest_failures_total{reason="auth"} and page on-call; otherwise silently empty indexes follow.
Metadata normalization
Every loader should emit a common schema before chunking:
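- doc_id — stable across renames where the source has a stable ID (Notion page_id); an S3 key hash stays stable only while the key does, because an S3 "rename" is a copy to a new key.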
- source_uri — deep link for citations.
- title, updated_at, mime_type.
- acl — structured object copied to every chunk.
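A sketch of that common schema as a dataclass, assuming ACL fields are flattened onto each chunk's top-level metadata so the vector DB can filter on them directly (as the retrieval filter later in this guide does with tenant_id and allowed_group_ids). NormalizedDoc and to_chunk_records are illustrative names; lists are copied so chunks never share a mutable ACL object.

```python
from dataclasses import dataclass, field


@dataclass
class NormalizedDoc:
    doc_id: str       # stable source ID (e.g. Notion page_id)
    source_uri: str   # deep link used for citations
    title: str
    updated_at: str   # ISO-8601 timestamp from the source
    mime_type: str
    text: str
    acl: dict = field(default_factory=dict)  # e.g. {"tenant_id": ..., "allowed_group_ids": [...]}


def to_chunk_records(doc: NormalizedDoc, chunk_texts: list[str]) -> list[dict]:
    """Attach the shared schema, including a copy of the ACL, to every chunk."""
    records = []
    for i, chunk_text in enumerate(chunk_texts):
        record = {
            "text": chunk_text,
            "chunk_index": i,
            "doc_id": doc.doc_id,
            "source_uri": doc.source_uri,
            "title": doc.title,
            "updated_at": doc.updated_at,
            "mime_type": doc.mime_type,
        }
        # Flatten ACL fields to top level so the vector DB can filter on them.
        for key, value in doc.acl.items():
            record[key] = list(value) if isinstance(value, list) else value
        records.append(record)
    return records
```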
```python
from langchain_community.document_loaders import S3DirectoryLoader
from unstructured.partition.auto import partition


def load_s3_prefix(bucket: str, prefix: str):
    loader = S3DirectoryLoader(bucket, prefix=prefix)
    return loader.load()


def partition_pdf(path: str):
    elements = partition(filename=path, strategy="hi_res")
    return "\n\n".join(el.text for el in elements if el.text)
```
```java
// Spring AI + AWS SDK v2: list an S3 prefix and load each object as UTF-8 text
public List<Document> loadS3Prefix(S3Client s3, String bucket, String prefix) {
    var docs = new ArrayList<Document>();
    s3.listObjectsV2Paginator(b -> b.bucket(bucket).prefix(prefix)).contents().forEach(obj -> {
        var text = s3.getObjectAsBytes(b -> b.bucket(bucket).key(obj.key())).asUtf8String();
        docs.add(new Document(text, Map.of("source", obj.key(), "bucket", bucket)));
    });
    return docs;
}
```
Unified loader frameworks speed MVP; at scale, teams often fork loaders to control retries, ACL mapping, and cost per format.
Unstructured's hi_res strategy runs layout-detection models: table recovery is better than with fast text extraction, but it is slower and GPU-hungry.
Incremental sync: change detection and stale deletion
Full re-index nightly does not scale. Use content hashes, etags, or source revision IDs to re-embed only changed documents, delete orphaned chunks, and keep vector count proportional to live corpus.
Change detection strategies
| Signal | Source | Notes |
|---|---|---|
| Content SHA-256 | After extract | Best for file stores; ignores mtime-only touches |
| S3 ETag | Object metadata | Cheap to list; ETags of multipart uploads are not an MD5 of the content |
| Notion last_edited_time | API | Per-page poll or webhook |
| Git commit SHA | Code repos | Re-index changed files only in CI |
Stale chunk cleanup
After re-chunking doc D with new policy or content:
- Compute new chunk_id set S_new.
- Load previous IDs S_old from metadata store.
- Delete S_old - S_new from vector index.
- Upsert all chunks in S_new.
```python
import hashlib


def doc_content_hash(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def sync_document(doc_id: str, text: str, state_db, vector_store, policy: str):
    h = doc_content_hash(text)
    prev = state_db.get(doc_id)
    # Same content under the same chunk policy: nothing to re-embed.
    if prev and prev["hash"] == h and prev["policy"] == policy:
        return {"status": "skipped"}
    old_ids = set(prev["chunk_ids"]) if prev else set()
    chunks = chunk_document(text, doc_id, policy)
    new_ids = [chunk_id(doc_id, i, policy) for i in range(len(chunks))]  # chunk_id as defined earlier
    # Delete only S_old - S_new; IDs in both sets are overwritten by the upsert.
    stale = old_ids - set(new_ids)
    if stale:
        vector_store.delete(sorted(stale))
    upsert_chunks(vector_store, chunks, new_ids)
    state_db.put(doc_id, {"hash": h, "policy": policy, "chunk_ids": new_ids})
    return {"status": "updated", "chunks": len(chunks)}
```
```java
public SyncResult syncDocument(String docId, String text, DocStateRepository state, VectorStore vectors, String policy) {
    var hash = sha256(text);
    var prev = state.find(docId);
    // Same content under the same chunk policy: nothing to re-embed.
    if (prev != null && prev.hash().equals(hash) && prev.policy().equals(policy)) {
        return SyncResult.skipped();
    }
    var chunks = chunker.chunk(text, docId, policy);
    var newIds = IntStream.range(0, chunks.size()).mapToObj(i -> chunkId(docId, i, policy)).toList();
    if (prev != null) {
        // Delete only S_old - S_new; IDs in both sets are overwritten by the upsert.
        var stale = new HashSet<>(prev.chunkIds());
        stale.removeAll(newIds);
        if (!stale.isEmpty()) vectors.delete(List.copyOf(stale));
    }
    vectors.upsert(chunks, newIds);
    state.save(docId, hash, policy, newIds);
    return SyncResult.updated(chunks.size());
}
```
Persist ingest state in Postgres, not only Redis—survive restarts and audit what version indexed each doc.
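A sketch of a durable ingest-state table, using the standard-library sqlite3 module as a runnable stand-in for Postgres. The get/put shape matches the state_db used by sync_document above; the embed_model column is an added assumption so each row also records which model produced its vectors. The INSERT ... ON CONFLICT (doc_id) DO UPDATE upsert is valid in both SQLite (3.24+) and Postgres; on Postgres you would switch placeholders to your driver's style and use TIMESTAMPTZ for indexed_at.

```python
import json
import sqlite3
from typing import Optional

SCHEMA = """
CREATE TABLE IF NOT EXISTS ingest_state (
    doc_id       TEXT PRIMARY KEY,
    content_hash TEXT NOT NULL,
    policy       TEXT NOT NULL,
    embed_model  TEXT NOT NULL DEFAULT '',
    chunk_ids    TEXT NOT NULL,  -- JSON array of chunk IDs
    indexed_at   TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
)
"""

UPSERT = """
INSERT INTO ingest_state (doc_id, content_hash, policy, embed_model, chunk_ids)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (doc_id) DO UPDATE SET
    content_hash = excluded.content_hash,
    policy       = excluded.policy,
    embed_model  = excluded.embed_model,
    chunk_ids    = excluded.chunk_ids,
    indexed_at   = CURRENT_TIMESTAMP
"""


class IngestStateDB:
    """get/put store shaped like the state_db that sync_document expects."""

    def __init__(self, conn: sqlite3.Connection):
        self.conn = conn
        self.conn.execute(SCHEMA)

    def get(self, doc_id: str) -> Optional[dict]:
        row = self.conn.execute(
            "SELECT content_hash, policy, embed_model, chunk_ids FROM ingest_state WHERE doc_id = ?",
            (doc_id,),
        ).fetchone()
        if row is None:
            return None
        return {"hash": row[0], "policy": row[1], "embed_model": row[2],
                "chunk_ids": json.loads(row[3])}

    def put(self, doc_id: str, state: dict) -> None:
        with self.conn:  # commits on success, rolls back if the statement fails
            self.conn.execute(UPSERT, (doc_id, state["hash"], state["policy"],
                                       state.get("embed_model", ""), json.dumps(state["chunk_ids"])))
```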
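Changing the embedding model without re-embedding every document breaks retrieval, because query vectors and stored vectors no longer share an embedding space. Treat an embed_model change like a schema migration and build a new collection.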
Orchestration: Airflow, Prefect, Temporal
Ingestion jobs need retries, SLAs, and visibility. Airflow fits batch DAGs; Prefect simplifies Python-native flows; Temporal excels at durable long-running workflows with per-document activities and human approval steps.
| Engine | Best for | RAG ingest pattern |
|---|---|---|
| Airflow | Nightly batch, ops familiarity | DAG: list → map embed → validate → flip pointer |
| Prefect | Python teams, dynamic tasks | @flow with mapped document tasks |
| Temporal | Durable retries, per-tenant fairness | Workflow per corpus; activities for load/chunk/embed |
Scheduling considerations
- Rate-limit embedding API—shard work across hours if corpus is huge.
- Separate full rebuild (weekly) from incremental (15 min cron).
- Gate collection flip on eval job success (golden docs pass).
Airflow DAG sketch
Nightly full reconcile plus hourly incremental is a common split:
- list_sources — pull manifest from S3 or build from API.
- map_ingest — dynamic task per doc or batch of 50.
- validate_golden — pytest task; fail DAG if regression.
- flip_pointer — only when staging collection chunk count within 5% of expected.
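A sketch of the flip_pointer gate, assuming the expected count comes from the currently active collection and golden_passed is the outcome of validate_golden. should_flip is an illustrative name; the 5% tolerance is the one from the list above.

```python
def should_flip(staging_count: int, expected_count: int, golden_passed: bool,
                tolerance: float = 0.05) -> bool:
    """Gate: golden tests passed and staging chunk count within tolerance of expected."""
    if not golden_passed or expected_count <= 0:
        return False  # never flip on a failed eval or an unknown baseline
    drift = abs(staging_count - expected_count) / expected_count
    return drift <= tolerance
```

In Airflow, a check like this could sit in a task ahead of flip_pointer so that a False result stops the DAG before the pointer moves.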
Temporal vs cron
Cron re-runs failed jobs from scratch; Temporal resumes per-document workflows after worker crash. For 100K-document corpora, Temporal’s activity heartbeats and retry policies avoid duplicate embed charges from blind full retries.
```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule="0 * * * *", start_date=datetime(2026, 1, 1), catchup=False)
def kb_ingest():
    @task
    def list_docs():
        return load_manifest("s3://kb/manifest.json")

    @task
    def ingest_doc(doc: dict):
        return sync_document(doc["doc_id"], fetch_text(doc), state_db, vector_store, "v3")

    @task
    def flip_if_ok(results: list):
        if all(r.get("status") != "error" for r in results):
            flip_active_collection("kb_v43")

    docs = list_docs()
    results = ingest_doc.expand(doc=docs)
    flip_if_ok(results)


kb_ingest()
```
```java
@Scheduled(cron = "0 0 * * * *")
public void hourlyIngest() {
    var docs = manifestLoader.load("s3://kb/manifest.json");
    var results = docs.parallelStream().map(this::syncDocument).toList();
    if (results.stream().noneMatch(r -> r.status() == Status.ERROR)) {
        collectionPointer.flip("kb_v43");
    }
}
```
Batch embed during off-peak hours; some providers offer batch embedding APIs at a 50% discount, so queue chunks overnight if the latency SLO allows.
```python
from prefect import flow, task


@task(retries=3, retry_delay_seconds=30)
def ingest_one(doc: dict) -> dict:
    text = load_and_extract(doc)
    return sync_document(doc["doc_id"], text, state_db, vector_store, policy="v2")


@flow
def nightly_kb_sync(manifest: list[dict]):
    futures = ingest_one.map(manifest)
    # .map() returns futures; resolve them before inspecting results.
    # A task that still fails after its retries raises here and fails the flow.
    results = [f.result() for f in futures]
    failed = [r for r in results if r.get("status") == "error"]
    if failed:
        raise RuntimeError(f"{len(failed)} docs failed")
    flip_active_collection("kb_v42")
```
```java
// Temporal workflow interface (conceptual)
@WorkflowInterface
public interface IngestWorkflow {
    @WorkflowMethod
    void runIngest(String manifestUri, String targetCollection);
}

@ActivityInterface
public interface IngestActivities {
    List<DocManifest> listDocuments(String manifestUri);
    IngestResult ingestDocument(DocManifest doc, String policy);
    void flipCollection(String name);
}
```
Multi-tenant SaaS uses Temporal per-tenant task queues so one customer’s 50K-doc upload doesn’t starve others’ incremental syncs.
“How do you re-index without downtime?” — Blue/green collections, background ingest, eval gate, atomic pointer flip, rollback path.
Multi-format parsing: PDF through code repos
Each format needs tuned extraction before shared chunking. PDFs need layout awareness; code needs AST or function-level splits; CSV rows may become one chunk per row or schema-aware summaries.
| Format | Extractor | Chunking hint |
|---|---|---|
| PDF | Unstructured hi_res, LlamaParse | Page + heading boundaries; preserve tables as markdown |
| DOCX | python-docx, Unstructured | Heading styles → section chunks |
| HTML | Trafilatura, BeautifulSoup | Strip nav; main content selector |
| Markdown | Header-based split | Keep code fences intact |
| CSV | pandas row reader | Row or group chunks + column schema in metadata |
| Code | tree-sitter, LangChain LanguageParser | Function/class chunks with path metadata |
Store mime_type, page_number, and section_title in metadata—retrieval UI uses them for citations.
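One possible chunk_markdown for the header-based split in the table, with the same (text, path) signature that route_and_chunk calls. It assumes ATX headings (# through ######) and backtick code fences; a heading-like line inside a fence, such as a shell comment, does not start a new chunk, which keeps code fences intact. Each chunk carries section_title and source_uri metadata; the names are illustrative.

```python
import re

FENCE = "`" * 3  # three backticks
HEADING = re.compile(r"^(#{1,6})\s+(.*)$")


def chunk_markdown(text: str, path: str) -> list[dict]:
    """Split Markdown at ATX headings, never inside a backtick code fence."""
    chunks: list[dict] = []
    current: list[str] = []
    title = ""
    in_fence = False

    def flush() -> None:
        body = "\n".join(current).strip()
        if body:
            chunks.append({"text": body, "section_title": title, "source_uri": path})

    for line in text.splitlines():
        if line.lstrip().startswith(FENCE):
            in_fence = not in_fence  # opening or closing fence
        match = None if in_fence else HEADING.match(line)
        if match:
            flush()  # close the previous section
            current = [line]
            title = match.group(2).strip()
        else:
            current.append(line)
    flush()
    return chunks
```

Very long sections would still need a secondary size-based split; this sketch only handles the structural boundaries.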
```python
def route_and_chunk(path: str, raw_bytes: bytes) -> list[dict]:
    ext = path.rsplit(".", 1)[-1].lower()
    if ext == "pdf":
        text = partition_pdf_bytes(raw_bytes)
        return chunk_by_page(text, path)
    if ext in ("md", "markdown"):
        return chunk_markdown(raw_bytes.decode(), path)
    if ext in ("py", "java", "ts"):
        return chunk_code(raw_bytes.decode(), path, language=ext)
    return chunk_fixed(raw_bytes.decode(), path)
```
```java
public List<Chunk> routeAndChunk(String path, byte[] raw) {
    var ext = path.substring(path.lastIndexOf('.') + 1).toLowerCase(Locale.ROOT);
    // Decode explicitly as UTF-8 rather than relying on the platform default charset.
    return switch (ext) {
        case "pdf" -> chunkByPage(pdfExtractor.extract(raw), path);
        case "md", "markdown" -> chunkMarkdown(new String(raw, StandardCharsets.UTF_8), path);
        case "py", "java", "ts" -> chunkCode(new String(raw, StandardCharsets.UTF_8), path, ext);
        default -> chunkFixed(new String(raw, StandardCharsets.UTF_8), path);
    };
}
```
PDF “text extraction” that drops tables destroys finance and spec docs—always sample 20 random PDFs manually before launch.
Code chunking with tree-sitter respects syntax boundaries—better than 512-token splits mid-function that lose signature context.
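A Python-only sketch of syntax-aware code chunking, using the standard-library ast module as a stand-in for tree-sitter (which covers many languages). Each top-level function or class becomes one chunk with its decorators, path, and line range; the remaining module-level lines are gathered into a final chunk. chunk_python_source and the metadata keys are illustrative.

```python
import ast


def chunk_python_source(source: str, path: str) -> list[dict]:
    """One chunk per top-level function/class (decorators included), plus a module-level remainder."""
    lines = source.splitlines()
    chunks: list[dict] = []
    covered: set[int] = set()
    for node in ast.parse(source).body:
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
            # lineno points at the def/class line; include decorators above it.
            start = min([node.lineno] + [d.lineno for d in node.decorator_list])
            end = node.end_lineno  # available on Python 3.8+
            chunks.append({
                "text": "\n".join(lines[start - 1:end]),
                "symbol": node.name,
                "path": path,
                "start_line": start,
                "end_line": end,
            })
            covered.update(range(start, end + 1))
    # Imports, constants, and other top-level statements go into one extra chunk.
    rest = [line for no, line in enumerate(lines, 1) if no not in covered and line.strip()]
    if rest:
        chunks.append({"text": "\n".join(rest), "symbol": "<module>", "path": path})
    return chunks
```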
Access control: metadata filters at retrieval
Ingest must copy source permissions into chunk metadata—tenant_id, group_ids, classification. Query path applies mandatory filters so users never retrieve unauthorized chunks, even if embedding similarity is high.
Metadata fields to ingest
- tenant_id — hard filter for multi-tenant products.
- allowed_group_ids — from IdP groups or SharePoint ACLs.
- classification — public, internal, confidential.
- doc_owner — for audit and optional owner-only docs.
Never rely on prompt instructions (“only use allowed docs”)—vectors must be filtered in the database query. Post-filter in application code is acceptable only as defense in depth after index filter.
```python
def retrieve_for_user(query: str, user: User, k: int = 10):
    embedding = embed(query)
    acl_filter = {
        "tenant_id": user.tenant_id,
        "allowed_group_ids": {"$in": user.group_ids + ["public"]},
    }
    return vector_store.search(embedding, k=k, filter=acl_filter)
```
```java
public List<Document> retrieveForUser(String query, User user, int k) {
    var embedding = embedder.embed(query);
    // The "public" pseudo-group lets every user see public chunks.
    var groups = Stream.concat(user.groupIds().stream(), Stream.of("public")).toList();
    var filter = Map.of(
        "tenant_id", user.tenantId(),
        "allowed_group_ids", Map.of("$in", groups)
    );
    return vectorStore.search(embedding, k, filter);
}
```
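A sketch of the defense-in-depth post-filter, assuming each hit is a dict with an id and the chunk metadata written at ingest. It mirrors the index filter above (tenant match plus any overlap between allowed_group_ids and the user's groups or "public") and logs only chunk IDs when something is dropped, because any drop means the primary filter leaked. is_authorized and post_filter are hypothetical names.

```python
import logging

log = logging.getLogger("retrieval.acl")


def is_authorized(metadata: dict, tenant_id: str, group_ids: list[str]) -> bool:
    """Re-check a returned chunk's ACL fields against the caller."""
    if metadata.get("tenant_id") != tenant_id:
        return False
    allowed = set(metadata.get("allowed_group_ids", []))
    return bool(allowed & (set(group_ids) | {"public"}))


def post_filter(hits: list[dict], tenant_id: str, group_ids: list[str]) -> list[dict]:
    """Defense in depth after the index filter; any drop indicates a filter bug."""
    safe, leaked = [], []
    for hit in hits:
        (safe if is_authorized(hit["metadata"], tenant_id, group_ids) else leaked).append(hit)
    if leaked:
        # Log IDs only, never chunk text.
        log.error("ACL post-filter dropped %d chunk(s): %s", len(leaked), [h["id"] for h in leaked])
    return safe
```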
ACL sync lag is a data leak vector. Re-ingest on permission change events; delete chunks within minutes when access revoked—not next nightly batch.
Filtered ANN indexes need composite indexes (tenant + vector); plan index design early, because brute-force post-filtering of 10K hits is slow and unsafe.
Observability: latency, failures, and index health
Instrument every ingest stage. Operators should see documents processed/hour, p95 stage latency, failure rate by format, and total chunk count per collection—correlated with retrieval empty-hit rate downstream.
| Metric | Type | Alert threshold (example) |
|---|---|---|
| ingest_docs_total | Counter | N/A (dashboard) |
| ingest_failures_total | Counter by reason | >2% of batch |
| ingest_stage_latency_seconds | Histogram | p95 embed > 30s/doc |
| index_chunk_count | Gauge | Drop >10% day-over-day |
| ingest_lag_seconds | Gauge | Source mtime vs indexed > 1h |
Log structured fields: doc_id, stage, duration_ms, chunk_count, embed_model, collection.
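A sketch of one JSON log line per stage using only the standard library; the field names follow the list above, and log_stage is a hypothetical helper. The caller fills in chunk_count, duration is measured with time.perf_counter, and an exception is logged with status "error" before it propagates. No document text is logged.

```python
import json
import logging
import time
from contextlib import contextmanager

logger = logging.getLogger("ingest")


@contextmanager
def log_stage(stage: str, doc_id: str, collection: str, embed_model: str):
    """Time one ingest stage and emit a single JSON log line with the standard fields."""
    record = {"doc_id": doc_id, "stage": stage, "collection": collection,
              "embed_model": embed_model, "chunk_count": None, "status": "ok"}
    start = time.perf_counter()
    try:
        yield record  # the caller may set record["chunk_count"]
    except Exception:
        record["status"] = "error"
        raise
    finally:
        record["duration_ms"] = round((time.perf_counter() - start) * 1000, 1)
        logger.info(json.dumps(record))
```

Usage: `with log_stage("chunk", doc_id, "kb_v42", model) as rec: rec["chunk_count"] = len(chunks)`.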
```python
from prometheus_client import Counter, Histogram, Gauge

INGEST_DOCS = Counter("ingest_docs_total", "Documents processed", ["status", "format"])
INGEST_LATENCY = Histogram("ingest_stage_latency_seconds", "Stage latency", ["stage"])
CHUNK_COUNT = Gauge("index_chunk_count", "Chunks in collection", ["collection"])


def timed_stage(stage: str):
    return INGEST_LATENCY.labels(stage=stage).time()


with timed_stage("embed"):
    vectors = embed_batch(chunks)
INGEST_DOCS.labels(status="ok", format="pdf").inc()
CHUNK_COUNT.labels(collection="kb_v42").set(vector_store.count())
```
```java
meterRegistry.counter("ingest.docs", Tags.of("status", "ok", "format", "pdf")).increment();
meterRegistry.timer("ingest.stage", Tags.of("stage", "embed")).record(() -> embedBatch(chunks));
// gauge(name, tags, stateObject, valueFunction): vectorStore.count() is evaluated whenever the gauge is read.
meterRegistry.gauge("index.chunk.count", Tags.of("collection", "kb_v42"), vectorStore, VectorStore::count);
```
Correlate ingest lag with OpenTelemetry trace IDs propagated from webhook → queue → worker—speeds RCA when one tenant’s PDFs hang OCR.
Alerting only on failure count misses slow degradation—watch p95 embed latency and chunk count drift, not just errors.
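A sketch of the two degradation checks, using the example thresholds from the metrics table (chunk count down more than 10% day over day, p95 embed latency above 30 s). statistics.quantiles with n=100 returns the 1st through 99th percentile cut points, so index 94 is the 95th percentile. index_health_alerts is an illustrative name; a real deployment could express the same checks as Prometheus alert rules.

```python
import statistics


def p95(samples: list[float]) -> float:
    """95th percentile via the inclusive method (needs at least two samples)."""
    return statistics.quantiles(samples, n=100, method="inclusive")[94]


def index_health_alerts(today_count: int, yesterday_count: int,
                        embed_latencies_s: list[float],
                        max_drop: float = 0.10, p95_limit_s: float = 30.0) -> list[str]:
    """Return alert reasons for chunk-count drift and slow embeds, even with zero errors."""
    alerts = []
    if yesterday_count > 0 and (yesterday_count - today_count) / yesterday_count > max_drop:
        alerts.append("chunk_count_drop")
    if len(embed_latencies_s) >= 2 and p95(embed_latencies_s) > p95_limit_s:
        alerts.append("embed_p95_slow")
    return alerts
```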
Platform teams page when ingest lag exceeds SLA and retrieval empty-hit rate spikes—often the same root cause (connector auth expired).
Golden documents: expected chunk validation
Maintain a golden doc set with expected chunk counts, boundary strings, and metadata. Run after every chunk-policy or embed-model change—CI gate before flipping active collection.
Golden test structure
```json
{
  "doc_id": "refund-policy-v3",
  "source": "fixtures/refund-policy.pdf",
  "chunk_policy": "heading_v2",
  "expect": {
    "min_chunks": 8,
    "max_chunks": 14,
    "must_contain": ["30-day window", "pro-rated refund"],
    "metadata": {"classification": "public"}
  }
}
```
Tests load fixture → extract → chunk → assert bounds and substring presence; no LLM is needed for these deterministic checks. Add an optional embedding smoke test: each chunk's nearest neighbor should be the chunk itself.
Retrieval smoke on golden set
After index flip, run fixed queries against staging:
| Query | Expected doc_id in top-3 | Min cosine score |
|---|---|---|
| "30-day refund window" | refund-policy-v3 | 0.75 |
| "reset MFA device" | security-mfa | 0.70 |
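A sketch of the smoke check, assuming search_fn(query, k) returns (doc_id, cosine_score) pairs best first from the staging collection. SmokeCase and check_case are illustrative names; each table row becomes one case, and an empty return means the case passed.

```python
from dataclasses import dataclass


@dataclass
class SmokeCase:
    query: str
    expected_doc_id: str
    min_score: float  # cosine similarity


def check_case(case: SmokeCase, search_fn, k: int = 3) -> list[str]:
    """Return failure messages for one golden query (empty list = pass)."""
    for doc_id, score in search_fn(case.query, k)[:k]:
        if doc_id == case.expected_doc_id:
            if score < case.min_score:
                return [f"{case.query!r}: {doc_id} score {score:.2f} < {case.min_score}"]
            return []
    return [f"{case.query!r}: {case.expected_doc_id} not in top-{k}"]
```

Collecting failures across all cases and holding the flip when any exist fits the "auto-flip or hold for human" step below.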
CI pipeline integration
- PR changes chunker → run golden chunk tests.
- Merge to main → deploy worker → ingest staging collection.
- Golden retrieval + RAGAS sample → auto-flip or hold for human.
```python
def test_refund_policy_chunks(golden_manifest, chunk_pipeline):
    spec = golden_manifest["refund-policy-v3"]
    chunks = chunk_pipeline(spec["source"], spec["chunk_policy"])
    assert spec["expect"]["min_chunks"] <= len(chunks) <= spec["expect"]["max_chunks"]
    blob = "\n".join(c["text"] for c in chunks)
    for phrase in spec["expect"]["must_contain"]:
        assert phrase in blob
```
```java
@Test
void refundPolicyChunksMatchGolden() {
    var spec = goldenManifest.get("refund-policy-v3");
    var chunks = chunkPipeline.run(spec.source(), spec.chunkPolicy());
    assertThat(chunks.size()).isBetween(spec.expect().minChunks(), spec.expect().maxChunks());
    var blob = chunks.stream().map(Chunk::text).collect(Collectors.joining("\n"));
    spec.expect().mustContain().forEach(phrase -> assertThat(blob).contains(phrase));
}
```
Compare chunk count per doc before/after chunk policy change—silent 0-chunk extracts are common when HTML selectors break.
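A sketch of that before/after comparison, assuming per-doc chunk counts are available from the ingest state (for example the length of the chunk_ids stored per doc). The 50% change threshold is an arbitrary illustration; zero-chunk and missing docs are reported separately because they usually mean extraction broke rather than that the policy changed.

```python
def compare_chunk_counts(before: dict[str, int], after: dict[str, int],
                         max_ratio_change: float = 0.5) -> dict[str, list[str]]:
    """Flag docs that vanished, dropped to zero chunks, or moved by more than max_ratio_change."""
    report: dict[str, list[str]] = {"zero": [], "missing": [], "large_change": []}
    for doc_id, old in before.items():
        new = after.get(doc_id)
        if new is None:
            report["missing"].append(doc_id)
        elif new == 0 and old > 0:
            report["zero"].append(doc_id)  # classic broken-selector symptom
        elif old > 0 and abs(new - old) / old > max_ratio_change:
            report["large_change"].append(doc_id)
    return report
```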
“How do you test RAG?” — Golden docs for ingest, retrieval recall@k fixtures for search, LLM-judge or RAGAS offline for end-to-end—run in CI on prompt/index changes.
Golden tests catch regressions but don’t cover all formats—invest in stratified fixtures (PDF table, scanned, code, CSV).
Production: Track 2 checklist and next steps
Track 2 closes when ingestion and retrieval run as versioned, observable pipelines with ACL-safe metadata and regression tests—not when a notebook indexes three files.
Track 2 production checklist
- ☐ Offline ingest separated from online query API
- ☐ Stable doc_id and chunk_id with policy version in hash
- ☐ Incremental sync with hash/etag; stale chunk deletion
- ☐ Blue/green or pointer-flip index versioning
- ☐ Source ACLs copied to chunk metadata; filtered retrieval
- ☐ Hybrid + rerank (or documented exception) on query path
- ☐ Advanced pattern chosen from matrix with eval evidence
- ☐ Ingest + retrieval metrics dashboards and alerts
- ☐ Golden document CI on chunk/embed changes
- ☐ Runbook: connector auth failure, embed rate limit, rollback collection
You now have the RAG stack: retrieval strategies, advanced patterns (GraphRAG, Self-RAG, CRAG, RAPTOR), and ingestion engineering. Track 3 shifts to prompt and context engineering—how you assemble retrieved chunks into reliable instructions without injection and context bloat.
Track 2 is complete when changed documents are re-indexed within your published SLA, unauthorized retrieval is impossible by construction, and you can roll back a bad embed deploy in under 15 minutes.
Teams that skip ingest observability debug retrieval in production for weeks—when the root cause was broken incremental sync after a credential rotation.
Before Track 3, audit that no ingest logs write full document bodies for confidential classification—log ids and hashes only.
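A sketch of a log-safe view of a chunk, assuming chunks carry classification in their metadata. It fails closed: anything not explicitly public is logged as a SHA-256 hash and length only, and only public chunks get a short preview. loggable and the preview length are illustrative choices.

```python
import hashlib

PREVIEW_ALLOWED = {"public"}  # fail closed: every other classification is hashed


def loggable(chunk: dict, preview_chars: int = 80) -> dict:
    """Log-safe view of a chunk: IDs always, text only as a hash unless classified public."""
    meta = chunk.get("metadata", {})
    text = chunk.get("text", "")
    safe = {
        "doc_id": meta.get("doc_id"),
        "chunk_id": chunk.get("id"),
        "classification": meta.get("classification"),
    }
    if meta.get("classification") in PREVIEW_ALLOWED:
        safe["text_preview"] = text[:preview_chars]
    else:
        safe["text_sha256"] = hashlib.sha256(text.encode("utf-8")).hexdigest()
        safe["text_len"] = len(text)
    return safe
```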