Add ikario_processual with David profile and embedding script

- david_profile_declared.json: David's declared profile values from questionnaire
- scripts/embed_david.py: Python script to generate embeddings using BGE-M3 model
- questionnaire_david.md: Questionnaire template for profile values

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-31 16:56:41 +01:00
parent 9e657cbf29
commit 21f5676c7b
18 changed files with 5463 additions and 0 deletions

View File

@@ -0,0 +1 @@
# Scripts utilitaires pour ikario_processual

View File

@@ -0,0 +1,160 @@
#!/usr/bin/env python3
"""
Script pour creer toutes les directions de projection dans Weaviate.
Usage:
python scripts/create_all_directions.py [--reset]
Options:
--reset Supprimer et recreer la collection (attention: perte de donnees!)
"""
import sys
import time
from pathlib import Path
# Ajouter le parent au path
sys.path.insert(0, str(Path(__file__).parent.parent))
from projection_directions import (
DIRECTIONS_CONFIG,
create_projection_direction_collection,
delete_projection_direction_collection,
create_direction_by_contrast,
save_direction,
get_all_directions,
get_existing_classes,
)
def main():
    """Create every configured projection direction that is not stored yet.

    Passing ``--reset`` on the command line deletes the ProjectionDirection
    collection before it is (re)created. Progress is printed to stdout.

    Returns:
        0 when nothing was missing or every missing direction was saved;
        1 when Weaviate or the embedding model could not be used, or when
        at least one direction failed.
    """
    wipe_first = "--reset" in sys.argv
    heavy_rule = "=" * 70
    light_rule = "-" * 70

    print(heavy_rule)
    print("CREATION DES DIRECTIONS DE PROJECTION")
    print(heavy_rule)
    print(f"Total directions configurees: {len(DIRECTIONS_CONFIG)}")
    print()

    # Any failure while listing classes is reported as "Weaviate unreachable".
    try:
        known_classes = get_existing_classes()
        print(f"[OK] Weaviate accessible, {len(known_classes)} classes existantes")
    except Exception as exc:
        print(f"[ERREUR] Weaviate non accessible: {exc}")
        print("Assurez-vous que Weaviate est en cours d'execution sur localhost:8080")
        return 1

    # Optional reset of the collection.
    if wipe_first:
        print("\n[RESET] Suppression de la collection ProjectionDirection...")
        dropped = delete_projection_direction_collection()
        print("[OK] Collection supprimee" if dropped else "[INFO] Collection n'existait pas")

    # Create the collection when it is absent.
    print("\n[INFO] Creation de la collection ProjectionDirection...")
    freshly_made = create_projection_direction_collection()
    print("[OK] Collection creee" if freshly_made else "[INFO] Collection existe deja")

    # The embedding model is imported lazily so the checks above stay cheap.
    print("\n[INFO] Chargement du modele BGE-M3...")
    try:
        from sentence_transformers import SentenceTransformer
        model = SentenceTransformer('BAAI/bge-m3')
        print("[OK] Modele charge")
    except Exception as exc:
        print(f"[ERREUR] Impossible de charger le modele: {exc}")
        return 1

    # Names already stored in Weaviate.
    existing_names = {entry["name"] for entry in get_all_directions()}
    print(f"\n[INFO] {len(existing_names)} directions existantes")

    # Tally of configured directions per category.
    per_category = {}
    for spec in DIRECTIONS_CONFIG.values():
        key = spec["category"]
        per_category[key] = per_category.get(key, 0) + 1
    print("\nDirections par categorie:")
    for key in sorted(per_category):
        print(f" - {key}: {per_category[key]}")

    # Only the directions that are configured but not stored get created.
    pending = []
    for name in DIRECTIONS_CONFIG:
        if name not in existing_names:
            pending.append(name)
    print(f"\n[INFO] {len(pending)} nouvelles directions a creer")
    if not pending:
        print("[OK] Toutes les directions existent deja!")
        return 0

    print("\n" + light_rule)
    print("CREATION DES DIRECTIONS")
    print(light_rule)
    created = 0
    errors = 0
    started_at = time.time()
    for position, name in enumerate(pending, start=1):
        spec = DIRECTIONS_CONFIG[name]
        print(f"\n[{position}/{len(pending)}] {name} ({spec['category']})")
        try:
            # Direction vector derived from positive vs. negative examples.
            vector = create_direction_by_contrast(
                spec["positive_examples"],
                spec["negative_examples"],
                model,
            )
            obj_id = save_direction(name, spec, vector)
            if not obj_id:
                print(" [ERREUR] Echec de sauvegarde")
                errors += 1
                continue
            print(f" [OK] Cree: {obj_id[:8]}...")
            created += 1
        except Exception as exc:
            # One failing direction must not stop the remaining ones.
            print(f" [ERREUR] {exc}")
            errors += 1
    elapsed = time.time() - started_at

    # Summary.
    print("\n" + heavy_rule)
    print("RESUME")
    print(heavy_rule)
    print(f"Directions creees: {created}")
    print(f"Erreurs: {errors}")
    # max(1, created) avoids a division by zero when nothing was created.
    print(f"Temps: {elapsed:.1f}s ({elapsed/max(1,created):.1f}s par direction)")

    # Final verification against what Weaviate now holds.
    stored = get_all_directions()
    print(f"\nTotal directions dans Weaviate: {len(stored)}")
    stored_per_category = {}
    for entry in stored:
        key = entry.get("category", "unknown")
        stored_per_category[key] = stored_per_category.get(key, 0) + 1
    print("\nDirections par categorie (final):")
    for key in sorted(stored_per_category):
        print(f" - {key}: {stored_per_category[key]}")

    return 1 if errors else 0


if __name__ == "__main__":
    sys.exit(main())

View File

@@ -0,0 +1,47 @@
#!/usr/bin/env python3
"""
Script to generate David's embedding from his messages.
Returns JSON with the embedding vector.
Usage:
python scripts/embed_david.py "concatenated text of david's messages"
Output (JSON):
{"vector": [0.1, 0.2, ...], "dimension": 1024}
"""
import sys
import json
from sentence_transformers import SentenceTransformer
def _exit_with_error(message):
    """Print ``{"error": message}`` as JSON on stdout and exit with status 1."""
    print(json.dumps({"error": message}))
    sys.exit(1)


