#
tokens: 49072/50000 14/207 files (page 4/35)
lines: off (toggle) GitHub
raw markdown copy
This is page 4 of 35. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?page={x} to view the full context.

# Directory Structure

```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│   ├── __init__.py
│   ├── advanced_agent_flows_using_unified_memory_system_demo.py
│   ├── advanced_extraction_demo.py
│   ├── advanced_unified_memory_system_demo.py
│   ├── advanced_vector_search_demo.py
│   ├── analytics_reporting_demo.py
│   ├── audio_transcription_demo.py
│   ├── basic_completion_demo.py
│   ├── cache_demo.py
│   ├── claude_integration_demo.py
│   ├── compare_synthesize_demo.py
│   ├── cost_optimization.py
│   ├── data
│   │   ├── sample_event.txt
│   │   ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│   │   └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│   ├── docstring_refiner_demo.py
│   ├── document_conversion_and_processing_demo.py
│   ├── entity_relation_graph_demo.py
│   ├── filesystem_operations_demo.py
│   ├── grok_integration_demo.py
│   ├── local_text_tools_demo.py
│   ├── marqo_fused_search_demo.py
│   ├── measure_model_speeds.py
│   ├── meta_api_demo.py
│   ├── multi_provider_demo.py
│   ├── ollama_integration_demo.py
│   ├── prompt_templates_demo.py
│   ├── python_sandbox_demo.py
│   ├── rag_example.py
│   ├── research_workflow_demo.py
│   ├── sample
│   │   ├── article.txt
│   │   ├── backprop_paper.pdf
│   │   ├── buffett.pdf
│   │   ├── contract_link.txt
│   │   ├── legal_contract.txt
│   │   ├── medical_case.txt
│   │   ├── northwind.db
│   │   ├── research_paper.txt
│   │   ├── sample_data.json
│   │   └── text_classification_samples
│   │       ├── email_classification.txt
│   │       ├── news_samples.txt
│   │       ├── product_reviews.txt
│   │       └── support_tickets.txt
│   ├── sample_docs
│   │   └── downloaded
│   │       └── attention_is_all_you_need.pdf
│   ├── sentiment_analysis_demo.py
│   ├── simple_completion_demo.py
│   ├── single_shot_synthesis_demo.py
│   ├── smart_browser_demo.py
│   ├── sql_database_demo.py
│   ├── sse_client_demo.py
│   ├── test_code_extraction.py
│   ├── test_content_detection.py
│   ├── test_ollama.py
│   ├── text_classification_demo.py
│   ├── text_redline_demo.py
│   ├── tool_composition_examples.py
│   ├── tournament_code_demo.py
│   ├── tournament_text_demo.py
│   ├── unified_memory_system_demo.py
│   ├── vector_search_demo.py
│   ├── web_automation_instruction_packs.py
│   └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│   └── smart_browser_internal
│       ├── locator_cache.db
│       ├── readability.js
│       └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── integration
│   │   ├── __init__.py
│   │   └── test_server.py
│   ├── manual
│   │   ├── test_extraction_advanced.py
│   │   └── test_extraction.py
│   └── unit
│       ├── __init__.py
│       ├── test_cache.py
│       ├── test_providers.py
│       └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│   ├── __init__.py
│   ├── __main__.py
│   ├── cli
│   │   ├── __init__.py
│   │   ├── __main__.py
│   │   ├── commands.py
│   │   ├── helpers.py
│   │   └── typer_cli.py
│   ├── clients
│   │   ├── __init__.py
│   │   ├── completion_client.py
│   │   └── rag_client.py
│   ├── config
│   │   └── examples
│   │       └── filesystem_config.yaml
│   ├── config.py
│   ├── constants.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── evaluation
│   │   │   ├── base.py
│   │   │   └── evaluators.py
│   │   ├── providers
│   │   │   ├── __init__.py
│   │   │   ├── anthropic.py
│   │   │   ├── base.py
│   │   │   ├── deepseek.py
│   │   │   ├── gemini.py
│   │   │   ├── grok.py
│   │   │   ├── ollama.py
│   │   │   ├── openai.py
│   │   │   └── openrouter.py
│   │   ├── server.py
│   │   ├── state_store.py
│   │   ├── tournaments
│   │   │   ├── manager.py
│   │   │   ├── tasks.py
│   │   │   └── utils.py
│   │   └── ums_api
│   │       ├── __init__.py
│   │       ├── ums_database.py
│   │       ├── ums_endpoints.py
│   │       ├── ums_models.py
│   │       └── ums_services.py
│   ├── exceptions.py
│   ├── graceful_shutdown.py
│   ├── services
│   │   ├── __init__.py
│   │   ├── analytics
│   │   │   ├── __init__.py
│   │   │   ├── metrics.py
│   │   │   └── reporting.py
│   │   ├── cache
│   │   │   ├── __init__.py
│   │   │   ├── cache_service.py
│   │   │   ├── persistence.py
│   │   │   ├── strategies.py
│   │   │   └── utils.py
│   │   ├── cache.py
│   │   ├── document.py
│   │   ├── knowledge_base
│   │   │   ├── __init__.py
│   │   │   ├── feedback.py
│   │   │   ├── manager.py
│   │   │   ├── rag_engine.py
│   │   │   ├── retriever.py
│   │   │   └── utils.py
│   │   ├── prompts
│   │   │   ├── __init__.py
│   │   │   ├── repository.py
│   │   │   └── templates.py
│   │   ├── prompts.py
│   │   └── vector
│   │       ├── __init__.py
│   │       ├── embeddings.py
│   │       └── vector_service.py
│   ├── tool_token_counter.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── audio_transcription.py
│   │   ├── base.py
│   │   ├── completion.py
│   │   ├── docstring_refiner.py
│   │   ├── document_conversion_and_processing.py
│   │   ├── enhanced-ums-lookbook.html
│   │   ├── entity_relation_graph.py
│   │   ├── excel_spreadsheet_automation.py
│   │   ├── extraction.py
│   │   ├── filesystem.py
│   │   ├── html_to_markdown.py
│   │   ├── local_text_tools.py
│   │   ├── marqo_fused_search.py
│   │   ├── meta_api_tool.py
│   │   ├── ocr_tools.py
│   │   ├── optimization.py
│   │   ├── provider.py
│   │   ├── pyodide_boot_template.html
│   │   ├── python_sandbox.py
│   │   ├── rag.py
│   │   ├── redline-compiled.css
│   │   ├── sentiment_analysis.py
│   │   ├── single_shot_synthesis.py
│   │   ├── smart_browser.py
│   │   ├── sql_databases.py
│   │   ├── text_classification.py
│   │   ├── text_redline_tools.py
│   │   ├── tournament.py
│   │   ├── ums_explorer.html
│   │   └── unified_memory_system.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── async_utils.py
│   │   ├── display.py
│   │   ├── logging
│   │   │   ├── __init__.py
│   │   │   ├── console.py
│   │   │   ├── emojis.py
│   │   │   ├── formatter.py
│   │   │   ├── logger.py
│   │   │   ├── panels.py
│   │   │   ├── progress.py
│   │   │   └── themes.py
│   │   ├── parse_yaml.py
│   │   ├── parsing.py
│   │   ├── security.py
│   │   └── text.py
│   └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/test_client.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Simple test client for Ultimate MCP Server
Tests basic functionality over streamable-HTTP transport
"""

import asyncio
import json

from fastmcp import Client


async def test_server_connection():
    """Smoke-test the server over the streamable-HTTP transport.

    Connects to the hard-coded local endpoint and, in order: lists tools,
    lists resources, calls the ``echo`` tool, calls ``get_provider_status``,
    reads the first listed resource (if any) and requests a completion from
    Ollama, retrying with Anthropic when the Ollama call raises.

    Each step reports its own failure and the run continues; an exception
    that escapes the individual steps (including a failed connection) is
    reported by the outer handler. Nothing is returned.
    """
    server_url = "http://127.0.0.1:8013/mcp"

    print(f"🔗 Connecting to Ultimate MCP Server at {server_url}")

    try:
        async with Client(server_url) as client:
            print("✅ Successfully connected to server")

            # Step 1: tool discovery (only the first 10 names are printed)
            print("\n📋 Listing available tools...")
            tools = await client.list_tools()
            print(f"Found {len(tools)} tools:")
            for position, tool in enumerate(tools[:10], start=1):
                print(f"  {position:2d}. {tool.name}")
            remaining = len(tools) - 10
            if remaining > 0:
                print(f"  ... and {remaining} more tools")

            # Step 2: resource discovery
            print("\n📚 Listing available resources...")
            resources = await client.list_resources()
            print(f"Found {len(resources)} resources:")
            for entry in resources:
                print(f"  - {entry.uri}")

            # Step 3: echo round-trip
            print("\n🔊 Testing echo tool...")
            try:
                echo_result = await client.call_tool("echo", {"message": "Hello from test client!"})
                if not echo_result:
                    print("❌ Echo tool returned no response")
                else:
                    print(f"✅ Echo response: {echo_result[0].text}")
            except Exception as e:
                print(f"❌ Echo tool failed: {e}")

            # Step 4: provider status summary, one line per provider
            print("\n🔌 Testing provider status...")
            try:
                provider_result = await client.call_tool("get_provider_status", {})
                if provider_result:
                    providers = json.loads(provider_result[0].text).get("providers", {})
                    print(f"✅ Found {len(providers)} providers:")
                    for name, status in providers.items():
                        marker = "✅" if status.get("available") else "❌"
                        line = f"{marker} {name}: {len(status.get('models', []))} models"
                        # Only the first applicable qualifier is appended.
                        if not status.get("enabled", False):
                            line += " (disabled)"
                        elif not status.get("api_key_configured", False):
                            line += " (no API key)"
                        elif status.get("error"):
                            line += f" (error: {status['error'][:50]}...)"

                        print(f"  {line}")
            except Exception as e:
                print(f"❌ Provider status failed: {e}")

            # Step 5: read the first resource and show a 200-character preview
            print("\n📖 Testing resource reading...")
            if resources:
                try:
                    resource_uri = resources[0].uri
                    resource_content = await client.read_resource(resource_uri)
                    if resource_content:
                        first_part = resource_content[0]
                        # Fall back to str() when the part has no 'text' attribute.
                        if hasattr(first_part, 'text'):
                            content = first_part.text
                        else:
                            content = str(first_part)
                        preview = content if len(content) <= 200 else content[:200] + "..."
                        print(f"✅ Resource {resource_uri} content preview:")
                        print(f"  {preview}")
                except Exception as e:
                    print(f"❌ Resource reading failed: {e}")

            # Step 6: completion via local Ollama, Anthropic as fallback on error
            print("\n🤖 Testing completion tool...")
            try:
                ollama_request = {
                    "prompt": "Say hello in a creative way",
                    "provider": "ollama",
                    "model": "mix_77/gemma3-qat-tools:27b",
                    "max_tokens": 100,
                }
                completion_result = await client.call_tool("generate_completion", ollama_request)
                if completion_result:
                    result_data = json.loads(completion_result[0].text)
                    response_text = result_data.get('text', '').strip()
                    if not response_text:
                        print("⚠️ Completion succeeded but returned empty text")
                        print(f"  Model: {result_data.get('model', 'unknown')}")
                        print(f"  Processing time: {result_data.get('processing_time', 0):.2f}s")
                        print(f"  Tokens: {result_data.get('tokens', {})}")
                    else:
                        print(f"✅ Completion response: {response_text}")
            except Exception as e:
                print(f"❌ Completion tool failed: {e}")
                # The fallback runs only when the first attempt raised.
                try:
                    anthropic_request = {
                        "prompt": "Say hello in a creative way",
                        "provider": "anthropic",
                        "model": "claude-3-haiku-20240307",
                        "max_tokens": 100,
                    }
                    completion_result = await client.call_tool(
                        "generate_completion", anthropic_request
                    )
                    if completion_result:
                        result_data = json.loads(completion_result[0].text)
                        response_text = result_data.get('text', '').strip()
                        if not response_text:
                            print("⚠️ Anthropic completion succeeded but returned empty text")
                        else:
                            print(f"✅ Completion response (anthropic): {response_text}")
                except Exception as e2:
                    print(f"❌ Completion with anthropic also failed: {e2}")

            print("\n🎉 Basic functionality test completed!")

    except Exception as e:
        print(f"❌ Failed to connect to server: {e}")
        print("Make sure the server is running at the correct address.")


async def test_specific_tools():
    """Exercise a handful of individual tools over streamable-HTTP.

    Opens a fresh client connection and tries, in order, the filesystem
    tools (``list_allowed_directories``, ``list_directory``), the
    ``execute_python`` tool and the ``run_ripgrep`` tool. Each group prints
    its own success or failure message; nothing is returned.
    """
    server_url = "http://127.0.0.1:8013/mcp"

    print("\n🔧 Testing specific tools...")

    try:
        async with Client(server_url) as client:
            # --- Filesystem ---
            print("\n📁 Testing filesystem tools...")
            try:
                dirs_result = await client.call_tool("list_allowed_directories", {})
                if dirs_result:
                    print(f"✅ Allowed directories: {dirs_result[0].text}")

                # List the server's view of "."
                ls_result = await client.call_tool("list_directory", {"path": "."})
                if ls_result:
                    entries = json.loads(ls_result[0].text).get("files", [])
                    print(f"✅ Current directory has {len(entries)} items")
            except Exception as e:
                print(f"❌ Filesystem tools failed: {e}")

            # --- Python sandbox ---
            print("\n🐍 Testing Python execution...")
            try:
                snippet = "print('Hello from Python!'); result = 2 + 2; print(f'2 + 2 = {result}')"
                python_result = await client.call_tool("execute_python", {"code": snippet})
                if python_result:
                    print("✅ Python execution result:")
                    result_data = json.loads(python_result[0].text)
                    # Captured output is read from the 'stdout' field.
                    print(f"  Output: {result_data.get('stdout', 'No output')}")
                    print(f"  Success: {result_data.get('success', False)}")
                    if result_data.get('result') is not None:
                        print(f"  Result: {result_data.get('result')}")
            except Exception as e:
                print(f"❌ Python execution failed: {e}")

            # --- Text processing ---
            print("\n📝 Testing text processing tools...")
            try:
                # The ripgrep tool takes its command line via 'args_str'.
                ripgrep_args = {
                    "args_str": "'FastMCP' . -t py",
                    "input_dir": True,  # searching a directory
                }
                ripgrep_result = await client.call_tool("run_ripgrep", ripgrep_args)
                if ripgrep_result:
                    result_data = json.loads(ripgrep_result[0].text)
                    if not result_data.get('success'):
                        print(f"❌ Ripgrep failed: {result_data.get('error', 'Unknown error')}")
                    else:
                        print("✅ Ripgrep executed successfully")
                        matches = result_data.get('stdout', '').strip()
                        if matches:
                            print(f"  Found matches: {len(matches.splitlines())} lines")
                        else:
                            print("  No matches found")
            except Exception as e:
                print(f"❌ Text processing tools failed: {e}")

    except Exception as e:
        print(f"❌ Failed during specific tool testing: {e}")


async def interactive_mode():
    """Run a small REPL for calling tools and reading resources by hand.

    Supported commands: ``list``, ``resources``, ``call <tool> <json_params>``,
    ``read <resource_uri>`` and ``quit``/``exit``/``q``. Anything else prints
    the help text. Ctrl-C or end-of-input leaves the loop.
    """
    server_url = "http://127.0.0.1:8013/mcp"

    print("\n🎮 Entering interactive mode...")
    print("Type 'list' to see available tools, 'quit' to exit")

    try:
        async with Client(server_url) as client:
            # Tool names are fetched once and reused for 'list' and validation.
            tool_names = [tool.name for tool in await client.list_tools()]

            while True:
                try:
                    command = input("\n> ").strip()
                    lowered = command.lower()

                    if lowered in ("quit", "exit", "q"):
                        break

                    if lowered == "list":
                        print("Available tools:")
                        for position, name in enumerate(tool_names[:20], start=1):
                            print(f"  {position:2d}. {name}")
                        if len(tool_names) > 20:
                            print(f"  ... and {len(tool_names) - 20} more")

                    elif lowered == "resources":
                        resources = await client.list_resources()
                        print("Available resources:")
                        for entry in resources:
                            print(f"  - {entry.uri}")

                    elif command.startswith("call "):
                        # Syntax: call tool_name {"param": "value"}
                        tool_name, separator, raw_params = command[5:].partition(" ")
                        params = {}
                        if separator:
                            try:
                                params = json.loads(raw_params)
                            except json.JSONDecodeError:
                                print("❌ Invalid JSON for parameters")
                                continue

                        if tool_name not in tool_names:
                            print(f"❌ Tool '{tool_name}' not found")
                        else:
                            try:
                                result = await client.call_tool(tool_name, params)
                                if result:
                                    print(f"✅ Result: {result[0].text}")
                                else:
                                    print("❌ No result returned")
                            except Exception as e:
                                print(f"❌ Tool call failed: {e}")

                    elif command.startswith("read "):
                        # Syntax: read resource_uri
                        resource_uri = command[5:].strip()
                        try:
                            result = await client.read_resource(resource_uri)
                            if not result:
                                print("❌ No content returned")
                            else:
                                first_part = result[0]
                                # Fall back to str() when there is no 'text' attribute.
                                if hasattr(first_part, 'text'):
                                    content = first_part.text
                                else:
                                    content = str(first_part)
                                preview = content if len(content) <= 500 else content[:500] + "..."
                                print(f"✅ Resource content: {preview}")
                        except Exception as e:
                            print(f"❌ Resource read failed: {e}")

                    else:
                        print("Commands:")
                        print("  list                     - List available tools")
                        print("  resources                - List available resources")
                        print("  call <tool> <json_params> - Call a tool")
                        print("  read <resource_uri>      - Read a resource")
                        print("  quit                     - Exit interactive mode")

                except (KeyboardInterrupt, EOFError):
                    break

    except Exception as e:
        print(f"❌ Interactive mode failed: {e}")


async def main():
    """Run the scripted tests, then optionally drop into interactive mode."""
    print("🚀 Ultimate MCP Server Test Client")
    print("=" * 50)

    # Scripted checks: basic connectivity first, then individual tools.
    await test_server_connection()
    await test_specific_tools()

    # Interactive mode is opt-in; Ctrl-C or EOF at the prompt just skips it.
    try:
        answer = input("\nWould you like to enter interactive mode? (y/n): ")
        if answer.strip().lower() in ("y", "yes"):
            await interactive_mode()
    except (KeyboardInterrupt, EOFError):
        pass

    print("\n👋 Test client finished!")


# Script entry point: run the async main() only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    asyncio.run(main())

```

--------------------------------------------------------------------------------
/test_stdio_client.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Stdio Test Client for Ultimate MCP Server
Tests server functionality over stdio transport
"""

import asyncio
import json
import os
import subprocess
from pathlib import Path

from fastmcp import Client


async def test_stdio_server():
    """Exercise the Ultimate MCP Server over the stdio transport.

    Spawns the server as a subprocess (``uv run umcp run`` when a ``uv.lock``
    file exists in the current directory, otherwise ``umcp run``), attaches a
    FastMCP client to its stdin/stdout pipes and runs a series of smoke tests:
    tool and resource discovery, echo, provider status, resource reading,
    completion, filesystem and ripgrep.

    Returns:
        bool: True when the run reached the end and the server process was
        shut down; False when the command could not be found or an error
        escaped the individual tests.
    """
    print("📡 Ultimate MCP Server Stdio Test Client")
    print("=" * 50)
    print("🔗 Starting Ultimate MCP Server in stdio mode...")

    # Prefer `uv run` when the project lock file is present.
    if os.path.exists("uv.lock"):
        umcp_cmd = ["uv", "run", "umcp", "run"]
    else:
        umcp_cmd = ["umcp", "run"]

    print(f"📡 Command: {' '.join(umcp_cmd)}")

    # Bound before the try block so the error handler can tell whether the
    # subprocess was ever started.
    process = None

    try:
        # Start the server process in stdio mode
        # Note: stdio is the default mode, so no -t flag needed
        process = subprocess.Popen(
            umcp_cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=0,  # Unbuffered
            cwd=Path.cwd(),
            env=os.environ.copy()
        )

        print("✅ Server process started, connecting via stdio...")

        # NOTE(review): confirm that the installed fastmcp version exposes
        # `Client.stdio(stdin, stdout)`; this call is kept as originally written.
        async with Client.stdio(
            process.stdin,
            process.stdout
        ) as client:
            print("✅ Successfully connected to stdio server")

            # Test 1: List available tools
            print("\n📋 Testing tool discovery via stdio...")
            tools = await client.list_tools()
            print(f"Found {len(tools)} tools via stdio transport:")
            for i, tool in enumerate(tools[:10]):  # Show first 10
                print(f"  {i+1:2d}. {tool.name}")
            if len(tools) > 10:
                print(f"  ... and {len(tools) - 10} more tools")

            # Test 2: List available resources
            print("\n📚 Testing resource discovery via stdio...")
            resources = await client.list_resources()
            print(f"Found {len(resources)} resources:")
            for resource in resources:
                print(f"  - {resource.uri}")

            # Test 3: Echo tool test (a failure here aborts the whole run)
            print("\n🔊 Testing echo tool via stdio...")
            echo_result = await client.call_tool("echo", {"message": "Hello from stdio client!"})
            if echo_result:
                echo_data = json.loads(echo_result[0].text)
                print(f"✅ Echo response: {json.dumps(echo_data, indent=2)}")

            # Test 4: Provider status test
            print("\n🔌 Testing provider status via stdio...")
            try:
                provider_result = await client.call_tool("get_provider_status", {})
                if provider_result:
                    provider_data = json.loads(provider_result[0].text)
                    providers = provider_data.get("providers", {})
                    print(f"✅ Found {len(providers)} providers via stdio:")
                    for name, status in providers.items():
                        available = "✅" if status.get("available") else "❌"
                        model_count = len(status.get("models", []))
                        print(f"  {available} {name}: {model_count} models")
            except Exception as e:
                print(f"❌ Provider status failed: {e}")

            # Test 5: Resource reading test
            print("\n📖 Testing resource reading via stdio...")
            if resources:
                try:
                    resource_uri = resources[0].uri
                    resource_content = await client.read_resource(resource_uri)
                    if resource_content:
                        content = resource_content[0].text
                        preview = content[:200] + "..." if len(content) > 200 else content
                        print(f"✅ Resource {resource_uri} content preview:")
                        print(f"  {preview}")
                except Exception as e:
                    print(f"❌ Resource reading failed: {e}")

            # Test 6: Simple completion test (if providers available)
            print("\n🤖 Testing completion via stdio...")
            try:
                completion_result = await client.call_tool(
                    "generate_completion",
                    {
                        "prompt": "Say hello in exactly 3 words",
                        "provider": "ollama",
                        "model": "mix_77/gemma3-qat-tools:27b",
                        "max_tokens": 10,
                    },
                )
                if completion_result:
                    result_data = json.loads(completion_result[0].text)
                    print("✅ Completion via stdio:")
                    print(f"  Text: '{result_data.get('text', 'No text')}'")
                    print(f"  Model: {result_data.get('model', 'Unknown')}")
                    print(f"  Success: {result_data.get('success', False)}")
                    print(f"  Processing time: {result_data.get('processing_time', 0):.2f}s")
            except Exception as e:
                print(f"⚠️ Completion test failed (expected if no providers): {e}")

            # Test 7: Filesystem tool test
            print("\n📁 Testing filesystem tools via stdio...")
            try:
                dirs_result = await client.call_tool("list_allowed_directories", {})
                if dirs_result:
                    dirs_data = json.loads(dirs_result[0].text)
                    print(f"✅ Allowed directories via stdio: {dirs_data.get('count', 0)} directories")
            except Exception as e:
                print(f"❌ Filesystem test failed: {e}")

            # Test 8: Text processing tool test
            print("\n📝 Testing text processing via stdio...")
            try:
                ripgrep_result = await client.call_tool(
                    "run_ripgrep",
                    {
                        "args_str": "'import' . -t py --max-count 3",
                        "input_dir": "."
                    }
                )
                if ripgrep_result:
                    ripgrep_data = json.loads(ripgrep_result[0].text)
                    if ripgrep_data.get("success"):
                        # The HTTP test client reads this tool's output from
                        # 'stdout'; accept that key first and keep 'output' as
                        # a fallback so either response shape is counted.
                        raw_output = ripgrep_data.get("stdout") or ripgrep_data.get("output", "")
                        line_count = sum(1 for line in raw_output.split('\n') if line.strip())
                        print(f"✅ Ripgrep via stdio found {line_count} matching lines")
                    else:
                        print("⚠️ Ripgrep completed but found no matches")
            except Exception as e:
                print(f"❌ Text processing test failed: {e}")

            print("\n🎉 Stdio transport functionality test completed!")

        # Clean up process
        print("\n🔄 Shutting down server process...")
        process.terminate()
        try:
            process.wait(timeout=5)
            print("✅ Server process terminated cleanly")
        except subprocess.TimeoutExpired:
            print("⚠️ Server process didn't terminate, forcing kill...")
            process.kill()
            process.wait()

        return True

    except FileNotFoundError:
        print("❌ Could not find umcp command")
        print("\nTroubleshooting:")
        print("1. Make sure you're in the Ultimate MCP Server directory")
        print("2. Make sure umcp is installed and in PATH")
        print("3. Try running 'uv run umcp run' manually to test")
        return False
    except Exception as e:
        print(f"❌ Stdio connection failed: {e}")
        print("\nTroubleshooting:")
        print("1. Make sure the server can start in stdio mode")
        print("2. Check for any startup errors in stderr")
        print("3. Verify all dependencies are installed")

        if process is not None:
            try:
                # Stop the server BEFORE draining stderr: a blocking read on
                # the pipe of a still-running process would wait for EOF and
                # could hang this client indefinitely.
                if process.poll() is None:
                    process.terminate()
                try:
                    _, stderr_output = process.communicate(timeout=5)
                except subprocess.TimeoutExpired:
                    # Escalate so the subprocess is not left running.
                    process.kill()
                    _, stderr_output = process.communicate(timeout=5)
                if stderr_output:
                    print(f"\nServer stderr:\n{stderr_output}")
            except Exception as cleanup_error:
                # Best-effort cleanup: report the problem, never mask the
                # original failure.
                print(f"⚠️ Could not stop server process cleanly: {cleanup_error}")

        return False


async def test_stdio_interactive():
    """Run a small interactive shell against a server spawned over stdio.

    Starts the ``umcp run`` server as a child process (through ``uv run`` when
    a ``uv.lock`` file exists in the current directory), connects a client to
    its stdin/stdout pipes, and then reads commands from the terminal:

    * ``list``                 - show up to 20 tool names
    * ``resources``            - show resource URIs
    * ``tool <name> <json>``   - call a tool with optional JSON parameters
    * ``read <uri>``           - read a resource and print a 500-char preview
    * ``quit`` / ``exit`` / ``q`` - leave the loop

    Any failure is reported on stdout rather than raised. The child process
    is always terminated before the function returns, including when the
    client connection or a command fails.
    """
    print("\n🎮 Entering stdio interactive mode...")
    print("⚠️ Note: Interactive mode with stdio requires careful process management")
    print("Type 'list' to see available tools, 'quit' to exit")
    
    # Find the umcp command
    if os.path.exists("uv.lock"):
        umcp_cmd = ["uv", "run", "umcp", "run"]
    else:
        umcp_cmd = ["umcp", "run"]
    
    # Tracked outside the try block so the finally clause can always clean up.
    process = None
    try:
        # Start the server process
        process = subprocess.Popen(
            umcp_cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=0,
            cwd=Path.cwd(),
            env=os.environ.copy()
        )
        
        async with Client.stdio(process.stdin, process.stdout) as client:
            tools = await client.list_tools()
            resources = await client.list_resources()
            
            while True:
                try:
                    command = input("\nStdio> ").strip()
                    
                    if command.lower() in ['quit', 'exit', 'q']:
                        print("👋 Goodbye!")
                        break
                    elif command.lower() == 'list':
                        print("Available tools:")
                        for i, tool in enumerate(tools[:20]):
                            print(f"  {i+1:2d}. {tool.name}")
                        if len(tools) > 20:
                            print(f"  ... and {len(tools) - 20} more")
                    elif command.lower() == 'resources':
                        print("Available resources:")
                        for resource in resources:
                            print(f"  - {resource.uri}")
                    elif command.startswith("tool "):
                        # Call tool: tool <tool_name> <json_params>
                        parts = command[5:].split(' ', 1)
                        tool_name = parts[0]
                        # Malformed JSON raises here and is reported by the
                        # generic "Command error" handler below.
                        params = json.loads(parts[1]) if len(parts) > 1 else {}
                        
                        try:
                            result = await client.call_tool(tool_name, params)
                            if result:
                                print(f"✅ Tool result: {result[0].text}")
                            else:
                                print("❌ No result returned")
                        except Exception as e:
                            print(f"❌ Tool call failed: {e}")
                    elif command.startswith("read "):
                        # Read resource: read <resource_uri>
                        resource_uri = command[5:].strip()
                        try:
                            result = await client.read_resource(resource_uri)
                            if result:
                                content = result[0].text
                                preview = content[:500] + "..." if len(content) > 500 else content
                                print(f"✅ Resource content: {preview}")
                            else:
                                print("❌ No content returned")
                        except Exception as e:
                            print(f"❌ Resource read failed: {e}")
                    else:
                        print("Commands:")
                        print("  list          - List available tools")
                        print("  resources     - List available resources")
                        print("  tool <name> <params>  - Call a tool with JSON params")
                        print("  read <uri>    - Read a resource")
                        print("  quit          - Exit interactive mode")
                
                except (KeyboardInterrupt, EOFError):
                    # EOFError means stdin was closed; without this branch the
                    # generic handler below would swallow it and the loop
                    # would spin forever re-raising on every input() call.
                    print("\n👋 Goodbye!")
                    break
                except Exception as e:
                    print(f"❌ Command error: {e}")
    
    except Exception as e:
        print(f"❌ Stdio interactive mode failed: {e}")
    finally:
        # Clean up on every exit path so a failed connection or command does
        # not leave an orphaned server process behind.
        if process is not None:
            process.terminate()
            try:
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()


def check_prerequisites():
    """Verify that the stdio test can run from the current directory.

    Two conditions are checked, in order: a ``pyproject.toml`` must exist in
    the working directory, and ``umcp --version`` (run through ``uv run`` when
    a ``uv.lock`` file is present) must exit with status 0 within 10 seconds.

    Returns:
        True when both checks pass, False otherwise. The reason for a failure
        is printed to stdout.
    """
    print("🔍 Checking prerequisites...")

    # Guard: the script is expected to be launched from the project root.
    project_marker = Path("pyproject.toml")
    if not project_marker.exists():
        print("❌ Not in Ultimate MCP Server directory (no pyproject.toml found)")
        return False

    # Probe the umcp executable, preferring the uv-managed environment.
    try:
        version_cmd = ["umcp", "--version"]
        if Path("uv.lock").exists():
            version_cmd = ["uv", "run", *version_cmd]

        probe = subprocess.run(
            version_cmd,
            capture_output=True,
            text=True,
            timeout=10,
        )

        if probe.returncode != 0:
            print(f"❌ umcp command failed: {probe.stderr}")
            return False

        print("✅ umcp command is available")
        return True
    except FileNotFoundError:
        print("❌ umcp command not found")
        print("Try: pip install -e . or uv sync")
        return False
    except subprocess.TimeoutExpired:
        print("❌ umcp command timed out")
        return False
    except Exception as e:
        print(f"❌ Error checking umcp: {e}")
        return False


async def main():
    """Drive the stdio test run: prerequisites, basic test, optional shell.

    Stops early (after printing a message) when the prerequisite check or the
    basic stdio test fails. Otherwise the user is asked whether to continue
    into the interactive mode.
    """
    # Step 1: environment sanity check.
    if not check_prerequisites():
        print("\n❌ Prerequisites not met. Please fix the issues above.")
        return

    print("✅ Prerequisites check passed\n")

    # Step 2: basic functionality test.
    basic_test_passed = await test_stdio_server()
    if not basic_test_passed:
        print("\n❌ Basic stdio test failed. Skipping interactive mode.")
        return

    # Step 3: optionally hand over to the interactive shell.
    try:
        answer = input("\nWould you like to enter stdio interactive mode? (y/n): ")
        answer = answer.strip().lower()
        if answer in ('y', 'yes'):
            await test_stdio_interactive()
    except KeyboardInterrupt:
        print("\n👋 Goodbye!")


# Script entry point: run the async main() coroutine on a fresh event loop
# only when this file is executed directly (not when it is imported).
if __name__ == "__main__":
    asyncio.run(main()) 
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/utils/logging/formatter.py:
--------------------------------------------------------------------------------

```python
"""
Log formatters for Gateway logging system.

