#
tokens: 44675/50000 6/207 files (page 9/35)
lines: off (toggle) GitHub
raw markdown copy
This is page 9 of 35. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?page={x} to view the full context.

# Directory Structure

```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│   ├── __init__.py
│   ├── advanced_agent_flows_using_unified_memory_system_demo.py
│   ├── advanced_extraction_demo.py
│   ├── advanced_unified_memory_system_demo.py
│   ├── advanced_vector_search_demo.py
│   ├── analytics_reporting_demo.py
│   ├── audio_transcription_demo.py
│   ├── basic_completion_demo.py
│   ├── cache_demo.py
│   ├── claude_integration_demo.py
│   ├── compare_synthesize_demo.py
│   ├── cost_optimization.py
│   ├── data
│   │   ├── sample_event.txt
│   │   ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│   │   └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│   ├── docstring_refiner_demo.py
│   ├── document_conversion_and_processing_demo.py
│   ├── entity_relation_graph_demo.py
│   ├── filesystem_operations_demo.py
│   ├── grok_integration_demo.py
│   ├── local_text_tools_demo.py
│   ├── marqo_fused_search_demo.py
│   ├── measure_model_speeds.py
│   ├── meta_api_demo.py
│   ├── multi_provider_demo.py
│   ├── ollama_integration_demo.py
│   ├── prompt_templates_demo.py
│   ├── python_sandbox_demo.py
│   ├── rag_example.py
│   ├── research_workflow_demo.py
│   ├── sample
│   │   ├── article.txt
│   │   ├── backprop_paper.pdf
│   │   ├── buffett.pdf
│   │   ├── contract_link.txt
│   │   ├── legal_contract.txt
│   │   ├── medical_case.txt
│   │   ├── northwind.db
│   │   ├── research_paper.txt
│   │   ├── sample_data.json
│   │   └── text_classification_samples
│   │       ├── email_classification.txt
│   │       ├── news_samples.txt
│   │       ├── product_reviews.txt
│   │       └── support_tickets.txt
│   ├── sample_docs
│   │   └── downloaded
│   │       └── attention_is_all_you_need.pdf
│   ├── sentiment_analysis_demo.py
│   ├── simple_completion_demo.py
│   ├── single_shot_synthesis_demo.py
│   ├── smart_browser_demo.py
│   ├── sql_database_demo.py
│   ├── sse_client_demo.py
│   ├── test_code_extraction.py
│   ├── test_content_detection.py
│   ├── test_ollama.py
│   ├── text_classification_demo.py
│   ├── text_redline_demo.py
│   ├── tool_composition_examples.py
│   ├── tournament_code_demo.py
│   ├── tournament_text_demo.py
│   ├── unified_memory_system_demo.py
│   ├── vector_search_demo.py
│   ├── web_automation_instruction_packs.py
│   └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│   └── smart_browser_internal
│       ├── locator_cache.db
│       ├── readability.js
│       └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── integration
│   │   ├── __init__.py
│   │   └── test_server.py
│   ├── manual
│   │   ├── test_extraction_advanced.py
│   │   └── test_extraction.py
│   └── unit
│       ├── __init__.py
│       ├── test_cache.py
│       ├── test_providers.py
│       └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│   ├── __init__.py
│   ├── __main__.py
│   ├── cli
│   │   ├── __init__.py
│   │   ├── __main__.py
│   │   ├── commands.py
│   │   ├── helpers.py
│   │   └── typer_cli.py
│   ├── clients
│   │   ├── __init__.py
│   │   ├── completion_client.py
│   │   └── rag_client.py
│   ├── config
│   │   └── examples
│   │       └── filesystem_config.yaml
│   ├── config.py
│   ├── constants.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── evaluation
│   │   │   ├── base.py
│   │   │   └── evaluators.py
│   │   ├── providers
│   │   │   ├── __init__.py
│   │   │   ├── anthropic.py
│   │   │   ├── base.py
│   │   │   ├── deepseek.py
│   │   │   ├── gemini.py
│   │   │   ├── grok.py
│   │   │   ├── ollama.py
│   │   │   ├── openai.py
│   │   │   └── openrouter.py
│   │   ├── server.py
│   │   ├── state_store.py
│   │   ├── tournaments
│   │   │   ├── manager.py
│   │   │   ├── tasks.py
│   │   │   └── utils.py
│   │   └── ums_api
│   │       ├── __init__.py
│   │       ├── ums_database.py
│   │       ├── ums_endpoints.py
│   │       ├── ums_models.py
│   │       └── ums_services.py
│   ├── exceptions.py
│   ├── graceful_shutdown.py
│   ├── services
│   │   ├── __init__.py
│   │   ├── analytics
│   │   │   ├── __init__.py
│   │   │   ├── metrics.py
│   │   │   └── reporting.py
│   │   ├── cache
│   │   │   ├── __init__.py
│   │   │   ├── cache_service.py
│   │   │   ├── persistence.py
│   │   │   ├── strategies.py
│   │   │   └── utils.py
│   │   ├── cache.py
│   │   ├── document.py
│   │   ├── knowledge_base
│   │   │   ├── __init__.py
│   │   │   ├── feedback.py
│   │   │   ├── manager.py
│   │   │   ├── rag_engine.py
│   │   │   ├── retriever.py
│   │   │   └── utils.py
│   │   ├── prompts
│   │   │   ├── __init__.py
│   │   │   ├── repository.py
│   │   │   └── templates.py
│   │   ├── prompts.py
│   │   └── vector
│   │       ├── __init__.py
│   │       ├── embeddings.py
│   │       └── vector_service.py
│   ├── tool_token_counter.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── audio_transcription.py
│   │   ├── base.py
│   │   ├── completion.py
│   │   ├── docstring_refiner.py
│   │   ├── document_conversion_and_processing.py
│   │   ├── enhanced-ums-lookbook.html
│   │   ├── entity_relation_graph.py
│   │   ├── excel_spreadsheet_automation.py
│   │   ├── extraction.py
│   │   ├── filesystem.py
│   │   ├── html_to_markdown.py
│   │   ├── local_text_tools.py
│   │   ├── marqo_fused_search.py
│   │   ├── meta_api_tool.py
│   │   ├── ocr_tools.py
│   │   ├── optimization.py
│   │   ├── provider.py
│   │   ├── pyodide_boot_template.html
│   │   ├── python_sandbox.py
│   │   ├── rag.py
│   │   ├── redline-compiled.css
│   │   ├── sentiment_analysis.py
│   │   ├── single_shot_synthesis.py
│   │   ├── smart_browser.py
│   │   ├── sql_databases.py
│   │   ├── text_classification.py
│   │   ├── text_redline_tools.py
│   │   ├── tournament.py
│   │   ├── ums_explorer.html
│   │   └── unified_memory_system.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── async_utils.py
│   │   ├── display.py
│   │   ├── logging
│   │   │   ├── __init__.py
│   │   │   ├── console.py
│   │   │   ├── emojis.py
│   │   │   ├── formatter.py
│   │   │   ├── logger.py
│   │   │   ├── panels.py
│   │   │   ├── progress.py
│   │   │   └── themes.py
│   │   ├── parse_yaml.py
│   │   ├── parsing.py
│   │   ├── security.py
│   │   └── text.py
│   └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/ultimate_mcp_server/cli/typer_cli.py:
--------------------------------------------------------------------------------

```python
"""Typer CLI implementation for the Ultimate MCP Server."""
import asyncio
import os
import sys
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional

import typer
from rich.console import Console
from rich.panel import Panel
from rich.prompt import Confirm
from rich.table import Table

# Get version hardcoded to avoid import errors
__version__ = "0.1.0"  # Hardcode since there are import issues

from ultimate_mcp_server.cli.commands import (
    benchmark_providers,
    check_cache,
    generate_completion,
    list_providers,
    run_server,
    test_provider,
)
from ultimate_mcp_server.constants import BASE_TOOLSET_CATEGORIES
from ultimate_mcp_server.utils import get_logger

# Use consistent namespace and get console for Rich output
logger = get_logger("ultimate_mcp_server.cli")
console = Console(file=sys.stderr)  # Use stderr to avoid interfering with MCP protocol

# Create typer app
app = typer.Typer(
    name="umcp",
    help= (
        "[bold green]Ultimate MCP Server[/bold green]: Multi-provider LLM management server\n"
        "[italic]Unified CLI to run your server, manage providers, and more.[/italic]"
    ),
    rich_markup_mode="rich",
    no_args_is_help=True,
    add_completion=True,
)


def version_callback(value: bool):
    """Show the version information and exit.
    
    This callback is triggered by the --version/-v flag and displays
    the current version of Ultimate MCP Server before exiting.
    """
    if value:
        console.print(f"Ultimate MCP Server version: [bold]{__version__}[/bold]")
        raise typer.Exit()


class TransportMode(str, Enum):
    """Transport mode for the server."""

    SSE = "sse"
    STDIO = "stdio"
    STREAMABLE_HTTP = "streamable-http"
    SHTTP = "shttp"  # Short alias for streamable-http


# Define tool-to-example mapping
TOOL_TO_EXAMPLE_MAP: Dict[str, str] = {
    # Completion tools
    "generate_completion": "simple_completion_demo.py",
    "stream_completion": "simple_completion_demo.py",
    "chat_completion": "claude_integration_demo.py",
    "multi_completion": "multi_provider_demo.py",
    
    # Provider tools
    "get_provider_status": "multi_provider_demo.py",
    "list_models": "multi_provider_demo.py",
    
    # Document tools
    "summarize_document": "document_conversion_and_processing_demo.py",
    "extract_entities": "document_conversion_and_processing_demo.py",
    "chunk_document": "document_conversion_and_processing_demo.py",
    "process_document_batch": "document_conversion_and_processing_demo.py",
    "extract_text_from_pdf": "document_conversion_and_processing_demo.py",
    "process_image_ocr": "document_conversion_and_processing_demo.py",
    
    # Extraction tools
    "extract_json": "advanced_extraction_demo.py",
    "extract_table": "advanced_extraction_demo.py",
    "extract_key_value_pairs": "advanced_extraction_demo.py",
    "extract_semantic_schema": "advanced_extraction_demo.py",
    
    # Entity graph tools
    "extract_entity_graph": "entity_relation_graph_demo.py",
    
    # RAG tools
    "create_knowledge_base": "rag_example.py",
    "add_documents": "rag_example.py",
    "retrieve_context": "rag_example.py",
    "generate_with_rag": "rag_example.py",
    
    # Classification tools
    "text_classification": "text_classification_demo.py",
    
    # Tournament tools
    "create_tournament": "tournament_text_demo.py",
    "list_tournaments": "tournament_text_demo.py",
    "get_tournament_results": "tournament_text_demo.py",
    
    # Optimization tools
    "estimate_cost": "cost_optimization.py",
    "compare_models": "cost_optimization.py",
    "recommend_model": "cost_optimization.py",
    
    # Filesystem tools
    "read_file": "filesystem_operations_demo.py",
    "write_file": "filesystem_operations_demo.py",
    "list_directory": "filesystem_operations_demo.py",
    "search_files": "filesystem_operations_demo.py",
    

    # HTML tools
    "clean_and_format_text_as_markdown": "html_to_markdown_demo.py",
    
    # Text comparison tools
    "compare_documents_redline": "text_redline_demo.py",
    
    # Marqo search tools
    "marqo_fused_search": "marqo_fused_search_demo.py",
    
    # SQL tools
    "connect_to_database": "sql_database_interactions_demo.py",
    "execute_query": "sql_database_interactions_demo.py",
    
    # Audio tools
    "transcribe_audio": "audio_transcription_demo.py",
    
    # Browser automation tools
    "browser_init": "browser_automation_demo.py",
    "execute_web_workflow": "browser_automation_demo.py",
}

# Group examples by category
EXAMPLE_CATEGORIES: Dict[str, List[str]] = {
    "text-generation": [
        "simple_completion_demo.py",
        "claude_integration_demo.py",
        "multi_provider_demo.py",
        "grok_integration_demo.py",
    ],
    "document-conversion-and-processing": [
        "document_conversion_and_processing_demo.py",
        "advanced_extraction_demo.py",
    ],
    "search-and-retrieval": [
        "rag_example.py",
        "vector_search_demo.py",
        "advanced_vector_search_demo.py",
        "marqo_fused_search_demo.py",
    ],
    "browser-automation": [
        "browser_automation_demo.py",
        "sse_client_demo.py",
    ],
    "data-analysis": [
        "sql_database_interactions_demo.py",
        "analytics_reporting_demo.py",
    ],
    "specialized-tools": [
        "audio_transcription_demo.py",
        "text_redline_demo.py",
        "html_to_markdown_demo.py",
        "entity_relation_graph_demo.py",
    ],
    "workflows": [
        "workflow_delegation_demo.py",
        "tool_composition_examples.py",
        "research_workflow_demo.py",
    ],
}


# Define option constants to avoid function calls in default arguments
HOST_OPTION = typer.Option(
    None,
    "-h",
    "--host",
    help="[cyan]Host[/cyan] or [cyan]IP address[/cyan] to bind the server to (-h shortcut). Defaults from config.",
    rich_help_panel="Server Options",
)
PORT_OPTION = typer.Option(
    None,
    "-p",
    "--port",
    help="[cyan]Port[/cyan] to listen on (-p shortcut). Defaults from config.",
    rich_help_panel="Server Options",
)
WORKERS_OPTION = typer.Option(
    None,
    "-w",
    "--workers",
    help="[cyan]Number of worker[/cyan] processes to spawn (-w shortcut). Defaults from config.",
    rich_help_panel="Server Options",
)
TRANSPORT_MODE_OPTION = typer.Option(
    TransportMode.STREAMABLE_HTTP,
    "-t",
    "--transport-mode",
    help="[cyan]Transport mode[/cyan] for server communication (-t shortcut). Options: 'sse', 'stdio', 'streamable-http', 'shttp'.",
    rich_help_panel="Server Options",
)
DEBUG_OPTION = typer.Option(
    False,
    "-d",
    "--debug",
    help="[yellow]Enable debug logging[/yellow] for detailed output (-d shortcut).",
    rich_help_panel="Server Options",
)
INCLUDE_TOOLS_OPTION = typer.Option(
    None,
    "--include-tools",
    help="[green]List of tool names to include[/green] when running the server. Adds to the 'Base Toolset' by default, or to all tools if --load-all-tools is used.",
    rich_help_panel="Server Options",
)
EXCLUDE_TOOLS_OPTION = typer.Option(
    None,
    "--exclude-tools",
    help="[red]List of tool names to exclude[/red] when running the server. Applies after including tools.",
    rich_help_panel="Server Options",
)
LOAD_ALL_TOOLS_OPTION = typer.Option(
    False,
    "-a",
    "--load-all-tools",
    help="[yellow]Load all available tools[/yellow] instead of just the default 'Base Toolset' (-a shortcut).",
    rich_help_panel="Server Options",
)

CHECK_OPTION = typer.Option(
    False,
    "-c",
    "--check",
    help="[yellow]Check API keys[/yellow] for all configured providers (-c shortcut).",
    rich_help_panel="Provider Options",
)
MODELS_OPTION = typer.Option(
    False,
    "--models",
    help="[green]List available models[/green] for each provider.",
    rich_help_panel="Provider Options",
)

MODEL_OPTION = typer.Option(
    None,
    "--model",
    help="[cyan]Model ID[/cyan] to test (defaults to the provider's default).",
    rich_help_panel="Test Options",
)
PROMPT_OPTION = typer.Option(
    "Hello, world!",
    "--prompt",
    help="[magenta]Prompt text[/magenta] to send to the provider.",
    rich_help_panel="Test Options",
)

PROVIDER_OPTION = typer.Option(
    "openai",
    "--provider",
    help="[cyan]Provider[/cyan] to use (default: openai)",
    rich_help_panel="Completion Options",
)
COMPLETION_MODEL_OPTION = typer.Option(
    None,
    "--model",
    help="[blue]Model ID[/blue] for completion (defaults to the provider's default)",
    rich_help_panel="Completion Options",
)
COMPLETION_PROMPT_OPTION = typer.Option(
    None,
    "--prompt",
    help="[magenta]Prompt text[/magenta] for generation (reads from stdin if not provided)",
    rich_help_panel="Completion Options",
)
TEMPERATURE_OPTION = typer.Option(
    0.7,
    "--temperature",
    help="[yellow]Sampling temperature[/yellow] (0.0 - 2.0, default: 0.7)",
    rich_help_panel="Completion Options",
)
MAX_TOKENS_OPTION = typer.Option(
    None,
    "--max-tokens",
    help="[green]Max tokens[/green] to generate (defaults to provider's setting)",
    rich_help_panel="Completion Options",
)
SYSTEM_OPTION = typer.Option(
    None,
    "--system",
    help="[blue]System prompt[/blue] for providers that support it.",
    rich_help_panel="Completion Options",
)
STREAM_OPTION = typer.Option(
    False,
    "-s",
    "--stream",
    help="[cyan]Stream[/cyan] the response token by token (-s shortcut).",
    rich_help_panel="Completion Options",
)

STATUS_OPTION = typer.Option(
    True,
    "--status",
    help="[green]Show cache status[/green]",
    rich_help_panel="Cache Options",
)
CLEAR_OPTION = typer.Option(
    False,
    "--clear",
    help="[red]Clear the cache[/red]",
    rich_help_panel="Cache Options",
)

DEFAULT_PROVIDERS = ["openai", "anthropic", "deepseek", "gemini", "openrouter"]
PROVIDERS_OPTION = typer.Option(
    DEFAULT_PROVIDERS,
    "--providers",
    help="[cyan]Providers list[/cyan] to benchmark (default: all)",
    rich_help_panel="Benchmark Options",
)
BENCHMARK_MODELS_OPTION = typer.Option(
    None,
    "--models",
    help="[blue]Model IDs[/blue] to benchmark (defaults to default model of each provider)",
    rich_help_panel="Benchmark Options",
)
BENCHMARK_PROMPT_OPTION = typer.Option(
    None,
    "--prompt",
    help="[magenta]Prompt text[/magenta] to use for benchmarking (default built-in)",
    rich_help_panel="Benchmark Options",
)
RUNS_OPTION = typer.Option(
    3,
    "-r",
    "--runs",
    help="[green]Number of runs[/green] per provider/model (-r shortcut, default: 3)",
    rich_help_panel="Benchmark Options",
)

VERSION_OPTION = typer.Option(
    False,
    "--version",
    "-v",
    callback=version_callback,
    is_eager=True,
    help="[yellow]Show the application version and exit.[/yellow]",
    rich_help_panel="Global Options",
)

# Options for tools command
CATEGORY_OPTION = typer.Option(
    None,
    "--category",
    help="[cyan]Filter category[/cyan] when listing tools.",
    rich_help_panel="Tools Options",
)
CATEGORY_FILTER_OPTION = typer.Option(
    None,
    "--category",
    help="[cyan]Filter category[/cyan] when listing examples.",
    rich_help_panel="Examples Options",
)

# Options for examples command
SHOW_EXAMPLES_OPTION = typer.Option(
    False,
    "--examples",
    help="[magenta]Show example scripts[/magenta] alongside tools.",
    rich_help_panel="Tools Options",
)

LIST_OPTION = typer.Option(
    False,
    "-l",
    "--list",
    help="[green]List examples[/green] instead of running one (-l shortcut).",
    rich_help_panel="Examples Options",
)


@app.command(name="run")
def run(
    host: Optional[str] = HOST_OPTION,
    port: Optional[int] = PORT_OPTION,
    workers: Optional[int] = WORKERS_OPTION,
    transport_mode: TransportMode = TRANSPORT_MODE_OPTION,
    debug: bool = DEBUG_OPTION,
    include_tools: List[str] = INCLUDE_TOOLS_OPTION,
    exclude_tools: List[str] = EXCLUDE_TOOLS_OPTION,
    load_all_tools: bool = LOAD_ALL_TOOLS_OPTION,
):
    """
    [bold green]Run the Ultimate MCP Server[/bold green]

    Start the MCP server with configurable networking, performance, and tool options.
    The server exposes MCP-protocol compatible endpoints that AI agents can use to
    access various tools and capabilities.

    By default, only the [yellow]'Base Toolset'[/yellow] is loaded to optimize context window usage.
    Use `--load-all-tools` to load all available tools.

    Network settings control server accessibility, workers affect concurrency,
    and tool filtering lets you customize which capabilities are exposed.

    [bold]Examples:[/bold]
      [cyan]umcp run --host 0.0.0.0 --port 8000 --workers 4[/cyan] (Runs with Base Toolset)
      [cyan]umcp run --load-all-tools --debug[/cyan] (Runs with all tools and debug logging)
      [cyan]umcp run --include-tools browser,audio[/cyan] (Adds browser and audio tools to the Base Toolset)
      [cyan]umcp run --load-all-tools --exclude-tools filesystem[/cyan] (Loads all tools except filesystem)
    """
    # Set debug mode if requested
    if debug:
        os.environ["LOG_LEVEL"] = "DEBUG"

    # Print server info
    server_info_str = (
        f"Host: [cyan]{host or 'default from config'}[/cyan]\n"
        f"Port: [cyan]{port or 'default from config'}[/cyan]\n"
        f"Workers: [cyan]{workers or 'default from config'}[/cyan]\n"
        f"Transport mode: [cyan]{transport_mode}[/cyan]"
    )
    
    # Tool Loading Status
    if load_all_tools:
        server_info_str += "\nTool Loading: [yellow]All Available Tools[/yellow]"
    else:
        server_info_str += "\nTool Loading: [yellow]Base Toolset Only[/yellow] (Use --load-all-tools to load all)"
        # Format the categories for display
        category_lines = []
        for category, tools in BASE_TOOLSET_CATEGORIES.items():
            category_lines.append(f"    [cyan]{category}[/cyan]: {', '.join(tools)}")
        
        server_info_str += "\n  [bold]Includes:[/bold]\n" + "\n".join(category_lines)


    # Print tool filtering info if enabled
    if include_tools or exclude_tools:
        server_info_str += "\n[bold]Tool Filtering:[/bold]"
        if include_tools:
            server_info_str += f"\nIncluding: [cyan]{', '.join(include_tools)}[/cyan]"
        if exclude_tools:
            server_info_str += f"\nExcluding: [red]{', '.join(exclude_tools)}[/red]"

    console.print(Panel(server_info_str, title="[bold blue]Starting Ultimate MCP Server[/bold blue]", expand=False))
    console.print() # Add a newline for spacing
    
    # Convert transport_mode enum to string and handle aliases
    if transport_mode == TransportMode.SHTTP:
        actual_transport_mode = "streamable-http"
    else:
        # Convert enum to string value (e.g., TransportMode.SSE -> "sse")
        actual_transport_mode = transport_mode.value
    
    # Run the server
    run_server(
        host=host,
        port=port,
        workers=workers,
        transport_mode=actual_transport_mode,
        include_tools=include_tools,
        exclude_tools=exclude_tools,
        load_all_tools=load_all_tools,
    )