def main():
    """Embed the text given as first CLI argument and print the result as JSON.

    On success prints ``{"vector": [...], "dimension": N}``. When the
    argument is missing, shorter than 10 characters, or embedding raises,
    prints ``{"error": ...}`` instead and exits with status 1.
    """
    if len(sys.argv) < 2:
        _exit_with_error("No text provided")

    text = sys.argv[1]
    if len(text) < 10:
        _exit_with_error("Text too short")

    try:
        # BGE-M3, per the original note the same model as Ikario's embeddings.
        encoder = SentenceTransformer('BAAI/bge-m3')
        vector = encoder.encode(text, normalize_embeddings=True)
        print(json.dumps({
            "vector": vector.tolist(),
            "dimension": len(vector),
        }))
    except Exception as exc:
        _exit_with_error(str(exc))


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,198 @@
#!/usr/bin/env python3
"""
Phase 1 : Creation de la collection StateVector et de S(0).
Ce script:
1. Cree la collection StateVector dans Weaviate
2. Recupere et filtre les pensees (exclut les tests)
3. Recupere et filtre les messages d'Ikario (assistant uniquement)
4. Calcule l'embedding agrege avec BGE-M3
5. Cree l'etat initial S(0)
Usage:
python phase1_state_vector.py
python phase1_state_vector.py --dry-run
python phase1_state_vector.py --reset # Supprime et recree
"""
import argparse
import sys
from pathlib import Path
# Ajouter le parent au path
sys.path.insert(0, str(Path(__file__).parent.parent))
from state_vector import (
check_weaviate_ready,
get_existing_classes,
create_state_vector_collection,
delete_state_vector_collection,
get_all_thoughts,
get_all_messages,
filter_thoughts,
filter_assistant_messages,
compute_aggregate_embedding,
create_initial_state,
get_current_state_id,
get_state_vector,
)
def print_section(title: str):
    """Print ``title`` between two 60-character ``=`` rules, after a blank line."""
    rule = "=" * 60
    print(f"\n{rule}")
    print(title)
    print(rule)
def main():
    """Run Phase 1: create the StateVector collection and the initial state S(0).

    Command-line flags:
        --dry-run  print what would happen without creating or deleting anything
        --reset    delete the existing StateVector collection first

    Exits with status 1 when Weaviate is unreachable or sentence-transformers
    is not installed, and with status 0 when the collection already holds at
    least one state and ``--reset`` was not given.
    """
    parser = argparse.ArgumentParser(
        description="Phase 1: Creation StateVector et S(0)"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Simuler sans creer"
    )
    parser.add_argument(
        "--reset",
        action="store_true",
        help="Supprimer et recreer la collection"
    )
    args = parser.parse_args()

    print_section("PHASE 1 : STATEVECTOR ET S(0)")

    # 1. Check that Weaviate answers.
    print("\n[1/6] Verification Weaviate...")
    if not check_weaviate_ready():
        print("ERREUR: Weaviate non accessible")
        sys.exit(1)
    print(" Weaviate [OK]")

    # 2. Handle the StateVector collection.
    print("\n[2/6] Collection StateVector...")
    existing = get_existing_classes()
    if "StateVector" in existing:
        if args.reset:
            print(" Suppression de la collection existante...")
            if not args.dry_run:
                delete_state_vector_collection()
                print(" Collection supprimee")
            else:
                print(" [DRY-RUN] Suppression simulee")
        else:
            # Stop early when a state already exists, to avoid overwriting it.
            current_id = get_current_state_id()
            if current_id >= 0:
                print(f" Collection existe avec {current_id + 1} etat(s)")
                print(" Utilisez --reset pour reinitialiser")
                sys.exit(0)

    # Create the collection; re-query because a reset may just have removed it.
    if args.dry_run:
        print(" [DRY-RUN] Creation collection simulee")
    else:
        if "StateVector" not in get_existing_classes():
            create_state_vector_collection()

    # 3. Fetch and filter the thoughts.
    print("\n[3/6] Recuperation des pensees...")
    all_thoughts = get_all_thoughts()
    print(f" Total pensees: {len(all_thoughts)}")
    filtered_thoughts = filter_thoughts(all_thoughts)
    excluded = len(all_thoughts) - len(filtered_thoughts)
    print(f" Pensees filtrees: {len(filtered_thoughts)} (exclues: {excluded})")

    # Show up to three kept thoughts, truncated to 80 characters.
    if filtered_thoughts:
        print("\n Exemples de pensees gardees:")
        for t in filtered_thoughts[:3]:
            content = t.get("properties", {}).get("content", "")[:80]
            print(f" - {content}...")

    # 4. Fetch and filter the messages.
    print("\n[4/6] Recuperation des messages...")
    all_messages = get_all_messages()
    print(f" Total messages: {len(all_messages)}")
    filtered_messages = filter_assistant_messages(all_messages)
    excluded = len(all_messages) - len(filtered_messages)
    print(f" Messages Ikario: {len(filtered_messages)} (exclues: {excluded})")

    # Show up to three kept messages, truncated to 80 characters.
    if filtered_messages:
        print("\n Exemples de messages Ikario:")
        for m in filtered_messages[:3]:
            content = m.get("properties", {}).get("content", "")[:80]
            print(f" - {content}...")

    # 5. Compute the aggregate embedding.
    print("\n[5/6] Calcul de l'embedding agrege...")
    if args.dry_run:
        print(" [DRY-RUN] Embedding simule (1024 dims)")
        embedding = None
    else:
        # The model is imported lazily so --dry-run works without it.
        print(" Chargement du modele BGE-M3...")
        try:
            from sentence_transformers import SentenceTransformer
            model = SentenceTransformer('BAAI/bge-m3')
            print(" Modele charge [OK]")
        except ImportError:
            print("ERREUR: sentence-transformers non installe")
            print(" pip install sentence-transformers")
            sys.exit(1)

        print(" Calcul de l'embedding agrege...")
        embedding = compute_aggregate_embedding(
            filtered_thoughts,
            filtered_messages,
            model
        )
        # Report the Euclidean norm; the previous code printed the sum of the
        # components under the "norme" label.
        # NOTE(review): assumes `embedding` is array-like (it exposes .shape) - confirm.
        import numpy as np
        norm = float(np.linalg.norm(embedding))
        print(f" Embedding calcule: {embedding.shape} (norme: {norm:.4f})")

    # 6. Create S(0).
    print("\n[6/6] Creation de S(0)...")
    if args.dry_run:
        print(" [DRY-RUN] S(0) simule")
        print(f" - {len(filtered_thoughts)} pensees")
        print(f" - {len(filtered_messages)} messages")
    else:
        s0 = create_initial_state(
            filtered_thoughts,
            filtered_messages,
            embedding
        )
        print(" S(0) cree avec succes!")
        print(f" - ID: {s0.get('id', 'N/A')}")
        print(f" - Pensees sources: {s0['source_thoughts_count']}")
        print(f" - Messages sources: {s0['source_messages_count']}")

    # Summary.
    print_section("PHASE 1 TERMINEE")
    if args.dry_run:
        print("\n[DRY-RUN] Aucune modification effectuee")
    else:
        print("\nResultat:")
        print(" - Collection StateVector creee")
        print(" - S(0) cree a partir de:")
        print(f" {len(filtered_thoughts)} pensees")
        print(f" {len(filtered_messages)} messages")

    print("\nTests de validation:")
    print(" curl -s http://localhost:8080/v1/schema | jq '.classes[] | select(.class == \"StateVector\")'")
    print(" curl -s 'http://localhost:8080/v1/objects?class=StateVector&limit=1' | jq '.objects[0].properties'")
    print("\nProchaine etape:")
    print(" python scripts/phase2_projection_directions.py")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,211 @@