This module provides formatters that convert log records into Rich renderables
with consistent styling and visual elements.
"""
import logging
import time
import traceback
from datetime import datetime
from typing import Any, Dict, Optional, Tuple

from rich.columns import Columns
from rich.console import Console, ConsoleRenderable, Group
from rich.logging import RichHandler
from rich.panel import Panel
from rich.style import Style
from rich.table import Table
from rich.text import Text
from rich.traceback import Traceback

from .console import get_rich_console  # Import the console factory

# Use relative imports for utils within the same package
from .emojis import LEVEL_EMOJIS, get_emoji
from .themes import get_component_style, get_level_style


class GatewayLogRecord:
    """Thin read-only view over a standard ``logging.LogRecord``.

    The wrapper adds no data of its own. Every property reads from the wrapped
    record, where the Gateway-specific attributes (``gateway_level``,
    ``component``, ``operation``, ``custom_emoji``, ``log_context``) are
    optional and fall back to a default when absent.
    """

    def __init__(self, record: logging.LogRecord):
        """Wrap *record* without copying it."""
        self.record = record

    @property
    def level(self) -> str:
        """Gateway level name (e.g. 'success'), else the lowercased levelname."""
        wrapped = self.record
        return getattr(wrapped, 'gateway_level', wrapped.levelname.lower())

    @property
    def message(self) -> str:
        """The record's message with its arguments merged in."""
        return self.record.getMessage()

    @property
    def component(self) -> Optional[str]:
        """Lowercased Gateway component, or None when unset/empty."""
        raw_component = getattr(self.record, 'component', None)
        if not raw_component:
            return None
        return raw_component.lower()

    @property
    def operation(self) -> Optional[str]:
        """Lowercased Gateway operation, or None when unset/empty."""
        raw_operation = getattr(self.record, 'operation', None)
        if not raw_operation:
            return None
        return raw_operation.lower()

    @property
    def custom_emoji(self) -> Optional[str]:
        """Emoji override attached to the record, if any."""
        return getattr(self.record, 'custom_emoji', None)

    @property
    def context(self) -> Optional[Dict[str, Any]]:
        """Extra context stored on the record as ``log_context``, if any."""
        return getattr(self.record, 'log_context', None)

    @property
    def timestamp(self) -> float:
        """Creation time of the record (``record.created``)."""
        return self.record.created

    @property
    def exception_info(self) -> Optional[Tuple]:
        """The record's ``exc_info`` value."""
        return self.record.exc_info

    @property
    def emoji(self) -> str:
        """Pick the emoji to display for this record.

        Priority: custom override, then a known operation emoji, then the
        level emoji, with "❓" as the last resort.
        """
        override = self.custom_emoji
        if override:
            return override

        # An operation emoji wins unless the lookup reports "unknown".
        current_operation = self.operation
        if current_operation:
            candidate = get_emoji("operation", current_operation)
            if candidate != "❓":
                return candidate

        # Otherwise fall back to the (Gateway) level emoji.
        return LEVEL_EMOJIS.get(self.level, "❓")

    @property
    def style(self) -> Style:
        """Style associated with this record's level."""
        return get_level_style(self.level)

    @property
    def component_style(self) -> Style:
        """Style for the record's component; the level style when there is none."""
        current_component = self.component
        if current_component:
            return get_component_style(current_component)
        return self.style

    @property
    def format_time(self) -> str:
        """Local wall-clock time of the record as ``HH:MM:SS.mmm``."""
        moment = datetime.fromtimestamp(self.timestamp)
        # Milliseconds are the truncated (not rounded) microsecond field.
        return f"{moment:%H:%M:%S}.{moment.microsecond // 1000:03d}"

    def has_exception(self) -> bool:
        """Return True when the record's ``exc_info`` is not None."""
        return self.exception_info is not None

class GatewayLogFormatter(logging.Formatter):
    """Common base for Gateway formatters that can produce Rich output.

    ``format`` keeps the standard string behaviour of ``logging.Formatter``;
    ``format_rich`` is the hook subclasses implement to build a Rich
    renderable instead.
    """

    def __init__(
        self,
        fmt: Optional[str] = None,
        datefmt: Optional[str] = None,
        style: str = '%',
        show_time: bool = True,
        show_level: bool = True,
        show_component: bool = True,
        show_path: bool = False,
        **kwargs
    ):
        """Store the display switches and initialise the base formatter.

        Args:
            fmt: Standard logging format string
            datefmt: Date format string
            style: Format style character ('%', '{' or '$')
            show_time: Include the timestamp in Rich output
            show_level: Include the log level in Rich output
            show_component: Include the component in Rich output
            show_path: Include path/lineno in Rich output
            **kwargs: Forwarded unchanged to ``logging.Formatter``
        """
        super().__init__(fmt=fmt, datefmt=datefmt, style=style, **kwargs)
        self.show_time, self.show_level = show_time, show_level
        self.show_component, self.show_path = show_component, show_path

    def format(self, record: logging.LogRecord) -> str:
        """Render *record* as plain text for non-Rich handlers.

        Before delegating to the base class, ``gateway_component`` and
        ``gateway_operation`` are set on the record (empty string when the
        source attribute is missing) so format strings can reference them.
        """
        for source_attr in ('component', 'operation'):
            setattr(record, f"gateway_{source_attr}", getattr(record, source_attr, ''))
        return super().format(record)

    def format_rich(self, record: logging.LogRecord) -> ConsoleRenderable:
        """Build a Rich renderable for *record*; subclasses must override.

        Args:
            record: The log record to format

        Returns:
            A Rich renderable object

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError("Subclasses must implement format_rich")

class SimpleLogFormatter(GatewayLogFormatter):
    """Formatter that renders each record as one line of Rich text."""

    def format_rich(self, record: logging.LogRecord) -> Text:
        """Assemble the single-line representation of *record*.

        The line is built from styled fragments in this order: timestamp,
        emoji, level, component, operation, message, source location.
        Optional fragments are controlled by the ``show_*`` flags; component
        and operation are also skipped when the record has none.

        Args:
            record: The log record to format

        Returns:
            Formatted Text object
        """
        view = GatewayLogRecord(record)

        # Collect (text, style) pairs first, then append them in order.
        fragments = []

        if self.show_time:
            fragments.append((f"[{view.format_time}] ", "timestamp"))

        fragments.append((f"{view.emoji} ", view.style))

        if self.show_level:
            fragments.append((f"[{view.level.upper()}] ", view.style))

        if self.show_component and view.component:
            fragments.append((f"[{view.component}] ", view.component_style))

        if view.operation:
            fragments.append((f"{view.operation}: ", "operation"))

        # The message itself carries no explicit style.
        fragments.append((view.message, None))

        if self.show_path:
            fragments.append((f" ({record.pathname}:{record.lineno})", "dim"))

        # Exception/traceback output is not added here; the handler's render
        # step is responsible for it.
        line = Text()
        for fragment_text, fragment_style in fragments:
            line.append(fragment_text, style=fragment_style)
        return line

class DetailedLogFormatter(GatewayLogFormatter):
    """Placeholder for a multi-line formatter with context data.

    Currently it produces exactly the same output as ``SimpleLogFormatter``.
    """

    def format_rich(self, record: logging.LogRecord) -> ConsoleRenderable:
        """Format *record* by delegating to a ``SimpleLogFormatter``.

        A fresh simple formatter is created on each call, mirroring this
        formatter's ``show_*`` flags.

        Args:
            record: The log record to format

        Returns:
            The renderable produced by ``SimpleLogFormatter.format_rich``
        """
        mirrored_flags = {
            flag: getattr(self, flag)
            for flag in ("show_time", "show_level", "show_component", "show_path")
        }
        return SimpleLogFormatter(**mirrored_flags).format_rich(record)

class RichLoggingHandler(RichHandler):
    """RichHandler variant whose output is built by a GatewayLogFormatter.

    ``emit`` is overridden so the message renderable comes from
    ``format_rich`` and is combined with an optional traceback in ``render``.
    """

    def __init__(
        self,
        level: int = logging.NOTSET,
        console: Optional[Console] = None,
        formatter: Optional[GatewayLogFormatter] = None,
        show_path: bool = False, # Control path display via handler
        **kwargs
    ):
        """Set up the handler's console and formatter.

        Args:
            level: Log level for this handler
            console: Rich console instance; the one returned by
                ``get_rich_console()`` is used when none is given
            formatter: Gateway formatter; a ``SimpleLogFormatter`` is created
                when none is given
            show_path: Passed to the default formatter to show path/lineno
            **kwargs: Additional args for RichHandler
        """
        super().__init__(
            level=level,
            console=console or get_rich_console(),
            **kwargs
        )

        # Assigned after the base initialiser has run.
        if not formatter:
            formatter = SimpleLogFormatter(show_path=show_path)
        self.formatter = formatter

    def emit(self, record: logging.LogRecord) -> None:
        """Format *record* and print it on the handler's console.

        Any exception raised while formatting or printing is routed to
        ``handleError`` instead of propagating.
        """
        try:
            body = self.format_rich(record)

            # Build a Rich traceback only when the record carries exc_info.
            rich_traceback = None
            if record.exc_info:
                traceback_options = {
                    "width": self.console.width if self.console else None,
                    "extra_lines": self.tracebacks_extra_lines,
                    "theme": self.tracebacks_theme,
                    "word_wrap": self.tracebacks_word_wrap,
                    "show_locals": self.tracebacks_show_locals,
                    "locals_max_length": self.locals_max_length,
                    "locals_max_string": self.locals_max_string,
                    "suppress": self.tracebacks_suppress,
                }
                rich_traceback = Traceback.from_exception(
                    *record.exc_info, **traceback_options
                )

            output = self.render(
                record=record,
                traceback=rich_traceback,
                message_renderable=body,
            )
            if self.console:
                self.console.print(output)
        except Exception:
            self.handleError(record)

    def format_rich(self, record: logging.LogRecord) -> ConsoleRenderable:
        """Turn *record* into a renderable using the assigned formatter.

        A ``GatewayLogFormatter`` produces its own Rich renderable. A plain
        ``logging.Formatter`` has its string output wrapped in ``Text``. With
        anything else (including None) the bare message is wrapped in ``Text``.
        """
        active_formatter = self.formatter
        if isinstance(active_formatter, GatewayLogFormatter):
            return active_formatter.format_rich(record)
        if isinstance(active_formatter, logging.Formatter):
            return Text(active_formatter.format(record))
        return Text(record.getMessage())

    def render(
        self,
        *, # Make args keyword-only
        record: logging.LogRecord,
        traceback: Optional[Traceback],
        message_renderable: ConsoleRenderable,
    ) -> ConsoleRenderable:
        """Combine the formatted message with an optional traceback.

        Args:
            record: logging Record.
            traceback: Traceback instance or None for no Traceback.
            message_renderable: Renderable representing log message.

        Returns:
            ``message_renderable`` unchanged when there is no traceback,
            otherwise a ``Group`` of the message followed by the traceback.
        """
        if not traceback:
            return message_renderable

        # Plain Text gets a trailing newline (unless already present) so the
        # traceback starts on its own line; other renderables are grouped
        # as they are.
        needs_newline = (
            isinstance(message_renderable, Text)
            and not str(message_renderable).endswith("\n")
        )
        if needs_newline:
            message_renderable = Text.assemble(message_renderable, "\n")
        return Group(message_renderable, traceback)

def create_rich_console_handler(**kwargs):
    """Factory function to create a RichLoggingHandler.
    Used in dictConfig.

    Args:
        **kwargs: Arguments passed from dictConfig. Recognised keys:

            * ``level``: level name (any case) or numeric level; defaults
              to ``'NOTSET'``.
            * ``show_path``: forwarded to the handler; defaults to False.
            * ``console`` and ``formatter``: accepted but discarded, so the
              handler falls back to its own default console and formatter.
            * The RichHandler display/traceback options listed in the body
              are forwarded; every other key is ignored.

    Returns:
        Instance of RichLoggingHandler.
    """
    # Ensure console is not passed directly if we want the shared one
    kwargs.pop('console', None)

    # A formatter entry is dropped rather than instantiated: building the
    # right GatewayLogFormatter from config would need extra logic, so the
    # handler keeps its default SimpleLogFormatter (controlled by show_path).
    kwargs.pop('formatter', None)

    # Resolve the level. Names are upper-cased and mapped through
    # logging.getLevelName; non-string values (e.g. logging.INFO) are passed
    # through as-is instead of failing on the missing ``.upper()`` method.
    raw_level = kwargs.pop('level', 'NOTSET')
    if isinstance(raw_level, str):
        level = logging.getLevelName(raw_level.upper())
    else:
        level = raw_level

    # Extract show_path flag
    show_path = kwargs.pop('show_path', False)

    # Forward only the RichHandler-specific options that were supplied.
    forwarded_keys = (
        'show_time', 'show_level', 'markup', 'rich_tracebacks',
        'tracebacks_width', 'tracebacks_extra_lines', 'tracebacks_theme',
        'tracebacks_word_wrap', 'tracebacks_show_locals',
        'locals_max_length', 'locals_max_string', 'tracebacks_suppress'
    )
    rich_handler_args = {
        key: value for key, value in kwargs.items() if key in forwarded_keys
    }

    # show_path is passed explicitly as it's specific to our handler/formatter
    return RichLoggingHandler(level=level, show_path=show_path, **rich_handler_args)
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/services/knowledge_base/feedback.py:
--------------------------------------------------------------------------------

```python
"""Feedback and adaptive learning service for RAG."""
import json
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Set

import numpy as np

from ultimate_mcp_server.services.vector import get_embedding_service
from ultimate_mcp_server.utils import get_logger

logger = get_logger(__name__)


class RAGFeedbackService:
    """Service for collecting and utilizing feedback for RAG."""
    
    def __init__(self, storage_dir: Optional[str] = None):
        """Set up storage, load persisted feedback and prepare weights.

        Args:
            storage_dir: Directory to store feedback data. When omitted (or
                empty) the relative path ``storage/rag_feedback`` is used.
        """
        default_dir = Path("storage") / "rag_feedback"
        self.storage_dir = Path(storage_dir) if storage_dir else default_dir

        # Make sure the directory exists before anything tries to read it.
        self.storage_dir.mkdir(parents=True, exist_ok=True)

        # In-memory stores, each keyed first by knowledge base name.
        self.document_feedback = {}  # kb -> document_id -> feedback
        self.query_feedback = {}     # kb -> query key -> feedback
        self.retrieval_stats = {}    # kb -> document_id -> usage stats

        # Populate the stores from any feedback files already on disk.
        self._load_feedback_data()

        # Embedding service used for similarity calculations.
        self.embedding_service = get_embedding_service()

        # Weights applied to a document's relevance adjustment per signal.
        self.feedback_weights = dict(
            thumbs_up=0.1,        # Positive explicit feedback
            thumbs_down=-0.15,    # Negative explicit feedback
            used_in_answer=0.05,  # Document was used in the answer
            not_used=-0.02,       # Document was retrieved but not used
            time_decay=0.001,     # Decay factor for time
        )

        logger.info("RAG feedback service initialized", extra={"emoji_key": "success"})
    
    def _get_feedback_file(self, kb_name: str) -> Path:
        """Get path to feedback file for a knowledge base.
        
        Args:
            kb_name: Knowledge base name
            
        Returns:
            Path to feedback file
        """
        return self.storage_dir / f"{kb_name}_feedback.json"
    
    def _load_feedback_data(self):
        """Load feedback data from storage.

        Every ``*_feedback.json`` file in ``self.storage_dir`` is read and its
        ``document_feedback``, ``query_feedback`` and ``retrieval_stats``
        sections (empty dict when missing) are stored under the knowledge
        base name derived from the file name.

        Errors are logged, never raised: a file that cannot be read or parsed
        is skipped so the remaining files are still loaded.
        """
        name_suffix = "_feedback"
        try:
            # Load all feedback files
            for file_path in self.storage_dir.glob("*_feedback.json"):
                try:
                    # Strip only the trailing "_feedback" that
                    # _get_feedback_file appends. Using str.replace here would
                    # also remove the text from inside the name itself (e.g.
                    # "user_feedback_kb" would be loaded as "user_kb").
                    kb_name = file_path.stem[:-len(name_suffix)]
                    
                    with open(file_path, "r") as f:
                        data = json.load(f)
                        
                    self.document_feedback[kb_name] = data.get("document_feedback", {})
                    self.query_feedback[kb_name] = data.get("query_feedback", {})
                    self.retrieval_stats[kb_name] = data.get("retrieval_stats", {})
                    
                    logger.debug(
                        f"Loaded feedback data for knowledge base '{kb_name}'",
                        extra={"emoji_key": "cache"}
                    )
                except Exception as e:
                    logger.error(
                        f"Error loading feedback data from {file_path}: {str(e)}",
                        extra={"emoji_key": "error"}
                    )
        except Exception as e:
            # Failure while listing the directory itself.
            logger.error(
                f"Error loading feedback data: {str(e)}",
                extra={"emoji_key": "error"}
            )
    
    def _save_feedback_data(self, kb_name: str):
        """Write one knowledge base's feedback to its JSON file.

        The file contains the three in-memory sections for *kb_name* (an
        empty dict for any that is missing) plus a ``last_updated`` timestamp.
        Failures are logged, not raised.

        Args:
            kb_name: Knowledge base name
        """
        try:
            target = self._get_feedback_file(kb_name)

            # Snapshot of everything stored for this knowledge base.
            sections = {
                "document_feedback": self.document_feedback,
                "query_feedback": self.query_feedback,
                "retrieval_stats": self.retrieval_stats,
            }
            payload = {
                section: store.get(kb_name, {})
                for section, store in sections.items()
            }
            payload["last_updated"] = time.time()

            with open(target, "w") as handle:
                json.dump(payload, handle, indent=2)

            logger.debug(
                f"Saved feedback data for knowledge base '{kb_name}'",
                extra={"emoji_key": "cache"}
            )
        except Exception as e:
            logger.error(
                f"Error saving feedback data for knowledge base '{kb_name}': {str(e)}",
                extra={"emoji_key": "error"}
            )
    
    async def record_retrieval_feedback(
        self,
        knowledge_base_name: str,
        query: str,
        retrieved_documents: List[Dict[str, Any]],
        used_document_ids: Optional[Set[str]] = None,
        explicit_feedback: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """Update feedback statistics after a retrieval and persist them.

        For every retrieved document the retrieval counter and last-used time
        are updated, and the relevance adjustment is moved by the configured
        weights: up when the document was used in the answer, down when it
        was not, plus any explicit thumbs up/down. The adjustment is clamped
        to the range [-0.5, 0.5]. Per-query statistics are tracked under the
        first 100 characters of the query text.

        Args:
            knowledge_base_name: Knowledge base name
            query: Query text
            retrieved_documents: Retrieved documents; each must have an "id" key
            used_document_ids: Set of document IDs that were used in the answer
            explicit_feedback: Optional explicit feedback (document_id -> feedback)

        Returns:
            Summary dict with status, knowledge base, query and counts
        """
        # Make sure all three per-knowledge-base stores exist.
        kb_documents = self.document_feedback.setdefault(knowledge_base_name, {})
        kb_queries = self.query_feedback.setdefault(knowledge_base_name, {})
        self.retrieval_stats.setdefault(knowledge_base_name, {})

        # Optional arguments default to empty containers.
        if used_document_ids is None:
            used_document_ids = set()
        if explicit_feedback is None:
            explicit_feedback = {}

        # Queries are keyed by their first 100 characters.
        query_key = query[:100]
        query_entry = kb_queries.setdefault(query_key, {
            "query": query,
            "count": 0,
            "last_used": time.time(),
            "retrieved_docs": []
        })
        query_entry["count"] += 1
        query_entry["last_used"] = time.time()
        seen_doc_ids = query_entry["retrieved_docs"]

        for document in retrieved_documents:
            doc_id = document["id"]

            # Fetch the document's record, creating a zeroed one on first sight.
            entry = kb_documents.setdefault(doc_id, {
                "relevance_adjustment": 0.0,
                "positive_feedback_count": 0,
                "negative_feedback_count": 0,
                "used_count": 0,
                "retrieved_count": 0,
                "last_used": time.time()
            })
            entry["retrieved_count"] += 1
            entry["last_used"] = time.time()

            # Implicit signal: was the document used in the answer?
            was_used = doc_id in used_document_ids
            if was_used:
                entry["used_count"] += 1
            usage_signal = "used_in_answer" if was_used else "not_used"
            entry["relevance_adjustment"] += self.feedback_weights[usage_signal]

            # Explicit signal: only thumbs_up / thumbs_down are recognised.
            if doc_id in explicit_feedback:
                verdict = explicit_feedback[doc_id]
                if verdict == "thumbs_up":
                    entry["positive_feedback_count"] += 1
                    entry["relevance_adjustment"] += self.feedback_weights["thumbs_up"]
                elif verdict == "thumbs_down":
                    entry["negative_feedback_count"] += 1
                    entry["relevance_adjustment"] += self.feedback_weights["thumbs_down"]

            # Clamp the accumulated adjustment.
            entry["relevance_adjustment"] = max(-0.5, min(0.5, entry["relevance_adjustment"]))

            # Remember which documents this query has retrieved (no duplicates).
            if doc_id not in seen_doc_ids:
                seen_doc_ids.append(doc_id)

        # Persist the updated state for this knowledge base.
        self._save_feedback_data(knowledge_base_name)

        total_documents = len(retrieved_documents)
        logger.info(
            f"Recorded feedback for {total_documents} documents in knowledge base '{knowledge_base_name}'",
            extra={"emoji_key": "success"}
        )

        return {
            "status": "success",
            "knowledge_base": knowledge_base_name,
            "query": query,
            "documents_count": total_documents,
            "used_documents_count": len(used_document_ids)
        }
    
    async def get_document_boost(
        self,
        knowledge_base_name: str,
        document_id: str
    ) -> float:
        """Return the time-decayed relevance adjustment recorded for a document.
        
        Args:
            knowledge_base_name: Name of the knowledge base the document belongs to
            document_id: Identifier of the document to look up
            
        Returns:
            The stored relevance adjustment, reduced according to how long ago the
            document was last used, or 0.0 when no feedback exists for it
        """
        feedback_by_kb = self.document_feedback
        
        # Guard clauses: no feedback recorded means no boost
        if knowledge_base_name not in feedback_by_kb:
            return 0.0
        
        kb_entries = feedback_by_kb[knowledge_base_name]
        if document_id not in kb_entries:
            return 0.0
        
        entry = kb_entries[document_id]
        
        # Fraction of the decay window elapsed since last use; the window is
        # 30 days and the fraction is capped at 1.0
        decay_window_seconds = 86400 * 30
        idle_seconds = time.time() - entry.get("last_used", 0)
        decay_fraction = min(1.0, idle_seconds / decay_window_seconds)
        
        # Scale the stored adjustment down by the weighted decay
        return entry["relevance_adjustment"] * (
            1.0 - decay_fraction * self.feedback_weights["time_decay"]
        )
    
    async def get_similar_queries(
        self,
        knowledge_base_name: str,
        query: str,
        top_k: int = 3,
        threshold: float = 0.8
    ) -> List[Dict[str, Any]]:
        """Look up previously recorded queries that resemble the given query.
        
        Each stored query is embedded and compared with the embedding of
        ``query`` using cosine similarity. Stored queries whose comparison
        raises are logged and skipped.
        
        Args:
            knowledge_base_name: Name of the knowledge base whose history is searched
            query: Text of the new query
            top_k: Maximum number of matches to return
            threshold: Minimum cosine similarity a stored query needs to be kept
            
        Returns:
            Up to ``top_k`` matches, most similar first; each match carries the
            stored query text, its similarity, use count, last-used value and
            retrieved document IDs
        """
        known = self.query_feedback
        history = known[knowledge_base_name] if knowledge_base_name in known else None
        
        # Nothing recorded for this knowledge base
        if not history:
            return []
        
        target_vec = await self.embedding_service.get_embedding(query)
        
        matches = []
        for record in history.values():
            try:
                past_query = record["query"]
                past_vec = await self.embedding_service.get_embedding(past_query)
                
                # Cosine similarity = dot product over the product of the norms
                dot_product = np.dot(target_vec, past_vec)
                norm_product = np.linalg.norm(target_vec) * np.linalg.norm(past_vec)
                score = dot_product / norm_product
                
                if score >= threshold:
                    matches.append({
                        "query": past_query,
                        "similarity": float(score),
                        "count": record["count"],
                        "last_used": record["last_used"],
                        "retrieved_docs": record["retrieved_docs"]
                    })
            except Exception as e:
                logger.error(
                    f"Error calculating similarity for query: {str(e)}",
                    extra={"emoji_key": "error"}
                )
        
        # Best matches first, truncated to the requested amount
        ranked = sorted(matches, key=lambda m: m["similarity"], reverse=True)
        return ranked[:top_k]
    
    async def apply_feedback_adjustments(
        self,
        knowledge_base_name: str,
        results: List[Dict[str, Any]],
        query: str
    ) -> List[Dict[str, Any]]:
        """Re-score retrieval results using recorded feedback.
        
        Every result receives its per-document feedback boost plus a fixed 0.05
        bonus when the document was retrieved for a similar earlier query. The
        boosted score is capped at 1.0.
        
        Args:
            knowledge_base_name: Name of the knowledge base the results came from
            results: Retrieval results; each must provide "id" and "score"
            query: Text of the query that produced the results
            
        Returns:
            The input list unchanged when the knowledge base has no document
            feedback; otherwise copies of the results annotated with
            "original_score", "feedback_boost" and "similar_query_boost",
            sorted by the adjusted "score" in descending order
        """
        # Without any document feedback there is nothing to adjust
        if knowledge_base_name not in self.document_feedback:
            return results
        
        neighbours = await self.get_similar_queries(
            knowledge_base_name=knowledge_base_name,
            query=query,
            top_k=3,
            threshold=0.8
        )
        
        # Every document that was retrieved for one of the similar queries
        boosted_ids = {
            doc_id
            for neighbour in neighbours
            for doc_id in neighbour["retrieved_docs"]
        }
        
        rescored = []
        for hit in results:
            hit_id = hit["id"]
            base_score = hit["score"]
            
            feedback_boost = await self.get_document_boost(knowledge_base_name, hit_id)
            neighbour_boost = 0.05 if hit_id in boosted_ids else 0.0
            final_score = min(1.0, base_score + feedback_boost + neighbour_boost)
            
            # Annotate a copy so the caller's result objects stay untouched
            enriched = hit.copy()
            enriched.update(
                original_score=base_score,
                feedback_boost=feedback_boost,
                similar_query_boost=neighbour_boost,
                score=final_score,
            )
            rescored.append(enriched)
        
        return sorted(rescored, key=lambda item: item["score"], reverse=True)


# Module-wide service instance; stays None until it is first requested
_rag_feedback_service = None


def get_rag_feedback_service() -> RAGFeedbackService:
    """Return the shared RAG feedback service, building it on first use.
    
    Returns:
        RAGFeedbackService: The module-wide RAG feedback service instance
    """
    global _rag_feedback_service
    
    instance = _rag_feedback_service
    if instance is None:
        # First request: construct the service and remember it for later calls
        instance = _rag_feedback_service = RAGFeedbackService()
        
    return instance
```

--------------------------------------------------------------------------------
/error_handling.py:
--------------------------------------------------------------------------------

```python
"""
Comprehensive error handling framework for Model Context Protocol (MCP) systems.

