This is page 2 of 35. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?page={x} to view the full context.

# Directory Structure

```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│   ├── __init__.py
│   ├── advanced_agent_flows_using_unified_memory_system_demo.py
│   ├── advanced_extraction_demo.py
│   ├── advanced_unified_memory_system_demo.py
│   ├── advanced_vector_search_demo.py
│   ├── analytics_reporting_demo.py
│   ├── audio_transcription_demo.py
│   ├── basic_completion_demo.py
│   ├── cache_demo.py
│   ├── claude_integration_demo.py
│   ├── compare_synthesize_demo.py
│   ├── cost_optimization.py
│   ├── data
│   │   ├── sample_event.txt
│   │   ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│   │   └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│   ├── docstring_refiner_demo.py
│   ├── document_conversion_and_processing_demo.py
│   ├── entity_relation_graph_demo.py
│   ├── filesystem_operations_demo.py
│   ├── grok_integration_demo.py
│   ├── local_text_tools_demo.py
│   ├── marqo_fused_search_demo.py
│   ├── measure_model_speeds.py
│   ├── meta_api_demo.py
│   ├── multi_provider_demo.py
│   ├── ollama_integration_demo.py
│   ├── prompt_templates_demo.py
│   ├── python_sandbox_demo.py
│   ├── rag_example.py
│   ├── research_workflow_demo.py
│   ├── sample
│   │   ├── article.txt
│   │   ├── backprop_paper.pdf
│   │   ├── buffett.pdf
│   │   ├── contract_link.txt
│   │   ├── legal_contract.txt
│   │   ├── medical_case.txt
│   │   ├── northwind.db
│   │   ├── research_paper.txt
│   │   ├── sample_data.json
│   │   └── text_classification_samples
│   │       ├── email_classification.txt
│   │       ├── news_samples.txt
│   │       ├── product_reviews.txt
│   │       └── support_tickets.txt
│   ├── sample_docs
│   │   └── downloaded
│   │       └── attention_is_all_you_need.pdf
│   ├── sentiment_analysis_demo.py
│   ├── simple_completion_demo.py
│   ├── single_shot_synthesis_demo.py
│   ├── smart_browser_demo.py
│   ├── sql_database_demo.py
│   ├── sse_client_demo.py
│   ├── test_code_extraction.py
│   ├── test_content_detection.py
│   ├── test_ollama.py
│   ├── text_classification_demo.py
│   ├── text_redline_demo.py
│   ├── tool_composition_examples.py
│   ├── tournament_code_demo.py
│   ├── tournament_text_demo.py
│   ├── unified_memory_system_demo.py
│   ├── vector_search_demo.py
│   ├── web_automation_instruction_packs.py
│   └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│   └── smart_browser_internal
│       ├── locator_cache.db
│       ├── readability.js
│       └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── integration
│   │   ├── __init__.py
│   │   └── test_server.py
│   ├── manual
│   │   ├── test_extraction_advanced.py
│   │   └── test_extraction.py
│   └── unit
│       ├── __init__.py
│       ├── test_cache.py
│       ├── test_providers.py
│       └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│   ├── __init__.py
│   ├── __main__.py
│   ├── cli
│   │   ├── __init__.py
│   │   ├── __main__.py
│   │   ├── commands.py
│   │   ├── helpers.py
│   │   └── typer_cli.py
│   ├── clients
│   │   ├── __init__.py
│   │   ├── completion_client.py
│   │   └── rag_client.py
│   ├── config
│   │   └── examples
│   │       └── filesystem_config.yaml
│   ├── config.py
│   ├── constants.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── evaluation
│   │   │   ├── base.py
│   │   │   └── evaluators.py
│   │   ├── providers
│   │   │   ├── __init__.py
│   │   │   ├── anthropic.py
│   │   │   ├── base.py
│   │   │   ├── deepseek.py
│   │   │   ├── gemini.py
│   │   │   ├── grok.py
│   │   │   ├── ollama.py
│   │   │   ├── openai.py
│   │   │   └── openrouter.py
│   │   ├── server.py
│   │   ├── state_store.py
│   │   ├── tournaments
│   │   │   ├── manager.py
│   │   │   ├── tasks.py
│   │   │   └── utils.py
│   │   └── ums_api
│   │       ├── __init__.py
│   │       ├── ums_database.py
│   │       ├── ums_endpoints.py
│   │       ├── ums_models.py
│   │       └── ums_services.py
│   ├── exceptions.py
│   ├── graceful_shutdown.py
│   ├── services
│   │   ├── __init__.py
│   │   ├── analytics
│   │   │   ├── __init__.py
│   │   │   ├── metrics.py
│   │   │   └── reporting.py
│   │   ├── cache
│   │   │   ├── __init__.py
│   │   │   ├── cache_service.py
│   │   │   ├── persistence.py
│   │   │   ├── strategies.py
│   │   │   └── utils.py
│   │   ├── cache.py
│   │   ├── document.py
│   │   ├── knowledge_base
│   │   │   ├── __init__.py
│   │   │   ├── feedback.py
│   │   │   ├── manager.py
│   │   │   ├── rag_engine.py
│   │   │   ├── retriever.py
│   │   │   └── utils.py
│   │   ├── prompts
│   │   │   ├── __init__.py
│   │   │   ├── repository.py
│   │   │   └── templates.py
│   │   ├── prompts.py
│   │   └── vector
│   │       ├── __init__.py
│   │       ├── embeddings.py
│   │       └── vector_service.py
│   ├── tool_token_counter.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── audio_transcription.py
│   │   ├── base.py
│   │   ├── completion.py
│   │   ├── docstring_refiner.py
│   │   ├── document_conversion_and_processing.py
│   │   ├── enhanced-ums-lookbook.html
│   │   ├── entity_relation_graph.py
│   │   ├── excel_spreadsheet_automation.py
│   │   ├── extraction.py
│   │   ├── filesystem.py
│   │   ├── html_to_markdown.py
│   │   ├── local_text_tools.py
│   │   ├── marqo_fused_search.py
│   │   ├── meta_api_tool.py
│   │   ├── ocr_tools.py
│   │   ├── optimization.py
│   │   ├── provider.py
│   │   ├── pyodide_boot_template.html
│   │   ├── python_sandbox.py
│   │   ├── rag.py
│   │   ├── redline-compiled.css
│   │   ├── sentiment_analysis.py
│   │   ├── single_shot_synthesis.py
│   │   ├── smart_browser.py
│   │   ├── sql_databases.py
│   │   ├── text_classification.py
│   │   ├── text_redline_tools.py
│   │   ├── tournament.py
│   │   ├── ums_explorer.html
│   │   └── unified_memory_system.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── async_utils.py
│   │   ├── display.py
│   │   ├── logging
│   │   │   ├── __init__.py
│   │   │   ├── console.py
│   │   │   ├── emojis.py
│   │   │   ├── formatter.py
│   │   │   ├── logger.py
│   │   │   ├── panels.py
│   │   │   ├── progress.py
│   │   │   └── themes.py
│   │   ├── parse_yaml.py
│   │   ├── parsing.py
│   │   ├── security.py
│   │   └── text.py
│   └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/ultimate_mcp_server/utils/logging/emojis.py:
--------------------------------------------------------------------------------

```python
"""
Emoji definitions for Gateway logging system.

This module contains constants for emojis used in logging to provide visual cues
about the type and severity of log messages.
"""
from typing import Dict

# Log level emojis
INFO = "ℹ️"
DEBUG = "🔍"
WARNING = "⚠️"
ERROR = "❌"
CRITICAL = "🚨"
SUCCESS = "✅"
TRACE = "📍"

# Status emojis
RUNNING = "🔄"
PENDING = "⏳"
COMPLETED = "🏁"
FAILED = "👎"
STARTING = "🚀"
STOPPING = "🛑"
RESTARTING = "🔁"
LOADING = "📥"
SAVING = "📤"
CANCELLED = "🚫"
TIMEOUT = "⏱️"
SKIPPED = "⏭️"

# Operation emojis (Adapt for ultimate)
REQUEST = "➡️" # Example
RESPONSE = "⬅️" # Example
PROCESS = "⚙️"  # Example
CACHE_HIT = "✅" # Example
CACHE_MISS = "❌" # Example
AUTHENTICATE = "🔒" # Example
AUTHORIZE = "🔑" # Example
VALIDATE = "✔️"
CONNECT = "🔌"
DISCONNECT = "🔌"
UPDATE = "📝"

# Component emojis (Adapt for ultimate)
CORE = "⚙️"
PROVIDER = "☁️" # Example
ROUTER = "🔀" # Example
CACHE = "📦"
API = "🌐"
MCP = "📡" # Keep if relevant
UTILS = "🔧" # Example

# Tool emojis (Keep/remove/add as needed)
# RIPGREP = "🔍"
# AWK = "🔧"
# JQ = "🧰"
# SQLITE = "🗃️"

# Result emojis
FOUND = "🎯"
NOT_FOUND = "🔍"
PARTIAL = "◐"
UNKNOWN = "❓"
HIGH_CONFIDENCE = "🔒"
MEDIUM_CONFIDENCE = "🔓"
LOW_CONFIDENCE = "🚪"

# System emojis
STARTUP = "🔆"
SHUTDOWN = "🔅"
CONFIG = "⚙️"
ERROR = "⛔" # Distinct from level error
WARNING = "⚠️" # Same as level warning
DEPENDENCY = "🧱"
VERSION = "🏷️"
UPDATE_AVAILABLE = "🆕"

# User interaction emojis (Keep if relevant)
INPUT = "⌨️"
OUTPUT = "📺"
HELP = "❓"
HINT = "💡"
EXAMPLE = "📋"
QUESTION = "❓"
ANSWER = "💬"

# Time emojis
TIMING = "⏱️"
SCHEDULED = "📅"
DELAYED = "⏰"
OVERTIME = "⌛"

# Convenience mapping for log levels
LEVEL_EMOJIS: Dict[str, str] = {
    "info": INFO,
    "debug": DEBUG,
    "warning": WARNING,
    "error": ERROR,
    "critical": CRITICAL,
    "success": SUCCESS,
    "trace": TRACE,
}

# Dictionary for mapping operation names to emojis
OPERATION_EMOJIS: Dict[str, str] = {
    "request": REQUEST,
    "response": RESPONSE,
    "process": PROCESS,
    "cache_hit": CACHE_HIT,
    "cache_miss": CACHE_MISS,
    "authenticate": AUTHENTICATE,
    "authorize": AUTHORIZE,
    "validate": VALIDATE,
    "connect": CONNECT,
    "disconnect": DISCONNECT,
    "update": UPDATE,
    # Add other common operations here
    "startup": STARTUP,
    "shutdown": SHUTDOWN,
    "config": CONFIG,
}

# Dictionary for mapping component names to emojis
COMPONENT_EMOJIS: Dict[str, str] = {
    "core": CORE,
    "provider": PROVIDER,
    "router": ROUTER,
    "cache": CACHE,
    "api": API,
    "mcp": MCP,
    "utils": UTILS,
    # Add other components here
}

# Get emoji by name function for more dynamic access
def get_emoji(category: str, name: str) -> str:
    """Get an emoji by category and name.
    
    Args:
        category: The category of emoji (e.g., 'level', 'status', 'operation', 'component')
        name: The name of the emoji within that category
    
    Returns:
        The emoji string or a default '?' if not found
    """
    category = category.lower()
    name_lower = name.lower()
    
    if category == "level":
        return LEVEL_EMOJIS.get(name_lower, "?")
    elif category == "operation":
        return OPERATION_EMOJIS.get(name_lower, "⚙️") # Default to generic gear
    elif category == "component":
        return COMPONENT_EMOJIS.get(name_lower, "🧩") # Default to puzzle piece
    
    # Fallback for other categories or direct constant lookup
    name_upper = name.upper()
    globals_dict = globals()
    if name_upper in globals_dict:
        return globals_dict[name_upper]
        
    # Default if nothing matches
    return "❓" 
```
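
A minimal sketch of the lookup behavior above, assuming the module is importable from the path shown (`ultimate_mcp_server/utils/logging/emojis.py`):

```python
# Illustrative usage sketch for the emoji lookup helpers.
from ultimate_mcp_server.utils.logging import emojis

# Category-based lookups go through the mapping dictionaries.
print(emojis.get_emoji("level", "warning"))        # ⚠️
print(emojis.get_emoji("operation", "cache_hit"))  # ✅
print(emojis.get_emoji("component", "router"))     # 🔀

# Other categories fall back to a module-level constant lookup by name...
print(emojis.get_emoji("status", "running"))       # 🔄 (matches RUNNING)
# ...and finally to the generic default when nothing matches.
print(emojis.get_emoji("status", "nonexistent"))   # ❓
```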

--------------------------------------------------------------------------------
/examples/sample/text_classification_samples/support_tickets.txt:
--------------------------------------------------------------------------------

```
BUG REPORT:
The export to PDF feature is completely broken in version 3.2.1. When I click the export button, the application freezes for about 30 seconds, then crashes with error code 0x8007EE7. This happens consistently on every attempt. I've tried reinstalling the software and clearing the cache as suggested in the knowledge base, but the issue persists. This is blocking our team from delivering reports to clients. System specs: Windows 11 Pro, 16GB RAM, Intel i7-12700K.

FEATURE REQUEST:
It would be extremely helpful if you could add a bulk editing option for tags in the content management system. Currently, we have to edit tags one by one, which is very time-consuming when managing hundreds of articles. Ideally, we should be able to select multiple content pieces and apply or remove tags from all of them at once. This would save our editorial team hours of work each week and reduce the chance of tagging inconsistencies.

ACCOUNT ISSUE:
I'm unable to access my premium account despite having an active subscription. When I log in, the system still shows I have a free account with limited features. I can see in my bank statement that the $14.99 monthly charge went through three days ago. I've tried logging out and back in, clearing cookies, and using a different browser, but the problem remains. My account email is [email protected] and my customer ID is CUST-58924.

BILLING QUESTION:
I noticed an unexpected charge of $29.99 on my credit card statement from your company dated June 15th. I was under the impression that my subscription was $19.99/month. Was there a price increase that I missed notification about? Or is this an error? Please clarify what this charge covers and if there's been a change to my subscription terms. My account is registered under [email protected].

TECHNICAL QUESTION:
Is it possible to integrate your API with Zapier? We're trying to automate our workflow between your platform and our CRM system. I've looked through the documentation but couldn't find specific information about Zapier integrations. If this is supported, could you point me to relevant documentation or examples? If not, do you have any recommendations for alternative integration methods that wouldn't require custom development?

BUG REPORT:
There appears to be a security vulnerability in the user permission system. I discovered that standard users can access administrative reports by directly navigating to the URL pattern /admin/reports/custom/[report-id] even without admin privileges. I've verified this with two different standard user accounts. This potentially exposes sensitive company data to unauthorized personnel. Please address this urgently as it represents a significant security concern for our organization.

FEATURE REQUEST:
Could you please consider adding dark mode to both the web and mobile applications? Working with the current bright interface during evening hours is causing eye strain for many of our team members. Ideally, the dark mode would be automatically triggered based on system settings but with the option to manually override. This has become a standard feature in most professional applications, and would greatly improve the user experience for those of us who work long hours.

ACCOUNT ISSUE:
Our team admin left the company last week, and we need to transfer administrative privileges to another team member. The admin account was under [email protected]. We need to assign admin rights to [email protected] as soon as possible, as we're currently unable to add new team members or modify subscription settings. Our business account number is BIZ-4452-T. Please advise on the process for this transfer. 
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/services/cache/utils.py:
--------------------------------------------------------------------------------

```python
"""Cache utility functions for Ultimate MCP Server.

This module provides utility functions for working with the cache service
that were previously defined in example scripts but are now part of the library.
"""

import hashlib

from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.providers.base import get_provider
from ultimate_mcp_server.services.cache import get_cache_service
from ultimate_mcp_server.utils import get_logger

# Initialize logger
logger = get_logger("ultimate_mcp_server.services.cache.utils")

async def run_completion_with_cache(
    prompt: str,
    provider_name: str = Provider.OPENAI.value,
    model: str = None,
    temperature: float = 0.1,
    max_tokens: int = None,
    use_cache: bool = True,
    ttl: int = 3600,  # Default 1 hour cache TTL
    api_key: str = None
):
    """Run a completion with automatic caching.
    
    This utility function handles provider initialization, cache key generation,
    cache lookups, and caching results automatically.
    
    Args:
        prompt: Text prompt for completion
        provider_name: Provider to use (default: OpenAI)
        model: Model name (optional, uses provider default if not specified)
        temperature: Temperature for generation (default: 0.1)
        max_tokens: Maximum tokens to generate (optional)
        use_cache: Whether to use cache (default: True)
        ttl: Cache TTL in seconds (default: 3600/1 hour)
        api_key: Provider API key (optional, falls back to internal provider system)
        
    Returns:
        Completion result with additional processing_time attribute
    """
    try:
        # Let the provider system handle API keys if none provided
        provider = await get_provider(provider_name, api_key=api_key)
        await provider.initialize()
    except Exception as e:
         logger.error(f"Failed to initialize provider {provider_name}: {e}", emoji_key="error")
         raise # Re-raise exception to stop execution if provider fails
    
    cache_service = get_cache_service()
    
    # Create a more robust cache key using all relevant parameters
    model_id = model or provider.get_default_model() # Ensure we have a model id
    
    # Create consistent hash of parameters that affect the result
    params_str = f"{prompt}:{temperature}:{max_tokens if max_tokens else 'default'}"
    params_hash = hashlib.md5(params_str.encode()).hexdigest()
    
    cache_key = f"completion:{provider_name}:{model_id}:{params_hash}"
    
    if use_cache and cache_service.enabled:
        cached_result = await cache_service.get(cache_key)
        if cached_result is not None:
            logger.success("Cache hit! Using cached result", emoji_key="cache")
            # Set processing time for cache retrieval (negligible)
            cached_result.processing_time = 0.001 
            return cached_result
    
    # Generate completion if not cached or cache disabled
    if use_cache:
        logger.info("Cache miss. Generating new completion...", emoji_key="processing")
    else:
        logger.info("Cache disabled by request. Generating new completion...", emoji_key="processing")
        
    # Use the determined model_id and pass through other parameters
    result = await provider.generate_completion(
        prompt=prompt,
        model=model_id,
        temperature=temperature,
        max_tokens=max_tokens
    )
    
    # Save to cache if enabled
    if use_cache and cache_service.enabled:
        await cache_service.set(
            key=cache_key,
            value=result,
            ttl=ttl
        )
        logger.info(f"Result saved to cache (key: ...{cache_key[-10:]})", emoji_key="cache")
        
    return result 
```
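
A minimal usage sketch for the helper above, assuming provider API keys and the cache service are configured through the server's normal settings (the exact shape of the returned completion object depends on the provider implementation, so only `processing_time` is inspected here):

```python
# Illustrative sketch: calling run_completion_with_cache twice with identical
# parameters; the second call should be served from the cache.
import asyncio

from ultimate_mcp_server.services.cache.utils import run_completion_with_cache

async def main():
    prompt = "Summarize the benefits of caching LLM completions in one sentence."

    first = await run_completion_with_cache(prompt=prompt, temperature=0.1, ttl=600)
    print("first call processing_time:", getattr(first, "processing_time", None))

    # Identical parameters -> same cache key -> cache hit (processing_time ~0.001s).
    second = await run_completion_with_cache(prompt=prompt, temperature=0.1, ttl=600)
    print("second call processing_time:", getattr(second, "processing_time", None))

if __name__ == "__main__":
    asyncio.run(main())
```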

--------------------------------------------------------------------------------
/TEST_README.md:
--------------------------------------------------------------------------------

```markdown
# Ultimate MCP Server Test Scripts

This directory contains test scripts to validate your Ultimate MCP Server functionality.

## Prerequisites

Make sure you have FastMCP installed:
```bash
pip install fastmcp
# or
uv add fastmcp
```

Also install aiohttp for REST API testing:
```bash
pip install aiohttp
# or  
uv add aiohttp
```

## Test Scripts

### 1. `quick_test.py` - Quick Connectivity Test
**Purpose**: Fast basic connectivity and functionality check
**Runtime**: ~5 seconds

```bash
python quick_test.py
```

This script tests:
- ✅ Basic MCP connection
- 📢 Echo tool functionality  
- 🔌 Provider availability
- 🛠️ Tool count
- 📚 Resource count

### 2. `test_client.py` - Interactive Test Client
**Purpose**: Comprehensive testing with interactive mode
**Runtime**: Variable (can be used interactively)

```bash
python test_client.py
```

This script tests:
- 🔗 Server connection
- 📋 Tool listing and calling
- 📚 Resource reading
- 🤖 LLM completions
- 📁 Filesystem tools
- 🐍 Python execution
- 📝 Text processing tools
- 🎮 Interactive command mode

**Interactive Commands**:
- `list` - Show available tools
- `resources` - Show available resources  
- `call <tool_name> <json_params>` - Call a tool
- `read <resource_uri>` - Read a resource
- `quit` - Exit

### 3. `comprehensive_test.py` - Full Test Suite
**Purpose**: Complete validation of MCP and REST API functionality
**Runtime**: ~30 seconds

```bash
python comprehensive_test.py
```

This script tests:
- 🔧 MCP Interface (tools, providers, filesystem, Python)
- 🌐 REST API Endpoints (discovery, health, docs, cognitive states, performance, artifacts)
- 🤖 LLM Completions (actual generation with available providers)
- 🧠 Memory System (storage, retrieval, cognitive states)

## Understanding Results

### ✅ Green Check - Working Correctly
The feature is functioning as expected.

### ❌ Red X - Needs Attention  
The feature failed or is not available. Common reasons:
- API keys not configured
- Provider services unavailable
- Database connection issues
- Missing dependencies

## Your Server Configuration

Based on your server startup logs, your server has:
- **107 tools** loaded (all available tools mode)
- **7 LLM providers** configured:
  - ✅ Anthropic (3 models)
  - ✅ DeepSeek (2 models) 
  - ✅ Gemini (4 models)
  - ✅ OpenRouter (3 models)
  - ✅ Ollama (3 models) - Local
  - ✅ Grok (4 models)
  - ✅ OpenAI (47 models)

## Endpoints Available

### MCP Protocol
- `http://127.0.0.1:8013/mcp` - Main MCP streamable-HTTP endpoint

### REST API
- `http://127.0.0.1:8013/` - Discovery endpoint
- `http://127.0.0.1:8013/api/health` - Health check
- `http://127.0.0.1:8013/api/docs` - Swagger UI documentation
- `http://127.0.0.1:8013/api/cognitive-states` - Cognitive state management
- `http://127.0.0.1:8013/api/performance/overview` - Performance metrics
- `http://127.0.0.1:8013/api/artifacts` - Artifact management

### UMS Explorer
- `http://127.0.0.1:8013/api/ums-explorer` - Memory system explorer UI

## Troubleshooting

### Connection Failed
- Verify server is running on port 8013
- Check firewall settings
- Ensure no other service is using the port

### Provider Errors  
- Check API keys in environment variables
- Verify provider service availability
- Test with local Ollama first (no API key needed)

### Tool Errors
- Check filesystem permissions
- Verify Python sandbox configuration
- Check database connectivity

## Example Usage

```bash
# Quick smoke test
python quick_test.py

# Interactive exploration
python test_client.py
# Then type: list
# Then type: call echo {"message": "Hello!"}

# Full validation
python comprehensive_test.py
```

## Next Steps

After successful testing:
1. Check the Swagger UI at `http://127.0.0.1:8013/api/docs`
2. Explore the UMS Explorer at `http://127.0.0.1:8013/api/ums-explorer`  
3. Test with a real MCP client like Claude Desktop
4. Start building your applications using the MCP tools! 
```
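
For orientation, a stripped-down connectivity check in the spirit of `quick_test.py` might look like the following sketch, assuming the FastMCP 2.x client API and the default streamable-HTTP endpoint on port 8013:

```python
# Minimal MCP connectivity sketch (assumes fastmcp 2.x and the server
# running with the streamable-HTTP endpoint on port 8013).
import asyncio

from fastmcp import Client

async def main():
    async with Client("http://127.0.0.1:8013/mcp") as client:
        tools = await client.list_tools()
        print(f"Connected. {len(tools)} tools available.")

        # The echo tool is the simplest round-trip check.
        result = await client.call_tool("echo", {"message": "Hello!"})
        print("echo returned:", result)

asyncio.run(main())
```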

--------------------------------------------------------------------------------
/tests/manual/test_extraction_advanced.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Manual test for advanced extraction tools using standardized completion.
This script tests the remaining extraction tools that were refactored to use
the standardized completion tool.
"""

import asyncio
import json
import os
import sys

# Add the project root to the Python path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))

from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.tools.extraction import extract_code_from_response, extract_semantic_schema


async def test_extract_semantic_schema():
    """Test the extract_semantic_schema function with a simple schema."""
    print("\n--- Testing extract_semantic_schema ---")
    
    # Define a JSON schema to extract data
    schema = {
        "type": "object",
        "properties": {
            "name": {"type": "string"},
            "email": {"type": "string"},
            "phone": {"type": "string"},
            "interests": {"type": "array", "items": {"type": "string"}}
        }
    }
    
    # Sample text containing information matching the schema
    sample_text = """
    Profile information:
    
    Name: Sarah Johnson
    Contact: [email protected]
    Phone Number: 555-987-6543
    
    Sarah is interested in: machine learning, data visualization, and hiking.
    """
    
    result = await extract_semantic_schema(
        text=sample_text,
        semantic_schema=schema,
        provider=Provider.OPENAI.value,
        model="gpt-3.5-turbo"
    )
    
    print(f"Success: {result.get('success', False)}")
    print(f"Model used: {result.get('model', 'unknown')}")
    print(f"Tokens: {result.get('tokens', {})}")
    print(f"Processing time: {result.get('processing_time', 0):.2f}s")
    
    # Pretty print the extracted data
    if result.get('data'):
        print("Extracted Schema Data:")
        print(json.dumps(result['data'], indent=2))
    else:
        print("Failed to extract schema data")
        print(f"Error: {result.get('error', 'unknown error')}")


async def test_extract_code_from_response():
    """Test the extract_code_from_response function."""
    print("\n--- Testing extract_code_from_response ---")
    
    # Sample text with a code block
    sample_text = """
    Here's a Python function to calculate the factorial of a number:
    
    ```python
    def factorial(n):
        if n == 0 or n == 1:
            return 1
        else:
            return n * factorial(n-1)
            
    # Example usage
    print(factorial(5))  # Output: 120
    ```
    
    This uses a recursive approach to calculate the factorial.
    """
    
    # Test with regex-based extraction
    print("Testing regex-based extraction...")
    extracted_code = await extract_code_from_response(
        response_text=sample_text,
        model="openai/gpt-3.5-turbo",
        timeout=10
    )
    
    print("Extracted Code:")
    print(extracted_code)
    
    # Test with LLM-based extraction on text without markdown
    print("\nTesting LLM-based extraction...")
    sample_text_no_markdown = """
    Here's a Python function to calculate the factorial of a number:
    
    def factorial(n):
        if n == 0 or n == 1:
            return 1
        else:
            return n * factorial(n-1)
            
    # Example usage
    print(factorial(5))  # Output: 120
    
    This uses a recursive approach to calculate the factorial.
    """
    
    extracted_code = await extract_code_from_response(
        response_text=sample_text_no_markdown,
        model="openai/gpt-3.5-turbo",
        timeout=10
    )
    
    print("Extracted Code:")
    print(extracted_code)


async def main():
    """Run all tests."""
    print("Testing advanced extraction tools with standardized completion...")
    
    await test_extract_semantic_schema()
    await test_extract_code_from_response()
    
    print("\nAll tests completed.")


if __name__ == "__main__":
    asyncio.run(main()) 
```

--------------------------------------------------------------------------------
/examples/sample/article.txt:
--------------------------------------------------------------------------------

```
## Tech Industry Shake-up: Microsoft Acquires AI Startup Anthropic for $10B

March 25, 2025 | Sarah Johnson, Technology Reporter

In a move that has sent shockwaves through Silicon Valley, Microsoft Corporation announced yesterday its acquisition of AI research company Anthropic for $10 billion. The deal, which was finalized after months of secretive negotiations, marks Microsoft's largest investment in artificial intelligence to date.

Microsoft CEO Satya Nadella explained the strategic importance of the acquisition during a press conference held at Microsoft's headquarters in Redmond, Washington. "Anthropic's Claude AI models represent some of the most sophisticated language systems ever developed," Nadella stated. "This acquisition strengthens our commitment to responsible AI development and ensures Microsoft remains at the forefront of the AI revolution."

Anthropic, founded in 2021 by former OpenAI researchers Dario Amodei and Daniela Amodei, has gained recognition for its Claude family of AI models that emphasize safety and interpretability. The company had previously received significant funding from Google and Amazon, with Google investing $300 million for a 10% stake in 2023, and Amazon committing up to $4 billion in September 2023.

Both Dario Amodei, who serves as Anthropic's CEO, and Daniela Amodei, the company's President, will join Microsoft's AI leadership team while continuing to oversee Anthropic's operations. "Joining forces with Microsoft gives us the computational resources and research talent needed to advance our constitutional AI approach," said Dario Amodei. "We believe this partnership will accelerate our mission to develop AI systems that are steerable, interpretable, and robust."

The acquisition has raised antitrust concerns, with the Federal Trade Commission (FTC) Chair Lina Khan announcing an immediate review of the deal. "We will scrutinize this acquisition carefully to ensure it doesn't further concentrate power in the already consolidated AI sector," Khan said in a statement released by the FTC.

Google's parent company Alphabet and Amazon, both major investors in Anthropic, may face significant losses from the acquisition. Alphabet's stock (GOOGL) fell 3.2% following the announcement, while Amazon (AMZN) saw a more modest decline of 1.5%. In contrast, Microsoft (MSFT) shares jumped 5.8% to $420.75.

OpenAI CEO Sam Altman expressed surprise at the acquisition in a post on X (formerly Twitter): "Congratulations to the Anthropic team. This creates an interesting competitive landscape. Game on." OpenAI, which has received approximately $13 billion in investment from Microsoft, now finds itself in the unusual position of competing with another Microsoft-owned AI company.

Industry analyst Maria Rodriguez from Morgan Stanley noted that the acquisition signals a new phase in the AI arms race. "Microsoft is clearly hedging its bets by owning stakes in both leading frontier AI labs. This could be interpreted as uncertainty about which approach to AI safety and capabilities will ultimately succeed," Rodriguez explained in a research note to investors.

The deal includes provisions for Anthropic to continue operating as a separate entity within Microsoft, with guaranteed compute resources on Microsoft's Azure cloud platform. All of Anthropic's 350 employees will be retained, and the company's San Francisco headquarters will remain operational.

According to sources familiar with the matter, the acquisition talks began after a dinner meeting between Nadella and Dario Amodei at the World Economic Forum in Davos, Switzerland in January 2025. Microsoft President Brad Smith and CFO Amy Hood were reportedly instrumental in structuring the complex deal.

The acquisition is expected to close by Q3 2025, pending regulatory approval. If approved, it would mark another significant milestone in the rapidly evolving artificial intelligence industry, where companies are increasingly competing for talent, technology, and market position. 
```

--------------------------------------------------------------------------------
/examples/sample/text_classification_samples/product_reviews.txt:
--------------------------------------------------------------------------------

```
POSITIVE REVIEW:
I absolutely love this coffee maker! It brews the perfect cup every time and the temperature control is spot on. After three months of daily use, I'm still impressed with how consistent the results are. The app connectivity seemed gimmicky at first, but being able to schedule brews from bed has been a game-changer for my morning routine. Clean-up is simple with the removable parts and the sleek design fits perfectly in my kitchen. Best appliance purchase I've made in years!

NEGATIVE REVIEW:
This laptop has been nothing but trouble since day one. The battery barely lasts 2 hours despite the "all-day battery life" claim, and it overheats constantly even with basic web browsing. The keyboard started having sticky keys after just two weeks, and customer support has been completely unhelpful. To make matters worse, the screen has strange flickering issues that come and go randomly. Save your money and avoid this model completely - total waste of $1200.

NEUTRAL REVIEW:
The wireless earbuds are decent for the price point. Sound quality is acceptable though not exceptional - you get what you pay for. Battery life matches the advertised 4 hours, and the charging case provides about 3 full charges as expected. The fit is comfortable enough for short periods, though they start to hurt after about 2 hours of continuous use. Connectivity is generally stable but occasional drops occur when the phone is in a pocket. Overall, a reasonable budget option if you're not an audiophile.

POSITIVE REVIEW:
This air fryer has completely transformed how I cook! Food comes out perfectly crispy on the outside and juicy on the inside, all with minimal or no oil. It preheats quickly and the digital controls are intuitive and responsive. I appreciate the dishwasher-safe basket which makes cleanup a breeze. Even my kids are eating more vegetables now because they taste so good prepared this way. The unit is a bit bulky on the counter, but the performance more than makes up for the space it takes.

NEGATIVE REVIEW:
I regret purchasing this robot vacuum. It constantly gets stuck under furniture despite claiming to have "smart navigation," and the battery dies before finishing our modestly sized apartment. The app disconnects frequently requiring tedious reconnection processes. The dust bin is way too small and needs emptying after each use. Worst of all, it scratched our hardwood floors in several places! Customer service offered little help beyond basic troubleshooting steps I'd already tried. Returning this disappointment ASAP.

NEUTRAL REVIEW:
The fitness tracker works as advertised for basic functions. Step counting seems accurate enough for casual use, and the sleep tracking provides interesting if not necessarily actionable data. Heart rate monitoring is hit or miss during high-intensity workouts but fine for resting measurements. The app is somewhat clunky but gets the job done. Battery lasts about 4 days which is adequate. The band is comfortable but shows signs of wear after a couple months. It's not outstanding but reasonable value for the price point.

POSITIVE REVIEW:
This blender is an absolute powerhouse! I've thrown everything at it from frozen fruits to tough vegetables and nuts, and it creates perfectly smooth blends every time. The variable speed control gives precise results whether you want chunky salsa or silky smoothies. It's definitely louder than my previous blender, but the performance justifies the noise. The container is easy to clean and the blades are impressively durable. Yes, it's expensive, but given the quality and 7-year warranty, it's worth every penny for a serious home cook.

NEGATIVE REVIEW:
These "premium" headphones are anything but premium. The sound quality is muddy with overwhelming bass that drowns out mids and highs. The noise cancellation is so weak it barely reduces ambient noise. The build quality feels cheap with plastic parts that creak when you adjust them. After just one month, the right ear cup started cutting out intermittently. For this price point, I expected far better quality and performance. Definitely returning these and going with a more reliable brand. 
```

--------------------------------------------------------------------------------
/examples/sample/text_classification_samples/news_samples.txt:
--------------------------------------------------------------------------------

```
TECH NEWS SAMPLE:
Apple unveiled its new Apple Intelligence features today, integrating advanced AI capabilities directly into iOS 18, macOS Sequoia, and iPadOS 18. This marks a significant shift in Apple's strategy, embracing generative AI while maintaining their focus on privacy by processing most tasks on-device. Features include intelligent email summarization, photo editing via text prompts, and a significantly enhanced Siri experience that can understand context across apps.

SPORTS NEWS SAMPLE:
The Boston Celtics clinched their 18th NBA championship with a decisive 106-88 victory over the Dallas Mavericks last night, winning the series 4-1. Jayson Tatum was named Finals MVP after averaging 28.5 points and 9.8 rebounds during the series. "This is what we've worked for all season," said Celtics coach Joe Mazzulla. The victory ends a 16-year championship drought for the storied franchise, which now moves ahead of the Los Angeles Lakers for most NBA titles.

POLITICS NEWS SAMPLE:
The Senate passed a major bipartisan infrastructure bill today with a 69-30 vote, allocating $1.2 trillion for roads, bridges, public transit, and broadband internet. The legislation represents a rare moment of cooperation in a deeply divided Congress. "This bill shows America can do big things when we work together," said President Biden at a press conference following the vote. The bill now moves to the House, where progressive Democrats have tied its passage to a larger $3.5 trillion social spending package.

HEALTH NEWS SAMPLE:
A new study published in The Lancet suggests that intermittent fasting may not offer significant weight loss advantages over traditional calorie restriction diets. Researchers followed 250 participants over a 12-month period and found that while both approaches led to weight loss, there was no statistically significant difference between the two methods. "What matters most is consistency and finding an eating pattern that works for your lifestyle," said lead researcher Dr. Emily Chen from Stanford University.

ENTERTAINMENT NEWS SAMPLE:
The 96th Academy Awards ceremony delivered several surprises, with "The Quiet Hour" taking home Best Picture despite being a low-budget independent film. Lead actress Zoe Kazan won Best Actress for her role as a Holocaust survivor, while Christopher Nolan finally secured his first Best Director Oscar for "Synchronicity." The ceremony saw a 12% increase in viewership from last year, reversing a years-long decline in ratings for Hollywood's biggest night.

SCIENCE NEWS SAMPLE:
NASA's Europa Clipper mission has entered its final assembly phase, with launch scheduled for October 2024. The spacecraft will conduct detailed reconnaissance of Jupiter's moon Europa, which scientists believe harbors a subsurface ocean that could potentially support life. "This mission represents our best chance to determine if Europa's ocean is habitable," said project scientist Dr. Robert Pappalardo. The spacecraft will make nearly 50 close flybys of Europa, collecting data that will help scientists understand the moon's potential to harbor life.

BUSINESS NEWS SAMPLE:
Tesla announced record quarterly profits today, exceeding Wall Street expectations with revenue of $24.3 billion and earnings per share of $1.24. The electric vehicle manufacturer delivered 466,000 vehicles in Q2, a 50% increase from the same period last year. CEO Elon Musk attributed the success to improved production efficiency and strong demand for the Model Y. The company also revealed plans to begin production of its Cybertruck at the Texas Gigafactory by early next quarter, ending years of delays for the highly anticipated vehicle.

EDUCATION NEWS SAMPLE:
A landmark study from the Department of Education found that states implementing universal pre-kindergarten programs saw significant improvements in literacy rates and reduced achievement gaps. The research, which followed 28,000 students across 12 states, showed that children who attended quality pre-K programs were 38% more likely to read at grade level by third grade compared to their peers. "This provides compelling evidence that early childhood education should be a national priority," said Education Secretary Miguel Cardona. 
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/services/knowledge_base/utils.py:
--------------------------------------------------------------------------------

```python
"""Utility functions for the knowledge base services."""
from typing import Any, Dict, List, Optional


def build_metadata_filter(
    filters: Optional[Dict[str, Any]] = None,
    operator: str = "$and"
) -> Optional[Dict[str, Any]]:
    """Build a ChromaDB metadata filter.
    
    Args:
        filters: Dictionary of metadata filters (field->value or field->{op: value})
        operator: Logical operator to combine filters ($and or $or)
        
    Returns:
        ChromaDB-compatible filter or None
    """
    if not filters:
        return None
    
    # Handle direct equality case with single filter
    if len(filters) == 1 and not any(isinstance(v, dict) for v in filters.values()):
        field, value = next(iter(filters.items()))
        return {field: value}  # ChromaDB handles direct equality
    
    # Process complex filters
    filter_conditions = []
    
    for field, condition in filters.items():
        if isinstance(condition, dict):
            # Already has operators
            if any(k.startswith("$") for k in condition.keys()):
                filter_conditions.append({field: condition})
            else:
                # Convert to $eq
                filter_conditions.append({field: {"$eq": condition}})
        else:
            # Simple equality
            filter_conditions.append({field: {"$eq": condition}})
    
    # If only one condition, no need for logical operator
    if len(filter_conditions) == 1:
        return filter_conditions[0]
    
    # Combine with logical operator
    return {operator: filter_conditions}


def extract_keywords(text: str, min_length: int = 3, max_keywords: int = 10) -> List[str]:
    """Extract important keywords from text.
    
    Args:
        text: Input text
        min_length: Minimum length of keywords
        max_keywords: Maximum number of keywords to extract
        
    Returns:
        List of keywords
    """
    # Simple keyword extraction (could be improved with NLP)
    words = text.lower().split()
    
    # Filter out short words and common stop words
    stop_words = {
        "the", "and", "a", "an", "in", "on", "at", "to", "for", "with", "by", 
        "is", "are", "was", "were", "be", "been", "has", "have", "had", "of", "that"
    }
    
    keywords = [
        word.strip(".,?!\"'()[]{}:;") 
        for word in words 
        if len(word) >= min_length and word.lower() not in stop_words
    ]
    
    # Count occurrences
    keyword_counts = {}
    for word in keywords:
        if word in keyword_counts:
            keyword_counts[word] += 1
        else:
            keyword_counts[word] = 1
    
    # Sort by frequency
    sorted_keywords = sorted(keyword_counts.items(), key=lambda x: x[1], reverse=True)
    
    # Return top keywords
    return [k for k, _ in sorted_keywords[:max_keywords]]


def generate_token_estimate(text: str) -> int:
    """Generate a rough estimate of token count.
    
    Args:
        text: Input text
        
    Returns:
        Estimated token count
    """
    # Rough estimate based on whitespace tokenization and a multiplier
    # This is a very crude approximation
    words = len(text.split())
    
    # Adjust for non-English or technical content
    if any(ord(c) > 127 for c in text):  # Has non-ASCII chars
        return int(words * 1.5)  # Non-English texts need more tokens
    
    # Standard English approximation
    return int(words * 1.3)  # Account for tokenization differences


def create_document_metadata(
    document: str,
    source: Optional[str] = None,
    document_type: Optional[str] = None
) -> Dict[str, Any]:
    """Create metadata for a document.
    
    Args:
        document: Document text
        source: Optional source of the document
        document_type: Optional document type
        
    Returns:
        Document metadata
    """
    # Basic metadata
    metadata = {
        "length": len(document),
        "token_estimate": generate_token_estimate(document),
        "created_at": int(1000 * import_time()),
    }
    
    # Add source if provided
    if source:
        metadata["source"] = source
    
    # Add document type if provided
    if document_type:
        metadata["type"] = document_type
    
    # Extract potential title from first line
    lines = document.strip().split("\n")
    if lines and len(lines[0]) < 100:  # Potential title
        metadata["potential_title"] = lines[0]
    
    return metadata


# Import at the end to avoid circular imports
import time as import_time  # noqa: E402

```
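
A short sketch of the filter shapes produced by `build_metadata_filter` above (it is a pure function, so no ChromaDB connection is needed to inspect its output):

```python
# Illustrative sketch of build_metadata_filter output shapes.
from ultimate_mcp_server.services.knowledge_base.utils import build_metadata_filter

# A single equality filter passes through directly.
print(build_metadata_filter({"source": "wiki"}))
# -> {'source': 'wiki'}

# Multiple fields are wrapped in $eq conditions and combined with $and by default.
print(build_metadata_filter({"source": "wiki", "type": "article"}))
# -> {'$and': [{'source': {'$eq': 'wiki'}}, {'type': {'$eq': 'article'}}]}

# Conditions that already use operators are kept as-is; $or can be requested.
print(build_metadata_filter({"year": {"$gte": 2020}, "type": "article"}, operator="$or"))
# -> {'$or': [{'year': {'$gte': 2020}}, {'type': {'$eq': 'article'}}]}
```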

--------------------------------------------------------------------------------
/tests/manual/test_extraction.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Manual test for extraction tools using standardized completion.
This script tests the key functions in extraction.py to ensure they work
with the updated standardized completion tool.
"""

import asyncio
import json
import os
import sys

# Add the project root to the Python path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))

from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.tools.extraction import (
    extract_json,
    extract_key_value_pairs,
    extract_table,
)


async def test_extract_json():
    """Test the extract_json function with a simple JSON object."""
    print("\n--- Testing extract_json ---")
    # Simplified JSON without nested structures
    sample_text = """
    Here's the result of my analysis:
    
    {
      "name": "John Smith",
      "age": 42,
      "skills": "programming, design, project management",
      "email": "[email protected]",
      "phone": "555-1234"
    }
    
    Let me know if you need any more information.
    """
    
    result = await extract_json(
        text=sample_text,
        provider=Provider.OPENAI.value,
        model="gpt-3.5-turbo"
    )
    
    print(f"Success: {result.get('success', False)}")
    print(f"Model used: {result.get('model', 'unknown')}")
    print(f"Tokens: {result.get('tokens', {})}")
    print(f"Processing time: {result.get('processing_time', 0):.2f}s")
    
    # Pretty print the extracted data
    if result.get('data'):
        print("Extracted JSON:")
        print(json.dumps(result['data'], indent=2))
    else:
        print("Failed to extract JSON")
        print(f"Error: {result.get('error', 'unknown error')}")

async def test_extract_table():
    """Test the extract_table function with a simple table."""
    print("\n--- Testing extract_table ---")
    sample_text = """
    Here's a summary of our quarterly sales:
    
    | Product  | Q1 Sales | Q2 Sales |
    |----------|----------|----------|
    | Widget A | 1200     | 1350     |
    | Widget B | 850      | 940      |
    
    As you can see, Widget A performed best in Q2.
    """
    
    result = await extract_table(
        text=sample_text,
        return_formats=["json"],  # Just request json to keep it simple
        provider=Provider.OPENAI.value,
        model="gpt-3.5-turbo"
    )
    
    print(f"Success: {result.get('success', False)}")
    print(f"Model used: {result.get('model', 'unknown')}")
    print(f"Tokens: {result.get('tokens', {})}")
    print(f"Processing time: {result.get('processing_time', 0):.2f}s")
    
    # Print the extracted data
    if result.get('data'):
        print("Extracted Table Data:")
        if isinstance(result['data'], dict) and "json" in result['data']:
            print("JSON Format:")
            print(json.dumps(result['data']["json"], indent=2))
        else:
            print(json.dumps(result['data'], indent=2))
    else:
        print("Failed to extract table")
        print(f"Error: {result.get('error', 'unknown error')}")
        if result.get('raw_text'):
            print(f"Raw text: {result.get('raw_text')[:200]}...")

async def test_extract_key_value_pairs():
    """Test the extract_key_value_pairs function."""
    print("\n--- Testing extract_key_value_pairs ---")
    sample_text = """
    Patient Information:
    
    Name: Jane Doe
    DOB: 05/12/1985
    Gender: Female
    Blood Type: O+
    Height: 5'6"
    Weight: 145 lbs
    Allergies: Penicillin, Shellfish
    Primary Care Physician: Dr. Robert Chen
    """
    
    result = await extract_key_value_pairs(
        text=sample_text,
        provider=Provider.OPENAI.value,
        model="gpt-3.5-turbo"
    )
    
    print(f"Success: {result.get('success', False)}")
    print(f"Model used: {result.get('model', 'unknown')}")
    print(f"Tokens: {result.get('tokens', {})}")
    print(f"Processing time: {result.get('processing_time', 0):.2f}s")
    
    # Print the extracted data
    if result.get('data'):
        print("Extracted Key-Value Pairs:")
        for key, value in result['data'].items():
            print(f"  {key}: {value}")
    else:
        print("Failed to extract key-value pairs")
        print(f"Error: {result.get('error', 'unknown error')}")

async def main():
    """Run all tests."""
    print("Testing extraction tools with standardized completion...")
    
    await test_extract_json()
    await test_extract_table()
    await test_extract_key_value_pairs()
    
    print("\nAll tests completed.")

if __name__ == "__main__":
    asyncio.run(main()) 
```

--------------------------------------------------------------------------------
/examples/sample/text_classification_samples/email_classification.txt:
--------------------------------------------------------------------------------

```
SPAM EMAIL:
CONGRATULATIONS! You have been selected as the LUCKY WINNER of our INTERNATIONAL LOTTERY! Your email was randomly chosen from our database and you have won $5,000,000.00 USD (FIVE MILLION UNITED STATES DOLLARS). To claim your prize, please contact our claims agent immediately at [email protected] with your full name, address, phone number, and a copy of your ID. A processing fee of $199 is required to release your funds. DO NOT DELAY! This offer expires in 48 HOURS!

URGENT EMAIL:
Subject: Critical Security Breach - Immediate Action Required

Dear IT Team,

Our monitoring systems have detected an unauthorized access attempt to our main customer database at 3:42 AM EST. The attempt originated from an IP address in Eastern Europe and appears to have successfully extracted approximately 25,000 customer records including names, email addresses, and hashed passwords. Our security team has temporarily shut down external access to the affected systems.

Please implement the emergency response protocol immediately:
1. Activate the incident response team
2. Reset all administrative credentials
3. Deploy the prepared statement to affected customers
4. Begin forensic analysis of the breach vector

This is classified as a Severity 1 incident requiring immediate attention.

David Chen
Chief Information Security Officer

PROMOTIONAL EMAIL:
Subject: Summer Sale - 48 Hours Only! Up to 70% Off Everything

Beat the heat with sizzling savings! 🔥

Our biggest sale of the season is HERE! For just 48 hours, enjoy:
• Up to 70% off ALL clothing and accessories
• Buy one, get one 50% off on summer essentials
• Free shipping on orders over $50
• Extra 15% off with code: SUMMER24

Plus, the first 500 orders receive a FREE beach tote (valued at $45)!

Don't miss out - sale ends Sunday at midnight.

Shop now: https://www.fashionretailer.com/summer-sale

INFORMATIONAL EMAIL:
Subject: Upcoming System Maintenance - May 15th

Dear Valued Customer,

Please be informed that we will be conducting scheduled maintenance on our systems to improve performance and reliability. During this time, our services will be temporarily unavailable.

Maintenance details:
• Date: Tuesday, May 15th, 2024
• Time: 2:00 AM - 5:00 AM EDT (UTC-4)
• Affected services: Online banking portal, mobile app, and automated phone system

No action is required on your part. All services will resume automatically once maintenance is complete. We recommend completing any urgent transactions before the maintenance window begins.

We apologize for any inconvenience this may cause and appreciate your understanding as we work to enhance your experience.

Sincerely,
Customer Support Team
First National Bank

PHISHING EMAIL:
Subject: Your Amazon Account Has Been Suspended

Dear Valued Customer,

We regret to inform you that your Amazon account has been temporarily suspended due to unusual activity. Our security system has detected multiple failed login attempts from unrecognized devices.

To verify your identity and restore your account access, please update your payment information by clicking the link below:

>> Restore Account Access Now <<

If you do not verify your account within 24 hours, your account will be permanently deactivated and all pending orders will be canceled.

Thank you for your immediate attention to this matter.

Amazon Customer Service Team

PERSONAL EMAIL:
Subject: Vacation Plans for Next Month

Hi Sarah,

How are you doing? I hope everything's going well with the new job! I've been thinking about our conversation last month about taking a short vacation together, and I wanted to follow up.

I checked some options for that beach town we talked about, and there are some great deals for the weekend of the 15th. I found a cute rental cottage about two blocks from the beach for $180/night, which seems reasonable if we split it. The weather should be perfect that time of year too.

Let me know if you're still interested and if those dates work for you. I could book it this week to secure the place before summer rates kick in.

Can't wait to catch up properly!

Talk soon,
Michael

TRANSACTIONAL EMAIL:
Subject: Order #78291 Confirmation - Your Purchase from TechGadgets

Dear Alex Rodriguez,

Thank you for your recent purchase from TechGadgets. We're processing your order and will ship it soon.

Order Details:
• Order Number: #78291
• Order Date: April 3, 2024
• Payment Method: Visa ending in 4872
• Shipping Method: Standard (3-5 business days)

Items Purchased:
1. Wireless Earbuds Pro - Black (1) - $129.99
2. Fast Charging Cable 6ft (2) - $19.99 each
3. Screen Protector Ultra (1) - $24.99

Subtotal: $194.96
Shipping: $5.99
Tax: $16.57
Total: $217.52

You will receive a shipping confirmation email with tracking information once your order ships. You can also check your order status anytime by logging into your account.

If you have any questions about your order, please contact our customer service team at [email protected] or call 1-800-555-1234.

Thank you for shopping with us!

The TechGadgets Team 
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "ultimate_mcp_server"
version = "0.1.0"
description = "The Ultimate Model Context Protocol (MCP) Server, providing unified access to a wide variety of useful and powerful tools."
readme = "README.md"
requires-python = ">=3.13"
license = {file = "LICENSE"}
authors = [
    {name = "Jeffrey Emanuel", email = "[email protected]"},
]
maintainers = [
    {name = "Jeffrey Emanuel", email = "[email protected]"},
]
keywords = ["ultimte", "mcp", "server", "agent", "ai", "claude", "gpt", "gemini", "deepseek"]
classifiers = [
    "Development Status :: 3 - Alpha",
    "Intended Audience :: Developers",
    "License :: OSI Approved :: MIT License",
    "Programming Language :: Python :: 3.13",
    "Topic :: Scientific/Engineering :: Artificial Intelligence",
]

dependencies = [
    # Core MCP and LLM providers
    "mcp>=0",
    "anthropic>=0",
    "openai>=0",
    "google-genai>=0",
    # Async utilities
    "httpx>=0",
    "aiofiles>=0",
    # Data processing
    "pydantic>=0",
    "tenacity>=0", # For retry logic
    # Caching and persistence
    "diskcache>=0", # Persistent disk cache
    "msgpack>=0", # Efficient serialization
    # Vector database for semantic caching
    "numpy>=0",
    "sentence-transformers>=0", # For embeddings
    "chromadb>=0", # Vector DB
    # Analytics and monitoring
    "prometheus-client>=0",
    "pandas>=0",
    "rich>=0", # Console output formatting
    # Templating for prompt management
    "jinja2>=0",
    # Multi-modal support
    "pillow>=0", # Image processing
    # Utilities
    "python-slugify>=0", # For URL-friendly strings
    "colorama>=0", # Terminal colors
    "tqdm>=0", # Progress bars
    "tiktoken>=0", # Token counting
    "python-decouple>=0", # .env management
    "pydantic-settings>=0",
    "jsonschema>=0",
    "matplotlib>=0",
    "marqo>=0", # Added for Marqo search tool
    "pytest-playwright>=0", # For web browser automation
    "sqlalchemy>=0", # For SQL database interactions
    "aiosqlite>=0", # Async SQLite database access
    "pyvis>=0", # Graph visualization
    "python-docx>=0", # MS Word DOCX support
    "opencv-python>=0", # For OCR tools
    "pytesseract>=0", # For OCR
    "pdf2image>=0", # For OCR
    "PyPDF2>=0", # PDF conversion
    "pdfplumber>=0", # For OCR
    "fitz>=0", # For OCR
    "pymupdf>=0", # For OCR
    "beautifulsoup4>=0", # Dealing with HTML
    "xmldiff>=0", # for redlines
    "lxml>=0", # XML parser
    "faster-whisper>=0", # Audio transcripts
    "html2text>=0",
    "readability-lxml>=0",
    "markdownify>=0",
    "trafilatura>=0",
    "markdown>=0",
    "jsonpatch>=0",
    "jsonpointer>=0",
    "pygments>=0",
    "typer>=0", # For CLI interface
    "docling>=0", # For document conversion
    "aiohttp>=0",
    "boto3>=0", # For AWS secrets management
    "hvac>=0", # For HashiVault pw management
    "pandera>=0", # Data validation
    "rapidfuzz>=0",
    "magika>=0",
    "tabula-py>=0",
    "brotli>=0",
    "pygments>=0",
    "fastapi>=0.115.9",
    "uvicorn>=0.34.2",
    "networkx>0",
    "scipy>0",
    "fastmcp>0",

]

[project.optional-dependencies]
advanced = [
  "torch",
  "torchvision",
  "torchaudio",
  "pytorch-triton",
  "transformers>=0",
  "accelerate>=0",
]

#excel_automation = [
#    "win32com", # Excel automation,
#    "win32com",
#]

# Development and testing
dev = [
    "pytest>=0",
    "pytest-asyncio>=0",
    "pytest-cov>=0",
    "isort>=0",
    "mypy>=0",
    "ruff>=0",
    "types-aiofiles>=0",
    "pre-commit>=0",
]

# Documentation
docs = [
    "mkdocs>=0",
    "mkdocs-material>=0",
    "mkdocstrings>=0",
    "mkdocstrings-python>=0",
]

# All extras
all = ["ultimate_mcp_server[advanced,dev,docs]"]

[[tool.uv.index]]
name = "pypi"
url  = "https://pypi.org/simple"

[tool.uv.pip]
prerelease = "allow"
torch-backend = "auto"

[project.urls]
Homepage = "https://github.com/Dicklesworthstone/ultimate_mcp_server"
Documentation = "https://github.com/Dicklesworthstone/ultimate_mcp_server/docs"
Repository = "https://github.com/Dicklesworthstone/ultimate_mcp_server.git"
"Bug Reports" = "https://github.com/Dicklesworthstone/ultimate_mcp_server/issues"

[project.scripts]
umcp = "ultimate_mcp_server.cli.typer_cli:cli"

[tool.hatch.version]
path = "ultimate_mcp_server/__init__.py"

[tool.hatch.build.targets.sdist]
include = [
    "/ultimate_mcp_server",
    "/examples",
    "/tests",
    "LICENSE",
    "README.md",
    "pyproject.toml",
]

[tool.hatch.build.targets.wheel]
packages = ["ultimate_mcp_server"]

[tool.black]
line-length = 100
target-version = ["py313"]
include = '\.pyi?$'

[tool.isort]
profile = "black"
line_length = 100
multi_line_output = 3

[tool.mypy]
python_version = "3.13"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
check_untyped_defs = true
disallow_untyped_decorators = true
no_implicit_optional = true
strict_optional = true

[tool.pytest.ini_options]
minversion = "7.0"
addopts = "--cov=ultimate_mcp_server --cov-report=term-missing -v"
testpaths = ["tests"]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"

[tool.ruff]
line-length = 100
target-version = "py313"

[tool.ruff.lint]
select = ["E", "F", "B", "I", "Q"]
ignore = ["E203", "E501", "Q000"]

```
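
A minimal sketch (not part of the repository) of how the metadata above can be inspected with Python's standard-library `tomllib`; it assumes the script runs from the repository root where `pyproject.toml` lives.

```python
# Sketch: read the dependency and extras tables defined in pyproject.toml above.
import tomllib

with open("pyproject.toml", "rb") as f:  # tomllib requires binary mode
    pyproject = tomllib.load(f)

project = pyproject["project"]
print(project["name"], project["version"])        # ultimate_mcp_server 0.1.0
print(len(project["dependencies"]), "core dependencies")
print(sorted(project["optional-dependencies"]))   # ['advanced', 'all', 'dev', 'docs']
```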

--------------------------------------------------------------------------------
/ultimate_mcp_server/services/cache/persistence.py:
--------------------------------------------------------------------------------

```python
"""Cache persistence mechanisms."""
import json
import os
import pickle
from pathlib import Path
from typing import Any, Dict, Optional

import aiofiles

from ultimate_mcp_server.utils import get_logger

logger = get_logger(__name__)


class CachePersistence:
    """Handles cache persistence operations."""
    
    def __init__(self, cache_dir: Path):
        """Initialize the cache persistence handler.
        
        Args:
            cache_dir: Directory for cache storage
        """
        self.cache_dir = cache_dir
        self.cache_file = cache_dir / "cache.pkl"
        self.metadata_file = cache_dir / "metadata.json"
        
        # Create cache directory if it doesn't exist
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        
    async def save_cache(self, data: Dict[str, Any], metadata: Optional[Dict[str, Any]] = None) -> bool:
        """Save cache data to disk.
        
        Args:
            data: Cache data to save
            metadata: Optional metadata about the cache
            
        Returns:
            True if successful
        """
        try:
            # Save cache data
            temp_file = f"{self.cache_file}.tmp"
            async with aiofiles.open(temp_file, 'wb') as f:
                await f.write(pickle.dumps(data))
                
            # Rename temp file to cache file (atomic operation)
            os.replace(temp_file, self.cache_file)
            
            # Save metadata if provided
            if metadata:
                await self.save_metadata(metadata)
                
            logger.debug(
                f"Saved cache data to {self.cache_file}",
                emoji_key="cache"
            )
            return True
            
        except Exception as e:
            logger.error(
                f"Failed to save cache data: {str(e)}",
                emoji_key="error"
            )
            return False
            
    async def load_cache(self) -> Optional[Dict[str, Any]]:
        """Load cache data from disk.
        
        Returns:
            Cache data or None if file doesn't exist or error occurs
        """
        if not self.cache_file.exists():
            return None
            
        try:
            async with aiofiles.open(self.cache_file, 'rb') as f:
                data = await f.read()
                
            cache_data = pickle.loads(data)
            
            logger.debug(
                f"Loaded cache data from {self.cache_file}",
                emoji_key="cache"
            )
            return cache_data
            
        except Exception as e:
            logger.error(
                f"Failed to load cache data: {str(e)}",
                emoji_key="error"
            )
            return None
    
    async def save_metadata(self, metadata: Dict[str, Any]) -> bool:
        """Save cache metadata to disk.
        
        Args:
            metadata: Metadata to save
            
        Returns:
            True if successful
        """
        try:
            # Save metadata
            temp_file = f"{self.metadata_file}.tmp"
            async with aiofiles.open(temp_file, 'w') as f:
                await f.write(json.dumps(metadata, indent=2))
                
            # Rename temp file to metadata file (atomic operation)
            os.replace(temp_file, self.metadata_file)
            
            return True
            
        except Exception as e:
            logger.error(
                f"Failed to save cache metadata: {str(e)}",
                emoji_key="error"
            )
            return False
            
    async def load_metadata(self) -> Optional[Dict[str, Any]]:
        """Load cache metadata from disk.
        
        Returns:
            Metadata or None if file doesn't exist or error occurs
        """
        if not self.metadata_file.exists():
            return None
            
        try:
            async with aiofiles.open(self.metadata_file, 'r') as f:
                data = await f.read()
                
            metadata = json.loads(data)
            
            return metadata
            
        except Exception as e:
            logger.error(
                f"Failed to load cache metadata: {str(e)}",
                emoji_key="error"
            )
            return None
            
    async def cleanup_old_cache_files(self, max_age_days: int = 30) -> int:
        """Clean up old cache files.
        
        Args:
            max_age_days: Maximum age of cache files in days
            
        Returns:
            Number of files deleted
        """
        import time
        
        now = time.time()
        max_age_seconds = max_age_days * 24 * 60 * 60
        
        deleted_count = 0
        
        try:
            # Find leftover temporary cache files
            cache_files = list(self.cache_dir.glob("*.tmp"))
            
            # Delete old files
            for file_path in cache_files:
                mtime = file_path.stat().st_mtime
                age = now - mtime
                
                if age > max_age_seconds:
                    file_path.unlink()
                    deleted_count += 1
                    
            if deleted_count > 0:
                logger.info(
                    f"Cleaned up {deleted_count} old cache files",
                    emoji_key="cache"
                )
                
            return deleted_count
            
        except Exception as e:
            logger.error(
                f"Failed to clean up old cache files: {str(e)}",
                emoji_key="error"
            )
            return deleted_count
```
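
A minimal usage sketch for `CachePersistence`, assuming an asyncio entry point and a writable scratch directory; it round-trips a small dict and metadata through the methods above.

```python
# Sketch (not part of the module): round-trip data through CachePersistence.
import asyncio
from pathlib import Path

from ultimate_mcp_server.services.cache.persistence import CachePersistence


async def main():
    persistence = CachePersistence(Path("/tmp/umcp_cache_demo"))  # scratch dir (assumption)

    # Save cache entries plus optional metadata
    await persistence.save_cache({"greeting:en": "Hello, world"}, metadata={"entries": 1})

    # Load them back
    print(await persistence.load_cache())
    print(await persistence.load_metadata())

    # Remove leftover *.tmp files older than 7 days
    await persistence.cleanup_old_cache_files(max_age_days=7)


asyncio.run(main())
```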

--------------------------------------------------------------------------------
/check_api_keys.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
"""Script to check API key configurations for Ultimate MCP Server using rich formatting."""
import asyncio
import sys
from pathlib import Path

# Add project root to path for imports
sys.path.insert(0, str(Path(__file__).parent))

from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.text import Text

from ultimate_mcp_server.config import get_config
from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.server import Gateway
from ultimate_mcp_server.utils import get_logger

# Initialize rich console
console = Console()

logger = get_logger("api_key_checker")

# Map provider names to the corresponding environment variable names
# Used for informational display only
PROVIDER_ENV_VAR_MAP = {
    "openai": "OPENAI_API_KEY",
    "anthropic": "ANTHROPIC_API_KEY",
    "deepseek": "DEEPSEEK_API_KEY",
    "gemini": "GEMINI_API_KEY",
    "openrouter": "OPENROUTER_API_KEY",
}

async def check_api_keys():
    """
    Check API key configurations and display a comprehensive report.
    
    This async function:
    1. Loads the current configuration settings from all sources (environment variables,
       .env file, configuration files)
    2. Initializes a minimal Gateway instance to access provider configurations
    3. Checks if API keys are properly configured for all supported providers
    4. Displays formatted results using rich tables and panels, including:
       - Provider-by-provider API key status
       - Configuration loading priority information
       - How to set API keys properly
       - Example .env file content
    
    The function checks keys for all providers defined in the Provider enum,
    including OpenAI, Anthropic, DeepSeek, Gemini, and OpenRouter.
    
    Returns:
        int: Exit code (0 for success)
    """
    # Force load config to ensure we get the latest resolved settings
    cfg = get_config()
    
    # Create Gateway with minimal initialization (no tools) - kept for potential future checks
    gateway = Gateway(name="api-key-checker", register_tools=False)  # noqa: F841
    
    console.print(Panel(
        "Checking API Key Configuration based on loaded settings",
        title="[bold cyan]Ultimate MCP Server API Key Check[/bold cyan]",
        expand=False,
        border_style="blue"
    ))
    
    # Create table for results
    table = Table(title="Provider API Key Status", show_header=True, header_style="bold magenta")
    table.add_column("Provider", style="dim", width=12)
    table.add_column("API Key Status", style="cyan")
    table.add_column("Relevant Env Var", style="yellow")
    table.add_column("Status", style="bold")
    
    # Check each provider based on the loaded configuration
    for provider_name in [p.value for p in Provider]:
        # Get provider config from the loaded GatewayConfig object
        provider_config = getattr(cfg.providers, provider_name, None)
        
        # Check if key exists in the loaded config
        # This key would have been resolved from .env, env vars, or config file by get_config()
        config_key = provider_config.api_key if provider_config else None
        
        # Format key for display (if present)
        key_display = Text("Not set in config", style="dim yellow")
        status_text = Text("NOT CONFIGURED", style="red")
        status_icon = "❌"
        
        if config_key:
            if len(config_key) > 8:
                key_display = Text(f"{config_key[:4]}...{config_key[-4:]}", style="green")
            else:
                key_display = Text("[INVALID KEY FORMAT]", style="bold red")
            status_text = Text("CONFIGURED", style="green")
            status_icon = "✅"
        
        # Get the corresponding environment variable name for informational purposes
        env_var_name = PROVIDER_ENV_VAR_MAP.get(provider_name, "N/A")
        
        # Add row to table
        table.add_row(
            provider_name.capitalize(),
            key_display,
            env_var_name,
            f"[{status_text.style}]{status_icon} {status_text}[/]"
        )
    
    # Print the table
    console.print(table)
    
    # Configuration Loading Info Panel
    config_info = Text.assemble(
        ("1. ", "bold blue"), ("Environment Variables", "cyan"), (" (e.g., ", "dim"), ("GATEWAY_PROVIDERS__OPENAI__API_KEY=...", "yellow"), (")\n", "dim"),
        ("2. ", "bold blue"), ("Values in a ", "cyan"), (".env", "yellow"), (" file in the project root\n", "cyan"),
        ("3. ", "bold blue"), ("Values in a config file", "cyan"), (" (e.g., ", "dim"), ("gateway_config.yaml", "yellow"), (")\n", "dim"),
        ("4. ", "bold blue"), ("Default values defined in the configuration models", "cyan")
    )
    console.print(Panel(config_info, title="[bold]Configuration Loading Priority[/]", border_style="blue"))
    
    # How to Set Keys Panel
    set_keys_info = Text.assemble(
        ("Ensure API keys are available via one of the methods above,\n", "white"),
        ("preferably using ", "white"), ("environment variables", "cyan"), (" or a ", "white"), (".env", "yellow"), (" file.", "white")
    )
    console.print(Panel(set_keys_info, title="[bold]How to Set API Keys[/]", border_style="green"))
    
    # Example .env Panel
    env_example_lines = []
    for env_var in PROVIDER_ENV_VAR_MAP.values():
        env_example_lines.append(Text.assemble((env_var, "yellow"), "=", ("your_", "dim"), (env_var.lower(), "dim cyan"), ("_here", "dim")))
    env_example_content = Text("\n").join(env_example_lines)
    console.print(Panel(env_example_content, title="[bold dim]Example .env file content[/]", border_style="yellow"))
    
    console.print("[bold green]Run your example scripts or the main server after setting the API keys.[/bold green]")
    return 0

if __name__ == "__main__":
    exit_code = asyncio.run(check_api_keys())
    sys.exit(exit_code) 
```
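
For a quick non-interactive variant of the same check, a sketch that reuses `get_config()` to see whether a single provider's key resolved; the provider name and environment variable names follow the table logic above.

```python
# Sketch: check one provider's key via the resolved configuration (no rich output).
from ultimate_mcp_server.config import get_config

cfg = get_config()
openai_cfg = getattr(cfg.providers, "openai", None)

if openai_cfg and openai_cfg.api_key:
    print("OpenAI key configured")
else:
    print("OpenAI key missing - set OPENAI_API_KEY or GATEWAY_PROVIDERS__OPENAI__API_KEY")
```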

--------------------------------------------------------------------------------
/ultimate_mcp_server/services/prompts/repository.py:
--------------------------------------------------------------------------------

```python
"""Prompt repository for managing and accessing prompts."""
import asyncio
import json
import os
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

import aiofiles

from ultimate_mcp_server.utils import get_logger

logger = get_logger(__name__)


class PromptRepository:
    """Repository for managing and accessing prompts."""
    
    _instance = None
    
    def __new__(cls, *args, **kwargs):
        """Singleton implementation for prompt repository."""
        if cls._instance is None:
            cls._instance = super(PromptRepository, cls).__new__(cls)
            cls._instance._initialized = False
        return cls._instance
    
    def __init__(self, base_dir: Optional[Union[str, Path]] = None):
        """Initialize the prompt repository.
        
        Args:
            base_dir: Base directory for prompt storage
        """
        # Only initialize once for singleton
        if self._initialized:
            return
            
        # Set base directory
        if base_dir:
            self.base_dir = Path(base_dir)
        else:
            # Default to project directory / prompts
            self.base_dir = Path.home() / ".ultimate" / "prompts"
            
        # Create directory if it doesn't exist
        self.base_dir.mkdir(parents=True, exist_ok=True)
        
        # Cache for prompts
        self.prompts = {}

        # Lock to serialize concurrent file access from async callers
        self._lock = asyncio.Lock()
        
        # Flag as initialized
        self._initialized = True
        
        logger.info(
            f"Prompt repository initialized (base_dir: {self.base_dir})",
            emoji_key="provider"
        )
    
    async def get_prompt(self, prompt_id: str) -> Optional[Dict[str, Any]]:
        """Get a prompt by ID.
        
        Args:
            prompt_id: Prompt identifier
            
        Returns:
            Prompt data or None if not found
        """
        # Check cache first
        if prompt_id in self.prompts:
            return self.prompts[prompt_id]
            
        # Try to load from file
        prompt_path = self.base_dir / f"{prompt_id}.json"
        if not prompt_path.exists():
            logger.warning(
                f"Prompt '{prompt_id}' not found",
                emoji_key="warning"
            )
            return None
            
        try:
            # Load prompt from file
            async with self._lock:
                with open(prompt_path, "r", encoding="utf-8") as f:
                    prompt_data = json.load(f)
                    
            # Cache for future use
            self.prompts[prompt_id] = prompt_data
            
            return prompt_data
        except Exception as e:
            logger.error(
                f"Error loading prompt '{prompt_id}': {str(e)}",
                emoji_key="error"
            )
            return None
    
    async def save_prompt(self, prompt_id: str, prompt_data: Dict[str, Any]) -> bool:
        """Save a prompt.
        
        Args:
            prompt_id: Prompt identifier
            prompt_data: Prompt data to save
            
        Returns:
            True if successful
        """
        # Validate prompt data
        if not isinstance(prompt_data, dict) or "template" not in prompt_data:
            logger.error(
                f"Invalid prompt data for '{prompt_id}'",
                emoji_key="error"
            )
            return False
            
        try:
            # Save to cache
            self.prompts[prompt_id] = prompt_data
            
            # Save to file
            prompt_path = self.base_dir / f"{prompt_id}.json"
            async with self._lock:
                async with aiofiles.open(prompt_path, "w", encoding="utf-8") as f:
                    await f.write(json.dumps(prompt_data, indent=2))
                    
            logger.info(
                f"Saved prompt '{prompt_id}'",
                emoji_key="success"
            )
            return True
        except Exception as e:
            logger.error(
                f"Error saving prompt '{prompt_id}': {str(e)}",
                emoji_key="error"
            )
            return False
    
    async def delete_prompt(self, prompt_id: str) -> bool:
        """Delete a prompt.
        
        Args:
            prompt_id: Prompt identifier
            
        Returns:
            True if successful
        """
        # Remove from cache
        if prompt_id in self.prompts:
            del self.prompts[prompt_id]
            
        # Remove file if exists
        prompt_path = self.base_dir / f"{prompt_id}.json"
        if prompt_path.exists():
            try:
                os.remove(prompt_path)
                logger.info(
                    f"Deleted prompt '{prompt_id}'",
                    emoji_key="success"
                )
                return True
            except Exception as e:
                logger.error(
                    f"Error deleting prompt '{prompt_id}': {str(e)}",
                    emoji_key="error"
                )
                return False
        
        return False
    
    async def list_prompts(self) -> List[str]:
        """List available prompts.
        
        Returns:
            List of prompt IDs
        """
        try:
            # Get prompt files
            prompt_files = list(self.base_dir.glob("*.json"))
            
            # Extract IDs from filenames
            prompt_ids = [f.stem for f in prompt_files]
            
            return prompt_ids
        except Exception as e:
            logger.error(
                f"Error listing prompts: {str(e)}",
                emoji_key="error"
            )
            return []


def get_prompt_repository(base_dir: Optional[Union[str, Path]] = None) -> PromptRepository:
    """Get the prompt repository singleton instance.
    
    Args:
        base_dir: Base directory for prompt storage
        
    Returns:
        PromptRepository singleton instance
    """
    return PromptRepository(base_dir)
```
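
A minimal sketch of the repository singleton in use, assuming an asyncio entry point; the prompt payload is just a dict with a `template` key, matching the validation in `save_prompt` above.

```python
# Sketch: store, list, fetch, and delete a prompt through the repository singleton.
import asyncio

from ultimate_mcp_server.services.prompts.repository import get_prompt_repository


async def main():
    repo = get_prompt_repository()  # defaults to ~/.ultimate/prompts

    await repo.save_prompt(
        "summarize_v1",
        {"template": "Summarize the following text:\n\n{{text}}"},
    )

    print(await repo.list_prompts())                 # includes 'summarize_v1'
    prompt = await repo.get_prompt("summarize_v1")
    print(prompt["template"])

    await repo.delete_prompt("summarize_v1")


asyncio.run(main())
```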

--------------------------------------------------------------------------------
/examples/simple_completion_demo.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
"""
Simple completion demo using Ultimate MCP Server's direct provider functionality.

This example demonstrates how to:
1. Initialize the Ultimate MCP Server Gateway
2. Connect directly to an LLM provider (OpenAI)
3. Generate a text completion with a specific model
4. Track and display token usage and costs

The demo bypasses the MCP tool interface and interacts directly with provider APIs,
which is useful for understanding the underlying provider connections or when you need
lower-level access to provider-specific features. It also showcases the CostTracker
utility for monitoring token usage and associated costs across multiple requests.

This script can be run as a standalone Python module and serves as a minimal example of
direct provider integration with the Ultimate MCP Server framework.

Usage:
    python examples/simple_completion_demo.py
"""
import asyncio
import sys
from pathlib import Path

# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))

from rich.panel import Panel
from rich.rule import Rule
from rich.table import Table

from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.server import Gateway
from ultimate_mcp_server.utils import get_logger
from ultimate_mcp_server.utils.display import CostTracker
from ultimate_mcp_server.utils.logging.console import console

# Initialize logger
logger = get_logger("example.simple_completion")

async def run_model_demo(tracker: CostTracker):
    """
    Run a simple completion demo using direct provider access to LLM APIs.
    
    This function demonstrates the complete workflow for generating text completions
    using the Ultimate MCP Server framework with direct provider access:
    
    1. Initialize a Gateway instance without registering tools
    2. Initialize the LLM providers from configuration
    3. Access a specific provider (OpenAI in this case)
    4. Generate a completion with a specific prompt and model
    5. Display the completion result with Rich formatting
    6. Track and display token usage and cost metrics
    
    Direct provider access (vs. using MCP tools) offers more control over provider-specific
    parameters and is useful for applications that need to customize provider interactions
    beyond what the standard MCP tools offer.
    
    Args:
        tracker: CostTracker instance to record token usage and costs for this operation.
                The tracker will be updated with the completion results.
        
    Returns:
        int: Exit code - 0 for success, 1 for failure
        
    Raises:
        Various exceptions may be raised by the provider initialization or completion
        generation process, but these are logged and contained within this function.
    """
    logger.info("Starting simple completion demo", emoji_key="start")
    # Use Rich Rule for title
    console.print(Rule("[bold blue]Simple Completion Demo[/bold blue]"))
    
    # Create Gateway instance
    gateway = Gateway("simple-demo", register_tools=False)
    
    # Initialize providers
    logger.info("Initializing providers", emoji_key="provider")
    await gateway._initialize_providers()
    
    # Get provider (OpenAI)
    provider_name = Provider.OPENAI.value
    provider = gateway.providers.get(provider_name)
    
    if not provider:
        logger.error(f"Provider {provider_name} not available", emoji_key="error")
        return 1
        
    logger.success(f"Provider {provider_name} initialized", emoji_key="success")
    
    # List available models
    models = await provider.list_models()
    logger.info(f"Available models: {len(models)}", emoji_key="model")
    
    # Pick a valid model from the provider
    model = "gpt-4.1-mini"  # A valid model from constants.py
    
    # Generate a completion
    prompt = "Explain quantum computing in simple terms."
    
    logger.info(f"Generating completion with {model}", emoji_key="processing")
    result = await provider.generate_completion(
        prompt=prompt,
        model=model,
        temperature=0.7,
        max_tokens=150
    )
    
    # Print the result using Rich Panel
    logger.success("Completion generated successfully!", emoji_key="success")
    console.print(Panel(
        result.text.strip(),
        title=f"Quantum Computing Explanation ({model})",
        subtitle=f"Prompt: {prompt}",
        border_style="green",
        expand=False
    ))
    
    # Print stats using Rich Table
    stats_table = Table(title="Completion Stats", show_header=False, box=None)
    stats_table.add_column("Metric", style="cyan")
    stats_table.add_column("Value", style="white")
    stats_table.add_row("Input Tokens", str(result.input_tokens))
    stats_table.add_row("Output Tokens", str(result.output_tokens))
    stats_table.add_row("Cost", f"${result.cost:.6f}")
    stats_table.add_row("Processing Time", f"{result.processing_time:.2f}s")
    console.print(stats_table)

    # Track the call
    tracker.add_call(result)

    # Display cost summary
    tracker.display_summary(console)

    return 0

async def main():
    """
    Entry point function that sets up the demo environment and error handling.
    
    This function:
    1. Creates a CostTracker instance to monitor token usage and costs
    2. Calls the run_model_demo function within a try-except block
    3. Handles and logs any uncaught exceptions
    4. Returns an appropriate exit code based on execution success/failure
    
    The separation between main() and run_model_demo() allows for clean error handling
    and resource management at the top level while keeping the demo logic organized
    in its own function.
    
    Returns:
        int: Exit code - 0 for success, 1 for failure
    """
    tracker = CostTracker()
    try:
        return await run_model_demo(tracker)
    except Exception as e:
        logger.critical(f"Demo failed: {str(e)}", emoji_key="critical")
        return 1

if __name__ == "__main__":
    # Run the demo
    exit_code = asyncio.run(main())
    sys.exit(exit_code) 
```

--------------------------------------------------------------------------------
/examples/sample/medical_case.txt:
--------------------------------------------------------------------------------

```
PATIENT MEDICAL RECORD
Memorial Hospital Medical Center
123 Medical Center Blvd, Boston, MA 02118

CONFIDENTIAL - FOR MEDICAL PERSONNEL ONLY

Patient ID: MH-459872
Date of Admission: April 10, 2025
Attending Physician: Dr. Elizabeth Chen, MD (Cardiology)
Consulting Physicians: Dr. Robert Martinez, MD (Neurology), Dr. Sarah Williams, MD (Endocrinology)

PATIENT INFORMATION
Name: John Anderson
DOB: 05/22/1968 (57 years old)
Gender: Male
Address: 45 Maple Street, Apt 3B, Cambridge, MA 02139
Contact: (617) 555-3829
Emergency Contact: Mary Anderson (Wife) - (617) 555-4912
Insurance: BlueCross BlueShield, Policy #BCB-88765432

CHIEF COMPLAINT
Patient presented to the Emergency Department with acute chest pain, shortness of breath, and left arm numbness beginning approximately 2 hours prior to arrival.

CURRENT MEDICATIONS
1. Metformin 1000mg twice daily (for Type 2 Diabetes, prescribed by Dr. Williams in 2019)
2. Atorvastatin 40mg daily (for Hypercholesterolemia, prescribed by Dr. Chen in 2022)
3. Lisinopril 20mg daily (for Hypertension, prescribed by Dr. Chen in 2022)
4. Aspirin 81mg daily (for cardiovascular health, prescribed by Dr. Chen in 2022)
5. Sertraline 50mg daily (for Depression, prescribed by Dr. Thomas Gordon in 2023)

ALLERGIES
1. Penicillin (Severe - Hives, Difficulty Breathing)
2. Shellfish (Moderate - Gastrointestinal distress)

PAST MEDICAL HISTORY
1. Type 2 Diabetes Mellitus (Diagnosed 2019 by Dr. Williams)
2. Hypertension (Diagnosed 2020 by Dr. Chen)
3. Hypercholesterolemia (Diagnosed 2020 by Dr. Chen)
4. Depression (Diagnosed 2023 by Dr. Gordon)
5. Left knee arthroscopy (2015, Boston Orthopedic Center, Dr. James Miller)

FAMILY HISTORY
- Father: Deceased at age 68 from myocardial infarction, had hypertension, type 2 diabetes
- Mother: Living, age 82, has hypertension, osteoarthritis
- Brother: Age 60, has type 2 diabetes, hypercholesterolemia
- Sister: Age 55, no known medical conditions

SOCIAL HISTORY
Occupation: High school mathematics teacher
Tobacco: Former smoker, quit in 2018 (25 pack-year history)
Alcohol: Occasional (1-2 drinks per week)
Exercise: Walks 20 minutes, 3 times per week
Diet: Reports following a "mostly" diabetic diet with occasional non-compliance

PHYSICAL EXAMINATION
Vital Signs:
- BP: 165/95 mmHg
- HR: 95 bpm
- RR: 22 breaths/min
- Temp: 98.6°F (37°C)
- O2 Saturation: 94% on room air

General: Patient is alert but anxious, in moderate distress
Cardiovascular: Irregular rhythm, tachycardia, S3 gallop present, no murmurs
Respiratory: Bibasilar crackles, decreased breath sounds at bases bilaterally
Neurological: Left arm weakness (4/5 strength), otherwise grossly intact
Extremities: No edema, normal peripheral pulses

DIAGNOSTIC STUDIES
Laboratory:
- Troponin I: 2.3 ng/mL (elevated)
- CK-MB: 12.5 ng/mL (elevated)
- BNP: 450 pg/mL (elevated)
- Complete Blood Count: WBC 12,000/μL, Hgb 13.5 g/dL, Plt 230,000/μL
- Complete Metabolic Panel: Glucose 185 mg/dL, Cr 1.1 mg/dL, BUN 22 mg/dL
- Lipid Panel: Total Chol 210 mg/dL, LDL 130 mg/dL, HDL 35 mg/dL, TG 190 mg/dL
- HbA1c: 7.8%

Imaging and Other Studies:
- ECG: ST-segment elevation in leads II, III, aVF; reciprocal changes in I, aVL
- Chest X-ray: Mild pulmonary edema, cardiomegaly
- Echocardiogram: EF 40%, inferior wall hypokinesis, moderate mitral regurgitation
- Cardiac Catheterization: 90% occlusion of right coronary artery, 70% occlusion of left circumflex artery

DIAGNOSIS
1. Acute ST-elevation Myocardial Infarction (STEMI), inferior wall
2. Coronary Artery Disease, multivessel
3. Congestive Heart Failure, acute onset (NYHA Class III)
4. Type 2 Diabetes Mellitus, inadequately controlled
5. Essential Hypertension, inadequately controlled
6. Hyperlipidemia

TREATMENT
Procedures:
1. Emergency Percutaneous Coronary Intervention (PCI) with drug-eluting stent placement in right coronary artery by Dr. Michael Wilson on April 10, 2025
2. Scheduled PCI for left circumflex artery by Dr. Wilson on April 13, 2025

Medications:
1. Aspirin 325mg daily
2. Clopidogrel 75mg daily
3. Metoprolol succinate 50mg daily
4. Lisinopril 40mg daily (increased from 20mg)
5. Atorvastatin 80mg daily (increased from 40mg)
6. Furosemide 40mg twice daily
7. Metformin continued at 1000mg twice daily
8. Insulin glargine 20 units at bedtime (new)
9. Sertraline continued at 50mg daily

HOSPITAL COURSE
Patient was admitted through the Emergency Department and taken emergently to the cardiac catheterization lab where he underwent successful PCI with stent placement to the right coronary artery. Post-procedure, the patient was transferred to the Cardiac Care Unit (CCU) for close monitoring. Patient experienced a brief episode of ventricular fibrillation on the first night, which was successfully treated with defibrillation. Cardiology and endocrinology were consulted for management of heart failure and diabetes. Follow-up echocardiogram on April 12 showed improvement in EF to 45%. Patient underwent scheduled PCI of the left circumflex artery on April 13 without complications.

DISCHARGE PLAN
Discharge Date: April 16, 2025
Discharge Disposition: Home with scheduled home health visits from Memorial Home Health Services

Follow-up Appointments:
1. Dr. Elizabeth Chen (Cardiology) - April 23, 2025 at 10:00 AM
2. Dr. Sarah Williams (Endocrinology) - April 25, 2025 at 2:30 PM
3. Cardiac Rehabilitation evaluation - April 30, 2025 at 1:00 PM

Patient Education:
1. STEMI and coronary artery disease management
2. Diabetes self-management and glucometer use
3. Heart-healthy diet (consultation with nutritionist completed)
4. Medication management and adherence
5. Warning signs requiring immediate medical attention

PROGNOSIS
Guarded. Patient has significant coronary artery disease with reduced ejection fraction. Long-term prognosis will depend on medication adherence, lifestyle modifications, and management of comorbidities.

ATTESTATION
I have personally examined the patient and reviewed all diagnostic studies. This documentation is complete and accurate to the best of my knowledge.

Electronically signed by:
Elizabeth Chen, MD
Cardiology
Memorial Hospital Medical Center
Date: April 16, 2025 | Time: 14:35 
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/graceful_shutdown.py:
--------------------------------------------------------------------------------

```python
"""
Graceful shutdown utilities for Ultimate MCP Server.

This module provides utilities to handle signals and gracefully terminate
the application with ZERO error outputs during shutdown using OS-level redirection.
"""

import asyncio
import logging
import os
import signal
import sys
import warnings
from contextlib import suppress
from typing import Callable, List, Optional

logger = logging.getLogger("ultimate_mcp_server.shutdown")

# Track registered shutdown handlers and state
_shutdown_handlers: List[Callable] = []
_shutdown_in_progress = False
_original_stderr_fd = None
_devnull_fd = None


def _redirect_stderr_to_devnull():
    """Redirect stderr to /dev/null at the OS level"""
    global _original_stderr_fd, _devnull_fd
    
    try:
        if _original_stderr_fd is None:
            # Save original stderr file descriptor
            _original_stderr_fd = os.dup(sys.stderr.fileno())
            
            # Open /dev/null
            _devnull_fd = os.open(os.devnull, os.O_WRONLY)
            
            # Redirect stderr to /dev/null
            os.dup2(_devnull_fd, sys.stderr.fileno())
            
    except Exception:
        # If redirection fails, just continue
        pass


def _restore_stderr():
    """Restore original stderr"""
    global _original_stderr_fd, _devnull_fd
    
    try:
        if _original_stderr_fd is not None:
            os.dup2(_original_stderr_fd, sys.stderr.fileno())
            os.close(_original_stderr_fd)
            _original_stderr_fd = None
            
        if _devnull_fd is not None:
            os.close(_devnull_fd)
            _devnull_fd = None
            
    except Exception:
        pass


def register_shutdown_handler(handler: Callable) -> None:
    """Register a function to be called during graceful shutdown."""
    if handler not in _shutdown_handlers:
        _shutdown_handlers.append(handler)


def remove_shutdown_handler(handler: Callable) -> None:
    """Remove a previously registered shutdown handler."""
    if handler in _shutdown_handlers:
        _shutdown_handlers.remove(handler)


async def _execute_shutdown_handlers():
    """Execute all registered shutdown handlers with complete error suppression"""
    for handler in _shutdown_handlers:
        with suppress(Exception):  # Suppress ALL exceptions
            if asyncio.iscoroutinefunction(handler):
                with suppress(asyncio.TimeoutError, asyncio.CancelledError):
                    await asyncio.wait_for(handler(), timeout=3.0)
            else:
                handler()


def _handle_shutdown_signal(signum, frame):
    """Handle shutdown signals - IMMEDIATE TERMINATION"""
    global _shutdown_in_progress
    
    if _shutdown_in_progress:
        # Force immediate exit on second signal
        os._exit(1)
        return
        
    _shutdown_in_progress = True
    
    # Print final message to original stderr if possible
    try:
        if _original_stderr_fd:
            os.write(_original_stderr_fd, b"\n[Graceful Shutdown] Signal received. Exiting...\n")
        else:
            print("\n[Graceful Shutdown] Signal received. Exiting...", file=sys.__stderr__)
    except Exception:
        pass
    
    # Immediately redirect stderr to suppress any error output
    _redirect_stderr_to_devnull()
    
    # Suppress all warnings
    warnings.filterwarnings("ignore")
    
    # Try to run shutdown handlers quickly, but don't wait long
    try:
        loop = asyncio.get_running_loop()
        # Create a task but don't wait for it - just exit
        asyncio.create_task(_execute_shutdown_handlers())
        # Give it a tiny bit of time then exit
        loop.call_later(0.5, lambda: os._exit(0))
    except RuntimeError:
        # No running loop - just exit immediately
        os._exit(0)


def setup_signal_handlers(loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
    """Set up signal handlers for immediate shutdown"""
    # Use traditional signal handlers for immediate termination
    signal.signal(signal.SIGINT, _handle_shutdown_signal)
    signal.signal(signal.SIGTERM, _handle_shutdown_signal)
    
    # Also try to set up async handlers if we have a loop
    if loop is not None:
        try:
            for sig in [signal.SIGINT, signal.SIGTERM]:
                try:
                    loop.add_signal_handler(sig, lambda s=sig: _handle_shutdown_signal(s, None))
                except (NotImplementedError, OSError):
                    # Platform doesn't support async signal handlers
                    pass
        except Exception:
            # Fallback is already set up with signal.signal above
            pass


def enable_quiet_shutdown():
    """Enable comprehensive quiet shutdown - immediate termination approach"""
    # Set up signal handlers immediately
    setup_signal_handlers()
    
    # Suppress asyncio debug mode
    try:
        asyncio.get_event_loop().set_debug(False)
    except RuntimeError:
        pass
    
    # Suppress warnings
    warnings.filterwarnings("ignore")


def force_silent_exit():
    """Force immediate silent exit with no output whatsoever"""
    global _shutdown_in_progress
    _shutdown_in_progress = True
    _redirect_stderr_to_devnull()
    os._exit(0)


class QuietUvicornServer:
    """Custom Uvicorn server that overrides signal handling for quiet shutdown"""
    
    def __init__(self, config):
        import uvicorn
        self.config = config
        self.server = uvicorn.Server(config)
        
    def install_signal_handlers(self):
        """Override uvicorn's signal handlers with our quiet ones"""
        # Set up our own signal handlers instead of uvicorn's
        setup_signal_handlers()
        
    def run(self):
        """Run the server with custom signal handling"""
        # Patch the server's install_signal_handlers method
        self.server.install_signal_handlers = self.install_signal_handlers
        
        # Set up our signal handlers immediately
        setup_signal_handlers()
        
        # Run the server
        self.server.run()


def create_quiet_server(config):
    """Create a uvicorn server with quiet shutdown handling"""
    return QuietUvicornServer(config) 
```
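
A minimal sketch wiring the helpers above into an application entry point; the cleanup coroutine and the idle loop are illustrative stand-ins, not part of the module.

```python
# Sketch: enable quiet shutdown and register a cleanup hook before starting work.
import asyncio

from ultimate_mcp_server.graceful_shutdown import (
    enable_quiet_shutdown,
    register_shutdown_handler,
)


async def close_connections():
    # Hypothetical cleanup; keep it fast - the process force-exits ~0.5s after a signal.
    await asyncio.sleep(0.1)


async def main():
    register_shutdown_handler(close_connections)
    while True:  # stand-in for the real server loop
        await asyncio.sleep(1)


if __name__ == "__main__":
    enable_quiet_shutdown()  # installs SIGINT/SIGTERM handlers and silences warnings
    asyncio.run(main())
```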

--------------------------------------------------------------------------------
/model_preferences.py:
--------------------------------------------------------------------------------

```python
"""
Model preferences for MCP servers.

This module implements the ModelPreferences capability from the MCP protocol,
allowing servers to express preferences for model selection during sampling.
"""
from typing import List, Optional


class ModelHint:
    """
    Hint for model selection.
    
    Model hints allow the server to suggest specific models or model families
    that would be appropriate for a given task.
    """
    
    def __init__(self, name: str):
        """
        Initialize a model hint.
        
        Args:
            name: A hint for a model name (e.g., 'claude-3-5-sonnet', 'sonnet', 'claude').
                 This should be treated as a substring to match against model names.
        """
        self.name = name
        
    def to_dict(self) -> dict:
        """Convert model hint to dictionary."""
        return {"name": self.name}


class ModelPreferences:
    """
    Preferences for model selection to guide LLM client decisions.
    
    The ModelPreferences class provides a standardized way for servers to express 
    prioritization along three key dimensions (intelligence, speed, cost) that can 
    help clients make more informed decisions when selecting LLM models for specific tasks.
    
    These preferences serve as advisory hints that help optimize the tradeoffs between:
    - Intelligence/capability: Higher quality, more capable models (but often slower/costlier)
    - Speed: Faster response time and lower latency (but potentially less capable)
    - Cost: Lower token or API costs (but potentially less capable or slower)
    
    The class also supports model-specific hints that can recommend particular models
    or model families that are well-suited for specific tasks (e.g., suggesting Claude
    models for creative writing or GPT-4V for image analysis).
    
    All preferences are expressed with normalized values between 0.0 (lowest priority) 
    and 1.0 (highest priority) to allow for consistent interpretation across different
    implementations.
    
    Note: These preferences are always advisory. Clients may use them as guidance but
    are not obligated to follow them, particularly if there are overriding user preferences
    or system constraints.
    
    Usage example:
        ```python
        # For a coding task requiring high intelligence but where cost is a major concern
        preferences = ModelPreferences(
            intelligence_priority=0.8,  # High priority on capability
            speed_priority=0.4,         # Moderate priority on speed
            cost_priority=0.7,          # High priority on cost
            hints=[ModelHint("gpt-4-turbo")]  # Specific model recommendation
        )
        ```
    """
    
    def __init__(
        self,
        intelligence_priority: float = 0.5,
        speed_priority: float = 0.5,
        cost_priority: float = 0.5,
        hints: Optional[List[ModelHint]] = None
    ):
        """
        Initialize model preferences.
        
        Args:
            intelligence_priority: How much to prioritize intelligence/capabilities (0.0-1.0).
                Higher values favor more capable, sophisticated models that may produce
                higher quality outputs, handle complex tasks, or follow instructions better.
                Default: 0.5 (balanced)
            speed_priority: How much to prioritize sampling speed/latency (0.0-1.0).
                Higher values favor faster models with lower latency, which is important
                for real-time applications, interactive experiences, or time-sensitive tasks.
                Default: 0.5 (balanced)
            cost_priority: How much to prioritize cost efficiency (0.0-1.0).
                Higher values favor more economical models with lower token or API costs,
                which is important for budget-constrained applications or high-volume usage.
                Default: 0.5 (balanced)
            hints: Optional model hints in preference order. These can suggest specific
                models or model families that would be appropriate for the task.
                The list should be ordered by preference (most preferred first).
        """
        # Clamp values between 0 and 1
        self.intelligence_priority = max(0.0, min(1.0, intelligence_priority))
        self.speed_priority = max(0.0, min(1.0, speed_priority))
        self.cost_priority = max(0.0, min(1.0, cost_priority))
        self.hints = hints or []
        
    def to_dict(self) -> dict:
        """Convert model preferences to dictionary."""
        return {
            "intelligencePriority": self.intelligence_priority,
            "speedPriority": self.speed_priority,
            "costPriority": self.cost_priority,
            "hints": [hint.to_dict() for hint in self.hints]
        }


# Pre-defined preference templates for common use cases

# Default balanced preference profile - no strong bias in any direction
# Use when there's no clear priority between intelligence, speed, and cost
# Good for general-purpose applications where trade-offs are acceptable
BALANCED_PREFERENCES = ModelPreferences(
    intelligence_priority=0.5,
    speed_priority=0.5,
    cost_priority=0.5
)

# Prioritizes high-quality, sophisticated model responses
# Use for complex reasoning, creative tasks, or critical applications
# where accuracy and capability matter more than speed or cost
INTELLIGENCE_FOCUSED = ModelPreferences(
    intelligence_priority=0.9,
    speed_priority=0.3,
    cost_priority=0.3,
    hints=[ModelHint("claude-3-5-opus")]
)

# Prioritizes response speed and low latency
# Use for real-time applications, interactive experiences, 
# chatbots, or any use case where user wait time is critical
SPEED_FOCUSED = ModelPreferences(
    intelligence_priority=0.3,
    speed_priority=0.9,
    cost_priority=0.5,
    hints=[ModelHint("claude-3-haiku"), ModelHint("gemini-flash")]
)

# Prioritizes cost efficiency and token economy
# Use for high-volume applications, background processing,
# or when operating under strict budget constraints
COST_FOCUSED = ModelPreferences(
    intelligence_priority=0.3,
    speed_priority=0.5,
    cost_priority=0.9,
    hints=[ModelHint("mistral"), ModelHint("gemini-flash")]
) 
```
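
A small sketch showing how a custom preference set serializes into the camelCase dict the MCP protocol expects; the priority values are illustrative, and the import assumes the script runs from the repository root where `model_preferences.py` lives.

```python
# Sketch: build preferences for a latency-sensitive task and serialize them.
from model_preferences import ModelHint, ModelPreferences

prefs = ModelPreferences(
    intelligence_priority=0.4,
    speed_priority=0.9,
    cost_priority=0.6,
    hints=[ModelHint("claude-3-haiku")],
)

print(prefs.to_dict())
# {'intelligencePriority': 0.4, 'speedPriority': 0.9, 'costPriority': 0.6,
#  'hints': [{'name': 'claude-3-haiku'}]}
```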

--------------------------------------------------------------------------------
/ultimate_mcp_server/utils/logging/themes.py:
--------------------------------------------------------------------------------

```python
"""
Color themes for Gateway logging system.

This module defines color schemes for different log levels, operations, and components
to provide visual consistency and improve readability of log output.
"""
from typing import Optional, Tuple

from rich.style import Style
from rich.theme import Theme

COLORS = {
    # Main colors
    "primary": "bright_blue",
    "secondary": "cyan",
    "accent": "magenta",
    "success": "green",
    "warning": "yellow",
    "error": "red",
    "critical": "bright_red",
    "info": "bright_blue",
    "debug": "bright_black",
    "trace": "bright_black",
    
    # Component-specific colors (Adapt as needed for ultimate)
    "core": "blue", 
    "provider": "cyan", # Example: Renamed 'composite' to 'provider'
    "router": "green", # Example: Renamed 'analysis' to 'router'
    "cache": "bright_magenta",
    "api": "bright_yellow",
    "mcp": "bright_blue", # Kept if relevant
    "utils": "magenta", # Example: Added 'utils'
    "default_component": "bright_cyan", # Fallback component color
    
    # Tool-specific colors (Keep or remove as needed)
    "ripgrep": "blue",
    "awk": "green",
    "jq": "yellow",
    "sqlite": "magenta",
    
    # Result/Status colors
    "high_confidence": "green",
    "medium_confidence": "yellow",
    "low_confidence": "red",
    
    # Misc
    "muted": "bright_black",
    "highlight": "bright_white",
    "timestamp": "bright_black",
    "path": "bright_blue",
    "code": "bright_cyan",
    "data": "bright_yellow",
    "data.key": "bright_black", # Added for context tables
}

STYLES = {
    # Base styles for log levels
    "info": Style(color=COLORS["info"]),
    "debug": Style(color=COLORS["debug"]),
    "warning": Style(color=COLORS["warning"], bold=True),
    "error": Style(color=COLORS["error"], bold=True),
    "critical": Style(color=COLORS["critical"], bold=True, reverse=True),
    "success": Style(color=COLORS["success"], bold=True),
    "trace": Style(color=COLORS["trace"], dim=True),
    
    # Component styles (Matching adapted COLORS)
    "core": Style(color=COLORS["core"], bold=True),
    "provider": Style(color=COLORS["provider"], bold=True),
    "router": Style(color=COLORS["router"], bold=True),
    "cache": Style(color=COLORS["cache"], bold=True),
    "api": Style(color=COLORS["api"], bold=True),
    "mcp": Style(color=COLORS["mcp"], bold=True),
    "utils": Style(color=COLORS["utils"], bold=True),
    "default_component": Style(color=COLORS["default_component"], bold=True),
    
    # Operation styles
    "operation": Style(color=COLORS["accent"], bold=True),
    "startup": Style(color=COLORS["success"], bold=True),
    "shutdown": Style(color=COLORS["error"], bold=True),
    "request": Style(color=COLORS["primary"], bold=True),
    "response": Style(color=COLORS["secondary"], bold=True),
    
    # Confidence level styles
    "high_confidence": Style(color=COLORS["high_confidence"], bold=True),
    "medium_confidence": Style(color=COLORS["medium_confidence"], bold=True),
    "low_confidence": Style(color=COLORS["low_confidence"], bold=True),
    
    # Misc styles
    "timestamp": Style(color=COLORS["timestamp"], dim=True),
    "path": Style(color=COLORS["path"], underline=True),
    "code": Style(color=COLORS["code"], italic=True),
    "data": Style(color=COLORS["data"]),
    "data.key": Style(color=COLORS["data.key"], bold=True),
    "muted": Style(color=COLORS["muted"], dim=True),
    "highlight": Style(color=COLORS["highlight"], bold=True),
}

# Rich theme that can be used directly with Rich Console
RICH_THEME = Theme({name: style for name, style in STYLES.items()})

# Get the appropriate style for a log level
def get_level_style(level: str) -> Style:
    """Get the Rich style for a specific log level.
    
    Args:
        level: The log level (info, debug, warning, error, critical, success, trace)
        
    Returns:
        The corresponding Rich Style
    """
    level = level.lower()
    return STYLES.get(level, STYLES["info"]) # Default to info style

# Get style for a component
def get_component_style(component: str) -> Style:
    """Get the Rich style for a specific component.
    
    Args:
        component: The component name (core, provider, router, etc.)
        
    Returns:
        The corresponding Rich Style
    """
    component = component.lower()
    # Fallback to a default component style if specific one not found
    return STYLES.get(component, STYLES["default_component"])

# Get color by name
def get_color(name: str) -> str:
    """Get a color by name.
    
    Args:
        name: The color name
        
    Returns:
        The color string that can be used with Rich
    """
    return COLORS.get(name.lower(), COLORS["primary"])

# Apply style to text directly
def style_text(text: str, style_name: str) -> str:
    """Apply a named style to text (for use without Rich console).
    
    This is a utility function that doesn't depend on Rich, useful for
    simple terminal output or when Rich console isn't available.
    
    Args:
        text: The text to style
        style_name: The name of the style to apply
        
    Returns:
        Text with ANSI color codes applied (using Rich tags for simplicity)
    """
    # This uses Rich markup format for simplicity, assuming it will be printed
    # by a Rich console later or that the markup is acceptable.
    return f"[{style_name}]{text}[/{style_name}]"

# Get foreground and background colors for a specific context
def get_context_colors(
    context: str, component: Optional[str] = None
) -> Tuple[str, Optional[str]]:
    """Get appropriate foreground and background colors for a log context.
    
    Args:
        context: The log context (e.g., 'request', 'response')
        component: Optional component name for further refinement
        
    Returns:
        Tuple of (foreground_color, background_color) or (color, None)
    """
    style = STYLES.get(context.lower()) or STYLES.get("default_component")
    
    if style and style.color:
        return (str(style.color.name), str(style.bgcolor.name) if style.bgcolor else None)
    else:
        # Fallback to basic colors
        fg = COLORS.get(context.lower(), COLORS["primary"])
        return (fg, None) 
```
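
A minimal sketch of the theme in use with a Rich `Console`; the style names referenced (`success`, `warning`, `path`) are all defined in `STYLES` above.

```python
# Sketch: render themed output using the styles defined in this module.
from rich.console import Console

from ultimate_mcp_server.utils.logging.themes import RICH_THEME, get_level_style

console = Console(theme=RICH_THEME)
console.print("Server started", style="success")
console.print("Cache nearly full", style="warning")
console.print("/var/log/umcp.log", style="path")

print(get_level_style("error"))  # the bold red Style used for error-level records
```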

--------------------------------------------------------------------------------
/examples/test_code_extraction.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Test script for the LLM-based code extraction function.

This script loads the tournament state from a previous run and tests
the new code extraction function against the raw response texts.
"""

import asyncio
import json
import sys
from pathlib import Path
from typing import Dict

# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))

from rich import box
from rich.panel import Panel
from rich.table import Table

from ultimate_mcp_server.core.server import Gateway

# Import the extraction function from the library
from ultimate_mcp_server.tools import extract_code_from_response
from ultimate_mcp_server.utils import get_logger
from ultimate_mcp_server.utils.display import CostTracker  # Import CostTracker
from ultimate_mcp_server.utils.logging.console import console

# Initialize logger
logger = get_logger("example.test_extraction")

# Create a simple structure for cost tracking (though likely won't be used directly here)
# TrackableResult = namedtuple("TrackableResult", ["cost", "input_tokens", "output_tokens", "provider", "model", "processing_time"])

# Initialize global gateway
gateway = None

# Path to the tournament state file from the last run
TOURNAMENT_STATE_PATH = "/data/projects/ultimate_mcp_server/storage/tournaments/2025-04-01_03-24-37_tournament_76009a9a/tournament_state.json"

async def setup_gateway():
    """Set up the gateway for testing."""
    global gateway
    
    # Create gateway instance
    logger.info("Initializing gateway for testing", emoji_key="start")
    gateway = Gateway("test-extraction", register_tools=False)
    
    # Initialize the server with all providers and built-in tools
    await gateway._initialize_providers()
    
    logger.info("Gateway initialized", emoji_key="success")

async def load_tournament_state() -> Dict:
    """Load the tournament state from the previous run."""
    try:
        with open(TOURNAMENT_STATE_PATH, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception as e:
        logger.error(f"Error loading tournament state: {str(e)}", emoji_key="error")
        return {}

async def test_extraction(tracker: CostTracker): # Add tracker
    """Test the LLM-based code extraction function."""
    # Load the tournament state
    tournament_state = await load_tournament_state()
    
    if not tournament_state:
        logger.error("Failed to load tournament state", emoji_key="error")
        return 1
    
    # Check if we have rounds_results
    rounds_results = tournament_state.get('rounds_results', [])
    if not rounds_results:
        logger.error("No round results found in tournament state", emoji_key="error")
        return 1
    
    # Create a table to display the results
    console.print("\n[bold]Testing LLM-based Code Extraction Function[/bold]\n")
    
    # Create a table for extraction results
    extraction_table = Table(box=box.MINIMAL, show_header=True, expand=False)
    extraction_table.add_column("Round", style="cyan")
    extraction_table.add_column("Model", style="magenta")
    extraction_table.add_column("Code Extracted", style="green")
    extraction_table.add_column("Line Count", style="yellow", justify="right")
    
    # Process each round
    for round_idx, round_data in enumerate(rounds_results):
        responses = round_data.get('responses', {})
        
        for model_id, response in responses.items():
            display_model = model_id.split(':')[-1] if ':' in model_id else model_id
            response_text = response.get('response_text', '')
            
            if response_text:
                # Extract code using our new function, passing the tracker
                extracted_code = await extract_code_from_response(response_text, tracker=tracker)
                
                # Calculate line count
                line_count = len(extracted_code.split('\n')) if extracted_code else 0
                
                # Add to the table
                extraction_table.add_row(
                    str(round_idx),
                    display_model,
                    "✅" if extracted_code else "❌",
                    str(line_count)
                )
                
                # Print detailed results
                if extracted_code:
                    console.print(Panel(
                        f"[bold]Round {round_idx} - {display_model}[/bold]\n\n"
                        f"[green]Successfully extracted {line_count} lines of code[/green]\n",
                        title="Extraction Result",
                        expand=False
                    ))
                    
                    # Print first 10 lines of code as a preview
                    code_preview = "\n".join(extracted_code.split('\n')[:10])
                    if len(extracted_code.split('\n')) > 10:
                        code_preview += "\n..."
                    
                    console.print(Panel(
                        code_preview,
                        title="Code Preview",
                        expand=False
                    ))
                else:
                    console.print(Panel(
                        f"[bold]Round {round_idx} - {display_model}[/bold]\n\n"
                        f"[red]Failed to extract code[/red]\n",
                        title="Extraction Result",
                        expand=False
                    ))
    
    # Display the summary table
    console.print("\n[bold]Extraction Summary:[/bold]")
    console.print(extraction_table)
    
    # Display cost summary at the end
    tracker.display_summary(console)
    
    return 0

async def main():
    """Run the test script."""
    tracker = CostTracker() # Instantiate tracker
    try:
        # Set up gateway
        await setup_gateway()
        
        # Run the extraction test
        return await test_extraction(tracker) # Pass tracker
    except Exception as e:
        logger.critical(f"Test failed: {str(e)}", emoji_key="critical", exc_info=True)
        return 1
    finally:
        # Clean up
        if gateway:
            pass  # No cleanup needed for Gateway instance

if __name__ == "__main__":
    # Run the script
    exit_code = asyncio.run(main())
    sys.exit(exit_code) 
```
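
For reference, a minimal standalone sketch of calling `extract_code_from_response` on a single markdown-fenced response, using only the call shape exercised in the script above (no other keyword arguments are assumed):

```python
# Minimal sketch: one extraction call on a hand-built response string.
import asyncio

from ultimate_mcp_server.tools import extract_code_from_response
from ultimate_mcp_server.utils.display import CostTracker

FENCE = "```"  # built dynamically so this snippet nests cleanly in markdown
SAMPLE_RESPONSE = (
    "Here is the solution:\n\n"
    f"{FENCE}python\n"
    "def add(a, b):\n"
    "    return a + b\n"
    f"{FENCE}\n"
)

async def run_once() -> None:
    tracker = CostTracker()
    code = await extract_code_from_response(SAMPLE_RESPONSE, tracker=tracker)
    print(code or "No code extracted")

# asyncio.run(run_once())
```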

--------------------------------------------------------------------------------
/examples/cache_demo.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
"""Cache demonstration for Ultimate MCP Server."""
import asyncio
import sys
import time
from pathlib import Path
from typing import Optional

# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))

from rich.markup import escape
from rich.rule import Rule

from ultimate_mcp_server.services.cache import get_cache_service, run_completion_with_cache
from ultimate_mcp_server.utils import get_logger
from ultimate_mcp_server.utils.display import CostTracker, display_cache_stats

# --- Add Rich Imports ---
from ultimate_mcp_server.utils.logging.console import console

# ----------------------

# Initialize logger
logger = get_logger("example.cache_demo")


async def demonstrate_cache(tracker: Optional[CostTracker] = None):
    """Demonstrate cache functionality using Rich."""
    console.print(Rule("[bold blue]Cache Demonstration[/bold blue]"))
    logger.info("Starting cache demonstration", emoji_key="start")
    
    cache_service = get_cache_service()
    
    if not cache_service.enabled:
        logger.warning("Cache is disabled by default. Enabling for demonstration.", emoji_key="warning")
        cache_service.enabled = True
    
    cache_service.clear() # Start with a clean slate
    logger.info("Cache cleared for demonstration", emoji_key="cache")
    
    prompt = "Explain how caching works in distributed systems."
    console.print(f"[cyan]Using Prompt:[/cyan] {escape(prompt)}")
    console.print()

    results = {}
    times = {}
    stats_log = {}

    try:
        # Helper function to get current stats snapshot
        def get_current_stats_dict():
            return {
                "get_count": getattr(cache_service.metrics, "gets", 0), # Use gets for Total Gets
                "hit_count": getattr(cache_service.metrics, "hits", 0),
                "miss_count": getattr(cache_service.metrics, "misses", 0),
                "set_count": getattr(cache_service.metrics, "stores", 0), # Use stores for Total Sets
                # Add other stats if needed by display_cache_stats later
            }
            
        # 1. Cache Miss
        logger.info("1. Running first completion (expect cache MISS)...", emoji_key="processing")
        start_time = time.time()
        results[1] = await run_completion_with_cache(prompt, use_cache=True)
        times[1] = time.time() - start_time
        stats_log[1] = get_current_stats_dict()
        
        # Track cost - only for non-cache hits (actual API calls)
        if tracker:
            tracker.add_call(results[1])
            
        console.print(f"   [yellow]MISS:[/yellow] Took [bold]{times[1]:.3f}s[/bold] (Cost: ${results[1].cost:.6f}, Tokens: {results[1].total_tokens})")

        # 2. Cache Hit
        logger.info("2. Running second completion (expect cache HIT)...", emoji_key="processing")
        start_time = time.time()
        results[2] = await run_completion_with_cache(prompt, use_cache=True)
        times[2] = time.time() - start_time
        stats_log[2] = get_current_stats_dict()
        speedup = times[1] / times[2] if times[2] > 0 else float('inf')
        console.print(f"   [green]HIT:[/green]  Took [bold]{times[2]:.3f}s[/bold] (Speed-up: {speedup:.1f}x vs Miss)")

        # 3. Cache Bypass
        logger.info("3. Running third completion (BYPASS cache)...", emoji_key="processing")
        start_time = time.time()
        results[3] = await run_completion_with_cache(prompt, use_cache=False)
        times[3] = time.time() - start_time
        stats_log[3] = get_current_stats_dict() # Stats shouldn't change much for bypass
        
        # Track cost - bypassing cache calls the API
        if tracker:
            tracker.add_call(results[3])
            
        console.print(f"   [cyan]BYPASS:[/cyan] Took [bold]{times[3]:.3f}s[/bold] (Cost: ${results[3].cost:.6f}, Tokens: {results[3].total_tokens})")

        # 4. Another Cache Hit
        logger.info("4. Running fourth completion (expect cache HIT again)...", emoji_key="processing")
        start_time = time.time()
        results[4] = await run_completion_with_cache(prompt, use_cache=True)
        times[4] = time.time() - start_time
        stats_log[4] = get_current_stats_dict()
        speedup_vs_bypass = times[3] / times[4] if times[4] > 0 else float('inf')
        console.print(f"   [green]HIT:[/green]  Took [bold]{times[4]:.3f}s[/bold] (Speed-up: {speedup_vs_bypass:.1f}x vs Bypass)")
        console.print()

    except Exception as e:
        logger.error(f"Error during cache demonstration run: {e}", emoji_key="error", exc_info=True)
        console.print(f"[bold red]Error during demo run:[/bold red] {escape(str(e))}")
        # Attempt to display stats even if error occurred mid-way
        final_stats_dict = get_current_stats_dict()  # Get stats even on error
    else:
        # Get final stats if all runs succeeded
        final_stats_dict = get_current_stats_dict()

    # Prepare the final stats dictionary for display_cache_stats
    # It expects top-level keys like 'enabled', 'persistence', and a 'stats' sub-dict
    display_stats = {
        "enabled": cache_service.enabled,
        "persistence": cache_service.enable_persistence,
        "stats": final_stats_dict,
        # Add savings if available/calculated (Example: Placeholder)
        # "savings": { "cost": getattr(cache_service.metrics, "saved_cost", 0.0) }
    }

    # Display Final Cache Statistics using our display function
    display_cache_stats(display_stats, stats_log, console)
    
    console.print()
    # Use the persistence setting directly from cache_service
    if cache_service.enable_persistence:
        logger.info("Cache persistence is enabled.", emoji_key="cache")
        if hasattr(cache_service, 'cache_dir'):
            console.print(f"[dim]Cache Directory: {cache_service.cache_dir}[/dim]")
    else:
        logger.info("Cache persistence is disabled.", emoji_key="cache")
    console.print()


async def main():
    """Run cache demonstration."""
    tracker = CostTracker()  # Create cost tracker instance
    try:
        await demonstrate_cache(tracker)
        
        # Display cost summary at the end
        tracker.display_summary(console)
        
    except Exception as e:
        logger.critical(f"Cache demonstration failed: {str(e)}", emoji_key="critical")
        return 1
    
    return 0


if __name__ == "__main__":
    # Run the demonstration
    exit_code = asyncio.run(main())
    sys.exit(exit_code)
```
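
The same miss-then-hit pattern can be sketched directly against the cache service's async `get`/`set` API (defined in `/ultimate_mcp_server/services/cache.py` later in this document); `expensive_call` below is a stand-in for a real completion request, not part of the codebase:

```python
# Sketch of the cache-miss / cache-hit pattern against the raw async API.
import asyncio
import time

from ultimate_mcp_server.services.cache import get_cache_service

async def expensive_call() -> str:
    await asyncio.sleep(0.5)  # pretend this is a slow LLM call
    return "quantum computing explained..."

async def cached_call(key: str) -> str:
    cache = get_cache_service()
    hit = await cache.get(key)
    if hit is not None:
        return hit                       # cache hit: no API cost
    value = await expensive_call()       # cache miss: pay for the call once
    await cache.set(key, value, ttl=3600)
    return value

async def demo() -> None:
    for label in ("MISS", "HIT"):
        start = time.time()
        await cached_call("explain-caching")
        print(f"{label}: {time.time() - start:.3f}s")

# asyncio.run(demo())
```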

--------------------------------------------------------------------------------
/examples/multi_provider_demo.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
"""Multi-provider completion demo using Ultimate MCP Server."""
import asyncio
import sys
from pathlib import Path

# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))

# Third-party imports
# These imports need to be below sys.path modification, which is why they have noqa comments
from rich import box  # noqa: E402
from rich.markup import escape  # noqa: E402
from rich.panel import Panel  # noqa: E402
from rich.rule import Rule  # noqa: E402
from rich.table import Table  # noqa: E402

# Project imports
from ultimate_mcp_server.constants import Provider  # noqa: E402
from ultimate_mcp_server.core.server import Gateway  # noqa: E402
from ultimate_mcp_server.utils import get_logger  # noqa: E402
from ultimate_mcp_server.utils.display import CostTracker  # noqa: E402
from ultimate_mcp_server.utils.logging.console import console  # noqa: E402

# Initialize logger and console
logger = get_logger("example.multi_provider")

async def run_provider_comparison(tracker: CostTracker):
    """Run a comparison of completions across multiple providers using Rich."""
    console.print(Rule("[bold blue]Multi-Provider Completion Comparison[/bold blue]"))
    logger.info("Starting multi-provider comparison demo", emoji_key="start")
    
    # Create Gateway instance - this handles provider initialization
    gateway = Gateway("multi-provider-demo", register_tools=False)
    
    # Initialize providers
    logger.info("Initializing providers...", emoji_key="provider")
    await gateway._initialize_providers()
    
    prompt = "Explain the advantages of quantum computing in 3-4 sentences."
    console.print(f"[cyan]Prompt:[/cyan] {escape(prompt)}")
    
    # Use model names directly if providers are inferred or handled by get_provider
    configs = [
        {"provider": Provider.OPENAI.value, "model": "gpt-4.1-mini"},
        {"provider": Provider.ANTHROPIC.value, "model": "claude-3-5-haiku-20241022"}, 
        {"provider": Provider.GEMINI.value, "model": "gemini-2.0-flash-lite"}, 
        {"provider": Provider.DEEPSEEK.value, "model": "deepseek-chat"}, 
        {"provider": Provider.GROK.value, "model": "grok-3-mini-latest"},
        {"provider": Provider.OPENROUTER.value, "model": "mistralai/mistral-nemo"},
        {"provider": Provider.OLLAMA.value, "model": "llama3.2"}
    ]
    
    results_data = []
    
    for config in configs:
        provider_name = config["provider"]
        model_name = config["model"]
        
        provider = gateway.providers.get(provider_name)
        if not provider:
            logger.warning(f"Provider {provider_name} not available or initialized, skipping.", emoji_key="warning")
            continue
            
        try:
            logger.info(f"Generating completion with {provider_name}/{model_name}...", emoji_key="processing")
            result = await provider.generate_completion(
                prompt=prompt,
                model=model_name,
                temperature=0.7,
                max_tokens=150
            )
            
            # Track the cost
            tracker.add_call(result)

            results_data.append({
                "provider": provider_name,
                "model": model_name,
                "text": result.text,
                "input_tokens": result.input_tokens,
                "output_tokens": result.output_tokens,
                "cost": result.cost,
                "processing_time": result.processing_time
            })
            logger.success(f"Completion from {provider_name}/{model_name} successful.", emoji_key="success")
            
        except Exception as e:
            logger.error(f"Error with {provider_name}/{model_name}: {e}", emoji_key="error", exc_info=True)
            # Optionally store error result
            results_data.append({
                 "provider": provider_name,
                 "model": model_name,
                 "text": f"[red]Error: {escape(str(e))}[/red]",
                 "cost": 0.0, "processing_time": 0.0, "input_tokens": 0, "output_tokens": 0
            })
    
    # Print comparison results using Rich Panels
    console.print(Rule("[bold green]Comparison Results[/bold green]"))
    for result in results_data:
        stats_line = (
            f"Cost: [green]${result['cost']:.6f}[/green] | "
            f"Time: [yellow]{result['processing_time']:.2f}s[/yellow] | "
            f"Tokens: [cyan]{result['input_tokens']} in, {result['output_tokens']} out[/cyan]"
        )
        console.print(Panel(
            escape(result['text'].strip()),
            title=f"[bold magenta]{escape(result['provider'])} / {escape(result['model'])}[/bold magenta]",
            subtitle=stats_line,
            border_style="blue",
            expand=False
        ))
    
    # Filter out error results before calculating summary stats
    valid_results = [r for r in results_data if "Error" not in r["text"]]

    if valid_results:
        summary_table = Table(title="Comparison Summary", box=box.ROUNDED, show_header=False)
        summary_table.add_column("Metric", style="cyan")
        summary_table.add_column("Value", style="white")

        try:
            fastest = min(valid_results, key=lambda r: r['processing_time'])
            summary_table.add_row("⚡ Fastest", f"{escape(fastest['provider'])}/{escape(fastest['model'])} ({fastest['processing_time']:.2f}s)")
        except ValueError: 
            pass # Handle empty list
        
        try:
            cheapest = min(valid_results, key=lambda r: r['cost'])
            summary_table.add_row("💰 Cheapest", f"{escape(cheapest['provider'])}/{escape(cheapest['model'])} (${cheapest['cost']:.6f})")
        except ValueError: 
            pass
        
        try:
            most_tokens = max(valid_results, key=lambda r: r['output_tokens'])
            summary_table.add_row("📄 Most Tokens", f"{escape(most_tokens['provider'])}/{escape(most_tokens['model'])} ({most_tokens['output_tokens']} tokens)")
        except ValueError: 
            pass

        if summary_table.row_count > 0:
            console.print(summary_table)
    else:
        console.print("[yellow]No valid results to generate summary.[/yellow]")
        
    # Display final summary
    tracker.display_summary(console) # Display summary at the end
    
    console.print() # Final spacing
    return 0

async def main():
    """Run the demo."""
    tracker = CostTracker() # Instantiate tracker
    try:
        return await run_provider_comparison(tracker) # Pass tracker
    except Exception as e:
        logger.critical(f"Demo failed: {str(e)}", emoji_key="critical")
        return 1

if __name__ == "__main__":
    # Run the demo
    exit_code = asyncio.run(main())
    sys.exit(exit_code) 
```

--------------------------------------------------------------------------------
/tool_annotations.py:
--------------------------------------------------------------------------------

```python
"""
Tool annotations for MCP servers.

This module provides a standardized way to annotate tools with hints that help LLMs
understand when and how to use them effectively.
"""
from typing import List, Optional


class ToolAnnotations:
    """
    Tool annotations providing hints to LLMs about tool behavior and usage patterns.
    
    ToolAnnotations supply metadata that helps LLMs make informed decisions about:
    - WHEN to use a particular tool (appropriate contexts and priority)
    - HOW to use the tool correctly (through examples and behavior hints)
    - WHAT the potential consequences of using the tool might be (read-only vs. destructive)
    - WHO should use the tool (assistant, user, or both via audience hints)
    
    These annotations serve as a bridge between tool developers and LLMs, providing
    crucial context beyond just function signatures and descriptions. For example, the
    annotations can indicate that a file deletion tool is destructive and should be used
    with caution, or that a search tool is safe to retry multiple times.
    
    The system supports four key behavioral hints:
    - read_only_hint: Tool doesn't modify state (safe for exploratory use)
    - destructive_hint: Tool may perform irreversible changes (use with caution)
    - idempotent_hint: Repeated calls with same arguments produce same results
    - open_world_hint: Tool interacts with external systems beyond the LLM's knowledge
    
    Additional metadata includes:
    - audience: Who can/should use this tool
    - priority: How important/commonly used this tool is
    - title: Human-readable name for the tool
    - examples: Sample inputs and expected outputs
    
    Usage example:
        ```python
        # For a document deletion tool
        delete_doc_annotations = ToolAnnotations(
            read_only_hint=False,       # Modifies state
            destructive_hint=True,      # Deletion is destructive
            idempotent_hint=True,       # Deleting twice has same effect as once
            open_world_hint=True,       # Changes external file system
            audience=["assistant"],     # Only assistant should use it
            priority=0.3,               # Lower priority (use cautiously)
            title="Delete Document",
            examples=[{
                "input": {"document_id": "doc-123"},
                "output": {"success": True, "message": "Document deleted"}
            }]
        )
        ```
    
    Note: All hints are advisory only - they don't enforce behavior but help LLMs
    make better decisions about tool usage.
    """
    
    def __init__(
        self,
        read_only_hint: bool = False,
        destructive_hint: bool = True,
        idempotent_hint: bool = False,
        open_world_hint: bool = True,
        audience: Optional[List[str]] = None,
        priority: float = 0.5,
        title: Optional[str] = None,
        examples: Optional[List[dict]] = None,
    ):
        """
        Initialize tool annotations.
        
        Args:
            read_only_hint: If True, indicates this tool does not modify its environment.
                Tools with read_only_hint=True are safe to call for exploration without
                side effects. Examples: search tools, data retrieval, information queries.
                Default: False
                
            destructive_hint: If True, the tool may perform destructive updates that
                can't easily be reversed or undone. Only meaningful when read_only_hint 
                is False. Examples: deletion operations, irreversible state changes, payments.
                Default: True
                
            idempotent_hint: If True, calling the tool repeatedly with the same arguments
                will have no additional effect beyond the first call. Useful for retry logic.
                Only meaningful when read_only_hint is False. Examples: setting a value,
                deleting an item (calling it twice doesn't delete it twice).
                Default: False
                
            open_world_hint: If True, this tool may interact with systems or information 
                outside the LLM's knowledge context (external APIs, file systems, etc.).
                If False, the tool operates in a closed domain the LLM can fully model.
                Default: True
                
            audience: Who is the intended user of this tool, as a list of roles:
                - "assistant": The AI assistant can use this tool
                - "user": The human user can use this tool
                Default: ["assistant"]
                
            priority: How important this tool is, from 0.0 (lowest) to 1.0 (highest).
                Higher priority tools should be considered first when multiple tools
                might accomplish a similar task. Default: 0.5 (medium priority)
                
            title: Human-readable title for the tool. If not provided, the tool's
                function name is typically used instead.
                
            examples: List of usage examples, each containing 'input' and 'output' keys.
                These help the LLM understand expected patterns of use and responses.
        """
        self.read_only_hint = read_only_hint
        self.destructive_hint = destructive_hint
        self.idempotent_hint = idempotent_hint
        self.open_world_hint = open_world_hint
        self.audience = audience or ["assistant"]
        self.priority = max(0.0, min(1.0, priority))  # Clamp between 0 and 1
        self.title = title
        self.examples = examples or []
        
    def to_dict(self) -> dict:
        """Convert annotations to dictionary for MCP protocol."""
        return {
            "readOnlyHint": self.read_only_hint,
            "destructiveHint": self.destructive_hint,
            "idempotentHint": self.idempotent_hint,
            "openWorldHint": self.open_world_hint,
            "title": self.title,
            "audience": self.audience,
            "priority": self.priority,
            "examples": self.examples
        }

# Pre-defined annotation templates for common tool types

# A tool that only reads/queries data without modifying any state
READONLY_TOOL = ToolAnnotations(
    read_only_hint=True,
    destructive_hint=False,
    idempotent_hint=True,
    open_world_hint=False,
    priority=0.8,
    title="Read-Only Tool"
)

# A tool that queries external systems or APIs for information
QUERY_TOOL = ToolAnnotations(
    read_only_hint=True,
    destructive_hint=False,
    idempotent_hint=True,
    open_world_hint=True,
    priority=0.7,
    title="Query Tool"
)

# A tool that performs potentially irreversible changes to state
# The LLM should use these with caution, especially without confirmation
DESTRUCTIVE_TOOL = ToolAnnotations(
    read_only_hint=False,
    destructive_hint=True,
    idempotent_hint=False,
    open_world_hint=True,
    priority=0.3,
    title="Destructive Tool"
)

# A tool that modifies state but can be safely called multiple times
# with the same arguments (e.g., setting a value, creating if not exists)
IDEMPOTENT_UPDATE_TOOL = ToolAnnotations(
    read_only_hint=False,
    destructive_hint=False,
    idempotent_hint=True,
    open_world_hint=False,
    priority=0.5,
    title="Idempotent Update Tool"
) 
```
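
A short sketch of putting these annotations to work: build (or reuse) an instance, then hand the camelCase dict from `to_dict()` to whatever registers the tool. Importing `tool_annotations` assumes the repository root is on `sys.path`, and the `register_tool(...)` call shown in the comment is hypothetical, not an API of this module:

```python
# Sketch: defining annotations for a search tool and serializing them for MCP.
from tool_annotations import QUERY_TOOL, ToolAnnotations

search_annotations = ToolAnnotations(
    read_only_hint=True,        # search does not modify state
    destructive_hint=False,
    idempotent_hint=True,       # safe to retry with the same query
    open_world_hint=True,       # reaches out to an external search index
    priority=0.9,
    title="Web Search",
    examples=[{
        "input": {"query": "current weather in Paris"},
        "output": {"results": ["..."]},
    }],
)

# The MCP-facing payload uses the camelCase keys produced by to_dict()
payload = search_annotations.to_dict()
assert payload["readOnlyHint"] is True

# A hypothetical registration hook might forward the dict alongside the schema:
# register_tool(name="web_search", handler=web_search, annotations=payload)

# Or reuse one of the predefined templates for a simple external query tool:
lookup_payload = QUERY_TOOL.to_dict()
```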

--------------------------------------------------------------------------------
/examples/test_content_detection.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
"""
Test script to demonstrate enhanced content type detection with Magika integration
in the DocumentProcessingTool.
"""

import asyncio
import sys
from pathlib import Path

# Add project root to path for imports when running as script
_PROJECT_ROOT = Path(__file__).resolve().parent.parent
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))

from rich.console import Console  # noqa: E402
from rich.panel import Panel  # noqa: E402
from rich.table import Table  # noqa: E402

from ultimate_mcp_server.core.server import Gateway  # noqa: E402
from ultimate_mcp_server.tools.document_conversion_and_processing import (  # noqa: E402
    DocumentProcessingTool,  # noqa: E402
)

console = Console()

# Sample content for testing
HTML_CONTENT = """<!DOCTYPE html>
<html>
<head>
    <title>Test HTML Document</title>
    <meta charset="utf-8">
</head>
<body>
    <h1>This is a test HTML document</h1>
    <p>This paragraph is for testing the content detection.</p>
    <div class="container">
        <ul>
            <li>Item 1</li>
            <li>Item 2</li>
        </ul>
    </div>
    <script>
        // Some JavaScript
        console.log("Hello world");
    </script>
</body>
</html>
"""

MARKDOWN_CONTENT = """# Test Markdown Document

This is a paragraph in markdown format.

## Section 1

* Item 1
* Item 2

[Link to example](https://example.com)

```python
def hello_world():
    print("Hello world")
```

| Column 1 | Column 2 |
|----------|----------|
| Cell 1   | Cell 2   |
"""

CODE_CONTENT = """
#!/usr/bin/env python
import sys
from typing import List, Dict, Optional

class TestClass:
    def __init__(self, name: str, value: int = 0):
        self.name = name
        self.value = value
        
    def process(self, data: List[Dict]) -> Optional[Dict]:
        result = {}
        for item in data:
            if "key" in item:
                result[item["key"]] = item["value"]
        return result if result else None
        
def main():
    test = TestClass("test", 42)
    result = test.process([{"key": "a", "value": 1}, {"key": "b", "value": 2}])
    print(f"Result: {result}")
    
if __name__ == "__main__":
    main()
"""

PLAIN_TEXT_CONTENT = """
This is a plain text document with no special formatting.

It contains multiple paragraphs and some sentences.
There are no markdown elements, HTML tags, or code structures.

Just regular text that someone might write in a simple text editor.
"""

AMBIGUOUS_CONTENT = """
Here's some text with a <div> tag in it.

# This looks like a heading

But it also has some <span>HTML</span> elements.

def is_this_code():
    return "maybe"

Regular paragraph text continues here.
"""

async def test_content_detection():
    console.print(Panel("Testing Content Type Detection with Magika Integration", style="bold green"))
    
    # Initialize the document processor
    gateway = Gateway("content-detection-test")
    # Initialize providers
    console.print("Initializing gateway and providers...")
    await gateway._initialize_providers()
    
    # Create document processing tool
    doc_tool = DocumentProcessingTool(gateway)
    
    # Define test cases
    test_cases = [
        ("HTML Document", HTML_CONTENT),
        ("Markdown Document", MARKDOWN_CONTENT),
        ("Code Document", CODE_CONTENT),
        ("Plain Text Document", PLAIN_TEXT_CONTENT),
        ("Ambiguous Content", AMBIGUOUS_CONTENT),
    ]
    
    # Create results table
    results_table = Table(title="Content Type Detection Results")
    results_table.add_column("Content Type", style="cyan")
    results_table.add_column("Detected Type", style="green")
    results_table.add_column("Confidence", style="yellow")
    results_table.add_column("Method", style="magenta")
    results_table.add_column("Detection Criteria", style="blue")
    
    # Test each case
    for name, content in test_cases:
        console.print(f"\nDetecting content type for: [bold cyan]{name}[/]")
        
        # Detect content type
        result = await doc_tool.detect_content_type(content)
        
        # Get detection details
        detected_type = result.get("content_type", "unknown")
        confidence = result.get("confidence", 0.0)
        criteria = ", ".join(result.get("detection_criteria", []))
        
        # Check if Magika was used
        method = "Magika" if result.get("detection_method") == "magika" else "Heuristic"
        if result.get("detection_method") != "magika" and result.get("magika_details"):
            method = "Combined (Magika + Heuristic)"
            
        # Add to results table
        results_table.add_row(
            name,
            detected_type,
            f"{confidence:.2f}",
            method,
            criteria[:100] + "..." if len(criteria) > 100 else criteria
        )
        
        # Show all scores
        scores = result.get("all_scores", {})
        if scores:
            scores_table = Table(title="Detection Scores")
            scores_table.add_column("Content Type", style="cyan")
            scores_table.add_column("Score", style="yellow")
            
            for ctype, score in scores.items():
                scores_table.add_row(ctype, f"{score:.3f}")
            
            console.print(scores_table)
        
        # Show Magika details if available
        if "magika_details" in result:
            magika_details = result["magika_details"]
            console.print(Panel(
                f"Magika Type: {magika_details.get('type', 'unknown')}\n"
                f"Magika Confidence: {magika_details.get('confidence', 0.0):.3f}\n"
                f"Matched Primary Type: {magika_details.get('matched_primary_type', False)}",
                title="Magika Details",
                style="blue"
            ))
    
    # Print final results table
    console.print("\n")
    console.print(results_table)

    # Now test HTML to Markdown conversion with a clearly broken HTML case
    console.print(Panel("Testing HTML to Markdown Conversion with Content Detection", style="bold green"))
    
    # Create a test case with problematic HTML (the one that previously failed)
    problematic_html = """<!DOCTYPE html>
<html class="client-nojs vector-feature-language-in-header-enabled vector-feature-language-in-main-page-header-disabled">
<head>
<meta charset="UTF-8">
<title>Transformer (deep learning architecture) - Wikipedia</title>
<script>(function(){var className="client-js vector-feature-language-in-header-enabled vector-feature-language-in-main-page-header-disabled";</script>
</head>
<body>
<h1>Transformer Model</h1>
<p>The Transformer is a deep learning model introduced in the paper "Attention Is All You Need".</p>
</body>
</html>"""

    console.print("Converting problematic HTML to Markdown...")
    result = await doc_tool.clean_and_format_text_as_markdown(
        text=problematic_html,
        extraction_method="auto",
        preserve_tables=True,
        preserve_links=True
    )
    
    console.print(Panel(
        f"Original Type: {result.get('original_content_type', 'unknown')}\n"
        f"Was HTML: {result.get('was_html', False)}\n"
        f"Extraction Method: {result.get('extraction_method_used', 'none')}",
        title="Conversion Details", 
        style="cyan"
    ))
    
    console.print(Panel(
        result.get("markdown_text", "No markdown produced"),
        title="Converted Markdown", 
        style="green"
    ))

if __name__ == "__main__":
    asyncio.run(test_content_detection()) 
```
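
Distilled from the script above, a small routing sketch: detect the content type first and only run the HTML-to-Markdown conversion when needed. The field names come from the calls above; the rest of the result schema is not assumed:

```python
# Sketch: route text through conversion only when it is detected as HTML.
async def to_markdown(doc_tool, text: str) -> str:
    detection = await doc_tool.detect_content_type(text)
    if detection.get("content_type") == "html":
        converted = await doc_tool.clean_and_format_text_as_markdown(
            text=text, extraction_method="auto"
        )
        return converted.get("markdown_text", "")
    return text  # already markdown/plain text; pass through unchanged
```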

--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/provider.py:
--------------------------------------------------------------------------------

```python
"""Provider tools for Ultimate MCP Server."""
from typing import Any, Dict, Optional

# Import ToolError explicitly
from ultimate_mcp_server.exceptions import ToolError

# REMOVE global instance logic
from ultimate_mcp_server.utils import get_logger

from .base import with_error_handling, with_tool_metrics

logger = get_logger("ultimate_mcp_server.tools.provider")

def _get_provider_status_dict() -> Dict[str, Any]:
    """Reliably gets the provider_status dictionary from the gateway instance."""
    provider_status = {}
    # Import here to avoid circular dependency at module load time
    try:
        from ultimate_mcp_server.core import get_gateway_instance
        gateway = get_gateway_instance()
        if gateway and hasattr(gateway, 'provider_status'):
            provider_status = gateway.provider_status
            if provider_status:
                logger.debug("Retrieved provider status via global instance.")
                return provider_status
    except ImportError as e:
        logger.error(f"Failed to import get_gateway_instance: {e}")
    except Exception as e:
        logger.error(f"Error accessing global gateway instance: {e}")
        
    if not provider_status:
        logger.warning("Could not retrieve provider status from global gateway instance.")
        
    return provider_status

# --- Tool Functions (Standalone, Decorated) ---

@with_tool_metrics
@with_error_handling
async def get_provider_status() -> Dict[str, Any]:
    """Checks the status and availability of all configured LLM providers.

    Use this tool to determine which LLM providers (e.g., OpenAI, Anthropic, Gemini)
    are currently enabled, configured correctly (e.g., API keys), and ready to accept requests.
    This helps in deciding which provider to use for a task or for troubleshooting.

    Returns:
        A dictionary mapping provider names to their status details:
        {
            "providers": {
                "openai": {                      # Example for one provider
                    "enabled": true,             # Is the provider enabled in the server config?
                    "available": true,             # Is the provider initialized and ready for requests?
                    "api_key_configured": true,  # Is the necessary API key set?
                    "error": null,               # Error message if initialization failed, null otherwise.
                    "models_count": 38           # Number of models detected for this provider.
                },
                "anthropic": {                   # Example for another provider
                    "enabled": true,
                    "available": false,
                    "api_key_configured": true,
                    "error": "Initialization failed: Connection timeout",
                    "models_count": 0
                },
                ...
            }
        }
        Returns an empty "providers" dict and a message if status info is unavailable.

    Usage:
        - Call this tool before attempting complex tasks to ensure required providers are available.
        - Use the output to inform the user about available options or diagnose issues.
        - If a provider shows "available: false", check the "error" field for clues.
    """
    provider_status = _get_provider_status_dict()

    if not provider_status:
        # Raise ToolError if status cannot be retrieved
        raise ToolError(status_code=503, detail="Provider status information is currently unavailable. The server might be initializing or an internal error occurred.")

    return {
        "providers": {
            name: {
                "enabled": status.enabled,
                "available": status.available,
                "api_key_configured": status.api_key_configured,
                "error": status.error,
                "models_count": len(status.models)
            }
            for name, status in provider_status.items()
        }
    }

@with_tool_metrics
@with_error_handling
async def list_models(
    provider: Optional[str] = None
) -> Dict[str, Any]:
    """Lists available LLM models, optionally filtered by provider.

    Use this tool to discover specific models offered by the configured and available LLM providers.
    The returned model IDs (e.g., 'openai/gpt-4.1-mini') are needed for other tools like
    `chat_completion`, `generate_completion`, `estimate_cost`, or `create_tournament`.

    Args:
        provider: (Optional) The specific provider name (e.g., "openai", "anthropic", "gemini")
                  to list models for. If omitted, models from *all available* providers are listed.

    Returns:
        A dictionary mapping provider names to a list of their available models:
        {
            "models": {
                "openai": [                       # Example for one provider
                    {
                        "id": "openai/gpt-4.1-mini", # Unique ID used in other tools
                        "name": "GPT-4o Mini",     # Human-friendly name
                        "context_window": 128000,
                        "features": ["chat", "completion", "vision"],
                        "input_cost_pmt": 0.15,  # Cost per Million Tokens (Input)
                        "output_cost_pmt": 0.60  # Cost per Million Tokens (Output)
                    },
                    ...
                ],
                "gemini": [                      # Example for another provider
                    {
                        "id": "gemini/gemini-2.5-pro-preview-03-25",
                        "name": "Gemini 2.5 Pro Experimental",
                        "context_window": 8192,
                        "features": ["chat", "completion"],
                        "input_cost_pmt": null, # Cost info might be null
                        "output_cost_pmt": null
                    },
                    ...
                ],
                ...
            }
        }
        Returns an empty "models" dict or includes warnings/errors if providers/models are unavailable.

    Usage Flow:
        1. (Optional) Call `get_provider_status` to see which providers are generally available.
        2. Call `list_models` (optionally specifying a provider) to get usable model IDs.
        3. Use a specific model ID (like "openai/gpt-4.1-mini") as the 'model' parameter in other tools.

    Raises:
        ToolError: If the specified provider name is invalid or provider status is unavailable.
    """
    provider_status = _get_provider_status_dict()

    if not provider_status:
        raise ToolError(status_code=503, detail="Provider status information is currently unavailable. Cannot list models.")

    models = {}
    if provider:
        if provider not in provider_status:
            valid_providers = list(provider_status.keys())
            raise ToolError(status_code=404, detail=f"Invalid provider specified: '{provider}'. Valid options: {valid_providers}")

        status = provider_status[provider]
        if not status.available:
            # Return empty list for the provider but include a warning message
            return {
                "models": {provider: []},
                "warning": f"Provider '{provider}' is configured but currently unavailable. Reason: {status.error or 'Unknown error'}"
            }
        # Use model details directly from the ProviderStatus object
        models[provider] = [m for m in status.models] if status.models else []
    else:
        # List models for all *available* providers
        any_available = False
        for name, status in provider_status.items():
            if status.available:
                any_available = True
                # Use model details directly from the ProviderStatus object
                models[name] = [m for m in status.models] if status.models else []
            # else: Provider not available, don't include it unless specifically requested

        if not any_available:
            return {
                "models": {},
                "warning": "No providers are currently available. Check provider status using get_provider_status."
            }
        elif all(len(model_list) == 0 for model_list in models.values()):
            return {
                "models": models,
                "warning": "No models listed for any available provider. Check provider status or configuration."
            }

    return {"models": models} 
```
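
Because both tools are plain async functions, they can be driven directly from a script once the global gateway has been initialized elsewhere (otherwise the status lookup raises the 503 `ToolError` described above). A minimal sketch:

```python
# Sketch: check provider availability, then list models for the first available one.
import asyncio

from ultimate_mcp_server.tools.provider import get_provider_status, list_models

async def pick_available_provider() -> None:
    status = await get_provider_status()
    available = [name for name, info in status["providers"].items() if info["available"]]
    print(f"Available providers: {available}")

    if available:
        models = await list_models(provider=available[0])
        for model in models["models"].get(available[0], []):
            print(model)

# asyncio.run(pick_available_provider())
```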

--------------------------------------------------------------------------------
/ultimate_mcp_server/services/cache.py:
--------------------------------------------------------------------------------

```python
"""Cache service for LLM and RAG results."""
import asyncio
import os
import pickle
import time
from pathlib import Path
from typing import Any, Dict, Optional

from ultimate_mcp_server.config import get_config
from ultimate_mcp_server.utils import get_logger

logger = get_logger(__name__)

# Singleton instance
_cache_service = None


def get_cache_service():
    """
    Get or create the global singleton cache service instance.
    
    This function implements the singleton pattern for the CacheService, ensuring that only
    one instance is created across the entire application. On the first call, it creates a
    new CacheService instance and stores it in a module-level variable. Subsequent calls
    return the same instance.
    
    Using this function instead of directly instantiating CacheService ensures consistent
    caching behavior throughout the application, with a shared cache that persists across
    different components and requests.
    
    Returns:
        CacheService: The global singleton cache service instance.
        
    Example:
        ```python
        # Get the same cache service instance from anywhere in the code
        cache = get_cache_service()
        await cache.set("my_key", my_value, ttl=3600)
        ```
    """
    global _cache_service
    if _cache_service is None:
        _cache_service = CacheService()
    return _cache_service


class CacheService:
    """Service for caching LLM and RAG results."""
    
    def __init__(self, cache_dir: Optional[str] = None):
        """Initialize the cache service.
        
        Args:
            cache_dir: Directory to store cache files. If omitted, the directory
                from the global cache configuration is used.
        """
        config = get_config()
        cache_config = config.cache
        self.cache_dir = cache_dir or cache_config.directory
        
        # Create cache directory if it doesn't exist
        os.makedirs(self.cache_dir, exist_ok=True)
        
        # In-memory cache
        self.memory_cache: Dict[str, Dict[str, Any]] = {}
        
        # Load cache from disk
        self._load_cache()
        
        # Schedule cache maintenance
        self._schedule_maintenance()
        
        logger.info(f"Cache service initialized with directory: {self.cache_dir}")
    
    def _load_cache(self) -> None:
        """Load cache from disk."""
        try:
            cache_file = Path(self.cache_dir) / "cache.pickle"
            if cache_file.exists():
                with open(cache_file, "rb") as f:
                    loaded_cache = pickle.load(f)
                    
                    # Filter out expired items
                    current_time = time.time()
                    filtered_cache = {
                        key: value for key, value in loaded_cache.items()
                        if "expiry" not in value or value["expiry"] > current_time
                    }
                    
                    self.memory_cache = filtered_cache
                    logger.info(f"Loaded {len(self.memory_cache)} items from cache")
            else:
                logger.info("No cache file found, starting with empty cache")
        except Exception as e:
            logger.error(f"Error loading cache: {str(e)}")
            # Start with empty cache
            self.memory_cache = {}
    
    def _save_cache(self) -> None:
        """Save cache to disk."""
        try:
            cache_file = Path(self.cache_dir) / "cache.pickle"
            
            with open(cache_file, "wb") as f:
                pickle.dump(self.memory_cache, f)
                
            logger.info(f"Saved {len(self.memory_cache)} items to cache")
        except Exception as e:
            logger.error(f"Error saving cache: {str(e)}")
    
    def _schedule_maintenance(self) -> None:
        """Schedule periodic cache maintenance on the running event loop, if any."""
        try:
            asyncio.create_task(self._periodic_maintenance())
        except RuntimeError:
            # No running event loop (e.g. the service was created synchronously);
            # maintenance simply will not run in this context.
            logger.debug("No running event loop; skipping scheduled cache maintenance")
    
    async def _periodic_maintenance(self) -> None:
        """Perform periodic cache maintenance."""
        while True:
            try:
                # Clean expired items
                self._clean_expired()
                
                # Save cache to disk
                self._save_cache()
                
                # Wait for next maintenance cycle (every hour)
                await asyncio.sleep(3600)
            except Exception as e:
                logger.error(f"Error in cache maintenance: {str(e)}")
                await asyncio.sleep(300)  # Wait 5 minutes on error
    
    def _clean_expired(self) -> None:
        """Clean expired items from cache."""
        current_time = time.time()
        initial_count = len(self.memory_cache)
        
        self.memory_cache = {
            key: value for key, value in self.memory_cache.items()
            if "expiry" not in value or value["expiry"] > current_time
        }
        
        removed = initial_count - len(self.memory_cache)
        if removed > 0:
            logger.info(f"Cleaned {removed} expired items from cache")
    
    async def get(self, key: str) -> Optional[Any]:
        """Get an item from the cache.
        
        Args:
            key: Cache key
            
        Returns:
            Cached value or None if not found or expired
        """
        if key not in self.memory_cache:
            return None
            
        cache_item = self.memory_cache[key]
        
        # Check expiry
        if "expiry" in cache_item and cache_item["expiry"] < time.time():
            # Remove expired item
            del self.memory_cache[key]
            return None
        
        # Update access time
        cache_item["last_access"] = time.time()
        cache_item["access_count"] = cache_item.get("access_count", 0) + 1
        
        return cache_item["value"]
    
    async def set(
        self, 
        key: str, 
        value: Any, 
        ttl: Optional[int] = None
    ) -> bool:
        """Set an item in the cache.
        
        Args:
            key: Cache key
            value: Value to cache
            ttl: Time to live in seconds (None for no expiry)
            
        Returns:
            True if successful
        """
        try:
            expiry = time.time() + ttl if ttl is not None else None
            
            self.memory_cache[key] = {
                "value": value,
                "created": time.time(),
                "last_access": time.time(),
                "access_count": 0,
                "expiry": expiry
            }
            
            # Schedule save if more than 10 items have been added since last save
            if len(self.memory_cache) % 10 == 0:
                asyncio.create_task(self._async_save_cache())
                
            return True
        except Exception as e:
            logger.error(f"Error setting cache item: {str(e)}")
            return False
    
    async def _async_save_cache(self) -> None:
        """Save cache asynchronously."""
        self._save_cache()
    
    async def delete(self, key: str) -> bool:
        """Delete an item from the cache.
        
        Args:
            key: Cache key
            
        Returns:
            True if item was deleted, False if not found
        """
        if key in self.memory_cache:
            del self.memory_cache[key]
            return True
        return False
    
    async def clear(self) -> None:
        """Clear all items from the cache."""
        self.memory_cache.clear()
        self._save_cache()
        logger.info("Cache cleared")
    
    async def get_stats(self) -> Dict[str, Any]:
        """Get cache statistics.
        
        Returns:
            Cache statistics
        """
        total_items = len(self.memory_cache)
        
        # Count expired items
        current_time = time.time()
        expired_items = sum(
            1 for item in self.memory_cache.values()
            if "expiry" in item and item["expiry"] < current_time
        )
        
        # Calculate average access count
        access_counts = [
            item.get("access_count", 0) 
            for item in self.memory_cache.values()
        ]
        avg_access = sum(access_counts) / max(1, len(access_counts))
        
        return {
            "total_items": total_items,
            "expired_items": expired_items,
            "active_items": total_items - expired_items,
            "avg_access_count": avg_access,
            "cache_dir": self.cache_dir
        } 
```
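
A small sketch exercising TTL expiry and `get_stats()`; note that `get_cache_service()` should first be called while an event loop is running so the maintenance task can be scheduled:

```python
# Sketch: set a short-lived entry, watch it expire, then inspect cache stats.
import asyncio

from ultimate_mcp_server.services.cache import get_cache_service

async def ttl_demo() -> None:
    cache = get_cache_service()

    await cache.set("short-lived", {"answer": 42}, ttl=1)
    print(await cache.get("short-lived"))   # -> {'answer': 42}

    await asyncio.sleep(1.5)                # let the entry expire
    print(await cache.get("short-lived"))   # -> None (expired entries are dropped on read)

    print(await cache.get_stats())          # counts, active items, cache_dir

# asyncio.run(ttl_demo())
```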

--------------------------------------------------------------------------------
/tests/conftest.py:
--------------------------------------------------------------------------------

```python
"""Pytest fixtures for Ultimate MCP Server tests."""
import asyncio
import json
from pathlib import Path
from typing import Any, Dict, List, Optional

import pytest
from pytest import MonkeyPatch

from ultimate_mcp_server.config import Config
from ultimate_mcp_server.core.providers.base import BaseProvider, ModelResponse
from ultimate_mcp_server.core.server import Gateway
from ultimate_mcp_server.utils import get_logger

logger = get_logger("tests")


class MockResponse:
    """Mock response for testing."""
    def __init__(self, status_code: int = 200, json_data: Optional[Dict[str, Any]] = None):
        self.status_code = status_code
        self.json_data = json_data or {}
        
    async def json(self):
        return self.json_data
        
    async def text(self):
        return json.dumps(self.json_data)
        
    async def __aenter__(self):
        return self
        
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        pass


class MockClient:
    """Mock HTTP client for testing."""
    def __init__(self, responses: Optional[Dict[str, Any]] = None):
        self.responses = responses or {}
        self.requests = []
        
    async def post(self, url: str, json: Dict[str, Any], headers: Optional[Dict[str, str]] = None):
        self.requests.append({"url": url, "json": json, "headers": headers})
        return MockResponse(json_data=self.responses.get(url, {"choices": [{"message": {"content": "Mock response"}}]}))
        
    async def get(self, url: str, headers: Optional[Dict[str, str]] = None):
        self.requests.append({"url": url, "headers": headers})
        return MockResponse(json_data=self.responses.get(url, {"data": [{"id": "mock-model"}]}))


class MockProvider(BaseProvider):
    """Mock provider for testing."""
    
    provider_name = "mock"
    
    def __init__(self, api_key: Optional[str] = None, **kwargs):
        # Pop mock responses before forwarding the remaining kwargs to the base class
        self.responses = kwargs.pop("responses", {})
        super().__init__(api_key=api_key, **kwargs)
        self.initialized = False
        self.calls = []
        
    async def initialize(self) -> bool:
        self.initialized = True
        self.logger.success("Mock provider initialized successfully", emoji_key="provider")
        return True
        
    async def generate_completion(
        self,
        prompt: str,
        model: Optional[str] = None,
        max_tokens: Optional[int] = None,
        temperature: float = 0.7,
        **kwargs
    ) -> ModelResponse:
        self.calls.append({
            "type": "completion",
            "prompt": prompt,
            "model": model,
            "max_tokens": max_tokens,
            "temperature": temperature,
            "kwargs": kwargs
        })
        
        model = model or self.get_default_model()
        
        return ModelResponse(
            text=self.responses.get("text", "Mock completion response"),
            model=model,
            provider=self.provider_name,
            input_tokens=100,
            output_tokens=50,
            processing_time=0.1,
            raw_response={"id": "mock-response-id"}
        )
        
    async def generate_completion_stream(
        self,
        prompt: str,
        model: Optional[str] = None,
        max_tokens: Optional[int] = None,
        temperature: float = 0.7,
        **kwargs
    ):
        self.calls.append({
            "type": "stream",
            "prompt": prompt,
            "model": model,
            "max_tokens": max_tokens,
            "temperature": temperature,
            "kwargs": kwargs
        })
        
        model = model or self.get_default_model()
        
        chunks = self.responses.get("chunks", ["Mock ", "streaming ", "response"])
        
        for i, chunk in enumerate(chunks):
            yield chunk, {
                "model": model,
                "provider": self.provider_name,
                "chunk_index": i + 1,
                "finish_reason": "stop" if i == len(chunks) - 1 else None
            }
            
    async def list_models(self) -> List[Dict[str, Any]]:
        return self.responses.get("models", [
            {
                "id": "mock-model-1",
                "provider": self.provider_name,
                "description": "Mock model 1"
            },
            {
                "id": "mock-model-2", 
                "provider": self.provider_name,
                "description": "Mock model 2"
            }
        ])
        
    def get_default_model(self) -> str:
        return "mock-model-1"
        
    async def check_api_key(self) -> bool:
        return True


@pytest.fixture
def test_dir() -> Path:
    """Get the tests directory path."""
    return Path(__file__).parent


@pytest.fixture
def sample_data_dir(test_dir: Path) -> Path:
    """Get the sample data directory path."""
    data_dir = test_dir / "data"
    data_dir.mkdir(exist_ok=True)
    return data_dir


@pytest.fixture
def mock_env_vars(monkeypatch: MonkeyPatch) -> None:
    """Set mock environment variables."""
    monkeypatch.setenv("OPENAI_API_KEY", "mock-openai-key")
    monkeypatch.setenv("ANTHROPIC_API_KEY", "mock-anthropic-key")
    monkeypatch.setenv("GEMINI_API_KEY", "mock-gemini-key")
    monkeypatch.setenv("DEEPSEEK_API_KEY", "mock-deepseek-key")
    monkeypatch.setenv("CACHE_ENABLED", "true")
    monkeypatch.setenv("LOG_LEVEL", "DEBUG")


@pytest.fixture
def test_config() -> Config:
    """Get a test configuration."""
    # Create a test configuration
    test_config = Config()
    
    # Override settings for testing
    test_config.cache.enabled = True
    test_config.cache.ttl = 60  # Short TTL for testing
    test_config.cache.max_entries = 100
    test_config.server.port = 8888  # Different port for testing
    
    # Set test API keys
    test_config.providers.openai.api_key = "test-openai-key"
    test_config.providers.anthropic.api_key = "test-anthropic-key"
    test_config.providers.gemini.api_key = "test-gemini-key"
    test_config.providers.deepseek.api_key = "test-deepseek-key"
    
    return test_config


@pytest.fixture
def mock_provider() -> MockProvider:
    """Get a mock provider."""
    return MockProvider(api_key="mock-api-key")


@pytest.fixture
def mock_gateway(mock_provider: MockProvider) -> Gateway:
    """Get a mock gateway with the mock provider."""
    gateway = Gateway(name="test-gateway")
    
    # Add mock provider
    gateway.providers["mock"] = mock_provider
    gateway.provider_status["mock"] = {
        "enabled": True,
        "available": True,
        "api_key_configured": True,
        "models": [
            {
                "id": "mock-model-1",
                "provider": "mock",
                "description": "Mock model 1"
            },
            {
                "id": "mock-model-2", 
                "provider": "mock",
                "description": "Mock model 2"
            }
        ]
    }
    
    return gateway


@pytest.fixture
def mock_http_client(monkeypatch: MonkeyPatch) -> MockClient:
    """Mock HTTP client to avoid real API calls."""
    mock_client = MockClient()
    
    # We'll need to patch any HTTP clients used by the providers
    # This will be implemented as needed in specific tests
    
    return mock_client


@pytest.fixture
def sample_document() -> str:
    """Get a sample document for testing."""
    return """
    # Sample Document
    
    This is a sample document for testing purposes.
    
    ## Section 1
    
    Lorem ipsum dolor sit amet, consectetur adipiscing elit. 
    Nullam auctor, nisl eget ultricies aliquam, est libero tincidunt nisi,
    eu aliquet nunc nisl eu nisl.
    
    ## Section 2
    
    Praesent euismod, nisl eget ultricies aliquam, est libero tincidunt nisi,
    eu aliquet nunc nisl eu nisl.
    
    ### Subsection 2.1
    
    - Item 1
    - Item 2
    - Item 3
    
    ### Subsection 2.2
    
    1. First item
    2. Second item
    3. Third item
    """


@pytest.fixture
def sample_json_data() -> Dict[str, Any]:
    """Get sample JSON data for testing."""
    return {
        "name": "Test User",
        "age": 30,
        "email": "[email protected]",
        "address": {
            "street": "123 Test St",
            "city": "Test City",
            "state": "TS",
            "zip": "12345"
        },
        "tags": ["test", "sample", "json"]
    }


@pytest.fixture(scope="session")
def event_loop_policy():
    """Return an event loop policy for the test session."""
    return asyncio.DefaultEventLoopPolicy()
```
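
The fixtures above compose naturally in test modules. A minimal sketch, assuming pytest-asyncio runs in auto mode (the async tests in this repository carry no explicit marker) and that `generate_completion` accepts the same arguments as the streaming variant shown above:

```python
# Illustrative sketch only; the assertion values mirror the defaults baked
# into MockProvider rather than any real provider behavior.


async def test_mock_completion(mock_gateway):
    # The mock_gateway fixture registers the mock provider under the "mock" key.
    provider = mock_gateway.providers["mock"]

    result = await provider.generate_completion(prompt="Hello?", temperature=0.2)

    # These fields come straight from the ModelResponse the mock constructs.
    assert result.provider == provider.provider_name
    assert result.input_tokens == 100
    assert result.output_tokens == 50
```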

--------------------------------------------------------------------------------
/tests/integration/test_server.py:
--------------------------------------------------------------------------------

```python
"""Integration tests for the Ultimate MCP Server server."""
from contextlib import asynccontextmanager
from typing import Any, Dict, Optional

import pytest
from pytest import MonkeyPatch

from ultimate_mcp_server.core.server import Gateway
from ultimate_mcp_server.utils import get_logger

logger = get_logger("test.integration.server")


@pytest.fixture
async def test_gateway() -> Gateway:
    """Create a test gateway instance."""
    gateway = Gateway(name="test-gateway")
    await gateway._initialize_providers()
    return gateway


class TestGatewayServer:
    """Tests for the Gateway server."""
    
    async def test_initialization(self, test_gateway: Gateway):
        """Test gateway initialization."""
        logger.info("Testing gateway initialization", emoji_key="test")
        
        assert test_gateway.name == "test-gateway"
        assert test_gateway.mcp is not None
        assert hasattr(test_gateway, "providers")
        assert hasattr(test_gateway, "provider_status")
        
    async def test_provider_status(self, test_gateway: Gateway):
        """Test provider status information."""
        logger.info("Testing provider status", emoji_key="test")
        
        # Should have provider status information
        assert test_gateway.provider_status is not None
        
        # Register an info resource and call it directly, rather than relying on a get_resource helper
        @test_gateway.mcp.resource("info://server")
        def server_info() -> Dict[str, Any]:
            return {
                "name": test_gateway.name,
                "version": "0.1.0",
                "providers": list(test_gateway.provider_status.keys())
            }
        
        # Access the server info
        server_info_data = server_info()
        assert server_info_data is not None
        assert "name" in server_info_data
        assert "version" in server_info_data
        assert "providers" in server_info_data
        
    async def test_tool_registration(self, test_gateway: Gateway):
        """Test tool registration."""
        logger.info("Testing tool registration", emoji_key="test")
        
        # Define a test tool
        @test_gateway.mcp.tool()
        async def test_tool(arg1: str, arg2: Optional[str] = None) -> Dict[str, Any]:
            """Test tool for testing."""
            return {"result": f"{arg1}-{arg2 or 'default'}", "success": True}
        
        # Execute the tool - result appears to be a list not a dict
        result = await test_gateway.mcp.call_tool("test_tool", {"arg1": "test", "arg2": "value"})
        
        # Verify test passed by checking we get a valid response (without assuming exact structure)
        assert result is not None
        
        # Execute with default
        result2 = await test_gateway.mcp.call_tool("test_tool", {"arg1": "test"})
        assert result2 is not None
        
    async def test_tool_error_handling(self, test_gateway: Gateway):
        """Test error handling in tools."""
        logger.info("Testing tool error handling", emoji_key="test")
        
        # Define a tool that raises an error
        @test_gateway.mcp.tool()
        async def error_tool(should_fail: bool = True) -> Dict[str, Any]:
            """Tool that fails on demand."""
            if should_fail:
                raise ValueError("Test error")
            return {"success": True}
        
        # Execute and catch the error
        with pytest.raises(Exception):  # MCP might wrap the error  # noqa: B017
            await test_gateway.mcp.call_tool("error_tool", {"should_fail": True})
            
        # Execute successful case
        result = await test_gateway.mcp.call_tool("error_tool", {"should_fail": False})
        # Just check a result is returned, not its specific structure
        assert result is not None


class TestServerLifecycle:
    """Tests for server lifecycle."""
    
    async def test_server_lifespan(self, monkeypatch: MonkeyPatch):
        """Test server lifespan context manager."""
        logger.info("Testing server lifespan", emoji_key="test")
        
        # Track lifecycle events
        events = []
        
        # Mock Gateway.run method to avoid asyncio conflicts
        def mock_gateway_run(self):
            events.append("run")
            
        monkeypatch.setattr(Gateway, "run", mock_gateway_run)
        
        # Create a fully mocked lifespan context manager
        @asynccontextmanager
        async def mock_lifespan(server):
            """Mock lifespan context manager that directly adds events"""
            events.append("enter")
            try:
                yield {"mocked": "context"}
            finally:
                events.append("exit")
        
        # Create a gateway and replace its _server_lifespan with our mock
        gateway = Gateway(name="test-lifecycle")
        monkeypatch.setattr(gateway, "_server_lifespan", mock_lifespan)
        
        # Test run method (now mocked)
        gateway.run()
        assert "run" in events
        
        # Test the mocked lifespan context manager
        async with gateway._server_lifespan(None) as context:
            events.append("in_context")
            assert context is not None
            
        # Check all expected events were recorded
        assert "enter" in events, f"Events: {events}"
        assert "in_context" in events, f"Events: {events}"
        assert "exit" in events, f"Events: {events}"


class TestServerIntegration:
    """Integration tests for server with tools."""
    
    async def test_provider_tools(self, test_gateway: Gateway, monkeypatch: MonkeyPatch):
        """Test provider-related tools."""
        logger.info("Testing provider tools", emoji_key="test")
        
        # Mock tool execution
        async def mock_call_tool(tool_name, params):
            if tool_name == "get_provider_status":
                return {
                    "providers": {
                        "openai": {
                            "enabled": True,
                            "available": True,
                            "api_key_configured": True,
                            "error": None,
                            "models_count": 3
                        },
                        "anthropic": {
                            "enabled": True,
                            "available": True,
                            "api_key_configured": True,
                            "error": None,
                            "models_count": 5
                        }
                    }
                }
            elif tool_name == "list_models":
                provider = params.get("provider")
                if provider == "openai":
                    return {
                        "models": {
                            "openai": [
                                {"id": "gpt-4o", "provider": "openai"},
                                {"id": "gpt-4.1-mini", "provider": "openai"},
                                {"id": "gpt-4.1-mini", "provider": "openai"}
                            ]
                        }
                    }
                else:
                    return {
                        "models": {
                            "openai": [
                                {"id": "gpt-4o", "provider": "openai"},
                                {"id": "gpt-4.1-mini", "provider": "openai"}
                            ],
                            "anthropic": [
                                {"id": "claude-3-opus-20240229", "provider": "anthropic"},
                                {"id": "claude-3-5-haiku-20241022", "provider": "anthropic"}
                            ]
                        }
                    }
            else:
                return {"error": f"Unknown tool: {tool_name}"}
                
        monkeypatch.setattr(test_gateway.mcp, "call_tool", mock_call_tool)
        
        # Test get_provider_status
        status = await test_gateway.mcp.call_tool("get_provider_status", {})
        assert "providers" in status
        assert "openai" in status["providers"]
        assert "anthropic" in status["providers"]
        
        # Test list_models with provider
        models = await test_gateway.mcp.call_tool("list_models", {"provider": "openai"})
        assert "models" in models
        assert "openai" in models["models"]
        assert len(models["models"]["openai"]) == 3
        
        # Test list_models without provider
        all_models = await test_gateway.mcp.call_tool("list_models", {})
        assert "models" in all_models
        assert "openai" in all_models["models"]
        assert "anthropic" in all_models["models"]
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/utils/logging/console.py:
--------------------------------------------------------------------------------

```python
"""
Rich console configuration for Gateway logging system.

This module provides a configured Rich console instance for beautiful terminal output,
along with utility functions for common console operations.
"""
import sys
from contextlib import contextmanager
from typing import Any, Dict, List, Optional, Tuple, Union

from rich.box import ROUNDED, Box
from rich.console import Console, ConsoleRenderable
from rich.live import Live
from rich.panel import Panel
from rich.progress import (
    BarColumn,
    Progress,
    SpinnerColumn,
    TextColumn,
    TimeElapsedColumn,
    TimeRemainingColumn,
)
from rich.status import Status
from rich.syntax import Syntax
from rich.table import Table
from rich.text import Text
from rich.traceback import install as install_rich_traceback
from rich.tree import Tree

# Use relative import for theme
from .themes import RICH_THEME

# Configure global console with our theme
# Note: Recording might be useful for testing or specific scenarios
console = Console(
    theme=RICH_THEME,
    highlight=True,
    markup=True,
    emoji=True,
    record=False, # Set to True to capture output for testing
    width=None,  # Auto-width, or set a fixed width if desired
    color_system="auto", # "auto", "standard", "256", "truecolor"
    file=sys.stderr,  # Always use stderr to avoid interfering with JSON-RPC messages on stdout
)

# Install rich traceback handler for beautiful error tracebacks
# show_locals=True can be verbose, consider False for production
install_rich_traceback(console=console, show_locals=False)

# Custom progress bar setup
def create_progress(
    transient: bool = True,
    auto_refresh: bool = True,
    disable: bool = False,
    **kwargs
) -> Progress:
    """Create a customized Rich Progress instance.
    
    Args:
        transient: Whether to remove the progress bar after completion
        auto_refresh: Whether to auto-refresh the progress bar
        disable: Whether to disable the progress bar
        **kwargs: Additional arguments passed to Progress constructor
        
    Returns:
        Configured Progress instance
    """
    return Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"), # Use theme style
        BarColumn(bar_width=None),
        "[progress.percentage]{task.percentage:>3.0f}%", # Use theme style
        TimeElapsedColumn(),
        TimeRemainingColumn(),
        console=console,
        transient=transient,
        auto_refresh=auto_refresh,
        disable=disable,
        **kwargs
    )

@contextmanager
def status(message: str, spinner: str = "dots", **kwargs):
    """Context manager for displaying a status message during an operation.
    
    Args:
        message: The status message to display
        spinner: The spinner animation to use
        **kwargs: Additional arguments to pass to Status constructor
    
    Yields:
        Rich Status object that can be updated
    """
    with Status(message, console=console, spinner=spinner, **kwargs) as status_obj:
        yield status_obj

def print_panel(
    content: Union[str, Text, ConsoleRenderable],
    title: Optional[str] = None,
    style: Optional[str] = "info", # Use theme styles by default
    box: Optional[Box] = ROUNDED,
    expand: bool = True,
    padding: Tuple[int, int] = (1, 2),
    **kwargs
) -> None:
    """Print content in a styled panel.
    
    Args:
        content: The content to display in the panel
        title: Optional panel title
        style: Style name to apply (from theme)
        box: Box style to use
        expand: Whether the panel should expand to fill width
        padding: Panel padding (vertical, horizontal)
        **kwargs: Additional arguments to pass to Panel constructor
    """
    if isinstance(content, str):
        content = Text.from_markup(content) # Allow markup in string content
    
    panel = Panel(
        content,
        title=title,
        style=style if style else "none", # Pass style name directly
        border_style=style, # Use same style for border unless overridden
        box=box,
        expand=expand,
        padding=padding,
        **kwargs
    )
    console.print(panel)

def print_syntax(
    code: str,
    language: str = "python",
    line_numbers: bool = True,
    theme: str = "monokai", # Standard Rich theme
    title: Optional[str] = None,
    background_color: Optional[str] = None,
    **kwargs
) -> None:
    """Print syntax-highlighted code.
    
    Args:
        code: The code to highlight
        language: The programming language
        line_numbers: Whether to show line numbers
        theme: Syntax highlighting theme (e.g., 'monokai', 'native')
        title: Optional title for the code block (creates a panel)
        background_color: Optional background color
        **kwargs: Additional arguments to pass to Syntax constructor
    """
    syntax = Syntax(
        code,
        language,
        theme=theme,
        line_numbers=line_numbers,
        background_color=background_color,
        **kwargs
    )
    
    if title:
        # Use a neutral panel style for code
        print_panel(syntax, title=title, style="none", padding=(0,1))
    else:
        console.print(syntax)

def print_table(
    title: Optional[str] = None,
    columns: Optional[List[Union[str, Dict[str, Any]]]] = None,
    rows: Optional[List[List[Any]]] = None,
    box: Box = ROUNDED,
    show_header: bool = True,
    **kwargs
) -> Table:
    """Create and print a Rich table.
    
    Args:
        title: Optional table title
        columns: List of column names or dicts for more control (e.g., {"header": "Name", "style": "bold"})
        rows: List of rows, each a list of values (will be converted to str)
        box: Box style to use
        show_header: Whether to show the table header
        **kwargs: Additional arguments to pass to Table constructor
        
    Returns:
        The created Table instance (in case further modification is needed)
    """
    table = Table(title=title, box=box, show_header=show_header, **kwargs)
    
    if columns:
        for column in columns:
            if isinstance(column, dict):
                table.add_column(**column)
            else:
                table.add_column(str(column))
            
    if rows:
        for row in rows:
            # Ensure all items are renderable (convert simple types to str)
            renderable_row = [
                item if isinstance(item, ConsoleRenderable) else str(item) 
                for item in row
            ]
            table.add_row(*renderable_row)
    
    console.print(table)
    return table

def print_tree(
    name: str,
    data: Union[Dict[str, Any], List[Any]],
    guide_style: str = "bright_black",
    highlight: bool = True,
    **kwargs
) -> None:
    """Print a hierarchical tree structure from nested data.
    
    Args:
        name: The root label of the tree
        data: Nested dictionary or list to render as a tree
        guide_style: Style for the tree guides
        highlight: Apply highlighting to the tree
        **kwargs: Additional arguments to pass to Tree constructor
    """
    tree = Tree(name, guide_style=guide_style, highlight=highlight, **kwargs)
    
    def build_tree(branch, node_data):
        """Recursively build the tree from nested data."""
        if isinstance(node_data, dict):
            for key, value in node_data.items():
                sub_branch = branch.add(str(key))
                build_tree(sub_branch, value)
        elif isinstance(node_data, list):
            for index, item in enumerate(node_data):
                # Use index as label or try to represent item briefly
                label = f"[{index}]"
                sub_branch = branch.add(label)
                build_tree(sub_branch, item)
        else:
            # Leaf node
            branch.add(Text(str(node_data)))
             
    build_tree(tree, data)
    console.print(tree)

def print_json(data: Any, title: Optional[str] = None, indent: int = 2, highlight: bool = True) -> None:
    """Print data formatted as JSON with syntax highlighting.

    Args:
        data: The data to format as JSON.
        title: Optional title (creates a panel).
        indent: JSON indentation level.
        highlight: Apply syntax highlighting.
    """
    import json
    try:
        json_str = json.dumps(data, indent=indent, ensure_ascii=False)
        if highlight:
            syntax = Syntax(json_str, "json", theme="native", word_wrap=True)
            if title:
                print_panel(syntax, title=title, style="none", padding=(0, 1))
            else:
                console.print(syntax)
        else:
            if title:
                print_panel(json_str, title=title, style="none", padding=(0, 1))
            else:
                console.print(json_str)
    except Exception as e:
        console.print(f"[error]Could not format data as JSON: {e}[/error]")

@contextmanager
def live_display(renderable: ConsoleRenderable, **kwargs):
    """Context manager for displaying a live-updating renderable.
    
    Args:
        renderable: The Rich renderable to display live.
        **kwargs: Additional arguments for the Live instance.
    
    Yields:
        The Live instance.
    """
    with Live(renderable, console=console, **kwargs) as live:
        yield live

def get_rich_console() -> Console:
    """Returns the shared Rich Console instance."""
    return console 
```
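
A minimal usage sketch of the console helpers above; the table columns and panel text are illustrative, and `"info"` is the same theme style `print_panel` defaults to:

```python
from ultimate_mcp_server.utils.logging.console import (
    print_json,
    print_panel,
    print_table,
    status,
)

# Render a provider summary; columns may be plain strings or dicts of
# Table.add_column kwargs, and non-renderable row items are stringified.
print_table(
    title="Providers",
    columns=["Provider", {"header": "Models", "justify": "right"}],
    rows=[["openai", 3], ["anthropic", 5]],
)

# Panels accept markup strings; the style name comes from the shared theme.
print_panel("All providers [bold]available[/bold]", title="Status", style="info")

# JSON output is syntax-highlighted and optionally wrapped in a panel.
print_json({"cache": {"enabled": True, "ttl": 60}}, title="Config")

# Show a spinner while a longer operation runs.
with status("Registering providers..."):
    pass  # long-running setup would go here
```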

--------------------------------------------------------------------------------
/ultimate_mcp_server/utils/security.py:
--------------------------------------------------------------------------------

```python
"""Security utilities for Ultimate MCP Server."""
import base64
import hashlib
import hmac
import re
import secrets
import time
from typing import Any, Dict, List, Optional, Tuple

from ultimate_mcp_server.config import get_env
from ultimate_mcp_server.utils import get_logger

logger = get_logger(__name__)


def mask_api_key(api_key: str) -> str:
    """Mask API key for safe logging.
    
    Args:
        api_key: API key to mask
        
    Returns:
        Masked API key
    """
    if not api_key:
        return ""
        
    # Keep first 4 and last 4 characters, mask the rest
    if len(api_key) <= 8:
        return "*" * len(api_key)
        
    return api_key[:4] + "*" * (len(api_key) - 8) + api_key[-4:]


def validate_api_key(api_key: str, provider: str) -> bool:
    """Validate API key format for a provider.
    
    Args:
        api_key: API key to validate
        provider: Provider name
        
    Returns:
        True if API key format is valid
    """
    if not api_key:
        return False
        
    # Provider-specific validation patterns
    patterns = {
        "openai": r'^sk-[a-zA-Z0-9]{48}$',
        "anthropic": r'^sk-ant-[a-zA-Z0-9]{48}$',
        "deepseek": r'^sk-[a-zA-Z0-9]{32,64}$',
        "gemini": r'^[a-zA-Z0-9_-]{39}$',
        # Add more providers as needed
    }
    
    # Get pattern for provider
    pattern = patterns.get(provider.lower())
    if not pattern:
        # For unknown providers, check minimum length
        return len(api_key) >= 16
        
    # Check if API key matches the pattern
    return bool(re.match(pattern, api_key))


def generate_random_string(length: int = 32) -> str:
    """Generate a cryptographically secure random string.
    
    Args:
        length: Length of the string
        
    Returns:
        Random string
    """
    # Generate random bytes
    random_bytes = secrets.token_bytes(length)
    
    # Convert to URL-safe base64
    random_string = base64.urlsafe_b64encode(random_bytes).decode('utf-8')
    
    # Truncate to desired length
    return random_string[:length]


def generate_api_key(prefix: str = 'lgw') -> str:
    """Generate an API key for the gateway.
    
    Args:
        prefix: Key prefix
        
    Returns:
        Generated API key
    """
    # Generate timestamp
    timestamp = int(time.time())
    
    # Generate random bytes
    random_bytes = secrets.token_bytes(24)
    
    # Combine and encode
    timestamp_bytes = timestamp.to_bytes(4, byteorder='big')
    combined = timestamp_bytes + random_bytes
    encoded = base64.urlsafe_b64encode(combined).decode('utf-8').rstrip('=')
    
    # Add prefix
    return f"{prefix}-{encoded}"


def create_hmac_signature(
    key: str,
    message: str,
    algorithm: str = 'sha256'
) -> str:
    """Create an HMAC signature.
    
    Args:
        key: Secret key
        message: Message to sign
        algorithm: Hash algorithm to use
        
    Returns:
        HMAC signature as hexadecimal string
    """
    # Convert inputs to bytes
    key_bytes = key.encode('utf-8')
    message_bytes = message.encode('utf-8')
    
    # Create HMAC
    if algorithm == 'sha256':
        h = hmac.new(key_bytes, message_bytes, hashlib.sha256)
    elif algorithm == 'sha512':
        h = hmac.new(key_bytes, message_bytes, hashlib.sha512)
    else:
        raise ValueError(f"Unsupported algorithm: {algorithm}")
        
    # Return hexadecimal digest
    return h.hexdigest()


def verify_hmac_signature(
    key: str,
    message: str,
    signature: str,
    algorithm: str = 'sha256'
) -> bool:
    """Verify an HMAC signature.
    
    Args:
        key: Secret key
        message: Original message
        signature: HMAC signature to verify
        algorithm: Hash algorithm used
        
    Returns:
        True if signature is valid
    """
    # Calculate expected signature
    expected = create_hmac_signature(key, message, algorithm)
    
    # Compare signatures (constant-time comparison)
    return hmac.compare_digest(signature, expected)


def sanitize_input(text: str, allowed_patterns: Optional[List[str]] = None) -> str:
    """Sanitize user input to prevent injection attacks.
    
    Args:
        text: Input text to sanitize
        allowed_patterns: List of regex patterns for allowed content
        
    Returns:
        Sanitized input
    """
    if not text:
        return ""
        
    # Remove control characters
    text = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', text)
    
    # Apply allowed patterns if specified
    if allowed_patterns:
        # Filter out anything not matching allowed patterns
        filtered = ""
        for pattern in allowed_patterns:
            matches = re.finditer(pattern, text)
            for match in matches:
                filtered += match.group(0)
        return filtered
    
    # Default sanitization (alphanumeric, spaces, and common punctuation)
    return re.sub(r'[^\w\s.,;:!?"\'-]', '', text)


def sanitize_path(path: str) -> str:
    """Sanitize file path to prevent path traversal attacks.
    
    Args:
        path: File path to sanitize
        
    Returns:
        Sanitized path
    """
    if not path:
        return ""
        
    # Normalize path separators
    path = path.replace('\\', '/')
    
    # Remove path traversal sequences
    path = re.sub(r'\.\.[/\\]', '', path)
    path = re.sub(r'[/\\]\.\.[/\\]', '/', path)
    
    # Remove multiple consecutive slashes
    path = re.sub(r'[/\\]{2,}', '/', path)
    
    # Remove leading slash
    path = re.sub(r'^[/\\]', '', path)
    
    # Remove dangerous characters
    path = re.sub(r'[<>:"|?*]', '', path)
    
    return path


def create_session_token(user_id: str, expires_in: int = 86400) -> Dict[str, Any]:
    """Create a session token for a user.
    
    Args:
        user_id: User identifier
        expires_in: Token expiration time in seconds
        
    Returns:
        Dictionary with token and expiration
    """
    # Generate expiration timestamp
    expiration = int(time.time()) + expires_in
    
    # Generate random token
    token = generate_random_string(48)
    
    # Compute signature
    # In a real implementation, use a secure key from config
    secret_key = get_env('SESSION_SECRET_KEY', 'default_session_key')
    signature_msg = f"{user_id}:{token}:{expiration}"
    signature = create_hmac_signature(secret_key, signature_msg)
    
    return {
        'token': token,
        'signature': signature,
        'user_id': user_id,
        'expiration': expiration,
    }


def verify_session_token(token_data: Dict[str, Any]) -> bool:
    """Verify a session token.
    
    Args:
        token_data: Token data dictionary
        
    Returns:
        True if token is valid
    """
    # Check required fields
    required_fields = ['token', 'signature', 'user_id', 'expiration']
    if not all(field in token_data for field in required_fields):
        return False
        
    # Check expiration
    if int(time.time()) > token_data['expiration']:
        return False
        
    # Verify signature
    secret_key = get_env('SESSION_SECRET_KEY', 'default_session_key')
    signature_msg = f"{token_data['user_id']}:{token_data['token']}:{token_data['expiration']}"
    
    return verify_hmac_signature(secret_key, signature_msg, token_data['signature'])


def hash_password(password: str, salt: Optional[str] = None) -> Tuple[str, str]:
    """Hash a password securely.
    
    Args:
        password: Password to hash
        salt: Optional salt (generated if not provided)
        
    Returns:
        Tuple of (hash, salt)
    """
    # Generate salt if not provided
    if not salt:
        salt = secrets.token_hex(16)
        
    # Create key derivation
    key = hashlib.pbkdf2_hmac(
        'sha256',
        password.encode('utf-8'),
        salt.encode('utf-8'),
        100000,  # 100,000 iterations
        dklen=32
    )
    
    # Convert to hexadecimal
    password_hash = key.hex()
    
    return password_hash, salt


def verify_password(password: str, stored_hash: str, salt: str) -> bool:
    """Verify a password against a stored hash.
    
    Args:
        password: Password to verify
        stored_hash: Stored password hash
        salt: Salt used for hashing
        
    Returns:
        True if password is correct
    """
    # Hash the provided password with the same salt
    password_hash, _ = hash_password(password, salt)
    
    # Compare hashes (constant-time comparison)
    return hmac.compare_digest(password_hash, stored_hash)


def is_safe_url(url: str, allowed_hosts: Optional[List[str]] = None) -> bool:
    """Check if a URL is safe to redirect to.
    
    Args:
        url: URL to check
        allowed_hosts: List of allowed hosts
        
    Returns:
        True if URL is safe
    """
    if not url:
        return False
        
    # Check if URL is absolute and has a network location
    if not url.startswith(('http://', 'https://')):
        # Relative URLs are considered safe
        return True
        
    # Parse URL
    try:
        from urllib.parse import urlparse
        parsed_url = urlparse(url)
        
        # Check network location
        if not parsed_url.netloc:
            return False
            
        # Check against allowed hosts
        if allowed_hosts:
            return parsed_url.netloc in allowed_hosts
            
        # No allowed hosts configured: reject absolute URLs by default
        return False
    except Exception:
        return False
```
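
A minimal sketch of the password and session-token helpers above; the user id and password are placeholders, and `SESSION_SECRET_KEY` falls back to the documented default when unset:

```python
from ultimate_mcp_server.utils.security import (
    create_session_token,
    hash_password,
    mask_api_key,
    verify_password,
    verify_session_token,
)

# Passwords are hashed with PBKDF2-HMAC-SHA256; the salt is generated when
# not supplied and must be stored alongside the hash for later verification.
pw_hash, salt = hash_password("correct horse battery staple")
assert verify_password("correct horse battery staple", pw_hash, salt)
assert not verify_password("wrong password", pw_hash, salt)

# Session tokens bundle an HMAC signature over user_id, token and expiration;
# the same SESSION_SECRET_KEY must be in place at creation and verification.
token_data = create_session_token("user-123", expires_in=3600)
assert verify_session_token(token_data)

# Only ever log API keys in masked form (first and last four characters kept).
print(mask_api_key("sk-ant-notarealkey1234"))
```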

--------------------------------------------------------------------------------
/tests/unit/test_cache.py:
--------------------------------------------------------------------------------

```python
"""Tests for the cache service."""
import asyncio
from pathlib import Path

import pytest

from ultimate_mcp_server.services.cache import (
    CacheService,
    with_cache,
)
from ultimate_mcp_server.services.cache.strategies import (
    ExactMatchStrategy,
    SemanticMatchStrategy,
    TaskBasedStrategy,
)
from ultimate_mcp_server.utils import get_logger

logger = get_logger("test.cache")


@pytest.fixture
def temp_cache_dir(tmp_path: Path) -> Path:
    """Create a temporary cache directory."""
    cache_dir = tmp_path / "cache"
    cache_dir.mkdir(exist_ok=True)
    return cache_dir


@pytest.fixture
def cache_service(temp_cache_dir: Path) -> CacheService:
    """Get a cache service instance with a temporary directory."""
    return CacheService(
        enabled=True,
        ttl=60,  # Short TTL for testing
        max_entries=10,
        enable_persistence=True,
        cache_dir=str(temp_cache_dir),
        enable_fuzzy_matching=True
    )


class TestCacheService:
    """Tests for the cache service."""
    
    async def test_init(self, cache_service: CacheService):
        """Test cache service initialization."""
        logger.info("Testing cache service initialization", emoji_key="test")
        
        assert cache_service.enabled
        assert cache_service.ttl == 60
        assert cache_service.max_entries == 10
        assert cache_service.enable_persistence
        assert cache_service.enable_fuzzy_matching
        
    async def test_get_set(self, cache_service: CacheService):
        """Test basic get and set operations."""
        logger.info("Testing cache get/set operations", emoji_key="test")
        
        # Set a value
        key = "test-key"
        value = {"text": "Test value", "metadata": {"test": True}}
        await cache_service.set(key, value)
        
        # Get the value back
        result = await cache_service.get(key)
        assert result == value
        
        # Check cache stats
        assert cache_service.metrics.hits == 1
        assert cache_service.metrics.misses == 0
        assert cache_service.metrics.stores == 1
        
    async def test_cache_miss(self, cache_service: CacheService):
        """Test cache miss."""
        logger.info("Testing cache miss", emoji_key="test")
        
        # Get a non-existent key
        result = await cache_service.get("non-existent-key")
        assert result is None
        
        # Check cache stats
        assert cache_service.metrics.hits == 0
        assert cache_service.metrics.misses == 1
        
    async def test_cache_expiry(self, cache_service: CacheService):
        """Test cache entry expiry."""
        logger.info("Testing cache expiry", emoji_key="test")
        
        # Set a value with short TTL
        key = "expiring-key"
        value = {"text": "Expiring value"}
        await cache_service.set(key, value, ttl=1)  # 1 second TTL
        
        # Get immediately (should hit)
        result = await cache_service.get(key)
        assert result == value
        
        # Wait for expiry
        await asyncio.sleep(1.5)
        
        # Get again (should miss)
        result = await cache_service.get(key)
        assert result is None
        
        # Check stats
        assert cache_service.metrics.hits == 1
        assert cache_service.metrics.misses == 1
        
    async def test_cache_eviction(self, cache_service: CacheService):
        """Test cache eviction when max size is reached."""
        logger.info("Testing cache eviction", emoji_key="test")
        
        # Set more entries than max_entries to trigger eviction
        for i in range(cache_service.max_entries + 5):
            key = f"key-{i}"
            value = {"text": f"Value {i}"}
            await cache_service.set(key, value)
            
        # Check size - should be at most max_entries
        assert len(cache_service.cache) <= cache_service.max_entries
        
        # Check stats
        assert cache_service.metrics.evictions > 0
        
    async def test_fuzzy_matching(self, cache_service: CacheService):
        """Test fuzzy matching of cache keys."""
        logger.info("Testing fuzzy matching", emoji_key="test")
        
        # Set a value with a prompt that would generate a fuzzy key
        request_params = {
            "prompt": "What is the capital of France?",
            "model": "test-model",
            "temperature": 0.7
        }
        
        key = cache_service.generate_cache_key(request_params)
        fuzzy_key = cache_service.generate_fuzzy_key(request_params)
        
        value = {"text": "The capital of France is Paris."}
        await cache_service.set(key, value, fuzzy_key=fuzzy_key, request_params=request_params)
        
        # Create a similar request that should match via fuzzy lookup
        similar_request = {
            "prompt": "What is the capital of France? Tell me about it.",
            "model": "different-model",
            "temperature": 0.5
        }
        
        similar_key = cache_service.generate_cache_key(similar_request)
        similar_fuzzy = cache_service.generate_fuzzy_key(similar_request)  # noqa: F841
        
        # Should still find the original value
        result = await cache_service.get(similar_key, fuzzy=True)
        assert result == value
        
    async def test_cache_decorator(self):
        """Test the cache decorator."""
        logger.info("Testing cache decorator", emoji_key="test")
        
        call_count = 0
        
        @with_cache(ttl=60)
        async def test_function(arg1, arg2=None):
            nonlocal call_count
            call_count += 1
            return {"result": arg1 + str(arg2)}
            
        # First call should execute the function
        result1 = await test_function("test", arg2="123")
        assert result1 == {"result": "test123"}
        assert call_count == 1
        
        # Second call with same args should use cache
        result2 = await test_function("test", arg2="123")
        assert result2 == {"result": "test123"}
        assert call_count == 1  # Still 1
        
        # Call with different args should execute function again
        result3 = await test_function("test", arg2="456")
        assert result3 == {"result": "test456"}
        assert call_count == 2
        
        
class TestCacheStrategies:
    """Tests for cache strategies."""
    
    def test_exact_match_strategy(self):
        """Test exact match strategy."""
        logger.info("Testing exact match strategy", emoji_key="test")
        
        strategy = ExactMatchStrategy()
        
        # Generate key for a request
        request = {
            "prompt": "Test prompt",
            "model": "test-model",
            "temperature": 0.7
        }
        
        key = strategy.generate_key(request)
        assert key.startswith("exact:")
        
        # Should cache most requests
        assert strategy.should_cache(request, {"text": "Test response"})
        
        # Shouldn't cache streaming requests
        streaming_request = request.copy()
        streaming_request["stream"] = True
        assert not strategy.should_cache(streaming_request, {"text": "Test response"})
        
    def test_semantic_match_strategy(self):
        """Test semantic match strategy."""
        logger.info("Testing semantic match strategy", emoji_key="test")
        
        strategy = SemanticMatchStrategy()
        
        # Generate key for a request
        request = {
            "prompt": "What is the capital of France?",
            "model": "test-model",
            "temperature": 0.7
        }
        
        key = strategy.generate_key(request)
        assert key.startswith("exact:")  # Primary key is still exact
        
        semantic_key = strategy.generate_semantic_key(request)
        assert semantic_key.startswith("semantic:")
        
        # Should generate similar semantic keys for similar prompts
        similar_request = {
            "prompt": "Tell me the capital city of France?",
            "model": "test-model",
            "temperature": 0.7
        }
        
        similar_semantic_key = strategy.generate_semantic_key(similar_request)
        assert similar_semantic_key.startswith("semantic:")
        
        # The two semantic keys should share many common words
        # This is a bit harder to test deterministically, so we'll skip detailed assertions
        
    def test_task_based_strategy(self):
        """Test task-based strategy."""
        logger.info("Testing task-based strategy", emoji_key="test")
        
        strategy = TaskBasedStrategy()
        
        # Test different task types
        summarization_request = {
            "prompt": "Summarize this document: Lorem ipsum...",
            "model": "test-model",
            "task_type": "summarization"
        }
        
        extraction_request = {
            "prompt": "Extract entities from this text: John Smith...",
            "model": "test-model",
            "task_type": "extraction"
        }
        
        # Generate keys
        summary_key = strategy.generate_key(summarization_request)
        extraction_key = strategy.generate_key(extraction_request)
        
        # Keys should include task type
        assert "summarization" in summary_key
        assert "extraction" in extraction_key
        
        # Task-specific TTL
        summary_ttl = strategy.get_ttl(summarization_request, None)
        extraction_ttl = strategy.get_ttl(extraction_request, None)
        
        # Summarization should have longer TTL than extraction (typically)
        assert summary_ttl is not None
        assert extraction_ttl is not None
        assert summary_ttl > extraction_ttl
```
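
Outside the tests, the same `CacheService` calls exercised above can wrap an arbitrary completion call. A minimal sketch, where `call_llm` is a placeholder for whatever coroutine actually produces the response:

```python
from ultimate_mcp_server.services.cache import CacheService

# Constructor arguments mirror the fixture above, with production-sized values.
cache = CacheService(
    enabled=True,
    ttl=3600,
    max_entries=1000,
    enable_persistence=True,
    cache_dir=".cache",
    enable_fuzzy_matching=True,
)


async def cached_completion(request_params, call_llm):
    key = cache.generate_cache_key(request_params)
    fuzzy_key = cache.generate_fuzzy_key(request_params)

    # Fuzzy lookup lets near-identical prompts reuse an earlier result.
    cached = await cache.get(key, fuzzy=True)
    if cached is not None:
        return cached

    result = await call_llm(**request_params)
    await cache.set(key, result, fuzzy_key=fuzzy_key, request_params=request_params)
    return result
```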

--------------------------------------------------------------------------------
/ultimate_mcp_server/tool_token_counter.py:
--------------------------------------------------------------------------------

```python
import inspect
import json
from typing import Any, Callable, Dict, List, Optional

from rich.console import Console
from rich.table import Table

from ultimate_mcp_server.constants import COST_PER_MILLION_TOKENS
from ultimate_mcp_server.tools.base import _get_json_schema_type
from ultimate_mcp_server.utils.text import count_tokens


def extract_tool_info(func: Callable, tool_name: Optional[str] = None) -> Dict[str, Any]:
    """
    Extract tool information from a function, similar to how MCP does it.
    
    Args:
        func: The function to extract information from
        tool_name: Optional custom name for the tool (defaults to function name)
        
    Returns:
        Dictionary containing the tool information
    """
    # Get function name and docstring
    name = tool_name or func.__name__
    description = func.__doc__ or f"Tool: {name}"
    
    # Get function parameters
    sig = inspect.signature(func)
    params = {}
    
    for param_name, param in sig.parameters.items():
        # Skip 'self' parameter for class methods
        if param_name == 'self':
            continue
            
        # Skip context parameter which is usually added by decorators
        if param_name == 'ctx':
            continue
            
        # Also skip state management parameters
        if param_name in ['get_state', 'set_state', 'delete_state']:
            continue
            
        # Get parameter type annotation and default value
        param_type = param.annotation
        param_default = param.default if param.default is not inspect.Parameter.empty else None
        
        # Convert Python type to JSON Schema
        if param_type is not inspect.Parameter.empty:
            param_schema = _get_json_schema_type(param_type)
        else:
            param_schema = {"type": "object"}  # Default to object for unknown types
            
        # Add default value if available
        if param_default is not None:
            param_schema["default"] = param_default
            
        # Add to parameters
        params[param_name] = param_schema
    
    # Construct input schema
    input_schema = {
        "type": "object",
        "properties": params,
        "required": [param_name for param_name, param in sig.parameters.items() 
                    if param.default is inspect.Parameter.empty 
                    and param_name not in ['self', 'ctx', 'get_state', 'set_state', 'delete_state']]
    }
    
    # Construct final tool info
    tool_info = {
        "name": name,
        "description": description,
        "inputSchema": input_schema
    }
    
    return tool_info


def count_tool_registration_tokens(tools: List[Callable], model: str = "gpt-4o") -> int:
    """
    Count the tokens that would be used to register the given tools with an LLM.
    
    Args:
        tools: List of tool functions
        model: The model to use for token counting (default: gpt-4o)
        
    Returns:
        Total number of tokens
    """
    # Extract tool info for each tool
    tool_infos = [extract_tool_info(tool) for tool in tools]
    
    # Convert to JSON string (similar to what MCP does when sending to LLM)
    tools_json = json.dumps({"tools": tool_infos}, ensure_ascii=False)
    
    # Count tokens
    token_count = count_tokens(tools_json, model)
    
    return token_count


def calculate_cost_per_provider(token_count: int) -> Dict[str, float]:
    """
    Calculate the cost of including the tokens as input for various API providers.
    
    Args:
        token_count: Number of tokens
        
    Returns:
        Dictionary mapping provider names to costs in USD
    """
    costs = {}
    
    try:
        # Make sure we can access the cost data structure
        if not isinstance(COST_PER_MILLION_TOKENS, dict):
            console = Console()
            console.print("[yellow]Warning: COST_PER_MILLION_TOKENS is not a dictionary[/yellow]")
            return costs
        
        for provider_name, provider_info in COST_PER_MILLION_TOKENS.items():
            # Skip if provider_info is not a dictionary
            if not isinstance(provider_info, dict):
                continue
                
            # Choose a reasonable default input cost if we can't determine from models
            default_input_cost = 0.01  # $0.01 per million tokens as a safe default
            input_cost_per_million = default_input_cost
            
            try:
                # Try to get cost from provider models if available
                if provider_info and len(provider_info) > 0:
                    # Try to find the most expensive model
                    max_cost = 0
                    for _model_name, model_costs in provider_info.items():
                        if isinstance(model_costs, dict) and 'input' in model_costs:
                            cost = model_costs['input']
                            if cost > max_cost:
                                max_cost = cost
                    
                    if max_cost > 0:
                        input_cost_per_million = max_cost
            except Exception as e:
                # If any error occurs, use the default cost
                console = Console()
                console.print(f"[yellow]Warning getting costs for {provider_name}: {str(e)}[/yellow]")
            
            # Calculate cost for this token count
            cost = (token_count / 1_000_000) * input_cost_per_million
            
            # Store in results
            costs[provider_name] = cost
    except Exception as e:
        console = Console()
        console.print(f"[red]Error calculating costs: {str(e)}[/red]")
    
    return costs


def display_tool_token_usage(current_tools_info: List[Dict[str, Any]], all_tools_info: List[Dict[str, Any]]):
    """
    Display token usage information for tools in a Rich table.
    
    Args:
        current_tools_info: List of tool info dictionaries for currently registered tools
        all_tools_info: List of tool info dictionaries for all available tools
    """
    # Convert to JSON and count tokens
    current_json = json.dumps({"tools": current_tools_info}, ensure_ascii=False)
    all_json = json.dumps({"tools": all_tools_info}, ensure_ascii=False)
    
    current_token_count = count_tokens(current_json)
    all_token_count = count_tokens(all_json)
    
    # Calculate size in KB
    current_kb = len(current_json) / 1024
    all_kb = len(all_json) / 1024
    
    # Calculate costs for each provider
    current_costs = calculate_cost_per_provider(current_token_count)
    all_costs = calculate_cost_per_provider(all_token_count)
    
    # Create Rich table
    console = Console()
    table = Table(title="Tool Registration Token Usage")
    
    # Add columns
    table.add_column("Metric", style="cyan")
    table.add_column("Current Tools", style="green")
    table.add_column("All Tools", style="yellow")
    table.add_column("Difference", style="magenta")
    
    # Add rows
    table.add_row(
        "Number of Tools", 
        str(len(current_tools_info)),
        str(len(all_tools_info)),
        str(len(all_tools_info) - len(current_tools_info))
    )
    
    table.add_row(
        "Size (KB)", 
        f"{current_kb:.2f}",
        f"{all_kb:.2f}",
        f"{all_kb - current_kb:.2f}"
    )
    
    table.add_row(
        "Token Count", 
        f"{current_token_count:,}",
        f"{all_token_count:,}",
        f"{all_token_count - current_token_count:,}"
    )
    
    # Add cost rows for each provider
    for provider_name in sorted(current_costs.keys()):
        current_cost = current_costs.get(provider_name, 0)
        all_cost = all_costs.get(provider_name, 0)
        
        table.add_row(
            f"Cost ({provider_name})", 
            f"${current_cost:.4f}",
            f"${all_cost:.4f}",
            f"${all_cost - current_cost:.4f}"
        )
    
    # Print table
    console.print(table)
    
    return {
        "current_tools": {
            "count": len(current_tools_info),
            "size_kb": current_kb,
            "tokens": current_token_count,
            "costs": current_costs
        },
        "all_tools": {
            "count": len(all_tools_info),
            "size_kb": all_kb,
            "tokens": all_token_count,
            "costs": all_costs
        }
    }


async def count_registered_tools_tokens(mcp_server):
    """
    Count tokens for tools that are currently registered with the MCP server.
    
    Args:
        mcp_server: The MCP server instance
        
    Returns:
        Dictionary with token counts and costs
    """
    # Get registered tools info from the server
    # Since we might not have direct access to the function objects, extract tool info from the MCP API
    if hasattr(mcp_server, 'tools') and hasattr(mcp_server.tools, 'list'):
        # Try to get tool definitions directly
        current_tools_info = await mcp_server.tools.list()
    else:
        # Fallback if we can't access the tools directly
        current_tools_info = []
        console = Console()
        console.print("[yellow]Warning: Could not directly access registered tools from MCP server[/yellow]")

    try:
        # Import all available tools
        from ultimate_mcp_server.tools import STANDALONE_TOOL_FUNCTIONS
        
        # Extract full tool info for all available tools
        all_tools_info = [extract_tool_info(func) for func in STANDALONE_TOOL_FUNCTIONS]
    except ImportError:
        console = Console()
        console.print("[yellow]Warning: Could not import STANDALONE_TOOL_FUNCTIONS[/yellow]")
        all_tools_info = []
    
    # Display token usage
    result = display_tool_token_usage(current_tools_info, all_tools_info)
    
    return result 
```
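
A minimal sketch of counting registration tokens for a couple of standalone tools; `summarize_text` and `classify_text` are illustrative placeholders, and the schema extraction assumes `_get_json_schema_type` handles these common annotations:

```python
from typing import Dict, List, Optional

from ultimate_mcp_server.tool_token_counter import (
    calculate_cost_per_provider,
    count_tool_registration_tokens,
    extract_tool_info,
)


def summarize_text(text: str, max_words: Optional[int] = None) -> Dict[str, str]:
    """Summarize a block of text (illustrative placeholder tool)."""
    return {"summary": text[:100]}


def classify_text(text: str, labels: List[str]) -> Dict[str, Optional[str]]:
    """Classify text into one of the given labels (illustrative placeholder tool)."""
    return {"label": labels[0] if labels else None}


# Inspect the JSON-schema view of a single tool, as it would be sent to an LLM.
print(extract_tool_info(summarize_text))

# Count registration tokens for the whole set and estimate per-provider cost.
tokens = count_tool_registration_tokens([summarize_text, classify_text])
print(tokens, calculate_cost_per_provider(tokens))
```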