#
tokens: 46133/50000 6/207 files (page 10/35)
lines: off (toggle) GitHub
raw markdown copy
This is page 10 of 35. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│   ├── __init__.py
│   ├── advanced_agent_flows_using_unified_memory_system_demo.py
│   ├── advanced_extraction_demo.py
│   ├── advanced_unified_memory_system_demo.py
│   ├── advanced_vector_search_demo.py
│   ├── analytics_reporting_demo.py
│   ├── audio_transcription_demo.py
│   ├── basic_completion_demo.py
│   ├── cache_demo.py
│   ├── claude_integration_demo.py
│   ├── compare_synthesize_demo.py
│   ├── cost_optimization.py
│   ├── data
│   │   ├── sample_event.txt
│   │   ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│   │   └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│   ├── docstring_refiner_demo.py
│   ├── document_conversion_and_processing_demo.py
│   ├── entity_relation_graph_demo.py
│   ├── filesystem_operations_demo.py
│   ├── grok_integration_demo.py
│   ├── local_text_tools_demo.py
│   ├── marqo_fused_search_demo.py
│   ├── measure_model_speeds.py
│   ├── meta_api_demo.py
│   ├── multi_provider_demo.py
│   ├── ollama_integration_demo.py
│   ├── prompt_templates_demo.py
│   ├── python_sandbox_demo.py
│   ├── rag_example.py
│   ├── research_workflow_demo.py
│   ├── sample
│   │   ├── article.txt
│   │   ├── backprop_paper.pdf
│   │   ├── buffett.pdf
│   │   ├── contract_link.txt
│   │   ├── legal_contract.txt
│   │   ├── medical_case.txt
│   │   ├── northwind.db
│   │   ├── research_paper.txt
│   │   ├── sample_data.json
│   │   └── text_classification_samples
│   │       ├── email_classification.txt
│   │       ├── news_samples.txt
│   │       ├── product_reviews.txt
│   │       └── support_tickets.txt
│   ├── sample_docs
│   │   └── downloaded
│   │       └── attention_is_all_you_need.pdf
│   ├── sentiment_analysis_demo.py
│   ├── simple_completion_demo.py
│   ├── single_shot_synthesis_demo.py
│   ├── smart_browser_demo.py
│   ├── sql_database_demo.py
│   ├── sse_client_demo.py
│   ├── test_code_extraction.py
│   ├── test_content_detection.py
│   ├── test_ollama.py
│   ├── text_classification_demo.py
│   ├── text_redline_demo.py
│   ├── tool_composition_examples.py
│   ├── tournament_code_demo.py
│   ├── tournament_text_demo.py
│   ├── unified_memory_system_demo.py
│   ├── vector_search_demo.py
│   ├── web_automation_instruction_packs.py
│   └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│   └── smart_browser_internal
│       ├── locator_cache.db
│       ├── readability.js
│       └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── integration
│   │   ├── __init__.py
│   │   └── test_server.py
│   ├── manual
│   │   ├── test_extraction_advanced.py
│   │   └── test_extraction.py
│   └── unit
│       ├── __init__.py
│       ├── test_cache.py
│       ├── test_providers.py
│       └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│   ├── __init__.py
│   ├── __main__.py
│   ├── cli
│   │   ├── __init__.py
│   │   ├── __main__.py
│   │   ├── commands.py
│   │   ├── helpers.py
│   │   └── typer_cli.py
│   ├── clients
│   │   ├── __init__.py
│   │   ├── completion_client.py
│   │   └── rag_client.py
│   ├── config
│   │   └── examples
│   │       └── filesystem_config.yaml
│   ├── config.py
│   ├── constants.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── evaluation
│   │   │   ├── base.py
│   │   │   └── evaluators.py
│   │   ├── providers
│   │   │   ├── __init__.py
│   │   │   ├── anthropic.py
│   │   │   ├── base.py
│   │   │   ├── deepseek.py
│   │   │   ├── gemini.py
│   │   │   ├── grok.py
│   │   │   ├── ollama.py
│   │   │   ├── openai.py
│   │   │   └── openrouter.py
│   │   ├── server.py
│   │   ├── state_store.py
│   │   ├── tournaments
│   │   │   ├── manager.py
│   │   │   ├── tasks.py
│   │   │   └── utils.py
│   │   └── ums_api
│   │       ├── __init__.py
│   │       ├── ums_database.py
│   │       ├── ums_endpoints.py
│   │       ├── ums_models.py
│   │       └── ums_services.py
│   ├── exceptions.py
│   ├── graceful_shutdown.py
│   ├── services
│   │   ├── __init__.py
│   │   ├── analytics
│   │   │   ├── __init__.py
│   │   │   ├── metrics.py
│   │   │   └── reporting.py
│   │   ├── cache
│   │   │   ├── __init__.py
│   │   │   ├── cache_service.py
│   │   │   ├── persistence.py
│   │   │   ├── strategies.py
│   │   │   └── utils.py
│   │   ├── cache.py
│   │   ├── document.py
│   │   ├── knowledge_base
│   │   │   ├── __init__.py
│   │   │   ├── feedback.py
│   │   │   ├── manager.py
│   │   │   ├── rag_engine.py
│   │   │   ├── retriever.py
│   │   │   └── utils.py
│   │   ├── prompts
│   │   │   ├── __init__.py
│   │   │   ├── repository.py
│   │   │   └── templates.py
│   │   ├── prompts.py
│   │   └── vector
│   │       ├── __init__.py
│   │       ├── embeddings.py
│   │       └── vector_service.py
│   ├── tool_token_counter.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── audio_transcription.py
│   │   ├── base.py
│   │   ├── completion.py
│   │   ├── docstring_refiner.py
│   │   ├── document_conversion_and_processing.py
│   │   ├── enhanced-ums-lookbook.html
│   │   ├── entity_relation_graph.py
│   │   ├── excel_spreadsheet_automation.py
│   │   ├── extraction.py
│   │   ├── filesystem.py
│   │   ├── html_to_markdown.py
│   │   ├── local_text_tools.py
│   │   ├── marqo_fused_search.py
│   │   ├── meta_api_tool.py
│   │   ├── ocr_tools.py
│   │   ├── optimization.py
│   │   ├── provider.py
│   │   ├── pyodide_boot_template.html
│   │   ├── python_sandbox.py
│   │   ├── rag.py
│   │   ├── redline-compiled.css
│   │   ├── sentiment_analysis.py
│   │   ├── single_shot_synthesis.py
│   │   ├── smart_browser.py
│   │   ├── sql_databases.py
│   │   ├── text_classification.py
│   │   ├── text_redline_tools.py
│   │   ├── tournament.py
│   │   ├── ums_explorer.html
│   │   └── unified_memory_system.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── async_utils.py
│   │   ├── display.py
│   │   ├── logging
│   │   │   ├── __init__.py
│   │   │   ├── console.py
│   │   │   ├── emojis.py
│   │   │   ├── formatter.py
│   │   │   ├── logger.py
│   │   │   ├── panels.py
│   │   │   ├── progress.py
│   │   │   └── themes.py
│   │   ├── parse_yaml.py
│   │   ├── parsing.py
│   │   ├── security.py
│   │   └── text.py
│   └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/examples/smart_browser_demo.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
"""
DETAILED Demonstration script for the Smart Browser Tools in Ultimate MCP Server,
showcasing browsing, interaction, search, download, macro, and autopilot features.
"""

import asyncio
import logging
import sys
import time
import traceback
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

# Make the repository root importable when this file is executed directly as a
# script (rather than as an installed package). The root is taken to be two
# levels above this file: <root>/examples/<this file>. Adjust the `.parent`
# chain if the script is moved to a different depth.
_PROJECT_ROOT = Path(__file__).resolve().parent.parent
if str(_PROJECT_ROOT) not in sys.path:
    # Prepend so the local checkout is found first on the import path.
    sys.path.insert(0, str(_PROJECT_ROOT))
    print(f"INFO: Added {_PROJECT_ROOT} to sys.path")

# Rich imports for enhanced terminal UI
from rich import box, get_console  # noqa: E402
from rich.console import Group  # noqa: E402
from rich.markup import escape  # noqa: E402
from rich.panel import Panel  # noqa: E402
from rich.rule import Rule  # noqa: E402
from rich.table import Table  # noqa: E402
from rich.text import Text  # noqa: E402
from rich.traceback import install as install_rich_traceback  # noqa: E402

# Rich console shared by every display helper and demo section in this script
console = get_console()

