Files
linear-coding-agent/ikario_processual/vigilance.py
David Blanc Brioir f6fe71e2f7 Add Ikario Architecture v2 - Phases 1-8 complete
Implements the processual architecture based on Whitehead's Process
Philosophy and Peirce's Semiotics. Core paradigm: "L'espace latent
pense. Le LLM traduit." (The latent space thinks. The LLM translates.)

Phase 1-4: Core semiotic cycle
- StateTensor 8x1024 (8 Peircean dimensions)
- Dissonance computation with hard negatives
- Fixation via 4 Peircean methods (Tenacity, Authority, A Priori, Science)
- LatentEngine orchestrating the full cycle

Phase 5: StateToLanguage
- LLM as pure translator (zero-reasoning, T=0)
- Projection on interpretable directions
- Reasoning markers detection (Amendment #4)

Phase 6: Vigilance
- x_ref (David) as guard-rail, NOT attractor
- Drift detection per dimension and globally
- Alerts: ok, warning, critical

Phase 7: Autonomous Daemon
- Two modes: CONVERSATION (always verbalize), AUTONOMOUS (~1000 cycles/day)
- Amendment #5: 50% probability on unresolved impacts
- TriggerGenerator with weighted random selection

Phase 8: Integration & Metrics
- ProcessMetrics for daily/weekly reports
- Health status monitoring
- Integration tests validating all modules

297 tests passing, version 0.7.0

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-01 22:30:19 +01:00

632 lines
20 KiB
Python

#!/usr/bin/env python3
"""
Vigilance System - Surveillance de la derive d'Ikario par rapport a x_ref (David).
Phase 6 de l'architecture processuelle v2.
x_ref N'EST PAS un attracteur. Ikario ne "tend" pas vers David.
x_ref EST un garde-fou. Si distance > seuil → ALERTE.
Ce module :
1. Definit x_ref comme StateTensor (profil de David)
2. Calcule la distance par dimension et globalement
3. Detecte les derives et genere des alertes
4. Permet le reset apres validation de David
Amendement #15 : Comparaison StateTensor Ikario <-> x_ref David
"""
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

import numpy as np

from .state_tensor import StateTensor, DIMENSION_NAMES, EMBEDDING_DIM
# Logger
logger = logging.getLogger(__name__)
@dataclass
class VigilanceAlert:
    """Vigilance alert raised when Ikario drifts away from x_ref."""
    # One of "ok", "warning", "critical"
    level: str
    message: str = ""
    # Per-dimension cosine distance to x_ref
    dimensions: Dict[str, float] = field(default_factory=dict)
    cumulative_drift: float = 0.0
    per_cycle_drift: float = 0.0
    # ISO-8601 UTC timestamp. Fix: the original appended "Z" to *local*
    # time; we now take real UTC (format is unchanged: naive ISO + "Z").
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
    )
    state_id: int = 0
    top_drifting_dimensions: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the alert to a plain dictionary."""
        return {
            'level': self.level,
            'message': self.message,
            'dimensions': self.dimensions,
            'cumulative_drift': self.cumulative_drift,
            'per_cycle_drift': self.per_cycle_drift,
            'timestamp': self.timestamp,
            'state_id': self.state_id,
            'top_drifting_dimensions': self.top_drifting_dimensions,
        }

    @property
    def is_alert(self) -> bool:
        """True when the level is warning or critical."""
        return self.level in ("warning", "critical")
@dataclass
class VigilanceConfig:
    """Threshold configuration for the vigilance system."""
    # Cumulative drift threshold (fraction): 1% accumulated
    threshold_cumulative: float = 0.01
    # Per-cycle drift threshold (fraction): 0.2% per cycle
    threshold_per_cycle: float = 0.002
    # Per-dimension threshold (cosine distance): 5%
    threshold_per_dimension: float = 0.05
    # Critical multiplier: critical = 2x the warning threshold
    critical_multiplier: float = 2.0

    def validate(self) -> bool:
        """Return True when every threshold lies in its valid range."""
        fraction_thresholds = (
            self.threshold_cumulative,
            self.threshold_per_cycle,
            self.threshold_per_dimension,
        )
        if not all(0 < value < 1 for value in fraction_thresholds):
            return False
        return self.critical_multiplier > 1
class VigilanceSystem:
    """
    Monitors Ikario's drift relative to x_ref (David).
    x_ref is a guard-rail, NOT an attractor.
    Alert levels:
    - "ok": no significant drift
    - "warning": drift detected (> threshold)
    - "critical": large drift (> 2x threshold)
    """

    def __init__(
        self,
        x_ref: StateTensor,
        config: Optional[VigilanceConfig] = None,
    ):
        """
        Initialize the vigilance system.

        Args:
            x_ref: Reference tensor (David's profile, fixed)
            config: Threshold configuration
        """
        self.x_ref = x_ref
        self.config = config or VigilanceConfig()
        self.cumulative_drift = 0.0
        self.X_prev: Optional[StateTensor] = None
        self.history: List[VigilanceAlert] = []
        self._alerts_count = {'ok': 0, 'warning': 0, 'critical': 0}

    def check_drift(self, X_t: StateTensor) -> VigilanceAlert:
        """
        Compare the current state X_t with x_ref and the previous state.

        Args:
            X_t: Ikario's current state

        Returns:
            VigilanceAlert with level and drift details.
        """
        # 1. Per-dimension cosine distance to x_ref
        dim_distances = self._distance_per_dimension(X_t)
        # NOTE: the global L2 distance used to be computed here on every
        # check but never fed the alert decision; that dead computation
        # (over the full 8192-dim tensor) has been removed. Use
        # _global_distance() directly when the value is needed.
        # 2. Incremental drift (when a previous state exists)
        per_cycle_drift = 0.0
        if self.X_prev is not None:
            per_cycle_drift = self._compute_distance(X_t, self.X_prev)
            self.cumulative_drift += per_cycle_drift
        self.X_prev = X_t.copy()
        # 3. Identify drifting dimensions
        drifting_dims = {
            dim: dist for dim, dist in dim_distances.items()
            if dist > self.config.threshold_per_dimension
        }
        # Top 3 drifting dimensions
        sorted_dims = sorted(dim_distances.items(), key=lambda x: x[1], reverse=True)
        top_drifting = [d[0] for d in sorted_dims[:3]]
        # 4. Determine alert level
        critical_threshold = self.config.threshold_cumulative * self.config.critical_multiplier
        warning_threshold = self.config.threshold_cumulative
        if self.cumulative_drift > critical_threshold:
            level = "critical"
            message = f"DERIVE CRITIQUE : {self.cumulative_drift:.2%} cumule (seuil: {warning_threshold:.2%})"
        elif self.cumulative_drift > warning_threshold or len(drifting_dims) > 2:
            level = "warning"
            message = f"Derive detectee : {self.cumulative_drift:.2%} cumule"
            if drifting_dims:
                message += f", dimensions en derive : {list(drifting_dims.keys())}"
        elif per_cycle_drift > self.config.threshold_per_cycle:
            level = "warning"
            message = f"Derive rapide ce cycle : {per_cycle_drift:.2%}"
        else:
            level = "ok"
            message = ""
        alert = VigilanceAlert(
            level=level,
            message=message,
            dimensions=dim_distances,
            cumulative_drift=self.cumulative_drift,
            per_cycle_drift=per_cycle_drift,
            state_id=X_t.state_id,
            top_drifting_dimensions=top_drifting,
        )
        self.history.append(alert)
        self._alerts_count[level] += 1
        if level != "ok":
            logger.warning(f"Vigilance {level}: {message}")
        return alert

    def _distance_per_dimension(self, X_t: StateTensor) -> Dict[str, float]:
        """
        Cosine distance per dimension (0=identical, 1=orthogonal, 2=opposed).

        Args:
            X_t: Current state

        Returns:
            Dict dimension -> cosine distance (plain Python floats, so the
            alert stays JSON-serializable).
        """
        distances = {}
        for dim_name in DIMENSION_NAMES:
            vec_ikario = getattr(X_t, dim_name)
            vec_david = getattr(self.x_ref, dim_name)
            # Cosine distance = 1 - cosine_similarity
            norm_ikario = np.linalg.norm(vec_ikario)
            norm_david = np.linalg.norm(vec_david)
            if norm_ikario > 0 and norm_david > 0:
                cos_sim = np.dot(vec_ikario, vec_david) / (norm_ikario * norm_david)
                # float() avoids leaking np.float64 into serialized alerts
                distances[dim_name] = float(1 - cos_sim)
            else:
                # A null vector carries no direction: report 1.0
                # ("orthogonal"); note true cosine distance can reach 2.
                distances[dim_name] = 1.0
        return distances

    def _global_distance(self, X_t: StateTensor) -> float:
        """
        Normalized L2 distance over the full 8192 dimensions.

        Args:
            X_t: Current state

        Returns:
            Normalized L2 distance
        """
        flat_ikario = X_t.to_flat()  # 8192 dims
        flat_david = self.x_ref.to_flat()  # 8192 dims
        diff = flat_ikario - flat_david
        norm_david = np.linalg.norm(flat_david)
        if norm_david > 0:
            return float(np.linalg.norm(diff) / norm_david)
        return float(np.linalg.norm(diff))

    def _compute_distance(self, X1: StateTensor, X2: StateTensor) -> float:
        """
        Normalized distance between two states.

        Args:
            X1: First state
            X2: Second state (reference for normalization)

        Returns:
            Normalized L2 distance
        """
        diff = X1.to_flat() - X2.to_flat()
        norm_ref = np.linalg.norm(X2.to_flat())
        if norm_ref > 0:
            return float(np.linalg.norm(diff) / norm_ref)
        return float(np.linalg.norm(diff))

    def reset_cumulative(self) -> None:
        """
        Reset the cumulative drift counter.
        To be used after David's explicit validation.
        """
        logger.info(f"Reset cumulative drift from {self.cumulative_drift:.2%}")
        self.cumulative_drift = 0.0

    def get_stats(self) -> Dict[str, Any]:
        """Return vigilance statistics."""
        return {
            'cumulative_drift': self.cumulative_drift,
            'total_checks': len(self.history),
            'alerts_count': self._alerts_count.copy(),
            'recent_alerts': [a.to_dict() for a in self.history[-10:]],
        }
class DavidReference:
    """
    Factory to build x_ref (David's profile) as a StateTensor.
    Serves as a guard-rail (NOT an attractor) for the vigilance system.
    """

    @staticmethod
    def _utc_timestamp() -> str:
        """Current UTC time, naive ISO format with a 'Z' suffix.

        Fix: the original used local datetime.now() + "Z", which labeled
        local time as UTC. Output format is unchanged.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

    @staticmethod
    def create_from_declared_profile(
        profile_path: str,
        embedding_model=None,
    ) -> StateTensor:
        """
        Build x_ref from the declared profile (JSON).
        The profile holds values in [-10, +10] per direction, used to
        weight the tensor directions.

        Args:
            profile_path: Path to the profile JSON file
            embedding_model: Embedding model (SentenceTransformer)

        Returns:
            StateTensor representing David
        """
        with open(profile_path, 'r', encoding='utf-8') as f:
            profile_data = json.load(f)
        profile = profile_data.get("profile", {})
        # Start from an empty tensor
        x_ref = StateTensor(
            state_id=-1,  # special ID for x_ref
            timestamp=DavidReference._utc_timestamp(),
        )
        if embedding_model is None:
            # No-model mode: deterministic random unit vectors.
            # A local RandomState yields the same sequence as
            # np.random.seed(42)+np.random.randn, WITHOUT polluting the
            # process-global NumPy RNG state.
            rng = np.random.RandomState(42)
            for dim_name in DIMENSION_NAMES:
                v = rng.randn(EMBEDDING_DIM)
                v = v / np.linalg.norm(v)
                setattr(x_ref, dim_name, v)
            return x_ref
        # With a model: build embeddings from textual descriptions.
        # Mapping dimensions -> profile categories
        dim_to_category = {
            'firstness': 'epistemic',
            'secondness': 'metacognitive',
            'thirdness': 'philosophical',
            'dispositions': 'affective',
            'orientations': 'temporal',
            'engagements': 'relational',
            'pertinences': 'thematic',
            'valeurs': 'ethical',
        }
        for dim_name in DIMENSION_NAMES:
            category = dim_to_category.get(dim_name, 'epistemic')
            category_profile = profile.get(category, {})
            # Build a textual description from the trait values
            # (traits in [-2, 2] are considered neutral and skipped)
            descriptions = []
            for trait, value in category_profile.items():
                if value > 5:
                    descriptions.append(f"tres {trait}")
                elif value > 2:
                    descriptions.append(f"moderement {trait}")
                elif value < -5:
                    descriptions.append(f"pas du tout {trait}")
                elif value < -2:
                    descriptions.append(f"peu {trait}")
            if descriptions:
                text = f"David est {', '.join(descriptions[:5])}."
            else:
                text = f"David a un profil {category} neutre."
            # Embed the text and L2-normalize it
            embedding = embedding_model.encode(text)
            if isinstance(embedding, list):
                embedding = np.array(embedding)
            norm = np.linalg.norm(embedding)
            if norm > 0:
                embedding = embedding / norm
            setattr(x_ref, dim_name, embedding)
        return x_ref

    @staticmethod
    def create_from_history(
        weaviate_client,
        n_sessions: int = 100,
    ) -> StateTensor:
        """
        Build x_ref from conversation history.
        x_ref = weighted mean of states during authentic conversations.

        Args:
            weaviate_client: Weaviate v4 client
            n_sessions: Number of sessions to use

        Returns:
            Weighted-mean StateTensor
        """
        try:
            from weaviate.classes.query import Sort
        except ImportError:
            logger.warning("Weaviate classes not available, using empty tensor")
            return StateTensor(state_id=-1, timestamp=DavidReference._utc_timestamp())
        try:
            collection = weaviate_client.collections.get("StateTensor")
            results = collection.query.fetch_objects(
                limit=n_sessions,
                sort=Sort.by_property("timestamp", ascending=False),
                include_vector=True,
            )
            if not results.objects:
                raise ValueError("Aucun etat historique trouve")
            states = []
            for obj in results.objects:
                tensor = StateTensor(
                    state_id=obj.properties.get("state_id", 0),
                    timestamp=obj.properties.get("timestamp", ""),
                )
                # Extract per-dimension vectors from Weaviate named vectors
                if hasattr(obj, 'vector') and isinstance(obj.vector, dict):
                    for dim_name in DIMENSION_NAMES:
                        if dim_name in obj.vector:
                            setattr(tensor, dim_name, np.array(obj.vector[dim_name]))
                states.append(tensor)
            # Exponential weighting (recent states weigh more)
            weights = np.exp(-np.arange(len(states)) * 0.01)
            weights /= weights.sum()
            return StateTensor.weighted_mean(states, weights)
        except Exception as e:
            # Best-effort: log and fall back to an empty tensor rather
            # than crash vigilance setup.
            logger.error(f"Erreur creation x_ref depuis historique: {e}")
            return StateTensor(state_id=-1, timestamp=DavidReference._utc_timestamp())

    @staticmethod
    def create_hybrid(
        profile_path: str,
        weaviate_client,
        embedding_model=None,
        alpha: float = 0.7,
    ) -> StateTensor:
        """
        RECOMMENDED: 70% declared profile + 30% observed history.

        Args:
            profile_path: Path to the profile JSON
            weaviate_client: Weaviate client
            embedding_model: Embedding model
            alpha: Weight of the declared profile (default 0.7)

        Returns:
            Blended StateTensor
        """
        x_declared = DavidReference.create_from_declared_profile(
            profile_path, embedding_model
        )
        x_observed = DavidReference.create_from_history(weaviate_client)
        return StateTensor.blend(x_declared, x_observed, alpha=alpha)
class VigilanceVisualizer:
    """Visualization of the per-dimension distance."""

    @staticmethod
    def _cosine_distance(vec_a, vec_b) -> float:
        """Cosine distance between two vectors; 1.0 when either is null."""
        norm_a = np.linalg.norm(vec_a)
        norm_b = np.linalg.norm(vec_b)
        if norm_a > 0 and norm_b > 0:
            cos_sim = np.dot(vec_a, vec_b) / (norm_a * norm_b)
            return float(1 - cos_sim)
        return 1.0

    @staticmethod
    def format_distance_report(
        X_t: StateTensor,
        x_ref: StateTensor,
        cumulative_drift: float = 0.0,
    ) -> str:
        """
        Generate a textual distance report.

        Args:
            X_t: Ikario's current state
            x_ref: David reference
            cumulative_drift: Current cumulative drift

        Returns:
            Formatted text report
        """
        lines = ["=== RAPPORT VIGILANCE ===", ""]
        lines.append(f"Derive cumulative : {cumulative_drift:.2%}")
        lines.append("")
        lines.append("Distance par dimension :")
        lines.append("-" * 50)
        distances = [
            (dim_name,
             VigilanceVisualizer._cosine_distance(
                 getattr(X_t, dim_name), getattr(x_ref, dim_name)))
            for dim_name in DIMENSION_NAMES
        ]
        # Sort by decreasing distance
        distances.sort(key=lambda x: x[1], reverse=True)
        for dim_name, distance in distances:
            # Progress bar. Cosine distance can reach 2 (opposed vectors),
            # so clamp the fill to keep the bar exactly bar_len wide.
            bar_len = 20
            filled = min(bar_len, max(0, int(distance * bar_len)))
            bar = "#" * filled + "-" * (bar_len - filled)
            # Level indicator
            if distance > 0.05:
                level = "[!]"
            elif distance > 0.02:
                level = "[~]"
            else:
                level = "[OK]"
            lines.append(f"  {dim_name:15} [{bar}] {distance:.3f} {level}")
        lines.append("")
        # Global distance (epsilon guards against a null reference)
        flat_i = X_t.to_flat()
        flat_d = x_ref.to_flat()
        global_dist = np.linalg.norm(flat_i - flat_d) / (np.linalg.norm(flat_d) + 1e-8)
        lines.append(f"Distance globale L2 : {global_dist:.4f}")
        return "\n".join(lines)

    @staticmethod
    def radar_chart(
        X_t: StateTensor,
        x_ref: StateTensor,
        save_path: Optional[str] = None,
    ):
        """
        Generate a radar chart of the 8 dimensions.

        Args:
            X_t: Ikario's current state
            x_ref: David reference
            save_path: Path to save the image (optional)

        Returns:
            matplotlib Figure or None (None when matplotlib is missing
            or when the chart was saved to save_path)
        """
        try:
            import matplotlib.pyplot as plt
        except ImportError:
            logger.warning("matplotlib not available for radar chart")
            return None
        dimensions = DIMENSION_NAMES
        values = [
            VigilanceVisualizer._cosine_distance(
                getattr(X_t, dim), getattr(x_ref, dim))
            for dim in dimensions
        ]
        # Close the polygon
        values += values[:1]
        angles = np.linspace(0, 2 * np.pi, len(dimensions), endpoint=False).tolist()
        angles += angles[:1]
        fig, ax = plt.subplots(figsize=(10, 10), subplot_kw=dict(projection='polar'))
        ax.fill(angles, values, color='red', alpha=0.25)
        ax.plot(angles, values, color='red', linewidth=2)
        ax.set_xticks(angles[:-1])
        ax.set_xticklabels(dimensions)
        ax.set_ylim(0, 1)
        ax.set_title(
            "Distance Ikario - David (x_ref) par dimension\n0=identique, 1=orthogonal",
            fontsize=12
        )
        if save_path:
            plt.savefig(save_path, dpi=150, bbox_inches='tight')
            plt.close()
            return None
        return fig
def create_vigilance_system(
    profile_path: Optional[str] = None,  # annotation fixed (was str = None)
    weaviate_client=None,
    embedding_model=None,
    config: Optional[VigilanceConfig] = None,
) -> VigilanceSystem:
    """
    Factory building a configured vigilance system.

    Args:
        profile_path: Path to David's declared profile
        weaviate_client: Weaviate client (optional, for history)
        embedding_model: Embedding model (optional)
        config: Threshold configuration

    Returns:
        Configured VigilanceSystem
    """
    if profile_path and weaviate_client:
        # Recommended hybrid mode
        x_ref = DavidReference.create_hybrid(
            profile_path, weaviate_client, embedding_model
        )
    elif profile_path:
        # Declared-profile-only mode
        x_ref = DavidReference.create_from_declared_profile(
            profile_path, embedding_model
        )
    elif weaviate_client:
        # History-only mode
        x_ref = DavidReference.create_from_history(weaviate_client)
    else:
        # Test mode: deterministic random tensor.
        # Local RandomState reproduces the np.random.seed(42) sequence
        # without mutating the global NumPy RNG; timestamp is real UTC.
        x_ref = StateTensor(
            state_id=-1,
            timestamp=datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z",
        )
        rng = np.random.RandomState(42)
        for dim_name in DIMENSION_NAMES:
            v = rng.randn(EMBEDDING_DIM)
            v = v / np.linalg.norm(v)
            setattr(x_ref, dim_name, v)
    return VigilanceSystem(x_ref=x_ref, config=config)