This is page 9 of 35. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│ ├── __init__.py
│ ├── advanced_agent_flows_using_unified_memory_system_demo.py
│ ├── advanced_extraction_demo.py
│ ├── advanced_unified_memory_system_demo.py
│ ├── advanced_vector_search_demo.py
│ ├── analytics_reporting_demo.py
│ ├── audio_transcription_demo.py
│ ├── basic_completion_demo.py
│ ├── cache_demo.py
│ ├── claude_integration_demo.py
│ ├── compare_synthesize_demo.py
│ ├── cost_optimization.py
│ ├── data
│ │ ├── sample_event.txt
│ │ ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│ │ └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│ ├── docstring_refiner_demo.py
│ ├── document_conversion_and_processing_demo.py
│ ├── entity_relation_graph_demo.py
│ ├── filesystem_operations_demo.py
│ ├── grok_integration_demo.py
│ ├── local_text_tools_demo.py
│ ├── marqo_fused_search_demo.py
│ ├── measure_model_speeds.py
│ ├── meta_api_demo.py
│ ├── multi_provider_demo.py
│ ├── ollama_integration_demo.py
│ ├── prompt_templates_demo.py
│ ├── python_sandbox_demo.py
│ ├── rag_example.py
│ ├── research_workflow_demo.py
│ ├── sample
│ │ ├── article.txt
│ │ ├── backprop_paper.pdf
│ │ ├── buffett.pdf
│ │ ├── contract_link.txt
│ │ ├── legal_contract.txt
│ │ ├── medical_case.txt
│ │ ├── northwind.db
│ │ ├── research_paper.txt
│ │ ├── sample_data.json
│ │ └── text_classification_samples
│ │ ├── email_classification.txt
│ │ ├── news_samples.txt
│ │ ├── product_reviews.txt
│ │ └── support_tickets.txt
│ ├── sample_docs
│ │ └── downloaded
│ │ └── attention_is_all_you_need.pdf
│ ├── sentiment_analysis_demo.py
│ ├── simple_completion_demo.py
│ ├── single_shot_synthesis_demo.py
│ ├── smart_browser_demo.py
│ ├── sql_database_demo.py
│ ├── sse_client_demo.py
│ ├── test_code_extraction.py
│ ├── test_content_detection.py
│ ├── test_ollama.py
│ ├── text_classification_demo.py
│ ├── text_redline_demo.py
│ ├── tool_composition_examples.py
│ ├── tournament_code_demo.py
│ ├── tournament_text_demo.py
│ ├── unified_memory_system_demo.py
│ ├── vector_search_demo.py
│ ├── web_automation_instruction_packs.py
│ └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│ └── smart_browser_internal
│ ├── locator_cache.db
│ ├── readability.js
│ └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── integration
│ │ ├── __init__.py
│ │ └── test_server.py
│ ├── manual
│ │ ├── test_extraction_advanced.py
│ │ └── test_extraction.py
│ └── unit
│ ├── __init__.py
│ ├── test_cache.py
│ ├── test_providers.py
│ └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│ ├── __init__.py
│ ├── __main__.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── __main__.py
│ │ ├── commands.py
│ │ ├── helpers.py
│ │ └── typer_cli.py
│ ├── clients
│ │ ├── __init__.py
│ │ ├── completion_client.py
│ │ └── rag_client.py
│ ├── config
│ │ └── examples
│ │ └── filesystem_config.yaml
│ ├── config.py
│ ├── constants.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── evaluation
│ │ │ ├── base.py
│ │ │ └── evaluators.py
│ │ ├── providers
│ │ │ ├── __init__.py
│ │ │ ├── anthropic.py
│ │ │ ├── base.py
│ │ │ ├── deepseek.py
│ │ │ ├── gemini.py
│ │ │ ├── grok.py
│ │ │ ├── ollama.py
│ │ │ ├── openai.py
│ │ │ └── openrouter.py
│ │ ├── server.py
│ │ ├── state_store.py
│ │ ├── tournaments
│ │ │ ├── manager.py
│ │ │ ├── tasks.py
│ │ │ └── utils.py
│ │ └── ums_api
│ │ ├── __init__.py
│ │ ├── ums_database.py
│ │ ├── ums_endpoints.py
│ │ ├── ums_models.py
│ │ └── ums_services.py
│ ├── exceptions.py
│ ├── graceful_shutdown.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── analytics
│ │ │ ├── __init__.py
│ │ │ ├── metrics.py
│ │ │ └── reporting.py
│ │ ├── cache
│ │ │ ├── __init__.py
│ │ │ ├── cache_service.py
│ │ │ ├── persistence.py
│ │ │ ├── strategies.py
│ │ │ └── utils.py
│ │ ├── cache.py
│ │ ├── document.py
│ │ ├── knowledge_base
│ │ │ ├── __init__.py
│ │ │ ├── feedback.py
│ │ │ ├── manager.py
│ │ │ ├── rag_engine.py
│ │ │ ├── retriever.py
│ │ │ └── utils.py
│ │ ├── prompts
│ │ │ ├── __init__.py
│ │ │ ├── repository.py
│ │ │ └── templates.py
│ │ ├── prompts.py
│ │ └── vector
│ │ ├── __init__.py
│ │ ├── embeddings.py
│ │ └── vector_service.py
│ ├── tool_token_counter.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── audio_transcription.py
│ │ ├── base.py
│ │ ├── completion.py
│ │ ├── docstring_refiner.py
│ │ ├── document_conversion_and_processing.py
│ │ ├── enhanced-ums-lookbook.html
│ │ ├── entity_relation_graph.py
│ │ ├── excel_spreadsheet_automation.py
│ │ ├── extraction.py
│ │ ├── filesystem.py
│ │ ├── html_to_markdown.py
│ │ ├── local_text_tools.py
│ │ ├── marqo_fused_search.py
│ │ ├── meta_api_tool.py
│ │ ├── ocr_tools.py
│ │ ├── optimization.py
│ │ ├── provider.py
│ │ ├── pyodide_boot_template.html
│ │ ├── python_sandbox.py
│ │ ├── rag.py
│ │ ├── redline-compiled.css
│ │ ├── sentiment_analysis.py
│ │ ├── single_shot_synthesis.py
│ │ ├── smart_browser.py
│ │ ├── sql_databases.py
│ │ ├── text_classification.py
│ │ ├── text_redline_tools.py
│ │ ├── tournament.py
│ │ ├── ums_explorer.html
│ │ └── unified_memory_system.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── async_utils.py
│ │ ├── display.py
│ │ ├── logging
│ │ │ ├── __init__.py
│ │ │ ├── console.py
│ │ │ ├── emojis.py
│ │ │ ├── formatter.py
│ │ │ ├── logger.py
│ │ │ ├── panels.py
│ │ │ ├── progress.py
│ │ │ └── themes.py
│ │ ├── parse_yaml.py
│ │ ├── parsing.py
│ │ ├── security.py
│ │ └── text.py
│ └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/ultimate_mcp_server/cli/typer_cli.py:
--------------------------------------------------------------------------------
```python
"""Typer CLI implementation for the Ultimate MCP Server."""
import asyncio
import os
import sys
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional
import typer
from rich.console import Console
from rich.panel import Panel
from rich.prompt import Confirm
from rich.table import Table
# Version is hardcoded here to avoid import errors
__version__ = "0.1.0"  # Hardcoded because importing the package version can fail
from ultimate_mcp_server.cli.commands import (
benchmark_providers,
check_cache,
generate_completion,
list_providers,
run_server,
test_provider,
)
from ultimate_mcp_server.constants import BASE_TOOLSET_CATEGORIES
from ultimate_mcp_server.utils import get_logger
# Use consistent namespace and get console for Rich output
logger = get_logger("ultimate_mcp_server.cli")
console = Console(file=sys.stderr) # Use stderr to avoid interfering with MCP protocol
# Create typer app
app = typer.Typer(
name="umcp",
help= (
"[bold green]Ultimate MCP Server[/bold green]: Multi-provider LLM management server\n"
"[italic]Unified CLI to run your server, manage providers, and more.[/italic]"
),
rich_markup_mode="rich",
no_args_is_help=True,
add_completion=True,
)
def version_callback(value: bool):
"""Show the version information and exit.
This callback is triggered by the --version/-v flag and displays
the current version of Ultimate MCP Server before exiting.
"""
if value:
console.print(f"Ultimate MCP Server version: [bold]{__version__}[/bold]")
raise typer.Exit()
class TransportMode(str, Enum):
"""Transport mode for the server."""
SSE = "sse"
STDIO = "stdio"
STREAMABLE_HTTP = "streamable-http"
SHTTP = "shttp" # Short alias for streamable-http
# Define tool-to-example mapping
TOOL_TO_EXAMPLE_MAP: Dict[str, str] = {
# Completion tools
"generate_completion": "simple_completion_demo.py",
"stream_completion": "simple_completion_demo.py",
"chat_completion": "claude_integration_demo.py",
"multi_completion": "multi_provider_demo.py",
# Provider tools
"get_provider_status": "multi_provider_demo.py",
"list_models": "multi_provider_demo.py",
# Document tools
"summarize_document": "document_conversion_and_processing_demo.py",
"extract_entities": "document_conversion_and_processing_demo.py",
"chunk_document": "document_conversion_and_processing_demo.py",
"process_document_batch": "document_conversion_and_processing_demo.py",
"extract_text_from_pdf": "document_conversion_and_processing_demo.py",
"process_image_ocr": "document_conversion_and_processing_demo.py",
# Extraction tools
"extract_json": "advanced_extraction_demo.py",
"extract_table": "advanced_extraction_demo.py",
"extract_key_value_pairs": "advanced_extraction_demo.py",
"extract_semantic_schema": "advanced_extraction_demo.py",
# Entity graph tools
"extract_entity_graph": "entity_relation_graph_demo.py",
# RAG tools
"create_knowledge_base": "rag_example.py",
"add_documents": "rag_example.py",
"retrieve_context": "rag_example.py",
"generate_with_rag": "rag_example.py",
# Classification tools
"text_classification": "text_classification_demo.py",
# Tournament tools
"create_tournament": "tournament_text_demo.py",
"list_tournaments": "tournament_text_demo.py",
"get_tournament_results": "tournament_text_demo.py",
# Optimization tools
"estimate_cost": "cost_optimization.py",
"compare_models": "cost_optimization.py",
"recommend_model": "cost_optimization.py",
# Filesystem tools
"read_file": "filesystem_operations_demo.py",
"write_file": "filesystem_operations_demo.py",
"list_directory": "filesystem_operations_demo.py",
"search_files": "filesystem_operations_demo.py",
# HTML tools
"clean_and_format_text_as_markdown": "html_to_markdown_demo.py",
# Text comparison tools
"compare_documents_redline": "text_redline_demo.py",
# Marqo search tools
"marqo_fused_search": "marqo_fused_search_demo.py",
# SQL tools
"connect_to_database": "sql_database_interactions_demo.py",
"execute_query": "sql_database_interactions_demo.py",
# Audio tools
"transcribe_audio": "audio_transcription_demo.py",
# Browser automation tools
"browser_init": "browser_automation_demo.py",
"execute_web_workflow": "browser_automation_demo.py",
}
# Group examples by category
EXAMPLE_CATEGORIES: Dict[str, List[str]] = {
"text-generation": [
"simple_completion_demo.py",
"claude_integration_demo.py",
"multi_provider_demo.py",
"grok_integration_demo.py",
],
"document-conversion-and-processing": [
"document_conversion_and_processing_demo.py",
"advanced_extraction_demo.py",
],
"search-and-retrieval": [
"rag_example.py",
"vector_search_demo.py",
"advanced_vector_search_demo.py",
"marqo_fused_search_demo.py",
],
"browser-automation": [
"browser_automation_demo.py",
"sse_client_demo.py",
],
"data-analysis": [
"sql_database_interactions_demo.py",
"analytics_reporting_demo.py",
],
"specialized-tools": [
"audio_transcription_demo.py",
"text_redline_demo.py",
"html_to_markdown_demo.py",
"entity_relation_graph_demo.py",
],
"workflows": [
"workflow_delegation_demo.py",
"tool_composition_examples.py",
"research_workflow_demo.py",
],
}
# Define option constants to avoid function calls in default arguments
HOST_OPTION = typer.Option(
None,
"-h",
"--host",
help="[cyan]Host[/cyan] or [cyan]IP address[/cyan] to bind the server to (-h shortcut). Defaults from config.",
rich_help_panel="Server Options",
)
PORT_OPTION = typer.Option(
None,
"-p",
"--port",
help="[cyan]Port[/cyan] to listen on (-p shortcut). Defaults from config.",
rich_help_panel="Server Options",
)
WORKERS_OPTION = typer.Option(
None,
"-w",
"--workers",
help="[cyan]Number of worker[/cyan] processes to spawn (-w shortcut). Defaults from config.",
rich_help_panel="Server Options",
)
TRANSPORT_MODE_OPTION = typer.Option(
TransportMode.STREAMABLE_HTTP,
"-t",
"--transport-mode",
help="[cyan]Transport mode[/cyan] for server communication (-t shortcut). Options: 'sse', 'stdio', 'streamable-http', 'shttp'.",
rich_help_panel="Server Options",
)
DEBUG_OPTION = typer.Option(
False,
"-d",
"--debug",
help="[yellow]Enable debug logging[/yellow] for detailed output (-d shortcut).",
rich_help_panel="Server Options",
)
INCLUDE_TOOLS_OPTION = typer.Option(
None,
"--include-tools",
help="[green]List of tool names to include[/green] when running the server. Adds to the 'Base Toolset' by default, or to all tools if --load-all-tools is used.",
rich_help_panel="Server Options",
)
EXCLUDE_TOOLS_OPTION = typer.Option(
None,
"--exclude-tools",
help="[red]List of tool names to exclude[/red] when running the server. Applies after including tools.",
rich_help_panel="Server Options",
)
LOAD_ALL_TOOLS_OPTION = typer.Option(
False,
"-a",
"--load-all-tools",
help="[yellow]Load all available tools[/yellow] instead of just the default 'Base Toolset' (-a shortcut).",
rich_help_panel="Server Options",
)
CHECK_OPTION = typer.Option(
False,
"-c",
"--check",
help="[yellow]Check API keys[/yellow] for all configured providers (-c shortcut).",
rich_help_panel="Provider Options",
)
MODELS_OPTION = typer.Option(
False,
"--models",
help="[green]List available models[/green] for each provider.",
rich_help_panel="Provider Options",
)
MODEL_OPTION = typer.Option(
None,
"--model",
help="[cyan]Model ID[/cyan] to test (defaults to the provider's default).",
rich_help_panel="Test Options",
)
PROMPT_OPTION = typer.Option(
"Hello, world!",
"--prompt",
help="[magenta]Prompt text[/magenta] to send to the provider.",
rich_help_panel="Test Options",
)
PROVIDER_OPTION = typer.Option(
"openai",
"--provider",
help="[cyan]Provider[/cyan] to use (default: openai)",
rich_help_panel="Completion Options",
)
COMPLETION_MODEL_OPTION = typer.Option(
None,
"--model",
help="[blue]Model ID[/blue] for completion (defaults to the provider's default)",
rich_help_panel="Completion Options",
)
COMPLETION_PROMPT_OPTION = typer.Option(
None,
"--prompt",
help="[magenta]Prompt text[/magenta] for generation (reads from stdin if not provided)",
rich_help_panel="Completion Options",
)
TEMPERATURE_OPTION = typer.Option(
0.7,
"--temperature",
help="[yellow]Sampling temperature[/yellow] (0.0 - 2.0, default: 0.7)",
rich_help_panel="Completion Options",
)
MAX_TOKENS_OPTION = typer.Option(
None,
"--max-tokens",
help="[green]Max tokens[/green] to generate (defaults to provider's setting)",
rich_help_panel="Completion Options",
)
SYSTEM_OPTION = typer.Option(
None,
"--system",
help="[blue]System prompt[/blue] for providers that support it.",
rich_help_panel="Completion Options",
)
STREAM_OPTION = typer.Option(
False,
"-s",
"--stream",
help="[cyan]Stream[/cyan] the response token by token (-s shortcut).",
rich_help_panel="Completion Options",
)
STATUS_OPTION = typer.Option(
True,
"--status",
help="[green]Show cache status[/green]",
rich_help_panel="Cache Options",
)
CLEAR_OPTION = typer.Option(
False,
"--clear",
help="[red]Clear the cache[/red]",
rich_help_panel="Cache Options",
)
DEFAULT_PROVIDERS = ["openai", "anthropic", "deepseek", "gemini", "openrouter"]
PROVIDERS_OPTION = typer.Option(
DEFAULT_PROVIDERS,
"--providers",
help="[cyan]Providers list[/cyan] to benchmark (default: all)",
rich_help_panel="Benchmark Options",
)
BENCHMARK_MODELS_OPTION = typer.Option(
None,
"--models",
help="[blue]Model IDs[/blue] to benchmark (defaults to default model of each provider)",
rich_help_panel="Benchmark Options",
)
BENCHMARK_PROMPT_OPTION = typer.Option(
None,
"--prompt",
help="[magenta]Prompt text[/magenta] to use for benchmarking (default built-in)",
rich_help_panel="Benchmark Options",
)
RUNS_OPTION = typer.Option(
3,
"-r",
"--runs",
help="[green]Number of runs[/green] per provider/model (-r shortcut, default: 3)",
rich_help_panel="Benchmark Options",
)
VERSION_OPTION = typer.Option(
False,
"--version",
"-v",
callback=version_callback,
is_eager=True,
help="[yellow]Show the application version and exit.[/yellow]",
rich_help_panel="Global Options",
)
# Options for tools command
CATEGORY_OPTION = typer.Option(
None,
"--category",
help="[cyan]Filter category[/cyan] when listing tools.",
rich_help_panel="Tools Options",
)
CATEGORY_FILTER_OPTION = typer.Option(
None,
"--category",
help="[cyan]Filter category[/cyan] when listing examples.",
rich_help_panel="Examples Options",
)
# Options for examples command
SHOW_EXAMPLES_OPTION = typer.Option(
False,
"--examples",
help="[magenta]Show example scripts[/magenta] alongside tools.",
rich_help_panel="Tools Options",
)
LIST_OPTION = typer.Option(
False,
"-l",
"--list",
help="[green]List examples[/green] instead of running one (-l shortcut).",
rich_help_panel="Examples Options",
)
@app.command(name="run")
def run(
host: Optional[str] = HOST_OPTION,
port: Optional[int] = PORT_OPTION,
workers: Optional[int] = WORKERS_OPTION,
transport_mode: TransportMode = TRANSPORT_MODE_OPTION,
debug: bool = DEBUG_OPTION,
include_tools: List[str] = INCLUDE_TOOLS_OPTION,
exclude_tools: List[str] = EXCLUDE_TOOLS_OPTION,
load_all_tools: bool = LOAD_ALL_TOOLS_OPTION,
):
"""
[bold green]Run the Ultimate MCP Server[/bold green]
Start the MCP server with configurable networking, performance, and tool options.
The server exposes MCP-protocol compatible endpoints that AI agents can use to
access various tools and capabilities.
By default, only the [yellow]'Base Toolset'[/yellow] is loaded to optimize context window usage.
Use `--load-all-tools` to load all available tools.
Network settings control server accessibility, workers affect concurrency,
and tool filtering lets you customize which capabilities are exposed.
[bold]Examples:[/bold]
[cyan]umcp run --host 0.0.0.0 --port 8000 --workers 4[/cyan] (Runs with Base Toolset)
[cyan]umcp run --load-all-tools --debug[/cyan] (Runs with all tools and debug logging)
[cyan]umcp run --include-tools browser,audio[/cyan] (Adds browser and audio tools to the Base Toolset)
[cyan]umcp run --load-all-tools --exclude-tools filesystem[/cyan] (Loads all tools except filesystem)
"""
# Set debug mode if requested
if debug:
os.environ["LOG_LEVEL"] = "DEBUG"
# Print server info
server_info_str = (
f"Host: [cyan]{host or 'default from config'}[/cyan]\n"
f"Port: [cyan]{port or 'default from config'}[/cyan]\n"
f"Workers: [cyan]{workers or 'default from config'}[/cyan]\n"
f"Transport mode: [cyan]{transport_mode}[/cyan]"
)
# Tool Loading Status
if load_all_tools:
server_info_str += "\nTool Loading: [yellow]All Available Tools[/yellow]"
else:
server_info_str += "\nTool Loading: [yellow]Base Toolset Only[/yellow] (Use --load-all-tools to load all)"
# Format the categories for display
category_lines = []
for category, tools in BASE_TOOLSET_CATEGORIES.items():
category_lines.append(f" [cyan]{category}[/cyan]: {', '.join(tools)}")
server_info_str += "\n [bold]Includes:[/bold]\n" + "\n".join(category_lines)
# Print tool filtering info if enabled
if include_tools or exclude_tools:
server_info_str += "\n[bold]Tool Filtering:[/bold]"
if include_tools:
server_info_str += f"\nIncluding: [cyan]{', '.join(include_tools)}[/cyan]"
if exclude_tools:
server_info_str += f"\nExcluding: [red]{', '.join(exclude_tools)}[/red]"
console.print(Panel(server_info_str, title="[bold blue]Starting Ultimate MCP Server[/bold blue]", expand=False))
console.print() # Add a newline for spacing
# Convert transport_mode enum to string and handle aliases
if transport_mode == TransportMode.SHTTP:
actual_transport_mode = "streamable-http"
else:
# Convert enum to string value (e.g., TransportMode.SSE -> "sse")
actual_transport_mode = transport_mode.value
# Run the server
run_server(
host=host,
port=port,
workers=workers,
transport_mode=actual_transport_mode,
include_tools=include_tools,
exclude_tools=exclude_tools,
load_all_tools=load_all_tools,
)
@app.command(name="providers")
def providers(
check: bool = CHECK_OPTION,
models: bool = MODELS_OPTION,
):
"""
[bold green]List Available Providers[/bold green]
Display configured LLM providers (OpenAI, Anthropic, Gemini, etc.)
with their connection status, default models, and API key validation.
Use this command to verify your configuration, troubleshoot API keys,
or explore available models across all providers.
Usage:
umcp providers # Basic provider listing
umcp providers --check # Validate API keys with providers
umcp providers --models # List available models for each provider
Examples:
umcp providers --check --models # Comprehensive provider diagnostics
"""
asyncio.run(list_providers(check_keys=check, list_models=models))
@app.command(name="test")
def test(
provider: str = typer.Argument(..., help="Provider to test (openai, anthropic, deepseek, gemini)", rich_help_panel="Test Options"),
model: Optional[str] = MODEL_OPTION,
prompt: str = PROMPT_OPTION,
):
"""
[bold green]Test a Specific Provider[/bold green]
Verify connectivity and functionality of an LLM provider by sending a test
prompt and displaying the response. This command performs a full API round-trip
to validate your credentials, model availability, and proper configuration.
The output includes the response text, token counts, cost estimate,
and response time metrics to help diagnose performance issues.
Usage:
umcp test openai # Test default OpenAI model
umcp test anthropic --model claude-3-5-haiku-20241022 # Test specific model
umcp test gemini --prompt "Summarize quantum computing" # Custom prompt
Examples:
umcp test openai # Quick health check with default settings
"""
with console.status(f"[bold green]Testing provider '{provider}'..."):
try:
asyncio.run(test_provider(provider=provider, model=model, prompt=prompt))
except Exception as e:
console.print(Panel(f"Failed to test provider '{provider}':\n{str(e)}", title="[bold red]Test Error[/bold red]", border_style="red"))
raise typer.Exit(code=1) from e
@app.command(name="complete")
def complete(
provider: str = PROVIDER_OPTION,
model: Optional[str] = COMPLETION_MODEL_OPTION,
prompt: Optional[str] = COMPLETION_PROMPT_OPTION,
temperature: float = TEMPERATURE_OPTION,
max_tokens: Optional[int] = MAX_TOKENS_OPTION,
system: Optional[str] = SYSTEM_OPTION,
stream: bool = STREAM_OPTION,
):
"""
[bold green]Generate Text Completion[/bold green]
Request text generation directly from an LLM provider through the CLI.
This command bypasses the server's MCP endpoint and sends requests
directly to the provider's API, useful for testing or quick generations.
Supports input from arguments, stdin (piped content), or interactive prompt.
The command provides full control over provider selection, model choice,
and generation parameters. Results include token counts and cost estimates.
Usage:
echo "Tell me about Mars" | umcp complete # Pipe content as prompt
umcp complete --prompt "Write a haiku" # Direct prompt
umcp complete --provider anthropic --model claude-3-5-sonnet-20241022 # Specify model
umcp complete --temperature 1.5 --max-tokens 250 # Adjust generation params
umcp complete --system "You are a helpful assistant" # Set system prompt
umcp complete --stream # Stream tokens in real-time
Examples:
umcp complete --prompt "Write a haiku about autumn." --stream
"""
# Get prompt from stdin if not provided
if prompt is None:
if sys.stdin.isatty():
console.print("Enter prompt (Ctrl+D to finish):")
prompt = sys.stdin.read().strip()
asyncio.run(
generate_completion(
provider=provider,
model=model,
prompt=prompt,
temperature=temperature,
max_tokens=max_tokens,
system=system,
stream=stream,
)
)
@app.command(name="cache")
def cache(
status: bool = STATUS_OPTION,
clear: bool = CLEAR_OPTION,
):
"""
[bold green]Cache Management[/bold green]
Monitor and maintain the server's response cache system.
Caching stores previous LLM responses to avoid redundant API calls,
significantly reducing costs and latency for repeated or similar requests.
The status view shows backend type, item count, hit rate percentage,
and estimated cost savings from cache hits. Clearing the cache removes
all stored responses, which may be necessary after configuration changes
or to force fresh responses.
Usage:
umcp cache # View cache statistics and status
umcp cache --status # Explicitly request status view
umcp cache --clear # Remove all cached entries (with confirmation)
umcp cache --status --clear # View stats before clearing
Examples:
umcp cache # Check current cache performance and hit rate
"""
should_clear = False
if clear:
if Confirm.ask("[bold yellow]Are you sure you want to clear the cache?[/bold yellow]"):
should_clear = True
else:
console.print("[yellow]Cache clear aborted.[/yellow]")
raise typer.Exit()
# Only run the async part if needed
if status or should_clear:
with console.status("[bold green]Accessing cache..."):
try:
asyncio.run(check_cache(show_status=status, clear=should_clear))
except Exception as e:
console.print(Panel(f"Failed to access cache:\n{str(e)}", title="[bold red]Cache Error[/bold red]", border_style="red"))
raise typer.Exit(code=1) from e
elif not clear: # Nothing to do: status display disabled and no clear requested
pass
else:
console.print("Use --status to view status or --clear to clear the cache.")
@app.command(name="benchmark")
def benchmark(
providers: List[str] = PROVIDERS_OPTION,
models: Optional[List[str]] = BENCHMARK_MODELS_OPTION,
prompt: Optional[str] = BENCHMARK_PROMPT_OPTION,
runs: int = RUNS_OPTION,
):
"""
[bold green]Benchmark Providers[/bold green]
Compare performance metrics and costs across different LLM providers and models.
The benchmark sends identical prompts to each selected provider/model combination
and measures response time, token processing speed, and cost per request.
Results are presented in a table format showing average metrics across
multiple runs to ensure statistical validity. This helps identify the
most performant or cost-effective options for your specific use cases.
Usage:
umcp benchmark # Test all configured providers
umcp benchmark --providers openai,anthropic # Test specific providers
umcp benchmark --models gpt-4o,claude-3-5-haiku # Test specific models
umcp benchmark --prompt "Explain quantum computing" --runs 5 # Custom benchmark
Examples:
umcp benchmark --runs 3 --providers openai,gemini # Compare top providers
"""
asyncio.run(benchmark_providers(providers=providers, models=models, prompt=prompt, runs=runs))
@app.command(name="tools")
def tools(
category: Optional[str] = CATEGORY_OPTION,
show_examples: bool = SHOW_EXAMPLES_OPTION,
):
"""
[bold green]List Available Tools[/bold green]
Display the MCP tools registered in the server, organized by functional categories.
These tools represent the server's capabilities that can be invoked by AI agents
through the Model Context Protocol (MCP) interface.
Tools are grouped into logical categories like completion, document processing,
filesystem access, browser automation, and more. For each tool, you can
optionally view associated example scripts that demonstrate its usage patterns.
Usage:
umcp tools # List all tools across all categories
umcp tools --category document # Show only document-related tools
umcp tools --examples # Show example scripts for each tool
Examples:
umcp tools --category filesystem --examples # Learn filesystem tools with examples
"""
# Manually list tools by category for demonstration
tool_categories: Dict[str, List[str]] = {
"completion": [
"generate_completion",
"stream_completion",
"chat_completion",
"multi_completion"
],
"document": [
"summarize_document",
"extract_entities",
"chunk_document",
"process_document_batch"
],
"extraction": [
"extract_json",
"extract_table",
"extract_key_value_pairs",
"extract_semantic_schema"
],
"rag": [
"create_knowledge_base",
"add_documents",
"retrieve_context",
"generate_with_rag"
],
"filesystem": [
"read_file",
"write_file",
"list_directory",
"search_files"
],
"browser": [
"browser_init",
"browser_navigate",
"browser_click",
"execute_web_workflow"
]
}
# Filter by category if specified
if category and category in tool_categories:
categories_to_show = {category: tool_categories[category]}
else:
categories_to_show = tool_categories
# Create Rich table for display
table = Table(title="Ultimate MCP Server Tools")
table.add_column("Category", style="cyan")
table.add_column("Tool", style="green")
if show_examples:
table.add_column("Example Script", style="yellow")
# Add rows to table
for module_name, tool_names in sorted(categories_to_show.items()):
for tool_name in sorted(tool_names):
example_script = TOOL_TO_EXAMPLE_MAP.get(tool_name, "")
if show_examples:
table.add_row(
module_name,
tool_name,
example_script if example_script else "N/A"
)
else:
table.add_row(module_name, tool_name)
console.print(table)
# Print help for running examples
if show_examples:
console.print("\n[bold]Tip:[/bold] Run examples using the command:")
console.print(" [cyan]umcp examples <example_name>[/cyan]")
@app.command(name="examples")
def examples(
example_name: Optional[str] = typer.Argument(None, help="Name of the example to run"),
category: Optional[str] = CATEGORY_FILTER_OPTION,
list_examples: bool = LIST_OPTION,
):
"""
[bold green]Run or List Example Scripts[/bold green]
Browse and execute the demonstration Python scripts included with Ultimate MCP Server.
These examples showcase real-world usage patterns and integration techniques for
different server capabilities, from basic completions to complex workflows.
Examples are organized by functional category (text-generation, document-processing,
browser-automation, etc.) and contain fully functional code that interacts with
a running MCP server. They serve as both educational resources and starting
points for your own implementations.
Usage:
umcp examples # List all available example scripts
umcp examples --list # List-only mode (same as above)
umcp examples --category browser-automation # Filter by category
umcp examples rag_example # Run specific example (with or without .py extension)
umcp examples rag_example.py # Explicit extension version
Examples:
umcp examples simple_completion_demo # Run the basic completion example
"""
# Ensure we have the examples directory
project_root = Path(__file__).parent.parent.parent
examples_dir = project_root / "examples"
if not examples_dir.exists() or not examples_dir.is_dir():
console.print(f"[bold red]Error:[/bold red] Examples directory not found at: {examples_dir}")
console.print(Panel(f"Examples directory not found at: {examples_dir}", title="[bold red]Error[/bold red]", border_style="red"))
return 1
# If just listing examples
if list_examples or not example_name:
# Create Rich table for display
table = Table(title="Ultimate MCP Server Example Scripts")
table.add_column("Category", style="cyan")
table.add_column("Example Script", style="green")
# List available examples by category
for category_name, script_names in sorted(EXAMPLE_CATEGORIES.items()):
for script_name in sorted(script_names):
table.add_row(category_name, script_name)
console.print(table)
# Print help for running examples
console.print("\n[bold]Run an example:[/bold]")
console.print(" [cyan]umcp examples <example_name>[/cyan]")
return 0
# Run the specified example
example_file = None
# Check if .py extension was provided
if example_name.endswith('.py'):
example_path = examples_dir / example_name
if example_path.exists():
example_file = example_path
else:
# Try with .py extension
example_path = examples_dir / f"{example_name}.py"
if example_path.exists():
example_file = example_path
else:
# Try finding in the tool map
if example_name in TOOL_TO_EXAMPLE_MAP:
example_script = TOOL_TO_EXAMPLE_MAP[example_name]
example_path = examples_dir / example_script
if example_path.exists():
example_file = example_path
if not example_file:
console.print(f"[bold red]Error:[/bold red] Example '{example_name}' not found")
console.print(Panel(f"Example script '{example_name}' not found in {examples_dir}", title="[bold red]Error[/bold red]", border_style="red"))
return 1
# Run the example script
console.print(f"[bold blue]Running example:[/bold blue] {example_file.name}")
# Change to the project root directory to ensure imports work
os.chdir(project_root)
# Use subprocess to run the script
import subprocess
try:
# Execute the Python script
result = subprocess.run(
[sys.executable, str(example_file)],
check=True
)
return result.returncode
except subprocess.CalledProcessError as e:
console.print(f"[bold red]Error:[/bold red] Example script failed with exit code {e.returncode}")
console.print(Panel(f"Example script '{example_file.name}' failed with exit code {e.returncode}", title="[bold red]Execution Error[/bold red]", border_style="red"))
return e.returncode
except Exception as e:
console.print(f"[bold red]Error:[/bold red] Failed to run example: {str(e)}")
console.print(Panel(f"Failed to run example '{example_file.name}':\n{str(e)}", title="[bold red]Execution Error[/bold red]", border_style="red"))
return 1
@app.callback()
def main(
version: bool = VERSION_OPTION,
):
"""Ultimate MCP Server - A comprehensive AI agent operating system.
The Ultimate MCP Server provides a unified interface to manage LLM providers,
tools, and capabilities through the Model Context Protocol (MCP). It enables
AI agents to access dozens of powerful capabilities including file operations,
browser automation, document processing, database access, and much more.
This CLI provides commands to:
• Start and configure the server
• Manage and test LLM providers
• Generate text completions directly
• View and clear the response cache
• Benchmark provider performance
• List available tools and capabilities
• Run example scripts demonstrating usage patterns
"""
# This function will be called before any command
pass
def cli():
"""Entry point for CLI package installation.
This function serves as the main entry point when the package is installed
and the 'umcp' command is invoked. It's referenced in pyproject.toml's
[project.scripts] section to create the command-line executable.
"""
app()
if __name__ == "__main__":
app()
```
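The CLI above is exposed as the `umcp` console script through `cli()`. As a quick illustration (not part of the repository), the commands that stay local can be exercised in-process with Typer's test runner; this is a minimal sketch that assumes the package is importable as `ultimate_mcp_server`, and exact output will vary with the installed version and configuration.
```python
# Minimal sketch (not in the repo): drive the Typer app in-process with
# typer.testing.CliRunner. Only local commands are used, so no API keys are needed.
from typer.testing import CliRunner

from ultimate_mcp_server.cli.typer_cli import app

runner = CliRunner()

# --version is eager: version_callback prints the version string and exits with code 0.
result = runner.invoke(app, ["--version"])
print(result.exit_code)  # expected: 0

# `tools --examples` renders the static tool/category table with example scripts.
result = runner.invoke(app, ["tools", "--examples"])
print(result.exit_code)  # expected: 0
```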
--------------------------------------------------------------------------------
/examples/tool_composition_examples.py:
--------------------------------------------------------------------------------
```python
"""
Tool composition patterns for MCP servers.
This module demonstrates how to design tools that work together effectively
in sequences and patterns, making it easier for LLMs to understand how to
compose tools for multi-step operations.
"""
import csv
import io
import json
from pathlib import Path
from typing import Any, Dict, List, Optional
from error_handling import non_empty_string, validate_inputs, with_error_handling
from tool_annotations import QUERY_TOOL, READONLY_TOOL
from ultimate_mcp_server.exceptions import ToolExecutionError, ToolInputError
from ultimate_mcp_server.tools.document_conversion_and_processing import (
summarize_document_standalone,
)
from ultimate_mcp_server.tools.filesystem import delete_file, read_file, write_file
from ultimate_mcp_server.tools.local_text_tools import run_sed
from ultimate_mcp_server.utils import get_logger
logger = get_logger("tool_composition_examples")
class DocumentProcessingExample:
"""
Example of tool composition for document processing.
This class demonstrates a pattern where multiple tools work together
to process a document through multiple stages:
1. Chunking - Break large document into manageable pieces
2. Analysis - Process each chunk individually
3. Aggregation - Combine results into a final output
This pattern is ideal for working with large documents that exceed
context windows.
"""
def __init__(self, mcp_server):
"""Initialize with an MCP server instance."""
self.mcp = mcp_server
self._register_tools()
def _register_tools(self):
"""Register document processing tools with the MCP server."""
@self.mcp.tool(
description=(
"Split a document into manageable chunks for processing. "
"This is the FIRST step in processing large documents that exceed context windows. "
"After chunking, process each chunk separately with analyze_chunk()."
),
annotations=READONLY_TOOL.to_dict(),
examples=[
{
"name": "Chunk research paper",
"description": "Split a research paper into chunks",
"input": {"document": "This is a long research paper...", "chunk_size": 1000},
"output": {
"chunks": ["Chunk 1...", "Chunk 2..."],
"chunk_count": 2,
"chunk_ids": ["doc123_chunk_1", "doc123_chunk_2"]
}
}
]
)
@with_error_handling
@validate_inputs(document=non_empty_string)
async def chunk_document(
document: str,
chunk_size: int = 1000,
overlap: int = 100,
ctx=None
) -> Dict[str, Any]:
"""
Split a document into manageable chunks for processing.
This tool is the first step in a multi-step document processing workflow:
1. First, chunk the document with chunk_document() (this tool)
2. Then, process each chunk with analyze_chunk()
3. Finally, combine results with aggregate_chunks()
Args:
document: The document text to split
chunk_size: Maximum size of each chunk in characters
overlap: Number of characters to overlap between chunks
ctx: Context object passed by the MCP server
Returns:
Dictionary containing the chunks and their metadata
"""
# Simple chunking strategy - split by character count with overlap
chunks = []
chunk_ids = []
doc_id = f"doc_{hash(document) % 10000}"
# Create chunks with overlap
for i in range(0, len(document), chunk_size - overlap):
chunk_text = document[i:i + chunk_size]
if chunk_text:
chunk_id = f"{doc_id}_chunk_{len(chunks) + 1}"
chunks.append(chunk_text)
chunk_ids.append(chunk_id)
return {
"chunks": chunks,
"chunk_count": len(chunks),
"chunk_ids": chunk_ids,
"document_id": doc_id,
"next_step": "analyze_chunk" # Hint for the next tool to use
}
@self.mcp.tool(
description=(
"Analyze a single document chunk by summarizing it. "
"This is the SECOND step in the document processing workflow. "
"Use after chunk_document() and before aggregate_chunks()."
),
annotations=READONLY_TOOL.to_dict(),
examples=[
{
"name": "Analyze document chunk",
"description": "Analyze a single chunk from a research paper",
"input": {"chunk": "This chunk discusses methodology...", "chunk_id": "doc123_chunk_1"},
"output": {
"analysis": {"key_topics": ["methodology", "experiment design"]},
"chunk_id": "doc123_chunk_1"
}
}
]
)
@with_error_handling
@validate_inputs(chunk=non_empty_string)
async def analyze_chunk(
chunk: str,
chunk_id: str,
analysis_type: str = "general",
ctx=None
) -> Dict[str, Any]:
"""
Analyze a single document chunk by summarizing it.
This tool is the second step in a multi-step document processing workflow:
1. First, chunk the document with chunk_document()
2. Then, process each chunk with analyze_chunk() (this tool)
3. Finally, combine results with aggregate_chunks()
Args:
chunk: The text chunk to analyze
chunk_id: The ID of the chunk (from chunk_document)
analysis_type: Type of analysis to perform
ctx: Context object passed by the MCP server
Returns:
Dictionary containing the analysis results
"""
# --- Call the actual summarize_document tool ---
logger.info(f"Analyzing chunk {chunk_id} with summarize_document...")
try:
# Use a concise summary for chunk analysis
summary_result = await summarize_document_standalone(
document=chunk,
summary_format="key_points", # Use key points for chunk analysis
max_length=100 # Keep chunk summaries relatively short
# We might need to specify provider/model if defaults aren't suitable
)
if summary_result.get("success"):
analysis = {
"summary": summary_result.get("summary", "[Summary Unavailable]"),
"analysis_type": "summary", # Indicate the type of analysis performed
"metrics": { # Include metrics from the summary call
"cost": summary_result.get("cost", 0.0),
"tokens": summary_result.get("tokens", {}),
"processing_time": summary_result.get("processing_time", 0.0)
}
}
logger.success(f"Chunk {chunk_id} analyzed successfully.")
else:
logger.warning(f"Summarize tool failed for chunk {chunk_id}: {summary_result.get('error')}")
analysis = {"error": f"Analysis failed: {summary_result.get('error')}"}
except Exception as e:
logger.error(f"Error calling summarize_document for chunk {chunk_id}: {e}", exc_info=True)
analysis = {"error": f"Analysis error: {str(e)}"}
# -------------------------------------------------
return {
"analysis": analysis,
"chunk_id": chunk_id,
"next_step": "aggregate_chunks" # Hint for the next tool to use
}
@self.mcp.tool(
description=(
"Aggregate analysis results from multiple document chunks. "
"This is the FINAL step in the document processing workflow. "
"Use after analyzing individual chunks with analyze_chunk()."
),
annotations=READONLY_TOOL.to_dict(),
examples=[
{
"name": "Aggregate analysis results",
"description": "Combine analysis results from multiple chunks",
"input": {
"analysis_results": [
{"analysis": {"key_topics": ["methodology"]}, "chunk_id": "doc123_chunk_1"},
{"analysis": {"key_topics": ["results"]}, "chunk_id": "doc123_chunk_2"}
]
},
"output": {
"document_summary": "This document covers methodology and results...",
"overall_statistics": {"total_chunks": 2, "word_count": 2500}
}
}
]
)
@with_error_handling
async def aggregate_chunks(
analysis_results: List[Dict[str, Any]],
aggregation_type: str = "summary",
ctx=None
) -> Dict[str, Any]:
"""
Aggregate analysis results from multiple document chunks.
This tool is the final step in a multi-step document processing workflow:
1. First, chunk the document with chunk_document()
2. Then, process each chunk with analyze_chunk()
3. Finally, combine results with aggregate_chunks() (this tool)
Args:
analysis_results: List of analysis results from analyze_chunk
aggregation_type: Type of aggregation to perform
ctx: Context object passed by the MCP server
Returns:
Dictionary containing the aggregated results
"""
# Validate input
if not analysis_results or not isinstance(analysis_results, list):
return {
"error": "Invalid analysis_results. Must provide a non-empty list of analysis results."
}
# Extract all analyses
all_analyses = [result.get("analysis", {}) for result in analysis_results if "analysis" in result]
total_chunks = len(all_analyses)
# Calculate overall statistics
total_word_count = sum(analysis.get("word_count", 0) for analysis in all_analyses)
all_key_sentences = [sentence for analysis in all_analyses
for sentence in analysis.get("key_sentences", [])]
# Generate summary based on aggregation type
if aggregation_type == "summary":
summary = f"Document contains {total_chunks} chunks with {total_word_count} words total."
if all_key_sentences:
summary += f" Key points include: {' '.join(all_key_sentences[:3])}..."
elif aggregation_type == "sentiment":
# Aggregate sentiment scores if available
sentiment_scores = [analysis.get("sentiment_score", 0.5) for analysis in all_analyses
if "sentiment_score" in analysis]
avg_sentiment = sum(sentiment_scores) / len(sentiment_scores) if sentiment_scores else 0.5
sentiment_label = "positive" if avg_sentiment > 0.6 else "neutral" if avg_sentiment > 0.4 else "negative"
summary = f"Document has an overall {sentiment_label} sentiment (score: {avg_sentiment:.2f})."
else:
summary = f"Aggregated {total_chunks} chunks with {total_word_count} total words."
return {
"document_summary": summary,
"overall_statistics": {
"total_chunks": total_chunks,
"word_count": total_word_count,
"key_sentences_count": len(all_key_sentences)
},
"workflow_complete": True # Indicate this is the end of the workflow
}
# --- Helper: Get a temporary path within allowed storage ---
# Assume storage directory exists and is allowed for this demo context
STORAGE_DIR = Path(__file__).resolve().parent.parent / "storage"
TEMP_DATA_DIR = STORAGE_DIR / "temp_pipeline_data"
async def _setup_temp_data_files():
"""Create temporary data files for the pipeline demo."""
TEMP_DATA_DIR.mkdir(exist_ok=True)
# Sample CSV Data
csv_data = io.StringIO()
writer = csv.writer(csv_data)
writer.writerow(["date", "amount", "category"])
writer.writerow(["2023-01-01", "1,200", "electronics"]) # Note: Amount as string with comma
writer.writerow(["2023-01-02", "950", "clothing"])
writer.writerow(["2023-01-03", "1500", "electronics"])
writer.writerow(["2023-01-04", "800", "food"])
csv_content = csv_data.getvalue()
csv_path = TEMP_DATA_DIR / "temp_sales.csv"
await write_file(path=str(csv_path), content=csv_content) # Use write_file tool implicitly
# Sample JSON Data
json_data = [
{"user_id": 101, "name": "Alice", "active": True, "last_login": "2023-01-10"},
{"user_id": 102, "name": "Bob", "active": False, "last_login": "2022-12-15"},
{"user_id": 103, "name": "Charlie", "active": True, "last_login": "2023-01-05"},
]
json_content = json.dumps(json_data, indent=2)
json_path = TEMP_DATA_DIR / "temp_users.json"
await write_file(path=str(json_path), content=json_content) # Use write_file tool implicitly
return {"csv": str(csv_path), "json": str(json_path)}
async def _cleanup_temp_data_files(temp_files: Dict[str, str]):
"""Remove temporary data files."""
for file_path in temp_files.values():
try:
await delete_file(path=file_path) # Use delete_file tool implicitly
logger.debug(f"Cleaned up temp file: {file_path}")
except Exception as e:
logger.warning(f"Failed to clean up temp file {file_path}: {e}")
try:
# Attempt to remove the directory if empty
if TEMP_DATA_DIR.exists() and not any(TEMP_DATA_DIR.iterdir()):
TEMP_DATA_DIR.rmdir()
logger.debug(f"Cleaned up temp directory: {TEMP_DATA_DIR}")
except Exception as e:
logger.warning(f"Failed to remove temp directory {TEMP_DATA_DIR}: {e}")
# --- End Helper ---
class DataPipelineExample:
"""
Example of tool composition for data processing pipelines.
This class demonstrates a pattern where tools form a processing
pipeline to transform, filter, and analyze data:
1. Fetch - Get data from a source
2. Transform - Clean and process the data
3. Filter - Select relevant data
4. Analyze - Perform analysis on filtered data
This pattern is ideal for working with structured data that
needs multiple processing steps.
"""
def __init__(self, mcp_server):
"""Initialize with an MCP server instance."""
self.mcp = mcp_server
self._register_tools()
def _register_tools(self):
"""Register data pipeline tools with the MCP server."""
@self.mcp.tool(
description=(
"Fetch data from a temporary source file based on type. "
"This is the FIRST step in the data pipeline. "
"Continue with transform_data() to clean the fetched data."
),
annotations=QUERY_TOOL.to_dict(),
examples=[
{
"name": "Fetch CSV data",
"description": "Fetch data from a CSV source",
"input": {"source_type": "csv"},
"output": {
"data": [{"date": "2023-01-01", "amount": 1200}, {"date": "2023-01-02", "amount": 950}],
"record_count": 2,
"schema": {"date": "string", "amount": "number"}
}
}
]
)
@with_error_handling
async def fetch_data(
source_type: str,
limit: Optional[int] = None,
ctx=None
) -> Dict[str, Any]:
"""
Fetch data from a temporary source file based on type.
This tool is the first step in a data processing pipeline:
1. First, fetch data with fetch_data() (this tool) - Creates temp files if needed.
2. Then, clean the data with transform_data()
3. Then, filter the data with filter_data()
4. Finally, analyze the data with analyze_data()
Args:
source_type: Type of data source (csv or json for this demo).
limit: Maximum number of records to fetch/read (applied after reading).
ctx: Context object passed by the MCP server.
Returns:
Dictionary containing the fetched data and metadata
"""
# Ensure temp files exist
temp_files = await _setup_temp_data_files()
source_path = temp_files.get(source_type.lower())
if not source_path:
raise ToolInputError(f"Unsupported source_type for demo: {source_type}. Use 'csv' or 'json'.")
logger.info(f"Fetching data from temporary file: {source_path}")
# Use read_file tool implicitly to get content
read_result = await read_file(path=source_path)
if not read_result.get("success"):
raise ToolExecutionError(f"Failed to read temporary file {source_path}: {read_result.get('error')}")
# Assuming read_file returns content in a predictable way (e.g., result['content'][0]['text'])
# Adjust parsing based on actual read_file output structure
content = read_result.get("content", [])
if not content or not isinstance(content, list) or "text" not in content[0]:
raise ToolExecutionError(f"Unexpected content structure from read_file for {source_path}")
file_content = content[0]["text"]
data = []
schema = {}
try:
if source_type.lower() == "csv":
# Parse CSV data
csv_reader = csv.reader(io.StringIO(file_content))
headers = next(csv_reader)
for row in csv_reader:
if row: # Skip empty rows
data.append(dict(zip(headers, row, strict=False)))
elif source_type.lower() == "json":
# Parse JSON data
data = json.loads(file_content)
else:
# Default dummy data if somehow type is wrong despite check
data = [{"id": i, "value": f"Sample {i}"} for i in range(1, 6)]
except Exception as parse_error:
raise ToolExecutionError(f"Failed to parse content from {source_path}: {parse_error}") from parse_error
# Apply limit if specified AFTER reading/parsing
if limit and limit > 0 and len(data) > limit:
data = data[:limit]
# Infer schema from first record
if data:
first_record = data[0]
for key, value in first_record.items():
value_type = "string"
if isinstance(value, (int, float)):
value_type = "number"
elif isinstance(value, bool):
value_type = "boolean"
schema[key] = value_type
return {
"data": data,
"record_count": len(data),
"schema": schema,
"source_info": {"type": source_type, "path": source_path},
"next_step": "transform_data" # Hint for the next tool to use
}
@self.mcp.tool(
description=(
"Transform and clean data using basic text processing tools (sed). "
"This is the SECOND step in the data pipeline. "
"Use after fetch_data() and before filter_data()."
),
annotations=READONLY_TOOL.to_dict(),
examples=[
{
"name": "Transform sales data",
"description": "Clean and transform sales data",
"input": {
"data": [{"date": "2023-01-01", "amount": "1,200"}, {"date": "2023-01-02", "amount": "950"}],
"transformations": ["convert_dates", "normalize_numbers"]
},
"output": {
"transformed_data": [{"date": "2023-01-01", "amount": 1200}, {"date": "2023-01-02", "amount": 950}],
"transformation_log": ["Converted 2 dates", "Normalized 2 numbers"]
}
}
]
)
@with_error_handling
async def transform_data(
data: List[Dict[str, Any]],
transformations: List[str] = None,
custom_transformations: Dict[str, str] = None,
ctx=None
) -> Dict[str, Any]:
"""
Transform and clean data using basic text processing tools (sed).
This tool is the second step in a data processing pipeline:
1. First, fetch data with fetch_data()
2. Then, clean the data with transform_data() (this tool)
3. Then, filter the data with filter_data()
4. Finally, analyze the data with analyze_data()
Args:
data: List of data records to transform
transformations: List of built-in transformations to apply
custom_transformations: Dictionary of field->transform_expression
ctx: Context object passed by the MCP server
Returns:
Dictionary containing the transformed data and transformation log
"""
# Validate input
if not data or not isinstance(data, list) or not all(isinstance(r, dict) for r in data):
return {
"error": "Invalid data. Must provide a non-empty list of records (dictionaries)."
}
transformation_log = []
# Convert input data (list of dicts) to a string format suitable for sed (e.g., JSON lines)
try:
input_text = "\n".join(json.dumps(record) for record in data)
except Exception as e:
return {"error": f"Could not serialize input data for transformation: {e}"}
current_text = input_text
sed_scripts = [] # Accumulate sed commands
# Apply standard transformations if specified
transformations = transformations or []
for transform in transformations:
if transform == "convert_dates":
# Use sed to replace '/' with '-' in date-like fields (heuristic)
# This is complex with JSON structure, better done after parsing.
# For demo, we apply a simple global substitution (less robust)
sed_scripts.append("s|/|-|g")
transformation_log.append("Applied date conversion (sed: s|/|-|g)")
elif transform == "normalize_numbers":
# Use sed to remove commas from numbers (heuristic)
# Example: "amount": "1,200" -> "amount": "1200"
sed_scripts.append('s/"([a-zA-Z_]+)":"([0-9,]+)"/"\1":"\2"/g; s/,//g') # More complex sed needed
transformation_log.append("Applied number normalization (sed: remove commas)")
# --- Execute accumulated sed scripts ---
if sed_scripts:
# Combine scripts with -e for each
combined_script = " ".join([f"-e '{s}'" for s in sed_scripts])
logger.info(f"Running sed transformation with script: {combined_script}")
try:
sed_result = await run_sed(
args_str=combined_script, # Pass combined script
input_data=current_text
)
if sed_result.get("success"):
current_text = sed_result["stdout"]
logger.success("Sed transformation completed successfully.")
else:
error_msg = sed_result.get("error", "Sed command failed")
logger.error(f"Sed transformation failed: {error_msg}")
return {"error": f"Transformation failed: {error_msg}"}
except Exception as e:
logger.error(f"Error running sed transformation: {e}", exc_info=True)
return {"error": f"Transformation execution error: {e}"}
# --- Attempt to parse back to list of dicts ---
try:
transformed_data = []
for line in current_text.strip().split("\n"):
if line:
record = json.loads(line)
# Post-processing for number normalization (sed only removes commas)
if "normalize_numbers" in transformations:
for key, value in record.items():
if isinstance(value, str) and value.replace(".", "", 1).isdigit():
try:
record[key] = float(value) if "." in value else int(value)
except ValueError:
pass # Keep as string if conversion fails
transformed_data.append(record)
logger.success("Successfully parsed transformed data back to JSON objects.")
except Exception as e:
logger.error(f"Could not parse transformed data back to JSON: {e}", exc_info=True)
# Return raw text if parsing fails
return {
"transformed_data_raw": current_text,
"transformation_log": transformation_log,
"warning": "Could not parse final data back to JSON records"
}
# ---------------------------------------------------
return {
"transformed_data": transformed_data,
"transformation_log": transformation_log,
"record_count": len(transformed_data),
"next_step": "filter_data" # Hint for the next tool to use
}
@self.mcp.tool(
description=(
"Filter data based on criteria. "
"This is the THIRD step in the data pipeline. "
"Use after transform_data() and before analyze_data()."
),
annotations=READONLY_TOOL.to_dict(),
examples=[
{
"name": "Filter data",
"description": "Filter data based on criteria",
"input": {
"data": [{"date": "2023-01-01", "amount": 1200}, {"date": "2023-01-02", "amount": 950}],
"filter_criteria": {"amount": {"$gt": 1000}}
},
"output": {
"filtered_data": [{"date": "2023-01-01", "amount": 1200}],
"filter_criteria": {"amount": {"$gt": 1000}}
}
}
]
)
@with_error_handling
async def filter_data(
data: List[Dict[str, Any]],
filter_criteria: Dict[str, Any],
ctx=None
) -> Dict[str, Any]:
"""
Filter data based on criteria.
This tool is the third step in a data processing pipeline:
1. First, fetch data with fetch_data()
2. Then, clean the data with transform_data()
3. Then, filter the data with filter_data() (this tool)
4. Finally, analyze the data with analyze_data()
Args:
data: List of data records to filter
filter_criteria: Criteria to filter data
ctx: Context object passed by the MCP server
Returns:
Dictionary containing the filtered data and filter criteria
"""
# Exact-equality filtering: a record matches only if record.get(key) == value for every criterion (operator-style criteria such as {"$gt": 1000} are not handled in this demo)
filtered_data = [record for record in data if all(record.get(key) == value for key, value in filter_criteria.items())]
return {
"filtered_data": filtered_data,
"filter_criteria": filter_criteria,
"record_count": len(filtered_data)
}
@self.mcp.tool(
description=(
"Analyze data. "
"This is the FINAL step in the data pipeline. "
"Use after filtering data with filter_data()."
),
annotations=READONLY_TOOL.to_dict(),
examples=[
{
"name": "Analyze data",
"description": "Analyze filtered data",
"input": {
"data": [{"date": "2023-01-01", "amount": 1200}, {"date": "2023-01-02", "amount": 950}],
"analysis_type": "summary"
},
"output": {
"analysis_results": [
{"analysis": {"key_topics": ["methodology"]}, "chunk_id": "doc123_chunk_1"},
{"analysis": {"key_topics": ["results"]}, "chunk_id": "doc123_chunk_2"}
],
"analysis_type": "summary"
}
}
]
)
@with_error_handling
async def analyze_data(
data: List[Dict[str, Any]],
analysis_type: str = "summary",
ctx=None
) -> Dict[str, Any]:
"""
Analyze data.
This tool is the final step in a data processing pipeline:
1. First, fetch data with fetch_data()
2. Then, clean the data with transform_data()
3. Then, filter the data with filter_data()
4. Finally, analyze the data with analyze_data() (this tool)
Args:
data: List of data records to analyze
analysis_type: Type of analysis to perform
ctx: Context object passed by the MCP server
Returns:
Dictionary containing the analysis results
"""
# Simulate analysis based on analysis_type
if analysis_type == "summary":
# Aggregate analysis results
analysis_results = [{"analysis": {"key_topics": ["methodology"]}, "chunk_id": "doc123_chunk_1"},
{"analysis": {"key_topics": ["results"]}, "chunk_id": "doc123_chunk_2"}]
else:
# Placeholder for other analysis types
analysis_results = []
return {
"analysis_results": analysis_results,
"analysis_type": analysis_type
}
async def cleanup_pipeline_data(self):
"""Cleans up temporary data files created by fetch_data."""
await _cleanup_temp_data_files({"csv": str(TEMP_DATA_DIR / "temp_sales.csv"), "json": str(TEMP_DATA_DIR / "temp_users.json")})
# Example usage (if this file were run directly or imported)
# async def run_pipeline_example():
# # ... initialize MCP server ...
# pipeline = DataPipelineExample(mcp_server)
# try:
# # ... run pipeline steps ...
# fetch_result = await pipeline.fetch_data(source_type="csv")
# transform_result = await pipeline.transform_data(data=fetch_result['data'])
# # ... etc ...
# finally:
# await pipeline.cleanup_pipeline_data()
# asyncio.run(run_pipeline_example())
```
--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/completion.py:
--------------------------------------------------------------------------------
```python
"""Text completion tools for Ultimate MCP Server."""
import asyncio
import time
from typing import Any, AsyncGenerator, Dict, List, Optional
from ultimate_mcp_server.constants import Provider, TaskType
from ultimate_mcp_server.core.providers.base import get_provider, parse_model_string
from ultimate_mcp_server.exceptions import ProviderError, ToolError, ToolInputError
from ultimate_mcp_server.services.cache import with_cache
from ultimate_mcp_server.tools.base import with_error_handling, with_retry, with_tool_metrics
from ultimate_mcp_server.utils import get_logger
logger = get_logger("ultimate_mcp_server.tools.completion")
# --- Tool Functions (Standalone, Decorated) ---
@with_tool_metrics
@with_error_handling
async def generate_completion(
prompt: str,
provider: str = Provider.OPENAI.value,
model: Optional[str] = None,
max_tokens: Optional[int] = None,
temperature: float = 0.7,
stream: bool = False,
json_mode: bool = False,
additional_params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""Generates a single, complete text response for a given prompt (non-streaming).
Use this tool for single-turn tasks where the entire response is needed at once,
such as answering a question, summarizing text, translating, or classifying content.
It waits for the full response from the LLM before returning.
If you need the response to appear incrementally (e.g., for user interfaces or long generations),
use the `stream_completion` tool instead.
Args:
prompt: The input text prompt for the LLM.
provider: The name of the LLM provider (e.g., "openai", "anthropic", "gemini"). Defaults to "openai".
Use `list_models` or `get_provider_status` to see available providers.
model: The specific model ID (e.g., "openai/gpt-4.1-mini", "anthropic/claude-3-5-haiku-20241022").
If None, the provider's default model is used. Use `list_models` to find available IDs.
max_tokens: (Optional) Maximum number of tokens to generate in the response.
temperature: (Optional) Controls response randomness (0.0=deterministic, 1.0=creative). Default 0.7.
stream: Must be False for this tool; if set to True, a ToolInputError is raised directing callers to `stream_completion`.
json_mode: (Optional) When True, instructs the model to return a valid JSON response. Default False.
Note: Support and behavior varies by provider.
additional_params: (Optional) Dictionary of additional provider-specific parameters (e.g., `{"top_p": 0.9}`).
Returns:
A dictionary containing the full completion and metadata:
{
"text": "The generated completion text...",
"model": "provider/model-used",
"provider": "provider-name",
"tokens": {
"input": 15,
"output": 150,
"total": 165
},
"cost": 0.000123, # Estimated cost in USD
"processing_time": 1.23, # Execution time in seconds
"success": true
}
Raises:
ToolInputError: If `stream` is set to True.
ProviderError: If the provider is unavailable or the LLM request fails.
ToolError: For other internal errors.
"""
# Streaming not supported for this endpoint
if stream:
raise ToolInputError(
"Streaming is not supported for `generate_completion`. Use the `stream_completion` tool instead.",
param_name="stream",
provided_value=stream
)
start_time = time.time()
# Check if model contains a provider prefix (e.g., "openai/gpt-4.1-mini")
if model:
extracted_provider, extracted_model = parse_model_string(model)
if extracted_provider:
provider = extracted_provider # Override provider with the one from the model string
model = extracted_model # Use the model name without the provider prefix
logger.debug(f"Using provider '{provider}' and model '{model}' extracted from model string")
# Get provider instance
try:
# Use provider name directly, get_provider handles splitting if needed
provider_instance = await get_provider(provider)
except Exception as e:
raise ProviderError(
f"Failed to initialize provider '{provider}': {str(e)}",
provider=provider,
cause=e
) from e
# Set default additional params
additional_params = additional_params or {}
# Conditionally construct parameters for the provider call
params_for_provider = {
"prompt": prompt,
"model": model, # model here is already stripped of provider prefix if applicable
"temperature": temperature,
"json_mode": json_mode,
# messages will be handled by chat_completion, this is for simple completion
}
if max_tokens is not None:
params_for_provider["max_tokens"] = max_tokens
# Merge additional_params last so provider-specific values override the core params on key conflicts.
# (A stricter merge could skip keys already present in params_for_provider, but the historical
# override behavior is preserved here, just applied to the conditionally built dict.)
final_provider_params = {**params_for_provider, **additional_params}
try:
# Generate completion
result = await provider_instance.generate_completion(
**final_provider_params
)
# Calculate processing time
processing_time = time.time() - start_time
# Log success
logger.success(
f"Completion generated successfully with {provider}/{result.model}",
emoji_key=TaskType.COMPLETION.value,
tokens={
"input": result.input_tokens,
"output": result.output_tokens
},
cost=result.cost,
time=processing_time
)
# Return standardized result
return {
"text": result.text,
"model": result.model, # Return the actual model used (might differ from input if default)
"provider": provider,
"tokens": {
"input": result.input_tokens,
"output": result.output_tokens,
"total": result.total_tokens,
},
"cost": result.cost,
"processing_time": processing_time,
"success": True
}
except Exception as e:
# Convert to provider error
# Use the potentially prefixed model name in the error context
error_model = model or f"{provider}/default"
raise ProviderError(
f"Completion generation failed for model '{error_model}': {str(e)}",
provider=provider,
model=error_model,
cause=e
) from e
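# Illustrative usage sketch (assumes an async caller and a configured provider key;
# the provider and model ID mirror the docstring examples and may differ in your deployment):
#
#   result = await generate_completion(
#       prompt="Summarize the benefits of streaming APIs in two sentences.",
#       provider="openai",
#       model="openai/gpt-4.1-mini",
#       max_tokens=120,
#   )
#   print(result["text"], result["cost"])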
@with_tool_metrics
@with_error_handling
async def stream_completion(
prompt: str,
provider: str = Provider.OPENAI.value,
model: Optional[str] = None,
max_tokens: Optional[int] = None,
temperature: float = 0.7,
json_mode: bool = False,
additional_params: Optional[Dict[str, Any]] = None
) -> AsyncGenerator[Dict[str, Any], None]:
"""Generates a text completion for a prompt and streams the response chunk by chunk.
Use this tool when you need to display the LLM's response progressively as it's generated,
improving perceived responsiveness for users, especially for longer outputs.
If you need the entire response at once, use `generate_completion`.
Args:
prompt: The input text prompt for the LLM.
provider: The name of the LLM provider (e.g., "openai", "anthropic", "gemini"). Defaults to "openai".
Use `list_models` or `get_provider_status` to see available providers.
model: The specific model ID (e.g., "openai/gpt-4.1-mini", "anthropic/claude-3-5-haiku-20241022").
If None, the provider's default model is used. Use `list_models` to find available IDs.
max_tokens: (Optional) Maximum number of tokens to generate in the response.
temperature: (Optional) Controls response randomness (0.0=deterministic, 1.0=creative). Default 0.7.
json_mode: (Optional) When True, instructs the model to return a valid JSON response. Default False.
additional_params: (Optional) Dictionary of additional provider-specific parameters (e.g., `{"top_p": 0.9}`).
Yields:
A stream of dictionary chunks. Each chunk contains:
{
"text": "The incremental piece of generated text...",
"chunk_index": 1, # Sequence number of this chunk (starts at 1)
"provider": "provider-name",
"model": "provider/model-used",
"finish_reason": null, # Reason generation stopped (e.g., "stop", "length"), null until the end
"finished": false # True only for the very last yielded dictionary
}
The *final* yielded dictionary will have `finished: true` and may also contain aggregate metadata:
{
"text": "", # Final chunk might be empty
"chunk_index": 10,
"provider": "provider-name",
"model": "provider/model-used",
"finish_reason": "stop",
"finished": true,
"full_text": "The complete generated text...", # Full response concatenated
"processing_time": 5.67, # Total time in seconds
"tokens": { "input": ..., "output": ..., "total": ... }, # Final token counts
"cost": 0.000543 # Final estimated cost
"error": null # Error message if one occurred during streaming
}
Raises:
ProviderError: If the provider is unavailable or the LLM stream request fails initially.
Errors during the stream yield an error message in the final chunk.
"""
start_time = time.time()
# Add MCP annotations for audience and priority
# annotations = { # noqa: F841
# "audience": ["assistant", "user"], # Useful for both assistant and user
# "priority": 0.8 # High priority but not required (generate_completion is the primary tool)
# }
# Check if model contains a provider prefix (e.g., "openai/gpt-4.1-mini")
if model:
extracted_provider, extracted_model = parse_model_string(model)
if extracted_provider:
provider = extracted_provider # Override provider with the one from the model string
model = extracted_model # Use the model name without the provider prefix
logger.debug(f"Using provider '{provider}' and model '{model}' extracted from model string")
# Get provider instance
try:
provider_instance = await get_provider(provider)
except Exception as e:
logger.error(
f"Failed to initialize provider '{provider}': {str(e)}",
emoji_key="error",
provider=provider
)
# Yield a single error chunk if provider init fails
yield {
"error": f"Failed to initialize provider '{provider}': {str(e)}",
"text": None,
"finished": True,
"provider": provider,
"model": model
}
return
# Set default additional params
additional_params = additional_params or {}
logger.info(
f"Starting streaming completion with {provider}",
emoji_key=TaskType.COMPLETION.value,
prompt_length=len(prompt),
json_mode_requested=json_mode # Log the request
)
chunk_count = 0
full_text = ""
final_metadata = {} # To store final metadata like model, cost etc.
error_during_stream = None
actual_model_used = model # Keep track of the actual model used
try:
# Get stream, passing json_mode directly
stream = provider_instance.generate_completion_stream(
prompt=prompt,
model=model,
max_tokens=max_tokens,
temperature=temperature,
json_mode=json_mode, # Pass the flag here
**additional_params
)
async for chunk, metadata in stream:
chunk_count += 1
full_text += chunk
final_metadata.update(metadata) # Keep track of latest metadata
actual_model_used = metadata.get("model", actual_model_used) # Update if metadata provides it
# Yield chunk with metadata
yield {
"text": chunk,
"chunk_index": chunk_count,
"provider": provider,
"model": actual_model_used,
"finish_reason": metadata.get("finish_reason"),
"finished": False,
}
except Exception as e:
error_during_stream = f"Error during streaming after {chunk_count} chunks: {type(e).__name__}: {str(e)}"
logger.error(
f"Error during streaming completion with {provider}/{actual_model_used or 'default'}: {error_during_stream}",
emoji_key="error"
)
# Don't return yet, yield the final chunk with the error
# --- Final Chunk ---
processing_time = time.time() - start_time
# Log completion (success or failure based on error_during_stream)
log_level = logger.error if error_during_stream else logger.success
log_message = f"Streaming completion finished ({chunk_count} chunks)" if not error_during_stream else f"Streaming completion failed after {chunk_count} chunks"
log_level(
log_message,
emoji_key="error" if error_during_stream else "success",
provider=provider,
model=actual_model_used,
tokens={
"input": final_metadata.get("input_tokens"),
"output": final_metadata.get("output_tokens")
},
cost=final_metadata.get("cost"),
time=processing_time,
error=error_during_stream
)
# Yield the final aggregated chunk
yield {
"text": "", # No new text in the final summary chunk
"chunk_index": chunk_count + 1,
"provider": provider,
"model": actual_model_used,
"finish_reason": final_metadata.get("finish_reason"),
"finished": True,
"full_text": full_text,
"processing_time": processing_time,
"tokens": {
"input": final_metadata.get("input_tokens"),
"output": final_metadata.get("output_tokens"),
"total": final_metadata.get("total_tokens")
},
"cost": final_metadata.get("cost"),
"error": error_during_stream
}
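# Illustrative consumption sketch (assumes an async caller): chunks arrive with
# `finished: False` until the final aggregate chunk, which carries the totals.
#
#   async for chunk in stream_completion(prompt="Write a haiku about rivers.", provider="openai"):
#       if chunk["finished"]:
#           print("\n--- done ---", chunk["cost"], chunk["tokens"])
#       else:
#           print(chunk["text"], end="", flush=True)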
@with_tool_metrics
@with_error_handling
async def generate_completion_stream(
prompt: str,
provider: str = Provider.OPENAI.value,
model: Optional[str] = None,
max_tokens: Optional[int] = None,
temperature: float = 0.7,
json_mode: bool = False,
additional_params: Optional[Dict[str, Any]] = None
) -> AsyncGenerator[Dict[str, Any], None]:
"""Generates a text response in a streaming fashion for a given prompt.
Use this tool when you want to display the response as it's being generated
without waiting for the entire response to be completed. It yields chunks of text
as they become available, allowing for more interactive user experiences.
Args:
prompt: The text prompt to send to the LLM.
provider: The LLM provider to use (default: "openai").
model: The specific model to use (if None, uses provider's default).
max_tokens: Maximum tokens to generate in the response.
temperature: Controls randomness in the output (0.0-1.0).
json_mode: Whether to request JSON formatted output from the model.
additional_params: Additional provider-specific parameters.
Yields:
Dictionary containing the generated text chunk and metadata:
{
"text": str, # The text chunk
"metadata": {...}, # Additional information about the generation
"done": bool # Whether this is the final chunk
}
Raises:
ToolError: If an error occurs during text generation.
"""
# Initialize variables to track metrics
start_time = time.time()
try:
# Get provider instance
provider_instance = await get_provider(provider)
if not provider_instance:
raise ValueError(f"Invalid provider: {provider}")
# Add json_mode to additional_params if specified
params = additional_params.copy() if additional_params else {}
if json_mode:
params["json_mode"] = True
# Stream the completion
async for chunk, metadata in provider_instance.generate_completion_stream(
prompt=prompt,
model=model,
max_tokens=max_tokens,
temperature=temperature,
**params
):
# Calculate elapsed time for each chunk
elapsed_time = time.time() - start_time
# Include additional metadata with each chunk
response = {
"text": chunk,
"metadata": {
**metadata,
"elapsed_time": elapsed_time,
},
"done": metadata.get("finish_reason") is not None
}
yield response
except Exception as e:
logger.error(f"Error in generate_completion_stream: {str(e)}", exc_info=True)
raise ToolError(f"Failed to generate streaming completion: {str(e)}") from e
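# Illustrative sketch (assumes an async caller): unlike `stream_completion`, this
# generator yields chunks shaped as {"text", "metadata", "done"}.
#
#   async for chunk in generate_completion_stream(prompt="List three prime numbers."):
#       print(chunk["text"], end="")
#       if chunk["done"]:
#           break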
@with_cache(ttl=24 * 60 * 60) # Cache results for 24 hours
@with_tool_metrics
@with_retry(max_retries=2, retry_delay=1.0) # Retry up to 2 times on failure
@with_error_handling
async def chat_completion(
messages: List[Dict[str, Any]],
provider: str = Provider.OPENAI.value,
model: Optional[str] = None,
max_tokens: Optional[int] = None,
temperature: float = 0.7,
system_prompt: Optional[str] = None,
json_mode: bool = False,
additional_params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""Generates a response within a conversational context (multi-turn chat).
Use this tool for chatbot interactions, instruction following, or any task requiring
the LLM to consider previous turns in a conversation. It takes a list of messages
(user, assistant, system roles) as input.
This tool automatically retries on transient failures and caches results for identical requests
(based on messages, model, etc.) for 24 hours to save costs and time.
Streaming is NOT supported; this tool returns the complete chat response at once.
Args:
messages: A list of message dictionaries representing the conversation history.
Each dictionary must have "role" ("user", "assistant", or "system") and "content" (string).
Example: `[{"role": "user", "content": "Hello!"}, {"role": "assistant", "content": "Hi there!"}]`
provider: The name of the LLM provider (e.g., "openai", "anthropic", "gemini"). Defaults to "openai".
model: The specific model ID (e.g., "openai/gpt-4o", "anthropic/claude-3-7-sonnet-20250219").
If None, the provider's default model is used. Use `list_models` to find available IDs.
max_tokens: (Optional) Maximum number of tokens for the *assistant's* response.
temperature: (Optional) Controls response randomness (0.0=deterministic, 1.0=creative). Default 0.7.
system_prompt: (Optional) An initial system message to guide the model's behavior (e.g., persona, instructions).
If provided, it's effectively prepended to the `messages` list as a system message.
json_mode: (Optional) Request structured JSON output from the LLM. Default False.
additional_params: (Optional) Dictionary of additional provider-specific parameters (e.g., `{"top_p": 0.9}`).
Returns:
A dictionary containing the assistant's response message and metadata:
{
"message": {
"role": "assistant",
"content": "The assistant's generated response..."
},
"model": "provider/model-used",
"provider": "provider-name",
"tokens": {
"input": 55, # Includes all input messages
"output": 120, # Assistant's response only
"total": 175
},
"cost": 0.000150, # Estimated cost in USD
"processing_time": 2.50, # Execution time in seconds
"cached_result": false, # True if the result was served from cache
"success": true
}
Raises:
ToolInputError: If the `messages` format is invalid.
ProviderError: If the provider is unavailable or the LLM request fails (after retries).
ToolError: For other internal errors.
"""
start_time = time.time()
# Validate messages format
if not isinstance(messages, list) or not all(isinstance(m, dict) and 'role' in m and 'content' in m for m in messages):
raise ToolInputError(
"Invalid messages format. Must be a list of dictionaries, each with 'role' and 'content'.",
param_name="messages",
provided_value=messages
)
# Prepend system prompt if provided
if system_prompt:
# Avoid modifying the original list if called multiple times
processed_messages = [{"role": "system", "content": system_prompt}] + messages
else:
processed_messages = messages
# Check if model contains a provider prefix (e.g., "openai/gpt-4.1-mini")
if model:
extracted_provider, extracted_model = parse_model_string(model)
if extracted_provider:
provider = extracted_provider # Override provider with the one from the model string
model = extracted_model # Use the model name without the provider prefix
logger.debug(f"Using provider '{provider}' and model '{model}' extracted from model string")
# Get provider instance
try:
provider_instance = await get_provider(provider)
except Exception as e:
raise ProviderError(
f"Failed to initialize provider '{provider}': {str(e)}",
provider=provider,
cause=e
) from e
additional_params = additional_params or {}
# Add json_mode to additional params if specified
if json_mode:
additional_params["json_mode"] = True
try:
result = await provider_instance.generate_completion(
messages=processed_messages,
model=model,
max_tokens=max_tokens,
temperature=temperature,
**additional_params
)
processing_time = time.time() - start_time
logger.success(
f"Chat completion generated successfully with {provider}/{result.model}",
emoji_key=TaskType.CHAT.value,
tokens={
"input": result.input_tokens,
"output": result.output_tokens
},
cost=result.cost,
time=processing_time
)
return {
"message": result.message.dict() if hasattr(result.message, 'dict') else result.message, # Return message as dict
"model": result.model,
"provider": provider,
"tokens": {
"input": result.input_tokens,
"output": result.output_tokens,
"total": result.total_tokens,
},
"cost": result.cost,
"processing_time": processing_time,
# Note: cached_result is automatically added by the @with_cache decorator if applicable
"success": True
}
except Exception as e:
error_model = model or f"{provider}/default"
# Check if the exception has the model attribute, otherwise use the determined error_model
error_model_from_exception = getattr(e, 'model', None)
final_error_model = error_model_from_exception or error_model
raise ProviderError(
f"Chat completion generation failed for model '{final_error_model}': {str(e)}",
provider=provider,
model=final_error_model,
cause=e
) from e
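# Illustrative usage sketch (assumes an async caller; the model ID mirrors the docstring example):
#
#   response = await chat_completion(
#       messages=[{"role": "user", "content": "What is the capital of France?"}],
#       provider="openai",
#       model="openai/gpt-4o",
#       system_prompt="Answer concisely.",
#   )
#   print(response["message"]["content"])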
@with_cache(ttl=7 * 24 * 60 * 60) # Cache results for 7 days
@with_tool_metrics
@with_error_handling # Error handling should be used
async def multi_completion(
prompt: str,
providers: List[Dict[str, Any]],
max_concurrency: int = 3,
timeout: Optional[float] = 30.0
) -> Dict[str, Any]:
"""Generates completions for the same prompt from multiple LLM providers/models concurrently.
Use this tool to compare responses, latency, or cost across different models for a specific prompt.
It runs requests in parallel up to `max_concurrency`.
Results are cached for 7 days based on the prompt and provider configurations.
Args:
prompt: The input text prompt to send to all specified models.
providers: A list of dictionaries, each specifying a provider and model configuration.
Example: `[{"provider": "openai", "model": "gpt-4.1-mini"}, {"provider": "anthropic", "model": "claude-3-5-haiku-20241022", "max_tokens": 50}]`
Each dict must contain at least "provider". "model" is optional (uses provider default).
Other valid parameters for `generate_completion` (like `max_tokens`, `temperature`) can be included.
max_concurrency: (Optional) Maximum number of provider requests to run in parallel. Default 3.
timeout: (Optional) Maximum time in seconds to wait for each individual provider request. Default 30.0.
Requests exceeding this time will result in a timeout error for that specific provider.
Returns:
A dictionary containing results from each provider, along with aggregate statistics:
{
"results": {
"openai/gpt-4.1-mini": { # Keyed by provider/model
"text": "Response from OpenAI...",
"model": "openai/gpt-4.1-mini",
"provider": "openai",
"tokens": { ... },
"cost": 0.000123,
"processing_time": 1.5,
"success": true,
"error": null
},
"anthropic/claude-3-5-haiku-20241022": {
"text": null,
"model": "anthropic/claude-3-5-haiku-20241022",
"provider": "anthropic",
"tokens": null,
"cost": 0.0,
"processing_time": 30.0,
"success": false,
"error": "Request timed out after 30.0 seconds"
},
...
},
"aggregate_stats": {
"total_requests": 2,
"successful_requests": 1,
"failed_requests": 1,
"total_cost": 0.000123,
"total_processing_time": 30.1, # Total wall time for the concurrent execution
"average_processing_time": 1.5 # Average time for successful requests
},
"cached_result": false # True if the entire multi_completion result was cached
}
Raises:
ToolInputError: If the `providers` list format is invalid.
ToolError: For other internal errors during setup.
Individual provider errors are captured within the "results" dictionary.
"""
start_time = time.time()
# Validate providers format
if not isinstance(providers, list) or not all(isinstance(p, dict) and 'provider' in p for p in providers):
raise ToolInputError(
"Invalid providers format. Must be a list of dictionaries, each with at least a 'provider' key.",
param_name="providers",
provided_value=providers
)
results = {}
tasks = []
semaphore = asyncio.Semaphore(max_concurrency)
total_cost = 0.0
successful_requests = 0
failed_requests = 0
successful_times = []
async def process_provider(provider_config):
nonlocal total_cost, successful_requests, failed_requests, successful_times
provider_name = provider_config.get("provider")
model_name = provider_config.get("model")
# Create a unique key for results dictionary, handling cases where model might be None initially
result_key = f"{provider_name}/{model_name or 'default'}"
async with semaphore:
provider_start_time = time.time()
error_message = None
result_data = None
actual_model_used = model_name # Store the actual model reported by the result
try:
# Extract specific params for generate_completion
completion_params = {k: v for k, v in provider_config.items() if k not in ["provider"]}
completion_params["prompt"] = prompt # Add the common prompt
logger.debug(f"Calling generate_completion for {provider_name} / {model_name or 'default'}...")
# Call generate_completion with timeout
completion_task = generate_completion(provider=provider_name, **completion_params)
result_data = await asyncio.wait_for(completion_task, timeout=timeout)
provider_processing_time = time.time() - provider_start_time
if result_data and result_data.get("success"):
cost = result_data.get("cost", 0.0)
total_cost += cost
successful_requests += 1
successful_times.append(provider_processing_time)
actual_model_used = result_data.get("model") # Get the actual model used
logger.info(f"Success from {result_key} in {provider_processing_time:.2f}s")
else:
failed_requests += 1
error_message = result_data.get("error", "Unknown error during completion") if isinstance(result_data, dict) else "Invalid result format"
logger.warning(f"Failure from {result_key}: {error_message}")
except asyncio.TimeoutError:
provider_processing_time = time.time() - provider_start_time
failed_requests += 1
error_message = f"Request timed out after {timeout:.1f} seconds"
logger.warning(f"Timeout for {result_key} after {timeout:.1f}s")
except ProviderError as pe:
provider_processing_time = time.time() - provider_start_time
failed_requests += 1
error_message = f"ProviderError: {str(pe)}"
logger.warning(f"ProviderError for {result_key}: {str(pe)}")
actual_model_used = pe.model # Get model from exception if available
except Exception as e:
provider_processing_time = time.time() - provider_start_time
failed_requests += 1
error_message = f"Unexpected error: {type(e).__name__}: {str(e)}"
logger.error(f"Unexpected error for {result_key}: {e}", exc_info=True)
# Store result or error
# Use the potentially updated result_key
results[result_key] = {
"text": result_data.get("text") if result_data else None,
"model": actual_model_used, # Use the actual model name from result or exception
"provider": provider_name,
"tokens": result_data.get("tokens") if result_data else None,
"cost": result_data.get("cost", 0.0) if result_data else 0.0,
"processing_time": provider_processing_time,
"success": error_message is None,
"error": error_message
}
# Create tasks
for config in providers:
task = asyncio.create_task(process_provider(config))
tasks.append(task)
# Wait for all tasks to complete
await asyncio.gather(*tasks)
total_processing_time = time.time() - start_time
average_processing_time = sum(successful_times) / len(successful_times) if successful_times else 0.0
logger.info(
f"Multi-completion finished. Success: {successful_requests}, Failed: {failed_requests}, Total Cost: ${total_cost:.6f}, Total Time: {total_processing_time:.2f}s",
emoji_key="info"
)
return {
"results": results,
"aggregate_stats": {
"total_requests": len(providers),
"successful_requests": successful_requests,
"failed_requests": failed_requests,
"total_cost": total_cost,
"total_processing_time": total_processing_time,
"average_processing_time": average_processing_time
}
# Note: cached_result is added by decorator
}
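# Illustrative usage sketch (assumes an async caller; the provider/model entries mirror the
# docstring example and may need adjusting to the providers configured locally):
#
#   comparison = await multi_completion(
#       prompt="Explain what an embedding is in one sentence.",
#       providers=[
#           {"provider": "openai", "model": "gpt-4.1-mini"},
#           {"provider": "anthropic", "model": "claude-3-5-haiku-20241022", "max_tokens": 50},
#       ],
#       max_concurrency=2,
#   )
#   for key, entry in comparison["results"].items():
#       print(key, entry["success"], entry.get("cost"))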
```
--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/html_to_markdown.py:
--------------------------------------------------------------------------------
```python
"""HTML to Markdown conversion tools for Ultimate MCP Server."""
import re
import time
from typing import Any, Dict, List
import html2text
import readability
import trafilatura
from bs4 import BeautifulSoup
from markdownify import markdownify as md
from ultimate_mcp_server.exceptions import ToolInputError
from ultimate_mcp_server.tools.base import with_error_handling, with_tool_metrics
from ultimate_mcp_server.utils import get_logger
logger = get_logger("ultimate_mcp_server.tools.html_to_markdown")
# --- Helper Functions ---
def _is_html_fragment(text: str) -> bool:
"""Detect if text is likely an HTML fragment.
Args:
text: Input text to check
Returns:
bool: True if the text appears to be HTML, False otherwise
"""
# Simple heuristics to check if the text contains HTML
html_patterns = [
r"<\s*[a-zA-Z]+[^>]*>", # Basic HTML tag pattern
r"<\s*/\s*[a-zA-Z]+\s*>", # Closing HTML tag
r"&[a-zA-Z]+;", # HTML entities
r"<!\s*DOCTYPE", # DOCTYPE declaration
r"<!\s*--", # HTML comment
r"style\s*=\s*['\"]", # style attribute
r"class\s*=\s*['\"]", # class attribute
r"id\s*=\s*['\"]", # id attribute
r"href\s*=\s*['\"]", # href attribute
r"src\s*=\s*['\"]", # src attribute
]
# Check if the text matches any of the patterns
for pattern in html_patterns:
if re.search(pattern, text, re.IGNORECASE):
return True
return False
def _clean_html_with_beautifulsoup(html: str) -> str:
"""Clean HTML using BeautifulSoup.
Args:
html: HTML content to clean
Returns:
Cleaned HTML string with unwanted elements removed
"""
try:
soup = BeautifulSoup(html, 'html.parser')
# Remove unwanted elements
for element in soup(['script', 'style', 'svg', 'iframe', 'canvas', 'noscript']):
element.decompose()
# Remove base64 data attributes and other potentially problematic attributes
for tag in soup.find_all(True):
for attr in list(tag.attrs):
# Clean data URLs
if attr == 'src' and isinstance(tag.attrs[attr], str) and 'data:' in tag.attrs[attr]:
del tag.attrs[attr]
# Remove other problematic attributes
elif attr.startswith('on') or attr == 'style' or attr.startswith('data-'):
del tag.attrs[attr]
return str(soup)
except Exception as e:
logger.warning(f"Error cleaning HTML with BeautifulSoup: {str(e)}")
# If BeautifulSoup fails, return the original HTML
return html
def _html_to_markdown_with_html2text(html: str) -> str:
"""Convert HTML to Markdown using html2text.
Args:
html: HTML content to convert
Returns:
Markdown formatted text
"""
try:
h = html2text.HTML2Text()
h.ignore_links = False
h.ignore_images = False
h.ignore_tables = False
h.unicode_snob = True # Use Unicode instead of ASCII
h.body_width = 0 # No wrapping
return h.handle(html)
except Exception as e:
logger.warning(f"Error converting HTML to Markdown with html2text: {str(e)}")
# If html2text fails, try a simpler approach
return html
def _html_to_markdown_with_markdownify(html: str) -> str:
"""Convert HTML to Markdown using markdownify.
Args:
html: HTML content to convert
Returns:
Markdown formatted text
"""
try:
return md(html, heading_style="ATX")
except Exception as e:
logger.warning(f"Error converting HTML to Markdown with markdownify: {str(e)}")
# If markdownify fails, try a simpler approach
return html
def _extract_content_with_readability(html: str) -> str:
"""Extract main content from HTML using readability.
Args:
html: HTML content to process
Returns:
HTML string containing only the main content
"""
try:
doc = readability.Document(html)
content = doc.summary()
return content
except Exception as e:
logger.warning(f"Error extracting content with readability: {str(e)}")
# If readability fails, return the original HTML
return html
def _extract_content_with_trafilatura(html: str) -> str:
"""Extract main content from HTML using trafilatura.
Args:
html: HTML content to process
Returns:
Extracted text content
"""
try:
extracted_text = trafilatura.extract(html, include_comments=False, include_tables=True)
if extracted_text:
return extracted_text
# Fall back to HTML extraction if text extraction fails
extracted_html = trafilatura.extract(html, output_format='html', include_comments=False, include_tables=True)
return extracted_html or html
except Exception as e:
logger.warning(f"Error extracting content with trafilatura: {str(e)}")
# If trafilatura fails, return the original HTML
return html
def _sanitize_markdown(markdown: str) -> str:
"""Clean up and format the markdown to be more readable.
Args:
markdown: Markdown text to sanitize
Returns:
Cleaned markdown text
"""
# Fix excessive newlines (more than 2 consecutive)
sanitized = re.sub(r'\n{3,}', '\n\n', markdown)
# Fix list item spacing
sanitized = re.sub(r'(\n[*-].*\n)(?!\n)', r'\1\n', sanitized)
# Remove trailing whitespace from lines
sanitized = re.sub(r' +$', '', sanitized, flags=re.MULTILINE)
# Fix markdown heading formatting (ensure space after #)
sanitized = re.sub(r'(^|\n)(#{1,6})([^#\s])', r'\1\2 \3', sanitized)
# Fix code block formatting
sanitized = re.sub(r'```\s*\n', '```\n', sanitized)
sanitized = re.sub(r'\n\s*```', '\n```', sanitized)
# Ensure proper code block syntax (start with language or leave empty)
sanitized = re.sub(r'```([^a-zA-Z\s\n][^`\n]*)$', '```\n\\1', sanitized, flags=re.MULTILINE)
# Normalize list indicators (consistent use of - or * for unordered lists)
sanitized = re.sub(r'^[*+] ', '- ', sanitized, flags=re.MULTILINE)
return sanitized
def _improve_markdown_formatting(markdown: str) -> str:
"""Improve the formatting of the markdown to make it more readable.
Args:
markdown: Markdown text to improve
Returns:
Improved markdown text
"""
# Ensure proper spacing for headings
improved = re.sub(r'(\n#{1,6}[^\n]+)(\n[^\n#])', r'\1\n\2', markdown)
# Ensure paragraphs have proper spacing
improved = re.sub(r'(\n[^\s#>*-][^\n]+)(\n[^\s#>*-])', r'\1\n\2', improved)
# Fix blockquote formatting
improved = re.sub(r'(\n>[ ][^\n]+)(\n[^>\s])', r'\1\n\2', improved)
# Fix nested list formatting
improved = re.sub(r'(\n[ ]{2,}[*-][ ][^\n]+)(\n[^\s*-])', r'\1\n\2', improved)
# Add horizontal rules for clear section breaks (if large content gaps exist)
improved = re.sub(r'\n\n\n\n+', '\n\n---\n\n', improved)
return improved
def _convert_html_tables_to_markdown(html: str) -> str:
"""Specifically handle HTML tables and convert them to markdown tables.
Args:
html: HTML content with tables to convert
Returns:
Markdown text with properly formatted tables
"""
try:
soup = BeautifulSoup(html, 'html.parser')
tables = soup.find_all('table')
# If no tables, return original HTML
if not tables:
return html
for table in tables:
rows = table.find_all('tr')
if not rows:
continue
markdown_table = []
# Process header row
header_cells = rows[0].find_all(['th', 'td'])
if header_cells:
header_row = '| ' + ' | '.join([cell.get_text().strip() for cell in header_cells]) + ' |'
markdown_table.append(header_row)
# Add separator row
separator_row = '| ' + ' | '.join(['---' for _ in header_cells]) + ' |'
markdown_table.append(separator_row)
# Process data rows
for row in rows[1:]:
cells = row.find_all('td')
if cells:
data_row = '| ' + ' | '.join([cell.get_text().strip() for cell in cells]) + ' |'
markdown_table.append(data_row)
# Replace the table with its markdown equivalent
table_html = str(table)
table_markdown = '\n'.join(markdown_table)
html = html.replace(table_html, table_markdown)
return html
except Exception as e:
logger.warning(f"Error converting HTML tables to Markdown: {str(e)}")
# If conversion fails, return the original HTML
return html
# --- Main Tool Function ---
@with_tool_metrics
@with_error_handling
async def clean_and_format_text_as_markdown(
text: str,
force_markdown_conversion: bool = False,
extraction_method: str = "auto",
preserve_tables: bool = True,
preserve_links: bool = True,
preserve_images: bool = False,
max_line_length: int = 0 # 0 means no wrapping
) -> Dict[str, Any]:
"""Converts plain text or HTML to clean, well-formatted markdown.
Automatically detects if input is HTML, then cleans and converts it.
For non-HTML text, it applies minimal formatting to create valid markdown.
Args:
text: The input text to clean and format (plain text or HTML).
force_markdown_conversion: Whether to force markdown conversion even if the text doesn't
look like HTML. Default is False.
extraction_method: Method to extract content from HTML. Options:
- "auto": Automatically choose the best method
- "readability": Use Mozilla's Readability algorithm
- "trafilatura": Use trafilatura library
- "raw": Don't extract main content, convert the whole document
Default is "auto".
preserve_tables: Whether to preserve and convert HTML tables to markdown tables.
Default is True.
preserve_links: Whether to preserve and convert HTML links to markdown links.
Default is True.
preserve_images: Whether to preserve and convert HTML images to markdown image syntax.
Default is False.
max_line_length: Maximum line length for text wrapping. 0 means no wrapping.
Default is 0.
Returns:
Dictionary containing:
{
"markdown_text": "Cleaned and formatted markdown text",
"was_html": true, # Whether the input was detected as HTML
"extraction_method_used": "readability", # Which extraction method was used
"processing_time": 0.35, # Time taken in seconds
"success": true
}
Raises:
ToolInputError: If the input text is empty or not a string.
"""
start_time = time.time()
# Input validation
if not text:
raise ToolInputError("Input text cannot be empty")
if not isinstance(text, str):
raise ToolInputError("Input text must be a string")
# Determine if input is HTML
is_html = _is_html_fragment(text) or force_markdown_conversion
# Process based on content type
if is_html:
logger.info("Input detected as HTML, processing for conversion to markdown")
# Convert HTML tables to markdown before main processing
if preserve_tables:
text = _convert_html_tables_to_markdown(text)
# Extract main content based on specified method
extraction_method_used = extraction_method
if extraction_method == "auto":
# If the text is a small fragment, use raw conversion
if len(text) < 1000:
extraction_method_used = "raw"
else:
# Try trafilatura first, fallback to readability
try:
extracted = _extract_content_with_trafilatura(text)
if extracted and len(extracted) > 0.2 * len(text): # Ensure we got meaningful extraction
text = extracted
extraction_method_used = "trafilatura"
else:
text = _extract_content_with_readability(text)
extraction_method_used = "readability"
except Exception:
text = _extract_content_with_readability(text)
extraction_method_used = "readability"
elif extraction_method == "readability":
text = _extract_content_with_readability(text)
elif extraction_method == "trafilatura":
text = _extract_content_with_trafilatura(text)
# For "raw", we use the text as is
# Clean HTML before conversion
text = _clean_html_with_beautifulsoup(text)
# Set up conversion options based on parameters
h = html2text.HTML2Text()
h.ignore_links = not preserve_links
h.ignore_images = not preserve_images
h.ignore_tables = not preserve_tables
h.body_width = max_line_length
h.unicode_snob = True
# Try multiple conversion methods and use the best result
try:
markdown_text = h.handle(text)
# Fallback to markdownify if html2text result looks problematic
if ('<' in markdown_text or '>' in markdown_text) or (len(markdown_text.strip()) < 100 and len(text) > 500):
try:
alternative = _html_to_markdown_with_markdownify(text)
if len(alternative.strip()) > len(markdown_text.strip()):
markdown_text = alternative
except Exception:
pass
except Exception as e:
logger.warning(f"Primary markdown conversion failed: {str(e)}")
try:
markdown_text = _html_to_markdown_with_markdownify(text)
except Exception:
# Last resort: strip tags and return plain text
markdown_text = re.sub(r'<[^>]*>', '', text)
else:
logger.info("Input detected as plain text, applying minimal markdown formatting")
# For plain text, just clean it up a bit
markdown_text = text
extraction_method_used = "none"
# Final cleanup and formatting of the markdown
markdown_text = _sanitize_markdown(markdown_text)
markdown_text = _improve_markdown_formatting(markdown_text)
processing_time = time.time() - start_time
logger.info(f"Text cleaned and formatted as markdown in {processing_time:.2f}s")
return {
"markdown_text": markdown_text,
"was_html": is_html,
"extraction_method_used": extraction_method_used,
"processing_time": processing_time,
"success": True
}
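# Illustrative usage sketch (assumes an async caller): fragments under 1000 characters
# take the "raw" path, so no content-extraction step is applied before conversion.
#
#   result = await clean_and_format_text_as_markdown(
#       text="<h1>Title</h1><p>Some <b>bold</b> text.</p>",
#       preserve_links=True,
#   )
#   # result["was_html"] is True and result["markdown_text"] should contain a "# Title" heading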
# --- Additional Tool Functions ---
@with_tool_metrics
@with_error_handling
async def detect_content_type(text: str) -> Dict[str, Any]:
"""Analyzes text to detect its type: HTML, markdown, code, or plain text.
Applies multiple heuristics to determine the most likely content type
of the provided text string.
Args:
text: The input text to analyze
Returns:
Dictionary containing:
{
"content_type": "html", # One of: "html", "markdown", "code", "plain_text"
"confidence": 0.85, # Confidence score (0.0-1.0)
"details": {
"html_markers": 12, # Count of HTML markers found
"markdown_markers": 3, # Count of markdown markers found
"code_markers": 1, # Count of code markers found
"detected_language": "javascript" # If code is detected
},
"success": true
}
Raises:
ToolInputError: If the input text is empty or not a string.
"""
if not text:
raise ToolInputError("Input text cannot be empty")
if not isinstance(text, str):
raise ToolInputError("Input text must be a string")
# Initialize counters for markers
html_markers = 0
markdown_markers = 0
code_markers = 0
detected_language = None
# Check for HTML markers
html_patterns = [
(r"<\s*[a-zA-Z]+[^>]*>", 1), # HTML tag
(r"<\s*/\s*[a-zA-Z]+\s*>", 1), # Closing HTML tag
(r"&[a-zA-Z]+;", 0.5), # HTML entity
(r"<!\s*DOCTYPE", 2), # DOCTYPE
(r"<!\s*--", 1), # HTML comment
(r"<!--.*?-->", 1), # Complete HTML comment
(r"<(div|span|p|a|img|table|ul|ol|li|h[1-6])\b", 1.5), # Common HTML tags
(r"</(div|span|p|a|img|table|ul|ol|li|h[1-6])>", 1.5), # Common closing tags
(r"<(html|head|body|meta|link|script|style)\b", 2), # Structure tags
(r"</(html|head|body|script|style)>", 2), # Structure closing tags
(r"style\s*=\s*['\"]", 1), # style attribute
(r"class\s*=\s*['\"]", 1), # class attribute
(r"id\s*=\s*['\"]", 1), # id attribute
(r"href\s*=\s*['\"]", 1), # href attribute
(r"src\s*=\s*['\"]", 1) # src attribute
]
for pattern, weight in html_patterns:
matches = re.findall(pattern, text, re.IGNORECASE)
html_markers += len(matches) * weight
# Check for Markdown markers
markdown_patterns = [
(r"^#\s+.+$", 2), # Heading level 1
(r"^#{2,6}\s+.+$", 1.5), # Headings levels 2-6
(r"^\s*[*-]\s+.+$", 1), # Unordered list
(r"^\s*\d+\.\s+.+$", 1), # Ordered list
(r"^\s*>\s+.+$", 1.5), # Blockquote
(r"\[.+?\]\(.+?\)", 2), # Link
(r"!\[.+?\]\(.+?\)", 2), # Image
(r"`[^`\n]+`", 1), # Inline code
(r"^```\s*\w*$", 2), # Code block start
(r"^```$", 2), # Code block end
(r"\*\*.+?\*\*", 1), # Bold
(r"\*.+?\*", 0.5), # Italic
(r"__(.+?)__", 1), # Bold with underscore
(r"_(.+?)_", 0.5), # Italic with underscore
(r"~~.+?~~", 1), # Strikethrough
(r"^\s*[-*_]{3,}\s*$", 1.5), # Horizontal rule
(r"^\s*\|(.+\|)+\s*$", 2), # Table row
(r"^\s*\|([-:]+\|)+\s*$", 3) # Table header/divider
]
for pattern, weight in markdown_patterns:
matches = re.findall(pattern, text, re.MULTILINE)
markdown_markers += len(matches) * weight
# Check for code markers
code_patterns = [
(r"function\s+\w+\s*\(.*?\)\s*\{", 2), # Function declaration
(r"(var|let|const)\s+\w+\s*=", 1.5), # Variable declaration JS
(r"if\s*\(.*?\)\s*\{", 1), # If statement
(r"for\s*\(.*?;.*?;.*?\)\s*\{", 2), # For loop
(r"while\s*\(.*?\)\s*\{", 2), # While loop
(r"class\s+\w+(\s+extends\s+\w+)?\s*\{", 2), # Class declaration
(r"import\s+.*?from\s+['\"].*?['\"]", 2), # ES6 Import
(r"def\s+\w+\s*\(.*?\):", 2), # Python function
(r"class\s+\w+(\(\w+\))?:", 2), # Python class
(r"import\s+\w+(\s+as\s+\w+)?", 1.5), # Python import
(r"from\s+\w+(\.\w+)*\s+import", 1.5), # Python from import
(r"public\s+(static\s+)?(void|int|String)\s+\w+\s*\(", 2), # Java method
(r"#include\s*<.*?>", 2), # C/C++ include
(r"^\s*package\s+[\w\.]+;", 2), # Java/Kotlin package
(r"^\s*using\s+[\w\.]+;", 2), # C# using
(r"^\s*(public|private|protected)\s+class", 2) # Access modifier
]
for pattern, weight in code_patterns:
matches = re.findall(pattern, text, re.MULTILINE)
code_markers += len(matches) * weight
# Detect programming language if it looks like code
if code_markers > 5:
# Very basic language detection based on unique syntax
language_patterns = [
(r"function\s+\w+|var\s+\w+|let\s+\w+|const\s+\w+|document\.|\$\(", "javascript"),
(r"<\?php|\$[a-zA-Z_]", "php"),
(r"def\s+\w+\s*\(.*?\):|import\s+\w+|from\s+\w+\s+import", "python"),
(r"public\s+class\s+\w+|public\s+static\s+void\s+main", "java"),
(r"#include\s*<.*?>|int\s+main\s*\(", "c/c++"),
(r"^\s*using\s+System;|namespace\s+\w+|public\s+class\s+\w+\s*:", "c#"),
(r"module\s+\w+|fn\s+\w+|let\s+\w+|impl", "rust"),
(r"^\s*import\s+\w+\s+from\s+['\"]|export\s+(default\s+)?", "typescript"),
(r"^package\s+main|func\s+\w+\(|import\s+\([^)]*\)", "go")
]
for pattern, lang in language_patterns:
if re.search(pattern, text, re.MULTILINE | re.IGNORECASE):
detected_language = lang
break
# Calculate final scores and confidence
html_score = html_markers / max(len(text) / 100, 1)
markdown_score = markdown_markers / max(len(text.split('\n')), 1)
code_score = code_markers / max(len(text.split('\n')), 1)
# Plain text has no specific markers, so it's the default fallback
plain_text_score = 1.0 - max(min(html_score / 10, 1), min(markdown_score / 5, 1), min(code_score / 5, 1))
# Determine the content type
scores = {
"html": html_score,
"markdown": markdown_score,
"code": code_score,
"plain_text": plain_text_score
}
content_type = max(scores, key=scores.get)
max_score = scores[content_type]
# Calculate confidence based on how dominant the max score is
total_score = sum(scores.values())
if total_score > 0:
confidence = max_score / total_score
else:
confidence = 0.25 # Equal probability for all types
# Adjust confidence if very few markers were found
if content_type != "plain_text" and (html_markers + markdown_markers + code_markers) < 3:
confidence *= 0.7
return {
"content_type": content_type,
"confidence": min(confidence, 1.0),
"details": {
"html_markers": html_markers,
"markdown_markers": markdown_markers,
"code_markers": code_markers,
"detected_language": detected_language if content_type == "code" else None
},
"success": True
}
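# Illustrative usage sketch (assumes an async caller): the result is a heuristic score,
# not a trained classifier, so treat the confidence value accordingly.
#
#   verdict = await detect_content_type("# Notes\n\n- item one\n- item two")
#   # verdict["content_type"] is most likely "markdown"; verdict["details"] holds the raw marker counts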
@with_tool_metrics
@with_error_handling
async def batch_format_texts(
texts: List[str],
force_markdown_conversion: bool = False,
extraction_method: str = "auto",
max_concurrency: int = 5,
preserve_tables: bool = True
) -> Dict[str, Any]:
"""Processes multiple text inputs in parallel, converting each to markdown.
Efficiently handles a batch of text inputs by processing them concurrently
up to a specified concurrency limit.
Args:
texts: List of text strings to clean and format.
force_markdown_conversion: Whether to force markdown conversion for all inputs.
Default is False.
extraction_method: Method to extract content from HTML. Options:
- "auto": Automatically choose the best method
- "readability": Use Mozilla's Readability algorithm
- "trafilatura": Use trafilatura library
- "raw": Don't extract main content, convert the whole document
Default is "auto".
max_concurrency: Maximum number of texts to process simultaneously.
Default is 5.
preserve_tables: Whether to preserve and convert HTML tables to markdown tables.
Default is True.
Returns:
Dictionary containing:
{
"results": [
{
"markdown_text": "Cleaned and formatted markdown text",
"was_html": true,
"extraction_method_used": "readability"
},
...
],
"total_processing_time": 2.45, # Total time in seconds
"success_count": 5, # Number of successfully processed texts
"failure_count": 0, # Number of failed texts
"success": true
}
Raises:
ToolInputError: If the input list is empty or not a list of strings.
"""
import asyncio
start_time = time.time()
# Input validation
if not texts:
raise ToolInputError("Input texts list cannot be empty")
if not isinstance(texts, list):
raise ToolInputError("Input must be a list of text strings")
# Set up concurrency control
semaphore = asyncio.Semaphore(max_concurrency)
async def process_text(text, index):
"""Process a single text with semaphore control."""
async with semaphore:
try:
result = await clean_and_format_text_as_markdown(
text=text,
force_markdown_conversion=force_markdown_conversion,
extraction_method=extraction_method,
preserve_tables=preserve_tables
)
result["index"] = index # Add original index for ordering
return result
except Exception as e:
logger.error(f"Error processing text at index {index}: {str(e)}")
return {
"index": index,
"error": str(e),
"success": False
}
# Process all texts concurrently
tasks = [process_text(text, i) for i, text in enumerate(texts)]
results = await asyncio.gather(*tasks)
# Sort results by original index
sorted_results = sorted(results, key=lambda x: x.get("index", 0))
# Remove index from results
for result in sorted_results:
if "index" in result:
del result["index"]
# Calculate statistics
success_count = sum(1 for result in sorted_results if result.get("success", False))
failure_count = len(sorted_results) - success_count
total_time = time.time() - start_time
return {
"results": sorted_results,
"total_processing_time": total_time,
"success_count": success_count,
"failure_count": failure_count,
"success": True
}
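# Illustrative usage sketch (assumes an async caller): inputs are processed concurrently
# up to max_concurrency and results come back in the original order.
#
#   batch = await batch_format_texts(
#       texts=["<p>First document</p>", "Plain text note"],
#       max_concurrency=2,
#   )
#   print(batch["success_count"], batch["failure_count"])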
@with_tool_metrics
@with_error_handling
async def optimize_markdown_formatting(
markdown: str,
normalize_headings: bool = False,
fix_lists: bool = True,
fix_links: bool = True,
add_line_breaks: bool = True,
compact_mode: bool = False,
max_line_length: int = 0
) -> Dict[str, Any]:
"""Optimizes and improves the formatting of existing markdown text.
Takes markdown text and enhances its formatting by fixing common issues
and applying stylistic improvements.
Args:
markdown: The markdown text to optimize.
normalize_headings: If True, shifts heading levels so the smallest level used becomes h1.
Default is False.
fix_lists: If True, fixes common issues with list formatting.
Default is True.
fix_links: If True, fixes common issues with link formatting.
Default is True.
add_line_breaks: If True, ensures proper paragraph breaks.
Default is True.
compact_mode: If True, reduces whitespace for a more compact presentation.
Default is False.
max_line_length: Maximum line length for wrapping. 0 means no wrapping.
Default is 0.
Returns:
Dictionary containing:
{
"optimized_markdown": "Cleaned and formatted markdown text",
"changes_made": {
"headings_normalized": true,
"lists_fixed": true,
"links_fixed": true,
"line_breaks_added": true
},
"processing_time": 0.15, # Time taken in seconds
"success": true
}
Raises:
ToolInputError: If the input markdown is empty or not a string.
"""
import re
start_time = time.time()
# Input validation
if not markdown:
raise ToolInputError("Input markdown cannot be empty")
if not isinstance(markdown, str):
raise ToolInputError("Input markdown must be a string")
# Track changes made
changes_made = {
"headings_normalized": False,
"lists_fixed": False,
"links_fixed": False,
"line_breaks_added": False,
"whitespace_adjusted": False
}
optimized = markdown
# Fix markdown heading formatting (ensure space after #)
if "#" in optimized:
original = optimized
optimized = re.sub(r'(^|\n)(#{1,6})([^#\s])', r'\1\2 \3', optimized)
changes_made["headings_normalized"] = original != optimized
# Normalize heading levels if requested
if normalize_headings and "#" in optimized:
original = optimized
# Find all headings and their levels
heading_pattern = r'(^|\n)(#{1,6})\s+(.*?)(\n|$)'
headings = [(m.group(2), m.group(3), m.start(), m.end())
for m in re.finditer(heading_pattern, optimized)]
if headings:
# Find the minimum heading level used
min_level = min(len(h[0]) for h in headings)
# Adjust heading levels if the minimum isn't h1
if min_level > 1:
# Process headings in reverse order to avoid messing up positions
for level, text, start, end in reversed(headings):
new_level = '#' * (len(level) - min_level + 1)
replacement = f"{optimized[start:start+1]}{new_level} {text}{optimized[end-1:end]}"
optimized = optimized[:start] + replacement + optimized[end:]
changes_made["headings_normalized"] = True
# Fix list formatting
if fix_lists and any(c in optimized for c in ['-', '*', '+']):
original = optimized
# Ensure consistent list markers
optimized = re.sub(r'^([*+]) ', r'- ', optimized, flags=re.MULTILINE)
# Fix list item spacing
optimized = re.sub(r'(\n- .+)(\n[^-\s])', r'\1\n\2', optimized)
# Fix indentation in nested lists
optimized = re.sub(r'(\n- .+\n)(\s{1,3}- )', r'\1 \2', optimized)
changes_made["lists_fixed"] = original != optimized
# Fix link formatting
if fix_links and "[" in optimized:
original = optimized
# Fix reference-style links (ensure consistent spacing)
optimized = re.sub(r'\]\[', r'] [', optimized)
# Fix malformed links with space between []()
optimized = re.sub(r'\] \(', r'](', optimized)
# Ensure proper spacing around links in sentences
optimized = re.sub(r'([^\s])\[', r'\1 [', optimized)
optimized = re.sub(r'\]([^\(\s])', r'] \1', optimized)
changes_made["links_fixed"] = original != optimized
# Add proper line breaks for readability
if add_line_breaks:
original = optimized
# Ensure headings have a blank line before (except at start of document)
optimized = re.sub(r'(?<!\n\n)(\n)#', r'\1\n#', optimized)
# Ensure paragraphs have blank lines between them
optimized = re.sub(r'(\n[^\s#>*-][^\n]+)(\n[^\s#>*-])', r'\1\n\2', optimized)
# Clean up any excessive blank lines created
optimized = re.sub(r'\n{3,}', r'\n\n', optimized)
changes_made["line_breaks_added"] = original != optimized
# Adjust whitespace based on compact_mode
original = optimized
if compact_mode:
# Reduce blank lines to single blank lines
optimized = re.sub(r'\n\s*\n', r'\n\n', optimized)
# Remove trailing whitespace
optimized = re.sub(r' +$', '', optimized, flags=re.MULTILINE)
else:
# Ensure consistent double line breaks for section transitions
optimized = re.sub(r'(\n#{1,6}[^\n]+\n)(?!\n)', r'\1\n', optimized)
changes_made["whitespace_adjusted"] = original != optimized
# Apply line wrapping if specified
if max_line_length > 0:
import textwrap
# Split into paragraphs, wrap each, then rejoin
paragraphs = re.split(r'\n\s*\n', optimized)
wrapped_paragraphs = []
for p in paragraphs:
# Skip wrapping for code blocks, lists, and headings
if (p.strip().startswith("```") or
re.match(r'^\s*[*\-+]', p, re.MULTILINE) or
re.match(r'^#{1,6}\s', p.strip())):
wrapped_paragraphs.append(p)
else:
# Wrap regular paragraphs
lines = p.split('\n')
wrapped_lines = []
for line in lines:
if not line.strip().startswith(('>', '#', '-', '*', '+')):
wrapped = textwrap.fill(line, width=max_line_length)
wrapped_lines.append(wrapped)
else:
wrapped_lines.append(line)
wrapped_paragraphs.append('\n'.join(wrapped_lines))
optimized = '\n\n'.join(wrapped_paragraphs)
processing_time = time.time() - start_time
return {
"optimized_markdown": optimized,
"changes_made": changes_made,
"processing_time": processing_time,
"success": True
}
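# Illustrative usage sketch (assumes an async caller):
#
#   tidy = await optimize_markdown_formatting(
#       markdown="#Heading\n* item one\n* item two",
#       fix_lists=True,
#   )
#   # tidy["changes_made"] reports which fix-ups actually modified the text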
```
--------------------------------------------------------------------------------
/examples/advanced_extraction_demo.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
"""Demo of advanced extraction capabilities using Ultimate MCP Server."""
import asyncio
import json
import os
import re
import sys
import time
from pathlib import Path
# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))
from rich.panel import Panel
from rich.rule import Rule
from rich.syntax import Syntax
from rich.traceback import Traceback
from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.providers.base import get_provider
from ultimate_mcp_server.utils import get_logger
from ultimate_mcp_server.utils.display import CostTracker, parse_and_display_result
from ultimate_mcp_server.utils.logging.console import console
from ultimate_mcp_server.utils.parsing import extract_json_from_markdown
# --- Debug Flag ---
USE_DEBUG_LOGS = True # Set to True to enable detailed logging
# ------------------
# Initialize logger
logger = get_logger("example.advanced_extraction")
logger.set_level("debug")
# Configure the OpenAI client for direct extraction demos
async def setup_openai_provider():
"""Set up an OpenAI provider for demonstration."""
try:
logger.info("Initializing OpenAI for demonstration", emoji_key="start")
# Get OpenAI provider - get_provider will return None if key missing/invalid in config
provider = await get_provider(Provider.OPENAI.value)
if not provider:
logger.error("Failed to get OpenAI provider. Is the OPENAI_API_KEY configured correctly in your environment/config?")
return None
logger.success("OpenAI provider initialized successfully.")
return provider
except Exception as e:
logger.error(f"Failed to initialize OpenAI provider: {e}", emoji_key="error")
return None
async def run_json_extraction_example(provider, tracker: CostTracker):
"""Demonstrate JSON extraction."""
if USE_DEBUG_LOGS:
logger.debug("Entering run_json_extraction_example.")
if not provider:
console.print("[yellow]Skipping JSON extraction demo - no provider available.[/yellow]")
if USE_DEBUG_LOGS:
logger.debug("Exiting run_json_extraction_example (no provider).")
return
console.print(Rule("[bold blue]1. JSON Extraction Example[/bold blue]"))
# Load sample text
sample_path = Path(__file__).parent / "data" / "sample_event.txt"
if not sample_path.exists():
# Create a sample text for demonstration
sample_text = """
Tech Conference 2024
Location: San Francisco Convention Center, 123 Tech Blvd, San Francisco, CA 94103
Date: June 15-17, 2024
Time: 9:00 AM - 6:00 PM daily
Registration Fee: $599 (Early Bird: $499 until March 31)
Keynote Speakers:
- Dr. Sarah Johnson, AI Research Director at TechCorp
- Mark Williams, CTO of FutureTech Industries
- Prof. Emily Chen, MIT Computer Science Department
Special Events:
- Networking Reception: June 15, 7:00 PM - 10:00 PM
- Hackathon: June 16, 9:00 PM - 9:00 AM (overnight)
- Career Fair: June 17, 1:00 PM - 5:00 PM
For more information, contact [email protected] or call (555) 123-4567.
"""
# Ensure the data directory exists
os.makedirs(os.path.dirname(sample_path), exist_ok=True)
# Write sample text to file
with open(sample_path, "w") as f:
f.write(sample_text)
else:
# Read existing sample text
with open(sample_path, "r") as f:
sample_text = f.read()
# Display sample text
console.print(Panel(sample_text, title="Sample Event Text", border_style="blue"))
# Define JSON schema for event
event_schema = {
"type": "object",
"properties": {
"name": {"type": "string", "description": "Event name"},
"location": {
"type": "object",
"properties": {
"venue": {"type": "string"},
"address": {"type": "string"},
"city": {"type": "string"},
"state": {"type": "string"},
"zip": {"type": "string"}
}
},
"dates": {
"type": "object",
"properties": {
"start": {"type": "string", "format": "date"},
"end": {"type": "string", "format": "date"}
}
},
"time": {"type": "string"},
"registration": {
"type": "object",
"properties": {
"regular_fee": {"type": "number"},
"early_bird_fee": {"type": "number"},
"early_bird_deadline": {"type": "string", "format": "date"}
}
},
"speakers": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {"type": "string"},
"title": {"type": "string"},
"organization": {"type": "string"}
}
}
},
"special_events": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {"type": "string"},
"date": {"type": "string", "format": "date"},
"time": {"type": "string"}
}
}
},
"contact": {
"type": "object",
"properties": {
"email": {"type": "string", "format": "email"},
"phone": {"type": "string"}
}
}
}
}
# Display JSON schema
schema_json = json.dumps(event_schema, indent=2)
console.print(Panel(
Syntax(schema_json, "json", theme="monokai", line_numbers=True),
title="Event JSON Schema",
border_style="green"
))
# Extract JSON using direct provider call
logger.info("Extracting structured JSON data from text...", emoji_key="processing")
try:
start_time = time.time()
# Instead of using the tool, use direct completion for demo purposes
prompt = f"""
Extract structured information from the following text into a JSON object.
Follow the provided JSON schema exactly.
TEXT:
{sample_text}
JSON SCHEMA:
{json.dumps(event_schema, indent=2)}
Provide only the valid JSON object as output, with no additional commentary.
"""
if USE_DEBUG_LOGS:
logger.debug(f"JSON Extraction Prompt:\n{prompt}")
# Call the provider directly
result = await provider.generate_completion(
prompt=prompt,
model="gpt-4.1-mini", # Use an available OpenAI model
temperature=0.2, # Lower temperature for more deterministic output
max_tokens=1500 # Enough tokens for a full response
)
# Track cost
tracker.add_call(result)
if USE_DEBUG_LOGS:
logger.debug(f"Raw JSON Extraction Result Text:\n{result.text}")
# Process the result to extract just the JSON
try:
# Try to parse the response as JSON
raw_text = result.text.strip()
text_to_parse = extract_json_from_markdown(raw_text)
if USE_DEBUG_LOGS:
logger.debug(f"Raw text received: {raw_text[:500]}...")
logger.debug(f"Attempting to parse JSON after cleaning: {text_to_parse[:500]}...")
json_result = json.loads(text_to_parse)
if USE_DEBUG_LOGS:
logger.debug(f"Successfully parsed JSON: {json.dumps(json_result, indent=2)}")
# Create a dictionary with structured data and metadata for display
structured_result_data = {
"json": json_result, # The actual parsed JSON
"validated": True, # Assuming validation happens elsewhere or is implied
"model": result.model,
"processing_time": time.time() - start_time,
"tokens": {
"input": result.input_tokens,
"output": result.output_tokens,
"total": result.input_tokens + result.output_tokens
},
"cost": result.cost
}
# Display the results using the utility function
parse_and_display_result(
title="JSON Extraction Results",
input_data={"text": sample_text, "schema": event_schema},
result=structured_result_data, # Pass the structured data
console=console
)
except json.JSONDecodeError as e:
# Log the error regardless of debug flag
logger.error(f"JSONDecodeError occurred: {e}", exc_info=False)
if USE_DEBUG_LOGS:
# Log the string that caused the error (before cleaning)
logger.debug(f"Raw string causing JSONDecodeError:\n{raw_text}")
# Log the string that failed parsing (after cleaning)
logger.debug(f"Cleaned string that failed JSON parsing:\n{text_to_parse}")
# Print a rich traceback to the console
console.print("[bold red]-- Traceback for JSONDecodeError --[/bold red]")
console.print(Traceback())
console.print("[bold red]-- End Traceback --[/bold red]")
# If JSON parsing fails, show the raw response
console.print(Panel(
raw_text, # Show the original raw text from the model
title="[yellow]Raw Model Output (JSON parsing failed)[/yellow]",
border_style="red"
))
except Exception as e:
logger.error(f"Error extracting JSON: {str(e)}", emoji_key="error", exc_info=True)
console.print()
if USE_DEBUG_LOGS:
logger.debug("Exiting run_json_extraction_example.")
async def table_extraction_demo(provider, tracker: CostTracker):
"""Demonstrate table extraction capabilities."""
if USE_DEBUG_LOGS:
logger.debug("Entering table_extraction_demo.")
if not provider:
console.print("[yellow]Skipping table extraction demo - no provider available.[/yellow]")
if USE_DEBUG_LOGS:
logger.debug("Exiting table_extraction_demo (no provider).")
return
logger.info("Starting table extraction demo", emoji_key="start")
# Sample text with embedded table
text = """
Financial Performance by Quarter (2023-2024)
| Quarter | Revenue ($M) | Expenses ($M) | Profit ($M) | Growth (%) |
|---------|-------------|---------------|-------------|------------|
| Q1 2023 | 42.5 | 32.1 | 10.4 | 3.2 |
| Q2 2023 | 45.7 | 33.8 | 11.9 | 6.5 |
| Q3 2023 | 50.2 | 35.6 | 14.6 | 9.8 |
| Q4 2023 | 58.3 | 38.2 | 20.1 | 15.2 |
| Q1 2024 | 60.1 | 39.5 | 20.6 | 3.1 |
| Q2 2024 | 65.4 | 41.2 | 24.2 | 8.8 |
Note: All figures are in millions of dollars and are unaudited.
Growth percentages are relative to the previous quarter.
"""
# Log extraction attempt
logger.info("Performing table extraction", emoji_key="processing")
try:
start_time = time.time()
# Prompt for table extraction
prompt = f"""
Extract the table from the following text and format it as both JSON and Markdown.
TEXT:
{text}
For the JSON format, use this structure:
{{
"headers": ["Header1", "Header2", ...],
"rows": [
{{"Header1": "value", "Header2": "value", ...}},
...
]
}}
For the Markdown format, output a well-formatted Markdown table.
Also extract any metadata about the table (title, notes, etc.).
Format your response as JSON with the following structure:
{{
"json_table": {{...}},
"markdown_table": "...",
"metadata": {{
"title": "...",
"notes": [
"..."
]
}}
}}
"""
if USE_DEBUG_LOGS:
logger.debug(f"Table Extraction Prompt:\n{prompt}")
# Call the provider directly
result = await provider.generate_completion(
prompt=prompt,
model="gpt-4.1-mini",
temperature=0.2,
max_tokens=1500
)
# Track cost
tracker.add_call(result)
if USE_DEBUG_LOGS:
logger.debug(f"Raw Table Extraction Result Text:\n{result.text}")
try:
# Try to parse the response as JSON
raw_text = result.text.strip() # Keep raw text separate
text_to_parse = extract_json_from_markdown(raw_text) # Clean it
if USE_DEBUG_LOGS:
# Log both raw and cleaned versions
logger.debug(f"Raw text received (Table): {raw_text[:500]}...")
logger.debug(f"Attempting to parse Table Extraction JSON after cleaning: {text_to_parse[:500]}...")
json_result = json.loads(text_to_parse) # Parse the cleaned version
if USE_DEBUG_LOGS:
logger.debug(f"Successfully parsed Table Extraction JSON: {json.dumps(json_result, indent=2)}")
# Create structured data dictionary for display
structured_result_data = {
"formats": {
"json": json_result.get("json_table", {}),
"markdown": json_result.get("markdown_table", "")
},
"metadata": json_result.get("metadata", {}),
"model": result.model,
"processing_time": time.time() - start_time,
"tokens": {
"input": result.input_tokens,
"output": result.output_tokens,
"total": result.input_tokens + result.output_tokens
},
"cost": result.cost
}
# Parse the result using the shared utility
parse_and_display_result(
"Table Extraction Demo",
{"text": text},
structured_result_data # Pass the structured data
)
except json.JSONDecodeError as e:
# Log the error regardless of debug flag
logger.error(f"JSONDecodeError in Table Extraction occurred: {e}", exc_info=False)
if USE_DEBUG_LOGS:
# Log both raw and cleaned versions for debugging the failure
logger.debug(f"Raw string causing JSONDecodeError in Table Extraction:\n{raw_text}")
logger.debug(f"Cleaned string that failed JSON parsing in Table Extraction:\n{text_to_parse}")
# Print a rich traceback to the console
console.print("[bold red]-- Traceback for JSONDecodeError (Table Extraction) --[/bold red]")
console.print(Traceback())
console.print("[bold red]-- End Traceback --[/bold red]")
# If JSON parsing fails, show the raw response using the original raw_text
console.print(Panel(
raw_text,
title="[yellow]Raw Model Output (JSON parsing failed)[/yellow]",
border_style="red"
))
except Exception as e:
logger.error(f"Error in table extraction: {str(e)}", emoji_key="error")
# Add exit log
if USE_DEBUG_LOGS:
logger.debug("Exiting table_extraction_demo.")
async def semantic_schema_inference_demo(provider, tracker: CostTracker):
"""Demonstrate semantic schema inference."""
if USE_DEBUG_LOGS:
logger.debug("Entering semantic_schema_inference_demo.")
if not provider:
console.print("[yellow]Skipping semantic schema inference demo - no provider available.[/yellow]")
if USE_DEBUG_LOGS:
logger.debug("Exiting semantic_schema_inference_demo (no provider).")
return
logger.info("Starting semantic schema inference demo", emoji_key="start")
# Sample text for schema inference
text = """
Patient Record: John Smith
Date of Birth: 05/12/1978
Patient ID: P-98765
Blood Type: O+
Height: 182 cm
Weight: 76 kg
Medications:
- Lisinopril 10mg, once daily
- Metformin 500mg, twice daily
- Atorvastatin 20mg, once daily at bedtime
Allergies:
- Penicillin (severe)
- Shellfish (mild)
Recent Vital Signs:
Date: 03/15/2024
Blood Pressure: 128/85 mmHg
Heart Rate: 72 bpm
Temperature: 98.6°F
Oxygen Saturation: 98%
Medical History:
- Type 2 Diabetes (diagnosed 2015)
- Hypertension (diagnosed 2017)
- Hyperlipidemia (diagnosed 2019)
- Appendectomy (2005)
"""
# Define a schema template for the extraction
patient_schema = {
"type": "object",
"properties": {
"patient": {
"type": "object",
"properties": {
"name": {"type": "string"},
"dob": {"type": "string"},
"id": {"type": "string"},
"blood_type": {"type": "string"},
"height": {"type": "string"},
"weight": {"type": "string"}
}
},
"medications": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {"type": "string"},
"dosage": {"type": "string"},
"frequency": {"type": "string"}
}
}
},
"allergies": {
"type": "array",
"items": {
"type": "object",
"properties": {
"allergen": {"type": "string"},
"severity": {"type": "string"}
}
}
},
"vital_signs": {
"type": "object",
"properties": {
"date": {"type": "string"},
"blood_pressure": {"type": "string"},
"heart_rate": {"type": "string"},
"temperature": {"type": "string"},
"oxygen_saturation": {"type": "string"}
}
},
"medical_history": {
"type": "array",
"items": {
"type": "object",
"properties": {
"condition": {"type": "string"},
"diagnosed": {"type": "string"}
}
}
}
}
}
# Log schema inference attempt
logger.info("Performing schema inference", emoji_key="processing")
try:
start_time = time.time()
# Prompt for semantic schema extraction
prompt = f"""
Extract structured information from the text according to the provided semantic schema.
TEXT:
{text}
SEMANTIC SCHEMA:
{json.dumps(patient_schema, indent=2)}
Analyze the text and extract information following the schema structure. Return a valid JSON object.
"""
if USE_DEBUG_LOGS:
logger.debug(f"Schema Inference Prompt:\n{prompt}")
# Call the provider directly
result = await provider.generate_completion(
prompt=prompt,
model="gpt-4.1-mini",
temperature=0.2,
max_tokens=1000
)
# Track cost
tracker.add_call(result)
if USE_DEBUG_LOGS:
logger.debug(f"Raw Schema Inference Result Text:\n{result.text}")
try:
# Try to parse the response as JSON
raw_text = result.text.strip()
text_to_parse = extract_json_from_markdown(raw_text)
if USE_DEBUG_LOGS:
logger.debug(f"Raw text received (Schema): {raw_text[:500]}...")
logger.debug(f"Attempting to parse Schema Inference JSON after cleaning: {text_to_parse[:500]}...")
json_result = json.loads(text_to_parse)
if USE_DEBUG_LOGS:
logger.debug(f"Successfully parsed Schema Inference JSON: {json.dumps(json_result, indent=2)}")
# Create structured data dictionary for display
structured_result_data = {
"extracted_data": json_result,
"model": result.model,
"processing_time": time.time() - start_time,
"tokens": {
"input": result.input_tokens,
"output": result.output_tokens,
"total": result.input_tokens + result.output_tokens
},
"cost": result.cost
}
# Parse the result using the shared utility
parse_and_display_result(
"Semantic Schema Inference Demo",
{"text": text},
structured_result_data # Pass the structured data
)
except json.JSONDecodeError as e:
# Log the error regardless of debug flag
logger.error(f"JSONDecodeError in Schema Inference occurred: {e}", exc_info=False)
if USE_DEBUG_LOGS:
# Log both raw and cleaned versions
logger.debug(f"Raw string causing JSONDecodeError in Schema Inference:\n{raw_text}")
logger.debug(f"Cleaned string that failed JSON parsing in Schema Inference:\n{text_to_parse}")
# Print a rich traceback to the console
console.print("[bold red]-- Traceback for JSONDecodeError (Schema Inference) --[/bold red]")
console.print(Traceback())
console.print("[bold red]-- End Traceback --[/bold red]")
# If JSON parsing fails, show the raw response
console.print(Panel(
raw_text,
title="[yellow]Raw Model Output (JSON parsing failed)[/yellow]",
border_style="red"
))
except Exception as e:
logger.error(f"Error in schema inference: {str(e)}", emoji_key="error")
# Add exit log
if USE_DEBUG_LOGS:
logger.debug("Exiting semantic_schema_inference_demo.")
async def entity_extraction_demo(provider, tracker: CostTracker):
"""Demonstrate entity extraction capabilities."""
if USE_DEBUG_LOGS:
logger.debug("Entering entity_extraction_demo.")
if not provider:
console.print("[yellow]Skipping entity extraction demo - no provider available.[/yellow]")
if USE_DEBUG_LOGS:
logger.debug("Exiting entity_extraction_demo (no provider).")
return
logger.info("Starting entity extraction demo", emoji_key="start")
# Sample text for entity extraction
text = """
In a groundbreaking announcement on March 15, 2024, Tesla unveiled its latest solar energy
technology in partnership with SolarCity. CEO Elon Musk presented the new PowerWall 4.0
battery system at their headquarters in Austin, Texas. The system can store up to 20kWh of
energy and costs approximately $6,500 per unit.
According to Dr. Maria Chen, lead researcher at the National Renewable Energy Laboratory (NREL),
this technology represents a significant advancement in residential energy storage. The new
system integrates with the Tesla mobile app on both iOS and Android platforms, allowing users
to monitor energy usage in real-time.
Tesla stock (TSLA) rose 5.8% following the announcement, reaching $248.32 per share on the NASDAQ.
The company plans to begin production at their Gigafactory Nevada location by June 2024, with
initial deployments in California and Texas markets.
"""
# Log entity extraction attempt
logger.info("Performing entity extraction", emoji_key="processing")
try:
start_time = time.time()
# Prompt for entity extraction
prompt = f"""
Extract key-value pairs and entities from the following text, categorized by type.
TEXT:
{text}
Extract the following categories of information:
- Organizations (companies, institutions, etc.)
- People (names and titles)
- Locations (cities, states, facilities, etc.)
- Dates and Times
- Products and Technologies
- Numerical Values (monetary values, percentages, measurements, etc.)
Format the output as a JSON object with these categories as keys, and each containing relevant entities found.
Within each category, provide structured information when possible.
"""
if USE_DEBUG_LOGS:
logger.debug(f"Entity Extraction Prompt:\n{prompt}")
# Call the provider directly
result = await provider.generate_completion(
prompt=prompt,
model="gpt-4.1-mini",
temperature=0.2,
max_tokens=500
)
# Track cost
tracker.add_call(result)
if USE_DEBUG_LOGS:
logger.debug(f"Raw Entity Extraction Result Text:\n{result.text}")
try:
# Try to parse the response as JSON
raw_text = result.text.strip()
text_to_parse = extract_json_from_markdown(raw_text)
if USE_DEBUG_LOGS:
logger.debug(f"Raw text received (Entity): {raw_text[:500]}...")
logger.debug(f"Attempting to parse Entity Extraction JSON after cleaning: {text_to_parse[:500]}...")
if USE_DEBUG_LOGS:
logger.debug(f"EXACT STRING PASSED TO json.loads: >>>{text_to_parse}<<<")
try:
# First try standard parsing
json_result = json.loads(text_to_parse)
except json.JSONDecodeError as e:
logger.warning(f"Standard JSON parsing failed: {e}. Attempting emergency repair.")
# Emergency fallback for malformed JSON due to unterminated strings
# 1. Look for the raw JSON structure with markdown removed
text_no_markdown = text_to_parse
# 2. Manually check for key entity categories, even if JSON is malformed
# Create a structured result with categories we expect to find
json_result = {
"Organizations": [],
"People": [],
"Locations": [],
"Dates and Times": [],
"Products and Technologies": [],
"Numerical Values": []
}
# Look for entity categories using regex
org_matches = re.findall(r'"name"\s*:\s*"([^"]+)".*?"type"\s*:\s*"([^"]+)"', text_no_markdown)
for name, entity_type in org_matches:
# Determine which category this entity belongs to based on type
if any(keyword in entity_type.lower() for keyword in ["company", "corporation", "institution", "exchange"]):
json_result["Organizations"].append({"name": name, "type": entity_type})
elif any(keyword in entity_type.lower() for keyword in ["city", "state", "facility"]):
json_result["Locations"].append({"name": name, "type": entity_type})
elif any(keyword in entity_type.lower() for keyword in ["battery", "app", "system", "technology"]):
json_result["Products and Technologies"].append({"name": name, "type": entity_type})
# Look for people - they usually have titles and organizations
people_matches = re.findall(r'"name"\s*:\s*"([^"]+)".*?"title"\s*:\s*"([^"]+)".*?"organization"\s*:\s*"([^"]*)"', text_no_markdown)
for name, title, org in people_matches:
json_result["People"].append({"name": name, "title": title, "organization": org})
# Dates and numerical values are harder to extract generically
# but we can look for obvious patterns
date_matches = re.findall(r'"date"\s*:\s*"([^"]+)".*?"event"\s*:\s*"([^"]+)"', text_no_markdown)
for date, event in date_matches:
json_result["Dates and Times"].append({"date": date, "event": event})
# For numerical values, look for values with units
value_matches = re.findall(r'"value"\s*:\s*([^,]+).*?"unit"\s*:\s*"([^"]+)"', text_no_markdown)
for value, unit in value_matches:
# Clean up the value
clean_value = value.strip('" ')
item = {"value": clean_value, "unit": unit}
# Look for a description if available
desc_match = re.search(r'"description"\s*:\s*"([^"]+)"', text_no_markdown)
if desc_match:
item["description"] = desc_match.group(1)
json_result["Numerical Values"].append(item)
# Add a note about emergency repair
logger.warning("Used emergency JSON repair - results may be incomplete")
if USE_DEBUG_LOGS:
logger.debug(f"Successfully parsed Entity Extraction JSON: {json.dumps(json_result, indent=2)}")
# Create structured data dictionary for display
structured_result_data = {
"extracted_data": json_result,
"structured": True,
"categorized": True,
"model": result.model,
"processing_time": time.time() - start_time,
"tokens": {
"input": result.input_tokens,
"output": result.output_tokens,
"total": result.input_tokens + result.output_tokens
},
"cost": result.cost
}
# Parse the result using the shared utility
parse_and_display_result(
"Entity Extraction Demo",
{"text": text},
structured_result_data # Pass the structured data
)
except json.JSONDecodeError as e:
# Log the error regardless of debug flag
logger.error(f"JSONDecodeError in Entity Extraction occurred: {e}", exc_info=False)
if USE_DEBUG_LOGS:
# Log both raw and cleaned versions
logger.debug(f"Raw string causing JSONDecodeError in Entity Extraction:\n{raw_text}")
logger.debug(f"Cleaned string that failed JSON parsing in Entity Extraction:\n{text_to_parse}")
# Print a rich traceback to the console
console.print("[bold red]-- Traceback for JSONDecodeError (Entity Extraction) --[/bold red]")
console.print(Traceback())
console.print("[bold red]-- End Traceback --[/bold red]")
# If JSON parsing fails, show the raw response
console.print(Panel(
raw_text,
title="[yellow]Raw Model Output (JSON parsing failed)[/yellow]",
border_style="red"
))
except Exception as e:
logger.error(f"Error in entity extraction: {str(e)}", emoji_key="error")
# Add exit log
if USE_DEBUG_LOGS:
logger.debug("Exiting entity_extraction_demo.")
async def main():
"""Run the advanced extraction demos."""
tracker = CostTracker() # Instantiate tracker
provider = await setup_openai_provider()
if not provider:
logger.warning("OpenAI provider not available. Demo sections requiring it will be skipped.", emoji_key="warning")
console.print(Rule("[bold magenta]Advanced Extraction Demos Starting[/bold magenta]"))
demos_to_run = [
(run_json_extraction_example, "JSON Extraction"),
(table_extraction_demo, "Table Extraction"),
(semantic_schema_inference_demo, "Schema Inference"),
(entity_extraction_demo, "Entity Extraction")
]
# Execute demos sequentially
for demo_func, demo_name in demos_to_run:
try:
await demo_func(provider, tracker) # Pass tracker
except Exception as e:
logger.error(f"Error running {demo_name} demo: {e}", emoji_key="error", exc_info=True)
# Display final cost summary
tracker.display_summary(console)
logger.success("Advanced Extraction Demo finished successfully!", emoji_key="complete")
console.print(Rule("[bold magenta]Advanced Extraction Demos Complete[/bold magenta]"))
if __name__ == "__main__":
# Run the demos
exit_code = asyncio.run(main())
sys.exit(exit_code)
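# Hedged usage note: this demo is typically launched from the repository root
# (so the sys.path insertion above resolves the package) with an OpenAI key
# configured, e.g.:
#   OPENAI_API_KEY=... python examples/advanced_extraction_demo.py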
```
--------------------------------------------------------------------------------
/ultimate_mcp_server/core/providers/anthropic.py:
--------------------------------------------------------------------------------
```python
# ultimate_mcp_server/providers/anthropic.py
"""Anthropic (Claude) provider implementation."""
import json
import re
import time
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple
from anthropic import AsyncAnthropic
from ultimate_mcp_server.constants import Provider, TaskType # Import TaskType for logging
from ultimate_mcp_server.core.providers.base import (
BaseProvider,
ModelResponse,
)
from ultimate_mcp_server.utils import get_logger
# Use the same naming scheme everywhere: logger at module level
logger = get_logger("ultimate_mcp_server.providers.anthropic")
class AnthropicProvider(BaseProvider):
"""Provider implementation for Anthropic (Claude) API."""
provider_name = Provider.ANTHROPIC.value
def __init__(self, api_key: Optional[str] = None, **kwargs):
"""Initialize the Anthropic provider.
Args:
api_key: Anthropic API key
**kwargs: Additional options (e.g., base_url)
"""
super().__init__(api_key=api_key, **kwargs)
self.base_url = kwargs.get("base_url")
self.models_cache = None
self.client: Optional[AsyncAnthropic] = None # Initialize client attribute
async def initialize(self) -> bool:
"""Initialize the Anthropic client.
Returns:
bool: True if initialization was successful
"""
if not self.api_key:
self.logger.error("Anthropic API key is not configured.", emoji_key="error")
return False
try:
self.client = AsyncAnthropic(
api_key=self.api_key,
base_url=self.base_url,
)
# Skip API call if using a mock key (for tests)
if "mock-" in self.api_key:
self.logger.info(
"Using mock Anthropic key - skipping API validation", emoji_key="mock"
)
# Assume mock initialization is always successful for testing purposes
self.is_initialized = True
return True
# Optional: Add a quick check_api_key() call here if desired,
# but initialize might succeed even if key is invalid later.
# is_valid = await self.check_api_key() # This makes initialize slower
# if not is_valid:
# self.logger.error("Anthropic API key appears invalid.", emoji_key="error")
# return False
self.logger.success("Anthropic provider initialized successfully", emoji_key="provider")
self.is_initialized = True # Mark as initialized
return True
except Exception as e:
self.logger.error(
f"Failed to initialize Anthropic provider: {str(e)}",
emoji_key="error",
exc_info=True, # Log traceback for debugging
)
self.is_initialized = False
return False
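# Hedged usage sketch (caller code, not part of this class; values illustrative):
#   provider = AnthropicProvider(api_key="...")
#   if await provider.initialize():
#       response = await provider.generate_completion(prompt="Hello", max_tokens=64)
#       print(response.text, response.cost)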
async def generate_completion(
self,
prompt: Optional[str] = None,
messages: Optional[List[Dict[str, Any]]] = None,
model: Optional[str] = None,
max_tokens: Optional[int] = 1024, # Signature default
temperature: float = 0.7,
json_mode: bool = False,
**kwargs,
) -> ModelResponse:
"""Generate a single non-chat completion using Anthropic Claude.
Args:
prompt: Text prompt to send to the model.
messages: List of message dictionaries, alternative to prompt.
model: Model name to use (e.g., "claude-3-opus-20240229").
max_tokens: Maximum tokens to generate. Defaults to 1024.
temperature: Temperature parameter (0.0-1.0).
json_mode: If True, attempt to guide model towards JSON output (via prompting).
**kwargs: Additional model-specific parameters (e.g., top_p, system).
Returns:
ModelResponse object.
"""
if not self.client:
if not await self.initialize():
raise ConnectionError("Anthropic provider failed to initialize.")
model = model or self.get_default_model()
actual_model_name = self.strip_provider_prefix(model)
# Original logic: Validate that either prompt or messages is provided
if prompt is None and not messages:
raise ValueError("Either 'prompt' or 'messages' must be provided")
# Original logic: If messages are provided, use the chat_completion function
if messages:
# Ensure all necessary parameters are passed to generate_chat_completion
# This includes system_prompt if it's in kwargs
return await self.generate_chat_completion(
messages=messages,
model=model, # Pass original model ID
max_tokens=max_tokens,
temperature=temperature,
json_mode=json_mode, # Pass json_mode
**kwargs # Pass other kwargs like system, top_p etc.
)
# Original logic: Prepare message list for the API from prompt
# This path is taken if only 'prompt' is provided (and not 'messages')
current_api_messages = [{"role": "user", "content": prompt}]
# Original logic: Handle system prompt if passed in kwargs for the simple prompt case
system_prompt = kwargs.pop("system", None)
# Original logic: Handle JSON mode for simple prompt case
if json_mode:
self.logger.debug(
"json_mode=True requested for completion (simple prompt), modifying user message for Anthropic."
)
# Modify the user message content in current_api_messages
user_message_idx = -1
for i, msg in enumerate(current_api_messages):
if msg["role"] == "user":
user_message_idx = i
break
if user_message_idx != -1:
original_content = current_api_messages[user_message_idx]["content"]
if isinstance(original_content, str) and "Please respond with valid JSON" not in original_content:
current_api_messages[user_message_idx]["content"] = (
f"{original_content}\\nPlease respond ONLY with valid JSON matching the expected schema. Do not include explanations or markdown formatting."
)
else:
# This case should ideally not happen if prompt is always user role.
# If it could, one might append a new user message asking for JSON,
# or include it in system prompt if system_prompt is being constructed here.
self.logger.warning("Could not find user message to append JSON instruction for simple prompt case.")
# Prepare API call parameters using max_tokens directly from signature
api_params = {
"messages": current_api_messages,
"model": actual_model_name,
"max_tokens": max_tokens, # Uses max_tokens from signature (which defaults to 1024 if not passed)
"temperature": temperature,
**kwargs, # Pass remaining kwargs (like top_p, etc.) that were not popped
}
if system_prompt: # Add system prompt if it was extracted
api_params["system"] = system_prompt
# Logging before API call (original style)
self.logger.info(
f"Generating completion with Anthropic model {actual_model_name}",
emoji_key=TaskType.COMPLETION.value,
prompt_length=len(prompt) if prompt else 0, # length of prompt if provided
json_mode_requested=json_mode,
)
try:
response, processing_time = await self.process_with_timer(
self.client.messages.create, **api_params
)
except Exception as e:
error_message = f"Anthropic API error during completion for model {actual_model_name}: {type(e).__name__}: {str(e)}"
self.logger.error(error_message, exc_info=True)
raise ConnectionError(error_message) from e
if (
not response.content
or not isinstance(response.content, list)
or not hasattr(response.content[0], "text")
):
raise ValueError(f"Unexpected response format from Anthropic API: {response}")
completion_text = response.content[0].text
# Post-process if JSON mode was requested (for simple prompt case) - best effort extraction
if json_mode: # This json_mode is the original parameter
original_text_for_json_check = completion_text
completion_text = self._extract_json_from_text(completion_text)
if original_text_for_json_check != completion_text:
self.logger.debug("Extracted JSON content from Anthropic response post-processing (simple prompt case).")
result = ModelResponse(
text=completion_text,
model=f"{self.provider_name}/{actual_model_name}",
provider=self.provider_name,
input_tokens=response.usage.input_tokens,
output_tokens=response.usage.output_tokens,
processing_time=processing_time,
raw_response=response.model_dump(),
)
result.message = {"role": "assistant", "content": completion_text}
self.logger.success(
"Anthropic completion successful",
emoji_key="success",
model=result.model,
tokens={"input": result.input_tokens, "output": result.output_tokens},
cost=result.cost,
time=result.processing_time,
)
return result
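# Hedged illustration of the json_mode path above (prompt text invented): with
# json_mode=True and a plain prompt, the user message is suffixed with the
# "respond ONLY with valid JSON" instruction before the API call, and the raw
# completion is then passed through _extract_json_from_text so fenced or
# prose-wrapped JSON is reduced to the bare JSON payload where possible.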
# --- NEW METHOD ---
async def generate_chat_completion(
self,
messages: List[
Dict[str, Any]
], # Use Dict for broader compatibility, or specific MessageParam type
model: Optional[str] = None,
max_tokens: Optional[int] = 1024, # Provide a default
temperature: float = 0.7,
json_mode: bool = False, # Add json_mode parameter
**kwargs,
) -> ModelResponse:
"""Generate a chat completion using Anthropic Claude.
Args:
messages: A list of message dictionaries (e.g., [{"role": "user", "content": "..."}]).
Should conform to Anthropic's expected format.
model: Model name to use (e.g., "claude-3-opus-20240229").
max_tokens: Maximum tokens to generate. Defaults to 1024.
temperature: Temperature parameter (0.0-1.0).
json_mode: If True, guide the model to generate JSON output (via prompt engineering).
**kwargs: Additional model-specific parameters (e.g., top_p, system).
Returns:
ModelResponse object containing the assistant's message.
"""
if not self.client:
if not await self.initialize():
raise ConnectionError("Anthropic provider failed to initialize.")
model = model or self.get_default_model()
actual_model_name = self.strip_provider_prefix(model)
# Handle system prompt extraction
system_prompt = kwargs.pop("system", None)
# Process the messages to extract system message and convert to Anthropic format
processed_messages = []
extracted_system = None
for msg in messages:
role = msg.get("role", "")
content = msg.get("content", "")
# Extract system message if present
if role == "system":
if extracted_system is None: # Take the first system message
extracted_system = content
# Don't add system messages to the processed_messages list
continue
elif role in ("user", "assistant"):
# Keep user and assistant messages
processed_messages.append({"role": role, "content": content})
else:
self.logger.warning(f"Ignoring unsupported message role: {role}")
# If we found a system message, use it (overrides any system in kwargs)
if extracted_system is not None:
system_prompt = extracted_system
# Process json_mode by modifying system prompt or last user message
json_mode_requested = json_mode
if json_mode_requested:
self.logger.debug(
"json_mode=True requested for chat completion, implementing via prompt engineering for Anthropic"
)
# If we have a system prompt, update it to include JSON instructions
if system_prompt:
system_prompt = f"{system_prompt}\n\nIMPORTANT: You must respond ONLY with valid JSON matching the expected schema. Do not include explanations or markdown formatting."
# Otherwise, if there's at least one user message, modify the last one
elif processed_messages and any(m.get("role") == "user" for m in processed_messages):
# Find last user message
for i in range(len(processed_messages) - 1, -1, -1):
if processed_messages[i].get("role") == "user":
user_content = processed_messages[i].get("content", "")
# Only add JSON instruction if not already present
if "respond with JSON" not in user_content and "respond in JSON" not in user_content:
processed_messages[i]["content"] = f"{user_content}\n\nPlease respond ONLY with valid JSON. Do not include explanations or markdown formatting."
break
# If neither system prompt nor user messages to modify, add a system prompt
else:
system_prompt = "You must respond ONLY with valid JSON. Do not include explanations or markdown formatting."
# Prepare API call parameters
api_params = {
"messages": processed_messages,
"model": actual_model_name,
"max_tokens": max_tokens,
"temperature": temperature,
**kwargs, # Pass remaining kwargs (like top_p, etc.)
}
if system_prompt:
api_params["system"] = system_prompt
self.logger.info(
f"Generating chat completion with Anthropic model {actual_model_name}",
emoji_key=TaskType.CHAT.value, # Use enum value
message_count=len(processed_messages),
json_mode_requested=json_mode_requested, # Log if it was requested
)
try:
response, processing_time = await self.process_with_timer(
self.client.messages.create, **api_params
)
except Exception as e:
error_message = f"Anthropic API error during chat completion for model {actual_model_name}: {type(e).__name__}: {str(e)}"
self.logger.error(error_message, exc_info=True)
raise ConnectionError(error_message) from e
# Extract response content
if (
not response.content
or not isinstance(response.content, list)
or not hasattr(response.content[0], "text")
):
raise ValueError(f"Unexpected response format from Anthropic API: {response}")
assistant_content = response.content[0].text
# Create standardized response including the assistant message
result = ModelResponse(
text=assistant_content, # Keep raw text accessible
model=f"{self.provider_name}/{actual_model_name}", # Return prefixed model ID
provider=self.provider_name,
input_tokens=response.usage.input_tokens,
output_tokens=response.usage.output_tokens,
processing_time=processing_time,
raw_response=response.model_dump(), # Use model_dump() if Pydantic
)
# Add message to result for chat_completion
result.message = {"role": "assistant", "content": assistant_content}
# Log success
self.logger.success(
"Anthropic chat completion successful",
emoji_key="success",
model=result.model,
tokens={"input": result.input_tokens, "output": result.output_tokens},
cost=result.cost,
time=result.processing_time,
)
return result
# --- END NEW METHOD ---
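# Hedged usage sketch (messages invented): OpenAI-style chat messages are
# accepted; any system turn is lifted into Anthropic's top-level `system` field:
#   messages = [
#       {"role": "system", "content": "You are terse."},
#       {"role": "user", "content": "Summarize MCP in one sentence."},
#   ]
#   resp = await provider.generate_chat_completion(messages=messages, max_tokens=128)
#   assert resp.message["role"] == "assistant"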
async def generate_completion_stream(
self,
# Keep existing signature: accepts prompt primarily, but also messages/system in kwargs
prompt: Optional[str] = None, # Make prompt optional if messages are primary input
messages: Optional[List[Dict[str, Any]]] = None, # Allow messages directly
model: Optional[str] = None,
max_tokens: Optional[int] = 1024, # Default max_tokens
temperature: float = 0.7,
json_mode: bool = False, # Accept json_mode flag
**kwargs,
) -> AsyncGenerator[Tuple[str, Dict[str, Any]], None]:
"""Generate a streaming completion using Anthropic Claude. Handles both prompt and message inputs.
Args:
prompt: (Optional) Text prompt (if messages not provided).
messages: (Optional) List of message dictionaries. Takes precedence over prompt.
model: Model name to use.
max_tokens: Maximum tokens to generate. Defaults to 1024.
temperature: Temperature parameter.
json_mode: If True, guides model towards JSON (via prompting if using prompt input).
**kwargs: Additional parameters (system, top_p, etc.).
Yields:
Tuple of (text_chunk, metadata).
Raises:
ConnectionError: If provider initialization fails or API call fails.
ValueError: If neither prompt nor messages are provided.
"""
if not self.client:
if not await self.initialize():
raise ConnectionError("Anthropic provider failed to initialize.")
model = model or self.get_default_model()
actual_model_name = self.strip_provider_prefix(model)
# Prepare system prompt if provided in kwargs
system_prompt = kwargs.pop("system", None)
# Determine input messages: Use 'messages' if provided, otherwise construct from 'prompt'
if messages:
# Process the messages to extract system message and convert to Anthropic format
processed_messages = []
extracted_system = None
for msg in messages:
role = msg.get("role", "")
content = msg.get("content", "")
# Extract system message if present
if role == "system":
if extracted_system is None: # Take the first system message
extracted_system = content
# Don't add system messages to the processed_messages list
continue
elif role in ("user", "assistant"):
# Keep user and assistant messages
processed_messages.append({"role": role, "content": content})
else:
self.logger.warning(f"Ignoring unsupported message role in streaming: {role}")
# If we found a system message, use it (overrides any system in kwargs)
if extracted_system is not None:
system_prompt = extracted_system
input_desc = f"{len(processed_messages)} messages"
elif prompt:
# Construct messages from prompt
processed_messages = [{"role": "user", "content": prompt}]
input_desc = f"prompt ({len(prompt)} chars)"
# Apply JSON mode prompt modification ONLY if using prompt input
if json_mode:
self.logger.debug(
"json_mode=True requested for stream completion, modifying prompt for Anthropic."
)
user_message = processed_messages[-1]
original_content = user_message["content"]
if "Please respond with valid JSON" not in original_content:
user_message["content"] = (
f"{original_content}\nPlease respond ONLY with valid JSON matching the expected schema. Do not include explanations or markdown formatting."
)
else:
raise ValueError(
"Either 'prompt' or 'messages' must be provided for generate_completion_stream"
)
# Apply JSON mode to system prompt if using messages input and json_mode is True
json_mode_requested = kwargs.pop("json_mode", json_mode) # Keep track if it was requested
if json_mode_requested and messages:
if system_prompt:
system_prompt = f"{system_prompt}\n\nIMPORTANT: You must respond ONLY with valid JSON matching the expected schema. Do not include explanations or markdown formatting."
else:
system_prompt = "You must respond ONLY with valid JSON matching the expected schema. Do not include explanations or markdown formatting."
# Prepare API call parameters
params = {
"model": actual_model_name,
"messages": processed_messages,
"temperature": temperature,
"max_tokens": max_tokens, # Use the default or provided value
**kwargs, # Pass remaining kwargs
}
if system_prompt:
params["system"] = system_prompt
self.logger.info(
f"Generating streaming completion with Anthropic model {actual_model_name}",
emoji_key=self.provider_name,
input_type=input_desc,
json_mode_requested=json_mode_requested,
)
start_time = time.time()
total_chunks = 0
final_input_tokens = 0
final_output_tokens = 0
finish_reason = None # Track finish reason
try:
async with self.client.messages.stream(**params) as stream:
async for chunk in stream:
# Extract text delta
if chunk.type == "content_block_delta":
content = chunk.delta.text
total_chunks += 1
metadata = {
"model": f"{self.provider_name}/{actual_model_name}",
"provider": self.provider_name,
"chunk_index": total_chunks,
"finish_reason": None, # Not final yet
}
yield content, metadata
# Don't attempt to capture usage from delta chunks - wait for final message
# Important: Get final tokens from the final message state
try:
final_message = await stream.get_final_message()
final_input_tokens = final_message.usage.input_tokens if hasattr(final_message, 'usage') else 0
final_output_tokens = final_message.usage.output_tokens if hasattr(final_message, 'usage') else 0
# Ensure finish_reason is captured from the final message
finish_reason = final_message.stop_reason if hasattr(final_message, 'stop_reason') else "unknown"
except Exception as e:
# If we can't get the final message for any reason, log it but continue
self.logger.warning(f"Couldn't get final message stats: {e}")
# Estimate token counts based on total characters / avg chars per token
char_count = sum(len(m.get("content", "")) for m in processed_messages)
final_input_tokens = char_count // 4 # Rough estimate
final_output_tokens = total_chunks * 5 # Very rough estimate
processing_time = time.time() - start_time
self.logger.success(
"Anthropic streaming completion successful",
emoji_key="success",
model=f"{self.provider_name}/{actual_model_name}",
chunks=total_chunks,
tokens={"input": final_input_tokens, "output": final_output_tokens},
time=processing_time,
finish_reason=finish_reason,
)
# Yield a final empty chunk with aggregated metadata
final_metadata = {
"model": f"{self.provider_name}/{actual_model_name}",
"provider": self.provider_name,
"chunk_index": total_chunks + 1,
"input_tokens": final_input_tokens,
"output_tokens": final_output_tokens,
"total_tokens": final_input_tokens + final_output_tokens,
"processing_time": processing_time,
"finish_reason": finish_reason,
}
yield "", final_metadata
except Exception as e:
processing_time = time.time() - start_time
self.logger.error(
f"Anthropic streaming completion failed after {processing_time:.2f}s: {str(e)}",
emoji_key="error",
model=f"{self.provider_name}/{actual_model_name}",
exc_info=True,
)
# Yield a final error chunk
error_metadata = {
"model": f"{self.provider_name}/{actual_model_name}",
"provider": self.provider_name,
"chunk_index": total_chunks + 1,
"error": f"{type(e).__name__}: {str(e)}",
"finish_reason": "error",
"processing_time": processing_time,
}
yield "", error_metadata
# Don't re-raise here, let the caller handle the error chunk
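# Hedged consumer sketch (illustrative): the generator yields (text_chunk,
# metadata) tuples and ends with an empty chunk whose metadata carries the
# aggregate token counts, timing, and finish_reason (or an "error" reason):
#   async for chunk, meta in provider.generate_completion_stream(prompt="Hi"):
#       if chunk:
#           print(chunk, end="")
#       else:
#           print("\nfinish_reason:", meta.get("finish_reason"))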
async def list_models(self) -> List[Dict[str, Any]]:
"""List available Anthropic Claude models.
Returns:
List of model information dictionaries including the provider prefix.
"""
# Anthropic doesn't have a list models endpoint, return static list WITH prefix
# Based on the models defined in constants.py
static_models = [
# Define with the full ID including provider prefix
{
"id": f"{self.provider_name}/claude-3-7-sonnet-20250219",
"name": "Claude 3.7 Sonnet",
"context_window": 200000,
"input_cost_pmt": 3.0,
"output_cost_pmt": 15.0,
"features": ["chat", "completion", "vision", "tool_use"],
},
{
"id": f"{self.provider_name}/claude-3-5-haiku-20241022",
"name": "Claude 3.5 Haiku",
"context_window": 200000,
"input_cost_pmt": 0.80,
"output_cost_pmt": 4.0,
"features": ["chat", "completion", "vision"],
},
{
"id": f"{self.provider_name}/claude-3-opus-20240229",
"name": "Claude 3 Opus",
"context_window": 200000,
"input_cost_pmt": 15.0,
"output_cost_pmt": 75.0,
"features": ["chat", "completion", "vision"],
},
]
# Simple caching (optional, as list is static)
if not self.models_cache:
self.models_cache = static_models
return self.models_cache
def get_default_model(self) -> str:
"""Get the default Anthropic model ID (including provider prefix).
Returns:
Default model ID string (e.g., "anthropic/claude-3-5-haiku-20241022").
"""
# Try getting from config first
from ultimate_mcp_server.config import get_config
default_model_id = f"{self.provider_name}/claude-3-5-haiku-20241022" # Hardcoded default
try:
config = get_config()
# Access nested provider config safely
provider_config = config.providers.get(self.provider_name) if config.providers else None
if provider_config and provider_config.default_model:
# Ensure the configured default includes the prefix
configured_default = provider_config.default_model
if not configured_default.startswith(f"{self.provider_name}/"):
self.logger.warning(
f"Configured default model '{configured_default}' for Anthropic is missing the provider prefix. Using hardcoded default: {default_model_id}"
)
return default_model_id
else:
return configured_default
except (ImportError, AttributeError, TypeError) as e:
self.logger.debug(
f"Could not retrieve default model from config ({e}), using hardcoded default."
)
return default_model_id
async def check_api_key(self) -> bool:
"""Check if the Anthropic API key is valid by making a minimal request.
Returns:
bool: True if API key allows a basic request.
"""
if not self.client:
self.logger.warning("Cannot check API key: Anthropic client not initialized.")
# Attempt initialization first
if not await self.initialize():
return False # Initialization failed, key likely invalid or other issue
# Defensive fallback: initialization reported success but no client object exists; treat the key as usable (e.g., test/mock paths)
if not self.client:
return True # Assume mock key is 'valid' for testing
try:
# Use the *unprefixed* default model name for the check
default_model_unprefixed = self.strip_provider_prefix(self.get_default_model())
await self.client.messages.create(
model=default_model_unprefixed,
messages=[{"role": "user", "content": "Test"}],
max_tokens=1,
)
self.logger.info("Anthropic API key validation successful.")
return True
except Exception as e:
self.logger.warning(f"Anthropic API key validation failed: {type(e).__name__}")
return False
def strip_provider_prefix(self, model_id: str) -> str:
"""Removes the provider prefix (e.g., 'anthropic/') from a model ID."""
prefix = f"{self.provider_name}/"
if model_id.startswith(prefix):
return model_id[len(prefix) :]
# Handle ':' separator as well for backward compatibility if needed
alt_prefix = f"{self.provider_name}:"
if model_id.startswith(alt_prefix):
return model_id[len(alt_prefix) :]
return model_id # Return original if no prefix found
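# Illustrative examples of the prefix stripping above:
#   strip_provider_prefix("anthropic/claude-3-5-haiku-20241022") -> "claude-3-5-haiku-20241022"
#   strip_provider_prefix("anthropic:claude-3-opus-20240229")    -> "claude-3-opus-20240229"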
def _extract_json_from_text(self, text: str) -> str:
"""Extract JSON content from text that might include markdown code blocks or explanatory text.
Args:
text: The raw text response that might contain JSON
Returns:
Cleaned JSON content
"""
# First check if the text is already valid JSON
try:
json.loads(text)
return text # Already valid JSON
except json.JSONDecodeError:
pass # Continue with extraction
# Extract JSON from code blocks - most common Anthropic pattern
code_block_match = re.search(r'```(?:json)?\s*([\s\S]*?)```', text)
if code_block_match:
code_content = code_block_match.group(1).strip()
try:
json.loads(code_content)
return code_content
except json.JSONDecodeError:
# Try to fix common JSON syntax issues like trailing commas
fixed_content = re.sub(r',\s*([}\]])', r'\1', code_content)
try:
json.loads(fixed_content)
return fixed_content
except json.JSONDecodeError:
pass # Continue with other extraction methods
# Look for JSON array or object patterns in the content
# Find the first [ or { and the matching closing ] or }
stripped = text.strip()
# Try to extract array
if '[' in stripped and ']' in stripped:
start = stripped.find('[')
# Find the matching closing bracket
end = -1
depth = 0
for i in range(start, len(stripped)):
if stripped[i] == '[':
depth += 1
elif stripped[i] == ']':
depth -= 1
if depth == 0:
end = i + 1
break
if end > start:
array_content = stripped[start:end]
try:
json.loads(array_content)
return array_content
except json.JSONDecodeError:
pass # Try other methods
# Try to extract object
if '{' in stripped and '}' in stripped:
start = stripped.find('{')
# Find the matching closing bracket
end = -1
depth = 0
for i in range(start, len(stripped)):
if stripped[i] == '{':
depth += 1
elif stripped[i] == '}':
depth -= 1
if depth == 0:
end = i + 1
break
if end > start:
object_content = stripped[start:end]
try:
json.loads(object_content)
return object_content
except json.JSONDecodeError:
pass # Try other methods
# If all else fails, return the original text
return text
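# Hedged examples of _extract_json_from_text (inputs invented):
#   '{"a": 1}'                       -> returned unchanged (already valid JSON)
#   'Sure! Here it is: {"a": 1} ok'  -> '{"a": 1}' (balanced-brace extraction)
#   'no json here at all'            -> returned unchanged (fallback)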
async def process_with_timer(self, func, *args, **kwargs) -> Tuple[Any, float]:
"""Helper to time an async function call."""
start_time = time.perf_counter()
result = await func(*args, **kwargs)
end_time = time.perf_counter()
return result, end_time - start_time
```