@app.command(name="providers")
def providers(
    check: bool = CHECK_OPTION,
    models: bool = MODELS_OPTION,
):
    """
    [bold green]List Available Providers[/bold green]

    Display configured LLM providers (OpenAI, Anthropic, Gemini, etc.) 
    with their connection status, default models, and API key validation.
    
    Use this command to verify your configuration, troubleshoot API keys,
    or explore available models across all providers.

    Usage:
      umcp providers               # Basic provider listing
      umcp providers --check       # Validate API keys with providers
      umcp providers --models      # List available models for each provider

    Examples:
      umcp providers --check --models  # Comprehensive provider diagnostics
    """
    asyncio.run(list_providers(check_keys=check, list_models=models))


@app.command(name="test")
def test(
    provider: str = typer.Argument(..., help="Provider to test (openai, anthropic, deepseek, gemini)", rich_help_panel="Test Options"),
    model: Optional[str] = MODEL_OPTION,
    prompt: str = PROMPT_OPTION,
):
    """
    [bold green]Test a Specific Provider[/bold green]

    Verify connectivity and functionality of an LLM provider by sending a test
    prompt and displaying the response. This command performs a full API round-trip
    to validate your credentials, model availability, and proper configuration.
    
    The output includes the response text, token counts, cost estimate,
    and response time metrics to help diagnose performance issues.

    Usage:
      umcp test openai                          # Test default OpenAI model
      umcp test anthropic --model claude-3-5-haiku-20241022  # Test specific model
      umcp test gemini --prompt "Summarize quantum computing"  # Custom prompt

    Examples:
      umcp test openai  # Quick health check with default settings
    """
    with console.status(f"[bold green]Testing provider '{provider}'..."): 
        try:
            asyncio.run(test_provider(provider=provider, model=model, prompt=prompt))
        except Exception as e:
            console.print(Panel(f"Failed to test provider '{provider}':\n{str(e)}", title="[bold red]Test Error[/bold red]", border_style="red"))
            raise typer.Exit(code=1) from e


@app.command(name="complete")
def complete(
    provider: str = PROVIDER_OPTION,
    model: Optional[str] = COMPLETION_MODEL_OPTION,
    prompt: Optional[str] = COMPLETION_PROMPT_OPTION,
    temperature: float = TEMPERATURE_OPTION,
    max_tokens: Optional[int] = MAX_TOKENS_OPTION,
    system: Optional[str] = SYSTEM_OPTION,
    stream: bool = STREAM_OPTION,
):
    """
    [bold green]Generate Text Completion[/bold green]

    Request text generation directly from an LLM provider through the CLI.
    This command bypasses the server's MCP endpoint and sends requests
    directly to the provider's API, useful for testing or quick generations.
    
    Supports input from arguments, stdin (piped content), or interactive prompt.
    The command provides full control over provider selection, model choice, 
    and generation parameters. Results include token counts and cost estimates.

    Usage:
      echo "Tell me about Mars" | umcp complete             # Pipe content as prompt
      umcp complete --prompt "Write a haiku"                # Direct prompt
      umcp complete --provider anthropic --model claude-3-5-sonnet-20241022  # Specify model
      umcp complete --temperature 1.5 --max-tokens 250      # Adjust generation params
      umcp complete --system "You are a helpful assistant"  # Set system prompt
      umcp complete --stream                                # Stream tokens in real-time

    Examples:
      umcp complete --prompt "Write a haiku about autumn." --stream
    """
    # Get prompt from stdin if not provided
    if prompt is None:
        if sys.stdin.isatty():
            console.print("Enter prompt (Ctrl+D to finish):")
        prompt = sys.stdin.read().strip()
    
    asyncio.run(
        generate_completion(
            provider=provider,
            model=model,
            prompt=prompt,
            temperature=temperature,
            max_tokens=max_tokens,
            system=system,
            stream=stream,
        )
    )


@app.command(name="cache")
def cache(
    status: bool = STATUS_OPTION,
    clear: bool = CLEAR_OPTION,
):
    """
    [bold green]Cache Management[/bold green]

    Monitor and maintain the server's response cache system.
    Caching stores previous LLM responses to avoid redundant API calls, 
    significantly reducing costs and latency for repeated or similar requests.
    
    The status view shows backend type, item count, hit rate percentage,
    and estimated cost savings from cache hits. Clearing the cache removes
    all stored responses, which may be necessary after configuration changes
    or to force fresh responses.

    Usage:
      umcp cache                # View cache statistics and status
      umcp cache --status       # Explicitly request status view
      umcp cache --clear        # Remove all cached entries (with confirmation)
      umcp cache --status --clear  # View stats before clearing

    Examples:
      umcp cache  # Check current cache performance and hit rate
    """
    should_clear = False
    if clear:
        if Confirm.ask("[bold yellow]Are you sure you want to clear the cache?[/bold yellow]"):
            should_clear = True
        else:
            console.print("[yellow]Cache clear aborted.[/yellow]")
            raise typer.Exit()

    # Only run the async part if needed
    if status or should_clear:
        with console.status("[bold green]Accessing cache..."):
            try:
                asyncio.run(check_cache(show_status=status, clear=should_clear))
            except Exception as e:
                console.print(Panel(f"Failed to access cache:\n{str(e)}", title="[bold red]Cache Error[/bold red]", border_style="red"))
                raise typer.Exit(code=1) from e
    elif not clear: # If only clear was specified but user aborted
        pass # Do nothing, already printed message
    else:
        console.print("Use --status to view status or --clear to clear the cache.")


@app.command(name="benchmark")
def benchmark(
    providers: List[str] = PROVIDERS_OPTION,
    models: Optional[List[str]] = BENCHMARK_MODELS_OPTION,
    prompt: Optional[str] = BENCHMARK_PROMPT_OPTION,
    runs: int = RUNS_OPTION,
):
    """
    [bold green]Benchmark Providers[/bold green]

    Compare performance metrics and costs across different LLM providers and models.
    The benchmark sends identical prompts to each selected provider/model combination
    and measures response time, token processing speed, and cost per request.
    
    Results are presented in a table format showing average metrics across
    multiple runs to ensure statistical validity. This helps identify the
    most performant or cost-effective options for your specific use cases.

    Usage:
      umcp benchmark                     # Test all configured providers
      umcp benchmark --providers openai,anthropic  # Test specific providers
      umcp benchmark --models gpt-4o,claude-3-5-haiku  # Test specific models
      umcp benchmark --prompt "Explain quantum computing" --runs 5  # Custom benchmark
      
    Examples:
      umcp benchmark --runs 3 --providers openai,gemini  # Compare top providers
    """
    asyncio.run(benchmark_providers(providers=providers, models=models, prompt=prompt, runs=runs))


@app.command(name="tools")
def tools(
    category: Optional[str] = CATEGORY_OPTION,
    show_examples: bool = SHOW_EXAMPLES_OPTION,
):
    """
    [bold green]List Available Tools[/bold green]

    Display the MCP tools registered in the server, organized by functional categories.
    These tools represent the server's capabilities that can be invoked by AI agents
    through the Model Context Protocol (MCP) interface.
    
    Tools are grouped into logical categories like completion, document processing,
    filesystem access, browser automation, and more. For each tool, you can
    optionally view associated example scripts that demonstrate its usage patterns.

    Usage:
      umcp tools                      # List all tools across all categories
      umcp tools --category document  # Show only document-related tools
      umcp tools --examples           # Show example scripts for each tool

    Examples:
      umcp tools --category filesystem --examples  # Learn filesystem tools with examples
    """
    # Manually list tools by category for demonstration
    tool_categories: Dict[str, List[str]] = {
        "completion": [
            "generate_completion",
            "stream_completion", 
            "chat_completion", 
            "multi_completion"
        ],
        "document": [
            "summarize_document",
            "extract_entities",
            "chunk_document",
            "process_document_batch"
        ],
        "extraction": [
            "extract_json",
            "extract_table",
            "extract_key_value_pairs",
            "extract_semantic_schema"
        ],
        "rag": [
            "create_knowledge_base",
            "add_documents",
            "retrieve_context",
            "generate_with_rag"
        ],
        "filesystem": [
            "read_file",
            "write_file",
            "list_directory",
            "search_files"
        ],
        "browser": [
            "browser_init",
            "browser_navigate",
            "browser_click",
            "execute_web_workflow"
        ]
    }
    
    # Filter by category if specified
    if category and category in tool_categories:
        categories_to_show = {category: tool_categories[category]}
    else:
        categories_to_show = tool_categories
    
    # Create Rich table for display
    table = Table(title="Ultimate MCP Server Tools")
    table.add_column("Category", style="cyan")
    table.add_column("Tool", style="green")
    
    if show_examples:
        table.add_column("Example Script", style="yellow")
    
    # Add rows to table
    for module_name, tool_names in sorted(categories_to_show.items()):
        for tool_name in sorted(tool_names):
            example_script = TOOL_TO_EXAMPLE_MAP.get(tool_name, "")
            
            if show_examples:
                table.add_row(
                    module_name, 
                    tool_name,
                    example_script if example_script else "N/A"
                )
            else:
                table.add_row(module_name, tool_name)
    
    console.print(table)
    
    # Print help for running examples
    if show_examples:
        console.print("\n[bold]Tip:[/bold] Run examples using the command:")
        console.print("  [cyan]umcp examples <example_name>[/cyan]")


@app.command(name="examples")
def examples(
    example_name: Optional[str] = typer.Argument(None, help="Name of the example to run"),
    category: Optional[str] = CATEGORY_FILTER_OPTION,
    list_examples: bool = LIST_OPTION,
):
    """
    [bold green]Run or List Example Scripts[/bold green]

    Browse and execute the demonstration Python scripts included with Ultimate MCP Server.
    These examples showcase real-world usage patterns and integration techniques for
    different server capabilities, from basic completions to complex workflows.
    
    Examples are organized by functional category (text-generation, document-processing,
    browser-automation, etc.) and contain fully functional code that interacts with
    a running MCP server. They serve as both educational resources and starting
    points for your own implementations.

    Usage:
      umcp examples               # List all available example scripts
      umcp examples --list        # List-only mode (same as above)
      umcp examples --category browser-automation  # Filter by category
      umcp examples rag_example   # Run specific example (with or without .py extension)
      umcp examples rag_example.py  # Explicit extension version

    Examples:
      umcp examples simple_completion_demo  # Run the basic completion example
    """
    # Ensure we have the examples directory
    project_root = Path(__file__).parent.parent.parent
    examples_dir = project_root / "examples"
    
    if not examples_dir.exists() or not examples_dir.is_dir():
        console.print(f"[bold red]Error:[/bold red] Examples directory not found at: {examples_dir}")
        console.print(Panel(f"Examples directory not found at: {examples_dir}", title="[bold red]Error[/bold red]", border_style="red"))
        return 1
    
    # If just listing examples
    if list_examples or not example_name:
        # Create Rich table for display
        table = Table(title="Ultimate MCP Server Example Scripts")
        table.add_column("Category", style="cyan")
        table.add_column("Example Script", style="green")
        
        # List available examples by category
        for category_name, script_names in sorted(EXAMPLE_CATEGORIES.items()):
            for script_name in sorted(script_names):
                table.add_row(category_name, script_name)
        
        console.print(table)
        
        # Print help for running examples
        console.print("\n[bold]Run an example:[/bold]")
        console.print("  [cyan]umcp examples <example_name>[/cyan]")
        
        return 0
    
    # Run the specified example
    example_file = None
    
    # Check if .py extension was provided
    if example_name.endswith('.py'):
        example_path = examples_dir / example_name
        if example_path.exists():
            example_file = example_path
    else:
        # Try with .py extension
        example_path = examples_dir / f"{example_name}.py"
        if example_path.exists():
            example_file = example_path
        else:
            # Try finding in the tool map
            if example_name in TOOL_TO_EXAMPLE_MAP:
                example_script = TOOL_TO_EXAMPLE_MAP[example_name]
                example_path = examples_dir / example_script
                if example_path.exists():
                    example_file = example_path
    
    if not example_file:
        console.print(f"[bold red]Error:[/bold red] Example '{example_name}' not found")
        console.print(Panel(f"Example script '{example_name}' not found in {examples_dir}", title="[bold red]Error[/bold red]", border_style="red"))
        return 1
    
    # Run the example script
    console.print(f"[bold blue]Running example:[/bold blue] {example_file.name}")
    
    # Change to the project root directory to ensure imports work
    os.chdir(project_root)
    
    # Use subprocess to run the script
    import subprocess
    try:
        # Execute the Python script
        result = subprocess.run(
            [sys.executable, str(example_file)], 
            check=True
        )
        return result.returncode
    except subprocess.CalledProcessError as e:
        console.print(f"[bold red]Error:[/bold red] Example script failed with exit code {e.returncode}")
        console.print(Panel(f"Example script '{example_file.name}' failed with exit code {e.returncode}", title="[bold red]Execution Error[/bold red]", border_style="red"))
        return e.returncode
    except Exception as e:
        console.print(f"[bold red]Error:[/bold red] Failed to run example: {str(e)}")
        console.print(Panel(f"Failed to run example '{example_file.name}':\n{str(e)}", title="[bold red]Execution Error[/bold red]", border_style="red"))
        return 1


@app.callback()
def main(
    version: bool = VERSION_OPTION,
):
    """Ultimate MCP Server - A comprehensive AI agent operating system.
    
    The Ultimate MCP Server provides a unified interface to manage LLM providers,
    tools, and capabilities through the Model Context Protocol (MCP). It enables
    AI agents to access dozens of powerful capabilities including file operations,
    browser automation, document processing, database access, and much more.

    This CLI provides commands to:
    • Start and configure the server
    • Manage and test LLM providers
    • Generate text completions directly
    • View and clear the response cache
    • Benchmark provider performance
    • List available tools and capabilities
    • Run example scripts demonstrating usage patterns
    """
    # This function will be called before any command
    pass


def cli():
    """Entry point for CLI package installation.
    
    This function serves as the main entry point when the package is installed
    and the 'umcp' command is invoked. It's referenced in pyproject.toml's
    [project.scripts] section to create the command-line executable.
    """
    app()


if __name__ == "__main__":
    app() 
```

--------------------------------------------------------------------------------
/examples/tool_composition_examples.py:
--------------------------------------------------------------------------------

```python
"""
Tool composition patterns for MCP servers.

This module demonstrates how to design tools that work together effectively
in sequences and patterns, making it easier for LLMs to understand how to
compose tools for multi-step operations.
"""
import csv
import io
import json
from pathlib import Path
from typing import Any, Dict, List, Optional

from error_handling import non_empty_string, validate_inputs, with_error_handling
from tool_annotations import QUERY_TOOL, READONLY_TOOL
from ultimate_mcp_server.exceptions import ToolExecutionError, ToolInputError
from ultimate_mcp_server.tools.document_conversion_and_processing import (
    summarize_document_standalone,
)
from ultimate_mcp_server.tools.filesystem import delete_file, read_file, write_file
from ultimate_mcp_server.tools.local_text_tools import run_sed
from ultimate_mcp_server.utils import get_logger

logger = get_logger("tool_composition_examples")


class DocumentProcessingExample:
    """
    Example of tool composition for document processing.
    
    This class demonstrates a pattern where multiple tools work together
    to process a document through multiple stages:
    1. Chunking - Break large document into manageable pieces
    2. Analysis - Process each chunk individually
    3. Aggregation - Combine results into a final output
    
    This pattern is ideal for working with large documents that exceed
    context windows.
    """
    
    def __init__(self, mcp_server):
        """Initialize with an MCP server instance."""
        self.mcp = mcp_server
        self._register_tools()
        
    def _register_tools(self):
        """Register document processing tools with the MCP server."""
        
        @self.mcp.tool(
            description=(
                "Split a document into manageable chunks for processing. "
                "This is the FIRST step in processing large documents that exceed context windows. "
                "After chunking, process each chunk separately with analyze_chunk()."
            ),
            annotations=READONLY_TOOL.to_dict(),
            examples=[
                {
                    "name": "Chunk research paper",
                    "description": "Split a research paper into chunks",
                    "input": {"document": "This is a long research paper...", "chunk_size": 1000},
                    "output": {
                        "chunks": ["Chunk 1...", "Chunk 2..."],
                        "chunk_count": 2,
                        "chunk_ids": ["doc123_chunk_1", "doc123_chunk_2"]
                    }
                }
            ]
        )
        @with_error_handling
        @validate_inputs(document=non_empty_string)
        async def chunk_document(
            document: str,
            chunk_size: int = 1000,
            overlap: int = 100,
            ctx=None
        ) -> Dict[str, Any]:
            """
            Split a document into manageable chunks for processing.
            
            This tool is the first step in a multi-step document processing workflow:
            1. First, chunk the document with chunk_document() (this tool)
            2. Then, process each chunk with analyze_chunk()
            3. Finally, combine results with aggregate_chunks()
            
            Args:
                document: The document text to split
                chunk_size: Maximum size of each chunk in characters
                overlap: Number of characters to overlap between chunks
                ctx: Context object passed by the MCP server
                
            Returns:
                Dictionary containing the chunks and their metadata
            """
            # Simple chunking strategy - split by character count with overlap
            chunks = []
            chunk_ids = []
            doc_id = f"doc_{hash(document) % 10000}"
            
            # Create chunks with overlap
            for i in range(0, len(document), chunk_size - overlap):
                chunk_text = document[i:i + chunk_size]
                if chunk_text:
                    chunk_id = f"{doc_id}_chunk_{len(chunks) + 1}"
                    chunks.append(chunk_text)
                    chunk_ids.append(chunk_id)
            
            return {
                "chunks": chunks,
                "chunk_count": len(chunks),
                "chunk_ids": chunk_ids,
                "document_id": doc_id,
                "next_step": "analyze_chunk"  # Hint for the next tool to use
            }
        
        @self.mcp.tool(
            description=(
                "Analyze a single document chunk by summarizing it. "
                "This is the SECOND step in the document processing workflow. "
                "Use after chunk_document() and before aggregate_chunks()."
            ),
            annotations=READONLY_TOOL.to_dict(),
            examples=[
                {
                    "name": "Analyze document chunk",
                    "description": "Analyze a single chunk from a research paper",
                    "input": {"chunk": "This chunk discusses methodology...", "chunk_id": "doc123_chunk_1"},
                    "output": {
                        "analysis": {"key_topics": ["methodology", "experiment design"]},
                        "chunk_id": "doc123_chunk_1"
                    }
                }
            ]
        )
        @with_error_handling
        @validate_inputs(chunk=non_empty_string)
        async def analyze_chunk(
            chunk: str,
            chunk_id: str,
            analysis_type: str = "general",
            ctx=None
        ) -> Dict[str, Any]:
            """
            Analyze a single document chunk by summarizing it.
            
            This tool is the second step in a multi-step document processing workflow:
            1. First, chunk the document with chunk_document()
            2. Then, process each chunk with analyze_chunk() (this tool)
            3. Finally, combine results with aggregate_chunks()
            
            Args:
                chunk: The text chunk to analyze
                chunk_id: The ID of the chunk (from chunk_document)
                analysis_type: Type of analysis to perform
                ctx: Context object passed by the MCP server
                
            Returns:
                Dictionary containing the analysis results
            """
            # --- Call the actual summarize_document tool --- 
            logger.info(f"Analyzing chunk {chunk_id} with summarize_document...")
            try:
                # Use a concise summary for chunk analysis
                summary_result = await summarize_document_standalone(
                    document=chunk,
                    summary_format="key_points", # Use key points for chunk analysis
                    max_length=100 # Keep chunk summaries relatively short
                    # We might need to specify provider/model if defaults aren't suitable
                )

                if summary_result.get("success"):
                    analysis = {
                        "summary": summary_result.get("summary", "[Summary Unavailable]"),
                        "analysis_type": "summary", # Indicate the type of analysis performed
                        "metrics": { # Include metrics from the summary call
                            "cost": summary_result.get("cost", 0.0),
                            "tokens": summary_result.get("tokens", {}),
                            "processing_time": summary_result.get("processing_time", 0.0)
                        }
                    }
                    logger.success(f"Chunk {chunk_id} analyzed successfully.")
                else:
                    logger.warning(f"Summarize tool failed for chunk {chunk_id}: {summary_result.get('error')}")
                    analysis = {"error": f"Analysis failed: {summary_result.get('error')}"}
            except Exception as e:
                logger.error(f"Error calling summarize_document for chunk {chunk_id}: {e}", exc_info=True)
                analysis = {"error": f"Analysis error: {str(e)}"}
            # -------------------------------------------------
            
            return {
                "analysis": analysis,
                "chunk_id": chunk_id,
                "next_step": "aggregate_chunks"  # Hint for the next tool to use
            }
        
        @self.mcp.tool(
            description=(
                "Aggregate analysis results from multiple document chunks. "
                "This is the FINAL step in the document processing workflow. "
                "Use after analyzing individual chunks with analyze_chunk()."
            ),
            annotations=READONLY_TOOL.to_dict(),
            examples=[
                {
                    "name": "Aggregate analysis results",
                    "description": "Combine analysis results from multiple chunks",
                    "input": {
                        "analysis_results": [
                            {"analysis": {"key_topics": ["methodology"]}, "chunk_id": "doc123_chunk_1"},
                            {"analysis": {"key_topics": ["results"]}, "chunk_id": "doc123_chunk_2"}
                        ]
                    },
                    "output": {
                        "document_summary": "This document covers methodology and results...",
                        "overall_statistics": {"total_chunks": 2, "word_count": 2500}
                    }
                }
            ]
        )
        @with_error_handling
        async def aggregate_chunks(
            analysis_results: List[Dict[str, Any]],
            aggregation_type: str = "summary",
            ctx=None
        ) -> Dict[str, Any]:
            """
            Aggregate analysis results from multiple document chunks.
            
            This tool is the final step in a multi-step document processing workflow:
            1. First, chunk the document with chunk_document()
            2. Then, process each chunk with analyze_chunk()
            3. Finally, combine results with aggregate_chunks() (this tool)
            
            Args:
                analysis_results: List of analysis results from analyze_chunk
                aggregation_type: Type of aggregation to perform
                ctx: Context object passed by the MCP server
                
            Returns:
                Dictionary containing the aggregated results
            """
            # Validate input
            if not analysis_results or not isinstance(analysis_results, list):
                return {
                    "error": "Invalid analysis_results. Must provide a non-empty list of analysis results."
                }
            
            # Extract all analyses
            all_analyses = [result.get("analysis", {}) for result in analysis_results if "analysis" in result]
            total_chunks = len(all_analyses)
            
            # Calculate overall statistics
            total_word_count = sum(analysis.get("word_count", 0) for analysis in all_analyses)
            all_key_sentences = [sentence for analysis in all_analyses 
                                for sentence in analysis.get("key_sentences", [])]
            
            # Generate summary based on aggregation type
            if aggregation_type == "summary":
                summary = f"Document contains {total_chunks} chunks with {total_word_count} words total."
                if all_key_sentences:
                    summary += f" Key points include: {' '.join(all_key_sentences[:3])}..."
            elif aggregation_type == "sentiment":
                # Aggregate sentiment scores if available
                sentiment_scores = [analysis.get("sentiment_score", 0.5) for analysis in all_analyses 
                                   if "sentiment_score" in analysis]
                avg_sentiment = sum(sentiment_scores) / len(sentiment_scores) if sentiment_scores else 0.5
                sentiment_label = "positive" if avg_sentiment > 0.6 else "neutral" if avg_sentiment > 0.4 else "negative"
                summary = f"Document has an overall {sentiment_label} sentiment (score: {avg_sentiment:.2f})."
            else:
                summary = f"Aggregated {total_chunks} chunks with {total_word_count} total words."
            
            return {
                "document_summary": summary,
                "overall_statistics": {
                    "total_chunks": total_chunks,
                    "word_count": total_word_count,
                    "key_sentences_count": len(all_key_sentences)
                },
                "workflow_complete": True  # Indicate this is the end of the workflow
            }


