Add unified memory tools: search_memories, trace_concept_evolution, check_consistency, update_thought_evolution_stage

- Create memory/mcp/unified_tools.py with 4 new handlers:
  - search_memories: unified search across Thoughts and Conversations
  - trace_concept_evolution: track concept development over time
  - check_consistency: verify statement alignment with past content
  - update_thought_evolution_stage: update thought maturity stage
- Export new tools from memory/mcp/__init__.py
- Register new tools in mcp_server.py with full docstrings

These tools complete the Ikario memory toolset to match memoryTools.js expectations.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-30 17:24:30 +01:00
parent 376d77ddfa
commit 9e657cbf29
3 changed files with 721 additions and 0 deletions

View File

@@ -30,6 +30,17 @@ from memory.mcp.conversation_tools import (
list_conversations_handler,
)
from memory.mcp.unified_tools import (
SearchMemoriesInput,
TraceConceptEvolutionInput,
CheckConsistencyInput,
UpdateThoughtEvolutionStageInput,
search_memories_handler,
trace_concept_evolution_handler,
check_consistency_handler,
update_thought_evolution_stage_handler,
)
__all__ = [
# Thought tools
"AddThoughtInput",
@@ -53,4 +64,14 @@ __all__ = [
"get_conversation_handler",
"search_conversations_handler",
"list_conversations_handler",
# Unified tools (cross-collection)
"SearchMemoriesInput",
"TraceConceptEvolutionInput",
"CheckConsistencyInput",
"UpdateThoughtEvolutionStageInput",
"search_memories_handler",
"trace_concept_evolution_handler",
"check_consistency_handler",
"update_thought_evolution_stage_handler",
]

521
memory/mcp/unified_tools.py Normal file
View File

@@ -0,0 +1,521 @@
"""
Unified Memory MCP Tools.
Provides unified search and analysis tools that work across
Thoughts and Conversations collections.
"""
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
import weaviate
from weaviate.classes.query import MetadataQuery
import os
from datetime import datetime, timedelta
import re
# =============================================================================
# Input Models
# =============================================================================
class SearchMemoriesInput(BaseModel):
"""Input for unified memory search."""
query: str = Field(description="Search query text (can be empty for listing)")
n_results: int = Field(default=5, ge=1, le=20, description="Number of results")
filter_type: Optional[str] = Field(default=None, description="Filter: 'thoughts' or 'conversations'")
since: Optional[str] = Field(default=None, description="Filter after date (ISO or relative: 7d, 3h, 1w)")
before: Optional[str] = Field(default=None, description="Filter before date (ISO only)")
sort_by: Optional[str] = Field(default="relevance", description="Sort: relevance, date_desc, date_asc")
class TraceConceptEvolutionInput(BaseModel):
"""Input for concept evolution tracing."""
concept: str = Field(description="The concept to trace")
limit: int = Field(default=10, ge=1, le=50, description="Max timeline points")
class CheckConsistencyInput(BaseModel):
"""Input for consistency checking."""
statement: str = Field(description="The statement to check for consistency")
class UpdateThoughtEvolutionStageInput(BaseModel):
"""Input for updating thought evolution stage."""
thought_id: str = Field(description="Thought ID (format: thought_YYYY-MM-DDTHH:MM:SS or UUID)")
new_stage: str = Field(description="New stage: nascent, developing, mature, revised, abandoned")
# =============================================================================
# Helper Functions
# =============================================================================
def get_weaviate_client():
"""Get Weaviate client from environment."""
url = os.environ.get("WEAVIATE_URL", "http://localhost:8080")
api_key = os.environ.get("WEAVIATE_API_KEY")
if api_key:
return weaviate.connect_to_custom(
http_host=url.replace("http://", "").replace("https://", "").split(":")[0],
http_port=int(url.split(":")[-1]) if ":" in url.split("/")[-1] else 8080,
http_secure=url.startswith("https"),
auth_credentials=weaviate.auth.AuthApiKey(api_key),
)
else:
return weaviate.connect_to_local(
host=url.replace("http://", "").replace("https://", "").split(":")[0],
port=int(url.split(":")[-1]) if ":" in url.split("/")[-1] else 8080,
)
def parse_relative_date(date_str: str) -> Optional[datetime]:
"""Parse relative date string (7d, 3h, 1w, 30m) to datetime."""
if not date_str:
return None
# Try ISO format first
try:
return datetime.fromisoformat(date_str.replace("Z", "+00:00"))
except ValueError:
pass
# Parse relative format
match = re.match(r"(\d+)([dhwm])", date_str.lower())
if match:
value = int(match.group(1))
unit = match.group(2)
now = datetime.now()
if unit == "d":
return now - timedelta(days=value)
elif unit == "h":
return now - timedelta(hours=value)
elif unit == "w":
return now - timedelta(weeks=value)
elif unit == "m":
return now - timedelta(minutes=value)
return None
# =============================================================================
# Tool Handlers
# =============================================================================
async def search_memories_handler(input_data: SearchMemoriesInput) -> Dict[str, Any]:
"""
Search across both Thoughts and Conversations.
Returns unified results sorted by relevance or date.
"""
try:
client = get_weaviate_client()
results = []
# Parse date filters
since_dt = parse_relative_date(input_data.since) if input_data.since else None
before_dt = parse_relative_date(input_data.before) if input_data.before else None
# Search Thoughts (if not filtered to conversations only)
if input_data.filter_type != "conversations":
try:
thought_collection = client.collections.get("Thought")
if input_data.query:
thought_results = thought_collection.query.near_text(
query=input_data.query,
limit=input_data.n_results,
return_metadata=MetadataQuery(distance=True),
)
else:
thought_results = thought_collection.query.fetch_objects(
limit=input_data.n_results,
)
for obj in thought_results.objects:
props = obj.properties
timestamp = props.get("timestamp", "")
# Apply date filters
if since_dt and timestamp:
try:
obj_dt = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
if obj_dt < since_dt:
continue
except:
pass
if before_dt and timestamp:
try:
obj_dt = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
if obj_dt > before_dt:
continue
except:
pass
results.append({
"type": "thought",
"id": str(obj.uuid),
"content": props.get("content", "")[:500],
"thought_type": props.get("thought_type", ""),
"timestamp": timestamp,
"concepts": props.get("concepts", []),
"distance": obj.metadata.distance if obj.metadata else None,
})
except Exception as e:
# Collection might not exist
pass
# Search Conversations (if not filtered to thoughts only)
if input_data.filter_type != "thoughts":
try:
conv_collection = client.collections.get("Conversation")
if input_data.query:
conv_results = conv_collection.query.near_text(
query=input_data.query,
limit=input_data.n_results,
return_metadata=MetadataQuery(distance=True),
)
else:
conv_results = conv_collection.query.fetch_objects(
limit=input_data.n_results,
)
for obj in conv_results.objects:
props = obj.properties
timestamp = props.get("timestamp_start", "") or props.get("timestamp_end", "")
# Apply date filters
if since_dt and timestamp:
try:
obj_dt = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
if obj_dt < since_dt:
continue
except:
pass
if before_dt and timestamp:
try:
obj_dt = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
if obj_dt > before_dt:
continue
except:
pass
results.append({
"type": "conversation",
"id": props.get("conversation_id", str(obj.uuid)),
"summary": props.get("summary", "")[:500],
"category": props.get("category", ""),
"timestamp": timestamp,
"tags": props.get("tags", []),
"distance": obj.metadata.distance if obj.metadata else None,
})
except Exception as e:
# Collection might not exist
pass
# Sort results
if input_data.sort_by == "date_desc":
results.sort(key=lambda x: x.get("timestamp", ""), reverse=True)
elif input_data.sort_by == "date_asc":
results.sort(key=lambda x: x.get("timestamp", ""))
else:
# Sort by distance (relevance) - lower is better
results.sort(key=lambda x: x.get("distance") or 999)
# Limit results
results = results[:input_data.n_results]
client.close()
return {
"success": True,
"query": input_data.query,
"results": results,
"count": len(results),
"filter_type": input_data.filter_type,
}
except Exception as e:
return {
"success": False,
"error": str(e),
"query": input_data.query,
"results": [],
"count": 0,
}
async def trace_concept_evolution_handler(input_data: TraceConceptEvolutionInput) -> Dict[str, Any]:
"""
Trace the evolution of a concept through thoughts and conversations over time.
Returns a timeline showing how the concept appeared and evolved.
"""
try:
client = get_weaviate_client()
timeline = []
# Search Thoughts for the concept
try:
thought_collection = client.collections.get("Thought")
thought_results = thought_collection.query.near_text(
query=input_data.concept,
limit=input_data.limit,
return_metadata=MetadataQuery(distance=True),
)
for obj in thought_results.objects:
props = obj.properties
distance = obj.metadata.distance if obj.metadata else 1.0
# Only include reasonably relevant results
if distance < 0.8:
timeline.append({
"type": "thought",
"id": str(obj.uuid),
"timestamp": props.get("timestamp", ""),
"content": props.get("content", "")[:300],
"thought_type": props.get("thought_type", ""),
"evolution_stage": props.get("evolution_stage", "nascent"),
"relevance": 1 - distance,
})
except:
pass
# Search Conversations for the concept
try:
conv_collection = client.collections.get("Conversation")
conv_results = conv_collection.query.near_text(
query=input_data.concept,
limit=input_data.limit,
return_metadata=MetadataQuery(distance=True),
)
for obj in conv_results.objects:
props = obj.properties
distance = obj.metadata.distance if obj.metadata else 1.0
if distance < 0.8:
timeline.append({
"type": "conversation",
"id": props.get("conversation_id", str(obj.uuid)),
"timestamp": props.get("timestamp_start", ""),
"summary": props.get("summary", "")[:300],
"category": props.get("category", ""),
"relevance": 1 - distance,
})
except:
pass
# Sort by timestamp
timeline.sort(key=lambda x: x.get("timestamp", ""))
# Limit results
timeline = timeline[:input_data.limit]
client.close()
return {
"success": True,
"concept": input_data.concept,
"timeline": timeline,
"count": len(timeline),
}
except Exception as e:
return {
"success": False,
"error": str(e),
"concept": input_data.concept,
"timeline": [],
"count": 0,
}
async def check_consistency_handler(input_data: CheckConsistencyInput) -> Dict[str, Any]:
"""
Check if a statement is consistent with existing thoughts and conversations.
Searches for similar content and identifies potential contradictions.
"""
try:
client = get_weaviate_client()
related_content = []
# Search for similar thoughts
try:
thought_collection = client.collections.get("Thought")
thought_results = thought_collection.query.near_text(
query=input_data.statement,
limit=10,
return_metadata=MetadataQuery(distance=True),
)
for obj in thought_results.objects:
props = obj.properties
distance = obj.metadata.distance if obj.metadata else 1.0
if distance < 0.7: # Only very similar content
related_content.append({
"type": "thought",
"content": props.get("content", "")[:400],
"thought_type": props.get("thought_type", ""),
"timestamp": props.get("timestamp", ""),
"similarity": 1 - distance,
})
except:
pass
# Search for similar conversations
try:
conv_collection = client.collections.get("Conversation")
conv_results = conv_collection.query.near_text(
query=input_data.statement,
limit=10,
return_metadata=MetadataQuery(distance=True),
)
for obj in conv_results.objects:
props = obj.properties
distance = obj.metadata.distance if obj.metadata else 1.0
if distance < 0.7:
related_content.append({
"type": "conversation",
"summary": props.get("summary", "")[:400],
"category": props.get("category", ""),
"timestamp": props.get("timestamp_start", ""),
"similarity": 1 - distance,
})
except:
pass
client.close()
# Sort by similarity
related_content.sort(key=lambda x: x.get("similarity", 0), reverse=True)
# Calculate consistency score
if not related_content:
consistency_score = 1.0 # No related content = no contradiction
analysis = "Aucun contenu similaire trouvé. L'affirmation semble nouvelle."
else:
avg_similarity = sum(c.get("similarity", 0) for c in related_content) / len(related_content)
consistency_score = avg_similarity
if avg_similarity > 0.8:
analysis = "L'affirmation est très cohérente avec le contenu existant."
elif avg_similarity > 0.6:
analysis = "L'affirmation est partiellement cohérente. Quelques nuances possibles."
else:
analysis = "L'affirmation pourrait nécessiter une vérification. Similarité modérée."
return {
"success": True,
"statement": input_data.statement,
"consistency_score": round(consistency_score, 2),
"analysis": analysis,
"related_content": related_content[:5],
"count": len(related_content),
}
except Exception as e:
return {
"success": False,
"error": str(e),
"statement": input_data.statement,
"consistency_score": 0,
"analysis": f"Erreur lors de la vérification: {str(e)}",
"related_content": [],
}
async def update_thought_evolution_stage_handler(
input_data: UpdateThoughtEvolutionStageInput
) -> Dict[str, Any]:
"""
Update the evolution stage of an existing thought.
Stages: nascent, developing, mature, revised, abandoned
"""
valid_stages = ["nascent", "developing", "mature", "revised", "abandoned"]
if input_data.new_stage not in valid_stages:
return {
"success": False,
"error": f"Invalid stage. Must be one of: {', '.join(valid_stages)}",
"thought_id": input_data.thought_id,
}
try:
client = get_weaviate_client()
thought_collection = client.collections.get("Thought")
# Try to find the thought by ID
# The thought_id could be a UUID or a custom format like "thought_2025-01-15T10:30:00"
thought_uuid = None
# Try direct UUID lookup
try:
import uuid as uuid_module
thought_uuid = uuid_module.UUID(input_data.thought_id)
except ValueError:
# Not a UUID, search by custom ID pattern
pass
if thought_uuid:
# Update by UUID
thought_collection.data.update(
uuid=thought_uuid,
properties={"evolution_stage": input_data.new_stage}
)
else:
# Search for thought with matching ID in content or metadata
results = thought_collection.query.fetch_objects(
limit=100, # Search through recent thoughts
)
found = False
for obj in results.objects:
# Check if the thought_id matches any identifier
props = obj.properties
timestamp = props.get("timestamp", "")
# Match by timestamp-based ID
if input_data.thought_id in timestamp or timestamp in input_data.thought_id:
thought_collection.data.update(
uuid=obj.uuid,
properties={"evolution_stage": input_data.new_stage}
)
found = True
break
if not found:
client.close()
return {
"success": False,
"error": f"Thought not found with ID: {input_data.thought_id}",
"thought_id": input_data.thought_id,
}
client.close()
return {
"success": True,
"thought_id": input_data.thought_id,
"new_stage": input_data.new_stage,
"message": f"Evolution stage updated to '{input_data.new_stage}'",
}
except Exception as e:
return {
"success": False,
"error": str(e),
"thought_id": input_data.thought_id,
}