This is page 2 of 35. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│ ├── __init__.py
│ ├── advanced_agent_flows_using_unified_memory_system_demo.py
│ ├── advanced_extraction_demo.py
│ ├── advanced_unified_memory_system_demo.py
│ ├── advanced_vector_search_demo.py
│ ├── analytics_reporting_demo.py
│ ├── audio_transcription_demo.py
│ ├── basic_completion_demo.py
│ ├── cache_demo.py
│ ├── claude_integration_demo.py
│ ├── compare_synthesize_demo.py
│ ├── cost_optimization.py
│ ├── data
│ │ ├── sample_event.txt
│ │ ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│ │ └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│ ├── docstring_refiner_demo.py
│ ├── document_conversion_and_processing_demo.py
│ ├── entity_relation_graph_demo.py
│ ├── filesystem_operations_demo.py
│ ├── grok_integration_demo.py
│ ├── local_text_tools_demo.py
│ ├── marqo_fused_search_demo.py
│ ├── measure_model_speeds.py
│ ├── meta_api_demo.py
│ ├── multi_provider_demo.py
│ ├── ollama_integration_demo.py
│ ├── prompt_templates_demo.py
│ ├── python_sandbox_demo.py
│ ├── rag_example.py
│ ├── research_workflow_demo.py
│ ├── sample
│ │ ├── article.txt
│ │ ├── backprop_paper.pdf
│ │ ├── buffett.pdf
│ │ ├── contract_link.txt
│ │ ├── legal_contract.txt
│ │ ├── medical_case.txt
│ │ ├── northwind.db
│ │ ├── research_paper.txt
│ │ ├── sample_data.json
│ │ └── text_classification_samples
│ │ │ ├── email_classification.txt
│ │ │ ├── news_samples.txt
│ │ │ ├── product_reviews.txt
│ │ │ └── support_tickets.txt
│ ├── sample_docs
│ │ └── downloaded
│ │ │ └── attention_is_all_you_need.pdf
│ ├── sentiment_analysis_demo.py
│ ├── simple_completion_demo.py
│ ├── single_shot_synthesis_demo.py
│ ├── smart_browser_demo.py
│ ├── sql_database_demo.py
│ ├── sse_client_demo.py
│ ├── test_code_extraction.py
│ ├── test_content_detection.py
│ ├── test_ollama.py
│ ├── text_classification_demo.py
│ ├── text_redline_demo.py
│ ├── tool_composition_examples.py
│ ├── tournament_code_demo.py
│ ├── tournament_text_demo.py
│ ├── unified_memory_system_demo.py
│ ├── vector_search_demo.py
│ ├── web_automation_instruction_packs.py
│ └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│ └── smart_browser_internal
│ │ ├── locator_cache.db
│ │ ├── readability.js
│ │ └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── integration
│ │ ├── __init__.py
│ │ └── test_server.py
│ ├── manual
│ │ ├── test_extraction_advanced.py
│ │ └── test_extraction.py
│ └── unit
│ │ ├── __init__.py
│ │ ├── test_cache.py
│ │ ├── test_providers.py
│ │ └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│ ├── __init__.py
│ ├── __main__.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── __main__.py
│ │ ├── commands.py
│ │ ├── helpers.py
│ │ └── typer_cli.py
│ ├── clients
│ │ ├── __init__.py
│ │ ├── completion_client.py
│ │ └── rag_client.py
│ ├── config
│ │ └── examples
│ │ │ └── filesystem_config.yaml
│ ├── config.py
│ ├── constants.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── evaluation
│ │ │ ├── base.py
│ │ │ └── evaluators.py
│ │ ├── providers
│ │ │ ├── __init__.py
│ │ │ ├── anthropic.py
│ │ │ ├── base.py
│ │ │ ├── deepseek.py
│ │ │ ├── gemini.py
│ │ │ ├── grok.py
│ │ │ ├── ollama.py
│ │ │ ├── openai.py
│ │ │ └── openrouter.py
│ │ ├── server.py
│ │ ├── state_store.py
│ │ ├── tournaments
│ │ │ ├── manager.py
│ │ │ ├── tasks.py
│ │ │ └── utils.py
│ │ └── ums_api
│ │ │ ├── __init__.py
│ │ │ ├── ums_database.py
│ │ │ ├── ums_endpoints.py
│ │ │ ├── ums_models.py
│ │ │ └── ums_services.py
│ ├── exceptions.py
│ ├── graceful_shutdown.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── analytics
│ │ │ ├── __init__.py
│ │ │ ├── metrics.py
│ │ │ └── reporting.py
│ │ ├── cache
│ │ │ ├── __init__.py
│ │ │ ├── cache_service.py
│ │ │ ├── persistence.py
│ │ │ ├── strategies.py
│ │ │ └── utils.py
│ │ ├── cache.py
│ │ ├── document.py
│ │ ├── knowledge_base
│ │ │ ├── __init__.py
│ │ │ ├── feedback.py
│ │ │ ├── manager.py
│ │ │ ├── rag_engine.py
│ │ │ ├── retriever.py
│ │ │ └── utils.py
│ │ ├── prompts
│ │ │ ├── __init__.py
│ │ │ ├── repository.py
│ │ │ └── templates.py
│ │ ├── prompts.py
│ │ └── vector
│ │ │ ├── __init__.py
│ │ │ ├── embeddings.py
│ │ │ └── vector_service.py
│ ├── tool_token_counter.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── audio_transcription.py
│ │ ├── base.py
│ │ ├── completion.py
│ │ ├── docstring_refiner.py
│ │ ├── document_conversion_and_processing.py
│ │ ├── enhanced-ums-lookbook.html
│ │ ├── entity_relation_graph.py
│ │ ├── excel_spreadsheet_automation.py
│ │ ├── extraction.py
│ │ ├── filesystem.py
│ │ ├── html_to_markdown.py
│ │ ├── local_text_tools.py
│ │ ├── marqo_fused_search.py
│ │ ├── meta_api_tool.py
│ │ ├── ocr_tools.py
│ │ ├── optimization.py
│ │ ├── provider.py
│ │ ├── pyodide_boot_template.html
│ │ ├── python_sandbox.py
│ │ ├── rag.py
│ │ ├── redline-compiled.css
│ │ ├── sentiment_analysis.py
│ │ ├── single_shot_synthesis.py
│ │ ├── smart_browser.py
│ │ ├── sql_databases.py
│ │ ├── text_classification.py
│ │ ├── text_redline_tools.py
│ │ ├── tournament.py
│ │ ├── ums_explorer.html
│ │ └── unified_memory_system.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── async_utils.py
│ │ ├── display.py
│ │ ├── logging
│ │ │ ├── __init__.py
│ │ │ ├── console.py
│ │ │ ├── emojis.py
│ │ │ ├── formatter.py
│ │ │ ├── logger.py
│ │ │ ├── panels.py
│ │ │ ├── progress.py
│ │ │ └── themes.py
│ │ ├── parse_yaml.py
│ │ ├── parsing.py
│ │ ├── security.py
│ │ └── text.py
│ └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/ultimate_mcp_server/utils/logging/emojis.py:
--------------------------------------------------------------------------------
```python
"""
Emoji definitions for Gateway logging system.
This module contains constants for emojis used in logging to provide visual cues
about the type and severity of log messages.
"""
from typing import Dict
# Log level emojis
INFO = "ℹ️"
DEBUG = "🔍"
WARNING = "⚠️"
ERROR = "❌"
CRITICAL = "🚨"
SUCCESS = "✅"
TRACE = "📍"
# Status emojis
RUNNING = "🔄"
PENDING = "⏳"
COMPLETED = "🏁"
FAILED = "👎"
STARTING = "🚀"
STOPPING = "🛑"
RESTARTING = "🔁"
LOADING = "📥"
SAVING = "📤"
CANCELLED = "🚫"
TIMEOUT = "⏱️"
SKIPPED = "⏭️"
# Operation emojis (Adapt for ultimate)
REQUEST = "➡️" # Example
RESPONSE = "⬅️" # Example
PROCESS = "⚙️" # Example
CACHE_HIT = "✅" # Example
CACHE_MISS = "❌" # Example
AUTHENTICATE = "🔒" # Example
AUTHORIZE = "🔑" # Example
VALIDATE = "✔️"
CONNECT = "🔌"
DISCONNECT = "🔌"
UPDATE = "📝"
# Component emojis (Adapt for ultimate)
CORE = "⚙️"
PROVIDER = "☁️" # Example
ROUTER = "🔀" # Example
CACHE = "📦"
API = "🌐"
MCP = "📡" # Keep if relevant
UTILS = "🔧" # Example
# Tool emojis (Keep/remove/add as needed)
# RIPGREP = "🔍"
# AWK = "🔧"
# JQ = "🧰"
# SQLITE = "🗃️"
# Result emojis
FOUND = "🎯"
NOT_FOUND = "🔍"
PARTIAL = "◐"
UNKNOWN = "❓"
HIGH_CONFIDENCE = "🔒"
MEDIUM_CONFIDENCE = "🔓"
LOW_CONFIDENCE = "🚪"
# System emojis
STARTUP = "🔆"
SHUTDOWN = "🔅"
CONFIG = "⚙️"
ERROR = "⛔" # Distinct from level error
WARNING = "⚠️" # Same as level warning
DEPENDENCY = "🧱"
VERSION = "🏷️"
UPDATE_AVAILABLE = "🆕"
# User interaction emojis (Keep if relevant)
INPUT = "⌨️"
OUTPUT = "📺"
HELP = "❓"
HINT = "💡"
EXAMPLE = "📋"
QUESTION = "❓"
ANSWER = "💬"
# Time emojis
TIMING = "⏱️"
SCHEDULED = "📅"
DELAYED = "⏰"
OVERTIME = "⌛"
# Convenience mapping for log levels
LEVEL_EMOJIS: Dict[str, str] = {
"info": INFO,
"debug": DEBUG,
"warning": WARNING,
"error": ERROR,
"critical": CRITICAL,
"success": SUCCESS,
"trace": TRACE,
}
# Dictionary for mapping operation names to emojis
OPERATION_EMOJIS: Dict[str, str] = {
"request": REQUEST,
"response": RESPONSE,
"process": PROCESS,
"cache_hit": CACHE_HIT,
"cache_miss": CACHE_MISS,
"authenticate": AUTHENTICATE,
"authorize": AUTHORIZE,
"validate": VALIDATE,
"connect": CONNECT,
"disconnect": DISCONNECT,
"update": UPDATE,
# Add other common operations here
"startup": STARTUP,
"shutdown": SHUTDOWN,
"config": CONFIG,
}
# Dictionary for mapping component names to emojis
COMPONENT_EMOJIS: Dict[str, str] = {
"core": CORE,
"provider": PROVIDER,
"router": ROUTER,
"cache": CACHE,
"api": API,
"mcp": MCP,
"utils": UTILS,
# Add other components here
}
# Get emoji by name function for more dynamic access
def get_emoji(category: str, name: str) -> str:
"""Get an emoji by category and name.
Args:
category: The category of emoji (e.g., 'level', 'status', 'operation', 'component')
name: The name of the emoji within that category
Returns:
The emoji string, or a category-specific default ('?' for log levels, a generic emoji otherwise) if not found
"""
category = category.lower()
name_lower = name.lower()
if category == "level":
return LEVEL_EMOJIS.get(name_lower, "?")
elif category == "operation":
return OPERATION_EMOJIS.get(name_lower, "⚙️") # Default to generic gear
elif category == "component":
return COMPONENT_EMOJIS.get(name_lower, "🧩") # Default to puzzle piece
# Fallback for other categories or direct constant lookup
name_upper = name.upper()
globals_dict = globals()
if name_upper in globals_dict:
return globals_dict[name_upper]
# Default if nothing matches
return "❓"
```
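A few illustrative lookups for the `get_emoji` helper above (added for context; not part of the packaged file). The category names and fallbacks follow the dispatch logic shown in the module:
```python
from ultimate_mcp_server.utils.logging.emojis import get_emoji

print(get_emoji("level", "warning"))        # ⚠️ from LEVEL_EMOJIS
print(get_emoji("operation", "cache_hit"))  # ✅ from OPERATION_EMOJIS
print(get_emoji("component", "router"))     # 🔀 from COMPONENT_EMOJIS
print(get_emoji("status", "running"))       # 🔄 via the direct module-constant fallback
print(get_emoji("operation", "no_such_op")) # ⚙️ generic default for unknown operations
```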
--------------------------------------------------------------------------------
/examples/sample/text_classification_samples/support_tickets.txt:
--------------------------------------------------------------------------------
```
BUG REPORT:
The export to PDF feature is completely broken in version 3.2.1. When I click the export button, the application freezes for about 30 seconds, then crashes with error code 0x8007EE7. This happens consistently on every attempt. I've tried reinstalling the software and clearing the cache as suggested in the knowledge base, but the issue persists. This is blocking our team from delivering reports to clients. System specs: Windows 11 Pro, 16GB RAM, Intel i7-12700K.
FEATURE REQUEST:
It would be extremely helpful if you could add a bulk editing option for tags in the content management system. Currently, we have to edit tags one by one, which is very time-consuming when managing hundreds of articles. Ideally, we should be able to select multiple content pieces and apply or remove tags from all of them at once. This would save our editorial team hours of work each week and reduce the chance of tagging inconsistencies.
ACCOUNT ISSUE:
I'm unable to access my premium account despite having an active subscription. When I log in, the system still shows I have a free account with limited features. I can see in my bank statement that the $14.99 monthly charge went through three days ago. I've tried logging out and back in, clearing cookies, and using a different browser, but the problem remains. My account email is [email protected] and my customer ID is CUST-58924.
BILLING QUESTION:
I noticed an unexpected charge of $29.99 on my credit card statement from your company dated June 15th. I was under the impression that my subscription was $19.99/month. Was there a price increase that I missed notification about? Or is this an error? Please clarify what this charge covers and if there's been a change to my subscription terms. My account is registered under [email protected].
TECHNICAL QUESTION:
Is it possible to integrate your API with Zapier? We're trying to automate our workflow between your platform and our CRM system. I've looked through the documentation but couldn't find specific information about Zapier integrations. If this is supported, could you point me to relevant documentation or examples? If not, do you have any recommendations for alternative integration methods that wouldn't require custom development?
BUG REPORT:
There appears to be a security vulnerability in the user permission system. I discovered that standard users can access administrative reports by directly navigating to the URL pattern /admin/reports/custom/[report-id] even without admin privileges. I've verified this with two different standard user accounts. This potentially exposes sensitive company data to unauthorized personnel. Please address this urgently as it represents a significant security concern for our organization.
FEATURE REQUEST:
Could you please consider adding dark mode to both the web and mobile applications? Working with the current bright interface during evening hours is causing eye strain for many of our team members. Ideally, the dark mode would be automatically triggered based on system settings but with the option to manually override. This has become a standard feature in most professional applications, and would greatly improve the user experience for those of us who work long hours.
ACCOUNT ISSUE:
Our team admin left the company last week, and we need to transfer administrative privileges to another team member. The admin account was under [email protected]. We need to assign admin rights to [email protected] as soon as possible, as we're currently unable to add new team members or modify subscription settings. Our business account number is BIZ-4452-T. Please advise on the process for this transfer.
```
--------------------------------------------------------------------------------
/ultimate_mcp_server/services/cache/utils.py:
--------------------------------------------------------------------------------
```python
"""Cache utility functions for Ultimate MCP Server.
This module provides utility functions for working with the cache service
that were previously defined in example scripts but are now part of the library.
"""
import hashlib
from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.providers.base import get_provider
from ultimate_mcp_server.services.cache import get_cache_service
from ultimate_mcp_server.utils import get_logger
# Initialize logger
logger = get_logger("ultimate_mcp_server.services.cache.utils")
async def run_completion_with_cache(
prompt: str,
provider_name: str = Provider.OPENAI.value,
model: str = None,
temperature: float = 0.1,
max_tokens: int = None,
use_cache: bool = True,
ttl: int = 3600, # Default 1 hour cache TTL
api_key: str = None
):
"""Run a completion with automatic caching.
This utility function handles provider initialization, cache key generation,
cache lookups, and caching results automatically.
Args:
prompt: Text prompt for completion
provider_name: Provider to use (default: OpenAI)
model: Model name (optional, uses provider default if not specified)
temperature: Temperature for generation (default: 0.1)
max_tokens: Maximum tokens to generate (optional)
use_cache: Whether to use cache (default: True)
ttl: Cache TTL in seconds (default: 3600/1 hour)
api_key: Provider API key (optional, falls back to internal provider system)
Returns:
Completion result with additional processing_time attribute
"""
try:
# Let the provider system handle API keys if none provided
provider = await get_provider(provider_name, api_key=api_key)
await provider.initialize()
except Exception as e:
logger.error(f"Failed to initialize provider {provider_name}: {e}", emoji_key="error")
raise # Re-raise exception to stop execution if provider fails
cache_service = get_cache_service()
# Create a more robust cache key using all relevant parameters
model_id = model or provider.get_default_model() # Ensure we have a model id
# Create consistent hash of parameters that affect the result
params_str = f"{prompt}:{temperature}:{max_tokens if max_tokens else 'default'}"
params_hash = hashlib.md5(params_str.encode()).hexdigest()
cache_key = f"completion:{provider_name}:{model_id}:{params_hash}"
if use_cache and cache_service.enabled:
cached_result = await cache_service.get(cache_key)
if cached_result is not None:
logger.success("Cache hit! Using cached result", emoji_key="cache")
# Set processing time for cache retrieval (negligible)
cached_result.processing_time = 0.001
return cached_result
# Generate completion if not cached or cache disabled
if use_cache:
logger.info("Cache miss. Generating new completion...", emoji_key="processing")
else:
logger.info("Cache disabled by request. Generating new completion...", emoji_key="processing")
# Use the determined model_id and pass through other parameters
result = await provider.generate_completion(
prompt=prompt,
model=model_id,
temperature=temperature,
max_tokens=max_tokens
)
# Save to cache if enabled
if use_cache and cache_service.enabled:
await cache_service.set(
key=cache_key,
value=result,
ttl=ttl
)
logger.info(f"Result saved to cache (key: ...{cache_key[-10:]})", emoji_key="cache")
return result
```
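A minimal usage sketch for `run_completion_with_cache` (added for context; not part of the packaged file). It assumes an OpenAI API key is configured and the cache service is enabled; attributes of the returned result other than `processing_time` depend on the provider implementation:
```python
import asyncio

from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.services.cache.utils import run_completion_with_cache

async def demo() -> None:
    prompt = "Summarize the benefits of response caching in one sentence."
    # First call misses the cache and goes to the provider.
    first = await run_completion_with_cache(prompt, provider_name=Provider.OPENAI.value)
    # An identical second call should be served from the cache (processing_time ~0.001s).
    second = await run_completion_with_cache(prompt, provider_name=Provider.OPENAI.value)
    print(first)
    print(f"cached call processing_time: {second.processing_time:.3f}s")

if __name__ == "__main__":
    asyncio.run(demo())
```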
--------------------------------------------------------------------------------
/TEST_README.md:
--------------------------------------------------------------------------------
```markdown
# Ultimate MCP Server Test Scripts
This directory contains test scripts to validate your Ultimate MCP Server functionality.
## Prerequisites
Make sure you have FastMCP installed:
```bash
pip install fastmcp
# or
uv add fastmcp
```
Also install aiohttp for REST API testing:
```bash
pip install aiohttp
# or
uv add aiohttp
```
## Test Scripts
### 1. `quick_test.py` - Quick Connectivity Test
**Purpose**: Fast basic connectivity and functionality check
**Runtime**: ~5 seconds
```bash
python quick_test.py
```
This script tests:
- ✅ Basic MCP connection
- 📢 Echo tool functionality
- 🔌 Provider availability
- 🛠️ Tool count
- 📚 Resource count
### 2. `test_client.py` - Interactive Test Client
**Purpose**: Comprehensive testing with interactive mode
**Runtime**: Variable (can be used interactively)
```bash
python test_client.py
```
This script tests:
- 🔗 Server connection
- 📋 Tool listing and calling
- 📚 Resource reading
- 🤖 LLM completions
- 📁 Filesystem tools
- 🐍 Python execution
- 📝 Text processing tools
- 🎮 Interactive command mode
**Interactive Commands**:
- `list` - Show available tools
- `resources` - Show available resources
- `call <tool_name> <json_params>` - Call a tool
- `read <resource_uri>` - Read a resource
- `quit` - Exit
### 3. `comprehensive_test.py` - Full Test Suite
**Purpose**: Complete validation of MCP and REST API functionality
**Runtime**: ~30 seconds
```bash
python comprehensive_test.py
```
This script tests:
- 🔧 MCP Interface (tools, providers, filesystem, Python)
- 🌐 REST API Endpoints (discovery, health, docs, cognitive states, performance, artifacts)
- 🤖 LLM Completions (actual generation with available providers)
- 🧠 Memory System (storage, retrieval, cognitive states)
## Understanding Results
### ✅ Green Check - Working Correctly
The feature is functioning as expected.
### ❌ Red X - Needs Attention
The feature failed or is not available. Common reasons:
- API keys not configured
- Provider services unavailable
- Database connection issues
- Missing dependencies
## Your Server Configuration
Based on your server startup logs, your server has:
- **107 tools** loaded (all available tools mode)
- **7 LLM providers** configured:
- ✅ Anthropic (3 models)
- ✅ DeepSeek (2 models)
- ✅ Gemini (4 models)
- ✅ OpenRouter (3 models)
- ✅ Ollama (3 models) - Local
- ✅ Grok (4 models)
- ✅ OpenAI (47 models)
## Endpoints Available
### MCP Protocol
- `http://127.0.0.1:8013/mcp` - Main MCP streamable-HTTP endpoint
### REST API
- `http://127.0.0.1:8013/` - Discovery endpoint
- `http://127.0.0.1:8013/api/health` - Health check
- `http://127.0.0.1:8013/api/docs` - Swagger UI documentation
- `http://127.0.0.1:8013/api/cognitive-states` - Cognitive state management
- `http://127.0.0.1:8013/api/performance/overview` - Performance metrics
- `http://127.0.0.1:8013/api/artifacts` - Artifact management
### UMS Explorer
- `http://127.0.0.1:8013/api/ums-explorer` - Memory system explorer UI
## Troubleshooting
### Connection Failed
- Verify server is running on port 8013
- Check firewall settings
- Ensure no other service is using the port
### Provider Errors
- Check API keys in environment variables
- Verify provider service availability
- Test with local Ollama first (no API key needed)
### Tool Errors
- Check filesystem permissions
- Verify Python sandbox configuration
- Check database connectivity
## Example Usage
```bash
# Quick smoke test
python quick_test.py
# Interactive exploration
python test_client.py
# Then type: list
# Then type: call echo {"message": "Hello!"}
# Full validation
python comprehensive_test.py
```
## Next Steps
After successful testing:
1. Check the Swagger UI at `http://127.0.0.1:8013/api/docs`
2. Explore the UMS Explorer at `http://127.0.0.1:8013/api/ums-explorer`
3. Test with a real MCP client like Claude Desktop
4. Start building your applications using the MCP tools!
```
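For a scripted smoke test against the endpoints listed above, a minimal sketch using the fastmcp client (assuming the fastmcp 2.x `Client` API and the default port 8013) might look like this:
```python
import asyncio

from fastmcp import Client

async def main() -> None:
    async with Client("http://127.0.0.1:8013/mcp") as client:
        tools = await client.list_tools()
        print(f"{len(tools)} tools available")
        # The echo tool and payload match the interactive example in the README above.
        result = await client.call_tool("echo", {"message": "Hello!"})
        print(result)

if __name__ == "__main__":
    asyncio.run(main())
```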
--------------------------------------------------------------------------------
/tests/manual/test_extraction_advanced.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Manual test for advanced extraction tools using standardized completion.
This script tests the remaining extraction tools that were refactored to use
the standardized completion tool.
"""
import asyncio
import json
import os
import sys
# Add the project root to the Python path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))
from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.tools.extraction import extract_code_from_response, extract_semantic_schema
async def test_extract_semantic_schema():
"""Test the extract_semantic_schema function with a simple schema."""
print("\n--- Testing extract_semantic_schema ---")
# Define a JSON schema to extract data
schema = {
"type": "object",
"properties": {
"name": {"type": "string"},
"email": {"type": "string"},
"phone": {"type": "string"},
"interests": {"type": "array", "items": {"type": "string"}}
}
}
# Sample text containing information matching the schema
sample_text = """
Profile information:
Name: Sarah Johnson
Contact: [email protected]
Phone Number: 555-987-6543
Sarah is interested in: machine learning, data visualization, and hiking.
"""
result = await extract_semantic_schema(
text=sample_text,
semantic_schema=schema,
provider=Provider.OPENAI.value,
model="gpt-3.5-turbo"
)
print(f"Success: {result.get('success', False)}")
print(f"Model used: {result.get('model', 'unknown')}")
print(f"Tokens: {result.get('tokens', {})}")
print(f"Processing time: {result.get('processing_time', 0):.2f}s")
# Pretty print the extracted data
if result.get('data'):
print("Extracted Schema Data:")
print(json.dumps(result['data'], indent=2))
else:
print("Failed to extract schema data")
print(f"Error: {result.get('error', 'unknown error')}")
async def test_extract_code_from_response():
"""Test the extract_code_from_response function."""
print("\n--- Testing extract_code_from_response ---")
# Sample text with a code block
sample_text = """
Here's a Python function to calculate the factorial of a number:
```python
def factorial(n):
if n == 0 or n == 1:
return 1
else:
return n * factorial(n-1)
# Example usage
print(factorial(5)) # Output: 120
```
This uses a recursive approach to calculate the factorial.
"""
# Test with regex-based extraction
print("Testing regex-based extraction...")
extracted_code = await extract_code_from_response(
response_text=sample_text,
model="openai/gpt-3.5-turbo",
timeout=10
)
print("Extracted Code:")
print(extracted_code)
# Test with LLM-based extraction on text without markdown
print("\nTesting LLM-based extraction...")
sample_text_no_markdown = """
Here's a Python function to calculate the factorial of a number:
def factorial(n):
if n == 0 or n == 1:
return 1
else:
return n * factorial(n-1)
# Example usage
print(factorial(5)) # Output: 120
This uses a recursive approach to calculate the factorial.
"""
extracted_code = await extract_code_from_response(
response_text=sample_text_no_markdown,
model="openai/gpt-3.5-turbo",
timeout=10
)
print("Extracted Code:")
print(extracted_code)
async def main():
"""Run all tests."""
print("Testing advanced extraction tools with standardized completion...")
await test_extract_semantic_schema()
await test_extract_code_from_response()
print("\nAll tests completed.")
if __name__ == "__main__":
asyncio.run(main())
```
--------------------------------------------------------------------------------
/examples/sample/article.txt:
--------------------------------------------------------------------------------
```
## Tech Industry Shake-up: Microsoft Acquires AI Startup Anthropic for $10B
March 25, 2025 | Sarah Johnson, Technology Reporter
In a move that has sent shockwaves through Silicon Valley, Microsoft Corporation announced yesterday its acquisition of AI research company Anthropic for $10 billion. The deal, which was finalized after months of secretive negotiations, marks Microsoft's largest investment in artificial intelligence to date.
Microsoft CEO Satya Nadella explained the strategic importance of the acquisition during a press conference held at Microsoft's headquarters in Redmond, Washington. "Anthropic's Claude AI models represent some of the most sophisticated language systems ever developed," Nadella stated. "This acquisition strengthens our commitment to responsible AI development and ensures Microsoft remains at the forefront of the AI revolution."
Anthropic, founded in 2021 by former OpenAI researchers Dario Amodei and Daniela Amodei, has gained recognition for its Claude family of AI models that emphasize safety and interpretability. The company had previously received significant funding from Google and Amazon, with Google investing $300 million for a 10% stake in 2023, and Amazon committing up to $4 billion in September 2023.
Both Dario Amodei, who serves as Anthropic's CEO, and Daniela Amodei, the company's President, will join Microsoft's AI leadership team while continuing to oversee Anthropic's operations. "Joining forces with Microsoft gives us the computational resources and research talent needed to advance our constitutional AI approach," said Dario Amodei. "We believe this partnership will accelerate our mission to develop AI systems that are steerable, interpretable, and robust."
The acquisition has raised antitrust concerns, with the Federal Trade Commission (FTC) Chair Lina Khan announcing an immediate review of the deal. "We will scrutinize this acquisition carefully to ensure it doesn't further concentrate power in the already consolidated AI sector," Khan said in a statement released by the FTC.
Google's parent company Alphabet and Amazon, both major investors in Anthropic, may face significant losses from the acquisition. Alphabet's stock (GOOGL) fell 3.2% following the announcement, while Amazon (AMZN) saw a more modest decline of 1.5%. In contrast, Microsoft (MSFT) shares jumped 5.8% to $420.75.
OpenAI CEO Sam Altman expressed surprise at the acquisition in a post on X (formerly Twitter): "Congratulations to the Anthropic team. This creates an interesting competitive landscape. Game on." OpenAI, which has received approximately $13 billion in investment from Microsoft, now finds itself in the unusual position of competing with another Microsoft-owned AI company.
Industry analyst Maria Rodriguez from Morgan Stanley noted that the acquisition signals a new phase in the AI arms race. "Microsoft is clearly hedging its bets by owning stakes in both leading frontier AI labs. This could be interpreted as uncertainty about which approach to AI safety and capabilities will ultimately succeed," Rodriguez explained in a research note to investors.
The deal includes provisions for Anthropic to continue operating as a separate entity within Microsoft, with guaranteed compute resources on Microsoft's Azure cloud platform. All of Anthropic's 350 employees will be retained, and the company's San Francisco headquarters will remain operational.
According to sources familiar with the matter, the acquisition talks began after a dinner meeting between Nadella and Dario Amodei at the World Economic Forum in Davos, Switzerland in January 2025. Microsoft President Brad Smith and CFO Amy Hood were reportedly instrumental in structuring the complex deal.
The acquisition is expected to close by Q3 2025, pending regulatory approval. If approved, it would mark another significant milestone in the rapidly evolving artificial intelligence industry, where companies are increasingly competing for talent, technology, and market position.
```
--------------------------------------------------------------------------------
/examples/sample/text_classification_samples/product_reviews.txt:
--------------------------------------------------------------------------------
```
POSITIVE REVIEW:
I absolutely love this coffee maker! It brews the perfect cup every time and the temperature control is spot on. After three months of daily use, I'm still impressed with how consistent the results are. The app connectivity seemed gimmicky at first, but being able to schedule brews from bed has been a game-changer for my morning routine. Clean-up is simple with the removable parts and the sleek design fits perfectly in my kitchen. Best appliance purchase I've made in years!
NEGATIVE REVIEW:
This laptop has been nothing but trouble since day one. The battery barely lasts 2 hours despite the "all-day battery life" claim, and it overheats constantly even with basic web browsing. The keyboard started having sticky keys after just two weeks, and customer support has been completely unhelpful. To make matters worse, the screen has strange flickering issues that come and go randomly. Save your money and avoid this model completely - total waste of $1200.
NEUTRAL REVIEW:
The wireless earbuds are decent for the price point. Sound quality is acceptable though not exceptional - you get what you pay for. Battery life matches the advertised 4 hours, and the charging case provides about 3 full charges as expected. The fit is comfortable enough for short periods, though they start to hurt after about 2 hours of continuous use. Connectivity is generally stable but occasional drops occur when the phone is in a pocket. Overall, a reasonable budget option if you're not an audiophile.
POSITIVE REVIEW:
This air fryer has completely transformed how I cook! Food comes out perfectly crispy on the outside and juicy on the inside, all with minimal or no oil. It preheats quickly and the digital controls are intuitive and responsive. I appreciate the dishwasher-safe basket which makes cleanup a breeze. Even my kids are eating more vegetables now because they taste so good prepared this way. The unit is a bit bulky on the counter, but the performance more than makes up for the space it takes.
NEGATIVE REVIEW:
I regret purchasing this robot vacuum. It constantly gets stuck under furniture despite claiming to have "smart navigation," and the battery dies before finishing our modestly sized apartment. The app disconnects frequently requiring tedious reconnection processes. The dust bin is way too small and needs emptying after each use. Worst of all, it scratched our hardwood floors in several places! Customer service offered little help beyond basic troubleshooting steps I'd already tried. Returning this disappointment ASAP.
NEUTRAL REVIEW:
The fitness tracker works as advertised for basic functions. Step counting seems accurate enough for casual use, and the sleep tracking provides interesting if not necessarily actionable data. Heart rate monitoring is hit or miss during high-intensity workouts but fine for resting measurements. The app is somewhat clunky but gets the job done. Battery lasts about 4 days which is adequate. The band is comfortable but shows signs of wear after a couple months. It's not outstanding but reasonable value for the price point.
POSITIVE REVIEW:
This blender is an absolute powerhouse! I've thrown everything at it from frozen fruits to tough vegetables and nuts, and it creates perfectly smooth blends every time. The variable speed control gives precise results whether you want chunky salsa or silky smoothies. It's definitely louder than my previous blender, but the performance justifies the noise. The container is easy to clean and the blades are impressively durable. Yes, it's expensive, but given the quality and 7-year warranty, it's worth every penny for a serious home cook.
NEGATIVE REVIEW:
These "premium" headphones are anything but premium. The sound quality is muddy with overwhelming bass that drowns out mids and highs. The noise cancellation is so weak it barely reduces ambient noise. The build quality feels cheap with plastic parts that creak when you adjust them. After just one month, the right ear cup started cutting out intermittently. For this price point, I expected far better quality and performance. Definitely returning these and going with a more reliable brand.
```
--------------------------------------------------------------------------------
/examples/sample/text_classification_samples/news_samples.txt:
--------------------------------------------------------------------------------
```
TECH NEWS SAMPLE:
Apple unveiled its new Apple Intelligence features today, integrating advanced AI capabilities directly into iOS 18, macOS Sequoia, and iPadOS 18. This marks a significant shift in Apple's strategy, embracing generative AI while maintaining their focus on privacy by processing most tasks on-device. Features include intelligent email summarization, photo editing via text prompts, and a significantly enhanced Siri experience that can understand context across apps.
SPORTS NEWS SAMPLE:
The Boston Celtics clinched their 18th NBA championship with a decisive 106-88 victory over the Dallas Mavericks last night, winning the series 4-1. Jayson Tatum was named Finals MVP after averaging 28.5 points and 9.8 rebounds during the series. "This is what we've worked for all season," said Celtics coach Joe Mazzulla. The victory ends a 16-year championship drought for the storied franchise, which now moves ahead of the Los Angeles Lakers for most NBA titles.
POLITICS NEWS SAMPLE:
The Senate passed a major bipartisan infrastructure bill today with a 69-30 vote, allocating $1.2 trillion for roads, bridges, public transit, and broadband internet. The legislation represents a rare moment of cooperation in a deeply divided Congress. "This bill shows America can do big things when we work together," said President Biden at a press conference following the vote. The bill now moves to the House, where progressive Democrats have tied its passage to a larger $3.5 trillion social spending package.
HEALTH NEWS SAMPLE:
A new study published in The Lancet suggests that intermittent fasting may not offer significant weight loss advantages over traditional calorie restriction diets. Researchers followed 250 participants over a 12-month period and found that while both approaches led to weight loss, there was no statistically significant difference between the two methods. "What matters most is consistency and finding an eating pattern that works for your lifestyle," said lead researcher Dr. Emily Chen from Stanford University.
ENTERTAINMENT NEWS SAMPLE:
The 96th Academy Awards ceremony delivered several surprises, with "The Quiet Hour" taking home Best Picture despite being a low-budget independent film. Lead actress Zoe Kazan won Best Actress for her role as a Holocaust survivor, while Christopher Nolan finally secured his first Best Director Oscar for "Synchronicity." The ceremony saw a 12% increase in viewership from last year, reversing a years-long decline in ratings for Hollywood's biggest night.
SCIENCE NEWS SAMPLE:
NASA's Europa Clipper mission has entered its final assembly phase, with launch scheduled for October 2024. The spacecraft will conduct detailed reconnaissance of Jupiter's moon Europa, which scientists believe harbors a subsurface ocean that could potentially support life. "This mission represents our best chance to determine if Europa's ocean is habitable," said project scientist Dr. Robert Pappalardo. The spacecraft will make nearly 50 close flybys of Europa, collecting data that will help scientists understand the moon's potential to harbor life.
BUSINESS NEWS SAMPLE:
Tesla announced record quarterly profits today, exceeding Wall Street expectations with revenue of $24.3 billion and earnings per share of $1.24. The electric vehicle manufacturer delivered 466,000 vehicles in Q2, a 50% increase from the same period last year. CEO Elon Musk attributed the success to improved production efficiency and strong demand for the Model Y. The company also revealed plans to begin production of its Cybertruck at the Texas Gigafactory by early next quarter, ending years of delays for the highly anticipated vehicle.
EDUCATION NEWS SAMPLE:
A landmark study from the Department of Education found that states implementing universal pre-kindergarten programs saw significant improvements in literacy rates and reduced achievement gaps. The research, which followed 28,000 students across 12 states, showed that children who attended quality pre-K programs were 38% more likely to read at grade level by third grade compared to their peers. "This provides compelling evidence that early childhood education should be a national priority," said Education Secretary Miguel Cardona.
```
--------------------------------------------------------------------------------
/ultimate_mcp_server/services/knowledge_base/utils.py:
--------------------------------------------------------------------------------
```python
"""Utility functions for the knowledge base services."""
from typing import Any, Dict, List, Optional
def build_metadata_filter(
filters: Optional[Dict[str, Any]] = None,
operator: str = "$and"
) -> Optional[Dict[str, Any]]:
"""Build a ChromaDB metadata filter.
Args:
filters: Dictionary of metadata filters (field->value or field->{op: value})
operator: Logical operator to combine filters ($and or $or)
Returns:
ChromaDB-compatible filter or None
"""
if not filters:
return None
# Handle direct equality case with single filter
if len(filters) == 1 and not any(isinstance(v, dict) for v in filters.values()):
field, value = next(iter(filters.items()))
return {field: value} # ChromaDB handles direct equality
# Process complex filters
filter_conditions = []
for field, condition in filters.items():
if isinstance(condition, dict):
# Already has operators
if any(k.startswith("$") for k in condition.keys()):
filter_conditions.append({field: condition})
else:
# Convert to $eq
filter_conditions.append({field: {"$eq": condition}})
else:
# Simple equality
filter_conditions.append({field: {"$eq": condition}})
# If only one condition, no need for logical operator
if len(filter_conditions) == 1:
return filter_conditions[0]
# Combine with logical operator
return {operator: filter_conditions}
def extract_keywords(text: str, min_length: int = 3, max_keywords: int = 10) -> List[str]:
"""Extract important keywords from text.
Args:
text: Input text
min_length: Minimum length of keywords
max_keywords: Maximum number of keywords to extract
Returns:
List of keywords
"""
# Simple keyword extraction (could be improved with NLP)
words = text.lower().split()
# Filter out short words and common stop words
stop_words = {
"the", "and", "a", "an", "in", "on", "at", "to", "for", "with", "by",
"is", "are", "was", "were", "be", "been", "has", "have", "had", "of", "that"
}
keywords = [
word.strip(".,?!\"'()[]{}:;")
for word in words
if len(word) >= min_length and word.lower() not in stop_words
]
# Count occurrences
keyword_counts = {}
for word in keywords:
if word in keyword_counts:
keyword_counts[word] += 1
else:
keyword_counts[word] = 1
# Sort by frequency
sorted_keywords = sorted(keyword_counts.items(), key=lambda x: x[1], reverse=True)
# Return top keywords
return [k for k, _ in sorted_keywords[:max_keywords]]
def generate_token_estimate(text: str) -> int:
"""Generate a rough estimate of token count.
Args:
text: Input text
Returns:
Estimated token count
"""
# Rough estimate based on whitespace tokenization and a multiplier
# This is a very crude approximation
words = len(text.split())
# Adjust for non-English or technical content
if any(ord(c) > 127 for c in text): # Has non-ASCII chars
return int(words * 1.5) # Non-English texts need more tokens
# Standard English approximation
return int(words * 1.3) # Account for tokenization differences
def create_document_metadata(
document: str,
source: Optional[str] = None,
document_type: Optional[str] = None
) -> Dict[str, Any]:
"""Create metadata for a document.
Args:
document: Document text
source: Optional source of the document
document_type: Optional document type
Returns:
Document metadata
"""
# Basic metadata
metadata = {
"length": len(document),
"token_estimate": generate_token_estimate(document),
"created_at": int(1000 * import_time()),
}
# Add source if provided
if source:
metadata["source"] = source
# Add document type if provided
if document_type:
metadata["type"] = document_type
# Extract potential title from first line
lines = document.strip().split("\n")
if lines and len(lines[0]) < 100: # Potential title
metadata["potential_title"] = lines[0]
return metadata
# Import at the end to avoid circular imports
import time as import_time # noqa: E402
```
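Illustrative calls for the helpers above (added for context; not part of the packaged file), showing the ChromaDB-style filter shapes and the rough token estimate:
```python
from ultimate_mcp_server.services.knowledge_base.utils import (
    build_metadata_filter,
    generate_token_estimate,
)

# Single equality filter passes through unchanged.
print(build_metadata_filter({"source": "handbook.pdf"}))
# {'source': 'handbook.pdf'}

# Multiple plain fields are wrapped in $eq conditions and combined with $and by default.
print(build_metadata_filter({"source": "handbook.pdf", "type": "policy"}))
# {'$and': [{'source': {'$eq': 'handbook.pdf'}}, {'type': {'$eq': 'policy'}}]}

# Conditions that already carry operators are kept as-is.
print(build_metadata_filter({"token_estimate": {"$gt": 500}}))
# {'token_estimate': {'$gt': 500}}

# Crude word-count-based estimate (x1.3 multiplier for plain English text).
print(generate_token_estimate("The quick brown fox jumps over the lazy dog"))
# 11
```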
--------------------------------------------------------------------------------
/tests/manual/test_extraction.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Manual test for extraction tools using standardized completion.
This script tests the key functions in extraction.py to ensure they work
with the updated standardized completion tool.
"""
import asyncio
import json
import os
import sys
# Add the project root to the Python path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))
from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.tools.extraction import (
extract_json,
extract_key_value_pairs,
extract_table,
)
async def test_extract_json():
"""Test the extract_json function with a simple JSON object."""
print("\n--- Testing extract_json ---")
# Simplified JSON without nested structures
sample_text = """
Here's the result of my analysis:
{
"name": "John Smith",
"age": 42,
"skills": "programming, design, project management",
"email": "[email protected]",
"phone": "555-1234"
}
Let me know if you need any more information.
"""
result = await extract_json(
text=sample_text,
provider=Provider.OPENAI.value,
model="gpt-3.5-turbo"
)
print(f"Success: {result.get('success', False)}")
print(f"Model used: {result.get('model', 'unknown')}")
print(f"Tokens: {result.get('tokens', {})}")
print(f"Processing time: {result.get('processing_time', 0):.2f}s")
# Pretty print the extracted data
if result.get('data'):
print("Extracted JSON:")
print(json.dumps(result['data'], indent=2))
else:
print("Failed to extract JSON")
print(f"Error: {result.get('error', 'unknown error')}")
async def test_extract_table():
"""Test the extract_table function with a simple table."""
print("\n--- Testing extract_table ---")
sample_text = """
Here's a summary of our quarterly sales:
| Product | Q1 Sales | Q2 Sales |
|----------|----------|----------|
| Widget A | 1200 | 1350 |
| Widget B | 850 | 940 |
As you can see, Widget A performed best in Q2.
"""
result = await extract_table(
text=sample_text,
return_formats=["json"], # Just request json to keep it simple
provider=Provider.OPENAI.value,
model="gpt-3.5-turbo"
)
print(f"Success: {result.get('success', False)}")
print(f"Model used: {result.get('model', 'unknown')}")
print(f"Tokens: {result.get('tokens', {})}")
print(f"Processing time: {result.get('processing_time', 0):.2f}s")
# Print the extracted data
if result.get('data'):
print("Extracted Table Data:")
if isinstance(result['data'], dict) and "json" in result['data']:
print("JSON Format:")
print(json.dumps(result['data']["json"], indent=2))
else:
print(json.dumps(result['data'], indent=2))
else:
print("Failed to extract table")
print(f"Error: {result.get('error', 'unknown error')}")
if result.get('raw_text'):
print(f"Raw text: {result.get('raw_text')[:200]}...")
async def test_extract_key_value_pairs():
"""Test the extract_key_value_pairs function."""
print("\n--- Testing extract_key_value_pairs ---")
sample_text = """
Patient Information:
Name: Jane Doe
DOB: 05/12/1985
Gender: Female
Blood Type: O+
Height: 5'6"
Weight: 145 lbs
Allergies: Penicillin, Shellfish
Primary Care Physician: Dr. Robert Chen
"""
result = await extract_key_value_pairs(
text=sample_text,
provider=Provider.OPENAI.value,
model="gpt-3.5-turbo"
)
print(f"Success: {result.get('success', False)}")
print(f"Model used: {result.get('model', 'unknown')}")
print(f"Tokens: {result.get('tokens', {})}")
print(f"Processing time: {result.get('processing_time', 0):.2f}s")
# Print the extracted data
if result.get('data'):
print("Extracted Key-Value Pairs:")
for key, value in result['data'].items():
print(f" {key}: {value}")
else:
print("Failed to extract key-value pairs")
print(f"Error: {result.get('error', 'unknown error')}")
async def main():
"""Run all tests."""
print("Testing extraction tools with standardized completion...")
await test_extract_json()
await test_extract_table()
await test_extract_key_value_pairs()
print("\nAll tests completed.")
if __name__ == "__main__":
asyncio.run(main())
```
--------------------------------------------------------------------------------
/examples/sample/text_classification_samples/email_classification.txt:
--------------------------------------------------------------------------------
```
SPAM EMAIL:
CONGRATULATIONS! You have been selected as the LUCKY WINNER of our INTERNATIONAL LOTTERY! Your email was randomly chosen from our database and you have won $5,000,000.00 USD (FIVE MILLION UNITED STATES DOLLARS). To claim your prize, please contact our claims agent immediately at [email protected] with your full name, address, phone number, and a copy of your ID. A processing fee of $199 is required to release your funds. DO NOT DELAY! This offer expires in 48 HOURS!
URGENT EMAIL:
Subject: Critical Security Breach - Immediate Action Required
Dear IT Team,
Our monitoring systems have detected an unauthorized access attempt to our main customer database at 3:42 AM EST. The attempt originated from an IP address in Eastern Europe and appears to have successfully extracted approximately 25,000 customer records including names, email addresses, and hashed passwords. Our security team has temporarily shut down external access to the affected systems.
Please implement the emergency response protocol immediately:
1. Activate the incident response team
2. Reset all administrative credentials
3. Deploy the prepared statement to affected customers
4. Begin forensic analysis of the breach vector
This is classified as a Severity 1 incident requiring immediate attention.
David Chen
Chief Information Security Officer
PROMOTIONAL EMAIL:
Subject: Summer Sale - 48 Hours Only! Up to 70% Off Everything
Beat the heat with sizzling savings! 🔥
Our biggest sale of the season is HERE! For just 48 hours, enjoy:
• Up to 70% off ALL clothing and accessories
• Buy one, get one 50% off on summer essentials
• Free shipping on orders over $50
• Extra 15% off with code: SUMMER24
Plus, the first 500 orders receive a FREE beach tote (valued at $45)!
Don't miss out - sale ends Sunday at midnight.
Shop now: https://www.fashionretailer.com/summer-sale
INFORMATIONAL EMAIL:
Subject: Upcoming System Maintenance - May 15th
Dear Valued Customer,
Please be informed that we will be conducting scheduled maintenance on our systems to improve performance and reliability. During this time, our services will be temporarily unavailable.
Maintenance details:
• Date: Tuesday, May 15th, 2024
• Time: 2:00 AM - 5:00 AM EDT (UTC-4)
• Affected services: Online banking portal, mobile app, and automated phone system
No action is required on your part. All services will resume automatically once maintenance is complete. We recommend completing any urgent transactions before the maintenance window begins.
We apologize for any inconvenience this may cause and appreciate your understanding as we work to enhance your experience.
Sincerely,
Customer Support Team
First National Bank
PHISHING EMAIL:
Subject: Your Amazon Account Has Been Suspended
Dear Valued Customer,
We regret to inform you that your Amazon account has been temporarily suspended due to unusual activity. Our security system has detected multiple failed login attempts from unrecognized devices.
To verify your identity and restore your account access, please update your payment information by clicking the link below:
>> Restore Account Access Now <<
If you do not verify your account within 24 hours, your account will be permanently deactivated and all pending orders will be canceled.
Thank you for your immediate attention to this matter.
Amazon Customer Service Team
PERSONAL EMAIL:
Subject: Vacation Plans for Next Month
Hi Sarah,
How are you doing? I hope everything's going well with the new job! I've been thinking about our conversation last month about taking a short vacation together, and I wanted to follow up.
I checked some options for that beach town we talked about, and there are some great deals for the weekend of the 15th. I found a cute rental cottage about two blocks from the beach for $180/night, which seems reasonable if we split it. The weather should be perfect that time of year too.
Let me know if you're still interested and if those dates work for you. I could book it this week to secure the place before summer rates kick in.
Can't wait to catch up properly!
Talk soon,
Michael
TRANSACTIONAL EMAIL:
Subject: Order #78291 Confirmation - Your Purchase from TechGadgets
Dear Alex Rodriguez,
Thank you for your recent purchase from TechGadgets. We're processing your order and will ship it soon.
Order Details:
• Order Number: #78291
• Order Date: April 3, 2024
• Payment Method: Visa ending in 4872
• Shipping Method: Standard (3-5 business days)
Items Purchased:
1. Wireless Earbuds Pro - Black (1) - $129.99
2. Fast Charging Cable 6ft (2) - $19.99 each
3. Screen Protector Ultra (1) - $24.99
Subtotal: $194.96
Shipping: $5.99
Tax: $16.57
Total: $217.52
You will receive a shipping confirmation email with tracking information once your order ships. You can also check your order status anytime by logging into your account.
If you have any questions about your order, please contact our customer service team at [email protected] or call 1-800-555-1234.
Thank you for shopping with us!
The TechGadgets Team
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "ultimate_mcp_server"
version = "0.1.0"
description = "The Ultimate Model Context Protocol (MCP) Server, providing unified access to a wide variety of useful and powerful tools."
readme = "README.md"
requires-python = ">=3.13"
license = {file = "LICENSE"}
authors = [
{name = "Jeffrey Emanuel", email = "[email protected]"},
]
maintainers = [
{name = "Jeffrey Emanuel", email = "[email protected]"},
]
keywords = ["ultimte", "mcp", "server", "agent", "ai", "claude", "gpt", "gemini", "deepseek"]
classifiers = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3.13",
"Topic :: Scientific/Engineering :: Artificial Intelligence",
]
dependencies = [
# Core MCP and LLM providers
"mcp>=0",
"anthropic>=0",
"openai>=0",
"google-genai>=0",
# Async utilities
"httpx>=0",
"aiofiles>=0",
# Data processing
"pydantic>=0",
"tenacity>=0", # For retry logic
# Caching and persistence
"diskcache>=0", # Persistent disk cache
"msgpack>=0", # Efficient serialization
# Vector database for semantic caching
"numpy>=0",
"sentence-transformers>=0", # For embeddings
"chromadb>=0", # Vector DB
# Analytics and monitoring
"prometheus-client>=0",
"pandas>=0",
"rich>=0", # Console output formatting
# Templating for prompt management
"jinja2>=0",
# Multi-modal support
"pillow>=0", # Image processing
# Utilities
"python-slugify>=0", # For URL-friendly strings
"colorama>=0", # Terminal colors
"tqdm>=0", # Progress bars
"tiktoken>=0", # Token counting
"python-decouple>=0", # .env management
"pydantic-settings>=0",
"jsonschema>=0",
"matplotlib>=0",
"marqo>=0", # Added for Marqo search tool
"pytest-playwright>=0", # For web browser automation
"sqlalchemy>=0", # For SQL database interactions
"aiosqlite>=0", # Async SQLite database access
"pyvis>=0", # Graph visualization
"python-docx>=0", # MS Word DOCX support
"opencv-python>=0", # For OCR tools
"pytesseract>=0", # For OCR
"pdf2image>=0", # For OCR
"PyPDF2>=0", # PDF conversion
"pdfplumber>=0", # For OCR
"fitz>=0", # For OCR
"pymupdf>=0", # For OCR
"beautifulsoup4>=0", # Dealing with HTML
"xmldiff>=0", # for redlines
"lxml>=0", # XML parser
"faster-whisper>=0", # Audio transcripts
"html2text>=0",
"readability-lxml>=0",
"markdownify>=0",
"trafilatura>=0",
"markdown>=0",
"jsonpatch>=0",
"jsonpointer>=0",
"pygments>=0",
"typer>=0", # For CLI interface
"docling>=0", # For document conversion
"aiohttp>=0",
"boto3>=0", # For AWS secrets management
"hvac>=0", # For HashiVault pw management
"pandera>=0", # Data validation
"rapidfuzz>=0",
"magika>=0",
"tabula-py>=0",
"brotli>=0",
"pygments>=0",
"fastapi>=0.115.9",
"uvicorn>=0.34.2",
"networkx>0",
"scipy>0",
"fastmcp>0",
]
[project.optional-dependencies]
advanced = [
"torch",
"torchvision",
"torchaudio",
"pytorch-triton",
"transformers>=0",
"accelerate>=0",
]
#excel_automation = [
# "win32com", # Excel automation,
# "win32com",
#]
# Development and testing
dev = [
"pytest>=0",
"pytest-asyncio>=0",
"pytest-cov>=0",
"isort>=0",
"mypy>=0",
"ruff>=0",
"types-aiofiles>=0",
"pre-commit>=0",
]
# Documentation
docs = [
"mkdocs>=0",
"mkdocs-material>=0",
"mkdocstrings>=0",
"mkdocstrings-python>=0",
]
# All extras
all = ["ultimate_mcp_server[advanced,dev,docs]"]
[[tool.uv.index]]
name = "pypi"
url = "https://pypi.org/simple"
[tool.uv.pip]
prerelease = "allow"
torch-backend = "auto"
[project.urls]
Homepage = "https://github.com/Dicklesworthstone/ultimate_mcp_server"
Documentation = "https://github.com/Dicklesworthstone/ultimate_mcp_server/docs"
Repository = "https://github.com/Dicklesworthstone/ultimate_mcp_server.git"
"Bug Reports" = "https://github.com/Dicklesworthstone/ultimate_mcp_server/issues"
[project.scripts]
umcp = "ultimate_mcp_server.cli.typer_cli:cli"
[tool.hatch.version]
path = "ultimate_mcp_server/__init__.py"
[tool.hatch.build.targets.sdist]
include = [
"/ultimate_mcp_server",
"/examples",
"/tests",
"LICENSE",
"README.md",
"pyproject.toml",
]
[tool.hatch.build.targets.wheel]
packages = ["ultimate_mcp_server"]
[tool.black]
line-length = 100
target-version = ["py313"]
include = '\.pyi?$'
[tool.isort]
profile = "black"
line_length = 100
multi_line_output = 3
[tool.mypy]
python_version = "3.13"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
check_untyped_defs = true
disallow_untyped_decorators = true
no_implicit_optional = true
strict_optional = true
[tool.pytest.ini_options]
minversion = "7.0"
addopts = "--cov=ultimate_mcp_server --cov-report=term-missing -v"
testpaths = ["tests"]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
[tool.ruff]
line-length = 100
target-version = "py313"
[tool.ruff.lint]
select = ["E", "F", "B", "I", "Q"]
ignore = ["E203", "E501", "Q000"]
```
--------------------------------------------------------------------------------
/ultimate_mcp_server/services/cache/persistence.py:
--------------------------------------------------------------------------------
```python
"""Cache persistence mechanisms."""
import json
import os
import pickle
from pathlib import Path
from typing import Any, Dict, Optional
import aiofiles
from ultimate_mcp_server.utils import get_logger
logger = get_logger(__name__)
class CachePersistence:
"""Handles cache persistence operations."""
def __init__(self, cache_dir: Path):
"""Initialize the cache persistence handler.
Args:
cache_dir: Directory for cache storage
"""
self.cache_dir = cache_dir
self.cache_file = cache_dir / "cache.pkl"
self.metadata_file = cache_dir / "metadata.json"
# Create cache directory if it doesn't exist
self.cache_dir.mkdir(parents=True, exist_ok=True)
async def save_cache(self, data: Dict[str, Any], metadata: Optional[Dict[str, Any]] = None) -> bool:
"""Save cache data to disk.
Args:
data: Cache data to save
metadata: Optional metadata about the cache
Returns:
True if successful
"""
try:
# Save cache data
temp_file = f"{self.cache_file}.tmp"
async with aiofiles.open(temp_file, 'wb') as f:
await f.write(pickle.dumps(data))
# Rename temp file to cache file (atomic operation)
os.replace(temp_file, self.cache_file)
# Save metadata if provided
if metadata:
await self.save_metadata(metadata)
logger.debug(
f"Saved cache data to {self.cache_file}",
emoji_key="cache"
)
return True
except Exception as e:
logger.error(
f"Failed to save cache data: {str(e)}",
emoji_key="error"
)
return False
async def load_cache(self) -> Optional[Dict[str, Any]]:
"""Load cache data from disk.
Returns:
Cache data or None if file doesn't exist or error occurs
"""
if not self.cache_file.exists():
return None
try:
async with aiofiles.open(self.cache_file, 'rb') as f:
data = await f.read()
cache_data = pickle.loads(data)
logger.debug(
f"Loaded cache data from {self.cache_file}",
emoji_key="cache"
)
return cache_data
except Exception as e:
logger.error(
f"Failed to load cache data: {str(e)}",
emoji_key="error"
)
return None
async def save_metadata(self, metadata: Dict[str, Any]) -> bool:
"""Save cache metadata to disk.
Args:
metadata: Metadata to save
Returns:
True if successful
"""
try:
# Save metadata
temp_file = f"{self.metadata_file}.tmp"
async with aiofiles.open(temp_file, 'w') as f:
await f.write(json.dumps(metadata, indent=2))
# Rename temp file to metadata file (atomic operation)
os.replace(temp_file, self.metadata_file)
return True
except Exception as e:
logger.error(
f"Failed to save cache metadata: {str(e)}",
emoji_key="error"
)
return False
async def load_metadata(self) -> Optional[Dict[str, Any]]:
"""Load cache metadata from disk.
Returns:
Metadata or None if file doesn't exist or error occurs
"""
if not self.metadata_file.exists():
return None
try:
async with aiofiles.open(self.metadata_file, 'r') as f:
data = await f.read()
metadata = json.loads(data)
return metadata
except Exception as e:
logger.error(
f"Failed to load cache metadata: {str(e)}",
emoji_key="error"
)
return None
async def cleanup_old_cache_files(self, max_age_days: int = 30) -> int:
"""Clean up old cache files.
Args:
max_age_days: Maximum age of cache files in days
Returns:
Number of files deleted
"""
import time
now = time.time()
max_age_seconds = max_age_days * 24 * 60 * 60
deleted_count = 0
try:
# Find leftover temporary cache files (only *.tmp files are swept)
cache_files = list(self.cache_dir.glob("*.tmp"))
# Delete old files
for file_path in cache_files:
mtime = file_path.stat().st_mtime
age = now - mtime
if age > max_age_seconds:
file_path.unlink()
deleted_count += 1
if deleted_count > 0:
logger.info(
f"Cleaned up {deleted_count} old cache files",
emoji_key="cache"
)
return deleted_count
except Exception as e:
logger.error(
f"Failed to clean up old cache files: {str(e)}",
emoji_key="error"
)
return deleted_count
```
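A minimal usage sketch of the `CachePersistence` class above; the cache directory is an arbitrary example path, and the calls simply mirror the methods defined in this file:
```python
import asyncio
from pathlib import Path

from ultimate_mcp_server.services.cache.persistence import CachePersistence


async def main():
    # Hypothetical cache location; any writable directory works
    persistence = CachePersistence(Path.home() / ".ultimate" / "demo_cache")

    # Persist a small cache snapshot plus metadata (written atomically via temp files)
    await persistence.save_cache({"answer": 42}, metadata={"entries": 1})

    # Reload both on a later run; either call returns None if nothing was saved
    data = await persistence.load_cache()
    meta = await persistence.load_metadata()
    print(data, meta)

    # Remove stale *.tmp files older than a week
    deleted = await persistence.cleanup_old_cache_files(max_age_days=7)
    print(f"Deleted {deleted} stale temp files")


asyncio.run(main())
```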
--------------------------------------------------------------------------------
/check_api_keys.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
"""Script to check API key configurations for Ultimate MCP Server using rich formatting."""
import asyncio
import sys
from pathlib import Path
# Add project root to path for imports
sys.path.insert(0, str(Path(__file__).parent))
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from ultimate_mcp_server.config import get_config
from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.server import Gateway
from ultimate_mcp_server.utils import get_logger
# Initialize rich console
console = Console()
logger = get_logger("api_key_checker")
# Map provider names to the corresponding environment variable names
# Used for informational display only
PROVIDER_ENV_VAR_MAP = {
"openai": "OPENAI_API_KEY",
"anthropic": "ANTHROPIC_API_KEY",
"deepseek": "DEEPSEEK_API_KEY",
"gemini": "GEMINI_API_KEY",
"openrouter": "OPENROUTER_API_KEY",
}
async def check_api_keys():
"""
Check API key configurations and display a comprehensive report.
This async function:
1. Loads the current configuration settings from all sources (environment variables,
.env file, configuration files)
2. Initializes a minimal Gateway instance to access provider configurations
3. Checks if API keys are properly configured for all supported providers
4. Displays formatted results using rich tables and panels, including:
- Provider-by-provider API key status
- Configuration loading priority information
- How to set API keys properly
- Example .env file content
The function checks keys for all providers defined in the Provider enum,
including OpenAI, Anthropic, DeepSeek, Gemini, and OpenRouter.
Returns:
int: Exit code (0 for success)
"""
# Force load config to ensure we get the latest resolved settings
cfg = get_config()
# Create Gateway with minimal initialization (no tools) - kept for potential future checks
gateway = Gateway(name="api-key-checker", register_tools=False) # noqa: F841
console.print(Panel(
"Checking API Key Configuration based on loaded settings",
title="[bold cyan]Ultimate MCP Server API Key Check[/bold cyan]",
expand=False,
border_style="blue"
))
# Create table for results
table = Table(title="Provider API Key Status", show_header=True, header_style="bold magenta")
table.add_column("Provider", style="dim", width=12)
table.add_column("API Key Status", style="cyan")
table.add_column("Relevant Env Var", style="yellow")
table.add_column("Status", style="bold")
# Check each provider based on the loaded configuration
for provider_name in [p.value for p in Provider]:
# Get provider config from the loaded GatewayConfig object
provider_config = getattr(cfg.providers, provider_name, None)
# Check if key exists in the loaded config
# This key would have been resolved from .env, env vars, or config file by get_config()
config_key = provider_config.api_key if provider_config else None
# Format key for display (if present)
key_display = Text("Not set in config", style="dim yellow")
status_text = Text("NOT CONFIGURED", style="red")
status_icon = "❌"
if config_key:
if len(config_key) > 8:
key_display = Text(f"{config_key[:4]}...{config_key[-4:]}", style="green")
else:
key_display = Text("[INVALID KEY FORMAT]", style="bold red")
status_text = Text("CONFIGURED", style="green")
status_icon = "✅"
# Get the corresponding environment variable name for informational purposes
env_var_name = PROVIDER_ENV_VAR_MAP.get(provider_name, "N/A")
# Add row to table
table.add_row(
provider_name.capitalize(),
key_display,
env_var_name,
f"[{status_text.style}]{status_icon} {status_text}[/]"
)
# Print the table
console.print(table)
# Configuration Loading Info Panel
config_info = Text.assemble(
("1. ", "bold blue"), ("Environment Variables", "cyan"), (" (e.g., ", "dim"), ("GATEWAY_PROVIDERS__OPENAI__API_KEY=...", "yellow"), (")\n", "dim"),
("2. ", "bold blue"), ("Values in a ", "cyan"), (".env", "yellow"), (" file in the project root\n", "cyan"),
("3. ", "bold blue"), ("Values in a config file", "cyan"), (" (e.g., ", "dim"), ("gateway_config.yaml", "yellow"), (")\n", "dim"),
("4. ", "bold blue"), ("Default values defined in the configuration models", "cyan")
)
console.print(Panel(config_info, title="[bold]Configuration Loading Priority[/]", border_style="blue"))
# How to Set Keys Panel
set_keys_info = Text.assemble(
("Ensure API keys are available via one of the methods above,\n", "white"),
("preferably using ", "white"), ("environment variables", "cyan"), (" or a ", "white"), (".env", "yellow"), (" file.", "white")
)
console.print(Panel(set_keys_info, title="[bold]How to Set API Keys[/]", border_style="green"))
# Example .env Panel
env_example_lines = []
for env_var in PROVIDER_ENV_VAR_MAP.values():
env_example_lines.append(Text.assemble((env_var, "yellow"), "=", ("your_", "dim"), (env_var.lower(), "dim cyan"), ("_here", "dim")))
env_example_content = Text("\n").join(env_example_lines)
console.print(Panel(env_example_content, title="[bold dim]Example .env file content[/]", border_style="yellow"))
console.print("[bold green]Run your example scripts or the main server after setting the API keys.[/bold green]")
return 0
if __name__ == "__main__":
exit_code = asyncio.run(check_api_keys())
sys.exit(exit_code)
```
--------------------------------------------------------------------------------
/ultimate_mcp_server/services/prompts/repository.py:
--------------------------------------------------------------------------------
```python
"""Prompt repository for managing and accessing prompts."""
import asyncio
import json
import os
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
import aiofiles
from ultimate_mcp_server.utils import get_logger
logger = get_logger(__name__)
class PromptRepository:
"""Repository for managing and accessing prompts."""
_instance = None
def __new__(cls, *args, **kwargs):
"""Singleton implementation for prompt repository."""
if cls._instance is None:
cls._instance = super(PromptRepository, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self, base_dir: Optional[Union[str, Path]] = None):
"""Initialize the prompt repository.
Args:
base_dir: Base directory for prompt storage
"""
# Only initialize once for singleton
if self._initialized:
return
# Set base directory
if base_dir:
self.base_dir = Path(base_dir)
else:
# Default to ~/.ultimate/prompts in the user's home directory
self.base_dir = Path.home() / ".ultimate" / "prompts"
# Create directory if it doesn't exist
self.base_dir.mkdir(parents=True, exist_ok=True)
# Cache for prompts
self.prompts = {}
# Shared lock for file I/O (a fresh asyncio.Lock() per call would provide no mutual exclusion)
self._lock = asyncio.Lock()
# Flag as initialized
self._initialized = True
logger.info(
f"Prompt repository initialized (base_dir: {self.base_dir})",
emoji_key="provider"
)
async def get_prompt(self, prompt_id: str) -> Optional[Dict[str, Any]]:
"""Get a prompt by ID.
Args:
prompt_id: Prompt identifier
Returns:
Prompt data or None if not found
"""
# Check cache first
if prompt_id in self.prompts:
return self.prompts[prompt_id]
# Try to load from file
prompt_path = self.base_dir / f"{prompt_id}.json"
if not prompt_path.exists():
logger.warning(
f"Prompt '{prompt_id}' not found",
emoji_key="warning"
)
return None
try:
# Load prompt from file
async with self._lock:
async with aiofiles.open(prompt_path, "r", encoding="utf-8") as f:
prompt_data = json.loads(await f.read())
# Cache for future use
self.prompts[prompt_id] = prompt_data
return prompt_data
except Exception as e:
logger.error(
f"Error loading prompt '{prompt_id}': {str(e)}",
emoji_key="error"
)
return None
async def save_prompt(self, prompt_id: str, prompt_data: Dict[str, Any]) -> bool:
"""Save a prompt.
Args:
prompt_id: Prompt identifier
prompt_data: Prompt data to save
Returns:
True if successful
"""
# Validate prompt data
if not isinstance(prompt_data, dict) or "template" not in prompt_data:
logger.error(
f"Invalid prompt data for '{prompt_id}'",
emoji_key="error"
)
return False
try:
# Save to cache
self.prompts[prompt_id] = prompt_data
# Save to file
prompt_path = self.base_dir / f"{prompt_id}.json"
async with self._lock:
async with aiofiles.open(prompt_path, "w", encoding="utf-8") as f:
await f.write(json.dumps(prompt_data, indent=2))
logger.info(
f"Saved prompt '{prompt_id}'",
emoji_key="success"
)
return True
except Exception as e:
logger.error(
f"Error saving prompt '{prompt_id}': {str(e)}",
emoji_key="error"
)
return False
async def delete_prompt(self, prompt_id: str) -> bool:
"""Delete a prompt.
Args:
prompt_id: Prompt identifier
Returns:
True if successful
"""
# Remove from cache
if prompt_id in self.prompts:
del self.prompts[prompt_id]
# Remove file if exists
prompt_path = self.base_dir / f"{prompt_id}.json"
if prompt_path.exists():
try:
os.remove(prompt_path)
logger.info(
f"Deleted prompt '{prompt_id}'",
emoji_key="success"
)
return True
except Exception as e:
logger.error(
f"Error deleting prompt '{prompt_id}': {str(e)}",
emoji_key="error"
)
return False
return False
async def list_prompts(self) -> List[str]:
"""List available prompts.
Returns:
List of prompt IDs
"""
try:
# Get prompt files
prompt_files = list(self.base_dir.glob("*.json"))
# Extract IDs from filenames
prompt_ids = [f.stem for f in prompt_files]
return prompt_ids
except Exception as e:
logger.error(
f"Error listing prompts: {str(e)}",
emoji_key="error"
)
return []
def get_prompt_repository(base_dir: Optional[Union[str, Path]] = None) -> PromptRepository:
"""Get the prompt repository singleton instance.
Args:
base_dir: Base directory for prompt storage
Returns:
PromptRepository singleton instance
"""
return PromptRepository(base_dir)
```
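A short sketch of the repository in use, assuming the default prompt directory under the user's home; the prompt ID and template text are illustrative:
```python
import asyncio

from ultimate_mcp_server.services.prompts.repository import get_prompt_repository


async def main():
    # Singleton accessor; defaults to ~/.ultimate/prompts when no base_dir is given
    repo = get_prompt_repository()

    # A prompt record must contain at least a "template" key to pass validation
    saved = await repo.save_prompt(
        "summarize", {"template": "Summarize the following text:\n{text}"}
    )
    print("saved:", saved)

    print("available:", await repo.list_prompts())

    prompt = await repo.get_prompt("summarize")
    if prompt:
        print(prompt["template"].format(text="Quantum computing in one paragraph."))

    await repo.delete_prompt("summarize")


asyncio.run(main())
```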
--------------------------------------------------------------------------------
/examples/simple_completion_demo.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
"""
Simple completion demo using Ultimate MCP Server's direct provider functionality.
This example demonstrates how to:
1. Initialize the Ultimate MCP Server Gateway
2. Connect directly to an LLM provider (OpenAI)
3. Generate a text completion with a specific model
4. Track and display token usage and costs
The demo bypasses the MCP tool interface and interacts directly with provider APIs,
which is useful for understanding the underlying provider connections or when you need
lower-level access to provider-specific features. It also showcases the CostTracker
utility for monitoring token usage and associated costs across multiple requests.
This script can be run as a standalone Python module and serves as a minimal example of
direct provider integration with the Ultimate MCP Server framework.
Usage:
python examples/simple_completion_demo.py
"""
import asyncio
import sys
from pathlib import Path
# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))
from rich.panel import Panel
from rich.rule import Rule
from rich.table import Table
from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.server import Gateway
from ultimate_mcp_server.utils import get_logger
from ultimate_mcp_server.utils.display import CostTracker
from ultimate_mcp_server.utils.logging.console import console
# Initialize logger
logger = get_logger("example.simple_completion")
async def run_model_demo(tracker: CostTracker):
"""
Run a simple completion demo using direct provider access to LLM APIs.
This function demonstrates the complete workflow for generating text completions
using the Ultimate MCP Server framework with direct provider access:
1. Initialize a Gateway instance without registering tools
2. Initialize the LLM providers from configuration
3. Access a specific provider (OpenAI in this case)
4. Generate a completion with a specific prompt and model
5. Display the completion result with Rich formatting
6. Track and display token usage and cost metrics
Direct provider access (vs. using MCP tools) offers more control over provider-specific
parameters and is useful for applications that need to customize provider interactions
beyond what the standard MCP tools offer.
Args:
tracker: CostTracker instance to record token usage and costs for this operation.
The tracker will be updated with the completion results.
Returns:
int: Exit code - 0 for success, 1 for failure
Raises:
Various exceptions may be raised by the provider initialization or completion
generation process, but these are logged and contained within this function.
"""
logger.info("Starting simple completion demo", emoji_key="start")
# Use Rich Rule for title
console.print(Rule("[bold blue]Simple Completion Demo[/bold blue]"))
# Create Gateway instance
gateway = Gateway("simple-demo", register_tools=False)
# Initialize providers
logger.info("Initializing providers", emoji_key="provider")
await gateway._initialize_providers()
# Get provider (OpenAI)
provider_name = Provider.OPENAI.value
provider = gateway.providers.get(provider_name)
if not provider:
logger.error(f"Provider {provider_name} not available", emoji_key="error")
return 1
logger.success(f"Provider {provider_name} initialized", emoji_key="success")
# List available models
models = await provider.list_models()
logger.info(f"Available models: {len(models)}", emoji_key="model")
# Pick a valid model from the provider
model = "gpt-4.1-mini" # A valid model from constants.py
# Generate a completion
prompt = "Explain quantum computing in simple terms."
logger.info(f"Generating completion with {model}", emoji_key="processing")
result = await provider.generate_completion(
prompt=prompt,
model=model,
temperature=0.7,
max_tokens=150
)
# Print the result using Rich Panel
logger.success("Completion generated successfully!", emoji_key="success")
console.print(Panel(
result.text.strip(),
title=f"Quantum Computing Explanation ({model})",
subtitle=f"Prompt: {prompt}",
border_style="green",
expand=False
))
# Print stats using Rich Table
stats_table = Table(title="Completion Stats", show_header=False, box=None)
stats_table.add_column("Metric", style="cyan")
stats_table.add_column("Value", style="white")
stats_table.add_row("Input Tokens", str(result.input_tokens))
stats_table.add_row("Output Tokens", str(result.output_tokens))
stats_table.add_row("Cost", f"${result.cost:.6f}")
stats_table.add_row("Processing Time", f"{result.processing_time:.2f}s")
console.print(stats_table)
# Track the call
tracker.add_call(result)
# Display cost summary
tracker.display_summary(console)
return 0
async def main():
"""
Entry point function that sets up the demo environment and error handling.
This function:
1. Creates a CostTracker instance to monitor token usage and costs
2. Calls the run_model_demo function within a try-except block
3. Handles and logs any uncaught exceptions
4. Returns an appropriate exit code based on execution success/failure
The separation between main() and run_model_demo() allows for clean error handling
and resource management at the top level while keeping the demo logic organized
in its own function.
Returns:
int: Exit code - 0 for success, 1 for failure
"""
tracker = CostTracker()
try:
return await run_model_demo(tracker)
except Exception as e:
logger.critical(f"Demo failed: {str(e)}", emoji_key="critical")
return 1
if __name__ == "__main__":
# Run the demo
exit_code = asyncio.run(main())
sys.exit(exit_code)
```
--------------------------------------------------------------------------------
/examples/sample/medical_case.txt:
--------------------------------------------------------------------------------
```
PATIENT MEDICAL RECORD
Memorial Hospital Medical Center
123 Medical Center Blvd, Boston, MA 02118
CONFIDENTIAL - FOR MEDICAL PERSONNEL ONLY
Patient ID: MH-459872
Date of Admission: April 10, 2025
Attending Physician: Dr. Elizabeth Chen, MD (Cardiology)
Consulting Physicians: Dr. Robert Martinez, MD (Neurology), Dr. Sarah Williams, MD (Endocrinology)
PATIENT INFORMATION
Name: John Anderson
DOB: 05/22/1968 (57 years old)
Gender: Male
Address: 45 Maple Street, Apt 3B, Cambridge, MA 02139
Contact: (617) 555-3829
Emergency Contact: Mary Anderson (Wife) - (617) 555-4912
Insurance: BlueCross BlueShield, Policy #BCB-88765432
CHIEF COMPLAINT
Patient presented to the Emergency Department with acute chest pain, shortness of breath, and left arm numbness beginning approximately 2 hours prior to arrival.
CURRENT MEDICATIONS
1. Metformin 1000mg twice daily (for Type 2 Diabetes, prescribed by Dr. Williams in 2019)
2. Atorvastatin 40mg daily (for Hypercholesterolemia, prescribed by Dr. Chen in 2022)
3. Lisinopril 20mg daily (for Hypertension, prescribed by Dr. Chen in 2022)
4. Aspirin 81mg daily (for cardiovascular health, prescribed by Dr. Chen in 2022)
5. Sertraline 50mg daily (for Depression, prescribed by Dr. Thomas Gordon in 2023)
ALLERGIES
1. Penicillin (Severe - Hives, Difficulty Breathing)
2. Shellfish (Moderate - Gastrointestinal distress)
PAST MEDICAL HISTORY
1. Type 2 Diabetes Mellitus (Diagnosed 2019 by Dr. Williams)
2. Hypertension (Diagnosed 2020 by Dr. Chen)
3. Hypercholesterolemia (Diagnosed 2020 by Dr. Chen)
4. Depression (Diagnosed 2023 by Dr. Gordon)
5. Left knee arthroscopy (2015, Boston Orthopedic Center, Dr. James Miller)
FAMILY HISTORY
- Father: Deceased at age 68 from myocardial infarction, had hypertension, type 2 diabetes
- Mother: Living, age 82, has hypertension, osteoarthritis
- Brother: Age 60, has type 2 diabetes, hypercholesterolemia
- Sister: Age 55, no known medical conditions
SOCIAL HISTORY
Occupation: High school mathematics teacher
Tobacco: Former smoker, quit in 2018 (25 pack-year history)
Alcohol: Occasional (1-2 drinks per week)
Exercise: Walks 20 minutes, 3 times per week
Diet: Reports following a "mostly" diabetic diet with occasional non-compliance
PHYSICAL EXAMINATION
Vital Signs:
- BP: 165/95 mmHg
- HR: 95 bpm
- RR: 22 breaths/min
- Temp: 98.6°F (37°C)
- O2 Saturation: 94% on room air
General: Patient is alert but anxious, in moderate distress
Cardiovascular: Irregular rhythm, tachycardia, S3 gallop present, no murmurs
Respiratory: Bibasilar crackles, decreased breath sounds at bases bilaterally
Neurological: Left arm weakness (4/5 strength), otherwise grossly intact
Extremities: No edema, normal peripheral pulses
DIAGNOSTIC STUDIES
Laboratory:
- Troponin I: 2.3 ng/mL (elevated)
- CK-MB: 12.5 ng/mL (elevated)
- BNP: 450 pg/mL (elevated)
- Complete Blood Count: WBC 12,000/μL, Hgb 13.5 g/dL, Plt 230,000/μL
- Complete Metabolic Panel: Glucose 185 mg/dL, Cr 1.1 mg/dL, BUN 22 mg/dL
- Lipid Panel: Total Chol 210 mg/dL, LDL 130 mg/dL, HDL 35 mg/dL, TG 190 mg/dL
- HbA1c: 7.8%
Imaging and Other Studies:
- ECG: ST-segment elevation in leads II, III, aVF; reciprocal changes in I, aVL
- Chest X-ray: Mild pulmonary edema, cardiomegaly
- Echocardiogram: EF 40%, inferior wall hypokinesis, moderate mitral regurgitation
- Cardiac Catheterization: 90% occlusion of right coronary artery, 70% occlusion of left circumflex artery
DIAGNOSIS
1. Acute ST-elevation Myocardial Infarction (STEMI), inferior wall
2. Coronary Artery Disease, multivessel
3. Congestive Heart Failure, acute onset (NYHA Class III)
4. Type 2 Diabetes Mellitus, inadequately controlled
5. Essential Hypertension, inadequately controlled
6. Hyperlipidemia
TREATMENT
Procedures:
1. Emergency Percutaneous Coronary Intervention (PCI) with drug-eluting stent placement in right coronary artery by Dr. Michael Wilson on April 10, 2025
2. Scheduled PCI for left circumflex artery by Dr. Wilson on April 13, 2025
Medications:
1. Aspirin 325mg daily
2. Clopidogrel 75mg daily
3. Metoprolol succinate 50mg daily
4. Lisinopril 40mg daily (increased from 20mg)
5. Atorvastatin 80mg daily (increased from 40mg)
6. Furosemide 40mg twice daily
7. Metformin continued at 1000mg twice daily
8. Insulin glargine 20 units at bedtime (new)
9. Sertraline continued at 50mg daily
HOSPITAL COURSE
Patient was admitted through the Emergency Department and taken emergently to the cardiac catheterization lab where he underwent successful PCI with stent placement to the right coronary artery. Post-procedure, the patient was transferred to the Cardiac Care Unit (CCU) for close monitoring. Patient experienced a brief episode of ventricular fibrillation on the first night, which was successfully treated with defibrillation. Cardiology and endocrinology were consulted for management of heart failure and diabetes. Follow-up echocardiogram on April 12 showed improvement in EF to 45%. Patient underwent scheduled PCI of the left circumflex artery on April 13 without complications.
DISCHARGE PLAN
Discharge Date: April 16, 2025
Discharge Disposition: Home with scheduled home health visits from Memorial Home Health Services
Follow-up Appointments:
1. Dr. Elizabeth Chen (Cardiology) - April 23, 2025 at 10:00 AM
2. Dr. Sarah Williams (Endocrinology) - April 25, 2025 at 2:30 PM
3. Cardiac Rehabilitation evaluation - April 30, 2025 at 1:00 PM
Patient Education:
1. STEMI and coronary artery disease management
2. Diabetes self-management and glucometer use
3. Heart-healthy diet (consultation with nutritionist completed)
4. Medication management and adherence
5. Warning signs requiring immediate medical attention
PROGNOSIS
Guarded. Patient has significant coronary artery disease with reduced ejection fraction. Long-term prognosis will depend on medication adherence, lifestyle modifications, and management of comorbidities.
ATTESTATION
I have personally examined the patient and reviewed all diagnostic studies. This documentation is complete and accurate to the best of my knowledge.
Electronically signed by:
Elizabeth Chen, MD
Cardiology
Memorial Hospital Medical Center
Date: April 16, 2025 | Time: 14:35
```
--------------------------------------------------------------------------------
/ultimate_mcp_server/graceful_shutdown.py:
--------------------------------------------------------------------------------
```python
"""
Graceful shutdown utilities for Ultimate MCP Server.
This module provides utilities to handle signals and gracefully terminate
the application with ZERO error outputs during shutdown using OS-level redirection.
"""
import asyncio
import logging
import os
import signal
import sys
import warnings
from contextlib import suppress
from typing import Callable, List, Optional
logger = logging.getLogger("ultimate_mcp_server.shutdown")
# Track registered shutdown handlers and state
_shutdown_handlers: List[Callable] = []
_shutdown_in_progress = False
_original_stderr_fd = None
_devnull_fd = None
def _redirect_stderr_to_devnull():
"""Redirect stderr to /dev/null at the OS level"""
global _original_stderr_fd, _devnull_fd
try:
if _original_stderr_fd is None:
# Save original stderr file descriptor
_original_stderr_fd = os.dup(sys.stderr.fileno())
# Open /dev/null
_devnull_fd = os.open(os.devnull, os.O_WRONLY)
# Redirect stderr to /dev/null
os.dup2(_devnull_fd, sys.stderr.fileno())
except Exception:
# If redirection fails, just continue
pass
def _restore_stderr():
"""Restore original stderr"""
global _original_stderr_fd, _devnull_fd
try:
if _original_stderr_fd is not None:
os.dup2(_original_stderr_fd, sys.stderr.fileno())
os.close(_original_stderr_fd)
_original_stderr_fd = None
if _devnull_fd is not None:
os.close(_devnull_fd)
_devnull_fd = None
except Exception:
pass
def register_shutdown_handler(handler: Callable) -> None:
"""Register a function to be called during graceful shutdown."""
if handler not in _shutdown_handlers:
_shutdown_handlers.append(handler)
def remove_shutdown_handler(handler: Callable) -> None:
"""Remove a previously registered shutdown handler."""
if handler in _shutdown_handlers:
_shutdown_handlers.remove(handler)
async def _execute_shutdown_handlers():
"""Execute all registered shutdown handlers with complete error suppression"""
for handler in _shutdown_handlers:
with suppress(Exception): # Suppress ALL exceptions
if asyncio.iscoroutinefunction(handler):
with suppress(asyncio.TimeoutError, asyncio.CancelledError):
await asyncio.wait_for(handler(), timeout=3.0)
else:
handler()
def _handle_shutdown_signal(signum, frame):
"""Handle shutdown signals - IMMEDIATE TERMINATION"""
global _shutdown_in_progress
if _shutdown_in_progress:
# Force immediate exit on second signal
os._exit(1)
return
_shutdown_in_progress = True
# Print final message to original stderr if possible
try:
if _original_stderr_fd:
os.write(_original_stderr_fd, b"\n[Graceful Shutdown] Signal received. Exiting...\n")
else:
print("\n[Graceful Shutdown] Signal received. Exiting...", file=sys.__stderr__)
except Exception:
pass
# Immediately redirect stderr to suppress any error output
_redirect_stderr_to_devnull()
# Suppress all warnings
warnings.filterwarnings("ignore")
# Try to run shutdown handlers quickly, but don't wait long
try:
loop = asyncio.get_running_loop()
# Create a task but don't wait for it - just exit
asyncio.create_task(_execute_shutdown_handlers())
# Give it a tiny bit of time then exit
loop.call_later(0.5, lambda: os._exit(0))
except RuntimeError:
# No running loop - just exit immediately
os._exit(0)
def setup_signal_handlers(loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
"""Set up signal handlers for immediate shutdown"""
# Use traditional signal handlers for immediate termination
signal.signal(signal.SIGINT, _handle_shutdown_signal)
signal.signal(signal.SIGTERM, _handle_shutdown_signal)
# Also try to set up async handlers if we have a loop
if loop is not None:
try:
for sig in [signal.SIGINT, signal.SIGTERM]:
try:
loop.add_signal_handler(sig, lambda s=sig: _handle_shutdown_signal(s, None))
except (NotImplementedError, OSError):
# Platform doesn't support async signal handlers
pass
except Exception:
# Fallback is already set up with signal.signal above
pass
def enable_quiet_shutdown():
"""Enable comprehensive quiet shutdown - immediate termination approach"""
# Set up signal handlers immediately
setup_signal_handlers()
# Suppress asyncio debug mode
try:
asyncio.get_event_loop().set_debug(False)
except RuntimeError:
pass
# Suppress warnings
warnings.filterwarnings("ignore")
def force_silent_exit():
"""Force immediate silent exit with no output whatsoever"""
global _shutdown_in_progress
_shutdown_in_progress = True
_redirect_stderr_to_devnull()
os._exit(0)
class QuietUvicornServer:
"""Custom Uvicorn server that overrides signal handling for quiet shutdown"""
def __init__(self, config):
import uvicorn
self.config = config
self.server = uvicorn.Server(config)
def install_signal_handlers(self):
"""Override uvicorn's signal handlers with our quiet ones"""
# Set up our own signal handlers instead of uvicorn's
setup_signal_handlers()
def run(self):
"""Run the server with custom signal handling"""
# Patch the server's install_signal_handlers method
self.server.install_signal_handlers = self.install_signal_handlers
# Set up our signal handlers immediately
setup_signal_handlers()
# Run the server
self.server.run()
def create_quiet_server(config):
"""Create a uvicorn server with quiet shutdown handling"""
return QuietUvicornServer(config)
```
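A hedged sketch of how these utilities might be wired together in a server entry point; `my_app:app`, the port, and the cleanup callbacks are illustrative stand-ins, not part of this module:
```python
import asyncio

import uvicorn

from ultimate_mcp_server.graceful_shutdown import (
    create_quiet_server,
    enable_quiet_shutdown,
    register_shutdown_handler,
)


async def flush_caches():
    # Illustrative async cleanup; each handler gets roughly 3 seconds during shutdown
    await asyncio.sleep(0)


def close_connections():
    # Synchronous handlers are supported as well
    pass


# Install the quiet SIGINT/SIGTERM handlers and register cleanup callbacks
enable_quiet_shutdown()
register_shutdown_handler(flush_caches)
register_shutdown_handler(close_connections)

# Wrap uvicorn so its own signal handling is replaced by the quiet variant
config = uvicorn.Config("my_app:app", host="127.0.0.1", port=8013)  # hypothetical ASGI app
server = create_quiet_server(config)
server.run()
```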
--------------------------------------------------------------------------------
/model_preferences.py:
--------------------------------------------------------------------------------
```python
"""
Model preferences for MCP servers.
This module implements the ModelPreferences capability from the MCP protocol,
allowing servers to express preferences for model selection during sampling.
"""
from typing import List, Optional
class ModelHint:
"""
Hint for model selection.
Model hints allow the server to suggest specific models or model families
that would be appropriate for a given task.
"""
def __init__(self, name: str):
"""
Initialize a model hint.
Args:
name: A hint for a model name (e.g., 'claude-3-5-sonnet', 'sonnet', 'claude').
This should be treated as a substring match.
"""
self.name = name
def to_dict(self) -> dict:
"""Convert model hint to dictionary."""
return {"name": self.name}
class ModelPreferences:
"""
Preferences for model selection to guide LLM client decisions.
The ModelPreferences class provides a standardized way for servers to express
prioritization along three key dimensions (intelligence, speed, cost) that can
help clients make more informed decisions when selecting LLM models for specific tasks.
These preferences serve as advisory hints that help optimize the tradeoffs between:
- Intelligence/capability: Higher quality, more capable models (but often slower/costlier)
- Speed: Faster response time and lower latency (but potentially less capable)
- Cost: Lower token or API costs (but potentially less capable or slower)
The class also supports model-specific hints that can recommend particular models
or model families that are well-suited for specific tasks (e.g., suggesting Claude
models for creative writing or GPT-4V for image analysis).
All preferences are expressed with normalized values between 0.0 (lowest priority)
and 1.0 (highest priority) to allow for consistent interpretation across different
implementations.
Note: These preferences are always advisory. Clients may use them as guidance but
are not obligated to follow them, particularly if there are overriding user preferences
or system constraints.
Usage example:
```python
# For a coding task requiring high intelligence but where cost is a major concern
preferences = ModelPreferences(
intelligence_priority=0.8, # High priority on capability
speed_priority=0.4, # Moderate priority on speed
cost_priority=0.7, # High priority on cost
hints=[ModelHint("gpt-4-turbo")] # Specific model recommendation
)
```
"""
def __init__(
self,
intelligence_priority: float = 0.5,
speed_priority: float = 0.5,
cost_priority: float = 0.5,
hints: Optional[List[ModelHint]] = None
):
"""
Initialize model preferences.
Args:
intelligence_priority: How much to prioritize intelligence/capabilities (0.0-1.0).
Higher values favor more capable, sophisticated models that may produce
higher quality outputs, handle complex tasks, or follow instructions better.
Default: 0.5 (balanced)
speed_priority: How much to prioritize sampling speed/latency (0.0-1.0).
Higher values favor faster models with lower latency, which is important
for real-time applications, interactive experiences, or time-sensitive tasks.
Default: 0.5 (balanced)
cost_priority: How much to prioritize cost efficiency (0.0-1.0).
Higher values favor more economical models with lower token or API costs,
which is important for budget-constrained applications or high-volume usage.
Default: 0.5 (balanced)
hints: Optional model hints in preference order. These can suggest specific
models or model families that would be appropriate for the task.
The list should be ordered by preference (most preferred first).
"""
# Clamp values between 0 and 1
self.intelligence_priority = max(0.0, min(1.0, intelligence_priority))
self.speed_priority = max(0.0, min(1.0, speed_priority))
self.cost_priority = max(0.0, min(1.0, cost_priority))
self.hints = hints or []
def to_dict(self) -> dict:
"""Convert model preferences to dictionary."""
return {
"intelligencePriority": self.intelligence_priority,
"speedPriority": self.speed_priority,
"costPriority": self.cost_priority,
"hints": [hint.to_dict() for hint in self.hints]
}
# Pre-defined preference templates for common use cases
# Default balanced preference profile - no strong bias in any direction
# Use when there's no clear priority between intelligence, speed, and cost
# Good for general-purpose applications where trade-offs are acceptable
BALANCED_PREFERENCES = ModelPreferences(
intelligence_priority=0.5,
speed_priority=0.5,
cost_priority=0.5
)
# Prioritizes high-quality, sophisticated model responses
# Use for complex reasoning, creative tasks, or critical applications
# where accuracy and capability matter more than speed or cost
INTELLIGENCE_FOCUSED = ModelPreferences(
intelligence_priority=0.9,
speed_priority=0.3,
cost_priority=0.3,
hints=[ModelHint("claude-3-5-opus")]
)
# Prioritizes response speed and low latency
# Use for real-time applications, interactive experiences,
# chatbots, or any use case where user wait time is critical
SPEED_FOCUSED = ModelPreferences(
intelligence_priority=0.3,
speed_priority=0.9,
cost_priority=0.5,
hints=[ModelHint("claude-3-haiku"), ModelHint("gemini-flash")]
)
# Prioritizes cost efficiency and token economy
# Use for high-volume applications, background processing,
# or when operating under strict budget constraints
COST_FOCUSED = ModelPreferences(
intelligence_priority=0.3,
speed_priority=0.5,
cost_priority=0.9,
hints=[ModelHint("mistral"), ModelHint("gemini-flash")]
)
```
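A brief sketch of how these preferences serialize for transmission to a client, assuming the module is importable as `model_preferences` from the repository root where this file lives; the priority values and hint are illustrative:
```python
from model_preferences import INTELLIGENCE_FOCUSED, ModelHint, ModelPreferences

# Custom profile for a latency-sensitive summarization task
prefs = ModelPreferences(
    intelligence_priority=0.4,
    speed_priority=0.9,
    cost_priority=0.6,
    hints=[ModelHint("claude-3-haiku")],
)

# Values are clamped to [0.0, 1.0] and serialized with camelCase keys
print(prefs.to_dict())
# -> {'intelligencePriority': 0.4, 'speedPriority': 0.9, 'costPriority': 0.6,
#     'hints': [{'name': 'claude-3-haiku'}]}

# Pre-defined templates serialize the same way
print(INTELLIGENCE_FOCUSED.to_dict()["hints"])
# -> [{'name': 'claude-3-5-opus'}]
```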
--------------------------------------------------------------------------------
/ultimate_mcp_server/utils/logging/themes.py:
--------------------------------------------------------------------------------
```python
"""
Color themes for Gateway logging system.
This module defines color schemes for different log levels, operations, and components
to provide visual consistency and improve readability of log output.
"""
from typing import Optional, Tuple
from rich.style import Style
from rich.theme import Theme
COLORS = {
# Main colors
"primary": "bright_blue",
"secondary": "cyan",
"accent": "magenta",
"success": "green",
"warning": "yellow",
"error": "red",
"critical": "bright_red",
"info": "bright_blue",
"debug": "bright_black",
"trace": "bright_black",
# Component-specific colors (Adapt as needed for ultimate)
"core": "blue",
"provider": "cyan", # Example: Renamed 'composite' to 'provider'
"router": "green", # Example: Renamed 'analysis' to 'router'
"cache": "bright_magenta",
"api": "bright_yellow",
"mcp": "bright_blue", # Kept if relevant
"utils": "magenta", # Example: Added 'utils'
"default_component": "bright_cyan", # Fallback component color
# Tool-specific colors (Keep or remove as needed)
"ripgrep": "blue",
"awk": "green",
"jq": "yellow",
"sqlite": "magenta",
# Result/Status colors
"high_confidence": "green",
"medium_confidence": "yellow",
"low_confidence": "red",
# Misc
"muted": "bright_black",
"highlight": "bright_white",
"timestamp": "bright_black",
"path": "bright_blue",
"code": "bright_cyan",
"data": "bright_yellow",
"data.key": "bright_black", # Added for context tables
}
STYLES = {
# Base styles for log levels
"info": Style(color=COLORS["info"]),
"debug": Style(color=COLORS["debug"]),
"warning": Style(color=COLORS["warning"], bold=True),
"error": Style(color=COLORS["error"], bold=True),
"critical": Style(color=COLORS["critical"], bold=True, reverse=True),
"success": Style(color=COLORS["success"], bold=True),
"trace": Style(color=COLORS["trace"], dim=True),
# Component styles (Matching adapted COLORS)
"core": Style(color=COLORS["core"], bold=True),
"provider": Style(color=COLORS["provider"], bold=True),
"router": Style(color=COLORS["router"], bold=True),
"cache": Style(color=COLORS["cache"], bold=True),
"api": Style(color=COLORS["api"], bold=True),
"mcp": Style(color=COLORS["mcp"], bold=True),
"utils": Style(color=COLORS["utils"], bold=True),
"default_component": Style(color=COLORS["default_component"], bold=True),
# Operation styles
"operation": Style(color=COLORS["accent"], bold=True),
"startup": Style(color=COLORS["success"], bold=True),
"shutdown": Style(color=COLORS["error"], bold=True),
"request": Style(color=COLORS["primary"], bold=True),
"response": Style(color=COLORS["secondary"], bold=True),
# Confidence level styles
"high_confidence": Style(color=COLORS["high_confidence"], bold=True),
"medium_confidence": Style(color=COLORS["medium_confidence"], bold=True),
"low_confidence": Style(color=COLORS["low_confidence"], bold=True),
# Misc styles
"timestamp": Style(color=COLORS["timestamp"], dim=True),
"path": Style(color=COLORS["path"], underline=True),
"code": Style(color=COLORS["code"], italic=True),
"data": Style(color=COLORS["data"]),
"data.key": Style(color=COLORS["data.key"], bold=True),
"muted": Style(color=COLORS["muted"], dim=True),
"highlight": Style(color=COLORS["highlight"], bold=True),
}
# Rich theme that can be used directly with Rich Console
RICH_THEME = Theme({name: style for name, style in STYLES.items()})
# Get the appropriate style for a log level
def get_level_style(level: str) -> Style:
"""Get the Rich style for a specific log level.
Args:
level: The log level (info, debug, warning, error, critical, success, trace)
Returns:
The corresponding Rich Style
"""
level = level.lower()
return STYLES.get(level, STYLES["info"]) # Default to info style
# Get style for a component
def get_component_style(component: str) -> Style:
"""Get the Rich style for a specific component.
Args:
component: The component name (core, provider, router, etc.)
Returns:
The corresponding Rich Style
"""
component = component.lower()
# Fallback to a default component style if specific one not found
return STYLES.get(component, STYLES["default_component"])
# Get color by name
def get_color(name: str) -> str:
"""Get a color by name.
Args:
name: The color name
Returns:
The color string that can be used with Rich
"""
return COLORS.get(name.lower(), COLORS["primary"])
# Apply style to text directly
def style_text(text: str, style_name: str) -> str:
"""Apply a named style to text (for use without Rich console).
This is a utility function that doesn't depend on Rich, useful for
simple terminal output or when Rich console isn't available.
Args:
text: The text to style
style_name: The name of the style to apply
Returns:
Text wrapped in Rich markup tags (rendered to colors when later printed by a Rich console)
"""
# This uses Rich markup format for simplicity, assuming it will be printed
# by a Rich console later or that the markup is acceptable.
return f"[{style_name}]{text}[/{style_name}]"
# Get foreground and background colors for a specific context
def get_context_colors(
context: str, component: Optional[str] = None
) -> Tuple[str, Optional[str]]:
"""Get appropriate foreground and background colors for a log context.
Args:
context: The log context (e.g., 'request', 'response')
component: Optional component name for further refinement
Returns:
Tuple of (foreground_color, background_color) or (color, None)
"""
style = STYLES.get(context.lower()) or STYLES.get("default_component")
if style and style.color:
return (str(style.color.name), str(style.bgcolor.name) if style.bgcolor else None)
else:
# Fallback to basic colors
fg = COLORS.get(context.lower(), COLORS["primary"])
return (fg, None)
```
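A small sketch showing how these theme helpers might be consumed; the log messages themselves are illustrative:
```python
from rich.console import Console

from ultimate_mcp_server.utils.logging.themes import (
    RICH_THEME,
    get_component_style,
    get_level_style,
    style_text,
)

# Attach the shared theme so style names like "warning" or "cache" resolve directly
console = Console(theme=RICH_THEME)
console.print("Cache warmed", style="cache")
console.print("Low disk space", style=get_level_style("warning"))
console.print("Provider ready", style=get_component_style("provider"))

# Without a console, style_text() just wraps text in Rich markup tags
print(style_text("served from cache", "success"))  # -> [success]served from cache[/success]
```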
--------------------------------------------------------------------------------
/examples/test_code_extraction.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Test script for the LLM-based code extraction function.
This script loads the tournament state from a previous run and tests
the new code extraction function against the raw response texts.
"""
import asyncio
import json
import sys
from pathlib import Path
from typing import Dict
# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))
from rich import box
from rich.panel import Panel
from rich.table import Table
from ultimate_mcp_server.core.server import Gateway
# Import the extraction function from the library
from ultimate_mcp_server.tools import extract_code_from_response
from ultimate_mcp_server.utils import get_logger
from ultimate_mcp_server.utils.display import CostTracker # Import CostTracker
from ultimate_mcp_server.utils.logging.console import console
# Initialize logger
logger = get_logger("example.test_extraction")
# Create a simple structure for cost tracking (though likely won't be used directly here)
# TrackableResult = namedtuple("TrackableResult", ["cost", "input_tokens", "output_tokens", "provider", "model", "processing_time"])
# Initialize global gateway
gateway = None
# Path to the tournament state file from the last run
TOURNAMENT_STATE_PATH = "/data/projects/ultimate_mcp_server/storage/tournaments/2025-04-01_03-24-37_tournament_76009a9a/tournament_state.json"
async def setup_gateway():
"""Set up the gateway for testing."""
global gateway
# Create gateway instance
logger.info("Initializing gateway for testing", emoji_key="start")
gateway = Gateway("test-extraction", register_tools=False)
# Initialize the server with all providers and built-in tools
await gateway._initialize_providers()
logger.info("Gateway initialized", emoji_key="success")
async def load_tournament_state() -> Dict:
"""Load the tournament state from the previous run."""
try:
with open(TOURNAMENT_STATE_PATH, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logger.error(f"Error loading tournament state: {str(e)}", emoji_key="error")
return {}
async def test_extraction(tracker: CostTracker): # Add tracker
"""Test the LLM-based code extraction function."""
# Load the tournament state
tournament_state = await load_tournament_state()
if not tournament_state:
logger.error("Failed to load tournament state", emoji_key="error")
return 1
# Check if we have rounds_results
rounds_results = tournament_state.get('rounds_results', [])
if not rounds_results:
logger.error("No round results found in tournament state", emoji_key="error")
return 1
# Create a table to display the results
console.print("\n[bold]Testing LLM-based Code Extraction Function[/bold]\n")
# Create a table for extraction results
extraction_table = Table(box=box.MINIMAL, show_header=True, expand=False)
extraction_table.add_column("Round", style="cyan")
extraction_table.add_column("Model", style="magenta")
extraction_table.add_column("Code Extracted", style="green")
extraction_table.add_column("Line Count", style="yellow", justify="right")
# Process each round
for round_idx, round_data in enumerate(rounds_results):
responses = round_data.get('responses', {})
for model_id, response in responses.items():
display_model = model_id.split(':')[-1] if ':' in model_id else model_id
response_text = response.get('response_text', '')
if response_text:
# Extract code using our new function, passing the tracker
extracted_code = await extract_code_from_response(response_text, tracker=tracker)
# Calculate line count
line_count = len(extracted_code.split('\n')) if extracted_code else 0
# Add to the table
extraction_table.add_row(
str(round_idx),
display_model,
"✅" if extracted_code else "❌",
str(line_count)
)
# Print detailed results
if extracted_code:
console.print(Panel(
f"[bold]Round {round_idx} - {display_model}[/bold]\n\n"
f"[green]Successfully extracted {line_count} lines of code[/green]\n",
title="Extraction Result",
expand=False
))
# Print first 10 lines of code as a preview
code_preview = "\n".join(extracted_code.split('\n')[:10])
if len(extracted_code.split('\n')) > 10:
code_preview += "\n..."
console.print(Panel(
code_preview,
title="Code Preview",
expand=False
))
else:
console.print(Panel(
f"[bold]Round {round_idx} - {display_model}[/bold]\n\n"
f"[red]Failed to extract code[/red]\n",
title="Extraction Result",
expand=False
))
# Display the summary table
console.print("\n[bold]Extraction Summary:[/bold]")
console.print(extraction_table)
# Display cost summary at the end
tracker.display_summary(console)
return 0
async def main():
"""Run the test script."""
tracker = CostTracker() # Instantiate tracker
try:
# Set up gateway
await setup_gateway()
# Run the extraction test
return await test_extraction(tracker) # Pass tracker
except Exception as e:
logger.critical(f"Test failed: {str(e)}", emoji_key="critical", exc_info=True)
return 1
finally:
# Clean up
if gateway:
pass # No cleanup needed for Gateway instance
if __name__ == "__main__":
# Run the script
exit_code = asyncio.run(main())
sys.exit(exit_code)
```
--------------------------------------------------------------------------------
/examples/cache_demo.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
"""Cache demonstration for Ultimate MCP Server."""
import asyncio
import sys
import time
from pathlib import Path
# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))
from rich.markup import escape
from rich.rule import Rule
from ultimate_mcp_server.services.cache import get_cache_service, run_completion_with_cache
from ultimate_mcp_server.utils import get_logger
from ultimate_mcp_server.utils.display import CostTracker, display_cache_stats
# --- Add Rich Imports ---
from ultimate_mcp_server.utils.logging.console import console
# ----------------------
# Initialize logger
logger = get_logger("example.cache_demo")
async def demonstrate_cache(tracker: CostTracker = None):
"""Demonstrate cache functionality using Rich."""
console.print(Rule("[bold blue]Cache Demonstration[/bold blue]"))
logger.info("Starting cache demonstration", emoji_key="start")
cache_service = get_cache_service()
if not cache_service.enabled:
logger.warning("Cache is disabled by default. Enabling for demonstration.", emoji_key="warning")
cache_service.enabled = True
cache_service.clear() # Start with a clean slate
logger.info("Cache cleared for demonstration", emoji_key="cache")
prompt = "Explain how caching works in distributed systems."
console.print(f"[cyan]Using Prompt:[/cyan] {escape(prompt)}")
console.print()
results = {}
times = {}
stats_log = {}
try:
# Helper function to get current stats snapshot
def get_current_stats_dict():
return {
"get_count": getattr(cache_service.metrics, "gets", 0), # Use gets for Total Gets
"hit_count": getattr(cache_service.metrics, "hits", 0),
"miss_count": getattr(cache_service.metrics, "misses", 0),
"set_count": getattr(cache_service.metrics, "stores", 0), # Use stores for Total Sets
# Add other stats if needed by display_cache_stats later
}
# 1. Cache Miss
logger.info("1. Running first completion (expect cache MISS)...", emoji_key="processing")
start_time = time.time()
results[1] = await run_completion_with_cache(prompt, use_cache=True)
times[1] = time.time() - start_time
stats_log[1] = get_current_stats_dict()
# Track cost - only for non-cache hits (actual API calls)
if tracker:
tracker.add_call(results[1])
console.print(f" [yellow]MISS:[/yellow] Took [bold]{times[1]:.3f}s[/bold] (Cost: ${results[1].cost:.6f}, Tokens: {results[1].total_tokens})")
# 2. Cache Hit
logger.info("2. Running second completion (expect cache HIT)...", emoji_key="processing")
start_time = time.time()
results[2] = await run_completion_with_cache(prompt, use_cache=True)
times[2] = time.time() - start_time
stats_log[2] = get_current_stats_dict()
speedup = times[1] / times[2] if times[2] > 0 else float('inf')
console.print(f" [green]HIT:[/green] Took [bold]{times[2]:.3f}s[/bold] (Speed-up: {speedup:.1f}x vs Miss)")
# 3. Cache Bypass
logger.info("3. Running third completion (BYPASS cache)...", emoji_key="processing")
start_time = time.time()
results[3] = await run_completion_with_cache(prompt, use_cache=False)
times[3] = time.time() - start_time
stats_log[3] = get_current_stats_dict() # Stats shouldn't change much for bypass
# Track cost - bypassing cache calls the API
if tracker:
tracker.add_call(results[3])
console.print(f" [cyan]BYPASS:[/cyan] Took [bold]{times[3]:.3f}s[/bold] (Cost: ${results[3].cost:.6f}, Tokens: {results[3].total_tokens})")
# 4. Another Cache Hit
logger.info("4. Running fourth completion (expect cache HIT again)...", emoji_key="processing")
start_time = time.time()
results[4] = await run_completion_with_cache(prompt, use_cache=True)
times[4] = time.time() - start_time
stats_log[4] = get_current_stats_dict()
speedup_vs_bypass = times[3] / times[4] if times[4] > 0 else float('inf')
console.print(f" [green]HIT:[/green] Took [bold]{times[4]:.3f}s[/bold] (Speed-up: {speedup_vs_bypass:.1f}x vs Bypass)")
console.print()
except Exception as e:
logger.error(f"Error during cache demonstration run: {e}", emoji_key="error", exc_info=True)
console.print(f"[bold red]Error during demo run:[/bold red] {escape(str(e))}")
# Attempt to display stats even if error occurred mid-way
final_stats_dict = get_current_stats_dict() # Get stats even on error
else:
# Get final stats if all runs succeeded
final_stats_dict = get_current_stats_dict()
# Prepare the final stats dictionary for display_cache_stats
# It expects top-level keys like 'enabled', 'persistence', and a 'stats' sub-dict
display_stats = {
"enabled": cache_service.enabled,
"persistence": cache_service.enable_persistence,
"stats": final_stats_dict,
# Add savings if available/calculated (Example: Placeholder)
# "savings": { "cost": getattr(cache_service.metrics, "saved_cost", 0.0) }
}
# Display Final Cache Statistics using our display function
display_cache_stats(display_stats, stats_log, console)
console.print()
# Use the persistence setting directly from cache_service
if cache_service.enable_persistence:
logger.info("Cache persistence is enabled.", emoji_key="cache")
if hasattr(cache_service, 'cache_dir'):
console.print(f"[dim]Cache Directory: {cache_service.cache_dir}[/dim]")
else:
logger.info("Cache persistence is disabled.", emoji_key="cache")
console.print()
async def main():
"""Run cache demonstration."""
tracker = CostTracker() # Create cost tracker instance
try:
await demonstrate_cache(tracker)
# Display cost summary at the end
tracker.display_summary(console)
except Exception as e:
logger.critical(f"Cache demonstration failed: {str(e)}", emoji_key="critical")
return 1
return 0
if __name__ == "__main__":
# Run the demonstration
exit_code = asyncio.run(main())
sys.exit(exit_code)
```
--------------------------------------------------------------------------------
/examples/multi_provider_demo.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
"""Multi-provider completion demo using Ultimate MCP Server."""
import asyncio
import sys
from pathlib import Path
# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))
# Third-party imports
# These imports need to be below sys.path modification, which is why they have noqa comments
from rich import box # noqa: E402
from rich.markup import escape # noqa: E402
from rich.panel import Panel # noqa: E402
from rich.rule import Rule # noqa: E402
from rich.table import Table # noqa: E402
# Project imports
from ultimate_mcp_server.constants import Provider # noqa: E402
from ultimate_mcp_server.core.server import Gateway # noqa: E402
from ultimate_mcp_server.utils import get_logger # noqa: E402
from ultimate_mcp_server.utils.display import CostTracker # Import CostTracker
from ultimate_mcp_server.utils.logging.console import console # noqa: E402
# Initialize logger and console
logger = get_logger("example.multi_provider")
async def run_provider_comparison(tracker: CostTracker):
"""Run a comparison of completions across multiple providers using Rich."""
console.print(Rule("[bold blue]Multi-Provider Completion Comparison[/bold blue]"))
logger.info("Starting multi-provider comparison demo", emoji_key="start")
# Create Gateway instance - this handles provider initialization
gateway = Gateway("multi-provider-demo", register_tools=False)
# Initialize providers
logger.info("Initializing providers...", emoji_key="provider")
await gateway._initialize_providers()
prompt = "Explain the advantages of quantum computing in 3-4 sentences."
console.print(f"[cyan]Prompt:[/cyan] {escape(prompt)}")
# Use model names directly if providers are inferred or handled by get_provider
configs = [
{"provider": Provider.OPENAI.value, "model": "gpt-4.1-mini"},
{"provider": Provider.ANTHROPIC.value, "model": "claude-3-5-haiku-20241022"},
{"provider": Provider.GEMINI.value, "model": "gemini-2.0-flash-lite"},
{"provider": Provider.DEEPSEEK.value, "model": "deepseek-chat"},
{"provider": Provider.GROK.value, "model": "grok-3-mini-latest"},
{"provider": Provider.OPENROUTER.value, "model": "mistralai/mistral-nemo"},
{"provider": Provider.OLLAMA.value, "model": "llama3.2"}
]
results_data = []
for config in configs:
provider_name = config["provider"]
model_name = config["model"]
provider = gateway.providers.get(provider_name)
if not provider:
logger.warning(f"Provider {provider_name} not available or initialized, skipping.", emoji_key="warning")
continue
try:
logger.info(f"Generating completion with {provider_name}/{model_name}...", emoji_key="processing")
result = await provider.generate_completion(
prompt=prompt,
model=model_name,
temperature=0.7,
max_tokens=150
)
# Track the cost
tracker.add_call(result)
results_data.append({
"provider": provider_name,
"model": model_name,
"text": result.text,
"input_tokens": result.input_tokens,
"output_tokens": result.output_tokens,
"cost": result.cost,
"processing_time": result.processing_time
})
logger.success(f"Completion from {provider_name}/{model_name} successful.", emoji_key="success")
except Exception as e:
logger.error(f"Error with {provider_name}/{model_name}: {e}", emoji_key="error", exc_info=True)
# Optionally store error result
results_data.append({
"provider": provider_name,
"model": model_name,
"text": f"[red]Error: {escape(str(e))}[/red]",
"cost": 0.0, "processing_time": 0.0, "input_tokens": 0, "output_tokens": 0
})
# Print comparison results using Rich Panels
console.print(Rule("[bold green]Comparison Results[/bold green]"))
for result in results_data:
stats_line = (
f"Cost: [green]${result['cost']:.6f}[/green] | "
f"Time: [yellow]{result['processing_time']:.2f}s[/yellow] | "
f"Tokens: [cyan]{result['input_tokens']} in, {result['output_tokens']} out[/cyan]"
)
console.print(Panel(
escape(result['text'].strip()),
title=f"[bold magenta]{escape(result['provider'])} / {escape(result['model'])}[/bold magenta]",
subtitle=stats_line,
border_style="blue",
expand=False
))
# Filter out error results before calculating summary stats
valid_results = [r for r in results_data if "Error" not in r["text"]]
if valid_results:
summary_table = Table(title="Comparison Summary", box=box.ROUNDED, show_header=False)
summary_table.add_column("Metric", style="cyan")
summary_table.add_column("Value", style="white")
try:
fastest = min(valid_results, key=lambda r: r['processing_time'])
summary_table.add_row("⚡ Fastest", f"{escape(fastest['provider'])}/{escape(fastest['model'])} ({fastest['processing_time']:.2f}s)")
except ValueError:
pass # Handle empty list
try:
cheapest = min(valid_results, key=lambda r: r['cost'])
summary_table.add_row("💰 Cheapest", f"{escape(cheapest['provider'])}/{escape(cheapest['model'])} (${cheapest['cost']:.6f})")
except ValueError:
pass
try:
most_tokens = max(valid_results, key=lambda r: r['output_tokens'])
summary_table.add_row("📄 Most Tokens", f"{escape(most_tokens['provider'])}/{escape(most_tokens['model'])} ({most_tokens['output_tokens']} tokens)")
except ValueError:
pass
if summary_table.row_count > 0:
console.print(summary_table)
else:
console.print("[yellow]No valid results to generate summary.[/yellow]")
# Display final summary
tracker.display_summary(console) # Display summary at the end
console.print() # Final spacing
return 0
async def main():
"""Run the demo."""
tracker = CostTracker() # Instantiate tracker
try:
return await run_provider_comparison(tracker) # Pass tracker
except Exception as e:
logger.critical(f"Demo failed: {str(e)}", emoji_key="critical")
return 1
if __name__ == "__main__":
# Run the demo
exit_code = asyncio.run(main())
sys.exit(exit_code)
```
--------------------------------------------------------------------------------
/tool_annotations.py:
--------------------------------------------------------------------------------
```python
"""
Tool annotations for MCP servers.
This module provides a standardized way to annotate tools with hints that help LLMs
understand when and how to use them effectively.
"""
from typing import List, Optional
class ToolAnnotations:
"""
Tool annotations providing hints to LLMs about tool behavior and usage patterns.
ToolAnnotations supply metadata that helps LLMs make informed decisions about:
- WHEN to use a particular tool (appropriate contexts and priority)
- HOW to use the tool correctly (through examples and behavior hints)
- WHAT the potential consequences of using the tool might be (read-only vs. destructive)
- WHO should use the tool (assistant, user, or both via audience hints)
These annotations serve as a bridge between tool developers and LLMs, providing
crucial context beyond just function signatures and descriptions. For example, the
annotations can indicate that a file deletion tool is destructive and should be used
with caution, or that a search tool is safe to retry multiple times.
The system supports four key behavioral hints:
- read_only_hint: Tool doesn't modify state (safe for exploratory use)
- destructive_hint: Tool may perform irreversible changes (use with caution)
- idempotent_hint: Repeated calls with same arguments produce same results
- open_world_hint: Tool interacts with external systems beyond the LLM's knowledge
Additional metadata includes:
- audience: Who can/should use this tool
- priority: How important/commonly used this tool is
- title: Human-readable name for the tool
- examples: Sample inputs and expected outputs
Usage example:
```python
# For a document deletion tool
delete_doc_annotations = ToolAnnotations(
read_only_hint=False, # Modifies state
destructive_hint=True, # Deletion is destructive
idempotent_hint=True, # Deleting twice has same effect as once
open_world_hint=True, # Changes external file system
audience=["assistant"], # Only assistant should use it
priority=0.3, # Lower priority (use cautiously)
title="Delete Document",
examples=[{
"input": {"document_id": "doc-123"},
"output": {"success": True, "message": "Document deleted"}
}]
)
```
Note: All hints are advisory only - they don't enforce behavior but help LLMs
make better decisions about tool usage.
"""
def __init__(
self,
read_only_hint: bool = False,
destructive_hint: bool = True,
idempotent_hint: bool = False,
open_world_hint: bool = True,
audience: List[str] = None,
priority: float = 0.5,
title: Optional[str] = None,
examples: List[dict] = None,
):
"""
Initialize tool annotations.
Args:
read_only_hint: If True, indicates this tool does not modify its environment.
Tools with read_only_hint=True are safe to call for exploration without
side effects. Examples: search tools, data retrieval, information queries.
Default: False
destructive_hint: If True, the tool may perform destructive updates that
can't easily be reversed or undone. Only meaningful when read_only_hint
is False. Examples: deletion operations, irreversible state changes, payments.
Default: True
idempotent_hint: If True, calling the tool repeatedly with the same arguments
will have no additional effect beyond the first call. Useful for retry logic.
Only meaningful when read_only_hint is False. Examples: setting a value,
deleting an item (calling it twice doesn't delete it twice).
Default: False
open_world_hint: If True, this tool may interact with systems or information
outside the LLM's knowledge context (external APIs, file systems, etc.).
If False, the tool operates in a closed domain the LLM can fully model.
Default: True
audience: Who is the intended user of this tool, as a list of roles:
- "assistant": The AI assistant can use this tool
- "user": The human user can use this tool
Default: ["assistant"]
priority: How important this tool is, from 0.0 (lowest) to 1.0 (highest).
Higher priority tools should be considered first when multiple tools
might accomplish a similar task. Default: 0.5 (medium priority)
title: Human-readable title for the tool. If not provided, the tool's
function name is typically used instead.
examples: List of usage examples, each containing 'input' and 'output' keys.
These help the LLM understand expected patterns of use and responses.
"""
self.read_only_hint = read_only_hint
self.destructive_hint = destructive_hint
self.idempotent_hint = idempotent_hint
self.open_world_hint = open_world_hint
self.audience = audience or ["assistant"]
self.priority = max(0.0, min(1.0, priority)) # Clamp between 0 and 1
self.title = title
self.examples = examples or []
def to_dict(self) -> dict:
"""Convert annotations to dictionary for MCP protocol."""
return {
"readOnlyHint": self.read_only_hint,
"destructiveHint": self.destructive_hint,
"idempotentHint": self.idempotent_hint,
"openWorldHint": self.open_world_hint,
"title": self.title,
"audience": self.audience,
"priority": self.priority,
"examples": self.examples
}
# Pre-defined annotation templates for common tool types
# A tool that only reads/queries data without modifying any state
READONLY_TOOL = ToolAnnotations(
read_only_hint=True,
destructive_hint=False,
idempotent_hint=True,
open_world_hint=False,
priority=0.8,
title="Read-Only Tool"
)
# A tool that queries external systems or APIs for information
QUERY_TOOL = ToolAnnotations(
read_only_hint=True,
destructive_hint=False,
idempotent_hint=True,
open_world_hint=True,
priority=0.7,
title="Query Tool"
)
# A tool that performs potentially irreversible changes to state
# The LLM should use these with caution, especially without confirmation
DESTRUCTIVE_TOOL = ToolAnnotations(
read_only_hint=False,
destructive_hint=True,
idempotent_hint=False,
open_world_hint=True,
priority=0.3,
title="Destructive Tool"
)
# A tool that modifies state but can be safely called multiple times
# with the same arguments (e.g., setting a value, creating if not exists)
IDEMPOTENT_UPDATE_TOOL = ToolAnnotations(
read_only_hint=False,
destructive_hint=False,
idempotent_hint=True,
open_world_hint=False,
priority=0.5,
title="Idempotent Update Tool"
)
```
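A minimal usage sketch of the annotations above (the tool names, IDs, and field values are illustrative, and the module is assumed to be importable from the project root):
```python
# Illustrative sketch only: reusing a pre-defined template and building a custom annotation.
from tool_annotations import READONLY_TOOL, ToolAnnotations

# Reuse a template for a simple lookup tool and adjust its title.
search_annotations = READONLY_TOOL.to_dict()
search_annotations["title"] = "Search Documents"

# Build custom annotations for a state-changing but non-destructive tool.
upload_annotations = ToolAnnotations(
    read_only_hint=False,
    destructive_hint=False,   # Uploading does not remove existing data
    idempotent_hint=False,    # Re-uploading creates a new document each time
    open_world_hint=True,     # Touches an external document store
    priority=0.6,
    title="Upload Document",
    examples=[{"input": {"path": "report.pdf"}, "output": {"document_id": "doc-456"}}],
)

print(search_annotations["readOnlyHint"])      # True
print(upload_annotations.to_dict()["title"])   # Upload Document
```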
--------------------------------------------------------------------------------
/examples/test_content_detection.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
"""
Test script to demonstrate enhanced content type detection with Magika integration
in the DocumentProcessingTool.
"""
import asyncio
import sys
from pathlib import Path
# Add project root to path for imports when running as script
_PROJECT_ROOT = Path(__file__).resolve().parent.parent
if str(_PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(_PROJECT_ROOT))
from rich.console import Console # noqa: E402
from rich.panel import Panel # noqa: E402
from rich.table import Table # noqa: E402
from ultimate_mcp_server.core.server import Gateway # noqa: E402
from ultimate_mcp_server.tools.document_conversion_and_processing import ( # noqa: E402
DocumentProcessingTool, # noqa: E402
)
console = Console()
# Sample content for testing
HTML_CONTENT = """<!DOCTYPE html>
<html>
<head>
<title>Test HTML Document</title>
<meta charset="utf-8">
</head>
<body>
<h1>This is a test HTML document</h1>
<p>This paragraph is for testing the content detection.</p>
<div class="container">
<ul>
<li>Item 1</li>
<li>Item 2</li>
</ul>
</div>
<script>
// Some JavaScript
console.log("Hello world");
</script>
</body>
</html>
"""
MARKDOWN_CONTENT = """# Test Markdown Document
This is a paragraph in markdown format.
## Section 1
* Item 1
* Item 2
[Link to example](https://example.com)
```python
def hello_world():
print("Hello world")
```
| Column 1 | Column 2 |
|----------|----------|
| Cell 1 | Cell 2 |
"""
CODE_CONTENT = """
#!/usr/bin/env python
import sys
from typing import List, Dict, Optional
class TestClass:
def __init__(self, name: str, value: int = 0):
self.name = name
self.value = value
def process(self, data: List[Dict]) -> Optional[Dict]:
result = {}
for item in data:
if "key" in item:
result[item["key"]] = item["value"]
return result if result else None
def main():
test = TestClass("test", 42)
result = test.process([{"key": "a", "value": 1}, {"key": "b", "value": 2}])
print(f"Result: {result}")
if __name__ == "__main__":
main()
"""
PLAIN_TEXT_CONTENT = """
This is a plain text document with no special formatting.
It contains multiple paragraphs and some sentences.
There are no markdown elements, HTML tags, or code structures.
Just regular text that someone might write in a simple text editor.
"""
AMBIGUOUS_CONTENT = """
Here's some text with a <div> tag in it.
# This looks like a heading
But it also has some <span>HTML</span> elements.
def is_this_code():
return "maybe"
Regular paragraph text continues here.
"""
async def test_content_detection():
console.print(Panel("Testing Content Type Detection with Magika Integration", style="bold green"))
# Initialize the document processor
gateway = Gateway("content-detection-test")
# Initialize providers
console.print("Initializing gateway and providers...")
await gateway._initialize_providers()
# Create document processing tool
doc_tool = DocumentProcessingTool(gateway)
# Define test cases
test_cases = [
("HTML Document", HTML_CONTENT),
("Markdown Document", MARKDOWN_CONTENT),
("Code Document", CODE_CONTENT),
("Plain Text Document", PLAIN_TEXT_CONTENT),
("Ambiguous Content", AMBIGUOUS_CONTENT),
]
# Create results table
results_table = Table(title="Content Type Detection Results")
results_table.add_column("Content Type", style="cyan")
results_table.add_column("Detected Type", style="green")
results_table.add_column("Confidence", style="yellow")
results_table.add_column("Method", style="magenta")
results_table.add_column("Detection Criteria", style="blue")
# Test each case
for name, content in test_cases:
console.print(f"\nDetecting content type for: [bold cyan]{name}[/]")
# Detect content type
result = await doc_tool.detect_content_type(content)
# Get detection details
detected_type = result.get("content_type", "unknown")
confidence = result.get("confidence", 0.0)
criteria = ", ".join(result.get("detection_criteria", []))
# Check if Magika was used
method = "Magika" if result.get("detection_method") == "magika" else "Heuristic"
if result.get("detection_method") != "magika" and result.get("magika_details"):
method = "Combined (Magika + Heuristic)"
# Add to results table
results_table.add_row(
name,
detected_type,
f"{confidence:.2f}",
method,
criteria[:100] + "..." if len(criteria) > 100 else criteria
)
# Show all scores
scores = result.get("all_scores", {})
if scores:
scores_table = Table(title="Detection Scores")
scores_table.add_column("Content Type", style="cyan")
scores_table.add_column("Score", style="yellow")
for ctype, score in scores.items():
scores_table.add_row(ctype, f"{score:.3f}")
console.print(scores_table)
# Show Magika details if available
if "magika_details" in result:
magika_details = result["magika_details"]
console.print(Panel(
f"Magika Type: {magika_details.get('type', 'unknown')}\n"
f"Magika Confidence: {magika_details.get('confidence', 0.0):.3f}\n"
f"Matched Primary Type: {magika_details.get('matched_primary_type', False)}",
title="Magika Details",
style="blue"
))
# Print final results table
console.print("\n")
console.print(results_table)
# Now test HTML to Markdown conversion with a clearly broken HTML case
console.print(Panel("Testing HTML to Markdown Conversion with Content Detection", style="bold green"))
# Create a test case with problematic HTML (the one that previously failed)
problematic_html = """<!DOCTYPE html>
<html class="client-nojs vector-feature-language-in-header-enabled vector-feature-language-in-main-page-header-disabled">
<head>
<meta charset="UTF-8">
<title>Transformer (deep learning architecture) - Wikipedia</title>
<script>(function(){var className="client-js vector-feature-language-in-header-enabled vector-feature-language-in-main-page-header-disabled";</script>
</head>
<body>
<h1>Transformer Model</h1>
<p>The Transformer is a deep learning model introduced in the paper "Attention Is All You Need".</p>
</body>
</html>"""
console.print("Converting problematic HTML to Markdown...")
result = await doc_tool.clean_and_format_text_as_markdown(
text=problematic_html,
extraction_method="auto",
preserve_tables=True,
preserve_links=True
)
console.print(Panel(
f"Original Type: {result.get('original_content_type', 'unknown')}\n"
f"Was HTML: {result.get('was_html', False)}\n"
f"Extraction Method: {result.get('extraction_method_used', 'none')}",
title="Conversion Details",
style="cyan"
))
console.print(Panel(
result.get("markdown_text", "No markdown produced"),
title="Converted Markdown",
style="green"
))
if __name__ == "__main__":
asyncio.run(test_content_detection())
```
--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/provider.py:
--------------------------------------------------------------------------------
```python
"""Provider tools for Ultimate MCP Server."""
from typing import Any, Dict, Optional
# Import ToolError explicitly
from ultimate_mcp_server.exceptions import ToolError
# REMOVE global instance logic
from ultimate_mcp_server.utils import get_logger
from .base import with_error_handling, with_tool_metrics
logger = get_logger("ultimate_mcp_server.tools.provider")
def _get_provider_status_dict() -> Dict[str, Any]:
"""Reliably gets the provider_status dictionary from the gateway instance."""
provider_status = {}
# Import here to avoid circular dependency at module load time
try:
from ultimate_mcp_server.core import get_gateway_instance
gateway = get_gateway_instance()
if gateway and hasattr(gateway, 'provider_status'):
provider_status = gateway.provider_status
if provider_status:
logger.debug("Retrieved provider status via global instance.")
return provider_status
except ImportError as e:
logger.error(f"Failed to import get_gateway_instance: {e}")
except Exception as e:
logger.error(f"Error accessing global gateway instance: {e}")
if not provider_status:
logger.warning("Could not retrieve provider status from global gateway instance.")
return provider_status
# --- Tool Functions (Standalone, Decorated) ---
@with_tool_metrics
@with_error_handling
async def get_provider_status() -> Dict[str, Any]:
"""Checks the status and availability of all configured LLM providers.
Use this tool to determine which LLM providers (e.g., OpenAI, Anthropic, Gemini)
are currently enabled, configured correctly (e.g., API keys), and ready to accept requests.
This helps in deciding which provider to use for a task or for troubleshooting.
Returns:
A dictionary mapping provider names to their status details:
{
"providers": {
"openai": { # Example for one provider
"enabled": true, # Is the provider enabled in the server config?
"available": true, # Is the provider initialized and ready for requests?
"api_key_configured": true, # Is the necessary API key set?
"error": null, # Error message if initialization failed, null otherwise.
"models_count": 38 # Number of models detected for this provider.
},
"anthropic": { # Example for another provider
"enabled": true,
"available": false,
"api_key_configured": true,
"error": "Initialization failed: Connection timeout",
"models_count": 0
},
...
}
}
Raises a ToolError (status code 503) if provider status information is unavailable.
Usage:
- Call this tool before attempting complex tasks to ensure required providers are available.
- Use the output to inform the user about available options or diagnose issues.
- If a provider shows "available: false", check the "error" field for clues.
"""
provider_status = _get_provider_status_dict()
if not provider_status:
# Raise ToolError if status cannot be retrieved
raise ToolError(status_code=503, detail="Provider status information is currently unavailable. The server might be initializing or an internal error occurred.")
return {
"providers": {
name: {
"enabled": status.enabled,
"available": status.available,
"api_key_configured": status.api_key_configured,
"error": status.error,
"models_count": len(status.models)
}
for name, status in provider_status.items()
}
}
@with_tool_metrics
@with_error_handling
async def list_models(
provider: Optional[str] = None
) -> Dict[str, Any]:
"""Lists available LLM models, optionally filtered by provider.
Use this tool to discover specific models offered by the configured and available LLM providers.
The returned model IDs (e.g., 'openai/gpt-4.1-mini') are needed for other tools like
`chat_completion`, `generate_completion`, `estimate_cost`, or `create_tournament`.
Args:
provider: (Optional) The specific provider name (e.g., "openai", "anthropic", "gemini")
to list models for. If omitted, models from *all available* providers are listed.
Returns:
A dictionary mapping provider names to a list of their available models:
{
"models": {
"openai": [ # Example for one provider
{
"id": "openai/gpt-4.1-mini", # Unique ID used in other tools
"name": "GPT-4o Mini", # Human-friendly name
"context_window": 128000,
"features": ["chat", "completion", "vision"],
"input_cost_pmt": 0.15, # Cost per Million Tokens (Input)
"output_cost_pmt": 0.60 # Cost per Million Tokens (Output)
},
...
],
"gemini": [ # Example for another provider
{
"id": "gemini/gemini-2.5-pro-preview-03-25",
"name": "Gemini 2.5 Pro Experimental",
"context_window": 8192,
"features": ["chat", "completion"],
"input_cost_pmt": null, # Cost info might be null
"output_cost_pmt": null
},
...
],
...
}
}
Returns an empty "models" dict or includes warnings/errors if providers/models are unavailable.
Usage Flow:
1. (Optional) Call `get_provider_status` to see which providers are generally available.
2. Call `list_models` (optionally specifying a provider) to get usable model IDs.
3. Use a specific model ID (like "openai/gpt-4.1-mini") as the 'model' parameter in other tools.
Raises:
ToolError: If the specified provider name is invalid or provider status is unavailable.
"""
provider_status = _get_provider_status_dict()
if not provider_status:
raise ToolError(status_code=503, detail="Provider status information is currently unavailable. Cannot list models.")
models = {}
if provider:
if provider not in provider_status:
valid_providers = list(provider_status.keys())
raise ToolError(status_code=404, detail=f"Invalid provider specified: '{provider}'. Valid options: {valid_providers}")
status = provider_status[provider]
if not status.available:
# Return empty list for the provider but include a warning message
return {
"models": {provider: []},
"warning": f"Provider '{provider}' is configured but currently unavailable. Reason: {status.error or 'Unknown error'}"
}
# Use model details directly from the ProviderStatus object
models[provider] = [m for m in status.models] if status.models else []
else:
# List models for all *available* providers
any_available = False
for name, status in provider_status.items():
if status.available:
any_available = True
# Use model details directly from the ProviderStatus object
models[name] = [m for m in status.models] if status.models else []
# else: Provider not available, don't include it unless specifically requested
if not any_available:
return {
"models": {},
"warning": "No providers are currently available. Check provider status using get_provider_status."
}
elif all(len(model_list) == 0 for model_list in models.values()):
return {
"models": models,
"warning": "No models listed for any available provider. Check provider status or configuration."
}
return {"models": models}
```
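A small sketch of the usage flow described in the docstrings above, assuming a Gateway has already been created and initialized so the global instance is populated, and that each model entry carries an 'id' field as shown in the examples:
```python
# Minimal sketch: check provider status, then pick a model ID from the first available provider.
import asyncio

from ultimate_mcp_server.tools.provider import get_provider_status, list_models


async def pick_a_model() -> str:
    status = await get_provider_status()
    available = [name for name, info in status["providers"].items() if info["available"]]
    if not available:
        raise RuntimeError("No LLM providers are currently available")

    listing = await list_models(provider=available[0])
    model_entries = listing["models"].get(available[0], [])
    if not model_entries:
        raise RuntimeError(f"Provider '{available[0]}' reports no models")

    return model_entries[0]["id"]


if __name__ == "__main__":
    print(asyncio.run(pick_a_model()))
```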
--------------------------------------------------------------------------------
/ultimate_mcp_server/services/cache.py:
--------------------------------------------------------------------------------
```python
"""Cache service for LLM and RAG results."""
import asyncio
import os
import pickle
import time
from pathlib import Path
from typing import Any, Dict, Optional
from ultimate_mcp_server.config import get_config
from ultimate_mcp_server.utils import get_logger
logger = get_logger(__name__)
# Singleton instance
_cache_service = None
def get_cache_service():
"""
Get or create the global singleton cache service instance.
This function implements the singleton pattern for the CacheService, ensuring that only
one instance is created across the entire application. On the first call, it creates a
new CacheService instance and stores it in a module-level variable. Subsequent calls
return the same instance.
Using this function instead of directly instantiating CacheService ensures consistent
caching behavior throughout the application, with a shared cache that persists across
different components and requests.
Returns:
CacheService: The global singleton cache service instance.
Example:
```python
# Get the same cache service instance from anywhere in the code
cache = get_cache_service()
await cache.set("my_key", my_value, ttl=3600)
```
"""
global _cache_service
if _cache_service is None:
_cache_service = CacheService()
return _cache_service
class CacheService:
"""Service for caching LLM and RAG results."""
def __init__(self, cache_dir: Optional[str] = None):
"""Initialize the cache service.
Args:
cache_dir: Directory to store cache files
"""
config = get_config()
cache_config = config.cache
self.cache_dir = cache_dir or cache_config.directory
# Create cache directory if it doesn't exist
os.makedirs(self.cache_dir, exist_ok=True)
# In-memory cache
self.memory_cache: Dict[str, Dict[str, Any]] = {}
# Load cache from disk
self._load_cache()
# Schedule cache maintenance
self._schedule_maintenance()
logger.info(f"Cache service initialized with directory: {self.cache_dir}")
def _load_cache(self) -> None:
"""Load cache from disk."""
try:
cache_file = Path(self.cache_dir) / "cache.pickle"
if cache_file.exists():
with open(cache_file, "rb") as f:
loaded_cache = pickle.load(f)
# Filter out expired items
current_time = time.time()
filtered_cache = {
key: value for key, value in loaded_cache.items()
if "expiry" not in value or value["expiry"] > current_time
}
self.memory_cache = filtered_cache
logger.info(f"Loaded {len(self.memory_cache)} items from cache")
else:
logger.info("No cache file found, starting with empty cache")
except Exception as e:
logger.error(f"Error loading cache: {str(e)}")
# Start with empty cache
self.memory_cache = {}
def _save_cache(self) -> None:
"""Save cache to disk."""
try:
cache_file = Path(self.cache_dir) / "cache.pickle"
with open(cache_file, "wb") as f:
pickle.dump(self.memory_cache, f)
logger.info(f"Saved {len(self.memory_cache)} items to cache")
except Exception as e:
logger.error(f"Error saving cache: {str(e)}")
def _schedule_maintenance(self) -> None:
"""Schedule periodic cache maintenance."""
asyncio.create_task(self._periodic_maintenance())
async def _periodic_maintenance(self) -> None:
"""Perform periodic cache maintenance."""
while True:
try:
# Clean expired items
self._clean_expired()
# Save cache to disk
self._save_cache()
# Wait for next maintenance cycle (every hour)
await asyncio.sleep(3600)
except Exception as e:
logger.error(f"Error in cache maintenance: {str(e)}")
await asyncio.sleep(300) # Wait 5 minutes on error
def _clean_expired(self) -> None:
"""Clean expired items from cache."""
current_time = time.time()
initial_count = len(self.memory_cache)
self.memory_cache = {
key: value for key, value in self.memory_cache.items()
if "expiry" not in value or value["expiry"] > current_time
}
removed = initial_count - len(self.memory_cache)
if removed > 0:
logger.info(f"Cleaned {removed} expired items from cache")
async def get(self, key: str) -> Optional[Any]:
"""Get an item from the cache.
Args:
key: Cache key
Returns:
Cached value or None if not found or expired
"""
if key not in self.memory_cache:
return None
cache_item = self.memory_cache[key]
# Check expiry
if "expiry" in cache_item and cache_item["expiry"] < time.time():
# Remove expired item
del self.memory_cache[key]
return None
# Update access time
cache_item["last_access"] = time.time()
cache_item["access_count"] = cache_item.get("access_count", 0) + 1
return cache_item["value"]
async def set(
self,
key: str,
value: Any,
ttl: Optional[int] = None
) -> bool:
"""Set an item in the cache.
Args:
key: Cache key
value: Value to cache
ttl: Time to live in seconds (None for no expiry)
Returns:
True if successful
"""
try:
expiry = time.time() + ttl if ttl is not None else None
self.memory_cache[key] = {
"value": value,
"created": time.time(),
"last_access": time.time(),
"access_count": 0,
"expiry": expiry
}
# Persist periodically: schedule a save whenever the cache size reaches a multiple of 10 entries
if len(self.memory_cache) % 10 == 0:
asyncio.create_task(self._async_save_cache())
return True
except Exception as e:
logger.error(f"Error setting cache item: {str(e)}")
return False
async def _async_save_cache(self) -> None:
"""Save cache asynchronously."""
self._save_cache()
async def delete(self, key: str) -> bool:
"""Delete an item from the cache.
Args:
key: Cache key
Returns:
True if item was deleted, False if not found
"""
if key in self.memory_cache:
del self.memory_cache[key]
return True
return False
async def clear(self) -> None:
"""Clear all items from the cache."""
self.memory_cache.clear()
self._save_cache()
logger.info("Cache cleared")
async def get_stats(self) -> Dict[str, Any]:
"""Get cache statistics.
Returns:
Cache statistics
"""
total_items = len(self.memory_cache)
# Count expired items
current_time = time.time()
expired_items = sum(
1 for item in self.memory_cache.values()
if "expiry" in item and item["expiry"] < current_time
)
# Calculate average access count
access_counts = [
item.get("access_count", 0)
for item in self.memory_cache.values()
]
avg_access = sum(access_counts) / max(1, len(access_counts))
return {
"total_items": total_items,
"expired_items": expired_items,
"active_items": total_items - expired_items,
"avg_access_count": avg_access,
"cache_dir": self.cache_dir
}
```
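A short usage sketch of the singleton cache (key names and TTLs are arbitrary); because maintenance is scheduled with `asyncio.create_task`, the service should first be obtained while an event loop is running:
```python
# Minimal sketch: exercise set/get, TTL expiry, and the stats helper.
import asyncio

from ultimate_mcp_server.services.cache import get_cache_service


async def demo_cache():
    cache = get_cache_service()  # Same instance is returned everywhere in the process

    await cache.set("greeting", {"text": "hello"}, ttl=2)  # Expires after ~2 seconds
    print(await cache.get("greeting"))   # {'text': 'hello'}

    await asyncio.sleep(2.5)
    print(await cache.get("greeting"))   # None (expired entry is evicted on read)

    print(await cache.get_stats())       # Item counts, average access count, cache_dir


if __name__ == "__main__":
    asyncio.run(demo_cache())
```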
--------------------------------------------------------------------------------
/tests/conftest.py:
--------------------------------------------------------------------------------
```python
"""Pytest fixtures for Ultimate MCP Server tests."""
import asyncio
import json
import os
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional
import pytest
from pytest import MonkeyPatch
from ultimate_mcp_server.config import Config, get_config
from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.providers.base import BaseProvider, ModelResponse
from ultimate_mcp_server.core.server import Gateway
from ultimate_mcp_server.utils import get_logger
logger = get_logger("tests")
class MockResponse:
"""Mock response for testing."""
def __init__(self, status_code: int = 200, json_data: Optional[Dict[str, Any]] = None):
self.status_code = status_code
self.json_data = json_data or {}
async def json(self):
return self.json_data
async def text(self):
return json.dumps(self.json_data)
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
pass
class MockClient:
"""Mock HTTP client for testing."""
def __init__(self, responses: Optional[Dict[str, Any]] = None):
self.responses = responses or {}
self.requests = []
async def post(self, url: str, json: Dict[str, Any], headers: Optional[Dict[str, str]] = None):
self.requests.append({"url": url, "json": json, "headers": headers})
return MockResponse(json_data=self.responses.get(url, {"choices": [{"message": {"content": "Mock response"}}]}))
async def get(self, url: str, headers: Optional[Dict[str, str]] = None):
self.requests.append({"url": url, "headers": headers})
return MockResponse(json_data=self.responses.get(url, {"data": [{"id": "mock-model"}]}))
class MockProvider(BaseProvider):
"""Mock provider for testing."""
provider_name = "mock"
def __init__(self, api_key: Optional[str] = None, **kwargs):
# Pop mock-specific options before forwarding the remaining kwargs to BaseProvider.
self.responses = kwargs.pop("responses", {})
super().__init__(api_key=api_key, **kwargs)
self.initialized = False
self.calls = []
async def initialize(self) -> bool:
self.initialized = True
self.logger.success("Mock provider initialized successfully", emoji_key="provider")
return True
async def generate_completion(
self,
prompt: str,
model: Optional[str] = None,
max_tokens: Optional[int] = None,
temperature: float = 0.7,
**kwargs
) -> ModelResponse:
self.calls.append({
"type": "completion",
"prompt": prompt,
"model": model,
"max_tokens": max_tokens,
"temperature": temperature,
"kwargs": kwargs
})
model = model or self.get_default_model()
return ModelResponse(
text=self.responses.get("text", "Mock completion response"),
model=model,
provider=self.provider_name,
input_tokens=100,
output_tokens=50,
processing_time=0.1,
raw_response={"id": "mock-response-id"}
)
async def generate_completion_stream(
self,
prompt: str,
model: Optional[str] = None,
max_tokens: Optional[int] = None,
temperature: float = 0.7,
**kwargs
):
self.calls.append({
"type": "stream",
"prompt": prompt,
"model": model,
"max_tokens": max_tokens,
"temperature": temperature,
"kwargs": kwargs
})
model = model or self.get_default_model()
chunks = self.responses.get("chunks", ["Mock ", "streaming ", "response"])
for i, chunk in enumerate(chunks):
yield chunk, {
"model": model,
"provider": self.provider_name,
"chunk_index": i + 1,
"finish_reason": "stop" if i == len(chunks) - 1 else None
}
async def list_models(self) -> List[Dict[str, Any]]:
return self.responses.get("models", [
{
"id": "mock-model-1",
"provider": self.provider_name,
"description": "Mock model 1"
},
{
"id": "mock-model-2",
"provider": self.provider_name,
"description": "Mock model 2"
}
])
def get_default_model(self) -> str:
return "mock-model-1"
async def check_api_key(self) -> bool:
return True
@pytest.fixture
def test_dir() -> Path:
"""Get the tests directory path."""
return Path(__file__).parent
@pytest.fixture
def sample_data_dir(test_dir: Path) -> Path:
"""Get the sample data directory path."""
data_dir = test_dir / "data"
data_dir.mkdir(exist_ok=True)
return data_dir
@pytest.fixture
def mock_env_vars(monkeypatch: MonkeyPatch) -> None:
"""Set mock environment variables."""
monkeypatch.setenv("OPENAI_API_KEY", "mock-openai-key")
monkeypatch.setenv("ANTHROPIC_API_KEY", "mock-anthropic-key")
monkeypatch.setenv("GEMINI_API_KEY", "mock-gemini-key")
monkeypatch.setenv("DEEPSEEK_API_KEY", "mock-deepseek-key")
monkeypatch.setenv("CACHE_ENABLED", "true")
monkeypatch.setenv("LOG_LEVEL", "DEBUG")
@pytest.fixture
def test_config() -> Config:
"""Get a test configuration."""
# Create a test configuration
test_config = Config()
# Override settings for testing
test_config.cache.enabled = True
test_config.cache.ttl = 60 # Short TTL for testing
test_config.cache.max_entries = 100
test_config.server.port = 8888 # Different port for testing
# Set test API keys
test_config.providers.openai.api_key = "test-openai-key"
test_config.providers.anthropic.api_key = "test-anthropic-key"
test_config.providers.gemini.api_key = "test-gemini-key"
test_config.providers.deepseek.api_key = "test-deepseek-key"
return test_config
@pytest.fixture
def mock_provider() -> MockProvider:
"""Get a mock provider."""
return MockProvider(api_key="mock-api-key")
@pytest.fixture
def mock_gateway(mock_provider: MockProvider) -> Gateway:
"""Get a mock gateway with the mock provider."""
gateway = Gateway(name="test-gateway")
# Add mock provider
gateway.providers["mock"] = mock_provider
gateway.provider_status["mock"] = {
"enabled": True,
"available": True,
"api_key_configured": True,
"models": [
{
"id": "mock-model-1",
"provider": "mock",
"description": "Mock model 1"
},
{
"id": "mock-model-2",
"provider": "mock",
"description": "Mock model 2"
}
]
}
return gateway
@pytest.fixture
def mock_http_client(monkeypatch: MonkeyPatch) -> MockClient:
"""Mock HTTP client to avoid real API calls."""
mock_client = MockClient()
# We'll need to patch any HTTP clients used by the providers
# This will be implemented as needed in specific tests
return mock_client
@pytest.fixture
def sample_document() -> str:
"""Get a sample document for testing."""
return """
# Sample Document
This is a sample document for testing purposes.
## Section 1
Lorem ipsum dolor sit amet, consectetur adipiscing elit.
Nullam auctor, nisl eget ultricies aliquam, est libero tincidunt nisi,
eu aliquet nunc nisl eu nisl.
## Section 2
Praesent euismod, nisl eget ultricies aliquam, est libero tincidunt nisi,
eu aliquet nunc nisl eu nisl.
### Subsection 2.1
- Item 1
- Item 2
- Item 3
### Subsection 2.2
1. First item
2. Second item
3. Third item
"""
@pytest.fixture
def sample_json_data() -> Dict[str, Any]:
"""Get sample JSON data for testing."""
return {
"name": "Test User",
"age": 30,
"email": "[email protected]",
"address": {
"street": "123 Test St",
"city": "Test City",
"state": "TS",
"zip": "12345"
},
"tags": ["test", "sample", "json"]
}
@pytest.fixture(scope="session")
def event_loop_policy():
"""Return an event loop policy for the test session."""
return asyncio.DefaultEventLoopPolicy()
```
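A minimal sketch of a test that composes the fixtures above; it relies on the project's async test configuration (the existing async tests run without explicit markers), and the asserted values are the canned responses baked into MockProvider:
```python
# Minimal sketch of a test module that consumes the conftest.py fixtures.
async def test_mock_completion(mock_provider, mock_gateway):
    await mock_provider.initialize()

    result = await mock_provider.generate_completion(prompt="Say hi", temperature=0.2)

    # These values are the defaults hard-coded in MockProvider.
    assert result.text == "Mock completion response"
    assert result.model == "mock-model-1"
    assert result.input_tokens == 100

    # The gateway fixture registers the very same provider instance under "mock".
    assert mock_gateway.providers["mock"] is mock_provider
```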
--------------------------------------------------------------------------------
/tests/integration/test_server.py:
--------------------------------------------------------------------------------
```python
"""Integration tests for the Ultimate MCP Server server."""
from contextlib import asynccontextmanager
from typing import Any, Dict, Optional
import pytest
from pytest import MonkeyPatch
from ultimate_mcp_server.core.server import Gateway
from ultimate_mcp_server.utils import get_logger
logger = get_logger("test.integration.server")
@pytest.fixture
async def test_gateway() -> Gateway:
"""Create a test gateway instance."""
gateway = Gateway(name="test-gateway")
await gateway._initialize_providers()
return gateway
class TestGatewayServer:
"""Tests for the Gateway server."""
async def test_initialization(self, test_gateway: Gateway):
"""Test gateway initialization."""
logger.info("Testing gateway initialization", emoji_key="test")
assert test_gateway.name == "test-gateway"
assert test_gateway.mcp is not None
assert hasattr(test_gateway, "providers")
assert hasattr(test_gateway, "provider_status")
async def test_provider_status(self, test_gateway: Gateway):
"""Test provider status information."""
logger.info("Testing provider status", emoji_key="test")
# Should have provider status information
assert test_gateway.provider_status is not None
# Get info - we need to use the resource accessor instead of get_resource
@test_gateway.mcp.resource("info://server")
def server_info() -> Dict[str, Any]:
return {
"name": test_gateway.name,
"version": "0.1.0",
"providers": list(test_gateway.provider_status.keys())
}
# Access the server info
server_info_data = server_info()
assert server_info_data is not None
assert "name" in server_info_data
assert "version" in server_info_data
assert "providers" in server_info_data
async def test_tool_registration(self, test_gateway: Gateway):
"""Test tool registration."""
logger.info("Testing tool registration", emoji_key="test")
# Define a test tool
@test_gateway.mcp.tool()
async def test_tool(arg1: str, arg2: Optional[str] = None) -> Dict[str, Any]:
"""Test tool for testing."""
return {"result": f"{arg1}-{arg2 or 'default'}", "success": True}
# Execute the tool - result appears to be a list not a dict
result = await test_gateway.mcp.call_tool("test_tool", {"arg1": "test", "arg2": "value"})
# Verify test passed by checking we get a valid response (without assuming exact structure)
assert result is not None
# Execute with default
result2 = await test_gateway.mcp.call_tool("test_tool", {"arg1": "test"})
assert result2 is not None
async def test_tool_error_handling(self, test_gateway: Gateway):
"""Test error handling in tools."""
logger.info("Testing tool error handling", emoji_key="test")
# Define a tool that raises an error
@test_gateway.mcp.tool()
async def error_tool(should_fail: bool = True) -> Dict[str, Any]:
"""Tool that fails on demand."""
if should_fail:
raise ValueError("Test error")
return {"success": True}
# Execute and catch the error
with pytest.raises(Exception): # MCP might wrap the error # noqa: B017
await test_gateway.mcp.call_tool("error_tool", {"should_fail": True})
# Execute successful case
result = await test_gateway.mcp.call_tool("error_tool", {"should_fail": False})
# Just check a result is returned, not its specific structure
assert result is not None
class TestServerLifecycle:
"""Tests for server lifecycle."""
async def test_server_lifespan(self, monkeypatch: MonkeyPatch):
"""Test server lifespan context manager."""
logger.info("Testing server lifespan", emoji_key="test")
# Track lifecycle events
events = []
# Mock Gateway.run method to avoid asyncio conflicts
def mock_gateway_run(self):
events.append("run")
monkeypatch.setattr(Gateway, "run", mock_gateway_run)
# Create a fully mocked lifespan context manager
@asynccontextmanager
async def mock_lifespan(server):
"""Mock lifespan context manager that directly adds events"""
events.append("enter")
try:
yield {"mocked": "context"}
finally:
events.append("exit")
# Create a gateway and replace its _server_lifespan with our mock
gateway = Gateway(name="test-lifecycle")
monkeypatch.setattr(gateway, "_server_lifespan", mock_lifespan)
# Test run method (now mocked)
gateway.run()
assert "run" in events
# Test the mocked lifespan context manager
async with gateway._server_lifespan(None) as context:
events.append("in_context")
assert context is not None
# Check all expected events were recorded
assert "enter" in events, f"Events: {events}"
assert "in_context" in events, f"Events: {events}"
assert "exit" in events, f"Events: {events}"
class TestServerIntegration:
"""Integration tests for server with tools."""
async def test_provider_tools(self, test_gateway: Gateway, monkeypatch: MonkeyPatch):
"""Test provider-related tools."""
logger.info("Testing provider tools", emoji_key="test")
# Mock tool execution
async def mock_call_tool(tool_name, params):
if tool_name == "get_provider_status":
return {
"providers": {
"openai": {
"enabled": True,
"available": True,
"api_key_configured": True,
"error": None,
"models_count": 3
},
"anthropic": {
"enabled": True,
"available": True,
"api_key_configured": True,
"error": None,
"models_count": 5
}
}
}
elif tool_name == "list_models":
provider = params.get("provider")
if provider == "openai":
return {
"models": {
"openai": [
{"id": "gpt-4o", "provider": "openai"},
{"id": "gpt-4.1-mini", "provider": "openai"},
{"id": "gpt-4.1-mini", "provider": "openai"}
]
}
}
else:
return {
"models": {
"openai": [
{"id": "gpt-4o", "provider": "openai"},
{"id": "gpt-4.1-mini", "provider": "openai"}
],
"anthropic": [
{"id": "claude-3-opus-20240229", "provider": "anthropic"},
{"id": "claude-3-5-haiku-20241022", "provider": "anthropic"}
]
}
}
else:
return {"error": f"Unknown tool: {tool_name}"}
monkeypatch.setattr(test_gateway.mcp, "call_tool", mock_call_tool)
# Test get_provider_status
status = await test_gateway.mcp.call_tool("get_provider_status", {})
assert "providers" in status
assert "openai" in status["providers"]
assert "anthropic" in status["providers"]
# Test list_models with provider
models = await test_gateway.mcp.call_tool("list_models", {"provider": "openai"})
assert "models" in models
assert "openai" in models["models"]
assert len(models["models"]["openai"]) == 3
# Test list_models without provider
all_models = await test_gateway.mcp.call_tool("list_models", {})
assert "models" in all_models
assert "openai" in all_models["models"]
assert "anthropic" in all_models["models"]
```
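The registration pattern these tests exercise is the same one application code would use; a minimal sketch, assuming only that a Gateway can be constructed by name as in the fixtures above (the tool name and return shape are illustrative):
```python
# Minimal sketch: register a tool on a Gateway's MCP server and invoke it directly.
import asyncio

from ultimate_mcp_server.core.server import Gateway


async def main():
    gateway = Gateway(name="sketch-gateway")

    @gateway.mcp.tool()
    async def echo(message: str, repeat: int = 1) -> dict:
        """Echo a message back, optionally repeated."""
        return {"echo": message * repeat}

    result = await gateway.mcp.call_tool("echo", {"message": "hi", "repeat": 2})
    # As the tests above note, the exact return structure depends on the MCP library.
    print(result)


if __name__ == "__main__":
    asyncio.run(main())
```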
--------------------------------------------------------------------------------
/ultimate_mcp_server/utils/logging/console.py:
--------------------------------------------------------------------------------
```python
"""
Rich console configuration for Gateway logging system.
This module provides a configured Rich console instance for beautiful terminal output,
along with utility functions for common console operations.
"""
import sys
from contextlib import contextmanager
from typing import Any, Dict, List, Optional, Tuple, Union
from rich.box import ROUNDED, Box
from rich.console import Console, ConsoleRenderable
from rich.live import Live
from rich.panel import Panel
from rich.progress import (
BarColumn,
Progress,
SpinnerColumn,
TextColumn,
TimeElapsedColumn,
TimeRemainingColumn,
)
from rich.status import Status
from rich.syntax import Syntax
from rich.table import Table
from rich.text import Text
from rich.traceback import install as install_rich_traceback
from rich.tree import Tree
# Use relative import for theme
from .themes import RICH_THEME
# Configure global console with our theme
# Note: Recording might be useful for testing or specific scenarios
console = Console(
theme=RICH_THEME,
highlight=True,
markup=True,
emoji=True,
record=False, # Set to True to capture output for testing
width=None, # Auto-width, or set a fixed width if desired
color_system="auto", # "auto", "standard", "256", "truecolor"
file=sys.stderr, # Always use stderr to avoid interfering with JSON-RPC messages on stdout
)
# Install rich traceback handler for beautiful error tracebacks
# show_locals=True can be verbose, consider False for production
install_rich_traceback(console=console, show_locals=False)
# Custom progress bar setup
def create_progress(
transient: bool = True,
auto_refresh: bool = True,
disable: bool = False,
**kwargs
) -> Progress:
"""Create a customized Rich Progress instance.
Args:
transient: Whether to remove the progress bar after completion
auto_refresh: Whether to auto-refresh the progress bar
disable: Whether to disable the progress bar
**kwargs: Additional arguments passed to Progress constructor
Returns:
Configured Progress instance
"""
return Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"), # Use theme style
BarColumn(bar_width=None),
"[progress.percentage]{task.percentage:>3.0f}%", # Use theme style
TimeElapsedColumn(),
TimeRemainingColumn(),
console=console,
transient=transient,
auto_refresh=auto_refresh,
disable=disable,
**kwargs
)
@contextmanager
def status(message: str, spinner: str = "dots", **kwargs):
"""Context manager for displaying a status message during an operation.
Args:
message: The status message to display
spinner: The spinner animation to use
**kwargs: Additional arguments to pass to Status constructor
Yields:
Rich Status object that can be updated
"""
with Status(message, console=console, spinner=spinner, **kwargs) as status_obj:
yield status_obj
def print_panel(
content: Union[str, Text, ConsoleRenderable],
title: Optional[str] = None,
style: Optional[str] = "info", # Use theme styles by default
box: Optional[Box] = ROUNDED,
expand: bool = True,
padding: Tuple[int, int] = (1, 2),
**kwargs
) -> None:
"""Print content in a styled panel.
Args:
content: The content to display in the panel
title: Optional panel title
style: Style name to apply (from theme)
box: Box style to use
expand: Whether the panel should expand to fill width
padding: Panel padding (vertical, horizontal)
**kwargs: Additional arguments to pass to Panel constructor
"""
if isinstance(content, str):
content = Text.from_markup(content) # Allow markup in string content
panel = Panel(
content,
title=title,
style=style if style else "none", # Pass style name directly
border_style=style, # Use same style for border unless overridden
box=box,
expand=expand,
padding=padding,
**kwargs
)
console.print(panel)
def print_syntax(
code: str,
language: str = "python",
line_numbers: bool = True,
theme: str = "monokai", # Standard Rich theme
title: Optional[str] = None,
background_color: Optional[str] = None,
**kwargs
) -> None:
"""Print syntax-highlighted code.
Args:
code: The code to highlight
language: The programming language
line_numbers: Whether to show line numbers
theme: Syntax highlighting theme (e.g., 'monokai', 'native')
title: Optional title for the code block (creates a panel)
background_color: Optional background color
**kwargs: Additional arguments to pass to Syntax constructor
"""
syntax = Syntax(
code,
language,
theme=theme,
line_numbers=line_numbers,
background_color=background_color,
**kwargs
)
if title:
# Use a neutral panel style for code
print_panel(syntax, title=title, style="none", padding=(0,1))
else:
console.print(syntax)
def print_table(
title: Optional[str] = None,
columns: Optional[List[Union[str, Dict[str, Any]]]] = None,
rows: Optional[List[List[Any]]] = None,
box: Box = ROUNDED,
show_header: bool = True,
**kwargs
) -> Table:
"""Create and print a Rich table.
Args:
title: Optional table title
columns: List of column names or dicts for more control (e.g., {"header": "Name", "style": "bold"})
rows: List of rows, each a list of values (will be converted to str)
box: Box style to use
show_header: Whether to show the table header
**kwargs: Additional arguments to pass to Table constructor
Returns:
The created Table instance (in case further modification is needed)
"""
table = Table(title=title, box=box, show_header=show_header, **kwargs)
if columns:
for column in columns:
if isinstance(column, dict):
table.add_column(**column)
else:
table.add_column(str(column))
if rows:
for row in rows:
# Ensure all items are renderable (convert simple types to str)
renderable_row = [
item if isinstance(item, ConsoleRenderable) else str(item)
for item in row
]
table.add_row(*renderable_row)
console.print(table)
return table
def print_tree(
name: str,
data: Union[Dict[str, Any], List[Any]],
guide_style: str = "bright_black",
highlight: bool = True,
**kwargs
) -> None:
"""Print a hierarchical tree structure from nested data.
Args:
name: The root label of the tree
data: Nested dictionary or list to render as a tree
guide_style: Style for the tree guides
highlight: Apply highlighting to the tree
**kwargs: Additional arguments to pass to Tree constructor
"""
tree = Tree(name, guide_style=guide_style, highlight=highlight, **kwargs)
def build_tree(branch, node_data):
"""Recursively build the tree from nested data."""
if isinstance(node_data, dict):
for key, value in node_data.items():
sub_branch = branch.add(str(key))
build_tree(sub_branch, value)
elif isinstance(node_data, list):
for index, item in enumerate(node_data):
# Use index as label or try to represent item briefly
label = f"[{index}]"
sub_branch = branch.add(label)
build_tree(sub_branch, item)
else:
# Leaf node
branch.add(Text(str(node_data)))
build_tree(tree, data)
console.print(tree)
def print_json(data: Any, title: Optional[str] = None, indent: int = 2, highlight: bool = True) -> None:
"""Print data formatted as JSON with syntax highlighting.
Args:
data: The data to format as JSON.
title: Optional title (creates a panel).
indent: JSON indentation level.
highlight: Apply syntax highlighting.
"""
import json
try:
json_str = json.dumps(data, indent=indent, ensure_ascii=False)
if highlight:
syntax = Syntax(json_str, "json", theme="native", word_wrap=True)
if title:
print_panel(syntax, title=title, style="none", padding=(0, 1))
else:
console.print(syntax)
else:
if title:
print_panel(json_str, title=title, style="none", padding=(0, 1))
else:
console.print(json_str)
except Exception as e:
console.print(f"[error]Could not format data as JSON: {e}[/error]")
@contextmanager
def live_display(renderable: ConsoleRenderable, **kwargs):
"""Context manager for displaying a live-updating renderable.
Args:
renderable: The Rich renderable to display live.
**kwargs: Additional arguments for the Live instance.
Yields:
The Live instance.
"""
with Live(renderable, console=console, **kwargs) as live:
yield live
def get_rich_console() -> Console:
"""Returns the shared Rich Console instance."""
return console
```
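A brief sketch of how the console helpers above compose (all values shown are illustrative):
```python
# Minimal sketch exercising the shared console helpers.
from ultimate_mcp_server.utils.logging.console import (
    print_json,
    print_panel,
    print_table,
    print_tree,
)

print_panel("Gateway started successfully", title="Status", style="info")

print_table(
    title="Providers",
    columns=["Provider", {"header": "Models", "style": "cyan"}],
    rows=[["openai", 3], ["anthropic", 5]],
)

print_tree("config", {"cache": {"enabled": True, "ttl": 3600}, "providers": ["openai", "anthropic"]})

print_json({"status": "ok", "uptime_seconds": 42}, title="Health")
```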
--------------------------------------------------------------------------------
/ultimate_mcp_server/utils/security.py:
--------------------------------------------------------------------------------
```python
"""Security utilities for Ultimate MCP Server."""
import base64
import hashlib
import hmac
import re
import secrets
import time
from typing import Any, Dict, List, Optional, Tuple
from ultimate_mcp_server.config import get_env
from ultimate_mcp_server.utils import get_logger
logger = get_logger(__name__)
def mask_api_key(api_key: str) -> str:
"""Mask API key for safe logging.
Args:
api_key: API key to mask
Returns:
Masked API key
"""
if not api_key:
return ""
# Keep first 4 and last 4 characters, mask the rest
if len(api_key) <= 8:
return "*" * len(api_key)
return api_key[:4] + "*" * (len(api_key) - 8) + api_key[-4:]
def validate_api_key(api_key: str, provider: str) -> bool:
"""Validate API key format for a provider.
Args:
api_key: API key to validate
provider: Provider name
Returns:
True if API key format is valid
"""
if not api_key:
return False
# Provider-specific validation patterns
patterns = {
"openai": r'^sk-[a-zA-Z0-9]{48}$',
"anthropic": r'^sk-ant-[a-zA-Z0-9]{48}$',
"deepseek": r'^sk-[a-zA-Z0-9]{32,64}$',
"gemini": r'^[a-zA-Z0-9_-]{39}$',
# Add more providers as needed
}
# Get pattern for provider
pattern = patterns.get(provider.lower())
if not pattern:
# For unknown providers, check minimum length
return len(api_key) >= 16
# Check if API key matches the pattern
return bool(re.match(pattern, api_key))
def generate_random_string(length: int = 32) -> str:
"""Generate a cryptographically secure random string.
Args:
length: Length of the string
Returns:
Random string
"""
# Generate random bytes
random_bytes = secrets.token_bytes(length)
# Convert to URL-safe base64
random_string = base64.urlsafe_b64encode(random_bytes).decode('utf-8')
# Truncate to desired length
return random_string[:length]
def generate_api_key(prefix: str = 'lgw') -> str:
"""Generate an API key for the gateway.
Args:
prefix: Key prefix
Returns:
Generated API key
"""
# Generate timestamp
timestamp = int(time.time())
# Generate random bytes
random_bytes = secrets.token_bytes(24)
# Combine and encode
timestamp_bytes = timestamp.to_bytes(4, byteorder='big')
combined = timestamp_bytes + random_bytes
encoded = base64.urlsafe_b64encode(combined).decode('utf-8').rstrip('=')
# Add prefix
return f"{prefix}-{encoded}"
def create_hmac_signature(
key: str,
message: str,
algorithm: str = 'sha256'
) -> str:
"""Create an HMAC signature.
Args:
key: Secret key
message: Message to sign
algorithm: Hash algorithm to use
Returns:
HMAC signature as hexadecimal string
"""
# Convert inputs to bytes
key_bytes = key.encode('utf-8')
message_bytes = message.encode('utf-8')
# Create HMAC
if algorithm == 'sha256':
h = hmac.new(key_bytes, message_bytes, hashlib.sha256)
elif algorithm == 'sha512':
h = hmac.new(key_bytes, message_bytes, hashlib.sha512)
else:
raise ValueError(f"Unsupported algorithm: {algorithm}")
# Return hexadecimal digest
return h.hexdigest()
def verify_hmac_signature(
key: str,
message: str,
signature: str,
algorithm: str = 'sha256'
) -> bool:
"""Verify an HMAC signature.
Args:
key: Secret key
message: Original message
signature: HMAC signature to verify
algorithm: Hash algorithm used
Returns:
True if signature is valid
"""
# Calculate expected signature
expected = create_hmac_signature(key, message, algorithm)
# Compare signatures (constant-time comparison)
return hmac.compare_digest(signature, expected)
def sanitize_input(text: str, allowed_patterns: Optional[List[str]] = None) -> str:
"""Sanitize user input to prevent injection attacks.
Args:
text: Input text to sanitize
allowed_patterns: List of regex patterns for allowed content
Returns:
Sanitized input
"""
if not text:
return ""
# Remove control characters
text = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', text)
# Apply allowed patterns if specified
if allowed_patterns:
# Filter out anything not matching allowed patterns
filtered = ""
for pattern in allowed_patterns:
matches = re.finditer(pattern, text)
for match in matches:
filtered += match.group(0)
return filtered
# Default sanitization (alphanumeric, spaces, and common punctuation)
return re.sub(r'[^\w\s.,;:!?"\'-]', '', text)
def sanitize_path(path: str) -> str:
"""Sanitize file path to prevent path traversal attacks.
Args:
path: File path to sanitize
Returns:
Sanitized path
"""
if not path:
return ""
# Normalize path separators
path = path.replace('\\', '/')
# Remove path traversal sequences
path = re.sub(r'\.\.[/\\]', '', path)
path = re.sub(r'[/\\]\.\.[/\\]', '/', path)
# Remove multiple consecutive slashes
path = re.sub(r'[/\\]{2,}', '/', path)
# Remove leading slash
path = re.sub(r'^[/\\]', '', path)
# Remove dangerous characters
path = re.sub(r'[<>:"|?*]', '', path)
return path
def create_session_token(user_id: str, expires_in: int = 86400) -> Dict[str, Any]:
"""Create a session token for a user.
Args:
user_id: User identifier
expires_in: Token expiration time in seconds
Returns:
Dictionary with token and expiration
"""
# Generate expiration timestamp
expiration = int(time.time()) + expires_in
# Generate random token
token = generate_random_string(48)
# Compute signature
# In a real implementation, use a secure key from config
secret_key = get_env('SESSION_SECRET_KEY', 'default_session_key')
signature_msg = f"{user_id}:{token}:{expiration}"
signature = create_hmac_signature(secret_key, signature_msg)
return {
'token': token,
'signature': signature,
'user_id': user_id,
'expiration': expiration,
}
def verify_session_token(token_data: Dict[str, Any]) -> bool:
"""Verify a session token.
Args:
token_data: Token data dictionary
Returns:
True if token is valid
"""
# Check required fields
required_fields = ['token', 'signature', 'user_id', 'expiration']
if not all(field in token_data for field in required_fields):
return False
# Check expiration
if int(time.time()) > token_data['expiration']:
return False
# Verify signature
secret_key = get_env('SESSION_SECRET_KEY', 'default_session_key')
signature_msg = f"{token_data['user_id']}:{token_data['token']}:{token_data['expiration']}"
return verify_hmac_signature(secret_key, signature_msg, token_data['signature'])
def hash_password(password: str, salt: Optional[str] = None) -> Tuple[str, str]:
"""Hash a password securely.
Args:
password: Password to hash
salt: Optional salt (generated if not provided)
Returns:
Tuple of (hash, salt)
"""
# Generate salt if not provided
if not salt:
salt = secrets.token_hex(16)
# Create key derivation
key = hashlib.pbkdf2_hmac(
'sha256',
password.encode('utf-8'),
salt.encode('utf-8'),
100000, # 100,000 iterations
dklen=32
)
# Convert to hexadecimal
password_hash = key.hex()
return password_hash, salt
def verify_password(password: str, stored_hash: str, salt: str) -> bool:
"""Verify a password against a stored hash.
Args:
password: Password to verify
stored_hash: Stored password hash
salt: Salt used for hashing
Returns:
True if password is correct
"""
# Hash the provided password with the same salt
password_hash, _ = hash_password(password, salt)
# Compare hashes (constant-time comparison)
return hmac.compare_digest(password_hash, stored_hash)
def is_safe_url(url: str, allowed_hosts: Optional[List[str]] = None) -> bool:
"""Check if a URL is safe to redirect to.
Args:
url: URL to check
allowed_hosts: List of allowed hosts
Returns:
True if URL is safe
"""
if not url:
return False
    # Parse the URL; scheme-relative URLs ('//evil.com') must not be treated as relative
    try:
        from urllib.parse import urlparse
        parsed_url = urlparse(url)
    except Exception:
        return False
    # Relative URLs (no scheme and no network location) are considered safe
    if not parsed_url.scheme and not parsed_url.netloc:
        return True
    # Only http(s) schemes are acceptable for absolute redirect targets
    if parsed_url.scheme not in ('http', 'https'):
        return False
    # Absolute URLs must have a network location
    if not parsed_url.netloc:
        return False
    # Check against allowed hosts
    if allowed_hosts:
        return parsed_url.netloc in allowed_hosts
    # No allow-list provided: reject absolute URLs by default
    return False
```
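A short sketch of how the password, session, and URL helpers above fit together. It is illustrative only: the sample values are invented, and it assumes the functions are imported from this module and that `get_env` reads `SESSION_SECRET_KEY` from the process environment.
```python
# Hypothetical usage sketch; assumes hash_password, verify_password,
# create_session_token, verify_session_token, sanitize_path and is_safe_url
# are imported from the module above, and that get_env reads os.environ.
import os

os.environ.setdefault('SESSION_SECRET_KEY', 'change-me-in-production')

# Password handling: store (hash, salt), never the plaintext.
pw_hash, salt = hash_password("correct horse battery staple")
assert verify_password("correct horse battery staple", pw_hash, salt)
assert not verify_password("wrong password", pw_hash, salt)

# Session handling: keep the full token dict, since the HMAC signature
# covers user_id, token and expiration together.
token_data = create_session_token(user_id="user-123", expires_in=3600)
assert verify_session_token(token_data)
assert not verify_session_token({**token_data, "user_id": "admin"})

# Path and redirect helpers.
assert sanitize_path("../../etc/passwd") == "etc/passwd"
assert is_safe_url("/dashboard")  # relative URLs are allowed
assert not is_safe_url("https://evil.example.com/")  # absolute, no allow-list
assert is_safe_url("https://app.example.com/home", allowed_hosts=["app.example.com"])
```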
--------------------------------------------------------------------------------
/tests/unit/test_cache.py:
--------------------------------------------------------------------------------
```python
"""Tests for the cache service."""
import asyncio
from pathlib import Path
import pytest
from ultimate_mcp_server.services.cache import (
CacheService,
with_cache,
)
from ultimate_mcp_server.services.cache.strategies import (
ExactMatchStrategy,
SemanticMatchStrategy,
TaskBasedStrategy,
)
from ultimate_mcp_server.utils import get_logger
logger = get_logger("test.cache")
@pytest.fixture
def temp_cache_dir(tmp_path: Path) -> Path:
"""Create a temporary cache directory."""
cache_dir = tmp_path / "cache"
cache_dir.mkdir(exist_ok=True)
return cache_dir
@pytest.fixture
def cache_service(temp_cache_dir: Path) -> CacheService:
"""Get a cache service instance with a temporary directory."""
return CacheService(
enabled=True,
ttl=60, # Short TTL for testing
max_entries=10,
enable_persistence=True,
cache_dir=str(temp_cache_dir),
enable_fuzzy_matching=True
)
class TestCacheService:
"""Tests for the cache service."""
async def test_init(self, cache_service: CacheService):
"""Test cache service initialization."""
logger.info("Testing cache service initialization", emoji_key="test")
assert cache_service.enabled
assert cache_service.ttl == 60
assert cache_service.max_entries == 10
assert cache_service.enable_persistence
assert cache_service.enable_fuzzy_matching
async def test_get_set(self, cache_service: CacheService):
"""Test basic get and set operations."""
logger.info("Testing cache get/set operations", emoji_key="test")
# Set a value
key = "test-key"
value = {"text": "Test value", "metadata": {"test": True}}
await cache_service.set(key, value)
# Get the value back
result = await cache_service.get(key)
assert result == value
# Check cache stats
assert cache_service.metrics.hits == 1
assert cache_service.metrics.misses == 0
assert cache_service.metrics.stores == 1
async def test_cache_miss(self, cache_service: CacheService):
"""Test cache miss."""
logger.info("Testing cache miss", emoji_key="test")
# Get a non-existent key
result = await cache_service.get("non-existent-key")
assert result is None
# Check cache stats
assert cache_service.metrics.hits == 0
assert cache_service.metrics.misses == 1
async def test_cache_expiry(self, cache_service: CacheService):
"""Test cache entry expiry."""
logger.info("Testing cache expiry", emoji_key="test")
# Set a value with short TTL
key = "expiring-key"
value = {"text": "Expiring value"}
await cache_service.set(key, value, ttl=1) # 1 second TTL
# Get immediately (should hit)
result = await cache_service.get(key)
assert result == value
# Wait for expiry
await asyncio.sleep(1.5)
# Get again (should miss)
result = await cache_service.get(key)
assert result is None
# Check stats
assert cache_service.metrics.hits == 1
assert cache_service.metrics.misses == 1
async def test_cache_eviction(self, cache_service: CacheService):
"""Test cache eviction when max size is reached."""
logger.info("Testing cache eviction", emoji_key="test")
        # Set more values than max_entries to force eviction
for i in range(cache_service.max_entries + 5):
key = f"key-{i}"
value = {"text": f"Value {i}"}
await cache_service.set(key, value)
# Check size - should be at most max_entries
assert len(cache_service.cache) <= cache_service.max_entries
# Check stats
assert cache_service.metrics.evictions > 0
async def test_fuzzy_matching(self, cache_service: CacheService):
"""Test fuzzy matching of cache keys."""
logger.info("Testing fuzzy matching", emoji_key="test")
# Set a value with a prompt that would generate a fuzzy key
request_params = {
"prompt": "What is the capital of France?",
"model": "test-model",
"temperature": 0.7
}
key = cache_service.generate_cache_key(request_params)
fuzzy_key = cache_service.generate_fuzzy_key(request_params)
value = {"text": "The capital of France is Paris."}
await cache_service.set(key, value, fuzzy_key=fuzzy_key, request_params=request_params)
# Create a similar request that should match via fuzzy lookup
similar_request = {
"prompt": "What is the capital of France? Tell me about it.",
"model": "different-model",
"temperature": 0.5
}
similar_key = cache_service.generate_cache_key(similar_request)
similar_fuzzy = cache_service.generate_fuzzy_key(similar_request) # noqa: F841
# Should still find the original value
result = await cache_service.get(similar_key, fuzzy=True)
assert result == value
async def test_cache_decorator(self):
"""Test the cache decorator."""
logger.info("Testing cache decorator", emoji_key="test")
call_count = 0
@with_cache(ttl=60)
async def test_function(arg1, arg2=None):
nonlocal call_count
call_count += 1
return {"result": arg1 + str(arg2)}
# First call should execute the function
result1 = await test_function("test", arg2="123")
assert result1 == {"result": "test123"}
assert call_count == 1
# Second call with same args should use cache
result2 = await test_function("test", arg2="123")
assert result2 == {"result": "test123"}
assert call_count == 1 # Still 1
# Call with different args should execute function again
result3 = await test_function("test", arg2="456")
assert result3 == {"result": "test456"}
assert call_count == 2
class TestCacheStrategies:
"""Tests for cache strategies."""
def test_exact_match_strategy(self):
"""Test exact match strategy."""
logger.info("Testing exact match strategy", emoji_key="test")
strategy = ExactMatchStrategy()
# Generate key for a request
request = {
"prompt": "Test prompt",
"model": "test-model",
"temperature": 0.7
}
key = strategy.generate_key(request)
assert key.startswith("exact:")
# Should cache most requests
assert strategy.should_cache(request, {"text": "Test response"})
# Shouldn't cache streaming requests
streaming_request = request.copy()
streaming_request["stream"] = True
assert not strategy.should_cache(streaming_request, {"text": "Test response"})
def test_semantic_match_strategy(self):
"""Test semantic match strategy."""
logger.info("Testing semantic match strategy", emoji_key="test")
strategy = SemanticMatchStrategy()
# Generate key for a request
request = {
"prompt": "What is the capital of France?",
"model": "test-model",
"temperature": 0.7
}
key = strategy.generate_key(request)
assert key.startswith("exact:") # Primary key is still exact
semantic_key = strategy.generate_semantic_key(request)
assert semantic_key.startswith("semantic:")
# Should generate similar semantic keys for similar prompts
similar_request = {
"prompt": "Tell me the capital city of France?",
"model": "test-model",
"temperature": 0.7
}
similar_semantic_key = strategy.generate_semantic_key(similar_request)
assert similar_semantic_key.startswith("semantic:")
# The two semantic keys should share many common words
# This is a bit harder to test deterministically, so we'll skip detailed assertions
def test_task_based_strategy(self):
"""Test task-based strategy."""
logger.info("Testing task-based strategy", emoji_key="test")
strategy = TaskBasedStrategy()
# Test different task types
summarization_request = {
"prompt": "Summarize this document: Lorem ipsum...",
"model": "test-model",
"task_type": "summarization"
}
extraction_request = {
"prompt": "Extract entities from this text: John Smith...",
"model": "test-model",
"task_type": "extraction"
}
# Generate keys
summary_key = strategy.generate_key(summarization_request)
extraction_key = strategy.generate_key(extraction_request)
# Keys should include task type
assert "summarization" in summary_key
assert "extraction" in extraction_key
# Task-specific TTL
summary_ttl = strategy.get_ttl(summarization_request, None)
extraction_ttl = strategy.get_ttl(extraction_request, None)
# Summarization should have longer TTL than extraction (typically)
assert summary_ttl is not None
assert extraction_ttl is not None
assert summary_ttl > extraction_ttl
```
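For context, a minimal standalone sketch of the same `CacheService` API the tests above exercise, using the constructor arguments from the `cache_service` fixture; a throwaway temp directory stands in for pytest's `tmp_path`, and the expected output matches what `test_get_set` asserts.
```python
# Minimal sketch outside pytest; constructor arguments mirror the fixture above.
import asyncio
import tempfile

from ultimate_mcp_server.services.cache import CacheService

async def main():
    with tempfile.TemporaryDirectory() as tmp:
        cache = CacheService(
            enabled=True,
            ttl=60,
            max_entries=10,
            enable_persistence=True,
            cache_dir=tmp,
            enable_fuzzy_matching=True,
        )
        await cache.set("demo-key", {"text": "demo value"})
        print(await cache.get("demo-key"))  # expected: {'text': 'demo value'}
        print(cache.metrics.hits, cache.metrics.misses, cache.metrics.stores)  # 1 0 1

if __name__ == "__main__":
    asyncio.run(main())
```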
--------------------------------------------------------------------------------
/ultimate_mcp_server/tool_token_counter.py:
--------------------------------------------------------------------------------
```python
import inspect
import json
from typing import Any, Callable, Dict, List, Optional
from rich.console import Console
from rich.table import Table
from ultimate_mcp_server.constants import COST_PER_MILLION_TOKENS
from ultimate_mcp_server.tools.base import _get_json_schema_type
from ultimate_mcp_server.utils.text import count_tokens
def extract_tool_info(func: Callable, tool_name: Optional[str] = None) -> Dict[str, Any]:
"""
Extract tool information from a function, similar to how MCP does it.
Args:
func: The function to extract information from
tool_name: Optional custom name for the tool (defaults to function name)
Returns:
Dictionary containing the tool information
"""
# Get function name and docstring
name = tool_name or func.__name__
description = func.__doc__ or f"Tool: {name}"
# Get function parameters
sig = inspect.signature(func)
params = {}
for param_name, param in sig.parameters.items():
# Skip 'self' parameter for class methods
if param_name == 'self':
continue
# Skip context parameter which is usually added by decorators
if param_name == 'ctx':
continue
# Also skip state management parameters
if param_name in ['get_state', 'set_state', 'delete_state']:
continue
# Get parameter type annotation and default value
param_type = param.annotation
param_default = param.default if param.default is not inspect.Parameter.empty else None
# Convert Python type to JSON Schema
if param_type is not inspect.Parameter.empty:
param_schema = _get_json_schema_type(param_type)
else:
param_schema = {"type": "object"} # Default to object for unknown types
# Add default value if available
if param_default is not None:
param_schema["default"] = param_default
# Add to parameters
params[param_name] = param_schema
# Construct input schema
input_schema = {
"type": "object",
"properties": params,
"required": [param_name for param_name, param in sig.parameters.items()
if param.default is inspect.Parameter.empty
and param_name not in ['self', 'ctx', 'get_state', 'set_state', 'delete_state']]
}
# Construct final tool info
tool_info = {
"name": name,
"description": description,
"inputSchema": input_schema
}
return tool_info
def count_tool_registration_tokens(tools: List[Callable], model: str = "gpt-4o") -> int:
"""
Count the tokens that would be used to register the given tools with an LLM.
Args:
tools: List of tool functions
model: The model to use for token counting (default: gpt-4o)
Returns:
Total number of tokens
"""
# Extract tool info for each tool
tool_infos = [extract_tool_info(tool) for tool in tools]
# Convert to JSON string (similar to what MCP does when sending to LLM)
tools_json = json.dumps({"tools": tool_infos}, ensure_ascii=False)
# Count tokens
token_count = count_tokens(tools_json, model)
return token_count
def calculate_cost_per_provider(token_count: int) -> Dict[str, float]:
"""
Calculate the cost of including the tokens as input for various API providers.
Args:
token_count: Number of tokens
Returns:
Dictionary mapping provider names to costs in USD
"""
costs = {}
try:
# Make sure we can access the cost data structure
if not isinstance(COST_PER_MILLION_TOKENS, dict):
console = Console()
console.print("[yellow]Warning: COST_PER_MILLION_TOKENS is not a dictionary[/yellow]")
return costs
for provider_name, provider_info in COST_PER_MILLION_TOKENS.items():
# Skip if provider_info is not a dictionary
if not isinstance(provider_info, dict):
continue
# Choose a reasonable default input cost if we can't determine from models
default_input_cost = 0.01 # $0.01 per million tokens as a safe default
input_cost_per_million = default_input_cost
try:
# Try to get cost from provider models if available
if provider_info and len(provider_info) > 0:
# Try to find the most expensive model
max_cost = 0
for _model_name, model_costs in provider_info.items():
if isinstance(model_costs, dict) and 'input' in model_costs:
cost = model_costs['input']
if cost > max_cost:
max_cost = cost
if max_cost > 0:
input_cost_per_million = max_cost
except Exception as e:
# If any error occurs, use the default cost
console = Console()
console.print(f"[yellow]Warning getting costs for {provider_name}: {str(e)}[/yellow]")
# Calculate cost for this token count
cost = (token_count / 1_000_000) * input_cost_per_million
# Store in results
costs[provider_name] = cost
except Exception as e:
console = Console()
console.print(f"[red]Error calculating costs: {str(e)}[/red]")
return costs
def display_tool_token_usage(current_tools_info: List[Dict[str, Any]], all_tools_info: List[Dict[str, Any]]):
"""
Display token usage information for tools in a Rich table.
Args:
current_tools_info: List of tool info dictionaries for currently registered tools
all_tools_info: List of tool info dictionaries for all available tools
"""
# Convert to JSON and count tokens
current_json = json.dumps({"tools": current_tools_info}, ensure_ascii=False)
all_json = json.dumps({"tools": all_tools_info}, ensure_ascii=False)
current_token_count = count_tokens(current_json)
all_token_count = count_tokens(all_json)
# Calculate size in KB
current_kb = len(current_json) / 1024
all_kb = len(all_json) / 1024
# Calculate costs for each provider
current_costs = calculate_cost_per_provider(current_token_count)
all_costs = calculate_cost_per_provider(all_token_count)
# Create Rich table
console = Console()
table = Table(title="Tool Registration Token Usage")
# Add columns
table.add_column("Metric", style="cyan")
table.add_column("Current Tools", style="green")
table.add_column("All Tools", style="yellow")
table.add_column("Difference", style="magenta")
# Add rows
table.add_row(
"Number of Tools",
str(len(current_tools_info)),
str(len(all_tools_info)),
str(len(all_tools_info) - len(current_tools_info))
)
table.add_row(
"Size (KB)",
f"{current_kb:.2f}",
f"{all_kb:.2f}",
f"{all_kb - current_kb:.2f}"
)
table.add_row(
"Token Count",
f"{current_token_count:,}",
f"{all_token_count:,}",
f"{all_token_count - current_token_count:,}"
)
# Add cost rows for each provider
for provider_name in sorted(current_costs.keys()):
current_cost = current_costs.get(provider_name, 0)
all_cost = all_costs.get(provider_name, 0)
table.add_row(
f"Cost ({provider_name})",
f"${current_cost:.4f}",
f"${all_cost:.4f}",
f"${all_cost - current_cost:.4f}"
)
# Print table
console.print(table)
return {
"current_tools": {
"count": len(current_tools_info),
"size_kb": current_kb,
"tokens": current_token_count,
"costs": current_costs
},
"all_tools": {
"count": len(all_tools_info),
"size_kb": all_kb,
"tokens": all_token_count,
"costs": all_costs
}
}
async def count_registered_tools_tokens(mcp_server):
"""
Count tokens for tools that are currently registered with the MCP server.
Args:
mcp_server: The MCP server instance
Returns:
Dictionary with token counts and costs
"""
# Get registered tools info from the server
# Since we might not have direct access to the function objects, extract tool info from the MCP API
if hasattr(mcp_server, 'tools') and hasattr(mcp_server.tools, 'list'):
# Try to get tool definitions directly
current_tools_info = await mcp_server.tools.list()
else:
# Fallback if we can't access the tools directly
current_tools_info = []
console = Console()
console.print("[yellow]Warning: Could not directly access registered tools from MCP server[/yellow]")
try:
# Import all available tools
from ultimate_mcp_server.tools import STANDALONE_TOOL_FUNCTIONS
# Extract full tool info for all available tools
all_tools_info = [extract_tool_info(func) for func in STANDALONE_TOOL_FUNCTIONS]
except ImportError:
console = Console()
console.print("[yellow]Warning: Could not import STANDALONE_TOOL_FUNCTIONS[/yellow]")
all_tools_info = []
# Display token usage
result = display_tool_token_usage(current_tools_info, all_tools_info)
return result
```
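A quick usage sketch for the counter above. The two toy tool functions are invented for illustration; `count_tool_registration_tokens` and `calculate_cost_per_provider` are imported exactly as defined in this module, and `gpt-4o` is the module's documented default model.
```python
# Hypothetical example tools; only the counter functions come from the module above.
from ultimate_mcp_server.tool_token_counter import (
    calculate_cost_per_provider,
    count_tool_registration_tokens,
)

def add_numbers(a: int, b: int) -> int:
    """Add two numbers and return the sum."""
    return a + b

def echo(message: str, repeat: int = 1) -> str:
    """Echo a message, optionally repeated."""
    return message * repeat

tokens = count_tool_registration_tokens([add_numbers, echo], model="gpt-4o")
print(f"Registering these two tools would consume roughly {tokens} input tokens")
print(calculate_cost_per_provider(tokens))  # per-provider cost estimate in USD
```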