This module implements a consistent, standardized approach to error handling for MCP tools
and services. It provides decorators, formatters, and utilities that transform Python
exceptions into structured, protocol-compliant error responses that LLMs and client
applications can reliably interpret and respond to.

The framework is designed around several key principles:

1. CONSISTENCY: All errors follow the same structured format regardless of their source
2. RECOVERABILITY: Errors include explicit information on whether operations can be retried
3. ACTIONABILITY: Error responses provide specific suggestions for resolving issues
4. DEBUGGABILITY: Rich error details are preserved for troubleshooting
5. CATEGORIZATION: Errors are mapped to standardized types for consistent handling

Key components:
- ErrorType enum: Categorization system for different error conditions
- format_error_response(): Creates standardized error response dictionaries
- with_error_handling: Decorator that catches exceptions and formats responses
- validate_inputs: Decorator for declarative parameter validation
- Validator functions: Reusable validation logic for common parameter types

Usage example:
    ```python
    @with_error_handling
    @validate_inputs(
        prompt=non_empty_string,
        temperature=in_range(0.0, 1.0)
    )
    async def generate_text(prompt, temperature=0.7):
        # Implementation...
        # Any exceptions thrown here will be caught and formatted
        # Input validation happens before execution
        if external_service_down:
            raise Exception("External service unavailable")
        return result
    ```

The error handling pattern is designed to work seamlessly with async functions and
integrates with the MCP protocol's expected error response structure.
"""
import functools
import inspect
import time
import traceback
from enum import Enum
from typing import Any, Callable, Dict, Optional, Union


class ErrorType(str, Enum):
    """Types of errors that can occur in MCP tools.
    
    Members also inherit from ``str``, so each member compares equal to its
    plain string value (e.g. ``ErrorType.TIMEOUT_ERROR == "timeout_error"``).
    """
    
    VALIDATION_ERROR = "validation_error"  # Input validation failed
    EXECUTION_ERROR = "execution_error"    # Error during execution
    PERMISSION_ERROR = "permission_error"  # Insufficient permissions
    NOT_FOUND_ERROR = "not_found_error"    # Resource not found
    TIMEOUT_ERROR = "timeout_error"        # Operation timed out
    RATE_LIMIT_ERROR = "rate_limit_error"  # Rate limit exceeded
    EXTERNAL_ERROR = "external_error"      # Error in external service
    UNKNOWN_ERROR = "unknown_error"        # Unknown error


def format_error_response(
    error_type: Union[ErrorType, str],
    message: str,
    details: Optional[Dict[str, Any]] = None,
    retriable: bool = False,
    suggestions: Optional[list] = None
) -> Dict[str, Any]:
    """
    Format a standardized error response.
    
    Args:
        error_type: Type of error, as an ErrorType member or a plain string
        message: Human-readable error message
        details: Additional error details (an empty dict is used when omitted)
        retriable: Whether the operation can be retried
        suggestions: List of suggestions for resolving the error (an empty
            list is used when omitted)
        
    Returns:
        Dictionary with ``success`` (False), ``isError`` (True) and an ``error``
        sub-dictionary holding the type as a plain string, the message, details,
        retriable flag, suggestions and a ``time.time()`` timestamp
    """
    # ErrorType mixes in str, so an isinstance(..., str) test is true for enum
    # members as well. Check for Enum first so the response always carries the
    # plain string value rather than the enum member itself.
    type_name = error_type.value if isinstance(error_type, Enum) else error_type
    
    return {
        "success": False,
        "isError": True,  # MCP protocol flag
        "error": {
            "type": type_name,
            "message": message,
            "details": details or {},
            "retriable": retriable,
            "suggestions": suggestions or [],
            "timestamp": time.time()
        }
    }


def with_error_handling(func: Callable) -> Callable:
    """
    Decorator that provides standardized exception handling for MCP tool functions.
    
    This decorator intercepts any exceptions raised by the wrapped function and transforms
    them into a structured error response format that follows the MCP protocol. The response
    includes consistent error categorization, helpful suggestions for recovery, and details
    to aid debugging.
    
    Key features:
    - Automatically categorizes exceptions into appropriate ErrorType values
    - Preserves the original exception message and stack trace
    - Adds relevant suggestions based on the error type
    - Indicates whether the operation can be retried
    - Adds a timestamp for error logging/tracking
    
    The error response structure always includes:
    - success: False
    - isError: True (MCP protocol flag)
    - error: A dictionary with type, message, details, retriable flag, and suggestions
    
    Exception mapping (matched on the exact exception class name, checked in this order):
    - ValueError, TypeError, KeyError, AttributeError → VALIDATION_ERROR (retriable)
    - FileNotFoundError, IndexError → NOT_FOUND_ERROR (not retriable)
    - PermissionError, AccessError → PERMISSION_ERROR (not retriable)
    - TimeoutError → TIMEOUT_ERROR (retriable)
    - Exceptions with "rate limit" in message → RATE_LIMIT_ERROR (retriable)
    - All other exceptions → UNKNOWN_ERROR (not retriable)
    
    Suggestions are provided for VALIDATION_ERROR, NOT_FOUND_ERROR and
    RATE_LIMIT_ERROR; other error types get an empty suggestion list.
    
    Args:
        func: The async function to wrap with error handling
        
    Returns:
        Decorated async function that catches exceptions and returns structured error responses
    
    Example:
        ```python
        @with_error_handling
        async def my_tool_function(param1, param2):
            # Function implementation that might raise exceptions
            # If an exception occurs, it will be transformed into a structured response
        ```
    """
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            # Call the original function
            return await func(*args, **kwargs)
        except Exception as e:
            # Get exception details
            exc_type = type(e).__name__
            exc_message = str(e)
            exc_traceback = traceback.format_exc()
            
            # Determine error type
            error_type = ErrorType.UNKNOWN_ERROR
            retriable = False
            
            # Map common exceptions to error types. KeyError is classified as a
            # validation error; it is deliberately absent from the not-found
            # list because that entry could never be reached.
            if exc_type in ("ValueError", "TypeError", "KeyError", "AttributeError"):
                error_type = ErrorType.VALIDATION_ERROR
                retriable = True
            elif exc_type in ("FileNotFoundError", "IndexError"):
                error_type = ErrorType.NOT_FOUND_ERROR
                retriable = False
            elif exc_type in ("PermissionError", "AccessError"):
                error_type = ErrorType.PERMISSION_ERROR
                retriable = False
            elif exc_type == "TimeoutError":
                # Exact comparison: `in ("TimeoutError")` would be a substring
                # test against a plain string and match names such as "Error".
                error_type = ErrorType.TIMEOUT_ERROR
                retriable = True
            elif "rate limit" in exc_message.lower():
                error_type = ErrorType.RATE_LIMIT_ERROR
                retriable = True
            
            # Generate suggestions based on error type
            suggestions = []
            if error_type == ErrorType.VALIDATION_ERROR:
                suggestions = [
                    "Check that all required parameters are provided",
                    "Verify parameter types and formats",
                    "Ensure parameter values are within allowed ranges"
                ]
            elif error_type == ErrorType.NOT_FOUND_ERROR:
                suggestions = [
                    "Verify the resource ID or path exists",
                    "Check for typos in identifiers",
                    "Ensure the resource hasn't been deleted"
                ]
            elif error_type == ErrorType.RATE_LIMIT_ERROR:
                suggestions = [
                    "Wait before retrying the request",
                    "Reduce the frequency of requests",
                    "Implement backoff strategy for retries"
                ]
            
            # Format and return error response
            return format_error_response(
                error_type=error_type,
                message=exc_message,
                details={
                    "exception_type": exc_type,
                    "traceback": exc_traceback
                },
                retriable=retriable,
                suggestions=suggestions
            )
    
    return wrapper


def validate_inputs(**validators):
    """
    Decorator for validating tool input parameters against custom validation rules.
    
    This decorator enables declarative input validation for async tool functions by applying
    validator functions to specified parameters before the decorated function is called.
    If any validation fails, the function returns a standardized error response instead
    of executing, preventing errors from propagating and providing clear feedback on the issue.
    
    The validation approach supports:
    - Applying different validation rules to different parameters
    - Detailed error messages explaining which parameter failed and why
    - Custom validation logic via any callable that raises an exception on failure
    - No validator calls for parameters not explicitly validated
    
    Validator functions should:
    1. Take a single parameter (the value to validate)
    2. Raise a ValueError with a descriptive message if validation fails
    3. Return None or any value (which is ignored) if validation passes
    4. Include a docstring that describes the constraint (used in error messages)
    
    Args:
        **validators: A mapping of parameter names to validator functions.
            Each key should match a parameter name in the decorated function.
            Each value should be a callable that validates the corresponding parameter.
            
    Returns:
        Decorator function that wraps an async function with input validation
        
    Example:
        ```python
        # Define validators (or use the provided ones like non_empty_string)
        def validate_temperature(value):
            '''Temperature must be between 0.0 and 1.0.'''
            if not isinstance(value, float) or value < 0.0 or value > 1.0:
                raise ValueError("Temperature must be between 0.0 and 1.0")
        
        # Apply validation to specific parameters
        @validate_inputs(
            prompt=non_empty_string,
            temperature=validate_temperature,
            max_tokens=positive_number
        )
        async def generate_text(prompt, temperature=0.7, max_tokens=256):
            # This function will only be called if all validations pass
            # Otherwise a standardized error response is returned
            ...
            
        # The response structure when validation fails:
        # {
        #   "success": False,
        #   "isError": True,
        #   "error": {
        #     "type": "validation_error",
        #     "message": "Invalid value for parameter 'prompt': Value must be a non-empty string",
        #     "details": { ... },
        #     "retriable": true,
        #     "suggestions": [ ... ]
        #   }
        # }
        ```
    
    Note:
        Default values are validated too: arguments the caller omits are filled
        in from the function signature before the validators run, so the default
        of a validated parameter must itself satisfy its validator.
        
        Arguments that do not match the function signature make the binding step
        raise TypeError; that exception is not converted into an error response
        by this decorator.
        
        This decorator should typically be applied first, i.e. placed closest to
        the function and beneath other decorators such as with_error_handling.
    """
    def decorator(func):
        # The signature is fixed once the function is decorated, so introspect
        # it a single time here rather than on every call.
        sig = inspect.signature(func)
        
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Build mapping of parameter names to values (defaults included)
            bound_args = sig.bind(*args, **kwargs)
            bound_args.apply_defaults()
            arguments = bound_args.arguments
            
            # Validate inputs; the first failing validator ends the call
            for param_name, validator in validators.items():
                if param_name not in arguments:
                    # Validator refers to a parameter the function does not have
                    continue
                
                value = arguments[param_name]
                try:
                    # Run validation
                    validator(value)
                except Exception as e:
                    # Return validation error
                    return format_error_response(
                        error_type=ErrorType.VALIDATION_ERROR,
                        message=f"Invalid value for parameter '{param_name}': {str(e)}",
                        details={
                            "parameter": param_name,
                            "value": str(value),
                            "constraint": str(validator.__doc__ or "")
                        },
                        retriable=True,
                        suggestions=[
                            f"Provide a valid value for '{param_name}'",
                            "Check the parameter constraints in the tool description"
                        ]
                    )
            
            # Call the original function if validation passes
            return await func(*args, **kwargs)
        
        return wrapper
    
    return decorator


# Example validators
def non_empty_string(value):
    """
    Check that a value is a string with at least one non-whitespace character.
    
    Anything that is not a ``str``, the empty string, and strings made up
    solely of whitespace are all rejected. Use it for required text inputs
    where blank values are not acceptable.
    
    Args:
        value: The value to validate
        
    Raises:
        ValueError: If the value is not a string or is empty/whitespace-only
    """
    # Accept early; everything that falls through is invalid
    if isinstance(value, str) and value.strip():
        return
    
    raise ValueError("Value must be a non-empty string")

def positive_number(value):
    """
    Validates that a value is a positive number (greater than zero).
    
    This validator ensures that the input is either an integer or float
    and has a value greater than zero. Zero, negative values and NaN are
    rejected. This is useful for validating inputs like quantities, counts,
    or rates that must be positive.
    
    Note that ``bool`` is a subclass of ``int``, so ``True`` is accepted and
    ``False`` is rejected.
    
    Args:
        value: The value to validate
        
    Raises:
        ValueError: If the value is not a number or is not positive
    """
    # `not value > 0` (rather than `value <= 0`) also rejects NaN, for which
    # every ordering comparison is False.
    if not isinstance(value, (int, float)) or not value > 0:
        raise ValueError("Value must be a positive number")

def in_range(min_val, max_val):
    """
    Creates a validator function for checking if a number falls within a specified range.
    
    This is a validator factory that returns a custom validator function
    configured with the given minimum and maximum bounds. The returned function
    checks that a value is a number and falls within the inclusive range
    [min_val, max_val]; NaN is rejected. This is useful for validating inputs
    that must fall within specific limits, such as probabilities, temperatures,
    or indexes.
    
    The returned validator's docstring states the concrete bounds, so code that
    reports ``validator.__doc__`` as the constraint shows the actual range.
    
    Args:
        min_val: The minimum allowed value (inclusive)
        max_val: The maximum allowed value (inclusive)
        
    Returns:
        A validator function that checks if values are within the specified range
        
    Example:
        ```python
        # Create a validator for temperature (0.0 to 1.0)
        validate_temperature = in_range(0.0, 1.0)
        
        # Use in validation decorator
        @validate_inputs(temperature=validate_temperature)
        async def generate_text(prompt, temperature=0.7):
            # Function body
            ...
        ```
    """
    def validator(value):
        # The chained comparison is False for NaN, so NaN is rejected instead
        # of slipping past separate `<` / `>` checks.
        if not isinstance(value, (int, float)) or not (min_val <= value <= max_val):
            raise ValueError(f"Value must be between {min_val} and {max_val}")
    
    # A literal docstring cannot interpolate the bounds, so set it explicitly.
    validator.__doc__ = f"Value must be between {min_val} and {max_val}."
    return validator
```

--------------------------------------------------------------------------------
/examples/vector_search_demo.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
"""Vector database and semantic search demonstration for Ultimate MCP Server."""
import asyncio
import sys
import time
from pathlib import Path

# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))

from rich import box
from rich.markup import escape
from rich.panel import Panel
from rich.rule import Rule
from rich.table import Table

from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.providers.base import get_provider
from ultimate_mcp_server.services.vector import get_embedding_service, get_vector_db_service
from ultimate_mcp_server.utils import get_logger
from ultimate_mcp_server.utils.display import CostTracker

# --- Add Rich Imports ---
from ultimate_mcp_server.utils.logging.console import console

# ----------------------

# Initialize logger
logger = get_logger("example.vector_search")


def _build_results_table(title, results):
    """Build the Rich table that displays one list of search results.

    Args:
        title: Title shown above the table
        results: Search results; each is read with ``.get`` for the
            "metadata", "text" and "similarity" keys

    Returns:
        A Table with one row per result, or a single placeholder row when
        ``results`` is empty
    """
    table = Table(title=title, box=box.ROUNDED)
    table.add_column("#", style="dim", justify="right")
    table.add_column("Score", style="green", justify="right")
    table.add_column("Domain", style="cyan")
    table.add_column("Text Snippet", style="white")

    if not results:
        table.add_row("-", "-", "-", "[dim]No results found.[/dim]")
        return table

    for rank, res in enumerate(results, start=1):
        metadata = res.get("metadata", {})
        text = res.get("text", "")
        # Show at most 120 characters, marking truncation with an ellipsis
        text_snippet = escape(text[:120] + ("..." if len(text) > 120 else ""))
        table.add_row(
            str(rank),
            f"{res.get('similarity', 0.0):.4f}",
            escape(metadata.get("domain", "N/A")),
            text_snippet
        )
    return table


async def demonstrate_vector_operations():
    """Demonstrate basic vector database operations using Rich.

    Creates a demo collection, adds sample documents, runs a plain and a
    filtered semantic search, generates one embedding directly, and finally
    deletes the collection again.

    Returns:
        True when every step completed, False when the embedding service is
        unavailable or any step raised an exception
    """
    console.print(Rule("[bold blue]Vector Database Operations Demo[/bold blue]"))
    logger.info("Starting vector database demonstration", emoji_key="start")
    
    embedding_service = get_embedding_service()
    vector_db = get_vector_db_service()
    
    if not embedding_service or not hasattr(embedding_service, 'client'):
        logger.critical("Failed to initialize embedding service. Is OPENAI_API_KEY configured correctly?", emoji_key="critical")
        console.print("[bold red]Error:[/bold red] Embedding service (likely OpenAI) failed to initialize. Check API key.")
        return False

    console.print(f"[dim]Vector DB Storage Path: {vector_db.base_dir}[/dim]")
    
    collection_name = "semantic_search_demo_rich"
    embedding_dimension = 1536 # Default for text-embedding-ada-002 / 3-small

    # --- Setup Collection --- 
    try:
        logger.info(f"Creating/resetting collection: {collection_name}", emoji_key="db")
        await vector_db.create_collection(
            name=collection_name,
            dimension=embedding_dimension, 
            overwrite=True,
            metadata={"description": "Rich Demo collection"}
        )

        documents = [
            "Machine learning is a field of study in artificial intelligence concerned with the development of algorithms that can learn from data.",
            "Natural language processing (NLP) is a subfield of linguistics and AI focused on interactions between computers and human language.",
            "Neural networks are computing systems inspired by the biological neural networks that constitute animal brains.",
            "Deep learning is part of a broader family of machine learning methods based on artificial neural networks.",
            "Transformer models have revolutionized natural language processing with their self-attention mechanism.",
            "Vector databases store and retrieve high-dimensional vectors for tasks like semantic search and recommendation systems.",
            "Embeddings are numerical representations that capture semantic meanings and relationships between objects.",
            "Clustering algorithms group data points into clusters based on similarity metrics.",
            "Reinforcement learning is about how software agents should take actions to maximize cumulative reward.",
            "Knowledge graphs represent knowledge in graph form with entities as nodes and relationships as edges."
        ]
        document_ids = [f"doc_{i}" for i in range(len(documents))]
        document_metadata = [
            {"domain": "machine_learning", "type": "concept", "id": doc_id}
            for doc_id in document_ids
        ]
        
        logger.info(f"Adding {len(documents)} documents...", emoji_key="processing")
        add_start_time = time.time()
        ids = await vector_db.add_texts(
            collection_name=collection_name,
            texts=documents,
            metadatas=document_metadata,
            ids=document_ids
        )
        add_time = time.time() - add_start_time
        logger.success(f"Added {len(ids)} documents in {add_time:.2f}s", emoji_key="success")

        # --- Basic Search --- 
        console.print(Rule("[green]Semantic Search[/green]"))
        query = "How do neural networks work?"
        logger.info(f"Searching for: '{escape(query)}'...", emoji_key="search")
        search_start_time = time.time()
        results = await vector_db.search_by_text(
            collection_name=collection_name,
            query_text=query,
            top_k=3,
            include_vectors=False
        )
        search_time = time.time() - search_start_time
        logger.success(f"Search completed in {search_time:.3f}s", emoji_key="success")
        
        results_table = _build_results_table(
            f'Search Results for: "{escape(query)}"',
            results
        )
        console.print(results_table)
        console.print()

        # --- Filtered Search --- 
        console.print(Rule("[green]Filtered Semantic Search[/green]"))
        filter_query = "embeddings"
        domain_filter = {"domain": "machine_learning"} # Example filter
        logger.info(f"Searching for '{escape(filter_query)}' with filter {escape(str(domain_filter))}...", emoji_key="filter")
        
        f_search_start_time = time.time()
        filtered_results = await vector_db.search_by_text(
            collection_name=collection_name,
            query_text=filter_query,
            top_k=3,
            filter=domain_filter
        )
        f_search_time = time.time() - f_search_start_time
        logger.success(f"Filtered search completed in {f_search_time:.3f}s", emoji_key="success")
        
        f_results_table = _build_results_table(
            f'Filtered Results (domain=machine_learning) for: "{escape(filter_query)}"',
            filtered_results
        )
        console.print(f_results_table)
        console.print()

        # --- Direct Embedding --- 
        console.print(Rule("[green]Direct Embedding Generation[/green]"))
        logger.info("Demonstrating direct embedding generation", emoji_key="vector")
        sample_text = "Semantic search helps find conceptually similar content."
        console.print(f"[cyan]Input Text:[/cyan] {escape(sample_text)}")
        
        emb_start_time = time.time()
        embeddings_list = await embedding_service.create_embeddings([sample_text])
        embedding = embeddings_list[0]
        emb_time = time.time() - emb_start_time
        logger.success(f"Generated embedding in {emb_time:.3f}s", emoji_key="success")
        
        # Use embedding display utility
        from ultimate_mcp_server.utils.display import _display_embeddings_info
        _display_embeddings_info([embedding], "text-embedding-3-small", console)
        
        # Also show sample values in a simple format for demo clarity
        console.print(f"[cyan]Sample Values (first 5):[/cyan] [dim]{escape(str(embedding[:5]))}...[/dim]")
        console.print()
        
        return True
    
    except Exception as e:
        logger.error(f"Error in vector operations: {e}", emoji_key="error", exc_info=True)
        console.print(f"[bold red]Error:[/bold red] {escape(str(e))}")
        return False
    finally:
        # Clean up collection; runs on success and on failure alike, and a
        # failed deletion is only logged
        try:
            logger.info(f"Deleting collection: {collection_name}", emoji_key="db")
            await vector_db.delete_collection(collection_name)
        except Exception as del_e:
            logger.warning(f"Could not delete collection {collection_name}: {del_e}", emoji_key="warning")


async def demonstrate_llm_with_vector_retrieval(tracker: CostTracker):
    """Run a small end-to-end RAG round trip and render each stage with Rich.

    A throwaway collection is filled with four short documents, the question is
    used to retrieve the three closest snippets, and those snippets are given to
    an OpenAI model as the only permitted context for its answer.

    Args:
        tracker: Cost tracker that receives the generation result via ``add_call``.

    Returns:
        True when the whole flow completed; False when the provider could not be
        obtained or any step raised. The demo collection is deleted in either case
        (best effort).
    """
    console.print(Rule("[bold blue]Retrieval-Augmented Generation (RAG) Demo[/bold blue]"))
    logger.info("Starting RAG demo", emoji_key="start")

    vector_db = get_vector_db_service()
    # get_provider is awaited; API key loading is left to it
    provider = await get_provider(Provider.OPENAI.value)
    if not provider:
        logger.critical("OpenAI provider failed to initialize for RAG demo. Is OPENAI_API_KEY configured?", emoji_key="critical")
        console.print("[bold red]Error:[/bold red] OpenAI provider failed to initialize. Check API key.")
        return False

    # This part of the demo always starts from a freshly created collection
    collection_name = "rag_demo_collection_rich"
    embedding_dimension = 1536
    try:
        logger.info(f"Setting up collection: {collection_name}", emoji_key="db")
        await vector_db.create_collection(name=collection_name, dimension=embedding_dimension, overwrite=True)

        corpus = [
            "Deep learning uses artificial neural networks with many layers (deep architectures).",
            "Neural networks are inspired by biological brains and consist of interconnected nodes or neurons.",
            "While deep learning is a type of machine learning that uses neural networks, not all neural networks qualify as deep learning (e.g., shallow networks).",
            "Key difference: Deep learning implies significant depth (many layers) allowing hierarchical feature learning."
        ]
        corpus_ids = [f"rag_doc_{position}" for position, _ in enumerate(corpus)]
        corpus_metadatas = [
            {"topic": "machine_learning", "source": "demo_document", "id": corpus_id}
            for corpus_id in corpus_ids
        ]
        await vector_db.add_texts(
            collection_name=collection_name,
            texts=corpus,
            metadatas=corpus_metadatas,
            ids=corpus_ids
        )
        logger.success(f"Added {len(corpus)} documents for RAG.", emoji_key="success")

        question = "What is the difference between deep learning and neural networks?"
        console.print(f"[cyan]User Question:[/cyan] {escape(question)}")

        # --- Stage 1: retrieval ---
        logger.info("Retrieving relevant context...", emoji_key="search")
        retrieval_started = time.time()
        search_results = await vector_db.search_by_text(
            collection_name=collection_name,
            query_text=question,
            top_k=3
        )
        search_time = time.time() - retrieval_started

        logger.success(f"Retrieved {len(search_results)} context snippets in {search_time:.3f}s.", emoji_key="success")

        # Shared display helper renders the raw hits
        from ultimate_mcp_server.utils.display import _display_vector_results
        _display_vector_results(search_results, console)

        # Snippets are separated by a blank line inside the prompt
        context = "\n\n".join(hit["text"] for hit in search_results)
        console.print(Panel(escape(context), title="[yellow]Retrieved Context[/yellow]", border_style="dim yellow", expand=False))

        # --- Stage 2: grounded generation ---
        prompt = f"""Answer the following question based *only* on the provided context:

