diff --git a/generations/library_rag/flask_app.py b/generations/library_rag/flask_app.py index 89a1b50..ed60ef9 100644 --- a/generations/library_rag/flask_app.py +++ b/generations/library_rag/flask_app.py @@ -74,7 +74,7 @@ import threading import queue import time from pathlib import Path -from typing import Any, Dict, Generator, Iterator, List, Optional, Union +from typing import Any, Dict, Generator, Iterator, List, Optional, Tuple, Union from flask import Flask, render_template, request, jsonify, redirect, url_for, send_from_directory, Response, flash from contextlib import contextmanager @@ -123,6 +123,32 @@ chat_sessions: Dict[str, Dict[str, Any]] = {} # {session_id: {"status": str, "q # Stockage des jobs TTS en cours tts_jobs: Dict[str, Dict[str, Any]] = {} # {job_id: {"status": str, "filepath": Path, "error": str}} +# Stockage des batch jobs (upload multiple) +batch_jobs: Dict[str, Dict[str, Any]] = {} # {batch_id: BatchJob dict} + +# ═══════════════════════════════════════════════════════════════════════════════ +# Template Filters +# ═══════════════════════════════════════════════════════════════════════════════ + +@app.template_filter("timestamp_to_date") +def timestamp_to_date(timestamp: float) -> str: + """Convert Unix timestamp to formatted date string. + + Args: + timestamp: Unix timestamp (seconds since epoch). + + Returns: + Formatted date string (e.g., "15 janvier 2026 à 14:30"). 
+ """ + from datetime import datetime + if not timestamp: + return "—" + try: + dt = datetime.fromtimestamp(timestamp) + return dt.strftime("%d %B %Y à %H:%M") + except (ValueError, OSError): + return "—" + # ═══════════════════════════════════════════════════════════════════════════════ # Weaviate Connection # ═══════════════════════════════════════════════════════════════════════════════ @@ -2613,6 +2639,88 @@ def run_processing_job( q.put(exception_event) +def run_batch_sequential( + batch_id: str, + files: List[Tuple[bytes, str, int]], + options: ProcessingOptions, +) -> None: + """Execute batch processing of multiple PDFs sequentially. + + This function processes files ONE BY ONE (not in parallel) to respect + API rate limits and provide clear progress tracking. Each file creates + an individual processing job that is tracked separately. + + Args: + batch_id: Unique identifier for this batch job. + files: List of tuples (file_bytes, filename, size_bytes). + options: Processing options applied to all files. + """ + import time + import threading + + batch: Dict[str, Any] = batch_jobs[batch_id] + + for idx, (file_bytes, filename, size_bytes) in enumerate(files): + # 1. Create individual job (reuses existing infrastructure) + job_id = str(uuid.uuid4()) + processing_jobs[job_id] = { + "status": "processing", + "queue": queue.Queue(), + "result": None, + "filename": filename, + "batch_id": batch_id, # New field to link back to batch + } + + # 2. Update batch state + batch["files"][idx]["job_id"] = job_id + batch["files"][idx]["status"] = "processing" + batch["current_job_id"] = job_id + + # 3. Launch processing thread (uses existing function) + thread = threading.Thread( + target=run_processing_job, + args=(job_id, file_bytes, filename, options), + daemon=True + ) + thread.start() + + # 4. WAIT for completion with 1-hour timeout + thread.join(timeout=3600) + + # 5. 
Check result and update batch + job = processing_jobs[job_id] + + if thread.is_alive(): + # Thread still running after timeout + batch["failed_files"] += 1 + batch["files"][idx]["status"] = "error" + batch["files"][idx]["error"] = "Timeout (> 1 heure)" + continue + + if job["status"] == "complete": + batch["completed_files"] += 1 + batch["files"][idx]["status"] = "complete" + else: + batch["failed_files"] += 1 + batch["files"][idx]["status"] = "error" + error_msg = job.get("result", {}).get("error", "Erreur inconnue") if job.get("result") else "Erreur inconnue" + batch["files"][idx]["error"] = error_msg + + # Clear current job before next iteration + batch["current_job_id"] = None + + # Small delay between files (optional) + time.sleep(1) + + # Mark batch as complete + if batch["failed_files"] == 0: + batch["status"] = "complete" + elif batch["completed_files"] == 0: + batch["status"] = "error" + else: + batch["status"] = "partial" + + def run_word_processing_job( job_id: str, file_bytes: bytes, @@ -2728,17 +2836,22 @@ def upload() -> str: if request.method == "GET": return render_template("upload.html") - # POST: traiter le fichier + # POST: traiter le(s) fichier(s) if "file" not in request.files: return render_template("upload.html", error="Aucun fichier sélectionné") - file = request.files["file"] + # Récupérer tous les fichiers (support single + multiple) + files = request.files.getlist("file") - if not file.filename or file.filename == "": + if not files or len(files) == 0: return render_template("upload.html", error="Aucun fichier sélectionné") - if not allowed_file(file.filename): - return render_template("upload.html", error="Format non supporté. 
Utilisez un fichier PDF (.pdf) ou Word (.docx).") + # Valider tous les fichiers + for file in files: + if not file.filename or file.filename == "": + return render_template("upload.html", error="Un des fichiers est vide") + if not allowed_file(file.filename): + return render_template("upload.html", error=f"Format non supporté pour {file.filename}. Utilisez PDF (.pdf) ou Word (.docx).") # Options de traitement llm_provider: str = request.form.get("llm_provider", "mistral") @@ -2754,41 +2867,199 @@ def upload() -> str: "max_toc_pages": int(request.form.get("max_toc_pages", "8")), } - # Lire le fichier - filename: str = secure_filename(file.filename) - file_bytes: bytes = file.read() + # ═════════════════════════════════════════════════════════════════════════ + # SINGLE FILE UPLOAD (existing behavior, backward compatible) + # ═════════════════════════════════════════════════════════════════════════ + if len(files) == 1: + file = files[0] - # Déterminer le type de fichier - file_extension: str = filename.rsplit(".", 1)[1].lower() if "." in filename else "" - is_word_document: bool = file_extension == "docx" + # Lire le fichier + filename: str = secure_filename(file.filename) + file_bytes: bytes = file.read() - # Créer un job de traitement - job_id: str = str(uuid.uuid4()) - processing_jobs[job_id] = { - "status": "processing", - "queue": queue.Queue(), - "result": None, - "filename": filename, - } + # Déterminer le type de fichier + file_extension: str = filename.rsplit(".", 1)[1].lower() if "." 
in filename else "" + is_word_document: bool = file_extension == "docx" - # Démarrer le traitement en background (Word ou PDF) - if is_word_document: - thread: threading.Thread = threading.Thread( - target=run_word_processing_job, - args=(job_id, file_bytes, filename, options) - ) + # Créer un job de traitement + job_id: str = str(uuid.uuid4()) + processing_jobs[job_id] = { + "status": "processing", + "queue": queue.Queue(), + "result": None, + "filename": filename, + } + + # Démarrer le traitement en background (Word ou PDF) + if is_word_document: + thread: threading.Thread = threading.Thread( + target=run_word_processing_job, + args=(job_id, file_bytes, filename, options) + ) + else: + thread: threading.Thread = threading.Thread( + target=run_processing_job, + args=(job_id, file_bytes, filename, options) + ) + + thread.daemon = True + thread.start() + + # Afficher la page de progression + return render_template("upload_progress.html", job_id=job_id, filename=filename) + + # ═════════════════════════════════════════════════════════════════════════ + # MULTI-FILE BATCH UPLOAD (new feature) + # ═════════════════════════════════════════════════════════════════════════ else: - thread: threading.Thread = threading.Thread( - target=run_processing_job, - args=(job_id, file_bytes, filename, options) + import time + from utils.types import BatchFileInfo + + # Créer un batch ID + batch_id: str = str(uuid.uuid4()) + + # Lire tous les fichiers et créer les structures + files_data: List[Tuple[bytes, str, int]] = [] + batch_files: List[BatchFileInfo] = [] + + for file in files: + filename_secure: str = secure_filename(file.filename) + file_bytes_data: bytes = file.read() + size_bytes: int = len(file_bytes_data) + + files_data.append((file_bytes_data, filename_secure, size_bytes)) + batch_files.append({ + "filename": filename_secure, + "job_id": None, # Will be assigned during processing + "status": "pending", + "error": None, + "size_bytes": size_bytes, + }) + + # Créer le 
batch job + batch_jobs[batch_id] = { + "job_ids": [], + "files": batch_files, + "total_files": len(files), + "completed_files": 0, + "failed_files": 0, + "status": "processing", + "current_job_id": None, + "options": options, + "created_at": time.time(), + } + + # Lancer le thread de traitement séquentiel + batch_thread: threading.Thread = threading.Thread( + target=run_batch_sequential, + args=(batch_id, files_data, options), + daemon=True ) + batch_thread.start() - thread.daemon = True - thread.start() + # Rediriger vers la page de progression batch + return redirect(url_for("upload_batch_progress", batch_id=batch_id)) - # Afficher la page de progression - file_type_label: str = "Word" if is_word_document else "PDF" - return render_template("upload_progress.html", job_id=job_id, filename=filename) + +@app.route("/upload/batch/progress/<batch_id>") +def upload_batch_progress(batch_id: str) -> str: + """Display batch processing progress dashboard. + + Shows a table with all files in the batch and their processing status. + Uses polling to discover job IDs and opens SSE connections for each file. + + Args: + batch_id: Unique identifier for the batch job. + + Returns: + Rendered batch progress template with batch info. + """ + if batch_id not in batch_jobs: + return render_template("upload.html", error="Batch non trouvé") + + batch: Dict[str, Any] = batch_jobs[batch_id] + + return render_template( + "upload_batch_progress.html", + batch_id=batch_id, + batch=batch, + files=batch["files"], + total_files=batch["total_files"], + ) + + +@app.route("/upload/batch/status/<batch_id>") +def upload_batch_status(batch_id: str) -> Response: + """API endpoint for batch status polling. + + Returns JSON with current batch status and file information. + Used by the frontend to discover job IDs as files start processing. + + Args: + batch_id: Unique identifier for the batch job. + + Returns: + JSON response with batch status. 
+ """ + if batch_id not in batch_jobs: + return jsonify({"error": "Batch non trouvé"}), 404 + + batch: Dict[str, Any] = batch_jobs[batch_id] + + return jsonify({ + "batch_id": batch_id, + "status": batch["status"], + "total_files": batch["total_files"], + "completed_files": batch["completed_files"], + "failed_files": batch["failed_files"], + "current_job_id": batch["current_job_id"], + "files": batch["files"], + }) + + +@app.route("/upload/batch/result/<batch_id>") +def upload_batch_result(batch_id: str) -> str: + """Display batch processing results summary. + + Shows final statistics and links to successfully processed documents. + + Args: + batch_id: Unique identifier for the batch job. + + Returns: + Rendered batch result template with summary. + """ + if batch_id not in batch_jobs: + return render_template("upload.html", error="Batch non trouvé") + + batch: Dict[str, Any] = batch_jobs[batch_id] + + # Build results with document names for completed files + results: List[Dict[str, Any]] = [] + for file_info in batch["files"]: + result_data: Dict[str, Any] = { + "filename": file_info["filename"], + "status": file_info["status"], + "error": file_info.get("error"), + "document_name": None, + } + + # Get document name from job result if successful + if file_info["status"] == "complete" and file_info.get("job_id"): + job_id = file_info["job_id"] + if job_id in processing_jobs: + job = processing_jobs[job_id] + if job.get("result") and job["result"].get("document_name"): + result_data["document_name"] = job["result"]["document_name"] + + results.append(result_data) + + return render_template( + "upload_batch_result.html", + batch_id=batch_id, + batch=batch, + results=results, + ) @app.route("/upload/progress/<job_id>") diff --git a/generations/library_rag/templates/upload.html b/generations/library_rag/templates/upload.html index 665696e..9715e12 100644 --- a/generations/library_rag/templates/upload.html +++ b/generations/library_rag/templates/upload.html @@ -17,7 +17,7 @@
- + -
Formats acceptés : PDF (.pdf), Word (.docx) ou Markdown (.md) • Max 50 MB
+
+ Formats acceptés : PDF (.pdf), Word (.docx) ou Markdown (.md) • Max 50 MB par fichier
+ +
@@ -217,6 +221,18 @@ function updateModelOptions() { function updateOptionsForFileType() { const fileInput = document.getElementById('file'); + const fileCountInfo = document.getElementById('file-count-info'); + const fileCount = fileInput.files.length; + + // Update file count display + if (fileCount === 0) { + fileCountInfo.textContent = ''; + } else if (fileCount === 1) { + fileCountInfo.textContent = ''; + } else { + fileCountInfo.textContent = `📦 ${fileCount} fichiers sélectionnés (traitement séquentiel : un fichier après l'autre)`; + } + const fileName = fileInput.files[0]?.name || ''; const isWord = fileName.toLowerCase().endsWith('.docx'); const isPDF = fileName.toLowerCase().endsWith('.pdf'); @@ -238,7 +254,7 @@ function updateOptionsForFileType() { wordPipelineInfo.style.display = 'none'; markdownPipelineInfo.style.display = 'none'; - // Afficher selon le type + // Afficher selon le type (basé sur le premier fichier) if (isWord) { wordInfo.style.display = 'block'; wordPipelineInfo.style.display = 'block'; diff --git a/generations/library_rag/templates/upload_batch_progress.html b/generations/library_rag/templates/upload_batch_progress.html new file mode 100644 index 0000000..6ffa488 --- /dev/null +++ b/generations/library_rag/templates/upload_batch_progress.html @@ -0,0 +1,284 @@ +{% extends "base.html" %} + +{% block title %}Traitement Batch en cours{% endblock %} + +{% block content %} +
+

📦 Traitement de {{ total_files }} document(s)

+

Suivi en temps réel du traitement séquentiel

+ + +
+

📊 Progression Globale

+
+
+
0
+
Fichiers complétés
+
+
+
0
+
Fichiers en erreur
+
+
+
+
+ 0% +
+
+
+ Initialisation... +
+
+
+
+ + +
+

📄 Détails par fichier

+
+ + + + + + + + + + + + + {% for file in files %} + + + + + + + + + {% endfor %} + +
#FichierStatutÉtape actuelleProgressionActions
{{ loop.index }} + {{ file.filename }} + + + En attente + + + + +
+
+
+
+ +
+
+
+ + + +
+ + + + +{% endblock %} diff --git a/generations/library_rag/templates/upload_batch_result.html b/generations/library_rag/templates/upload_batch_result.html new file mode 100644 index 0000000..f216e18 --- /dev/null +++ b/generations/library_rag/templates/upload_batch_result.html @@ -0,0 +1,167 @@ +{% extends "base.html" %} + +{% block title %}Résultats Batch{% endblock %} + +{% block content %} +
+

📊 Résumé du Traitement Batch

+

Résultats finaux du traitement de {{ batch.total_files }} fichier(s)

+ + +
+

📈 Statistiques Globales

+
+
+
{{ batch.total_files }}
+
Total Fichiers
+
+
+
{{ batch.completed_files }}
+
✅ Réussis
+
+
+
{{ batch.failed_files }}
+
❌ Erreurs
+
+
+
+ {% set success_rate = (batch.completed_files / batch.total_files * 100) | round(1) %} + {{ success_rate }}% +
+
Taux de Réussite
+
+
+
+ + + {% if batch.failed_files == 0 %} +
+ ✅ Tous les fichiers ont été traités avec succès ! +

Vous pouvez maintenant consulter les documents via les liens ci-dessous ou dans la section "Documents".

+
+ {% elif batch.completed_files == 0 %} +
+ ❌ Aucun fichier n'a pu être traité avec succès. +

Vérifiez les erreurs ci-dessous pour plus de détails.

+
+ {% else %} +
+ ⚠️ Traitement partiel : {{ batch.completed_files }} réussi(s), {{ batch.failed_files }} erreur(s). +

Certains fichiers ont été traités avec succès, d'autres ont rencontré des erreurs.

+
+ {% endif %} + + +
+

📄 Détails par Fichier

+
+ {% for result in results %} +
+ +
+ +
+
+ {{ loop.index }}. {{ result.filename }} +
+
+ {% if result.status == 'complete' %} + + ✅ Réussi + + {% elif result.status == 'error' %} + + ❌ Erreur + + {% else %} + + ⏳ En attente + + {% endif %} +
+
+ + +
+ {% if result.status == 'complete' and result.document_name %} + + 📄 Voir le document + + {% elif result.status == 'error' %} +
+ Erreur :
+ {{ result.error or 'Erreur inconnue' }} +
+ {% endif %} +
+
+
+ {% endfor %} +
+
+ + + + + +
+

ℹ️ Informations de Traitement

+
+

Batch ID : {{ batch_id }}

+

Date de traitement : {{ batch.created_at | default(0) | int | timestamp_to_date }}

+

Options utilisées :

+
    +
  • Provider LLM : {{ batch.options.llm_provider }}
  • +
  • Modèle : {{ batch.options.llm_model }}
  • +
  • Skip OCR : {{ "Oui" if batch.options.skip_ocr else "Non" }}
  • +
  • Ingestion Weaviate : {{ "Oui" if batch.options.ingest_weaviate else "Non" }}
  • +
+
+
+
+ + + + +{% endblock %} diff --git a/generations/library_rag/utils/types.py b/generations/library_rag/utils/types.py index 3bb42b4..719f8ac 100644 --- a/generations/library_rag/utils/types.py +++ b/generations/library_rag/utils/types.py @@ -1216,3 +1216,47 @@ class DeleteDocumentResult(TypedDict, total=False): deleted_sections: int deleted_document: bool error: Optional[str] + + +class BatchFileInfo(TypedDict, total=False): + """Information about a single file in a batch upload. + + Attributes: + filename: Original filename + job_id: Processing job ID (assigned when processing starts) + status: Current status (pending, processing, complete, error) + error: Error message if processing failed + size_bytes: File size in bytes + """ + + filename: str + job_id: Optional[str] + status: str # Literal["pending", "processing", "complete", "error"] + error: Optional[str] + size_bytes: int + + +class BatchJob(TypedDict, total=False): + """Batch processing job tracking multiple file uploads. + + Attributes: + job_ids: List of individual processing job IDs + files: List of file information dictionaries + total_files: Total number of files in batch + completed_files: Number of files successfully processed + failed_files: Number of files that failed processing + status: Overall batch status (processing, complete, partial) + current_job_id: Currently processing job ID (None if between files) + options: Processing options applied to all files + created_at: Timestamp when batch was created + """ + + job_ids: List[str] + files: List[BatchFileInfo] + total_files: int + completed_files: int + failed_files: int + status: str # Literal["processing", "complete", "partial"] + current_job_id: Optional[str] + options: ProcessingOptions + created_at: float