# --- Helper: Get a temporary path within allowed storage ---
# Assume storage directory exists and is allowed for this demo context
STORAGE_DIR = Path(__file__).resolve().parent.parent / "storage"
TEMP_DATA_DIR = STORAGE_DIR / "temp_pipeline_data"

async def _setup_temp_data_files():
    """Create temporary data files for the pipeline demo."""
    TEMP_DATA_DIR.mkdir(exist_ok=True)
    # Sample CSV Data
    csv_data = io.StringIO()
    writer = csv.writer(csv_data)
    writer.writerow(["date", "amount", "category"])
    writer.writerow(["2023-01-01", "1,200", "electronics"]) # Note: Amount as string with comma
    writer.writerow(["2023-01-02", "950", "clothing"])
    writer.writerow(["2023-01-03", "1500", "electronics"])
    writer.writerow(["2023-01-04", "800", "food"])
    csv_content = csv_data.getvalue()
    csv_path = TEMP_DATA_DIR / "temp_sales.csv"
    await write_file(path=str(csv_path), content=csv_content) # Use write_file tool implicitly

    # Sample JSON Data
    json_data = [
        {"user_id": 101, "name": "Alice", "active": True, "last_login": "2023-01-10"},
        {"user_id": 102, "name": "Bob", "active": False, "last_login": "2022-12-15"},
        {"user_id": 103, "name": "Charlie", "active": True, "last_login": "2023-01-05"},
    ]
    json_content = json.dumps(json_data, indent=2)
    json_path = TEMP_DATA_DIR / "temp_users.json"
    await write_file(path=str(json_path), content=json_content) # Use write_file tool implicitly

    return {"csv": str(csv_path), "json": str(json_path)}

async def _cleanup_temp_data_files(temp_files: Dict[str, str]):
    """Remove temporary data files."""
    for file_path in temp_files.values():
        try:
            await delete_file(path=file_path) # Use delete_file tool implicitly
            logger.debug(f"Cleaned up temp file: {file_path}")
        except Exception as e:
            logger.warning(f"Failed to clean up temp file {file_path}: {e}")
    try:
        # Attempt to remove the directory if empty
        if TEMP_DATA_DIR.exists() and not any(TEMP_DATA_DIR.iterdir()):
             TEMP_DATA_DIR.rmdir()
             logger.debug(f"Cleaned up temp directory: {TEMP_DATA_DIR}")
    except Exception as e:
         logger.warning(f"Failed to remove temp directory {TEMP_DATA_DIR}: {e}")

# --- End Helper ---

class DataPipelineExample:
    """
    Example of tool composition for data processing pipelines.
    
    This class demonstrates a pattern where tools form a processing
    pipeline to transform, filter, and analyze data:
    1. Fetch - Get data from a source
    2. Transform - Clean and process the data
    3. Filter - Select relevant data
    4. Analyze - Perform analysis on filtered data
    
    This pattern is ideal for working with structured data that
    needs multiple processing steps.
    """
    
    def __init__(self, mcp_server):
        """Initialize with an MCP server instance."""
        self.mcp = mcp_server
        self._register_tools()
        
    def _register_tools(self):
        """Register data pipeline tools with the MCP server."""
        
        @self.mcp.tool(
            description=(
                "Fetch data from a temporary source file based on type. "
                "This is the FIRST step in the data pipeline. "
                "Continue with transform_data() to clean the fetched data."
            ),
            annotations=QUERY_TOOL.to_dict(),
            examples=[
                {
                    "name": "Fetch CSV data",
                    "description": "Fetch data from a CSV source",
                    "input": {"source_type": "csv"},
                    "output": {
                        "data": [{"date": "2023-01-01", "amount": 1200}, {"date": "2023-01-02", "amount": 950}],
                        "record_count": 2,
                        "schema": {"date": "string", "amount": "number"}
                    }
                }
            ]
        )
        @with_error_handling
        async def fetch_data(
            source_type: str,
            limit: Optional[int] = None,
            ctx=None
        ) -> Dict[str, Any]:
            """
            Fetch data from a temporary source file based on type.
            
            This tool is the first step in a data processing pipeline:
            1. First, fetch data with fetch_data() (this tool) - Creates temp files if needed.
            2. Then, clean the data with transform_data()
            3. Then, filter the data with filter_data()
            4. Finally, analyze the data with analyze_data()
            
            Args:
                source_type: Type of data source (csv or json for this demo).
                limit: Maximum number of records to fetch/read (applied after reading).
                ctx: Context object passed by the MCP server.
                
            Returns:
                Dictionary containing the fetched data and metadata
            """
            # Ensure temp files exist
            temp_files = await _setup_temp_data_files()
            source_path = temp_files.get(source_type.lower())

            if not source_path:
                 raise ToolInputError(f"Unsupported source_type for demo: {source_type}. Use 'csv' or 'json'.")

            logger.info(f"Fetching data from temporary file: {source_path}")
            
            # Use read_file tool implicitly to get content
            read_result = await read_file(path=source_path)
            if not read_result.get("success"):
                 raise ToolExecutionError(f"Failed to read temporary file {source_path}: {read_result.get('error')}")
            
            # Assuming read_file returns content in a predictable way (e.g., result['content'][0]['text'])
            # Adjust parsing based on actual read_file output structure
            content = read_result.get("content", [])
            if not content or not isinstance(content, list) or "text" not in content[0]:
                 raise ToolExecutionError(f"Unexpected content structure from read_file for {source_path}")
            
            file_content = content[0]["text"]
            data = []
            schema = {}

            try:
                if source_type.lower() == "csv":
                    # Parse CSV data
                    csv_reader = csv.reader(io.StringIO(file_content))
                    headers = next(csv_reader)
                    for row in csv_reader:
                        if row: # Skip empty rows
                            data.append(dict(zip(headers, row, strict=False)))
                elif source_type.lower() == "json":
                    # Parse JSON data
                    data = json.loads(file_content)
                else:
                    # Default dummy data if somehow type is wrong despite check
                    data = [{"id": i, "value": f"Sample {i}"} for i in range(1, 6)]
            except Exception as parse_error:
                 raise ToolExecutionError(f"Failed to parse content from {source_path}: {parse_error}") from parse_error

            # Apply limit if specified AFTER reading/parsing
            if limit and limit > 0 and len(data) > limit:
                data = data[:limit]
            
            # Infer schema from first record
            if data:
                first_record = data[0]
                for key, value in first_record.items():
                    value_type = "string"
                    if isinstance(value, (int, float)):
                        value_type = "number"
                    elif isinstance(value, bool):
                        value_type = "boolean"
                    schema[key] = value_type
            
            return {
                "data": data,
                "record_count": len(data),
                "schema": schema,
                "source_info": {"type": source_type, "path": source_path},
                "next_step": "transform_data"  # Hint for the next tool to use
            }
        
        @self.mcp.tool(
            description=(
                "Transform and clean data using basic text processing tools (sed). "
                "This is the SECOND step in the data pipeline. "
                "Use after fetch_data() and before filter_data()."
            ),
            annotations=READONLY_TOOL.to_dict(),
            examples=[
                {
                    "name": "Transform sales data",
                    "description": "Clean and transform sales data",
                    "input": {
                        "data": [{"date": "2023-01-01", "amount": "1,200"}, {"date": "2023-01-02", "amount": "950"}],
                        "transformations": ["convert_dates", "normalize_numbers"]
                    },
                    "output": {
                        "transformed_data": [{"date": "2023-01-01", "amount": 1200}, {"date": "2023-01-02", "amount": 950}],
                        "transformation_log": ["Converted 2 dates", "Normalized 2 numbers"]
                    }
                }
            ]
        )
        @with_error_handling
        async def transform_data(
            data: List[Dict[str, Any]],
            transformations: List[str] = None,
            custom_transformations: Dict[str, str] = None,
            ctx=None
        ) -> Dict[str, Any]:
            """
            Transform and clean data using basic text processing tools (sed).
            
            This tool is the second step in a data processing pipeline:
            1. First, fetch data with fetch_data()
            2. Then, clean the data with transform_data() (this tool)
            3. Then, filter the data with filter_data()
            4. Finally, analyze the data with analyze_data()
            
            Args:
                data: List of data records to transform
                transformations: List of built-in transformations to apply
                custom_transformations: Dictionary of field->transform_expression
                ctx: Context object passed by the MCP server
                
            Returns:
                Dictionary containing the transformed data and transformation log
            """
            # Validate input
            if not data or not isinstance(data, list) or not all(isinstance(r, dict) for r in data):
                return {
                    "error": "Invalid data. Must provide a non-empty list of records (dictionaries)."
                }
            
            transformation_log = []
            # Convert input data (list of dicts) to a string format suitable for sed (e.g., JSON lines)
            try:
                input_text = "\n".join(json.dumps(record) for record in data)
            except Exception as e:
                return {"error": f"Could not serialize input data for transformation: {e}"}
            
            current_text = input_text
            sed_scripts = [] # Accumulate sed commands
            
            # Apply standard transformations if specified
            transformations = transformations or []
            for transform in transformations:
                if transform == "convert_dates":
                    # Use sed to replace '/' with '-' in date-like fields (heuristic)
                    # This is complex with JSON structure, better done after parsing.
                    # For demo, we apply a simple global substitution (less robust)
                    sed_scripts.append("s|/|-|g")
                    transformation_log.append("Applied date conversion (sed: s|/|-|g)")
                
                elif transform == "normalize_numbers":
                    # Use sed to remove commas from numbers (heuristic)
                    # Example: "amount": "1,200" -> "amount": "1200"
                    sed_scripts.append('s/"([a-zA-Z_]+)":"([0-9,]+)"/"\1":"\2"/g; s/,//g') # More complex sed needed
                    transformation_log.append("Applied number normalization (sed: remove commas)")
            
            # --- Execute accumulated sed scripts --- 
            if sed_scripts:
                # Combine scripts with -e for each
                combined_script = " ".join([f"-e '{s}'" for s in sed_scripts])
                logger.info(f"Running sed transformation with script: {combined_script}")
                try:
                    sed_result = await run_sed(
                        args_str=combined_script, # Pass combined script
                        input_data=current_text
                    )

                    if sed_result.get("success"):
                        current_text = sed_result["stdout"]
                        logger.success("Sed transformation completed successfully.")
                    else:
                        error_msg = sed_result.get("error", "Sed command failed")
                        logger.error(f"Sed transformation failed: {error_msg}")
                        return {"error": f"Transformation failed: {error_msg}"}
                except Exception as e:
                    logger.error(f"Error running sed transformation: {e}", exc_info=True)
                    return {"error": f"Transformation execution error: {e}"}
            
            # --- Attempt to parse back to list of dicts --- 
            try:
                transformed_data = []
                for line in current_text.strip().split("\n"):
                    if line:
                        record = json.loads(line)
                        # Post-processing for number normalization (sed only removes commas)
                        if "normalize_numbers" in transformations:
                            for key, value in record.items():
                                if isinstance(value, str) and value.replace(".", "", 1).isdigit():
                                    try:
                                        record[key] = float(value) if "." in value else int(value)
                                    except ValueError:
                                        pass # Keep as string if conversion fails
                        transformed_data.append(record)
                logger.success("Successfully parsed transformed data back to JSON objects.")
            except Exception as e:
                logger.error(f"Could not parse transformed data back to JSON: {e}", exc_info=True)
                # Return raw text if parsing fails
                return {
                    "transformed_data_raw": current_text,
                    "transformation_log": transformation_log,
                    "warning": "Could not parse final data back to JSON records"
                }
            # ---------------------------------------------------

            return {
                "transformed_data": transformed_data,
                "transformation_log": transformation_log,
                "record_count": len(transformed_data),
                "next_step": "filter_data"  # Hint for the next tool to use
            }
        
        @self.mcp.tool(
            description=(
                "Filter data based on criteria. "
                "This is the THIRD step in the data pipeline. "
                "Use after transform_data() and before analyze_data()."
            ),
            annotations=READONLY_TOOL.to_dict(),
            examples=[
                {
                    "name": "Filter data",
                    "description": "Filter data based on criteria",
                    "input": {
                        "data": [{"date": "2023-01-01", "amount": 1200}, {"date": "2023-01-02", "amount": 950}],
                        "filter_criteria": {"amount": {"$gt": 1000}}
                    },
                    "output": {
                        "filtered_data": [{"date": "2023-01-01", "amount": 1200}],
                        "filter_criteria": {"amount": {"$gt": 1000}}
                    }
                }
            ]
        )
        @with_error_handling
        async def filter_data(
            data: List[Dict[str, Any]],
            filter_criteria: Dict[str, Any],
            ctx=None
        ) -> Dict[str, Any]:
            """
            Filter data based on criteria.
            
            This tool is the third step in a data processing pipeline:
            1. First, fetch data with fetch_data()
            2. Then, clean the data with transform_data()
            3. Then, filter the data with filter_data() (this tool)
            4. Finally, analyze the data with analyze_data()
            
            Args:
                data: List of data records to filter
                filter_criteria: Criteria to filter data
                ctx: Context object passed by the MCP server
                
            Returns:
                Dictionary containing the filtered data and filter criteria
            """
            # Filter data based on criteria
            filtered_data = [record for record in data if all(record.get(key) == value for key, value in filter_criteria.items())]
            
            return {
                "filtered_data": filtered_data,
                "filter_criteria": filter_criteria,
                "record_count": len(filtered_data)
            }
        
        @self.mcp.tool(
            description=(
                "Analyze data. "
                "This is the FINAL step in the data pipeline. "
                "Use after filtering data with filter_data()."
            ),
            annotations=READONLY_TOOL.to_dict(),
            examples=[
                {
                    "name": "Analyze data",
                    "description": "Analyze filtered data",
                    "input": {
                        "data": [{"date": "2023-01-01", "amount": 1200}, {"date": "2023-01-02", "amount": 950}],
                        "analysis_type": "summary"
                    },
                    "output": {
                        "analysis_results": [
                            {"analysis": {"key_topics": ["methodology"]}, "chunk_id": "doc123_chunk_1"},
                            {"analysis": {"key_topics": ["results"]}, "chunk_id": "doc123_chunk_2"}
                        ],
                        "analysis_type": "summary"
                    }
                }
            ]
        )
        @with_error_handling
        async def analyze_data(
            data: List[Dict[str, Any]],
            analysis_type: str = "summary",
            ctx=None
        ) -> Dict[str, Any]:
            """
            Analyze data.
            
            This tool is the final step in a data processing pipeline:
            1. First, fetch data with fetch_data()
            2. Then, clean the data with transform_data()
            3. Then, filter the data with filter_data()
            4. Finally, analyze the data with analyze_data() (this tool)
            
            Args:
                data: List of data records to analyze
                analysis_type: Type of analysis to perform
                ctx: Context object passed by the MCP server
                
            Returns:
                Dictionary containing the analysis results
            """
            # Simulate analysis based on analysis_type
            if analysis_type == "summary":
                # Aggregate analysis results
                analysis_results = [{"analysis": {"key_topics": ["methodology"]}, "chunk_id": "doc123_chunk_1"},
                                    {"analysis": {"key_topics": ["results"]}, "chunk_id": "doc123_chunk_2"}]
            else:
                # Placeholder for other analysis types
                analysis_results = []
            
            return {
                "analysis_results": analysis_results,
                "analysis_type": analysis_type
            }

    async def cleanup_pipeline_data(self):
        """Cleans up temporary data files created by fetch_data."""
        await _cleanup_temp_data_files({"csv": str(TEMP_DATA_DIR / "temp_sales.csv"), "json": str(TEMP_DATA_DIR / "temp_users.json")})

# Example usage (if this file were run directly or imported)
# async def run_pipeline_example():
#     # ... initialize MCP server ...
#     pipeline = DataPipelineExample(mcp_server)
#     try:
#         # ... run pipeline steps ...
#         fetch_result = await pipeline.fetch_data(source_type="csv")
#         transform_result = await pipeline.transform_data(data=fetch_result['data'])
#         # ... etc ...
#     finally:
#         await pipeline.cleanup_pipeline_data()

# asyncio.run(run_pipeline_example()) 
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/completion.py:
--------------------------------------------------------------------------------