#!/usr/bin/env python3
"""
Phase 2 : Creation des directions de projection.
Ce script:
1. Cree la collection ProjectionDirection dans Weaviate
2. Genere les vecteurs de direction par contraste (BGE-M3)
3. Sauvegarde les directions dans Weaviate
4. Calcule et affiche le profil de S(0)
Usage:
python phase2_projection_directions.py
python phase2_projection_directions.py --dry-run
python phase2_projection_directions.py --reset
"""
import argparse
import sys
from pathlib import Path
import numpy as np
# Ajouter le parent au path
sys.path.insert(0, str(Path(__file__).parent.parent))
from state_vector import (
check_weaviate_ready,
get_state_vector,
)
from projection_directions import (
get_existing_classes,
create_projection_direction_collection,
delete_projection_direction_collection,
create_direction_by_contrast,
save_direction,
get_all_directions,
get_state_profile,
format_profile,
DIRECTIONS_CONFIG,
)
def print_section(title: str):
    """Print a section header: a blank line, a 60-char ``=`` rule, the title, the rule."""
    bar = "=" * 60
    print("\n" + bar, title, bar, sep="\n")
def main():
    """Run Phase 2: create the projection directions and print the S(0) profile.

    Command-line flags:
        --dry-run  print what would happen without touching Weaviate objects
        --reset    delete the existing ProjectionDirection collection first

    Exits with status 0 when directions already exist and ``--reset`` was not
    given (after printing the existing S(0) profile). Exits with status 1
    when Weaviate is unreachable, sentence-transformers is missing, S(0) is
    missing or has no vector, or at least one direction could not be created.
    """
    parser = argparse.ArgumentParser(
        description="Phase 2: Creation des directions de projection"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Simuler sans creer"
    )
    parser.add_argument(
        "--reset",
        action="store_true",
        help="Supprimer et recreer la collection"
    )
    args = parser.parse_args()

    print_section("PHASE 2 : DIRECTIONS DE PROJECTION")

    # 1. Check that Weaviate answers.
    print("\n[1/5] Verification Weaviate...")
    if not check_weaviate_ready():
        print("ERREUR: Weaviate non accessible")
        sys.exit(1)
    print(" Weaviate [OK]")

    # 2. Handle the ProjectionDirection collection.
    print("\n[2/5] Collection ProjectionDirection...")
    existing = get_existing_classes()
    if "ProjectionDirection" in existing:
        if args.reset:
            print(" Suppression de la collection existante...")
            if not args.dry_run:
                delete_projection_direction_collection()
                print(" Collection supprimee")
            else:
                print(" [DRY-RUN] Suppression simulee")
        else:
            # Directions already stored: show the current profile and stop.
            directions = get_all_directions()
            if len(directions) > 0:
                print(f" Collection existe avec {len(directions)} directions")
                print(" Utilisez --reset pour reinitialiser")
                print("\n[INFO] Affichage du profil S(0) existant...")
                s0 = get_state_vector(0)
                if s0:
                    state_vec = np.array(s0.get("_additional", {}).get("vector", []))
                    if len(state_vec) > 0:
                        profile = get_state_profile(state_vec)
                        print(format_profile(profile))
                sys.exit(0)

    # Create the collection; re-query because a reset may just have removed it.
    if args.dry_run:
        print(" [DRY-RUN] Creation collection simulee")
    else:
        if "ProjectionDirection" not in get_existing_classes():
            create_projection_direction_collection()

    # 3. Load the embedding model (skipped in dry-run).
    print("\n[3/5] Chargement du modele BGE-M3...")
    if args.dry_run:
        print(" [DRY-RUN] Chargement simule")
        model = None
    else:
        try:
            from sentence_transformers import SentenceTransformer
            model = SentenceTransformer('BAAI/bge-m3')
            print(" Modele charge [OK]")
        except ImportError:
            print("ERREUR: sentence-transformers non installe")
            print(" pip install sentence-transformers")
            sys.exit(1)

    # 4. Create the directions.
    print("\n[4/5] Creation des directions par contraste...")
    print(f" {len(DIRECTIONS_CONFIG)} directions a creer")
    print()
    # Track outcomes by name so the summary reports what really happened.
    created_names = []
    failed_names = []
    for name, config in DIRECTIONS_CONFIG.items():
        category = config["category"]
        positive = config["positive_examples"]
        negative = config["negative_examples"]
        if args.dry_run:
            print(f" [DRY-RUN] {name} ({category})")
            print(f" + {len(positive)} exemples positifs")
            print(f" - {len(negative)} exemples negatifs")
            created_names.append(name)
            continue
        try:
            direction_vec = create_direction_by_contrast(positive, negative, model)
            obj_id = save_direction(name, config, direction_vec)
        except Exception as e:
            # One failing direction must not abort the remaining ones.
            print(f" [FAIL] {name}: {e}")
            failed_names.append(name)
            continue
        if obj_id:
            print(f" [OK] {name} ({category})")
            created_names.append(name)
        else:
            print(f" [FAIL] {name}")
            failed_names.append(name)
    created_count = len(created_names)
    print(f"\n Total: {created_count}/{len(DIRECTIONS_CONFIG)} directions creees")

    # 5. Compute the profile of S(0).
    print("\n[5/5] Calcul du profil de S(0)...")
    if args.dry_run:
        print(" [DRY-RUN] Profil simule")
    else:
        s0 = get_state_vector(0)
        if not s0:
            print(" ERREUR: S(0) non trouve. Executez d'abord phase1_state_vector.py")
            sys.exit(1)
        state_vec = np.array(s0.get("_additional", {}).get("vector", []))
        if len(state_vec) == 0:
            print(" ERREUR: S(0) n'a pas de vecteur")
            sys.exit(1)
        profile = get_state_profile(state_vec)
        print("\n PROFIL DE S(0) - Etat initial d'Ikario")
        print(" " + "-" * 50)
        print(format_profile(profile))

    # Summary.
    print_section("PHASE 2 TERMINEE")
    if args.dry_run:
        print("\n[DRY-RUN] Aucune modification effectuee")
    else:
        print("\nResultat:")
        print(" - Collection ProjectionDirection creee")
        print(f" - {created_count} directions creees:")
        # Group only the directions that were actually saved, by category.
        by_category = {}
        for name in created_names:
            by_category.setdefault(DIRECTIONS_CONFIG[name]["category"], []).append(name)
        for cat, names in sorted(by_category.items()):
            print(f" {cat}: {', '.join(names)}")

    print("\nTests de validation:")
    print(" curl -s 'http://localhost:8080/v1/objects?class=ProjectionDirection' | jq '.objects | length'")
    print(" python -c \"from projection_directions import *; print(get_all_directions())\"")
    print("\nProchaine etape:")
    print(" python scripts/phase3_transformation.py")

    # Signal partial failure to the caller instead of exiting successfully.
    if failed_names:
        print(f"\nATTENTION: {len(failed_names)} direction(s) en echec: {', '.join(failed_names)}")
        sys.exit(1)


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,285 @@
#!/usr/bin/env python3
"""
Script de vérification de la Phase 0.
Vérifie que tous les prérequis sont en place:
1. Weaviate est accessible
2. Les collections existent
3. Le backup fonctionne
4. La restauration (dry-run) fonctionne
Usage:
python verify_phase0.py
"""
import os
import sys
import tempfile
from pathlib import Path
import requests
# Weaviate endpoint; the WEAVIATE_URL environment variable overrides the default.
WEAVIATE_URL = os.environ.get("WEAVIATE_URL", "http://localhost:8080")