# Define a fallback logger in case the import fails
def create_fallback_logger(name):
    """Return a stdlib logger at INFO level that writes through a stream handler.

    Intended as a stand-in when the project's own logger cannot be imported.

    Args:
        name: Logger name passed to ``logging.getLogger``.

    Returns:
        The configured ``logging.Logger`` instance.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    # getLogger returns the same object for a given name, so attaching a new
    # handler on every call would emit each record once per call. Only attach
    # a handler when the logger does not have one yet.
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        )
        logger.addHandler(handler)
    return logger

# Import Gateway and MCP components
from ultimate_mcp_server.core.server import Gateway  # noqa: E402
from ultimate_mcp_server.exceptions import ToolError, ToolInputError  # noqa: E402

# Import smart browser tools directly
from ultimate_mcp_server.tools.smart_browser import (  # noqa: E402
    autopilot,
    browse,
    click,
    collect_documentation,
    download,
    download_site_pdfs,
    parallel,
    run_macro,
    search,
    shutdown,
    type_text,
)
from ultimate_mcp_server.utils import get_logger  # noqa: E402
from ultimate_mcp_server.utils.display import CostTracker  # noqa: E402

# Module-level logger used by safe_tool_call and the demo sections
logger = get_logger("demo.smart_browser")

# Install Rich's traceback handler; it is configured to show local variables,
# match the console width and include two extra context lines.
install_rich_traceback(show_locals=True, width=console.width, extra_lines=2)

# --- Configuration ---
# Base directories for Smart Browser outputs (relative paths)
SMART_BROWSER_INTERNAL_BASE = "storage/smart_browser_internal"  # Relative path used by the tool
SMART_BROWSER_DOWNLOADS_BASE = "storage/smart_browser_downloads"  # Default download relative path
DEMO_OUTPUTS_DIR = Path(
    "./sb_demo_outputs"
)  # Local dir for demo-specific outputs like the test HTML

# Example URLs the demo sections browse, search or download from
URL_EXAMPLE = "http://example.com"
URL_BOOKSTORE = "http://books.toscrape.com/"
URL_QUOTES = "http://quotes.toscrape.com/"
URL_PDF_SAMPLE = "https://www.w3.org/WAI/ER/tests/xhtml/testfiles/resources/pdf/dummy.pdf"
URL_GITHUB = "https://github.com/features/copilot"

# --- Demo Helper Functions ---


def timestamp_str(short: bool = False) -> str:
    """Return the current local time wrapped in Rich ``[dim]`` markup.

    Args:
        short: When true, emit only ``HH:MM:SS``; otherwise emit
            ``YYYY-MM-DD HH:MM:SS``.
    """
    pattern = "%H:%M:%S" if short else "%Y-%m-%d %H:%M:%S"
    stamp = datetime.fromtimestamp(time.time()).strftime(pattern)
    return f"[dim]{stamp}[/]"


def truncate_text_by_lines(text: str, max_lines: int = 50) -> str:
    """Shorten multi-line text to its leading and trailing lines.

    Text with at most ``max_lines`` lines is returned unchanged. Longer text
    keeps ``max_lines // 2`` lines (at least one) from each end, separated by
    a ``[...TRUNCATED...]`` marker line. With ``max_lines == 1`` only the
    first line and the marker are returned. Falsy input yields ``""``.
    """
    marker = "[...TRUNCATED...]"
    if not text:
        return ""

    all_lines = text.splitlines()
    if len(all_lines) <= max_lines:
        return text

    # A single-line budget cannot show both ends: keep the first line only.
    if max_lines == 1:
        return f"{all_lines[0]}\n{marker}"

    keep = max(1, max_lines // 2)
    return "\n".join([*all_lines[:keep], marker, *all_lines[-keep:]])


def format_value(key: str, value: Any, detail_level: int = 1) -> Any:
    """Format a value for display, returning a string that may contain Rich markup.

    Args:
        key: Name the value is stored under; selects special formatting for
            ``*time_seconds``, ``duration_ms`` and ``size_bytes``. Non-string
            keys (as met when previewing dicts) are accepted and stringified.
        value: The value to format.
        detail_level: ``0`` gives a one-line placeholder for lists and dicts,
            ``1`` a short preview, ``2`` or more a longer preview.

    Returns:
        A display string. Plain-text pieces are markup-escaped; styling added
        here is left as markup.
    """
    # The dict preview below recurses with the dict's own keys, which need not
    # be strings, so normalise before calling string methods on the key.
    key_name = str(key).lower()

    if value is None:
        return "[dim]None[/]"  # Keep markup
    # bool is checked before the numeric branches because bool is an int subclass
    if isinstance(value, bool):
        return "[green]Yes[/]" if value else "[red]No[/]"  # Keep markup
    # NOTE(review): floats return here, so float values under time keys never
    # reach the time formatting below — confirm that this is intended.
    if isinstance(value, float):
        return f"{value:.3f}"  # Return simple string
    is_duration_ms = key_name == "duration_ms"
    if key_name.endswith("time_seconds") or is_duration_ms:
        try:
            val_s = float(value) / 1000.0 if is_duration_ms else float(value)
            return f"[green]{val_s:.3f}s[/]"  # Keep markup
        except (ValueError, TypeError):
            return escape(str(value))  # Fallback for non-numeric time values
    if key_name == "size_bytes" and isinstance(value, int):
        if value < 0:
            return "[dim]N/A[/]"
        if value > 1024 * 1024:
            return f"{value / (1024 * 1024):.2f} MB"
        if value > 1024:
            return f"{value / 1024:.2f} KB"
        return f"{value} Bytes"  # Return simple string

    if isinstance(value, list):
        if not value:
            return "[dim]Empty List[/]"  # Keep markup
        list_len = len(value)
        preview_count = 3 if detail_level < 2 else 5
        suffix = (
            f" [dim]... ({list_len} items total)[/]" if list_len > preview_count else ""
        )  # Keep markup
        if detail_level >= 1:
            # Items are rendered at detail level 0 so nested containers stay short
            previews = [
                str(format_value(f"{key}[{i}]", item, detail_level=0))
                for i, item in enumerate(value[:preview_count])
            ]
            return f"[{', '.join(previews)}]{suffix}"  # Returns string with markup
        else:
            return f"[List with {list_len} items]"

    if isinstance(value, dict):
        if not value:
            return "[dim]Empty Dict[/]"  # Keep markup
        dict_len = len(value)
        preview_count = 4 if detail_level < 2 else 8
        preview_keys = list(value.keys())[:preview_count]
        suffix = (
            f" [dim]... ({dict_len} keys total)[/]" if dict_len > preview_count else ""
        )  # Keep markup
        if detail_level >= 1:
            items_preview = [
                # Key repr for clarity, value formatted recursively
                f"{repr(k)}: {str(format_value(k, value[k], detail_level=0))}"
                for k in preview_keys
            ]
            return f"{{{'; '.join(items_preview)}}}{suffix}"  # Returns string with markup
        else:
            return f"[Dict with {dict_len} keys]"

    if isinstance(value, str):
        value_truncated = truncate_text_by_lines(value, 30)  # Truncate by lines first
        preview_len = 300 if detail_level < 2 else 600
        suffix = ""
        # Check length after line truncation
        if len(value_truncated) > preview_len:
            value_display = value_truncated[:preview_len]
            suffix = "[dim]... (truncated)[/]"  # Keep markup
        else:
            value_display = value_truncated

        # Escape only if it doesn't look like it contains Rich markup
        if "[" in value_display and "]" in value_display and "/" in value_display:
            # Heuristic: Assume it might contain markup, don't escape
            return value_display + suffix
        else:
            # Safe to escape plain strings
            return escape(value_display) + suffix

    # Fallback: escape the string representation of other types
    return escape(str(value))


def display_page_state(state: Dict[str, Any], title: str = "Page State"):
    """Display the 'page_state' dictionary nicely.

    Prints one panel containing the page URL and title, a truncated preview of
    ``main_text`` (when present) and a table of up to 15 entries from
    ``elements`` (when present).

    Args:
        state: Page state dictionary; the keys read are ``url``, ``title``,
            ``main_text`` and ``elements``. Missing or ``None`` values are
            tolerated.
        title: Panel title, interpreted as Rich markup.
    """
    panel_content = []
    # `or` (not a .get default) so a key that is present but None/empty still
    # falls back; escape() and slicing below require real strings.
    url = str(state.get("url") or "N/A")
    page_title = str(state.get("title") or "N/A")
    panel_content.append(
        Text.from_markup(f"[bold cyan]URL:[/bold cyan] [link={url}]{escape(url)}[/link]")
    )
    panel_content.append(
        Text.from_markup(f"[bold cyan]Title:[/bold cyan] {escape(page_title)}")
    )

    main_text = state.get("main_text") or ""
    if main_text:
        truncated_text = truncate_text_by_lines(str(main_text), 15)
        panel_content.append(Text.from_markup("\n[bold cyan]Main Text Summary:[/bold cyan]"))
        panel_content.append(Panel(escape(truncated_text), border_style="dim", padding=(0, 1)))

    elements = state.get("elements") or []
    if elements:
        elements_table = Table(
            title=Text.from_markup(f"Interactive Elements ({len(elements)} found)"),
            box=box.MINIMAL,
            show_header=True,
            padding=(0, 1),
            border_style="blue",
        )
        elements_table.add_column("ID", style="magenta", no_wrap=True)
        elements_table.add_column("Tag", style="cyan")
        elements_table.add_column("Role", style="yellow")
        elements_table.add_column("Text Preview", style="white", max_width=60)
        elements_table.add_column("BBox", style="dim")

        preview_count = 15
        for elem in elements[:preview_count]:
            # Element text may be absent or None; normalise before slicing
            elem_text_raw = str(elem.get("text") or "")
            elem_text_preview = escape(
                elem_text_raw[:60] + ("..." if len(elem_text_raw) > 60 else "")
            )
            bbox = elem.get("bbox") or []
            if len(bbox) == 4:
                bbox_str = f"({bbox[0]}x{bbox[1]}, {bbox[2]}w{bbox[3]}h)"
            else:
                bbox_str = "[Invalid Bbox]"

            elements_table.add_row(
                str(elem.get("id", "?")),
                str(elem.get("tag", "?")),
                str(elem.get("role", "")),
                elem_text_preview,  # Pass escaped preview string
                bbox_str,
            )
        if len(elements) > preview_count:
            # Trailing row stating how many elements were left out of the preview
            elements_table.add_row(
                "...",
                Text.from_markup(f"[dim]{len(elements) - preview_count} more...[/]"),
                "",
                "",
                "",
            )

        panel_content.append(Text.from_markup("\n[bold cyan]Elements:[/bold cyan]"))
        panel_content.append(elements_table)

    console.print(
        Panel(
            Group(*panel_content),
            title=Text.from_markup(title),
            border_style="blue",
            padding=(1, 2),
            expand=False,
        )
    )


def _escaped_or_default(value: Any, default: str = "N/A") -> str:
    """Return *value* as markup-escaped text, or *default* when it is None or empty."""
    if value is None or value == "":
        return default
    return escape(str(value))


def display_result(
    title: str, result: Dict[str, Any], display_options: Optional[Dict] = None
) -> None:
    """Display operation result with enhanced formatting using Rich.

    Prints, in order: a titled rule, a status panel, a summary table of the
    top-level keys that are not hidden, and then dedicated sections for page
    state, search results, download details, macro/autopilot steps and
    collected documentation when the corresponding keys are present.

    Args:
        title: Heading shown in the rule above the result.
        result: Tool result dictionary.
        display_options: Optional settings; ``detail_level`` and ``hide_keys``
            are read. Options stored in ``result["_display_options"]``
            override these key by key.
    """
    display_options = display_options or {}
    console.print(
        Rule(
            Text.from_markup(f"[bold cyan]{escape(title)}[/] {timestamp_str(short=True)}"),
            style="cyan",
        )
    )

    success = result.get("success", False)
    # Options embedded in the result take precedence over the passed ones, per
    # key. detail_level and hide_keys are both read from the merged view so
    # that settings arriving through the result are honoured consistently.
    embedded_options = result.get("_display_options") or {}
    effective_display_options = {**display_options, **embedded_options}
    detail_level = effective_display_options.get("detail_level", 1)

    hide_keys_set = set(
        effective_display_options.get(
            "hide_keys",
            [
                "success",
                "page_state",
                "results",
                "steps",
                "download",
                "final_page_state",
                "documentation",
                "raw_response",
                "raw_llm_response",
                "_display_options",  # Also hide internal options
            ],
        )
    )

    # --- Status Panel ---
    status_panel_content = Text.from_markup(
        f"Status: {'[bold green]Success[/]' if success else '[bold red]Failed[/]'}\n"
    )
    if not success:
        error_code = result.get("error_code", "N/A")
        error_msg = result.get("error", "Unknown error")
        status_panel_content.append(
            Text.from_markup(f"Error Code: [yellow]{escape(str(error_code))}[/]\n")
        )
        status_panel_content.append(
            Text.from_markup(f"Message: [red]{escape(str(error_msg))}[/]\n")
        )
        console.print(
            Panel(
                status_panel_content,
                title="Operation Status",
                border_style="red",
                padding=(1, 2),
                expand=False,
            )
        )
    else:
        console.print(
            Panel(
                status_panel_content,
                title="Operation Status",
                border_style="green",
                padding=(0, 1),
                expand=False,
            )
        )

    # --- Top Level Details ---
    details_table = Table(
        title="Result Summary", box=box.MINIMAL, show_header=False, padding=(0, 1)
    )
    details_table.add_column("Key", style="cyan", justify="right", no_wrap=True)
    details_table.add_column("Value", style="white")
    has_details = False
    for key, value in result.items():
        # Keys with a leading underscore are treated as internal and skipped
        if key in hide_keys_set or str(key).startswith("_"):
            continue
        formatted_value = format_value(key, value, detail_level=detail_level)
        details_table.add_row(
            escape(str(key)), formatted_value
        )  # formatted_value is already string/markup
        has_details = True
    if has_details:
        console.print(details_table)

    # --- Special Section Displays ---

    # Page State
    if "page_state" in result and isinstance(result["page_state"], dict):
        display_page_state(result["page_state"], title="Page State After Action")
    elif "final_page_state" in result and isinstance(result["final_page_state"], dict):
        display_page_state(result["final_page_state"], title="Final Page State")

    # Search Results
    if "results" in result and isinstance(result["results"], list) and "query" in result:
        search_results = result["results"]
        query_text = escape(str(result["query"]))
        search_table = Table(
            title=Text.from_markup(
                f"Search Results for '{query_text}' ({len(search_results)} found)"
            ),
            box=box.ROUNDED,
            show_header=True,
            padding=(0, 1),
        )
        search_table.add_column("#", style="dim")
        search_table.add_column("Title", style="cyan")
        search_table.add_column("URL", style="blue", no_wrap=False)
        search_table.add_column("Snippet", style="white", no_wrap=False)
        for i, item in enumerate(search_results, 1):
            # Distinct local names so the `title` parameter is not shadowed
            item_title = truncate_text_by_lines(item.get("title", ""), 3)
            snippet = truncate_text_by_lines(item.get("snippet", ""), 5)
            item_url = str(item.get("url") or "")
            search_table.add_row(
                str(i),
                escape(item_title),
                f"[link={item_url}]{escape(item_url)}[/link]",
                escape(snippet),
            )
        console.print(search_table)

    # Download Result
    if "download" in result and isinstance(result["download"], dict):
        dl_info = result["download"]
        dl_table = Table(
            title="Download Details", box=box.MINIMAL, show_header=False, padding=(0, 1)
        )
        dl_table.add_column("Metric", style="cyan", justify="right")
        dl_table.add_column("Value", style="white")
        dl_table.add_row("File Path", _escaped_or_default(dl_info.get("file_path")))
        dl_table.add_row("File Name", _escaped_or_default(dl_info.get("file_name")))
        dl_table.add_row("SHA256", _escaped_or_default(dl_info.get("sha256")))
        dl_table.add_row("Size", format_value("size_bytes", dl_info.get("size_bytes", -1)))
        dl_table.add_row("Source URL", _escaped_or_default(dl_info.get("url")))
        dl_table.add_row(
            "Tables Extracted",
            format_value("tables_extracted", dl_info.get("tables_extracted", False)),
        )
        if dl_info.get("tables"):
            # format_value handles potential markup in table preview string
            dl_table.add_row("Table Preview", format_value("tables", dl_info.get("tables")))
        console.print(
            Panel(dl_table, title="Download Result", border_style="green", padding=(1, 2))
        )

    # Macro/Autopilot Steps
    if "steps" in result and isinstance(result["steps"], list):
        steps = result["steps"]
        steps_table = Table(
            title=Text.from_markup(f"Macro/Autopilot Steps ({len(steps)} executed)"),
            box=box.ROUNDED,
            show_header=True,
            padding=(0, 1),
        )
        steps_table.add_column("#", style="dim")
        steps_table.add_column("Action/Tool", style="cyan")
        steps_table.add_column("Arguments/Hint", style="white", no_wrap=False)
        steps_table.add_column("Status", style="yellow")
        steps_table.add_column("Result/Error", style="white", no_wrap=False)

        for i, step in enumerate(steps, 1):
            action = step.get("action", step.get("tool", "?"))
            args = step.get("args")  # Check if 'args' exists
            if args is None:  # If no 'args', use the step itself excluding status keys
                args = {
                    k: v
                    for k, v in step.items()
                    if k
                    not in ["action", "tool", "success", "result", "error", "step", "duration_ms"]
                }

            args_preview = format_value("args", args, detail_level=0)  # format_value handles markup
            success_step = step.get("success", False)
            status = "[green]OK[/]" if success_step else "[red]FAIL[/]"  # Markup string
            outcome = step.get("result", step.get("error", ""))
            outcome_preview = format_value(
                "outcome", outcome, detail_level=0
            )  # format_value handles markup
            # str() guards against a non-string action reaching escape()
            steps_table.add_row(
                str(i), escape(str(action)), args_preview, status, outcome_preview
            )
        console.print(steps_table)

    # Documentation (assuming it's stored under 'file_path' key now)
    if (
        "file_path" in result and result.get("pages_collected") is not None
    ):  # Check for doc collection result structure
        doc_file_path = result.get("file_path")
        pages_collected = result.get("pages_collected")
        if doc_file_path and pages_collected > 0:
            content_to_display = f"[dim]Documentation saved to: {escape(str(doc_file_path))}[/]"
            try:
                with open(doc_file_path, "r", encoding="utf-8") as f:
                    content = f.read(1500)  # Read preview
                content_to_display += f"\n\n[bold]File Preview ({len(content)} chars):[/]\n"
                content_to_display += escape(content) + "\n[dim]...[/]"
            except Exception as e:
                # Best effort: a failed preview must not abort the display
                content_to_display += f"\n[yellow]Could not read file preview: {escape(str(e))}[/]"

            console.print(
                Panel(
                    Text.from_markup(content_to_display),
                    title=f"Collected Documentation ({pages_collected} pages)",
                    border_style="magenta",
                    padding=(1, 2),
                )
            )

    console.print()  # Add spacing


async def safe_tool_call(
    operation_name: str, tool_func: callable, *args, tracker: Optional[CostTracker] = None, **kwargs
) -> Tuple[bool, Dict[str, Any]]:
    """Await a tool coroutine and turn any failure into an error result dict.

    Args:
        operation_name: Label used in console output and log messages.
        tool_func: Async callable to invoke with ``*args`` and ``**kwargs``.
        tracker: Accepted for call-site compatibility; not used here.
        **kwargs: Forwarded to ``tool_func``. A ``display_options`` entry is
            removed first and stored in the result under ``_display_options``.

    Returns:
        ``(success, result)``. On failure ``result`` holds ``success=False``,
        ``error`` and ``error_code``.
    """
    console.print(
        f"\n[cyan]Calling Tool:[/][bold] {escape(operation_name)}[/] {timestamp_str(short=True)}"
    )
    display_options = kwargs.pop("display_options", {})

    # Build a compact view of the keyword arguments for the debug log:
    # long strings/bytes and large containers are reduced to type and length.
    max_arg_len = 100
    kwargs_summary = {}
    for arg_name, arg_value in kwargs.items():
        try:
            is_long_text = isinstance(arg_value, (str, bytes)) and len(arg_value) > max_arg_len
            if is_long_text or (isinstance(arg_value, (list, dict)) and len(arg_value) > 10):
                kwargs_summary[arg_name] = f"{type(arg_value).__name__}(len={len(arg_value)})"
            else:
                kwargs_summary[arg_name] = repr(arg_value)
        except Exception:  # repr() or len() of arbitrary objects may raise
            kwargs_summary[arg_name] = f"<{type(arg_value).__name__} repr_error>"

    logger.debug(f"Executing {operation_name} with args: {args}, kwargs: {kwargs_summary}")

    try:
        outcome = await tool_func(*args, **kwargs)

        if isinstance(outcome, dict):
            # Attach the display options so display_result can pick them up
            outcome["_display_options"] = display_options
            logger.debug(f"Tool '{operation_name}' completed.")
            # Treat a result with neither 'success' nor 'error' as successful
            if not ("success" in outcome or "error" in outcome):
                outcome["success"] = True
            return outcome.get("success", False), outcome

        logger.error(f"Tool '{operation_name}' returned non-dict type: {type(outcome)}")
        return False, {
            "success": False,
            "error": f"Tool returned unexpected type: {type(outcome).__name__}",
            "error_code": "INTERNAL_ERROR",
            "_display_options": display_options,
        }
    except ToolInputError as exc:
        # Handled before ToolError so input problems are logged as warnings
        logger.warning(f"Input error for {operation_name}: {exc}")
        return False, {
            "success": False,
            "error": str(exc),
            "error_code": getattr(exc, "error_code", "INPUT_ERROR"),
            "_display_options": display_options,
        }
    except ToolError as exc:
        logger.error(f"Tool error during {operation_name}: {exc}", exc_info=True)
        return False, {
            "success": False,
            "error": str(exc),
            "error_code": getattr(exc, "error_code", "TOOL_ERROR"),
            "_display_options": display_options,
        }
    except Exception as exc:
        logger.error(f"Unexpected error during {operation_name}: {exc}", exc_info=True)
        short_trace = traceback.format_exc(limit=1)
        exc_name = type(exc).__name__
        return False, {
            "success": False,
            "error": f"{exc_name}: {exc}\n{short_trace}",
            "error_type": exc_name,
            "error_code": "UNEXPECTED_ERROR",
            "_display_options": display_options,
        }


# --- Demo Sections ---

async def demo_section_1_browse(gateway, tracker: CostTracker) -> None:
    """Demo 1: browse two pages and display each result.

    Loads ``URL_EXAMPLE`` first, then ``URL_BOOKSTORE`` with a
    ``wait_for_selector`` for the page footer. ``gateway`` is not used here.
    """
    console.print(Rule("[bold green]Demo 1: Basic Browsing[/]", style="green"))
    logger.info("Starting Demo Section 1: Basic Browsing")

    # (label for the tool call, label for the result display, browse arguments)
    scenarios = (
        # 1a: Browse Example.com
        ("Browse Example.com", "Browse Example.com", {"url": URL_EXAMPLE}),
        # 1b: Browse Bookstore (wait for specific element)
        (
            "Browse Bookstore (wait for footer)",
            "Browse Bookstore (Wait)",
            {"url": URL_BOOKSTORE, "wait_for_selector": "footer.footer"},
        ),
    )
    for call_label, display_label, browse_kwargs in scenarios:
        _, outcome = await safe_tool_call(call_label, browse, tracker=tracker, **browse_kwargs)
        display_result(display_label, outcome)


async def demo_section_2_interaction(gateway, tracker: CostTracker) -> None:
    """Demo 2: load the bookstore page, type into its search form, click a result.

    Stops early when the page cannot be loaded; the click step runs only when
    the typing step reports success. ``gateway`` is not used here.
    """
    console.print(Rule("[bold green]Demo 2: Page Interaction[/]", style="green"))
    logger.info("Starting Demo Section 2: Page Interaction")

    # 2a: Search on Bookstore
    console.print(f"--- Scenario: Search for 'Science' on {URL_BOOKSTORE} ---")
    page_loaded, landing_result = await safe_tool_call(
        "Load Bookstore Search Page", browse, url=URL_BOOKSTORE, tracker=tracker
    )
    if not page_loaded:
        console.print("[red]Cannot proceed with interaction demo, failed to load page.[/]")
        return
    display_result("Bookstore Initial State", landing_result)

    # Fill the search form using task hints
    search_field = {"task_hint": "The search input field", "text": "Science", "enter": False}
    form_submitted, form_result = await safe_tool_call(
        "Type into Bookstore Search Form",
        type_text,
        url=URL_BOOKSTORE,
        fields=[search_field],
        submit_hint="The search button",
        wait_after_submit_ms=1500,
        tracker=tracker,
    )
    display_result("Type into Bookstore Search Form", form_result)
    if not form_submitted:
        return

    # 2b: Click the first search result
    console.print("--- Scenario: Click the first search result ---")
    # Continue from the URL reported after typing; default to the bookstore URL
    page_after_search = form_result.get("page_state", {})
    results_url = page_after_search.get("url", URL_BOOKSTORE)

    _, click_result = await safe_tool_call(
        "Click First Book Result",
        click,
        url=results_url,
        task_hint="The link for the first book shown in the results list",
        wait_ms=1000,
        tracker=tracker,
    )
    display_result("Click First Book Result", click_result)


async def demo_section_3_search(gateway, tracker: CostTracker) -> None:
    """Demo 3: run the same web search on Bing and then on DuckDuckGo.

    Args:
        gateway: Not used in this section; accepted so all demo sections can
            be called with the same arguments.
        tracker: Cost tracker forwarded to every tool call.
    """
    console.print(Rule("[bold green]Demo 3: Web Search[/]", style="green"))
    logger.info("Starting Demo Section 3: Web Search")

    search_query = "latest advancements in large language models"

    # 3a (Bing) and 3b (DuckDuckGo) differ only in the engine, so drive both
    # from one table of (display label, engine identifier) pairs.
    engines = (("Bing", "bing"), ("DuckDuckGo", "duckduckgo"))
    for label, engine_id in engines:
        _, engine_result = await safe_tool_call(
            f"Search {label}",
            search,
            query=search_query,
            engine=engine_id,
            max_results=5,
            tracker=tracker,
        )
        display_result(f"Search {label}: '{search_query}'", engine_result)


async def demo_section_4_download(gateway, tracker: CostTracker) -> None:
    """Demo 4: download PDFs discovered on a site, then download a file via a link click.

    The second scenario writes a small temporary HTML page into the demo
    output directory, points the ``download`` tool at it, and removes the
    page again afterwards.

    Args:
        gateway: Not used in this section; accepted so all demo sections can
            be called with the same arguments.
        tracker: Cost tracker forwarded to every tool call.
    """
    console.print(Rule("[bold green]Demo 4: File Download[/]", style="green"))
    logger.info("Starting Demo Section 4: File Download")

    # Resolve to an absolute path first (strict=False tolerates a directory
    # that does not exist yet), then make sure the directory exists.
    outputs_dir = DEMO_OUTPUTS_DIR.resolve(strict=False)
    outputs_dir.mkdir(parents=True, exist_ok=True)

    # Create the parent directory for PDF downloads if it doesn't exist
    pdf_parent_dir = "storage/smart_browser_site_pdfs"
    console.print(f"[cyan]Creating parent directory for PDFs: {pdf_parent_dir}[/cyan]")
    from ultimate_mcp_server.tools.filesystem import create_directory

    parent_dir_result = await create_directory(path=pdf_parent_dir)
    if parent_dir_result.get("success", False):
        console.print(f"[green]Successfully created parent directory: {pdf_parent_dir}[/green]")
    else:
        console.print(
            f"[yellow]Warning: Could not create parent directory: "
            f"{parent_dir_result.get('error', 'Unknown error')}[/yellow]"
        )

    # 4a: Download PDFs from a site
    console.print("--- Scenario: Find and Download PDFs from Example.com ---")
    success, result = await safe_tool_call(
        "Download PDFs from Example.com",
        download_site_pdfs,
        start_url=URL_EXAMPLE,
        max_depth=1,
        max_pdfs=5,
        dest_subfolder="example_com_pdfs",
        tracker=tracker,
    )
    display_result("Download PDFs from Example.com", result)
    # Only report "none found, as expected" when the call itself succeeded;
    # a failed call has no positive pdf_count either and would otherwise be
    # presented as the expected outcome.
    if success and result.get("pdf_count", 0) == 0:
        console.print("[yellow]Note: No PDFs found on example.com as expected.[/]")

    # 4b: Click-based download against a locally generated page
    download_page_content = f"""
    <!DOCTYPE html>
    <html><head><title>Download Test</title></head>
    <body><h1>Download Page</h1>
    <p>Click the link to download a dummy PDF.</p>
    <a href="{URL_PDF_SAMPLE}" id="downloadLink">Download Dummy PDF Now</a>
    <p>Another paragraph.</p>
    </body></html>
    """
    download_page_path = outputs_dir / "download_test.html"
    try:
        download_page_path.write_text(download_page_content, encoding="utf-8")
        # as_uri() requires an absolute path, which resolve() above provides.
        local_url = download_page_path.as_uri()

        console.print("\n--- Scenario: Click a link to download a file ---")
        success, result = await safe_tool_call(
            "Click to Download PDF",
            download,
            url=local_url,
            task_hint="The 'Download Dummy PDF Now' link",
            dest_dir="storage/sb_demo_outputs/clicked_downloads",  # Adjusted path
            tracker=tracker,
        )
        display_result("Click to Download PDF", result)
    except Exception as e:
        console.print(f"[red]Error setting up or running click-download demo: {e}[/]")
    finally:
        # Best-effort removal of the temporary page: a failed delete is
        # reported but must not fail the demo.
        if download_page_path.exists():
            try:
                download_page_path.unlink()
            except OSError as cleanup_error:
                logger.warning(
                    f"Could not remove temporary page {download_page_path}: {cleanup_error}"
                )


async def demo_section_5_macro(gateway, tracker: CostTracker) -> None:
    """Demo 5: hand a natural-language task to ``run_macro`` on the bookstore site.

    Args:
        gateway: Not used in this section; accepted so all demo sections can
            be called with the same arguments.
        tracker: Cost tracker forwarded to the tool call.
    """
    console.print(Rule("[bold green]Demo 5: Execute Macro[/]", style="green"))
    logger.info("Starting Demo Section 5: Execute Macro")

    call_label = "Execute Bookstore Search Macro"
    instructions = (
        f"Go to {URL_BOOKSTORE}, search for 'History', "
        "find the book 'Sapiens: A Brief History of Humankind', and click its link."
    )
    console.print("--- Scenario: Execute Macro ---")
    # The task text is escaped so it is shown literally rather than parsed as markup.
    console.print(f"[italic]Task:[/italic] {escape(instructions)}")

    _, macro_outcome = await safe_tool_call(
        call_label,
        run_macro,
        url=URL_BOOKSTORE,
        task=instructions,
        max_rounds=5,
        tracker=tracker,
    )
    display_result(call_label, macro_outcome)


async def demo_section_6_autopilot(gateway, tracker: CostTracker) -> None:
    """Demo 6: let ``autopilot`` carry out a multi-step research task.

    When the result carries a ``run_log`` entry, it is printed after the
    regular result display.

    Args:
        gateway: Not used in this section; accepted so all demo sections can
            be called with the same arguments.
        tracker: Cost tracker forwarded to the tool call.
    """
    console.print(Rule("[bold green]Demo 6: Autopilot[/]", style="green"))
    logger.info("Starting Demo Section 6: Autopilot")

    call_label = "Run Autopilot: Find httpx Docs"
    goal = (
        "Search the web for the official documentation URL of the 'httpx' Python library, "
        "then browse that URL and summarize the main page content."
    )
    console.print("--- Scenario: Autopilot ---")
    console.print(f"[italic]Task:[/italic] {escape(goal)}")

    _, outcome = await safe_tool_call(
        call_label,
        autopilot,
        task=goal,
        max_steps=8,
        scratch_subdir="autopilot_demo",
        tracker=tracker,
    )
    display_result(call_label, outcome)

    run_log = outcome.get("run_log")
    if run_log:
        console.print(f"[dim]Autopilot run log saved to: {run_log}[/]")


async def demo_section_7_parallel(gateway, tracker: CostTracker) -> None:
    """Demo 7: fetch the page state of several URLs through the ``parallel`` tool.

    On success, the batch counters are printed followed by one panel per URL
    (green with title and element count, or red with the error). On failure, a
    single red panel with the tool error is printed.

    Args:
        gateway: Not used in this section; accepted so all demo sections can
            be called with the same arguments.
        tracker: Cost tracker forwarded to the tool call.
    """
    console.print(Rule("[bold green]Demo 7: Parallel Processing[/]", style="green"))
    logger.info("Starting Demo Section 7: Parallel Processing")

    urls_to_process = [
        URL_EXAMPLE,
        URL_BOOKSTORE,
        URL_QUOTES,
        "http://httpbin.org/delay/1",
        "https://webscraper.io/test-sites/e-commerce/static",
    ]
    console.print("--- Scenario: Get Page State for Multiple URLs in Parallel ---")
    console.print(f"[dim]URLs:[/dim] {urls_to_process}")

    batch_ok, batch = await safe_tool_call(
        "Parallel Get Page State",
        parallel,
        urls=urls_to_process,
        action="get_state",  # Only 'get_state' supported currently
        # max_tabs=3 # Can override default here if needed
        tracker=tracker,
    )

    # Parallel results get their own rendering instead of display_result
    console.print(Rule("[bold cyan]Parallel Processing Results[/]", style="cyan"))
    if not batch_ok:
        console.print(
            Panel(
                f"[red]Parallel processing tool call failed:[/red]\n{escape(batch.get('error', '?'))}",
                border_style="red",
            )
        )
    else:
        console.print(f"Total URLs Processed: {batch.get('processed_count', 0)}")
        console.print(f"Successful: {batch.get('successful_count', 0)}")
        console.print("-" * 20)
        for position, entry in enumerate(batch.get("results", []), start=1):
            # Entries without a URL are labelled by their 1-based position.
            entry_url = entry.get("url", f"URL {position}")
            entry_ok = entry.get("success", False)
            heading = f"Result for: {escape(entry_url)}"
            if entry_ok:
                frame_color = "green"
                page = entry.get("page_state", {})
                body = (
                    f"Title: {escape(page.get('title', 'N/A'))}\n"
                    f"Elements Found: {len(page.get('elements', []))}"
                )
            else:
                frame_color = "red"
                body = f"[red]Error:[/red] {escape(entry.get('error', 'Unknown'))}"
            console.print(
                Panel(body, title=heading, border_style=frame_color, padding=(0, 1), expand=False)
            )
    console.print()


async def demo_section_8_docs(gateway, tracker: CostTracker) -> None:
    """Demo 8: collect documentation pages for a package via ``collect_documentation``.

    Args:
        gateway: Not used in this section; accepted so all demo sections can
            be called with the same arguments.
        tracker: Cost tracker forwarded to the tool call.
    """
    console.print(Rule("[bold green]Demo 8: Documentation Collection[/]", style="green"))
    logger.info("Starting Demo Section 8: Documentation Collection")

    package_name = "fastapi"  # Use a different package
    call_label = f"Collect Docs: {package_name}"
    console.print(f"--- Scenario: Collect Documentation for '{package_name}' ---")

    _, docs_outcome = await safe_tool_call(
        call_label,
        collect_documentation,
        package=package_name,
        max_pages=15,
        rate_limit_rps=2.0,
        tracker=tracker,
    )
    # Rendering of the collected-docs result is left to display_result
    display_result(call_label, docs_outcome)


# --- Main Function ---
async def main() -> int:
    """Run the SmartBrowser tools demo.

    Creates the gateway, initializes its providers, runs demo sections 1-8 in
    order with a shared cost tracker, and always attempts ``shutdown()`` at
    the end.

    Returns:
        0 when every section ran without raising, 1 when an exception
        escaped setup or any section.
    """
    console.print(Rule("[bold magenta]Smart Browser Tools Demo[/bold magenta]"))

    # Ensure local demo output directory exists
    DEMO_OUTPUTS_DIR.mkdir(parents=True, exist_ok=True)
    console.print(f"[dim]Demo-specific outputs will be saved in: {DEMO_OUTPUTS_DIR}[/]")

    status = 0
    try:
        # --- Gateway is created for its providers only ---
        console.print("[cyan]Initializing MCP Gateway...[/]")
        gateway = Gateway("smart-browser-demo")
        console.print("[cyan]Initializing Providers (for LLM tools)...[/]")
        await gateway._initialize_providers()

        # --- Smart Browser module ---
        console.print("[cyan]Initializing Smart Browser tool...[/]")
        # await initialize()

        tracker = CostTracker()

        # Sections run sequentially; each receives the gateway and tracker.
        # Section 6 (autopilot) is included in the run and can be intensive.
        sections = (
            demo_section_1_browse,
            demo_section_2_interaction,
            demo_section_3_search,
            demo_section_4_download,
            demo_section_5_macro,
            demo_section_6_autopilot,
            demo_section_7_parallel,
            demo_section_8_docs,
        )
        for run_section in sections:
            await run_section(gateway, tracker)

        console.print(Rule("[bold magenta]Demo Complete[/bold magenta]"))

    except Exception as exc:
        logger.critical(f"Demo failed with critical error: {exc}", exc_info=True)
        console.print("[bold red]CRITICAL ERROR DURING DEMO:[/]")
        console.print_exception(show_locals=True)
        status = 1
    finally:
        # Shutdown is attempted on both the success and the failure path;
        # a shutdown error is logged and does not change the exit status.
        console.print("[cyan]Shutting down Smart Browser tool...[/]")
        try:
            await shutdown()
        except Exception as shutdown_exc:
            logger.error(f"Error during Smart Browser shutdown: {shutdown_exc}")

    return status


if __name__ == "__main__":
    # Drive the async entry point; its return value becomes the process exit status.
    try:
        sys.exit(asyncio.run(main()))
    except KeyboardInterrupt:
        console.print("\n[yellow]Demo interrupted by user. Shutting down...[/]")
        # Best-effort cleanup after Ctrl+C: run shutdown() on a fresh event loop
        try:
            asyncio.run(shutdown())
        except Exception as shutdown_error:
            print(f"Error during emergency shutdown: {shutdown_error}")
        sys.exit(1)

```

--------------------------------------------------------------------------------
/examples/grok_integration_demo.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
"""Grok integration demonstration using Ultimate MCP Server."""
import asyncio
import json
import sys
import time
from pathlib import Path

# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))

# Third-party imports
from rich import box
from rich.align import Align
from rich.columns import Columns
from rich.console import Console, Group
from rich.live import Live
from rich.markup import escape
from rich.panel import Panel
from rich.progress import (
    BarColumn,
    Progress,
    TaskProgressColumn,
    TextColumn,
    TimeElapsedColumn,
)
from rich.rule import Rule
from rich.table import Table
from rich.text import Text
from rich.tree import Tree

# Project imports
from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.server import Gateway
from ultimate_mcp_server.utils import get_logger
from ultimate_mcp_server.utils.display import CostTracker
from ultimate_mcp_server.utils.logging.console import console

# Module-level logger for this example script
logger = get_logger("example.grok_integration")

# Separate Rich console for detailed debugging output; it writes to stderr
# (stderr=True) and has automatic highlighting turned off (highlight=False).
debug_console = Console(stderr=True, highlight=False)


def _grok_model_style(model_name):
    """Return the ``(color, blurb)`` pair used to present a Grok model name.

    ``"mini"`` is checked before ``"fast"`` so that a name containing both
    substrings is styled the same way in the model tree and in the result
    panels; reasoning metadata is likewise keyed on ``"mini"``.
    """
    if "mini" in model_name:
        return "green", "optimized for reasoning"
    if "fast" in model_name:
        return "yellow", "optimized for speed"
    return "magenta", "general purpose"


def _build_grok_model_tree(model_names):
    """Build a Rich tree listing every model in *model_names* that starts with ``grok-3``."""
    model_tree = Tree("[bold cyan]Available Grok Models[/bold cyan]")
    for name in model_names:
        # Only display grok-3 models
        if not name.startswith("grok-3"):
            continue
        color, blurb = _grok_model_style(name)
        model_tree.add(f"[bold {color}]{name}[/bold {color}] [dim]({blurb})[/dim]")
    return model_tree


def _select_grok_models(model_names):
    """Choose which models to compare.

    Returns the preferred ids that are present in *model_names*; when none of
    them is available, returns the first two ids starting with ``grok-3``
    (possibly an empty list).
    """
    preferred = ("grok-3-latest", "grok-3-mini-latest")
    selected = [name for name in preferred if name in model_names]
    if selected:
        return selected
    return [name for name in model_names if name.startswith("grok-3")][:2]


async def _collect_grok_completions(provider, model_ids, prompt, tracker):
    """Run *prompt* against each model and return one result dict per model.

    A successful entry holds ``model``, ``text``, ``tokens``, ``reasoning_content``,
    ``reasoning_tokens``, ``cost`` and ``time``; a failed entry holds ``model``
    and ``error``. A failure for one model does not stop the remaining models.
    """
    collected = []

    with Progress(
        TextColumn("[bold blue]{task.description}"),
        BarColumn(complete_style="green", finished_style="green"),
        TaskProgressColumn(),
        TextColumn("[green]{task.completed} of {task.total}"),
        TimeElapsedColumn(),
        console=console,
        expand=True
    ) as progress:
        task_id = progress.add_task("[cyan]Testing models...", total=len(model_ids))

        for model_name in model_ids:
            progress.update(task_id, description=f"[cyan]Testing model: [bold]{model_name}[/bold]")

            try:
                logger.info(f"Testing model: {model_name}", emoji_key="model")
                start_time = time.time()
                result = await provider.generate_completion(
                    prompt=prompt,
                    model=model_name,
                    temperature=0.3,
                    max_tokens=300
                )
                processing_time = time.time() - start_time

                # Track the cost
                tracker.add_call(result)

                # Detailed timing goes to the debug console only
                debug_console.print(f"[dim]Model {model_name} processing details:[/dim]")
                debug_console.print(f"[dim]Time: {processing_time:.2f}s | Tokens: {result.total_tokens}[/dim]")

                # Reasoning output is only read for "mini" models, from the result metadata
                reasoning_content = None
                reasoning_tokens = None
                if "mini" in model_name and result.metadata:
                    reasoning_content = result.metadata.get("reasoning_content")
                    reasoning_tokens = result.metadata.get("reasoning_tokens")

                collected.append({
                    "model": model_name,
                    "text": result.text,
                    "tokens": {
                        "input": result.input_tokens,
                        "output": result.output_tokens,
                        "total": result.total_tokens
                    },
                    "reasoning_content": reasoning_content,
                    "reasoning_tokens": reasoning_tokens,
                    "cost": result.cost,
                    "time": processing_time
                })

                logger.success(
                    f"Completion for {model_name} successful",
                    emoji_key="success",
                )

            except Exception as e:
                # Record the failure so it still gets a panel in the comparison
                logger.error(f"Error testing model {model_name}: {str(e)}", emoji_key="error", exc_info=True)
                debug_console.print_exception()
                collected.append({
                    "model": model_name,
                    "error": str(e)
                })

            progress.advance(task_id)

    return collected


def _render_grok_result(result_item):
    """Turn one collected result dict into a Rich renderable.

    Returns a red error panel for failed entries. For successful entries it
    returns a panel combining the response text with a stats table, grouped
    with a reasoning panel when reasoning content is present.
    """
    model = result_item["model"]

    if "error" in result_item:
        return Panel(
            f"[red]{escape(result_item['error'])}[/red]",
            title=f"[bold red]{escape(model)} - ERROR[/bold red]",
            border_style="red",  # Red for errors
            expand=False,
            padding=(1, 2)
        )

    time_s = result_item["time"]
    tokens = result_item.get("tokens", {})
    input_tokens = tokens.get("input", 0)
    output_tokens = tokens.get("output", 0)
    total_tokens = tokens.get("total", 0)

    tokens_per_second = total_tokens / time_s if time_s > 0 else 0
    # A missing or None cost is shown as zero instead of breaking the float format below
    cost = result_item.get("cost") or 0.0

    raw_text = result_item.get("text")
    if raw_text is None:
        # The markup is intentional here, so this fallback must not be escaped
        response_body = "[red]Error generating response[/red]"
    else:
        response_body = escape(raw_text.strip())

    border_style, _ = _grok_model_style(model)

    # Panel holding the model's response text
    model_panel = Panel(
        response_body,
        title=f"[bold {border_style}]{escape(model)}[/bold {border_style}]",  # Use border color in title
        subtitle="[dim]Response Text[/dim]",
        border_style=border_style,
        expand=True,
        padding=(1, 2)
    )

    # Token, timing and cost figures for this completion
    stats_table = Table(box=box.MINIMAL, show_header=False, expand=True, padding=0)
    stats_table.add_column("Metric", style="dim cyan", width=15)
    stats_table.add_column("Value", style="white")
    stats_table.add_row("Input Tokens", f"[yellow]{input_tokens}[/yellow]")
    stats_table.add_row("Output Tokens", f"[green]{output_tokens}[/green]")
    stats_table.add_row("Total Tokens", f"[bold cyan]{total_tokens}[/bold cyan]")
    stats_table.add_row("Time", f"[yellow]{time_s:.2f}s[/yellow]")
    stats_table.add_row("Speed", f"[blue]{tokens_per_second:.1f} tok/s[/blue]")
    stats_table.add_row("Cost", f"[green]${cost:.6f}[/green]")

    combined_panel = Panel(
        Group(
            model_panel,
            Align.center(stats_table)
        ),
        border_style=border_style,
        padding=(1, 1),  # Reduced padding for combined view
        title=f"[bold]Response from {escape(model)}[/bold]"
    )

    reasoning_content = result_item.get("reasoning_content")
    if not reasoning_content:
        return combined_panel

    reasoning_panel = Panel(
        escape(reasoning_content),
        title="[bold cyan]Reasoning Process[/bold cyan]",
        subtitle=f"[dim]Reasoning Tokens: {result_item.get('reasoning_tokens')}[/dim]",
        border_style="cyan",  # Cyan for reasoning/tools
        expand=True,
        padding=(1, 2)
    )
    # Keep the main result and its reasoning together as one renderable
    return Group(combined_panel, reasoning_panel)


async def compare_grok_models(tracker: CostTracker):
    """Compare different Grok models on one short prompt.

    Lists the provider's ``grok-3*`` models, picks up to two of them, sends
    the same prompt to each, and prints the responses with token, timing and
    cost statistics. Exceptions raised after provider initialization are
    logged rather than propagated.

    Args:
        tracker: Cost tracker that records each successful completion.
    """
    console.print(Rule("[bold cyan]⚡ Grok Model Comparison [/bold cyan]", style="bold blue"))
    logger.info("Starting Grok models comparison", emoji_key="start")

    # Create Gateway instance - this handles provider initialization
    gateway = Gateway("grok-demo", register_tools=False, provider_exclusions=[Provider.OPENROUTER.value])

    # Initialize providers
    logger.info("Initializing providers...", emoji_key="provider")
    await gateway._initialize_providers()

    provider_name = Provider.GROK
    try:
        # Get the provider from the gateway
        provider = gateway.providers.get(provider_name)
        if not provider:
            logger.error(f"Provider {provider_name} not available or initialized", emoji_key="error")
            return

        logger.info(f"Using provider: {provider_name}", emoji_key="provider")

        models = await provider.list_models()
        model_names = [m["id"] for m in models]  # Extract ids from model dictionaries

        console.print(Panel(_build_grok_model_tree(model_names), border_style="dim cyan", padding=(1, 2)))

        models_to_compare = _select_grok_models(model_names)
        if not models_to_compare:
            logger.warning("No grok-3 models available for comparison.", emoji_key="warning")
            return

        console.print(Panel(
            f"Comparing models: [yellow]{escape(', '.join(models_to_compare))}[/yellow]",
            title="[bold]Comparison Setup[/bold]",
            border_style="blue",  # Use blue for info
            padding=(1, 2)
        ))

        prompt = """
        Explain the concept of quantum entanglement in a way that a high school student would understand.
        Keep your response brief and accessible.
        """

        console.print(Panel(
            escape(prompt.strip()),
            title="[bold]Test Prompt[/bold]",
            border_style="yellow",  # Yellow for prompts
            expand=False,
            padding=(1, 2)
        ))

        results_data = await _collect_grok_completions(provider, models_to_compare, prompt, tracker)

        if results_data:
            console.print(Rule("[bold green]⚡ Comparison Results [/bold green]", style="bold green"))

            # Every result, failed or not, yields exactly one renderable
            comparison_panels = [_render_grok_result(item) for item in results_data]

            # Two results are shown side by side; any other count is printed sequentially
            if len(comparison_panels) == 2:
                console.print(Columns(comparison_panels, equal=True, expand=True))
            else:
                for panel in comparison_panels:
                    console.print(panel)

    except Exception as e:
        logger.error(f"Error in model comparison: {str(e)}", emoji_key="error", exc_info=True)


async def demonstrate_reasoning(tracker: CostTracker):
    """Show a Grok "mini" model solving a math problem along with its reasoning.

    Prefers ``grok-3-mini-latest``, falls back to the first listed model whose
    id contains ``"mini"``, and finally to the provider's default model. The
    reasoning text (when the result metadata carries it) and the final answer
    are printed as two stacked panels. Exceptions raised after provider
    initialization are logged rather than propagated.

    Args:
        tracker: Cost tracker that records the completion result.
    """
    console.print(Rule("[bold cyan]⚡ Grok Reasoning Demonstration [/bold cyan]", style="bold blue"))
    logger.info("Demonstrating Grok-mini reasoning capabilities", emoji_key="start")

    # The gateway takes care of provider setup
    gateway = Gateway("grok-demo", register_tools=False, provider_exclusions=[Provider.OPENROUTER.value])

    logger.info("Initializing providers...", emoji_key="provider")
    await gateway._initialize_providers()

    provider_name = Provider.GROK
    try:
        provider = gateway.providers.get(provider_name)
        if not provider:
            logger.error(f"Provider {provider_name} not available or initialized", emoji_key="error")
            return

        # Model choice: preferred mini id -> any id containing "mini" -> provider default
        preferred_model = "grok-3-mini-latest"
        listed_models = await provider.list_models()
        listed_ids = [entry["id"] for entry in listed_models]

        if preferred_model in listed_ids:
            model = preferred_model
        else:
            model = next((candidate for candidate in listed_ids if "mini" in candidate), None)
            if model is None:
                logger.warning("No mini model available for reasoning demo. Using default model.", emoji_key="warning")
                model = provider.get_default_model()

        logger.info(f"Using model: {model}", emoji_key="model")

        # Problem requiring reasoning
        problem = """
        A cylindrical water tank has a radius of 3 meters and a height of 4 meters.
        If water flows in at a rate of 2 cubic meters per minute, how long will it take to fill the tank?
        Show your work step by step.
        """

        # The displayed prompt is stripped; the model receives the string as written
        console.print(Panel(
            escape(problem.strip()),
            title="[bold yellow]Math Problem[/bold yellow]",
            border_style="yellow",
            expand=False,
            padding=(1, 2)
        ))

        with Progress(
            TextColumn("[bold blue]Status:"),
            BarColumn(complete_style="green", finished_style="green"),
            TaskProgressColumn(),
            TextColumn("[cyan]{task.description}"),
            TimeElapsedColumn(),
            console=console,
            expand=True
        ) as progress:
            thinking_task = progress.add_task("[cyan]Thinking...", total=1)

            logger.info("Generating solution with reasoning", emoji_key="processing")

            result = await provider.generate_completion(
                prompt=problem,
                model=model,
                temperature=0.3,
                reasoning_effort="high",  # Request high reasoning effort
                max_tokens=1000
            )

            # Record the call with the cost tracker
            tracker.add_call(result)

            progress.update(thinking_task, description="Complete!", completed=1)

        logger.success("Reasoning solution completed", emoji_key="success")

        # Reasoning details live in the result metadata, when there is any
        metadata = result.metadata
        reasoning_content = metadata.get("reasoning_content") if metadata else None
        reasoning_tokens = metadata.get("reasoning_tokens") if metadata else None

        # Both variants share one panel layout; only body and subtitle differ
        if reasoning_content:
            thinking_body = escape(reasoning_content)
            thinking_subtitle = f"[dim]Reasoning Tokens: {reasoning_tokens}[/dim]"
        else:
            thinking_body = "[italic]No explicit reasoning process available[/italic]"
            thinking_subtitle = None

        reasoning_panel = Panel(
            thinking_body,
            title="[bold cyan]Thinking Process[/bold cyan]",
            subtitle=thinking_subtitle,
            border_style="cyan",
            expand=True,
            padding=(1, 2)
        )

        # Final answer with token, cost and timing figures in the subtitle
        answer_panel = Panel(
            escape(result.text.strip()),
            title="[bold green]Final Solution[/bold green]",
            subtitle=f"[dim]Tokens: {result.input_tokens} in, {result.output_tokens} out | Cost: ${result.cost:.6f} | Time: {result.processing_time:.2f}s[/dim]",
            border_style="green",
            expand=True,
            padding=(1, 2)
        )

        # Stack the reasoning above the answer
        console.print(Group(reasoning_panel, answer_panel))

    except Exception as exc:
        logger.error(f"Error in reasoning demonstration: {str(exc)}", emoji_key="error", exc_info=True)


def _mock_tool_result(function_name: str, args: dict) -> "dict | None":
    """Return canned demo output for a known tool, or ``None`` for an unknown one.

    No real weather or flight service is contacted; the values are fixed
    placeholders shaped by the arguments the model supplied.

    Args:
        function_name: Name of the tool the model asked to call.
        args: Parsed tool arguments; missing keys fall back to defaults.

    Returns:
        A JSON-serializable dict for ``get_weather`` / ``get_flight_info``,
        otherwise ``None``.
    """
    if function_name == "get_weather":
        unit = args.get("unit", "fahrenheit")
        return {
            "location": args.get("location", "Unknown"),
            "temperature": 75 if unit == "fahrenheit" else 24,
            "unit": unit,
            "condition": "Sunny",
            "humidity": 65
        }

    if function_name == "get_flight_info":
        departure = args.get("departure_city", "Unknown")
        arrival = args.get("arrival_city", "Unknown")
        return {
            "flights": [
                {
                    "airline": "Delta", "flight": "DL1234",
                    "departure": f"{departure} 08:30 AM", "arrival": f"{arrival} 11:45 AM",
                    "price": "$349.99"
                },
                {
                    "airline": "United", "flight": "UA567",
                    "departure": f"{departure} 10:15 AM", "arrival": f"{arrival} 1:30 PM",
                    "price": "$289.99"
                }
            ]
        }

    return None


async def demonstrate_function_calling(tracker: CostTracker):
    """Demonstrate Grok function calling capabilities.

    Sends one travel-planning query together with two tool definitions
    (``get_weather`` and ``get_flight_info``), then renders the model's text
    response and every tool call it requested, each paired with a mock result.

    Args:
        tracker: Cost tracker that records the completion result.

    Any exception raised inside the demo is logged and swallowed so the
    remaining demonstrations can still run.
    """
    console.print(Rule("[bold cyan]⚡ Grok Function Calling Demonstration [/bold cyan]", style="bold blue"))
    logger.info("Demonstrating Grok function calling capabilities", emoji_key="start")
    
    # Create Gateway instance - this handles provider initialization
    gateway = Gateway("grok-demo", register_tools=False, provider_exclusions=[Provider.OPENROUTER.value])
    
    # Initialize providers
    logger.info("Initializing providers...", emoji_key="provider")
    await gateway._initialize_providers()
    
    provider_name = Provider.GROK
    try:
        # Get the provider from the gateway
        provider = gateway.providers.get(provider_name)
        if not provider:
            logger.error(f"Provider {provider_name} not available or initialized", emoji_key="error")
            return
        
        # Use default Grok model
        model = provider.get_default_model()
        logger.info(f"Using model: {model}", emoji_key="model")
        
        # Define tools for the model to use
        tools = [
            {
                "type": "function",
                "function": {
                    "name": "get_weather",
                    "description": "Get the current weather in a given location",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "location": {
                                "type": "string",
                                "description": "The city and state, e.g. San Francisco, CA"
                            },
                            "unit": {
                                "type": "string",
                                "enum": ["celsius", "fahrenheit"],
                                "description": "The unit of temperature to use"
                            }
                        },
                        "required": ["location"]
                    }
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "get_flight_info",
                    "description": "Get flight information between two cities",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "departure_city": {
                                "type": "string",
                                "description": "The departure city"
                            },
                            "arrival_city": {
                                "type": "string",
                                "description": "The arrival city"
                            },
                            "date": {
                                "type": "string",
                                "description": "The date of travel in YYYY-MM-DD format"
                            }
                        },
                        "required": ["departure_city", "arrival_city"]
                    }
                }
            }
        ]
        
        # Display tools in a Panel for consistency
        tools_table = Table(title="[bold cyan]Available Tools[/bold cyan]", box=box.MINIMAL, show_header=True, header_style="bold magenta")
        tools_table.add_column("Tool Name", style="cyan", no_wrap=True)
        tools_table.add_column("Description", style="white")
        tools_table.add_column("Parameters", style="green")
        
        for tool in tools:
            function = tool["function"]
            # Iterating the properties dict yields the parameter names
            params = ", ".join(function["parameters"]["properties"])
            tools_table.add_row(function["name"], function["description"], params)
        
        console.print(Panel(tools_table, border_style="cyan", padding=(1, 2))) # Cyan for tools
        
        # User query
        user_query = "I'm planning a trip from New York to Los Angeles next week. What's the weather like in LA, and can you help me find flight information?"
        
        # Consistent panel styling for prompt
        console.print(Panel(
            escape(user_query),
            title="[bold yellow]User Query[/bold yellow]",
            border_style="yellow", # Yellow for prompts
            expand=False,
            padding=(1, 2) # Standard padding
        ))
        
        with Progress(
            TextColumn("[bold blue]Status:"),
            BarColumn(complete_style="green", finished_style="green"),
            TaskProgressColumn(),
            TextColumn("[cyan]{task.description}"),
            TimeElapsedColumn(),
            console=console,
            expand=True
        ) as progress:
            task = progress.add_task("[cyan]Processing...", total=1)
            
            logger.info("Generating completion with function calling", emoji_key="processing")
            
            result = await provider.generate_completion(
                prompt=user_query,
                model=model,
                temperature=0.7,
                tools=tools,
                tool_choice="auto"
            )
            
            # Track the cost
            tracker.add_call(result)
            
            progress.update(task, description="Complete!", completed=1)
        
        logger.success("Function calling completed", emoji_key="success")
        
        # Check if there are tool calls in the response (missing or empty both mean "none")
        message = result.raw_response.choices[0].message
        tool_calls = getattr(message, 'tool_calls', None)
        
        if tool_calls:
            # Format the model response
            response_text = escape(result.text.strip()) if result.text else "[italic dim]No direct text response, only tool calls.[/italic dim]"
            response_info = f"[dim]Input Tokens: {result.input_tokens} | Output Tokens: {result.output_tokens} | Cost: ${result.cost:.6f}[/dim]"
            
            model_response_panel = Panel(
                response_text,
                title="[bold green]Model Response[/bold green]",
                subtitle=response_info,
                padding=(1, 2), # Standard padding
                border_style="green" # Green for success/results
            )

            # Prepare panels for tool calls
            tool_panels = []
            for tool_call in tool_calls:
                function_name = tool_call.function.name
                raw_arguments = tool_call.function.arguments or ""

                # Rebind ``args`` on every iteration: if parsing fails we must not
                # reuse the previous call's arguments (or hit an unbound name).
                args = {}
                try:
                    parsed_args = json.loads(raw_arguments)
                    args_formatted = f"[json]{escape(json.dumps(parsed_args, indent=2))}[/json]"
                    # Only a JSON object can be queried with .get() below
                    if isinstance(parsed_args, dict):
                        args = parsed_args
                except (TypeError, ValueError):
                    # Malformed or non-string arguments: show them verbatim
                    args_formatted = escape(str(raw_arguments))
                
                # Create compact tool call display content
                call_content_lines = [
                    f"[bold cyan]Function:[/bold cyan] [magenta]{function_name}[/magenta]",
                    f"[bold cyan]Arguments:[/bold cyan]\n{args_formatted}"
                ]

                # Add mock function result if available
                result_data = _mock_tool_result(function_name, args)
                if result_data:
                    result_formatted = f"[json]{escape(json.dumps(result_data, indent=2))}[/json]"
                    call_content_lines.append(f"\n[bold blue]Mock Result:[/bold blue]\n{result_formatted}")
                
                tool_panels.append(Panel(
                    "\n".join(call_content_lines),
                    title=f"[bold magenta]Tool Call: {function_name}[/bold magenta]",
                    subtitle=f"[dim]ID: {tool_call.id}[/dim]",
                    border_style="magenta", # Magenta for specific tool calls
                    padding=(1, 2) # Standard padding
                ))
            
            # Use Columns for horizontal layout if multiple tool calls
            if len(tool_panels) > 1:
                tool_call_display = Columns(tool_panels, equal=True, expand=True)
            elif tool_panels:
                tool_call_display = tool_panels[0]
            else: # Should not happen if tool_calls is true, but handle defensively
                tool_call_display = Text("No tool calls processed.", style="dim")

            # Create combined panel with response and tool calls
            combined_panel = Panel(
                Group(
                    model_response_panel,
                    tool_call_display
                ),
                title="[bold green]Function Calling Results[/bold green]",
                border_style="green", # Green for overall success
                padding=(1, 1) # Slightly reduced outer padding
            )
            
            console.print(combined_panel)
        else:
            # No tool calls, just display the response with consistent styling.
            # Guard against a missing text body so .strip() cannot fail on None.
            plain_response = escape(result.text.strip()) if result.text else "[italic dim]No text response returned.[/italic dim]"
            console.print(Panel(
                plain_response,
                title="[bold green]Model Response (No Tool Calls)[/bold green]",
                subtitle=f"[dim]Tokens: {result.input_tokens} in, {result.output_tokens} out | Cost: ${result.cost:.6f}[/dim]",
                border_style="green", # Green for success/result
                padding=(1, 2) # Standard padding
            ))
        
        console.print() # Keep spacing
        
    except Exception as e:
        logger.error(f"Error in function calling demonstration: {str(e)}", emoji_key="error", exc_info=True)


async def streaming_example(tracker: CostTracker):
    """Demonstrate Grok streaming capabilities.

    Streams a short-story completion into a live-updating panel, then prints
    rough throughput statistics (elapsed time, chunk count, estimated tokens).

    Args:
        tracker: Cost tracker shared by the demos. It is accepted for signature
            parity with the other demonstrations; this function does not
            record anything on it.

    Any exception raised inside the demo is logged and swallowed so the
    caller can continue.
    """
    console.print(Rule("[bold cyan]⚡ Grok Streaming Demonstration [/bold cyan]", style="bold blue"))
    logger.info("Demonstrating Grok streaming capabilities", emoji_key="start")
    
    # Create Gateway instance - this handles provider initialization
    gateway = Gateway("grok-demo", register_tools=False, provider_exclusions=[Provider.OPENROUTER.value])
    
    # Initialize providers
    logger.info("Initializing providers...", emoji_key="provider")
    await gateway._initialize_providers()
    
    provider_name = Provider.GROK
    try:
        # Get the provider from the gateway
        provider = gateway.providers.get(provider_name)
        if not provider:
            logger.error(f"Provider {provider_name} not available or initialized", emoji_key="error")
            return
        
        # Use default Grok model
        model = provider.get_default_model()
        logger.info(f"Using model: {model}", emoji_key="model")
        
        # Create prompt for streaming
        prompt = "Write a short story about an AI that discovers emotions for the first time."
        
        # Consistent panel styling for prompt
        console.print(Panel(
            escape(prompt),
            title="[bold yellow]Streaming Prompt[/bold yellow]",
            border_style="yellow", # Yellow for prompts
            expand=False,
            padding=(1, 2) # Standard padding
        ))
        
        # Create streaming panel with consistent styling
        stream_panel = Panel(
            "",
            title=f"[bold green]Streaming Output from {model}[/bold green]",
            subtitle="[dim]Live output...[/dim]",
            border_style="green", # Green for results
            expand=True,
            height=15, # Slightly increased height for better visibility
            padding=(1, 2) # Standard padding
        )
        
        # Setup for streaming
        logger.info("Starting stream", emoji_key="processing")
        stream = provider.generate_completion_stream(
            prompt=prompt,
            model=model,
            temperature=0.7,
            max_tokens=500
        )
        
        full_text = ""
        chunk_count = 0
        # perf_counter is monotonic, so the elapsed time cannot go negative
        start_time = time.perf_counter()
        
        # Display streaming content with Rich Live display
        with Live(stream_panel, console=console, refresh_per_second=10, vertical_overflow="visible") as live:
            async for content, _metadata in stream:
                chunk_count += 1
                full_text += content or ""
                
                # Text() does not parse console markup, so the raw text is passed
                # as-is; escaping it here would display stray backslashes.
                stream_panel.renderable = Text(full_text)
                stream_panel.subtitle = f"[dim]Received {chunk_count} chunks...[/dim]"
                live.update(stream_panel)

            # Stop the clock before any final rendering so it measures the stream only
            processing_time = time.perf_counter() - start_time

            # Show completion inside the live display; it leaves its last frame on
            # screen when it exits, so printing the panel again would duplicate it.
            stream_panel.subtitle = f"[bold green]Stream Complete ({chunk_count} chunks)[/bold green]"
            live.update(stream_panel, refresh=True)

        # More accurate token estimation might involve encoding, but keep simple for demo
        estimated_tokens = len(full_text.split()) * 1.3 # Rough estimate
        tokens_per_second = estimated_tokens / processing_time if processing_time > 0 else 0
        
        # Display final stats in a Panel with a Table
        stats_table = Table(title="[bold blue]Streaming Stats[/bold blue]", box=box.MINIMAL, padding=(0,1), show_header=False)
        stats_table.add_column("Metric", style="dim cyan")
        stats_table.add_column("Value", style="white")
        stats_table.add_row("Total Time", f"[yellow]{processing_time:.2f}s[/yellow]")
        stats_table.add_row("Chunks Received", f"[green]{chunk_count}[/green]")
        stats_table.add_row("Est. Output Tokens", f"[cyan]~{int(estimated_tokens)}[/cyan]")
        stats_table.add_row("Est. Speed", f"[blue]{tokens_per_second:.1f} tok/s[/blue]")
        
        console.print(Panel(stats_table, border_style="blue", padding=(1, 2))) # Blue for info/stats
        logger.success("Streaming completed", emoji_key="success")
        
    except Exception as e:
        logger.error(f"Error in streaming demonstration: {str(e)}", emoji_key="error", exc_info=True)


async def main():
    """Drive the full Grok showcase and report an exit status.

    Prints the title banner, runs each demonstration in order with a blank
    line between consecutive demos, then shows the accumulated cost summary.

    Returns:
        0 when every step completed, 1 if an exception escaped a step.
    """
    tracker = CostTracker()
    try:
        # Centered banner with vertical padding only
        banner = Text("⚡ Grok Integration Showcase ⚡", style="bold white on blue")
        banner.justify = "center"
        console.print(Panel(banner, box=box.DOUBLE_EDGE, padding=(1, 0)))

        debug_console.print("[dim]Starting Grok integration demo in debug mode[/dim]")

        # Demonstrations, in presentation order
        demos = (
            compare_grok_models,
            demonstrate_reasoning,
            demonstrate_function_calling,
            streaming_example,
        )
        for position, demo in enumerate(demos):
            if position > 0:
                console.print()  # Spacer between sections, not before the first
            await demo(tracker)

        # Totals across all demos
        tracker.display_summary(console)

    except Exception as e:
        logger.critical(f"Example failed: {str(e)}", emoji_key="critical", exc_info=True)
        debug_console.print_exception(show_locals=True)
        return 1
    else:
        logger.success("Grok Integration Demo Finished Successfully!", emoji_key="complete")
        return 0


if __name__ == "__main__":
    # Run the async entry point and use its return value as the process exit status
    sys.exit(asyncio.run(main()))
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/cli/commands.py:
--------------------------------------------------------------------------------

```python
"""Command implementations for the Ultimate MCP Server CLI."""
import asyncio
import inspect
import os
import sys
import time
from typing import List, Optional

from rich.console import Console
from rich.progress import (
    BarColumn,
    Progress,
    SpinnerColumn,
    TaskProgressColumn,
    TextColumn,
    TimeElapsedColumn,
)
from rich.rule import Rule
from rich.table import Table

from ultimate_mcp_server.config import get_config
from ultimate_mcp_server.constants import BASE_TOOLSET_CATEGORIES, Provider
from ultimate_mcp_server.core.providers.base import get_provider
from ultimate_mcp_server.core.server import Gateway, start_server
from ultimate_mcp_server.graceful_shutdown import enable_quiet_shutdown
from ultimate_mcp_server.services.cache import get_cache_service
from ultimate_mcp_server.utils import get_logger

# Module-level logger named after this module
logger = get_logger(__name__)
# Shared Rich console for all CLI output in this module
console = Console(file=sys.stderr)  # Use stderr to avoid interfering with MCP protocol


def run_server(
    host: Optional[str] = None,
    port: Optional[int] = None,
    workers: Optional[int] = None,
    log_level: Optional[str] = None,
    transport_mode: str = "sse",
    include_tools: Optional[List[str]] = None,
    exclude_tools: Optional[List[str]] = None,
    load_all_tools: bool = False,
) -> None:
    """Apply CLI overrides to the loaded config, print a startup banner, and launch the server.

    Truthy arguments replace the corresponding values on the config object
    returned by ``get_config()``; falsy ones leave the configured value alone.
    The resolved settings are echoed to the console, the host and port are
    exported as ``MCP_SERVER_HOST`` / ``MCP_SERVER_PORT``, and control is then
    handed to ``start_server``.

    Args:
        host: Interface to bind to; overrides the configured host when given.
        port: TCP port to listen on; overrides the configured port when given.
        workers: Worker count; overrides the configured value when given.
        log_level: Log level name; when omitted, the config's ``log_level``
            attribute is used, falling back to ``'info'``.
        transport_mode: Transport mode string forwarded to ``start_server``.
        include_tools: Tool names to include; enables tool filtering when given.
        exclude_tools: Tool names to exclude; enables tool filtering when given.
        load_all_tools: When False (default) the banner lists the Base Toolset
            categories; the flag itself is forwarded to ``start_server``.
    """
    # Set up graceful shutdown handling before anything else runs
    enable_quiet_shutdown()

    settings = get_config()
    server_cfg = settings.server
    registration = settings.tool_registration

    # Copy each supplied (truthy) override onto the server section
    for field_name, override in (("host", host), ("port", port), ("workers", workers)):
        if override:
            setattr(server_cfg, field_name, override)

    # Supplying either tool list switches filtering on
    tool_filters = (("included_tools", include_tools), ("excluded_tools", exclude_tools))
    if any(names for _, names in tool_filters):
        registration.filter_enabled = True
    for field_name, names in tool_filters:
        if names:
            setattr(registration, field_name, names)

    # Explicit argument wins; otherwise fall back to the config, then 'info'
    level_name = log_level if log_level else getattr(settings, 'log_level', 'info')

    # Startup banner
    console.print("[bold blue]Starting Ultimate MCP Server server[/bold blue]")
    console.print(f"Host: [cyan]{server_cfg.host}[/cyan]")
    console.print(f"Port: [cyan]{server_cfg.port}[/cyan]")
    console.print(f"Workers: [cyan]{server_cfg.workers}[/cyan]")
    console.print(f"Log level: [cyan]{level_name.upper()}[/cyan]")
    console.print(f"Transport mode: [cyan]{transport_mode}[/cyan]")

    # Tool loading status
    if not load_all_tools:
        console.print("Tool Loading: [yellow]Base Toolset Only[/yellow] (Use --load-all-tools to load all)")
        console.print("  [bold]Includes:[/bold]")
        for category_name, tool_names in BASE_TOOLSET_CATEGORIES.items():
            joined_names = ', '.join(tool_names)
            console.print(f"    [cyan]{category_name}[/cyan]: {joined_names}")
    else:
        console.print("Tool Loading: [yellow]All Available Tools[/yellow]")

    # Tool filtering details, only when filtering is active
    if registration.filter_enabled:
        if registration.included_tools:
            console.print(f"Including tools: [cyan]{', '.join(registration.included_tools)}[/cyan]")
        if registration.excluded_tools:
            console.print(f"Excluding tools: [red]{', '.join(registration.excluded_tools)}[/red]")

    console.print()

    # Set environment variables for the tool context estimator to use
    os.environ["MCP_SERVER_HOST"] = server_cfg.host
    os.environ["MCP_SERVER_PORT"] = str(server_cfg.port)

    # Filter lists are forwarded only while filtering is enabled
    if registration.filter_enabled:
        active_includes = registration.included_tools
        active_excludes = registration.excluded_tools
    else:
        active_includes = None
        active_excludes = None

    # Just run the server directly - no threading, no token analysis
    start_server(
        host=server_cfg.host,
        port=server_cfg.port,
        workers=server_cfg.workers,
        log_level=level_name,
        transport_mode=transport_mode,
        include_tools=active_includes,
        exclude_tools=active_excludes,
        load_all_tools=load_all_tools,
    )


async def list_providers(check_keys: bool = False, list_models: bool = False) -> None:
    """Display the configured LLM providers and, optionally, key validity and models.

    A summary table (provider, enabled flag, API key presence, default model)
    is always printed for every provider that reports a status. All output,
    including the spinners, goes through the module's stderr console.

    Args:
        check_keys: When True, call ``check_api_key()`` on each provider whose
            status reports a configured API key and print the outcome.
        list_models: When True, call ``list_models()`` on each available
            provider instance held by the gateway and print the results.
    """
    # Local import keeps this function self-contained; rich is already a dependency.
    from rich.markup import escape

    # Upper bound on how long to wait for the gateway to report provider status,
    # so a gateway that never populates it cannot hang the command forever.
    init_timeout_s = 60.0

    # Get the current config
    cfg = get_config()
    providers_cfg = getattr(cfg, 'providers', {})
    provider_names = [p.value for p in Provider]
    
    # Create provider table
    table = Table(title="Available LLM Providers")
    table.add_column("Provider", style="cyan")
    table.add_column("Enabled", style="green")
    table.add_column("API Key", style="yellow")
    table.add_column("Default Model", style="blue")
    
    # Add spinner during provider initialization.
    # The spinner shares `console` so it renders on the same stream as the messages.
    with Progress(
        SpinnerColumn(),
        TextColumn("[bold blue]Initializing providers...[/bold blue]"),
        console=console,
        transient=True
    ) as progress:
        progress.add_task("init", total=None)
        
        # Create the Gateway and poll until it exposes a non-empty provider_status
        # NOTE(review): presumably Gateway() starts provider initialization itself - confirm
        gateway = Gateway()
        deadline = time.monotonic() + init_timeout_s
        while not getattr(gateway, 'provider_status', None):
            if time.monotonic() >= deadline:
                break
            await asyncio.sleep(0.1)
    
    # Get provider status (empty if the wait above timed out)
    provider_status = getattr(gateway, 'provider_status', None) or {}
    if not provider_status:
        console.print(
            f"[yellow]Provider status was not available after {init_timeout_s:.0f}s; "
            "results below may be incomplete.[/yellow]"
        )
    
    # Add rows to table
    for provider_name in provider_names:
        status = provider_status.get(provider_name)
        
        if status:
            enabled = "✅" if status.enabled else "❌"
            api_key = "✅" if status.api_key_configured else "❌"
            
            # Get default model
            provider_cfg = providers_cfg.get(provider_name, None)
            default_model = provider_cfg.default_model if provider_cfg else "N/A"
            
            table.add_row(provider_name, enabled, api_key, default_model)
    
    # Print table
    console.print(table)
    
    # Check API keys if requested
    if check_keys:
        console.print("\n[bold]API Key Check:[/bold]")
        
        with Progress(
            SpinnerColumn(),
            TextColumn("[bold blue]Checking API keys...[/bold blue]"),
            console=console,
            transient=True
        ) as progress:
            progress.add_task("check", total=None)
            
            for provider_name in provider_names:
                console.print(Rule(f"[bold cyan]{provider_name}[/bold cyan]", align="center"))
                status = provider_status.get(provider_name)
                
                if status and status.api_key_configured:
                    try:
                        # Get provider instance
                        provider = get_provider(provider_name)
                        
                        # Check API key
                        valid = await provider.check_api_key()
                        
                        status_text = "[green]valid[/green]" if valid else "[red]invalid[/red]"
                        console.print(f"API Key Status: {status_text}")
                        
                    except Exception as e:
                        # Escape the message: bracketed text in it must not be parsed as markup
                        console.print(f"[red]Error checking API key: {escape(str(e))}[/red]")
                elif status:
                    console.print("API Key Status: [yellow]not configured[/yellow]")
                else:
                    console.print("API Key Status: [dim]Provider not configured/enabled[/dim]")
                console.print() # Add spacing after each provider's check
    
    # List models if requested
    if list_models:
        console.print("\n[bold]Available Models:[/bold]")
        
        with Progress(
            SpinnerColumn(),
            TextColumn("[bold blue]Loading models...[/bold blue]"),
            console=console,
            transient=True
        ) as progress:
            progress.add_task("load", total=None)
            
            for provider_name in provider_names:
                status = provider_status.get(provider_name)
                
                if status and status.available:
                    provider_instance = gateway.providers.get(provider_name)
                    
                    if provider_instance:
                        console.print(Rule(f"[bold cyan]{provider_name} Models[/bold cyan]", align="center"))
                        try:
                            # Get models
                            models = await provider_instance.list_models()
                            
                            # Create model table
                            model_table = Table(box=None, show_header=True, header_style="bold blue")
                            model_table.add_column("Model ID", style="cyan")
                            model_table.add_column("Description", style="green")
                            
                            if models:
                                for model in models:
                                    model_table.add_row(
                                        model["id"],
                                        model.get("description", "[dim]N/A[/dim]")
                                    )
                                console.print(model_table)
                            else:
                                console.print("[yellow]No models found or returned by provider.[/yellow]")
                            
                        except Exception as e:
                            console.print(f"[red]Error listing models for {provider_name}: {escape(str(e))}[/red]")
                elif status:
                    console.print(Rule(f"[bold dim]{provider_name}[/bold dim]", align="center"))
                    console.print("[yellow]Provider not available or configured.[/yellow]")
                console.print() # Add spacing after each provider's models


async def test_provider(provider: str, model: Optional[str] = None, prompt: str = "Hello, world!") -> None:
    """Send one prompt to a provider and print the completion plus basic metrics.

    The provider instance is obtained via ``get_provider``, initialized, and
    asked for a single completion at temperature 0.7. The generated text,
    model name, token counts, cost and elapsed time are printed to the
    module's stderr console. Any exception is caught and reported as an error
    line instead of propagating.

    Args:
        provider: Provider identifier passed to ``get_provider``.
        model: Model to request; ``None`` is passed through to the provider
            and shown as "default" in the header.
        prompt: Text prompt to send to the model.
    """
    # Local import keeps this function self-contained; rich is already a dependency.
    from rich.markup import escape

    console.print(f"[bold]Testing provider:[/bold] [cyan]{provider}[/cyan]")
    console.print(f"[bold]Model:[/bold] [cyan]{model or 'default'}[/cyan]")
    # The prompt is user input: escape it so brackets are shown, not parsed as markup
    console.print(f'[bold]Prompt:[/bold] [green]"{escape(prompt)}"[/green]')
    console.print()
    
    try:
        # Get provider instance
        provider_instance = get_provider(provider)
        
        # Initialize provider
        await provider_instance.initialize()
        
        # Show spinner during generation (on the same stderr console as the output)
        with Progress(
            SpinnerColumn(),
            TextColumn("[bold blue]Generating completion...[/bold blue]"),
            console=console,
            transient=True
        ) as progress:
            progress.add_task("generate", total=None)
            
            # Generate completion; perf_counter is monotonic, unlike wall-clock time
            start_time = time.perf_counter()
            result = await provider_instance.generate_completion(
                prompt=prompt,
                model=model,
                temperature=0.7
            )
            elapsed_time = time.perf_counter() - start_time
        
        # Print result verbatim: model output may contain bracketed text that would
        # otherwise be interpreted (or rejected) as console markup
        console.print("[bold cyan]Generated text:[/bold cyan]")
        console.print(result.text, markup=False)
        console.print()
        
        # Print metrics
        console.print(f"[bold]Model used:[/bold] [cyan]{result.model}[/cyan]")
        console.print(f"[bold]Tokens:[/bold] [yellow]{result.input_tokens}[/yellow] input, [yellow]{result.output_tokens}[/yellow] output, [yellow]{result.total_tokens}[/yellow] total")
        console.print(f"[bold]Cost:[/bold] [green]${result.cost:.6f}[/green]")
        console.print(f"[bold]Time:[/bold] [blue]{elapsed_time:.2f}s[/blue]")
        
    except Exception as e:
        console.print(f"[bold red]Error testing provider:[/bold red] {escape(str(e))}")


async def generate_completion(
    provider: str,
    model: Optional[str] = None,
    prompt: str = "",
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
    system: Optional[str] = None,
    stream: bool = False
) -> None:
    """Generate text from an LLM provider directly through the CLI.

    Resolves the provider, initializes it, and either waits for the full
    completion (showing a spinner) or prints chunks as they arrive.

    When a system message is given, it is passed as a ``system`` keyword
    argument for the Anthropic provider; for every other provider it is
    prepended to the prompt as ``"System: ...\\n\\nUser: ..."``.

    Output:
    - Non-streaming: generated text, model, input/output token counts,
      cost, and elapsed time.
    - Streaming: chunks as they arrive, then the model name and elapsed
      time only (token and cost figures are not printed in this mode).

    Any exception is caught and reported on the console; nothing is raised
    to the caller.

    Args:
        provider: LLM provider identifier (e.g., 'openai', 'anthropic', 'gemini')
        model: Specific model ID to use (if None, the provider decides)
        prompt: The text prompt to send to the model
        temperature: Sampling temperature forwarded to the provider
        max_tokens: Maximum number of tokens to generate (forwarded as-is)
        system: Optional system message (see above for how it is applied)
        stream: When True, prints generated chunks as they arrive rather than
                waiting for the complete response
    """
    try:
        # Get and initialize the provider instance
        provider_instance = get_provider(provider)
        await provider_instance.initialize()

        # Set extra parameters based on provider
        kwargs = {}
        if system:
            if provider == Provider.ANTHROPIC.value:
                kwargs["system"] = system
            else:
                # For other providers, prepend system message to prompt
                prompt = f"System: {system}\n\nUser: {prompt}"

        if not stream:
            # Show a spinner while waiting for the full response
            with Progress(
                SpinnerColumn(),
                TextColumn("[bold blue]Generating completion...[/bold blue]"),
                transient=True
            ) as progress:
                progress.add_task("generate", total=None)

                start_time = time.time()
                result = await provider_instance.generate_completion(
                    prompt=prompt,
                    model=model,
                    temperature=temperature,
                    max_tokens=max_tokens,
                    **kwargs
                )
                elapsed_time = time.time() - start_time

            # Model output is arbitrary text: print it with markup disabled so
            # bracketed content (e.g. "[i]" or "[/cyan]") is shown literally
            # instead of being parsed as console markup.
            console.print(result.text, style="cyan", markup=False)
            console.print()

            # Print metrics
            console.print(f"[bold]Model:[/bold] [blue]{result.model}[/blue]")
            console.print(f"[bold]Tokens:[/bold] [yellow]{result.input_tokens}[/yellow] input, [yellow]{result.output_tokens}[/yellow] output")
            console.print(f"[bold]Cost:[/bold] [green]${result.cost:.6f}[/green]")
            console.print(f"[bold]Time:[/bold] [blue]{elapsed_time:.2f}s[/blue]")

        else:
            console.print("[bold blue]Generating completion (streaming)...[/bold blue]")

            start_time = time.time()
            # Bound to a separate name so the boolean `stream` flag is not
            # shadowed by the async generator.
            chunk_stream = provider_instance.generate_completion_stream(
                prompt=prompt,
                model=model,
                temperature=temperature,
                max_tokens=max_tokens,
                **kwargs
            )

            # Remember the most recent non-empty metadata; initialised up front
            # so a stream that yields nothing does not leave it unbound.
            last_metadata = {}
            async for chunk, metadata in chunk_stream:
                console.print(chunk, end="", markup=False)
                # NOTE(review): flushes stderr, presumably because the shared
                # console writes there rather than to stdout - confirm.
                sys.stderr.flush()
                if metadata:
                    last_metadata = metadata

            elapsed_time = time.time() - start_time

            # Print metrics
            console.print("\n")
            console.print(f"[bold]Model:[/bold] [blue]{last_metadata.get('model', model or 'unknown')}[/blue]")
            console.print(f"[bold]Time:[/bold] [blue]{elapsed_time:.2f}s[/blue]")

    except Exception as e:
        console.print(f"[bold red]Error generating completion:[/bold red] {str(e)}")


async def check_cache(show_status: bool = True, clear: bool = False) -> None:
    """Optionally clear the LLM response cache, then optionally print its status.

    The cache service is obtained via ``get_cache_service()``. Clearing
    happens first, so a combined clear + status call reports the state
    after the purge. Clearing is performed immediately; this function does
    not prompt for confirmation.

    When status output is requested, two tables are printed from the
    dictionary returned by ``cache_service.get_stats()``:
    - "Cache Status": enabled flag, size / max size, TTL, persistence flag,
      cache directory, fuzzy-matching flag
    - "Cache Statistics": hits, misses, hit ratio, stores, evictions,
      total saved tokens, estimated cost savings

    Args:
        show_status: When True, prints the two tables described above
        clear: When True, calls ``cache_service.clear()`` before reporting
    """
    cache_service = get_cache_service()

    if clear:
        console.print("[bold]Clearing cache...[/bold]")
        cache_service.clear()
        console.print("[green]Cache cleared successfully[/green]")

    if not show_status:
        return

    def flag(value) -> str:
        # Render any truthy/falsy setting as a check mark or a cross
        return "✅" if value else "❌"

    stats = cache_service.get_stats()

    # Configuration overview
    config_table = Table(title="Cache Status")
    config_table.add_column("Setting", style="cyan")
    config_table.add_column("Value", style="green")
    config_rows = [
        ("Enabled", flag(stats["enabled"])),
        ("Size", f"{stats['size']} / {stats['max_size']} entries"),
        ("TTL", f"{stats['ttl']} seconds"),
        ("Persistence", flag(stats["persistence"]["enabled"])),
        ("Cache Directory", stats["persistence"]["cache_dir"]),
        ("Fuzzy Matching", flag(stats["fuzzy_matching"])),
    ]
    for setting, value in config_rows:
        config_table.add_row(setting, value)
    console.print(config_table)

    # Runtime counters; read only after the first table has been printed
    counters = stats["stats"]
    metrics_table = Table(title="Cache Statistics")
    metrics_table.add_column("Metric", style="cyan")
    metrics_table.add_column("Value", style="green")
    metric_rows = [
        ("Hits", str(counters["hits"])),
        ("Misses", str(counters["misses"])),
        ("Hit Ratio", f"{counters['hit_ratio']:.2%}"),
        ("Stores", str(counters["stores"])),
        ("Evictions", str(counters["evictions"])),
        ("Total Saved Tokens", f"{counters['total_saved_tokens']:,}"),
        ("Estimated Cost Savings", f"${counters['estimated_cost_savings']:.6f}"),
    ]
    for metric, value in metric_rows:
        metrics_table.add_row(metric, value)
    console.print(metrics_table)


async def _resolve_benchmark_targets(providers: List[str], models: Optional[List[str]]) -> list:
    """Initialize each provider once and return ``(name, instance, model_ids)`` tuples to benchmark."""
    targets = []
    for provider_name in providers:
        try:
            provider_instance = get_provider(provider_name)
            await provider_instance.initialize()

            available_models = await provider_instance.list_models()
            if models:
                # Keep only the explicitly requested models this provider offers
                model_ids = [m["id"] for m in available_models if m["id"] in models]
            else:
                # No explicit selection: benchmark the provider's default model
                default_model = provider_instance.get_default_model()
                model_ids = [m["id"] for m in available_models if m["id"] == default_model]
        except Exception as e:
            # A provider that cannot be set up is reported and skipped
            console.print(f"[red]Error benchmarking provider {provider_name}: {str(e)}[/red]")
            continue
        targets.append((provider_name, provider_instance, model_ids))
    return targets


async def benchmark_providers(
    providers: Optional[List[str]] = None,
    models: Optional[List[str]] = None,
    prompt: Optional[str] = None,
    runs: int = 3
) -> None:
    """Compare performance metrics across different LLM providers and models.

    Sends the same prompt to every selected provider/model combination
    ``runs`` times and prints one table row per model with:
    - Avg. Time (s): mean wall-clock time of the successful runs
    - Tokens/Sec: total tokens divided by total time of the successful runs
    - Avg. Cost ($): mean reported cost of the successful runs
    - Input / Output Tokens: integer mean over the successful runs

    Failed runs are reported on the console and excluded from the averages,
    so they no longer count as zero-cost, zero-time runs. A model whose
    runs all failed is listed with "n/a" values.

    Each provider is initialized and queried for its model list once;
    providers that fail during setup are reported and skipped.

    Args:
        providers: List of provider identifiers to benchmark (e.g., ['openai', 'anthropic'])
                  If None or empty, every member of the ``Provider`` enum is tried
        models: List of specific model IDs to test (e.g., ['gpt-4o', 'claude-3-5-haiku'])
               If None or empty, uses each provider's default model
        prompt: Text prompt to use for testing (identical across all providers)
                If None or empty, uses a default explanation prompt
        runs: Number of test iterations to run for each provider/model combination
    """
    # Use default providers if not specified
    if not providers:
        providers = [p.value for p in Provider]

    # Set default prompt if not provided
    if not prompt:
        prompt = "Explain the concept of quantum computing in simple terms that a high school student would understand."

    console.print(f"[bold]Running benchmark with {runs} runs per provider/model[/bold]")
    console.print(f'[bold]Prompt:[/bold] [green]"{prompt}"[/green]')
    console.print()

    # Create results table
    table = Table(title="Benchmark Results")
    table.add_column("Provider", style="cyan")
    table.add_column("Model", style="blue")
    table.add_column("Avg. Time (s)", style="green")
    table.add_column("Tokens/Sec", style="yellow")
    table.add_column("Avg. Cost ($)", style="magenta")
    table.add_column("Input Tokens", style="dim")
    table.add_column("Output Tokens", style="dim")

    # Resolve providers and models once; the result sizes the progress bar
    # and drives the benchmark loop.
    targets = await _resolve_benchmark_targets(providers, models)
    total_benchmarks = sum(len(model_ids) for _, _, model_ids in targets)

    # Run benchmarks with progress bar
    with Progress(
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        TimeElapsedColumn()
    ) as progress:
        benchmark_task = progress.add_task("[bold blue]Running benchmarks...", total=total_benchmarks * runs)

        for provider_name, provider_instance, model_ids in targets:
            for model_id in model_ids:
                total_time = 0.0
                total_cost = 0.0
                total_input_tokens = 0
                total_output_tokens = 0
                total_tokens = 0
                successful_runs = 0

                for run in range(runs):
                    try:
                        progress.update(
                            benchmark_task,
                            description=f"[bold blue]Benchmarking {provider_name}/{model_id} (Run {run+1}/{runs})"
                        )

                        start_time = time.time()
                        result = await provider_instance.generate_completion(
                            prompt=prompt,
                            model=model_id,
                            temperature=0.7
                        )
                        run_time = time.time() - start_time

                        # Compute every new total before committing any of
                        # them, so a malformed result cannot leave the
                        # accumulators half-updated.
                        new_cost = total_cost + result.cost
                        new_input_tokens = total_input_tokens + result.input_tokens
                        new_output_tokens = total_output_tokens + result.output_tokens
                        new_tokens = total_tokens + result.total_tokens

                        total_time += run_time
                        total_cost = new_cost
                        total_input_tokens = new_input_tokens
                        total_output_tokens = new_output_tokens
                        total_tokens = new_tokens
                        successful_runs += 1

                    except Exception as e:
                        console.print(f"[red]Error in run {run+1} for {provider_name}/{model_id}: {str(e)}[/red]")
                    finally:
                        # Advance whether the run succeeded or failed
                        progress.advance(benchmark_task)

                if successful_runs == 0:
                    # Nothing measurable: avoid presenting zeros as real results
                    table.add_row(provider_name, model_id, "n/a", "n/a", "n/a", "n/a", "n/a")
                    continue

                # Average over the runs that actually produced a result
                avg_time = total_time / successful_runs
                avg_cost = total_cost / successful_runs
                avg_input_tokens = total_input_tokens // successful_runs
                avg_output_tokens = total_output_tokens // successful_runs

                # Calculate tokens per second
                tokens_per_second = total_tokens / total_time if total_time > 0 else 0

                table.add_row(
                    provider_name,
                    model_id,
                    f"{avg_time:.2f}",
                    f"{tokens_per_second:.1f}",
                    f"{avg_cost:.6f}",
                    str(avg_input_tokens),
                    str(avg_output_tokens)
                )

    # Print results
    console.print(table)


async def list_tools(category: Optional[str] = None) -> None:
    """Display the standalone MCP tools registered in the server.

    Reads ``STANDALONE_TOOL_FUNCTIONS`` from ``ultimate_mcp_server.tools``
    and prints a table with each callable's name, its category, and the
    first line of its docstring. Categories come from the static mapping
    defined in this function; tools missing from that mapping are listed
    under "other". Rows are sorted by category, then by tool name.

    The ``register_api_meta_tools`` registrar is added to the "meta"
    category when it is not already among the standalone tools.

    After the table, usage hints for tool filtering are printed.

    Args:
        category: When specified, only shows tools belonging to the given
                 category (case-insensitive, e.g., 'filesystem', 'document')
    """
    # Import tools module to get the list of available tools
    from ultimate_mcp_server.tools import STANDALONE_TOOL_FUNCTIONS

    # Create tools table
    table = Table(title="Available Ultimate MCP Server Tools")
    table.add_column("Tool Name", style="cyan")
    table.add_column("Category", style="green")
    table.add_column("Description", style="yellow")

    # Define tool categories
    categories = {
        "completion": ["generate_completion", "stream_completion", "chat_completion", "multi_completion"],
        "provider": ["get_provider_status", "list_models"],
        "tournament": ["create_tournament", "get_tournament_status", "list_tournaments", "get_tournament_results", "cancel_tournament"],
        "document": ["chunk_document", "summarize_document", "extract_entities", "generate_qa_pairs", "process_document_batch"],
        "extraction": ["extract_json", "extract_table", "extract_key_value_pairs", "extract_semantic_schema", "extract_entity_graph", "extract_code_from_response"],
        "filesystem": ["read_file", "read_multiple_files", "write_file", "edit_file", "create_directory", "list_directory", "directory_tree", "move_file", "search_files", "get_file_info", "list_allowed_directories"],
        "rag": ["create_knowledge_base", "list_knowledge_bases", "delete_knowledge_base", "add_documents", "retrieve_context", "generate_with_rag"],
        "meta": ["get_tool_info", "get_llm_instructions", "get_tool_recommendations", "register_api_meta_tools"],
        "search": ["marqo_fused_search"],
        "ocr": ["extract_text_from_pdf", "process_image_ocr", "enhance_ocr_text", "analyze_pdf_structure", "batch_process_documents"],
        "optimization": ["estimate_cost", "compare_models", "recommend_model", "execute_optimized_workflow"],
        "database": ["connect_to_database", "disconnect_from_database", "discover_database_schema", "execute_query", "generate_database_documentation", "get_table_details", "find_related_tables", "analyze_column_statistics", "execute_parameterized_query", "create_database_view", "create_database_index", "test_connection", "execute_transaction", "execute_query_with_pagination", "get_database_status"],
        "audio": ["transcribe_audio", "extract_audio_transcript_key_points", "chat_with_transcript"],
        "browser": ["browser_init", "browser_navigate", "browser_click", "browser_type", "browser_screenshot", "browser_close", "browser_select", "browser_checkbox", "browser_get_text", "browser_get_attributes", "browser_execute_javascript", "browser_wait", "execute_web_workflow", "extract_structured_data_from_pages", "find_and_download_pdfs", "multi_engine_search_summary"],
        "classification": ["text_classification"],
    }

    # Invert the mapping: tool name -> category
    tool_categories = {
        tool: cat_name
        for cat_name, tools in categories.items()
        for tool in tools
    }

    wanted_category = category.lower() if category else None

    # Collect (category, name, description) rows first so they can be sorted
    rows = []
    for tool_func in STANDALONE_TOOL_FUNCTIONS:
        if not callable(tool_func):
            continue

        tool_name = getattr(tool_func, "__name__", str(tool_func))
        tool_category = tool_categories.get(tool_name, "other")

        # Skip if category filter is provided and doesn't match
        if wanted_category and wanted_category != tool_category.lower():
            continue

        # Get docstring (first line only for description)
        docstring = inspect.getdoc(tool_func) or ""
        description = docstring.split("\n")[0] if docstring else ""

        rows.append((tool_category, tool_name, description))

    # Add the special meta tool registrar unless it is already listed
    if not wanted_category or wanted_category == "meta":
        if all(name != "register_api_meta_tools" for _, name, _ in rows):
            rows.append(("meta", "register_api_meta_tools", "Register Meta API tools"))

    # Sort table by category and tool name
    for tool_category, tool_name, description in sorted(rows, key=lambda row: (row[0], row[1])):
        table.add_row(tool_name, tool_category, description)

    console.print(table)

    # Print usage hint
    console.print("\n[bold]Usage with tool filtering:[/bold]")
    console.print("To include only specific tools:")
    console.print("  umcp run --include-tools tool1 tool2 tool3")
    console.print("\nTo exclude specific tools:")
    console.print("  umcp run --exclude-tools tool1 tool2 tool3")
    console.print("\nTo include tools by category:")
    console.print("  umcp tools --category filesystem  # List filesystem tools")
    console.print("  umcp run --include-tools read_file write_file edit_file  # Include only these filesystem tools")
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/extraction.py:
--------------------------------------------------------------------------------

```python
"""Advanced extraction tools for Ultimate MCP Server.