```python
"""Text completion tools for Ultimate MCP Server."""
import asyncio
import time
from typing import Any, AsyncGenerator, Dict, List, Optional

from ultimate_mcp_server.constants import Provider, TaskType
from ultimate_mcp_server.core.providers.base import get_provider, parse_model_string
from ultimate_mcp_server.exceptions import ProviderError, ToolError, ToolInputError
from ultimate_mcp_server.services.cache import with_cache
from ultimate_mcp_server.tools.base import with_error_handling, with_retry, with_tool_metrics
from ultimate_mcp_server.utils import get_logger

logger = get_logger("ultimate_mcp_server.tools.completion")

# --- Tool Functions (Standalone, Decorated) ---

@with_tool_metrics
@with_error_handling
async def generate_completion(
    prompt: str,
    provider: str = Provider.OPENAI.value,
    model: Optional[str] = None,
    max_tokens: Optional[int] = None,
    temperature: float = 0.7,
    stream: bool = False,
    json_mode: bool = False,
    additional_params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Generates a single, complete text response for a given prompt (non-streaming).

    Use this tool for single-turn tasks where the entire response is needed at once,
    such as answering a question, summarizing text, translating, or classifying content.
    It waits for the full response from the LLM before returning.

    If you need the response to appear incrementally (e.g., for user interfaces or long generations),
    use the `stream_completion` tool instead.

    Args:
        prompt: The input text prompt for the LLM.
        provider: The name of the LLM provider (e.g., "openai", "anthropic", "gemini"). Defaults to "openai".
                  Use `list_models` or `get_provider_status` to see available providers.
        model: The specific model ID (e.g., "openai/gpt-4.1-mini", "anthropic/claude-3-5-haiku-20241022").
               If None, the provider's default model is used. Use `list_models` to find available IDs.
        max_tokens: (Optional) Maximum number of tokens to generate in the response.
        temperature: (Optional) Controls response randomness (0.0=deterministic, 1.0=creative). Default 0.7.
        stream: Must be False for this tool. Set to True to trigger an error directing to `stream_completion`.
        json_mode: (Optional) When True, instructs the model to return a valid JSON response. Default False.
                   Note: Support and behavior varies by provider.
        additional_params: (Optional) Dictionary of additional provider-specific parameters (e.g., `{"top_p": 0.9}`).

    Returns:
        A dictionary containing the full completion and metadata:
        {
            "text": "The generated completion text...",
            "model": "provider/model-used",
            "provider": "provider-name",
            "tokens": {
                "input": 15,
                "output": 150,
                "total": 165
            },
            "cost": 0.000123, # Estimated cost in USD
            "processing_time": 1.23, # Execution time in seconds
            "success": true
        }

    Raises:
        ToolInputError: If `stream` is set to True.
        ProviderError: If the provider is unavailable or the LLM request fails.
        ToolError: For other internal errors.
    """
    # Streaming not supported for this endpoint
    if stream:
        raise ToolInputError(
            "Streaming is not supported for `generate_completion`. Use the `stream_completion` tool instead.",
            param_name="stream",
            provided_value=stream
        )
            
    start_time = time.time()
    
    # Check if model contains a provider prefix (e.g., "openai/gpt-4.1-mini")
    if model:
        extracted_provider, extracted_model = parse_model_string(model)
        if extracted_provider:
            provider = extracted_provider  # Override provider with the one from the model string
            model = extracted_model  # Use the model name without the provider prefix
            logger.debug(f"Using provider '{provider}' and model '{model}' extracted from model string")
    
    # Get provider instance
    try:
        # Use provider name directly, get_provider handles splitting if needed
        provider_instance = await get_provider(provider)
    except Exception as e:
        raise ProviderError(
            f"Failed to initialize provider '{provider}': {str(e)}",
            provider=provider,
            cause=e
        ) from e
    
    # Set default additional params
    additional_params = additional_params or {}
    
    # Conditionally construct parameters for the provider call
    params_for_provider = {
        "prompt": prompt,
        "model": model, # model here is already stripped of provider prefix if applicable
        "temperature": temperature,
        "json_mode": json_mode,
        # messages will be handled by chat_completion, this is for simple completion
    }
    if max_tokens is not None:
        params_for_provider["max_tokens"] = max_tokens
    
    # Merge any other additional_params, ensuring they don't overwrite core params already set
    # or ensuring that additional_params are provider-specific and don't conflict.
    # A safer merge would be params_for_provider.update({k: v for k, v in additional_params.items() if k not in params_for_provider})
    # However, the current **additional_params likely intends to override if keys match, so we keep that behavior for now
    # but apply it to the conditionally built dict.
    final_provider_params = {**params_for_provider, **additional_params}

    try:
        # Generate completion
        result = await provider_instance.generate_completion(
            **final_provider_params
        )
        
        # Calculate processing time
        processing_time = time.time() - start_time
        
        # Log success
        logger.success(
            f"Completion generated successfully with {provider}/{result.model}",
            emoji_key=TaskType.COMPLETION.value,
            tokens={
                "input": result.input_tokens,
                "output": result.output_tokens
            },
            cost=result.cost,
            time=processing_time
        )
        
        # Return standardized result
        return {
            "text": result.text,
            "model": result.model, # Return the actual model used (might differ from input if default)
            "provider": provider,
            "tokens": {
                "input": result.input_tokens,
                "output": result.output_tokens,
                "total": result.total_tokens,
            },
            "cost": result.cost,
            "processing_time": processing_time,
            "success": True
        }
        
    except Exception as e:
        # Convert to provider error
        # Use the potentially prefixed model name in the error context
        error_model = model or f"{provider}/default"
        raise ProviderError(
            f"Completion generation failed for model '{error_model}': {str(e)}",
            provider=provider,
            model=error_model,
            cause=e
        ) from e

@with_tool_metrics
@with_error_handling
async def stream_completion(
    prompt: str,
    provider: str = Provider.OPENAI.value,
    model: Optional[str] = None,
    max_tokens: Optional[int] = None,
    temperature: float = 0.7,
    json_mode: bool = False,
    additional_params: Optional[Dict[str, Any]] = None
) -> AsyncGenerator[Dict[str, Any], None]:
    """Generates a text completion for a prompt and streams the response chunk by chunk.

    Use this tool when you need to display the LLM's response progressively as it's generated,
    improving perceived responsiveness for users, especially for longer outputs.

    If you need the entire response at once, use `generate_completion`.

    Args:
        prompt: The input text prompt for the LLM.
        provider: The name of the LLM provider (e.g., "openai", "anthropic", "gemini"). Defaults to "openai".
                  Use `list_models` or `get_provider_status` to see available providers.
        model: The specific model ID (e.g., "openai/gpt-4.1-mini", "anthropic/claude-3-5-haiku-20241022").
               If None, the provider's default model is used. Use `list_models` to find available IDs.
        max_tokens: (Optional) Maximum number of tokens to generate in the response.
        temperature: (Optional) Controls response randomness (0.0=deterministic, 1.0=creative). Default 0.7.
        json_mode: (Optional) When True, instructs the model to return a valid JSON response. Default False.
        additional_params: (Optional) Dictionary of additional provider-specific parameters (e.g., `{"top_p": 0.9}`).

    Yields:
        A stream of dictionary chunks. Each chunk contains:
        {
            "text": "The incremental piece of generated text...",
            "chunk_index": 1,          # Sequence number of this chunk (starts at 1)
            "provider": "provider-name",
            "model": "provider/model-used",
            "finish_reason": null,    # Reason generation stopped (e.g., "stop", "length"), null until the end
            "finished": false         # True only for the very last yielded dictionary
        }
        The *final* yielded dictionary will have `finished: true` and may also contain aggregate metadata:
        {
            "text": "",               # Final chunk might be empty
            "chunk_index": 10,
            "provider": "provider-name",
            "model": "provider/model-used",
            "finish_reason": "stop",
            "finished": true,
            "full_text": "The complete generated text...", # Full response concatenated
            "processing_time": 5.67,                   # Total time in seconds
            "tokens": { "input": ..., "output": ..., "total": ... }, # Final token counts
            "cost": 0.000543                          # Final estimated cost
            "error": null                             # Error message if one occurred during streaming
        }

    Raises:
        ProviderError: If the provider is unavailable or the LLM stream request fails initially.
                       Errors during the stream yield an error message in the final chunk.
    """
    start_time = time.time()
    
    # Add MCP annotations for audience and priority
    # annotations = {  # noqa: F841
    #     "audience": ["assistant", "user"],  # Useful for both assistant and user
    #     "priority": 0.8  # High priority but not required (generate_completion is the primary tool)
    # }
    
    # Check if model contains a provider prefix (e.g., "openai/gpt-4.1-mini")
    if model:
        extracted_provider, extracted_model = parse_model_string(model)
        if extracted_provider:
            provider = extracted_provider  # Override provider with the one from the model string
            model = extracted_model  # Use the model name without the provider prefix
            logger.debug(f"Using provider '{provider}' and model '{model}' extracted from model string")
    
    # Get provider instance
    try:
        provider_instance = await get_provider(provider)
    except Exception as e:
        logger.error(
            f"Failed to initialize provider '{provider}': {str(e)}",
            emoji_key="error",
            provider=provider
        )
        # Yield a single error chunk if provider init fails
        yield {
            "error": f"Failed to initialize provider '{provider}': {str(e)}",
            "text": None,
            "finished": True,
            "provider": provider,
            "model": model
        }
        return
    
    # Set default additional params
    additional_params = additional_params or {}
    
    logger.info(
        f"Starting streaming completion with {provider}",
        emoji_key=TaskType.COMPLETION.value,
        prompt_length=len(prompt),
        json_mode_requested=json_mode # Log the request
    )
    
    chunk_count = 0
    full_text = ""
    final_metadata = {} # To store final metadata like model, cost etc.
    error_during_stream = None
    actual_model_used = model # Keep track of the actual model used

    try:
        # Get stream, passing json_mode directly
        stream = provider_instance.generate_completion_stream(
            prompt=prompt,
            model=model,
            max_tokens=max_tokens,
            temperature=temperature,
            json_mode=json_mode, # Pass the flag here
            **additional_params
        )
        
        async for chunk, metadata in stream:
            chunk_count += 1
            full_text += chunk
            final_metadata.update(metadata) # Keep track of latest metadata
            actual_model_used = metadata.get("model", actual_model_used) # Update if metadata provides it
            
            # Yield chunk with metadata
            yield {
                "text": chunk,
                "chunk_index": chunk_count,
                "provider": provider,
                "model": actual_model_used,
                "finish_reason": metadata.get("finish_reason"),
                "finished": False,
            }
            
    except Exception as e:
        error_during_stream = f"Error during streaming after {chunk_count} chunks: {type(e).__name__}: {str(e)}"
        logger.error(
            f"Error during streaming completion with {provider}/{actual_model_used or 'default'}: {error_during_stream}",
            emoji_key="error"
        )
        # Don't return yet, yield the final chunk with the error

    # --- Final Chunk --- 
    processing_time = time.time() - start_time
    
    # Log completion (success or failure based on error_during_stream)
    log_level = logger.error if error_during_stream else logger.success
    log_message = f"Streaming completion finished ({chunk_count} chunks)" if not error_during_stream else f"Streaming completion failed after {chunk_count} chunks"
    
    log_level(
        log_message,
        emoji_key="error" if error_during_stream else "success",
        provider=provider,
        model=actual_model_used,
        tokens={
            "input": final_metadata.get("input_tokens"),
            "output": final_metadata.get("output_tokens")
        },
        cost=final_metadata.get("cost"),
        time=processing_time,
        error=error_during_stream
    )

    # Yield the final aggregated chunk
    yield {
        "text": "", # No new text in the final summary chunk
        "chunk_index": chunk_count + 1,
        "provider": provider,
        "model": actual_model_used,
        "finish_reason": final_metadata.get("finish_reason"),
        "finished": True,
        "full_text": full_text,
        "processing_time": processing_time,
        "tokens": { 
            "input": final_metadata.get("input_tokens"), 
            "output": final_metadata.get("output_tokens"), 
            "total": final_metadata.get("total_tokens")
        },
        "cost": final_metadata.get("cost"),
        "error": error_during_stream 
    }

@with_tool_metrics
@with_error_handling
async def generate_completion_stream(
    prompt: str,
    provider: str = Provider.OPENAI.value,
    model: Optional[str] = None,
    max_tokens: Optional[int] = None,
    temperature: float = 0.7,
    json_mode: bool = False,
    additional_params: Optional[Dict[str, Any]] = None
) -> AsyncGenerator[Dict[str, Any], None]:
    """Generates a text response in a streaming fashion for a given prompt.
    
    Use this tool when you want to display the response as it's being generated
    without waiting for the entire response to be completed. It yields chunks of text
    as they become available, allowing for more interactive user experiences.
    
    Args:
        prompt: The text prompt to send to the LLM.
        provider: The LLM provider to use (default: "openai").
        model: The specific model to use (if None, uses provider's default).
        max_tokens: Maximum tokens to generate in the response.
        temperature: Controls randomness in the output (0.0-1.0).
        json_mode: Whether to request JSON formatted output from the model.
        additional_params: Additional provider-specific parameters.
        
    Yields:
        Dictionary containing the generated text chunk and metadata:
        {
            "text": str,           # The text chunk
            "metadata": {...},     # Additional information about the generation
            "done": bool           # Whether this is the final chunk
        }
        
    Raises:
        ToolError: If an error occurs during text generation.
    """
    # Initialize variables to track metrics
    start_time = time.time()
    
    try:
        # Get provider instance
        provider_instance = await get_provider(provider)
        if not provider_instance:
            raise ValueError(f"Invalid provider: {provider}")
            
        # Add json_mode to additional_params if specified
        params = additional_params.copy() if additional_params else {}
        if json_mode:
            params["json_mode"] = True
        
        # Stream the completion
        async for chunk, metadata in provider_instance.generate_completion_stream(
            prompt=prompt,
            model=model,
            max_tokens=max_tokens,
            temperature=temperature,
            **params
        ):
            # Calculate elapsed time for each chunk
            elapsed_time = time.time() - start_time
            
            # Include additional metadata with each chunk
            response = {
                "text": chunk,
                "metadata": {
                    **metadata,
                    "elapsed_time": elapsed_time,
                },
                "done": metadata.get("finish_reason") is not None
            }
            
            yield response
            
    except Exception as e:
        logger.error(f"Error in generate_completion_stream: {str(e)}", exc_info=True)
        raise ToolError(f"Failed to generate streaming completion: {str(e)}") from e

@with_cache(ttl=24 * 60 * 60) # Cache results for 24 hours
@with_tool_metrics
@with_retry(max_retries=2, retry_delay=1.0) # Retry up to 2 times on failure
@with_error_handling
async def chat_completion(
    messages: List[Dict[str, Any]],
    provider: str = Provider.OPENAI.value,
    model: Optional[str] = None,
    max_tokens: Optional[int] = None,
    temperature: float = 0.7,
    system_prompt: Optional[str] = None,
    json_mode: bool = False,
    additional_params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Generates a response within a conversational context (multi-turn chat).

    Use this tool for chatbot interactions, instruction following, or any task requiring
    the LLM to consider previous turns in a conversation. It takes a list of messages
    (user, assistant, system roles) as input.

    This tool automatically retries on transient failures and caches results for identical requests
    (based on messages, model, etc.) for 24 hours to save costs and time.
    Streaming is NOT supported; this tool returns the complete chat response at once.

    Args:
        messages: A list of message dictionaries representing the conversation history.
                  Each dictionary must have "role" ("user", "assistant", or "system") and "content" (string).
                  Example: `[{"role": "user", "content": "Hello!"}, {"role": "assistant", "content": "Hi there!"}]`
        provider: The name of the LLM provider (e.g., "openai", "anthropic", "gemini"). Defaults to "openai".
        model: The specific model ID (e.g., "openai/gpt-4o", "anthropic/claude-3-7-sonnet-20250219").
               If None, the provider's default model is used. Use `list_models` to find available IDs.
        max_tokens: (Optional) Maximum number of tokens for the *assistant's* response.
        temperature: (Optional) Controls response randomness (0.0=deterministic, 1.0=creative). Default 0.7.
        system_prompt: (Optional) An initial system message to guide the model's behavior (e.g., persona, instructions).
                       If provided, it's effectively prepended to the `messages` list as a system message.
        json_mode: (Optional) Request structured JSON output from the LLM. Default False.
        additional_params: (Optional) Dictionary of additional provider-specific parameters (e.g., `{"top_p": 0.9}`).

    Returns:
        A dictionary containing the assistant's response message and metadata:
        {
            "message": {
                "role": "assistant",
                "content": "The assistant's generated response..."
            },
            "model": "provider/model-used",
            "provider": "provider-name",
            "tokens": {
                "input": 55,  # Includes all input messages
                "output": 120, # Assistant's response only
                "total": 175
            },
            "cost": 0.000150, # Estimated cost in USD
            "processing_time": 2.50, # Execution time in seconds
            "cached_result": false, # True if the result was served from cache
            "success": true
        }

    Raises:
        ToolInputError: If the `messages` format is invalid.
        ProviderError: If the provider is unavailable or the LLM request fails (after retries).
        ToolError: For other internal errors.
    """
    start_time = time.time()

    # Validate messages format
    if not isinstance(messages, list) or not all(isinstance(m, dict) and 'role' in m and 'content' in m for m in messages):
        raise ToolInputError(
            "Invalid messages format. Must be a list of dictionaries, each with 'role' and 'content'.",
            param_name="messages",
            provided_value=messages
        )

    # Prepend system prompt if provided
    if system_prompt:
        # Avoid modifying the original list if called multiple times
        processed_messages = [{"role": "system", "content": system_prompt}] + messages
    else:
        processed_messages = messages
        
    # Check if model contains a provider prefix (e.g., "openai/gpt-4.1-mini")
    if model:
        extracted_provider, extracted_model = parse_model_string(model)
        if extracted_provider:
            provider = extracted_provider  # Override provider with the one from the model string
            model = extracted_model  # Use the model name without the provider prefix
            logger.debug(f"Using provider '{provider}' and model '{model}' extracted from model string")

    # Get provider instance
    try:
        provider_instance = await get_provider(provider)
    except Exception as e:
        raise ProviderError(
            f"Failed to initialize provider '{provider}': {str(e)}",
            provider=provider,
            cause=e
        ) from e

    additional_params = additional_params or {}
    # Add json_mode to additional params if specified
    if json_mode:
        additional_params["json_mode"] = True

    try:
        result = await provider_instance.generate_completion(
            messages=processed_messages,
            model=model,
            max_tokens=max_tokens,
            temperature=temperature,
            **additional_params
        )
        
        processing_time = time.time() - start_time
        
        logger.success(
            f"Chat completion generated successfully with {provider}/{result.model}",
            emoji_key=TaskType.CHAT.value,
            tokens={
                "input": result.input_tokens,
                "output": result.output_tokens
            },
            cost=result.cost,
            time=processing_time
        )

        return {
            "message": result.message.dict() if hasattr(result.message, 'dict') else result.message, # Return message as dict
            "model": result.model,
            "provider": provider,
            "tokens": {
                "input": result.input_tokens,
                "output": result.output_tokens,
                "total": result.total_tokens,
            },
            "cost": result.cost,
            "processing_time": processing_time,
            # Note: cached_result is automatically added by the @with_cache decorator if applicable
            "success": True
        }

    except Exception as e:
        error_model = model or f"{provider}/default"
        # Check if the exception has the model attribute, otherwise use the determined error_model
        error_model_from_exception = getattr(e, 'model', None)
        final_error_model = error_model_from_exception or error_model

        raise ProviderError(
            f"Chat completion generation failed for model '{final_error_model}': {str(e)}",
            provider=provider,
            model=final_error_model,
            cause=e
        ) from e


@with_cache(ttl=7 * 24 * 60 * 60) # Cache results for 7 days
@with_tool_metrics
@with_error_handling # Error handling should be used
async def multi_completion(
    prompt: str,
    providers: List[Dict[str, Any]],
    max_concurrency: int = 3,
    timeout: Optional[float] = 30.0
) -> Dict[str, Any]:
    """Generates completions for the same prompt from multiple LLM providers/models concurrently.

    Use this tool to compare responses, latency, or cost across different models for a specific prompt.
    It runs requests in parallel up to `max_concurrency`.

    Results are cached for 7 days based on the prompt and provider configurations.

    Args:
        prompt: The input text prompt to send to all specified models.
        providers: A list of dictionaries, each specifying a provider and model configuration.
                   Example: `[{"provider": "openai", "model": "gpt-4.1-mini"}, {"provider": "anthropic", "model": "claude-3-5-haiku-20241022", "max_tokens": 50}]`
                   Each dict must contain at least "provider". "model" is optional (uses provider default).
                   Other valid parameters for `generate_completion` (like `max_tokens`, `temperature`) can be included.
        max_concurrency: (Optional) Maximum number of provider requests to run in parallel. Default 3.
        timeout: (Optional) Maximum time in seconds to wait for each individual provider request. Default 30.0.
                 Requests exceeding this time will result in a timeout error for that specific provider.

    Returns:
        A dictionary containing results from each provider, along with aggregate statistics:
        {
            "results": {
                "openai/gpt-4.1-mini": {        # Keyed by provider/model
                    "text": "Response from OpenAI...",
                    "model": "openai/gpt-4.1-mini",
                    "provider": "openai",
                    "tokens": { ... },
                    "cost": 0.000123,
                    "processing_time": 1.5,
                    "success": true,
                    "error": null
                },
                "anthropic/claude-3-5-haiku-20241022": {
                    "text": null,
                    "model": "anthropic/claude-3-5-haiku-20241022",
                    "provider": "anthropic",
                    "tokens": null,
                    "cost": 0.0,
                    "processing_time": 30.0,
                    "success": false,
                    "error": "Request timed out after 30.0 seconds"
                },
                ...
            },
            "aggregate_stats": {
                "total_requests": 2,
                "successful_requests": 1,
                "failed_requests": 1,
                "total_cost": 0.000123,
                "total_processing_time": 30.1, # Total wall time for the concurrent execution
                "average_processing_time": 1.5 # Average time for successful requests
            },
            "cached_result": false # True if the entire multi_completion result was cached
        }

    Raises:
        ToolInputError: If the `providers` list format is invalid.
        ToolError: For other internal errors during setup.
                 Individual provider errors are captured within the "results" dictionary.
    """
    start_time = time.time()
    
    # Validate providers format
    if not isinstance(providers, list) or not all(isinstance(p, dict) and 'provider' in p for p in providers):
        raise ToolInputError(
            "Invalid providers format. Must be a list of dictionaries, each with at least a 'provider' key.",
            param_name="providers",
            provided_value=providers
        )
        
    results = {}
    tasks = []
    semaphore = asyncio.Semaphore(max_concurrency)
    total_cost = 0.0
    successful_requests = 0
    failed_requests = 0
    successful_times = []

    async def process_provider(provider_config):
        nonlocal total_cost, successful_requests, failed_requests, successful_times
        provider_name = provider_config.get("provider")
        model_name = provider_config.get("model")
        # Create a unique key for results dictionary, handling cases where model might be None initially
        result_key = f"{provider_name}/{model_name or 'default'}"
        
        async with semaphore:
            provider_start_time = time.time()
            error_message = None
            result_data = None
            actual_model_used = model_name # Store the actual model reported by the result

            try:
                # Extract specific params for generate_completion
                completion_params = {k: v for k, v in provider_config.items() if k not in ["provider"]}
                completion_params["prompt"] = prompt # Add the common prompt
                
                logger.debug(f"Calling generate_completion for {provider_name} / {model_name or 'default'}...")
                
                # Call generate_completion with timeout
                completion_task = generate_completion(provider=provider_name, **completion_params)
                result_data = await asyncio.wait_for(completion_task, timeout=timeout)
                
                provider_processing_time = time.time() - provider_start_time
                
                if result_data and result_data.get("success"):
                    cost = result_data.get("cost", 0.0)
                    total_cost += cost
                    successful_requests += 1
                    successful_times.append(provider_processing_time)
                    actual_model_used = result_data.get("model") # Get the actual model used
                    logger.info(f"Success from {result_key} in {provider_processing_time:.2f}s")
                else:
                    failed_requests += 1
                    error_message = result_data.get("error", "Unknown error during completion") if isinstance(result_data, dict) else "Invalid result format"
                    logger.warning(f"Failure from {result_key}: {error_message}")

            except asyncio.TimeoutError:
                provider_processing_time = time.time() - provider_start_time
                failed_requests += 1
                error_message = f"Request timed out after {timeout:.1f} seconds"
                logger.warning(f"Timeout for {result_key} after {timeout:.1f}s")
            except ProviderError as pe:
                 provider_processing_time = time.time() - provider_start_time
                 failed_requests += 1
                 error_message = f"ProviderError: {str(pe)}"
                 logger.warning(f"ProviderError for {result_key}: {str(pe)}")
                 actual_model_used = pe.model # Get model from exception if available
            except Exception as e:
                provider_processing_time = time.time() - provider_start_time
                failed_requests += 1
                error_message = f"Unexpected error: {type(e).__name__}: {str(e)}"
                logger.error(f"Unexpected error for {result_key}: {e}", exc_info=True)

            # Store result or error
            # Use the potentially updated result_key
            results[result_key] = {
                "text": result_data.get("text") if result_data else None,
                "model": actual_model_used, # Use the actual model name from result or exception
                "provider": provider_name,
                "tokens": result_data.get("tokens") if result_data else None,
                "cost": result_data.get("cost", 0.0) if result_data else 0.0,
                "processing_time": provider_processing_time,
                "success": error_message is None,
                "error": error_message
            }

    # Create tasks
    for config in providers:
        task = asyncio.create_task(process_provider(config))
        tasks.append(task)

    # Wait for all tasks to complete
    await asyncio.gather(*tasks)

    total_processing_time = time.time() - start_time
    average_processing_time = sum(successful_times) / len(successful_times) if successful_times else 0.0
    
    logger.info(
        f"Multi-completion finished. Success: {successful_requests}, Failed: {failed_requests}, Total Cost: ${total_cost:.6f}, Total Time: {total_processing_time:.2f}s",
        emoji_key="info"
    )

    return {
        "results": results,
        "aggregate_stats": {
            "total_requests": len(providers),
            "successful_requests": successful_requests,
            "failed_requests": failed_requests,
            "total_cost": total_cost,
            "total_processing_time": total_processing_time,
            "average_processing_time": average_processing_time
        }
        # Note: cached_result is added by decorator
    }
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/html_to_markdown.py:
--------------------------------------------------------------------------------

```python
"""HTML to Markdown conversion tools for Ultimate MCP Server."""
import re
import time
from typing import Any, Dict, List

