This is page 13 of 35. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?page={x} to view the full context.

# Directory Structure

```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│   ├── __init__.py
│   ├── advanced_agent_flows_using_unified_memory_system_demo.py
│   ├── advanced_extraction_demo.py
│   ├── advanced_unified_memory_system_demo.py
│   ├── advanced_vector_search_demo.py
│   ├── analytics_reporting_demo.py
│   ├── audio_transcription_demo.py
│   ├── basic_completion_demo.py
│   ├── cache_demo.py
│   ├── claude_integration_demo.py
│   ├── compare_synthesize_demo.py
│   ├── cost_optimization.py
│   ├── data
│   │   ├── sample_event.txt
│   │   ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│   │   └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│   ├── docstring_refiner_demo.py
│   ├── document_conversion_and_processing_demo.py
│   ├── entity_relation_graph_demo.py
│   ├── filesystem_operations_demo.py
│   ├── grok_integration_demo.py
│   ├── local_text_tools_demo.py
│   ├── marqo_fused_search_demo.py
│   ├── measure_model_speeds.py
│   ├── meta_api_demo.py
│   ├── multi_provider_demo.py
│   ├── ollama_integration_demo.py
│   ├── prompt_templates_demo.py
│   ├── python_sandbox_demo.py
│   ├── rag_example.py
│   ├── research_workflow_demo.py
│   ├── sample
│   │   ├── article.txt
│   │   ├── backprop_paper.pdf
│   │   ├── buffett.pdf
│   │   ├── contract_link.txt
│   │   ├── legal_contract.txt
│   │   ├── medical_case.txt
│   │   ├── northwind.db
│   │   ├── research_paper.txt
│   │   ├── sample_data.json
│   │   └── text_classification_samples
│   │       ├── email_classification.txt
│   │       ├── news_samples.txt
│   │       ├── product_reviews.txt
│   │       └── support_tickets.txt
│   ├── sample_docs
│   │   └── downloaded
│   │       └── attention_is_all_you_need.pdf
│   ├── sentiment_analysis_demo.py
│   ├── simple_completion_demo.py
│   ├── single_shot_synthesis_demo.py
│   ├── smart_browser_demo.py
│   ├── sql_database_demo.py
│   ├── sse_client_demo.py
│   ├── test_code_extraction.py
│   ├── test_content_detection.py
│   ├── test_ollama.py
│   ├── text_classification_demo.py
│   ├── text_redline_demo.py
│   ├── tool_composition_examples.py
│   ├── tournament_code_demo.py
│   ├── tournament_text_demo.py
│   ├── unified_memory_system_demo.py
│   ├── vector_search_demo.py
│   ├── web_automation_instruction_packs.py
│   └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│   └── smart_browser_internal
│       ├── locator_cache.db
│       ├── readability.js
│       └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── integration
│   │   ├── __init__.py
│   │   └── test_server.py
│   ├── manual
│   │   ├── test_extraction_advanced.py
│   │   └── test_extraction.py
│   └── unit
│       ├── __init__.py
│       ├── test_cache.py
│       ├── test_providers.py
│       └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│   ├── __init__.py
│   ├── __main__.py
│   ├── cli
│   │   ├── __init__.py
│   │   ├── __main__.py
│   │   ├── commands.py
│   │   ├── helpers.py
│   │   └── typer_cli.py
│   ├── clients
│   │   ├── __init__.py
│   │   ├── completion_client.py
│   │   └── rag_client.py
│   ├── config
│   │   └── examples
│   │       └── filesystem_config.yaml
│   ├── config.py
│   ├── constants.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── evaluation
│   │   │   ├── base.py
│   │   │   └── evaluators.py
│   │   ├── providers
│   │   │   ├── __init__.py
│   │   │   ├── anthropic.py
│   │   │   ├── base.py
│   │   │   ├── deepseek.py
│   │   │   ├── gemini.py
│   │   │   ├── grok.py
│   │   │   ├── ollama.py
│   │   │   ├── openai.py
│   │   │   └── openrouter.py
│   │   ├── server.py
│   │   ├── state_store.py
│   │   ├── tournaments
│   │   │   ├── manager.py
│   │   │   ├── tasks.py
│   │   │   └── utils.py
│   │   └── ums_api
│   │       ├── __init__.py
│   │       ├── ums_database.py
│   │       ├── ums_endpoints.py
│   │       ├── ums_models.py
│   │       └── ums_services.py
│   ├── exceptions.py
│   ├── graceful_shutdown.py
│   ├── services
│   │   ├── __init__.py
│   │   ├── analytics
│   │   │   ├── __init__.py
│   │   │   ├── metrics.py
│   │   │   └── reporting.py
│   │   ├── cache
│   │   │   ├── __init__.py
│   │   │   ├── cache_service.py
│   │   │   ├── persistence.py
│   │   │   ├── strategies.py
│   │   │   └── utils.py
│   │   ├── cache.py
│   │   ├── document.py
│   │   ├── knowledge_base
│   │   │   ├── __init__.py
│   │   │   ├── feedback.py
│   │   │   ├── manager.py
│   │   │   ├── rag_engine.py
│   │   │   ├── retriever.py
│   │   │   └── utils.py
│   │   ├── prompts
│   │   │   ├── __init__.py
│   │   │   ├── repository.py
│   │   │   └── templates.py
│   │   ├── prompts.py
│   │   └── vector
│   │       ├── __init__.py
│   │       ├── embeddings.py
│   │       └── vector_service.py
│   ├── tool_token_counter.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── audio_transcription.py
│   │   ├── base.py
│   │   ├── completion.py
│   │   ├── docstring_refiner.py
│   │   ├── document_conversion_and_processing.py
│   │   ├── enhanced-ums-lookbook.html
│   │   ├── entity_relation_graph.py
│   │   ├── excel_spreadsheet_automation.py
│   │   ├── extraction.py
│   │   ├── filesystem.py
│   │   ├── html_to_markdown.py
│   │   ├── local_text_tools.py
│   │   ├── marqo_fused_search.py
│   │   ├── meta_api_tool.py
│   │   ├── ocr_tools.py
│   │   ├── optimization.py
│   │   ├── provider.py
│   │   ├── pyodide_boot_template.html
│   │   ├── python_sandbox.py
│   │   ├── rag.py
│   │   ├── redline-compiled.css
│   │   ├── sentiment_analysis.py
│   │   ├── single_shot_synthesis.py
│   │   ├── smart_browser.py
│   │   ├── sql_databases.py
│   │   ├── text_classification.py
│   │   ├── text_redline_tools.py
│   │   ├── tournament.py
│   │   ├── ums_explorer.html
│   │   └── unified_memory_system.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── async_utils.py
│   │   ├── display.py
│   │   ├── logging
│   │   │   ├── __init__.py
│   │   │   ├── console.py
│   │   │   ├── emojis.py
│   │   │   ├── formatter.py
│   │   │   ├── logger.py
│   │   │   ├── panels.py
│   │   │   ├── progress.py
│   │   │   └── themes.py
│   │   ├── parse_yaml.py
│   │   ├── parsing.py
│   │   ├── security.py
│   │   └── text.py
│   └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/examples/tournament_text_demo.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Tournament Text Demo - Demonstrates running a text improvement tournament

This script shows how to:
1. Create a tournament with multiple models focused on text refinement
2. Track progress across multiple rounds
3. Retrieve and analyze the improved essay/text

The tournament task is to refine and improve a comparative essay on
transformer vs. diffusion model architectures, demonstrating how
the tournament system can be used for general text refinement tasks.

Usage:
  python examples/tournament_text_demo.py [--topic TOPIC] [--custom-topic TEXT]

Options:
  --topic TOPIC          Predefined essay topic key, or "custom" (default: transformer_vs_diffusion)
  --custom-topic TEXT    Custom essay topic description (used with --topic=custom)
"""

import argparse
import asyncio
import json
import os
import re
import sys
from collections import namedtuple
from pathlib import Path
from typing import Any, Dict, List, Optional

# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))

from rich import box
from rich.markup import escape
from rich.panel import Panel
from rich.rule import Rule
from rich.table import Table

from ultimate_mcp_server.core.models.requests import CompletionRequest
from ultimate_mcp_server.core.providers.base import get_provider
from ultimate_mcp_server.core.server import Gateway
from ultimate_mcp_server.services.prompts import PromptTemplate
from ultimate_mcp_server.tools.tournament import (
    create_tournament,
    get_tournament_results,
    get_tournament_status,
)
from ultimate_mcp_server.utils import get_logger, process_mcp_result
from ultimate_mcp_server.utils.display import (
    CostTracker,
    display_tournament_results,
    display_tournament_status,
)
from ultimate_mcp_server.utils.logging.console import console

DEFAULT_MODEL_CONFIGS_TEXT: List[Dict[str, Any]] = [
    {
        "model_id": "openai/gpt-4o-mini",
        "diversity_count": 1,
        "temperature": 0.75,
    },
    {
        "model_id": "anthropic/claude-3-5-haiku-20241022",
        "diversity_count": 1,
        "temperature": 0.7,
    },
]
DEFAULT_NUM_ROUNDS_TEXT = 2
DEFAULT_TOURNAMENT_NAME_TEXT = "Advanced Text Refinement Tournament"

def parse_arguments_text():
    parser = argparse.ArgumentParser(description="Run a text refinement tournament demo")
    parser.add_argument(
        "--topic", type=str, default="transformer_vs_diffusion",
        choices=list(TOPICS.keys()) + ["custom"],
        help="Essay topic (default: transformer_vs_diffusion)"
    )
    parser.add_argument(
        "--custom-topic", type=str,
        help="Custom essay topic (used when --topic=custom)"
    )
    parser.add_argument(
        "--rounds", type=int, default=DEFAULT_NUM_ROUNDS_TEXT,
        help=f"Number of tournament rounds (default: {DEFAULT_NUM_ROUNDS_TEXT})"
    )
    parser.add_argument(
        "--models", type=str, nargs="+",
        default=[mc["model_id"] for mc in DEFAULT_MODEL_CONFIGS_TEXT],
        help="List of model IDs to participate."
    )
    return parser.parse_args()


# Initialize logger using get_logger
logger = get_logger("example.tournament_text")

# Lightweight result structure for cost tracking (token counts may be unavailable)
TrackableResult = namedtuple("TrackableResult", ["cost", "input_tokens", "output_tokens", "provider", "model", "processing_time"])

# Initialize global gateway
gateway: Optional[Gateway] = None

# --- Configuration ---
# Adjust model IDs based on your configured providers
MODEL_IDS = [
    "openai:gpt-4.1-mini",
    "deepseek:deepseek-chat",
    "gemini:gemini-2.5-pro-preview-03-25"
]
NUM_ROUNDS = 2  # Kept low so the demo runs quickly
TOURNAMENT_NAME = "Text Refinement Tournament Demo"

# The generic essay prompt template
TEMPLATE_TEXT = """
# GENERIC TEXT TOURNAMENT PROMPT TEMPLATE

Please write a high-quality, comprehensive {{content_type}} on the topic of: "{{topic}}".

{{context}}

Your {{content_type}} should thoroughly explore the following sections and subtopics:
{% for section in sections %}
## {{section.title}}
{% for subtopic in section.subtopics %}
- {{subtopic}}
{% endfor %}
{% endfor %}

Adhere to the following style and content requirements:
{{style_requirements}}

Please provide only the {{content_type}} text. If you have meta-comments or a thinking process,
enclose it in <thinking>...</thinking> tags at the very beginning of your response.
"""

# Define predefined topics
TOPICS = {
    "transformer_vs_diffusion": {
        "content_type": "technical essay",
        "topic": "comparing transformer architecture and diffusion models",
        "context": "Focus on their underlying mechanisms, common applications, strengths, weaknesses, and future potential in AI.",
        "sections": [
            {"title": "Core Principles", "subtopics": ["Transformer self-attention, positional encoding", "Diffusion forward/reverse processes, noise schedules"]},
            {"title": "Applications & Performance", "subtopics": ["Typical tasks for transformers (NLP, vision)", "Typical tasks for diffusion models (image/audio generation)", "Comparative performance benchmarks or known strengths"]},
            {"title": "Limitations & Challenges", "subtopics": ["Computational costs, data requirements", "Interpretability, controllability, known failure modes for each"]},
            {"title": "Future Outlook", "subtopics": ["Potential for hybridization", "Scaling frontiers", "Impact on AGI research"]}
        ],
        "style_requirements": "Write in a clear, objective, and technically precise manner suitable for an audience with a machine learning background. Aim for around 800-1200 words."
    },
    "llm_vs_traditional_ai": {
        "content_type": "comparative analysis",
        "topic": "comparing large language models to traditional AI approaches",
        "context": "The rise of large language models has shifted the AI landscape significantly.",
        "sections": [
            {
                "title": "Fundamental Differences",
                "subtopics": [
                    "How LLMs differ architecturally from traditional ML/AI systems",
                    "Data requirements and training approaches"
                ]
            },
            {
                "title": "Capabilities and Limitations",
                "subtopics": [
                    "Tasks where LLMs excel compared to traditional approaches",
                    "Situations where traditional AI methods remain superior",
                    "Emergent capabilities unique to large language models"
                ]
            },
            {
                "title": "Real-world Applications",
                "subtopics": [
                    "Industries being transformed by LLMs",
                    "Where traditional AI approaches continue to dominate",
                    "Examples of hybrid systems combining both approaches"
                ]
            },
            {
                "title": "Future Outlook",
                "subtopics": [
                    "Projected evolution of both paradigms",
                    "Potential convergence or further divergence",
                    "Research frontiers for each approach"
                ]
            }
        ],
        "style_requirements": "Present a balanced analysis that acknowledges the strengths and weaknesses of both paradigms. Support claims with specific examples where possible."
    }
}

# Create custom topic template
def create_custom_topic_variables(topic_description):
    """Create a simple custom topic with standard sections"""
    return {
        "content_type": "essay",
        "topic": topic_description,
        "context": "",
        "sections": [
            {
                "title": "Background and Key Concepts",
                "subtopics": [
                    "Define and explain the core elements of the topic",
                    "Provide necessary historical or theoretical context"
                ]
            },
            {
                "title": "Analysis of Main Aspects",
                "subtopics": [
                    "Examine the primary dimensions or elements of the topic",
                    "Discuss relationships between different aspects",
                    "Identify patterns or trends relevant to the topic"
                ]
            },
            {
                "title": "Practical Implications",
                "subtopics": [
                    "Real-world applications or impacts",
                    "How this topic affects related fields or domains"
                ]
            },
            {
                "title": "Future Perspectives",
                "subtopics": [
                    "Emerging trends or developments",
                    "Potential challenges and opportunities",
                    "Areas requiring further research or exploration"
                ]
            }
        ],
        "style_requirements": "Present a comprehensive and well-structured analysis with clear reasoning and specific examples where appropriate."
    }
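
# Example (illustrative): create_custom_topic_variables("the ethics of synthetic media")
# yields an "essay" spec with four standard sections, which can be rendered by the
# essay_template defined below.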

# Create the prompt template object
essay_template = PromptTemplate(
    template=TEMPLATE_TEXT,
    template_id="text_tournament_template",
    description="A template for text tournament prompts",
    required_vars=["content_type", "topic", "context", "sections", "style_requirements"]
)
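
# Example (illustrative; mirrors the call in run_tournament_demo below):
#   rendered_prompt = essay_template.render(TOPICS["transformer_vs_diffusion"])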

# --- Helper Functions ---
def parse_result(result):
    """Parse the result from a tool call into a usable dictionary.
    
    Handles various return types from MCP tools.
    """
    try:
        # Handle TextContent object (which has a .text attribute)
        if hasattr(result, 'text'):
            try:
                # Try to parse the text as JSON
                return json.loads(result.text)
            except json.JSONDecodeError:
                # Return the raw text if not JSON
                return {"text": result.text}
                
        # Handle list result
        if isinstance(result, list):
            if result:
                first_item = result[0]
                if hasattr(first_item, 'text'):
                    try:
                        return json.loads(first_item.text)
                    except json.JSONDecodeError:
                        return {"text": first_item.text}
                else:
                    return first_item
            return {}
            
        # Handle dictionary directly
        if isinstance(result, dict):
            return result
            
        # Handle other potential types or return error
        else:
            return {"error": f"Unexpected result type: {type(result)}"}
        
    except Exception as e:
        return {"error": f"Error parsing result: {str(e)}"}


async def setup_gateway():
    """Set up the gateway for demonstration."""
    global gateway
    
    # Create gateway instance
    logger.info("Initializing gateway for demonstration", emoji_key="start")
    gateway = Gateway("text-tournament-demo", register_tools=False)
    
    # Initialize the server with all providers and built-in tools
    await gateway._initialize_providers()
    
    # Manually register tournament tools
    mcp = gateway.mcp
    mcp.tool()(create_tournament)
    mcp.tool()(get_tournament_status)
    mcp.tool()(get_tournament_results)
    logger.info("Manually registered tournament tools.")

    # Verify tools are registered
    tools = await gateway.mcp.list_tools()
    tournament_tools = [t.name for t in tools if 'tournament' in t.name.lower()]
    logger.info(f"Registered tournament tools: {tournament_tools}", emoji_key="info")

    if not tournament_tools:
        logger.warning("No tournament tools found. Make sure tournament plugins are registered.", emoji_key="warning")
    
    logger.success("Gateway initialized", emoji_key="success")


async def poll_tournament_status(tournament_id: str, storage_path: Optional[str] = None, interval: int = 5) -> Optional[str]:
    """Poll the tournament status until it reaches a final state.
    
    Args:
        tournament_id: ID of the tournament to poll
        storage_path: Optional storage path to avoid tournament not found issues
        interval: Time between status checks in seconds
    """
    logger.info(f"Polling status for tournament {tournament_id}...", emoji_key="poll")
    final_states = ["COMPLETED", "FAILED", "CANCELLED"]
    
    # Add direct file polling capability to handle case where tournament manager can't find the tournament
    if storage_path:
        storage_dir = Path(storage_path)
        state_file = storage_dir / "tournament_state.json"
        logger.debug(f"Will check tournament state file directly at: {state_file}")
    
    while True:
        status_input = {"tournament_id": tournament_id}
        status_result = await gateway.mcp.call_tool("get_tournament_status", status_input)
        status_data = await process_mcp_result(status_result)
        
        if "error" in status_data:
            # If tournament manager couldn't find the tournament but we have the storage path,
            # try to read the state file directly (this is a fallback mechanism)
            if storage_path and "not found" in status_data.get("error", "").lower():
                try:
                    logger.debug(f"Attempting to read tournament state directly from: {state_file}")
                    if state_file.exists():
                        with open(state_file, 'r', encoding='utf-8') as f:
                            direct_status_data = json.load(f)
                            status = direct_status_data.get("status")
                            current_round = direct_status_data.get("current_round", 0)
                            total_rounds = direct_status_data.get("config", {}).get("rounds", 0)
                            
                            # Create a status object compatible with our display function
                            status_data = {
                                "tournament_id": tournament_id,
                                "status": status,
                                "current_round": current_round,
                                "total_rounds": total_rounds,
                                "storage_path": storage_path
                            }
                            logger.debug(f"Successfully read direct state: {status}")
                    else:
                        logger.warning(f"State file not found at: {state_file}")
                        return None  # Cannot recover status without the state file
                except Exception as e:
                    logger.error(f"Error reading state file directly: {e}")
                    return None # Indicate error during polling
            else:
                # Standard error case
                logger.error(f"Error fetching status: {status_data['error']}", emoji_key="error")
                return None # Indicate error during polling
            
        # Display improved status using the imported function
        display_tournament_status(status_data)
        
        status = status_data.get("status")
        if status in final_states:
            logger.success(f"Tournament reached final state: {status}", emoji_key="success")
            return status
            
        await asyncio.sleep(interval)


def extract_thinking(text: str) -> str:
    """Extract <thinking> tags content (simple version)."""
    match = re.search(r"<thinking>(.*?)</thinking>", text, re.DOTALL)
    return match.group(1).strip() if match else ""


def analyze_text_quality(text: str) -> Dict[str, Any]:
    """Basic text quality analysis."""
    word_count = len(text.split())
    # Add more metrics later (readability, sentiment, etc.)
    return {"word_count": word_count}


async def evaluate_essays(essays_by_model: Dict[str, str], tracker: CostTracker = None) -> Dict[str, Any]:
    """Use LLM to evaluate which essay is the best.
    
    Args:
        essays_by_model: Dictionary mapping model IDs to their essay texts
        tracker: Optional CostTracker to track API call costs
        
    Returns:
        Dictionary with evaluation results
    """
    if not essays_by_model or len(essays_by_model) < 2:
        return {"error": "Not enough essays to compare"}
    
    eval_cost = 0.0 # Initialize evaluation cost

    try:
        # Format the essays for evaluation
        evaluation_prompt = "# Essay Evaluation\n\nPlease analyze the following essays on the same topic and determine which one is the best. "
        evaluation_prompt += "Consider factors such as technical accuracy, clarity, organization, depth of analysis, and overall quality.\n\n"
        
        # Add each essay
        for i, (model_id, essay) in enumerate(essays_by_model.items(), 1):
            display_model = model_id.split(':')[-1] if ':' in model_id else model_id
            # Limit each essay to 3000 chars to fit context windows
            truncated_essay = essay[:3000]
            if len(essay) > 3000:
                truncated_essay += "..."
            evaluation_prompt += f"## Essay {i} (by {display_model})\n\n{truncated_essay}\n\n"
        
        evaluation_prompt += "\n# Your Evaluation Task\n\n"
        evaluation_prompt += "1. Rank the essays from best to worst\n"
        evaluation_prompt += "2. Explain your reasoning for the ranking\n"
        evaluation_prompt += "3. Highlight specific strengths of the best essay\n"
        evaluation_prompt += "4. Suggest one improvement for each essay\n"
        
        # Use a more capable model for evaluation
        model_to_use = "gemini:gemini-2.5-pro-preview-03-25"
        
        logger.info(f"Evaluating essays using {model_to_use}...", emoji_key="evaluate")
        
        # Get the provider
        provider_id = model_to_use.split(':')[0]
        provider = await get_provider(provider_id)
        
        if not provider:
            return {
                "error": f"Provider {provider_id} not available for evaluation",
                "model_used": model_to_use,
                "eval_prompt": evaluation_prompt,
                "cost": 0.0
            }
        
        # Generate completion for evaluation with timeout
        try:
            request = CompletionRequest(prompt=evaluation_prompt, model=model_to_use)
            
            # Set a timeout for the completion request
            completion_task = provider.generate_completion(
                prompt=request.prompt,
                model=request.model
            )
            
            # 45 second timeout for evaluation
            completion_result = await asyncio.wait_for(completion_task, timeout=45)
            
            # Track API call if tracker provided
            if tracker:
                tracker.add_call(completion_result)
            
            # Accumulate cost
            if hasattr(completion_result, 'cost'):
                eval_cost = completion_result.cost
            elif hasattr(completion_result, 'metrics') and isinstance(completion_result.metrics, dict):
                eval_cost = completion_result.metrics.get('cost', 0.0)
            
            # Prepare result dict
            result = {
                "evaluation": completion_result.text,
                "model_used": model_to_use,
                "eval_prompt": evaluation_prompt,
                "cost": eval_cost # Return the cost
            }
        except asyncio.TimeoutError:
            logger.warning(f"Evaluation with {model_to_use} timed out after 45 seconds", emoji_key="warning")
            return {
                "error": "Evaluation timed out after 45 seconds",
                "model_used": model_to_use,
                "eval_prompt": evaluation_prompt,
                "cost": 0.0
            }
        except Exception as request_error:
            logger.error(f"Error during model request: {str(request_error)}", emoji_key="error")
            return {
                "error": f"Error during model request: {str(request_error)}",
                "model_used": model_to_use,
                "eval_prompt": evaluation_prompt,
                "cost": 0.0
            }
    
    except Exception as e:
        logger.error(f"Essay evaluation failed: {str(e)}", emoji_key="error", exc_info=True)
        return {
            "error": str(e),
            "model_used": model_to_use if 'model_to_use' in locals() else "unknown",
            "eval_prompt": evaluation_prompt if 'evaluation_prompt' in locals() else "Error generating prompt",
            "cost": 0.0
        }

    return result
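

# Example (illustrative; model IDs are placeholders for whatever providers are configured):
#   verdict = await evaluate_essays(
#       {"openai:gpt-4.1-mini": essay_a, "anthropic:claude-3-5-haiku-20241022": essay_b},
#       tracker=CostTracker(),
#   )
#   print(verdict.get("evaluation", verdict.get("error")))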


async def calculate_tournament_costs(rounds_results, evaluation_cost=None):
    """Calculate total costs of the tournament by model and grand total.
    
    Args:
        rounds_results: List of round results data from tournament results
        evaluation_cost: Optional cost of the final evaluation step
        
    Returns:
        Dictionary with cost information
    """
    model_costs = {}
    total_cost = 0.0
    
    # Process costs for each round
    for _round_idx, round_data in enumerate(rounds_results):
        responses = round_data.get('responses', {})
        for model_id, response in responses.items():
            metrics = response.get('metrics', {})
            cost = metrics.get('cost', 0.0)
            
            # Convert to float if it's a string
            if isinstance(cost, str):
                try:
                    cost = float(cost.replace('$', ''))
                except (ValueError, TypeError):
                    cost = 0.0
            
            # Initialize model if not present
            if model_id not in model_costs:
                model_costs[model_id] = 0.0
                
            # Add to model total and grand total
            model_costs[model_id] += cost
            total_cost += cost
    
    # Add evaluation cost if provided
    if evaluation_cost:
        total_cost += evaluation_cost
        model_costs['evaluation'] = evaluation_cost
    
    return {
        'model_costs': model_costs,
        'total_cost': total_cost
    }
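
# Expected input/output shape (an assumption based on how the function is used below):
#   rounds_results = [{"responses": {"openai:gpt-4.1-mini": {"metrics": {"cost": 0.0012}}}}]
#   await calculate_tournament_costs(rounds_results)
#   -> {"model_costs": {"openai:gpt-4.1-mini": 0.0012}, "total_cost": 0.0012}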


# --- Main Script Logic ---
async def run_tournament_demo(tracker: CostTracker):
    """Run the text tournament demo."""
    # Parse command line arguments
    args = parse_arguments_text()
    
    # Determine which topic to use
    if args.topic == "custom" and args.custom_topic:
        # Custom topic provided via command line
        topic_name = "custom"
        essay_variables = create_custom_topic_variables(args.custom_topic)
        topic_description = args.custom_topic
        log_topic_info = f"Using custom topic: [yellow]{escape(topic_description)}[/yellow]"
    elif args.topic in TOPICS:
        # Use one of the predefined topics
        topic_name = args.topic
        essay_variables = TOPICS[args.topic]
        topic_description = essay_variables["topic"]
        log_topic_info = f"Using predefined topic: [yellow]{escape(topic_description)}[/yellow]"
    else:
        # Default to transformer vs diffusion if topic not recognized
        topic_name = "transformer_vs_diffusion"
        essay_variables = TOPICS[topic_name]
        topic_description = essay_variables['topic']
        log_topic_info = f"Using default topic: [yellow]{escape(topic_description)}[/yellow]"
    
    # Use Rich Rule for title
    console.print(Rule(f"[bold blue]{TOURNAMENT_NAME} - {topic_name.replace('_', ' ').title()}[/bold blue]"))
    console.print(log_topic_info)
    console.print(f"Models: [cyan]{', '.join(MODEL_IDS)}[/cyan]")
    console.print(f"Rounds: [cyan]{NUM_ROUNDS}[/cyan]")
    
    # Render the template
    try:
        rendered_prompt = essay_template.render(essay_variables)
        logger.info(f"Template rendered for topic: {topic_name}", emoji_key="template")
        
        # Show prompt preview in a Panel
        prompt_preview = rendered_prompt.split("\n")[:10] # Show more lines
        preview_text = "\n".join(prompt_preview) + "\n..."
        console.print(Panel(escape(preview_text), title="[bold]Rendered Prompt Preview[/bold]", border_style="dim blue", expand=False))
        
    except Exception as e:
        logger.error(f"Template rendering failed: {str(e)}", emoji_key="error", exc_info=True)
        # Log template and variables for debugging using logger
        logger.debug(f"Template: {TEMPLATE_TEXT}")
        logger.debug(f"Variables: {escape(str(essay_variables))}") # Escape potentially complex vars
        return 1
    
    # 1. Create the tournament
    # Prepare model configurations with a common default temperature;
    # the create_tournament tool validates these against InputModelConfig.
    model_configs = [{"model_id": mid, "diversity_count": 1, "temperature": 0.7} for mid in MODEL_IDS]

    create_input = {
        "name": f"{TOURNAMENT_NAME} - {topic_name.replace('_', ' ').title()}",
        "prompt": rendered_prompt,
        "models": model_configs, # Changed from model_ids to models
        "rounds": NUM_ROUNDS,
        "tournament_type": "text"
    }
    
    try:
        logger.info("Creating tournament...", emoji_key="processing")
        create_result = await gateway.mcp.call_tool("create_tournament", create_input)
        create_data = await process_mcp_result(create_result)
        
        if "error" in create_data:
            error_msg = create_data.get("error", "Unknown error")
            logger.error(f"Failed to create tournament: {error_msg}. Exiting.", emoji_key="error")
            return 1
            
        tournament_id = create_data.get("tournament_id")
        if not tournament_id:
            logger.error("No tournament ID returned. Exiting.", emoji_key="error")
            return 1
            
        # Extract storage path for reference
        storage_path = create_data.get("storage_path")
        logger.info(f"Tournament created with ID: {tournament_id}", emoji_key="tournament")
        if storage_path:
            logger.info(f"Tournament storage path: {storage_path}", emoji_key="path")
            
        # Add a small delay to ensure the tournament state is saved before proceeding
        await asyncio.sleep(2)
        
        # 2. Poll for status
        final_status = await poll_tournament_status(tournament_id, storage_path)

        # 3. Fetch and display final results
        if final_status == "COMPLETED":
            logger.info("Fetching final results...", emoji_key="results")
            results_input = {"tournament_id": tournament_id}
            final_results = await gateway.mcp.call_tool("get_tournament_results", results_input)
            results_data = await process_mcp_result(final_results)

            if "error" not in results_data:
                # Use the imported display function for tournament results
                display_tournament_results(results_data)
                
                # Track aggregated tournament cost (excluding separate evaluation)
                if isinstance(results_data, dict) and "cost" in results_data:
                    try:
                        total_cost = results_data.get("cost", {}).get("total_cost", 0.0)
                        processing_time = results_data.get("total_processing_time", 0.0)
                        trackable = TrackableResult(
                            cost=total_cost,
                            input_tokens=0,
                            output_tokens=0,
                            provider="tournament",
                            model="text_tournament",
                            processing_time=processing_time
                        )
                        tracker.add_call(trackable)
                        logger.info(f"Tracked tournament cost: ${total_cost:.6f}", emoji_key="cost")
                    except Exception as track_err:
                        logger.warning(f"Could not track tournament cost: {track_err}", exc_info=False)

                # Analyze round progression if available
                rounds_results = results_data.get('rounds_results', [])
                if rounds_results:
                    console.print(Rule("[bold blue]Essay Evolution Analysis[/bold blue]"))

                    for round_idx, round_data in enumerate(rounds_results):
                        console.print(f"[bold]Round {round_idx} Analysis:[/bold]")
                        responses = round_data.get('responses', {})
                        
                        round_table = Table(box=box.MINIMAL, show_header=True, expand=False)
                        round_table.add_column("Model", style="magenta")
                        round_table.add_column("Word Count", style="green", justify="right")

                        has_responses = False
                        for model_id, response in responses.items():
                            display_model = escape(model_id.split(':')[-1])
                            response_text = response.get('response_text', '')
                            
                            if response_text:
                                has_responses = True
                                metrics = analyze_text_quality(response_text)
                                round_table.add_row(
                                    display_model, 
                                    str(metrics['word_count'])
                                )
                        
                        if has_responses:
                            console.print(round_table)
                        else:
                             console.print("[dim]No valid responses recorded for this round.[/dim]")
                        console.print() # Add space between rounds

                    # Evaluate final essays using LLM
                    final_round = rounds_results[-1]
                    final_responses = final_round.get('responses', {})
                    
                    # Track evaluation cost
                    evaluation_cost = 0.0
                    
                    if final_responses:
                        console.print(Rule("[bold blue]AI Evaluation of Essays[/bold blue]"))
                        console.print("[bold]Evaluating final essays...[/bold]")
                        
                        essays_by_model = {}
                        for model_id, response in final_responses.items():
                            essays_by_model[model_id] = response.get('response_text', '')
                        
                        evaluation_result = await evaluate_essays(essays_by_model, tracker)
                        
                        if "error" not in evaluation_result:
                            console.print(Panel(
                                escape(evaluation_result["evaluation"]),
                                title=f"[bold]Essay Evaluation (by {evaluation_result['model_used'].split(':')[-1]})[/bold]",
                                border_style="green",
                                expand=False
                            ))
                            
                            # Extract the evaluation cost from the result, then track it separately
                            evaluation_cost = evaluation_result.get('cost', 0.0)
                            if evaluation_cost > 0:
                                try:
                                    trackable_eval = TrackableResult(
                                        cost=evaluation_cost,
                                        input_tokens=0, # Tokens for eval not easily available here
                                        output_tokens=0,
                                        provider=evaluation_result['model_used'].split(':')[0],
                                        model=evaluation_result['model_used'].split(':')[-1],
                                        processing_time=0 # Eval time not tracked here
                                    )
                                    tracker.add_call(trackable_eval)
                                except Exception as track_err:
                                    logger.warning(f"Could not track evaluation cost: {track_err}", exc_info=False)

                            # Save evaluation result to a file in the tournament directory
                            if storage_path:
                                try:
                                    evaluation_file = os.path.join(storage_path, "essay_evaluation.md")
                                    with open(evaluation_file, "w", encoding="utf-8") as f:
                                        f.write(f"# Essay Evaluation by {evaluation_result['model_used']}\n\n")
                                        f.write(evaluation_result["evaluation"])
                                    
                                    logger.info(f"Evaluation saved to {evaluation_file}", emoji_key="save")
                                except Exception as e:
                                    logger.warning(f"Could not save evaluation to file: {str(e)}", emoji_key="warning")
                            
                            logger.info(f"Evaluation cost: ${evaluation_cost:.6f}", emoji_key="cost")
                        else:
                            console.print(f"[yellow]Could not evaluate essays: {evaluation_result.get('error')}[/yellow]")
                            # Try with fallback model if Gemini fails
                            if "gemini" in evaluation_result.get("model_used", ""):
                                console.print("[bold]Trying evaluation with fallback model (gpt-4.1-mini)...[/bold]")
                                # Switch to OpenAI model as backup
                                essays_by_model_limited = {}
                                # Limit content size to avoid token limits
                                for model_id, essay in essays_by_model.items():
                                    essays_by_model_limited[model_id] = essay[:5000]  # Shorter excerpt to fit in context
                                
                                fallback_evaluation = {
                                    "model_used": "openai:gpt-4.1-mini",
                                    "eval_prompt": evaluation_result.get("eval_prompt", "Evaluation failed")
                                }
                                
                                try:
                                    provider_id = "openai"
                                    provider = await get_provider(provider_id)
                                    
                                    if provider:
                                        # Create a shorter, simplified prompt
                                        simple_prompt = "Compare these essays and rank them from best to worst:\n\n"
                                        for i, (model_id, essay) in enumerate(essays_by_model_limited.items(), 1):
                                            display_model = model_id.split(':')[-1] if ':' in model_id else model_id
                                            simple_prompt += f"Essay {i} ({display_model}):\n{essay[:2000]}...\n\n"
                                        
                                        request = CompletionRequest(prompt=simple_prompt, model="openai:gpt-4.1-mini")
                                        completion_result = await provider.generate_completion(
                                            prompt=request.prompt,
                                            model=request.model
                                        )
                                        
                                        fallback_evaluation["evaluation"] = completion_result.text
                                        
                                        # Track fallback evaluation cost
                                        if completion_result.cost > 0:
                                            try:
                                                trackable_fallback = TrackableResult(
                                                    cost=completion_result.cost,
                                                    input_tokens=0,
                                                    output_tokens=0,
                                                    provider="openai",
                                                    model="gpt-4.1-mini",
                                                    processing_time=0 # Eval time not tracked
                                                )
                                                tracker.add_call(trackable_fallback)
                                            except Exception as track_err:
                                                logger.warning(f"Could not track fallback evaluation cost: {track_err}", exc_info=False)

                                        logger.info(f"Fallback evaluation cost: ${completion_result.cost:.6f}", emoji_key="cost")
                                        
                                        console.print(Panel(
                                            escape(fallback_evaluation["evaluation"]),
                                            title="[bold]Fallback Evaluation (by gpt-4.1-mini)[/bold]",
                                            border_style="yellow",
                                            expand=False
                                        ))
                                        
                                        # Save fallback evaluation to file
                                        if storage_path:
                                            try:
                                                fallback_eval_file = os.path.join(storage_path, "fallback_evaluation.md")
                                                with open(fallback_eval_file, "w", encoding="utf-8") as f:
                                                    f.write("# Fallback Essay Evaluation by gpt-4.1-mini\n\n")
                                                    f.write(fallback_evaluation["evaluation"])
                                                
                                                logger.info(f"Fallback evaluation saved to {fallback_eval_file}", emoji_key="save")
                                            except Exception as e:
                                                logger.warning(f"Could not save fallback evaluation: {str(e)}", emoji_key="warning")
                                    else:
                                        console.print("[red]Fallback model unavailable[/red]")
                                except Exception as fallback_error:
                                    console.print(f"[red]Fallback evaluation failed: {str(fallback_error)}[/red]")

                    # Find and highlight comparison file for final round
                    comparison_file = final_round.get('comparison_file_path')
                    if comparison_file:
                        console.print(Panel(
                            f"Check the final comparison file for the full essay text and detailed round comparisons:\n[bold yellow]{escape(comparison_file)}[/bold yellow]",
                            title="[bold]Final Comparison File[/bold]",
                            border_style="yellow",
                            expand=False
                        ))
                    else:
                        logger.warning("Could not find path to final comparison file in results", emoji_key="warning")
                    
                    # Display cost summary
                    costs = await calculate_tournament_costs(rounds_results, evaluation_cost)
                    model_costs = costs.get('model_costs', {})
                    total_cost = costs.get('total_cost', 0.0)
                    
                    console.print(Rule("[bold blue]Tournament Cost Summary[/bold blue]"))
                    
                    cost_table = Table(box=box.MINIMAL, show_header=True, expand=False)
                    cost_table.add_column("Model", style="magenta")
                    cost_table.add_column("Total Cost", style="green", justify="right")
                    
                    # Add model costs to table
                    for model_id, cost in sorted(model_costs.items()):
                        if model_id == 'evaluation':
                            display_model = "Evaluation"
                        else:
                            display_model = model_id.split(':')[-1] if ':' in model_id else model_id
                        
                        cost_table.add_row(
                            display_model,
                            f"${cost:.6f}"
                        )
                    
                    # Add grand total
                    cost_table.add_row(
                        "[bold]GRAND TOTAL[/bold]",
                        f"[bold]${total_cost:.6f}[/bold]"
                    )
                    
                    console.print(cost_table)
                    
                    # Save cost summary to file
                    if storage_path:
                        try:
                            cost_file = os.path.join(storage_path, "cost_summary.md")
                            with open(cost_file, "w", encoding="utf-8") as f:
                                f.write("# Tournament Cost Summary\n\n")
                                f.write("## Per-Model Costs\n\n")
                                
                                for model_id, cost in sorted(model_costs.items()):
                                    if model_id == 'evaluation':
                                        display_model = "Evaluation"
                                    else:
                                        display_model = model_id.split(':')[-1] if ':' in model_id else model_id
                                    
                                    f.write(f"- **{display_model}**: ${cost:.6f}\n")
                                
                                f.write("\n## Grand Total\n\n")
                                f.write(f"**TOTAL COST**: ${total_cost:.6f}\n")
                            
                            logger.info(f"Cost summary saved to {cost_file}", emoji_key="save")
                        except Exception as e:
                            logger.warning(f"Could not save cost summary: {str(e)}", emoji_key="warning")
            else:
                logger.error(f"Could not fetch final results: {results_data.get('error', 'Unknown error')}", emoji_key="error")
        elif final_status:
            logger.warning(f"Tournament ended with status {final_status}. Check logs or status details for more info.", emoji_key="warning")
        
    except Exception as e:
        logger.error(f"Error in tournament demo: {str(e)}", emoji_key="error", exc_info=True)
        return 1

    # Display cost summary at the end
    tracker.display_summary(console)

    logger.success("Text Tournament Demo Finished", emoji_key="complete")
    console.print(Panel(
        "To view full essays and detailed comparisons, check the storage directory indicated in the results summary.",
        title="[bold]Next Steps[/bold]",
        border_style="dim green",
        expand=False
    ))
    return 0


async def main():
    """Run the tournament demo."""
    tracker = CostTracker() # Instantiate tracker
    try:
        # Set up gateway
        await setup_gateway()
        
        # Run the demo
        return await run_tournament_demo(tracker) # Pass tracker
    except Exception as e:
        logger.critical(f"Demo failed: {str(e)}", emoji_key="critical", exc_info=True)
        return 1
    finally:
        # Clean up
        if gateway:
            pass  # No cleanup needed for Gateway instance


if __name__ == "__main__":
    # Run the demo
    exit_code = asyncio.run(main())
    sys.exit(exit_code) 
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/meta_api_tool.py:
--------------------------------------------------------------------------------

```python
"""API Meta-Tool for dynamically exposing FastAPI endpoints via MCP.

This module provides a tool for automatically discovering and integrating
FastAPI-compatible REST APIs into the MCP server by pointing it at the
FastAPI server's OpenAPI specification (e.g., /openapi.json).

Usage Examples:

1. Register an API:
   ```python
   result = await client.tools.register_api(
       api_name="petstore",
       openapi_url="https://petstore.swagger.io/v2/swagger.json"
   )
   print(f"Registered {result['tools_count']} tools for the Petstore API")
   ```

2. List all registered APIs:
   ```python
   apis = await client.tools.list_registered_apis()
   for api_name, api_info in apis["apis"].items():
       print(f"{api_name}: {api_info['tools_count']} tools")
   ```

3. Call a dynamically registered tool:
   ```python
   # Get a pet by ID
   pet = await client.tools.call_dynamic_tool(
       tool_name="petstore_getPetById",
       inputs={"petId": 123}
   )
   print(f"Pet name: {pet['name']}")

   # Add a new pet
   new_pet = await client.tools.call_dynamic_tool(
       tool_name="petstore_addPet",
       inputs={
           "body": {
               "id": 0,
               "name": "Fluffy",
               "status": "available"
           }
       }
   )
   print(f"Added pet with ID: {new_pet['id']}")
   ```

4. Unregister an API:
   ```python
   result = await client.tools.unregister_api(api_name="petstore")
   print(f"Unregistered {result['tools_count']} tools")
   ```
"""

import asyncio
import json
import re
import time
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

import httpx

from ultimate_mcp_server.exceptions import ToolError, ToolInputError
from ultimate_mcp_server.services.cache import with_cache
from ultimate_mcp_server.tools.base import (
    with_error_handling,
    with_state_management,
    with_tool_metrics,
)
from ultimate_mcp_server.utils import get_logger

logger = get_logger("ultimate_mcp_server.tools.meta_api")


async def fetch_openapi_spec(
    url: str, timeout: float = 30.0, headers: Optional[Dict[str, str]] = None
) -> Dict[str, Any]:
    """Fetches the OpenAPI spec from the given URL.

    Args:
        url: URL of the OpenAPI spec (typically ending in /openapi.json)
        timeout: Timeout for the HTTP request in seconds
        headers: Optional headers to include in the request (e.g., for authentication)

    Returns:
        Parsed OpenAPI spec as a dictionary

    Raises:
        ToolError: If the fetch or parsing fails
    """
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(url, timeout=timeout, headers=headers)
            response.raise_for_status()
            return response.json()
    except httpx.HTTPStatusError as e:
        raise ToolError(
            f"Failed to fetch OpenAPI spec: HTTP {e.response.status_code}",
            details={"url": url, "status_code": e.response.status_code},
        ) from e
    except httpx.RequestError as e:
        raise ToolError(
            f"Failed to fetch OpenAPI spec: {str(e)}", details={"url": url, "error": str(e)}
        ) from e
    except json.JSONDecodeError as e:
        raise ToolError(
            f"Failed to parse OpenAPI spec as JSON: {str(e)}", details={"url": url, "error": str(e)}
        ) from e
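
# Example (illustrative):
#   spec = await fetch_openapi_spec("https://petstore.swagger.io/v2/swagger.json")
#   print(spec.get("info", {}).get("title"))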


def extract_endpoint_info(openapi_spec: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Extracts endpoint information from an OpenAPI spec.

    Args:
        openapi_spec: Parsed OpenAPI spec as a dictionary

    Returns:
        List of dictionaries containing endpoint information, each with keys:
        - path: The endpoint path
        - method: The HTTP method (GET, POST, etc.)
        - operation_id: The operationId from the spec (used as tool name)
        - parameters: List of parameter objects
        - request_body: Request body schema (if applicable)
        - responses: Response schemas
        - summary: Endpoint summary
        - description: Endpoint description
    """
    endpoints = []

    paths = openapi_spec.get("paths", {})
    for path, path_item in paths.items():
        for method, operation in path_item.items():
            if method.lower() not in ["get", "post", "put", "delete", "patch"]:
                continue  # Skip non-HTTP methods like "parameters"

            # Extract operation ID (fall back to generating one if not provided)
            operation_id = operation.get("operationId")
            if not operation_id:
                # Generate operation ID from path and method
                path_parts = [p for p in path.split("/") if p and not p.startswith("{")]
                if path_parts:
                    operation_id = f"{method.lower()}_{path_parts[-1]}"
                else:
                    operation_id = f"{method.lower()}_root"

                # Ensure operation_id is a valid Python identifier
                operation_id = re.sub(r"[^a-zA-Z0-9_]", "_", operation_id)
                if operation_id[0].isdigit():
                    operation_id = f"op_{operation_id}"

            # Extract parameters
            parameters = []
            # Include parameters from the path item
            if "parameters" in path_item:
                parameters.extend(path_item["parameters"])
            # Include parameters from the operation (overriding path item parameters if same name)
            if "parameters" in operation:
                # Remove any path item parameters with the same name
                path_param_names = {
                    p["name"] for p in path_item.get("parameters", []) if "name" in p
                }
                op_params = []
                for p in operation["parameters"]:
                    if p.get("name") in path_param_names:
                        # This parameter overrides a path item parameter
                        parameters = [
                            param for param in parameters if param.get("name") != p.get("name")
                        ]
                    op_params.append(p)
                parameters.extend(op_params)

            # Extract request body schema
            request_body = None
            if "requestBody" in operation:
                request_body = operation["requestBody"]

            # Extract response schemas
            responses = operation.get("responses", {})

            endpoints.append(
                {
                    "path": path,
                    "method": method.lower(),
                    "operation_id": operation_id,
                    "parameters": parameters,
                    "request_body": request_body,
                    "responses": responses,
                    "summary": operation.get("summary", ""),
                    "description": operation.get("description", ""),
                    "tags": operation.get("tags", []),
                }
            )

    return endpoints
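
# Sketch of the transformation performed above (hypothetical spec; values are illustrative only):
#
#     spec = {
#         "paths": {
#             "/users/{user_id}": {
#                 "get": {
#                     "operationId": "get_user",
#                     "summary": "Fetch a single user",
#                     "parameters": [
#                         {"name": "user_id", "in": "path", "required": True,
#                          "schema": {"type": "integer"}}
#                     ],
#                 }
#             }
#         }
#     }
#     extract_endpoint_info(spec)
#     # -> [{"path": "/users/{user_id}", "method": "get", "operation_id": "get_user",
#     #      "parameters": [{"name": "user_id", ...}], "request_body": None,
#     #      "responses": {}, "summary": "Fetch a single user", "description": "", "tags": []}]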


def generate_tool_function_code(
    endpoint_info: Dict[str, Any],
    base_url: str,
    api_name: str,
    cache_ttl: Optional[int] = None,
    auth_header: Optional[str] = None,
) -> str:
    """Generates Python code for a tool function based on endpoint info.

    Args:
        endpoint_info: Dictionary containing endpoint information
        base_url: Base URL of the API
        api_name: Name of the API (used for function documentation)
        cache_ttl: Optional TTL for caching tool results in seconds
        auth_header: Optional authentication header name to include in requests

    Returns:
        String containing Python code for the tool function
    """
    operation_id = endpoint_info["operation_id"]
    path = endpoint_info["path"]
    method = endpoint_info["method"]
    summary = endpoint_info["summary"]
    description = endpoint_info["description"]
    tags = ", ".join(endpoint_info.get("tags", []))

    # Generate a clean function name (no API prefix, will be added during registration)
    function_name = operation_id

    # Generate docstring (the opening line is indented so it nests correctly inside the generated function body)
    docstring = (
        f'    """{summary}\n\n'
        if summary
        else f'    """Calls the {method.upper()} {path} endpoint of the {api_name} API.\n\n'
    )
    if description:
        docstring += f"{description}\n\n"
    if tags:
        docstring += f"Tags: {tags}\n\n"

    docstring += "Args:\n"

    # Generate function parameters
    params = []
    path_params = []
    query_params = []
    header_params = []
    body_param = None

    for param in endpoint_info.get("parameters", []):
        param_name = param["name"]
        # Clean the parameter name to be a valid Python identifier
        clean_param_name = re.sub(r"[^a-zA-Z0-9_]", "_", param_name)
        if clean_param_name[0].isdigit():
            clean_param_name = f"p_{clean_param_name}"

        param_type = param.get("schema", {}).get("type", "string")
        required = param.get("required", False)
        param_in = param.get("in", "query")
        param_description = param.get("description", "")

        python_type = "str"
        if param_type == "integer":
            python_type = "int"
        elif param_type == "number":
            python_type = "float"
        elif param_type == "boolean":
            python_type = "bool"
        elif param_type == "array":
            python_type = "List[Any]"
        elif param_type == "object":
            python_type = "Dict[str, Any]"

        if required:
            params.append(f"{clean_param_name}: {python_type}")
            docstring += f"    {clean_param_name}: {param_description} (in: {param_in})\n"
        else:
            params.append(f"{clean_param_name}: Optional[{python_type}] = None")
            docstring += (
                f"    {clean_param_name}: (Optional) {param_description} (in: {param_in})\n"
            )

        # Store parameter location for request building
        if param_in == "path":
            path_params.append((param_name, clean_param_name))
        elif param_in == "query":
            query_params.append((param_name, clean_param_name))
        elif param_in == "header":
            # Skip the auth header itself; it is supplied separately via the auth_token parameter
            if not (auth_header and param_name.lower() == auth_header.lower()):
                header_params.append((param_name, clean_param_name))

    # Handle request body
    if endpoint_info.get("request_body"):
        content = endpoint_info["request_body"].get("content", {})
        if "application/json" in content:
            body_param = "body"
            schema_desc = "Request body"
            # Try to get schema description from the content schema
            schema = content.get("application/json", {}).get("schema", {})
            if "description" in schema:
                schema_desc = schema["description"]
            params.append("body: Dict[str, Any]")
            docstring += f"    body: {schema_desc}\n"

    # Add timeout and auth_token params if needed
    params.append("timeout: float = 30.0")
    docstring += "    timeout: Timeout for the HTTP request in seconds\n"

    if auth_header:
        params.append("auth_token: Optional[str] = None")
        docstring += f"    auth_token: Optional authentication token to include in the '{auth_header}' header\n"

    docstring += '\n    Returns:\n        API response data as a dictionary\n    """'

    # Generate function body
    function_body = []
    function_body.append("    async with httpx.AsyncClient() as client:")

    # Format URL with path params
    if path_params:
        # For path params, replace {param} with {clean_param_name}
        url_format = path
        for param_name, clean_name in path_params:
            url_format = url_format.replace(f"{{{param_name}}}", f"{{{clean_name}}}")
        function_body.append(f'        url = f"{base_url}{url_format}"')
    else:
        function_body.append(f'        url = "{base_url}{path}"')

    # Prepare query params
    if query_params:
        function_body.append("        params = {}")
        for param_name, clean_name in query_params:
            function_body.append(f"        if {clean_name} is not None:")
            function_body.append(f'            params["{param_name}"] = {clean_name}')
    else:
        function_body.append("        params = None")

    # Prepare headers
    function_body.append("        headers = {}")
    if auth_header:
        function_body.append("        if auth_token is not None:")
        function_body.append(f'            headers["{auth_header}"] = auth_token')

    if header_params:
        for param_name, clean_name in header_params:
            function_body.append(f"        if {clean_name} is not None:")
            function_body.append(f'            headers["{param_name}"] = {clean_name}')

    # Prepare request
    request_args = ["url"]
    if query_params:
        request_args.append("params=params")
    if header_params or auth_header:
        request_args.append("headers=headers")
    if body_param:
        request_args.append(f"json={body_param}")
    request_args.append("timeout=timeout")

    function_body.append("        try:")
    function_body.append(f"            response = await client.{method}({', '.join(request_args)})")
    function_body.append("            response.raise_for_status()")
    function_body.append(
        "            if response.headers.get('content-type', '').startswith('application/json'):"
    )
    function_body.append("                return response.json()")
    function_body.append("            else:")
    function_body.append("                return {'text': response.text}")
    function_body.append("        except httpx.HTTPStatusError as e:")
    function_body.append("            error_detail = e.response.text")
    function_body.append("            try:")
    function_body.append("                error_json = e.response.json()")
    function_body.append("                if isinstance(error_json, dict):")
    function_body.append("                    error_detail = error_json")
    function_body.append("            except Exception:")
    function_body.append("                pass  # Couldn't parse JSON error")
    function_body.append("            raise ToolError(")
    function_body.append('                f"API request failed: HTTP {e.response.status_code}",')
    function_body.append(
        '                details={"status_code": e.response.status_code, "response": error_detail}'
    )
    function_body.append("            )")
    function_body.append("        except httpx.RequestError as e:")
    function_body.append("            raise ToolError(")
    function_body.append('                f"API request failed: {str(e)}",')
    function_body.append('                details={"error": str(e)}')
    function_body.append("            )")

    # Generate the full function signature (the generated tool is a standalone coroutine,
    # registered directly with the MCP server, so it takes no self parameter)
    param_str = ", ".join(params)

    # Add decorators based on configuration
    decorators = ["@with_tool_metrics", "@with_error_handling"]

    if cache_ttl is not None:
        decorators.insert(0, f"@with_cache(ttl={cache_ttl})")

    function_code = [
        *decorators,
        f"async def {function_name}({param_str}):",
        docstring,
        *function_body,
    ]

    return "\n".join(function_code)
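
# For orientation, the code generated for a hypothetical GET /users/{user_id} endpoint
# (operationId "get_user", base_url "https://api.example.com", no cache_ttl, no auth_header)
# would look roughly like this sketch:
#
#     @with_tool_metrics
#     @with_error_handling
#     async def get_user(user_id: int, timeout: float = 30.0):
#         """Fetch a single user
#         ...
#         """
#         async with httpx.AsyncClient() as client:
#             url = f"https://api.example.com/users/{user_id}"
#             params = None
#             headers = {}
#             try:
#                 response = await client.get(url, timeout=timeout)
#                 response.raise_for_status()
#                 ...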


@with_tool_metrics
@with_error_handling
@with_state_management(namespace="meta_api")
async def register_api(
    api_name: str,
    openapi_url: str,
    base_url: Optional[str] = None,
    cache_ttl: Optional[int] = None,
    auth_header: Optional[str] = None,
    auth_token: Optional[str] = None,
    tool_name_prefix: Optional[str] = None,
    timeout: float = 30.0,
    ctx: Optional[Dict[str, Any]] = None,
    get_state=None,
    set_state=None,
    delete_state=None
) -> Dict[str, Any]:
    """Registers an API with the MCP server by fetching its OpenAPI spec.

    Dynamically generates MCP tools for each endpoint in the API and registers
    them with the MCP server. The tools are prefixed with the API name by default,
    resulting in tool names like "api_name_operation_id".

    Args:
        api_name: A unique name for the API (used as a prefix for tool names)
        openapi_url: URL of the OpenAPI spec (typically ending in /openapi.json)
        base_url: Base URL of the API (if different from the OpenAPI URL)
        cache_ttl: Optional TTL for caching tool results in seconds
        auth_header: Optional name of the header to use for authentication
        auth_token: Optional token to use when fetching the OpenAPI spec
        tool_name_prefix: Optional prefix for tool names (default: api_name)
        timeout: Timeout for the HTTP request in seconds
        ctx: MCP context
        get_state: Function to get state from store (injected by decorator)
        set_state: Function to set state in store (injected by decorator)
        delete_state: Function to delete state from store (injected by decorator)

    Returns:
        A dictionary containing the registration results:
        {
            "success": true,
            "api_name": "example_api",
            "base_url": "https://api.example.com",
            "tools_registered": ["example_api_get_users", "example_api_create_user", ...],
            "tools_count": 5,
            "processing_time": 1.23
        }
    """
    # Validate inputs
    if not api_name:
        raise ToolInputError("api_name cannot be empty")

    # Check if API name has invalid characters
    if not re.match(r"^[a-zA-Z0-9_]+$", api_name):
        raise ToolInputError(
            "api_name must contain only alphanumeric characters and underscores"
        )

    if not openapi_url:
        raise ToolInputError("openapi_url cannot be empty")

    # Get registered APIs from state store
    registered_apis = await get_state("registered_apis", {})
    generated_tools = await get_state("generated_tools", {})

    # Check if API is already registered
    if api_name in registered_apis:
        raise ToolInputError(
            f"API {api_name} is already registered. Use a different name or unregister it first."
        )

    # Set tool name prefix
    tool_name_prefix = tool_name_prefix or api_name

    # Determine base URL if not provided
    if not base_url:
        # Extract base URL from OpenAPI URL
        try:
            parsed_url = urlparse(openapi_url)
            base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
            logger.info(f"Using base_url: {base_url} (derived from openapi_url)")
        except Exception as e:
            raise ToolInputError(f"Could not determine base_url from openapi_url: {str(e)}") from e

    # Prepare headers for fetching OpenAPI spec
    headers = None
    if auth_token and auth_header:
        headers = {auth_header: auth_token}

    # Fetch OpenAPI spec
    logger.info(f"Fetching OpenAPI spec from {openapi_url}")
    start_time = time.time()
    openapi_spec = await fetch_openapi_spec(openapi_url, timeout, headers)

    # Extract endpoint information
    endpoints = extract_endpoint_info(openapi_spec)
    logger.info(f"Extracted {len(endpoints)} endpoints from OpenAPI spec")

    # Get MCP server from context
    mcp = ctx.get('mcp')
    if not mcp:
        raise ToolError("MCP server context not available")

    # Generate and register tools for each endpoint
    registered_tools = []
    generated_code = {}

    for endpoint in endpoints:
        operation_id = endpoint["operation_id"]
        tool_name = f"{tool_name_prefix}_{operation_id}"

        # Skip if this tool is already registered
        if tool_name in generated_tools:
            logger.warning(f"Tool {tool_name} already registered, skipping")
            continue

        # Generate tool function code
        tool_code = generate_tool_function_code(
            endpoint, base_url, api_name, cache_ttl, auth_header
        )

        # Store the generated code for debugging
        generated_code[tool_name] = tool_code

        # Create and register the tool function
        try:
            # Create a namespace for the exec
            namespace = {}
            # Add required imports to the namespace
            namespace.update(
                {
                    "httpx": httpx,
                    "ToolError": ToolError,
                    "Dict": Dict,
                    "Any": Any,
                    "Optional": Optional,
                    "List": List,
                    "with_tool_metrics": with_tool_metrics,
                    "with_error_handling": with_error_handling,
                    "with_cache": with_cache,
                }
            )

            # Execute the generated code
            exec(tool_code, namespace)

            # Get the generated function from the namespace
            generated_func = namespace[operation_id]

            # Register with MCP server
            mcp.tool(name=tool_name)(generated_func)

            # Store the generated tool in state
            generated_tools[tool_name] = tool_code
            registered_tools.append(tool_name)

            logger.info(
                f"Registered tool {tool_name} for endpoint {endpoint['method'].upper()} {endpoint['path']}"
            )
        except Exception as e:
            logger.error(f"Failed to register tool {tool_name}: {str(e)}", exc_info=True)
            if "tool_code" in locals():
                logger.error(f"Generated code that failed:\n{tool_code}")

    # Store API information in state store
    registered_apis[api_name] = {
        "base_url": base_url,
        "openapi_url": openapi_url,
        "spec": openapi_spec,
        "tools": registered_tools,
        "tool_name_prefix": tool_name_prefix,
        "generated_code": generated_code,
        "auth_header": auth_header,
    }

    # Update state store
    await set_state("registered_apis", registered_apis)
    await set_state("generated_tools", generated_tools)

    processing_time = time.time() - start_time
    logger.success(
        f"API {api_name} registered with {len(registered_tools)} tools in {processing_time:.2f}s"
    )

    return {
        "success": True,
        "api_name": api_name,
        "base_url": base_url,
        "tools_registered": registered_tools,
        "tools_count": len(registered_tools),
        "processing_time": processing_time,
    }
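
# Illustrative registration call (a sketch; the API name and URL are placeholders, mirroring the
# __main__ example at the bottom of this module):
#
#     result = await register_api(
#         api_name="petstore",
#         openapi_url="https://petstore.example.com/openapi.json",
#         ctx={"mcp": mcp_server},
#     )
#     # result["tools_registered"] -> ["petstore_get_user", ...]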

@with_tool_metrics
@with_error_handling
@with_state_management(namespace="meta_api")
async def list_registered_apis(
    ctx: Optional[Dict[str, Any]] = None,
    get_state=None,
    set_state=None,
    delete_state=None
) -> Dict[str, Any]:
    """Lists all registered APIs and their endpoints.

    Args:
        ctx: MCP context
        get_state: Function to get state from store (injected by decorator)
        set_state: Function to set state in store (injected by decorator)
        delete_state: Function to delete state from store (injected by decorator)

    Returns:
        A dictionary containing the registered APIs:
        {
            "success": true,
            "apis": {
                "example_api": {
                    "base_url": "https://api.example.com",
                    "openapi_url": "https://api.example.com/openapi.json",
                    "tools_count": 5,
                    "tools": ["example_api_get_users", "example_api_create_user", ...]
                },
                ...
            },
            "total_apis": 2,
            "total_tools": 12
        }
    """
    # Get state data
    registered_apis = await get_state("registered_apis", {})
    generated_tools = await get_state("generated_tools", {})

    result = {
        "success": True,
        "apis": {},
        "total_apis": len(registered_apis),
        "total_tools": len(generated_tools),
    }

    for api_name, api_info in registered_apis.items():
        result["apis"][api_name] = {
            "base_url": api_info["base_url"],
            "openapi_url": api_info["openapi_url"],
            "tools_count": len(api_info["tools"]),
            "tools": api_info["tools"],
            "tool_name_prefix": api_info["tool_name_prefix"],
        }

    return result

@with_tool_metrics
@with_error_handling
@with_state_management(namespace="meta_api")
async def get_api_details(
    api_name: str,
    ctx: Optional[Dict[str, Any]] = None,
    get_state=None,
    set_state=None,
    delete_state=None
) -> Dict[str, Any]:
    """Gets detailed information about a registered API.

    Args:
        api_name: The name of the API to get details for
        ctx: MCP context
        get_state: Function to get state from store (injected by decorator)
        set_state: Function to set state in store (injected by decorator)
        delete_state: Function to delete state from store (injected by decorator)

    Returns:
        A dictionary containing the API details:
        {
            "success": true,
            "api_name": "example_api",
            "base_url": "https://api.example.com",
            "openapi_url": "https://api.example.com/openapi.json",
            "tools": [
                {
                    "name": "example_api_get_users",
                    "method": "get",
                    "path": "/users",
                    "summary": "Get all users",
                    "description": "Returns a list of all users in the system",
                    "parameters": [...]
                },
                ...
            ],
            "endpoints_count": 5
        }
    """
    # Get registered APIs from state
    registered_apis = await get_state("registered_apis", {})

    if api_name not in registered_apis:
        raise ToolInputError(f"API {api_name} not found")

    api_info = registered_apis[api_name]

    # Extract endpoint details from the OpenAPI spec
    endpoints = []
    spec = api_info["spec"]

    for endpoint_info in extract_endpoint_info(spec):
        tool_name = f"{api_info['tool_name_prefix']}_{endpoint_info['operation_id']}"
        endpoints.append(
            {
                "name": tool_name,
                "method": endpoint_info["method"],
                "path": endpoint_info["path"],
                "summary": endpoint_info["summary"],
                "description": endpoint_info["description"],
                "parameters": endpoint_info["parameters"],
                "tags": endpoint_info.get("tags", []),
            }
        )

    return {
        "success": True,
        "api_name": api_name,
        "base_url": api_info["base_url"],
        "openapi_url": api_info["openapi_url"],
        "tools": endpoints,
        "endpoints_count": len(endpoints),
    }

@with_tool_metrics
@with_error_handling
@with_state_management(namespace="meta_api")
async def unregister_api(
    api_name: str,
    ctx: Optional[Dict[str, Any]] = None,
    get_state=None,
    set_state=None,
    delete_state=None
) -> Dict[str, Any]:
    """Unregisters an API and all its tools from the MCP server.

    Args:
        api_name: The name of the API to unregister
        ctx: MCP context
        get_state: Function to get state from store (injected by decorator)
        set_state: Function to set state in store (injected by decorator)
        delete_state: Function to delete state from store (injected by decorator)

    Returns:
        A dictionary indicating the result:
        {
            "success": true,
            "api_name": "example_api",
            "tools_unregistered": ["example_api_get_users", "example_api_create_user", ...],
            "tools_count": 5
        }
    """
    # Get state data
    registered_apis = await get_state("registered_apis", {})
    generated_tools = await get_state("generated_tools", {})

    if api_name not in registered_apis:
        raise ToolInputError(f"API {api_name} not found")

    api_info = registered_apis[api_name]
    tools = api_info["tools"]

    # Get MCP server from context
    mcp = ctx.get('mcp')
    if not mcp:
        raise ToolError("MCP server context not available")

    # Unregister tools from MCP server
    for tool_name in tools:
        try:
            # Check if the MCP server has a method for unregistering tools
            if hasattr(mcp, "unregister_tool"):
                mcp.unregister_tool(tool_name)
            # If not, try to remove from the tools dictionary
            elif hasattr(mcp, "tools"):
                if tool_name in mcp.tools:
                    del mcp.tools[tool_name]

            # Remove from our generated tools dictionary
            if tool_name in generated_tools:
                del generated_tools[tool_name]

            logger.info(f"Unregistered tool {tool_name}")
        except Exception as e:
            logger.error(f"Failed to unregister tool {tool_name}: {str(e)}", exc_info=True)

    # Remove API from registered APIs
    del registered_apis[api_name]

    # Update state
    await set_state("registered_apis", registered_apis)
    await set_state("generated_tools", generated_tools)

    logger.success(f"API {api_name} unregistered with {len(tools)} tools")

    return {
        "success": True,
        "api_name": api_name,
        "tools_unregistered": tools,
        "tools_count": len(tools),
    }

@with_tool_metrics
@with_error_handling
@with_state_management(namespace="meta_api")
async def call_dynamic_tool(
    tool_name: str,
    inputs: Optional[Dict[str, Any]] = None,
    ctx: Optional[Dict[str, Any]] = None,
    get_state=None,
    set_state=None,
    delete_state=None
) -> Dict[str, Any]:
    """Calls a dynamically registered tool by name.

    This is a convenience function for calling tools registered via register_api,
    allowing direct invocation of API endpoints.

    Args:
        tool_name: Name of the tool to call
        inputs: Inputs to pass to the tool (parameters for the API endpoint)
        ctx: MCP context
        get_state: Function to get state from store (injected by decorator)
        set_state: Function to set state in store (injected by decorator)
        delete_state: Function to delete state from store (injected by decorator)

    Returns:
        The result of the tool call
    """
    # Get MCP server from context
    mcp = ctx.get('mcp')
    if not mcp:
        raise ToolError("MCP server context not available")

    # Get registered APIs and generated tools from state
    registered_apis = await get_state("registered_apis", {})
    generated_tools = await get_state("generated_tools", {})

    if not tool_name:
        raise ToolInputError("tool_name cannot be empty")

    # Check if tool exists
    if tool_name not in generated_tools:
        valid_tools = list(generated_tools.keys())
        raise ToolInputError(
            f"Tool {tool_name} not found. Valid tools: {', '.join(valid_tools[:10])}..."
            if len(valid_tools) > 10
            else f"Tool {tool_name} not found. Valid tools: {', '.join(valid_tools)}"
        )

    # Initialize inputs
    if inputs is None:
        inputs = {}

    # Find which API this tool belongs to
    api_name = None
    for name, info in registered_apis.items():
        if tool_name in info["tools"]:
            api_name = name
            break

    if not api_name:
        logger.warning(f"Could not determine which API {tool_name} belongs to")

    # Add auth_token to inputs if specified and the API has an auth_header
    api_info = registered_apis.get(api_name, {})
    if api_info.get("auth_header") and "auth_token" in ctx:
        inputs["auth_token"] = ctx["auth_token"]

    # Call the tool directly through MCP
    logger.info(f"Calling dynamic tool {tool_name} with inputs: {inputs}")
    start_time = time.time()

    # Prefer mcp.execute when the server exposes it; otherwise fall back to mcp.call_tool
    if hasattr(mcp, "execute"):
        result = await mcp.execute(tool_name, inputs)
    else:
        result = await mcp.call_tool(tool_name, inputs)

    processing_time = time.time() - start_time

    # Add metadata to result
    if isinstance(result, dict):
        result["processing_time"] = processing_time
        result["success"] = True
    else:
        result = {"data": result, "processing_time": processing_time, "success": True}

    logger.info(f"Called dynamic tool {tool_name} in {processing_time:.4f}s")
    return result
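
# Illustrative call (a sketch; the tool name and inputs are placeholders for tools created by register_api):
#
#     result = await call_dynamic_tool(
#         "petstore_get_user", inputs={"user_id": 42}, ctx={"mcp": mcp_server}
#     )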

@with_tool_metrics
@with_error_handling
@with_state_management(namespace="meta_api")
async def refresh_api(
    api_name: str,
    update_base_url: Optional[str] = None,
    timeout: float = 30.0,
    ctx: Optional[Dict[str, Any]] = None,
    get_state=None,
    set_state=None,
    delete_state=None
) -> Dict[str, Any]:
    """Refreshes an API by re-fetching its OpenAPI spec and updating tools.

    This is useful when the API has been updated with new endpoints or
    modifications to existing endpoints.

    Args:
        api_name: The name of the API to refresh
        update_base_url: Optional new base URL for the API
        timeout: Timeout for the HTTP request in seconds
        ctx: MCP context
        get_state: Function to get state from store (injected by decorator)
        set_state: Function to set state in store (injected by decorator)
        delete_state: Function to delete state from store (injected by decorator)

    Returns:
        A dictionary indicating the result:
        {
            "success": true,
            "api_name": "example_api",
            "tools_added": ["example_api_new_endpoint", ...],
            "tools_updated": ["example_api_modified_endpoint", ...],
            "tools_removed": ["example_api_deleted_endpoint", ...],
            "tools_count": 8
        }
    """
    # Get registered APIs from state
    registered_apis = await get_state("registered_apis", {})

    if api_name not in registered_apis:
        raise ToolInputError(f"API {api_name} not found")

    api_info = registered_apis[api_name]
    old_tools = set(api_info["tools"])

    # Determine if we need to update the base URL
    base_url = update_base_url or api_info["base_url"]

    # First, unregister the API
    await unregister_api(api_name, ctx=ctx, get_state=get_state, set_state=set_state, delete_state=delete_state)

    # Re-register with the same parameters but potentially updated base URL
    result = await register_api(
        api_name=api_name,
        openapi_url=api_info["openapi_url"],
        base_url=base_url,
        auth_header=api_info.get("auth_header"),
        tool_name_prefix=api_info["tool_name_prefix"],
        timeout=timeout,
        ctx=ctx,
        get_state=get_state,
        set_state=set_state,
        delete_state=delete_state
    )

    # Determine which tools were added, updated, or removed
    new_tools = set(result["tools_registered"])
    tools_added = list(new_tools - old_tools)
    tools_removed = list(old_tools - new_tools)
    tools_updated = list(new_tools.intersection(old_tools))

    logger.success(
        f"API {api_name} refreshed: "
        f"{len(tools_added)} added, {len(tools_removed)} removed, {len(tools_updated)} updated"
    )

    return {
        "success": True,
        "api_name": api_name,
        "tools_added": tools_added,
        "tools_updated": tools_updated,
        "tools_removed": tools_removed,
        "tools_count": len(new_tools),
    }

@with_tool_metrics
@with_error_handling
@with_state_management(namespace="meta_api")
async def get_tool_details(
    tool_name: str,
    ctx: Optional[Dict[str, Any]] = None,
    get_state=None,
    set_state=None,
    delete_state=None
) -> Dict[str, Any]:
    """Gets detailed information about a dynamically registered tool.

    Args:
        tool_name: Name of the tool to get details for
        ctx: MCP context
        get_state: Function to get state from store (injected by decorator)
        set_state: Function to set state in store (injected by decorator)
        delete_state: Function to delete state from store (injected by decorator)

    Returns:
        A dictionary containing the tool details:
        {
            "success": true,
            "tool_name": "example_api_get_users",
            "api_name": "example_api",
            "method": "get",
            "path": "/users",
            "summary": "Get all users",
            "description": "Returns a list of all users in the system",
            "parameters": [...],
            "source_code": "..."
        }
    """
    # Get registered APIs and generated tools from state
    registered_apis = await get_state("registered_apis", {})
    generated_tools = await get_state("generated_tools", {})

    if tool_name not in generated_tools:
        raise ToolInputError(f"Tool {tool_name} not found")

    # Find which API this tool belongs to
    api_name = None
    for name, info in registered_apis.items():
        if tool_name in info["tools"]:
            api_name = name
            break

    if not api_name:
        raise ToolError(f"Could not determine which API {tool_name} belongs to")

    api_info = registered_apis[api_name]

    # Find endpoint information in the API's endpoint list
    endpoint_info = None
    for endpoint in extract_endpoint_info(api_info["spec"]):
        if f"{api_info['tool_name_prefix']}_{endpoint['operation_id']}" == tool_name:
            endpoint_info = endpoint
            break

    if not endpoint_info:
        raise ToolError(f"Could not find endpoint information for tool {tool_name}")

    # Get the source code
    source_code = api_info.get("generated_code", {}).get(tool_name, "Source code not available")

    return {
        "success": True,
        "tool_name": tool_name,
        "api_name": api_name,
        "method": endpoint_info["method"],
        "path": endpoint_info["path"],
        "summary": endpoint_info["summary"],
        "description": endpoint_info["description"],
        "parameters": endpoint_info["parameters"],
        "tags": endpoint_info.get("tags", []),
        "source_code": source_code,
    }

@with_tool_metrics
@with_error_handling
@with_state_management(namespace="meta_api")
async def list_available_tools(
    include_source_code: bool = False,
    ctx: Optional[Dict[str, Any]] = None,
    get_state=None,
    set_state=None,
    delete_state=None
) -> Dict[str, Any]:
    """Lists all available tools registered via the API Meta-Tool.

    Args:
        include_source_code: Whether to include source code in the response
        ctx: MCP context
        get_state: Function to get state from store (injected by decorator)
        set_state: Function to set state in store (injected by decorator)
        delete_state: Function to delete state from store (injected by decorator)

    Returns:
        A dictionary containing the available tools:
        {
            "success": true,
            "tools": [
                {
                    "name": "example_api_get_users",
                    "api_name": "example_api",
                    "method": "get",
                    "path": "/users",
                    "summary": "Get all users",
                    "source_code": "..." # Only if include_source_code=True
                },
                ...
            ],
            "tools_count": 12
        }
    """
    # Get registered APIs from state
    registered_apis = await get_state("registered_apis", {})
    generated_tools = await get_state("generated_tools", {})

    tools = []

    for api_name, api_info in registered_apis.items():
        spec = api_info["spec"]
        endpoints = extract_endpoint_info(spec)

        for endpoint in endpoints:
            tool_name = f"{api_info['tool_name_prefix']}_{endpoint['operation_id']}"
            if tool_name in generated_tools:
                tool_info = {
                    "name": tool_name,
                    "api_name": api_name,
                    "method": endpoint["method"],
                    "path": endpoint["path"],
                    "summary": endpoint["summary"],
                }

                if include_source_code:
                    tool_info["source_code"] = api_info.get("generated_code", {}).get(
                        tool_name, "Source code not available"
                    )

                tools.append(tool_info)

    return {"success": True, "tools": tools, "tools_count": len(tools)}

# Now we have all our stateless functions defined:
# register_api, list_registered_apis, get_api_details, unregister_api
# call_dynamic_tool, refresh_api, get_tool_details, list_available_tools

def register_api_meta_tools(mcp_server):
    """Registers API Meta-Tool with the MCP server.

    Args:
        mcp_server: MCP server instance
    """
    # Register tools with MCP server
    mcp_server.tool(name="register_api")(register_api)
    mcp_server.tool(name="list_registered_apis")(list_registered_apis)
    mcp_server.tool(name="get_api_details")(get_api_details)
    mcp_server.tool(name="unregister_api")(unregister_api)
    mcp_server.tool(name="call_dynamic_tool")(call_dynamic_tool)
    mcp_server.tool(name="refresh_api")(refresh_api)
    mcp_server.tool(name="get_tool_details")(get_tool_details)
    mcp_server.tool(name="list_available_tools")(list_available_tools)

    logger.info("Registered API Meta-Tool functions")
    return None  # No need to return an instance anymore


# Example usage if this module is run directly
if __name__ == "__main__":
    import argparse
    import asyncio

    from ultimate_mcp_server import create_app

    async def main():
        # Parse command line arguments
        parser = argparse.ArgumentParser(description="API Meta-Tool for Ultimate MCP Server")
        parser.add_argument("--register", help="Register an API with the given name")
        parser.add_argument("--url", help="OpenAPI spec URL")
        parser.add_argument("--list", action="store_true", help="List registered APIs")
        parser.add_argument("--details", help="Get details for the given API")
        parser.add_argument("--unregister", help="Unregister the given API")
        parser.add_argument("--refresh", help="Refresh the given API")
        parser.add_argument("--base-url", help="Base URL for API requests")
        args = parser.parse_args()

        # Create MCP server
        create_app()
        # In FastMCP 2.0+, the MCP server is obtained from the module-level Gateway instance created by create_app()
        from ultimate_mcp_server.core import _gateway_instance
        mcp_server = _gateway_instance.mcp if _gateway_instance else None
        if not mcp_server:
            raise RuntimeError("Gateway instance not initialized or MCP server not available")

        # Register API Meta-Tool
        register_api_meta_tools(mcp_server)

        # Create context for stateless functions
        ctx = {"mcp": mcp_server}

        # Process commands
        if args.register and args.url:
            result = await register_api(
                api_name=args.register, 
                openapi_url=args.url, 
                base_url=args.base_url,
                ctx=ctx
            )
            print(f"Registered API {args.register} with {result['tools_count']} tools")
            print(f"Tools: {', '.join(result['tools_registered'])}")
        elif args.list:
            result = await list_registered_apis(ctx=ctx)
            print(f"Registered APIs ({result['total_apis']}):")
            for api_name, api_info in result["apis"].items():
                print(
                    f"- {api_name}: {api_info['tools_count']} tools, Base URL: {api_info['base_url']}"
                )
        elif args.details:
            result = await get_api_details(args.details, ctx=ctx)
            print(f"API {args.details} ({result['endpoints_count']} endpoints):")
            print(f"Base URL: {result['base_url']}")
            print(f"OpenAPI URL: {result['openapi_url']}")
            print("Endpoints:")
            for endpoint in result["tools"]:
                print(f"- {endpoint['method'].upper()} {endpoint['path']} ({endpoint['name']})")
                if endpoint["summary"]:
                    print(f"  Summary: {endpoint['summary']}")
        elif args.unregister:
            result = await unregister_api(args.unregister, ctx=ctx)
            print(f"Unregistered API {args.unregister} with {result['tools_count']} tools")
        elif args.refresh:
            result = await refresh_api(
                api_name=args.refresh, 
                update_base_url=args.base_url,
                ctx=ctx
            )
            print(
                f"Refreshed API {args.refresh}: {len(result['tools_added'])} added, {len(result['tools_removed'])} removed, {len(result['tools_updated'])} updated"
            )
        else:
            print("No action specified. Use --help for usage information.")

    # Run the main function
    asyncio.run(main())

```

--------------------------------------------------------------------------------
/examples/entity_relation_graph_demo.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Entity relationship graph extraction and visualization demo using Ultimate MCP Server (New Version)."""

import asyncio
import json
import os
import sys
import time
from enum import Enum
from pathlib import Path
from typing import Any, Dict, Optional

import networkx as nx
from rich import box
from rich.markup import escape
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.rule import Rule
from rich.syntax import Syntax
from rich.table import Table
from rich.tree import Tree

try:
    project_root = Path(__file__).resolve().parent.parent
    sys.path.insert(0, str(project_root))
    from ultimate_mcp_server.constants import Provider
    from ultimate_mcp_server.tools.entity_relation_graph import (
        COMMON_SCHEMAS,
        HAS_NETWORKX,
        HAS_VISUALIZATION_LIBS,
        GraphStrategy,
        OutputFormat,
        VisualizationFormat,
        extract_entity_graph,
    )
    from ultimate_mcp_server.utils import get_logger
    from ultimate_mcp_server.utils.logging.console import console
except ImportError as e:
    print(f"Error importing Ultimate MCP Server modules: {e}")
    print("Please ensure the script is run from the correct directory or the project path is set correctly.")
    sys.exit(1)

# Initialize logger
logger = get_logger("example.entity_graph") # Updated logger name

# Setup Directories
SCRIPT_DIR = Path(__file__).resolve().parent
SAMPLE_DIR = SCRIPT_DIR / "sample"
OUTPUT_DIR = SCRIPT_DIR / "output"
os.makedirs(OUTPUT_DIR, exist_ok=True)

class TextDomain(Enum):
    """Domain types for demonstration examples."""
    BUSINESS = "business"
    ACADEMIC = "academic"
    LEGAL = "legal"
    MEDICAL = "medical"
    GENERAL = "general" # Added for cases without a specific domain schema

# Console instances
main_console = console
# Keep detail_console if needed, but wasn't used in original provided script
# detail_console = Console(width=100, highlight=True)

def display_header(title: str) -> None:
    """Display a section header."""
    main_console.print()
    main_console.print(Rule(f"[bold blue]{title}[/bold blue]"))
    main_console.print()

def display_dataset_info(dataset_path: Path, title: str) -> None:
    """Display information about a dataset."""
    if not dataset_path.exists():
        main_console.print(f"[bold red]Error:[/bold red] Dataset file not found: {dataset_path}")
        return

    try:
        with open(dataset_path, "r", encoding="utf-8") as f:
            content = f.read()

        # Count entities/characters for display
        char_count = len(content)
        word_count = len(content.split())
        # Simple sentence count (approximate)
        sentence_count = content.count('.') + content.count('?') + content.count('!')
        if sentence_count == 0 and char_count > 0:
            sentence_count = 1 # At least one sentence if there's text

        # Preview of the content (first 300 chars)
        preview = escape(content[:300] + "..." if len(content) > 300 else content)

        main_console.print(Panel(
            f"[bold cyan]Dataset:[/bold cyan] {dataset_path.name}\n"
            f"[bold cyan]Size:[/bold cyan] {char_count:,} characters | {word_count:,} words | ~{sentence_count:,} sentences\n\n"
            f"[bold cyan]Preview:[/bold cyan]\n{preview}",
            title=title,
            border_style="cyan",
            expand=False
        ))
    except Exception as e:
        main_console.print(f"[bold red]Error reading dataset file {dataset_path.name}:[/bold red] {e}")


def display_extraction_params(params: Dict[str, Any]) -> None:
    """Display extraction parameters passed to the tool."""
    param_table = Table(title="Extraction Parameters", box=box.ROUNDED, show_header=True, header_style="bold magenta")
    param_table.add_column("Parameter", style="cyan", no_wrap=True)
    param_table.add_column("Value", style="green")

    # Filter parameters to display relevant ones
    display_keys = [
        "provider", "model", "strategy", "domain", "output_format", "visualization_format",
        "include_evidence", "include_attributes", "include_positions", "include_temporal_info",
        "normalize_entities", "max_entities", "max_relations", "min_confidence", "enable_reasoning",
        "language" # Added language if used
    ]

    for key in display_keys:
        if key in params:
            value = params[key]
            # Format enums and lists nicely
            if isinstance(value, Enum):
                value_str = value.value
            elif isinstance(value, list):
                value_str = escape(", ".join(str(v) for v in value)) if value else "[dim italic]Empty List[/dim italic]"
            elif isinstance(value, bool):
                value_str = "[green]Yes[/green]" if value else "[red]No[/red]"
            elif value is None:
                value_str = "[dim italic]None[/dim italic]"
            else:
                value_str = escape(str(value))

            param_table.add_row(key, value_str)

    main_console.print(param_table)
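
# The display helpers below assume an extraction result shaped roughly like this
# (a trimmed, hypothetical example of what extract_entity_graph returns):
#
#     result = {
#         "entities": [
#             {"id": "ent_1", "name": "Acme Corp", "type": "Organization",
#              "attributes": {"industry": "software"}, "mentions": [...]},
#         ],
#         "relationships": [
#             {"source": "ent_1", "target": "ent_2", "type": "employs",
#              "confidence": 0.9, "evidence": "..."},
#         ],
#         "metadata": {"entity_count": 2, "relationship_count": 1,
#                      "entity_types": ["Organization", "Person"],
#                      "relation_types": ["employs"], "metrics": {...}},
#     }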

def display_entity_stats(result: Dict[str, Any]) -> None:
    """Display statistics about extracted entities and relationships."""
    metadata = result.get("metadata", {})
    entities = result.get("entities", []) # Get entities directly for type counting if metadata missing
    relationships = result.get("relationships", [])

    entity_count = metadata.get("entity_count", len(entities))
    relationship_count = metadata.get("relationship_count", len(relationships))

    if entity_count == 0:
        main_console.print("[yellow]No entities found in extraction result.[/yellow]")
        return

    # Use metadata if available, otherwise count manually
    entity_types_meta = metadata.get("entity_types")
    rel_types_meta = metadata.get("relation_types")

    entity_type_counts = {}
    if entity_types_meta:
        for etype in entity_types_meta:
            # Count occurrences in the actual entity list for accuracy
            entity_type_counts[etype] = sum(1 for e in entities if e.get("type") == etype)
    else: # Fallback if metadata key is missing
        for entity in entities:
            ent_type = entity.get("type", "Unknown")
            entity_type_counts[ent_type] = entity_type_counts.get(ent_type, 0) + 1

    rel_type_counts = {}
    if rel_types_meta:
        for rtype in rel_types_meta:
            rel_type_counts[rtype] = sum(1 for r in relationships if r.get("type") == rtype)
    else: # Fallback
        for rel in relationships:
            rel_type = rel.get("type", "Unknown")
            rel_type_counts[rel_type] = rel_type_counts.get(rel_type, 0) + 1

    # Create entity stats table
    stats_table = Table(title="Extraction Statistics", box=box.ROUNDED, show_header=True, header_style="bold blue")
    stats_table.add_column("Metric", style="cyan")
    stats_table.add_column("Count", style="green", justify="right")

    stats_table.add_row("Total Entities", str(entity_count))
    stats_table.add_row("Total Relationships", str(relationship_count))

    # Add entity type counts
    stats_table.add_section()
    for ent_type, count in sorted(entity_type_counts.items(), key=lambda x: x[1], reverse=True):
        stats_table.add_row(f"Entity Type: [italic]{escape(ent_type)}[/italic]", str(count))

    # Add relationship type counts (top 5)
    if rel_type_counts:
        stats_table.add_section()
        for rel_type, count in sorted(rel_type_counts.items(), key=lambda x: x[1], reverse=True)[:5]:
            stats_table.add_row(f"Relationship Type: [italic]{escape(rel_type)}[/italic]", str(count))
        if len(rel_type_counts) > 5:
            stats_table.add_row("[dim]... (other relationship types)[/dim]", "")


    main_console.print(stats_table)


def display_graph_metrics(result: Dict[str, Any]) -> None:
    """Display graph metrics if available in metadata."""
    # Metrics are now nested under metadata
    metrics = result.get("metadata", {}).get("metrics", {})
    if not metrics:
        # Check the top level as a fallback for older structure compatibility if needed
        metrics = result.get("metrics", {})
        if not metrics:
            main_console.print("[dim]No graph metrics calculated (requires networkx).[/dim]")
            return

    metrics_table = Table(title="Graph Metrics", box=box.ROUNDED, show_header=True, header_style="bold blue")
    metrics_table.add_column("Metric", style="cyan")
    metrics_table.add_column("Value", style="green", justify="right")

    # Add metrics to table
    for key, value in metrics.items():
        if isinstance(value, (int, float)):
            # Improved formatting
            if isinstance(value, float):
                if 0.0001 < abs(value) < 10000:
                    formatted_value = f"{value:.4f}"
                else:
                    formatted_value = f"{value:.3e}" # Scientific notation for very small/large
            else:
                formatted_value = f"{value:,}"  # Add commas for integers
            metrics_table.add_row(key.replace("_", " ").title(), formatted_value)
        elif value is not None:
            metrics_table.add_row(key.replace("_", " ").title(), escape(str(value)))


    main_console.print(metrics_table)


def display_entities_table(result: Dict[str, Any], limit: int = 10) -> None:
    """Display extracted entities in a table, sorted appropriately."""
    entities = result.get("entities", [])
    if not entities:
        main_console.print("[yellow]No entities found to display.[/yellow]")
        return

    # Sorting based on available metrics from _add_graph_metrics
    # The new tool adds 'degree' and 'centrality'
    sort_key = "name" # Default sort
    if entities and isinstance(entities[0], dict):
        if "centrality" in entities[0] and any(e.get("centrality", 0) > 0 for e in entities):
            entities.sort(key=lambda x: x.get("centrality", 0.0), reverse=True)
            sort_key = "centrality"
        elif "degree" in entities[0] and any(e.get("degree", 0) > 0 for e in entities):
            entities.sort(key=lambda x: x.get("degree", 0.0), reverse=True)
            sort_key = "degree"
        elif "mentions" in entities[0] and any(e.get("mentions") for e in entities):
            entities.sort(key=lambda x: len(x.get("mentions", [])), reverse=True)
            sort_key = "mentions"
        else:
            entities.sort(key=lambda x: x.get("name", "").lower())  # Fallback to name


    # Limit to top entities
    display_entities = entities[:limit]

    title = f"Top {limit} Entities"
    if sort_key != "name":
        title += f" (Sorted by {sort_key.capitalize()})"

    entity_table = Table(title=title, box=box.ROUNDED, show_header=True, header_style="bold blue")
    entity_table.add_column("ID", style="dim", width=8)
    entity_table.add_column("Name", style="cyan", max_width=40)
    entity_table.add_column("Type", style="green", max_width=20)

    # Add columns for additional information if available
    has_degree = any(e.get("degree", 0) > 0 for e in display_entities)
    has_centrality = any(e.get("centrality", 0) > 0 for e in display_entities)
    has_mentions = any(e.get("mentions") for e in display_entities)
    has_attributes = any(e.get("attributes") for e in display_entities)

    if has_degree:
        entity_table.add_column("Degree", style="magenta", justify="right", width=8)
    if has_centrality:
        entity_table.add_column("Centrality", style="magenta", justify="right", width=10)
    if has_mentions:
        entity_table.add_column("Mentions", style="yellow", justify="right", width=8)
    if has_attributes:
        entity_table.add_column("Attributes", style="blue", max_width=50)

    # Add rows for each entity
    for entity in display_entities:
        row = [
            escape(entity.get("id", "")),
            escape(entity.get("name", "")),
            escape(entity.get("type", "Unknown"))
        ]

        if has_degree:
            degree = entity.get("degree", 0.0)
            row.append(f"{degree:.3f}")
        if has_centrality:
            centrality = entity.get("centrality", 0.0)
            row.append(f"{centrality:.4f}")
        if has_mentions:
            mentions_count = len(entity.get("mentions", []))
            row.append(str(mentions_count))
        if has_attributes:
            attributes = entity.get("attributes", {})
            # Format attributes more readably
            attr_str = "; ".join(f"{k}={v}" for k, v in attributes.items() if v) # Ignore empty values
            row.append(escape(attr_str[:45] + ("..." if len(attr_str) > 45 else "")))

        entity_table.add_row(*row)

    main_console.print(entity_table)

    if len(entities) > limit:
        main_console.print(f"[dim italic]...and {len(entities) - limit} more entities[/dim italic]")


def display_relationships_table(result: Dict[str, Any], limit: int = 10) -> None:
    """Display extracted relationships in a table, sorted by confidence."""
    relationships = result.get("relationships", [])
    # Create entity map for quick name lookups
    entity_map = {entity["id"]: entity for entity in result.get("entities", []) if isinstance(entity, dict) and "id" in entity}

    if not relationships:
        main_console.print("[yellow]No relationships found to display.[/yellow]")
        return

    # Sort relationships by confidence (new tool ensures confidence exists)
    relationships.sort(key=lambda x: x.get("confidence", 0.0), reverse=True)

    # Limit to top relationships
    display_relationships = relationships[:limit]

    rel_table = Table(title=f"Top {limit} Relationships (Sorted by Confidence)", box=box.ROUNDED, show_header=True, header_style="bold blue")
    rel_table.add_column("Source", style="cyan", max_width=30)
    rel_table.add_column("Type", style="green", max_width=25)
    rel_table.add_column("Target", style="cyan", max_width=30)
    rel_table.add_column("Conf.", style="magenta", justify="right", width=6)

    # Check if we have evidence or temporal info
    has_evidence = any(r.get("evidence") for r in display_relationships)
    has_temporal = any(r.get("temporal") for r in display_relationships)

    if has_evidence:
        rel_table.add_column("Evidence", style="yellow", max_width=40)
    if has_temporal:
        rel_table.add_column("Temporal", style="blue", max_width=20)


    # Add rows for each relationship
    for rel in display_relationships:
        source_id = rel.get("source", "")
        target_id = rel.get("target", "")

        # Get entity names if available, fallback to ID
        source_name = entity_map.get(source_id, {}).get("name", source_id)
        target_name = entity_map.get(target_id, {}).get("name", target_id)

        row = [
            escape(source_name),
            escape(rel.get("type", "Unknown")),
            escape(target_name),
            f"{rel.get('confidence', 0.0):.2f}",
        ]

        if has_evidence:
            evidence = rel.get("evidence", "")
            row.append(escape(evidence[:35] + ("..." if len(evidence) > 35 else "")))
        if has_temporal:
            temporal = rel.get("temporal", {})
            temp_str = "; ".join(f"{k}={v}" for k, v in temporal.items())
            row.append(escape(temp_str[:18] + ("..." if len(temp_str) > 18 else "")))


        rel_table.add_row(*row)

    main_console.print(rel_table)

    if len(relationships) > limit:
        main_console.print(f"[dim italic]...and {len(relationships) - limit} more relationships[/dim italic]")


def display_entity_graph_tree(result: Dict[str, Any], max_depth: int = 2, max_children: int = 5) -> None:
    """Display a tree representation of the entity graph, starting from the most central node."""
    entities = result.get("entities", [])
    relationships = result.get("relationships", [])
    entity_map = {entity["id"]: entity for entity in entities if isinstance(entity, dict) and "id" in entity}


    if not entities or not relationships or not HAS_NETWORKX: # Tree view less useful without sorting/structure
        if not HAS_NETWORKX:
            main_console.print("[yellow]Cannot display graph tree: NetworkX library not available for centrality sorting.[/yellow]")
        else:
            main_console.print("[yellow]Cannot display graph tree: insufficient data.[/yellow]")
        return

    # Sort entities by centrality (assuming metrics were calculated)
    entities.sort(key=lambda x: x.get("centrality", 0.0), reverse=True)

    # Get most central entity as root
    if not entities:
        return # Should not happen if check above passed, but safety
    root_entity = entities[0]
    root_id = root_entity["id"]

    # Create rich Tree
    tree = Tree(
        f"[bold cyan]{escape(root_entity.get('name', root_id))}[/bold cyan] "
        f"([dim italic]ID: {escape(root_id)}, Type: {escape(root_entity.get('type', 'Unknown'))}[/dim italic])"
    )

    # Keep track of edges explored to represent the tree structure
    explored_edges = set()

    # Build the tree level by level with an iterative BFS
    queue = [(root_id, tree, 0)] # (entity_id, parent_tree_node, current_depth)
    visited_nodes_in_tree = {root_id} # Prevent cycles *within this tree rendering*
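    # Each queue entry pairs an entity ID with the Rich tree node that should receive its
    # children, plus the current depth; max_depth bounds the traversal and
    # visited_nodes_in_tree keeps the rendering acyclic even when the underlying graph is not.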

    while queue:
        current_id, parent_node, depth = queue.pop(0)

        if depth >= max_depth:
            continue

        # Find outgoing relationships
        outgoing_rels = [
            r for r in relationships
            if r.get("source") == current_id
            and (current_id, r.get("target"), r.get("type")) not in explored_edges
        ]
        outgoing_rels.sort(key=lambda x: x.get("confidence", 0.0), reverse=True)

        # Find incoming relationships (only rendered if the optional incoming block below is enabled)
        incoming_rels = [
            r for r in relationships
            if r.get("target") == current_id
            and (r.get("source"), current_id, r.get("type")) not in explored_edges
        ]
        incoming_rels.sort(key=lambda x: x.get("confidence", 0.0), reverse=True)


        # Add outgoing children
        children_count = 0
        for rel in outgoing_rels:
            if children_count >= max_children:
                parent_node.add("[dim italic]... (more outgoing)[/dim]")
                break

            target_id = rel.get("target")
            if target_id and target_id not in visited_nodes_in_tree:  # Avoid cycles in display
                target_entity = entity_map.get(target_id)
                if target_entity:
                    edge_sig = (current_id, target_id, rel.get("type"))
                    explored_edges.add(edge_sig)
                    visited_nodes_in_tree.add(target_id)

                    rel_type = escape(rel.get("type", "related to"))
                    conf = rel.get("confidence", 0.0)
                    target_name = escape(target_entity.get("name", target_id))
                    target_type = escape(target_entity.get("type", "Unknown"))

                    branch_text = (
                        f"-[[green]{rel_type}[/green] ({conf:.1f})]-> "
                        f"[cyan]{target_name}[/cyan] ([dim italic]{target_type}[/dim italic])"
                    )
                    branch = parent_node.add(branch_text)
                    queue.append((target_id, branch, depth + 1))
                    children_count += 1


        # Add incoming children (optional, can make tree busy)
        # Comment out this block if you only want outgoing relationships in the tree
        # children_count = 0
        # for rel in incoming_rels:
        #     if children_count >= max_children // 2: # Show fewer incoming
        #         parent_node.add("[dim italic]... (more incoming)[/dim]")
        #         break
        #     source_id = rel.get("source")
        #     if source_id and source_id not in visited_nodes_in_tree:
        #          source_entity = entity_map.get(source_id)
        #          if source_entity:
        #              edge_sig = (source_id, current_id, rel.get("type"))
        #              explored_edges.add(edge_sig)
        #              visited_nodes_in_tree.add(source_id)
        #
        #              rel_type = escape(rel.get("type", "related to"))
        #              conf = rel.get("confidence", 0.0)
        #              source_name = escape(source_entity.get("name", source_id))
        #              source_type = escape(source_entity.get("type", "Unknown"))
        #
        #              branch_text = (
        #                  f"<-[[red]{rel_type}[/red] ({conf:.1f})]- "
        #                  f"[magenta]{source_name}[/magenta] ([dim italic]{source_type}[/dim italic])"
        #              )
        #              branch = parent_node.add(branch_text)
        #              queue.append((source_id, branch, depth + 1))
        #              children_count += 1


    main_console.print(Panel(tree, title=f"Entity Graph Tree View (Root: {escape(root_entity.get('name', ''))})", border_style="blue"))


def display_extraction_summary(result: Dict[str, Any]) -> None:
    """Display a summary of the extraction performance and cost."""
    metadata = result.get("metadata", {})
    provider = result.get("provider", "Unknown")
    model = result.get("model", "Unknown")
    tokens = result.get("tokens", {})
    cost = result.get("cost", 0.0) # Cost is now float
    processing_time = result.get("processing_time", 0.0) # Time is now float
    strategy = metadata.get("processing_strategy", "Unknown")
    schema_used = metadata.get("schema_used", "Unknown")


    summary_table = Table(box=box.ROUNDED, show_header=False, title="Extraction Summary")
    summary_table.add_column("Metric", style="cyan", no_wrap=True)
    summary_table.add_column("Value", style="green")

    summary_table.add_row("Provider", escape(provider))
    summary_table.add_row("Model", escape(model))
    summary_table.add_row("Strategy", escape(strategy))
    summary_table.add_row("Schema Used", escape(schema_used))
    summary_table.add_row("Input Tokens", f"{tokens.get('input', 0):,}")
    summary_table.add_row("Output Tokens", f"{tokens.get('output', 0):,}")
    summary_table.add_row("Total Tokens", f"{tokens.get('total', 0):,}")
    summary_table.add_row("Cost", f"${cost:.6f}")
    summary_table.add_row("Processing Time", f"{processing_time:.2f} seconds")

    main_console.print(summary_table)


def save_visualization(result: Dict[str, Any], domain: str, strategy: str, output_dir: Path) -> Optional[str]:
    """Save visualization file based on the format present in the result."""
    visualization = result.get("visualization") # Visualization data is now under this key
    if not visualization or not isinstance(visualization, dict):
        main_console.print("[dim]No visualization data found in the result.[/dim]")
        return None

    content = None
    extension = None
    file_path = None

    # Check for different visualization formats
    if "html" in visualization:
        content = visualization["html"]
        extension = "html"
    elif "svg" in visualization:
        content = visualization["svg"]
        extension = "svg"
    elif "png_url" in visualization: # Assuming PNG might save file directly and return URL
        file_path = visualization["png_url"].replace("file://", "")
        extension = "png"
    elif "dot" in visualization:
        content = visualization["dot"]
        extension = "dot"

    if file_path:  # A file path was returned directly (e.g., for PNG)
        if Path(file_path).exists():
            return file_path
        else:
            main_console.print(f"[red]Visualization file path provided but not found: {file_path}[/red]")
            return None

    if content and extension:
        timestamp = int(time.time())
        # Sanitize domain and strategy for filename
        safe_domain = domain.replace(" ", "_").lower()
        safe_strategy = strategy.replace(" ", "_").lower()
        output_path = output_dir / f"graph_{safe_domain}_{safe_strategy}_{timestamp}.{extension}"
        try:
            with open(output_path, "w", encoding="utf-8") as f:
                f.write(content)
            return str(output_path)
        except Exception as e:
            main_console.print(f"[bold red]Error saving visualization file {output_path}:[/bold red] {e}")
            return None
    elif "error" in visualization:
         main_console.print(f"[yellow]Visualization generation failed:[/yellow] {visualization['error']}")
         return None
    else:
        main_console.print("[dim]Unsupported or missing visualization format in result.[/dim]")
        return None


async def run_entity_extraction(
    text: str,
    domain: TextDomain,
    strategy: GraphStrategy,
    model: str,
    output_format: OutputFormat = OutputFormat.JSON,
    visualization_format: VisualizationFormat = VisualizationFormat.HTML, # Keep HTML for demo vis
    provider: str = Provider.ANTHROPIC.value # Example provider
) -> Optional[Dict[str, Any]]:
    """Run entity graph extraction with progress indicator and display params."""
    # Setup extraction parameters
    params = {
        "text": text,
        "provider": provider, # Pass provider name string
        "model": model,
        "strategy": strategy, # Pass enum directly
        "output_format": output_format, # Pass enum directly
        "visualization_format": visualization_format, # Pass enum directly
        # --- Include Flags (consider new defaults) ---
        "include_evidence": True, # Explicitly keep True for demo
        "include_attributes": True, # Explicitly keep True for demo
        "include_positions": False, # Change to False to match new default (saves tokens)
        "include_temporal_info": True, # Explicitly keep True for demo
        # --- Control Flags ---
        "normalize_entities": True, # Keep True (new default)
        "enable_reasoning": False, # Keep False for demo speed (can be enabled)
        # --- Limits ---
        "max_entities": 75,  # Adjusted limits for demo
        "max_relations": 150,
        "min_confidence": 0.55, # Slightly higher confidence
        # --- Optional ---
        "language": None, # Specify if needed, e.g., "Spanish"
        "domain": None, # Set below if applicable
        #"custom_prompt": None, # Add if testing custom prompts
        #"system_prompt": None, # Add if testing system prompts
        #"additional_params": {"temperature": 0.2} # Example
    }

    # Add domain value string if applicable and not GENERAL
    if domain != TextDomain.GENERAL and domain.value in COMMON_SCHEMAS:
        params["domain"] = domain.value

    # Display parameters being used
    display_extraction_params(params)

    # Run extraction with progress spinner
    result = None
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=main_console,
        transient=False # Keep progress visible after completion
    ) as progress:
        task_desc = f"Extracting graph ({domain.value}/{strategy.value}/{model})..."
        task = progress.add_task(task_desc, total=None)

        try:
            start_time = time.monotonic()
            # Pass enums directly now, the tool handles conversion if needed
            result = await extract_entity_graph(**params) # type: ignore
            end_time = time.monotonic()
            duration = end_time - start_time
            progress.update(task, completed=100, description=f"[green]Extraction complete ({duration:.2f}s)[/green]")
            return result
        except Exception as e:
            logger.error(f"Extraction failed during run_entity_extraction: {e}", exc_info=True)
            progress.update(task, completed=100, description=f"[bold red]Extraction failed: {escape(str(e))}[/bold red]")
            # Optionally re-raise or just return None
            # raise # Uncomment to stop the demo on failure
            return None # Allow demo to continue
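
# A minimal sketch of calling run_entity_extraction() directly, outside the demo flow
# (illustrative only; assumes the sample file exists and the provider API key is configured):
#
#   text = (SAMPLE_DIR / "article.txt").read_text(encoding="utf-8")
#   result = asyncio.run(run_entity_extraction(
#       text=text,
#       domain=TextDomain.BUSINESS,
#       strategy=GraphStrategy.STANDARD,
#       model="gpt-4.1-mini",
#       provider=Provider.OPENAI.value,
#   ))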


# --- Demonstration Functions (Updated calls to run_entity_extraction) ---

async def demonstrate_domain_extraction(
        domain: TextDomain,
        sample_file: str,
        strategy: GraphStrategy,
        model: str = "claude-3-5-haiku-20241022", # Default model for demos
        provider: str = Provider.ANTHROPIC.value
    ):
    """Helper function to demonstrate extraction for a specific domain."""
    domain_name = domain.value.capitalize()
    display_header(f"{domain_name} Domain Entity Graph Extraction ({strategy.value} strategy)")

    sample_path = SAMPLE_DIR / sample_file
    display_dataset_info(sample_path, f"{domain_name} Sample Text")

    if not sample_path.exists():
        return # Skip if file missing

    with open(sample_path, "r", encoding="utf-8") as f:
        text_content = f.read()

    try:
        result = await run_entity_extraction(
            text=text_content,
            domain=domain,
            strategy=strategy,
            model=model,
            provider=provider,
            visualization_format=VisualizationFormat.HTML # Request HTML for viewing
        )

        if result and result.get("success", False):
            # Display results
            display_entity_stats(result)
            if HAS_NETWORKX:  # Only display metrics if networkx is installed
                display_graph_metrics(result)
            display_entities_table(result)
            display_relationships_table(result)
            if HAS_NETWORKX:  # Tree view also requires networkx
                display_entity_graph_tree(result)

            # Save visualization if available
            vis_path = save_visualization(result, domain.value, strategy.value, OUTPUT_DIR)
            if vis_path:
                main_console.print(f"\n[green]✓[/green] Visualization saved to: [blue link=file://{vis_path}]{vis_path}[/blue]")
            elif "visualization" in result and "error" in result["visualization"]:
                 main_console.print(f"[yellow]Visualization generation failed: {result['visualization']['error']}[/yellow]")


            # Display summary
            display_extraction_summary(result)
        elif result:
            main_console.print(f"[bold red]Extraction reported failure:[/bold red] {result.get('error', 'Unknown error')}")
        # If result is None, run_entity_extraction already printed the error

    except Exception as e:
        # Catch errors not caught by run_entity_extraction's try/except
        main_console.print(f"[bold red]Error during {domain_name} demonstration:[/bold red] {escape(str(e))}")
        logger.error(f"Unhandled error in {domain_name} demo: {e}", exc_info=True)


async def demonstrate_strategy_comparison():
    """Compare different extraction strategies on the same text."""
    display_header("Strategy Comparison")

    # Load business article for comparison
    comparison_file = "article.txt"
    comparison_path = SAMPLE_DIR / comparison_file
    display_dataset_info(comparison_path, f"{comparison_file} (For Strategy Comparison)")

    if not comparison_path.exists():
        return

    with open(comparison_path, "r", encoding="utf-8") as f:
        comparison_text = f.read()

    # Define strategies to compare
    strategies_to_compare = [
        (GraphStrategy.STANDARD, "Standard"),
        (GraphStrategy.MULTISTAGE, "Multistage"),
        (GraphStrategy.CHUNKED, "Chunked"), # Will process full text if short, or chunk if long
        (GraphStrategy.STRUCTURED, "Structured"), # Needs examples from domain
        (GraphStrategy.STRICT_SCHEMA, "Strict Schema") # Needs domain
    ]
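    # Note: the structured and strict-schema strategies lean on domain examples/schemas,
    # which is why the business domain (which provides both) is used for this comparison.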

    # Setup comparison table
    comparison_table = Table(title="Strategy Comparison Results", box=box.ROUNDED, show_header=True, header_style="bold magenta")
    comparison_table.add_column("Strategy", style="cyan")
    comparison_table.add_column("Entities", style="green", justify="right")
    comparison_table.add_column("Rels", style="green", justify="right")
    comparison_table.add_column("Time (s)", style="yellow", justify="right")
    comparison_table.add_column("Tokens", style="magenta", justify="right")
    comparison_table.add_column("Cost ($)", style="blue", justify="right")

    # Use a slightly smaller model for faster comparison if needed
    comparison_model = "gpt-4.1-mini"
    comparison_provider = Provider.OPENAI.value
    comparison_domain = TextDomain.BUSINESS

    # Compare each strategy
    for strategy, desc in strategies_to_compare:
        main_console.print(f"\n[bold underline]Running {desc} Strategy[/bold underline]")

        # Use the full text for every strategy so the chunked strategy's behavior is visible.
        text_to_use = comparison_text

        result = None # Define result outside try block
        try:
            result = await run_entity_extraction(
                text=text_to_use,
                domain=comparison_domain, # Business domain has examples/schema
                strategy=strategy,
                model=comparison_model,
                provider=comparison_provider,
                visualization_format=VisualizationFormat.NONE # Skip visualization for comparison
            )

            if result and result.get("success", False):
                # Extract metrics for comparison
                entity_count = result.get("metadata", {}).get("entity_count", 0)
                rel_count = result.get("metadata", {}).get("relationship_count", 0)
                processing_time = result.get("processing_time", 0.0)
                token_count = result.get("tokens", {}).get("total", 0)
                cost = result.get("cost", 0.0)

                # Add to comparison table
                comparison_table.add_row(
                    desc,
                    str(entity_count),
                    str(rel_count),
                    f"{processing_time:.2f}",
                    f"{token_count:,}",
                    f"{cost:.6f}"
                )
                # Display brief stats for this strategy
                display_entity_stats(result)
            else:
                error_msg = result.get("error", "Extraction failed") if result else "Extraction returned None"
                main_console.print(f"[bold red]Error with {desc} strategy:[/bold red] {escape(error_msg)}")
                comparison_table.add_row(desc, "[red]ERR[/red]", "[red]ERR[/red]", "N/A", "N/A", "N/A")


        except Exception as e:
            logger.error(f"Unhandled error comparing strategy {desc}: {e}", exc_info=True)
            main_console.print(f"[bold red]Unhandled Error with {desc} strategy:[/bold red] {escape(str(e))}")
            comparison_table.add_row(desc, "[red]CRASH[/red]", "[red]CRASH[/red]", "N/A", "N/A", "N/A")


    # Display final comparison table
    main_console.print(Rule("Comparison Summary"))
    main_console.print(comparison_table)


async def demonstrate_output_formats():
    """Demonstrate different output formats using a sample text."""
    display_header("Output Format Demonstration")

    # Load academic paper for output format demo
    format_file = "research_paper.txt"
    format_path = SAMPLE_DIR / format_file
    display_dataset_info(format_path, f"{format_file} (For Output Formats)")

    if not format_path.exists():
        return

    with open(format_path, "r", encoding="utf-8") as f:
        format_text = f.read()

    # Define output formats to demonstrate
    # Exclude NetworkX if library not installed
    formats_to_demonstrate = [
        (OutputFormat.JSON, "Standard JSON"),
        (OutputFormat.CYTOSCAPE, "Cytoscape.js"),
        (OutputFormat.NEO4J, "Neo4j Cypher"),
        (OutputFormat.RDF, "RDF Triples"),
        (OutputFormat.D3, "D3.js nodes/links"),
    ]
    if HAS_NETWORKX:
        formats_to_demonstrate.insert(1, (OutputFormat.NETWORKX, "NetworkX Object"))


    main_console.print("[bold yellow]Note:[/bold yellow] This demonstrates how extracted data can be formatted.")

    # Use a short excerpt for speed
    text_excerpt = format_text[:2000]
    base_model = "gpt-4.1-mini" # Faster model for formats
    base_provider = Provider.OPENAI.value
    base_domain = TextDomain.ACADEMIC

    # Extract with each output format
    for fmt, desc in formats_to_demonstrate:
        main_console.print(f"\n[bold underline]Demonstrating {desc} Output Format[/bold underline]")

        result = None # Define outside try block
        try:
            result = await run_entity_extraction(
                text=text_excerpt,
                domain=base_domain,
                strategy=GraphStrategy.STANDARD, # Use standard strategy
                model=base_model,
                provider=base_provider,
                output_format=fmt, # Specify the output format
                visualization_format=VisualizationFormat.NONE # No viz needed here
            )

            if not result or not result.get("success", False):
                 error_msg = result.get("error", "Extraction failed") if result else "Extraction returned None"
                 main_console.print(f"[bold red]Error extracting data for {desc} format:[/bold red] {escape(error_msg)}")
                 continue # Skip displaying output for this format


            # Display format-specific output key (by default the key matches the enum value)
            output_key_map = {
                OutputFormat.NETWORKX: "graph",
                OutputFormat.NEO4J: "neo4j_queries",
                OutputFormat.RDF: "rdf_triples",
                OutputFormat.D3: "d3",
                OutputFormat.CYTOSCAPE: "cytoscape",
            }
            output_key = output_key_map.get(fmt, fmt.value)

            if output_key in result:
                data_to_display = result[output_key]
                display_title = f"Sample of {desc} Output (`{output_key}` key)"

                if fmt == OutputFormat.JSON:
                    # Display a subset of the standard JSON keys
                    json_subset = {
                        "entities": result.get("entities", [])[:2],
                        "relationships": result.get("relationships", [])[:2],
                        "metadata": {k:v for k,v in result.get("metadata",{}).items() if k in ["entity_count", "relationship_count","processing_strategy"]}
                    }
                    output_content = Syntax(json.dumps(json_subset, indent=2), "json", theme="default", line_numbers=True, word_wrap=True)
                elif fmt == OutputFormat.NETWORKX:
                    graph_obj = result["graph"]
                    info = (
                         f"[green]✓[/green] NetworkX graph object created: {isinstance(graph_obj, nx.DiGraph)}\n"
                         f"Nodes: {graph_obj.number_of_nodes()}, Edges: {graph_obj.number_of_edges()}\n\n"
                         "[italic]Allows graph algorithms (centrality, paths, etc.)[/italic]"
                    )
                    output_content = info # Simple text panel
                elif fmt == OutputFormat.CYTOSCAPE:
                    sample = {
                        "nodes": data_to_display.get("nodes", [])[:2],
                        "edges": data_to_display.get("edges", [])[:2]
                    }
                    output_content = Syntax(json.dumps(sample, indent=2), "json", theme="default", line_numbers=True, word_wrap=True)
                elif fmt == OutputFormat.NEO4J:
                    queries = data_to_display
                    sample_queries = queries[:3]  # Show first few queries
                    output_content = Syntax("\n\n".join(sample_queries) + "\n...", "cypher", theme="default", line_numbers=True, word_wrap=True)
                elif fmt == OutputFormat.RDF:
                    triples = data_to_display
                    sample_triples = ['("{}", "{}", "{}")'.format(*t) for t in triples[:5]]  # Format first few triples
                    output_content = Syntax("\n".join(sample_triples) + "\n...", "turtle", theme="default", line_numbers=True, word_wrap=True)  # Turtle highlighting is an approximation here
                elif fmt == OutputFormat.D3:
                    sample = {
                        "nodes": data_to_display.get("nodes", [])[:2],
                        "links": data_to_display.get("links", [])[:2]
                    }
                    output_content = Syntax(json.dumps(sample, indent=2), "json", theme="default", line_numbers=True, word_wrap=True)
                else:
                    # Fallback for unexpected formats
                    output_content = escape(str(data_to_display)[:500] + "...")


                main_console.print(Panel(
                    output_content,
                    title=display_title,
                    border_style="green",
                    expand=False
                ))

            else:
                main_console.print(f"[yellow]Output key '{output_key}' not found in result for {desc} format.[/yellow]")


        except Exception as e:
            logger.error(f"Unhandled error demonstrating format {desc}: {e}", exc_info=True)
            main_console.print(f"[bold red]Unhandled Error with {desc} format:[/bold red] {escape(str(e))}")


async def main():
    """Run entity relation graph extraction demonstrations."""
    try:
        # Display welcome message
        main_console.print(Rule("[bold magenta]Entity Relationship Graph Extraction Demo (v2 Tool)[/bold magenta]"))
        main_console.print(
            "[bold]This demonstrates the refactored `entity_graph` tool for extracting and visualizing "
            "knowledge graphs from text across different domains and using various strategies.[/bold]\n"
        )

        # Check for dependencies needed by the demo display itself
        if not HAS_VISUALIZATION_LIBS:
            main_console.print("[yellow]Warning:[/yellow] `networkx`, `pyvis`, or `matplotlib` not installed.")
            main_console.print("Graph metrics, tree view, and some visualizations may be unavailable.")
        if not HAS_NETWORKX:
            main_console.print("[yellow]Warning:[/yellow] `networkx` not installed. Graph metrics and tree view disabled.")


        # Initialize the Gateway (optional, depends if Gateway context is needed for config/logging)
        # If the tool functions standalone, this might not be strictly necessary for the demo.
        # gateway = Gateway("entity-graph-demo", register_tools=True)
        # logger.info("Ultimate MCP Server initialized (optional for demo).")


        # Check if sample directory exists
        if not SAMPLE_DIR.exists() or not any(SAMPLE_DIR.iterdir()):
            main_console.print(f"[bold red]Error:[/bold red] Sample directory '{SAMPLE_DIR}' not found or is empty!")
            main_console.print("Please create the 'sample' directory next to this script and add text files (e.g., article.txt).")
            return 1

        # --- Run Demonstrations ---
        # Define models to use; these could be selected based on availability or speed.
        # The demo defaults to a fast OpenAI model; other providers/models can be swapped in.
        default_model = "gpt-4.1-mini"
        default_provider = Provider.OPENAI.value  # Provider name string, e.g. "openai"

        # 1. Domain Examples (using appropriate strategies)
        await demonstrate_domain_extraction(TextDomain.BUSINESS, "article.txt", GraphStrategy.STANDARD, model=default_model, provider=default_provider)
        await demonstrate_domain_extraction(TextDomain.ACADEMIC, "research_paper.txt", GraphStrategy.MULTISTAGE, model=default_model, provider=default_provider)
        await demonstrate_domain_extraction(TextDomain.LEGAL, "legal_contract.txt", GraphStrategy.STRUCTURED, model=default_model, provider=default_provider)
        await demonstrate_domain_extraction(TextDomain.MEDICAL, "medical_case.txt", GraphStrategy.STRICT_SCHEMA, model=default_model, provider=default_provider)

        # 2. Strategy Comparison
        await demonstrate_strategy_comparison()

        # 3. Output Format Demonstration
        await demonstrate_output_formats()

        main_console.print(Rule("[bold green]Entity Relationship Graph Extraction Demo Complete[/bold green]"))
        # Split the print into two simpler statements to avoid rich markup issues
        main_console.print(f"\n[bold]Visualizations and outputs have been saved to:[/bold] {OUTPUT_DIR}")
        main_console.print("Open any HTML files in a web browser to explore interactive graphs.")

        return 0

    except Exception as e:
        # Catch-all for unexpected errors during setup or top-level execution
        logger.critical(f"Demo failed catastrophically: {e}", exc_info=True)
        main_console.print(f"[bold red]Critical Demo Error:[/bold red] {escape(str(e))}")
        return 1

if __name__ == "__main__":
    exit_code = asyncio.run(main())
    sys.exit(exit_code)
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/core/ums_api/ums_models.py:
--------------------------------------------------------------------------------

```python
"""Pydantic models for UMS API endpoints."""

from datetime import datetime
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field


# ---------- Cognitive States Models ----------

class CognitiveState(BaseModel):
    state_id: str
    timestamp: float
    formatted_timestamp: str
    state_type: str
    description: Optional[str] = None
    workflow_id: Optional[str] = None
    workflow_title: Optional[str] = None
    complexity_score: float
    change_magnitude: float
    age_minutes: float
    memory_count: int
    action_count: int
    state_data: Dict[str, Any] = {}


class CognitiveStatesResponse(BaseModel):
    states: List[CognitiveState]
    total: int
    has_more: bool


class TimelineState(BaseModel):
    state_id: str
    timestamp: float
    formatted_time: str
    state_type: str
    workflow_id: Optional[str] = None
    description: Optional[str] = None
    sequence_number: int
    complexity_score: float
    change_magnitude: float


class TimelineSummaryStats(BaseModel):
    avg_complexity: float
    total_transitions: int
    max_change_magnitude: float


class CognitiveTimelineResponse(BaseModel):
    timeline_data: List[TimelineState]
    total_states: int
    time_range_hours: int
    granularity: str
    summary_stats: TimelineSummaryStats


class Memory(BaseModel):
    memory_id: str
    memory_type: str
    content: str
    importance: float
    created_at: float


class Action(BaseModel):
    action_id: str
    action_type: str
    tool_name: str
    status: str
    started_at: float


class DetailedCognitiveState(BaseModel):
    state_id: str
    timestamp: float
    formatted_timestamp: str
    state_type: str
    description: Optional[str] = None
    workflow_id: Optional[str] = None
    workflow_title: Optional[str] = None
    workflow_goal: Optional[str] = None
    state_data: Dict[str, Any]
    complexity_score: float
    memories: List[Memory] = []
    actions: List[Action] = []


class Pattern(BaseModel):
    type: str
    length: int
    similarity: float
    occurrences: int
    first_occurrence: float
    pattern_description: str


class Transition(BaseModel):
    transition: str
    count: int
    percentage: float


class Anomaly(BaseModel):
    state_id: str
    timestamp: float
    anomaly_type: str
    z_score: float
    description: str
    severity: str


class PatternSummary(BaseModel):
    pattern_count: int
    most_common_transition: Optional[Transition] = None
    anomaly_count: int


class CognitivePatternAnalysis(BaseModel):
    total_states: int
    time_range_hours: int
    patterns: List[Pattern] = []
    transitions: List[Transition] = []
    anomalies: List[Anomaly] = []
    summary: PatternSummary


class StateComparisonInfo(BaseModel):
    state_id: str
    timestamp: float
    formatted_timestamp: str


class StateDiff(BaseModel):
    added: Dict[str, Any] = {}
    removed: Dict[str, Any] = {}
    modified: Dict[str, Dict[str, Any]] = {}
    magnitude: float


class StateComparisonRequest(BaseModel):
    state_id_1: str = Field(
        ...,
        description="First cognitive state ID for comparison",
        example="state_abc123"
    )
    state_id_2: str = Field(
        ...,
        description="Second cognitive state ID for comparison", 
        example="state_xyz789"
    )


class StateComparisonResponse(BaseModel):
    state_1: StateComparisonInfo
    state_2: StateComparisonInfo
    time_diff_minutes: float
    diff: StateDiff


# ---------- Action Monitor Models ----------

class StatusIndicator(BaseModel):
    """Action status indicator with visual cues"""
    color: str = Field(..., description="Color for visual representation")
    icon: str = Field(..., description="Icon name for the status")
    label: str = Field(..., description="Human-readable status label")
    urgency: str = Field(..., description="Urgency level: low, medium, high")


class ResourceUsage(BaseModel):
    """Resource usage metrics for an action"""
    cpu_usage: float = Field(..., description="CPU usage percentage")
    memory_usage: float = Field(..., description="Memory usage percentage")
    network_io: float = Field(..., description="Network I/O in KB/s")
    disk_io: float = Field(..., description="Disk I/O in KB/s")


class RunningAction(BaseModel):
    """Model for a currently running action"""
    action_id: str = Field(..., description="Unique action identifier")
    workflow_id: Optional[str] = Field(None, description="Associated workflow ID")
    workflow_title: Optional[str] = Field(None, description="Workflow title")
    tool_name: str = Field(..., description="Name of the tool being executed")
    status: str = Field(..., description="Current execution status")
    started_at: float = Field(..., description="Start timestamp")
    formatted_start_time: str = Field(..., description="ISO formatted start time")
    execution_time_seconds: float = Field(
        ..., description="Current execution duration in seconds"
    )
    estimated_duration: Optional[float] = Field(
        None, description="Estimated duration in seconds"
    )
    progress_percentage: float = Field(..., description="Estimated progress percentage")
    status_indicator: StatusIndicator = Field(..., description="Visual status indicator")
    performance_category: str = Field(..., description="Performance categorization")
    resource_usage: ResourceUsage = Field(..., description="Current resource usage")
    tool_data: Dict[str, Any] = Field(
        default_factory=dict, description="Tool-specific data"
    )


class RunningActionsResponse(BaseModel):
    """Response for currently running actions"""
    running_actions: List[RunningAction] = Field(
        ..., description="List of currently executing actions"
    )
    total_running: int = Field(..., description="Total number of running actions")
    avg_execution_time: float = Field(
        ..., description="Average execution time of running actions"
    )
    timestamp: str = Field(..., description="Response timestamp")


class QueuedAction(BaseModel):
    """Model for a queued action"""
    action_id: str = Field(..., description="Unique action identifier")
    workflow_id: Optional[str] = Field(None, description="Associated workflow ID")
    workflow_title: Optional[str] = Field(None, description="Workflow title")
    tool_name: str = Field(..., description="Name of the tool to be executed")
    status: str = Field(..., description="Queue status")
    created_at: float = Field(..., description="Creation timestamp")
    formatted_queue_time: str = Field(..., description="ISO formatted queue time")
    queue_position: int = Field(..., description="Position in the queue (1-based)")
    queue_time_seconds: float = Field(..., description="Time spent in queue")
    estimated_wait_time: float = Field(..., description="Estimated wait time in seconds")
    priority: int = Field(..., description="Numeric priority value")
    priority_label: str = Field(..., description="Human-readable priority label")
    tool_data: Dict[str, Any] = Field(
        default_factory=dict, description="Tool-specific data"
    )


class ActionQueueResponse(BaseModel):
    """Response for action queue status"""
    queued_actions: List[QueuedAction] = Field(..., description="List of queued actions")
    total_queued: int = Field(..., description="Total number of queued actions")
    avg_queue_time: float = Field(..., description="Average time in queue")
    next_action: Optional[QueuedAction] = Field(
        None, description="Next action to be executed"
    )
    timestamp: str = Field(..., description="Response timestamp")


class ActionHistoryItem(BaseModel):
    """Model for a single action in history"""
    action_id: str = Field(..., description="Unique action identifier")
    workflow_id: Optional[str] = Field(None, description="Associated workflow ID")
    workflow_title: Optional[str] = Field(None, description="Associated workflow title")
    tool_name: str = Field(..., description="Name of the tool executed")
    action_type: Optional[str] = Field(None, description="Type of action")
    status: str = Field(..., description="Action completion status")
    started_at: float = Field(..., description="Unix timestamp when action started")
    completed_at: Optional[float] = Field(
        None, description="Unix timestamp when action completed"
    )
    execution_duration_seconds: float = Field(
        ..., description="Total execution time in seconds"
    )
    performance_score: float = Field(
        ..., description="Calculated performance score (0-100)"
    )
    efficiency_rating: str = Field(
        ..., description="Efficiency rating based on time and output"
    )
    success_rate_impact: int = Field(..., description="Impact on success rate (1 or 0)")
    formatted_start_time: str = Field(..., description="ISO formatted start time")
    formatted_completion_time: Optional[str] = Field(
        None, description="ISO formatted completion time"
    )
    tool_data: Dict[str, Any] = Field(
        default_factory=dict, description="Tool-specific data"
    )
    result_data: Dict[str, Any] = Field(
        default_factory=dict, description="Action result data"
    )
    result_size: int = Field(0, description="Size of the result data")


class PerformanceSummary(BaseModel):
    """Performance summary statistics"""
    avg_score: float = Field(..., description="Average performance score")
    top_performer: Optional[Dict[str, Any]] = Field(
        None, description="Best performing tool"
    )
    worst_performer: Optional[Dict[str, Any]] = Field(
        None, description="Worst performing tool"
    )
    efficiency_distribution: Dict[str, int] = Field(
        ..., description="Distribution of efficiency ratings"
    )


class ActionHistoryResponse(BaseModel):
    """Response model for action history"""
    action_history: List[ActionHistoryItem] = Field(
        ..., description="List of completed actions"
    )
    total_actions: int = Field(
        ..., description="Total number of actions in the time period"
    )
    success_rate: float = Field(..., description="Overall success rate percentage")
    avg_execution_time: float = Field(..., description="Average execution time in seconds")
    performance_summary: PerformanceSummary = Field(
        ..., description="Performance summary statistics"
    )
    timestamp: str = Field(..., description="Response timestamp")


class OverallMetrics(BaseModel):
    """Overall action execution metrics"""
    total_actions: int = Field(..., description="Total number of actions executed")
    successful_actions: int = Field(
        ..., description="Number of successfully completed actions"
    )
    failed_actions: int = Field(..., description="Number of failed actions")
    avg_duration: Optional[float] = Field(
        None, description="Average execution duration in seconds"
    )
    success_rate_percentage: float = Field(
        ..., description="Overall success rate as percentage"
    )
    failure_rate_percentage: float = Field(
        ..., description="Overall failure rate as percentage"
    )
    avg_duration_seconds: float = Field(..., description="Average duration in seconds")


class ToolUsageStat(BaseModel):
    """Statistics for a single tool"""
    tool_name: str = Field(..., description="Name of the tool")
    usage_count: int = Field(..., description="Number of times the tool was used")
    success_count: int = Field(..., description="Number of successful executions")
    avg_duration: Optional[float] = Field(
        None, description="Average execution time in seconds"
    )


class HourlyMetric(BaseModel):
    """Hourly performance metrics"""
    hour: str = Field(..., description="Hour of the day (0-23)")
    action_count: int = Field(..., description="Number of actions in this hour")
    avg_duration: Optional[float] = Field(
        None, description="Average duration for this hour"
    )
    success_count: int = Field(..., description="Number of successful actions")


class PerformanceInsight(BaseModel):
    """Performance insight or recommendation"""
    type: str = Field(..., description="Type of insight (warning, info, etc.)")
    title: str = Field(..., description="Title of the insight")
    message: str = Field(..., description="Detailed message")
    severity: str = Field(..., description="Severity level (high, medium, low)")


class ActionMetricsResponse(BaseModel):
    """Response model for action metrics"""
    overall_metrics: OverallMetrics = Field(..., description="Overall execution metrics")
    tool_usage_stats: List[ToolUsageStat] = Field(
        ..., description="Per-tool usage statistics"
    )
    hourly_performance: List[HourlyMetric] = Field(
        ..., description="Hourly performance breakdown"
    )
    performance_insights: List[PerformanceInsight] = Field(
        ..., description="Actionable insights and recommendations"
    )
    timestamp: str = Field(..., description="Response timestamp")


# ---------- Artifacts Models ----------

class Artifact(BaseModel):
    """Model for a single artifact"""
    artifact_id: str = Field(..., description="Unique artifact identifier")
    name: str = Field(..., description="Name of the artifact")
    artifact_type: str = Field(
        ..., description="Type of artifact (document, image, code, etc.)"
    )
    description: Optional[str] = Field(None, description="Description of the artifact")
    file_path: Optional[str] = Field(None, description="File system path to the artifact")
    workflow_id: Optional[str] = Field(None, description="Associated workflow ID")
    workflow_title: Optional[str] = Field(None, description="Title of associated workflow")
    created_at: float = Field(..., description="Creation timestamp")
    updated_at: float = Field(..., description="Last update timestamp")
    file_size: int = Field(..., description="File size in bytes")
    file_size_human: str = Field(..., description="Human-readable file size")
    importance: Optional[float] = Field(None, description="Importance score (1-10)")
    access_count: int = Field(0, description="Number of times accessed")
    tags: List[str] = Field(default_factory=list, description="Associated tags")
    metadata: Dict[str, Any] = Field(
        default_factory=dict, description="Additional metadata"
    )
    relationship_count: int = Field(0, description="Number of related artifacts")
    version_count: int = Field(0, description="Number of versions")
    formatted_created_at: str = Field(..., description="ISO formatted creation date")
    formatted_updated_at: str = Field(..., description="ISO formatted update date")
    age_days: float = Field(..., description="Age of artifact in days")


class ArtifactsFilter(BaseModel):
    """Filter parameters used in the request"""
    artifact_type: Optional[str] = Field(None, description="Type filter applied")
    workflow_id: Optional[str] = Field(None, description="Workflow filter applied")
    tags: Optional[str] = Field(None, description="Tags filter applied")
    search: Optional[str] = Field(None, description="Search query applied")
    sort_by: str = Field(..., description="Sort field used")
    sort_order: str = Field(..., description="Sort order used")


class ArtifactsResponse(BaseModel):
    """Response model for artifacts listing"""
    artifacts: List[Artifact] = Field(..., description="List of artifacts")
    total: int = Field(..., description="Total number of artifacts matching query")
    has_more: bool = Field(..., description="Whether there are more artifacts available")
    filters: ArtifactsFilter = Field(..., description="Filters that were applied")


class ArtifactTypeStats(BaseModel):
    """Statistics for a specific artifact type"""
    artifact_type: str = Field(..., description="Type of artifact")
    count: int = Field(..., description="Number of artifacts of this type")
    avg_importance: Optional[float] = Field(None, description="Average importance score")
    total_size: int = Field(..., description="Total size of all artifacts of this type")
    max_access_count: int = Field(..., description="Maximum access count for this type")


class ArtifactOverallStats(BaseModel):
    """Overall artifact statistics"""
    total_artifacts: int = Field(..., description="Total number of artifacts")
    unique_types: int = Field(..., description="Number of unique artifact types")
    unique_workflows: int = Field(..., description="Number of unique workflows")
    total_size: int = Field(..., description="Total size of all artifacts in bytes")
    total_size_human: str = Field(..., description="Human-readable total size")
    avg_size: float = Field(..., description="Average artifact size in bytes")
    latest_created: Optional[float] = Field(
        None, description="Timestamp of most recent artifact"
    )
    earliest_created: Optional[float] = Field(
        None, description="Timestamp of oldest artifact"
    )


class ArtifactStatsResponse(BaseModel):
    """Response model for artifact statistics"""
    overall: ArtifactOverallStats = Field(..., description="Overall statistics")
    by_type: List[ArtifactTypeStats] = Field(
        ..., description="Statistics broken down by type"
    )


# ---------- Memory Quality Models ----------

class MemoryDetail(BaseModel):
    """Detailed information about a memory"""
    memory_id: str = Field(..., description="Unique memory identifier")
    workflow_id: Optional[str] = Field(None, description="Associated workflow ID")
    memory_type: str = Field(..., description="Type of memory")
    importance: float = Field(..., description="Importance score")
    created_at: float = Field(..., description="Creation timestamp")


class DuplicateGroup(BaseModel):
    """Group of duplicate memories"""
    cluster_id: str = Field(..., description="Unique identifier for this duplicate cluster")
    content_preview: str = Field(..., description="Preview of the duplicated content")
    duplicate_count: int = Field(..., description="Number of duplicates in this group")
    memory_ids: List[str] = Field(..., description="List of all memory IDs in this group")
    primary_memory_id: str = Field(..., description="Suggested primary memory to keep")
    memory_details: List[MemoryDetail] = Field(..., description="Detailed info for each memory")
    first_created: float = Field(..., description="Timestamp of earliest duplicate")
    last_created: float = Field(..., description="Timestamp of latest duplicate")
    avg_importance: float = Field(..., description="Average importance across duplicates")
    recommendation: str = Field(..., description="Recommended action (merge/review)")


class DuplicatesResponse(BaseModel):
    """Response model for duplicate analysis"""
    success: bool = Field(..., description="Whether analysis completed successfully")
    clusters: List[DuplicateGroup] = Field(..., description="List of duplicate groups")
    duplicate_groups: List[DuplicateGroup] = Field(..., description="Alias for clusters (backward compatibility)")
    total_groups: int = Field(..., description="Total number of duplicate groups found")
    total_duplicates: int = Field(..., description="Total number of duplicate memories")


class OrphanedMemory(BaseModel):
    """Model for an orphaned memory"""
    memory_id: str = Field(..., description="Unique memory identifier")
    content: str = Field(..., description="Memory content")
    memory_type: str = Field(..., description="Type of memory")
    importance: float = Field(..., description="Importance score")
    created_at: float = Field(..., description="Creation timestamp")


class OrphanedMemoriesResponse(BaseModel):
    """Response model for orphaned memories"""
    success: bool = Field(..., description="Whether query completed successfully")
    orphaned_memories: List[OrphanedMemory] = Field(..., description="List of orphaned memories")
    total_orphaned: int = Field(..., description="Total count of orphaned memories")
    recommendation: str = Field(..., description="Recommended action for orphaned memories")


class BulkOperationRequest(BaseModel):
    """Request model for bulk operations"""
    operation_type: str = Field(
        ...,
        description="Type of bulk operation to perform",
        regex="^(delete|archive|merge)$"
    )
    memory_ids: List[str] = Field(
        ...,
        description="List of memory IDs to operate on",
        min_items=1
    )
    target_memory_id: Optional[str] = Field(
        None,
        description="Target memory ID for merge operations"
    )
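
# Illustrative only: operation_type is constrained to delete/archive/merge by the regex
# above, so an unexpected value fails validation (memory IDs below are made up):
#
#   BulkOperationRequest(operation_type="merge",
#                        memory_ids=["mem_a", "mem_b"],
#                        target_memory_id="mem_a")      # validates
#   BulkOperationRequest(operation_type="purge",
#                        memory_ids=["mem_a"])          # raises a validation error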


class BulkOperationResponse(BaseModel):
    """Response model for bulk operations"""
    success: bool = Field(..., description="Whether operation completed successfully")
    operation_type: str = Field(..., description="Type of operation performed")
    memory_ids: List[str] = Field(..., description="Memory IDs that were operated on")
    success_count: int = Field(..., description="Number of successful operations")
    error_count: int = Field(..., description="Number of failed operations")
    message: str = Field(..., description="Summary message of the operation")
    errors: List[str] = Field(default_factory=list, description="List of error messages")
    merged_into: Optional[str] = Field(None, description="Target memory ID for merge operations")


class PreviewMemory(BaseModel):
    """Memory preview for bulk operations"""
    memory_id: str = Field(..., description="Memory ID")
    content: str = Field(..., description="Memory content")
    memory_type: str = Field(..., description="Type of memory")
    importance: float = Field(..., description="Importance score")
    workflow_id: Optional[str] = Field(None, description="Associated workflow")


class BulkOperationPreview(BaseModel):
    """Preview of bulk operation effects"""
    operation_type: str = Field(..., description="Type of operation to be performed")
    total_affected: int = Field(..., description="Total memories that will be affected")
    preview_description: str = Field(..., description="Description of what will happen")
    affected_memories: List[PreviewMemory] = Field(..., description="Details of affected memories")
    merge_target: Optional[PreviewMemory] = Field(None, description="Target memory for merge")
    will_be_deleted: Optional[List[PreviewMemory]] = Field(None, description="Memories to be deleted in merge")


class BulkPreviewResponse(BaseModel):
    """Response model for bulk operation preview"""
    success: bool = Field(..., description="Whether preview generated successfully")
    operation: BulkOperationPreview = Field(..., description="Preview of the operation")


# ---------- Working Memory Models ----------

class FocusMode(BaseModel):
    """Focus mode configuration"""
    enabled: bool = Field(..., description="Whether focus mode is enabled")
    focus_keywords: List[str] = Field(default_factory=list, description="Keywords for focus filtering")


class PerformanceMetrics(BaseModel):
    """Working memory performance metrics"""
    avg_relevance_score: float = Field(..., description="Average relevance score across all memories")
    optimization_suggestions: int = Field(..., description="Number of optimization suggestions")


class WorkingMemoryStatus(BaseModel):
    """Complete working memory system status"""
    initialized: bool = Field(..., description="Whether the system is initialized")
    total_capacity: int = Field(..., description="Maximum memory capacity")
    current_size: int = Field(..., description="Current number of memories in pool")
    utilization_percentage: float = Field(..., description="Percentage of capacity used")
    focus_mode: FocusMode = Field(..., description="Focus mode configuration")
    performance_metrics: PerformanceMetrics = Field(..., description="Performance metrics")
    category_distribution: Dict[str, int] = Field(default_factory=dict, description="Memory count by category")
    last_optimization: str = Field(..., description="ISO timestamp of last optimization")
    optimization_count: int = Field(..., description="Total number of optimizations performed")


class InitializeRequest(BaseModel):
    """Request model for initializing working memory"""
    capacity: int = Field(
        100,
        ge=10,
        le=1000,
        description="Maximum number of memories in working pool"
    )
    focus_threshold: float = Field(
        0.7,
        ge=0.0,
        le=1.0,
        description="Relevance threshold for focus mode"
    )


class InitializeResponse(BaseModel):
    """Response model for initialization"""
    success: bool = Field(..., description="Whether initialization was successful")
    message: str = Field(..., description="Status message")
    configuration: Dict[str, Any] = Field(..., description="Applied configuration")


class MemoryItem(BaseModel):
    """Model for a memory in the working pool"""
    memory_id: str = Field(..., description="Unique memory identifier")
    content: str = Field(..., description="Memory content")
    category: str = Field(..., description="Memory category")
    importance: float = Field(..., description="Importance score (0-10)")
    relevance_score: float = Field(..., description="Current relevance score (0-1)")
    added_at: float = Field(..., description="Timestamp when added to working memory")
    last_accessed: float = Field(..., description="Timestamp of last access")
    access_count: int = Field(..., description="Number of times accessed")


class ActiveMemoriesResponse(BaseModel):
    """Response for active memories query"""
    memories: List[MemoryItem] = Field(..., description="List of active memories sorted by relevance")
    total_count: int = Field(..., description="Total number of memories matching criteria")
    focus_active: bool = Field(..., description="Whether focus mode filtering is active")


class SetFocusModeRequest(BaseModel):
    """Request to set focus mode"""
    enabled: bool = Field(..., description="Enable or disable focus mode")
    keywords: List[str] = Field(default_factory=list, description="Keywords for focus filtering", max_items=20)


class OptimizeResponse(BaseModel):
    """Response for optimization operation"""
    success: bool = Field(..., description="Whether optimization was successful")
    removed_count: int = Field(..., description="Number of memories removed")
    message: str = Field(..., description="Optimization result message")


# ---------- Performance Profiler Models ----------

class PerformanceOverviewStats(BaseModel):
    """Overall performance statistics"""
    total_actions: int = Field(..., description="Total number of actions executed")
    active_workflows: int = Field(..., description="Number of unique workflows")
    avg_execution_time: float = Field(..., description="Average execution time in seconds")
    min_execution_time: Optional[float] = Field(None, description="Minimum execution time")
    max_execution_time: Optional[float] = Field(None, description="Maximum execution time")
    successful_actions: int = Field(..., description="Number of successful actions")
    failed_actions: int = Field(..., description="Number of failed actions")
    tools_used: int = Field(..., description="Number of distinct tools used")
    success_rate_percentage: float = Field(..., description="Success rate as percentage")
    throughput_per_hour: float = Field(..., description="Actions processed per hour")
    error_rate_percentage: float = Field(..., description="Error rate as percentage")
    avg_workflow_size: float = Field(..., description="Average actions per workflow")


class TimelineBucket(BaseModel):
    """Performance metrics for a time bucket"""
    time_bucket: str = Field(..., description="Time bucket identifier")
    action_count: int = Field(..., description="Number of actions in this bucket")
    avg_duration: Optional[float] = Field(None, description="Average duration in seconds")
    successful_count: int = Field(..., description="Number of successful actions")
    failed_count: int = Field(..., description="Number of failed actions")
    workflow_count: int = Field(..., description="Number of unique workflows")


class ToolUtilization(BaseModel):
    """Tool utilization metrics"""
    tool_name: str = Field(..., description="Name of the tool")
    usage_count: int = Field(..., description="Number of times used")
    avg_duration: Optional[float] = Field(None, description="Average execution duration")
    success_count: int = Field(..., description="Number of successful executions")
    max_duration: Optional[float] = Field(None, description="Maximum execution duration")


class Bottleneck(BaseModel):
    """Performance bottleneck information"""
    tool_name: str = Field(..., description="Tool causing the bottleneck")
    workflow_id: Optional[str] = Field(None, description="Associated workflow")
    action_id: str = Field(..., description="Action identifier")
    started_at: float = Field(..., description="Start timestamp")
    completed_at: Optional[float] = Field(None, description="Completion timestamp")
    duration: float = Field(..., description="Duration in seconds")
    status: str = Field(..., description="Action status")
    reasoning: Optional[str] = Field(None, description="Action reasoning")


class PerformanceOverviewResponse(BaseModel):
    """Response model for performance overview"""
    overview: PerformanceOverviewStats
    timeline: List[TimelineBucket]
    tool_utilization: List[ToolUtilization]
    bottlenecks: List[Bottleneck]
    analysis_period: Dict[str, Any] = Field(..., description="Analysis period information")
    timestamp: str = Field(..., description="Response generation timestamp")


class ToolBottleneck(BaseModel):
    """Tool performance bottleneck analysis"""
    tool_name: str = Field(..., description="Name of the tool")
    total_calls: int = Field(..., description="Total number of calls")
    avg_duration: float = Field(..., description="Average execution duration")
    max_duration: float = Field(..., description="Maximum execution duration")
    min_duration: float = Field(..., description="Minimum execution duration")
    p95_duration: float = Field(..., description="95th percentile duration")
    p99_duration: float = Field(..., description="99th percentile duration")
    failure_count: int = Field(..., description="Number of failures")
    total_time_spent: float = Field(..., description="Total time spent in seconds")


class WorkflowBottleneck(BaseModel):
    """Workflow performance bottleneck"""
    workflow_id: str = Field(..., description="Workflow identifier")
    title: Optional[str] = Field(None, description="Workflow title")
    action_count: int = Field(..., description="Number of actions")
    avg_action_duration: float = Field(..., description="Average action duration")
    max_action_duration: float = Field(..., description="Maximum action duration")
    total_workflow_time: float = Field(..., description="Total workflow execution time")
    workflow_start: float = Field(..., description="Workflow start timestamp")
    workflow_end: float = Field(..., description="Workflow end timestamp")
    total_elapsed_time: float = Field(..., description="Total elapsed wall-clock time")


class ParallelizationOpportunity(BaseModel):
    """Workflow parallelization opportunity"""
    workflow_id: str = Field(..., description="Workflow identifier")
    sequential_actions: int = Field(..., description="Number of sequential actions")
    total_sequential_time: float = Field(..., description="Total sequential execution time")
    actual_elapsed_time: float = Field(..., description="Actual elapsed time")
    potential_time_savings: float = Field(..., description="Potential time savings in seconds")
    parallelization_efficiency: float = Field(..., description="Current parallelization efficiency percentage")
    optimization_score: float = Field(..., description="Optimization potential score (0-10)")


class ResourceContention(BaseModel):
    """Resource contention analysis"""
    tool_name: str = Field(..., description="Tool name")
    concurrent_usage: int = Field(..., description="Number of concurrent usages")
    avg_duration_under_contention: float = Field(..., description="Average duration when contended")


class OptimizationRecommendation(BaseModel):
    """Performance optimization recommendation"""
    type: str = Field(..., description="Type of optimization")
    priority: str = Field(..., description="Priority level (high, medium, low)")
    title: str = Field(..., description="Recommendation title")
    description: str = Field(..., description="Detailed description")
    impact: str = Field(..., description="Expected impact description")
    actions: List[str] = Field(..., description="Recommended actions to take")


class BottleneckAnalysisResponse(BaseModel):
    """Response model for bottleneck analysis"""
    tool_bottlenecks: List[ToolBottleneck]
    workflow_bottlenecks: List[WorkflowBottleneck]
    parallelization_opportunities: List[ParallelizationOpportunity]
    resource_contention: List[ResourceContention]
    recommendations: List[OptimizationRecommendation]
    analysis_summary: Dict[str, Any]
    timestamp: str


class FlameGraphNode(BaseModel):
    """Model for a flame graph node"""
    name: str = Field(..., description="Name of the node (workflow, tool, or action)")
    value: float = Field(..., description="Duration in seconds")
    children: List['FlameGraphNode'] = Field(default_factory=list, description="Child nodes")
    action_id: Optional[str] = Field(None, description="Action ID if this is an action node")
    status: Optional[str] = Field(None, description="Execution status")
    reasoning: Optional[str] = Field(None, description="Reasoning for the action")
    started_at: Optional[float] = Field(None, description="Start timestamp")
    completed_at: Optional[float] = Field(None, description="Completion timestamp")


FlameGraphNode.model_rebuild()  # Needed for recursive model
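
# Illustrative usage (values are hypothetical, not from the original source):
# once model_rebuild() has resolved the self-referential `children` annotation,
# nested nodes can be constructed directly, e.g.
#
#   FlameGraphNode(
#       name="workflow:wf-123",
#       value=12.5,
#       children=[FlameGraphNode(name="tool:search", value=4.2)],
#   )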


class CriticalPathAction(BaseModel):
    """Model for a critical path action"""
    action_id: str = Field(..., description="Action identifier")
    tool_name: str = Field(..., description="Tool used for the action")
    duration: float = Field(..., description="Duration in seconds")
    start_time: float = Field(..., description="Start timestamp")
    end_time: float = Field(..., description="End timestamp")


class WorkflowMetrics(BaseModel):
    """Workflow performance metrics"""
    total_actions: int = Field(..., description="Total number of actions in workflow")
    total_cpu_time: float = Field(..., description="Total CPU time (sum of all action durations)")
    wall_clock_time: float = Field(..., description="Total wall clock time from start to end")
    parallelization_efficiency: float = Field(..., description="Efficiency percentage (0-100)")
    avg_action_duration: float = Field(..., description="Average duration per action")
    workflow_start: float = Field(..., description="Workflow start timestamp")
    workflow_end: float = Field(..., description="Workflow end timestamp")


class WorkflowAnalysis(BaseModel):
    """Analysis results for workflow optimization"""
    bottleneck_tool: Optional[str] = Field(None, description="Tool causing the main bottleneck")
    parallelization_potential: float = Field(..., description="Potential time savings through parallelization")
    optimization_score: float = Field(..., description="Overall optimization score (0-10)")


class FlameGraphResponse(BaseModel):
    """Response model for flame graph generation"""
    flame_graph: FlameGraphNode = Field(..., description="Hierarchical flame graph data")
    metrics: WorkflowMetrics = Field(..., description="Workflow performance metrics")
    critical_path: List[CriticalPathAction] = Field(..., description="Critical path through the workflow")
    analysis: WorkflowAnalysis = Field(..., description="Workflow optimization analysis")
    timestamp: str = Field(..., description="Response generation timestamp")


class DailyTrend(BaseModel):
    """Model for daily performance metrics"""
    date: str = Field(..., description="Date in YYYY-MM-DD format")
    action_count: int = Field(..., description="Number of actions executed")
    avg_duration: Optional[float] = Field(None, description="Average action duration in seconds")
    success_rate: float = Field(..., description="Success rate percentage (0-100)")
    throughput: float = Field(..., description="Actions per hour")
    error_rate: float = Field(..., description="Error rate percentage (0-100)")
    successful_actions: int = Field(..., description="Number of successful actions")
    failed_actions: int = Field(..., description="Number of failed actions")
    workflow_count: int = Field(..., description="Number of unique workflows")
    tool_count: int = Field(..., description="Number of unique tools used")


class ToolTrend(BaseModel):
    """Model for tool-specific performance trends"""
    tool_name: str = Field(..., description="Name of the tool")
    date: str = Field(..., description="Date in YYYY-MM-DD format")
    usage_count: int = Field(..., description="Number of times used")
    avg_duration: Optional[float] = Field(None, description="Average execution duration")
    success_count: int = Field(..., description="Number of successful executions")


class WorkflowComplexityTrend(BaseModel):
    """Model for workflow complexity trends"""
    date: str = Field(..., description="Date in YYYY-MM-DD format")
    workflow_id: str = Field(..., description="Workflow identifier")
    action_count: int = Field(..., description="Number of actions in workflow")
    total_duration: Optional[float] = Field(None, description="Total workflow duration")
    elapsed_time: Optional[float] = Field(None, description="Wall clock time")


class TrendAnalysis(BaseModel):
    """Trend analysis results"""
    performance_trend: str = Field(..., description="Overall performance trend (improving/degrading/stable/insufficient_data)")
    success_trend: str = Field(..., description="Success rate trend (improving/degrading/stable/insufficient_data)")
    data_points: int = Field(..., description="Number of data points analyzed")
    analysis_period_days: int = Field(..., description="Analysis period in days")


class InsightMetrics(BaseModel):
    """Performance insight metrics"""
    best_performing_day: Optional[DailyTrend] = Field(None, description="Day with best performance")
    worst_performing_day: Optional[DailyTrend] = Field(None, description="Day with worst performance")
    peak_throughput_day: Optional[DailyTrend] = Field(None, description="Day with highest throughput")
    avg_daily_actions: float = Field(..., description="Average actions per day")


class PerformanceTrendsResponse(BaseModel):
    """Response model for performance trends analysis"""
    daily_trends: List[DailyTrend] = Field(..., description="Daily performance metrics")
    tool_trends: List[ToolTrend] = Field(..., description="Tool-specific performance trends")
    workflow_complexity: List[WorkflowComplexityTrend] = Field(..., description="Workflow complexity trends")
    trend_analysis: TrendAnalysis = Field(..., description="Overall trend analysis")
    patterns: List["PerformancePattern"] = Field(..., description="Detected performance patterns")  # forward reference; PerformancePattern is defined later in this module
    insights: InsightMetrics = Field(..., description="Key performance insights")
    timestamp: str = Field(..., description="Response generation timestamp")


class ImpactEstimate(BaseModel):
    """Model for recommendation impact estimates"""
    time_savings_potential: float = Field(..., description="Estimated time savings in seconds")
    affected_actions: int = Field(..., description="Number of actions that would benefit")
    cost_benefit_ratio: float = Field(..., description="Ratio of benefit to implementation cost")
    affected_workflows: Optional[int] = Field(None, description="Number of affected workflows")
    efficiency_improvement: Optional[float] = Field(None, description="Percentage efficiency improvement")
    reliability_improvement: Optional[float] = Field(None, description="Percentage reliability improvement")
    user_experience_impact: Optional[str] = Field(None, description="Impact on user experience (high/medium/low)")


class PerformanceRecommendation(BaseModel):
    """Model for a single performance recommendation"""
    id: str = Field(..., description="Unique identifier for the recommendation")
    type: str = Field(..., description="Type of recommendation (tool_optimization, parallelization, reliability_improvement)")
    priority: str = Field(..., description="Priority level (high, medium, low)")
    title: str = Field(..., description="Brief title of the recommendation")
    description: str = Field(..., description="Detailed description of the issue and recommendation")
    impact_estimate: ImpactEstimate = Field(..., description="Estimated impact of implementing this recommendation")
    implementation_steps: List[str] = Field(..., description="Step-by-step implementation guide")
    estimated_effort: str = Field(..., description="Estimated implementation effort (low, medium, high)")
    prerequisites: List[str] = Field(..., description="Prerequisites for implementation")
    metrics_to_track: List[str] = Field(..., description="Metrics to track after implementation")


class RecommendationSummary(BaseModel):
    """Summary statistics for recommendations"""
    total_recommendations: int = Field(..., description="Total number of recommendations generated")
    high_priority: int = Field(..., description="Number of high priority recommendations")
    medium_priority: int = Field(..., description="Number of medium priority recommendations")
    low_priority: int = Field(..., description="Number of low priority recommendations")
    estimated_total_savings: float = Field(..., description="Total estimated time savings in seconds")
    analysis_period_hours: int = Field(..., description="Hours of data analyzed")


class ImplementationRoadmap(BaseModel):
    """Categorized implementation roadmap"""
    quick_wins: List[PerformanceRecommendation] = Field(..., description="Low effort, high impact recommendations")
    major_improvements: List[PerformanceRecommendation] = Field(..., description="High effort, high impact recommendations")
    maintenance_tasks: List[PerformanceRecommendation] = Field(..., description="Low priority maintenance recommendations")


class PerformanceRecommendationsResponse(BaseModel):
    """Response model for performance recommendations"""
    recommendations: List[PerformanceRecommendation] = Field(..., description="List of actionable recommendations")
    summary: RecommendationSummary = Field(..., description="Summary statistics")
    implementation_roadmap: ImplementationRoadmap = Field(..., description="Recommendations organized by implementation strategy")
    timestamp: str = Field(..., description="ISO timestamp of analysis")


# ---------- Workflow Management Models ----------

class WorkflowScheduleRequest(BaseModel):
    """Request model for scheduling a workflow"""
    scheduled_at: datetime = Field(
        ...,
        description="ISO timestamp for when to execute the workflow",
        example="2024-01-01T12:00:00Z"
    )
    priority: int = Field(
        default=5,
        ge=1,
        le=10,
        description="Execution priority (1=highest, 10=lowest)",
        example=3
    )
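
# Illustrative sketch (hypothetical values): Pydantic coerces the ISO-8601 string
# to a datetime and enforces 1 <= priority <= 10, e.g.
#
#   WorkflowScheduleRequest(scheduled_at="2024-01-01T12:00:00Z", priority=3)  # valid
#   WorkflowScheduleRequest(scheduled_at="2024-01-01T12:00:00Z", priority=0)  # ValidationError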


class ScheduleData(BaseModel):
    """Schedule data for the workflow"""
    workflow_id: str = Field(..., description="ID of the scheduled workflow")
    scheduled_at: str = Field(..., description="Scheduled execution time")
    priority: int = Field(..., description="Execution priority")
    status: str = Field(..., description="Schedule status")
    created_at: str = Field(..., description="When the schedule was created")


class WorkflowScheduleResponse(BaseModel):
    """Response model for workflow scheduling"""
    success: bool = Field(..., description="Whether scheduling was successful")
    schedule_id: str = Field(..., description="Unique identifier for this schedule")
    message: str = Field(..., description="Success or error message")
    schedule_data: ScheduleData = Field(..., description="Details of the created schedule")


class RestoreStateRequest(BaseModel):
    """Request model for restoring a cognitive state"""
    restore_mode: str = Field(
        default="full",
        pattern="^(full|partial|snapshot)$",
        description="Type of restoration to perform",
        example="full"
    )
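
# Illustrative sketch: `restore_mode` must be "full", "partial", or "snapshot";
# any other value raises a ValidationError, e.g.
#
#   RestoreStateRequest(restore_mode="partial")   # valid
#   RestoreStateRequest(restore_mode="rollback")  # rejected by the pattern constraint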


class RestoreData(BaseModel):
    """Restoration data"""
    state_id: str = Field(..., description="ID of the state being restored")
    restore_mode: str = Field(..., description="Restoration mode used")
    restored_at: str = Field(..., description="When the restoration occurred")
    original_timestamp: Optional[float] = Field(None, description="Original state timestamp")


class RestoreStateResponse(BaseModel):
    """Response model for state restoration"""
    success: bool = Field(..., description="Whether restoration was successful")
    message: str = Field(..., description="Success or error message")
    restore_data: RestoreData = Field(..., description="Details of the restoration")


# ---------- Health Check Models ----------

class HealthResponse(BaseModel):
    """Health check response"""
    status: str = Field(..., description="Health status indicator", example="ok")
    version: str = Field(..., description="Server version string", example="0.1.0")


# ---------- Performance Trends Models ----------

class PerformancePattern(BaseModel):
    """Detected performance pattern"""
    type: str = Field(..., description="Type of pattern detected")
    description: str = Field(..., description="Description of the pattern")
    impact: str = Field(..., description="Impact level (high/medium/low)")
    recommendation: str = Field(..., description="Recommended action")
    date: Optional[str] = Field(None, description="Date of occurrence for anomalies")
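

# Resolve the forward reference in PerformanceTrendsResponse.patterns now that
# PerformancePattern is defined (same approach as FlameGraphNode above).
PerformanceTrendsResponse.model_rebuild()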
```