This module provides tools for extracting structured data (JSON, tables, key-value pairs, code)
from unstructured or semi-structured text using LLMs.
"""

import asyncio
import json
import re  # Added for code extraction
import time
from typing import Any, Dict, List, Optional, Tuple

import jsonschema

from ultimate_mcp_server.constants import Provider

# Removed CompletionRequest import as not directly used by standalone functions
from ultimate_mcp_server.exceptions import ProviderError, ToolInputError
from ultimate_mcp_server.tools.base import BaseTool, with_error_handling, with_tool_metrics

# Import the standardized completion tool
from ultimate_mcp_server.tools.completion import generate_completion
from ultimate_mcp_server.utils import get_logger

logger = get_logger("ultimate_mcp_server.tools.extraction")

def _decode_first_embedded_json(text: str) -> Any:
    """Decode the first complete JSON object or array that starts at any '{' or '[' in *text*."""
    decoder = json.JSONDecoder()
    for opener in re.finditer(r'[{\[]', text):
        try:
            # raw_decode parses one JSON document starting at the given index
            # and ignores whatever follows it.
            value, _end = decoder.raw_decode(text, opener.start())
        except json.JSONDecodeError:
            continue
        return value
    raise ValueError("No embedded JSON object or array found")


def _extract_and_parse_json(text: str) -> Tuple[Any, bool, Optional[str]]:
    """
    Robust utility to extract and parse JSON from text, handling various formats and edge cases.

    The strategies below are tried in order and the first one that does not
    raise wins:
      1. parse the whole (stripped) text as JSON
      2. regex-extract the first ``{...}`` / ``[...]`` span, optionally fenced
      3. remove markdown fences and parse the remainder
      4. scan for the first position where a complete JSON object/array
         decodes (handles nested structures surrounded by prose)
      5. apply heuristic syntax repairs, then parse
      6. evaluate as a Python literal

    Args:
        text: The text that may contain JSON.

    Returns:
        Tuple of (parsed_data, success_flag, error_message). On failure the
        data is None and the message carries the error of the last strategy.
    """
    # Start with a clean slate
    text = text.strip()
    error_message = None

    # Try a series of increasingly aggressive extraction techniques
    extraction_methods = [
        # Method 1: Direct parsing if it's already valid JSON
        lambda t: json.loads(t),

        # Method 2: Extract JSON using regex for common patterns.
        # The span is matched non-greedily, so it ends at the first closing
        # bracket; nested structures fall through to the later methods.
        lambda t: json.loads(re.search(r'(?s)(?:```(?:json)?\s*)?({[\s\S]*?}|\[[\s\S]*?\])(?:\s*```)?', t).group(1).strip()),

        # Method 3: Remove markdown fences and try again
        lambda t: json.loads(re.sub(r'```(?:json)?\s*|\s*```', '', t).strip()),

        # Method 4: Decoder-driven scan for the first well-formed embedded
        # object/array. Runs before the repair heuristics because it never
        # rewrites the text.
        lambda t: _decode_first_embedded_json(t),

        # Method 5: Fix common JSON syntax errors and try again
        lambda t: json.loads(_fix_common_json_errors(t)),

        # Method 6: Use ast.literal_eval as a fallback for Python literals
        lambda t: _safe_literal_eval(t)
    ]

    # Try each method in sequence until one works
    last_index = len(extraction_methods) - 1
    for i, method in enumerate(extraction_methods):
        try:
            result = method(text)
            logger.debug(f"Successfully parsed JSON using method {i+1}")
            return result, True, None
        except Exception as e:
            # Any failure (no regex match, decode error, ...) just means
            # "try the next strategy"; only the final error is reported.
            if i == last_index:
                error_message = f"All JSON parsing methods failed. Last error: {str(e)}"

    return None, False, error_message

def _fix_common_json_errors(json_str: str) -> str:
    """
    Fix common JSON syntax errors found in LLM outputs.

    Applied in order:
      1. drop everything before the first '{' or '['
      2. drop everything after the last '}' or ']'
      3. wrap bare (unquoted) keys in double quotes
      4. remove trailing commas before ']' or '}'
      5. insert commas missing between adjacent elements

    Steps 3-5 skip over complete double-quoted string literals, so string
    values such as ``""`` or ``"http://example.com"`` are left intact
    instead of being rewritten by the structural fixes.

    Args:
        json_str: The JSON string to fix

    Returns:
        Corrected JSON string (not guaranteed to be valid JSON)
    """
    # A complete double-quoted string literal, honouring backslash escapes
    string_literal = r'"(?:\\.|[^"\\])*"'

    # Remove any text before the first '{' or '['
    json_str = re.sub(r'^.*?([{\[])', r'\1', json_str, flags=re.DOTALL)

    # Remove any text after the last '}' or ']'
    json_str = re.sub(r'([}\]])[^}\]]*$', r'\1', json_str, flags=re.DOTALL)

    def quote_bare_key(match):
        # Group 1 is a string literal: pass it through untouched
        if match.group(1) is not None:
            return match.group(1)
        return f'"{match.group(2)}"{match.group(3)}:'

    # Fix missing quotes around keys
    json_str = re.sub(
        '(' + string_literal + r')|(\w+)(\s*):',
        quote_bare_key,
        json_str,
        flags=re.DOTALL,
    )

    def drop_trailing_comma(match):
        if match.group(1) is not None:
            return match.group(1)
        return match.group(2)

    # Fix trailing commas in arrays and objects
    json_str = re.sub(
        '(' + string_literal + r')|,(\s*[\]}])',
        drop_trailing_comma,
        json_str,
        flags=re.DOTALL,
    )

    def add_missing_comma(match):
        # A closing token (string, '}' or ']') followed, after optional
        # whitespace, by an opening token ('"', '{' or '[') lacks a comma.
        following = match.string[match.end():match.end() + 1]
        if following and following in '"{[':
            return f"{match.group(1)},{match.group(2)}"
        return match.group(0)

    # Fix missing commas between elements
    json_str = re.sub(
        '(' + string_literal + r'|[}\]])(\s*)',
        add_missing_comma,
        json_str,
        flags=re.DOTALL,
    )

    return json_str

def _safe_literal_eval(text: str) -> Any:
    """
    Evaluate the Python literal embedded in *text* without executing code.

    The text is first trimmed to the span running from the first '{' or '['
    to the last '}' or ']' (either trim is skipped when no such character
    exists), then handed to ``ast.literal_eval``.

    Args:
        text: The text containing a Python literal

    Returns:
        The evaluated Python object

    Raises:
        SyntaxError: If the trimmed text is not valid Python syntax
        ValueError: If the trimmed text is not a literal expression
    """
    import ast

    # Discard any leading chatter before the opening bracket/brace
    opener = re.search(r'[{\[]', text)
    if opener is not None:
        text = text[opener.start():]

    # Discard any trailing chatter after the final closing bracket/brace
    last_closer = max(text.rfind('}'), text.rfind(']'))
    if last_closer != -1:
        text = text[:last_closer + 1]

    return ast.literal_eval(text)

def _validate_extracted_json(data: Any, schema: Dict) -> Dict[str, Any]:
    """Validate *data* against *schema* and return ``{"valid": bool, "errors": [...]}``."""
    try:
        jsonschema.validate(instance=data, schema=schema)
    except jsonschema.exceptions.ValidationError as e:
        logger.warning(f"JSON validation failed: {e}")
        return {"valid": False, "errors": [str(e)]}
    except jsonschema.exceptions.SchemaError as e:
        # The schema itself is malformed; report it rather than aborting
        logger.warning(f"Invalid JSON schema supplied: {e}")
        return {"valid": False, "errors": [f"Invalid JSON schema: {e}"]}
    logger.debug("JSON validated successfully against schema.")
    return {"valid": True, "errors": []}


@with_tool_metrics
@with_error_handling
async def extract_json(
    text: str,
    json_schema: Optional[Dict] = None,
    provider: str = Provider.OPENAI.value,
    model: Optional[str] = None,
    validate_output: bool = True
    # Removed ctx=None
) -> Dict[str, Any]:
    """Extracts structured data formatted as JSON from within a larger text body.

    Use this tool when the input text contains a JSON object or list (potentially embedded
    within other text or markdown code fences) that needs to be isolated and parsed.
    Optionally validates the extracted JSON against a provided schema.

    The input is first parsed locally; only when that fails is the LLM asked to
    isolate the JSON. A local hit is reported with model and provider set to
    "direct-extraction" and zero cost. Schema validation applies to both paths.

    Args:
        text: The input text potentially containing an embedded JSON object or list.
        json_schema: (Optional) A JSON schema (as a Python dictionary) to validate the extracted
                     JSON against. If validation fails, the error is included in the result.
        provider: The name of the LLM provider (e.g., "openai"). Defaults to "openai".
                  JSON response mode is requested when the provider is OpenAI.
        model: The specific model ID (e.g., "openai/gpt-4.1-mini"). Uses provider default if None.
        validate_output: (Optional) If True (default) and `json_schema` is provided, validates
                         the extracted data against the schema.

    Returns:
        A dictionary containing the extraction results:
        {
            "data": { ... } | [ ... ] | null, # The extracted JSON data (or null if extraction/parsing failed).
            "validation_result": {             # Included if json_schema provided & validate_output=True
                "valid": true | false,
                "errors": [ "Validation error message..." ] # List of errors if not valid
            } | null,
            "raw_text": "...",                # Included if JSON parsing failed
            "model": "provider/model-used",
            "provider": "provider-name",
            "tokens": { ... },
            "cost": 0.000045,
            "processing_time": 1.8,
            "success": true | false,
            "error": "Error message if success is false"
        }

    Raises:
        ToolInputError: If `text` is empty or not a string.
        ProviderError: If the completion call fails, or any other error occurs
                       during extraction (the original error is attached as the cause).
    """
    start_time = time.time()

    if not text or not isinstance(text, str):
        raise ToolInputError("Input 'text' must be a non-empty string.", param_name="text", provided_value=text)

    # Validation is requested only when a schema is supplied and not disabled
    should_validate = bool(json_schema) and validate_output

    try:
        # Check if there's already valid JSON in the input text
        extracted_data, success, error_message = _extract_and_parse_json(text)

        # If we found valid JSON in the input, return it right away
        if success:
            logger.info("Found and extracted valid JSON directly from input text")
            return {
                "data": extracted_data,
                # Honour the schema on the fast path as well
                "validation_result": _validate_extracted_json(extracted_data, json_schema) if should_validate else None,
                "raw_text": None,
                "model": "direct-extraction", # No model used
                "provider": "direct-extraction", # No provider used
                # No tokenizer is involved here; the character count of the input is reported
                "tokens": {"input": len(text), "output": 0, "total": len(text)},
                "cost": 0.0,  # No cost for direct extraction
                "processing_time": time.time() - start_time,
                "success": True,
                "error": None
            }

        # Prepare model ID based on provider format
        effective_model = model
        # Ensure model ID includes provider prefix if not already present
        # (substring check: any model ID containing the provider name is left as-is)
        if model and provider not in model:
             effective_model = f"{provider}/{model}"

        schema_description = f"The extracted JSON should conform to this JSON schema:\n```json\n{json.dumps(json_schema, indent=2)}\n```\n" if json_schema else ""
        # Improved prompt asking the LLM to identify and extract the JSON
        prompt = f"Identify and extract the primary JSON object or list embedded within the following text. " \
                 f"{schema_description}Focus on extracting only the JSON data structure itself, removing any surrounding text or markdown fences. " \
                 f"Text:\n```\n{text}\n```\nExtracted JSON:"

        # Use JSON mode if supported by the provider (e.g., OpenAI)
        additional_params = {}
        if provider == Provider.OPENAI.value:
            additional_params["response_format"] = {"type": "json_object"}

        # Use the standardized completion tool instead of direct provider call
        completion_result = await generate_completion(
            prompt=prompt, 
            model=effective_model,
            provider=provider,
            temperature=0.0, # Low temp for precise extraction
            max_tokens=4000, # Allow for large JSON objects
            additional_params=additional_params
        )

        # Extract data from the standardized result format
        processing_time = completion_result.get("processing_time", time.time() - start_time)
        actual_model_used = completion_result.get("model", effective_model) 
        raw_text_output = completion_result.get("text", "").strip()
        token_info = completion_result.get("tokens", {})
        cost = completion_result.get("cost", 0.0)
        tool_success = completion_result.get("success", False)

        # If the tool call failed, propagate the error
        if not tool_success:
            error_message = completion_result.get("error", "Unknown error during completion")
            raise ProviderError(
                f"JSON extraction failed: {error_message}", 
                provider=provider, 
                model=actual_model_used
            )

        # Use our robust parsing function
        extracted_data, success, error_message = _extract_and_parse_json(raw_text_output)

        # Validate against schema if requested and extraction succeeded.
        # A failed validation does not flip `success`: extraction itself worked.
        validation_result = None
        if success and should_validate:
            validation_result = _validate_extracted_json(extracted_data, json_schema)

        logger.info(f"JSON extraction attempt complete. Success: {success}, Validated: {validation_result.get('valid') if validation_result else 'N/A'}. Time: {processing_time:.2f}s")
        return {
            "data": extracted_data,
            "validation_result": validation_result,
            "raw_text": raw_text_output if not success else None, # Include raw only on parse failure
            "model": actual_model_used,
            "provider": provider,
            "tokens": token_info,
            "cost": cost,
            "processing_time": processing_time,
            "success": success,
            "error": error_message
        }

    except Exception as e:
        error_model = model or f"{provider}/default"
        if isinstance(e, ProviderError):
            raise # Re-raise
        else:
            raise ProviderError(f"JSON extraction failed: {str(e)}", provider=provider, model=error_model, cause=e) from e

@with_tool_metrics
@with_error_handling
async def extract_table(
    text: str,
    headers: Optional[List[str]] = None,
    return_formats: Optional[List[str]] = None, # Renamed from 'formats'
    extract_metadata: bool = False,
    provider: str = Provider.OPENAI.value,
    model: Optional[str] = None
    # Removed ctx=None
) -> Dict[str, Any]:
    """Extracts tabular data found within text content.

    Builds a prompt asking the LLM to locate the primary table in ``text`` and to
    answer with a single JSON object keyed by the requested formats, sends it through
    ``generate_completion``, and parses the reply with ``_extract_and_parse_json``.

    Args:
        text: The input text potentially containing one or more tables.
        headers: (Optional) A list of expected header strings. They are inserted into the
                 prompt as a hint for locating the table and mapping its columns.
        return_formats: (Optional) List of desired output formats. Supported: "json", "markdown".
                        Defaults to ["json"]. The result dictionary will contain keys matching these formats.
        extract_metadata: (Optional) If True, the prompt also asks for contextual metadata about the
                          table, such as a title, caption, or source notes. Default False.
        provider: The name of the LLM provider (e.g., "openai"). Defaults to "openai".
        model: The specific model ID (e.g., "openai/gpt-4.1-mini"). Uses provider default if None.

    Returns:
        A dictionary containing the extracted table data and metadata:
        {
            "data": {                           # Dictionary containing requested formats
                "json": [ { "Header1": "Row1Val1", "Header2": "Row1Val2" }, ... ],
                "markdown": "| Header1 | Header2 |\n|---|---|\n| Row1Val1 | Row1Val2 |\n...",
                "metadata": { "title": "Table Title...", "notes": "..." } # If extract_metadata=True
            } | null, # Null if the reply is valid JSON but not an object
            "raw_text": "..." | null,           # Raw LLM output, only when success is false
            "model": "provider/model-used",
            "provider": "provider-name",
            "tokens": { ... },
            "cost": 0.000180,
            "processing_time": 3.5,
            "success": true | false,
            "error": "Parse error, or a warning when no requested format key is present"
        }

    Raises:
        ToolInputError: If ``text`` is empty or not a string.
        ProviderError: If the completion call reports failure. Any other exception raised
            while building the prompt or processing the reply is also wrapped in a
            ProviderError (with the original exception as its cause).
    """
    return_formats = return_formats or ["json"]
    start_time = time.time()
    
    if not text or not isinstance(text, str):
        raise ToolInputError("Input 'text' must be a non-empty string.", param_name="text", provided_value=text)
        
    try:
        # Prepare model ID based on provider format
        effective_model = model
        # Ensure model ID includes provider prefix if not already present.
        # NOTE(review): this is a substring test, so a model such as "grok-3" with
        # provider "grok" is passed through without a prefix - confirm that is intended.
        if model and provider not in model:
            effective_model = f"{provider}/{model}" 
        
        headers_guidance = f"The table likely has headers similar to: {', '.join(headers)}.\n" if headers else "Attempt to identify table headers automatically.\n"
        metadata_guidance = "Also extract any surrounding metadata like a table title, caption, or source notes.\n" if extract_metadata else ""
        formats_guidance = f"Return the extracted table data in these formats: {', '.join(return_formats)}."
        
        # Improved prompt asking for specific formats in a JSON structure
        prompt = f"Identify and extract the primary data table from the following text. " \
                 f"{headers_guidance}{metadata_guidance}{formats_guidance}" \
                 f"Format the output as a single JSON object containing keys for each requested format ({', '.join(return_formats)}) " \
                 f"and optionally a 'metadata' key if requested. Ensure the values are the table represented in that format." \
                 f"\n\nText:\n```\n{text}\n```\nResult JSON:"
        
        # Use JSON mode if supported by the provider
        additional_params = {}
        if provider == Provider.OPENAI.value:
            additional_params["response_format"] = {"type": "json_object"}
        
        # Use the standardized completion tool instead of direct provider call
        completion_result = await generate_completion(
            prompt=prompt, 
            model=effective_model,
            provider=provider,
            temperature=0.0, # Low temp for precise extraction
            max_tokens=4000, 
            additional_params=additional_params
        )
        
        # Extract data from the standardized result format
        processing_time = completion_result.get("processing_time", time.time() - start_time)
        actual_model_used = completion_result.get("model", effective_model)
        # `or ""` also covers a "text" key that is present but None, which the
        # .get() default alone would not.
        raw_text_output = (completion_result.get("text") or "").strip()
        token_info = completion_result.get("tokens", {})
        cost = completion_result.get("cost", 0.0)
        tool_success = completion_result.get("success", False)
        
        # If the tool call failed, propagate the error
        if not tool_success:
            error_message = completion_result.get("error", "Unknown error during completion")
            raise ProviderError(
                f"Table extraction failed: {error_message}", 
                provider=provider, 
                model=actual_model_used
            )
        
        # Use our robust parsing function
        extraction_result, success, error_message = _extract_and_parse_json(raw_text_output)
            
        # Basic validation if extraction succeeded. This branch is entered only when the
        # parsed value is not a dict, or is a dict holding none of the requested format keys.
        if success and (not isinstance(extraction_result, dict) or not any(fmt in extraction_result for fmt in return_formats)):
            logger.warning(f"Table extraction JSON result missing expected structure or formats ({return_formats}). Result: {extraction_result}")
            # Allow partial success if it's a dict, but log warning
            if isinstance(extraction_result, dict):
                error_message = f"Warning: LLM output did not contain all requested formats ({return_formats})."
            else:
                error_message = "Expected a JSON object with format keys"
                success = False
                extraction_result = None
        
        logger.info(f"Table extraction attempt complete. Success: {success}. Time: {processing_time:.2f}s")
        return {
            "data": extraction_result, 
            "raw_text": raw_text_output if not success else None, # Include raw only on parse failure
            "model": actual_model_used, 
            "provider": provider,
            "tokens": token_info,
            "cost": cost, 
            "processing_time": processing_time, 
            "success": success,
            "error": error_message
        }
            
    except ProviderError:
        # Already carries provider/model context; let it propagate untouched.
        raise
    except Exception as e:
        error_model = model or f"{provider}/default"
        raise ProviderError(f"Table extraction failed: {str(e)}", provider=provider, model=error_model, cause=e) from e

@with_tool_metrics
@with_error_handling
async def extract_key_value_pairs(
    text: str,
    keys: Optional[List[str]] = None,
    provider: str = Provider.OPENAI.value,
    model: Optional[str] = None
    # Removed ctx=None
) -> Dict[str, Any]:
    """Extracts key-value pairs from text, optionally targeting specific keys.

    Use this tool to pull out data points that appear in a "Key: Value" or similar format
    within unstructured text (e.g., fields from a form, details from a description).
    The LLM is asked for a single flat JSON object; its reply is parsed with
    ``_extract_and_parse_json`` and rejected if the parsed value is not a dict.

    Args:
        text: The input text containing key-value pairs.
        keys: (Optional) A list of specific key names to look for and extract. If omitted,
              the prompt asks for all identifiable key-value pairs.
        provider: The name of the LLM provider (e.g., "openai"). Defaults to "openai".
        model: The specific model ID (e.g., "openai/gpt-4.1-mini"). Uses provider default if None.

    Returns:
        A dictionary containing the extracted key-value data and metadata:
        {
            "data": {             # Dictionary of extracted key-value pairs
                "Name": "Alice",
                "Role": "Engineer",
                "Location": "Remote", ...
            } | null,           # Null if the reply is valid JSON but not an object
            "raw_text": "..." | null, # Raw LLM output, only when success is false
            "model": "provider/model-used",
            "provider": "provider-name",
            "tokens": { ... },
            "cost": 0.000070,
            "processing_time": 2.1,
            "success": true | false,
            "error": "Error message if success is false"
        }

    Raises:
        ToolInputError: If ``text`` is empty or not a string.
        ProviderError: If the completion call reports failure. Any other exception raised
            while building the prompt or processing the reply is also wrapped in a
            ProviderError (with the original exception as its cause).
    """
    start_time = time.time()
    
    if not text or not isinstance(text, str):
        raise ToolInputError("Input 'text' must be a non-empty string.", param_name="text", provided_value=text)
        
    try:
        # Prepare model ID based on provider format
        effective_model = model
        # Ensure model ID includes provider prefix if not already present.
        # NOTE(review): this is a substring test, so a model such as "grok-3" with
        # provider "grok" is passed through without a prefix - confirm that is intended.
        if model and provider not in model:
            effective_model = f"{provider}/{model}"
        
        keys_guidance = f"Extract the values for these specific keys: {', '.join(keys)}.\n" if keys else "Identify and extract all distinct key-value pairs present in the text.\n"
        prompt = f"Analyze the following text and extract key-value pairs. {keys_guidance}" \
                 f"Format the output as a single, flat JSON object mapping the extracted keys (as strings) to their corresponding values (as strings or appropriate simple types). " \
                 f"Infer the value associated with each key from the text context. Ignore keys not present in the text.\n\n" \
                 f"Text:\n```\n{text}\n```\nResult JSON object:"
        
        # Use JSON mode if supported by the provider
        additional_params = {}
        if provider == Provider.OPENAI.value:
            additional_params["response_format"] = {"type": "json_object"}
        
        # Use the standardized completion tool instead of direct provider call
        completion_result = await generate_completion(
            prompt=prompt, 
            model=effective_model,
            provider=provider,
            temperature=0.0, # Low temp for precise extraction
            max_tokens=2000,
            additional_params=additional_params
        )
        
        # Extract data from the standardized result format
        processing_time = completion_result.get("processing_time", time.time() - start_time)
        actual_model_used = completion_result.get("model", effective_model)
        # `or ""` also covers a "text" key that is present but None, which the
        # .get() default alone would not.
        raw_text_output = (completion_result.get("text") or "").strip()
        token_info = completion_result.get("tokens", {})
        cost = completion_result.get("cost", 0.0)
        tool_success = completion_result.get("success", False)
        
        # If the tool call failed, propagate the error
        if not tool_success:
            error_message = completion_result.get("error", "Unknown error during completion")
            raise ProviderError(
                f"Key-value pair extraction failed: {error_message}", 
                provider=provider, 
                model=actual_model_used
            )

        # Use our robust parsing function
        kv_data, success, error_message = _extract_and_parse_json(raw_text_output)
        
        # Validate it's a dictionary if extraction succeeded (a JSON list or scalar
        # parses fine but is not a key-value mapping).
        if success and not isinstance(kv_data, dict):
            error_message = "Extracted data is not a valid key-value dictionary"
            logger.warning(error_message)
            success = False
            kv_data = None
                 
        logger.info(f"Key-Value pair extraction attempt complete. Success: {success}. Time: {processing_time:.2f}s")
        return {
            "data": kv_data, 
            "raw_text": raw_text_output if not success else None,
            "model": actual_model_used, 
            "provider": provider,
            "tokens": token_info,
            "cost": cost, 
            "processing_time": processing_time, 
            "success": success,
            "error": error_message
        }
            
    except ProviderError:
        # Already carries provider/model context; let it propagate untouched.
        raise
    except Exception as e:
        error_model = model or f"{provider}/default"
        raise ProviderError(f"Key-value pair extraction failed: {str(e)}", provider=provider, model=error_model, cause=e) from e

@with_tool_metrics
@with_error_handling
async def extract_semantic_schema(
    text: str,
    # Schema should ideally be passed as a structured dict, not within the prompt
    semantic_schema: Dict[str, Any], # Changed from embedding prompt
    provider: str = Provider.OPENAI.value,
    model: Optional[str] = None
    # Removed ctx=None
) -> Dict[str, Any]:
    """Extracts information from text matching a specified semantic structure (schema).

    Use this tool when you need to populate a predefined JSON structure with information
    found or inferred from the input text. Unlike `extract_json`, the target JSON structure
    is *defined by you* (via `semantic_schema`), not expected to be present in the input text.

    The schema is serialized into the prompt, the LLM reply is parsed with
    ``_extract_and_parse_json``, and the parsed value is validated against the schema.
    A validation failure is reported in ``error`` as a warning but does not flip
    ``success`` to false.

    Args:
        text: The input text containing information to extract.
        semantic_schema: A Python dictionary representing the desired JSON schema for the output.
                         Use JSON Schema conventions (e.g., {"type": "object", "properties": { ... }}).
                         It must itself be a well-formed JSON Schema; this is checked before
                         the LLM is called.
        provider: The name of the LLM provider (e.g., "openai"). Defaults to "openai".
                  Providers supporting JSON mode or strong instruction following are recommended.
        model: The specific model ID (e.g., "openai/gpt-4o"). Uses provider default if None.

    Returns:
        A dictionary containing the extracted data and metadata:
        {
            "data": { ... }, # The parsed LLM output (may not conform; see "error")
            "raw_text": "..." | null, # Raw LLM output, only when success is false
            "model": "provider/model-used",
            "provider": "provider-name",
            "tokens": { ... },
            "cost": 0.000210,
            "processing_time": 4.1,
            "success": true | false,
            "error": "Parse error, or a schema-conformance warning when success is true"
        }

    Raises:
        ToolInputError: If `text` is empty or not a string, or if `semantic_schema` is
            empty, not a dictionary, or not a well-formed JSON Schema.
        ProviderError: If the completion call reports failure. Any other exception raised
            while building the prompt or processing the reply is also wrapped in a
            ProviderError (with the original exception as its cause).
    """
    start_time = time.time()
    
    if not text or not isinstance(text, str):
         raise ToolInputError("Input 'text' must be a non-empty string.", param_name="text", provided_value=text)
    if not semantic_schema or not isinstance(semantic_schema, dict):
        raise ToolInputError("Input 'semantic_schema' must be a non-empty dictionary representing a JSON schema.", param_name="semantic_schema", provided_value=semantic_schema)

    # Fail fast on a malformed schema. jsonschema.validate() performs this same check
    # internally, so without it the problem would only surface after the LLM call has
    # been paid for, and would be reported as a provider failure.
    try:
        jsonschema.validators.validator_for(semantic_schema).check_schema(semantic_schema)
    except jsonschema.exceptions.SchemaError as schema_err:
        raise ToolInputError(
            f"Input 'semantic_schema' is not a valid JSON schema: {schema_err.message}",
            param_name="semantic_schema",
            provided_value=semantic_schema
        ) from schema_err

    try:
        # Prepare model ID based on provider format
        effective_model = model
        # Ensure model ID includes provider prefix if not already present.
        # NOTE(review): this is a substring test, so a model such as "grok-3" with
        # provider "grok" is passed through without a prefix - confirm that is intended.
        if model and provider not in model:
            effective_model = f"{provider}/{model}" 
        
        # Create a clear prompt explaining the task and providing the schema
        schema_str = json.dumps(semantic_schema, indent=2)
        prompt = f"Analyze the following text and extract information that conforms to the provided JSON schema. " \
                 f"Populate the fields in the schema based *only* on information present in the text. " \
                 f"If information for a field is not found, omit the field or use a null value as appropriate according to the schema. " \
                 f"Return ONLY the populated JSON object conforming to the schema.\n\n" \
                 f"JSON Schema:\n```json\n{schema_str}\n```\n\n" \
                 f"Text:\n```\n{text}\n```\nPopulated JSON object:"

        # Use JSON mode if supported by the provider
        additional_params = {}
        if provider == Provider.OPENAI.value:
            additional_params["response_format"] = {"type": "json_object"}
        
        # Use the standardized completion tool instead of direct provider call
        completion_result = await generate_completion(
            prompt=prompt, 
            model=effective_model,
            provider=provider,
            temperature=0.0, # Low temp for precise extraction
            max_tokens=4000,
            additional_params=additional_params
        )
        
        # Extract data from the standardized result format
        processing_time = completion_result.get("processing_time", time.time() - start_time)
        actual_model_used = completion_result.get("model", effective_model)
        # `or ""` also covers a "text" key that is present but None, which the
        # .get() default alone would not.
        raw_text_output = (completion_result.get("text") or "").strip()
        token_info = completion_result.get("tokens", {})
        cost = completion_result.get("cost", 0.0)
        tool_success = completion_result.get("success", False)
        
        # If the tool call failed, propagate the error
        if not tool_success:
            error_message = completion_result.get("error", "Unknown error during completion")
            raise ProviderError(
                f"Semantic schema extraction failed: {error_message}", 
                provider=provider, 
                model=actual_model_used
            )
        
        # Use our robust parsing function
        extracted_data, success, error_message = _extract_and_parse_json(raw_text_output)
        
        # Validate against the provided schema if extraction succeeded
        if success:
            try:
                jsonschema.validate(instance=extracted_data, schema=semantic_schema)
                logger.debug("Successfully parsed and validated semantic schema JSON.")
            except jsonschema.exceptions.ValidationError as e:
                 error_message = f"Warning: LLM output did not strictly conform to schema: {str(e)}"
                 logger.warning(f"{error_message}. Data: {extracted_data}")
                 # Still consider extraction successful if parsable
                 
        logger.info(f"Semantic schema extraction attempt complete. Success: {success}. Time: {processing_time:.2f}s")
        return {
            "data": extracted_data,
            "raw_text": raw_text_output if not success else None,
            "model": actual_model_used,
            "provider": provider,
            "tokens": token_info,
            "cost": cost,
            "processing_time": processing_time,
            "success": success,
            "error": error_message
        }

    except ProviderError:
        # Already carries provider/model context; let it propagate untouched.
        raise
    except Exception as e:
        error_model = model or f"{provider}/default"
        raise ProviderError(f"Semantic schema extraction failed: {str(e)}", provider=provider, model=error_model, cause=e) from e

# Note: This is a utility function, not typically exposed as a direct tool,
# but kept here as it relates to extraction from LLM *responses*.
# No standard decorators applied.
async def extract_code_from_response(
    response_text: str, 
    model: str = "openai/gpt-4.1-mini", 
    timeout: int = 15,
    tracker: Optional[Any] = None # Add optional tracker (use Any for now to avoid circular import)
) -> str:
    """Extracts code blocks from LLM response text, using an LLM for complex cases.

    Primarily designed to clean up responses from code generation tasks.
    It first looks for a markdown code fence with a regex and returns the content of
    the first one. If no fence is found, it asks the specified LLM to extract the code.

    Args:
        response_text: The text potentially containing code blocks.
        model: The specific model ID to use for LLM-based extraction if regex fails.
               Defaults to "openai/gpt-4.1-mini". The part before the first "/" is used
               as the provider; without a "/" the OpenAI provider is assumed.
        timeout: Timeout in seconds for the LLM extraction call. Default 15.
        tracker: (Optional) An object with an ``add_call`` method (e.g. a CostTracker) that
                 receives cost/token attributes of the LLM call. Tracking failures are
                 logged and otherwise ignored.

    Returns:
        - "" if ``response_text`` is empty or not a string.
        - The stripped content of the first fenced block, if one is present.
        - Otherwise the stripped text returned by the LLM.
        - The original ``response_text`` if the LLM call fails, times out, or raises.
    """
    if not response_text or not isinstance(response_text, str):
        return "" # Return empty if no input
        
    # Try simple regex extraction first (common markdown format). The optional first
    # group consumes a language-tag line; "+", "#" and "." are accepted so that tags
    # like "c++", "c#" or "f#" are not returned as part of the code.
    fence_match = re.search(r"```(?:[a-zA-Z0-9\-_+#.]*\n)?(.*?)\n?```", response_text, re.DOTALL)
    
    if fence_match:
        # Return the content of the first code block found
        logger.debug("Extracted code using regex.")
        return fence_match.group(1).strip()
        
    # If regex fails, use LLM for more robust extraction
    logger.debug("Regex failed, attempting LLM-based code extraction.")
    try:
        # Parse provider from model string if it contains a slash
        provider_id = model.split('/')[0] if '/' in model else Provider.OPENAI.value
        effective_model = model  # Use the full model string as provided
        
        prompt = f"Extract only the main code block from the following text. Return just the code itself, without any explanations or markdown fences.\n\nText:\n```\n{response_text}\n```\n\nCode:"
        
        # Build the coroutine here; it only starts running when awaited below.
        completion_task = generate_completion(
            prompt=prompt,
            model=effective_model,
            provider=provider_id,
            temperature=0.0,
            max_tokens=len(response_text) # Allow enough tokens, approx original length
        )
        
        # Use asyncio.wait_for to implement timeout
        completion_result = await asyncio.wait_for(completion_task, timeout=timeout)
        
        # Check if completion succeeded
        if not completion_result.get("success", False):
            logger.warning(f"LLM code extraction failed: {completion_result.get('error', 'Unknown error')}. Returning original text.")
            return response_text
        
        # Track cost if tracker is provided
        if tracker:
            try:
                # The tracker is handed an object with attributes rather than the result
                # dict, so copy the relevant fields onto a throwaway instance.
                class Trackable: 
                    pass
                trackable = Trackable()
                trackable.cost = completion_result.get('cost', 0.0)
                trackable.input_tokens = completion_result.get('tokens', {}).get('input', 0)
                trackable.output_tokens = completion_result.get('tokens', {}).get('output', 0)
                trackable.provider = provider_id
                trackable.model = completion_result.get('model', effective_model)
                trackable.processing_time = completion_result.get('processing_time', 0.0)
                tracker.add_call(trackable)
            except Exception as track_err:
                 # Cost tracking is best-effort; never let it break extraction.
                 logger.warning(f"Could not track cost for LLM code extraction: {track_err}", exc_info=False)

        extracted_code = completion_result.get("text", "").strip()
        logger.info(f"Extracted code using LLM ({effective_model}).")
        return extracted_code
        
    except asyncio.TimeoutError:
        logger.warning(f"LLM code extraction timed out after {timeout}s. Returning original text.")
        return response_text # Fallback to original on timeout
    except Exception as e:
        logger.error(f"LLM code extraction failed: {str(e)}. Returning original text.", exc_info=False)
        return response_text # Fallback to original on error

class ExtractionTools(BaseTool):
    """Tool group that exposes the module's text-extraction functions via MCP."""
    
    tool_name = "extraction"
    description = "Tools for extracting structured data from unstructured text, including JSON, tables, and key-value pairs."
    
    def __init__(self, gateway):
        """Set up the base tool and immediately register the extraction tools.
        
        Args:
            gateway: Gateway or MCP server instance, forwarded to ``BaseTool``.
        """
        super().__init__(gateway)
        self._register_tools()
        
    def _register_tools(self):
        """Register each extraction function on ``self.mcp`` under its public tool name."""
        # (tool name, callable) pairs, registered in this order.
        tool_table = (
            ("extract_json", extract_json),
            ("extract_table", extract_table),
            ("extract_key_value_pairs", extract_key_value_pairs),
            ("extract_semantic_schema", extract_semantic_schema),
        )
        for registered_name, tool_callable in tool_table:
            self.mcp.tool(name=registered_name)(tool_callable)
        self.logger.info("Registered extraction tools", emoji_key="success")
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/core/providers/base.py:
--------------------------------------------------------------------------------