import html2text
import readability
import trafilatura
from bs4 import BeautifulSoup
from markdownify import markdownify as md

from ultimate_mcp_server.exceptions import ToolInputError
from ultimate_mcp_server.tools.base import with_error_handling, with_tool_metrics
from ultimate_mcp_server.utils import get_logger

logger = get_logger("ultimate_mcp_server.tools.html_to_markdown")

# --- Helper Functions ---

def _is_html_fragment(text: str) -> bool:
    """Detect if text is likely an HTML fragment.
    
    Args:
        text: Input text to check
        
    Returns:
        bool: True if the text appears to be HTML, False otherwise
    """
    # Simple heuristics to check if the text contains HTML
    html_patterns = [
        r"<\s*[a-zA-Z]+[^>]*>",  # Basic HTML tag pattern
        r"<\s*/\s*[a-zA-Z]+\s*>",  # Closing HTML tag
        r"&[a-zA-Z]+;",  # HTML entities
        r"<!\s*DOCTYPE",  # DOCTYPE declaration
        r"<!\s*--",  # HTML comment
        r"style\s*=\s*['\"]",  # style attribute
        r"class\s*=\s*['\"]",  # class attribute
        r"id\s*=\s*['\"]",  # id attribute
        r"href\s*=\s*['\"]",  # href attribute
        r"src\s*=\s*['\"]",  # src attribute
    ]
    
    # Check if the text matches any of the patterns
    for pattern in html_patterns:
        if re.search(pattern, text, re.IGNORECASE):
            return True
    
    return False

def _clean_html_with_beautifulsoup(html: str) -> str:
    """Clean HTML using BeautifulSoup.
    
    Args:
        html: HTML content to clean
        
    Returns:
        Cleaned HTML string with unwanted elements removed
    """
    try:
        soup = BeautifulSoup(html, 'html.parser')
        
        # Remove unwanted elements
        for element in soup(['script', 'style', 'svg', 'iframe', 'canvas', 'noscript']):
            element.decompose()
        
        # Remove base64 data attributes and other potentially problematic attributes
        for tag in soup.find_all(True):
            for attr in list(tag.attrs):
                # Clean data URLs
                if attr == 'src' and isinstance(tag.attrs[attr], str) and 'data:' in tag.attrs[attr]:
                    del tag.attrs[attr]
                # Remove other problematic attributes
                elif attr.startswith('on') or attr == 'style' or attr.startswith('data-'):
                    del tag.attrs[attr]
        
        return str(soup)
    except Exception as e:
        logger.warning(f"Error cleaning HTML with BeautifulSoup: {str(e)}")
        # If BeautifulSoup fails, return the original HTML
        return html

def _html_to_markdown_with_html2text(html: str) -> str:
    """Convert HTML to Markdown using html2text.
    
    Args:
        html: HTML content to convert
        
    Returns:
        Markdown formatted text
    """
    try:
        h = html2text.HTML2Text()
        h.ignore_links = False
        h.ignore_images = False
        h.ignore_tables = False
        h.unicode_snob = True  # Use Unicode instead of ASCII
        h.body_width = 0  # No wrapping
        
        return h.handle(html)
    except Exception as e:
        logger.warning(f"Error converting HTML to Markdown with html2text: {str(e)}")
        # If html2text fails, try a simpler approach
        return html

def _html_to_markdown_with_markdownify(html: str) -> str:
    """Convert HTML to Markdown using markdownify.
    
    Args:
        html: HTML content to convert
        
    Returns:
        Markdown formatted text
    """
    try:
        return md(html, heading_style="ATX")
    except Exception as e:
        logger.warning(f"Error converting HTML to Markdown with markdownify: {str(e)}")
        # If markdownify fails, try a simpler approach
        return html

def _extract_content_with_readability(html: str) -> str:
    """Extract main content from HTML using readability.
    
    Args:
        html: HTML content to process
        
    Returns:
        HTML string containing only the main content
    """
    try:
        doc = readability.Document(html)
        content = doc.summary()
        return content
    except Exception as e:
        logger.warning(f"Error extracting content with readability: {str(e)}")
        # If readability fails, return the original HTML
        return html

def _extract_content_with_trafilatura(html: str) -> str:
    """Extract main content from HTML using trafilatura.
    
    Args:
        html: HTML content to process
        
    Returns:
        Extracted text content
    """
    try:
        extracted_text = trafilatura.extract(html, include_comments=False, include_tables=True)
        if extracted_text:
            return extracted_text
        # Fall back to HTML extraction if text extraction fails
        extracted_html = trafilatura.extract(html, output_format='html', include_comments=False, include_tables=True)
        return extracted_html or html
    except Exception as e:
        logger.warning(f"Error extracting content with trafilatura: {str(e)}")
        # If trafilatura fails, return the original HTML
        return html

def _sanitize_markdown(markdown: str) -> str:
    """Clean up and format the markdown to be more readable.
    
    Args:
        markdown: Markdown text to sanitize
        
    Returns:
        Cleaned markdown text
    """
    # Fix excessive newlines (more than 2 consecutive)
    sanitized = re.sub(r'\n{3,}', '\n\n', markdown)
    
    # Fix list item spacing
    sanitized = re.sub(r'(\n[*-].*\n)(?!\n)', r'\1\n', sanitized)
    
    # Remove trailing whitespace from lines
    sanitized = re.sub(r' +$', '', sanitized, flags=re.MULTILINE)
    
    # Fix markdown heading formatting (ensure space after #)
    sanitized = re.sub(r'(^|\n)(#{1,6})([^#\s])', r'\1\2 \3', sanitized)
    
    # Fix code block formatting
    sanitized = re.sub(r'```\s*\n', '```\n', sanitized)
    sanitized = re.sub(r'\n\s*```', '\n```', sanitized)
    
    # Ensure proper code block syntax (start with language or leave empty)
    sanitized = re.sub(r'```([^a-zA-Z\s\n][^`\n]*)$', '```\n\\1', sanitized, flags=re.MULTILINE)
    
    # Normalize list indicators (consistent use of - or * for unordered lists)
    sanitized = re.sub(r'^[*+] ', '- ', sanitized, flags=re.MULTILINE)
    
    return sanitized

def _improve_markdown_formatting(markdown: str) -> str:
    """Improve the formatting of the markdown to make it more readable.
    
    Args:
        markdown: Markdown text to improve
        
    Returns:
        Improved markdown text
    """
    # Ensure proper spacing for headings
    improved = re.sub(r'(\n#{1,6}[^\n]+)(\n[^\n#])', r'\1\n\2', markdown)
    
    # Ensure paragraphs have proper spacing
    improved = re.sub(r'(\n[^\s#>*-][^\n]+)(\n[^\s#>*-])', r'\1\n\2', improved)
    
    # Fix blockquote formatting
    improved = re.sub(r'(\n>[ ][^\n]+)(\n[^>\s])', r'\1\n\2', improved)
    
    # Fix nested list formatting
    improved = re.sub(r'(\n[ ]{2,}[*-][ ][^\n]+)(\n[^\s*-])', r'\1\n\2', improved)
    
    # Add horizontal rules for clear section breaks (if large content gaps exist)
    improved = re.sub(r'\n\n\n\n+', '\n\n---\n\n', improved)
    
    return improved

def _convert_html_tables_to_markdown(html: str) -> str:
    """Specifically handle HTML tables and convert them to markdown tables.
    
    Args:
        html: HTML content with tables to convert
        
    Returns:
        Markdown text with properly formatted tables
    """
    try:
        soup = BeautifulSoup(html, 'html.parser')
        tables = soup.find_all('table')
        
        # If no tables, return original HTML
        if not tables:
            return html
        
        for table in tables:
            rows = table.find_all('tr')
            if not rows:
                continue
                
            markdown_table = []
            
            # Process header row
            header_cells = rows[0].find_all(['th', 'td'])
            if header_cells:
                header_row = '| ' + ' | '.join([cell.get_text().strip() for cell in header_cells]) + ' |'
                markdown_table.append(header_row)
                
                # Add separator row
                separator_row = '| ' + ' | '.join(['---' for _ in header_cells]) + ' |'
                markdown_table.append(separator_row)
            
            # Process data rows
            for row in rows[1:]:
                cells = row.find_all('td')
                if cells:
                    data_row = '| ' + ' | '.join([cell.get_text().strip() for cell in cells]) + ' |'
                    markdown_table.append(data_row)
            
            # Replace the table with its markdown equivalent
            table_html = str(table)
            table_markdown = '\n'.join(markdown_table)
            html = html.replace(table_html, table_markdown)
    
        return html
        
    except Exception as e:
        logger.warning(f"Error converting HTML tables to Markdown: {str(e)}")
        # If conversion fails, return the original HTML
        return html

# --- Main Tool Function ---

@with_tool_metrics
@with_error_handling
async def clean_and_format_text_as_markdown(
    text: str,
    force_markdown_conversion: bool = False,
    extraction_method: str = "auto",
    preserve_tables: bool = True,
    preserve_links: bool = True,
    preserve_images: bool = False,
    max_line_length: int = 0  # 0 means no wrapping
) -> Dict[str, Any]:
    """Converts plain text or HTML to clean, well-formatted markdown.
    
    Automatically detects if input is HTML, then cleans and converts it.
    For non-HTML text, it applies minimal formatting to create valid markdown.
    
    Args:
        text: The input text to clean and format (plain text or HTML).
        force_markdown_conversion: Whether to force markdown conversion even if the text doesn't
                                  look like HTML. Default is False.
        extraction_method: Method to extract content from HTML. Options:
                          - "auto": Automatically choose the best method
                          - "readability": Use Mozilla's Readability algorithm
                          - "trafilatura": Use trafilatura library
                          - "raw": Don't extract main content, convert the whole document
                          Default is "auto".
        preserve_tables: Whether to preserve and convert HTML tables to markdown tables.
                        Default is True.
        preserve_links: Whether to preserve and convert HTML links to markdown links.
                       Default is True.
        preserve_images: Whether to preserve and convert HTML images to markdown image syntax.
                        Default is False.
        max_line_length: Maximum line length for text wrapping. 0 means no wrapping.
                        Default is 0.
    
    Returns:
        Dictionary containing:
        {
            "markdown_text": "Cleaned and formatted markdown text",
            "was_html": true,  # Whether the input was detected as HTML
            "extraction_method_used": "readability",  # Which extraction method was used
            "processing_time": 0.35,  # Time taken in seconds
            "success": true
        }
    
    Raises:
        ToolInputError: If the input text is empty or not a string.
    """
    start_time = time.time()
    
    # Input validation
    if not text:
        raise ToolInputError("Input text cannot be empty")
    if not isinstance(text, str):
        raise ToolInputError("Input text must be a string")
    
    # Determine if input is HTML
    is_html = _is_html_fragment(text) or force_markdown_conversion
    
    # Process based on content type
    if is_html:
        logger.info("Input detected as HTML, processing for conversion to markdown")
        
        # Convert HTML tables to markdown before main processing
        if preserve_tables:
            text = _convert_html_tables_to_markdown(text)
        
        # Extract main content based on specified method
        extraction_method_used = extraction_method
        if extraction_method == "auto":
            # If the text is a small fragment, use raw conversion
            if len(text) < 1000:
                extraction_method_used = "raw"
            else:
                # Try trafilatura first, fallback to readability
                try:
                    extracted = _extract_content_with_trafilatura(text)
                    if extracted and len(extracted) > 0.2 * len(text):  # Ensure we got meaningful extraction
                        text = extracted
                        extraction_method_used = "trafilatura"
                    else:
                        text = _extract_content_with_readability(text)
                        extraction_method_used = "readability"
                except Exception:
                    text = _extract_content_with_readability(text)
                    extraction_method_used = "readability"
        elif extraction_method == "readability":
            text = _extract_content_with_readability(text)
        elif extraction_method == "trafilatura":
            text = _extract_content_with_trafilatura(text)
        # For "raw", we use the text as is
        
        # Clean HTML before conversion
        text = _clean_html_with_beautifulsoup(text)
        
        # Set up conversion options based on parameters
        h = html2text.HTML2Text()
        h.ignore_links = not preserve_links
        h.ignore_images = not preserve_images
        h.ignore_tables = not preserve_tables
        h.body_width = max_line_length
        h.unicode_snob = True
        
        # Try multiple conversion methods and use the best result
        try:
            markdown_text = h.handle(text)
            
            # Fallback to markdownify if html2text result looks problematic
            if '&lt;' in markdown_text or '&gt;' in markdown_text or len(markdown_text.strip()) < 100 and len(text) > 500:
                try:
                    alternative = _html_to_markdown_with_markdownify(text)
                    if len(alternative.strip()) > len(markdown_text.strip()):
                        markdown_text = alternative
                except Exception:
                    pass
        except Exception as e:
            logger.warning(f"Primary markdown conversion failed: {str(e)}")
            try:
                markdown_text = _html_to_markdown_with_markdownify(text)
            except Exception:
                # Last resort: strip tags and return plain text
                markdown_text = re.sub(r'<[^>]*>', '', text)
    else:
        logger.info("Input detected as plain text, applying minimal markdown formatting")
        # For plain text, just clean it up a bit
        markdown_text = text
        extraction_method_used = "none"
    
    # Final cleanup and formatting of the markdown
    markdown_text = _sanitize_markdown(markdown_text)
    markdown_text = _improve_markdown_formatting(markdown_text)
    
    processing_time = time.time() - start_time
    logger.info(f"Text cleaned and formatted as markdown in {processing_time:.2f}s")
    
    return {
        "markdown_text": markdown_text,
        "was_html": is_html,
        "extraction_method_used": extraction_method_used,
        "processing_time": processing_time,
        "success": True
    }

# --- Additional Tool Functions ---

@with_tool_metrics
@with_error_handling
async def detect_content_type(text: str) -> Dict[str, Any]:
    """Analyzes text to detect its type: HTML, markdown, code, or plain text.
    
    Applies multiple heuristics to determine the most likely content type
    of the provided text string.
    
    Args:
        text: The input text to analyze
    
    Returns:
        Dictionary containing:
        {
            "content_type": "html",  # One of: "html", "markdown", "code", "plain_text"
            "confidence": 0.85,  # Confidence score (0.0-1.0)
            "details": {
                "html_markers": 12,  # Count of HTML markers found
                "markdown_markers": 3,  # Count of markdown markers found
                "code_markers": 1,  # Count of code markers found
                "detected_language": "javascript"  # If code is detected
            },
            "success": true
        }
    
    Raises:
        ToolInputError: If the input text is empty or not a string.
    """
    if not text:
        raise ToolInputError("Input text cannot be empty")
    if not isinstance(text, str):
        raise ToolInputError("Input text must be a string")
    
    # Initialize counters for markers
    html_markers = 0
    markdown_markers = 0
    code_markers = 0
    detected_language = None
    
    # Check for HTML markers
    html_patterns = [
        (r"<\s*[a-zA-Z]+[^>]*>", 1),  # HTML tag
        (r"<\s*/\s*[a-zA-Z]+\s*>", 1),  # Closing HTML tag
        (r"&[a-zA-Z]+;", 0.5),  # HTML entity
        (r"<!\s*DOCTYPE", 2),  # DOCTYPE
        (r"<!\s*--", 1),  # HTML comment
        (r"<!--.*?-->", 1),  # Complete HTML comment
        (r"<(div|span|p|a|img|table|ul|ol|li|h[1-6])\b", 1.5),  # Common HTML tags
        (r"</(div|span|p|a|img|table|ul|ol|li|h[1-6])>", 1.5),  # Common closing tags
        (r"<(html|head|body|meta|link|script|style)\b", 2),  # Structure tags
        (r"</(html|head|body|script|style)>", 2),  # Structure closing tags
        (r"style\s*=\s*['\"]", 1),  # style attribute
        (r"class\s*=\s*['\"]", 1),  # class attribute
        (r"id\s*=\s*['\"]", 1),  # id attribute
        (r"href\s*=\s*['\"]", 1),  # href attribute
        (r"src\s*=\s*['\"]", 1)  # src attribute
    ]
    
    for pattern, weight in html_patterns:
        matches = re.findall(pattern, text, re.IGNORECASE)
        html_markers += len(matches) * weight
    
    # Check for Markdown markers
    markdown_patterns = [
        (r"^#\s+.+$", 2),  # Heading level 1
        (r"^#{2,6}\s+.+$", 1.5),  # Headings levels 2-6
        (r"^\s*[*-]\s+.+$", 1),  # Unordered list
        (r"^\s*\d+\.\s+.+$", 1),  # Ordered list
        (r"^\s*>\s+.+$", 1.5),  # Blockquote
        (r"\[.+?\]\(.+?\)", 2),  # Link
        (r"!\[.+?\]\(.+?\)", 2),  # Image
        (r"`[^`\n]+`", 1),  # Inline code
        (r"^```\s*\w*$", 2),  # Code block start
        (r"^```$", 2),  # Code block end
        (r"\*\*.+?\*\*", 1),  # Bold
        (r"\*.+?\*", 0.5),  # Italic
        (r"__(.+?)__", 1),  # Bold with underscore
        (r"_(.+?)_", 0.5),  # Italic with underscore
        (r"~~.+?~~", 1),  # Strikethrough
        (r"^\s*[-*_]{3,}\s*$", 1.5),  # Horizontal rule
        (r"^\s*\|(.+\|)+\s*$", 2),  # Table row
        (r"^\s*\|([-:]+\|)+\s*$", 3)  # Table header/divider
    ]
    
    for pattern, weight in markdown_patterns:
        matches = re.findall(pattern, text, re.MULTILINE)
        markdown_markers += len(matches) * weight
    
    # Check for code markers
    code_patterns = [
        (r"function\s+\w+\s*\(.*?\)\s*\{", 2),  # Function declaration
        (r"(var|let|const)\s+\w+\s*=", 1.5),  # Variable declaration JS
        (r"if\s*\(.*?\)\s*\{", 1),  # If statement
        (r"for\s*\(.*?;.*?;.*?\)\s*\{", 2),  # For loop
        (r"while\s*\(.*?\)\s*\{", 2),  # While loop
        (r"class\s+\w+(\s+extends\s+\w+)?\s*\{", 2),  # Class declaration
        (r"import\s+.*?from\s+['\"].*?['\"]", 2),  # ES6 Import
        (r"def\s+\w+\s*\(.*?\):", 2),  # Python function
        (r"class\s+\w+(\(\w+\))?:", 2),  # Python class
        (r"import\s+\w+(\s+as\s+\w+)?", 1.5),  # Python import
        (r"from\s+\w+(\.\w+)*\s+import", 1.5),  # Python from import
        (r"public\s+(static\s+)?(void|int|String)\s+\w+\s*\(", 2),  # Java method
        (r"#include\s*<.*?>", 2),  # C/C++ include
        (r"^\s*package\s+[\w\.]+;", 2),  # Java/Kotlin package
        (r"^\s*using\s+[\w\.]+;", 2),  # C# using
        (r"^\s*(public|private|protected)\s+class", 2)  # Access modifier
    ]
    
    for pattern, weight in code_patterns:
        matches = re.findall(pattern, text, re.MULTILINE)
        code_markers += len(matches) * weight
    
    # Detect programming language if it looks like code
    if code_markers > 5:
        # Very basic language detection based on unique syntax
        language_patterns = [
            (r"function\s+\w+|var\s+\w+|let\s+\w+|const\s+\w+|document\.|\$\(", "javascript"),
            (r"<\?php|\$[a-zA-Z_]", "php"),
            (r"def\s+\w+\s*\(.*?\):|import\s+\w+|from\s+\w+\s+import", "python"),
            (r"public\s+class\s+\w+|public\s+static\s+void\s+main", "java"),
            (r"#include\s*<.*?>|int\s+main\s*\(", "c/c++"),
            (r"^\s*using\s+System;|namespace\s+\w+|public\s+class\s+\w+\s*:", "c#"),
            (r"module\s+\w+|fn\s+\w+|let\s+\w+|impl", "rust"),
            (r"^\s*import\s+\w+\s+from\s+['\"]|export\s+(default\s+)?", "typescript"),
            (r"^package\s+main|func\s+\w+\(|import\s+\([^)]*\)", "go")
        ]
        
        for pattern, lang in language_patterns:
            if re.search(pattern, text, re.MULTILINE | re.IGNORECASE):
                detected_language = lang
                break
    
    # Calculate final scores and confidence
    html_score = html_markers / max(len(text) / 100, 1)
    markdown_score = markdown_markers / max(len(text.split('\n')), 1)
    code_score = code_markers / max(len(text.split('\n')), 1)
    
    # Plain text has no specific markers, so it's the default fallback
    plain_text_score = 1.0 - max(min(html_score / 10, 1), min(markdown_score / 5, 1), min(code_score / 5, 1))
    
    # Determine the content type
    scores = {
        "html": html_score,
        "markdown": markdown_score,
        "code": code_score,
        "plain_text": plain_text_score
    }
    
    content_type = max(scores, key=scores.get)
    max_score = scores[content_type]
    
    # Calculate confidence based on how dominant the max score is
    total_score = sum(scores.values())
    if total_score > 0:
        confidence = max_score / total_score
    else:
        confidence = 0.25  # Equal probability for all types
    
    # Adjust confidence if very few markers were found
    if content_type != "plain_text" and (html_markers + markdown_markers + code_markers) < 3:
        confidence *= 0.7
    
    return {
        "content_type": content_type,
        "confidence": min(confidence, 1.0),
        "details": {
            "html_markers": html_markers,
            "markdown_markers": markdown_markers,
            "code_markers": code_markers,
            "detected_language": detected_language if content_type == "code" else None
        },
        "success": True
    }