# Output markers: ANSI colours and Unicode symbols everywhere except Windows,
# where plain ASCII tags are used (the original note cites encoding problems).
import platform

if platform.system() != "Windows":
    GREEN, RED, YELLOW, RESET = "\033[92m", "\033[91m", "\033[93m", "\033[0m"
    CHECK, CROSS, WARN = "\u2713", "\u2717", "\u26A0"
else:
    GREEN = RED = YELLOW = RESET = ""
    CHECK, CROSS, WARN = "[OK]", "[FAIL]", "[WARN]"
def print_ok(msg: str):
    """Print ``msg`` prefixed with the green success marker."""
    marker = f"{GREEN}{CHECK}{RESET}"
    print(f" {marker} {msg}")
def print_fail(msg: str):
    """Print ``msg`` prefixed with the red failure marker."""
    marker = f"{RED}{CROSS}{RESET}"
    print(f" {marker} {msg}")
def print_warn(msg: str):
    """Print ``msg`` prefixed with the yellow warning marker."""
    marker = f"{YELLOW}{WARN}{RESET}"
    print(f" {marker} {msg}")
def check_weaviate_connection() -> bool:
    """Probe Weaviate's readiness endpoint and report the outcome.

    Returns:
        True when the endpoint answers HTTP 200 within 5 seconds, False on
        any other status or on a ``requests`` error.
    """
    print("\n[1/5] Connexion Weaviate...")
    try:
        response = requests.get(f"{WEAVIATE_URL}/v1/.well-known/ready", timeout=5)
    except requests.RequestException as exc:
        print_fail(f"Impossible de se connecter à Weaviate: {exc}")
        return False

    if response.status_code != 200:
        print_fail(f"Weaviate répond avec status {response.status_code}")
        return False

    print_ok(f"Weaviate accessible sur {WEAVIATE_URL}")
    return True
def check_collections() -> tuple[bool, list[str]]:
    """Report which of the expected Weaviate collections exist and hold objects.

    Returns:
        ``(ok, found)`` where ``found`` lists the expected collections present
        in the schema (in schema order) and ``ok`` is True when at least one
        was found. Any exception, including an HTTP error or a timeout while
        reading the schema, is reported through ``print_fail`` and yields
        ``(False, [])``.
    """
    print("\n[2/5] Collections Weaviate...")
    # Same bound as the readiness probe, so a stalled server cannot hang the check.
    timeout = 5
    try:
        response = requests.get(f"{WEAVIATE_URL}/v1/schema", timeout=timeout)
        # Surface HTTP errors instead of misreporting every collection as missing.
        response.raise_for_status()
        schema = response.json()
        classes = [c["class"] for c in schema.get("classes", [])]
        expected = ["Thought", "Conversation", "Message", "Chunk", "Work", "Summary"]
        found = [c for c in classes if c in expected]
        missing = [c for c in expected if c not in classes]
        if found:
            print_ok(f"Collections trouvées: {', '.join(found)}")
        if missing:
            print_warn(f"Collections manquantes: {', '.join(missing)}")

        # Probe each collection for at least one object.
        # Note: an exact count would require the aggregate API.
        for class_name in found:
            response = requests.get(
                f"{WEAVIATE_URL}/v1/objects",
                params={"class": class_name, "limit": 1},
                timeout=timeout,
            )
            objects = response.json().get("objects", [])
            if objects:
                print_ok(f" {class_name}: contient des objets")
            else:
                print_warn(f" {class_name}: vide")
        return len(found) > 0, found
    except Exception as e:
        print_fail(f"Erreur lors de la vérification du schéma: {e}")
        return False, []
def check_backup_script() -> bool:
    """Check that ``weaviate_backup.py`` exists, imports and can write a backup.

    A small backup of the ``Thought`` collection (without vectors) is written
    to a temporary directory.

    Returns:
        True when that file exists and is non-empty, False on any failure.
    """
    print("\n[3/5] Script de backup...")
    here = Path(__file__).parent
    backup_script = here / "weaviate_backup.py"
    if not backup_script.exists():
        print_fail(f"Script non trouvé: {backup_script}")
        return False
    print_ok("Script weaviate_backup.py présent")

    # Import test: the scripts directory is put first on sys.path.
    try:
        sys.path.insert(0, str(here))
        from weaviate_backup import backup_weaviate, check_weaviate_ready

        if not check_weaviate_ready():
            print_fail("check_weaviate_ready() retourne False")
            return False
        print_ok("Fonction check_weaviate_ready() fonctionne")
    except ImportError as exc:
        print_fail(f"Erreur d'import: {exc}")
        return False

    # Quick backup into a throw-away directory.
    try:
        with tempfile.TemporaryDirectory() as scratch:
            target = Path(scratch) / "test_backup.json"
            backup_weaviate(
                output_path=target,
                collections=["Thought"],
                include_vectors=False,
            )
            if not (target.exists() and target.stat().st_size > 0):
                print_fail("Backup de test vide ou non créé")
                return False
            print_ok(f"Backup de test créé ({target.stat().st_size} bytes)")
            return True
    except Exception as exc:
        print_fail(f"Erreur lors du backup de test: {exc}")
        return False