```python
"""Base LLM provider interface."""
import abc
import time
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple

from ultimate_mcp_server.config import get_config
from ultimate_mcp_server.constants import COST_PER_MILLION_TOKENS, Provider
from ultimate_mcp_server.utils import get_logger

logger = get_logger(__name__)


class ModelResponse:
    """
    Standard response format for all LLM provider completions in the Ultimate MCP Server.
    
    This class provides a unified representation of responses from different LLM providers,
    normalizing their various formats into a consistent structure. It handles common
    operations like:
    
    - Storing and accessing the generated text content
    - Tracking token usage statistics (input, output, total)
    - Computing cost estimates based on provider pricing
    - Preserving metadata and raw responses for debugging
    - Formatting response data for serialization
    
    The ModelResponse serves as a bridge between provider-specific response formats
    and the standardized interface presented by the Ultimate MCP Server. This allows
    client code to work with responses in a consistent way regardless of which
    provider generated them.
    
    All provider implementations in the system should return responses wrapped in
    this class to ensure consistent behavior across the application.
    """
    
    def __init__(
        self,
        text: str,
        model: str,
        provider: str,
        input_tokens: int = 0,
        output_tokens: int = 0,
        total_tokens: int = 0,
        processing_time: float = 0.0,
        raw_response: Any = None,
        metadata: Optional[Dict[str, Any]] = None,
    ):
        """
        Initialize a standardized model response object.
        
        Stores the given fields and derives two values: ``total_tokens`` (when not
        supplied) and ``cost`` (via ``_calculate_cost``).
        
        Args:
            text: The generated text content from the LLM. This is the primary output
                 that would be presented to users.
            model: The specific model name that generated this response (e.g., "gpt-4o",
                  "claude-3-5-haiku-20241022"). A "provider/" prefix is tolerated.
            provider: The provider name that served this response (e.g., "openai",
                     "anthropic").
            input_tokens: Number of input/prompt tokens consumed in this request.
                         Used for usage tracking and cost calculation.
            output_tokens: Number of output/completion tokens generated in this response.
                          Used for usage tracking and cost calculation.
            total_tokens: Total token count for the request. If zero or not provided,
                         calculated as input_tokens + output_tokens.
            processing_time: Time taken to generate the response in seconds.
            raw_response: The original, unmodified response object from the provider's API.
                         Kept on the instance but not included in ``to_dict()``.
            metadata: Additional response metadata as a dictionary. Defaults to an
                     empty dict when omitted.
        """
        self.text = text
        self.model = model
        self.provider = provider
        self.input_tokens = input_tokens
        self.output_tokens = output_tokens
        self.total_tokens = total_tokens or (input_tokens + output_tokens)
        self.processing_time = processing_time
        self.raw_response = raw_response
        self.metadata = metadata or {}
        
        # Calculate cost based on token usage
        self.cost = self._calculate_cost()
        
    def _calculate_cost(self) -> float:
        """
        Calculate the estimated cost of the request based on token usage and current pricing.
        
        This internal method computes a cost estimate by:
        1. Looking up the per-million-token costs for the model in COST_PER_MILLION_TOKENS
           (after stripping any "provider/" prefix from the model name)
        2. Applying different rates for input (prompt) and output (completion) tokens
        3. Computing the final cost based on actual token counts
        
        If pricing data isn't available for the specific model, the method falls back
        to default rates (0.5 input / 1.5 output per million tokens) and logs a warning.
        
        Returns:
            Estimated cost in USD as a floating-point value. Returns 0.0 if the model
            name is empty or if both token counts are zero/missing. A request with only
            input tokens or only output tokens is still costed for the side that is present.
            
        Note:
            This is an estimation and may not precisely match actual billing from providers,
            especially as pricing changes over time or for custom deployment configurations.
        """
        # Treat a missing (None) count as zero so one-sided usage can still be costed.
        input_tokens = self.input_tokens or 0
        output_tokens = self.output_tokens or 0
        
        # Nothing to price without a model name or without any tokens at all.
        if not self.model or not (input_tokens or output_tokens):
            return 0.0
            
        # Extract model name without provider prefix (e.g., strip "openai/" from "openai/gpt-4o")
        model_name = self.model
        if "/" in model_name:
            model_name = model_name.split("/", 1)[1]
            
        # Get cost per token for this model
        model_costs = COST_PER_MILLION_TOKENS.get(model_name, None)
        if not model_costs:
            # If model not found, use a default estimation
            model_costs = {"input": 0.5, "output": 1.5}
            logger.warning(
                f"Cost data not found for model {self.model}. Using estimates.", 
                emoji_key="cost"
            )
            
        # Calculate cost
        input_cost = (input_tokens / 1_000_000) * model_costs["input"]
        output_cost = (output_tokens / 1_000_000) * model_costs["output"]
        
        return input_cost + output_cost
        
    def to_dict(self) -> Dict[str, Any]:
        """
        Convert the response object to a dictionary suitable for serialization.
        
        The token usage statistics are grouped under a 'usage' key. ``raw_response``
        is intentionally not part of the output.
        
        Returns:
            A dictionary containing all relevant response data with the following structure:
            {
                "text": str,              # The generated text content
                "model": str,             # Model name used
                "provider": str,          # Provider name
                "usage": {                # Token usage statistics
                    "input_tokens": int,
                    "output_tokens": int,
                    "total_tokens": int
                },
                "processing_time": float, # Time taken in seconds
                "cost": float,            # Estimated cost in USD
                "metadata": dict          # Additional response metadata
            }
            
        Example:
            ```python
            response = await provider.generate_completion(prompt="Hello")
            response_dict = response.to_dict()
            json_response = json.dumps(response_dict)
            ```
        """
        return {
            "text": self.text,
            "model": self.model,
            "provider": self.provider,
            "usage": {
                "input_tokens": self.input_tokens,
                "output_tokens": self.output_tokens,
                "total_tokens": self.total_tokens,
            },
            "processing_time": self.processing_time,
            "cost": self.cost,
            "metadata": self.metadata,
        }


class BaseProvider(abc.ABC):
    """
    Abstract base class that defines the interface for all LLM providers in Ultimate MCP Server.

    This class establishes the common API contract that all provider implementations must follow,
    ensuring consistent behavior regardless of the underlying LLM service (OpenAI, Anthropic, etc.).
    It standardizes key operations like:

    - Provider initialization and API key management
    - Text completion generation (both non-streaming and streaming; both are async)
    - Model listing and default model selection
    - API key validation
    - Request timing and performance tracking

    By implementing this interface, each provider ensures compatibility with the broader
    Ultimate MCP Server framework. This abstraction layer allows the system to work with multiple
    LLM providers interchangeably, while hiding the provider-specific implementation details
    from the rest of the application.

    Provider implementations should inherit from this class and override all abstract methods
    (``initialize``, ``generate_completion`` and ``generate_completion_stream``). They should
    also override ``get_default_model``, whose base implementation raises NotImplementedError.
    They may extend the interface with provider-specific functionality as needed,
    though core components of the Ultimate MCP Server should rely only on the methods defined
    in this base class to ensure provider-agnostic operation.

    Usage example:
        ```python
        class OpenAIProvider(BaseProvider):
            provider_name = "openai"

            async def initialize(self) -> bool:
                # OpenAI-specific initialization...

            async def generate_completion(self, prompt: str, **kwargs) -> ModelResponse:
                # OpenAI-specific completion implementation...

            # Other required method implementations...
        ```
    """

    # Identifier used for the logger channel and in list_models(); subclasses override it.
    provider_name: str = "base"

    def __init__(self, api_key: Optional[str] = None, **kwargs):
        """Initialize the provider.

        The API key is stored exactly as given. It is NOT looked up in the
        environment here; callers (e.g. the config system) are expected to
        supply it, and ``None`` is kept as ``None``.

        Args:
            api_key: API key for the provider, or None if not (yet) available
            **kwargs: Additional provider-specific options, stored on ``self.options``
        """
        self.api_key = api_key
        self.options = kwargs
        # Provider-specific client object; left unset until a subclass creates it.
        self.client = None
        self.logger = get_logger(f"provider.{self.provider_name}")

    @abc.abstractmethod
    async def initialize(self) -> bool:
        """
        Initialize the provider client and verify API connectivity.

        This abstract method defines the standard interface for initializing a provider
        connection. All provider implementations must override this method with their
        provider-specific initialization logic while maintaining this signature.

        The initialization process typically includes:
        1. Creating the provider-specific client with the API key and configuration
        2. Setting up any required HTTP headers, authentication, or session management
        3. Verifying API connectivity with a lightweight request when possible
        4. Setting up provider-specific rate limiting or retry mechanisms
        5. Loading any required provider-specific resources or configurations

        This method is called:
        - When a provider is first instantiated via the get_provider factory
        - When a provider connection needs to be refreshed or re-established
        - Before any operations that require an active client connection

        Returns:
            bool: True if initialization was successful and the provider is ready for use,
                 False if initialization failed for any reason. A False return will
                 typically prevent the provider from being used by the calling code.

        Raises:
            No exceptions should be raised directly. All errors should be handled
            internally, logged appropriately, and reflected in the return value.
            If initialization fails, detailed error information should be logged
            to help diagnose the issue.

        Implementation guidelines:
            - Handle API keys securely, avoiding logging them even in error messages
            - Implement retries with exponential backoff for transient errors
            - Set reasonable timeouts on API connection attempts
            - Log detailed diagnostics on initialization failures
            - Cache expensive resources to improve subsequent initialization times
        """
        pass

    @abc.abstractmethod
    async def generate_completion(
        self,
        prompt: str,
        model: Optional[str] = None,
        max_tokens: Optional[int] = None,
        temperature: float = 0.7,
        **kwargs
    ) -> ModelResponse:
        """
        Generate a text completion from the provider (non-streaming).

        This abstract method defines the standard interface for generating text completions
        from any LLM provider. All provider implementations must override this method with
        their provider-specific implementation while maintaining this signature.

        The method handles sending a prompt to the LLM, processing the response, and
        converting it to the standardized ModelResponse format. It is responsible for
        handling provider-specific API calls, error handling, and token counting.

        Args:
            prompt: The text prompt to send to the model. This is the primary input that
                   the model will generate a completion for.
            model: The specific model identifier to use (e.g., "gpt-4o", "claude-3-opus").
                  If None, the provider's default model will be used.
            max_tokens: Maximum number of tokens to generate in the response. If None,
                       the provider's default or maximum limit will be used.
            temperature: Controls randomness in the output. Lower values (e.g., 0.1) make
                        the output more deterministic, while higher values (e.g., 1.0)
                        make it more random and creative. Range is typically 0.0-1.0
                        but may vary by provider.
            **kwargs: Additional provider-specific parameters such as:
                     - top_p: Nucleus sampling parameter (alternative to temperature)
                     - stop_sequences: Tokens/strings that will stop generation when encountered
                     - frequency_penalty: Penalty for using frequent tokens
                     - presence_penalty: Penalty for repeated tokens
                     - system_prompt: System instructions for providers that support it
                     - response_format: Structured format request (e.g., JSON)

        Returns:
            ModelResponse: A standardized response object containing:
                         - The generated text
                         - Token usage statistics (input, output, total)
                         - Cost estimation
                         - Processing time
                         - Provider and model information
                         - Any provider-specific metadata

        Raises:
            ValueError: For invalid parameter combinations or values
            ConnectionError: For network or API connectivity issues
            AuthenticationError: For API key or authentication problems
            RateLimitError: When provider rate limits are exceeded
            ProviderError: For other provider-specific errors

        Implementation guidelines:
            - Use the provider's official client library when available
            - Handle error conditions gracefully with meaningful error messages
            - Track token usage precisely for accurate cost estimation
            - Measure processing time with the process_with_timer utility
            - Include relevant provider-specific metadata in the response
        """
        pass

    @abc.abstractmethod
    async def generate_completion_stream(
        self,
        prompt: str,
        model: Optional[str] = None,
        max_tokens: Optional[int] = None,
        temperature: float = 0.7,
        **kwargs
    ) -> AsyncGenerator[Tuple[str, Dict[str, Any]], None]:
        """
        Generate a streaming text completion with real-time token delivery.

        This abstract method defines the standard interface for streaming text completions
        from any LLM provider. All provider implementations must override this method with
        their provider-specific streaming implementation while maintaining this signature.

        Unlike the non-streaming generate_completion method, this method:
        - Returns content incrementally as it's generated
        - Uses an async generator that yields content chunks
        - Provides metadata with each chunk to track generation progress
        - Enables real-time display and processing of partial responses

        Args:
            prompt: The text prompt to send to the model. This is the primary input that
                   the model will generate a completion for.
            model: The specific model identifier to use (e.g., "gpt-4o", "claude-3-opus").
                  If None, the provider's default model will be used.
            max_tokens: Maximum number of tokens to generate in the response. If None,
                       the provider's default or maximum limit will be used.
            temperature: Controls randomness in the output. Lower values (e.g., 0.1) make
                        the output more deterministic, while higher values (e.g., 1.0)
                        make it more random and creative. Range is typically 0.0-1.0
                        but may vary by provider.
            **kwargs: Additional provider-specific parameters, identical to those
                     supported by generate_completion.

        Yields:
            Tuple[str, Dict[str, Any]]: Each yield returns:
                - str: The next chunk of generated text
                - Dict: Metadata about the generation process, including at minimum:
                  - done: Boolean indicating if this is the final chunk
                  - chunk_index: Integer index of the current chunk (0-based)

                  The metadata may also include provider-specific information such as:
                  - finish_reason: Why the generation stopped (e.g., "stop", "length")
                  - token_count: Running count of tokens generated
                  - model: Model information if it changed during generation

        Raises:
            ValueError: For invalid parameter combinations or values
            ConnectionError: For network or API connectivity issues
            AuthenticationError: For API key or authentication problems
            RateLimitError: When provider rate limits are exceeded
            ProviderError: For other provider-specific errors

        Implementation guidelines:
            - Use the provider's official streaming endpoints when available
            - Ensure chunks represent logical breaks where possible (e.g., words, not partial UTF-8)
            - Handle connection interruptions gracefully
            - Set the 'done' flag to True only in the final yielded chunk
            - Provide consistent metadata structure across all yielded chunks
        """
        pass

    async def list_models(self) -> List[Dict[str, Any]]:
        """
        List available models from this provider with their capabilities and metadata.

        This method retrieves information about all available models from the provider,
        including their identifiers, capabilities, and contextual metadata. Providers
        typically override this method to query their API's model list endpoint and
        normalize the responses into a consistent format.

        The base implementation performs no I/O and returns a single placeholder entry
        (id "default-model") tagged with this provider's name. Provider-specific
        implementations should:
        1. Query the provider's models API or endpoint
        2. Transform provider-specific model data into the standard format
        3. Enrich the models with useful metadata like token limits and capabilities
        4. Filter models based on access permissions if applicable

        Returns:
            A list of dictionaries, each representing a model with at least these keys:
            - id (str): The model identifier (e.g., "gpt-4o", "claude-3-opus")
            - provider (str): The provider name (e.g., "openai", "anthropic")
            - description (str): A human-readable description of the model

            Models may also include additional metadata such as:
            - max_tokens (int): Maximum combined tokens (prompt + completion)
            - created (str): Creation/version date of the model
            - pricing (dict): Cost information for input/output tokens
            - capabilities (list): Features the model supports (e.g., "vision", "function_calling")
            - deprecated (bool): Whether the model is deprecated or scheduled for retirement

        Raises:
            The base implementation raises nothing. Overrides may raise:
            ConnectionError: If the provider API cannot be reached
            AuthenticationError: If authentication fails during the request
            ProviderError: For other provider-specific errors

        Note:
            Model data may be cached internally to reduce API calls. Providers should
            implement appropriate caching strategies to balance freshness with performance.
        """
        # Default implementation - override in provider-specific classes
        return [
            {
                "id": "default-model",
                "provider": self.provider_name,
                "description": "Default model",
            }
        ]

    def get_default_model(self) -> str:
        """
        Get the default model identifier for this provider.

        This method returns the standard or recommended model identifier to use when
        no specific model is requested. Each provider implementation must override this
        method to specify its default model. It is not marked abstract, so a subclass
        that forgets to override it fails only when the method is called.

        The default model should be:
        - Generally available to all users of the provider
        - Well-balanced between capabilities and cost
        - Appropriate for general-purpose text generation tasks
        - Stable and reliable for production use

        The implementation should consider:
        1. Provider-specific configuration settings
        2. Environment variables or system settings
        3. User access level and permissions
        4. Regional availability of models
        5. Current model deprecation status

        Returns:
            str: The model identifier string (e.g., "gpt-4o", "claude-3-haiku")
                that will be used when no model is explicitly specified.
                This identifier should be valid and usable without additional prefixing.

        Raises:
            NotImplementedError: In the base class implementation, signaling that
                               subclasses must override this method.

        Note:
            Provider implementations should periodically review and update their default
            model selections as newer, more capable models become available or as pricing
            structures change.
        """
        raise NotImplementedError("Provider must implement get_default_model")

    async def check_api_key(self) -> bool:
        """
        Check if the API key for this provider is valid and functional.

        This method verifies that the configured API key is valid and can be used
        to authenticate with the provider's API. The default implementation simply
        checks if an API key is present, but provider-specific implementations
        should override this to perform actual validation against the provider's API.

        A proper implementation should:
        1. Make a lightweight API call to an endpoint that requires authentication
        2. Handle authentication errors specifically to differentiate from other failures
        3. Cache results when appropriate to avoid excessive validation calls
        4. Respect rate limits during validation

        This method is typically called during:
        - Server startup to verify all configured providers
        - Before first use of a provider to ensure it's properly configured
        - Periodically as a health check to detect expired or revoked keys
        - After configuration changes that might affect authentication

        Returns:
            bool: True if the API key is valid and usable, False otherwise.
                 The default implementation returns True if an API key is present
                 (i.e. ``self.api_key`` is truthy), which does not guarantee the key
                 is actually valid or functional.

        Note:
            Provider implementations should log descriptive error messages when
            validation fails to help with troubleshooting, but should avoid logging
            the actual API key or other sensitive credentials.
        """
        # Default implementation just checks if key exists (None and "" both count as missing)
        return bool(self.api_key)

    async def process_with_timer(
        self,
        func: callable,
        *args,
        **kwargs
    ) -> Tuple[Any, float]:
        """
        Process an async function call with precise timing measurement.

        This utility method provides a standardized way to execute any async function
        while measuring its execution time. It's particularly useful for:

        - Tracking LLM API call latency for performance monitoring
        - Measuring request-response round trip times
        - Providing accurate timing data for usage reports and optimizations
        - Including processing time in log messages and response metadata

        The method handles the time measurement boilerplate, ensuring consistent
        timing practices across all provider implementations. The measured processing
        time is returned alongside the function's result, allowing both to be used
        in subsequent operations.

        Timing uses ``time.perf_counter()``, a monotonic high-resolution clock, so the
        reported duration is never negative and is unaffected by system clock
        adjustments that occur while the call is in flight.

        Args:
            func: The async function (callable) to execute and time. Calling it with
                 the given arguments must produce an awaitable.
            *args: Positional arguments to pass to the function.
            **kwargs: Keyword arguments to pass to the function.

        Returns:
            Tuple containing:
              - The result returned by the executed function (any type)
              - The processing time in seconds as a float, measured from just
                before the function call to just after it completes.

        Raises:
            Any exception raised by ``func`` propagates unchanged; no timing is
            returned in that case.

        Example usage:
            ```python
            # Timing an API call
            response, duration = await self.process_with_timer(
                self.client.create,
                model="gpt-4o",
                prompt="Hello, world!"
            )

            # Using the measured time in a response
            return ModelResponse(
                text=response.choices[0].text,
                model="gpt-4o",
                provider=self.provider_name,
                processing_time=duration
            )
            ```
        """
        # perf_counter() is the right clock for intervals: time.time() follows the
        # wall clock, which can jump (NTP sync, manual changes) mid-measurement.
        started = time.perf_counter()
        result = await func(*args, **kwargs)
        processing_time = time.perf_counter() - started

        return result, processing_time


def parse_model_string(model_string: str) -> Tuple[Optional[str], str]:
    """Parse a model string that might include a provider prefix.

    This function parses a model identifier string that may include a provider prefix
    (e.g., 'openai/gpt-4o' or 'anthropic:claude-3-sonnet'). It supports two separator
    formats: forward slash ('/') and colon (':'). If a valid provider prefix is found,
    the function returns the provider name and model name as separate strings.

    Provider validation is performed against the Provider enum values (compared
    case-insensitively) to ensure the prefix represents a supported provider. If no
    valid provider prefix is found, the provider component will be None, indicating
    the model should use the default provider.

    Separator handling:
        The string is split at the FIRST '/' and the text before it is checked. If
        that is not a known provider, the string is split at the FIRST ':' instead
        and checked again. Trying '/' first keeps the historical precedence, while
        the ':' fallback lets colon-prefixed identifiers whose model name itself
        contains a slash (e.g. 'ollama:hf.co/org/model') resolve to their provider
        instead of being reported as having no prefix.

    Args:
        model_string: A model string, possibly including a provider prefix.
                     Examples: "openai/gpt-4.1-mini", "anthropic/claude-3-opus",
                               "gemini:gemini-pro", "gpt-4o" (no provider)

    Returns:
        Tuple of (provider_name, model_name):
        - provider_name (str or None): Lowercase provider name if a valid prefix was found,
          or None if no valid provider prefix was detected.
        - model_name (str): The model identifier without the provider prefix, or the
          original string unchanged when no valid prefix was detected.

    Examples:
        >>> parse_model_string("openai/gpt-4o")
        ('openai', 'gpt-4o')

        >>> parse_model_string("anthropic:claude-3-opus")
        ('anthropic', 'claude-3-opus')

        >>> parse_model_string("gpt-4o")  # No provider prefix
        (None, 'gpt-4o')

        >>> parse_model_string("unknown/model-name")  # Invalid provider
        (None, 'unknown/model-name')

        >>> parse_model_string("ollama:hf.co/org/model")  # ':' prefix, '/' in model name
        ('ollama', 'hf.co/org/model')
    """
    # Set gives O(1) membership tests for the (up to two) prefix checks below.
    valid_providers = {p.value.lower() for p in Provider}

    # '/' is tried before ':' so every string that resolved previously resolves
    # the same way; ':' is only consulted when the '/' prefix is not a provider.
    for separator in ('/', ':'):
        prefix, found, model_name = model_string.partition(separator)
        if not found:
            continue  # separator absent; nothing to check for this format
        normalized_prefix = prefix.lower()
        if normalized_prefix in valid_providers:
            return normalized_prefix, model_name

    # No valid provider prefix found or no separator
    return None, model_string


async def get_provider(provider_name: str, **kwargs) -> BaseProvider:
    """
    Build and initialize the provider implementation registered under a given name.

    This is the factory used to obtain ready-to-use provider instances. In order,
    it:

    1. Loads the server configuration via ``get_config()``
    2. Normalizes ``provider_name`` (lower-cased, surrounding whitespace stripped)
    3. If a string ``model`` keyword was supplied, runs it through
       ``parse_model_string``; a recognized provider prefix (e.g. "openai/gpt-4o")
       REPLACES ``provider_name`` and the prefix is removed from ``kwargs['model']``
    4. Looks up the provider class in a table keyed by the Provider enum
    5. Fills in ``api_key`` from ``cfg.providers.<provider_name>.api_key`` when the
       caller did not pass an ``api_key`` keyword and the configured key is truthy
    6. Instantiates the class with all keyword arguments and awaits ``initialize()``

    Args:
        provider_name: Provider identifier to instantiate. This should match one of the
                      values in the Provider enum (case-insensitive). Examples include
                      "openai", "anthropic", "gemini", etc.
        **kwargs: Additional configuration options forwarded unchanged (apart from the
                 adjustments described above) to the provider's constructor.
                 Common options include:
                 - api_key: Override the API key from configuration
                 - model: Model name to use (may include provider prefix)
                 - base_url: Alternative API endpoint URL
                 - organization: Organization ID for providers that support it

    Returns:
        An initialized provider instance: an object of the BaseProvider subclass
        selected for the (possibly model-derived) provider name.

    Raises:
        ValueError: If the provider name is not in the lookup table, or if the
                   instance's ``initialize()`` returns a falsy value. This ensures
                   that only fully functional provider instances are returned.

    Example usage:
        ```python
        # Basic usage with direct provider name
        openai_provider = await get_provider("openai")

        # Using a model string with provider prefix
        provider = await get_provider("openai", model="anthropic/claude-3-opus")
        # The above actually returns an AnthropicProvider because the model string
        # overrides the provider_name parameter

        # With additional configuration
        custom_provider = await get_provider(
            "openai",
            api_key="custom-key",
            base_url="https://custom-endpoint.example.com/v1",
            model="gpt-4o"
        )
        ```
    """
    cfg = get_config()
    provider_name = provider_name.lower().strip()

    # A provider prefix embedded in the model string (e.g. "openai/gpt-4.1-mini")
    # takes precedence over the explicitly requested provider name.
    requested_model = kwargs.get('model')
    if isinstance(requested_model, str):
        extracted_provider, extracted_model = parse_model_string(requested_model)
        if extracted_provider:
            provider_name = extracted_provider
            kwargs['model'] = extracted_model
            logger.debug(f"Extracted provider '{provider_name}' and model '{extracted_model}' from model string")

    # Provider classes are imported inside the function rather than at module level.
    from ultimate_mcp_server.core.providers.anthropic import AnthropicProvider
    from ultimate_mcp_server.core.providers.deepseek import DeepSeekProvider
    from ultimate_mcp_server.core.providers.gemini import GeminiProvider
    from ultimate_mcp_server.core.providers.grok import GrokProvider
    from ultimate_mcp_server.core.providers.ollama import OllamaProvider
    from ultimate_mcp_server.core.providers.openai import OpenAIProvider
    from ultimate_mcp_server.core.providers.openrouter import OpenRouterProvider

    # Lookup table from Provider enum member to implementing class.
    providers = {
        Provider.OPENAI: OpenAIProvider,
        Provider.ANTHROPIC: AnthropicProvider,
        Provider.DEEPSEEK: DeepSeekProvider,
        Provider.GEMINI: GeminiProvider,
        Provider.OPENROUTER: OpenRouterProvider,
        Provider.GROK: GrokProvider,
        Provider.OLLAMA: OllamaProvider,
    }

    provider_class = providers.get(provider_name)
    if provider_class is None:
        raise ValueError(f"Invalid provider name: {provider_name}")

    # Walk cfg.providers.<provider_name>, tolerating a missing level at each step.
    providers_config = getattr(cfg, 'providers', None)
    if providers_config:
        provider_cfg = getattr(providers_config, provider_name, None)
    else:
        provider_cfg = None

    # Only fall back to the configured key when the caller passed no api_key at all.
    if 'api_key' not in kwargs and provider_cfg:
        configured_key = getattr(provider_cfg, 'api_key', None)
        if configured_key:
            kwargs['api_key'] = configured_key

    instance = provider_class(**kwargs)

    # Initialize right away; refuse to hand back an instance that did not come up.
    if not await instance.initialize():
        raise ValueError(f"Failed to initialize provider: {provider_name}")

    return instance
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/sentiment_analysis.py:
--------------------------------------------------------------------------------

```python
"""Business-focused sentiment analysis tools for Ultimate MCP Server."""
import json
import time
from typing import Any, Dict, List, Optional

