This is page 5 of 35. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│   ├── __init__.py
│   ├── advanced_agent_flows_using_unified_memory_system_demo.py
│   ├── advanced_extraction_demo.py
│   ├── advanced_unified_memory_system_demo.py
│   ├── advanced_vector_search_demo.py
│   ├── analytics_reporting_demo.py
│   ├── audio_transcription_demo.py
│   ├── basic_completion_demo.py
│   ├── cache_demo.py
│   ├── claude_integration_demo.py
│   ├── compare_synthesize_demo.py
│   ├── cost_optimization.py
│   ├── data
│   │   ├── sample_event.txt
│   │   ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│   │   └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│   ├── docstring_refiner_demo.py
│   ├── document_conversion_and_processing_demo.py
│   ├── entity_relation_graph_demo.py
│   ├── filesystem_operations_demo.py
│   ├── grok_integration_demo.py
│   ├── local_text_tools_demo.py
│   ├── marqo_fused_search_demo.py
│   ├── measure_model_speeds.py
│   ├── meta_api_demo.py
│   ├── multi_provider_demo.py
│   ├── ollama_integration_demo.py
│   ├── prompt_templates_demo.py
│   ├── python_sandbox_demo.py
│   ├── rag_example.py
│   ├── research_workflow_demo.py
│   ├── sample
│   │   ├── article.txt
│   │   ├── backprop_paper.pdf
│   │   ├── buffett.pdf
│   │   ├── contract_link.txt
│   │   ├── legal_contract.txt
│   │   ├── medical_case.txt
│   │   ├── northwind.db
│   │   ├── research_paper.txt
│   │   ├── sample_data.json
│   │   └── text_classification_samples
│   │       ├── email_classification.txt
│   │       ├── news_samples.txt
│   │       ├── product_reviews.txt
│   │       └── support_tickets.txt
│   ├── sample_docs
│   │   └── downloaded
│   │       └── attention_is_all_you_need.pdf
│   ├── sentiment_analysis_demo.py
│   ├── simple_completion_demo.py
│   ├── single_shot_synthesis_demo.py
│   ├── smart_browser_demo.py
│   ├── sql_database_demo.py
│   ├── sse_client_demo.py
│   ├── test_code_extraction.py
│   ├── test_content_detection.py
│   ├── test_ollama.py
│   ├── text_classification_demo.py
│   ├── text_redline_demo.py
│   ├── tool_composition_examples.py
│   ├── tournament_code_demo.py
│   ├── tournament_text_demo.py
│   ├── unified_memory_system_demo.py
│   ├── vector_search_demo.py
│   ├── web_automation_instruction_packs.py
│   └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│   └── smart_browser_internal
│       ├── locator_cache.db
│       ├── readability.js
│       └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── integration
│   │   ├── __init__.py
│   │   └── test_server.py
│   ├── manual
│   │   ├── test_extraction_advanced.py
│   │   └── test_extraction.py
│   └── unit
│       ├── __init__.py
│       ├── test_cache.py
│       ├── test_providers.py
│       └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│   ├── __init__.py
│   ├── __main__.py
│   ├── cli
│   │   ├── __init__.py
│   │   ├── __main__.py
│   │   ├── commands.py
│   │   ├── helpers.py
│   │   └── typer_cli.py
│   ├── clients
│   │   ├── __init__.py
│   │   ├── completion_client.py
│   │   └── rag_client.py
│   ├── config
│   │   └── examples
│   │       └── filesystem_config.yaml
│   ├── config.py
│   ├── constants.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── evaluation
│   │   │   ├── base.py
│   │   │   └── evaluators.py
│   │   ├── providers
│   │   │   ├── __init__.py
│   │   │   ├── anthropic.py
│   │   │   ├── base.py
│   │   │   ├── deepseek.py
│   │   │   ├── gemini.py
│   │   │   ├── grok.py
│   │   │   ├── ollama.py
│   │   │   ├── openai.py
│   │   │   └── openrouter.py
│   │   ├── server.py
│   │   ├── state_store.py
│   │   ├── tournaments
│   │   │   ├── manager.py
│   │   │   ├── tasks.py
│   │   │   └── utils.py
│   │   └── ums_api
│   │       ├── __init__.py
│   │       ├── ums_database.py
│   │       ├── ums_endpoints.py
│   │       ├── ums_models.py
│   │       └── ums_services.py
│   ├── exceptions.py
│   ├── graceful_shutdown.py
│   ├── services
│   │   ├── __init__.py
│   │   ├── analytics
│   │   │   ├── __init__.py
│   │   │   ├── metrics.py
│   │   │   └── reporting.py
│   │   ├── cache
│   │   │   ├── __init__.py
│   │   │   ├── cache_service.py
│   │   │   ├── persistence.py
│   │   │   ├── strategies.py
│   │   │   └── utils.py
│   │   ├── cache.py
│   │   ├── document.py
│   │   ├── knowledge_base
│   │   │   ├── __init__.py
│   │   │   ├── feedback.py
│   │   │   ├── manager.py
│   │   │   ├── rag_engine.py
│   │   │   ├── retriever.py
│   │   │   └── utils.py
│   │   ├── prompts
│   │   │   ├── __init__.py
│   │   │   ├── repository.py
│   │   │   └── templates.py
│   │   ├── prompts.py
│   │   └── vector
│   │       ├── __init__.py
│   │       ├── embeddings.py
│   │       └── vector_service.py
│   ├── tool_token_counter.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── audio_transcription.py
│   │   ├── base.py
│   │   ├── completion.py
│   │   ├── docstring_refiner.py
│   │   ├── document_conversion_and_processing.py
│   │   ├── enhanced-ums-lookbook.html
│   │   ├── entity_relation_graph.py
│   │   ├── excel_spreadsheet_automation.py
│   │   ├── extraction.py
│   │   ├── filesystem.py
│   │   ├── html_to_markdown.py
│   │   ├── local_text_tools.py
│   │   ├── marqo_fused_search.py
│   │   ├── meta_api_tool.py
│   │   ├── ocr_tools.py
│   │   ├── optimization.py
│   │   ├── provider.py
│   │   ├── pyodide_boot_template.html
│   │   ├── python_sandbox.py
│   │   ├── rag.py
│   │   ├── redline-compiled.css
│   │   ├── sentiment_analysis.py
│   │   ├── single_shot_synthesis.py
│   │   ├── smart_browser.py
│   │   ├── sql_databases.py
│   │   ├── text_classification.py
│   │   ├── text_redline_tools.py
│   │   ├── tournament.py
│   │   ├── ums_explorer.html
│   │   └── unified_memory_system.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── async_utils.py
│   │   ├── display.py
│   │   ├── logging
│   │   │   ├── __init__.py
│   │   │   ├── console.py
│   │   │   ├── emojis.py
│   │   │   ├── formatter.py
│   │   │   ├── logger.py
│   │   │   ├── panels.py
│   │   │   ├── progress.py
│   │   │   └── themes.py
│   │   ├── parse_yaml.py
│   │   ├── parsing.py
│   │   ├── security.py
│   │   └── text.py
│   └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/examples/rag_example.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""Example of using the RAG functionality with Ultimate MCP Server."""
import asyncio
import sys
from pathlib import Path

# Add parent directory to path to import ultimate_mcp_server
sys.path.insert(0, str(Path(__file__).parent.parent))

from rich.panel import Panel
from rich.rule import Rule
from rich.table import Table

from ultimate_mcp_server.core.server import Gateway
from ultimate_mcp_server.services.knowledge_base import (
    get_knowledge_base_manager,
    get_knowledge_base_retriever,
)
from ultimate_mcp_server.utils import get_logger
from ultimate_mcp_server.utils.display import CostTracker
from ultimate_mcp_server.utils.logging.console import console

# Initialize logger
logger = get_logger("rag_example")

# Sample documents about different AI technologies
AI_DOCUMENTS = [
    """Transformers are a type of neural network architecture introduced in the paper 
    "Attention is All You Need" by Vaswani et al. in 2017. They use self-attention 
    mechanisms to process sequential data, making them highly effective for natural 
    language processing tasks. Unlike recurrent neural networks (RNNs), transformers 
    process entire sequences in parallel, which allows for more efficient training. 
    The original transformer architecture consists of an encoder and a decoder, each 
    made up of multiple layers of self-attention and feed-forward neural networks.""",
    
    """Retrieval-Augmented Generation (RAG) is an AI framework that combines the 
    strengths of retrieval-based and generation-based approaches. In RAG systems, 
    a retrieval component first finds relevant information from a knowledge base, 
    and then a generation component uses this information to produce more accurate, 
    factual, and contextually relevant outputs. RAG helps to mitigate hallucination 
    issues in large language models by grounding the generation in retrieved facts.""",
    
    """Reinforcement Learning from Human Feedback (RLHF) is a technique used to align 
    language models with human preferences. The process typically involves three steps: 
    First, a language model is pre-trained on a large corpus of text. Second, human 
    evaluators rank different model outputs, creating a dataset of preferred responses. 
    Third, this dataset is used to train a reward model, which is then used to fine-tune 
    the language model using reinforcement learning techniques such as Proximal Policy 
    Optimization (PPO).""",
    
    """Mixture of Experts (MoE) is an architecture where multiple specialized neural 
    networks (experts) are trained to handle different types of inputs or tasks. A 
    gating network determines which expert(s) should process each input. This approach 
    allows for larger model capacity without a proportional increase in computational 
    costs, as only a subset of the parameters is activated for any given input. MoE 
    has been successfully applied in large language models like Google's Switch 
    Transformer and Mistral AI's Mixtral."""
]

AI_METADATAS = [
    {"title": "Transformers", "source": "AI Handbook", "type": "architecture"},
    {"title": "Retrieval-Augmented Generation", "source": "AI Handbook", "type": "technique"},
    {"title": "RLHF", "source": "AI Handbook", "type": "technique"},
    {"title": "Mixture of Experts", "source": "AI Handbook", "type": "architecture"}
]

EXAMPLE_QUERIES = [
    "How do transformers work?",
    "What is retrieval-augmented generation?",
    "Compare RLHF and MoE approaches."
]

KB_NAME = "ai_technologies"

async def run_rag_demo(tracker: CostTracker):
    """Run the complete RAG demonstration."""
    console.print("[bold blue]RAG Example with Ultimate MCP Server[/bold blue]")
    console.print("This example demonstrates the RAG functionality using direct knowledge base services.")
    console.print()
    
    # Initialize Gateway for proper provider and API key management
    gateway = Gateway("rag-example", register_tools=False)
    await gateway._initialize_providers()
    
    # Get knowledge base services directly
    kb_manager = get_knowledge_base_manager()
    kb_retriever = get_knowledge_base_retriever()
    
    # Clean up any existing knowledge base with the same name before starting
    console.print(Rule("[bold blue]Cleaning Up Previous Runs[/bold blue]"))
    
    # Force a clean start
    try:
        # Get direct reference to the vector service
        from ultimate_mcp_server.services.vector import get_vector_db_service
        vector_service = get_vector_db_service()
        
        # Try a more aggressive approach by resetting chromadb client directly
        if hasattr(vector_service, 'chroma_client') and vector_service.chroma_client:
            try:
                # First try standard deletion
                try:
                    vector_service.chroma_client.delete_collection(KB_NAME)
                    logger.info("Successfully deleted ChromaDB collection using client API")
                except Exception as e:
                    logger.debug(f"Standard ChromaDB deletion failed: {str(e)}")
                
                # Wait longer to ensure deletion propagates
                await asyncio.sleep(1.0)
                
                # Force reset the ChromaDB client when all else fails
                if hasattr(vector_service.chroma_client, 'reset'):
                    try:
                        vector_service.chroma_client.reset()
                        logger.info("Reset ChromaDB client to ensure clean start")
                        await asyncio.sleep(0.5)
                    except Exception as e:
                        logger.warning(f"Failed to reset ChromaDB client: {str(e)}")
            except Exception as e:
                logger.warning(f"Error with ChromaDB client manipulation: {str(e)}")
        
        # Try to delete at the vector database level again
        try:
            await vector_service.delete_collection(KB_NAME)
            logger.info(f"Directly deleted vector collection '{KB_NAME}'")
            await asyncio.sleep(0.5)
        except Exception as e:
            logger.warning(f"Error directly deleting vector collection: {str(e)}")
        
        # Also try to delete at the knowledge base level
        try:
            kb_info = await kb_manager.get_knowledge_base(KB_NAME)
            if kb_info and kb_info.get("status") != "not_found":
                await kb_manager.delete_knowledge_base(name=KB_NAME)
                logger.info(f"Deleted existing knowledge base '{KB_NAME}'")
                await asyncio.sleep(0.5)
        except Exception as e:
            logger.warning(f"Error deleting knowledge base: {str(e)}")
            
        logger.info("Cleanup completed, proceeding with clean start")
    except Exception as e:
        logger.warning(f"Error during initial cleanup: {str(e)}")
    
    console.print()
    
    # Step 1: Create knowledge base
    console.print(Rule("[bold blue]Step 1: Creating Knowledge Base[/bold blue]"))
    try:
        await kb_manager.create_knowledge_base(
            name=KB_NAME,
            description="Information about various AI technologies",
            embedding_model="text-embedding-3-small",
            overwrite=True
        )
        logger.success(f"Knowledge base created: {KB_NAME}", emoji_key="success")
    except Exception as e:
        logger.error(f"Failed to create knowledge base: {str(e)}", emoji_key="error")
        return 1
    
    console.print()
    
    # Step 2: Add documents
    console.print(Rule("[bold blue]Step 2: Adding Documents[/bold blue]"))
    try:
        result = await kb_manager.add_documents(
            knowledge_base_name=KB_NAME,
            documents=AI_DOCUMENTS,
            metadatas=AI_METADATAS,
            embedding_model="text-embedding-3-small",
            chunk_size=1000,
            chunk_method="semantic"
        )
        added_count = result.get("added_count", 0)
        logger.success(f"Added {added_count} documents to knowledge base", emoji_key="success")
    except Exception as e:
        logger.error(f"Failed to add documents: {str(e)}", emoji_key="error")
        return 1
    
    console.print()
    
    # Step 3: List knowledge bases
    console.print(Rule("[bold blue]Step 3: Listing Knowledge Bases[/bold blue]"))
    try:
        knowledge_bases = await kb_manager.list_knowledge_bases()
        
        # Create a Rich table for display
        table = Table(title="Available Knowledge Bases", box=None)
        table.add_column("Name", style="cyan")
        table.add_column("Description", style="green")
        table.add_column("Document Count", style="magenta")
        
        # Handle various return types
        try:
            if knowledge_bases is None:
                table.add_row("No knowledge bases found", "", "")
            elif isinstance(knowledge_bases, dict):
                # Handle dictionary response
                kb_names = knowledge_bases.get("knowledge_bases", [])
                if isinstance(kb_names, list):
                    for kb_item in kb_names:
                        if isinstance(kb_item, dict):
                            # Extract name and metadata from dictionary
                            name = kb_item.get("name", "Unknown")
                            metadata = kb_item.get("metadata", {})
                            description = metadata.get("description", "No description") if isinstance(metadata, dict) else "No description"
                            doc_count = metadata.get("doc_count", "Unknown") if isinstance(metadata, dict) else "Unknown"
                            table.add_row(str(name), str(description), str(doc_count))
                        else:
                            table.add_row(str(kb_item), "No description available", "Unknown")
                else:
                    table.add_row("Error parsing response", "", "")
            elif isinstance(knowledge_bases, list):
                # Handle list response
                for kb in knowledge_bases:
                    if isinstance(kb, str):
                        table.add_row(kb, "No description", "0")
                    elif isinstance(kb, dict):
                        name = kb.get("name", "Unknown")
                        metadata = kb.get("metadata", {})
                        description = metadata.get("description", "No description") if isinstance(metadata, dict) else "No description"
                        doc_count = metadata.get("doc_count", "Unknown") if isinstance(metadata, dict) else "Unknown"
                        table.add_row(str(name), str(description), str(doc_count))
                    else:
                        kb_name = str(getattr(kb, 'name', str(kb)))
                        table.add_row(kb_name, "No description", "0")
            else:
                # Fallback for unexpected response type
                table.add_row(f"Unexpected response: {type(knowledge_bases)}", "", "")
            
            console.print(table)
        except Exception as e:
            logger.error(f"Error rendering knowledge bases table: {str(e)}", emoji_key="error")
            # Simple fallback display
            console.print(f"Knowledge bases available: {knowledge_bases}")
    except Exception as e:
        logger.error(f"Failed to list knowledge bases: {str(e)}", emoji_key="error")
    
    console.print()
    
    # Step 4: Retrieve context for first query
    console.print(Rule("[bold blue]Step 4: Retrieving Context[/bold blue]"))
    
    query = EXAMPLE_QUERIES[0]
    logger.info(f"Retrieving context for query: '{query}'", emoji_key="processing")
    
    # Default fallback document if retrieval fails
    retrieved_results = []
    
    try:
        try:
            results = await kb_retriever.retrieve(
                knowledge_base_name=KB_NAME,
                query=query,
                top_k=2,
                min_score=0.0,  # Set min_score to 0 to see all results
                embedding_model="text-embedding-3-small"  # Use the same embedding model as when adding documents
            )
            retrieved_results = results.get('results', [])
            
            # Debug raw results
            logger.debug(f"Raw retrieval results: {results}")
        except Exception as e:
            logger.error(f"Error retrieving from knowledge base: {str(e)}", emoji_key="error")
            # Fallback to using the documents directly
            retrieved_results = [
                {
                    "document": AI_DOCUMENTS[0],
                    "score": 0.95,
                    "metadata": AI_METADATAS[0]
                }
            ]
        
        console.print(f"Retrieved {len(retrieved_results)} results for query: '{query}'")
        
        # Display results in panels
        if retrieved_results:
            for i, doc in enumerate(retrieved_results):
                try:
                    score = doc.get('score', 0.0)
                    document = doc.get('document', '')
                    metadata = doc.get('metadata', {})
                    source = metadata.get('title', 'Unknown') if isinstance(metadata, dict) else 'Unknown'
                    
                    console.print(Panel(
                        f"[bold]Document {i+1}[/bold] (score: {score:.2f})\n" +
                        f"[italic]{document[:150]}...[/italic]",
                        title=f"Source: {source}",
                        border_style="blue"
                    ))
                except Exception as e:
                    logger.error(f"Error displaying document {i}: {str(e)}", emoji_key="error")
        else:
            console.print(Panel(
                "[italic]No results found. Using sample document as fallback for demonstration.[/italic]",
                title="No Results",
                border_style="yellow"
            ))
            # Create a fallback document for the next step
            retrieved_results = [
                {
                    "document": AI_DOCUMENTS[0],
                    "score": 0.0,
                    "metadata": AI_METADATAS[0]
                }
            ]
    except Exception as e:
        logger.error(f"Failed to process retrieval results: {str(e)}", emoji_key="error")
        # Ensure we have something to continue with
        retrieved_results = [
            {
                "document": AI_DOCUMENTS[0],
                "score": 0.0,
                "metadata": AI_METADATAS[0]
            }
        ]
    
    console.print()
    
    # Step 5: Generate completions using retrieved context for the first query
    console.print(Rule("[bold blue]Step 5: Generating Response with Retrieved Context[/bold blue]"))
    query = EXAMPLE_QUERIES[0]
    console.print(f"\n[bold]Query:[/bold] {query}")
    
    try:
        # Get the provider
        provider_key = "gemini"
        provider = gateway.providers.get(provider_key)
        if not provider:
            provider_key = "openai"
            provider = gateway.providers.get(provider_key)  # Fallback
        
        if not provider:
            logger.error("No suitable provider found", emoji_key="error")
            return 1
        
        # Use a hardcoded model based on provider type
        if provider_key == "gemini":
            model = "gemini-2.0-flash-lite"
        elif provider_key == "openai":
            model = "gpt-4.1-mini"
        elif provider_key == "anthropic":
            model = "claude-3-haiku-latest"
        else:
            # Get first available model or fallback
            models = getattr(provider, 'available_models', [])
            model = models[0] if models else "unknown-model"
            
        # Prepare context from retrieved documents
        if retrieved_results:
            context = "\n\n".join([doc.get("document", "") for doc in retrieved_results if doc.get("document")])
        else:
            # Fallback to using the first document directly if no results
            context = AI_DOCUMENTS[0]
        
        # Build prompt with context
        prompt = f"""Answer the following question based on the provided context. 
        If the context doesn't contain relevant information, say so.
        
        Context:
        {context}
        
        Question: {query}
        
        Answer:"""
        
        # Generate response
        response = await provider.generate_completion(
            prompt=prompt,
            model=model,
            temperature=0.3,
            max_tokens=300
        )
        
        # Display the answer
        console.print(Panel(
            response.text,
            title=f"Answer from {provider_key}/{model}",
            border_style="green"
        ))
        
        # Display usage stats
        metrics_table = Table(title="Performance Metrics", box=None)
        metrics_table.add_column("Metric", style="cyan")
        metrics_table.add_column("Value", style="white")
        metrics_table.add_row("Input Tokens", str(response.input_tokens))
        metrics_table.add_row("Output Tokens", str(response.output_tokens))
        metrics_table.add_row("Processing Time", f"{response.processing_time:.2f}s")
        metrics_table.add_row("Cost", f"${response.cost:.6f}")
        
        console.print(metrics_table)

        # Track the generation call
        tracker.add_call(response)

    except Exception as e:
        logger.error(f"Failed to generate response: {str(e)}", emoji_key="error")
    
    console.print()
    
    # Step 6: Clean up
    console.print(Rule("[bold blue]Step 6: Cleaning Up[/bold blue]"))

    # Display cost summary before final cleanup
    tracker.display_summary(console)

    try:
        await kb_manager.delete_knowledge_base(name=KB_NAME)
        logger.success(f"Knowledge base {KB_NAME} deleted successfully", emoji_key="success")
    except Exception as e:
        logger.error(f"Failed to delete knowledge base: {str(e)}", emoji_key="error")
        return 1
    
    return 0

async def main():
    """Run the RAG example."""
    tracker = CostTracker() # Instantiate tracker
    try:
        await run_rag_demo(tracker) # Pass tracker
    except Exception as e:
        logger.critical(f"RAG demo failed unexpectedly: {e}", exc_info=True)
        return 1
    return 0

if __name__ == "__main__":
    # Run the demonstration
    exit_code = asyncio.run(main())
    sys.exit(exit_code) 
```
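
The demo above wraps every knowledge-base call in fallbacks and Rich display code. For orientation, here is a minimal sketch of the same create / add / retrieve / generate / delete flow, assuming the service APIs behave exactly as exercised in the example; the knowledge-base name, sample document, provider choice ("openai"), and model ("gpt-4.1-mini") are illustrative assumptions, not project defaults.

```python
# Minimal RAG flow sketch based on the APIs used in examples/rag_example.py above.
# Names and parameter values here are illustrative assumptions, not project defaults.
import asyncio

from ultimate_mcp_server.core.server import Gateway
from ultimate_mcp_server.services.knowledge_base import (
    get_knowledge_base_manager,
    get_knowledge_base_retriever,
)


async def minimal_rag(question: str) -> str:
    gateway = Gateway("rag-minimal", register_tools=False)
    await gateway._initialize_providers()
    kb_manager = get_knowledge_base_manager()
    kb_retriever = get_knowledge_base_retriever()

    # 1. Create a scratch knowledge base and index one document.
    await kb_manager.create_knowledge_base(
        name="scratch_kb",
        description="Throwaway KB for a single query",
        embedding_model="text-embedding-3-small",
        overwrite=True,
    )
    await kb_manager.add_documents(
        knowledge_base_name="scratch_kb",
        documents=["Transformers process whole sequences in parallel using self-attention."],
        metadatas=[{"title": "Transformers"}],
        embedding_model="text-embedding-3-small",
    )

    # 2. Retrieve context for the question.
    results = await kb_retriever.retrieve(
        knowledge_base_name="scratch_kb",
        query=question,
        top_k=2,
        embedding_model="text-embedding-3-small",
    )
    context = "\n\n".join(r.get("document", "") for r in results.get("results", []))

    # 3. Ground a completion in the retrieved context (provider/model are illustrative).
    provider = gateway.providers.get("openai")
    if provider is None:
        raise RuntimeError("No OpenAI provider configured")
    response = await provider.generate_completion(
        prompt=f"Context:\n{context}\n\nQuestion: {question}\nAnswer:",
        model="gpt-4.1-mini",
        temperature=0.3,
        max_tokens=300,
    )

    # 4. Clean up.
    await kb_manager.delete_knowledge_base(name="scratch_kb")
    return response.text


if __name__ == "__main__":
    print(asyncio.run(minimal_rag("How do transformers work?")))
```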

--------------------------------------------------------------------------------
/ultimate_mcp_server/core/tournaments/manager.py:
--------------------------------------------------------------------------------

```python
# --- core/tournaments/manager.py (Updates) ---
import asyncio
import json
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from pydantic import ValidationError

import ultimate_mcp_server.core.evaluation.evaluators  # Ensures evaluators are registered  # noqa: F401
from ultimate_mcp_server.core.evaluation.base import EVALUATOR_REGISTRY, Evaluator
from ultimate_mcp_server.core.models.tournament import (
    CreateTournamentInput,
    TournamentConfig,  # ModelConfig is nested in TournamentConfig from CreateTournamentInput
    TournamentData,
    TournamentRoundResult,
    TournamentStatus,
)
from ultimate_mcp_server.core.models.tournament import (
    ModelConfig as CoreModelConfig,  # Alias to avoid confusion
)
from ultimate_mcp_server.utils import get_logger

logger = get_logger("ultimate_mcp_server.tournaments.manager")

STORAGE_DIR = Path(__file__).resolve().parent.parent.parent.parent / "storage" 
TOURNAMENT_STORAGE_BASE = STORAGE_DIR / "tournaments"

class TournamentManager:
    def __init__(self):
        self.tournaments: Dict[str, TournamentData] = {}
        # --- NEW: Store instantiated evaluators per tournament ---
        self.tournament_evaluators: Dict[str, List[Evaluator]] = {}
        TOURNAMENT_STORAGE_BASE.mkdir(parents=True, exist_ok=True)
        logger.info(f"Tournament storage initialized at: {TOURNAMENT_STORAGE_BASE}")
        self._load_all_tournaments()

    def _instantiate_evaluators(self, tournament_id: str, config: TournamentConfig) -> bool:
        """Instantiates and stores evaluators for a tournament."""
        self.tournament_evaluators[tournament_id] = []
        for eval_config in config.evaluators:
            evaluator_cls = EVALUATOR_REGISTRY.get(eval_config.type)
            if not evaluator_cls:
                logger.error(f"Unknown evaluator type '{eval_config.type}' for tournament {tournament_id}. Skipping.")
                # Optionally, fail tournament creation if a critical evaluator is missing
                continue 
            try:
                self.tournament_evaluators[tournament_id].append(evaluator_cls(eval_config.params))
                logger.info(f"Instantiated evaluator '{eval_config.type}' (ID: {eval_config.evaluator_id}) for tournament {tournament_id}")
            except Exception as e:
                logger.error(f"Failed to instantiate evaluator '{eval_config.type}' (ID: {eval_config.evaluator_id}): {e}", exc_info=True)
                # Decide if this is a fatal error for the tournament
                return False # Example: Fail if any evaluator instantiation fails
        return True
    
    def get_evaluators_for_tournament(self, tournament_id: str) -> List[Evaluator]:
        """Returns the list of instantiated evaluators for a given tournament."""
        return self.tournament_evaluators.get(tournament_id, [])

    def create_tournament(self, input_data: CreateTournamentInput) -> Optional[TournamentData]:
        try:
            logger.debug(f"Creating tournament with name: {input_data.name}, {len(input_data.model_configs)} model configs")
            
            # Map input ModelConfig to core ModelConfig used in TournamentConfig
            core_model_configs = [
                CoreModelConfig(
                    model_id=mc.model_id,
                    diversity_count=mc.diversity_count,
                    temperature=mc.temperature,
                    max_tokens=mc.max_tokens,
                    system_prompt=mc.system_prompt,
                    seed=mc.seed
                ) for mc in input_data.model_configs
            ]

            tournament_cfg = TournamentConfig(
                name=input_data.name,
                prompt=input_data.prompt,
                models=core_model_configs, # Use the mapped core_model_configs
                rounds=input_data.rounds,
                tournament_type=input_data.tournament_type,
                extraction_model_id=input_data.extraction_model_id,
                evaluators=input_data.evaluators, # Pass evaluator configs
                max_retries_per_model_call=input_data.max_retries_per_model_call,
                retry_backoff_base_seconds=input_data.retry_backoff_base_seconds,
                max_concurrent_model_calls=input_data.max_concurrent_model_calls
            )
            
            tournament = TournamentData(
                name=input_data.name,
                config=tournament_cfg,
                current_round=-1, # Initialize current_round
                start_time=None,  # Will be set when execution starts
                end_time=None
            )
            
            tournament.storage_path = str(self._get_storage_path(tournament.tournament_id, tournament.name)) # Pass name for better paths
            
            # --- NEW: Instantiate evaluators ---
            if not self._instantiate_evaluators(tournament.tournament_id, tournament.config):
                logger.error(f"Failed to instantiate one or more evaluators for tournament {tournament.name}. Creation aborted.")
                # Clean up if necessary, e.g., remove from self.tournament_evaluators
                if tournament.tournament_id in self.tournament_evaluators:
                    del self.tournament_evaluators[tournament.tournament_id]
                return None # Or raise an error

            self.tournaments[tournament.tournament_id] = tournament
            self._save_tournament_state(tournament)
            logger.info(f"Tournament '{tournament.name}' (ID: {tournament.tournament_id}) created successfully.")
            return tournament
        except ValidationError as ve:
            logger.error(f"Tournament validation failed: {ve}")
            return None
        except Exception as e:
            logger.error(f"Unexpected error creating tournament: {e}", exc_info=True)
            return None

    def get_tournament(self, tournament_id: str, force_reload: bool = False) -> Optional[TournamentData]:
        logger.debug(f"Getting tournament {tournament_id} (force_reload={force_reload})")
        if not force_reload and tournament_id in self.tournaments:
            return self.tournaments[tournament_id]
        
        tournament = self._load_tournament_state(tournament_id)
        if tournament:
            # --- NEW: Ensure evaluators are loaded/re-instantiated if not present ---
            if tournament_id not in self.tournament_evaluators:
                logger.info(f"Evaluators for tournament {tournament_id} not in memory, re-instantiating from config.")
                if not self._instantiate_evaluators(tournament_id, tournament.config):
                    logger.error(f"Failed to re-instantiate evaluators for loaded tournament {tournament_id}. Evaluation might fail.")
            self.tournaments[tournament_id] = tournament # Update cache
        return tournament

    def _save_tournament_state(self, tournament: TournamentData):
        if not tournament.storage_path:
            logger.error(f"Cannot save state for tournament {tournament.tournament_id}: storage_path not set.")
            return
            
        state_file = Path(tournament.storage_path) / "tournament_state.json"
        try:
            state_file.parent.mkdir(parents=True, exist_ok=True)
            # Pydantic's model_dump_json handles datetime to ISO string conversion
            json_data = tournament.model_dump_json(indent=2)
            with open(state_file, 'w', encoding='utf-8') as f:
                f.write(json_data)
            logger.debug(f"Saved state for tournament {tournament.tournament_id} to {state_file}")
        except IOError as e:
            logger.error(f"Failed to save state for tournament {tournament.tournament_id}: {e}")
        except Exception as e: # Catch other potential errors from model_dump_json
            logger.error(f"Error serializing tournament state for {tournament.tournament_id}: {e}", exc_info=True)


    def _load_tournament_state(self, tournament_id: str) -> Optional[TournamentData]:
        # Storage folder names now include a timestamp and sanitized tournament name,
        # so scanning every tournament directory is the most reliable way to find a tournament by ID alone.
        
        # Robust scan: iterate through all subdirectories of TOURNAMENT_STORAGE_BASE
        if TOURNAMENT_STORAGE_BASE.exists():
            for potential_tournament_dir in TOURNAMENT_STORAGE_BASE.iterdir():
                if potential_tournament_dir.is_dir():
                    state_file = potential_tournament_dir / "tournament_state.json"
                    if state_file.exists():
                        try:
                            with open(state_file, 'r', encoding='utf-8') as f:
                                data = json.load(f)
                                if data.get("tournament_id") == tournament_id:
                                    # Use Pydantic for robust parsing and type conversion
                                    parsed_tournament = TournamentData.model_validate(data)
                                    logger.debug(f"Loaded state for tournament {tournament_id} from {state_file}")
                                    return parsed_tournament
                        except (IOError, json.JSONDecodeError, ValidationError) as e:
                            logger.warning(f"Failed to load or validate state from {state_file} for tournament ID {tournament_id}: {e}")
                            # Don't return, continue scanning
                        except Exception as e: # Catch any other unexpected error
                            logger.error(f"Unexpected error loading state from {state_file}: {e}", exc_info=True)

        logger.debug(f"Tournament {tournament_id} not found in any storage location during scan.")
        return None

    def _load_all_tournaments(self):
        logger.info(f"Scanning {TOURNAMENT_STORAGE_BASE} for existing tournaments...")
        count = 0
        if not TOURNAMENT_STORAGE_BASE.exists():
            logger.warning("Tournament storage directory does not exist. No tournaments loaded.")
            return
            
        for item in TOURNAMENT_STORAGE_BASE.iterdir():
            if item.is_dir():
                # Attempt to load tournament_state.json from this directory
                state_file = item / "tournament_state.json"
                if state_file.exists():
                    try:
                        with open(state_file, 'r', encoding='utf-8') as f:
                            data = json.load(f)
                        tournament_id_from_file = data.get("tournament_id")
                        if not tournament_id_from_file:
                            logger.warning(f"Skipping directory {item.name}, tournament_state.json missing 'tournament_id'.")
                            continue
                        
                        if tournament_id_from_file not in self.tournaments: # Avoid reloading if already cached by some other means
                            # Use the get_tournament method which handles re-instantiating evaluators
                            loaded_tournament = self.get_tournament(tournament_id_from_file, force_reload=True)
                            if loaded_tournament:
                                count += 1
                                logger.debug(f"Loaded tournament '{loaded_tournament.name}' (ID: {loaded_tournament.tournament_id}) from {item.name}")
                            else:
                                logger.warning(f"Failed to fully load tournament from directory: {item.name} (ID in file: {tournament_id_from_file})")
                    except (IOError, json.JSONDecodeError, ValidationError) as e:
                        logger.warning(f"Error loading tournament from directory {item.name}: {e}")
                    except Exception as e:
                        logger.error(f"Unexpected error loading tournament from {item.name}: {e}", exc_info=True)
        logger.info(f"Finished scan. Loaded {count} existing tournaments into manager.")

    def start_tournament_execution(self, tournament_id: str) -> bool:
        logger.debug(f"Attempting to start tournament execution for {tournament_id}")
        tournament = self.get_tournament(tournament_id) # Ensures evaluators are loaded
        if not tournament:
            logger.error(f"Cannot start execution: Tournament {tournament_id} not found.")
            return False
        
        if tournament.status not in [TournamentStatus.PENDING, TournamentStatus.CREATED]:
            logger.warning(f"Tournament {tournament_id} is not in a runnable state ({tournament.status}). Cannot start.")
            return False

        tournament.status = TournamentStatus.RUNNING # Or QUEUED if worker mode is implemented
        tournament.start_time = datetime.now(timezone.utc)
        tournament.current_round = 0 # Explicitly set to 0 when starting
        # Ensure rounds_results is initialized if empty
        if not tournament.rounds_results:
            tournament.rounds_results = [
                TournamentRoundResult(round_num=i) for i in range(tournament.config.rounds)
            ]

        self._save_tournament_state(tournament)
        logger.info(f"Tournament {tournament_id} status set to {tournament.status}, ready for async execution.")

        try:
            from ultimate_mcp_server.core.tournaments.tasks import (
                run_tournament_async,  # Local import
            )
            asyncio.create_task(run_tournament_async(tournament_id)) 
            logger.info(f"Asyncio task created for tournament {tournament_id}.")
            return True
        except Exception as e:
             logger.error(f"Error creating asyncio task for tournament {tournament_id}: {e}", exc_info=True)
             tournament.status = TournamentStatus.FAILED
             tournament.error_message = f"Failed during asyncio task creation: {str(e)}"
             tournament.end_time = datetime.now(timezone.utc)
             self._save_tournament_state(tournament)
             return False

    async def cancel_tournament(self, tournament_id: str) -> Tuple[bool, str, TournamentStatus]: # Return final status
        """Attempts to cancel a tournament. Returns success, message, and final status."""
        tournament = self.get_tournament(tournament_id, force_reload=True)
        if not tournament:
            logger.warning(f"Cannot cancel non-existent tournament {tournament_id}")
            # Use FAILED or a specific status for "not found" if added to enum,
            # or rely on the tool layer to raise 404. For manager, FAILED can represent this.
            return False, "Tournament not found.", TournamentStatus.FAILED 

        current_status = tournament.status
        final_status = current_status # Default to current status if no change
        message = ""

        if current_status == TournamentStatus.RUNNING or current_status == TournamentStatus.QUEUED:
            logger.info(f"Attempting to cancel tournament {tournament_id} (status: {current_status})...")
            tournament.status = TournamentStatus.CANCELLED
            tournament.error_message = tournament.error_message or "Tournament cancelled by user request."
            tournament.end_time = datetime.now(timezone.utc)
            final_status = TournamentStatus.CANCELLED
            message = "Cancellation requested. Tournament status set to CANCELLED."
            self._save_tournament_state(tournament)
            logger.info(f"Tournament {tournament_id} status set to CANCELLED.")
            # The background task needs to observe this status.
            return True, message, final_status
        elif current_status in [TournamentStatus.COMPLETED, TournamentStatus.FAILED, TournamentStatus.CANCELLED]:
             message = f"Tournament {tournament_id} is already finished or cancelled (Status: {current_status})."
             logger.warning(message)
             return False, message, final_status
        elif current_status == TournamentStatus.PENDING or current_status == TournamentStatus.CREATED:
            tournament.status = TournamentStatus.CANCELLED
            tournament.error_message = "Tournament cancelled before starting."
            tournament.end_time = datetime.now(timezone.utc)
            final_status = TournamentStatus.CANCELLED
            message = "Pending/Created tournament cancelled successfully."
            self._save_tournament_state(tournament)
            logger.info(f"Pending/Created tournament {tournament_id} cancelled.")
            return True, message, final_status
        else:
            # Should not happen, but handle unknown state
            message = f"Tournament {tournament_id} is in an unexpected state ({current_status}). Cannot determine cancellation action."
            logger.error(message)
            return False, message, current_status


    def list_tournaments(self) -> List[Dict[str, Any]]:
        # Ensure cache is up-to-date if new tournaments might have been added externally (less likely with file storage)
        # self._load_all_tournaments() # Consider if this is too expensive for every list call
        
        basic_list = []
        for t_data in self.tournaments.values():
            basic_list.append({
                "tournament_id": t_data.tournament_id,
                "name": t_data.name,
                "tournament_type": t_data.config.tournament_type,
                "status": t_data.status,
                "current_round": t_data.current_round,
                "total_rounds": t_data.config.rounds,
                "created_at": t_data.created_at.isoformat() if t_data.created_at else None, # Ensure ISO format
                "updated_at": t_data.updated_at.isoformat() if t_data.updated_at else None,
                "start_time": t_data.start_time.isoformat() if t_data.start_time else None,
                "end_time": t_data.end_time.isoformat() if t_data.end_time else None,
            })
        basic_list.sort(key=lambda x: x['created_at'] or '', reverse=True) # Handle None created_at for sorting
        return basic_list
        
    def _get_storage_path(self, tournament_id: str, tournament_name: str) -> Path:
        timestamp_str = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        # Sanitize tournament name for use in path
        safe_name = re.sub(r'[^\w\s-]', '', tournament_name).strip().replace(' ', '_')
        safe_name = re.sub(r'[-\s]+', '-', safe_name) # Replace multiple spaces/hyphens with single hyphen
        safe_name = safe_name[:50] # Limit length
        
        # Always append the first UUID segment (8 hex chars) for uniqueness; the sanitized name is dropped only if empty
        uuid_suffix = tournament_id.split('-')[0]
        
        folder_name = f"{timestamp_str}_{safe_name}_{uuid_suffix}" if safe_name else f"{timestamp_str}_{uuid_suffix}"
        
        path = TOURNAMENT_STORAGE_BASE / folder_name
        path.mkdir(parents=True, exist_ok=True) # Ensure directory is created
        return path

tournament_manager = TournamentManager()
```
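
The manager exposes a small lifecycle API (create, start, get/poll, cancel). Below is a hedged sketch of a caller that uses only the methods defined above; the `CreateTournamentInput` payload, including the dict form of `model_configs`, the model id string, the round count, and the polling interval, are assumptions for illustration rather than documented defaults.

```python
# Illustrative driver for TournamentManager, limited to methods defined above.
# The CreateTournamentInput payload below is an assumption for demonstration purposes;
# consult the actual model definitions for required fields and valid values.
import asyncio

from ultimate_mcp_server.core.models.tournament import CreateTournamentInput, TournamentStatus
from ultimate_mcp_server.core.tournaments.manager import tournament_manager


async def run_demo_tournament() -> None:
    # create_tournament validates input, instantiates evaluators, and persists state to disk.
    tournament = tournament_manager.create_tournament(
        CreateTournamentInput(
            name="demo-tournament",
            prompt="Write a function that reverses a string.",
            model_configs=[{"model_id": "openai/gpt-4.1-mini"}],  # assumed field shape
            rounds=2,
        )
    )
    if tournament is None:
        raise RuntimeError("Tournament creation failed; see manager logs.")

    # start_tournament_execution sets the status to RUNNING and schedules run_tournament_async.
    if not tournament_manager.start_tournament_execution(tournament.tournament_id):
        raise RuntimeError("Could not start tournament execution.")

    # Poll persisted state until the background task reaches a terminal status;
    # cancel_tournament could be used to stop it early.
    terminal = {TournamentStatus.COMPLETED, TournamentStatus.FAILED, TournamentStatus.CANCELLED}
    while True:
        current = tournament_manager.get_tournament(tournament.tournament_id, force_reload=True)
        if current is None or current.status in terminal:
            break
        await asyncio.sleep(5)

    print(f"Final status: {current.status if current else 'unknown'}")


if __name__ == "__main__":
    asyncio.run(run_demo_tournament())
```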

--------------------------------------------------------------------------------
/examples/audio_transcription_demo.py:
--------------------------------------------------------------------------------

```python
"""Demonstration script for audio transcription using faster-whisper.

This version uses the faster-whisper library, which offers better performance than whisper.cpp.
"""

import asyncio
import os
import sys
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from rich import box
from rich.console import Console
from rich.live import Live
from rich.markup import escape
from rich.panel import Panel
from rich.progress import (
    BarColumn,
    Progress,
    SpinnerColumn,
    TextColumn,
    TimeElapsedColumn,
    TimeRemainingColumn,
)
from rich.rule import Rule
from rich.table import Table

# Add the project root to the Python path
# This allows finding the ultimate_mcp_server package when running the script directly
project_root = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(project_root))

EXAMPLE_DIR = Path(__file__).parent
DATA_DIR = EXAMPLE_DIR / "data"
SAMPLE_AUDIO_PATH = str(DATA_DIR / "Steve_Jobs_Introducing_The_iPhone_compressed.mp3")

from ultimate_mcp_server.utils import get_logger  # noqa: E402

# --- Configuration ---
logger = get_logger("audio_demo")

# Get the directory of the current script
SCRIPT_DIR = Path(__file__).parent.resolve()
DATA_DIR = SCRIPT_DIR / "data"

# Define allowed audio extensions
ALLOWED_EXTENSIONS = [".mp3", ".wav", ".flac", ".ogg", ".m4a"]

# --- Helper Functions ---
def find_audio_files(directory: Path) -> List[Path]:
    """Finds audio files with allowed extensions in the given directory."""
    return [p for p in directory.iterdir() if p.is_file() and p.suffix.lower() in ALLOWED_EXTENSIONS]

def format_timestamp(seconds: float) -> str:
    """Format seconds into a timestamp string."""
    hours = int(seconds / 3600)
    minutes = int((seconds % 3600) / 60)
    secs = seconds % 60
    if hours > 0:
        return f"{hours:02d}:{minutes:02d}:{secs:05.2f}"
    else:
        return f"{minutes:02d}:{secs:05.2f}"

def detect_device() -> Tuple[str, str, Optional[str]]:
    """Detect whether a CUDA GPU is available and return (device, compute_type, gpu_name or None)."""
    try:
        # Import torch to check if CUDA is available
        import torch
        if torch.cuda.is_available():
            # Get GPU info for display
            gpu_name = torch.cuda.get_device_name(0)
            return "cuda", "float16", gpu_name
        else:
            return "cpu", "int8", None
    except ImportError:
        # If torch is not available, fall back to checking for NVIDIA GPUs via nvidia-smi
        try:
            import subprocess
            nvidia_smi_output = subprocess.check_output(["nvidia-smi", "-L"], text=True, stderr=subprocess.DEVNULL)
            if "GPU" in nvidia_smi_output:
                # Extract GPU name
                gpu_name = nvidia_smi_output.strip().split(':')[1].strip().split('(')[0].strip()
                return "cuda", "float16", gpu_name
            else:
                return "cpu", "int8", None
        except Exception:
            # If all else fails, default to CPU
            return "cpu", "int8", None

def generate_markdown_transcript(transcript: Dict[str, Any], file_path: str) -> str:
    """Generate a markdown version of the transcript with metadata."""
    audio_filename = os.path.basename(file_path)
    metadata = transcript.get("metadata", {})
    segments = transcript.get("segments", [])
    
    markdown = [
        f"# Transcript: {audio_filename}",
        "",
        "## Metadata",
        f"- **Duration:** {format_timestamp(metadata.get('duration', 0))}",
        f"- **Language:** {metadata.get('language', 'unknown')} (confidence: {metadata.get('language_probability', 0):.2f})",
        f"- **Transcription Model:** {metadata.get('model', 'unknown')}",
        f"- **Device:** {metadata.get('device', 'unknown')}",
        f"- **Processing Time:** {transcript.get('processing_time', {}).get('total', 0):.2f} seconds",
        "",
        "## Full Transcript",
        "",
        transcript.get("enhanced_transcript", transcript.get("raw_transcript", "")),
        "",
        "## Segments",
        ""
    ]
    
    for segment in segments:
        start_time = format_timestamp(segment["start"])
        end_time = format_timestamp(segment["end"])
        markdown.append(f"**[{start_time} → {end_time}]** {segment['text']}")
        markdown.append("")
    
    return "\n".join(markdown)

def save_markdown_transcript(transcript: Dict[str, Any], file_path: str) -> Tuple[str, str]:
    """Save the transcript as markdown and text files.
    
    Returns:
        Tuple containing paths to markdown and text files
    """
    audio_path = Path(file_path)
    markdown_path = audio_path.with_suffix(".md")
    txt_path = audio_path.with_suffix(".txt")
    
    # Generate and save markdown (enhanced transcript)
    markdown_content = generate_markdown_transcript(transcript, file_path)
    with open(markdown_path, "w", encoding="utf-8") as f:
        f.write(markdown_content)
    
    # Save raw transcript as plain text file
    with open(txt_path, "w", encoding="utf-8") as f:
        f.write(transcript.get("raw_transcript", ""))
    
    return str(markdown_path), str(txt_path)

async def enhance_transcript_with_llm(raw_transcript: str, console: Console) -> str:
    """Enhance the transcript using an LLM to improve readability."""
    try:
        from ultimate_mcp_server.tools.completion import chat_completion
    except ImportError:
        console.print("[yellow]Ultimate MCP Server tools not available for enhancement. Using raw transcript.[/yellow]")
        return raw_transcript
    
    # Setup progress display
    with Progress(
        SpinnerColumn(),
        TextColumn("[bold green]Enhancing transcript with LLM[/bold green]"),
        BarColumn(),
        TextColumn("[cyan]{task.percentage:>3.0f}%"),
        TimeElapsedColumn(),
        console=console
    ) as progress:
        enhance_task = progress.add_task("Enhancing...", total=100)
        
        try:
            # Create the prompt for transcript enhancement
            system_prompt = """You are an expert transcription editor. Your task is to enhance the following raw transcript:
1. Fix any spelling or grammar errors
2. Add proper punctuation and capitalization
3. Format the text into logical paragraphs
4. Remove filler words and repeated phrases
5. Preserve the original meaning and all factual content
6. Format numbers, acronyms, and technical terms consistently
7. Keep the text faithful to the original but make it more readable"""

            user_prompt = f"Here is the raw transcript to enhance:\n\n{raw_transcript}\n\nPlease provide only the enhanced transcript without explanations."

            # Split the transcript into chunks if it's very long
            progress.update(enhance_task, completed=20)
            
            # Call the chat completion function
            result = await chat_completion(
                system_prompt=system_prompt,
                messages=[{"role": "user", "content": user_prompt}],
                model="gpt-4.1-mini",
                temperature=0.3,
            )
            
            progress.update(enhance_task, completed=90)
            
            enhanced_transcript = result.get("content", raw_transcript)
            
            progress.update(enhance_task, completed=100)
            
            return enhanced_transcript
        
        except Exception as e:
            console.print(f"[red]Error enhancing transcript: {e}[/red]")
            progress.update(enhance_task, completed=100)
            return raw_transcript

async def transcribe_with_faster_whisper(file_path: str, console: Console) -> Dict[str, Any]:
    """Transcribe audio using faster-whisper library with real-time progress updates."""
    logger.info(f"Processing file: {file_path}")
    
    # Check if audio file exists
    if not os.path.exists(file_path):
        logger.error(f"Audio file not found at {file_path}")
        return {"success": False, "error": f"Audio file not found at {file_path}"}
    
    try:
        # Import faster-whisper - install if not present
        try:
            from faster_whisper import WhisperModel
        except ImportError:
            console.print("[yellow]faster-whisper not installed. Installing now...[/yellow]")
            import subprocess
            subprocess.check_call([sys.executable, "-m", "pip", "install", "faster-whisper"])
            from faster_whisper import WhisperModel
        
        # Start timing
        start_time = time.time()
        
        # Get audio duration for progress calculation
        audio_duration = 0
        with Progress(
            SpinnerColumn(),
            TextColumn("[bold blue]{task.description}"),
            console=console
        ) as progress:
            analysis_task = progress.add_task("Analyzing audio file...", total=None)
            try:
                import av
                with av.open(file_path) as container:
                    # Get duration in seconds
                    if container.duration is not None:
                        audio_duration = container.duration / 1000000  # microseconds to seconds
                        console.print(f"Audio duration: [cyan]{format_timestamp(audio_duration)}[/cyan] seconds")
                        progress.update(analysis_task, completed=True)
            except Exception as e:
                console.print(f"[yellow]Could not determine audio duration: {e}[/yellow]")
        
        # Detect device (CPU or GPU)
        device, compute_type, gpu_name = detect_device()
        
        # Load the model with progress
        model_size = "large-v3"
        console.print(f"Loading Whisper model: [bold]{model_size}[/bold]")
        
        if device == "cuda" and gpu_name:
            console.print(f"Using device: [bold green]GPU ({gpu_name})[/bold green], compute_type: [bold cyan]{compute_type}[/bold cyan]")
        else:
            console.print(f"Using device: [bold yellow]CPU[/bold yellow], compute_type: [bold cyan]{compute_type}[/bold cyan]")
        
        with Progress(
            SpinnerColumn(),
            TextColumn("[bold blue]{task.description}"),
            BarColumn(),
            TextColumn("[bold cyan]{task.percentage:>3.0f}%"),
            console=console
        ) as progress:
            load_task = progress.add_task("Loading model...", total=100)
            model = WhisperModel(model_size, device=device, compute_type=compute_type, download_root="./models")
            progress.update(load_task, completed=100)
        
        # Setup progress display for transcription
        console.print("\n[bold green]Starting transcription...[/bold green]")
        
        # Create table for displaying transcribed segments in real time
        table = Table(title="Transcription Progress", expand=True, box=box.ROUNDED)
        table.add_column("Segment")
        table.add_column("Time", style="yellow")
        table.add_column("Text", style="white")
        
        # Progress bar for overall transcription
        progress = Progress(
            SpinnerColumn(),
            TextColumn("[bold blue]Transcribing..."),
            BarColumn(),
            TextColumn("[cyan]{task.percentage:>3.0f}%"),
            TimeElapsedColumn(),
            TimeRemainingColumn(),
        )
        
        # Add main progress task
        transcribe_task = progress.add_task("Transcription", total=100)
        
        # Combine table and progress bar
        transcription_display = Table.grid()
        transcription_display.add_row(table)
        transcription_display.add_row(progress)
        
        segments_list = []
        segment_idx = 0
        
        # Run the transcription with live updating display
        with Live(transcription_display, console=console, refresh_per_second=10) as live:
            # Run transcription
            segments, info = model.transcribe(
                file_path,
                beam_size=5,
                vad_filter=True,
                word_timestamps=True,
                language="en",  # Specify language to avoid language detection phase
            )
            
            # Process segments as they become available
            for segment in segments:
                segments_list.append(segment)
                
                # Update progress bar based on timestamp
                if audio_duration > 0:
                    current_progress = min(int((segment.end / audio_duration) * 100), 99)
                    progress.update(transcribe_task, completed=current_progress)
                
                # Add segment to table
                timestamp = f"[{format_timestamp(segment.start)} → {format_timestamp(segment.end)}]"
                table.add_row(
                    f"[cyan]#{segment_idx+1}[/cyan]", 
                    timestamp, 
                    segment.text
                )
                
                # Update the live display
                live.update(transcription_display)
                segment_idx += 1
            
            # Finish progress
            progress.update(transcribe_task, completed=100)
        
        # Build full transcript
        raw_transcript = " ".join([segment.text for segment in segments_list])
        
        # Convert segments to dictionary format
        segments_dict = []
        for segment in segments_list:
            segments_dict.append({
                "start": segment.start,
                "end": segment.end,
                "text": segment.text,
                "words": [{"word": word.word, "start": word.start, "end": word.end, "probability": word.probability} 
                         for word in (segment.words or [])]
            })
        
        # Record transcription time before LLM enhancement so the two phases can be reported separately
        transcription_time = time.time() - start_time

        # Enhance the transcript with LLM
        console.print("\n[bold green]Raw transcription complete. Now enhancing the transcript...[/bold green]")
        enhanced_transcript = await enhance_transcript_with_llm(raw_transcript, console)
        
        # Calculate total processing time (transcription + enhancement)
        processing_time = time.time() - start_time
        
        # Create the result dictionary
        result = {
            "success": True,
            "raw_transcript": raw_transcript,
            "enhanced_transcript": enhanced_transcript,
            "segments": segments_dict,
            "metadata": {
                "language": info.language,
                "language_probability": info.language_probability,
                "model": model_size,
                "duration": audio_duration,
                "device": device
            },
            "processing_time": {
                "total": processing_time,
                "transcription": processing_time
            }
        }
        
        # Save the transcripts
        markdown_path, txt_path = save_markdown_transcript(result, file_path)
        console.print(f"\n[bold green]Saved enhanced transcript to:[/bold green] [cyan]{markdown_path}[/cyan]")
        console.print(f"[bold green]Saved raw transcript to:[/bold green] [cyan]{txt_path}[/cyan]")
        
        return result
                
    except Exception as e:
        import traceback
        logger.error(f"Transcription error: {e}")
        logger.error(traceback.format_exc())
        return {"success": False, "error": f"Transcription error: {e}"}


async def main():
    """Runs the audio transcription demonstrations."""

    logger.info("Starting Audio Transcription Demo", emoji_key="audio")

    console = Console()
    console.print(Rule("[bold green]Audio Transcription Demo (faster-whisper)[/bold green]"))

    # --- Find Audio Files ---
    audio_files = find_audio_files(DATA_DIR)
    if not audio_files:
        console.print(f"[bold red]Error:[/bold red] No audio files found in {DATA_DIR}. Please place audio files (e.g., .mp3, .wav) there.")
        return

    console.print(f"Found {len(audio_files)} audio file(s) in {DATA_DIR}:")
    for f in audio_files:
        console.print(f"- [cyan]{f.name}[/cyan]")
    console.print()

    # --- Process Each File ---
    for file_path in audio_files:
        try:
            console.print(Panel(
                f"Processing file: [cyan]{escape(str(file_path))}[/cyan]",
                title="Audio Transcription",
                border_style="blue"
            ))

            # Call our faster-whisper transcription function
            result = await transcribe_with_faster_whisper(str(file_path), console)
            
            if result.get("success", False):
                console.print(f"[green]Transcription successful for {escape(str(file_path))}.[/green]")
                
                # Show comparison of raw vs enhanced transcript
                if "raw_transcript" in result and "enhanced_transcript" in result:
                    comparison = Table(title="Transcript Comparison", expand=True, box=box.ROUNDED)
                    comparison.add_column("Raw Transcript", style="yellow")
                    comparison.add_column("Enhanced Transcript", style="green")
                    
                    # Limit to a preview of the first part
                    raw_preview = result["raw_transcript"][:500] + ("..." if len(result["raw_transcript"]) > 500 else "")
                    enhanced_preview = result["enhanced_transcript"][:500] + ("..." if len(result["enhanced_transcript"]) > 500 else "")
                    
                    comparison.add_row(raw_preview, enhanced_preview)
                    console.print(comparison)
                    
                    # Display metadata if available
                    if "metadata" in result and result["metadata"]:
                        console.print("[bold]Metadata:[/bold]")
                        for key, value in result["metadata"].items():
                            console.print(f"  - [cyan]{key}[/cyan]: {value}")
                    
                    # Display processing time
                    if "processing_time" in result:
                        console.print("[bold]Processing Times:[/bold]")
                        for key, value in result["processing_time"].items():
                            if isinstance(value, (int, float)):
                                console.print(f"  - [cyan]{key}[/cyan]: {value:.2f}s")
                            else:
                                console.print(f"  - [cyan]{key}[/cyan]: {value}")
                else:
                    console.print("[yellow]Warning:[/yellow] No transcript was returned.")
            else:
                console.print(f"[bold red]Transcription failed:[/bold red] {escape(result.get('error', 'Unknown error'))}")
            
            console.print() # Add a blank line between files
                
        except Exception as outer_e:
            import traceback
            console.print(f"[bold red]Unexpected error processing file {escape(str(file_path))}:[/bold red] {escape(str(outer_e))}")
            console.print("[bold red]Traceback:[/bold red]")
            console.print(escape(traceback.format_exc()))
            continue # Move to the next file

    logger.info("Audio Transcription Demo Finished", emoji_key="audio")

if __name__ == "__main__":
    # Basic error handling for the async execution itself
    try:
        asyncio.run(main())
    except Exception as e:
        print(f"An error occurred running the demo: {e}")
        import traceback
        traceback.print_exc() 
```
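
A minimal sketch of the faster-whisper call pattern the demo above builds on, without the Rich progress UI or the LLM enhancement step; the model size, device, and audio path are illustrative assumptions:

```python
# Minimal faster-whisper sketch (assumes the `faster-whisper` package is installed
# and that "audio.mp3" exists; model size and device are illustrative choices).
from faster_whisper import WhisperModel

model = WhisperModel("large-v3", device="cpu", compute_type="int8")

# transcribe() returns a lazy generator of segments plus an info object
segments, info = model.transcribe(
    "audio.mp3",
    beam_size=5,
    vad_filter=True,
    word_timestamps=True,
)

print(f"Detected language: {info.language} (p={info.language_probability:.2f})")
for segment in segments:
    print(f"[{segment.start:.2f} -> {segment.end:.2f}] {segment.text}")
```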

--------------------------------------------------------------------------------
/ultimate_mcp_server/utils/logging/panels.py:
--------------------------------------------------------------------------------

```python
"""
Panel definitions for Ultimate MCP Server logging system.

This module provides specialized panels for different types of output like
headers, results, errors, warnings, etc.
"""

from typing import Any, Dict, List, Optional, Union

from rich.box import SIMPLE
from rich.columns import Columns
from rich.console import ConsoleRenderable
from rich.panel import Panel
from rich.syntax import Syntax
from rich.table import Table
from rich.text import Text

from ultimate_mcp_server.utils.logging.console import console
from ultimate_mcp_server.utils.logging.emojis import ERROR, INFO, SUCCESS, WARNING


class HeaderPanel:
    """Panel for section headers."""
    
    def __init__(
        self,
        title: str,
        subtitle: Optional[str] = None,
        component: Optional[str] = None,
        style: str = "bright_blue",
    ):
        """Initialize a header panel.
        
        Args:
            title: Panel title
            subtitle: Optional subtitle
            component: Optional component name
            style: Panel style
        """
        self.title = title
        self.subtitle = subtitle
        self.component = component
        self.style = style
    
    def __rich__(self) -> ConsoleRenderable:
        """Render the panel."""
        # Create the title text
        title_text = Text()
        title_text.append("- ", style="bright_black")
        title_text.append(self.title, style="bold")
        title_text.append(" -", style="bright_black")
        
        # Create the content
        content = Text()
        
        if self.component:
            content.append(f"[{self.component}] ", style="component")
            
        if self.subtitle:
            content.append(self.subtitle)
            
        return Panel(
            content,
            title=title_text,
            title_align="center",
            border_style=self.style,
            expand=True,
            padding=(1, 2),
        )

class ResultPanel:
    """Panel for displaying operation results."""
    
    def __init__(
        self,
        title: str,
        results: Union[List[Dict[str, Any]], Dict[str, Any]],
        status: str = "success",
        component: Optional[str] = None,
        show_count: bool = True,
        compact: bool = False,
    ):
        """Initialize a result panel.
        
        Args:
            title: Panel title
            results: Results to display (list of dicts or single dict)
            status: Result status (success, warning, error)
            component: Optional component name
            show_count: Whether to show result count in title
            compact: Whether to use a compact display style
        """
        self.title = title
        self.results = results if isinstance(results, list) else [results]
        self.status = status.lower()
        self.component = component
        self.show_count = show_count
        self.compact = compact
    
    def __rich__(self) -> ConsoleRenderable:
        """Render the panel."""
        # Determine style and emoji based on status
        if self.status == "success":
            style = "result.success"
            emoji = SUCCESS
        elif self.status == "warning":
            style = "result.warning"
            emoji = WARNING
        elif self.status == "error":
            style = "result.error"
            emoji = ERROR
        else:
            style = "result.info"
            emoji = INFO
            
        # Create title
        title_text = Text()
        title_text.append(f"{emoji} ", style=style)
        title_text.append(self.title, style=f"bold {style}")
        
        if self.show_count and len(self.results) > 0:
            title_text.append(f" ({len(self.results)} items)", style="bright_black")
            
        # Create content
        if self.compact:
            # Compact mode - just show key/value list
            rows = []
            for item in self.results:
                for k, v in item.items():
                    rows.append({
                        "key": k,
                        "value": self._format_value(v),
                    })
            
            table = Table(box=None, expand=True, show_header=False)
            table.add_column("Key", style="data.key")
            table.add_column("Value", style="", overflow="fold")
            
            for row in rows:
                table.add_row(row["key"], row["value"])
                
            content = table
        else:
            # Full mode - create a table per result item
            tables = []
            
            for i, item in enumerate(self.results):
                if not item:  # Skip empty items
                    continue
                    
                table = Table(
                    box=SIMPLE,
                    title=f"Item {i+1}" if len(self.results) > 1 else None,
                    title_style="bright_black",
                    expand=True,
                    show_header=False,
                )
                table.add_column("Key", style="data.key")
                table.add_column("Value", style="", overflow="fold")
                
                for k, v in item.items():
                    table.add_row(k, self._format_value(v))
                    
                tables.append(table)
                
            content = Columns(tables) if len(tables) > 1 else tables[0] if tables else Text("No results")
            
        # Return the panel
        return Panel(
            content,
            title=title_text,
            border_style=style,
            expand=True,
            padding=(1, 1),
        )
        
    def _format_value(self, value: Any) -> str:
        """Format a value for display.
        
        Args:
            value: Value to format
            
        Returns:
            Formatted string
        """
        if value is None:
            return "[dim]None[/dim]"
        elif isinstance(value, bool):
            return str(value)
        elif isinstance(value, (int, float)):
            return str(value)
        elif isinstance(value, list):
            return ", ".join(self._format_value(v) for v in value[:5]) + \
                   (f" ... (+{len(value) - 5} more)" if len(value) > 5 else "")
        elif isinstance(value, dict):
            if len(value) == 0:
                return "{}"
            else:
                return "{...}"  # Just indicate there's content
        else:
            return str(value)

class InfoPanel:
    """Panel for displaying information."""
    
    def __init__(
        self,
        title: str,
        content: Union[str, List[str], Dict[str, Any]],
        icon: Optional[str] = None,
        style: str = "info",
    ):
        """Initialize an information panel.
        
        Args:
            title: Panel title
            content: Content to display (string, list, or dict)
            icon: Emoji or icon character
            style: Style name to apply (from theme)
        """
        self.title = title
        self.content = content
        self.icon = icon or INFO
        self.style = style
    
    def __rich__(self) -> ConsoleRenderable:
        """Render the panel."""
        # Create title
        title_text = Text()
        title_text.append(f"{self.icon} ", style=self.style)
        title_text.append(self.title, style=f"bold {self.style}")
        
        # Format content based on type
        if isinstance(self.content, str):
            content = Text(self.content)
        elif isinstance(self.content, list):
            content = Text()
            for i, item in enumerate(self.content):
                if i > 0:
                    content.append("\n")
                content.append(f"• {item}")
        elif isinstance(self.content, dict):
            # Create a table for dict content
            table = Table(box=None, expand=True, show_header=False)
            table.add_column("Key", style="data.key")
            table.add_column("Value", style="", overflow="fold")
            
            for k, v in self.content.items():
                table.add_row(k, str(v))
                
            content = table
        else:
            content = Text(str(self.content))
            
        # Return the panel
        return Panel(
            content,
            title=title_text,
            border_style=self.style,
            expand=True,
            padding=(1, 2),
        )

class WarningPanel:
    """Panel for displaying warnings."""
    
    def __init__(
        self,
        title: Optional[str] = None,
        message: str = "",
        details: Optional[List[str]] = None,
    ):
        """Initialize a warning panel.
        
        Args:
            title: Optional panel title
            message: Main warning message
            details: Optional list of detail points
        """
        self.title = title or "Warning"
        self.message = message
        self.details = details or []
    
    def __rich__(self) -> ConsoleRenderable:
        """Render the panel."""
        # Create title
        title_text = Text()
        title_text.append(f"{WARNING} ", style="warning")
        title_text.append(self.title, style="bold warning")
        
        # Create content
        content = Text()
        
        # Add message
        if self.message:
            content.append(self.message)
            
        # Add details if any
        if self.details and len(self.details) > 0:
            if self.message:
                content.append("\n\n")
                
            content.append("Details:", style="bold")
            content.append("\n")
            
            for i, detail in enumerate(self.details):
                if i > 0:
                    content.append("\n")
                content.append(f"• {detail}")
                
        # Return the panel
        return Panel(
            content,
            title=title_text,
            border_style="warning",
            expand=True,
            padding=(1, 2),
        )

class ErrorPanel:
    """Panel for displaying errors."""
    
    def __init__(
        self,
        title: Optional[str] = None,
        message: str = "",
        details: Optional[str] = None,
        resolution_steps: Optional[List[str]] = None,
        error_code: Optional[str] = None,
    ):
        """Initialize an error panel.
        
        Args:
            title: Optional panel title
            message: Main error message
            details: Optional error details
            resolution_steps: Optional list of steps to resolve the error
            error_code: Optional error code for reference
        """
        self.title = title or "Error"
        self.message = message
        self.details = details
        self.resolution_steps = resolution_steps or []
        self.error_code = error_code
    
    def __rich__(self) -> ConsoleRenderable:
        """Render the panel."""
        # Create title
        title_text = Text()
        title_text.append(f"{ERROR} ", style="error")
        title_text.append(self.title, style="bold error")
        
        if self.error_code:
            title_text.append(f" [{self.error_code}]", style="bright_black")
            
        # Create content
        content = Text()
        
        # Add message
        if self.message:
            content.append(self.message, style="bold")
            
        # Add details if any
        if self.details:
            if self.message:
                content.append("\n\n")
                
            content.append(self.details)
            
        # Add resolution steps if any
        if self.resolution_steps and len(self.resolution_steps) > 0:
            if self.message or self.details:
                content.append("\n\n")
                
            content.append("Resolution steps:", style="bold")
            content.append("\n")
            
            for i, step in enumerate(self.resolution_steps):
                if i > 0:
                    content.append("\n")
                content.append(f"{i+1}. {step}")
                
        # Return the panel
        return Panel(
            content,
            title=title_text,
            border_style="error",
            expand=True,
            padding=(1, 2),
        )

class ToolOutputPanel:
    """Panel for displaying tool command output."""
    
    def __init__(
        self,
        tool: str,
        command: str,
        output: str,
        status: str = "success",
        duration: Optional[float] = None,
    ):
        """Initialize a tool output panel.
        
        Args:
            tool: Tool name (ripgrep, awk, jq, etc.)
            command: Command that was executed
            output: Command output text
            status: Execution status (success, error)
            duration: Optional execution duration in seconds
        """
        self.tool = tool
        self.command = command
        self.output = output
        self.status = status.lower()
        self.duration = duration
    
    def __rich__(self) -> ConsoleRenderable:
        """Render the panel."""
        # Determine style and emoji based on status
        if self.status == "success":
            style = "tool.success"
            emoji = SUCCESS
        else:
            style = "tool.error"
            emoji = ERROR
            
        # Create title
        title_text = Text()
        title_text.append(f"{emoji} ", style=style)
        title_text.append(f"{self.tool}", style=f"bold {style}")
        
        if self.duration is not None:
            title_text.append(f" ({self.duration:.2f}s)", style="tool.duration")
            
        # Create content
        content = Columns(
            [
                Panel(
                    Text(self.command, style="tool.command"),
                    # rich.Panel has no title_style parameter; style the title text directly
                    title=Text("Command", style="bright_black"),
                    border_style="tool.command",
                    padding=(1, 1),
                ),
                Panel(
                    Text(self.output, style="tool.output"),
                    title=Text("Output", style="bright_black"),
                    border_style="bright_black",
                    padding=(1, 1),
                ),
            ],
            expand=True,
            padding=(0, 1),
        )
            
        # Return the panel
        return Panel(
            content,
            title=title_text,
            border_style=style,
            expand=True,
            padding=(1, 1),
        )

class CodePanel:
    """Panel for displaying code with syntax highlighting."""
    
    def __init__(
        self,
        code: str,
        language: str = "python",
        title: Optional[str] = None,
        line_numbers: bool = True,
        highlight_lines: Optional[List[int]] = None,
    ):
        """Initialize a code panel.
        
        Args:
            code: The code to display
            language: Programming language for syntax highlighting
            title: Optional panel title
            line_numbers: Whether to show line numbers
            highlight_lines: List of line numbers to highlight
        """
        self.code = code
        self.language = language
        self.title = title
        self.line_numbers = line_numbers
        self.highlight_lines = highlight_lines
    
    def __rich__(self) -> ConsoleRenderable:
        """Render the panel."""
        # Create syntax highlighting component
        syntax = Syntax(
            self.code,
            self.language,
            theme="monokai",
            line_numbers=self.line_numbers,
            highlight_lines=self.highlight_lines,
        )
        
        # Create title
        if self.title:
            title_text = Text(self.title)
        else:
            title_text = Text()
            title_text.append(self.language.capitalize(), style="bright_blue bold")
            title_text.append(" Code", style="bright_black")
            
        # Return the panel
        return Panel(
            syntax,
            title=title_text,
            border_style="bright_blue",
            expand=True,
            padding=(0, 0),
        )

# Helper functions for creating panels

def display_header(
    title: str,
    subtitle: Optional[str] = None,
    component: Optional[str] = None,
) -> None:
    """Display a section header.
    
    Args:
        title: Section title
        subtitle: Optional subtitle
        component: Optional component name
    """
    panel = HeaderPanel(title, subtitle, component)
    console.print(panel)

def display_results(
    title: str,
    results: Union[List[Dict[str, Any]], Dict[str, Any]],
    status: str = "success",
    component: Optional[str] = None,
    show_count: bool = True,
    compact: bool = False,
) -> None:
    """Display operation results.
    
    Args:
        title: Results title
        results: Results to display (list of dicts or single dict)
        status: Result status (success, warning, error)
        component: Optional component name
        show_count: Whether to show result count in title
        compact: Whether to use a compact display style
    """
    panel = ResultPanel(title, results, status, component, show_count, compact)
    console.print(panel)

def display_info(
    title: str,
    content: Union[str, List[str], Dict[str, Any]],
    icon: Optional[str] = None,
    style: str = "info",
) -> None:
    """Display an information panel.
    
    Args:
        title: Panel title
        content: Content to display (string, list, or dict)
        icon: Emoji or icon character
        style: Style name to apply (from theme)
    """
    panel = InfoPanel(title, content, icon, style)
    console.print(panel)

def display_warning(
    title: Optional[str] = None,
    message: str = "",
    details: Optional[List[str]] = None,
) -> None:
    """Display a warning panel.
    
    Args:
        title: Optional panel title
        message: Main warning message
        details: Optional list of detail points
    """
    panel = WarningPanel(title, message, details)
    console.print(panel)

def display_error(
    title: Optional[str] = None,
    message: str = "",
    details: Optional[str] = None,
    resolution_steps: Optional[List[str]] = None,
    error_code: Optional[str] = None,
) -> None:
    """Display an error panel.
    
    Args:
        title: Optional panel title
        message: Main error message
        details: Optional error details
        resolution_steps: Optional list of steps to resolve the error
        error_code: Optional error code for reference
    """
    panel = ErrorPanel(title, message, details, resolution_steps, error_code)
    console.print(panel)

def display_tool_output(
    tool: str,
    command: str,
    output: str,
    status: str = "success",
    duration: Optional[float] = None,
) -> None:
    """Display tool command output.
    
    Args:
        tool: Tool name (ripgrep, awk, jq, etc.)
        command: Command that was executed
        output: Command output text
        status: Execution status (success, error)
        duration: Optional execution duration in seconds
    """
    panel = ToolOutputPanel(tool, command, output, status, duration)
    console.print(panel)

def display_code(
    code: str,
    language: str = "python",
    title: Optional[str] = None,
    line_numbers: bool = True,
    highlight_lines: Optional[List[int]] = None,
) -> None:
    """Display code with syntax highlighting.
    
    Args:
        code: The code to display
        language: Programming language for syntax highlighting
        title: Optional panel title
        line_numbers: Whether to show line numbers
        highlight_lines: List of line numbers to highlight
    """
    panel = CodePanel(code, language, title, line_numbers, highlight_lines)
    console.print(panel) 
```
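
A hedged usage sketch of the helper functions defined above; the data values are made up, and it assumes `ultimate_mcp_server` is importable so the shared `console` from `utils/logging/console.py` is available:

```python
# Illustrative calls to the panel helpers; all values below are placeholders.
from ultimate_mcp_server.utils.logging.panels import (
    display_code,
    display_error,
    display_header,
    display_results,
    display_tool_output,
)

display_header("Vector Search Demo", subtitle="Semantic search over sample docs", component="demo")

display_results(
    "Top Matches",
    [{"doc": "article.txt", "score": 0.92}, {"doc": "research_paper.txt", "score": 0.87}],
    status="success",
    compact=True,
)

display_tool_output("ripgrep", "rg -n 'TODO' src/", "src/main.py:42:# TODO: refactor", duration=0.12)

display_error(
    message="Provider request failed",
    details="HTTP 429 from upstream API",
    resolution_steps=["Check rate limits", "Retry with exponential backoff"],
    error_code="E429",
)

display_code("print('hello world')", language="python", title="Snippet")
```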

--------------------------------------------------------------------------------
/ultimate_mcp_server/core/evaluation/evaluators.py:
--------------------------------------------------------------------------------

```python
# --- core/evaluation/evaluators.py ---
import re
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional

from ultimate_mcp_server.core.evaluation.base import (
    EvaluationScore,
    Evaluator,
    register_evaluator,
)
from ultimate_mcp_server.core.models.tournament import ModelResponseData
from ultimate_mcp_server.tools.completion import generate_completion

# --- Import the sandbox execution tool ---
from ultimate_mcp_server.tools.python_sandbox import (
    ProviderError,
    ToolError,
    ToolInputError,
    execute_python,
)
from ultimate_mcp_server.utils import get_logger

logger = get_logger("ultimate_mcp_server.evaluation.evaluators")


@register_evaluator
class LLMGraderEvaluator(Evaluator):
    evaluator_type = "llm_grader"

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.grader_model_id = config.get("model_id", "anthropic/claude-3-5-haiku-20241022")
        self.rubric = config.get(
            "rubric",
            "Score the response on a scale of 0-100 for quality, relevance, and clarity. Explain your reasoning.",
        )
        self.score_extraction_regex_str = config.get(
            "score_extraction_regex", r"Score:\s*(\d{1,3})"
        )
        try:
            self.score_extraction_regex = re.compile(self.score_extraction_regex_str)
        except re.error as e:
            logger.error(
                f"Invalid regex for score_extraction_regex in LLMGrader: {self.score_extraction_regex_str}. Error: {e}"
            )
            self.score_extraction_regex = re.compile(r"Score:\s*(\d{1,3})")

    async def score(
        self,
        response_data: ModelResponseData,
        original_prompt: str,
        tournament_type: Literal["code", "text"],
    ) -> EvaluationScore:
        content_to_grade = (
            response_data.extracted_code
            if tournament_type == "code" and response_data.extracted_code
            else response_data.response_text
        )

        if not content_to_grade:
            return EvaluationScore(score=0.0, details="No content to grade.")

        prompt = f"""Original Prompt:
{original_prompt}

Model Response to Evaluate:
---
{content_to_grade}
---

Rubric:
{self.rubric}

Please provide a score (0-100) and a brief justification. Format the score clearly, e.g., "Score: 90".
"""
        try:
            provider = self.grader_model_id.split("/")[0] if "/" in self.grader_model_id else None

            grader_response_dict = await generate_completion(
                prompt=prompt,
                model=self.grader_model_id,
                provider=provider,
                max_tokens=500,
                temperature=0.2,
            )

            if not grader_response_dict.get("success"):
                return EvaluationScore(
                    score=0.0, details=f"Grader LLM failed: {grader_response_dict.get('error')}"
                )

            grader_text = grader_response_dict.get("text", "")

            score_match = self.score_extraction_regex.search(grader_text)
            numerical_score = 0.0
            if score_match:
                try:
                    numerical_score = float(score_match.group(1))
                    if not (0 <= numerical_score <= 100):
                        numerical_score = max(0.0, min(100.0, numerical_score))
                except ValueError:
                    logger.warning(
                        f"LLMGrader: Could not parse score from '{score_match.group(1)}'"
                    )
                except IndexError:
                    logger.warning(
                        f"LLMGrader: Regex '{self.score_extraction_regex_str}' matched but had no capture group 1."
                    )
            else:
                logger.warning(
                    f"LLMGrader: Could not find score pattern in grader response: {grader_text[:200]}"
                )

            return EvaluationScore(
                score=numerical_score,
                details=grader_text,
                metrics={"grader_cost": grader_response_dict.get("cost", 0)},  # Use new var name
            )

        except Exception as e:
            logger.error(f"LLMGrader failed: {e}", exc_info=True)
            return EvaluationScore(score=0.0, details=f"Error during LLM grading: {str(e)}")


@register_evaluator
class UnitTestEvaluator(Evaluator):
    evaluator_type = "unit_test"

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        test_file_path_str = config.get("test_file_path")
        self.required_packages: List[str] = config.get("required_packages", [])  # For sandbox

        if not test_file_path_str:
            logger.warning(
                "UnitTestEvaluator: 'test_file_path' not provided in config. This evaluator may not function."
            )
            self.test_file_path = Path()
        else:
            self.test_file_path = Path(test_file_path_str)
        self.timeout_seconds = config.get("timeout_seconds", 30)  # Sandbox timeout is in ms

    async def score(
        self,
        response_data: ModelResponseData,
        original_prompt: str,  # Unused but part of interface
        tournament_type: Literal["code", "text"],
    ) -> EvaluationScore:
        if tournament_type != "code" or not response_data.extracted_code:
            return EvaluationScore(
                score=0.0,
                details="Unit test evaluator only applicable to code tournaments with extracted code.",
            )

        if (
            not self.test_file_path
            or not self.test_file_path.exists()
            or not self.test_file_path.is_file()
        ):
            details = f"Test file not found, not configured, or not a file: {self.test_file_path}"
            if not self.test_file_path.name:
                details = "Test file path not configured for UnitTestEvaluator."
            logger.warning(f"UnitTestEvaluator: {details}")
            return EvaluationScore(score=0.0, details=details)

        try:
            # Read the user's test code from the host filesystem
            user_test_code = self.test_file_path.read_text(encoding="utf-8")
        except Exception as e:
            logger.error(f"UnitTestEvaluator: Failed to read test file {self.test_file_path}: {e}")
            return EvaluationScore(score=0.0, details=f"Failed to read test file: {e}")

        # Combine the generated code and the user's test code into a single script that
        # runs inside the sandbox. The generated code is defined first, then the test
        # code, so the tests can reference the generated functions/classes from the
        # shared global scope. This is the simplest arrangement under Pyodide, which has
        # no convenient filesystem for a solution.py / test_solution.py module split; a
        # more robust design would write the two files separately and import one from
        # the other.
        #
        # The resulting `unittest_runner_script` defines both pieces, runs unittest, and
        # prints a small parsable results block to stdout for the parser below.

        generated_code_to_run = response_data.extracted_code

        unittest_runner_script = f"""
# --- Generated Code from Model ---
{generated_code_to_run}
# --- End of Generated Code ---

# --- User's Test Code ---
{user_test_code}
# --- End of User's Test Code ---

# --- Unittest Execution ---
import unittest
import sys
import io # To capture unittest output

# Capture unittest's output to a string buffer instead of stderr
# This makes parsing easier and cleaner from the sandbox output.
suite = unittest.defaultTestLoader.loadTestsFromModule(sys.modules[__name__])
output_buffer = io.StringIO()
runner = unittest.TextTestRunner(stream=output_buffer, verbosity=2)
result = runner.run(suite)

# Print results in a parsable format to STDOUT
# The python_sandbox tool will capture this stdout.
print("UNIT_TEST_RESULTS_START") # Delimiter for easier parsing
print(f"TestsRun:{{result.testsRun}}")
print(f"Failures:{{len(result.failures)}}")
print(f"Errors:{{len(result.errors)}}")
pass_rate = (result.testsRun - len(result.failures) - len(result.errors)) / result.testsRun if result.testsRun > 0 else 0.0
print(f"PassRate:{{pass_rate:.4f}}")
print("UNIT_TEST_RESULTS_END")

# Also print the full unittest output (which was captured in output_buffer)
# This can go to stdout as well, or we can separate it.
print("\\n--- Unittest Full Output ---")
print(output_buffer.getvalue())
"""
        details_output = "Unit test execution details via Pyodide Sandbox:\n"
        pass_rate = 0.0
        tests_run = 0
        failures = 0
        errors = 0
        sandbox_stdout = ""
        sandbox_stderr = ""

        try:
            sandbox_result = await execute_python(
                code=unittest_runner_script,
                packages=self.required_packages,  # Pass packages needed by generated code or tests
                # wheels=... # If wheels are needed
                allow_network=False,  # Usually False for unit tests unless they test network code
                allow_fs=False,  # Usually False unless tests interact with mcpfs
                timeout_ms=self.timeout_seconds * 1000,
            )

            if sandbox_result.get("success"):
                sandbox_stdout = sandbox_result.get("stdout", "")
                sandbox_stderr = sandbox_result.get("stderr", "")  # Unittest output now in stdout
                details_output += f"Sandbox STDOUT:\n{sandbox_stdout}\n"
                if sandbox_stderr:  # Still log stderr if sandbox itself had issues
                    details_output += f"Sandbox STDERR:\n{sandbox_stderr}\n"

                # Parse metrics from sandbox_stdout
                # Use re.search with MULTILINE if parsing from a larger block
                run_match = re.search(r"TestsRun:(\d+)", sandbox_stdout)
                fail_match = re.search(r"Failures:(\d+)", sandbox_stdout)
                err_match = re.search(r"Errors:(\d+)", sandbox_stdout)
                rate_match = re.search(r"PassRate:([0-9.]+)", sandbox_stdout)

                if run_match:
                    tests_run = int(run_match.group(1))
                if fail_match:
                    failures = int(fail_match.group(1))
                if err_match:
                    errors = int(err_match.group(1))
                if rate_match:
                    pass_rate = float(rate_match.group(1))
                else:
                    logger.warning(
                        f"UnitTestEvaluator: Could not parse PassRate from sandbox stdout. Output: {sandbox_stdout[:500]}"
                    )
                    details_output += "Warning: Could not parse PassRate from output.\n"
            else:  # Sandbox execution itself failed
                error_msg = sandbox_result.get("error_message", "Sandbox execution failed")
                error_details = sandbox_result.get("error_details", {})
                details_output += (
                    f"Sandbox Execution Failed: {error_msg}\nDetails: {error_details}\n"
                )
                logger.error(
                    f"UnitTestEvaluator: Sandbox execution failed: {error_msg} - {error_details}"
                )
                pass_rate = 0.0

        except (
            ProviderError,
            ToolError,
            ToolInputError,
        ) as e:  # Catch errors from execute_python tool
            logger.error(f"UnitTestEvaluator: Error calling python_sandbox: {e}", exc_info=True)
            details_output += f"Error calling python_sandbox: {str(e)}\n"
            pass_rate = 0.0
        except Exception as e:  # Catch any other unexpected errors
            logger.error(f"UnitTestEvaluator: Unexpected error: {e}", exc_info=True)
            details_output += f"Unexpected error during unit test evaluation: {str(e)}\n"
            pass_rate = 0.0

        return EvaluationScore(
            score=pass_rate * 100,  # Score 0-100
            details=details_output,
            metrics={
                "tests_run": tests_run,
                "failures": failures,
                "errors": errors,
                "pass_rate": pass_rate,
                "sandbox_stdout_len": len(sandbox_stdout),
                "sandbox_stderr_len": len(sandbox_stderr),
            },
        )


@register_evaluator
class RegexMatchEvaluator(Evaluator):
    evaluator_type = "regex_match"

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.patterns_str: List[str] = config.get("patterns", [])
        if not self.patterns_str or not isinstance(self.patterns_str, list):
            logger.error("RegexMatchEvaluator: 'patterns' (list of strings) is required in config.")
            self.patterns_str = []

        self.target_field: Literal["response_text", "extracted_code"] = config.get(
            "target_field", "response_text"
        )
        self.match_mode: Literal["all_must_match", "any_can_match", "proportion_matched"] = (
            config.get("match_mode", "all_must_match")
        )

        flag_options_str: Optional[List[str]] = config.get("regex_flag_options")
        self.regex_flags: int = 0
        if flag_options_str:
            for flag_name in flag_options_str:
                if hasattr(re, flag_name.upper()):
                    self.regex_flags |= getattr(re, flag_name.upper())
                else:
                    logger.warning(
                        f"RegexMatchEvaluator: Unknown regex flag '{flag_name}' specified."
                    )

        self.compiled_patterns: List[re.Pattern] = []
        for i, p_str in enumerate(
            self.patterns_str
        ):  # Use enumerate to get index for original string
            try:
                self.compiled_patterns.append(re.compile(p_str, self.regex_flags))
            except re.error as e:
                logger.error(
                    f"RegexMatchEvaluator: Invalid regex pattern '{p_str}' (index {i}): {e}. Skipping this pattern."
                )
                # Add a placeholder or skip to keep lengths consistent if needed,
                # or ensure patterns_str is filtered alongside compiled_patterns.
                # For simplicity now, compiled_patterns might be shorter if errors occur.

    async def score(
        self,
        response_data: ModelResponseData,
        original_prompt: str,
        tournament_type: Literal["code", "text"],
    ) -> EvaluationScore:
        # Iterate using original patterns_str for error reporting if compiled_patterns is shorter
        num_configured_patterns = len(self.patterns_str)

        if not self.compiled_patterns and self.patterns_str:  # Some patterns were invalid
            return EvaluationScore(
                score=0.0,
                details="No valid regex patterns could be compiled from configuration.",
                metrics={
                    "patterns_configured": num_configured_patterns,
                    "patterns_compiled": 0,
                    "patterns_matched": 0,
                },
            )
        if not self.compiled_patterns and not self.patterns_str:  # No patterns provided at all
            return EvaluationScore(
                score=0.0,
                details="No regex patterns configured for matching.",
                metrics={"patterns_configured": 0, "patterns_compiled": 0, "patterns_matched": 0},
            )

        content_to_check: Optional[str] = None
        if self.target_field == "extracted_code":
            content_to_check = response_data.extracted_code
        elif self.target_field == "response_text":
            content_to_check = response_data.response_text
        else:
            return EvaluationScore(
                score=0.0,
                details=f"Invalid target_field '{self.target_field}'.",
                metrics={"patterns_compiled": len(self.compiled_patterns), "patterns_matched": 0},
            )

        if content_to_check is None:
            return EvaluationScore(
                score=0.0,
                details=f"Target content field '{self.target_field}' is empty or None.",
                metrics={"patterns_compiled": len(self.compiled_patterns), "patterns_matched": 0},
            )

        num_matched = 0
        all_patterns_details: List[str] = []

        # Corrected loop over successfully compiled patterns
        for pattern_obj in self.compiled_patterns:
            if pattern_obj.search(content_to_check):
                num_matched += 1
                all_patterns_details.append(f"Pattern '{pattern_obj.pattern}': MATCHED")
            else:
                all_patterns_details.append(f"Pattern '{pattern_obj.pattern}': NOT MATCHED")

        final_score = 0.0
        num_effective_patterns = len(self.compiled_patterns)  # Base score on only valid patterns

        if num_effective_patterns == 0 and num_configured_patterns > 0:  # All patterns were invalid
            details_str = f"Target field: '{self.target_field}'. Mode: '{self.match_mode}'.\nAll {num_configured_patterns} configured regex patterns were invalid and could not be compiled."
            return EvaluationScore(
                score=0.0,
                details=details_str,
                metrics={
                    "patterns_configured": num_configured_patterns,
                    "patterns_compiled": 0,
                    "patterns_matched": 0,
                },
            )
        elif num_effective_patterns == 0 and num_configured_patterns == 0:  # No patterns configured
            details_str = f"Target field: '{self.target_field}'. Mode: '{self.match_mode}'.\nNo regex patterns configured."
            return EvaluationScore(
                score=0.0,
                details=details_str,
                metrics={"patterns_configured": 0, "patterns_compiled": 0, "patterns_matched": 0},
            )

        if self.match_mode == "all_must_match":
            final_score = 100.0 if num_matched == num_effective_patterns else 0.0
        elif self.match_mode == "any_can_match":
            final_score = 100.0 if num_matched > 0 else 0.0
        elif self.match_mode == "proportion_matched":
            final_score = (num_matched / num_effective_patterns) * 100.0

        details_str = f"Target field: '{self.target_field}'. Mode: '{self.match_mode}'.\n"
        details_str += f"Matched {num_matched} out of {num_effective_patterns} validly compiled patterns (from {num_configured_patterns} configured).\n"
        details_str += "\n".join(all_patterns_details)

        return EvaluationScore(
            score=final_score,
            details=details_str,
            metrics={
                "patterns_configured": num_configured_patterns,
                "patterns_compiled": num_effective_patterns,
                "patterns_matched": num_matched,
                "match_proportion_compiled": (num_matched / num_effective_patterns)
                if num_effective_patterns
                else 0.0,
            },
        )

```
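
For illustration, example configuration dicts built only from the keys each evaluator's `__init__` reads above; the file path, package list, and patterns are placeholder values, and wiring these configs into a tournament spec happens outside this module:

```python
# Hypothetical evaluator configs (keys taken from the constructors above; values are placeholders).
llm_grader_config = {
    "model_id": "anthropic/claude-3-5-haiku-20241022",
    "rubric": "Score 0-100 for correctness and clarity. End with 'Score: NN'.",
    "score_extraction_regex": r"Score:\s*(\d{1,3})",
}

unit_test_config = {
    "test_file_path": "tests/tournament/test_solution.py",  # placeholder path on the host
    "required_packages": ["numpy"],                          # packages the sandbox should preload
    "timeout_seconds": 60,                                   # converted to ms for the sandbox call
}

regex_match_config = {
    "patterns": [r"def\s+solve\(", r"\breturn\b"],
    "target_field": "extracted_code",       # or "response_text"
    "match_mode": "proportion_matched",     # "all_must_match" | "any_can_match" | "proportion_matched"
    "regex_flag_options": ["IGNORECASE", "MULTILINE"],
}
```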

--------------------------------------------------------------------------------
/ultimate_mcp_server/clients/completion_client.py:
--------------------------------------------------------------------------------

```python
"""High-level client for LLM completion operations."""

import asyncio
import hashlib
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple

from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.providers.base import BaseProvider, get_provider
from ultimate_mcp_server.services.cache import get_cache_service
from ultimate_mcp_server.utils import get_logger

logger = get_logger("ultimate_mcp_server.clients.completion")

class CompletionClient:
    """
    High-level client for LLM text generation operations with advanced features.
    
    The CompletionClient provides a unified interface for interacting with various LLM providers
    (OpenAI, Anthropic, etc.) through a simple, consistent API. It abstracts away the complexity
    of provider-specific implementations, offering a range of features that enhance reliability
    and performance.
    
    Key features:
    - Multi-provider support with unified interface
    - Automatic fallback between providers
    - Result caching for improved performance and reduced costs
    - Streaming support for real-time text generation
    - Provider initialization and error handling
    - Comprehensive error handling and logging
    
    Architecture:
    The client follows a layered architecture pattern:
    1. High-level methods (generate_completion, generate_completion_stream) provide the main API
    2. Provider abstraction layer manages provider-specific implementation details
    3. Caching layer intercepts requests to reduce redundant API calls
    4. Error handling layer provides graceful fallbacks and informative errors
    
    Performance Considerations:
    - Caching is enabled by default and can significantly reduce API costs and latency
    - For time-sensitive or unique responses, caching can be disabled per request
    - Streaming mode reduces time-to-first-token but cannot leverage caching
    - Provider fallback adds resilience but may increase latency if primary providers fail
    
    This client is designed for MCP tools that require text generation using LLMs,
    making interactions more robust by handling common issues like rate limits,
    timeouts, and provider-specific errors.
    
    Example:
    ```python
    # Create client with default settings
    client = CompletionClient()
    
    # Generate text non-streaming with specific provider and model
    result = await client.generate_completion(
        prompt="Explain quantum computing",
        provider="anthropic",
        model="claude-3-5-haiku-20241022",
        temperature=0.5,
        max_tokens=1000
    )
    print(f"Generated by {result.model} in {result.processing_time:.2f}s")
    print(result.text)
    
    # Generate text with streaming for real-time output
    async for chunk, metadata in client.generate_completion_stream(
        prompt="Write a short story about robots",
        temperature=0.8
    ):
        print(chunk, end="")
        if metadata.get("done", False):
            print("\nGeneration complete!")
            
    # Use provider fallback for high availability
    try:
        result = await client.try_providers(
            prompt="Summarize this article",
            providers=["openai", "anthropic", "gemini"],
            models=["gpt-4", "claude-instant-1", "gemini-pro"],
            temperature=0.3
        )
    except Exception as e:
        print(f"All providers failed: {e}")
    ```
    """
    
    def __init__(self, default_provider: str = Provider.OPENAI.value, use_cache_by_default: bool = True):
        """Initialize the completion client.
        
        Args:
            default_provider: Default provider to use for completions
            use_cache_by_default: Whether to use cache by default
        """
        self.default_provider = default_provider
        self.cache_service = get_cache_service()
        self.use_cache_by_default = use_cache_by_default
        
    async def initialize_provider(self, provider_name: str, api_key: Optional[str] = None) -> BaseProvider:
        """
        Initialize and return a provider instance ready for LLM interactions.
        
        This method handles the creation and initialization of a specific LLM provider,
        ensuring it's properly configured and ready to generate completions. It abstracts
        the details of provider initialization, including async initialization methods
        that some providers might require.
        
        The method performs several steps:
        1. Retrieves the provider implementation based on the provider name
        2. Applies the API key if provided (otherwise uses environment configuration)
        3. Runs any provider-specific async initialization if required
        4. Returns the ready-to-use provider instance
        
        Provider initialization follows these architecture principles:
        - Late binding: Providers are initialized on-demand, not at client creation
        - Dependency injection: API keys can be injected at runtime rather than relying only on environment
        - Fail-fast: Validation occurs during initialization rather than at generation time
        - Extensibility: New providers can be added without changing client code
        
        Common provider names include:
        - "openai": OpenAI API (GPT models)
        - "anthropic": Anthropic API (Claude models)
        - "google": Google AI/Vertex API (Gemini models)
        - "mistral": Mistral AI API (Mistral, Mixtral models)
        - "ollama": Local Ollama server for various open-source models
        
        Error handling:
        - Invalid provider names are caught and reported immediately
        - Authentication issues (e.g., invalid API keys) are detected during initialization
        - Provider-specific initialization failures are propagated with detailed error messages
        
        Args:
            provider_name: Identifier for the desired provider (e.g., "openai", "anthropic")
            api_key: Optional API key to use instead of environment-configured keys
            
        Returns:
            A fully initialized BaseProvider instance ready to generate completions
            
        Raises:
            ValueError: If the provider name is invalid or not supported
            Exception: If initialization fails (e.g., invalid API key, network issues)
            
        Note:
            This method is typically called internally by other client methods,
            but can be used directly when you need a specific provider instance
            for specialized operations not covered by the main client methods.
            
        Example:
            ```python
            # Get a specific provider instance for custom operations
            openai_provider = await client.initialize_provider("openai")
            
            # Custom operation using provider-specific features
            response = await openai_provider.some_specialized_method(...)
            ```
        """
        try:
            provider = await get_provider(provider_name, api_key=api_key)
            # Ensure the provider is initialized (some might need async init)
            if hasattr(provider, 'initialize') and asyncio.iscoroutinefunction(provider.initialize):
                await provider.initialize()
            return provider
        except Exception as e:
            logger.error(f"Failed to initialize provider {provider_name}: {e}", emoji_key="error")
            raise
    
    async def generate_completion(
        self,
        prompt: str,
        provider: Optional[str] = None,
        model: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
        use_cache: bool = True,
        cache_ttl: int = 3600,
        **kwargs
    ):
        """
        Generate text completion from an LLM with optional caching.
        
        This method provides a unified interface for generating text completions from
        any supported LLM provider. It includes intelligent caching to avoid redundant
        API calls for identical inputs, reducing costs and latency.
        
        The caching system:
        - Creates a unique key based on the prompt, provider, model, and parameters
        - Checks for cached results before making API calls
        - Stores successful responses with a configurable TTL
        - Can be disabled per-request with the use_cache parameter
        
        Args:
            prompt: The text prompt to send to the LLM
            provider: The LLM provider to use (e.g., "openai", "anthropic", "google")
                      If None, uses the client's default_provider
            model: Specific model to use (e.g., "gpt-4", "claude-instant-1")
                   If None, uses the provider's default model
            temperature: Sampling temperature for controlling randomness (0.0-1.0)
                         Lower values are more deterministic, higher values more creative
            max_tokens: Maximum number of tokens to generate
                        If None, uses provider-specific defaults
            use_cache: Whether to use the caching system (default: True)
            cache_ttl: Time-to-live for cache entries in seconds (default: 1 hour)
            **kwargs: Additional provider-specific parameters
                      (e.g., top_p, frequency_penalty, presence_penalty)
            
        Returns:
            CompletionResult object with attributes:
            - text: The generated completion text
            - provider: The provider that generated the text
            - model: The model used
            - processing_time: Time taken to generate the completion (in seconds)
            - tokens: Token usage information (if available)
            - error: Error information (if an error occurred but was handled)
            
        Raises:
            ValueError: For invalid parameters
            Exception: For provider errors or other issues during generation
            
        Example:
            ```python
            result = await client.generate_completion(
                prompt="Write a poem about artificial intelligence",
                temperature=0.8,
                max_tokens=1000
            )
            print(f"Generated by {result.model} in {result.processing_time:.2f}s")
            print(result.text)
            ```
        """
        provider_name = provider or self.default_provider
        
        # Check cache if enabled
        if use_cache and self.cache_service.enabled:
            # Create a robust cache key
            provider_instance = await self.initialize_provider(provider_name)
            model_id = model or provider_instance.get_default_model()
            # Include relevant parameters in the cache key
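            # Note: Python's built-in hash() is salted per process (PYTHONHASHSEED), so this key
            # is only stable within a single run; a hashlib digest would be needed if cached
            # entries are meant to survive process restarts.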
            params_hash = hash((prompt, temperature, max_tokens, str(kwargs)))
            cache_key = f"completion:{provider_name}:{model_id}:{params_hash}"
            
            cached_result = await self.cache_service.get(cache_key)
            if cached_result is not None:
                logger.success("Cache hit! Using cached result", emoji_key="cache")
                # Set a nominal processing time for cached results
                cached_result.processing_time = 0.001
                return cached_result
                
        # Cache miss or cache disabled
        if use_cache and self.cache_service.enabled:
            logger.info("Cache miss. Generating new completion...", emoji_key="processing")
        else:
            logger.info("Generating completion...", emoji_key="processing")
            
        # Initialize provider and generate completion
        try:
            provider_instance = await self.initialize_provider(provider_name)
            model_id = model or provider_instance.get_default_model()
            
            result = await provider_instance.generate_completion(
                prompt=prompt,
                model=model_id,
                temperature=temperature,
                max_tokens=max_tokens,
                **kwargs
            )
            
            # Save to cache if enabled
            if use_cache and self.cache_service.enabled:
                await self.cache_service.set(
                    key=cache_key,
                    value=result,
                    ttl=cache_ttl
                )
                logger.info(f"Result saved to cache (key: ...{cache_key[-10:]})", emoji_key="cache")
                
            return result
            
        except Exception as e:
            logger.error(f"Error generating completion: {str(e)}", emoji_key="error")
            raise
    
    async def generate_completion_stream(
        self,
        prompt: str,
        provider: Optional[str] = None,
        model: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
        **kwargs
    ) -> AsyncGenerator[Tuple[str, Dict[str, Any]], None]:
        """
        Generate a streaming text completion with real-time chunks.
        
        This method provides a streaming interface to LLM text generation, where
        text is returned incrementally as it's generated, rather than waiting for
        the entire response. This enables real-time UI updates, faster apparent
        response times, and the ability to process partial responses.
        
        Unlike the non-streaming version, this method:
        - Does not support caching (each streaming response is unique)
        - Returns an async generator that yields content incrementally
        - Provides metadata with each chunk for tracking generation progress
        
        Args:
            prompt: The text prompt to send to the LLM
            provider: The LLM provider to use (e.g., "openai", "anthropic")
                      If None, uses the client's default_provider
            model: Specific model to use (e.g., "gpt-4", "claude-instant-1")
                   If None, uses the provider's default model
            temperature: Sampling temperature for controlling randomness (0.0-1.0)
                         Lower values are more deterministic, higher values more creative
            max_tokens: Maximum number of tokens to generate
                        If None, uses provider-specific defaults
            **kwargs: Additional provider-specific parameters
            
        Yields:
            Tuples of (chunk_text, metadata), where:
            - chunk_text: A string containing the next piece of generated text
            - metadata: A dictionary with information about the generation process:
                - done: Boolean indicating if this is the final chunk
                - chunk_index: Index of the current chunk (0-based)
                - token_count: Number of tokens in this chunk (if available)
                - total_tokens: Running total of tokens generated so far (if available)
        
        Raises:
            ValueError: For invalid parameters
            Exception: For provider errors or other issues during streaming
            
        Example:
            ```python
            # Display text as it's generated
            async for chunk, metadata in client.generate_completion_stream(
                prompt="Explain the theory of relativity",
                temperature=0.3
            ):
                print(chunk, end="")
                if metadata.get("done", False):
                    print("\nGeneration complete!")
            ```
            
        Note:
            Not all providers support streaming completions. Check the provider
            documentation for compatibility.
        """
        provider_name = provider or self.default_provider
        
        logger.info("Generating streaming completion...", emoji_key="processing")
        
        # Initialize provider and generate streaming completion
        try:
            provider_instance = await self.initialize_provider(provider_name)
            model_id = model or provider_instance.get_default_model()
            
            stream = provider_instance.generate_completion_stream(
                prompt=prompt,
                model=model_id,
                temperature=temperature,
                max_tokens=max_tokens,
                **kwargs
            )
            
            async for chunk, metadata in stream:
                yield chunk, metadata
                
        except Exception as e:
            logger.error(f"Error generating streaming completion: {str(e)}", emoji_key="error")
            raise
            
    async def try_providers(
        self,
        prompt: str,
        providers: List[str],
        models: Optional[List[str]] = None,
        **kwargs
    ):
        """
        Try multiple providers in sequence until one succeeds.
        
        This method implements an automatic fallback mechanism that attempts to generate
        a completion using a list of providers in order, continuing to the next provider
        if the current one fails. This provides resilience against provider downtime,
        rate limits, or other temporary failures.
        
        The method tries each provider exactly once in the order they're specified, with
        an optional corresponding model for each. This is useful for scenarios where you
        need high availability or want to implement prioritized provider selection.
        
        Args:
            prompt: The text prompt to send to the LLM
            providers: An ordered list of provider names to try (e.g., ["openai", "anthropic", "google"])
                       Providers are tried in the specified order until one succeeds
            models: Optional list of models to use with each provider
                    If provided, must be the same length as providers
                    If None, each provider's default model is used
            **kwargs: Additional parameters passed to generate_completion
                      Applies to all provider attempts
            
        Returns:
            CompletionResult from the first successful provider,
            with the same structure as generate_completion results
            
        Raises:
            ValueError: If no providers are specified or if models list length doesn't match providers
            Exception: If all specified providers fail, with details of the last error
            
        Example:
            ```python
            # Try OpenAI first, fall back to Anthropic, then Google
            result = await client.try_providers(
                prompt="Write a sonnet about programming",
                providers=["openai", "anthropic", "google"],
                models=["gpt-4", "claude-2", "gemini-pro"],
                temperature=0.7,
                max_tokens=800
            )
            print(f"Successfully used {result.provider} with model {result.model}")
            print(result.text)
            ```
            
        Note:
            Each provider attempt is logged, making it easy to track which providers
            succeeded or failed during the fallback sequence.
        """
        if not providers:
            raise ValueError("No providers specified")
            
        models = models or [None] * len(providers)
        if len(models) != len(providers):
            raise ValueError("If models are specified, there must be one for each provider")
            
        last_error = None
        
        for i, provider_name in enumerate(providers):
            try:
                logger.info(f"Trying provider: {provider_name}", emoji_key="provider")
                result = await self.generate_completion(
                    prompt=prompt,
                    provider=provider_name,
                    model=models[i],
                    **kwargs
                )
                return result
            except Exception as e:
                logger.warning(f"Provider {provider_name} failed: {str(e)}", emoji_key="warning")
                last_error = e
                
        # If we get here, all providers failed
        raise Exception(f"All providers failed. Last error: {str(last_error)}") 
```
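
A minimal usage sketch of the client methods above. It assumes `client` is an already-constructed instance of the gateway client class defined in this module (constructor arguments are omitted because they are not part of this excerpt) and that at least one provider API key is configured:

```python
import asyncio


async def demo(client):
    # Plain completion; identical calls are served from the cache while the TTL lasts.
    result = await client.generate_completion(
        prompt="Summarize the benefits of caching LLM responses.",
        temperature=0.3,
        max_tokens=200,
    )
    print(f"{result.model} ({result.processing_time:.2f}s): {result.text}")

    # Streaming: print chunks as they arrive.
    async for chunk, meta in client.generate_completion_stream(
        prompt="List three uses of async generators in Python.",
    ):
        print(chunk, end="", flush=True)
        if meta.get("done"):
            print()

    # Fallback: try providers in order until one succeeds.
    fallback = await client.try_providers(
        prompt="Write a haiku about retries.",
        providers=["openai", "anthropic"],
        temperature=0.7,
    )
    print(f"Answered by {fallback.provider}:{fallback.model}")

# asyncio.run(demo(client))  # run with a concrete client instance
```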

--------------------------------------------------------------------------------
/ultimate_mcp_server/utils/logging/progress.py:
--------------------------------------------------------------------------------

```python
"""
Progress tracking and visualization for Gateway.

This module provides enhanced progress tracking capabilities with Rich,
supporting nested tasks, task groups, and dynamic progress updates.
"""
import time
import uuid
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Any, Dict, Generator, Iterable, List, Optional, TypeVar

from rich.box import ROUNDED
from rich.console import Console, ConsoleRenderable, Group
from rich.live import Live
from rich.progress import (
    BarColumn,
    SpinnerColumn,
    TaskID,  # Import TaskID type hint
    TextColumn,
    TimeElapsedColumn,
    TimeRemainingColumn,
)
from rich.progress import Progress as RichProgress  # Renamed to avoid clash
from rich.table import Table

from .console import console as default_console  # Use the shared console instance

# Use relative imports

# TypeVar for generic progress tracking over iterables
T = TypeVar("T")

@dataclass
class TaskInfo:
    """Information about a single task being tracked."""
    description: str
    total: float
    completed: float = 0.0
    status: str = "running" # running, success, error, skipped
    start_time: float = field(default_factory=time.time)
    end_time: Optional[float] = None
    parent_id: Optional[str] = None
    rich_task_id: Optional[TaskID] = None # ID from Rich Progress
    meta: Dict[str, Any] = field(default_factory=dict)

    @property
    def elapsed(self) -> float:
        """Calculate elapsed time."""
        end = self.end_time or time.time()
        return end - self.start_time
        
    @property
    def is_complete(self) -> bool:
        """Check if the task is in a terminal state."""
        return self.status in ("success", "error", "skipped")

class GatewayProgress:
    """Manages multiple progress tasks with Rich integration and context.
    
    Allows for nested tasks and displays an overall summary.
    Uses a single Rich Progress instance managed internally.
    """
    
    def __init__(
        self,
        console: Optional[Console] = None,
        transient: bool = False, # If True, hide the progress display once it finishes
        auto_refresh: bool = True,
        expand: bool = True, # Expand progress bars to full width?
        show_summary: bool = True,
        summary_refresh_rate: float = 1.0 # How often to refresh summary
    ):
        """Initialize the progress manager.
        
        Args:
            console: Rich Console instance (defaults to shared console)
            transient: Hide progress bars upon completion
            auto_refresh: Automatically refresh the display
            expand: Expand bars to console width
            show_summary: Display the summary panel below progress bars
            summary_refresh_rate: Rate limit for summary updates (seconds)
        """
        self.console = console or default_console
        # Store display options so reset() can recreate the Rich Progress with identical settings
        self._transient = transient
        self._auto_refresh = auto_refresh
        self._expand = expand
        self._rich_progress = self._create_progress(transient, auto_refresh, expand)
        self._live: Optional[Live] = None
        self._tasks: Dict[str, TaskInfo] = {}
        self._task_stack: List[str] = [] # For context managers
        self.show_summary = show_summary
        self._summary_renderable = self._render_summary() # Initial summary
        self._last_summary_update = 0.0
        self.summary_refresh_rate = summary_refresh_rate

    def _create_progress(self, transient: bool, auto_refresh: bool, expand: bool) -> RichProgress:
        """Create the underlying Rich Progress instance."""
        return RichProgress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(bar_width=None if expand else 40),
            "[progress.percentage]{task.percentage:>3.1f}%",
            TimeElapsedColumn(),
            TimeRemainingColumn(),
            console=self.console,
            transient=transient,
            auto_refresh=auto_refresh,
            expand=expand,
            # disable=True # Useful for debugging
        )

    def _render_summary(self) -> Group:
        """Render the overall progress summary table."""
        if not self.show_summary or not self._tasks:
            return Group() # Empty group if no summary needed or no tasks yet
            
        completed_count = sum(1 for t in self._tasks.values() if t.is_complete)
        running_count = len(self._tasks) - completed_count
        success_count = sum(1 for t in self._tasks.values() if t.status == 'success')
        error_count = sum(1 for t in self._tasks.values() if t.status == 'error')
        skipped_count = sum(1 for t in self._tasks.values() if t.status == 'skipped')
        
        total_elapsed = time.time() - min(t.start_time for t in self._tasks.values()) if self._tasks else 0
        
        # Calculate overall percentage (weighted average might be better?)
        overall_total = sum(t.total for t in self._tasks.values())
        overall_completed = sum(t.completed for t in self._tasks.values())
        overall_perc = (overall_completed / overall_total * 100) if overall_total > 0 else 100.0

        summary_table = Table(box=ROUNDED, show_header=False, padding=(0, 1), expand=True)
        summary_table.add_column("Metric", style="dim", width=15)
        summary_table.add_column("Value", style="bold")

        summary_table.add_row("Overall Prog.", f"{overall_perc:.1f}%")
        summary_table.add_row("Total Tasks", str(len(self._tasks)))
        summary_table.add_row("  Running", str(running_count))
        summary_table.add_row("  Completed", str(completed_count))
        if success_count > 0:
            summary_table.add_row("    Success", f"[success]{success_count}[/]")
        if error_count > 0:
            summary_table.add_row("    Errors", f"[error]{error_count}[/]")
        if skipped_count > 0:
            summary_table.add_row("    Skipped", f"[warning]{skipped_count}[/]")
        summary_table.add_row("Elapsed Time", f"{total_elapsed:.2f}s")
        
        return Group(summary_table)

    def _get_renderable(self) -> ConsoleRenderable:
        """Get the combined renderable for the Live display."""
        # Throttle summary updates
        now = time.time()
        if self.show_summary and (now - self._last_summary_update > self.summary_refresh_rate):
             self._summary_renderable = self._render_summary()
             self._last_summary_update = now
             
        if self.show_summary:
            return Group(self._rich_progress, self._summary_renderable)
        else:
            return self._rich_progress
            
    def add_task(
        self,
        description: str,
        name: Optional[str] = None,
        total: float = 100.0,
        parent: Optional[str] = None, # Name of parent task
        visible: bool = True,
        start: bool = True, # Start the Rich task immediately
        **meta: Any # Additional metadata
    ) -> str:
        """Add a new task to track.
        
        Args:
            description: Text description of the task.
            name: Unique name/ID for this task (auto-generated if None).
            total: Total steps/units for completion.
            parent: Name of the parent task for nesting (visual indent).
            visible: Whether the task is initially visible.
            start: Start the task in the Rich progress bar immediately.
            **meta: Arbitrary metadata associated with the task.
            
        Returns:
            The unique name/ID of the added task.
        """
        if name is None:
            name = str(uuid.uuid4()) # Generate unique ID if not provided
            
        if name in self._tasks:
             raise ValueError(f"Task with name '{name}' already exists.")
             
        parent_rich_id = None
        if parent:
            if parent not in self._tasks:
                 raise ValueError(f"Parent task '{parent}' not found.")
            parent_task_info = self._tasks[parent]
            if parent_task_info.rich_task_id is not None:
                 parent_rich_id = parent_task_info.rich_task_id
                 # Rich Progress has no native tree/nested display; indenting the child description (commented out below) is a possible visual workaround.
                 # description = f"  {description}" 

        task_info = TaskInfo(
            description=description,
            total=total,
            parent_id=parent,
            meta=meta,
        )
        
        # Add to Rich Progress if active
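        # (tasks created before start() is called are registered with Rich lazily inside start())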
        rich_task_id = None
        if self._live and self._rich_progress:
             rich_task_id = self._rich_progress.add_task(
                 description,
                 total=total,
                 start=start,
                 visible=visible,
                 parent=parent_rich_id # Passed through as an extra Rich task field; Progress has no built-in parent/child nesting
             )
             task_info.rich_task_id = rich_task_id
        
        self._tasks[name] = task_info
        return name

    def update_task(
        self,
        name: str,
        description: Optional[str] = None,
        advance: Optional[float] = None,
        completed: Optional[float] = None,
        total: Optional[float] = None,
        visible: Optional[bool] = None,
        status: Optional[str] = None, # running, success, error, skipped
        **meta: Any
    ) -> None:
        """Update an existing task.
        
        Args:
            name: The unique name/ID of the task to update.
            description: New description text.
            advance: Amount to advance the completion progress.
            completed: Set completion to a specific value.
            total: Set a new total value.
            visible: Change task visibility.
            status: Update the task status (affects summary).
            **meta: Update or add metadata.
        """
        if name not in self._tasks:
             # Optionally log a warning or error
             # default_console.print(f"[warning]Attempted to update non-existent task: {name}[/]")
             return
             
        task_info = self._tasks[name]
        update_kwargs = {}
        
        if description is not None:
            task_info.description = description
            update_kwargs['description'] = description
            
        if total is not None:
            task_info.total = float(total)
            update_kwargs['total'] = task_info.total
            
        # Update completed status
        if completed is not None:
            task_info.completed = max(0.0, min(float(completed), task_info.total))
            update_kwargs['completed'] = task_info.completed
        elif advance is not None:
            task_info.completed = max(0.0, min(task_info.completed + float(advance), task_info.total))
            update_kwargs['completed'] = task_info.completed
            
        if visible is not None:
            update_kwargs['visible'] = visible
            
        if meta:
            task_info.meta.update(meta)
        
        # Update status (after completion update)
        if status is not None:
            task_info.status = status
            if task_info.is_complete and task_info.end_time is None:
                task_info.end_time = time.time()
                # Ensure Rich task is marked as complete
                if 'completed' not in update_kwargs:
                     update_kwargs['completed'] = task_info.total

        # Update Rich progress bar if active
        if task_info.rich_task_id is not None and self._live and self._rich_progress:
            self._rich_progress.update(task_info.rich_task_id, **update_kwargs)

    def complete_task(self, name: str, status: str = "success") -> None:
        """Mark a task as complete with a final status.
        
        Args:
            name: The unique name/ID of the task.
            status: Final status ('success', 'error', 'skipped').
        """
        if name not in self._tasks:
            return # Or raise error/log warning
            
        task_info = self._tasks[name]
        self.update_task(
            name,
            completed=task_info.total, # Ensure it reaches 100%
            status=status
        )

    def start(self) -> "GatewayProgress":
        """Start the Rich Live display."""
        if self._live is None:
            # Add any tasks that were created before start()
            for _name, task_info in self._tasks.items():
                if task_info.rich_task_id is None:
                    parent_rich_id = None
                    if task_info.parent_id and task_info.parent_id in self._tasks:
                         parent_rich_id = self._tasks[task_info.parent_id].rich_task_id
                         
                    task_info.rich_task_id = self._rich_progress.add_task(
                        task_info.description,
                        total=task_info.total,
                        completed=task_info.completed,
                        start=True, # Assume tasks added before start should be started
                        visible=True, # Assume visible
                        parent=parent_rich_id
                    )
                    
            self._live = Live(self._get_renderable(), console=self.console, refresh_per_second=10, vertical_overflow="visible")
            self._live.start(refresh=True)
        return self

    def stop(self) -> None:
        """Stop the Rich Live display."""
        if self._live is not None:
            # Ensure all running tasks in Rich are marked complete before stopping Live
            # to avoid them getting stuck visually
            if self._rich_progress:
                for task in self._rich_progress.tasks:
                    if not task.finished:
                        self._rich_progress.update(task.id, completed=task.total)
            
            self._live.stop()
            self._live = None
            # Optional: Clear the Rich Progress tasks? 
            # self._rich_progress = self._create_progress(...) # Recreate if needed

    def update(self) -> None:
        """Force a refresh of the Live display (if active)."""
        if self._live:
             self._live.update(self._get_renderable(), refresh=True)

    def reset(self) -> None:
        """Reset the progress tracker, clearing all tasks."""
        self.stop() # Stop live display
        self._tasks.clear()
        self._task_stack.clear()
        # Recreate the Rich Progress to clear its tasks, reusing the options stored at construction
        # (Rich's Progress does not expose transient/auto_refresh as attributes of its own).
        self._rich_progress = self._create_progress(self._transient, self._auto_refresh, self._expand)
        self._summary_renderable = self._render_summary()
        self._last_summary_update = 0.0

    @contextmanager
    def task(
        self,
        description: str,
        name: Optional[str] = None,
        total: float = 100.0,
        parent: Optional[str] = None,
        autostart: bool = True, # Start Live display if not already started?
        **meta: Any
    ) -> Generator["GatewayProgress", None, None]: # Yields self for updates
        """Context manager for a single task.
        
        Args:
            description: Description of the task.
            name: Optional unique name/ID (auto-generated if None).
            total: Total steps/units for the task.
            parent: Optional parent task name.
            autostart: Start the overall progress display if not running.
            **meta: Additional metadata for the task.
        
        Yields:
            The GatewayProgress instance itself, allowing updates via `update_task`.
        """
        if autostart and self._live is None:
             self.start()
             
        task_name = self.add_task(description, name, total, parent, **meta)
        self._task_stack.append(task_name)
        
        try:
            yield self # Yield self to allow calling update_task(task_name, ...)
        except Exception:
            # Mark task as errored on exception
            self.complete_task(task_name, status="error")
            raise # Re-raise the exception
        else:
            # Mark task as successful if no exception
            # Check if it was already completed with a different status
            if task_name in self._tasks and not self._tasks[task_name].is_complete:
                 self.complete_task(task_name, status="success")
        finally:
            # Pop task from stack
            if self._task_stack and self._task_stack[-1] == task_name:
                self._task_stack.pop()
            # No automatic stop here - allow multiple context managers
            # self.stop() 

    def track(
        self,
        iterable: Iterable[T],
        description: str,
        name: Optional[str] = None,
        total: Optional[float] = None,
        parent: Optional[str] = None,
        autostart: bool = True,
        **meta: Any
    ) -> Iterable[T]:
        """Track progress over an iterable.
        
        Args:
            iterable: The iterable to track progress over.
            description: Description of the task.
            name: Optional unique name/ID (auto-generated if None).
            total: Total number of items (estimated if None).
            parent: Optional parent task name.
            autostart: Start the overall progress display if not running.
            **meta: Additional metadata for the task.
            
        Yields:
            Items from the wrapped iterable, advancing the task's progress as each item is consumed.
        """
        if total is None:
            try:
                total = float(len(iterable)) # type: ignore
            except (TypeError, AttributeError):
                total = 100.0 # Default if length cannot be determined

        if autostart and self._live is None:
             self.start()
             
        task_name = self.add_task(description, name, total, parent, **meta)
        
        try:
            for item in iterable:
                yield item
                self.update_task(task_name, advance=1)
        except Exception:
            self.complete_task(task_name, status="error")
            raise
        else:
             # Check if it was already completed with a different status
            if task_name in self._tasks and not self._tasks[task_name].is_complete:
                 self.complete_task(task_name, status="success")
        # No automatic stop
        # finally:
            # self.stop()

    def __enter__(self) -> "GatewayProgress":
        """Enter context manager, starts the display."""
        return self.start()

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Exit context manager, stops the display."""
        self.stop()

# --- Global Convenience Functions (using a default progress instance) --- 
# Note: Managing a truly global progress instance can be tricky.
# It might be better to explicitly create and manage GatewayProgress instances.
_global_progress: Optional[GatewayProgress] = None

def get_global_progress() -> GatewayProgress:
    """Get or create the default global progress manager."""
    global _global_progress
    if _global_progress is None:
        _global_progress = GatewayProgress()
    return _global_progress

def track(
    iterable: Iterable[T],
    description: str,
    name: Optional[str] = None,
    total: Optional[float] = None,
    parent: Optional[str] = None,
) -> Iterable[T]:
    """Track progress over an iterable using the global progress manager."""
    prog = get_global_progress()
    # Ensure global progress is started if used this way
    if prog._live is None:
        prog.start()
    return prog.track(iterable, description, name, total, parent, autostart=False)

@contextmanager
def task(
    description: str,
    name: Optional[str] = None,
    total: float = 100.0,
    parent: Optional[str] = None,
) -> Generator["GatewayProgress", None, None]:
    """Context manager for a single task using the global progress manager."""
    prog = get_global_progress()
    # Ensure global progress is started if used this way
    if prog._live is None:
        prog.start()
    with prog.task(description, name, total, parent, autostart=False) as task_context:
        yield task_context # Yields the progress manager itself 
```
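
A short usage sketch of the progress utilities defined above. The import path mirrors the file location (`ultimate_mcp_server.utils.logging.progress`) and assumes the package is importable; task names and totals are illustrative:

```python
import time

from ultimate_mcp_server.utils.logging.progress import GatewayProgress, track

# Explicit instance: a parent task with nested sub-tasks and the summary panel.
with GatewayProgress(show_summary=True) as progress:
    progress.add_task("Processing documents", name="docs", total=3)
    for i in range(3):
        sub_name = f"doc-{i + 1}"
        with progress.task(f"Document {i + 1}", name=sub_name, parent="docs", total=10) as p:
            for _ in range(10):
                time.sleep(0.05)              # simulate work
                p.update_task(sub_name, advance=1)
        progress.update_task("docs", advance=1)

# Module-level helper backed by the shared global progress manager.
for _item in track(range(5), description="Post-processing"):
    time.sleep(0.02)
```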

--------------------------------------------------------------------------------
/examples/compare_synthesize_demo.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
"""Enhanced demo of the Advanced Response Comparator & Synthesizer Tool."""
import asyncio
import json
import sys
from collections import namedtuple  # Import namedtuple
from pathlib import Path

# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))

from rich import box
from rich.console import Group
from rich.markup import escape
from rich.panel import Panel
from rich.rule import Rule
from rich.syntax import Syntax
from rich.table import Table

from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.server import Gateway  # Use Gateway to get MCP

# from ultimate_mcp_server.tools.meta import compare_and_synthesize  # Add correct import
from ultimate_mcp_server.utils import get_logger
from ultimate_mcp_server.utils.display import CostTracker  # Import CostTracker
from ultimate_mcp_server.utils.logging.console import console

# Initialize logger
logger = get_logger("example.compare_synthesize_v2")

# Create a simple structure for cost tracking from dict (tokens might be missing)
TrackableResult = namedtuple("TrackableResult", ["cost", "input_tokens", "output_tokens", "provider", "model", "processing_time"])

# Global MCP instance (will be populated from Gateway)
mcp = None

async def setup_gateway_and_tools():
    """Set up the gateway and register tools."""
    global mcp
    logger.info("Initializing Gateway and MetaTools for enhanced demo...", emoji_key="start")
    gateway = Gateway("compare-synthesize-demo-v2", register_tools=False)

    # Initialize providers (needed for the tool to function)
    try:
        await gateway._initialize_providers()
    except Exception as e:
        logger.critical(f"Failed to initialize providers: {e}. Check API keys.", emoji_key="critical", exc_info=True)
        sys.exit(1) # Exit if providers can't be initialized

    # REMOVE MetaTools instance
    # meta_tools = MetaTools(gateway) # Pass the gateway instance  # noqa: F841
    mcp = gateway.mcp # Store the MCP server instance
    
    # Manually register the required tool
    # mcp.tool()(compare_and_synthesize) 
    # logger.info("Manually registered compare_and_synthesize tool.")

    # Verify tool registration
    tool_list = await mcp.list_tools()
    tool_names = [t.name for t in tool_list] # Access name attribute directly
    # Use console.print for tool list
    console.print(f"Registered tools: [cyan]{escape(str(tool_names))}[/cyan]")
    if "compare_and_synthesize" in tool_names:
        logger.success("compare_and_synthesize tool registered successfully.", emoji_key="success")
    else:
        logger.error("compare_and_synthesize tool FAILED to register.", emoji_key="error")
        sys.exit(1) # Exit if the required tool isn't available

    logger.success("Setup complete.", emoji_key="success")

# Refactored print_result function using Rich
def print_result(title: str, result: dict):
    """Helper function to print results clearly using Rich components."""
    console.print(Rule(f"[bold blue]{escape(title)}[/bold blue]"))
    
    # Handle potential list result format (from older tool versions?)
    if isinstance(result, list) and len(result) > 0:
        if hasattr(result[0], 'text'):
            try:
                result = json.loads(result[0].text)
            except Exception:
                result = {"error": "Failed to parse result from list format"}
        else:
            result = result[0] # Assume first item is the dict
    elif not isinstance(result, dict):
        result = {"error": f"Unexpected result format: {type(result)}"}
    
    if result.get("error"):
        error_content = f"[red]Error:[/red] {escape(result['error'])}"
        error_panel_content = error_content
        if "partial_results" in result and result["partial_results"]:
            try:
                partial_json = json.dumps(result["partial_results"], indent=2)
                partial_syntax = Syntax(partial_json, "json", theme="default", line_numbers=False, word_wrap=True)
                # Keep the error message visible alongside the highlighted partial results
                error_panel_content = Group(
                    f"{error_content}\n\n[yellow]Partial Results:[/yellow]",
                    partial_syntax,
                )
            except Exception as json_err:
                error_panel_content = f"{error_content}\n[red]Could not display partial results: {escape(str(json_err))}[/red]"
            
        console.print(Panel(
            error_panel_content,
            title="[bold red]Tool Error[/bold red]",
            border_style="red",
            expand=False
        ))
        
    else:
        # Display synthesis/analysis sections
        if "synthesis" in result:
            synthesis_data = result["synthesis"]
            if isinstance(synthesis_data, dict):
                
                if "best_response_text" in synthesis_data:
                    console.print(Panel(
                        escape(synthesis_data["best_response_text"].strip()),
                        title="[bold green]Best Response Text[/bold green]",
                        border_style="green",
                        expand=False
                    ))
                
                if "synthesized_response" in synthesis_data:
                     console.print(Panel(
                        escape(synthesis_data["synthesized_response"].strip()),
                        title="[bold magenta]Synthesized Response[/bold magenta]",
                        border_style="magenta",
                        expand=False
                    ))
                    
                if synthesis_data.get("best_response", {}).get("reasoning"):
                    console.print(Panel(
                        escape(synthesis_data["best_response"]["reasoning"].strip()),
                        title="[bold cyan]Best Response Reasoning[/bold cyan]",
                        border_style="dim cyan",
                        expand=False
                    ))
                    
                if synthesis_data.get("synthesis_strategy"):
                    console.print(Panel(
                        escape(synthesis_data["synthesis_strategy"].strip()),
                        title="[bold yellow]Synthesis Strategy Explanation[/bold yellow]",
                        border_style="dim yellow",
                        expand=False
                    ))

                if "ranking" in synthesis_data:
                    try:
                        ranking_json = json.dumps(synthesis_data["ranking"], indent=2)
                        console.print(Panel(
                            Syntax(ranking_json, "json", theme="default", line_numbers=False, word_wrap=True),
                            title="[bold]Ranking[/bold]",
                            border_style="dim blue",
                            expand=False
                        ))
                    except Exception as json_err:
                        console.print(f"[red]Could not display ranking: {escape(str(json_err))}[/red]")
                        
                if "comparative_analysis" in synthesis_data:
                    try:
                        analysis_json = json.dumps(synthesis_data["comparative_analysis"], indent=2)
                        console.print(Panel(
                            Syntax(analysis_json, "json", theme="default", line_numbers=False, word_wrap=True),
                            title="[bold]Comparative Analysis[/bold]",
                            border_style="dim blue",
                            expand=False
                        ))
                    except Exception as json_err:
                        console.print(f"[red]Could not display comparative analysis: {escape(str(json_err))}[/red]")

            else: # Handle case where synthesis data isn't a dict (e.g., raw text error)
                console.print(Panel(
                    f"[yellow]Synthesis Output (raw/unexpected format):[/yellow]\n{escape(str(synthesis_data))}",
                    title="[bold yellow]Synthesis Data[/bold yellow]",
                    border_style="yellow",
                    expand=False
                ))

        # Display Stats Table
        stats_table = Table(title="[bold]Execution Stats[/bold]", box=box.ROUNDED, show_header=False, expand=False)
        stats_table.add_column("Metric", style="cyan", no_wrap=True)
        stats_table.add_column("Value", style="white")
        stats_table.add_row("Eval/Synth Model", f"{escape(result.get('synthesis_provider','N/A'))}/{escape(result.get('synthesis_model','N/A'))}")
        stats_table.add_row("Total Cost", f"${result.get('cost', {}).get('total_cost', 0.0):.6f}")
        stats_table.add_row("Processing Time", f"{result.get('processing_time', 0.0):.2f}s")
        console.print(stats_table)
        
    console.print() # Add spacing after each result block


async def run_comparison_demo(tracker: CostTracker):
    """Demonstrate different modes of compare_and_synthesize."""
    if not mcp:
        logger.error("MCP server not initialized. Run setup first.", emoji_key="error")
        return

    prompt = "Explain the main benefits of using asynchronous programming in Python for a moderately technical audience. Provide 2-3 key advantages."

    # --- Configuration for initial responses ---
    console.print(Rule("[bold green]Configurations[/bold green]"))
    console.print(f"[cyan]Prompt:[/cyan] {escape(prompt)}")
    initial_configs = [
        {"provider": Provider.OPENAI.value, "model": "gpt-4.1-mini", "parameters": {"temperature": 0.6, "max_tokens": 150}},
        {"provider": Provider.ANTHROPIC.value, "model": "claude-3-5-haiku-20241022", "parameters": {"temperature": 0.5, "max_tokens": 150}},
        {"provider": Provider.GEMINI.value, "model": "gemini-2.0-flash-lite", "parameters": {"temperature": 0.7, "max_tokens": 150}},
        {"provider": Provider.DEEPSEEK.value, "model": "deepseek-chat", "parameters": {"temperature": 0.6, "max_tokens": 150}},
    ]
    console.print(f"[cyan]Initial Models:[/cyan] {[f'{c['provider']}:{c['model']}' for c in initial_configs]}")

    # --- Evaluation Criteria ---
    criteria = [
        "Clarity: Is the explanation clear and easy to understand for the target audience?",
        "Accuracy: Are the stated benefits of async programming technically correct?",
        "Relevance: Does the response directly address the prompt and focus on key advantages?",
        "Conciseness: Is the explanation brief and to the point?",
        "Completeness: Does it mention 2-3 distinct and significant benefits?",
    ]
    console.print("[cyan]Evaluation Criteria:[/cyan]")
    for i, criterion in enumerate(criteria): 
        console.print(f"  {i+1}. {escape(criterion)}")

    # --- Criteria Weights (Optional) ---
    criteria_weights = {
        "Clarity: Is the explanation clear and easy to understand for the target audience?": 0.3,
        "Accuracy: Are the stated benefits of async programming technically correct?": 0.3,
        "Relevance: Does the response directly address the prompt and focus on key advantages?": 0.15,
        "Conciseness: Is the explanation brief and to the point?": 0.1,
        "Completeness: Does it mention 2-3 distinct and significant benefits?": 0.15,
    }
    console.print("[cyan]Criteria Weights (Optional):[/cyan]")
    # Create a small table for weights
    weights_table = Table(box=box.MINIMAL, show_header=False)
    weights_table.add_column("Criterion Snippet", style="dim")
    weights_table.add_column("Weight", style="green")
    for crit, weight in criteria_weights.items():
        weights_table.add_row(escape(crit.split(':')[0]), f"{weight:.2f}")
    console.print(weights_table)

    # --- Synthesis/Evaluation Model ---
    synthesis_model_config = {"provider": Provider.OPENAI.value, "model": "gpt-4.1"} 
    console.print(f"[cyan]Synthesis/Evaluation Model:[/cyan] {escape(synthesis_model_config['provider'])}:{escape(synthesis_model_config['model'])}")
    console.print() # Spacing before demos start

    common_args = {
        "prompt": prompt,
        "configs": initial_configs,
        "criteria": criteria,
        "criteria_weights": criteria_weights,
    }

    # --- Demo 1: Select Best Response ---
    logger.info("Running format 'best'...", emoji_key="processing")
    try:
        result = await mcp.call_tool("compare_and_synthesize", {
            **common_args,
            "response_format": "best",
            "include_reasoning": True, # Show why it was selected
            "synthesis_model": synthesis_model_config # Explicitly specify model to avoid OpenRouter
        })
        print_result("Response Format: 'best' (with reasoning)", result)
        # Track cost
        if isinstance(result, dict) and "cost" in result and "synthesis_provider" in result and "synthesis_model" in result:
            try:
                trackable = TrackableResult(
                    cost=result.get("cost", {}).get("total_cost", 0.0),
                    input_tokens=0, # Tokens not typically aggregated in this tool's output
                    output_tokens=0,
                    provider=result.get("synthesis_provider", "unknown"),
                    model=result.get("synthesis_model", "compare_synthesize"),
                    processing_time=result.get("processing_time", 0.0)
                )
                tracker.add_call(trackable)
            except Exception as track_err:
                logger.warning(f"Could not track cost for 'best' format: {track_err}", exc_info=False)
    except Exception as e:
        logger.error(f"Error during 'best' format demo: {e}", emoji_key="error", exc_info=True)

    # --- Demo 2: Synthesize Responses (Comprehensive Strategy) ---
    logger.info("Running format 'synthesis' (comprehensive)...", emoji_key="processing")
    try:
        result = await mcp.call_tool("compare_and_synthesize", {
            **common_args,
            "response_format": "synthesis",
            "synthesis_strategy": "comprehensive",
            "synthesis_model": synthesis_model_config, # Specify model for consistency
            "include_reasoning": True,
        })
        print_result("Response Format: 'synthesis' (Comprehensive Strategy)", result)
        # Track cost
        if isinstance(result, dict) and "cost" in result and "synthesis_provider" in result and "synthesis_model" in result:
            try:
                trackable = TrackableResult(
                    cost=result.get("cost", {}).get("total_cost", 0.0),
                    input_tokens=0, # Tokens not typically aggregated
                    output_tokens=0,
                    provider=result.get("synthesis_provider", "unknown"),
                    model=result.get("synthesis_model", "compare_synthesize"),
                    processing_time=result.get("processing_time", 0.0)
                )
                tracker.add_call(trackable)
            except Exception as track_err:
                logger.warning(f"Could not track cost for 'synthesis comprehensive': {track_err}", exc_info=False)
    except Exception as e:
        logger.error(f"Error during 'synthesis comprehensive' demo: {e}", emoji_key="error", exc_info=True)

    # --- Demo 3: Synthesize Responses (Conservative Strategy, No Reasoning) ---
    logger.info("Running format 'synthesis' (conservative, no reasoning)...", emoji_key="processing")
    try:
        result = await mcp.call_tool("compare_and_synthesize", {
            **common_args,
            "response_format": "synthesis",
            "synthesis_strategy": "conservative",
            "synthesis_model": synthesis_model_config, # Explicitly specify
            "include_reasoning": False, # Hide the synthesis strategy explanation
        })
        print_result("Response Format: 'synthesis' (Conservative, No Reasoning)", result)
        # Track cost
        if isinstance(result, dict) and "cost" in result and "synthesis_provider" in result and "synthesis_model" in result:
            try:
                trackable = TrackableResult(
                    cost=result.get("cost", {}).get("total_cost", 0.0),
                    input_tokens=0, # Tokens not typically aggregated
                    output_tokens=0,
                    provider=result.get("synthesis_provider", "unknown"),
                    model=result.get("synthesis_model", "compare_synthesize"),
                    processing_time=result.get("processing_time", 0.0)
                )
                tracker.add_call(trackable)
            except Exception as track_err:
                logger.warning(f"Could not track cost for 'synthesis conservative': {track_err}", exc_info=False)
    except Exception as e:
        logger.error(f"Error during 'synthesis conservative' demo: {e}", emoji_key="error", exc_info=True)

    # --- Demo 4: Rank Responses ---
    logger.info("Running format 'ranked'...", emoji_key="processing")
    try:
        result = await mcp.call_tool("compare_and_synthesize", {
            **common_args,
            "response_format": "ranked",
            "include_reasoning": True, # Show reasoning for ranks
            "synthesis_model": synthesis_model_config, # Explicitly specify
        })
        print_result("Response Format: 'ranked' (with reasoning)", result)
        # Track cost
        if isinstance(result, dict) and "cost" in result and "synthesis_provider" in result and "synthesis_model" in result:
            try:
                trackable = TrackableResult(
                    cost=result.get("cost", {}).get("total_cost", 0.0),
                    input_tokens=0, # Tokens not typically aggregated
                    output_tokens=0,
                    provider=result.get("synthesis_provider", "unknown"),
                    model=result.get("synthesis_model", "compare_synthesize"),
                    processing_time=result.get("processing_time", 0.0)
                )
                tracker.add_call(trackable)
            except Exception as track_err:
                logger.warning(f"Could not track cost for 'ranked' format: {track_err}", exc_info=False)
    except Exception as e:
        logger.error(f"Error during 'ranked' format demo: {e}", emoji_key="error", exc_info=True)

    # --- Demo 5: Analyze Responses ---
    logger.info("Running format 'analysis'...", emoji_key="processing")
    try:
        result = await mcp.call_tool("compare_and_synthesize", {
            **common_args,
            "response_format": "analysis",
            # No reasoning needed for analysis format, it's inherent
            "synthesis_model": synthesis_model_config, # Explicitly specify
        })
        print_result("Response Format: 'analysis'", result)
        # Track cost
        if isinstance(result, dict) and "cost" in result and "synthesis_provider" in result and "synthesis_model" in result:
            try:
                trackable = TrackableResult(
                    cost=result.get("cost", {}).get("total_cost", 0.0),
                    input_tokens=0, # Tokens not typically aggregated
                    output_tokens=0,
                    provider=result.get("synthesis_provider", "unknown"),
                    model=result.get("synthesis_model", "compare_synthesize"),
                    processing_time=result.get("processing_time", 0.0)
                )
                tracker.add_call(trackable)
            except Exception as track_err:
                logger.warning(f"Could not track cost for 'analysis' format: {track_err}", exc_info=False)
    except Exception as e:
        logger.error(f"Error during 'analysis' format demo: {e}", emoji_key="error", exc_info=True)

    # Display cost summary at the end
    tracker.display_summary(console)


async def main():
    """Run the enhanced compare_and_synthesize demo."""
    tracker = CostTracker() # Instantiate tracker
    await setup_gateway_and_tools()
    await run_comparison_demo(tracker) # Pass tracker
    # logger.info("Skipping run_comparison_demo() as the 'compare_and_synthesize' tool function is missing.") # Remove skip message

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Demo stopped by user.")
    except Exception as main_err:
         logger.critical(f"Demo failed with unexpected error: {main_err}", emoji_key="critical", exc_info=True)
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/tournament.py:
--------------------------------------------------------------------------------

```python
"""Tournament tools for Ultimate MCP Server."""
from typing import Any, Dict, List, Optional

from ultimate_mcp_server.exceptions import ToolError

from ultimate_mcp_server.core.models.tournament import (
    CancelTournamentInput,
    CancelTournamentOutput,
    CreateTournamentInput,
    CreateTournamentOutput,
    GetTournamentResultsInput,
    GetTournamentStatusInput,
    GetTournamentStatusOutput,
    TournamentBasicInfo,
    TournamentData,
    TournamentStatus,
)
from ultimate_mcp_server.core.models.tournament import (
    EvaluatorConfig as InputEvaluatorConfig,
)
from ultimate_mcp_server.core.models.tournament import (
    ModelConfig as InputModelConfig,
)
from ultimate_mcp_server.core.tournaments.manager import tournament_manager
from ultimate_mcp_server.tools.base import with_error_handling, with_tool_metrics
from ultimate_mcp_server.utils import get_logger

logger = get_logger("ultimate_mcp_server.tools.tournament")

# --- Standalone Tool Functions ---

@with_tool_metrics
@with_error_handling
async def create_tournament(
    name: str,
    prompt: str,
    models: List[Dict[str, Any]],
    rounds: int = 3,
    tournament_type: str = "code",
    extraction_model_id: Optional[str] = "anthropic/claude-3-5-haiku-20241022",
    evaluators: Optional[List[Dict[str, Any]]] = None,
    max_retries_per_model_call: int = 3,
    retry_backoff_base_seconds: float = 1.0,
    max_concurrent_model_calls: int = 5
) -> Dict[str, Any]:
    """
    Creates and starts a new LLM competition (tournament) based on a prompt and model configurations.

    Args:
        name: Human-readable name for the tournament (e.g., "Essay Refinement Contest", "Python Sorting Challenge").
        prompt: The task prompt provided to all participating LLM models.
        models: List of model configurations (external key is "models"). Each config is a dictionary specifying:
            - model_id (str, required): e.g., 'openai/gpt-4o'.
            - diversity_count (int, optional, default 1): Number of variants per model.
            # ... (rest of ModelConfig fields) ...
        rounds: Number of tournament rounds. Each round allows models to refine their previous output (if applicable to the tournament type). Default is 3.
        tournament_type: The type of tournament defining the task and evaluation method. Supported types include:
                         - "code": For evaluating code generation based on correctness and potentially style/efficiency.
                         - "text": For general text generation, improvement, or refinement tasks.
                         Default is "code".
        extraction_model_id: (Optional, primarily for 'code' type) Specific LLM model to use for extracting and evaluating results like code blocks. If None, a default is used.
        evaluators: (Optional) List of evaluator configurations as dicts.
        max_retries_per_model_call: Maximum retries per model call.
        retry_backoff_base_seconds: Base seconds for retry backoff.
        max_concurrent_model_calls: Maximum concurrent model calls.

    Returns:
        Dictionary with tournament creation status containing:
        - tournament_id: Unique identifier for the created tournament.
        - status: Initial tournament status (usually 'PENDING' or 'RUNNING').
        - storage_path: Filesystem path where tournament data will be stored.

    Example:
        {
            "tournament_id": "tour_abc123xyz789",
            "status": "PENDING",
            "storage_path": "/path/to/storage/tour_abc123xyz789"
        }
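
    Illustrative call (values here are hypothetical; only parameters documented above are shown):

        await create_tournament(
            name="Python Sorting Challenge",
            prompt="Write an efficient sorting function in Python.",
            models=[{"model_id": "openai/gpt-4o", "diversity_count": 1}],
            rounds=3,
            tournament_type="code",
        )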

    Raises:
        ToolError: If input is invalid, tournament creation fails, or scheduling fails.
    """
    logger.info(f"Tool 'create_tournament' invoked for: {name}")
    try:
        parsed_model_configs = [InputModelConfig(**mc) for mc in models]
        parsed_evaluators = [InputEvaluatorConfig(**ev) for ev in (evaluators or [])]
        input_data = CreateTournamentInput(
            name=name,
            prompt=prompt,
            models=parsed_model_configs,
            rounds=rounds,
            tournament_type=tournament_type,
            extraction_model_id=extraction_model_id,
            evaluators=parsed_evaluators,
            max_retries_per_model_call=max_retries_per_model_call,
            retry_backoff_base_seconds=retry_backoff_base_seconds,
            max_concurrent_model_calls=max_concurrent_model_calls
        )

        tournament = tournament_manager.create_tournament(input_data)
        if not tournament:
            raise ToolError("Failed to create tournament entry.")

        logger.info("Calling start_tournament_execution (using asyncio)")
        success = tournament_manager.start_tournament_execution(
            tournament_id=tournament.tournament_id
        )

        if not success:
            logger.error(f"Failed to schedule background execution for tournament {tournament.tournament_id}")
            updated_tournament = tournament_manager.get_tournament(tournament.tournament_id)
            error_msg = updated_tournament.error_message if updated_tournament else "Failed to schedule execution."
            raise ToolError(f"Failed to start tournament execution: {error_msg}")

        logger.info(f"Tournament {tournament.tournament_id} ({tournament.name}) created and background execution started.")
        # Include storage_path in the return value
        output = CreateTournamentOutput(
            tournament_id=tournament.tournament_id,
            status=tournament.status,
            storage_path=tournament.storage_path,
            message=f"Tournament '{tournament.name}' created successfully and execution started."
        )
        return output.dict()

    except ValueError as ve:
        logger.warning(f"Validation error creating tournament: {ve}")
        raise ToolError(f"Invalid input: {ve}") from ve
    except Exception as e:
        logger.error(f"Error creating tournament: {e}", exc_info=True)
        raise ToolError(f"An unexpected error occurred: {e}") from e

@with_tool_metrics
@with_error_handling
async def get_tournament_status(
    tournament_id: str
) -> Dict[str, Any]:
    """Retrieves the current status and progress of a specific tournament.

    Use this tool to monitor an ongoing tournament (PENDING, RUNNING) or check the final
    state (COMPLETED, FAILED, CANCELLED) of a past tournament.

    Args:
        tournament_id: Unique identifier of the tournament to check.

    Returns:
        Dictionary containing tournament status information:
        - tournament_id: Unique identifier for the tournament.
        - name: Human-readable name of the tournament.
        - tournament_type: Type of tournament (e.g., "code", "text").
        - status: Current status (e.g., "PENDING", "RUNNING", "COMPLETED", "FAILED", "CANCELLED").
        - current_round: Current round number (1-based) if RUNNING, else the last active round.
        - total_rounds: Total number of rounds configured for this tournament.
        - created_at: ISO timestamp when the tournament was created.
        - updated_at: ISO timestamp when the tournament status was last updated.
        - error_message: Error message if the tournament FAILED (null otherwise).

    Error Handling:
        - Raises ToolError (400) if tournament_id format is invalid.
        - Raises ToolError (404) if the tournament ID is not found.
        - Raises ToolError (500) for internal server errors.

    Example:
        {
            "tournament_id": "tour_abc123xyz789",
            "name": "Essay Refinement Contest",
            "tournament_type": "text",
            "status": "RUNNING",
            "current_round": 2,
            "total_rounds": 3,
            "created_at": "2023-04-15T14:32:17.123456",
            "updated_at": "2023-04-15T14:45:22.123456",
            "error_message": null
        }
    """
    logger.debug(f"Getting status for tournament: {tournament_id}")
    try:
        if not tournament_id or not isinstance(tournament_id, str):
            raise ToolError(
                status_code=400,
                detail="Invalid tournament ID format. Tournament ID must be a non-empty string."
            )

        try:
            input_data = GetTournamentStatusInput(tournament_id=tournament_id)
        except ValueError as ve:
            raise ToolError(
                status_code=400,
                detail=f"Invalid tournament ID: {str(ve)}"
            ) from ve

        tournament = tournament_manager.get_tournament(input_data.tournament_id, force_reload=True)
        if not tournament:
            raise ToolError(
                status_code=404,
                detail=f"Tournament not found: {tournament_id}. Check if the tournament ID is correct or use list_tournaments to see all available tournaments."
            )

        try:
            output = GetTournamentStatusOutput(
                tournament_id=tournament.tournament_id,
                name=tournament.name,
                tournament_type=tournament.config.tournament_type,
                status=tournament.status,
                current_round=tournament.current_round,
                total_rounds=tournament.config.rounds,
                created_at=tournament.created_at,
                updated_at=tournament.updated_at,
                error_message=tournament.error_message
            )
            return output.dict()
        except Exception as e:
            logger.error(f"Error converting tournament data to output format: {e}", exc_info=True)
            raise ToolError(
                status_code=500,
                detail=f"Error processing tournament data: {str(e)}. The tournament data may be corrupted."
            ) from e
    except ToolError:
        raise
    except Exception as e:
        logger.error(f"Error getting tournament status for {tournament_id}: {e}", exc_info=True)
        raise ToolError(
            status_code=500,
            detail=f"Internal server error retrieving tournament status: {str(e)}. Please try again or check the server logs."
        ) from e

@with_tool_metrics
@with_error_handling
async def list_tournaments(
) -> List[Dict[str, Any]]:
    """Lists all created tournaments with basic identifying information and status.

    Useful for discovering existing tournaments and their current states without fetching full results.

    Returns:
        List of dictionaries, each containing basic tournament info:
        - tournament_id: Unique identifier for the tournament.
        - name: Human-readable name of the tournament.
        - tournament_type: Type of tournament (e.g., "code", "text").
        - status: Current status (e.g., "PENDING", "RUNNING", "COMPLETED", "FAILED", "CANCELLED").
        - created_at: ISO timestamp when the tournament was created.
        - updated_at: ISO timestamp when the tournament was last updated.

    Example:
        [
            {
                "tournament_id": "tour_abc123",
                "name": "Tournament A",
                "tournament_type": "code",
                "status": "COMPLETED",
                "created_at": "2023-04-10T10:00:00",
                "updated_at": "2023-04-10T12:30:00"
            },
            ...
        ]
    """
    logger.debug("Listing all tournaments")
    try:
        tournaments = tournament_manager.list_tournaments()
        output_list = []
        for tournament in tournaments:
            try:
                # Ensure tournament object has necessary attributes before accessing
                if not hasattr(tournament, 'tournament_id') or \
                   not hasattr(tournament, 'name') or \
                   not hasattr(tournament, 'config') or \
                   not hasattr(tournament.config, 'tournament_type') or \
                   not hasattr(tournament, 'status') or \
                   not hasattr(tournament, 'created_at') or \
                   not hasattr(tournament, 'updated_at'):
                    logger.warning(f"Skipping tournament due to missing attributes: {getattr(tournament, 'tournament_id', 'UNKNOWN ID')}")
                    continue

                basic_info = TournamentBasicInfo(
                    tournament_id=tournament.tournament_id,
                    name=tournament.name,
                    tournament_type=tournament.config.tournament_type,
                    status=tournament.status,
                    created_at=tournament.created_at,
                    updated_at=tournament.updated_at,
                )
                output_list.append(basic_info.dict())
            except Exception as e:
                logger.warning(f"Skipping tournament {getattr(tournament, 'tournament_id', 'UNKNOWN')} due to data error during processing: {e}")
        return output_list
    except Exception as e:
        logger.error(f"Error listing tournaments: {e}", exc_info=True)
        raise ToolError(
            status_code=500,
            detail=f"Internal server error listing tournaments: {str(e)}"
        ) from e

@with_tool_metrics
@with_error_handling
async def get_tournament_results(
    tournament_id: str
) -> List[Dict[str, str]]:
    """Retrieves the complete results and configuration for a specific tournament.

    Provides comprehensive details including configuration, final scores (if applicable),
    detailed round-by-round results, model outputs, and any errors encountered.
    Use this *after* a tournament has finished (COMPLETED or FAILED) for full analysis.

    Args:
        tournament_id: Unique identifier for the tournament.

    Returns:
        A list containing a single text-content dictionary; its "text" field holds the full tournament data serialized as JSON (configuration, status, detailed results, timestamps, etc.).

    Example (Conceptual - actual structure may vary):
        {
            "tournament_id": "tour_abc123",
            "name": "Sorting Algo Test",
            "status": "COMPLETED",
            "config": { ... },
            "results": { "scores": { ... }, "round_results": [ { ... }, ... ] },
            "created_at": "...",
            "updated_at": "...",
            "error_message": null
        }

    Raises:
        ToolError: If the tournament ID is invalid, not found, results are not ready (still PENDING/RUNNING), or an internal error occurs.
    """
    logger.debug(f"Getting results for tournament: {tournament_id}")
    try:
        if not tournament_id or not isinstance(tournament_id, str):
            raise ToolError(
                status_code=400,
                detail="Invalid tournament ID format. Tournament ID must be a non-empty string."
            )

        try:
            input_data = GetTournamentResultsInput(tournament_id=tournament_id)
        except ValueError as ve:
            raise ToolError(
                status_code=400,
                detail=f"Invalid tournament ID: {str(ve)}"
            ) from ve

        # Make sure to request TournamentData which should contain results
        tournament_data: Optional[TournamentData] = tournament_manager.get_tournament(input_data.tournament_id, force_reload=True)

        if not tournament_data:
            # Check if the tournament exists but just has no results yet (e.g., PENDING)
            tournament_status_info = tournament_manager.get_tournament(tournament_id) # Gets basic info
            if tournament_status_info:
                current_status = tournament_status_info.status
                if current_status in [TournamentStatus.PENDING, TournamentStatus.RUNNING]:
                     raise ToolError(
                         status_code=404, # Use 404 to indicate results not ready
                         detail=f"Tournament '{tournament_id}' is currently {current_status}. Results are not yet available."
                     )
                else: # Should have results if COMPLETED or ERROR, maybe data issue?
                     logger.error(f"Tournament {tournament_id} status is {current_status} but get_tournament_results returned None.")
                     raise ToolError(
                         status_code=500,
                         detail=f"Could not retrieve results for tournament '{tournament_id}' despite status being {current_status}. There might be an internal data issue."
                     )
            else:
                raise ToolError(
                    status_code=404,
                    detail=f"Tournament not found: {tournament_id}. Cannot retrieve results."
                )

        # NEW: Return a structure that FastMCP might recognize as a pre-formatted content list
        json_string = tournament_data.json()
        logger.info(f"[DEBUG_GET_RESULTS] Returning pre-formatted TextContent list. JSON Snippet: {json_string[:150]}")
        return [{ "type": "text", "text": json_string }]

    except ToolError:
        raise
    except Exception as e:
        logger.error(f"Error getting tournament results for {tournament_id}: {e}", exc_info=True)
        raise ToolError(
            status_code=500,
            detail=f"Internal server error retrieving tournament results: {str(e)}"
        ) from e

@with_tool_metrics
@with_error_handling
async def cancel_tournament(
    tournament_id: str
) -> Dict[str, Any]:
    """Attempts to cancel a running (RUNNING) or pending (PENDING) tournament.

    Signals the tournament manager to stop processing. Cancellation is not guaranteed
    to be immediate. Check status afterwards using `get_tournament_status`.
    Cannot cancel tournaments that are already COMPLETED, FAILED, or CANCELLED.

    Args:
        tournament_id: Unique identifier for the tournament to cancel.

    Returns:
        Dictionary confirming the cancellation attempt:
        - tournament_id: The ID of the tournament targeted for cancellation.
        - status: The status *after* the cancellation attempt (e.g., "CANCELLED", or the previous state like "COMPLETED" if cancellation was not possible).
        - message: A message indicating the outcome (e.g., "Tournament cancellation requested successfully.", "Cancellation failed: Tournament is already COMPLETED.").

    Raises:
        ToolError: If the tournament ID is invalid, not found, or an internal error occurs.
    """
    logger.info(f"Received request to cancel tournament: {tournament_id}")
    try:
        if not tournament_id or not isinstance(tournament_id, str):
            raise ToolError(status_code=400, detail="Invalid tournament ID format.")

        try:
            input_data = CancelTournamentInput(tournament_id=tournament_id)
        except ValueError as ve:
            raise ToolError(status_code=400, detail=f"Invalid tournament ID: {str(ve)}") from ve

        # Call the manager's cancel function
        success, message, final_status = await tournament_manager.cancel_tournament(input_data.tournament_id)

        # Prepare output using the Pydantic model
        output = CancelTournamentOutput(
            tournament_id=tournament_id,
            status=final_status, # Return the actual status after attempt
            message=message
        )

        if not success:
            # Log the failure but return the status/message from the manager
            logger.warning(f"Cancellation attempt for tournament {tournament_id} reported failure: {message}")
            # Raise ToolError if the status implies a client error (e.g., not found)
            if "not found" in message.lower():
                raise ToolError(status_code=404, detail=message)
            elif final_status in [TournamentStatus.COMPLETED, TournamentStatus.FAILED, TournamentStatus.CANCELLED] and "already" in message.lower():
                raise ToolError(status_code=409, detail=message)
            # Optionally handle other errors as 500
            # else:
            #     raise ToolError(status_code=500, detail=f"Cancellation failed: {message}")
        else:
            logger.info(f"Cancellation attempt for tournament {tournament_id} successful. Final status: {final_status}")

        return output.dict()

    except ToolError:
        raise
    except Exception as e:
        logger.error(f"Error cancelling tournament {tournament_id}: {e}", exc_info=True)
        raise ToolError(status_code=500, detail=f"Internal server error during cancellation: {str(e)}") from e
```
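
A minimal driver sketch for the standalone tournament tools above, assuming the package is importable and provider API keys are configured. The model IDs, prompt, and polling interval are illustrative values, not taken from the source.

```python
import asyncio
import json

from ultimate_mcp_server.tools.tournament import (
    create_tournament,
    get_tournament_results,
    get_tournament_status,
)


async def run_demo_tournament() -> None:
    # Create a two-model, two-round code tournament (illustrative configuration).
    created = await create_tournament(
        name="Python Sorting Challenge",
        prompt="Write a Python function that sorts a list of integers without using sorted().",
        models=[
            {"model_id": "openai/gpt-4o"},
            {"model_id": "anthropic/claude-3-5-haiku-20241022"},
        ],
        rounds=2,
        tournament_type="code",
    )
    tournament_id = created["tournament_id"]

    # Poll until the tournament reaches a terminal state.
    while True:
        status = await get_tournament_status(tournament_id)
        state = str(status["status"])
        if any(terminal in state for terminal in ("COMPLETED", "FAILED", "CANCELLED")):
            break
        await asyncio.sleep(10)

    # get_tournament_results returns a single text-content item whose "text"
    # field is the full tournament data serialized as JSON.
    results = await get_tournament_results(tournament_id)
    data = json.loads(results[0]["text"])
    print(data.get("name"), data.get("status"))


if __name__ == "__main__":
    asyncio.run(run_demo_tournament())
```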

--------------------------------------------------------------------------------
/ultimate_mcp_server/core/providers/openai.py:
--------------------------------------------------------------------------------

```python
"""OpenAI provider implementation."""
import time
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple

from openai import AsyncOpenAI

from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.providers.base import BaseProvider, ModelResponse
from ultimate_mcp_server.utils import get_logger

# Use the same naming scheme everywhere: logger at module level
logger = get_logger("ultimate_mcp_server.providers.openai")


class OpenAIProvider(BaseProvider):
    """Provider implementation for OpenAI API."""
    
    provider_name = Provider.OPENAI.value
    
    def __init__(self, api_key: Optional[str] = None, **kwargs):
        """Initialize the OpenAI provider.
        
        Args:
            api_key: OpenAI API key
            **kwargs: Additional options
        """
        super().__init__(api_key=api_key, **kwargs)
        self.base_url = kwargs.get("base_url")
        self.organization = kwargs.get("organization")
        self.models_cache = None
        
    async def initialize(self) -> bool:
        """Initialize the OpenAI client.
        
        Returns:
            bool: True if initialization was successful
        """
        try:
            self.client = AsyncOpenAI(
                api_key=self.api_key, 
                base_url=self.base_url,
                organization=self.organization,
            )
            
            # Skip API call if using a mock key (for tests)
            if self.api_key and "mock-" in self.api_key:
                self.logger.info(
                    "Using mock OpenAI key - skipping API validation",
                    emoji_key="mock"
                )
                return True
            
            # Test connection by listing models
            await self.list_models()
            
            self.logger.success(
                "OpenAI provider initialized successfully", 
                emoji_key="provider"
            )
            return True
            
        except Exception as e:
            self.logger.error(
                f"Failed to initialize OpenAI provider: {str(e)}", 
                emoji_key="error"
            )
            return False
        
    async def generate_completion(
        self,
        prompt: Optional[str] = None,
        model: Optional[str] = None,
        max_tokens: Optional[int] = None,
        temperature: float = 0.7,
        **kwargs
    ) -> ModelResponse:
        """Generate a completion using OpenAI.
        
        Args:
            prompt: Text prompt to send to the model
            model: Model name to use (e.g., "gpt-4o")
            max_tokens: Maximum tokens to generate
            temperature: Temperature parameter (0.0-1.0)
            **kwargs: Additional model-specific parameters
            
        Returns:
            ModelResponse with completion result
            
        Raises:
            Exception: If API call fails
        """
        if not self.client:
            await self.initialize()
            
        # Use default model if not specified
        model = model or self.get_default_model()
        
        # Strip provider prefix if present (e.g., "openai/gpt-4o" -> "gpt-4o")
        if model.startswith(f"{self.provider_name}/"):
            original_model = model
            model = model.split("/", 1)[1]
            self.logger.debug(f"Stripped provider prefix from model name: {original_model} -> {model}")
        
        # Handle case when messages are provided instead of prompt (for chat_completion)
        messages = kwargs.pop("messages", None)
        
        # If neither prompt nor messages are provided, raise an error
        if prompt is None and not messages:
            raise ValueError("Either 'prompt' or 'messages' must be provided")
            
        # Create messages if not already provided
        if not messages:
            messages = [{"role": "user", "content": prompt}]
        
        # Prepare API call parameters
        params = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
        }
        
        # Add max_tokens if specified
        if max_tokens is not None:
            params["max_tokens"] = max_tokens
            
        # Check for json_mode flag and remove it from kwargs
        json_mode = kwargs.pop("json_mode", False)
        if json_mode:
            # Use the correct response_format for JSON mode
            params["response_format"] = {"type": "json_object"}
            self.logger.debug("Setting response_format to JSON mode for OpenAI")

        # Handle any legacy response_format passed directly, but prefer json_mode
        if "response_format" in kwargs and not json_mode:
            # Support both direct format object and type-only specification
            response_format = kwargs.pop("response_format")
            if isinstance(response_format, dict):
                params["response_format"] = response_format
            elif isinstance(response_format, str) and response_format in ["json_object", "text"]:
                params["response_format"] = {"type": response_format}
            self.logger.debug(f"Setting response_format from direct param: {params.get('response_format')}")

        # Add any remaining additional parameters
        params.update(kwargs)

        # --- Special handling for specific model parameter constraints ---
        if model == 'o3-mini':
            if 'temperature' in params:
                self.logger.debug(f"Removing unsupported 'temperature' parameter for model {model}")
                del params['temperature']
        elif model == 'o1-preview':
            current_temp = params.get('temperature')
            # Only allow temperature if it's explicitly set to 1.0, otherwise remove it to use API default.
            if current_temp is not None and current_temp != 1.0:
                self.logger.debug(f"Removing non-default 'temperature' ({current_temp}) for model {model}")
                del params['temperature']
        # --- End special handling ---
        
        # Log request
        prompt_length = len(prompt) if prompt else sum(len(m.get("content", "")) for m in messages)
        self.logger.info(
            f"Generating completion with OpenAI model {model}",
            emoji_key=self.provider_name,
            prompt_length=prompt_length,
            json_mode=json_mode # Log if json_mode was requested
        )
        
        try:
            # API call with timing
            response, processing_time = await self.process_with_timer(
                self.client.chat.completions.create, **params
            )
            
            # Extract response text
            completion_text = response.choices[0].message.content
            
            # Create message object for chat_completion
            message = {
                "role": "assistant",
                "content": completion_text
            }
            
            # Create standardized response
            result = ModelResponse(
                text=completion_text,
                model=model,
                provider=self.provider_name,
                input_tokens=response.usage.prompt_tokens,
                output_tokens=response.usage.completion_tokens,
                total_tokens=response.usage.total_tokens,
                processing_time=processing_time,
                raw_response=response,
            )
            
            # Add message to result for chat_completion
            result.message = message
            
            # Log success
            self.logger.success(
                "OpenAI completion successful",
                emoji_key="success",
                model=model,
                tokens={
                    "input": result.input_tokens,
                    "output": result.output_tokens
                },
                cost=result.cost,
                time=result.processing_time
            )
            
            return result
            
        except Exception as e:
            self.logger.error(
                f"OpenAI completion failed: {str(e)}",
                emoji_key="error",
                model=model
            )
            raise
            
    async def generate_completion_stream(
        self,
        prompt: Optional[str] = None,
        model: Optional[str] = None,
        max_tokens: Optional[int] = None,
        temperature: float = 0.7,
        **kwargs
    ) -> AsyncGenerator[Tuple[str, Dict[str, Any]], None]:
        """Generate a streaming completion using OpenAI.
        
        Args:
            prompt: Text prompt to send to the model
            model: Model name to use (e.g., "gpt-4o")
            max_tokens: Maximum tokens to generate
            temperature: Temperature parameter (0.0-1.0)
            **kwargs: Additional model-specific parameters
            
        Yields:
            Tuple of (text_chunk, metadata)
            
        Raises:
            Exception: If API call fails
        """
        if not self.client:
            await self.initialize()
            
        # Use default model if not specified
        model = model or self.get_default_model()
        
        # Strip provider prefix if present (e.g., "openai/gpt-4o" -> "gpt-4o")
        if model.startswith(f"{self.provider_name}/"):
            original_model = model
            model = model.split("/", 1)[1]
            self.logger.debug(f"Stripped provider prefix from model name (stream): {original_model} -> {model}")
        
        # Handle case when messages are provided instead of prompt (for chat_completion)
        messages = kwargs.pop("messages", None)
        
        # If neither prompt nor messages are provided, raise an error
        if prompt is None and not messages:
            raise ValueError("Either 'prompt' or 'messages' must be provided")
            
        # Create messages if not already provided
        if not messages:
            messages = [{"role": "user", "content": prompt}]
        
        # Prepare API call parameters
        params = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            "stream": True,
        }
        
        # Add max_tokens if specified
        if max_tokens is not None:
            params["max_tokens"] = max_tokens
            
        # Check for json_mode flag and remove it from kwargs
        json_mode = kwargs.pop("json_mode", False)
        if json_mode:
            # Use the correct response_format for JSON mode
            params["response_format"] = {"type": "json_object"}
            self.logger.debug("Setting response_format to JSON mode for OpenAI streaming")

        # Add any remaining additional parameters
        params.update(kwargs)
        
        # Log request
        prompt_length = len(prompt) if prompt else sum(len(m.get("content", "")) for m in messages)
        self.logger.info(
            f"Generating streaming completion with OpenAI model {model}",
            emoji_key=self.provider_name,
            prompt_length=prompt_length,
            json_mode=json_mode # Log if json_mode was requested
        )
        
        start_time = time.time()
        total_chunks = 0
        
        try:
            # Make streaming API call
            stream = await self.client.chat.completions.create(**params)
            
            # Process the stream
            async for chunk in stream:
                total_chunks += 1
                
                # Extract content from the chunk
                delta = chunk.choices[0].delta
                content = delta.content or ""
                
                # Metadata for this chunk
                metadata = {
                    "model": model,
                    "provider": self.provider_name,
                    "chunk_index": total_chunks,
                    "finish_reason": chunk.choices[0].finish_reason,
                }
                
                yield content, metadata
                
            # Log success
            processing_time = time.time() - start_time
            self.logger.success(
                "OpenAI streaming completion successful",
                emoji_key="success",
                model=model,
                chunks=total_chunks,
                time=processing_time
            )
            
        except Exception as e:
            self.logger.error(
                f"OpenAI streaming completion failed: {str(e)}",
                emoji_key="error",
                model=model
            )
            raise
            
    async def list_models(self) -> List[Dict[str, Any]]:
        """
        List available OpenAI models with their capabilities and metadata.
        
        This method queries the OpenAI API to retrieve a comprehensive list of available
        models accessible to the current API key. It filters the results to focus on
        GPT models that are relevant to text generation tasks, excluding embeddings,
        moderation, and other specialized models.
        
        For efficiency, the method uses a caching mechanism that stores the model list
        after the first successful API call. Subsequent calls return the cached results
        without making additional API requests. This reduces latency and API usage while
        ensuring the available models information is readily accessible.
        
        If the API call fails (due to network issues, invalid credentials, etc.), the
        method falls back to returning a hardcoded list of common OpenAI models to ensure
        the application can continue functioning with reasonable defaults.
        
        Returns:
            A list of dictionaries containing model information with these fields:
            - id: The model identifier used when making API calls (e.g., "gpt-4o")
            - provider: Always "openai" for this provider
            - created: Timestamp of when the model was created (if available from API)
            - owned_by: Organization that owns the model (e.g., "openai", "system")
            
            The fallback model list (used on API errors) includes basic information
            for gpt-4o and gpt-4.1-mini.
            
        Example response:
            ```python
            [
                {
                    "id": "gpt-4o",
                    "provider": "openai",
                    "created": 1693399330,
                    "owned_by": "openai"
                },
                {
                    "id": "gpt-4.1-mini",
                    "provider": "openai", 
                    "created": 1705006269,
                    "owned_by": "openai"
                }
            ]
            ```
            
        Note:
            The specific models returned depend on the API key's permissions and
            the models currently offered by OpenAI. As new models are released
            or existing ones deprecated, the list will change accordingly.
        """
        if self.models_cache:
            return self.models_cache
            
        try:
            if not self.client:
                await self.initialize()
                
            # Fetch models from API
            response = await self.client.models.list()
            
            # Process response
            models = []
            for model in response.data:
                # Filter to relevant models (chat-capable GPT models)
                if model.id.startswith("gpt-"):
                    models.append({
                        "id": model.id,
                        "provider": self.provider_name,
                        "created": model.created,
                        "owned_by": model.owned_by,
                    })
            
            # Cache results
            self.models_cache = models
            
            return models
            
        except Exception as e:
            self.logger.error(
                f"Failed to list OpenAI models: {str(e)}",
                emoji_key="error"
            )
            
            # Return basic models on error
            return [
                {
                    "id": "gpt-4o",
                    "provider": self.provider_name,
                    "description": "Most capable GPT-4 model",
                },
                {
                    "id": "gpt-4.1-mini",
                    "provider": self.provider_name,
                    "description": "Smaller, fast, and cost-effective GPT-4.1 model",
                },
            ]
            
    def get_default_model(self) -> str:
        """
        Get the default OpenAI model identifier to use when none is specified.
        
        This method determines the appropriate default model for OpenAI completions
        through a prioritized selection process:
        
        1. First, it attempts to load the default_model setting from the Ultimate MCP Server
           configuration system (from providers.openai.default_model in the config)
        2. If that's not available or valid, it falls back to a hardcoded default model
           that represents a reasonable balance of capability, cost, and availability
        
        Using the configuration system allows for flexible deployment-specific defaults
        without code changes, while the hardcoded fallback ensures the system remains
        functional even with minimal configuration.
        
        Returns:
            String identifier of the default OpenAI model to use (e.g., "gpt-4.1-mini").
            This identifier can be directly used in API calls to the OpenAI API.
            
        Note:
            The current hardcoded default is "gpt-4.1-mini", chosen for its balance of
            capability and cost. This may change in future versions as new models are
            released or existing ones are deprecated.
        """
        from ultimate_mcp_server.config import get_config
        
        # Safely get from config if available
        try:
            config = get_config()
            provider_config = getattr(config, 'providers', {}).get(self.provider_name, None)
            if provider_config and provider_config.default_model:
                return provider_config.default_model
        except (AttributeError, TypeError):
            # Handle case when providers attribute doesn't exist or isn't a dict
            pass
            
        # Otherwise return hard-coded default
        return "gpt-4.1-mini"
        
    async def check_api_key(self) -> bool:
        """Check if the OpenAI API key is valid.
        
        This method performs a lightweight validation of the configured OpenAI API key
        by attempting to list available models. A successful API call confirms that:
        
        1. The API key is properly formatted and not empty
        2. The key has at least read permissions on the OpenAI API
        3. The API endpoint is accessible and responding
        4. The account associated with the key is active and not suspended
        
        This validation is useful when initializing the provider to ensure the API key
        works before attempting to make model completion requests that might fail later.
        
        Returns:
            bool: True if the API key is valid and the API is accessible, False otherwise.
            A False result may indicate an invalid key, network issues, or API service disruption.
            
        Notes:
            - This method simply calls list_models() which caches results for efficiency
            - No detailed error information is returned, only a boolean success indicator
            - The method silently catches all exceptions and returns False rather than raising
            - For debugging key issues, check server logs for the full exception details
        """
        try:
            # Just list models as a simple validation
            await self.list_models()
            return True
        except Exception:
            return False
```
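
For orientation, a hedged sketch of exercising `OpenAIProvider` directly outside the server. It assumes `OPENAI_API_KEY` is set in the environment; inside the gateway, providers are normally obtained through the provider layer rather than constructed by hand.

```python
import asyncio
import os

from ultimate_mcp_server.core.providers.openai import OpenAIProvider


async def demo_openai_provider() -> None:
    provider = OpenAIProvider(api_key=os.environ["OPENAI_API_KEY"])
    if not await provider.initialize():
        raise RuntimeError("OpenAI provider failed to initialize")

    # One-shot completion using the provider's default model.
    result = await provider.generate_completion(
        prompt="Summarize the benefits of streaming APIs in one sentence.",
        max_tokens=60,
        temperature=0.3,
    )
    print(result.text)
    print(f"tokens: {result.input_tokens} in / {result.output_tokens} out, cost: {result.cost}")

    # Streaming variant: chunks arrive as (text, metadata) tuples.
    async for chunk, _meta in provider.generate_completion_stream(
        prompt="Count from 1 to 5.",
        max_tokens=30,
    ):
        print(chunk, end="", flush=True)
    print()


if __name__ == "__main__":
    asyncio.run(demo_openai_provider())
```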

--------------------------------------------------------------------------------
/ultimate_mcp_server/services/knowledge_base/rag_engine.py:
--------------------------------------------------------------------------------

```python
"""RAG engine for retrieval-augmented generation."""
import time
from typing import Any, Dict, List, Optional, Set

from ultimate_mcp_server.core.models.requests import CompletionRequest
from ultimate_mcp_server.services.cache import get_cache_service
from ultimate_mcp_server.services.knowledge_base.feedback import get_rag_feedback_service
from ultimate_mcp_server.services.knowledge_base.retriever import KnowledgeBaseRetriever
from ultimate_mcp_server.services.knowledge_base.utils import (
    extract_keywords,
    generate_token_estimate,
)
from ultimate_mcp_server.services.prompts import get_prompt_service
from ultimate_mcp_server.utils import get_logger

logger = get_logger(__name__)


# Default RAG prompt templates
DEFAULT_RAG_TEMPLATES = {
    "rag_default": """Answer the question based only on the following context:

{context}

Question: {query}

Answer:""",
    
    "rag_with_sources": """Answer the question based only on the following context:

{context}

Question: {query}

Provide your answer along with the source document IDs in [brackets] for each piece of information:""",

    "rag_summarize": """Summarize the following context information:

{context}

Summary:""",

    "rag_analysis": """Analyze the following information and provide key insights:

{context}

Query: {query}

Analysis:"""
}


class RAGEngine:
    """Engine for retrieval-augmented generation."""
    
    def __init__(
        self, 
        retriever: KnowledgeBaseRetriever,
        provider_manager,
        optimization_service=None,
        analytics_service=None
    ):
        """Initialize the RAG engine.
        
        Args:
            retriever: Knowledge base retriever
            provider_manager: Provider manager for LLM access
            optimization_service: Optional optimization service for model selection
            analytics_service: Optional analytics service for tracking
        """
        self.retriever = retriever
        self.provider_manager = provider_manager
        self.optimization_service = optimization_service
        self.analytics_service = analytics_service
        
        # Initialize prompt service
        self.prompt_service = get_prompt_service()
        
        # Initialize feedback service
        self.feedback_service = get_rag_feedback_service()
        
        # Initialize cache service
        self.cache_service = get_cache_service()
        
        # Register RAG templates
        for template_name, template_text in DEFAULT_RAG_TEMPLATES.items():
            self.prompt_service.register_template(template_name, template_text)
        
        logger.info("RAG engine initialized", extra={"emoji_key": "success"})
    
    async def _select_optimal_model(self, task_info: Dict[str, Any]) -> Dict[str, Any]:
        """Select optimal model for a RAG task.
        
        Args:
            task_info: Task information
            
        Returns:
            Model selection
        """
        if self.optimization_service:
            try:
                return await self.optimization_service.get_optimal_model(task_info)
            except Exception as e:
                logger.error(
                    f"Error selecting optimal model: {str(e)}", 
                    extra={"emoji_key": "error"}
                )
        
        # Fallback to default models for RAG
        return {
            "provider": "openai",
            "model": "gpt-4.1-mini"
        }
    
    async def _track_rag_metrics(
        self,
        knowledge_base: str,
        query: str,
        provider: str,
        model: str,
        metrics: Dict[str, Any]
    ) -> None:
        """Track RAG operation metrics.
        
        Args:
            knowledge_base: Knowledge base name
            query: Query text
            provider: Provider name
            model: Model name
            metrics: Operation metrics
        """
        if not self.analytics_service:
            return
            
        try:
            await self.analytics_service.track_operation(
                operation_type="rag",
                provider=provider,
                model=model,
                input_tokens=metrics.get("input_tokens", 0),
                output_tokens=metrics.get("output_tokens", 0),
                total_tokens=metrics.get("total_tokens", 0),
                cost=metrics.get("cost", 0.0),
                duration=metrics.get("total_time", 0.0),
                metadata={
                    "knowledge_base": knowledge_base,
                    "query": query,
                    "retrieval_count": metrics.get("retrieval_count", 0),
                    "retrieval_time": metrics.get("retrieval_time", 0.0),
                    "generation_time": metrics.get("generation_time", 0.0)
                }
            )
        except Exception as e:
            logger.error(
                f"Error tracking RAG metrics: {str(e)}", 
                extra={"emoji_key": "error"}
            )
    
    def _format_context(
        self, 
        results: List[Dict[str, Any]],
        include_metadata: bool = True
    ) -> str:
        """Format retrieval results into context.
        
        Args:
            results: List of retrieval results
            include_metadata: Whether to include metadata
            
        Returns:
            Formatted context
        """
        context_parts = []
        
        for i, result in enumerate(results):
            # Format metadata if included
            metadata_str = ""
            if include_metadata and result.get("metadata"):
                # Extract relevant metadata fields
                metadata_fields = []
                for key in ["title", "source", "author", "date", "source_id", "potential_title"]:
                    if key in result["metadata"]:
                        metadata_fields.append(f"{key}: {result['metadata'][key]}")
                
                if metadata_fields:
                    metadata_str = " | ".join(metadata_fields)
                    metadata_str = f"[{metadata_str}]\n"
            
            # Add document with index
            context_parts.append(f"Document {i+1} [ID: {result['id']}]:\n{metadata_str}{result['document']}")
        
        return "\n\n".join(context_parts)
    
    async def _adjust_retrieval_params(self, query: str, knowledge_base_name: str) -> Dict[str, Any]:
        """Dynamically adjust retrieval parameters based on query complexity.
        
        Args:
            query: Query text
            knowledge_base_name: Knowledge base name
            
        Returns:
            Adjusted parameters
        """
        # Analyze query complexity
        query_length = len(query.split())
        query_keywords = extract_keywords(query)
        
        # Base parameters
        params = {
            "top_k": 5,
            "retrieval_method": "vector",
            "min_score": 0.6,
            "search_params": {"search_ef": 100}
        }
        
        # Adjust based on query length
        if query_length > 30:  # Complex query
            params["top_k"] = 8
            params["search_params"]["search_ef"] = 200
            params["retrieval_method"] = "hybrid"
        elif query_length < 5:  # Very short query
            params["top_k"] = 10  # Get more results for short queries
            params["min_score"] = 0.5  # Lower threshold
        
        # Check if similar queries exist
        similar_queries = await self.feedback_service.get_similar_queries(
            knowledge_base_name=knowledge_base_name,
            query=query,
            top_k=1,
            threshold=0.85
        )
        
        # If we have similar past queries, use their parameters
        if similar_queries:
            params["retrieval_method"] = "hybrid"  # Hybrid works well for repeat queries
        
        # Add keywords
        params["additional_keywords"] = query_keywords
        
        return params
    
    async def _analyze_used_documents(
        self, 
        answer: str, 
        results: List[Dict[str, Any]]
    ) -> Set[str]:
        """Analyze which documents were used in the answer.
        
        Args:
            answer: Generated answer
            results: List of retrieval results
            
        Returns:
            Set of document IDs used in the answer
        """
        used_ids = set()
        
        # Check for explicit mentions of document IDs
        for result in results:
            doc_id = result["id"]
            if f"[ID: {doc_id}]" in answer or f"[{doc_id}]" in answer:
                used_ids.add(doc_id)
        
        # Check content overlap (crude approximation)
        for result in results:
            if result["id"] in used_ids:
                continue
                
            # Check for significant phrases from document in answer
            doc_keywords = extract_keywords(result["document"], max_keywords=5)
            matched_keywords = sum(1 for kw in doc_keywords if kw in answer.lower())
            
            # If multiple keywords match, consider document used
            if matched_keywords >= 2:
                used_ids.add(result["id"])
        
        return used_ids
    
    async def _check_cached_response(
        self,
        knowledge_base_name: str,
        query: str
    ) -> Optional[Dict[str, Any]]:
        """Check for cached RAG response.
        
        Args:
            knowledge_base_name: Knowledge base name
            query: Query text
            
        Returns:
            Cached response or None
        """
        if not self.cache_service:
            return None
            
        cache_key = f"rag_{knowledge_base_name}_{query}"
        
        try:
            cached = await self.cache_service.get(cache_key)
            if cached:
                logger.info(
                    f"Using cached RAG response for query in '{knowledge_base_name}'",
                    extra={"emoji_key": "cache"}
                )
                return cached
        except Exception as e:
            logger.error(
                f"Error checking cache: {str(e)}",
                extra={"emoji_key": "error"}
            )
            
        return None
    
    async def _cache_response(
        self,
        knowledge_base_name: str,
        query: str,
        response: Dict[str, Any]
    ) -> None:
        """Cache RAG response.
        
        Args:
            knowledge_base_name: Knowledge base name
            query: Query text
            response: Response to cache
        """
        if not self.cache_service:
            return
            
        cache_key = f"rag_{knowledge_base_name}_{query}"
        
        try:
            # Cache for 1 day
            await self.cache_service.set(cache_key, response, ttl=86400)
        except Exception as e:
            logger.error(
                f"Error caching response: {str(e)}",
                extra={"emoji_key": "error"}
            )
    
    async def generate_with_rag(
        self,
        knowledge_base_name: str,
        query: str,
        provider: Optional[str] = None,
        model: Optional[str] = None,
        template: str = "rag_default",
        max_tokens: int = 1000,
        temperature: float = 0.3,
        top_k: Optional[int] = None,
        retrieval_method: Optional[str] = None,
        min_score: Optional[float] = None,
        metadata_filter: Optional[Dict[str, Any]] = None,
        include_metadata: bool = True,
        include_sources: bool = True,
        use_cache: bool = True,
        apply_feedback: bool = True,
        search_params: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Generate a response using RAG.
        
        Args:
            knowledge_base_name: Knowledge base name
            query: Query text
            provider: Provider name (auto-selected if None)
            model: Model name (auto-selected if None)
            template: RAG prompt template name
            max_tokens: Maximum tokens for generation
            temperature: Temperature for generation
            top_k: Number of documents to retrieve (auto-adjusted if None)
            retrieval_method: Retrieval method (vector, hybrid)
            min_score: Minimum similarity score
            metadata_filter: Optional metadata filter
            include_metadata: Whether to include metadata in context
            include_sources: Whether to include sources in response
            use_cache: Whether to use cached responses
            apply_feedback: Whether to apply feedback adjustments
            search_params: Optional ChromaDB search parameters
            
        Returns:
            Generated response with sources and metrics
        """
        start_time = time.time()
        operation_metrics = {}
        
        # Check cache first if enabled
        if use_cache:
            cached_response = await self._check_cached_response(knowledge_base_name, query)
            if cached_response:
                return cached_response
        
        # Auto-select model if not specified
        if not provider or not model:
            # Determine task complexity based on query
            task_complexity = "medium"
            if len(query) > 100:
                task_complexity = "high"
            elif len(query) < 30:
                task_complexity = "low"
                
            # Get optimal model
            model_selection = await self._select_optimal_model({
                "task_type": "rag_completion",
                "complexity": task_complexity,
                "query_length": len(query)
            })
            
            provider = provider or model_selection["provider"]
            model = model or model_selection["model"]
        
        # Dynamically adjust retrieval parameters if not specified
        if top_k is None or retrieval_method is None or min_score is None:
            adjusted_params = await self._adjust_retrieval_params(query, knowledge_base_name)
            
            # Use specified parameters or adjusted ones
            top_k = top_k or adjusted_params["top_k"]
            retrieval_method = retrieval_method or adjusted_params["retrieval_method"]
            min_score = min_score or adjusted_params["min_score"]
            search_params = search_params or adjusted_params.get("search_params")
            additional_keywords = adjusted_params.get("additional_keywords")
        else:
            additional_keywords = None
        
        # Retrieve context
        retrieval_start = time.time()
        
        if retrieval_method == "hybrid":
            # Use hybrid search
            retrieval_result = await self.retriever.retrieve_hybrid(
                knowledge_base_name=knowledge_base_name,
                query=query,
                top_k=top_k,
                min_score=min_score,
                metadata_filter=metadata_filter,
                additional_keywords=additional_keywords,
                apply_feedback=apply_feedback,
                search_params=search_params
            )
        else:
            # Use standard vector search
            retrieval_result = await self.retriever.retrieve(
                knowledge_base_name=knowledge_base_name,
                query=query,
                top_k=top_k,
                min_score=min_score,
                metadata_filter=metadata_filter,
                content_filter=None,  # No content filter for vector-only search
                apply_feedback=apply_feedback,
                search_params=search_params
            )
        
        retrieval_time = time.time() - retrieval_start
        operation_metrics["retrieval_time"] = retrieval_time
        
        # Check if retrieval was successful
        if retrieval_result.get("status") != "success" or not retrieval_result.get("results"):
            logger.warning(
                f"No relevant documents found for query in knowledge base '{knowledge_base_name}'", 
                extra={"emoji_key": "warning"}
            )
            
            # Return error response
            error_response = {
                "status": "no_results",
                "message": "No relevant documents found for query",
                "query": query,
                "retrieval_time": retrieval_time,
                "total_time": time.time() - start_time
            }
            
            # Cache error response if enabled
            if use_cache:
                await self._cache_response(knowledge_base_name, query, error_response)
            
            return error_response
        
        # Format context from retrieval results
        context = self._format_context(
            retrieval_result["results"],
            include_metadata=include_metadata
        )
        
        # Get prompt template
        template_text = self.prompt_service.get_template(template)
        if not template_text:
            # Fallback to default template
            template_text = DEFAULT_RAG_TEMPLATES["rag_default"]
        
        # Format prompt with template
        rag_prompt = template_text.format(
            context=context,
            query=query
        )
        
        # Calculate token estimates
        input_tokens = generate_token_estimate(rag_prompt)
        operation_metrics["context_tokens"] = generate_token_estimate(context)
        operation_metrics["input_tokens"] = input_tokens
        operation_metrics["retrieval_count"] = len(retrieval_result["results"])
        
        # Generate completion
        generation_start = time.time()
        
        provider_service = self.provider_manager.get_provider(provider)
        completion_request = CompletionRequest(
            prompt=rag_prompt,
            model=model,
            max_tokens=max_tokens,
            temperature=temperature
        )
        
        completion_result = await provider_service.generate_completion(
            request=completion_request
        )
        
        generation_time = time.time() - generation_start
        operation_metrics["generation_time"] = generation_time
        
        # Extract completion and metrics
        completion = completion_result.get("completion", "")
        operation_metrics["output_tokens"] = completion_result.get("output_tokens", 0)
        operation_metrics["total_tokens"] = completion_result.get("total_tokens", 0)
        operation_metrics["cost"] = completion_result.get("cost", 0.0)
        operation_metrics["total_time"] = time.time() - start_time
        
        # Prepare sources if requested
        sources = []
        if include_sources:
            for result in retrieval_result["results"]:
                # Include limited context for each source
                doc_preview = result["document"]
                if len(doc_preview) > 100:
                    doc_preview = doc_preview[:100] + "..."
                    
                sources.append({
                    "id": result["id"],
                    "document": doc_preview,
                    "score": result["score"],
                    "metadata": result.get("metadata", {})
                })
        
        # Analyze which documents were used in the answer
        used_doc_ids = await self._analyze_used_documents(completion, retrieval_result["results"])
        
        # Record feedback
        if apply_feedback:
            await self.retriever.record_feedback(
                knowledge_base_name=knowledge_base_name,
                query=query,
                retrieved_documents=retrieval_result["results"],
                used_document_ids=list(used_doc_ids)
            )
        
        # Track metrics
        await self._track_rag_metrics(
            knowledge_base=knowledge_base_name,
            query=query,
            provider=provider,
            model=model,
            metrics=operation_metrics
        )
        
        logger.info(
            f"Generated RAG response using {provider}/{model} in {operation_metrics['total_time']:.2f}s", 
            extra={"emoji_key": "success"}
        )
        
        # Create response
        response = {
            "status": "success",
            "query": query,
            "answer": completion,
            "sources": sources,
            "knowledge_base": knowledge_base_name,
            "provider": provider,
            "model": model,
            "used_document_ids": list(used_doc_ids),
            "metrics": operation_metrics
        }
        
        # Cache response if enabled
        if use_cache:
            await self._cache_response(knowledge_base_name, query, response)
        
        return response 
```
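
A hedged sketch of driving `RAGEngine.generate_with_rag`. The knowledge base name and query are illustrative, and constructing the retriever and provider manager is left to the caller, since their factories are not shown in this file.

```python
from ultimate_mcp_server.services.knowledge_base.rag_engine import RAGEngine


async def answer_from_knowledge_base(retriever, provider_manager) -> None:
    # retriever: a KnowledgeBaseRetriever instance; provider_manager: the gateway's
    # provider manager. Both are assumed to be built by the surrounding application.
    engine = RAGEngine(retriever=retriever, provider_manager=provider_manager)

    response = await engine.generate_with_rag(
        knowledge_base_name="company_docs",
        query="What were the key findings of the Q3 report?",
        template="rag_with_sources",
        include_sources=True,
    )

    if response["status"] == "success":
        print(response["answer"])
        for source in response["sources"]:
            print(f'- {source["id"]} (score={source["score"]:.2f})')
    else:
        # e.g. "No relevant documents found for query"
        print(response["message"])


# Run with: asyncio.run(answer_from_knowledge_base(retriever, provider_manager))
# once both services have been constructed.
```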
Page 5/35