def check_restore_script() -> bool:
    """Check that ``weaviate_restore.py`` exists, imports and can list classes.

    Returns:
        True when the script is present, ``restore_weaviate`` and
        ``get_existing_classes`` can be imported, and ``get_existing_classes()``
        returns. False otherwise; failures are reported through ``print_fail``
        rather than raised.
    """
    print("\n[4/5] Script de restauration...")
    scripts_dir = Path(__file__).parent
    restore_script = scripts_dir / "weaviate_restore.py"
    if not restore_script.exists():
        print_fail(f"Script non trouvé: {restore_script}")
        return False
    print_ok("Script weaviate_restore.py présent")

    # Import test; restore_weaviate is imported only to prove it exists.
    try:
        if str(scripts_dir) not in sys.path:
            sys.path.insert(0, str(scripts_dir))
        from weaviate_restore import restore_weaviate, get_existing_classes
    except ImportError as e:
        print_fail(f"Erreur d'import: {e}")
        return False

    # get_existing_classes() talks to Weaviate, so HTTP and connection errors
    # must be reported as a failed check instead of crashing the verifier.
    try:
        classes = get_existing_classes()
    except Exception as e:
        print_fail(f"get_existing_classes() a échoué: {e}")
        return False
    print_ok(f"Fonction get_existing_classes() retourne {len(classes)} classes")
    return True
def check_directory_structure() -> bool:
    """Check the project folders and create the optional ones that are missing.

    Required: the package directory (parent of this script's directory) and
    its ``scripts`` and ``tests`` subdirectories. Optional: an ``exports``
    directory next to the package directory, created when absent.

    Returns:
        True when every required directory exists; optional directories
        never affect the result.
    """
    print("\n[5/5] Structure des dossiers...")
    base_dir = Path(__file__).parent.parent
    root = base_dir.parent  # paths are displayed relative to this directory
    required_dirs = [base_dir, base_dir / "scripts", base_dir / "tests"]
    optional_dirs = [root / "exports"]

    all_ok = True
    for folder in required_dirs:
        shown = folder.relative_to(root)
        if not folder.exists():
            print_fail(f"Dossier manquant: {shown}")
            all_ok = False
            continue
        print_ok(f"Dossier: {shown}")

    for folder in optional_dirs:
        shown = folder.relative_to(root)
        if folder.exists():
            print_ok(f"Dossier: {shown}")
            continue
        print_warn(f"Dossier optionnel absent: {shown}")
        # Missing optional directories are created on the spot.
        folder.mkdir(parents=True, exist_ok=True)
        print_ok(f" → Créé: {shown}")

    return all_ok
def main():
    """Run the five Phase 0 checks, print a summary and exit.

    Exits with status 1 immediately when Weaviate is unreachable; otherwise
    exits with 0 when every check passed and 1 when any failed.
    """
    banner = "=" * 60
    print(banner)
    print("VÉRIFICATION PHASE 0 - Préparation et Backup")
    print(banner)

    # 1. Weaviate connection: nothing else can run without it.
    weaviate_ok = check_weaviate_connection()
    if not weaviate_ok:
        print("\n" + banner)
        print(f"{RED}ÉCHEC{RESET}: Weaviate n'est pas accessible.")
        print("Assurez-vous que Weaviate tourne:")
        print(" docker start weaviate")
        print(" # ou")
        print(" docker run -d --name weaviate -p 8080:8080 ...")
        print(banner)
        sys.exit(1)

    # 2-5. Remaining checks, evaluated in order; the list of found
    # collections is not used further.
    collections_ok, _found = check_collections()
    results = {
        "weaviate": weaviate_ok,
        "collections": collections_ok,
        "backup": check_backup_script(),
        "restore": check_restore_script(),
        "structure": check_directory_structure(),
    }

    # Summary.
    print("\n" + banner)
    print("RÉSUMÉ PHASE 0")
    print(banner)
    all_passed = all(results.values())
    for check, passed in results.items():
        if passed:
            status = f"{GREEN}OK{RESET}"
        else:
            status = f"{RED}ÉCHEC{RESET}"
        print(f" {check}: {status}")
    print()

    if not all_passed:
        print(f"{RED}{CROSS} PHASE 0 INCOMPLETE{RESET}")
        print("\nCorrigez les erreurs ci-dessus avant de continuer.")
    else:
        print(f"{GREEN}{CHECK} PHASE 0 VALIDEE{RESET}")
        print("\nProchaines etapes:")
        print(" 1. Creer un backup complet:")
        print(" python scripts/weaviate_backup.py --output exports/backup_phase0.json")
        print(" 2. Creer la branche git:")
        print(" git checkout -b feature/processual-v3")
        print(" 3. Passer a la Phase 1:")
        print(" python scripts/phase1_state_vector.py")
    print(banner)
    sys.exit(0 if all_passed else 1)


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,250 @@
#!/usr/bin/env python3
"""
Backup complet de toutes les collections Weaviate.
Usage:
python weaviate_backup.py
python weaviate_backup.py --output exports/backup_20260131.json
python weaviate_backup.py --collections Thought,Conversation
Ce script exporte:
- Le schéma complet (classes et propriétés)
- Tous les objets de chaque collection
- Les vecteurs (embeddings) de chaque objet
"""
import argparse
import json
import os
import sys
from datetime import datetime
from pathlib import Path
from typing import Any
import requests
# Configuration par défaut
WEAVIATE_URL = os.getenv("WEAVIATE_URL", "http://localhost:8080")
DEFAULT_OUTPUT_DIR = Path(__file__).parent.parent.parent / "exports"
def check_weaviate_ready() -> bool:
    """Return True when Weaviate's readiness endpoint answers HTTP 200.

    Any ``requests`` error (the call uses a 5-second timeout) yields False.
    """
    ready_url = f"{WEAVIATE_URL}/v1/.well-known/ready"
    try:
        status = requests.get(ready_url, timeout=5).status_code
    except requests.RequestException:
        return False
    return status == 200
def get_schema() -> dict:
    """Fetch the complete Weaviate schema.

    Returns:
        The decoded JSON body of ``GET /v1/schema``.

    Raises:
        requests.HTTPError: when Weaviate answers with an error status.
        requests.RequestException: on connection problems or when the
            server does not respond within 30 seconds.
    """
    # A timeout keeps a stalled server from hanging the backup forever.
    response = requests.get(f"{WEAVIATE_URL}/v1/schema", timeout=30)
    response.raise_for_status()
    return response.json()