from ultimate_mcp_server.constants import Provider, TaskType
from ultimate_mcp_server.exceptions import ProviderError, ToolError, ToolInputError
from ultimate_mcp_server.tools.base import with_error_handling, with_tool_metrics
from ultimate_mcp_server.tools.completion import generate_completion
from ultimate_mcp_server.utils import get_logger

# Module-level logger shared by the tools in this module.
# NOTE(review): the channel name ends in "business_sentiment" while this file is
# sentiment_analysis.py — confirm the mismatch is intentional.
logger = get_logger("ultimate_mcp_server.tools.business_sentiment")

def _extract_json_payload(response_text: str) -> str:
    """Return the JSON portion of an LLM reply, unwrapping a Markdown code fence if present.

    A fence opened with ```json is preferred; otherwise the first bare ``` fence is used.
    When the opening fence has no matching closing fence (or the fenced span is empty),
    the text is returned unchanged so the caller's JSON parser reports the real problem.
    """
    for opening in ("```json", "```"):
        start = response_text.find(opening)
        if start == -1:
            continue
        start += len(opening)
        end = response_text.find("```", start)
        if end > start:
            return response_text[start:end].strip()
        # Fence found but not closed: do not fall through to the next fence style.
        return response_text
    return response_text


@with_tool_metrics
@with_error_handling
async def analyze_business_sentiment(
    text: str,
    industry: Optional[str] = None,
    analysis_mode: str = "standard",
    entity_extraction: bool = False,
    aspect_based: bool = False,
    competitive_analysis: bool = False,
    intent_detection: bool = False,
    risk_assessment: bool = False,
    language: str = "english",
    provider: str = Provider.OPENAI.value,
    model: Optional[str] = None,
    threshold_config: Optional[Dict[str, float]] = None
) -> Dict[str, Any]:
    """Performs comprehensive business-oriented sentiment analysis for commercial applications.

    This enterprise-grade tool analyzes customer feedback, reviews, support tickets, survey responses,
    social media mentions and other business text data to extract actionable insights. It provides
    customizable analysis for specific industries and use cases with options for deep-dive analysis.

    The system prompt and the user text are sent to the provider as a single combined prompt.
    The reply is parsed as JSON (an optional Markdown code fence is stripped first), a missing
    ``core_metrics.primary_sentiment`` is derived from ``sentiment_score`` (> 0.2 positive,
    < -0.2 negative, otherwise neutral), and a ``meta`` section plus ``success=True`` are added.

    Args:
        text: The business text to analyze (feedback, review, survey response, etc.).
        industry: Optional industry context to tailor analysis (e.g., "retail", "financial_services",
                 "healthcare", "hospitality", "technology", "telecommunications", "manufacturing").
                 Improves accuracy by applying industry-specific terminology and benchmarks.
        analysis_mode: Type of analysis to perform:
                     - "standard": Basic business sentiment with key indicators
                     - "comprehensive": Detailed analysis with all available metrics
                     - "customer_experience": Focus on satisfaction, loyalty, and effort
                     - "product_feedback": Focus on feature sentiment and product improvement
                     - "brand_perception": Focus on brand attributes and competitive positioning
                     - "support_ticket": Optimized for support ticket prioritization and resolution
                     - "sales_opportunity": Focus on purchase intent and sales readiness
        entity_extraction: Whether to identify and extract mentioned products, services, features,
                          and business entities. Useful for pinpointing what customers are discussing.
        aspect_based: Whether to break down sentiment by specific aspects/features mentioned.
                     Helps identify which specific product/service elements drive sentiment.
        competitive_analysis: Whether to identify competitor mentions and comparative sentiment.
                             Useful for competitive intelligence and benchmarking.
        intent_detection: Whether to detect customer intents (e.g., purchase interest, cancellation
                         risk, support request, recommendation intent, complaint, praise).
        risk_assessment: Whether to evaluate potential business risks (e.g., churn risk, PR risk,
                        legal/compliance issues, potential escalation) based on the text.
        language: Language of the input text (supports multiple languages for global businesses).
                 This value is also echoed back as ``meta.language_detected``; no detection is run.
        provider: The name of the LLM provider (e.g., "openai", "anthropic", "gemini").
        model: The specific model ID. If None, uses the provider's default model.
        threshold_config: Optional dictionary of threshold values for various metrics to customize
                         sensitivity levels (e.g., {"churn_risk": 0.7, "urgency": 0.8}).

    Returns:
        A dictionary containing comprehensive business sentiment analysis:
        {
            "core_metrics": {
                "primary_sentiment": "positive",  # Overall business sentiment
                "sentiment_score": 0.75,          # Normalized score (-1.0 to 1.0)
                "confidence": 0.92,               # Confidence in the assessment
                "satisfaction_score": 4.2,        # Estimated satisfaction (1-5 scale)
                "nps_category": "promoter",       # Predicted NPS category: detractor/passive/promoter
                "urgency": "low",                 # Action urgency assessment: low/medium/high/critical
                "actionability": 0.35            # How actionable the feedback is (0.0-1.0)
            },
            "business_dimensions": {              # Business-specific metrics
                "customer_satisfaction": 0.82,    # Satisfaction indicator (0.0-1.0)
                "product_sentiment": 0.75,        # Product sentiment (0.0-1.0)
                "value_perception": 0.68,         # Price-to-value perception (0.0-1.0)
                "ease_of_use": 0.90,              # Usability perception when relevant (0.0-1.0)
                "customer_effort_score": 2.1,     # Estimated CES (1-7 scale, lower is better)
                "loyalty_indicators": 0.85,       # Loyalty/retention indicators (0.0-1.0)
                "recommendation_likelihood": 0.87 # Likelihood to recommend (0.0-1.0)
            },
            "intent_analysis": {                  # Only if intent_detection=True
                "purchase_intent": 0.15,          # Purchase interest level (0.0-1.0)
                "churn_risk": 0.08,               # Risk of customer churn (0.0-1.0)
                "support_needed": 0.75,           # Likelihood customer needs support (0.0-1.0)
                "feedback_type": "suggestion",    # Type: complaint/praise/question/suggestion
                "information_request": false      # Whether customer is requesting information
            },
            "aspect_sentiment": {                 # Only if aspect_based=True
                "product_quality": 0.85,
                "customer_service": 0.92,
                "shipping_speed": 0.45,
                "return_process": 0.30,
                "website_usability": 0.78
            },
            "entity_extraction": {                # Only if entity_extraction=True
                "products": ["Product X Pro", "Legacy Model"],
                "features": ["battery life", "touchscreen responsiveness"],
                "services": ["customer support", "technical assistance"],
                "mentioned_departments": ["billing", "technical support"]
            },
            "competitive_insights": {             # Only if competitive_analysis=True
                "competitor_mentions": ["Competitor A", "Competitor B"],
                "comparative_sentiment": {
                    "Competitor A": -0.2,         # Negative comparison to competitor
                    "Competitor B": 0.3           # Positive comparison to competitor
                },
                "perceived_advantages": ["price", "features"],
                "perceived_disadvantages": ["support response time"]
            },
            "risk_assessment": {                  # Only if risk_assessment=True
                "churn_probability": 0.32,
                "response_urgency": "medium",
                "pr_risk": "low",
                "legal_compliance_flags": ["data privacy concern"],
                "escalation_probability": 0.15
            },
            "message_characteristics": {
                "key_topics": ["product quality", "customer service", "pricing"],
                "key_phrases": ["extremely satisfied", "quick resolution"],
                "tone_indicators": ["appreciative", "constructive"],
                "clarity": 0.9,                  # How clear/specific the feedback is (0.0-1.0)
                "subjectivity": 0.4,             # Subjective vs. objective content (0.0-1.0)
                "emotional_intensity": 0.65      # Intensity of emotion expressed (0.0-1.0)
            },
            "industry_specific_insights": {},    # Varies based on 'industry' parameter
            "recommended_actions": [             # Business action recommendations
                "Follow up regarding mentioned technical issue",
                "Highlight positive experience in success stories"
            ],
            "meta": {                           # Metadata about the analysis
                "provider": "anthropic",
                "model": "claude-3-5-sonnet-20241022",
                "analysis_mode": "comprehensive",
                "language_detected": "english",
                "tokens": { "input": 350, "output": 820, "total": 1170 },
                "cost": 0.000843,
                "processing_time": 1.25,
                "version": "2.4.0"
            },
            "success": true
        }

        Only "core_metrics", "meta" and "success" are guaranteed by this function; every other
        section is whatever the model returned.

    Raises:
        ToolInputError: If `text` is empty or not a string, or `analysis_mode` is not recognised.
        ToolError: If the model reply is not valid JSON or is not a JSON object
                   (error_code "invalid_response_format").
        ProviderError: If the completion request or any other processing step fails.
    """
    start_time = time.time()
    
    # Parameter validation
    if not text or not isinstance(text, str):
        raise ToolInputError(
            "Input text must be a non-empty string.",
            param_name="text",
            provided_value=text
        )
    
    valid_analysis_modes = [
        "standard", "comprehensive", "customer_experience", "product_feedback", 
        "brand_perception", "support_ticket", "sales_opportunity"
    ]
    if analysis_mode not in valid_analysis_modes:
        raise ToolInputError(
            f"Invalid analysis_mode. Must be one of: {', '.join(valid_analysis_modes)}",
            param_name="analysis_mode",
            provided_value=analysis_mode
        )
    
    # Construct the analysis prompt based on parameters
    system_prompt = _build_sentiment_system_prompt(
        industry=industry,
        analysis_mode=analysis_mode,
        entity_extraction=entity_extraction,
        aspect_based=aspect_based,
        competitive_analysis=competitive_analysis,
        intent_detection=intent_detection,
        risk_assessment=risk_assessment,
        language=language,
        threshold_config=threshold_config
    )
    
    user_prompt = f"""
    Analyze the following business text according to the specified parameters:
    
    Text to analyze:
    ```
    {text}
    ```
    
    Provide a detailed JSON response according to the format specified in the system instructions.
    """
    
    # Combined prompt for all providers
    combined_prompt = f"{system_prompt}\n\n{user_prompt}"
    
    try:
        # Consistently use generate_completion for all providers.
        # The JSON response_format hint is only passed along for OpenAI.
        completion_result = await generate_completion(
            prompt=combined_prompt,
            provider=provider,
            model=model,
            temperature=0.2,
            max_tokens=2000,
            additional_params={"response_format": {"type": "json_object"}} if provider.lower() == "openai" else None
        )
        
        # Extract response text and unwrap an optional Markdown code fence
        response_text = _extract_json_payload(completion_result["text"].strip())
        
        try:
            analysis_data = json.loads(response_text)
        except json.JSONDecodeError as e:
            logger.error(
                f"Failed to parse JSON response: {e}",
                emoji_key="error",
                raw_response=response_text[:500]  # Log partial response for debugging
            )
            raise ToolError(
                f"Failed to parse business sentiment analysis response: {e}",
                error_code="invalid_response_format",
                details={"raw_response": response_text[:500]}
            ) from e
        
        # Valid JSON that is not an object (e.g. a list or a bare string) cannot carry the
        # expected sections; report it as a format error instead of failing on item assignment.
        if not isinstance(analysis_data, dict):
            raise ToolError(
                "Business sentiment analysis response must be a JSON object, "
                f"got {type(analysis_data).__name__}.",
                error_code="invalid_response_format",
                details={"raw_response": response_text[:500]}
            )
        
        # Validate minimum required fields
        core_metrics = analysis_data.get("core_metrics")
        if not isinstance(core_metrics, dict):
            logger.warning("Missing or malformed 'core_metrics' in response, adding empty object")
            core_metrics = {}
            analysis_data["core_metrics"] = core_metrics
        
        # Ensure core metrics contains primary sentiment
        if "primary_sentiment" not in core_metrics:
            sentiment_score = core_metrics.get("sentiment_score", 0.0)
            if not isinstance(sentiment_score, (int, float)):
                # A null or textual score cannot be compared; treat it as neutral.
                sentiment_score = 0.0
            if sentiment_score > 0.2:
                primary_sentiment = "positive"
            elif sentiment_score < -0.2:
                primary_sentiment = "negative"
            else:
                primary_sentiment = "neutral"
            core_metrics["primary_sentiment"] = primary_sentiment
            logger.debug(f"Added missing primary_sentiment: {primary_sentiment}")
        
        # Populate metadata
        processing_time = time.time() - start_time
        
        # Extract provider and model info from completion result
        result_provider = completion_result.get("provider", provider)
        result_model = completion_result.get("model", model)
        token_usage = completion_result.get("tokens") or {}
        input_tokens = token_usage.get("input", 0)
        output_tokens = token_usage.get("output", 0)
        total_tokens = token_usage.get("total", 0)
        cost = completion_result.get("cost", 0.0)
        
        # Include metadata in the final response (overwrites any 'meta' the model produced)
        analysis_data["meta"] = {
            "provider": result_provider,
            "model": result_model,
            "analysis_mode": analysis_mode,
            "language_detected": language,  # Actual detection would need more logic
            "tokens": {
                "input": input_tokens,
                "output": output_tokens,
                "total": total_tokens,
            },
            "cost": cost,
            "processing_time": processing_time,
            "version": "2.4.0"  # Tool version
        }
        analysis_data["success"] = True
        
        # Log successful completion
        logger.success(
            f"Business sentiment analysis completed successfully with {result_provider}/{result_model}",
            emoji_key=TaskType.CLASSIFICATION.value,
            analysis_mode=analysis_mode,
            sentiment=core_metrics.get("primary_sentiment", "unknown"),
            tokens={
                "input": input_tokens,
                "output": output_tokens
            },
            cost=cost,
            time=processing_time
        )
        
        return analysis_data
        
    except ToolError:
        # Already a structured tool error (including the invalid_response_format errors raised
        # above); let it propagate unchanged rather than re-wrapping it as a ProviderError.
        raise
    except Exception as e:
        raise ProviderError(
            f"Business sentiment analysis failed: {str(e)}",
            provider=provider,
            model=model,
            cause=e
        ) from e