Context:
{context}

Question: {question}

Answer:"""

        logger.info("Generating answer using retrieved context...", emoji_key="processing")
        generation_started = time.time()
        result = await provider.generate_completion(
            prompt=prompt,
            model="gpt-4.1-mini",  # capable model for the answer step
            temperature=0.2,  # low temperature keeps the answer factual
            max_tokens=200
        )
        gen_time = time.time() - generation_started
        logger.success("Answer generated.", emoji_key="success")

        # Only the generation call is recorded on the tracker
        tracker.add_call(result)

        # --- Stage 3: presentation ---
        answer_panel = Panel(
            escape(result.text.strip()),
            title="[bold green]Generated Answer (RAG)[/bold green]",
            border_style="green",
            expand=False
        )
        console.print(answer_panel)

        stats_table = Table(title="RAG Stats", box=box.MINIMAL, show_header=False)
        stats_table.add_column("Metric", style="cyan")
        stats_table.add_column("Value", style="white")
        stat_rows = (
            ("Search Time", f"{search_time:.3f}s"),
            ("Generation Time", f"{gen_time:.3f}s"),
            ("Input Tokens", str(result.input_tokens)),
            ("Output Tokens", str(result.output_tokens)),
            ("Total Cost", f"${result.cost:.6f}"),
        )
        for metric, value in stat_rows:
            stats_table.add_row(metric, value)
        console.print(stats_table)
        console.print()

        return True

    except Exception as exc:
        logger.error(f"Error in RAG demo: {exc}", emoji_key="error", exc_info=True)
        console.print(f"[bold red]Error:[/bold red] {escape(str(exc))}")
        return False
    finally:
        # Best-effort cleanup: a failed delete is only logged as a warning
        try:
            logger.info(f"Deleting collection: {collection_name}", emoji_key="db")
            await vector_db.delete_collection(collection_name)
        except Exception as cleanup_error:
            logger.warning(f"Could not delete collection {collection_name}: {cleanup_error}", emoji_key="warning")

async def main():
    """Run the vector-operations demo, then the RAG demo if the first succeeded.

    Returns:
        Process exit code: 0 when both demos report success, 1 otherwise
        (including when an unexpected exception escapes either demo).
    """
    console.print(Rule("[bold magenta]Vector Search & RAG Demos Starting[/bold magenta]"))
    tracker = CostTracker()

    try:
        if await demonstrate_vector_operations():
            all_ok = await demonstrate_llm_with_vector_retrieval(tracker)
        else:
            # The RAG part is skipped when the basic operations already failed
            all_ok = False
            logger.warning("Skipping RAG demo due to vector operation errors.", emoji_key="skip")
    except Exception as exc:
        logger.critical(f"Vector search demo failed: {str(exc)}", emoji_key="critical", exc_info=True)
        console.print(f"[bold red]Critical Demo Error:[/bold red] {escape(str(exc))}")
        return 1

    if not all_ok:
        logger.error("One or more vector search demos failed.", emoji_key="error")
        console.print(Rule("[bold red]Vector Search & RAG Demos Finished with Errors[/bold red]"))
        tracker.display_summary(console)
        return 1

    logger.success("Vector Search & RAG Demos Finished Successfully!", emoji_key="complete")
    console.print(Rule("[bold magenta]Vector Search & RAG Demos Complete[/bold magenta]"))
    tracker.display_summary(console)
    return 0


if __name__ == "__main__":
    # Script entry point: main()'s return value becomes the process exit status
    sys.exit(asyncio.run(main()))
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/exceptions.py:
--------------------------------------------------------------------------------

```python
"""
Comprehensive exception system for the Ultimate MCP Server.

This module implements a hierarchical, structured exception system designed to provide
consistent error handling, reporting, and formatting across the MCP ecosystem. The exceptions
are designed to support both internal error handling and MCP protocol-compliant error responses
for client applications and LLMs.

Key design principles:
1. HIERARCHICAL: Exception types form a logical inheritance tree with ToolError at the root
2. CONTEXTUAL: Exceptions carry rich metadata including error codes, details, and context
3. FORMATTABLE: All exceptions can be converted to standard response dictionaries
4. TRACEABLE: Original causes and stack traces are preserved for debugging
5. ACTIONABLE: Error responses include specific information to aid recovery

Exception hierarchy:
- ToolError (base class)
  ├── ToolInputError (parameter validation issues)
  ├── ToolExecutionError (runtime execution failures)
  ├── ProviderError (LLM provider issues)
  │   ├── RateLimitError (provider throttling)
  │   └── AuthenticationError (auth failures)
  ├── ResourceError (resource access/manipulation)
  ├── ValidationError (general validation issues)
  ├── ConfigurationError (config problems)
  └── StorageError (storage operation failures)

The module also provides a format_error_response() function that standardizes any
exception (including non-ToolError exceptions) into a consistent error response
format compatible with the MCP protocol.

Example usage:
    ```python
    # Raising a specific exception with context
    if not os.path.exists(file_path):
        raise ResourceError(
            message="Cannot access required resource",
            resource_type="file",
            resource_id=file_path
        )
        
    # Catching and formatting an error
    try:
        result = process_data(data)
    except Exception as e:
        # Convert to standard response format for API
        error_response = format_error_response(e)
        return error_response
    ```
"""
import traceback
from typing import Any, Dict


class ToolError(Exception):
    """
    Base exception class for all tool-related errors in the Ultimate MCP Server.
    
    ToolError serves as the foundation of the MCP error handling system, providing a
    consistent interface for reporting, formatting, and categorizing errors that occur
    during tool execution. All specialized error types in the system inherit from this
    class, ensuring consistent error handling across the codebase.
    
    This exception class enhances Python's standard Exception with:
    
    - Error codes: Standardized identifiers for error categorization and programmatic handling
    - HTTP status mapping: Optional association with HTTP status codes for API responses
    - Detailed context: Support for rich error details and contextual information
    - Structured formatting: Conversion to standardized error response dictionaries
    
    The error hierarchy is designed to provide increasingly specific error types while
    maintaining a consistent structure that can be easily interpreted by error handlers,
    logging systems, and API responses.
    
    Error responses created from ToolError instances follow the MCP protocol format and
    include consistent fields for error type, message, details, and context.
    
    Usage example:
        ```python
        try:
            # Some operation that might fail
            result = process_data(data)
        except ToolInputError as e:
            # Handle input validation errors specifically
            log_validation_error(e)
        except ToolError as e:
            # Handle all other tool errors generically
            report_tool_error(e)
        ```
    """

    def __init__(self, message, error_code=None, details=None, context=None, http_status_code: int | None = None):
        """Initialize the tool error.

        Args:
            message: Error message
            error_code: Error code (for categorization)
            details: Additional error details dictionary
            context: Context dictionary (will be merged into details and stored)
            http_status_code: Optional HTTP status code associated with the error.
        """
        self.error_code = error_code or "TOOL_ERROR"
        self.http_status_code = http_status_code

        # Combine details and context, giving precedence to context if keys overlap
        combined_details = details.copy() if details else {} # Start with a copy of details or empty dict
        if context and isinstance(context, dict):
            combined_details.update(context) # Merge context into the combined dict

        self.details = combined_details # Store the combined dictionary
        self.context = context or {} # Also store original context separately for compatibility

        super().__init__(message)

class ToolInputError(ToolError):
    """Exception raised for errors in the tool input parameters.

    Always carries the error code "INVALID_PARAMETER". Information about the
    offending parameter is recorded in ``details``.
    """

    def __init__(self, message, param_name=None, expected_type=None, provided_value=None, details=None):
        """Initialize the tool input error.

        Args:
            message: Error message
            param_name: Name of the problematic parameter (recorded when truthy)
            expected_type: Expected parameter type (recorded as ``str(expected_type)``
                when truthy)
            provided_value: Value that was provided (recorded as a string whenever it
                is not None, so falsy values such as 0 or "" are still reported)
            details: Additional error details; the mapping is copied, not modified
        """
        # Copy first: the keys added below must not leak into the caller's dict
        error_details = dict(details) if details else {}
        if param_name:
            error_details["param_name"] = param_name
        if expected_type:
            error_details["expected_type"] = str(expected_type)
        if provided_value is not None:
            error_details["provided_value"] = str(provided_value)

        super().__init__(
            message,
            error_code="INVALID_PARAMETER",
            details=error_details
        )


class ToolExecutionError(ToolError):
    """Exception raised when a tool execution fails.

    Always carries the error code "EXECUTION_ERROR". When a cause is supplied,
    its string form and a formatted traceback are stored in ``details``.
    """

    def __init__(self, message, cause=None, details=None):
        """Initialize the tool execution error.

        Args:
            message: Error message
            cause: Original exception that caused the error
            details: Additional error details; the mapping is copied, not modified
        """
        # Copy first: the keys added below must not leak into the caller's dict
        error_details = dict(details) if details else {}
        if cause:
            error_details["cause"] = str(cause)
            if isinstance(cause, BaseException):
                # Format the cause's own traceback. format_exc() only describes the
                # exception currently being handled, which is "NoneType: None" when
                # this constructor runs outside an except block.
                error_details["traceback"] = "".join(
                    traceback.format_exception(type(cause), cause, cause.__traceback__)
                )
            else:
                # Non-exception causes (e.g. a plain string) have no traceback of
                # their own; fall back to the active exception, if any.
                error_details["traceback"] = traceback.format_exc()

        super().__init__(
            message,
            error_code="EXECUTION_ERROR",
            details=error_details
        )


class ProviderError(ToolError):
    """Exception raised for provider-specific errors.

    Carries the error code "PROVIDER_ERROR" unless a more specific code is
    supplied through ``error_code``.
    """

    def __init__(self, message, provider=None, model=None, cause=None, details=None, error_code=None):
        """Initialize the provider error.

        Args:
            message: Error message
            provider: Name of the provider
            model: Model name
            cause: Original exception that caused the error
            details: Additional error details; the mapping is copied, not modified
            error_code: Optional override for the error code. Defaults to
                "PROVIDER_ERROR". Accepted so that callers passing ``error_code=``
                to this initializer no longer fail with TypeError.
        """
        # Copy first: the keys added below must not leak into the caller's dict
        error_details = dict(details) if details else {}
        if provider:
            error_details["provider"] = provider
        if model:
            error_details["model"] = model
        if cause:
            error_details["cause"] = str(cause)
            if isinstance(cause, BaseException):
                # Format the cause's own traceback. format_exc() only describes the
                # exception currently being handled, which is "NoneType: None" when
                # this constructor runs outside an except block.
                error_details["traceback"] = "".join(
                    traceback.format_exception(type(cause), cause, cause.__traceback__)
                )
            else:
                # Non-exception causes have no traceback of their own
                error_details["traceback"] = traceback.format_exc()

        super().__init__(
            message,
            error_code=error_code or "PROVIDER_ERROR",
            details=error_details
        )


class ResourceError(ToolError):
    """Exception raised for resource-related errors.

    Always carries the error code "RESOURCE_ERROR".
    """

    def __init__(self, message, resource_type=None, resource_id=None, cause=None, details=None):
        """Initialize the resource error.

        Args:
            message: Error message
            resource_type: Type of resource (e.g., "document", "embedding")
            resource_id: Resource identifier
            cause: Original exception that caused the error (stored as a string)
            details: Additional error details; the mapping is copied, not modified
        """
        # Copy first: the keys added below must not leak into the caller's dict
        error_details = dict(details) if details else {}
        if resource_type:
            error_details["resource_type"] = resource_type
        if resource_id:
            error_details["resource_id"] = resource_id
        if cause:
            error_details["cause"] = str(cause)

        super().__init__(
            message,
            error_code="RESOURCE_ERROR",
            details=error_details
        )


class RateLimitError(ProviderError):
    """Exception raised when a provider's rate limit is reached.

    Always carries the error code "RATE_LIMIT_ERROR".
    """

    def __init__(self, message, provider=None, retry_after=None, details=None):
        """Initialize the rate limit error.

        Args:
            message: Error message
            provider: Name of the provider
            retry_after: Seconds to wait before retrying (recorded whenever it is
                not None, so a value of 0 is preserved)
            details: Additional error details; the mapping is copied, not modified
        """
        # Copy first: the key added below must not leak into the caller's dict
        error_details = dict(details) if details else {}
        if retry_after is not None:
            error_details["retry_after"] = retry_after

        super().__init__(
            message,
            provider=provider,
            details=error_details
        )
        # Assign the specific code after the parent initializer has run; this works
        # regardless of which keywords ProviderError.__init__ accepts.
        self.error_code = "RATE_LIMIT_ERROR"


class AuthenticationError(ProviderError):
    """Exception raised when authentication with a provider fails.

    Always carries the error code "AUTHENTICATION_ERROR".
    """

    def __init__(self, message, provider=None, details=None):
        """Initialize the authentication error.

        Args:
            message: Error message
            provider: Name of the provider
            details: Additional error details
        """
        super().__init__(
            message,
            provider=provider,
            details=details
        )
        # Assign the specific code after the parent initializer has run; this works
        # regardless of which keywords ProviderError.__init__ accepts.
        self.error_code = "AUTHENTICATION_ERROR"


class ValidationError(ToolError):
    """Exception raised when validation of input/output fails.

    Always carries the error code "VALIDATION_ERROR".
    """

    def __init__(self, message, field_errors=None, details=None):
        """Initialize the validation error.

        Args:
            message: Error message
            field_errors: Dictionary of field-specific errors
            details: Additional error details; the mapping is copied, not modified
        """
        # Copy first: the key added below must not leak into the caller's dict
        error_details = dict(details) if details else {}
        if field_errors:
            error_details["field_errors"] = field_errors

        super().__init__(
            message,
            error_code="VALIDATION_ERROR",
            details=error_details
        )


class ConfigurationError(ToolError):
    """Exception raised when there is an issue with configuration.

    Always carries the error code "CONFIGURATION_ERROR".
    """

    def __init__(self, message, config_key=None, details=None):
        """Initialize the configuration error.

        Args:
            message: Error message
            config_key: Key of the problematic configuration
            details: Additional error details; the mapping is copied, not modified
        """
        # Copy first: the key added below must not leak into the caller's dict
        error_details = dict(details) if details else {}
        if config_key:
            error_details["config_key"] = config_key

        super().__init__(
            message,
            error_code="CONFIGURATION_ERROR",
            details=error_details
        )


class StorageError(ToolError):
    """Exception raised when there is an issue with storage operations.

    Always carries the error code "STORAGE_ERROR".
    """

    def __init__(self, message, operation=None, location=None, details=None):
        """Initialize the storage error.

        Args:
            message: Error message
            operation: Storage operation that failed
            location: Location of the storage operation
            details: Additional error details; the mapping is copied, not modified
        """
        # Copy first: the keys added below must not leak into the caller's dict
        error_details = dict(details) if details else {}
        if operation:
            error_details["operation"] = operation
        if location:
            error_details["location"] = location

        super().__init__(
            message,
            error_code="STORAGE_ERROR",
            details=error_details
        )


def format_error_response(error: Exception) -> Dict[str, Any]:
    """
    Format any exception into a standardized error response dictionary.

    ToolError instances (and subclasses) contribute their own error code and
    details; any other exception is reported with the code "UNKNOWN_ERROR" and a
    details dictionary holding its type name, message, and formatted traceback.

    Special handling:

    - ToolError with a non-empty dict ``context``: the context is added to the
      response details under the "context" key.
    - ToolInputError whose context has a "path" key: "does not exist." and
      "is not a regular file" messages are replaced with friendlier text.
    - ToolError whose details contain "error_type": that value is reported as the
      response's error type instead of the exception class name.

    The exception itself is never modified; the returned details dictionary is a
    copy of ``error.details``.

    The resulting dictionary always contains these fields:
    - error: Human-readable error message (string)
    - error_code: Categorized error code (string)
    - error_type: Name of the exception class, or the details override (string)
    - details: Dictionary with additional error information (object)
    - success: Always False for errors (boolean)
    - isError: Always True (boolean)

    Args:
        error: Any exception instance to format into a response

    Returns:
        Dictionary containing standardized error information

    Example:
        ```python
        try:
            result = perform_operation()
        except Exception as e:
            error_response = format_error_response(e)
            return error_response  # Ready for API response
        ```
    """
    if not isinstance(error, ToolError):
        # Unknown errors: report the real message, or name the type when it is blank
        error_message = str(error)
        if not error_message.strip():
            error_message = f"Unknown error of type {type(error).__name__}"

        # Same response structure as the ToolError branch, for consistency
        return {
            "error": error_message,
            "error_code": "UNKNOWN_ERROR",
            "error_type": type(error).__name__,
            "details": {
                "type": type(error).__name__,
                "message": error_message,
                # Format this error's own traceback; format_exc() would describe
                # whatever exception happens to be active at call time instead
                # ("NoneType: None" outside an except block).
                "traceback": "".join(
                    traceback.format_exception(type(error), error, error.__traceback__)
                )
            },
            "success": False,
            "isError": True
        }

    error_message = str(error)
    # Copy so that adding "context" below does not alter the exception's own details
    error_details = dict(error.details) if error.details else {}

    context = getattr(error, 'context', None)
    if context and isinstance(context, dict):
        # Path validation failures get friendlier, more actionable wording
        if isinstance(error, ToolInputError) and 'path' in context:
            path = context.get('path')
            if error_message.endswith('does not exist.'):
                error_message = f"File not found: {path}"
            elif 'is not a regular file' in error_message:
                if 'directory' in error_message.lower():
                    error_message = f"Cannot read directory as file: {path}. Use list_directory instead."
                else:
                    error_message = f"Path exists but is not a file: {path}"

        # Expose the raw context alongside the merged details
        error_details["context"] = context

    # An explicit "error_type" in the details overrides the class name
    response_error_type = error_details.get("error_type", error.__class__.__name__)

    return {
        "error": error_message,
        "error_code": error.error_code,
        "error_type": response_error_type,
        "details": error_details,
        "success": False,
        "isError": True
    }
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/core/providers/grok.py:
--------------------------------------------------------------------------------

```python
"""Grok (xAI) provider implementation."""
import time
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple

from openai import AsyncOpenAI

from ultimate_mcp_server.config import get_config
from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.providers.base import BaseProvider, ModelResponse
from ultimate_mcp_server.utils import get_logger

# Use the same naming scheme everywhere: logger at module level
logger = get_logger("ultimate_mcp_server.providers.grok")


class GrokProvider(BaseProvider):
    """Provider implementation for xAI's Grok API."""
    
    provider_name = Provider.GROK.value
    
    def __init__(self, api_key: Optional[str] = None, **kwargs: Any) -> None:
        """Initialize the Grok provider.

        No client is created here; that happens in ``initialize()``.
        
        Args:
            api_key: xAI API key
            **kwargs: Additional options, forwarded unchanged to the base class
                initializer. A ``base_url`` entry, when present, replaces the
                default endpoint "https://api.x.ai/v1".
        """
        super().__init__(api_key=api_key, **kwargs)
        # Set after the base initializer has run, so this assignment is the one that sticks
        self.base_url = kwargs.get("base_url", "https://api.x.ai/v1")
        # Starts empty; NOTE(review): presumably filled by list_models() — confirm
        self.models_cache = None
        
    async def initialize(self) -> bool:
        """Create the API client and verify that the provider is reachable.

        The client is an ``AsyncOpenAI`` instance pointed at ``self.base_url``.
        Unless the API key contains "mock-", connectivity is checked by listing
        the available models.

        Returns:
            bool: True if initialization was successful, False if any step raised
            (the failure is logged rather than propagated)
        """
        try:
            self.client = AsyncOpenAI(
                api_key=self.api_key,
                base_url=self.base_url,
            )

            uses_mock_key = bool(self.api_key) and "mock-" in self.api_key
            if not uses_mock_key:
                # Listing models doubles as the connection test
                await self.list_models()
                self.logger.success(
                    "Grok provider initialized successfully",
                    emoji_key="provider"
                )
                return True

            # Mock keys (used in tests) skip the remote validation call
            self.logger.info(
                "Using mock Grok key - skipping API validation",
                emoji_key="mock"
            )
            return True

        except Exception as exc:
            self.logger.error(
                f"Failed to initialize Grok provider: {str(exc)}",
                emoji_key="error"
            )
            return False
        
    async def generate_completion(
        self,
        prompt: str,
        model: Optional[str] = None,
        max_tokens: Optional[int] = None,
        temperature: float = 0.7,
        **kwargs
    ) -> ModelResponse:
        """Generate a completion using Grok.

        Args:
            prompt: Text prompt to send to the model. Used as the single user
                message unless ``messages`` is supplied in ``kwargs``.
            model: Model name to use (e.g., "grok-3-latest"). A leading
                "<provider_name>/" prefix is stripped. Defaults to the provider's
                default model.
            max_tokens: Maximum tokens to generate (omitted from the request when None)
            temperature: Sampling temperature forwarded to the API
            **kwargs: Additional parameters. Recognized keys are consumed here:
                ``messages`` (full message list; copied, never mutated),
                ``system`` (system message, prepended unless one is already present),
                ``tools`` / ``tool_choice`` (function calling),
                ``reasoning_effort`` (only consumed for "grok-3-mini*" models),
                ``json_mode`` (requests a JSON object response format).
                Everything else is passed through to the API call unchanged.

        Returns:
            ModelResponse with completion result; reasoning content and reasoning
            token counts are placed in its metadata when the response provides them.

        Raises:
            Exception: If API call fails (logged, then re-raised unchanged)
        """
        if not self.client:
            await self.initialize()

        # Use default model if not specified
        model = model or self.get_default_model()

        # Strip provider prefix if present (e.g., "grok/grok-3" -> "grok-3")
        if model.startswith(f"{self.provider_name}/"):
            original_model = model
            model = model.split("/", 1)[1]
            self.logger.debug(f"Stripped provider prefix from model name: {original_model} -> {model}")

        # Build the message list. A caller-supplied list is copied so that inserting
        # the system message below cannot modify the caller's object.
        supplied_messages = kwargs.pop("messages", None)
        if supplied_messages:
            messages = list(supplied_messages)
        else:
            messages = [{"role": "user", "content": prompt}]

        # Prepend the system message unless the conversation already has one
        system_message = kwargs.pop("system", None)
        if system_message and not any(msg.get("role") == "system" for msg in messages):
            messages.insert(0, {"role": "system", "content": system_message})

        # Handle tool support (function calling)
        tools = kwargs.pop("tools", None)
        tool_choice = kwargs.pop("tool_choice", None)

        # reasoning_effort is only consumed for grok-3-mini models; for other models
        # it stays in kwargs and is passed through with the remaining parameters
        reasoning_effort = None
        if model.startswith("grok-3-mini") and "reasoning_effort" in kwargs:
            reasoning_effort = kwargs.pop("reasoning_effort")

        # Prepare API call parameters
        params = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
        }

        if max_tokens is not None:
            params["max_tokens"] = max_tokens

        if tools:
            params["tools"] = tools
        if tool_choice:
            params["tool_choice"] = tool_choice

        # Handle JSON mode
        json_mode = kwargs.pop("json_mode", False)
        if json_mode:
            params["response_format"] = {"type": "json_object"}
            self.logger.debug("Setting response_format to JSON mode for Grok")

        if reasoning_effort:
            params["reasoning_effort"] = reasoning_effort

        # Remaining kwargs go to the API as-is (and may override the values above)
        params.update(kwargs)

        # Log request; the prompt may be empty/None when only messages were given
        self.logger.info(
            f"Generating completion with Grok model {model}",
            emoji_key=self.provider_name,
            prompt_length=len(prompt) if prompt else 0,
            json_mode_requested=json_mode
        )

        try:
            # API call with timing
            response, processing_time = await self.process_with_timer(
                self.client.chat.completions.create, **params
            )

            first_message = response.choices[0].message
            completion_text = first_message.content

            # Reasoning content is only present on some responses
            reasoning_content = getattr(first_message, "reasoning_content", None)

            # Get usage statistics
            usage = response.usage
            input_tokens = usage.prompt_tokens
            output_tokens = usage.completion_tokens
            total_tokens = usage.total_tokens

            # Reasoning token count lives in an optional nested details object
            token_details = getattr(usage, "completion_tokens_details", None)
            reasoning_tokens = getattr(token_details, "reasoning_tokens", None)

            # Only include reasoning information that is actually available
            metadata = {}
            if reasoning_content:
                metadata["reasoning_content"] = reasoning_content
            if reasoning_tokens:
                metadata["reasoning_tokens"] = reasoning_tokens

            # Create standardized response
            result = ModelResponse(
                text=completion_text,
                model=model,
                provider=self.provider_name,
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                total_tokens=total_tokens,
                processing_time=processing_time,
                raw_response=response,
                metadata=metadata,
            )

            # Log success
            self.logger.success(
                "Grok completion successful",
                emoji_key="success",
                model=model,
                tokens={
                    "input": result.input_tokens,
                    "output": result.output_tokens
                },
                cost=result.cost,
                time=result.processing_time
            )

            return result

        except Exception as e:
            self.logger.error(
                f"Grok completion failed: {str(e)}",
                emoji_key="error",
                model=model
            )
            raise
            
    async def generate_completion_stream(
        self,
        prompt: str,
        model: Optional[str] = None,
        max_tokens: Optional[int] = None,
        temperature: float = 0.7,
        **kwargs
    ) -> AsyncGenerator[Tuple[str, Dict[str, Any]], None]:
        """Generate a streaming completion using Grok.
        
        Args:
            prompt: Text prompt to send to the model. Used as the single user
                message unless ``messages`` is supplied in ``kwargs``.
            model: Model name to use (e.g., "grok-3-latest"). A leading
                "<provider_name>/" prefix is stripped. Falls back to
                ``get_default_model()`` when omitted.
            max_tokens: Maximum tokens to generate
            temperature: Temperature parameter (0.0-1.0)
            **kwargs: Additional model-specific parameters. ``messages``,
                ``system``, ``json_mode`` and (for grok-3-mini models)
                ``reasoning_effort`` are consumed here; everything else is
                forwarded unchanged to the API call.
            
        Yields:
            Tuple of (text_chunk, metadata). ``metadata`` holds ``model``,
            ``provider``, ``chunk_index`` (1-based count of yielded chunks),
            ``finish_reason`` and, when present on the delta,
            ``reasoning_content``.
            
        Raises:
            Exception: If API call fails (the error is logged, then re-raised)
        """
        if not self.client:
            await self.initialize()
            
        # Use default model if not specified
        model = model or self.get_default_model()
        
        # Strip provider prefix if present (e.g., "grok/grok-3" -> "grok-3")
        if model.startswith(f"{self.provider_name}/"):
            original_model = model
            model = model.split("/", 1)[1]
            self.logger.debug(f"Stripped provider prefix from model name (stream): {original_model} -> {model}")
        
        # Build the message list. Copy it so that inserting a system message
        # below never mutates a list owned by the caller.
        messages = list(kwargs.pop("messages", None) or [{"role": "user", "content": prompt}])
        
        # Prepend the system message unless the conversation already has one
        system_message = kwargs.pop("system", None)
        if system_message and not any(msg.get("role") == "system" for msg in messages):
            messages.insert(0, {"role": "system", "content": system_message})
        
        # Handle reasoning effort for grok-3-mini models
        reasoning_effort = None
        if model.startswith("grok-3-mini") and "reasoning_effort" in kwargs:
            reasoning_effort = kwargs.pop("reasoning_effort")
            
        # Prepare API call parameters
        params = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            "stream": True,
        }
        
        # Add max_tokens if specified
        if max_tokens is not None:
            params["max_tokens"] = max_tokens
            
        # Handle JSON mode
        json_mode = kwargs.pop("json_mode", False)
        if json_mode:
            params["response_format"] = {"type": "json_object"}
            self.logger.debug("Setting response_format to JSON mode for Grok streaming")
            
        # Add reasoning_effort for mini models if specified
        if reasoning_effort:
            params["reasoning_effort"] = reasoning_effort
            
        # Add any additional parameters (these may override the values above)
        params.update(kwargs)
        
        # Log request
        self.logger.info(
            f"Generating streaming completion with Grok model {model}",
            emoji_key=self.provider_name,
            prompt_length=len(prompt),
            json_mode_requested=json_mode
        )
        
        start_time = time.time()
        total_chunks = 0
        
        try:
            # Make streaming API call
            stream = await self.client.chat.completions.create(**params)
            
            # Process the stream
            async for chunk in stream:
                # OpenAI-compatible streams may emit chunks with an empty
                # ``choices`` list (for instance a usage-only chunk); there is
                # no delta to yield for those, so skip them instead of failing
                # on ``choices[0]``.
                if not chunk.choices:
                    continue
                    
                total_chunks += 1
                choice = chunk.choices[0]
                
                # Extract content from the chunk
                delta = choice.delta
                content = delta.content or ""
                
                # Metadata for this chunk
                metadata = {
                    "model": model,
                    "provider": self.provider_name,
                    "chunk_index": total_chunks,
                    "finish_reason": choice.finish_reason,
                }
                
                # Add reasoning content (grok-3-mini models) to metadata if available
                reasoning_content = getattr(delta, "reasoning_content", None)
                if reasoning_content:
                    metadata["reasoning_content"] = reasoning_content
                
                yield content, metadata
                
            # Log success
            processing_time = time.time() - start_time
            self.logger.success(
                "Grok streaming completion successful",
                emoji_key="success",
                model=model,
                chunks=total_chunks,
                time=processing_time
            )
            
        except Exception as e:
            self.logger.error(
                f"Grok streaming completion failed: {str(e)}",
                emoji_key="error",
                model=model
            )
            raise
            
    async def list_models(self) -> List[Dict[str, Any]]:
        """Return the Grok models known to this provider.
        
        A non-empty cache is returned as-is. Otherwise the models endpoint is
        queried, entries whose id starts with "grok-3" are kept, and that list
        is cached. If anything goes wrong the error is logged and a static list
        of grok-3 models is returned instead (the cache is left untouched).
        
        Returns:
            List of model information dictionaries
        """
        cached = self.models_cache
        if cached:
            return cached
            
        try:
            if not self.client:
                await self.initialize()
                
            # The Grok API exposes the same model-listing endpoint as OpenAI
            listing = await self.client.models.list()
            
            # Keep only the grok-3 family
            discovered = [
                {
                    "id": entry.id,
                    "provider": self.provider_name,
                    "created": entry.created,
                    "owned_by": entry.owned_by,
                }
                for entry in listing.data
                if entry.id.startswith("grok-3")
            ]
            
            self.models_cache = discovered
            return discovered
            
        except Exception as e:
            self.logger.error(
                f"Failed to list Grok models: {str(e)}",
                emoji_key="error"
            )
            
        # Basic grok-3 models (based on documentation), used when listing fails
        documented = (
            ("grok-3-latest", "Flagship model for enterprise tasks (latest version)"),
            ("grok-3-beta", "Flagship model that excels at enterprise tasks, domain knowledge"),
            ("grok-3-fast-latest", "Fast version of grok-3, same quality with higher cost"),
            ("grok-3-mini-latest", "Lightweight model with thinking capabilities"),
            ("grok-3-mini-fast-latest", "Fast version of grok-3-mini, same quality with higher cost"),
        )
        return [
            {
                "id": model_id,
                "provider": self.provider_name,
                "description": blurb,
            }
            for model_id, blurb in documented
        ]
            
    def get_default_model(self) -> str:
        """Return the model to use when the caller does not name one.
        
        The provider entry in the loaded configuration wins when it defines a
        non-empty ``default_model``; otherwise "grok-3-beta" is used.
        
        Returns:
            Default model name
        """
        configured = None
        try:
            providers = getattr(get_config(), 'providers', {})
            entry = providers.get(self.provider_name, None)
            if entry:
                configured = entry.default_model
        except (AttributeError, TypeError):
            # Config has no usable providers mapping / entry; use the fallback
            pass
            
        if configured:
            return configured
        return "grok-3-beta"
        
    async def check_api_key(self) -> bool:
        """Check if the Grok API key is valid.
        
        Performs a live request against the models endpoint. ``list_models``
        is deliberately not used here: it serves cached results and handles
        API errors by returning a static fallback list, so it never raises and
        cannot signal an invalid key.
        
        Returns:
            bool: True if the models endpoint could be queried, False if
            client initialization or the request failed
        """
        try:
            if not self.client:
                await self.initialize()
            # Any failure (auth error, network error, client still unset)
            # surfaces as an exception here and is reported as "not valid"
            await self.client.models.list()
            return True
        except Exception:
            return False
