# Directory Structure
```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│ ├── __init__.py
│ ├── advanced_agent_flows_using_unified_memory_system_demo.py
│ ├── advanced_extraction_demo.py
│ ├── advanced_unified_memory_system_demo.py
│ ├── advanced_vector_search_demo.py
│ ├── analytics_reporting_demo.py
│ ├── audio_transcription_demo.py
│ ├── basic_completion_demo.py
│ ├── cache_demo.py
│ ├── claude_integration_demo.py
│ ├── compare_synthesize_demo.py
│ ├── cost_optimization.py
│ ├── data
│ │ ├── sample_event.txt
│ │ ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│ │ └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│ ├── docstring_refiner_demo.py
│ ├── document_conversion_and_processing_demo.py
│ ├── entity_relation_graph_demo.py
│ ├── filesystem_operations_demo.py
│ ├── grok_integration_demo.py
│ ├── local_text_tools_demo.py
│ ├── marqo_fused_search_demo.py
│ ├── measure_model_speeds.py
│ ├── meta_api_demo.py
│ ├── multi_provider_demo.py
│ ├── ollama_integration_demo.py
│ ├── prompt_templates_demo.py
│ ├── python_sandbox_demo.py
│ ├── rag_example.py
│ ├── research_workflow_demo.py
│ ├── sample
│ │ ├── article.txt
│ │ ├── backprop_paper.pdf
│ │ ├── buffett.pdf
│ │ ├── contract_link.txt
│ │ ├── legal_contract.txt
│ │ ├── medical_case.txt
│ │ ├── northwind.db
│ │ ├── research_paper.txt
│ │ ├── sample_data.json
│ │ └── text_classification_samples
│ │ ├── email_classification.txt
│ │ ├── news_samples.txt
│ │ ├── product_reviews.txt
│ │ └── support_tickets.txt
│ ├── sample_docs
│ │ └── downloaded
│ │ └── attention_is_all_you_need.pdf
│ ├── sentiment_analysis_demo.py
│ ├── simple_completion_demo.py
│ ├── single_shot_synthesis_demo.py
│ ├── smart_browser_demo.py
│ ├── sql_database_demo.py
│ ├── sse_client_demo.py
│ ├── test_code_extraction.py
│ ├── test_content_detection.py
│ ├── test_ollama.py
│ ├── text_classification_demo.py
│ ├── text_redline_demo.py
│ ├── tool_composition_examples.py
│ ├── tournament_code_demo.py
│ ├── tournament_text_demo.py
│ ├── unified_memory_system_demo.py
│ ├── vector_search_demo.py
│ ├── web_automation_instruction_packs.py
│ └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│ └── smart_browser_internal
│ ├── locator_cache.db
│ ├── readability.js
│ └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── integration
│ │ ├── __init__.py
│ │ └── test_server.py
│ ├── manual
│ │ ├── test_extraction_advanced.py
│ │ └── test_extraction.py
│ └── unit
│ ├── __init__.py
│ ├── test_cache.py
│ ├── test_providers.py
│ └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│ ├── __init__.py
│ ├── __main__.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── __main__.py
│ │ ├── commands.py
│ │ ├── helpers.py
│ │ └── typer_cli.py
│ ├── clients
│ │ ├── __init__.py
│ │ ├── completion_client.py
│ │ └── rag_client.py
│ ├── config
│ │ └── examples
│ │ └── filesystem_config.yaml
│ ├── config.py
│ ├── constants.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── evaluation
│ │ │ ├── base.py
│ │ │ └── evaluators.py
│ │ ├── providers
│ │ │ ├── __init__.py
│ │ │ ├── anthropic.py
│ │ │ ├── base.py
│ │ │ ├── deepseek.py
│ │ │ ├── gemini.py
│ │ │ ├── grok.py
│ │ │ ├── ollama.py
│ │ │ ├── openai.py
│ │ │ └── openrouter.py
│ │ ├── server.py
│ │ ├── state_store.py
│ │ ├── tournaments
│ │ │ ├── manager.py
│ │ │ ├── tasks.py
│ │ │ └── utils.py
│ │ └── ums_api
│ │ ├── __init__.py
│ │ ├── ums_database.py
│ │ ├── ums_endpoints.py
│ │ ├── ums_models.py
│ │ └── ums_services.py
│ ├── exceptions.py
│ ├── graceful_shutdown.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── analytics
│ │ │ ├── __init__.py
│ │ │ ├── metrics.py
│ │ │ └── reporting.py
│ │ ├── cache
│ │ │ ├── __init__.py
│ │ │ ├── cache_service.py
│ │ │ ├── persistence.py
│ │ │ ├── strategies.py
│ │ │ └── utils.py
│ │ ├── cache.py
│ │ ├── document.py
│ │ ├── knowledge_base
│ │ │ ├── __init__.py
│ │ │ ├── feedback.py
│ │ │ ├── manager.py
│ │ │ ├── rag_engine.py
│ │ │ ├── retriever.py
│ │ │ └── utils.py
│ │ ├── prompts
│ │ │ ├── __init__.py
│ │ │ ├── repository.py
│ │ │ └── templates.py
│ │ ├── prompts.py
│ │ └── vector
│ │ ├── __init__.py
│ │ ├── embeddings.py
│ │ └── vector_service.py
│ ├── tool_token_counter.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── audio_transcription.py
│ │ ├── base.py
│ │ ├── completion.py
│ │ ├── docstring_refiner.py
│ │ ├── document_conversion_and_processing.py
│ │ ├── enhanced-ums-lookbook.html
│ │ ├── entity_relation_graph.py
│ │ ├── excel_spreadsheet_automation.py
│ │ ├── extraction.py
│ │ ├── filesystem.py
│ │ ├── html_to_markdown.py
│ │ ├── local_text_tools.py
│ │ ├── marqo_fused_search.py
│ │ ├── meta_api_tool.py
│ │ ├── ocr_tools.py
│ │ ├── optimization.py
│ │ ├── provider.py
│ │ ├── pyodide_boot_template.html
│ │ ├── python_sandbox.py
│ │ ├── rag.py
│ │ ├── redline-compiled.css
│ │ ├── sentiment_analysis.py
│ │ ├── single_shot_synthesis.py
│ │ ├── smart_browser.py
│ │ ├── sql_databases.py
│ │ ├── text_classification.py
│ │ ├── text_redline_tools.py
│ │ ├── tournament.py
│ │ ├── ums_explorer.html
│ │ └── unified_memory_system.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── async_utils.py
│ │ ├── display.py
│ │ ├── logging
│ │ │ ├── __init__.py
│ │ │ ├── console.py
│ │ │ ├── emojis.py
│ │ │ ├── formatter.py
│ │ │ ├── logger.py
│ │ │ ├── panels.py
│ │ │ ├── progress.py
│ │ │ └── themes.py
│ │ ├── parse_yaml.py
│ │ ├── parsing.py
│ │ ├── security.py
│ │ └── text.py
│ └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/run_all_demo_scripts_and_check_for_errors.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Runs all demo scripts in the 'examples' folder sequentially and checks for errors.
Uses rich for progress tracking and a summary report.
Incorporates specific knowledge about expected outcomes for individual scripts.
"""
import asyncio
import re # Import regex
import sys
from pathlib import Path
from typing import Any, Dict, List, Tuple
from rich import box
from rich.console import Console
from rich.live import Live
from rich.markup import escape
from rich.panel import Panel
from rich.progress import BarColumn, Progress, SpinnerColumn, TextColumn
from rich.rule import Rule
from rich.table import Table
# --- Configuration ---
EXAMPLES_DIR = Path(__file__).parent / "examples"
PYTHON_EXECUTABLE = sys.executable # Use the same Python interpreter that runs this script
OUTPUT_LOG_FILE = Path(__file__).parent / "all_demo_script_console_output_log.txt"
# Scripts to skip (not actual demo scripts or special cases)
SCRIPTS_TO_SKIP = ["sse_client_demo.py", "web_automation_instruction_packs.py", "__init__.py"]
# Strings indicating a critical error in the output (used if no specific allowed patterns)
DEFAULT_ERROR_INDICATORS = ["Traceback (most recent call last):", "CRITICAL"]
# --- Individual Demo Expectations ---
# Define expected outcomes for specific scripts.
# - expected_exit_code: The code the script should exit with (default: 0)
# - allowed_stderr_patterns: List of regex patterns for stderr messages that are OK for this script.
# If this list exists, DEFAULT_ERROR_INDICATORS are ignored for stderr.
# - allowed_stdout_patterns: List of regex patterns for stdout messages that are OK (less common).
# If this list exists, DEFAULT_ERROR_INDICATORS are ignored for stdout.
DEMO_EXPECTATIONS: Dict[str, Dict[str, Any]] = {
# --- Scripts with specific known patterns ---
"text_redline_demo.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# Provider availability issues - expected when API keys aren't configured
r"Provider '(openai|anthropic|google)' not available or initialized",
r"Failed to get provider: No valid OpenAI key found",
# Standard setup messages - not errors
r"Configuration not yet loaded\. Loading now\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
# UI formatting patterns - not errors
r"─+.*─+", # Section dividers
]
},
"filesystem_operations_demo.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# SPECIFIC intentional demo patterns - these test protection features
r"Protection Triggered! Deletion of \d+ files blocked", # Specific deletion protection test
r"Could not set utime for.*?: \[Errno \d+\]", # Specific file timestamp issue with exact error format
# Configuration verification messages - specific to demo setup
r"WARNING: No allowed directories loaded in filesystem configuration", # Specific verification message
r"WARNING: Temporary directory .* not found in loaded allowed dirs:", # Specific verification message
# OS-specific limitations - with specific reasons
r"WARNING: Symlink creation might not be supported or permitted on this system", # Windows-specific limitation
r"WARNING: Could not create symlink \(.*\): \[Errno \d+\]", # OS-specific permission error with exact format
# Standard setup messages - not errors
r"Configuration not yet loaded\. Loading now\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
r"Forcing configuration reload due to GATEWAY_FORCE_CONFIG_RELOAD=true\.",
# UI formatting patterns - not errors
r"─+.*─+", # Section dividers
],
"allowed_stdout_patterns": [
# Specific allowed stdout patterns that aren't errors
r"WARNING: .*", # Warning messages in stdout
r"ERROR: .*", # Error messages in stdout (these are demo outputs, not actual errors)
]
},
"sql_database_interactions_demo.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# SPECIFIC column statistics computation issues - known data type limitation
r"Could not compute statistics for column customers\.signup_date: 'str' object has no attribute 'isoformat'", # Specific data type issue
r"Could not compute statistics for column orders\.order_date: 'str' object has no attribute 'isoformat'", # Specific data type issue
# Demo-specific database connection scenarios - intentional examples
r"Connection failed: \(sqlite3\.OperationalError\) unable to open database file", # Specific SQLite error format
r"Failed to connect to database \(sqlite:///.*\): \(sqlite3\.OperationalError\) unable to open database file", # Specific connection error format
# Standard setup messages - not errors
r"Configuration not yet loaded\. Loading now\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
]
},
"rag_example.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# SPECIFIC cleanup messages with reasons - intentional error handling
r"Could not delete collection 'demo_.*?': Collection '.*?' does not exist", # Non-existent collection during cleanup
r"Error deleting knowledge base 'demo-kb': Knowledge base 'demo-kb' not found", # Non-existent KB during cleanup
r"Error directly deleting vector collection 'demo_.*?': Collection '.*?' does not exist", # Non-existent collection
# SPECIFIC provider availability issues - expected when API keys aren't configured
r"Provider '(openai|anthropic|google)' not available or initialized", # Missing specific provider
r"No suitable provider found for embedding generation", # No embedding provider available
r"OpenAIError: No API key provided.", # Specific API key error
# Standard setup messages - not errors
r"Configuration not yet loaded\. Loading now\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
r"Initializing Gateway: Loading configuration\.\.\.",
# UI formatting patterns - not errors
r"─+.*─+", # Section dividers
]
},
"marqo_fused_search_demo.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# SPECIFIC setup/config issues - expected on systems without Marqo
r"Marqo config file not found at path: .*config/marqo\.json", # Specific config file path
r"Error decoding Marqo config file: No JSON object could be decoded", # Specific JSON parsing error
r"Exiting demo as Marqo config could not be loaded\.", # Specific exit message
# SPECIFIC connection issues - expected on systems without Marqo service
r"Connection refused: \[Errno 111\] Connection refused", # Specific connection error with errno
# SPECIFIC skipping behavior - expected for incomplete setup
r"Skipping Example \d+: No suitable .* field found in dataset", # Specific reason for skipping
]
},
"advanced_vector_search_demo.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# Provider availability issues
r"Provider '(openai|anthropic|google)' not available or initialized",
r"No suitable provider found for embedding generation",
# Standard setup messages - not errors
r"Configuration not yet loaded\. Loading now\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
# UI formatting patterns - not errors
r"─+.*─+", # Section dividers
]
},
"vector_search_demo.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# SPECIFIC cleanup messages with reasons - intentional cleanup operations
r"Could not delete collection 'demo_.*?': Collection '.*?' does not exist", # Non-existent collection during cleanup
# SPECIFIC provider availability issues - expected when API keys aren't configured
r"Failed to initialize provider '(openai|anthropic|google)': .*API key.*", # Specific provider with API key issue
r"No suitable provider found for embedding generation", # Specific embedding provider error
# SPECIFIC demo workflow messages - expected for educational examples
r"Skipping RAG demo - embedding provider not available", # Specific reason for skipping demo
# Standard setup messages - not errors
r"Configuration not yet loaded\. Loading now\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
# UI formatting patterns - not errors
r"─+.*─+", # Section dividers
]
},
"prompt_templates_demo.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# SPECIFIC intentional template demo cases - expected behavior demonstrations
r"Template 'non_existent_template\.txt' not found in .*/templates/", # Specific non-existent template
r"Could not render with missing variables: \['variable_name'\]", # Specific missing variable demonstration
# SPECIFIC provider availability messages - expected when API keys aren't configured
r"No providers available for completion with template", # Specific provider availability message
# Standard setup messages - not errors
r"Initializing Gateway: Loading configuration\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
r"Ultimate MCP Server .* initialized",
r"Initializing LLM providers",
r"Configuration not yet loaded\. Loading now\.\.\.",
# UI formatting patterns - not errors
r"─+.*─+", # Section dividers
]
},
"tournament_code_demo.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# Intentional demo cases (clean slate testing)
r"Error reading state file directly", r"State file not found at", # First run of tournament
r"No functions found in the code", # Test for empty code
# Known state handling messages
r"Cleanup error:", # Non-critical cleanup issues
# Provider availability (expected if not configured)
r"Failed to initialize providers", # Expected when API keys not configured
# Initialization logging (not errors)
r"Gateway initialized",
r"Initializing Gateway.*",
# Common setup/config messages (not errors)
r"Configuration not yet loaded\. Loading now\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
# Formatting patterns (not errors)
r"─+.*─+", # Section dividers
r"INFO.*", # INFO level log messages
]
},
"tournament_text_demo.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# Intentional demo cases (clean slate testing)
r"Error reading state file directly", r"State file not found at", # First run of tournament
# Provider availability (expected if not configured)
r"Provider .* not available for evaluation", # Expected when API keys missing
r"Failed to initialize providers", # Expected when API keys missing
# Timeout handling (acceptable on slow CI)
r"Evaluation with .* timed out", # Long-running ops may timeout
# Common setup/config messages (not errors)
r"Gateway initialized",
r"Initializing Gateway.*",
r"Configuration not yet loaded\. Loading now\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
# Formatting patterns (not errors)
r"─+.*─+", # Section dividers
r"INFO.*", # INFO level log messages
]
},
"test_code_extraction.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# Intentional demo cases (clean slate testing)
r"Error loading tournament state: .*No such file or directory", # First run
r"Failed to load tournament state", # First run
r"No round results found", # Expected for empty state
# Provider availability (expected if not configured)
r"Failed to initialize providers", # Expected if API keys not present
# Common setup/config messages (not errors)
r"Initializing Gateway", r"Configuration loaded",
r"Ultimate MCP Server .* initialized", r"Initializing LLM providers",
r"Configuration not yet loaded\. Loading now\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
# Formatting patterns (not errors)
r"─+.*─+", # Section dividers
r"INFO.*", # INFO level log messages
r"WARNING.*", # WARNING level log messages
]
},
"advanced_extraction_demo.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# Provider availability (expected if not configured)
r"Failed to get OpenAI provider", # Expected if API key not present
r"Failed to initialize OpenAI provider", # Expected if API key not present
# Common setup/config messages (not errors)
r"Configuration not yet loaded\. Loading now\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
# Formatting patterns (not errors)
r"─+.*─+", # Section dividers
],
# Allow the skip message in stdout
"allowed_stdout_patterns": [r"Skipping .* demo - no provider available", r"Raw Model Output \(JSON parsing failed\)"]
},
"analytics_reporting_demo.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# SPECIFIC provider availability issues - expected when API keys aren't configured
r"Failed to get/initialize provider '(openai|anthropic|google)': .*", # Specific provider with reason
r"No providers could be initialized for this demonstration", # Specific provider initialization message
r"No default model found for provider '(openai|anthropic|google)'", # Specific model availability issue
# Standard setup messages - not errors
r"Configuration not yet loaded\. Loading now\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
# UI formatting patterns - not errors
r"─+.*─+", # Section dividers
# Logging patterns - not errors
r"INFO \d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}.*", # Timestamped INFO logs
# Initialization messages - not errors
r"Simulating usage with \d+ providers\." # Specific simulation statement
]
},
"basic_completion_demo.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# SPECIFIC provider availability issues - expected when API keys aren't configured
r"Provider '(openai|anthropic|google)' not available or initialized", # Specific missing provider
r"All providers failed: No providers available for completion", # Specific provider failure
# SPECIFIC demo features - expected component testing
r"Error with cached completion demo: Cache is disabled", # Specific cache demo error
# Standard setup and logging messages - not errors
r"Initializing Gateway: Loading configuration\.\.\.",
r"Configuration not yet loaded\. Loading now\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
r"INFO \d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}.*", # Timestamped INFO logs
r"Ultimate MCP Server 'basic-completion-demo' initialized", # Specific initialization message
]
},
"browser_automation_demo.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# Browser automation issues - expected during demos
r"Could not find search input element with selectors: .*", # Element not found error
r"playwright\._impl\._api_types\.TimeoutError: Timeout \d+ms exceeded", # Timeout error
r"net::ERR_CONNECTION_REFUSED at .*", # Connection error
r"Navigation failed: net::ERR_CONNECTION_REFUSED at .*", # Navigation error
r"Execution error in.*: .*", # General execution errors
r"Traceback \(most recent call last\):.*", # Tracebacks from browser automation
# Provider availability issues
r"Provider '(openai|anthropic|google)' not available or initialized",
# Standard setup messages - not errors
r"Configuration not yet loaded\. Loading now\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
# UI formatting patterns - not errors
r"─+.*─+", # Section dividers
]
},
"claude_integration_demo.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# SPECIFIC provider availability issues - expected when API keys aren't configured
r"Provider 'anthropic' not available or initialized", # Specific Claude provider missing
r"No suitable Claude model found in available models: \[\]", # Specific Claude model selection issue
r"Selected models not found: \['claude-3-opus-20240229', 'claude-3-sonnet-20240229'\]", # Specific model availability issue
r"Model 'claude-3-opus-20240229' not available, falling back to default\.", # Specific fallback behavior
# Standard setup messages - not errors
r"Initializing Gateway: Loading configuration\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
r"Ultimate MCP Server 'claude-demo' initialized",
r"Initializing LLM providers",
r"Configuration not yet loaded\. Loading now\.\.\.",
# Logging patterns - not errors
r"INFO \d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}.*", # Timestamped INFO logs
# UI formatting patterns - not errors
r"─+.*─+", # Section dividers
]
},
"compare_synthesize_demo.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# SPECIFIC provider availability issues - expected when API keys aren't configured
r"Failed to initialize providers: No providers available", # Specific provider initialization message
# SPECIFIC tool registration messages - expected behavior for specialized tools
r"compare_and_synthesize tool FAILED to register: Tool 'compare_and_synthesize' requires 2\+ providers", # Specific registration failure reason
# Standard setup messages - not errors
r"Initializing Gateway: Loading configuration\.\.\.",
r"Configuration not yet loaded\. Loading now\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
r"INFO \d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}.*", # Timestamped INFO logs
r"Ultimate MCP Server 'compare-synthesize-demo-v2' initialized", # Specific initialization message
]
},
"cost_optimization.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# SPECIFIC provider availability issues - expected when API keys aren't configured
r"API key for provider '(openai|anthropic|google)' not found", # Specific API key missing message
r"Could not determine provider for model '.*?'", # Specific model-provider mapping issue
r"No models met criteria: max_cost=\$\d+\.\d+, .*", # Specific criteria filtering result
# Standard setup messages - not errors
r"Configuration not yet loaded\. Loading now\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
# UI formatting patterns - not errors
r"─+.*─+", # Section dividers
# Logging patterns - not errors
r"INFO \d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}.*", # Timestamped INFO logs
]
},
"document_processing.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# SPECIFIC initialization messages - expected setup steps
r"Clearing cache before demonstration\.\.\.", # Specific cache operation
# Standard setup messages - not errors
r"Configuration not yet loaded\. Loading now\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
# UI formatting patterns - not errors
r"─+.*─+", # Section dividers
# Logging patterns - not errors
r"INFO \d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}.*", # Timestamped INFO logs
]
},
"multi_provider_demo.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# SPECIFIC provider availability issues - expected when API keys aren't configured
r"Provider '(openai|anthropic|google)' not available or initialized", # Specific provider not available
r"All providers failed: \['(openai|anthropic|google)'.*?\]", # Specific list of failed providers
# Standard setup messages - not errors
r"Initializing Gateway: Loading configuration\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
r"Ultimate MCP Server 'multi-provider-demo' initialized",
r"Initializing LLM providers",
r"Configuration not yet loaded\. Loading now\.\.\.",
# UI formatting patterns - not errors
r"─+.*─+", # Section dividers
# Logging patterns - not errors
r"INFO \d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}.*", # Timestamped INFO logs
]
},
"simple_completion_demo.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# SPECIFIC provider availability issues - expected when API keys aren't configured
r"Provider '(openai|anthropic|google)' not available", # Specific provider not available
# Standard setup messages - not errors
r"Initializing Gateway: Loading configuration\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
r"Ultimate MCP Server 'simple-demo' initialized",
r"Initializing LLM providers",
r"Configuration not yet loaded\. Loading now\.\.\.",
# UI formatting patterns - not errors
r"─+.*─+", # Section dividers
# Logging patterns - not errors
r"INFO \d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}.*", # Timestamped INFO logs
]
},
"workflow_delegation_demo.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# SPECIFIC provider availability messages - expected initialization info
r"Some API keys missing: \['(openai|anthropic|google)'.*?\]", # Specific API keys warning
r"Provider '(openai|anthropic|google)' not available", # Specific provider not available
r"Failed to initialize provider: Invalid API key or provider configuration", # Specific initialization error
# SPECIFIC initialization messages - expected setup steps
r"Initializing required providers for delegation demo", # Specific initialization message
r"All required API keys seem to be present", # Specific configuration check
# Standard setup messages - not errors
r"Configuration not yet loaded\. Loading now\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
# UI formatting patterns - not errors
r"─+.*─+", # Section dividers
# Logging patterns - not errors
r"INFO \d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}.*", # Timestamped INFO logs
]
},
"cache_demo.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# SPECIFIC operational messages - expected configuration info
r"Cache is disabled \(GATEWAY__CACHE__ENABLED=false\)", # Specific cache configuration message
# Standard setup messages - not errors
r"Configuration not yet loaded\. Loading now\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
# UI formatting patterns - not errors
r"─+.*─+", # Section dividers
]
},
"audio_transcription_demo.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# Provider availability issues
r"Provider '(openai|anthropic|google)' not available or initialized",
r"Failed to initialize OpenAI provider: Invalid API key",
# Standard setup messages - not errors
r"Configuration not yet loaded\. Loading now\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
# UI formatting patterns - not errors
r"─+.*─+", # Section dividers
]
},
"entity_relation_graph_demo.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# Provider availability issues
r"Provider '(openai|anthropic|google)' not available or initialized",
r"Skipping provider initialization as no API keys are available",
# Standard setup messages - not errors
r"Configuration not yet loaded\. Loading now\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
# UI formatting patterns - not errors
r"─+.*─+", # Section dividers
]
},
"grok_integration_demo.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# Provider availability issues
r"Provider 'grok' not available or initialized",
r"No API key found for Grok",
# Standard setup messages - not errors
r"Configuration not yet loaded\. Loading now\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
# UI formatting patterns - not errors
r"─+.*─+", # Section dividers
]
},
"html_to_markdown_demo.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# Provider availability issues
r"Provider '(openai|anthropic|google)' not available or initialized",
# Standard setup messages - not errors
r"Configuration not yet loaded\. Loading now\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
# UI formatting patterns - not errors
r"─+.*─+", # Section dividers
]
},
"measure_model_speeds.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# Provider availability issues
r"Provider '(openai|anthropic|google|grok|meta)' not available or initialized",
r"No providers could be initialized",
# Standard setup messages - not errors
r"Configuration not yet loaded\. Loading now\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
# UI formatting patterns - not errors
r"─+.*─+", # Section dividers
]
},
"meta_api_demo.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# Provider availability issues
r"Provider 'meta' not available or initialized",
r"No API key found for Meta",
# Standard setup messages - not errors
r"Configuration not yet loaded\. Loading now\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
# UI formatting patterns - not errors
r"─+.*─+", # Section dividers
]
},
"research_workflow_demo.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# Provider availability issues
r"Provider '(openai|anthropic|google)' not available or initialized",
# Search and web access related messages
r"Failed to perform web search: .*",
r"Web search failed: .*",
# Standard setup messages - not errors
r"Configuration not yet loaded\. Loading now\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
# UI formatting patterns - not errors
r"─+.*─+", # Section dividers
]
},
"text_classification_demo.py": {
"expected_exit_code": 0,
"allowed_stderr_patterns": [
# Provider availability issues
r"Provider '(openai|anthropic|google)' not available or initialized",
# Standard setup messages - not errors
r"Configuration not yet loaded\. Loading now\.\.\.",
r"Configuration loaded and environment variables applied via decouple\.",
# UI formatting patterns - not errors
r"─+.*─+", # Section dividers
]
},
}
console = Console()
def find_demo_scripts() -> List[Path]:
"""Find all Python demo scripts in the examples directory."""
if not EXAMPLES_DIR.is_dir():
console.print(f"[bold red]Error:[/bold red] Examples directory not found at '{EXAMPLES_DIR}'")
return []
scripts = sorted([
p for p in EXAMPLES_DIR.glob("*.py")
if p.is_file() and p.name not in SCRIPTS_TO_SKIP
])
return scripts
async def run_script(script_path: Path) -> Tuple[int, str, str]:
"""
Run a single Python script as a subprocess and capture its output.
This async function executes a Python script in a separate process using the same
Python interpreter that's running this script. It captures both standard output
and standard error streams, as well as the exit code of the process.
The function uses asyncio.create_subprocess_exec for non-blocking execution,
allowing multiple scripts to be run concurrently if needed, although the current
implementation runs them sequentially.
Args:
script_path (Path): The path to the Python script to be executed.
This should be a fully resolved path object pointing to a valid Python file.
Returns:
Tuple[int, str, str]: A tuple containing:
- exit_code (int): The return code of the process (0 typically means success)
- stdout (str): The captured standard output as a string, with encoding errors ignored
- stderr (str): The captured standard error as a string, with encoding errors ignored
Note:
- The function waits for the script to complete before returning
- Any encoding errors in stdout/stderr are ignored during decoding
- The script is executed with the same Python interpreter as the parent process
- No environment variables or arguments are passed to the script
"""
command = [PYTHON_EXECUTABLE, str(script_path)]
process = await asyncio.create_subprocess_exec(
*command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
exit_code = process.returncode
return exit_code, stdout.decode(errors='ignore'), stderr.decode(errors='ignore')
def check_for_errors(script_name: str, exit_code: int, stdout: str, stderr: str) -> Tuple[bool, str]:
"""
Check script output against predefined expectations to determine success or failure.
This function analyzes the execution results of a demo script and determines if it
succeeded or failed based on:
1. Comparing the actual exit code against the expected exit code for the script
2. Checking for unexpected error messages in stdout and stderr
3. Applying script-specific patterns for allowed errors and warnings
The function uses the DEMO_EXPECTATIONS dictionary to get script-specific expectations
including allowed error patterns. For scripts without specific expectations defined,
it applies default success criteria (exit code 0 and no critical error indicators).
The function handles two types of patterns for allowed output:
- allowed_stderr_patterns: Regex patterns for permitted messages in stderr
- allowed_stdout_patterns: Regex patterns for permitted messages in stdout
Args:
script_name (str): Name of the script being checked (used to lookup expectations)
exit_code (int): The actual exit code returned by the script
stdout (str): The captured standard output from the script
stderr (str): The captured standard error from the script
Returns:
Tuple[bool, str]: A tuple containing:
- success (bool): True if the script execution meets all success criteria
- reason (str): A descriptive message explaining the result
"Success" for successful executions
Error details for failed executions
Note:
- Log messages at INFO, DEBUG, and WARNING levels are generally ignored
unless they match critical error patterns
- Script-specific allowed patterns take precedence over default error indicators
- If no script-specific expectations exist, only the DEFAULT_ERROR_INDICATORS
are used to check for problems
"""
expectations = DEMO_EXPECTATIONS.get(script_name, {})
expected_exit_code = expectations.get("expected_exit_code", 0)
allowed_stderr_patterns = expectations.get("allowed_stderr_patterns", [])
allowed_stdout_patterns = expectations.get("allowed_stdout_patterns", [])
# 1. Check Exit Code
if exit_code != expected_exit_code:
return False, f"Exited with code {exit_code} (expected {expected_exit_code})"
# --- Refined Error Log Checking ---
def find_unexpected_lines(output: str, allowed_patterns: List[str], default_indicators: List[str]) -> List[str]:
"""
Find lines in script output that indicate errors or unexpected behavior.
This function analyzes the output of a script (either stdout or stderr) and
identifies lines that may indicate an error or unexpected behavior. It handles
two different checking modes:
1. With allowed_patterns: All lines that don't match at least one of the allowed
patterns are considered unexpected.
2. Without allowed_patterns: Only lines containing any of the default_indicators
are considered unexpected.
The first mode is more restrictive (whitelist approach) while the second is
more permissive (blacklist approach). The function chooses the appropriate mode
based on whether allowed_patterns is provided.
Args:
output (str): The script output to analyze (either stdout or stderr)
allowed_patterns (List[str]): List of regex patterns for allowed output lines.
If provided, any line not matching at least one pattern is unexpected.
default_indicators (List[str]): List of string indicators of critical errors.
Only used when allowed_patterns is empty, to identify error lines.
Returns:
List[str]: A list of lines from the output that are considered unexpected or
indicative of errors. Empty list means no unexpected lines found.
Note:
- Empty lines are always ignored
- When allowed_patterns is provided, the function uses a whitelist approach
- When allowed_patterns is empty, the function uses a blacklist approach
- Regex matching is used for allowed_patterns, simple substring matching for default_indicators
"""
lines = output.strip().splitlines()
unexpected_lines = []
for line in lines:
line_content = line.strip()
if not line_content: # Skip blank lines
continue
is_allowed = False
# Check against specific allowed patterns for this script
if allowed_patterns:
for pattern in allowed_patterns:
if re.search(pattern, line_content):
is_allowed = True
break
# If specific patterns were defined and line wasn't allowed, it's unexpected
if allowed_patterns and not is_allowed:
unexpected_lines.append(line)
# If no specific patterns were defined, check against default critical indicators only
elif not allowed_patterns:
for indicator in default_indicators:
if indicator in line_content: # Use 'in' for default indicators for simplicity
unexpected_lines.append(line)
break # Found a default indicator, no need to check others for this line
return unexpected_lines
unexpected_stderr = find_unexpected_lines(stderr, allowed_stderr_patterns, DEFAULT_ERROR_INDICATORS)
unexpected_stdout = find_unexpected_lines(stdout, allowed_stdout_patterns, DEFAULT_ERROR_INDICATORS)
# Filter out lines that are just INFO/DEBUG/WARNING level logs unless they are explicitly disallowed
    # (This assumes the default timestamped log format "[MM/DD/YY HH:MM:SS] LEVEL ..." or the plain "LEVEL ..." rich format)
def is_ignorable_log(line: str) -> bool:
"""
Determine if a log line can be safely ignored for error detection.
This function identifies standard INFO, DEBUG, and WARNING level log messages
that should typically be ignored when checking for errors, unless they are
explicitly flagged as problematic by other patterns.
The function recognizes common log line formats:
        - Standard timestamp-prefixed format: [MM/DD/YY HH:MM:SS] LEVEL message
- Simple level-prefixed format: LEVEL message
Args:
line (str): The log line to analyze
Returns:
bool: True if the line appears to be a standard INFO, DEBUG, or WARNING
log message that can be safely ignored. False otherwise.
Note:
This function only identifies the format of standard log lines;
it doesn't analyze the content of the messages themselves.
"""
        return bool(
            re.match(r"^\[\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}\]\s+(INFO|DEBUG|WARNING)\s+", line.strip()) or
            re.match(r"^\s*(INFO|DEBUG|WARNING)\s+", line.strip())
        )
actual_stderr_errors = [line for line in unexpected_stderr if not is_ignorable_log(line)]
actual_stdout_errors = [line for line in unexpected_stdout if not is_ignorable_log(line)]
if actual_stderr_errors:
return False, f"Unexpected errors found in stderr: ...{escape(actual_stderr_errors[0])}..."
if actual_stdout_errors:
return False, f"Unexpected errors found in stdout: ...{escape(actual_stdout_errors[0])}..."
# --- End Refined Error Log Checking ---
# If exit code matches and no unexpected critical errors found
return True, "Success"
def write_script_output_to_log(script_name: str, exit_code: int, stdout: str, stderr: str, is_success: bool):
"""
Write the complete output of a script run to the consolidated log file.
This function appends the execution results of a single script to a consolidated
log file for record-keeping and debugging purposes. The log includes:
- A header with the script name, exit code, and success/failure status
- The complete stdout output captured during execution
- The complete stderr output captured during execution
Each script's log entry is clearly separated with delimiters for easy navigation
and searching within the log file.
Args:
script_name (str): Name of the script that was executed
exit_code (int): The exit code returned by the script
stdout (str): The complete standard output captured during execution
stderr (str): The complete standard error captured during execution
is_success (bool): Whether the script execution was considered successful
according to the check_for_errors criteria
Returns:
None: The function writes to the log file specified by OUTPUT_LOG_FILE
but doesn't return any value
Note:
- The function appends to the log file, preserving previous entries
- If stdout or stderr is empty, a placeholder message is logged
- No limit is placed on the size of the logged output
"""
with open(OUTPUT_LOG_FILE, "a", encoding="utf-8") as log_file:
# Write script header with result
log_file.write(f"\n{'=' * 80}\n")
status = "SUCCESS" if is_success else "FAILURE"
log_file.write(f"SCRIPT: {script_name} - EXIT CODE: {exit_code} - STATUS: {status}\n")
log_file.write(f"{'-' * 80}\n\n")
# Write stdout
log_file.write("STDOUT:\n")
log_file.write(stdout if stdout.strip() else "(No stdout)\n")
log_file.write("\n")
# Write stderr
log_file.write("STDERR:\n")
log_file.write(stderr if stderr.strip() else "(No stderr)\n")
log_file.write("\n")
async def main():
"""
Main function to run all demo scripts and generate a comprehensive report.
This async function coordinates the entire process of:
1. Finding all demo scripts in the examples directory
2. Running each script sequentially and capturing its output
3. Checking each script's result against expected behavior
4. Logging detailed output to a consolidated log file
5. Generating a rich, interactive summary report in the console
The function implements a progress bar display using rich.progress to provide
real-time feedback during execution. After all scripts have run, it displays
a detailed table summarizing the results of each script, including status,
exit code, and relevant output snippets.
The function follows these specific steps:
- Locate Python scripts in the examples directory (skipping certain files)
- Initialize/clear the consolidated log file
- Run each script in sequence, updating the progress bar
- Check each script's output against expectations
- Write detailed output for each script to the log file
- Generate and display a summary table with success/failure indicators
- Display final counts of succeeded and failed scripts
Returns:
int: Exit code for the parent process:
- 0 if all scripts succeed
- 1 if any script fails or if no scripts are found
Note:
- Scripts listed in SCRIPTS_TO_SKIP are excluded from execution
- The function creates a new consolidated log file each time it runs
- Progress information is displayed using a rich progress bar
- The summary table highlights both successful and failed scripts
"""
console.print(Rule("[bold blue]Running All Example Scripts[/bold blue]"))
scripts = find_demo_scripts()
if not scripts:
console.print("[yellow]No demo scripts found to run.[/yellow]")
return 1
console.print(f"Found {len(scripts)} demo scripts in '{EXAMPLES_DIR}'.")
# Initialize/clear the output log file
with open(OUTPUT_LOG_FILE, "w", encoding="utf-8") as log_file:
log_file.write("DEMO SCRIPT CONSOLE OUTPUT LOG\n")
log_file.write(f"Generated by {Path(__file__).name}\n")
log_file.write(f"{'=' * 80}\n\n")
results = []
success_count = 0
fail_count = 0
# --- Progress Bar Setup ---
progress = Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}", justify="right"),
BarColumn(bar_width=None),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
TextColumn("({task.completed}/{task.total})"),
console=console,
transient=False # Keep progress bar visible after completion
)
task_id = progress.add_task("[cyan]Running scripts...", total=len(scripts))
with Live(progress, console=console, vertical_overflow="visible"):
for script in scripts:
script_name = script.name
progress.update(task_id, description=f"[cyan]Running {script_name}...")
exit_code, stdout, stderr = await run_script(script)
is_success, reason = check_for_errors(script_name, exit_code, stdout, stderr)
# Log all output to the consolidated log file
write_script_output_to_log(script_name, exit_code, stdout, stderr, is_success)
results.append({
"script": script_name,
"success": is_success,
"reason": reason,
"exit_code": exit_code,
"stdout": stdout,
"stderr": stderr
})
if is_success:
success_count += 1
else:
fail_count += 1
progress.update(task_id, advance=1)
progress.update(task_id, description="[bold green]All scripts finished![/bold green]")
await asyncio.sleep(0.5) # Allow final update to render
# --- Summary Report ---
console.print(Rule("[bold blue]Demo Run Summary[/bold blue]"))
summary_table = Table(title="Script Execution Results", box=box.ROUNDED, show_header=True, header_style="bold magenta")
summary_table.add_column("Script Name", style="cyan", no_wrap=True)
summary_table.add_column("Status", style="white")
summary_table.add_column("Exit Code", style="yellow", justify="right")
summary_table.add_column("Reason / Output Snippet", style="white")
for result in results:
status_icon = "[green]✅ SUCCESS[/green]" if result["success"] else "[bold red]❌ FAILURE[/bold red]"
reason_or_output = result["reason"]
# --- Enhanced Snippet Logic ---
# Prioritize showing snippet related to the failure reason
if not result["success"]:
output_to_search = result["stderr"] + result["stdout"] # Combined output
snippet = ""
# If failure is due to unexpected error message
if "Unexpected errors found" in reason_or_output:
# Extract the specific error shown in the reason
match = re.search(r"Unexpected errors found in (stdout|stderr): \.\.\.(.*)\.\.\.\"?", reason_or_output)
if match:
error_snippet_text = match.group(2).strip()
# Try to find this snippet in the actual output
start_idx = output_to_search.find(error_snippet_text)
if start_idx != -1:
# Find the start of the line containing the snippet
line_start_idx = output_to_search.rfind('\n', 0, start_idx) + 1
lines_around_error = output_to_search[line_start_idx:].splitlines()
snippet = "\n".join(lines_around_error[:5]) # Show 5 lines from error
if len(lines_around_error) > 5:
snippet += "\n..."
# If failure is due to exit code, show end of stderr/stdout
elif "Exited with code" in reason_or_output:
if result["stderr"].strip():
lines = result["stderr"].strip().splitlines()
snippet = "\n".join(lines[-5:]) # Last 5 lines of stderr
elif result["stdout"].strip():
lines = result["stdout"].strip().splitlines()
snippet = "\n".join(lines[-5:]) # Last 5 lines of stdout
# Fallback if no specific snippet found yet for failure
if not snippet:
lines = output_to_search.strip().splitlines()
snippet = "\n".join(lines[-5:]) # Last 5 lines overall
if snippet:
reason_or_output += f"\n---\n[dim]{escape(snippet)}[/dim]"
elif result["success"]:
# Show last few lines of stdout for successful runs
lines = result["stdout"].strip().splitlines()
if lines:
snippet = "\n".join(lines[-3:]) # Show last 3 lines
reason_or_output += f"\n---\n[dim]{escape(snippet)}[/dim]"
else: # Handle case with no stdout
reason_or_output += "\n---\n[dim](No stdout produced)[/dim]"
# --- End Enhanced Snippet Logic ---
summary_table.add_row(
result["script"],
status_icon,
str(result["exit_code"]),
reason_or_output
)
console.print(summary_table)
# --- Final Count ---
console.print(Rule())
total_scripts = len(scripts)
final_message = f"[bold green]{success_count}[/bold green] succeeded, [bold red]{fail_count}[/bold red] failed out of {total_scripts} scripts."
final_color = "green" if fail_count == 0 else "red"
console.print(Panel(final_message, border_style=final_color, expand=False))
console.print(f"\nComplete output log saved to: [cyan]{OUTPUT_LOG_FILE}[/cyan]")
return 1 if fail_count > 0 else 0
if __name__ == "__main__":
exit_code = asyncio.run(main())
sys.exit(exit_code)
```
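The expectation-driven check above switches between a whitelist mode (when a script defines `allowed_stderr_patterns` / `allowed_stdout_patterns`) and a blacklist mode based on `DEFAULT_ERROR_INDICATORS`. The following is a minimal sketch of how `check_for_errors` behaves in each mode, assuming the harness module is importable from the repository root; the sample output strings are invented for illustration:

```python
# Illustrative sketch only. Assumes this is run from the repository root so the
# harness module above can be imported; the stderr samples are made up.
from run_all_demo_scripts_and_check_for_errors import check_for_errors

# Whitelist mode: "simple_completion_demo.py" defines allowed_stderr_patterns, so any
# stderr line that matches no allowed pattern (and is not an ignorable log line) fails.
sample_stderr = (
    "Configuration not yet loaded. Loading now...\n"   # matches an allowed pattern
    "Traceback (most recent call last):\n"             # matches nothing -> unexpected
)
ok, reason = check_for_errors("simple_completion_demo.py", 0, stdout="", stderr=sample_stderr)
print(ok, reason)  # False, "Unexpected errors found in stderr: ..."

# Blacklist mode: a script without registered expectations only fails on
# DEFAULT_ERROR_INDICATORS ("Traceback (most recent call last):", "CRITICAL")
# or a non-zero exit code.
ok, reason = check_for_errors("some_unlisted_script.py", 0, stdout="", stderr="harmless note\n")
print(ok, reason)  # True, "Success"
```

Because the whitelist mode rejects anything unmatched, the per-script pattern lists above have to enumerate even benign setup and formatting messages.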
--------------------------------------------------------------------------------
/ultimate_mcp_server/utils/logging/logger.py:
--------------------------------------------------------------------------------
```python
"""
Main Logger class for Gateway.
This module provides the central Logger class that integrates all Gateway logging
functionality with a beautiful, informative interface.
"""
import logging
import sys
import time
from contextlib import contextmanager
from datetime import datetime
from functools import wraps
from typing import Any, Dict, List, Optional, Tuple, Union
from rich.console import Console
# Use relative imports for utils within the same package
from .console import console
from .emojis import get_emoji
from .formatter import (
DetailedLogFormatter,
RichLoggingHandler,
SimpleLogFormatter,
)
from .panels import (
CodePanel,
ErrorPanel,
HeaderPanel,
InfoPanel,
ResultPanel,
ToolOutputPanel,
WarningPanel,
)
from .progress import GatewayProgress
# Set up standard Python logging with our custom handler
# Logging configuration is handled externally via dictConfig
class Logger:
"""
Advanced logging system with rich formatting, progress tracking, and structured output.
The Logger class extends Python's standard logging system with enhanced features:
Key Features:
- Rich console output with color, emoji, and formatted panels
- Component-based logging for better organization of log messages
- Operation tracking with timing and progress visualization
- Multi-level logging (debug, info, success, warning, error, critical)
- Context data capture for more detailed debugging
- Integrated progress bars and spinners for long-running operations
- Special formatters for code blocks, results, errors, and warnings
Integration with Python's logging:
- Builds on top of the standard logging module
- Compatible with external logging configuration (e.g., dictConfig)
- Properly propagates logs to ensure they reach root handlers
- Adds custom "extra" fields to standard LogRecord objects
Usage Patterns:
- Create loggers with get_logger() for consistent naming
- Use component and operation parameters to organize related logs
- Add context data as structured information with message
- Use special display methods (code, warning_panel, etc.) for rich output
- Track long operations with time_operation and progress tracking
This logger is designed to make complex server operations more transparent,
providing clear information for both developers and users of the Ultimate MCP Server.
"""
def __init__(
self,
name: str = "ultimate", # Default logger name changed
console: Optional[Console] = None,
level: str = "info",
show_timestamps: bool = True,
component: Optional[str] = None,
capture_output: bool = False,
):
"""Initialize the logger.
Args:
name: Logger name
console: Rich console to use
level: Initial log level
show_timestamps: Whether to show timestamps in logs
component: Default component name
capture_output: Whether to capture and store log output
"""
self.name = name
# Use provided console or get global console, defaulting to stderr console
if console is not None:
self.console = console
else:
global_console = globals().get("console")
if global_console is not None:
self.console = global_console
else:
self.console = Console(file=sys.stderr)
self.level = level.lower()
self.show_timestamps = show_timestamps
self.component = component
self.capture_output = capture_output
# Create a standard Python logger
self.python_logger = logging.getLogger(name)
# Set up formatters
self.simple_formatter = SimpleLogFormatter(show_time=show_timestamps, show_level=True, show_component=True)
self.detailed_formatter = DetailedLogFormatter(show_time=show_timestamps, show_level=True, show_component=True)
# Progress tracker
self.progress = GatewayProgress(console=self.console)
# Output capture if enabled
self.captured_logs = [] if capture_output else None
# Restore propagation to allow messages to reach root handlers
# Make sure this is True so logs configured via dictConfig are passed up
self.python_logger.propagate = True
# Set initial log level on the Python logger instance
# Note: The effective level will be determined by the handler/root config
self.set_level(level)
def set_level(self, level: str) -> None:
"""Set the log level.
Args:
level: Log level (debug, info, warning, error, critical)
"""
level = level.lower()
self.level = level # Store the intended level for should_log checks
# Map to Python logging levels
level_map = {
"debug": logging.DEBUG,
"info": logging.INFO,
"warning": logging.WARNING,
"error": logging.ERROR,
"critical": logging.CRITICAL,
}
python_level = level_map.get(level, logging.INFO)
# Set level on the logger itself. Handlers might have their own levels.
self.python_logger.setLevel(python_level)
def get_level(self) -> str:
"""Get the current log level.
Returns:
Current log level
"""
# Return the Python logger's effective level
effective_level_num = self.python_logger.getEffectiveLevel()
level_map_rev = {
logging.DEBUG: "debug",
logging.INFO: "info",
logging.WARNING: "warning",
logging.ERROR: "error",
logging.CRITICAL: "critical",
}
return level_map_rev.get(effective_level_num, "info")
def should_log(self, level: str) -> bool:
"""Check if a message at the given level should be logged based on Python logger's effective level.
Args:
level: Log level to check
Returns:
Whether messages at this level should be logged
"""
level_map = {
"debug": logging.DEBUG,
"info": logging.INFO,
"success": logging.INFO, # Map success to info for level check
"warning": logging.WARNING,
"error": logging.ERROR,
"critical": logging.CRITICAL,
}
message_level_num = level_map.get(level.lower(), logging.INFO)
return self.python_logger.isEnabledFor(message_level_num)
def _log(
self,
level: str,
message: str,
component: Optional[str] = None,
operation: Optional[str] = None,
emoji: Optional[str] = None,
emoji_key: Optional[str] = None, # Add emoji_key parameter
context: Optional[Dict[str, Any]] = None,
use_detailed_formatter: bool = False, # This arg seems unused now?
exception_info: Optional[Union[bool, Tuple]] = None,
stack_info: bool = False,
extra: Optional[Dict[str, Any]] = None,
) -> None:
"""Internal method to handle logging via the standard Python logging mechanism.
Args:
level: Log level
message: Log message
component: Gateway component (core, composite, analysis, etc.)
operation: Operation being performed
emoji: Custom emoji override
emoji_key: Key to look up emoji from emoji map (alternative to emoji)
context: Additional contextual data
exception_info: Include exception info (True/False or tuple)
stack_info: Include stack info
extra: Dictionary passed as extra to logging framework
"""
# Check if we should log at this level using standard Python logging check
# No need for the custom should_log method here if using stdlib correctly
# Map level name to Python level number
level_map = {
"debug": logging.DEBUG,
"info": logging.INFO,
"success": logging.INFO, # Log success as INFO
"warning": logging.WARNING,
"error": logging.ERROR,
"critical": logging.CRITICAL,
}
level_num = level_map.get(level.lower(), logging.INFO)
if not self.python_logger.isEnabledFor(level_num):
return
# Use default component if not provided
component = component or self.component
# If emoji_key is provided, use it to determine emoji
if emoji_key and not emoji:
emoji = get_emoji("operation", emoji_key)
if emoji == "❓": # If operation emoji not found
# Try level emojis
from .emojis import LEVEL_EMOJIS
emoji = LEVEL_EMOJIS.get(emoji_key, "❓")
# Prepare 'extra' dict for LogRecord
log_extra = {} if extra is None else extra.copy() # Create a copy to avoid modifying the original
# Remove any keys that conflict with built-in LogRecord attributes
for reserved_key in ['message', 'asctime', 'exc_info', 'exc_text', 'lineno', 'funcName', 'created', 'levelname', 'levelno']:
if reserved_key in log_extra:
del log_extra[reserved_key]
# Add our custom keys
log_extra['component'] = component
log_extra['operation'] = operation
log_extra['custom_emoji'] = emoji
log_extra['log_context'] = context # Use a different key to avoid collision
log_extra['gateway_level'] = level # Pass the original level name if needed by formatter
# Handle exception info
exc_info = None
if exception_info:
if isinstance(exception_info, bool):
exc_info = sys.exc_info()
else:
exc_info = exception_info # Assume it's a valid tuple
# Log through Python's logging system
self.python_logger.log(
level=level_num,
msg=message,
exc_info=exc_info,
stack_info=stack_info,
extra=log_extra
)
# Capture if enabled
if self.captured_logs is not None:
self.captured_logs.append({
"level": level,
"message": message,
"component": component,
"operation": operation,
"timestamp": datetime.now().isoformat(),
"context": context,
})
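    # Note: every record emitted above carries component, operation, custom_emoji,
    # log_context and gateway_level in its `extra` mapping, so handlers and
    # formatters can render this metadata without parsing the message text.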
# --- Standard Logging Methods ---
def debug(
self,
message: str,
component: Optional[str] = None,
operation: Optional[str] = None,
context: Optional[Dict[str, Any]] = None,
emoji_key: Optional[str] = None,
**kwargs
) -> None:
"""Log a debug message."""
self._log("debug", message, component, operation, context=context, emoji_key=emoji_key, extra=kwargs)
def info(
self,
message: str,
component: Optional[str] = None,
operation: Optional[str] = None,
context: Optional[Dict[str, Any]] = None,
emoji_key: Optional[str] = None,
**kwargs
) -> None:
"""Log an info message."""
self._log("info", message, component, operation, context=context, emoji_key=emoji_key, extra=kwargs)
def success(
self,
message: str,
component: Optional[str] = None,
operation: Optional[str] = None,
context: Optional[Dict[str, Any]] = None,
emoji_key: Optional[str] = None,
**kwargs
) -> None:
"""Log a success message."""
self._log("success", message, component, operation, context=context, emoji_key=emoji_key, extra=kwargs)
def warning(
self,
message: str,
component: Optional[str] = None,
operation: Optional[str] = None,
context: Optional[Dict[str, Any]] = None,
emoji_key: Optional[str] = None,
# details: Optional[List[str]] = None, # Details handled by panel methods
**kwargs
) -> None:
"""Log a warning message."""
self._log("warning", message, component, operation, context=context, emoji_key=emoji_key, extra=kwargs)
def error(
self,
message: str,
component: Optional[str] = None,
operation: Optional[str] = None,
context: Optional[Dict[str, Any]] = None,
exception: Optional[Exception] = None,
emoji_key: Optional[str] = None,
# error_code: Optional[str] = None,
# resolution_steps: Optional[List[str]] = None,
**kwargs
) -> None:
"""Log an error message."""
# Get the exception info tuple if an exception was provided
exc_info = None
if exception is not None:
exc_info = (type(exception), exception, exception.__traceback__)
elif 'exc_info' in kwargs:
exc_info = kwargs.pop('exc_info') # Remove from kwargs to prevent conflicts
self._log("error", message, component, operation, context=context,
exception_info=exc_info, emoji_key=emoji_key, extra=kwargs)
def critical(
self,
message: str,
component: Optional[str] = None,
operation: Optional[str] = None,
context: Optional[Dict[str, Any]] = None,
exception: Optional[Exception] = None,
emoji_key: Optional[str] = None,
# error_code: Optional[str] = None, # Pass via context or kwargs
**kwargs
) -> None:
"""Log a critical message."""
# Get the exception info tuple if an exception was provided
exc_info = None
if exception is not None:
exc_info = (type(exception), exception, exception.__traceback__)
elif 'exc_info' in kwargs:
exc_info = kwargs.pop('exc_info') # Remove from kwargs to prevent conflicts
self._log("critical", message, component, operation, context=context,
exception_info=exc_info, emoji_key=emoji_key, extra=kwargs)
# --- Rich Display Methods ---
    # These methods render Rich panels directly to the console and may also emit
    # a standard log record so the event reaches configured handlers.
def operation(
self,
operation: str,
message: str,
component: Optional[str] = None,
level: str = "info",
context: Optional[Dict[str, Any]] = None,
**kwargs
) -> None:
"""Log an operation-specific message.
Args:
operation: Operation name
message: Log message
component: Gateway component
level: Log level (default: info)
context: Additional context
**kwargs: Extra fields for logging
"""
self._log(level, message, component, operation, context=context, extra=kwargs)
def tool(
self,
tool: str,
command: str,
output: str,
status: str = "success",
duration: Optional[float] = None,
component: Optional[str] = None,
**kwargs
) -> None:
"""Display formatted output from a tool.
Args:
tool: Name of the tool
command: Command executed
output: Tool output
status: Execution status (success, error)
duration: Execution duration in seconds
component: Gateway component
**kwargs: Extra fields for logging
"""
# Optionally log the event
log_level = "error" if status == "error" else "debug"
log_message = f"Tool '{tool}' finished (status: {status})"
log_context = {"command": command, "output_preview": output[:100] + "..." if len(output) > 100 else output}
if duration is not None:
log_context["duration_s"] = duration
self._log(log_level, log_message, component, operation=f"tool.{tool}", context=log_context, extra=kwargs)
# Display the panel directly on the console
panel = ToolOutputPanel(tool, command, output, status, duration)
self.console.print(panel)
def code(
self,
code: str,
language: str = "python",
title: Optional[str] = None,
line_numbers: bool = True,
highlight_lines: Optional[List[int]] = None,
message: Optional[str] = None,
component: Optional[str] = None,
level: str = "debug",
**kwargs
) -> None:
"""Display a code block.
Args:
code: Code string
language: Language for syntax highlighting
title: Optional title for the panel
line_numbers: Show line numbers
highlight_lines: Lines to highlight
message: Optional message to log alongside displaying the code
component: Gateway component
level: Log level for the optional message (default: debug)
**kwargs: Extra fields for logging
"""
if message:
self._log(level, message, component, context={"code_preview": code[:100] + "..." if len(code) > 100 else code}, extra=kwargs)
# Display the panel directly
panel = CodePanel(code, language, title, line_numbers, highlight_lines)
self.console.print(panel)
def display_results(
self,
title: str,
results: Union[List[Dict[str, Any]], Dict[str, Any]],
status: str = "success",
component: Optional[str] = None,
show_count: bool = True,
compact: bool = False,
message: Optional[str] = None,
level: str = "info",
**kwargs
) -> None:
"""Display results in a formatted panel.
Args:
title: Panel title
results: Results data
status: Status (success, warning, error)
component: Gateway component
show_count: Show count in title
compact: Use compact format
message: Optional message to log
level: Log level for the optional message (default: info)
**kwargs: Extra fields for logging
"""
if message:
self._log(level, message, component, context={"result_count": len(results) if isinstance(results, list) else 1, "status": status}, extra=kwargs)
# Display the panel directly
panel = ResultPanel(title, results, status, component, show_count, compact)
self.console.print(panel)
def section(
self,
title: str,
subtitle: Optional[str] = None,
component: Optional[str] = None,
) -> None:
"""Display a section header.
Args:
title: Section title
subtitle: Optional subtitle
component: Gateway component
"""
# This is purely presentational, doesn't log typically
panel = HeaderPanel(title, subtitle, component=component)
self.console.print(panel)
def info_panel(
self,
title: str,
content: Union[str, List[str], Dict[str, Any]],
icon: Optional[str] = None,
style: str = "info",
component: Optional[str] = None,
) -> None:
"""Display an informational panel.
Args:
title: Panel title
content: Panel content
icon: Optional icon
style: Panel style
component: Gateway component
"""
# Could log the title/content summary if desired
# self._log("info", f"Displaying info panel: {title}", component)
panel = InfoPanel(title, content, icon, style)
self.console.print(panel)
def warning_panel(
self,
title: Optional[str] = None,
message: str = "",
details: Optional[List[str]] = None,
component: Optional[str] = None,
) -> None:
"""Display a warning panel.
Args:
title: Optional panel title
message: Warning message
details: Optional list of detail strings
component: Gateway component
"""
# Log the warning separately
log_title = title if title else "Warning"
self.warning(f"{log_title}: {message}", component, context={"details": details})
# Display the panel directly
panel = WarningPanel(title, message, details)
self.console.print(panel)
def error_panel(
self,
title: Optional[str] = None,
message: str = "",
details: Optional[str] = None,
resolution_steps: Optional[List[str]] = None,
error_code: Optional[str] = None,
component: Optional[str] = None,
exception: Optional[Exception] = None,
) -> None:
"""Display an error panel.
Args:
title: Optional panel title
message: Error message
details: Optional detail string (e.g., traceback)
resolution_steps: Optional list of resolution steps
error_code: Optional error code
component: Gateway component
exception: Associated exception (for logging traceback)
"""
# Log the error separately
log_title = title if title else "Error"
log_context = {
"details": details,
"resolution": resolution_steps,
"error_code": error_code,
}
self.error(f"{log_title}: {message}", component, context=log_context, exception=exception)
# Display the panel directly
panel = ErrorPanel(title, message, details, resolution_steps, error_code)
self.console.print(panel)
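    # Illustrative call (mirrors the __main__ demo at the bottom of this file):
    #   logger.error_panel(
    #       title="DB Connection Failed",
    #       message="Could not connect to database",
    #       resolution_steps=["Check DB server status", "Verify credentials"],
    #       error_code="DB500",
    #   )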
# --- Context Managers & Decorators ---
@contextmanager
def time_operation(
self,
operation: str,
component: Optional[str] = None,
level: str = "info",
start_message: Optional[str] = "Starting {operation}...",
end_message: Optional[str] = "Finished {operation} in {duration:.2f}s",
**kwargs
):
"""
Context manager that times an operation and logs its start and completion.
This method provides a clean, standardized way to track and log the duration
of operations, ensuring consistent timing measurement and log formatting.
It automatically logs the start of an operation, executes the operation
within the context, measures the exact duration, and logs the completion
with timing information.
The timing uses Python's monotonic clock for accurate duration measurement
even if system time changes during execution. Both start and end messages
support templating with format string syntax, allowing customization while
maintaining consistency.
Key features:
- Precise operation timing with monotonic clock
- Automatic logging at start and end of operations
- Customizable message templates
- Consistent log format and metadata
- Exception-safe timing (duration is logged even if operation fails)
- Hierarchical operation tracking when combined with component parameter
Usage Examples:
```python
# Basic usage
with logger.time_operation("data_processing"):
process_large_dataset()
# Custom messages and different log level
with logger.time_operation(
operation="database_backup",
component="storage",
level="debug",
start_message="Starting backup of {operation}...",
end_message="Backup of {operation} completed in {duration:.3f}s"
):
backup_database()
# Timing nested operations with different components
with logger.time_operation("parent_task", component="scheduler"):
do_first_part()
with logger.time_operation("child_task", component="worker"):
do_second_part()
finish_task()
```
Args:
operation: Name of the operation being timed
component: Component performing the operation (uses logger default if None)
level: Log level for start/end messages (default: "info")
start_message: Template string for operation start message
(None to skip start logging)
end_message: Template string for operation end message
(None to skip end logging)
**kwargs: Additional fields to include in log entries
Yields:
None
Note:
This context manager is exception-safe: the end message with duration
is logged even if an exception occurs within the context. Exceptions
are re-raised normally after logging.
"""
start_time = time.monotonic()
if start_message:
self._log(level, start_message.format(operation=operation), component, operation, extra=kwargs)
try:
yield
finally:
duration = time.monotonic() - start_time
if end_message:
self._log(level, end_message.format(operation=operation, duration=duration), component, operation, context={"duration_s": duration}, extra=kwargs)
def track(
self,
iterable: Any,
description: str,
name: Optional[str] = None,
total: Optional[int] = None,
parent: Optional[str] = None,
# Removed component - handled by logger instance
) -> Any:
"""Track progress over an iterable using the logger's progress tracker.
Args:
iterable: Iterable to track
description: Description of the task
name: Optional task name (defaults to description)
total: Optional total number of items
parent: Optional parent task name
Returns:
The iterable wrapped with progress tracking
"""
return self.progress.track(iterable, description, name, total, parent)
@contextmanager
def task(
self,
description: str,
name: Optional[str] = None,
total: int = 100,
parent: Optional[str] = None,
# Removed component - handled by logger instance
autostart: bool = True,
):
"""
Context manager for tracking and displaying progress of a task.
This method creates a rich progress display for long-running tasks, providing
visual feedback and real-time status updates. It integrates with rich's
progress tracking to show animated spinners, completion percentage, and
elapsed/remaining time.
The task progress tracker is particularly useful for operations like:
- File processing (uploads, downloads, parsing)
- Batch database operations
- Multi-step data processing pipelines
- API calls with multiple sequential requests
- Any operation where progress feedback improves user experience
The progress display automatically adapts to terminal width and supports
nested tasks with parent-child relationships, allowing for complex operation
visualization. Progress can be updated manually within the context.
Key Features:
- Real-time progress visualization with percentage completion
- Automatic elapsed and remaining time estimation
- Support for nested tasks and task hierarchies
- Customizable description and task identification
- Thread-safe progress updates
- Automatic completion on context exit
Usage Examples:
```python
# Basic usage - process 50 items
with logger.task("Processing files", total=50) as task:
for i, file in enumerate(files):
process_file(file)
task.update(advance=1) # Increment progress by 1
# Nested tasks with parent-child relationship
with logger.task("Main import", total=100) as main_task:
# Process users (contributes 30% to main task)
with logger.task("Importing users", total=len(users), parent=main_task.id) as subtask:
for user in users:
import_user(user)
subtask.update(advance=1)
main_task.update(advance=30) # Users complete = 30% of main task
# Process products (contributes 70% to main task)
with logger.task("Importing products", total=len(products), parent=main_task.id) as subtask:
for product in products:
import_product(product)
subtask.update(advance=1)
main_task.update(advance=70) # Products complete = 70% of main task
```
Args:
description: Human-readable description of the task
name: Unique identifier for the task (defaults to description if None)
total: Total number of steps/work units for completion (100%)
parent: ID of parent task (for nested task hierarchies)
autostart: Automatically start displaying progress (default: True)
Yields:
GatewayProgress instance that can be used to update progress
Notes:
- The yielded progress object has methods like `update(advance=N)` to
increment progress and `update(total=N)` to adjust the total units.
- Tasks are automatically completed when the context exits, even if
an exception occurs.
- For tasks without a clear number of steps, you can use update with
a percentage value: `task.update(completed=50)` for 50% complete.
"""
with self.progress.task(description, name, total, parent, autostart) as task_context:
yield task_context
@contextmanager
def catch_and_log(
self,
component: Optional[str] = None,
operation: Optional[str] = None,
reraise: bool = True,
level: str = "error",
message: str = "An error occurred during {operation}",
):
"""
Context manager that catches, logs, and optionally re-raises exceptions.
This utility provides structured exception handling with automatic logging,
allowing code to maintain a consistent error handling pattern while ensuring
all exceptions are properly logged with relevant context information. It's
particularly useful for operations where you want to ensure errors are always
recorded, even if they'll be handled or suppressed at a higher level.
The context manager wraps a block of code and:
1. Executes the code normally
2. Catches any exceptions that occur
3. Logs the exception with configurable component, operation, and message
4. Optionally re-raises the exception (controlled by the reraise parameter)
This prevents "silent failures" and ensures consistent logging of all errors
while preserving the original exception's traceback for debugging purposes.
Key features:
- Standardized error logging across the application
- Configurable log level for different error severities
- Component and operation tagging for error categorization
- Template-based error messages with operation name substitution
- Control over exception propagation behavior
Usage Examples:
```python
# Basic usage - catch, log, and re-raise
with logger.catch_and_log(component="auth", operation="login"):
user = authenticate_user(username, password)
# Suppress exception after logging
with logger.catch_and_log(
component="email",
operation="send_notification",
reraise=False,
level="warning",
message="Failed to send notification email for {operation}"
):
send_email(user.email, "Welcome!", template="welcome")
# Use as a safety net around cleanup code
try:
# Main operation
process_file(file_path)
finally:
# Always log errors in cleanup but don't let them mask the main exception
with logger.catch_and_log(reraise=False, level="warning"):
os.remove(temp_file)
```
Args:
component: Component name for error categorization (uses logger default if None)
operation: Operation name for context (substituted in message template)
reraise: Whether to re-raise the caught exception (default: True)
level: Log level to use for the error message (default: "error")
message: Template string for the error message, with {operation} placeholder
Yields:
None
Note:
When reraise=False, exceptions are completely suppressed after logging.
This can be useful for non-critical operations like cleanup tasks,
but should be used carefully to avoid hiding important errors.
"""
component = component or self.component
operation = operation or "operation"
try:
yield
except Exception:
log_msg = message.format(operation=operation)
self._log(level, log_msg, component, operation, exception_info=True)
if reraise:
raise
def log_call(
self,
component: Optional[str] = None,
operation: Optional[str] = None,
level: str = "debug",
log_args: bool = True,
log_result: bool = False,
log_exceptions: bool = True,
):
"""
Decorator that logs function entries, exits, timing, and exceptions.
This decorator provides automatic instrumentation for function calls,
generating standardized log entries when functions are called and when they
complete. It tracks execution time, captures function arguments and results,
and properly handles and logs exceptions.
When applied to a function, it will:
1. Log when the function is entered, optionally including arguments
2. Execute the function normally
3. Track the exact execution time using a monotonic clock
4. Log function completion with duration, optionally including the return value
5. Catch, log, and re-raise any exceptions that occur
This is particularly valuable for:
- Debugging complex call flows and function interaction
- Performance analysis and identifying slow function calls
- Audit trails of function execution and parameters
- Troubleshooting intermittent issues with full context
- Standardizing logging across large codebases
Configuration Options:
- Logging level can be adjusted based on function importance
- Function arguments can be optionally included or excluded (for privacy/size)
- Return values can be optionally captured (for debugging/audit)
- Exception handling can be customized
- Component and operation names provide hierarchical organization
Usage Examples:
```python
# Basic usage - log entry and exit at debug level
@logger.log_call()
def process_data(item_id, options=None):
# Function implementation...
return result
# Customized - log as info level, include specific operation name
@logger.log_call(
component="billing",
operation="payment_processing",
level="info"
)
def process_payment(payment_id, amount):
# Process payment...
return receipt_id
# Capture return values but not arguments (e.g., for sensitive data)
@logger.log_call(
level="debug",
log_args=False,
log_result=True
)
def validate_credentials(username, password):
# Validate credentials without logging the password
return is_valid
# Detailed debugging for critical components
@logger.log_call(
component="auth",
operation="token_verification",
level="debug",
log_args=True,
log_result=True,
log_exceptions=True
)
def verify_auth_token(token):
# Verify token with full logging
return token_data
```
Args:
component: Component name for logs (defaults to logger's component)
operation: Operation name for logs (defaults to function name)
level: Log level for entry/exit messages (default: "debug")
log_args: Whether to log function arguments (default: True)
log_result: Whether to log function return value (default: False)
log_exceptions: Whether to log exceptions (default: True)
Returns:
Decorated function that logs entry, exit, and timing information
Notes:
- For functions with large or sensitive arguments, set log_args=False
- When log_result=True, be cautious with functions returning large data
structures as they will be truncated but may still impact performance
- This decorator preserves the original function's name, docstring,
and signature for compatibility with introspection tools
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Determine operation name
op_name = operation or func.__name__
comp_name = component or self.component
# Log entry
entry_msg = f"Entering {op_name}..."
context = {}
if log_args:
# Be careful logging args, could contain sensitive info or be large
try:
arg_repr = f"args={args!r}, kwargs={kwargs!r}"
context['args'] = arg_repr[:200] + '...' if len(arg_repr) > 200 else arg_repr
except Exception:
context['args'] = "<Could not represent args>"
self._log(level, entry_msg, comp_name, op_name, context=context)
start_time = time.monotonic()
try:
result = func(*args, **kwargs)
duration = time.monotonic() - start_time
# Log exit
exit_msg = f"Exiting {op_name} (duration: {duration:.3f}s)"
exit_context = {"duration_s": duration}
if log_result:
try:
res_repr = repr(result)
exit_context['result'] = res_repr[:200] + '...' if len(res_repr) > 200 else res_repr
except Exception:
exit_context['result'] = "<Could not represent result>"
self._log(level, exit_msg, comp_name, op_name, context=exit_context)
return result
except Exception as e:
duration = time.monotonic() - start_time
if log_exceptions:
                        exc_level = "error"  # Exceptions are always logged at the error level
exc_msg = f"Exception in {op_name} after {duration:.3f}s: {e}"
exc_context = {"duration_s": duration}
if log_args: # Include args context if available
exc_context.update(context)
self._log(exc_level, exc_msg, comp_name, op_name, exception_info=True, context=exc_context)
raise
return wrapper
return decorator
# --- Startup/Shutdown Methods ---
def startup(
self,
version: str,
component: Optional[str] = None,
mode: str = "standard",
context: Optional[Dict[str, Any]] = None,
**kwargs
) -> None:
"""Log server startup information.
Args:
version: Server version
component: Component name (usually None for global startup)
mode: Performance mode
context: Additional startup context
**kwargs: Extra fields for logging
"""
message = f"Starting Server (Version: {version}, Mode: {mode})"
emoji = get_emoji("system", "startup")
        self._log("info", message, component, operation="startup", emoji=emoji, context=context, extra=kwargs)
def shutdown(
self,
component: Optional[str] = None,
duration: Optional[float] = None,
context: Optional[Dict[str, Any]] = None,
**kwargs
) -> None:
"""Log server shutdown information.
Args:
component: Component name
duration: Optional uptime duration
context: Additional shutdown context
**kwargs: Extra fields for logging
"""
message = "Server Shutting Down"
if duration is not None:
message += f" (Uptime: {duration:.2f}s)"
emoji = get_emoji("system", "shutdown")
        self._log("info", message, component, operation="shutdown", emoji=emoji, context=context, extra=kwargs)
# --- Global Convenience Functions ---
# These use the global 'logger' instance created in __init__.py
# At the global level, declare logger as None initially
logger = None
def get_logger(name: str) -> Logger:
"""
Get or create a logger instance for a specific component or module.
This function creates a properly named Logger instance following the application's
logging hierarchy and naming conventions. It serves as the primary entry point
for obtaining loggers throughout the application, ensuring consistent logger
configuration and behavior.
The function implements a pseudo-singleton pattern for the default logger:
- The first call initializes a global default logger
- Each subsequent call creates a new named logger instance
- The name parameter establishes the logger's identity in the logging hierarchy
Logger Naming Conventions:
Logger names should follow Python's module path pattern, where dots separate
hierarchy levels. The recommended practice is to use:
- The module's __name__ variable in most cases
- Explicit names for specific subsystems or components
Examples:
- "ultimate_mcp_server.core.state_store"
- "ultimate_mcp_server.services.rag"
- "ultimate_mcp_server.tools.local_text"
Args:
name: Logger name that identifies the component, module, or subsystem
Usually set to the module's __name__ or a specific component identifier
Returns:
A configured Logger instance with the specified name
Usage Examples:
```python
# Standard usage in a module
logger = get_logger(__name__)
# Component-specific logger
auth_logger = get_logger("ultimate_mcp_server.auth")
# Usage with structured logging
logger = get_logger("my_module")
logger.info("User action",
component="auth",
operation="login",
context={"user_id": user.id})
```
Note:
While each call returns a new Logger instance, they all share the underlying
Python logging configuration and output destinations. This allows for
centralized control of log levels, formatting, and output handlers through
standard logging configuration.
"""
# Initialize the global logger if needed
global logger
if logger is None:
logger = Logger(name)
# Return a new logger with the requested name
return Logger(name)
# Helper functions for global usage
def debug(
message: str,
component: Optional[str] = None,
operation: Optional[str] = None,
context: Optional[Dict[str, Any]] = None,
emoji_key: Optional[str] = None,
**kwargs
) -> None:
"""Forward to default logger's debug method."""
# Ensure logger is initialized
global logger
if logger is None:
logger = Logger(__name__)
logger.debug(message, component, operation, context, emoji_key=emoji_key, **kwargs)
def info(
message: str,
component: Optional[str] = None,
operation: Optional[str] = None,
context: Optional[Dict[str, Any]] = None,
emoji_key: Optional[str] = None,
**kwargs
) -> None:
"""Forward to default logger's info method."""
# Ensure logger is initialized
global logger
if logger is None:
logger = Logger(__name__)
logger.info(message, component, operation, context, emoji_key=emoji_key, **kwargs)
def success(
message: str,
component: Optional[str] = None,
operation: Optional[str] = None,
context: Optional[Dict[str, Any]] = None,
emoji_key: Optional[str] = None,
**kwargs
) -> None:
"""Forward to default logger's success method."""
# Ensure logger is initialized
global logger
if logger is None:
logger = Logger(__name__)
logger.success(message, component, operation, context, emoji_key=emoji_key, **kwargs)
def warning(
message: str,
component: Optional[str] = None,
operation: Optional[str] = None,
context: Optional[Dict[str, Any]] = None,
emoji_key: Optional[str] = None,
# details: Optional[List[str]] = None,
**kwargs
) -> None:
"""Forward to default logger's warning method."""
# Ensure logger is initialized
global logger
if logger is None:
logger = Logger(__name__)
logger.warning(message, component, operation, context, emoji_key=emoji_key, **kwargs)
def error(
message: str,
component: Optional[str] = None,
operation: Optional[str] = None,
context: Optional[Dict[str, Any]] = None,
exception: Optional[Exception] = None,
emoji_key: Optional[str] = None,
# error_code: Optional[str] = None,
# resolution_steps: Optional[List[str]] = None,
**kwargs
) -> None:
"""Forward to default logger's error method."""
# Ensure logger is initialized
global logger
if logger is None:
logger = Logger(__name__)
# Handle exc_info specially to prevent conflicts
    exc_info = kwargs.pop('exc_info', None)
    if exc_info is not None:
        kwargs['exc_info'] = exc_info  # re-attach only when a real value was supplied
    logger.error(message, component, operation, context,
                 exception=exception, emoji_key=emoji_key, **kwargs)
def critical(
message: str,
component: Optional[str] = None,
operation: Optional[str] = None,
context: Optional[Dict[str, Any]] = None,
exception: Optional[Exception] = None,
emoji_key: Optional[str] = None,
# error_code: Optional[str] = None,
**kwargs
) -> None:
"""Forward to default logger's critical method."""
# Ensure logger is initialized
global logger
if logger is None:
logger = Logger(__name__)
# Handle exc_info specially to prevent conflicts
    exc_info = kwargs.pop('exc_info', None)
    if exc_info is not None:
        kwargs['exc_info'] = exc_info  # re-attach only when a real value was supplied
    logger.critical(message, component, operation, context,
                    exception=exception, emoji_key=emoji_key, **kwargs)
def section(
title: str,
subtitle: Optional[str] = None,
component: Optional[str] = None,
) -> None:
"""Display a section header using the global logger's console."""
# Ensure logger is initialized
global logger
if logger is None:
logger = Logger(__name__)
logger.section(title, subtitle, component)
# Example Usage (if run directly)
if __name__ == '__main__':
# Example of how the logger might be configured and used
# Normally configuration happens via dictConfig in main entry point
# For standalone testing, we can add a handler manually
test_logger = Logger("test_logger", level="debug") # Create instance
test_logger.python_logger.addHandler(RichLoggingHandler(console=console))
# Need to prevent propagation if manually adding handler here for test
test_logger.python_logger.propagate = False
test_logger.section("Initialization", "Setting up components")
test_logger.startup(version="1.0.0", mode="test")
test_logger.debug("This is a debug message", component="core", operation="setup")
test_logger.info("This is an info message", component="api")
test_logger.success("Operation completed successfully", component="worker", operation="process_data")
test_logger.warning("Something looks suspicious", component="cache", context={"key": "user:123"})
try:
x = 1 / 0
except ZeroDivisionError as e:
test_logger.error("An error occurred", component="math", operation="divide", exception=e)
test_logger.critical("System unstable!", component="core", context={"reason": "disk full"})
test_logger.info_panel("Configuration", {"host": "localhost", "port": 8013}, component="core")
test_logger.warning_panel("Cache Alert", "Cache nearing capacity", details=["Size: 95MB", "Limit: 100MB"], component="cache")
test_logger.error_panel("DB Connection Failed", "Could not connect to database", details="Connection timed out after 5s", resolution_steps=["Check DB server status", "Verify credentials"], error_code="DB500", component="db")
test_logger.tool("grep", "grep 'error' log.txt", "line 1: error found\nline 5: error processing", status="success", duration=0.5, component="analysis")
test_logger.code("def hello():\n print('Hello')", language="python", title="Example Code", component="docs")
with test_logger.time_operation("long_process", component="worker"):
time.sleep(0.5)
with test_logger.task("Processing items", total=10) as p:
for _i in range(10):
time.sleep(0.05)
p.update_task(p.current_task_id, advance=1) # Assuming task context provides task_id
@test_logger.log_call(component="utils", log_result=True)
def add_numbers(a, b):
return a + b
add_numbers(5, 3)
test_logger.shutdown(duration=123.45)
__all__ = [
"critical",
"debug",
"error",
"get_logger",
"info",
"logger", # Add logger to exported names
"warning",
]
```
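For quick reference, here is a minimal usage sketch of the Logger API defined above. It is not part of the repository: the import path and the helper functions (`rebuild_index`, `charge`) are assumptions for illustration only.

```python
# Minimal usage sketch of the Logger API above (illustrative, not from the repo).
# Assumption: the module is importable at this path; adjust to its real location.
from ultimate_mcp_server.utils.logging.logger import get_logger

log = get_logger(__name__)


def rebuild_index() -> int:
    """Stand-in workload so the sketch is self-contained."""
    return sum(range(1000))


# Structured message with component / operation / context metadata
log.info("Cache warmed", component="cache", operation="warmup", context={"entries": 1200})

# Time a block of work; start and end messages are logged automatically
with log.time_operation("index_rebuild", component="search"):
    rebuild_index()


# Instrument a function: entry, exit, duration, and exceptions are logged
@log.log_call(component="billing", operation="charge", level="info")
def charge(customer_id: str, amount_cents: int) -> str:
    return f"receipt-{customer_id}-{amount_cents}"


charge("cust_1", 1999)

# Attach exception details to an error record
try:
    1 / 0
except ZeroDivisionError as exc:
    log.error("Division failed", component="math", operation="divide", exception=exc)
```

Because `log_call` applies `functools.wraps`, the decorated `charge` keeps its original name and signature for introspection.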
--------------------------------------------------------------------------------
/ultimate_mcp_server/config.py:
--------------------------------------------------------------------------------
```python
"""
Configuration management for Ultimate MCP Server.
Handles loading, validation, and access to configuration settings
from environment variables and config files.
"""
import json
import logging
import os
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Set
import yaml
from decouple import Config as DecoupleConfig
from decouple import RepositoryEnv, UndefinedValueError
from pydantic import BaseModel, Field, ValidationError, field_validator
# from pydantic_settings import BaseSettings, SettingsConfigDict # Removed BaseSettings
# --- Decouple Config Instance ---
# This will read from .env file and environment variables
decouple_config = DecoupleConfig(RepositoryEnv('.env'))
# --------------------------------
# Default configuration file paths (Adapt as needed)
DEFAULT_CONFIG_PATHS = [
"./gateway_config.yaml",
"./gateway_config.yml",
"./gateway_config.json",
"~/.config/ultimate_mcp_server/config.yaml",
"~/.ultimate_mcp_server.yaml",
]
# Environment variable prefix (still potentially useful for non-secret env vars)
ENV_PREFIX = "GATEWAY_"
# Global configuration instance
_config = None
# Basic logger for config loading issues before full logging is set up
config_logger = logging.getLogger("ultimate_mcp_server.config")
handler = logging.StreamHandler(sys.stderr)
if not config_logger.hasHandlers():
config_logger.addHandler(handler)
config_logger.setLevel(logging.INFO)
class ServerConfig(BaseModel):
"""
HTTP server configuration settings for the Ultimate MCP Server.
This configuration class defines the core server parameters including network binding,
performance settings, debugging options, and server identity information. It controls
how the Ultimate MCP Server presents itself on the network and manages HTTP connections,
especially when running in SSE (Server-Sent Events) mode.
Settings defined here affect:
- Where and how the server listens for connections (host, port)
- How many concurrent workers are spawned to handle requests
- Cross-origin resource sharing (CORS) for web clients
- Logging verbosity level
- Debug capabilities for development
Most of these settings can be overridden at startup using environment variables
or command-line arguments when launching the server.
All values have sensible defaults suitable for local development. For production
deployments, it's recommended to adjust host, port, workers, and CORS settings
based on your specific requirements.
"""
name: str = Field("Ultimate MCP Server", description="Name of the server")
host: str = Field("127.0.0.1", description="Host to bind the server to")
port: int = Field(8013, description="Port to bind the server to") # Default port changed
workers: int = Field(1, description="Number of worker processes")
debug: bool = Field(False, description="Enable debug mode (affects reload)")
cors_origins: List[str] = Field(default_factory=lambda: ["*"], description="CORS allowed origins") # Use default_factory for mutable defaults
log_level: str = Field("info", description="Logging level (debug, info, warning, error, critical)")
version: str = Field("0.1.0", description="Server version (from config, not package)")
@field_validator('log_level')
@classmethod
def validate_log_level(cls, v):
"""
Validate and normalize the log level configuration value.
This validator ensures that the log_level field contains a valid logging level string.
It performs two key functions:
1. Validation: Checks that the provided value is one of the allowed logging levels
(debug, info, warning, error, critical). If the value is invalid, it raises a
ValidationError with a clear message listing the allowed values.
2. Normalization: Converts the input to lowercase to ensure consistent handling
regardless of how the value was specified in configuration sources. This allows
users to specify the level in any case (e.g., "INFO", "info", "Info") and have
it properly normalized.
Args:
v: The raw log_level value from the configuration source (file, env var, etc.)
Returns:
str: The validated and normalized (lowercase) log level string
Raises:
ValueError: If the provided value is not one of the allowed logging levels
Example:
>>> ServerConfig.validate_log_level("INFO")
'info'
>>> ServerConfig.validate_log_level("warning")
'warning'
>>> ServerConfig.validate_log_level("invalid")
ValueError: Log level must be one of ['debug', 'info', 'warning', 'error', 'critical']
"""
allowed = ['debug', 'info', 'warning', 'error', 'critical']
level_lower = v.lower()
if level_lower not in allowed:
raise ValueError(f"Log level must be one of {allowed}")
return level_lower
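# Illustrative gateway_config.yaml fragment for the server section (values are
# examples only, not shipped defaults):
#   server:
#     host: 0.0.0.0
#     port: 8013
#     workers: 2
#     log_level: debug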
class CacheConfig(BaseModel):
"""
Caching system configuration for the Ultimate MCP Server.
This configuration class defines parameters for the server's caching infrastructure,
which is used to store and retrieve frequently accessed data like LLM completions.
Effective caching significantly reduces API costs, improves response times, and
decreases load on provider APIs.
The caching system supports:
- In-memory caching with configurable entry limits
- Time-based expiration of cached entries
- Optional persistence to disk
- Fuzzy matching for similar but non-identical requests
When enabled, the caching layer sits between tool calls and provider APIs,
intercepting duplicate requests and returning cached results when appropriate.
This is especially valuable for expensive operations like complex LLM completions
that may be called multiple times with identical parameters.
Proper cache configuration can dramatically reduce operating costs in production
environments while improving response times for end users. The default settings
provide a reasonable balance for most use cases, but may need adjustment based
on traffic patterns and memory constraints.
"""
enabled: bool = Field(True, description="Whether caching is enabled")
ttl: int = Field(3600, description="Time-to-live for cache entries in seconds")
max_entries: int = Field(10000, description="Maximum number of entries to store in cache")
directory: Optional[str] = Field(None, description="Directory for cache persistence")
fuzzy_match: bool = Field(True, description="Whether to use fuzzy matching for cache keys")
class ProviderConfig(BaseModel):
"""
Configuration for an individual LLM provider connection.
This class encapsulates all settings needed to establish and maintain a connection
to a specific LLM provider service, such as OpenAI, Anthropic, or Gemini. Each provider
instance in the system has its own configuration derived from this class, allowing for
precise control over connection parameters, model selection, and authentication.
The configuration supports:
- Authentication via API keys (typically loaded from environment variables)
- Custom API endpoints via base_url overrides
- Organization-specific routing (for multi-tenant API services)
- Default model selection for when no model is explicitly specified
- Request timeout and token limit management
- Provider-specific parameters via the additional_params dictionary
Most provider settings can be loaded from either configuration files or environment
variables, with environment variables taking precedence. This allows for secure
management of sensitive credentials outside of versioned configuration files.
For security best practices:
- API keys should be specified via environment variables, not in configuration files
- Custom API endpoints with private deployments should use HTTPS
- Timeout values should be set appropriately to prevent hung connections
Each provider has its own instance of this configuration class, allowing for
independent configuration of multiple providers within the same server.
"""
enabled: bool = Field(True, description="Whether the provider is enabled")
api_key: Optional[str] = Field(None, description="API key for the provider (loaded via decouple)") # Updated description
base_url: Optional[str] = Field(None, description="Base URL for API requests (loaded via decouple/file)") # Updated description
organization: Optional[str] = Field(None, description="Organization identifier (loaded via decouple/file)") # Updated description
default_model: Optional[str] = Field(None, description="Default model to use (loaded via decouple/file)") # Updated description
max_tokens: Optional[int] = Field(None, description="Maximum tokens for completions")
timeout: Optional[float] = Field(30.0, description="Timeout for API requests in seconds")
additional_params: Dict[str, Any] = Field(default_factory=dict, description="Additional provider-specific parameters (loaded via decouple/file)") # Updated description
class ProvidersConfig(BaseModel):
"""
Centralized configuration for all supported LLM providers in the Ultimate MCP Server.
This class serves as a container for individual provider configurations, organizing
all supported provider settings in a structured hierarchy. It acts as the central
registry of provider configurations, making it easy to:
1. Access configuration for specific providers by name as attributes
2. Iterate over all provider configurations for initialization and status checks
3. Update provider settings through a consistent interface
4. Add new providers to the system in a structured way
Each provider has its own ProviderConfig instance as an attribute, named after the
provider (e.g., openai, anthropic, gemini). This allows for dot-notation access
to specific provider settings, providing a clean and intuitive API for configuration.
The available providers are pre-defined based on the supported integrations in the
system. Each provider's configuration follows the same structure but may have
different default values or additional parameters based on provider-specific needs.
When the configuration system loads settings from files or environment variables,
it updates these provider configurations directly, making them the definitive source
of provider settings throughout the application.
"""
openai: ProviderConfig = Field(default_factory=ProviderConfig, description="OpenAI provider configuration")
anthropic: ProviderConfig = Field(default_factory=ProviderConfig, description="Anthropic provider configuration")
deepseek: ProviderConfig = Field(default_factory=ProviderConfig, description="DeepSeek provider configuration")
gemini: ProviderConfig = Field(default_factory=ProviderConfig, description="Gemini provider configuration")
openrouter: ProviderConfig = Field(default_factory=ProviderConfig, description="OpenRouter provider configuration")
grok: ProviderConfig = Field(default_factory=ProviderConfig, description="Grok (xAI) provider configuration")
ollama: ProviderConfig = Field(default_factory=ProviderConfig, description="Ollama provider configuration")
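# Illustrative access pattern, assuming a loaded GatewayConfig instance named `config`:
#   config.providers.openai.default_model
#   config.providers.ollama.enabled
# API keys are expected to arrive via provider-specific environment variables
# (e.g. OPENAI_API_KEY, ANTHROPIC_API_KEY) rather than from the config file.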
class FilesystemProtectionConfig(BaseModel):
"""Configuration for filesystem protection heuristics."""
enabled: bool = Field(True, description="Enable protection checks for this operation")
max_files_threshold: int = Field(50, description="Trigger detailed check above this many files")
datetime_stddev_threshold_sec: float = Field(60 * 60 * 24 * 30, description="Timestamp variance threshold (seconds)")
file_type_variance_threshold: int = Field(5, description="File extension variance threshold")
max_stat_errors_pct: float = Field(10.0, description="Max percentage of failed stat calls allowed during check")
class FilesystemConfig(BaseModel):
"""Configuration for filesystem tools."""
allowed_directories: List[str] = Field(default_factory=list, description="List of absolute paths allowed for access")
file_deletion_protection: FilesystemProtectionConfig = Field(default_factory=FilesystemProtectionConfig, description="Settings for deletion protection heuristics")
file_modification_protection: FilesystemProtectionConfig = Field(default_factory=FilesystemProtectionConfig, description="Settings for modification protection heuristics (placeholder)")
default_encoding: str = Field("utf-8", description="Default encoding for text file operations")
max_read_size_bytes: int = Field(100 * 1024 * 1024, description="Maximum size for reading files") # 100MB example
class AgentMemoryConfig(BaseModel):
"""Configuration for Cognitive and Agent Memory tool."""
db_path: str = Field("unified_agent_memory.db", description="Path to the agent memory SQLite database")
max_text_length: int = Field(64000, description="Maximum length for text fields (e.g., content, reasoning)")
connection_timeout: float = Field(10.0, description="Database connection timeout in seconds")
max_working_memory_size: int = Field(20, description="Maximum number of items in working memory")
memory_decay_rate: float = Field(0.01, description="Decay rate for memory relevance per hour")
importance_boost_factor: float = Field(1.5, description="Multiplier for explicitly marked important memories")
similarity_threshold: float = Field(0.75, description="Default threshold for semantic similarity search")
max_semantic_candidates: int = Field(500, description="Maximum candidates to consider in semantic search before scoring")
# TTLs per level (in seconds)
ttl_working: int = Field(60 * 30, description="Default TTL for working memories (seconds)")
ttl_episodic: int = Field(60 * 60 * 24 * 7, description="Default TTL for episodic memories (seconds)")
ttl_semantic: int = Field(60 * 60 * 24 * 30, description="Default TTL for semantic memories (seconds)")
ttl_procedural: int = Field(60 * 60 * 24 * 90, description="Default TTL for procedural memories (seconds)")
# Embedding related (primarily for reference/defaults, service might override)
default_embedding_model: str = Field("text-embedding-3-small", description="Default embedding model identifier")
embedding_dimension: int = Field(1536, description="Expected dimension for the default embedding model")
# Multi-tool support (for agents that make multiple UMS calls per turn)
enable_batched_operations: bool = Field(True, description="Allow multiple tool calls per agent turn")
max_tools_per_batch: int = Field(20, description="Maximum number of tools that can be called in a single batch")
# SQLite Optimizations (Defined here, not env vars by default)
sqlite_pragmas: List[str] = Field(
default_factory=lambda: [
"PRAGMA journal_mode=DELETE",
"PRAGMA synchronous=NORMAL",
"PRAGMA foreign_keys=ON",
"PRAGMA temp_store=MEMORY",
"PRAGMA cache_size=-32000", # ~32MB cache
"PRAGMA mmap_size=2147483647", # Max mmap size
"PRAGMA busy_timeout=30000", # 30 seconds busy timeout
],
description="List of SQLite PRAGMA statements for optimization"
)
class ToolRegistrationConfig(BaseModel):
"""Configuration for tool registration."""
filter_enabled: bool = Field(False, description="Whether to filter which tools are registered")
included_tools: List[str] = Field(default_factory=list, description="List of tool names to include (empty means include all)")
excluded_tools: List[str] = Field(default_factory=list, description="List of tool names to exclude (takes precedence over included_tools)")
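# Illustrative filtering semantics (tool names are hypothetical): with
# filter_enabled=True, included_tools=["read_file", "delete_file"] and
# excluded_tools=["delete_file"], only "read_file" is registered, because
# excluded_tools takes precedence over included_tools.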
class SmartBrowserConfig(BaseModel):
"""Configuration specific to the Smart Browser tool."""
sb_state_key_b64: Optional[str] = Field(None, description="Base64 encoded AES key for state encryption (e.g., 'openssl rand -base64 32')")
sb_max_tabs: int = Field(5, description="Max concurrent tabs in the pool")
sb_tab_timeout: int = Field(300, description="Timeout for operations within a tab (seconds)")
sb_inactivity_timeout: int = Field(600, description="Browser inactivity shutdown timeout (seconds)")
headless_mode: bool = Field(True, description="Run browser in headless mode")
vnc_enabled: bool = Field(False, description="Enable VNC server for headful mode")
vnc_password: Optional[str] = Field(None, description="Password for VNC server (required if vnc_enabled=True)")
proxy_pool_str: str = Field("", description="Semicolon-separated list of proxy URLs (e.g., 'http://user:pass@host:port;socks5://host2:port2')")
proxy_allowed_domains_str: str = Field("*", description="Comma-separated domains allowed for proxy (e.g., '.google.com,.example.com', '*' for all)")
vault_allowed_paths_str: str = Field("secret/data/,kv/data/", description="Comma-separated allowed Vault path prefixes (e.g., 'kv/data/myapp/,secret/data/shared/')")
# Enhanced Locator Tunables
max_widgets: int = Field(300, description="Max interactive elements extracted for page map")
max_section_chars: int = Field(5000, description="Max chars for main text summary in page map")
dom_fp_limit: int = Field(20000, description="Max chars used for DOM fingerprint calculation")
llm_model_locator: str = Field("gpt-4o", description="LLM model used for locator fallback")
retry_after_fail: int = Field(1, description="Number of LLM locator retries after initial failure")
seq_cutoff: float = Field(0.72, description="SequenceMatcher cutoff for heuristic locator match")
area_min: int = Field(400, description="Minimum pixel area (width*height) for elements in page map")
high_risk_domains_set: Set[str] = Field( # Use set for direct comparison
default_factory=lambda: { # Use factory for mutable default
".google.com", ".facebook.com", ".linkedin.com", ".glassdoor.com",
".instagram.com", ".twitter.com", ".x.com", ".reddit.com", ".amazon.com",
".ebay.com", ".ticketmaster.com", ".cloudflare.com", ".datadome.co",
".perimeterx.net", ".recaptcha.net", ".hcaptcha.com",
},
description="Set of domains considered high-risk for bot detection (influences jitter timing)",
)
# Validator for high_risk_domains_set (ensures leading dot)
@field_validator('high_risk_domains_set', mode='before')
@classmethod
def normalize_high_risk_domains(cls, v):
if isinstance(v, str): # Allow comma-separated string input from env/file
domains = {d.strip().lower() for d in v.split(',') if d.strip()}
elif isinstance(v, (list, set)):
domains = {str(d).strip().lower() for d in v if str(d).strip()}
else:
raise ValueError("high_risk_domains_set must be a list, set, or comma-separated string")
# Ensure leading dot for all domains
normalized_domains = {d if d.startswith('.') else '.' + d for d in domains}
return normalized_domains
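    # Worked example: "Google.com, example.com" (string), ["Google.com", "example.com"]
    # (list) and {"google.com", "example.com"} (set) all normalize to
    # {".google.com", ".example.com"}.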
class GatewayConfig(BaseModel): # Inherit from BaseModel now
"""
Root configuration model for the entire Ultimate MCP Server system.
This class serves as the top-level configuration container, bringing together
all component-specific configurations into a unified structure. It represents the
complete configuration state of the Ultimate MCP Server and is the primary interface
for accessing configuration settings throughout the application.
The configuration is hierarchically organized into logical sections:
- server: Network, HTTP, and core server settings
- providers: LLM provider connections and credentials
- cache: Response caching behavior and persistence
- filesystem: Safe filesystem access rules and protection
- agent_memory: Settings for the agent memory and cognitive systems
- tool_registration: Controls for which tools are enabled
Additionally, it includes several top-level settings for paths and directories
that are used across multiple components of the system.
This configuration model is loaded through the config module's functions, which
handle merging settings from:
1. Default values defined in the model
2. Configuration files (YAML/JSON)
3. Environment variables
4. Command-line arguments (where applicable)
Throughout the application, this configuration is accessed through the get_config()
function, which returns a singleton instance of this class with all settings
properly loaded and validated.
Usage example:
```python
from ultimate_mcp_server.config import get_config
config = get_config()
# Access configuration sections
server_port = config.server.port
openai_api_key = config.providers.openai.api_key
# Access top-level settings
logs_dir = config.log_directory
```
"""
server: ServerConfig = Field(default_factory=ServerConfig)
providers: ProvidersConfig = Field(default_factory=ProvidersConfig)
cache: CacheConfig = Field(default_factory=CacheConfig)
filesystem: FilesystemConfig = Field(default_factory=FilesystemConfig)
agent_memory: AgentMemoryConfig = Field(default_factory=AgentMemoryConfig) # Added agent memory
tool_registration: ToolRegistrationConfig = Field(default_factory=ToolRegistrationConfig) # Added tool registration config
smart_browser: SmartBrowserConfig = Field(default_factory=SmartBrowserConfig)
default_provider: str = Field("openai", description="Default LLM provider to use if unspecified (e.g., 'openai', 'anthropic')")
storage_directory: str = Field("./storage", description="Directory for persistent storage")
log_directory: str = Field("./logs", description="Directory for log files")
prompt_templates_directory: str = Field("./prompt_templates", description="Directory containing prompt templates") # Added prompt dir
def expand_path(path: str) -> str:
"""
Expand a path string to resolve user home directories and environment variables.
This utility function takes a potentially relative path string that may contain
user home directory references (e.g., "~/logs") or environment variables
(e.g., "$HOME/data") and expands it to an absolute path.
The expansion process:
1. Expands user home directory (e.g., "~" → "/home/username")
2. Expands environment variables (e.g., "$VAR" → "value")
3. Converts to an absolute path (resolving relative paths)
Args:
path: A path string that may contain "~" or environment variables
Returns:
The expanded absolute path as a string
Example:
>>> expand_path("~/logs")
'/home/username/logs'
>>> expand_path("$DATA_DIR/cache")
'/var/data/cache' # Assuming $DATA_DIR is set to "/var/data"
"""
expanded = os.path.expanduser(path)
expanded = os.path.expandvars(expanded)
return os.path.abspath(expanded)
def find_config_file() -> Optional[str]:
"""
Find the first available configuration file from the list of default paths.
This function searches for configuration files in standard locations, following
a predefined priority order. It checks each potential location sequentially and
returns the path of the first valid configuration file found.
The search locations (defined in DEFAULT_CONFIG_PATHS) typically include:
- Current directory (e.g., "./gateway_config.yaml")
- User config directory (e.g., "~/.config/ultimate_mcp_server/config.yaml")
- User home directory (e.g., "~/.ultimate_mcp_server.yaml")
Each path is expanded using expand_path() before checking if it exists.
Returns:
The path to the first found configuration file, or None if no files exist
Note:
This function only verifies that the files exist, not that they have
valid content or format. Content validation happens during actual loading.
"""
for path in DEFAULT_CONFIG_PATHS:
try:
expanded_path = expand_path(path)
if os.path.isfile(expanded_path):
config_logger.debug(f"Found config file: {expanded_path}")
return expanded_path
except Exception as e:
config_logger.debug(f"Could not check path {path}: {e}")
config_logger.debug("No default config file found.")
return None
def load_config_from_file(path: str) -> Dict[str, Any]:
"""
Load configuration data from a YAML or JSON file.
This function reads and parses a configuration file into a Python dictionary.
It automatically detects the file format based on the file extension:
- .yaml/.yml: Parsed as YAML using PyYAML
- .json: Parsed as JSON using Python's built-in json module
The function performs several steps:
1. Expands the path to resolve any home directory (~/...) or environment variables
2. Verifies that the file exists
3. Determines the appropriate parser based on file extension
4. Reads and parses the file content
5. Returns the parsed configuration as a dictionary
Args:
path: Path to the configuration file (can be relative or use ~/... or $VAR/...)
Returns:
Dictionary containing the parsed configuration data
Raises:
FileNotFoundError: If the configuration file doesn't exist
ValueError: If the file has an unsupported format or contains invalid syntax
RuntimeError: If there are other errors reading the file
Note:
If the file is empty or contains "null" in YAML, an empty dictionary is
returned rather than None, ensuring consistent return type.
"""
path = expand_path(path)
if not os.path.isfile(path):
raise FileNotFoundError(f"Configuration file not found: {path}")
config_logger.debug(f"Loading configuration from file: {path}")
try:
with open(path, 'r', encoding='utf-8') as f:
if path.endswith(('.yaml', '.yml')):
config_data = yaml.safe_load(f)
elif path.endswith('.json'):
config_data = json.load(f)
else:
raise ValueError(f"Unsupported config format: {path}")
return config_data if config_data is not None else {}
except (yaml.YAMLError, json.JSONDecodeError) as e:
raise ValueError(f"Invalid format in {path}: {e}") from e
except Exception as e:
raise RuntimeError(f"Error reading {path}: {e}") from e
def load_config(
config_file_path: Optional[str] = None,
load_default_files: bool = True,
) -> GatewayConfig:
"""
Load, merge, and validate configuration from multiple sources with priority handling.
This function implements the complete configuration loading process, combining settings
from multiple sources according to their priority. It also handles path expansion,
directory creation, and validation of the resulting configuration.
Configuration Sources (in order of decreasing priority):
1. Environment variables (via decouple) - Use GATEWAY_* prefix or provider-specific vars
2. .env file variables (via decouple) - Same naming as environment variables
3. YAML/JSON configuration file - If explicitly specified or found in default locations
4. Default values defined in Pydantic models - Fallback when no other source specifies a value
Special handling:
- Provider API keys: Loaded from provider-specific environment variables
(e.g., OPENAI_API_KEY, ANTHROPIC_API_KEY)
- Directory paths: Automatically expanded and created if they don't exist
- Validation: All configuration values are validated against their Pydantic models
Args:
config_file_path: Optional explicit path to a configuration file to load.
If provided, this file must exist and be valid YAML/JSON.
load_default_files: Whether to search for configuration files in default locations
if config_file_path is not provided. Default: True
Returns:
GatewayConfig: A fully loaded and validated configuration object
Raises:
FileNotFoundError: If an explicitly specified config file doesn't exist
ValueError: If the config file has invalid format or content
RuntimeError: If other errors occur during loading
Example:
```python
# Load with defaults and environment variables
config = load_config()
# Load from a specific config file
config = load_config(config_file_path="path/to/custom_config.yaml")
# Load only from environment variables, ignoring config files
config = load_config(load_default_files=False)
```
"""
global _config
file_config_data = {}
# 1. Find and load config file (if specified or found)
chosen_file_path = None
if config_file_path:
chosen_file_path = expand_path(config_file_path)
elif load_default_files:
chosen_file_path = find_config_file()
if chosen_file_path and os.path.isfile(chosen_file_path):
try:
file_config_data = load_config_from_file(chosen_file_path)
config_logger.info(f"Loaded base configuration from: {chosen_file_path}")
except Exception as e:
config_logger.warning(f"Could not load config file {chosen_file_path}: {e}")
if config_file_path:
raise ValueError(f"Failed to load specified config: {chosen_file_path}") from e
elif config_file_path:
raise FileNotFoundError(f"Specified configuration file not found: {config_file_path}")
# 2. Initialize GatewayConfig from Pydantic defaults and file data
try:
# Ensure nested keys exist before validation if loading from potentially incomplete file
file_config_data.setdefault('server', {})
file_config_data.setdefault('providers', {})
file_config_data.setdefault('cache', {})
file_config_data.setdefault('filesystem', {})
file_config_data.setdefault('agent_memory', {})
file_config_data.setdefault('tool_registration', {})
file_config_data.setdefault('smart_browser', {})
loaded_config = GatewayConfig.model_validate(file_config_data)
except ValidationError as e:
config_logger.error("Configuration validation failed during file/default loading:")
config_logger.error(str(e))
config_logger.warning("Falling back to default configuration before applying env vars.")
loaded_config = GatewayConfig() # Fallback to defaults
# 3. Use decouple to load/override settings from .env/environment variables
# Decouple handles checking env vars and .env file automatically.
# --- Load Provider API Keys ---
provider_key_map = {
"openai": "OPENAI_API_KEY",
"anthropic": "ANTHROPIC_API_KEY",
"deepseek": "DEEPSEEK_API_KEY",
"gemini": "GEMINI_API_KEY",
"openrouter": "OPENROUTER_API_KEY",
"grok": "GROK_API_KEY",
}
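# For example, exporting OPENAI_API_KEY=sk-... (or placing it in the .env file)
# fills providers.openai.api_key below; an env/.env value takes precedence over
# a key that came from the YAML/JSON config file.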
for provider_name, env_var in provider_key_map.items():
provider_conf = getattr(loaded_config.providers, provider_name, None)
if provider_conf:
api_key_from_env = decouple_config.get(env_var, default=None)
if api_key_from_env:
if provider_conf.api_key and provider_conf.api_key != api_key_from_env:
config_logger.debug(f"Overriding API key for {provider_name} from env/'.env'.")
elif not provider_conf.api_key:
config_logger.debug(f"Setting API key for {provider_name} from env/'.env'.")
provider_conf.api_key = api_key_from_env
try:
# Use the default defined in GatewayConfig as the fallback if env/file doesn't specify
loaded_config.default_provider = decouple_config('DEFAULT_PROVIDER', default=loaded_config.default_provider)
config_logger.debug(f"Set default provider: {loaded_config.default_provider}")
except Exception as e:
config_logger.warning(f"Could not load default provider from env: {e}. Using default '{loaded_config.default_provider}'.")
# --- Load other Provider settings (base_url, default_model, org, specific headers) ---
# Example for OpenRouter specific headers
openrouter_conf = loaded_config.providers.openrouter
try:
# Use get() to avoid UndefinedValueError if not set
http_referer = decouple_config.get('OPENROUTER_HTTP_REFERER', default=None)
x_title = decouple_config.get('OPENROUTER_X_TITLE', default=None)
if http_referer:
openrouter_conf.additional_params['http_referer'] = http_referer
config_logger.debug("Setting OpenRouter http_referer from env/'.env'.")
if x_title:
openrouter_conf.additional_params['x_title'] = x_title
config_logger.debug("Setting OpenRouter x_title from env/'.env'.")
except Exception as e: # Catch potential decouple issues
config_logger.warning(f"Could not load optional OpenRouter headers from env: {e}")
# --- Load Ollama Provider Settings ---
ollama_conf = loaded_config.providers.ollama
try:
enabled_env = decouple_config.get('OLLAMA_ENABLED', default=None)
if enabled_env is not None:
ollama_conf.enabled = enabled_env.lower() == 'true'
config_logger.debug(f"Setting Ollama enabled from env/'.env': {ollama_conf.enabled}")
api_url_env = decouple_config.get('OLLAMA_API_URL', default=None)
if api_url_env:
ollama_conf.base_url = api_url_env
config_logger.debug(f"Setting Ollama base_url from env/'.env': {ollama_conf.base_url}")
default_model_env = decouple_config.get('OLLAMA_DEFAULT_MODEL', default=None)
if default_model_env:
ollama_conf.default_model = default_model_env
config_logger.debug(f"Setting Ollama default_model from env/'.env': {ollama_conf.default_model}")
request_timeout_env = decouple_config.get('OLLAMA_REQUEST_TIMEOUT', default=None)
if request_timeout_env is not None:
ollama_conf.timeout = int(request_timeout_env)
config_logger.debug(f"Setting Ollama timeout from env/'.env': {ollama_conf.timeout}")
except Exception as e:
config_logger.warning(f"Could not load optional Ollama settings from env: {e}")
# Example for generic provider settings like base_url, default_model, organization
for provider_name in ["openai", "anthropic", "deepseek", "gemini", "openrouter", "grok", "ollama"]:
provider_conf = getattr(loaded_config.providers, provider_name, None)
if provider_conf:
p_name_upper = provider_name.upper()
try:
base_url_env = decouple_config.get(f"{p_name_upper}_BASE_URL", default=None)
if base_url_env:
provider_conf.base_url = base_url_env
config_logger.debug(f"Setting {provider_name} base_url from env/'.env'.")
default_model_env = decouple_config.get(f"{p_name_upper}_DEFAULT_MODEL", default=None)
if default_model_env:
provider_conf.default_model = default_model_env
config_logger.debug(f"Setting {provider_name} default_model from env/'.env'.")
org_env = decouple_config.get(f"{p_name_upper}_ORGANIZATION", default=None)
if org_env:
provider_conf.organization = org_env
config_logger.debug(f"Setting {provider_name} organization from env/'.env'.")
except Exception as e:
config_logger.warning(f"Could not load optional settings for provider {provider_name} from env: {e}")
# --- Load Server Port ---
try:
server_port_env = decouple_config.get('GATEWAY_SERVER_PORT', default=None)
if server_port_env is not None:
loaded_config.server.port = decouple_config('GATEWAY_SERVER_PORT', cast=int)
config_logger.debug(f"Overriding server port from env: {loaded_config.server.port}")
except (ValueError, UndefinedValueError) as e:
config_logger.warning(f"Invalid or missing GATEWAY_SERVER_PORT env var: {e}. Using default/file value.")
# --- Load Filesystem Allowed Directories ---
allowed_dirs_env_var = "FILESYSTEM__ALLOWED_DIRECTORIES"
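# The env var must hold a JSON list of directory paths, e.g.
#   FILESYSTEM__ALLOWED_DIRECTORIES='["~/documents", "/srv/shared"]'
# Values that do not parse as a JSON list are logged and ignored below.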
try:
allowed_dirs_env_value_str = decouple_config.get(allowed_dirs_env_var, default=None)
if allowed_dirs_env_value_str is not None:
try:
allowed_dirs_from_env = json.loads(allowed_dirs_env_value_str)
if isinstance(allowed_dirs_from_env, list):
if loaded_config.filesystem.allowed_directories:
config_logger.debug(f"Overriding filesystem.allowed_directories from env var {allowed_dirs_env_var}.")
else:
config_logger.debug(f"Setting filesystem.allowed_directories from env var {allowed_dirs_env_var}.")
loaded_config.filesystem.allowed_directories = allowed_dirs_from_env
else:
config_logger.warning(f"Env var {allowed_dirs_env_var} did not contain a valid JSON list. Value ignored.")
except json.JSONDecodeError:
config_logger.warning(f"Failed to parse JSON from env var {allowed_dirs_env_var}. Value: '{allowed_dirs_env_value_str}'. Ignoring env var.")
except Exception as e:
config_logger.error(f"Error processing env var {allowed_dirs_env_var}: {e}", exc_info=True)
# --- Load Agent Memory Settings ---
agent_mem_conf = loaded_config.agent_memory # Get the config object
try:
agent_mem_conf.db_path = decouple_config('AGENT_MEMORY_DB_PATH', default=agent_mem_conf.db_path)
agent_mem_conf.max_text_length = decouple_config('AGENT_MEMORY_MAX_TEXT_LENGTH', default=agent_mem_conf.max_text_length, cast=int)
agent_mem_conf.connection_timeout = decouple_config('AGENT_MEMORY_CONNECTION_TIMEOUT', default=agent_mem_conf.connection_timeout, cast=float)
agent_mem_conf.max_working_memory_size = decouple_config('AGENT_MEMORY_MAX_WORKING_SIZE', default=agent_mem_conf.max_working_memory_size, cast=int)
# Load TTLs
agent_mem_conf.ttl_working = decouple_config('AGENT_MEMORY_TTL_WORKING', default=agent_mem_conf.ttl_working, cast=int)
agent_mem_conf.ttl_episodic = decouple_config('AGENT_MEMORY_TTL_EPISODIC', default=agent_mem_conf.ttl_episodic, cast=int)
agent_mem_conf.ttl_semantic = decouple_config('AGENT_MEMORY_TTL_SEMANTIC', default=agent_mem_conf.ttl_semantic, cast=int)
agent_mem_conf.ttl_procedural = decouple_config('AGENT_MEMORY_TTL_PROCEDURAL', default=agent_mem_conf.ttl_procedural, cast=int)
# Load other parameters
agent_mem_conf.memory_decay_rate = decouple_config('AGENT_MEMORY_DECAY_RATE', default=agent_mem_conf.memory_decay_rate, cast=float)
agent_mem_conf.importance_boost_factor = decouple_config('AGENT_MEMORY_IMPORTANCE_BOOST', default=agent_mem_conf.importance_boost_factor, cast=float)
agent_mem_conf.similarity_threshold = decouple_config('AGENT_MEMORY_SIMILARITY_THRESHOLD', default=agent_mem_conf.similarity_threshold, cast=float)
agent_mem_conf.max_semantic_candidates = decouple_config('AGENT_MEMORY_MAX_SEMANTIC_CANDIDATES', default=agent_mem_conf.max_semantic_candidates, cast=int)
# Load embedding defaults (mainly for reference)
agent_mem_conf.default_embedding_model = decouple_config('AGENT_MEMORY_DEFAULT_EMBEDDING_MODEL', default=agent_mem_conf.default_embedding_model)
agent_mem_conf.embedding_dimension = decouple_config('AGENT_MEMORY_EMBEDDING_DIMENSION', default=agent_mem_conf.embedding_dimension, cast=int)
# Load multi-tool support settings
def _cast_bool(value):
if isinstance(value, bool):
return value
if isinstance(value, str):
return value.lower() in ('true', '1', 'yes', 'on')
return bool(value)
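# For string inputs, _cast_bool treats only "true", "1", "yes" and "on"
# (case-insensitive) as True; non-string inputs fall back to Python's bool().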
agent_mem_conf.enable_batched_operations = decouple_config('AGENT_MEMORY_ENABLE_BATCHED_OPERATIONS', default=agent_mem_conf.enable_batched_operations, cast=_cast_bool)
agent_mem_conf.max_tools_per_batch = decouple_config('AGENT_MEMORY_MAX_TOOLS_PER_BATCH', default=agent_mem_conf.max_tools_per_batch, cast=int)
config_logger.debug("Loaded agent memory settings from env/'.env' or defaults.")
except (ValueError, UndefinedValueError) as e:
config_logger.warning(f"Issue loading agent memory settings from env: {e}. Using Pydantic defaults.")
except Exception as e:
config_logger.error(f"Unexpected error loading agent memory settings: {e}", exc_info=True)
# --- Load Prompt Templates Directory ---
try:
loaded_config.prompt_templates_directory = decouple_config('GATEWAY_PROMPT_TEMPLATES_DIR', default=loaded_config.prompt_templates_directory)
config_logger.debug(f"Set prompt templates directory: {loaded_config.prompt_templates_directory}")
except Exception as e:
config_logger.warning(f"Could not load prompt templates directory from env: {e}")
# --- Load Cache Directory ---
try:
cache_dir_env = decouple_config('GATEWAY_CACHE_DIR', default=None) # Changed env var name for clarity
if cache_dir_env:
loaded_config.cache.directory = cache_dir_env
config_logger.debug(f"Set cache directory from env: {loaded_config.cache.directory}")
except Exception as e:
config_logger.warning(f"Could not load cache directory from env: {e}")
sb_conf = loaded_config.smart_browser # Get the config object
try:
# Smart Browser state encryption key
sb_conf.sb_state_key_b64 = decouple_config('SB_STATE_KEY', default=sb_conf.sb_state_key_b64)
if sb_conf.sb_state_key_b64:
config_logger.debug("Loaded SB_STATE_KEY from env/'.env' or file.")
else:
config_logger.info("Smart Browser state encryption disabled (SB_STATE_KEY not found).")
# Other SB settings
sb_conf.sb_max_tabs = decouple_config('SB_MAX_TABS', default=sb_conf.sb_max_tabs, cast=int)
sb_conf.sb_tab_timeout = decouple_config('SB_TAB_TIMEOUT', default=sb_conf.sb_tab_timeout, cast=int)
sb_conf.sb_inactivity_timeout = decouple_config('SB_INACTIVITY_TIMEOUT', default=sb_conf.sb_inactivity_timeout, cast=int)
sb_conf.headless_mode = decouple_config('SB_HEADLESS_MODE', default=sb_conf.headless_mode, cast=bool) # Use SB_ prefix
sb_conf.vnc_enabled = decouple_config('SB_VNC_ENABLED', default=sb_conf.vnc_enabled, cast=bool) # Use SB_ prefix
sb_conf.vnc_password = decouple_config('SB_VNC_PASSWORD', default=sb_conf.vnc_password) # Use SB_ prefix
sb_conf.proxy_pool_str = decouple_config('SB_PROXY_POOL', default=sb_conf.proxy_pool_str) # Use SB_ prefix
sb_conf.proxy_allowed_domains_str = decouple_config('SB_PROXY_ALLOWED_DOMAINS', default=sb_conf.proxy_allowed_domains_str) # Use SB_ prefix
sb_conf.vault_allowed_paths_str = decouple_config('SB_VAULT_ALLOWED_PATHS', default=sb_conf.vault_allowed_paths_str) # Use SB_ prefix
# Locator Tunables
sb_conf.max_widgets = decouple_config('SB_MAX_WIDGETS', default=sb_conf.max_widgets, cast=int)
sb_conf.max_section_chars = decouple_config('SB_MAX_SECTION_CHARS', default=sb_conf.max_section_chars, cast=int)
sb_conf.dom_fp_limit = decouple_config('SB_DOM_FP_LIMIT', default=sb_conf.dom_fp_limit, cast=int)
sb_conf.llm_model_locator = decouple_config('SB_LLM_MODEL_LOCATOR', default=sb_conf.llm_model_locator)
sb_conf.retry_after_fail = decouple_config('SB_RETRY_AFTER_FAIL', default=sb_conf.retry_after_fail, cast=int)
sb_conf.seq_cutoff = decouple_config('SB_SEQ_CUTOFF', default=sb_conf.seq_cutoff, cast=float)
sb_conf.area_min = decouple_config('SB_AREA_MIN', default=sb_conf.area_min, cast=int)
# High Risk Domains (Load as string, validator handles conversion)
high_risk_domains_env = decouple_config('SB_HIGH_RISK_DOMAINS', default=None)
if high_risk_domains_env is not None:
# Let the validator handle parsing and normalization
sb_conf.high_risk_domains_set = high_risk_domains_env # Pass the raw string
config_logger.debug("Loaded Smart Browser settings from env/'.env' or defaults.")
except (ValueError, UndefinedValueError) as e:
config_logger.warning(f"Issue loading Smart Browser settings from env: {e}. Using defaults/file values.")
except Exception as e:
config_logger.error(f"Unexpected error loading Smart Browser settings: {e}", exc_info=True)
# --- Expand paths ---
try:
# Expand core directories
loaded_config.storage_directory = expand_path(loaded_config.storage_directory)
loaded_config.log_directory = expand_path(loaded_config.log_directory)
loaded_config.prompt_templates_directory = expand_path(loaded_config.prompt_templates_directory) # Expand new dir
# Expand cache directory if set
if loaded_config.cache.directory:
loaded_config.cache.directory = expand_path(loaded_config.cache.directory)
# Expand agent memory DB path (assuming it's a relative path)
# Check if it's already absolute to avoid issues
if not os.path.isabs(loaded_config.agent_memory.db_path):
# Relative DB paths are resolved against storage_directory so the memory database lives alongside the server's other data.
db_in_storage = Path(loaded_config.storage_directory) / loaded_config.agent_memory.db_path
loaded_config.agent_memory.db_path = str(db_in_storage.resolve())
config_logger.debug(f"Expanded agent memory db path to: {loaded_config.agent_memory.db_path}")
# Expand allowed filesystem directories
expanded_allowed_dirs = []
for d in loaded_config.filesystem.allowed_directories:
if isinstance(d, str):
expanded_allowed_dirs.append(expand_path(d))
else:
config_logger.warning(f"Ignoring non-string entry in allowed_directories: {d!r}")
loaded_config.filesystem.allowed_directories = expanded_allowed_dirs
except Exception as e:
config_logger.error(f"Error expanding configured paths: {e}", exc_info=True)
# --- Ensure critical directories exist ---
try:
# Use pathlib for consistency
Path(loaded_config.storage_directory).mkdir(parents=True, exist_ok=True)
Path(loaded_config.log_directory).mkdir(parents=True, exist_ok=True)
Path(loaded_config.prompt_templates_directory).mkdir(parents=True, exist_ok=True) # Ensure prompt dir exists
if loaded_config.cache.enabled and loaded_config.cache.directory:
Path(loaded_config.cache.directory).mkdir(parents=True, exist_ok=True)
# Ensure Agent Memory DB directory exists
db_dir = Path(loaded_config.agent_memory.db_path).parent
db_dir.mkdir(parents=True, exist_ok=True)
except OSError as e:
config_logger.error(f"Failed to create necessary directories: {e}")
_config = loaded_config
config_logger.debug(f"Effective allowed directories: {loaded_config.filesystem.allowed_directories}")
config_logger.debug(f"Effective Agent Memory DB path: {loaded_config.agent_memory.db_path}")
config_logger.debug(f"Effective Prompt Templates directory: {loaded_config.prompt_templates_directory}")
return _config
def get_config() -> GatewayConfig:
"""
Retrieve the globally cached configuration instance or load a new one if needed.
This function serves as the primary entry point for accessing the server's configuration
throughout the application. It implements a singleton pattern with on-demand loading and
optional forced reloading to ensure consistent configuration access with minimal overhead.
Key behaviors:
- CACHING: Returns a previously loaded configuration instance when available
- LAZY LOADING: Loads configuration on first access rather than at import time
- FORCE RELOAD: Supports reloading via the GATEWAY_FORCE_CONFIG_RELOAD environment variable
- COMPLETE: Includes settings from environment variables, config files, and defaults
- VALIDATED: Uses Pydantic models to ensure all configuration values are valid
The configuration loading follows this priority order:
1. Environment variables (highest priority)
2. .env file values
3. Configuration file settings
4. Pydantic default values (lowest priority)
Returns:
GatewayConfig: The validated configuration instance with all settings applied.
Raises:
RuntimeError: If configuration loading fails for any reason (invalid settings,
missing required values, inaccessible files, etc.)
Example usage:
```python
from ultimate_mcp_server.config import get_config
# Access server configuration
config = get_config()
server_port = config.server.port
# Access provider API keys
openai_api_key = config.providers.openai.api_key
# Check if a feature is enabled
if config.cache.enabled:
# Use caching functionality
pass
```
"""
global _config
# Use decouple directly here for the reload flag check
force_reload = decouple_config.get("GATEWAY_FORCE_CONFIG_RELOAD", default='false').lower() == 'true'
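# Setting GATEWAY_FORCE_CONFIG_RELOAD=true in the environment (or .env) makes the
# next get_config() call rebuild the cached configuration via load_config().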
if _config is None or force_reload:
try:
_config = load_config() # load_config now handles internal state update
except Exception as e:
config_logger.critical(f"Failed to load configuration: {e}", exc_info=True)
raise RuntimeError("Configuration could not be loaded.") from e
if _config is None: # Should not happen if load_config succeeded or raised
raise RuntimeError("Configuration is None after loading attempt.")
return _config
def get_config_as_dict() -> Dict[str, Any]:
"""
Convert the current configuration to a plain Python dictionary.
This function retrieves the current configuration using get_config() and
converts the Pydantic model instance to a standard Python dictionary. This is
useful for situations where you need a serializable representation of the
configuration, such as:
- Sending configuration over an API
- Logging configuration values
- Debugging configuration state
- Comparing configurations
The conversion preserves the full nested structure of the configuration,
with all Pydantic models converted to their dictionary representations.
Returns:
A nested dictionary containing all configuration values
Raises:
Any exceptions that might be raised by get_config()
Example:
```python
# Get dictionary representation of config for logging
config_dict = get_config_as_dict()
logger.debug(f"Current server configuration: {config_dict['server']}")
# Use with JSON serialization
import json
config_json = json.dumps(get_config_as_dict())
```
"""
config_obj = get_config()
return config_obj.model_dump()
```
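For orientation, here is a minimal sketch of the precedence that `load_config` implements (environment variables over config file over Pydantic defaults). The file name `my_gateway.yaml` and both port numbers are hypothetical, and the sketch assumes `GATEWAY_SERVER_PORT` is not already set in your environment or `.env`.

```python
# Hypothetical sketch of load_config()'s precedence; names and values are illustrative.
import os

from ultimate_mcp_server.config import load_config

# A minimal YAML config that only sets the server port; everything else
# falls back to the Pydantic defaults in GatewayConfig.
with open("my_gateway.yaml", "w", encoding="utf-8") as f:
    f.write("server:\n  port: 8100\n")

cfg = load_config(config_file_path="my_gateway.yaml")
print(cfg.server.port)  # 8100 -- taken from the file

# Environment variables read via decouple win over file values.
os.environ["GATEWAY_SERVER_PORT"] = "9000"
cfg = load_config(config_file_path="my_gateway.yaml")
print(cfg.server.port)  # 9000 -- overridden from the environment
```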
--------------------------------------------------------------------------------
/ultimate_mcp_server/services/analytics/reporting.py:
--------------------------------------------------------------------------------
```python
"""Reporting and visualization for Ultimate MCP Server analytics."""
import json
import os
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from ultimate_mcp_server.services.analytics.metrics import get_metrics_tracker
from ultimate_mcp_server.utils import get_logger
logger = get_logger(__name__)
try:
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import pandas as pd
PLOTTING_AVAILABLE = True
except ImportError:
PLOTTING_AVAILABLE = False
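# When matplotlib/pandas are not installed, reports are still generated: every
# plotting helper below returns an empty list and do_plots is forced to False.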
class AnalyticsReporting:
"""Provides comprehensive reporting and visualization capabilities for Ultimate MCP Server analytics.
This class offers tools to generate detailed usage, cost, and provider-specific reports
in various formats (JSON, HTML, Markdown) with optional data visualizations. It serves
as the primary interface for extracting actionable insights from the server's operational
metrics and presenting them in human-readable formats.
Features:
- Multiple report types: usage reports, provider-specific analysis, and cost breakdowns
- Multiple output formats: JSON, HTML, and Markdown
- Optional data visualizations using matplotlib (when available)
- Customizable reporting periods
- Persistent report storage
The reporting system uses the metrics tracked by the MetricsTracker to generate these
reports, providing insights into token usage, costs, request patterns, cache efficiency,
and provider/model distribution.
Usage:
# Create a reporting instance
reporter = AnalyticsReporting()
# Generate a usage report for the last 7 days
report_path = reporter.generate_usage_report(days=7, output_format="html")
# Generate a cost analysis report for the last month
cost_report = reporter.generate_cost_report(days=30, output_format="json")
# Generate a provider-specific report
provider_report = reporter.generate_provider_report(
provider="anthropic",
days=14,
output_format="markdown"
)
"""
def __init__(
self,
reports_dir: Optional[Union[str, Path]] = None,
include_plots: bool = True
):
"""Initialize the analytics reporting.
Args:
reports_dir: Directory for reports storage
include_plots: Whether to include plots in reports
"""
# Set reports directory
if reports_dir:
self.reports_dir = Path(reports_dir)
else:
self.reports_dir = Path.home() / ".ultimate" / "reports"
# Create reports directory if it doesn't exist
self.reports_dir.mkdir(parents=True, exist_ok=True)
# Plotting settings
self.include_plots = include_plots and PLOTTING_AVAILABLE
# Get metrics tracker
self.metrics = get_metrics_tracker()
logger.info(
f"Analytics reporting initialized (dir: {self.reports_dir}, plots: {self.include_plots})",
emoji_key="analytics"
)
def generate_usage_report(
self,
days: int = 7,
output_format: str = "json",
include_plots: Optional[bool] = None
) -> Union[Dict[str, Any], str, Path]:
"""Generate a usage report.
Args:
days: Number of days to include in the report
output_format: Output format (json, html, markdown)
include_plots: Whether to include plots (overrides default setting)
Returns:
Report data or path to report file
"""
# Get metrics
metrics = self.metrics.get_stats()
# Determine plotting
do_plots = self.include_plots if include_plots is None else include_plots
do_plots = do_plots and PLOTTING_AVAILABLE
# Build report data
report_data = {
"generated_at": datetime.now().isoformat(),
"period": f"{days} days",
"general": metrics["general"],
"cache": metrics["cache"],
"top_providers": metrics["top_providers"],
"top_models": metrics["top_models"],
"daily_usage": [
day for day in metrics["daily_usage"]
if (datetime.now() - datetime.strptime(day["date"], "%Y-%m-%d")).days < days
],
}
# Generate report based on format
if output_format == "json":
# JSON format
report_path = self.reports_dir / f"usage_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(report_path, "w") as f:
json.dump(report_data, f, indent=2)
logger.info(
f"Generated JSON usage report: {report_path}",
emoji_key="analytics"
)
return report_path
elif output_format == "html":
# HTML format (with optional plots)
report_path = self.reports_dir / f"usage_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html"
# Generate plots if requested
plot_paths = []
if do_plots:
plot_paths = self._generate_report_plots(report_data, days)
# Generate HTML
html = self._generate_html_report(report_data, plot_paths)
with open(report_path, "w") as f:
f.write(html)
logger.info(
f"Generated HTML usage report: {report_path}",
emoji_key="analytics"
)
return report_path
elif output_format == "markdown":
# Markdown format
report_path = self.reports_dir / f"usage_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md"
# Generate plots if requested
plot_paths = []
if do_plots:
plot_paths = self._generate_report_plots(report_data, days)
# Generate Markdown
markdown = self._generate_markdown_report(report_data, plot_paths)
with open(report_path, "w") as f:
f.write(markdown)
logger.info(
f"Generated Markdown usage report: {report_path}",
emoji_key="analytics"
)
return report_path
else:
# Default to raw data
logger.warning(
f"Unknown output format: {output_format}, returning raw data",
emoji_key="warning"
)
return report_data
def generate_provider_report(
self,
provider: str,
days: int = 7,
output_format: str = "json",
include_plots: Optional[bool] = None
) -> Union[Dict[str, Any], str, Path]:
"""Generate a provider-specific report.
Args:
provider: Provider name
days: Number of days to include in the report
output_format: Output format (json, html, markdown)
include_plots: Whether to include plots (overrides default setting)
Returns:
Report data or path to report file
"""
# Get metrics
metrics = self.metrics.get_stats()
# Check if provider exists
if provider not in metrics["providers"]:
logger.error(
f"Unknown provider: {provider}",
emoji_key="error"
)
return {"error": f"Unknown provider: {provider}"}
# Determine plotting
do_plots = self.include_plots if include_plots is None else include_plots
do_plots = do_plots and PLOTTING_AVAILABLE
# Extract provider-specific data
provider_data = metrics["providers"][provider]
provider_models = {
model: data
for model, data in metrics["models"].items()
if model.startswith(provider) or model.lower().startswith(provider.lower())
}
# Collect daily usage for this provider (approximate)
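# Daily totals are only tracked globally, so the per-provider series is estimated
# by applying the provider's overall token share to each day's tokens and cost.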
provider_share = provider_data["tokens"] / metrics["general"]["tokens_total"] if metrics["general"]["tokens_total"] > 0 else 0
provider_daily = [
{
"date": day["date"],
"tokens": int(day["tokens"] * provider_share), # Approximate
"cost": day["cost"] * provider_share, # Approximate
}
for day in metrics["daily_usage"]
if (datetime.now() - datetime.strptime(day["date"], "%Y-%m-%d")).days < days
]
# Build report data
report_data = {
"generated_at": datetime.now().isoformat(),
"period": f"{days} days",
"provider": provider,
"stats": provider_data,
"models": provider_models,
"daily_usage": provider_daily,
"percentage_of_total": provider_share * 100,
}
# Generate report based on format
if output_format == "json":
# JSON format
report_path = self.reports_dir / f"{provider}_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(report_path, "w") as f:
json.dump(report_data, f, indent=2)
logger.info(
f"Generated JSON provider report: {report_path}",
emoji_key="analytics"
)
return report_path
elif output_format == "html":
# HTML format (with optional plots)
report_path = self.reports_dir / f"{provider}_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html"
# Generate plots if requested
plot_paths = []
if do_plots:
plot_paths = self._generate_provider_plots(report_data, provider, days)
# Generate HTML
html = self._generate_html_provider_report(report_data, plot_paths)
with open(report_path, "w") as f:
f.write(html)
logger.info(
f"Generated HTML provider report: {report_path}",
emoji_key="analytics"
)
return report_path
elif output_format == "markdown":
# Markdown format
report_path = self.reports_dir / f"{provider}_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md"
# Generate plots if requested
plot_paths = []
if do_plots:
plot_paths = self._generate_provider_plots(report_data, provider, days)
# Generate Markdown
markdown = self._generate_markdown_provider_report(report_data, plot_paths)
with open(report_path, "w") as f:
f.write(markdown)
logger.info(
f"Generated Markdown provider report: {report_path}",
emoji_key="analytics"
)
return report_path
else:
# Default to raw data
logger.warning(
f"Unknown output format: {output_format}, returning raw data",
emoji_key="warning"
)
return report_data
def generate_cost_report(
self,
days: int = 30,
output_format: str = "json",
include_plots: Optional[bool] = None
) -> Union[Dict[str, Any], str, Path]:
"""Generate a cost analysis report.
Args:
days: Number of days to include in the report
output_format: Output format (json, html, markdown)
include_plots: Whether to include plots (overrides default setting)
Returns:
Report data or path to report file
"""
# Get metrics
metrics = self.metrics.get_stats()
# Determine plotting
do_plots = self.include_plots if include_plots is None else include_plots
do_plots = do_plots and PLOTTING_AVAILABLE
# Process daily cost data
daily_costs = [
{
"date": day["date"],
"cost": day["cost"],
}
for day in metrics["daily_usage"]
if (datetime.now() - datetime.strptime(day["date"], "%Y-%m-%d")).days < days
]
# Calculate cost by provider
provider_costs = [
{
"provider": provider,
"cost": data["cost"],
"percentage": data["cost"] / metrics["general"]["cost_total"] * 100 if metrics["general"]["cost_total"] > 0 else 0,
}
for provider, data in metrics["providers"].items()
]
provider_costs.sort(key=lambda x: x["cost"], reverse=True)
# Calculate cost by model
model_costs = [
{
"model": model,
"cost": data["cost"],
"percentage": data["cost"] / metrics["general"]["cost_total"] * 100 if metrics["general"]["cost_total"] > 0 else 0,
}
for model, data in metrics["models"].items()
]
model_costs.sort(key=lambda x: x["cost"], reverse=True)
# Calculate cost efficiency (tokens per dollar)
cost_efficiency = [
{
"model": model,
"tokens_per_dollar": data["tokens"] / data["cost"] if data["cost"] > 0 else 0,
"tokens": data["tokens"],
"cost": data["cost"],
}
for model, data in metrics["models"].items()
if data["cost"] > 0
]
cost_efficiency.sort(key=lambda x: x["tokens_per_dollar"], reverse=True)
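# Most cost-efficient models (highest tokens per dollar) come first; models with
# zero recorded cost are excluded above to avoid division by zero.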
# Build report data
report_data = {
"generated_at": datetime.now().isoformat(),
"period": f"{days} days",
"total_cost": metrics["general"]["cost_total"],
"cache_savings": metrics["cache"]["saved_cost"],
"daily_costs": daily_costs,
"provider_costs": provider_costs,
"model_costs": model_costs,
"cost_efficiency": cost_efficiency,
}
# Generate report based on format
if output_format == "json":
# JSON format
report_path = self.reports_dir / f"cost_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(report_path, "w") as f:
json.dump(report_data, f, indent=2)
logger.info(
f"Generated JSON cost report: {report_path}",
emoji_key="analytics"
)
return report_path
elif output_format == "html":
# HTML format (with optional plots)
report_path = self.reports_dir / f"cost_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html"
# Generate plots if requested
plot_paths = []
if do_plots:
plot_paths = self._generate_cost_plots(report_data, days)
# Generate HTML
html = self._generate_html_cost_report(report_data, plot_paths)
with open(report_path, "w") as f:
f.write(html)
logger.info(
f"Generated HTML cost report: {report_path}",
emoji_key="analytics"
)
return report_path
elif output_format == "markdown":
# Markdown format
report_path = self.reports_dir / f"cost_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md"
# Generate plots if requested
plot_paths = []
if do_plots:
plot_paths = self._generate_cost_plots(report_data, days)
# Generate Markdown
markdown = self._generate_markdown_cost_report(report_data, plot_paths)
with open(report_path, "w") as f:
f.write(markdown)
logger.info(
f"Generated Markdown cost report: {report_path}",
emoji_key="analytics"
)
return report_path
else:
# Default to raw data
logger.warning(
f"Unknown output format: {output_format}, returning raw data",
emoji_key="warning"
)
return report_data
def _generate_report_plots(
self,
report_data: Dict[str, Any],
days: int
) -> List[str]:
"""Generate plots for a usage report.
Args:
report_data: Report data
days: Number of days to include
Returns:
List of plot file paths
"""
if not PLOTTING_AVAILABLE:
return []
plot_paths = []
# Create the plot directory up front; the provider and model plots below need it
# even when there is no daily usage data.
plot_dir = self.reports_dir / "plots"
plot_dir.mkdir(exist_ok=True)
# Create daily usage plot
if report_data["daily_usage"]:
try:
# Prepare data
df = pd.DataFrame(report_data["daily_usage"])
df["date"] = pd.to_datetime(df["date"])
df = df.sort_values("date")
# Create plot
plt.figure(figsize=(10, 6))
plt.plot(df["date"], df["tokens"], marker="o", linestyle="-", linewidth=2)
plt.title(f"Daily Token Usage (Last {days} Days)")
plt.xlabel("Date")
plt.ylabel("Tokens")
plt.grid(True, alpha=0.3)
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d"))
plt.gca().xaxis.set_major_locator(mdates.DayLocator(interval=max(1, days // 7)))
plt.xticks(rotation=45)
plt.tight_layout()
# Save plot
plot_path = str(plot_dir / f"daily_usage_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")
plt.savefig(plot_path)
plt.close()
plot_paths.append(plot_path)
except Exception as e:
logger.error(
f"Failed to generate daily usage plot: {str(e)}",
emoji_key="error"
)
# Create provider distribution plot
if report_data["top_providers"]:
try:
# Prepare data
providers = [p["provider"] for p in report_data["top_providers"]]
percentages = [p["percentage"] * 100 for p in report_data["top_providers"]]
# Create plot
plt.figure(figsize=(8, 8))
plt.pie(percentages, labels=providers, autopct="%1.1f%%", startangle=90, shadow=True)
plt.axis("equal")
plt.title("Token Usage by Provider")
plt.tight_layout()
# Save plot
plot_path = str(plot_dir / f"provider_distribution_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")
plt.savefig(plot_path)
plt.close()
plot_paths.append(plot_path)
except Exception as e:
logger.error(
f"Failed to generate provider distribution plot: {str(e)}",
emoji_key="error"
)
# Create model distribution plot
if report_data["top_models"]:
try:
# Prepare data
models = [m["model"] for m in report_data["top_models"]]
percentages = [m["percentage"] * 100 for m in report_data["top_models"]]
# Create plot
plt.figure(figsize=(10, 6))
plt.bar(models, percentages)
plt.title("Token Usage by Model")
plt.xlabel("Model")
plt.ylabel("Percentage of Total Tokens")
plt.xticks(rotation=45, ha="right")
plt.grid(True, alpha=0.3, axis="y")
plt.tight_layout()
# Save plot
plot_path = str(plot_dir / f"model_distribution_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")
plt.savefig(plot_path)
plt.close()
plot_paths.append(plot_path)
except Exception as e:
logger.error(
f"Failed to generate model distribution plot: {str(e)}",
emoji_key="error"
)
return plot_paths
def _generate_provider_plots(
self,
report_data: Dict[str, Any],
provider: str,
days: int
) -> List[str]:
"""Generate plots for a provider report.
Args:
report_data: Report data
provider: Provider name
days: Number of days to include
Returns:
List of plot file paths
"""
if not PLOTTING_AVAILABLE:
return []
plot_paths = []
# Create plot directory
plot_dir = self.reports_dir / "plots"
plot_dir.mkdir(exist_ok=True)
# Create daily usage plot
if report_data["daily_usage"]:
try:
# Prepare data
df = pd.DataFrame(report_data["daily_usage"])
df["date"] = pd.to_datetime(df["date"])
df = df.sort_values("date")
# Create plot
plt.figure(figsize=(10, 6))
plt.plot(df["date"], df["tokens"], marker="o", linestyle="-", linewidth=2)
plt.title(f"{provider} Daily Token Usage (Last {days} Days)")
plt.xlabel("Date")
plt.ylabel("Tokens")
plt.grid(True, alpha=0.3)
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d"))
plt.gca().xaxis.set_major_locator(mdates.DayLocator(interval=max(1, days // 7)))
plt.xticks(rotation=45)
plt.tight_layout()
# Save plot
plot_path = str(plot_dir / f"{provider}_daily_usage_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")
plt.savefig(plot_path)
plt.close()
plot_paths.append(plot_path)
except Exception as e:
logger.error(
f"Failed to generate provider daily usage plot: {str(e)}",
emoji_key="error"
)
# Create model distribution plot
if report_data["models"]:
try:
# Prepare data
models = list(report_data["models"].keys())
tokens = [data["tokens"] for _, data in report_data["models"].items()]
# Create plot
plt.figure(figsize=(10, 6))
plt.bar(models, tokens)
plt.title(f"{provider} Token Usage by Model")
plt.xlabel("Model")
plt.ylabel("Tokens")
plt.xticks(rotation=45, ha="right")
plt.grid(True, alpha=0.3, axis="y")
plt.tight_layout()
# Save plot
plot_path = str(plot_dir / f"{provider}_model_distribution_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")
plt.savefig(plot_path)
plt.close()
plot_paths.append(plot_path)
except Exception as e:
logger.error(
f"Failed to generate provider model distribution plot: {str(e)}",
emoji_key="error"
)
return plot_paths
def _generate_cost_plots(
self,
report_data: Dict[str, Any],
days: int
) -> List[str]:
"""Generate plots for a cost report.
Args:
report_data: Report data
days: Number of days to include
Returns:
List of plot file paths
"""
if not PLOTTING_AVAILABLE:
return []
plot_paths = []
# Create plot directory
plot_dir = self.reports_dir / "plots"
plot_dir.mkdir(exist_ok=True)
# Create daily cost plot
if report_data["daily_costs"]:
try:
# Prepare data
df = pd.DataFrame(report_data["daily_costs"])
df["date"] = pd.to_datetime(df["date"])
df = df.sort_values("date")
# Create plot
plt.figure(figsize=(10, 6))
plt.plot(df["date"], df["cost"], marker="o", linestyle="-", linewidth=2)
plt.title(f"Daily Cost (Last {days} Days)")
plt.xlabel("Date")
plt.ylabel("Cost ($)")
plt.grid(True, alpha=0.3)
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d"))
plt.gca().xaxis.set_major_locator(mdates.DayLocator(interval=max(1, days // 7)))
plt.xticks(rotation=45)
plt.tight_layout()
# Save plot
plot_path = str(plot_dir / f"daily_cost_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")
plt.savefig(plot_path)
plt.close()
plot_paths.append(plot_path)
except Exception as e:
logger.error(
f"Failed to generate daily cost plot: {str(e)}",
emoji_key="error"
)
# Create provider cost distribution plot
if report_data["provider_costs"]:
try:
# Prepare data
providers = [p["provider"] for p in report_data["provider_costs"]]
costs = [p["cost"] for p in report_data["provider_costs"]]
# Create plot
plt.figure(figsize=(8, 8))
plt.pie(costs, labels=providers, autopct="%1.1f%%", startangle=90, shadow=True)
plt.axis("equal")
plt.title("Cost by Provider")
plt.tight_layout()
# Save plot
plot_path = str(plot_dir / f"provider_cost_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")
plt.savefig(plot_path)
plt.close()
plot_paths.append(plot_path)
except Exception as e:
logger.error(
f"Failed to generate provider cost distribution plot: {str(e)}",
emoji_key="error"
)
# Create cost efficiency plot
if report_data["cost_efficiency"]:
try:
# Prepare data (limit to top 10 for readability)
top_efficient = report_data["cost_efficiency"][:10]
models = [m["model"] for m in top_efficient]
efficiency = [m["tokens_per_dollar"] for m in top_efficient]
# Create plot
plt.figure(figsize=(10, 6))
plt.bar(models, efficiency)
plt.title("Cost Efficiency (Tokens per Dollar)")
plt.xlabel("Model")
plt.ylabel("Tokens per Dollar")
plt.xticks(rotation=45, ha="right")
plt.grid(True, alpha=0.3, axis="y")
plt.tight_layout()
# Save plot
plot_path = str(plot_dir / f"cost_efficiency_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")
plt.savefig(plot_path)
plt.close()
plot_paths.append(plot_path)
except Exception as e:
logger.error(
f"Failed to generate cost efficiency plot: {str(e)}",
emoji_key="error"
)
return plot_paths
def _generate_html_report(self, report_data: Dict[str, Any], plot_paths: List[str]) -> str:
"""Generate an HTML usage report.
Args:
report_data: Report data
plot_paths: List of plot file paths
Returns:
HTML report content
"""
# Basic HTML template
html = f"""<!DOCTYPE html>
<html>
<head>
<title>Ultimate MCP Server Usage Report</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
h1, h2, h3 {{ color: #333; }}
.container {{ max-width: 1200px; margin: 0 auto; }}
.card {{ background: #f9f9f9; border-radius: 5px; padding: 20px; margin-bottom: 20px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }}
table {{ width: 100%; border-collapse: collapse; }}
th, td {{ padding: 8px; text-align: left; border-bottom: 1px solid #ddd; }}
th {{ background-color: #f2f2f2; }}
.stat {{ font-size: 24px; font-weight: bold; color: #0066cc; }}
.plot {{ max-width: 100%; height: auto; margin: 20px 0; }}
</style>
</head>
<body>
<div class="container">
<h1>Ultimate MCP Server Usage Report</h1>
<p>Generated at: {report_data["generated_at"]}</p>
<p>Period: {report_data["period"]}</p>
<div class="card">
<h2>General Statistics</h2>
<table>
<tr>
<td>Uptime</td>
<td class="stat">{report_data["general"]["uptime_human"]}</td>
</tr>
<tr>
<td>Total Requests</td>
<td class="stat">{report_data["general"]["requests_total"]:,}</td>
</tr>
<tr>
<td>Total Tokens</td>
<td class="stat">{report_data["general"]["tokens_total"]:,}</td>
</tr>
<tr>
<td>Total Cost</td>
<td class="stat">${report_data["general"]["cost_total"]:.2f}</td>
</tr>
<tr>
<td>Average Response Time</td>
<td class="stat">{report_data["general"]["avg_response_time"]:.3f}s</td>
</tr>
<tr>
<td>Total Errors</td>
<td class="stat">{report_data["general"]["errors_total"]}</td>
</tr>
<tr>
<td>Error Rate</td>
<td class="stat">{report_data["general"]["error_rate"]*100:.2f}%</td>
</tr>
</table>
</div>
<div class="card">
<h2>Cache Statistics</h2>
<table>
<tr>
<td>Cache Hits</td>
<td class="stat">{report_data["cache"]["hits"]:,}</td>
</tr>
<tr>
<td>Cache Misses</td>
<td class="stat">{report_data["cache"]["misses"]:,}</td>
</tr>
<tr>
<td>Hit Ratio</td>
<td class="stat">{report_data["cache"]["hit_ratio"]*100:.2f}%</td>
</tr>
<tr>
<td>Cost Savings</td>
<td class="stat">${report_data["cache"]["saved_cost"]:.2f}</td>
</tr>
</table>
</div>
"""
# Add plots if available
if plot_paths:
html += """
<div class="card">
<h2>Usage Visualizations</h2>
"""
for plot_path in plot_paths:
# Use relative path
rel_path = os.path.relpath(plot_path, self.reports_dir)
html += f"""
<img class="plot" src="{rel_path}" alt="Usage Plot">
"""
html += """
</div>
"""
# Add top providers
if report_data["top_providers"]:
html += """
<div class="card">
<h2>Top Providers</h2>
<table>
<tr>
<th>Provider</th>
<th>Tokens</th>
<th>Percentage</th>
</tr>
"""
for provider in report_data["top_providers"]:
html += f"""
<tr>
<td>{provider["provider"]}</td>
<td>{provider["tokens"]:,}</td>
<td>{provider["percentage"]*100:.2f}%</td>
</tr>
"""
html += """
</table>
</div>
"""
# Add top models
if report_data["top_models"]:
html += """
<div class="card">
<h2>Top Models</h2>
<table>
<tr>
<th>Model</th>
<th>Tokens</th>
<th>Percentage</th>
</tr>
"""
for model in report_data["top_models"]:
html += f"""
<tr>
<td>{model["model"]}</td>
<td>{model["tokens"]:,}</td>
<td>{model["percentage"]*100:.2f}%</td>
</tr>
"""
html += """
</table>
</div>
"""
# Add daily usage
if report_data["daily_usage"]:
html += """
<div class="card">
<h2>Daily Usage</h2>
<table>
<tr>
<th>Date</th>
<th>Tokens</th>
<th>Cost</th>
</tr>
"""
for day in sorted(report_data["daily_usage"], key=lambda x: x["date"], reverse=True):
html += f"""
<tr>
<td>{day["date"]}</td>
<td>{day["tokens"]:,}</td>
<td>${day["cost"]:.2f}</td>
</tr>
"""
html += """
</table>
</div>
"""
# Close HTML
html += """
</div>
</body>
</html>
"""
return html
def _generate_markdown_report(self, report_data: Dict[str, Any], plot_paths: List[str]) -> str:
"""Generate a Markdown usage report.
Args:
report_data: Report data
plot_paths: List of plot file paths
Returns:
Markdown report content
"""
# Basic Markdown template
markdown = f"""# Ultimate MCP Server Usage Report
Generated at: {report_data["generated_at"]}
Period: {report_data["period"]}
## General Statistics
- **Uptime:** {report_data["general"]["uptime_human"]}
- **Total Requests:** {report_data["general"]["requests_total"]:,}
- **Total Tokens:** {report_data["general"]["tokens_total"]:,}
- **Total Cost:** ${report_data["general"]["cost_total"]:.2f}
- **Average Response Time:** {report_data["general"]["avg_response_time"]:.3f}s
- **Total Errors:** {report_data["general"]["errors_total"]}
- **Error Rate:** {report_data["general"]["error_rate"]*100:.2f}%
## Cache Statistics
- **Cache Hits:** {report_data["cache"]["hits"]:,}
- **Cache Misses:** {report_data["cache"]["misses"]:,}
- **Hit Ratio:** {report_data["cache"]["hit_ratio"]*100:.2f}%
- **Cost Savings:** ${report_data["cache"]["saved_cost"]:.2f}
"""
# Add plots if available
if plot_paths:
markdown += """## Usage Visualizations
"""
for plot_path in plot_paths:
# Use relative path
rel_path = os.path.relpath(plot_path, self.reports_dir)
markdown += f"""
"""
# Add top providers
if report_data["top_providers"]:
markdown += """## Top Providers
| Provider | Tokens | Percentage |
|----------|--------|------------|
"""
for provider in report_data["top_providers"]:
markdown += f"""| {provider["provider"]} | {provider["tokens"]:,} | {provider["percentage"]*100:.2f}% |
"""
markdown += "\n"
# Add top models
if report_data["top_models"]:
markdown += """## Top Models
| Model | Tokens | Percentage |
|-------|--------|------------|
"""
for model in report_data["top_models"]:
markdown += f"""| {model["model"]} | {model["tokens"]:,} | {model["percentage"]*100:.2f}% |
"""
markdown += "\n"
# Add daily usage
if report_data["daily_usage"]:
markdown += """## Daily Usage
| Date | Tokens | Cost |
|------|--------|------|
"""
for day in sorted(report_data["daily_usage"], key=lambda x: x["date"], reverse=True):
markdown += f"""| {day["date"]} | {day["tokens"]:,} | ${day["cost"]:.2f} |
"""
return markdown
def _generate_html_provider_report(self, report_data: Dict[str, Any], plot_paths: List[str]) -> str:
"""Generate an HTML provider report.
Args:
report_data: Report data
plot_paths: List of plot file paths
Returns:
HTML report content
"""
# Basic HTML template (similar to usage report but provider-specific)
provider = report_data["provider"]
provider_stats = report_data["stats"]
html = f"""<!DOCTYPE html>
<html>
<head>
<title>{provider} Provider Report</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
h1, h2, h3 {{ color: #333; }}
.container {{ max-width: 1200px; margin: 0 auto; }}
.card {{ background: #f9f9f9; border-radius: 5px; padding: 20px; margin-bottom: 20px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }}
table {{ width: 100%; border-collapse: collapse; }}
th, td {{ padding: 8px; text-align: left; border-bottom: 1px solid #ddd; }}
th {{ background-color: #f2f2f2; }}
.stat {{ font-size: 24px; font-weight: bold; color: #0066cc; }}
.plot {{ max-width: 100%; height: auto; margin: 20px 0; }}
</style>
</head>
<body>
<div class="container">
<h1>{provider} Provider Report</h1>
<p>Generated at: {report_data["generated_at"]}</p>
<p>Period: {report_data["period"]}</p>
<div class="card">
<h2>Provider Statistics</h2>
<table>
<tr>
<td>Total Requests</td>
<td class="stat">{provider_stats["requests"]:,}</td>
</tr>
<tr>
<td>Total Tokens</td>
<td class="stat">{provider_stats["tokens"]:,}</td>
</tr>
<tr>
<td>Total Cost</td>
<td class="stat">${provider_stats["cost"]:.2f}</td>
</tr>
<tr>
<td>Average Response Time</td>
<td class="stat">{provider_stats["avg_response_time"]:.3f}s</td>
</tr>
<tr>
<td>Total Errors</td>
<td class="stat">{provider_stats["errors"]}</td>
</tr>
<tr>
<td>Percentage of Total Usage</td>
<td class="stat">{report_data["percentage_of_total"]:.2f}%</td>
</tr>
</table>
</div>
"""
# Add plots if available
if plot_paths:
html += """
<div class="card">
<h2>Usage Visualizations</h2>
"""
for plot_path in plot_paths:
# Use relative path
rel_path = os.path.relpath(plot_path, self.reports_dir)
html += f"""
<img class="plot" src="{rel_path}" alt="Provider Usage Plot">
"""
html += """
</div>
"""
# Add models
if report_data["models"]:
html += """
<div class="card">
<h2>Models</h2>
<table>
<tr>
<th>Model</th>
<th>Requests</th>
<th>Tokens</th>
<th>Cost</th>
<th>Avg Response Time</th>
</tr>
"""
for model, data in report_data["models"].items():
html += f"""
<tr>
<td>{model}</td>
<td>{data["requests"]:,}</td>
<td>{data["tokens"]:,}</td>
<td>${data["cost"]:.2f}</td>
<td>{data["avg_response_time"]:.3f}s</td>
</tr>
"""
html += """
</table>
</div>
"""
# Add daily usage
if report_data["daily_usage"]:
html += """
<div class="card">
<h2>Daily Usage</h2>
<table>
<tr>
<th>Date</th>
<th>Tokens</th>
<th>Cost</th>
</tr>
"""
for day in sorted(report_data["daily_usage"], key=lambda x: x["date"], reverse=True):
html += f"""
<tr>
<td>{day["date"]}</td>
<td>{day["tokens"]:,}</td>
<td>${day["cost"]:.2f}</td>
</tr>
"""
html += """
</table>
</div>
"""
# Close HTML
html += """
</div>
</body>
</html>
"""
return html
def _generate_markdown_provider_report(self, report_data: Dict[str, Any], plot_paths: List[str]) -> str:
"""Generate a Markdown provider report.
Args:
report_data: Report data
plot_paths: List of plot file paths
Returns:
Markdown report content
"""
# Basic Markdown template (similar to usage report but provider-specific)
provider = report_data["provider"]
provider_stats = report_data["stats"]
markdown = f"""# {provider} Provider Report
Generated at: {report_data["generated_at"]}
Period: {report_data["period"]}
## Provider Statistics
- **Total Requests:** {provider_stats["requests"]:,}
- **Total Tokens:** {provider_stats["tokens"]:,}
- **Total Cost:** ${provider_stats["cost"]:.2f}
- **Average Response Time:** {provider_stats["avg_response_time"]:.3f}s
- **Total Errors:** {provider_stats["errors"]}
- **Percentage of Total Usage:** {report_data["percentage_of_total"]:.2f}%
"""
# Add plots if available
if plot_paths:
markdown += """## Usage Visualizations
"""
for plot_path in plot_paths:
# Use relative path
rel_path = os.path.relpath(plot_path, self.reports_dir)
markdown += f"""
"""
# Add models
if report_data["models"]:
markdown += """## Models
| Model | Requests | Tokens | Cost | Avg Response Time |
|-------|----------|--------|------|-------------------|
"""
for model, data in report_data["models"].items():
markdown += f"""| {model} | {data["requests"]:,} | {data["tokens"]:,} | ${data["cost"]:.2f} | {data["avg_response_time"]:.3f}s |
"""
markdown += "\n"
# Add daily usage
if report_data["daily_usage"]:
markdown += """## Daily Usage
| Date | Tokens | Cost |
|------|--------|------|
"""
for day in sorted(report_data["daily_usage"], key=lambda x: x["date"], reverse=True):
markdown += f"""| {day["date"]} | {day["tokens"]:,} | ${day["cost"]:.2f} |
"""
return markdown
def _generate_html_cost_report(self, report_data: Dict[str, Any], plot_paths: List[str]) -> str:
"""Generate an HTML cost report.
Args:
report_data: Report data
plot_paths: List of plot file paths
Returns:
HTML report content
"""
# Basic HTML template (cost-focused)
html = f"""<!DOCTYPE html>
<html>
<head>
<title>Ultimate MCP Server Cost Report</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
h1, h2, h3 {{ color: #333; }}
.container {{ max-width: 1200px; margin: 0 auto; }}
.card {{ background: #f9f9f9; border-radius: 5px; padding: 20px; margin-bottom: 20px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }}
table {{ width: 100%; border-collapse: collapse; }}
th, td {{ padding: 8px; text-align: left; border-bottom: 1px solid #ddd; }}
th {{ background-color: #f2f2f2; }}
.stat {{ font-size: 24px; font-weight: bold; color: #0066cc; }}
.cost {{ font-size: 24px; font-weight: bold; color: #cc0000; }}
.savings {{ font-size: 24px; font-weight: bold; color: #00cc00; }}
.plot {{ max-width: 100%; height: auto; margin: 20px 0; }}
</style>
</head>
<body>
<div class="container">
<h1>Ultimate MCP Server Cost Report</h1>
<p>Generated at: {report_data["generated_at"]}</p>
<p>Period: {report_data["period"]}</p>
<div class="card">
<h2>Cost Overview</h2>
<table>
<tr>
<td>Total Cost</td>
<td class="cost">${report_data["total_cost"]:.2f}</td>
</tr>
<tr>
<td>Cache Savings</td>
<td class="savings">${report_data["cache_savings"]:.2f}</td>
</tr>
<tr>
<td>Net Cost</td>
<td class="cost">${report_data["total_cost"] - report_data["cache_savings"]:.2f}</td>
</tr>
</table>
</div>
"""
# Add plots if available
if plot_paths:
html += """
<div class="card">
<h2>Cost Visualizations</h2>
"""
for plot_path in plot_paths:
# Use relative path
rel_path = os.path.relpath(plot_path, self.reports_dir)
html += f"""
<img class="plot" src="{rel_path}" alt="Cost Plot">
"""
html += """
</div>
"""
# Add provider costs
if report_data["provider_costs"]:
html += """
<div class="card">
<h2>Cost by Provider</h2>
<table>
<tr>
<th>Provider</th>
<th>Cost</th>
<th>Percentage</th>
</tr>
"""
for provider in report_data["provider_costs"]:
html += f"""
<tr>
<td>{provider["provider"]}</td>
<td>${provider["cost"]:.2f}</td>
<td>{provider["percentage"]:.2f}%</td>
</tr>
"""
html += """
</table>
</div>
"""
# Add model costs
if report_data["model_costs"]:
html += """
<div class="card">
<h2>Cost by Model</h2>
<table>
<tr>
<th>Model</th>
<th>Cost</th>
<th>Percentage</th>
</tr>
"""
for model in report_data["model_costs"]:
html += f"""
<tr>
<td>{model["model"]}</td>
<td>${model["cost"]:.2f}</td>
<td>{model["percentage"]:.2f}%</td>
</tr>
"""
html += """
</table>
</div>
"""
# Add cost efficiency
if report_data["cost_efficiency"]:
html += """
<div class="card">
<h2>Cost Efficiency (Tokens per Dollar)</h2>
<table>
<tr>
<th>Model</th>
<th>Tokens per Dollar</th>
<th>Tokens</th>
<th>Cost</th>
</tr>
"""
for model in report_data["cost_efficiency"]:
html += f"""
<tr>
<td>{model["model"]}</td>
<td class="stat">{model["tokens_per_dollar"]:,.0f}</td>
<td>{model["tokens"]:,}</td>
<td>${model["cost"]:.2f}</td>
</tr>
"""
html += """
</table>
</div>
"""
# Add daily costs
if report_data["daily_costs"]:
html += """
<div class="card">
<h2>Daily Costs</h2>
<table>
<tr>
<th>Date</th>
<th>Cost</th>
</tr>
"""
for day in sorted(report_data["daily_costs"], key=lambda x: x["date"], reverse=True):
html += f"""
<tr>
<td>{day["date"]}</td>
<td>${day["cost"]:.2f}</td>
</tr>
"""
html += """
</table>
</div>
"""
# Close HTML
html += """
</div>
</body>
</html>
"""
return html
def _generate_markdown_cost_report(self, report_data: Dict[str, Any], plot_paths: List[str]) -> str:
"""Generate a Markdown cost report.
Args:
report_data: Report data
plot_paths: List of plot file paths
Returns:
Markdown report content
"""
# Basic Markdown template (cost-focused)
markdown = f"""# Ultimate MCP Server Cost Report
Generated at: {report_data["generated_at"]}
Period: {report_data["period"]}
## Cost Overview
- **Total Cost:** ${report_data["total_cost"]:.2f}
- **Cache Savings:** ${report_data["cache_savings"]:.2f}
- **Net Cost:** ${report_data["total_cost"] - report_data["cache_savings"]:.2f}
"""
# Add plots if available
if plot_paths:
markdown += """## Cost Visualizations
"""
for plot_path in plot_paths:
# Use relative path
rel_path = os.path.relpath(plot_path, self.reports_dir)
markdown += f"""
"""
# Add provider costs
if report_data["provider_costs"]:
markdown += """## Cost by Provider
| Provider | Cost | Percentage |
|----------|------|------------|
"""
for provider in report_data["provider_costs"]:
markdown += f"""| {provider["provider"]} | ${provider["cost"]:.2f} | {provider["percentage"]:.2f}% |
"""
markdown += "\n"
# Add model costs
if report_data["model_costs"]:
markdown += """## Cost by Model
| Model | Cost | Percentage |
|-------|------|------------|
"""
for model in report_data["model_costs"]:
markdown += f"""| {model["model"]} | ${model["cost"]:.2f} | {model["percentage"]:.2f}% |
"""
markdown += "\n"
# Add cost efficiency
if report_data["cost_efficiency"]:
markdown += """## Cost Efficiency (Tokens per Dollar)
| Model | Tokens per Dollar | Tokens | Cost |
|-------|-------------------|--------|------|
"""
for model in report_data["cost_efficiency"]:
markdown += f"""| {model["model"]} | {model["tokens_per_dollar"]:,.0f} | {model["tokens"]:,} | ${model["cost"]:.2f} |
"""
markdown += "\n"
# Add daily costs
if report_data["daily_costs"]:
markdown += """## Daily Costs
| Date | Cost |
|------|------|
"""
for day in sorted(report_data["daily_costs"], key=lambda x: x["date"], reverse=True):
markdown += f"""| {day["date"]} | ${day["cost"]:.2f} |
"""
return markdown
```
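A small usage sketch for the reporting class above; the reports directory is illustrative, and the printed keys (`period`, `general.cost_total`) are the ones `generate_usage_report` writes into the JSON payload. Actual numbers depend on what the metrics tracker has recorded, and the general stats are lifetime totals (only `daily_usage` is filtered by the `days` window).

```python
# Hypothetical usage sketch; output depends on the metrics recorded by the server.
import json

from ultimate_mcp_server.services.analytics.reporting import AnalyticsReporting

reporter = AnalyticsReporting(reports_dir="./reports", include_plots=False)

# The JSON branch returns the Path of the file it wrote.
report_path = reporter.generate_usage_report(days=7, output_format="json")
with open(report_path, "r", encoding="utf-8") as f:
    report = json.load(f)

print(report["period"])                 # "7 days"
print(report["general"]["cost_total"])  # total tracked cost (not limited to the 7-day window)
```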