def _build_sentiment_system_prompt(
    industry: Optional[str],
    analysis_mode: str,
    entity_extraction: bool,
    aspect_based: bool,
    competitive_analysis: bool,
    intent_detection: bool, 
    risk_assessment: bool,
    language: str,
    threshold_config: Optional[Dict[str, float]]
) -> str:
    """Builds a comprehensive system prompt for business sentiment analysis based on parameters."""
    
    # Base prompt with core instructions
    base_prompt = """
    You are an enterprise-grade business sentiment analysis system designed to extract actionable insights from customer and stakeholder feedback. Your analysis should be precise, nuanced, and tailored to business decision-making.
    
    Provide analysis in a structured JSON format with the following core sections:
    1. core_metrics: Essential sentiment indicators
    2. business_dimensions: Business-specific metrics like satisfaction and loyalty
    3. message_characteristics: Content properties like topics and tone
    
    All numerical scores should be consistent (higher is better unless otherwise specified) and normalized within their specified ranges.
    """
    
    # Industry-specific tailoring
    industry_prompt = ""
    if industry:
        industry_mappings = {
            "retail": "Retail and e-commerce context: Focus on product quality, shopping experience, delivery, returns, and customer service. Include retail-specific metrics like purchase satisfaction and repeat purchase intent.",
            
            "financial_services": "Financial services context: Focus on trust, security, transparency, and service quality. Include financial-specific metrics like perceived financial benefit, trust indicator, and financial confidence impact.",
            
            "healthcare": "Healthcare context: Focus on care quality, staff interactions, facility experience, and outcomes. Include healthcare-specific metrics like perceived care quality, staff empathy, and outcome satisfaction.",
            
            "hospitality": "Hospitality context: Focus on accommodations, amenities, staff service, and overall experience. Include hospitality-specific metrics like comfort rating, staff attentiveness, and value perception.",
            
            "technology": "Technology/SaaS context: Focus on software/product functionality, reliability, ease of use, and technical support. Include tech-specific metrics like feature satisfaction, reliability perception, and technical resolution satisfaction.",
            
            "telecommunications": "Telecommunications context: Focus on service reliability, coverage, customer support, and value. Include telecom-specific metrics like service reliability rating, coverage satisfaction, and value perception.",
            
            "manufacturing": "Manufacturing context: Focus on product quality, durability, specifications adherence, and support. Include manufacturing-specific metrics like quality rating, durability perception, and technical specification satisfaction."
        }
        
        if industry.lower() in industry_mappings:
            industry_prompt = f"\nINDUSTRY CONTEXT: {industry_mappings[industry.lower()]}\n"
            industry_prompt += "\nInclude an 'industry_specific_insights' section with metrics and insights specific to this industry."
        else:
            industry_prompt = f"\nINDUSTRY CONTEXT: {industry} - Apply industry-specific terminology and standards.\n"
    
    # Analysis mode specification
    mode_prompt = "\nANALYSIS MODE: "
    if analysis_mode == "standard":
        mode_prompt += "Standard business sentiment with core metrics and key indicators."
    elif analysis_mode == "comprehensive":
        mode_prompt += "Comprehensive analysis with all available metrics and maximum detail."
    elif analysis_mode == "customer_experience":
        mode_prompt += "Customer experience focus: Emphasize satisfaction, loyalty, and effort metrics. Pay special attention to service interactions, pain points, and moments of delight."
    elif analysis_mode == "product_feedback":
        mode_prompt += "Product feedback focus: Emphasize feature sentiment, product quality, and improvement suggestions. Identify specific product components mentioned and their sentiment."
    elif analysis_mode == "brand_perception":
        mode_prompt += "Brand perception focus: Emphasize brand attributes, positioning, and emotional connections. Analyze brand promise fulfillment and competitive positioning."
    elif analysis_mode == "support_ticket":
        mode_prompt += "Support ticket focus: Emphasize issue categorization, severity, urgency, and resolution path. Detect technical terms and problem indicators."
    elif analysis_mode == "sales_opportunity":
        mode_prompt += "Sales opportunity focus: Emphasize purchase intent, objections, and decision factors. Analyze buying signals and sales readiness indicators."
    
    # Optional analysis components
    optional_components = []
    
    if entity_extraction:
        optional_components.append("""
        ENTITY EXTRACTION: Extract and categorize business entities mentioned in the text.
        - Include an 'entity_extraction' section with arrays of identified products, features, services, departments, locations, etc.
        - Normalize entity names when variations of the same entity are mentioned.
        - Exclude generic mentions and focus on specific named entities.
        """)
    
    if aspect_based:
        optional_components.append("""
        ASPECT-BASED SENTIMENT: Break down sentiment by specific aspects or features mentioned.
        - Include an 'aspect_sentiment' section with sentiment scores for each identified aspect.
        - Aspects should be specific (e.g., 'website_usability', 'checkout_process', 'product_quality').
        - Only include aspects explicitly mentioned or strongly implied in the text.
        - Score each aspect from -1.0 (extremely negative) to 1.0 (extremely positive).
        """)
    
    if competitive_analysis:
        optional_components.append("""
        COMPETITIVE ANALYSIS: Identify and analyze competitor mentions and comparisons.
        - Include a 'competitive_insights' section with competitor names and comparative sentiment.
        - Capture explicit and implicit comparisons to competitors.
        - Identify perceived advantages and disadvantages relative to competitors.
        - Score comparative sentiment from -1.0 (negative comparison) to 1.0 (positive comparison).
        """)
    
    if intent_detection:
        optional_components.append("""
        INTENT DETECTION: Identify customer intentions and likely next actions.
        - Include an 'intent_analysis' section with probabilities for purchase intent, churn risk, etc.
        - Classify the feedback type (complaint, praise, question, suggestion).
        - Detect specific intents like information requests, cancellation warnings, escalation threats.
        - Score intent probabilities from 0.0 (no indication) to 1.0 (strong indication).
        """)
    
    if risk_assessment:
        optional_components.append("""
        RISK ASSESSMENT: Evaluate potential business risks in the feedback.
        - Include a 'risk_assessment' section with probabilities and categories of identified risks.
        - Assess churn probability, PR/reputation risk, legal/compliance concerns, etc.
        - Provide an escalation probability and urgency level.
        - Flag sensitive content that may require special attention.
        """)
    
    # Language specification
    language_prompt = f"\nLANGUAGE: Analyze text in {language}. Ensure all scores and categorizations are correctly interpreted within cultural and linguistic context."
    
    # Threshold configurations if provided
    threshold_prompt = ""
    if threshold_config and isinstance(threshold_config, dict):
        threshold_prompt = "\nTHRESHOLD CONFIGURATION:"
        for metric, value in threshold_config.items():
            threshold_prompt += f"\n- {metric}: {value}"
    
    # Combine all prompt components
    full_prompt = base_prompt + industry_prompt + mode_prompt + language_prompt + threshold_prompt
    
    if optional_components:
        full_prompt += "\n\nADDITIONAL ANALYSIS COMPONENTS:"
        full_prompt += "\n".join(optional_components)
    
    # Output format specification
    output_format = """
    RESPONSE FORMAT: Respond only with a valid JSON object containing all applicable sections based on the analysis parameters.
    
    Always include these core sections:
    - core_metrics: Overall sentiment, scores, and primary indicators
    - business_dimensions: Business-specific satisfaction and perception metrics
    - message_characteristics: Content properties, topics, and expression styles
    - recommended_actions: 1-3 specific business actions based on the analysis
    - meta: Will be populated with metadata about the analysis
    
    Add optional sections as specified by the analysis parameters.
    
    Ensure all numerical values are normalized to their specified ranges and all categorical values use consistent terminology.
    """
    
    full_prompt += output_format
    
    return full_prompt