```

--------------------------------------------------------------------------------
/examples/analytics_reporting_demo.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
"""Analytics and reporting demonstration for Ultimate MCP Server."""
import asyncio
import sys
import time
from pathlib import Path

# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))

from rich import box
from rich.live import Live
from rich.markup import escape
from rich.rule import Rule
from rich.table import Table

from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.providers.base import get_provider
from ultimate_mcp_server.services.analytics.metrics import get_metrics_tracker
from ultimate_mcp_server.utils import get_logger
from ultimate_mcp_server.utils.display import CostTracker, display_analytics_metrics

# --- Add Rich Imports ---
from ultimate_mcp_server.utils.logging.console import console

# ----------------------

# Module-level logger for this demo; all functions below log through it
logger = get_logger("example.analytics_reporting")


async def simulate_llm_usage(tracker: CostTracker = None):
    """Simulate various LLM API calls to generate analytics data.

    Every provider in ``Provider`` except OpenRouter is requested via
    ``get_provider``; each one that is available and has a default model is
    sent a fixed set of prompts. Every call, successful or failed, is recorded
    on the metrics tracker.

    Args:
        tracker: Optional cost tracker; when given, each successful result is
            passed to ``tracker.add_call``.

    Returns:
        The metrics tracker obtained from ``get_metrics_tracker()``.
    """
    console.print(Rule("[bold blue]Simulating LLM Usage[/bold blue]"))
    logger.info("Simulating LLM usage to generate analytics data", emoji_key="start")
    
    metrics = get_metrics_tracker()
    providers_info = []
    
    # Iterate through defined Provider enums; keys/config are resolved by get_provider
    for provider_enum in Provider:
        # Skip if provider doesn't make sense to simulate here, e.g., OpenRouter might need extra config
        if provider_enum == Provider.OPENROUTER:
            logger.info(f"Skipping {provider_enum.value} in simulation.", emoji_key="skip")
            continue
            
        try:
            # Let get_provider handle config/key lookup internally
            provider = await get_provider(provider_enum.value)
            if provider:
                providers_info.append((provider_enum.value, provider))
                logger.info(f"Provider instance obtained for: {provider_enum.value}", emoji_key="provider")
            else:
                logger.info(f"Provider {provider_enum.value} not configured or key missing, skipping simulation.", emoji_key="skip")
        except Exception as e:
            logger.warning(f"Failed to get/initialize {provider_enum.value}: {e}", emoji_key="warning")

    if not providers_info:
        logger.error("No providers could be initialized. Cannot simulate usage.", emoji_key="error")
        console.print("[bold red]Error:[/bold red] No LLM providers could be initialized. Please check your API keys.")
        return metrics  # Nothing was recorded

    console.print(f"Simulating usage with [cyan]{len(providers_info)}[/cyan] providers.")

    prompts = [
        "What is machine learning?",
        "Explain the concept of neural networks in simple terms.",
        "Write a short story about a robot that learns to love.",
        "Summarize the key innovations in artificial intelligence over the past decade.",
        "What are the ethical considerations in developing advanced AI systems?"
    ]
    
    # Resolve each provider's model up front so the progress total below only
    # counts calls that will actually be attempted.
    runnable = []
    for provider_name, provider in providers_info:
        model_to_use = provider.get_default_model()
        if not model_to_use:
            logger.warning(f"No default model found for {provider_name}, skipping provider.", emoji_key="warning")
            continue
        runnable.append((provider_name, provider, model_to_use))
    
    total_calls = len(runnable) * len(prompts)
    call_count = 0
    
    for provider_name, provider, model_to_use in runnable:
        for prompt in prompts:
            call_count += 1
            logger.info(
                f"Simulating call ({call_count}/{total_calls}) for {provider_name}",
                emoji_key="processing"
            )
            
            # Started outside the try so the error path can always compute a duration
            start_time = time.time()
            try:
                result = await provider.generate_completion(
                    prompt=prompt,
                    model=model_to_use,
                    temperature=0.7,
                    max_tokens=150
                )
                completion_time = time.time() - start_time
                
                # Track costs if tracker provided
                if tracker:
                    tracker.add_call(result)
                
                # Record metrics using the actual model returned in the result
                metrics.record_request(
                    provider=provider_name,
                    model=result.model,
                    input_tokens=result.input_tokens,
                    output_tokens=result.output_tokens,
                    cost=result.cost,
                    duration=completion_time,
                    success=True
                )
                
                # Short pause between calls
                await asyncio.sleep(0.2)
            
            except Exception as e:
                logger.error(f"Error simulating completion for {provider_name}: {str(e)}", emoji_key="error")
                # Failed calls are recorded against the intended model with zero tokens/cost
                metrics.record_request(
                    provider=provider_name,
                    model=model_to_use,
                    input_tokens=0,
                    output_tokens=0,
                    cost=0.0,
                    duration=time.time() - start_time,
                    success=False
                )
    
    logger.info("Finished simulating LLM usage", emoji_key="complete")
    return metrics


async def demonstrate_metrics_tracking(tracker: CostTracker = None):
    """Run the usage simulation and display the collected metrics.

    Obtains the metrics tracker with ``reset_on_start=True``, runs
    ``simulate_llm_usage`` (forwarding ``tracker``), then renders the stats
    with the shared ``display_analytics_metrics`` utility.

    Returns:
        The stats returned by the metrics tracker's ``get_stats()``.
    """
    console.print(Rule("[bold blue]Metrics Tracking Demonstration[/bold blue]"))
    logger.info("Starting metrics tracking demonstration", emoji_key="start")

    metrics_tracker = get_metrics_tracker(reset_on_start=True)
    await simulate_llm_usage(tracker)

    # Snapshot after the simulation and hand it to the standard display helper
    snapshot = metrics_tracker.get_stats()
    display_analytics_metrics(snapshot)
    return snapshot


def _share_breakdown(stats_by_name, field, total, default):
    """Build a per-name breakdown of ``field`` with its share of ``total``, largest first.

    Returns an empty list when ``total`` is not positive, so the percentage
    division can never divide by zero.
    """
    if not total > 0:
        return []
    rows = [
        {
            "name": name,
            "value": data.get(field, default),
            "percentage": data.get(field, default) / total * 100,
        }
        for name, data in stats_by_name.items()
    ]
    rows.sort(key=lambda row: row["value"], reverse=True)
    return rows


def _format_cost(value):
    """Format a cost as dollars with six decimal places."""
    return f"${value:.6f}"


def _format_count(value):
    """Format a count with thousands separators."""
    return f"{value:,}"


def _print_share_table(rows, title, name_header, name_style, value_header, value_style, format_value):
    """Print one name/value/percentage table; print nothing when ``rows`` is empty."""
    if not rows:
        return
    table = Table(title=f"[bold green]{title}[/bold green]", box=box.ROUNDED)
    table.add_column(name_header, style=name_style)
    table.add_column(value_header, style=value_style, justify="right")
    table.add_column("Percentage", style="cyan", justify="right")
    for row in rows:
        table.add_row(
            escape(row["name"]),
            format_value(row["value"]),
            f"{row['percentage']:.1f}%"
        )
    console.print(table)
    console.print()


async def demonstrate_analytics_reporting():
    """Demonstrate analytics reporting functionality.

    Reads the current stats from the metrics tracker (running the usage
    simulation first when no requests have been recorded), derives cost and
    token breakdowns per provider and per model plus a daily cost trend, and
    prints each non-empty report as a table.

    Returns:
        Dict with the keys ``cost_by_provider``, ``cost_by_model``,
        ``tokens_by_provider``, ``tokens_by_model`` and ``daily_cost_trend``.
    """
    console.print(Rule("[bold blue]Analytics Reporting Demonstration[/bold blue]"))
    logger.info("Starting analytics reporting demonstration", emoji_key="start")
    
    metrics = get_metrics_tracker()
    stats = metrics.get_stats()
    # Tolerate a missing "general" section the same way the lookups below do
    if stats.get("general", {}).get("requests_total", 0) == 0:
        logger.warning("No metrics data found. Running simulation first.", emoji_key="warning")
        await simulate_llm_usage()
        stats = metrics.get_stats()
    
    # --- Perform calculations directly from stats ---
    general_stats = stats.get("general", {})
    provider_stats = stats.get("providers", {})
    model_stats = stats.get("models", {})
    daily_usage_stats = stats.get("daily_usage", [])
    total_cost = general_stats.get("cost_total", 0.0)
    total_tokens = general_stats.get("tokens_total", 0)
    
    cost_by_provider = _share_breakdown(provider_stats, "cost", total_cost, 0.0)
    cost_by_model = _share_breakdown(model_stats, "cost", total_cost, 0.0)
    tokens_by_provider = _share_breakdown(provider_stats, "tokens", total_tokens, 0)
    tokens_by_model = _share_breakdown(model_stats, "tokens", total_tokens, 0)
    
    # Daily cost trend (simplified: just the daily cost, no % change), oldest first
    daily_cost_trend = sorted(
        (
            {
                "date": day.get("date"),
                "cost": day.get("cost", 0.0)
            }
            for day in daily_usage_stats
        ),
        key=lambda entry: entry["date"],
    )
    
    # --- Display reports using tables (using the calculated data) ---
    _print_share_table(cost_by_provider, "Cost by Provider", "Provider", "magenta", "Cost", "green", _format_cost)
    _print_share_table(cost_by_model, "Cost by Model", "Model", "blue", "Cost", "green", _format_cost)
    _print_share_table(tokens_by_provider, "Tokens by Provider", "Provider", "magenta", "Tokens", "white", _format_count)
    _print_share_table(tokens_by_model, "Tokens by Model", "Model", "blue", "Tokens", "white", _format_count)
    
    # Daily cost trend report
    if daily_cost_trend:
        daily_trend_table = Table(title="[bold green]Daily Cost Trend[/bold green]", box=box.ROUNDED)
        daily_trend_table.add_column("Date", style="yellow")
        daily_trend_table.add_column("Cost", style="green", justify="right")
        
        for item in daily_cost_trend:
            daily_trend_table.add_row(
                item["date"],
                f"${item['cost']:.6f}"
            )
        console.print(daily_trend_table)
        console.print()
    
    # Return the calculated data so callers can reuse it
    return {
        "cost_by_provider": cost_by_provider,
        "cost_by_model": cost_by_model,
        "tokens_by_provider": tokens_by_provider,
        "tokens_by_model": tokens_by_model,
        "daily_cost_trend": daily_cost_trend
    }


async def demonstrate_real_time_monitoring():
    """Show a live-updating table of the general usage stats.

    The table is re-rendered from the metrics tracker every 2 seconds for
    10 seconds inside a Rich ``Live`` display. Any error raised while the
    display is active is logged rather than propagated.
    """
    console.print(Rule("[bold blue]Real-Time Monitoring Demonstration[/bold blue]"))
    logger.info("Starting real-time monitoring (updates every 2s for 10s)", emoji_key="start")

    tracker = get_metrics_tracker()  # Reuse whatever has been recorded so far

    def build_table() -> Table:
        """Render the tracker's current general stats as a two-column table."""
        general = tracker.get_stats()["general"]
        rows = (
            ("Total Requests", f"{general['requests_total']:,}"),
            ("Total Tokens", f"{general['tokens_total']:,}"),
            ("Total Cost", f"${general['cost_total']:.6f}"),
            ("Total Errors", f"{general['errors_total']:,}"),
        )
        snapshot = Table(title="Live LLM Usage Stats", box=box.ROUNDED)
        snapshot.add_column("Metric", style="cyan")
        snapshot.add_column("Value", style="white", justify="right")
        for label, value in rows:
            snapshot.add_row(label, value)
        return snapshot

    try:
        with Live(build_table(), refresh_per_second=0.5, console=console) as display:
            # Nothing else generates traffic here; the demo simply polls the
            # tracker until the 10-second window has elapsed.
            deadline = time.time() + 10
            while True:
                if time.time() >= deadline:
                    break
                display.update(build_table())
                await asyncio.sleep(2)

            # One last refresh so the final numbers are what stays on screen
            display.update(build_table())

    except Exception as e:
        logger.error(f"Error during live monitoring: {e}", emoji_key="error", exc_info=True)

    logger.info("Finished real-time monitoring demonstration", emoji_key="complete")
    console.print()


async def main():
    """Run the three analytics demonstrations in order and print the cost summary.

    Returns:
        0 when every step completed, 1 when any step raised (the failure is
        logged at critical level).
    """
    cost_tracker = CostTracker()
    try:
        # 1) metrics tracking (this one runs the usage simulation)
        await demonstrate_metrics_tracking(cost_tracker)
        # 2) report generation
        await demonstrate_analytics_reporting()
        # 3) real-time monitoring
        await demonstrate_real_time_monitoring()
        # Final cost summary
        cost_tracker.display_summary(console)
    except Exception as e:
        logger.critical(f"Analytics demo failed: {str(e)}", emoji_key="critical", exc_info=True)
        return 1
    else:
        logger.success("Analytics & Reporting Demo Finished Successfully!", emoji_key="complete")
        return 0


if __name__ == "__main__":
    # Propagate main()'s status (0 = success, 1 = failure) as the process exit code
    sys.exit(asyncio.run(main()))
```

--------------------------------------------------------------------------------
/examples/cost_optimization.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
"""Cost optimization examples for Ultimate MCP Server."""
import asyncio
import sys
from pathlib import Path

# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))

from fastmcp import FastMCP
from rich import box
from rich.markup import escape
from rich.panel import Panel
from rich.rule import Rule
from rich.table import Table

from ultimate_mcp_server.constants import COST_PER_MILLION_TOKENS, DEFAULT_MODELS, Provider
from ultimate_mcp_server.core.providers.base import get_provider
from ultimate_mcp_server.tools.optimization import estimate_cost, recommend_model
from ultimate_mcp_server.utils import get_logger

# --- Import display utilities ---
from ultimate_mcp_server.utils.display import CostTracker, parse_and_display_result

# --- Add Rich Imports ---
from ultimate_mcp_server.utils.logging.console import console
from ultimate_mcp_server.utils.text import count_tokens  # Import proper token counting function

# ----------------------

# Module-level logger for this demo
logger = get_logger("example.cost_optimization")

# Local FastMCP server instance that the demo registers its tools on
mcp = FastMCP("Cost Optimization Demo")

# Disabled alternative (kept for reference): per the original note, instantiating
# OptimizationTools with the server would register all optimization tools at once.
# OptimizationTools(mcp)

# Register only the two tools this demo needs on the local MCP instance.
# NOTE: this runs at import time, so importing the module has these side effects.
mcp.tool()(estimate_cost)
mcp.tool()(recommend_model)
logger.info("Manually registered optimization tools (estimate_cost, recommend_model).")

# Helper function to unpack tool results that might be returned as a list
def unpack_tool_result(result):
    """
    Handles the case where tool results are returned as a list instead of a dictionary.
    
    Only the first list item is inspected: a dict is returned directly, and an
    object with a ``text`` attribute has that text parsed as JSON. Anything
    that cannot be unpacked this way (empty list, non-list input, text that
    is missing, not a string, or not valid JSON) is returned unchanged.
    
    Args:
        result: Result from an MCP tool call
        
    Returns:
        Unpacked result (a dictionary, or whatever the JSON text decodes to)
        if possible, or the original result
    """
    import json

    # Check if result is a list with content
    if isinstance(result, list) and result:
        first_item = result[0]
        if isinstance(first_item, dict):
            return first_item
            
        # Handle case where item might be an object with a text attribute
        if hasattr(first_item, 'text'):
            try:
                return json.loads(first_item.text)
            except (ValueError, TypeError, AttributeError):
                # ValueError covers json.JSONDecodeError (invalid JSON);
                # TypeError covers text that is not str/bytes (e.g. None).
                pass
    
    # Return the original result if we can't unpack
    return result

# Modified to use provider system directly
async def _execute_with_recommended_model(balanced_rec, prompt, estimated_output_tokens, tracker: CostTracker):
    """Run ``prompt`` against the recommended model and display the outcome.

    Args:
        balanced_rec: Name of the recommended model; nothing is run when falsy.
        prompt: Prompt text sent to the model.
        estimated_output_tokens: Passed to the provider as ``max_tokens``.
        tracker: Cost tracker that receives the completion result.

    Returns:
        None. Problems are logged instead of raised: a missing model, an
        unresolvable provider, or any error during the completion.
    """
    if not balanced_rec:
        logger.error("No recommended model provided", emoji_key="error")
        return

    provider_name = _get_provider_for_model(balanced_rec)
    if not provider_name:
        logger.error(f"Could not determine provider for recommended model {balanced_rec}", emoji_key="error")
        return

    try:
        # No API key is passed explicitly; the provider system resolves it
        provider = await get_provider(provider_name)
        await provider.initialize()

        completion = await provider.generate_completion(
            prompt=prompt,
            model=balanced_rec,
            temperature=0.7,
            max_tokens=estimated_output_tokens
        )

        tracker.add_call(completion)

        # Report the token counts and cost taken from the API response
        logger.success(f"Completion with {balanced_rec} successful", emoji_key="success")
        logger.info(f"Actual input tokens: {completion.input_tokens}, output tokens: {completion.output_tokens}", emoji_key="token")
        logger.info(f"Cost based on actual usage: ${completion.cost:.6f}", emoji_key="cost")

        response_panel = Panel(
            escape(completion.text.strip()),
            title=f"[bold green]Response from {escape(balanced_rec)}[/bold green]",
            border_style="green"
        )
        console.print(response_panel)

        # Execution stats rendered through the shared display utility
        token_usage = {
            "input": completion.input_tokens,
            "output": completion.output_tokens,
            "total": completion.input_tokens + completion.output_tokens
        }
        execution_stats = {
            "model": balanced_rec,
            "provider": provider_name,
            "cost": completion.cost,
            "tokens": token_usage,
            "processing_time": completion.processing_time
        }
        parse_and_display_result(
            f"Execution Stats for {balanced_rec}",
            None,
            execution_stats
        )

    except Exception as e:
        logger.error(f"Error running completion with {balanced_rec}: {e}", emoji_key="error", exc_info=True)

