- Add complete Library RAG application (Flask + MCP server) - PDF processing pipeline with OCR and LLM extraction - Weaviate vector database integration (BGE-M3 embeddings) - Flask web interface with search and document management - MCP server for Claude Desktop integration - Comprehensive test suite (134 tests) - Clean up root directory - Remove obsolete documentation files - Remove backup and temporary files - Update autonomous agent configuration - Update prompts - Enhance initializer bis prompt with better instructions 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
463 lines
15 KiB
Python
463 lines
15 KiB
Python
"""Structured JSON logging configuration for Library RAG MCP Server.
|
|
|
|
This module provides structured JSON logging with sensitive data filtering
|
|
and tool invocation tracking.
|
|
|
|
Features:
|
|
- JSON-formatted log output for machine parsing
|
|
- Sensitive data filtering (API keys, passwords)
|
|
- Tool invocation logging with timing
|
|
- Configurable log levels via environment variable
|
|
|
|
Example:
|
|
Configure logging at server startup::
|
|
|
|
from mcp_tools.logging_config import setup_mcp_logging, get_tool_logger
|
|
|
|
# Setup logging
|
|
logger = setup_mcp_logging(log_level="INFO")
|
|
|
|
# Get tool-specific logger
|
|
tool_logger = get_tool_logger("search_chunks")
|
|
tool_logger.info("Processing query", extra={"query": "justice"})
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
from contextlib import contextmanager
|
|
from datetime import datetime, timezone
|
|
from functools import wraps
|
|
from pathlib import Path
|
|
from typing import Any, Callable, Dict, Generator, Literal, Optional, TypeVar, cast
|
|
|
|
# Type variable for decorator return type preservation
|
|
F = TypeVar("F", bound=Callable[..., Any])
|
|
|
|
# =============================================================================
# Sensitive Data Patterns
# =============================================================================

# Regex patterns used to detect sensitive data in log messages.
# Each entry is (compiled_pattern, replacement); group 1 preserves the key
# prefix (e.g. "api_key=") so redacted output stays readable.
SENSITIVE_PATTERNS = [
    # API keys (minimum 20 value chars to avoid redacting short ordinary words)
    (re.compile(r'(api[_-]?key\s*[=:]\s*)["\']?[\w-]{20,}["\']?', re.I), r"\1***REDACTED***"),
    (re.compile(r'(bearer\s+)[\w-]{20,}', re.I), r"\1***REDACTED***"),
    (re.compile(r'(authorization\s*[=:]\s*)["\']?[\w-]{20,}["\']?', re.I), r"\1***REDACTED***"),
    # Mistral API key format (any length: the key name is explicit)
    (re.compile(r'(MISTRAL_API_KEY\s*[=:]\s*)["\']?[\w-]+["\']?', re.I), r"\1***REDACTED***"),
    # Generic secrets
    (re.compile(r'(password\s*[=:]\s*)["\']?[^\s"\']+["\']?', re.I), r"\1***REDACTED***"),
    (re.compile(r'(secret\s*[=:]\s*)["\']?[\w-]+["\']?', re.I), r"\1***REDACTED***"),
    (re.compile(r'(token\s*[=:]\s*)["\']?[\w-]{20,}["\']?', re.I), r"\1***REDACTED***"),
]


def redact_sensitive_data(message: str) -> str:
    """Remove sensitive data from log messages.

    Applies every pattern in ``SENSITIVE_PATTERNS`` in order; later patterns
    operate on the output of earlier ones.

    Args:
        message: The log message to sanitize.

    Returns:
        Sanitized message with sensitive data redacted.

    Example:
        >>> redact_sensitive_data("api_key=sk-12345abcdef12345abcdef")
        'api_key=***REDACTED***'
    """
    result = message
    for pattern, replacement in SENSITIVE_PATTERNS:
        result = pattern.sub(replacement, result)
    return result
|
|
|
|
|
|
def redact_dict(data: Dict[str, Any]) -> Dict[str, Any]:
    """Recursively redact sensitive data from a dictionary.

    Args:
        data: Dictionary that may contain sensitive data.

    Returns:
        New dictionary with sensitive values redacted; the input dictionary
        is not modified.
    """
    sensitive_keys = {
        "api_key", "apikey", "api-key",
        "password", "passwd", "pwd",
        "secret", "token", "auth",
        "authorization", "bearer",
        "mistral_api_key", "MISTRAL_API_KEY",
    }

    sanitized: Dict[str, Any] = {}
    for field, value in data.items():
        # Normalize the key so "Api-Key", "API_KEY", etc. compare equally.
        normalized = field.lower().replace("-", "_")

        looks_sensitive = normalized in sensitive_keys or any(
            marker in normalized
            for marker in ["key", "secret", "token", "password"]
        )

        if looks_sensitive:
            sanitized[field] = "***REDACTED***"
        elif isinstance(value, dict):
            # Recurse into nested mappings.
            sanitized[field] = redact_dict(value)
        elif isinstance(value, str):
            # String values may embed secrets (e.g. "api_key=..." fragments).
            sanitized[field] = redact_sensitive_data(value)
        else:
            sanitized[field] = value

    return sanitized
|
|
|
|
|
|
# =============================================================================
|
|
# JSON Log Formatter
|
|
# =============================================================================
|
|
|
|
|
|
class JSONLogFormatter(logging.Formatter):
    """JSON formatter for structured logging.

    Emits each log record as a single-line JSON object with a consistent
    structure. Sensitive data is automatically redacted from the message
    and from any extra fields.

    JSON Structure:
        {
            "timestamp": "2024-12-24T10:30:00.000Z",
            "level": "INFO",
            "logger": "library-rag-mcp.search_chunks",
            "message": "Processing query",
            "tool": "search_chunks",
            "duration_ms": 123,
            ...extra fields...
        }
    """

    # Built-in LogRecord attributes; anything else on the record is treated
    # as a user-supplied "extra" field and copied into the JSON output.
    _STANDARD_ATTRS = frozenset({
        "name", "msg", "args", "levelname", "levelno", "pathname",
        "filename", "module", "lineno", "funcName", "created",
        "msecs", "relativeCreated", "thread", "threadName",
        "processName", "process", "exc_info", "exc_text", "stack_info",
        "message", "taskName",
    })

    def format(self, record: logging.LogRecord) -> str:
        """Format the log record as JSON.

        Args:
            record: The log record to format.

        Returns:
            JSON-formatted log string (single line).
        """
        entry: Dict[str, Any] = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": redact_sensitive_data(record.getMessage()),
        }

        # Include traceback text when an exception is attached.
        if record.exc_info:
            entry["exception"] = self.formatException(record.exc_info)

        # Copy user-supplied extra fields, redacting as we go.
        for attr, value in record.__dict__.items():
            if attr in self._STANDARD_ATTRS or attr.startswith("_"):
                continue
            if isinstance(value, dict):
                entry[attr] = redact_dict(value)
            elif isinstance(value, str):
                entry[attr] = redact_sensitive_data(value)
            else:
                entry[attr] = value

        # default=str keeps logging robust for non-JSON-serializable extras.
        return json.dumps(entry, default=str, ensure_ascii=False)
|
|
|
|
|
|
# =============================================================================
|
|
# Logging Setup
|
|
# =============================================================================
|
|
|
|
|
|
def setup_mcp_logging(
    log_level: str = "INFO",
    log_dir: Optional[Path] = None,
    json_format: bool = True,
) -> logging.Logger:
    """Configure structured logging for the MCP server.

    Sets up logging with JSON formatting to both file and stderr.
    Uses stderr for console output since stdout is used for MCP communication.
    Safe to call repeatedly: existing handlers are closed and replaced
    rather than duplicated or leaked.

    Args:
        log_level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL).
            Unknown names fall back to INFO.
        log_dir: Directory for log files. Defaults to "logs".
        json_format: Use JSON formatting (default True).

    Returns:
        Configured logger instance for the MCP server.

    Example:
        >>> logger = setup_mcp_logging(log_level="DEBUG")
        >>> logger.info("Server started", extra={"port": 8080})
    """
    # Determine log directory, creating it if needed.
    if log_dir is None:
        log_dir = Path("logs")
    log_dir.mkdir(parents=True, exist_ok=True)

    # Resolve the level name once; unknown names fall back to INFO.
    level = getattr(logging, log_level.upper(), logging.INFO)

    # Get or create the root MCP logger.
    logger = logging.getLogger("library-rag-mcp")
    logger.setLevel(level)

    # Close existing handlers before discarding them so repeated setup calls
    # do not leak open file descriptors, then clear to avoid duplicate output.
    for handler in logger.handlers:
        handler.close()
    logger.handlers.clear()

    # Create formatter.
    if json_format:
        formatter: logging.Formatter = JSONLogFormatter()
    else:
        formatter = logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        )

    # File handler: everything goes to the file regardless of log_level.
    file_handler = logging.FileHandler(
        log_dir / "mcp_server.log",
        encoding="utf-8",
    )
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

    # Stderr handler for console output (stdout is reserved for MCP traffic).
    stderr_handler = logging.StreamHandler(sys.stderr)
    stderr_handler.setLevel(level)
    stderr_handler.setFormatter(formatter)
    logger.addHandler(stderr_handler)

    # Prevent double-logging via the root logger.
    logger.propagate = False

    return logger
|
|
|
|
|
|
def get_tool_logger(tool_name: str) -> logging.Logger:
    """Get a logger for a specific MCP tool.

    Returns a child logger under the main "library-rag-mcp" logger so the
    tool name is automatically part of every log entry's logger field.

    Args:
        tool_name: Name of the MCP tool (e.g., "search_chunks", "parse_pdf").

    Returns:
        Logger instance for the tool.

    Example:
        >>> logger = get_tool_logger("search_chunks")
        >>> logger.info("Query processed", extra={"results": 10})
    """
    child_name = f"library-rag-mcp.{tool_name}"
    return logging.getLogger(child_name)
|
|
|
|
|
|
# =============================================================================
|
|
# Tool Invocation Logging
|
|
# =============================================================================
|
|
|
|
|
|
class ToolInvocationLogger:
    """Context manager for logging tool invocations with timing.

    Logs tool start on entry and success/failure with duration on exit.
    Exceptions raised inside the with-block are logged with traceback and
    always re-raised (never suppressed).

    Example:
        >>> with ToolInvocationLogger("search_chunks", {"query": "justice"}) as inv:
        ...     result = do_search()
        ...     inv.set_result({"count": 10})
    """

    def __init__(
        self,
        tool_name: str,
        inputs: Dict[str, Any],
        logger: Optional[logging.Logger] = None,
    ) -> None:
        """Initialize the invocation logger.

        Args:
            tool_name: Name of the tool being invoked.
            inputs: Tool input parameters (will be redacted).
            logger: Logger to use. Defaults to tool-specific logger.
        """
        self.tool_name = tool_name
        # Redact up front so sensitive inputs never sit on the instance.
        self.inputs = redact_dict(inputs)
        self.logger = logger if logger is not None else get_tool_logger(tool_name)
        self.start_time: float = 0.0
        self.result: Optional[Dict[str, Any]] = None
        self.error: Optional[Exception] = None

    def __enter__(self) -> "ToolInvocationLogger":
        """Start timing and log invocation start."""
        self.start_time = time.perf_counter()
        start_extra = {
            "tool": self.tool_name,
            "event": "invocation_start",
            "inputs": self.inputs,
        }
        self.logger.info(
            f"Tool invocation started: {self.tool_name}",
            extra=start_extra,
        )
        return self

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_val: Optional[BaseException],
        exc_tb: Any,
    ) -> Literal[False]:
        """Log invocation completion with timing."""
        elapsed_ms = round((time.perf_counter() - self.start_time) * 1000, 2)

        if exc_val is None:
            # Success path: include an optional result summary.
            success_extra: Dict[str, Any] = {
                "tool": self.tool_name,
                "event": "invocation_success",
                "duration_ms": elapsed_ms,
            }
            if self.result:
                success_extra["result_summary"] = self._summarize_result()
            self.logger.info(
                f"Tool invocation completed: {self.tool_name}",
                extra=success_extra,
            )
        else:
            # Failure path: record the error type/message plus the traceback.
            self.logger.error(
                f"Tool invocation failed: {self.tool_name}",
                extra={
                    "tool": self.tool_name,
                    "event": "invocation_error",
                    "duration_ms": elapsed_ms,
                    "error_type": exc_type.__name__ if exc_type else "Unknown",
                    "error_message": str(exc_val),
                },
                exc_info=True,
            )
        # Returning False propagates any exception to the caller.
        return False

    def set_result(self, result: Dict[str, Any]) -> None:
        """Set the result for logging summary.

        Args:
            result: The tool result dictionary.
        """
        self.result = result

    def _summarize_result(self) -> Dict[str, Any]:
        """Create a summary of the result for logging.

        Returns:
            Dictionary with key result metrics (counts, success status, etc.)
        """
        if not self.result:
            return {}

        summary: Dict[str, Any] = {}

        # Copy well-known scalar fields when present.
        for field in ("success", "total_count"):
            if field in self.result:
                summary[field] = self.result[field]
        # Report a count instead of the full result list.
        if isinstance(self.result.get("results"), list):
            summary["result_count"] = len(self.result["results"])
        for field in ("chunks_count", "cost_total", "found"):
            if field in self.result:
                summary[field] = self.result[field]
        # Only surface the error field when it is non-empty.
        if self.result.get("error"):
            summary["error"] = self.result["error"]

        return summary
|
|
|
|
|
|
@contextmanager
def log_tool_invocation(
    tool_name: str,
    inputs: Dict[str, Any],
) -> Generator[ToolInvocationLogger, None, None]:
    """Context manager for logging tool invocations.

    Convenience wrapper that creates and manages a ToolInvocationLogger.

    Args:
        tool_name: Name of the tool being invoked.
        inputs: Tool input parameters.

    Yields:
        ToolInvocationLogger instance for setting results.

    Example:
        >>> with log_tool_invocation("search_chunks", {"query": "test"}) as inv:
        ...     result = search(query)
        ...     inv.set_result(result)
    """
    # Delegate all start/finish/error logging to ToolInvocationLogger.
    with ToolInvocationLogger(tool_name, inputs) as invocation:
        yield invocation
|
|
|
|
|
|
def log_weaviate_query(
    operation: str,
    collection: str,
    filters: Optional[Dict[str, Any]] = None,
    result_count: Optional[int] = None,
    duration_ms: Optional[float] = None,
) -> None:
    """Log a Weaviate query operation.

    Utility function for logging Weaviate database queries with consistent
    structure. Emitted at DEBUG level on the "library-rag-mcp.weaviate"
    logger.

    Args:
        operation: Query operation type (fetch, near_text, aggregate, etc.).
        collection: Weaviate collection name.
        filters: Query filters applied (optional).
        result_count: Number of results returned (optional).
        duration_ms: Query duration in milliseconds (optional).

    Example:
        >>> log_weaviate_query(
        ...     operation="near_text",
        ...     collection="Chunk",
        ...     filters={"author": "Platon"},
        ...     result_count=10,
        ...     duration_ms=45.2
        ... )
    """
    logger = logging.getLogger("library-rag-mcp.weaviate")

    details: Dict[str, Any] = {
        "event": "weaviate_query",
        "operation": operation,
        "collection": collection,
    }

    # Optional fields are added only when supplied by the caller.
    if filters:
        details["filters"] = redact_dict(filters)
    if result_count is not None:
        details["result_count"] = result_count
    if duration_ms is not None:
        details["duration_ms"] = round(duration_ms, 2)

    logger.debug(f"Weaviate {operation} on {collection}", extra=details)
|