@with_tool_metrics
@with_error_handling
async def batch_format_texts(
    texts: List[str],
    force_markdown_conversion: bool = False,
    extraction_method: str = "auto",
    max_concurrency: int = 5,
    preserve_tables: bool = True
) -> Dict[str, Any]:
    """Processes multiple text inputs in parallel, converting each to markdown.
    
    Efficiently handles a batch of text inputs by processing them concurrently
    up to a specified concurrency limit.
    
    Args:
        texts: List of text strings to clean and format.
        force_markdown_conversion: Whether to force markdown conversion for all inputs.
                                  Default is False.
        extraction_method: Method to extract content from HTML. Options:
                          - "auto": Automatically choose the best method
                          - "readability": Use Mozilla's Readability algorithm
                          - "trafilatura": Use trafilatura library
                          - "raw": Don't extract main content, convert the whole document
                          Default is "auto".
        max_concurrency: Maximum number of texts to process simultaneously.
                        Default is 5.
        preserve_tables: Whether to preserve and convert HTML tables to markdown tables.
                        Default is True.
    
    Returns:
        Dictionary containing:
        {
            "results": [
                {
                    "markdown_text": "Cleaned and formatted markdown text",
                    "was_html": true,
                    "extraction_method_used": "readability"
                },
                ...
            ],
            "total_processing_time": 2.45,  # Total time in seconds
            "success_count": 5,  # Number of successfully processed texts
            "failure_count": 0,  # Number of failed texts
            "success": true
        }
    
    Raises:
        ToolInputError: If the input list is empty or not a list of strings.
    """
    import asyncio
    
    start_time = time.time()
    
    # Input validation
    if not texts:
        raise ToolInputError("Input texts list cannot be empty")
    if not isinstance(texts, list):
        raise ToolInputError("Input must be a list of text strings")
    
    # Set up concurrency control
    semaphore = asyncio.Semaphore(max_concurrency)
    
    async def process_text(text, index):
        """Process a single text with semaphore control."""
        async with semaphore:
            try:
                result = await clean_and_format_text_as_markdown(
                    text=text,
                    force_markdown_conversion=force_markdown_conversion,
                    extraction_method=extraction_method,
                    preserve_tables=preserve_tables
                )
                result["index"] = index  # Add original index for ordering
                return result
            except Exception as e:
                logger.error(f"Error processing text at index {index}: {str(e)}")
                return {
                    "index": index,
                    "error": str(e),
                    "success": False
                }
    
    # Process all texts concurrently
    tasks = [process_text(text, i) for i, text in enumerate(texts)]
    results = await asyncio.gather(*tasks)
    
    # Sort results by original index
    sorted_results = sorted(results, key=lambda x: x.get("index", 0))
    
    # Remove index from results
    for result in sorted_results:
        if "index" in result:
            del result["index"]
    
    # Calculate statistics
    success_count = sum(1 for result in sorted_results if result.get("success", False))
    failure_count = len(sorted_results) - success_count
    total_time = time.time() - start_time
    
    return {
        "results": sorted_results,
        "total_processing_time": total_time,
        "success_count": success_count,
        "failure_count": failure_count,
        "success": True
    }

@with_tool_metrics
@with_error_handling
async def optimize_markdown_formatting(
    markdown: str,
    normalize_headings: bool = False,
    fix_lists: bool = True,
    fix_links: bool = True,
    add_line_breaks: bool = True,
    compact_mode: bool = False,
    max_line_length: int = 0
) -> Dict[str, Any]:
    """Optimizes and improves the formatting of existing markdown text.
    
    Takes markdown text and enhances its formatting by fixing common issues
    and applying stylistic improvements.
    
    Args:
        markdown: The markdown text to optimize.
        normalize_headings: If True, ensures heading levels start at h1 and are sequential.
                           Default is False.
        fix_lists: If True, fixes common issues with list formatting.
                  Default is True.
        fix_links: If True, fixes common issues with link formatting.
                  Default is True.
        add_line_breaks: If True, ensures proper paragraph breaks.
                        Default is True.
        compact_mode: If True, reduces whitespace for a more compact presentation.
                     Default is False.
        max_line_length: Maximum line length for wrapping. 0 means no wrapping.
                        Default is 0.
    
    Returns:
        Dictionary containing:
        {
            "optimized_markdown": "Cleaned and formatted markdown text",
            "changes_made": {
                "headings_normalized": true,
                "lists_fixed": true,
                "links_fixed": true,
                "line_breaks_added": true
            },
            "processing_time": 0.15,  # Time taken in seconds
            "success": true
        }
    
    Raises:
        ToolInputError: If the input markdown is empty or not a string.
    """
    import re
    
    start_time = time.time()
    
    # Input validation
    if not markdown:
        raise ToolInputError("Input markdown cannot be empty")
    if not isinstance(markdown, str):
        raise ToolInputError("Input markdown must be a string")
    
    # Track changes made
    changes_made = {
        "headings_normalized": False,
        "lists_fixed": False,
        "links_fixed": False,
        "line_breaks_added": False,
        "whitespace_adjusted": False
    }
    
    optimized = markdown
    
    # Fix markdown heading formatting (ensure space after #)
    if "#" in optimized:
        original = optimized
        optimized = re.sub(r'(^|\n)(#{1,6})([^#\s])', r'\1\2 \3', optimized)
        changes_made["headings_normalized"] = original != optimized
    
    # Normalize heading levels if requested
    if normalize_headings and "#" in optimized:
        original = optimized
        
        # Find all headings and their levels
        heading_pattern = r'(^|\n)(#{1,6})\s+(.*?)(\n|$)'
        headings = [(m.group(2), m.group(3), m.start(), m.end()) 
                    for m in re.finditer(heading_pattern, optimized)]
        
        if headings:
            # Find the minimum heading level used
            min_level = min(len(h[0]) for h in headings)
            
            # Adjust heading levels if the minimum isn't h1
            if min_level > 1:
                # Process headings in reverse order to avoid messing up positions
                for level, text, start, end in reversed(headings):
                    new_level = '#' * (len(level) - min_level + 1)
                    replacement = f"{optimized[start:start+1]}{new_level} {text}{optimized[end-1:end]}"
                    optimized = optimized[:start] + replacement + optimized[end:]
                
                changes_made["headings_normalized"] = True
    
    # Fix list formatting
    if fix_lists and any(c in optimized for c in ['-', '*', '+']):
        original = optimized
        
        # Ensure consistent list markers
        optimized = re.sub(r'^([*+]) ', r'- ', optimized, flags=re.MULTILINE)
        
        # Fix list item spacing
        optimized = re.sub(r'(\n- .+)(\n[^-\s])', r'\1\n\2', optimized)
        
        # Fix indentation in nested lists
        optimized = re.sub(r'(\n- .+\n)(\s{1,3}- )', r'\1  \2', optimized)
        
        changes_made["lists_fixed"] = original != optimized
    
    # Fix link formatting
    if fix_links and "[" in optimized:
        original = optimized
        
        # Fix reference-style links (ensure consistent spacing)
        optimized = re.sub(r'\]\[', r'] [', optimized)
        
        # Fix malformed links with space between []()
        optimized = re.sub(r'\] \(', r'](', optimized)
        
        # Ensure proper spacing around links in sentences
        optimized = re.sub(r'([^\s])\[', r'\1 [', optimized)
        optimized = re.sub(r'\]([^\(\s])', r'] \1', optimized)
        
        changes_made["links_fixed"] = original != optimized
    
    # Add proper line breaks for readability
    if add_line_breaks:
        original = optimized
        
        # Ensure headings have a blank line before (except at start of document)
        optimized = re.sub(r'(?<!\n\n)(^|\n)#', r'\1\n#', optimized)
        
        # Ensure paragraphs have blank lines between them
        optimized = re.sub(r'(\n[^\s#>*-][^\n]+)(\n[^\s#>*-])', r'\1\n\2', optimized)
        
        # Clean up any excessive blank lines created
        optimized = re.sub(r'\n{3,}', r'\n\n', optimized)
        
        changes_made["line_breaks_added"] = original != optimized
    
    # Adjust whitespace based on compact_mode
    original = optimized
    if compact_mode:
        # Reduce blank lines to single blank lines
        optimized = re.sub(r'\n\s*\n', r'\n\n', optimized)
        
        # Remove trailing whitespace
        optimized = re.sub(r' +$', '', optimized, flags=re.MULTILINE)
    else:
        # Ensure consistent double line breaks for section transitions
        optimized = re.sub(r'(\n#{1,6}[^\n]+\n)(?!\n)', r'\1\n', optimized)
    
    changes_made["whitespace_adjusted"] = original != optimized
    
    # Apply line wrapping if specified
    if max_line_length > 0:
        import textwrap
        
        # Split into paragraphs, wrap each, then rejoin
        paragraphs = re.split(r'\n\s*\n', optimized)
        wrapped_paragraphs = []
        
        for p in paragraphs:
            # Skip wrapping for code blocks, lists, and headings
            if (p.strip().startswith("```") or
                re.match(r'^\s*[*\-+]', p, re.MULTILINE) or
                re.match(r'^#{1,6}\s', p.strip())):
                wrapped_paragraphs.append(p)
            else:
                # Wrap regular paragraphs
                lines = p.split('\n')
                wrapped_lines = []
                for line in lines:
                    if not line.strip().startswith(('>', '#', '-', '*', '+')):
                        wrapped = textwrap.fill(line, width=max_line_length)
                        wrapped_lines.append(wrapped)
                    else:
                        wrapped_lines.append(line)
                wrapped_paragraphs.append('\n'.join(wrapped_lines))
        
        optimized = '\n\n'.join(wrapped_paragraphs)
    
    processing_time = time.time() - start_time
    
    return {
        "optimized_markdown": optimized,
        "changes_made": changes_made,
        "processing_time": processing_time,
        "success": True
    }