async def demonstrate_cost_optimization(tracker: CostTracker):
    """Demonstrate cost estimation, model recommendation and execution using Rich.

    The demo runs three stages against one fixed analysis prompt:

    1. Log the prompt's token count for each default model, then call the
       ``estimate_cost`` tool for every model in ``COST_PER_MILLION_TOKENS``
       and render the estimates as a table.
    2. Call the ``recommend_model`` tool once per priority ("balanced",
       "cost", "quality", "speed") and render the top pick for each.
    3. Run the prompt against the top "balanced" recommendation, if there is
       one, and print the tracker's cost summary.

    Failures of individual tool calls are logged and shown as table rows;
    they do not abort the demo.

    Args:
        tracker: Cost tracker passed to the execution helper and used to
            print the final summary.
    """
    console.print(Rule("[bold blue]Cost Optimization Demonstration[/bold blue]"))
    logger.info("Starting cost optimization demonstration", emoji_key="start")

    prompt = """
    Write a comprehensive analysis of how machine learning is being applied in the healthcare industry,
    focusing on diagnostic tools, treatment optimization, and administrative efficiency.
    Include specific examples and potential future developments.
    """

    # Note for the demo: Use proper token counting, not character estimation
    logger.info("Calculating tokens for the prompt with tiktoken", emoji_key="info")
    # Use default models from constants for the initial token count display
    models_to_show = list(DEFAULT_MODELS.values())
    for model_name in models_to_show:
        token_count = count_tokens(prompt, model_name)
        logger.info(f"Model {model_name}: {token_count} input tokens", emoji_key="token")

    estimated_output_tokens = 500  # Estimate output length for the prompt

    # --- Cost Estimation ---
    console.print(Rule("[cyan]Cost Estimation[/cyan]"))
    logger.info("Estimating costs for different models", emoji_key="cost")

    # Dynamically get models from the constants file
    models_to_compare = list(COST_PER_MILLION_TOKENS.keys())

    cost_table = Table(title="Estimated Costs", box=box.ROUNDED, show_header=True)
    cost_table.add_column("Model", style="magenta")
    cost_table.add_column("Input Tokens", style="white", justify="right")
    cost_table.add_column("Output Tokens", style="white", justify="right")
    cost_table.add_column("Input Rate ($/M)", style="dim blue", justify="right")
    cost_table.add_column("Output Rate ($/M)", style="dim blue", justify="right")
    cost_table.add_column("Estimated Cost", style="green", justify="right")

    cost_estimates = []
    for model_name_only in models_to_compare:
        try:
            # Determine provider and construct full model name
            provider_name = _get_provider_for_model(model_name_only)
            if not provider_name:
                logger.warning(f"Could not determine provider for model '{model_name_only}'. Skipping cost estimation.", emoji_key="warning")
                cost_table.add_row(escape(model_name_only), "-", "-", "-", "-", "[dim yellow]Unknown provider[/dim yellow]")
                continue

            full_model_name = f"{provider_name}/{model_name_only}"

            # Call the estimate_cost tool with the prefixed model name
            raw_result = await mcp.call_tool("estimate_cost", {
                "prompt": prompt,
                "model": full_model_name,  # Use the prefixed name
                "max_tokens": estimated_output_tokens
            })

            # Unpack the result
            estimate_result = unpack_tool_result(raw_result)

            if "error" in estimate_result:
                # Log the error with the original model name for clarity in logs
                logger.warning(f"Could not estimate cost for {model_name_only}: {estimate_result['error']}", emoji_key="warning")
                # Escape the tool's message: it is arbitrary text and must not be
                # interpreted as Rich markup inside the table cell.
                error_text = escape(str(estimate_result['error']))
                cost_table.add_row(escape(model_name_only), "-", "-", "-", "-", f"[dim red]{error_text}[/dim red]")
            else:
                cost_estimates.append(estimate_result)  # Store for later use if needed
                cost_table.add_row(
                    escape(model_name_only),  # Display original model name in table
                    str(estimate_result["tokens"]["input"]),
                    str(estimate_result["tokens"]["output"]),
                    f"${estimate_result['rate']['input']:.2f}",
                    f"${estimate_result['rate']['output']:.2f}",
                    f"${estimate_result['cost']:.6f}"
                )
        except Exception as e:
            logger.error(f"Error calling estimate_cost for {model_name_only}: {e}", emoji_key="error", exc_info=True)
            cost_table.add_row(escape(model_name_only), "-", "-", "-", "-", "[red]Error[/red]")

    console.print(cost_table)
    console.print()

    # --- Model Recommendation ---
    console.print(Rule("[cyan]Model Recommendation[/cyan]"))
    logger.info("Getting model recommendations based on different priorities", emoji_key="recommend")

    # Define task parameters for recommendation
    task_info = {
        "task_type": "analysis_generation",
        "expected_input_length": len(prompt),
        "expected_output_length": estimated_output_tokens * 4,  # Convert tokens back to chars approx
        "required_capabilities": ["reasoning", "knowledge"],
        "max_cost": 0.005  # Example max cost constraint
    }

    priorities = ["balanced", "cost", "quality", "speed"]

    recommendation_table = Table(title="Model Recommendations", box=box.ROUNDED, show_header=True)
    recommendation_table.add_column("Priority", style="yellow")
    recommendation_table.add_column("1st Rec", style="magenta")
    recommendation_table.add_column("Cost", style="green", justify="right")
    recommendation_table.add_column("Quality", style="blue", justify="right")
    recommendation_table.add_column("Speed", style="cyan", justify="right")
    recommendation_table.add_column("Score", style="white", justify="right")
    recommendation_table.add_column("Other Recs", style="dim")

    recommendation_results = {}

    for priority in priorities:
        try:
            # Call the recommend_model tool
            raw_result = await mcp.call_tool("recommend_model", {
                **task_info,
                "priority": priority
            })

            # Unpack the result
            rec_result = unpack_tool_result(raw_result)

            if "error" in rec_result:
                logger.warning(f"Could not get recommendations for priority '{priority}': {rec_result['error']}", emoji_key="warning")
                # Escape for the same reason as in the cost table above.
                error_text = escape(str(rec_result['error']))
                recommendation_table.add_row(priority, "-", "-", "-", "-", "-", f"[dim red]{error_text}[/dim red]")
            elif not rec_result.get("recommendations"):
                logger.info(f"No models met criteria for priority '{priority}'", emoji_key="info")
                recommendation_table.add_row(priority, "[dim]None[/dim]", "-", "-", "-", "-", "No models fit criteria")
            else:
                recs = rec_result["recommendations"]
                top_rec = recs[0]
                other_recs_str = ", ".join([escape(r["model"]) for r in recs[1:]]) if len(recs) > 1 else "None"

                cost_key = "estimated_cost"
                quality_key = "quality_score"
                speed_key = "speed_score"

                # The tool may not return a score; derive a display-only one
                # from whichever metric the priority emphasises.
                if 'score' not in top_rec:
                    if priority == "cost":
                        # Lower cost is better
                        score = 10.0 / (float(top_rec.get(cost_key, 1.0)) + 0.001)
                    elif priority == "quality":
                        # Higher quality is better
                        score = float(top_rec.get(quality_key, 0))
                    elif priority == "speed":
                        # Lower speed value is better
                        score = 10.0 - float(top_rec.get(speed_key, 5))
                    else:  # balanced
                        q = float(top_rec.get(quality_key, 5))
                        c = float(top_rec.get(cost_key, 0.001))
                        s = float(top_rec.get(speed_key, 3))
                        score = (q * 0.5 - c * 100.0 - s * 0.3)
                else:
                    score = top_rec['score']

                recommendation_table.add_row(
                    priority,
                    escape(top_rec["model"]),
                    f"${top_rec.get(cost_key, 0.0):.6f}",
                    str(top_rec.get(quality_key, '-')),
                    str(top_rec.get(speed_key, '-')),
                    f"{score:.2f}",
                    other_recs_str
                )

                # Store for later use
                recommendation_results[priority] = rec_result

        except Exception as e:
            logger.error(f"Error calling recommend_model for priority {priority}: {e}", emoji_key="error", exc_info=True)
            recommendation_table.add_row(priority, "-", "-", "-", "-", "-", "[red]Error[/red]")

    console.print(recommendation_table)
    console.print()

    # --- Run with Recommended Model (Example) ---
    # Find the balanced recommendation
    balanced_rec = None
    try:
        # Use stored result if available
        if "balanced" in recommendation_results:
            rec_result = recommendation_results["balanced"]
            if rec_result.get("recommendations"):
                balanced_rec = rec_result["recommendations"][0]["model"]
        else:
            # Otherwise, try to get a fresh recommendation
            raw_result = await mcp.call_tool("recommend_model", {
                **task_info,
                "priority": "balanced"
            })

            # Unpack the result
            rec_result = unpack_tool_result(raw_result)

            if rec_result.get("recommendations"):
                balanced_rec = rec_result["recommendations"][0]["model"]
    except Exception as e:
        # Best effort only: without a recommendation the execution step is skipped.
        logger.error(f"Error getting balanced recommendation: {e}", emoji_key="error")

    if balanced_rec:
        console.print(Rule(f"[cyan]Executing with Recommended Model ({escape(balanced_rec)})[/cyan]"))
        logger.info(f"Running completion with balanced recommendation: {balanced_rec}", emoji_key="processing")

        # Use the new helper function instead of direct API key handling
        await _execute_with_recommended_model(balanced_rec, prompt, estimated_output_tokens, tracker)
    else:
        logger.info("Could not get a balanced recommendation to execute.", emoji_key="info")

    # Display cost summary at the end of the demonstration
    tracker.display_summary(console)


def _get_provider_for_model(model_name: str) -> str:
    """Resolve the provider identifier for a model name.

    Names of the form ``provider/model`` are resolved from the prefix, which
    must match one of the ``Provider`` values. Bare names are resolved from
    their family prefix (``gpt-``, ``claude-``, ...) or from a short list of
    special cases.

    Returns:
        The provider value, or ``None`` (after logging a warning) when the
        provider cannot be determined.
    """
    prefix, separator, _ = model_name.partition('/')
    if separator:
        # Already prefixed: accept the prefix only if it names a known provider.
        if any(prefix == member.value for member in Provider):
            return prefix
        logger.warning(f"Unknown or ambiguous provider prefix in '{model_name}'")
        return None

    # Bare names: map the model family prefix to the Provider member name.
    family_prefixes = (
        ("gpt-", "OPENAI"),
        ("claude-", "ANTHROPIC"),
        ("deepseek-", "DEEPSEEK"),
        ("gemini-", "GEMINI"),
        ("grok-", "GROK"),
    )
    for family_prefix, member_name in family_prefixes:
        if model_name.startswith(family_prefix):
            return getattr(Provider, member_name).value

    # Models whose names do not follow a family prefix.
    if model_name in ("o1-preview", "o3-mini"):
        return Provider.OPENAI.value

    logger.warning(f"Could not determine provider for model '{model_name}'")
    return None


async def main():
    """Run the cost optimization demo and return a process exit code.

    Returns:
        0 when the demo finishes, 1 when it raises.
    """
    cost_tracker = CostTracker()
    try:
        await demonstrate_cost_optimization(cost_tracker)
    except Exception as exc:
        logger.critical(f"Example failed: {str(exc)}", emoji_key="critical")
        return 1
    else:
        return 0


if __name__ == "__main__":
    # Run the demo and use main()'s return value as the process exit status.
    sys.exit(asyncio.run(main()))
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/core/ums_api/ums_database.py:
--------------------------------------------------------------------------------

```python
"""Database utilities for UMS API."""

import sqlite3
import math
from pathlib import Path
from collections import Counter, defaultdict
from typing import Any, Dict, List

# Database path configuration
def get_database_path() -> str:
    """Return the path of the unified agent memory database as a string."""
    # Climb four directory levels from this module to reach the project root.
    location = Path(__file__).resolve()
    for _ in range(4):
        location = location.parent
    return str(location / "storage" / "unified_agent_memory.db")


def get_db_connection() -> sqlite3.Connection:
    """
    Open the unified agent memory database.

    Rows fetched through the returned connection are ``sqlite3.Row``
    objects, so columns can be read by name as well as by index.

    Returns:
        sqlite3.Connection: A new connection; the caller is responsible for
        closing it.
    """
    connection = sqlite3.connect(get_database_path())
    connection.row_factory = sqlite3.Row
    return connection


def execute_query(query: str, params: tuple = None) -> list:
    """
    Run a SELECT statement and return every row as a dictionary.

    Args:
        query: SQL SELECT query to execute
        params: Optional parameters bound to the query's placeholders

    Returns:
        One ``{column_name: value}`` dictionary per result row
    """
    connection = get_db_connection()
    cur = connection.cursor()

    try:
        # Only forward params when some were supplied.
        call_args = (query, params) if params else (query,)
        cur.execute(*call_args)

        column_names = [column[0] for column in cur.description]
        return [
            dict(zip(column_names, record, strict=False))
            for record in cur.fetchall()
        ]
    finally:
        # The connection is per-call, so always release it.
        connection.close()


def execute_update(query: str, params: tuple = None) -> int:
    """
    Run an INSERT, UPDATE, or DELETE statement and commit it.

    Args:
        query: SQL query to execute
        params: Optional parameters bound to the query's placeholders

    Returns:
        The cursor's ``rowcount`` after the statement has been committed
    """
    connection = get_db_connection()
    cur = connection.cursor()

    try:
        # Only forward params when some were supplied.
        call_args = (query, params) if params else (query,)
        cur.execute(*call_args)

        connection.commit()
        affected = cur.rowcount
        return affected
    finally:
        # The connection is per-call, so always release it.
        connection.close()


def ensure_database_exists() -> bool:
    """
    Report whether the database file is present on disk.

    Returns:
        True if a file exists at the configured database path; False if it
        does not or if the check itself raised.
    """
    try:
        found = Path(get_database_path()).exists()
    except Exception:
        found = False
    return found
# ---------- Helper Functions for Data Processing ----------
def _dict_depth(d: Dict[str, Any], depth: int = 0) -> int:
    if not isinstance(d, dict) or not d:
        return depth
    return max(_dict_depth(v, depth + 1) for v in d.values())
def _count_values(d: Dict[str, Any]) -> int:
    cnt = 0
    for v in d.values():
        if isinstance(v, dict):
            cnt += _count_values(v)
        elif isinstance(v, list):
            cnt += len(v)
        else:
            cnt += 1
    return cnt
def calculate_state_complexity(state_data: Dict[str, Any]) -> float:
    """Score the complexity of a state dictionary on a scale capped at 100.

    The score adds 5 per top-level key, 10 per nesting level and 0.5 per
    counted value, and is rounded to two decimals. Empty input scores 0.0.
    """
    if not state_data:
        return 0.0
    key_term = len(state_data) * 5
    depth_term = _dict_depth(state_data) * 10
    value_term = _count_values(state_data) * 0.5
    raw_score = key_term + depth_term + value_term
    return round(min(100.0, raw_score), 2)
def compute_state_diff(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]:
    """Compare the top-level keys of two state dictionaries.

    Returns:
        A dict with ``added`` (keys only in ``b``), ``removed`` (keys only in
        ``a``), ``modified`` (shared keys whose values differ, each with
        ``before`` and ``after``) and ``magnitude``, the percentage of all
        keys that changed (0.0 when both inputs are empty).
    """
    all_keys = set(a) | set(b)
    added = {key: b[key] for key in all_keys if key not in a}
    removed = {key: a[key] for key in all_keys if key not in b}
    modified = {
        key: {"before": a[key], "after": b[key]}
        for key in all_keys
        if key in a and key in b and a[key] != b[key]
    }
    changed = len(added) + len(removed) + len(modified)
    magnitude = (changed / len(all_keys)) * 100 if all_keys else 0.0
    return {
        "added": added,
        "removed": removed,
        "modified": modified,
        "magnitude": magnitude,
    }


# ---------- Timeline Analysis Functions ----------
def generate_timeline_segments(
    timeline_data: List[Dict[str, Any]], granularity: str, hours: int
) -> List[Dict[str, Any]]:
    """Generate timeline segments summarising state counts / complexity over time.

    States are grouped into fixed-width windows that start at the earliest
    timestamp. Every state lands in exactly one window, including the latest
    state and the case where all states share one timestamp. Windows with no
    states are omitted.

    Args:
        timeline_data: States with numeric ``timestamp``, ``complexity_score``,
            ``change_magnitude`` and ``state_type`` entries.
        granularity: ``"second"`` or ``"minute"``; any other value means one
            hour per segment.
        hours: Not used by the computation; kept so existing callers keep
            working.

    Returns:
        Segments ordered by start time, each with ``start_time``, ``end_time``,
        ``state_count``, ``avg_complexity``, ``max_change_magnitude`` and
        ``dominant_type``.
    """
    if not timeline_data:
        return []

    seg_seconds = 1 if granularity == "second" else 60 if granularity == "minute" else 3600
    start_ts = min(item["timestamp"] for item in timeline_data)

    # Bucket each state by window index in a single pass. This avoids
    # rescanning the whole list for every window, including empty ones.
    buckets: Dict[int, List[Dict[str, Any]]] = defaultdict(list)
    for item in timeline_data:
        window_index = int((item["timestamp"] - start_ts) // seg_seconds)
        buckets[window_index].append(item)

    segments: List[Dict[str, Any]] = []
    for window_index in sorted(buckets):
        seg_states = buckets[window_index]
        seg_start = start_ts + window_index * seg_seconds
        segments.append(
            {
                "start_time": seg_start,
                "end_time": seg_start + seg_seconds,
                "state_count": len(seg_states),
                "avg_complexity": sum(s["complexity_score"] for s in seg_states)
                / len(seg_states),
                "max_change_magnitude": max(s["change_magnitude"] for s in seg_states),
                "dominant_type": Counter(
                    s["state_type"] for s in seg_states
                ).most_common(1)[0][0],
            }
        )
    return segments
def calculate_timeline_stats(timeline_data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Aggregate complexity, change and state-type figures for a timeline.

    Change statistics only consider states whose ``change_magnitude`` is
    greater than zero. An empty timeline yields an empty dict.
    """
    if not timeline_data:
        return {}

    complexity_values = [entry["complexity_score"] for entry in timeline_data]
    positive_changes = [
        entry["change_magnitude"]
        for entry in timeline_data
        if entry["change_magnitude"] > 0
    ]
    type_counts = Counter(entry["state_type"] for entry in timeline_data)

    if positive_changes:
        avg_change = sum(positive_changes) / len(positive_changes)
        max_change = max(positive_changes)
    else:
        avg_change = 0
        max_change = 0

    most_common_type = type_counts.most_common(1)[0][0] if type_counts else None

    return {
        "avg_complexity": sum(complexity_values) / len(complexity_values),
        "max_complexity": max(complexity_values),
        "avg_change_magnitude": avg_change,
        "max_change_magnitude": max_change,
        "most_common_type": most_common_type,
        "type_distribution": dict(type_counts),
    }

# ---------- Action Monitoring Helper Functions ----------
def get_action_status_indicator(status: str, execution_time: float) -> dict:
    """Build the display indicator (color, icon, label, urgency) for an action.

    Unknown statuses get a gray "Unknown" indicator. Urgency is only raised
    for actions that are still active: above 60 seconds it is "medium",
    above 120 seconds "high"; otherwise it is "low".
    """
    known_indicators = {
        "running": {"color": "blue", "icon": "play", "label": "Running"},
        "executing": {"color": "blue", "icon": "cpu", "label": "Executing"},
        "in_progress": {"color": "orange", "icon": "clock", "label": "In Progress"},
        "completed": {"color": "green", "icon": "check", "label": "Completed"},
        "failed": {"color": "red", "icon": "x", "label": "Failed"},
        "cancelled": {"color": "gray", "icon": "stop", "label": "Cancelled"},
        "timeout": {"color": "yellow", "icon": "timer-off", "label": "Timeout"},
    }
    fallback = {"color": "gray", "icon": "help", "label": "Unknown"}
    base = known_indicators.get(status, fallback)

    # Only active actions are compared against the time thresholds.
    urgency = "low"
    if status in ("running", "executing", "in_progress"):
        if execution_time > 120:  # 2 minutes
            urgency = "high"
        elif execution_time > 60:  # 1 minute
            urgency = "medium"

    return {**base, "urgency": urgency}
def categorize_action_performance(execution_time: float, estimated_duration: float) -> str:
    """Rate an action by the ratio of its actual to its estimated duration.

    Returns "unknown" when the estimate is not positive; otherwise the first
    category whose upper ratio bound is not exceeded, or "very_slow".
    """
    if estimated_duration <= 0:
        return "unknown"

    ratio = execution_time / estimated_duration

    # Upper ratio bound (inclusive) for each category, in ascending order.
    bounds = (
        (0.5, "excellent"),
        (0.8, "good"),
        (1.2, "acceptable"),
        (2.0, "slow"),
    )
    for upper_bound, category in bounds:
        if ratio <= upper_bound:
            return category
    return "very_slow"
def get_action_resource_usage(action_id: str) -> dict:
    """Return resource usage figures for an action (placeholder implementation).

    No metrics are collected yet: every figure is reported as 0.0 and
    ``action_id`` is not consulted.
    """
    metric_names = ("cpu_usage", "memory_usage", "network_io", "disk_io")
    return dict.fromkeys(metric_names, 0.0)
def estimate_wait_time(position: int, queue: list) -> float:
    """Estimate the wait in seconds for an action at ``position`` in the queue.

    Uses a fixed average of 30 seconds per action ahead; ``queue`` itself is
    not inspected.
    """
    # Fixed average action time; this could be derived from historical data.
    seconds_per_action = 30.0
    return 0.0 if position == 0 else position * seconds_per_action
def get_priority_label(priority: int) -> str:
    """Translate a numeric priority into a label (lower numbers are more urgent)."""
    # Highest priority value (inclusive) covered by each label.
    tiers = (
        (1, "Critical"),
        (3, "High"),
        (5, "Normal"),
        (7, "Low"),
    )
    for ceiling, label in tiers:
        if priority <= ceiling:
            return label
    return "Very Low"
def calculate_action_performance_score(action: dict) -> float:
    """Calculate performance score for a completed action.

    Actions that are not completed score 0.0. Completed actions are scored
    by ``execution_duration`` in seconds: 100 up to 5s, 90 up to 15s, 80 up
    to 30s, 70 up to 60s and 60 up to 120s. Beyond 120s the score keeps
    falling from 60 by one point per 10 seconds, down to a floor of 50. A
    slower action therefore never outscores a faster one.

    Args:
        action: Action record; ``status`` and ``execution_duration`` are read.
            A missing or ``None`` duration is treated as 0.

    Returns:
        A score between 0.0 and 100.0.
    """
    if action.get("status") != "completed":
        return 0.0

    # Durations read from the database may be NULL; treat that as "instant".
    execution_time = action.get("execution_duration") or 0

    # Upper duration bound (inclusive, seconds) and the score it earns.
    tiers = (
        (5, 100.0),
        (15, 90.0),
        (30, 80.0),
        (60, 70.0),
        (120, 60.0),
    )
    for limit, score in tiers:
        if execution_time <= limit:
            return score

    # Continue downwards from the last tier instead of restarting near 88.
    return max(50.0, 60.0 - (execution_time - 120) / 10)
def calculate_efficiency_rating(execution_time: float, result_size: int) -> str:
    """Rate an action by result size produced per unit of execution time.

    Returns "unknown" when the execution time is not positive; otherwise the
    first rating whose minimum throughput is met, or "poor".
    """
    if execution_time <= 0:
        return "unknown"

    throughput = result_size / execution_time

    # Minimum throughput (inclusive) for each rating, best first.
    floors = (
        (100, "excellent"),
        (50, "good"),
        (20, "fair"),
    )
    for minimum, rating in floors:
        if throughput >= minimum:
            return rating
    return "poor"

# ---------- File and Data Utilities ----------
def format_file_size(size_bytes: int) -> str:
    """Format file size in human readable format.

    The size is scaled by powers of 1024 and rounded to two decimals, e.g.
    ``1536`` becomes ``"1.5 KB"``. Sizes beyond the largest unit stay in TB,
    sizes below one byte stay in B, and negative sizes keep their sign.

    Args:
        size_bytes: Size in bytes.

    Returns:
        The formatted size, ``"0 B"`` for zero.
    """
    if size_bytes == 0:
        return "0 B"

    size_names = ["B", "KB", "MB", "GB", "TB"]
    sign = "-" if size_bytes < 0 else ""
    value = float(abs(size_bytes))

    # Repeated division instead of math.log: no rounding surprises at exact
    # powers of 1024, and the unit index cannot leave the table.
    unit_index = 0
    while value >= 1024 and unit_index < len(size_names) - 1:
        value /= 1024
        unit_index += 1

    return f"{sign}{round(value, 2)} {size_names[unit_index]}"

# ---------- Performance Analysis Functions ----------
def calculate_performance_summary(actions: list) -> dict:
    """Summarise performance scores and efficiency ratings of past actions.

    Returns the rounded average score, the best and worst scoring action
    (tool name and score) and a count per efficiency rating. Missing scores
    count as 0 and missing ratings as "unknown".
    """
    if not actions:
        return {
            "avg_score": 0.0,
            "top_performer": None,
            "worst_performer": None,
            "efficiency_distribution": {},
        }

    def _score(entry: dict):
        return entry.get("performance_score", 0)

    def _brief(entry: dict) -> dict:
        return {"tool_name": entry.get("tool_name", ""), "score": _score(entry)}

    all_scores = [_score(entry) for entry in actions]
    mean_score = sum(all_scores) / len(all_scores)

    # On ties, max/min keep the first matching action.
    top = max(actions, key=_score)
    bottom = min(actions, key=_score)

    rating_counts = Counter(
        entry.get("efficiency_rating", "unknown") for entry in actions
    )

    return {
        "avg_score": round(mean_score, 2),
        "top_performer": _brief(top),
        "worst_performer": _brief(bottom),
        "efficiency_distribution": dict(rating_counts),
    }
def generate_performance_insights(
    overall_stats: dict, tool_stats: list, hourly_metrics: list
) -> list:
    """Generate actionable performance insights.

    Up to three insights are produced, in this order:

    * a high-severity warning when the success rate is below 80%,
    * a medium-severity note when the slowest tool averages over 60 seconds,
    * a low-severity note naming the hour with the most actions.

    Args:
        overall_stats: Aggregate counts; ``successful_actions`` and
            ``total_actions`` are read. When ``total_actions`` is 0 or
            ``None`` there is no rate to judge and no warning is emitted.
        tool_stats: Per-tool records with ``tool_name`` and ``avg_duration``.
        hourly_metrics: Per-hour records with ``hour`` and ``action_count``.

    Returns:
        A list of insight dicts with ``type``, ``title``, ``message`` and
        ``severity``.
    """
    insights = []

    # Aggregates over an empty table come back as 0 or None; dividing by
    # them would raise, so the success-rate check is skipped in that case.
    total_actions = overall_stats.get("total_actions", 1)
    successful_actions = overall_stats.get("successful_actions", 0) or 0
    if total_actions:
        success_rate = (successful_actions / total_actions) * 100
        if success_rate < 80:
            insights.append(
                {
                    "type": "warning",
                    "title": "Low Success Rate",
                    "message": f"Current success rate is {success_rate:.1f}%. Consider investigating failing tools.",
                    "severity": "high",
                }
            )

    if tool_stats:
        # A tool without recorded durations counts as 0 seconds.
        slowest_tool = max(tool_stats, key=lambda t: t.get("avg_duration") or 0)
        slowest_duration = slowest_tool.get("avg_duration") or 0
        if slowest_duration > 60:
            insights.append(
                {
                    "type": "info",
                    "title": "Performance Optimization",
                    "message": f"{slowest_tool['tool_name']} is taking {slowest_duration:.1f}s on average. Consider optimization.",
                    "severity": "medium",
                }
            )

    if hourly_metrics:
        peak_hour = max(hourly_metrics, key=lambda h: h.get("action_count") or 0)
        insights.append(
            {
                "type": "info",
                "title": "Peak Usage",
                "message": f"Peak usage occurs at {peak_hour['hour']}:00 with {peak_hour['action_count']} actions.",
                "severity": "low",
            }
        )

    return insights


# ---------- Cognitive Pattern Analysis Functions ----------
def find_cognitive_patterns(
    states: List[Dict[str, Any]], min_length: int, similarity_threshold: float
) -> List[Dict[str, Any]]:
    """Find back-to-back repeated runs among states of the same type.

    States are grouped by ``state_type`` in input order. Within each group,
    every window of ``length`` states (from ``min_length`` up to half the
    group size) is compared with the window that follows it. Pairs whose
    similarity reaches ``similarity_threshold`` are reported.

    Returns:
        Pattern dicts sorted by similarity, highest first.
    """
    # Group states by type, keeping the order in which types first appear.
    grouped: Dict[Any, List[Dict[str, Any]]] = {}
    for entry in states:
        grouped.setdefault(entry["state_type"], []).append(entry)

    found: List[Dict[str, Any]] = []
    for state_type, members in grouped.items():
        member_count = len(members)
        # Two adjacent windows need at least 2 * min_length states.
        if member_count < min_length * 2:
            continue
        for length in range(min_length, member_count // 2 + 1):
            for start in range(member_count - length * 2 + 1):
                middle = start + length
                first_window = members[start:middle]
                second_window = members[middle : middle + length]
                score = calculate_sequence_similarity(first_window, second_window)
                if score >= similarity_threshold:
                    found.append(
                        {
                            "type": f"repeating_{state_type}",
                            "length": length,
                            "similarity": score,
                            "occurrences": 2,
                            "first_occurrence": first_window[0]["timestamp"],
                            "pattern_description": f"Repeating {state_type} sequence of {length} states",
                        }
                    )

    found.sort(key=lambda pattern: pattern["similarity"], reverse=True)
    return found
def calculate_sequence_similarity(
    seq1: List[Dict[str, Any]], seq2: List[Dict[str, Any]]
) -> float:
    """Calculate similarity between two state sequences.

    The result is the mean of the pairwise state similarities.

    Returns:
        0.0 when the sequences differ in length, 1.0 when both are empty
        (two empty sequences are identical), otherwise the mean similarity
        of the states at matching positions.
    """
    if len(seq1) != len(seq2):
        return 0.0
    if not seq1:
        # Avoid dividing by zero; mirrors how two empty states compare.
        return 1.0
    total_similarity = sum(
        calculate_single_state_similarity(s1, s2)
        for s1, s2 in zip(seq1, seq2, strict=False)
    )
    return total_similarity / len(seq1)
def calculate_single_state_similarity(
    state1: Dict[str, Any], state2: Dict[str, Any]
) -> float:
    """Score how alike the ``state_data`` of two states is, from 0.0 to 1.0.

    The score averages two ratios: shared keys over all keys, and shared
    keys with equal values over shared keys. Two empty payloads score 1.0;
    one empty and one non-empty payload score 0.0.
    """
    payload_a = state1.get("state_data", {})
    payload_b = state2.get("state_data", {})

    if not payload_a or not payload_b:
        # Identical only if both are empty.
        return 1.0 if not payload_a and not payload_b else 0.0

    keys_a = set(payload_a.keys())
    keys_b = set(payload_b.keys())
    shared = keys_a & keys_b
    combined = keys_a | keys_b

    key_ratio = len(shared) / len(combined) if combined else 1.0

    value_ratio = 0.0
    if shared:
        equal_count = sum(1 for key in shared if payload_a[key] == payload_b[key])
        value_ratio = equal_count / len(shared)

    return (key_ratio + value_ratio) / 2
def analyze_state_transitions(states: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Analyze transitions between cognitive states"""
    from collections import defaultdict

    transitions = defaultdict(int)
    for i in range(len(states) - 1):
        current_type = states[i]["state_type"]
        next_type = states[i + 1]["state_type"]
        transition = f"{current_type} → {next_type}"
        transitions[transition] += 1
    sorted_transitions = sorted(transitions.items(), key=lambda x: x[1], reverse=True)
    return [
        {
            "transition": transition,
            "count": count,
            "percentage": (count / (len(states) - 1)) * 100 if len(states) > 1 else 0,
        }
        for transition, count in sorted_transitions
    ]
def detect_cognitive_anomalies(states: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Flag states whose complexity deviates strongly from the rest.

    Each state's complexity is compared with the mean using a z-score based
    on the population standard deviation. States with ``|z| > 2`` are
    reported, with severity "high" above 3 and "medium" otherwise. Fewer
    than three states yield no anomalies.
    """
    if len(states) < 3:
        return []

    scores = [calculate_state_complexity(s.get("state_data", {})) for s in states]
    sample_size = len(scores)
    mean_score = sum(scores) / sample_size
    variance = sum((score - mean_score) ** 2 for score in scores) / sample_size
    spread = variance ** 0.5

    flagged = []
    for state, score in zip(states, scores, strict=False):
        # With zero spread every state is identical, so nothing is an outlier.
        z_score = (score - mean_score) / spread if spread > 0 else 0
        if abs(z_score) > 2:
            severity = "high" if abs(z_score) > 3 else "medium"
            flagged.append(
                {
                    "state_id": state["state_id"],
                    "timestamp": state["timestamp"],
                    "anomaly_type": "complexity_outlier",
                    "z_score": z_score,
                    "description": f"Unusual complexity: {score:.1f} (avg: {mean_score:.1f})",
                    "severity": severity,
                }
            )
    return flagged

# ---------- Pattern analysis models ----------

```

