Multimodal ingest & processing
Production document AI is not “send every PDF to GPT-4o.” It is a tiered extraction pipeline—native text first, OCR for scans, hosted parsers for brutal layouts, vision only when cheaper paths fail—plus classification that routes invoices, contracts, and general knowledge to different downstream schemas. This guide covers PDF tiers, tables and embedded images, LLM document routing, vision cost math, audio and web ingest, and the Track 6 checklist for shipping multimodal corpora safely.
After reading, you should be able to: implement a native → OCR → vision tier hierarchy with PyMuPDF, Tesseract, and LlamaParse; extract tables with Camelot/Tabula and describe embedded images for RAG metadata; classify documents into invoice/contract/general routes with a cheap LLM; compare vision page costs (~1,000–2,000 image tokens) vs text-only paths (~200–500 tokens/page); wire Whisper + diarization and Crawl4AI/FireCrawl with robots.txt and change detection; and complete the multimodal ingest production checklist before structured extraction.
PDF extraction tier hierarchy
Every page flows through a decision ladder: try fast native text extraction first; escalate to OCR when the text layer is empty or corrupt; use hosted layout parsers (LlamaParse) for complex tables and slides; reserve full-page vision for pages that still fail quality gates. Never default to the most expensive tier.
Tier 0 — Native text (PyMuPDF)
PyMuPDF (fitz) reads embedded text streams from digital-born PDFs in milliseconds per page. It returns text blocks with bounding boxes—enough for heading-aware chunking and citation links back to page numbers. Use page.get_text("dict") when you need block geometry; plain get_text() for bulk throughput.
Quality gate: if extracted characters per page exceed your threshold (typically 40–80 chars) and character entropy looks normal, mark the page tier=native and skip OCR entirely.
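The quality gate can be sketched as a character count plus Shannon entropy over the page text. The function names and the entropy bounds (3.0 to 5.5 bits per character) are illustrative assumptions, not values from this guide; calibrate them on a labeled sample of your own corpus.

```python
import math
from collections import Counter


def char_entropy(text: str) -> float:
    """Shannon entropy in bits per character; 0.0 for empty text."""
    if not text:
        return 0.0
    total = len(text)
    counts = Counter(text)
    return -sum((n / total) * math.log2(n / total) for n in counts.values())


def passes_native_gate(
    text: str,
    min_chars: int = 80,          # matches the ">80 chars" signal used in this guide
    min_entropy: float = 3.0,     # assumed lower bound: repeated glyphs score near 0
    max_entropy: float = 5.5,     # assumed upper bound: random symbol soup scores high
) -> bool:
    stripped = text.strip()
    if len(stripped) < min_chars:
        return False
    return min_entropy <= char_entropy(stripped) <= max_entropy
```

A page of repeated filler characters passes the length check but fails the entropy check, which is the case a pure character count misses.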
Tier 1 — OCR (Tesseract)
Scanned PDFs, faxes, and photographed forms have no text layer—pages are bitmaps wrapped in PDF containers. Render each page to PNG at 200–300 DPI with PyMuPDF, then run Tesseract (or cloud OCR when SLA demands higher accuracy). Pre-process with deskew and contrast normalization; OCR quality dominates downstream RAG recall.
Tesseract is free and runs on-prem, which is critical for regulated workloads, but it struggles with multi-column layouts and handwriting. Store the mean ocr_confidence per page; pages below threshold auto-escalate to Tier 2.
Tier 2 — Hosted layout parse (LlamaParse)
LlamaParse (LlamaIndex) sends PDFs to a managed parser optimized for RAG—handles nested tables, slide decks, and footnotes that defeat local heuristics. You pay per page and add vendor latency, but it often beats stitching OCR + Camelot manually. Use for: investor decks, SEC filings with dense tables, medical forms with checkboxes.
Pattern: batch overnight via API; cache parsed markdown keyed by hash(file_bytes) + parser_version so re-ingest does not re-bill.
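A minimal sketch of that cache key, assuming a local directory stands in for object storage. `parse_cache_key`, `cached_parse`, and the `parse_fn` callback are hypothetical names; `parse_fn` represents whatever call reaches the hosted parser.

```python
import hashlib
from pathlib import Path


def parse_cache_key(file_bytes: bytes, parser_version: str) -> str:
    """Key changes when either the file content or the parser version changes."""
    digest = hashlib.sha256(file_bytes).hexdigest()
    return f"{digest}-{parser_version}"


def cached_parse(file_bytes: bytes, parser_version: str, cache_dir: Path, parse_fn) -> str:
    """Return cached markdown if present; otherwise call parse_fn once and store the result."""
    cache_dir.mkdir(parents=True, exist_ok=True)
    path = cache_dir / f"{parse_cache_key(file_bytes, parser_version)}.md"
    if path.exists():
        return path.read_text(encoding="utf-8")
    markdown = parse_fn(file_bytes)  # the only billable step
    path.write_text(markdown, encoding="utf-8")
    return markdown
```

Including the parser version in the key means a parser upgrade re-bills deliberately, while a re-ingest of unchanged bytes does not.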
Extraction tier decision table
| Signal | Tier | Tool | Typical cost / page | Latency |
|---|---|---|---|---|
| Selectable text; >80 chars; clean Unicode | T0 — Native | PyMuPDF | ~$0 (compute only) | <50 ms |
| Empty/garbled text; bitmap page; fax scan | T1 — OCR | Tesseract / Textract | $0–$0.015 (cloud OCR) | 200 ms–2 s |
| Multi-column tables; slides; OCR confidence <0.75 | T2 — Layout parse | LlamaParse / Unstructured hi_res | $0.003–$0.03 | 1–10 s (API) |
| Handwriting, stamps, complex diagrams; T2 still fails QA | T3 — Vision | GPT-4o / Claude page image | $0.01–$0.08 | 2–8 s |
Per-page routing orchestrator
Process pages independently—mixed PDFs are common (cover page scan + digital appendix). Persist extraction_tier, char_count, and quality_score in chunk metadata for cost attribution and debugging.
```python
import io
from dataclasses import dataclass

import fitz  # PyMuPDF
import pytesseract
from PIL import Image

MIN_NATIVE_CHARS = 80        # below this, treat the native text layer as missing
OCR_CONF_THRESHOLD = 0.72    # mean Tesseract word confidence on a 0-1 scale


@dataclass
class PageExtract:
    page_num: int
    text: str
    tier: str
    meta: dict


def native_text(page: fitz.Page) -> str:
    return page.get_text("text").strip()


def ocr_page(page: fitz.Page, dpi: int = 250) -> tuple[str, float]:
    pix = page.get_pixmap(dpi=dpi)
    img = Image.open(io.BytesIO(pix.tobytes("png")))
    data = pytesseract.image_to_data(img, output_type=pytesseract.Output.DICT)
    text = pytesseract.image_to_string(img)
    # conf is -1 for non-word boxes and may arrive as int, float, or numeric string
    confs = [float(c) for c in data["conf"] if float(c) > 0]
    avg_conf = (sum(confs) / len(confs) / 100.0) if confs else 0.0
    return text.strip(), avg_conf


def extract_page(page: fitz.Page, page_num: int, llamaparse_fn=None) -> PageExtract:
    # Tier 0: native text layer
    text = native_text(page)
    if len(text) >= MIN_NATIVE_CHARS:
        return PageExtract(page_num, text, "native", {"char_count": len(text)})
    # Tier 1: OCR with a confidence gate
    text, conf = ocr_page(page)
    if conf >= OCR_CONF_THRESHOLD and len(text) >= MIN_NATIVE_CHARS:
        return PageExtract(page_num, text, "ocr", {"ocr_confidence": conf})
    # Tier 2: hosted layout parser, if one was supplied
    if llamaparse_fn:
        parsed = llamaparse_fn(page_num)
        if parsed and len(parsed) >= MIN_NATIVE_CHARS:
            return PageExtract(page_num, parsed, "llamaparse", {"ocr_confidence": conf})
    # Still failing: keep the OCR text and flag the page for Tier 3 vision
    return PageExtract(page_num, text, "ocr_low_confidence", {"ocr_confidence": conf, "needs_vision": True})


def extract_pdf(path: str, llamaparse_fn=None) -> list[PageExtract]:
    with fitz.open(path) as doc:  # close the file handle when done
        return [extract_page(doc[i], i + 1, llamaparse_fn) for i in range(len(doc))]
```
```java
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.pdfbox.Loader;
import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.text.PDFTextStripper;

public record PageExtract(int pageNum, String text, String tier, Map<String, Object> meta) {}

// LlamaParseClient, OcrResult, and tesseractOcr(...) are application-side helpers not shown here.
public class TieredPdfExtractor {
    static final int MIN_NATIVE_CHARS = 80;
    static final double OCR_CONF_THRESHOLD = 0.72;

    public List<PageExtract> extract(Path pdfPath, Optional<LlamaParseClient> llama) throws IOException {
        try (PDDocument doc = Loader.loadPDF(pdfPath.toFile())) {
            var out = new ArrayList<PageExtract>();
            for (int i = 0; i < doc.getNumberOfPages(); i++) {
                out.add(extractPage(doc, pdfPath, i + 1, llama));
            }
            return out;
        }
    }

    // pdfPath is passed through because the hosted parser needs the file, not the PDDocument
    PageExtract extractPage(PDDocument doc, Path pdfPath, int pageNum, Optional<LlamaParseClient> llama)
            throws IOException {
        String nativeText = new PDFTextStripper() {{
            setStartPage(pageNum);
            setEndPage(pageNum);
        }}.getText(doc).trim();
        if (nativeText.length() >= MIN_NATIVE_CHARS) {
            return new PageExtract(pageNum, nativeText, "native", Map.of("char_count", nativeText.length()));
        }
        OcrResult ocr = tesseractOcr(doc, pageNum);
        if (ocr.confidence() >= OCR_CONF_THRESHOLD && ocr.text().length() >= MIN_NATIVE_CHARS) {
            return new PageExtract(pageNum, ocr.text(), "ocr", Map.of("ocr_confidence", ocr.confidence()));
        }
        if (llama.isPresent()) {
            String parsed = llama.get().parsePage(pdfPath, pageNum);
            if (parsed != null && parsed.length() >= MIN_NATIVE_CHARS) {
                return new PageExtract(pageNum, parsed, "llamaparse", Map.of("ocr_confidence", ocr.confidence()));
            }
        }
        return new PageExtract(pageNum, ocr.text(), "ocr_low_confidence",
            Map.of("ocr_confidence", ocr.confidence(), "needs_vision", true));
    }
}
```
Log tier distribution per corpus on first ingest—if 60% of “digital” PDFs hit OCR, your upstream scan pipeline is mislabeled and you are overpaying for hosted parsers.
Running Tesseract on pages that already have good native text doubles latency and can degrade quality by misreading embedded fonts. Always try native first.
At 100K pages/month, routing 85% to T0 instead of blindly sending every page to LlamaParse avoids 85,000 parser pages, roughly $255–$2,550 per month at the $0.003–$0.03 per-page range in the tier table. Measure tier mix before committing to a vendor contract.
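The savings arithmetic is worth keeping as a small function so it can be rerun when the tier mix or vendor price changes. This is a sketch; `monthly_parser_savings` is a hypothetical helper and the per-page prices are the range from the tier decision table, not a vendor quote.

```python
def monthly_parser_savings(pages_per_month: int, native_share: float, price_per_page: float) -> float:
    """Parser spend avoided by keeping native_share of pages on the free native tier."""
    pages_kept_native = pages_per_month * native_share
    return pages_kept_native * price_per_page


# 100K pages/month with 85% on T0, at both ends of the table's price range
low = monthly_parser_savings(100_000, 0.85, 0.003)
high = monthly_parser_savings(100_000, 0.85, 0.03)
```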
Self-hosted Tesseract keeps data in VPC but needs GPU-less workers sized for peak batch. Cloud OCR (Textract, Document Intelligence) costs more per page but removes ops burden and often wins on forms.
Mixed-document detection
File-level MIME sniffing is insufficient—a single 200-page PDF may contain 30 scanned exhibits. Per-page tier metadata lets chunking preserve reading order while applying different extractors within one document. Emit a manifest JSON alongside chunks: {"doc_id": "...", "pages": [{"n": 1, "tier": "native"}, {"n": 2, "tier": "ocr"}]}.
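A minimal sketch of building that manifest from per-page results. `build_manifest` is a hypothetical helper, and the `tier_counts` summary is an added convenience field that is not part of the manifest shape shown above.

```python
import json
from collections import Counter


def build_manifest(doc_id: str, page_tiers: list[tuple[int, str]]) -> dict:
    """page_tiers holds (page_number, tier) pairs in any order."""
    pages = [{"n": n, "tier": tier} for n, tier in sorted(page_tiers)]
    tier_counts = dict(Counter(tier for _, tier in page_tiers))  # assumed extra field
    return {"doc_id": doc_id, "pages": pages, "tier_counts": tier_counts}


manifest = build_manifest("doc-001", [(2, "ocr"), (1, "native"), (3, "native")])
manifest_json = json.dumps(manifest)  # write this next to the chunks
```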
Legal-tech teams cache LlamaParse output in S3 keyed by SHA-256 of the PDF bytes. Re-indexing for a new embedding model reuses parsed markdown—only embed stage re-runs.
“How do you ingest scanned PDFs?” — Native extract → OCR with confidence gate → layout parser → vision fallback; per-page not per-file; store tier in metadata for cost attribution.
Tables & embedded images
Flattening tables to prose destroys structure invoices and contracts depend on. Extract tables as markdown or CSV with row/column metadata; describe embedded charts and photos with a cheap vision pass so RAG retrieval can match “bar chart on page 7” even when OCR missed axis labels.
Table extraction — Camelot vs Tabula
| Library | Best for | Limitations |
|---|---|---|
| Camelot | Bordered tables in digital PDFs; lattice mode for ruled grids | Requires Ghostscript; weak on borderless tables |
| Tabula | Quick CSV export; stream mode for whitespace-aligned columns | Java runtime; manual area tuning on complex pages |
| pdfplumber | Line-detection heuristics inside Python stack | Slower; memory on large files |
| LlamaParse / Unstructured | Tables inside mixed layouts after Tier 2 escalation | Per-page API cost |
Store tables as separate chunk types with content_type=table. Include page, table_index, and optional csv_uri for downstream structured extraction (Guide 5). Embed a markdown rendering for vector search; keep CSV in object storage for SQL-style queries.
```python
import camelot
import fitz  # PyMuPDF


def extract_tables(pdf_path: str, pages: str = "all") -> list[dict]:
    # lattice flavor targets ruled tables; DataFrame.to_markdown needs the tabulate package
    tables = camelot.read_pdf(pdf_path, pages=pages, flavor="lattice")
    chunks = []
    for i, t in enumerate(tables):
        md = t.df.to_markdown(index=False)
        chunks.append({
            "content_type": "table",
            "text": md,
            "metadata": {
                "page": t.page,
                "table_index": i,
                "accuracy": t.accuracy,
                # inline CSV for brevity; in production upload it and store csv_uri instead
                "csv": t.df.to_csv(index=False),
            },
        })
    return chunks


def list_embedded_images(pdf_path: str) -> list[dict]:
    images = []
    with fitz.open(pdf_path) as doc:  # close the file handle when done
        for page_num in range(len(doc)):
            for img_index, img in enumerate(doc[page_num].get_images(full=True)):
                xref = img[0]
                base = doc.extract_image(xref)
                images.append({
                    "page": page_num + 1,
                    "image_index": img_index,
                    "bytes": base["image"],
                    "ext": base["ext"],
                })
    return images
```
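```java
// Tabula via tabula-java. SpreadsheetExtractionAlgorithm works on Tabula Page objects,
// which ObjectExtractor produces from a PDFBox document. The PDDocument loading call
// depends on the PDFBox version your tabula-java release bundles.
// TableChunk, tableToMarkdown, and tableToCsv are application-side helpers not shown here.
public List<TableChunk> extractTables(Path pdf) throws IOException {
    var chunks = new ArrayList<TableChunk>();
    try (PDDocument doc = Loader.loadPDF(pdf.toFile())) {
        var algorithm = new SpreadsheetExtractionAlgorithm();
        PageIterator pages = new ObjectExtractor(doc).extract();
        while (pages.hasNext()) {
            Page page = pages.next();
            int idx = 0;
            for (Table table : algorithm.extract(page)) {
                String markdown = tableToMarkdown(table);
                chunks.add(new TableChunk(markdown, page.getPageNumber(), idx++, tableToCsv(table)));
            }
        }
    }
    return chunks;
}
```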
Embedded image vision descriptions
PDFs embed logos, signatures, charts, and diagrams that carry no text layer. For each image above a size threshold (skip 16×16 icons), run a low-detail vision call: “Describe this figure in 2–3 sentences for search retrieval. Include chart type, axis labels if visible, and key trends.” Attach the description as a sibling chunk with content_type=image_description and link via parent_page.
```python
# OpenAI
import base64

from openai import OpenAI

client = OpenAI()


def describe_image(image_bytes: bytes, mime: str = "image/png") -> str:
    b64 = base64.b64encode(image_bytes).decode()
    resp = client.chat.completions.create(
        model="gpt-4o-mini",
        max_tokens=200,
        messages=[{
            "role": "user",
            "content": [
                {"type": "text", "text": "Describe this figure for search retrieval. 2-3 sentences."},
                {"type": "image_url", "image_url": {"url": f"data:{mime};base64,{b64}", "detail": "low"}},
            ],
        }],
    )
    return resp.choices[0].message.content.strip()
```

```python
# Anthropic
import base64

import anthropic

client = anthropic.Anthropic()


def describe_image(image_bytes: bytes, mime: str = "image/png") -> str:
    msg = client.messages.create(
        model="claude-3-5-haiku-20241022",
        max_tokens=200,
        messages=[{
            "role": "user",
            "content": [
                {"type": "image", "source": {"type": "base64", "media_type": mime, "data": base64.standard_b64encode(image_bytes).decode()}},
                {"type": "text", "text": "Describe this figure for search retrieval. 2-3 sentences."},
            ],
        }],
    )
    return msg.content[0].text
```

```python
# Amazon Bedrock (Anthropic models)
import base64
import json

import boto3

bedrock = boto3.client("bedrock-runtime")


def describe_image(image_bytes: bytes) -> str:
    body = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 200,
        "messages": [{
            "role": "user",
            "content": [
                {"type": "image", "source": {"type": "base64", "media_type": "image/png", "data": base64.b64encode(image_bytes).decode()}},
                {"type": "text", "text": "Describe this figure for search retrieval."},
            ],
        }],
    }
    # Bedrock model IDs carry a version suffix, as in the Sonnet ID used for page transcription
    resp = bedrock.invoke_model(modelId="anthropic.claude-3-5-haiku-20241022-v1:0", body=json.dumps(body))
    return json.loads(resp["body"].read())["content"][0]["text"]
```
```java
// Spring AI, OpenAI
public String describeImage(byte[] pngBytes) {
    Media media = new Media(MimeTypeUtils.IMAGE_PNG, pngBytes);
    UserMessage msg = UserMessage.builder()
        .text("Describe this figure for search retrieval. 2-3 sentences.")
        .media(media)
        .build();
    return chatClient.prompt(new Prompt(msg))
        .options(OpenAiChatOptions.builder().withModel("gpt-4o-mini").withMaxTokens(200).build())
        .call()
        .chatResponse()
        .getResult().getOutput().getContent();
}
```

```java
// Anthropic
public String describeImage(byte[] pngBytes) {
    var model = AnthropicChatModel.builder()
        .modelName("claude-3-5-haiku-20241022")
        .maxTokens(200)
        .build();
    Media media = new Media(MimeTypeUtils.IMAGE_PNG, pngBytes);
    return model.call(new Prompt(UserMessage.builder()
        .text("Describe this figure for search retrieval.")
        .media(media)
        .build())).getResult().getOutput().getContent();
}
```

```java
// Amazon Bedrock (Anthropic models)
public String describeImage(byte[] pngBytes) {
    var chat = new BedrockAnthropicChatModel(
        BedrockAnthropicChatOptions.builder()
            .withModel("anthropic.claude-3-5-haiku-20241022")
            .withMaxTokens(200)
            .build());
    Media media = new Media(MimeTypeUtils.IMAGE_PNG, pngBytes);
    return chat.call(new Prompt(UserMessage.builder()
        .text("Describe this figure for search retrieval.")
        .media(media)
        .build())).getResult().getOutput().getContent();
}
```
Metadata schema for multimodal chunks
| Field | Purpose | Example |
|---|---|---|
| content_type | Filter at query time | text, table, image_description |
| page | Citation UI | 7 |
| bbox | Highlight in PDF viewer | [72, 120, 540, 400] |
| extraction_tier | Cost/debug | native, ocr, llamaparse, vision |
| source_uri | Deep link | s3://docs/invoice-8842.pdf |
| image_ref | Join description → bytes | xref:42:page:7 |
Skip vision on images smaller than 10 KB or below 100×100 px—they are usually logos and add noise to embeddings without retrieval value.
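The size threshold can be sketched as a pure filter over image records. This assumes each record carries `width` and `height` alongside `bytes`; PyMuPDF's `extract_image` result includes those keys, so they would need to be copied into the record when images are listed. `worth_describing` is a hypothetical name.

```python
def worth_describing(image: dict, min_bytes: int = 10 * 1024, min_side_px: int = 100) -> bool:
    """True when an embedded image is large enough to justify a vision call."""
    too_small_file = len(image["bytes"]) < min_bytes
    too_small_dims = image["width"] < min_side_px or image["height"] < min_side_px
    return not (too_small_file or too_small_dims)


candidates = [
    {"bytes": b"x" * 20_000, "width": 640, "height": 480},   # chart-sized image
    {"bytes": b"x" * 2_000, "width": 640, "height": 480},    # under 10 KB
    {"bytes": b"x" * 20_000, "width": 16, "height": 16},     # icon dimensions
]
to_describe = [img for img in candidates if worth_describing(img)]
```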
Flattening tables into running prose before chunking makes “what was Q3 revenue?” unretrievable. Always emit table chunks separately with markdown headers preserved.
Embedded images may contain redacted PII that OCR missed. Run the same PII scanner on image description text before indexing—vision models sometimes read blurred regions.
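Use gpt-4o-mini or Haiku for figure descriptions, not full GPT-4o unless the figure is the primary artifact. On OpenAI models set detail: low; GPT-4o bills that as a flat ~85 image tokens, while gpt-4o-mini reports a larger per-image token count at a lower per-token price, so compare dollars rather than token counts. Claude has no detail parameter; downscale the image before sending instead.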
Document classification & routing
Not every upload should enter the same chunk → embed → RAG path. A lightweight LLM or embedding classifier at ingest time routes invoices to structured extraction, contracts to clause chunking, and general docs to standard RAG—cutting cost and improving downstream accuracy.
Why classify before extract?
- Invoices — line items, tax IDs, totals; need JSON schema extraction and validation, not 512-token prose chunks.
- Contracts — section-aware chunking (Termination, Indemnity); longer context windows; legal citation metadata.
- General knowledge — policies, manuals, wikis; standard RAG with hybrid retrieval.
- Unknown / low confidence — quarantine queue for human review; never silently default to general.
Classifier design
Use the first 1–2 pages (or first 2,000 tokens) as classifier input—enough signal, minimal cost. Prefer structured output with confidence score and rationale (for audit). Model choice: cheap text model (gpt-4o-mini, Haiku) or fine-tuned embedding + k-NN if you have 500+ labeled docs per class.
| Class | Downstream pipeline | Chunk strategy |
|---|---|---|
| invoice | Structured extraction → ERP | Page-level + table chunks; JSON sidecar |
| contract | Clause RAG + obligation tracker | Heading-based sections; preserve article numbers |
| general | Standard vector index | Recursive 400–800 token chunks |
| medical / legal restricted | Isolated tenant index + extra ACL | Policy-specific chunk rules |
```python
from openai import OpenAI
from pydantic import BaseModel, Field

client = OpenAI()


class DocClassification(BaseModel):
    doc_type: str = Field(description="invoice | contract | general | unknown")
    confidence: float = Field(ge=0, le=1)
    rationale: str


CLASSIFY_PROMPT = """Classify this document excerpt for ingest routing.
Return doc_type, confidence 0-1, and one-sentence rationale."""


def classify_document(text_sample: str) -> DocClassification:
    resp = client.beta.chat.completions.parse(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": CLASSIFY_PROMPT},
            {"role": "user", "content": text_sample[:8000]},  # ~2,000 tokens of English text
        ],
        response_format=DocClassification,
    )
    return resp.choices[0].message.parsed


# enqueue_human_review and the pipeline_* callables are application-side helpers not shown here.
def route_ingest(doc_id: str, text_sample: str):
    result = classify_document(text_sample)
    if result.confidence < 0.75:
        return enqueue_human_review(doc_id, result)
    routes = {
        "invoice": pipeline_invoice_structured,
        "contract": pipeline_contract_clauses,
        "general": pipeline_rag_standard,
    }
    fn = routes.get(result.doc_type)
    if fn is None:
        # "unknown" or an unexpected label: quarantine, never silently default to general
        return enqueue_human_review(doc_id, result)
    return fn(doc_id, doc_type=result.doc_type, classification=result)
```
```java
public record DocClassification(String docType, double confidence, String rationale) {}

// humanReviewQueue and the three pipelines are injected collaborators not shown here.
public class DocumentRouter {
    private final ChatClient chatClient;

    public IngestJob route(String docId, String textSample) {
        DocClassification c = classify(textSample.substring(0, Math.min(8000, textSample.length())));
        if (c.confidence() < 0.75) {
            return humanReviewQueue.enqueue(docId, c);
        }
        return switch (c.docType()) {
            case "invoice" -> invoicePipeline.submit(docId, c);
            case "contract" -> contractPipeline.submit(docId, c);
            case "general" -> ragPipeline.submit(docId, c);
            // "unknown" or an unexpected label: quarantine, never silently default to general
            default -> humanReviewQueue.enqueue(docId, c);
        };
    }

    DocClassification classify(String sample) {
        // Spring AI structured output → DocClassification record
        return chatClient.prompt()
            .system("Classify for ingest routing: invoice | contract | general | unknown")
            .user(sample)
            .call()
            .entity(DocClassification.class);
    }
}
```
LLM classification costs ~$0.0001–$0.001 per doc vs free keyword rules—but keyword rules fail on multilingual and messy filenames. Hybrid: filename heuristics first, LLM only when uncertain.
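The hybrid approach can be sketched as a filename check that answers only when exactly one class matches, with an LLM callback for everything else. The keyword sets and function names here are illustrative assumptions, and `llm_classify` stands in for whatever classifier call you already have.

```python
import re
from typing import Callable, Optional

# Assumed keyword sets; extend per language and per naming convention in your bucket.
FILENAME_KEYWORDS = {
    "invoice": {"invoice", "inv", "receipt"},
    "contract": {"contract", "agreement", "msa", "nda"},
}


def filename_guess(filename: str) -> Optional[str]:
    """Return a class only when the filename matches exactly one keyword set."""
    tokens = set(re.split(r"[^a-z0-9]+", filename.lower()))
    hits = [label for label, words in FILENAME_KEYWORDS.items() if tokens & words]
    return hits[0] if len(hits) == 1 else None


def classify_hybrid(filename: str, text_sample: str, llm_classify: Callable[[str], str]) -> tuple[str, str]:
    """Return (doc_type, source), where source records which path decided."""
    guess = filename_guess(filename)
    if guess is not None:
        return guess, "filename"
    return llm_classify(text_sample), "llm"
```

Recording the deciding path makes it possible to measure how often the free heuristic is right before trusting it on a new upload source.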
“How do you handle mixed document types in one upload bucket?” — Classify early, route to specialized pipelines, quarantine low-confidence; store doc_type in metadata for filtered retrieval.
Accounts-payable bots classify on subject + first page; misrouted invoices fail JSON validation downstream—classification errors become measurable via schema validation rate, not silent RAG drift.
Evaluating the classifier
Maintain 50–100 labeled docs per class in a golden set; track precision/recall per type in CI. When a new doc type emerges (e.g., purchase orders), add it as a new route rather than forcing into general—downstream extraction schemas depend on clean routing.
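Per-class precision and recall over the golden set need nothing beyond the standard library. A minimal sketch, with `per_class_metrics` as a hypothetical helper that a CI job could call and compare against stored thresholds:

```python
from collections import Counter


def per_class_metrics(gold: list[str], predicted: list[str]) -> dict[str, dict[str, float]]:
    """Precision and recall per label from parallel gold/predicted lists."""
    if len(gold) != len(predicted):
        raise ValueError("gold and predicted must have the same length")
    tp, fp, fn = Counter(), Counter(), Counter()
    for g, p in zip(gold, predicted):
        if g == p:
            tp[g] += 1
        else:
            fp[p] += 1  # predicted label gained a false positive
            fn[g] += 1  # gold label gained a false negative
    metrics = {}
    for label in sorted(set(gold) | set(predicted)):
        p_den = tp[label] + fp[label]
        r_den = tp[label] + fn[label]
        metrics[label] = {
            "precision": tp[label] / p_den if p_den else 0.0,
            "recall": tp[label] / r_den if r_den else 0.0,
        }
    return metrics
```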
Classifying after full OCR on 200 pages wastes money. Sample page 1 (and page 2 if page 1 is a cover sheet) before running expensive extraction tiers on the whole file.
Vision fallback — page-as-image
When T1–T2 extraction fails QA, render the page to PNG and send it to GPT-4o or Claude with a structured prompt. Vision is the most accurate and most expensive tier—budget it deliberately with token math and a hybrid strategy that keeps 90%+ of pages on text paths.
Token cost comparison
| Path | Tokens / page (typical) | Relative cost | When to use |
|---|---|---|---|
| Native PyMuPDF text | 200–500 text tokens (extracted prose) | 1× (baseline embed + optional LLM) | Digital PDFs, clean reports |
| OCR → text only | 250–600 text tokens (noisy OCR inflated) | 1–1.5× | Scans with decent OCR confidence |
| Vision page detail: high | 1,000–2,000+ image tokens + prompt | 5–20× vs text-only | Complex layout, handwriting, stamps |
| Vision page detail: low | ~85 image tokens + prompt | 2–4× vs text-only | Triage, classification, coarse describe |
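Example: at 10,000 pages/month, a vision page carrying 1,500 image tokens plus 300 prompt text tokens sends 18M input tokens to GPT-4o, versus 4M text tokens for the same corpus at 400 tokens/page through native extract. That is 4.5× the input volume before counting the transcription output tokens that vision also generates. Cap vision pages per document (e.g., max 5 escalation pages) and require needs_vision=true from the tier router.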
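The example works out as follows. The function names are hypothetical, and the per-million-token rate is a placeholder argument, not a quoted price; substitute the current rate for the model you use.

```python
def monthly_input_tokens(pages: int, tokens_per_page: int) -> int:
    return pages * tokens_per_page


def monthly_input_cost(pages: int, tokens_per_page: int, usd_per_million_tokens: float) -> float:
    return monthly_input_tokens(pages, tokens_per_page) / 1_000_000 * usd_per_million_tokens


vision_tokens = monthly_input_tokens(10_000, 1_500 + 300)  # image tokens + prompt text
native_tokens = monthly_input_tokens(10_000, 400)
ratio = vision_tokens / native_tokens

# Placeholder rate of 2.50 USD per million input tokens, for illustration only
vision_cost = monthly_input_cost(10_000, 1_800, 2.50)
```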
Hybrid strategy
- Extract text via T0–T2; compute page quality score (char count, OCR conf, table detection).
- Escalate selectively — only pages flagged needs_vision or random 5% sample for QA.
- Vision prompt — ask for markdown transcription, not free-form summary, so output re-enters the same chunk pipeline.
- Merge — replace low-quality text chunks with vision transcription; keep tier metadata.
- Monitor — dashboard: % pages by tier, avg tokens/page, cost per doc_type.
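The selective escalation step in the list above can be sketched as a selector that takes flagged pages first, fills any remaining budget with a random QA sample, and never exceeds the per-document cap. The page dict shape (`n`, `needs_vision`) and the function name are assumptions for illustration.

```python
import random
from typing import Optional


def select_vision_pages(
    pages: list[dict],
    max_per_doc: int = 5,
    qa_sample_rate: float = 0.05,
    rng: Optional[random.Random] = None,
) -> list[int]:
    """Return page numbers to send to vision, flagged pages first, capped per document."""
    rng = rng or random.Random()
    flagged = [p["n"] for p in pages if p.get("needs_vision")]
    selected = flagged[:max_per_doc]
    remaining = max_per_doc - len(selected)
    # Random QA sample drawn only from pages that were not flagged
    qa_pool = [p["n"] for p in pages if not p.get("needs_vision")]
    qa = [n for n in qa_pool if rng.random() < qa_sample_rate][:remaining]
    return sorted(selected + qa)
```

Pages that were flagged but fell outside the cap should go to human review rather than being silently dropped.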
```python
# OpenAI
import base64

import fitz  # PyMuPDF
from openai import OpenAI

client = OpenAI()

VISION_PROMPT = """Transcribe this document page to markdown.
Preserve headings, tables, and lists. Do not summarize."""


def page_to_png_bytes(doc: fitz.Document, page_num: int, dpi: int = 200) -> bytes:
    return doc[page_num - 1].get_pixmap(dpi=dpi).tobytes("png")


def vision_transcribe_page(png_bytes: bytes) -> tuple[str, int]:
    b64 = base64.b64encode(png_bytes).decode()
    resp = client.chat.completions.create(
        model="gpt-4o",
        max_tokens=4096,
        messages=[{
            "role": "user",
            "content": [
                {"type": "text", "text": VISION_PROMPT},
                {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{b64}", "detail": "high"}},
            ],
        }],
    )
    text = resp.choices[0].message.content
    tokens = resp.usage.prompt_tokens  # log per page for cost attribution
    return text, tokens
```

```python
# Anthropic
import base64

import anthropic

client = anthropic.Anthropic()


def vision_transcribe_page(png_bytes: bytes) -> tuple[str, int]:
    msg = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=4096,
        messages=[{
            "role": "user",
            "content": [
                {"type": "image", "source": {"type": "base64", "media_type": "image/png", "data": base64.standard_b64encode(png_bytes).decode()}},
                {"type": "text", "text": "Transcribe this page to markdown. Preserve tables."},
            ],
        }],
    )
    return msg.content[0].text, msg.usage.input_tokens
```

```python
# Amazon Bedrock (Anthropic models)
import base64
import json

import boto3

bedrock = boto3.client("bedrock-runtime")


def vision_transcribe_page(png_bytes: bytes) -> tuple[str, int]:
    body = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 4096,
        "messages": [{
            "role": "user",
            "content": [
                {"type": "image", "source": {"type": "base64", "media_type": "image/png", "data": base64.b64encode(png_bytes).decode()}},
                {"type": "text", "text": "Transcribe this page to markdown."},
            ],
        }],
    }
    resp = bedrock.invoke_model(
        modelId="anthropic.claude-3-5-sonnet-20241022-v2:0",
        body=json.dumps(body),
    )
    payload = json.loads(resp["body"].read())
    return payload["content"][0]["text"], payload["usage"]["input_tokens"]
```
```java
// Spring AI, OpenAI
public VisionPageResult transcribePage(byte[] pngBytes) {
    Media media = new Media(MimeTypeUtils.IMAGE_PNG, pngBytes);
    UserMessage msg = UserMessage.builder()
        .text("Transcribe this document page to markdown. Preserve tables.")
        .media(media)
        .build();
    ChatResponse resp = chatClient.prompt(new Prompt(msg))
        .options(OpenAiChatOptions.builder().withModel("gpt-4o").withMaxTokens(4096).build())
        .call()
        .chatResponse();
    int promptTokens = resp.getMetadata().getUsage().getPromptTokens();
    return new VisionPageResult(resp.getResult().getOutput().getContent(), promptTokens);
}
```

```java
// Anthropic
public VisionPageResult transcribePage(byte[] pngBytes) {
    var model = AnthropicChatModel.builder()
        .modelName("claude-3-5-sonnet-20241022")
        .maxTokens(4096)
        .build();
    Media media = new Media(MimeTypeUtils.IMAGE_PNG, pngBytes);
    ChatResponse resp = model.call(new Prompt(UserMessage.builder()
        .text("Transcribe this page to markdown.")
        .media(media)
        .build()));
    return new VisionPageResult(
        resp.getResult().getOutput().getContent(),
        resp.getMetadata().getUsage().getPromptTokens());
}
```

```java
// Amazon Bedrock (Anthropic models)
public VisionPageResult transcribePage(byte[] pngBytes) {
    var chat = new BedrockAnthropicChatModel(
        BedrockAnthropicChatOptions.builder()
            .withModel("anthropic.claude-3-5-sonnet-20241022-v2:0")
            .withMaxTokens(4096)
            .build());
    Media media = new Media(MimeTypeUtils.IMAGE_PNG, pngBytes);
    ChatResponse resp = chat.call(new Prompt(UserMessage.builder()
        .text("Transcribe this page to markdown.")
        .media(media)
        .build()));
    return new VisionPageResult(
        resp.getResult().getOutput().getContent(),
        resp.getMetadata().getUsage().getPromptTokens());
}
```
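Resize page renders to max 1568px on the long edge before vision, but check what the resize actually buys you on each provider. Claude downscales images whose long edge exceeds 1568px, so extra pixels add upload time without adding detail. GPT-4o bills image tokens by tile count after its own rescale (fit within 2048px, then shortest side to 768px), so a 300 DPI A4 render and a 1568px render usually land on the same tile count; there the resize mainly cuts payload size and latency. Token savings on GPT-4o come from detail: low or from cropping to the region that failed extraction.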
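To check whether a resize changes what GPT-4o bills, the tile count can be computed before sending. This sketch assumes OpenAI's published high-detail arithmetic for GPT-4o (85 base tokens plus 170 per 512px tile, after fitting within 2048px and scaling the shortest side down to 768px); verify against the current vision pricing documentation, since other models use different constants. The function name is hypothetical.

```python
import math


def gpt4o_high_detail_tokens(width: int, height: int) -> int:
    """Estimate GPT-4o image tokens at detail=high for an image of the given pixel size."""
    # Step 1: scale down to fit inside a 2048 x 2048 square
    scale = min(1.0, 2048 / max(width, height))
    w, h = width * scale, height * scale
    # Step 2: scale down so the shortest side is at most 768 px
    scale = min(1.0, 768 / min(w, h))
    w, h = w * scale, h * scale
    # Step 3: count 512 px tiles
    tiles = math.ceil(w / 512) * math.ceil(h / 512)
    return 85 + 170 * tiles


a4_300dpi = gpt4o_high_detail_tokens(2480, 3508)
a4_resized = gpt4o_high_detail_tokens(1109, 1568)
```

Under these assumptions both renders cost the same number of image tokens, which is why measuring beats assuming when a resize is proposed as a cost control.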
Full-page vision transcription is simpler to implement than LlamaParse but 5–10× more expensive at scale. Prefer hosted layout parse at T2 when recurring doc types justify vendor cost.
Log vision_prompt_tokens per page to FinOps dashboards. Spikes often trace to forgotten detail: high on batch jobs meant for triage.
Vision API calls send pixels to third parties—route confidential docs through VPC-hosted OCR or on-prem vision only; block T3 escalation via policy flag on classification=restricted.
QA gate before accepting vision output
Compare vision transcription length to OCR attempt; if vision returns <50 chars, retry at higher DPI or flag for human review. Optional: LLM-as-judge on sample pages comparing vision vs human gold—track character error rate in CI (Track 5).
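A minimal sketch of both checks: a length gate on the vision output and a character error rate against a human gold transcription. The edit distance is a plain Levenshtein implementation; the function names and the "retry_or_review" status string are illustrative assumptions.

```python
def levenshtein(a: str, b: str) -> int:
    """Edit distance with unit-cost insert, delete, and substitute."""
    if len(a) < len(b):
        a, b = b, a
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, 1):
        cur = [i]
        for j, cb in enumerate(b, 1):
            cur.append(min(prev[j] + 1, cur[j - 1] + 1, prev[j - 1] + (ca != cb)))
        prev = cur
    return prev[-1]


def char_error_rate(hypothesis: str, reference: str) -> float:
    """Edits needed to turn the hypothesis into the reference, per reference character."""
    if not reference:
        return 0.0 if not hypothesis else 1.0
    return levenshtein(hypothesis, reference) / len(reference)


def vision_qa_status(vision_text: str, min_chars: int = 50) -> str:
    """Gate from the text above: fewer than 50 characters means retry or human review."""
    return "retry_or_review" if len(vision_text.strip()) < min_chars else "accept"
```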
Audio & web ingest
Multimodal corpora extend beyond PDFs: call recordings, podcasts, and public web docs. Whisper + diarization turns audio into searchable transcripts with speaker labels; Crawl4AI / FireCrawl extract clean markdown from JS-heavy sites—with robots.txt respect and content-hash change detection for incremental sync.
Whisper ASR pipeline
OpenAI Whisper (or faster-whisper locally) transcribes audio to text with segment timestamps. For support calls and meetings, add speaker diarization (pyannote, AssemblyAI, Deepgram) so chunks carry speaker_id metadata—“what did the agent promise?” becomes retrievable.
| Stage | Tool | Output |
|---|---|---|
| Normalize | ffmpeg | 16 kHz mono WAV |
| Transcribe | Whisper / faster-whisper | Segments with start/end |
| Diarize | pyannote / cloud API | Speaker labels per segment |
| Chunk | Time-window or speaker-turn | 30–120 s chunks for embed |
| PII scrub | Presidio / regex | Redacted transcript |
```python
from dataclasses import dataclass

from openai import OpenAI

client = OpenAI()


@dataclass
class AudioChunk:
    text: str
    start_sec: float
    end_sec: float
    speaker: str | None


def transcribe_audio(path: str) -> list[AudioChunk]:
    with open(path, "rb") as f:
        transcript = client.audio.transcriptions.create(
            model="whisper-1",
            file=f,
            response_format="verbose_json",
            timestamp_granularities=["segment"],
        )
    chunks = []
    for seg in transcript.segments:
        chunks.append(AudioChunk(
            text=seg.text.strip(),
            start_sec=seg.start,
            end_sec=seg.end,
            speaker=None,  # fill after diarization merge
        ))
    return chunks


def merge_diarization(chunks: list[AudioChunk], diarization: list[dict]) -> list[AudioChunk]:
    """Align speaker labels by segment midpoint overlap."""
    for c in chunks:
        mid = (c.start_sec + c.end_sec) / 2
        for d in diarization:
            if d["start"] <= mid <= d["end"]:
                c.speaker = d["speaker"]
                break
    return chunks
```
```java
// openAiAudio, TranscriptionRequest, and TranscriptionResult stand for your audio client wrapper.
public List<AudioChunk> transcribe(Path audioFile) throws IOException {
    var request = TranscriptionRequest.builder()
        .model("whisper-1")
        .responseFormat(TranscriptionResponseFormat.VERBOSE_JSON)
        .build();
    TranscriptionResult result = openAiAudio.transcribe(audioFile, request);
    return result.segments().stream()
        .map(s -> new AudioChunk(s.text(), s.startSec(), s.endSec(), null))
        .toList();
}

public List<AudioChunk> mergeDiarization(List<AudioChunk> chunks, List<DiarSegment> diar) {
    // AudioChunk is read through record-style accessors, so build new chunks instead of mutating
    return chunks.stream().map(c -> {
        double mid = (c.startSec() + c.endSec()) / 2.0;
        String speaker = diar.stream()
            .filter(d -> d.start() <= mid && mid <= d.end())
            .map(DiarSegment::speaker)
            .findFirst()
            .orElse(null);
        return new AudioChunk(c.text(), c.startSec(), c.endSec(), speaker);
    }).toList();
}
```
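The Chunk stage in the pipeline table can be sketched as a merge of consecutive segments from the same speaker, split whenever a chunk would exceed the maximum window. The `Segment` dataclass and function name are assumptions for illustration; this version enforces only the 120 s upper bound, so short turns stay as short chunks.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Segment:
    text: str
    start_sec: float
    end_sec: float
    speaker: Optional[str] = None


def chunk_by_speaker_turn(segments: list[Segment], max_sec: float = 120.0) -> list[Segment]:
    """Merge adjacent same-speaker segments into chunks no longer than max_sec."""
    chunks: list[Segment] = []
    for seg in segments:
        last = chunks[-1] if chunks else None
        same_turn = last is not None and last.speaker == seg.speaker
        fits = last is not None and seg.end_sec - last.start_sec <= max_sec
        if same_turn and fits:
            last.text = f"{last.text} {seg.text}"
            last.end_sec = seg.end_sec
        else:
            # New speaker or window exceeded: start a fresh chunk (copy, do not alias input)
            chunks.append(Segment(seg.text, seg.start_sec, seg.end_sec, seg.speaker))
    return chunks
```

Keeping `start_sec` and `end_sec` on each chunk lets the citation UI link back to the exact moment in the recording.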
Call recordings contain PCI/PII—scrub before embed; restrict raw audio retention; use on-prem Whisper when contracts prohibit cloud audio upload.
Web scraping — Crawl4AI & FireCrawl
Public docs, changelogs, and help centers feed RAG indexes via web ingest. Crawl4AI (open source) runs headless Chromium, strips boilerplate, outputs LLM-friendly markdown. FireCrawl is a hosted API with crawl maps, sitemap discovery, and rate limiting—faster to ship, recurring cost.
robots.txt and politeness
- Parse /robots.txt before crawl; honor Disallow and Crawl-delay.
- Set an identifiable User-Agent with contact URL.
- Rate-limit per domain (1–2 req/s default); backoff on 429/503.
- Never crawl authenticated areas without explicit customer authorization.
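The politeness rules above can be sketched with the standard library: read Crawl-delay from robots.txt, space requests per domain, and back off exponentially on 429/503. `DomainRateLimiter`, `crawl_delay_for`, and `backoff_seconds` are hypothetical names, and the clock and sleep functions are injectable so the limiter can be tested without waiting.

```python
import time
from urllib.robotparser import RobotFileParser


def crawl_delay_for(robots_txt: str, user_agent: str, default_delay: float = 1.0) -> float:
    """Crawl-delay from robots.txt text, or default_delay when none is declared."""
    rp = RobotFileParser()
    rp.parse(robots_txt.splitlines())
    delay = rp.crawl_delay(user_agent)
    return float(delay) if delay is not None else default_delay


class DomainRateLimiter:
    """Enforces a minimum interval between requests to the same domain."""

    def __init__(self, clock=time.monotonic, sleep=time.sleep):
        self._clock = clock
        self._sleep = sleep
        self._next_allowed: dict[str, float] = {}

    def wait(self, domain: str, min_interval: float) -> float:
        """Sleep until the domain is ready; return the seconds waited."""
        now = self._clock()
        ready_at = self._next_allowed.get(domain, now)
        delay = max(0.0, ready_at - now)
        if delay:
            self._sleep(delay)
        self._next_allowed[domain] = max(now, ready_at) + min_interval
        return delay


def backoff_seconds(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Exponential backoff for 429/503 responses; attempt starts at 0."""
    return min(cap, base * 2 ** attempt)
```

Passing the larger of the robots.txt Crawl-delay and your own default as `min_interval` keeps the crawler within both limits.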
Change detection
Store content_hash = sha256(normalized_markdown) per URL. On scheduled recrawl, skip embed when hash unchanged; delete stale chunks when hash changes or URL 404s. Pair with last_modified from HTTP headers when available.
```python
import hashlib
from urllib.parse import urlsplit
from urllib.robotparser import RobotFileParser

from crawl4ai import AsyncWebCrawler


def allowed(url: str, user_agent: str = "SharpbyteBot/1.0") -> bool:
    parts = urlsplit(url)
    rp = RobotFileParser()
    rp.set_url(f"{parts.scheme}://{parts.netloc}/robots.txt")
    rp.read()  # blocking network call; cache the parser per domain in production
    return rp.can_fetch(user_agent, url)


def content_hash(markdown: str) -> str:
    # Normalize whitespace so cosmetic changes do not trigger a re-embed
    normalized = "\n".join(line.strip() for line in markdown.splitlines() if line.strip())
    return hashlib.sha256(normalized.encode()).hexdigest()


# state_db, chunk_markdown, and vector_store are application-side helpers not shown here.
async def crawl_and_sync(url: str, state_db) -> str:
    if not allowed(url):
        return "blocked_by_robots"
    async with AsyncWebCrawler() as crawler:
        result = await crawler.arun(url=url)
    md = result.markdown or ""
    h = content_hash(md)
    prev = state_db.get_hash(url)
    if prev == h:
        return "unchanged"
    # If your store's upsert does not replace by source URL, delete the old chunks first
    chunks = chunk_markdown(md, source_url=url)
    vector_store.upsert(chunks)
    state_db.save(url, h)
    return "updated"
```
```java
// robotsTxt, firecrawlClient, markdownChunker, and vectorStore are injected collaborators not shown here.
public SyncResult crawlAndSync(String url, CrawlStateDb state) {
    if (!robotsTxt.allowed(url, "SharpbyteBot/1.0")) {
        return SyncResult.BLOCKED_BY_ROBOTS;
    }
    FirecrawlResponse page = firecrawlClient.scrape(url);
    String hash = contentHash(normalize(page.markdown()));
    Optional<String> prev = state.getHash(url);
    if (prev.isPresent() && prev.get().equals(hash)) {
        return SyncResult.UNCHANGED;
    }
    var chunks = markdownChunker.chunk(page.markdown(), url);
    vectorStore.upsert(chunks);
    state.save(url, hash);
    return SyncResult.UPDATED;
}

String contentHash(String normalized) {
    try {
        return HexFormat.of().formatHex(
            MessageDigest.getInstance("SHA-256").digest(normalized.getBytes(StandardCharsets.UTF_8)));
    } catch (NoSuchAlgorithmException e) {
        // getInstance declares a checked exception; SHA-256 is required on every Java platform
        throw new IllegalStateException("SHA-256 unavailable", e);
    }
}
```
Crawling without normalizing HTML nav/footer poisons embeddings with repeated chrome. Use main-content selectors or Crawl4AI’s pruning—hash the cleaned body only.
SaaS doc sites re-crawl nightly; hash diff reduces embed API calls 90%+ when only a few pages change per release—critical when embed pricing scales with token volume.
“How do you keep a web RAG index fresh?” — Scheduled crawl, content hash per URL, upsert/delete stale chunks, robots.txt compliance, sitemap seed URLs, alert on crawl failure rate.
Store crawl_snapshot_uri, indexed_at, and the HTTP Last-Modified value in chunk metadata. When users ask “when was this doc updated?” you answer from those fields, not model guesswork.
Production: multimodal ingest checklist
Multimodal ingest is production-ready when tiers are measured, routes are classified, vision is capped, and web/audio paths respect compliance—not when a notebook parses one PDF. Complete this checklist before Guide 5 structured extraction and downstream ERP/legal integrations.
Track 6 multimodal ingest checklist
- ☐ Tiered PDF pipeline: native → OCR → layout parse → vision with per-page metadata
- ☐ Extraction tier decision table documented; tier mix dashboard live
- ☐ Tables extracted as separate chunks (Camelot/Tabula/LlamaParse); CSV sidecar in object storage
- ☐ Embedded images described with cheap vision; size threshold skips logos
- ☐ Document classifier routes invoice / contract / general; quarantine below confidence 0.75
- ☐ Vision fallback capped per doc; page renders resized; confidential docs block T3
- ☐ Token budget: text path ~200–500 tokens/page vs vision ~1,000–2,000 logged in FinOps
- ☐ Whisper + diarization pipeline with PII scrub on transcripts
- ☐ Web crawl honors robots.txt; content-hash incremental sync; identifiable User-Agent
- ☐ Parsed output cached by file hash; re-embed does not re-parse or re-vision
- ☐ Golden multimodal fixtures in CI (digital PDF, scan, table-heavy, audio snippet, crawled page)
- ☐ Runbook: OCR worker OOM, LlamaParse rate limit, vision cost spike, crawl blocklist
Guide 4 connects Track 1 multimodal concepts and Track 2 ingestion patterns to shipping discipline. Next, structured extraction turns classified invoices and forms into schema-validated JSON, the bridge between messy bytes and business systems.
Ship when >90% of pages stay on T0–T1, vision spend is under budget cap, and classifier quarantine rate is stable week-over-week—not on first successful demo parse.
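Those three gates can be checked mechanically from dashboard numbers. In this sketch the tier names follow the extraction_tier values used earlier, while the definition of "stable" (weekly quarantine rates within 2 percentage points of each other, over at least two weeks) is an assumption to replace with your own.

```python
def ready_to_ship(
    tier_counts: dict[str, int],
    vision_spend_usd: float,
    vision_budget_usd: float,
    weekly_quarantine_rates: list[float],
    max_rate_drift: float = 0.02,   # assumed stability tolerance
) -> bool:
    total = sum(tier_counts.values())
    if total == 0 or len(weekly_quarantine_rates) < 2:
        return False  # not enough data to judge
    # Share of pages handled by T0 (native) and T1 (ocr)
    cheap_share = (tier_counts.get("native", 0) + tier_counts.get("ocr", 0)) / total
    drift = max(weekly_quarantine_rates) - min(weekly_quarantine_rates)
    return (
        cheap_share > 0.90
        and vision_spend_usd <= vision_budget_usd
        and drift <= max_rate_drift
    )
```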
Teams that skip tier telemetry discover at invoice time that 40% of “text PDFs” were silently OCR’d at 10× latency—tier dashboards prevent month-three surprises.
Audit third-party parse/vision vendors against data residency requirements before Track 6 launch—switching vendors after ingest cache is populated is painful.
“Design a document ingest system.” — Start with tier hierarchy and classification router, mention table/image side paths, cap vision with token math, close with hash-based incremental sync and observability.