def get_all_objects(class_name: str, include_vector: bool = True) -> list[dict]:
    """Fetch every object of a class, 100 at a time, using offset pagination.

    Args:
        class_name: Name of the collection.
        include_vector: Also request each object's vector (embedding).

    Returns:
        The objects retrieved. When a page answers with a non-200 status the
        error is printed and the objects collected so far are returned, so
        the list may be partial.

    Raises:
        requests.RequestException: on connection problems or when a page
            does not respond within 30 seconds.
    """
    page_size = 100
    # Passing params lets requests URL-encode the values.
    query = {"class": class_name, "limit": page_size, "offset": 0}
    if include_vector:
        query["include"] = "vector"

    objects = []
    # NOTE(review): offset pagination may be capped by the server's maximum
    # result window; very large collections might need cursor-based paging -
    # confirm against the deployed Weaviate version.
    while True:
        # A timeout keeps a stalled server from hanging the backup forever.
        response = requests.get(f"{WEAVIATE_URL}/v1/objects", params=query, timeout=30)
        if response.status_code != 200:
            print(f" Erreur lors de la récupération de {class_name}: {response.status_code}")
            break
        batch = response.json().get("objects", [])
        if not batch:
            break
        objects.extend(batch)
        query["offset"] += page_size
        # Progress line, overwritten in place.
        print(f" {class_name}: {len(objects)} objets récupérés...", end="\r")

    print(f" {class_name}: {len(objects)} objets au total")
    return objects
def backup_weaviate(
    output_path: Path,
    collections: list[str] | None = None,
    include_vectors: bool = True
) -> dict:
    """Export the Weaviate schema and objects to a JSON file.

    Args:
        output_path: Destination file; parent directories are created.
        collections: Collections to export (None or empty = all). Names that
            are not in the schema are reported and skipped.
        include_vectors: Include the object vectors.

    Returns:
        Mapping of collection name to the number of objects exported.

    Exits the process with status 1 when Weaviate is not reachable.
    """
    print("=" * 60)
    print("BACKUP WEAVIATE")
    print("=" * 60)
    print(f"URL: {WEAVIATE_URL}")
    print(f"Output: {output_path}")
    print(f"Include vectors: {include_vectors}")
    print("-" * 60)

    # Check the connection first.
    if not check_weaviate_ready():
        print("ERREUR: Weaviate n'est pas accessible")
        print(f"Vérifiez que le serveur tourne sur {WEAVIATE_URL}")
        sys.exit(1)
    print("Weaviate connecte [OK]")

    # Fetch the schema.
    print("\n[1/3] Récupération du schéma...")
    schema = get_schema()
    all_classes = [c["class"] for c in schema.get("classes", [])]
    print(f" Classes trouvées: {', '.join(all_classes)}")

    # Restrict to the requested collections, if any.
    if collections:
        classes_to_backup = [c for c in all_classes if c in collections]
        print(f" Collections sélectionnées: {', '.join(classes_to_backup)}")
        # Warn about requested names that do not exist, so a typo cannot
        # silently produce a backup that lacks the intended collection.
        unknown = [c for c in collections if c not in all_classes]
        if unknown:
            print(f" ATTENTION: collections absentes du schéma (ignorées): {', '.join(unknown)}")
    else:
        classes_to_backup = all_classes

    # Fetch the objects of each class.
    print("\n[2/3] Récupération des objets...")
    backup_data = {
        "metadata": {
            "timestamp": datetime.now().isoformat(),
            "weaviate_url": WEAVIATE_URL,
            "include_vectors": include_vectors,
            "version": "1.0"
        },
        "schema": schema,
        "collections": {}
    }
    stats = {}
    for class_name in classes_to_backup:
        objects = get_all_objects(class_name, include_vector=include_vectors)
        backup_data["collections"][class_name] = objects
        stats[class_name] = len(objects)

    # Write the file.
    print(f"\n[3/3] Sauvegarde dans {output_path}...")
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(backup_data, f, indent=2, ensure_ascii=False)
    file_size = output_path.stat().st_size / (1024 * 1024)  # MB

    # Summary.
    print("\n" + "=" * 60)
    print("BACKUP TERMINÉ")
    print("=" * 60)
    print(f"Fichier: {output_path}")
    print(f"Taille: {file_size:.2f} MB")
    print("\nStatistiques par collection:")
    for class_name, count in stats.items():
        print(f" - {class_name}: {count} objets")
    total = sum(stats.values())
    print(f"\nTotal: {total} objets")
    return stats