--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/single_shot_synthesis.py:
--------------------------------------------------------------------------------

```python
# --- tools/single_shot_synthesis.py (NEW) ---
import asyncio
import json
import random
import re
import time
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional

from ultimate_mcp_server.exceptions import ToolError
from pydantic import ValidationError

from ultimate_mcp_server.core.models.tournament import (  # Reusing models from tournament where appropriate
    SingleShotGeneratorModelConfig,
    SingleShotIndividualResponse,
    SingleShotSynthesisInput,
    SingleShotSynthesisOutput,
)
from ultimate_mcp_server.core.tournaments.utils import extract_thinking
from ultimate_mcp_server.tools.base import with_error_handling, with_tool_metrics
from ultimate_mcp_server.tools.completion import generate_completion
from ultimate_mcp_server.tools.extraction import extract_code_from_response
from ultimate_mcp_server.utils import get_logger

# Module-level logger for the single-shot synthesis tool.
logger = get_logger("ultimate_mcp_server.tools.single_shot_synthesis")

# Root directory for this tool's output: <three levels above this file>/storage/single_shot_synthesis.
STORAGE_DIR_BASE = Path(__file__).resolve().parent.parent.parent / "storage" / "single_shot_synthesis"
# NOTE: runs at import time, so importing this module creates the directory if it is missing.
STORAGE_DIR_BASE.mkdir(parents=True, exist_ok=True)

def _get_single_shot_storage_path(name: str, request_id: str) -> Path:
    """Create and return the storage folder for one synthesis request.

    The folder lives under ``STORAGE_DIR_BASE`` and is named
    ``<timestamp>_<sanitised name>_<first segment of request_id>``.
    """
    stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

    # Drop characters other than word characters, whitespace and hyphens,
    # then collapse separators and cap the length at 50 characters.
    cleaned = re.sub(r'[^\w\s-]', '', name)
    cleaned = cleaned.strip().replace(' ', '_')
    cleaned = re.sub(r'[-\s]+', '-', cleaned)[:50]

    # Only the part of the request id before its first hyphen is used.
    short_id = request_id.partition('-')[0]

    target = STORAGE_DIR_BASE / f"{stamp}_{cleaned}_{short_id}"
    target.mkdir(parents=True, exist_ok=True)
    return target

async def _generate_expert_response(
    prompt: str,
    config: SingleShotGeneratorModelConfig,
    global_retry_config: Optional[Dict] = None # e.g. {"max_retries": 2, "backoff_base": 1.0}
) -> SingleShotIndividualResponse:
    """Call one model with ``prompt``, retrying on failure, and collect the outcome.

    Args:
        prompt: Prompt text forwarded to ``generate_completion``.
        config: Model settings; ``model_id``, ``temperature`` and ``max_tokens`` are read.
        global_retry_config: Optional dict with ``max_retries`` (number of extra attempts
            after the first, default 1) and ``backoff_base`` (seconds, default 1.0).

    Returns:
        A ``SingleShotIndividualResponse`` holding ``response_text`` on success or
        ``error`` when every attempt failed, plus per-call metrics. Exceptions derived
        from ``Exception`` raised during an attempt are logged and recorded in ``error``
        instead of being propagated.
    """
    start_time = time.monotonic()
    response_data = SingleShotIndividualResponse(model_id=config.model_id)

    # Simple retry logic here, could be more sophisticated or rely on generate_completion
    retry_settings = global_retry_config or {}
    max_retries = retry_settings.get("max_retries", 1)
    backoff_base = retry_settings.get("backoff_base", 1.0)

    # Determine the provider from the "<provider>/<model>" prefix, if there is one.
    derived_provider = None
    if '/' in config.model_id:
        provider_prefix = config.model_id.split('/')[0].lower()
        # 'google' is mapped to 'gemini'; every other prefix is used as-is.
        # Add other explicit mappings here if needed in the future.
        derived_provider = "gemini" if provider_prefix == "google" else provider_prefix

    current_max_tokens = config.max_tokens
    if derived_provider == "anthropic" and current_max_tokens is None:
        logger.info(f"Anthropic model {config.model_id} called without max_tokens, defaulting to 2048 for timeout calculation.")
        current_max_tokens = 2048 # Default for Anthropic if not specified to prevent TypeError

    for attempt in range(max_retries + 1):
        try:
            completion_result = await generate_completion(
                prompt=prompt,
                model=config.model_id,
                provider=derived_provider, # Use the determined/mapped provider
                temperature=config.temperature,
                max_tokens=current_max_tokens, # Use the potentially defaulted max_tokens
                # TODO: Add seed if SingleShotGeneratorModelConfig includes it
            )

            # Tolerate keys that are present but None so that bookkeeping alone can never
            # raise and turn an otherwise successful completion into a retry.
            token_usage = completion_result.get("tokens") or {}
            response_data.metrics["cost"] = completion_result.get("cost") or 0.0
            response_data.metrics["input_tokens"] = token_usage.get("input")
            response_data.metrics["output_tokens"] = token_usage.get("output")
            response_data.metrics["api_latency_ms"] = int((completion_result.get("processing_time") or 0) * 1000)
            response_data.metrics["api_model_id_used"] = completion_result.get("model", config.model_id)

            if completion_result.get("success"):
                response_data.response_text = completion_result.get("text")
                # Clear any error left over from an earlier failed attempt; otherwise a
                # successful retry would still look like a failure to callers that
                # check `response_text and not error`.
                response_data.error = None
                break # Success
            else:
                response_data.error = completion_result.get("error", f"Generation failed on attempt {attempt+1}")
                if attempt == max_retries: # Last attempt
                    logger.error(f"Expert {config.model_id} failed after {max_retries+1} attempts: {response_data.error}")

        except Exception as e:
            logger.error(f"Exception during expert call {config.model_id} (attempt {attempt+1}): {e}", exc_info=True)
            response_data.error = str(e)
            if attempt == max_retries:
                logger.error(f"Expert {config.model_id} failed with exception after {max_retries+1} attempts.")

        # Only reached after a failed attempt (success breaks out of the loop above).
        if attempt < max_retries:
            # Jittered exponential backoff, capped at 15 seconds.
            sleep_duration = random.uniform(backoff_base, backoff_base * 1.5) * (2 ** attempt)
            sleep_duration = min(sleep_duration, 15.0) # Max 15s sleep
            logger.info(f"Expert {config.model_id} attempt {attempt+1} failed. Retrying in {sleep_duration:.2f}s...")
            await asyncio.sleep(sleep_duration)

    response_data.metrics["total_task_time_ms"] = int((time.monotonic() - start_time) * 1000)
    return response_data


@with_tool_metrics # This decorator will add its own overall metrics
@with_error_handling
async def single_shot_synthesis(
    name: str,
    prompt: str,
    expert_models: List[Dict[str, Any]], # CHANGED from expert_model_configs
    synthesizer_model: Dict[str, Any],   # CHANGED from synthesizer_model_config
    tournament_type: Literal["code", "text"] = "text",
    synthesis_instructions: Optional[str] = None
) -> Dict[str, Any]:
    """
    Performs a single-shot, multi-model synthesis:
    1. Sends the prompt to multiple "expert" LLMs in parallel.
    2. Collects their responses.
    3. Feeds the original prompt and all expert responses to a "synthesizer" LLM.
    4. The synthesizer LLM produces a final, fused response.
    Useful for quick brainstorming and merging diverse perspectives.

    Args:
        name: A descriptive name for this synthesis task (e.g., "Product Description Brainstorm").
        prompt: The initial challenge prompt or question for all expert models.
        expert_models: A list of configurations for the "expert" models. Each config is a dict:
            - model_id (str, required): e.g., 'openai/gpt-3.5-turbo'.
            - temperature (float, optional): Model-specific temperature.
            - max_tokens (int, optional): Model-specific max tokens.
        synthesizer_model: Configuration for the "synthesizer" model. Dict fields:
            - model_id (str, required, default 'anthropic/claude-3-7-sonnet-20250219'): e.g., 'google/gemini-1.5-pro-latest'.
            - temperature (float, optional): Synthesizer-specific temperature.
            - max_tokens (int, optional): Synthesizer-specific max tokens.
            - system_prompt (str, optional): System prompt for the synthesizer.
        tournament_type: 'code' or 'text'. Influences synthesis instructions and output processing (default 'text').
        synthesis_instructions: Custom instructions for the synthesizer. If None, default instructions are used.

    Returns:
        A dictionary containing the request_id, status, individual expert responses,
        the synthesized response, metrics, and storage path for artifacts.
    """
    task_start_time = time.monotonic()
    request_id = str(uuid.uuid4())
    # The artifact directory is created up front, i.e. before the inputs are validated.
    storage_path = _get_single_shot_storage_path(name, request_id)

    output_data = SingleShotSynthesisOutput(
        request_id=request_id,
        name=name,
        status="FAILED", # Default to FAILED, update on success
        expert_responses=[],
        storage_path=str(storage_path)
    )

    try:
        # Validate inputs using Pydantic model with aliases
        validated_input = SingleShotSynthesisInput(
            name=name,
            prompt=prompt,
            expert_models=expert_models, # Pass using alias
            synthesizer_model=synthesizer_model, # Pass using alias
            tournament_type=tournament_type,
            synthesis_instructions=synthesis_instructions
        )
        parsed_expert_configs = validated_input.expert_model_configs
        parsed_synthesizer_config = validated_input.synthesizer_model_config
        retry_cfg_experts = {"max_retries": 1, "backoff_base": 1.0}

    except ValidationError as e:
        output_data.error_message = f"Input validation error: {e.errors()}"
        output_data.total_metrics["total_task_time_ms"] = int((time.monotonic() - task_start_time) * 1000)
        logger.error(f"SingleShotSynthesis input validation error for '{name}': {e.json(indent=2)}")
        raise ToolError(f"Invalid input for single_shot_synthesis: {e.errors()}", status_code=400) from e


    # 1. Parallel fan-out to expert models
    logger.info(f"[{request_id}] Starting expert model responses for '{name}'. Count: {len(parsed_expert_configs)}")
    expert_tasks = [
        _generate_expert_response(prompt, config, retry_cfg_experts) for config in parsed_expert_configs
    ]
    output_data.expert_responses = await asyncio.gather(*expert_tasks, return_exceptions=False) # Exceptions handled in _generate_expert_response

    # Persist expert responses (one markdown file per expert, failures included)
    for i, resp in enumerate(output_data.expert_responses):
        expert_file_name = f"expert_{i+1}_{re.sub(r'[^a-zA-Z0-9_-]', '_', resp.model_id)}.md"
        expert_file_path = storage_path / expert_file_name
        # `or 0` mirrors the token handling below: a cost that is present but None must
        # not crash the float formatting.
        expert_cost = resp.metrics.get('cost') or 0
        content = f"# Expert Response: {resp.model_id}\n\n"
        content += f"## Metrics\nCost: ${expert_cost:.6f}\nLatency: {resp.metrics.get('api_latency_ms','N/A')}ms\n\n"
        if resp.error:
            content += f"## Error\n```\n{resp.error}\n```\n"
        if resp.response_text:
            content += f"## Response Text\n```\n{resp.response_text}\n```\n"
        expert_file_path.write_text(content, encoding='utf-8')

    # 2. Aggregation prompt builder
    agg_prompt_parts = [f"You are an advanced AI tasked with synthesizing a definitive answer based on multiple expert inputs.\n\nOriginal Problem/Prompt:\n---\n{prompt}\n---\n"]
    agg_prompt_parts.append("Below are responses from several expert models. Review them critically:\n")

    for i, resp in enumerate(output_data.expert_responses):
        agg_prompt_parts.append(f"-- Response from Expert Model {i+1} ({resp.model_id}) --")
        if resp.response_text and not resp.error:
            agg_prompt_parts.append(resp.response_text)
        elif resp.error:
            agg_prompt_parts.append(f"[This model encountered an error: {resp.error}]")
        else:
            agg_prompt_parts.append("[This model provided no content.]")
        agg_prompt_parts.append("-------------------------------------\n")

    # Synthesis Instructions
    if synthesis_instructions:
        agg_prompt_parts.append(f"\nSynthesis Instructions:\n{synthesis_instructions}\n")
    else: # Default instructions
        default_instr = "Your Task:\n1. Identify unique insights, complementary information, and key arguments from each expert response.\n"
        default_instr += "2. Resolve any contradictions or discrepancies, prioritizing verifiable facts and logical consistency.\n"
        default_instr += "3. Produce a single, coherent, and comprehensive response that is strictly superior to any individual expert response.\n"
        if tournament_type == "code":
            default_instr += "4. If the task involves code, provide ONLY the complete, final code block (e.g., ```python ... ```). Do not include explanations outside of code comments.\n"
        else: # Text
            default_instr += "4. You MAY optionally begin your output with a brief <thinking>...</thinking> block explaining your synthesis strategy, then provide the final synthesized text.\n"
        default_instr += "\n### Final Synthesized Answer:\n"
        agg_prompt_parts.append(default_instr)

    aggregation_prompt = "\n".join(agg_prompt_parts)
    (storage_path / "synthesis_prompt.md").write_text(aggregation_prompt, encoding='utf-8')


    # 3. Fan-in call to synthesizer_id
    logger.info(f"[{request_id}] Requesting synthesis from {parsed_synthesizer_config.model_id} for '{name}'.")
    synthesizer_success = False
    try:
        # Simple retry for synthesizer
        retry_cfg_synth = {"max_retries": 1, "backoff_base": 2.0}
        # NOTE(review): only model_id, temperature and max_tokens are forwarded here; the
        # documented synthesizer `system_prompt` field is not passed along.
        synth_response_raw = await _generate_expert_response( # Reuse for basic call structure
            aggregation_prompt,
            SingleShotGeneratorModelConfig( # Adapt to expected input
                model_id=parsed_synthesizer_config.model_id,
                temperature=parsed_synthesizer_config.temperature,
                max_tokens=parsed_synthesizer_config.max_tokens
            ),
            retry_cfg_synth
        )
        output_data.synthesizer_metrics = synth_response_raw.metrics

        if synth_response_raw.response_text and not synth_response_raw.error:
            output_data.synthesized_response_text = synth_response_raw.response_text
            output_data.synthesizer_thinking_process = await extract_thinking(output_data.synthesized_response_text)
            if output_data.synthesizer_thinking_process and output_data.synthesized_response_text:
                # Remove thinking block from main response if present
                output_data.synthesized_response_text = output_data.synthesized_response_text.replace(f"<thinking>{output_data.synthesizer_thinking_process}</thinking>", "").strip()

            if tournament_type == "code":
                # Extraction model can be fixed or configurable for this tool
                extraction_model_id = "anthropic/claude-3-5-haiku-20241022" # Example default
                code_extraction_result = await extract_code_from_response(
                    response_text=output_data.synthesized_response_text,
                    language_hint="python", # Assuming Python for now
                    extraction_model_id=extraction_model_id
                )
                if code_extraction_result.get("success"):
                    output_data.synthesized_extracted_code = code_extraction_result.get("code_block")
                else: # Log if extraction failed but don't mark whole thing as failure
                    logger.warning(f"[{request_id}] Code extraction from synthesizer output failed: {code_extraction_result.get('error')}")
            synthesizer_success = True
        else:
            output_data.error_message = f"Synthesizer ({parsed_synthesizer_config.model_id}) failed: {synth_response_raw.error or 'No response text'}"
            logger.error(f"[{request_id}] {output_data.error_message}")

    except Exception as e:
        output_data.error_message = f"Exception during synthesis call: {str(e)}"
        logger.error(f"[{request_id}] {output_data.error_message}", exc_info=True)

    # 4. Persist synthesized artifact (written even when the synthesizer failed)
    synth_cost = output_data.synthesizer_metrics.get('cost') or 0  # None-safe, see expert cost above
    synth_content = f"# Synthesized Response\n\n**Synthesizer Model:** {parsed_synthesizer_config.model_id}\n"
    synth_content += f"## Metrics\nCost: ${synth_cost:.6f}\nLatency: {output_data.synthesizer_metrics.get('api_latency_ms','N/A')}ms\n\n"
    if output_data.synthesizer_thinking_process:
        synth_content += f"## Thinking Process\n```\n{output_data.synthesizer_thinking_process}\n```\n"
    if tournament_type == "code" and output_data.synthesized_extracted_code:
        synth_content += f"## Extracted Code\n```python\n{output_data.synthesized_extracted_code}\n```\n"
    elif output_data.synthesized_response_text: # Fallback to full text if code not extracted or text type
        synth_content += f"## Response Text\n```\n{output_data.synthesized_response_text}\n```\n"
    if output_data.error_message and not synthesizer_success : # If synthesizer specifically failed
        synth_content += f"\n## Synthesizer Error\n```\n{output_data.error_message}\n```"

    (storage_path / "synthesized_response.md").write_text(synth_content, encoding='utf-8')
    if tournament_type == "code" and output_data.synthesized_extracted_code:
        (storage_path / "synthesized_code.py").write_text(output_data.synthesized_extracted_code, encoding='utf-8')


    # 5. Finalize output
    if synthesizer_success:
        output_data.status = "SUCCESS"
    elif any(er.response_text for er in output_data.expert_responses):
        # The synthesizer failed, but at least one expert produced text.
        output_data.status = "PARTIAL_SUCCESS"
    else:
        output_data.status = "FAILED"
    if not synthesizer_success and not output_data.error_message: # General failure if no specific error
        output_data.error_message = "Synthesis process encountered an issue."

    # Aggregate total metrics; every counter treats a missing or None value as zero.
    total_cost_agg = output_data.synthesizer_metrics.get("cost") or 0.0
    total_input_tokens_agg = output_data.synthesizer_metrics.get("input_tokens", 0) or 0
    total_output_tokens_agg = output_data.synthesizer_metrics.get("output_tokens", 0) or 0

    for resp in output_data.expert_responses:
        total_cost_agg += resp.metrics.get("cost") or 0.0
        total_input_tokens_agg += resp.metrics.get("input_tokens", 0) or 0
        total_output_tokens_agg += resp.metrics.get("output_tokens", 0) or 0

    output_data.total_metrics = {
        "total_cost": total_cost_agg,
        "total_input_tokens": total_input_tokens_agg,
        "total_output_tokens": total_output_tokens_agg,
        "overall_task_time_ms": int((time.monotonic() - task_start_time) * 1000)
    }

    # Save overall metrics
    (storage_path / "overall_metrics.json").write_text(json.dumps(output_data.total_metrics, indent=2), encoding='utf-8')

    logger.info(f"[{request_id}] Single-shot synthesis '{name}' completed. Status: {output_data.status}. Cost: ${total_cost_agg:.4f}")
    return output_data.model_dump()

```

--------------------------------------------------------------------------------
/examples/prompt_templates_demo.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
"""Prompt templates and repository demonstration for Ultimate MCP Server."""
import asyncio
import sys
import time
from pathlib import Path

# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))

from rich import box
from rich.markup import escape
from rich.panel import Panel
from rich.rule import Rule
from rich.syntax import Syntax
from rich.table import Table

from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.server import Gateway
from ultimate_mcp_server.services.prompts import PromptTemplate, get_prompt_repository
from ultimate_mcp_server.utils import get_logger
from ultimate_mcp_server.utils.display import CostTracker, display_text_content_result

# --- Add Rich Imports ---
from ultimate_mcp_server.utils.logging.console import console

# ----------------------

# Module-level logger shared by every demonstration function in this script.
logger = get_logger("example.prompt_templates")


async def demonstrate_prompt_templates():
    """Demonstrate prompt template creation and rendering.

    Builds a simple and a more complex template, renders each with sample
    variables, prints the results to the console, then attempts a render
    with some variables left out. Returns both template objects as a tuple
    ``(simple, complex)``.
    """

    def source_panel(source_text):
        # Panel showing a template's source with jinja2 highlighting.
        return Panel(
            Syntax(source_text, "jinja2", theme="default", line_numbers=False),
            title="[bold]Template Source[/bold]",
            border_style="dim blue",
            expand=False
        )

    def new_variables_table():
        # Empty two-column (key / value) table for listing render variables.
        table = Table(title="[bold]Variables[/bold]", box=box.MINIMAL, show_header=False)
        table.add_column("Key", style="magenta")
        table.add_column("Value", style="white")
        return table

    def rendered_panel(rendered_text, title, border):
        # Panel showing rendered output, escaped so it is not parsed as Rich markup.
        return Panel(
            escape(rendered_text.strip()),
            title=title,
            border_style=border,
            expand=False
        )

    console.print(Rule("[bold blue]Prompt Template Demonstration[/bold blue]"))
    logger.info("Starting prompt template demonstration", emoji_key="start")

    # ---- Simple template -------------------------------------------------
    simple_source = """
You are an expert in {{field}}. 
Please explain {{concept}} in simple terms that a {{audience}} could understand.
"""

    simple_template = PromptTemplate(
        template=simple_source,
        template_id="simple_explanation",
        description="A template for generating simple explanations of concepts"
    )

    logger.info(
        f"Created prompt template: {simple_template.template_id}",
        emoji_key="template"
    )

    simple_vars = {
        "field": "artificial intelligence",
        "concept": "neural networks",
        "audience": "high school student"
    }

    simple_rendered = simple_template.render(simple_vars)

    logger.info(
        "Template rendered successfully",
        emoji_key="success",
        variables=list(simple_vars)
    )

    console.print(Rule("[cyan]Simple Template Rendering[/cyan]"))
    console.print(source_panel(simple_template.template))
    simple_table = new_variables_table()
    for var_name, var_value in simple_vars.items():
        simple_table.add_row(escape(var_name), escape(var_value))
    console.print(simple_table)
    console.print(rendered_panel(simple_rendered, "[bold green]Rendered Prompt[/bold green]", "green"))
    console.print()

    # ---- Complex template with conditionals and a loop ---------------------
    complex_source = """
{% if system_message %}
{{system_message}}
{% else %}
You are a helpful assistant that provides accurate information.
{% endif %}

{% if context %}
Here is some context to help you answer:
{{context}}
{% endif %}

USER: {{query}}

Please respond with:
{% for item in response_items %}
- {{item}}
{% endfor %}
"""

    complex_template = PromptTemplate(
        template=complex_source,
        template_id="complex_assistant",
        description="A complex assistant template with conditionals and loops",
        required_vars=["system_message", "query", "response_items", "context"]
    )

    complex_vars = {
        "system_message": "You are an expert in climate science who explains concepts clearly and objectively.",
        "query": "What are the main causes of climate change?",
        "context": """
Recent data shows that global temperatures have risen by about 1.1°C since pre-industrial times.
The IPCC Sixth Assessment Report (2021) states that human activities are unequivocally the main driver
of climate change, primarily through greenhouse gas emissions. CO2 levels have increased by 48% since 
the industrial revolution, reaching levels not seen in at least 800,000 years.
""",
        "response_items": [
            "A summary of the main causes based on scientific consensus",
            "The role of greenhouse gases (CO2, methane, etc.) in climate change",
            "Human activities that contribute most significantly to emissions",
            "Natural vs anthropogenic factors and their relative impact",
            "Regional variations in climate change impacts"
        ]
    }

    complex_rendered = complex_template.render(complex_vars)

    logger.info(
        "Complex template rendered successfully",
        emoji_key="success",
        template_id=complex_template.template_id
    )

    console.print(Rule("[cyan]Complex Template Rendering[/cyan]"))
    console.print(source_panel(complex_template.template))
    complex_table = new_variables_table()
    for var_name, var_value in complex_vars.items():
        shown = escape(str(var_value))
        if var_name == 'context' and len(shown) > 150:
            # Long context is cut to 150 characters for display.
            shown = shown[:150] + '...'
        elif isinstance(var_value, list):
            # Lists are cut to 100 characters of their string form before escaping.
            as_text = str(var_value)
            if len(as_text) > 100:
                as_text = as_text[:100] + '...'
            shown = escape(as_text)
        complex_table.add_row(escape(var_name), shown)
    console.print(complex_table)
    console.print(rendered_panel(complex_rendered, "[bold green]Rendered Prompt[/bold green]", "green"))
    console.print()

    # ---- Rendering with some variables omitted ------------------------------
    console.print(Rule("[cyan]Template with Missing Variables[/cyan]"))
    # system_message and context are intentionally left out.
    partial_vars = {
        "query": "How can individuals reduce their carbon footprint?",
        "response_items": [
            "Daily lifestyle changes with significant impact",
            "Transportation choices and alternatives",
            "Home energy consumption reduction strategies"
        ]
    }

    try:
        partial_rendered = complex_template.render(partial_vars)
        logger.info(
            "Template rendered with missing optional variables (using defaults)",
            emoji_key="info",
            missing=["system_message", "context"]
        )
        console.print(rendered_panel(partial_rendered, "[bold yellow]Rendered with Defaults[/bold yellow]", "yellow"))
    except Exception as e: # Any rendering failure is reported instead of aborting the demo
        logger.warning(f"Could not render with missing variables: {str(e)}", emoji_key="warning")
        console.print(Panel(
            f"[red]Error rendering template:[/red]\n{escape(str(e))}",
            title="[bold red]Rendering Error[/bold red]",
            border_style="red",
            expand=False
        ))
    console.print()

    return simple_template, complex_template