```

--------------------------------------------------------------------------------
/examples/advanced_extraction_demo.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
"""Demo of advanced extraction capabilities using Ultimate MCP Server."""
import asyncio
import json
import os
import re
import sys
import time
from pathlib import Path

# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))

from rich.panel import Panel
from rich.rule import Rule
from rich.syntax import Syntax
from rich.traceback import Traceback

from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.providers.base import get_provider
from ultimate_mcp_server.utils import get_logger
from ultimate_mcp_server.utils.display import CostTracker, parse_and_display_result
from ultimate_mcp_server.utils.logging.console import console
from ultimate_mcp_server.utils.parsing import extract_json_from_markdown

# --- Debug Flag ---
USE_DEBUG_LOGS = True # Set to True to enable detailed logging
# ------------------

# Initialize logger
logger = get_logger("example.advanced_extraction")
logger.set_level("debug")

# Configure the OpenAI client for direct extraction demos
async def setup_openai_provider():
    """Set up an OpenAI provider for demonstration."""
    try:
        logger.info("Initializing OpenAI for demonstration", emoji_key="start")
        
        # Get OpenAI provider - get_provider will return None if key missing/invalid in config
        provider = await get_provider(Provider.OPENAI.value)
        if not provider: 
             logger.error("Failed to get OpenAI provider. Is the OPENAI_API_KEY configured correctly in your environment/config?")
             return None
             
        logger.success("OpenAI provider initialized successfully.")
        return provider
    except Exception as e:
        logger.error(f"Failed to initialize OpenAI provider: {e}", emoji_key="error")
        return None

async def run_json_extraction_example(provider, tracker: CostTracker):
    """Demonstrate JSON extraction."""
    if USE_DEBUG_LOGS:
        logger.debug("Entering run_json_extraction_example.")
    if not provider:
        console.print("[yellow]Skipping JSON extraction demo - no provider available.[/yellow]")
        if USE_DEBUG_LOGS:
            logger.debug("Exiting run_json_extraction_example (no provider).")
        return
        
    console.print(Rule("[bold blue]1. JSON Extraction Example[/bold blue]"))
    
    # Load sample text
    sample_path = Path(__file__).parent / "data" / "sample_event.txt"
    if not sample_path.exists():
        # Create a sample text for demonstration
        sample_text = """
        Tech Conference 2024
        Location: San Francisco Convention Center, 123 Tech Blvd, San Francisco, CA 94103
        Date: June 15-17, 2024
        Time: 9:00 AM - 6:00 PM daily
        
        Registration Fee: $599 (Early Bird: $499 until March 31)
        
        Keynote Speakers:
        - Dr. Sarah Johnson, AI Research Director at TechCorp
        - Mark Williams, CTO of FutureTech Industries
        - Prof. Emily Chen, MIT Computer Science Department
        
        Special Events:
        - Networking Reception: June 15, 7:00 PM - 10:00 PM
        - Hackathon: June 16, 9:00 PM - 9:00 AM (overnight)
        - Career Fair: June 17, 1:00 PM - 5:00 PM
        
        For more information, contact [email protected] or call (555) 123-4567.
        """
        # Ensure the data directory exists
        os.makedirs(os.path.dirname(sample_path), exist_ok=True)
        # Write sample text to file
        with open(sample_path, "w") as f:
            f.write(sample_text)
    else:
        # Read existing sample text
        with open(sample_path, "r") as f:
            sample_text = f.read()
    
    # Display sample text
    console.print(Panel(sample_text, title="Sample Event Text", border_style="blue"))
    
    # Define JSON schema for event
    event_schema = {
        "type": "object",
        "properties": {
            "name": {"type": "string", "description": "Event name"},
            "location": {
                "type": "object",
                "properties": {
                    "venue": {"type": "string"},
                    "address": {"type": "string"},
                    "city": {"type": "string"},
                    "state": {"type": "string"},
                    "zip": {"type": "string"}
                }
            },
            "dates": {
                "type": "object",
                "properties": {
                    "start": {"type": "string", "format": "date"},
                    "end": {"type": "string", "format": "date"}
                }
            },
            "time": {"type": "string"},
            "registration": {
                "type": "object",
                "properties": {
                    "regular_fee": {"type": "number"},
                    "early_bird_fee": {"type": "number"},
                    "early_bird_deadline": {"type": "string", "format": "date"}
                }
            },
            "speakers": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "title": {"type": "string"},
                        "organization": {"type": "string"}
                    }
                }
            },
            "special_events": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "date": {"type": "string", "format": "date"},
                        "time": {"type": "string"}
                    }
                }
            },
            "contact": {
                "type": "object",
                "properties": {
                    "email": {"type": "string", "format": "email"},
                    "phone": {"type": "string"}
                }
            }
        }
    }
    
    # Display JSON schema
    schema_json = json.dumps(event_schema, indent=2)
    console.print(Panel(
        Syntax(schema_json, "json", theme="monokai", line_numbers=True),
        title="Event JSON Schema",
        border_style="green"
    ))
    
    # Extract JSON using direct provider call
    logger.info("Extracting structured JSON data from text...", emoji_key="processing")
    
    try:
        start_time = time.time()
        
        # Instead of using the tool, use direct completion for demo purposes
        prompt = f"""
        Extract structured information from the following text into a JSON object.
        Follow the provided JSON schema exactly.
        
        TEXT:
        {sample_text}
        
        JSON SCHEMA:
        {json.dumps(event_schema, indent=2)}
        
        Provide only the valid JSON object as output, with no additional commentary.
        """
        
        if USE_DEBUG_LOGS:
            logger.debug(f"JSON Extraction Prompt:\n{prompt}")
        
        # Call the provider directly
        result = await provider.generate_completion(
            prompt=prompt,
            model="gpt-4.1-mini",  # Use an available OpenAI model
            temperature=0.2,       # Lower temperature for more deterministic output
            max_tokens=1500        # Enough tokens for a full response
        )
        
        # Track cost
        tracker.add_call(result)

        if USE_DEBUG_LOGS:
            logger.debug(f"Raw JSON Extraction Result Text:\n{result.text}")
        
        # Process the result to extract just the JSON
        try:
            # Try to parse the response as JSON
            raw_text = result.text.strip()
            text_to_parse = extract_json_from_markdown(raw_text)
            if USE_DEBUG_LOGS:
                logger.debug(f"Raw text received: {raw_text[:500]}...")
                logger.debug(f"Attempting to parse JSON after cleaning: {text_to_parse[:500]}...")
            json_result = json.loads(text_to_parse)
            if USE_DEBUG_LOGS:
                logger.debug(f"Successfully parsed JSON: {json.dumps(json_result, indent=2)}")
            
            # Create a dictionary with structured data and metadata for display
            structured_result_data = {
                "json": json_result, # The actual parsed JSON
                "validated": True,   # Assuming validation happens elsewhere or is implied
                "model": result.model,
                "processing_time": time.time() - start_time,
                "tokens": {
                    "input": result.input_tokens,
                    "output": result.output_tokens,
                    "total": result.input_tokens + result.output_tokens
                },
                "cost": result.cost
            }
            
            # Display the results using the utility function
            parse_and_display_result(
                title="JSON Extraction Results",
                input_data={"text": sample_text, "schema": event_schema},
                result=structured_result_data, # Pass the structured data
                console=console
            )
            
        except json.JSONDecodeError as e:
            # Log the error regardless of debug flag
            logger.error(f"JSONDecodeError occurred: {e}", exc_info=False)
            
            if USE_DEBUG_LOGS:
                # Log the string that caused the error (before cleaning)
                logger.debug(f"Raw string causing JSONDecodeError:\n{raw_text}")
                # Log the string that failed parsing (after cleaning)
                logger.debug(f"Cleaned string that failed JSON parsing:\n{text_to_parse}")
                # Print a rich traceback to the console
                console.print("[bold red]-- Traceback for JSONDecodeError --[/bold red]")
                console.print(Traceback())
                console.print("[bold red]-- End Traceback --[/bold red]")
                
            # If JSON parsing fails, show the raw response
            console.print(Panel(
                raw_text, # Show the original raw text from the model
                title="[yellow]Raw Model Output (JSON parsing failed)[/yellow]",
                border_style="red"
            ))
            
    except Exception as e:
        logger.error(f"Error extracting JSON: {str(e)}", emoji_key="error", exc_info=True)
        
    console.print()
    if USE_DEBUG_LOGS:
        logger.debug("Exiting run_json_extraction_example.")

async def table_extraction_demo(provider, tracker: CostTracker):
    """Demonstrate table extraction capabilities."""
    if USE_DEBUG_LOGS:
        logger.debug("Entering table_extraction_demo.")
    if not provider:
        console.print("[yellow]Skipping table extraction demo - no provider available.[/yellow]")
        if USE_DEBUG_LOGS:
            logger.debug("Exiting table_extraction_demo (no provider).")
        return
        
    logger.info("Starting table extraction demo", emoji_key="start")
    
    # Sample text with embedded table
    text = """
    Financial Performance by Quarter (2023-2024)
    
    | Quarter | Revenue ($M) | Expenses ($M) | Profit ($M) | Growth (%) |
    |---------|-------------|---------------|-------------|------------|
    | Q1 2023 | 42.5        | 32.1          | 10.4        | 3.2        |
    | Q2 2023 | 45.7        | 33.8          | 11.9        | 6.5        |
    | Q3 2023 | 50.2        | 35.6          | 14.6        | 9.8        |
    | Q4 2023 | 58.3        | 38.2          | 20.1        | 15.2       |
    | Q1 2024 | 60.1        | 39.5          | 20.6        | 3.1        |
    | Q2 2024 | 65.4        | 41.2          | 24.2        | 8.8        |
    
    Note: All figures are in millions of dollars and are unaudited.
    Growth percentages are relative to the previous quarter.
    """
    
    # Log extraction attempt
    logger.info("Performing table extraction", emoji_key="processing")
    
    try:
        start_time = time.time()
        
        # Prompt for table extraction
        prompt = f"""
        Extract the table from the following text and format it as both JSON and Markdown.
        
        TEXT:
        {text}
        
        For the JSON format, use this structure:
        {{
            "headers": ["Header1", "Header2", ...],
            "rows": [
                {{"Header1": "value", "Header2": "value", ...}},
                ...
            ]
        }}
        
        For the Markdown format, output a well-formatted Markdown table.
        
        Also extract any metadata about the table (title, notes, etc.).
        
        Format your response as JSON with the following structure:
        {{
            "json_table": {{...}},
            "markdown_table": "...",
            "metadata": {{
                "title": "...",
                "notes": [
                    "..."
                ]
            }}
        }}
        """
        
        if USE_DEBUG_LOGS:
            logger.debug(f"Table Extraction Prompt:\n{prompt}")
        
        # Call the provider directly
        result = await provider.generate_completion(
            prompt=prompt,
            model="gpt-4.1-mini",
            temperature=0.2,
            max_tokens=1500
        )
        
        # Track cost
        tracker.add_call(result)

        if USE_DEBUG_LOGS:
            logger.debug(f"Raw Table Extraction Result Text:\n{result.text}")
        
        try:
            # Try to parse the response as JSON
            raw_text = result.text.strip() # Keep raw text separate
            text_to_parse = extract_json_from_markdown(raw_text) # Clean it
            if USE_DEBUG_LOGS:
                # Log both raw and cleaned versions
                logger.debug(f"Raw text received (Table): {raw_text[:500]}...")
                logger.debug(f"Attempting to parse Table Extraction JSON after cleaning: {text_to_parse[:500]}...")
            json_result = json.loads(text_to_parse) # Parse the cleaned version
            if USE_DEBUG_LOGS:
                logger.debug(f"Successfully parsed Table Extraction JSON: {json.dumps(json_result, indent=2)}")
            
            # Create structured data dictionary for display
            structured_result_data = {
                "formats": {
                    "json": json_result.get("json_table", {}),
                    "markdown": json_result.get("markdown_table", "")
                },
                "metadata": json_result.get("metadata", {}),
                "model": result.model,
                "processing_time": time.time() - start_time,
                "tokens": {
                    "input": result.input_tokens,
                    "output": result.output_tokens,
                    "total": result.input_tokens + result.output_tokens
                },
                "cost": result.cost
            }
            
            # Parse the result using the shared utility
            parse_and_display_result(
                "Table Extraction Demo", 
                {"text": text}, 
                structured_result_data # Pass the structured data
            )
            
        except json.JSONDecodeError as e:
            # Log the error regardless of debug flag
            logger.error(f"JSONDecodeError in Table Extraction occurred: {e}", exc_info=False)
            
            if USE_DEBUG_LOGS:
                # Log both raw and cleaned versions for debugging the failure
                logger.debug(f"Raw string causing JSONDecodeError in Table Extraction:\n{raw_text}")
                logger.debug(f"Cleaned string that failed JSON parsing in Table Extraction:\n{text_to_parse}")
                # Print a rich traceback to the console
                console.print("[bold red]-- Traceback for JSONDecodeError (Table Extraction) --[/bold red]")
                console.print(Traceback())
                console.print("[bold red]-- End Traceback --[/bold red]")
                
            # If JSON parsing fails, show the raw response using the original raw_text
            console.print(Panel(
                raw_text,
                title="[yellow]Raw Model Output (JSON parsing failed)[/yellow]",
                border_style="red"
            ))
            
    except Exception as e:
        logger.error(f"Error in table extraction: {str(e)}", emoji_key="error")
    # Add exit log
    if USE_DEBUG_LOGS:
        logger.debug("Exiting table_extraction_demo.")

async def semantic_schema_inference_demo(provider, tracker: CostTracker):
    """Demonstrate semantic schema inference."""
    if USE_DEBUG_LOGS:
        logger.debug("Entering semantic_schema_inference_demo.")
    if not provider:
        console.print("[yellow]Skipping semantic schema inference demo - no provider available.[/yellow]")
        if USE_DEBUG_LOGS:
            logger.debug("Exiting semantic_schema_inference_demo (no provider).")
        return
        
    logger.info("Starting semantic schema inference demo", emoji_key="start")
    
    # Sample text for schema inference
    text = """
    Patient Record: John Smith
    Date of Birth: 05/12/1978
    Patient ID: P-98765
    Blood Type: O+
    Height: 182 cm
    Weight: 76 kg
    
    Medications:
    - Lisinopril 10mg, once daily
    - Metformin 500mg, twice daily
    - Atorvastatin 20mg, once daily at bedtime
    
    Allergies:
    - Penicillin (severe)
    - Shellfish (mild)
    
    Recent Vital Signs:
    Date: 03/15/2024
    Blood Pressure: 128/85 mmHg
    Heart Rate: 72 bpm
    Temperature: 98.6°F
    Oxygen Saturation: 98%
    
    Medical History:
    - Type 2 Diabetes (diagnosed 2015)
    - Hypertension (diagnosed 2017)
    - Hyperlipidemia (diagnosed 2019)
    - Appendectomy (2005)
    """
    
    # Define a schema template for the extraction
    patient_schema = {
        "type": "object",
        "properties": {
            "patient": {
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "dob": {"type": "string"},
                    "id": {"type": "string"},
                    "blood_type": {"type": "string"},
                    "height": {"type": "string"},
                    "weight": {"type": "string"}
                }
            },
            "medications": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "dosage": {"type": "string"},
                        "frequency": {"type": "string"}
                    }
                }
            },
            "allergies": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "allergen": {"type": "string"},
                        "severity": {"type": "string"}
                    }
                }
            },
            "vital_signs": {
                "type": "object",
                "properties": {
                    "date": {"type": "string"},
                    "blood_pressure": {"type": "string"},
                    "heart_rate": {"type": "string"},
                    "temperature": {"type": "string"},
                    "oxygen_saturation": {"type": "string"}
                }
            },
            "medical_history": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "condition": {"type": "string"},
                        "diagnosed": {"type": "string"}
                    }
                }
            }
        }
    }
    
    # Log schema inference attempt
    logger.info("Performing schema inference", emoji_key="processing")
    
    try:
        start_time = time.time()
        
        # Prompt for semantic schema extraction
        prompt = f"""
        Extract structured information from the text according to the provided semantic schema.
        
        TEXT:
        {text}
        
        SEMANTIC SCHEMA:
        {json.dumps(patient_schema, indent=2)}
        
        Analyze the text and extract information following the schema structure. Return a valid JSON object.
        """
        
        if USE_DEBUG_LOGS:
            logger.debug(f"Schema Inference Prompt:\n{prompt}")
        
        # Call the provider directly
        result = await provider.generate_completion(
            prompt=prompt,
            model="gpt-4.1-mini",
            temperature=0.2,
            max_tokens=1000
        )
        
        # Track cost
        tracker.add_call(result)

        if USE_DEBUG_LOGS:
            logger.debug(f"Raw Schema Inference Result Text:\n{result.text}")
        
        try:
            # Try to parse the response as JSON
            raw_text = result.text.strip()
            text_to_parse = extract_json_from_markdown(raw_text)
            if USE_DEBUG_LOGS:
                logger.debug(f"Raw text received (Schema): {raw_text[:500]}...")
                logger.debug(f"Attempting to parse Schema Inference JSON after cleaning: {text_to_parse[:500]}...")
            json_result = json.loads(text_to_parse)
            if USE_DEBUG_LOGS:
                logger.debug(f"Successfully parsed Schema Inference JSON: {json.dumps(json_result, indent=2)}")
            
            # Create structured data dictionary for display
            structured_result_data = {
                "extracted_data": json_result,
                "model": result.model,
                "processing_time": time.time() - start_time,
                "tokens": {
                    "input": result.input_tokens,
                    "output": result.output_tokens,
                    "total": result.input_tokens + result.output_tokens
                },
                "cost": result.cost
            }
            
            # Parse the result using the shared utility
            parse_and_display_result(
                "Semantic Schema Inference Demo", 
                {"text": text}, 
                structured_result_data # Pass the structured data
            )
            
        except json.JSONDecodeError as e:
            # Log the error regardless of debug flag
            logger.error(f"JSONDecodeError in Schema Inference occurred: {e}", exc_info=False)
            
            if USE_DEBUG_LOGS:
                # Log both raw and cleaned versions
                logger.debug(f"Raw string causing JSONDecodeError in Schema Inference:\n{raw_text}")
                logger.debug(f"Cleaned string that failed JSON parsing in Schema Inference:\n{text_to_parse}")
                # Print a rich traceback to the console
                console.print("[bold red]-- Traceback for JSONDecodeError (Schema Inference) --[/bold red]")
                console.print(Traceback())
                console.print("[bold red]-- End Traceback --[/bold red]")
                
            # If JSON parsing fails, show the raw response
            console.print(Panel(
                raw_text,
                title="[yellow]Raw Model Output (JSON parsing failed)[/yellow]",
                border_style="red"
            ))
            
    except Exception as e:
        logger.error(f"Error in schema inference: {str(e)}", emoji_key="error")
    # Add exit log
    if USE_DEBUG_LOGS:
        logger.debug("Exiting semantic_schema_inference_demo.")

async def entity_extraction_demo(provider, tracker: CostTracker):
    """Demonstrate entity extraction capabilities."""
    if USE_DEBUG_LOGS:
        logger.debug("Entering entity_extraction_demo.")
    if not provider:
        console.print("[yellow]Skipping entity extraction demo - no provider available.[/yellow]")
        if USE_DEBUG_LOGS:
            logger.debug("Exiting entity_extraction_demo (no provider).")
        return
        
    logger.info("Starting entity extraction demo", emoji_key="start")
    
    # Sample text for entity extraction
    text = """
    In a groundbreaking announcement on March 15, 2024, Tesla unveiled its latest solar energy
    technology in partnership with SolarCity. CEO Elon Musk presented the new PowerWall 4.0 
    battery system at their headquarters in Austin, Texas. The system can store up to 20kWh of 
    energy and costs approximately $6,500 per unit.
    
    According to Dr. Maria Chen, lead researcher at the National Renewable Energy Laboratory (NREL),
    this technology represents a significant advancement in residential energy storage. The new
    system integrates with the Tesla mobile app on both iOS and Android platforms, allowing users
    to monitor energy usage in real-time.
    
    Tesla stock (TSLA) rose 5.8% following the announcement, reaching $248.32 per share on the NASDAQ.
    The company plans to begin production at their Gigafactory Nevada location by June 2024, with
    initial deployments in California and Texas markets.
    """
    
    # Log entity extraction attempt
    logger.info("Performing entity extraction", emoji_key="processing")
    
    try:
        start_time = time.time()
        
        # Prompt for entity extraction
        prompt = f"""
        Extract key-value pairs and entities from the following text, categorized by type.
        
        TEXT:
        {text}
        
        Extract the following categories of information:
        - Organizations (companies, institutions, etc.)
        - People (names and titles)
        - Locations (cities, states, facilities, etc.)
        - Dates and Times
        - Products and Technologies
        - Numerical Values (monetary values, percentages, measurements, etc.)
        
        Format the output as a JSON object with these categories as keys, and each containing relevant entities found.
        Within each category, provide structured information when possible.
        """
        
        if USE_DEBUG_LOGS:
            logger.debug(f"Entity Extraction Prompt:\n{prompt}")
            
        # Call the provider directly
        result = await provider.generate_completion(
            prompt=prompt,
            model="gpt-4.1-mini", 
            temperature=0.2,
            max_tokens=500
        )
        
        # Track cost
        tracker.add_call(result)

        if USE_DEBUG_LOGS:
            logger.debug(f"Raw Entity Extraction Result Text:\n{result.text}")
            
        try:
            # Try to parse the response as JSON
            raw_text = result.text.strip()
            text_to_parse = extract_json_from_markdown(raw_text)
            if USE_DEBUG_LOGS:
                logger.debug(f"Raw text received (Entity): {raw_text[:500]}...")
                logger.debug(f"Attempting to parse Entity Extraction JSON after cleaning: {text_to_parse[:500]}...")
            if USE_DEBUG_LOGS:
                logger.debug(f"EXACT STRING PASSED TO json.loads: >>>{text_to_parse}<<<")
            
            try:
                # First try standard parsing
                json_result = json.loads(text_to_parse)
            except json.JSONDecodeError as e:
                logger.warning(f"Standard JSON parsing failed: {e}. Attempting emergency repair.")
                
                # Emergency fallback for malformed JSON due to unterminated strings
                # 1. Look for the raw JSON structure with markdown removed
                text_no_markdown = text_to_parse
                
                # 2. Manually check for key entity categories, even if JSON is malformed
                # Create a structured result with categories we expect to find
                json_result = {
                    "Organizations": [],
                    "People": [],
                    "Locations": [],
                    "Dates and Times": [],
                    "Products and Technologies": [],
                    "Numerical Values": []
                }
                
                # Look for entity categories using regex
                org_matches = re.findall(r'"name"\s*:\s*"([^"]+)".*?"type"\s*:\s*"([^"]+)"', text_no_markdown)
                for name, entity_type in org_matches:
                    # Determine which category this entity belongs to based on type
                    if any(keyword in entity_type.lower() for keyword in ["company", "corporation", "institution", "exchange"]):
                        json_result["Organizations"].append({"name": name, "type": entity_type})
                    elif any(keyword in entity_type.lower() for keyword in ["city", "state", "facility"]):
                        json_result["Locations"].append({"name": name, "type": entity_type})
                    elif any(keyword in entity_type.lower() for keyword in ["battery", "app", "system", "technology"]):
                        json_result["Products and Technologies"].append({"name": name, "type": entity_type})
                
                # Look for people - they usually have titles and organizations
                people_matches = re.findall(r'"name"\s*:\s*"([^"]+)".*?"title"\s*:\s*"([^"]+)".*?"organization"\s*:\s*"([^"]*)"', text_no_markdown)
                for name, title, org in people_matches:
                    json_result["People"].append({"name": name, "title": title, "organization": org})
                
                # Dates and numerical values are harder to extract generically
                # but we can look for obvious patterns
                date_matches = re.findall(r'"date"\s*:\s*"([^"]+)".*?"event"\s*:\s*"([^"]+)"', text_no_markdown)
                for date, event in date_matches:
                    json_result["Dates and Times"].append({"date": date, "event": event})
                
                # For numerical values, look for values with units
                value_matches = re.findall(r'"value"\s*:\s*([^,]+).*?"unit"\s*:\s*"([^"]+)"', text_no_markdown)
                for value, unit in value_matches:
                    # Clean up the value
                    clean_value = value.strip('" ')
                    item = {"value": clean_value, "unit": unit}
                    
                    # Look for a description if available
                    desc_match = re.search(r'"description"\s*:\s*"([^"]+)"', text_no_markdown)
                    if desc_match:
                        item["description"] = desc_match.group(1)
                        
                    json_result["Numerical Values"].append(item)
                
                # Add a note about emergency repair
                logger.warning("Used emergency JSON repair - results may be incomplete")
            
            if USE_DEBUG_LOGS:
                logger.debug(f"Successfully parsed Entity Extraction JSON: {json.dumps(json_result, indent=2)}")
            
            # Create structured data dictionary for display
            structured_result_data = {
                "extracted_data": json_result,
                "structured": True,
                "categorized": True,
                "model": result.model,
                "processing_time": time.time() - start_time,
                "tokens": {
                    "input": result.input_tokens,
                    "output": result.output_tokens,
                    "total": result.input_tokens + result.output_tokens
                },
                "cost": result.cost
            }
            
            # Parse the result using the shared utility
            parse_and_display_result(
                "Entity Extraction Demo", 
                {"text": text}, 
                structured_result_data # Pass the structured data
            )
            
        except json.JSONDecodeError as e:
            # Log the error regardless of debug flag
            logger.error(f"JSONDecodeError in Entity Extraction occurred: {e}", exc_info=False)
            
            if USE_DEBUG_LOGS:
                # Log both raw and cleaned versions
                logger.debug(f"Raw string causing JSONDecodeError in Entity Extraction:\n{raw_text}")
                logger.debug(f"Cleaned string that failed JSON parsing in Entity Extraction:\n{text_to_parse}")
                # Print a rich traceback to the console
                console.print("[bold red]-- Traceback for JSONDecodeError (Entity Extraction) --[/bold red]")
                console.print(Traceback())
                console.print("[bold red]-- End Traceback --[/bold red]")
                
            # If JSON parsing fails, show the raw response
            console.print(Panel(
                raw_text,
                title="[yellow]Raw Model Output (JSON parsing failed)[/yellow]",
                border_style="red"
            ))
            
    except Exception as e:
        logger.error(f"Error in entity extraction: {str(e)}", emoji_key="error")
    # Add exit log
    if USE_DEBUG_LOGS:
        logger.debug("Exiting entity_extraction_demo.")

async def main():
    """Run the advanced extraction demos."""
    tracker = CostTracker() # Instantiate tracker
    provider = await setup_openai_provider()
    
    if not provider:
        logger.warning("OpenAI provider not available. Demo sections requiring it will be skipped.", emoji_key="warning")
        
    console.print(Rule("[bold magenta]Advanced Extraction Demos Starting[/bold magenta]"))
    
    demos_to_run = [
        (run_json_extraction_example, "JSON Extraction"),
        (table_extraction_demo, "Table Extraction"),
        (semantic_schema_inference_demo, "Schema Inference"),
        (entity_extraction_demo, "Entity Extraction")
    ]
    
    # Execute demos sequentially
    for demo_func, demo_name in demos_to_run:
        try:
            await demo_func(provider, tracker) # Pass tracker
        except Exception as e:
            logger.error(f"Error running {demo_name} demo: {e}", emoji_key="error", exc_info=True)
    
    # Display final cost summary
    tracker.display_summary(console)

    logger.success("Advanced Extraction Demo finished successfully!", emoji_key="complete")
    console.print(Rule("[bold magenta]Advanced Extraction Demos Complete[/bold magenta]"))

if __name__ == "__main__":
    # Run the demos
    exit_code = asyncio.run(main())
    sys.exit(exit_code) 
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/core/providers/anthropic.py:
--------------------------------------------------------------------------------

```python
# ultimate_mcp_server/providers/anthropic.py
"""Anthropic (Claude) provider implementation."""

import json
import re
import time
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple

from anthropic import AsyncAnthropic

from ultimate_mcp_server.constants import Provider, TaskType  # Import TaskType for logging
from ultimate_mcp_server.core.providers.base import (
    BaseProvider,
    ModelResponse,
)
from ultimate_mcp_server.utils import get_logger

# Use the same naming scheme everywhere: logger at module level
logger = get_logger("ultimate_mcp_server.providers.anthropic")


