# dicklesworthstone/llm_gateway_mcp_server
This is page 3 of 35. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?page={x} to view the full context.

# Directory Structure

```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│   ├── __init__.py
│   ├── advanced_agent_flows_using_unified_memory_system_demo.py
│   ├── advanced_extraction_demo.py
│   ├── advanced_unified_memory_system_demo.py
│   ├── advanced_vector_search_demo.py
│   ├── analytics_reporting_demo.py
│   ├── audio_transcription_demo.py
│   ├── basic_completion_demo.py
│   ├── cache_demo.py
│   ├── claude_integration_demo.py
│   ├── compare_synthesize_demo.py
│   ├── cost_optimization.py
│   ├── data
│   │   ├── sample_event.txt
│   │   ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│   │   └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│   ├── docstring_refiner_demo.py
│   ├── document_conversion_and_processing_demo.py
│   ├── entity_relation_graph_demo.py
│   ├── filesystem_operations_demo.py
│   ├── grok_integration_demo.py
│   ├── local_text_tools_demo.py
│   ├── marqo_fused_search_demo.py
│   ├── measure_model_speeds.py
│   ├── meta_api_demo.py
│   ├── multi_provider_demo.py
│   ├── ollama_integration_demo.py
│   ├── prompt_templates_demo.py
│   ├── python_sandbox_demo.py
│   ├── rag_example.py
│   ├── research_workflow_demo.py
│   ├── sample
│   │   ├── article.txt
│   │   ├── backprop_paper.pdf
│   │   ├── buffett.pdf
│   │   ├── contract_link.txt
│   │   ├── legal_contract.txt
│   │   ├── medical_case.txt
│   │   ├── northwind.db
│   │   ├── research_paper.txt
│   │   ├── sample_data.json
│   │   └── text_classification_samples
│   │       ├── email_classification.txt
│   │       ├── news_samples.txt
│   │       ├── product_reviews.txt
│   │       └── support_tickets.txt
│   ├── sample_docs
│   │   └── downloaded
│   │       └── attention_is_all_you_need.pdf
│   ├── sentiment_analysis_demo.py
│   ├── simple_completion_demo.py
│   ├── single_shot_synthesis_demo.py
│   ├── smart_browser_demo.py
│   ├── sql_database_demo.py
│   ├── sse_client_demo.py
│   ├── test_code_extraction.py
│   ├── test_content_detection.py
│   ├── test_ollama.py
│   ├── text_classification_demo.py
│   ├── text_redline_demo.py
│   ├── tool_composition_examples.py
│   ├── tournament_code_demo.py
│   ├── tournament_text_demo.py
│   ├── unified_memory_system_demo.py
│   ├── vector_search_demo.py
│   ├── web_automation_instruction_packs.py
│   └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│   └── smart_browser_internal
│       ├── locator_cache.db
│       ├── readability.js
│       └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── integration
│   │   ├── __init__.py
│   │   └── test_server.py
│   ├── manual
│   │   ├── test_extraction_advanced.py
│   │   └── test_extraction.py
│   └── unit
│       ├── __init__.py
│       ├── test_cache.py
│       ├── test_providers.py
│       └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│   ├── __init__.py
│   ├── __main__.py
│   ├── cli
│   │   ├── __init__.py
│   │   ├── __main__.py
│   │   ├── commands.py
│   │   ├── helpers.py
│   │   └── typer_cli.py
│   ├── clients
│   │   ├── __init__.py
│   │   ├── completion_client.py
│   │   └── rag_client.py
│   ├── config
│   │   └── examples
│   │       └── filesystem_config.yaml
│   ├── config.py
│   ├── constants.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── evaluation
│   │   │   ├── base.py
│   │   │   └── evaluators.py
│   │   ├── providers
│   │   │   ├── __init__.py
│   │   │   ├── anthropic.py
│   │   │   ├── base.py
│   │   │   ├── deepseek.py
│   │   │   ├── gemini.py
│   │   │   ├── grok.py
│   │   │   ├── ollama.py
│   │   │   ├── openai.py
│   │   │   └── openrouter.py
│   │   ├── server.py
│   │   ├── state_store.py
│   │   ├── tournaments
│   │   │   ├── manager.py
│   │   │   ├── tasks.py
│   │   │   └── utils.py
│   │   └── ums_api
│   │       ├── __init__.py
│   │       ├── ums_database.py
│   │       ├── ums_endpoints.py
│   │       ├── ums_models.py
│   │       └── ums_services.py
│   ├── exceptions.py
│   ├── graceful_shutdown.py
│   ├── services
│   │   ├── __init__.py
│   │   ├── analytics
│   │   │   ├── __init__.py
│   │   │   ├── metrics.py
│   │   │   └── reporting.py
│   │   ├── cache
│   │   │   ├── __init__.py
│   │   │   ├── cache_service.py
│   │   │   ├── persistence.py
│   │   │   ├── strategies.py
│   │   │   └── utils.py
│   │   ├── cache.py
│   │   ├── document.py
│   │   ├── knowledge_base
│   │   │   ├── __init__.py
│   │   │   ├── feedback.py
│   │   │   ├── manager.py
│   │   │   ├── rag_engine.py
│   │   │   ├── retriever.py
│   │   │   └── utils.py
│   │   ├── prompts
│   │   │   ├── __init__.py
│   │   │   ├── repository.py
│   │   │   └── templates.py
│   │   ├── prompts.py
│   │   └── vector
│   │       ├── __init__.py
│   │       ├── embeddings.py
│   │       └── vector_service.py
│   ├── tool_token_counter.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── audio_transcription.py
│   │   ├── base.py
│   │   ├── completion.py
│   │   ├── docstring_refiner.py
│   │   ├── document_conversion_and_processing.py
│   │   ├── enhanced-ums-lookbook.html
│   │   ├── entity_relation_graph.py
│   │   ├── excel_spreadsheet_automation.py
│   │   ├── extraction.py
│   │   ├── filesystem.py
│   │   ├── html_to_markdown.py
│   │   ├── local_text_tools.py
│   │   ├── marqo_fused_search.py
│   │   ├── meta_api_tool.py
│   │   ├── ocr_tools.py
│   │   ├── optimization.py
│   │   ├── provider.py
│   │   ├── pyodide_boot_template.html
│   │   ├── python_sandbox.py
│   │   ├── rag.py
│   │   ├── redline-compiled.css
│   │   ├── sentiment_analysis.py
│   │   ├── single_shot_synthesis.py
│   │   ├── smart_browser.py
│   │   ├── sql_databases.py
│   │   ├── text_classification.py
│   │   ├── text_redline_tools.py
│   │   ├── tournament.py
│   │   ├── ums_explorer.html
│   │   └── unified_memory_system.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── async_utils.py
│   │   ├── display.py
│   │   ├── logging
│   │   │   ├── __init__.py
│   │   │   ├── console.py
│   │   │   ├── emojis.py
│   │   │   ├── formatter.py
│   │   │   ├── logger.py
│   │   │   ├── panels.py
│   │   │   ├── progress.py
│   │   │   └── themes.py
│   │   ├── parse_yaml.py
│   │   ├── parsing.py
│   │   ├── security.py
│   │   └── text.py
│   └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/test_sse_client.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
SSE Test Client for Ultimate MCP Server
Tests server functionality over SSE (Server-Sent Events) transport
"""

import asyncio
import json

from fastmcp import Client


async def test_sse_server():
    """Test Ultimate MCP Server over SSE transport."""
    # SSE endpoint - note the /sse path for SSE transport
    server_url = "http://127.0.0.1:8013/sse"
    
    print("🔥 Ultimate MCP Server SSE Test Client")
    print("=" * 50)
    print(f"🔗 Connecting to Ultimate MCP Server SSE endpoint at {server_url}")
    
    try:
        async with Client(server_url) as client:
            print("✅ Successfully connected to SSE server")
            
            # Test 1: List available tools
            print("\n📋 Testing tool discovery...")
            tools = await client.list_tools()
            print(f"Found {len(tools)} tools via SSE transport:")
            for i, tool in enumerate(tools[:10]):  # Show first 10
                print(f"  {i+1:2d}. {tool.name}")
            if len(tools) > 10:
                print(f"  ... and {len(tools) - 10} more tools")
            
            # Test 2: List available resources
            print("\n📚 Testing resource discovery...")
            resources = await client.list_resources()
            print(f"Found {len(resources)} resources:")
            for resource in resources:
                print(f"  - {resource.uri}")
            
            # Test 3: Echo tool test
            print("\n🔊 Testing echo tool over SSE...")
            echo_result = await client.call_tool("echo", {"message": "Hello from SSE client!"})
            if echo_result:
                echo_data = json.loads(echo_result[0].text)
                print(f"✅ Echo response: {json.dumps(echo_data, indent=2)}")
            
            # Test 4: Provider status test
            print("\n🔌 Testing provider status over SSE...")
            try:
                provider_result = await client.call_tool("get_provider_status", {})
                if provider_result:
                    provider_data = json.loads(provider_result[0].text)
                    providers = provider_data.get("providers", {})
                    print(f"✅ Found {len(providers)} providers via SSE:")
                    for name, status in providers.items():
                        available = "✅" if status.get("available") else "❌"
                        model_count = len(status.get("models", []))
                        print(f"  {available} {name}: {model_count} models")
            except Exception as e:
                print(f"❌ Provider status failed: {e}")
            
            # Test 5: Resource reading test
            print("\n📖 Testing resource reading over SSE...")
            if resources:
                try:
                    resource_uri = resources[0].uri
                    resource_content = await client.read_resource(resource_uri)
                    if resource_content:
                        content = resource_content[0].text
                        preview = content[:200] + "..." if len(content) > 200 else content
                        print(f"✅ Resource {resource_uri} content preview:")
                        print(f"  {preview}")
                except Exception as e:
                    print(f"❌ Resource reading failed: {e}")
            
            # Test 6: Simple completion test (if providers available)
            print("\n🤖 Testing completion over SSE...")
            try:
                completion_result = await client.call_tool(
                    "generate_completion",
                    {
                        "prompt": "Say hello in exactly 3 words",
                        "provider": "ollama",
                        "model": "mix_77/gemma3-qat-tools:27b",
                        "max_tokens": 10,
                    },
                )
                if completion_result:
                    result_data = json.loads(completion_result[0].text)
                    print("✅ Completion via SSE:")
                    print(f"  Text: '{result_data.get('text', 'No text')}'")
                    print(f"  Model: {result_data.get('model', 'Unknown')}")
                    print(f"  Success: {result_data.get('success', False)}")
                    print(f"  Processing time: {result_data.get('processing_time', 0):.2f}s")
            except Exception as e:
                print(f"⚠️ Completion test failed (expected if no providers): {e}")
            
            # Test 7: Filesystem tool test
            print("\n📁 Testing filesystem tools over SSE...")
            try:
                dirs_result = await client.call_tool("list_allowed_directories", {})
                if dirs_result:
                    dirs_data = json.loads(dirs_result[0].text)
                    print(f"✅ Allowed directories via SSE: {dirs_data.get('count', 0)} directories")
            except Exception as e:
                print(f"❌ Filesystem test failed: {e}")
            
            # Test 8: Text processing tool test
            print("\n📝 Testing text processing over SSE...")
            try:
                ripgrep_result = await client.call_tool(
                    "run_ripgrep", 
                    {
                        "args_str": "'async' . -t py --max-count 5",
                        "input_dir": "."
                    }
                )
                if ripgrep_result:
                    ripgrep_data = json.loads(ripgrep_result[0].text)
                    if ripgrep_data.get("success"):
                        lines = ripgrep_data.get("output", "").split('\n')
                        line_count = len([ln for ln in lines if ln.strip()])
                        print(f"✅ Ripgrep via SSE found {line_count} matching lines")
                    else:
                        print("⚠️ Ripgrep completed but found no matches")
            except Exception as e:
                print(f"❌ Text processing test failed: {e}")
            
            print("\n🎉 SSE transport functionality test completed!")
            return True
            
    except Exception as e:
        print(f"❌ SSE connection failed: {e}")
        print("\nTroubleshooting:")
        print("1. Make sure the server is running in SSE mode:")
        print("   umcp run -t sse")
        print("2. Verify the server is accessible at http://127.0.0.1:8013")
        print("3. Check that the SSE endpoint is available at /sse")
        return False


async def test_sse_interactive():
    """Interactive SSE testing mode."""
    server_url = "http://127.0.0.1:8013/sse"
    
    print("\n🎮 Entering SSE interactive mode...")
    print("Type 'list' to see available tools, 'quit' to exit")
    
    try:
        async with Client(server_url) as client:
            tools = await client.list_tools()
            resources = await client.list_resources()
            
            while True:
                try:
                    command = input("\nSSE> ").strip()
                    
                    if command.lower() in ['quit', 'exit', 'q']:
                        print("👋 Goodbye!")
                        break
                    elif command.lower() == 'list':
                        print("Available tools:")
                        for i, tool in enumerate(tools[:20]):
                            print(f"  {i+1:2d}. {tool.name}")
                        if len(tools) > 20:
                            print(f"  ... and {len(tools) - 20} more")
                    elif command.lower() == 'resources':
                        print("Available resources:")
                        for resource in resources:
                            print(f"  - {resource.uri}")
                    elif command.startswith("tool "):
                        # Call tool: tool <tool_name> <json_params>
                        parts = command[5:].split(' ', 1)
                        tool_name = parts[0]
                        params = json.loads(parts[1]) if len(parts) > 1 else {}
                        
                        try:
                            result = await client.call_tool(tool_name, params)
                            if result:
                                print(f"✅ Tool result: {result[0].text}")
                            else:
                                print("❌ No result returned")
                        except Exception as e:
                            print(f"❌ Tool call failed: {e}")
                    elif command.startswith("read "):
                        # Read resource: read <resource_uri>
                        resource_uri = command[5:].strip()
                        try:
                            result = await client.read_resource(resource_uri)
                            if result:
                                content = result[0].text
                                preview = content[:500] + "..." if len(content) > 500 else content
                                print(f"✅ Resource content: {preview}")
                            else:
                                print("❌ No content returned")
                        except Exception as e:
                            print(f"❌ Resource read failed: {e}")
                    else:
                        print("Commands:")
                        print("  list          - List available tools")
                        print("  resources     - List available resources")
                        print("  tool <name> <params>  - Call a tool with JSON params")
                        print("  read <uri>    - Read a resource")
                        print("  quit          - Exit interactive mode")
                
                except KeyboardInterrupt:
                    print("\n👋 Goodbye!")
                    break
                except Exception as e:
                    print(f"❌ Command error: {e}")
    
    except Exception as e:
        print(f"❌ SSE interactive mode failed: {e}")


async def main():
    """Main test function."""
    # Run basic functionality test
    success = await test_sse_server()
    
    if success:
        # Ask if user wants interactive mode
        try:
            response = input("\nWould you like to enter SSE interactive mode? (y/n): ").strip().lower()
            if response in ['y', 'yes']:
                await test_sse_interactive()
        except KeyboardInterrupt:
            print("\n👋 Goodbye!")


if __name__ == "__main__":
    asyncio.run(main()) 
```
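
A stripped-down variant can be handy for quick one-off checks against the same endpoint. The sketch below is not part of the repository: it reuses the `fastmcp` `Client` API, the default `http://127.0.0.1:8013/sse` endpoint, and the `echo` tool exactly as exercised in `test_sse_client.py` above.

```python
#!/usr/bin/env python3
"""Minimal sketch: call a single tool over SSE, reusing the client API shown above."""

import asyncio
import json

from fastmcp import Client

SERVER_URL = "http://127.0.0.1:8013/sse"  # same default SSE endpoint as test_sse_client.py


async def call_echo_once() -> None:
    # Connect, call the echo tool once, and print the parsed JSON payload.
    async with Client(SERVER_URL) as client:
        result = await client.call_tool("echo", {"message": "ping"})
        if result:
            print(json.dumps(json.loads(result[0].text), indent=2))


if __name__ == "__main__":
    asyncio.run(call_echo_once())
```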

--------------------------------------------------------------------------------
/ultimate_mcp_server/utils/logging/__init__.py:
--------------------------------------------------------------------------------

```python
"""
Gateway Logging Package.

This package provides enhanced logging capabilities with rich formatting,
progress tracking, and console output for the Gateway system.
"""

import logging
import logging.handlers
from typing import Any, Dict, List, Optional

# Import Rich-based console
# Adjusted imports to be relative within the new structure
from .console import (
    console,
    create_progress,
    live_display,
    print_json,
    print_panel,
    print_syntax,
    print_table,
    print_tree,
    status,
)

# Import emojis
from .emojis import (
    COMPLETED,
    CRITICAL,
    DEBUG,
    ERROR,
    FAILED,
    INFO,
    RUNNING,
    SUCCESS,
    WARNING,
    get_emoji,
)

# Import formatters and handlers
from .formatter import (
    DetailedLogFormatter,
    GatewayLogRecord,
    RichLoggingHandler,
    SimpleLogFormatter,
    create_rich_console_handler,  # Added missing import used in server.py LOGGING_CONFIG
)

# Import logger and related utilities
from .logger import (
    Logger,
    critical,
    debug,
    error,
    info,
    section,
    success,
    warning,
)

# Import panels
from .panels import (
    CodePanel,
    ErrorPanel,
    HeaderPanel,
    InfoPanel,
    ResultPanel,
    ToolOutputPanel,
    WarningPanel,
    display_code,
    display_error,
    display_header,
    display_info,
    display_results,
    display_tool_output,
    display_warning,
)

# Import progress tracking
from .progress import (
    GatewayProgress,
    track,
)

# Create a global logger instance for importing
logger = Logger("ultimate")

# Removed configure_root_logger, initialize_logging, set_log_level functions
# Logging is now configured via dictConfig in main.py (or server.py equivalent)

def get_logger(name: str) -> Logger:
    """
    Get or create a specialized Logger instance for a specific component.
    
    This function provides access to the enhanced logging system of the Ultimate MCP Server,
    returning a Logger instance that includes rich formatting, emoji support, and other
    advanced features beyond Python's standard logging.
    
    The returned Logger is configured with the project's logging settings and integrates
    with the rich console output system. It provides methods like success() and section()
    in addition to standard logging methods.
    
    Args:
        name: The logger name, typically the module or component name.
             Can use dot notation for hierarchy (e.g., "module.submodule").
    
    Returns:
        An enhanced Logger instance with rich formatting and emoji support
    
    Example:
        ```python
        # In a module file
        from ultimate_mcp_server.utils.logging import get_logger
        
        # Create logger with the module name
        logger = get_logger(__name__)
        
        # Use the enhanced logging methods
        logger.info("Server starting")                  # Basic info log
        logger.success("Operation completed")           # Success log (not in std logging)
        logger.warning("Resource low", resource="RAM")  # With additional context
        logger.error("Failed to connect", emoji_key="network")  # With custom emoji
        ```
    """
    # Use the new base name for sub-loggers if needed, or keep original logic
    # return Logger(f"ultimate_mcp_server.{name}") # Option 1: Prefix with base name
    return Logger(name) # Option 2: Keep original name logic

def capture_logs(level: Optional[str] = None) -> "LogCapture":
    """
    Create a context manager to capture logs for testing or debugging.
    
    This function is a convenience wrapper around the LogCapture class, creating
    and returning a context manager that will capture logs at or above the specified
    level during its active scope.
    
    Use this function when you need to verify that certain log messages are emitted
    during tests, or when you want to collect logs for analysis without modifying
    the application's logging configuration.
    
    Args:
        level: Minimum log level to capture (e.g., "INFO", "WARNING", "ERROR").
               If None, all log levels are captured. Default: None
    
    Returns:
        A LogCapture context manager that will collect logs when active
    
    Example:
        ```python
        # Test that a function produces expected log messages
        def test_login_function():
            with capture_logs("WARNING") as logs:
                # Call function that should produce a warning log for invalid login
                result = login("invalid_user", "wrong_password")
                
                # Assert that the expected warning was logged
                assert logs.contains("Invalid login attempt")
                assert len(logs.get_logs()) == 1
        ```
    """
    return LogCapture(level)

# Log capturing for testing
class LogCapture:
    """
    Context manager for capturing and analyzing logs during execution.
    
    This class provides a way to intercept, store, and analyze logs emitted during
    a specific block of code execution. It's primarily useful for:
    
    - Testing: Verify that specific log messages were emitted during tests
    - Debugging: Collect logs for examination without changing logging configuration
    - Analysis: Gather statistics about logging patterns
    
    The LogCapture acts as a context manager, capturing logs only within its scope
    and providing methods to retrieve and analyze the captured logs after execution.
    
    Each captured log entry is stored as a dictionary with details including the
    message, level, timestamp, and source file/line information.
    
    Example usage:
        ```python
        # Capture all logs
        with LogCapture() as capture:
            # Code that generates logs
            perform_operation()
            
            # Check for specific log messages
            assert capture.contains("Database connected")
            assert not capture.contains("Error")
            
            # Get all captured logs
            all_logs = capture.get_logs()
            
            # Get only warning and error messages
            warnings = capture.get_logs(level="WARNING")
        ```
    """
    
    def __init__(self, level: Optional[str] = None):
        """Initialize the log capture.
        
        Args:
            level: Minimum log level to capture
        """
        self.level = level
        self.level_num = getattr(logging, self.level.upper(), 0) if self.level else 0
        self.logs: List[Dict[str, Any]] = []
        self.handler = self._create_handler()
    
    def _create_handler(self) -> logging.Handler:
        """Create a handler to capture logs.
        
        Returns:
            Log handler
        """
        class CaptureHandler(logging.Handler):
            def __init__(self, capture):
                super().__init__()
                self.capture = capture
            
            def emit(self, record):
                # Skip if record level is lower than minimum
                if record.levelno < self.capture.level_num:
                    return
                
                # Add log record to captured logs
                self.capture.logs.append({
                    "level": record.levelname,
                    "message": record.getMessage(),
                    "name": record.name,
                    "time": record.created,
                    "file": record.pathname,
                    "line": record.lineno,
                })
        
        return CaptureHandler(self)
    
    def __enter__(self) -> "LogCapture":
        """Enter the context manager.
        
        Returns:
            Self
        """
        # Add handler to root logger
        # Use the project's logger name
        logging.getLogger("ultimate").addHandler(self.handler)
        # Consider adding to the absolute root logger as well if needed
        # logging.getLogger().addHandler(self.handler) 
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Exit the context manager.
        
        Args:
            exc_type: Exception type
            exc_val: Exception value
            exc_tb: Exception traceback
        """
        # Remove handler from root logger
        logging.getLogger("ultimate").removeHandler(self.handler)
        # logging.getLogger().removeHandler(self.handler)
    
    def get_logs(self, level: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get captured logs, optionally filtered by level.
        
        Args:
            level: Filter logs by level
            
        Returns:
            List of log records
        """
        if not level:
            return self.logs
        
        level_num = getattr(logging, level.upper(), 0)
        return [log for log in self.logs if getattr(logging, log["level"], 0) >= level_num]
    
    def get_messages(self, level: Optional[str] = None) -> List[str]:
        """Get captured log messages, optionally filtered by level.
        
        Args:
            level: Filter logs by level
            
        Returns:
            List of log messages
        """
        return [log["message"] for log in self.get_logs(level)]
    
    def contains(self, text: str, level: Optional[str] = None) -> bool:
        """Check if any log message contains the given text.
        
        Args:
            text: Text to search for
            level: Optional level filter
            
        Returns:
            True if text is found in any message
        """
        return any(text in msg for msg in self.get_messages(level))

__all__ = [
    # Console
    "console",
    "create_progress",
    "status",
    "print_panel",
    "print_syntax",
    "print_table",
    "print_tree",
    "print_json",
    "live_display",
    
    # Logger and utilities
    "logger",
    "Logger",
    "debug",
    "info",
    "success",
    "warning",
    "error",
    "critical",
    "section",
    "get_logger",
    "capture_logs",
    "LogCapture",
    
    # Emojis
    "get_emoji",
    "INFO",
    "DEBUG",
    "WARNING",
    "ERROR",
    "CRITICAL",
    "SUCCESS",
    "RUNNING",
    "COMPLETED",
    "FAILED",
    
    # Panels
    "HeaderPanel",
    "ResultPanel",
    "InfoPanel",
    "WarningPanel",
    "ErrorPanel",
    "ToolOutputPanel",
    "CodePanel",
    "display_header",
    "display_results",
    "display_info",
    "display_warning",
    "display_error",
    "display_tool_output",
    "display_code",
    
    # Progress tracking
    "GatewayProgress",
    "track",
    
    # Formatters and handlers
    "GatewayLogRecord",
    "SimpleLogFormatter",
    "DetailedLogFormatter",
    "RichLoggingHandler",
    "create_rich_console_handler",
] 
```
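
A brief illustration of how `capture_logs` can back a test assertion, assuming stdlib logger levels and propagation are left at their defaults. This sketch is not part of the repository; it relies only on the fact, visible above, that `LogCapture` attaches its handler to the stdlib `"ultimate"` logger.

```python
"""Sketch: asserting on log output captured under the "ultimate" logger hierarchy."""

import logging

from ultimate_mcp_server.utils.logging import capture_logs

# LogCapture adds its handler to logging.getLogger("ultimate"), so any stdlib logger
# below that name propagates its records into the capture.
stdlib_logger = logging.getLogger("ultimate.demo")


def test_warning_is_captured():
    with capture_logs("WARNING") as logs:
        stdlib_logger.info("starting up")        # below the WARNING threshold, not recorded
        stdlib_logger.warning("disk space low")  # recorded by the capture handler

    assert logs.contains("disk space low")
    assert "starting up" not in logs.get_messages()
```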

--------------------------------------------------------------------------------
/ultimate_mcp_server/services/vector/embeddings.py:
--------------------------------------------------------------------------------

```python
"""Embedding generation service for vector operations."""
import asyncio
import hashlib
import os
from pathlib import Path
from typing import List, Optional

import numpy as np
from openai import AsyncOpenAI

from ultimate_mcp_server.config import get_config
from ultimate_mcp_server.utils import get_logger

logger = get_logger(__name__)

# Global dictionary to store embedding instances (optional)
embedding_instances = {}


class EmbeddingCache:
    """Cache for embeddings to avoid repeated API calls."""
    
    def __init__(self, cache_dir: Optional[str] = None):
        """Initialize the embedding cache.
        
        Args:
            cache_dir: Directory to store cache files
        """
        if cache_dir:
            self.cache_dir = Path(cache_dir)
        else:
            self.cache_dir = Path.home() / ".ultimate" / "embeddings"
            
        # Create cache directory if it doesn't exist
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        
        # In-memory cache
        self.cache = {}
        
        logger.info(
            f"Embeddings cache initialized (directory: {self.cache_dir})",
            emoji_key="cache"
        )
        
    def _get_cache_key(self, text: str, model: str) -> str:
        """Generate a cache key for text and model.
        
        Args:
            text: Text to embed
            model: Embedding model name
            
        Returns:
            Cache key
        """
        # Create a hash based on text and model
        text_hash = hashlib.md5(text.encode("utf-8")).hexdigest()
        return f"{model}_{text_hash}"
        
    def _get_cache_file_path(self, key: str) -> Path:
        """Get cache file path for a key.
        
        Args:
            key: Cache key
            
        Returns:
            Cache file path
        """
        return self.cache_dir / f"{key}.npy"
        
    def get(self, text: str, model: str) -> Optional[np.ndarray]:
        """Get embedding from cache.
        
        Args:
            text: Text to embed
            model: Embedding model name
            
        Returns:
            Cached embedding or None if not found
        """
        key = self._get_cache_key(text, model)
        
        # Check in-memory cache first
        if key in self.cache:
            return self.cache[key]
            
        # Check disk cache
        cache_file = self._get_cache_file_path(key)
        if cache_file.exists():
            try:
                embedding = np.load(str(cache_file))
                # Add to in-memory cache
                self.cache[key] = embedding
                return embedding
            except Exception as e:
                logger.error(
                    f"Failed to load embedding from cache: {str(e)}",
                    emoji_key="error"
                )
                
        return None
        
    def set(self, text: str, model: str, embedding: np.ndarray) -> None:
        """Set embedding in cache.
        
        Args:
            text: Text to embed
            model: Embedding model name
            embedding: Embedding vector
        """
        key = self._get_cache_key(text, model)
        
        # Add to in-memory cache
        self.cache[key] = embedding
        
        # Save to disk
        cache_file = self._get_cache_file_path(key)
        try:
            np.save(str(cache_file), embedding)
        except Exception as e:
            logger.error(
                f"Failed to save embedding to cache: {str(e)}",
                emoji_key="error"
            )
            
    def clear(self) -> None:
        """Clear the embedding cache."""
        # Clear in-memory cache
        self.cache.clear()
        
        # Clear disk cache
        for cache_file in self.cache_dir.glob("*.npy"):
            try:
                cache_file.unlink()
            except Exception as e:
                logger.error(
                    f"Failed to delete cache file {cache_file}: {str(e)}",
                    emoji_key="error"
                )
                
        logger.info(
            "Embeddings cache cleared",
            emoji_key="cache"
        )


class EmbeddingService:
    """Generic service to create embeddings using different providers."""
    def __init__(self, provider_type: str = 'openai', model_name: str = 'text-embedding-3-small', api_key: Optional[str] = None, **kwargs):
        """Initialize the embedding service.

        Args:
            provider_type: The type of embedding provider (e.g., 'openai').
            model_name: The specific embedding model to use.
            api_key: Optional API key. If not provided, attempts to load from config.
            **kwargs: Additional provider-specific arguments.
        """
        self.provider_type = provider_type.lower()
        self.model_name = model_name
        self.client = None
        self.api_key = api_key
        self.kwargs = kwargs

        try:
            config = get_config()
            if self.provider_type == 'openai':
                provider_config = config.providers.openai
                # Use provided key first, then config key
                self.api_key = self.api_key or provider_config.api_key
                if not self.api_key:
                    raise ValueError("OpenAI API key not provided or found in configuration.")
                # Pass base_url and organization from config if available
                openai_kwargs = {
                    'api_key': self.api_key,
                    'base_url': provider_config.base_url or self.kwargs.get('base_url'),
                    'organization': provider_config.organization or self.kwargs.get('organization'),
                    'timeout': provider_config.timeout or self.kwargs.get('timeout'),
                }
                # Filter out None values before passing to OpenAI client
                openai_kwargs = {k: v for k, v in openai_kwargs.items() if v is not None}
                
                # Always use AsyncOpenAI
                self.client = AsyncOpenAI(**openai_kwargs)
                logger.info(f"Initialized AsyncOpenAI embedding client for model: {self.model_name}")

            else:
                raise ValueError(f"Unsupported embedding provider type: {self.provider_type}")

        except Exception as e:
            logger.error(f"Failed to initialize embedding service for provider {self.provider_type}: {e}", exc_info=True)
            raise RuntimeError(f"Embedding service initialization failed: {e}") from e


    async def create_embeddings(self, texts: List[str]) -> List[List[float]]:
        """Create embeddings for a list of texts.

        Args:
            texts: A list of strings to embed.

        Returns:
            A list of embedding vectors (each a list of floats).

        Raises:
            ValueError: If the provider type is unsupported or embedding fails.
            RuntimeError: If the client is not initialized.
        """
        if self.client is None:
            raise RuntimeError("Embedding client is not initialized.")
        
        try:
            if self.provider_type == 'openai':
                response = await self.client.embeddings.create(
                    input=texts,
                    model=self.model_name
                )
                # Extract the embedding data
                embeddings = [item.embedding for item in response.data]
                logger.debug(f"Successfully created {len(embeddings)} embeddings using {self.model_name}.")
                return embeddings

            else:
                raise ValueError(f"Unsupported provider type: {self.provider_type}")

        except Exception as e:
            logger.error(f"Failed to create embeddings using {self.provider_type} model {self.model_name}: {e}", exc_info=True)
            # Re-raise the error or return an empty list/handle appropriately
            raise ValueError(f"Embedding creation failed: {e}") from e


def get_embedding_service(provider_type: str = 'openai', model_name: str = 'text-embedding-3-small', **kwargs) -> EmbeddingService:
    """Factory function to get or create an EmbeddingService instance.

    Args:
        provider_type: The type of embedding provider.
        model_name: The specific embedding model.
        **kwargs: Additional arguments passed to the EmbeddingService constructor.

    Returns:
        An initialized EmbeddingService instance.
    """
    # Optional: Implement caching/singleton pattern for instances if desired
    instance_key = (provider_type, model_name)
    if instance_key in embedding_instances:
        # TODO: Check if kwargs match cached instance? For now, assume they do.
        logger.debug(f"Returning cached embedding service instance for {provider_type}/{model_name}")
        return embedding_instances[instance_key]
    else:
        logger.debug(f"Creating new embedding service instance for {provider_type}/{model_name}")
        instance = EmbeddingService(provider_type=provider_type, model_name=model_name, **kwargs)
        embedding_instances[instance_key] = instance
        return instance


# Example usage (for testing)
async def main():
    # setup_logging(log_level="DEBUG") # Removed as logging is configured centrally
    # Make sure OPENAI_API_KEY is set in your .env file or environment
    os.environ['GATEWAY_FORCE_CONFIG_RELOAD'] = 'true' # Ensure latest config

    try:
        # Get the default OpenAI service
        openai_service = get_embedding_service()

        texts_to_embed = [
            "The quick brown fox jumps over the lazy dog.",
            "Quantum computing leverages quantum mechanics.",
            "Paris is the capital of France."
        ]

        embeddings = await openai_service.create_embeddings(texts_to_embed)
        print(f"Generated {len(embeddings)} embeddings.")
        print(f"Dimension of first embedding: {len(embeddings[0])}")
        # print(f"First embedding (preview): {embeddings[0][:10]}...")

        # Example of specifying a different model (if available and configured)
        # try:
        #     ada_service = get_embedding_service(model_name='text-embedding-ada-002')
        #     ada_embeddings = await ada_service.create_embeddings(["Test with Ada model"]) 
        #     print("\nSuccessfully used Ada model.")
        # except Exception as e:
        #     print(f"\nCould not use Ada model (may need different API key/config): {e}")

    except Exception as e:
        print(f"An error occurred during the example: {e}")
    finally:
        if 'GATEWAY_FORCE_CONFIG_RELOAD' in os.environ:
             del os.environ['GATEWAY_FORCE_CONFIG_RELOAD']

if __name__ == "__main__":
    asyncio.run(main())
```
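
Note that `EmbeddingService.create_embeddings` above does not consult `EmbeddingCache` on its own. The sketch below (not part of the repository) shows one way a caller could combine the two; the wiring and the `cached_embedding` helper name are illustrative assumptions.

```python
"""Sketch: pairing EmbeddingCache with EmbeddingService at the call site."""

from typing import List

import numpy as np

from ultimate_mcp_server.services.vector.embeddings import (
    EmbeddingCache,
    get_embedding_service,
)

_cache = EmbeddingCache()  # in-memory + on-disk cache, defaults to ~/.ultimate/embeddings


async def cached_embedding(text: str, model: str = "text-embedding-3-small") -> np.ndarray:
    # Return a cached vector if present; otherwise embed, cache, and return it.
    hit = _cache.get(text, model)
    if hit is not None:
        return hit

    service = get_embedding_service(model_name=model)
    vectors: List[List[float]] = await service.create_embeddings([text])
    embedding = np.asarray(vectors[0], dtype=np.float32)
    _cache.set(text, model, embedding)
    return embedding
```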

--------------------------------------------------------------------------------
/examples/marqo_fused_search_demo.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""Demo script showcasing the marqo_fused_search tool."""

import asyncio
import json
import os
import sys
import time  # Add time import
from datetime import datetime, timedelta
from typing import Any, Dict, Optional

# Add Rich imports
from rich.console import Console
from rich.markup import escape
from rich.panel import Panel
from rich.rule import Rule
from rich.syntax import Syntax

# Add the project root to the Python path
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.insert(0, project_root)

from ultimate_mcp_server.tools.marqo_fused_search import DateRange, marqo_fused_search  # noqa: E402
from ultimate_mcp_server.utils.logging import logger  # noqa: E402

# Initialize Rich Console
console = Console()

# --- Configuration ---
CONFIG_FILE_PATH = os.path.join(project_root, "marqo_index_config.json")

def load_marqo_config() -> Dict[str, Any]:
    """Loads Marqo configuration from the JSON file."""
    try:
        with open(CONFIG_FILE_PATH, 'r') as f:
            config = json.load(f)
            logger.info(f"Loaded Marqo config from {CONFIG_FILE_PATH}")
            return config
    except FileNotFoundError:
        logger.error(f"Marqo config file not found at {CONFIG_FILE_PATH}. Cannot run dynamic examples.")
        return {}
    except json.JSONDecodeError as e:
        logger.error(f"Error decoding Marqo config file {CONFIG_FILE_PATH}: {e}")
        return {}

def find_schema_field(schema: Dict[str, Any], required_properties: Dict[str, Any]) -> Optional[str]:
    """
    Finds the first field name in the schema that matches all required properties.
    Handles nested properties like 'type'.
    """
    if not schema or "fields" not in schema:
        return None

    for field_name, properties in schema["fields"].items():
        match = True
        for req_prop, req_value in required_properties.items():
            # Allow checking properties like 'type', 'filterable', 'sortable', 'role', 'searchable'
            if properties.get(req_prop) != req_value:
                match = False
                break
        if match:
            # Avoid returning internal fields like _id unless specifically requested
            if field_name == "_id" and required_properties.get("role") != "internal":
                continue
            return field_name
    return None

# --- Helper Function ---
async def run_search_example(example_name: str, **kwargs):
    """Runs a single search example and prints the results using Rich."""
    console.print(Rule(f"[bold cyan]{example_name}[/bold cyan]"))
    
    # Display parameters using a panel
    param_str_parts = []
    for key, value in kwargs.items():
        # Format DateRange nicely
        if isinstance(value, DateRange):
            start_str = value.start_date.strftime("%Y-%m-%d") if value.start_date else "N/A"
            end_str = value.end_date.strftime("%Y-%m-%d") if value.end_date else "N/A"
            param_str_parts.append(f"  [green]{key}[/green]: Start=[yellow]{start_str}[/yellow], End=[yellow]{end_str}[/yellow]")
        else:
            param_str_parts.append(f"  [green]{key}[/green]: [yellow]{escape(str(value))}[/yellow]")
    param_str = "\n".join(param_str_parts)
    console.print(Panel(param_str, title="Search Parameters", border_style="blue", expand=False))

    try:
        start_time = time.time() # Use time for accurate timing
        results = await marqo_fused_search(**kwargs)
        processing_time = time.time() - start_time

        logger.debug(f"Raw results for '{example_name}': {results}") # Keep debug log

        if results.get("success"):
            logger.success(f"Search successful for '{example_name}'! ({processing_time:.3f}s)", emoji_key="success")

            # Display results using Rich Syntax for JSON
            results_json = json.dumps(results, indent=2, default=str)
            syntax = Syntax(results_json, "json", theme="default", line_numbers=True)
            console.print(Panel(syntax, title="Marqo Search Results", border_style="green"))

        else:
            # Display error nicely if success is False but no exception was raised
            error_msg = results.get("error", "Unknown error")
            error_code = results.get("error_code", "UNKNOWN_CODE")
            logger.error(f"Search failed for '{example_name}': {error_code} - {error_msg}", emoji_key="error")
            console.print(Panel(f"[bold red]Error ({error_code}):[/bold red]\n{escape(error_msg)}", title="Search Failed", border_style="red"))

    except Exception as e:
        processing_time = time.time() - start_time
        logger.error(f"An exception occurred during '{example_name}' ({processing_time:.3f}s): {e}", emoji_key="critical", exc_info=True)
        # Display exception using Rich traceback
        console.print_exception(show_locals=False)
        console.print(Panel(f"[bold red]Exception:[/bold red]\n{escape(str(e))}", title="Execution Error", border_style="red"))
        
    console.print() # Add space after each example


# --- Main Demo Function ---
async def main():
    """Runs various demonstrations of the marqo_fused_search tool."""

    # Load Marqo configuration and schema
    marqo_config = load_marqo_config()
    if not marqo_config:
        logger.error("Exiting demo as Marqo config could not be loaded.")
        return

    schema = marqo_config.get("default_schema", {})
    tensor_field = schema.get("tensor_field")
    # content_field = schema.get("default_content_field", "content") # Not directly used in examples
    date_field = schema.get("default_date_field") # Used for date range

    # --- Find suitable fields dynamically ---
    # For filter examples (keyword preferred)
    filter_field = find_schema_field(schema, {"filterable": True, "type": "keyword"}) or \
                   find_schema_field(schema, {"filterable": True}) # Fallback to any filterable

    # For lexical search (requires searchable='lexical')
    lexical_field_1 = find_schema_field(schema, {"searchable": "lexical"})
    # Note: find_schema_field matches properties literally, so "field_name_not" never matches
    # and this always falls back to lexical_field_1 (both variables end up as the same field).
    lexical_field_2 = find_schema_field(schema, {"searchable": "lexical", "field_name_not": lexical_field_1}) or lexical_field_1 # Find a second one if possible

    # For hybrid search (need tensor + lexical)
    hybrid_tensor_field = tensor_field # Use the main tensor field
    hybrid_lexical_field_1 = lexical_field_1
    hybrid_lexical_field_2 = lexical_field_2

    # For explicit tensor search (need tensor field)
    explicit_tensor_field = tensor_field

    logger.info("Dynamically determined fields for examples:")
    logger.info(f"  Filter Field: '{filter_field}'")
    logger.info(f"  Lexical Fields: '{lexical_field_1}', '{lexical_field_2}'")
    logger.info(f"  Tensor Field (for hybrid/explicit): '{hybrid_tensor_field}'")
    logger.info(f"  Date Field (for range): '{date_field}'")

    # --- Run Examples --- 

    # --- Example 1: Basic Semantic Search --- (No specific fields needed)
    await run_search_example(
        "Basic Semantic Search",
        query="impact of AI on software development"
    )

    # --- Example 2: Search with Metadata Filter ---
    if filter_field:
        # Use a plausible value; specific value might not exist in data
        example_filter_value = "10-K" if filter_field == "form_type" else "example_value"
        await run_search_example(
            "Search with Metadata Filter",
            query="latest advancements in renewable energy",
            filters={filter_field: example_filter_value}
        )
    else:
        logger.warning("Skipping Example 2: No suitable filterable field found in schema.")

    # --- Example 3: Search with Multiple Filter Values (OR condition) ---
    if filter_field:
        # Use plausible values
        example_filter_values = ["10-K", "10-Q"] if filter_field == "form_type" else ["value1", "value2"]
        await run_search_example(
            "Search with Multiple Filter Values (OR)",
            query="financial report analysis",
            filters={filter_field: example_filter_values}
        )
    else:
        logger.warning("Skipping Example 3: No suitable filterable field found in schema.")

    # --- Example 4: Search with Date Range ---
    if date_field and schema.get("fields", {}).get(date_field, {}).get("type") == "timestamp":
        start_date = datetime.now() - timedelta(days=900)
        end_date = datetime.now() - timedelta(days=30)
        await run_search_example(
            "Search with Date Range",
            query="market trends",
            date_range=DateRange(start_date=start_date, end_date=end_date)
        )
    else:
        logger.warning(f"Skipping Example 4: No sortable timestamp field named '{date_field}' (default_date_field) found in schema.")

    # --- Example 5: Pure Lexical Search --- (Relies on schema having lexical fields)
    # The tool will auto-detect lexical fields if not specified, but this tests the weight
    await run_search_example(
        "Pure Lexical Search",
        query="exact sciences", # Query likely to hit company name etc.
        semantic_weight=0.0
    )

    # --- Example 6: Hybrid Search with Custom Weight --- (Relies on schema having both)
    await run_search_example(
        "Hybrid Search with Custom Weight",
        query="balancing innovation and regulation",
        semantic_weight=0.5 # Equal weight
    )

    # --- Example 7: Pagination (Limit and Offset) --- (No specific fields needed)
    await run_search_example(
        "Pagination (Limit and Offset)",
        query="common programming paradigms",
        limit=10,
        offset=10
    )

    # --- Example 8: Explicit Searchable Attributes (Tensor Search) ---
    if explicit_tensor_field:
        await run_search_example(
            "Explicit Tensor Searchable Attributes",
            query="neural network architectures",
            searchable_attributes=[explicit_tensor_field],
            semantic_weight=1.0 # Ensure tensor search is used
        )
    else:
        logger.warning("Skipping Example 8: No tensor field found in schema.")

    # --- Example 9: Explicit Hybrid Search Attributes ---
    if hybrid_tensor_field and hybrid_lexical_field_1:
        lexical_fields = [hybrid_lexical_field_1]
        if hybrid_lexical_field_2 and hybrid_lexical_field_1 != hybrid_lexical_field_2:
             lexical_fields.append(hybrid_lexical_field_2)
        await run_search_example(
            "Explicit Hybrid Search Attributes",
            query="machine learning applications in healthcare",
            hybrid_search_attributes={
                "tensor": [hybrid_tensor_field],
                "lexical": lexical_fields
            },
            semantic_weight=0.6 # Specify hybrid search balance
        )
    else:
        logger.warning("Skipping Example 9: Need both tensor and lexical fields defined in schema.")

    # --- Example 12: Overriding Marqo URL and Index Name --- (Keep commented out)
    # ... rest of the code ...
    console.print(Rule("[bold magenta]Marqo Fused Search Demo Complete[/bold magenta]"))


if __name__ == "__main__":
    console.print(Rule("[bold magenta]Starting Marqo Fused Search Demo[/bold magenta]"))
    # logger.info("Starting Marqo Fused Search Demo...") # Replaced by Rich rule
    asyncio.run(main())
    # logger.info("Marqo Fused Search Demo finished.") # Replaced by Rich rule 
```
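
For reference, the demo's dynamic field selection expects a `default_schema` shaped roughly like the toy example below. The field names and property values here are hypothetical illustrations, not taken from `marqo_index_config.json.example`.

```python
# Hypothetical schema illustrating the shape find_schema_field (defined in the demo above) expects.
toy_schema = {
    "tensor_field": "embedding",
    "default_date_field": "filed_date",
    "fields": {
        "form_type": {"type": "keyword", "filterable": True},
        "company_name": {"type": "text", "searchable": "lexical"},
        "filed_date": {"type": "timestamp", "sortable": True},
        "embedding": {"type": "tensor"},
    },
}

# find_schema_field(toy_schema, {"filterable": True, "type": "keyword"})  -> "form_type"
# find_schema_field(toy_schema, {"searchable": "lexical"})                -> "company_name"
```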

--------------------------------------------------------------------------------
/examples/claude_integration_demo.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
"""Claude integration demonstration using Ultimate MCP Server."""
import asyncio
import sys
import time
from pathlib import Path

# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))

# Third-party imports
# These imports need to be below sys.path modification, which is why they have noqa comments
from rich import box  # noqa: E402
from rich.markup import escape  # noqa: E402
from rich.panel import Panel  # noqa: E402
from rich.rule import Rule  # noqa: E402
from rich.table import Table  # noqa: E402

# Project imports
from ultimate_mcp_server.constants import Provider  # noqa: E402
from ultimate_mcp_server.core.server import Gateway  # noqa: E402
from ultimate_mcp_server.utils import get_logger  # noqa: E402
from ultimate_mcp_server.utils.display import CostTracker  # noqa: E402
from ultimate_mcp_server.utils.logging.console import console  # noqa: E402

# Initialize logger
logger = get_logger("example.claude_integration_demo")


async def compare_claude_models(tracker: CostTracker):
    """Compare different Claude models."""
    console.print(Rule("[bold blue]Claude Model Comparison[/bold blue]"))
    logger.info("Starting Claude models comparison", emoji_key="start")
    
    # Create Gateway instance - this handles provider initialization
    gateway = Gateway("claude-demo", register_tools=False)
    
    # Initialize providers
    logger.info("Initializing providers...", emoji_key="provider")
    await gateway._initialize_providers()
    
    provider_name = Provider.ANTHROPIC.value
    try:
        # Get the provider from the gateway
        provider = gateway.providers.get(provider_name)
        if not provider:
            logger.error(f"Provider {provider_name} not available or initialized", emoji_key="error")
            return
        
        logger.info(f"Using provider: {provider_name}", emoji_key="provider")
        
        models = await provider.list_models()
        model_names = [m["id"] for m in models] # Extract names from model dictionaries
        console.print(f"Found {len(model_names)} Claude models: [cyan]{escape(str(model_names))}[/cyan]")
        
        # Select specific models to compare (Ensure these are valid and available)
        claude_models = [
            "anthropic/claude-3-7-sonnet-20250219", 
            "anthropic/claude-3-5-haiku-20241022"
        ]
        # Filter based on available models
        models_to_compare = [m for m in claude_models if m in model_names]
        if not models_to_compare:
            logger.error("None of the selected models for comparison are available. Exiting comparison.", emoji_key="error")
            console.print("[red]Selected models not found in available list.[/red]")
            return
        console.print(f"Comparing models: [yellow]{escape(str(models_to_compare))}[/yellow]")
        
        prompt = """
        Explain the concept of quantum entanglement in a way that a high school student would understand.
        Keep your response brief and accessible.
        """
        console.print(f"[cyan]Using Prompt:[/cyan] {escape(prompt.strip())[:100]}...")
        
        results_data = []
        
        for model_name in models_to_compare:
            try:
                logger.info(f"Testing model: {model_name}", emoji_key="model")
                start_time = time.time()
                result = await provider.generate_completion(
                    prompt=prompt,
                    model=model_name,
                    temperature=0.3,
                    max_tokens=300
                )
                processing_time = time.time() - start_time
                
                # Track the cost
                tracker.add_call(result)
                
                results_data.append({
                    "model": model_name,
                    "text": result.text,
                    "tokens": {
                        "input": result.input_tokens,
                        "output": result.output_tokens,
                        "total": result.total_tokens
                    },
                    "cost": result.cost,
                    "time": processing_time
                })
                
                logger.success(
                    f"Completion for {model_name} successful",
                    emoji_key="success",
                    # Tokens/cost/time logged implicitly by storing in results_data
                )
                
            except Exception as e:
                logger.error(f"Error testing model {model_name}: {str(e)}", emoji_key="error", exc_info=True)
                # Optionally add an error entry to results_data if needed
        
        # Display comparison results using Rich
        if results_data:
            console.print(Rule("[bold green]Comparison Results[/bold green]"))
            
            for result_item in results_data:
                model = result_item["model"]
                time_s = result_item["time"]
                tokens = result_item.get("tokens", {}).get("total", 0)
                tokens_per_second = tokens / time_s if time_s > 0 else 0
                cost = result_item.get("cost", 0.0)
                text = result_item.get("text", "[red]Error generating response[/red]").strip()

                stats_line = (
                    f"Time: [yellow]{time_s:.2f}s[/yellow] | "
                    f"Tokens: [cyan]{tokens}[/cyan] | "
                    f"Speed: [blue]{tokens_per_second:.1f} tok/s[/blue] | "
                    f"Cost: [green]${cost:.6f}[/green]"
                )
                
                console.print(Panel(
                    escape(text),
                    title=f"[bold magenta]{escape(model)}[/bold magenta]",
                    subtitle=stats_line,
                    border_style="blue",
                    expand=False
                ))
            console.print()
        
    except Exception as e:
        logger.error(f"Error in model comparison: {str(e)}", emoji_key="error", exc_info=True)
        # Optionally re-raise or handle differently


async def demonstrate_system_prompt(tracker: CostTracker):
    """Demonstrate Claude with system prompts."""
    console.print(Rule("[bold blue]Claude System Prompt Demonstration[/bold blue]"))
    logger.info("Demonstrating Claude with system prompts", emoji_key="start")
    
    # Create Gateway instance - this handles provider initialization
    gateway = Gateway("claude-demo", register_tools=False)
    
    # Initialize providers
    logger.info("Initializing providers...", emoji_key="provider")
    await gateway._initialize_providers()
    
    provider_name = Provider.ANTHROPIC.value
    try:
        # Get the provider from the gateway
        provider = gateway.providers.get(provider_name)
        if not provider:
            logger.error(f"Provider {provider_name} not available or initialized", emoji_key="error")
            return
        
        # Use a fast Claude model (ensure it's available)
        model = "anthropic/claude-3-5-haiku-20241022"
        available_models = await provider.list_models()
        if model not in [m["id"] for m in available_models]:
            logger.warning(f"Model {model} not available, falling back to default.", emoji_key="warning")
            model = provider.get_default_model()
            if not model:
                 logger.error("No suitable Claude model found for system prompt demo.", emoji_key="error")
                 return
        logger.info(f"Using model: {model}", emoji_key="model")
        
        system_prompt = """
You are a helpful assistant with expertise in physics.
Keep all explanations accurate but very concise.
Always provide real-world examples to illustrate concepts.
"""
        user_prompt = "Explain the concept of gravity."
        
        logger.info("Generating completion with system prompt", emoji_key="processing")
        
        result = await provider.generate_completion(
            prompt=user_prompt,
            model=model,
            temperature=0.7,
            system=system_prompt,
            max_tokens=1000 # Increased max_tokens
        )
        
        # Track the cost
        tracker.add_call(result)
        
        logger.success("Completion with system prompt successful", emoji_key="success")
        
        # Display result using Rich Panels
        console.print(Panel(
            escape(system_prompt.strip()),
            title="[bold cyan]System Prompt[/bold cyan]",
            border_style="dim cyan",
            expand=False
        ))
        console.print(Panel(
            escape(user_prompt.strip()),
            title="[bold yellow]User Prompt[/bold yellow]",
            border_style="dim yellow",
            expand=False
        ))
        console.print(Panel(
            escape(result.text.strip()),
            title="[bold green]Claude Response[/bold green]",
            border_style="green",
            expand=False
        ))
        
        # Display stats in a small table
        stats_table = Table(title="Execution Stats", show_header=False, box=box.MINIMAL, expand=False)
        stats_table.add_column("Metric", style="cyan")
        stats_table.add_column("Value", style="white")
        stats_table.add_row("Input Tokens", str(result.input_tokens))
        stats_table.add_row("Output Tokens", str(result.output_tokens))
        stats_table.add_row("Cost", f"${result.cost:.6f}")
        stats_table.add_row("Processing Time", f"{result.processing_time:.3f}s")
        console.print(stats_table)
        console.print()
        
    except Exception as e:
        logger.error(f"Error in system prompt demonstration: {str(e)}", emoji_key="error", exc_info=True)
        # Optionally re-raise or handle differently


async def explore_claude_models():
    """Display available Claude models."""
    console.print(Rule("[bold cyan]Available Claude Models[/bold cyan]"))
    
    # Create Gateway instance - this handles provider initialization
    gateway = Gateway("claude-demo", register_tools=False)
    
    # Initialize providers
    logger.info("Initializing providers...", emoji_key="provider")
    await gateway._initialize_providers()
    
    # Get provider from the gateway
    provider = gateway.providers.get(Provider.ANTHROPIC.value)
    if not provider:
        logger.error(f"Provider {Provider.ANTHROPIC.value} not available or initialized", emoji_key="error")
        return
    
    # Get list of available models
    models = await provider.list_models()
    model_names = [m["id"] for m in models] # Extract names from model dictionaries
    console.print(f"Found {len(model_names)} Claude models: [cyan]{escape(str(model_names))}[/cyan]")


async def main():
    """Run Claude integration examples."""
    tracker = CostTracker() # Instantiate tracker here
    try:
        # Run model comparison
        await compare_claude_models(tracker) # Pass tracker
        
        console.print() # Add space between sections
        
        # Run system prompt demonstration
        await demonstrate_system_prompt(tracker) # Pass tracker
        
        # Run explore Claude models
        await explore_claude_models()

        # Display final summary
        tracker.display_summary(console) # Display summary at the end
        
    except Exception as e:
        logger.critical(f"Example failed: {str(e)}", emoji_key="critical", exc_info=True)
        return 1
    
    logger.success("Claude Integration Demo Finished Successfully!", emoji_key="complete")
    return 0


if __name__ == "__main__":
    exit_code = asyncio.run(main())
    sys.exit(exit_code)
```
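
The demos above hand each completion result to a `CostTracker` (`tracker.add_call(result)`, then `tracker.display_summary(console)` at the end), whose definition is not shown in this excerpt. As a rough sketch only, assuming nothing beyond the attributes the demos actually read (`cost`, `input_tokens`, `output_tokens`), a tracker with that interface could look like this:

```python
from dataclasses import dataclass

from rich.console import Console


@dataclass
class CostTrackerSketch:
    """Hypothetical stand-in for the demo's CostTracker; interface only."""
    calls: int = 0
    total_cost: float = 0.0
    total_input_tokens: int = 0
    total_output_tokens: int = 0

    def add_call(self, result) -> None:
        # The demos read .cost, .input_tokens and .output_tokens off each result.
        self.calls += 1
        self.total_cost += getattr(result, "cost", 0.0) or 0.0
        self.total_input_tokens += getattr(result, "input_tokens", 0) or 0
        self.total_output_tokens += getattr(result, "output_tokens", 0) or 0

    def display_summary(self, console: Console) -> None:
        total_tokens = self.total_input_tokens + self.total_output_tokens
        console.print(
            f"Tracked {self.calls} calls | {total_tokens} tokens | ${self.total_cost:.6f} total"
        )
```

The repository's real tracker may record more detail (per-provider breakdowns, processing time); treat this purely as orientation.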

--------------------------------------------------------------------------------
/comprehensive_test.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Comprehensive test script for Ultimate MCP Server
Tests specific tools and REST API endpoints
"""

import asyncio
import json

import aiohttp
from fastmcp import Client


async def test_mcp_interface():
    """Test the MCP interface functionality."""
    server_url = "http://127.0.0.1:8013/mcp"
    
    print("🔧 Testing MCP Interface")
    print("=" * 40)
    
    try:
        async with Client(server_url) as client:
            print("✅ MCP client connected")
            
            # Test core tools
            tools_to_test = [
                ("echo", {"message": "Hello MCP!"}),
                ("get_provider_status", {}),
                ("list_models", {}),
            ]
            
            for tool_name, params in tools_to_test:
                try:
                    result = await client.call_tool(tool_name, params)
                    if result:
                        print(f"✅ {tool_name}: OK")
                        # Show sample of result for key tools
                        if tool_name == "get_provider_status":
                            data = json.loads(result[0].text)
                            provider_count = len(data.get('providers', {}))
                            print(f"   → {provider_count} providers configured")
                        elif tool_name == "list_models":
                            data = json.loads(result[0].text)
                            total_models = sum(len(models) for models in data.get('models', {}).values())
                            print(f"   → {total_models} total models available")
                    else:
                        print(f"❌ {tool_name}: No response")
                except Exception as e:
                    print(f"❌ {tool_name}: {e}")
            
            # Test filesystem tools
            print("\n📁 Testing filesystem access...")
            try:
                dirs_result = await client.call_tool("list_allowed_directories", {})
                if dirs_result:
                    print("✅ Filesystem access configured")
            except Exception as e:
                print(f"❌ Filesystem access: {e}")
            
            # Test Python execution
            print("\n🐍 Testing Python sandbox...")
            try:
                python_result = await client.call_tool("execute_python", {
                    "code": "import sys; print(f'Python {sys.version_info.major}.{sys.version_info.minor}')"
                })
                if python_result:
                    result_data = json.loads(python_result[0].text)
                    if result_data.get('success'):
                        print("✅ Python sandbox working")
                        print(f"   → {result_data.get('output', '').strip()}")
                    else:
                        print("❌ Python sandbox failed")
            except Exception as e:
                print(f"❌ Python sandbox: {e}")
                
    except Exception as e:
        print(f"❌ MCP interface failed: {e}")


async def test_rest_api():
    """Test the REST API endpoints."""
    base_url = "http://127.0.0.1:8013"
    
    print("\n🌐 Testing REST API Endpoints")
    print("=" * 40)
    
    async with aiohttp.ClientSession() as session:
        # Test discovery endpoint
        try:
            async with session.get(f"{base_url}/") as response:
                if response.status == 200:
                    data = await response.json()
                    print(f"✅ Discovery endpoint: {data.get('type')}")
                    print(f"   → Transport: {data.get('transport')}")
                    print(f"   → Endpoint: {data.get('endpoint')}")
                else:
                    print(f"❌ Discovery endpoint: HTTP {response.status}")
        except Exception as e:
            print(f"❌ Discovery endpoint: {e}")
        
        # Test health endpoint
        try:
            async with session.get(f"{base_url}/api/health") as response:
                if response.status == 200:
                    data = await response.json()
                    print(f"✅ Health endpoint: {data.get('status')}")
                else:
                    print(f"❌ Health endpoint: HTTP {response.status}")
        except Exception as e:
            print(f"❌ Health endpoint: {e}")
        
        # Test OpenAPI docs
        try:
            async with session.get(f"{base_url}/api/docs") as response:
                if response.status == 200:
                    print("✅ Swagger UI accessible")
                else:
                    print(f"❌ Swagger UI: HTTP {response.status}")
        except Exception as e:
            print(f"❌ Swagger UI: {e}")
        
        # Test cognitive states endpoint
        try:
            async with session.get(f"{base_url}/api/cognitive-states") as response:
                if response.status == 200:
                    data = await response.json()
                    print(f"✅ Cognitive states: {data.get('total', 0)} states")
                else:
                    print(f"❌ Cognitive states: HTTP {response.status}")
        except Exception as e:
            print(f"❌ Cognitive states: {e}")
        
        # Test performance overview
        try:
            async with session.get(f"{base_url}/api/performance/overview") as response:
                if response.status == 200:
                    data = await response.json()
                    overview = data.get('overview', {})
                    print(f"✅ Performance overview: {overview.get('total_actions', 0)} actions")
                else:
                    print(f"❌ Performance overview: HTTP {response.status}")
        except Exception as e:
            print(f"❌ Performance overview: {e}")
        
        # Test artifacts endpoint
        try:
            async with session.get(f"{base_url}/api/artifacts") as response:
                if response.status == 200:
                    data = await response.json()
                    print(f"✅ Artifacts: {data.get('total', 0)} artifacts")
                else:
                    print(f"❌ Artifacts: HTTP {response.status}")
        except Exception as e:
            print(f"❌ Artifacts: {e}")


async def test_tool_completions():
    """Test actual completions with available providers."""
    server_url = "http://127.0.0.1:8013/mcp"
    
    print("\n🤖 Testing LLM Completions")
    print("=" * 40)
    
    try:
        async with Client(server_url) as client:
            # Get available providers first
            provider_result = await client.call_tool("get_provider_status", {})
            provider_data = json.loads(provider_result[0].text)
            
            available_providers = []
            for name, status in provider_data.get('providers', {}).items():
                if status.get('available') and status.get('models'):
                    available_providers.append((name, status['models'][0]))
            
            if not available_providers:
                print("❌ No providers available for testing")
                return
            
            # Test with first available provider
            provider_name, model_info = available_providers[0]
            model_id = model_info.get('id')
            
            print(f"🧪 Testing with {provider_name} / {model_id}")
            
            try:
                result = await client.call_tool("generate_completion", {
                    "prompt": "Count from 1 to 5",
                    "provider": provider_name,
                    "model": model_id,
                    "max_tokens": 50
                })
                
                if result:
                    response_data = json.loads(result[0].text)
                    if response_data.get('success', True):
                        print("✅ Completion successful")
                        print(f"   → Response: {response_data.get('text', '')[:100]}...")
                        if 'usage' in response_data:
                            usage = response_data['usage']
                            print(f"   → Tokens: {usage.get('total_tokens', 'N/A')}")
                    else:
                        print(f"❌ Completion failed: {response_data.get('error')}")
                else:
                    print("❌ No completion response")
                    
            except Exception as e:
                print(f"❌ Completion error: {e}")
                
    except Exception as e:
        print(f"❌ Completion test failed: {e}")


async def test_memory_system():
    """Test the memory and cognitive state system."""
    server_url = "http://127.0.0.1:8013/mcp"
    
    print("\n🧠 Testing Memory System")
    print("=" * 40)
    
    try:
        async with Client(server_url) as client:
            # Test memory storage
            try:
                memory_result = await client.call_tool("store_memory", {
                    "memory_type": "test",
                    "content": "This is a test memory for the test client",
                    "importance": 7.5,
                    "tags": ["test", "client"]
                })
                
                if memory_result:
                    memory_data = json.loads(memory_result[0].text)
                    if memory_data.get('success'):
                        memory_id = memory_data.get('memory_id')
                        print(f"✅ Memory stored: {memory_id}")
                        
                        # Test memory retrieval
                        try:
                            get_result = await client.call_tool("get_memory_by_id", {
                                "memory_id": memory_id
                            })
                            
                            if get_result:
                                print("✅ Memory retrieved successfully")
                        except Exception as e:
                            print(f"❌ Memory retrieval: {e}")
                            
                    else:
                        print(f"❌ Memory storage failed: {memory_data.get('error')}")
                        
            except Exception as e:
                print(f"❌ Memory system: {e}")
                
            # Test cognitive state
            try:
                state_result = await client.call_tool("save_cognitive_state", {
                    "state_type": "test_state",
                    "description": "Test cognitive state from client",
                    "data": {"test": True, "client": "test_client"}
                })
                
                if state_result:
                    state_data = json.loads(state_result[0].text)
                    if state_data.get('success'):
                        print("✅ Cognitive state saved")
                    else:
                        print(f"❌ Cognitive state failed: {state_data.get('error')}")
                        
            except Exception as e:
                print(f"❌ Cognitive state: {e}")
                
    except Exception as e:
        print(f"❌ Memory system test failed: {e}")


async def main():
    """Run all comprehensive tests."""
    print("🚀 Ultimate MCP Server Comprehensive Test Suite")
    print("=" * 60)
    
    # Test MCP interface
    await test_mcp_interface()
    
    # Test REST API
    await test_rest_api()
    
    # Test completions
    await test_tool_completions()
    
    # Test memory system
    await test_memory_system()
    
    print("\n🎯 Comprehensive testing completed!")
    print("\nIf you see mostly ✅ symbols, your server is working correctly!")
    print("Any ❌ symbols indicate areas that may need attention.")


if __name__ == "__main__":
    asyncio.run(main())
```
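
The MCP checks above repeat one pattern: call a tool, take the first content item, and parse `result[0].text` as JSON. A small wrapper over that pattern (a hypothetical helper, assuming, as the script does, that `Client.call_tool` returns a list of content objects exposing `.text`) keeps the individual checks shorter:

```python
import json

from fastmcp import Client


async def call_tool_json(client: Client, tool_name: str, params: dict) -> dict:
    """Call an MCP tool and decode its first text payload as JSON.

    Mirrors the pattern used throughout the tests above
    (``json.loads(result[0].text)``); raises if the tool returns no content.
    """
    result = await client.call_tool(tool_name, params)
    if not result:
        raise RuntimeError(f"{tool_name} returned no content")
    return json.loads(result[0].text)
```

With it, `data = await call_tool_json(client, "get_provider_status", {})` replaces the call-then-`json.loads` pairs used above.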

--------------------------------------------------------------------------------
/ultimate_mcp_server/constants.py:
--------------------------------------------------------------------------------

```python
"""
Global constants and enumerations for the Ultimate MCP Server.

This module defines system-wide constants, enumerations, and mappings used throughout
the Ultimate MCP Server codebase. Centralizing these values ensures consistency across
the application and simplifies maintenance when values need to be updated.

The module includes:

- Provider enum: Supported LLM providers (OpenAI, Anthropic, etc.)
- TaskType enum: Categories of tasks that can be performed with LLMs
- LogLevel enum: Standard logging levels
- COST_PER_MILLION_TOKENS: Cost estimates for different models
- DEFAULT_MODELS: Default model mappings for each provider
- EMOJI_MAP: Emoji icons for enhanced logging and visualization

These constants should be imported and used directly rather than duplicating their
values in other parts of the codebase. This approach ensures that when values need
to be updated (e.g., adding a new provider or updating pricing), changes only need
to be made in this central location.

Example usage:
    ```python
    from ultimate_mcp_server.constants import Provider, TaskType, EMOJI_MAP
    
    # Use provider enum
    default_provider = Provider.OPENAI
    
    # Get emoji for logging
    success_emoji = EMOJI_MAP["success"]  # ✅
    
    # Check task type
    if task_type == TaskType.COMPLETION:
        ...  # Handle completion task
    ```
"""
from enum import Enum
from typing import Dict


class Provider(str, Enum):
    """
    Enumeration of supported LLM providers in the Ultimate MCP Server.
    
    This enum defines the canonical names for each supported large language model
    provider in the system. These identifiers are used consistently throughout the
    codebase for:
    
    - Configuration settings (provider-specific API keys, endpoints, etc.)
    - Tool parameters (selecting which provider to use for a task)
    - Logging and error reporting (identifying the source of requests/responses)
    - Cost calculation and billing (provider-specific pricing models)
    
    New providers should be added here as they are integrated into the system.
    The string values should be lowercase and match the provider's canonical name
    where possible, as these values appear in API requests/responses.
    
    Usage:
        ```python
        # Reference a provider by enum
        default_provider = Provider.OPENAI
        
        # Convert between string and enum
        provider_name = "anthropic"
        provider_enum = Provider(provider_name)  # Provider.ANTHROPIC
        
        # Check if a provider is supported
        if user_provider in Provider.__members__.values():
            use_provider(user_provider)
        ```
    """
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    DEEPSEEK = "deepseek"
    GEMINI = "gemini"
    OPENROUTER = "openrouter"
    OLLAMA = "ollama"
    GROK = "grok"
    MISTRAL = "mistral"
    AWS = "aws"
    AZURE = "azure"


class TaskType(str, Enum):
    """
    Enumeration of task types that can be performed by LLMs in the system.
    
    This enum categorizes the different types of operations that LLMs can perform
    within the MCP ecosystem. These task types are used for:
    
    - Logging and analytics (tracking usage patterns by task type)
    - Prompt selection (optimizing prompts for specific task types)
    - Resource allocation (prioritizing resources for different task types)
    - Performance monitoring (measuring success rates by task category)
    
    The categorization helps organize tools in a semantically meaningful way and
    provides metadata for optimizing the system's handling of different tasks.
    When tools register with the system, they typically specify which task type
    they represent.
    
    Task types are roughly organized into these categories:
    - Text generation (COMPLETION, GENERATION, etc.)
    - Analysis and understanding (ANALYSIS, CLASSIFICATION, etc.)
    - Data manipulation (EXTRACTION, TRANSLATION, etc.)
    - System interaction (DATABASE, BROWSER, etc.)
    - Document operations (DOCUMENT_PROCESSING, etc.)
    
    Usage:
        ```python
        # Log with task type
        logger.info("Generating text completion", task_type=TaskType.COMPLETION)
        
        # Register tool with its task type
        @register_tool(name="generate_text", task_type=TaskType.COMPLETION)
        async def generate_text(prompt: str):
            ...  # Implementation
        ```
    """
    COMPLETION = "completion"
    CHAT = "chat"
    SUMMARIZATION = "summarization"
    EXTRACTION = "extraction"
    GENERATION = "generation"
    ANALYSIS = "analysis"
    CLASSIFICATION = "classification"
    TRANSLATION = "translation"
    QA = "qa"
    DATABASE = "database"
    QUERY = "query"
    BROWSER = "browser"
    DOWNLOAD = "download"
    UPLOAD = "upload"
    DOCUMENT_PROCESSING = "document_processing"
    DOCUMENT = "document"
    TEXT_ENHANCEMENT = "text_enhancement"
    NER = "ner"
    QUESTION_ANSWERING = "question_answering"
    QUALITY_ASSESSMENT = "quality_assessment"
    OCR = "ocr"
    TEXT_EXTRACTION = "text_extraction"
    CODE_EXECUTION = "code_execution"


class LogLevel(str, Enum):
    """Log levels."""
    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"


# Cost estimates for model pricing (in dollars per million tokens)
# This constant defines the estimated costs for different models, used for cost tracking and budgeting
# Values represent US dollars per million tokens, differentiated by input (prompt) and output (completion) costs
# These costs may change as providers update their pricing, and should be periodically reviewed
COST_PER_MILLION_TOKENS: Dict[str, Dict[str, float]] = {
    # OpenAI models
    "gpt-4o": {"input": 2.5, "output": 10.0},
    "gpt-4o-mini": {"input": 0.15, "output": 0.6},
    "gpt-4.1": {"input": 2.0, "output": 8.0},
    "gpt-4.1-mini": {"input": 0.40, "output": 1.60},
    "gpt-4.1-nano": {"input": 0.10, "output": 0.40},
    "o1-preview": {"input": 15.00, "output": 60.00},
    "o3-mini": {"input": 1.10, "output": 4.40},
    
    # Claude models
    "claude-3-7-sonnet-20250219": {"input": 3.0, "output": 15.0},
    "claude-3-5-haiku-20241022": {"input": 0.80, "output": 4.0},
    "claude-3-opus-20240229": {"input": 15.0, "output": 75.0},

    # DeepSeek models
    "deepseek-chat": {"input": 0.27, "output": 1.10},
    "deepseek-reasoner": {"input": 0.55, "output": 2.19},
    
    # Gemini models
    "gemini-2.0-flash-lite": {"input": 0.075, "output": 0.30},
    "gemini-2.0-flash": {"input": 0.35, "output": 1.05},
    "gemini-2.0-flash-thinking-exp-01-21": {"input": 0.0, "output": 0.0},
    "gemini-2.5-pro-preview-03-25": {"input": 1.25, "output": 10.0},

    # OpenRouter models
    "mistralai/mistral-nemo": {"input": 0.035, "output": 0.08},
    
    # Grok models (based on the provided documentation)
    "grok-3-latest": {"input": 3.0, "output": 15.0},
    "grok-3-fast-latest": {"input": 5.0, "output": 25.0},
    "grok-3-mini-latest": {"input": 0.30, "output": 0.50},
    "grok-3-mini-fast-latest": {"input": 0.60, "output": 4.0},
    
    # Ollama models (very low estimated costs since they run locally)
    "mix_77/gemma3-qat-tools:27b": {"input": 0.0001, "output": 0.0001},
    "JollyLlama/GLM-Z1-32B-0414-Q4_K_M:latest": {"input": 0.0001, "output": 0.0001},
    "llama3.2-vision:latest": {"input": 0.0001, "output": 0.0001},
}


# Default models by provider
# This mapping defines the recommended default model for each supported provider
# Used when no specific model is requested in API calls or tool invocations
# These defaults aim to balance quality, speed, and cost for general-purpose usage
DEFAULT_MODELS = {
    Provider.OPENAI: "gpt-4.1-mini",
    Provider.ANTHROPIC: "claude-3-5-haiku-20241022",
    Provider.DEEPSEEK: "deepseek-chat",
    Provider.GEMINI: "gemini-2.5-pro-preview-03-25",
    Provider.OPENROUTER: "mistralai/mistral-nemo",
    Provider.GROK: "grok-3-latest",
    Provider.OLLAMA: "mix_77/gemma3-qat-tools:27b"
}


# Emoji mapping by log type and action
# Provides visual indicators for different log types, components, and actions
# Used in rich logging output to improve readability and visual scanning
# Organized into sections: general status, components, tasks, and providers
EMOJI_MAP = {
    "start": "🚀",
    "success": "✅",
    "error": "❌",
    "warning": "⚠️",
    "info": "ℹ️",
    "debug": "🔍",
    "critical": "🔥",
    
    # Component-specific emojis
    "server": "🖥️",
    "cache": "💾",
    "provider": "🔌",
    "request": "📤",
    "response": "📥",
    "processing": "⚙️",
    "model": "🧠",
    "config": "🔧",
    "token": "🔢",
    "cost": "💰",
    "time": "⏱️",
    "tool": "🛠️",
    "tournament": "🏆",
    "cancel": "🛑",
    "database": "🗄️",
    "browser": "🌐",
    
    # Task-specific emojis
    "completion": "✍️",
    "chat": "💬",
    "summarization": "📝",
    "extraction": "🔍",
    "generation": "🎨",
    "analysis": "📊",
    "classification": "🏷️",
    "query": "🔍",
    "browser_automation": "🌐",
    "database_interactions": "🗄️",
    "download": "⬇️",
    "upload": "⬆️",
    "document_processing": "📄",
    "document": "📄",
    "translation": "🔄",
    "qa": "❓",
    
    # Provider-specific emojis
    Provider.OPENAI: "🟢",
    Provider.ANTHROPIC: "🟣",
    Provider.DEEPSEEK: "🟠", 
    Provider.GEMINI: "🔵",
    Provider.OPENROUTER: "🌐",
    Provider.OLLAMA: "🦙",
    Provider.GROK: "⚡"
}


# Base toolset categories for the server
BASE_TOOLSET_CATEGORIES = {
    "Completion": ["generate_completion", "stream_completion", "chat_completion", "multi_completion"],
    "Provider": ["get_provider_status", "list_models"],
    "Filesystem": ["read_file", "write_file", "edit_file", "list_directory", "directory_tree", "search_files"],
    "Optimization": ["estimate_cost", "compare_models", "recommend_model"],
    "Text Processing": ["run_ripgrep", "run_awk", "run_sed", "run_jq"],
    "Meta": ["get_tool_info", "get_llm_instructions", "get_tool_recommendations"],
    "Search": ["marqo_fused_search"],
    # Browser automation tools
    "Browser": [
        "browser_init", "browser_navigate", "browser_click", "browser_type", 
        "browser_screenshot", "browser_close", "browser_select", "browser_checkbox", 
        "browser_get_text", "browser_get_attributes", "browser_execute_javascript", 
        "browser_wait", "browser_back", "browser_forward", "browser_reload", 
        "browser_get_console_logs", "browser_download_file", "browser_upload_file", 
        "browser_pdf", "browser_tab_new", "browser_tab_close", "browser_tab_list", 
        "browser_tab_select"
    ],
    "Web Research": [
        "execute_web_workflow", "extract_structured_data_from_pages", 
        "find_and_download_pdfs", "multi_engine_search_summary",
        "monitor_web_data_points", "research_and_synthesize_report"
    ],
    # HTML to markdown tools
    "HTML Processing": [
        "clean_and_format_text_as_markdown", "detect_content_type", 
        "batch_format_texts", "optimize_markdown_formatting"
    ],
    # Extraction tools
    "Extraction": [
        "extract_json", "extract_table", "extract_key_value_pairs", 
        "extract_semantic_schema"
    ],
    # Cognitive and agent memory tools
    "Cognitive and Agent Memory": [
        "initialize_memory_system", "create_workflow", "update_workflow_status",
        "record_action_start", "record_action_completion", "get_action_details",
        "summarize_context_block", "add_action_dependency", "get_action_dependencies",
        "record_artifact", "record_thought", "store_memory", "get_memory_by_id",
        "hybrid_search_memories", "create_memory_link",
        "query_memories", "list_workflows", "get_workflow_details", "get_recent_actions",
        "get_artifacts", "get_artifact_by_id", "create_thought_chain", "get_thought_chain",
        "get_working_memory", "focus_memory", "optimize_working_memory",
        "save_cognitive_state", "load_cognitive_state", "get_workflow_context",
        "auto_update_focus", "promote_memory_level", "update_memory", "get_linked_memories",
        "consolidate_memories", "generate_reflection", "summarize_text",
        "delete_expired_memories", "compute_memory_statistics"
    ],
}
```
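
The two tables above are meant to be consumed directly by callers: `COST_PER_MILLION_TOKENS` holds dollars per million tokens split into input and output rates, and `DEFAULT_MODELS` supplies a fallback model when none is requested. A minimal sketch of that usage (the helper names below are illustrative, not part of the module; the server's own cost and model-selection logic lives elsewhere):

```python
from typing import Optional

from ultimate_mcp_server.constants import COST_PER_MILLION_TOKENS, DEFAULT_MODELS, Provider


def estimate_call_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Dollar cost of one call at the per-million-token rates above."""
    rates = COST_PER_MILLION_TOKENS.get(model, {"input": 0.0, "output": 0.0})
    return (input_tokens * rates["input"] + output_tokens * rates["output"]) / 1_000_000


def resolve_model(provider: Provider, requested: Optional[str] = None) -> str:
    """Use the requested model if given, otherwise the provider's default."""
    return requested or DEFAULT_MODELS[provider]


# e.g. estimate_call_cost("claude-3-5-haiku-20241022", 1_000, 300) -> 0.002
# and resolve_model(Provider.ANTHROPIC) -> "claude-3-5-haiku-20241022"
```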

--------------------------------------------------------------------------------
/tests/unit/test_tools.py:
--------------------------------------------------------------------------------

```python
"""Tests for the tool implementations."""
from typing import Any, Dict

import pytest
from pytest import MonkeyPatch

from ultimate_mcp_server.core.server import Gateway
from ultimate_mcp_server.tools.base import (
    BaseTool,
    register_tool,
    with_retry,
    with_tool_metrics,
)

# Remove the CompletionTools import as the class was deleted
# from ultimate_mcp_server.tools.completion import CompletionTools 
from ultimate_mcp_server.tools.document import DocumentTools
from ultimate_mcp_server.tools.extraction import ExtractionTools
from ultimate_mcp_server.utils import get_logger

logger = get_logger("test.tools")


class TestBaseTools:
    """Tests for the base tool classes and decorators."""
    
    def test_base_tool_init(self, mock_gateway: Gateway):
        """Test base tool initialization."""
        logger.info("Testing base tool initialization", emoji_key="test")
        
        # Create a minimal tool class
        class TestTool(BaseTool):
            tool_name = "test-tool"
            description = "Test tool"
            
            def _register_tools(self):
                # No tools to register
                pass
        
        # Initialize
        tool = TestTool(mock_gateway)
        
        # Check properties
        assert tool.tool_name == "test-tool"
        assert tool.description == "Test tool"
        assert tool.mcp == mock_gateway.mcp
        assert tool.logger is not None
        assert tool.metrics is not None
        
    @pytest.mark.asyncio
    async def test_with_tool_metrics(self):
        """Test the with_tool_metrics decorator."""
        logger.info("Testing with_tool_metrics decorator", emoji_key="test")
        
        # Create a tool class with metrics
        class TestTool(BaseTool):
            tool_name = "test-metrics-tool"
            description = "Test metrics tool"
            
            def _register_tools(self):
                pass
            
            @with_tool_metrics
            async def test_method(self, arg1, arg2=None, ctx=None):
                return {"result": arg1 + str(arg2 or "")}
        
        # Create a mock MCP server
        mock_mcp = type("MockMCP", (), {"tool": lambda: lambda x: x})
        mock_gateway = type("MockGateway", (), {"mcp": mock_mcp})
        
        # Initialize
        tool = TestTool(mock_gateway)
        
        # Call method
        result = await tool.test_method("test", "arg")
        
        # Check result
        assert result == {"result": "testarg"}
        
        # Check metrics
        assert tool.metrics.total_calls == 1
        assert tool.metrics.successful_calls == 1
        assert tool.metrics.failed_calls == 0
        
        # Test error case
        @with_tool_metrics
        async def failing_method(self, arg):
            raise ValueError("Test error")
            
        # Add to class
        TestTool.failing_method = failing_method
        
        # Call failing method
        with pytest.raises(ValueError):
            await tool.failing_method("test")
            
        # Check metrics
        assert tool.metrics.total_calls == 2
        assert tool.metrics.successful_calls == 1
        assert tool.metrics.failed_calls == 1
        
    @pytest.mark.asyncio
    async def test_with_retry(self):
        """Test the with_retry decorator."""
        logger.info("Testing with_retry decorator", emoji_key="test")
        
        # Track calls
        calls = []
        
        @with_retry(max_retries=2, retry_delay=0.1)
        async def flaky_function(succeed_after):
            calls.append(len(calls))
            if len(calls) < succeed_after:
                raise ValueError("Temporary error")
            return "success"
        
        # Should succeed on first try
        calls = []
        result = await flaky_function(1)
        assert result == "success"
        assert len(calls) == 1
        
        # Should fail first, succeed on retry
        calls = []
        result = await flaky_function(2)
        assert result == "success"
        assert len(calls) == 2
        
        # Should fail first two, succeed on second retry
        calls = []
        result = await flaky_function(3)
        assert result == "success"
        assert len(calls) == 3
        
        # Should fail too many times
        calls = []
        with pytest.raises(ValueError):
            await flaky_function(4)  # Will make 3 attempts (original + 2 retries)
        assert len(calls) == 3
    
    def test_register_tool(self, mock_gateway: Gateway):
        """Test the register_tool decorator."""
        logger.info("Testing register_tool decorator", emoji_key="test")
        
        # Create a mock MCP server with a tool registration function
        registered_tools = {}
        
        class MockMCP:
            def tool(self, name=None, description=None):
                def decorator(f):
                    registered_tools[name or f.__name__] = {
                        "function": f,
                        "description": description or f.__doc__
                    }
                    return f
                return decorator
        
        mock_mcp = MockMCP()
        mock_gateway.mcp = mock_mcp
        
        # Register a tool
        @register_tool(mock_gateway.mcp, name="test-tool", description="Test tool")
        async def test_tool(arg1, arg2=None):
            """Tool docstring."""
            return {"result": arg1 + str(arg2 or "")}
        
        # Check registration
        assert "test-tool" in registered_tools
        assert registered_tools["test-tool"]["description"] == "Test tool"
        
        # Register with defaults
        @register_tool(mock_gateway.mcp)
        async def another_tool(arg):
            """Another tool docstring."""
            return {"result": arg}
        
        # Check registration with defaults
        assert "another_tool" in registered_tools
        assert registered_tools["another_tool"]["description"] == "Another tool docstring."


# Comment out the entire TestCompletionTools class as it relies on the deleted class structure
# class TestCompletionTools:
#     """Tests for the completion tools."""
#     
#     @pytest.fixture
#     def mock_completion_tools(self, mock_gateway: Gateway) -> CompletionTools:
#         """Get mock completion tools."""
#         # This fixture is no longer valid as CompletionTools doesn't exist
#         # We would need to refactor tests to mock standalone functions
#         pass 
#         # return CompletionTools(mock_gateway)
#     
#     def test_init(self, mock_completion_tools: CompletionTools):
#         """Test initialization."""
#         logger.info("Testing completion tools initialization", emoji_key="test")
#         # This test is no longer valid
#         # assert mock_completion_tools.tool_name == "completion"
#         # assert mock_completion_tools.description is not None
#         pass
#         
#     async def test_generate_completion(self, mock_completion_tools: CompletionTools, mock_gateway: Gateway, monkeypatch: MonkeyPatch):
#         """Test generate_completion tool."""
#         logger.info("Testing generate_completion tool", emoji_key="test")
#         
#         # Mocking needs to target the standalone function now, not a method
#         # This test needs complete refactoring
#         pass
# 
#     async def test_chat_completion(self, mock_completion_tools: CompletionTools, mock_gateway: Gateway, monkeypatch: MonkeyPatch):
#         """Test chat_completion tool."""
#         logger.info("Testing chat_completion tool", emoji_key="test")
#         # This test needs complete refactoring
#         pass
# 
#     async def test_stream_completion(self, mock_completion_tools: CompletionTools, mock_gateway: Gateway, monkeypatch: MonkeyPatch):
#         """Test stream_completion tool."""
#         logger.info("Testing stream_completion tool", emoji_key="test")
#         # This test needs complete refactoring
#         pass
# 
#     async def test_multi_completion(self, mock_completion_tools: CompletionTools, mock_gateway: Gateway, monkeypatch: MonkeyPatch):
#         """Test multi_completion tool."""
#         logger.info("Testing multi_completion tool", emoji_key="test")
#         # This test needs complete refactoring
#         pass


class TestDocumentTools:
    """Tests for the document tools."""
    
    @pytest.fixture
    def mock_document_tools(self, mock_gateway: Gateway) -> DocumentTools:
        """Get mock document tools."""
        return DocumentTools(mock_gateway)
    
    def test_init(self, mock_document_tools: DocumentTools):
        """Test initialization."""
        logger.info("Testing document tools initialization", emoji_key="test")
        
        assert mock_document_tools.tool_name is not None
        assert mock_document_tools.description is not None
        
    async def test_chunk_document(self, mock_document_tools: DocumentTools, sample_document: str, monkeypatch: MonkeyPatch):
        """Test chunk_document tool."""
        logger.info("Testing chunk_document tool", emoji_key="test")
        
        # Create a simplified implementation for testing
        async def mock_chunk_document(document, chunk_size=1000, chunk_overlap=100, method="token", ctx=None):
            chunks = []
            # Simple paragraph chunking for testing
            for para in document.split("\n\n"):
                if para.strip():
                    chunks.append(para.strip())
            return {
                "chunks": chunks,
                "chunk_count": len(chunks),
                "method": method,
                "processing_time": 0.1
            }
        
        # Create a mock execute function for our BaseTool
        async def mock_execute(tool_name, params):
            # Call our mock implementation
            return await mock_chunk_document(**params)
        
        # Monkeypatch the tool execution using our new execute method
        monkeypatch.setattr(mock_document_tools, "execute", mock_execute)
        
        # Call the tool
        result = await mock_document_tools.execute("chunk_document", {
            "document": sample_document,
            "method": "paragraph"
        })
        
        # Check result
        assert isinstance(result, dict)
        assert "chunks" in result
        assert isinstance(result["chunks"], list)
        assert result["chunk_count"] > 0
        assert result["method"] == "paragraph"
        assert result["processing_time"] > 0


class TestExtractionTools:
    """Tests for the extraction tools."""
    
    @pytest.fixture
    def mock_extraction_tools(self, mock_gateway: Gateway) -> ExtractionTools:
        """Get mock extraction tools."""
        return ExtractionTools(mock_gateway)
    
    def test_init(self, mock_extraction_tools: ExtractionTools):
        """Test initialization."""
        logger.info("Testing extraction tools initialization", emoji_key="test")
        
        assert mock_extraction_tools.tool_name == "extraction"
        assert mock_extraction_tools.description is not None
        
    async def test_extract_json(self, mock_extraction_tools: ExtractionTools, sample_json_data: Dict[str, Any], monkeypatch: MonkeyPatch):
        """Test extract_json tool."""
        logger.info("Testing extract_json tool", emoji_key="test")
        
        # Mock the tool execution
        async def mock_extract_json(text, schema=None, provider="openai", model=None, max_attempts=3, ctx=None):
            return {
                "data": sample_json_data,
                "provider": provider,
                "model": model or "mock-model",
                "tokens": {
                    "input": 50,
                    "output": 30,
                    "total": 80
                },
                "cost": 0.01,
                "processing_time": 0.2
            }
        
        # Create a mock execute function for our BaseTool
        async def mock_execute(tool_name, params):
            # Call our mock implementation
            return await mock_extract_json(**params)
            
        # Monkeypatch the tool execution using our new execute method
        monkeypatch.setattr(mock_extraction_tools, "execute", mock_execute)
        
        # Call the tool
        result = await mock_extraction_tools.execute("extract_json", {
            "text": "Extract JSON from this: " + str(sample_json_data),
            "provider": "mock",
            "model": "mock-model"
        })
        
        # Check result
        assert isinstance(result, dict)
        assert "data" in result
        assert result["data"] == sample_json_data
        assert result["provider"] == "mock"
        assert result["model"] == "mock-model"
        assert "tokens" in result
        assert "cost" in result
        assert "processing_time" in result
```
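
`test_with_retry` pins down the retry contract: `max_retries=2` and `retry_delay=0.1` mean one original attempt plus up to two retries (three calls total) before the final exception propagates. For orientation, a decorator with exactly those semantics could be sketched as below; the project's actual `with_retry` in `ultimate_mcp_server.tools.base` may add backoff, exception filtering, or logging on top of this:

```python
import asyncio
import functools


def with_retry_sketch(max_retries: int = 2, retry_delay: float = 1.0):
    """Retry an async function: one original attempt plus up to ``max_retries``
    retries, sleeping ``retry_delay`` seconds between attempts and re-raising
    the last exception if every attempt fails."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            last_exc = None
            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception as exc:
                    last_exc = exc
                    if attempt < max_retries:
                        await asyncio.sleep(retry_delay)
            raise last_exc
        return wrapper
    return decorator
```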

--------------------------------------------------------------------------------
/examples/text_redline_demo.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
"""Comprehensive HTML‑redline demo that exercises **every** change type.

Run this file after you have installed/linked the Ultimate‑MCP‑Server package
in editable mode (``pip install -e .``) or added the repo root to ``PYTHONPATH``.
It generates a single HTML file (``./redline_outputs/comprehensive_redline_demo.html``)
that you can open in any browser to see insertions (blue), deletions (red),
move‑targets/sources (green), attribute changes (orange) and inline word‑level
diffs.
"""

from __future__ import annotations

import asyncio
import sys
from pathlib import Path
from typing import Dict, List

from rich import box
from rich.console import Console
from rich.markup import escape
from rich.table import Table

# ---------------------------------------------------------------------------
# 1.  Make sure we can import the Ultimate‑MCP‑Server package from source.
# ---------------------------------------------------------------------------
ROOT = Path(__file__).resolve().parents[1]  # repo root (../..)
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

# ---------------------------------------------------------------------------
# 2.  Project‑level imports (raise immediately if the dev env is broken)
# ---------------------------------------------------------------------------
from ultimate_mcp_server.exceptions import ToolError  # noqa: E402
from ultimate_mcp_server.tools.filesystem import write_file  # noqa: E402
from ultimate_mcp_server.tools.text_redline_tools import (  # noqa: E402
    create_html_redline,  # noqa: E402
)
from ultimate_mcp_server.utils import get_logger  # noqa: E402

# ---------------------------------------------------------------------------
# 3.  Logger / console helpers
# ---------------------------------------------------------------------------
LOGGER = get_logger("demo.comprehensive_redline")
CONSOLE = Console()

# ---------------------------------------------------------------------------
# 4.  Demo input documents (original vs. modified)
# ---------------------------------------------------------------------------
ORIGINAL_HTML = """<!DOCTYPE html>
<html>
<head>
    <title>Comprehensive Demo Document</title>
    <meta name="description" content="A document to demonstrate redlining features">
</head>
<body>
    <h1>Project Documentation</h1>
    
    <div class="intro">
        <p>This project documentation covers all aspects of the Alpha system implementation.</p>
        <p>Last updated on January 15, 2025</p>
    </div>

    <h2>Executive Summary</h2>
    <p>The Alpha system provides robust data processing capabilities for enterprise applications.</p>
    <p>This documentation serves as the primary reference for developers and system architects.</p>

    <h2>Architecture Overview</h2>
    <p>The system follows a microservices architecture with the following components:</p>
    <ul>
        <li>Data ingestion layer</li>
        <li>Processing engine</li>
        <li>Storage layer</li>
        <li>API gateway</li>
    </ul>

    <h2>Implementation Details</h2>
    <p>Implementation follows the standard protocol described in section 5.2 of the technical specifications.</p>
    <p>All components must pass integration tests before deployment.</p>
    
    <h2>Deployment Process</h2>
    <p>Deployment occurs in three phases:</p>
    <ol>
        <li>Development environment validation</li>
        <li>Staging environment testing</li>
        <li>Production rollout</li>
    </ol>
    <p>Each phase requires approval from the technical lead.</p>

    <h2>Security Considerations</h2>
    <p>All data must be encrypted during transfer and at rest.</p>
    <p>Authentication uses OAuth 2.0 with JWT tokens.</p>
    <p>Regular security audits are conducted quarterly.</p>

    <table border="1">
        <tr>
            <th>Component</th>
            <th>Responsible Team</th>
            <th>Status</th>
        </tr>
        <tr>
            <td>Data ingestion</td>
            <td>Data Engineering</td>
            <td>Complete</td>
        </tr>
        <tr>
            <td>Processing engine</td>
            <td>Core Systems</td>
            <td>In progress</td>
        </tr>
        <tr>
            <td>Storage layer</td>
            <td>Infrastructure</td>
            <td>Complete</td>
        </tr>
        <tr>
            <td>API gateway</td>
            <td>API Team</td>
            <td>Planning</td>
        </tr>
    </table>

    <h2>Appendix</h2>
    <p>For additional information, refer to the technical specifications document.</p>
    <p>Contact <a href="mailto:[email protected]">[email protected]</a> with any questions.</p>
</body>
</html>"""

MODIFIED_HTML = """<!DOCTYPE html>
<html>
<head>
    <title>Comprehensive Demo Document - 2025 Update</title>
    <meta name="description" content="A document to demonstrate all redlining features">
    <meta name="author" content="Documentation Team">
</head>
<body>
    <h1>Project Documentation</h1>
    
    <div class="intro">
        <p>This project documentation covers all aspects of the Alpha system implementation and integration.</p>
        <p>Last updated on May 5, 2025</p>
    </div>

    <h2>Appendix</h2>
    <p>For additional information, refer to the technical specifications document and API references.</p>
    <p>Contact <a href="mailto:[email protected]">[email protected]</a> with any questions.</p>

    <h2>Security Considerations</h2>
    <p>All data must be encrypted during transfer and at rest using AES-256 encryption.</p>
    <p>Authentication uses OAuth 2.0 with JWT tokens and optional two-factor authentication.</p>
    <p>Regular security audits are conducted quarterly by an independent security firm.</p>
    <p>Penetration testing is performed bi-annually.</p>

    <h2>Executive Summary</h2>
    <p>The Alpha system provides robust data processing capabilities for enterprise applications with enhanced performance.</p>
    <p>This documentation serves as the primary reference for developers, system architects, and operations teams.</p>
    <p>The system has been validated against ISO 27001 standards.</p>

    <h2>Architecture Overview</h2>
    <p>The system implements a cloud-native microservices architecture with the following components:</p>
    <ul>
        <li>Data ingestion layer with real-time processing</li>
        <li>Distributed processing engine</li>
        <li>Multi-region storage layer</li>
        <li>API gateway with rate limiting</li>
        <li>Monitoring and observability platform</li>
        <li>Disaster recovery system</li>
    </ul>

    <h2>Implementation Details</h2>
    <p>Implementation follows the enhanced protocol described in section 6.3 of the technical specifications.</p>
    <p>All components must pass integration and performance tests before deployment.</p>
    
    <table border="1">
        <tr>
            <th>Component</th>
            <th>Responsible Team</th>
            <th>Status</th>
            <th>Performance</th>
        </tr>
        <tr>
            <td>Data ingestion</td>
            <td>Data Engineering</td>
            <td>Complete</td>
            <td>Exceeds SLA</td>
        </tr>
        <tr>
            <td>Processing engine</td>
            <td>Core Systems</td>
            <td>Complete</td>
            <td>Meets SLA</td>
        </tr>
        <tr>
            <td>Storage layer</td>
            <td>Infrastructure</td>
            <td>Complete</td>
            <td>Meets SLA</td>
        </tr>
        <tr>
            <td>API gateway</td>
            <td>API Team</td>
            <td>Complete</td>
            <td>Exceeds SLA</td>
        </tr>
        <tr>
            <td>Monitoring platform</td>
            <td>DevOps</td>
            <td>Complete</td>
            <td>Meets SLA</td>
        </tr>
    </table>

    <h2>Scalability Considerations</h2>
    <p>The system is designed to scale horizontally with increasing load.</p>
    <p>Auto-scaling policies are configured for all compute resources.</p>
    <p>Database sharding is implemented for high-volume tenants.</p>
</body>
</html>"""

# ---------------------------------------------------------------------------
# 5.  Human‑readable change checklist (for demo output only)
# ---------------------------------------------------------------------------
CHANGE_SUMMARY: Dict[str, List[str]] = {
    "insertions": [
        "New <meta author> tag",
        "'and integration' added to intro paragraph",
        "AES‑256 wording added to encryption para",
        "Two‑factor authentication mention added",
        "Independent security firm phrase added",
        "Entire penetration‑testing paragraph added",
        "'with enhanced performance' in exec summary",
        "Audience now includes operations teams",
        "ISO‑27001 paragraph added",
        "'cloud‑native' adjective added",
        "Real‑time processing detail added",
        "'Distributed' processing engine detail",
        "Multi‑region storage detail",
        "Rate‑limiting mention in API gateway",
        "Two new architecture components",
        "Protocol reference bumped to 6.3",
        "Performance tests requirement added",
        "New PERFORMANCE column in table",
        "New Monitoring‑platform row",
        "Whole SCALABILITY section added",
    ],
    "deletions": [
        "API‑gateway status 'Planning' removed",
        "Deployment‑process section removed",
    ],
    "moves": [
        "Appendix moved before Security section",
        "Security section moved before Exec‑Summary",
    ],
    "updates": [
        "<title> suffixed with '2025 Update'",
        "Meta description tweaked",
        "Updated date to 5 May 2025",
        "Support e‑mail address changed",
        "Processing‑engine status updated",
    ],
}

# ---------------------------------------------------------------------------
# 6.  Async helper running the diff + reporting
# ---------------------------------------------------------------------------
OUTPUT_DIR = Path(__file__).with_suffix("").parent / "redline_outputs"
MARKDOWN_PATH = OUTPUT_DIR / "detected_redline_differences.md"


async def generate_redline() -> None:
    CONSOLE.print("\n[bold blue]Generating HTML redline…[/bold blue]")
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    try:
        result = await create_html_redline(
            original_html=ORIGINAL_HTML,
            modified_html=MODIFIED_HTML,
            detect_moves=True,
            include_css=True,
            add_navigation=True,
            output_format="html",
            generate_markdown=True,
            markdown_path=str(MARKDOWN_PATH),
        )
    except Exception as exc:  # demo only
        LOGGER.error("Failed to generate redline", exc_info=True)
        CONSOLE.print(f"[red bold]Error:[/red bold] {escape(str(exc))}")
        return

    # ── Rich stats table ────────────────────────────────────────────────────────────
    stats_tbl = Table(title="Redline statistics", box=box.ROUNDED)
    stats_tbl.add_column("Metric", style="cyan")
    stats_tbl.add_column("Value", style="magenta")
    for k, v in result["stats"].items():
        stats_tbl.add_row(k.replace("_", " ").title(), str(v))
    stats_tbl.add_row("Processing time", f"{result['processing_time']:.3f}s")
    CONSOLE.print(stats_tbl)

    # ── manual checklist ────────────────────────────────────────────────────────────
    CONSOLE.print("\n[bold green]Manual checklist of expected changes[/bold green]")
    for cat, items in CHANGE_SUMMARY.items():
        CONSOLE.print(f"[cyan]{cat.title()}[/cyan] ({len(items)})")
        for idx, txt in enumerate(items, 1):
            CONSOLE.print(f"   {idx:>2}. {txt}")

    # ── write HTML diff ─────────────────────────────────────────────────────────────
    html_path = OUTPUT_DIR / "comprehensive_redline_demo.html"
    try:
        await write_file(path=str(html_path), content=result["redline_html"])
    except (ToolError, Exception) as exc:  # demo only
        LOGGER.warning("Unable to save HTML", exc_info=True)
        CONSOLE.print(f"\n[bold red]Warning:[/bold red] Could not save HTML — {exc}")
    else:
        LOGGER.info("Saved redline to %s", html_path)
        CONSOLE.print(f"\n[green]HTML written to:[/green] {html_path}")

    # ── ensure Markdown file exists (tool usually writes it already) ────────────────
    if not MARKDOWN_PATH.is_file() and "markdown_summary" in result:
        MARKDOWN_PATH.write_text(result["markdown_summary"], encoding="utf-8")
    if MARKDOWN_PATH.is_file():
        CONSOLE.print(f"[green]Markdown summary:[/green] {MARKDOWN_PATH}")


# ───────────────────────────── 7. entrypoint ────────────────────────────────────────
async def _amain() -> int:
    CONSOLE.rule("[white on blue]📝  Comprehensive Text-Redline Demo  📝")
    await generate_redline()
    CONSOLE.rule("Complete", style="green")
    return 0


if __name__ == "__main__":
    sys.exit(asyncio.run(_amain()))

```

--------------------------------------------------------------------------------
/ultimate_mcp_server/services/knowledge_base/manager.py:
--------------------------------------------------------------------------------

```python
"""Knowledge base manager for RAG functionality."""
import time
from typing import Any, Dict, List, Optional

from ultimate_mcp_server.services.vector import VectorDatabaseService
from ultimate_mcp_server.utils import get_logger

logger = get_logger(__name__)


class KnowledgeBaseManager:
    """
    Manager for creating and maintaining knowledge bases for RAG applications.
    
    The KnowledgeBaseManager provides a high-level interface for working with vector 
    databases as knowledge bases for Retrieval-Augmented Generation (RAG) systems.
    It abstracts the complexities of vector database operations, focusing on the 
    domain-specific needs of knowledge management for AI applications.
    
    Key Features:
    - Knowledge base lifecycle management (create, delete, list, get)
    - Document ingestion with metadata support
    - Vector embedding management for semantic search
    - Document chunking and processing
    - Persistence and durability guarantees
    - Metadata tracking for knowledge base statistics
    
    Architecture:
    The manager sits between RAG applications and the underlying vector database,
    providing domain-specific operations while delegating storage and embedding
    to specialized services. It primarily interacts with:
    1. Vector Database Service - for persistent storage of embeddings and documents
    2. Embedding Service - for converting text to vector representations
    3. Text Chunking Service - for breaking documents into optimal retrieval units
    
    Technical Characteristics:
    - Asynchronous API for high throughput in server environments
    - Thread-safe operations for concurrent access
    - Consistent error handling and logging
    - Idempotent operations where possible
    - Transactional guarantees for critical operations
    
    This service is typically accessed through the singleton get_knowledge_base_manager()
    function, which ensures a single instance is shared across the application.
    
    Example Usage:
    ```python
    # Get the manager
    kb_manager = get_knowledge_base_manager()
    
    # Create a new knowledge base
    await kb_manager.create_knowledge_base(
        name="company_policies",
        description="Corporate policy documents and guidelines"
    )
    
    # Add documents with metadata
    await kb_manager.add_documents(
        knowledge_base_name="company_policies",
        documents=[
            "All employees must complete annual security training.",
            "Remote work is available for eligible positions with manager approval."
        ],
        metadatas=[
            {"source": "security_policy.pdf", "category": "security", "page": 12},
            {"source": "hr_handbook.pdf", "category": "remote_work", "page": 45}
        ],
        chunk_size=500,
        chunk_method="semantic"
    )
    
    # List available knowledge bases
    kb_list = await kb_manager.list_knowledge_bases()
    print(f"Found {kb_list['count']} knowledge bases")
    
    # Get details about a specific knowledge base
    kb_info = await kb_manager.get_knowledge_base("company_policies")
    doc_count = kb_info.get("metadata", {}).get("doc_count", 0)
    print(f"Knowledge base contains {doc_count} document chunks")
    ```
    """
    
    def __init__(self, vector_service: VectorDatabaseService):
        """Initialize the knowledge base manager.
        
        Args:
            vector_service: Vector database service for storing embeddings
        """
        self.vector_service = vector_service
        logger.info("Knowledge base manager initialized", extra={"emoji_key": "success"})
    
    async def create_knowledge_base(
        self,
        name: str,
        description: Optional[str] = None,
        embedding_model: Optional[str] = None,
        overwrite: bool = False
    ) -> Dict[str, Any]:
        """Create a new knowledge base.
        
        Args:
            name: Knowledge base name
            description: Optional description
            embedding_model: Optional embedding model name
            overwrite: Whether to overwrite existing knowledge base
            
        Returns:
            Knowledge base metadata
        """
        # Check if knowledge base already exists
        collections = await self.vector_service.list_collections()
        
        if name in collections and not overwrite:
            logger.warning(
                f"Knowledge base '{name}' already exists", 
                extra={"emoji_key": "warning"}
            )
            return {"status": "exists", "name": name}
        
        # Create new collection for knowledge base
        metadata = {
            "type": "knowledge_base",
            "description": description or "",
            "created_at": time.time(),
            "doc_count": 0
        }
        
        # Only add embedding_model if not None (to avoid ChromaDB errors)
        if embedding_model is not None:
            metadata["embedding_model"] = embedding_model
            
        logger.debug(f"Creating knowledge base with metadata: {metadata}")
        
        # Ensure any existing collection is deleted first
        if overwrite:
            try:
                # Force delete any existing collection
                await self.vector_service.delete_collection(name)
                logger.debug(f"Force deleted existing collection '{name}' for clean creation")
                # Add a small delay to ensure deletion completes
                import asyncio
                await asyncio.sleep(0.2)  
            except Exception as e:
                logger.debug(f"Error during force deletion: {str(e)}")
        
        try:
            await self.vector_service.create_collection(name, metadata=metadata)
            
            logger.info(
                f"Created knowledge base '{name}'", 
                extra={"emoji_key": "success"}
            )
            
            return {
                "status": "created",
                "name": name,
                "metadata": metadata
            }
        except Exception as e:
            logger.error(
                f"Failed to create knowledge base '{name}': {str(e)}", 
                extra={"emoji_key": "error"}
            )
            raise ValueError(f"Failed to create knowledge base: {str(e)}") from e
    
    async def delete_knowledge_base(self, name: str) -> Dict[str, Any]:
        """Delete a knowledge base.
        
        Args:
            name: Knowledge base name
            
        Returns:
            Deletion status
        """
        # Check if knowledge base exists
        collections = await self.vector_service.list_collections()
        
        if name not in collections:
            logger.warning(
                f"Knowledge base '{name}' not found", 
                extra={"emoji_key": "warning"}
            )
            return {"status": "not_found", "name": name}
        
        # Delete collection
        await self.vector_service.delete_collection(name)
        
        logger.info(
            f"Deleted knowledge base '{name}'", 
            extra={"emoji_key": "success"}
        )
        
        return {
            "status": "deleted",
            "name": name
        }
    
    async def list_knowledge_bases(self) -> Dict[str, Any]:
        """List all knowledge bases.
        
        Returns:
            List of knowledge bases with metadata
        """
        collection_names = await self.vector_service.list_collections()
        kb_list = []
        
        for name in collection_names:
            try:
                metadata = await self.vector_service.get_collection_metadata(name)
                # Only include collections that are knowledge bases
                if metadata and metadata.get("type") == "knowledge_base":
                    # Create a simple dict with name and metadata
                    kb = {
                        "name": name,
                        "metadata": metadata
                    }
                    kb_list.append(kb)
            except Exception as e:
                logger.error(
                    f"Error getting metadata for collection '{name}': {str(e)}", 
                    extra={"emoji_key": "error"}
                )
        
        return {
            "count": len(kb_list),
            "knowledge_bases": kb_list
        }
    
    async def get_knowledge_base(self, name: str) -> Dict[str, Any]:
        """Get knowledge base metadata.
        
        Args:
            name: Knowledge base name
            
        Returns:
            Knowledge base metadata
        """
        # Check if knowledge base exists
        collections = await self.vector_service.list_collections()
        
        if name not in collections:
            logger.warning(
                f"Knowledge base '{name}' not found", 
                extra={"emoji_key": "warning"}
            )
            return {"status": "not_found", "name": name}
        
        # Get metadata
        metadata = await self.vector_service.get_collection_metadata(name)
        
        if not metadata or metadata.get("type") != "knowledge_base":
            logger.warning(
                f"Collection '{name}' is not a knowledge base", 
                extra={"emoji_key": "warning"}
            )
            return {"status": "not_knowledge_base", "name": name}
        
        return {
            "status": "found",
            "name": name,
            "metadata": metadata
        }
    
    async def add_documents(
        self,
        knowledge_base_name: str,
        documents: List[str],
        metadatas: Optional[List[Dict[str, Any]]] = None,
        ids: Optional[List[str]] = None,
        embedding_model: Optional[str] = None,
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        chunk_method: str = "semantic"
    ) -> Dict[str, Any]:
        """Add documents to a knowledge base.
        
        Args:
            knowledge_base_name: Knowledge base name
            documents: List of document texts
            metadatas: Optional list of document metadata
            ids: Optional list of document IDs
            embedding_model: Optional embedding model name
            chunk_size: Chunk size for document processing
            chunk_overlap: Chunk overlap for document processing
            chunk_method: Chunking method (token, semantic, etc.)
            
        Returns:
            Document addition status
        """
        logger.debug(f"DEBUG: Adding documents to knowledge base '{knowledge_base_name}'")
        logger.debug(f"DEBUG: Document count: {len(documents)}")
        logger.debug(f"DEBUG: First document sample: {documents[0][:100]}...")
        logger.debug(f"DEBUG: Metadatas: {metadatas[:2] if metadatas else None}")
        logger.debug(f"DEBUG: Chunk settings - size: {chunk_size}, overlap: {chunk_overlap}, method: {chunk_method}")
        
        # Check if knowledge base exists
        kb_info = await self.get_knowledge_base(knowledge_base_name)
        
        if kb_info["status"] != "found":
            logger.warning(
                f"Knowledge base '{knowledge_base_name}' not found", 
                extra={"emoji_key": "warning"}
            )
            return {"status": "not_found", "name": knowledge_base_name}
            
        try:
            # Add documents to vector store
            doc_ids = await self.vector_service.add_texts(
                collection_name=knowledge_base_name,
                texts=documents,
                metadatas=metadatas,
                ids=ids,
                embedding_model=embedding_model
            )
            
            # Update document count in metadata
            current_metadata = await self.vector_service.get_collection_metadata(knowledge_base_name)
            doc_count = current_metadata.get("doc_count", 0) + len(documents)
            
            # Prepare metadata updates
            metadata_updates = {"doc_count": doc_count}
            
            # Store embedding model in metadata if provided (for consistent retrieval)
            if embedding_model:
                metadata_updates["embedding_model"] = embedding_model
            
            # Update metadata
            await self.vector_service.update_collection_metadata(
                name=knowledge_base_name,
                metadata=metadata_updates
            )
            
            logger.info(
                f"Added {len(documents)} documents to knowledge base '{knowledge_base_name}'", 
                extra={"emoji_key": "success"}
            )
            
            return {
                "status": "success",
                "name": knowledge_base_name,
                "added_count": len(documents),
                "ids": doc_ids
            }
        except Exception as e:
            logger.error(
                f"Error adding documents to knowledge base '{knowledge_base_name}': {str(e)}", 
                extra={"emoji_key": "error"}
            )
            raise 
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/cli/helpers.py:
--------------------------------------------------------------------------------

```python
"""Helper functions for the Ultimate MCP Server CLI."""
import json
import sys
from typing import Any, Dict, List, Optional, Union

from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
from rich.syntax import Syntax
from rich.table import Table

from ultimate_mcp_server.config import get_env
from ultimate_mcp_server.constants import COST_PER_MILLION_TOKENS, Provider
from ultimate_mcp_server.utils import get_logger

logger = get_logger(__name__)
console = Console(file=sys.stderr)


def print_cost_table() -> None:
    """Display pricing information for all supported LLM models.
    
    This function creates and prints a formatted table showing the cost per million tokens
    for various LLM models across all supported providers (OpenAI, Anthropic, DeepSeek, etc.).
    The table separates input token costs from output token costs, as these are typically
    billed at different rates.
    
    Models are grouped by provider and sorted alphabetically for easy reference.
    This information is useful for cost planning, provider comparison, and
    understanding the financial implications of different model choices.
    """
    # Create table
    table = Table(title="Model Cost Per Million Tokens")
    table.add_column("Provider", style="cyan")
    table.add_column("Model", style="blue")
    table.add_column("Input ($/M)", style="green")
    table.add_column("Output ($/M)", style="yellow")
    
    # Group models by provider
    models_by_provider = {}
    for model, costs in COST_PER_MILLION_TOKENS.items():
        # Determine provider
        provider = None
        if "gpt" in model:
            provider = Provider.OPENAI.value
        elif "claude" in model:
            provider = Provider.ANTHROPIC.value
        elif "deepseek" in model:
            provider = Provider.DEEPSEEK.value
        elif "gemini" in model:
            provider = Provider.GEMINI.value
        else:
            provider = "other"
        
        if provider not in models_by_provider:
            models_by_provider[provider] = []
        
        models_by_provider[provider].append((model, costs))
    
    # Add rows for each provider's models
    for provider in sorted(models_by_provider.keys()):
        models = sorted(models_by_provider[provider], key=lambda x: x[0])
        
        for model, costs in models:
            table.add_row(
                provider,
                model,
                f"${costs['input']:.3f}",
                f"${costs['output']:.3f}"
            )
    
    # Print table
    console.print(table)


def format_tokens(tokens: int) -> str:
    """Format token count with thousands separator for better readability.
    
    Converts raw token counts (e.g., 1234567) into a more human-readable format
    with commas as thousand separators (e.g., "1,234,567"). This improves
    the readability of token usage statistics in CLI outputs and reports.
    
    Args:
        tokens: Raw token count as an integer
        
    Returns:
        Formatted string with thousand separators (e.g., "1,234,567")
    """
    return f"{tokens:,}"


def format_duration(seconds: float) -> str:
    """Format time duration in a human-friendly, adaptive format.
    
    Converts raw seconds into a more readable format, automatically selecting
    the appropriate unit based on the magnitude:
    - Milliseconds for durations under 0.1 seconds
    - Seconds with decimal precision for durations under 60 seconds
    - Minutes and seconds for longer durations
    
    This provides intuitive time displays in benchmarks and performance reports.
    
    Args:
        seconds: Duration in seconds (can be fractional)
        
    Returns:
        Formatted string like "50ms", "2.45s", or "1m 30.5s" depending on duration
    """
    if seconds < 0.1:
        return f"{seconds * 1000:.0f}ms"
    elif seconds < 60:
        return f"{seconds:.2f}s"
    else:
        minutes = int(seconds // 60)
        remaining_seconds = seconds % 60
        return f"{minutes}m {remaining_seconds:.1f}s"


def save_output_to_file(text: str, file_path: str, mode: str = "w") -> bool:
    """Write text content to a file with error handling and user feedback.
    
    This utility function safely writes text to a file, handling encoding
    and providing user feedback on success or failure. It's commonly used
    to save LLM outputs, generated code, or other text data for later use.
    
    Args:
        text: The string content to write to the file
        file_path: Target file path (absolute or relative to current directory)
        mode: File open mode - "w" for overwrite or "a" for append to existing content
        
    Returns:
        Boolean indicating success (True) or failure (False)
    """
    try:
        with open(file_path, mode, encoding="utf-8") as f:
            f.write(text)
        
        console.print(f"[green]Output saved to {file_path}[/green]")
        return True
    except Exception as e:
        console.print(f"[red]Error saving output: {str(e)}[/red]")
        return False


def load_file_content(file_path: str) -> Optional[str]:
    """Read and return the entire contents of a text file.
    
    This utility function safely reads text from a file with proper UTF-8 encoding,
    handling any errors that may occur during the process. It's useful for loading
    prompts, templates, or other text files needed for LLM operations.
    
    Args:
        file_path: Path to the file to read (absolute or relative to current directory)
        
    Returns:
        The file's contents as a string if successful, or None if an error occurred
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            return f.read()
    except Exception as e:
        console.print(f"[red]Error loading file: {str(e)}[/red]")
        return None


def print_markdown(markdown_text: str) -> None:
    """Display Markdown content with proper formatting and styling.
    
    Renders Markdown text with appropriate styling (headings, bold, italic, 
    lists, code blocks, etc.) in the terminal using Rich's Markdown renderer.
    This provides a more readable and visually appealing output for 
    documentation, examples, or LLM responses that use Markdown formatting.
    
    Args:
        markdown_text: Raw Markdown-formatted text to render
    """
    md = Markdown(markdown_text)
    console.print(md)


def print_json(json_data: Union[Dict, List]) -> None:
    """Display JSON data with syntax highlighting and proper formatting.
    
    Converts a Python dictionary or list into a properly indented JSON string
    and displays it with syntax highlighting for improved readability.
    This is useful for displaying API responses, configuration data,
    or other structured data in a human-friendly format.
    
    Args:
        json_data: Python dictionary or list to be displayed as formatted JSON
    """
    json_str = json.dumps(json_data, indent=2)
    syntax = Syntax(json_str, "json", theme="monokai", word_wrap=True)
    console.print(syntax)


def print_code(code: str, language: str = "python") -> None:
    """Display source code with syntax highlighting and line numbers.
    
    Renders code with proper syntax highlighting based on the specified language,
    along with line numbers for easier reference. This improves readability
    when displaying code examples, LLM-generated code, or code snippets
    from files.
    
    Args:
        code: Source code text to display
        language: Programming language for syntax highlighting (e.g., "python",
                 "javascript", "rust", "sql", etc.)
    """
    syntax = Syntax(code, language, theme="monokai", line_numbers=True)
    console.print(syntax)


def print_model_comparison(
    provider: str,
    models: List[str],
    metrics: List[Dict[str, Any]]
) -> None:
    """Display a side-by-side comparison of multiple models from the same provider.
    
    Creates a formatted table comparing performance metrics for different models
    from the same LLM provider. This is useful for identifying the optimal model
    for specific use cases based on response time, throughput, and cost metrics.
    
    The comparison includes:
    - Response time (formatted appropriately for the magnitude)
    - Processing speed (tokens per second)
    - Cost per request
    - Total token usage
    
    Args:
        provider: Name of the LLM provider (e.g., "openai", "anthropic")
        models: List of model identifiers to compare
        metrics: List of dictionaries containing performance metrics for each model,
                with keys like "time", "tokens_per_second", "cost", "total_tokens"
    """
    # Create table
    table = Table(title=f"{provider.capitalize()} Model Comparison")
    table.add_column("Model", style="cyan")
    table.add_column("Response Time", style="green")
    table.add_column("Tokens/Sec", style="yellow")
    table.add_column("Cost", style="magenta")
    table.add_column("Total Tokens", style="dim")
    
    # Add rows for each model
    for model, metric in zip(models, metrics, strict=False):
        table.add_row(
            model,
            format_duration(metric.get("time", 0)),
            f"{metric.get('tokens_per_second', 0):.1f}",
            f"${metric.get('cost', 0):.6f}",
            format_tokens(metric.get("total_tokens", 0))
        )
    
    # Print table
    console.print(table)
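# Example call sketch for print_model_comparison (metric values are hypothetical):
#   print_model_comparison(
#       provider="openai",
#       models=["gpt-4o-mini", "gpt-4.1-mini"],
#       metrics=[
#           {"time": 1.82, "tokens_per_second": 95.3, "cost": 0.000214, "total_tokens": 312},
#           {"time": 2.41, "tokens_per_second": 71.8, "cost": 0.000389, "total_tokens": 298},
#       ],
#   )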


def print_environment_info() -> None:
    """Display current environment configuration for diagnostics.
    
    Creates a formatted table showing important environment variables and their
    current values, with a focus on API keys, logging configuration, and cache settings.
    This is useful for troubleshooting and verifying that the environment is
    configured correctly before running the server or other commands.
    
    The output includes:
    - Status of API keys for each supported provider (set or not set)
    - Logging level configuration
    - Cache settings
    - Other relevant environment variables
    """
    # Create table
    table = Table(title="Environment Information")
    table.add_column("Setting", style="cyan")
    table.add_column("Value", style="green")
    
    # Add API key info
    for provider in [p.value for p in Provider]:
        env_var = f"{provider.upper()}_API_KEY"
        has_key = bool(get_env(env_var))
        table.add_row(env_var, "✅ Set" if has_key else "❌ Not set")
    
    # Add other environment variables
    for var in ["LOG_LEVEL", "CACHE_ENABLED", "CACHE_DIR"]:
        value = get_env(var, "Not set")
        table.add_row(var, value)
    
    # Print table
    console.print(table)


def print_examples() -> None:
    """Display common usage examples for the CLI commands.
    
    Shows a set of syntax-highlighted example commands demonstrating how to use
    the most common features of the Ultimate MCP Server CLI. This helps users
    quickly learn the command patterns and options available without having to
    consult the full documentation.
    
    Examples cover:
    - Starting the server
    - Listing and testing providers
    - Generating completions (with and without streaming)
    - Running benchmarks
    - Managing the cache
    """
    examples = """
# Run the server
ultimate-mcp-server run --host 0.0.0.0 --port 8013

# List available providers
ultimate-mcp-server providers --check

# Test a provider
ultimate-mcp-server test openai --model gpt-4.1-mini --prompt "Hello, world!"

# Generate a completion
ultimate-mcp-server complete --provider anthropic --model claude-3-5-haiku-20241022 --prompt "Explain quantum computing"

# Stream a completion
ultimate-mcp-server complete --provider openai --stream --prompt "Write a poem about AI"

# Run benchmarks
ultimate-mcp-server benchmark --providers openai anthropic --runs 3

# Check cache status
ultimate-mcp-server cache --status

# Clear cache
ultimate-mcp-server cache --clear
"""
    
    syntax = Syntax(examples, "bash", theme="monokai", word_wrap=True)
    console.print(Panel(syntax, title="CLI Examples", border_style="cyan"))


def confirm_action(message: str, default: bool = False) -> bool:
    """Prompt the user for confirmation before performing a potentially destructive action.
    
    Displays a yes/no prompt with the specified message and waits for user input.
    This is used to confirm potentially destructive operations like clearing the cache
    or deleting files to prevent accidental data loss.
    
    Args:
        message: The question or confirmation message to display to the user
        default: The default response if the user just presses Enter without typing
                 anything (True for yes, False for no)
        
    Returns:
        Boolean indicating whether the user confirmed (True) or canceled (False) the action
    """
    default_str = "Y/n" if default else "y/N"
    response = input(f"{message} [{default_str}]: ")
    
    if not response:
        return default
    
    return response.lower() in ["y", "yes"]
```

--------------------------------------------------------------------------------
/examples/measure_model_speeds.py:
--------------------------------------------------------------------------------

```python
import argparse
import asyncio
import json
import os
import sys
import time
from typing import Any, Dict, List

from rich.console import Console
from rich.progress import BarColumn, Progress, SpinnerColumn, TextColumn, TimeElapsedColumn
from rich.table import Table

# --- Add project root to sys.path ---
script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(script_dir)
sys.path.insert(0, project_root)
# -------------------------------------

from ultimate_mcp_server.constants import (  # noqa: E402
    COST_PER_MILLION_TOKENS,
    Provider,
)
from ultimate_mcp_server.exceptions import (  # noqa: E402
    ProviderError,
    ToolError,
)
from ultimate_mcp_server.tools.completion import generate_completion  # noqa: E402
from ultimate_mcp_server.utils import get_logger  # noqa: E402
from ultimate_mcp_server.utils.display import CostTracker  # noqa: E402

# Use Rich Console for better output
console = Console()
logger = get_logger("measure_model_speeds")

# --- Configuration ---
DEFAULT_PROMPT = (
    "Explain the concept of Transfer Learning in Machine Learning in about 300 words. "
    "Detail its primary benefits, common use cases across different domains (like NLP and Computer Vision), "
    "and mention potential challenges or limitations when applying it."
)
DEFAULT_OUTPUT_FILENAME = "empirically_measured_model_speeds.json"
# Exclude models known not to work well with simple completion or require specific setup
EXCLUDED_MODELS_BY_DEFAULT = [
    "mistralai/mistral-nemo", # Often requires specific setup/endpoint
    # Add others if they consistently cause issues in this simple test
]
DEFAULT_MODELS_TO_TEST = [
    m for m in COST_PER_MILLION_TOKENS.keys() if m not in EXCLUDED_MODELS_BY_DEFAULT
]

# Provider/model extraction logic: split an optional provider prefix and infer providers from model-name patterns
def extract_provider_model(model_identifier: str) -> tuple[str | None, str]:
    """Extracts provider and model name, always returning the model name without the prefix."""
    model_identifier = model_identifier.strip()
    provider: str | None = None
    model_name_only: str = model_identifier # Start with the original identifier

    # 1. Check for explicit provider prefix (using /)
    known_providers = [p.value for p in Provider] # Get list of known providers
    if '/' in model_identifier:
        parts = model_identifier.split('/', 1)
        # Patch: treat 'mistralai/...' identifiers (e.g., 'mistralai/mistral-nemo') as OpenRouter models
        if model_identifier.startswith('mistralai/'):
            provider = Provider.OPENROUTER.value
            model_name_only = model_identifier
        elif len(parts) == 2 and parts[0] in known_providers and parts[1]:
            provider = parts[0]
            model_name_only = parts[1]
            # Handle potential nested OpenRouter names like openrouter/mistralai/mistral-7b
            # The current split('/', 1) already achieves this.
        else:
            # It has a slash, but doesn't match known provider format
            logger.warning(f"Invalid or unknown provider prefix in '{model_identifier}'. Cannot extract provider reliably.")
            return None, model_identifier # Return original identifier if prefix is invalid

    # 2. Infer provider from model name pattern if no prefix was found
    if provider is None:
        if model_identifier.startswith('claude-'):
            provider = Provider.ANTHROPIC.value
        elif model_identifier.startswith('gemini-'):
            provider = Provider.GEMINI.value
        elif model_identifier.startswith('deepseek-'):
            provider = Provider.DEEPSEEK.value
        elif model_identifier.startswith('grok-'): # Added Grok
            provider = Provider.GROK.value
        # Add other inferences if necessary

        # Assume OpenAI if it looks like an OpenAI model (common short names or gpt- prefix)
        openai_short_names = [
            'gpt-4o', 'gpt-4o-mini', 'gpt-4.1', 'gpt-4.1-mini', 'gpt-4.1-nano',
            'o1-preview', 'o3-mini', 'gpt-3.5-turbo'
        ]
        if provider is None and (model_identifier in openai_short_names or model_identifier.startswith('gpt-')):
            provider = Provider.OPENAI.value
        
        # If provider was inferred, model_name_only is already correct (the original identifier)

    # 3. Return provider and model_name_only (which has prefix removed if found)
    if provider:
        # Log the extracted provider and model name for clarity during debugging
        logger.debug(f"Extracted Provider: {provider}, Model Name: {model_name_only} from Input: {model_identifier}")
        return provider, model_name_only
    else:
        # If provider couldn't be determined even after inference
        logger.error(f"Could not determine provider for '{model_identifier}'. Skipping measurement.")
        return None, model_identifier # Return original identifier as model_name if provider is unknown
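# Illustrative expectations for extract_provider_model (identifiers are examples, not an exhaustive list):
#   "openai/gpt-4o-mini"          -> ("openai", "gpt-4o-mini")                      # explicit known prefix
#   "claude-3-5-haiku-20241022"   -> ("anthropic", "claude-3-5-haiku-20241022")     # inferred from name pattern
#   "unknownvendor/some-model"    -> (None, "unknownvendor/some-model")             # unrecognized prefix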

async def measure_speed(model_identifier: str, prompt: str, tracker: CostTracker) -> Dict[str, Any]:
    """Measures the completion speed for a single model by calling the tool directly."""
    result_data: Dict[str, Any] = {}
    
    # Extract provider and model name using the helper
    provider, model_name = extract_provider_model(model_identifier)

    if provider is None:
        # Skip if provider could not be determined
        return {"error": f"Could not determine provider for '{model_identifier}'", "error_code": "INVALID_PARAMETER"}

    # logger.info(f"Testing model {provider}/{model_name}...", emoji_key="timer") # Progress bar shows this

    try:
        start_time = time.monotonic()
        # Call generate_completion with explicit provider and model name
        result = await generate_completion(
            provider=provider,       # Pass the determined provider
            model=model_name,        # Pass the model name (without prefix)
            prompt=prompt,
            # Optional: max_tokens=500
        )
        end_time = time.monotonic()

        if result and isinstance(result, dict) and result.get("success"):
            # Track cost for successful calls
            tracker.add_call(result)

            processing_time = result.get("processing_time")
            if processing_time is None:
                processing_time = end_time - start_time

            output_tokens = result.get("tokens", {}).get("output", 0)

            if processing_time > 0 and output_tokens > 0:
                tokens_per_second = output_tokens / processing_time
                result_data = {
                    "total_time_s": round(processing_time, 3),
                    "output_tokens": output_tokens,
                    "output_tokens_per_second": round(tokens_per_second, 2),
                }
            elif output_tokens == 0:
                logger.warning(f"Warning: {model_identifier} - Completed but generated 0 output tokens.", emoji_key="warning")
                result_data = {"error": "Completed with 0 output tokens", "total_time_s": round(processing_time, 3)}
            else:
                logger.warning(f"Warning: {model_identifier} - Processing time reported as {processing_time:.4f}s. Cannot calculate tokens/s reliably.", emoji_key="warning")
                result_data = {"error": "Processing time too low to calculate speed", "total_time_s": round(processing_time, 3)}
        else:
            manual_time = end_time - start_time
            # result may be None or a non-dict value, so guard before calling .get()
            if isinstance(result, dict):
                error_message = result.get("error", "Unknown error or unexpected result format")
                error_code = result.get("error_code", "UNKNOWN_ERROR")
            else:
                error_message = "Unknown error or unexpected result format"
                error_code = "UNKNOWN_ERROR"
            logger.error(f"Error: {model_identifier} - Tool call failed. Manual Time: {manual_time:.2f}s. Error: {error_message} ({error_code})", emoji_key="error")
            result_data = {"error": error_message, "error_code": error_code, "manual_time_s": round(manual_time, 3)}

    except ProviderError as e:
        logger.error(f"Error: {model_identifier} ({provider}) - Provider Error: {e}", emoji_key="error", exc_info=False)
        result_data = {"error": str(e), "error_code": getattr(e, 'error_code', 'PROVIDER_ERROR')}
    except ToolError as e:
        logger.error(f"Error: {model_identifier} ({provider}) - Tool Error: {e}", emoji_key="error", exc_info=False)
        result_data = {"error": str(e), "error_code": getattr(e, 'error_code', 'TOOL_ERROR')}
    except Exception as e:
        logger.error(f"Error: {model_identifier} ({provider}) - Unexpected error: {e}", emoji_key="error", exc_info=True)
        result_data = {"error": f"Unexpected error: {str(e)}"}

    return result_data

async def main(models_to_test: List[str], output_file: str, prompt: str):
    """Main function to run speed tests and save results."""
    logger.info("Starting LLM speed measurement script...", emoji_key="rocket")
    tracker = CostTracker() # Instantiate tracker
    results: Dict[str, Dict[str, Any]] = {}

    # Use Rich Progress bar
    with Progress(
        SpinnerColumn(),
        "[progress.description]{task.description}",
        BarColumn(),
        "[progress.percentage]{task.percentage:>3.0f}%",
        TimeElapsedColumn(),
        TextColumn("[bold green]{task.completed} done"),
        console=console,
        transient=False, # Keep progress bar after completion
    ) as progress:
        task = progress.add_task("[cyan]Measuring speeds...", total=len(models_to_test))

        for model_id in models_to_test:
            progress.update(task, description=f"[cyan]Measuring speeds... [bold yellow]({model_id})[/]")
            if not model_id or not isinstance(model_id, str):
                logger.warning(f"Skipping invalid model entry: {model_id}")
                progress.update(task, advance=1)
                continue

            results[model_id] = await measure_speed(model_id, prompt, tracker)
            progress.update(task, advance=1)
            # await asyncio.sleep(0.1) # Reduce sleep time if desired

    # --- Display Results Table ---
    table = Table(title="LLM Speed Measurement Results", show_header=True, header_style="bold magenta")
    table.add_column("Model", style="dim cyan", width=40)
    table.add_column("Time (s)", justify="right", style="green")
    table.add_column("Output Tokens", justify="right", style="blue")
    table.add_column("Tokens/s", justify="right", style="bold yellow")
    table.add_column("Status/Error", style="red")

    for model_id, data in sorted(results.items()):
        if "error" in data:
            status = f"Error: {data['error']}"
            if 'error_code' in data:
                status += f" ({data['error_code']})"
            time_s = data.get("total_time_s") or data.get("manual_time_s")
            time_str = f"{time_s:.2f}" if time_s is not None else "-"
            table.add_row(model_id, time_str, "-", "-", status)
        else:
            table.add_row(
                model_id,
                f"{data.get('total_time_s', 0):.2f}",
                str(data.get('output_tokens', '-')),
                f"{data.get('output_tokens_per_second', 0):.2f}",
                "Success"
            )
    console.print(table)

    # Display cost summary
    tracker.display_summary(console)

    # --- Save Results --- (Saving logic remains the same)
    script_dir = os.path.dirname(os.path.abspath(__file__))
    project_root = os.path.dirname(script_dir)
    output_path = os.path.join(project_root, output_file)

    logger.info(f"Saving results to: {output_path}", emoji_key="save")
    try:
        with open(output_path, 'w') as f:
            json.dump(results, f, indent=4)
        logger.info("Results saved successfully.", emoji_key="success")
    except IOError as e:
        logger.error(f"Failed to write results to {output_path}: {e}", emoji_key="error", exc_info=True)
        console.print(f"[bold red]Error:[/bold red] Could not write results to {output_path}. Check permissions. Details: {e}")

    logger.info("Speed measurement script finished.", emoji_key="checkered_flag")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Measure LLM completion speeds.")
    parser.add_argument(
        "--models",
        nargs='+',
        default=DEFAULT_MODELS_TO_TEST,
        help="Space-separated list of models to test (e.g., openai/gpt-4o-mini anthropic/claude-3-5-haiku-20241022). Defaults to available models."
    )
    parser.add_argument(
        "--output",
        default=DEFAULT_OUTPUT_FILENAME,
        help=f"Output JSON filename. Defaults to {DEFAULT_OUTPUT_FILENAME} in the project root."
    )
    parser.add_argument(
        "--prompt",
        default=DEFAULT_PROMPT,
        help="The prompt to use for testing."
    )
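    # Example invocation (model identifiers are illustrative; adjust to providers configured locally):
    #   python examples/measure_model_speeds.py \
    #       --models openai/gpt-4o-mini anthropic/claude-3-5-haiku-20241022 \
    #       --output my_speeds.json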

    args = parser.parse_args()

    if not args.models or not all(isinstance(m, str) and m for m in args.models):
        console.print("[bold red]Error:[/bold red] Invalid --models argument. Please provide a list of non-empty model names.")
        sys.exit(1)

    models_unique = sorted(list(set(args.models)))
    # Use Rich print for startup info
    console.print("[bold blue]--- LLM Speed Measurement ---[/bold blue]")
    console.print(f"Models to test ({len(models_unique)}): [cyan]{', '.join(models_unique)}[/cyan]")
    console.print(f"Output file: [green]{args.output}[/green]")
    console.print(f"Prompt length: {len(args.prompt)} characters")
    console.print("[bold blue]-----------------------------[/bold blue]")

    asyncio.run(main(models_unique, args.output, args.prompt)) 
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/core/providers/deepseek.py:
--------------------------------------------------------------------------------

```python
"""DeepSeek provider implementation."""
import time
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple

from openai import AsyncOpenAI

from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.providers.base import BaseProvider, ModelResponse
from ultimate_mcp_server.utils import get_logger

# Use the same naming scheme everywhere: logger at module level
logger = get_logger("ultimate_mcp_server.providers.deepseek")


class DeepSeekProvider(BaseProvider):
    """Provider implementation for DeepSeek API (using OpenAI-compatible interface)."""
    
    provider_name = Provider.DEEPSEEK.value
    
    def __init__(self, api_key: Optional[str] = None, **kwargs):
        """Initialize the DeepSeek provider.
        
        Args:
            api_key: DeepSeek API key
            **kwargs: Additional options
        """
        super().__init__(api_key=api_key, **kwargs)
        self.base_url = kwargs.get("base_url", "https://api.deepseek.com")
        self.models_cache = None
        
    async def initialize(self) -> bool:
        """Initialize the DeepSeek client.
        
        Returns:
            bool: True if initialization was successful
        """
        try:
            # DeepSeek uses OpenAI-compatible API
            self.client = AsyncOpenAI(
                api_key=self.api_key, 
                base_url=self.base_url,
            )
            
            self.logger.success(
                "DeepSeek provider initialized successfully", 
                emoji_key="provider"
            )
            return True
            
        except Exception as e:
            self.logger.error(
                f"Failed to initialize DeepSeek provider: {str(e)}", 
                emoji_key="error"
            )
            return False
        
    async def generate_completion(
        self,
        prompt: Optional[str] = None,
        messages: Optional[List[Dict[str, Any]]] = None,
        model: Optional[str] = None,
        max_tokens: Optional[int] = None,
        temperature: float = 0.7,
        json_mode: bool = False,
        **kwargs
    ) -> ModelResponse:
        """Generate a completion using DeepSeek's API.
        
        Args:
            prompt: Text prompt to send to the model (optional if messages provided)
            messages: List of message dictionaries (optional if prompt provided)
            model: Model name to use
            max_tokens: Maximum tokens to generate
            temperature: Temperature parameter (0.0-1.0)
            json_mode: If True, attempt to generate JSON output
            **kwargs: Additional parameters
            
        Returns:
            ModelResponse with the completion result
        """
        if not self.client:
            await self.initialize()
            
        # Verify we have either prompt or messages
        if prompt is None and not messages:
            raise ValueError("Either prompt or messages must be provided")
            
        # Use default model if not specified
        model = model or self.get_default_model()
        
        # Prepare API parameters
        if messages:
            # Using chat completion with messages
            params = {
                "model": model,
                "messages": messages,
                "temperature": temperature
            }
        else:
            # Using completion with prompt
            # Convert prompt to messages format for DeepSeek
            params = {
                "model": model,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": temperature
            }
            
        # Add max_tokens if provided
        if max_tokens is not None:
            params["max_tokens"] = max_tokens
            
        # Handle JSON mode via response_format for compatible models
        if json_mode:
            params["response_format"] = {"type": "json_object"}
            self.logger.debug("Setting response_format to JSON mode for DeepSeek")
            
        # Add any remaining parameters
        for key, value in kwargs.items():
            if key not in params:
                params[key] = value
                
        # Log request parameters
        prompt_length = len(prompt) if prompt else sum(len(m.get("content", "")) for m in messages)
        self.logger.info(
            f"Generating completion with DeepSeek model {model}",
            emoji_key=self.provider_name,
            prompt_length=prompt_length,
            json_mode=json_mode
        )
        
        try:
            # Start timer
            start_time = time.time()
            
            # Make API call
            response = await self.client.chat.completions.create(**params)
            
            # Calculate processing time
            processing_time = time.time() - start_time
            
            # Extract text from response
            completion_text = response.choices[0].message.content
            
            # Create ModelResponse
            result = ModelResponse(
                text=completion_text,
                model=f"{self.provider_name}/{model}",
                provider=self.provider_name,
                input_tokens=response.usage.prompt_tokens,
                output_tokens=response.usage.completion_tokens,
                total_tokens=response.usage.total_tokens,
                processing_time=processing_time,
                raw_response=response
            )
            
            # Add message for compatibility with chat_completion
            result.message = {"role": "assistant", "content": completion_text}
            
            # Log success
            self.logger.success(
                "DeepSeek completion successful",
                emoji_key="success",
                model=model,
                tokens={"input": result.input_tokens, "output": result.output_tokens},
                cost=result.cost,
                time=processing_time
            )
            
            return result
            
        except Exception as e:
            # Log error
            self.logger.error(
                f"DeepSeek completion failed: {str(e)}",
                emoji_key="error",
                model=model
            )
            raise
            
    async def generate_completion_stream(
        self,
        prompt: Optional[str] = None,
        messages: Optional[List[Dict[str, Any]]] = None,
        model: Optional[str] = None,
        max_tokens: Optional[int] = None,
        temperature: float = 0.7,
        json_mode: bool = False,
        **kwargs
    ) -> AsyncGenerator[Tuple[str, Dict[str, Any]], None]:
        """Generate a streaming completion using DeepSeek.
        
        Args:
            prompt: Text prompt to send to the model (optional if messages provided)
            messages: List of message dictionaries (optional if prompt provided)
            model: Model name to use
            max_tokens: Maximum tokens to generate
            temperature: Temperature parameter (0.0-1.0)
            json_mode: If True, attempt to generate JSON output
            **kwargs: Additional parameters
            
        Yields:
            Tuples of (text_chunk, metadata)
        """
        if not self.client:
            await self.initialize()
            
        # Verify we have either prompt or messages
        if prompt is None and not messages:
            raise ValueError("Either prompt or messages must be provided")
            
        # Use default model if not specified
        model = model or self.get_default_model()
        
        # Prepare API parameters
        if messages:
            # Using chat completion with messages
            params = {
                "model": model,
                "messages": messages,
                "temperature": temperature,
                "stream": True
            }
        else:
            # Using completion with prompt
            # Convert prompt to messages format for DeepSeek
            params = {
                "model": model,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": temperature,
                "stream": True
            }
            
        # Add max_tokens if provided
        if max_tokens is not None:
            params["max_tokens"] = max_tokens
            
        # Handle JSON mode via response_format for compatible models
        if json_mode:
            params["response_format"] = {"type": "json_object"}
            self.logger.debug("Setting response_format to JSON mode for DeepSeek streaming")
            
        # Add any remaining parameters
        for key, value in kwargs.items():
            if key not in params and key != "stream":  # Don't allow overriding stream
                params[key] = value
                
        # Log request parameters
        prompt_length = len(prompt) if prompt else sum(len(m.get("content", "")) for m in messages)
        self.logger.info(
            f"Generating streaming completion with DeepSeek model {model}",
            emoji_key=self.provider_name,
            prompt_length=prompt_length,
            json_mode=json_mode
        )
        
        start_time = time.time()
        total_chunks = 0
        
        try:
            # Make streaming API call
            stream = await self.client.chat.completions.create(**params)
            
            # Process the stream
            async for chunk in stream:
                total_chunks += 1
                
                # Extract content from the chunk
                delta = chunk.choices[0].delta
                content = delta.content or ""
                
                # Metadata for this chunk
                metadata = {
                    "model": f"{self.provider_name}/{model}",
                    "provider": self.provider_name,
                    "chunk_index": total_chunks,
                    "finish_reason": chunk.choices[0].finish_reason,
                }
                
                yield content, metadata
                
            # Log success
            processing_time = time.time() - start_time
            self.logger.success(
                "DeepSeek streaming completion successful",
                emoji_key="success",
                model=model,
                chunks=total_chunks,
                time=processing_time
            )
            
            # Yield final metadata chunk
            final_metadata = {
                "model": f"{self.provider_name}/{model}",
                "provider": self.provider_name,
                "chunk_index": total_chunks + 1,
                "processing_time": processing_time,
                "finish_reason": "stop"
            }
            yield "", final_metadata
            
        except Exception as e:
            processing_time = time.time() - start_time
            self.logger.error(
                f"DeepSeek streaming completion failed: {str(e)}",
                emoji_key="error",
                model=model
            )
            
            # Yield error metadata
            error_metadata = {
                "model": f"{self.provider_name}/{model}",
                "provider": self.provider_name,
                "chunk_index": total_chunks + 1,
                "error": f"{type(e).__name__}: {str(e)}",
                "processing_time": processing_time,
                "finish_reason": "error"
            }
            yield "", error_metadata
            
    async def list_models(self) -> List[Dict[str, Any]]:
        """List available DeepSeek models.
        
        Returns:
            List of model information dictionaries
        """
        # DeepSeek doesn't have a comprehensive models endpoint, so we return a static list
        if self.models_cache:
            return self.models_cache
            
        models = [
            {
                "id": "deepseek-chat",
                "provider": self.provider_name,
                "description": "General-purpose chat model",
            },
            {
                "id": "deepseek-reasoner",
                "provider": self.provider_name,
                "description": "Enhanced reasoning capabilities",
            },
        ]
        
        # Cache results
        self.models_cache = models
        
        return models
            
    def get_default_model(self) -> str:
        """Get the default DeepSeek model.
        
        Returns:
            Default model name
        """
        from ultimate_mcp_server.config import get_config
        
        # Safely get from config if available
        try:
            config = get_config()
            provider_config = getattr(config, 'providers', {}).get(self.provider_name, None)
            if provider_config and provider_config.default_model:
                return provider_config.default_model
        except (AttributeError, TypeError):
            # Handle case when providers attribute doesn't exist or isn't a dict
            pass
            
        # Otherwise return hard-coded default
        return "deepseek-chat"
        
    async def check_api_key(self) -> bool:
        """Check if the DeepSeek API key is valid.
        
        Returns:
            bool: True if API key is valid
        """
        try:
            # Ensure the client is initialized before attempting validation
            if not self.client:
                await self.initialize()
            # Try a simple completion to validate the API key
            await self.client.chat.completions.create(
                model=self.get_default_model(),
                messages=[{"role": "user", "content": "Hello"}],
                max_tokens=1,
            )
            return True
        except Exception:
            return False
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/core/state_store.py:
--------------------------------------------------------------------------------

```python
import asyncio
import os
import pickle
from typing import Any, Dict, Optional

import aiofiles


class StateStore:
    """
    Thread-safe, async-compatible state management system with optional persistence.
    
    The StateStore provides a robust solution for managing application state in asynchronous
    environments. It organizes data into namespaces, each containing key-value pairs, and
    provides thread-safe access through asyncio.Lock-based concurrency control.
    
    Key features:
    - Namespace-based organization to separate different types of state data
    - Thread-safe async methods for all operations (get, set, delete)
    - Optional persistence to disk with automatic load/save
    - Granular locking per namespace to maximize concurrency
    - Graceful handling of corrupted or missing persistent data
    
    Usage example:
    ```python
    # Initialize with persistence
    store = StateStore(persistence_dir="./state")
    
    # Store values
    await store.set("user_preferences", "theme", "dark")
    await store.set("session_data", "user_id", 12345)
    
    # Retrieve values (with default if missing)
    theme = await store.get("user_preferences", "theme", default="light")
    user_id = await store.get("session_data", "user_id", default=None)
    
    # Delete values
    await store.delete("session_data", "temp_token")
    ```
    
    The StateStore is used internally by the Ultimate MCP Server to maintain state
    across multiple tools and components, and is exposed to tools via the 
    with_state_management decorator.
    """
    
    def __init__(self, persistence_dir: Optional[str] = None):
        """
        Initialize a new StateStore instance.
        
        The StateStore provides a thread-safe, async-compatible key-value store organized
        by namespaces. It supports both in-memory operation and optional persistence to disk.
        The store is designed for use in multi-threaded or async applications where state
        needs to be shared safely between components.
        
        Each namespace acts as a separate dictionary with its own concurrency protection.
        Operations within a namespace are serialized using asyncio.Lock, while operations
        across different namespaces can proceed concurrently.
        
        Args:
            persistence_dir: Optional directory path where state data will be persisted.
                            If provided, each namespace will be stored as a separate pickle
                            file in this directory. If None, the store operates in memory-only
                            mode and state is lost when the application stops.
                            
        Notes:
            - The directory will be created if it doesn't exist
            - Each namespace is persisted as a separate file named "{namespace}.pickle"
            - Data is serialized using Python's pickle module, so stored values should be
              pickle-compatible
            - No automatic cleanup of old or unused namespaces is performed
        """
        self._in_memory_store: Dict[str, Dict[str, Any]] = {}
        self._locks: Dict[str, asyncio.Lock] = {}
        self._persistence_dir = persistence_dir
        if persistence_dir and not os.path.exists(persistence_dir):
            os.makedirs(persistence_dir)
    
    def _get_lock(self, namespace: str) -> asyncio.Lock:
        """
        Get or create an asyncio.Lock for a specific namespace.
        
        This private method manages the locks used for concurrency control. It maintains
        a dictionary of locks keyed by namespace name, creating new locks as needed.
        This ensures that operations on the same namespace are properly serialized to
        prevent race conditions, while allowing operations on different namespaces to
        proceed concurrently.
        
        Args:
            namespace: Name of the namespace for which to get or create a lock
            
        Returns:
            An asyncio.Lock instance specific to the requested namespace
            
        Notes:
            - Each namespace gets its own independent lock
            - Locks are created on-demand when a namespace is first accessed
            - Locks persist for the lifetime of the StateStore instance
            - This method is called by all public methods (get, set, delete) to
              ensure thread-safe access to namespaces
        """
        if namespace not in self._locks:
            self._locks[namespace] = asyncio.Lock()
        return self._locks[namespace]
    
    async def get(self, namespace: str, key: str, default: Any = None) -> Any:
        """
        Retrieve a value from the state store with thread-safe access control.
        
        This method provides a concurrency-safe way to retrieve state data from the specified 
        namespace. If the namespace doesn't exist in memory, it attempts to load it from disk
        (if persistence is enabled) before returning the requested value or default.
        
        Retrieval behavior:
        - The method first acquires a lock for the specified namespace to ensure thread safety
        - If the namespace is not in memory, it attempts to load it from disk if persistence is enabled
        - If the namespace can't be loaded or doesn't exist, an empty namespace is created
        - Returns the value for the specified key, or the default value if the key is not found
        
        Args:
            namespace: Logical grouping for related state data (e.g., "tools", "user_settings")
            key: Unique identifier within the namespace for the data to retrieve
            default: Value to return if the key is not found in the namespace
            
        Returns:
            The stored value if found, otherwise the default value
            
        Notes:
            - Acquiring the namespace lock is an async operation and may block if another
              operation is currently accessing the same namespace
            - If persistence is enabled, this method may perform disk I/O when a namespace
              needs to be loaded from disk
        """
        async with self._get_lock(namespace):
            if namespace not in self._in_memory_store:
                # Try to load from disk if persistence is enabled
                if self._persistence_dir:
                    await self._load_namespace(namespace)
                else:
                    self._in_memory_store[namespace] = {}
            
            return self._in_memory_store[namespace].get(key, default)
    
    async def set(self, namespace: str, key: str, value: Any) -> None:
        """
        Store a value in the state store with thread-safe access control.
        
        This method provides a concurrency-safe way to store state data in the specified namespace.
        The implementation uses asyncio.Lock to ensure that concurrent access to the same namespace
        doesn't lead to race conditions or data corruption.
        
        Storage behavior:
        - Values are first stored in an in-memory dictionary
        - If persistence_dir is configured, values are also immediately persisted to disk
        - Each namespace is stored as a separate pickle file
        - Values can be any pickle-serializable Python object
        
        Args:
            namespace: Logical grouping for related state data (e.g., "tools", "user_settings")
            key: Unique identifier within the namespace for this piece of data
            value: Any pickle-serializable value to store
            
        Notes:
            - Acquiring the namespace lock is an async operation and may block if another
              operation is currently accessing the same namespace
            - If persistence is enabled, this method performs disk I/O which could take time
              depending on the value size and disk performance
        """
        async with self._get_lock(namespace):
            if namespace not in self._in_memory_store:
                self._in_memory_store[namespace] = {}
            
            self._in_memory_store[namespace][key] = value
            
            # Persist immediately if enabled
            if self._persistence_dir:
                await self._persist_namespace(namespace)
    
    async def delete(self, namespace: str, key: str) -> None:
        """
        Delete a value from the state store with thread-safe access control.
        
        This method safely removes a key-value pair from the specified namespace,
        and optionally persists the change to disk if persistence is enabled. The
        operation is concurrency-safe through the use of namespace-specific locks.
        
        Deletion behavior:
        - The method first acquires a lock for the specified namespace to ensure thread safety
        - If the namespace doesn't exist or the key is not found, the operation is a no-op
        - If persistence is enabled, the updated namespace state is written to disk
          immediately after deletion
        
        Args:
            namespace: Logical grouping for related state data (e.g., "tools", "user_settings")
            key: Unique identifier within the namespace for the data to delete
            
        Notes:
            - Acquiring the namespace lock is an async operation and may block if another
              operation is currently accessing the same namespace
            - If persistence is enabled, this method performs disk I/O when persisting
              the updated namespace after deletion
            - This method does not raise an exception if the key doesn't exist in the namespace
        """
        async with self._get_lock(namespace):
            if namespace in self._in_memory_store and key in self._in_memory_store[namespace]:
                del self._in_memory_store[namespace][key]
                
                # Persist the change if enabled
                if self._persistence_dir:
                    await self._persist_namespace(namespace)
    
    async def _persist_namespace(self, namespace: str) -> None:
        """
        Persist a namespace's data to disk as a pickle file.
        
        This private method handles the actual disk I/O for saving state data. It serializes
        the entire namespace dictionary to a pickle file named after the namespace in the
        configured persistence directory.
        
        Args:
            namespace: Name of the namespace whose data should be persisted
            
        Notes:
            - This method is a no-op if persistence_dir is not configured
            - Uses aiofiles for non-blocking async file I/O
            - The file is named "{namespace}.pickle" and stored in the persistence_dir
            - The entire namespace is serialized in a single operation, which may be
              inefficient for very large namespaces
            - This method is called internally by set() and delete() methods
              after modifying namespace data
        """
        if not self._persistence_dir:
            return
            
        file_path = os.path.join(self._persistence_dir, f"{namespace}.pickle")
        async with aiofiles.open(file_path, 'wb') as f:
            await f.write(pickle.dumps(self._in_memory_store[namespace]))
    
    async def _load_namespace(self, namespace: str) -> None:
        """
        Load a namespace's data from disk into memory.
        
        This private method handles loading serialized state data from disk into the in-memory store.
        It is called automatically by the get() method when a namespace is requested but not yet
        loaded in memory. The method implements the lazy-loading pattern, only reading from disk
        when necessary.
        
        The loading process follows these steps:
        1. Check if persistence is enabled; if not, initialize an empty namespace dictionary
        2. Locate the pickle file for the namespace (named "{namespace}.pickle")
        3. If the file doesn't exist, initialize an empty namespace dictionary
        4. If the file exists, read and deserialize it using pickle
        5. Handle potential serialization errors gracefully (corrupted files, version mismatches)
        
        Args:
            namespace: Name of the namespace whose data should be loaded. This corresponds
                      directly to a "{namespace}.pickle" file in the persistence directory.
            
        Returns:
            None: The method modifies the internal self._in_memory_store dictionary directly.
            
        Notes:
            - Uses aiofiles for non-blocking async file I/O
            - In case of corrupt data (pickle errors), the namespace is initialized as empty 
              rather than raising exceptions to the caller
            - Example of file path: /path/to/persistence_dir/user_settings.pickle for the
              "user_settings" namespace
            - This method is idempotent - calling it multiple times for the same namespace
              has no additional effect after the first call
            
        Examples:
            ```python
            # This method is called internally by get(), not typically called directly
            store = StateStore(persistence_dir="./state")
            
            # When this executes, _load_namespace("user_settings") will be called internally
            # if the namespace is not already in memory
            value = await store.get("user_settings", "theme")
            ```
        """
        if not self._persistence_dir:
            self._in_memory_store[namespace] = {}
            return
            
        file_path = os.path.join(self._persistence_dir, f"{namespace}.pickle")
        if not os.path.exists(file_path):
            self._in_memory_store[namespace] = {}
            return
            
        try:
            async with aiofiles.open(file_path, 'rb') as f:
                data = await f.read()
                self._in_memory_store[namespace] = pickle.loads(data)
        except (pickle.PickleError, EOFError):
            # Handle corrupt data
            self._in_memory_store[namespace] = {} 
```

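Because each namespace gets its own `asyncio.Lock`, writes to the same namespace are serialized while writes to different namespaces can interleave. A self-contained sketch of that behavior, using only the `StateStore` API shown above; the temp directory and key names are illustrative:

```python
# Demonstrates per-namespace locking and on-disk persistence using only the
# StateStore API shown above. Paths and key names are illustrative.
import asyncio
import tempfile

from ultimate_mcp_server.core.state_store import StateStore


async def main() -> None:
    state_dir = tempfile.mkdtemp(prefix="state_demo_")
    store = StateStore(persistence_dir=state_dir)

    async def writer(namespace: str, n: int) -> None:
        # Writes within one namespace are serialized by that namespace's lock;
        # writers targeting different namespaces proceed concurrently.
        for i in range(n):
            await store.set(namespace, f"key_{i}", i)

    await asyncio.gather(writer("session_data", 10), writer("user_preferences", 10))

    # A second store pointed at the same directory lazily reloads each
    # "<namespace>.pickle" file on first access.
    reloaded = StateStore(persistence_dir=state_dir)
    assert await reloaded.get("session_data", "key_9") == 9


asyncio.run(main())
```
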
--------------------------------------------------------------------------------
/resource_annotations.py:
--------------------------------------------------------------------------------

```python
"""
Resource annotations for Model Control Protocol (MCP) systems.

This module implements the resource annotation system specified in the MCP protocol,
which enables AI systems to make intelligent decisions about how to process, prioritize,
and present different types of resources in multi-modal and multi-resource contexts.

Resource annotations serve multiple critical functions in AI/LLM systems:

1. PRIORITIZATION: Help AI systems allocate attention optimally among multiple resources
   when token constraints prevent processing everything (e.g., which document to focus on)

2. VISIBILITY CONTROL: Determine which resources should be visible to different actors
   in the system (e.g., assistant-only resources vs. user-facing resources)

3. FORMAT PRESERVATION: Indicate when resources have structured formats that should be
   maintained (e.g., code, tables, JSON) rather than freely interpreted

4. CHUNKING GUIDANCE: Provide hints about how to divide large resources efficiently
   for processing within context window constraints

The module provides:
- The ResourceAnnotations class for creating annotation metadata
- Pre-defined annotation templates for common resource types
- Utilities for working with annotated resources (e.g., chunking)

Usage example:
    ```python
    # Create custom annotations for a research paper
    paper_annotations = ResourceAnnotations(
        priority=0.8,
        audience=["assistant"],
        chunking_recommended=True,
        description="Research paper on quantum computing effects"
    )
    
    # Annotate and chunk a large document
    paper_content = open("quantum_paper.txt").read()
    chunks = format_chunked_content(paper_content, chunk_size=3000)
    
    # Use a predefined annotation template for code
    code_resource = {
        "content": "def calculate_entropy(data):\\n    ...",
        "annotations": CODE_RESOURCE.to_dict()
    }
    ```

These annotations integrate with the MCP protocol to help LLMs process resources
more intelligently and efficiently in complex, multi-resource scenarios.
"""
from typing import List, Optional


class ResourceAnnotations:
    """
    Annotations that guide LLMs in handling and prioritizing resources within the MCP protocol.
    
    ResourceAnnotations provide crucial metadata that helps LLMs make intelligent decisions about:
    - IMPORTANCE: How critical a resource is to the current task (via priority)
    - AUDIENCE: Who should see or interact with the resource
    - FORMATTING: How the resource should be rendered or processed
    - CHUNKING: Whether and how to divide large resources into manageable pieces
    
    These annotations serve multiple purposes in the MCP ecosystem:
    1. Help LLMs prioritize which resources to analyze first when multiple are available
    2. Control visibility of resources between assistants and users
    3. Preserve structural integrity of formatted content (code, tables, etc.)
    4. Provide chunking guidance for efficient processing of large resources
    
    When resources are annotated appropriately, LLMs can make better decisions about:
    - Which resources deserve the most attention in token-constrained contexts
    - When to preserve formatting vs. when content structure is less important
    - How to efficiently process large documents while maintaining context
    - Whether certain resources are meant for the assistant's understanding only
    
    Usage example:
        ```python
        # For a source code file that should preserve formatting
        code_annotations = ResourceAnnotations(
            priority=0.8,              # High importance
            audience=["assistant"],    # Only the assistant needs to see this
            structured_format=True,    # Preserve code formatting
            chunking_recommended=True, # Chunk if large
            max_recommended_chunk_size=2000,
            description="Python source code implementing the core algorithm"
        )
        
        # Apply annotations to a resource
        resource = {
            "id": "algorithm.py",
            "content": "def calculate(x, y):\n    return x + y",
            "annotations": code_annotations.to_dict()
        }
        ```
    """
    
    def __init__(
        self,
        priority: float = 0.5,
        audience: Optional[List[str]] = None,
        structured_format: bool = False,
        chunking_recommended: bool = False,
        max_recommended_chunk_size: Optional[int] = None,
        description: Optional[str] = None
    ):
        """
        Initialize resource annotations.
        
        Args:
            priority: How important this resource is (0.0-1.0, higher is more important).
                0.0 = entirely optional, 1.0 = effectively required.
                Affects how much attention an LLM should give this resource when multiple
                resources are available but context limits prevent using all of them.
                Default: 0.5 (medium importance)
                
            audience: Who should see this resource, as a list of roles:
                - "assistant": The AI assistant should process this resource
                - "user": The human user should see this resource
                Both can be specified for resources relevant to both parties.
                Default: ["assistant"] (assistant-only)
                
            structured_format: Whether this resource has a structured format that
                should be preserved (e.g., code, JSON, tables). When True, the LLM should
                maintain the exact formatting, indentation, and structure of the content.
                Default: False
                
            chunking_recommended: Whether this resource should be chunked if large.
                Setting this to True signals that the content is suitable for being
                divided into smaller pieces for processing (e.g., long documents).
                Default: False
                
            max_recommended_chunk_size: Maximum recommended chunk size in characters.
                Provides guidance on how large each chunk should be if chunking is applied.
                Default: None (no specific recommendation)
                
            description: Optional description of the resource that provides context
                about its purpose, content, or importance.
        """
        self.priority = max(0.0, min(1.0, priority))  # Clamp between 0 and 1
        self.audience = audience or ["assistant"]
        self.structured_format = structured_format
        self.chunking_recommended = chunking_recommended
        self.max_recommended_chunk_size = max_recommended_chunk_size
        self.description = description
        
    def to_dict(self) -> dict:
        """Convert annotations to dictionary for MCP protocol."""
        result = {
            "priority": self.priority,
            "audience": self.audience
        }
        
        # Add extended properties
        if self.description:
            result["description"] = self.description
        
        # Add chunking metadata if recommended
        if self.chunking_recommended:
            result["chunking"] = {
                "recommended": True
            }
            if self.max_recommended_chunk_size:
                result["chunking"]["maxSize"] = self.max_recommended_chunk_size
                
        # Add format information
        if self.structured_format:
            result["format"] = {
                "structured": True
            }
            
        return result


# Pre-defined annotation templates for common resource types

# For critical resources that need immediate attention
# Use for resources essential to the current task's success
# Examples: Primary task instructions, critical context documents
HIGH_PRIORITY_RESOURCE = ResourceAnnotations(
    priority=0.9,
    audience=["assistant", "user"],
    description="Critical resource that should be prioritized"
)

# For source code and programming-related content
# Preserves indentation, formatting, and structure
# Recommends chunking for large codebases
# Examples: Source files, configuration files, scripts
CODE_RESOURCE = ResourceAnnotations(
    priority=0.8,
    audience=["assistant"],
    structured_format=True,
    chunking_recommended=True,
    max_recommended_chunk_size=2000,
    description="Source code that should preserve formatting"
)

# For lengthy text resources that should be divided into smaller parts
# Good for processing long documents without overwhelming context windows
# Examples: Articles, documentation, books, long explanations
LARGE_TEXT_RESOURCE = ResourceAnnotations(
    priority=0.6,
    audience=["assistant"],
    chunking_recommended=True,
    max_recommended_chunk_size=4000,
    description="Large text that should be chunked for processing"
)

# For data formats where structure is important
# Preserves formatting but doesn't automatically suggest chunking
# Examples: JSON data, database records, tabular data, XML
STRUCTURED_DATA_RESOURCE = ResourceAnnotations(
    priority=0.7,
    audience=["assistant"],
    structured_format=True,
    description="Structured data like JSON or tables"
)

# For supplementary information that provides additional context
# Low priority indicates it can be skipped if context is limited
# Examples: Background information, history, tangential details
OPTIONAL_RESOURCE = ResourceAnnotations(
    priority=0.2,
    audience=["assistant"],
    description="Supplementary information that isn't critical"
)

# For content meant to be shown to the user directly
# Not intended for assistant's processing (assistant not in audience)
# Examples: Final results, generated content, presentations
USER_FACING_RESOURCE = ResourceAnnotations(
    priority=0.7,
    audience=["user"],
    description="Resource meant for user consumption"
)


def format_chunked_content(content: str, chunk_size: int = 4000, overlap: int = 200) -> List[dict]:
    """
    Format content into overlapping chunks with rich metadata for efficient LLM processing.
    
    This utility function implements a sliding window approach to divide large content
    into manageable, context-aware chunks. Each chunk is annotated with detailed positioning
    metadata, allowing LLMs to understand the chunk's relationship to the overall content
    and maintain coherence across chunk boundaries.
    
    Key features:
    - Consistent overlap between chunks preserves context and prevents information loss
    - Automatic metadata generation provides LLMs with crucial positioning information
    - Standard annotation format compatible with the MCP resource protocol
    - Configurable chunk size to adapt to different model context window limitations
    
    The overlap between chunks is particularly important as it helps LLMs maintain
    coherence when processing information that spans chunk boundaries. Without overlap,
    context might be lost at chunk transitions, leading to degraded performance on tasks
    that require understanding the full content.
    
    Args:
        content: The source text content to be chunked. This can be any string content
            like a document, article, code file, or other text-based resource.
        chunk_size: Maximum size of each chunk in characters (default: 4000).
            This should be set based on the target LLM's context window limitations,
            typically 25-50% less than the model's maximum to allow room for prompts.
        overlap: Number of characters to overlap between consecutive chunks (default: 200).
            Larger overlap values provide more context continuity between chunks but
            increase redundancy and total token usage.
        
    Returns:
        List of dictionaries, each representing a content chunk with metadata:
        - "text": The actual chunk content (substring of the original content)
        - "annotations": Metadata dictionary containing:
          - priority: Importance hint for the LLM (default: 0.7)
          - audience: Who should see this chunk (default: ["assistant"])
          - chunk_info: Detailed positioning metadata including:
            - index: Zero-based index of this chunk in the sequence
            - total_chunks: Total number of chunks in the complete content
            - start_position: Character offset where this chunk begins in the original content
            - end_position: Character offset where this chunk ends in the original content
            - has_more: Boolean indicating if more chunks follow this one
    
    Usage examples:
        # Basic usage with default parameters
        chunks = format_chunked_content("Long document text...")
        
        # Using smaller chunks for models with limited context windows
        small_chunks = format_chunked_content(
            content="Large article text...",
            chunk_size=1000,
            overlap=100
        )
        
        # Process chunks sequentially while maintaining context
        for chunk in chunks:
            response = await generate_completion(
                prompt=f"Analyze this text: {chunk['text']}",
                # Include chunk metadata so the LLM understands context
                additional_context=f"This is chunk {chunk['annotations']['chunk_info']['index']+1} "
                                  f"of {chunk['annotations']['chunk_info']['total_chunks']}"
            )
    """
    chunks = []
    
    # Create chunks with overlap
    for i in range(0, len(content), chunk_size - overlap):
        chunk_text = content[i:i + chunk_size]
        if chunk_text:
            # Create chunk with annotations
            chunk = {
                "text": chunk_text,
                "annotations": {
                    "priority": 0.7,
                    "audience": ["assistant"],
                    "chunk_info": {
                        "index": len(chunks),
                        "total_chunks": (len(content) + chunk_size - 1) // (chunk_size - overlap),
                        "start_position": i,
                        "end_position": min(i + chunk_size, len(content)),
                        "has_more": i + chunk_size < len(content)
                    }
                }
            }
            chunks.append(chunk)
    
    return chunks 
```

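Putting the pieces together: a short sketch that chunks a long document and attaches one of the predefined annotation templates to a separate code resource. It uses only the names defined above; the stand-in text and resource ids are illustrative:

```python
# Combines format_chunked_content with the predefined annotation templates.
# The resource dictionaries follow the shape shown in the docstrings above.
from resource_annotations import (
    CODE_RESOURCE,
    ResourceAnnotations,
    format_chunked_content,
)

long_text = "..." * 5000  # stand-in for a real document
chunks = format_chunked_content(long_text, chunk_size=3000, overlap=150)
print(f"{len(chunks)} chunks; first covers characters "
      f"{chunks[0]['annotations']['chunk_info']['start_position']}-"
      f"{chunks[0]['annotations']['chunk_info']['end_position']}")

code_resource = {
    "id": "algorithm.py",
    "content": "def calculate(x, y):\n    return x + y",
    "annotations": CODE_RESOURCE.to_dict(),  # priority 0.8, structured format
}

# Custom annotations clamp priority into [0.0, 1.0]
custom = ResourceAnnotations(priority=1.7, audience=["user"])
assert custom.to_dict()["priority"] == 1.0
```
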
--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/__init__.py:
--------------------------------------------------------------------------------

```python
"""MCP Tools for Ultimate MCP Server."""

import inspect
import sys
from typing import Any, Dict

from ultimate_mcp_server.tools.base import (
    BaseTool,  # Keep BaseTool in case other modules use it
    register_tool,
    with_error_handling,
    with_retry,
    with_tool_metrics,
)
from ultimate_mcp_server.utils import get_logger

# from .audio_transcription import (
#     chat_with_transcript,
#     extract_audio_transcript_key_points,
#     transcribe_audio,
# )
# Import base decorators/classes that might be used by other tool modules
from .completion import chat_completion, generate_completion, multi_completion, stream_completion
from .document_conversion_and_processing import (
    analyze_pdf_structure,
    batch_format_texts,
    canonicalise_entities,
    chunk_document,
    clean_and_format_text_as_markdown,
    convert_document,
    detect_content_type,
    enhance_ocr_text,
    extract_entities,
    extract_metrics,
    extract_tables,
    flag_risks,
    generate_qa_pairs,
    identify_sections,
    ocr_image,
    optimize_markdown_formatting,
    process_document_batch,
    summarize_document,
)

# from .docstring_refiner import refine_tool_documentation
# from .entity_relation_graph import extract_entity_graph
# from .extraction import (
#     extract_code_from_response,
#     extract_json,
#     extract_key_value_pairs,
#     extract_semantic_schema,
#     extract_table,
# )
from .filesystem import (
    create_directory,
    directory_tree,
    edit_file,
    get_file_info,
    get_unique_filepath,
    list_allowed_directories,
    list_directory,
    move_file,
    read_file,
    read_multiple_files,
    search_files,
    write_file,
)
from .local_text_tools import (
    get_workspace_dir,
    run_awk,
    run_awk_stream,
    run_jq,
    run_jq_stream,
    run_ripgrep,
    run_ripgrep_stream,
    run_sed,
    run_sed_stream,
)

# from .marqo_fused_search import marqo_fused_search
# from .meta_api_tool import register_api_meta_tools
from .optimization import (
    compare_models,
    estimate_cost,
    execute_optimized_workflow,
    recommend_model,
)
from .provider import get_provider_status, list_models
from .python_sandbox import (
    execute_python,
    repl_python,
)

# from .rag import (
#     add_documents,
#     create_knowledge_base,
#     delete_knowledge_base,
#     generate_with_rag,
#     list_knowledge_bases,
#     retrieve_context,
# )
from .sentiment_analysis import analyze_business_sentiment, analyze_business_text_batch

# from .single_shot_synthesis import single_shot_synthesis
from .smart_browser import (
    autopilot,
    browse,
    click,
    collect_documentation,
    download,
    download_site_pdfs,
    parallel,
    run_macro,
    search,
    type_text,
)

# from .sql_databases import access_audit_log, execute_sql, explore_database, manage_database
# from .text_classification import text_classification
# from .text_redline_tools import (
#     compare_documents_redline,
#     create_html_redline,
# )
# from .tournament import (
#     cancel_tournament,
#     create_tournament,
#     get_tournament_results,
#     get_tournament_status,
#     list_tournaments,
# )
from .unified_memory_system import (
    add_tag_to_memory,
    consolidate_memories,
    create_embedding,
    create_goal,
    create_memory_link,
    create_workflow,
    decay_link_strengths,
    diagnose_file_access_issues,
    focus_memory,
    generate_reflection,
    generate_workflow_report,
    get_artifact_by_id,
    get_artifacts,
    get_contradictions,
    get_embedding,
    get_goal_details,
    get_linked_memories,
    get_memory_by_id,
    get_memory_metadata,
    get_memory_tags,
    get_recent_actions,
    get_rich_context_package,
    get_similar_memories,
    get_subgraph,
    get_thought_chain,
    get_workflow_details,
    get_workflow_metadata,
    get_working_memory,
    hybrid_search_memories,
    load_cognitive_state,
    optimize_working_memory,
    promote_memory_level,
    query_goals,
    query_memories,
    record_action_completion,
    record_action_start,
    record_artifact,
    save_cognitive_state,
    store_memory,
    update_goal_status,
    update_memory,
    update_memory_link_metadata,
    update_memory_metadata,
    vector_similarity,
)

__all__ = [
    # Base decorators/classes
    "BaseTool",
    "with_tool_metrics",
    "with_retry",
    "with_error_handling",
    "register_tool", 
    
    # LLM Completion tools
    "generate_completion",
    "stream_completion",
    "chat_completion",
    "multi_completion",
    "get_provider_status",
    "list_models",

    # Extraction tools
    # "extract_json",
    # "extract_table",
    # "extract_key_value_pairs",
    # "extract_semantic_schema",
    # "extract_entity_graph",
    # "extract_code_from_response",

    # Knowledge base tools
    # "create_knowledge_base",
    # "list_knowledge_bases",
    # "delete_knowledge_base",
    # "add_documents",
    # "retrieve_context",
    # "generate_with_rag",
    # "text_classification",

    # Cost optimization tools
    "estimate_cost",
    "compare_models",
    "recommend_model",
    "execute_optimized_workflow",
    "refine_tool_documentation",
    
    # Filesystem tools
    "read_file",
    "read_multiple_files",
    "write_file",
    "edit_file",
    "create_directory",
    "list_directory",
    "directory_tree",
    "move_file",
    "search_files",
    "get_file_info",
    "list_allowed_directories",
    "get_unique_filepath",

    # Local Text Tools
    "run_ripgrep",
    "run_awk",
    "run_sed",
    "run_jq",
    "run_ripgrep_stream",
    "run_awk_stream",
    "run_sed_stream",
    "run_jq_stream",
    "get_workspace_dir",

    # SQL databases tools
    # "manage_database",
    # "execute_sql",
    # "explore_database",
    # "access_audit_log",

    # Python sandbox tools
    "execute_python",
    "repl_python",

    # Smart Browser Standalone Functions
    "click",
    "browse",
    "type_text",
    "search",
    "download",
    "download_site_pdfs",
    "collect_documentation",
    "parallel",
    "run_macro",
    "autopilot",
    
    # Document conversion and processing tools
    "convert_document",
    "chunk_document",
    "clean_and_format_text_as_markdown",
    "detect_content_type",
    "batch_format_texts",
    "optimize_markdown_formatting",
    "identify_sections",
    "generate_qa_pairs",
    "summarize_document",
    "extract_metrics",
    "flag_risks",
    "canonicalise_entities",
    "ocr_image",
    "enhance_ocr_text",
    "analyze_pdf_structure",
    "process_document_batch",
    "extract_entities",
    "extract_tables",

    # Text Redline tools
    # "compare_documents_redline",
    # "create_html_redline",

    # Meta API tools
    # "register_api_meta_tools",

    # Marqo tool
    # "marqo_fused_search",

    # Tournament tools
    # "create_tournament",
    # "get_tournament_status",
    # "list_tournaments",
    # "get_tournament_results",
    # "cancel_tournament",

    # Audio tools
    # "transcribe_audio",
    # "extract_audio_transcript_key_points",
    # "chat_with_transcript",
    
    # Sentiment analysis tool
    "analyze_business_sentiment",
    "analyze_business_text_batch",
    
    # Unified Memory System tools
    "create_workflow",
    "get_workflow_details",
    "record_action_start",
    "record_action_completion",
    "get_recent_actions",
    "get_thought_chain",
    "store_memory",
    "get_memory_by_id",
    "get_memory_metadata",
    "get_memory_tags",
    "update_memory_metadata",
    "update_memory_link_metadata",
    "create_memory_link",
    "get_workflow_metadata",
    "get_contradictions",
    "query_memories",
    "update_memory",
    "get_linked_memories",
    "add_tag_to_memory",
    "create_embedding",
    "get_embedding",
    "get_working_memory",
    "focus_memory",
    "optimize_working_memory",
    "promote_memory_level",
    "save_cognitive_state",
    "load_cognitive_state",
    "decay_link_strengths",
    "generate_reflection",
    "get_rich_context_package",
    "get_goal_details",
    "create_goal",
    "update_goal_status",
    "vector_similarity",
    "record_artifact",
    "get_artifacts",
    "get_artifact_by_id",
    "get_similar_memories",
    "query_goals",
    "consolidate_memories",
    "diagnose_file_access_issues",
    "generate_workflow_report",
    "hybrid_search_memories",
    "get_subgraph",
]

logger = get_logger("ultimate_mcp_server.tools")


# --- Tool Registration --- 

# Generate STANDALONE_TOOL_FUNCTIONS by filtering __all__ for actual function objects
# This eliminates the redundancy between __all__ and STANDALONE_TOOL_FUNCTIONS
def _get_standalone_tool_functions():
    """Dynamically generates list of standalone tool functions from __all__."""
    current_module = sys.modules[__name__]
    standalone_functions = []
    
    for item_name in __all__:
        if item_name in ["BaseTool", "with_tool_metrics", "with_retry", 
                         "with_error_handling", "register_tool"]:
            # Skip base classes and decorators
            continue
            
        # Get the actual item from the module
        item = getattr(current_module, item_name, None)
        
        # Only include callable async functions (not classes or other exports)
        if callable(item) and inspect.iscoroutinefunction(item):
            standalone_functions.append(item)
            
    return standalone_functions

# Get the list of standalone functions to register
STANDALONE_TOOL_FUNCTIONS = _get_standalone_tool_functions()


def register_all_tools(mcp_server) -> Dict[str, Any]:
    """Registers all tools (standalone and class-based) with the MCP server.

    Args:
        mcp_server: The MCP server instance.

    Returns:
        Dictionary containing information about registered tools.
    """
    from ultimate_mcp_server.config import get_config
    cfg = get_config()
    filter_enabled = cfg.tool_registration.filter_enabled
    included_tools = cfg.tool_registration.included_tools
    excluded_tools = cfg.tool_registration.excluded_tools
    
    logger.info("Registering tools based on configuration...")
    if filter_enabled:
        if included_tools:
            logger.info(f"Tool filtering enabled: including only {len(included_tools)} specified tools")
        if excluded_tools:
            logger.info(f"Tool filtering enabled: excluding {len(excluded_tools)} specified tools")
    
    registered_tools: Dict[str, Any] = {}
    
    # --- Register Standalone Functions ---
    standalone_count = 0
    for tool_func in STANDALONE_TOOL_FUNCTIONS:
        if not callable(tool_func) or not inspect.iscoroutinefunction(tool_func):
            logger.warning(f"Item {getattr(tool_func, '__name__', repr(tool_func))} in STANDALONE_TOOL_FUNCTIONS is not a callable async function.")
            continue
            
        tool_name = tool_func.__name__
        
        # Apply tool filtering logic
        if filter_enabled:
            # Skip if not in included_tools when included_tools is specified
            if included_tools and tool_name not in included_tools:
                logger.debug(f"Skipping tool {tool_name} (not in included_tools)")
                continue
                
            # Skip if in excluded_tools
            if tool_name in excluded_tools:
                logger.debug(f"Skipping tool {tool_name} (in excluded_tools)")
                continue
        
        # Register the tool
        mcp_server.tool(name=tool_name)(tool_func)
        registered_tools[tool_name] = {
            "description": inspect.getdoc(tool_func) or "",
            "type": "standalone_function"
        }
        logger.info(f"Registered tool function: {tool_name}", emoji_key="⚙️")
        standalone_count += 1
    

    # --- Register Class-Based Tools ---

    # Register Meta API Tool
    if (not filter_enabled or 
        "meta_api_tool" in included_tools or 
        (not included_tools and "meta_api_tool" not in excluded_tools)):
        try:
            from ultimate_mcp_server.tools.meta_api_tool import register_api_meta_tools
            register_api_meta_tools(mcp_server)
            logger.info("Registered API Meta-Tool functions", emoji_key="⚙️")
            standalone_count += 1
        except ImportError:
            logger.warning("Meta API tools not found (ultimate_mcp_server.tools.meta_api_tool)")
        except Exception as e:
            logger.error(f"Failed to register Meta API tools: {e}", exc_info=True)
    
    # Register Excel Spreadsheet Automation Tool
    if (not filter_enabled or 
        "excel_spreadsheet_automation" in included_tools or 
        (not included_tools and "excel_spreadsheet_automation" not in excluded_tools)):
        try:
            from ultimate_mcp_server.tools.excel_spreadsheet_automation import (
                WINDOWS_EXCEL_AVAILABLE,
                register_excel_spreadsheet_tools,
            )
            if WINDOWS_EXCEL_AVAILABLE:
                register_excel_spreadsheet_tools(mcp_server)
                logger.info("Registered Excel spreadsheet tools", emoji_key="⚙️")
                standalone_count += 1
            else:
                # Automatically exclude Excel tools if not available
                logger.warning("Excel automation tools are only available on Windows with Excel installed. These tools will not be registered.")
                # If not already explicitly excluded, add to excluded_tools
                if "excel_spreadsheet_automation" not in excluded_tools:
                    if not cfg.tool_registration.filter_enabled:
                        cfg.tool_registration.filter_enabled = True
                    if not hasattr(cfg.tool_registration, "excluded_tools"):
                        cfg.tool_registration.excluded_tools = []
                    cfg.tool_registration.excluded_tools.append("excel_spreadsheet_automation")
        except ImportError:
            logger.warning("Excel spreadsheet tools not found (ultimate_mcp_server.tools.excel_spreadsheet_automation)")
        except Exception as e:
            logger.error(f"Failed to register Excel spreadsheet tools: {e}", exc_info=True)
    
    logger.info(
        f"Completed tool registration. Registered {standalone_count} tools.", 
        emoji_key="⚙️"
    )
    
    # Return info about registered tools
    return registered_tools

```

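For standalone functions, the per-tool filtering in `register_all_tools` reduces to: when filtering is enabled, a tool is registered only if it is absent from `excluded_tools` and, when `included_tools` is non-empty, present in it. A minimal restatement of that decision logic; `should_register` is an illustrative helper, not an export of this module:

```python
# Standalone restatement of the filtering decision used for standalone
# functions in register_all_tools. `should_register` is illustrative only.
from typing import List


def should_register(tool_name: str,
                    filter_enabled: bool,
                    included_tools: List[str],
                    excluded_tools: List[str]) -> bool:
    if not filter_enabled:
        return True
    if included_tools and tool_name not in included_tools:
        return False
    return tool_name not in excluded_tools


assert should_register("read_file", False, [], ["read_file"]) is True
assert should_register("read_file", True, ["write_file"], []) is False
assert should_register("read_file", True, [], ["read_file"]) is False
```
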
--------------------------------------------------------------------------------
/examples/advanced_vector_search_demo.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
"""Demo of advanced vector search capabilities using real Ultimate MCP Server tools."""
import asyncio
import sys
import time
from pathlib import Path

# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))

from rich.markup import escape
from rich.rule import Rule

from ultimate_mcp_server.core.server import Gateway
from ultimate_mcp_server.services.vector import get_vector_db_service
from ultimate_mcp_server.services.vector.embeddings import cosine_similarity, get_embedding_service

# --- Add Marqo Tool Import ---
from ultimate_mcp_server.tools.marqo_fused_search import marqo_fused_search
from ultimate_mcp_server.utils import get_logger
from ultimate_mcp_server.utils.display import (
    display_embedding_generation_results,
    display_text_content_result,
    display_vector_similarity_results,
    parse_and_display_result,
)

# ---------------------------
# --- Add Rich Imports ---
from ultimate_mcp_server.utils.logging.console import console

# ----------------------

# Initialize logger
logger = get_logger("example.advanced_vector_search")

# Initialize global gateway
gateway = None
vector_service = None
embedding_service = None

async def setup_services():
    """Set up the gateway and vector service for demonstration."""
    global gateway, vector_service, embedding_service
    
    logger.info("Initializing gateway and services...", emoji_key="start")
    gateway = Gateway("vector-demo", register_tools=False)
    await gateway._initialize_providers()
    
    embedding_service = get_embedding_service() # Gateway will provide API keys through provider system
    vector_service = get_vector_db_service()
    
    logger.success("Services initialized.", emoji_key="success")


async def embedding_generation_demo():
    """Demonstrate embedding generation with real providers using Rich."""
    console.print(Rule("[bold blue]Embedding Generation Demo[/bold blue]"))
    logger.info("Starting embedding generation demo", emoji_key="start")
    
    text_samples = [
        "Quantum computing leverages quantum mechanics to perform computations",
        "Artificial intelligence systems can learn from data and improve over time",
        "Cloud infrastructure enables scalable and flexible computing resources"
    ]
    console.print("Input Text Samples:")
    for i, sample in enumerate(text_samples): 
        console.print(f"  {i+1}. {escape(sample)}")
    
    # Define models to test (ensure they are supported by your embedding_service config)
    models_to_test = [
        "text-embedding-3-small",
        "text-embedding-3-large",
        "text-embedding-ada-002"
    ]

    # Collect results for display
    results_data = {"models": []}

    for model_name in models_to_test:
        try:
            logger.info(f"Generating embeddings with {model_name}...", emoji_key="processing")
            start_time = time.time()
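            # NOTE: no explicit model argument is passed here, so which model the
            # embedding service actually uses depends on its configuration; the
            # model_name above is used for labeling the reported results.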
            embeddings = await embedding_service.create_embeddings(
                texts=text_samples
            )
            processing_time = time.time() - start_time
            
            model_result = {
                "name": model_name,
                "success": embeddings and len(embeddings) > 0,
                "time": processing_time,
                "cost": embedding_service.last_request_cost if hasattr(embedding_service, 'last_request_cost') else 0.0,
            }
            
            if embeddings and len(embeddings) > 0:
                dims = len(embeddings[0])
                model_result["dimensions"] = dims
                model_result["embedding_sample"] = embeddings[0][:3]
                logger.success(f"Generated {len(embeddings)} embeddings ({dims} dims) for {model_name}", emoji_key="success")
            else:
                logger.warning(f"No embeddings returned for {model_name}", emoji_key="warning")
            
            results_data["models"].append(model_result)
                
        except Exception as e:
            logger.error(f"Error generating embeddings with {model_name}: {e}", emoji_key="error", exc_info=True)
            results_data["models"].append({
                "name": model_name,
                "success": False,
                "error": str(e)
            })
    
    # Use the shared display utility to show results
    display_embedding_generation_results(results_data)


async def vector_search_demo():
    """Demonstrate vector search capabilities using Rich."""
    console.print(Rule("[bold blue]Vector Search Demo[/bold blue]"))
    logger.info("Starting vector search demo", emoji_key="start")
    
    documents = [
        "Quantum computing uses quantum bits or qubits to perform calculations.",
        "Machine learning algorithms learn patterns from data without explicit programming.",
        "Blockchain technology creates a distributed and immutable ledger of transactions.",
        "Cloud computing delivers computing services over the internet on demand.",
        "Natural language processing helps computers understand and interpret human language.",
        "Artificial intelligence systems can simulate human intelligence in machines.",
        "Edge computing processes data closer to where it is generated rather than in a centralized location.",
        "Cybersecurity involves protecting systems from digital attacks and unauthorized access.",
        "Internet of Things (IoT) connects everyday devices to the internet for data sharing.",
        "Virtual reality creates an immersive computer-generated environment."
    ]
    document_metadata = [
        {"id": "doc1", "category": "quantum", "level": "advanced"},
        {"id": "doc2", "category": "ai", "level": "intermediate"},
        {"id": "doc3", "category": "blockchain", "level": "beginner"},
        {"id": "doc4", "category": "cloud", "level": "intermediate"},
        {"id": "doc5", "category": "ai", "level": "advanced"},
        {"id": "doc6", "category": "ai", "level": "beginner"},
        {"id": "doc7", "category": "cloud", "level": "advanced"},
        {"id": "doc8", "category": "security", "level": "intermediate"},
        {"id": "doc9", "category": "iot", "level": "beginner"},
        {"id": "doc10", "category": "vr", "level": "intermediate"}
    ]
    
    collection_name = "demo_vector_store_rich"
    embedding_dimension = 1536 # Default for text-embedding-ada-002 / 3-small

    try:
        logger.info(f"Creating/resetting collection: {collection_name}", emoji_key="db")
        await vector_service.create_collection(
            name=collection_name,
            dimension=embedding_dimension, 
            overwrite=True, 
            metadata={"description": "Demo collection for Rich vector search"}
        )
        
        logger.info("Adding documents to vector store...", emoji_key="processing")
        ids = await vector_service.add_texts(
            collection_name=collection_name,
            texts=documents,
            metadatas=document_metadata,
            batch_size=5
        )
        logger.success(f"Added {len(ids)} documents.", emoji_key="success")
        
        # --- Perform Searches ---
        search_queries = [
            "How does quantum computing work?",
            "Machine learning for image recognition",
            "Secure blockchain implementation"
        ]
        
        console.print(Rule("[green]Vector Search Results[/green]"))
        for query in search_queries:
            logger.info(f'Searching for: "{escape(query)}"...', emoji_key="search")
            search_start_time = time.time()
            results = await vector_service.search_by_text(
                collection_name=collection_name,
                query_text=query,
                top_k=3,
                include_vectors=False,
                # Example filter: metadata_filter={"category": "ai"}
            )
            search_time = time.time() - search_start_time
            
            # Format the results for the display utility
            search_result = {
                "processing_time": search_time,
                "results": results,
                "query": query
            }
            
            # Use the shared display utility
            parse_and_display_result(
                title=f"Search: {query}",
                input_data={"query": query},
                result=search_result
            )
        
    except Exception as e:
        logger.error(f"Error during vector search demo: {e}", emoji_key="error", exc_info=True)
        console.print(f"[bold red]Error:[/bold red] {escape(str(e))}")
    finally:
         # Clean up the collection
        try:
            logger.info(f"Deleting collection: {collection_name}", emoji_key="db")
            await vector_service.delete_collection(collection_name)
        except Exception as delete_err:
             logger.warning(f"Could not delete collection {collection_name}: {delete_err}", emoji_key="warning")
    console.print()


async def hybrid_search_demo():
    """Demonstrate hybrid search using the marqo_fused_search tool."""
    console.print(Rule("[bold blue]Hybrid Search Demo (using Marqo Fused Search Tool)[/bold blue]"))
    logger.info("Starting hybrid search demo (conceptual)", emoji_key="start")
    
    # This demo uses the marqo_fused_search tool, which performs hybrid search.
    # It requires a running Marqo instance and a configured index 
    # as defined in marqo_index_config.json.
    
    # Note: For this demo to work correctly, the configured Marqo index
    # should contain documents related to the query, potentially including
    # metadata fields like 'tags' if filtering is intended.
    # The setup below is removed as the data needs to be pre-indexed in Marqo.
    # collection_name = "demo_hybrid_store_rich"
    # try:
    #    logger.info(f"Creating/resetting collection: {collection_name}", emoji_key="db")
    #    # ... [Code to create collection and add documents would go here if using local DB] ...
    # except Exception as setup_e:
    #     logger.error(f"Failed to setup data for hybrid demo: {setup_e}", emoji_key="error")
    #     console.print(f"[bold red]Error setting up demo data: {escape(str(setup_e))}[/bold red]")
    #     return
    
    try:
        # --- Perform Hybrid Search via Marqo Tool ---
        query = "cloud semantic search techniques"
        # keywords = ["cloud", "semantic"] # Keywords can be included in query or filters
        semantic_weight_param = 0.6 # Weight for semantic search (alpha)
        
        logger.info(f'Hybrid search for: "{escape(query)}" with semantic weight {semantic_weight_param}', emoji_key="search")
        
        # Call the marqo_fused_search tool directly
        hybrid_result = await marqo_fused_search(
            query=query,
            limit=3, # Request top 3 results
            semantic_weight=semantic_weight_param
            # Add filters={}, date_range=None etc. if needed based on schema
        )

        display_text_content_result(
            f"Hybrid Search Results (Weight={semantic_weight_param})",
            hybrid_result # Pass the result dict directly
        )

    except Exception as e:
        logger.error(f"Error during hybrid search demo: {e}", emoji_key="error", exc_info=True)
        console.print(f"[bold red]Error:[/bold red] {escape(str(e))}")
    # Removed cleanup as we assume Marqo index exists independently
    console.print()

async def semantic_similarity_demo():
    """Demonstrate calculating semantic similarity using Rich."""
    console.print(Rule("[bold blue]Semantic Similarity Demo[/bold blue]"))
    logger.info("Starting semantic similarity demo", emoji_key="start")
    
    text_pairs = [
        ("The cat sat on the mat", "A feline was resting upon the rug"),
        ("AI is transforming industries", "Artificial intelligence drives innovation"),
        ("Cloud computing offers scalability", "The weather today is sunny")
    ]
    
    model_name = "text-embedding-ada-002" # Use a consistent model
    logger.info(f"Calculating similarity using model: {model_name}", emoji_key="model")
    
    # Prepare data structure for the shared display utility
    similarity_data = {
        "pairs": [],
        "model": model_name
    }

    try:
        all_texts = [text for pair in text_pairs for text in pair]
        embeddings = await embedding_service.create_embeddings(
            texts=all_texts
        )
        
        if len(embeddings) == len(all_texts):
            for i, pair in enumerate(text_pairs):
                idx1 = i * 2
                idx2 = i * 2 + 1
                score = cosine_similarity(embeddings[idx1], embeddings[idx2])
                
                similarity_data["pairs"].append({
                    "text1": pair[0],
                    "text2": pair[1],
                    "score": score
                })
            
            # Use the specialized display function for similarity results
            display_vector_similarity_results(similarity_data)
        else:
            logger.error("Mismatch between number of texts and embeddings received.", emoji_key="error")
            console.print("[red]Error calculating similarities: Embedding count mismatch.[/red]")
            
    except Exception as e:
        logger.error(f"Error calculating semantic similarity: {e}", emoji_key="error", exc_info=True)
        console.print(f"[bold red]Error:[/bold red] {escape(str(e))}")
        
    console.print()

async def main():
    """Run all advanced vector search demonstrations."""
    await setup_services()
    console.print(Rule("[bold magenta]Advanced Vector Search Demos Starting[/bold magenta]"))
    
    try:
        await embedding_generation_demo()
        await vector_search_demo()
        await hybrid_search_demo()
        await semantic_similarity_demo()
        
    except Exception as e:
        logger.critical(f"Vector search demo failed: {str(e)}", emoji_key="critical", exc_info=True)
        console.print(f"[bold red]Critical Demo Error:[/bold red] {escape(str(e))}")
        return 1
    
    logger.success("Advanced Vector Search Demos Finished Successfully!", emoji_key="complete")
    console.print(Rule("[bold magenta]Advanced Vector Search Demos Complete[/bold magenta]"))
    return 0


if __name__ == "__main__":
    exit_code = asyncio.run(main())
    sys.exit(exit_code)
```
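The similarity scores in `semantic_similarity_demo` come from `cosine_similarity` in the embeddings service. For reference, cosine similarity is the dot product of two vectors divided by the product of their norms; a minimal standalone version (not the library's implementation) looks like this:

```python
# Reference implementation of cosine similarity; the project's own
# cosine_similarity lives in ultimate_mcp_server.services.vector.embeddings.
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


assert abs(cosine_similarity([1.0, 0.0], [1.0, 0.0]) - 1.0) < 1e-9
assert abs(cosine_similarity([1.0, 0.0], [0.0, 1.0])) < 1e-9
```
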
Page 3/35