This is page 5 of 35. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?page={x} to view the full context.
# Directory Structure
```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│   ├── __init__.py
│   ├── advanced_agent_flows_using_unified_memory_system_demo.py
│   ├── advanced_extraction_demo.py
│   ├── advanced_unified_memory_system_demo.py
│   ├── advanced_vector_search_demo.py
│   ├── analytics_reporting_demo.py
│   ├── audio_transcription_demo.py
│   ├── basic_completion_demo.py
│   ├── cache_demo.py
│   ├── claude_integration_demo.py
│   ├── compare_synthesize_demo.py
│   ├── cost_optimization.py
│   ├── data
│   │   ├── sample_event.txt
│   │   ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│   │   └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│   ├── docstring_refiner_demo.py
│   ├── document_conversion_and_processing_demo.py
│   ├── entity_relation_graph_demo.py
│   ├── filesystem_operations_demo.py
│   ├── grok_integration_demo.py
│   ├── local_text_tools_demo.py
│   ├── marqo_fused_search_demo.py
│   ├── measure_model_speeds.py
│   ├── meta_api_demo.py
│   ├── multi_provider_demo.py
│   ├── ollama_integration_demo.py
│   ├── prompt_templates_demo.py
│   ├── python_sandbox_demo.py
│   ├── rag_example.py
│   ├── research_workflow_demo.py
│   ├── sample
│   │   ├── article.txt
│   │   ├── backprop_paper.pdf
│   │   ├── buffett.pdf
│   │   ├── contract_link.txt
│   │   ├── legal_contract.txt
│   │   ├── medical_case.txt
│   │   ├── northwind.db
│   │   ├── research_paper.txt
│   │   ├── sample_data.json
│   │   └── text_classification_samples
│   │       ├── email_classification.txt
│   │       ├── news_samples.txt
│   │       ├── product_reviews.txt
│   │       └── support_tickets.txt
│   ├── sample_docs
│   │   └── downloaded
│   │       └── attention_is_all_you_need.pdf
│   ├── sentiment_analysis_demo.py
│   ├── simple_completion_demo.py
│   ├── single_shot_synthesis_demo.py
│   ├── smart_browser_demo.py
│   ├── sql_database_demo.py
│   ├── sse_client_demo.py
│   ├── test_code_extraction.py
│   ├── test_content_detection.py
│   ├── test_ollama.py
│   ├── text_classification_demo.py
│   ├── text_redline_demo.py
│   ├── tool_composition_examples.py
│   ├── tournament_code_demo.py
│   ├── tournament_text_demo.py
│   ├── unified_memory_system_demo.py
│   ├── vector_search_demo.py
│   ├── web_automation_instruction_packs.py
│   └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│   └── smart_browser_internal
│       ├── locator_cache.db
│       ├── readability.js
│       └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── integration
│   │   ├── __init__.py
│   │   └── test_server.py
│   ├── manual
│   │   ├── test_extraction_advanced.py
│   │   └── test_extraction.py
│   └── unit
│       ├── __init__.py
│       ├── test_cache.py
│       ├── test_providers.py
│       └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│   ├── __init__.py
│   ├── __main__.py
│   ├── cli
│   │   ├── __init__.py
│   │   ├── __main__.py
│   │   ├── commands.py
│   │   ├── helpers.py
│   │   └── typer_cli.py
│   ├── clients
│   │   ├── __init__.py
│   │   ├── completion_client.py
│   │   └── rag_client.py
│   ├── config
│   │   └── examples
│   │       └── filesystem_config.yaml
│   ├── config.py
│   ├── constants.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── evaluation
│   │   │   ├── base.py
│   │   │   └── evaluators.py
│   │   ├── providers
│   │   │   ├── __init__.py
│   │   │   ├── anthropic.py
│   │   │   ├── base.py
│   │   │   ├── deepseek.py
│   │   │   ├── gemini.py
│   │   │   ├── grok.py
│   │   │   ├── ollama.py
│   │   │   ├── openai.py
│   │   │   └── openrouter.py
│   │   ├── server.py
│   │   ├── state_store.py
│   │   ├── tournaments
│   │   │   ├── manager.py
│   │   │   ├── tasks.py
│   │   │   └── utils.py
│   │   └── ums_api
│   │       ├── __init__.py
│   │       ├── ums_database.py
│   │       ├── ums_endpoints.py
│   │       ├── ums_models.py
│   │       └── ums_services.py
│   ├── exceptions.py
│   ├── graceful_shutdown.py
│   ├── services
│   │   ├── __init__.py
│   │   ├── analytics
│   │   │   ├── __init__.py
│   │   │   ├── metrics.py
│   │   │   └── reporting.py
│   │   ├── cache
│   │   │   ├── __init__.py
│   │   │   ├── cache_service.py
│   │   │   ├── persistence.py
│   │   │   ├── strategies.py
│   │   │   └── utils.py
│   │   ├── cache.py
│   │   ├── document.py
│   │   ├── knowledge_base
│   │   │   ├── __init__.py
│   │   │   ├── feedback.py
│   │   │   ├── manager.py
│   │   │   ├── rag_engine.py
│   │   │   ├── retriever.py
│   │   │   └── utils.py
│   │   ├── prompts
│   │   │   ├── __init__.py
│   │   │   ├── repository.py
│   │   │   └── templates.py
│   │   ├── prompts.py
│   │   └── vector
│   │       ├── __init__.py
│   │       ├── embeddings.py
│   │       └── vector_service.py
│   ├── tool_token_counter.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── audio_transcription.py
│   │   ├── base.py
│   │   ├── completion.py
│   │   ├── docstring_refiner.py
│   │   ├── document_conversion_and_processing.py
│   │   ├── enhanced-ums-lookbook.html
│   │   ├── entity_relation_graph.py
│   │   ├── excel_spreadsheet_automation.py
│   │   ├── extraction.py
│   │   ├── filesystem.py
│   │   ├── html_to_markdown.py
│   │   ├── local_text_tools.py
│   │   ├── marqo_fused_search.py
│   │   ├── meta_api_tool.py
│   │   ├── ocr_tools.py
│   │   ├── optimization.py
│   │   ├── provider.py
│   │   ├── pyodide_boot_template.html
│   │   ├── python_sandbox.py
│   │   ├── rag.py
│   │   ├── redline-compiled.css
│   │   ├── sentiment_analysis.py
│   │   ├── single_shot_synthesis.py
│   │   ├── smart_browser.py
│   │   ├── sql_databases.py
│   │   ├── text_classification.py
│   │   ├── text_redline_tools.py
│   │   ├── tournament.py
│   │   ├── ums_explorer.html
│   │   └── unified_memory_system.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── async_utils.py
│   │   ├── display.py
│   │   ├── logging
│   │   │   ├── __init__.py
│   │   │   ├── console.py
│   │   │   ├── emojis.py
│   │   │   ├── formatter.py
│   │   │   ├── logger.py
│   │   │   ├── panels.py
│   │   │   ├── progress.py
│   │   │   └── themes.py
│   │   ├── parse_yaml.py
│   │   ├── parsing.py
│   │   ├── security.py
│   │   └── text.py
│   └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/examples/rag_example.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""Example of using the RAG functionality with Ultimate MCP Server."""
import asyncio
import sys
from pathlib import Path
# Add parent directory to path to import ultimate_mcp_server
sys.path.insert(0, str(Path(__file__).parent.parent))
from rich.panel import Panel
from rich.rule import Rule
from rich.table import Table
from ultimate_mcp_server.core.server import Gateway
from ultimate_mcp_server.services.knowledge_base import (
get_knowledge_base_manager,
get_knowledge_base_retriever,
)
from ultimate_mcp_server.utils import get_logger
from ultimate_mcp_server.utils.display import CostTracker
from ultimate_mcp_server.utils.logging.console import console
# Initialize logger
logger = get_logger("rag_example")
# Sample documents about different AI technologies
AI_DOCUMENTS = [
"""Transformers are a type of neural network architecture introduced in the paper
"Attention is All You Need" by Vaswani et al. in 2017. They use self-attention
mechanisms to process sequential data, making them highly effective for natural
language processing tasks. Unlike recurrent neural networks (RNNs), transformers
process entire sequences in parallel, which allows for more efficient training.
The original transformer architecture consists of an encoder and a decoder, each
made up of multiple layers of self-attention and feed-forward neural networks.""",
"""Retrieval-Augmented Generation (RAG) is an AI framework that combines the
strengths of retrieval-based and generation-based approaches. In RAG systems,
a retrieval component first finds relevant information from a knowledge base,
and then a generation component uses this information to produce more accurate,
factual, and contextually relevant outputs. RAG helps to mitigate hallucination
issues in large language models by grounding the generation in retrieved facts.""",
"""Reinforcement Learning from Human Feedback (RLHF) is a technique used to align
language models with human preferences. The process typically involves three steps:
First, a language model is pre-trained on a large corpus of text. Second, human
evaluators rank different model outputs, creating a dataset of preferred responses.
Third, this dataset is used to train a reward model, which is then used to fine-tune
the language model using reinforcement learning techniques such as Proximal Policy
Optimization (PPO).""",
"""Mixture of Experts (MoE) is an architecture where multiple specialized neural
networks (experts) are trained to handle different types of inputs or tasks. A
gating network determines which expert(s) should process each input. This approach
allows for larger model capacity without a proportional increase in computational
costs, as only a subset of the parameters is activated for any given input. MoE
has been successfully applied in large language models like Google's Switch
Transformer and Mistral AI's Mixtral."""
]
AI_METADATAS = [
{"title": "Transformers", "source": "AI Handbook", "type": "architecture"},
{"title": "Retrieval-Augmented Generation", "source": "AI Handbook", "type": "technique"},
{"title": "RLHF", "source": "AI Handbook", "type": "technique"},
{"title": "Mixture of Experts", "source": "AI Handbook", "type": "architecture"}
]
EXAMPLE_QUERIES = [
"How do transformers work?",
"What is retrieval-augmented generation?",
"Compare RLHF and MoE approaches."
]
KB_NAME = "ai_technologies"
async def run_rag_demo(tracker: CostTracker):
"""Run the complete RAG demonstration."""
console.print("[bold blue]RAG Example with Ultimate MCP Server[/bold blue]")
console.print("This example demonstrates the RAG functionality using direct knowledge base services.")
console.print()
# Initialize Gateway for proper provider and API key management
gateway = Gateway("rag-example", register_tools=False)
await gateway._initialize_providers()
# Get knowledge base services directly
kb_manager = get_knowledge_base_manager()
kb_retriever = get_knowledge_base_retriever()
# Clean up any existing knowledge base with the same name before starting
console.print(Rule("[bold blue]Cleaning Up Previous Runs[/bold blue]"))
# Force a clean start
try:
# Get direct reference to the vector service
from ultimate_mcp_server.services.vector import get_vector_db_service
vector_service = get_vector_db_service()
# Try a more aggressive approach by resetting chromadb client directly
if hasattr(vector_service, 'chroma_client') and vector_service.chroma_client:
try:
# First try standard deletion
try:
vector_service.chroma_client.delete_collection(KB_NAME)
logger.info("Successfully deleted ChromaDB collection using client API")
except Exception as e:
logger.debug(f"Standard ChromaDB deletion failed: {str(e)}")
# Wait longer to ensure deletion propagates
await asyncio.sleep(1.0)
# Force reset the ChromaDB client when all else fails
if hasattr(vector_service.chroma_client, 'reset'):
try:
vector_service.chroma_client.reset()
logger.info("Reset ChromaDB client to ensure clean start")
await asyncio.sleep(0.5)
except Exception as e:
logger.warning(f"Failed to reset ChromaDB client: {str(e)}")
except Exception as e:
logger.warning(f"Error with ChromaDB client manipulation: {str(e)}")
# Try to delete at the vector database level again
try:
await vector_service.delete_collection(KB_NAME)
logger.info(f"Directly deleted vector collection '{KB_NAME}'")
await asyncio.sleep(0.5)
except Exception as e:
logger.warning(f"Error directly deleting vector collection: {str(e)}")
# Also try to delete at the knowledge base level
try:
kb_info = await kb_manager.get_knowledge_base(KB_NAME)
if kb_info and kb_info.get("status") != "not_found":
await kb_manager.delete_knowledge_base(name=KB_NAME)
logger.info(f"Deleted existing knowledge base '{KB_NAME}'")
await asyncio.sleep(0.5)
except Exception as e:
logger.warning(f"Error deleting knowledge base: {str(e)}")
logger.info("Cleanup completed, proceeding with clean start")
except Exception as e:
logger.warning(f"Error during initial cleanup: {str(e)}")
console.print()
# Step 1: Create knowledge base
console.print(Rule("[bold blue]Step 1: Creating Knowledge Base[/bold blue]"))
try:
await kb_manager.create_knowledge_base(
name=KB_NAME,
description="Information about various AI technologies",
embedding_model="text-embedding-3-small",
overwrite=True
)
logger.success(f"Knowledge base created: {KB_NAME}", emoji_key="success")
except Exception as e:
logger.error(f"Failed to create knowledge base: {str(e)}", emoji_key="error")
return 1
console.print()
# Step 2: Add documents
console.print(Rule("[bold blue]Step 2: Adding Documents[/bold blue]"))
try:
result = await kb_manager.add_documents(
knowledge_base_name=KB_NAME,
documents=AI_DOCUMENTS,
metadatas=AI_METADATAS,
embedding_model="text-embedding-3-small",
chunk_size=1000,
chunk_method="semantic"
)
added_count = result.get("added_count", 0)
logger.success(f"Added {added_count} documents to knowledge base", emoji_key="success")
except Exception as e:
logger.error(f"Failed to add documents: {str(e)}", emoji_key="error")
return 1
console.print()
# Step 3: List knowledge bases
console.print(Rule("[bold blue]Step 3: Listing Knowledge Bases[/bold blue]"))
try:
knowledge_bases = await kb_manager.list_knowledge_bases()
# Create a Rich table for display
table = Table(title="Available Knowledge Bases", box=None)
table.add_column("Name", style="cyan")
table.add_column("Description", style="green")
table.add_column("Document Count", style="magenta")
# Handle various return types
try:
if knowledge_bases is None:
table.add_row("No knowledge bases found", "", "")
elif isinstance(knowledge_bases, dict):
# Handle dictionary response
kb_names = knowledge_bases.get("knowledge_bases", [])
if isinstance(kb_names, list):
for kb_item in kb_names:
if isinstance(kb_item, dict):
# Extract name and metadata from dictionary
name = kb_item.get("name", "Unknown")
metadata = kb_item.get("metadata", {})
description = metadata.get("description", "No description") if isinstance(metadata, dict) else "No description"
doc_count = metadata.get("doc_count", "Unknown") if isinstance(metadata, dict) else "Unknown"
table.add_row(str(name), str(description), str(doc_count))
else:
table.add_row(str(kb_item), "No description available", "Unknown")
else:
table.add_row("Error parsing response", "", "")
elif isinstance(knowledge_bases, list):
# Handle list response
for kb in knowledge_bases:
if isinstance(kb, str):
table.add_row(kb, "No description", "0")
elif isinstance(kb, dict):
name = kb.get("name", "Unknown")
metadata = kb.get("metadata", {})
description = metadata.get("description", "No description") if isinstance(metadata, dict) else "No description"
doc_count = metadata.get("doc_count", "Unknown") if isinstance(metadata, dict) else "Unknown"
table.add_row(str(name), str(description), str(doc_count))
else:
kb_name = str(getattr(kb, 'name', str(kb)))
table.add_row(kb_name, "No description", "0")
else:
# Fallback for unexpected response type
table.add_row(f"Unexpected response: {type(knowledge_bases)}", "", "")
console.print(table)
except Exception as e:
logger.error(f"Error rendering knowledge bases table: {str(e)}", emoji_key="error")
# Simple fallback display
console.print(f"Knowledge bases available: {knowledge_bases}")
except Exception as e:
logger.error(f"Failed to list knowledge bases: {str(e)}", emoji_key="error")
console.print()
# Step 4: Retrieve context for first query
console.print(Rule("[bold blue]Step 4: Retrieving Context[/bold blue]"))
query = EXAMPLE_QUERIES[0]
logger.info(f"Retrieving context for query: '{query}'", emoji_key="processing")
# Default fallback document if retrieval fails
retrieved_results = []
try:
try:
results = await kb_retriever.retrieve(
knowledge_base_name=KB_NAME,
query=query,
top_k=2,
min_score=0.0, # Set min_score to 0 to see all results
embedding_model="text-embedding-3-small" # Use the same embedding model as when adding documents
)
retrieved_results = results.get('results', [])
# Debug raw results
logger.debug(f"Raw retrieval results: {results}")
except Exception as e:
logger.error(f"Error retrieving from knowledge base: {str(e)}", emoji_key="error")
# Fallback to using the documents directly
retrieved_results = [
{
"document": AI_DOCUMENTS[0],
"score": 0.95,
"metadata": AI_METADATAS[0]
}
]
console.print(f"Retrieved {len(retrieved_results)} results for query: '{query}'")
# Display results in panels
if retrieved_results:
for i, doc in enumerate(retrieved_results):
try:
score = doc.get('score', 0.0)
document = doc.get('document', '')
metadata = doc.get('metadata', {})
source = metadata.get('title', 'Unknown') if isinstance(metadata, dict) else 'Unknown'
console.print(Panel(
f"[bold]Document {i+1}[/bold] (score: {score:.2f})\n" +
f"[italic]{document[:150]}...[/italic]",
title=f"Source: {source}",
border_style="blue"
))
except Exception as e:
logger.error(f"Error displaying document {i}: {str(e)}", emoji_key="error")
else:
console.print(Panel(
"[italic]No results found. Using sample document as fallback for demonstration.[/italic]",
title="No Results",
border_style="yellow"
))
# Create a fallback document for the next step
retrieved_results = [
{
"document": AI_DOCUMENTS[0],
"score": 0.0,
"metadata": AI_METADATAS[0]
}
]
except Exception as e:
logger.error(f"Failed to process retrieval results: {str(e)}", emoji_key="error")
# Ensure we have something to continue with
retrieved_results = [
{
"document": AI_DOCUMENTS[0],
"score": 0.0,
"metadata": AI_METADATAS[0]
}
]
console.print()
# Step 5: Generate completions using retrieved context for the first query
console.print(Rule("[bold blue]Step 5: Generating Response with Retrieved Context[/bold blue]"))
query = EXAMPLE_QUERIES[0]
console.print(f"\n[bold]Query:[/bold] {query}")
try:
# Get the provider
provider_key = "gemini"
provider = gateway.providers.get(provider_key)
if not provider:
provider_key = "openai"
provider = gateway.providers.get(provider_key) # Fallback
if not provider:
logger.error("No suitable provider found", emoji_key="error")
return 1
# Use a hardcoded model based on provider type
if provider_key == "gemini":
model = "gemini-2.0-flash-lite"
elif provider_key == "openai":
model = "gpt-4.1-mini"
elif provider_key == "anthropic":
model = "claude-3-haiku-latest"
else:
# Get first available model or fallback
models = getattr(provider, 'available_models', [])
model = models[0] if models else "unknown-model"
# Prepare context from retrieved documents
if retrieved_results:
context = "\n\n".join([doc.get("document", "") for doc in retrieved_results if doc.get("document")])
else:
# Fallback to using the first document directly if no results
context = AI_DOCUMENTS[0]
# Build prompt with context
prompt = f"""Answer the following question based on the provided context.
If the context doesn't contain relevant information, say so.
Context:
{context}
Question: {query}
Answer:"""
# Generate response
response = await provider.generate_completion(
prompt=prompt,
model=model,
temperature=0.3,
max_tokens=300
)
# Display the answer
console.print(Panel(
response.text,
title=f"Answer from {provider_key}/{model}",
border_style="green"
))
# Display usage stats
metrics_table = Table(title="Performance Metrics", box=None)
metrics_table.add_column("Metric", style="cyan")
metrics_table.add_column("Value", style="white")
metrics_table.add_row("Input Tokens", str(response.input_tokens))
metrics_table.add_row("Output Tokens", str(response.output_tokens))
metrics_table.add_row("Processing Time", f"{response.processing_time:.2f}s")
metrics_table.add_row("Cost", f"${response.cost:.6f}")
console.print(metrics_table)
# Track the generation call
tracker.add_call(response)
except Exception as e:
logger.error(f"Failed to generate response: {str(e)}", emoji_key="error")
console.print()
# Step 6: Clean up
console.print(Rule("[bold blue]Step 6: Cleaning Up[/bold blue]"))
# Display cost summary before final cleanup
tracker.display_summary(console)
try:
await kb_manager.delete_knowledge_base(name=KB_NAME)
logger.success(f"Knowledge base {KB_NAME} deleted successfully", emoji_key="success")
except Exception as e:
logger.error(f"Failed to delete knowledge base: {str(e)}", emoji_key="error")
return 1
return 0
async def main():
"""Run the RAG example."""
tracker = CostTracker() # Instantiate tracker
try:
exit_code = await run_rag_demo(tracker)  # Pass tracker and capture the demo's exit code
except Exception as e:
logger.critical(f"RAG demo failed unexpectedly: {e}", exc_info=True)
return 1
return exit_code
if __name__ == "__main__":
# Run the demonstration
exit_code = asyncio.run(main())
sys.exit(exit_code)
```
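A minimal sketch of the same knowledge-base round trip, stripped of the Rich display and fallback handling above. It reuses only the service calls and keyword arguments that appear in `rag_example.py`, and it assumes provider/API-key setup (handled by `Gateway` in the demo) is already in place; the knowledge-base name and document text are illustrative.
```python
# Minimal RAG round trip (sketch), using the same service APIs as rag_example.py above.
import asyncio

from ultimate_mcp_server.services.knowledge_base import (
    get_knowledge_base_manager,
    get_knowledge_base_retriever,
)


async def tiny_rag_roundtrip() -> None:
    kb_manager = get_knowledge_base_manager()
    kb_retriever = get_knowledge_base_retriever()

    # Create a scratch knowledge base and index a single document.
    await kb_manager.create_knowledge_base(
        name="scratch_kb",  # illustrative name
        description="Scratch knowledge base",
        embedding_model="text-embedding-3-small",
        overwrite=True,
    )
    await kb_manager.add_documents(
        knowledge_base_name="scratch_kb",
        documents=["RAG grounds generation in retrieved facts."],
        metadatas=[{"title": "RAG note"}],
        embedding_model="text-embedding-3-small",
        chunk_size=1000,
        chunk_method="semantic",
    )

    # Retrieve the best match, then clean up.
    results = await kb_retriever.retrieve(
        knowledge_base_name="scratch_kb",
        query="What does RAG do?",
        top_k=1,
        embedding_model="text-embedding-3-small",
    )
    print(results.get("results", []))
    await kb_manager.delete_knowledge_base(name="scratch_kb")


if __name__ == "__main__":
    asyncio.run(tiny_rag_roundtrip())
```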
--------------------------------------------------------------------------------
/ultimate_mcp_server/core/tournaments/manager.py:
--------------------------------------------------------------------------------
```python
# --- core/tournaments/manager.py (Updates) ---
import asyncio
import json
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from pydantic import ValidationError
import ultimate_mcp_server.core.evaluation.evaluators # Ensures evaluators are registered # noqa: F401
from ultimate_mcp_server.core.evaluation.base import EVALUATOR_REGISTRY, Evaluator
from ultimate_mcp_server.core.models.tournament import (
CreateTournamentInput,
TournamentConfig, # ModelConfig is nested in TournamentConfig from CreateTournamentInput
TournamentData,
TournamentRoundResult,
TournamentStatus,
)
from ultimate_mcp_server.core.models.tournament import (
ModelConfig as CoreModelConfig, # Alias to avoid confusion
)
from ultimate_mcp_server.utils import get_logger
logger = get_logger("ultimate_mcp_server.tournaments.manager")
STORAGE_DIR = Path(__file__).resolve().parent.parent.parent.parent / "storage"
TOURNAMENT_STORAGE_BASE = STORAGE_DIR / "tournaments"
class TournamentManager:
def __init__(self):
self.tournaments: Dict[str, TournamentData] = {}
# --- NEW: Store instantiated evaluators per tournament ---
self.tournament_evaluators: Dict[str, List[Evaluator]] = {}
TOURNAMENT_STORAGE_BASE.mkdir(parents=True, exist_ok=True)
logger.info(f"Tournament storage initialized at: {TOURNAMENT_STORAGE_BASE}")
self._load_all_tournaments()
def _instantiate_evaluators(self, tournament_id: str, config: TournamentConfig) -> bool:
"""Instantiates and stores evaluators for a tournament."""
self.tournament_evaluators[tournament_id] = []
for eval_config in config.evaluators:
evaluator_cls = EVALUATOR_REGISTRY.get(eval_config.type)
if not evaluator_cls:
logger.error(f"Unknown evaluator type '{eval_config.type}' for tournament {tournament_id}. Skipping.")
# Optionally, fail tournament creation if a critical evaluator is missing
continue
try:
self.tournament_evaluators[tournament_id].append(evaluator_cls(eval_config.params))
logger.info(f"Instantiated evaluator '{eval_config.type}' (ID: {eval_config.evaluator_id}) for tournament {tournament_id}")
except Exception as e:
logger.error(f"Failed to instantiate evaluator '{eval_config.type}' (ID: {eval_config.evaluator_id}): {e}", exc_info=True)
# Decide if this is a fatal error for the tournament
return False # Example: Fail if any evaluator instantiation fails
return True
def get_evaluators_for_tournament(self, tournament_id: str) -> List[Evaluator]:
"""Returns the list of instantiated evaluators for a given tournament."""
return self.tournament_evaluators.get(tournament_id, [])
def create_tournament(self, input_data: CreateTournamentInput) -> Optional[TournamentData]:
try:
logger.debug(f"Creating tournament with name: {input_data.name}, {len(input_data.model_configs)} model configs")
# Map input ModelConfig to core ModelConfig used in TournamentConfig
core_model_configs = [
CoreModelConfig(
model_id=mc.model_id,
diversity_count=mc.diversity_count,
temperature=mc.temperature,
max_tokens=mc.max_tokens,
system_prompt=mc.system_prompt,
seed=mc.seed
) for mc in input_data.model_configs
]
tournament_cfg = TournamentConfig(
name=input_data.name,
prompt=input_data.prompt,
models=core_model_configs, # Use the mapped core_model_configs
rounds=input_data.rounds,
tournament_type=input_data.tournament_type,
extraction_model_id=input_data.extraction_model_id,
evaluators=input_data.evaluators, # Pass evaluator configs
max_retries_per_model_call=input_data.max_retries_per_model_call,
retry_backoff_base_seconds=input_data.retry_backoff_base_seconds,
max_concurrent_model_calls=input_data.max_concurrent_model_calls
)
tournament = TournamentData(
name=input_data.name,
config=tournament_cfg,
current_round=-1, # Initialize current_round
start_time=None, # Will be set when execution starts
end_time=None
)
tournament.storage_path = str(self._get_storage_path(tournament.tournament_id, tournament.name)) # Pass name for better paths
# --- NEW: Instantiate evaluators ---
if not self._instantiate_evaluators(tournament.tournament_id, tournament.config):
logger.error(f"Failed to instantiate one or more evaluators for tournament {tournament.name}. Creation aborted.")
# Clean up if necessary, e.g., remove from self.tournament_evaluators
if tournament.tournament_id in self.tournament_evaluators:
del self.tournament_evaluators[tournament.tournament_id]
return None # Or raise an error
self.tournaments[tournament.tournament_id] = tournament
self._save_tournament_state(tournament)
logger.info(f"Tournament '{tournament.name}' (ID: {tournament.tournament_id}) created successfully.")
return tournament
except ValidationError as ve:
logger.error(f"Tournament validation failed: {ve}")
return None
except Exception as e:
logger.error(f"Unexpected error creating tournament: {e}", exc_info=True)
return None
def get_tournament(self, tournament_id: str, force_reload: bool = False) -> Optional[TournamentData]:
logger.debug(f"Getting tournament {tournament_id} (force_reload={force_reload})")
if not force_reload and tournament_id in self.tournaments:
return self.tournaments[tournament_id]
tournament = self._load_tournament_state(tournament_id)
if tournament:
# --- NEW: Ensure evaluators are loaded/re-instantiated if not present ---
if tournament_id not in self.tournament_evaluators:
logger.info(f"Evaluators for tournament {tournament_id} not in memory, re-instantiating from config.")
if not self._instantiate_evaluators(tournament_id, tournament.config):
logger.error(f"Failed to re-instantiate evaluators for loaded tournament {tournament_id}. Evaluation might fail.")
self.tournaments[tournament_id] = tournament # Update cache
return tournament
def _save_tournament_state(self, tournament: TournamentData):
if not tournament.storage_path:
logger.error(f"Cannot save state for tournament {tournament.tournament_id}: storage_path not set.")
return
state_file = Path(tournament.storage_path) / "tournament_state.json"
try:
state_file.parent.mkdir(parents=True, exist_ok=True)
# Pydantic's model_dump_json handles datetime to ISO string conversion
json_data = tournament.model_dump_json(indent=2)
with open(state_file, 'w', encoding='utf-8') as f:
f.write(json_data)
logger.debug(f"Saved state for tournament {tournament.tournament_id} to {state_file}")
except IOError as e:
logger.error(f"Failed to save state for tournament {tournament.tournament_id}: {e}")
except Exception as e: # Catch other potential errors from model_dump_json
logger.error(f"Error serializing tournament state for {tournament.tournament_id}: {e}", exc_info=True)
def _load_tournament_state(self, tournament_id: str) -> Optional[TournamentData]:
# Try finding by explicit ID first (common case for direct access)
# The storage path might be complex now, so scan might be more reliable if ID is the only input
# Robust scan: iterate through all subdirectories of TOURNAMENT_STORAGE_BASE
if TOURNAMENT_STORAGE_BASE.exists():
for potential_tournament_dir in TOURNAMENT_STORAGE_BASE.iterdir():
if potential_tournament_dir.is_dir():
state_file = potential_tournament_dir / "tournament_state.json"
if state_file.exists():
try:
with open(state_file, 'r', encoding='utf-8') as f:
data = json.load(f)
if data.get("tournament_id") == tournament_id:
# Use Pydantic for robust parsing and type conversion
parsed_tournament = TournamentData.model_validate(data)
logger.debug(f"Loaded state for tournament {tournament_id} from {state_file}")
return parsed_tournament
except (IOError, json.JSONDecodeError, ValidationError) as e:
logger.warning(f"Failed to load or validate state from {state_file} for tournament ID {tournament_id}: {e}")
# Don't return, continue scanning
except Exception as e: # Catch any other unexpected error
logger.error(f"Unexpected error loading state from {state_file}: {e}", exc_info=True)
logger.debug(f"Tournament {tournament_id} not found in any storage location during scan.")
return None
def _load_all_tournaments(self):
logger.info(f"Scanning {TOURNAMENT_STORAGE_BASE} for existing tournaments...")
count = 0
if not TOURNAMENT_STORAGE_BASE.exists():
logger.warning("Tournament storage directory does not exist. No tournaments loaded.")
return
for item in TOURNAMENT_STORAGE_BASE.iterdir():
if item.is_dir():
# Attempt to load tournament_state.json from this directory
state_file = item / "tournament_state.json"
if state_file.exists():
try:
with open(state_file, 'r', encoding='utf-8') as f:
data = json.load(f)
tournament_id_from_file = data.get("tournament_id")
if not tournament_id_from_file:
logger.warning(f"Skipping directory {item.name}, tournament_state.json missing 'tournament_id'.")
continue
if tournament_id_from_file not in self.tournaments: # Avoid reloading if already cached by some other means
# Use the get_tournament method which handles re-instantiating evaluators
loaded_tournament = self.get_tournament(tournament_id_from_file, force_reload=True)
if loaded_tournament:
count += 1
logger.debug(f"Loaded tournament '{loaded_tournament.name}' (ID: {loaded_tournament.tournament_id}) from {item.name}")
else:
logger.warning(f"Failed to fully load tournament from directory: {item.name} (ID in file: {tournament_id_from_file})")
except (IOError, json.JSONDecodeError, ValidationError) as e:
logger.warning(f"Error loading tournament from directory {item.name}: {e}")
except Exception as e:
logger.error(f"Unexpected error loading tournament from {item.name}: {e}", exc_info=True)
logger.info(f"Finished scan. Loaded {count} existing tournaments into manager.")
def start_tournament_execution(self, tournament_id: str) -> bool:
logger.debug(f"Attempting to start tournament execution for {tournament_id}")
tournament = self.get_tournament(tournament_id) # Ensures evaluators are loaded
if not tournament:
logger.error(f"Cannot start execution: Tournament {tournament_id} not found.")
return False
if tournament.status not in [TournamentStatus.PENDING, TournamentStatus.CREATED]:
logger.warning(f"Tournament {tournament_id} is not in a runnable state ({tournament.status}). Cannot start.")
return False
tournament.status = TournamentStatus.RUNNING # Or QUEUED if worker mode is implemented
tournament.start_time = datetime.now(timezone.utc)
tournament.current_round = 0 # Explicitly set to 0 when starting
# Ensure rounds_results is initialized if empty
if not tournament.rounds_results:
tournament.rounds_results = [
TournamentRoundResult(round_num=i) for i in range(tournament.config.rounds)
]
self._save_tournament_state(tournament)
logger.info(f"Tournament {tournament_id} status set to {tournament.status}, ready for async execution.")
try:
from ultimate_mcp_server.core.tournaments.tasks import (
run_tournament_async, # Local import
)
asyncio.create_task(run_tournament_async(tournament_id))
logger.info(f"Asyncio task created for tournament {tournament_id}.")
return True
except Exception as e:
logger.error(f"Error creating asyncio task for tournament {tournament_id}: {e}", exc_info=True)
tournament.status = TournamentStatus.FAILED
tournament.error_message = f"Failed during asyncio task creation: {str(e)}"
tournament.end_time = datetime.now(timezone.utc)
self._save_tournament_state(tournament)
return False
async def cancel_tournament(self, tournament_id: str) -> Tuple[bool, str, TournamentStatus]: # Return final status
"""Attempts to cancel a tournament. Returns success, message, and final status."""
tournament = self.get_tournament(tournament_id, force_reload=True)
if not tournament:
logger.warning(f"Cannot cancel non-existent tournament {tournament_id}")
# Use FAILED or a specific status for "not found" if added to enum,
# or rely on the tool layer to raise 404. For manager, FAILED can represent this.
return False, "Tournament not found.", TournamentStatus.FAILED
current_status = tournament.status
final_status = current_status # Default to current status if no change
message = ""
if current_status == TournamentStatus.RUNNING or current_status == TournamentStatus.QUEUED:
logger.info(f"Attempting to cancel tournament {tournament_id} (status: {current_status})...")
tournament.status = TournamentStatus.CANCELLED
tournament.error_message = tournament.error_message or "Tournament cancelled by user request."
tournament.end_time = datetime.now(timezone.utc)
final_status = TournamentStatus.CANCELLED
message = "Cancellation requested. Tournament status set to CANCELLED."
self._save_tournament_state(tournament)
logger.info(f"Tournament {tournament_id} status set to CANCELLED.")
# The background task needs to observe this status.
return True, message, final_status
elif current_status in [TournamentStatus.COMPLETED, TournamentStatus.FAILED, TournamentStatus.CANCELLED]:
message = f"Tournament {tournament_id} is already finished or cancelled (Status: {current_status})."
logger.warning(message)
return False, message, final_status
elif current_status == TournamentStatus.PENDING or current_status == TournamentStatus.CREATED:
tournament.status = TournamentStatus.CANCELLED
tournament.error_message = "Tournament cancelled before starting."
tournament.end_time = datetime.now(timezone.utc)
final_status = TournamentStatus.CANCELLED
message = "Pending/Created tournament cancelled successfully."
self._save_tournament_state(tournament)
logger.info(f"Pending/Created tournament {tournament_id} cancelled.")
return True, message, final_status
else:
# Should not happen, but handle unknown state
message = f"Tournament {tournament_id} is in an unexpected state ({current_status}). Cannot determine cancellation action."
logger.error(message)
return False, message, current_status
def list_tournaments(self) -> List[Dict[str, Any]]:
# Ensure cache is up-to-date if new tournaments might have been added externally (less likely with file storage)
# self._load_all_tournaments() # Consider if this is too expensive for every list call
basic_list = []
for t_data in self.tournaments.values():
basic_list.append({
"tournament_id": t_data.tournament_id,
"name": t_data.name,
"tournament_type": t_data.config.tournament_type,
"status": t_data.status,
"current_round": t_data.current_round,
"total_rounds": t_data.config.rounds,
"created_at": t_data.created_at.isoformat() if t_data.created_at else None, # Ensure ISO format
"updated_at": t_data.updated_at.isoformat() if t_data.updated_at else None,
"start_time": t_data.start_time.isoformat() if t_data.start_time else None,
"end_time": t_data.end_time.isoformat() if t_data.end_time else None,
})
basic_list.sort(key=lambda x: x['created_at'] or '', reverse=True) # Handle None created_at for sorting
return basic_list
def _get_storage_path(self, tournament_id: str, tournament_name: str) -> Path:
timestamp_str = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
# Sanitize tournament name for use in path
safe_name = re.sub(r'[^\w\s-]', '', tournament_name).strip().replace(' ', '_')
safe_name = re.sub(r'[-\s]+', '-', safe_name) # Replace multiple spaces/hyphens with single hyphen
safe_name = safe_name[:50] # Limit length
# Use first 8 chars of UUID for brevity if name is too generic or empty
uuid_suffix = tournament_id.split('-')[0]
folder_name = f"{timestamp_str}_{safe_name}_{uuid_suffix}" if safe_name else f"{timestamp_str}_{uuid_suffix}"
path = TOURNAMENT_STORAGE_BASE / folder_name
path.mkdir(parents=True, exist_ok=True) # Ensure directory is created
return path
tournament_manager = TournamentManager()
```
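A sketch of driving the module-level `tournament_manager` singleton defined above. Only manager methods shown in this file are used; the `CreateTournamentInput` values (model id, tournament type) are illustrative assumptions, and the input model's remaining fields are assumed to have defaults.
```python
# Sketch only: create, start, and cancel a tournament via the manager above.
# The CreateTournamentInput values below are illustrative assumptions.
import asyncio

from ultimate_mcp_server.core.models.tournament import CreateTournamentInput
from ultimate_mcp_server.core.tournaments.manager import tournament_manager


async def run_small_tournament() -> None:
    input_data = CreateTournamentInput(
        name="haiku-shootout",
        prompt="Write a haiku about retrieval-augmented generation.",
        model_configs=[{"model_id": "openai/gpt-4.1-mini"}],  # assumed to coerce into ModelConfig
        rounds=1,
        tournament_type="text",  # assumed type name
    )
    tournament = tournament_manager.create_tournament(input_data)
    if tournament is None:
        raise RuntimeError("Tournament creation failed; see manager logs.")

    # Schedules run_tournament_async as a background asyncio task.
    started = tournament_manager.start_tournament_execution(tournament.tournament_id)
    print(f"started={started}, status={tournament.status}")

    # Cancellation shown only to exercise the API; a real caller would poll get_tournament().
    ok, message, final_status = await tournament_manager.cancel_tournament(tournament.tournament_id)
    print(ok, message, final_status)


if __name__ == "__main__":
    asyncio.run(run_small_tournament())
```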
--------------------------------------------------------------------------------
/examples/audio_transcription_demo.py:
--------------------------------------------------------------------------------
```python
"""Demonstration script for audio transcription using faster-whisper.
This version uses the faster-whisper library, which offers better performance than whisper.cpp.
"""
import asyncio
import os
import sys
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from rich import box
from rich.console import Console
from rich.live import Live
from rich.markup import escape
from rich.panel import Panel
from rich.progress import (
BarColumn,
Progress,
SpinnerColumn,
TextColumn,
TimeElapsedColumn,
TimeRemainingColumn,
)
from rich.rule import Rule
from rich.table import Table
# Add the project root to the Python path
# This allows finding the ultimate package when running the script directly
project_root = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(project_root))
EXAMPLE_DIR = Path(__file__).parent
DATA_DIR = EXAMPLE_DIR / "data"
SAMPLE_AUDIO_PATH = str(DATA_DIR / "Steve_Jobs_Introducing_The_iPhone_compressed.mp3")
from ultimate_mcp_server.utils import get_logger # noqa: E402
# --- Configuration ---
logger = get_logger("audio_demo")
# Get the directory of the current script
SCRIPT_DIR = Path(__file__).parent.resolve()
DATA_DIR = SCRIPT_DIR / "data"
# Define allowed audio extensions
ALLOWED_EXTENSIONS = [".mp3", ".wav", ".flac", ".ogg", ".m4a"]
# --- Helper Functions ---
def find_audio_files(directory: Path) -> List[Path]:
"""Finds audio files with allowed extensions in the given directory."""
return [p for p in directory.iterdir() if p.is_file() and p.suffix.lower() in ALLOWED_EXTENSIONS]
def format_timestamp(seconds: float) -> str:
"""Format seconds into a timestamp string."""
hours = int(seconds / 3600)
minutes = int((seconds % 3600) / 60)
secs = seconds % 60
if hours > 0:
return f"{hours:02d}:{minutes:02d}:{secs:05.2f}"
else:
return f"{minutes:02d}:{secs:05.2f}"
def detect_device() -> Tuple[str, str, Optional[str]]:
"""Detect whether a CUDA GPU is available and return (device, compute_type, gpu_name); gpu_name is None on CPU."""
try:
# Import torch to check if CUDA is available
import torch
if torch.cuda.is_available():
# Get GPU info for display
gpu_name = torch.cuda.get_device_name(0)
return "cuda", "float16", gpu_name
else:
return "cpu", "int8", None
except ImportError:
# If torch is not available, try to directly check for NVIDIA GPUs with ctranslate2
try:
import subprocess
nvidia_smi_output = subprocess.check_output(["nvidia-smi", "-L"], text=True, stderr=subprocess.DEVNULL)
if "GPU" in nvidia_smi_output:
# Extract GPU name
gpu_name = nvidia_smi_output.strip().split(':')[1].strip().split('(')[0].strip()
return "cuda", "float16", gpu_name
else:
return "cpu", "int8", None
except Exception:
# If all else fails, default to CPU
return "cpu", "int8", None
def generate_markdown_transcript(transcript: Dict[str, Any], file_path: str) -> str:
"""Generate a markdown version of the transcript with metadata."""
audio_filename = os.path.basename(file_path)
metadata = transcript.get("metadata", {})
segments = transcript.get("segments", [])
markdown = [
f"# Transcript: {audio_filename}",
"",
"## Metadata",
f"- **Duration:** {format_timestamp(metadata.get('duration', 0))}",
f"- **Language:** {metadata.get('language', 'unknown')} (confidence: {metadata.get('language_probability', 0):.2f})",
f"- **Transcription Model:** {metadata.get('model', 'unknown')}",
f"- **Device:** {metadata.get('device', 'unknown')}",
f"- **Processing Time:** {transcript.get('processing_time', {}).get('total', 0):.2f} seconds",
"",
"## Full Transcript",
"",
transcript.get("enhanced_transcript", transcript.get("raw_transcript", "")),
"",
"## Segments",
""
]
for segment in segments:
start_time = format_timestamp(segment["start"])
end_time = format_timestamp(segment["end"])
markdown.append(f"**[{start_time} → {end_time}]** {segment['text']}")
markdown.append("")
return "\n".join(markdown)
def save_markdown_transcript(transcript: Dict[str, Any], file_path: str) -> Tuple[str, str]:
"""Save the transcript as markdown and text files.
Returns:
Tuple containing paths to markdown and text files
"""
audio_path = Path(file_path)
markdown_path = audio_path.with_suffix(".md")
txt_path = audio_path.with_suffix(".txt")
# Generate and save markdown (enhanced transcript)
markdown_content = generate_markdown_transcript(transcript, file_path)
with open(markdown_path, "w", encoding="utf-8") as f:
f.write(markdown_content)
# Save raw transcript as plain text file
with open(txt_path, "w", encoding="utf-8") as f:
f.write(transcript.get("raw_transcript", ""))
return str(markdown_path), str(txt_path)
async def enhance_transcript_with_llm(raw_transcript: str, console: Console) -> str:
"""Enhance the transcript using an LLM to improve readability."""
try:
from ultimate_mcp_server.tools.completion import chat_completion
except ImportError:
console.print("[yellow]Ultimate MCP Server tools not available for enhancement. Using raw transcript.[/yellow]")
return raw_transcript
# Setup progress display
with Progress(
SpinnerColumn(),
TextColumn("[bold green]Enhancing transcript with LLM[/bold green]"),
BarColumn(),
TextColumn("[cyan]{task.percentage:>3.0f}%"),
TimeElapsedColumn(),
console=console
) as progress:
enhance_task = progress.add_task("Enhancing...", total=100)
try:
# Create the prompt for transcript enhancement
system_prompt = """You are an expert transcription editor. Your task is to enhance the following raw transcript:
1. Fix any spelling or grammar errors
2. Add proper punctuation and capitalization
3. Format the text into logical paragraphs
4. Remove filler words and repeated phrases
5. Preserve the original meaning and all factual content
6. Format numbers, acronyms, and technical terms consistently
7. Keep the text faithful to the original but make it more readable"""
user_prompt = f"Here is the raw transcript to enhance:\n\n{raw_transcript}\n\nPlease provide only the enhanced transcript without explanations."
# Split the transcript into chunks if it's very long
progress.update(enhance_task, completed=20)
# Call the chat completion function
result = await chat_completion(
system_prompt=system_prompt,
messages=[{"role": "user", "content": user_prompt}],
model="gpt-4.1-mini",
temperature=0.3,
)
progress.update(enhance_task, completed=90)
enhanced_transcript = result.get("content", raw_transcript)
progress.update(enhance_task, completed=100)
return enhanced_transcript
except Exception as e:
console.print(f"[red]Error enhancing transcript: {e}[/red]")
progress.update(enhance_task, completed=100)
return raw_transcript
async def transcribe_with_faster_whisper(file_path: str, console: Console) -> Dict[str, Any]:
"""Transcribe audio using faster-whisper library with real-time progress updates."""
logger.info(f"Processing file: {file_path}")
# Check if audio file exists
if not os.path.exists(file_path):
logger.error(f"Audio file not found at {file_path}")
return {"success": False, "error": f"Audio file not found at {file_path}"}
try:
# Import faster-whisper - install if not present
try:
from faster_whisper import WhisperModel
except ImportError:
console.print("[yellow]faster-whisper not installed. Installing now...[/yellow]")
import subprocess
subprocess.check_call([sys.executable, "-m", "pip", "install", "faster-whisper"])
from faster_whisper import WhisperModel
# Start timing
start_time = time.time()
# Get audio duration for progress calculation
audio_duration = 0
with Progress(
SpinnerColumn(),
TextColumn("[bold blue]{task.description}"),
console=console
) as progress:
analysis_task = progress.add_task("Analyzing audio file...", total=None)
try:
import av
with av.open(file_path) as container:
# Get duration in seconds
if container.duration is not None:
audio_duration = container.duration / 1000000 # microseconds to seconds
console.print(f"Audio duration: [cyan]{format_timestamp(audio_duration)}[/cyan] seconds")
progress.update(analysis_task, completed=True)
except Exception as e:
console.print(f"[yellow]Could not determine audio duration: {e}[/yellow]")
# Detect device (CPU or GPU)
device, compute_type, gpu_name = detect_device()
# Load the model with progress
model_size = "large-v3"
console.print(f"Loading Whisper model: [bold]{model_size}[/bold]")
if device == "cuda" and gpu_name:
console.print(f"Using device: [bold green]GPU ({gpu_name})[/bold green], compute_type: [bold cyan]{compute_type}[/bold cyan]")
else:
console.print(f"Using device: [bold yellow]CPU[/bold yellow], compute_type: [bold cyan]{compute_type}[/bold cyan]")
with Progress(
SpinnerColumn(),
TextColumn("[bold blue]{task.description}"),
BarColumn(),
TextColumn("[bold cyan]{task.percentage:>3.0f}%"),
console=console
) as progress:
load_task = progress.add_task("Loading model...", total=100)
model = WhisperModel(model_size, device=device, compute_type=compute_type, download_root="./models")
progress.update(load_task, completed=100)
# Setup progress display for transcription
console.print("\n[bold green]Starting transcription...[/bold green]")
# Create table for displaying transcribed segments in real time
table = Table(title="Transcription Progress", expand=True, box=box.ROUNDED)
table.add_column("Segment")
table.add_column("Time", style="yellow")
table.add_column("Text", style="white")
# Progress bar for overall transcription
progress = Progress(
SpinnerColumn(),
TextColumn("[bold blue]Transcribing..."),
BarColumn(),
TextColumn("[cyan]{task.percentage:>3.0f}%"),
TimeElapsedColumn(),
TimeRemainingColumn(),
)
# Add main progress task
transcribe_task = progress.add_task("Transcription", total=100)
# Combine table and progress bar
transcription_display = Table.grid()
transcription_display.add_row(table)
transcription_display.add_row(progress)
segments_list = []
segment_idx = 0
# Run the transcription with live updating display
with Live(transcription_display, console=console, refresh_per_second=10) as live:
# Run transcription
segments, info = model.transcribe(
file_path,
beam_size=5,
vad_filter=True,
word_timestamps=True,
language="en", # Specify language to avoid language detection phase
)
# Process segments as they become available
for segment in segments:
segments_list.append(segment)
# Update progress bar based on timestamp
if audio_duration > 0:
current_progress = min(int((segment.end / audio_duration) * 100), 99)
progress.update(transcribe_task, completed=current_progress)
# Add segment to table
timestamp = f"[{format_timestamp(segment.start)} → {format_timestamp(segment.end)}]"
table.add_row(
f"[cyan]#{segment_idx+1}[/cyan]",
timestamp,
segment.text
)
# Update the live display
live.update(transcription_display)
segment_idx += 1
# Finish progress
progress.update(transcribe_task, completed=100)
# Build full transcript
raw_transcript = " ".join([segment.text for segment in segments_list])
# Convert segments to dictionary format
segments_dict = []
for segment in segments_list:
segments_dict.append({
"start": segment.start,
"end": segment.end,
"text": segment.text,
"words": [{"word": word.word, "start": word.start, "end": word.end, "probability": word.probability}
for word in (segment.words or [])]
})
# Enhance the transcript with LLM
console.print("\n[bold green]Raw transcription complete. Now enhancing the transcript...[/bold green]")
enhanced_transcript = await enhance_transcript_with_llm(raw_transcript, console)
# Calculate processing time
processing_time = time.time() - start_time
# Create the result dictionary
result = {
"success": True,
"raw_transcript": raw_transcript,
"enhanced_transcript": enhanced_transcript,
"segments": segments_dict,
"metadata": {
"language": info.language,
"language_probability": info.language_probability,
"model": model_size,
"duration": audio_duration,
"device": device
},
"processing_time": {
"total": processing_time,
"transcription": processing_time
}
}
# Save the transcripts
markdown_path, txt_path = save_markdown_transcript(result, file_path)
console.print(f"\n[bold green]Saved enhanced transcript to:[/bold green] [cyan]{markdown_path}[/cyan]")
console.print(f"[bold green]Saved raw transcript to:[/bold green] [cyan]{txt_path}[/cyan]")
return result
except Exception as e:
import traceback
logger.error(f"Transcription error: {e}")
logger.error(traceback.format_exc())
return {"success": False, "error": f"Transcription error: {e}"}
async def main():
"""Runs the audio transcription demonstrations."""
logger.info("Starting Audio Transcription Demo", emoji_key="audio")
console = Console()
console.print(Rule("[bold green]Audio Transcription Demo (faster-whisper)[/bold green]"))
# --- Find Audio Files ---
audio_files = find_audio_files(DATA_DIR)
if not audio_files:
console.print(f"[bold red]Error:[/bold red] No audio files found in {DATA_DIR}. Please place audio files (e.g., .mp3, .wav) there.")
return
console.print(f"Found {len(audio_files)} audio file(s) in {DATA_DIR}:")
for f in audio_files:
console.print(f"- [cyan]{f.name}[/cyan]")
console.print()
# --- Process Each File ---
for file_path in audio_files:
try:
console.print(Panel(
f"Processing file: [cyan]{escape(str(file_path))}[/cyan]",
title="Audio Transcription",
border_style="blue"
))
# Call our faster-whisper transcription function
result = await transcribe_with_faster_whisper(str(file_path), console)
if result.get("success", False):
console.print(f"[green]Transcription successful for {escape(str(file_path))}.[/green]")
# Show comparison of raw vs enhanced transcript
if "raw_transcript" in result and "enhanced_transcript" in result:
comparison = Table(title="Transcript Comparison", expand=True, box=box.ROUNDED)
comparison.add_column("Raw Transcript", style="yellow")
comparison.add_column("Enhanced Transcript", style="green")
# Limit to a preview of the first part
raw_preview = result["raw_transcript"][:500] + ("..." if len(result["raw_transcript"]) > 500 else "")
enhanced_preview = result["enhanced_transcript"][:500] + ("..." if len(result["enhanced_transcript"]) > 500 else "")
comparison.add_row(raw_preview, enhanced_preview)
console.print(comparison)
# Display metadata if available
if "metadata" in result and result["metadata"]:
console.print("[bold]Metadata:[/bold]")
for key, value in result["metadata"].items():
console.print(f" - [cyan]{key}[/cyan]: {value}")
# Display processing time
if "processing_time" in result:
console.print("[bold]Processing Times:[/bold]")
for key, value in result["processing_time"].items():
if isinstance(value, (int, float)):
console.print(f" - [cyan]{key}[/cyan]: {value:.2f}s")
else:
console.print(f" - [cyan]{key}[/cyan]: {value}")
else:
console.print("[yellow]Warning:[/yellow] No transcript was returned.")
else:
console.print(f"[bold red]Transcription failed:[/bold red] {escape(result.get('error', 'Unknown error'))}")
console.print() # Add a blank line between files
except Exception as outer_e:
import traceback
console.print(f"[bold red]Unexpected error processing file {escape(str(file_path))}:[/bold red] {escape(str(outer_e))}")
console.print("[bold red]Traceback:[/bold red]")
console.print(escape(traceback.format_exc()))
continue # Move to the next file
logger.info("Audio Transcription Demo Finished", emoji_key="audio")
if __name__ == "__main__":
# Basic error handling for the async execution itself
try:
asyncio.run(main())
except Exception as e:
print(f"An error occurred running the demo: {e}")
import traceback
traceback.print_exc()
```
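The faster-whisper call at the heart of the demo above reduces to a few lines. A minimal standalone sketch with the same `WhisperModel.transcribe()` arguments, minus the Rich progress display and LLM enhancement; the CPU/int8 settings mirror the demo's CPU fallback.
```python
# Minimal faster-whisper transcription, mirroring the arguments used in the demo above.
# Requires: pip install faster-whisper
from faster_whisper import WhisperModel


def transcribe(path: str) -> str:
    # CPU with int8 matches the demo's fallback; on a CUDA machine the demo
    # switches to device="cuda", compute_type="float16".
    model = WhisperModel("large-v3", device="cpu", compute_type="int8")
    segments, info = model.transcribe(
        path,
        beam_size=5,
        vad_filter=True,
        word_timestamps=True,
        language="en",
    )
    print(f"Detected language: {info.language} (p={info.language_probability:.2f})")
    return " ".join(segment.text for segment in segments)


if __name__ == "__main__":
    print(transcribe("examples/data/Steve_Jobs_Introducing_The_iPhone_compressed.mp3"))
```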
--------------------------------------------------------------------------------
/ultimate_mcp_server/utils/logging/panels.py:
--------------------------------------------------------------------------------
```python
"""
Panel definitions for Ultimate MCP Server logging system.
This module provides specialized panels for different types of output like
headers, results, errors, warnings, etc.
"""
from typing import Any, Dict, List, Optional, Union
from rich.box import SIMPLE
from rich.columns import Columns
from rich.console import ConsoleRenderable
from rich.panel import Panel
from rich.syntax import Syntax
from rich.table import Table
from rich.text import Text
from ultimate_mcp_server.utils.logging.console import console
from ultimate_mcp_server.utils.logging.emojis import ERROR, INFO, SUCCESS, WARNING
class HeaderPanel:
"""Panel for section headers."""
def __init__(
self,
title: str,
subtitle: Optional[str] = None,
component: Optional[str] = None,
style: str = "bright_blue",
):
"""Initialize a header panel.
Args:
title: Panel title
subtitle: Optional subtitle
component: Optional component name
style: Panel style
"""
self.title = title
self.subtitle = subtitle
self.component = component
self.style = style
def __rich__(self) -> ConsoleRenderable:
"""Render the panel."""
# Create the title text
title_text = Text()
title_text.append("- ", style="bright_black")
title_text.append(self.title, style="bold")
title_text.append(" -", style="bright_black")
# Create the content
content = Text()
if self.component:
content.append(f"[{self.component}] ", style="component")
if self.subtitle:
content.append(self.subtitle)
return Panel(
content,
title=title_text,
title_align="center",
border_style=self.style,
expand=True,
padding=(1, 2),
)
class ResultPanel:
"""Panel for displaying operation results."""
def __init__(
self,
title: str,
results: Union[List[Dict[str, Any]], Dict[str, Any]],
status: str = "success",
component: Optional[str] = None,
show_count: bool = True,
compact: bool = False,
):
"""Initialize a result panel.
Args:
title: Panel title
results: Results to display (list of dicts or single dict)
status: Result status (success, warning, error)
component: Optional component name
show_count: Whether to show result count in title
compact: Whether to use a compact display style
"""
self.title = title
self.results = results if isinstance(results, list) else [results]
self.status = status.lower()
self.component = component
self.show_count = show_count
self.compact = compact
def __rich__(self) -> ConsoleRenderable:
"""Render the panel."""
# Determine style and emoji based on status
if self.status == "success":
style = "result.success"
emoji = SUCCESS
elif self.status == "warning":
style = "result.warning"
emoji = WARNING
elif self.status == "error":
style = "result.error"
emoji = ERROR
else:
style = "result.info"
emoji = INFO
# Create title
title_text = Text()
title_text.append(f"{emoji} ", style=style)
title_text.append(self.title, style=f"bold {style}")
if self.show_count and len(self.results) > 0:
title_text.append(f" ({len(self.results)} items)", style="bright_black")
# Create content
if self.compact:
# Compact mode - just show key/value list
rows = []
for item in self.results:
for k, v in item.items():
rows.append({
"key": k,
"value": self._format_value(v),
})
table = Table(box=None, expand=True, show_header=False)
table.add_column("Key", style="data.key")
table.add_column("Value", style="", overflow="fold")
for row in rows:
table.add_row(row["key"], row["value"])
content = table
else:
# Full mode - create a table per result item
tables = []
for i, item in enumerate(self.results):
if not item: # Skip empty items
continue
table = Table(
box=SIMPLE,
title=f"Item {i+1}" if len(self.results) > 1 else None,
title_style="bright_black",
expand=True,
show_header=False,
)
table.add_column("Key", style="data.key")
table.add_column("Value", style="", overflow="fold")
for k, v in item.items():
table.add_row(k, self._format_value(v))
tables.append(table)
content = Columns(tables) if len(tables) > 1 else tables[0] if tables else Text("No results")
# Return the panel
return Panel(
content,
title=title_text,
border_style=style,
expand=True,
padding=(1, 1),
)
def _format_value(self, value: Any) -> str:
"""Format a value for display.
Args:
value: Value to format
Returns:
Formatted string
"""
if value is None:
return "[dim]None[/dim]"
elif isinstance(value, bool):
return str(value)
elif isinstance(value, (int, float)):
return str(value)
elif isinstance(value, list):
return ", ".join(self._format_value(v) for v in value[:5]) + \
(f" ... (+{len(value) - 5} more)" if len(value) > 5 else "")
elif isinstance(value, dict):
if len(value) == 0:
return "{}"
else:
return "{...}" # Just indicate there's content
else:
return str(value)
class InfoPanel:
"""Panel for displaying information."""
def __init__(
self,
title: str,
content: Union[str, List[str], Dict[str, Any]],
icon: Optional[str] = None,
style: str = "info",
):
"""Initialize an information panel.
Args:
title: Panel title
content: Content to display (string, list, or dict)
icon: Emoji or icon character
style: Style name to apply (from theme)
"""
self.title = title
self.content = content
self.icon = icon or INFO
self.style = style
def __rich__(self) -> ConsoleRenderable:
"""Render the panel."""
# Create title
title_text = Text()
title_text.append(f"{self.icon} ", style=self.style)
title_text.append(self.title, style=f"bold {self.style}")
# Format content based on type
if isinstance(self.content, str):
content = Text(self.content)
elif isinstance(self.content, list):
content = Text()
for i, item in enumerate(self.content):
if i > 0:
content.append("\n")
content.append(f"• {item}")
elif isinstance(self.content, dict):
# Create a table for dict content
table = Table(box=None, expand=True, show_header=False)
table.add_column("Key", style="data.key")
table.add_column("Value", style="", overflow="fold")
for k, v in self.content.items():
table.add_row(k, str(v))
content = table
else:
content = Text(str(self.content))
# Return the panel
return Panel(
content,
title=title_text,
border_style=self.style,
expand=True,
padding=(1, 2),
)
class WarningPanel:
"""Panel for displaying warnings."""
def __init__(
self,
title: Optional[str] = None,
message: str = "",
details: Optional[List[str]] = None,
):
"""Initialize a warning panel.
Args:
title: Optional panel title
message: Main warning message
details: Optional list of detail points
"""
self.title = title or "Warning"
self.message = message
self.details = details or []
def __rich__(self) -> ConsoleRenderable:
"""Render the panel."""
# Create title
title_text = Text()
title_text.append(f"{WARNING} ", style="warning")
title_text.append(self.title, style="bold warning")
# Create content
content = Text()
# Add message
if self.message:
content.append(self.message)
# Add details if any
if self.details and len(self.details) > 0:
if self.message:
content.append("\n\n")
content.append("Details:", style="bold")
content.append("\n")
for i, detail in enumerate(self.details):
if i > 0:
content.append("\n")
content.append(f"• {detail}")
# Return the panel
return Panel(
content,
title=title_text,
border_style="warning",
expand=True,
padding=(1, 2),
)
class ErrorPanel:
"""Panel for displaying errors."""
def __init__(
self,
title: Optional[str] = None,
message: str = "",
details: Optional[str] = None,
resolution_steps: Optional[List[str]] = None,
error_code: Optional[str] = None,
):
"""Initialize an error panel.
Args:
title: Optional panel title
message: Main error message
details: Optional error details
resolution_steps: Optional list of steps to resolve the error
error_code: Optional error code for reference
"""
self.title = title or "Error"
self.message = message
self.details = details
self.resolution_steps = resolution_steps or []
self.error_code = error_code
def __rich__(self) -> ConsoleRenderable:
"""Render the panel."""
# Create title
title_text = Text()
title_text.append(f"{ERROR} ", style="error")
title_text.append(self.title, style="bold error")
if self.error_code:
title_text.append(f" [{self.error_code}]", style="bright_black")
# Create content
content = Text()
# Add message
if self.message:
content.append(self.message, style="bold")
# Add details if any
if self.details:
if self.message:
content.append("\n\n")
content.append(self.details)
# Add resolution steps if any
if self.resolution_steps and len(self.resolution_steps) > 0:
if self.message or self.details:
content.append("\n\n")
content.append("Resolution steps:", style="bold")
content.append("\n")
for i, step in enumerate(self.resolution_steps):
if i > 0:
content.append("\n")
content.append(f"{i+1}. {step}")
# Return the panel
return Panel(
content,
title=title_text,
border_style="error",
expand=True,
padding=(1, 2),
)
class ToolOutputPanel:
"""Panel for displaying tool command output."""
def __init__(
self,
tool: str,
command: str,
output: str,
status: str = "success",
duration: Optional[float] = None,
):
"""Initialize a tool output panel.
Args:
tool: Tool name (ripgrep, awk, jq, etc.)
command: Command that was executed
output: Command output text
status: Execution status (success, error)
duration: Optional execution duration in seconds
"""
self.tool = tool
self.command = command
self.output = output
self.status = status.lower()
self.duration = duration
def __rich__(self) -> ConsoleRenderable:
"""Render the panel."""
# Determine style and emoji based on status
if self.status == "success":
style = "tool.success"
emoji = SUCCESS
else:
style = "tool.error"
emoji = ERROR
# Create title
title_text = Text()
title_text.append(f"{emoji} ", style=style)
title_text.append(f"{self.tool}", style=f"bold {style}")
if self.duration is not None:
title_text.append(f" ({self.duration:.2f}s)", style="tool.duration")
# Create content
content = Columns(
[
Panel(
Text(self.command, style="tool.command"),
title="Command",
title_style="bright_black",
border_style="tool.command",
padding=(1, 1),
),
Panel(
Text(self.output, style="tool.output"),
title="Output",
title_style="bright_black",
border_style="bright_black",
padding=(1, 1),
),
],
expand=True,
padding=(0, 1),
)
# Return the panel
return Panel(
content,
title=title_text,
border_style=style,
expand=True,
padding=(1, 1),
)
class CodePanel:
"""Panel for displaying code with syntax highlighting."""
def __init__(
self,
code: str,
language: str = "python",
title: Optional[str] = None,
line_numbers: bool = True,
highlight_lines: Optional[List[int]] = None,
):
"""Initialize a code panel.
Args:
code: The code to display
language: Programming language for syntax highlighting
title: Optional panel title
line_numbers: Whether to show line numbers
highlight_lines: List of line numbers to highlight
"""
self.code = code
self.language = language
self.title = title
self.line_numbers = line_numbers
self.highlight_lines = highlight_lines
def __rich__(self) -> ConsoleRenderable:
"""Render the panel."""
# Create syntax highlighting component
syntax = Syntax(
self.code,
self.language,
theme="monokai",
line_numbers=self.line_numbers,
highlight_lines=self.highlight_lines,
)
# Create title
if self.title:
title_text = Text(self.title)
else:
title_text = Text()
title_text.append(self.language.capitalize(), style="bright_blue bold")
title_text.append(" Code", style="bright_black")
# Return the panel
return Panel(
syntax,
title=title_text,
border_style="bright_blue",
expand=True,
padding=(0, 0),
)
# Helper functions for creating panels
def display_header(
title: str,
subtitle: Optional[str] = None,
component: Optional[str] = None,
) -> None:
"""Display a section header.
Args:
title: Section title
subtitle: Optional subtitle
component: Optional component name
"""
panel = HeaderPanel(title, subtitle, component)
console.print(panel)
def display_results(
title: str,
results: Union[List[Dict[str, Any]], Dict[str, Any]],
status: str = "success",
component: Optional[str] = None,
show_count: bool = True,
compact: bool = False,
) -> None:
"""Display operation results.
Args:
title: Results title
results: Results to display (list of dicts or single dict)
status: Result status (success, warning, error)
component: Optional component name
show_count: Whether to show result count in title
compact: Whether to use a compact display style
"""
panel = ResultPanel(title, results, status, component, show_count, compact)
console.print(panel)
def display_info(
title: str,
content: Union[str, List[str], Dict[str, Any]],
icon: Optional[str] = None,
style: str = "info",
) -> None:
"""Display an information panel.
Args:
title: Panel title
content: Content to display (string, list, or dict)
icon: Emoji or icon character
style: Style name to apply (from theme)
"""
panel = InfoPanel(title, content, icon, style)
console.print(panel)
def display_warning(
title: Optional[str] = None,
message: str = "",
details: Optional[List[str]] = None,
) -> None:
"""Display a warning panel.
Args:
title: Optional panel title
message: Main warning message
details: Optional list of detail points
"""
panel = WarningPanel(title, message, details)
console.print(panel)
def display_error(
title: Optional[str] = None,
message: str = "",
details: Optional[str] = None,
resolution_steps: Optional[List[str]] = None,
error_code: Optional[str] = None,
) -> None:
"""Display an error panel.
Args:
title: Optional panel title
message: Main error message
details: Optional error details
resolution_steps: Optional list of steps to resolve the error
error_code: Optional error code for reference
"""
panel = ErrorPanel(title, message, details, resolution_steps, error_code)
console.print(panel)
def display_tool_output(
tool: str,
command: str,
output: str,
status: str = "success",
duration: Optional[float] = None,
) -> None:
"""Display tool command output.
Args:
tool: Tool name (ripgrep, awk, jq, etc.)
command: Command that was executed
output: Command output text
status: Execution status (success, error)
duration: Optional execution duration in seconds
"""
panel = ToolOutputPanel(tool, command, output, status, duration)
console.print(panel)
def display_code(
code: str,
language: str = "python",
title: Optional[str] = None,
line_numbers: bool = True,
highlight_lines: Optional[List[int]] = None,
) -> None:
"""Display code with syntax highlighting.
Args:
code: The code to display
language: Programming language for syntax highlighting
title: Optional panel title
line_numbers: Whether to show line numbers
highlight_lines: List of line numbers to highlight
"""
panel = CodePanel(code, language, title, line_numbers, highlight_lines)
console.print(panel)
```
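The helpers above are thin wrappers that construct a panel and print it on the shared `console`. A minimal usage sketch follows; the import path is a placeholder (this excerpt does not show where the module lives in the package), but the call signatures match the definitions above.
```python
# Usage sketch for the display helpers above.
# NOTE: the import path is a placeholder; adjust it to the module's real location,
# which is not shown in this excerpt.
from ultimate_mcp_server.utils.display_panels import (  # placeholder module path
    display_code,
    display_error,
    display_header,
    display_results,
    display_tool_output,
)

display_header("Vector Search Demo", subtitle="Semantic search over local documents", component="demo")

display_results(
    "Top matches",
    results=[
        {"document": "article.txt", "score": 0.92},
        {"document": "research_paper.txt", "score": 0.87},
    ],
    status="success",
    compact=True,
)

display_tool_output(
    tool="ripgrep",
    command="rg -n 'TODO' src/",
    output="src/main.py:42: # TODO: handle timeout",
    status="success",
    duration=0.12,
)

display_error(
    title="Provider initialization failed",
    message="Could not reach the OpenAI API.",
    resolution_steps=["Check OPENAI_API_KEY", "Verify network connectivity"],
    error_code="PROVIDER_INIT",
)

display_code("print('hello world')", language="python", title="Example snippet")
```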
--------------------------------------------------------------------------------
/ultimate_mcp_server/core/evaluation/evaluators.py:
--------------------------------------------------------------------------------
```python
# --- core/evaluation/evaluators.py ---
import re
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional
from ultimate_mcp_server.core.evaluation.base import (
EvaluationScore,
Evaluator,
register_evaluator,
)
from ultimate_mcp_server.core.models.tournament import ModelResponseData
from ultimate_mcp_server.tools.completion import generate_completion
# --- Import the sandbox execution tool ---
from ultimate_mcp_server.tools.python_sandbox import (
ProviderError,
ToolError,
ToolInputError,
execute_python,
)
from ultimate_mcp_server.utils import get_logger
logger = get_logger("ultimate_mcp_server.evaluation.evaluators")
@register_evaluator
class LLMGraderEvaluator(Evaluator):
evaluator_type = "llm_grader"
def __init__(self, config: Dict[str, Any]):
super().__init__(config)
self.grader_model_id = config.get("model_id", "anthropic/claude-3-5-haiku-20241022")
self.rubric = config.get(
"rubric",
"Score the response on a scale of 0-100 for quality, relevance, and clarity. Explain your reasoning.",
)
self.score_extraction_regex_str = config.get(
"score_extraction_regex", r"Score:\s*(\d{1,3})"
)
try:
self.score_extraction_regex = re.compile(self.score_extraction_regex_str)
except re.error as e:
logger.error(
f"Invalid regex for score_extraction_regex in LLMGrader: {self.score_extraction_regex_str}. Error: {e}"
)
self.score_extraction_regex = re.compile(r"Score:\s*(\d{1,3})")
async def score(
self,
response_data: ModelResponseData,
original_prompt: str,
tournament_type: Literal["code", "text"],
) -> EvaluationScore:
content_to_grade = (
response_data.extracted_code
if tournament_type == "code" and response_data.extracted_code
else response_data.response_text
)
if not content_to_grade:
return EvaluationScore(score=0.0, details="No content to grade.")
prompt = f"""Original Prompt:
{original_prompt}
Model Response to Evaluate:
---
{content_to_grade}
---
Rubric:
{self.rubric}
Please provide a score (0-100) and a brief justification. Format the score clearly, e.g., "Score: 90".
"""
try:
provider = self.grader_model_id.split("/")[0] if "/" in self.grader_model_id else None
grader_response_dict = await generate_completion(
prompt=prompt,
model=self.grader_model_id,
provider=provider,
max_tokens=500,
temperature=0.2,
)
if not grader_response_dict.get("success"):
return EvaluationScore(
score=0.0, details=f"Grader LLM failed: {grader_response_dict.get('error')}"
)
grader_text = grader_response_dict.get("text", "")
score_match = self.score_extraction_regex.search(grader_text)
numerical_score = 0.0
if score_match:
try:
numerical_score = float(score_match.group(1))
if not (0 <= numerical_score <= 100):
numerical_score = max(0.0, min(100.0, numerical_score))
except ValueError:
logger.warning(
f"LLMGrader: Could not parse score from '{score_match.group(1)}'"
)
except IndexError:
logger.warning(
f"LLMGrader: Regex '{self.score_extraction_regex_str}' matched but had no capture group 1."
)
else:
logger.warning(
f"LLMGrader: Could not find score pattern in grader response: {grader_text[:200]}"
)
return EvaluationScore(
score=numerical_score,
details=grader_text,
metrics={"grader_cost": grader_response_dict.get("cost", 0)}, # Use new var name
)
except Exception as e:
logger.error(f"LLMGrader failed: {e}", exc_info=True)
return EvaluationScore(score=0.0, details=f"Error during LLM grading: {str(e)}")
@register_evaluator
class UnitTestEvaluator(Evaluator):
evaluator_type = "unit_test"
def __init__(self, config: Dict[str, Any]):
super().__init__(config)
test_file_path_str = config.get("test_file_path")
self.required_packages: List[str] = config.get("required_packages", []) # For sandbox
if not test_file_path_str:
logger.warning(
"UnitTestEvaluator: 'test_file_path' not provided in config. This evaluator may not function."
)
self.test_file_path = Path()
else:
self.test_file_path = Path(test_file_path_str)
self.timeout_seconds = config.get("timeout_seconds", 30)  # seconds; converted to ms for the sandbox call
async def score(
self,
response_data: ModelResponseData,
original_prompt: str, # Unused but part of interface
tournament_type: Literal["code", "text"],
) -> EvaluationScore:
if tournament_type != "code" or not response_data.extracted_code:
return EvaluationScore(
score=0.0,
details="Unit test evaluator only applicable to code tournaments with extracted code.",
)
if (
not self.test_file_path
or not self.test_file_path.exists()
or not self.test_file_path.is_file()
):
details = f"Test file not found, not configured, or not a file: {self.test_file_path}"
if not self.test_file_path.name:
details = "Test file path not configured for UnitTestEvaluator."
logger.warning(f"UnitTestEvaluator: {details}")
return EvaluationScore(score=0.0, details=details)
try:
# Read the user's test code from the host filesystem
user_test_code = self.test_file_path.read_text(encoding="utf-8")
except Exception as e:
logger.error(f"UnitTestEvaluator: Failed to read test file {self.test_file_path}: {e}")
return EvaluationScore(score=0.0, details=f"Failed to read test file: {e}")
# Combine the generated code and the user's test code into a single script that runs
# inside the sandbox: the generated code is defined first, then the test code, so the
# tests can reference the generated functions/classes from the shared global scope.
# A more robust design would write the generated code to solution.py and the tests to
# test_solution.py (which imports solution), but that requires a real sandbox filesystem;
# simple concatenation is sufficient here, and for Pyodide defining everything at global
# scope is easiest. The `unittest_runner_script` below defines the generated code, then
# the test code, then discovers and runs the tests with unittest.
generated_code_to_run = response_data.extracted_code
# This script will be executed by python_sandbox.py
# It needs to define the generated functions/classes, then define and run tests.
# stdout from this script will be parsed for results.
unittest_runner_script = f"""
# --- Generated Code from Model ---
{generated_code_to_run}
# --- End of Generated Code ---
# --- User's Test Code ---
{user_test_code}
# --- End of User's Test Code ---
# --- Unittest Execution ---
import unittest
import sys
import io # To capture unittest output
# Capture unittest's output to a string buffer instead of stderr
# This makes parsing easier and cleaner from the sandbox output.
suite = unittest.defaultTestLoader.loadTestsFromModule(sys.modules[__name__])
output_buffer = io.StringIO()
runner = unittest.TextTestRunner(stream=output_buffer, verbosity=2)
result = runner.run(suite)
# Print results in a parsable format to STDOUT
# The python_sandbox tool will capture this stdout.
print("UNIT_TEST_RESULTS_START") # Delimiter for easier parsing
print(f"TestsRun:{{result.testsRun}}")
print(f"Failures:{{len(result.failures)}}")
print(f"Errors:{{len(result.errors)}}")
pass_rate = (result.testsRun - len(result.failures) - len(result.errors)) / result.testsRun if result.testsRun > 0 else 0.0
print(f"PassRate:{{pass_rate:.4f}}")
print("UNIT_TEST_RESULTS_END")
# Also print the full unittest output (which was captured in output_buffer)
# This can go to stdout as well, or we can separate it.
print("\\n--- Unittest Full Output ---")
print(output_buffer.getvalue())
"""
details_output = "Unit test execution details via Pyodide Sandbox:\n"
pass_rate = 0.0
tests_run = 0
failures = 0
errors = 0
sandbox_stdout = ""
sandbox_stderr = ""
try:
sandbox_result = await execute_python(
code=unittest_runner_script,
packages=self.required_packages, # Pass packages needed by generated code or tests
# wheels=... # If wheels are needed
allow_network=False, # Usually False for unit tests unless they test network code
allow_fs=False, # Usually False unless tests interact with mcpfs
timeout_ms=self.timeout_seconds * 1000,
)
if sandbox_result.get("success"):
sandbox_stdout = sandbox_result.get("stdout", "")
sandbox_stderr = sandbox_result.get("stderr", "") # Unittest output now in stdout
details_output += f"Sandbox STDOUT:\n{sandbox_stdout}\n"
if sandbox_stderr: # Still log stderr if sandbox itself had issues
details_output += f"Sandbox STDERR:\n{sandbox_stderr}\n"
# Parse metrics from sandbox_stdout
# Use re.search with MULTILINE if parsing from a larger block
run_match = re.search(r"TestsRun:(\d+)", sandbox_stdout)
fail_match = re.search(r"Failures:(\d+)", sandbox_stdout)
err_match = re.search(r"Errors:(\d+)", sandbox_stdout)
rate_match = re.search(r"PassRate:([0-9.]+)", sandbox_stdout)
if run_match:
tests_run = int(run_match.group(1))
if fail_match:
failures = int(fail_match.group(1))
if err_match:
errors = int(err_match.group(1))
if rate_match:
pass_rate = float(rate_match.group(1))
else:
logger.warning(
f"UnitTestEvaluator: Could not parse PassRate from sandbox stdout. Output: {sandbox_stdout[:500]}"
)
details_output += "Warning: Could not parse PassRate from output.\n"
else: # Sandbox execution itself failed
error_msg = sandbox_result.get("error_message", "Sandbox execution failed")
error_details = sandbox_result.get("error_details", {})
details_output += (
f"Sandbox Execution Failed: {error_msg}\nDetails: {error_details}\n"
)
logger.error(
f"UnitTestEvaluator: Sandbox execution failed: {error_msg} - {error_details}"
)
pass_rate = 0.0
except (
ProviderError,
ToolError,
ToolInputError,
) as e: # Catch errors from execute_python tool
logger.error(f"UnitTestEvaluator: Error calling python_sandbox: {e}", exc_info=True)
details_output += f"Error calling python_sandbox: {str(e)}\n"
pass_rate = 0.0
except Exception as e: # Catch any other unexpected errors
logger.error(f"UnitTestEvaluator: Unexpected error: {e}", exc_info=True)
details_output += f"Unexpected error during unit test evaluation: {str(e)}\n"
pass_rate = 0.0
return EvaluationScore(
score=pass_rate * 100, # Score 0-100
details=details_output,
metrics={
"tests_run": tests_run,
"failures": failures,
"errors": errors,
"pass_rate": pass_rate,
"sandbox_stdout_len": len(sandbox_stdout),
"sandbox_stderr_len": len(sandbox_stderr),
},
)
@register_evaluator
class RegexMatchEvaluator(Evaluator):
evaluator_type = "regex_match"
def __init__(self, config: Dict[str, Any]):
super().__init__(config)
self.patterns_str: List[str] = config.get("patterns", [])
if not self.patterns_str or not isinstance(self.patterns_str, list):
logger.error("RegexMatchEvaluator: 'patterns' (list of strings) is required in config.")
self.patterns_str = []
self.target_field: Literal["response_text", "extracted_code"] = config.get(
"target_field", "response_text"
)
self.match_mode: Literal["all_must_match", "any_can_match", "proportion_matched"] = (
config.get("match_mode", "all_must_match")
)
flag_options_str: Optional[List[str]] = config.get("regex_flag_options")
self.regex_flags: int = 0
if flag_options_str:
for flag_name in flag_options_str:
if hasattr(re, flag_name.upper()):
self.regex_flags |= getattr(re, flag_name.upper())
else:
logger.warning(
f"RegexMatchEvaluator: Unknown regex flag '{flag_name}' specified."
)
self.compiled_patterns: List[re.Pattern] = []
for i, p_str in enumerate(
self.patterns_str
): # Use enumerate to get index for original string
try:
self.compiled_patterns.append(re.compile(p_str, self.regex_flags))
except re.error as e:
logger.error(
f"RegexMatchEvaluator: Invalid regex pattern '{p_str}' (index {i}): {e}. Skipping this pattern."
)
# Add a placeholder or skip to keep lengths consistent if needed,
# or ensure patterns_str is filtered alongside compiled_patterns.
# For simplicity now, compiled_patterns might be shorter if errors occur.
async def score(
self,
response_data: ModelResponseData,
original_prompt: str,
tournament_type: Literal["code", "text"],
) -> EvaluationScore:
# Iterate using original patterns_str for error reporting if compiled_patterns is shorter
num_configured_patterns = len(self.patterns_str)
if not self.compiled_patterns and self.patterns_str: # Some patterns were invalid
return EvaluationScore(
score=0.0,
details="No valid regex patterns could be compiled from configuration.",
metrics={
"patterns_configured": num_configured_patterns,
"patterns_compiled": 0,
"patterns_matched": 0,
},
)
if not self.compiled_patterns and not self.patterns_str: # No patterns provided at all
return EvaluationScore(
score=0.0,
details="No regex patterns configured for matching.",
metrics={"patterns_configured": 0, "patterns_compiled": 0, "patterns_matched": 0},
)
content_to_check: Optional[str] = None
if self.target_field == "extracted_code":
content_to_check = response_data.extracted_code
elif self.target_field == "response_text":
content_to_check = response_data.response_text
else:
return EvaluationScore(
score=0.0,
details=f"Invalid target_field '{self.target_field}'.",
metrics={"patterns_compiled": len(self.compiled_patterns), "patterns_matched": 0},
)
if content_to_check is None:
return EvaluationScore(
score=0.0,
details=f"Target content field '{self.target_field}' is empty or None.",
metrics={"patterns_compiled": len(self.compiled_patterns), "patterns_matched": 0},
)
num_matched = 0
all_patterns_details: List[str] = []
# Corrected loop over successfully compiled patterns
for pattern_obj in self.compiled_patterns:
if pattern_obj.search(content_to_check):
num_matched += 1
all_patterns_details.append(f"Pattern '{pattern_obj.pattern}': MATCHED")
else:
all_patterns_details.append(f"Pattern '{pattern_obj.pattern}': NOT MATCHED")
final_score = 0.0
num_effective_patterns = len(self.compiled_patterns) # Base score on only valid patterns
if num_effective_patterns == 0 and num_configured_patterns > 0: # All patterns were invalid
details_str = f"Target field: '{self.target_field}'. Mode: '{self.match_mode}'.\nAll {num_configured_patterns} configured regex patterns were invalid and could not be compiled."
return EvaluationScore(
score=0.0,
details=details_str,
metrics={
"patterns_configured": num_configured_patterns,
"patterns_compiled": 0,
"patterns_matched": 0,
},
)
elif num_effective_patterns == 0 and num_configured_patterns == 0: # No patterns configured
details_str = f"Target field: '{self.target_field}'. Mode: '{self.match_mode}'.\nNo regex patterns configured."
return EvaluationScore(
score=0.0,
details=details_str,
metrics={"patterns_configured": 0, "patterns_compiled": 0, "patterns_matched": 0},
)
if self.match_mode == "all_must_match":
final_score = 100.0 if num_matched == num_effective_patterns else 0.0
elif self.match_mode == "any_can_match":
final_score = 100.0 if num_matched > 0 else 0.0
elif self.match_mode == "proportion_matched":
final_score = (num_matched / num_effective_patterns) * 100.0
details_str = f"Target field: '{self.target_field}'. Mode: '{self.match_mode}'.\n"
details_str += f"Matched {num_matched} out of {num_effective_patterns} validly compiled patterns (from {num_configured_patterns} configured).\n"
details_str += "\n".join(all_patterns_details)
return EvaluationScore(
score=final_score,
details=details_str,
metrics={
"patterns_configured": num_configured_patterns,
"patterns_compiled": num_effective_patterns,
"patterns_matched": num_matched,
"match_proportion_compiled": (num_matched / num_effective_patterns)
if num_effective_patterns
else 0.0,
},
)
```
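For reference, a sketch of the configuration dictionaries these evaluators read in their constructors. The keys mirror the `config.get(...)` calls above; the file path and package names are illustrative placeholders, and how the tournament machinery supplies these configs is not shown in this file.
```python
# Illustrative configs keyed by the evaluator_type values registered above.
# Only keys actually read by the constructors are shown; paths/packages are placeholders.
llm_grader_config = {
    "model_id": "anthropic/claude-3-5-haiku-20241022",
    "rubric": "Score 0-100 for correctness and clarity. Explain your reasoning.",
    "score_extraction_regex": r"Score:\s*(\d{1,3})",
}

unit_test_config = {
    "test_file_path": "tests/generated/test_solution.py",  # placeholder path on the host
    "required_packages": ["numpy"],  # packages the generated code or tests import in the sandbox
    "timeout_seconds": 30,
}

regex_match_config = {
    "patterns": [r"def\s+\w+\(", r"class\s+\w+"],
    "target_field": "extracted_code",      # or "response_text"
    "match_mode": "proportion_matched",    # "all_must_match" | "any_can_match" | "proportion_matched"
    "regex_flag_options": ["IGNORECASE", "MULTILINE"],
}

# e.g. RegexMatchEvaluator(regex_match_config) yields an evaluator whose async
# score(response_data, original_prompt, tournament_type) returns an EvaluationScore.
```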
--------------------------------------------------------------------------------
/ultimate_mcp_server/clients/completion_client.py:
--------------------------------------------------------------------------------
```python
"""High-level client for LLM completion operations."""
import asyncio
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple
from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.providers.base import BaseProvider, get_provider
from ultimate_mcp_server.services.cache import get_cache_service
from ultimate_mcp_server.utils import get_logger
logger = get_logger("ultimate_mcp_server.clients.completion")
class CompletionClient:
"""
High-level client for LLM text generation operations with advanced features.
The CompletionClient provides a unified interface for interacting with various LLM providers
(OpenAI, Anthropic, etc.) through a simple, consistent API. It abstracts away the complexity
of provider-specific implementations, offering a range of features that enhance reliability
and performance.
Key features:
- Multi-provider support with unified interface
- Automatic fallback between providers
- Result caching for improved performance and reduced costs
- Streaming support for real-time text generation
- Provider initialization and error handling
- Comprehensive error handling and logging
Architecture:
The client follows a layered architecture pattern:
1. High-level methods (generate_completion, generate_completion_stream) provide the main API
2. Provider abstraction layer manages provider-specific implementation details
3. Caching layer intercepts requests to reduce redundant API calls
4. Error handling layer provides graceful fallbacks and informative errors
Performance Considerations:
- Caching is enabled by default and can significantly reduce API costs and latency
- For time-sensitive or unique responses, caching can be disabled per request
- Streaming mode reduces time-to-first-token but cannot leverage caching
- Provider fallback adds resilience but may increase latency if primary providers fail
This client is designed for MCP tools that require text generation using LLMs,
making interactions more robust by handling common issues like rate limits,
timeouts, and provider-specific errors.
Example:
```python
# Create client with default settings
client = CompletionClient()
# Generate text non-streaming with specific provider and model
result = await client.generate_completion(
prompt="Explain quantum computing",
provider="anthropic",
model="claude-3-5-haiku-20241022",
temperature=0.5,
max_tokens=1000
)
print(f"Generated by {result.model} in {result.processing_time:.2f}s")
print(result.text)
# Generate text with streaming for real-time output
async for chunk, metadata in client.generate_completion_stream(
prompt="Write a short story about robots",
temperature=0.8
):
print(chunk, end="")
if metadata.get("done", False):
print("\nGeneration complete!")
# Use provider fallback for high availability
try:
result = await client.try_providers(
prompt="Summarize this article",
providers=["openai", "anthropic", "gemini"],
models=["gpt-4", "claude-instant-1", "gemini-pro"],
temperature=0.3
)
except Exception as e:
print(f"All providers failed: {e}")
```
"""
def __init__(self, default_provider: str = Provider.OPENAI.value, use_cache_by_default: bool = True):
"""Initialize the completion client.
Args:
default_provider: Default provider to use for completions
use_cache_by_default: Whether to use cache by default
"""
self.default_provider = default_provider
self.cache_service = get_cache_service()
self.use_cache_by_default = use_cache_by_default
async def initialize_provider(self, provider_name: str, api_key: Optional[str] = None) -> BaseProvider:
"""
Initialize and return a provider instance ready for LLM interactions.
This method handles the creation and initialization of a specific LLM provider,
ensuring it's properly configured and ready to generate completions. It abstracts
the details of provider initialization, including async initialization methods
that some providers might require.
The method performs several steps:
1. Retrieves the provider implementation based on the provider name
2. Applies the API key if provided (otherwise uses environment configuration)
3. Runs any provider-specific async initialization if required
4. Returns the ready-to-use provider instance
Provider initialization follows these architecture principles:
- Late binding: Providers are initialized on-demand, not at client creation
- Dependency injection: API keys can be injected at runtime rather than relying only on environment
- Fail-fast: Validation occurs during initialization rather than at generation time
- Extensibility: New providers can be added without changing client code
Common provider names include:
- "openai": OpenAI API (GPT models)
- "anthropic": Anthropic API (Claude models)
- "google": Google AI/Vertex API (Gemini models)
- "mistral": Mistral AI API (Mistral, Mixtral models)
- "ollama": Local Ollama server for various open-source models
Error handling:
- Invalid provider names are caught and reported immediately
- Authentication issues (e.g., invalid API keys) are detected during initialization
- Provider-specific initialization failures are propagated with detailed error messages
Args:
provider_name: Identifier for the desired provider (e.g., "openai", "anthropic")
api_key: Optional API key to use instead of environment-configured keys
Returns:
A fully initialized BaseProvider instance ready to generate completions
Raises:
ValueError: If the provider name is invalid or not supported
Exception: If initialization fails (e.g., invalid API key, network issues)
Note:
This method is typically called internally by other client methods,
but can be used directly when you need a specific provider instance
for specialized operations not covered by the main client methods.
Example:
```python
# Get a specific provider instance for custom operations
openai_provider = await client.initialize_provider("openai")
# Custom operation using provider-specific features
response = await openai_provider.some_specialized_method(...)
```
"""
try:
provider = await get_provider(provider_name, api_key=api_key)
# Ensure the provider is initialized (some might need async init)
if hasattr(provider, 'initialize') and asyncio.iscoroutinefunction(provider.initialize):
await provider.initialize()
return provider
except Exception as e:
logger.error(f"Failed to initialize provider {provider_name}: {e}", emoji_key="error")
raise
async def generate_completion(
self,
prompt: str,
provider: Optional[str] = None,
model: Optional[str] = None,
temperature: float = 0.7,
max_tokens: Optional[int] = None,
use_cache: bool = True,
cache_ttl: int = 3600,
**kwargs
):
"""
Generate text completion from an LLM with optional caching.
This method provides a unified interface for generating text completions from
any supported LLM provider. It includes intelligent caching to avoid redundant
API calls for identical inputs, reducing costs and latency.
The caching system:
- Creates a unique key based on the prompt, provider, model, and parameters
- Checks for cached results before making API calls
- Stores successful responses with a configurable TTL
- Can be disabled per-request with the use_cache parameter
Args:
prompt: The text prompt to send to the LLM
provider: The LLM provider to use (e.g., "openai", "anthropic", "google")
If None, uses the client's default_provider
model: Specific model to use (e.g., "gpt-4", "claude-instant-1")
If None, uses the provider's default model
temperature: Sampling temperature for controlling randomness (0.0-1.0)
Lower values are more deterministic, higher values more creative
max_tokens: Maximum number of tokens to generate
If None, uses provider-specific defaults
use_cache: Whether to use the caching system (default: True)
cache_ttl: Time-to-live for cache entries in seconds (default: 1 hour)
**kwargs: Additional provider-specific parameters
(e.g., top_p, frequency_penalty, presence_penalty)
Returns:
CompletionResult object with attributes:
- text: The generated completion text
- provider: The provider that generated the text
- model: The model used
- processing_time: Time taken to generate the completion (in seconds)
- tokens: Token usage information (if available)
- error: Error information (if an error occurred but was handled)
Raises:
ValueError: For invalid parameters
Exception: For provider errors or other issues during generation
Example:
```python
result = await client.generate_completion(
prompt="Write a poem about artificial intelligence",
temperature=0.8,
max_tokens=1000
)
print(f"Generated by {result.model} in {result.processing_time:.2f}s")
print(result.text)
```
"""
provider_name = provider or self.default_provider
# Check cache if enabled
if use_cache and self.cache_service.enabled:
# Create a robust cache key
provider_instance = await self.initialize_provider(provider_name)
model_id = model or provider_instance.get_default_model()
# Include relevant parameters in the cache key
params_hash = hash((prompt, temperature, max_tokens, str(kwargs)))
cache_key = f"completion:{provider_name}:{model_id}:{params_hash}"
cached_result = await self.cache_service.get(cache_key)
if cached_result is not None:
logger.success("Cache hit! Using cached result", emoji_key="cache")
# Set a nominal processing time for cached results
cached_result.processing_time = 0.001
return cached_result
# Cache miss or cache disabled
if use_cache and self.cache_service.enabled:
logger.info("Cache miss. Generating new completion...", emoji_key="processing")
else:
logger.info("Generating completion...", emoji_key="processing")
# Initialize provider and generate completion
try:
provider_instance = await self.initialize_provider(provider_name)
model_id = model or provider_instance.get_default_model()
result = await provider_instance.generate_completion(
prompt=prompt,
model=model_id,
temperature=temperature,
max_tokens=max_tokens,
**kwargs
)
# Save to cache if enabled
if use_cache and self.cache_service.enabled:
await self.cache_service.set(
key=cache_key,
value=result,
ttl=cache_ttl
)
logger.info(f"Result saved to cache (key: ...{cache_key[-10:]})", emoji_key="cache")
return result
except Exception as e:
logger.error(f"Error generating completion: {str(e)}", emoji_key="error")
raise
async def generate_completion_stream(
self,
prompt: str,
provider: Optional[str] = None,
model: Optional[str] = None,
temperature: float = 0.7,
max_tokens: Optional[int] = None,
**kwargs
) -> AsyncGenerator[Tuple[str, Dict[str, Any]], None]:
"""
Generate a streaming text completion with real-time chunks.
This method provides a streaming interface to LLM text generation, where
text is returned incrementally as it's generated, rather than waiting for
the entire response. This enables real-time UI updates, faster apparent
response times, and the ability to process partial responses.
Unlike the non-streaming version, this method:
- Does not support caching (each streaming response is unique)
- Returns an async generator that yields content incrementally
- Provides metadata with each chunk for tracking generation progress
Args:
prompt: The text prompt to send to the LLM
provider: The LLM provider to use (e.g., "openai", "anthropic")
If None, uses the client's default_provider
model: Specific model to use (e.g., "gpt-4", "claude-instant-1")
If None, uses the provider's default model
temperature: Sampling temperature for controlling randomness (0.0-1.0)
Lower values are more deterministic, higher values more creative
max_tokens: Maximum number of tokens to generate
If None, uses provider-specific defaults
**kwargs: Additional provider-specific parameters
Yields:
Tuples of (chunk_text, metadata), where:
- chunk_text: A string containing the next piece of generated text
- metadata: A dictionary with information about the generation process:
- done: Boolean indicating if this is the final chunk
- chunk_index: Index of the current chunk (0-based)
- token_count: Number of tokens in this chunk (if available)
- total_tokens: Running total of tokens generated so far (if available)
Raises:
ValueError: For invalid parameters
Exception: For provider errors or other issues during streaming
Example:
```python
# Display text as it's generated
async for chunk, metadata in client.generate_completion_stream(
prompt="Explain the theory of relativity",
temperature=0.3
):
print(chunk, end="")
if metadata.get("done", False):
print("\nGeneration complete!")
```
Note:
Not all providers support streaming completions. Check the provider
documentation for compatibility.
"""
provider_name = provider or self.default_provider
logger.info("Generating streaming completion...", emoji_key="processing")
# Initialize provider and generate streaming completion
try:
provider_instance = await self.initialize_provider(provider_name)
model_id = model or provider_instance.get_default_model()
stream = provider_instance.generate_completion_stream(
prompt=prompt,
model=model_id,
temperature=temperature,
max_tokens=max_tokens,
**kwargs
)
async for chunk, metadata in stream:
yield chunk, metadata
except Exception as e:
logger.error(f"Error generating streaming completion: {str(e)}", emoji_key="error")
raise
async def try_providers(
self,
prompt: str,
providers: List[str],
models: Optional[List[str]] = None,
**kwargs
):
"""
Try multiple providers in sequence until one succeeds.
This method implements an automatic fallback mechanism that attempts to generate
a completion using a list of providers in order, continuing to the next provider
if the current one fails. This provides resilience against provider downtime,
rate limits, or other temporary failures.
The method tries each provider exactly once in the order they're specified, with
an optional corresponding model for each. This is useful for scenarios where you
need high availability or want to implement prioritized provider selection.
Args:
prompt: The text prompt to send to the LLM
providers: An ordered list of provider names to try (e.g., ["openai", "anthropic", "google"])
Providers are tried in the specified order until one succeeds
models: Optional list of models to use with each provider
If provided, must be the same length as providers
If None, each provider's default model is used
**kwargs: Additional parameters passed to generate_completion
Applies to all provider attempts
Returns:
CompletionResult from the first successful provider,
with the same structure as generate_completion results
Raises:
ValueError: If no providers are specified or if models list length doesn't match providers
Exception: If all specified providers fail, with details of the last error
Example:
```python
# Try OpenAI first, fall back to Anthropic, then Google
result = await client.try_providers(
prompt="Write a sonnet about programming",
providers=["openai", "anthropic", "google"],
models=["gpt-4", "claude-2", "gemini-pro"],
temperature=0.7,
max_tokens=800
)
print(f"Successfully used {result.provider} with model {result.model}")
print(result.text)
```
Note:
Each provider attempt is logged, making it easy to track which providers
succeeded or failed during the fallback sequence.
"""
if not providers:
raise ValueError("No providers specified")
models = models or [None] * len(providers)
if len(models) != len(providers):
raise ValueError("If models are specified, there must be one for each provider")
last_error = None
for i, provider_name in enumerate(providers):
try:
logger.info(f"Trying provider: {provider_name}", emoji_key="provider")
result = await self.generate_completion(
prompt=prompt,
provider=provider_name,
model=models[i],
**kwargs
)
return result
except Exception as e:
logger.warning(f"Provider {provider_name} failed: {str(e)}", emoji_key="warning")
last_error = e
# If we get here, all providers failed
raise Exception(f"All providers failed. Last error: {str(last_error)}")
```
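The caching path in `generate_completion` composes its key as `completion:{provider}:{model}:{params_hash}`, where `params_hash` comes from Python's built-in `hash()` over the prompt and parameters. A standalone sketch of that scheme is shown below; it swaps in a SHA-256 digest because the built-in string hash is salted per process, so the original keys are only stable within a single process. This is an illustrative variant, not the client's implementation.
```python
import hashlib
from typing import Any, Dict, Optional


def build_cache_key(
    provider: str,
    model: str,
    prompt: str,
    temperature: float,
    max_tokens: Optional[int],
    extra: Dict[str, Any],
) -> str:
    """Illustrative variant of CompletionClient's cache-key scheme.

    The client itself uses hash((prompt, temperature, max_tokens, str(kwargs)));
    this sketch substitutes a SHA-256 digest so keys stay stable across processes.
    """
    params_hash = hashlib.sha256(
        repr((prompt, temperature, max_tokens, sorted(extra.items()))).encode("utf-8")
    ).hexdigest()[:16]
    return f"completion:{provider}:{model}:{params_hash}"


print(build_cache_key("openai", "gpt-4o-mini", "Explain quantum computing", 0.7, 1000, {"top_p": 0.9}))
# -> completion:openai:gpt-4o-mini:<16 hex chars>
```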
--------------------------------------------------------------------------------
/ultimate_mcp_server/utils/logging/progress.py:
--------------------------------------------------------------------------------
```python
"""
Progress tracking and visualization for Gateway.
This module provides enhanced progress tracking capabilities with Rich,
supporting nested tasks, task groups, and dynamic progress updates.
"""
import time
import uuid
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Any, Dict, Generator, Iterable, List, Optional, TypeVar
from rich.box import ROUNDED
from rich.console import Console, ConsoleRenderable, Group
from rich.live import Live
from rich.progress import (
BarColumn,
SpinnerColumn,
TaskID, # Import TaskID type hint
TextColumn,
TimeElapsedColumn,
TimeRemainingColumn,
)
from rich.progress import Progress as RichProgress # Renamed to avoid clash
from rich.table import Table
from .console import console as default_console # Use the shared console instance
# Use relative imports
# TypeVar for generic progress tracking over iterables
T = TypeVar("T")
@dataclass
class TaskInfo:
"""Information about a single task being tracked."""
description: str
total: float
completed: float = 0.0
status: str = "running" # running, success, error, skipped
start_time: float = field(default_factory=time.time)
end_time: Optional[float] = None
parent_id: Optional[str] = None
rich_task_id: Optional[TaskID] = None # ID from Rich Progress
meta: Dict[str, Any] = field(default_factory=dict)
@property
def elapsed(self) -> float:
"""Calculate elapsed time."""
end = self.end_time or time.time()
return end - self.start_time
@property
def is_complete(self) -> bool:
"""Check if the task is in a terminal state."""
return self.status in ("success", "error", "skipped")
class GatewayProgress:
"""Manages multiple progress tasks with Rich integration and context.
Allows for nested tasks and displays an overall summary.
Uses a single Rich Progress instance managed internally.
"""
def __init__(
self,
console: Optional[Console] = None,
transient: bool = False,  # If True, hide the progress display after completion
auto_refresh: bool = True,
expand: bool = True, # Expand progress bars to full width?
show_summary: bool = True,
summary_refresh_rate: float = 1.0 # How often to refresh summary
):
"""Initialize the progress manager.
Args:
console: Rich Console instance (defaults to shared console)
transient: Hide progress bars upon completion
auto_refresh: Automatically refresh the display
expand: Expand bars to console width
show_summary: Display the summary panel below progress bars
summary_refresh_rate: Rate limit for summary updates (seconds)
"""
self.console = console or default_console
self._rich_progress = self._create_progress(transient, auto_refresh, expand)
self._live: Optional[Live] = None
self._tasks: Dict[str, TaskInfo] = {}
self._task_stack: List[str] = [] # For context managers
self.show_summary = show_summary
self._summary_renderable = self._render_summary() # Initial summary
self._last_summary_update = 0.0
self.summary_refresh_rate = summary_refresh_rate
def _create_progress(self, transient: bool, auto_refresh: bool, expand: bool) -> RichProgress:
"""Create the underlying Rich Progress instance."""
return RichProgress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(bar_width=None if expand else 40),
"[progress.percentage]{task.percentage:>3.1f}%",
TimeElapsedColumn(),
TimeRemainingColumn(),
console=self.console,
transient=transient,
auto_refresh=auto_refresh,
expand=expand,
# disable=True # Useful for debugging
)
def _render_summary(self) -> Group:
"""Render the overall progress summary table."""
if not self.show_summary or not self._tasks:
return Group() # Empty group if no summary needed or no tasks yet
completed_count = sum(1 for t in self._tasks.values() if t.is_complete)
running_count = len(self._tasks) - completed_count
success_count = sum(1 for t in self._tasks.values() if t.status == 'success')
error_count = sum(1 for t in self._tasks.values() if t.status == 'error')
skipped_count = sum(1 for t in self._tasks.values() if t.status == 'skipped')
total_elapsed = time.time() - min(t.start_time for t in self._tasks.values()) if self._tasks else 0
# Calculate overall percentage (weighted average might be better?)
overall_total = sum(t.total for t in self._tasks.values())
overall_completed = sum(t.completed for t in self._tasks.values())
overall_perc = (overall_completed / overall_total * 100) if overall_total > 0 else 100.0
summary_table = Table(box=ROUNDED, show_header=False, padding=(0, 1), expand=True)
summary_table.add_column("Metric", style="dim", width=15)
summary_table.add_column("Value", style="bold")
summary_table.add_row("Overall Prog.", f"{overall_perc:.1f}%")
summary_table.add_row("Total Tasks", str(len(self._tasks)))
summary_table.add_row(" Running", str(running_count))
summary_table.add_row(" Completed", str(completed_count))
if success_count > 0:
summary_table.add_row(" Success", f"[success]{success_count}[/]")
if error_count > 0:
summary_table.add_row(" Errors", f"[error]{error_count}[/]")
if skipped_count > 0:
summary_table.add_row(" Skipped", f"[warning]{skipped_count}[/]")
summary_table.add_row("Elapsed Time", f"{total_elapsed:.2f}s")
return Group(summary_table)
def _get_renderable(self) -> ConsoleRenderable:
"""Get the combined renderable for the Live display."""
# Throttle summary updates
now = time.time()
if self.show_summary and (now - self._last_summary_update > self.summary_refresh_rate):
self._summary_renderable = self._render_summary()
self._last_summary_update = now
if self.show_summary:
return Group(self._rich_progress, self._summary_renderable)
else:
return self._rich_progress
def add_task(
self,
description: str,
name: Optional[str] = None,
total: float = 100.0,
parent: Optional[str] = None, # Name of parent task
visible: bool = True,
start: bool = True, # Start the Rich task immediately
**meta: Any # Additional metadata
) -> str:
"""Add a new task to track.
Args:
description: Text description of the task.
name: Unique name/ID for this task (auto-generated if None).
total: Total steps/units for completion.
parent: Name of the parent task for nesting (visual indent).
visible: Whether the task is initially visible.
start: Start the task in the Rich progress bar immediately.
**meta: Arbitrary metadata associated with the task.
Returns:
The unique name/ID of the added task.
"""
if name is None:
name = str(uuid.uuid4()) # Generate unique ID if not provided
if name in self._tasks:
raise ValueError(f"Task with name '{name}' already exists.")
parent_rich_id = None
if parent:
if parent not in self._tasks:
raise ValueError(f"Parent task '{parent}' not found.")
parent_task_info = self._tasks[parent]
if parent_task_info.rich_task_id is not None:
parent_rich_id = parent_task_info.rich_task_id
# Rich Progress has no native tree/nested view; indenting the description (commented out below) would be only a cosmetic workaround.
# description = f" {description}"
task_info = TaskInfo(
description=description,
total=total,
parent_id=parent,
meta=meta,
)
# Add to Rich Progress if active
rich_task_id = None
if self._live and self._rich_progress:
rich_task_id = self._rich_progress.add_task(
description,
total=total,
start=start,
visible=visible,
parent=parent_rich_id # Rich uses TaskID for parent
)
task_info.rich_task_id = rich_task_id
self._tasks[name] = task_info
return name
def update_task(
self,
name: str,
description: Optional[str] = None,
advance: Optional[float] = None,
completed: Optional[float] = None,
total: Optional[float] = None,
visible: Optional[bool] = None,
status: Optional[str] = None, # running, success, error, skipped
**meta: Any
) -> None:
"""Update an existing task.
Args:
name: The unique name/ID of the task to update.
description: New description text.
advance: Amount to advance the completion progress.
completed: Set completion to a specific value.
total: Set a new total value.
visible: Change task visibility.
status: Update the task status (affects summary).
**meta: Update or add metadata.
"""
if name not in self._tasks:
# Optionally log a warning or error
# default_console.print(f"[warning]Attempted to update non-existent task: {name}[/]")
return
task_info = self._tasks[name]
update_kwargs = {}
if description is not None:
task_info.description = description
update_kwargs['description'] = description
if total is not None:
task_info.total = float(total)
update_kwargs['total'] = task_info.total
# Update completed status
if completed is not None:
task_info.completed = max(0.0, min(float(completed), task_info.total))
update_kwargs['completed'] = task_info.completed
elif advance is not None:
task_info.completed = max(0.0, min(task_info.completed + float(advance), task_info.total))
update_kwargs['completed'] = task_info.completed
if visible is not None:
update_kwargs['visible'] = visible
if meta:
task_info.meta.update(meta)
# Update status (after completion update)
if status is not None:
task_info.status = status
if task_info.is_complete and task_info.end_time is None:
task_info.end_time = time.time()
# Ensure Rich task is marked as complete
if 'completed' not in update_kwargs:
update_kwargs['completed'] = task_info.total
# Update Rich progress bar if active
if task_info.rich_task_id is not None and self._live and self._rich_progress:
self._rich_progress.update(task_info.rich_task_id, **update_kwargs)
def complete_task(self, name: str, status: str = "success") -> None:
"""Mark a task as complete with a final status.
Args:
name: The unique name/ID of the task.
status: Final status ('success', 'error', 'skipped').
"""
if name not in self._tasks:
return # Or raise error/log warning
task_info = self._tasks[name]
self.update_task(
name,
completed=task_info.total, # Ensure it reaches 100%
status=status
)
def start(self) -> "GatewayProgress":
"""Start the Rich Live display."""
if self._live is None:
# Add any tasks that were created before start()
for _name, task_info in self._tasks.items():
if task_info.rich_task_id is None:
parent_rich_id = None
if task_info.parent_id and task_info.parent_id in self._tasks:
parent_rich_id = self._tasks[task_info.parent_id].rich_task_id
task_info.rich_task_id = self._rich_progress.add_task(
task_info.description,
total=task_info.total,
completed=task_info.completed,
start=True, # Assume tasks added before start should be started
visible=True, # Assume visible
parent=parent_rich_id
)
self._live = Live(self._get_renderable(), console=self.console, refresh_per_second=10, vertical_overflow="visible")
self._live.start(refresh=True)
return self
def stop(self) -> None:
"""Stop the Rich Live display."""
if self._live is not None:
# Ensure all running tasks in Rich are marked complete before stopping Live
# to avoid them getting stuck visually
if self._rich_progress:
for task in self._rich_progress.tasks:
if not task.finished:
self._rich_progress.update(task.id, completed=task.total)
self._live.stop()
self._live = None
# Optional: Clear the Rich Progress tasks?
# self._rich_progress = self._create_progress(...) # Recreate if needed
def update(self) -> None:
"""Force a refresh of the Live display (if active)."""
if self._live:
self._live.update(self._get_renderable(), refresh=True)
def reset(self) -> None:
"""Reset the progress tracker, clearing all tasks."""
self.stop() # Stop live display
self._tasks.clear()
self._task_stack.clear()
# Recreate Rich progress to clear its tasks
self._rich_progress = self._create_progress(
self._rich_progress.transient,
self._rich_progress.auto_refresh,
True # Assuming expand is derived from console width anyway
)
self._summary_renderable = self._render_summary()
self._last_summary_update = 0.0
@contextmanager
def task(
self,
description: str,
name: Optional[str] = None,
total: float = 100.0,
parent: Optional[str] = None,
autostart: bool = True, # Start Live display if not already started?
**meta: Any
) -> Generator["GatewayProgress", None, None]: # Yields self for updates
"""Context manager for a single task.
Args:
description: Description of the task.
name: Optional unique name/ID (auto-generated if None).
total: Total steps/units for the task.
parent: Optional parent task name.
autostart: Start the overall progress display if not running.
**meta: Additional metadata for the task.
Yields:
The GatewayProgress instance itself, allowing updates via `update_task`.
"""
if autostart and self._live is None:
self.start()
task_name = self.add_task(description, name, total, parent, **meta)
self._task_stack.append(task_name)
try:
yield self # Yield self to allow calling update_task(task_name, ...)
except Exception:
# Mark task as errored on exception
self.complete_task(task_name, status="error")
raise # Re-raise the exception
else:
# Mark task as successful if no exception
# Check if it was already completed with a different status
if task_name in self._tasks and not self._tasks[task_name].is_complete:
self.complete_task(task_name, status="success")
finally:
# Pop task from stack
if self._task_stack and self._task_stack[-1] == task_name:
self._task_stack.pop()
# No automatic stop here - allow multiple context managers
# self.stop()
def track(
self,
iterable: Iterable[T],
description: str,
name: Optional[str] = None,
total: Optional[float] = None,
parent: Optional[str] = None,
autostart: bool = True,
**meta: Any
) -> Iterable[T]:
"""Track progress over an iterable.
Args:
iterable: The iterable to track progress over.
description: Description of the task.
name: Optional unique name/ID (auto-generated if None).
total: Total number of items (estimated if None).
parent: Optional parent task name.
autostart: Start the overall progress display if not running.
**meta: Additional metadata for the task.
Yields:
Items from the wrapped iterable, advancing the task's progress after each item.
"""
if total is None:
try:
total = float(len(iterable)) # type: ignore
except (TypeError, AttributeError):
total = 100.0 # Default if length cannot be determined
if autostart and self._live is None:
self.start()
task_name = self.add_task(description, name, total, parent, **meta)
try:
for item in iterable:
yield item
self.update_task(task_name, advance=1)
except Exception:
self.complete_task(task_name, status="error")
raise
else:
# Check if it was already completed with a different status
if task_name in self._tasks and not self._tasks[task_name].is_complete:
self.complete_task(task_name, status="success")
# No automatic stop
# finally:
# self.stop()
def __enter__(self) -> "GatewayProgress":
"""Enter context manager, starts the display."""
return self.start()
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""Exit context manager, stops the display."""
self.stop()
# --- Global Convenience Functions (using a default progress instance) ---
# Note: Managing a truly global progress instance can be tricky.
# It might be better to explicitly create and manage GatewayProgress instances.
_global_progress: Optional[GatewayProgress] = None
def get_global_progress() -> GatewayProgress:
"""Get or create the default global progress manager."""
global _global_progress
if _global_progress is None:
_global_progress = GatewayProgress()
return _global_progress
def track(
iterable: Iterable[T],
description: str,
name: Optional[str] = None,
total: Optional[float] = None,
parent: Optional[str] = None,
) -> Iterable[T]:
"""Track progress over an iterable using the global progress manager."""
prog = get_global_progress()
# Ensure global progress is started if used this way
if prog._live is None:
prog.start()
return prog.track(iterable, description, name, total, parent, autostart=False)
@contextmanager
def task(
description: str,
name: Optional[str] = None,
total: float = 100.0,
parent: Optional[str] = None,
) -> Generator["GatewayProgress", None, None]:
"""Context manager for a single task using the global progress manager."""
prog = get_global_progress()
# Ensure global progress is started if used this way
if prog._live is None:
prog.start()
with prog.task(description, name, total, parent, autostart=False) as task_context:
yield task_context # Yields the progress manager itself
```
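For orientation, here is a minimal usage sketch of the progress utilities defined above (`GatewayProgress` plus the module-level `track`/`task` helpers). It is a hypothetical example, not code from the repository: the import path and the simulated work are assumptions.

```python
# Hypothetical usage sketch; the import path below is an assumption based on the
# file's apparent location in this repository dump.
import time

from ultimate_mcp_server.utils.progress import GatewayProgress, task, track


def run_with_explicit_instance() -> None:
    """Preferred pattern per the module's note: manage the instance explicitly."""
    with GatewayProgress() as progress:
        # track() wraps an iterable and advances its task by one per item.
        for _item in progress.track(range(5), description="Processing items"):
            time.sleep(0.1)  # stand-in for real work

        # task() scopes a unit of work; it is marked "success" on clean exit
        # and "error" if the body raises.
        with progress.task("Finalizing", total=1.0):
            time.sleep(0.1)


def run_with_global_helpers() -> None:
    """Module-level helpers that lazily create a shared GatewayProgress."""
    for _item in track(range(3), description="Tracking via global helper"):
        time.sleep(0.1)

    with task("Global task"):
        time.sleep(0.1)


if __name__ == "__main__":
    run_with_explicit_instance()
    run_with_global_helpers()
```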
--------------------------------------------------------------------------------
/examples/compare_synthesize_demo.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
"""Enhanced demo of the Advanced Response Comparator & Synthesizer Tool."""
import asyncio
import json
import sys
from collections import namedtuple # Import namedtuple
from pathlib import Path
# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))
from rich import box
from rich.console import Group
from rich.markup import escape
from rich.panel import Panel
from rich.rule import Rule
from rich.syntax import Syntax
from rich.table import Table
from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.server import Gateway # Use Gateway to get MCP
# from ultimate_mcp_server.tools.meta import compare_and_synthesize # Add correct import
from ultimate_mcp_server.utils import get_logger
from ultimate_mcp_server.utils.display import CostTracker # Import CostTracker
from ultimate_mcp_server.utils.logging.console import console
# Initialize logger
logger = get_logger("example.compare_synthesize_v2")
# Create a simple structure for cost tracking from dict (tokens might be missing)
TrackableResult = namedtuple("TrackableResult", ["cost", "input_tokens", "output_tokens", "provider", "model", "processing_time"])
# Global MCP instance (will be populated from Gateway)
mcp = None
async def setup_gateway_and_tools():
"""Set up the gateway and register tools."""
global mcp
logger.info("Initializing Gateway and MetaTools for enhanced demo...", emoji_key="start")
gateway = Gateway("compare-synthesize-demo-v2", register_tools=False)
# Initialize providers (needed for the tool to function)
try:
await gateway._initialize_providers()
except Exception as e:
logger.critical(f"Failed to initialize providers: {e}. Check API keys.", emoji_key="critical", exc_info=True)
sys.exit(1) # Exit if providers can't be initialized
# REMOVE MetaTools instance
# meta_tools = MetaTools(gateway) # Pass the gateway instance # noqa: F841
mcp = gateway.mcp # Store the MCP server instance
# Manually register the required tool
# mcp.tool()(compare_and_synthesize)
# logger.info("Manually registered compare_and_synthesize tool.")
# Verify tool registration
tool_list = await mcp.list_tools()
tool_names = [t.name for t in tool_list] # Access name attribute directly
# Use console.print for tool list
console.print(f"Registered tools: [cyan]{escape(str(tool_names))}[/cyan]")
if "compare_and_synthesize" in tool_names:
logger.success("compare_and_synthesize tool registered successfully.", emoji_key="success")
else:
logger.error("compare_and_synthesize tool FAILED to register.", emoji_key="error")
sys.exit(1) # Exit if the required tool isn't available
logger.success("Setup complete.", emoji_key="success")
# Refactored print_result function using Rich
def print_result(title: str, result: dict):
"""Helper function to print results clearly using Rich components."""
console.print(Rule(f"[bold blue]{escape(title)}[/bold blue]"))
# Handle potential list result format (from older tool versions?)
if isinstance(result, list) and len(result) > 0:
if hasattr(result[0], 'text'):
try:
result = json.loads(result[0].text)
except Exception:
result = {"error": "Failed to parse result from list format"}
else:
result = result[0] # Assume first item is the dict
elif not isinstance(result, dict):
result = {"error": f"Unexpected result format: {type(result)}"}
    if result.get("error"):
        error_content = f"[red]Error:[/red] {escape(result['error'])}"
        if "partial_results" in result and result["partial_results"]:
            try:
                partial_json = json.dumps(result["partial_results"], indent=2)
                # Keep the error text visible alongside the partial results
                error_panel_content = Group(
                    error_content,
                    "\n[yellow]Partial Results:[/yellow]",
                    Syntax(partial_json, "json", theme="default", line_numbers=False, word_wrap=True),
                )
            except Exception as json_err:
                error_panel_content = f"{error_content}\n\n[red]Could not display partial results: {escape(str(json_err))}[/red]"
        else:
            error_panel_content = error_content
console.print(Panel(
error_panel_content,
title="[bold red]Tool Error[/bold red]",
border_style="red",
expand=False
))
else:
# Display synthesis/analysis sections
if "synthesis" in result:
synthesis_data = result["synthesis"]
if isinstance(synthesis_data, dict):
if "best_response_text" in synthesis_data:
console.print(Panel(
escape(synthesis_data["best_response_text"].strip()),
title="[bold green]Best Response Text[/bold green]",
border_style="green",
expand=False
))
if "synthesized_response" in synthesis_data:
console.print(Panel(
escape(synthesis_data["synthesized_response"].strip()),
title="[bold magenta]Synthesized Response[/bold magenta]",
border_style="magenta",
expand=False
))
if synthesis_data.get("best_response", {}).get("reasoning"):
console.print(Panel(
escape(synthesis_data["best_response"]["reasoning"].strip()),
title="[bold cyan]Best Response Reasoning[/bold cyan]",
border_style="dim cyan",
expand=False
))
if synthesis_data.get("synthesis_strategy"):
console.print(Panel(
escape(synthesis_data["synthesis_strategy"].strip()),
title="[bold yellow]Synthesis Strategy Explanation[/bold yellow]",
border_style="dim yellow",
expand=False
))
if "ranking" in synthesis_data:
try:
ranking_json = json.dumps(synthesis_data["ranking"], indent=2)
console.print(Panel(
Syntax(ranking_json, "json", theme="default", line_numbers=False, word_wrap=True),
title="[bold]Ranking[/bold]",
border_style="dim blue",
expand=False
))
except Exception as json_err:
console.print(f"[red]Could not display ranking: {escape(str(json_err))}[/red]")
if "comparative_analysis" in synthesis_data:
try:
analysis_json = json.dumps(synthesis_data["comparative_analysis"], indent=2)
console.print(Panel(
Syntax(analysis_json, "json", theme="default", line_numbers=False, word_wrap=True),
title="[bold]Comparative Analysis[/bold]",
border_style="dim blue",
expand=False
))
except Exception as json_err:
console.print(f"[red]Could not display comparative analysis: {escape(str(json_err))}[/red]")
else: # Handle case where synthesis data isn't a dict (e.g., raw text error)
console.print(Panel(
f"[yellow]Synthesis Output (raw/unexpected format):[/yellow]\n{escape(str(synthesis_data))}",
title="[bold yellow]Synthesis Data[/bold yellow]",
border_style="yellow",
expand=False
))
# Display Stats Table
stats_table = Table(title="[bold]Execution Stats[/bold]", box=box.ROUNDED, show_header=False, expand=False)
stats_table.add_column("Metric", style="cyan", no_wrap=True)
stats_table.add_column("Value", style="white")
stats_table.add_row("Eval/Synth Model", f"{escape(result.get('synthesis_provider','N/A'))}/{escape(result.get('synthesis_model','N/A'))}")
stats_table.add_row("Total Cost", f"${result.get('cost', {}).get('total_cost', 0.0):.6f}")
stats_table.add_row("Processing Time", f"{result.get('processing_time', 0.0):.2f}s")
console.print(stats_table)
console.print() # Add spacing after each result block
async def run_comparison_demo(tracker: CostTracker):
"""Demonstrate different modes of compare_and_synthesize."""
if not mcp:
logger.error("MCP server not initialized. Run setup first.", emoji_key="error")
return
prompt = "Explain the main benefits of using asynchronous programming in Python for a moderately technical audience. Provide 2-3 key advantages."
# --- Configuration for initial responses ---
console.print(Rule("[bold green]Configurations[/bold green]"))
console.print(f"[cyan]Prompt:[/cyan] {escape(prompt)}")
initial_configs = [
{"provider": Provider.OPENAI.value, "model": "gpt-4.1-mini", "parameters": {"temperature": 0.6, "max_tokens": 150}},
{"provider": Provider.ANTHROPIC.value, "model": "claude-3-5-haiku-20241022", "parameters": {"temperature": 0.5, "max_tokens": 150}},
{"provider": Provider.GEMINI.value, "model": "gemini-2.0-flash-lite", "parameters": {"temperature": 0.7, "max_tokens": 150}},
{"provider": Provider.DEEPSEEK.value, "model": "deepseek-chat", "parameters": {"temperature": 0.6, "max_tokens": 150}},
]
console.print(f"[cyan]Initial Models:[/cyan] {[f'{c['provider']}:{c['model']}' for c in initial_configs]}")
# --- Evaluation Criteria ---
criteria = [
"Clarity: Is the explanation clear and easy to understand for the target audience?",
"Accuracy: Are the stated benefits of async programming technically correct?",
"Relevance: Does the response directly address the prompt and focus on key advantages?",
"Conciseness: Is the explanation brief and to the point?",
"Completeness: Does it mention 2-3 distinct and significant benefits?",
]
console.print("[cyan]Evaluation Criteria:[/cyan]")
for i, criterion in enumerate(criteria):
console.print(f" {i+1}. {escape(criterion)}")
# --- Criteria Weights (Optional) ---
criteria_weights = {
"Clarity: Is the explanation clear and easy to understand for the target audience?": 0.3,
"Accuracy: Are the stated benefits of async programming technically correct?": 0.3,
"Relevance: Does the response directly address the prompt and focus on key advantages?": 0.15,
"Conciseness: Is the explanation brief and to the point?": 0.1,
"Completeness: Does it mention 2-3 distinct and significant benefits?": 0.15,
}
console.print("[cyan]Criteria Weights (Optional):[/cyan]")
# Create a small table for weights
weights_table = Table(box=box.MINIMAL, show_header=False)
weights_table.add_column("Criterion Snippet", style="dim")
weights_table.add_column("Weight", style="green")
for crit, weight in criteria_weights.items():
weights_table.add_row(escape(crit.split(':')[0]), f"{weight:.2f}")
console.print(weights_table)
# --- Synthesis/Evaluation Model ---
synthesis_model_config = {"provider": Provider.OPENAI.value, "model": "gpt-4.1"}
console.print(f"[cyan]Synthesis/Evaluation Model:[/cyan] {escape(synthesis_model_config['provider'])}:{escape(synthesis_model_config['model'])}")
console.print() # Spacing before demos start
common_args = {
"prompt": prompt,
"configs": initial_configs,
"criteria": criteria,
"criteria_weights": criteria_weights,
}
# --- Demo 1: Select Best Response ---
logger.info("Running format 'best'...", emoji_key="processing")
try:
result = await mcp.call_tool("compare_and_synthesize", {
**common_args,
"response_format": "best",
"include_reasoning": True, # Show why it was selected
"synthesis_model": synthesis_model_config # Explicitly specify model to avoid OpenRouter
})
print_result("Response Format: 'best' (with reasoning)", result)
# Track cost
if isinstance(result, dict) and "cost" in result and "synthesis_provider" in result and "synthesis_model" in result:
try:
trackable = TrackableResult(
cost=result.get("cost", {}).get("total_cost", 0.0),
input_tokens=0, # Tokens not typically aggregated in this tool's output
output_tokens=0,
provider=result.get("synthesis_provider", "unknown"),
model=result.get("synthesis_model", "compare_synthesize"),
processing_time=result.get("processing_time", 0.0)
)
tracker.add_call(trackable)
except Exception as track_err:
logger.warning(f"Could not track cost for 'best' format: {track_err}", exc_info=False)
except Exception as e:
logger.error(f"Error during 'best' format demo: {e}", emoji_key="error", exc_info=True)
# --- Demo 2: Synthesize Responses (Comprehensive Strategy) ---
logger.info("Running format 'synthesis' (comprehensive)...", emoji_key="processing")
try:
result = await mcp.call_tool("compare_and_synthesize", {
**common_args,
"response_format": "synthesis",
"synthesis_strategy": "comprehensive",
"synthesis_model": synthesis_model_config, # Specify model for consistency
"include_reasoning": True,
})
print_result("Response Format: 'synthesis' (Comprehensive Strategy)", result)
# Track cost
if isinstance(result, dict) and "cost" in result and "synthesis_provider" in result and "synthesis_model" in result:
try:
trackable = TrackableResult(
cost=result.get("cost", {}).get("total_cost", 0.0),
input_tokens=0, # Tokens not typically aggregated
output_tokens=0,
provider=result.get("synthesis_provider", "unknown"),
model=result.get("synthesis_model", "compare_synthesize"),
processing_time=result.get("processing_time", 0.0)
)
tracker.add_call(trackable)
except Exception as track_err:
logger.warning(f"Could not track cost for 'synthesis comprehensive': {track_err}", exc_info=False)
except Exception as e:
logger.error(f"Error during 'synthesis comprehensive' demo: {e}", emoji_key="error", exc_info=True)
# --- Demo 3: Synthesize Responses (Conservative Strategy, No Reasoning) ---
logger.info("Running format 'synthesis' (conservative, no reasoning)...", emoji_key="processing")
try:
result = await mcp.call_tool("compare_and_synthesize", {
**common_args,
"response_format": "synthesis",
"synthesis_strategy": "conservative",
"synthesis_model": synthesis_model_config, # Explicitly specify
"include_reasoning": False, # Hide the synthesis strategy explanation
})
print_result("Response Format: 'synthesis' (Conservative, No Reasoning)", result)
# Track cost
if isinstance(result, dict) and "cost" in result and "synthesis_provider" in result and "synthesis_model" in result:
try:
trackable = TrackableResult(
cost=result.get("cost", {}).get("total_cost", 0.0),
input_tokens=0, # Tokens not typically aggregated
output_tokens=0,
provider=result.get("synthesis_provider", "unknown"),
model=result.get("synthesis_model", "compare_synthesize"),
processing_time=result.get("processing_time", 0.0)
)
tracker.add_call(trackable)
except Exception as track_err:
logger.warning(f"Could not track cost for 'synthesis conservative': {track_err}", exc_info=False)
except Exception as e:
logger.error(f"Error during 'synthesis conservative' demo: {e}", emoji_key="error", exc_info=True)
# --- Demo 4: Rank Responses ---
logger.info("Running format 'ranked'...", emoji_key="processing")
try:
result = await mcp.call_tool("compare_and_synthesize", {
**common_args,
"response_format": "ranked",
"include_reasoning": True, # Show reasoning for ranks
"synthesis_model": synthesis_model_config, # Explicitly specify
})
print_result("Response Format: 'ranked' (with reasoning)", result)
# Track cost
if isinstance(result, dict) and "cost" in result and "synthesis_provider" in result and "synthesis_model" in result:
try:
trackable = TrackableResult(
cost=result.get("cost", {}).get("total_cost", 0.0),
input_tokens=0, # Tokens not typically aggregated
output_tokens=0,
provider=result.get("synthesis_provider", "unknown"),
model=result.get("synthesis_model", "compare_synthesize"),
processing_time=result.get("processing_time", 0.0)
)
tracker.add_call(trackable)
except Exception as track_err:
logger.warning(f"Could not track cost for 'ranked' format: {track_err}", exc_info=False)
except Exception as e:
logger.error(f"Error during 'ranked' format demo: {e}", emoji_key="error", exc_info=True)
# --- Demo 5: Analyze Responses ---
logger.info("Running format 'analysis'...", emoji_key="processing")
try:
result = await mcp.call_tool("compare_and_synthesize", {
**common_args,
"response_format": "analysis",
# No reasoning needed for analysis format, it's inherent
"synthesis_model": synthesis_model_config, # Explicitly specify
})
print_result("Response Format: 'analysis'", result)
# Track cost
if isinstance(result, dict) and "cost" in result and "synthesis_provider" in result and "synthesis_model" in result:
try:
trackable = TrackableResult(
cost=result.get("cost", {}).get("total_cost", 0.0),
input_tokens=0, # Tokens not typically aggregated
output_tokens=0,
provider=result.get("synthesis_provider", "unknown"),
model=result.get("synthesis_model", "compare_synthesize"),
processing_time=result.get("processing_time", 0.0)
)
tracker.add_call(trackable)
except Exception as track_err:
logger.warning(f"Could not track cost for 'analysis' format: {track_err}", exc_info=False)
except Exception as e:
logger.error(f"Error during 'analysis' format demo: {e}", emoji_key="error", exc_info=True)
# Display cost summary at the end
tracker.display_summary(console)
async def main():
"""Run the enhanced compare_and_synthesize demo."""
tracker = CostTracker() # Instantiate tracker
await setup_gateway_and_tools()
await run_comparison_demo(tracker) # Pass tracker
# logger.info("Skipping run_comparison_demo() as the 'compare_and_synthesize' tool function is missing.") # Remove skip message
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Demo stopped by user.")
except Exception as main_err:
logger.critical(f"Demo failed with unexpected error: {main_err}", emoji_key="critical", exc_info=True)
```
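The five demo branches above repeat the same cost-tracking block almost verbatim. A small helper along the following lines (hypothetical, not present in the file) would reduce each branch to a single call; it would live in the same module, reusing the demo's `TrackableResult`, `logger`, and `CostTracker`, and reads the same result fields.

```python
# Hypothetical refactoring sketch (not in the demo file): one helper replaces the
# five near-identical cost-tracking blocks. It assumes the demo module's
# TrackableResult namedtuple and logger are in scope.
from typing import Any, Dict

from ultimate_mcp_server.utils.display import CostTracker


def track_synthesis_cost(tracker: CostTracker, result: Dict[str, Any], label: str) -> None:
    """Record the synthesis call's cost, mirroring the inline blocks above."""
    if not (
        isinstance(result, dict)
        and "cost" in result
        and "synthesis_provider" in result
        and "synthesis_model" in result
    ):
        return
    try:
        trackable = TrackableResult(  # namedtuple defined near the top of the demo
            cost=result.get("cost", {}).get("total_cost", 0.0),
            input_tokens=0,  # tokens are not aggregated in this tool's output
            output_tokens=0,
            provider=result.get("synthesis_provider", "unknown"),
            model=result.get("synthesis_model", "compare_synthesize"),
            processing_time=result.get("processing_time", 0.0),
        )
        tracker.add_call(trackable)
    except Exception as track_err:
        logger.warning(f"Could not track cost for {label}: {track_err}", exc_info=False)
```

Each demo branch would then reduce to a single call such as `track_synthesis_cost(tracker, result, "'best' format")` placed after `print_result(...)`.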
--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/tournament.py:
--------------------------------------------------------------------------------
```python
"""Tournament tools for Ultimate MCP Server."""
from typing import Any, Dict, List, Optional
from ultimate_mcp_server.exceptions import ToolError
from ultimate_mcp_server.core.models.tournament import (
CancelTournamentInput,
CancelTournamentOutput,
CreateTournamentInput,
CreateTournamentOutput,
GetTournamentResultsInput,
GetTournamentStatusInput,
GetTournamentStatusOutput,
TournamentBasicInfo,
TournamentData,
TournamentStatus,
)
from ultimate_mcp_server.core.models.tournament import (
EvaluatorConfig as InputEvaluatorConfig,
)
from ultimate_mcp_server.core.models.tournament import (
ModelConfig as InputModelConfig,
)
from ultimate_mcp_server.core.tournaments.manager import tournament_manager
from ultimate_mcp_server.tools.base import with_error_handling, with_tool_metrics
from ultimate_mcp_server.utils import get_logger
logger = get_logger("ultimate_mcp_server.tools.tournament")
# --- Standalone Tool Functions ---
@with_tool_metrics
@with_error_handling
async def create_tournament(
name: str,
prompt: str,
models: List[Dict[str, Any]],
rounds: int = 3,
tournament_type: str = "code",
extraction_model_id: Optional[str] = "anthropic/claude-3-5-haiku-20241022",
evaluators: Optional[List[Dict[str, Any]]] = None,
max_retries_per_model_call: int = 3,
retry_backoff_base_seconds: float = 1.0,
max_concurrent_model_calls: int = 5
) -> Dict[str, Any]:
"""
Creates and starts a new LLM competition (tournament) based on a prompt and model configurations.
Args:
name: Human-readable name for the tournament (e.g., "Essay Refinement Contest", "Python Sorting Challenge").
prompt: The task prompt provided to all participating LLM models.
models: List of model configurations (external key is "models"). Each config is a dictionary specifying:
- model_id (str, required): e.g., 'openai/gpt-4o'.
- diversity_count (int, optional, default 1): Number of variants per model.
# ... (rest of ModelConfig fields) ...
rounds: Number of tournament rounds. Each round allows models to refine their previous output (if applicable to the tournament type). Default is 3.
tournament_type: The type of tournament defining the task and evaluation method. Supported types include:
- "code": For evaluating code generation based on correctness and potentially style/efficiency.
- "text": For general text generation, improvement, or refinement tasks.
Default is "code".
extraction_model_id: (Optional, primarily for 'code' type) Specific LLM model to use for extracting and evaluating results like code blocks. If None, a default is used.
evaluators: (Optional) List of evaluator configurations as dicts.
max_retries_per_model_call: Maximum retries per model call.
retry_backoff_base_seconds: Base seconds for retry backoff.
max_concurrent_model_calls: Maximum concurrent model calls.
Returns:
Dictionary with tournament creation status containing:
- tournament_id: Unique identifier for the created tournament.
- status: Initial tournament status (usually 'PENDING' or 'RUNNING').
- storage_path: Filesystem path where tournament data will be stored.
Example:
{
"tournament_id": "tour_abc123xyz789",
"status": "PENDING",
"storage_path": "/path/to/storage/tour_abc123xyz789"
}
Raises:
ToolError: If input is invalid, tournament creation fails, or scheduling fails.
"""
logger.info(f"Tool 'create_tournament' invoked for: {name}")
try:
parsed_model_configs = [InputModelConfig(**mc) for mc in models]
parsed_evaluators = [InputEvaluatorConfig(**ev) for ev in (evaluators or [])]
input_data = CreateTournamentInput(
name=name,
prompt=prompt,
models=parsed_model_configs,
rounds=rounds,
tournament_type=tournament_type,
extraction_model_id=extraction_model_id,
evaluators=parsed_evaluators,
max_retries_per_model_call=max_retries_per_model_call,
retry_backoff_base_seconds=retry_backoff_base_seconds,
max_concurrent_model_calls=max_concurrent_model_calls
)
tournament = tournament_manager.create_tournament(input_data)
if not tournament:
raise ToolError("Failed to create tournament entry.")
logger.info("Calling start_tournament_execution (using asyncio)")
success = tournament_manager.start_tournament_execution(
tournament_id=tournament.tournament_id
)
if not success:
logger.error(f"Failed to schedule background execution for tournament {tournament.tournament_id}")
updated_tournament = tournament_manager.get_tournament(tournament.tournament_id)
error_msg = updated_tournament.error_message if updated_tournament else "Failed to schedule execution."
raise ToolError(f"Failed to start tournament execution: {error_msg}")
logger.info(f"Tournament {tournament.tournament_id} ({tournament.name}) created and background execution started.")
# Include storage_path in the return value
output = CreateTournamentOutput(
tournament_id=tournament.tournament_id,
status=tournament.status,
storage_path=tournament.storage_path,
message=f"Tournament '{tournament.name}' created successfully and execution started."
)
return output.dict()
except ValueError as ve:
logger.warning(f"Validation error creating tournament: {ve}")
raise ToolError(f"Invalid input: {ve}") from ve
except Exception as e:
logger.error(f"Error creating tournament: {e}", exc_info=True)
raise ToolError(f"An unexpected error occurred: {e}") from e
@with_tool_metrics
@with_error_handling
async def get_tournament_status(
tournament_id: str
) -> Dict[str, Any]:
"""Retrieves the current status and progress of a specific tournament.
Use this tool to monitor an ongoing tournament (PENDING, RUNNING) or check the final
state (COMPLETED, FAILED, CANCELLED) of a past tournament.
Args:
tournament_id: Unique identifier of the tournament to check.
Returns:
Dictionary containing tournament status information:
- tournament_id: Unique identifier for the tournament.
- name: Human-readable name of the tournament.
- tournament_type: Type of tournament (e.g., "code", "text").
- status: Current status (e.g., "PENDING", "RUNNING", "COMPLETED", "FAILED", "CANCELLED").
- current_round: Current round number (1-based) if RUNNING, else the last active round.
- total_rounds: Total number of rounds configured for this tournament.
- created_at: ISO timestamp when the tournament was created.
- updated_at: ISO timestamp when the tournament status was last updated.
- error_message: Error message if the tournament FAILED (null otherwise).
Error Handling:
- Raises ToolError (400) if tournament_id format is invalid.
- Raises ToolError (404) if the tournament ID is not found.
- Raises ToolError (500) for internal server errors.
Example:
{
"tournament_id": "tour_abc123xyz789",
"name": "Essay Refinement Contest",
"tournament_type": "text",
"status": "RUNNING",
"current_round": 2,
"total_rounds": 3,
"created_at": "2023-04-15T14:32:17.123456",
"updated_at": "2023-04-15T14:45:22.123456",
"error_message": null
}
"""
logger.debug(f"Getting status for tournament: {tournament_id}")
try:
if not tournament_id or not isinstance(tournament_id, str):
raise ToolError(
status_code=400,
detail="Invalid tournament ID format. Tournament ID must be a non-empty string."
)
try:
input_data = GetTournamentStatusInput(tournament_id=tournament_id)
except ValueError as ve:
raise ToolError(
status_code=400,
detail=f"Invalid tournament ID: {str(ve)}"
) from ve
tournament = tournament_manager.get_tournament(input_data.tournament_id, force_reload=True)
if not tournament:
raise ToolError(
status_code=404,
detail=f"Tournament not found: {tournament_id}. Check if the tournament ID is correct or use list_tournaments to see all available tournaments."
)
try:
output = GetTournamentStatusOutput(
tournament_id=tournament.tournament_id,
name=tournament.name,
tournament_type=tournament.config.tournament_type,
status=tournament.status,
current_round=tournament.current_round,
total_rounds=tournament.config.rounds,
created_at=tournament.created_at,
updated_at=tournament.updated_at,
error_message=tournament.error_message
)
return output.dict()
except Exception as e:
logger.error(f"Error converting tournament data to output format: {e}", exc_info=True)
raise ToolError(
status_code=500,
detail=f"Error processing tournament data: {str(e)}. The tournament data may be corrupted."
) from e
except ToolError:
raise
except Exception as e:
logger.error(f"Error getting tournament status for {tournament_id}: {e}", exc_info=True)
raise ToolError(
status_code=500,
detail=f"Internal server error retrieving tournament status: {str(e)}. Please try again or check the server logs."
) from e
@with_tool_metrics
@with_error_handling
async def list_tournaments() -> List[Dict[str, Any]]:
"""Lists all created tournaments with basic identifying information and status.
Useful for discovering existing tournaments and their current states without fetching full results.
Returns:
List of dictionaries, each containing basic tournament info:
- tournament_id: Unique identifier for the tournament.
- name: Human-readable name of the tournament.
- tournament_type: Type of tournament (e.g., "code", "text").
- status: Current status (e.g., "PENDING", "RUNNING", "COMPLETED", "FAILED", "CANCELLED").
- created_at: ISO timestamp when the tournament was created.
- updated_at: ISO timestamp when the tournament was last updated.
Example:
[
{
"tournament_id": "tour_abc123",
"name": "Tournament A",
"tournament_type": "code",
"status": "COMPLETED",
"created_at": "2023-04-10T10:00:00",
"updated_at": "2023-04-10T12:30:00"
},
...
]
"""
logger.debug("Listing all tournaments")
try:
tournaments = tournament_manager.list_tournaments()
output_list = []
for tournament in tournaments:
try:
# Ensure tournament object has necessary attributes before accessing
if not hasattr(tournament, 'tournament_id') or \
not hasattr(tournament, 'name') or \
not hasattr(tournament, 'config') or \
not hasattr(tournament.config, 'tournament_type') or \
not hasattr(tournament, 'status') or \
not hasattr(tournament, 'created_at') or \
not hasattr(tournament, 'updated_at'):
logger.warning(f"Skipping tournament due to missing attributes: {getattr(tournament, 'tournament_id', 'UNKNOWN ID')}")
continue
basic_info = TournamentBasicInfo(
tournament_id=tournament.tournament_id,
name=tournament.name,
tournament_type=tournament.config.tournament_type,
status=tournament.status,
created_at=tournament.created_at,
updated_at=tournament.updated_at,
)
output_list.append(basic_info.dict())
except Exception as e:
logger.warning(f"Skipping tournament {getattr(tournament, 'tournament_id', 'UNKNOWN')} due to data error during processing: {e}")
return output_list
except Exception as e:
logger.error(f"Error listing tournaments: {e}", exc_info=True)
raise ToolError(
status_code=500,
detail=f"Internal server error listing tournaments: {str(e)}"
) from e
@with_tool_metrics
@with_error_handling
async def get_tournament_results(
tournament_id: str
) -> List[Dict[str, str]]:
"""Retrieves the complete results and configuration for a specific tournament.
Provides comprehensive details including configuration, final scores (if applicable),
detailed round-by-round results, model outputs, and any errors encountered.
Use this *after* a tournament has finished (COMPLETED or FAILED) for full analysis.
Args:
tournament_id: Unique identifier for the tournament.
    Returns:
        A single-element list containing one text-content dictionary ({"type": "text", "text": ...}).
        The "text" field holds the full tournament data serialized as JSON (configuration, status,
        round-by-round results, timestamps, etc.).
    Example of the JSON payload carried in "text" (conceptual - actual structure may vary):
{
"tournament_id": "tour_abc123",
"name": "Sorting Algo Test",
"status": "COMPLETED",
"config": { ... },
"results": { "scores": { ... }, "round_results": [ { ... }, ... ] },
"created_at": "...",
"updated_at": "...",
"error_message": null
}
Raises:
ToolError: If the tournament ID is invalid, not found, results are not ready (still PENDING/RUNNING), or an internal error occurs.
"""
logger.debug(f"Getting results for tournament: {tournament_id}")
try:
if not tournament_id or not isinstance(tournament_id, str):
raise ToolError(
status_code=400,
detail="Invalid tournament ID format. Tournament ID must be a non-empty string."
)
try:
input_data = GetTournamentResultsInput(tournament_id=tournament_id)
except ValueError as ve:
raise ToolError(
status_code=400,
detail=f"Invalid tournament ID: {str(ve)}"
) from ve
# Make sure to request TournamentData which should contain results
tournament_data: Optional[TournamentData] = tournament_manager.get_tournament(input_data.tournament_id, force_reload=True)
if not tournament_data:
# Check if the tournament exists but just has no results yet (e.g., PENDING)
tournament_status_info = tournament_manager.get_tournament(tournament_id) # Gets basic info
if tournament_status_info:
current_status = tournament_status_info.status
if current_status in [TournamentStatus.PENDING, TournamentStatus.RUNNING]:
raise ToolError(
status_code=404, # Use 404 to indicate results not ready
detail=f"Tournament '{tournament_id}' is currently {current_status}. Results are not yet available."
)
else: # Should have results if COMPLETED or ERROR, maybe data issue?
logger.error(f"Tournament {tournament_id} status is {current_status} but get_tournament_results returned None.")
raise ToolError(
status_code=500,
detail=f"Could not retrieve results for tournament '{tournament_id}' despite status being {current_status}. There might be an internal data issue."
)
else:
raise ToolError(
status_code=404,
detail=f"Tournament not found: {tournament_id}. Cannot retrieve results."
)
# NEW: Return a structure that FastMCP might recognize as a pre-formatted content list
json_string = tournament_data.json()
logger.info(f"[DEBUG_GET_RESULTS] Returning pre-formatted TextContent list. JSON Snippet: {json_string[:150]}")
return [{ "type": "text", "text": json_string }]
except ToolError:
raise
except Exception as e:
logger.error(f"Error getting tournament results for {tournament_id}: {e}", exc_info=True)
        raise ToolError(
            status_code=500,
            detail=f"Internal server error retrieving tournament results: {str(e)}"
        ) from e
@with_tool_metrics
@with_error_handling
async def cancel_tournament(
tournament_id: str
) -> Dict[str, Any]:
"""Attempts to cancel a running (RUNNING) or pending (PENDING) tournament.
Signals the tournament manager to stop processing. Cancellation is not guaranteed
to be immediate. Check status afterwards using `get_tournament_status`.
Cannot cancel tournaments that are already COMPLETED, FAILED, or CANCELLED.
Args:
tournament_id: Unique identifier for the tournament to cancel.
Returns:
Dictionary confirming the cancellation attempt:
- tournament_id: The ID of the tournament targeted for cancellation.
- status: The status *after* the cancellation attempt (e.g., "CANCELLED", or the previous state like "COMPLETED" if cancellation was not possible).
- message: A message indicating the outcome (e.g., "Tournament cancellation requested successfully.", "Cancellation failed: Tournament is already COMPLETED.").
Raises:
ToolError: If the tournament ID is invalid, not found, or an internal error occurs.
"""
logger.info(f"Received request to cancel tournament: {tournament_id}")
try:
if not tournament_id or not isinstance(tournament_id, str):
raise ToolError(status_code=400, detail="Invalid tournament ID format.")
try:
input_data = CancelTournamentInput(tournament_id=tournament_id)
except ValueError as ve:
raise ToolError(status_code=400, detail=f"Invalid tournament ID: {str(ve)}") from ve
# Call the manager's cancel function
success, message, final_status = await tournament_manager.cancel_tournament(input_data.tournament_id)
# Prepare output using the Pydantic model
output = CancelTournamentOutput(
tournament_id=tournament_id,
status=final_status, # Return the actual status after attempt
message=message
)
if not success:
# Log the failure but return the status/message from the manager
logger.warning(f"Cancellation attempt for tournament {tournament_id} reported failure: {message}")
# Raise ToolError if the status implies a client error (e.g., not found)
if "not found" in message.lower():
raise ToolError(status_code=404, detail=message)
elif final_status in [TournamentStatus.COMPLETED, TournamentStatus.FAILED, TournamentStatus.CANCELLED] and "already" in message.lower():
raise ToolError(status_code=409, detail=message)
# Optionally handle other errors as 500
# else:
# raise ToolError(status_code=500, detail=f"Cancellation failed: {message}")
else:
logger.info(f"Cancellation attempt for tournament {tournament_id} successful. Final status: {final_status}")
return output.dict()
except ToolError:
raise
except Exception as e:
logger.error(f"Error cancelling tournament {tournament_id}: {e}", exc_info=True)
raise ToolError(status_code=500, detail=f"Internal server error during cancellation: {str(e)}") from e
```
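Putting the tournament tools together, a driver script might look roughly like the sketch below. It is hypothetical: the model IDs, the polling interval, and the assumption that the decorated tool coroutines can be awaited directly are not taken from the repository, and status values are compared as the plain strings shown in the docstring examples.

```python
# Hypothetical driver sketch (not part of the repo). Model IDs, the polling
# interval, and direct awaiting of the decorated coroutines are assumptions.
import asyncio
import json

from ultimate_mcp_server.tools.tournament import (
    create_tournament,
    get_tournament_results,
    get_tournament_status,
)


async def run_small_tournament() -> None:
    created = await create_tournament(
        name="Python Sorting Challenge",
        prompt="Write an efficient, well-documented merge sort in Python.",
        models=[
            {"model_id": "openai/gpt-4o"},
            {"model_id": "anthropic/claude-3-5-haiku-20241022"},
        ],
        rounds=2,
        tournament_type="code",
    )
    tournament_id = created["tournament_id"]

    # Poll until a terminal state; statuses appear as plain strings in the docstrings.
    while True:
        status = await get_tournament_status(tournament_id)
        if status["status"] in ("COMPLETED", "FAILED", "CANCELLED"):
            break
        await asyncio.sleep(10)

    # get_tournament_results returns a one-element list of text content whose
    # "text" field is the JSON-serialized tournament data.
    results = await get_tournament_results(tournament_id)
    data = json.loads(results[0]["text"])
    print(data.get("name"), data.get("status"))


if __name__ == "__main__":
    asyncio.run(run_small_tournament())
```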
--------------------------------------------------------------------------------
/ultimate_mcp_server/core/providers/openai.py:
--------------------------------------------------------------------------------
```python
"""OpenAI provider implementation."""
import time
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple
from openai import AsyncOpenAI
from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.providers.base import BaseProvider, ModelResponse
from ultimate_mcp_server.utils import get_logger
# Use the same naming scheme everywhere: logger at module level
logger = get_logger("ultimate_mcp_server.providers.openai")
class OpenAIProvider(BaseProvider):
"""Provider implementation for OpenAI API."""
provider_name = Provider.OPENAI.value
def __init__(self, api_key: Optional[str] = None, **kwargs):
"""Initialize the OpenAI provider.
Args:
api_key: OpenAI API key
**kwargs: Additional options
"""
super().__init__(api_key=api_key, **kwargs)
self.base_url = kwargs.get("base_url")
self.organization = kwargs.get("organization")
self.models_cache = None
async def initialize(self) -> bool:
"""Initialize the OpenAI client.
Returns:
bool: True if initialization was successful
"""
try:
self.client = AsyncOpenAI(
api_key=self.api_key,
base_url=self.base_url,
organization=self.organization,
)
# Skip API call if using a mock key (for tests)
if self.api_key and "mock-" in self.api_key:
self.logger.info(
"Using mock OpenAI key - skipping API validation",
emoji_key="mock"
)
return True
# Test connection by listing models
await self.list_models()
self.logger.success(
"OpenAI provider initialized successfully",
emoji_key="provider"
)
return True
except Exception as e:
self.logger.error(
f"Failed to initialize OpenAI provider: {str(e)}",
emoji_key="error"
)
return False
async def generate_completion(
self,
prompt: Optional[str] = None,
model: Optional[str] = None,
max_tokens: Optional[int] = None,
temperature: float = 0.7,
**kwargs
) -> ModelResponse:
"""Generate a completion using OpenAI.
Args:
prompt: Text prompt to send to the model
model: Model name to use (e.g., "gpt-4o")
max_tokens: Maximum tokens to generate
temperature: Temperature parameter (0.0-1.0)
**kwargs: Additional model-specific parameters
Returns:
ModelResponse with completion result
Raises:
Exception: If API call fails
"""
if not self.client:
await self.initialize()
# Use default model if not specified
model = model or self.get_default_model()
# Strip provider prefix if present (e.g., "openai/gpt-4o" -> "gpt-4o")
if model.startswith(f"{self.provider_name}/"):
original_model = model
model = model.split("/", 1)[1]
self.logger.debug(f"Stripped provider prefix from model name: {original_model} -> {model}")
# Handle case when messages are provided instead of prompt (for chat_completion)
messages = kwargs.pop("messages", None)
# If neither prompt nor messages are provided, raise an error
if prompt is None and not messages:
raise ValueError("Either 'prompt' or 'messages' must be provided")
# Create messages if not already provided
if not messages:
messages = [{"role": "user", "content": prompt}]
# Prepare API call parameters
params = {
"model": model,
"messages": messages,
"temperature": temperature,
}
# Add max_tokens if specified
if max_tokens is not None:
params["max_tokens"] = max_tokens
# Check for json_mode flag and remove it from kwargs
json_mode = kwargs.pop("json_mode", False)
if json_mode:
# Use the correct response_format for JSON mode
params["response_format"] = {"type": "json_object"}
self.logger.debug("Setting response_format to JSON mode for OpenAI")
# Handle any legacy response_format passed directly, but prefer json_mode
if "response_format" in kwargs and not json_mode:
# Support both direct format object and type-only specification
response_format = kwargs.pop("response_format")
if isinstance(response_format, dict):
params["response_format"] = response_format
elif isinstance(response_format, str) and response_format in ["json_object", "text"]:
params["response_format"] = {"type": response_format}
self.logger.debug(f"Setting response_format from direct param: {params.get('response_format')}")
# Add any remaining additional parameters
params.update(kwargs)
# --- Special handling for specific model parameter constraints ---
if model == 'o3-mini':
if 'temperature' in params:
self.logger.debug(f"Removing unsupported 'temperature' parameter for model {model}")
del params['temperature']
elif model == 'o1-preview':
current_temp = params.get('temperature')
# Only allow temperature if it's explicitly set to 1.0, otherwise remove it to use API default.
if current_temp is not None and current_temp != 1.0:
self.logger.debug(f"Removing non-default 'temperature' ({current_temp}) for model {model}")
del params['temperature']
# --- End special handling ---
# Log request
prompt_length = len(prompt) if prompt else sum(len(m.get("content", "")) for m in messages)
self.logger.info(
f"Generating completion with OpenAI model {model}",
emoji_key=self.provider_name,
prompt_length=prompt_length,
json_mode=json_mode # Log if json_mode was requested
)
try:
# API call with timing
response, processing_time = await self.process_with_timer(
self.client.chat.completions.create, **params
)
# Extract response text
completion_text = response.choices[0].message.content
# Create message object for chat_completion
message = {
"role": "assistant",
"content": completion_text
}
# Create standardized response
result = ModelResponse(
text=completion_text,
model=model,
provider=self.provider_name,
input_tokens=response.usage.prompt_tokens,
output_tokens=response.usage.completion_tokens,
total_tokens=response.usage.total_tokens,
processing_time=processing_time,
raw_response=response,
)
# Add message to result for chat_completion
result.message = message
# Log success
self.logger.success(
"OpenAI completion successful",
emoji_key="success",
model=model,
tokens={
"input": result.input_tokens,
"output": result.output_tokens
},
cost=result.cost,
time=result.processing_time
)
return result
except Exception as e:
self.logger.error(
f"OpenAI completion failed: {str(e)}",
emoji_key="error",
model=model
)
raise
async def generate_completion_stream(
self,
prompt: Optional[str] = None,
model: Optional[str] = None,
max_tokens: Optional[int] = None,
temperature: float = 0.7,
**kwargs
) -> AsyncGenerator[Tuple[str, Dict[str, Any]], None]:
"""Generate a streaming completion using OpenAI.
Args:
prompt: Text prompt to send to the model
model: Model name to use (e.g., "gpt-4o")
max_tokens: Maximum tokens to generate
temperature: Temperature parameter (0.0-1.0)
**kwargs: Additional model-specific parameters
Yields:
Tuple of (text_chunk, metadata)
Raises:
Exception: If API call fails
"""
if not self.client:
await self.initialize()
# Use default model if not specified
model = model or self.get_default_model()
# Strip provider prefix if present (e.g., "openai/gpt-4o" -> "gpt-4o")
if model.startswith(f"{self.provider_name}/"):
original_model = model
model = model.split("/", 1)[1]
self.logger.debug(f"Stripped provider prefix from model name (stream): {original_model} -> {model}")
# Handle case when messages are provided instead of prompt (for chat_completion)
messages = kwargs.pop("messages", None)
# If neither prompt nor messages are provided, raise an error
if prompt is None and not messages:
raise ValueError("Either 'prompt' or 'messages' must be provided")
# Create messages if not already provided
if not messages:
messages = [{"role": "user", "content": prompt}]
# Prepare API call parameters
params = {
"model": model,
"messages": messages,
"temperature": temperature,
"stream": True,
}
# Add max_tokens if specified
if max_tokens is not None:
params["max_tokens"] = max_tokens
# Check for json_mode flag and remove it from kwargs
json_mode = kwargs.pop("json_mode", False)
if json_mode:
# Use the correct response_format for JSON mode
params["response_format"] = {"type": "json_object"}
self.logger.debug("Setting response_format to JSON mode for OpenAI streaming")
# Add any remaining additional parameters
params.update(kwargs)
# Log request
prompt_length = len(prompt) if prompt else sum(len(m.get("content", "")) for m in messages)
self.logger.info(
f"Generating streaming completion with OpenAI model {model}",
emoji_key=self.provider_name,
prompt_length=prompt_length,
json_mode=json_mode # Log if json_mode was requested
)
start_time = time.time()
total_chunks = 0
try:
# Make streaming API call
stream = await self.client.chat.completions.create(**params)
# Process the stream
async for chunk in stream:
total_chunks += 1
# Extract content from the chunk
delta = chunk.choices[0].delta
content = delta.content or ""
# Metadata for this chunk
metadata = {
"model": model,
"provider": self.provider_name,
"chunk_index": total_chunks,
"finish_reason": chunk.choices[0].finish_reason,
}
yield content, metadata
# Log success
processing_time = time.time() - start_time
self.logger.success(
"OpenAI streaming completion successful",
emoji_key="success",
model=model,
chunks=total_chunks,
time=processing_time
)
except Exception as e:
self.logger.error(
f"OpenAI streaming completion failed: {str(e)}",
emoji_key="error",
model=model
)
raise
async def list_models(self) -> List[Dict[str, Any]]:
"""
List available OpenAI models with their capabilities and metadata.
This method queries the OpenAI API to retrieve a comprehensive list of available
models accessible to the current API key. It filters the results to focus on
GPT models that are relevant to text generation tasks, excluding embeddings,
moderation, and other specialized models.
For efficiency, the method uses a caching mechanism that stores the model list
after the first successful API call. Subsequent calls return the cached results
without making additional API requests. This reduces latency and API usage while
ensuring the available models information is readily accessible.
If the API call fails (due to network issues, invalid credentials, etc.), the
method falls back to returning a hardcoded list of common OpenAI models to ensure
the application can continue functioning with reasonable defaults.
Returns:
A list of dictionaries containing model information with these fields:
- id: The model identifier used when making API calls (e.g., "gpt-4o")
- provider: Always "openai" for this provider
- created: Timestamp of when the model was created (if available from API)
- owned_by: Organization that owns the model (e.g., "openai", "system")
The fallback model list (used on API errors) includes basic information
for gpt-4o, gpt-4.1-mini, and other commonly used models.
Example response:
```python
[
{
"id": "gpt-4o",
"provider": "openai",
"created": 1693399330,
"owned_by": "openai"
},
{
"id": "gpt-4.1-mini",
"provider": "openai",
"created": 1705006269,
"owned_by": "openai"
}
]
```
Note:
The specific models returned depend on the API key's permissions and
the models currently offered by OpenAI. As new models are released
or existing ones deprecated, the list will change accordingly.
"""
if self.models_cache:
return self.models_cache
try:
if not self.client:
await self.initialize()
# Fetch models from API
response = await self.client.models.list()
# Process response
models = []
for model in response.data:
# Filter to relevant models (chat-capable GPT models)
if model.id.startswith("gpt-"):
models.append({
"id": model.id,
"provider": self.provider_name,
"created": model.created,
"owned_by": model.owned_by,
})
# Cache results
self.models_cache = models
return models
except Exception as e:
self.logger.error(
f"Failed to list OpenAI models: {str(e)}",
emoji_key="error"
)
# Return basic models on error
return [
{
"id": "gpt-4o",
"provider": self.provider_name,
"description": "Most capable GPT-4 model",
},
{
"id": "gpt-4.1-mini",
"provider": self.provider_name,
"description": "Smaller, efficient GPT-4 model",
},
]
def get_default_model(self) -> str:
"""
Get the default OpenAI model identifier to use when none is specified.
This method determines the appropriate default model for OpenAI completions
through a prioritized selection process:
1. First, it attempts to load the default_model setting from the Ultimate MCP Server
configuration system (from providers.openai.default_model in the config)
2. If that's not available or valid, it falls back to a hardcoded default model
that represents a reasonable balance of capability, cost, and availability
Using the configuration system allows for flexible deployment-specific defaults
without code changes, while the hardcoded fallback ensures the system remains
functional even with minimal configuration.
Returns:
String identifier of the default OpenAI model to use (e.g., "gpt-4.1-mini").
This identifier can be directly used in API calls to the OpenAI API.
Note:
The current hardcoded default is "gpt-4.1-mini", chosen for its balance of
capability and cost. This may change in future versions as new models are
released or existing ones are deprecated.
"""
from ultimate_mcp_server.config import get_config
# Safely get from config if available
try:
config = get_config()
provider_config = getattr(config, 'providers', {}).get(self.provider_name, None)
if provider_config and provider_config.default_model:
return provider_config.default_model
except (AttributeError, TypeError):
# Handle case when providers attribute doesn't exist or isn't a dict
pass
# Otherwise return hard-coded default
return "gpt-4.1-mini"
async def check_api_key(self) -> bool:
"""Check if the OpenAI API key is valid.
This method performs a lightweight validation of the configured OpenAI API key
by attempting to list available models. A successful API call confirms that:
1. The API key is properly formatted and not empty
2. The key has at least read permissions on the OpenAI API
3. The API endpoint is accessible and responding
4. The account associated with the key is active and not suspended
This validation is useful when initializing the provider to ensure the API key
works before attempting to make model completion requests that might fail later.
Returns:
bool: True if the API key is valid and the API is accessible, False otherwise.
A False result may indicate an invalid key, network issues, or API service disruption.
Notes:
- This method simply calls list_models() which caches results for efficiency
- No detailed error information is returned, only a boolean success indicator
- The method silently catches all exceptions and returns False rather than raising
- For debugging key issues, check server logs for the full exception details
"""
try:
# Just list models as a simple validation
await self.list_models()
return True
except Exception:
return False
```
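As a quick orientation for the provider above, the following hedged sketch shows one way to drive `OpenAIProvider` directly. The environment-variable lookup and prompts are assumptions; only the constructor, `initialize`, `generate_completion`, and `generate_completion_stream` calls come from the class as shown.

```python
# Hypothetical usage sketch (not part of the repo); the OPENAI_API_KEY lookup
# and prompts are assumptions.
import asyncio
import os

from ultimate_mcp_server.core.providers.openai import OpenAIProvider


async def main() -> None:
    provider = OpenAIProvider(api_key=os.environ.get("OPENAI_API_KEY"))
    if not await provider.initialize():
        raise RuntimeError("OpenAI provider failed to initialize; check the API key.")

    # Single-shot completion; json_mode maps to response_format={"type": "json_object"}.
    result = await provider.generate_completion(
        prompt="List three benefits of asyncio in Python as a JSON array of strings.",
        model="gpt-4.1-mini",
        max_tokens=200,
        temperature=0.3,
        json_mode=True,
    )
    print(result.text)
    print(f"tokens: {result.input_tokens} in / {result.output_tokens} out")

    # Streaming variant: chunks arrive as (text, metadata) tuples.
    async for chunk, _metadata in provider.generate_completion_stream(
        prompt="Write a haiku about event loops.",
        model="gpt-4.1-mini",
        max_tokens=60,
    ):
        print(chunk, end="", flush=True)
    print()


if __name__ == "__main__":
    asyncio.run(main())
```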
--------------------------------------------------------------------------------
/ultimate_mcp_server/services/knowledge_base/rag_engine.py:
--------------------------------------------------------------------------------
```python
"""RAG engine for retrieval-augmented generation."""
import time
from typing import Any, Dict, List, Optional, Set
from ultimate_mcp_server.core.models.requests import CompletionRequest
from ultimate_mcp_server.services.cache import get_cache_service
from ultimate_mcp_server.services.knowledge_base.feedback import get_rag_feedback_service
from ultimate_mcp_server.services.knowledge_base.retriever import KnowledgeBaseRetriever
from ultimate_mcp_server.services.knowledge_base.utils import (
extract_keywords,
generate_token_estimate,
)
from ultimate_mcp_server.services.prompts import get_prompt_service
from ultimate_mcp_server.utils import get_logger
logger = get_logger(__name__)
# Default RAG prompt templates
DEFAULT_RAG_TEMPLATES = {
"rag_default": """Answer the question based only on the following context:
{context}
Question: {query}
Answer:""",
"rag_with_sources": """Answer the question based only on the following context:
{context}
Question: {query}
Provide your answer along with the source document IDs in [brackets] for each piece of information:""",
"rag_summarize": """Summarize the following context information:
{context}
Summary:""",
"rag_analysis": """Analyze the following information and provide key insights:
{context}
Query: {query}
Analysis:"""
}
class RAGEngine:
"""Engine for retrieval-augmented generation."""
def __init__(
self,
retriever: KnowledgeBaseRetriever,
provider_manager,
optimization_service=None,
analytics_service=None
):
"""Initialize the RAG engine.
Args:
retriever: Knowledge base retriever
provider_manager: Provider manager for LLM access
optimization_service: Optional optimization service for model selection
analytics_service: Optional analytics service for tracking
"""
self.retriever = retriever
self.provider_manager = provider_manager
self.optimization_service = optimization_service
self.analytics_service = analytics_service
# Initialize prompt service
self.prompt_service = get_prompt_service()
# Initialize feedback service
self.feedback_service = get_rag_feedback_service()
# Initialize cache service
self.cache_service = get_cache_service()
# Register RAG templates
for template_name, template_text in DEFAULT_RAG_TEMPLATES.items():
self.prompt_service.register_template(template_name, template_text)
logger.info("RAG engine initialized", extra={"emoji_key": "success"})
async def _select_optimal_model(self, task_info: Dict[str, Any]) -> Dict[str, Any]:
"""Select optimal model for a RAG task.
Args:
task_info: Task information
Returns:
Model selection
"""
if self.optimization_service:
try:
return await self.optimization_service.get_optimal_model(task_info)
except Exception as e:
logger.error(
f"Error selecting optimal model: {str(e)}",
extra={"emoji_key": "error"}
)
# Fallback to default models for RAG
return {
"provider": "openai",
"model": "gpt-4.1-mini"
}
async def _track_rag_metrics(
self,
knowledge_base: str,
query: str,
provider: str,
model: str,
metrics: Dict[str, Any]
) -> None:
"""Track RAG operation metrics.
Args:
knowledge_base: Knowledge base name
query: Query text
provider: Provider name
model: Model name
metrics: Operation metrics
"""
if not self.analytics_service:
return
try:
await self.analytics_service.track_operation(
operation_type="rag",
provider=provider,
model=model,
input_tokens=metrics.get("input_tokens", 0),
output_tokens=metrics.get("output_tokens", 0),
total_tokens=metrics.get("total_tokens", 0),
cost=metrics.get("cost", 0.0),
duration=metrics.get("total_time", 0.0),
metadata={
"knowledge_base": knowledge_base,
"query": query,
"retrieval_count": metrics.get("retrieval_count", 0),
"retrieval_time": metrics.get("retrieval_time", 0.0),
"generation_time": metrics.get("generation_time", 0.0)
}
)
except Exception as e:
logger.error(
f"Error tracking RAG metrics: {str(e)}",
extra={"emoji_key": "error"}
)
def _format_context(
self,
results: List[Dict[str, Any]],
include_metadata: bool = True
) -> str:
"""Format retrieval results into context.
Args:
results: List of retrieval results
include_metadata: Whether to include metadata
Returns:
Formatted context
"""
context_parts = []
for i, result in enumerate(results):
# Format metadata if included
metadata_str = ""
if include_metadata and result.get("metadata"):
# Extract relevant metadata fields
metadata_fields = []
for key in ["title", "source", "author", "date", "source_id", "potential_title"]:
if key in result["metadata"]:
metadata_fields.append(f"{key}: {result['metadata'][key]}")
if metadata_fields:
metadata_str = " | ".join(metadata_fields)
metadata_str = f"[{metadata_str}]\n"
# Add document with index
context_parts.append(f"Document {i+1} [ID: {result['id']}]:\n{metadata_str}{result['document']}")
return "\n\n".join(context_parts)
async def _adjust_retrieval_params(self, query: str, knowledge_base_name: str) -> Dict[str, Any]:
"""Dynamically adjust retrieval parameters based on query complexity.
Args:
query: Query text
knowledge_base_name: Knowledge base name
Returns:
Adjusted parameters
"""
# Analyze query complexity
query_length = len(query.split())
query_keywords = extract_keywords(query)
# Base parameters
params = {
"top_k": 5,
"retrieval_method": "vector",
"min_score": 0.6,
"search_params": {"search_ef": 100}
}
# Adjust based on query length
if query_length > 30: # Complex query
params["top_k"] = 8
params["search_params"]["search_ef"] = 200
params["retrieval_method"] = "hybrid"
elif query_length < 5: # Very short query
params["top_k"] = 10 # Get more results for short queries
params["min_score"] = 0.5 # Lower threshold
# Check if similar queries exist
similar_queries = await self.feedback_service.get_similar_queries(
knowledge_base_name=knowledge_base_name,
query=query,
top_k=1,
threshold=0.85
)
# If we have similar past queries, use their parameters
if similar_queries:
params["retrieval_method"] = "hybrid" # Hybrid works well for repeat queries
# Add keywords
params["additional_keywords"] = query_keywords
return params
async def _analyze_used_documents(
self,
answer: str,
results: List[Dict[str, Any]]
) -> Set[str]:
"""Analyze which documents were used in the answer.
Args:
answer: Generated answer
results: List of retrieval results
Returns:
Set of document IDs used in the answer
"""
used_ids = set()
# Check for explicit mentions of document IDs
for result in results:
doc_id = result["id"]
if f"[ID: {doc_id}]" in answer or f"[{doc_id}]" in answer:
used_ids.add(doc_id)
# Check content overlap (crude approximation)
for result in results:
if result["id"] in used_ids:
continue
# Check for significant phrases from document in answer
doc_keywords = extract_keywords(result["document"], max_keywords=5)
matched_keywords = sum(1 for kw in doc_keywords if kw in answer.lower())
# If multiple keywords match, consider document used
if matched_keywords >= 2:
used_ids.add(result["id"])
return used_ids
async def _check_cached_response(
self,
knowledge_base_name: str,
query: str
) -> Optional[Dict[str, Any]]:
"""Check for cached RAG response.
Args:
knowledge_base_name: Knowledge base name
query: Query text
Returns:
Cached response or None
"""
if not self.cache_service:
return None
cache_key = f"rag_{knowledge_base_name}_{query}"
try:
cached = await self.cache_service.get(cache_key)
if cached:
logger.info(
f"Using cached RAG response for query in '{knowledge_base_name}'",
extra={"emoji_key": "cache"}
)
return cached
except Exception as e:
logger.error(
f"Error checking cache: {str(e)}",
extra={"emoji_key": "error"}
)
return None

    async def _cache_response(
        self,
        knowledge_base_name: str,
        query: str,
        response: Dict[str, Any]
    ) -> None:
        """Cache RAG response.

        Args:
            knowledge_base_name: Knowledge base name
            query: Query text
            response: Response to cache
        """
        if not self.cache_service:
            return

        cache_key = f"rag_{knowledge_base_name}_{query}"

        try:
            # Cache for 1 day
            await self.cache_service.set(cache_key, response, ttl=86400)
        except Exception as e:
            logger.error(
                f"Error caching response: {str(e)}",
                extra={"emoji_key": "error"}
            )
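
    # Example (hypothetical) cache key produced by the two methods above:
    #   "rag_company_docs_What is our refund policy?"
    # Entries are stored verbatim for 24 hours (ttl=86400), so a repeated identical
    # query against the same knowledge base skips retrieval and generation entirely.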

    async def generate_with_rag(
        self,
        knowledge_base_name: str,
        query: str,
        provider: Optional[str] = None,
        model: Optional[str] = None,
        template: str = "rag_default",
        max_tokens: int = 1000,
        temperature: float = 0.3,
        top_k: Optional[int] = None,
        retrieval_method: Optional[str] = None,
        min_score: Optional[float] = None,
        metadata_filter: Optional[Dict[str, Any]] = None,
        include_metadata: bool = True,
        include_sources: bool = True,
        use_cache: bool = True,
        apply_feedback: bool = True,
        search_params: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Generate a response using RAG.

        Args:
            knowledge_base_name: Knowledge base name
            query: Query text
            provider: Provider name (auto-selected if None)
            model: Model name (auto-selected if None)
            template: RAG prompt template name
            max_tokens: Maximum tokens for generation
            temperature: Temperature for generation
            top_k: Number of documents to retrieve (auto-adjusted if None)
            retrieval_method: Retrieval method (vector, hybrid)
            min_score: Minimum similarity score
            metadata_filter: Optional metadata filter
            include_metadata: Whether to include metadata in context
            include_sources: Whether to include sources in response
            use_cache: Whether to use cached responses
            apply_feedback: Whether to apply feedback adjustments
            search_params: Optional ChromaDB search parameters

        Returns:
            Generated response with sources and metrics
        """
        start_time = time.time()
        operation_metrics = {}

        # Check cache first if enabled
        if use_cache:
            cached_response = await self._check_cached_response(knowledge_base_name, query)
            if cached_response:
                return cached_response

        # Auto-select model if not specified
        if not provider or not model:
            # Determine task complexity based on query
            task_complexity = "medium"
            if len(query) > 100:
                task_complexity = "high"
            elif len(query) < 30:
                task_complexity = "low"

            # Get optimal model
            model_selection = await self._select_optimal_model({
                "task_type": "rag_completion",
                "complexity": task_complexity,
                "query_length": len(query)
            })

            provider = provider or model_selection["provider"]
            model = model or model_selection["model"]

        # Dynamically adjust retrieval parameters if not specified
        if top_k is None or retrieval_method is None or min_score is None:
            adjusted_params = await self._adjust_retrieval_params(query, knowledge_base_name)

            # Use explicitly specified parameters, falling back to the adjusted ones.
            # Explicit `is None` checks so legitimate falsy values (e.g. min_score=0.0)
            # are not silently overridden.
            top_k = top_k if top_k is not None else adjusted_params["top_k"]
            retrieval_method = retrieval_method if retrieval_method is not None else adjusted_params["retrieval_method"]
            min_score = min_score if min_score is not None else adjusted_params["min_score"]
            search_params = search_params or adjusted_params.get("search_params")
            additional_keywords = adjusted_params.get("additional_keywords")
        else:
            additional_keywords = None

        # Retrieve context
        retrieval_start = time.time()

        if retrieval_method == "hybrid":
            # Use hybrid search
            retrieval_result = await self.retriever.retrieve_hybrid(
                knowledge_base_name=knowledge_base_name,
                query=query,
                top_k=top_k,
                min_score=min_score,
                metadata_filter=metadata_filter,
                additional_keywords=additional_keywords,
                apply_feedback=apply_feedback,
                search_params=search_params
            )
        else:
            # Use standard vector search
            retrieval_result = await self.retriever.retrieve(
                knowledge_base_name=knowledge_base_name,
                query=query,
                top_k=top_k,
                min_score=min_score,
                metadata_filter=metadata_filter,
                content_filter=None,  # No content filter for vector-only search
                apply_feedback=apply_feedback,
                search_params=search_params
            )

        retrieval_time = time.time() - retrieval_start
        operation_metrics["retrieval_time"] = retrieval_time

        # Check if retrieval was successful
        if retrieval_result.get("status") != "success" or not retrieval_result.get("results"):
            logger.warning(
                f"No relevant documents found for query in knowledge base '{knowledge_base_name}'",
                extra={"emoji_key": "warning"}
            )

            # Return error response
            error_response = {
                "status": "no_results",
                "message": "No relevant documents found for query",
                "query": query,
                "retrieval_time": retrieval_time,
                "total_time": time.time() - start_time
            }

            # Cache error response if enabled
            if use_cache:
                await self._cache_response(knowledge_base_name, query, error_response)

            return error_response

        # Format context from retrieval results
        context = self._format_context(
            retrieval_result["results"],
            include_metadata=include_metadata
        )

        # Get prompt template
        template_text = self.prompt_service.get_template(template)
        if not template_text:
            # Fallback to default template
            template_text = DEFAULT_RAG_TEMPLATES["rag_default"]

        # Format prompt with template
        rag_prompt = template_text.format(
            context=context,
            query=query
        )

        # Calculate token estimates
        input_tokens = generate_token_estimate(rag_prompt)
        operation_metrics["context_tokens"] = generate_token_estimate(context)
        operation_metrics["input_tokens"] = input_tokens
        operation_metrics["retrieval_count"] = len(retrieval_result["results"])

        # Generate completion
        generation_start = time.time()

        provider_service = self.provider_manager.get_provider(provider)

        completion_request = CompletionRequest(
            prompt=rag_prompt,
            model=model,
            max_tokens=max_tokens,
            temperature=temperature
        )

        completion_result = await provider_service.generate_completion(
            request=completion_request
        )

        generation_time = time.time() - generation_start
        operation_metrics["generation_time"] = generation_time

        # Extract completion and metrics
        completion = completion_result.get("completion", "")
        operation_metrics["output_tokens"] = completion_result.get("output_tokens", 0)
        operation_metrics["total_tokens"] = completion_result.get("total_tokens", 0)
        operation_metrics["cost"] = completion_result.get("cost", 0.0)
        operation_metrics["total_time"] = time.time() - start_time

        # Prepare sources if requested
        sources = []
        if include_sources:
            for result in retrieval_result["results"]:
                # Include limited context for each source
                doc_preview = result["document"]
                if len(doc_preview) > 100:
                    doc_preview = doc_preview[:100] + "..."

                sources.append({
                    "id": result["id"],
                    "document": doc_preview,
                    "score": result["score"],
                    "metadata": result.get("metadata", {})
                })

        # Analyze which documents were used in the answer
        used_doc_ids = await self._analyze_used_documents(completion, retrieval_result["results"])

        # Record feedback
        if apply_feedback:
            await self.retriever.record_feedback(
                knowledge_base_name=knowledge_base_name,
                query=query,
                retrieved_documents=retrieval_result["results"],
                used_document_ids=list(used_doc_ids)
            )

        # Track metrics
        await self._track_rag_metrics(
            knowledge_base=knowledge_base_name,
            query=query,
            provider=provider,
            model=model,
            metrics=operation_metrics
        )

        logger.info(
            f"Generated RAG response using {provider}/{model} in {operation_metrics['total_time']:.2f}s",
            extra={"emoji_key": "success"}
        )

        # Create response
        response = {
            "status": "success",
            "query": query,
            "answer": completion,
            "sources": sources,
            "knowledge_base": knowledge_base_name,
            "provider": provider,
            "model": model,
            "used_document_ids": list(used_doc_ids),
            "metrics": operation_metrics
        }

        # Cache response if enabled
        if use_cache:
            await self._cache_response(knowledge_base_name, query, response)

        return response
```
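
A minimal usage sketch for `generate_with_rag` as defined above. The surrounding class and its construction are not shown on this page, so `rag_engine`, `ask_knowledge_base`, and the `"company_docs"` knowledge base name below are placeholders; the keyword arguments mirror the method signature and the returned dict shape shown in the code.

```python
from typing import Any, Dict


async def ask_knowledge_base(rag_engine: Any, question: str) -> Dict[str, Any]:
    """Query a knowledge base through the RAG service shown above.

    `rag_engine` is assumed to be an already-constructed instance of the class
    these methods belong to; how it is built is defined elsewhere in the codebase.
    """
    result = await rag_engine.generate_with_rag(
        knowledge_base_name="company_docs",  # hypothetical knowledge base name
        query=question,
        template="rag_default",   # falls back to DEFAULT_RAG_TEMPLATES if unknown
        max_tokens=800,
        include_sources=True,     # return per-document previews and scores
        use_cache=True,           # identical queries are served from cache for 24h
    )

    if result["status"] == "success":
        print(result["answer"])
        for source in result["sources"]:
            print(f"- {source['id']} (score: {source['score']:.2f})")
    else:
        # The "no_results" path returns a message instead of an answer
        print(result.get("message", "No relevant documents found"))

    return result
```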