@with_tool_metrics
@with_error_handling
async def analyze_business_text_batch(
    texts: List[str],
    analysis_config: Dict[str, Any],
    aggregate_results: bool = True,
    max_concurrency: int = 3,
    provider: str = Provider.OPENAI.value,
    model: Optional[str] = None
) -> Dict[str, Any]:
    """Processes a batch of business texts for sentiment analysis with aggregated insights.

    Designed for analyzing large volumes of business feedback (reviews, surveys, tickets)
    efficiently with detailed individual analyses and optional aggregated metrics. Ideal for
    business intelligence, customer experience programs, and trend identification.

    Each text is analyzed with analyze_business_sentiment, at most `max_concurrency` at a time.
    A failure on one text is recorded on that item and does not abort the rest of the batch.

    Args:
        texts: List of text items to analyze (reviews, feedback, etc.).
        analysis_config: Configuration dictionary for analyze_business_sentiment.
                        Example: {"analysis_mode": "standard", "entity_extraction": True}
                        All parameters from analyze_business_sentiment except text, provider, model.
                        Any "provider" or "model" key here is overridden by the arguments below.
        aggregate_results: Whether to generate aggregated insights across all analyzed texts.
                         Includes trend detection, sentiment distribution, and pattern identification.
        max_concurrency: Maximum number of parallel analyses to run. Must be an integer >= 1.
        provider: The name of the LLM provider to use.
        model: The specific model ID. If None, uses the provider's default.

    Returns:
        A dictionary containing individual and aggregated results:
        {
            "individual_results": [
                {
                    "text_id": 0,
                    "text_preview": "First 50 characters of text...",
                    "analysis": { /* Complete analysis result for this text */ }
                },
                // Additional individual results...
            ],
            "aggregate_insights": {  // Only if aggregate_results=True
                "sentiment_distribution": {
                    "positive": 0.65,  // 65% positive
                    "neutral": 0.20,   // 20% neutral
                    "negative": 0.15   // 15% negative
                },
                "average_metrics": {
                    "sentiment_score": 0.42,
                    "satisfaction_score": 3.8,
                    // Other averaged metrics...
                },
                "top_aspects": [
                    {"name": "customer_service", "avg_sentiment": 0.75, "mention_count": 42},
                    {"name": "product_quality", "avg_sentiment": 0.62, "mention_count": 38},
                    // Additional aspects...
                ],
                "key_topics": [
                    {"topic": "shipping delays", "mention_count": 35, "avg_sentiment": -0.3},
                    {"topic": "easy checkout", "mention_count": 28, "avg_sentiment": 0.8},
                    // Additional topics...
                ],
                "entity_mention_frequencies": {
                    "products": {"Product X": 45, "Product Y": 23},
                    "features": {"user interface": 38, "reliability": 27}
                },
                "emerging_patterns": [
                    "Increasing mentions of mobile app usability",
                    "Growing negative sentiment about recent policy change"
                ],
                "risk_indicators": [
                    {"issue": "shipping delays", "severity": "medium", "trend": "increasing"},
                    {"issue": "billing confusion", "severity": "low", "trend": "stable"}
                ]
            },
            "meta": {
                "batch_size": 250,
                "success_count": 248,
                "error_count": 2,
                "processing_time": 128.5,
                "total_cost": 4.87,
                "timestamp": "2025-04-21T14:30:00Z"
            },
            "success": true
        }

        Items that failed carry an "error" string instead of "analysis". If aggregation itself
        fails, "aggregate_insights" is omitted and "aggregate_insights_error" holds the message.
        "individual_results" is in the same order as `texts`.

    Raises:
        ToolInputError: If `texts` is not a non-empty list, `analysis_config` is not a non-empty
                        dict, or `max_concurrency` is not a positive integer.
        ProviderError: If the provider service fails.
        ToolError: For other processing errors.
    """
    start_time = time.time()
    
    # Validate inputs
    if not texts or not isinstance(texts, list):
        raise ToolInputError(
            "The 'texts' parameter must be a non-empty list of strings.",
            param_name="texts",
            provided_value=texts
        )
    
    if not analysis_config or not isinstance(analysis_config, dict):
        raise ToolInputError(
            "The 'analysis_config' parameter must be a dictionary of configuration options.",
            param_name="analysis_config",
            provided_value=analysis_config
        )
    
    # Semaphore(0) would block every task forever and a negative value raises a bare
    # ValueError, so reject both up front with a proper input error.
    if isinstance(max_concurrency, bool) or not isinstance(max_concurrency, int) or max_concurrency < 1:
        raise ToolInputError(
            "The 'max_concurrency' parameter must be a positive integer.",
            param_name="max_concurrency",
            provided_value=max_concurrency
        )
    
    # Process texts with concurrency control
    import asyncio
    semaphore = asyncio.Semaphore(max_concurrency)
    batch_size = len(texts)
    
    async def process_text(idx: int, text: Any) -> Dict[str, Any]:
        """Analyze one text and return its result record (with 'analysis' or 'error')."""
        # Build the preview defensively: a non-string item must fail on its own record,
        # not raise out of gather() and abort the whole batch.
        if isinstance(text, str):
            text_preview = text[:50] + ("..." if len(text) > 50 else "")
        else:
            text_preview = repr(text)[:50]
        record: Dict[str, Any] = {"text_id": idx, "text_preview": text_preview}
        
        async with semaphore:
            logger.debug(f"Processing text {idx+1}/{batch_size}: {text_preview}")
            
            try:
                # Create a copy of analysis_config to avoid modifying the original
                config = analysis_config.copy()
                
                # Add provider and model to config
                config["provider"] = provider
                config["model"] = model
                
                analysis = await analyze_business_sentiment(
                    text=text,
                    **config
                )
            except Exception as e:
                logger.error(f"Error analyzing text {idx}: {str(e)}", exc_info=True)
                record["error"] = str(e)
                return record
        
        if not isinstance(analysis, dict):
            record["error"] = f"Unexpected analysis result type: {type(analysis).__name__}"
        elif analysis.get("success") is False:
            # NOTE(review): a successful analysis always sets success=True, so an explicit
            # False can only be an error payload — presumably produced by the error-handling
            # decorator on analyze_business_sentiment; confirm its exact shape.
            record["error"] = str(analysis.get("error", "Analysis reported failure"))
        else:
            record["analysis"] = analysis
        return record
    
    # gather() returns results in submission order, so no re-sorting is needed
    individual_results = list(
        await asyncio.gather(*(process_text(i, text) for i, text in enumerate(texts)))
    )
    
    all_analyses = [item["analysis"] for item in individual_results if "analysis" in item]
    success_count = len(all_analyses)
    error_count = batch_size - success_count
    # `or` guards against a missing/None meta or cost so one odd item cannot break the total
    total_cost = sum(
        (analysis.get("meta") or {}).get("cost") or 0.0
        for analysis in all_analyses
    )
    
    # Build response
    result = {
        "individual_results": individual_results,
        "meta": {
            "batch_size": batch_size,
            "success_count": success_count,
            "error_count": error_count,
            "processing_time": time.time() - start_time,
            "total_cost": total_cost,
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
        },
        "success": True
    }
    
    # Calculate aggregate insights if requested and we have successful analyses
    if aggregate_results and all_analyses:
        try:
            result["aggregate_insights"] = _calculate_aggregate_insights(all_analyses)
        except Exception as e:
            # Aggregation is best-effort: keep the individual results and report the failure
            logger.error(f"Error calculating aggregate insights: {str(e)}", exc_info=True)
            result["aggregate_insights_error"] = str(e)
    
    return result