def main():
    """Parse the command line and run a backup.

    ``--url`` rebinds the module-level ``WEAVIATE_URL``. Without ``--output``
    the backup goes to a timestamped file in ``DEFAULT_OUTPUT_DIR``, which
    is created when missing.
    """
    global WEAVIATE_URL  # rebound below when --url is given

    cli = argparse.ArgumentParser(
        description="Backup complet de Weaviate",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Exemples:
python weaviate_backup.py
python weaviate_backup.py --output backup.json
python weaviate_backup.py --collections Thought,Conversation
python weaviate_backup.py --no-vectors
"""
    )
    cli.add_argument(
        "--output", "-o",
        type=Path,
        default=None,
        help="Chemin du fichier de sortie (defaut: exports/backup_YYYYMMDD_HHMMSS.json)",
    )
    cli.add_argument(
        "--collections", "-c",
        type=str,
        default=None,
        help="Collections a exporter (separees par des virgules)",
    )
    cli.add_argument(
        "--no-vectors",
        action="store_true",
        help="Ne pas inclure les vecteurs (plus rapide, fichier plus petit)",
    )
    cli.add_argument(
        "--url",
        type=str,
        default=None,
        help=f"URL Weaviate (defaut: {WEAVIATE_URL})",
    )
    options = cli.parse_args()

    # Weaviate URL override.
    if options.url:
        WEAVIATE_URL = options.url

    # Output path: explicit, or a timestamped file in the default directory.
    output_path = options.output
    if not output_path:
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        DEFAULT_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        output_path = DEFAULT_OUTPUT_DIR / f"backup_{stamp}.json"

    # Comma-separated collection names, whitespace-trimmed.
    selected = (
        [part.strip() for part in options.collections.split(",")]
        if options.collections
        else None
    )

    backup_weaviate(
        output_path=output_path,
        collections=selected,
        include_vectors=not options.no_vectors,
    )


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,373 @@
#!/usr/bin/env python3
"""
Restauration de collections Weaviate depuis un backup.
Usage:
python weaviate_restore.py backup.json
python weaviate_restore.py backup.json --collections Thought,Conversation
python weaviate_restore.py backup.json --dry-run
python weaviate_restore.py backup.json --clear-existing
ATTENTION: Ce script peut supprimer des données existantes!
Utilisez --dry-run pour prévisualiser les actions.
"""
import argparse
import json
import os
import sys
from datetime import datetime
from pathlib import Path
from typing import Any
import requests
# Configuration par défaut
WEAVIATE_URL = os.getenv("WEAVIATE_URL", "http://localhost:8080")
def check_weaviate_ready() -> bool:
    """Tell whether Weaviate's readiness endpoint answers HTTP 200.

    The probe uses a 5-second timeout; any ``requests`` error counts as
    "not ready".
    """
    try:
        reply = requests.get(f"{WEAVIATE_URL}/v1/.well-known/ready", timeout=5)
    except requests.RequestException:
        reachable = False
    else:
        reachable = reply.status_code == 200
    return reachable
def get_existing_classes() -> list[str]:
    """Return the names of the classes currently defined in the Weaviate schema.

    Raises:
        requests.HTTPError: when Weaviate answers with an error status.
        requests.RequestException: on connection problems or when the
            server does not respond within 30 seconds.
    """
    # A timeout keeps a stalled server from hanging the restore forever.
    response = requests.get(f"{WEAVIATE_URL}/v1/schema", timeout=30)
    response.raise_for_status()
    schema = response.json()
    return [c["class"] for c in schema.get("classes", [])]
def delete_class(class_name: str) -> bool:
    """Delete the class ``class_name`` through the schema endpoint.

    Per the original note this also removes the class's objects.

    Returns:
        True only when Weaviate answers HTTP 200.
    """
    endpoint = f"{WEAVIATE_URL}/v1/schema/{class_name}"
    outcome = requests.delete(endpoint)
    return outcome.status_code == 200
def create_class(class_schema: dict) -> bool:
    """Create a class by posting ``class_schema`` to the schema endpoint.

    Returns:
        True only when Weaviate answers HTTP 200.
    """
    json_headers = {"Content-Type": "application/json"}
    outcome = requests.post(
        f"{WEAVIATE_URL}/v1/schema",
        json=class_schema,
        headers=json_headers,
    )
    return outcome.status_code == 200
def insert_object(class_name: str, obj: dict) -> bool:
    """Insert one backed-up object into a class.

    Args:
        class_name: Name of the target class.
        obj: Object from the backup; its ``properties`` are always sent,
            ``id`` and ``vector`` only when present.

    Returns:
        True when Weaviate answers HTTP 200 or 201.
    """
    payload = {
        "class": class_name,
        "properties": obj.get("properties", {}),
    }
    # Carry over the original ID and the vector when the backup has them.
    for optional_key in ("id", "vector"):
        if optional_key in obj:
            payload[optional_key] = obj[optional_key]

    reply = requests.post(
        f"{WEAVIATE_URL}/v1/objects",
        json=payload,
        headers={"Content-Type": "application/json"},
    )
    return reply.status_code in (200, 201)
def batch_insert_objects(class_name: str, objects: list[dict], batch_size: int = 100) -> tuple[int, int]:
    """Insert objects through the batch endpoint, ``batch_size`` at a time.

    Each object's ``properties`` are sent, plus ``id`` and ``vector`` when
    present. A batch answered with a non-200 status counts all of its
    objects as failures; otherwise each returned item counts as a success
    only when its result status is ``"SUCCESS"``.

    Returns:
        ``(successes, failures)``.
    """
    success = 0
    failures = 0
    total = len(objects)

    for start in range(0, total, batch_size):
        chunk = objects[start:start + batch_size]

        entries = []
        for obj in chunk:
            entry = {"class": class_name, "properties": obj.get("properties", {})}
            if "id" in obj:
                entry["id"] = obj["id"]
            if "vector" in obj:
                entry["vector"] = obj["vector"]
            entries.append(entry)

        response = requests.post(
            f"{WEAVIATE_URL}/v1/batch/objects",
            json={"objects": entries},
            headers={"Content-Type": "application/json"},
        )

        if response.status_code != 200:
            failures += len(chunk)
            print(f" Erreur batch: {response.status_code}")
        else:
            for item in response.json():
                outcome = item.get("result", {})
                if outcome.get("status") == "SUCCESS":
                    success += 1
                    continue
                failures += 1
                error = outcome.get("errors", {})
                if error:
                    print(f" Erreur: {error}")

        # Progress line, overwritten in place.
        done = min(start + batch_size, total)
        print(f" {class_name}: {done}/{total} objets traités...", end="\r")

    # Trailing spaces wipe the remainder of the progress line.
    print(f" {class_name}: {success} succès, {failures} échecs" + " " * 20)
    return success, failures
def _restore_select_collections(
    requested: list[str] | None,
    available: dict,
) -> tuple[list[str], list[str]]:
    """Split the requested names into (present in backup, absent from backup)."""
    if not requested:
        return list(available), []
    # dict.fromkeys drops duplicates while keeping the requested order, so a
    # name given twice is not deleted/created/inserted twice.
    unique = list(dict.fromkeys(requested))
    selected = [name for name in unique if name in available]
    missing = [name for name in unique if name not in available]
    return selected, missing


def _restore_delete_classes(class_names: list[str], dry_run: bool) -> None:
    """Delete the given classes, or only announce the deletion in dry-run mode."""
    for class_name in class_names:
        if dry_run:
            print(f" [DRY-RUN] Suppression de {class_name}")
        elif delete_class(class_name):
            print(f" Supprimé: {class_name}")
        else:
            print(f" ERREUR suppression: {class_name}")


def _restore_create_classes(class_names: list[str], schema_classes: dict, dry_run: bool) -> None:
    """Create every class that has a schema in the backup and does not exist yet."""
    # One schema query after the deletions is enough; classes created below
    # are tracked locally instead of re-querying the server for each class.
    known = set() if dry_run else set(get_existing_classes())
    for class_name in class_names:
        if class_name not in schema_classes:
            print(f" Schéma manquant pour: {class_name}")
        elif dry_run:
            print(f" [DRY-RUN] Création de {class_name}")
        elif class_name in known:
            print(f" Existe déjà: {class_name}")
        elif create_class(schema_classes[class_name]):
            known.add(class_name)
            print(f" Créé: {class_name}")
        else:
            print(f" ERREUR création: {class_name}")


def _restore_insert_collections(class_names: list[str], backup_collections: dict, dry_run: bool) -> dict:
    """Insert the objects of each class and return the aggregated statistics."""
    stats = {"success": 0, "failures": 0, "by_class": {}}
    for class_name in class_names:
        objects = backup_collections.get(class_name, [])
        if not objects:
            print(f" {class_name}: 0 objets")
            continue

        if dry_run:
            print(f" [DRY-RUN] {class_name}: {len(objects)} objets à insérer")
            # In dry-run mode every object is reported as a would-be success.
            success, failures = len(objects), 0
        else:
            success, failures = batch_insert_objects(class_name, objects)

        stats["by_class"][class_name] = {"success": success, "failures": failures}
        stats["success"] += success
        stats["failures"] += failures
    return stats


def restore_weaviate(
    backup_path: Path,
    collections: list[str] | None = None,
    clear_existing: bool = False,
    dry_run: bool = False
) -> dict:
    """
    Restore collections from a backup file.

    Steps: load the JSON backup, compare the selected collections with the
    classes already present on the server, optionally delete the conflicting
    ones, create the classes from the backed-up schema, then insert the
    objects by batch. The process exits with status 1 if Weaviate is not
    reachable.

    Args:
        backup_path: Path of the backup file.
        collections: Collections to restore (None or empty = all of them).
            Duplicates are ignored; names absent from the backup are
            reported and skipped.
        clear_existing: Delete already existing collections before restoring
            them; when False, existing collections are left untouched and
            skipped.
        dry_run: Print the planned actions without modifying anything.

    Returns:
        Restore statistics: {"success": int, "failures": int, "by_class":
        {name: {"success": int, "failures": int}}}, or an empty dict when
        there is nothing to restore.
    """
    print("=" * 60)
    print("RESTAURATION WEAVIATE")
    if dry_run:
        print("*** MODE DRY-RUN - Aucune modification ***")
    print("=" * 60)
    print(f"URL: {WEAVIATE_URL}")
    print(f"Backup: {backup_path}")
    print(f"Clear existing: {clear_existing}")
    print("-" * 60)

    # Check the connection
    if not check_weaviate_ready():
        print("ERREUR: Weaviate n'est pas accessible")
        print(f"Vérifiez que le serveur tourne sur {WEAVIATE_URL}")
        sys.exit(1)
    print("Weaviate connecté ✓")

    # Load the backup
    print("\n[1/4] Chargement du backup...")
    with open(backup_path, "r", encoding="utf-8") as f:
        backup_data = json.load(f)

    # "or {}" also covers sections stored as null in the backup file.
    metadata = backup_data.get("metadata") or {}
    print(f" Timestamp: {metadata.get('timestamp', 'N/A')}")
    print(f" Source: {metadata.get('weaviate_url', 'N/A')}")
    print(f" Vectors inclus: {metadata.get('include_vectors', False)}")
    schema = backup_data.get("schema") or {}
    backup_collections = backup_data.get("collections") or {}

    # Work out which collections to restore
    classes_to_restore, missing = _restore_select_collections(collections, backup_collections)
    print(f"\n Collections à restaurer: {', '.join(classes_to_restore)}")
    if missing:
        # Make typos in the requested names visible instead of dropping them silently.
        print(f" Collections absentes du backup (ignorées): {', '.join(missing)}")

    # Check the collections already present on the server
    print("\n[2/4] Vérification des collections existantes...")
    existing_classes = get_existing_classes()
    print(f" Collections existantes: {', '.join(existing_classes) or '(aucune)'}")
    conflicts = [c for c in classes_to_restore if c in existing_classes]
    if conflicts:
        print(f" Conflits détectés: {', '.join(conflicts)}")
        if clear_existing:
            print(" → Seront supprimées (--clear-existing)")
        else:
            print(" → Seront ignorées (utilisez --clear-existing pour les remplacer)")
            classes_to_restore = [c for c in classes_to_restore if c not in conflicts]

    if not classes_to_restore:
        print("\nAucune collection à restaurer.")
        return {}

    # Prepare the schema
    print("\n[3/4] Préparation du schéma...")
    schema_classes = {c["class"]: c for c in schema.get("classes", [])}

    # Delete the existing collections if requested
    if clear_existing and conflicts:
        print("\n Suppression des collections existantes...")
        _restore_delete_classes(conflicts, dry_run)

    # Create the classes
    print("\n Création des classes...")
    _restore_create_classes(classes_to_restore, schema_classes, dry_run)

    # Insert the objects
    print("\n[4/4] Insertion des objets...")
    stats = _restore_insert_collections(classes_to_restore, backup_collections, dry_run)

    # Summary
    print("\n" + "=" * 60)
    print("RESTAURATION TERMINÉE" + (" (DRY-RUN)" if dry_run else ""))
    print("=" * 60)
    print("\nStatistiques par collection:")
    for class_name, class_stats in stats["by_class"].items():
        print(f" - {class_name}: {class_stats['success']} succès, {class_stats['failures']} échecs")
    print(f"\nTotal: {stats['success']} succès, {stats['failures']} échecs")
    return stats
def main():
    """
    Command-line entry point: parse the arguments and run the restore.

    Exits with status 1 when the backup path does not exist or is a
    directory, and with status 0 when the user declines the
    --clear-existing confirmation.
    """
    # --url rebinds the module-level URL that the request helpers read.
    global WEAVIATE_URL

    parser = argparse.ArgumentParser(
        description="Restauration de Weaviate depuis un backup",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Exemples:
python weaviate_restore.py backup.json
python weaviate_restore.py backup.json --dry-run
python weaviate_restore.py backup.json --collections Thought,Conversation
python weaviate_restore.py backup.json --clear-existing
ATTENTION: --clear-existing supprime les donnees existantes!
"""
    )
    parser.add_argument(
        "backup",
        type=Path,
        help="Chemin du fichier de backup"
    )
    parser.add_argument(
        "--collections", "-c",
        type=str,
        default=None,
        help="Collections à restaurer (séparées par des virgules)"
    )
    parser.add_argument(
        "--clear-existing",
        action="store_true",
        help="Supprimer les collections existantes avant restauration"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Prévisualiser les actions sans les exécuter"
    )
    parser.add_argument(
        "--url",
        type=str,
        default=None,
        help=f"URL Weaviate (défaut: {WEAVIATE_URL})"
    )
    args = parser.parse_args()

    # Check that the backup file exists
    if not args.backup.exists():
        print(f"ERREUR: Fichier non trouvé: {args.backup}")
        sys.exit(1)
    # A directory passes exists() but would only fail later, with a
    # traceback, when the restore tries to open it.
    if args.backup.is_dir():
        print(f"ERREUR: Le chemin n'est pas un fichier: {args.backup}")
        sys.exit(1)

    # Weaviate URL
    if args.url:
        WEAVIATE_URL = args.url

    # Collections
    collections = None
    if args.collections:
        collections = [c.strip() for c in args.collections.split(",")]

    # Ask for confirmation when data is about to be deleted for real
    if args.clear_existing and not args.dry_run:
        print("⚠️ ATTENTION: --clear-existing va SUPPRIMER des données!")
        print(" Utilisez --dry-run pour prévisualiser.")
        try:
            response = input(" Continuer? [y/N] ")
        except EOFError:
            # No interactive stdin: fall back to the default answer (N)
            # instead of crashing with a traceback.
            print()
            response = ""
        if response.strip().lower() != "y":
            print("Annulé.")
            sys.exit(0)

    # Run the restore
    restore_weaviate(
        backup_path=args.backup,
        collections=collections,
        clear_existing=args.clear_existing,
        dry_run=args.dry_run
    )
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()