async def demonstrate_prompt_repository():
    """Demonstrate saving and retrieving templates from repository.

    Saves a translation template under the id "translation_prompt", reads it
    back, and lists the repository contents before and after.

    Returns:
        The ``PromptTemplate`` rebuilt from the repository, or ``None`` when
        either saving or retrieving the template failed.
    """
    # Use Rich Rule
    console.print(Rule("[bold blue]Prompt Repository Demonstration[/bold blue]"))
    logger.info("Starting prompt repository demonstration", emoji_key="start")
    
    # Get repository
    repo = get_prompt_repository()
    
    # Check repository path
    logger.info(f"Prompt repository path: {repo.base_dir}", emoji_key="info")
    
    # List existing prompts (if any)
    prompts = await repo.list_prompts()
    if prompts:
        logger.info(f"Found {len(prompts)} existing prompts: {', '.join(prompts)}", emoji_key="info")
    else:
        logger.info("No existing prompts found in repository", emoji_key="info")
    
    # Create a new prompt template for saving
    translation_template = """
Translate the following {{source_language}} text into {{target_language}}:

TEXT: {{text}}

The translation should be:
- Accurate and faithful to the original
- Natural in the target language
- Preserve the tone and style of the original

TRANSLATION:
"""
    
    template = PromptTemplate(
        template=translation_template,
        template_id="translation_prompt",
        description="A template for translation tasks",
        metadata={
            "author": "Ultimate MCP Server",
            "version": "1.0",
            "supported_languages": ["English", "Spanish", "French", "German", "Japanese"]
        }
    )
    
    # Save to repository
    template_dict = template.to_dict()
    
    logger.info(
        f"Saving template '{template.template_id}' to repository",
        emoji_key="save",
        metadata=template.metadata
    )
    
    save_result = await repo.save_prompt(template.template_id, template_dict)
    
    if save_result:
        logger.success(
            f"Template '{template.template_id}' saved successfully",
            emoji_key="success"
        )
    else:
        logger.error(
            f"Failed to save template '{template.template_id}'",
            emoji_key="error"
        )
        return
    
    # Retrieve the saved template
    logger.info(f"Retrieving template '{template.template_id}' from repository", emoji_key="loading")
    
    retrieved_dict = await repo.get_prompt(template.template_id)

    # Bound up front so the final `return` is valid even when retrieval fails;
    # previously that path raised UnboundLocalError.
    retrieved_template = None
    
    if retrieved_dict:
        # Convert back to PromptTemplate object
        retrieved_template = PromptTemplate.from_dict(retrieved_dict)
        
        logger.success(
            f"Retrieved template '{retrieved_template.template_id}' successfully",
            emoji_key="success",
            metadata=retrieved_template.metadata
        )
        
        # Display retrieved template details using Rich
        retrieved_table = Table(title=f"[bold]Retrieved Template: {escape(retrieved_template.template_id)}[/bold]", box=box.ROUNDED, show_header=False)
        retrieved_table.add_column("Attribute", style="cyan")
        retrieved_table.add_column("Value", style="white")
        retrieved_table.add_row("Description", escape(retrieved_template.description))
        retrieved_table.add_row("Metadata", escape(str(retrieved_template.metadata)))
        console.print(retrieved_table)
        console.print(Panel(
            Syntax(retrieved_template.template, "jinja2", theme="default", line_numbers=False),
            title="[bold]Template Source[/bold]",
            border_style="dim blue",
            expand=False
        ))
        console.print()
        
    else:
        logger.error(
            f"Failed to retrieve template '{template.template_id}'",
            emoji_key="error"
        )
    
    # List prompts again to confirm addition
    updated_prompts = await repo.list_prompts()
    logger.info(
        f"Repository now contains {len(updated_prompts)} prompts: {', '.join(updated_prompts)}",
        emoji_key="info"
    )
    
    # The template is deliberately NOT deleted here so that the LLM demo can
    # load it from the repository afterwards. Deleting it would look like:
    #
    #     delete_result = await repo.delete_prompt(template.template_id)
    #     if delete_result:
    #         logger.info(
    #             f"Deleted template '{template.template_id}' from repository",
    #             emoji_key="cleaning"
    #         )

    return retrieved_template


async def demonstrate_llm_with_templates(tracker: CostTracker):
    """Demonstrate using a template from the repository with an LLM.

    Loads the "translation_prompt" template, renders it, and sends the result
    to the first available provider from a fixed preference list. When no
    provider is available, or the completion fails, a mock translation is
    shown instead.

    Args:
        tracker: Cost tracker that records the completion result (via
            ``add_call``) and prints a summary at the end of this section.
    """
    # Use Rich Rule
    console.print(Rule("[bold blue]LLM with Template Demonstration[/bold blue]"))
    logger.info("Starting LLM with template demonstration", emoji_key="start")

    # Retrieve the translation template saved earlier
    repo = get_prompt_repository()
    template_id = "translation_prompt"
    template_dict = await repo.get_prompt(template_id)
    
    if not template_dict:
        console.print(f"Prompt '{template_id}' not found")
        logger.error(f"Template '{template_id}' not found. Skipping LLM demo.", emoji_key="error")
        return
        
    template = PromptTemplate.from_dict(template_dict)
    logger.info(f"Retrieved template '{template_id}' for LLM use", emoji_key="template")

    # Variables for translation
    translation_vars = {
        "source_language": "English",
        "target_language": "French",
        "text": "The quick brown fox jumps over the lazy dog."
    }
    
    # Render the prompt
    try:
        rendered_prompt = template.render(translation_vars)
        logger.info("Translation prompt rendered", emoji_key="success")
        
        # Display the rendered prompt for clarity
        console.print(Panel(
            escape(rendered_prompt.strip()),
            title="[bold]Rendered Translation Prompt[/bold]",
            border_style="blue",
            expand=False
        ))
        
    except Exception as e:
        logger.error(f"Error rendering translation prompt: {str(e)}", emoji_key="error", exc_info=True)
        return
    
    # Initialize gateway with providers
    gateway = Gateway("prompt-templates-demo", register_tools=False)
    logger.info("Initializing providers...", emoji_key="provider")
    await gateway._initialize_providers()
    
    # Providers to try in order of preference
    providers_to_try = [
        Provider.OPENAI.value,
        Provider.ANTHROPIC.value,
        Provider.GEMINI.value,
        Provider.DEEPSEEK.value
    ]
    
    # Find an available provider
    provider = None
    provider_name = None
    
    for p_name in providers_to_try:
        if p_name in gateway.providers:
            provider = gateway.providers[p_name]
            provider_name = p_name
            logger.info(f"Using provider {p_name}", emoji_key="provider")
            break

    def show_mock_translation():
        # Fallback output used whenever a real translation cannot be produced.
        console.print(Panel(
            "[yellow]Failed to generate real translation. Here's a mock response:[/yellow]\n" +
            "Le renard brun rapide saute par-dessus le chien paresseux.",
            title="[bold yellow]Mock Translation (After Error)[/bold yellow]",
            border_style="yellow"
        ))

    if provider is None:
        # Without this guard, `provider.get_default_model()` would raise AttributeError
        # on None and be reported below as an LLM completion error with a traceback.
        logger.error(
            f"None of the preferred providers {providers_to_try} is available. Skipping LLM call.",
            emoji_key="error"
        )
        show_mock_translation()
    else:
        try:
            model = provider.get_default_model()
            logger.info(f"Using provider {provider_name} with model {model}", emoji_key="provider")
            
            # Generate completion using the rendered prompt
            logger.info("Generating translation...", emoji_key="processing")
            start_time = time.time()
            result = await provider.generate_completion(
                prompt=rendered_prompt,
                model=model,
                temperature=0.5,
                max_tokens=150
            )
            processing_time = time.time() - start_time
            
            logger.success("Translation generated successfully!", emoji_key="success")

            # Use display.py function for better visualization
            display_text_content_result(
                f"Translation Result ({escape(provider_name)}/{escape(model)})",
                result,
                console_instance=console
            )
            
            # Track cost
            tracker.add_call(result)

            # Display additional stats with standard rich components
            stats_table = Table(title="Translation Stats", show_header=False, box=box.ROUNDED)
            stats_table.add_column("Metric", style="cyan")
            stats_table.add_column("Value", style="white")
            stats_table.add_row("Provider", provider_name)
            stats_table.add_row("Model", model)
            stats_table.add_row("Input Tokens", str(result.input_tokens))
            stats_table.add_row("Output Tokens", str(result.output_tokens))
            stats_table.add_row("Cost", f"${result.cost:.6f}")
            stats_table.add_row("Processing Time", f"{processing_time:.3f}s")
            console.print(stats_table)
            
        except Exception as e:
            # Best-effort demo: report the failure and fall back to the mock response.
            logger.error(f"Error during LLM completion: {str(e)}", emoji_key="error", exc_info=True)
            show_mock_translation()

    # Display cost summary at the end of this demo section
    tracker.display_summary(console)


async def main():
    """Run the prompt-template demonstrations in order, then clean up.

    Returns:
        ``1`` if any demonstration step raises, otherwise ``0``. A failure
        while deleting the demonstration template is only logged as a
        warning and does not change the returned code.
    """
    try:
        # Step 1: template creation and rendering. The call returns two
        # values; neither is needed again in this function.
        _template_a, _template_b = await demonstrate_prompt_templates()
        console.print()  # Blank spacer line between sections

        # Step 2: repository usage. The return value is not needed here.
        await demonstrate_prompt_repository()
        console.print()

        # Step 3: use a template with an LLM, tracking cost for this section.
        cost_tracker = CostTracker()
        await demonstrate_llm_with_templates(cost_tracker)

    except Exception as exc:
        logger.critical(f"Demo failed: {exc!s}", emoji_key="critical", exc_info=True)
        return 1

    # Best-effort cleanup: remove the template the demo stored. This is only
    # reached when all demo steps succeeded.
    try:
        await get_prompt_repository().delete_prompt("translation_prompt")
        logger.info("Deleted demonstration template", emoji_key="cleaning")
    except Exception as exc:
        logger.warning(f"Cleanup error: {exc!s}", emoji_key="warning")

    logger.success("Prompt Template Demo Finished Successfully!", emoji_key="complete")
    return 0


if __name__ == "__main__":
    # Use main()'s integer result as the process exit status.
    sys.exit(asyncio.run(main()))
```

--------------------------------------------------------------------------------
/tests/unit/test_providers.py:
--------------------------------------------------------------------------------

```python
"""Tests for the provider implementations."""
from typing import Any, Dict

import pytest
from pytest import MonkeyPatch

from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.providers.anthropic import AnthropicProvider
from ultimate_mcp_server.core.providers.base import (
    BaseProvider,
    ModelResponse,
    get_provider,
)
from ultimate_mcp_server.core.providers.deepseek import DeepSeekProvider
from ultimate_mcp_server.core.providers.gemini import GeminiProvider
from ultimate_mcp_server.core.providers.openai import OpenAIProvider
from ultimate_mcp_server.utils import get_logger

# Module-level logger used by the tests in this file.
logger = get_logger("test.providers")

# Apply the asyncio marker to all tests in this module with
# loop_scope="function" - function scope is recommended for isolated test execution.
pytestmark = pytest.mark.asyncio(loop_scope="function")


class TestBaseProvider:
    """Checks for ``BaseProvider``, ``ModelResponse`` and ``get_provider``."""

    def test_init(self):
        """A concrete subclass keeps its API key and extra keyword options."""
        logger.info("Testing base provider initialization", emoji_key="test")

        # Minimal concrete subclass; only construction is exercised below.
        class TestProvider(BaseProvider):
            provider_name = "test"

            async def initialize(self):
                return True

            async def generate_completion(self, prompt, **kwargs):
                return ModelResponse(
                    text="Test response",
                    model="test-model",
                    provider=self.provider_name,
                )

            async def generate_completion_stream(self, prompt, **kwargs):
                yield "Test", {}

            def get_default_model(self):
                return "test-model"

        stub = TestProvider(api_key="test-key", test_option="value")

        assert stub.api_key == "test-key"
        # Keyword arguments other than api_key are expected under ``options``.
        assert stub.options == dict(test_option="value")
        assert stub.provider_name == "test"

    async def test_process_with_timer(self, mock_provider: BaseProvider):
        """``process_with_timer`` returns the wrapped result plus a float duration.

        ``mock_provider`` is a fixture defined outside this file.
        """
        logger.info("Testing process_with_timer", emoji_key="test")

        # Async callable taking one positional and one keyword argument.
        async def mock_func(arg1, arg2=None):
            suffix = str(arg2 or "")
            return {"result": arg1 + suffix}

        payload, elapsed = await mock_provider.process_with_timer(
            mock_func, "test", arg2="arg"
        )

        assert payload == {"result": "testarg"}
        assert isinstance(elapsed, float)
        assert elapsed >= 0  # Elapsed time can never be negative

    def test_model_response(self):
        """``ModelResponse`` defaults, token totals, cost and ``to_dict`` output."""
        logger.info("Testing ModelResponse", emoji_key="test")

        # Case 1: only the required fields are supplied.
        bare = ModelResponse(text="Test response", model="test-model", provider="test")

        assert bare.text == "Test response"
        assert bare.model == "test-model"
        assert bare.provider == "test"
        for counter_name in ("input_tokens", "output_tokens", "total_tokens"):
            assert getattr(bare, counter_name) == 0
        assert bare.cost == 0.0  # Zero tokens means zero cost

        # Case 2: explicit token usage on a model expected to have pricing data.
        with_usage = ModelResponse(
            text="Test response with tokens",
            model="gpt-4o",
            provider="openai",
            input_tokens=100,
            output_tokens=50,
        )

        assert with_usage.input_tokens == 100
        assert with_usage.output_tokens == 50
        assert with_usage.total_tokens == 150
        assert with_usage.cost > 0.0  # A cost should have been computed

        # Case 3: dictionary conversion keeps the fields and nests usage.
        as_dict = with_usage.to_dict()
        assert as_dict["text"] == "Test response with tokens"
        assert as_dict["model"] == "gpt-4o"
        assert as_dict["provider"] == "openai"
        usage_section = as_dict["usage"]
        assert usage_section["input_tokens"] == 100
        assert usage_section["output_tokens"] == 50
        assert usage_section["total_tokens"] == 150
        assert "cost" in as_dict

    def test_get_provider_factory(self, mock_env_vars):
        """``get_provider`` maps names to provider classes and rejects unknown names.

        ``mock_env_vars`` is a fixture defined outside this file; it is
        requested for its side effects and never read here.
        """
        logger.info("Testing get_provider factory", emoji_key="test")

        # Each known provider name should yield an instance of its class.
        known_providers = (
            (Provider.OPENAI, OpenAIProvider),
            (Provider.ANTHROPIC, AnthropicProvider),
        )
        for member, expected_cls in known_providers:
            built = get_provider(member.value)
            assert isinstance(built, expected_cls)
            assert built.provider_name == member.value

        # An unknown provider name must raise ValueError.
        with pytest.raises(ValueError):
            get_provider("invalid-provider")

        # An explicitly passed API key ends up on the returned provider.
        keyed = get_provider(Provider.OPENAI.value, api_key="custom-key")
        assert keyed.api_key == "custom-key"


class TestOpenAIProvider:
    """OpenAI provider tests that run against hand-written client stand-ins."""

    @pytest.fixture
    def mock_openai_responses(self) -> Dict[str, Any]:
        """Return canned API objects under the keys ``"completion"`` and ``"models"``.

        The objects expose their data as attributes rather than dictionary
        entries.
        """
        # Leaf objects come first; each container instantiates what it holds.
        class MockUsage:
            def __init__(self):
                self.prompt_tokens, self.completion_tokens, self.total_tokens = 10, 5, 15

        class MockMessage:
            def __init__(self):
                self.content = "Mock OpenAI response"

        class MockChoice:
            def __init__(self):
                self.message = MockMessage()
                self.finish_reason = "stop"

        class MockCompletion:
            def __init__(self):
                self.id = "mock-completion-id"
                self.choices = [MockChoice()]
                self.usage = MockUsage()

        class MockModelsResponse:
            def __init__(self):
                # Three entries; "gpt-4.1-mini" is listed twice.
                listed_ids = ("gpt-4o", "gpt-4.1-mini", "gpt-4.1-mini")
                self.data = [
                    type("MockModel", (), {"id": model_id, "owned_by": "openai"})
                    for model_id in listed_ids
                ]

        canned: Dict[str, Any] = {}
        canned["completion"] = MockCompletion()
        canned["models"] = MockModelsResponse()
        return canned

    @pytest.fixture
    def mock_openai_provider(self, monkeypatch: MonkeyPatch, mock_openai_responses: Dict[str, Any]) -> OpenAIProvider:
        """Return an ``OpenAIProvider`` whose ``client`` is a fake async client.

        The fake's ``chat.completions.create`` and ``models.list`` coroutines
        return the canned objects from ``mock_openai_responses``.
        """
        # The provider is constructed first, before openai.AsyncOpenAI is patched.
        openai_provider = OpenAIProvider(api_key="mock-openai-key")

        class MockCompletions:
            async def create(self, **kwargs):
                return mock_openai_responses["completion"]

        class MockModels:
            async def list(self):
                return mock_openai_responses["models"]

        class MockChat:
            def __init__(self):
                self.completions = MockCompletions()

        class MockAsyncOpenAI:
            def __init__(self, **kwargs):
                self.kwargs = kwargs
                self.chat = MockChat()
                self.models = MockModels()

        # Replace the SDK client class for the duration of the test.
        monkeypatch.setattr("openai.AsyncOpenAI", MockAsyncOpenAI)

        # Attach a fake client instance directly to the provider.
        openai_provider.client = MockAsyncOpenAI(api_key="mock-openai-key")
        return openai_provider

    async def test_initialization(self, mock_openai_provider: OpenAIProvider):
        """``initialize`` reports success and leaves a client on the provider."""
        logger.info("Testing OpenAI provider initialization", emoji_key="test")

        initialized = await mock_openai_provider.initialize()

        assert initialized
        assert mock_openai_provider.client is not None

    async def test_completion(self, mock_openai_provider: OpenAIProvider):
        """``generate_completion`` maps the canned completion onto a ``ModelResponse``."""
        logger.info("Testing OpenAI completion", emoji_key="test")

        request = {"prompt": "Test prompt", "model": "gpt-4o", "temperature": 0.7}
        response = await mock_openai_provider.generate_completion(**request)

        assert isinstance(response, ModelResponse)
        # Text, model and provider identity
        assert response.text == "Mock OpenAI response"
        assert response.model == "gpt-4o"
        assert response.provider == Provider.OPENAI.value
        # Token accounting taken from the canned usage object
        assert response.input_tokens == 10
        assert response.output_tokens == 5
        assert response.total_tokens == 15

    async def test_list_models(self, mock_openai_provider: OpenAIProvider):
        """``list_models`` returns a non-empty list of dicts tagged with the provider."""
        logger.info("Testing OpenAI list_models", emoji_key="test")

        # The provider is initialized before models are listed.
        await mock_openai_provider.initialize()

        listed = await mock_openai_provider.list_models()

        assert isinstance(listed, list)
        assert len(listed) >= 1

        # Every entry needs an id and must name this provider.
        for entry in listed:
            assert "id" in entry
            assert "provider" in entry
            assert entry["provider"] == Provider.OPENAI.value

    def test_default_model(self, mock_openai_provider: OpenAIProvider):
        """``get_default_model`` returns a string model name."""
        logger.info("Testing OpenAI default_model", emoji_key="test")

        default_model = mock_openai_provider.get_default_model()

        assert default_model is not None
        assert isinstance(default_model, str)


class TestAnthropicProvider:
    """Tests for the Anthropic provider, run against an in-memory fake client."""

    @pytest.fixture
    def mock_anthropic_responses(self) -> Dict[str, Any]:
        """Build the canned Anthropic message returned by the fake client.

        Returns:
            A dict with a single ``"message"`` entry. Its ``content`` is a
            list holding one object with a ``text`` attribute, and its
            ``usage`` exposes ``input_tokens`` (20) and ``output_tokens`` (10).
        """
        class MockMessage:
            def __init__(self):
                # Content should be an array of objects with text property
                self.content = [type("ContentBlock", (), {"text": "Mock Claude response"})]
                self.usage = type("Usage", (), {"input_tokens": 20, "output_tokens": 10})

        return {
            "message": MockMessage()
        }

    @pytest.fixture
    def mock_anthropic_provider(self, monkeypatch: MonkeyPatch, mock_anthropic_responses: Dict[str, Any]) -> AnthropicProvider:
        """Return an ``AnthropicProvider`` whose ``client`` is a fake async client.

        The fake exposes ``messages.create`` (a coroutine returning the canned
        message) and ``messages.stream`` (a plain method returning an async
        context manager that yields one ``content_block_delta`` chunk).
        """
        # Create the provider
        provider = AnthropicProvider(api_key="mock-anthropic-key")

        # Mock the AsyncAnthropic client methods
        class MockAsyncAnthropic:
            def __init__(self, **kwargs):
                self.kwargs = kwargs
                self.messages = MockMessages()

        class MockMessages:
            async def create(self, **kwargs):
                return mock_anthropic_responses["message"]

            def stream(self, **kwargs):
                # Deliberately NOT ``async def``: in the Anthropic SDK,
                # ``messages.stream(...)`` is a plain method that returns an
                # async context manager, used as
                # ``async with client.messages.stream(...) as stream:``.
                # A coroutine function here would hand ``async with`` an
                # un-awaited coroutine, which has no ``__aenter__``.
                class MockStream:
                    async def __aenter__(self):
                        return self

                    async def __aexit__(self, exc_type, exc_val, exc_tb):
                        # Returns None, so exceptions raised inside the
                        # ``async with`` body are not suppressed.
                        pass

                    async def __aiter__(self):
                        # Async generator: yields a single text-delta chunk.
                        yield type("MockChunk", (), {
                            "type": "content_block_delta",
                            "delta": type("MockDelta", (), {"text": "Mock streaming content"})
                        })

                    async def get_final_message(self):
                        return mock_anthropic_responses["message"]

                return MockStream()

        # Patch the AsyncAnthropic client
        monkeypatch.setattr("anthropic.AsyncAnthropic", MockAsyncAnthropic)

        # Initialize the provider with the mock client
        provider.client = MockAsyncAnthropic(api_key="mock-anthropic-key")

        return provider

    async def test_initialization(self, mock_anthropic_provider: AnthropicProvider):
        """``initialize`` reports success and leaves a client on the provider."""
        logger.info("Testing Anthropic provider initialization", emoji_key="test")

        # Initialize
        success = await mock_anthropic_provider.initialize()
        assert success
        assert mock_anthropic_provider.client is not None

    async def test_completion(self, mock_anthropic_provider: AnthropicProvider):
        """``generate_completion`` maps the canned message onto a ``ModelResponse``."""
        logger.info("Testing Anthropic completion", emoji_key="test")

        # Generate completion
        result = await mock_anthropic_provider.generate_completion(
            prompt="Test prompt",
            model="claude-3-sonnet-20240229",
            temperature=0.7
        )

        # Check result: text and identity fields, then token accounting
        assert isinstance(result, ModelResponse)
        assert result.text == "Mock Claude response"
        assert result.model == "claude-3-sonnet-20240229"
        assert result.provider == Provider.ANTHROPIC.value
        assert result.input_tokens == 20
        assert result.output_tokens == 10
        assert result.total_tokens == 30  # 20 input + 10 output

    async def test_list_models(self, mock_anthropic_provider: AnthropicProvider):
        """``list_models`` returns a non-empty list of dicts tagged with the provider."""
        logger.info("Testing Anthropic list_models", emoji_key="test")

        # Initialize first
        await mock_anthropic_provider.initialize()

        # List models
        models = await mock_anthropic_provider.list_models()

        # Should return a list of models
        assert isinstance(models, list)
        assert len(models) > 0

        # Check model format
        for model in models:
            assert "id" in model
            assert "provider" in model
            assert model["provider"] == Provider.ANTHROPIC.value

    def test_default_model(self, mock_anthropic_provider: AnthropicProvider):
        """``get_default_model`` returns a string model name."""
        logger.info("Testing Anthropic default_model", emoji_key="test")

        # Should return a default model
        model = mock_anthropic_provider.get_default_model()
        assert model is not None
        assert isinstance(model, str)


# Brief tests for the other providers to save space
class TestOtherProviders:
    """Short smoke tests for the DeepSeek and Gemini providers."""

    async def test_deepseek_provider(self, monkeypatch: MonkeyPatch):
        """The DeepSeek provider reports its name and a string default model."""
        logger.info("Testing DeepSeek provider", emoji_key="test")

        # Stand-in for ``chat.completions.create``: builds a response type
        # carrying one choice and a usage summary.
        def fake_create(**kwargs):
            message_cls = type("MockMessage", (), {"content": "Mock DeepSeek response"})
            choice_cls = type("MockChoice", (), {
                "message": message_cls,
                "finish_reason": "stop",
            })
            usage_cls = type("MockUsage", (), {
                "prompt_tokens": 15,
                "completion_tokens": 8,
                "total_tokens": 23,
            })
            return type("MockResponse", (), {"choices": [choice_cls], "usage": usage_cls})

        # Stand-in for the client constructor: accepts any keyword arguments
        # and returns a type exposing ``chat.completions.create``.
        def fake_async_openai(**kwargs):
            completions_cls = type("MockCompletions", (), {"create": fake_create})
            chat_cls = type("MockChat", (), {"completions": completions_cls})
            return type("MockClient", (), {"chat": chat_cls})

        monkeypatch.setattr("openai.AsyncOpenAI", fake_async_openai)

        deepseek = DeepSeekProvider(api_key="mock-deepseek-key")
        assert deepseek.provider_name == Provider.DEEPSEEK.value

        # A default model name must be available as a string.
        default_model = deepseek.get_default_model()
        assert default_model is not None
        assert isinstance(default_model, str)

    async def test_gemini_provider(self, monkeypatch: MonkeyPatch):
        """The Gemini provider initializes and completes against a fake client."""
        logger.info("Testing Gemini provider", emoji_key="test")

        # Canned response: top-level ``text`` plus a single candidate.
        candidate_cls = type("MockCandidate", (), {
            "content": {
                "parts": [{"text": "Mock Gemini response"}]
            }
        })
        mock_response = type("MockResponse", (), {
            "text": "Mock Gemini response",
            "candidates": [candidate_cls],
        })

        # Fake ``models`` namespace of the client.
        class MockModels:
            def generate_content(self, **kwargs):
                return mock_response

            def list(self):
                return [{"name": name} for name in ("gemini-2.0-flash-lite", "gemini-2.0-pro")]

        # Fake replacement for the Google Generative AI client class.
        class MockClient:
            def __init__(self, **kwargs):
                self.kwargs = kwargs
                self.models = MockModels()

        monkeypatch.setattr("google.genai.Client", MockClient)

        # Build the provider only after the client class has been patched.
        gemini = GeminiProvider(api_key="mock-gemini-key")
        assert gemini.provider_name == Provider.GEMINI.value

        await gemini.initialize()

        # A default model name must be available as a string.
        default_model = gemini.get_default_model()
        assert default_model is not None
        assert isinstance(default_model, str)

        # Run one completion through the fake client.
        completion = await gemini.generate_completion(
            prompt="Test prompt",
            model="gemini-2.0-pro",
        )

        assert completion.text is not None
        assert "Gemini" in completion.text  # Expected text: "Mock Gemini response"
```
Page 4/35 | First | Prev | Next | Last