class AnthropicProvider(BaseProvider):
    """Provider implementation for Anthropic (Claude) API."""

    provider_name = Provider.ANTHROPIC.value

    def __init__(self, api_key: Optional[str] = None, **kwargs):
        """Initialize the Anthropic provider.

        Args:
            api_key: Anthropic API key
            **kwargs: Additional options (e.g., base_url)
        """
        super().__init__(api_key=api_key, **kwargs)
        self.base_url = kwargs.get("base_url")
        self.models_cache = None
        self.client: Optional[AsyncAnthropic] = None  # Initialize client attribute

    async def initialize(self) -> bool:
        """Initialize the Anthropic client.

        Returns:
            bool: True if initialization was successful
        """
        if not self.api_key:
            self.logger.error("Anthropic API key is not configured.", emoji_key="error")
            return False

        try:
            self.client = AsyncAnthropic(
                api_key=self.api_key,
                base_url=self.base_url,
            )

            # Skip API call if using a mock key (for tests)
            if "mock-" in self.api_key:
                self.logger.info(
                    "Using mock Anthropic key - skipping API validation", emoji_key="mock"
                )
                # Assume mock initialization is always successful for testing purposes
                self.is_initialized = True
                return True

            # Optional: Add a quick check_api_key() call here if desired,
            # but initialize might succeed even if key is invalid later.
            # is_valid = await self.check_api_key() # This makes initialize slower
            # if not is_valid:
            #     self.logger.error("Anthropic API key appears invalid.", emoji_key="error")
            #     return False

            self.logger.success("Anthropic provider initialized successfully", emoji_key="provider")
            self.is_initialized = True  # Mark as initialized
            return True

        except Exception as e:
            self.logger.error(
                f"Failed to initialize Anthropic provider: {str(e)}",
                emoji_key="error",
                exc_info=True,  # Log traceback for debugging
            )
            self.is_initialized = False
            return False

    async def generate_completion(
        self,
        prompt: Optional[str] = None,
        messages: Optional[List[Dict[str, Any]]] = None,
        model: Optional[str] = None,
        max_tokens: Optional[int] = 1024,  # Signature default
        temperature: float = 0.7,
        json_mode: bool = False,
        **kwargs,
    ) -> ModelResponse:
        """Generate a single non-chat completion using Anthropic Claude.

        Args:
            prompt: Text prompt to send to the model.
            messages: List of message dictionaries, alternative to prompt.
            model: Model name to use (e.g., "claude-3-opus-20240229").
            max_tokens: Maximum tokens to generate. Defaults to 1024.
            temperature: Temperature parameter (0.0-1.0).
            json_mode: If True, attempt to guide model towards JSON output (via prompting).
            **kwargs: Additional model-specific parameters (e.g., top_p, system).

        Returns:
            ModelResponse object.
        """
        if not self.client:
            if not await self.initialize():
                raise ConnectionError("Anthropic provider failed to initialize.")

        model = model or self.get_default_model()
        actual_model_name = self.strip_provider_prefix(model)

        # Original logic: Validate that either prompt or messages is provided
        if prompt is None and not messages:
            raise ValueError("Either 'prompt' or 'messages' must be provided")
            
        # Original logic: If messages are provided, use the chat_completion function
        if messages:
            # Ensure all necessary parameters are passed to generate_chat_completion
            # This includes system_prompt if it's in kwargs
            return await self.generate_chat_completion(
                messages=messages,
                model=model, # Pass original model ID
                max_tokens=max_tokens,
                temperature=temperature,
                json_mode=json_mode, # Pass json_mode
                **kwargs # Pass other kwargs like system, top_p etc.
            )

        # Original logic: Prepare message list for the API from prompt
        # This path is taken if only 'prompt' is provided (and not 'messages')
        current_api_messages = [{"role": "user", "content": prompt}]

        # Original logic: Handle system prompt if passed in kwargs for the simple prompt case
        system_prompt = kwargs.pop("system", None)

        # Original logic: Handle JSON mode for simple prompt case
        if json_mode:
            self.logger.debug(
                "json_mode=True requested for completion (simple prompt), modifying user message for Anthropic."
            )
            # Modify the user message content in current_api_messages
            user_message_idx = -1
            for i, msg in enumerate(current_api_messages):
                if msg["role"] == "user":
                    user_message_idx = i
                    break
            
            if user_message_idx != -1:
                original_content = current_api_messages[user_message_idx]["content"]
                if isinstance(original_content, str) and "Please respond with valid JSON" not in original_content:
                     current_api_messages[user_message_idx]["content"] = (
                        f"{original_content}\\nPlease respond ONLY with valid JSON matching the expected schema. Do not include explanations or markdown formatting."
                    )
            else:
                # This case should ideally not happen if prompt is always user role.
                # If it could, one might append a new user message asking for JSON,
                # or include it in system prompt if system_prompt is being constructed here.
                self.logger.warning("Could not find user message to append JSON instruction for simple prompt case.")

        # Prepare API call parameters using max_tokens directly from signature
        api_params = {
            "messages": current_api_messages,
            "model": actual_model_name,
            "max_tokens": max_tokens, # Uses max_tokens from signature (which defaults to 1024 if not passed)
            "temperature": temperature,
            **kwargs,  # Pass remaining kwargs (like top_p, etc.) that were not popped
        }
        if system_prompt: # Add system prompt if it was extracted
            api_params["system"] = system_prompt
        
        # Logging before API call (original style)
        self.logger.info(
            f"Generating completion with Anthropic model {actual_model_name}",
            emoji_key=TaskType.COMPLETION.value,
            prompt_length=len(prompt) if prompt else 0, # length of prompt if provided
            json_mode_requested=json_mode,
        )

        try:
            response, processing_time = await self.process_with_timer(
                self.client.messages.create, **api_params
            )
        except Exception as e:
            error_message = f"Anthropic API error during completion for model {actual_model_name}: {type(e).__name__}: {str(e)}"
            self.logger.error(error_message, exc_info=True)
            raise ConnectionError(error_message) from e

        if (
            not response.content
            or not isinstance(response.content, list)
            or not hasattr(response.content[0], "text")
        ):
            raise ValueError(f"Unexpected response format from Anthropic API: {response}")
        completion_text = response.content[0].text

        # Post-process if JSON mode was requested (for simple prompt case) - best effort extraction
        if json_mode: # This json_mode is the original parameter
            original_text_for_json_check = completion_text
            completion_text = self._extract_json_from_text(completion_text)
            if original_text_for_json_check != completion_text:
                self.logger.debug("Extracted JSON content from Anthropic response post-processing (simple prompt case).")

        result = ModelResponse(
            text=completion_text,
            model=f"{self.provider_name}/{actual_model_name}",
            provider=self.provider_name,
            input_tokens=response.usage.input_tokens,
            output_tokens=response.usage.output_tokens,
            processing_time=processing_time,
            raw_response=response.model_dump(),
        )
        result.message = {"role": "assistant", "content": completion_text}

        self.logger.success(
            "Anthropic completion successful",
            emoji_key="success",
            model=result.model,
            tokens={"input": result.input_tokens, "output": result.output_tokens},
            cost=result.cost,
            time=result.processing_time,
        )
        return result

    # --- NEW METHOD ---
    async def generate_chat_completion(
        self,
        messages: List[
            Dict[str, Any]
        ],  # Use Dict for broader compatibility, or specific MessageParam type
        model: Optional[str] = None,
        max_tokens: Optional[int] = 1024,  # Provide a default
        temperature: float = 0.7,
        json_mode: bool = False,  # Add json_mode parameter
        **kwargs,
    ) -> ModelResponse:
        """Generate a chat completion using Anthropic Claude.

        Args:
            messages: A list of message dictionaries (e.g., [{"role": "user", "content": "..."}]).
                      Should conform to Anthropic's expected format.
            model: Model name to use (e.g., "claude-3-opus-20240229").
            max_tokens: Maximum tokens to generate. Defaults to 1024.
            temperature: Temperature parameter (0.0-1.0).
            json_mode: If True, guide the model to generate JSON output (via prompt engineering).
            **kwargs: Additional model-specific parameters (e.g., top_p, system).

        Returns:
            ModelResponse object containing the assistant's message.
        """
        if not self.client:
            if not await self.initialize():
                raise ConnectionError("Anthropic provider failed to initialize.")

        model = model or self.get_default_model()
        actual_model_name = self.strip_provider_prefix(model)

        # Handle system prompt extraction
        system_prompt = kwargs.pop("system", None)
        
        # Process the messages to extract system message and convert to Anthropic format
        processed_messages = []
        extracted_system = None
        
        for msg in messages:
            role = msg.get("role", "")
            content = msg.get("content", "")
            
            # Extract system message if present
            if role == "system":
                if extracted_system is None:  # Take the first system message
                    extracted_system = content
                # Don't add system messages to the processed_messages list
                continue
            elif role in ("user", "assistant"):
                # Keep user and assistant messages
                processed_messages.append({"role": role, "content": content})
            else:
                self.logger.warning(f"Ignoring unsupported message role: {role}")
                
        # If we found a system message, use it (overrides any system in kwargs)
        if extracted_system is not None:
            system_prompt = extracted_system

        # Process json_mode by modifying system prompt or last user message
        json_mode_requested = json_mode
        
        if json_mode_requested:
            self.logger.debug(
                "json_mode=True requested for chat completion, implementing via prompt engineering for Anthropic"
            )
            
            # If we have a system prompt, update it to include JSON instructions
            if system_prompt:
                system_prompt = f"{system_prompt}\n\nIMPORTANT: You must respond ONLY with valid JSON matching the expected schema. Do not include explanations or markdown formatting."
            # Otherwise, if there's at least one user message, modify the last one
            elif processed_messages and any(m.get("role") == "user" for m in processed_messages):
                # Find last user message
                for i in range(len(processed_messages) - 1, -1, -1):
                    if processed_messages[i].get("role") == "user":
                        user_content = processed_messages[i].get("content", "")
                        # Only add JSON instruction if not already present
                        if "respond with JSON" not in user_content and "respond in JSON" not in user_content:
                            processed_messages[i]["content"] = f"{user_content}\n\nPlease respond ONLY with valid JSON. Do not include explanations or markdown formatting."
                        break
            # If neither system prompt nor user messages to modify, add a system prompt
            else:
                system_prompt = "You must respond ONLY with valid JSON. Do not include explanations or markdown formatting."

        # Prepare API call parameters
        api_params = {
            "messages": processed_messages,
            "model": actual_model_name,
            "max_tokens": max_tokens,
            "temperature": temperature,
            **kwargs,  # Pass remaining kwargs (like top_p, etc.)
        }
        if system_prompt:
            api_params["system"] = system_prompt

        self.logger.info(
            f"Generating chat completion with Anthropic model {actual_model_name}",
            emoji_key=TaskType.CHAT.value,  # Use enum value
            message_count=len(processed_messages),
            json_mode_requested=json_mode_requested,  # Log if it was requested
        )

        try:
            response, processing_time = await self.process_with_timer(
                self.client.messages.create, **api_params
            )
        except Exception as e:
            error_message = f"Anthropic API error during chat completion for model {actual_model_name}: {type(e).__name__}: {str(e)}"
            self.logger.error(error_message, exc_info=True)
            raise ConnectionError(error_message) from e

        # Extract response content
        if (
            not response.content
            or not isinstance(response.content, list)
            or not hasattr(response.content[0], "text")
        ):
            raise ValueError(f"Unexpected response format from Anthropic API: {response}")
        assistant_content = response.content[0].text

        # Create standardized response including the assistant message
        result = ModelResponse(
            text=assistant_content,  # Keep raw text accessible
            model=f"{self.provider_name}/{actual_model_name}",  # Return prefixed model ID
            provider=self.provider_name,
            input_tokens=response.usage.input_tokens,
            output_tokens=response.usage.output_tokens,
            processing_time=processing_time,
            raw_response=response.model_dump(),  # Use model_dump() if Pydantic
        )
        
        # Add message to result for chat_completion
        result.message = {"role": "assistant", "content": assistant_content}

        # Log success
        self.logger.success(
            "Anthropic chat completion successful",
            emoji_key="success",
            model=result.model,
            tokens={"input": result.input_tokens, "output": result.output_tokens},
            cost=result.cost,
            time=result.processing_time,
        )

        return result

    # --- END NEW METHOD ---

    async def generate_completion_stream(
        self,
        # Keep existing signature: accepts prompt primarily, but also messages/system in kwargs
        prompt: Optional[str] = None,  # Make prompt optional if messages are primary input
        messages: Optional[List[Dict[str, Any]]] = None,  # Allow messages directly
        model: Optional[str] = None,
        max_tokens: Optional[int] = 1024,  # Default max_tokens
        temperature: float = 0.7,
        json_mode: bool = False,  # Accept json_mode flag
        **kwargs,
    ) -> AsyncGenerator[Tuple[str, Dict[str, Any]], None]:
        """Generate a streaming completion using Anthropic Claude. Handles both prompt and message inputs.

        Args:
            prompt: (Optional) Text prompt (if messages not provided).
            messages: (Optional) List of message dictionaries. Takes precedence over prompt.
            model: Model name to use.
            max_tokens: Maximum tokens to generate. Defaults to 1024.
            temperature: Temperature parameter.
            json_mode: If True, guides model towards JSON (via prompting if using prompt input).
            **kwargs: Additional parameters (system, top_p, etc.).

        Yields:
            Tuple of (text_chunk, metadata).

        Raises:
            ConnectionError: If provider initialization fails or API call fails.
            ValueError: If neither prompt nor messages are provided.
        """
        if not self.client:
            if not await self.initialize():
                raise ConnectionError("Anthropic provider failed to initialize.")

        model = model or self.get_default_model()
        actual_model_name = self.strip_provider_prefix(model)

        # Prepare system prompt if provided in kwargs
        system_prompt = kwargs.pop("system", None)

        # Determine input messages: Use 'messages' if provided, otherwise construct from 'prompt'
        if messages:
            # Process the messages to extract system message and convert to Anthropic format
            processed_messages = []
            extracted_system = None
            
            for msg in messages:
                role = msg.get("role", "")
                content = msg.get("content", "")
                
                # Extract system message if present
                if role == "system":
                    if extracted_system is None:  # Take the first system message
                        extracted_system = content
                    # Don't add system messages to the processed_messages list
                    continue
                elif role in ("user", "assistant"):
                    # Keep user and assistant messages
                    processed_messages.append({"role": role, "content": content})
                else:
                    self.logger.warning(f"Ignoring unsupported message role in streaming: {role}")
                    
            # If we found a system message, use it (overrides any system in kwargs)
            if extracted_system is not None:
                system_prompt = extracted_system
                
            input_desc = f"{len(processed_messages)} messages"
        elif prompt:
            # Construct messages from prompt
            processed_messages = [{"role": "user", "content": prompt}]
            input_desc = f"prompt ({len(prompt)} chars)"

            # Apply JSON mode prompt modification ONLY if using prompt input
            if json_mode:
                self.logger.debug(
                    "json_mode=True requested for stream completion, modifying prompt for Anthropic."
                )
                user_message = processed_messages[-1]
                original_content = user_message["content"]
                if "Please respond with valid JSON" not in original_content:
                    user_message["content"] = (
                        f"{original_content}\nPlease respond ONLY with valid JSON matching the expected schema. Do not include explanations or markdown formatting."
                    )
        else:
            raise ValueError(
                "Either 'prompt' or 'messages' must be provided for generate_completion_stream"
            )

        # Apply JSON mode to system prompt if using messages input and json_mode is True
        json_mode_requested = kwargs.pop("json_mode", json_mode)  # Keep track if it was requested
        if json_mode_requested and messages:
            if system_prompt:
                system_prompt = f"{system_prompt}\n\nIMPORTANT: You must respond ONLY with valid JSON matching the expected schema. Do not include explanations or markdown formatting."
            else:
                system_prompt = "You must respond ONLY with valid JSON matching the expected schema. Do not include explanations or markdown formatting."

        # Prepare API call parameters
        params = {
            "model": actual_model_name,
            "messages": processed_messages,
            "temperature": temperature,
            "max_tokens": max_tokens,  # Use the default or provided value
            **kwargs,  # Pass remaining kwargs
        }
        if system_prompt:
            params["system"] = system_prompt

        self.logger.info(
            f"Generating streaming completion with Anthropic model {actual_model_name}",
            emoji_key=self.provider_name,
            input_type=input_desc,
            json_mode_requested=json_mode_requested,
        )

        start_time = time.time()
        total_chunks = 0
        final_input_tokens = 0
        final_output_tokens = 0
        finish_reason = None  # Track finish reason

        try:
            async with self.client.messages.stream(**params) as stream:
                async for chunk in stream:
                    # Extract text delta
                    if chunk.type == "content_block_delta":
                        content = chunk.delta.text
                        total_chunks += 1
                        metadata = {
                            "model": f"{self.provider_name}/{actual_model_name}",
                            "provider": self.provider_name,
                            "chunk_index": total_chunks,
                            "finish_reason": None,  # Not final yet
                        }
                        yield content, metadata

                    # Don't attempt to capture usage from delta chunks - wait for final message

                # Important: Get final tokens from the final message state
                try:
                    final_message = await stream.get_final_message()
                    final_input_tokens = final_message.usage.input_tokens if hasattr(final_message, 'usage') else 0
                    final_output_tokens = final_message.usage.output_tokens if hasattr(final_message, 'usage') else 0
                    # Ensure finish_reason is captured from the final message
                    finish_reason = final_message.stop_reason if hasattr(final_message, 'stop_reason') else "unknown"
                except Exception as e:
                    # If we can't get the final message for any reason, log it but continue
                    self.logger.warning(f"Couldn't get final message stats: {e}")
                    # Estimate token counts based on total characters / avg chars per token
                    char_count = sum(len(m.get("content", "")) for m in processed_messages)
                    final_input_tokens = char_count // 4  # Rough estimate
                    final_output_tokens = total_chunks * 5  # Very rough estimate

            processing_time = time.time() - start_time
            self.logger.success(
                "Anthropic streaming completion successful",
                emoji_key="success",
                model=f"{self.provider_name}/{actual_model_name}",
                chunks=total_chunks,
                tokens={"input": final_input_tokens, "output": final_output_tokens},
                time=processing_time,
                finish_reason=finish_reason,
            )

            # Yield a final empty chunk with aggregated metadata
            final_metadata = {
                "model": f"{self.provider_name}/{actual_model_name}",
                "provider": self.provider_name,
                "chunk_index": total_chunks + 1,
                "input_tokens": final_input_tokens,
                "output_tokens": final_output_tokens,
                "total_tokens": final_input_tokens + final_output_tokens,
                "processing_time": processing_time,
                "finish_reason": finish_reason,
            }
            yield "", final_metadata

        except Exception as e:
            processing_time = time.time() - start_time
            self.logger.error(
                f"Anthropic streaming completion failed after {processing_time:.2f}s: {str(e)}",
                emoji_key="error",
                model=f"{self.provider_name}/{actual_model_name}",
                exc_info=True,
            )
            # Yield a final error chunk
            error_metadata = {
                "model": f"{self.provider_name}/{actual_model_name}",
                "provider": self.provider_name,
                "chunk_index": total_chunks + 1,
                "error": f"{type(e).__name__}: {str(e)}",
                "finish_reason": "error",
                "processing_time": processing_time,
            }
            yield "", error_metadata
            # Don't re-raise here, let the caller handle the error chunk

    async def list_models(self) -> List[Dict[str, Any]]:
        """List available Anthropic Claude models.

        Returns:
            List of model information dictionaries including the provider prefix.
        """
        # Anthropic doesn't have a list models endpoint, return static list WITH prefix
        # Based on the models defined in constants.py
        static_models = [
            # Define with the full ID including provider prefix
            {
                "id": f"{self.provider_name}/claude-3-7-sonnet-20250219",
                "name": "Claude 3.7 Sonnet",
                "context_window": 200000,
                "input_cost_pmt": 3.0,
                "output_cost_pmt": 15.0,
                "features": ["chat", "completion", "vision", "tool_use"],
            },
            {
                "id": f"{self.provider_name}/claude-3-5-haiku-20241022",
                "name": "Claude 3.5 Haiku",
                "context_window": 200000,
                "input_cost_pmt": 0.80,
                "output_cost_pmt": 4.0,
                "features": ["chat", "completion", "vision"],
            },
            {
                "id": f"{self.provider_name}/claude-3-opus-20240229",
                "name": "Claude 3 Opus",
                "context_window": 200000,
                "input_cost_pmt": 15.0,
                "output_cost_pmt": 75.0,
                "features": ["chat", "completion", "vision"],
            },
        ]

        # Simple caching (optional, as list is static)
        if not self.models_cache:
            self.models_cache = static_models
        return self.models_cache

    def get_default_model(self) -> str:
        """Get the default Anthropic model ID (including provider prefix).

        Returns:
            Default model ID string (e.g., "anthropic/claude-3-5-haiku-20241022").
        """
        # Try getting from config first
        from ultimate_mcp_server.config import get_config

        default_model_id = f"{self.provider_name}/claude-3-5-haiku-20241022"  # Hardcoded default

        try:
            config = get_config()
            # Access nested provider config safely
            provider_config = config.providers.get(self.provider_name) if config.providers else None
            if provider_config and provider_config.default_model:
                # Ensure the configured default includes the prefix
                configured_default = provider_config.default_model
                if not configured_default.startswith(f"{self.provider_name}/"):
                    self.logger.warning(
                        f"Configured default model '{configured_default}' for Anthropic is missing the provider prefix. Using hardcoded default: {default_model_id}"
                    )
                    return default_model_id
                else:
                    return configured_default
        except (ImportError, AttributeError, TypeError) as e:
            self.logger.debug(
                f"Could not retrieve default model from config ({e}), using hardcoded default."
            )

        return default_model_id

    async def check_api_key(self) -> bool:
        """Check if the Anthropic API key is valid by making a minimal request.

        Returns:
            bool: True if API key allows a basic request.
        """
        if not self.client:
            self.logger.warning("Cannot check API key: Anthropic client not initialized.")
            # Attempt initialization first
            if not await self.initialize():
                return False  # Initialization failed, key likely invalid or other issue
            # If initialize succeeded but client still None (e.g., mock key path)
            if not self.client:
                return True  # Assume mock key is 'valid' for testing

        try:
            # Use the *unprefixed* default model name for the check
            default_model_unprefixed = self.strip_provider_prefix(self.get_default_model())
            await self.client.messages.create(
                model=default_model_unprefixed,
                messages=[{"role": "user", "content": "Test"}],
                max_tokens=1,
            )
            self.logger.info("Anthropic API key validation successful.")
            return True
        except Exception as e:
            self.logger.warning(f"Anthropic API key validation failed: {type(e).__name__}")
            return False

    def strip_provider_prefix(self, model_id: str) -> str:
        """Removes the provider prefix (e.g., 'anthropic/') from a model ID."""
        prefix = f"{self.provider_name}/"
        if model_id.startswith(prefix):
            return model_id[len(prefix) :]
        # Handle ':' separator as well for backward compatibility if needed
        alt_prefix = f"{self.provider_name}:"
        if model_id.startswith(alt_prefix):
            return model_id[len(alt_prefix) :]
        return model_id  # Return original if no prefix found

    def _extract_json_from_text(self, text: str) -> str:
        """Extract JSON content from text that might include markdown code blocks or explanatory text.
        
        Args:
            text: The raw text response that might contain JSON
            
        Returns:
            Cleaned JSON content
        """
        
        # First check if the text is already valid JSON
        try:
            json.loads(text)
            return text  # Already valid JSON
        except json.JSONDecodeError:
            pass  # Continue with extraction
        
        # Extract JSON from code blocks - most common Anthropic pattern
        code_block_match = re.search(r'```(?:json)?\s*([\s\S]*?)```', text)
        if code_block_match:
            code_content = code_block_match.group(1).strip()
            try:
                json.loads(code_content)
                return code_content
            except json.JSONDecodeError:
                # Try to fix common JSON syntax issues like trailing commas
                fixed_content = re.sub(r',\s*([}\]])', r'\1', code_content)
                try:
                    json.loads(fixed_content)
                    return fixed_content
                except json.JSONDecodeError:
                    pass  # Continue with other extraction methods
        
        # Look for JSON array or object patterns in the content
        # Find the first [ or { and the matching closing ] or }
        stripped = text.strip()
        
        # Try to extract array
        if '[' in stripped and ']' in stripped:
            start = stripped.find('[')
            # Find the matching closing bracket
            end = -1
            depth = 0
            for i in range(start, len(stripped)):
                if stripped[i] == '[':
                    depth += 1
                elif stripped[i] == ']':
                    depth -= 1
                    if depth == 0:
                        end = i + 1
                        break
            
            if end > start:
                array_content = stripped[start:end]
                try:
                    json.loads(array_content)
                    return array_content
                except json.JSONDecodeError:
                    pass  # Try other methods
        
        # Try to extract object
        if '{' in stripped and '}' in stripped:
            start = stripped.find('{')
            # Find the matching closing bracket
            end = -1
            depth = 0
            for i in range(start, len(stripped)):
                if stripped[i] == '{':
                    depth += 1
                elif stripped[i] == '}':
                    depth -= 1
                    if depth == 0:
                        end = i + 1
                        break
            
            if end > start:
                object_content = stripped[start:end]
                try:
                    json.loads(object_content)
                    return object_content
                except json.JSONDecodeError:
                    pass  # Try other methods
        
        # If all else fails, return the original text
        return text

    async def process_with_timer(self, func, *args, **kwargs) -> Tuple[Any, float]:
        """Helper to time an async function call."""
        start_time = time.perf_counter()
        result = await func(*args, **kwargs)
        end_time = time.perf_counter()
        return result, end_time - start_time


```
Page 9/35FirstPrevNextLast