This is page 3 of 35. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?page={x} to view the full context.
# Directory Structure
```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│ ├── __init__.py
│ ├── advanced_agent_flows_using_unified_memory_system_demo.py
│ ├── advanced_extraction_demo.py
│ ├── advanced_unified_memory_system_demo.py
│ ├── advanced_vector_search_demo.py
│ ├── analytics_reporting_demo.py
│ ├── audio_transcription_demo.py
│ ├── basic_completion_demo.py
│ ├── cache_demo.py
│ ├── claude_integration_demo.py
│ ├── compare_synthesize_demo.py
│ ├── cost_optimization.py
│ ├── data
│ │ ├── sample_event.txt
│ │ ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│ │ └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│ ├── docstring_refiner_demo.py
│ ├── document_conversion_and_processing_demo.py
│ ├── entity_relation_graph_demo.py
│ ├── filesystem_operations_demo.py
│ ├── grok_integration_demo.py
│ ├── local_text_tools_demo.py
│ ├── marqo_fused_search_demo.py
│ ├── measure_model_speeds.py
│ ├── meta_api_demo.py
│ ├── multi_provider_demo.py
│ ├── ollama_integration_demo.py
│ ├── prompt_templates_demo.py
│ ├── python_sandbox_demo.py
│ ├── rag_example.py
│ ├── research_workflow_demo.py
│ ├── sample
│ │ ├── article.txt
│ │ ├── backprop_paper.pdf
│ │ ├── buffett.pdf
│ │ ├── contract_link.txt
│ │ ├── legal_contract.txt
│ │ ├── medical_case.txt
│ │ ├── northwind.db
│ │ ├── research_paper.txt
│ │ ├── sample_data.json
│ │ └── text_classification_samples
│ │ ├── email_classification.txt
│ │ ├── news_samples.txt
│ │ ├── product_reviews.txt
│ │ └── support_tickets.txt
│ ├── sample_docs
│ │ └── downloaded
│ │ └── attention_is_all_you_need.pdf
│ ├── sentiment_analysis_demo.py
│ ├── simple_completion_demo.py
│ ├── single_shot_synthesis_demo.py
│ ├── smart_browser_demo.py
│ ├── sql_database_demo.py
│ ├── sse_client_demo.py
│ ├── test_code_extraction.py
│ ├── test_content_detection.py
│ ├── test_ollama.py
│ ├── text_classification_demo.py
│ ├── text_redline_demo.py
│ ├── tool_composition_examples.py
│ ├── tournament_code_demo.py
│ ├── tournament_text_demo.py
│ ├── unified_memory_system_demo.py
│ ├── vector_search_demo.py
│ ├── web_automation_instruction_packs.py
│ └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│ └── smart_browser_internal
│ ├── locator_cache.db
│ ├── readability.js
│ └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── integration
│ │ ├── __init__.py
│ │ └── test_server.py
│ ├── manual
│ │ ├── test_extraction_advanced.py
│ │ └── test_extraction.py
│ └── unit
│ ├── __init__.py
│ ├── test_cache.py
│ ├── test_providers.py
│ └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│ ├── __init__.py
│ ├── __main__.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── __main__.py
│ │ ├── commands.py
│ │ ├── helpers.py
│ │ └── typer_cli.py
│ ├── clients
│ │ ├── __init__.py
│ │ ├── completion_client.py
│ │ └── rag_client.py
│ ├── config
│ │ └── examples
│ │ └── filesystem_config.yaml
│ ├── config.py
│ ├── constants.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── evaluation
│ │ │ ├── base.py
│ │ │ └── evaluators.py
│ │ ├── providers
│ │ │ ├── __init__.py
│ │ │ ├── anthropic.py
│ │ │ ├── base.py
│ │ │ ├── deepseek.py
│ │ │ ├── gemini.py
│ │ │ ├── grok.py
│ │ │ ├── ollama.py
│ │ │ ├── openai.py
│ │ │ └── openrouter.py
│ │ ├── server.py
│ │ ├── state_store.py
│ │ ├── tournaments
│ │ │ ├── manager.py
│ │ │ ├── tasks.py
│ │ │ └── utils.py
│ │ └── ums_api
│ │ ├── __init__.py
│ │ ├── ums_database.py
│ │ ├── ums_endpoints.py
│ │ ├── ums_models.py
│ │ └── ums_services.py
│ ├── exceptions.py
│ ├── graceful_shutdown.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── analytics
│ │ │ ├── __init__.py
│ │ │ ├── metrics.py
│ │ │ └── reporting.py
│ │ ├── cache
│ │ │ ├── __init__.py
│ │ │ ├── cache_service.py
│ │ │ ├── persistence.py
│ │ │ ├── strategies.py
│ │ │ └── utils.py
│ │ ├── cache.py
│ │ ├── document.py
│ │ ├── knowledge_base
│ │ │ ├── __init__.py
│ │ │ ├── feedback.py
│ │ │ ├── manager.py
│ │ │ ├── rag_engine.py
│ │ │ ├── retriever.py
│ │ │ └── utils.py
│ │ ├── prompts
│ │ │ ├── __init__.py
│ │ │ ├── repository.py
│ │ │ └── templates.py
│ │ ├── prompts.py
│ │ └── vector
│ │ ├── __init__.py
│ │ ├── embeddings.py
│ │ └── vector_service.py
│ ├── tool_token_counter.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── audio_transcription.py
│ │ ├── base.py
│ │ ├── completion.py
│ │ ├── docstring_refiner.py
│ │ ├── document_conversion_and_processing.py
│ │ ├── enhanced-ums-lookbook.html
│ │ ├── entity_relation_graph.py
│ │ ├── excel_spreadsheet_automation.py
│ │ ├── extraction.py
│ │ ├── filesystem.py
│ │ ├── html_to_markdown.py
│ │ ├── local_text_tools.py
│ │ ├── marqo_fused_search.py
│ │ ├── meta_api_tool.py
│ │ ├── ocr_tools.py
│ │ ├── optimization.py
│ │ ├── provider.py
│ │ ├── pyodide_boot_template.html
│ │ ├── python_sandbox.py
│ │ ├── rag.py
│ │ ├── redline-compiled.css
│ │ ├── sentiment_analysis.py
│ │ ├── single_shot_synthesis.py
│ │ ├── smart_browser.py
│ │ ├── sql_databases.py
│ │ ├── text_classification.py
│ │ ├── text_redline_tools.py
│ │ ├── tournament.py
│ │ ├── ums_explorer.html
│ │ └── unified_memory_system.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── async_utils.py
│ │ ├── display.py
│ │ ├── logging
│ │ │ ├── __init__.py
│ │ │ ├── console.py
│ │ │ ├── emojis.py
│ │ │ ├── formatter.py
│ │ │ ├── logger.py
│ │ │ ├── panels.py
│ │ │ ├── progress.py
│ │ │ └── themes.py
│ │ ├── parse_yaml.py
│ │ ├── parsing.py
│ │ ├── security.py
│ │ └── text.py
│ └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/test_sse_client.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
SSE Test Client for Ultimate MCP Server
Tests server functionality over SSE (Server-Sent Events) transport
"""
import asyncio
import json
from fastmcp import Client
async def test_sse_server():
"""Test Ultimate MCP Server over SSE transport."""
# SSE endpoint - note the /sse path for SSE transport
server_url = "http://127.0.0.1:8013/sse"
print("🔥 Ultimate MCP Server SSE Test Client")
print("=" * 50)
print(f"🔗 Connecting to Ultimate MCP Server SSE endpoint at {server_url}")
try:
async with Client(server_url) as client:
print("✅ Successfully connected to SSE server")
# Test 1: List available tools
print("\n📋 Testing tool discovery...")
tools = await client.list_tools()
print(f"Found {len(tools)} tools via SSE transport:")
for i, tool in enumerate(tools[:10]): # Show first 10
print(f" {i+1:2d}. {tool.name}")
if len(tools) > 10:
print(f" ... and {len(tools) - 10} more tools")
# Test 2: List available resources
print("\n📚 Testing resource discovery...")
resources = await client.list_resources()
print(f"Found {len(resources)} resources:")
for resource in resources:
print(f" - {resource.uri}")
# Test 3: Echo tool test
print("\n🔊 Testing echo tool over SSE...")
echo_result = await client.call_tool("echo", {"message": "Hello from SSE client!"})
if echo_result:
echo_data = json.loads(echo_result[0].text)
print(f"✅ Echo response: {json.dumps(echo_data, indent=2)}")
# Test 4: Provider status test
print("\n🔌 Testing provider status over SSE...")
try:
provider_result = await client.call_tool("get_provider_status", {})
if provider_result:
provider_data = json.loads(provider_result[0].text)
providers = provider_data.get("providers", {})
print(f"✅ Found {len(providers)} providers via SSE:")
for name, status in providers.items():
available = "✅" if status.get("available") else "❌"
model_count = len(status.get("models", []))
print(f" {available} {name}: {model_count} models")
except Exception as e:
print(f"❌ Provider status failed: {e}")
# Test 5: Resource reading test
print("\n📖 Testing resource reading over SSE...")
if resources:
try:
resource_uri = resources[0].uri
resource_content = await client.read_resource(resource_uri)
if resource_content:
content = resource_content[0].text
preview = content[:200] + "..." if len(content) > 200 else content
print(f"✅ Resource {resource_uri} content preview:")
print(f" {preview}")
except Exception as e:
print(f"❌ Resource reading failed: {e}")
# Test 6: Simple completion test (if providers available)
print("\n🤖 Testing completion over SSE...")
try:
completion_result = await client.call_tool(
"generate_completion",
{
"prompt": "Say hello in exactly 3 words",
"provider": "ollama",
"model": "mix_77/gemma3-qat-tools:27b",
"max_tokens": 10,
},
)
if completion_result:
result_data = json.loads(completion_result[0].text)
print("✅ Completion via SSE:")
print(f" Text: '{result_data.get('text', 'No text')}'")
print(f" Model: {result_data.get('model', 'Unknown')}")
print(f" Success: {result_data.get('success', False)}")
print(f" Processing time: {result_data.get('processing_time', 0):.2f}s")
except Exception as e:
print(f"⚠️ Completion test failed (expected if no providers): {e}")
# Test 7: Filesystem tool test
print("\n📁 Testing filesystem tools over SSE...")
try:
dirs_result = await client.call_tool("list_allowed_directories", {})
if dirs_result:
dirs_data = json.loads(dirs_result[0].text)
print(f"✅ Allowed directories via SSE: {dirs_data.get('count', 0)} directories")
except Exception as e:
print(f"❌ Filesystem test failed: {e}")
# Test 8: Text processing tool test
print("\n📝 Testing text processing over SSE...")
try:
ripgrep_result = await client.call_tool(
"run_ripgrep",
{
"args_str": "'async' . -t py --max-count 5",
"input_dir": "."
}
)
if ripgrep_result:
ripgrep_data = json.loads(ripgrep_result[0].text)
if ripgrep_data.get("success"):
lines = ripgrep_data.get("output", "").split('\n')
                        line_count = len([line for line in lines if line.strip()])
print(f"✅ Ripgrep via SSE found {line_count} matching lines")
else:
print("⚠️ Ripgrep completed but found no matches")
except Exception as e:
print(f"❌ Text processing test failed: {e}")
print("\n🎉 SSE transport functionality test completed!")
return True
except Exception as e:
print(f"❌ SSE connection failed: {e}")
print("\nTroubleshooting:")
print("1. Make sure the server is running in SSE mode:")
print(" umcp run -t sse")
print("2. Verify the server is accessible at http://127.0.0.1:8013")
print("3. Check that the SSE endpoint is available at /sse")
return False
async def test_sse_interactive():
"""Interactive SSE testing mode."""
server_url = "http://127.0.0.1:8013/sse"
print("\n🎮 Entering SSE interactive mode...")
print("Type 'list' to see available tools, 'quit' to exit")
try:
async with Client(server_url) as client:
tools = await client.list_tools()
resources = await client.list_resources()
while True:
try:
command = input("\nSSE> ").strip()
if command.lower() in ['quit', 'exit', 'q']:
print("👋 Goodbye!")
break
elif command.lower() == 'list':
print("Available tools:")
for i, tool in enumerate(tools[:20]):
print(f" {i+1:2d}. {tool.name}")
if len(tools) > 20:
print(f" ... and {len(tools) - 20} more")
elif command.lower() == 'resources':
print("Available resources:")
for resource in resources:
print(f" - {resource.uri}")
elif command.startswith("tool "):
# Call tool: tool <tool_name> <json_params>
parts = command[5:].split(' ', 1)
tool_name = parts[0]
params = json.loads(parts[1]) if len(parts) > 1 else {}
try:
result = await client.call_tool(tool_name, params)
if result:
print(f"✅ Tool result: {result[0].text}")
else:
print("❌ No result returned")
except Exception as e:
print(f"❌ Tool call failed: {e}")
elif command.startswith("read "):
# Read resource: read <resource_uri>
resource_uri = command[5:].strip()
try:
result = await client.read_resource(resource_uri)
if result:
content = result[0].text
preview = content[:500] + "..." if len(content) > 500 else content
print(f"✅ Resource content: {preview}")
else:
print("❌ No content returned")
except Exception as e:
print(f"❌ Resource read failed: {e}")
else:
print("Commands:")
print(" list - List available tools")
print(" resources - List available resources")
print(" tool <name> <params> - Call a tool with JSON params")
print(" read <uri> - Read a resource")
print(" quit - Exit interactive mode")
except KeyboardInterrupt:
print("\n👋 Goodbye!")
break
except Exception as e:
print(f"❌ Command error: {e}")
except Exception as e:
print(f"❌ SSE interactive mode failed: {e}")
async def main():
"""Main test function."""
# Run basic functionality test
success = await test_sse_server()
if success:
# Ask if user wants interactive mode
try:
response = input("\nWould you like to enter SSE interactive mode? (y/n): ").strip().lower()
if response in ['y', 'yes']:
await test_sse_interactive()
except KeyboardInterrupt:
print("\n👋 Goodbye!")
if __name__ == "__main__":
asyncio.run(main())
```
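Distilled from the test above, a minimal sketch (not part of the repository) of the same SSE call pattern: open a `fastmcp` client against the `/sse` endpoint, invoke one tool, and decode the JSON payload carried in the first text content block. It assumes the server is already running in SSE mode on port 8013 and that the `echo` tool is registered, as the test expects.
```python
import asyncio
import json

from fastmcp import Client


async def call_one_tool(tool_name: str, params: dict) -> dict:
    """Call a single tool over the SSE transport and return its decoded JSON result."""
    async with Client("http://127.0.0.1:8013/sse") as client:
        result = await client.call_tool(tool_name, params)
        if not result:
            raise RuntimeError(f"No content returned by {tool_name}")
        # Tool results arrive as content blocks; the first block's text is a JSON document.
        return json.loads(result[0].text)


if __name__ == "__main__":
    print(asyncio.run(call_one_tool("echo", {"message": "ping"})))
```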
--------------------------------------------------------------------------------
/ultimate_mcp_server/utils/logging/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Gateway Logging Package.
This package provides enhanced logging capabilities with rich formatting,
progress tracking, and console output for the Gateway system.
"""
import logging
import logging.handlers
from typing import Any, Dict, List, Optional
# Import Rich-based console
# Adjusted imports to be relative within the new structure
from .console import (
console,
create_progress,
live_display,
print_json,
print_panel,
print_syntax,
print_table,
print_tree,
status,
)
# Import emojis
from .emojis import (
COMPLETED,
CRITICAL,
DEBUG,
ERROR,
FAILED,
INFO,
RUNNING,
SUCCESS,
WARNING,
get_emoji,
)
# Import formatters and handlers
from .formatter import (
DetailedLogFormatter,
GatewayLogRecord,
RichLoggingHandler,
SimpleLogFormatter,
create_rich_console_handler, # Added missing import used in server.py LOGGING_CONFIG
)
# Import logger and related utilities
from .logger import (
Logger,
critical,
debug,
error,
info,
section,
success,
warning,
)
# Import panels
from .panels import (
CodePanel,
ErrorPanel,
HeaderPanel,
InfoPanel,
ResultPanel,
ToolOutputPanel,
WarningPanel,
display_code,
display_error,
display_header,
display_info,
display_results,
display_tool_output,
display_warning,
)
# Import progress tracking
from .progress import (
GatewayProgress,
track,
)
# Create a global logger instance for importing
logger = Logger("ultimate")
# Removed configure_root_logger, initialize_logging, set_log_level functions
# Logging is now configured via dictConfig in main.py (or server.py equivalent)
def get_logger(name: str) -> Logger:
"""
Get or create a specialized Logger instance for a specific component.
This function provides access to the enhanced logging system of the Ultimate MCP Server,
returning a Logger instance that includes rich formatting, emoji support, and other
advanced features beyond Python's standard logging.
The returned Logger is configured with the project's logging settings and integrates
with the rich console output system. It provides methods like success() and section()
in addition to standard logging methods.
Args:
name: The logger name, typically the module or component name.
Can use dot notation for hierarchy (e.g., "module.submodule").
Returns:
An enhanced Logger instance with rich formatting and emoji support
Example:
```python
# In a module file
from ultimate_mcp_server.utils.logging import get_logger
# Create logger with the module name
logger = get_logger(__name__)
# Use the enhanced logging methods
logger.info("Server starting") # Basic info log
logger.success("Operation completed") # Success log (not in std logging)
logger.warning("Resource low", resource="RAM") # With additional context
logger.error("Failed to connect", emoji_key="network") # With custom emoji
```
"""
# Use the new base name for sub-loggers if needed, or keep original logic
# return Logger(f"ultimate_mcp_server.{name}") # Option 1: Prefix with base name
return Logger(name) # Option 2: Keep original name logic
def capture_logs(level: Optional[str] = None) -> "LogCapture":
"""
Create a context manager to capture logs for testing or debugging.
This function is a convenience wrapper around the LogCapture class, creating
and returning a context manager that will capture logs at or above the specified
level during its active scope.
Use this function when you need to verify that certain log messages are emitted
during tests, or when you want to collect logs for analysis without modifying
the application's logging configuration.
Args:
level: Minimum log level to capture (e.g., "INFO", "WARNING", "ERROR").
If None, all log levels are captured. Default: None
Returns:
A LogCapture context manager that will collect logs when active
Example:
```python
# Test that a function produces expected log messages
def test_login_function():
with capture_logs("WARNING") as logs:
# Call function that should produce a warning log for invalid login
result = login("invalid_user", "wrong_password")
# Assert that the expected warning was logged
assert logs.contains("Invalid login attempt")
assert len(logs.get_logs()) == 1
```
"""
return LogCapture(level)
# Log capturing for testing
class LogCapture:
"""
Context manager for capturing and analyzing logs during execution.
This class provides a way to intercept, store, and analyze logs emitted during
a specific block of code execution. It's primarily useful for:
- Testing: Verify that specific log messages were emitted during tests
- Debugging: Collect logs for examination without changing logging configuration
- Analysis: Gather statistics about logging patterns
The LogCapture acts as a context manager, capturing logs only within its scope
and providing methods to retrieve and analyze the captured logs after execution.
Each captured log entry is stored as a dictionary with details including the
message, level, timestamp, and source file/line information.
Example usage:
```python
# Capture all logs
with LogCapture() as capture:
# Code that generates logs
perform_operation()
# Check for specific log messages
assert capture.contains("Database connected")
assert not capture.contains("Error")
# Get all captured logs
all_logs = capture.get_logs()
# Get only warning and error messages
warnings = capture.get_logs(level="WARNING")
```
"""
def __init__(self, level: Optional[str] = None):
"""Initialize the log capture.
Args:
level: Minimum log level to capture
"""
self.level = level
self.level_num = getattr(logging, self.level.upper(), 0) if self.level else 0
self.logs: List[Dict[str, Any]] = []
self.handler = self._create_handler()
def _create_handler(self) -> logging.Handler:
"""Create a handler to capture logs.
Returns:
Log handler
"""
class CaptureHandler(logging.Handler):
def __init__(self, capture):
super().__init__()
self.capture = capture
def emit(self, record):
# Skip if record level is lower than minimum
if record.levelno < self.capture.level_num:
return
# Add log record to captured logs
self.capture.logs.append({
"level": record.levelname,
"message": record.getMessage(),
"name": record.name,
"time": record.created,
"file": record.pathname,
"line": record.lineno,
})
return CaptureHandler(self)
def __enter__(self) -> "LogCapture":
"""Enter the context manager.
Returns:
Self
"""
# Add handler to root logger
# Use the project's logger name
logging.getLogger("ultimate").addHandler(self.handler)
# Consider adding to the absolute root logger as well if needed
# logging.getLogger().addHandler(self.handler)
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""Exit the context manager.
Args:
exc_type: Exception type
exc_val: Exception value
exc_tb: Exception traceback
"""
# Remove handler from root logger
logging.getLogger("ultimate").removeHandler(self.handler)
# logging.getLogger().removeHandler(self.handler)
def get_logs(self, level: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get captured logs, optionally filtered by level.
Args:
level: Filter logs by level
Returns:
List of log records
"""
if not level:
return self.logs
level_num = getattr(logging, level.upper(), 0)
return [log for log in self.logs if getattr(logging, log["level"], 0) >= level_num]
def get_messages(self, level: Optional[str] = None) -> List[str]:
"""Get captured log messages, optionally filtered by level.
Args:
level: Filter logs by level
Returns:
List of log messages
"""
return [log["message"] for log in self.get_logs(level)]
def contains(self, text: str, level: Optional[str] = None) -> bool:
"""Check if any log message contains the given text.
Args:
text: Text to search for
level: Optional level filter
Returns:
True if text is found in any message
"""
return any(text in msg for msg in self.get_messages(level))
__all__ = [
# Console
"console",
"create_progress",
"status",
"print_panel",
"print_syntax",
"print_table",
"print_tree",
"print_json",
"live_display",
# Logger and utilities
"logger",
"Logger",
"debug",
"info",
"success",
"warning",
"error",
"critical",
"section",
"get_logger",
"capture_logs",
"LogCapture",
# Emojis
"get_emoji",
"INFO",
"DEBUG",
"WARNING",
"ERROR",
"CRITICAL",
"SUCCESS",
"RUNNING",
"COMPLETED",
"FAILED",
# Panels
"HeaderPanel",
"ResultPanel",
"InfoPanel",
"WarningPanel",
"ErrorPanel",
"ToolOutputPanel",
"CodePanel",
"display_header",
"display_results",
"display_info",
"display_warning",
"display_error",
"display_tool_output",
"display_code",
# Progress tracking
"GatewayProgress",
"track",
# Formatters and handlers
"GatewayLogRecord",
"SimpleLogFormatter",
"DetailedLogFormatter",
"RichLoggingHandler",
"create_rich_console_handler",
]
```
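A short usage sketch (illustrative, not from the package) for `capture_logs`/`LogCapture`. The capture handler attaches to the standard `ultimate` logger, so anything that propagates through that hierarchy during the `with` block is recorded; the std-lib calls below are used because they are guaranteed to route that way.
```python
import logging

from ultimate_mcp_server.utils.logging import capture_logs

# Capture only WARNING and above emitted under the "ultimate" logger hierarchy.
with capture_logs("WARNING") as captured:
    logging.getLogger("ultimate.demo").info("routine startup")       # below threshold, dropped
    logging.getLogger("ultimate.demo").warning("cache nearly full")  # recorded by the capture handler

assert captured.contains("cache nearly full")
assert len(captured.get_logs()) == 1
print(captured.get_messages())  # -> ['cache nearly full']
```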
--------------------------------------------------------------------------------
/ultimate_mcp_server/services/vector/embeddings.py:
--------------------------------------------------------------------------------
```python
"""Embedding generation service for vector operations."""
import asyncio
import hashlib
import os
from pathlib import Path
from typing import List, Optional
import numpy as np
from openai import AsyncOpenAI
from ultimate_mcp_server.config import get_config
from ultimate_mcp_server.utils import get_logger
logger = get_logger(__name__)
# Global dictionary to store embedding instances (optional)
embedding_instances = {}
class EmbeddingCache:
"""Cache for embeddings to avoid repeated API calls."""
def __init__(self, cache_dir: Optional[str] = None):
"""Initialize the embedding cache.
Args:
cache_dir: Directory to store cache files
"""
if cache_dir:
self.cache_dir = Path(cache_dir)
else:
self.cache_dir = Path.home() / ".ultimate" / "embeddings"
# Create cache directory if it doesn't exist
self.cache_dir.mkdir(parents=True, exist_ok=True)
# In-memory cache
self.cache = {}
logger.info(
f"Embeddings cache initialized (directory: {self.cache_dir})",
emoji_key="cache"
)
def _get_cache_key(self, text: str, model: str) -> str:
"""Generate a cache key for text and model.
Args:
text: Text to embed
model: Embedding model name
Returns:
Cache key
"""
# Create a hash based on text and model
text_hash = hashlib.md5(text.encode("utf-8")).hexdigest()
return f"{model}_{text_hash}"
def _get_cache_file_path(self, key: str) -> Path:
"""Get cache file path for a key.
Args:
key: Cache key
Returns:
Cache file path
"""
return self.cache_dir / f"{key}.npy"
def get(self, text: str, model: str) -> Optional[np.ndarray]:
"""Get embedding from cache.
Args:
text: Text to embed
model: Embedding model name
Returns:
Cached embedding or None if not found
"""
key = self._get_cache_key(text, model)
# Check in-memory cache first
if key in self.cache:
return self.cache[key]
# Check disk cache
cache_file = self._get_cache_file_path(key)
if cache_file.exists():
try:
embedding = np.load(str(cache_file))
# Add to in-memory cache
self.cache[key] = embedding
return embedding
except Exception as e:
logger.error(
f"Failed to load embedding from cache: {str(e)}",
emoji_key="error"
)
return None
def set(self, text: str, model: str, embedding: np.ndarray) -> None:
"""Set embedding in cache.
Args:
text: Text to embed
model: Embedding model name
embedding: Embedding vector
"""
key = self._get_cache_key(text, model)
# Add to in-memory cache
self.cache[key] = embedding
# Save to disk
cache_file = self._get_cache_file_path(key)
try:
np.save(str(cache_file), embedding)
except Exception as e:
logger.error(
f"Failed to save embedding to cache: {str(e)}",
emoji_key="error"
)
def clear(self) -> None:
"""Clear the embedding cache."""
# Clear in-memory cache
self.cache.clear()
# Clear disk cache
for cache_file in self.cache_dir.glob("*.npy"):
try:
cache_file.unlink()
except Exception as e:
logger.error(
f"Failed to delete cache file {cache_file}: {str(e)}",
emoji_key="error"
)
logger.info(
"Embeddings cache cleared",
emoji_key="cache"
)
class EmbeddingService:
"""Generic service to create embeddings using different providers."""
def __init__(self, provider_type: str = 'openai', model_name: str = 'text-embedding-3-small', api_key: Optional[str] = None, **kwargs):
"""Initialize the embedding service.
Args:
provider_type: The type of embedding provider (e.g., 'openai').
model_name: The specific embedding model to use.
api_key: Optional API key. If not provided, attempts to load from config.
**kwargs: Additional provider-specific arguments.
"""
self.provider_type = provider_type.lower()
self.model_name = model_name
self.client = None
self.api_key = api_key
self.kwargs = kwargs
try:
config = get_config()
if self.provider_type == 'openai':
provider_config = config.providers.openai
# Use provided key first, then config key
self.api_key = self.api_key or provider_config.api_key
if not self.api_key:
raise ValueError("OpenAI API key not provided or found in configuration.")
# Pass base_url and organization from config if available
openai_kwargs = {
'api_key': self.api_key,
'base_url': provider_config.base_url or self.kwargs.get('base_url'),
'organization': provider_config.organization or self.kwargs.get('organization'),
'timeout': provider_config.timeout or self.kwargs.get('timeout'),
}
# Filter out None values before passing to OpenAI client
openai_kwargs = {k: v for k, v in openai_kwargs.items() if v is not None}
# Always use AsyncOpenAI
self.client = AsyncOpenAI(**openai_kwargs)
logger.info(f"Initialized AsyncOpenAI embedding client for model: {self.model_name}")
else:
raise ValueError(f"Unsupported embedding provider type: {self.provider_type}")
except Exception as e:
logger.error(f"Failed to initialize embedding service for provider {self.provider_type}: {e}", exc_info=True)
raise RuntimeError(f"Embedding service initialization failed: {e}") from e
async def create_embeddings(self, texts: List[str]) -> List[List[float]]:
"""Create embeddings for a list of texts.
Args:
texts: A list of strings to embed.
Returns:
A list of embedding vectors (each a list of floats).
Raises:
ValueError: If the provider type is unsupported or embedding fails.
RuntimeError: If the client is not initialized.
"""
if self.client is None:
raise RuntimeError("Embedding client is not initialized.")
try:
if self.provider_type == 'openai':
response = await self.client.embeddings.create(
input=texts,
model=self.model_name
)
# Extract the embedding data
embeddings = [item.embedding for item in response.data]
logger.debug(f"Successfully created {len(embeddings)} embeddings using {self.model_name}.")
return embeddings
else:
raise ValueError(f"Unsupported provider type: {self.provider_type}")
except Exception as e:
logger.error(f"Failed to create embeddings using {self.provider_type} model {self.model_name}: {e}", exc_info=True)
# Re-raise the error or return an empty list/handle appropriately
raise ValueError(f"Embedding creation failed: {e}") from e
def get_embedding_service(provider_type: str = 'openai', model_name: str = 'text-embedding-3-small', **kwargs) -> EmbeddingService:
"""Factory function to get or create an EmbeddingService instance.
Args:
provider_type: The type of embedding provider.
model_name: The specific embedding model.
**kwargs: Additional arguments passed to the EmbeddingService constructor.
Returns:
An initialized EmbeddingService instance.
"""
# Optional: Implement caching/singleton pattern for instances if desired
instance_key = (provider_type, model_name)
if instance_key in embedding_instances:
# TODO: Check if kwargs match cached instance? For now, assume they do.
logger.debug(f"Returning cached embedding service instance for {provider_type}/{model_name}")
return embedding_instances[instance_key]
else:
logger.debug(f"Creating new embedding service instance for {provider_type}/{model_name}")
instance = EmbeddingService(provider_type=provider_type, model_name=model_name, **kwargs)
embedding_instances[instance_key] = instance
return instance
# Example usage (for testing)
async def main():
# setup_logging(log_level="DEBUG") # Removed as logging is configured centrally
# Make sure OPENAI_API_KEY is set in your .env file or environment
os.environ['GATEWAY_FORCE_CONFIG_RELOAD'] = 'true' # Ensure latest config
try:
# Get the default OpenAI service
openai_service = get_embedding_service()
texts_to_embed = [
"The quick brown fox jumps over the lazy dog.",
"Quantum computing leverages quantum mechanics.",
"Paris is the capital of France."
]
embeddings = await openai_service.create_embeddings(texts_to_embed)
print(f"Generated {len(embeddings)} embeddings.")
print(f"Dimension of first embedding: {len(embeddings[0])}")
# print(f"First embedding (preview): {embeddings[0][:10]}...")
# Example of specifying a different model (if available and configured)
# try:
# ada_service = get_embedding_service(model_name='text-embedding-ada-002')
# ada_embeddings = await ada_service.create_embeddings(["Test with Ada model"])
# print(\"\nSuccessfully used Ada model.\")
# except Exception as e:
# print(f\"\nCould not use Ada model (may need different API key/config): {e}\")
except Exception as e:
print(f"An error occurred during the example: {e}")
finally:
if 'GATEWAY_FORCE_CONFIG_RELOAD' in os.environ:
del os.environ['GATEWAY_FORCE_CONFIG_RELOAD']
if __name__ == "__main__":
import asyncio
asyncio.run(main())
```
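Note that `EmbeddingService.create_embeddings()` does not consult `EmbeddingCache`; callers are expected to wire the two together themselves. A sketch of one way to do that (the `cached_embeddings` helper is hypothetical and assumes an OpenAI API key is configured):
```python
import asyncio
from typing import List, Optional

import numpy as np

from ultimate_mcp_server.services.vector.embeddings import (
    EmbeddingCache,
    get_embedding_service,
)


async def cached_embeddings(
    texts: List[str], model: str = "text-embedding-3-small"
) -> List[List[float]]:
    cache = EmbeddingCache()
    service = get_embedding_service(model_name=model)
    results: List[Optional[List[float]]] = [None] * len(texts)

    # Serve what we can from the cache; remember which texts still need an API call.
    missing = []
    for i, text in enumerate(texts):
        hit = cache.get(text, model)
        if hit is not None:
            results[i] = hit.tolist()
        else:
            missing.append(i)

    if missing:
        fresh = await service.create_embeddings([texts[i] for i in missing])
        for i, vector in zip(missing, fresh):
            cache.set(texts[i], model, np.asarray(vector))
            results[i] = vector

    return [r for r in results if r is not None]  # every slot is filled at this point


if __name__ == "__main__":
    vectors = asyncio.run(cached_embeddings(["The quick brown fox jumps over the lazy dog."]))
    print(f"Got {len(vectors)} embedding(s), dimension {len(vectors[0])}")
```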
--------------------------------------------------------------------------------
/examples/marqo_fused_search_demo.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""Demo script showcasing the marqo_fused_search tool."""
import asyncio
import json
import os
import sys
import time # Add time import
from datetime import datetime, timedelta
from typing import Any, Dict, Optional
# Add Rich imports
from rich.console import Console
from rich.markup import escape
from rich.panel import Panel
from rich.rule import Rule
from rich.syntax import Syntax
# Add the project root to the Python path
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.insert(0, project_root)
from ultimate_mcp_server.tools.marqo_fused_search import DateRange, marqo_fused_search # noqa: E402
from ultimate_mcp_server.utils.logging import logger # noqa: E402
# Initialize Rich Console
console = Console()
# --- Configuration ---
CONFIG_FILE_PATH = os.path.join(project_root, "marqo_index_config.json")
def load_marqo_config() -> Dict[str, Any]:
"""Loads Marqo configuration from the JSON file."""
try:
with open(CONFIG_FILE_PATH, 'r') as f:
config = json.load(f)
logger.info(f"Loaded Marqo config from {CONFIG_FILE_PATH}")
return config
except FileNotFoundError:
logger.error(f"Marqo config file not found at {CONFIG_FILE_PATH}. Cannot run dynamic examples.")
return {}
except json.JSONDecodeError as e:
logger.error(f"Error decoding Marqo config file {CONFIG_FILE_PATH}: {e}")
return {}
def find_schema_field(schema: Dict[str, Any], required_properties: Dict[str, Any]) -> Optional[str]:
"""
Finds the first field name in the schema that matches all required properties.
Handles nested properties like 'type'.
"""
if not schema or "fields" not in schema:
return None
for field_name, properties in schema["fields"].items():
        match = True
        for req_prop, req_value in required_properties.items():
            # Special pseudo-key: exclude a field by name (used below to find a second lexical field)
            if req_prop == "field_name_not":
                if field_name == req_value:
                    match = False
                    break
                continue
            # Allow checking properties like 'type', 'filterable', 'sortable', 'role', 'searchable'
            if properties.get(req_prop) != req_value:
                match = False
                break
if match:
# Avoid returning internal fields like _id unless specifically requested
if field_name == "_id" and required_properties.get("role") != "internal":
continue
return field_name
return None
# --- Helper Function ---
async def run_search_example(example_name: str, **kwargs):
"""Runs a single search example and prints the results using Rich."""
console.print(Rule(f"[bold cyan]{example_name}[/bold cyan]"))
# Display parameters using a panel
param_str_parts = []
for key, value in kwargs.items():
# Format DateRange nicely
if isinstance(value, DateRange):
start_str = value.start_date.strftime("%Y-%m-%d") if value.start_date else "N/A"
end_str = value.end_date.strftime("%Y-%m-%d") if value.end_date else "N/A"
param_str_parts.append(f" [green]{key}[/green]: Start=[yellow]{start_str}[/yellow], End=[yellow]{end_str}[/yellow]")
else:
param_str_parts.append(f" [green]{key}[/green]: [yellow]{escape(str(value))}[/yellow]")
param_str = "\n".join(param_str_parts)
console.print(Panel(param_str, title="Search Parameters", border_style="blue", expand=False))
try:
start_time = time.time() # Use time for accurate timing
results = await marqo_fused_search(**kwargs)
processing_time = time.time() - start_time
logger.debug(f"Raw results for '{example_name}': {results}") # Keep debug log
if results.get("success"):
logger.success(f"Search successful for '{example_name}'! ({processing_time:.3f}s)", emoji_key="success")
# Display results using Rich Syntax for JSON
results_json = json.dumps(results, indent=2, default=str)
syntax = Syntax(results_json, "json", theme="default", line_numbers=True)
console.print(Panel(syntax, title="Marqo Search Results", border_style="green"))
else:
# Display error nicely if success is False but no exception was raised
error_msg = results.get("error", "Unknown error")
error_code = results.get("error_code", "UNKNOWN_CODE")
logger.error(f"Search failed for '{example_name}': {error_code} - {error_msg}", emoji_key="error")
console.print(Panel(f"[bold red]Error ({error_code}):[/bold red]\n{escape(error_msg)}", title="Search Failed", border_style="red"))
except Exception as e:
processing_time = time.time() - start_time
logger.error(f"An exception occurred during '{example_name}' ({processing_time:.3f}s): {e}", emoji_key="critical", exc_info=True)
# Display exception using Rich traceback
console.print_exception(show_locals=False)
console.print(Panel(f"[bold red]Exception:[/bold red]\n{escape(str(e))}", title="Execution Error", border_style="red"))
console.print() # Add space after each example
# --- Main Demo Function ---
async def main():
"""Runs various demonstrations of the marqo_fused_search tool."""
# Load Marqo configuration and schema
marqo_config = load_marqo_config()
if not marqo_config:
logger.error("Exiting demo as Marqo config could not be loaded.")
return
schema = marqo_config.get("default_schema", {})
tensor_field = schema.get("tensor_field")
# content_field = schema.get("default_content_field", "content") # Not directly used in examples
date_field = schema.get("default_date_field") # Used for date range
# --- Find suitable fields dynamically ---
# For filter examples (keyword preferred)
filter_field = find_schema_field(schema, {"filterable": True, "type": "keyword"}) or \
find_schema_field(schema, {"filterable": True}) # Fallback to any filterable
# For lexical search (requires searchable='lexical')
lexical_field_1 = find_schema_field(schema, {"searchable": "lexical"})
lexical_field_2 = find_schema_field(schema, {"searchable": "lexical", "field_name_not": lexical_field_1}) or lexical_field_1 # Find a second one if possible
# For hybrid search (need tensor + lexical)
hybrid_tensor_field = tensor_field # Use the main tensor field
hybrid_lexical_field_1 = lexical_field_1
hybrid_lexical_field_2 = lexical_field_2
# For explicit tensor search (need tensor field)
explicit_tensor_field = tensor_field
logger.info("Dynamically determined fields for examples:")
logger.info(f" Filter Field: '{filter_field}'")
logger.info(f" Lexical Fields: '{lexical_field_1}', '{lexical_field_2}'")
logger.info(f" Tensor Field (for hybrid/explicit): '{hybrid_tensor_field}'")
logger.info(f" Date Field (for range): '{date_field}'")
# --- Run Examples ---
# --- Example 1: Basic Semantic Search --- (No specific fields needed)
await run_search_example(
"Basic Semantic Search",
query="impact of AI on software development"
)
# --- Example 2: Search with Metadata Filter ---
if filter_field:
# Use a plausible value; specific value might not exist in data
example_filter_value = "10-K" if filter_field == "form_type" else "example_value"
await run_search_example(
"Search with Metadata Filter",
query="latest advancements in renewable energy",
filters={filter_field: example_filter_value}
)
else:
logger.warning("Skipping Example 2: No suitable filterable field found in schema.")
# --- Example 3: Search with Multiple Filter Values (OR condition) ---
if filter_field:
# Use plausible values
example_filter_values = ["10-K", "10-Q"] if filter_field == "form_type" else ["value1", "value2"]
await run_search_example(
"Search with Multiple Filter Values (OR)",
query="financial report analysis",
filters={filter_field: example_filter_values}
)
else:
logger.warning("Skipping Example 3: No suitable filterable field found in schema.")
# --- Example 4: Search with Date Range ---
if date_field and find_schema_field(schema, {"name": date_field, "type": "timestamp"}):
start_date = datetime.now() - timedelta(days=900)
end_date = datetime.now() - timedelta(days=30)
await run_search_example(
"Search with Date Range",
query="market trends",
date_range=DateRange(start_date=start_date, end_date=end_date)
)
else:
logger.warning(f"Skipping Example 4: No sortable timestamp field named '{date_field}' (default_date_field) found in schema.")
# --- Example 5: Pure Lexical Search --- (Relies on schema having lexical fields)
# The tool will auto-detect lexical fields if not specified, but this tests the weight
await run_search_example(
"Pure Lexical Search",
query="exact sciences", # Query likely to hit company name etc.
semantic_weight=0.0
)
# --- Example 6: Hybrid Search with Custom Weight --- (Relies on schema having both)
await run_search_example(
"Hybrid Search with Custom Weight",
query="balancing innovation and regulation",
semantic_weight=0.5 # Equal weight
)
# --- Example 7: Pagination (Limit and Offset) --- (No specific fields needed)
await run_search_example(
"Pagination (Limit and Offset)",
query="common programming paradigms",
limit=10,
offset=10
)
# --- Example 8: Explicit Searchable Attributes (Tensor Search) ---
if explicit_tensor_field:
await run_search_example(
"Explicit Tensor Searchable Attributes",
query="neural network architectures",
searchable_attributes=[explicit_tensor_field],
semantic_weight=1.0 # Ensure tensor search is used
)
else:
logger.warning("Skipping Example 8: No tensor field found in schema.")
# --- Example 9: Explicit Hybrid Search Attributes ---
if hybrid_tensor_field and hybrid_lexical_field_1:
lexical_fields = [hybrid_lexical_field_1]
if hybrid_lexical_field_2 and hybrid_lexical_field_1 != hybrid_lexical_field_2:
lexical_fields.append(hybrid_lexical_field_2)
await run_search_example(
"Explicit Hybrid Search Attributes",
query="machine learning applications in healthcare",
hybrid_search_attributes={
"tensor": [hybrid_tensor_field],
"lexical": lexical_fields
},
semantic_weight=0.6 # Specify hybrid search balance
)
else:
logger.warning("Skipping Example 9: Need both tensor and lexical fields defined in schema.")
# --- Example 12: Overriding Marqo URL and Index Name --- (Keep commented out)
# ... rest of the code ...
console.print(Rule("[bold magenta]Marqo Fused Search Demo Complete[/bold magenta]"))
if __name__ == "__main__":
console.print(Rule("[bold magenta]Starting Marqo Fused Search Demo[/bold magenta]"))
# logger.info("Starting Marqo Fused Search Demo...") # Replaced by Rich rule
asyncio.run(main())
# logger.info("Marqo Fused Search Demo finished.") # Replaced by Rich rule
```
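For reference, a stripped-down direct call to `marqo_fused_search` without the demo's Rich wrapper. The filter field name (`form_type`) is a placeholder taken from the demo's own example values; substitute whatever filterable and date fields your `marqo_index_config.json` actually defines.
```python
import asyncio
import json
from datetime import datetime, timedelta

from ultimate_mcp_server.tools.marqo_fused_search import DateRange, marqo_fused_search


async def main() -> None:
    results = await marqo_fused_search(
        query="renewable energy outlook",
        filters={"form_type": ["10-K", "10-Q"]},  # multiple values act as an OR filter
        date_range=DateRange(
            start_date=datetime.now() - timedelta(days=365),
            end_date=datetime.now(),
        ),
        semantic_weight=0.7,  # hybrid search, weighted toward tensor results
        limit=5,
    )
    if results.get("success"):
        print(json.dumps(results, indent=2, default=str))
    else:
        print(f"Search failed: {results.get('error_code')}: {results.get('error')}")


if __name__ == "__main__":
    asyncio.run(main())
```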
--------------------------------------------------------------------------------
/examples/claude_integration_demo.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
"""Claude integration demonstration using Ultimate MCP Server."""
import asyncio
import sys
import time
from pathlib import Path
# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))
# Third-party imports
# These imports need to be below sys.path modification, which is why they have noqa comments
from rich import box # noqa: E402
from rich.markup import escape # noqa: E402
from rich.panel import Panel # noqa: E402
from rich.rule import Rule # noqa: E402
from rich.table import Table # noqa: E402
# Project imports
from ultimate_mcp_server.constants import Provider # noqa: E402
from ultimate_mcp_server.core.server import Gateway # noqa: E402
from ultimate_mcp_server.utils import get_logger # noqa: E402
from ultimate_mcp_server.utils.display import CostTracker  # noqa: E402
from ultimate_mcp_server.utils.logging.console import console # noqa: E402
# Initialize logger
logger = get_logger("example.claude_integration_demo")
async def compare_claude_models(tracker: CostTracker):
"""Compare different Claude models."""
console.print(Rule("[bold blue]Claude Model Comparison[/bold blue]"))
logger.info("Starting Claude models comparison", emoji_key="start")
# Create Gateway instance - this handles provider initialization
gateway = Gateway("claude-demo", register_tools=False)
# Initialize providers
logger.info("Initializing providers...", emoji_key="provider")
await gateway._initialize_providers()
provider_name = Provider.ANTHROPIC.value
try:
# Get the provider from the gateway
provider = gateway.providers.get(provider_name)
if not provider:
logger.error(f"Provider {provider_name} not available or initialized", emoji_key="error")
return
logger.info(f"Using provider: {provider_name}", emoji_key="provider")
models = await provider.list_models()
model_names = [m["id"] for m in models] # Extract names from model dictionaries
console.print(f"Found {len(model_names)} Claude models: [cyan]{escape(str(model_names))}[/cyan]")
# Select specific models to compare (Ensure these are valid and available)
claude_models = [
"anthropic/claude-3-7-sonnet-20250219",
"anthropic/claude-3-5-haiku-20241022"
]
# Filter based on available models
models_to_compare = [m for m in claude_models if m in model_names]
if not models_to_compare:
logger.error("None of the selected models for comparison are available. Exiting comparison.", emoji_key="error")
console.print("[red]Selected models not found in available list.[/red]")
return
console.print(f"Comparing models: [yellow]{escape(str(models_to_compare))}[/yellow]")
prompt = """
Explain the concept of quantum entanglement in a way that a high school student would understand.
Keep your response brief and accessible.
"""
console.print(f"[cyan]Using Prompt:[/cyan] {escape(prompt.strip())[:100]}...")
results_data = []
for model_name in models_to_compare:
try:
logger.info(f"Testing model: {model_name}", emoji_key="model")
start_time = time.time()
result = await provider.generate_completion(
prompt=prompt,
model=model_name,
temperature=0.3,
max_tokens=300
)
processing_time = time.time() - start_time
# Track the cost
tracker.add_call(result)
results_data.append({
"model": model_name,
"text": result.text,
"tokens": {
"input": result.input_tokens,
"output": result.output_tokens,
"total": result.total_tokens
},
"cost": result.cost,
"time": processing_time
})
logger.success(
f"Completion for {model_name} successful",
emoji_key="success",
# Tokens/cost/time logged implicitly by storing in results_data
)
except Exception as e:
logger.error(f"Error testing model {model_name}: {str(e)}", emoji_key="error", exc_info=True)
# Optionally add an error entry to results_data if needed
# Display comparison results using Rich
if results_data:
console.print(Rule("[bold green]Comparison Results[/bold green]"))
for result_item in results_data:
model = result_item["model"]
time_s = result_item["time"]
tokens = result_item.get("tokens", {}).get("total", 0)
tokens_per_second = tokens / time_s if time_s > 0 else 0
cost = result_item.get("cost", 0.0)
text = result_item.get("text", "[red]Error generating response[/red]").strip()
stats_line = (
f"Time: [yellow]{time_s:.2f}s[/yellow] | "
f"Tokens: [cyan]{tokens}[/cyan] | "
f"Speed: [blue]{tokens_per_second:.1f} tok/s[/blue] | "
f"Cost: [green]${cost:.6f}[/green]"
)
console.print(Panel(
escape(text),
title=f"[bold magenta]{escape(model)}[/bold magenta]",
subtitle=stats_line,
border_style="blue",
expand=False
))
console.print()
except Exception as e:
logger.error(f"Error in model comparison: {str(e)}", emoji_key="error", exc_info=True)
# Optionally re-raise or handle differently
async def demonstrate_system_prompt(tracker: CostTracker):
"""Demonstrate Claude with system prompts."""
console.print(Rule("[bold blue]Claude System Prompt Demonstration[/bold blue]"))
logger.info("Demonstrating Claude with system prompts", emoji_key="start")
# Create Gateway instance - this handles provider initialization
gateway = Gateway("claude-demo", register_tools=False)
# Initialize providers
logger.info("Initializing providers...", emoji_key="provider")
await gateway._initialize_providers()
provider_name = Provider.ANTHROPIC.value
try:
# Get the provider from the gateway
provider = gateway.providers.get(provider_name)
if not provider:
logger.error(f"Provider {provider_name} not available or initialized", emoji_key="error")
return
# Use a fast Claude model (ensure it's available)
model = "anthropic/claude-3-5-haiku-20241022"
available_models = await provider.list_models()
if model not in [m["id"] for m in available_models]:
logger.warning(f"Model {model} not available, falling back to default.", emoji_key="warning")
model = provider.get_default_model()
if not model:
logger.error("No suitable Claude model found for system prompt demo.", emoji_key="error")
return
logger.info(f"Using model: {model}", emoji_key="model")
system_prompt = """
You are a helpful assistant with expertise in physics.
Keep all explanations accurate but very concise.
Always provide real-world examples to illustrate concepts.
"""
user_prompt = "Explain the concept of gravity."
logger.info("Generating completion with system prompt", emoji_key="processing")
result = await provider.generate_completion(
prompt=user_prompt,
model=model,
temperature=0.7,
system=system_prompt,
max_tokens=1000 # Increased max_tokens
)
# Track the cost
tracker.add_call(result)
logger.success("Completion with system prompt successful", emoji_key="success")
# Display result using Rich Panels
console.print(Panel(
escape(system_prompt.strip()),
title="[bold cyan]System Prompt[/bold cyan]",
border_style="dim cyan",
expand=False
))
console.print(Panel(
escape(user_prompt.strip()),
title="[bold yellow]User Prompt[/bold yellow]",
border_style="dim yellow",
expand=False
))
console.print(Panel(
escape(result.text.strip()),
title="[bold green]Claude Response[/bold green]",
border_style="green",
expand=False
))
# Display stats in a small table
stats_table = Table(title="Execution Stats", show_header=False, box=box.MINIMAL, expand=False)
stats_table.add_column("Metric", style="cyan")
stats_table.add_column("Value", style="white")
stats_table.add_row("Input Tokens", str(result.input_tokens))
stats_table.add_row("Output Tokens", str(result.output_tokens))
stats_table.add_row("Cost", f"${result.cost:.6f}")
stats_table.add_row("Processing Time", f"{result.processing_time:.3f}s")
console.print(stats_table)
console.print()
except Exception as e:
logger.error(f"Error in system prompt demonstration: {str(e)}", emoji_key="error", exc_info=True)
# Optionally re-raise or handle differently
async def explore_claude_models():
"""Display available Claude models."""
console.print(Rule("[bold cyan]Available Claude Models[/bold cyan]"))
# Create Gateway instance - this handles provider initialization
gateway = Gateway("claude-demo", register_tools=False)
# Initialize providers
logger.info("Initializing providers...", emoji_key="provider")
await gateway._initialize_providers()
# Get provider from the gateway
provider = gateway.providers.get(Provider.ANTHROPIC.value)
if not provider:
logger.error(f"Provider {Provider.ANTHROPIC.value} not available or initialized", emoji_key="error")
return
# Get list of available models
models = await provider.list_models()
model_names = [m["id"] for m in models] # Extract names from model dictionaries
console.print(f"Found {len(model_names)} Claude models: [cyan]{escape(str(model_names))}[/cyan]")
async def main():
"""Run Claude integration examples."""
tracker = CostTracker() # Instantiate tracker here
try:
# Run model comparison
await compare_claude_models(tracker) # Pass tracker
console.print() # Add space between sections
# Run system prompt demonstration
await demonstrate_system_prompt(tracker) # Pass tracker
# Run explore Claude models
await explore_claude_models()
# Display final summary
tracker.display_summary(console) # Display summary at the end
except Exception as e:
logger.critical(f"Example failed: {str(e)}", emoji_key="critical", exc_info=True)
return 1
logger.success("Claude Integration Demo Finished Successfully!", emoji_key="complete")
return 0
if __name__ == "__main__":
exit_code = asyncio.run(main())
sys.exit(exit_code)
```
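Each section of the demo repeats the same Gateway/provider/CostTracker setup; condensed into one hypothetical helper (assuming an Anthropic API key is configured), the pattern looks like this:
```python
import asyncio

from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.server import Gateway
from ultimate_mcp_server.utils.display import CostTracker
from ultimate_mcp_server.utils.logging.console import console


async def one_shot(prompt: str) -> None:
    tracker = CostTracker()
    gateway = Gateway("claude-demo", register_tools=False)
    await gateway._initialize_providers()  # populates gateway.providers

    provider = gateway.providers.get(Provider.ANTHROPIC.value)
    if not provider:
        raise RuntimeError("Anthropic provider not configured")

    result = await provider.generate_completion(
        prompt=prompt,
        model=provider.get_default_model(),  # fall back to the provider's default model
        temperature=0.3,
        max_tokens=300,
    )
    tracker.add_call(result)  # record tokens and cost for the summary
    console.print(result.text.strip())
    tracker.display_summary(console)


if __name__ == "__main__":
    asyncio.run(one_shot("Explain quantum entanglement for a high school student."))
```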
--------------------------------------------------------------------------------
/comprehensive_test.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Comprehensive test script for Ultimate MCP Server
Tests specific tools and REST API endpoints
"""
import asyncio
import json
import aiohttp
from fastmcp import Client
async def test_mcp_interface():
"""Test the MCP interface functionality."""
server_url = "http://127.0.0.1:8013/mcp"
print("🔧 Testing MCP Interface")
print("=" * 40)
try:
async with Client(server_url) as client:
print("✅ MCP client connected")
# Test core tools
tools_to_test = [
("echo", {"message": "Hello MCP!"}),
("get_provider_status", {}),
("list_models", {}),
]
for tool_name, params in tools_to_test:
try:
result = await client.call_tool(tool_name, params)
if result:
print(f"✅ {tool_name}: OK")
# Show sample of result for key tools
if tool_name == "get_provider_status":
data = json.loads(result[0].text)
provider_count = len(data.get('providers', {}))
print(f" → {provider_count} providers configured")
elif tool_name == "list_models":
data = json.loads(result[0].text)
total_models = sum(len(models) for models in data.get('models', {}).values())
print(f" → {total_models} total models available")
else:
print(f"❌ {tool_name}: No response")
except Exception as e:
print(f"❌ {tool_name}: {e}")
# Test filesystem tools
print("\n📁 Testing filesystem access...")
try:
dirs_result = await client.call_tool("list_allowed_directories", {})
if dirs_result:
print("✅ Filesystem access configured")
except Exception as e:
print(f"❌ Filesystem access: {e}")
# Test Python execution
print("\n🐍 Testing Python sandbox...")
try:
python_result = await client.call_tool("execute_python", {
"code": "import sys; print(f'Python {sys.version_info.major}.{sys.version_info.minor}')"
})
if python_result:
result_data = json.loads(python_result[0].text)
if result_data.get('success'):
print("✅ Python sandbox working")
print(f" → {result_data.get('output', '').strip()}")
else:
print("❌ Python sandbox failed")
except Exception as e:
print(f"❌ Python sandbox: {e}")
except Exception as e:
print(f"❌ MCP interface failed: {e}")
async def test_rest_api():
"""Test the REST API endpoints."""
base_url = "http://127.0.0.1:8013"
print("\n🌐 Testing REST API Endpoints")
print("=" * 40)
async with aiohttp.ClientSession() as session:
# Test discovery endpoint
try:
async with session.get(f"{base_url}/") as response:
if response.status == 200:
data = await response.json()
print(f"✅ Discovery endpoint: {data.get('type')}")
print(f" → Transport: {data.get('transport')}")
print(f" → Endpoint: {data.get('endpoint')}")
else:
print(f"❌ Discovery endpoint: HTTP {response.status}")
except Exception as e:
print(f"❌ Discovery endpoint: {e}")
# Test health endpoint
try:
async with session.get(f"{base_url}/api/health") as response:
if response.status == 200:
data = await response.json()
print(f"✅ Health endpoint: {data.get('status')}")
else:
print(f"❌ Health endpoint: HTTP {response.status}")
except Exception as e:
print(f"❌ Health endpoint: {e}")
# Test OpenAPI docs
try:
async with session.get(f"{base_url}/api/docs") as response:
if response.status == 200:
print("✅ Swagger UI accessible")
else:
print(f"❌ Swagger UI: HTTP {response.status}")
except Exception as e:
print(f"❌ Swagger UI: {e}")
# Test cognitive states endpoint
try:
async with session.get(f"{base_url}/api/cognitive-states") as response:
if response.status == 200:
data = await response.json()
print(f"✅ Cognitive states: {data.get('total', 0)} states")
else:
print(f"❌ Cognitive states: HTTP {response.status}")
except Exception as e:
print(f"❌ Cognitive states: {e}")
# Test performance overview
try:
async with session.get(f"{base_url}/api/performance/overview") as response:
if response.status == 200:
data = await response.json()
overview = data.get('overview', {})
print(f"✅ Performance overview: {overview.get('total_actions', 0)} actions")
else:
print(f"❌ Performance overview: HTTP {response.status}")
except Exception as e:
print(f"❌ Performance overview: {e}")
# Test artifacts endpoint
try:
async with session.get(f"{base_url}/api/artifacts") as response:
if response.status == 200:
data = await response.json()
print(f"✅ Artifacts: {data.get('total', 0)} artifacts")
else:
print(f"❌ Artifacts: HTTP {response.status}")
except Exception as e:
print(f"❌ Artifacts: {e}")
async def test_tool_completions():
"""Test actual completions with available providers."""
server_url = "http://127.0.0.1:8013/mcp"
print("\n🤖 Testing LLM Completions")
print("=" * 40)
try:
async with Client(server_url) as client:
# Get available providers first
provider_result = await client.call_tool("get_provider_status", {})
provider_data = json.loads(provider_result[0].text)
available_providers = []
for name, status in provider_data.get('providers', {}).items():
if status.get('available') and status.get('models'):
available_providers.append((name, status['models'][0]))
if not available_providers:
print("❌ No providers available for testing")
return
# Test with first available provider
provider_name, model_info = available_providers[0]
model_id = model_info.get('id')
print(f"🧪 Testing with {provider_name} / {model_id}")
try:
result = await client.call_tool("generate_completion", {
"prompt": "Count from 1 to 5",
"provider": provider_name,
"model": model_id,
"max_tokens": 50
})
if result:
response_data = json.loads(result[0].text)
if response_data.get('success', True):
print("✅ Completion successful")
print(f" → Response: {response_data.get('text', '')[:100]}...")
if 'usage' in response_data:
usage = response_data['usage']
print(f" → Tokens: {usage.get('total_tokens', 'N/A')}")
else:
print(f"❌ Completion failed: {response_data.get('error')}")
else:
print("❌ No completion response")
except Exception as e:
print(f"❌ Completion error: {e}")
except Exception as e:
print(f"❌ Completion test failed: {e}")
async def test_memory_system():
"""Test the memory and cognitive state system."""
server_url = "http://127.0.0.1:8013/mcp"
print("\n🧠 Testing Memory System")
print("=" * 40)
try:
async with Client(server_url) as client:
# Test memory storage
try:
memory_result = await client.call_tool("store_memory", {
"memory_type": "test",
"content": "This is a test memory for the test client",
"importance": 7.5,
"tags": ["test", "client"]
})
if memory_result:
memory_data = json.loads(memory_result[0].text)
if memory_data.get('success'):
memory_id = memory_data.get('memory_id')
print(f"✅ Memory stored: {memory_id}")
# Test memory retrieval
try:
get_result = await client.call_tool("get_memory_by_id", {
"memory_id": memory_id
})
if get_result:
print("✅ Memory retrieved successfully")
except Exception as e:
print(f"❌ Memory retrieval: {e}")
else:
print(f"❌ Memory storage failed: {memory_data.get('error')}")
except Exception as e:
print(f"❌ Memory system: {e}")
# Test cognitive state
try:
state_result = await client.call_tool("save_cognitive_state", {
"state_type": "test_state",
"description": "Test cognitive state from client",
"data": {"test": True, "client": "test_client"}
})
if state_result:
state_data = json.loads(state_result[0].text)
if state_data.get('success'):
print("✅ Cognitive state saved")
else:
print(f"❌ Cognitive state failed: {state_data.get('error')}")
except Exception as e:
print(f"❌ Cognitive state: {e}")
except Exception as e:
print(f"❌ Memory system test failed: {e}")
async def main():
"""Run all comprehensive tests."""
print("🚀 Ultimate MCP Server Comprehensive Test Suite")
print("=" * 60)
# Test MCP interface
await test_mcp_interface()
# Test REST API
await test_rest_api()
# Test completions
await test_tool_completions()
# Test memory system
await test_memory_system()
print("\n🎯 Comprehensive testing completed!")
print("\nIf you see mostly ✅ symbols, your server is working correctly!")
print("Any ❌ symbols indicate areas that may need attention.")
if __name__ == "__main__":
asyncio.run(main())
```
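The test coroutines above repeat the same decode step: call a tool, then `json.loads(result[0].text)` on the first content block. A small helper can factor that pattern out. This is a minimal sketch; the `fastmcp` import is an assumption about where the script's `Client` class comes from, so match it to the import `comprehensive_test.py` actually uses.

```python
import json

from fastmcp import Client  # assumption: the script's Client appears to be FastMCP's HTTP client


async def call_tool_json(client: Client, tool_name: str, args: dict) -> dict:
    """Call an MCP tool and decode the first text content block as JSON."""
    result = await client.call_tool(tool_name, args)
    if not result:
        raise RuntimeError(f"Tool '{tool_name}' returned no content")
    return json.loads(result[0].text)


async def provider_snapshot(server_url: str = "http://127.0.0.1:8013/mcp") -> dict:
    """Fetch provider status the same way test_tool_completions does, via the helper."""
    async with Client(server_url) as client:
        return await call_tool_json(client, "get_provider_status", {})
```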
--------------------------------------------------------------------------------
/ultimate_mcp_server/constants.py:
--------------------------------------------------------------------------------
```python
"""
Global constants and enumerations for the Ultimate MCP Server.
This module defines system-wide constants, enumerations, and mappings used throughout
the Ultimate MCP Server codebase. Centralizing these values ensures consistency across
the application and simplifies maintenance when values need to be updated.
The module includes:
- Provider enum: Supported LLM providers (OpenAI, Anthropic, etc.)
- TaskType enum: Categories of tasks that can be performed with LLMs
- LogLevel enum: Standard logging levels
- COST_PER_MILLION_TOKENS: Cost estimates for different models
- DEFAULT_MODELS: Default model mappings for each provider
- EMOJI_MAP: Emoji icons for enhanced logging and visualization
These constants should be imported and used directly rather than duplicating their
values in other parts of the codebase. This approach ensures that when values need
to be updated (e.g., adding a new provider or updating pricing), changes only need
to be made in this central location.
Example usage:
```python
from ultimate_mcp_server.constants import Provider, TaskType, EMOJI_MAP
# Use provider enum
default_provider = Provider.OPENAI
# Get emoji for logging
success_emoji = EMOJI_MAP["success"] # ✅
# Check task type
if task_type == TaskType.COMPLETION:
# Handle completion task
```
"""
from enum import Enum
from typing import Dict
class Provider(str, Enum):
"""
Enumeration of supported LLM providers in the Ultimate MCP Server.
This enum defines the canonical names for each supported large language model
provider in the system. These identifiers are used consistently throughout the
codebase for:
- Configuration settings (provider-specific API keys, endpoints, etc.)
- Tool parameters (selecting which provider to use for a task)
- Logging and error reporting (identifying the source of requests/responses)
- Cost calculation and billing (provider-specific pricing models)
New providers should be added here as they are integrated into the system.
The string values should be lowercase and match the provider's canonical name
where possible, as these values appear in API requests/responses.
Usage:
```python
# Reference a provider by enum
default_provider = Provider.OPENAI
# Convert between string and enum
provider_name = "anthropic"
provider_enum = Provider(provider_name) # Provider.ANTHROPIC
# Check if a provider is supported
if user_provider in Provider.__members__.values():
use_provider(user_provider)
```
"""
OPENAI = "openai"
ANTHROPIC = "anthropic"
DEEPSEEK = "deepseek"
GEMINI = "gemini"
OPENROUTER = "openrouter"
OLLAMA = "ollama"
GROK = "grok"
MISTRAL = "mistral"
AWS = "aws"
AZURE = "azure"
class TaskType(str, Enum):
"""
Enumeration of task types that can be performed by LLMs in the system.
This enum categorizes the different types of operations that LLMs can perform
within the MCP ecosystem. These task types are used for:
- Logging and analytics (tracking usage patterns by task type)
- Prompt selection (optimizing prompts for specific task types)
- Resource allocation (prioritizing resources for different task types)
- Performance monitoring (measuring success rates by task category)
The categorization helps organize tools in a semantically meaningful way and
provides metadata for optimizing the system's handling of different tasks.
When tools register with the system, they typically specify which task type
they represent.
Task types are roughly organized into these categories:
- Text generation (COMPLETION, GENERATION, etc.)
- Analysis and understanding (ANALYSIS, CLASSIFICATION, etc.)
- Data manipulation (EXTRACTION, TRANSLATION, etc.)
- System interaction (DATABASE, BROWSER, etc.)
- Document operations (DOCUMENT_PROCESSING, etc.)
Usage:
```python
# Log with task type
logger.info("Generating text completion", task_type=TaskType.COMPLETION)
# Register tool with its task type
@register_tool(name="generate_text", task_type=TaskType.COMPLETION)
async def generate_text(prompt: str):
# Implementation
```
"""
COMPLETION = "completion"
CHAT = "chat"
SUMMARIZATION = "summarization"
EXTRACTION = "extraction"
GENERATION = "generation"
ANALYSIS = "analysis"
CLASSIFICATION = "classification"
TRANSLATION = "translation"
QA = "qa"
DATABASE = "database"
QUERY = "query"
BROWSER = "browser"
DOWNLOAD = "download"
UPLOAD = "upload"
DOCUMENT_PROCESSING = "document_processing"
DOCUMENT = "document"
TEXT_ENHANCEMENT = "text_enhancement"
NER = "ner"
QUESTION_ANSWERING = "question_answering"
QUALITY_ASSESSMENT = "quality_assessment"
OCR = "ocr"
TEXT_EXTRACTION = "text_extraction"
CODE_EXECUTION = "code_execution"
class LogLevel(str, Enum):
"""Log levels."""
DEBUG = "DEBUG"
INFO = "INFO"
WARNING = "WARNING"
ERROR = "ERROR"
CRITICAL = "CRITICAL"
# Cost estimates for model pricing (in dollars per million tokens)
# This constant defines the estimated costs for different models, used for cost tracking and budgeting
# Values represent US dollars per million tokens, differentiated by input (prompt) and output (completion) costs
# These costs may change as providers update their pricing, and should be periodically reviewed
COST_PER_MILLION_TOKENS: Dict[str, Dict[str, float]] = {
# OpenAI models
"gpt-4o": {"input": 2.5, "output": 10.0},
"gpt-4o-mini": {"input": 0.15, "output": 0.6},
"gpt-4.1": {"input": 2.0, "output": 8.0},
"gpt-4.1-mini": {"input": 0.40, "output": 1.60},
"gpt-4.1-nano": {"input": 0.10, "output": 0.40},
"o1-preview": {"input": 15.00, "output": 60.00},
"o3-mini": {"input": 1.10, "output": 4.40},
# Claude models
"claude-3-7-sonnet-20250219": {"input": 3.0, "output": 15.0},
"claude-3-5-haiku-20241022": {"input": 0.80, "output": 4.0},
"claude-3-opus-20240229": {"input": 15.0, "output": 75.0},
# DeepSeek models
"deepseek-chat": {"input": 0.27, "output": 1.10},
"deepseek-reasoner": {"input": 0.55, "output": 2.19},
# Gemini models
"gemini-2.0-flash-lite": {"input": 0.075, "output": 0.30},
"gemini-2.0-flash": {"input": 0.35, "output": 1.05},
"gemini-2.0-flash-thinking-exp-01-21": {"input": 0.0, "output": 0.0},
"gemini-2.5-pro-preview-03-25": {"input": 1.25, "output": 10.0},
# OpenRouter models
"mistralai/mistral-nemo": {"input": 0.035, "output": 0.08},
# Grok models (based on the provided documentation)
"grok-3-latest": {"input": 3.0, "output": 15.0},
"grok-3-fast-latest": {"input": 5.0, "output": 25.0},
"grok-3-mini-latest": {"input": 0.30, "output": 0.50},
"grok-3-mini-fast-latest": {"input": 0.60, "output": 4.0},
# Ollama models (very low estimated costs since they run locally)
"mix_77/gemma3-qat-tools:27b": {"input": 0.0001, "output": 0.0001},
"JollyLlama/GLM-Z1-32B-0414-Q4_K_M:latest": {"input": 0.0001, "output": 0.0001},
"llama3.2-vision:latest": {"input": 0.0001, "output": 0.0001},
}
# Default models by provider
# This mapping defines the recommended default model for each supported provider
# Used when no specific model is requested in API calls or tool invocations
# These defaults aim to balance quality, speed, and cost for general-purpose usage
DEFAULT_MODELS = {
Provider.OPENAI: "gpt-4.1-mini",
Provider.ANTHROPIC: "claude-3-5-haiku-20241022",
Provider.DEEPSEEK: "deepseek-chat",
Provider.GEMINI: "gemini-2.5-pro-preview-03-25",
Provider.OPENROUTER: "mistralai/mistral-nemo",
Provider.GROK: "grok-3-latest",
Provider.OLLAMA: "mix_77/gemma3-qat-tools:27b"
}
# Emoji mapping by log type and action
# Provides visual indicators for different log types, components, and actions
# Used in rich logging output to improve readability and visual scanning
# Organized into sections: general status, components, tasks, and providers
EMOJI_MAP = {
"start": "🚀",
"success": "✅",
"error": "❌",
"warning": "⚠️",
"info": "ℹ️",
"debug": "🔍",
"critical": "🔥",
# Component-specific emojis
"server": "🖥️",
"cache": "💾",
"provider": "🔌",
"request": "📤",
"response": "📥",
"processing": "⚙️",
"model": "🧠",
"config": "🔧",
"token": "🔢",
"cost": "💰",
"time": "⏱️",
"tool": "🛠️",
"tournament": "🏆",
"cancel": "🛑",
"database": "🗄️",
"browser": "🌐",
# Task-specific emojis
"completion": "✍️",
"chat": "💬",
"summarization": "📝",
"extraction": "🔍",
"generation": "🎨",
"analysis": "📊",
"classification": "🏷️",
"query": "🔍",
"browser_automation": "🌐",
"database_interactions": "🗄️",
"download": "⬇️",
"upload": "⬆️",
"document_processing": "📄",
"document": "📄",
"translation": "🔄",
"qa": "❓",
# Provider-specific emojis
Provider.OPENAI: "🟢",
Provider.ANTHROPIC: "🟣",
Provider.DEEPSEEK: "🟠",
Provider.GEMINI: "🔵",
Provider.OPENROUTER: "🌐",
Provider.OLLAMA: "🦙",
Provider.GROK: "⚡"
}
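# Note: because Provider subclasses str, the provider-keyed entries above can be looked up
# either by enum member or by plain string value, e.g. EMOJI_MAP[Provider.OPENAI] and
# EMOJI_MAP["openai"] both resolve to "🟢".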
# Base toolset categories for the server
BASE_TOOLSET_CATEGORIES = {
"Completion": ["generate_completion", "stream_completion", "chat_completion", "multi_completion"],
"Provider": ["get_provider_status", "list_models"],
"Filesystem": ["read_file", "write_file", "edit_file", "list_directory", "directory_tree", "search_files"],
"Optimization": ["estimate_cost", "compare_models", "recommend_model"],
"Text Processing": ["run_ripgrep", "run_awk", "run_sed", "run_jq"],
"Meta": ["get_tool_info", "get_llm_instructions", "get_tool_recommendations"],
"Search": ["marqo_fused_search"],
# Browser automation tools
"Browser": [
"browser_init", "browser_navigate", "browser_click", "browser_type",
"browser_screenshot", "browser_close", "browser_select", "browser_checkbox",
"browser_get_text", "browser_get_attributes", "browser_execute_javascript",
"browser_wait", "browser_back", "browser_forward", "browser_reload",
"browser_get_console_logs", "browser_download_file", "browser_upload_file",
"browser_pdf", "browser_tab_new", "browser_tab_close", "browser_tab_list",
"browser_tab_select"
],
"Web Research": [
"execute_web_workflow", "extract_structured_data_from_pages",
"find_and_download_pdfs", "multi_engine_search_summary",
"monitor_web_data_points", "research_and_synthesize_report"
],
# HTML to markdown tools
"HTML Processing": [
"clean_and_format_text_as_markdown", "detect_content_type",
"batch_format_texts", "optimize_markdown_formatting"
],
# Extraction tools
"Extraction": [
"extract_json", "extract_table", "extract_key_value_pairs",
"extract_semantic_schema"
],
# Cognitive and agent memory tools
"Cognitive and Agent Memory": [
"initialize_memory_system", "create_workflow", "update_workflow_status",
"record_action_start", "record_action_completion", "get_action_details",
"summarize_context_block", "add_action_dependency", "get_action_dependencies",
"record_artifact", "record_thought", "store_memory", "get_memory_by_id",
"hybrid_search_memories", "create_memory_link",
"query_memories", "list_workflows", "get_workflow_details", "get_recent_actions",
"get_artifacts", "get_artifact_by_id", "create_thought_chain", "get_thought_chain",
"get_working_memory", "focus_memory", "optimize_working_memory",
"save_cognitive_state", "load_cognitive_state", "get_workflow_context",
"auto_update_focus", "promote_memory_level", "update_memory", "get_linked_memories",
"consolidate_memories", "generate_reflection", "summarize_text",
"delete_expired_memories", "compute_memory_statistics"
],
}
```
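Since `COST_PER_MILLION_TOKENS` is keyed per million tokens and split into input and output rates, turning token counts into a dollar estimate is a two-term product. A minimal sketch follows; the helper name `rough_request_cost` is illustrative and is not the server's registered `estimate_cost` tool.

```python
from ultimate_mcp_server.constants import COST_PER_MILLION_TOKENS, DEFAULT_MODELS, Provider


def rough_request_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Estimate the dollar cost of one request from the pricing table above."""
    pricing = COST_PER_MILLION_TOKENS.get(model)
    if pricing is None:
        raise ValueError(f"No pricing entry for model '{model}'")
    return (input_tokens / 1_000_000) * pricing["input"] + (output_tokens / 1_000_000) * pricing["output"]


# Example: the default Anthropic model at 2,000 input / 500 output tokens
model = DEFAULT_MODELS[Provider.ANTHROPIC]  # "claude-3-5-haiku-20241022"
print(f"${rough_request_cost(model, 2_000, 500):.6f}")  # (0.002 * 0.80) + (0.0005 * 4.0) = $0.003600
```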
--------------------------------------------------------------------------------
/tests/unit/test_tools.py:
--------------------------------------------------------------------------------
```python
"""Tests for the tool implementations."""
from typing import Any, Dict
import pytest
from pytest import MonkeyPatch
from ultimate_mcp_server.core.server import Gateway
from ultimate_mcp_server.tools.base import (
BaseTool,
register_tool,
with_retry,
with_tool_metrics,
)
# Remove the CompletionTools import as the class was deleted
# from ultimate_mcp_server.tools.completion import CompletionTools
from ultimate_mcp_server.tools.document import DocumentTools
from ultimate_mcp_server.tools.extraction import ExtractionTools
from ultimate_mcp_server.utils import get_logger
logger = get_logger("test.tools")
class TestBaseTools:
"""Tests for the base tool classes and decorators."""
def test_base_tool_init(self, mock_gateway: Gateway):
"""Test base tool initialization."""
logger.info("Testing base tool initialization", emoji_key="test")
# Create a minimal tool class
class TestTool(BaseTool):
tool_name = "test-tool"
description = "Test tool"
def _register_tools(self):
# No tools to register
pass
# Initialize
tool = TestTool(mock_gateway)
# Check properties
assert tool.tool_name == "test-tool"
assert tool.description == "Test tool"
assert tool.mcp == mock_gateway.mcp
assert tool.logger is not None
assert tool.metrics is not None
@pytest.mark.asyncio
async def test_with_tool_metrics(self):
"""Test the with_tool_metrics decorator."""
logger.info("Testing with_tool_metrics decorator", emoji_key="test")
# Create a tool class with metrics
class TestTool(BaseTool):
tool_name = "test-metrics-tool"
description = "Test metrics tool"
def _register_tools(self):
pass
@with_tool_metrics
async def test_method(self, arg1, arg2=None, ctx=None):
return {"result": arg1 + str(arg2 or "")}
# Create a mock MCP server
mock_mcp = type("MockMCP", (), {"tool": lambda: lambda x: x})
mock_gateway = type("MockGateway", (), {"mcp": mock_mcp})
# Initialize
tool = TestTool(mock_gateway)
# Call method
result = await tool.test_method("test", "arg")
# Check result
assert result == {"result": "testarg"}
# Check metrics
assert tool.metrics.total_calls == 1
assert tool.metrics.successful_calls == 1
assert tool.metrics.failed_calls == 0
# Test error case
@with_tool_metrics
async def failing_method(self, arg):
raise ValueError("Test error")
# Add to class
TestTool.failing_method = failing_method
# Call failing method
with pytest.raises(ValueError):
await tool.failing_method("test")
# Check metrics
assert tool.metrics.total_calls == 2
assert tool.metrics.successful_calls == 1
assert tool.metrics.failed_calls == 1
@pytest.mark.asyncio
async def test_with_retry(self):
"""Test the with_retry decorator."""
logger.info("Testing with_retry decorator", emoji_key="test")
# Track calls
calls = []
@with_retry(max_retries=2, retry_delay=0.1)
async def flaky_function(succeed_after):
calls.append(len(calls))
if len(calls) < succeed_after:
raise ValueError("Temporary error")
return "success"
# Should succeed on first try
calls = []
result = await flaky_function(1)
assert result == "success"
assert len(calls) == 1
# Should fail first, succeed on retry
calls = []
result = await flaky_function(2)
assert result == "success"
assert len(calls) == 2
# Should fail first two, succeed on second retry
calls = []
result = await flaky_function(3)
assert result == "success"
assert len(calls) == 3
# Should fail too many times
calls = []
with pytest.raises(ValueError):
await flaky_function(4) # Will make 3 attempts (original + 2 retries)
assert len(calls) == 3
def test_register_tool(self, mock_gateway: Gateway):
"""Test the register_tool decorator."""
logger.info("Testing register_tool decorator", emoji_key="test")
# Create a mock MCP server with a tool registration function
registered_tools = {}
class MockMCP:
def tool(self, name=None, description=None):
def decorator(f):
registered_tools[name or f.__name__] = {
"function": f,
"description": description or f.__doc__
}
return f
return decorator
mock_mcp = MockMCP()
mock_gateway.mcp = mock_mcp
# Register a tool
@register_tool(mock_gateway.mcp, name="test-tool", description="Test tool")
async def test_tool(arg1, arg2=None):
"""Tool docstring."""
return {"result": arg1 + str(arg2 or "")}
# Check registration
assert "test-tool" in registered_tools
assert registered_tools["test-tool"]["description"] == "Test tool"
# Register with defaults
@register_tool(mock_gateway.mcp)
async def another_tool(arg):
"""Another tool docstring."""
return {"result": arg}
# Check registration with defaults
assert "another_tool" in registered_tools
assert registered_tools["another_tool"]["description"] == "Another tool docstring."
# Comment out the entire TestCompletionTools class as it relies on the deleted class structure
# class TestCompletionTools:
# """Tests for the completion tools."""
#
# @pytest.fixture
# def mock_completion_tools(self, mock_gateway: Gateway) -> CompletionTools:
# """Get mock completion tools."""
# # This fixture is no longer valid as CompletionTools doesn't exist
# # We would need to refactor tests to mock standalone functions
# pass
# # return CompletionTools(mock_gateway)
#
# def test_init(self, mock_completion_tools: CompletionTools):
# """Test initialization."""
# logger.info("Testing completion tools initialization", emoji_key="test")
# # This test is no longer valid
# # assert mock_completion_tools.tool_name == "completion"
# # assert mock_completion_tools.description is not None
# pass
#
# async def test_generate_completion(self, mock_completion_tools: CompletionTools, mock_gateway: Gateway, monkeypatch: MonkeyPatch):
# """Test generate_completion tool."""
# logger.info("Testing generate_completion tool", emoji_key="test")
#
# # Mocking needs to target the standalone function now, not a method
# # This test needs complete refactoring
# pass
#
# async def test_chat_completion(self, mock_completion_tools: CompletionTools, mock_gateway: Gateway, monkeypatch: MonkeyPatch):
# """Test chat_completion tool."""
# logger.info("Testing chat_completion tool", emoji_key="test")
# # This test needs complete refactoring
# pass
#
# async def test_stream_completion(self, mock_completion_tools: CompletionTools, mock_gateway: Gateway, monkeypatch: MonkeyPatch):
# """Test stream_completion tool."""
# logger.info("Testing stream_completion tool", emoji_key="test")
# # This test needs complete refactoring
# pass
#
# async def test_multi_completion(self, mock_completion_tools: CompletionTools, mock_gateway: Gateway, monkeypatch: MonkeyPatch):
# """Test multi_completion tool."""
# logger.info("Testing multi_completion tool", emoji_key="test")
# # This test needs complete refactoring
# pass
class TestDocumentTools:
"""Tests for the document tools."""
@pytest.fixture
def mock_document_tools(self, mock_gateway: Gateway) -> DocumentTools:
"""Get mock document tools."""
return DocumentTools(mock_gateway)
def test_init(self, mock_document_tools: DocumentTools):
"""Test initialization."""
logger.info("Testing document tools initialization", emoji_key="test")
assert mock_document_tools.tool_name is not None
assert mock_document_tools.description is not None
async def test_chunk_document(self, mock_document_tools: DocumentTools, sample_document: str, monkeypatch: MonkeyPatch):
"""Test chunk_document tool."""
logger.info("Testing chunk_document tool", emoji_key="test")
# Create a simplified implementation for testing
async def mock_chunk_document(document, chunk_size=1000, chunk_overlap=100, method="token", ctx=None):
chunks = []
# Simple paragraph chunking for testing
for para in document.split("\n\n"):
if para.strip():
chunks.append(para.strip())
return {
"chunks": chunks,
"chunk_count": len(chunks),
"method": method,
"processing_time": 0.1
}
# Create a mock execute function for our BaseTool
async def mock_execute(tool_name, params):
# Call our mock implementation
return await mock_chunk_document(**params)
# Monkeypatch the tool execution using our new execute method
monkeypatch.setattr(mock_document_tools, "execute", mock_execute)
# Call the tool
result = await mock_document_tools.execute("chunk_document", {
"document": sample_document,
"method": "paragraph"
})
# Check result
assert isinstance(result, dict)
assert "chunks" in result
assert isinstance(result["chunks"], list)
assert result["chunk_count"] > 0
assert result["method"] == "paragraph"
assert result["processing_time"] > 0
class TestExtractionTools:
"""Tests for the extraction tools."""
@pytest.fixture
def mock_extraction_tools(self, mock_gateway: Gateway) -> ExtractionTools:
"""Get mock extraction tools."""
return ExtractionTools(mock_gateway)
def test_init(self, mock_extraction_tools: ExtractionTools):
"""Test initialization."""
logger.info("Testing extraction tools initialization", emoji_key="test")
assert mock_extraction_tools.tool_name == "extraction"
assert mock_extraction_tools.description is not None
async def test_extract_json(self, mock_extraction_tools: ExtractionTools, sample_json_data: Dict[str, Any], monkeypatch: MonkeyPatch):
"""Test extract_json tool."""
logger.info("Testing extract_json tool", emoji_key="test")
# Mock the tool execution
async def mock_extract_json(text, schema=None, provider="openai", model=None, max_attempts=3, ctx=None):
return {
"data": sample_json_data,
"provider": provider,
"model": model or "mock-model",
"tokens": {
"input": 50,
"output": 30,
"total": 80
},
"cost": 0.01,
"processing_time": 0.2
}
# Create a mock execute function for our BaseTool
async def mock_execute(tool_name, params):
# Call our mock implementation
return await mock_extract_json(**params)
# Monkeypatch the tool execution using our new execute method
monkeypatch.setattr(mock_extraction_tools, "execute", mock_execute)
# Call the tool
result = await mock_extraction_tools.execute("extract_json", {
"text": "Extract JSON from this: " + str(sample_json_data),
"provider": "mock",
"model": "mock-model"
})
# Check result
assert isinstance(result, dict)
assert "data" in result
assert result["data"] == sample_json_data
assert result["provider"] == "mock"
assert result["model"] == "mock-model"
assert "tokens" in result
assert "cost" in result
assert "processing_time" in result
```
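The retry test above pins down the decorator's contract: `max_retries=2` allows at most three attempts, with `retry_delay` seconds between them. The sketch below applies the same decorator to an ordinary coroutine that can fail transiently, assuming `with_retry` retries on any raised exception the way it does for the `ValueError` in the test; the `aiohttp` fetch is illustrative and not part of the test suite.

```python
import aiohttp

from ultimate_mcp_server.tools.base import with_retry


@with_retry(max_retries=2, retry_delay=0.5)
async def fetch_text(url: str) -> str:
    """Network errors raised here are retried up to two more times, mirroring flaky_function above."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            response.raise_for_status()
            return await response.text()
```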
--------------------------------------------------------------------------------
/examples/text_redline_demo.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
"""Comprehensive HTML‑redline demo that exercises **every** change type.
Run this file after you have installed/linked the Ultimate‑MCP‑Server package
in editable mode (``pip install -e .``) or added the repo root to ``PYTHONPATH``.
It generates a single HTML file (``./redline_outputs/comprehensive_redline_demo.html``)
that you can open in any browser to see insertions (blue), deletions (red),
move‑targets/sources (green), attribute changes (orange) and inline word‑level
diffs.
"""
from __future__ import annotations
import asyncio
import sys
from pathlib import Path
from typing import Dict, List
from rich import box
from rich.console import Console
from rich.markup import escape
from rich.table import Table
# ---------------------------------------------------------------------------
# 1. Make sure we can import the Ultimate‑MCP‑Server package from source.
# ---------------------------------------------------------------------------
ROOT = Path(__file__).resolve().parents[1] # repo root (../..)
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
# ---------------------------------------------------------------------------
# 2. Project‑level imports (raise immediately if the dev env is broken)
# ---------------------------------------------------------------------------
from ultimate_mcp_server.exceptions import ToolError # noqa: E402
from ultimate_mcp_server.tools.filesystem import write_file # noqa: E402
from ultimate_mcp_server.tools.text_redline_tools import ( # noqa: E402
create_html_redline, # noqa: E402
)
from ultimate_mcp_server.utils import get_logger # noqa: E402
# ---------------------------------------------------------------------------
# 3. Logger / console helpers
# ---------------------------------------------------------------------------
LOGGER = get_logger("demo.comprehensive_redline")
CONSOLE = Console()
# ---------------------------------------------------------------------------
# 4. Demo input documents (original vs. modified)
# ---------------------------------------------------------------------------
ORIGINAL_HTML = """<!DOCTYPE html>
<html>
<head>
<title>Comprehensive Demo Document</title>
<meta name="description" content="A document to demonstrate redlining features">
</head>
<body>
<h1>Project Documentation</h1>
<div class="intro">
<p>This project documentation covers all aspects of the Alpha system implementation.</p>
<p>Last updated on January 15, 2025</p>
</div>
<h2>Executive Summary</h2>
<p>The Alpha system provides robust data processing capabilities for enterprise applications.</p>
<p>This documentation serves as the primary reference for developers and system architects.</p>
<h2>Architecture Overview</h2>
<p>The system follows a microservices architecture with the following components:</p>
<ul>
<li>Data ingestion layer</li>
<li>Processing engine</li>
<li>Storage layer</li>
<li>API gateway</li>
</ul>
<h2>Implementation Details</h2>
<p>Implementation follows the standard protocol described in section 5.2 of the technical specifications.</p>
<p>All components must pass integration tests before deployment.</p>
<h2>Deployment Process</h2>
<p>Deployment occurs in three phases:</p>
<ol>
<li>Development environment validation</li>
<li>Staging environment testing</li>
<li>Production rollout</li>
</ol>
<p>Each phase requires approval from the technical lead.</p>
<h2>Security Considerations</h2>
<p>All data must be encrypted during transfer and at rest.</p>
<p>Authentication uses OAuth 2.0 with JWT tokens.</p>
<p>Regular security audits are conducted quarterly.</p>
<table border="1">
<tr>
<th>Component</th>
<th>Responsible Team</th>
<th>Status</th>
</tr>
<tr>
<td>Data ingestion</td>
<td>Data Engineering</td>
<td>Complete</td>
</tr>
<tr>
<td>Processing engine</td>
<td>Core Systems</td>
<td>In progress</td>
</tr>
<tr>
<td>Storage layer</td>
<td>Infrastructure</td>
<td>Complete</td>
</tr>
<tr>
<td>API gateway</td>
<td>API Team</td>
<td>Planning</td>
</tr>
</table>
<h2>Appendix</h2>
<p>For additional information, refer to the technical specifications document.</p>
<p>Contact <a href="mailto:[email protected]">[email protected]</a> with any questions.</p>
</body>
</html>"""
MODIFIED_HTML = """<!DOCTYPE html>
<html>
<head>
<title>Comprehensive Demo Document - 2025 Update</title>
<meta name="description" content="A document to demonstrate all redlining features">
<meta name="author" content="Documentation Team">
</head>
<body>
<h1>Project Documentation</h1>
<div class="intro">
<p>This project documentation covers all aspects of the Alpha system implementation and integration.</p>
<p>Last updated on May 5, 2025</p>
</div>
<h2>Appendix</h2>
<p>For additional information, refer to the technical specifications document and API references.</p>
<p>Contact <a href="mailto:[email protected]">[email protected]</a> with any questions.</p>
<h2>Security Considerations</h2>
<p>All data must be encrypted during transfer and at rest using AES-256 encryption.</p>
<p>Authentication uses OAuth 2.0 with JWT tokens and optional two-factor authentication.</p>
<p>Regular security audits are conducted quarterly by an independent security firm.</p>
<p>Penetration testing is performed bi-annually.</p>
<h2>Executive Summary</h2>
<p>The Alpha system provides robust data processing capabilities for enterprise applications with enhanced performance.</p>
<p>This documentation serves as the primary reference for developers, system architects, and operations teams.</p>
<p>The system has been validated against ISO 27001 standards.</p>
<h2>Architecture Overview</h2>
<p>The system implements a cloud-native microservices architecture with the following components:</p>
<ul>
<li>Data ingestion layer with real-time processing</li>
<li>Distributed processing engine</li>
<li>Multi-region storage layer</li>
<li>API gateway with rate limiting</li>
<li>Monitoring and observability platform</li>
<li>Disaster recovery system</li>
</ul>
<h2>Implementation Details</h2>
<p>Implementation follows the enhanced protocol described in section 6.3 of the technical specifications.</p>
<p>All components must pass integration and performance tests before deployment.</p>
<table border="1">
<tr>
<th>Component</th>
<th>Responsible Team</th>
<th>Status</th>
<th>Performance</th>
</tr>
<tr>
<td>Data ingestion</td>
<td>Data Engineering</td>
<td>Complete</td>
<td>Exceeds SLA</td>
</tr>
<tr>
<td>Processing engine</td>
<td>Core Systems</td>
<td>Complete</td>
<td>Meets SLA</td>
</tr>
<tr>
<td>Storage layer</td>
<td>Infrastructure</td>
<td>Complete</td>
<td>Meets SLA</td>
</tr>
<tr>
<td>API gateway</td>
<td>API Team</td>
<td>Complete</td>
<td>Exceeds SLA</td>
</tr>
<tr>
<td>Monitoring platform</td>
<td>DevOps</td>
<td>Complete</td>
<td>Meets SLA</td>
</tr>
</table>
<h2>Scalability Considerations</h2>
<p>The system is designed to scale horizontally with increasing load.</p>
<p>Auto-scaling policies are configured for all compute resources.</p>
<p>Database sharding is implemented for high-volume tenants.</p>
</body>
</html>"""
# ---------------------------------------------------------------------------
# 5. Human‑readable change checklist (for demo output only)
# ---------------------------------------------------------------------------
CHANGE_SUMMARY: Dict[str, List[str]] = {
"insertions": [
"New <meta author> tag",
"'and integration' added to intro paragraph",
"AES‑256 wording added to encryption para",
"Two‑factor authentication mention added",
"Independent security firm phrase added",
"Entire penetration‑testing paragraph added",
"'with enhanced performance' in exec summary",
"Audience now includes operations teams",
"ISO‑27001 paragraph added",
"'cloud‑native' adjective added",
"Real‑time processing detail added",
"'Distributed' processing engine detail",
"Multi‑region storage detail",
"Rate‑limiting mention in API gateway",
"Two new architecture components",
"Protocol reference bumped to 6.3",
"Performance tests requirement added",
"New PERFORMANCE column in table",
"New Monitoring‑platform row",
"Whole SCALABILITY section added",
],
"deletions": [
"API‑gateway status 'Planning' removed",
"Deployment‑process section removed",
],
"moves": [
"Appendix moved before Security section",
"Security section moved before Exec‑Summary",
],
"updates": [
"<title> suffixed with '2025 Update'",
"Meta description tweaked",
"Updated date to 5 May 2025",
"Support e‑mail address changed",
"Processing‑engine status updated",
],
}
# ---------------------------------------------------------------------------
# 6. Async helper running the diff + reporting
# ---------------------------------------------------------------------------
OUTPUT_DIR = Path(__file__).with_suffix("").parent / "redline_outputs"
MARKDOWN_PATH = OUTPUT_DIR / "detected_redline_differences.md"
async def generate_redline() -> None:
CONSOLE.print("\n[bold blue]Generating HTML redline…[/bold blue]")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
try:
result = await create_html_redline(
original_html=ORIGINAL_HTML,
modified_html=MODIFIED_HTML,
detect_moves=True,
include_css=True,
add_navigation=True,
output_format="html",
generate_markdown=True,
markdown_path=str(MARKDOWN_PATH),
)
except Exception as exc: # demo only
LOGGER.error("Failed to generate redline", exc_info=True)
CONSOLE.print(f"[red bold]Error:[/red bold] {escape(str(exc))}")
return
# ── Rich stats table ────────────────────────────────────────────────────────────
stats_tbl = Table(title="Redline statistics", box=box.ROUNDED)
stats_tbl.add_column("Metric", style="cyan")
stats_tbl.add_column("Value", style="magenta")
for k, v in result["stats"].items():
stats_tbl.add_row(k.replace("_", " ").title(), str(v))
stats_tbl.add_row("Processing time", f"{result['processing_time']:.3f}s")
CONSOLE.print(stats_tbl)
# ── manual checklist ────────────────────────────────────────────────────────────
CONSOLE.print("\n[bold green]Manual checklist of expected changes[/bold green]")
for cat, items in CHANGE_SUMMARY.items():
CONSOLE.print(f"[cyan]{cat.title()}[/cyan] ({len(items)})")
for idx, txt in enumerate(items, 1):
CONSOLE.print(f" {idx:>2}. {txt}")
# ── write HTML diff ─────────────────────────────────────────────────────────────
html_path = OUTPUT_DIR / "comprehensive_redline_demo.html"
try:
await write_file(path=str(html_path), content=result["redline_html"])
except (ToolError, Exception) as exc: # demo only
LOGGER.warning("Unable to save HTML", exc_info=True)
CONSOLE.print(f"\n[bold red]Warning:[/bold red] Could not save HTML — {exc}")
else:
LOGGER.info("Saved redline to %s", html_path)
CONSOLE.print(f"\n[green]HTML written to:[/green] {html_path}")
# ── ensure Markdown file exists (tool usually writes it already) ────────────────
if not MARKDOWN_PATH.is_file() and "markdown_summary" in result:
MARKDOWN_PATH.write_text(result["markdown_summary"], encoding="utf-8")
if MARKDOWN_PATH.is_file():
CONSOLE.print(f"[green]Markdown summary:[/green] {MARKDOWN_PATH}")
# ───────────────────────────── 7. entrypoint ────────────────────────────────────────
async def _amain() -> int:
CONSOLE.rule("[white on blue]📝 Comprehensive Text-Redline Demo 📝")
await generate_redline()
CONSOLE.rule("Complete", style="green")
return 0
if __name__ == "__main__":
sys.exit(asyncio.run(_amain()))
```
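For a quick smoke test without the full demo documents, the same tool can be driven with two tiny fragments. This is a minimal sketch that reuses only the parameters the demo above already passes; the `redline_html` result key is taken from the demo's own result handling.

```python
import asyncio

from ultimate_mcp_server.tools.text_redline_tools import create_html_redline


async def tiny_redline() -> str:
    result = await create_html_redline(
        original_html="<p>The quick brown fox.</p>",
        modified_html="<p>The quick red fox jumps.</p>",
        detect_moves=True,
        include_css=True,
        add_navigation=False,
        output_format="html",
    )
    return result["redline_html"]


if __name__ == "__main__":
    print(asyncio.run(tiny_redline()))
```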
--------------------------------------------------------------------------------
/ultimate_mcp_server/services/knowledge_base/manager.py:
--------------------------------------------------------------------------------
```python
"""Knowledge base manager for RAG functionality."""
import time
from typing import Any, Dict, List, Optional
from ultimate_mcp_server.services.vector import VectorDatabaseService
from ultimate_mcp_server.utils import get_logger
logger = get_logger(__name__)
class KnowledgeBaseManager:
"""
Manager for creating and maintaining knowledge bases for RAG applications.
The KnowledgeBaseManager provides a high-level interface for working with vector
databases as knowledge bases for Retrieval-Augmented Generation (RAG) systems.
It abstracts the complexities of vector database operations, focusing on the
domain-specific needs of knowledge management for AI applications.
Key Features:
- Knowledge base lifecycle management (create, delete, list, get)
- Document ingestion with metadata support
- Vector embedding management for semantic search
- Document chunking and processing
- Persistence and durability guarantees
- Metadata tracking for knowledge base statistics
Architecture:
The manager sits between RAG applications and the underlying vector database,
providing domain-specific operations while delegating storage and embedding
to specialized services. It primarily interacts with:
1. Vector Database Service - for persistent storage of embeddings and documents
2. Embedding Service - for converting text to vector representations
3. Text Chunking Service - for breaking documents into optimal retrieval units
Technical Characteristics:
- Asynchronous API for high throughput in server environments
- Thread-safe operations for concurrent access
- Consistent error handling and logging
- Idempotent operations where possible
- Transactional guarantees for critical operations
This service is typically accessed through the singleton get_knowledge_base_manager()
function, which ensures a single instance is shared across the application.
Example Usage:
```python
# Get the manager
kb_manager = get_knowledge_base_manager()
# Create a new knowledge base
await kb_manager.create_knowledge_base(
name="company_policies",
description="Corporate policy documents and guidelines"
)
# Add documents with metadata
await kb_manager.add_documents(
knowledge_base_name="company_policies",
documents=[
"All employees must complete annual security training.",
"Remote work is available for eligible positions with manager approval."
],
metadatas=[
{"source": "security_policy.pdf", "category": "security", "page": 12},
{"source": "hr_handbook.pdf", "category": "remote_work", "page": 45}
],
chunk_size=500,
chunk_method="semantic"
)
# List available knowledge bases
kb_list = await kb_manager.list_knowledge_bases()
print(f"Found {kb_list['count']} knowledge bases")
# Get details about a specific knowledge base
kb_info = await kb_manager.get_knowledge_base("company_policies")
doc_count = kb_info.get("metadata", {}).get("doc_count", 0)
print(f"Knowledge base contains {doc_count} document chunks")
```
"""
def __init__(self, vector_service: VectorDatabaseService):
"""Initialize the knowledge base manager.
Args:
vector_service: Vector database service for storing embeddings
"""
self.vector_service = vector_service
logger.info("Knowledge base manager initialized", extra={"emoji_key": "success"})
async def create_knowledge_base(
self,
name: str,
description: Optional[str] = None,
embedding_model: Optional[str] = None,
overwrite: bool = False
) -> Dict[str, Any]:
"""Create a new knowledge base.
Args:
name: Knowledge base name
description: Optional description
embedding_model: Optional embedding model name
overwrite: Whether to overwrite existing knowledge base
Returns:
Knowledge base metadata
"""
# Check if knowledge base already exists
collections = await self.vector_service.list_collections()
if name in collections and not overwrite:
logger.warning(
f"Knowledge base '{name}' already exists",
extra={"emoji_key": "warning"}
)
return {"status": "exists", "name": name}
# Create new collection for knowledge base
metadata = {
"type": "knowledge_base",
"description": description or "",
"created_at": time.time(),
"doc_count": 0
}
# Only add embedding_model if not None (to avoid ChromaDB errors)
if embedding_model is not None:
metadata["embedding_model"] = embedding_model
logger.debug(f"Creating knowledge base with metadata: {metadata}")
# Ensure any existing collection is deleted first
if overwrite:
try:
# Force delete any existing collection
await self.vector_service.delete_collection(name)
logger.debug(f"Force deleted existing collection '{name}' for clean creation")
# Add a small delay to ensure deletion completes
import asyncio
await asyncio.sleep(0.2)
except Exception as e:
logger.debug(f"Error during force deletion: {str(e)}")
try:
await self.vector_service.create_collection(name, metadata=metadata)
logger.info(
f"Created knowledge base '{name}'",
extra={"emoji_key": "success"}
)
return {
"status": "created",
"name": name,
"metadata": metadata
}
except Exception as e:
logger.error(
f"Failed to create knowledge base '{name}': {str(e)}",
extra={"emoji_key": "error"}
)
raise ValueError(f"Failed to create knowledge base: {str(e)}") from e
async def delete_knowledge_base(self, name: str) -> Dict[str, Any]:
"""Delete a knowledge base.
Args:
name: Knowledge base name
Returns:
Deletion status
"""
# Check if knowledge base exists
collections = await self.vector_service.list_collections()
if name not in collections:
logger.warning(
f"Knowledge base '{name}' not found",
extra={"emoji_key": "warning"}
)
return {"status": "not_found", "name": name}
# Delete collection
await self.vector_service.delete_collection(name)
logger.info(
f"Deleted knowledge base '{name}'",
extra={"emoji_key": "success"}
)
return {
"status": "deleted",
"name": name
}
async def list_knowledge_bases(self):
"""List all knowledge bases.
Returns:
List of knowledge bases with metadata
"""
collection_names = await self.vector_service.list_collections()
kb_list = []
for name in collection_names:
try:
metadata = await self.vector_service.get_collection_metadata(name)
# Only include collections that are knowledge bases
if metadata and metadata.get("type") == "knowledge_base":
# Create a simple dict with name and metadata
kb = {
"name": name,
"metadata": metadata
}
kb_list.append(kb)
except Exception as e:
logger.error(
f"Error getting metadata for collection '{name}': {str(e)}",
extra={"emoji_key": "error"}
)
return {
"count": len(kb_list),
"knowledge_bases": kb_list
}
async def get_knowledge_base(self, name: str) -> Dict[str, Any]:
"""Get knowledge base metadata.
Args:
name: Knowledge base name
Returns:
Knowledge base metadata
"""
# Check if knowledge base exists
collections = await self.vector_service.list_collections()
if name not in collections:
logger.warning(
f"Knowledge base '{name}' not found",
extra={"emoji_key": "warning"}
)
return {"status": "not_found", "name": name}
# Get metadata
metadata = await self.vector_service.get_collection_metadata(name)
if metadata.get("type") != "knowledge_base":
logger.warning(
f"Collection '{name}' is not a knowledge base",
extra={"emoji_key": "warning"}
)
return {"status": "not_knowledge_base", "name": name}
return {
"status": "found",
"name": name,
"metadata": metadata
}
async def add_documents(
self,
knowledge_base_name: str,
documents: List[str],
metadatas: Optional[List[Dict[str, Any]]] = None,
ids: Optional[List[str]] = None,
embedding_model: Optional[str] = None,
chunk_size: int = 1000,
chunk_overlap: int = 200,
chunk_method: str = "semantic"
) -> Dict[str, Any]:
"""Add documents to a knowledge base.
Args:
knowledge_base_name: Knowledge base name
documents: List of document texts
metadatas: Optional list of document metadata
ids: Optional list of document IDs
embedding_model: Optional embedding model name
chunk_size: Chunk size for document processing
chunk_overlap: Chunk overlap for document processing
chunk_method: Chunking method (token, semantic, etc.)
Returns:
Document addition status
"""
logger.debug(f"DEBUG: Adding documents to knowledge base '{knowledge_base_name}'")
logger.debug(f"DEBUG: Document count: {len(documents)}")
logger.debug(f"DEBUG: First document sample: {documents[0][:100]}...")
logger.debug(f"DEBUG: Metadatas: {metadatas[:2] if metadatas else None}")
logger.debug(f"DEBUG: Chunk settings - size: {chunk_size}, overlap: {chunk_overlap}, method: {chunk_method}")
# Check if knowledge base exists
kb_info = await self.get_knowledge_base(knowledge_base_name)
if kb_info["status"] != "found":
logger.warning(
f"Knowledge base '{knowledge_base_name}' not found",
extra={"emoji_key": "warning"}
)
return {"status": "not_found", "name": knowledge_base_name}
try:
# Add documents to vector store
doc_ids = await self.vector_service.add_texts(
collection_name=knowledge_base_name,
texts=documents,
metadatas=metadatas,
ids=ids,
embedding_model=embedding_model
)
# Update document count in metadata
current_metadata = await self.vector_service.get_collection_metadata(knowledge_base_name)
doc_count = current_metadata.get("doc_count", 0) + len(documents)
# Prepare metadata updates
metadata_updates = {"doc_count": doc_count}
# Store embedding model in metadata if provided (for consistent retrieval)
if embedding_model:
metadata_updates["embedding_model"] = embedding_model
# Update metadata
await self.vector_service.update_collection_metadata(
name=knowledge_base_name,
metadata=metadata_updates
)
logger.info(
f"Added {len(documents)} documents to knowledge base '{knowledge_base_name}'",
extra={"emoji_key": "success"}
)
return {
"status": "success",
"name": knowledge_base_name,
"added_count": len(documents),
"ids": doc_ids
}
except Exception as e:
logger.error(
f"Error adding documents to knowledge base '{knowledge_base_name}': {str(e)}",
extra={"emoji_key": "error"}
)
raise
```
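The class docstring above already walks through the main calls; the sketch below strings them together into one coroutine. The `get_knowledge_base_manager` import path is an assumption (the docstring names the accessor but this file does not show where it is defined), so adjust it to the actual location.

```python
import asyncio

# Assumption: the singleton accessor mentioned in the docstring is exposed by the
# knowledge_base service package; adjust the import to wherever it actually lives.
from ultimate_mcp_server.services.knowledge_base import get_knowledge_base_manager


async def bootstrap_policies_kb() -> None:
    kb_manager = get_knowledge_base_manager()

    await kb_manager.create_knowledge_base(
        name="company_policies",
        description="Corporate policy documents and guidelines",
        overwrite=True,
    )

    result = await kb_manager.add_documents(
        knowledge_base_name="company_policies",
        documents=["All employees must complete annual security training."],
        metadatas=[{"source": "security_policy.pdf", "category": "security"}],
        chunk_size=500,
        chunk_method="semantic",
    )
    print(result["status"], result.get("added_count"))


if __name__ == "__main__":
    asyncio.run(bootstrap_policies_kb())
```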
--------------------------------------------------------------------------------
/ultimate_mcp_server/cli/helpers.py:
--------------------------------------------------------------------------------
```python
"""Helper functions for the Ultimate MCP Server CLI."""
import json
import sys
from typing import Any, Dict, List, Optional, Union
from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
from rich.syntax import Syntax
from rich.table import Table
from ultimate_mcp_server.config import get_env
from ultimate_mcp_server.constants import COST_PER_MILLION_TOKENS, Provider
from ultimate_mcp_server.utils import get_logger
logger = get_logger(__name__)
console = Console(file=sys.stderr)
def print_cost_table() -> None:
"""Display pricing information for all supported LLM models.
This function creates and prints a formatted table showing the cost per million tokens
for various LLM models across all supported providers (OpenAI, Anthropic, DeepSeek, etc.).
The table separates input token costs from output token costs, as these are typically
billed at different rates.
Models are grouped by provider and sorted alphabetically for easy reference.
This information is useful for cost planning, provider comparison, and
understanding the financial implications of different model choices.
"""
# Create table
table = Table(title="Model Cost Per Million Tokens")
table.add_column("Provider", style="cyan")
table.add_column("Model", style="blue")
table.add_column("Input ($/M)", style="green")
table.add_column("Output ($/M)", style="yellow")
# Group models by provider
models_by_provider = {}
for model, costs in COST_PER_MILLION_TOKENS.items():
# Determine provider
provider = None
if "gpt" in model:
provider = Provider.OPENAI.value
elif "claude" in model:
provider = Provider.ANTHROPIC.value
elif "deepseek" in model:
provider = Provider.DEEPSEEK.value
elif "gemini" in model:
provider = Provider.GEMINI.value
else:
provider = "other"
if provider not in models_by_provider:
models_by_provider[provider] = []
models_by_provider[provider].append((model, costs))
# Add rows for each provider's models
for provider in sorted(models_by_provider.keys()):
models = sorted(models_by_provider[provider], key=lambda x: x[0])
for model, costs in models:
table.add_row(
provider,
model,
f"${costs['input']:.3f}",
f"${costs['output']:.3f}"
)
# Print table
console.print(table)
def format_tokens(tokens: int) -> str:
"""Format token count with thousands separator for better readability.
Converts raw token counts (e.g., 1234567) into a more human-readable format
with commas as thousand separators (e.g., "1,234,567"). This improves
the readability of token usage statistics in CLI outputs and reports.
Args:
tokens: Raw token count as an integer
Returns:
Formatted string with thousand separators (e.g., "1,234,567")
"""
return f"{tokens:,}"
def format_duration(seconds: float) -> str:
"""Format time duration in a human-friendly, adaptive format.
Converts raw seconds into a more readable format, automatically selecting
the appropriate unit based on the magnitude:
- Milliseconds for durations under 0.1 seconds
- Seconds with decimal precision for durations under 60 seconds
- Minutes and seconds for longer durations
This provides intuitive time displays in benchmarks and performance reports.
Args:
seconds: Duration in seconds (can be fractional)
Returns:
Formatted string like "50ms", "2.45s", or "1m 30.5s" depending on duration
"""
if seconds < 0.1:
return f"{seconds * 1000:.0f}ms"
elif seconds < 60:
return f"{seconds:.2f}s"
else:
minutes = int(seconds // 60)
remaining_seconds = seconds % 60
return f"{minutes}m {remaining_seconds:.1f}s"
def save_output_to_file(text: str, file_path: str, mode: str = "w") -> bool:
"""Write text content to a file with error handling and user feedback.
This utility function safely writes text to a file, handling encoding
and providing user feedback on success or failure. It's commonly used
to save LLM outputs, generated code, or other text data for later use.
Args:
text: The string content to write to the file
file_path: Target file path (absolute or relative to current directory)
mode: File open mode - "w" for overwrite or "a" for append to existing content
Returns:
Boolean indicating success (True) or failure (False)
"""
try:
with open(file_path, mode, encoding="utf-8") as f:
f.write(text)
console.print(f"[green]Output saved to {file_path}[/green]")
return True
except Exception as e:
console.print(f"[red]Error saving output: {str(e)}[/red]")
return False
def load_file_content(file_path: str) -> Optional[str]:
"""Read and return the entire contents of a text file.
This utility function safely reads text from a file with proper UTF-8 encoding,
handling any errors that may occur during the process. It's useful for loading
prompts, templates, or other text files needed for LLM operations.
Args:
file_path: Path to the file to read (absolute or relative to current directory)
Returns:
The file's contents as a string if successful, or None if an error occurred
"""
try:
with open(file_path, "r", encoding="utf-8") as f:
return f.read()
except Exception as e:
console.print(f"[red]Error loading file: {str(e)}[/red]")
return None
def print_markdown(markdown_text: str) -> None:
"""Display Markdown content with proper formatting and styling.
Renders Markdown text with appropriate styling (headings, bold, italic,
lists, code blocks, etc.) in the terminal using Rich's Markdown renderer.
This provides a more readable and visually appealing output for
documentation, examples, or LLM responses that use Markdown formatting.
Args:
markdown_text: Raw Markdown-formatted text to render
"""
md = Markdown(markdown_text)
console.print(md)
def print_json(json_data: Union[Dict, List]) -> None:
"""Display JSON data with syntax highlighting and proper formatting.
Converts a Python dictionary or list into a properly indented JSON string
and displays it with syntax highlighting for improved readability.
This is useful for displaying API responses, configuration data,
or other structured data in a human-friendly format.
Args:
json_data: Python dictionary or list to be displayed as formatted JSON
"""
json_str = json.dumps(json_data, indent=2)
syntax = Syntax(json_str, "json", theme="monokai", word_wrap=True)
console.print(syntax)
def print_code(code: str, language: str = "python") -> None:
"""Display source code with syntax highlighting and line numbers.
Renders code with proper syntax highlighting based on the specified language,
along with line numbers for easier reference. This improves readability
when displaying code examples, LLM-generated code, or code snippets
from files.
Args:
code: Source code text to display
language: Programming language for syntax highlighting (e.g., "python",
"javascript", "rust", "sql", etc.)
"""
syntax = Syntax(code, language, theme="monokai", line_numbers=True)
console.print(syntax)
def print_model_comparison(
provider: str,
models: List[str],
metrics: List[Dict[str, Any]]
) -> None:
"""Display a side-by-side comparison of multiple models from the same provider.
Creates a formatted table comparing performance metrics for different models
from the same LLM provider. This is useful for identifying the optimal model
for specific use cases based on response time, throughput, and cost metrics.
The comparison includes:
- Response time (formatted appropriately for the magnitude)
- Processing speed (tokens per second)
- Cost per request
- Total token usage
Args:
provider: Name of the LLM provider (e.g., "openai", "anthropic")
models: List of model identifiers to compare
metrics: List of dictionaries containing performance metrics for each model,
with keys like "time", "tokens_per_second", "cost", "total_tokens"
"""
# Create table
table = Table(title=f"{provider.capitalize()} Model Comparison")
table.add_column("Model", style="cyan")
table.add_column("Response Time", style="green")
table.add_column("Tokens/Sec", style="yellow")
table.add_column("Cost", style="magenta")
table.add_column("Total Tokens", style="dim")
# Add rows for each model
for model, metric in zip(models, metrics, strict=False):
table.add_row(
model,
format_duration(metric.get("time", 0)),
f"{metric.get('tokens_per_second', 0):.1f}",
f"${metric.get('cost', 0):.6f}",
format_tokens(metric.get("total_tokens", 0))
)
# Print table
console.print(table)
def print_environment_info() -> None:
"""Display current environment configuration for diagnostics.
Creates a formatted table showing important environment variables and their
current values, with a focus on API keys, logging configuration, and cache settings.
This is useful for troubleshooting and verifying that the environment is
configured correctly before running the server or other commands.
The output includes:
- Status of API keys for each supported provider (set or not set)
- Logging level configuration
- Cache settings
- Other relevant environment variables
"""
# Create table
table = Table(title="Environment Information")
table.add_column("Setting", style="cyan")
table.add_column("Value", style="green")
# Add API key info
for provider in [p.value for p in Provider]:
env_var = f"{provider.upper()}_API_KEY"
has_key = bool(get_env(env_var))
table.add_row(env_var, "✅ Set" if has_key else "❌ Not set")
# Add other environment variables
for var in ["LOG_LEVEL", "CACHE_ENABLED", "CACHE_DIR"]:
value = get_env(var, "Not set")
table.add_row(var, value)
# Print table
console.print(table)
def print_examples() -> None:
"""Display common usage examples for the CLI commands.
Shows a set of syntax-highlighted example commands demonstrating how to use
the most common features of the Ultimate MCP Server CLI. This helps users
quickly learn the command patterns and options available without having to
consult the full documentation.
Examples cover:
- Starting the server
- Listing and testing providers
- Generating completions (with and without streaming)
- Running benchmarks
- Managing the cache
"""
examples = """
# Run the server
ultimate-mcp-server run --host 0.0.0.0 --port 8013
# List available providers
ultimate-mcp-server providers --check
# Test a provider
ultimate-mcp-server test openai --model gpt-4.1-mini --prompt "Hello, world!"
# Generate a completion
ultimate-mcp-server complete --provider anthropic --model claude-3-5-haiku-20241022 --prompt "Explain quantum computing"
# Stream a completion
ultimate-mcp-server complete --provider openai --stream --prompt "Write a poem about AI"
# Run benchmarks
ultimate-mcp-server benchmark --providers openai anthropic --runs 3
# Check cache status
ultimate-mcp-server cache --status
# Clear cache
ultimate-mcp-server cache --clear
"""
syntax = Syntax(examples, "bash", theme="monokai", word_wrap=True)
console.print(Panel(syntax, title="CLI Examples", border_style="cyan"))
def confirm_action(message: str, default: bool = False) -> bool:
"""Prompt the user for confirmation before performing a potentially destructive action.
Displays a yes/no prompt with the specified message and waits for user input.
This is used to confirm potentially destructive operations like clearing the cache
or deleting files to prevent accidental data loss.
Args:
message: The question or confirmation message to display to the user
default: The default response if the user just presses Enter without typing
anything (True for yes, False for no)
Returns:
Boolean indicating whether the user confirmed (True) or canceled (False) the action
"""
default_str = "Y/n" if default else "y/N"
response = input(f"{message} [{default_str}]: ")
if not response:
return default
return response.lower() in ["y", "yes"]
```
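The two pure formatting helpers are easy to sanity-check directly; the expected strings below follow from the thresholds in `format_duration` and the comma formatting in `format_tokens`.

```python
from ultimate_mcp_server.cli.helpers import format_duration, format_tokens

assert format_tokens(1_234_567) == "1,234,567"
assert format_duration(0.05) == "50ms"      # under 0.1 s -> milliseconds
assert format_duration(2.45) == "2.45s"     # under 60 s -> seconds
assert format_duration(90.5) == "1m 30.5s"  # 60 s and up -> minutes + seconds
```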
--------------------------------------------------------------------------------
/examples/measure_model_speeds.py:
--------------------------------------------------------------------------------
```python
import argparse
import asyncio
import json
import os
import sys
import time
from typing import Any, Dict, List
from rich.console import Console
from rich.progress import BarColumn, Progress, SpinnerColumn, TextColumn, TimeElapsedColumn
from rich.table import Table
# --- Add project root to sys.path ---
script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(script_dir)
sys.path.insert(0, project_root)
# -------------------------------------
from ultimate_mcp_server.constants import ( # noqa: E402
COST_PER_MILLION_TOKENS,
Provider,
)
from ultimate_mcp_server.exceptions import ( # noqa: E402
ProviderError,
ToolError,
)
from ultimate_mcp_server.tools.completion import generate_completion # noqa: E402
from ultimate_mcp_server.utils import get_logger # noqa: E402
from ultimate_mcp_server.utils.display import CostTracker # noqa: E402
# Use Rich Console for better output
console = Console()
logger = get_logger("measure_model_speeds")
# --- Configuration ---
DEFAULT_PROMPT = (
"Explain the concept of Transfer Learning in Machine Learning in about 300 words. "
"Detail its primary benefits, common use cases across different domains (like NLP and Computer Vision), "
"and mention potential challenges or limitations when applying it."
)
DEFAULT_OUTPUT_FILENAME = "empirically_measured_model_speeds.json"
# Exclude models known not to work well with simple completion or require specific setup
EXCLUDED_MODELS_BY_DEFAULT = [
"mistralai/mistral-nemo", # Often requires specific setup/endpoint
# Add others if they consistently cause issues in this simple test
]
DEFAULT_MODELS_TO_TEST = [
m for m in COST_PER_MILLION_TOKENS.keys() if m not in EXCLUDED_MODELS_BY_DEFAULT
]
# Re-introduce the provider extraction logic
def extract_provider_model(model_identifier: str) -> tuple[str | None, str]:
"""Extracts provider and model name, always returning the model name without the prefix."""
model_identifier = model_identifier.strip()
provider: str | None = None
model_name_only: str = model_identifier # Start with the original identifier
# 1. Check for explicit provider prefix (using /)
known_providers = [p.value for p in Provider] # Get list of known providers
if '/' in model_identifier:
parts = model_identifier.split('/', 1)
# Patch: If the model is an OpenRouter model like 'mistralai/mistral-nemo', treat as openrouter
if model_identifier.startswith('mistralai/') or model_identifier == 'mistralai/mistral-nemo':
provider = Provider.OPENROUTER.value
model_name_only = model_identifier
elif len(parts) == 2 and parts[0] in known_providers and parts[1]:
provider = parts[0]
model_name_only = parts[1]
# Handle potential nested OpenRouter names like openrouter/mistralai/mistral-7b
# The current split('/', 1) already achieves this.
else:
# It has a slash, but doesn't match known provider format
logger.warning(f"Invalid or unknown provider prefix in '{model_identifier}'. Cannot extract provider reliably.")
return None, model_identifier # Return original identifier if prefix is invalid
# 2. Infer provider from model name pattern if no prefix was found
if provider is None:
if model_identifier.startswith('claude-'):
provider = Provider.ANTHROPIC.value
elif model_identifier.startswith('gemini-'):
provider = Provider.GEMINI.value
elif model_identifier.startswith('deepseek-'):
provider = Provider.DEEPSEEK.value
elif model_identifier.startswith('grok-'): # Added Grok
provider = Provider.GROK.value
# Add other inferences if necessary
# Assume OpenAI if it looks like an OpenAI model (common short names or gpt- prefix)
openai_short_names = [
'gpt-4o', 'gpt-4o-mini', 'gpt-4.1', 'gpt-4.1-mini', 'gpt-4.1-nano',
'o1-preview', 'o3-mini', 'gpt-3.5-turbo'
]
if provider is None and (model_identifier in openai_short_names or model_identifier.startswith('gpt-')):
provider = Provider.OPENAI.value
# If provider was inferred, model_name_only is already correct (the original identifier)
# 3. Return provider and model_name_only (which has prefix removed if found)
if provider:
# Log the extracted provider and model name for clarity during debugging
logger.debug(f"Extracted Provider: {provider}, Model Name: {model_name_only} from Input: {model_identifier}")
return provider, model_name_only
else:
# If provider couldn't be determined even after inference
logger.error(f"Could not determine provider for '{model_identifier}'. Skipping measurement.")
return None, model_identifier # Return original identifier as model_name if provider is unknown
async def measure_speed(model_identifier: str, prompt: str, tracker: CostTracker) -> Dict[str, Any]:
"""Measures the completion speed for a single model by calling the tool directly."""
result_data: Dict[str, Any] = {}
# Extract provider and model name using the helper
provider, model_name = extract_provider_model(model_identifier)
if provider is None:
# Skip if provider could not be determined
return {"error": f"Could not determine provider for '{model_identifier}'", "error_code": "INVALID_PARAMETER"}
# logger.info(f"Testing model {provider}/{model_name}...", emoji_key="timer") # Progress bar shows this
try:
start_time = time.monotonic()
# Call generate_completion with explicit provider and model name
result = await generate_completion(
provider=provider, # Pass the determined provider
model=model_name, # Pass the model name (without prefix)
prompt=prompt,
# Optional: max_tokens=500
)
end_time = time.monotonic()
if result and isinstance(result, dict) and result.get("success"):
# Track cost for successful calls
tracker.add_call(result)
processing_time = result.get("processing_time")
if processing_time is None:
processing_time = end_time - start_time
output_tokens = result.get("tokens", {}).get("output", 0)
if processing_time > 0 and output_tokens > 0:
tokens_per_second = output_tokens / processing_time
result_data = {
"total_time_s": round(processing_time, 3),
"output_tokens": output_tokens,
"output_tokens_per_second": round(tokens_per_second, 2),
}
elif output_tokens == 0:
logger.warning(f"Warning: {model_identifier} - Completed but generated 0 output tokens.", emoji_key="warning")
result_data = {"error": "Completed with 0 output tokens", "total_time_s": round(processing_time, 3)}
else:
logger.warning(f"Warning: {model_identifier} - Processing time reported as {processing_time:.4f}s. Cannot calculate tokens/s reliably.", emoji_key="warning")
result_data = {"error": "Processing time too low to calculate speed", "total_time_s": round(processing_time, 3)}
else:
manual_time = end_time - start_time
error_message = result.get("error", "Unknown error or unexpected result format")
error_code = result.get("error_code", "UNKNOWN_ERROR")
logger.error(f"Error: {model_identifier} - Tool call failed. Manual Time: {manual_time:.2f}s. Error: {error_message} ({error_code})", emoji_key="error")
result_data = {"error": error_message, "error_code": error_code, "manual_time_s": round(manual_time, 3)}
except ProviderError as e:
logger.error(f"Error: {model_identifier} ({provider}) - Provider Error: {e}", emoji_key="error", exc_info=False)
result_data = {"error": str(e), "error_code": getattr(e, 'error_code', 'PROVIDER_ERROR')}
except ToolError as e:
logger.error(f"Error: {model_identifier} ({provider}) - Tool Error: {e}", emoji_key="error", exc_info=False)
result_data = {"error": str(e), "error_code": getattr(e, 'error_code', 'TOOL_ERROR')}
except Exception as e:
logger.error(f"Error: {model_identifier} ({provider}) - Unexpected error: {e}", emoji_key="error", exc_info=True)
result_data = {"error": f"Unexpected error: {str(e)}"}
return result_data
async def main(models_to_test: List[str], output_file: str, prompt: str):
"""Main function to run speed tests and save results."""
logger.info("Starting LLM speed measurement script...", emoji_key="rocket")
tracker = CostTracker() # Instantiate tracker
results: Dict[str, Dict[str, Any]] = {}
# Use Rich Progress bar
with Progress(
SpinnerColumn(),
"[progress.description]{task.description}",
BarColumn(),
"[progress.percentage]{task.percentage:>3.0f}%",
TimeElapsedColumn(),
TextColumn("[bold green]{task.completed} done"),
console=console,
transient=False, # Keep progress bar after completion
) as progress:
task = progress.add_task("[cyan]Measuring speeds...", total=len(models_to_test))
for model_id in models_to_test:
progress.update(task, description=f"[cyan]Measuring speeds... [bold yellow]({model_id})[/]")
if not model_id or not isinstance(model_id, str):
logger.warning(f"Skipping invalid model entry: {model_id}")
progress.update(task, advance=1)
continue
results[model_id] = await measure_speed(model_id, prompt, tracker)
progress.update(task, advance=1)
# await asyncio.sleep(0.1) # Reduce sleep time if desired
# --- Display Results Table ---
table = Table(title="LLM Speed Measurement Results", show_header=True, header_style="bold magenta")
table.add_column("Model", style="dim cyan", width=40)
table.add_column("Time (s)", justify="right", style="green")
table.add_column("Output Tokens", justify="right", style="blue")
table.add_column("Tokens/s", justify="right", style="bold yellow")
table.add_column("Status/Error", style="red")
for model_id, data in sorted(results.items()):
if "error" in data:
status = f"Error: {data['error']}"
if 'error_code' in data:
status += f" ({data['error_code']})"
time_s = data.get("total_time_s") or data.get("manual_time_s")
time_str = f"{time_s:.2f}" if time_s is not None else "-"
table.add_row(model_id, time_str, "-", "-", status)
else:
table.add_row(
model_id,
f"{data.get('total_time_s', 0):.2f}",
str(data.get('output_tokens', '-')),
f"{data.get('output_tokens_per_second', 0):.2f}",
"Success"
)
console.print(table)
# Display cost summary
tracker.display_summary(console)
# --- Save Results --- (Saving logic remains the same)
script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(script_dir)
output_path = os.path.join(project_root, output_file)
logger.info(f"Saving results to: {output_path}", emoji_key="save")
try:
with open(output_path, 'w') as f:
json.dump(results, f, indent=4)
logger.info("Results saved successfully.", emoji_key="success")
except IOError as e:
logger.error(f"Failed to write results to {output_path}: {e}", emoji_key="error", exc_info=True)
console.print(f"[bold red]Error:[/bold red] Could not write results to {output_path}. Check permissions. Details: {e}")
logger.info("Speed measurement script finished.", emoji_key="checkered_flag")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Measure LLM completion speeds.")
parser.add_argument(
"--models",
nargs='+',
default=DEFAULT_MODELS_TO_TEST,
help="Space-separated list of models to test (e.g., openai/gpt-4o-mini anthropic/claude-3-5-haiku-20241022). Defaults to available models."
)
parser.add_argument(
"--output",
default=DEFAULT_OUTPUT_FILENAME,
help=f"Output JSON filename. Defaults to {DEFAULT_OUTPUT_FILENAME} in the project root."
)
parser.add_argument(
"--prompt",
default=DEFAULT_PROMPT,
help="The prompt to use for testing."
)
args = parser.parse_args()
if not args.models or not all(isinstance(m, str) and m for m in args.models):
console.print("[bold red]Error:[/bold red] Invalid --models argument. Please provide a list of non-empty model names.")
        sys.exit(1)
models_unique = sorted(list(set(args.models)))
# Use Rich print for startup info
console.print("[bold blue]--- LLM Speed Measurement ---[/bold blue]")
console.print(f"Models to test ({len(models_unique)}): [cyan]{', '.join(models_unique)}[/cyan]")
console.print(f"Output file: [green]{args.output}[/green]")
console.print(f"Prompt length: {len(args.prompt)} characters")
console.print("[bold blue]-----------------------------[/bold blue]")
asyncio.run(main(models_unique, args.output, args.prompt))
```
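The script is normally driven through argparse, but `main()` can also be awaited directly. Below is a minimal sketch of that; the two model identifiers and output filename are illustrative, it assumes the `examples/` directory is on `sys.path` so the module imports as `measure_model_speeds`, and it requires the corresponding provider API keys to be configured.

```python
# Minimal sketch: run the measurement loop above for a hand-picked set of models.
# Assumes this file is importable as `measure_model_speeds` (e.g. run from examples/)
# and that the relevant provider API keys are configured.
import asyncio

from measure_model_speeds import DEFAULT_PROMPT, main

asyncio.run(
    main(
        models_to_test=["openai/gpt-4o-mini", "anthropic/claude-3-5-haiku-20241022"],
        output_file="speed_results_subset.json",
        prompt=DEFAULT_PROMPT,
    )
)
```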
--------------------------------------------------------------------------------
/ultimate_mcp_server/core/providers/deepseek.py:
--------------------------------------------------------------------------------
```python
"""DeepSeek provider implementation."""
import time
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple
from openai import AsyncOpenAI
from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.providers.base import BaseProvider, ModelResponse
from ultimate_mcp_server.utils import get_logger
# Use the same naming scheme everywhere: logger at module level
logger = get_logger("ultimate_mcp_server.providers.deepseek")
class DeepSeekProvider(BaseProvider):
"""Provider implementation for DeepSeek API (using OpenAI-compatible interface)."""
provider_name = Provider.DEEPSEEK.value
def __init__(self, api_key: Optional[str] = None, **kwargs):
"""Initialize the DeepSeek provider.
Args:
api_key: DeepSeek API key
**kwargs: Additional options
"""
super().__init__(api_key=api_key, **kwargs)
self.base_url = kwargs.get("base_url", "https://api.deepseek.com")
self.models_cache = None
async def initialize(self) -> bool:
"""Initialize the DeepSeek client.
Returns:
bool: True if initialization was successful
"""
try:
# DeepSeek uses OpenAI-compatible API
self.client = AsyncOpenAI(
api_key=self.api_key,
base_url=self.base_url,
)
self.logger.success(
"DeepSeek provider initialized successfully",
emoji_key="provider"
)
return True
except Exception as e:
self.logger.error(
f"Failed to initialize DeepSeek provider: {str(e)}",
emoji_key="error"
)
return False
async def generate_completion(
self,
prompt: Optional[str] = None,
messages: Optional[List[Dict[str, Any]]] = None,
model: Optional[str] = None,
max_tokens: Optional[int] = None,
temperature: float = 0.7,
json_mode: bool = False,
**kwargs
) -> ModelResponse:
"""Generate a completion using DeepSeek's API.
Args:
prompt: Text prompt to send to the model (optional if messages provided)
messages: List of message dictionaries (optional if prompt provided)
model: Model name to use
max_tokens: Maximum tokens to generate
temperature: Temperature parameter (0.0-1.0)
json_mode: If True, attempt to generate JSON output
**kwargs: Additional parameters
Returns:
ModelResponse with the completion result
"""
if not self.client:
await self.initialize()
# Verify we have either prompt or messages
if prompt is None and not messages:
raise ValueError("Either prompt or messages must be provided")
# Use default model if not specified
model = model or self.get_default_model()
# Prepare API parameters
if messages:
# Using chat completion with messages
params = {
"model": model,
"messages": messages,
"temperature": temperature
}
else:
# Using completion with prompt
# Convert prompt to messages format for DeepSeek
params = {
"model": model,
"messages": [{"role": "user", "content": prompt}],
"temperature": temperature
}
# Add max_tokens if provided
if max_tokens is not None:
params["max_tokens"] = max_tokens
# Handle JSON mode via response_format for compatible models
if json_mode:
params["response_format"] = {"type": "json_object"}
self.logger.debug("Setting response_format to JSON mode for DeepSeek")
# Add any remaining parameters
for key, value in kwargs.items():
if key not in params:
params[key] = value
# Log request parameters
prompt_length = len(prompt) if prompt else sum(len(m.get("content", "")) for m in messages)
self.logger.info(
f"Generating completion with DeepSeek model {model}",
emoji_key=self.provider_name,
prompt_length=prompt_length,
json_mode=json_mode
)
try:
# Start timer
start_time = time.time()
# Make API call
response = await self.client.chat.completions.create(**params)
# Calculate processing time
processing_time = time.time() - start_time
# Extract text from response
completion_text = response.choices[0].message.content
# Create ModelResponse
result = ModelResponse(
text=completion_text,
model=f"{self.provider_name}/{model}",
provider=self.provider_name,
input_tokens=response.usage.prompt_tokens,
output_tokens=response.usage.completion_tokens,
total_tokens=response.usage.total_tokens,
processing_time=processing_time,
raw_response=response
)
# Add message for compatibility with chat_completion
result.message = {"role": "assistant", "content": completion_text}
# Log success
self.logger.success(
"DeepSeek completion successful",
emoji_key="success",
model=model,
tokens={"input": result.input_tokens, "output": result.output_tokens},
cost=result.cost,
time=processing_time
)
return result
except Exception as e:
# Log error
self.logger.error(
f"DeepSeek completion failed: {str(e)}",
emoji_key="error",
model=model
)
raise
async def generate_completion_stream(
self,
prompt: Optional[str] = None,
messages: Optional[List[Dict[str, Any]]] = None,
model: Optional[str] = None,
max_tokens: Optional[int] = None,
temperature: float = 0.7,
json_mode: bool = False,
**kwargs
) -> AsyncGenerator[Tuple[str, Dict[str, Any]], None]:
"""Generate a streaming completion using DeepSeek.
Args:
prompt: Text prompt to send to the model (optional if messages provided)
messages: List of message dictionaries (optional if prompt provided)
model: Model name to use
max_tokens: Maximum tokens to generate
temperature: Temperature parameter (0.0-1.0)
json_mode: If True, attempt to generate JSON output
**kwargs: Additional parameters
Yields:
Tuples of (text_chunk, metadata)
"""
if not self.client:
await self.initialize()
# Verify we have either prompt or messages
if prompt is None and not messages:
raise ValueError("Either prompt or messages must be provided")
# Use default model if not specified
model = model or self.get_default_model()
# Prepare API parameters
if messages:
# Using chat completion with messages
params = {
"model": model,
"messages": messages,
"temperature": temperature,
"stream": True
}
else:
# Using completion with prompt
# Convert prompt to messages format for DeepSeek
params = {
"model": model,
"messages": [{"role": "user", "content": prompt}],
"temperature": temperature,
"stream": True
}
# Add max_tokens if provided
if max_tokens is not None:
params["max_tokens"] = max_tokens
# Handle JSON mode via response_format for compatible models
if json_mode:
params["response_format"] = {"type": "json_object"}
self.logger.debug("Setting response_format to JSON mode for DeepSeek streaming")
# Add any remaining parameters
for key, value in kwargs.items():
if key not in params and key != "stream": # Don't allow overriding stream
params[key] = value
# Log request parameters
prompt_length = len(prompt) if prompt else sum(len(m.get("content", "")) for m in messages)
self.logger.info(
f"Generating streaming completion with DeepSeek model {model}",
emoji_key=self.provider_name,
prompt_length=prompt_length,
json_mode=json_mode
)
start_time = time.time()
total_chunks = 0
try:
# Make streaming API call
stream = await self.client.chat.completions.create(**params)
# Process the stream
async for chunk in stream:
total_chunks += 1
# Extract content from the chunk
delta = chunk.choices[0].delta
content = delta.content or ""
# Metadata for this chunk
metadata = {
"model": f"{self.provider_name}/{model}",
"provider": self.provider_name,
"chunk_index": total_chunks,
"finish_reason": chunk.choices[0].finish_reason,
}
yield content, metadata
# Log success
processing_time = time.time() - start_time
self.logger.success(
"DeepSeek streaming completion successful",
emoji_key="success",
model=model,
chunks=total_chunks,
time=processing_time
)
# Yield final metadata chunk
final_metadata = {
"model": f"{self.provider_name}/{model}",
"provider": self.provider_name,
"chunk_index": total_chunks + 1,
"processing_time": processing_time,
"finish_reason": "stop"
}
yield "", final_metadata
except Exception as e:
processing_time = time.time() - start_time
self.logger.error(
f"DeepSeek streaming completion failed: {str(e)}",
emoji_key="error",
model=model
)
# Yield error metadata
error_metadata = {
"model": f"{self.provider_name}/{model}",
"provider": self.provider_name,
"chunk_index": total_chunks + 1,
"error": f"{type(e).__name__}: {str(e)}",
"processing_time": processing_time,
"finish_reason": "error"
}
yield "", error_metadata
async def list_models(self) -> List[Dict[str, Any]]:
"""List available DeepSeek models.
Returns:
List of model information dictionaries
"""
# DeepSeek doesn't have a comprehensive models endpoint, so we return a static list
if self.models_cache:
return self.models_cache
models = [
{
"id": "deepseek-chat",
"provider": self.provider_name,
"description": "General-purpose chat model",
},
{
"id": "deepseek-reasoner",
"provider": self.provider_name,
"description": "Enhanced reasoning capabilities",
},
]
# Cache results
self.models_cache = models
return models
def get_default_model(self) -> str:
"""Get the default DeepSeek model.
Returns:
Default model name
"""
from ultimate_mcp_server.config import get_config
# Safely get from config if available
try:
config = get_config()
provider_config = getattr(config, 'providers', {}).get(self.provider_name, None)
if provider_config and provider_config.default_model:
return provider_config.default_model
except (AttributeError, TypeError):
# Handle case when providers attribute doesn't exist or isn't a dict
pass
# Otherwise return hard-coded default
return "deepseek-chat"
async def check_api_key(self) -> bool:
"""Check if the DeepSeek API key is valid.
Returns:
bool: True if API key is valid
"""
        try:
            if not self.client:
                await self.initialize()
            # Try a simple completion to validate the API key
await self.client.chat.completions.create(
model=self.get_default_model(),
messages=[{"role": "user", "content": "Hello"}],
max_tokens=1,
)
return True
except Exception:
return False
```
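A minimal usage sketch for the provider class above. It assumes a `DEEPSEEK_API_KEY` environment variable and calls the constructor, `initialize()`, and `generate_completion()` as defined in this file; in the server these steps are normally handled by the provider registry and configuration system.

```python
# Minimal sketch: instantiate the provider directly and request one completion.
# Assumes DEEPSEEK_API_KEY is set in the environment.
import asyncio
import os

from ultimate_mcp_server.core.providers.deepseek import DeepSeekProvider


async def demo() -> None:
    provider = DeepSeekProvider(api_key=os.environ["DEEPSEEK_API_KEY"])
    await provider.initialize()
    result = await provider.generate_completion(
        prompt="Summarize the CAP theorem in two sentences.",
        model="deepseek-chat",
        max_tokens=120,
    )
    print(result.text)
    print(f"tokens: {result.input_tokens} in / {result.output_tokens} out")


asyncio.run(demo())
```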
--------------------------------------------------------------------------------
/ultimate_mcp_server/core/state_store.py:
--------------------------------------------------------------------------------
```python
import asyncio
import os
import pickle
from typing import Any, Dict, Optional
import aiofiles
class StateStore:
"""
Thread-safe, async-compatible state management system with optional persistence.
The StateStore provides a robust solution for managing application state in asynchronous
environments. It organizes data into namespaces, each containing key-value pairs, and
provides thread-safe access through asyncio.Lock-based concurrency control.
Key features:
- Namespace-based organization to separate different types of state data
- Thread-safe async methods for all operations (get, set, delete)
- Optional persistence to disk with automatic load/save
- Granular locking per namespace to maximize concurrency
- Graceful handling of corrupted or missing persistent data
Usage example:
```python
# Initialize with persistence
store = StateStore(persistence_dir="./state")
# Store values
await store.set("user_preferences", "theme", "dark")
await store.set("session_data", "user_id", 12345)
# Retrieve values (with default if missing)
theme = await store.get("user_preferences", "theme", default="light")
user_id = await store.get("session_data", "user_id", default=None)
# Delete values
await store.delete("session_data", "temp_token")
```
The StateStore is used internally by the Ultimate MCP Server to maintain state
across multiple tools and components, and is exposed to tools via the
with_state_management decorator.
"""
def __init__(self, persistence_dir: Optional[str] = None):
"""
Initialize a new StateStore instance.
The StateStore provides a thread-safe, async-compatible key-value store organized
by namespaces. It supports both in-memory operation and optional persistence to disk.
The store is designed for use in multi-threaded or async applications where state
needs to be shared safely between components.
Each namespace acts as a separate dictionary with its own concurrency protection.
Operations within a namespace are serialized using asyncio.Lock, while operations
across different namespaces can proceed concurrently.
Args:
persistence_dir: Optional directory path where state data will be persisted.
If provided, each namespace will be stored as a separate pickle
file in this directory. If None, the store operates in memory-only
mode and state is lost when the application stops.
Notes:
- The directory will be created if it doesn't exist
- Each namespace is persisted as a separate file named "{namespace}.pickle"
- Data is serialized using Python's pickle module, so stored values should be
pickle-compatible
- No automatic cleanup of old or unused namespaces is performed
"""
self._in_memory_store: Dict[str, Dict[str, Any]] = {}
self._locks: Dict[str, asyncio.Lock] = {}
self._persistence_dir = persistence_dir
if persistence_dir and not os.path.exists(persistence_dir):
os.makedirs(persistence_dir)
def _get_lock(self, namespace: str) -> asyncio.Lock:
"""
Get or create an asyncio.Lock for a specific namespace.
This private method manages the locks used for concurrency control. It maintains
a dictionary of locks keyed by namespace name, creating new locks as needed.
This ensures that operations on the same namespace are properly serialized to
prevent race conditions, while allowing operations on different namespaces to
proceed concurrently.
Args:
namespace: Name of the namespace for which to get or create a lock
Returns:
An asyncio.Lock instance specific to the requested namespace
Notes:
- Each namespace gets its own independent lock
- Locks are created on-demand when a namespace is first accessed
- Locks persist for the lifetime of the StateStore instance
- This method is called by all public methods (get, set, delete) to
ensure thread-safe access to namespaces
"""
if namespace not in self._locks:
self._locks[namespace] = asyncio.Lock()
return self._locks[namespace]
async def get(self, namespace: str, key: str, default: Any = None) -> Any:
"""
Retrieve a value from the state store with thread-safe access control.
This method provides a concurrency-safe way to retrieve state data from the specified
namespace. If the namespace doesn't exist in memory, it attempts to load it from disk
(if persistence is enabled) before returning the requested value or default.
Retrieval behavior:
- The method first acquires a lock for the specified namespace to ensure thread safety
- If the namespace is not in memory, it attempts to load it from disk if persistence is enabled
- If the namespace can't be loaded or doesn't exist, an empty namespace is created
- Returns the value for the specified key, or the default value if the key is not found
Args:
namespace: Logical grouping for related state data (e.g., "tools", "user_settings")
key: Unique identifier within the namespace for the data to retrieve
default: Value to return if the key is not found in the namespace
Returns:
The stored value if found, otherwise the default value
Notes:
- Acquiring the namespace lock is an async operation and may block if another
operation is currently accessing the same namespace
- If persistence is enabled, this method may perform disk I/O when a namespace
needs to be loaded from disk
"""
async with self._get_lock(namespace):
if namespace not in self._in_memory_store:
# Try to load from disk if persistence is enabled
if self._persistence_dir:
await self._load_namespace(namespace)
else:
self._in_memory_store[namespace] = {}
return self._in_memory_store[namespace].get(key, default)
async def set(self, namespace: str, key: str, value: Any) -> None:
"""
Store a value in the state store with thread-safe access control.
This method provides a concurrency-safe way to store state data in the specified namespace.
The implementation uses asyncio.Lock to ensure that concurrent access to the same namespace
doesn't lead to race conditions or data corruption.
Storage behavior:
- Values are first stored in an in-memory dictionary
- If persistence_dir is configured, values are also immediately persisted to disk
- Each namespace is stored as a separate pickle file
- Values can be any pickle-serializable Python object
Args:
namespace: Logical grouping for related state data (e.g., "tools", "user_settings")
key: Unique identifier within the namespace for this piece of data
value: Any pickle-serializable value to store
Notes:
- Acquiring the namespace lock is an async operation and may block if another
operation is currently accessing the same namespace
- If persistence is enabled, this method performs disk I/O which could take time
depending on the value size and disk performance
"""
async with self._get_lock(namespace):
if namespace not in self._in_memory_store:
self._in_memory_store[namespace] = {}
self._in_memory_store[namespace][key] = value
# Persist immediately if enabled
if self._persistence_dir:
await self._persist_namespace(namespace)
async def delete(self, namespace: str, key: str) -> None:
"""
Delete a value from the state store with thread-safe access control.
This method safely removes a key-value pair from the specified namespace,
and optionally persists the change to disk if persistence is enabled. The
operation is concurrency-safe through the use of namespace-specific locks.
Deletion behavior:
- The method first acquires a lock for the specified namespace to ensure thread safety
- If the namespace doesn't exist or the key is not found, the operation is a no-op
- If persistence is enabled, the updated namespace state is written to disk
immediately after deletion
Args:
namespace: Logical grouping for related state data (e.g., "tools", "user_settings")
key: Unique identifier within the namespace for the data to delete
Notes:
- Acquiring the namespace lock is an async operation and may block if another
operation is currently accessing the same namespace
- If persistence is enabled, this method performs disk I/O when persisting
the updated namespace after deletion
- This method does not raise an exception if the key doesn't exist in the namespace
"""
async with self._get_lock(namespace):
if namespace in self._in_memory_store and key in self._in_memory_store[namespace]:
del self._in_memory_store[namespace][key]
# Persist the change if enabled
if self._persistence_dir:
await self._persist_namespace(namespace)
async def _persist_namespace(self, namespace: str) -> None:
"""
Persist a namespace's data to disk as a pickle file.
This private method handles the actual disk I/O for saving state data. It serializes
the entire namespace dictionary to a pickle file named after the namespace in the
configured persistence directory.
Args:
namespace: Name of the namespace whose data should be persisted
Notes:
- This method is a no-op if persistence_dir is not configured
- Uses aiofiles for non-blocking async file I/O
- The file is named "{namespace}.pickle" and stored in the persistence_dir
- The entire namespace is serialized in a single operation, which may be
inefficient for very large namespaces
- This method is called internally by set() and delete() methods
after modifying namespace data
"""
if not self._persistence_dir:
return
file_path = os.path.join(self._persistence_dir, f"{namespace}.pickle")
async with aiofiles.open(file_path, 'wb') as f:
await f.write(pickle.dumps(self._in_memory_store[namespace]))
async def _load_namespace(self, namespace: str) -> None:
"""
Load a namespace's data from disk into memory.
This private method handles loading serialized state data from disk into the in-memory store.
It is called automatically by the get() method when a namespace is requested but not yet
loaded in memory. The method implements the lazy-loading pattern, only reading from disk
when necessary.
The loading process follows these steps:
1. Check if persistence is enabled; if not, initialize an empty namespace dictionary
2. Locate the pickle file for the namespace (named "{namespace}.pickle")
3. If the file doesn't exist, initialize an empty namespace dictionary
4. If the file exists, read and deserialize it using pickle
5. Handle potential serialization errors gracefully (corrupted files, version mismatches)
Args:
namespace: Name of the namespace whose data should be loaded. This corresponds
directly to a "{namespace}.pickle" file in the persistence directory.
Returns:
None: The method modifies the internal self._in_memory_store dictionary directly.
Notes:
- Uses aiofiles for non-blocking async file I/O
- In case of corrupt data (pickle errors), the namespace is initialized as empty
rather than raising exceptions to the caller
- Example of file path: /path/to/persistence_dir/user_settings.pickle for the
"user_settings" namespace
- This method is idempotent - calling it multiple times for the same namespace
has no additional effect after the first call
Examples:
```python
# This method is called internally by get(), not typically called directly
store = StateStore(persistence_dir="./state")
# When this executes, _load_namespace("user_settings") will be called internally
# if the namespace is not already in memory
value = await store.get("user_settings", "theme")
```
"""
if not self._persistence_dir:
self._in_memory_store[namespace] = {}
return
file_path = os.path.join(self._persistence_dir, f"{namespace}.pickle")
if not os.path.exists(file_path):
self._in_memory_store[namespace] = {}
return
try:
async with aiofiles.open(file_path, 'rb') as f:
data = await f.read()
self._in_memory_store[namespace] = pickle.loads(data)
except (pickle.PickleError, EOFError):
# Handle corrupt data
self._in_memory_store[namespace] = {}
```
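The docstring examples above can be exercised end to end with a short script like the following; the namespace, key, and directory names are illustrative.

```python
# Minimal runnable sketch of the StateStore API defined above.
# Persists to ./state_demo; namespace and key names are illustrative.
import asyncio

from ultimate_mcp_server.core.state_store import StateStore


async def demo() -> None:
    store = StateStore(persistence_dir="./state_demo")
    await store.set("user_preferences", "theme", "dark")
    print(await store.get("user_preferences", "theme", default="light"))  # -> dark
    await store.delete("user_preferences", "theme")
    print(await store.get("user_preferences", "theme", default="light"))  # -> light (default)


asyncio.run(demo())
```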
--------------------------------------------------------------------------------
/resource_annotations.py:
--------------------------------------------------------------------------------
```python
"""
Resource annotations for Model Control Protocol (MCP) systems.
This module implements the resource annotation system specified in the MCP protocol,
which enables AI systems to make intelligent decisions about how to process, prioritize,
and present different types of resources in multi-modal and multi-resource contexts.
Resource annotations serve multiple critical functions in AI/LLM systems:
1. PRIORITIZATION: Help AI systems allocate attention optimally among multiple resources
when token constraints prevent processing everything (e.g., which document to focus on)
2. VISIBILITY CONTROL: Determine which resources should be visible to different actors
in the system (e.g., assistant-only resources vs. user-facing resources)
3. FORMAT PRESERVATION: Indicate when resources have structured formats that should be
maintained (e.g., code, tables, JSON) rather than freely interpreted
4. CHUNKING GUIDANCE: Provide hints about how to divide large resources efficiently
for processing within context window constraints
The module provides:
- The ResourceAnnotations class for creating annotation metadata
- Pre-defined annotation templates for common resource types
- Utilities for working with annotated resources (e.g., chunking)
Usage example:
```python
# Create custom annotations for a research paper
paper_annotations = ResourceAnnotations(
priority=0.8,
audience=["assistant"],
chunking_recommended=True,
description="Research paper on quantum computing effects"
)
# Annotate and chunk a large document
paper_content = open("quantum_paper.txt").read()
chunks = format_chunked_content(paper_content, chunk_size=3000)
# Use a predefined annotation template for code
code_resource = {
"content": "def calculate_entropy(data):\\n ...",
"annotations": CODE_RESOURCE.to_dict()
}
```
These annotations integrate with the MCP protocol to help LLMs process resources
more intelligently and efficiently in complex, multi-resource scenarios.
"""
from typing import List, Optional
class ResourceAnnotations:
"""
Annotations that guide LLMs in handling and prioritizing resources within the MCP protocol.
ResourceAnnotations provide crucial metadata that helps LLMs make intelligent decisions about:
- IMPORTANCE: How critical a resource is to the current task (via priority)
- AUDIENCE: Who should see or interact with the resource
- FORMATTING: How the resource should be rendered or processed
- CHUNKING: Whether and how to divide large resources into manageable pieces
These annotations serve multiple purposes in the MCP ecosystem:
1. Help LLMs prioritize which resources to analyze first when multiple are available
2. Control visibility of resources between assistants and users
3. Preserve structural integrity of formatted content (code, tables, etc.)
4. Provide chunking guidance for efficient processing of large resources
When resources are annotated appropriately, LLMs can make better decisions about:
- Which resources deserve the most attention in token-constrained contexts
- When to preserve formatting vs. when content structure is less important
- How to efficiently process large documents while maintaining context
- Whether certain resources are meant for the assistant's understanding only
Usage example:
```python
# For a source code file that should preserve formatting
code_annotations = ResourceAnnotations(
priority=0.8, # High importance
audience=["assistant"], # Only the assistant needs to see this
structured_format=True, # Preserve code formatting
chunking_recommended=True, # Chunk if large
max_recommended_chunk_size=2000,
description="Python source code implementing the core algorithm"
)
# Apply annotations to a resource
resource = {
"id": "algorithm.py",
"content": "def calculate(x, y):\n return x + y",
"annotations": code_annotations.to_dict()
}
```
"""
def __init__(
self,
priority: float = 0.5,
        audience: Optional[List[str]] = None,
structured_format: bool = False,
chunking_recommended: bool = False,
max_recommended_chunk_size: Optional[int] = None,
description: Optional[str] = None
):
"""
Initialize resource annotations.
Args:
priority: How important this resource is (0.0-1.0, higher is more important).
0.0 = entirely optional, 1.0 = effectively required.
Affects how much attention an LLM should give this resource when multiple
resources are available but context limits prevent using all of them.
Default: 0.5 (medium importance)
audience: Who should see this resource, as a list of roles:
- "assistant": The AI assistant should process this resource
- "user": The human user should see this resource
Both can be specified for resources relevant to both parties.
Default: ["assistant"] (assistant-only)
structured_format: Whether this resource has a structured format that
should be preserved (e.g., code, JSON, tables). When True, the LLM should
maintain the exact formatting, indentation, and structure of the content.
Default: False
chunking_recommended: Whether this resource should be chunked if large.
Setting this to True signals that the content is suitable for being
divided into smaller pieces for processing (e.g., long documents).
Default: False
max_recommended_chunk_size: Maximum recommended chunk size in characters.
Provides guidance on how large each chunk should be if chunking is applied.
Default: None (no specific recommendation)
description: Optional description of the resource that provides context
about its purpose, content, or importance.
"""
self.priority = max(0.0, min(1.0, priority)) # Clamp between 0 and 1
self.audience = audience or ["assistant"]
self.structured_format = structured_format
self.chunking_recommended = chunking_recommended
self.max_recommended_chunk_size = max_recommended_chunk_size
self.description = description
def to_dict(self) -> dict:
"""Convert annotations to dictionary for MCP protocol."""
result = {
"priority": self.priority,
"audience": self.audience
}
# Add extended properties
if self.description:
result["description"] = self.description
# Add chunking metadata if recommended
if self.chunking_recommended:
result["chunking"] = {
"recommended": True
}
if self.max_recommended_chunk_size:
result["chunking"]["maxSize"] = self.max_recommended_chunk_size
# Add format information
if self.structured_format:
result["format"] = {
"structured": True
}
return result
# Pre-defined annotation templates for common resource types
# For critical resources that need immediate attention
# Use for resources essential to the current task's success
# Examples: Primary task instructions, critical context documents
HIGH_PRIORITY_RESOURCE = ResourceAnnotations(
priority=0.9,
audience=["assistant", "user"],
description="Critical resource that should be prioritized"
)
# For source code and programming-related content
# Preserves indentation, formatting, and structure
# Recommends chunking for large codebases
# Examples: Source files, configuration files, scripts
CODE_RESOURCE = ResourceAnnotations(
priority=0.8,
audience=["assistant"],
structured_format=True,
chunking_recommended=True,
max_recommended_chunk_size=2000,
description="Source code that should preserve formatting"
)
# For lengthy text resources that should be divided into smaller parts
# Good for processing long documents without overwhelming context windows
# Examples: Articles, documentation, books, long explanations
LARGE_TEXT_RESOURCE = ResourceAnnotations(
priority=0.6,
audience=["assistant"],
chunking_recommended=True,
max_recommended_chunk_size=4000,
description="Large text that should be chunked for processing"
)
# For data formats where structure is important
# Preserves formatting but doesn't automatically suggest chunking
# Examples: JSON data, database records, tabular data, XML
STRUCTURED_DATA_RESOURCE = ResourceAnnotations(
priority=0.7,
audience=["assistant"],
structured_format=True,
description="Structured data like JSON or tables"
)
# For supplementary information that provides additional context
# Low priority indicates it can be skipped if context is limited
# Examples: Background information, history, tangential details
OPTIONAL_RESOURCE = ResourceAnnotations(
priority=0.2,
audience=["assistant"],
description="Supplementary information that isn't critical"
)
# For content meant to be shown to the user directly
# Not intended for assistant's processing (assistant not in audience)
# Examples: Final results, generated content, presentations
USER_FACING_RESOURCE = ResourceAnnotations(
priority=0.7,
audience=["user"],
description="Resource meant for user consumption"
)
def format_chunked_content(content: str, chunk_size: int = 4000, overlap: int = 200) -> List[dict]:
"""
Format content into overlapping chunks with rich metadata for efficient LLM processing.
This utility function implements a sliding window approach to divide large content
into manageable, context-aware chunks. Each chunk is annotated with detailed positioning
metadata, allowing LLMs to understand the chunk's relationship to the overall content
and maintain coherence across chunk boundaries.
Key features:
- Consistent overlap between chunks preserves context and prevents information loss
- Automatic metadata generation provides LLMs with crucial positioning information
- Standard annotation format compatible with the MCP resource protocol
- Configurable chunk size to adapt to different model context window limitations
The overlap between chunks is particularly important as it helps LLMs maintain
coherence when processing information that spans chunk boundaries. Without overlap,
context might be lost at chunk transitions, leading to degraded performance on tasks
that require understanding the full content.
Args:
content: The source text content to be chunked. This can be any string content
like a document, article, code file, or other text-based resource.
chunk_size: Maximum size of each chunk in characters (default: 4000).
This should be set based on the target LLM's context window limitations,
typically 25-50% less than the model's maximum to allow room for prompts.
overlap: Number of characters to overlap between consecutive chunks (default: 200).
Larger overlap values provide more context continuity between chunks but
increase redundancy and total token usage.
Returns:
List of dictionaries, each representing a content chunk with metadata:
- "text": The actual chunk content (substring of the original content)
- "annotations": Metadata dictionary containing:
- priority: Importance hint for the LLM (default: 0.7)
- audience: Who should see this chunk (default: ["assistant"])
- chunk_info: Detailed positioning metadata including:
- index: Zero-based index of this chunk in the sequence
- total_chunks: Total number of chunks in the complete content
- start_position: Character offset where this chunk begins in the original content
- end_position: Character offset where this chunk ends in the original content
- has_more: Boolean indicating if more chunks follow this one
Usage examples:
# Basic usage with default parameters
chunks = format_chunked_content("Long document text...")
# Using smaller chunks for models with limited context windows
small_chunks = format_chunked_content(
content="Large article text...",
chunk_size=1000,
overlap=100
)
# Process chunks sequentially while maintaining context
for chunk in chunks:
response = await generate_completion(
prompt=f"Analyze this text: {chunk['text']}",
# Include chunk metadata so the LLM understands context
additional_context=f"This is chunk {chunk['annotations']['chunk_info']['index']+1} "
f"of {chunk['annotations']['chunk_info']['total_chunks']}"
)
"""
chunks = []
# Create chunks with overlap
for i in range(0, len(content), chunk_size - overlap):
chunk_text = content[i:i + chunk_size]
if chunk_text:
# Create chunk with annotations
chunk = {
"text": chunk_text,
"annotations": {
"priority": 0.7,
"audience": ["assistant"],
"chunk_info": {
"index": len(chunks),
"total_chunks": (len(content) + chunk_size - 1) // (chunk_size - overlap),
"start_position": i,
"end_position": min(i + chunk_size, len(content)),
"has_more": i + chunk_size < len(content)
}
}
}
chunks.append(chunk)
return chunks
```
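A short sketch tying together `ResourceAnnotations.to_dict()` and `format_chunked_content()`; the sample text, sizes, and printed values are illustrative, and it assumes the repository root is on `sys.path` so the module imports as `resource_annotations`.

```python
# Minimal sketch combining ResourceAnnotations.to_dict() with format_chunked_content().
# Sample content and chunk sizes are arbitrary illustration values.
from resource_annotations import CODE_RESOURCE, ResourceAnnotations, format_chunked_content

notes = ResourceAnnotations(
    priority=0.8,
    audience=["assistant"],
    chunking_recommended=True,
    max_recommended_chunk_size=1000,
    description="Research notes",
)
print(notes.to_dict())
# -> {'priority': 0.8, 'audience': ['assistant'], 'description': 'Research notes',
#     'chunking': {'recommended': True, 'maxSize': 1000}}

chunks = format_chunked_content("lorem ipsum " * 500, chunk_size=1000, overlap=100)
info = chunks[0]["annotations"]["chunk_info"]
print(len(chunks), info["index"], info["total_chunks"], info["has_more"])

print(CODE_RESOURCE.to_dict()["format"])  # -> {'structured': True}
```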
--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/__init__.py:
--------------------------------------------------------------------------------
```python
"""MCP Tools for Ultimate MCP Server."""
import inspect
import sys
from typing import Any, Dict
from ultimate_mcp_server.tools.base import (
BaseTool, # Keep BaseTool in case other modules use it
register_tool,
with_error_handling,
with_retry,
with_tool_metrics,
)
from ultimate_mcp_server.utils import get_logger
# from .audio_transcription import (
# chat_with_transcript,
# extract_audio_transcript_key_points,
# transcribe_audio,
# )
# Import base decorators/classes that might be used by other tool modules
from .completion import chat_completion, generate_completion, multi_completion, stream_completion
from .document_conversion_and_processing import (
analyze_pdf_structure,
batch_format_texts,
canonicalise_entities,
chunk_document,
clean_and_format_text_as_markdown,
convert_document,
detect_content_type,
enhance_ocr_text,
extract_entities,
extract_metrics,
extract_tables,
flag_risks,
generate_qa_pairs,
identify_sections,
ocr_image,
optimize_markdown_formatting,
process_document_batch,
summarize_document,
)
# from .docstring_refiner import refine_tool_documentation
# from .entity_relation_graph import extract_entity_graph
# from .extraction import (
# extract_code_from_response,
# extract_json,
# extract_key_value_pairs,
# extract_semantic_schema,
# extract_table,
# )
from .filesystem import (
create_directory,
directory_tree,
edit_file,
get_file_info,
get_unique_filepath,
list_allowed_directories,
list_directory,
move_file,
read_file,
read_multiple_files,
search_files,
write_file,
)
from .local_text_tools import (
get_workspace_dir,
run_awk,
run_awk_stream,
run_jq,
run_jq_stream,
run_ripgrep,
run_ripgrep_stream,
run_sed,
run_sed_stream,
)
# from .marqo_fused_search import marqo_fused_search
# from .meta_api_tool import register_api_meta_tools
from .optimization import (
compare_models,
estimate_cost,
execute_optimized_workflow,
recommend_model,
)
from .provider import get_provider_status, list_models
from .python_sandbox import (
execute_python,
repl_python,
)
# from .rag import (
# add_documents,
# create_knowledge_base,
# delete_knowledge_base,
# generate_with_rag,
# list_knowledge_bases,
# retrieve_context,
# )
from .sentiment_analysis import analyze_business_sentiment, analyze_business_text_batch
# from .single_shot_synthesis import single_shot_synthesis
from .smart_browser import (
autopilot,
browse,
click,
collect_documentation,
download,
download_site_pdfs,
parallel,
run_macro,
search,
type_text,
)
# from .sql_databases import access_audit_log, execute_sql, explore_database, manage_database
# from .text_classification import text_classification
# from .text_redline_tools import (
# compare_documents_redline,
# create_html_redline,
# )
# from .tournament import (
# cancel_tournament,
# create_tournament,
# get_tournament_results,
# get_tournament_status,
# list_tournaments,
# )
from .unified_memory_system import (
add_tag_to_memory,
consolidate_memories,
create_embedding,
create_goal,
create_memory_link,
create_workflow,
decay_link_strengths,
diagnose_file_access_issues,
focus_memory,
generate_reflection,
generate_workflow_report,
get_artifact_by_id,
get_artifacts,
get_contradictions,
get_embedding,
get_goal_details,
get_linked_memories,
get_memory_by_id,
get_memory_metadata,
get_memory_tags,
get_recent_actions,
get_rich_context_package,
get_similar_memories,
get_subgraph,
get_thought_chain,
get_workflow_details,
get_workflow_metadata,
get_working_memory,
hybrid_search_memories,
load_cognitive_state,
optimize_working_memory,
promote_memory_level,
query_goals,
query_memories,
record_action_completion,
record_action_start,
record_artifact,
save_cognitive_state,
store_memory,
update_goal_status,
update_memory,
update_memory_link_metadata,
update_memory_metadata,
vector_similarity,
)
__all__ = [
# Base decorators/classes
"BaseTool",
"with_tool_metrics",
"with_retry",
"with_error_handling",
"register_tool",
# LLM Completion tools
"generate_completion",
"stream_completion",
"chat_completion",
"multi_completion",
"get_provider_status",
"list_models",
# Extraction tools
# "extract_json",
# "extract_table",
# "extract_key_value_pairs",
# "extract_semantic_schema",
# "extract_entity_graph",
# "extract_code_from_response",
# Knowledge base tools
# "create_knowledge_base",
# "list_knowledge_bases",
# "delete_knowledge_base",
# "add_documents",
# "retrieve_context",
# "generate_with_rag",
# "text_classification",
# Cost optimization tools
"estimate_cost",
"compare_models",
"recommend_model",
"execute_optimized_workflow",
"refine_tool_documentation",
# Filesystem tools
"read_file",
"read_multiple_files",
"write_file",
"edit_file",
"create_directory",
"list_directory",
"directory_tree",
"move_file",
"search_files",
"get_file_info",
"list_allowed_directories",
"get_unique_filepath",
# Local Text Tools
"run_ripgrep",
"run_awk",
"run_sed",
"run_jq",
"run_ripgrep_stream",
"run_awk_stream",
"run_sed_stream",
"run_jq_stream",
"get_workspace_dir",
# SQL databases tools
# "manage_database",
# "execute_sql",
# "explore_database",
# "access_audit_log",
# Python sandbox tools
"execute_python",
"repl_python",
# Smart Browser Standalone Functions
"click",
"browse",
"type_text",
"search",
"download",
"download_site_pdfs",
"collect_documentation",
"parallel",
"run_macro",
"autopilot",
# Document conversion and processing tools
"convert_document",
"chunk_document",
"clean_and_format_text_as_markdown",
"detect_content_type",
"batch_format_texts",
"optimize_markdown_formatting",
"identify_sections",
"generate_qa_pairs",
"summarize_document",
"extract_metrics",
"flag_risks",
"canonicalise_entities",
"ocr_image",
"enhance_ocr_text",
"analyze_pdf_structure",
"process_document_batch",
"extract_entities",
"extract_tables",
# Text Redline tools
# "compare_documents_redline",
# "create_html_redline",
# Meta API tools
# "register_api_meta_tools",
# Marqo tool
# "marqo_fused_search",
# Tournament tools
# "create_tournament",
# "get_tournament_status",
# "list_tournaments",
# "get_tournament_results",
# "cancel_tournament",
# Audio tools
# "transcribe_audio",
# "extract_audio_transcript_key_points",
# "chat_with_transcript",
# Sentiment analysis tool
"analyze_business_sentiment",
"analyze_business_text_batch",
# Unified Memory System tools
"create_workflow",
"get_workflow_details",
"record_action_start",
"record_action_completion",
"get_recent_actions",
"get_thought_chain",
"store_memory",
"get_memory_by_id",
"get_memory_metadata",
"get_memory_tags",
"update_memory_metadata",
"update_memory_link_metadata",
"create_memory_link",
"get_workflow_metadata",
"get_contradictions",
"query_memories",
"update_memory",
"get_linked_memories",
"add_tag_to_memory",
"create_embedding",
"get_embedding",
"get_working_memory",
"focus_memory",
"optimize_working_memory",
"promote_memory_level",
"save_cognitive_state",
"load_cognitive_state",
"decay_link_strengths",
"generate_reflection",
"get_rich_context_package",
"get_goal_details",
"create_goal",
"update_goal_status",
"vector_similarity",
"record_artifact",
"get_artifacts",
"get_artifact_by_id",
"get_similar_memories",
"query_goals",
"consolidate_memories",
"diagnose_file_access_issues",
"generate_workflow_report",
"hybrid_search_memories",
"get_subgraph",
]
logger = get_logger("ultimate_mcp_server.tools")
# --- Tool Registration ---
# Generate STANDALONE_TOOL_FUNCTIONS by filtering __all__ for actual function objects
# This eliminates the redundancy between __all__ and STANDALONE_TOOL_FUNCTIONS
def _get_standalone_tool_functions():
"""Dynamically generates list of standalone tool functions from __all__."""
current_module = sys.modules[__name__]
standalone_functions = []
for item_name in __all__:
if item_name in ["BaseTool", "with_tool_metrics", "with_retry",
"with_error_handling", "register_tool"]:
# Skip base classes and decorators
continue
# Get the actual item from the module
item = getattr(current_module, item_name, None)
# Only include callable async functions (not classes or other exports)
if callable(item) and inspect.iscoroutinefunction(item):
standalone_functions.append(item)
return standalone_functions
# Get the list of standalone functions to register
STANDALONE_TOOL_FUNCTIONS = _get_standalone_tool_functions()
def register_all_tools(mcp_server) -> Dict[str, Any]:
"""Registers all tools (standalone and class-based) with the MCP server.
Args:
mcp_server: The MCP server instance.
Returns:
Dictionary containing information about registered tools.
"""
from ultimate_mcp_server.config import get_config
cfg = get_config()
filter_enabled = cfg.tool_registration.filter_enabled
included_tools = cfg.tool_registration.included_tools
excluded_tools = cfg.tool_registration.excluded_tools
logger.info("Registering tools based on configuration...")
if filter_enabled:
if included_tools:
logger.info(f"Tool filtering enabled: including only {len(included_tools)} specified tools")
if excluded_tools:
logger.info(f"Tool filtering enabled: excluding {len(excluded_tools)} specified tools")
registered_tools: Dict[str, Any] = {}
# --- Register Standalone Functions ---
standalone_count = 0
for tool_func in STANDALONE_TOOL_FUNCTIONS:
if not callable(tool_func) or not inspect.iscoroutinefunction(tool_func):
logger.warning(f"Item {getattr(tool_func, '__name__', repr(tool_func))} in STANDALONE_TOOL_FUNCTIONS is not a callable async function.")
continue
tool_name = tool_func.__name__
# Apply tool filtering logic
if filter_enabled:
# Skip if not in included_tools when included_tools is specified
if included_tools and tool_name not in included_tools:
logger.debug(f"Skipping tool {tool_name} (not in included_tools)")
continue
# Skip if in excluded_tools
if tool_name in excluded_tools:
logger.debug(f"Skipping tool {tool_name} (in excluded_tools)")
continue
# Register the tool
mcp_server.tool(name=tool_name)(tool_func)
registered_tools[tool_name] = {
"description": inspect.getdoc(tool_func) or "",
"type": "standalone_function"
}
logger.info(f"Registered tool function: {tool_name}", emoji_key="⚙️")
standalone_count += 1
# --- Register Class-Based Tools ---
# Register Meta API Tool
if (not filter_enabled or
"meta_api_tool" in included_tools or
(not included_tools and "meta_api_tool" not in excluded_tools)):
try:
from ultimate_mcp_server.tools.meta_api_tool import register_api_meta_tools
register_api_meta_tools(mcp_server)
logger.info("Registered API Meta-Tool functions", emoji_key="⚙️")
standalone_count += 1
except ImportError:
logger.warning("Meta API tools not found (ultimate_mcp_server.tools.meta_api_tool)")
except Exception as e:
logger.error(f"Failed to register Meta API tools: {e}", exc_info=True)
# Register Excel Spreadsheet Automation Tool
if (not filter_enabled or
"excel_spreadsheet_automation" in included_tools or
(not included_tools and "excel_spreadsheet_automation" not in excluded_tools)):
try:
from ultimate_mcp_server.tools.excel_spreadsheet_automation import (
WINDOWS_EXCEL_AVAILABLE,
register_excel_spreadsheet_tools,
)
if WINDOWS_EXCEL_AVAILABLE:
register_excel_spreadsheet_tools(mcp_server)
logger.info("Registered Excel spreadsheet tools", emoji_key="⚙️")
standalone_count += 1
else:
# Automatically exclude Excel tools if not available
logger.warning("Excel automation tools are only available on Windows with Excel installed. These tools will not be registered.")
# If not already explicitly excluded, add to excluded_tools
if "excel_spreadsheet_automation" not in excluded_tools:
if not cfg.tool_registration.filter_enabled:
cfg.tool_registration.filter_enabled = True
if not hasattr(cfg.tool_registration, "excluded_tools"):
cfg.tool_registration.excluded_tools = []
cfg.tool_registration.excluded_tools.append("excel_spreadsheet_automation")
except ImportError:
logger.warning("Excel spreadsheet tools not found (ultimate_mcp_server.tools.excel_spreadsheet_automation)")
except Exception as e:
logger.error(f"Failed to register Excel spreadsheet tools: {e}", exc_info=True)
logger.info(
f"Completed tool registration. Registered {standalone_count} tools.",
emoji_key="⚙️"
)
# Return info about registered tools
return registered_tools
```
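The per-tool filtering rule inside `register_all_tools` can be summarized by the small standalone predicate below. This is an illustrative restatement of the logic in this file, not code shipped with the package; the example tool names are arbitrary.

```python
# Illustrative restatement of the include/exclude decision in register_all_tools:
# filtering only applies when enabled, a non-empty include list acts as an
# allow-list, and the exclude list always wins for listed tools.
from typing import List


def should_register(tool_name: str, filter_enabled: bool,
                    included: List[str], excluded: List[str]) -> bool:
    if not filter_enabled:
        return True
    if included and tool_name not in included:
        return False
    return tool_name not in excluded


assert should_register("read_file", False, [], ["read_file"]) is True
assert should_register("read_file", True, ["read_file"], []) is True
assert should_register("write_file", True, ["read_file"], []) is False
assert should_register("read_file", True, [], ["read_file"]) is False
```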
--------------------------------------------------------------------------------
/examples/advanced_vector_search_demo.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
"""Demo of advanced vector search capabilities using real Ultimate MCP Server tools."""
import asyncio
import sys
import time
from pathlib import Path
# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))
from rich.markup import escape
from rich.rule import Rule
from ultimate_mcp_server.core.server import Gateway
from ultimate_mcp_server.services.vector import get_vector_db_service
from ultimate_mcp_server.services.vector.embeddings import cosine_similarity, get_embedding_service
# --- Add Marqo Tool Import ---
from ultimate_mcp_server.tools.marqo_fused_search import marqo_fused_search
from ultimate_mcp_server.utils import get_logger
from ultimate_mcp_server.utils.display import (
display_embedding_generation_results,
display_text_content_result,
display_vector_similarity_results,
parse_and_display_result,
)
# ---------------------------
# --- Add Rich Imports ---
from ultimate_mcp_server.utils.logging.console import console
# ----------------------
# Initialize logger
logger = get_logger("example.advanced_vector_search")
# Initialize global gateway
gateway = None
vector_service = None
embedding_service = None
async def setup_services():
"""Set up the gateway and vector service for demonstration."""
global gateway, vector_service, embedding_service
logger.info("Initializing gateway and services...", emoji_key="start")
gateway = Gateway("vector-demo", register_tools=False)
await gateway._initialize_providers()
embedding_service = get_embedding_service() # Gateway will provide API keys through provider system
vector_service = get_vector_db_service()
logger.success("Services initialized.", emoji_key="success")
async def embedding_generation_demo():
"""Demonstrate embedding generation with real providers using Rich."""
console.print(Rule("[bold blue]Embedding Generation Demo[/bold blue]"))
logger.info("Starting embedding generation demo", emoji_key="start")
text_samples = [
"Quantum computing leverages quantum mechanics to perform computations",
"Artificial intelligence systems can learn from data and improve over time",
"Cloud infrastructure enables scalable and flexible computing resources"
]
console.print("Input Text Samples:")
for i, sample in enumerate(text_samples):
console.print(f" {i+1}. {escape(sample)}")
# Define models to test (ensure they are supported by your embedding_service config)
models_to_test = [
"text-embedding-3-small",
"text-embedding-3-large",
"text-embedding-ada-002"
]
# Collect results for display
results_data = {"models": []}
for model_name in models_to_test:
try:
logger.info(f"Generating embeddings with {model_name}...", emoji_key="processing")
start_time = time.time()
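            # NOTE: create_embeddings is called below without an explicit model argument,
            # so the service's configured default embedding model is used on every
            # iteration of this loop (an observation about the call as written; passing
            # model_name through would require the service to accept a model parameter).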
embeddings = await embedding_service.create_embeddings(
texts=text_samples
)
processing_time = time.time() - start_time
model_result = {
"name": model_name,
"success": embeddings and len(embeddings) > 0,
"time": processing_time,
"cost": embedding_service.last_request_cost if hasattr(embedding_service, 'last_request_cost') else 0.0,
}
if embeddings and len(embeddings) > 0:
dims = len(embeddings[0])
model_result["dimensions"] = dims
model_result["embedding_sample"] = embeddings[0][:3]
logger.success(f"Generated {len(embeddings)} embeddings ({dims} dims) for {model_name}", emoji_key="success")
else:
logger.warning(f"No embeddings returned for {model_name}", emoji_key="warning")
results_data["models"].append(model_result)
except Exception as e:
logger.error(f"Error generating embeddings with {model_name}: {e}", emoji_key="error", exc_info=True)
results_data["models"].append({
"name": model_name,
"success": False,
"error": str(e)
})
# Use the shared display utility to show results
display_embedding_generation_results(results_data)
async def vector_search_demo():
"""Demonstrate vector search capabilities using Rich."""
console.print(Rule("[bold blue]Vector Search Demo[/bold blue]"))
logger.info("Starting vector search demo", emoji_key="start")
documents = [
"Quantum computing uses quantum bits or qubits to perform calculations.",
"Machine learning algorithms learn patterns from data without explicit programming.",
"Blockchain technology creates a distributed and immutable ledger of transactions.",
"Cloud computing delivers computing services over the internet on demand.",
"Natural language processing helps computers understand and interpret human language.",
"Artificial intelligence systems can simulate human intelligence in machines.",
"Edge computing processes data closer to where it is generated rather than in a centralized location.",
"Cybersecurity involves protecting systems from digital attacks and unauthorized access.",
"Internet of Things (IoT) connects everyday devices to the internet for data sharing.",
"Virtual reality creates an immersive computer-generated environment."
]
document_metadata = [
{"id": "doc1", "category": "quantum", "level": "advanced"},
{"id": "doc2", "category": "ai", "level": "intermediate"},
{"id": "doc3", "category": "blockchain", "level": "beginner"},
{"id": "doc4", "category": "cloud", "level": "intermediate"},
{"id": "doc5", "category": "ai", "level": "advanced"},
{"id": "doc6", "category": "ai", "level": "beginner"},
{"id": "doc7", "category": "cloud", "level": "advanced"},
{"id": "doc8", "category": "security", "level": "intermediate"},
{"id": "doc9", "category": "iot", "level": "beginner"},
{"id": "doc10", "category": "vr", "level": "intermediate"}
]
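    # documents[i] and document_metadata[i] describe the same item; add_texts below
    # pairs them positionally when writing to the collection.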
collection_name = "demo_vector_store_rich"
embedding_dimension = 1536 # Default for text-embedding-ada-002 / 3-small
try:
logger.info(f"Creating/resetting collection: {collection_name}", emoji_key="db")
await vector_service.create_collection(
name=collection_name,
dimension=embedding_dimension,
overwrite=True,
metadata={"description": "Demo collection for Rich vector search"}
)
logger.info("Adding documents to vector store...", emoji_key="processing")
ids = await vector_service.add_texts(
collection_name=collection_name,
texts=documents,
metadatas=document_metadata,
batch_size=5
)
logger.success(f"Added {len(ids)} documents.", emoji_key="success")
# --- Perform Searches ---
search_queries = [
"How does quantum computing work?",
"Machine learning for image recognition",
"Secure blockchain implementation"
]
console.print(Rule("[green]Vector Search Results[/green]"))
for query in search_queries:
logger.info(f'Searching for: "{escape(query)}"...', emoji_key="search")
search_start_time = time.time()
results = await vector_service.search_by_text(
collection_name=collection_name,
query_text=query,
top_k=3,
include_vectors=False,
# Example filter: metadata_filter={"category": "ai"}
)
search_time = time.time() - search_start_time
# Format the results for the display utility
search_result = {
"processing_time": search_time,
"results": results,
"query": query
}
# Use the shared display utility
parse_and_display_result(
title=f"Search: {query}",
input_data={"query": query},
result=search_result
)
except Exception as e:
logger.error(f"Error during vector search demo: {e}", emoji_key="error", exc_info=True)
console.print(f"[bold red]Error:[/bold red] {escape(str(e))}")
finally:
# Clean up the collection
try:
logger.info(f"Deleting collection: {collection_name}", emoji_key="db")
await vector_service.delete_collection(collection_name)
except Exception as delete_err:
logger.warning(f"Could not delete collection {collection_name}: {delete_err}", emoji_key="warning")
console.print()
async def hybrid_search_demo():
"""Demonstrate hybrid search using the marqo_fused_search tool."""
console.print(Rule("[bold blue]Hybrid Search Demo (using Marqo Fused Search Tool)[/bold blue]"))
logger.info("Starting hybrid search demo (conceptual)", emoji_key="start")
# This demo uses the marqo_fused_search tool, which performs hybrid search.
# It requires a running Marqo instance and a configured index
# as defined in marqo_index_config.json.
# Note: For this demo to work correctly, the configured Marqo index
# should contain documents related to the query, potentially including
# metadata fields like 'tags' if filtering is intended.
# The setup below is removed as the data needs to be pre-indexed in Marqo.
# collection_name = "demo_hybrid_store_rich"
# try:
# logger.info(f"Creating/resetting collection: {collection_name}", emoji_key="db")
# # ... [Code to create collection and add documents would go here if using local DB] ...
# except Exception as setup_e:
# logger.error(f"Failed to setup data for hybrid demo: {setup_e}", emoji_key="error")
# console.print(f"[bold red]Error setting up demo data: {escape(str(setup_e))}[/bold red]")
# return
try:
# --- Perform Hybrid Search via the marqo_fused_search tool ---
query = "cloud semantic search techniques"
# keywords = ["cloud", "semantic"] # Keywords can be included in query or filters
semantic_weight_param = 0.6 # Weight for semantic search (alpha)
logger.info(f'Hybrid search for: "{escape(query)}" with semantic weight {semantic_weight_param}', emoji_key="search")
# Call the marqo_fused_search tool directly
hybrid_result = await marqo_fused_search(
query=query,
limit=3, # Request top 3 results
semantic_weight=semantic_weight_param
# Add filters={}, date_range=None etc. if needed based on schema
)
display_text_content_result(
f"Hybrid Search Results (Weight={semantic_weight_param})",
hybrid_result # Pass the result dict directly
)
except Exception as e:
logger.error(f"Error during hybrid search demo: {e}", emoji_key="error", exc_info=True)
console.print(f"[bold red]Error:[/bold red] {escape(str(e))}")
# Removed cleanup as we assume Marqo index exists independently
console.print()
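
# Illustration only (hypothetical helper, not used by the demo above): one common way to
# fuse lexical and semantic scores with a single weight, analogous in spirit to the
# semantic_weight (alpha) passed to marqo_fused_search. Marqo's actual fusion strategy is
# configured on the index/server side and may differ from this simple linear blend.
def blend_scores(lexical_score: float, semantic_score: float, semantic_weight: float = 0.6) -> float:
    """Linearly combine a keyword (lexical) score and a vector (semantic) score."""
    return semantic_weight * semantic_score + (1.0 - semantic_weight) * lexical_score
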
async def semantic_similarity_demo():
"""Demonstrate calculating semantic similarity using Rich."""
console.print(Rule("[bold blue]Semantic Similarity Demo[/bold blue]"))
logger.info("Starting semantic similarity demo", emoji_key="start")
text_pairs = [
("The cat sat on the mat", "A feline was resting upon the rug"),
("AI is transforming industries", "Artificial intelligence drives innovation"),
("Cloud computing offers scalability", "The weather today is sunny")
]
model_name = "text-embedding-ada-002" # Use a consistent model
logger.info(f"Calculating similarity using model: {model_name}", emoji_key="model")
# Prepare data structure for the shared display utility
similarity_data = {
"pairs": [],
"model": model_name
}
try:
all_texts = [text for pair in text_pairs for text in pair]
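        # The pairs were flattened in order, so for pair i the two embeddings come back
        # at indices 2*i and 2*i + 1, which is what the index math below relies on.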
embeddings = await embedding_service.create_embeddings(
texts=all_texts
)
if len(embeddings) == len(all_texts):
for i, pair in enumerate(text_pairs):
idx1 = i * 2
idx2 = i * 2 + 1
score = cosine_similarity(embeddings[idx1], embeddings[idx2])
similarity_data["pairs"].append({
"text1": pair[0],
"text2": pair[1],
"score": score
})
# Use the specialized display function for similarity results
display_vector_similarity_results(similarity_data)
else:
logger.error("Mismatch between number of texts and embeddings received.", emoji_key="error")
console.print("[red]Error calculating similarities: Embedding count mismatch.[/red]")
except Exception as e:
logger.error(f"Error calculating semantic similarity: {e}", emoji_key="error", exc_info=True)
console.print(f"[bold red]Error:[/bold red] {escape(str(e))}")
console.print()
async def main():
"""Run all advanced vector search demonstrations."""
await setup_services()
console.print(Rule("[bold magenta]Advanced Vector Search Demos Starting[/bold magenta]"))
try:
await embedding_generation_demo()
await vector_search_demo()
await hybrid_search_demo()
await semantic_similarity_demo()
except Exception as e:
logger.critical(f"Vector search demo failed: {str(e)}", emoji_key="critical", exc_info=True)
console.print(f"[bold red]Critical Demo Error:[/bold red] {escape(str(e))}")
return 1
logger.success("Advanced Vector Search Demos Finished Successfully!", emoji_key="complete")
console.print(Rule("[bold magenta]Advanced Vector Search Demos Complete[/bold magenta]"))
return 0
if __name__ == "__main__":
exit_code = asyncio.run(main())
sys.exit(exit_code)
```
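For reference, the pairwise scores in the semantic similarity demo come from `cosine_similarity`, imported from `ultimate_mcp_server.services.vector.embeddings`. A minimal generic sketch of the standard definition such a helper typically implements (this is not the library's code; its actual signature and edge-case handling may differ):
```python
import numpy as np

def cosine_similarity_ref(a: list[float], b: list[float]) -> float:
    """Standard cosine similarity: (a . b) / (||a|| * ||b||), with a zero-vector guard."""
    a_arr = np.asarray(a, dtype=float)
    b_arr = np.asarray(b, dtype=float)
    denom = float(np.linalg.norm(a_arr) * np.linalg.norm(b_arr))
    return float(np.dot(a_arr, b_arr)) / denom if denom else 0.0

# Identical directions score 1.0; orthogonal vectors score 0.0.
assert abs(cosine_similarity_ref([1.0, 0.0], [2.0, 0.0]) - 1.0) < 1e-9
assert abs(cosine_similarity_ref([1.0, 0.0], [0.0, 3.0])) < 1e-9
```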