Files
linear-coding-agent/generations/library_rag/utils/pdf_pipeline.py
David Blanc Brioir d2f7165120 Add Library RAG project and cleanup root directory
- Add complete Library RAG application (Flask + MCP server)
  - PDF processing pipeline with OCR and LLM extraction
  - Weaviate vector database integration (BGE-M3 embeddings)
  - Flask web interface with search and document management
  - MCP server for Claude Desktop integration
  - Comprehensive test suite (134 tests)

- Clean up root directory
  - Remove obsolete documentation files
  - Remove backup and temporary files
  - Update autonomous agent configuration

- Update prompts
  - Enhance initializer bis prompt with better instructions

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-30 11:57:12 +01:00

1440 lines
66 KiB
Python

"""PDF Processing Pipeline for Philosophical and Academic Texts.
This module provides a comprehensive 10-step pipeline for processing PDF documents
through OCR, intelligent LLM-based extraction, and vector database ingestion.
It is the main orchestration layer for the Library RAG application.
Pipeline V2 Architecture (10 Steps):
1. **OCR Processing**: Extract text and images via Mistral OCR API (~0.003 EUR/page)
2. **Markdown Building**: Convert OCR output to structured Markdown
3. **Image Extraction**: Save document images to disk (optional base64 embedding)
4. **Metadata Extraction**: LLM-based extraction of title, author, year, language
5. **TOC Extraction**: Hierarchical table of contents via LLM or OCR annotations
6. **Section Classification**: Classify sections (main_content, exposition, etc.)
7. **Semantic Chunking**: Split content into argumentative units via LLM
8. **Chunk Cleaning**: Remove OCR artifacts and validate chunk quality
9. **Validation & Enrichment**: LLM validation + concept extraction
10. **Weaviate Ingestion**: Vectorize and store chunks for semantic search
LLM Provider Options:
- **Ollama** (local): Free, slower, good for development/testing
- **Mistral API** (remote): Fast, paid, recommended for production
Key Parameters:
- ``skip_ocr``: Reuse existing markdown (avoids OCR cost on re-processing)
- ``use_llm``: Enable/disable LLM processing steps
- ``use_semantic_chunking``: Use LLM for intelligent chunking (slower but precise)
- ``use_ocr_annotations``: Use OCR annotations for TOC (3x cost, more reliable)
- ``ingest_to_weaviate``: Insert chunks into Weaviate vector database
Example:
Basic usage with default settings (Ollama local)::
from pathlib import Path
from utils.pdf_pipeline import process_pdf
result = process_pdf(
Path("input/descartes_meditations.pdf"),
output_dir=Path("output"),
use_llm=True,
llm_provider="ollama",
)
if result["success"]:
print(f"Processed: {result['document_name']}")
print(f"Chunks: {result['chunks_count']}")
print(f"Cost: {result.get('cost_total', 0):.4f} EUR")
Production usage with Mistral API::
result = process_pdf(
Path("input/document.pdf"),
llm_provider="mistral",
use_semantic_chunking=True, # Better quality chunks
use_ocr_annotations=True, # More reliable TOC
ingest_to_weaviate=True,
)
Re-processing without OCR cost::
result = process_pdf(
Path("input/document.pdf"),
skip_ocr=True, # Reuse existing markdown
use_llm=True,
)
Cost Tracking:
The pipeline tracks costs for both OCR and LLM operations:
- OCR costs: ~0.003 EUR/page (standard), ~0.009 EUR/page (with annotations)
- LLM costs: Variable when using Mistral API, free with Ollama
Costs are returned in the result dict::
result["cost_ocr"] # OCR cost in euros
result["cost_llm"] # LLM cost in euros (Mistral API only)
result["cost_total"] # Total cost
Output Files:
For each processed document, the pipeline creates::
output/<doc_name>/
<doc_name>.md # Structured markdown
<doc_name>_ocr.json # Raw OCR response
<doc_name>_chunks.json # Processed chunks + metadata
<doc_name>_weaviate.json # Weaviate ingestion results
images/ # Extracted images (if not embedded)
See Also:
- :mod:`utils.mistral_client`: OCR API client
- :mod:`utils.llm_metadata`: Metadata extraction
- :mod:`utils.llm_toc`: TOC extraction
- :mod:`utils.llm_classifier`: Section classification
- :mod:`utils.llm_chunker`: Semantic chunking
- :mod:`utils.llm_cleaner`: Chunk cleaning
- :mod:`utils.llm_validator`: Document validation
- :mod:`utils.weaviate_ingest`: Vector database ingestion
- :mod:`utils.types`: Type definitions for all data structures
Note:
This module supports both V1 (legacy) and V2 (current) pipelines.
V1 is kept for backwards compatibility but V2 is recommended.
Version: 2.0
"""
from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import Any, Callable, Dict, List, Literal, Optional, TYPE_CHECKING, Union, cast
# Import Mistral types for type checking only (avoids runtime import issues)
if TYPE_CHECKING:
from mistralai import Mistral as MistralClient
from .types import (
ChunkData,
ChunksDataExport,
ClassifiedSection,
DocumentHierarchy,
FlatChunk,
FlatSection,
HierarchyNode,
LegacyMetadata,
LLMCostStats,
LLMProvider,
Metadata,
OptionalProgressCallback,
ParsedDocument,
PipelineResult,
SemanticChunk,
SimpleTOCEntry,
TOCEntry,
TOCExtractionResult,
V1PipelineResult,
V2PipelineFiles,
V2PipelineResult,
ValidationResult,
WeaviateIngestResult,
)
from .mistral_client import create_client, estimate_ocr_cost
from .ocr_processor import run_ocr, serialize_ocr_response
from .markdown_builder import build_markdown
from .image_extractor import create_image_writer
from .hierarchy_parser import build_hierarchy, flatten_hierarchy
from .llm_structurer import structure_with_llm, LLMStructureError, LLMStructuredResult, reset_llm_cost, get_llm_cost
# Nouveaux modules LLM v2
from .llm_metadata import extract_metadata
from .llm_toc import extract_toc
from .llm_classifier import classify_sections, filter_indexable_sections
from .llm_cleaner import clean_chunk, is_chunk_valid
from .llm_chunker import chunk_section_with_llm, simple_chunk_by_paragraphs
from .llm_validator import validate_document, apply_corrections, enrich_chunks_with_concepts
from .weaviate_ingest import ingest_document
# Logger
logger = logging.getLogger(__name__)
def extract_document_metadata_legacy(
hierarchy: DocumentHierarchy,
chunks: List[Dict[str, Any]],
doc_name: str,
) -> LegacyMetadata:
"""Extrait les métadonnées du document depuis la hiérarchie et les chunks (méthode legacy).
Args:
hierarchy: Structure hiérarchique du document (DocumentHierarchy).
chunks: Liste des chunks aplatis.
doc_name: Nom du document.
Returns:
Dictionnaire de métadonnées au format LegacyMetadata.
"""
import re
metadata: LegacyMetadata = {
"title": None,
"author": None,
"work": doc_name,
"chunks_count": len(chunks),
"toc": [],
}
sections: List[HierarchyNode] = hierarchy.get("sections", [])
if sections:
first_section: HierarchyNode = sections[0]
metadata["title"] = first_section.get("title")
def _extract_toc_entries(
sections: List[HierarchyNode],
level: int = 0,
) -> List[SimpleTOCEntry]:
"""Extract simple TOC entries from hierarchy nodes."""
toc_items: List[SimpleTOCEntry] = []
for section in sections:
section_level: int = section.get("level", 1)
if section_level <= 2:
toc_items.append({
"title": section.get("title") or "Sans titre",
"level": section_level,
})
children: List[HierarchyNode] = section.get("children", [])
if children:
toc_items.extend(_extract_toc_entries(children, level + 1))
return toc_items
metadata["toc"] = _extract_toc_entries(sections)
preamble: str = hierarchy.get("preamble", "")
author_patterns: List[str] = [
r"(?:par|by|auteur|author)[:\s]+([A-ZÀ-Ü][a-zà-ü]+(?:\s+[A-ZÀ-Ü][a-zà-ü]+)*)",
r"([A-ZÀ-Ü][a-zà-ü]+(?:\s+[A-ZÀ-Ü][a-zà-ü]+)*)\s*[-–—]\s*(?:auteur|author)",
]
for pattern in author_patterns:
match = re.search(pattern, preamble, re.IGNORECASE)
if match:
metadata["author"] = match.group(1).strip()
break
return metadata
def process_pdf_v2(
pdf_path: Path,
output_dir: Path = Path("output"),
*,
document_name: Optional[str] = None,
api_key: Optional[str] = None,
use_llm: bool = True,
llm_provider: LLMProvider = "ollama",
llm_model: Optional[str] = None,
llm_base_url: Optional[str] = None,
llm_temperature: float = 0.2,
embed_images: bool = False,
ingest_to_weaviate: bool = True,
clean_chunks: bool = True,
extract_concepts: bool = True,
validate_output: bool = True,
skip_ocr: bool = False,
use_ocr_annotations: bool = False,
max_toc_pages: int = 8,
use_semantic_chunking: bool = False,
progress_callback: OptionalProgressCallback = None,
) -> V2PipelineResult:
"""Process a PDF document through the intelligent V2 pipeline with LLM extraction.
This is the main entry point for processing philosophical and academic PDF documents.
The pipeline performs OCR, LLM-based metadata/TOC extraction, semantic chunking,
and optional Weaviate ingestion for semantic search.
The 10 processing steps are:
1. OCR via Mistral API (unless skip_ocr=True)
2. Markdown construction from OCR output
3. Image extraction to disk (or base64 embedding)
4. Metadata extraction via LLM (title, author, year, language)
5. TOC extraction via LLM or OCR annotations
6. Section classification via LLM (main_content, exposition, etc.)
7. Semantic chunking via LLM (argumentative units)
8. Chunk cleaning (remove OCR artifacts, validate quality)
9. Validation and concept enrichment via LLM
10. Weaviate ingestion (vectorization and storage)
Args:
pdf_path: Path to the PDF file to process. Must exist and be readable.
output_dir: Base output directory. A subdirectory named after the document
will be created. Defaults to "output".
document_name: Override the document name (derived from filename if None).
api_key: Mistral API key for OCR. If None, uses MISTRAL_API_KEY env var.
use_llm: Enable LLM processing steps (metadata, TOC, classification,
chunking, validation). Set to False for basic processing only.
llm_provider: LLM provider to use. "ollama" for local (free, slower) or
"mistral" for API (fast, paid). Defaults to "ollama".
llm_model: Specific model name. If None, uses provider default
(e.g., "qwen2.5:7b" for Ollama, "mistral-small" for Mistral).
llm_base_url: Base URL for Ollama server. Defaults to "http://localhost:11434".
llm_temperature: LLM sampling temperature. Lower values (0.1-0.3) produce
more deterministic output. Defaults to 0.2.
embed_images: If True, embed images as base64 in markdown. If False,
save images to disk. Defaults to False.
ingest_to_weaviate: Insert processed chunks into Weaviate vector database
for semantic search. Requires Weaviate to be running. Defaults to True.
clean_chunks: Apply chunk cleaning to remove OCR artifacts and validate
minimum quality thresholds. Defaults to True.
extract_concepts: Extract philosophical/academic concepts from chunks
using LLM. Adds concept keywords for search. Defaults to True.
validate_output: Run final LLM validation on the processed document.
Checks metadata accuracy and suggests corrections. Defaults to True.
skip_ocr: Skip OCR step and reuse existing markdown file from previous run.
Useful for re-processing without incurring OCR costs. Defaults to False.
use_ocr_annotations: Use OCR with annotations for more reliable TOC extraction.
Costs approximately 3x more than standard OCR. Defaults to False.
max_toc_pages: Maximum number of pages to analyze for TOC extraction.
Mistral API limits to 8 pages maximum. Defaults to 8.
use_semantic_chunking: Use LLM-based semantic chunking instead of basic
paragraph splitting. Slower but produces higher quality argumentative
units. Defaults to False.
progress_callback: Optional callback function for progress updates.
Signature: ``callback(step_id: str, status: str, detail: str | None)``.
step_id values: ocr, markdown, metadata, toc, classify, chunking,
cleaning, validation, weaviate.
status values: active, completed, error, skipped.
Returns:
V2PipelineResult dictionary containing:
- success (bool): Whether processing completed successfully.
- document_name (str): Name of the processed document.
- output_dir (str): Path to the output directory.
- files (dict): Paths to generated files (markdown, chunks, ocr, weaviate).
- pages (int): Number of pages in the PDF.
- cost_ocr (float): OCR cost in euros.
- cost_llm (float): LLM cost in euros (Mistral API only).
- cost_total (float): Total processing cost.
- llm_stats (dict | None): Detailed LLM usage statistics.
- metadata (dict): Extracted document metadata.
- toc (list): Hierarchical table of contents.
- chunks_count (int): Number of chunks generated.
- validation (dict | None): Validation results if enabled.
- weaviate_ingest (dict | None): Weaviate ingestion results.
- pipeline_version (str): Always "2.0" for this pipeline.
- error (str): Error message if success is False.
Raises:
FileNotFoundError: If pdf_path does not exist and skip_ocr is False.
ValueError: If skip_ocr is True but no cached markdown exists.
Example:
Basic processing with Ollama::
from pathlib import Path
from utils.pdf_pipeline import process_pdf_v2
result = process_pdf_v2(
Path("input/plato_republic.pdf"),
use_llm=True,
llm_provider="ollama",
)
if result["success"]:
print(f"Processed {result['chunks_count']} chunks")
print(f"Title: {result['metadata'].get('title')}")
Production processing with Mistral API::
result = process_pdf_v2(
Path("input/kant_critique.pdf"),
llm_provider="mistral",
use_semantic_chunking=True,
use_ocr_annotations=True,
)
print(f"Total cost: {result['cost_total']:.4f} EUR")
Note:
Cost implications:
- OCR: ~0.003 EUR/page (standard), ~0.009 EUR/page (with annotations)
- LLM (Mistral API): Variable based on token usage
- LLM (Ollama): Free (local processing)
Use ``skip_ocr=True`` when re-processing to avoid OCR costs.
The function will reuse the existing markdown file.
"""
pdf_path = Path(pdf_path).resolve()
# Detect file type by extension
file_ext: str = pdf_path.suffix.lower()
is_markdown_file: bool = file_ext == ".md"
doc_name: str = document_name or pdf_path.stem
if doc_name.lower().endswith(".pdf"):
doc_name = doc_name[:-4]
elif doc_name.lower().endswith(".md"):
doc_name = doc_name[:-3]
doc_output_dir: Path = Path(output_dir).resolve() / doc_name
doc_output_dir.mkdir(parents=True, exist_ok=True)
# Chemins des fichiers de sortie
md_path: Path = doc_output_dir / f"{doc_name}.md"
chunks_path: Path = doc_output_dir / f"{doc_name}_chunks.json"
ocr_path: Path = doc_output_dir / f"{doc_name}_ocr.json"
images_dir: Path = doc_output_dir / "images"
logger.info(f"[V2] Traitement de : {pdf_path}")
logger.info(f"[V2] Sortie dans : {doc_output_dir}")
# Helper pour émettre la progression
def emit_progress(step: str, status: str, detail: Optional[str] = None) -> None:
if progress_callback:
try:
progress_callback(step, status, detail)
except Exception:
pass
try:
# ═══════════════════════════════════════════════════════════════════
# ÉTAPE 1-4 : OCR et Markdown (sauf si skip_ocr)
# ═══════════════════════════════════════════════════════════════════
nb_pages: int = 0
cost: float = 0.0 # Coût OCR (0 si skip_ocr)
# Réinitialiser le compteur de coût LLM pour ce document
if llm_provider == "mistral":
reset_llm_cost()
if is_markdown_file:
# Mode Markdown natif : charger directement le fichier .md
emit_progress("ocr", "active", "Chargement Markdown...")
if not pdf_path.is_file():
emit_progress("ocr", "error", "Fichier Markdown introuvable")
return {
"success": False,
"error": f"Fichier Markdown introuvable : {pdf_path}",
}
logger.info("[1-4/10] 📝 Chargement direct du fichier Markdown (pas d'OCR)")
markdown_text: str = pdf_path.read_text(encoding="utf-8")
# Copier le contenu vers le répertoire de sortie
md_path.write_text(markdown_text, encoding="utf-8")
# Estimer le nombre de "pages" à partir des en-têtes H1
h1_count = markdown_text.count("\n# ") + (1 if markdown_text.startswith("# ") else 0)
nb_pages = max(h1_count, 1) # Au moins 1 "page"
emit_progress("ocr", "skipped", f"Markdown (0.00€)")
emit_progress("markdown", "completed", f"{nb_pages} sections détectées")
logger.info(f"Fichier Markdown chargé directement ({nb_pages} sections H1 détectées)")
elif skip_ocr:
# Mode test : réutiliser le markdown existant
emit_progress("ocr", "active", "Vérification du cache...")
if not md_path.exists():
emit_progress("ocr", "error", "Fichier non trouvé")
return {
"success": False,
"error": f"Mode skip_ocr activé mais fichier non trouvé : {md_path}",
}
logger.info("[1-4/10] ⚡ Skip OCR - Réutilisation du markdown existant")
markdown_text = md_path.read_text(encoding="utf-8")
# Essayer de récupérer le nombre de pages depuis l'OCR existant
if ocr_path.exists():
try:
ocr_data = json.loads(ocr_path.read_text(encoding="utf-8"))
nb_pages = len(ocr_data.get("pages", []))
except Exception:
nb_pages = markdown_text.count("<!-- Page")
else:
nb_pages = markdown_text.count("<!-- Page")
emit_progress("ocr", "skipped", f"{nb_pages} pages (cache)")
emit_progress("markdown", "skipped", "Réutilisation")
logger.info(f"Markdown existant chargé ({nb_pages} pages détectées)")
else:
# Mode normal : OCR via Mistral
if not pdf_path.is_file():
emit_progress("ocr", "error", "PDF introuvable")
return {
"success": False,
"error": f"Fichier PDF introuvable : {pdf_path}",
}
emit_progress("ocr", "active", "Connexion à Mistral...")
logger.info("[1/10] Connexion à Mistral...")
client: Any = create_client(api_key) # Mistral client
pdf_bytes: bytes = pdf_path.read_bytes()
emit_progress("ocr", "active", "OCR en cours...")
logger.info("[2/10] OCR en cours...")
# Step 1: OCR processing - returns Mistral OCR response (Pydantic model)
ocr_response: Any = run_ocr(client, pdf_bytes, pdf_path.name, include_images=True)
nb_pages = len(ocr_response.pages)
cost = estimate_ocr_cost(nb_pages)
emit_progress("ocr", "completed", f"{nb_pages} pages ({cost:.4f}€)")
logger.info(f"OCR terminé : {nb_pages} pages (coût estimé : {cost:.4f}€)")
# Sauvegarder OCR brut
ocr_json: Dict[str, Any] = serialize_ocr_response(ocr_response)
ocr_path.write_text(json.dumps(ocr_json, ensure_ascii=False, indent=2), encoding="utf-8")
# Step 3: Image extraction
emit_progress("markdown", "active", "Construction du markdown...")
image_writer: Optional[Callable[[int, int, str], Optional[str]]] = None
if not embed_images:
logger.info("[3/10] Extraction des images...")
images_dir.mkdir(parents=True, exist_ok=True)
image_writer = create_image_writer(images_dir)
# Step 4: Markdown building - input: OCR response, output: str
logger.info("[4/10] Construction du Markdown...")
markdown_text = build_markdown(ocr_response, embed_images=embed_images, image_writer=image_writer)
md_path.write_text(markdown_text, encoding="utf-8")
emit_progress("markdown", "completed", "Document généré")
# Analyse hiérarchique basique (fallback)
hierarchy: DocumentHierarchy = build_hierarchy(markdown_text)
basic_chunks: List[FlatChunk] = flatten_hierarchy(hierarchy)
# ═══════════════════════════════════════════════════════════════════
# ÉTAPE 5 : Extraction métadonnées via LLM
# ═══════════════════════════════════════════════════════════════════
# Step 5: Metadata extraction - input: markdown str, output: Dict[str, Any]
emit_progress("metadata", "active", "Analyse du document...")
metadata: Dict[str, Any]
if use_llm:
logger.info(f"[5/10] Extraction métadonnées via {llm_provider.upper()}...")
metadata = extract_metadata(markdown_text, model=llm_model, provider=llm_provider, temperature=llm_temperature)
else:
logger.info("[5/10] Extraction métadonnées (mode basique)...")
metadata = cast(Dict[str, Any], extract_document_metadata_legacy(hierarchy, cast(List[Dict[str, Any]], basic_chunks), doc_name))
# Compléter les métadonnées
metadata["work"] = metadata.get("title") or doc_name
metadata["chunks_count"] = 0 # Sera mis à jour plus tard
title_str = metadata.get("title") or "Métadonnées extraites"
emit_progress("metadata", "completed", title_str[:50])
# ═══════════════════════════════════════════════════════════════════
# ÉTAPE 6 : Extraction TOC via LLM ou Annotations OCR
# ═══════════════════════════════════════════════════════════════════
# Step 6: TOC extraction - input: markdown str, output: List[Dict[str, Any]]
emit_progress("toc", "active", "Extraction de la structure...")
toc: List[Dict[str, Any]] = []
flat_toc: List[Dict[str, Any]] = []
# Branche 1 : OCR avec annotations (analyse markdown pour détecter indentation)
toc_result: Dict[str, Any]
if use_ocr_annotations:
logger.info(f"[6/10] Extraction TOC via analyse markdown (indentation)...")
emit_progress("toc", "active", "Analyse indentation TOC...")
from .toc_extractor_markdown import extract_toc_from_markdown, MarkdownTOCResult
toc_result_typed: MarkdownTOCResult = extract_toc_from_markdown(
markdown_text,
max_lines=300,
)
toc_result = cast(Dict[str, Any], toc_result_typed)
if toc_result.get("success"):
# Succès : utiliser les annotations
metadata_annotated: Dict[str, Any] = toc_result["metadata"]
toc = toc_result["toc"]
flat_toc = toc_result["toc_flat"]
cost += toc_result["cost_ocr_annotated"]
# Enrichir les métadonnées existantes
metadata.update({
"title": metadata_annotated.get("title", metadata.get("title")),
"author": metadata_annotated.get("author", metadata.get("author")),
"languages": metadata_annotated.get("languages", []),
"summary": metadata_annotated.get("summary", ""),
"collection": metadata_annotated.get("collection"),
"publisher": metadata_annotated.get("publisher"),
"year": metadata_annotated.get("year"),
})
emit_progress("toc", "completed", f"{len(flat_toc)} entrées (annotations, +{toc_result['cost_ocr_annotated']:.4f}€)")
logger.info(f"TOC extraite via annotations : {len(flat_toc)} entrées (coût : +{toc_result['cost_ocr_annotated']:.4f}€)")
else:
# Échec : fallback sur extraction LLM classique
error_msg = toc_result.get("error", "Erreur inconnue")
logger.warning(f"Échec annotations OCR ({error_msg}), fallback sur LLM...")
emit_progress("toc", "active", f"Fallback LLM après échec annotations...")
if use_llm:
toc_result = extract_toc(markdown_text, document_title=metadata.get("title"), model=llm_model, provider=llm_provider)
toc = toc_result.get("toc", [])
flat_toc = toc_result.get("flat_toc", [])
else:
toc = metadata.get("toc", [])
flat_toc = toc
emit_progress("toc", "completed", f"{len(flat_toc)} entrées (fallback LLM)")
# Branche 2 : Extraction LLM classique (moins fiable mais moins cher)
elif use_llm:
logger.info(f"[6/10] Extraction TOC via {llm_provider.upper()}...")
toc_result = extract_toc(markdown_text, document_title=metadata.get("title"), model=llm_model, provider=llm_provider)
toc = toc_result.get("toc", [])
flat_toc = toc_result.get("flat_toc", [])
emit_progress("toc", "completed", f"{len(flat_toc)} entrées (LLM)")
# Branche 3 : Mode basique (sans LLM ni annotations)
else:
logger.info("[6/10] Extraction TOC (mode basique)...")
toc = metadata.get("toc", [])
flat_toc = toc
emit_progress("toc", "completed", f"{len(flat_toc)} entrées (basique)")
metadata["toc"] = toc
# ═══════════════════════════════════════════════════════════════════
# ÉTAPE 7 : Aplatir la hiérarchie et classifier les sections
# ═══════════════════════════════════════════════════════════════════
# Aplatir TOUTE la hiérarchie (inclut les sous-sections avec contenu)
def flatten_all_sections(
sections: List[HierarchyNode],
parent_title: Optional[str] = None,
parent_chapter: Optional[str] = None,
) -> List[FlatSection]:
"""Aplatit récursivement toutes les sections avec leur contenu."""
result: List[FlatSection] = []
for section in sections:
title: str = section.get("title") or ""
content: str = section.get("content") or ""
level: int = section.get("level", 1)
# Déterminer le chapitre parent
chapter_title: Optional[str] = parent_chapter
if level == 1:
chapter_title = title
elif level == 2 and parent_title:
chapter_title = parent_title
# Ajouter cette section avec son contenu
flat_section: FlatSection = {
"title": title,
"content": content,
"level": level,
"chapter_title": chapter_title,
"parent_title": parent_title,
}
result.append(flat_section)
# Récursion sur les enfants
children: List[HierarchyNode] = section.get("children", [])
if children:
result.extend(flatten_all_sections(children, title, chapter_title))
return result
# Aplatir toutes les sections
all_sections: List[FlatSection] = flatten_all_sections(hierarchy.get("sections", []))
logger.info(f"Sections aplaties: {len(all_sections)} sections trouvées")
# Préparer pour classification (seulement celles avec contenu significatif)
sections_for_classification: List[FlatSection] = [
s for s in all_sections
if s.get("content") and len(s["content"].strip()) > 30
]
# Step 7: Section classification - input: sections, output: classified sections
emit_progress("classify", "active", f"Analyse de {len(sections_for_classification)} sections...")
classified_sections: List[Dict[str, Any]]
indexable_sections: List[Dict[str, Any]]
if use_llm and sections_for_classification:
logger.info(f"[7/10] Classification des sections via {llm_provider.upper()}...")
classified_sections = classify_sections(
cast(List[Dict[str, Any]], sections_for_classification),
document_title=metadata.get("title"),
model=llm_model,
provider=llm_provider,
)
# Double validation pour détecter les faux positifs (morceaux de TOC)
from .llm_classifier import validate_classified_sections
classified_sections = validate_classified_sections(classified_sections)
indexable_sections = filter_indexable_sections(classified_sections)
else:
logger.info("[7/10] Classification (mode basique)...")
classified_sections = cast(List[Dict[str, Any]], sections_for_classification)
# Par défaut, tout est indexable sauf les sections vides
indexable_sections = cast(List[Dict[str, Any]], [s for s in sections_for_classification if s.get("content")])
emit_progress("classify", "completed", f"{len(indexable_sections)} sections indexables")
logger.info(f"Sections indexables: {len(indexable_sections)} sections")
# ═══════════════════════════════════════════════════════════════════
# ÉTAPE 8 : Chunking sémantique via LLM
# ═══════════════════════════════════════════════════════════════════
# Step 8: Chunking - input: sections, output: List[SemanticChunk]
all_chunks: List[SemanticChunk] = []
chunk_index: int = 0
emit_progress("chunking", "active", "Découpage sémantique..." if use_semantic_chunking else "Découpage basique...")
if use_llm and use_semantic_chunking:
logger.info("[8/10] Chunking sémantique via LLM...")
for section in indexable_sections:
section_title = section.get("title", "")
section_content = section.get("content", "")
section_level = section.get("level", 1)
chapter_title = section.get("chapter_title", section_title)
# Déterminer le sous-chapitre (niveau 2) si on est niveau 3+
subsection_title = None
if section_level >= 2:
subsection_title = section.get("parent_title") or section_title
# Seuil plus bas pour inclure plus de contenu
if not section_content or len(section_content.strip()) < 30:
continue
# Chunker la section avec hiérarchie complète
section_chunks = chunk_section_with_llm(
section_content,
section_title,
chapter_title=chapter_title,
subsection_title=subsection_title,
section_level=section_level,
model=llm_model,
provider=llm_provider,
temperature=llm_temperature,
)
# Ajouter les métadonnées à chaque chunk
for chunk in section_chunks:
# Cast to Dict for modification
chunk_dict = cast(Dict[str, Any], chunk)
chunk_dict["chunk_id"] = f"chunk_{chunk_index:05d}"
chunk_dict["section"] = section_title
chunk_dict["section_level"] = section_level
chunk_dict["chapter_title"] = chapter_title
# Ajouter le sous-chapitre si différent
if subsection_title and subsection_title != chapter_title:
chunk_dict["subsection_title"] = subsection_title
chunk_index += 1
all_chunks.append(cast(SemanticChunk, chunk_dict))
else:
logger.info("[8/10] Chunking (mode basique)...")
# Utiliser les chunks basiques
for i, flat_chunk in enumerate(basic_chunks):
chunk_dict = cast(Dict[str, Any], flat_chunk)
basic_semantic_chunk: SemanticChunk = cast(SemanticChunk, {
"chunk_id": f"chunk_{i:05d}",
"text": chunk_dict.get("text", ""),
"section": chunk_dict.get("title", f"Section {i}"),
"section_level": chunk_dict.get("level", 1),
"type": chunk_dict.get("type", "main_content"),
"concepts": [],
})
all_chunks.append(basic_semantic_chunk)
# ═══════════════════════════════════════════════════════════════════
# ÉTAPE 9 : Nettoyage des chunks
# ═══════════════════════════════════════════════════════════════════
emit_progress("chunking", "completed", f"{len(all_chunks)} chunks générés")
emit_progress("cleaning", "active", "Nettoyage des artefacts...")
logger.info("[9/10] Nettoyage et filtrage des chunks...")
# Step 9: Cleaning - input: chunks, output: List[SemanticChunk]
cleaned_chunks: List[SemanticChunk] = []
for chunk in all_chunks:
# Nettoyer le texte
chunk_dict = cast(Dict[str, Any], chunk)
text: str = chunk_dict.get("text", "")
if clean_chunks and use_llm:
text = clean_chunk(text, use_llm=False) # Nettoyage basique rapide
else:
text = clean_chunk(text, use_llm=False)
# Vérifier validité
if is_chunk_valid(text, min_chars=30, min_words=8):
chunk_dict["text"] = text
cleaned_chunks.append(cast(SemanticChunk, chunk_dict))
emit_progress("cleaning", "completed", f"{len(cleaned_chunks)} chunks valides")
logger.info(f"Chunks après nettoyage: {len(cleaned_chunks)} (sur {len(all_chunks)})")
# Extraire concepts si demandé
if extract_concepts and use_llm and cleaned_chunks:
logger.info(f"Enrichissement avec concepts via {llm_provider.upper()}...")
enriched = enrich_chunks_with_concepts(cast(List[Dict[str, Any]], cleaned_chunks[:50]), model=llm_model, provider=llm_provider) # Limiter
cleaned_chunks = cast(List[SemanticChunk], enriched)
metadata["chunks_count"] = len(cleaned_chunks)
# ═══════════════════════════════════════════════════════════════════
# ÉTAPE 10 : Validation finale
# ═══════════════════════════════════════════════════════════════════
# Step 10: Validation - input: parsed doc, output: ValidationResult
emit_progress("validation", "active", "Vérification de la qualité...")
validation_result: Optional[ValidationResult] = None
parsed_doc: Dict[str, Any] = {
"metadata": metadata,
"toc": toc,
"chunks": cleaned_chunks,
}
if validate_output and use_llm:
logger.info("[10/10] Validation du document...")
validation_result = validate_document(parsed_doc, model=llm_model, provider=llm_provider)
# Appliquer les corrections (ou nettoyer les métadonnées)
parsed_doc = apply_corrections(parsed_doc, cast(Dict[str, Any], validation_result))
metadata = parsed_doc.get("metadata", metadata)
emit_progress("validation", "completed", "Qualité vérifiée")
else:
logger.info("[10/10] Validation (ignorée)...")
# Nettoyer quand même les métadonnées (titre, auteur)
parsed_doc = apply_corrections(parsed_doc, None)
metadata = parsed_doc.get("metadata", metadata)
emit_progress("validation", "skipped", "Non activée")
# ═══════════════════════════════════════════════════════════════════
# Sauvegarder les résultats
# ═══════════════════════════════════════════════════════════════════
# Export data structure
chunks_data: Dict[str, Any] = {
"document": doc_name,
"pages": nb_pages,
"metadata": metadata,
"toc": toc,
"flat_toc": flat_toc if use_llm else [],
"hierarchy": hierarchy,
"classified_sections": classified_sections if use_llm else [],
"chunks": cleaned_chunks,
"validation": validation_result,
"pipeline_version": "2.0",
}
chunks_path.write_text(json.dumps(chunks_data, ensure_ascii=False, indent=2), encoding="utf-8")
# ═══════════════════════════════════════════════════════════════════
# Ingestion Weaviate
# ═══════════════════════════════════════════════════════════════════
# Weaviate ingestion step
weaviate_result: Optional[WeaviateIngestResult] = None
if ingest_to_weaviate:
emit_progress("weaviate", "active", "Vectorisation et stockage...")
logger.info("Ingestion dans Weaviate...")
weaviate_result = ingest_document(
doc_name=doc_name,
chunks=cast(List[Dict[str, Any]], cleaned_chunks),
metadata=metadata,
language=metadata.get("language", "fr"),
)
if weaviate_result.get("success"):
emit_progress("weaviate", "completed", f"{weaviate_result.get('count', 0)} passages insérés")
logger.info(f"Ingestion terminée: {weaviate_result.get('count', 0)} passages")
weaviate_path = doc_output_dir / f"{doc_name}_weaviate.json"
weaviate_path.write_text(json.dumps(weaviate_result, ensure_ascii=False, indent=2), encoding="utf-8")
else:
emit_progress("weaviate", "error", weaviate_result.get('error', 'Erreur'))
logger.warning(f"Erreur ingestion: {weaviate_result.get('error')}")
else:
emit_progress("weaviate", "skipped", "Non activée")
# Résultat final - file paths
files_dict: Dict[str, Any] = {
"markdown": str(md_path),
"chunks": str(chunks_path),
"ocr": str(ocr_path),
}
if weaviate_result and weaviate_result.get("success"):
files_dict["weaviate"] = str(doc_output_dir / f"{doc_name}_weaviate.json")
if not embed_images and images_dir.exists():
image_files: List[Path] = list(images_dir.glob("*.png"))
files_dict["images"] = [str(f) for f in image_files]
files_dict["images_dir"] = str(images_dir)
files: V2PipelineFiles = cast(V2PipelineFiles, files_dict)
# Récupérer le coût LLM si Mistral API utilisée
llm_cost_stats: Optional[LLMCostStats] = get_llm_cost() if llm_provider == "mistral" else None
total_cost: float = cost # Coût OCR
if llm_cost_stats:
total_cost += llm_cost_stats["total_cost"]
logger.info(f"Coût LLM Mistral: {llm_cost_stats['total_cost']:.4f}€ ({llm_cost_stats['calls_count']} appels)")
logger.info(f"[V2] Traitement terminé : {doc_name} - Coût total: {total_cost:.4f}")
return {
"success": True,
"document_name": doc_name,
"output_dir": str(doc_output_dir),
"files": files,
"pages": nb_pages,
"cost_ocr": cost,
"cost_llm": llm_cost_stats["total_cost"] if llm_cost_stats else 0.0,
"cost_total": total_cost,
"llm_stats": llm_cost_stats,
"metadata": metadata,
"toc": toc,
"chunks_count": len(cleaned_chunks),
"validation": validation_result,
"weaviate_ingest": weaviate_result,
"pipeline_version": "2.0",
}
except Exception as e:
logger.error(f"Erreur : {e}")
import traceback
traceback.print_exc()
return {
"success": False,
"document_name": doc_name,
"output_dir": str(doc_output_dir),
"error": str(e),
"pipeline_version": "2.0",
}
# ═══════════════════════════════════════════════════════════════════════════════
# Fonctions legacy pour compatibilité
# ═══════════════════════════════════════════════════════════════════════════════
# Alias pour la fonction legacy
extract_document_metadata = extract_document_metadata_legacy
def process_pdf(
pdf_path: Path,
output_dir: Path = Path("output"),
*,
document_name: Optional[str] = None,
api_key: Optional[str] = None,
use_llm: bool = True,
llm_provider: LLMProvider = "ollama",
llm_model: Optional[str] = None,
llm_base_url: Optional[str] = None,
llm_temperature: float = 0.2,
llm_max_chars: int = 8000,
llm_timeout: int = 300,
embed_images: bool = False,
ingest_to_weaviate: bool = True,
use_v2_pipeline: bool = True,
skip_ocr: bool = False,
use_ocr_annotations: bool = False,
max_toc_pages: int = 8,
use_semantic_chunking: bool = False,
progress_callback: OptionalProgressCallback = None,
) -> Union[V2PipelineResult, V1PipelineResult]:
"""Main entry point for PDF processing pipeline.
This is the unified API for processing philosophical and academic PDF documents.
By default, it uses the V2 pipeline with intelligent LLM-based extraction.
The V1 pipeline is available for backwards compatibility via use_v2_pipeline=False.
The function automatically selects the appropriate pipeline version and delegates
to either process_pdf_v2() or _process_pdf_v1() based on the use_v2_pipeline flag.
Args:
pdf_path: Path to the PDF file to process. Must exist and be readable.
output_dir: Base output directory. A subdirectory named after the document
will be created for all output files. Defaults to "output".
document_name: Override the document name used for output files. If None,
the name is derived from the PDF filename (stem).
api_key: Mistral API key for OCR. If None, reads from MISTRAL_API_KEY
environment variable.
use_llm: Enable LLM processing for metadata extraction, TOC extraction,
section classification, chunking, and validation. Set to False for
basic processing without LLM intelligence. Defaults to True.
llm_provider: LLM provider to use for intelligent processing.
"ollama" uses a local Ollama server (free, slower).
"mistral" uses the Mistral API (paid, faster).
Defaults to "ollama".
llm_model: Specific model name to use. If None, uses provider defaults:
- Ollama: "qwen2.5:7b"
- Mistral: "mistral-small"
llm_base_url: Base URL for Ollama server. Only used when llm_provider="ollama".
Defaults to "http://localhost:11434".
llm_temperature: Sampling temperature for LLM calls. Lower values (0.1-0.3)
produce more deterministic, focused output. Defaults to 0.2.
llm_max_chars: Maximum characters to send to LLM per call. V1 pipeline only.
Defaults to 8000.
llm_timeout: Timeout in seconds for LLM calls. V1 pipeline only.
Defaults to 300.
embed_images: If True, embed images as base64 data URIs in the markdown.
If False, save images as separate files in an images/ subdirectory.
Defaults to False.
ingest_to_weaviate: Insert processed chunks into the Weaviate vector database
for semantic search. Requires Weaviate to be running locally via Docker.
Defaults to True.
use_v2_pipeline: Use the V2 pipeline with intelligent processing. Set to False
to use the legacy V1 pipeline. Defaults to True.
skip_ocr: Skip OCR and reuse existing markdown from a previous run.
Useful for re-processing without incurring additional OCR costs.
The markdown file must exist at output/<doc_name>/<doc_name>.md.
V2 pipeline only. Defaults to False.
use_ocr_annotations: Use OCR with annotations for more reliable TOC extraction.
Annotations provide visual structure hints but cost approximately 3x more
than standard OCR (~0.009 EUR/page vs ~0.003 EUR/page).
V2 pipeline only. Defaults to False.
max_toc_pages: Maximum number of pages to analyze for TOC extraction.
Mistral API limits annotations to 8 pages maximum.
V2 pipeline only. Defaults to 8.
use_semantic_chunking: Use LLM-based semantic chunking to identify
argumentative units. More accurate but slower than paragraph-based splitting.
V2 pipeline only. Defaults to False.
progress_callback: Optional callback function for real-time progress updates.
Signature: ``callback(step_id: str, status: str, detail: str | None)``.
step_id values: ocr, markdown, metadata, toc, classify, chunking,
cleaning, validation, weaviate.
status values: active, completed, error, skipped.
V2 pipeline only.
Returns:
Dictionary containing processing results. The exact structure depends on
the pipeline version used:
V2 Pipeline (use_v2_pipeline=True) returns V2PipelineResult:
- success (bool): Whether processing completed successfully.
- document_name (str): Name of the processed document.
- output_dir (str): Path to the output directory.
- files (dict): Paths to generated files.
- pages (int): Number of pages in the PDF.
- cost_ocr (float): OCR cost in euros.
- cost_llm (float): LLM cost in euros.
- cost_total (float): Total processing cost.
- metadata (dict): Extracted document metadata.
- toc (list): Hierarchical table of contents.
- chunks_count (int): Number of chunks generated.
- validation (dict | None): Validation results.
- weaviate_ingest (dict | None): Weaviate ingestion results.
- pipeline_version (str): "2.0"
- error (str): Error message if success is False.
V1 Pipeline (use_v2_pipeline=False) returns V1PipelineResult:
- success (bool): Whether processing completed successfully.
- document_name (str): Name of the processed document.
- output_dir (str): Path to the output directory.
- files (dict): Paths to generated files.
- pages (int): Number of pages in the PDF.
- cost (float): OCR cost in euros.
- metadata (dict): Extracted document metadata.
- chunks_count (int): Number of chunks generated.
- weaviate_ingest (dict | None): Weaviate ingestion results.
- pipeline_version (str): "1.0"
- error (str): Error message if success is False.
Raises:
FileNotFoundError: If pdf_path does not exist (and skip_ocr is False).
ValueError: If skip_ocr is True but no cached markdown exists.
Example:
Basic processing with local Ollama::
from pathlib import Path
from utils.pdf_pipeline import process_pdf
result = process_pdf(
Path("input/philosophy_book.pdf"),
use_llm=True,
llm_provider="ollama",
)
if result["success"]:
print(f"Processed: {result['document_name']}")
print(f"Chunks: {result['chunks_count']}")
Production processing with Mistral API::
result = process_pdf(
Path("input/kant_critique.pdf"),
llm_provider="mistral",
use_semantic_chunking=True,
use_ocr_annotations=True,
ingest_to_weaviate=True,
)
print(f"OCR cost: {result['cost_ocr']:.4f} EUR")
print(f"LLM cost: {result['cost_llm']:.4f} EUR")
print(f"Total: {result['cost_total']:.4f} EUR")
Re-processing without OCR cost::
result = process_pdf(
Path("input/existing_document.pdf"),
skip_ocr=True, # Reuse cached markdown
use_semantic_chunking=True,
)
Legacy V1 pipeline::
result = process_pdf(
Path("input/document.pdf"),
use_v2_pipeline=False, # Use V1 pipeline
)
Note:
**Cost Implications:**
- OCR: ~0.003 EUR/page (standard), ~0.009 EUR/page (with annotations)
- LLM (Mistral API): Variable based on token usage
- LLM (Ollama): Free (local processing)
Use ``skip_ocr=True`` when re-processing documents to avoid OCR costs.
**Pipeline Comparison:**
- V2 (default): Intelligent 10-step pipeline with LLM-based extraction.
Better metadata, TOC, and semantic chunking. Recommended for production.
- V1 (legacy): Basic pipeline with simple hierarchy parsing.
Faster but less accurate. Kept for backwards compatibility.
See Also:
- :func:`process_pdf_v2`: Direct access to V2 pipeline.
- :func:`process_pdf_bytes`: Process PDF from bytes (for Flask uploads).
- :mod:`utils.types`: Type definitions for result dictionaries.
"""
if use_v2_pipeline:
return process_pdf_v2(
pdf_path,
output_dir,
document_name=document_name,
api_key=api_key,
use_llm=use_llm,
llm_provider=llm_provider,
llm_model=llm_model,
llm_base_url=llm_base_url,
llm_temperature=llm_temperature,
embed_images=embed_images,
ingest_to_weaviate=ingest_to_weaviate,
skip_ocr=skip_ocr,
use_ocr_annotations=use_ocr_annotations,
max_toc_pages=max_toc_pages,
use_semantic_chunking=use_semantic_chunking,
progress_callback=progress_callback,
)
else:
# Pipeline V1 legacy
return _process_pdf_v1(
pdf_path,
output_dir,
document_name=document_name,
api_key=api_key,
use_llm=use_llm,
llm_model=llm_model,
llm_base_url=llm_base_url,
llm_temperature=llm_temperature,
llm_max_chars=llm_max_chars,
llm_timeout=llm_timeout,
embed_images=embed_images,
ingest_to_weaviate=ingest_to_weaviate,
)
def _process_pdf_v1(
pdf_path: Path,
output_dir: Path = Path("output"),
*,
document_name: Optional[str] = None,
api_key: Optional[str] = None,
use_llm: bool = True,
llm_model: Optional[str] = None,
llm_base_url: Optional[str] = None,
llm_temperature: float = 0.2,
llm_max_chars: int = 8000,
llm_timeout: int = 300,
embed_images: bool = False,
ingest_to_weaviate: bool = True,
) -> V1PipelineResult:
"""Pipeline V1 (legacy) - gardé pour compatibilité.
Args:
pdf_path: Chemin du fichier PDF.
output_dir: Dossier de sortie de base.
document_name: Nom du document (optionnel).
api_key: Clé API Mistral.
use_llm: Activer les traitements LLM.
llm_model: Modèle Ollama/Mistral.
llm_base_url: URL Ollama.
llm_temperature: Température LLM.
llm_max_chars: Nombre maximum de caractères à traiter.
llm_timeout: Timeout en secondes pour les appels LLM.
embed_images: Intégrer images en base64.
ingest_to_weaviate: Insérer dans Weaviate.
Returns:
Résultat du pipeline V1 au format V1PipelineResult.
"""
pdf_path = Path(pdf_path).resolve()
if not pdf_path.is_file():
return {"success": False, "error": f"Fichier PDF introuvable : {pdf_path}"}
# Detect file type by extension
file_ext: str = pdf_path.suffix.lower()
is_markdown_file: bool = file_ext == ".md"
doc_name: str = document_name or pdf_path.stem
if doc_name.lower().endswith(".pdf"):
doc_name = doc_name[:-4]
elif doc_name.lower().endswith(".md"):
doc_name = doc_name[:-3]
doc_output_dir: Path = Path(output_dir).resolve() / doc_name
doc_output_dir.mkdir(parents=True, exist_ok=True)
md_path: Path = doc_output_dir / f"{doc_name}.md"
chunks_path: Path = doc_output_dir / f"{doc_name}_chunks.json"
structured_path: Path = doc_output_dir / f"{doc_name}_structured.json"
ocr_path: Path = doc_output_dir / f"{doc_name}_ocr.json"
images_dir: Path = doc_output_dir / "images"
logger.info(f"[V1] Traitement de : {pdf_path}")
try:
client = create_client(api_key)
pdf_bytes: bytes = pdf_path.read_bytes()
ocr_response = run_ocr(client, pdf_bytes, pdf_path.name, include_images=True)
nb_pages: int = len(ocr_response.pages)
cost: float = estimate_ocr_cost(nb_pages)
ocr_json: Dict[str, Any] = serialize_ocr_response(ocr_response)
ocr_path.write_text(json.dumps(ocr_json, ensure_ascii=False, indent=2), encoding="utf-8")
if not embed_images:
images_dir.mkdir(parents=True, exist_ok=True)
image_writer = create_image_writer(images_dir)
else:
image_writer = None
markdown_text: str = build_markdown(ocr_response, embed_images=embed_images, image_writer=image_writer)
md_path.write_text(markdown_text, encoding="utf-8")
hierarchy: DocumentHierarchy = build_hierarchy(markdown_text)
chunks: List[Dict[str, Any]] = cast(List[Dict[str, Any]], flatten_hierarchy(hierarchy))
metadata: LegacyMetadata = extract_document_metadata_legacy(hierarchy, chunks, doc_name)
chunks_data: Dict[str, Any] = {
"document": doc_name,
"pages": nb_pages,
"metadata": metadata,
"hierarchy": hierarchy,
"chunks": chunks,
"pipeline_version": "1.0",
}
chunks_path.write_text(json.dumps(chunks_data, ensure_ascii=False, indent=2), encoding="utf-8")
structured_data_typed: Optional[LLMStructuredResult] = None
structured_data: Optional[Dict[str, Any]] = None
if use_llm:
try:
structured_data_typed = structure_with_llm(
markdown_text, cast(Dict[str, Any], hierarchy),
model=llm_model, base_url=llm_base_url,
temperature=llm_temperature, max_chars=llm_max_chars, timeout=llm_timeout,
)
structured_data = cast(Dict[str, Any], structured_data_typed)
structured_data["llm_model"] = llm_model or "qwen2.5:7b"
structured_path.write_text(json.dumps(structured_data, ensure_ascii=False, indent=2), encoding="utf-8")
except LLMStructureError as e:
logger.warning(f"Erreur LLM : {e}")
structured_data = {"error": str(e)}
weaviate_result: Optional[WeaviateIngestResult] = None
if ingest_to_weaviate:
weaviate_result = ingest_document(doc_name=doc_name, chunks=chunks, metadata=cast(Dict[str, Any], metadata), language="fr")
if weaviate_result.get("success"):
weaviate_path: Path = doc_output_dir / f"{doc_name}_weaviate.json"
weaviate_path.write_text(json.dumps(weaviate_result, ensure_ascii=False, indent=2), encoding="utf-8")
files: Dict[str, Any] = {"markdown": str(md_path), "chunks": str(chunks_path), "ocr": str(ocr_path)}
if use_llm and structured_data:
files["structured"] = str(structured_path)
if weaviate_result and weaviate_result.get("success"):
files["weaviate"] = str(doc_output_dir / f"{doc_name}_weaviate.json")
if not embed_images and images_dir.exists():
files["images"] = [str(f) for f in images_dir.glob("*.png")]
files["images_dir"] = str(images_dir)
return {
"success": True,
"document_name": doc_name,
"output_dir": str(doc_output_dir),
"files": files,
"pages": nb_pages,
"cost": cost,
"metadata": metadata,
"chunks_count": len(chunks),
"weaviate_ingest": weaviate_result,
"pipeline_version": "1.0",
}
except Exception as e:
logger.error(f"Erreur : {e}")
return {"success": False, "document_name": doc_name, "output_dir": str(doc_output_dir), "error": str(e)}
def process_pdf_bytes(
file_bytes: bytes,
filename: str,
output_dir: Path = Path("output"),
progress_callback: OptionalProgressCallback = None,
llm_provider: LLMProvider = "ollama",
**kwargs: Any,
) -> Union[V2PipelineResult, V1PipelineResult]:
"""Process a PDF document from raw bytes.
This function is designed for web application integrations where PDFs are
received as file uploads (e.g., Flask request.files). It creates a temporary
file, processes it through the main pipeline, and cleans up afterward.
The function delegates to process_pdf() internally, supporting all the same
processing options via **kwargs.
Args:
file_bytes: Raw bytes content of the PDF file. Typically obtained from
a file upload in a web framework (e.g., request.files['pdf'].read()).
filename: Original filename of the uploaded PDF. Used to derive the
document name for output files. Should include the .pdf extension.
output_dir: Base output directory. A subdirectory named after the document
will be created for all output files. Defaults to "output".
progress_callback: Optional callback function for real-time progress updates.
Signature: ``callback(step_id: str, status: str, detail: str | None)``.
step_id values: ocr, markdown, metadata, toc, classify, chunking,
cleaning, validation, weaviate.
status values: active, completed, error, skipped.
Used for Server-Sent Events (SSE) in web interfaces.
llm_provider: LLM provider to use for intelligent processing.
"ollama" uses a local Ollama server (free, slower).
"mistral" uses the Mistral API (paid, faster).
Defaults to "ollama".
**kwargs: Additional keyword arguments passed directly to process_pdf().
Common options include:
- use_llm (bool): Enable LLM processing. Defaults to True.
- use_semantic_chunking (bool): Use LLM-based chunking. Defaults to False.
- use_ocr_annotations (bool): Use OCR annotations. Defaults to False.
- ingest_to_weaviate (bool): Insert into Weaviate. Defaults to True.
- skip_ocr (bool): Reuse cached markdown. Defaults to False.
See process_pdf() for the complete list of supported options.
Returns:
Dictionary containing processing results from process_pdf(). The structure
depends on whether V2 or V1 pipeline is used (V2 by default).
V2 Pipeline returns V2PipelineResult:
- success (bool): Whether processing completed successfully.
- document_name (str): Name derived from the uploaded filename.
- output_dir (str): Path to the output directory.
- files (dict): Paths to generated files.
- pages (int): Number of pages in the PDF.
- cost_ocr (float): OCR cost in euros.
- cost_llm (float): LLM cost in euros.
- cost_total (float): Total processing cost.
- metadata (dict): Extracted document metadata.
- toc (list): Hierarchical table of contents.
- chunks_count (int): Number of chunks generated.
- validation (dict | None): Validation results.
- weaviate_ingest (dict | None): Weaviate ingestion results.
- pipeline_version (str): "2.0"
- error (str): Error message if success is False.
Raises:
No exceptions are raised directly. Errors are returned in the result
dictionary with success=False and an error message.
Example:
Flask file upload handler::
from flask import request
from utils.pdf_pipeline import process_pdf_bytes
@app.route('/upload', methods=['POST'])
def upload_pdf():
pdf_file = request.files['document']
file_bytes = pdf_file.read()
result = process_pdf_bytes(
file_bytes,
pdf_file.filename,
llm_provider="ollama",
use_semantic_chunking=True,
)
if result["success"]:
return jsonify({
"document": result["document_name"],
"chunks": result["chunks_count"],
})
else:
return jsonify({"error": result["error"]}), 500
With SSE progress callback::
def emit_progress(step_id: str, status: str, detail: str | None) -> None:
# Send SSE event to client
sse.publish({"step": step_id, "status": status, "detail": detail})
result = process_pdf_bytes(
file_bytes,
filename,
progress_callback=emit_progress,
llm_provider="mistral",
)
Note:
- The function creates a temporary file on disk during processing.
The temporary file is automatically deleted after processing completes,
even if an error occurs.
- For large files, ensure adequate disk space in the system temp directory.
- The document name is derived from the filename stem (without extension).
See Also:
- :func:`process_pdf`: Main processing function called internally.
- :func:`process_pdf_v2`: Direct access to V2 pipeline.
"""
import tempfile
original_name = Path(filename).stem
file_ext = Path(filename).suffix.lower() # Get original extension (.pdf or .md)
# Use appropriate suffix for temp file to ensure correct processing
with tempfile.NamedTemporaryFile(suffix=file_ext, delete=False) as tmp:
tmp.write(file_bytes)
tmp_path = Path(tmp.name)
try:
result = process_pdf(
tmp_path,
output_dir,
document_name=original_name,
progress_callback=progress_callback,
llm_provider=llm_provider,
**kwargs
)
return result
finally:
try:
tmp_path.unlink()
except Exception:
pass