feat: Add multi-file batch upload with sequential processing

Implements comprehensive batch upload system with real-time progress tracking:

Backend Infrastructure:
- Add batch_jobs global dict for batch orchestration
- Add BatchFileInfo and BatchJob TypedDicts to utils/types.py
- Create run_batch_sequential() worker function with thread.join() synchronization
- Modify /upload POST route to detect single vs multi-file uploads
- Add 3 batch API routes: /upload/batch/progress, /status, /result
- Add timestamp_to_date Jinja2 template filter

Frontend:
- Update upload.html with 'multiple' attribute and file counter
- Create upload_batch_progress.html: Real-time dashboard with SSE per file
- Create upload_batch_result.html: Final summary with statistics

Architecture:
- Backward compatible: single-file upload unchanged
- Sequential processing: one file after another (respects API limits)
- N parallel SSE connections: one per file for real-time progress
- Polling mechanism to discover job IDs as files start processing
- 1-hour timeout per file with error handling and continuation

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-08 22:41:52 +01:00
parent 7a7a2b8e19
commit b70b796ef8
5 changed files with 819 additions and 37 deletions

View File

@@ -74,7 +74,7 @@ import threading
import queue
import time
from pathlib import Path
from typing import Any, Dict, Generator, Iterator, List, Optional, Union
from typing import Any, Dict, Generator, Iterator, List, Optional, Tuple, Union
from flask import Flask, render_template, request, jsonify, redirect, url_for, send_from_directory, Response, flash
from contextlib import contextmanager
@@ -123,6 +123,32 @@ chat_sessions: Dict[str, Dict[str, Any]] = {} # {session_id: {"status": str, "q
# Stockage des jobs TTS en cours
tts_jobs: Dict[str, Dict[str, Any]] = {} # {job_id: {"status": str, "filepath": Path, "error": str}}
# Stockage des batch jobs (upload multiple)
batch_jobs: Dict[str, Dict[str, Any]] = {} # {batch_id: BatchJob dict}
# ═══════════════════════════════════════════════════════════════════════════════
# Template Filters
# ═══════════════════════════════════════════════════════════════════════════════
@app.template_filter("timestamp_to_date")
def timestamp_to_date(timestamp: float) -> str:
"""Convert Unix timestamp to formatted date string.
Args:
timestamp: Unix timestamp (seconds since epoch).
Returns:
Formatted date string (e.g., "15 janvier 2026 à 14:30").
"""
from datetime import datetime
if not timestamp:
return ""
try:
dt = datetime.fromtimestamp(timestamp)
return dt.strftime("%d %B %Y à %H:%M")
except (ValueError, OSError):
return ""
# ═══════════════════════════════════════════════════════════════════════════════
# Weaviate Connection
# ═══════════════════════════════════════════════════════════════════════════════
@@ -2613,6 +2639,88 @@ def run_processing_job(
q.put(exception_event)
def run_batch_sequential(
batch_id: str,
files: List[Tuple[bytes, str, int]],
options: ProcessingOptions,
) -> None:
"""Execute batch processing of multiple PDFs sequentially.
This function processes files ONE BY ONE (not in parallel) to respect
API rate limits and provide clear progress tracking. Each file creates
an individual processing job that is tracked separately.
Args:
batch_id: Unique identifier for this batch job.
files: List of tuples (file_bytes, filename, size_bytes).
options: Processing options applied to all files.
"""
import time
import threading
batch: Dict[str, Any] = batch_jobs[batch_id]
for idx, (file_bytes, filename, size_bytes) in enumerate(files):
# 1. Create individual job (reuses existing infrastructure)
job_id = str(uuid.uuid4())
processing_jobs[job_id] = {
"status": "processing",
"queue": queue.Queue(),
"result": None,
"filename": filename,
"batch_id": batch_id, # New field to link back to batch
}
# 2. Update batch state
batch["files"][idx]["job_id"] = job_id
batch["files"][idx]["status"] = "processing"
batch["current_job_id"] = job_id
# 3. Launch processing thread (uses existing function)
thread = threading.Thread(
target=run_processing_job,
args=(job_id, file_bytes, filename, options),
daemon=True
)
thread.start()
# 4. WAIT for completion with 1-hour timeout
thread.join(timeout=3600)
# 5. Check result and update batch
job = processing_jobs[job_id]
if thread.is_alive():
# Thread still running after timeout
batch["failed_files"] += 1
batch["files"][idx]["status"] = "error"
batch["files"][idx]["error"] = "Timeout (> 1 heure)"
continue
if job["status"] == "complete":
batch["completed_files"] += 1
batch["files"][idx]["status"] = "complete"
else:
batch["failed_files"] += 1
batch["files"][idx]["status"] = "error"
error_msg = job.get("result", {}).get("error", "Erreur inconnue") if job.get("result") else "Erreur inconnue"
batch["files"][idx]["error"] = error_msg
# Clear current job before next iteration
batch["current_job_id"] = None
# Small delay between files (optional)
time.sleep(1)
# Mark batch as complete
if batch["failed_files"] == 0:
batch["status"] = "complete"
elif batch["completed_files"] == 0:
batch["status"] = "error"
else:
batch["status"] = "partial"
def run_word_processing_job(
job_id: str,
file_bytes: bytes,
@@ -2728,17 +2836,22 @@ def upload() -> str:
if request.method == "GET":
return render_template("upload.html")
# POST: traiter le fichier
# POST: traiter le(s) fichier(s)
if "file" not in request.files:
return render_template("upload.html", error="Aucun fichier sélectionné")
file = request.files["file"]
# Récupérer tous les fichiers (support single + multiple)
files = request.files.getlist("file")
if not file.filename or file.filename == "":
if not files or len(files) == 0:
return render_template("upload.html", error="Aucun fichier sélectionné")
if not allowed_file(file.filename):
return render_template("upload.html", error="Format non supporté. Utilisez un fichier PDF (.pdf) ou Word (.docx).")
# Valider tous les fichiers
for file in files:
if not file.filename or file.filename == "":
return render_template("upload.html", error="Un des fichiers est vide")
if not allowed_file(file.filename):
return render_template("upload.html", error=f"Format non supporté pour {file.filename}. Utilisez PDF (.pdf) ou Word (.docx).")
# Options de traitement
llm_provider: str = request.form.get("llm_provider", "mistral")
@@ -2754,41 +2867,199 @@ def upload() -> str:
"max_toc_pages": int(request.form.get("max_toc_pages", "8")),
}
# Lire le fichier
filename: str = secure_filename(file.filename)
file_bytes: bytes = file.read()
# ═════════════════════════════════════════════════════════════════════════
# SINGLE FILE UPLOAD (existing behavior, backward compatible)
# ═════════════════════════════════════════════════════════════════════════
if len(files) == 1:
file = files[0]
# Déterminer le type de fichier
file_extension: str = filename.rsplit(".", 1)[1].lower() if "." in filename else ""
is_word_document: bool = file_extension == "docx"
# Lire le fichier
filename: str = secure_filename(file.filename)
file_bytes: bytes = file.read()
# Créer un job de traitement
job_id: str = str(uuid.uuid4())
processing_jobs[job_id] = {
"status": "processing",
"queue": queue.Queue(),
"result": None,
"filename": filename,
}
# Déterminer le type de fichier
file_extension: str = filename.rsplit(".", 1)[1].lower() if "." in filename else ""
is_word_document: bool = file_extension == "docx"
# Démarrer le traitement en background (Word ou PDF)
if is_word_document:
thread: threading.Thread = threading.Thread(
target=run_word_processing_job,
args=(job_id, file_bytes, filename, options)
)
# Créer un job de traitement
job_id: str = str(uuid.uuid4())
processing_jobs[job_id] = {
"status": "processing",
"queue": queue.Queue(),
"result": None,
"filename": filename,
}
# Démarrer le traitement en background (Word ou PDF)
if is_word_document:
thread: threading.Thread = threading.Thread(
target=run_word_processing_job,
args=(job_id, file_bytes, filename, options)
)
else:
thread: threading.Thread = threading.Thread(
target=run_processing_job,
args=(job_id, file_bytes, filename, options)
)
thread.daemon = True
thread.start()
# Afficher la page de progression
return render_template("upload_progress.html", job_id=job_id, filename=filename)
# ═════════════════════════════════════════════════════════════════════════
# MULTI-FILE BATCH UPLOAD (new feature)
# ═════════════════════════════════════════════════════════════════════════
else:
thread: threading.Thread = threading.Thread(
target=run_processing_job,
args=(job_id, file_bytes, filename, options)
import time
from utils.types import BatchFileInfo
# Créer un batch ID
batch_id: str = str(uuid.uuid4())
# Lire tous les fichiers et créer les structures
files_data: List[Tuple[bytes, str, int]] = []
batch_files: List[BatchFileInfo] = []
for file in files:
filename_secure: str = secure_filename(file.filename)
file_bytes_data: bytes = file.read()
size_bytes: int = len(file_bytes_data)
files_data.append((file_bytes_data, filename_secure, size_bytes))
batch_files.append({
"filename": filename_secure,
"job_id": None, # Will be assigned during processing
"status": "pending",
"error": None,
"size_bytes": size_bytes,
})
# Créer le batch job
batch_jobs[batch_id] = {
"job_ids": [],
"files": batch_files,
"total_files": len(files),
"completed_files": 0,
"failed_files": 0,
"status": "processing",
"current_job_id": None,
"options": options,
"created_at": time.time(),
}
# Lancer le thread de traitement séquentiel
batch_thread: threading.Thread = threading.Thread(
target=run_batch_sequential,
args=(batch_id, files_data, options),
daemon=True
)
batch_thread.start()
thread.daemon = True
thread.start()
# Rediriger vers la page de progression batch
return redirect(url_for("upload_batch_progress", batch_id=batch_id))
# Afficher la page de progression
file_type_label: str = "Word" if is_word_document else "PDF"
return render_template("upload_progress.html", job_id=job_id, filename=filename)
@app.route("/upload/batch/progress/<batch_id>")
def upload_batch_progress(batch_id: str) -> str:
"""Display batch processing progress dashboard.
Shows a table with all files in the batch and their processing status.
Uses polling to discover job IDs and opens SSE connections for each file.
Args:
batch_id: Unique identifier for the batch job.
Returns:
Rendered batch progress template with batch info.
"""
if batch_id not in batch_jobs:
return render_template("upload.html", error="Batch non trouvé")
batch: Dict[str, Any] = batch_jobs[batch_id]
return render_template(
"upload_batch_progress.html",
batch_id=batch_id,
batch=batch,
files=batch["files"],
total_files=batch["total_files"],
)
@app.route("/upload/batch/status/<batch_id>")
def upload_batch_status(batch_id: str) -> Response:
"""API endpoint for batch status polling.
Returns JSON with current batch status and file information.
Used by the frontend to discover job IDs as files start processing.
Args:
batch_id: Unique identifier for the batch job.
Returns:
JSON response with batch status.
"""
if batch_id not in batch_jobs:
return jsonify({"error": "Batch non trouvé"}), 404
batch: Dict[str, Any] = batch_jobs[batch_id]
return jsonify({
"batch_id": batch_id,
"status": batch["status"],
"total_files": batch["total_files"],
"completed_files": batch["completed_files"],
"failed_files": batch["failed_files"],
"current_job_id": batch["current_job_id"],
"files": batch["files"],
})
@app.route("/upload/batch/result/<batch_id>")
def upload_batch_result(batch_id: str) -> str:
"""Display batch processing results summary.
Shows final statistics and links to successfully processed documents.
Args:
batch_id: Unique identifier for the batch job.
Returns:
Rendered batch result template with summary.
"""
if batch_id not in batch_jobs:
return render_template("upload.html", error="Batch non trouvé")
batch: Dict[str, Any] = batch_jobs[batch_id]
# Build results with document names for completed files
results: List[Dict[str, Any]] = []
for file_info in batch["files"]:
result_data: Dict[str, Any] = {
"filename": file_info["filename"],
"status": file_info["status"],
"error": file_info.get("error"),
"document_name": None,
}
# Get document name from job result if successful
if file_info["status"] == "complete" and file_info.get("job_id"):
job_id = file_info["job_id"]
if job_id in processing_jobs:
job = processing_jobs[job_id]
if job.get("result") and job["result"].get("document_name"):
result_data["document_name"] = job["result"]["document_name"]
results.append(result_data)
return render_template(
"upload_batch_result.html",
batch_id=batch_id,
batch=batch,
results=results,
)
@app.route("/upload/progress/<job_id>")