diff --git a/.gitignore b/.gitignore index 49880cf..7e3a70b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ # Agent-generated output directories -generations/ +generations/* +!generations/library_rag/ # Log files logs/ diff --git a/generations/library_rag/flask_app.py b/generations/library_rag/flask_app.py new file mode 100644 index 0000000..f754d8f --- /dev/null +++ b/generations/library_rag/flask_app.py @@ -0,0 +1,2039 @@ +"""Flask web application for Library RAG - Philosophical Text Search. + +This module provides a web interface for the Library RAG application, enabling +users to upload PDF documents, process them through the OCR/LLM pipeline, and +perform semantic searches on the indexed philosophical texts stored in Weaviate. + +Architecture: + The application is built on Flask and connects to a local Weaviate instance + for vector storage and semantic search. PDF processing is handled asynchronously + using background threads with Server-Sent Events (SSE) for real-time progress. + +Routes: + - ``/`` : Home page with collection statistics (passages, authors, works) + - ``/passages`` : Paginated list of all passages with author/work filters + - ``/search`` : Semantic search interface using vector similarity + - ``/upload`` : PDF upload form with processing options + - ``/upload/progress/`` : SSE endpoint for real-time processing updates + - ``/upload/status/`` : JSON endpoint to check job status + - ``/documents`` : List of all processed documents + - ``/documents//view`` : Detailed view of a processed document + - ``/documents/delete/`` : Delete a document and its Weaviate data + - ``/output/`` : Static file server for processed outputs + +SSE Implementation: + The upload progress system uses Server-Sent Events to stream real-time + processing updates to the browser. 
Each processing step emits events:: + + {"type": "step", "step": "OCR", "status": "running", "detail": "Page 1/10"} + {"type": "complete", "redirect": "/documents/doc_name/view"} + {"type": "error", "message": "OCR failed"} + + The SSE endpoint includes keep-alive messages every 30 seconds to maintain + the connection and detect stale jobs. + +Weaviate Connection: + The application uses a context manager ``get_weaviate_client()`` to handle + Weaviate connections. This ensures proper cleanup of connections even when + errors occur. The client connects to localhost:8080 (HTTP) and localhost:50051 + (gRPC) by default. + +Configuration: + - ``SECRET_KEY`` : Flask session secret (set via environment variable) + - ``UPLOAD_FOLDER`` : Directory for processed PDF outputs (default: ./output) + - ``MAX_CONTENT_LENGTH`` : Maximum upload size (default: 50MB) + +Example: + Start the application in development mode:: + + $ python flask_app.py + + Or with production settings:: + + $ export SECRET_KEY="your-production-secret" + $ gunicorn -w 4 flask_app:app + + Access the web interface at http://localhost:5000 + +Dependencies: + - Flask 3.0+ for web framework + - Weaviate Python client for vector database + - utils.pdf_pipeline for PDF processing + - utils.weaviate_ingest for database operations + +See Also: + - ``utils/pdf_pipeline.py`` : PDF processing pipeline + - ``utils/weaviate_ingest.py`` : Weaviate ingestion functions + - ``schema.py`` : Weaviate collection schemas +""" + +import os +import json +import uuid +import threading +import queue +import time +from pathlib import Path +from typing import Any, Dict, Generator, Iterator, List, Optional, Union + +from flask import Flask, render_template, request, jsonify, redirect, url_for, send_from_directory, Response, flash +from contextlib import contextmanager +from werkzeug.utils import secure_filename +from werkzeug.wrappers import Response as WerkzeugResponse +import weaviate +import weaviate.classes.query as wvq + +from 
utils.types import ( + CollectionStats, + ProcessingOptions, + SSEEvent, +) + +app = Flask(__name__) + +# Configuration Flask +app.config["SECRET_KEY"] = os.environ.get("SECRET_KEY", "dev-secret-key-change-in-production") + +# Configuration upload +app.config["UPLOAD_FOLDER"] = Path(__file__).parent / "output" +app.config["MAX_CONTENT_LENGTH"] = 50 * 1024 * 1024 # 50 MB max +ALLOWED_EXTENSIONS = {"pdf", "md"} + +# Stockage des jobs de traitement en cours +processing_jobs: Dict[str, Dict[str, Any]] = {} # {job_id: {"status": str, "queue": Queue, "result": dict}} + +# Stockage des sessions de chat en cours +chat_sessions: Dict[str, Dict[str, Any]] = {} # {session_id: {"status": str, "queue": Queue, "context": list}} + +# ═══════════════════════════════════════════════════════════════════════════════ +# Weaviate Connection +# ═══════════════════════════════════════════════════════════════════════════════ + +@contextmanager +def get_weaviate_client() -> Generator[Optional[weaviate.WeaviateClient], None, None]: + """Context manager for Weaviate connection. + + Yields: + WeaviateClient if connection succeeds, None otherwise. + """ + client: Optional[weaviate.WeaviateClient] = None + try: + client = weaviate.connect_to_local( + host="localhost", + port=8080, + grpc_port=50051, + ) + yield client + except Exception as e: + print(f"Erreur connexion Weaviate: {e}") + yield None + finally: + if client: + client.close() + + +def get_collection_stats() -> Optional[CollectionStats]: + """Get statistics about Weaviate collections. + + Returns: + CollectionStats with passage counts and unique values, or None on error. 
+ """ + try: + with get_weaviate_client() as client: + if client is None: + return None + + stats: CollectionStats = {} + + # Chunk stats (renamed from Passage) + passages = client.collections.get("Chunk") + passage_count = passages.aggregate.over_all(total_count=True) + stats["passages"] = passage_count.total_count or 0 + + # Get unique authors and works (from nested objects) + all_passages = passages.query.fetch_objects(limit=1000) + authors: set[str] = set() + works: set[str] = set() + languages: set[str] = set() + + for obj in all_passages.objects: + # Work is now a nested object with {title, author} + work_obj = obj.properties.get("work") + if work_obj and isinstance(work_obj, dict): + if work_obj.get("author"): + authors.add(str(work_obj["author"])) + if work_obj.get("title"): + works.add(str(work_obj["title"])) + if obj.properties.get("language"): + languages.add(str(obj.properties["language"])) + + stats["authors"] = len(authors) + stats["works"] = len(works) + stats["languages"] = len(languages) + stats["author_list"] = sorted(authors) + stats["work_list"] = sorted(works) + stats["language_list"] = sorted(languages) + + return stats + except Exception as e: + print(f"Erreur stats: {e}") + return None + + +def get_all_passages( + limit: int = 50, + offset: int = 0, +) -> List[Dict[str, Any]]: + """Fetch all passages with pagination. + + Args: + limit: Maximum number of passages to return. + offset: Number of passages to skip (for pagination). + + Returns: + List of passage dictionaries with uuid and properties. + + Note: + Author/work filters are disabled due to Weaviate 1.34.4 limitation: + nested object filtering is not yet supported (GitHub issue #3694). 
+ """ + try: + with get_weaviate_client() as client: + if client is None: + return [] + + chunks = client.collections.get("Chunk") + + result = chunks.query.fetch_objects( + limit=limit, + offset=offset, + return_properties=[ + "text", "sectionPath", "sectionLevel", "chapterTitle", + "canonicalReference", "unitType", "keywords", "orderIndex", "language" + ], + ) + + return [ + { + "uuid": str(obj.uuid), + **obj.properties + } + for obj in result.objects + ] + except Exception as e: + print(f"Erreur passages: {e}") + return [] + + +def search_passages( + query: str, + limit: int = 10, + author_filter: Optional[str] = None, + work_filter: Optional[str] = None, +) -> List[Dict[str, Any]]: + """Semantic search on passages using vector similarity. + + Args: + query: Search query text. + limit: Maximum number of results to return. + author_filter: Filter by author name (uses workAuthor property). + work_filter: Filter by work title (uses workTitle property). + + Returns: + List of passage dictionaries with uuid, similarity, and properties. 
+ """ + try: + with get_weaviate_client() as client: + if client is None: + return [] + + chunks = client.collections.get("Chunk") + + # Build filters using top-level properties (workAuthor, workTitle) + filters: Optional[Any] = None + if author_filter: + filters = wvq.Filter.by_property("workAuthor").equal(author_filter) + if work_filter: + work_filter_obj = wvq.Filter.by_property("workTitle").equal(work_filter) + filters = filters & work_filter_obj if filters else work_filter_obj + + result = chunks.query.near_text( + query=query, + limit=limit, + filters=filters, + return_metadata=wvq.MetadataQuery(distance=True), + return_properties=[ + "text", "sectionPath", "sectionLevel", "chapterTitle", + "canonicalReference", "unitType", "keywords", "orderIndex", "language" + ], + ) + + return [ + { + "uuid": str(obj.uuid), + "distance": obj.metadata.distance if obj.metadata else None, + "similarity": round((1 - obj.metadata.distance) * 100, 1) if obj.metadata and obj.metadata.distance else None, + **obj.properties + } + for obj in result.objects + ] + except Exception as e: + print(f"Erreur recherche: {e}") + return [] + + +# ═══════════════════════════════════════════════════════════════════════════════ +# Routes +# ═══════════════════════════════════════════════════════════════════════════════ + +@app.route("/") +def index() -> str: + """Render the home page with collection statistics. + + Displays an overview of the Library RAG application with statistics about + indexed passages, works, authors, and supported languages from Weaviate. + + Returns: + Rendered HTML template (index.html) with collection statistics including: + - Total passage count + - Number of unique authors and works + - List of available languages + + Note: + If Weaviate connection fails, stats will be None and the template + should handle displaying an appropriate fallback message. 
+ """ + from utils.types import CollectionStats + stats: Optional[CollectionStats] = get_collection_stats() + return render_template("index.html", stats=stats) + + +@app.route("/passages") +def passages() -> str: + """Render the passages list page with pagination and filtering. + + Displays a paginated list of all indexed passages from Weaviate with optional + filtering by author and/or work title. Includes statistics and filter options + in the sidebar. + + Query Parameters: + page (int): Page number for pagination. Defaults to 1. + per_page (int): Number of passages per page. Defaults to 20. + author (str, optional): Filter passages by author name. + work (str, optional): Filter passages by work title. + + Returns: + Rendered HTML template (passages.html) with: + - List of passages for the current page + - Collection statistics for sidebar filters + - Pagination controls + - Current filter state + + Example: + GET /passages?page=2&per_page=50&author=Platon + Returns page 2 with 50 passages per page, filtered by author "Platon". + """ + page: int = request.args.get("page", 1, type=int) + per_page: int = request.args.get("per_page", 20, type=int) + author: Optional[str] = request.args.get("author", None) + work: Optional[str] = request.args.get("work", None) + + # Clean filters + if author == "": + author = None + if work == "": + work = None + + offset: int = (page - 1) * per_page + + from utils.types import CollectionStats + stats: Optional[CollectionStats] = get_collection_stats() + passages_list: List[Dict[str, Any]] = get_all_passages( + limit=per_page, + offset=offset, + ) + + return render_template( + "passages.html", + chunks=passages_list, + stats=stats, + page=page, + per_page=per_page, + author_filter=author, + work_filter=work, + ) + + +@app.route("/search") +def search() -> str: + """Render the semantic search page with vector similarity results. 
+ + Provides a search interface for finding passages using semantic similarity + via Weaviate's near_text query. Results include similarity scores and can + be filtered by author and/or work. + + Query Parameters: + q (str): Search query text. Empty string shows no results. + limit (int): Maximum number of results to return. Defaults to 10. + author (str, optional): Filter results by author name. + work (str, optional): Filter results by work title. + + Returns: + Rendered HTML template (search.html) with: + - Search form with current query + - List of matching passages with similarity percentages + - Collection statistics for filter dropdowns + - Current filter state + + Example: + GET /search?q=la%20mort%20et%20le%20temps&limit=5&author=Heidegger + Returns top 5 semantically similar passages about death and time + by Heidegger. + """ + query: str = request.args.get("q", "") + limit: int = request.args.get("limit", 10, type=int) + author: Optional[str] = request.args.get("author", None) + work: Optional[str] = request.args.get("work", None) + + # Clean filters + if author == "": + author = None + if work == "": + work = None + + from utils.types import CollectionStats + stats: Optional[CollectionStats] = get_collection_stats() + results: List[Dict[str, Any]] = [] + + if query: + results = search_passages( + query=query, + limit=limit, + author_filter=author, + work_filter=work, + ) + + return render_template( + "search.html", + query=query, + results=results, + stats=stats, + limit=limit, + author_filter=author, + work_filter=work, + ) + + +def rag_search(query: str, limit: int = 5) -> List[Dict[str, Any]]: + """Search passages for RAG context with formatted results. + + Wraps the existing search_passages() function but returns results formatted + specifically for RAG prompt construction. Includes author, work, and section + information needed to build context for LLM generation. + + Args: + query: The user's question or search query. 
+ limit: Maximum number of context chunks to retrieve. Defaults to 5. + + Returns: + List of context dictionaries with keys: + - text (str): The passage text content + - author (str): Author name (from workAuthor) + - work (str): Work title (from workTitle) + - section (str): Section path or chapter title + - similarity (float): Similarity score 0-100 + - uuid (str): Weaviate chunk UUID + + Example: + >>> results = rag_search("Qu'est-ce que la vertu ?", limit=3) + >>> results[0]["author"] + 'Platon' + >>> results[0]["work"] + 'République' + """ + import time + start_time = time.time() + + try: + with get_weaviate_client() as client: + if client is None: + print("[RAG Search] Weaviate client unavailable") + return [] + + chunks = client.collections.get("Chunk") + + # Query with properties needed for RAG context + result = chunks.query.near_text( + query=query, + limit=limit, + return_metadata=wvq.MetadataQuery(distance=True), + return_properties=[ + "text", + "workAuthor", # Top-level author property + "workTitle", # Top-level work property + "sectionPath", + "chapterTitle", + "canonicalReference", + ], + ) + + # Format results for RAG prompt construction + formatted_results = [] + for obj in result.objects: + props = obj.properties + similarity = round((1 - obj.metadata.distance) * 100, 1) if obj.metadata and obj.metadata.distance else 0.0 + + formatted_results.append({ + "text": props.get("text", ""), + "author": props.get("workAuthor", "Auteur inconnu"), + "work": props.get("workTitle", "Œuvre inconnue"), + "section": props.get("sectionPath") or props.get("chapterTitle") or "Section inconnue", + "similarity": similarity, + "uuid": str(obj.uuid), + }) + + # Log search metrics + elapsed = time.time() - start_time + print(f"[RAG Search] Query: '{query[:50]}...' 
| Results: {len(formatted_results)} | Time: {elapsed:.2f}s") + + return formatted_results + + except Exception as e: + print(f"[RAG Search] Error: {e}") + return [] + + +def diverse_author_search( + query: str, + limit: int = 10, + initial_pool: int = 100, + max_authors: int = 5, + chunks_per_author: int = 2 +) -> List[Dict[str, Any]]: + """Search passages with author diversity to avoid corpus imbalance bias. + + This function addresses the problem where prolific authors (e.g., Peirce with + 300 works) dominate search results over less represented but equally relevant + authors (e.g., Tiercelin with 1 work). + + Algorithm: + 1. Retrieve large initial pool of chunks (e.g., 100) + 2. Group chunks by author + 3. Compute average similarity score of top-3 chunks per author + 4. Select top-N authors by average score + 5. Extract best chunks from each selected author + 6. Return diversified chunk list + + Args: + query: The user's question or search query. + limit: Maximum number of chunks to return (default: 10). + initial_pool: Size of initial candidate pool (default: 100). + max_authors: Maximum number of distinct authors to include (default: 5). + chunks_per_author: Number of chunks per selected author (default: 2). + + Returns: + List of context dictionaries with keys: + - text (str): The passage text content + - author (str): Author name (from workAuthor) + - work (str): Work title (from workTitle) + - section (str): Section path or chapter title + - similarity (float): Similarity score 0-100 + - uuid (str): Weaviate chunk UUID + + Example: + >>> results = diverse_author_search("Scotus et Peirce", limit=10) + >>> authors = set(r["author"] for r in results) + >>> len(authors) # Multiple authors guaranteed + 5 + >>> [r["author"] for r in results].count("Peirce") # Max chunks_per_author + 2 + + Note: + This prevents a single prolific author from dominating all results. 
+ For "Scotus et Peirce", ensures results from Peirce, Tiercelin, Scotus, + Boler, and other relevant commentators. + """ + import time + start_time = time.time() + + print(f"[Diverse Search] CALLED with query='{query[:50]}...', initial_pool={initial_pool}, max_authors={max_authors}, chunks_per_author={chunks_per_author}") + + try: + # Step 1: Retrieve large initial pool + print(f"[Diverse Search] Calling rag_search with limit={initial_pool}") + candidates = rag_search(query, limit=initial_pool) + print(f"[Diverse Search] rag_search returned {len(candidates)} candidates") + + if not candidates: + print("[Diverse Search] No candidates found, returning empty list") + return [] + + # Step 2: Group chunks by author + by_author: Dict[str, List[Dict[str, Any]]] = {} + for chunk in candidates: + author = chunk.get("author", "Auteur inconnu") + if author not in by_author: + by_author[author] = [] + by_author[author].append(chunk) + + print(f"[Diverse Search] Found {len(by_author)} distinct authors in pool of {len(candidates)} chunks") + + # Step 3: Compute average similarity of top-3 chunks per author + author_scores: Dict[str, float] = {} + for author, chunks in by_author.items(): + # Sort by similarity descending + sorted_chunks = sorted(chunks, key=lambda x: x["similarity"], reverse=True) + # Take top-3 (or all if fewer than 3) + top_chunks = sorted_chunks[:3] + # Average similarity + avg_score = sum(c["similarity"] for c in top_chunks) / len(top_chunks) + author_scores[author] = avg_score + + # Step 4: Select top-N authors by average score + top_authors = sorted(author_scores.items(), key=lambda x: x[1], reverse=True)[:max_authors] + + print(f"[Diverse Search] Top {len(top_authors)} authors: {[author for author, score in top_authors]}") + for author, score in top_authors: + print(f" - {author}: avg_score={score:.1f}%, {len(by_author[author])} chunks in pool") + + # Step 5: Extract best chunks from each selected author + # SMART ALLOCATION: If only 1-2 authors, take 
more chunks per author to reach target limit + num_authors = len(top_authors) + if num_authors == 1: + # Only one author: take up to 'limit' chunks from that author + adaptive_chunks_per_author = limit + print(f"[Diverse Search] Only 1 author found → taking up to {adaptive_chunks_per_author} chunks") + elif num_authors <= 3: + # Few authors (2-3): take more chunks per author + adaptive_chunks_per_author = max(chunks_per_author, limit // num_authors) + print(f"[Diverse Search] Only {num_authors} authors → taking up to {adaptive_chunks_per_author} chunks per author") + else: + # Many authors (4+): stick to original limit for diversity + adaptive_chunks_per_author = chunks_per_author + print(f"[Diverse Search] {num_authors} authors → taking {adaptive_chunks_per_author} chunks per author") + + final_chunks: List[Dict[str, Any]] = [] + for author, avg_score in top_authors: + # Get best chunks for this author + author_chunks = sorted(by_author[author], key=lambda x: x["similarity"], reverse=True) + selected = author_chunks[:adaptive_chunks_per_author] + final_chunks.extend(selected) + + # Cap at limit + final_chunks = final_chunks[:limit] + + # Log final metrics + final_authors = set(c["author"] for c in final_chunks) + elapsed = time.time() - start_time + print(f"[Diverse Search] Final: {len(final_chunks)} chunks from {len(final_authors)} authors | Time: {elapsed:.2f}s") + + return final_chunks + + except Exception as e: + import traceback + print(f"[Diverse Search] EXCEPTION CAUGHT: {e}") + print(f"[Diverse Search] Traceback: {traceback.format_exc()}") + print(f"[Diverse Search] Falling back to standard rag_search with limit={limit}") + # Fallback to standard search + return rag_search(query, limit) + + +def build_prompt_with_context(user_question: str, rag_context: List[Dict[str, Any]]) -> str: + """Build a prompt for LLM generation using RAG context. 
+ + Constructs a comprehensive prompt that includes a system instruction, + formatted RAG context chunks with author/work metadata, and the user's + question. The prompt is designed to work with all LLM providers + (Ollama, Mistral, Anthropic, OpenAI). + + Args: + user_question: The user's question in natural language. + rag_context: List of context dictionaries from rag_search() with keys: + - text: Passage text + - author: Author name + - work: Work title + - section: Section or chapter + - similarity: Similarity score (0-100) + + Returns: + Formatted prompt string ready for LLM generation. + + Example: + >>> context = rag_search("Qu'est-ce que la justice ?", limit=2) + >>> prompt = build_prompt_with_context("Qu'est-ce que la justice ?", context) + >>> print(prompt[:100]) + 'Vous êtes un assistant spécialisé en philosophie...' + """ + # System instruction + system_instruction = """Vous êtes un assistant expert en philosophie. Votre rôle est de fournir des analyses APPROFONDIES et DÉTAILLÉES en vous appuyant sur les passages philosophiques fournis. 
+ +INSTRUCTIONS IMPÉRATIVES : +- Fournissez une réponse LONGUE et DÉVELOPPÉE (minimum 500-800 mots) +- Analysez EN PROFONDEUR tous les aspects de la question +- Citez ABONDAMMENT les passages fournis avec références précises (auteur, œuvre) +- Développez les concepts philosophiques, ne vous contentez PAS de résumés superficiels +- Explorez les NUANCES, les implications, les relations entre les idées +- Structurez votre réponse en sections claires (introduction, développement avec sous-parties, conclusion) +- Si les passages ne couvrent pas tous les aspects, indiquez-le mais développez ce qui est disponible +- Adoptez un style académique rigoureux digne d'une analyse philosophique universitaire +- N'inventez JAMAIS d'informations absentes des passages, mais exploitez à fond celles qui y sont""" + + # Build context section + context_section = "\n\nPASSAGES PHILOSOPHIQUES :\n\n" + + if not rag_context: + context_section += "(Aucun passage trouvé)\n" + else: + for i, chunk in enumerate(rag_context, 1): + author = chunk.get("author", "Auteur inconnu") + work = chunk.get("work", "Œuvre inconnue") + section = chunk.get("section", "") + text = chunk.get("text", "") + similarity = chunk.get("similarity", 0) + + # Truncate very long passages (keep first 2000 chars max per chunk for deep analysis) + if len(text) > 2000: + text = text[:2000] + "..." + + context_section += f"**Passage {i}** [Score de pertinence: {similarity}%]\n" + context_section += f"**Auteur :** {author}\n" + context_section += f"**Œuvre :** {work}\n" + if section: + context_section += f"**Section :** {section}\n" + context_section += f"\n{text}\n\n" + context_section += "---\n\n" + + # User question + question_section = f"\nQUESTION :\n{user_question}\n\n" + + # Final instruction + final_instruction = """CONSIGNE FINALE : +Répondez à cette question en produisant une analyse philosophique COMPLÈTE et APPROFONDIE (minimum 500-800 mots). +Votre réponse doit : +1. 
Commencer par une introduction contextualisant la question +2. Développer une analyse détaillée en plusieurs parties, citant abondamment les passages +3. Explorer les implications philosophiques, les concepts-clés, les relations entre les idées +4. Conclure en synthétisant l'apport des passages à la question posée + +Ne vous limitez PAS à un résumé superficiel. Développez, analysez, approfondissez. C'est une discussion philosophique universitaire, pas un tweet.""" + + # Combine all sections + full_prompt = system_instruction + context_section + question_section + final_instruction + + # Truncate if too long (max ~30000 chars - modern LLMs have 128k+ context windows) + if len(full_prompt) > 30000: + # Reduce number of context chunks + print(f"[Prompt Builder] Warning: Prompt too long ({len(full_prompt)} chars), truncating context") + truncated_context = rag_context[:min(3, len(rag_context))] # Keep only top 3 chunks + return build_prompt_with_context(user_question, truncated_context) + + return full_prompt + + +@app.route("/test-rag") +def test_rag() -> Dict[str, Any]: + """Test endpoint for RAG search function. + + Example: + GET /test-rag?q=vertu&limit=3 + """ + query = request.args.get("q", "Qu'est-ce que la vertu ?") + limit = request.args.get("limit", 5, type=int) + + results = rag_search(query, limit) + + return jsonify({ + "query": query, + "limit": limit, + "results_count": len(results), + "results": results + }) + + +@app.route("/test-prompt") +def test_prompt() -> str: + """Test endpoint for prompt construction with RAG context. + + Example: + GET /test-prompt?q=Qu'est-ce que la justice ?&limit=3 + + Returns: + HTML page displaying the constructed prompt. 
+ """ + query = request.args.get("q", "Qu'est-ce que la vertu ?") + limit = request.args.get("limit", 3, type=int) + + # Get RAG context + rag_context = rag_search(query, limit) + + # Build prompt + prompt = build_prompt_with_context(query, rag_context) + + # Display as preformatted text in HTML + html = f""" + + + + Test Prompt RAG + + + +
+

🧪 Test Prompt Construction RAG

+
+ Question: {query}
+ Contextes RAG: {len(rag_context)} passages
+ Longueur prompt: {len(prompt)} caractères +
+

Prompt généré :

+
{prompt}
+
+ Chunks utilisés :
+ {chr(10).join([f"- {c['author']} - {c['work']} (similarité: {c['similarity']}%)" for c in rag_context])} +
+
+ + + """ + + return html + + +@app.route("/test-llm") +def test_llm() -> WerkzeugResponse: + """Test endpoint for LLM streaming. + + Example: + GET /test-llm?provider=ollama&model=qwen2.5:7b&prompt=Hello + + Returns: + Plain text streamed response. + """ + from utils.llm_chat import call_llm, LLMError + + provider = request.args.get("provider", "ollama") + model = request.args.get("model", "qwen2.5:7b") + prompt = request.args.get("prompt", "Réponds en une phrase: Qu'est-ce que la philosophie ?") + + def generate() -> Iterator[str]: + try: + yield f"[Test LLM Streaming]\n" + yield f"Provider: {provider}\n" + yield f"Model: {model}\n" + yield f"Prompt: {prompt}\n\n" + yield "Response:\n" + + for token in call_llm(prompt, provider, model, stream=True): + yield token + + yield "\n\n[Done]" + + except LLMError as e: + yield f"\n\n[Error] {str(e)}" + except Exception as e: + yield f"\n\n[Unexpected Error] {str(e)}" + + return Response(generate(), mimetype='text/plain') + + +@app.route("/test-chat-backend") +def test_chat_backend() -> str: + """Test page for chat backend.""" + return render_template("test_chat_backend.html") + + +@app.route("/chat") +def chat() -> str: + """Render the conversation RAG interface. + + Provides a ChatGPT-like conversation interface where users can ask questions + in natural language. The system performs RAG (Retrieval-Augmented Generation) + by searching Weaviate for relevant philosophical text chunks and using them + to generate AI-powered answers via multiple LLM providers. 
+
+    Features:
+        - Multi-LLM support: Ollama (local), Mistral API, Anthropic API, OpenAI API
+        - Real-time streaming responses via Server-Sent Events
+        - RAG context sidebar showing relevant chunks used for answer generation
+        - Markdown rendering with code syntax highlighting
+
+    Returns:
+        Rendered HTML template (chat.html) with:
+        - Chat interface with message history
+        - Model selector dropdown
+        - Input area for user questions
+        - Context sidebar for RAG chunks
+
+    Example:
+        GET /chat
+        Returns the conversation interface ready for user interaction.
+    """
+    # Get collection stats for display (optional)
+    stats: Optional[CollectionStats] = get_collection_stats()
+
+    return render_template(
+        "chat.html",
+        stats=stats,
+    )
+
+
+def rerank_rag_chunks(question: str, chunks: List[Dict[str, Any]], provider: str, model: str) -> List[Dict[str, Any]]:
+    """Re-rank RAG chunks using LLM to filter out irrelevant results.
+
+    After semantic search, uses LLM to evaluate which chunks are actually
+    relevant to the question and filters out noise (index pages, tangential mentions, etc.).
+
+    Args:
+        question: The reformulated search query.
+        chunks: List of RAG chunks from semantic search.
+        provider: LLM provider name.
+        model: LLM model name.
+
+    Returns:
+        Filtered list of chunks that are genuinely relevant. Falls back to the
+        full input list when the LLM reply cannot be parsed or when fewer than
+        half of the chunks were selected.
+
+    Example:
+        >>> chunks = rag_search("L'apport de Duns Scotus à Peirce", limit=5)
+        >>> relevant = rerank_rag_chunks("L'apport de Duns Scotus à Peirce", chunks, "mistral", "mistral-small-latest")
+        >>> len(relevant) <= len(chunks)
+        True
+    """
+    from utils.llm_chat import call_llm
+
+    if not chunks or len(chunks) <= 3:
+        return chunks  # Keep all if too few (≤3 chunks)
+
+    # Build reranking prompt
+    # NOTE(review): the prompt asks to keep at least min(10, n) passages here
+    # but also demands "Minimum min(8, n)" numbers below — the two thresholds
+    # disagree; confirm which one is intended.
+    reranking_prompt = f"""Tu es un expert en évaluation de pertinence pour la recherche sémantique.
+
+QUESTION : {question}
+
+PASSAGES À ÉVALUER :
+"""
+
+    for i, chunk in enumerate(chunks, 1):
+        text_preview = chunk.get("text", "")[:400]  # First 400 chars (increased from 300)
+        author = chunk.get("author", "")
+        work = chunk.get("work", "")
+        similarity = chunk.get("similarity", 0)
+        reranking_prompt += f"\n[{i}] ({similarity}%) {author} - {work}\n{text_preview}...\n"
+
+    reranking_prompt += f"""
+TÂCHE : Identifie les numéros des passages pertinents (garde au moins {min(10, len(chunks))} passages).
+
+CRITÈRES (sois TRÈS inclusif) :
+- GARDE : contenu substantiel, analyse, citations, développement
+- GARDE : contexte, introduction, commentaires indirects
+- EXCLUS : index purs, tables des matières vides, bibliographies seules
+- En cas de doute → INCLUS (philosophie = contexte riche nécessaire)
+
+IMPORTANT - FORMAT DE RÉPONSE :
+- Si tous pertinents → réponds exactement : ALL
+- Sinon → réponds UNIQUEMENT les numéros séparés par virgules
+- AUCUN texte explicatif, AUCUN markdown, AUCUNE justification
+- Minimum {min(8, len(chunks))} numéros
+
+EXEMPLES DE RÉPONSES VALIDES :
+- ALL
+- 1,2,3,4,5,6,7,8
+- 1,3,5,7,9,11,13,15
+
+RÉPONSE (numéros UNIQUEMENT) :"""
+
+    # Get LLM evaluation (non-streaming call; tokens are still yielded one by one)
+    response = ""
+    for token in call_llm(reranking_prompt, provider, model, stream=False, temperature=0.2, max_tokens=200):
+        response += token
+
+    response = response.strip()
+
+    # Log LLM response for debugging
+    print(f"[Re-ranking] LLM response: {response}")
+
+    # Clean response: extract only numbers if LLM added markdown/explanations
+    # Common patterns: "**1, 4**" or "1,4\n\n**Explications:**"
+    import re
+    # Extract first line or content before markdown/explanations
+    first_line = response.split('\n')[0].strip()
+    # Remove markdown formatting (**, __, etc.)
+    cleaned = re.sub(r'\*\*|__|~~', '', first_line).strip()
+
+    print(f"[Re-ranking] Cleaned response: {cleaned}")
+
+    # Parse response
+    if cleaned.upper() == "ALL":
+        print(f"[Re-ranking] LLM selected ALL chunks, returning all {len(chunks)} chunks")
+        return chunks  # Return all chunks
+    elif cleaned.upper() == "NONE":
+        print(f"[Re-ranking] LLM selected NONE, returning top 8 by similarity")
+        return chunks[:8]  # Keep top 8 by similarity even if LLM says none
+    else:
+        try:
+            # Parse comma-separated numbers from cleaned response
+            # (1-based in the prompt, converted to 0-based indices here)
+            relevant_indices = [int(num.strip()) - 1 for num in cleaned.split(",") if num.strip().isdigit()]
+            filtered_chunks = [chunks[i] for i in relevant_indices if 0 <= i < len(chunks)]
+
+            print(f"[Re-ranking] LLM selected {len(filtered_chunks)} chunks from {len(chunks)} candidates")
+
+            # Log excluded chunks for debugging
+            excluded_indices = [i for i in range(len(chunks)) if i not in relevant_indices]
+            if excluded_indices:
+                print(f"\n[Re-ranking] ❌ EXCLUDED {len(excluded_indices)} chunks:")
+                for idx in excluded_indices:
+                    chunk = chunks[idx]
+                    author = chunk.get('author', 'Unknown')
+                    work = chunk.get('work', 'Unknown')
+                    text_preview = chunk.get('text', '')[:150].replace('\n', ' ')
+                    similarity = chunk.get('similarity', 0)
+                    print(f"  [{idx+1}] ({similarity}%) {author} - {work}")
+                    print(f"      \"{text_preview}...\"")
+
+            # Ensure minimum of all chunks if too few selected (re-ranking failed)
+            if len(filtered_chunks) < len(chunks) // 2:
+                print(f"[Re-ranking] Too few selected ({len(filtered_chunks)}), keeping ALL {len(chunks)} chunks")
+                return chunks
+
+            # Return filtered chunks (no cap, trust the LLM selection)
+            return filtered_chunks if filtered_chunks else chunks
+        except Exception as e:
+            print(f"[Re-ranking] Parse error: {e}, keeping ALL {len(chunks)} chunks")
+            return chunks
+
+
+def reformulate_question(question: str, provider: str, model: str) -> str:
+    """Reformulate user question for optimal RAG search.
+
+    Takes a potentially informal or poorly worded question and reformulates
+    it into a clear, well-structured search query optimized for semantic search.
+
+    Args:
+        question: Original user question (may be informal).
+        provider: LLM provider name.
+        model: LLM model name.
+
+    Returns:
+        Reformulated question optimized for RAG search.
+
+    Example:
+        >>> reformulate_question("scotus a apporté quoi a Peirce?", "mistral", "mistral-small-latest")
+        "L'apport de Duns Scotus à la philosophie de Charles Sanders Peirce"
+    """
+    from utils.llm_chat import call_llm
+
+    reformulation_prompt = f"""Tu es un expert en recherche philosophique et en reformulation de requêtes pour bases de données textuelles.
+
+Ta tâche : transformer la question suivante en une REQUÊTE LONGUE ET DÉTAILLÉE (plusieurs lignes) qui maximisera la récupération de passages pertinents dans une recherche sémantique.
+
+RÈGLES DE REFORMULATION EXPANSIVE :
+1. Corrige les fautes et formalise le langage
+2. Explicite TOUS les noms propres avec leurs formes complètes et variantes :
+   - Ex: "Scotus" → "Duns Scot, Jean Duns Scot, Scotus"
+   - Ex: "Peirce" → "Charles Sanders Peirce, C.S. Peirce"
+3. DÉVELOPPE la question en problématique philosophique (3-5 lignes) :
+   - Identifie les concepts clés impliqués
+   - Mentionne les contextes philosophiques pertinents
+   - Évoque les filiations intellectuelles (qui a influencé qui, écoles de pensée)
+   - Suggère des thèmes connexes (métaphysique, logique, sémiotique, réalisme vs nominalisme, etc.)
+4. Utilise un vocabulaire RICHE en synonymes et termes techniques
+5. "Ratisse large" pour capturer un maximum de passages pertinents
+
+OBJECTIF : Ta reformulation doit être un texte de 4-6 lignes qui explore tous les angles de la question pour que la recherche sémantique trouve TOUS les passages pertinents possibles.
+
+QUESTION ORIGINALE :
+{question}
+
+REFORMULATION EXPANSIVE (4-6 lignes de texte détaillé, sans explication supplémentaire) :"""
+
+    reformulated = ""
+    for token in call_llm(reformulation_prompt, provider, model, stream=False, temperature=0.3, max_tokens=500):
+        reformulated += token
+
+    return reformulated.strip()
+
+
+def run_chat_generation(
+    session_id: str,
+    question: str,
+    provider: str,
+    model: str,
+    limit: int,
+    use_reformulation: bool = True,
+) -> None:
+    """Execute RAG search and LLM generation in background thread.
+
+    Pipeline:
+        1. Reformulate question for optimal RAG search (optional)
+        2. RAG search with chosen question version
+        3. Build prompt with context
+        4. Stream LLM response
+
+    Args:
+        session_id: Unique session identifier.
+        question: User's question (may be original or reformulated).
+        provider: LLM provider name.
+        model: LLM model name.
+        limit: Number of RAG context chunks to retrieve. NOTE(review): not
+            referenced anywhere in this body — the hard-coded limit=25 of
+            diverse_author_search is used instead; confirm this is intended.
+        use_reformulation: Whether reformulation was used (for display purposes).
+            NOTE(review): also unused in this body.
+    """
+    session: Dict[str, Any] = chat_sessions[session_id]
+    q: queue.Queue[Dict[str, Any]] = session["queue"]
+
+    try:
+        # NOTE(review): LLMError is only bound if this import succeeds; if the
+        # import itself raises, the "except LLMError" clause below would raise
+        # NameError. Consider a module-level import.
+        from utils.llm_chat import call_llm, LLMError
+
+        # Note: Reformulation is now done separately via /chat/reformulate endpoint
+        # The question parameter here is the final chosen version (original or reformulated)
+
+        # Step 1: Diverse author search (avoids corpus imbalance bias)
+        session["status"] = "searching"
+        rag_context = diverse_author_search(
+            query=question,
+            limit=25,  # Get 25 diverse chunks
+            initial_pool=200,  # LARGE pool to find all relevant authors (increased from 100)
+            max_authors=8,  # Include up to 8 distinct authors (increased from 6)
+            chunks_per_author=3  # Max 3 chunks per author for balance
+        )
+
+        print(f"[Pipeline] diverse_author_search returned {len(rag_context)} chunks")
+        if rag_context:
+            authors = list(set(c.get('author', 'Unknown') for c in rag_context))
+            print(f"[Pipeline] Authors in rag_context: {authors}")
+
+        # Step 1.5: Re-rank chunks to filter out irrelevant results
+        session["status"] = "reranking"
+        filtered_context = rerank_rag_chunks(question, rag_context, provider, model)
+
+        print(f"[Pipeline] rerank_rag_chunks returned {len(filtered_context)} chunks")
+        if filtered_context:
+            authors = list(set(c.get('author', 'Unknown') for c in filtered_context))
+            print(f"[Pipeline] Authors in filtered_context: {authors}")
+
+        # Send filtered context to client
+        context_event: Dict[str, Any] = {
+            "type": "context",
+            "chunks": filtered_context
+        }
+        q.put(context_event)
+
+        # Store context in session
+        session["context"] = filtered_context
+
+        # Step 3: Build prompt (use ORIGINAL question for natural response, filtered context)
+        session["status"] = "generating"
+        prompt = build_prompt_with_context(question, filtered_context)
+
+        # Step 4: Stream LLM response
+        for token in call_llm(prompt, provider, model, stream=True):
+            token_event: Dict[str, Any] = {
+                "type": "token",
+                "content": token
+            }
+            q.put(token_event)
+
+        # Send completion event
+        session["status"] = "complete"
+        complete_event: Dict[str, Any] = {
+            "type": "complete"
+        }
+        q.put(complete_event)
+
+    except LLMError as e:
+        session["status"] = "error"
+        error_event: Dict[str, Any] = {
+            "type": "error",
+            "message": f"Erreur LLM: {str(e)}"
+        }
+        q.put(error_event)
+
+    except Exception as e:
+        session["status"] = "error"
+        error_event: Dict[str, Any] = {
+            "type": "error",
+            "message": f"Erreur: {str(e)}"
+        }
+        q.put(error_event)
+
+
+@app.route("/chat/reformulate", methods=["POST"])
+def chat_reformulate() -> tuple[Dict[str, Any], int]:
+    """Reformulate user question for optimal RAG search.
+
+    Accepts JSON body with user question and LLM configuration,
+    returns both original and reformulated versions.
+
+    Request Body (JSON):
+        question (str): User's question.
+        provider (str): LLM provider ("ollama", "mistral", "anthropic", "openai").
+        model (str): Model name.
+
+    Returns:
+        JSON response with original and reformulated questions.
+
+    Example:
+        POST /chat/reformulate
+        {
+            "question": "scotus a apporté quoi a Peirce?",
+            "provider": "ollama",
+            "model": "qwen2.5:7b"
+        }
+
+        Response:
+        {
+            "original": "scotus a apporté quoi a Peirce?",
+            "reformulated": "L'apport de Duns Scotus à Charles Sanders Peirce..."
+        }
+    """
+    data = request.get_json()
+
+    # Validate input
+    if not data:
+        return {"error": "JSON body required"}, 400
+
+    question = data.get("question", "").strip()
+    if not question:
+        return {"error": "Question is required"}, 400
+
+    if len(question) > 2000:
+        return {"error": "Question too long (max 2000 chars)"}, 400
+
+    provider = data.get("provider", "ollama").lower()
+    valid_providers = ["ollama", "mistral", "anthropic", "openai"]
+    if provider not in valid_providers:
+        return {"error": f"Invalid provider. Must be one of: {', '.join(valid_providers)}"}, 400
+
+    model = data.get("model", "")
+    if not model:
+        return {"error": "Model is required"}, 400
+
+    try:
+        # Reformulate question
+        reformulated = reformulate_question(question, provider, model)
+
+        return {
+            "original": question,
+            "reformulated": reformulated
+        }, 200
+
+    except Exception as e:
+        return {"error": f"Reformulation failed: {str(e)}"}, 500
+
+
+@app.route("/chat/send", methods=["POST"])
+def chat_send() -> tuple[Dict[str, Any], int]:
+    """Handle user question and initiate RAG + LLM generation.
+
+    Accepts JSON body with user question and LLM configuration,
+    creates a background thread for RAG search and LLM generation,
+    and returns a session ID for SSE streaming.
+
+    Request Body (JSON):
+        question (str): User's question.
+        provider (str): LLM provider ("ollama", "mistral", "anthropic", "openai").
+        model (str): Model name.
+        limit (int, optional): Number of RAG chunks. Defaults to 5.
+        use_reformulation (bool, optional): Use reformulated question. Defaults to True.
+
+    Returns:
+        JSON response with session_id and status.
+
+    Example:
+        POST /chat/send
+        {
+            "question": "Qu'est-ce que la vertu ?",
+            "provider": "ollama",
+            "model": "qwen2.5:7b",
+            "limit": 5,
+            "use_reformulation": true
+        }
+
+        Response:
+        {
+            "session_id": "uuid-here",
+            "status": "streaming"
+        }
+    """
+    data = request.get_json()
+
+    # Validate input
+    if not data:
+        return {"error": "JSON body required"}, 400
+
+    question = data.get("question", "").strip()
+    if not question:
+        return {"error": "Question is required"}, 400
+
+    if len(question) > 2000:
+        return {"error": "Question too long (max 2000 chars)"}, 400
+
+    provider = data.get("provider", "ollama").lower()
+    valid_providers = ["ollama", "mistral", "anthropic", "openai"]
+    if provider not in valid_providers:
+        return {"error": f"Invalid provider. Must be one of: {', '.join(valid_providers)}"}, 400
+
+    model = data.get("model", "")
+    if not model:
+        return {"error": "Model is required"}, 400
+
+    limit = data.get("limit", 5)
+    if not isinstance(limit, int) or limit < 1 or limit > 10:
+        return {"error": "Limit must be between 1 and 10"}, 400
+
+    use_reformulation = data.get("use_reformulation", True)
+
+    # Create session
+    # NOTE(review): entries are never removed from chat_sessions in the code
+    # visible here, so a long-running process accumulates sessions.
+    session_id = str(uuid.uuid4())
+    chat_sessions[session_id] = {
+        "status": "initializing",
+        "queue": queue.Queue(),
+        "context": [],
+        "question": question,
+        "provider": provider,
+        "model": model,
+    }
+
+    # Start background thread (daemon: does not block interpreter shutdown)
+    thread = threading.Thread(
+        target=run_chat_generation,
+        args=(session_id, question, provider, model, limit, use_reformulation),
+        daemon=True,
+    )
+    thread.start()
+
+    return {
+        "session_id": session_id,
+        "status": "streaming"
+    }, 200
+
+
+@app.route("/chat/stream/<session_id>")
+def chat_stream(session_id: str) -> WerkzeugResponse:
+    """Server-Sent Events endpoint for streaming LLM responses.
+
+    Streams events from the chat generation background thread to the client
+    using Server-Sent Events (SSE). Events include RAG context, LLM tokens,
+    completion, and errors.
+
+    Args:
+        session_id: Unique session identifier from POST /chat/send.
+
+    Event Types:
+        - context: RAG chunks used for generation
+        - token: Individual LLM output token
+        - complete: Generation finished successfully
+        - error: Error occurred during generation
+
+    Returns:
+        SSE stream response.
+
+    Example:
+        GET /chat/stream/uuid-here
+
+        Event stream:
+        data: {"type": "context", "chunks": [...]}
+
+        data: {"type": "token", "content": "La"}
+
+        data: {"type": "token", "content": " philosophie"}
+
+        data: {"type": "complete"}
+    """
+    if session_id not in chat_sessions:
+        def error_stream() -> Iterator[str]:
+            yield f"data: {json.dumps({'type': 'error', 'message': 'Session not found'})}\n\n"
+        return Response(error_stream(), mimetype='text/event-stream')
+
+    session: Dict[str, Any] = chat_sessions[session_id]
+    q: queue.Queue[Dict[str, Any]] = session["queue"]
+
+    def generate_events() -> Iterator[str]:
+        """Generate SSE events from queue."""
+        last_keepalive = time.time()
+        keepalive_interval = 30  # seconds
+
+        while True:
+            try:
+                # Non-blocking get with timeout for keep-alive
+                try:
+                    event = q.get(timeout=1)
+
+                    # Send event to client
+                    yield f"data: {json.dumps(event)}\n\n"
+
+                    # If complete or error, end stream
+                    if event["type"] in ["complete", "error"]:
+                        break
+
+                except queue.Empty:
+                    # Send keep-alive if needed (SSE comment line, ignored by clients)
+                    now = time.time()
+                    if now - last_keepalive > keepalive_interval:
+                        yield f": keepalive\n\n"
+                        last_keepalive = now
+
+                    # End the stream if the background worker already recorded an
+                    # error. NOTE(review): the original comment claimed a 5-minute
+                    # staleness check, but no such timer exists in this code.
+                    if session.get("status") == "error":
+                        break
+
+            except GeneratorExit:
+                # Client disconnected
+                break
+
+    return Response(
+        generate_events(),
+        mimetype='text/event-stream',
+        headers={
+            'Cache-Control': 'no-cache',
+            'X-Accel-Buffering': 'no',
+        }
+    )
+
+
+# ═══════════════════════════════════════════════════════════════════════════════
+# PDF Upload & Processing
+# ═══════════════════════════════════════════════════════════════════════════════
+
+def allowed_file(filename: str) -> bool:
+    """Check if file has an allowed extension.
+
+    Args:
+        filename: The filename to check.
+
+    Returns:
+        True if the file extension is allowed, False otherwise.
+    """
+    # ALLOWED_EXTENSIONS is defined earlier in this module (not visible here).
+    return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS
+
+
+def run_processing_job(
+    job_id: str,
+    file_bytes: bytes,
+    filename: str,
+    options: ProcessingOptions,
+) -> None:
+    """Execute PDF processing in background with SSE event emission.
+
+    Args:
+        job_id: Unique identifier for this processing job.
+        file_bytes: Raw PDF file content.
+        filename: Original filename for the PDF.
+        options: Processing options (LLM settings, OCR options, etc.).
+    """
+    job: Dict[str, Any] = processing_jobs[job_id]
+    q: queue.Queue[SSEEvent] = job["queue"]
+
+    try:
+        from utils.pdf_pipeline import process_pdf_bytes
+
+        # Callback used by the pipeline to push progress events onto the SSE queue
+        def progress_callback(step: str, status: str, detail: Optional[str] = None) -> None:
+            event: SSEEvent = {
+                "type": "step",
+                "step": step,
+                "status": status,
+                "detail": detail
+            }
+            q.put(event)
+
+        # Process the PDF, reporting progress through the callback
+        from utils.types import V2PipelineResult, V1PipelineResult, LLMProvider
+        from typing import Union, cast
+        result: Union[V2PipelineResult, V1PipelineResult] = process_pdf_bytes(
+            file_bytes,
+            filename,
+            output_dir=app.config["UPLOAD_FOLDER"],
+            skip_ocr=options["skip_ocr"],
+            use_llm=options["use_llm"],
+            llm_provider=cast(LLMProvider, options["llm_provider"]),
+            llm_model=options["llm_model"],
+            ingest_to_weaviate=options["ingest_weaviate"],
+            use_ocr_annotations=options["use_ocr_annotations"],
+            max_toc_pages=options["max_toc_pages"],
+            progress_callback=progress_callback,
+        )
+
+        job["result"] = result
+
+        if result.get("success"):
+            job["status"] = "complete"
+            doc_name: str = result.get("document_name", Path(filename).stem)
+            complete_event: SSEEvent = {
+                "type": "complete",
+                "redirect": f"/documents/{doc_name}/view"
+            }
+            q.put(complete_event)
+        else:
+            job["status"] = "error"
+            error_event: SSEEvent = {
+                "type": "error",
+                "message": result.get("error", "Erreur inconnue")
+            }
+            q.put(error_event)
+
+    except Exception as e:
+        job["status"] = "error"
+        job["result"] = {"error": str(e)}
+        exception_event: SSEEvent = {
+            "type": "error",
+            "message": str(e)
+        }
+        q.put(exception_event)
+
+
+@app.route("/upload", methods=["GET", "POST"])
+def upload() -> str:
+    """Handle PDF upload form display and file submission.
+
+    GET: Displays the upload form with processing options.
+    POST: Validates the uploaded PDF, starts background processing, and
+    redirects to the progress page.
+
+    Form Parameters (POST):
+        file: PDF file to upload (required, max 50MB).
+        llm_provider (str): LLM provider - "mistral" or "ollama". Defaults to "mistral".
+        llm_model (str): Specific model name. Defaults based on provider.
+        skip_ocr (bool): Skip OCR if markdown already exists. Defaults to False.
+        use_llm (bool): Enable LLM processing steps. Defaults to True.
+        ingest_weaviate (bool): Ingest chunks to Weaviate. Defaults to True.
+        use_ocr_annotations (bool): Use OCR annotations for better TOC. Defaults to False.
+        max_toc_pages (int): Max pages to scan for TOC. Defaults to 8.
+
+    Returns:
+        GET: Rendered upload form (upload.html).
+        POST (success): Rendered progress page (upload_progress.html) with job_id.
+        POST (error): Rendered upload form with error message.
+
+    Note:
+        Processing runs in a background thread. Use the /upload/progress/<job_id>
+        SSE endpoint to monitor progress in real-time.
+    """
+    if request.method == "GET":
+        return render_template("upload.html")
+
+    # POST: process the submitted file
+    if "file" not in request.files:
+        return render_template("upload.html", error="Aucun fichier sélectionné")
+
+    file = request.files["file"]
+
+    if not file.filename or file.filename == "":
+        return render_template("upload.html", error="Aucun fichier sélectionné")
+
+    if not allowed_file(file.filename):
+        return render_template("upload.html", error="Format non supporté. Utilisez un fichier PDF ou Markdown (.md).")
+
+    # Processing options
+    llm_provider: str = request.form.get("llm_provider", "mistral")
+    default_model: str = "mistral-small-latest" if llm_provider == "mistral" else "qwen2.5:7b"
+
+    options: Dict[str, Any] = {
+        "skip_ocr": request.form.get("skip_ocr") == "on",
+        "use_llm": request.form.get("use_llm", "on") == "on",
+        "llm_provider": llm_provider,
+        "llm_model": request.form.get("llm_model", default_model) or default_model,
+        "ingest_weaviate": request.form.get("ingest_weaviate", "on") == "on",
+        "use_ocr_annotations": request.form.get("use_ocr_annotations") == "on",
+        "max_toc_pages": int(request.form.get("max_toc_pages", "8")),
+    }
+
+    # Read the whole file into memory (bounded by MAX_CONTENT_LENGTH)
+    filename: str = secure_filename(file.filename)
+    file_bytes: bytes = file.read()
+
+    # Create a processing job
+    job_id: str = str(uuid.uuid4())
+    processing_jobs[job_id] = {
+        "status": "processing",
+        "queue": queue.Queue(),
+        "result": None,
+        "filename": filename,
+    }
+
+    # Start processing in the background
+    thread: threading.Thread = threading.Thread(
+        target=run_processing_job,
+        args=(job_id, file_bytes, filename, options)
+    )
+    thread.daemon = True
+    thread.start()
+
+    # Show the progress page
+    return render_template("upload_progress.html", job_id=job_id, filename=filename)
+
+
+@app.route("/upload/progress/<job_id>")
+def upload_progress(job_id: str) -> Response:
+    """SSE endpoint for real-time processing progress updates.
+
+    Streams Server-Sent Events to the client with processing step updates,
+    completion status, or error messages.
+
+    Args:
+        job_id: Unique identifier for the processing job.
+
+    Returns:
+        Response with text/event-stream mimetype for SSE communication.
+    """
+    def generate() -> Generator[str, None, None]:
+        """Generate SSE events from the processing job queue.
+
+        Yields:
+            SSE-formatted strings containing JSON event data.
+        """
+        if job_id not in processing_jobs:
+            error_event: SSEEvent = {"type": "error", "message": "Job non trouvé"}
+            yield f"data: {json.dumps(error_event)}\n\n"
+            return
+
+        job: Dict[str, Any] = processing_jobs[job_id]
+        q: queue.Queue[SSEEvent] = job["queue"]
+
+        while True:
+            try:
+                # Wait for an event (30 s timeout so keep-alives can be sent)
+                event: SSEEvent = q.get(timeout=30)
+                yield f"data: {json.dumps(event)}\n\n"
+
+                # Stop once the job has finished
+                if event.get("type") in ("complete", "error"):
+                    break
+
+            except queue.Empty:
+                # Send a keep-alive
+                keepalive_event: SSEEvent = {"type": "keepalive"}
+                yield f"data: {json.dumps(keepalive_event)}\n\n"
+
+                # Stop if the job is no longer active
+                if job["status"] != "processing":
+                    break
+
+    return Response(
+        generate(),
+        mimetype="text/event-stream",
+        headers={
+            "Cache-Control": "no-cache",
+            "X-Accel-Buffering": "no",
+        }
+    )
+
+
+@app.route("/upload/status/<job_id>")
+def upload_status(job_id: str) -> Response:
+    """Check the status of a PDF processing job via JSON API.
+
+    Provides a polling endpoint for clients that cannot use SSE to check
+    job completion status. Returns JSON with status and redirect URL or
+    error message.
+
+    Args:
+        job_id: UUID of the processing job to check.
+
+    Returns:
+        JSON response with one of the following structures:
+        - ``{"status": "not_found"}`` if job_id is invalid
+        - ``{"status": "processing"}`` if job is still running
+        - ``{"status": "complete", "redirect": "/documents/<name>/view"}`` on success
+        - ``{"status": "error", "message": "<error>"}`` on failure
+
+    Note:
+        Prefer using the SSE endpoint /upload/progress/<job_id> for real-time
+        updates instead of polling this endpoint.
+    """
+    if job_id not in processing_jobs:
+        return jsonify({"status": "not_found"})
+
+    job: Dict[str, Any] = processing_jobs[job_id]
+
+    if job["status"] == "complete":
+        result: Dict[str, Any] = job.get("result", {})
+        doc_name: str = result.get("document_name", "")
+        return jsonify({
+            "status": "complete",
+            "redirect": f"/documents/{doc_name}/view"
+        })
+    elif job["status"] == "error":
+        return jsonify({
+            "status": "error",
+            "message": job.get("result", {}).get("error", "Erreur inconnue")
+        })
+    else:
+        return jsonify({"status": "processing"})
+
+
+@app.route("/output/<path:filepath>")
+def serve_output(filepath: str) -> Response:
+    """Serve static files from the output directory.
+
+    Provides access to processed document files including markdown, JSON,
+    and extracted images. Used by document view templates to display
+    document content and images.
+
+    Args:
+        filepath: Relative path within the output folder (e.g., "doc_name/images/page_1.png").
+
+    Returns:
+        File contents with appropriate MIME type, or 404 if file not found.
+
+    Example:
+        GET /output/mon_document/images/page_1.png
+        Returns the PNG image file for page 1 of "mon_document".
+
+    Security:
+        Files are served from UPLOAD_FOLDER only. Path traversal is handled
+        by Flask's send_from_directory.
+    """
+    return send_from_directory(app.config["UPLOAD_FOLDER"], filepath)
+
+
+@app.route("/documents/delete/<doc_name>", methods=["POST"])
+def delete_document(doc_name: str) -> WerkzeugResponse:
+    """Delete a document and all associated data.
+
+    Removes a processed document from both the local filesystem and Weaviate
+    database. Handles partial deletion gracefully, providing appropriate
+    flash messages for each scenario.
+
+    Deletion order:
+        1. Delete passages and sections from Weaviate
+        2. Delete local files (markdown, chunks, images)
+        3. Flash appropriate success/warning/error message
+
+    Args:
+        doc_name: Name of the document directory to delete.
+
+    Returns:
+        Redirect to documents list page with flash message indicating result.
+
+    Note:
+        This action is irreversible. Both Weaviate data and local files
+        will be permanently deleted.
+
+    Flash Messages:
+        - success: Document fully deleted
+        - warning: Partial deletion (files or Weaviate only)
+        - error: Document not found or deletion failed
+    """
+    import shutil
+    import logging
+    from utils.weaviate_ingest import delete_document_chunks
+
+    logger = logging.getLogger(__name__)
+    output_dir: Path = app.config["UPLOAD_FOLDER"]
+    doc_dir: Path = output_dir / doc_name
+
+    files_deleted: bool = False
+    weaviate_deleted: bool = False
+
+    # 1. Delete from Weaviate first
+    from utils.weaviate_ingest import DeleteResult
+    weaviate_result: DeleteResult = delete_document_chunks(doc_name)
+
+    if weaviate_result.get("success"):
+        deleted_chunks: int = weaviate_result.get("deleted_chunks", 0)
+        deleted_summaries: int = weaviate_result.get("deleted_summaries", 0)
+        deleted_document: bool = weaviate_result.get("deleted_document", False)
+
+        if deleted_chunks > 0 or deleted_summaries > 0 or deleted_document:
+            weaviate_deleted = True
+            logger.info(f"Weaviate : {deleted_chunks} chunks, {deleted_summaries} summaries supprimés pour '{doc_name}'")
+        else:
+            logger.info(f"Aucune donnée Weaviate trouvée pour '{doc_name}'")
+    else:
+        error_msg: str = weaviate_result.get("error", "Erreur inconnue")
+        logger.warning(f"Erreur Weaviate lors de la suppression de '{doc_name}': {error_msg}")
+
+    # 2. Delete local files
+    if doc_dir.exists() and doc_dir.is_dir():
+        try:
+            shutil.rmtree(doc_dir)
+            files_deleted = True
+            logger.info(f"Fichiers locaux supprimés : {doc_dir}")
+        except Exception as e:
+            logger.error(f"Erreur suppression fichiers pour '{doc_name}': {e}")
+            flash(f"Erreur lors de la suppression des fichiers : {e}", "error")
+            return redirect(url_for("documents"))
+    else:
+        logger.warning(f"Dossier '{doc_name}' introuvable localement")
+
+    # 3. Flash feedback messages covering each full/partial deletion scenario
+    if files_deleted and weaviate_deleted:
+        deleted_chunks = weaviate_result.get("deleted_chunks", 0)
+        flash(f"✓ Document « {doc_name} » supprimé : {deleted_chunks} chunks supprimés de Weaviate", "success")
+    elif files_deleted and not weaviate_result.get("success"):
+        error_msg = weaviate_result.get("error", "Erreur inconnue")
+        flash(f"⚠ Fichiers supprimés, mais erreur Weaviate : {error_msg}", "warning")
+    elif files_deleted:
+        flash(f"✓ Document « {doc_name} » supprimé (aucune donnée Weaviate trouvée)", "success")
+    elif weaviate_deleted:
+        flash(f"⚠ Données Weaviate supprimées, mais fichiers locaux introuvables", "warning")
+    else:
+        flash(f"✗ Erreur : Document « {doc_name} » introuvable", "error")
+
+    return redirect(url_for("documents"))
+
+
+@app.route("/documents/<doc_name>/view")
+def view_document(doc_name: str) -> Union[str, WerkzeugResponse]:
+    """Display detailed view of a processed document.
+
+    Shows comprehensive information about a processed document including
+    metadata, table of contents, chunks, extracted images, and Weaviate
+    ingestion status.
+
+    Args:
+        doc_name: Name of the document directory to view.
+
+    Returns:
+        Rendered HTML template (document_view.html) with document data, or
+        redirect to documents list if document not found.
+
+    Template Context:
+        result (dict): Contains:
+            - document_name: Directory name
+            - output_dir: Full path to document directory
+            - files: Dict of available files (markdown, chunks, images, etc.)
+            - metadata: Extracted metadata (title, author, year, language)
+            - pages: Total page count
+            - chunks_count: Number of text chunks
+            - chunks: List of chunk data
+            - toc: Hierarchical table of contents
+            - flat_toc: Flattened TOC for navigation
+            - weaviate_ingest: Ingestion results if available
+            - cost: Processing cost (0 for legacy documents)
+    """
+    output_dir: Path = app.config["UPLOAD_FOLDER"]
+    doc_dir: Path = output_dir / doc_name
+
+    if not doc_dir.exists():
+        return redirect(url_for("documents"))
+
+    # Load all of the document's data
+    result: Dict[str, Any] = {
+        "document_name": doc_name,
+        "output_dir": str(doc_dir),
+        "files": {},
+        "metadata": {},
+        "weaviate_ingest": None,
+    }
+
+    # Files
+    md_file: Path = doc_dir / f"{doc_name}.md"
+    chunks_file: Path = doc_dir / f"{doc_name}_chunks.json"
+    structured_file: Path = doc_dir / f"{doc_name}_structured.json"
+    weaviate_file: Path = doc_dir / f"{doc_name}_weaviate.json"
+    images_dir: Path = doc_dir / "images"
+
+    result["files"]["markdown"] = str(md_file) if md_file.exists() else None
+    result["files"]["chunks"] = str(chunks_file) if chunks_file.exists() else None
+    result["files"]["structured"] = str(structured_file) if structured_file.exists() else None
+    result["files"]["weaviate"] = str(weaviate_file) if weaviate_file.exists() else None
+
+    if images_dir.exists():
+        result["files"]["images"] = [str(f) for f in images_dir.glob("*.png")]
+
+    # Load metadata, chunks and TOC from chunks.json
+    if chunks_file.exists():
+        try:
+            with open(chunks_file, "r", encoding="utf-8") as f:
+                chunks_data: Dict[str, Any] = json.load(f)
+                result["metadata"] = chunks_data.get("metadata", {})
+                result["pages"] = chunks_data.get("pages", 0)
+                result["chunks_count"] = len(chunks_data.get("chunks", []))
+                # Load the full chunks
+                result["chunks"] = chunks_data.get("chunks", [])
+                # Load the hierarchical TOC
+                result["toc"] = chunks_data.get("toc", [])
+                result["flat_toc"] = chunks_data.get("flat_toc", [])
+                # Fall back to metadata.toc if toc does not exist at the root level
+                if not result["toc"] and result["metadata"].get("toc"):
+                    result["toc"] = result["metadata"]["toc"]
+        except Exception:
+            result["pages"] = 0
+            result["chunks_count"] = 0
+            result["chunks"] = []
+            result["toc"] = []
+            result["flat_toc"] = []
+
+    # Load the Weaviate ingestion report
+    if weaviate_file.exists():
+        try:
+            with open(weaviate_file, "r", encoding="utf-8") as f:
+                result["weaviate_ingest"] = json.load(f)
+        except Exception:
+            pass
+
+    result["cost"] = 0  # Not available for legacy documents
+
+    return render_template("document_view.html", result=result)
+
+
+@app.route("/documents")
+def documents() -> str:
+    """Render the list of all processed documents.
+
+    Queries Weaviate to get actual document statistics from the database,
+    not from the local files.
+
+    Returns:
+        Rendered HTML template (documents.html) with list of document info.
+
+    Template Context:
+        documents (list): List of document dictionaries, each containing:
+            - name: Document source ID (from Weaviate)
+            - path: Full path to document directory (if exists)
+            - has_markdown: Whether markdown file exists
+            - has_chunks: Whether chunks JSON exists
+            - has_structured: Whether structured JSON exists
+            - has_images: Whether images directory has content
+            - image_count: Number of extracted PNG images
+            - metadata: Extracted document metadata
+            - pages: Page count
+            - chunks_count: Number of chunks IN WEAVIATE (not file)
+            - title: Document title (from Weaviate)
+            - author: Document author (from Weaviate)
+            - toc: Table of contents (from metadata)
+    """
+    output_dir: Path = app.config["UPLOAD_FOLDER"]
+    documents_list: List[Dict[str, Any]] = []
+
+    # Query Weaviate to get actual documents and their stats
+    documents_from_weaviate: Dict[str, Dict[str, Any]] = {}
+
+    with get_weaviate_client() as client:
+        if client is not None:
+            # Get chunk counts and authors
+            chunk_collection = client.collections.get("Chunk")
+
+            for obj in chunk_collection.iterator(include_vector=False):
+                props = obj.properties
+                # NOTE(review): `cast` is imported inside this loop; if the Chunk
+                # collection is empty the name is never bound and the Summary loop
+                # below would raise NameError (silently swallowed by its
+                # `except Exception`). Consider a module-level import.
+                from typing import cast
+                doc_obj = cast(Dict[str, Any], props.get("document", {}))
+                work_obj = cast(Dict[str, Any], props.get("work", {}))
+
+                if doc_obj:
+                    source_id = doc_obj.get("sourceId", "")
+                    if source_id:
+                        if source_id not in documents_from_weaviate:
+                            documents_from_weaviate[source_id] = {
+                                "source_id": source_id,
+                                "title": work_obj.get("title") if work_obj else "Unknown",
+                                "author": work_obj.get("author") if work_obj else "Unknown",
+                                "chunks_count": 0,
+                                "summaries_count": 0,
+                                "authors": set(),
+                            }
+                        documents_from_weaviate[source_id]["chunks_count"] += 1
+
+                        # Track unique authors
+                        author = work_obj.get("author") if work_obj else None
+                        if author:
+                            documents_from_weaviate[source_id]["authors"].add(author)
+
+            # Get summary counts
+            try:
+                summary_collection = client.collections.get("Summary")
+                for obj in summary_collection.iterator(include_vector=False):
+                    props = obj.properties
+                    doc_obj = cast(Dict[str, Any], props.get("document", {}))
+
+                    if doc_obj:
+                        source_id = doc_obj.get("sourceId", "")
+                        if source_id and source_id in documents_from_weaviate:
+                            documents_from_weaviate[source_id]["summaries_count"] += 1
+            except Exception:
+                # Summary collection may not exist
+                pass
+
+    # Match with local files if they exist
+    for source_id, weaviate_data in documents_from_weaviate.items():
+        doc_dir: Path = output_dir / source_id
+        md_file: Path = doc_dir / f"{source_id}.md"
+        chunks_file: Path = doc_dir / f"{source_id}_chunks.json"
+        structured_file: Path = doc_dir / f"{source_id}_structured.json"
+        images_dir: Path = doc_dir / "images"
+
+        # Load additional metadata from chunks.json if exists
+        metadata: Dict[str, Any] = {}
+        pages: int = 0
+        toc: List[Dict[str, Any]] = []
+
+        if chunks_file.exists():
+            try:
+                with open(chunks_file, "r", encoding="utf-8") as f:
+                    chunks_data: Dict[str, Any] = json.load(f)
+                    metadata = chunks_data.get("metadata", {})
+                    pages = chunks_data.get("pages", 0)
+                    toc = metadata.get("toc", [])
+            except Exception:
+                pass
+
+        documents_list.append({
+            "name": source_id,
+            "path": str(doc_dir) if doc_dir.exists() else "",
+            "has_markdown": md_file.exists(),
+            "has_chunks": chunks_file.exists(),
+            "has_structured": structured_file.exists(),
+            "has_images": images_dir.exists() and any(images_dir.iterdir()) if images_dir.exists() else False,
+            "image_count": len(list(images_dir.glob("*.png"))) if images_dir.exists() else 0,
+            "metadata": metadata,
+            "summaries_count": weaviate_data["summaries_count"],  # FROM WEAVIATE
+            "authors_count": len(weaviate_data["authors"]),  # FROM WEAVIATE
+            "chunks_count": weaviate_data["chunks_count"],  # FROM WEAVIATE
+            "title": weaviate_data["title"],  # FROM WEAVIATE
+            "author": weaviate_data["author"],  # FROM WEAVIATE
+            "toc": toc,
+        })
+
+    return render_template("documents.html", documents=documents_list)
+
+
+# ═══════════════════════════════════════════════════════════════════════════════
+# Main
+# ═══════════════════════════════════════════════════════════════════════════════
+
+if __name__ == "__main__":
+    # Create the output folder if necessary
+    app.config["UPLOAD_FOLDER"].mkdir(parents=True, exist_ok=True)
+    app.run(debug=True, port=5000)
+
diff --git a/generations/library_rag/templates/chat.html b/generations/library_rag/templates/chat.html
new file mode 100644
index 0000000..258eeed
--- /dev/null
+++ b/generations/library_rag/templates/chat.html
@@ -0,0 +1,1171 @@
+{% extends "base.html" %}
+
+{% block title %}Conversation RAG{% endblock %}
+
+{% block content %}
+
+
+ +
+ +
+
+ 💬 + Conversation RAG +
+
+ + +
+
+ + +
+ +
+
💭
+
Bienvenue dans la Conversation RAG
+
Posez une question pour commencer la recherche sémantique
+
+
+ + +
+
+ + +
+
0 / 2000
+
+
+ + +
+ + +
+
+ + + + + + + +{% endblock %}