This is page 7 of 35. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│ ├── __init__.py
│ ├── advanced_agent_flows_using_unified_memory_system_demo.py
│ ├── advanced_extraction_demo.py
│ ├── advanced_unified_memory_system_demo.py
│ ├── advanced_vector_search_demo.py
│ ├── analytics_reporting_demo.py
│ ├── audio_transcription_demo.py
│ ├── basic_completion_demo.py
│ ├── cache_demo.py
│ ├── claude_integration_demo.py
│ ├── compare_synthesize_demo.py
│ ├── cost_optimization.py
│ ├── data
│ │ ├── sample_event.txt
│ │ ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│ │ └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│ ├── docstring_refiner_demo.py
│ ├── document_conversion_and_processing_demo.py
│ ├── entity_relation_graph_demo.py
│ ├── filesystem_operations_demo.py
│ ├── grok_integration_demo.py
│ ├── local_text_tools_demo.py
│ ├── marqo_fused_search_demo.py
│ ├── measure_model_speeds.py
│ ├── meta_api_demo.py
│ ├── multi_provider_demo.py
│ ├── ollama_integration_demo.py
│ ├── prompt_templates_demo.py
│ ├── python_sandbox_demo.py
│ ├── rag_example.py
│ ├── research_workflow_demo.py
│ ├── sample
│ │ ├── article.txt
│ │ ├── backprop_paper.pdf
│ │ ├── buffett.pdf
│ │ ├── contract_link.txt
│ │ ├── legal_contract.txt
│ │ ├── medical_case.txt
│ │ ├── northwind.db
│ │ ├── research_paper.txt
│ │ ├── sample_data.json
│ │ └── text_classification_samples
│ │ ├── email_classification.txt
│ │ ├── news_samples.txt
│ │ ├── product_reviews.txt
│ │ └── support_tickets.txt
│ ├── sample_docs
│ │ └── downloaded
│ │ └── attention_is_all_you_need.pdf
│ ├── sentiment_analysis_demo.py
│ ├── simple_completion_demo.py
│ ├── single_shot_synthesis_demo.py
│ ├── smart_browser_demo.py
│ ├── sql_database_demo.py
│ ├── sse_client_demo.py
│ ├── test_code_extraction.py
│ ├── test_content_detection.py
│ ├── test_ollama.py
│ ├── text_classification_demo.py
│ ├── text_redline_demo.py
│ ├── tool_composition_examples.py
│ ├── tournament_code_demo.py
│ ├── tournament_text_demo.py
│ ├── unified_memory_system_demo.py
│ ├── vector_search_demo.py
│ ├── web_automation_instruction_packs.py
│ └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│ └── smart_browser_internal
│ ├── locator_cache.db
│ ├── readability.js
│ └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── integration
│ │ ├── __init__.py
│ │ └── test_server.py
│ ├── manual
│ │ ├── test_extraction_advanced.py
│ │ └── test_extraction.py
│ └── unit
│ ├── __init__.py
│ ├── test_cache.py
│ ├── test_providers.py
│ └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│ ├── __init__.py
│ ├── __main__.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── __main__.py
│ │ ├── commands.py
│ │ ├── helpers.py
│ │ └── typer_cli.py
│ ├── clients
│ │ ├── __init__.py
│ │ ├── completion_client.py
│ │ └── rag_client.py
│ ├── config
│ │ └── examples
│ │ └── filesystem_config.yaml
│ ├── config.py
│ ├── constants.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── evaluation
│ │ │ ├── base.py
│ │ │ └── evaluators.py
│ │ ├── providers
│ │ │ ├── __init__.py
│ │ │ ├── anthropic.py
│ │ │ ├── base.py
│ │ │ ├── deepseek.py
│ │ │ ├── gemini.py
│ │ │ ├── grok.py
│ │ │ ├── ollama.py
│ │ │ ├── openai.py
│ │ │ └── openrouter.py
│ │ ├── server.py
│ │ ├── state_store.py
│ │ ├── tournaments
│ │ │ ├── manager.py
│ │ │ ├── tasks.py
│ │ │ └── utils.py
│ │ └── ums_api
│ │ ├── __init__.py
│ │ ├── ums_database.py
│ │ ├── ums_endpoints.py
│ │ ├── ums_models.py
│ │ └── ums_services.py
│ ├── exceptions.py
│ ├── graceful_shutdown.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── analytics
│ │ │ ├── __init__.py
│ │ │ ├── metrics.py
│ │ │ └── reporting.py
│ │ ├── cache
│ │ │ ├── __init__.py
│ │ │ ├── cache_service.py
│ │ │ ├── persistence.py
│ │ │ ├── strategies.py
│ │ │ └── utils.py
│ │ ├── cache.py
│ │ ├── document.py
│ │ ├── knowledge_base
│ │ │ ├── __init__.py
│ │ │ ├── feedback.py
│ │ │ ├── manager.py
│ │ │ ├── rag_engine.py
│ │ │ ├── retriever.py
│ │ │ └── utils.py
│ │ ├── prompts
│ │ │ ├── __init__.py
│ │ │ ├── repository.py
│ │ │ └── templates.py
│ │ ├── prompts.py
│ │ └── vector
│ │ ├── __init__.py
│ │ ├── embeddings.py
│ │ └── vector_service.py
│ ├── tool_token_counter.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── audio_transcription.py
│ │ ├── base.py
│ │ ├── completion.py
│ │ ├── docstring_refiner.py
│ │ ├── document_conversion_and_processing.py
│ │ ├── enhanced-ums-lookbook.html
│ │ ├── entity_relation_graph.py
│ │ ├── excel_spreadsheet_automation.py
│ │ ├── extraction.py
│ │ ├── filesystem.py
│ │ ├── html_to_markdown.py
│ │ ├── local_text_tools.py
│ │ ├── marqo_fused_search.py
│ │ ├── meta_api_tool.py
│ │ ├── ocr_tools.py
│ │ ├── optimization.py
│ │ ├── provider.py
│ │ ├── pyodide_boot_template.html
│ │ ├── python_sandbox.py
│ │ ├── rag.py
│ │ ├── redline-compiled.css
│ │ ├── sentiment_analysis.py
│ │ ├── single_shot_synthesis.py
│ │ ├── smart_browser.py
│ │ ├── sql_databases.py
│ │ ├── text_classification.py
│ │ ├── text_redline_tools.py
│ │ ├── tournament.py
│ │ ├── ums_explorer.html
│ │ └── unified_memory_system.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── async_utils.py
│ │ ├── display.py
│ │ ├── logging
│ │ │ ├── __init__.py
│ │ │ ├── console.py
│ │ │ ├── emojis.py
│ │ │ ├── formatter.py
│ │ │ ├── logger.py
│ │ │ ├── panels.py
│ │ │ ├── progress.py
│ │ │ └── themes.py
│ │ ├── parse_yaml.py
│ │ ├── parsing.py
│ │ ├── security.py
│ │ └── text.py
│ └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/ultimate_mcp_server/core/providers/gemini.py:
--------------------------------------------------------------------------------
```python
"""Google Gemini provider implementation."""
import time
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple
from google import genai
from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.providers.base import BaseProvider, ModelResponse
from ultimate_mcp_server.utils import get_logger
# Use the same naming scheme everywhere: logger at module level
logger = get_logger("ultimate_mcp_server.providers.gemini")
class GeminiProvider(BaseProvider):
"""Provider implementation for Google Gemini API."""
provider_name = Provider.GEMINI.value
def __init__(self, api_key: Optional[str] = None, **kwargs):
"""Initialize the Gemini provider.
Args:
api_key: Google API key
**kwargs: Additional options
"""
super().__init__(api_key=api_key, **kwargs)
self.models_cache = None
async def initialize(self) -> bool:
"""Initialize the Gemini client.
Returns:
bool: True if initialization was successful
"""
try:
# Skip real API calls if using mock key for tests
if self.api_key and "mock-" in self.api_key:
self.logger.info(
"Using mock Gemini key - skipping API initialization",
emoji_key="mock"
)
self.client = {"mock_client": True}
return True
# Create a client instance instead of configuring globally
self.client = genai.Client(
api_key=self.api_key,
http_options={"api_version": "v1alpha"}
)
self.logger.success(
"Gemini provider initialized successfully",
emoji_key="provider"
)
return True
except Exception as e:
self.logger.error(
f"Failed to initialize Gemini provider: {str(e)}",
emoji_key="error"
)
return False
async def generate_completion(
self,
prompt: Optional[str] = None,
messages: Optional[List[Dict[str, Any]]] = None,
model: Optional[str] = None,
max_tokens: Optional[int] = None,
temperature: float = 0.7,
**kwargs
) -> ModelResponse:
"""Generate a completion using Google Gemini.
Args:
prompt: Text prompt to send to the model (or None if messages provided)
messages: List of message dictionaries (alternative to prompt)
model: Model name to use (e.g., "gemini-2.0-flash-lite")
max_tokens: Maximum tokens to generate
temperature: Temperature parameter (0.0-1.0)
**kwargs: Additional model-specific parameters
Returns:
ModelResponse: Standardized response
Raises:
Exception: If API call fails
"""
if not self.client:
await self.initialize()
# Use default model if not specified
model = model or self.get_default_model()
# Strip provider prefix if present (e.g., "gemini:gemini-2.0-pro" -> "gemini-2.0-pro")
if ":" in model:
original_model = model
model = model.split(":", 1)[1]
self.logger.debug(f"Stripped provider prefix from model name: {original_model} -> {model}")
# Validate that either prompt or messages is provided
if prompt is None and not messages:
raise ValueError("Either 'prompt' or 'messages' must be provided")
# Prepare generation config and API call kwargs
config = {
"temperature": temperature,
}
if max_tokens is not None:
config["max_output_tokens"] = max_tokens # Gemini uses max_output_tokens
# Pop json_mode flag
json_mode = kwargs.pop("json_mode", False)
# Set up JSON mode in config dict per Gemini API docs
if json_mode:
# For Gemini, JSON mode is set via response_mime_type in the config dict
config["response_mime_type"] = "application/json"
self.logger.debug("Setting response_mime_type to application/json for Gemini in config")
# Add remaining kwargs to config
for key in list(kwargs.keys()):
if key in ["top_p", "top_k", "candidate_count", "stop_sequences"]:
config[key] = kwargs.pop(key)
# Store other kwargs that might need to be passed directly
request_params = {}
for key in list(kwargs.keys()):
if key in ["safety_settings", "tools", "system"]:
request_params[key] = kwargs.pop(key)
# Prepare content based on input type (prompt or messages)
content = None
if prompt:
content = prompt
log_input_size = len(prompt)
elif messages:
# Convert messages to Gemini format
content = []
log_input_size = 0
for msg in messages:
role = msg.get("role", "").lower()
text = msg.get("content", "")
log_input_size += len(text)
# Map roles to Gemini's expectations
if role == "system":
# For system messages, prepend to user input or add as user message
system_text = text
# Find the next user message to prepend to
for _i, future_msg in enumerate(messages[messages.index(msg) + 1:], messages.index(msg) + 1):
if future_msg.get("role", "").lower() == "user":
# Leave this system message to be handled when we reach the user message
# Just track its content for now
break
else:
# No user message found after system, add as separate user message
content.append({"role": "user", "parts": [{"text": system_text}]})
continue
elif role == "user":
# Check if previous message was a system message
prev_system_text = ""
if messages.index(msg) > 0:
prev_msg = messages[messages.index(msg) - 1]
if prev_msg.get("role", "").lower() == "system":
prev_system_text = prev_msg.get("content", "")
# If there was a system message before, prepend it to the user message
if prev_system_text:
gemini_role = "user"
gemini_text = f"{prev_system_text}\n\n{text}"
else:
gemini_role = "user"
gemini_text = text
elif role == "assistant":
gemini_role = "model"
gemini_text = text
else:
self.logger.warning(f"Unsupported message role '{role}', treating as user")
gemini_role = "user"
gemini_text = text
content.append({"role": gemini_role, "parts": [{"text": gemini_text}]})
# Log request
self.logger.info(
f"Generating completion with Gemini model {model}",
emoji_key=self.provider_name,
prompt_length=log_input_size,
json_mode_requested=json_mode
)
start_time = time.time()
try:
# Check if we're using a mock client for testing
if isinstance(self.client, dict) and self.client.get("mock_client"):
# Return mock response for tests
completion_text = "Mock Gemini response for testing"
processing_time = 0.1
response = None
else:
# Pass everything in the correct structure according to the API
if isinstance(content, list): # messages format
response = self.client.models.generate_content(
model=model,
contents=content,
config=config, # Pass config dict containing temperature, max_output_tokens, etc.
**request_params # Pass other params directly if needed
)
else: # prompt format (string)
response = self.client.models.generate_content(
model=model,
contents=content,
config=config, # Pass config dict containing temperature, max_output_tokens, etc.
**request_params # Pass other params directly if needed
)
processing_time = time.time() - start_time
# Extract response text
completion_text = response.text
# Estimate token usage (Gemini doesn't provide token counts)
# Roughly 4 characters per token as a crude approximation
char_to_token_ratio = 4.0
estimated_input_tokens = log_input_size / char_to_token_ratio
estimated_output_tokens = len(completion_text) / char_to_token_ratio
# Create standardized response
result = ModelResponse(
text=completion_text,
model=model,
provider=self.provider_name,
input_tokens=int(estimated_input_tokens),
output_tokens=int(estimated_output_tokens),
processing_time=processing_time,
raw_response=None, # Don't need raw response for tests
metadata={"token_count_estimated": True}
)
# Add message for consistency with other providers
result.message = {"role": "assistant", "content": completion_text}
# Log success
self.logger.success(
"Gemini completion successful",
emoji_key="success",
model=model,
tokens={
"input": result.input_tokens,
"output": result.output_tokens
},
cost=result.cost,
time=result.processing_time
)
return result
except Exception as e:
self.logger.error(
f"Gemini completion failed: {str(e)}",
emoji_key="error",
model=model
)
raise
async def generate_completion_stream(
self,
prompt: Optional[str] = None,
messages: Optional[List[Dict[str, Any]]] = None,
model: Optional[str] = None,
max_tokens: Optional[int] = None,
temperature: float = 0.7,
**kwargs
) -> AsyncGenerator[Tuple[str, Dict[str, Any]], None]:
"""Generate a streaming completion using Google Gemini.
Args:
prompt: Text prompt to send to the model (or None if messages provided)
messages: List of message dictionaries (alternative to prompt)
model: Model name to use (e.g., "gemini-2.0-flash-lite")
max_tokens: Maximum tokens to generate
temperature: Temperature parameter (0.0-1.0)
**kwargs: Additional model-specific parameters
Yields:
Tuple of (text_chunk, metadata)
Raises:
Exception: If API call fails
"""
if not self.client:
await self.initialize()
# Use default model if not specified
model = model or self.get_default_model()
# Strip provider prefix if present (e.g., "gemini:gemini-2.0-pro" -> "gemini-2.0-pro")
if ":" in model:
original_model = model
model = model.split(":", 1)[1]
self.logger.debug(f"Stripped provider prefix from model name (stream): {original_model} -> {model}")
# Validate that either prompt or messages is provided
if prompt is None and not messages:
raise ValueError("Either 'prompt' or 'messages' must be provided")
# Prepare config dict per Gemini API
config = {
"temperature": temperature,
}
if max_tokens is not None:
config["max_output_tokens"] = max_tokens
# Pop json_mode flag
json_mode = kwargs.pop("json_mode", False)
# Set up JSON mode in config dict
if json_mode:
# For Gemini, JSON mode is set via response_mime_type in the config dict
config["response_mime_type"] = "application/json"
self.logger.debug("Setting response_mime_type to application/json for Gemini streaming in config")
# Add remaining kwargs to config
for key in list(kwargs.keys()):
if key in ["top_p", "top_k", "candidate_count", "stop_sequences"]:
config[key] = kwargs.pop(key)
# Store other kwargs that might need to be passed directly
request_params = {}
for key in list(kwargs.keys()):
if key in ["safety_settings", "tools", "system"]:
request_params[key] = kwargs.pop(key)
# Prepare content based on input type (prompt or messages)
content = None
if prompt:
content = prompt
log_input_size = len(prompt)
elif messages:
# Convert messages to Gemini format
content = []
log_input_size = 0
for msg in messages:
role = msg.get("role", "").lower()
text = msg.get("content", "")
log_input_size += len(text)
# Map roles to Gemini's expectations
if role == "system":
# For system messages, prepend to user input or add as user message
system_text = text
# Find the next user message to prepend to
for _i, future_msg in enumerate(messages[messages.index(msg) + 1:], messages.index(msg) + 1):
if future_msg.get("role", "").lower() == "user":
# Leave this system message to be handled when we reach the user message
# Just track its content for now
break
else:
# No user message found after system, add as separate user message
content.append({"role": "user", "parts": [{"text": system_text}]})
continue
elif role == "user":
# Check if previous message was a system message
prev_system_text = ""
if messages.index(msg) > 0:
prev_msg = messages[messages.index(msg) - 1]
if prev_msg.get("role", "").lower() == "system":
prev_system_text = prev_msg.get("content", "")
# If there was a system message before, prepend it to the user message
if prev_system_text:
gemini_role = "user"
gemini_text = f"{prev_system_text}\n\n{text}"
else:
gemini_role = "user"
gemini_text = text
elif role == "assistant":
gemini_role = "model"
gemini_text = text
else:
self.logger.warning(f"Unsupported message role '{role}', treating as user")
gemini_role = "user"
gemini_text = text
content.append({"role": gemini_role, "parts": [{"text": gemini_text}]})
# Log request
self.logger.info(
f"Generating streaming completion with Gemini model {model}",
emoji_key=self.provider_name,
input_type=f"{'prompt' if prompt else 'messages'} ({log_input_size} chars)",
json_mode_requested=json_mode
)
start_time = time.time()
total_chunks = 0
try:
# Use the dedicated streaming method as per Google's documentation
try:
if isinstance(content, list): # messages format
stream_response = self.client.models.generate_content_stream(
model=model,
contents=content,
config=config,
**request_params
)
else: # prompt format (string)
stream_response = self.client.models.generate_content_stream(
model=model,
contents=content,
config=config,
**request_params
)
# Process the stream - iterating over chunks
async def iterate_response():
# Convert sync iterator to async
for chunk in stream_response:
yield chunk
async for chunk in iterate_response():
total_chunks += 1
# Extract text from the chunk
chunk_text = ""
if hasattr(chunk, 'text'):
chunk_text = chunk.text
elif hasattr(chunk, 'candidates') and chunk.candidates:
if hasattr(chunk.candidates[0], 'content') and chunk.candidates[0].content:
if hasattr(chunk.candidates[0].content, 'parts') and chunk.candidates[0].content.parts:
chunk_text = chunk.candidates[0].content.parts[0].text
# Metadata for this chunk
metadata = {
"model": model,
"provider": self.provider_name,
"chunk_index": total_chunks,
}
yield chunk_text, metadata
# Log success
processing_time = time.time() - start_time
self.logger.success(
"Gemini streaming completion successful",
emoji_key="success",
model=model,
chunks=total_chunks,
time=processing_time
)
# Yield final metadata chunk
yield "", {
"model": model,
"provider": self.provider_name,
"chunk_index": total_chunks + 1,
"processing_time": processing_time,
"finish_reason": "stop", # Gemini doesn't provide this directly
}
except (AttributeError, TypeError) as e:
# If streaming isn't supported, fall back to non-streaming
self.logger.warning(f"Streaming not supported for current Gemini API: {e}. Falling back to non-streaming.")
# Call generate_completion and yield the entire result as one chunk
completion = await self.generate_completion(
prompt=prompt,
messages=messages,
model=model,
max_tokens=max_tokens,
temperature=temperature,
json_mode=json_mode,
**kwargs
)
yield completion.text, {
"model": model,
"provider": self.provider_name,
"chunk_index": 1,
"is_fallback": True
}
total_chunks = 1
                # End the stream cleanly after the fallback chunk (raising StopAsyncIteration
                # here would be caught by the outer handler and emit a spurious error chunk)
                return
except Exception as e:
self.logger.error(
f"Gemini streaming completion failed: {str(e)}",
emoji_key="error",
model=model
)
# Yield error info in final chunk
yield "", {
"model": model,
"provider": self.provider_name,
"chunk_index": total_chunks + 1,
"error": f"{type(e).__name__}: {str(e)}",
"processing_time": time.time() - start_time,
"finish_reason": "error"
}
async def list_models(self) -> List[Dict[str, Any]]:
"""List available Gemini models.
Returns:
List of model information dictionaries
"""
# Gemini doesn't have a comprehensive models endpoint, so we return a static list
if self.models_cache:
return self.models_cache
models = [
{
"id": "gemini-2.0-flash-lite",
"provider": self.provider_name,
"description": "Fastest and most efficient Gemini model",
},
{
"id": "gemini-2.0-flash",
"provider": self.provider_name,
"description": "Fast Gemini model with good quality",
},
{
"id": "gemini-2.0-pro",
"provider": self.provider_name,
"description": "More capable Gemini model",
},
{
"id": "gemini-2.5-pro-preview-03-25",
"provider": self.provider_name,
"description": "Most capable Gemini model",
},
]
# Cache results
self.models_cache = models
return models
def get_default_model(self) -> str:
"""Get the default Gemini model.
Returns:
Default model name
"""
from ultimate_mcp_server.config import get_config
# Safely get from config if available
try:
config = get_config()
provider_config = getattr(config, 'providers', {}).get(self.provider_name, None)
if provider_config and provider_config.default_model:
return provider_config.default_model
except (AttributeError, TypeError):
# Handle case when providers attribute doesn't exist or isn't a dict
pass
# Otherwise return hard-coded default
return "gemini-2.0-flash-lite"
async def check_api_key(self) -> bool:
"""Check if the Gemini API key is valid.
Returns:
bool: True if API key is valid
"""
try:
# Try listing models to validate the API key
# Use the client's models API to check if API key is valid
self.client.models.list()
return True
except Exception:
return False
```
--------------------------------------------------------------------------------
/mcp_python_lib_docs.md:
--------------------------------------------------------------------------------
```markdown
# MCP Python SDK
<div align="center">
<strong>Python implementation of the Model Context Protocol (MCP)</strong>
[![PyPI][pypi-badge]][pypi-url]
[![MIT licensed][mit-badge]][mit-url]
[![Python Version][python-badge]][python-url]
[![Documentation][docs-badge]][docs-url]
[![Specification][spec-badge]][spec-url]
[![GitHub Discussions][discussions-badge]][discussions-url]
</div>
<!-- omit in toc -->
## Table of Contents
- [MCP Python SDK](#mcp-python-sdk)
- [Overview](#overview)
- [Installation](#installation)
- [Adding MCP to your python project](#adding-mcp-to-your-python-project)
- [Running the standalone MCP development tools](#running-the-standalone-mcp-development-tools)
- [Quickstart](#quickstart)
- [What is MCP?](#what-is-mcp)
- [Core Concepts](#core-concepts)
- [Server](#server)
- [Resources](#resources)
- [Tools](#tools)
- [Prompts](#prompts)
- [Images](#images)
- [Context](#context)
- [Running Your Server](#running-your-server)
- [Development Mode](#development-mode)
- [Claude Desktop Integration](#claude-desktop-integration)
- [Direct Execution](#direct-execution)
- [Mounting to an Existing ASGI Server](#mounting-to-an-existing-asgi-server)
- [Examples](#examples)
- [Echo Server](#echo-server)
- [SQLite Explorer](#sqlite-explorer)
- [Advanced Usage](#advanced-usage)
- [Low-Level Server](#low-level-server)
- [Writing MCP Clients](#writing-mcp-clients)
- [MCP Primitives](#mcp-primitives)
- [Server Capabilities](#server-capabilities)
- [Documentation](#documentation)
- [Contributing](#contributing)
- [License](#license)
[pypi-badge]: https://img.shields.io/pypi/v/mcp.svg
[pypi-url]: https://pypi.org/project/mcp/
[mit-badge]: https://img.shields.io/pypi/l/mcp.svg
[mit-url]: https://github.com/modelcontextprotocol/python-sdk/blob/main/LICENSE
[python-badge]: https://img.shields.io/pypi/pyversions/mcp.svg
[python-url]: https://www.python.org/downloads/
[docs-badge]: https://img.shields.io/badge/docs-modelcontextprotocol.io-blue.svg
[docs-url]: https://modelcontextprotocol.io
[spec-badge]: https://img.shields.io/badge/spec-spec.modelcontextprotocol.io-blue.svg
[spec-url]: https://spec.modelcontextprotocol.io
[discussions-badge]: https://img.shields.io/github/discussions/modelcontextprotocol/python-sdk
[discussions-url]: https://github.com/modelcontextprotocol/python-sdk/discussions
## Overview
The Model Context Protocol allows applications to provide context for LLMs in a standardized way, separating the concerns of providing context from the actual LLM interaction. This Python SDK implements the full MCP specification, making it easy to:
- Build MCP clients that can connect to any MCP server
- Create MCP servers that expose resources, prompts and tools
- Use standard transports like stdio and SSE
- Handle all MCP protocol messages and lifecycle events
## Installation
### Adding MCP to your python project
We recommend using [uv](https://docs.astral.sh/uv/) to manage your Python projects. In a uv-managed Python project, add mcp to your dependencies with:
```bash
uv add "mcp[cli]"
```
Alternatively, for projects using pip for dependencies:
```bash
pip install mcp
```
### Running the standalone MCP development tools
To run the mcp command with uv:
```bash
uv run mcp
```
## Quickstart
Let's create a simple MCP server that exposes a calculator tool and some data:
```python
# server.py
from mcp.server.fastmcp import FastMCP
# Create an MCP server
mcp = FastMCP("Demo")
# Add an addition tool
@mcp.tool()
def add(a: int, b: int) -> int:
"""Add two numbers"""
return a + b
# Add a dynamic greeting resource
@mcp.resource("greeting://{name}")
def get_greeting(name: str) -> str:
"""Get a personalized greeting"""
return f"Hello, {name}!"
```
You can install this server in [Claude Desktop](https://claude.ai/download) and interact with it right away by running:
```bash
mcp install server.py
```
Alternatively, you can test it with the MCP Inspector:
```bash
mcp dev server.py
```
## What is MCP?
The [Model Context Protocol (MCP)](https://modelcontextprotocol.io) lets you build servers that expose data and functionality to LLM applications in a secure, standardized way. Think of it like a web API, but specifically designed for LLM interactions. MCP servers can:
- Expose data through **Resources** (think of these sort of like GET endpoints; they are used to load information into the LLM's context)
- Provide functionality through **Tools** (sort of like POST endpoints; they are used to execute code or otherwise produce a side effect)
- Define interaction patterns through **Prompts** (reusable templates for LLM interactions)
- And more!
## Core Concepts
### Server
The FastMCP server is your core interface to the MCP protocol. It handles connection management, protocol compliance, and message routing:
```python
# Add lifespan support for startup/shutdown with strong typing
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
from dataclasses import dataclass
from fake_database import Database # Replace with your actual DB type
from mcp.server.fastmcp import Context, FastMCP
# Create a named server
mcp = FastMCP("My App")
# Specify dependencies for deployment and development
mcp = FastMCP("My App", dependencies=["pandas", "numpy"])
@dataclass
class AppContext:
db: Database
@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
"""Manage application lifecycle with type-safe context"""
# Initialize on startup
db = await Database.connect()
try:
yield AppContext(db=db)
finally:
# Cleanup on shutdown
await db.disconnect()
# Pass lifespan to server
mcp = FastMCP("My App", lifespan=app_lifespan)
# Access type-safe lifespan context in tools
@mcp.tool()
def query_db(ctx: Context) -> str:
"""Tool that uses initialized resources"""
    db = ctx.request_context.lifespan_context.db
return db.query()
```
### Resources
Resources are how you expose data to LLMs. They're similar to GET endpoints in a REST API - they provide data but shouldn't perform significant computation or have side effects:
```python
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("My App")
@mcp.resource("config://app")
def get_config() -> str:
"""Static configuration data"""
return "App configuration here"
@mcp.resource("users://{user_id}/profile")
def get_user_profile(user_id: str) -> str:
"""Dynamic user data"""
return f"Profile data for user {user_id}"
```
### Tools
Tools let LLMs take actions through your server. Unlike resources, tools are expected to perform computation and have side effects:
```python
import httpx
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("My App")
@mcp.tool()
def calculate_bmi(weight_kg: float, height_m: float) -> float:
"""Calculate BMI given weight in kg and height in meters"""
return weight_kg / (height_m**2)
@mcp.tool()
async def fetch_weather(city: str) -> str:
"""Fetch current weather for a city"""
async with httpx.AsyncClient() as client:
response = await client.get(f"https://api.weather.com/{city}")
return response.text
```
### Prompts
Prompts are reusable templates that help LLMs interact with your server effectively:
```python
from mcp.server.fastmcp import FastMCP
from mcp.server.fastmcp.prompts import base
mcp = FastMCP("My App")
@mcp.prompt()
def review_code(code: str) -> str:
return f"Please review this code:\n\n{code}"
@mcp.prompt()
def debug_error(error: str) -> list[base.Message]:
return [
base.UserMessage("I'm seeing this error:"),
base.UserMessage(error),
base.AssistantMessage("I'll help debug that. What have you tried so far?"),
]
```
### Images
FastMCP provides an `Image` class that automatically handles image data:
```python
from mcp.server.fastmcp import FastMCP, Image
from PIL import Image as PILImage
mcp = FastMCP("My App")
@mcp.tool()
def create_thumbnail(image_path: str) -> Image:
"""Create a thumbnail from an image"""
img = PILImage.open(image_path)
img.thumbnail((100, 100))
return Image(data=img.tobytes(), format="png")
```
### Context
The Context object gives your tools and resources access to MCP capabilities:
```python
from mcp.server.fastmcp import FastMCP, Context
mcp = FastMCP("My App")
@mcp.tool()
async def long_task(files: list[str], ctx: Context) -> str:
"""Process multiple files with progress tracking"""
for i, file in enumerate(files):
ctx.info(f"Processing {file}")
await ctx.report_progress(i, len(files))
data, mime_type = await ctx.read_resource(f"file://{file}")
return "Processing complete"
```
## Running Your Server
### Development Mode
The fastest way to test and debug your server is with the MCP Inspector:
```bash
mcp dev server.py
# Add dependencies
mcp dev server.py --with pandas --with numpy
# Mount local code
mcp dev server.py --with-editable .
```
### Claude Desktop Integration
Once your server is ready, install it in Claude Desktop:
```bash
mcp install server.py
# Custom name
mcp install server.py --name "My Analytics Server"
# Environment variables
mcp install server.py -v API_KEY=abc123 -v DB_URL=postgres://...
mcp install server.py -f .env
```
### Direct Execution
For advanced scenarios like custom deployments:
```python
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("My App")
if __name__ == "__main__":
mcp.run()
```
Run it with:
```bash
python server.py
# or
mcp run server.py
```
### Mounting to an Existing ASGI Server
You can mount the SSE server to an existing ASGI server using the `sse_app` method. This allows you to integrate the SSE server with other ASGI applications.
```python
from starlette.applications import Starlette
from starlette.routing import Mount, Host
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("My App")
# Mount the SSE server to the existing ASGI server
app = Starlette(
routes=[
Mount('/', app=mcp.sse_app()),
]
)
# or dynamically mount as host
app.router.routes.append(Host('mcp.acme.corp', app=mcp.sse_app()))
```
For more information on mounting applications in Starlette, see the [Starlette documentation](https://www.starlette.io/routing/#submounting-routes).
## Examples
### Echo Server
A simple server demonstrating resources, tools, and prompts:
```python
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("Echo")
@mcp.resource("echo://{message}")
def echo_resource(message: str) -> str:
"""Echo a message as a resource"""
return f"Resource echo: {message}"
@mcp.tool()
def echo_tool(message: str) -> str:
"""Echo a message as a tool"""
return f"Tool echo: {message}"
@mcp.prompt()
def echo_prompt(message: str) -> str:
"""Create an echo prompt"""
return f"Please process this message: {message}"
```
### SQLite Explorer
A more complex example showing database integration:
```python
import sqlite3
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("SQLite Explorer")
@mcp.resource("schema://main")
def get_schema() -> str:
"""Provide the database schema as a resource"""
conn = sqlite3.connect("database.db")
schema = conn.execute("SELECT sql FROM sqlite_master WHERE type='table'").fetchall()
return "\n".join(sql[0] for sql in schema if sql[0])
@mcp.tool()
def query_data(sql: str) -> str:
"""Execute SQL queries safely"""
conn = sqlite3.connect("database.db")
try:
result = conn.execute(sql).fetchall()
return "\n".join(str(row) for row in result)
except Exception as e:
return f"Error: {str(e)}"
```
## Advanced Usage
### Low-Level Server
For more control, you can use the low-level server implementation directly. This gives you full access to the protocol and allows you to customize every aspect of your server, including lifecycle management through the lifespan API:
```python
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
from fake_database import Database # Replace with your actual DB type
from mcp.server import Server
@asynccontextmanager
async def server_lifespan(server: Server) -> AsyncIterator[dict]:
"""Manage server startup and shutdown lifecycle."""
# Initialize resources on startup
db = await Database.connect()
try:
yield {"db": db}
finally:
# Clean up on shutdown
await db.disconnect()
# Pass lifespan to server
server = Server("example-server", lifespan=server_lifespan)
# Access lifespan context in handlers
@server.call_tool()
async def query_db(name: str, arguments: dict) -> list:
ctx = server.request_context
db = ctx.lifespan_context["db"]
return await db.query(arguments["query"])
```
The lifespan API provides:
- A way to initialize resources when the server starts and clean them up when it stops
- Access to initialized resources through the request context in handlers
- Type-safe context passing between lifespan and request handlers
```python
import mcp.server.stdio
import mcp.types as types
from mcp.server.lowlevel import NotificationOptions, Server
from mcp.server.models import InitializationOptions
# Create a server instance
server = Server("example-server")
@server.list_prompts()
async def handle_list_prompts() -> list[types.Prompt]:
return [
types.Prompt(
name="example-prompt",
description="An example prompt template",
arguments=[
types.PromptArgument(
name="arg1", description="Example argument", required=True
)
],
)
]
@server.get_prompt()
async def handle_get_prompt(
name: str, arguments: dict[str, str] | None
) -> types.GetPromptResult:
if name != "example-prompt":
raise ValueError(f"Unknown prompt: {name}")
return types.GetPromptResult(
description="Example prompt",
messages=[
types.PromptMessage(
role="user",
content=types.TextContent(type="text", text="Example prompt text"),
)
],
)
async def run():
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="example",
server_version="0.1.0",
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
)
if __name__ == "__main__":
import asyncio
asyncio.run(run())
```
### Writing MCP Clients
The SDK provides a high-level client interface for connecting to MCP servers:
```python
from mcp import ClientSession, StdioServerParameters, types
from mcp.client.stdio import stdio_client
# Create server parameters for stdio connection
server_params = StdioServerParameters(
command="python", # Executable
args=["example_server.py"], # Optional command line arguments
env=None, # Optional environment variables
)
# Optional: create a sampling callback
async def handle_sampling_message(
message: types.CreateMessageRequestParams,
) -> types.CreateMessageResult:
return types.CreateMessageResult(
role="assistant",
content=types.TextContent(
type="text",
text="Hello, world! from model",
),
model="gpt-4.1-mini",
stopReason="endTurn",
)
async def run():
async with stdio_client(server_params) as (read, write):
async with ClientSession(
read, write, sampling_callback=handle_sampling_message
) as session:
# Initialize the connection
await session.initialize()
# List available prompts
prompts = await session.list_prompts()
# Get a prompt
prompt = await session.get_prompt(
"example-prompt", arguments={"arg1": "value"}
)
# List available resources
resources = await session.list_resources()
# List available tools
tools = await session.list_tools()
# Read a resource
content, mime_type = await session.read_resource("file://some/path")
# Call a tool
result = await session.call_tool("tool-name", arguments={"arg1": "value"})
if __name__ == "__main__":
import asyncio
asyncio.run(run())
```
### MCP Primitives
The MCP protocol defines three core primitives that servers can implement:
| Primitive | Control | Description | Example Use |
|-----------|-----------------------|-----------------------------------------------------|------------------------------|
| Prompts | User-controlled | Interactive templates invoked by user choice | Slash commands, menu options |
| Resources | Application-controlled| Contextual data managed by the client application | File contents, API responses |
| Tools | Model-controlled | Functions exposed to the LLM to take actions | API calls, data updates |
### Server Capabilities
MCP servers declare capabilities during initialization:
| Capability | Feature Flag | Description |
|-------------|------------------------------|------------------------------------|
| `prompts` | `listChanged` | Prompt template management |
| `resources` | `subscribe`<br/>`listChanged`| Resource exposure and updates |
| `tools` | `listChanged` | Tool discovery and execution |
| `logging` | - | Server logging configuration |
| `completion`| - | Argument completion suggestions |
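As a rough illustration, a client can see which of these capabilities a server declared by reading the result of the `initialize()` handshake. This is a minimal sketch assuming the initialize result exposes a `capabilities` field as defined in `mcp.types`; exact field names may vary between SDK versions:
```python
from mcp import ClientSession

async def print_capabilities(session: ClientSession) -> None:
    # The initialize handshake returns the server's declared capabilities
    init_result = await session.initialize()
    caps = init_result.capabilities
    # Each field is typically None when the server did not declare that capability
    print("prompts:", caps.prompts)
    print("resources:", caps.resources)
    print("tools:", caps.tools)
    print("logging:", caps.logging)
```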
## Tool Composition Patterns
When building complex workflows with MCP, effectively chaining tools together is crucial for success:
```python
from mcp.server.fastmcp import FastMCP, Context
mcp = FastMCP("Analytics Pipeline")
@mcp.tool()
async def fetch_data(source: str, date_range: str, ctx: Context) -> str:
"""Fetch raw data from a source for analysis"""
# Fetch operation that might be slow
await ctx.report_progress(0.3, 1.0)
return f"Data from {source} for {date_range}"
@mcp.tool()
def transform_data(raw_data: str, format_type: str = "json") -> dict:
"""Transform raw data into structured format"""
# Data transformation logic
return {"processed": raw_data, "format": format_type}
@mcp.tool()
def analyze_data(data: dict, metric: str) -> str:
"""Analyze transformed data with specific metrics"""
# Analysis logic
return f"Analysis of {metric}: Result based on {data['processed']}"
# Usage pattern (for LLMs):
# 1. First fetch the raw data
# 2. Transform the fetched data
# 3. Then analyze the transformed result
```
**Pattern: Sequential Dependency Chain**
```
fetch_data → transform_data → analyze_data
```
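A hedged client-side sketch of walking this chain with `session.call_tool`, reusing the `ClientSession` from the client example above; the argument values are illustrative and each result is forwarded as plain text rather than parsed:
```python
async def run_pipeline(session) -> None:
    # 1. First fetch the raw data
    fetched = await session.call_tool(
        "fetch_data", arguments={"source": "sales_db", "date_range": "last_week"}
    )
    # 2. Transform the fetched data (the previous result is forwarded as text here)
    await session.call_tool(
        "transform_data", arguments={"raw_data": str(fetched), "format_type": "json"}
    )
    # 3. Then analyze the transformed result
    await session.call_tool(
        "analyze_data",
        arguments={"data": {"processed": str(fetched), "format": "json"}, "metric": "weekly_volume"},
    )
```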
**Pattern: Parallel Processing with Aggregation**
```python
import asyncio

@mcp.tool()
async def parallel_process(sources: list[str], ctx: Context) -> dict:
    """Process multiple sources concurrently and aggregate results"""
    async def process_source(source: str) -> tuple[str, dict]:
        # Get data for each source (these could be separate tool calls)
        data = await fetch_data(source, "last_week", ctx)
        return source, transform_data(data)

    # Run all sources concurrently, then aggregate into a single mapping
    pairs = await asyncio.gather(*(process_source(source) for source in sources))
    await ctx.report_progress(1.0, 1.0)
    return dict(pairs)
```
## Error Recovery Strategies
When tools fail or return unexpected results, LLMs should follow these recovery patterns:
**Strategy: Retry with Backoff**
```python
import asyncio

@mcp.tool()
async def resilient_operation(resource_id: str, ctx: Context) -> str:
"""Example of resilient operation with retry logic"""
MAX_ATTEMPTS = 3
for attempt in range(1, MAX_ATTEMPTS + 1):
try:
# Attempt the operation
return f"Successfully processed {resource_id}"
except Exception as e:
if attempt == MAX_ATTEMPTS:
# If final attempt, report the failure clearly
ctx.warning(f"Operation failed after {MAX_ATTEMPTS} attempts: {str(e)}")
return f"ERROR: Could not process {resource_id} - {str(e)}"
# For earlier attempts, log and retry
ctx.info(f"Attempt {attempt} failed, retrying...")
await asyncio.sleep(2 ** attempt) # Exponential backoff
```
**Strategy: Fallback Chain**
```python
@mcp.tool()
async def get_data_with_fallbacks(primary_source: str, fallback_sources: list[str] = None) -> dict:
"""Try multiple data sources in order until one succeeds"""
sources = [primary_source] + (fallback_sources or [])
errors = []
for source in sources:
try:
# Try to get data from this source
result = {"source": source, "data": f"Data from {source}"}
return result
except Exception as e:
# Record the error and try the next source
errors.append(f"{source}: {str(e)}")
# If all sources failed, return a clear error with history
return {"error": "All sources failed", "attempts": errors}
```
**Error Reporting Best Practices**
- Always return structured error information (not just exception text)
- Include specific error codes when possible
- Provide actionable suggestions for recovery
- Log detailed error context for debugging
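A minimal sketch applying these practices to a hypothetical `convert_report` tool; the error code, message, and suggestion values are illustrative, not part of the SDK:
```python
from mcp.server.fastmcp import FastMCP, Context

mcp = FastMCP("My App")

@mcp.tool()
async def convert_report(report_id: str, ctx: Context) -> dict:
    """Convert a report, returning structured errors instead of bare exception text"""
    try:
        # Real conversion work would go here
        return {"status": "ok", "report_id": report_id}
    except FileNotFoundError as e:
        # Log detailed context for debugging, return structured info to the caller
        ctx.warning(f"Report {report_id} not found: {e}")
        return {
            "status": "error",
            "code": "REPORT_NOT_FOUND",
            "message": f"Report {report_id} does not exist",
            "suggestion": "List available reports first, then retry with a valid id",
        }
```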
## Resource Selection Optimization
Efficiently managing resources within context limits requires strategic selection:
**Progressive Loading Pattern**
```python
@mcp.tool()
async def analyze_document(doc_uri: str, ctx: Context) -> str:
"""Analyze a document with progressively loaded sections"""
# First load metadata for quick access
metadata = await ctx.read_resource(f"{doc_uri}/metadata")
# Based on metadata, selectively load relevant sections
relevant_sections = identify_relevant_sections(metadata)
# Only load sections that are actually needed
section_data = {}
for section in relevant_sections:
section_data[section] = await ctx.read_resource(f"{doc_uri}/sections/{section}")
# Process with only the necessary context
return f"Analysis of {len(section_data)} relevant sections"
```
**Context Budget Management**
```python
@mcp.tool()
async def summarize_large_dataset(dataset_uri: str, ctx: Context) -> str:
"""Summarize a large dataset while respecting context limits"""
# Get total size to plan the approach
metadata = await ctx.read_resource(f"{dataset_uri}/metadata")
total_size = metadata.get("size_kb", 0)
if total_size > 100: # Arbitrary threshold
# For large datasets, use chunking approach
chunks = await ctx.read_resource(f"{dataset_uri}/summary_chunks")
return f"Summary of {len(chunks)} chunks: {', '.join(chunks)}"
else:
# For smaller datasets, process everything at once
full_data = await ctx.read_resource(dataset_uri)
return f"Complete analysis of {dataset_uri}"
```
**Resource Relevance Filtering**
- Focus on the most recent/relevant data first
- Filter resources to match the specific query intent
- Use metadata to decide which resources to load
- Prefer sampling representative data over loading everything
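A minimal sketch of this filtering approach, assuming the `mcp` server and `Context` import from the examples above and the same metadata-sidecar URI convention (`{uri}/metadata`) used in the progressive-loading example; the tool name and relevance heuristic are illustrative:
```python
@mcp.tool()
async def load_relevant_logs(query: str, log_uris: list[str], ctx: Context) -> str:
    """Load only the log resources whose metadata looks relevant to the query"""
    matching = []
    for uri in log_uris:
        # Read the cheap metadata sidecar first; skip resources that don't match the intent
        metadata = await ctx.read_resource(f"{uri}/metadata")
        if query.lower() in str(metadata).lower():
            matching.append(uri)
    # Prefer the most recent matches and cap how many full resources are loaded
    selected = sorted(matching, reverse=True)[:3]
    loaded = [await ctx.read_resource(uri) for uri in selected]
    return f"Loaded {len(loaded)} of {len(log_uris)} candidate logs for: {query}"
```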
## Documentation
- [Model Context Protocol documentation](https://modelcontextprotocol.io)
- [Model Context Protocol specification](https://spec.modelcontextprotocol.io)
- [Officially supported servers](https://github.com/modelcontextprotocol/servers)
## Contributing
We are passionate about supporting contributors of all levels of experience and would love to see you get involved in the project. See the [contributing guide](CONTRIBUTING.md) to get started.
## License
This project is licensed under the MIT License - see the LICENSE file for details.
```
--------------------------------------------------------------------------------
/ultimate_mcp_server/services/cache/strategies.py:
--------------------------------------------------------------------------------
```python
"""Cache strategy implementations."""
import hashlib
import json
import re
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from ultimate_mcp_server.utils import get_logger
logger = get_logger(__name__)
class CacheStrategy(ABC):
"""Abstract base class for cache strategies."""
@abstractmethod
def generate_key(self, request: Dict[str, Any]) -> str:
"""Generate a cache key for the request.
Args:
request: Request parameters
Returns:
Cache key
"""
pass
@abstractmethod
def should_cache(self, request: Dict[str, Any], response: Any) -> bool:
"""Determine if a response should be cached.
Args:
request: Request parameters
response: Response data
Returns:
True if the response should be cached
"""
pass
def get_ttl(self, request: Dict[str, Any], response: Any) -> Optional[int]:
"""Get the TTL (time-to-live) for a cached response.
Args:
request: Request parameters
response: Response data
Returns:
TTL in seconds or None to use default
"""
return None
class ExactMatchStrategy(CacheStrategy):
"""Strategy for exact matching of requests."""
def generate_key(self, request: Dict[str, Any]) -> str:
"""Generate an exact match cache key.
Args:
request: Request parameters
Returns:
Cache key based on normalized parameters
"""
# Remove non-deterministic fields
clean_request = self._clean_request(request)
# Serialize and hash
json_str = json.dumps(clean_request, sort_keys=True)
return f"exact:{hashlib.sha256(json_str.encode('utf-8')).hexdigest()}"
def should_cache(self, request: Dict[str, Any], response: Any) -> bool:
"""Determine if a response should be cached based on request type.
Args:
request: Request parameters
response: Response data
Returns:
True if the response should be cached
"""
# Don't cache if explicitly disabled
if request.get("cache", True) is False:
return False
# Don't cache streaming responses
if request.get("stream", False):
return False
# Don't cache high temperature responses (too random)
if request.get("temperature", 0.7) > 0.9:
return False
return True
def get_ttl(self, request: Dict[str, Any], response: Any) -> Optional[int]:
"""Get TTL based on request types.
Args:
request: Request parameters
response: Response data
Returns:
TTL in seconds or None to use default
"""
# Use custom TTL if specified
if "cache_ttl" in request:
return request["cache_ttl"]
# Base TTL on content length - longer content gets longer TTL
if hasattr(response, "text") and isinstance(response.text, str):
content_length = len(response.text)
# Simplified TTL scaling
if content_length > 10000:
return 7 * 24 * 60 * 60 # 1 week for long responses
elif content_length > 1000:
return 3 * 24 * 60 * 60 # 3 days for medium responses
return None # Use default TTL
def _clean_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""Remove non-deterministic fields from request.
Args:
request: Original request
Returns:
Cleaned request for caching
"""
# Make a copy to avoid modifying the original
clean = request.copy()
# Remove non-deterministic fields
for field in [
"request_id", "timestamp", "session_id", "trace_id",
"user_id", "cache", "cache_ttl"
]:
clean.pop(field, None)
return clean
class SemanticMatchStrategy(CacheStrategy):
"""Strategy for semantic matching of requests."""
def __init__(self, similarity_threshold: float = 0.95):
"""Initialize semantic matching strategy.
Args:
similarity_threshold: Threshold for semantic similarity (0.0-1.0)
"""
self.similarity_threshold = similarity_threshold
self.exact_strategy = ExactMatchStrategy()
def generate_key(self, request: Dict[str, Any]) -> str:
"""Generate both exact and semantic keys.
Args:
request: Request parameters
Returns:
Primary cache key (always the exact match key)
"""
# Always use the exact match key as the primary key
return self.exact_strategy.generate_key(request)
def generate_semantic_key(self, request: Dict[str, Any]) -> Optional[str]:
"""Generate a semantic fingerprint for the request.
Args:
request: Request parameters
Returns:
Semantic key or None if request doesn't support semantic matching
"""
# Extract the prompt or relevant text
text = self._extract_text(request)
if not text:
return None
# Normalize text
text = self._normalize_text(text)
# Generate fingerprint based on significant words and structure
significant_words = self._extract_significant_words(text)
# Create a fuzzy key
if significant_words:
words_key = " ".join(sorted(significant_words))
return f"semantic:{hashlib.md5(words_key.encode('utf-8')).hexdigest()}"
return None
def should_cache(self, request: Dict[str, Any], response: Any) -> bool:
"""Determine if a response should be cached.
Args:
request: Request parameters
response: Response data
Returns:
True if the response should be cached
"""
# Use the same logic as exact matching
return self.exact_strategy.should_cache(request, response)
def get_ttl(self, request: Dict[str, Any], response: Any) -> Optional[int]:
"""Get the TTL for semantic matches.
Args:
request: Request parameters
response: Response data
Returns:
TTL in seconds (shorter for semantic matches)
"""
# Get base TTL from exact strategy
base_ttl = self.exact_strategy.get_ttl(request, response)
# For semantic matching, use shorter TTL
if base_ttl is not None:
return int(base_ttl * 0.5) # 50% of exact match TTL
return None
def _extract_text(self, request: Dict[str, Any]) -> Optional[str]:
"""Extract the relevant text for semantic matching.
Args:
request: Request parameters
Returns:
Extracted text or None
"""
# Try to extract prompt text
if "prompt" in request:
return request["prompt"]
# Try to extract from messages
if "messages" in request and isinstance(request["messages"], list):
# Extract text from the last user message
for message in reversed(request["messages"]):
if message.get("role") == "user" and "content" in message:
if isinstance(message["content"], str):
return message["content"]
elif isinstance(message["content"], list):
# Handle content list (multimodal messages)
text_parts = []
for part in message["content"]:
if isinstance(part, dict) and part.get("type") == "text":
text_parts.append(part.get("text", ""))
return " ".join(text_parts)
# No suitable text found
return None
def _normalize_text(self, text: str) -> str:
"""Normalize text for semantic matching.
Args:
text: Original text
Returns:
Normalized text
"""
# Convert to lowercase
text = text.lower()
# Remove extra whitespace
text = re.sub(r'\s+', ' ', text).strip()
# Remove punctuation
text = re.sub(r'[^\w\s]', '', text)
return text
def _extract_significant_words(self, text: str, max_words: int = 15) -> List[str]:
"""Extract significant words from text.
Args:
text: Normalized text
max_words: Maximum number of words to include
Returns:
List of significant words
"""
# Split into words
words = text.split()
# Filter out short words and common stop words
stop_words = {
"the", "and", "a", "an", "in", "to", "for", "of", "with", "on",
"is", "are", "am", "was", "were", "be", "been", "being",
"this", "that", "these", "those", "it", "they", "them",
"their", "have", "has", "had", "do", "does", "did", "will",
"would", "could", "should", "may", "might", "must", "can",
"about", "above", "after", "again", "against", "all", "any",
"because", "before", "below", "between", "both", "but", "by",
"down", "during", "each", "few", "from", "further", "here",
"how", "into", "more", "most", "no", "nor", "not", "only",
"or", "other", "out", "over", "own", "same", "so", "than",
"then", "there", "through", "under", "until", "up", "very",
"what", "when", "where", "which", "while", "who", "whom",
"why", "you", "your", "yours", "yourself", "ourselves",
"i", "me", "my", "mine", "myself", "we", "us", "our", "ours"
}
# Initial filtering of short words and stopwords
filtered_words = [w for w in words if len(w) > 3 and w not in stop_words]
# Calculate word frequencies for TF-IDF like weighing
word_freq = {}
for word in filtered_words:
word_freq[word] = word_freq.get(word, 0) + 1
# Score words based on a combination of:
# 1. Length (longer words tend to be more significant)
# 2. Frequency (less common words are often more significant)
# 3. Position (words at the beginning are often more significant)
word_scores = {}
# Normalize position weight based on document length
position_weight_factor = 100 / max(1, len(words))
for i, word in enumerate(filtered_words):
if word in word_scores:
continue
# Length score: favor longer words (0.1 to 1.0)
length_score = min(1.0, 0.1 + (len(word) / 20))
# Rarity score: favor words that appear less frequently (0.2 to 1.0)
freq = word_freq[word]
rarity_score = 1.0 / (0.5 + (freq / 5))
rarity_score = max(0.2, min(1.0, rarity_score))
# Position score: favor words that appear earlier (0.2 to 1.0)
earliest_pos = min([i for i, w in enumerate(filtered_words) if w == word])
position_score = 1.0 - min(0.8, (earliest_pos * position_weight_factor) / 100)
# Calculate final score
final_score = (length_score * 0.3) + (rarity_score * 0.5) + (position_score * 0.2)
word_scores[word] = final_score
# Sort words by score and take top max_words
significant_words = sorted(word_scores.keys(), key=lambda w: word_scores[w], reverse=True)
# Include at least a few of the most frequent words for context
top_by_freq = sorted(word_freq.keys(), key=lambda w: word_freq[w], reverse=True)[:5]
# Ensure these frequent words are included in the result
result = significant_words[:max_words]
for word in top_by_freq:
if word not in result and len(result) < max_words:
result.append(word)
return result
class TaskBasedStrategy(CacheStrategy):
"""Strategy based on task type."""
def __init__(self):
"""Initialize task-based strategy."""
self.exact_strategy = ExactMatchStrategy()
self.semantic_strategy = SemanticMatchStrategy()
def generate_key(self, request: Dict[str, Any]) -> str:
"""Generate key based on task type.
Args:
request: Request parameters
Returns:
Cache key
"""
task_type = self._detect_task_type(request)
# Use exact matching for most tasks
key = self.exact_strategy.generate_key(request)
# Add task type to the key
return f"{task_type}:{key}"
def should_cache(self, request: Dict[str, Any], response: Any) -> bool:
"""Determine if a response should be cached based on task type.
Args:
request: Request parameters
response: Response data
Returns:
True if the response should be cached
"""
task_type = self._detect_task_type(request)
# Always cache these task types
always_cache_tasks = {
"summarization", "information_extraction", "classification",
"translation", "rewriting", "question_answering"
}
if task_type in always_cache_tasks:
return True
# Use base strategy for other tasks
return self.exact_strategy.should_cache(request, response)
def get_ttl(self, request: Dict[str, Any], response: Any) -> Optional[int]:
"""Get TTL based on task type.
Args:
request: Request parameters
response: Response data
Returns:
TTL in seconds
"""
task_type = self._detect_task_type(request)
# Task-specific TTLs
ttl_map = {
"summarization": 30 * 24 * 60 * 60, # 30 days
"information_extraction": 14 * 24 * 60 * 60, # 14 days
"extraction": 14 * 24 * 60 * 60, # 14 days - Add explicit mapping for extraction
"classification": 30 * 24 * 60 * 60, # 30 days
"translation": 60 * 24 * 60 * 60, # 60 days
"creative_writing": 1 * 24 * 60 * 60, # 1 day
"chat": 1 * 24 * 60 * 60, # 1 day
}
if task_type in ttl_map:
return ttl_map[task_type]
# Default to base strategy
return self.exact_strategy.get_ttl(request, response)
def _detect_task_type(self, request: Dict[str, Any]) -> str:
"""Detect the task type from the request using multiple techniques.
This function uses a combination of:
1. Explicit tags in the request
2. Request structure analysis
3. NLP-based content analysis
4. Model and parameter hints
Args:
request: Request parameters
Returns:
Task type identifier
"""
# 1. Check for explicit task type
if "task_type" in request:
return request["task_type"]
# 2. Check for task-specific parameters
if "format" in request and request["format"] in ["json", "structured", "extraction"]:
return "information_extraction"
if "max_tokens" in request and request.get("max_tokens", 0) < 100:
return "classification" # Short responses often indicate classification
# 3. Check system prompt for clues
system_prompt = None
if "system" in request:
system_prompt = request["system"]
elif "messages" in request:
for msg in request.get("messages", []):
if msg.get("role") == "system":
system_prompt = msg.get("content", "")
break
if system_prompt:
system_lower = system_prompt.lower()
# Check system prompt for task indicators
if any(x in system_lower for x in ["summarize", "summary", "summarization", "summarize the following"]):
return "summarization"
if any(x in system_lower for x in ["extract", "extraction", "identify all", "parse"]):
return "information_extraction"
if any(x in system_lower for x in ["classify", "categorize", "determine the type"]):
return "classification"
if any(x in system_lower for x in ["translate", "translation", "convert to"]):
return "translation"
if any(x in system_lower for x in ["creative", "write a story", "compose", "generate a poem"]):
return "creative_writing"
if any(x in system_lower for x in ["reasoning", "solve", "think step by step"]):
return "reasoning"
if any(x in system_lower for x in ["chat", "conversation", "assistant", "helpful"]):
return "chat"
# 4. Extract text for content analysis
text = self.semantic_strategy._extract_text(request)
if not text:
return "unknown"
# 5. Sophisticated content analysis
import re
# Normalize text
text_lower = text.lower()
# Task-specific pattern matching
task_patterns = {
"summarization": [
r"\bsummarize\b", r"\bsummary\b", r"\btldr\b", r"\bcondense\b",
r"(provide|give|create).{1,20}(summary|overview)",
r"(summarize|summarise).{1,30}(text|document|paragraph|content|article)",
r"(key|main|important).{1,20}(points|ideas|concepts)"
],
"information_extraction": [
r"\bextract\b", r"\bidentify\b", r"\bfind all\b", r"\blist the\b",
r"(extract|pull out|identify).{1,30}(information|data|details)",
r"(list|enumerate).{1,20}(all|the)",
r"(find|extract).{1,30}(names|entities|locations|dates)"
],
"classification": [
r"\bclassify\b", r"\bcategorize\b", r"\bgroup\b", r"\blabel\b",
r"what (type|kind|category|class)",
r"(determine|identify).{1,20}(type|class|category)",
r"(which|what).{1,20}(category|group|type|class)"
],
"translation": [
r"\btranslate\b", r"\btranslation\b",
r"(translate|convert).{1,30}(into|to|from).{1,20}(language|english|spanish|french)",
r"(in|into).{1,10}(spanish|french|german|italian|japanese|chinese|korean)"
],
"creative_writing": [
r"\bwrite\b", r"\bcreate\b", r"\bgenerate\b", r"\bcompose\b",
r"(write|create|generate|compose).{1,30}(story|poem|essay|article|blog post)",
r"(creative|fiction|imaginative).{1,20}(writing|text|content)",
r"(story|narrative|tale|fiction)"
],
"question_answering": [
r"(why|how|what|who|where|when).{1,30}\?",
r"(explain|describe|define).{1,40}",
r"(question|answer|respond)",
r"(can you|could you|please).{1,30}(tell me|explain|describe)"
],
"reasoning": [
r"(solve|calculate|compute|reason|deduce)",
r"(step by step|detailed|reasoning|rationale)",
r"(problem|puzzle|challenge|riddle|question)",
r"(math|mathematical|logic|logical)"
],
"coding": [
r"(code|function|program|script|algorithm)",
r"(write|create|generate|implement).{1,30}(code|function|class|method)",
r"(python|javascript|java|c\+\+|ruby|go|rust|typescript)"
],
"chat": [
r"(chat|conversation|discuss|talk)",
r"(assist|help).{1,20}(me|with|in)",
r"(you are|as a|you're).{1,20}(assistant|helper)"
]
}
# Score each task type
task_scores = {}
for task, patterns in task_patterns.items():
score = 0
for pattern in patterns:
matches = re.findall(pattern, text_lower)
score += len(matches) * 2 # Each match adds 2 points
# Award a bonus (+3) for a match within the first 50 chars (likely the main request)
if re.search(pattern, text_lower[:50]):
score += 3
# Check for indicators in the first 100 characters (usually the intent)
first_100 = text_lower[:100]
if any(re.search(pattern, first_100) for pattern in patterns):
score += 5
task_scores[task] = score
# 6. Check for additional structural clues
# If JSON output requested, likely extraction
if "json" in text_lower or "structured" in text_lower:
task_scores["information_extraction"] += 5
# If it contains code blocks or technical terms, likely coding
if "```" in text or any(lang in text_lower for lang in ["python", "javascript", "java", "html", "css"]):
task_scores["coding"] += 5
# Check for question mark presence and density
question_marks = text.count("?")
if question_marks > 0:
# Multiple questions indicate question answering
task_scores["question_answering"] += min(question_marks * 2, 10)
# 7. Check model hints
model = request.get("model", "")
# Some models are specialized for specific tasks
if "instruct" in model.lower():
task_scores["question_answering"] += 2
if "chat" in model.lower():
task_scores["chat"] += 2
if "code" in model.lower() or "davinci-code" in model.lower():
task_scores["coding"] += 5
# 8. Determine highest scoring task
if not task_scores:
return "general"
# Get task with highest score
best_task = max(task_scores.items(), key=lambda x: x[1])
# If score is too low, default to general
if best_task[1] < 3:
return "general"
return best_task[0]
# Factory function
def get_strategy(strategy_name: str) -> CacheStrategy:
"""Get a cache strategy by name.
Args:
strategy_name: Strategy name
Returns:
CacheStrategy instance
Raises:
ValueError: If strategy name is invalid
"""
strategies = {
"exact": ExactMatchStrategy(),
"semantic": SemanticMatchStrategy(),
"task": TaskBasedStrategy(),
}
if strategy_name not in strategies:
raise ValueError(
f"Invalid cache strategy: {strategy_name}. " +
f"Valid options: {', '.join(strategies.keys())}"
)
return strategies[strategy_name]
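# --- Illustrative usage sketch (not part of the original module) ---
# Shows how the factory above ties the strategy interface together. The request
# payload below is a hypothetical summarization-style completion request.
if __name__ == "__main__":
    _demo_request = {
        "model": "gpt-4o-mini",
        "messages": [
            {"role": "system", "content": "Summarize the following text in two sentences."},
            {"role": "user", "content": "Climate change refers to long-term shifts in temperatures and weather patterns..."},
        ],
    }
    _demo_response = "A two-sentence summary."
    _strategy = get_strategy("task")
    print("key:", _strategy.generate_key(_demo_request))                    # e.g. "summarization:<exact-match key>"
    print("cache?", _strategy.should_cache(_demo_request, _demo_response))  # True (summarization is always cached)
    print("ttl:", _strategy.get_ttl(_demo_request, _demo_response))         # 2592000 seconds (30 days)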
```
--------------------------------------------------------------------------------
/example_structured_tool.py:
--------------------------------------------------------------------------------
```python
"""
Example of a well-structured MCP tool with best practices.
This module demonstrates how to create a comprehensive MCP tool
that implements all the best practices for LLM usability:
- Tool annotations for better decision-making
- Standardized error handling
- Input validation
- Detailed documentation with examples
- Structured outputs with consistent formats
"""
import time
import uuid
from typing import Any, Dict, Optional
# ---------------------------------
from error_handling import non_empty_string, validate_inputs, with_error_handling
from tool_annotations import ToolAnnotations
# --- Import RAG tools/services ---
# Assuming direct function import for simplicity in example
# In a real structured app, one might use dependency injection or a service locator
from ultimate_mcp_server.tools.rag import (
add_documents,
create_knowledge_base,
delete_knowledge_base,
retrieve_context,
)
# --- Define KB Name for Demo ---
DEMO_KB_NAME = f"example_tool_kb_{uuid.uuid4().hex[:8]}"
# ------------------------------
# --- Sample Data (moved to top) ---
# This data will now be *added* to the KB during setup
SAMPLE_DOCUMENTS = [
{
"id": "kb-001",
"title": "Introduction to Climate Change",
"text": "An overview of climate change causes and effects.",
"type": "article",
"level": "beginner",
"date": "2023-01-15",
"score_for_ranking": 0.95 # Keep score for potential sorting demonstration?
},
{
"id": "kb-002",
"title": "Machine Learning Fundamentals",
"text": "Learn the basics of machine learning algorithms.",
"type": "tutorial",
"level": "beginner",
"date": "2023-02-20",
"score_for_ranking": 0.92
},
{
"id": "kb-003",
"title": "Advanced Neural Networks",
"text": "Deep dive into neural network architectures.",
"type": "tutorial",
"level": "advanced",
"date": "2023-03-10",
"score_for_ranking": 0.88
},
{
"id": "kb-004",
"title": "Climate Policy FAQ",
"text": "Frequently asked questions about climate policies.",
"type": "faq",
"level": "intermediate",
"date": "2023-04-05",
"score_for_ranking": 0.82
},
{
"id": "kb-005",
"title": "Python Reference for Data Science",
"text": "Reference guide for Python in data science applications.",
"type": "reference",
"level": "intermediate",
"date": "2023-05-12",
"score_for_ranking": 0.78
}
]
# -------------------------------------
class ExampleTool:
"""
Example implementation of a well-structured MCP tool with best practices.
The ExampleTool class serves as a reference implementation that demonstrates how to properly
design and implement tools for the Model Control Protocol (MCP) ecosystem. It showcases
a real-world RAG (Retrieval-Augmented Generation) tool that interacts with a knowledge base.
Key design features:
- Proper tool registration with the MCP server
- Comprehensive schema definitions for inputs and outputs
- Clear tool descriptions with usage guidance for LLMs
- Tool annotations that provide semantic hints about tool behavior
- Consistent error handling and input validation
- Well-structured implementation with clean separation of concerns
The class implements a search_knowledge_base tool that allows querying a vector store
containing sample documents. The implementation demonstrates how to:
- Process input parameters and apply validation
- Interact with external services (the knowledge base)
- Format response data in a consistent structure
- Handle errors gracefully with meaningful error messages
- Add appropriate metadata to help LLMs use the tool effectively
This implementation is intended as an educational reference for developers creating
their own MCP tools, showing patterns and practices that lead to tools that are
easily discoverable, usable, and maintainable.
Usage:
```python
# Initialize the MCP server
server = MCPServer()
# Create an instance (automatically registers all tools)
tool = ExampleTool(server)
# The tool is now available for use through the server
# After ensuring the knowledge base is set up
await setup_demo_kb()
```
"""
def __init__(self, mcp_server):
"""
Initialize an ExampleTool instance and register its tools with the MCP server.
This constructor creates a new instance of the ExampleTool class and automatically
registers all tools implemented by this class with the provided MCP server. It
serves as the entry point for integrating the example tools into an MCP server.
The initialization process:
1. Stores a reference to the provided MCP server instance
2. Calls the _register_tools method to define and register all tools
3. Establishes all necessary connections to the underlying knowledge base
After initialization, the tools become available for use through the MCP server's
tool invocation interface. No further setup is required for the tools themselves,
although the underlying knowledge base (see setup_demo_kb) must be initialized
before the tools can be used effectively.
Args:
mcp_server: An instance of the MCP server to register tools with. This must be
a fully initialized server object with a functional tool registration
system available through its 'tool' decorator.
Returns:
None
Notes:
- Tool registration happens immediately during initialization
- Tool usage requires the demo knowledge base to be set up separately
- The server instance is stored but not modified beyond tool registration
"""
self.mcp = mcp_server
self._register_tools()
def _register_tools(self):
"""
Register all tools provided by this class with the MCP server.
This private method is called during initialization and handles the registration
of all tools implemented by the ExampleTool class. It defines and registers
individual tools with appropriate metadata, schemas, and implementations.
For each tool, the method:
1. Creates tool annotations with appropriate behavioral hints
2. Defines the tool's description, input schema, and output schema
3. Implements the tool function with error handling and input validation
4. Registers the complete tool with the MCP server
The primary tool defined here is:
- search_knowledge_base: Searches the demo knowledge base for relevant documents
based on user queries and optional filters
Each tool is decorated with:
- @self.mcp.tool: Registers the function as an MCP tool
- @with_error_handling: Provides standardized exception handling
- @validate_inputs: Validates required parameters before execution
The detailed tool definitions include human-readable descriptions, parameter
schemas with comprehensive type information, and examples demonstrating proper
tool usage for LLMs.
Returns:
None - Tools are registered as a side effect
"""
# Create tool annotations with appropriate hints
search_annotations = ToolAnnotations(
read_only_hint=True, # This tool doesn't modify anything
destructive_hint=False, # No destructive operations
idempotent_hint=True, # Can be called repeatedly with same results
open_world_hint=True, # Interacts with external data sources
audience=["assistant"], # Intended for the LLM to use
priority=0.8, # High priority tool
title="Search Knowledge Base", # Human-readable title
examples=[
{
"name": "Basic search",
"description": "Search for information about a topic",
"input": {"query": "climate change", "filters": {"type": "article"}},
"output": {
"results": [
{"title": "Climate Change Basics", "score": 0.92},
{"title": "Effects of Global Warming", "score": 0.87}
],
"total_matches": 2,
"search_time_ms": 105
}
},
{
"name": "Advanced search",
"description": "Search with multiple filters and limits",
"input": {
"query": "machine learning",
"filters": {"type": "tutorial", "level": "beginner"},
"limit": 1
},
"output": {
"results": [
{"title": "Introduction to Machine Learning", "score": 0.95}
],
"total_matches": 1,
"search_time_ms": 87
}
}
]
)
@self.mcp.tool(
name="search_knowledge_base",
description=(
"Search for information in the knowledge base using keywords and filters.\n\n"
"This tool is ideal for finding relevant information on specific topics. "
"It supports filtering by content type, date ranges, and other metadata. "
"The tool returns a list of matching results sorted by relevance score.\n\n"
"WHEN TO USE:\n"
"- When you need to find specific information on a topic\n"
"- When you want to discover relevant articles or documentation\n"
"- Before generating content to ensure accuracy\n\n"
"WHEN NOT TO USE:\n"
"- When you need to modify or create content (use content_* tools instead)\n"
"- When you need very recent information that might not be in the knowledge base\n"
"- When you need exact answers to questions (use qa_* tools instead)"
),
annotations=search_annotations.to_dict(),
input_schema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query (required)"
},
"filters": {
"type": "object",
"description": "Optional filters to narrow results",
"properties": {
"type": {
"type": "string",
"enum": ["article", "tutorial", "reference", "faq"],
"description": "Content type filter"
},
"level": {
"type": "string",
"enum": ["beginner", "intermediate", "advanced"],
"description": "Difficulty level filter"
},
"date_after": {
"type": "string",
"format": "date",
"description": "Only include content after this date (YYYY-MM-DD)"
}
}
},
"limit": {
"type": "integer",
"minimum": 1,
"maximum": 20,
"default": 5,
"description": "Maximum number of results to return (1-20, default 5)"
}
},
"required": ["query"]
},
output_schema={
"type": "object",
"properties": {
"results": {
"type": "array",
"items": {
"type": "object",
"properties": {
"id": {"type": "string"},
"title": {"type": "string"},
"summary": {"type": "string"},
"type": {"type": "string"},
"date": {"type": "string", "format": "date"},
"score": {"type": "number"}
}
}
},
"total_matches": {"type": "integer"},
"search_time_ms": {"type": "integer"}
}
}
)
@with_error_handling
@validate_inputs(query=non_empty_string)
async def search_knowledge_base(
query: str,
filters: Optional[Dict[str, Any]] = None,
limit: int = 5,
ctx=None
) -> Dict[str, Any]:
"""
Search for information in the knowledge base using keywords and filters.
This tool is ideal for finding relevant information on specific topics.
It supports filtering by content type, date ranges, and other metadata.
Args:
query: Search query string (required)
filters: Optional filters to narrow results
- type: Content type filter (article, tutorial, reference, faq)
- level: Difficulty level filter (beginner, intermediate, advanced)
- date_after: Only include content after this date (YYYY-MM-DD)
limit: Maximum number of results to return (1-20, default 5)
ctx: Context object passed by the MCP server
Returns:
Dictionary containing:
- results: List of retrieved document chunks with metadata and scores.
- count: Number of results returned (respecting limit).
- retrieval_time: Time taken for retrieval in seconds.
Examples:
Basic search:
search_knowledge_base(query="climate change")
Filtered search:
search_knowledge_base(
query="machine learning",
filters={"type": "tutorial", "level": "beginner"},
limit=3
)
"""
# Start timing
start_time = time.time()
# Convert simple filters to ChromaDB compatible format if needed
# The retrieve_context tool might already handle this, depending on its implementation.
# For simplicity, we pass the filters dict directly.
metadata_filter = filters # Pass filters directly
# Ensure limit is positive
limit = max(1, limit)
try:
# Call the actual retrieve_context tool
# Ensure DEMO_KB_NAME is defined appropriately
retrieval_result = await retrieve_context(
knowledge_base_name=DEMO_KB_NAME,
query=query,
top_k=limit,
metadata_filter=metadata_filter
# Add other relevant params like min_score if needed
)
# Return formatted results
# The retrieve_context tool already returns a dict with 'success', 'results', etc.
# We can return it directly or reformat if needed.
if retrieval_result.get("success"):
return {
"results": retrieval_result.get("results", []),
"count": len(retrieval_result.get("results", [])),
"retrieval_time": retrieval_result.get("retrieval_time", time.time() - start_time)
}
else:
# Propagate the error from retrieve_context
return {
"error": retrieval_result.get("message", "Retrieval failed"),
"results": [],
"count": 0,
"retrieval_time": time.time() - start_time
}
except Exception as e:
# Log the error (in a real implementation)
print(f"Search error: {str(e)}")
# Return an error response that still matches the documented result shape
return {"error": f"Search failed: {str(e)}", "results": [], "count": 0, "retrieval_time": time.time() - start_time}
# --- Added Setup/Teardown for Demo KB ---
async def setup_demo_kb():
"""
Creates and populates the demo knowledge base with sample documents.
This function handles the initialization of the demo knowledge base used by
the example tools. It performs the following operations in sequence:
1. Creates a new knowledge base with the name defined in DEMO_KB_NAME
2. Extracts documents, metadata, and IDs from the SAMPLE_DOCUMENTS constant
3. Adds the extracted information to the newly created knowledge base
The knowledge base is created with overwrite=True, which means any existing
knowledge base with the same name will be deleted and recreated. This ensures
a clean starting state for the demo.
Each document in the sample data is structured with:
- id: Unique identifier for the document
- title: Document title
- text: The actual document content to be vectorized
- type: Document category (article, tutorial, reference, faq)
- level: Difficulty level (beginner, intermediate, advanced)
- date: Publication date in YYYY-MM-DD format
- score_for_ranking: A number between 0-1 used for demonstration purposes
The function logs its progress to stdout and raises any exceptions it encounters,
allowing the caller to handle failures appropriately.
Returns:
None
Raises:
Exception: If any step in the setup process fails. The original exception is
preserved and propagated with context information.
Usage:
await setup_demo_kb() # Must be called in an async context
"""
print(f"Setting up demo knowledge base: {DEMO_KB_NAME}...")
try:
await create_knowledge_base(name=DEMO_KB_NAME, overwrite=True)
texts_to_add = [doc["text"] for doc in SAMPLE_DOCUMENTS]
metadatas_to_add = [{k:v for k,v in doc.items() if k != 'text'} for doc in SAMPLE_DOCUMENTS]
ids_to_add = [doc["id"] for doc in SAMPLE_DOCUMENTS]
await add_documents(
knowledge_base_name=DEMO_KB_NAME,
documents=texts_to_add,
metadatas=metadatas_to_add,
ids=ids_to_add
)
print("Demo knowledge base setup complete.")
except Exception as e:
print(f"Error setting up demo KB: {e}")
raise
async def teardown_demo_kb():
"""
Deletes the demo knowledge base and cleans up associated resources.
This function is responsible for properly disposing of the demo knowledge base
after the examples have been run. It ensures that temporary resources created
for demonstration purposes don't persist unnecessarily. Specifically, it:
1. Attempts to delete the knowledge base identified by DEMO_KB_NAME
2. Logs the success or failure of the operation to stdout
3. Suppresses any exceptions to prevent cleanup errors from propagating
Unlike setup_demo_kb(), this function does not raise exceptions for failures,
as cleanup errors should not prevent the application from continuing or shutting
down normally. Instead, errors are logged but suppressed.
The function can be safely called multiple times or even if the knowledge base
doesn't exist (the underlying delete_knowledge_base function should handle such cases).
This function should be called during application shutdown or after example
tools are no longer needed, typically in one of these contexts:
- Server shutdown hooks/lifecycle events
- After example demonstration is complete
- During application cleanup phases
Returns:
None
Usage:
await teardown_demo_kb() # Must be called in an async context
Note:
In production systems, more robust cleanup might involve tracking created
resources and ensuring proper disposal even after unexpected termination.
"""
print(f"Cleaning up demo knowledge base: {DEMO_KB_NAME}...")
try:
await delete_knowledge_base(name=DEMO_KB_NAME)
print("Demo knowledge base cleaned up.")
except Exception as e:
print(f"Error cleaning up demo KB: {e}")
# -----------------------------------------
def register_example_tools(mcp_server):
"""
Register all example tools with the MCP server and set up required resources.
This function serves as the main entry point for integrating the example tools
into an MCP server instance. It instantiates the ExampleTool class, which registers
all individual tools with the provided server. Additionally, it handles concerns
related to the setup and teardown of resources required by the example tools.
Key responsibilities:
1. Creates an instance of ExampleTool, which registers all example tools with the server
2. Manages the initialization of required resources (demo knowledge base)
3. Documents integration concerns and known limitations
Integration notes:
- The demo knowledge base (DEMO_KB_NAME) must be set up before tools are used
- In a production environment, the async setup should be handled as part of the
server lifecycle (e.g., using lifespan or startup events) rather than directly here
- Current implementation leaves knowledge base setup as a separate step due to
challenges with mixing sync/async code in the registration process
Args:
mcp_server: An instance of the MCP server to register tools with. This should be
a fully initialized server object with a working tool registration system.
Returns:
None
Usage:
```python
# During server initialization:
server = MCPServer()
register_example_tools(server)
# Remember to set up the knowledge base separately (due to async requirements):
await setup_demo_kb() # Before using the tools
# And clean up when done:
await teardown_demo_kb() # After tools are no longer needed
```
Known limitations:
- Cannot perform async setup directly in this function due to sync/async boundary issues
- Knowledge base setup must be handled separately as an async operation
- Resource cleanup must also be manually triggered as an async operation
"""
# Perform setup when tools are registered
# Note: In a real server, setup/teardown might be handled differently (e.g., lifespan)
# Running async setup directly here might block if called synchronously.
# A better approach might be to trigger setup after server start.
# For this example modification, we assume it can be awaited here or handled externally.
# asyncio.run(setup_demo_kb()) # This would block if register_example_tools is sync
# TODO: Need a way to run async setup/teardown non-blockingly or during server lifespan.
# Skipping async setup call here due to potential blocking issues.
# KB needs to be set up *before* the tool is called in a demo.
ExampleTool(mcp_server)
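# --- Illustrative sketch (an assumption, not part of the original module) ---
# One possible way to handle the TODO above: schedule the async KB setup on the
# running event loop when registration happens inside async code, and fall back
# to asyncio.run() otherwise. The helper name is hypothetical.
def register_example_tools_with_setup(mcp_server):
    """Variant of register_example_tools that also schedules setup_demo_kb() (sketch)."""
    import asyncio

    ExampleTool(mcp_server)
    try:
        loop = asyncio.get_running_loop()
        loop.create_task(setup_demo_kb())  # non-blocking: runs once control returns to the loop
    except RuntimeError:
        # No running loop (plain synchronous startup): blocking briefly here is acceptable.
        asyncio.run(setup_demo_kb())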
```
--------------------------------------------------------------------------------
/examples/single_shot_synthesis_demo.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Single-Shot Synthesis Demo - Demonstrates the single_shot_synthesis tool
This script shows how to:
1. Define a prompt and a set of "expert" models.
2. Specify a "synthesizer" model.
3. Call the single_shot_synthesis tool to get a fused response.
4. Display the individual expert responses and the final synthesized output.
Usage:
python examples/single_shot_synthesis_demo.py [--prompt "Your question here"] [--type text|code]
Options:
--prompt TEXT The prompt/question for the models.
--name TEXT A descriptive name for the synthesis task.
--type TYPE Type of synthesis: 'text' or 'code' (default: text).
--expert-models MODEL [MODEL...] List of expert model IDs.
--synthesizer-model MODEL Model ID for the synthesizer.
"""
import argparse
import asyncio
import json
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from rich import box
from rich.console import Group
from rich.markup import escape
from rich.panel import Panel
from rich.rule import Rule
from rich.syntax import Syntax
from rich.table import Table
from ultimate_mcp_server.core.server import Gateway
from ultimate_mcp_server.exceptions import ProviderError, ToolError # For error handling
# from ultimate_mcp_server.tools.single_shot_synthesis import single_shot_synthesis # Called via gateway
from ultimate_mcp_server.utils import get_logger, process_mcp_result
from ultimate_mcp_server.utils.display import CostTracker # Reusing CostTracker
from ultimate_mcp_server.utils.logging.console import console
logger = get_logger("example.single_shot_synthesis")
gateway: Optional[Gateway] = None
# --- Configuration ---
DEFAULT_EXPERT_MODEL_CONFIGS_SSS: List[Dict[str, Any]] = [ # SSS suffix for SingleShotSynthesis
{"model_id": "openai/gpt-4o-mini", "temperature": 0.7},
{"model_id": "anthropic/claude-3-5-haiku-20241022", "temperature": 0.65},
# {"model_id": "google/gemini-1.5-flash-latest", "temperature": 0.7},
]
DEFAULT_SYNTHESIZER_MODEL_CONFIG_SSS: Dict[str, Any] = {
"model_id": "anthropic/claude-3-7-sonnet-20250219",
"temperature": 0.5,
"max_tokens": 3000, # Allow more tokens for comprehensive synthesis
}
# Fallback if preferred synthesizer isn't available
FALLBACK_SYNTHESIZER_MODEL_CONFIG_SSS: Dict[str, Any] = {
"model_id": "anthropic/claude-3-7-sonnet-20250219", # Fallback to Sonnet 3.5
"temperature": 0.5,
"max_tokens": 3000,
}
DEFAULT_SSS_PROMPT = "Compare and contrast the query optimization strategies used in PostgreSQL versus MySQL for complex analytical queries involving multiple joins and aggregations. Highlight key differences in their execution planners and indexing techniques."
DEFAULT_SSS_TASK_NAME = "DB Query Optimization Comparison"
DEFAULT_SSS_TYPE = "text"
def parse_arguments_sss():
parser = argparse.ArgumentParser(description="Run a single-shot multi-model synthesis demo")
parser.add_argument("--prompt", type=str, default=DEFAULT_SSS_PROMPT)
parser.add_argument("--name", type=str, default=DEFAULT_SSS_TASK_NAME)
parser.add_argument("--type", type=str, default=DEFAULT_SSS_TYPE, choices=["text", "code"])
parser.add_argument(
"--expert-models",
type=str,
nargs="+",
default=[mc["model_id"] for mc in DEFAULT_EXPERT_MODEL_CONFIGS_SSS],
help="List of expert model IDs.",
)
parser.add_argument(
"--synthesizer-model",
type=str,
default=DEFAULT_SYNTHESIZER_MODEL_CONFIG_SSS["model_id"],
help="Model ID for the synthesizer.",
)
return parser.parse_args()
async def setup_gateway_for_demo_sss():
global gateway
if gateway:
return
logger.info("Initializing gateway for single-shot synthesis demo...", emoji_key="rocket")
try:
gateway = Gateway(name="sss_demo_gateway", register_tools=True, load_all_tools=True)
if not gateway.providers:
await gateway._initialize_providers()
except Exception as e:
logger.critical(f"Failed to initialize Gateway: {e}", exc_info=True)
raise
mcp_tools = await gateway.mcp.list_tools()
registered_tool_names = [t.name for t in mcp_tools]
if "single_shot_synthesis" not in registered_tool_names:
logger.error(
"Gateway initialized, but 'single_shot_synthesis' tool is missing!", emoji_key="error"
)
raise RuntimeError("Required 'single_shot_synthesis' tool not registered.")
logger.success(
"Gateway for demo initialized and synthesis tool verified.", emoji_key="heavy_check_mark"
)
def display_single_shot_synthesis_results(
results_data: Dict[str, Any],
original_prompt: str, # Added to display the prompt
console_instance
):
"""Displays the results from the single_shot_synthesis tool."""
console_instance.print(
Rule(
f"[bold magenta]Single-Shot Synthesis Task: {results_data.get('name', 'N/A')}[/bold magenta]"
)
)
if not results_data or not isinstance(results_data, dict) or not results_data.get("request_id"):
console_instance.print(Panel("[bold red]No valid results data to display or essential fields missing.[/bold red]", border_style="red"))
if isinstance(results_data, dict) and results_data.get("error_message"):
console_instance.print(f"[bold red]Error in results data:[/bold red] {escape(results_data['error_message'])}")
return
console_instance.print(f"Request ID: [cyan]{results_data.get('request_id')}[/cyan]")
status = results_data.get("status", "UNKNOWN")
status_color = (
"green" if status == "SUCCESS" else ("yellow" if status == "PARTIAL_SUCCESS" else "red")
)
console_instance.print(f"Status: [bold {status_color}]{status}[/bold {status_color}]")
if results_data.get("error_message") and status in ["FAILED", "PARTIAL_SUCCESS"]:
console_instance.print(
Panel(f"[red]{escape(results_data.get('error_message'))}[/red]", title="[bold red]Error Message[/bold red]", border_style="red")
)
storage_path = results_data.get("storage_path")
if storage_path:
console_instance.print(
f"Artifacts Storage: [blue underline]{escape(storage_path)}[/blue underline]"
)
console_instance.print(Panel(escape(original_prompt), title="[bold]Original Prompt[/bold]", border_style="blue", expand=False))
console_instance.print(Rule("[bold blue]Expert Model Responses[/bold blue]"))
expert_responses = results_data.get("expert_responses", [])
if expert_responses:
for i, resp_dict in enumerate(expert_responses):
model_id_display = resp_dict.get("model_id", "Unknown Model")
has_error = bool(resp_dict.get("error"))
status_icon = "❌" if has_error else "✔️"
panel_title = f"{status_icon} Expert {i + 1}: {model_id_display}"
current_border_style = "red" if has_error else "dim cyan"
if has_error:
panel_title += " [bold red](Failed)[/bold red]"
content_table = Table(box=None, show_header=False, padding=(0,1))
content_table.add_column(style="dim")
content_table.add_column()
if resp_dict.get("error"):
content_table.add_row("[bold red]Error[/bold red]", escape(resp_dict.get('error', '')))
text_content = resp_dict.get("response_text")
# Assuming 'code' type experts are not used in this specific demo,
# but adding for completeness if structure changes.
# code_content = resp_dict.get("extracted_code")
# if code_content:
# content_table.add_row("Extracted Code", Syntax(code_content, "python", theme="monokai", line_numbers=True, word_wrap=True))
if text_content:
content_table.add_row("Response Text", escape(text_content[:1000] + ('...' if len(text_content) > 1000 else '')))
elif not resp_dict.get("error"):
content_table.add_row("Response Text", "[italic]No content from this expert.[/italic]")
metrics = resp_dict.get("metrics", {})
cost = metrics.get("cost", 0.0)
api_latency = metrics.get("api_latency_ms", "N/A")
total_task_time = metrics.get("total_task_time_ms", "N/A")
input_tokens = metrics.get("input_tokens", "N/A")
output_tokens = metrics.get("output_tokens", "N/A")
metrics_table = Table(box=box.ROUNDED, show_header=False, title="Metrics")
metrics_table.add_column(style="cyan")
metrics_table.add_column(style="white")
metrics_table.add_row("Cost", f"${cost:.6f}")
metrics_table.add_row("Input Tokens", str(input_tokens))
metrics_table.add_row("Output Tokens", str(output_tokens))
metrics_table.add_row("API Latency", f"{api_latency} ms")
metrics_table.add_row("Total Task Time", f"{total_task_time} ms")
if metrics.get("api_model_id_used") and metrics.get("api_model_id_used") != model_id_display:
metrics_table.add_row("API Model Used", str(metrics.get("api_model_id_used")))
main_panel_content = [content_table, metrics_table]
console_instance.print(
Panel(
Group(*main_panel_content),
title=f"[bold cyan]{panel_title}[/bold cyan]",
border_style=current_border_style,
expand=False,
)
)
else:
console_instance.print("[italic]No expert responses available.[/italic]")
console_instance.print(Rule("[bold green]Synthesized Response[/bold green]"))
synthesizer_metrics = results_data.get("synthesizer_metrics", {})
synthesizer_model_id_used_api = synthesizer_metrics.get("api_model_id_used")
# Attempt to get configured synthesizer model from input if API one is not available (should be rare)
# This requires passing synthesizer_config to this function or storing it in results_data
# For now, we rely on api_model_id_used from metrics.
# Example: configured_synth_model = results_data.get("synthesizer_model_config", {}).get("model_id", "N/A")
# synthesizer_model_display = synthesizer_model_id_used_api or configured_synth_model
if synthesizer_model_id_used_api:
console_instance.print(f"Synthesizer Model Used (from API): [magenta]{synthesizer_model_id_used_api}[/magenta]")
else:
# If not in metrics, try to infer from input or display N/A (needs input passed)
console_instance.print("Synthesizer Model: [magenta]N/A (configured model not directly in output, check logs or input config)[/magenta]")
thinking_process = results_data.get("synthesizer_thinking_process")
if thinking_process:
console_instance.print(
Panel(
escape(thinking_process),
title="[bold]Synthesizer Thinking Process[/bold]",
border_style="yellow",
expand=False,
)
)
final_text = results_data.get("synthesized_response_text")
final_code = results_data.get("synthesized_extracted_code")
# Determine if the original task was for code
tournament_type = results_data.get("tournament_type", "text") # Assuming this field might be added to output for context
if tournament_type == "code" and final_code:
console_instance.print(
Panel(
Syntax(final_code, "python", theme="monokai", line_numbers=True, word_wrap=True),
title="[bold]Final Synthesized Code[/bold]",
border_style="green",
)
)
elif final_text: # Also show text if it's a code tournament but no code was extracted, or if it's text type
console_instance.print(
Panel(
escape(final_text),
title="[bold]Final Synthesized Text[/bold]",
border_style="green",
)
)
else:
console_instance.print(
"[italic]No synthesized response generated (or it was empty).[/italic]"
)
if synthesizer_metrics:
console_instance.print(Rule("[bold]Synthesizer Metrics[/bold]"))
synth_metrics_table = Table(box=box.SIMPLE, show_header=False, title_justify="left")
synth_metrics_table.add_column("Metric", style="cyan")
synth_metrics_table.add_column("Value", style="white")
synth_metrics_table.add_row("Cost", f"${synthesizer_metrics.get('cost', 0.0):.6f}")
synth_metrics_table.add_row("Input Tokens", str(synthesizer_metrics.get("input_tokens", "N/A")))
synth_metrics_table.add_row("Output Tokens", str(synthesizer_metrics.get("output_tokens", "N/A")))
synth_metrics_table.add_row(
"API Latency", f"{synthesizer_metrics.get('api_latency_ms', 'N/A')} ms"
)
synth_metrics_table.add_row(
"API Model Used", str(synthesizer_metrics.get("api_model_id_used", "N/A"))
)
console_instance.print(synth_metrics_table)
console_instance.print(Rule("[bold]Overall Metrics for Entire Operation[/bold]"))
total_metrics = results_data.get("total_metrics", {})
# Reuse Rich Table for overall metrics
overall_metrics_table = Table(box=box.SIMPLE, show_header=False, title_justify="left")
overall_metrics_table.add_column("Metric", style="cyan")
overall_metrics_table.add_column("Value", style="white")
overall_metrics_table.add_row("Total Cost", f"${total_metrics.get('total_cost', 0.0):.6f}")
overall_metrics_table.add_row(
"Total Input Tokens (All Calls)", str(total_metrics.get("total_input_tokens", "N/A"))
)
overall_metrics_table.add_row(
"Total Output Tokens (All Calls)", str(total_metrics.get("total_output_tokens", "N/A"))
)
overall_metrics_table.add_row(
"Overall Task Time", f"{total_metrics.get('overall_task_time_ms', 'N/A')} ms"
)
console_instance.print(overall_metrics_table)
console_instance.print()
# Display the full prompt sent to the synthesizer model
if storage_path and results_data.get("status") in ["SUCCESS", "PARTIAL_SUCCESS"]:
synthesis_prompt_file = Path(storage_path) / "synthesis_prompt.md"
if synthesis_prompt_file.exists():
try:
synthesis_prompt_content = synthesis_prompt_file.read_text(encoding='utf-8')
console_instance.print(Rule("[bold yellow]Full Prompt to Synthesizer Model[/bold yellow]"))
console_instance.print(
Panel(
Syntax(synthesis_prompt_content, "markdown", theme="monokai", line_numbers=True, word_wrap=True),
title="[bold]Synthesizer Input Prompt[/bold]",
border_style="yellow",
expand=False  # Don't stretch the panel to full console width; the prompt can be long
)
)
except Exception as e:
logger.warning(f"Could not read or display synthesis_prompt.md: {e}", exc_info=True)
else:
logger.info("synthesis_prompt.md not found, skipping display.")
async def run_single_shot_demo(tracker: CostTracker, args: argparse.Namespace):
console.print(Rule(f"[bold blue]Single-Shot Synthesis Demo - Task: {args.name}[/bold blue]"))
console.print(
f"Prompt: [yellow]{escape(args.prompt[:100] + ('...' if len(args.prompt) > 100 else ''))}[/yellow]"
)
console.print(f"Task Type: [magenta]{args.type}[/magenta]")
expert_configs_for_tool: List[Dict[str, Any]] = []
for model_id_str in args.expert_models:
default_mc = next(
(mc for mc in DEFAULT_EXPERT_MODEL_CONFIGS_SSS if mc["model_id"] == model_id_str), None
)
if default_mc:
expert_configs_for_tool.append(default_mc.copy()) # Use copy
else:
expert_configs_for_tool.append({"model_id": model_id_str})
synthesizer_config_for_tool: Dict[str, Any] = {"model_id": args.synthesizer_model}
if args.synthesizer_model == DEFAULT_SYNTHESIZER_MODEL_CONFIG_SSS["model_id"]:
synthesizer_config_for_tool = DEFAULT_SYNTHESIZER_MODEL_CONFIG_SSS.copy()
elif args.synthesizer_model == FALLBACK_SYNTHESIZER_MODEL_CONFIG_SSS["model_id"]:
synthesizer_config_for_tool = FALLBACK_SYNTHESIZER_MODEL_CONFIG_SSS.copy()
console.print(
f"Expert Models: [cyan]{', '.join([mc['model_id'] for mc in expert_configs_for_tool])}[/cyan]"
)
console.print(f"Synthesizer Model: [cyan]{synthesizer_config_for_tool['model_id']}[/cyan]")
# Tool expects "expert_models" and "synthesizer_model" as per Pydantic aliases
synthesis_input_for_tool = {
"name": args.name,
"prompt": args.prompt,
"expert_models": expert_configs_for_tool,
"synthesizer_model": synthesizer_config_for_tool,
"tournament_type": args.type,
# "synthesis_instructions": "Please synthesize these for clarity and impact..."
}
try:
logger.info(f"Calling single_shot_synthesis tool for task: {args.name}", emoji_key="gear")
console.print(
Panel(
f"Initiating Single-Shot Synthesis task: '[bold]{escape(args.name)}[/bold]'.\\n"
f"This involves parallel calls to [cyan]{len(expert_configs_for_tool)}[/cyan] expert model(s) "
f"followed by the synthesizer model ([cyan]{synthesizer_config_for_tool['model_id']}[/cyan]).\\n"
f"Prompt: '{escape(args.prompt[:150] + ('...' if len(args.prompt)>150 else ''))}'\\n"
"[italic]Please wait, this may take a few moments...[/italic]",
title="[bold blue]🚀 Starting Synthesis Process[/bold blue]",
border_style="blue",
expand=False
)
)
synthesis_data_dict: Optional[Dict[str, Any]] = None
with console.status("[bold yellow]Processing synthesis request via single_shot_synthesis tool...", spinner="dots"):
# The tool 'single_shot_synthesis' is already registered with the gateway
synthesis_result_raw = await gateway.mcp.call_tool("single_shot_synthesis", synthesis_input_for_tool)
# Process the result (moved out of the status context)
if isinstance(synthesis_result_raw, dict):
logger.info("Tool call returned a dictionary directly. Using it as result.", emoji_key="package")
synthesis_data_dict = synthesis_result_raw
elif isinstance(synthesis_result_raw, list):
logger.info("Tool call returned a list. Processing its first element.", emoji_key="package")
if synthesis_result_raw:
first_element = synthesis_result_raw[0]
if isinstance(first_element, dict):
synthesis_data_dict = first_element
elif hasattr(first_element, 'text') and isinstance(first_element.text, str):
logger.info("First element has a .text attribute (like TextContent). Attempting to parse its .text attribute as JSON.", emoji_key="memo")
try:
synthesis_data_dict = json.loads(first_element.text)
except json.JSONDecodeError as e:
logger.warning(f"JSON parsing of .text attribute failed: {e}. Falling back to process_mcp_result with the .text content.", emoji_key="warning")
synthesis_data_dict = await process_mcp_result(first_element.text) # Pass the string for LLM repair
else:
logger.warning(f"Tool call returned a list, but its first element is not a dictionary or TextContent-like. Content: {synthesis_result_raw!r:.500}", emoji_key="warning")
synthesis_data_dict = await process_mcp_result(synthesis_result_raw) # Fallback with the whole list
else:
logger.warning("Tool call returned an empty list. Falling back to process_mcp_result.", emoji_key="warning")
synthesis_data_dict = await process_mcp_result(synthesis_result_raw) # Fallback
elif isinstance(synthesis_result_raw, str): # If it's a string, try to parse
logger.info("Tool call returned a string. Attempting to parse with process_mcp_result.", emoji_key="memo")
synthesis_data_dict = await process_mcp_result(synthesis_result_raw)
else: # If it's some other type, log and try process_mcp_result
logger.warning(f"Tool call returned an unexpected type: {type(synthesis_result_raw)}. Attempting to process with process_mcp_result.", emoji_key="warning")
synthesis_data_dict = await process_mcp_result(synthesis_result_raw)
# Check for errors from the tool call itself or if the synthesis_data_dict is problematic
if not synthesis_data_dict or not isinstance(synthesis_data_dict, dict) or \
synthesis_data_dict.get("success", True) is False or \
(synthesis_data_dict.get("status") == "FAILED" and synthesis_data_dict.get("error_message")):
error_msg = "Unknown error or empty/invalid data from synthesis tool call."
if synthesis_data_dict and isinstance(synthesis_data_dict, dict):
error_msg = synthesis_data_dict.get("error_message", synthesis_data_dict.get("error", error_msg))
logger.error(
f"Single-shot synthesis tool call failed or returned invalid data: {error_msg}", emoji_key="cross_mark"
)
console.print(
f"[bold red]Error from synthesis tool call:[/bold red] {escape(error_msg)}"
)
# Still attempt to display partial data if the structure is somewhat intact
if synthesis_data_dict and isinstance(synthesis_data_dict, dict):
display_single_shot_synthesis_results(synthesis_data_dict, args.prompt, console)
else:
console.print(Panel("[bold red]Received no usable data from the synthesis tool.[/bold red]", border_style="red"))
return 1
console.print(Rule("[bold green]✔️ Synthesis Process Completed[/bold green]"))
# Pass the original prompt (args.prompt) to the display function
display_single_shot_synthesis_results(synthesis_data_dict, args.prompt, console)
# Cost tracking
total_metrics = synthesis_data_dict.get("total_metrics", {})
cost = total_metrics.get("total_cost", 0.0)
input_tokens = total_metrics.get("total_input_tokens", 0)
output_tokens = total_metrics.get("total_output_tokens", 0)
# For tracker, provider/model is ambiguous for the whole operation, use task name
tracker.record_call(
cost=cost,
provider="synthesis_tool_operation",
model=args.name,
input_tokens=input_tokens,
output_tokens=output_tokens
)
except (ToolError, ProviderError, Exception) as e:
logger.error(
f"An error occurred during the single-shot synthesis demo: {e}",
exc_info=True,
emoji_key="error",
)
console.print(f"[bold red]Demo Error:[/bold red] {escape(str(e))}")
return 1
finally:
tracker.display_summary(console)
logger.info("Single-shot synthesis demo finished.", emoji_key="party_popper")
return 0
async def main_async_sss():
args = parse_arguments_sss()
tracker = CostTracker()
exit_code = 1
try:
await setup_gateway_for_demo_sss()
exit_code = await run_single_shot_demo(tracker, args)
except Exception as e:
console.print(
f"[bold red]Critical error in demo setup or execution:[/bold red] {escape(str(e))}"
)
logger.critical(f"Demo main_async_sss failed: {e}", exc_info=True)
finally:
logger.info("Demo finished.")
return exit_code
if __name__ == "__main__":
try:
final_exit_code = asyncio.run(main_async_sss())
except KeyboardInterrupt:
console.print("\n[bold yellow]Demo interrupted by user.[/bold yellow]")
final_exit_code = 130
sys.exit(final_exit_code)
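# Example invocations (illustrative; every flag shown is defined in parse_arguments_sss above):
#   python examples/single_shot_synthesis_demo.py
#   python examples/single_shot_synthesis_demo.py --type code --name "Sorting Algorithms" \
#       --expert-models openai/gpt-4o-mini anthropic/claude-3-5-haiku-20241022 \
#       --synthesizer-model anthropic/claude-3-7-sonnet-20250219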
```
--------------------------------------------------------------------------------
/examples/python_sandbox_demo.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
"""Comprehensive demonstration script for PythonSandbox tools in Ultimate MCP Server."""
# --- Standard Library Imports ---
import argparse
import asyncio
import sys
import uuid
from pathlib import Path
# --- Configuration & Path Setup ---
# Add project root to path for imports when running as script
# Adjust this path if your script location relative to the project root differs
try:
PROJECT_ROOT = Path(__file__).resolve().parent.parent
if not (PROJECT_ROOT / "ultimate_mcp_server").is_dir(): # Check for the actual package dir
# Fallback if running from a different structure (e.g., examples dir directly)
PROJECT_ROOT = (
Path(__file__).resolve().parent.parent.parent
) # Go up two levels if in examples
if not (PROJECT_ROOT / "ultimate_mcp_server").is_dir():
print(
"Error: Could not reliably determine project root. Make sure 'ultimate_mcp_server' is importable.",
file=sys.stderr,
)
sys.exit(1)
sys.path.insert(0, str(PROJECT_ROOT))
print(f"DEBUG: Added '{PROJECT_ROOT}' to sys.path")
except Exception as e:
print(f"Error during initial path setup: {e}", file=sys.stderr)
sys.exit(1)
# --- IMPORTANT: Playwright Check FIRST ---
# The sandbox relies heavily on Playwright. Check availability early.
try:
import playwright.async_api as pw # noqa F401 - Check import
PLAYWRIGHT_AVAILABLE_DEMO = True
except ImportError:
PLAYWRIGHT_AVAILABLE_DEMO = False
print(
"[ERROR] Playwright library not found. Please install it (`pip install playwright && playwright install chromium`).",
file=sys.stderr,
)
# Exit immediately if Playwright is crucial for the demo's purpose
sys.exit(1)
# --- Defer ultimate_mcp_server imports AFTER path setup ---
# Import Rich components
from rich.markup import escape
from rich.panel import Panel
from rich.rule import Rule
from rich.traceback import install as install_rich_traceback
# Import necessary tool functions and exceptions
from ultimate_mcp_server.exceptions import ProviderError, ToolError, ToolInputError
from ultimate_mcp_server.tools.python_sandbox import (
_close_all_sandboxes, # Import cleanup function
display_sandbox_result,
execute_python,
repl_python,
)
from ultimate_mcp_server.utils import get_logger
# Use the generic display helper and make a sandbox-specific one
from ultimate_mcp_server.utils.display import safe_tool_call
from ultimate_mcp_server.utils.logging.console import console
# --- Logger and Constants ---
logger = get_logger("example.python_sandbox")
# Use a unique session ID for REPL tests
REPL_SESSION_HANDLE = f"demo-repl-{uuid.uuid4().hex[:8]}"
# Install rich tracebacks for better error display
install_rich_traceback(show_locals=False, width=console.width)
# --- Enhanced Display Helper (from older script) ---
# --- Argument Parsing ---
def parse_arguments():
"""Parse command line arguments for the demo."""
parser = argparse.ArgumentParser(
description="Python Sandbox Demo for Ultimate MCP Server Tools",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""Available demos:
all - Run all demos (default)
basic - Basic execution, stdout/stderr, result capture
errors - Syntax and Runtime error handling
timeout - Execution timeout handling
packages - Package loading (numpy, pandas)
wheels - Wheel loading via micropip (requires --allow-network)
repl - Persistent REPL state and reset functionality
security - Network and filesystem access controls
visualization - Data visualization using matplotlib (requires package)
""",
)
parser.add_argument(
"demo",
nargs="?",
default="all",
choices=[
"all",
"basic",
"errors",
"timeout",
"packages",
"wheels",
"repl",
"security",
"visualization",
],
help="Specific demo to run (default: all)",
)
parser.add_argument(
"--allow-network",
action="store_true",
help="Enable network access within the sandbox for demos requiring it (e.g., wheel loading)",
)
parser.add_argument(
"--allow-fs",
action="store_true",
help="Enable filesystem access bridge (mcpfs) within the sandbox. Requires filesystem tool to be configured.",
)
parser.add_argument(
"-v",
"--verbose",
action="store_true",
help="Increase output verbosity (Note: internal tool logging is often DEBUG level)",
)
return parser.parse_args()
# --- Demo Functions ---
async def demonstrate_basic_execution(args):
"""Demonstrate basic code execution, I/O capture, results."""
console.print(Rule("[bold cyan]1. Basic Execution & I/O[/bold cyan]", style="cyan"))
logger.info("Demonstrating basic execution...", emoji_key="code")
# --- Simple Execution with Result ---
code_simple = """
result = 40 + 2
print("Calculation done.")
"""
result = await safe_tool_call(
execute_python,
{"code": code_simple},
description="Executing simple addition (result = 40 + 2)",
)
display_sandbox_result("Basic Addition", result, code_simple)
# --- Stdout/Stderr Capture ---
code_io = """
import sys
print("Hello to stdout!")
print("This is line 2 on stdout.")
print("Error message to stderr!", file=sys.stderr)
print("Another error line!", file=sys.stderr)
result = "IO test complete"
"""
result = await safe_tool_call(
execute_python, {"code": code_io}, description="Capturing stdout and stderr"
)
display_sandbox_result("Stdout/Stderr Capture", result, code_io)
# --- No 'result' variable assigned ---
code_no_result = """
x = 10
y = 20
print(f"x + y = {x+y}")
# 'result' variable is not assigned
"""
result = await safe_tool_call(
execute_python,
{"code": code_no_result},
description="Executing code without assigning to 'result'",
)
display_sandbox_result("No 'result' Variable Assigned", result, code_no_result)
async def demonstrate_error_handling(args):
"""Demonstrate handling of syntax and runtime errors."""
console.print(Rule("[bold cyan]2. Error Handling Demo[/bold cyan]", style="cyan"))
logger.info("Starting error handling demo")
# --- Syntax Error ---
code_syntax_error = "result = 1 +" # Missing operand
result = await safe_tool_call(
execute_python,
{"code": code_syntax_error},
description="Executing code with SyntaxError (should fail)",
)
display_sandbox_result("Syntax Error Handling", result, code_syntax_error)
# --- Runtime Error ---
code_runtime_error = """
def divide(a, b):
return a / b
result = divide(10, 0) # ZeroDivisionError
"""
result = await safe_tool_call(
execute_python,
{"code": code_runtime_error},
description="Executing code with ZeroDivisionError (should fail)",
)
display_sandbox_result("Runtime Error Handling (ZeroDivisionError)", result, code_runtime_error)
# --- Name Error ---
code_name_error = "result = undefined_variable + 5"
result = await safe_tool_call(
execute_python,
{"code": code_name_error},
description="Executing code with NameError (should fail)",
)
display_sandbox_result("Runtime Error Handling (NameError)", result, code_name_error)
async def demonstrate_timeout(args):
"""Demonstrate timeout handling."""
console.print(Rule("[bold cyan]3. Timeout Handling Demo[/bold cyan]", style="cyan"))
logger.info("Starting timeout handling demo")
code_timeout = """
import time
print("Starting computation that will time out...")
time.sleep(5) # Sleep for 5 seconds
print("This line should not be reached due to timeout")
result = "Completed successfully despite timeout request?" # Should not happen
"""
# Use a short timeout (3 seconds) to trigger the error
result = await safe_tool_call(
execute_python,
{"code": code_timeout, "timeout_ms": 3000},
description="Executing code that exceeds timeout (3s)",
)
display_sandbox_result("Timeout Handling (3s Timeout)", result, code_timeout)
async def demonstrate_packages(args):
"""Demonstrate loading Python packages."""
console.print(
Rule("[bold cyan]4. Package Loading Demo (NumPy & Pandas)[/bold cyan]", style="cyan")
)
logger.info("Starting package loading demo")
# NumPy example
numpy_code = """
import numpy as np
a = np.array([[1, 2], [3, 4]])
result = {
'shape': a.shape,
'mean': np.mean(a).item(), # Use .item() for scalar
'determinant': np.linalg.det(a).item() if a.shape == (2, 2) else 'N/A'
}
print(f"Array:\\n{a}")
"""
result = await safe_tool_call(
execute_python,
{"code": numpy_code, "packages": ["numpy"], "timeout_ms": 15000},
description="Using numpy package",
)
display_sandbox_result("NumPy Package Demo", result, numpy_code)
# Pandas example (depends on numpy)
pandas_code = """
import pandas as pd
data = {'col1': [1, 2, 3], 'col2': [4, 5, 6]}
df = pd.DataFrame(data)
print("DataFrame Head:")
print(df.head())
result = df.describe().to_dict() # Return summary stats as dict
"""
result = await safe_tool_call(
execute_python,
{"code": pandas_code, "packages": ["pandas"], "timeout_ms": 20000},
description="Using pandas package",
)
display_sandbox_result("Pandas Package Demo", result, pandas_code)
async def demonstrate_wheels(args):
"""Demonstrate loading wheels (requires network)."""
console.print(Rule("[bold cyan]5. Wheel Loading Demo (httpx)[/bold cyan]", style="cyan"))
logger.info("Starting wheel loading demo")
if not args.allow_network:
console.print(
Panel(
"Skipping wheel loading demo.\n"
"Network access is required to install wheels from URLs or PyPI.\n"
"Rerun with the [yellow]--allow-network[/yellow] flag to include this test.",
title="Network Access Disabled",
border_style="yellow",
expand=False,
)
)
return
code_wheel = """
try:
import httpx
print(f"httpx version: {httpx.__version__}")
# Make a simple request to test network access
response = httpx.get('https://httpbin.org/get?demo=wheel', timeout=10)
response.raise_for_status()
data = response.json()
result = f"Successfully fetched URL via httpx. Origin IP: {data.get('origin', 'Unknown')}"
except Exception as e:
# Raising an exception shows up nicely in stderr display
raise RuntimeError(f"Error using httpx: {e}") from e
"""
# Specify package 'httpx'. Micropip should handle fetching it if not preloaded.
result = await safe_tool_call(
execute_python,
{"code": code_wheel, "packages": ["httpx"], "allow_network": True, "timeout_ms": 25000},
description="Loading 'httpx' package/wheel (requires network)",
)
display_sandbox_result("Wheel Loading Demo (httpx)", result, code_wheel)
async def demonstrate_repl(args):
"""Demonstrate persistent REPL sessions and reset."""
console.print(Rule("[bold cyan]6. Persistent REPL Sessions[/bold cyan]", style="cyan"))
logger.info("Demonstrating REPL functionality...", emoji_key="repl")
repl_handle = REPL_SESSION_HANDLE # Use a consistent handle for the demo
# --- Call 1: Define Variable & Function ---
code1 = """
x = 100
def double(val):
return val * 2
print(f"Defined x = {x} and function double()")
result = "Setup complete"
"""
result1 = await safe_tool_call(
repl_python,
{"code": code1, "handle": repl_handle},
description=f"REPL Call 1 (Handle: {repl_handle[-8:]}): Define x and double()",
)
display_sandbox_result(f"REPL Step 1 (Handle: ...{repl_handle[-8:]})", result1, code1)
if (
not result1
or not result1.get("success")
or result1.get("result", {}).get("handle") != repl_handle
):
console.print(
"[bold red]Error:[/bold red] Failed to get handle from first REPL call. Aborting REPL demo."
)
return
# --- Call 2: Use Variable & Function ---
code2 = "result = double(x) # Uses x and double() from previous call"
result2 = await safe_tool_call(
repl_python,
{"code": code2, "handle": repl_handle},
description=f"REPL Call 2 (Handle: {repl_handle[-8:]}): Call double(x)",
)
display_sandbox_result(f"REPL Step 2 (Handle: ...{repl_handle[-8:]})", result2, code2)
# --- Call 3: Import and Use ---
code3 = """
import math
result = math.sqrt(x) # Use x again
print(f"Square root of x ({x}) is {result}")
"""
result3 = await safe_tool_call(
repl_python,
{
"code": code3,
"handle": repl_handle,
"packages": [],
},
description=f"REPL Call 3 (Handle: {repl_handle[-8:]}): Import math and use x",
)
display_sandbox_result(f"REPL Step 3 (Handle: ...{repl_handle[-8:]})", result3, code3)
# --- Call 4: Reset Session ---
# Code is empty, only resetting
result4 = await safe_tool_call(
repl_python,
{"code": "", "handle": repl_handle, "reset": True},
description=f"REPL Call 4 (Handle: {repl_handle[-8:]}): Resetting the session",
)
display_sandbox_result(
f"REPL Step 4 - Reset (Handle: ...{repl_handle[-8:]})",
result4,
"# Resetting the REPL state",
)
# --- Call 5: Try Using Variable After Reset (should fail) ---
code5 = """
try:
result = double(x) # Should fail as x and double are gone
except NameError as e:
print(f"Caught expected error: {e}")
result = f"Caught expected NameError: {e}"
"""
result5 = await safe_tool_call(
repl_python,
{"code": code5, "handle": repl_handle},
description=f"REPL Call 5 (Handle: {repl_handle[-8:]}): Using state after reset (should fail/catch NameError)",
)
display_sandbox_result(
f"REPL Step 5 - Post-Reset (Handle: ...{repl_handle[-8:]})", result5, code5
)
async def demonstrate_security(args):
"""Demonstrate network and filesystem access controls."""
console.print(Rule("[bold cyan]7. Security Controls[/bold cyan]", style="cyan"))
logger.info("Demonstrating security controls...", emoji_key="security")
# --- Network Access Control ---
console.print(Rule("Network Access", style="dim"))
code_network = """
import httpx
try:
# Use httpx which was potentially loaded in wheel demo
print("Attempting network request to httpbin...")
response = httpx.get('https://httpbin.org/get?demo=network_security', timeout=5)
response.raise_for_status()
result = f"Network access successful. Status: {response.status_code}"
except Exception as e:
# Use print instead of raise to see the output in the demo result
print(f"Network request failed: {type(e).__name__}: {e}")
result = f"Network request failed as expected (or httpx not loaded)."
"""
# Attempt without network access (should fail within sandbox)
console.print(
Panel(
"Attempting network access with [red]allow_network=False[/red] (expected failure or httpx import error)",
title="Network Test 1",
)
)
result_net_denied = await safe_tool_call(
execute_python,
{"code": code_network, "packages": ["httpx"], "allow_network": False},
description="Network access with allow_network=False",
)
display_sandbox_result("Network Access Denied", result_net_denied, code_network)
# Attempt with network access (should succeed IF network flag is passed)
console.print(
Panel(
"Attempting network access with [green]allow_network=True[/green]",
title="Network Test 2",
)
)
if args.allow_network:
result_net_allowed = await safe_tool_call(
execute_python,
{"code": code_network, "packages": ["httpx"], "allow_network": True},
description="Network access with allow_network=True",
)
display_sandbox_result("Network Access Allowed", result_net_allowed, code_network)
else:
console.print(
"[yellow]Skipped:[/yellow] Rerun demo with --allow-network flag to test allowed network access."
)
# --- Filesystem Access Control ---
console.print(Rule("Filesystem Access (via mcpfs bridge)", style="dim"))
code_fs_list = """
try:
import mcpfs
print("Attempting to list directory '.' via mcpfs...")
# Note: Path inside sandbox needs to map to an allowed host path
target_path = '.' # Represents the sandbox's current dir
listing = mcpfs.listdir(target_path)
result = f"Successfully listed '{target_path}': {len(listing)} entries found via mcpfs."
print(f"Listing result: {listing}")
except ModuleNotFoundError:
print("mcpfs module not available (allow_fs=False?)")
result = "mcpfs module not found (expected failure)"
except Exception as e:
print(f"Filesystem access failed: {type(e).__name__}: {e}")
result = f"Filesystem access failed: {e}"
"""
# Attempt without FS access (should fail - ModuleNotFoundError)
console.print(
Panel(
"Attempting filesystem access with [red]allow_fs=False[/red] (expected ModuleNotFoundError)",
title="Filesystem Test 1",
)
)
result_fs_denied = await safe_tool_call(
execute_python,
{"code": code_fs_list, "allow_fs": False},
description="Filesystem access with allow_fs=False",
)
display_sandbox_result("Filesystem Access Denied (mcpfs)", result_fs_denied, code_fs_list)
# Attempt with FS access (should succeed IF FS flag is passed AND FS tool configured on host)
console.print(
Panel(
"Attempting filesystem access with [green]allow_fs=True[/green]",
title="Filesystem Test 2",
)
)
if args.allow_fs:
console.print(
"[yellow]Note:[/yellow] Success requires the host Filesystem tool to be configured with allowed directories."
)
result_fs_allowed = await safe_tool_call(
execute_python,
{"code": code_fs_list, "allow_fs": True},
description="Filesystem access with allow_fs=True",
)
display_sandbox_result("Filesystem Access Allowed (mcpfs)", result_fs_allowed, code_fs_list)
else:
console.print(
"[yellow]Skipped:[/yellow] Rerun demo with --allow-fs flag to test allowed filesystem access bridge."
)
console.print(
"[dim](Also ensure the host Filesystem tool is configured with allowed directories.)[/dim]"
)
async def demonstrate_visualization(args):
"""Demonstrate data visualization capabilities."""
console.print(
Rule("[bold cyan]8. Data Visualization Demo (Matplotlib)[/bold cyan]", style="cyan")
)
logger.info("Starting data visualization demo")
matplotlib_code = """
# Ensure backend is non-interactive
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import numpy as np
from io import BytesIO
import base64
try:
print("Generating plot...")
# Generate data
x = np.linspace(-np.pi, np.pi, 200)
y_sin = np.sin(x)
y_cos = np.cos(x)
# Create plot
fig, ax = plt.subplots(figsize=(8, 5)) # Use fig, ax pattern
ax.plot(x, y_sin, label='sin(x)')
ax.plot(x, y_cos, label='cos(x)', linestyle='--')
ax.set_title('Sine and Cosine Waves')
ax.set_xlabel('Radians')
ax.set_ylabel('Value')
ax.grid(True)
ax.legend()
plt.tight_layout() # Adjust layout
# Save plot to base64
buffer = BytesIO()
fig.savefig(buffer, format='png', dpi=90) # Save the figure object
buffer.seek(0)
img_str = base64.b64encode(buffer.read()).decode('utf-8')
plt.close(fig) # Close the figure to free memory
print(f"Generated plot as base64 string (Length: {len(img_str)} chars)")
result = f"data:image/png;base64,{img_str}"
except Exception as e:
print(f"Error during plotting: {type(e).__name__}: {e}")
import traceback
traceback.print_exc() # Print traceback to stderr for diagnosis
result = f"Plot generation failed: {e}"
"""
# Requires matplotlib and numpy packages
result = await safe_tool_call(
execute_python,
{"code": matplotlib_code, "packages": ["numpy", "matplotlib"], "timeout_ms": 25000},
description="Generating plot with Matplotlib",
)
# Display result, summarizing the base64 string
result_display = result.copy()
if result_display.get("success") and "result" in result_display.get("result", {}):
res_value = result_display["result"]["result"]
if isinstance(res_value, str) and res_value.startswith("data:image/png;base64,"):
result_display["result"]["result"] = f"[Base64 image data - {len(res_value)} chars]"
display_sandbox_result("Matplotlib Visualization", result_display, matplotlib_code)
console.print(
Panel(
"[yellow]Note:[/] The 'result' contains base64 image data. In a web UI, this could be displayed using an `<img>` tag.",
border_style="yellow",
)
)
async def main():
"""Run the Python Sandbox tools demonstration."""
args = parse_arguments()
exit_code = 0
console.print(Rule("[bold magenta]Python Sandbox Tools Demo[/bold magenta]", style="white"))
# Explicitly check for Playwright availability
if not PLAYWRIGHT_AVAILABLE_DEMO:
console.print(
"[bold red]Error:[/bold red] Playwright is required for the Python Sandbox tool but is not installed or importable."
)
console.print(
"Please install it via: [cyan]pip install playwright && playwright install chromium[/]"
)
return 1 # Exit if core dependency is missing
logger.info("Starting Python Sandbox demonstration", emoji_key="start")
try:
# --- Display Demo Options ---
if args.demo == "all":
console.print(
Panel(
"Running all demo sections.\n"
"Use command-line arguments to run specific sections (e.g., `python examples/python_sandbox_demo.py repl`).\n"
"Use `--allow-network` or `--allow-fs` to enable those features for relevant tests.",
title="Demo Options",
border_style="cyan",
expand=False,
)
)
# --- Run Selected Demonstrations ---
run_all = args.demo == "all"
if run_all or args.demo == "basic":
await demonstrate_basic_execution(args)
console.print()
if run_all or args.demo == "errors":
await demonstrate_error_handling(args)
console.print()
if run_all or args.demo == "timeout":
await demonstrate_timeout(args)
console.print()
if run_all or args.demo == "packages":
await demonstrate_packages(args)
console.print()
if run_all or args.demo == "wheels":
await demonstrate_wheels(args)
console.print()
if run_all or args.demo == "repl":
await demonstrate_repl(args)
console.print()
if run_all or args.demo == "security":
await demonstrate_security(args)
console.print()
if run_all or args.demo == "visualization":
await demonstrate_visualization(args)
console.print()
logger.success(f"Python Sandbox Demo(s) completed: {args.demo}", emoji_key="complete")
console.print(Rule("[bold green]Demo Complete[/bold green]", style="green"))
except (ToolInputError, ToolError, ProviderError) as e:
logger.error(f"Tool Error during demo: {e}", emoji_key="error", exc_info=True)
console.print(f"\n[bold red]TOOL ERROR:[/bold red] {escape(str(e))}")
if hasattr(e, "details") and e.details:
console.print("[bold]Details:[/bold]")
console.print(escape(str(e.details)))
exit_code = 1
except Exception as e:
logger.critical(f"Demo crashed unexpectedly: {str(e)}", emoji_key="critical", exc_info=True)
console.print(f"\n[bold red]CRITICAL ERROR:[/bold red] {escape(str(e))}")
console.print_exception(show_locals=False)
exit_code = 1
finally:
# --- Cleanup ---
console.print(Rule("Cleanup", style="dim"))
try:
# Explicitly call the sandbox cleanup function
await _close_all_sandboxes()
logger.info("Sandbox cleanup completed.", emoji_key="cleanup")
console.print("Sandbox cleanup finished.")
except Exception as e:
logger.error(f"Error during sandbox cleanup: {e}", emoji_key="error")
console.print(f"[bold red]Error during sandbox cleanup:[/bold red] {escape(str(e))}")
return exit_code
if __name__ == "__main__":
# Run the demo
final_exit_code = asyncio.run(main())
sys.exit(final_exit_code)
```
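For readers who want to exercise the sandbox tool outside the demo harness above, here is a minimal direct-call sketch. The import path and the keyword names (`code`, `packages`, `timeout_ms`) are assumptions inferred from the argument dictionaries the demo feeds to `safe_tool_call`; the raw return value is printed as-is rather than assuming the normalized shape the demo wrapper exposes.

```python
# Minimal direct-call sketch (assumptions: the import path below and the keyword
# arguments mirror what the demo passes through safe_tool_call; adjust as needed).
import asyncio

from ultimate_mcp_server.tools.python_sandbox import execute_python  # assumed path


async def run_once() -> None:
    outcome = await execute_python(
        code="result = sum(range(10))\nprint('sum computed')",
        packages=[],        # no extra Pyodide packages needed for this snippet
        timeout_ms=10_000,  # generous timeout to cover first-run sandbox startup
    )
    # Print the raw dictionary; the "success"/"result" keys shown in the demo
    # output come from its safe_tool_call wrapper, not necessarily the raw tool.
    print(outcome)


if __name__ == "__main__":
    asyncio.run(run_once())
```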
--------------------------------------------------------------------------------
/ultimate_mcp_server/services/analytics/metrics.py:
--------------------------------------------------------------------------------
```python
"""Metrics collection and monitoring for Ultimate MCP Server."""
import json
import os
import time
from collections import defaultdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, Optional, Union
from ultimate_mcp_server.utils import get_logger
logger = get_logger(__name__)
try:
import aiofiles
AIOFILES_AVAILABLE = True
except ImportError:
AIOFILES_AVAILABLE = False
try:
from prometheus_client import Counter, Histogram
PROMETHEUS_AVAILABLE = True
except ImportError:
PROMETHEUS_AVAILABLE = False
class MetricsTracker:
"""Comprehensive metrics tracking and monitoring system for Ultimate MCP Server.
The MetricsTracker is a singleton class that collects, processes, and persists
operational metrics related to LLM usage, costs, performance, and errors. It provides
the data foundation for analytics reporting and monitoring tools.
Key features:
- Singleton design pattern ensures consistent metrics across the application
- Persistent storage with automatic serialization to JSON
- Tracks usage by provider, model, and time periods (hourly, daily)
- Records request counts, token usage, costs, errors, and performance metrics
- Cache efficiency monitoring (hits, misses, cost savings)
- Optional Prometheus integration for external monitoring systems
- Asynchronous persistence to minimize performance impact
- Automatic data retention policies to prevent memory bloat
The metrics are automatically persisted to disk and can be loaded on startup,
providing continuity across server restarts. Time-series data is maintained
for historical analysis and trend visualization.
Usage:
# Get the singleton instance
metrics = get_metrics_tracker()
# Record a request
metrics.record_request(
provider="anthropic",
model="claude-3-opus",
input_tokens=150,
output_tokens=500,
cost=0.0325,
duration=2.5
)
# Record cache operations
metrics.record_cache_hit(cost_saved=0.015)
metrics.record_cache_miss()
# Get current statistics
stats = metrics.get_stats()
# Manually trigger persistence (usually automatic)
metrics.save_metrics()
"""
_instance = None
def __new__(cls, *args, **kwargs):
"""Create a singleton instance."""
if cls._instance is None:
cls._instance = super(MetricsTracker, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(
self,
metrics_dir: Optional[Union[str, Path]] = None,
enable_prometheus: bool = False,
reset_on_start: bool = False
):
"""Initialize the metrics tracker.
Args:
metrics_dir: Directory for metrics storage
enable_prometheus: Whether to enable Prometheus metrics
reset_on_start: Whether to reset metrics on startup
"""
# Only initialize once for singleton
if self._initialized:
return
# Set metrics directory
if metrics_dir:
self.metrics_dir = Path(metrics_dir)
else:
self.metrics_dir = Path.home() / ".ultimate" / "metrics"
# Create metrics directory if it doesn't exist
self.metrics_dir.mkdir(parents=True, exist_ok=True)
# Prometheus settings
self.enable_prometheus = enable_prometheus and PROMETHEUS_AVAILABLE
# Initialize metrics data
if reset_on_start:
self._reset_metrics()
else:
self._load_metrics()
# Initialize Prometheus metrics if enabled
if self.enable_prometheus:
self._init_prometheus_metrics()
self._initialized = True
logger.info(
f"Metrics tracker initialized (dir: {self.metrics_dir}, prometheus: {self.enable_prometheus})",
emoji_key="analytics"
)
def _reset_metrics(self):
"""Reset all metrics data."""
# General stats
self.start_time = time.time()
self.requests_total = 0
self.tokens_total = 0
self.cost_total = 0.0
# Provider-specific stats
self.provider_requests = defaultdict(int)
self.provider_tokens = defaultdict(int)
self.provider_costs = defaultdict(float)
# Model-specific stats
self.model_requests = defaultdict(int)
self.model_tokens = defaultdict(int)
self.model_costs = defaultdict(float)
# Request timing stats
self.request_times = []
self.request_times_by_provider = defaultdict(list)
self.request_times_by_model = defaultdict(list)
# Error stats
self.errors_total = 0
self.errors_by_provider = defaultdict(int)
self.errors_by_model = defaultdict(int)
# Token usage by time period
self.hourly_tokens = defaultdict(int)
self.daily_tokens = defaultdict(int)
# Request counts by time period
self.hourly_requests = defaultdict(int)
self.daily_requests = defaultdict(int)
# Cost by time period
self.hourly_costs = defaultdict(float)
self.daily_costs = defaultdict(float)
# Cache stats
self.cache_hits = 0
self.cache_misses = 0
self.cache_saved_cost = 0.0
def _load_metrics(self):
"""Load metrics from disk."""
metrics_file = self.metrics_dir / "metrics.json"
if metrics_file.exists():
try:
with open(metrics_file, "r") as f:
data = json.load(f)
# Load general stats
self.start_time = data.get("start_time", time.time())
self.requests_total = data.get("requests_total", 0)
self.tokens_total = data.get("tokens_total", 0)
self.cost_total = data.get("cost_total", 0.0)
# Load provider stats
self.provider_requests = defaultdict(int, data.get("provider_requests", {}))
self.provider_tokens = defaultdict(int, data.get("provider_tokens", {}))
self.provider_costs = defaultdict(float, data.get("provider_costs", {}))
# Load model stats
self.model_requests = defaultdict(int, data.get("model_requests", {}))
self.model_tokens = defaultdict(int, data.get("model_tokens", {}))
self.model_costs = defaultdict(float, data.get("model_costs", {}))
# Load timing stats (limited to last 1000 for memory)
self.request_times = data.get("request_times", [])[-1000:]
self.request_times_by_provider = defaultdict(list)
for provider, times in data.get("request_times_by_provider", {}).items():
self.request_times_by_provider[provider] = times[-1000:]
self.request_times_by_model = defaultdict(list)
for model, times in data.get("request_times_by_model", {}).items():
self.request_times_by_model[model] = times[-1000:]
# Load error stats
self.errors_total = data.get("errors_total", 0)
self.errors_by_provider = defaultdict(int, data.get("errors_by_provider", {}))
self.errors_by_model = defaultdict(int, data.get("errors_by_model", {}))
# Load time period stats
self.hourly_tokens = defaultdict(int, data.get("hourly_tokens", {}))
self.daily_tokens = defaultdict(int, data.get("daily_tokens", {}))
self.hourly_costs = defaultdict(float, data.get("hourly_costs", {}))
self.daily_costs = defaultdict(float, data.get("daily_costs", {}))
self.hourly_requests = defaultdict(int, data.get("hourly_requests", {}))
self.daily_requests = defaultdict(int, data.get("daily_requests", {}))
# Load cache stats
self.cache_hits = data.get("cache_hits", 0)
self.cache_misses = data.get("cache_misses", 0)
self.cache_saved_cost = data.get("cache_saved_cost", 0.0)
logger.info(
f"Loaded metrics from {metrics_file}",
emoji_key="analytics"
)
except Exception as e:
logger.error(
f"Failed to load metrics: {str(e)}",
emoji_key="error"
)
self._reset_metrics()
else:
self._reset_metrics()
def _init_prometheus_metrics(self):
"""Initialize Prometheus metrics."""
if not PROMETHEUS_AVAILABLE:
return
# Request metrics
self.prom_requests_total = Counter(
"ultimate_requests_total",
"Total number of requests",
["provider", "model"]
)
# Token metrics
self.prom_tokens_total = Counter(
"ultimate_tokens_total",
"Total number of tokens",
["provider", "model", "type"] # type: input or output
)
# Cost metrics
self.prom_cost_total = Counter(
"ultimate_cost_total",
"Total cost in USD",
["provider", "model"]
)
# Timing metrics
self.prom_request_duration = Histogram(
"ultimate_request_duration_seconds",
"Request duration in seconds",
["provider", "model"],
buckets=(0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0)
)
# Error metrics
self.prom_errors_total = Counter(
"ultimate_errors_total",
"Total number of errors",
["provider", "model"]
)
# Cache metrics
self.prom_cache_hits = Counter(
"ultimate_cache_hits_total",
"Total number of cache hits"
)
self.prom_cache_misses = Counter(
"ultimate_cache_misses_total",
"Total number of cache misses"
)
self.prom_cache_saved_cost = Counter(
"ultimate_cache_saved_cost_total",
"Total cost saved by cache in USD"
)
async def _save_metrics_async(self):
"""Save metrics to disk asynchronously."""
if not AIOFILES_AVAILABLE:
return
metrics_file = self.metrics_dir / "metrics.json"
temp_file = metrics_file.with_suffix(".tmp")
try:
# Prepare data for storage
data = {
"start_time": self.start_time,
"requests_total": self.requests_total,
"tokens_total": self.tokens_total,
"cost_total": self.cost_total,
"provider_requests": dict(self.provider_requests),
"provider_tokens": dict(self.provider_tokens),
"provider_costs": dict(self.provider_costs),
"model_requests": dict(self.model_requests),
"model_tokens": dict(self.model_tokens),
"model_costs": dict(self.model_costs),
"request_times": self.request_times,
"request_times_by_provider": {k: v for k, v in self.request_times_by_provider.items()},
"request_times_by_model": {k: v for k, v in self.request_times_by_model.items()},
"errors_total": self.errors_total,
"errors_by_provider": dict(self.errors_by_provider),
"errors_by_model": dict(self.errors_by_model),
"hourly_tokens": dict(self.hourly_tokens),
"daily_tokens": dict(self.daily_tokens),
"hourly_costs": dict(self.hourly_costs),
"daily_costs": dict(self.daily_costs),
"hourly_requests": dict(self.hourly_requests),
"daily_requests": dict(self.daily_requests),
"cache_hits": self.cache_hits,
"cache_misses": self.cache_misses,
"cache_saved_cost": self.cache_saved_cost,
"last_updated": time.time()
}
# Save to temp file
async with aiofiles.open(temp_file, "w") as f:
await f.write(json.dumps(data, indent=2))
# Rename temp file to actual file
os.replace(temp_file, metrics_file)
except Exception as e:
logger.error(
f"Failed to save metrics: {str(e)}",
emoji_key="error"
)
def save_metrics(self):
"""Save metrics to disk synchronously."""
metrics_file = self.metrics_dir / "metrics.json"
temp_file = metrics_file.with_suffix(".tmp")
try:
# Prepare data for storage
data = {
"start_time": self.start_time,
"requests_total": self.requests_total,
"tokens_total": self.tokens_total,
"cost_total": self.cost_total,
"provider_requests": dict(self.provider_requests),
"provider_tokens": dict(self.provider_tokens),
"provider_costs": dict(self.provider_costs),
"model_requests": dict(self.model_requests),
"model_tokens": dict(self.model_tokens),
"model_costs": dict(self.model_costs),
"request_times": self.request_times,
"request_times_by_provider": {k: v for k, v in self.request_times_by_provider.items()},
"request_times_by_model": {k: v for k, v in self.request_times_by_model.items()},
"errors_total": self.errors_total,
"errors_by_provider": dict(self.errors_by_provider),
"errors_by_model": dict(self.errors_by_model),
"hourly_tokens": dict(self.hourly_tokens),
"daily_tokens": dict(self.daily_tokens),
"hourly_costs": dict(self.hourly_costs),
"daily_costs": dict(self.daily_costs),
"hourly_requests": dict(self.hourly_requests),
"daily_requests": dict(self.daily_requests),
"cache_hits": self.cache_hits,
"cache_misses": self.cache_misses,
"cache_saved_cost": self.cache_saved_cost,
"last_updated": time.time()
}
# Save to temp file
with open(temp_file, "w") as f:
json.dump(data, f, indent=2)
# Rename temp file to actual file
os.replace(temp_file, metrics_file)
except Exception as e:
logger.error(
f"Failed to save metrics: {str(e)}",
emoji_key="error"
)
def record_request(
self,
provider: str,
model: str,
input_tokens: int,
output_tokens: int,
cost: float,
duration: float,
success: bool = True
):
"""Record metrics for a request.
Args:
provider: Provider name
model: Model name
input_tokens: Number of input tokens
output_tokens: Number of output tokens
cost: Cost of the request
duration: Duration of the request in seconds
success: Whether the request was successful
"""
# Update general stats
self.requests_total += 1
total_tokens = input_tokens + output_tokens
self.tokens_total += total_tokens
self.cost_total += cost
# Update provider stats
self.provider_requests[provider] += 1
self.provider_tokens[provider] += total_tokens
self.provider_costs[provider] += cost
# Update model stats
self.model_requests[model] += 1
self.model_tokens[model] += total_tokens
self.model_costs[model] += cost
# Update timing stats
self.request_times.append(duration)
if len(self.request_times) > 1000:
self.request_times = self.request_times[-1000:]
self.request_times_by_provider[provider].append(duration)
if len(self.request_times_by_provider[provider]) > 1000:
self.request_times_by_provider[provider] = self.request_times_by_provider[provider][-1000:]
self.request_times_by_model[model].append(duration)
if len(self.request_times_by_model[model]) > 1000:
self.request_times_by_model[model] = self.request_times_by_model[model][-1000:]
# Update error stats if request failed
if not success:
self.errors_total += 1
self.errors_by_provider[provider] += 1
self.errors_by_model[model] += 1
# Update time period stats
current_time = time.time()
hour_key = datetime.fromtimestamp(current_time).strftime("%Y-%m-%d-%H")
day_key = datetime.fromtimestamp(current_time).strftime("%Y-%m-%d")
self.hourly_tokens[hour_key] += total_tokens
self.daily_tokens[day_key] += total_tokens
self.hourly_costs[hour_key] += cost
self.daily_costs[day_key] += cost
self.hourly_requests[hour_key] += 1
self.daily_requests[day_key] += 1
# Update Prometheus metrics if enabled
if self.enable_prometheus:
self.prom_requests_total.labels(provider=provider, model=model).inc()
self.prom_tokens_total.labels(provider=provider, model=model, type="input").inc(input_tokens)
self.prom_tokens_total.labels(provider=provider, model=model, type="output").inc(output_tokens)
self.prom_cost_total.labels(provider=provider, model=model).inc(cost)
self.prom_request_duration.labels(provider=provider, model=model).observe(duration)
if not success:
self.prom_errors_total.labels(provider=provider, model=model).inc()
# Schedule metrics saving
try:
import asyncio
asyncio.create_task(self._save_metrics_async())
except (ImportError, RuntimeError):
# Fall back to synchronous saving when no event loop is running (or asyncio is unavailable)
self.save_metrics()
def record_cache_hit(self, cost_saved: float = 0.0):
"""Record a cache hit.
Args:
cost_saved: Cost saved by the cache hit
"""
self.cache_hits += 1
self.cache_saved_cost += cost_saved
# Update Prometheus metrics if enabled
if self.enable_prometheus:
self.prom_cache_hits.inc()
self.prom_cache_saved_cost.inc(cost_saved)
def record_cache_miss(self):
"""Record a cache miss."""
self.cache_misses += 1
# Update Prometheus metrics if enabled
if self.enable_prometheus:
self.prom_cache_misses.inc()
def get_stats(self) -> Dict[str, Any]:
"""Get current metrics.
Returns:
Dictionary of metrics
"""
# Calculate uptime
uptime = time.time() - self.start_time
# Calculate request rate (per minute)
request_rate = self.requests_total / (uptime / 60) if uptime > 0 else 0
# Calculate average response time
avg_response_time = sum(self.request_times) / len(self.request_times) if self.request_times else 0
# Calculate cache hit ratio
cache_total = self.cache_hits + self.cache_misses
cache_hit_ratio = self.cache_hits / cache_total if cache_total > 0 else 0
# Get top providers by usage
top_providers = sorted(
[(provider, tokens) for provider, tokens in self.provider_tokens.items()],
key=lambda x: x[1],
reverse=True
)[:5]
# Get top models by usage
top_models = sorted(
[(model, tokens) for model, tokens in self.model_tokens.items()],
key=lambda x: x[1],
reverse=True
)[:5]
# Get daily token usage for last 7 days
today = datetime.now().strftime("%Y-%m-%d")
daily_usage = []
for i in range(7):
day = (datetime.now() - timedelta(days=i)).strftime("%Y-%m-%d")
daily_usage.append((
day,
self.daily_tokens.get(day, 0),
self.daily_costs.get(day, 0.0),
self.daily_requests.get(day, 0)
))
# Compile stats
return {
"general": {
"uptime": uptime,
"uptime_human": self._format_duration(uptime),
"requests_total": self.requests_total,
"tokens_total": self.tokens_total,
"cost_total": self.cost_total,
"request_rate": request_rate,
"avg_response_time": avg_response_time,
"errors_total": self.errors_total,
"error_rate": self.errors_total / self.requests_total if self.requests_total > 0 else 0,
},
"providers": {
provider: {
"requests": count,
"tokens": self.provider_tokens.get(provider, 0),
"cost": self.provider_costs.get(provider, 0.0),
"avg_response_time": sum(self.request_times_by_provider.get(provider, [])) / len(self.request_times_by_provider.get(provider, [])) if self.request_times_by_provider.get(provider, []) else 0,
"errors": self.errors_by_provider.get(provider, 0),
}
for provider, count in self.provider_requests.items()
},
"models": {
model: {
"requests": count,
"tokens": self.model_tokens.get(model, 0),
"cost": self.model_costs.get(model, 0.0),
"avg_response_time": sum(self.request_times_by_model.get(model, [])) / len(self.request_times_by_model.get(model, [])) if self.request_times_by_model.get(model, []) else 0,
"errors": self.errors_by_model.get(model, 0),
}
for model, count in self.model_requests.items()
},
"cache": {
"hits": self.cache_hits,
"misses": self.cache_misses,
"hit_ratio": cache_hit_ratio,
"saved_cost": self.cache_saved_cost,
},
"top_providers": [
{
"provider": provider,
"tokens": tokens,
"percentage": tokens / self.tokens_total if self.tokens_total > 0 else 0,
}
for provider, tokens in top_providers
],
"top_models": [
{
"model": model,
"tokens": tokens,
"percentage": tokens / self.tokens_total if self.tokens_total > 0 else 0,
}
for model, tokens in top_models
],
"daily_usage": [
{
"date": date,
"tokens": tokens,
"cost": cost,
"requests": requests
}
for date, tokens, cost, requests in daily_usage
],
"today": {
"tokens": self.daily_tokens.get(today, 0),
"cost": self.daily_costs.get(today, 0.0),
"requests": self.daily_requests.get(today, 0)
}
}
def _format_duration(self, seconds: float) -> str:
"""Format duration in a human-readable format.
Args:
seconds: Duration in seconds
Returns:
Formatted duration
"""
if seconds < 60:
return f"{seconds:.1f} seconds"
elif seconds < 3600:
minutes = seconds / 60
return f"{minutes:.1f} minutes"
elif seconds < 86400:
hours = seconds / 3600
return f"{hours:.1f} hours"
else:
days = seconds / 86400
return f"{days:.1f} days"
def reset(self):
"""Reset all metrics."""
self._reset_metrics()
logger.info(
"Metrics reset",
emoji_key="analytics"
)
# Singleton instance getter
def get_metrics_tracker(
metrics_dir: Optional[Union[str, Path]] = None,
enable_prometheus: bool = False,
reset_on_start: bool = False
) -> MetricsTracker:
"""Get the metrics tracker singleton instance.
Args:
metrics_dir: Directory for metrics storage
enable_prometheus: Whether to enable Prometheus metrics
reset_on_start: Whether to reset metrics on startup
Returns:
MetricsTracker singleton instance
"""
return MetricsTracker(metrics_dir, enable_prometheus, reset_on_start)
```
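The class docstring above already covers the core recording API; the sketch below adds the optional Prometheus path, which the docstring does not show. It assumes `prometheus_client` is installed (so `PROMETHEUS_AVAILABLE` is true) and uses that library's standard `start_http_server` helper to expose the counters registered by `_init_prometheus_metrics`; the port, provider, and model values are arbitrary examples.

```python
# Sketch: exposing the tracker's Prometheus counters over HTTP.
# Assumes prometheus_client is installed; port and label values are arbitrary.
from prometheus_client import start_http_server

from ultimate_mcp_server.services.analytics.metrics import get_metrics_tracker

start_http_server(9100)  # scrape endpoint at http://localhost:9100/metrics

# Because the tracker is a singleton, enable_prometheus only takes effect on the
# first construction in the process.
metrics = get_metrics_tracker(enable_prometheus=True)

metrics.record_request(
    provider="openai",
    model="gpt-4o-mini",   # example label values
    input_tokens=200,
    output_tokens=350,
    cost=0.0012,
    duration=1.8,
)
metrics.record_cache_miss()

stats = metrics.get_stats()
print(f"Requests so far: {stats['general']['requests_total']}")
print(f"Cache hit ratio: {stats['cache']['hit_ratio']:.2%}")
```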
--------------------------------------------------------------------------------
/examples/web_automation_instruction_packs.py:
--------------------------------------------------------------------------------
```python
# examples/web_automation_instruction_packs.py
"""
Instruction Packs for the Abstract `find_and_download_pdfs` Tool.
This file contains pre-defined instruction dictionaries that configure the behavior
of the generic `find_and_download_pdfs` tool for specific tasks.
"""
# --- Instruction Pack 1: Academic Papers (arXiv) ---
ACADEMIC_PAPER_INSTRUCTIONS = {
"search_phase": {
"search_query_template": "site:arxiv.org {topic} filetype:pdf",
"target_site_identification_prompt": """Analyze the search results summary for "{search_term}".
Identify the URL that is most likely a direct PDF link (ending in .pdf) or a specific relevant paper's abstract page on arXiv.org. PRIORITIZE direct PDF links over abstract pages. Always choose direct PDF links when available.
Search Results Summary:
---
{search_results_summary}
---
Respond ONLY with a valid JSON object: {{"target_url": "URL_or_null"}}.""",
"search_engine": "google", # Changed from duckduckgo to google which is more stable
"num_search_results_per_query": 8 # Increased number of results to find more PDFs
},
"exploration_phase": {
"exploration_goal_prompt": """You are an AI assistant tasked with finding and downloading PDF research papers from arXiv related to '{topic}'.
Your goal is to find PDF download links for relevant papers. Look for links labeled 'PDF', 'Download PDF', or links ending in .pdf within the page.
IMPORTANT: When you find a PDF link, you MUST use the "download_pdf" action with the full PDF URL to download it. Do not try to summarize the content - downloading the PDF is the primary goal.
Please follow these guidelines:
1. If the current page is a direct PDF or has PDF in the URL, use "download_pdf" action immediately
2. If you're on an abstract page, look for PDF links and use "download_pdf" action
3. If you're on a search results page, look for relevant paper links and click them
4. Use "scroll" action to see more results if needed
5. If you can't find relevant papers after multiple steps, use "goal_impossible"
6. If you successfully download at least one PDF, use "goal_achieved"
Remember: Your PRIMARY goal is to DOWNLOAD PDFs using the "download_pdf" action, not just navigate or summarize.""",
"navigation_keywords": ["abstract", "pdf", "view", "download", "related", "version", "year", "author", "subject", "search"],
"pdf_keywords": ["pdf", "download pdf", "download"],
"pdf_url_patterns": [r'/pdf/\d+\.\d+v?\d*', r'\.pdf$'], # Updated arXiv pattern
"max_steps": 10,
"valid_actions": ["click", "scroll", "download_pdf", "go_back", "goal_achieved", "goal_impossible"]
},
"download_phase": {
"metadata_extraction_prompt": """Based on the context below (URL, surrounding text/elements, often from an arXiv abstract page) for the PDF link below, extract the paper's TITLE, the primary AUTHOR's last name, and the YEAR of publication (YYYY).
Context:
---
{context}
---
Respond ONLY with a valid JSON object: {{"title": "...", "author_lastname": "...", "year": "YYYY"}}. Use "Unknown" or the current year if a value cannot be found.""",
"filename_template": "{year}_{author_lastname}_{topic}_{title}",
"required_metadata": ["title", "author_lastname", "year"]
}
}
# --- Instruction Pack 2: Government Reports ---
GOVERNMENT_REPORT_INSTRUCTIONS = {
"search_phase": {
"search_query_template": '"{topic}" official report site:gov.uk OR site:*.gov OR site:*.gov.au OR site:*.gc.ca', # Added more gov domains
"target_site_identification_prompt": """Analyze the search results summary for "{search_term}".
Identify the single most promising URL pointing to an official government webpage (e.g., *.gov.uk, *.gov, *.gc.ca, *.gov.au) or official agency site likely hosting the definitive report or publication page for the topic. Avoid news articles or commentary sites.
Search Results Summary:
---
{search_results_summary}
---
Respond ONLY with a valid JSON object: {{"target_url": "URL_or_null"}}.""",
"search_engine": "google"
},
"exploration_phase": {
"exploration_goal_prompt": "Explore the official government website related to '{topic}' to find and download the primary official report(s) or policy document(s) in PDF format.",
"navigation_keywords": ["publication", "report", "document", "research", "policy", "consultation", "guidance", "download", "library", "archive", "statistics", "data"],
"pdf_keywords": ["pdf", "download", "full report", "final report", "publication", "document", "read", "view", "annex", "appendix", "data"],
"pdf_url_patterns": [r'\.pdf(\?|$)', r'/assets/', r'/download/', r'/file/', r'/media/'],
"max_steps": 15
},
"download_phase": {
"metadata_extraction_prompt": """Based on the context (URL, surrounding text/elements) for the government document PDF link below, determine the PUBLICATION_DATE (format YYYY-MM-DD, or YYYY-MM, or just YYYY if only year is available) and a concise DOCUMENT_TYPE (e.g., 'Policy Paper', 'Impact Assessment', 'Consultation Response', 'Research Report', 'Official Guidance', 'Statistics Release').
Context:
---
{context}
---
Respond ONLY with a valid JSON object: {{"date": "...", "document_type": "..."}}. Use best guess (e.g., {datetime.now().strftime('%Y-%m-%d')}, 'Report') if a value cannot be reliably determined.""",
"filename_template": "{date}_GovReport_{topic}_{document_type}",
"required_metadata": ["date", "document_type"]
}
}
# --- Instruction Pack 3: Product Manuals/Datasheets ---
PRODUCT_MANUAL_INSTRUCTIONS = {
"search_phase": {
"search_query_template": "{topic} official manual OR datasheet OR support download PDF", # Broadened query slightly
"target_site_identification_prompt": """Analyze the search results summary for "{search_term}".
Identify the single URL most likely leading to the official manufacturer's product support, downloads, or manual page for the specified product. Prioritize the manufacturer's own domain. Avoid retailer sites (like Amazon, BestBuy) or general review sites.
Search Results Summary:
---
{search_results_summary}
---
Respond ONLY with a valid JSON object: {{"target_url": "URL_or_null"}}.""",
"search_engine": "google"
},
"exploration_phase": {
"exploration_goal_prompt": "Explore the manufacturer's website for the product '{topic}' to find the primary user manual, user guide, or technical datasheet available as a PDF download. Look in 'Support', 'Downloads', or 'Documentation' sections.",
"navigation_keywords": ["support", "download", "manual", "documentation", "guide", "specification", "datasheet", "product", "resource", "driver", "software", "firmware"],
"pdf_keywords": ["manual", "guide", "datasheet", "specification", "pdf", "download", "instructions", "service manual", "user guide"],
"pdf_url_patterns": [r'manual.*\.pdf', r'datasheet.*\.pdf', r'\.pdf(\?|$)', r'guide.*\.pdf', r'spec.*\.pdf'],
"max_steps": 12
},
"download_phase": {
"metadata_extraction_prompt": """Based on the context (URL, link text, surrounding elements) for the PDF link below, determine the DOCUMENT_TYPE (e.g., 'User Manual', 'Quick Start Guide', 'Datasheet', 'Specifications', 'Service Manual') and LANGUAGE (e.g., 'EN', 'DE', 'FR', 'Multi', if obvious, otherwise default to 'EN').
Context:
---
{context}
---
Respond ONLY with a valid JSON object: {{"document_type": "...", "language": "..."}}. Use 'Manual' and 'EN' as defaults if unsure.""",
"filename_template": "{topic}_{document_type}_{language}",
"required_metadata": ["document_type"] # Language is helpful but not strictly required
}
}
# --- Instruction Pack 4: Finding Specific Legal Documents (Example - Requires careful prompting) ---
# NOTE: Legal document searches can be complex due to jurisdiction, specific courts, etc.
# This is a simplified example.
LEGAL_DOCUMENT_INSTRUCTIONS = {
"search_phase": {
"search_query_template": '"{topic}" court filing OR legal document OR case text PDF',
"target_site_identification_prompt": """Analyze the search results summary for "{search_term}".
Identify a URL likely pointing to an official court website (e.g., *.uscourts.gov), legal repository (like CourtListener, RECAP), or official government archive hosting the specific legal case document or docket. Avoid news summaries or law firm analyses unless they directly link to the official document PDF.
Search Results Summary:
---
{search_results_summary}
---
Respond ONLY with JSON: {{"target_url": "URL_or_null"}}.""",
"search_engine": "google"
},
"exploration_phase": {
"exploration_goal_prompt": "Explore the legal resource website for '{topic}'. Identify and download the relevant court filing, judgment, or legal document PDF.",
"navigation_keywords": ["document", "filing", "opinion", "judgment", "docket", "case", "pdf", "download", "view", "attachment", "exhibit"],
"pdf_keywords": ["document", "filing", "opinion", "judgment", "pdf", "download", "attachment", "exhibit", "order"],
"pdf_url_patterns": [r'\.pdf(\?|$)', r'/downloadDoc', r'/viewDoc'],
"max_steps": 15
},
"download_phase": {
"metadata_extraction_prompt": """Based on the context for the legal document PDF link below, extract the approximate FILING_DATE (YYYY-MM-DD or YYYY) and a short DOCUMENT_CATEGORY (e.g., 'Complaint', 'Motion', 'Opinion', 'Judgment', 'Order', 'Exhibit').
Context:
---
{context}
---
Respond ONLY with JSON: {{"date": "...", "document_category": "..."}}. Use current date or 'Filing' if unknown.""",
"filename_template": "{date}_{topic}_{document_category}",
"required_metadata": ["date", "document_category"]
}
}
# ____________________________________________________________________________________________________________________________________________________________________________________________
# --- Instruction Pack 5: Simple Search Summary ---
SIMPLE_SEARCH_SUMMARY_INSTRUCTIONS = {
"search_params": {
"engines": ["google", "duckduckgo"], # Which engines to use
"num_results_per_engine": 3 # How many results to fetch from each
},
# Prompt for the LLM that will summarize each page's content
"summarization_prompt": """Concisely summarize the key information from the following web page content, focusing on its relevance to the search query '{query}'. Output a brief 2-3 sentence summary only.
Page Content:
---
{page_content}
---
Concise Summary:""",
# Optional: Add filters if needed
# "url_filter_keywords": [], # e.g., ["blog", "news"] to only summarize blog/news
# "min_content_length_for_summary": 150 # e.g., skip very short pages
}
# --- Instruction Pack 6: Technical Search Summary ---
TECHNICAL_SEARCH_SUMMARY_INSTRUCTIONS = {
"search_params": {
"engines": ["google", "bing"], # Maybe Bing is better for some technical queries
"num_results_per_engine": 5 # Get more results for technical topics
},
"summarization_prompt": """Analyze the following web page content related to the technical search query '{query}'. Extract and summarize the core technical concepts, definitions, or conclusions presented. Focus on accuracy and specific details if available. Keep the summary to 3-4 sentences.
Page Content:
---
{page_content}
---
Technical Summary:""",
"url_filter_keywords": ["docs", "tutorial", "research", "arxiv", "github", "developer"], # Prioritize technical sources
"min_content_length_for_summary": 300 # Expect longer content
}
# ____________________________________________________________________________________________________________________________________________________________________________________________
# --- Instruction Pack 7: Extract Job Posting Details ---
JOB_POSTING_EXTRACTION_INSTRUCTIONS = {
"data_source": {
"source_type": "dynamic_crawl", # Find URLs by crawling
"crawl_config": {
"start_url": "https://www.google.com/search?q=software+engineer+jobs+remote", # Example search
"list_item_selector": "a[href*='/jobs/']", # Adjust selector based on actual job board/search results
"next_page_selector": "#pnnext", # Google's next page link ID (may change)
"max_pages_to_crawl": 3, # Limit crawl depth
"max_urls_limit": 20 # Limit total jobs to process
}
# Alternatively, provide a list directly:
# "source_type": "list",
# "urls": ["https://example-job-board.com/job/123", "https://example-job-board.com/job/456"]
},
"extraction_details": {
# Prompt asking LLM to extract specific fields
"schema_or_prompt": """From the provided job posting web page content, extract the following details:
- job_title: The official title of the position.
- company_name: The name of the hiring company.
- location: The primary location(s) listed (e.g., "Remote", "New York, NY").
- salary_range: Any mentioned salary range or compensation details (e.g., "$120k - $150k", "Competitive").
- key_skills: A list of the top 3-5 required technical skills or qualifications mentioned.
Web Page Content Context:
---
{page_content}
---
Respond ONLY with a valid JSON object containing these keys. If a field is not found, use null or an empty list for key_skills.""",
"extraction_llm_model": "openai/gpt-4.1-mini" # Specify model for extraction
},
"output_config": {
"format": "json_list", # Output as a list of JSON objects
"error_handling": "include_error" # Include URLs that failed in the errors dict
}
}
# --- Instruction Pack 8: Extract Product Details (Schema Example) ---
ECOMMERCE_PRODUCT_EXTRACTION_INSTRUCTIONS = {
"data_source": {
"source_type": "list",
# URLs would be provided by the calling code/agent based on what products to check
"urls": [
# Example URLs (replace with actual ones for testing)
# "https://www.amazon.com/dp/B08H75RTZ8/", # Example Kindle Paperwhite
# "https://www.bestbuy.com/site/sony-wh1000xm5-wireless-noise-cancelling-over-the-ear-headphones-black/6505725.p?skuId=6505725"
]
},
"extraction_details": {
# Using a JSON schema to define desired output
"schema_or_prompt": {
"type": "object",
"properties": {
"product_name": {"type": "string", "description": "The main name or title of the product."},
"price": {"type": "string", "description": "The current listed price, including currency symbol (e.g., '$149.99')."},
"rating": {"type": "number", "description": "The average customer rating (e.g., 4.7). Null if not found."},
"num_reviews": {"type": "integer", "description": "The total number of customer reviews. Null if not found."},
"availability": {"type": "string", "description": "Stock status (e.g., 'In Stock', 'Out of Stock', 'Available for Pre-order')."}
},
"required": ["product_name", "price", "availability"]
},
"extraction_llm_model": "openai/gpt-4.1-mini" # Use a capable model
},
"output_config": {
"format": "csv_string", # Output as CSV text
"error_handling": "skip" # Skip pages that fail
}
}
# ____________________________________________________________________________________________________________________________________________________________________________________________
# --- Instruction Pack 9: Login and Check Order Status ---
ORDER_STATUS_WORKFLOW_INSTRUCTIONS = {
"start_url": "https://the-internet.herokuapp.com/login", # Example login page
"workflow_goal_prompt": "Log in using the provided 'username' and 'password', navigate to the secure area, and read the text content of the success message banner.",
"available_actions": ["type", "click", "read_value", "finish_success", "finish_failure"],
"llm_model": "openai/gpt-4.1-mini", # Model for guidance
"max_steps": 8,
"input_data_mapping": { # Maps abstract names to keys in input_data passed to the tool
"user": "username",
"pass": "password",
},
"element_finding_hints": ["username field", "password field", "login button", "success message", "logout link"],
# success_condition_prompt could be added for more complex checks
# step_prompts are likely not needed for this simple login example
}
# --- Instruction Pack 10: Submit a Simple Contact Form ---
CONTACT_FORM_WORKFLOW_INSTRUCTIONS = {
"start_url": "https://www.selenium.dev/selenium/web/web-form.html", # Example form page
"workflow_goal_prompt": "Fill out the web form using the provided 'name', 'email', and 'message'. Then click the submit button and confirm submission by checking if the page title changes to 'Web form processed'.",
"available_actions": ["type", "click", "finish_success", "finish_failure"],
"llm_model": "openai/gpt-4.1-mini",
"max_steps": 10,
"input_data_mapping": {
"contact_name": "name",
"contact_email": "email", # Assuming input_data has key "email"
"contact_message": "message"
},
"element_finding_hints": ["text input field (my-text)", "password input (my-password)", "textarea (my-textarea)", "submit button"],
# This workflow implicitly checks success via title change, but an explicit prompt could be added:
# "success_condition_prompt": "Does the current page title indicate the form was processed successfully (e.g., contains 'processed')?"
}
# ____________________________________________________________________________________________________________________________________________________________________________________________
# --- Instruction Pack 11: Monitor Product Price and Availability ---
PRODUCT_MONITORING_INSTRUCTIONS = {
"monitoring_targets": [
{
"url": "https://www.bestbuy.com/site/sony-wh1000xm5-wireless-noise-cancelling-over-the-ear-headphones-black/6505725.p?skuId=6505725", # Example URL
"data_points": [
{
"name": "price",
"identifier": ".priceView-hero-price span[aria-hidden='true']", # CSS selector for price element (INSPECT CAREFULLY!)
"extraction_method": "selector",
"condition": "changed" # Alert if price changes from previous_values
},
{
"name": "availability",
"identifier": "button[data-button-state='ADD_TO_CART']", # Selector for Add to Cart button
"extraction_method": "selector", # We just check existence/text
# Condition check via LLM
"condition": "llm_eval",
"llm_condition_prompt": "Based on the extracted text/presence of the element ('Current Value'), is the product currently available for purchase? Respond {\"condition_met\": true} if available, {\"condition_met\": false} otherwise."
# If extraction returns text like "Add to Cart", LLM should say true. If "Sold Out" or None, should say false.
},
{
"name": "product_title", # Example LLM extraction
"identifier": "Extract the main product title from the page content.",
"extraction_method": "llm",
"condition": "contains", # Check if title contains expected keyword
"condition_value": "WH-1000XM5"
}
]
},
# Add more target product URLs here...
# { "url": "https://...", "data_points": [...] }
],
"llm_config": {
"model": "openai/gpt-4.1-mini" # Model for LLM extraction/evaluation
},
"concurrency": {
"max_concurrent_pages": 2 # Limit concurrency for scraping politeness
},
"browser_options": {
"headless": True
}
}
# --- Instruction Pack 12: Monitor Website Content Section ---
WEBSITE_SECTION_MONITORING_INSTRUCTIONS = {
"monitoring_targets": [
{
"url": "https://news.google.com/topics/CAAqJggKIiBDQkFTRWdvSUwyMHZNRGx6TVdZU0FtVnVHZ0pKVGlnQVAB?hl=en-US&gl=US&ceid=US%3Aen", # Example Google News AI section
"data_points": [
{
"name": "top_headline_text",
"identifier": "h3 > a.gPFEn", # Selector for top headline link text (INSPECT CAREFULLY!)
"extraction_method": "selector",
"condition": "changed" # Alert if the top headline changes
},
{
"name": "second_headline_text",
"identifier": "article:nth-of-type(2) h3 > a.gPFEn", # Selector for second headline
"extraction_method": "selector",
"condition": "changed"
}
]
}
],
"llm_config": {
"model": "openai/gpt-4.1-mini" # Not strictly needed if only using selectors
},
"concurrency": { "max_concurrent_pages": 3 },
"browser_options": { "headless": True }
}
# ____________________________________________________________________________________________________________________________________________________________________________________________
# --- Instruction Pack 13: Market Trend Summary ---
MARKET_TREND_RESEARCH_INSTRUCTIONS = {
"research_goal_prompt": "Generate a brief summary of the current market trends for {topic}, based on recent news articles and analysis reports.",
"search_phase": {
"search_queries": [
"{topic} market trends 2024",
"latest news {topic} industry",
"{topic} market analysis report"
],
"search_engine": "google",
"num_search_results_per_query": 5 # Get a few results per query
},
"site_selection_phase": {
# Prompt to select relevant news/analysis sites
"selection_prompt": """From the search results for '{topic}', select up to {max_urls} URLs that appear to be recent (within the last year if possible) news articles, market analysis reports, or reputable industry blogs discussing market trends. Avoid forum discussions, product pages, or very old content.
Search Results Context:
---
{search_results_context}
---
Respond ONLY with JSON: {{"selected_urls": ["url1", ...]}}""",
"max_sites_to_visit": 5 # Limit how many articles are processed
},
"extraction_phase": {
# Prompt to extract key points related to trends
"extraction_prompt_or_schema": """Extract the main points, key findings, or trend descriptions related to '{topic}' from the provided web page content. Focus on statements about market direction, growth, challenges, or notable events. Output as a JSON object with a key "key_findings" containing a list of strings (each string is a finding/point).
Web Page Content Context:
---
{page_content}
---
Extracted JSON Data:""",
"extraction_llm_model": "openai/gpt-4.1-mini" # Model for extraction
},
"synthesis_phase": {
# Prompt to synthesize the findings into a paragraph
"synthesis_prompt": """Based on the extracted key findings below regarding '{topic}', write a concise paragraph (3-5 sentences) summarizing the major market trends discussed.
Extracted Information Context:
---
{extracted_information_context}
---
Synthesized Market Trend Summary:""",
"synthesis_llm_model": "openai/gpt-4.1-mini", # Model for synthesis
"report_format_description": "A single paragraph summarizing market trends."
}
}
# --- Instruction Pack 14: Competitive Analysis Snippets ---
COMPETITIVE_ANALYSIS_INSTRUCTIONS = {
"research_goal_prompt": "Gather brief summaries of direct competitors mentioned for the product/service '{topic}' from recent reviews or comparison articles.",
"search_phase": {
"search_queries": [
"{topic} vs competitors",
"{topic} alternatives review",
"comparison {topic}"
],
"search_engine": "google",
"num_search_results_per_query": 8
},
"site_selection_phase": {
"selection_prompt": """From the search results for '{topic}', select up to {max_urls} URLs that seem to be review sites, comparison articles, or tech news discussing competitors or alternatives to {topic}. Prioritize recent results if possible.
Search Results Context:
---
{search_results_context}
---
Respond ONLY with JSON: {{"selected_urls": ["url1", ...]}}""",
"max_sites_to_visit": 4
},
"extraction_phase": {
"extraction_prompt_or_schema": """Identify any direct competitors to '{topic}' mentioned in the provided web page content. For each competitor found, extract its NAME and a brief (1-sentence) summary of how it's compared to {topic} or its key differentiator mentioned.
Web Page Content Context:
---
{page_content}
---
Respond ONLY with a valid JSON object with a key "competitors", where the value is a list of objects, each like {{"name": "...", "comparison_summary": "..."}}. If no competitors are mentioned, return {{"competitors": []}}.""",
"extraction_llm_model": "openai/gpt-4.1-mini"
},
"synthesis_phase": {
"synthesis_prompt": """Consolidate the extracted competitor information related to '{topic}' into a markdown list. For each competitor found across the sources, list its name and a bullet point summary of the comparison points mentioned. Group findings by competitor name.
Extracted Information Context:
---
{extracted_information_context}
---
Consolidated Competitor Markdown List:""",
"synthesis_llm_model": "openai/gpt-4.1-mini",
"report_format_description": "A markdown list summarizing mentioned competitors and comparison points."
}
}
```
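The double-brace escaping in these prompt templates (e.g. `{{"target_url": "URL_or_null"}}`) indicates they are meant to be rendered with `str.format`. The short sketch below shows that rendering for the academic-paper pack; the topic and the search-results summary are made-up inputs, and the `find_and_download_pdfs` tool that ultimately consumes these packs is not defined on this page, so no call to it is attempted.

```python
# Sketch: rendering one pack's prompt templates with str.format, which the {{ }}
# escaping above implies. Assumes the repository root is on sys.path so the
# examples package (it ships an __init__.py) is importable; inputs are made up.
from examples.web_automation_instruction_packs import ACADEMIC_PAPER_INSTRUCTIONS

search_phase = ACADEMIC_PAPER_INSTRUCTIONS["search_phase"]

query = search_phase["search_query_template"].format(topic="sparse attention transformers")
prompt = search_phase["target_site_identification_prompt"].format(
    search_term=query,
    search_results_summary=(
        "1. https://arxiv.org/abs/<id> - abstract page\n"
        "2. https://arxiv.org/pdf/<id> - direct PDF"
    ),
)

print(query)   # site:arxiv.org sparse attention transformers filetype:pdf
print(prompt)  # ends with: Respond ONLY with a valid JSON object: {"target_url": "URL_or_null"}.
```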
--------------------------------------------------------------------------------
/examples/workflow_delegation_demo.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
"""Workflow delegation example using Ultimate MCP Server."""
import asyncio
import json
import sys
import time
from collections import namedtuple # Import namedtuple
from pathlib import Path
from typing import Any, Dict, List, Optional
# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))
from fastmcp import FastMCP
from rich import box
from rich.markup import escape
from rich.panel import Panel
from rich.rule import Rule
from rich.syntax import Syntax
from rich.table import Table
from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.providers.base import get_provider
from ultimate_mcp_server.core.server import Gateway
from ultimate_mcp_server.exceptions import ToolExecutionError
from ultimate_mcp_server.utils import get_logger, process_mcp_result
# --- Add Display Utils Import ---
from ultimate_mcp_server.utils.display import CostTracker, _display_stats # Import CostTracker
# --- Add Rich Imports ---
from ultimate_mcp_server.utils.logging.console import console
# --- Import Tools Needed ---
# Import tool functions directly if not registering them all
# from ultimate_mcp_server.tools.optimization import recommend_model, execute_optimized_workflow # No, call via MCP
# from ultimate_mcp_server.tools.completion import generate_completion # Call via MCP
# -------------------------
# Initialize logger
logger = get_logger("example.workflow_delegation")
# Create a simple structure for cost tracking from dict
TrackableResult = namedtuple("TrackableResult", ["cost", "input_tokens", "output_tokens", "provider", "model", "processing_time"])
# Initialize FastMCP server
mcp = FastMCP("Workflow Delegation Demo")
# Initialize providers via the Gateway and warn if any required ones are missing
async def initialize_providers():
logger.info("Initializing required providers...", emoji_key="provider")
# Initialize gateway to let it handle provider initialization
gateway = Gateway("workflow-delegation-demo", register_tools=False)
await gateway._initialize_providers()
# Check if we have the necessary providers initialized
required_providers = ["openai", "anthropic", "gemini"]
missing_providers = []
for provider_name in required_providers:
try:
provider = await get_provider(provider_name)
if provider:
logger.info(f"Provider {provider_name} is available", emoji_key="success")
else:
missing_providers.append(provider_name)
except Exception:
missing_providers.append(provider_name)
if missing_providers:
logger.warning(f"Missing providers: {', '.join(missing_providers)}. Some demos might fail.", emoji_key="warning")
console.print(f"[yellow]Warning:[/yellow] Missing providers: {', '.join(missing_providers)}")
else:
logger.info("All required providers are available", emoji_key="success")
# Keep execute_workflow as a locally defined tool demonstrating the concept
@mcp.tool()
async def execute_workflow(
workflow_steps: List[Dict[str, Any]],
initial_input: Optional[str] = None, # Make initial_input optional
max_concurrency: int = 1, # Keep concurrency, though sequential for demo
ctx = None # Keep ctx for potential use by called tools
) -> Dict[str, Any]:
"""Execute a multi-step workflow by calling registered project tools."""
start_time = time.time()
total_cost = 0.0
step_results: Dict[str, Any] = {} # Store results keyed by step_id
# Mapping from simple operation names to actual tool names
operation_to_tool_map = {
"summarize": "summarize_document",
"extract_entities": "extract_entities",
"generate_questions": "generate_qa_pairs", # Correct tool name
"chunk": "chunk_document",
# Add mappings for other tools as needed
"completion": "generate_completion",
"chat": "chat_completion",
"retrieve": "retrieve_context",
"rag_generate": "generate_with_rag",
}
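    # Note: per-step "provider"/"model" keys (as used in run_workflow_demo's workflow definition)
    # are not forwarded by this demo implementation; the called tools fall back to whatever
    # defaults they define.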
current_input_value = initial_input
logger.info(f"Starting workflow execution with {len(workflow_steps)} steps.")
for i, step in enumerate(workflow_steps):
step_id = step.get("id")
operation = step.get("operation")
tool_name = operation_to_tool_map.get(operation)
parameters = step.get("parameters", {}).copy() # Get parameters
input_from_step = step.get("input_from") # ID of previous step for input
output_as = step.get("output_as", step_id) # Key to store output under
if not step_id:
raise ValueError(f"Workflow step {i} is missing required 'id' key.")
if not tool_name:
raise ValueError(f"Unsupported operation '{operation}' in workflow step '{step_id}'. Mapped tool name not found.")
logger.info(f"Executing workflow step {i+1}/{len(workflow_steps)}: ID='{step_id}', Tool='{tool_name}'")
# Resolve input: Use previous step output or initial input
step_input_data = None
if input_from_step:
if input_from_step not in step_results:
raise ValueError(f"Input for step '{step_id}' requires output from '{input_from_step}', which has not run or failed.")
# Decide which part of the previous result to use
# This needs a more robust mechanism (e.g., specifying the key)
# For now, assume the primary output is needed (e.g., 'text', 'summary', 'chunks', etc.)
prev_result = step_results[input_from_step]
# Simple logic: look for common output keys
if isinstance(prev_result, dict):
if 'summary' in prev_result:
step_input_data = prev_result['summary']
elif 'text' in prev_result:
step_input_data = prev_result['text']
elif 'chunks' in prev_result:
step_input_data = prev_result['chunks'] # May need specific handling
elif 'result' in prev_result:
step_input_data = prev_result['result'] # From DocumentResponse
else:
step_input_data = prev_result # Pass the whole dict?
else:
step_input_data = prev_result # Pass raw output
logger.debug(f"Using output from step '{input_from_step}' as input.")
else:
step_input_data = current_input_value # Use input from previous step or initial
logger.debug("Using input from previous step/initial input.")
# --- Construct parameters for the target tool ---
# This needs mapping based on the target tool's expected signature
# Example: If tool is 'summarize_document', map step_input_data to 'document' param
if tool_name == "summarize_document" and isinstance(step_input_data, str):
parameters["document"] = step_input_data
elif tool_name == "extract_entities" and isinstance(step_input_data, str):
parameters["document"] = step_input_data
# Ensure entity_types is a list
if "entity_types" not in parameters or not isinstance(parameters["entity_types"], list):
parameters["entity_types"] = ["organization", "person", "concept"] # Default
elif tool_name == "generate_qa_pairs" and isinstance(step_input_data, str):
parameters["document"] = step_input_data
parameters["num_pairs"] = parameters.get("num_questions") or 5 # Map parameter name
elif tool_name in ["generate_completion", "chat_completion"] and isinstance(step_input_data, str):
if "prompt" not in parameters:
parameters["prompt"] = step_input_data # Assume input is the prompt if not specified
# Add more mappings as needed for other tools...
        else:
            # Fallback: no explicit input mapping exists for this tool, so step_input_data is NOT
            # injected into the parameters automatically. The step's own 'parameters' must already
            # contain everything the tool needs; a future 'input_arg_name' field on the step
            # definition could make this mapping generic.
            logger.warning(
                f"Input mapping for tool '{tool_name}' not explicitly defined; "
                "step input will not be passed automatically."
            )
# --- Call the actual tool via MCP ---
try:
logger.debug(f"Calling tool '{tool_name}' with params: {parameters}")
tool_result = await mcp.call_tool(tool_name, parameters)
# Process result to handle potential list format from MCP
step_output = process_mcp_result(tool_result)
logger.debug(f"Tool '{tool_name}' returned: {step_output}")
if isinstance(step_output, dict) and step_output.get("error"):
raise ToolExecutionError(f"Tool '{tool_name}' failed: {step_output['error']}")
# Store the successful result
step_results[output_as] = step_output
# Update current_input_value for the next step (assuming primary output is desired)
# This logic might need refinement based on tool outputs
if isinstance(step_output, dict):
current_input_value = step_output.get("text") or step_output.get("summary") or step_output.get("result") or step_output
else:
current_input_value = step_output
# Accumulate cost if available
if isinstance(step_output, dict) and "cost" in step_output:
total_cost += float(step_output["cost"])
except Exception as e:
logger.error(f"Error executing step '{step_id}' (Tool: {tool_name}): {e}", exc_info=True)
# Propagate exception to fail the workflow
raise ToolExecutionError(f"Workflow failed at step '{step_id}': {e}") from e
# Workflow completed successfully
processing_time = time.time() - start_time
logger.success(f"Workflow completed successfully in {processing_time:.2f}s")
return {
"outputs": step_results,
"processing_time": processing_time,
"total_cost": total_cost,
"success": True # Indicate overall success
}
# Enhanced display function for workflow demos
def display_workflow_result(title: str, result: Any):
"""Display workflow result with consistent formatting."""
console.print(Rule(f"[bold blue]{escape(title)}[/bold blue]"))
# Process result to handle list or dict format
result = process_mcp_result(result)
# Display outputs if present
if "outputs" in result and result["outputs"]:
for output_name, output_text in result["outputs"].items():
console.print(Panel(
escape(str(output_text).strip()),
title=f"[bold magenta]Output: {escape(output_name)}[/bold magenta]",
border_style="magenta",
expand=False
))
elif "text" in result:
# Display single text output if there's no outputs dictionary
console.print(Panel(
escape(result["text"].strip()),
title="[bold magenta]Result[/bold magenta]",
border_style="magenta",
expand=False
))
# Display execution stats
_display_stats(result, console)
# Enhanced display function for task analysis
def display_task_analysis(title: str, result: Any):
"""Display task analysis result with consistent formatting."""
console.print(Rule(f"[bold blue]{escape(title)}[/bold blue]"))
# Process result to handle list or dict format
result = process_mcp_result(result)
# Display task type and features
analysis_table = Table(box=box.SIMPLE, show_header=False)
analysis_table.add_column("Metric", style="cyan")
analysis_table.add_column("Value", style="white")
analysis_table.add_row("Task Type", escape(result.get("task_type", "N/A")))
analysis_table.add_row("Required Features", escape(str(result.get("required_features", []))))
console.print(analysis_table)
# Display features explanation
if "features_explanation" in result:
console.print(Panel(
escape(result["features_explanation"]),
title="[bold]Features Explanation[/bold]",
border_style="dim blue",
expand=False
))
# Display recommendations
if "recommendations" in result and result["recommendations"]:
rec_table = Table(title="[bold]Model Recommendations[/bold]", box=box.ROUNDED, show_header=True)
rec_table.add_column("Provider", style="magenta")
rec_table.add_column("Model", style="blue")
rec_table.add_column("Explanation", style="white")
for rec in result["recommendations"]:
rec_table.add_row(
escape(rec.get("provider", "N/A")),
escape(rec.get("model", "N/A")),
escape(rec.get("explanation", "N/A"))
)
console.print(rec_table)
# Display execution stats
_display_stats(result, console)
# Move _get_provider_for_model above run_delegate_task_demo
def _get_provider_for_model(model_name: str) -> str:
"""Helper to determine provider from model name."""
# Accept both 'provider/model' and legacy short names
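    # Examples, consistent with the branches below (model names taken from this demo):
    #   "gemini/gemini-2.0-flash"    -> "gemini"      (provider/model form)
    #   "gpt-4.1-mini"               -> "openai"      (legacy short name)
    #   "claude-3-5-haiku-20241022"  -> "anthropic"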
model_lower = model_name.lower()
if '/' in model_lower:
# e.g., 'gemini/gemini-2.0-flash' or 'anthropic/claude-3-7-sonnet-20250219'
return model_lower.split('/')[0]
elif ':' in model_lower:
return model_lower.split(':')[0]
elif model_lower.startswith("gpt-"):
return Provider.OPENAI.value
elif model_lower.startswith("claude-"):
return Provider.ANTHROPIC.value
elif model_lower.startswith("gemini-"):
return Provider.GEMINI.value
elif model_lower.startswith("deepseek-"):
return "deepseek"
elif model_lower.startswith("grok-"):
return "grok"
elif model_lower.startswith("o1-") or model_lower.startswith("o3-"):
return Provider.OPENAI.value
else:
raise ValueError(f"Unknown model prefix for model: {model_name}")
# --- Demo Functions ---
async def run_analyze_task_demo():
"""Demonstrate the analyze_task tool."""
console.print(Rule("[bold blue]Analyze Task Demo[/bold blue]"))
logger.info("Running analyze_task demo...", emoji_key="start")
task_description = "Summarize the provided technical document about AI advancements and extract key entities."
console.print(f"[cyan]Task Description:[/cyan] {escape(task_description)}")
try:
# Call the real recommend_model tool
# Need to estimate input/output length for recommend_model
# Rough estimate for demo purposes
input_len_chars = len(task_description) * 10 # Assume task needs more context
output_len_chars = 200 # Estimate output size
result = await mcp.call_tool("recommend_model", {
"task_type": "summarization", # Added to match required argument
"expected_input_length": input_len_chars,
"expected_output_length": output_len_chars,
# Can add other recommend_model params like required_capabilities, max_cost
})
# Use enhanced display function
display_task_analysis("Analysis Results", result)
except Exception as e:
logger.error(f"Error in analyze_task demo: {e}", emoji_key="error", exc_info=True)
console.print(f"[bold red]Error:[/bold red] {escape(str(e))}")
console.print()
async def run_delegate_task_demo(tracker: CostTracker): # Add tracker
"""Demonstrate the delegate_task tool."""
console.print(Rule("[bold blue]Delegate Task Demo[/bold blue]"))
logger.info("Running task delegation demo (using recommend_model + completion)...", emoji_key="start")
task_description = "Generate a short marketing blurb for a new AI-powered writing assistant."
prompt = "Write a catchy, 2-sentence marketing blurb for 'AI Writer Pro', a tool that helps users write faster and better."
console.print(f"[cyan]Task Description:[/cyan] {escape(task_description)}")
console.print(f"[cyan]Prompt:[/cyan] {escape(prompt)}")
priorities = ["balanced", "cost", "quality"]
for priority in priorities:
console.print(Rule(f"[yellow]Delegating with Priority: {priority}[/yellow]"))
logger.info(f"Delegating task with priority: {priority}", emoji_key="processing")
try:
# 1. Get recommendation
recommendation_result_raw = await mcp.call_tool("recommend_model", {
"task_type": "creative_writing", # Infer task type
"expected_input_length": len(prompt),
"expected_output_length": 100, # Estimate blurb length
"priority": priority
})
recommendation_result = process_mcp_result(recommendation_result_raw)
if "error" in recommendation_result or not recommendation_result.get("recommendations"):
logger.error(f"Could not get recommendation for priority '{priority}'.")
console.print(f"[red]Error getting recommendation for '{priority}'.[/red]")
continue
# 2. Execute with recommended model
top_rec = recommendation_result["recommendations"][0]
rec_provider = _get_provider_for_model(top_rec["model"])
rec_model = top_rec["model"]
logger.info(f"Recommendation for '{priority}': Use {rec_provider}/{rec_model}")
# Call generate_completion tool
completion_result_raw = await mcp.call_tool("generate_completion", {
"prompt": prompt,
"provider": rec_provider,
"model": rec_model,
"max_tokens": 100
})
# Track cost if possible
completion_result = process_mcp_result(completion_result_raw)
if isinstance(completion_result, dict) and all(k in completion_result for k in ["cost", "provider", "model"]) and "tokens" in completion_result:
try:
trackable = TrackableResult(
cost=completion_result.get("cost", 0.0),
input_tokens=completion_result.get("tokens", {}).get("input", 0),
output_tokens=completion_result.get("tokens", {}).get("output", 0),
provider=completion_result.get("provider", rec_provider), # Use known provider as fallback
model=completion_result.get("model", rec_model), # Use known model as fallback
processing_time=completion_result.get("processing_time", 0.0)
)
tracker.add_call(trackable)
except Exception as track_err:
logger.warning(f"Could not track cost for delegated task ({priority}): {track_err}", exc_info=False)
# Display result
if "error" in completion_result:
logger.error(f"Completion failed for recommended model {rec_model}: {completion_result['error']}")
console.print(f"[red]Completion failed for {rec_model}: {completion_result['error']}[/red]")
else:
console.print(Panel(
escape(completion_result.get("text", "").strip()),
title=f"[bold green]Delegated Result ({escape(priority)} -> {escape(rec_model)})[/bold green]",
border_style="green",
expand=False
))
_display_stats(completion_result, console) # Display stats from completion
except Exception as e:
logger.error(f"Error delegating task with priority {priority}: {e}", emoji_key="error", exc_info=True)
console.print(f"[bold red]Error ({escape(priority)}):[/bold red] {escape(str(e))}")
console.print()
async def run_workflow_demo():
"""Demonstrate the execute_workflow tool."""
console.print(Rule("[bold blue]Execute Workflow Demo[/bold blue]"))
logger.info("Running execute_workflow demo...", emoji_key="start")
initial_text = """
Artificial intelligence (AI) is rapidly transforming various sectors.
In healthcare, AI algorithms analyze medical images with remarkable accuracy,
aiding radiologists like Dr. Evelyn Reed. Pharmaceutical companies, such as InnovatePharma,
use AI to accelerate drug discovery. Meanwhile, financial institutions leverage AI
for fraud detection and algorithmic trading. The field continues to evolve,
driven by researchers like Kenji Tanaka and advancements in machine learning.
"""
workflow = [
{
"id": "step1_summarize",
"operation": "summarize",
"provider": Provider.ANTHROPIC.value,
"model": "claude-3-5-haiku-20241022",
"parameters": {"format": "Provide a 2-sentence summary"},
"output_as": "summary"
},
{
"id": "step2_extract",
"operation": "extract_entities",
"provider": Provider.OPENAI.value,
"model": "gpt-4.1-mini",
"parameters": {"entity_types": ["person", "organization", "field"]},
"input_from": None, # Use initial_input
"output_as": "entities"
},
{
"id": "step3_questions",
"operation": "generate_questions",
"provider": Provider.GEMINI.value,
"model": "gemini-2.0-flash-lite",
"parameters": {"question_count": 2, "question_type": "insightful"},
"input_from": "summary", # Use output from step 1
"output_as": "questions"
}
]
console.print("[cyan]Initial Input Text:[/cyan]")
console.print(Panel(escape(initial_text.strip()), border_style="dim blue", expand=False))
console.print("[cyan]Workflow Definition:[/cyan]")
try:
workflow_json = json.dumps(workflow, indent=2, default=lambda o: o.value if isinstance(o, Provider) else str(o)) # Handle enum serialization
console.print(Panel(
Syntax(workflow_json, "json", theme="default", line_numbers=True, word_wrap=True),
title="[bold]Workflow Steps[/bold]",
border_style="blue",
expand=False
))
except Exception as json_err:
console.print(f"[red]Could not display workflow definition: {escape(str(json_err))}[/red]")
logger.info(f"Executing workflow with {len(workflow)} steps...", emoji_key="processing")
try:
result = await mcp.call_tool("execute_workflow", {
"workflow_steps": workflow,
"initial_input": initial_text
})
# Use enhanced display function
display_workflow_result("Workflow Results", result)
except Exception as e:
logger.error(f"Error executing workflow: {e}", emoji_key="error", exc_info=True)
console.print(f"[bold red]Workflow Execution Error:[/bold red] {escape(str(e))}")
console.print()
async def run_prompt_optimization_demo():
"""Demonstrate the optimize_prompt tool."""
console.print(Rule("[bold blue]Prompt Optimization Demo[/bold blue]"))
logger.info("Running optimize_prompt demo...", emoji_key="start")
original_prompt = "Tell me about Large Language Models."
target_model = "claude-3-opus-20240229"
optimization_type = "detailed_response" # e.g., conciseness, detailed_response, specific_format
console.print(f"[cyan]Original Prompt:[/cyan] {escape(original_prompt)}")
console.print(f"[cyan]Target Model:[/cyan] {escape(target_model)}")
console.print(f"[cyan]Optimization Type:[/cyan] {escape(optimization_type)}")
logger.info(f"Optimizing prompt for {target_model}...", emoji_key="processing")
try:
result = await mcp.call_tool("optimize_prompt", {
"prompt": original_prompt,
"target_model": target_model,
"optimization_type": optimization_type,
"provider": Provider.OPENAI.value # Using OpenAI to optimize for Claude
})
# Process result to handle list or dict format
result = process_mcp_result(result)
# Get optimized prompt text
optimized_prompt = result.get("optimized_prompt", "")
        if not optimized_prompt and isinstance(result, dict):
            # process_mcp_result returns a dict here, so fall back to its 'text' key (a dict has no .text attribute)
            optimized_prompt = result.get("text", "")
console.print(Panel(
escape(optimized_prompt.strip() if optimized_prompt else "[red]Optimization failed[/red]"),
title="[bold green]Optimized Prompt[/bold green]",
border_style="green",
expand=False
))
# Display execution stats
_display_stats(result, console)
except Exception as e:
logger.error(f"Error optimizing prompt: {e}", emoji_key="error", exc_info=True)
console.print(f"[bold red]Prompt Optimization Error:[/bold red] {escape(str(e))}")
console.print()
async def main():
"""Run workflow delegation examples."""
console.print(Rule("[bold magenta]Workflow Delegation Demo Suite[/bold magenta]"))
tracker = CostTracker() # Instantiate tracker
try:
# Setup providers first
await initialize_providers() # Ensure keys are checked/providers ready
console.print(Rule("[bold magenta]Workflow & Delegation Demos Starting[/bold magenta]"))
# --- Register Necessary Tools ---
# Ensure tools called by demos are registered on the MCP instance
from ultimate_mcp_server.tools.completion import generate_completion
from ultimate_mcp_server.tools.document import (
extract_entities,
generate_qa_pairs,
summarize_document,
)
from ultimate_mcp_server.tools.optimization import recommend_model
mcp.tool()(recommend_model)
mcp.tool()(generate_completion)
mcp.tool()(summarize_document)
mcp.tool()(extract_entities)
mcp.tool()(generate_qa_pairs)
logger.info("Manually registered recommend_model, completion, and document tools.")
# --------------------------------
await run_analyze_task_demo()
# Pass tracker only to delegate demo
await run_delegate_task_demo(tracker)
await run_workflow_demo()
# await run_prompt_optimization_demo() # Add back if needed
# Display final cost summary
tracker.display_summary(console)
logger.success("Workflow Delegation Demo Finished Successfully!", emoji_key="complete")
console.print(Rule("[bold magenta]Workflow Delegation Demos Complete[/bold magenta]"))
return 0
except Exception as e:
logger.critical(f"Workflow demo failed: {str(e)}", emoji_key="critical", exc_info=True)
console.print(f"[bold red]Critical Demo Error:[/bold red] {escape(str(e))}")
return 1
if __name__ == "__main__":
exit_code = asyncio.run(main())
sys.exit(exit_code)
```
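A minimal sketch of driving the `execute_workflow` tool above from a caller, chaining two steps with `input_from`; it assumes the demo module's `mcp` instance and that the document tools have been registered as in `main()` (the import path and all values are illustrative):
```python
# Illustrative sketch only: assumes the demo module's `mcp` instance and that
# summarize_document / generate_qa_pairs were registered first, as done in main().
from examples.workflow_delegation_demo import mcp  # hypothetical import path

async def run_two_step_example() -> dict:
    """Chain a summary into question generation via `input_from`."""
    workflow = [
        {"id": "summary", "operation": "summarize",
         "parameters": {"format": "One-sentence summary"}, "output_as": "summary"},
        {"id": "questions", "operation": "generate_questions",
         "parameters": {"num_questions": 2}, "input_from": "summary", "output_as": "questions"},
    ]
    # call_tool mirrors how the demo invokes its own execute_workflow tool
    return await mcp.call_tool("execute_workflow", {
        "workflow_steps": workflow,
        "initial_input": "AI systems now assist with drafting, coding, and analysis.",
    })
```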