def _calculate_aggregate_insights(analyses: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Calculates aggregate insights across multiple business sentiment analyses."""
    
    # Initialize aggregation containers
    sentiment_counts = {"positive": 0, "neutral": 0, "negative": 0}
    sentiment_scores = []
    satisfaction_scores = []
    loyalty_indicators = []
    aspect_sentiments = {}
    topics = {}
    mentioned_entities = {
        "products": {},
        "features": {},
        "services": {}
    }
    
    # Process each analysis
    for analysis in analyses:
        # Skip any analyses without core_metrics
        if "core_metrics" not in analysis:
            continue
        
        core = analysis.get("core_metrics", {})
        business = analysis.get("business_dimensions", {})
        
        # Sentiment distribution
        sentiment = core.get("primary_sentiment", "neutral").lower()
        if sentiment in sentiment_counts:
            sentiment_counts[sentiment] += 1
        
        # Collect numerical metrics
        if "sentiment_score" in core:
            sentiment_scores.append(core["sentiment_score"])
        
        if "satisfaction_score" in business:
            satisfaction_scores.append(business["satisfaction_score"])
            
        if "loyalty_indicators" in business:
            loyalty_indicators.append(business["loyalty_indicators"])
        
        # Aspect sentiments
        for aspect, score in analysis.get("aspect_sentiment", {}).items():
            if aspect not in aspect_sentiments:
                aspect_sentiments[aspect] = {"scores": [], "count": 0}
            
            aspect_sentiments[aspect]["scores"].append(score)
            aspect_sentiments[aspect]["count"] += 1
        
        # Topics
        for topic in analysis.get("message_characteristics", {}).get("key_topics", []):
            if topic not in topics:
                topics[topic] = 0
            topics[topic] += 1
        
        # Entity mentions
        for entity_type, entities in analysis.get("entity_extraction", {}).items():
            if entity_type in mentioned_entities and isinstance(entities, list):
                for entity in entities:
                    if entity not in mentioned_entities[entity_type]:
                        mentioned_entities[entity_type][entity] = 0
                    mentioned_entities[entity_type][entity] += 1
    
    # Calculate distributions as percentages
    total_sentiments = sum(sentiment_counts.values())
    sentiment_distribution = {
        k: round(v / total_sentiments, 2) if total_sentiments else 0 
        for k, v in sentiment_counts.items()
    }
    
    # Calculate average metrics
    average_metrics = {}
    if sentiment_scores:
        average_metrics["sentiment_score"] = sum(sentiment_scores) / len(sentiment_scores)
    
    if satisfaction_scores:
        average_metrics["satisfaction_score"] = sum(satisfaction_scores) / len(satisfaction_scores)
    
    if loyalty_indicators:
        average_metrics["loyalty_indicators"] = sum(loyalty_indicators) / len(loyalty_indicators)
    
    # Process aspect sentiments
    top_aspects = []
    for aspect, data in aspect_sentiments.items():
        avg_sentiment = sum(data["scores"]) / len(data["scores"]) if data["scores"] else 0
        top_aspects.append({
            "name": aspect,
            "avg_sentiment": round(avg_sentiment, 2),
            "mention_count": data["count"]
        })
    
    # Sort aspects by mention count
    top_aspects.sort(key=lambda x: x["mention_count"], reverse=True)
    
    # Process topics
    key_topics = [{"topic": k, "mention_count": v} for k, v in topics.items()]
    key_topics.sort(key=lambda x: x["mention_count"], reverse=True)
    
    # Build aggregated insights
    aggregate_insights = {
        "sentiment_distribution": sentiment_distribution,
        "average_metrics": average_metrics,
        "top_aspects": top_aspects[:10],  # Limit to top 10
        "key_topics": key_topics[:10],    # Limit to top 10
        "entity_mention_frequencies": mentioned_entities
    }
    
    return aggregate_insights
```
Page 10/35 · First · Prev · Next · Last