This is page 13 of 35. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?page={x} to view the full context.
# Directory Structure
```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│ ├── __init__.py
│ ├── advanced_agent_flows_using_unified_memory_system_demo.py
│ ├── advanced_extraction_demo.py
│ ├── advanced_unified_memory_system_demo.py
│ ├── advanced_vector_search_demo.py
│ ├── analytics_reporting_demo.py
│ ├── audio_transcription_demo.py
│ ├── basic_completion_demo.py
│ ├── cache_demo.py
│ ├── claude_integration_demo.py
│ ├── compare_synthesize_demo.py
│ ├── cost_optimization.py
│ ├── data
│ │ ├── sample_event.txt
│ │ ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│ │ └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│ ├── docstring_refiner_demo.py
│ ├── document_conversion_and_processing_demo.py
│ ├── entity_relation_graph_demo.py
│ ├── filesystem_operations_demo.py
│ ├── grok_integration_demo.py
│ ├── local_text_tools_demo.py
│ ├── marqo_fused_search_demo.py
│ ├── measure_model_speeds.py
│ ├── meta_api_demo.py
│ ├── multi_provider_demo.py
│ ├── ollama_integration_demo.py
│ ├── prompt_templates_demo.py
│ ├── python_sandbox_demo.py
│ ├── rag_example.py
│ ├── research_workflow_demo.py
│ ├── sample
│ │ ├── article.txt
│ │ ├── backprop_paper.pdf
│ │ ├── buffett.pdf
│ │ ├── contract_link.txt
│ │ ├── legal_contract.txt
│ │ ├── medical_case.txt
│ │ ├── northwind.db
│ │ ├── research_paper.txt
│ │ ├── sample_data.json
│ │ └── text_classification_samples
│ │ ├── email_classification.txt
│ │ ├── news_samples.txt
│ │ ├── product_reviews.txt
│ │ └── support_tickets.txt
│ ├── sample_docs
│ │ └── downloaded
│ │ └── attention_is_all_you_need.pdf
│ ├── sentiment_analysis_demo.py
│ ├── simple_completion_demo.py
│ ├── single_shot_synthesis_demo.py
│ ├── smart_browser_demo.py
│ ├── sql_database_demo.py
│ ├── sse_client_demo.py
│ ├── test_code_extraction.py
│ ├── test_content_detection.py
│ ├── test_ollama.py
│ ├── text_classification_demo.py
│ ├── text_redline_demo.py
│ ├── tool_composition_examples.py
│ ├── tournament_code_demo.py
│ ├── tournament_text_demo.py
│ ├── unified_memory_system_demo.py
│ ├── vector_search_demo.py
│ ├── web_automation_instruction_packs.py
│ └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│ └── smart_browser_internal
│ ├── locator_cache.db
│ ├── readability.js
│ └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── integration
│ │ ├── __init__.py
│ │ └── test_server.py
│ ├── manual
│ │ ├── test_extraction_advanced.py
│ │ └── test_extraction.py
│ └── unit
│ ├── __init__.py
│ ├── test_cache.py
│ ├── test_providers.py
│ └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│ ├── __init__.py
│ ├── __main__.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── __main__.py
│ │ ├── commands.py
│ │ ├── helpers.py
│ │ └── typer_cli.py
│ ├── clients
│ │ ├── __init__.py
│ │ ├── completion_client.py
│ │ └── rag_client.py
│ ├── config
│ │ └── examples
│ │ └── filesystem_config.yaml
│ ├── config.py
│ ├── constants.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── evaluation
│ │ │ ├── base.py
│ │ │ └── evaluators.py
│ │ ├── providers
│ │ │ ├── __init__.py
│ │ │ ├── anthropic.py
│ │ │ ├── base.py
│ │ │ ├── deepseek.py
│ │ │ ├── gemini.py
│ │ │ ├── grok.py
│ │ │ ├── ollama.py
│ │ │ ├── openai.py
│ │ │ └── openrouter.py
│ │ ├── server.py
│ │ ├── state_store.py
│ │ ├── tournaments
│ │ │ ├── manager.py
│ │ │ ├── tasks.py
│ │ │ └── utils.py
│ │ └── ums_api
│ │ ├── __init__.py
│ │ ├── ums_database.py
│ │ ├── ums_endpoints.py
│ │ ├── ums_models.py
│ │ └── ums_services.py
│ ├── exceptions.py
│ ├── graceful_shutdown.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── analytics
│ │ │ ├── __init__.py
│ │ │ ├── metrics.py
│ │ │ └── reporting.py
│ │ ├── cache
│ │ │ ├── __init__.py
│ │ │ ├── cache_service.py
│ │ │ ├── persistence.py
│ │ │ ├── strategies.py
│ │ │ └── utils.py
│ │ ├── cache.py
│ │ ├── document.py
│ │ ├── knowledge_base
│ │ │ ├── __init__.py
│ │ │ ├── feedback.py
│ │ │ ├── manager.py
│ │ │ ├── rag_engine.py
│ │ │ ├── retriever.py
│ │ │ └── utils.py
│ │ ├── prompts
│ │ │ ├── __init__.py
│ │ │ ├── repository.py
│ │ │ └── templates.py
│ │ ├── prompts.py
│ │ └── vector
│ │ ├── __init__.py
│ │ ├── embeddings.py
│ │ └── vector_service.py
│ ├── tool_token_counter.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── audio_transcription.py
│ │ ├── base.py
│ │ ├── completion.py
│ │ ├── docstring_refiner.py
│ │ ├── document_conversion_and_processing.py
│ │ ├── enhanced-ums-lookbook.html
│ │ ├── entity_relation_graph.py
│ │ ├── excel_spreadsheet_automation.py
│ │ ├── extraction.py
│ │ ├── filesystem.py
│ │ ├── html_to_markdown.py
│ │ ├── local_text_tools.py
│ │ ├── marqo_fused_search.py
│ │ ├── meta_api_tool.py
│ │ ├── ocr_tools.py
│ │ ├── optimization.py
│ │ ├── provider.py
│ │ ├── pyodide_boot_template.html
│ │ ├── python_sandbox.py
│ │ ├── rag.py
│ │ ├── redline-compiled.css
│ │ ├── sentiment_analysis.py
│ │ ├── single_shot_synthesis.py
│ │ ├── smart_browser.py
│ │ ├── sql_databases.py
│ │ ├── text_classification.py
│ │ ├── text_redline_tools.py
│ │ ├── tournament.py
│ │ ├── ums_explorer.html
│ │ └── unified_memory_system.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── async_utils.py
│ │ ├── display.py
│ │ ├── logging
│ │ │ ├── __init__.py
│ │ │ ├── console.py
│ │ │ ├── emojis.py
│ │ │ ├── formatter.py
│ │ │ ├── logger.py
│ │ │ ├── panels.py
│ │ │ ├── progress.py
│ │ │ └── themes.py
│ │ ├── parse_yaml.py
│ │ ├── parsing.py
│ │ ├── security.py
│ │ └── text.py
│ └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/examples/tournament_text_demo.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Tournament Text Demo - Demonstrates running a text improvement tournament
This script shows how to:
1. Create a tournament with multiple models focused on text refinement
2. Track progress across multiple rounds
3. Retrieve and analyze the improved essay/text
The tournament task is to refine and improve a comparative essay on
transformer vs. diffusion model architectures, demonstrating how
the tournament system can be used for general text refinement tasks.
Usage:
python examples/tournament_text_demo.py [--topic TOPIC]
Options:
--topic TOPIC Specify a different essay topic (default: transformers vs diffusion models)
"""
import argparse
import asyncio
import json
import os
import re
import sys
from collections import namedtuple
from pathlib import Path
from typing import Any, Dict, List, Optional
# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))
from rich import box
from rich.markup import escape
from rich.panel import Panel
from rich.rule import Rule
from rich.table import Table
from ultimate_mcp_server.core.models.requests import CompletionRequest
from ultimate_mcp_server.core.providers.base import get_provider
from ultimate_mcp_server.core.server import Gateway
from ultimate_mcp_server.services.prompts import PromptTemplate
from ultimate_mcp_server.tools.tournament import (
create_tournament,
get_tournament_results,
get_tournament_status,
)
from ultimate_mcp_server.utils import get_logger, process_mcp_result
from ultimate_mcp_server.utils.display import (
CostTracker,
display_tournament_results,
display_tournament_status,
)
from ultimate_mcp_server.utils.logging.console import console
DEFAULT_MODEL_CONFIGS_TEXT: List[Dict[str, Any]] = [
{
"model_id": "openai/gpt-4o-mini",
"diversity_count": 1,
"temperature": 0.75,
},
{
"model_id": "anthropic/claude-3-5-haiku-20241022",
"diversity_count": 1,
"temperature": 0.7,
},
]
DEFAULT_NUM_ROUNDS_TEXT = 2
DEFAULT_TOURNAMENT_NAME_TEXT = "Advanced Text Refinement Tournament"
def parse_arguments_text():
parser = argparse.ArgumentParser(description="Run a text refinement tournament demo")
parser.add_argument(
"--topic", type=str, default="transformer_vs_diffusion",
choices=list(TOPICS.keys()) + ["custom"],
help="Essay topic (default: transformer_vs_diffusion)"
)
parser.add_argument(
"--custom-topic", type=str,
help="Custom essay topic (used when --topic=custom)"
)
parser.add_argument(
"--rounds", type=int, default=DEFAULT_NUM_ROUNDS_TEXT,
help=f"Number of tournament rounds (default: {DEFAULT_NUM_ROUNDS_TEXT})"
)
parser.add_argument(
"--models", type=str, nargs="+",
default=[mc["model_id"] for mc in DEFAULT_MODEL_CONFIGS_TEXT],
help="List of model IDs to participate."
)
return parser.parse_args()
# Initialize logger using get_logger
logger = get_logger("example.tournament_text")
# Create a simple structure for cost tracking from dict (tokens might be missing)
TrackableResult = namedtuple("TrackableResult", ["cost", "input_tokens", "output_tokens", "provider", "model", "processing_time"])
# Initialize global gateway
gateway: Optional[Gateway] = None
# --- Configuration ---
# Adjust model IDs based on your configured providers
MODEL_IDS = [
"openai:gpt-4.1-mini",
"deepseek:deepseek-chat",
"gemini:gemini-2.5-pro-preview-03-25"
]
NUM_ROUNDS = 2 # Changed from 3 to 2 for faster execution and debugging
TOURNAMENT_NAME = "Text Refinement Tournament Demo" # More generic name
# The generic essay prompt template
TEMPLATE_TEXT = """
# GENERIC TEXT TOURNAMENT PROMPT TEMPLATE
Please write a high-quality, comprehensive {{content_type}} on the topic of: "{{topic}}".
{{context}}
Your {{content_type}} should thoroughly explore the following sections and subtopics:
{% for section in sections %}
## {{section.title}}
{% for subtopic in section.subtopics %}
- {{subtopic}}
{% endfor %}
{% endfor %}
Adhere to the following style and content requirements:
{{style_requirements}}
Please provide only the {{content_type}} text. If you have meta-comments or a thinking process,
enclose it in <thinking>...</thinking> tags at the very beginning of your response.
"""
# Define predefined topics
TOPICS = {
"transformer_vs_diffusion": {
"content_type": "technical essay",
"topic": "comparing transformer architecture and diffusion models",
"context": "Focus on their underlying mechanisms, common applications, strengths, weaknesses, and future potential in AI.",
"sections": [
{"title": "Core Principles", "subtopics": ["Transformer self-attention, positional encoding", "Diffusion forward/reverse processes, noise schedules"]},
{"title": "Applications & Performance", "subtopics": ["Typical tasks for transformers (NLP, vision)", "Typical tasks for diffusion models (image/audio generation)", "Comparative performance benchmarks or known strengths"]},
{"title": "Limitations & Challenges", "subtopics": ["Computational costs, data requirements", "Interpretability, controllability, known failure modes for each"]},
{"title": "Future Outlook", "subtopics": ["Potential for hybridization", "Scaling frontiers", "Impact on AGI research"]}
],
"style_requirements": "Write in a clear, objective, and technically precise manner suitable for an audience with a machine learning background. Aim for around 800-1200 words."
},
"llm_vs_traditional_ai": {
"content_type": "comparative analysis",
"topic": "comparing large language models to traditional AI approaches",
"context": "The rise of large language models has shifted the AI landscape significantly.",
"sections": [
{
"title": "Fundamental Differences",
"subtopics": [
"How LLMs differ architecturally from traditional ML/AI systems",
"Data requirements and training approaches"
]
},
{
"title": "Capabilities and Limitations",
"subtopics": [
"Tasks where LLMs excel compared to traditional approaches",
"Situations where traditional AI methods remain superior",
"Emergent capabilities unique to large language models"
]
},
{
"title": "Real-world Applications",
"subtopics": [
"Industries being transformed by LLMs",
"Where traditional AI approaches continue to dominate",
"Examples of hybrid systems combining both approaches"
]
},
{
"title": "Future Outlook",
"subtopics": [
"Projected evolution of both paradigms",
"Potential convergence or further divergence",
"Research frontiers for each approach"
]
}
],
"style_requirements": "Present a balanced analysis that acknowledges the strengths and weaknesses of both paradigms. Support claims with specific examples where possible."
}
}
# Create custom topic template
def create_custom_topic_variables(topic_description):
"""Create a simple custom topic with standard sections"""
return {
"content_type": "essay",
"topic": topic_description,
"context": "",
"sections": [
{
"title": "Background and Key Concepts",
"subtopics": [
"Define and explain the core elements of the topic",
"Provide necessary historical or theoretical context"
]
},
{
"title": "Analysis of Main Aspects",
"subtopics": [
"Examine the primary dimensions or elements of the topic",
"Discuss relationships between different aspects",
"Identify patterns or trends relevant to the topic"
]
},
{
"title": "Practical Implications",
"subtopics": [
"Real-world applications or impacts",
"How this topic affects related fields or domains"
]
},
{
"title": "Future Perspectives",
"subtopics": [
"Emerging trends or developments",
"Potential challenges and opportunities",
"Areas requiring further research or exploration"
]
}
],
"style_requirements": "Present a comprehensive and well-structured analysis with clear reasoning and specific examples where appropriate."
}
# Create the prompt template object
essay_template = PromptTemplate(
template=TEMPLATE_TEXT,
template_id="text_tournament_template",
description="A template for text tournament prompts",
required_vars=["content_type", "topic", "context", "sections", "style_requirements"]
)
# --- Helper Functions ---
def parse_result(result):
"""Parse the result from a tool call into a usable dictionary.
Handles various return types from MCP tools.
"""
try:
# Handle TextContent object (which has a .text attribute)
if hasattr(result, 'text'):
try:
# Try to parse the text as JSON
return json.loads(result.text)
except json.JSONDecodeError:
# Return the raw text if not JSON
return {"text": result.text}
# Handle list result
if isinstance(result, list):
if result:
first_item = result[0]
if hasattr(first_item, 'text'):
try:
return json.loads(first_item.text)
except json.JSONDecodeError:
return {"text": first_item.text}
else:
return first_item
return {}
# Handle dictionary directly
if isinstance(result, dict):
return result
# Handle other potential types or return error
else:
return {"error": f"Unexpected result type: {type(result)}"}
except Exception as e:
return {"error": f"Error parsing result: {str(e)}"}
async def setup_gateway():
"""Set up the gateway for demonstration."""
global gateway
# Create gateway instance
logger.info("Initializing gateway for demonstration", emoji_key="start")
gateway = Gateway("text-tournament-demo", register_tools=False)
# Initialize the server with all providers and built-in tools
await gateway._initialize_providers()
# Manually register tournament tools
mcp = gateway.mcp
mcp.tool()(create_tournament)
mcp.tool()(get_tournament_status)
mcp.tool()(get_tournament_results)
logger.info("Manually registered tournament tools.")
# Verify tools are registered
tools = await gateway.mcp.list_tools()
tournament_tools = [t.name for t in tools if t.name.startswith('tournament') or 'tournament' in t.name]
logger.info(f"Registered tournament tools: {tournament_tools}", emoji_key="info")
if not any('tournament' in t.lower() for t in [t.name for t in tools]):
logger.warning("No tournament tools found. Make sure tournament plugins are registered.", emoji_key="warning")
logger.success("Gateway initialized", emoji_key="success")
async def poll_tournament_status(tournament_id: str, storage_path: Optional[str] = None, interval: int = 5) -> Optional[str]:
"""Poll the tournament status until it reaches a final state.
Args:
tournament_id: ID of the tournament to poll
storage_path: Optional storage path to avoid tournament not found issues
interval: Time between status checks in seconds
"""
logger.info(f"Polling status for tournament {tournament_id}...", emoji_key="poll")
final_states = ["COMPLETED", "FAILED", "CANCELLED"]
# Add direct file polling capability to handle case where tournament manager can't find the tournament
if storage_path:
storage_dir = Path(storage_path)
state_file = storage_dir / "tournament_state.json"
logger.debug(f"Will check tournament state file directly at: {state_file}")
while True:
status_input = {"tournament_id": tournament_id}
status_result = await gateway.mcp.call_tool("get_tournament_status", status_input)
status_data = await process_mcp_result(status_result)
if "error" in status_data:
# If tournament manager couldn't find the tournament but we have the storage path,
# try to read the state file directly (this is a fallback mechanism)
if storage_path and "not found" in status_data.get("error", "").lower():
try:
logger.debug(f"Attempting to read tournament state directly from: {state_file}")
if state_file.exists():
with open(state_file, 'r', encoding='utf-8') as f:
direct_status_data = json.load(f)
status = direct_status_data.get("status")
current_round = direct_status_data.get("current_round", 0)
total_rounds = direct_status_data.get("config", {}).get("rounds", 0)
# Create a status object compatible with our display function
status_data = {
"tournament_id": tournament_id,
"status": status,
"current_round": current_round,
"total_rounds": total_rounds,
"storage_path": storage_path
}
logger.debug(f"Successfully read direct state: {status}")
else:
logger.warning(f"State file not found at: {state_file}")
except Exception as e:
logger.error(f"Error reading state file directly: {e}")
logger.error(f"Error fetching status: {status_data['error']}", emoji_key="error")
return None # Indicate error during polling
else:
# Standard error case
logger.error(f"Error fetching status: {status_data['error']}", emoji_key="error")
return None # Indicate error during polling
# Display improved status using the imported function
display_tournament_status(status_data)
status = status_data.get("status")
if status in final_states:
logger.success(f"Tournament reached final state: {status}", emoji_key="success")
return status
await asyncio.sleep(interval)
def extract_thinking(text: str) -> str:
"""Extract <thinking> tags content (simple version)."""
match = re.search(r"<thinking>(.*?)</thinking>", text, re.DOTALL)
return match.group(1).strip() if match else ""
def analyze_text_quality(text: str) -> Dict[str, Any]:
"""Basic text quality analysis."""
word_count = len(text.split())
# Add more metrics later (readability, sentiment, etc.)
return {"word_count": word_count}
async def evaluate_essays(essays_by_model: Dict[str, str], tracker: CostTracker = None) -> Dict[str, Any]:
"""Use LLM to evaluate which essay is the best.
Args:
essays_by_model: Dictionary mapping model IDs to their essay texts
tracker: Optional CostTracker to track API call costs
Returns:
Dictionary with evaluation results
"""
if not essays_by_model or len(essays_by_model) < 2:
return {"error": "Not enough essays to compare"}
eval_cost = 0.0 # Initialize evaluation cost
try:
# Format the essays for evaluation
evaluation_prompt = "# Essay Evaluation\n\nPlease analyze the following essays on the same topic and determine which one is the best. "
evaluation_prompt += "Consider factors such as technical accuracy, clarity, organization, depth of analysis, and overall quality.\n\n"
# Add each essay
for i, (model_id, essay) in enumerate(essays_by_model.items(), 1):
display_model = model_id.split(':')[-1] if ':' in model_id else model_id
# Limit each essay to 3000 chars to fit context windows
truncated_essay = essay[:3000]
if len(essay) > 3000:
truncated_essay += "..."
evaluation_prompt += f"## Essay {i} (by {display_model})\n\n{truncated_essay}\n\n"
evaluation_prompt += "\n# Your Evaluation Task\n\n"
evaluation_prompt += "1. Rank the essays from best to worst\n"
evaluation_prompt += "2. Explain your reasoning for the ranking\n"
evaluation_prompt += "3. Highlight specific strengths of the best essay\n"
evaluation_prompt += "4. Suggest one improvement for each essay\n"
# Use a more capable model for evaluation
model_to_use = "gemini:gemini-2.5-pro-preview-03-25"
logger.info(f"Evaluating essays using {model_to_use}...", emoji_key="evaluate")
# Get the provider
provider_id = model_to_use.split(':')[0]
provider = await get_provider(provider_id)
if not provider:
return {
"error": f"Provider {provider_id} not available for evaluation",
"model_used": model_to_use,
"eval_prompt": evaluation_prompt,
"cost": 0.0
}
# Generate completion for evaluation with timeout
try:
request = CompletionRequest(prompt=evaluation_prompt, model=model_to_use)
# Set a timeout for the completion request
completion_task = provider.generate_completion(
prompt=request.prompt,
model=request.model
)
# 45 second timeout for evaluation
completion_result = await asyncio.wait_for(completion_task, timeout=45)
# Track API call if tracker provided
if tracker:
tracker.add_call(completion_result)
# Accumulate cost
if hasattr(completion_result, 'cost'):
eval_cost = completion_result.cost
elif hasattr(completion_result, 'metrics') and isinstance(completion_result.metrics, dict):
eval_cost = completion_result.metrics.get('cost', 0.0)
# Prepare result dict
result = {
"evaluation": completion_result.text,
"model_used": model_to_use,
"eval_prompt": evaluation_prompt,
"cost": eval_cost # Return the cost
}
except asyncio.TimeoutError:
logger.warning(f"Evaluation with {model_to_use} timed out after 45 seconds", emoji_key="warning")
return {
"error": "Evaluation timed out after 45 seconds",
"model_used": model_to_use,
"eval_prompt": evaluation_prompt,
"cost": 0.0
}
except Exception as request_error:
logger.error(f"Error during model request: {str(request_error)}", emoji_key="error")
return {
"error": f"Error during model request: {str(request_error)}",
"model_used": model_to_use,
"eval_prompt": evaluation_prompt,
"cost": 0.0
}
except Exception as e:
logger.error(f"Essay evaluation failed: {str(e)}", emoji_key="error", exc_info=True)
return {
"error": str(e),
"model_used": model_to_use if 'model_to_use' in locals() else "unknown",
"eval_prompt": evaluation_prompt if 'evaluation_prompt' in locals() else "Error generating prompt",
"cost": 0.0
}
return result
async def calculate_tournament_costs(rounds_results, evaluation_cost=None):
"""Calculate total costs of the tournament by model and grand total.
Args:
rounds_results: List of round results data from tournament results
evaluation_cost: Optional cost of the final evaluation step
Returns:
Dictionary with cost information
"""
model_costs = {}
total_cost = 0.0
# Process costs for each round
for _round_idx, round_data in enumerate(rounds_results):
responses = round_data.get('responses', {})
for model_id, response in responses.items():
metrics = response.get('metrics', {})
cost = metrics.get('cost', 0.0)
# Convert to float if it's a string
if isinstance(cost, str):
try:
cost = float(cost.replace('$', ''))
except (ValueError, TypeError):
cost = 0.0
# Initialize model if not present
if model_id not in model_costs:
model_costs[model_id] = 0.0
# Add to model total and grand total
model_costs[model_id] += cost
total_cost += cost
# Add evaluation cost if provided
if evaluation_cost:
total_cost += evaluation_cost
model_costs['evaluation'] = evaluation_cost
return {
'model_costs': model_costs,
'total_cost': total_cost
}
# --- Main Script Logic ---
async def run_tournament_demo(tracker: CostTracker):
"""Run the text tournament demo."""
# Parse command line arguments
args = parse_arguments_text()
# Determine which topic to use
if args.topic == "custom" and args.custom_topic:
# Custom topic provided via command line
topic_name = "custom"
essay_variables = create_custom_topic_variables(args.custom_topic)
topic_description = args.custom_topic
log_topic_info = f"Using custom topic: [yellow]{escape(topic_description)}[/yellow]"
elif args.topic in TOPICS:
# Use one of the predefined topics
topic_name = args.topic
essay_variables = TOPICS[args.topic]
topic_description = essay_variables["topic"]
log_topic_info = f"Using predefined topic: [yellow]{escape(topic_description)}[/yellow]"
else:
# Default to transformer vs diffusion if topic not recognized
topic_name = "transformer_vs_diffusion"
essay_variables = TOPICS[topic_name]
topic_description = essay_variables['topic']
log_topic_info = f"Using default topic: [yellow]{escape(topic_description)}[/yellow]"
# Use Rich Rule for title
console.print(Rule(f"[bold blue]{TOURNAMENT_NAME} - {topic_name.replace('_', ' ').title()}[/bold blue]"))
console.print(log_topic_info)
console.print(f"Models: [cyan]{', '.join(MODEL_IDS)}[/cyan]")
console.print(f"Rounds: [cyan]{NUM_ROUNDS}[/cyan]")
# Render the template
try:
rendered_prompt = essay_template.render(essay_variables)
logger.info(f"Template rendered for topic: {topic_name}", emoji_key="template")
# Show prompt preview in a Panel
prompt_preview = rendered_prompt.split("\n")[:10] # Show more lines
preview_text = "\n".join(prompt_preview) + "\n..."
console.print(Panel(escape(preview_text), title="[bold]Rendered Prompt Preview[/bold]", border_style="dim blue", expand=False))
except Exception as e:
logger.error(f"Template rendering failed: {str(e)}", emoji_key="error", exc_info=True)
# Log template and variables for debugging using logger
logger.debug(f"Template: {TEMPLATE_TEXT}")
logger.debug(f"Variables: {escape(str(essay_variables))}") # Escape potentially complex vars
return 1
# 1. Create the tournament
# Prepare model configurations
# Default temperature from DEFAULT_MODEL_CONFIGS_TEXT, assuming it's a common parameter.
# The create_tournament tool itself will parse these against InputModelConfig.
model_configs = [{"model_id": mid, "diversity_count": 1, "temperature": 0.7 } for mid in MODEL_IDS]
create_input = {
"name": f"{TOURNAMENT_NAME} - {topic_name.replace('_', ' ').title()}",
"prompt": rendered_prompt,
"models": model_configs, # Changed from model_ids to models
"rounds": NUM_ROUNDS,
"tournament_type": "text"
}
try:
logger.info("Creating tournament...", emoji_key="processing")
create_result = await gateway.mcp.call_tool("create_tournament", create_input)
create_data = await process_mcp_result(create_result)
if "error" in create_data:
error_msg = create_data.get("error", "Unknown error")
logger.error(f"Failed to create tournament: {error_msg}. Exiting.", emoji_key="error")
return 1
tournament_id = create_data.get("tournament_id")
if not tournament_id:
logger.error("No tournament ID returned. Exiting.", emoji_key="error")
return 1
# Extract storage path for reference
storage_path = create_data.get("storage_path")
logger.info(f"Tournament created with ID: {tournament_id}", emoji_key="tournament")
if storage_path:
logger.info(f"Tournament storage path: {storage_path}", emoji_key="path")
# Add a small delay to ensure the tournament state is saved before proceeding
await asyncio.sleep(2)
# 2. Poll for status
final_status = await poll_tournament_status(tournament_id, storage_path)
# 3. Fetch and display final results
if final_status == "COMPLETED":
logger.info("Fetching final results...", emoji_key="results")
results_input = {"tournament_id": tournament_id}
final_results = await gateway.mcp.call_tool("get_tournament_results", results_input)
results_data = await process_mcp_result(final_results)
if "error" not in results_data:
# Use the imported display function for tournament results
display_tournament_results(results_data)
# Track aggregated tournament cost (excluding separate evaluation)
if isinstance(results_data, dict) and "cost" in results_data:
try:
total_cost = results_data.get("cost", {}).get("total_cost", 0.0)
processing_time = results_data.get("total_processing_time", 0.0)
trackable = TrackableResult(
cost=total_cost,
input_tokens=0,
output_tokens=0,
provider="tournament",
model="text_tournament",
processing_time=processing_time
)
tracker.add_call(trackable)
logger.info(f"Tracked tournament cost: ${total_cost:.6f}", emoji_key="cost")
except Exception as track_err:
logger.warning(f"Could not track tournament cost: {track_err}", exc_info=False)
# Analyze round progression if available
rounds_results = results_data.get('rounds_results', [])
if rounds_results:
console.print(Rule("[bold blue]Essay Evolution Analysis[/bold blue]"))
for round_idx, round_data in enumerate(rounds_results):
console.print(f"[bold]Round {round_idx} Analysis:[/bold]")
responses = round_data.get('responses', {})
round_table = Table(box=box.MINIMAL, show_header=True, expand=False)
round_table.add_column("Model", style="magenta")
round_table.add_column("Word Count", style="green", justify="right")
has_responses = False
for model_id, response in responses.items():
display_model = escape(model_id.split(':')[-1])
response_text = response.get('response_text', '')
if response_text:
has_responses = True
metrics = analyze_text_quality(response_text)
round_table.add_row(
display_model,
str(metrics['word_count'])
)
if has_responses:
console.print(round_table)
else:
console.print("[dim]No valid responses recorded for this round.[/dim]")
console.print() # Add space between rounds
# Evaluate final essays using LLM
final_round = rounds_results[-1]
final_responses = final_round.get('responses', {})
# Track evaluation cost
evaluation_cost = 0.0
if final_responses:
console.print(Rule("[bold blue]AI Evaluation of Essays[/bold blue]"))
console.print("[bold]Evaluating final essays...[/bold]")
essays_by_model = {}
for model_id, response in final_responses.items():
essays_by_model[model_id] = response.get('response_text', '')
evaluation_result = await evaluate_essays(essays_by_model, tracker)
if "error" not in evaluation_result:
console.print(Panel(
escape(evaluation_result["evaluation"]),
title=f"[bold]Essay Evaluation (by {evaluation_result['model_used'].split(':')[-1]})[/bold]",
border_style="green",
expand=False
))
# Track evaluation cost separately
if evaluation_cost > 0:
try:
trackable_eval = TrackableResult(
cost=evaluation_cost,
input_tokens=0, # Tokens for eval not easily available here
output_tokens=0,
provider=evaluation_result['model_used'].split(':')[0],
model=evaluation_result['model_used'].split(':')[-1],
processing_time=0 # Eval time not tracked here
)
tracker.add_call(trackable_eval)
except Exception as track_err:
logger.warning(f"Could not track evaluation cost: {track_err}", exc_info=False)
# Save evaluation result to a file in the tournament directory
if storage_path:
try:
evaluation_file = os.path.join(storage_path, "essay_evaluation.md")
with open(evaluation_file, "w", encoding="utf-8") as f:
f.write(f"# Essay Evaluation by {evaluation_result['model_used']}\n\n")
f.write(evaluation_result["evaluation"])
logger.info(f"Evaluation saved to {evaluation_file}", emoji_key="save")
except Exception as e:
logger.warning(f"Could not save evaluation to file: {str(e)}", emoji_key="warning")
# Track evaluation cost if available
evaluation_cost = evaluation_result.get('cost', 0.0)
logger.info(f"Evaluation cost: ${evaluation_cost:.6f}", emoji_key="cost")
else:
console.print(f"[yellow]Could not evaluate essays: {evaluation_result.get('error')}[/yellow]")
# Try with fallback model if Gemini fails
if "gemini" in evaluation_result.get("model_used", ""):
console.print("[bold]Trying evaluation with fallback model (gpt-4.1-mini)...[/bold]")
# Switch to OpenAI model as backup
essays_by_model_limited = {}
# Limit content size to avoid token limits
for model_id, essay in essays_by_model.items():
essays_by_model_limited[model_id] = essay[:5000] # Shorter excerpt to fit in context
fallback_evaluation = {
"model_used": "openai:gpt-4.1-mini",
"eval_prompt": evaluation_result.get("eval_prompt", "Evaluation failed")
}
try:
provider_id = "openai"
provider = await get_provider(provider_id)
if provider:
# Create a shorter, simplified prompt
simple_prompt = "Compare these essays and rank them from best to worst:\n\n"
for i, (model_id, essay) in enumerate(essays_by_model_limited.items(), 1):
display_model = model_id.split(':')[-1] if ':' in model_id else model_id
simple_prompt += f"Essay {i} ({display_model}):\n{essay[:2000]}...\n\n"
request = CompletionRequest(prompt=simple_prompt, model="openai:gpt-4.1-mini")
completion_result = await provider.generate_completion(
prompt=request.prompt,
model=request.model
)
fallback_evaluation["evaluation"] = completion_result.text
# Track fallback evaluation cost
if completion_result.cost > 0:
try:
trackable_fallback = TrackableResult(
cost=completion_result.cost,
input_tokens=0,
output_tokens=0,
provider="openai",
model="gpt-4.1-mini",
processing_time=0 # Eval time not tracked
)
tracker.add_call(trackable_fallback)
except Exception as track_err:
logger.warning(f"Could not track fallback evaluation cost: {track_err}", exc_info=False)
logger.info(f"Fallback evaluation cost: ${completion_result.cost:.6f}", emoji_key="cost")
console.print(Panel(
escape(fallback_evaluation["evaluation"]),
title="[bold]Fallback Evaluation (by gpt-4.1-mini)[/bold]",
border_style="yellow",
expand=False
))
# Save fallback evaluation to file
if storage_path:
try:
fallback_eval_file = os.path.join(storage_path, "fallback_evaluation.md")
with open(fallback_eval_file, "w", encoding="utf-8") as f:
f.write("# Fallback Essay Evaluation by gpt-4.1-mini\n\n")
f.write(fallback_evaluation["evaluation"])
logger.info(f"Fallback evaluation saved to {fallback_eval_file}", emoji_key="save")
except Exception as e:
logger.warning(f"Could not save fallback evaluation: {str(e)}", emoji_key="warning")
else:
console.print("[red]Fallback model unavailable[/red]")
except Exception as fallback_error:
console.print(f"[red]Fallback evaluation failed: {str(fallback_error)}[/red]")
# Find and highlight comparison file for final round
comparison_file = final_round.get('comparison_file_path')
if comparison_file:
console.print(Panel(
f"Check the final comparison file for the full essay text and detailed round comparisons:\n[bold yellow]{escape(comparison_file)}[/bold yellow]",
title="[bold]Final Comparison File[/bold]",
border_style="yellow",
expand=False
))
else:
logger.warning("Could not find path to final comparison file in results", emoji_key="warning")
# Display cost summary
costs = await calculate_tournament_costs(rounds_results, evaluation_cost)
model_costs = costs.get('model_costs', {})
total_cost = costs.get('total_cost', 0.0)
console.print(Rule("[bold blue]Tournament Cost Summary[/bold blue]"))
cost_table = Table(box=box.MINIMAL, show_header=True, expand=False)
cost_table.add_column("Model", style="magenta")
cost_table.add_column("Total Cost", style="green", justify="right")
# Add model costs to table
for model_id, cost in sorted(model_costs.items()):
if model_id == 'evaluation':
display_model = "Evaluation"
else:
display_model = model_id.split(':')[-1] if ':' in model_id else model_id
cost_table.add_row(
display_model,
f"${cost:.6f}"
)
# Add grand total
cost_table.add_row(
"[bold]GRAND TOTAL[/bold]",
f"[bold]${total_cost:.6f}[/bold]"
)
console.print(cost_table)
# Save cost summary to file
if storage_path:
try:
cost_file = os.path.join(storage_path, "cost_summary.md")
with open(cost_file, "w", encoding="utf-8") as f:
f.write("# Tournament Cost Summary\n\n")
f.write("## Per-Model Costs\n\n")
for model_id, cost in sorted(model_costs.items()):
if model_id == 'evaluation':
display_model = "Evaluation"
else:
display_model = model_id.split(':')[-1] if ':' in model_id else model_id
f.write(f"- **{display_model}**: ${cost:.6f}\n")
f.write("\n## Grand Total\n\n")
f.write(f"**TOTAL COST**: ${total_cost:.6f}\n")
logger.info(f"Cost summary saved to {cost_file}", emoji_key="save")
except Exception as e:
logger.warning(f"Could not save cost summary: {str(e)}", emoji_key="warning")
else:
logger.error(f"Could not fetch final results: {results_data.get('error', 'Unknown error')}", emoji_key="error")
elif final_status:
logger.warning(f"Tournament ended with status {final_status}. Check logs or status details for more info.", emoji_key="warning")
except Exception as e:
logger.error(f"Error in tournament demo: {str(e)}", emoji_key="error", exc_info=True)
return 1
# Display cost summary at the end
tracker.display_summary(console)
logger.success("Text Tournament Demo Finished", emoji_key="complete")
console.print(Panel(
"To view full essays and detailed comparisons, check the storage directory indicated in the results summary.",
title="[bold]Next Steps[/bold]",
border_style="dim green",
expand=False
))
return 0
async def main():
"""Run the tournament demo."""
tracker = CostTracker() # Instantiate tracker
try:
# Set up gateway
await setup_gateway()
# Run the demo
return await run_tournament_demo(tracker) # Pass tracker
except Exception as e:
logger.critical(f"Demo failed: {str(e)}", emoji_key="critical", exc_info=True)
return 1
finally:
# Clean up
if gateway:
pass # No cleanup needed for Gateway instance
if __name__ == "__main__":
# Run the demo
exit_code = asyncio.run(main())
sys.exit(exit_code)
```
--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/meta_api_tool.py:
--------------------------------------------------------------------------------
```python
"""API Meta-Tool for dynamically exposing FastAPI endpoints via MCP.
This module provides a tool for automatically discovering and integrating
FastAPI-compatible REST APIs into the MCP server by pointing it at the
FastAPI server's OpenAPI specification (e.g., /openapi.json).
Usage Examples:
1. Register an API:
```python
result = await client.tools.register_api(
api_name="petstore",
openapi_url="https://petstore.swagger.io/v2/swagger.json"
)
print(f"Registered {result['tools_count']} tools for the Petstore API")
```
2. List all registered APIs:
```python
apis = await client.tools.list_registered_apis()
for api_name, api_info in apis["apis"].items():
print(f"{api_name}: {api_info['tools_count']} tools")
```
3. Call a dynamically registered tool:
```python
# Get a pet by ID
pet = await client.tools.call_dynamic_tool(
tool_name="petstore_getPetById",
inputs={"petId": 123}
)
print(f"Pet name: {pet['name']}")
# Add a new pet
new_pet = await client.tools.call_dynamic_tool(
tool_name="petstore_addPet",
inputs={
"body": {
"id": 0,
"name": "Fluffy",
"status": "available"
}
}
)
print(f"Added pet with ID: {new_pet['id']}")
```
4. Unregister an API:
```python
result = await client.tools.unregister_api(api_name="petstore")
print(f"Unregistered {result['tools_count']} tools")
```
"""
import asyncio
import json
import re
import time
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse
import httpx
from ultimate_mcp_server.exceptions import ToolError, ToolInputError
from ultimate_mcp_server.services.cache import with_cache
from ultimate_mcp_server.tools.base import (
with_error_handling,
with_state_management,
with_tool_metrics,
)
from ultimate_mcp_server.utils import get_logger
logger = get_logger("ultimate_mcp_server.tools.meta_api")
async def fetch_openapi_spec(
url: str, timeout: float = 30.0, headers: Optional[Dict[str, str]] = None
) -> Dict[str, Any]:
"""Fetches the OpenAPI spec from the given URL.
Args:
url: URL of the OpenAPI spec (typically ending in /openapi.json)
timeout: Timeout for the HTTP request in seconds
headers: Optional headers to include in the request (e.g., for authentication)
Returns:
Parsed OpenAPI spec as a dictionary
Raises:
ToolError: If the fetch or parsing fails
"""
try:
async with httpx.AsyncClient() as client:
response = await client.get(url, timeout=timeout, headers=headers)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
raise ToolError(
f"Failed to fetch OpenAPI spec: HTTP {e.response.status_code}",
details={"url": url, "status_code": e.response.status_code},
) from e
except httpx.RequestError as e:
raise ToolError(
f"Failed to fetch OpenAPI spec: {str(e)}", details={"url": url, "error": str(e)}
) from e
except json.JSONDecodeError as e:
raise ToolError(
f"Failed to parse OpenAPI spec as JSON: {str(e)}", details={"url": url, "error": str(e)}
) from e
def extract_endpoint_info(openapi_spec: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Extracts endpoint information from an OpenAPI spec.
Args:
openapi_spec: Parsed OpenAPI spec as a dictionary
Returns:
List of dictionaries containing endpoint information, each with keys:
- path: The endpoint path
- method: The HTTP method (GET, POST, etc.)
- operation_id: The operationId from the spec (used as tool name)
- parameters: List of parameter objects
- request_body: Request body schema (if applicable)
- responses: Response schemas
- summary: Endpoint summary
- description: Endpoint description
"""
endpoints = []
paths = openapi_spec.get("paths", {})
for path, path_item in paths.items():
for method, operation in path_item.items():
if method.lower() not in ["get", "post", "put", "delete", "patch"]:
continue # Skip non-HTTP methods like "parameters"
# Extract operation ID (fall back to generating one if not provided)
operation_id = operation.get("operationId")
if not operation_id:
# Generate operation ID from path and method
path_parts = [p for p in path.split("/") if p and not p.startswith("{")]
if path_parts:
operation_id = f"{method.lower()}_{path_parts[-1]}"
else:
operation_id = f"{method.lower()}_root"
# Ensure operation_id is a valid Python identifier
operation_id = re.sub(r"[^a-zA-Z0-9_]", "_", operation_id)
if operation_id[0].isdigit():
operation_id = f"op_{operation_id}"
# Extract parameters
parameters = []
# Include parameters from the path item
if "parameters" in path_item:
parameters.extend(path_item["parameters"])
# Include parameters from the operation (overriding path item parameters if same name)
if "parameters" in operation:
# Remove any path item parameters with the same name
path_param_names = {
p["name"] for p in path_item.get("parameters", []) if "name" in p
}
op_params = []
for p in operation["parameters"]:
if p.get("name") in path_param_names:
# This parameter overrides a path item parameter
parameters = [
param for param in parameters if param.get("name") != p.get("name")
]
op_params.append(p)
parameters.extend(op_params)
# Extract request body schema
request_body = None
if "requestBody" in operation:
request_body = operation["requestBody"]
# Extract response schemas
responses = operation.get("responses", {})
endpoints.append(
{
"path": path,
"method": method.lower(),
"operation_id": operation_id,
"parameters": parameters,
"request_body": request_body,
"responses": responses,
"summary": operation.get("summary", ""),
"description": operation.get("description", ""),
"tags": operation.get("tags", []),
}
)
return endpoints
def generate_tool_function_code(
endpoint_info: Dict[str, Any],
base_url: str,
api_name: str,
cache_ttl: Optional[int] = None,
auth_header: Optional[str] = None,
) -> str:
"""Generates Python code for a tool function based on endpoint info.
Args:
endpoint_info: Dictionary containing endpoint information
base_url: Base URL of the API
api_name: Name of the API (used for function documentation)
cache_ttl: Optional TTL for caching tool results in seconds
auth_header: Optional authentication header name to include in requests
Returns:
String containing Python code for the tool function
"""
operation_id = endpoint_info["operation_id"]
path = endpoint_info["path"]
method = endpoint_info["method"]
summary = endpoint_info["summary"]
description = endpoint_info["description"]
tags = ", ".join(endpoint_info.get("tags", []))
# Generate a clean function name (no API prefix, will be added during registration)
function_name = operation_id
# Generate docstring
docstring = (
f'"""{summary}\n\n'
if summary
else f'"""Calls the {method.upper()} {path} endpoint of the {api_name} API.\n\n'
)
if description:
docstring += f"{description}\n\n"
if tags:
docstring += f"Tags: {tags}\n\n"
docstring += "Args:\n"
# Generate function parameters
params = []
path_params = []
query_params = []
header_params = []
body_param = None
for param in endpoint_info.get("parameters", []):
param_name = param["name"]
# Clean the parameter name to be a valid Python identifier
clean_param_name = re.sub(r"[^a-zA-Z0-9_]", "_", param_name)
if clean_param_name[0].isdigit():
clean_param_name = f"p_{clean_param_name}"
param_type = param.get("schema", {}).get("type", "string")
required = param.get("required", False)
param_in = param.get("in", "query")
param_description = param.get("description", "")
python_type = "str"
if param_type == "integer":
python_type = "int"
elif param_type == "number":
python_type = "float"
elif param_type == "boolean":
python_type = "bool"
elif param_type == "array":
python_type = "List[Any]"
elif param_type == "object":
python_type = "Dict[str, Any]"
if required:
params.append(f"{clean_param_name}: {python_type}")
docstring += f" {clean_param_name}: {param_description} (in: {param_in})\n"
else:
params.append(f"{clean_param_name}: Optional[{python_type}] = None")
docstring += (
f" {clean_param_name}: (Optional) {param_description} (in: {param_in})\n"
)
# Store parameter location for request building
if param_in == "path":
path_params.append((param_name, clean_param_name))
elif param_in == "query":
query_params.append((param_name, clean_param_name))
elif (
param_in == "header" and param_name.lower() != auth_header.lower()
if auth_header
else True
):
header_params.append((param_name, clean_param_name))
# Handle request body
if endpoint_info.get("request_body"):
content = endpoint_info["request_body"].get("content", {})
if "application/json" in content:
body_param = "body"
schema_desc = "Request body"
# Try to get schema description from the content schema
schema = content.get("application/json", {}).get("schema", {})
if "description" in schema:
schema_desc = schema["description"]
params.append("body: Dict[str, Any]")
docstring += f" body: {schema_desc}\n"
# Add timeout and auth_token params if needed
params.append("timeout: float = 30.0")
docstring += " timeout: Timeout for the HTTP request in seconds\n"
if auth_header:
params.append("auth_token: Optional[str] = None")
docstring += f" auth_token: Optional authentication token to include in the '{auth_header}' header\n"
docstring += '\n Returns:\n API response data as a dictionary\n """'
# Generate function body
function_body = []
function_body.append(" async with httpx.AsyncClient() as client:")
# Format URL with path params
if path_params:
# For path params, replace {param} with {clean_param_name}
url_format = path
for param_name, clean_name in path_params:
url_format = url_format.replace(f"{{{param_name}}}", f"{{{clean_name}}}")
function_body.append(f' url = f"{base_url}{url_format}"')
else:
function_body.append(f' url = "{base_url}{path}"')
# Prepare query params
if query_params:
function_body.append(" params = {}")
for param_name, clean_name in query_params:
function_body.append(f" if {clean_name} is not None:")
function_body.append(f' params["{param_name}"] = {clean_name}')
else:
function_body.append(" params = None")
# Prepare headers
function_body.append(" headers = {}")
if auth_header:
function_body.append(" if auth_token is not None:")
function_body.append(f' headers["{auth_header}"] = auth_token')
if header_params:
for param_name, clean_name in header_params:
function_body.append(f" if {clean_name} is not None:")
function_body.append(f' headers["{param_name}"] = {clean_name}')
# Prepare request
request_args = ["url"]
if query_params:
request_args.append("params=params")
if header_params or auth_header:
request_args.append("headers=headers")
if body_param:
request_args.append(f"json={body_param}")
request_args.append("timeout=timeout")
function_body.append(" try:")
function_body.append(" response = await client.{method}({', '.join(request_args)})")
function_body.append(" response.raise_for_status()")
function_body.append(
" if response.headers.get('content-type', '').startswith('application/json'):"
)
function_body.append(" return response.json()")
function_body.append(" else:")
function_body.append(" return {{'text': response.text}}")
function_body.append(" except httpx.HTTPStatusError as e:")
function_body.append(" error_detail = e.response.text")
function_body.append(" try:")
function_body.append(" error_json = e.response.json()")
function_body.append(" if isinstance(error_json, dict):")
function_body.append(" error_detail = error_json")
function_body.append(" except Exception:")
function_body.append(" pass # Couldn't parse JSON error")
function_body.append(" raise ToolError(")
function_body.append(' f"API request failed: HTTP {{e.response.status_code}}",')
function_body.append(
' details={{"status_code": e.response.status_code, "response": error_detail}}'
)
function_body.append(" )")
function_body.append(" except httpx.RequestError as e:")
function_body.append(" raise ToolError(")
function_body.append(' f"API request failed: {{str(e)}}",')
function_body.append(' details={{"error": str(e)}}')
function_body.append(" )")
# Generate the full function
param_str = ", ".join(params)
if param_str:
param_str = f", {param_str}"
# Add decorators based on configuration
decorators = ["@with_tool_metrics", "@with_error_handling"]
if cache_ttl is not None:
decorators.insert(0, f"@with_cache(ttl={cache_ttl})")
function_code = [
*decorators,
f"async def {function_name}(self{param_str}):",
f"{docstring}",
*function_body,
]
return "\n".join(function_code)
# After the generate_tool_function_code function and before register_api_meta_tools
@with_tool_metrics
@with_error_handling
@with_state_management(namespace="meta_api")
async def register_api(
api_name: str,
openapi_url: str,
base_url: Optional[str] = None,
cache_ttl: Optional[int] = None,
auth_header: Optional[str] = None,
auth_token: Optional[str] = None,
tool_name_prefix: Optional[str] = None,
timeout: float = 30.0,
ctx: Optional[Dict[str, Any]] = None,
get_state=None,
set_state=None,
delete_state=None
) -> Dict[str, Any]:
"""Registers an API with the MCP server by fetching its OpenAPI spec.
Dynamically generates MCP tools for each endpoint in the API and registers
them with the MCP server. The tools are prefixed with the API name by default,
resulting in tool names like "api_name_operation_id".
Args:
api_name: A unique name for the API (used as a prefix for tool names)
openapi_url: URL of the OpenAPI spec (typically ending in /openapi.json)
base_url: Base URL of the API (if different from the OpenAPI URL)
cache_ttl: Optional TTL for caching tool results in seconds
auth_header: Optional name of the header to use for authentication
auth_token: Optional token to use when fetching the OpenAPI spec
tool_name_prefix: Optional prefix for tool names (default: api_name)
timeout: Timeout for the HTTP request in seconds
ctx: MCP context
get_state: Function to get state from store (injected by decorator)
set_state: Function to set state in store (injected by decorator)
delete_state: Function to delete state from store (injected by decorator)
Returns:
A dictionary containing the registration results:
{
"success": true,
"api_name": "example_api",
"base_url": "https://api.example.com",
"tools_registered": ["example_api_get_users", "example_api_create_user", ...],
"tools_count": 5,
"processing_time": 1.23
}
"""
# Validate inputs
if not api_name:
raise ToolInputError("api_name cannot be empty")
# Check if API name has invalid characters
if not re.match(r"^[a-zA-Z0-9_]+$", api_name):
raise ToolInputError(
"api_name must contain only alphanumeric characters and underscores"
)
if not openapi_url:
raise ToolInputError("openapi_url cannot be empty")
# Get registered APIs from state store
registered_apis = await get_state("registered_apis", {})
generated_tools = await get_state("generated_tools", {})
# Check if API is already registered
if api_name in registered_apis:
raise ToolInputError(
f"API {api_name} is already registered. Use a different name or unregister it first."
)
# Set tool name prefix
tool_name_prefix = tool_name_prefix or api_name
# Determine base URL if not provided
if not base_url:
# Extract base URL from OpenAPI URL
try:
parsed_url = urlparse(openapi_url)
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
logger.info(f"Using base_url: {base_url} (derived from openapi_url)")
except Exception as e:
raise ToolInputError(f"Could not determine base_url from openapi_url: {str(e)}") from e
# Prepare headers for fetching OpenAPI spec
headers = None
if auth_token and auth_header:
headers = {auth_header: auth_token}
# Fetch OpenAPI spec
logger.info(f"Fetching OpenAPI spec from {openapi_url}")
start_time = time.time()
openapi_spec = await fetch_openapi_spec(openapi_url, timeout, headers)
# Extract endpoint information
endpoints = extract_endpoint_info(openapi_spec)
logger.info(f"Extracted {len(endpoints)} endpoints from OpenAPI spec")
# Get MCP server from context
mcp = ctx.get('mcp')
if not mcp:
raise ToolError("MCP server context not available")
# Generate and register tools for each endpoint
registered_tools = []
generated_code = {}
for endpoint in endpoints:
operation_id = endpoint["operation_id"]
tool_name = f"{tool_name_prefix}_{operation_id}"
# Skip if this tool is already registered
if tool_name in generated_tools:
logger.warning(f"Tool {tool_name} already registered, skipping")
continue
# Generate tool function code
tool_code = generate_tool_function_code(
endpoint, base_url, api_name, cache_ttl, auth_header
)
# Store the generated code for debugging
generated_code[tool_name] = tool_code
# Create and register the tool function
try:
# Create a namespace for the exec
namespace = {}
# Add required imports to the namespace
namespace.update(
{
"httpx": httpx,
"ToolError": ToolError,
"Dict": Dict,
"Any": Any,
"Optional": Optional,
"List": List,
"with_tool_metrics": with_tool_metrics,
"with_error_handling": with_error_handling,
"with_cache": with_cache,
}
)
# Execute the generated code
exec(tool_code, namespace)
# Get the generated function from the namespace
generated_func = namespace[operation_id]
# Register with MCP server
mcp.tool(name=tool_name)(generated_func)
# Store the generated tool in state
generated_tools[tool_name] = tool_code
registered_tools.append(tool_name)
logger.info(
f"Registered tool {tool_name} for endpoint {endpoint['method'].upper()} {endpoint['path']}"
)
except Exception as e:
logger.error(f"Failed to register tool {tool_name}: {str(e)}", exc_info=True)
if "tool_code" in locals():
logger.error(f"Generated code that failed:\n{tool_code}")
# Store API information in state store
registered_apis[api_name] = {
"base_url": base_url,
"openapi_url": openapi_url,
"spec": openapi_spec,
"tools": registered_tools,
"tool_name_prefix": tool_name_prefix,
"generated_code": generated_code,
"auth_header": auth_header,
}
# Update state store
await set_state("registered_apis", registered_apis)
await set_state("generated_tools", generated_tools)
processing_time = time.time() - start_time
logger.success(
f"API {api_name} registered with {len(registered_tools)} tools in {processing_time:.2f}s"
)
return {
"success": True,
"api_name": api_name,
"base_url": base_url,
"tools_registered": registered_tools,
"tools_count": len(registered_tools),
"processing_time": processing_time,
}
@with_tool_metrics
@with_error_handling
@with_state_management(namespace="meta_api")
async def list_registered_apis(
ctx: Optional[Dict[str, Any]] = None,
get_state=None,
set_state=None,
delete_state=None
) -> Dict[str, Any]:
"""Lists all registered APIs and their endpoints.
Args:
ctx: MCP context
get_state: Function to get state from store (injected by decorator)
set_state: Function to set state in store (injected by decorator)
delete_state: Function to delete state from store (injected by decorator)
Returns:
A dictionary containing the registered APIs:
{
"success": true,
"apis": {
"example_api": {
"base_url": "https://api.example.com",
"openapi_url": "https://api.example.com/openapi.json",
"tools_count": 5,
"tools": ["example_api_get_users", "example_api_create_user", ...]
},
...
},
"total_apis": 2,
"total_tools": 12
}
"""
# Get state data
registered_apis = await get_state("registered_apis", {})
generated_tools = await get_state("generated_tools", {})
result = {
"success": True,
"apis": {},
"total_apis": len(registered_apis),
"total_tools": len(generated_tools),
}
for api_name, api_info in registered_apis.items():
result["apis"][api_name] = {
"base_url": api_info["base_url"],
"openapi_url": api_info["openapi_url"],
"tools_count": len(api_info["tools"]),
"tools": api_info["tools"],
"tool_name_prefix": api_info["tool_name_prefix"],
}
return result
@with_tool_metrics
@with_error_handling
@with_state_management(namespace="meta_api")
async def get_api_details(
api_name: str,
ctx: Optional[Dict[str, Any]] = None,
get_state=None,
set_state=None,
delete_state=None
) -> Dict[str, Any]:
"""Gets detailed information about a registered API.
Args:
api_name: The name of the API to get details for
ctx: MCP context
get_state: Function to get state from store (injected by decorator)
set_state: Function to set state in store (injected by decorator)
delete_state: Function to delete state from store (injected by decorator)
Returns:
A dictionary containing the API details:
{
"success": true,
"api_name": "example_api",
"base_url": "https://api.example.com",
"openapi_url": "https://api.example.com/openapi.json",
"tools": [
{
"name": "example_api_get_users",
"method": "get",
"path": "/users",
"summary": "Get all users",
"description": "Returns a list of all users in the system",
"parameters": [...]
},
...
],
"endpoints_count": 5
}
"""
# Get registered APIs from state
registered_apis = await get_state("registered_apis", {})
if api_name not in registered_apis:
raise ToolInputError(f"API {api_name} not found")
api_info = registered_apis[api_name]
# Extract endpoint details from the OpenAPI spec
endpoints = []
spec = api_info["spec"]
for endpoint_info in extract_endpoint_info(spec):
tool_name = f"{api_info['tool_name_prefix']}_{endpoint_info['operation_id']}"
endpoints.append(
{
"name": tool_name,
"method": endpoint_info["method"],
"path": endpoint_info["path"],
"summary": endpoint_info["summary"],
"description": endpoint_info["description"],
"parameters": endpoint_info["parameters"],
"tags": endpoint_info.get("tags", []),
}
)
return {
"success": True,
"api_name": api_name,
"base_url": api_info["base_url"],
"openapi_url": api_info["openapi_url"],
"tools": endpoints,
"endpoints_count": len(endpoints),
}
@with_tool_metrics
@with_error_handling
@with_state_management(namespace="meta_api")
async def unregister_api(
api_name: str,
ctx: Optional[Dict[str, Any]] = None,
get_state=None,
set_state=None,
delete_state=None
) -> Dict[str, Any]:
"""Unregisters an API and all its tools from the MCP server.
Args:
api_name: The name of the API to unregister
ctx: MCP context
get_state: Function to get state from store (injected by decorator)
set_state: Function to set state in store (injected by decorator)
delete_state: Function to delete state from store (injected by decorator)
Returns:
A dictionary indicating the result:
{
"success": true,
"api_name": "example_api",
"tools_unregistered": ["example_api_get_users", "example_api_create_user", ...],
"tools_count": 5
}
"""
# Get state data
registered_apis = await get_state("registered_apis", {})
generated_tools = await get_state("generated_tools", {})
if api_name not in registered_apis:
raise ToolInputError(f"API {api_name} not found")
api_info = registered_apis[api_name]
tools = api_info["tools"]
# Get MCP server from context
mcp = ctx.get('mcp')
if not mcp:
raise ToolError("MCP server context not available")
# Unregister tools from MCP server
for tool_name in tools:
try:
# Check if the MCP server has a method for unregistering tools
if hasattr(mcp, "unregister_tool"):
mcp.unregister_tool(tool_name)
# If not, try to remove from the tools dictionary
elif hasattr(mcp, "tools"):
if tool_name in mcp.tools:
del mcp.tools[tool_name]
# Remove from our generated tools dictionary
if tool_name in generated_tools:
del generated_tools[tool_name]
logger.info(f"Unregistered tool {tool_name}")
except Exception as e:
logger.error(f"Failed to unregister tool {tool_name}: {str(e)}", exc_info=True)
# Remove API from registered APIs
del registered_apis[api_name]
# Update state
await set_state("registered_apis", registered_apis)
await set_state("generated_tools", generated_tools)
logger.success(f"API {api_name} unregistered with {len(tools)} tools")
return {
"success": True,
"api_name": api_name,
"tools_unregistered": tools,
"tools_count": len(tools),
}
@with_tool_metrics
@with_error_handling
@with_state_management(namespace="meta_api")
async def call_dynamic_tool(
tool_name: str,
inputs: Optional[Dict[str, Any]] = None,
ctx: Optional[Dict[str, Any]] = None,
get_state=None,
set_state=None,
delete_state=None
) -> Dict[str, Any]:
"""Calls a dynamically registered tool by name.
This is a convenience function for calling tools registered via register_api,
allowing direct invocation of API endpoints.
Args:
tool_name: Name of the tool to call
inputs: Inputs to pass to the tool (parameters for the API endpoint)
ctx: MCP context
get_state: Function to get state from store (injected by decorator)
set_state: Function to set state in store (injected by decorator)
delete_state: Function to delete state from store (injected by decorator)
Returns:
The result of the tool call
"""
# Get MCP server from context
mcp = ctx.get('mcp')
if not mcp:
raise ToolError("MCP server context not available")
# Get registered APIs and generated tools from state
registered_apis = await get_state("registered_apis", {})
generated_tools = await get_state("generated_tools", {})
if not tool_name:
raise ToolInputError("tool_name cannot be empty")
# Check if tool exists
if tool_name not in generated_tools:
valid_tools = list(generated_tools.keys())
raise ToolInputError(
f"Tool {tool_name} not found. Valid tools: {', '.join(valid_tools[:10])}..."
if len(valid_tools) > 10
else f"Tool {tool_name} not found. Valid tools: {', '.join(valid_tools)}"
)
# Initialize inputs
if inputs is None:
inputs = {}
# Find which API this tool belongs to
api_name = None
for name, info in registered_apis.items():
if tool_name in info["tools"]:
api_name = name
break
if not api_name:
logger.warning(f"Could not determine which API {tool_name} belongs to")
# Add auth_token to inputs if specified and the API has an auth_header
api_info = registered_apis.get(api_name, {})
if api_info.get("auth_header") and "auth_token" in ctx:
inputs["auth_token"] = ctx["auth_token"]
# Call the tool directly through MCP
logger.info(f"Calling dynamic tool {tool_name} with inputs: {inputs}")
start_time = time.time()
# MCP execute may be different from mcp.call_tool, handle appropriately
if hasattr(mcp, "execute"):
result = await mcp.execute(tool_name, inputs)
else:
result = await mcp.call_tool(tool_name, inputs)
processing_time = time.time() - start_time
# Add metadata to result
if isinstance(result, dict):
result["processing_time"] = processing_time
result["success"] = True
else:
result = {"data": result, "processing_time": processing_time, "success": True}
logger.info(f"Called dynamic tool {tool_name} in {processing_time:.4f}s")
return result
@with_tool_metrics
@with_error_handling
@with_state_management(namespace="meta_api")
async def refresh_api(
api_name: str,
update_base_url: Optional[str] = None,
timeout: float = 30.0,
ctx: Optional[Dict[str, Any]] = None,
get_state=None,
set_state=None,
delete_state=None
) -> Dict[str, Any]:
"""Refreshes an API by re-fetching its OpenAPI spec and updating tools.
This is useful when the API has been updated with new endpoints or
modifications to existing endpoints.
Args:
api_name: The name of the API to refresh
update_base_url: Optional new base URL for the API
timeout: Timeout for the HTTP request in seconds
ctx: MCP context
get_state: Function to get state from store (injected by decorator)
set_state: Function to set state in store (injected by decorator)
delete_state: Function to delete state from store (injected by decorator)
Returns:
A dictionary indicating the result:
{
"success": true,
"api_name": "example_api",
"tools_added": ["example_api_new_endpoint", ...],
"tools_updated": ["example_api_modified_endpoint", ...],
"tools_removed": ["example_api_deleted_endpoint", ...],
"tools_count": 8
}
"""
# Get registered APIs from state
registered_apis = await get_state("registered_apis", {})
if api_name not in registered_apis:
raise ToolInputError(f"API {api_name} not found")
api_info = registered_apis[api_name]
old_tools = set(api_info["tools"])
# Determine if we need to update the base URL
base_url = update_base_url or api_info["base_url"]
# First, unregister the API
await unregister_api(api_name, ctx=ctx, get_state=get_state, set_state=set_state, delete_state=delete_state)
# Re-register with the same parameters but potentially updated base URL
result = await register_api(
api_name=api_name,
openapi_url=api_info["openapi_url"],
base_url=base_url,
auth_header=api_info.get("auth_header"),
tool_name_prefix=api_info["tool_name_prefix"],
timeout=timeout,
ctx=ctx,
get_state=get_state,
set_state=set_state,
delete_state=delete_state
)
# Determine which tools were added, updated, or removed
new_tools = set(result["tools_registered"])
tools_added = list(new_tools - old_tools)
tools_removed = list(old_tools - new_tools)
tools_updated = list(new_tools.intersection(old_tools))
logger.success(
f"API {api_name} refreshed: "
f"{len(tools_added)} added, {len(tools_removed)} removed, {len(tools_updated)} updated"
)
return {
"success": True,
"api_name": api_name,
"tools_added": tools_added,
"tools_updated": tools_updated,
"tools_removed": tools_removed,
"tools_count": len(new_tools),
}
@with_tool_metrics
@with_error_handling
@with_state_management(namespace="meta_api")
async def get_tool_details(
tool_name: str,
ctx: Optional[Dict[str, Any]] = None,
get_state=None,
set_state=None,
delete_state=None
) -> Dict[str, Any]:
"""Gets detailed information about a dynamically registered tool.
Args:
tool_name: Name of the tool to get details for
ctx: MCP context
get_state: Function to get state from store (injected by decorator)
set_state: Function to set state in store (injected by decorator)
delete_state: Function to delete state from store (injected by decorator)
Returns:
A dictionary containing the tool details:
{
"success": true,
"tool_name": "example_api_get_users",
"api_name": "example_api",
"method": "get",
"path": "/users",
"summary": "Get all users",
"description": "Returns a list of all users in the system",
"parameters": [...],
"source_code": "..."
}
"""
# Get registered APIs and generated tools from state
registered_apis = await get_state("registered_apis", {})
generated_tools = await get_state("generated_tools", {})
if tool_name not in generated_tools:
raise ToolInputError(f"Tool {tool_name} not found")
# Find which API this tool belongs to
api_name = None
for name, info in registered_apis.items():
if tool_name in info["tools"]:
api_name = name
break
if not api_name:
raise ToolError(f"Could not determine which API {tool_name} belongs to")
api_info = registered_apis[api_name]
# Find endpoint information in the API's endpoint list
endpoint_info = None
for endpoint in extract_endpoint_info(api_info["spec"]):
if f"{api_info['tool_name_prefix']}_{endpoint['operation_id']}" == tool_name:
endpoint_info = endpoint
break
if not endpoint_info:
raise ToolError(f"Could not find endpoint information for tool {tool_name}")
# Get the source code
source_code = api_info.get("generated_code", {}).get(tool_name, "Source code not available")
return {
"success": True,
"tool_name": tool_name,
"api_name": api_name,
"method": endpoint_info["method"],
"path": endpoint_info["path"],
"summary": endpoint_info["summary"],
"description": endpoint_info["description"],
"parameters": endpoint_info["parameters"],
"tags": endpoint_info.get("tags", []),
"source_code": source_code,
}
@with_tool_metrics
@with_error_handling
@with_state_management(namespace="meta_api")
async def list_available_tools(
include_source_code: bool = False,
ctx: Optional[Dict[str, Any]] = None,
get_state=None,
set_state=None,
delete_state=None
) -> Dict[str, Any]:
"""Lists all available tools registered via the API Meta-Tool.
Args:
include_source_code: Whether to include source code in the response
ctx: MCP context
get_state: Function to get state from store (injected by decorator)
set_state: Function to set state in store (injected by decorator)
delete_state: Function to delete state from store (injected by decorator)
Returns:
A dictionary containing the available tools:
{
"success": true,
"tools": [
{
"name": "example_api_get_users",
"api_name": "example_api",
"method": "get",
"path": "/users",
"summary": "Get all users",
"source_code": "..." # Only if include_source_code=True
},
...
],
"tools_count": 12
}
"""
# Get registered APIs from state
registered_apis = await get_state("registered_apis", {})
generated_tools = await get_state("generated_tools", {})
tools = []
for api_name, api_info in registered_apis.items():
spec = api_info["spec"]
endpoints = extract_endpoint_info(spec)
for endpoint in endpoints:
tool_name = f"{api_info['tool_name_prefix']}_{endpoint['operation_id']}"
if tool_name in generated_tools:
tool_info = {
"name": tool_name,
"api_name": api_name,
"method": endpoint["method"],
"path": endpoint["path"],
"summary": endpoint["summary"],
}
if include_source_code:
tool_info["source_code"] = api_info.get("generated_code", {}).get(
tool_name, "Source code not available"
)
tools.append(tool_info)
return {"success": True, "tools": tools, "tools_count": len(tools)}
# Now we have all our stateless functions defined:
# register_api, list_registered_apis, get_api_details, unregister_api
# call_dynamic_tool, refresh_api, get_tool_details, list_available_tools
def register_api_meta_tools(mcp_server):
"""Registers API Meta-Tool with the MCP server.
Args:
mcp_server: MCP server instance
"""
# Register tools with MCP server
mcp_server.tool(name="register_api")(register_api)
mcp_server.tool(name="list_registered_apis")(list_registered_apis)
mcp_server.tool(name="get_api_details")(get_api_details)
mcp_server.tool(name="unregister_api")(unregister_api)
mcp_server.tool(name="call_dynamic_tool")(call_dynamic_tool)
mcp_server.tool(name="refresh_api")(refresh_api)
mcp_server.tool(name="get_tool_details")(get_tool_details)
mcp_server.tool(name="list_available_tools")(list_available_tools)
logger.info("Registered API Meta-Tool functions")
return None # No need to return an instance anymore
# Example usage if this module is run directly
if __name__ == "__main__":
import argparse
import asyncio
from ultimate_mcp_server import create_app
async def main():
# Parse command line arguments
parser = argparse.ArgumentParser(description="API Meta-Tool for Ultimate MCP Server")
parser.add_argument("--register", help="Register an API with the given name")
parser.add_argument("--url", help="OpenAPI spec URL")
parser.add_argument("--list", action="store_true", help="List registered APIs")
parser.add_argument("--details", help="Get details for the given API")
parser.add_argument("--unregister", help="Unregister the given API")
parser.add_argument("--refresh", help="Refresh the given API")
parser.add_argument("--base-url", help="Base URL for API requests")
args = parser.parse_args()
# Create MCP server
create_app()
# In FastMCP 2.0+, access the MCP server directly from the Gateway instance
# The create_app() should return the gateway instance or we need to get it differently
from ultimate_mcp_server.core import _gateway_instance
mcp_server = _gateway_instance.mcp if _gateway_instance else None
if not mcp_server:
raise RuntimeError("Gateway instance not initialized or MCP server not available")
# Register API Meta-Tool
register_api_meta_tools(mcp_server)
# Create context for stateless functions
ctx = {"mcp": mcp_server}
# Process commands
if args.register and args.url:
result = await register_api(
api_name=args.register,
openapi_url=args.url,
base_url=args.base_url,
ctx=ctx
)
print(f"Registered API {args.register} with {result['tools_count']} tools")
print(f"Tools: {', '.join(result['tools_registered'])}")
elif args.list:
result = await list_registered_apis(ctx=ctx)
print(f"Registered APIs ({result['total_apis']}):")
for api_name, api_info in result["apis"].items():
print(
f"- {api_name}: {api_info['tools_count']} tools, Base URL: {api_info['base_url']}"
)
elif args.details:
result = await get_api_details(args.details, ctx=ctx)
print(f"API {args.details} ({result['endpoints_count']} endpoints):")
print(f"Base URL: {result['base_url']}")
print(f"OpenAPI URL: {result['openapi_url']}")
print("Endpoints:")
for endpoint in result["tools"]:
print(f"- {endpoint['method'].upper()} {endpoint['path']} ({endpoint['name']})")
if endpoint["summary"]:
print(f" Summary: {endpoint['summary']}")
elif args.unregister:
result = await unregister_api(args.unregister, ctx=ctx)
print(f"Unregistered API {args.unregister} with {result['tools_count']} tools")
elif args.refresh:
result = await refresh_api(
api_name=args.refresh,
update_base_url=args.base_url,
ctx=ctx
)
print(
f"Refreshed API {args.refresh}: {len(result['tools_added'])} added, {len(result['tools_removed'])} removed, {len(result['tools_updated'])} updated"
)
else:
print("No action specified. Use --help for usage information.")
# Run the main function
asyncio.run(main())
```
--------------------------------------------------------------------------------
/examples/entity_relation_graph_demo.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Entity relationship graph extraction and visualization demo using Ultimate MCP Server (New Version)."""
import asyncio
import json
import os
import sys
import time
from enum import Enum
from pathlib import Path
from typing import Any, Dict, Optional
import networkx as nx
from rich import box
from rich.markup import escape
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.rule import Rule
from rich.syntax import Syntax
from rich.table import Table
from rich.tree import Tree
try:
project_root = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(project_root))
from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.tools.entity_relation_graph import (
COMMON_SCHEMAS,
HAS_NETWORKX,
HAS_VISUALIZATION_LIBS,
GraphStrategy,
OutputFormat,
VisualizationFormat,
extract_entity_graph,
)
from ultimate_mcp_server.utils import get_logger
from ultimate_mcp_server.utils.logging.console import console
except ImportError as e:
print(f"Error importing Ultimate MCP Server modules: {e}")
print("Please ensure the script is run from the correct directory or the project path is set correctly.")
sys.exit(1)
# Initialize logger
logger = get_logger("example.entity_graph") # Updated logger name
# Setup Directories
SCRIPT_DIR = Path(__file__).resolve().parent
SAMPLE_DIR = SCRIPT_DIR / "sample"
OUTPUT_DIR = SCRIPT_DIR / "output"
os.makedirs(OUTPUT_DIR, exist_ok=True)
class TextDomain(Enum):
"""Domain types for demonstration examples."""
BUSINESS = "business"
ACADEMIC = "academic"
LEGAL = "legal"
MEDICAL = "medical"
GENERAL = "general" # Added for cases without a specific domain schema
# Console instances
main_console = console
# Keep detail_console if needed, but wasn't used in original provided script
# detail_console = Console(width=100, highlight=True)
def display_header(title: str) -> None:
"""Display a section header."""
main_console.print()
main_console.print(Rule(f"[bold blue]{title}[/bold blue]"))
main_console.print()
def display_dataset_info(dataset_path: Path, title: str) -> None:
"""Display information about a dataset."""
if not dataset_path.exists():
main_console.print(f"[bold red]Error:[/bold red] Dataset file not found: {dataset_path}")
return
try:
with open(dataset_path, "r", encoding="utf-8") as f:
content = f.read()
# Count entities/characters for display
char_count = len(content)
word_count = len(content.split())
# Simple sentence count (approximate)
sentence_count = content.count('.') + content.count('?') + content.count('!')
if sentence_count == 0 and char_count > 0:
sentence_count = 1 # At least one sentence if there's text
# Preview of the content (first 300 chars)
preview = escape(content[:300] + "..." if len(content) > 300 else content)
main_console.print(Panel(
f"[bold cyan]Dataset:[/bold cyan] {dataset_path.name}\n"
f"[bold cyan]Size:[/bold cyan] {char_count:,} characters | {word_count:,} words | ~{sentence_count:,} sentences\n\n"
f"[bold cyan]Preview:[/bold cyan]\n{preview}",
title=title,
border_style="cyan",
expand=False
))
except Exception as e:
main_console.print(f"[bold red]Error reading dataset file {dataset_path.name}:[/bold red] {e}")
def display_extraction_params(params: Dict[str, Any]) -> None:
"""Display extraction parameters passed to the tool."""
param_table = Table(title="Extraction Parameters", box=box.ROUNDED, show_header=True, header_style="bold magenta")
param_table.add_column("Parameter", style="cyan", no_wrap=True)
param_table.add_column("Value", style="green")
# Filter parameters to display relevant ones
display_keys = [
"provider", "model", "strategy", "domain", "output_format", "visualization_format",
"include_evidence", "include_attributes", "include_positions", "include_temporal_info",
"normalize_entities", "max_entities", "max_relations", "min_confidence", "enable_reasoning",
"language" # Added language if used
]
for key in display_keys:
if key in params:
value = params[key]
# Format enums and lists nicely
if isinstance(value, Enum):
value_str = value.value
elif isinstance(value, list):
value_str = escape(", ".join(str(v) for v in value)) if value else "[dim italic]Empty List[/dim italic]"
elif isinstance(value, bool):
value_str = "[green]Yes[/green]" if value else "[red]No[/red]"
elif value is None:
value_str = "[dim italic]None[/dim italic]"
else:
value_str = escape(str(value))
param_table.add_row(key, value_str)
main_console.print(param_table)
def display_entity_stats(result: Dict[str, Any]) -> None:
"""Display statistics about extracted entities and relationships."""
metadata = result.get("metadata", {})
entities = result.get("entities", []) # Get entities directly for type counting if metadata missing
relationships = result.get("relationships", [])
entity_count = metadata.get("entity_count", len(entities))
relationship_count = metadata.get("relationship_count", len(relationships))
if entity_count == 0:
main_console.print("[yellow]No entities found in extraction result.[/yellow]")
return
# Use metadata if available, otherwise count manually
entity_types_meta = metadata.get("entity_types")
rel_types_meta = metadata.get("relation_types")
entity_type_counts = {}
if entity_types_meta:
for etype in entity_types_meta:
# Count occurrences in the actual entity list for accuracy
entity_type_counts[etype] = sum(1 for e in entities if e.get("type") == etype)
else: # Fallback if metadata key is missing
for entity in entities:
ent_type = entity.get("type", "Unknown")
entity_type_counts[ent_type] = entity_type_counts.get(ent_type, 0) + 1
rel_type_counts = {}
if rel_types_meta:
for rtype in rel_types_meta:
rel_type_counts[rtype] = sum(1 for r in relationships if r.get("type") == rtype)
else: # Fallback
for rel in relationships:
rel_type = rel.get("type", "Unknown")
rel_type_counts[rel_type] = rel_type_counts.get(rel_type, 0) + 1
# Create entity stats table
stats_table = Table(title="Extraction Statistics", box=box.ROUNDED, show_header=True, header_style="bold blue")
stats_table.add_column("Metric", style="cyan")
stats_table.add_column("Count", style="green", justify="right")
stats_table.add_row("Total Entities", str(entity_count))
stats_table.add_row("Total Relationships", str(relationship_count))
# Add entity type counts
stats_table.add_section()
for ent_type, count in sorted(entity_type_counts.items(), key=lambda x: x[1], reverse=True):
stats_table.add_row(f"Entity Type: [italic]{escape(ent_type)}[/italic]", str(count))
# Add relationship type counts (top 5)
if rel_type_counts:
stats_table.add_section()
for rel_type, count in sorted(rel_type_counts.items(), key=lambda x: x[1], reverse=True)[:5]:
stats_table.add_row(f"Relationship Type: [italic]{escape(rel_type)}[/italic]", str(count))
if len(rel_type_counts) > 5:
stats_table.add_row("[dim]... (other relationship types)[/dim]", "")
main_console.print(stats_table)
def display_graph_metrics(result: Dict[str, Any]) -> None:
"""Display graph metrics if available in metadata."""
# Metrics are now nested under metadata
metrics = result.get("metadata", {}).get("metrics", {})
if not metrics:
# Check the top level as a fallback for older structure compatibility if needed
metrics = result.get("metrics", {})
if not metrics:
main_console.print("[dim]No graph metrics calculated (requires networkx).[/dim]")
return
metrics_table = Table(title="Graph Metrics", box=box.ROUNDED, show_header=True, header_style="bold blue")
metrics_table.add_column("Metric", style="cyan")
metrics_table.add_column("Value", style="green", justify="right")
# Add metrics to table
for key, value in metrics.items():
if isinstance(value, (int, float)):
# Improved formatting
if isinstance(value, float):
if 0.0001 < abs(value) < 10000:
formatted_value = f"{value:.4f}"
else:
formatted_value = f"{value:.3e}" # Scientific notation for very small/large
else:
formatted_value = f"{value:,}" # Add commas for integers
metrics_table.add_row(key.replace("_", " ").title(), formatted_value)
elif value is not None:
metrics_table.add_row(key.replace("_", " ").title(), escape(str(value)))
main_console.print(metrics_table)
def display_entities_table(result: Dict[str, Any], limit: int = 10) -> None:
"""Display extracted entities in a table, sorted appropriately."""
entities = result.get("entities", [])
if not entities:
main_console.print("[yellow]No entities found to display.[/yellow]")
return
# Sorting based on available metrics from _add_graph_metrics
# The new tool adds 'degree' and 'centrality'
sort_key = "name" # Default sort
if entities and isinstance(entities[0], dict):
if "centrality" in entities[0] and any(e.get("centrality", 0) > 0 for e in entities):
entities.sort(key=lambda x: x.get("centrality", 0.0), reverse=True)
sort_key = "centrality"
elif "degree" in entities[0] and any(e.get("degree", 0) > 0 for e in entities):
entities.sort(key=lambda x: x.get("degree", 0.0), reverse=True)
sort_key = "degree"
elif "mentions" in entities[0] and any(e.get("mentions") for e in entities):
entities.sort(key=lambda x: len(x.get("mentions", [])), reverse=True)
sort_key = "mentions"
else:
entities.sort(key=lambda x: x.get("name", "").lower()) # Fallback to name
# Limit to top entities
display_entities = entities[:limit]
title = f"Top {limit} Entities"
if sort_key != "name":
title += f" (Sorted by {sort_key.capitalize()})"
entity_table = Table(title=title, box=box.ROUNDED, show_header=True, header_style="bold blue")
entity_table.add_column("ID", style="dim", width=8)
entity_table.add_column("Name", style="cyan", max_width=40)
entity_table.add_column("Type", style="green", max_width=20)
# Add columns for additional information if available
has_degree = any(e.get("degree", 0) > 0 for e in display_entities)
has_centrality = any(e.get("centrality", 0) > 0 for e in display_entities)
has_mentions = any(e.get("mentions") for e in display_entities)
has_attributes = any(e.get("attributes") for e in display_entities)
if has_degree:
entity_table.add_column("Degree", style="magenta", justify="right", width=8)
if has_centrality:
entity_table.add_column("Centrality", style="magenta", justify="right", width=10)
if has_mentions:
entity_table.add_column("Mentions", style="yellow", justify="right", width=8)
if has_attributes:
entity_table.add_column("Attributes", style="blue", max_width=50)
# Add rows for each entity
for entity in display_entities:
row = [
escape(entity.get("id", "")),
escape(entity.get("name", "")),
escape(entity.get("type", "Unknown"))
]
if has_degree:
degree = entity.get("degree", 0.0)
row.append(f"{degree:.3f}")
if has_centrality:
centrality = entity.get("centrality", 0.0)
row.append(f"{centrality:.4f}")
if has_mentions:
mentions_count = len(entity.get("mentions", []))
row.append(str(mentions_count))
if has_attributes:
attributes = entity.get("attributes", {})
# Format attributes more readably
attr_str = "; ".join(f"{k}={v}" for k, v in attributes.items() if v) # Ignore empty values
row.append(escape(attr_str[:45] + ("..." if len(attr_str) > 45 else "")))
entity_table.add_row(*row)
main_console.print(entity_table)
if len(entities) > limit:
main_console.print(f"[dim italic]...and {len(entities) - limit} more entities[/dim italic]")
def display_relationships_table(result: Dict[str, Any], limit: int = 10) -> None:
"""Display extracted relationships in a table, sorted by confidence."""
relationships = result.get("relationships", [])
# Create entity map for quick name lookups
entity_map = {entity["id"]: entity for entity in result.get("entities", []) if isinstance(entity, dict) and "id" in entity}
if not relationships:
main_console.print("[yellow]No relationships found to display.[/yellow]")
return
# Sort relationships by confidence (new tool ensures confidence exists)
relationships.sort(key=lambda x: x.get("confidence", 0.0), reverse=True)
# Limit to top relationships
display_relationships = relationships[:limit]
rel_table = Table(title=f"Top {limit} Relationships (Sorted by Confidence)", box=box.ROUNDED, show_header=True, header_style="bold blue")
rel_table.add_column("Source", style="cyan", max_width=30)
rel_table.add_column("Type", style="green", max_width=25)
rel_table.add_column("Target", style="cyan", max_width=30)
rel_table.add_column("Conf.", style="magenta", justify="right", width=6)
# Check if we have evidence or temporal info
has_evidence = any(r.get("evidence") for r in display_relationships)
has_temporal = any(r.get("temporal") for r in display_relationships)
if has_evidence:
rel_table.add_column("Evidence", style="yellow", max_width=40)
if has_temporal:
rel_table.add_column("Temporal", style="blue", max_width=20)
# Add rows for each relationship
for rel in display_relationships:
source_id = rel.get("source", "")
target_id = rel.get("target", "")
# Get entity names if available, fallback to ID
source_name = entity_map.get(source_id, {}).get("name", source_id)
target_name = entity_map.get(target_id, {}).get("name", target_id)
row = [
escape(source_name),
escape(rel.get("type", "Unknown")),
escape(target_name),
f"{rel.get('confidence', 0.0):.2f}",
]
if has_evidence:
evidence = rel.get("evidence", "")
row.append(escape(evidence[:35] + ("..." if len(evidence) > 35 else "")))
if has_temporal:
temporal = rel.get("temporal", {})
temp_str = "; ".join(f"{k}={v}" for k, v in temporal.items())
row.append(escape(temp_str[:18] + ("..." if len(temp_str) > 18 else "")))
rel_table.add_row(*row)
main_console.print(rel_table)
if len(relationships) > limit:
main_console.print(f"[dim italic]...and {len(relationships) - limit} more relationships[/dim italic]")
def display_entity_graph_tree(result: Dict[str, Any], max_depth: int = 2, max_children: int = 5) -> None:
"""Display a tree representation of the entity graph, starting from the most central node."""
entities = result.get("entities", [])
relationships = result.get("relationships", [])
entity_map = {entity["id"]: entity for entity in entities if isinstance(entity, dict) and "id" in entity}
if not entities or not relationships or not HAS_NETWORKX: # Tree view less useful without sorting/structure
if not HAS_NETWORKX:
main_console.print("[yellow]Cannot display graph tree: NetworkX library not available for centrality sorting.[/yellow]")
else:
main_console.print("[yellow]Cannot display graph tree: insufficient data.[/yellow]")
return
# Sort entities by centrality (assuming metrics were calculated)
entities.sort(key=lambda x: x.get("centrality", 0.0), reverse=True)
# Get most central entity as root
if not entities:
return # Should not happen if check above passed, but safety
root_entity = entities[0]
root_id = root_entity["id"]
# Create rich Tree
tree = Tree(
f"[bold cyan]{escape(root_entity.get('name', root_id))}[/bold cyan] "
f"([dim italic]ID: {escape(root_id)}, Type: {escape(root_entity.get('type', 'Unknown'))}[/dim italic])"
)
# Keep track of edges explored to represent the tree structure
explored_edges = set()
# Recursively build tree using BFS approach for levels
queue = [(root_id, tree, 0)] # (entity_id, parent_tree_node, current_depth)
visited_nodes_in_tree = {root_id} # Prevent cycles *within this tree rendering*
while queue:
current_id, parent_node, depth = queue.pop(0)
if depth >= max_depth:
continue
# Find outgoing relationships
outgoing_rels = [
r for r in relationships
if r.get("source") == current_id
and (current_id, r.get("target"), r.get("type")) not in explored_edges
]
outgoing_rels.sort(key=lambda x: x.get("confidence", 0.0), reverse=True)
# Find incoming relationships
incoming_rels = [
r for r in relationships
if r.get("target") == current_id
and (r.get("source"), current_id, r.get("type")) not in explored_edges
]
incoming_rels.sort(key=lambda x: x.get("confidence", 0.0), reverse=True)
# Add outgoing children
children_count = 0
for rel in outgoing_rels:
if children_count >= max_children:
parent_node.add("[dim italic]... (more outgoing)[/dim]")
break
target_id = rel.get("target")
if target_id and target_id not in visited_nodes_in_tree: # Avoid cycles in display
target_entity = entity_map.get(target_id)
if target_entity:
edge_sig = (current_id, target_id, rel.get("type"))
explored_edges.add(edge_sig)
visited_nodes_in_tree.add(target_id)
rel_type = escape(rel.get("type", "related to"))
conf = rel.get("confidence", 0.0)
target_name = escape(target_entity.get("name", target_id))
target_type = escape(target_entity.get("type", "Unknown"))
branch_text = (
f"-[[green]{rel_type}[/green] ({conf:.1f})]-> "
f"[cyan]{target_name}[/cyan] ([dim italic]{target_type}[/dim italic])"
)
branch = parent_node.add(branch_text)
queue.append((target_id, branch, depth + 1))
children_count += 1
# Add incoming children (optional, can make tree busy)
# Comment out this block if you only want outgoing relationships in the tree
# children_count = 0
# for rel in incoming_rels:
# if children_count >= max_children // 2: # Show fewer incoming
# parent_node.add("[dim italic]... (more incoming)[/dim]")
# break
# source_id = rel.get("source")
# if source_id and source_id not in visited_nodes_in_tree:
# source_entity = entity_map.get(source_id)
# if source_entity:
# edge_sig = (source_id, current_id, rel.get("type"))
# explored_edges.add(edge_sig)
# visited_nodes_in_tree.add(source_id)
#
# rel_type = escape(rel.get("type", "related to"))
# conf = rel.get("confidence", 0.0)
# source_name = escape(source_entity.get("name", source_id))
# source_type = escape(source_entity.get("type", "Unknown"))
#
# branch_text = (
# f"<-[[red]{rel_type}[/red] ({conf:.1f})]- "
# f"[magenta]{source_name}[/magenta] ([dim italic]{source_type}[/dim italic])"
# )
# branch = parent_node.add(branch_text)
# queue.append((source_id, branch, depth + 1))
# children_count += 1
main_console.print(Panel(tree, title=f"Entity Graph Tree View (Root: {escape(root_entity.get('name', ''))})", border_style="blue"))
def display_extraction_summary(result: Dict[str, Any]) -> None:
"""Display a summary of the extraction performance and cost."""
metadata = result.get("metadata", {})
provider = result.get("provider", "Unknown")
model = result.get("model", "Unknown")
tokens = result.get("tokens", {})
cost = result.get("cost", 0.0) # Cost is now float
processing_time = result.get("processing_time", 0.0) # Time is now float
strategy = metadata.get("processing_strategy", "Unknown")
schema_used = metadata.get("schema_used", "Unknown")
summary_table = Table(box=box.ROUNDED, show_header=False, title="Extraction Summary")
summary_table.add_column("Metric", style="cyan", no_wrap=True)
summary_table.add_column("Value", style="green")
summary_table.add_row("Provider", escape(provider))
summary_table.add_row("Model", escape(model))
summary_table.add_row("Strategy", escape(strategy))
summary_table.add_row("Schema Used", escape(schema_used))
summary_table.add_row("Input Tokens", f"{tokens.get('input', 0):,}")
summary_table.add_row("Output Tokens", f"{tokens.get('output', 0):,}")
summary_table.add_row("Total Tokens", f"{tokens.get('total', 0):,}")
summary_table.add_row("Cost", f"${cost:.6f}")
summary_table.add_row("Processing Time", f"{processing_time:.2f} seconds")
main_console.print(summary_table)
def save_visualization(result: Dict[str, Any], domain: str, strategy: str, output_dir: Path) -> Optional[str]:
"""Save visualization file based on the format present in the result."""
visualization = result.get("visualization") # Visualization data is now under this key
if not visualization or not isinstance(visualization, dict):
main_console.print("[dim]No visualization data found in the result.[/dim]")
return None
content = None
extension = None
file_path = None
# Check for different visualization formats
if "html" in visualization:
content = visualization["html"]
extension = "html"
elif "svg" in visualization:
content = visualization["svg"]
extension = "svg"
elif "png_url" in visualization: # Assuming PNG might save file directly and return URL
file_path = visualization["png_url"].replace("file://", "")
extension = "png"
elif "dot" in visualization:
content = visualization["dot"]
extension = "dot"
if file_path: # If path was returned directly (like maybe for PNG)
if Path(file_path).exists():
return file_path
else:
main_console.print(f"[red]Visualization file path provided but not found: {file_path}[/red]")
return None
if content and extension:
timestamp = int(time.time())
# Sanitize domain and strategy for filename
safe_domain = domain.replace(" ", "_").lower()
safe_strategy = strategy.replace(" ", "_").lower()
output_path = output_dir / f"graph_{safe_domain}_{safe_strategy}_{timestamp}.{extension}"
try:
with open(output_path, "w", encoding="utf-8") as f:
f.write(content)
return str(output_path)
except Exception as e:
main_console.print(f"[bold red]Error saving visualization file {output_path}:[/bold red] {e}")
return None
elif "error" in visualization:
main_console.print(f"[yellow]Visualization generation failed:[/yellow] {visualization['error']}")
return None
else:
main_console.print("[dim]Unsupported or missing visualization format in result.[/dim]")
return None
async def run_entity_extraction(
text: str,
domain: TextDomain,
strategy: GraphStrategy,
model: str,
output_format: OutputFormat = OutputFormat.JSON,
visualization_format: VisualizationFormat = VisualizationFormat.HTML, # Keep HTML for demo vis
provider: str = Provider.ANTHROPIC.value # Example provider
) -> Optional[Dict[str, Any]]:
"""Run entity graph extraction with progress indicator and display params."""
# Setup extraction parameters
params = {
"text": text,
"provider": provider, # Pass provider name string
"model": model,
"strategy": strategy, # Pass enum directly
"output_format": output_format, # Pass enum directly
"visualization_format": visualization_format, # Pass enum directly
# --- Include Flags (consider new defaults) ---
"include_evidence": True, # Explicitly keep True for demo
"include_attributes": True, # Explicitly keep True for demo
"include_positions": False, # Change to False to match new default (saves tokens)
"include_temporal_info": True, # Explicitly keep True for demo
# --- Control Flags ---
"normalize_entities": True, # Keep True (new default)
"enable_reasoning": False, # Keep False for demo speed (can be enabled)
# --- Limits ---
"max_entities": 75, # Adjusted limits for demo
"max_relations": 150,
"min_confidence": 0.55, # Slightly higher confidence
# --- Optional ---
"language": None, # Specify if needed, e.g., "Spanish"
"domain": None, # Set below if applicable
#"custom_prompt": None, # Add if testing custom prompts
#"system_prompt": None, # Add if testing system prompts
#"additional_params": {"temperature": 0.2} # Example
}
# Add domain value string if applicable and not GENERAL
if domain != TextDomain.GENERAL and domain.value in COMMON_SCHEMAS:
params["domain"] = domain.value
# Display parameters being used
display_extraction_params(params)
# Run extraction with progress spinner
result = None
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
console=main_console,
transient=False # Keep progress visible after completion
) as progress:
task_desc = f"Extracting graph ({domain.value}/{strategy.value}/{model})..."
task = progress.add_task(task_desc, total=None)
try:
start_time = time.monotonic()
# Pass enums directly now, the tool handles conversion if needed
result = await extract_entity_graph(**params) # type: ignore
end_time = time.monotonic()
duration = end_time - start_time
progress.update(task, completed=100, description=f"[green]Extraction complete ({duration:.2f}s)[/green]")
return result
except Exception as e:
logger.error(f"Extraction failed during run_entity_extraction: {e}", exc_info=True)
progress.update(task, completed=100, description=f"[bold red]Extraction failed: {escape(str(e))}[/bold red]")
# Optionally re-raise or just return None
# raise # Uncomment to stop the demo on failure
return None # Allow demo to continue
# --- Demonstration Functions (Updated calls to run_entity_extraction) ---
async def demonstrate_domain_extraction(
domain: TextDomain,
sample_file: str,
strategy: GraphStrategy,
model: str = "claude-3-5-haiku-20241022", # Default model for demos
provider: str = Provider.ANTHROPIC.value
):
"""Helper function to demonstrate extraction for a specific domain."""
domain_name = domain.value.capitalize()
display_header(f"{domain_name} Domain Entity Graph Extraction ({strategy.value} strategy)")
sample_path = SAMPLE_DIR / sample_file
display_dataset_info(sample_path, f"{domain_name} Sample Text")
if not sample_path.exists():
return # Skip if file missing
with open(sample_path, "r", encoding="utf-8") as f:
text_content = f.read()
try:
result = await run_entity_extraction(
text=text_content,
domain=domain,
strategy=strategy,
model=model,
provider=provider,
visualization_format=VisualizationFormat.HTML # Request HTML for viewing
)
if result and result.get("success", False):
# Display results
display_entity_stats(result)
if HAS_NETWORKX: # Only display metrics if networkx is installed
display_graph_metrics(result)
display_entities_table(result)
display_relationships_table(result)
if HAS_NETWORKX: # Tree view also requires networkx
display_entity_graph_tree(result)
# Save visualization if available
vis_path = save_visualization(result, domain.value, strategy.value, OUTPUT_DIR)
if vis_path:
main_console.print(f"\n[green]✓[/green] Visualization saved to: [blue link=file://{vis_path}]{vis_path}[/blue]")
elif "visualization" in result and "error" in result["visualization"]:
main_console.print(f"[yellow]Visualization generation failed: {result['visualization']['error']}[/yellow]")
# Display summary
display_extraction_summary(result)
elif result:
main_console.print(f"[bold red]Extraction reported failure:[/bold red] {result.get('error', 'Unknown error')}")
# If result is None, run_entity_extraction already printed the error
except Exception as e:
# Catch errors not caught by run_entity_extraction's try/except
main_console.print(f"[bold red]Error during {domain_name} demonstration:[/bold red] {escape(str(e))}")
logger.error(f"Unhandled error in {domain_name} demo: {e}", exc_info=True)
async def demonstrate_strategy_comparison():
"""Compare different extraction strategies on the same text."""
display_header("Strategy Comparison")
# Load business article for comparison
comparison_file = "article.txt"
comparison_path = SAMPLE_DIR / comparison_file
display_dataset_info(comparison_path, f"{comparison_file} (For Strategy Comparison)")
if not comparison_path.exists():
return
with open(comparison_path, "r", encoding="utf-8") as f:
comparison_text = f.read()
# Define strategies to compare
strategies_to_compare = [
(GraphStrategy.STANDARD, "Standard"),
(GraphStrategy.MULTISTAGE, "Multistage"),
(GraphStrategy.CHUNKED, "Chunked"), # Will process full text if short, or chunk if long
(GraphStrategy.STRUCTURED, "Structured"), # Needs examples from domain
(GraphStrategy.STRICT_SCHEMA, "Strict Schema") # Needs domain
]
# Setup comparison table
comparison_table = Table(title="Strategy Comparison Results", box=box.ROUNDED, show_header=True, header_style="bold magenta")
comparison_table.add_column("Strategy", style="cyan")
comparison_table.add_column("Entities", style="green", justify="right")
comparison_table.add_column("Rels", style="green", justify="right")
comparison_table.add_column("Time (s)", style="yellow", justify="right")
comparison_table.add_column("Tokens", style="magenta", justify="right")
comparison_table.add_column("Cost ($)", style="blue", justify="right")
# Use a slightly smaller model for faster comparison if needed
comparison_model = "gpt-4.1-mini"
comparison_provider = Provider.OPENAI.value
comparison_domain = TextDomain.BUSINESS
# Compare each strategy
for strategy, desc in strategies_to_compare:
main_console.print(f"\n[bold underline]Running {desc} Strategy[/bold underline]")
# Use full text for chunking demo, maybe excerpt for others if needed for speed?
# Let's use full text for all to see chunking effect properly
text_to_use = comparison_text
result = None # Define result outside try block
try:
result = await run_entity_extraction(
text=text_to_use,
domain=comparison_domain, # Business domain has examples/schema
strategy=strategy,
model=comparison_model,
provider=comparison_provider,
visualization_format=VisualizationFormat.NONE # Skip visualization for comparison
)
if result and result.get("success", False):
# Extract metrics for comparison
entity_count = result.get("metadata", {}).get("entity_count", 0)
rel_count = result.get("metadata", {}).get("relationship_count", 0)
processing_time = result.get("processing_time", 0.0)
token_count = result.get("tokens", {}).get("total", 0)
cost = result.get("cost", 0.0)
# Add to comparison table
comparison_table.add_row(
desc,
str(entity_count),
str(rel_count),
f"{processing_time:.2f}",
f"{token_count:,}",
f"{cost:.6f}"
)
# Display brief stats for this strategy
display_entity_stats(result)
else:
error_msg = result.get("error", "Extraction failed") if result else "Extraction returned None"
main_console.print(f"[bold red]Error with {desc} strategy:[/bold red] {escape(error_msg)}")
comparison_table.add_row(desc, "[red]ERR[/red]", "[red]ERR[/red]", "N/A", "N/A", "N/A")
except Exception as e:
logger.error(f"Unhandled error comparing strategy {desc}: {e}", exc_info=True)
main_console.print(f"[bold red]Unhandled Error with {desc} strategy:[/bold red] {escape(str(e))}")
comparison_table.add_row(desc, "[red]CRASH[/red]", "[red]CRASH[/red]", "N/A", "N/A", "N/A")
# Display final comparison table
main_console.print(Rule("Comparison Summary"))
main_console.print(comparison_table)
async def demonstrate_output_formats():
"""Demonstrate different output formats using a sample text."""
display_header("Output Format Demonstration")
# Load academic paper for output format demo
format_file = "research_paper.txt"
format_path = SAMPLE_DIR / format_file
display_dataset_info(format_path, f"{format_file} (For Output Formats)")
if not format_path.exists():
return
with open(format_path, "r", encoding="utf-8") as f:
format_text = f.read()
# Define output formats to demonstrate
# Exclude NetworkX if library not installed
formats_to_demonstrate = [
(OutputFormat.JSON, "Standard JSON"),
(OutputFormat.CYTOSCAPE, "Cytoscape.js"),
(OutputFormat.NEO4J, "Neo4j Cypher"),
(OutputFormat.RDF, "RDF Triples"),
(OutputFormat.D3, "D3.js nodes/links"),
]
if HAS_NETWORKX:
formats_to_demonstrate.insert(1, (OutputFormat.NETWORKX, "NetworkX Object"))
main_console.print("[bold yellow]Note:[/bold yellow] This demonstrates how extracted data can be formatted.")
# Use a short excerpt for speed
text_excerpt = format_text[:2000]
base_model = "gpt-4.1-mini" # Faster model for formats
base_provider = Provider.OPENAI.value
base_domain = TextDomain.ACADEMIC
# Extract with each output format
for fmt, desc in formats_to_demonstrate:
main_console.print(f"\n[bold underline]Demonstrating {desc} Output Format[/bold underline]")
result = None # Define outside try block
try:
result = await run_entity_extraction(
text=text_excerpt,
domain=base_domain,
strategy=GraphStrategy.STANDARD, # Use standard strategy
model=base_model,
provider=base_provider,
output_format=fmt, # Specify the output format
visualization_format=VisualizationFormat.NONE # No viz needed here
)
if not result or not result.get("success", False):
error_msg = result.get("error", "Extraction failed") if result else "Extraction returned None"
main_console.print(f"[bold red]Error extracting data for {desc} format:[/bold red] {escape(error_msg)}")
continue # Skip displaying output for this format
# Display format-specific output key
output_key = fmt.value # Default key matches enum value
if fmt == OutputFormat.NETWORKX:
output_key = "graph"
if fmt == OutputFormat.NEO4J:
output_key = "neo4j_queries"
if fmt == OutputFormat.RDF:
output_key = "rdf_triples"
if fmt == OutputFormat.D3:
output_key = "d3"
if fmt == OutputFormat.CYTOSCAPE:
output_key = "cytoscape"
if output_key in result:
data_to_display = result[output_key]
display_title = f"Sample of {desc} Output (`{output_key}` key)"
if fmt == OutputFormat.JSON:
# Display a subset of the standard JSON keys
json_subset = {
"entities": result.get("entities", [])[:2],
"relationships": result.get("relationships", [])[:2],
"metadata": {k:v for k,v in result.get("metadata",{}).items() if k in ["entity_count", "relationship_count","processing_strategy"]}
}
output_content = Syntax(json.dumps(json_subset, indent=2), "json", theme="default", line_numbers=True, word_wrap=True)
elif fmt == OutputFormat.NETWORKX:
graph_obj = result["graph"]
info = (
f"[green]✓[/green] NetworkX graph object created: {isinstance(graph_obj, nx.DiGraph)}\n"
f"Nodes: {graph_obj.number_of_nodes()}, Edges: {graph_obj.number_of_edges()}\n\n"
"[italic]Allows graph algorithms (centrality, paths, etc.)[/italic]"
)
output_content = info # Simple text panel
elif fmt == OutputFormat.CYTOSCAPE:
sample = {
"nodes": data_to_display.get("nodes", [])[:2],
"edges": data_to_display.get("edges", [])[:2]
}
output_content = Syntax(json.dumps(sample, indent=2), "json", theme="default", line_numbers=True, word_wrap=True)
elif fmt == OutputFormat.NEO4J:
queries = data_to_display
sample_queries = queries[:3] # Show first few queries
output_content = Syntax("\n\n".join(sample_queries) + "\n...", "cypher", theme="default", line_numbers=True, word_wrap=True)
elif fmt == OutputFormat.RDF:
triples = data_to_display
sample_triples = ['("{}", "{}", "{}")'.format(*t) for t in triples[:5]] # Format first few
output_content = Syntax("\n".join(sample_triples) + "\n...", "turtle", theme="default", line_numbers=True, word_wrap=True) # Turtle isn't perfect but ok
elif fmt == OutputFormat.D3:
sample = {
"nodes": data_to_display.get("nodes", [])[:2],
"links": data_to_display.get("links", [])[:2]
}
output_content = Syntax(json.dumps(sample, indent=2), "json", theme="default", line_numbers=True, word_wrap=True)
else:
# Fallback for unexpected formats
output_content = escape(str(data_to_display)[:500] + "...")
main_console.print(Panel(
output_content,
title=display_title,
border_style="green",
expand=False
))
else:
main_console.print(f"[yellow]Output key '{output_key}' not found in result for {desc} format.[/yellow]")
except Exception as e:
logger.error(f"Unhandled error demonstrating format {desc}: {e}", exc_info=True)
main_console.print(f"[bold red]Unhandled Error with {desc} format:[/bold red] {escape(str(e))}")
async def main():
"""Run entity relation graph extraction demonstrations."""
try:
# Display welcome message
main_console.print(Rule("[bold magenta]Entity Relationship Graph Extraction Demo (v2 Tool)[/bold magenta]"))
main_console.print(
"[bold]This demonstrates the refactored `entity_graph` tool for extracting and visualizing "
"knowledge graphs from text across different domains and using various strategies.[/bold]\n"
)
# Check for dependencies needed by the demo display itself
if not HAS_VISUALIZATION_LIBS:
main_console.print("[yellow]Warning:[/yellow] `networkx`, `pyvis`, or `matplotlib` not installed.")
main_console.print("Graph metrics, tree view, and some visualizations may be unavailable.")
if not HAS_NETWORKX:
main_console.print("[yellow]Warning:[/yellow] `networkx` not installed. Graph metrics and tree view disabled.")
# Initialize the Gateway (optional, depends if Gateway context is needed for config/logging)
# If the tool functions standalone, this might not be strictly necessary for the demo.
# gateway = Gateway("entity-graph-demo", register_tools=True)
# logger.info("Ultimate MCP Server initialized (optional for demo).")
# Check if sample directory exists
if not SAMPLE_DIR.exists() or not any(SAMPLE_DIR.iterdir()):
main_console.print(f"[bold red]Error:[/bold red] Sample directory '{SAMPLE_DIR}' not found or is empty!")
main_console.print("Please create the 'sample' directory next to this script and add text files (e.g., article.txt).")
return 1
# --- Run Demonstrations ---
# Define models to use - maybe select based on availability or speed
# Using Sonnet as a balance, Haiku for comparisons/formats
default_model = "gpt-4.1-mini"
default_provider = Provider.OPENAI.value # String like "anthropic"
# 1. Domain Examples (using appropriate strategies)
await demonstrate_domain_extraction(TextDomain.BUSINESS, "article.txt", GraphStrategy.STANDARD, model=default_model, provider=default_provider)
await demonstrate_domain_extraction(TextDomain.ACADEMIC, "research_paper.txt", GraphStrategy.MULTISTAGE, model=default_model, provider=default_provider)
await demonstrate_domain_extraction(TextDomain.LEGAL, "legal_contract.txt", GraphStrategy.STRUCTURED, model=default_model, provider=default_provider)
await demonstrate_domain_extraction(TextDomain.MEDICAL, "medical_case.txt", GraphStrategy.STRICT_SCHEMA, model=default_model, provider=default_provider)
# 2. Strategy Comparison
await demonstrate_strategy_comparison()
# 3. Output Format Demonstration
await demonstrate_output_formats()
main_console.print(Rule("[bold green]Entity Relationship Graph Extraction Demo Complete[/bold green]"))
# Split the print into two simpler statements to avoid rich markup issues
main_console.print(f"\n[bold]Visualizations and outputs have been saved to:[/bold] {OUTPUT_DIR}")
main_console.print("Open any HTML files in a web browser to explore interactive graphs.")
return 0
except Exception as e:
# Catch-all for unexpected errors during setup or top-level execution
logger.critical(f"Demo failed catastrophically: {e}", exc_info=True)
main_console.print(f"[bold red]Critical Demo Error:[/bold red] {escape(str(e))}")
return 1
if __name__ == "__main__":
exit_code = asyncio.run(main())
sys.exit(exit_code)
```
--------------------------------------------------------------------------------
/ultimate_mcp_server/core/ums_api/ums_models.py:
--------------------------------------------------------------------------------
```python
"""Pydantic models for UMS API endpoints."""
from datetime import datetime
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
# ---------- Cognitive States Models ----------
class CognitiveState(BaseModel):
state_id: str
timestamp: float
formatted_timestamp: str
state_type: str
description: Optional[str] = None
workflow_id: Optional[str] = None
workflow_title: Optional[str] = None
complexity_score: float
change_magnitude: float
age_minutes: float
memory_count: int
action_count: int
state_data: Dict[str, Any] = {}
class CognitiveStatesResponse(BaseModel):
states: List[CognitiveState]
total: int
has_more: bool
class TimelineState(BaseModel):
state_id: str
timestamp: float
formatted_time: str
state_type: str
workflow_id: Optional[str] = None
description: Optional[str] = None
sequence_number: int
complexity_score: float
change_magnitude: float
class TimelineSummaryStats(BaseModel):
avg_complexity: float
total_transitions: int
max_change_magnitude: float
class CognitiveTimelineResponse(BaseModel):
timeline_data: List[TimelineState]
total_states: int
time_range_hours: int
granularity: str
summary_stats: TimelineSummaryStats
class Memory(BaseModel):
memory_id: str
memory_type: str
content: str
importance: float
created_at: float
class Action(BaseModel):
action_id: str
action_type: str
tool_name: str
status: str
started_at: float
class DetailedCognitiveState(BaseModel):
state_id: str
timestamp: float
formatted_timestamp: str
state_type: str
description: Optional[str] = None
workflow_id: Optional[str] = None
workflow_title: Optional[str] = None
workflow_goal: Optional[str] = None
state_data: Dict[str, Any]
complexity_score: float
memories: List[Memory] = []
actions: List[Action] = []
class Pattern(BaseModel):
type: str
length: int
similarity: float
occurrences: int
first_occurrence: float
pattern_description: str
class Transition(BaseModel):
transition: str
count: int
percentage: float
class Anomaly(BaseModel):
state_id: str
timestamp: float
anomaly_type: str
z_score: float
description: str
severity: str
class PatternSummary(BaseModel):
pattern_count: int
most_common_transition: Optional[Transition] = None
anomaly_count: int
class CognitivePatternAnalysis(BaseModel):
total_states: int
time_range_hours: int
patterns: List[Pattern] = []
transitions: List[Transition] = []
anomalies: List[Anomaly] = []
summary: PatternSummary
class StateComparisonInfo(BaseModel):
state_id: str
timestamp: float
formatted_timestamp: str
class StateDiff(BaseModel):
added: Dict[str, Any] = {}
removed: Dict[str, Any] = {}
modified: Dict[str, Dict[str, Any]] = {}
magnitude: float
class StateComparisonRequest(BaseModel):
state_id_1: str = Field(
...,
description="First cognitive state ID for comparison",
example="state_abc123"
)
state_id_2: str = Field(
...,
description="Second cognitive state ID for comparison",
example="state_xyz789"
)
class StateComparisonResponse(BaseModel):
state_1: StateComparisonInfo
state_2: StateComparisonInfo
time_diff_minutes: float
diff: StateDiff
# ---------- Action Monitor Models ----------
class StatusIndicator(BaseModel):
"""Action status indicator with visual cues"""
color: str = Field(..., description="Color for visual representation")
icon: str = Field(..., description="Icon name for the status")
label: str = Field(..., description="Human-readable status label")
urgency: str = Field(..., description="Urgency level: low, medium, high")
class ResourceUsage(BaseModel):
"""Resource usage metrics for an action"""
cpu_usage: float = Field(..., description="CPU usage percentage")
memory_usage: float = Field(..., description="Memory usage percentage")
network_io: float = Field(..., description="Network I/O in KB/s")
disk_io: float = Field(..., description="Disk I/O in KB/s")
class RunningAction(BaseModel):
"""Model for a currently running action"""
action_id: str = Field(..., description="Unique action identifier")
workflow_id: Optional[str] = Field(None, description="Associated workflow ID")
workflow_title: Optional[str] = Field(None, description="Workflow title")
tool_name: str = Field(..., description="Name of the tool being executed")
status: str = Field(..., description="Current execution status")
started_at: float = Field(..., description="Start timestamp")
formatted_start_time: str = Field(..., description="ISO formatted start time")
execution_time_seconds: float = Field(
..., description="Current execution duration in seconds"
)
estimated_duration: Optional[float] = Field(
None, description="Estimated duration in seconds"
)
progress_percentage: float = Field(..., description="Estimated progress percentage")
status_indicator: StatusIndicator = Field(..., description="Visual status indicator")
performance_category: str = Field(..., description="Performance categorization")
resource_usage: ResourceUsage = Field(..., description="Current resource usage")
tool_data: Dict[str, Any] = Field(
default_factory=dict, description="Tool-specific data"
)
class RunningActionsResponse(BaseModel):
"""Response for currently running actions"""
running_actions: List[RunningAction] = Field(
..., description="List of currently executing actions"
)
total_running: int = Field(..., description="Total number of running actions")
avg_execution_time: float = Field(
..., description="Average execution time of running actions"
)
timestamp: str = Field(..., description="Response timestamp")
class QueuedAction(BaseModel):
"""Model for a queued action"""
action_id: str = Field(..., description="Unique action identifier")
workflow_id: Optional[str] = Field(None, description="Associated workflow ID")
workflow_title: Optional[str] = Field(None, description="Workflow title")
tool_name: str = Field(..., description="Name of the tool to be executed")
status: str = Field(..., description="Queue status")
created_at: float = Field(..., description="Creation timestamp")
formatted_queue_time: str = Field(..., description="ISO formatted queue time")
queue_position: int = Field(..., description="Position in the queue (1-based)")
queue_time_seconds: float = Field(..., description="Time spent in queue")
estimated_wait_time: float = Field(..., description="Estimated wait time in seconds")
priority: int = Field(..., description="Numeric priority value")
priority_label: str = Field(..., description="Human-readable priority label")
tool_data: Dict[str, Any] = Field(
default_factory=dict, description="Tool-specific data"
)
class ActionQueueResponse(BaseModel):
"""Response for action queue status"""
queued_actions: List[QueuedAction] = Field(..., description="List of queued actions")
total_queued: int = Field(..., description="Total number of queued actions")
avg_queue_time: float = Field(..., description="Average time in queue")
next_action: Optional[QueuedAction] = Field(
None, description="Next action to be executed"
)
timestamp: str = Field(..., description="Response timestamp")
class ActionHistoryItem(BaseModel):
"""Model for a single action in history"""
action_id: str = Field(..., description="Unique action identifier")
workflow_id: Optional[str] = Field(None, description="Associated workflow ID")
workflow_title: Optional[str] = Field(None, description="Associated workflow title")
tool_name: str = Field(..., description="Name of the tool executed")
action_type: Optional[str] = Field(None, description="Type of action")
status: str = Field(..., description="Action completion status")
started_at: float = Field(..., description="Unix timestamp when action started")
completed_at: Optional[float] = Field(
None, description="Unix timestamp when action completed"
)
execution_duration_seconds: float = Field(
..., description="Total execution time in seconds"
)
performance_score: float = Field(
..., description="Calculated performance score (0-100)"
)
efficiency_rating: str = Field(
..., description="Efficiency rating based on time and output"
)
success_rate_impact: int = Field(..., description="Impact on success rate (1 or 0)")
formatted_start_time: str = Field(..., description="ISO formatted start time")
formatted_completion_time: Optional[str] = Field(
None, description="ISO formatted completion time"
)
tool_data: Dict[str, Any] = Field(
default_factory=dict, description="Tool-specific data"
)
result_data: Dict[str, Any] = Field(
default_factory=dict, description="Action result data"
)
result_size: int = Field(0, description="Size of the result data")
class PerformanceSummary(BaseModel):
"""Performance summary statistics"""
avg_score: float = Field(..., description="Average performance score")
top_performer: Optional[Dict[str, Any]] = Field(
None, description="Best performing tool"
)
worst_performer: Optional[Dict[str, Any]] = Field(
None, description="Worst performing tool"
)
efficiency_distribution: Dict[str, int] = Field(
..., description="Distribution of efficiency ratings"
)
class ActionHistoryResponse(BaseModel):
"""Response model for action history"""
action_history: List[ActionHistoryItem] = Field(
..., description="List of completed actions"
)
total_actions: int = Field(
..., description="Total number of actions in the time period"
)
success_rate: float = Field(..., description="Overall success rate percentage")
avg_execution_time: float = Field(..., description="Average execution time in seconds")
performance_summary: PerformanceSummary = Field(
..., description="Performance summary statistics"
)
timestamp: str = Field(..., description="Response timestamp")
class OverallMetrics(BaseModel):
"""Overall action execution metrics"""
total_actions: int = Field(..., description="Total number of actions executed")
successful_actions: int = Field(
..., description="Number of successfully completed actions"
)
failed_actions: int = Field(..., description="Number of failed actions")
avg_duration: Optional[float] = Field(
None, description="Average execution duration in seconds"
)
success_rate_percentage: float = Field(
..., description="Overall success rate as percentage"
)
failure_rate_percentage: float = Field(
..., description="Overall failure rate as percentage"
)
avg_duration_seconds: float = Field(..., description="Average duration in seconds")
class ToolUsageStat(BaseModel):
"""Statistics for a single tool"""
tool_name: str = Field(..., description="Name of the tool")
usage_count: int = Field(..., description="Number of times the tool was used")
success_count: int = Field(..., description="Number of successful executions")
avg_duration: Optional[float] = Field(
None, description="Average execution time in seconds"
)
class HourlyMetric(BaseModel):
"""Hourly performance metrics"""
hour: str = Field(..., description="Hour of the day (0-23)")
action_count: int = Field(..., description="Number of actions in this hour")
avg_duration: Optional[float] = Field(
None, description="Average duration for this hour"
)
success_count: int = Field(..., description="Number of successful actions")
class PerformanceInsight(BaseModel):
"""Performance insight or recommendation"""
type: str = Field(..., description="Type of insight (warning, info, etc.)")
title: str = Field(..., description="Title of the insight")
message: str = Field(..., description="Detailed message")
severity: str = Field(..., description="Severity level (high, medium, low)")
class ActionMetricsResponse(BaseModel):
"""Response model for action metrics"""
overall_metrics: OverallMetrics = Field(..., description="Overall execution metrics")
tool_usage_stats: List[ToolUsageStat] = Field(
..., description="Per-tool usage statistics"
)
hourly_performance: List[HourlyMetric] = Field(
..., description="Hourly performance breakdown"
)
performance_insights: List[PerformanceInsight] = Field(
..., description="Actionable insights and recommendations"
)
timestamp: str = Field(..., description="Response timestamp")
# ---------- Artifacts Models ----------
class Artifact(BaseModel):
"""Model for a single artifact"""
artifact_id: str = Field(..., description="Unique artifact identifier")
name: str = Field(..., description="Name of the artifact")
artifact_type: str = Field(
..., description="Type of artifact (document, image, code, etc.)"
)
description: Optional[str] = Field(None, description="Description of the artifact")
file_path: Optional[str] = Field(None, description="File system path to the artifact")
workflow_id: Optional[str] = Field(None, description="Associated workflow ID")
workflow_title: Optional[str] = Field(None, description="Title of associated workflow")
created_at: float = Field(..., description="Creation timestamp")
updated_at: float = Field(..., description="Last update timestamp")
file_size: int = Field(..., description="File size in bytes")
file_size_human: str = Field(..., description="Human-readable file size")
importance: Optional[float] = Field(None, description="Importance score (1-10)")
access_count: int = Field(0, description="Number of times accessed")
tags: List[str] = Field(default_factory=list, description="Associated tags")
metadata: Dict[str, Any] = Field(
default_factory=dict, description="Additional metadata"
)
relationship_count: int = Field(0, description="Number of related artifacts")
version_count: int = Field(0, description="Number of versions")
formatted_created_at: str = Field(..., description="ISO formatted creation date")
formatted_updated_at: str = Field(..., description="ISO formatted update date")
age_days: float = Field(..., description="Age of artifact in days")
class ArtifactsFilter(BaseModel):
"""Filter parameters used in the request"""
artifact_type: Optional[str] = Field(None, description="Type filter applied")
workflow_id: Optional[str] = Field(None, description="Workflow filter applied")
tags: Optional[str] = Field(None, description="Tags filter applied")
search: Optional[str] = Field(None, description="Search query applied")
sort_by: str = Field(..., description="Sort field used")
sort_order: str = Field(..., description="Sort order used")
class ArtifactsResponse(BaseModel):
"""Response model for artifacts listing"""
artifacts: List[Artifact] = Field(..., description="List of artifacts")
total: int = Field(..., description="Total number of artifacts matching query")
has_more: bool = Field(..., description="Whether there are more artifacts available")
filters: ArtifactsFilter = Field(..., description="Filters that were applied")
class ArtifactTypeStats(BaseModel):
"""Statistics for a specific artifact type"""
artifact_type: str = Field(..., description="Type of artifact")
count: int = Field(..., description="Number of artifacts of this type")
avg_importance: Optional[float] = Field(None, description="Average importance score")
total_size: int = Field(..., description="Total size of all artifacts of this type")
max_access_count: int = Field(..., description="Maximum access count for this type")
class ArtifactOverallStats(BaseModel):
"""Overall artifact statistics"""
total_artifacts: int = Field(..., description="Total number of artifacts")
unique_types: int = Field(..., description="Number of unique artifact types")
unique_workflows: int = Field(..., description="Number of unique workflows")
total_size: int = Field(..., description="Total size of all artifacts in bytes")
total_size_human: str = Field(..., description="Human-readable total size")
avg_size: float = Field(..., description="Average artifact size in bytes")
latest_created: Optional[float] = Field(
None, description="Timestamp of most recent artifact"
)
earliest_created: Optional[float] = Field(
None, description="Timestamp of oldest artifact"
)
class ArtifactStatsResponse(BaseModel):
"""Response model for artifact statistics"""
overall: ArtifactOverallStats = Field(..., description="Overall statistics")
by_type: List[ArtifactTypeStats] = Field(
..., description="Statistics broken down by type"
)
# ---------- Memory Quality Models ----------
class MemoryDetail(BaseModel):
"""Detailed information about a memory"""
memory_id: str = Field(..., description="Unique memory identifier")
workflow_id: Optional[str] = Field(None, description="Associated workflow ID")
memory_type: str = Field(..., description="Type of memory")
importance: float = Field(..., description="Importance score")
created_at: float = Field(..., description="Creation timestamp")
class DuplicateGroup(BaseModel):
"""Group of duplicate memories"""
cluster_id: str = Field(..., description="Unique identifier for this duplicate cluster")
content_preview: str = Field(..., description="Preview of the duplicated content")
duplicate_count: int = Field(..., description="Number of duplicates in this group")
memory_ids: List[str] = Field(..., description="List of all memory IDs in this group")
primary_memory_id: str = Field(..., description="Suggested primary memory to keep")
memory_details: List[MemoryDetail] = Field(..., description="Detailed info for each memory")
first_created: float = Field(..., description="Timestamp of earliest duplicate")
last_created: float = Field(..., description="Timestamp of latest duplicate")
avg_importance: float = Field(..., description="Average importance across duplicates")
recommendation: str = Field(..., description="Recommended action (merge/review)")
class DuplicatesResponse(BaseModel):
"""Response model for duplicate analysis"""
success: bool = Field(..., description="Whether analysis completed successfully")
clusters: List[DuplicateGroup] = Field(..., description="List of duplicate groups")
duplicate_groups: List[DuplicateGroup] = Field(..., description="Alias for clusters (backward compatibility)")
total_groups: int = Field(..., description="Total number of duplicate groups found")
total_duplicates: int = Field(..., description="Total number of duplicate memories")
class OrphanedMemory(BaseModel):
"""Model for an orphaned memory"""
memory_id: str = Field(..., description="Unique memory identifier")
content: str = Field(..., description="Memory content")
memory_type: str = Field(..., description="Type of memory")
importance: float = Field(..., description="Importance score")
created_at: float = Field(..., description="Creation timestamp")
class OrphanedMemoriesResponse(BaseModel):
"""Response model for orphaned memories"""
success: bool = Field(..., description="Whether query completed successfully")
orphaned_memories: List[OrphanedMemory] = Field(..., description="List of orphaned memories")
total_orphaned: int = Field(..., description="Total count of orphaned memories")
recommendation: str = Field(..., description="Recommended action for orphaned memories")
class BulkOperationRequest(BaseModel):
"""Request model for bulk operations"""
operation_type: str = Field(
...,
description="Type of bulk operation to perform",
regex="^(delete|archive|merge)$"
)
memory_ids: List[str] = Field(
...,
description="List of memory IDs to operate on",
min_items=1
)
target_memory_id: Optional[str] = Field(
None,
description="Target memory ID for merge operations"
)
class BulkOperationResponse(BaseModel):
"""Response model for bulk operations"""
success: bool = Field(..., description="Whether operation completed successfully")
operation_type: str = Field(..., description="Type of operation performed")
memory_ids: List[str] = Field(..., description="Memory IDs that were operated on")
success_count: int = Field(..., description="Number of successful operations")
error_count: int = Field(..., description="Number of failed operations")
message: str = Field(..., description="Summary message of the operation")
errors: List[str] = Field(default_factory=list, description="List of error messages")
merged_into: Optional[str] = Field(None, description="Target memory ID for merge operations")
class PreviewMemory(BaseModel):
"""Memory preview for bulk operations"""
memory_id: str = Field(..., description="Memory ID")
content: str = Field(..., description="Memory content")
memory_type: str = Field(..., description="Type of memory")
importance: float = Field(..., description="Importance score")
workflow_id: Optional[str] = Field(None, description="Associated workflow")
class BulkOperationPreview(BaseModel):
"""Preview of bulk operation effects"""
operation_type: str = Field(..., description="Type of operation to be performed")
total_affected: int = Field(..., description="Total memories that will be affected")
preview_description: str = Field(..., description="Description of what will happen")
affected_memories: List[PreviewMemory] = Field(..., description="Details of affected memories")
merge_target: Optional[PreviewMemory] = Field(None, description="Target memory for merge")
will_be_deleted: Optional[List[PreviewMemory]] = Field(None, description="Memories to be deleted in merge")
class BulkPreviewResponse(BaseModel):
"""Response model for bulk operation preview"""
success: bool = Field(..., description="Whether preview generated successfully")
operation: BulkOperationPreview = Field(..., description="Preview of the operation")
# ---------- Working Memory Models ----------
class FocusMode(BaseModel):
"""Focus mode configuration"""
enabled: bool = Field(..., description="Whether focus mode is enabled")
focus_keywords: List[str] = Field(default_factory=list, description="Keywords for focus filtering")
class PerformanceMetrics(BaseModel):
"""Working memory performance metrics"""
avg_relevance_score: float = Field(..., description="Average relevance score across all memories")
optimization_suggestions: int = Field(..., description="Number of optimization suggestions")
class WorkingMemoryStatus(BaseModel):
"""Complete working memory system status"""
initialized: bool = Field(..., description="Whether the system is initialized")
total_capacity: int = Field(..., description="Maximum memory capacity")
current_size: int = Field(..., description="Current number of memories in pool")
utilization_percentage: float = Field(..., description="Percentage of capacity used")
focus_mode: FocusMode = Field(..., description="Focus mode configuration")
performance_metrics: PerformanceMetrics = Field(..., description="Performance metrics")
category_distribution: Dict[str, int] = Field(default_factory=dict, description="Memory count by category")
last_optimization: str = Field(..., description="ISO timestamp of last optimization")
optimization_count: int = Field(..., description="Total number of optimizations performed")
class InitializeRequest(BaseModel):
"""Request model for initializing working memory"""
capacity: int = Field(
100,
ge=10,
le=1000,
description="Maximum number of memories in working pool"
)
focus_threshold: float = Field(
0.7,
ge=0.0,
le=1.0,
description="Relevance threshold for focus mode"
)
class InitializeResponse(BaseModel):
"""Response model for initialization"""
success: bool = Field(..., description="Whether initialization was successful")
message: str = Field(..., description="Status message")
configuration: Dict[str, Any] = Field(..., description="Applied configuration")
class MemoryItem(BaseModel):
"""Model for a memory in the working pool"""
memory_id: str = Field(..., description="Unique memory identifier")
content: str = Field(..., description="Memory content")
category: str = Field(..., description="Memory category")
importance: float = Field(..., description="Importance score (0-10)")
relevance_score: float = Field(..., description="Current relevance score (0-1)")
added_at: float = Field(..., description="Timestamp when added to working memory")
last_accessed: float = Field(..., description="Timestamp of last access")
access_count: int = Field(..., description="Number of times accessed")
class ActiveMemoriesResponse(BaseModel):
"""Response for active memories query"""
memories: List[MemoryItem] = Field(..., description="List of active memories sorted by relevance")
total_count: int = Field(..., description="Total number of memories matching criteria")
focus_active: bool = Field(..., description="Whether focus mode filtering is active")
class SetFocusModeRequest(BaseModel):
"""Request to set focus mode"""
enabled: bool = Field(..., description="Enable or disable focus mode")
keywords: List[str] = Field(default_factory=list, description="Keywords for focus filtering", max_items=20)
class OptimizeResponse(BaseModel):
"""Response for optimization operation"""
success: bool = Field(..., description="Whether optimization was successful")
removed_count: int = Field(..., description="Number of memories removed")
message: str = Field(..., description="Optimization result message")
# ---------- Performance Profiler Models ----------
class PerformanceOverviewStats(BaseModel):
"""Overall performance statistics"""
total_actions: int = Field(..., description="Total number of actions executed")
active_workflows: int = Field(..., description="Number of unique workflows")
avg_execution_time: float = Field(..., description="Average execution time in seconds")
min_execution_time: Optional[float] = Field(None, description="Minimum execution time")
max_execution_time: Optional[float] = Field(None, description="Maximum execution time")
successful_actions: int = Field(..., description="Number of successful actions")
failed_actions: int = Field(..., description="Number of failed actions")
tools_used: int = Field(..., description="Number of distinct tools used")
success_rate_percentage: float = Field(..., description="Success rate as percentage")
throughput_per_hour: float = Field(..., description="Actions processed per hour")
error_rate_percentage: float = Field(..., description="Error rate as percentage")
avg_workflow_size: float = Field(..., description="Average actions per workflow")
class TimelineBucket(BaseModel):
"""Performance metrics for a time bucket"""
time_bucket: str = Field(..., description="Time bucket identifier")
action_count: int = Field(..., description="Number of actions in this bucket")
avg_duration: Optional[float] = Field(None, description="Average duration in seconds")
successful_count: int = Field(..., description="Number of successful actions")
failed_count: int = Field(..., description="Number of failed actions")
workflow_count: int = Field(..., description="Number of unique workflows")
class ToolUtilization(BaseModel):
"""Tool utilization metrics"""
tool_name: str = Field(..., description="Name of the tool")
usage_count: int = Field(..., description="Number of times used")
avg_duration: Optional[float] = Field(None, description="Average execution duration")
success_count: int = Field(..., description="Number of successful executions")
max_duration: Optional[float] = Field(None, description="Maximum execution duration")
class Bottleneck(BaseModel):
"""Performance bottleneck information"""
tool_name: str = Field(..., description="Tool causing the bottleneck")
workflow_id: Optional[str] = Field(None, description="Associated workflow")
action_id: str = Field(..., description="Action identifier")
started_at: float = Field(..., description="Start timestamp")
completed_at: Optional[float] = Field(None, description="Completion timestamp")
duration: float = Field(..., description="Duration in seconds")
status: str = Field(..., description="Action status")
reasoning: Optional[str] = Field(None, description="Action reasoning")
class PerformanceOverviewResponse(BaseModel):
"""Response model for performance overview"""
overview: PerformanceOverviewStats
timeline: List[TimelineBucket]
tool_utilization: List[ToolUtilization]
bottlenecks: List[Bottleneck]
analysis_period: Dict[str, Any] = Field(..., description="Analysis period information")
timestamp: str = Field(..., description="Response generation timestamp")
class ToolBottleneck(BaseModel):
"""Tool performance bottleneck analysis"""
tool_name: str = Field(..., description="Name of the tool")
total_calls: int = Field(..., description="Total number of calls")
avg_duration: float = Field(..., description="Average execution duration")
max_duration: float = Field(..., description="Maximum execution duration")
min_duration: float = Field(..., description="Minimum execution duration")
p95_duration: float = Field(..., description="95th percentile duration")
p99_duration: float = Field(..., description="99th percentile duration")
failure_count: int = Field(..., description="Number of failures")
total_time_spent: float = Field(..., description="Total time spent in seconds")
class WorkflowBottleneck(BaseModel):
"""Workflow performance bottleneck"""
workflow_id: str = Field(..., description="Workflow identifier")
title: Optional[str] = Field(None, description="Workflow title")
action_count: int = Field(..., description="Number of actions")
avg_action_duration: float = Field(..., description="Average action duration")
max_action_duration: float = Field(..., description="Maximum action duration")
total_workflow_time: float = Field(..., description="Total workflow execution time")
workflow_start: float = Field(..., description="Workflow start timestamp")
workflow_end: float = Field(..., description="Workflow end timestamp")
total_elapsed_time: float = Field(..., description="Total elapsed wall-clock time")
class ParallelizationOpportunity(BaseModel):
"""Workflow parallelization opportunity"""
workflow_id: str = Field(..., description="Workflow identifier")
sequential_actions: int = Field(..., description="Number of sequential actions")
total_sequential_time: float = Field(..., description="Total sequential execution time")
actual_elapsed_time: float = Field(..., description="Actual elapsed time")
potential_time_savings: float = Field(..., description="Potential time savings in seconds")
parallelization_efficiency: float = Field(..., description="Current parallelization efficiency percentage")
optimization_score: float = Field(..., description="Optimization potential score (0-10)")
class ResourceContention(BaseModel):
"""Resource contention analysis"""
tool_name: str = Field(..., description="Tool name")
concurrent_usage: int = Field(..., description="Number of concurrent usages")
avg_duration_under_contention: float = Field(..., description="Average duration when contended")
class OptimizationRecommendation(BaseModel):
"""Performance optimization recommendation"""
type: str = Field(..., description="Type of optimization")
priority: str = Field(..., description="Priority level (high, medium, low)")
title: str = Field(..., description="Recommendation title")
description: str = Field(..., description="Detailed description")
impact: str = Field(..., description="Expected impact description")
actions: List[str] = Field(..., description="Recommended actions to take")
class BottleneckAnalysisResponse(BaseModel):
"""Response model for bottleneck analysis"""
tool_bottlenecks: List[ToolBottleneck]
workflow_bottlenecks: List[WorkflowBottleneck]
parallelization_opportunities: List[ParallelizationOpportunity]
resource_contention: List[ResourceContention]
recommendations: List[OptimizationRecommendation]
analysis_summary: Dict[str, Any]
timestamp: str
class FlameGraphNode(BaseModel):
"""Model for a flame graph node"""
name: str = Field(..., description="Name of the node (workflow, tool, or action)")
value: float = Field(..., description="Duration in seconds")
children: List['FlameGraphNode'] = Field(default_factory=list, description="Child nodes")
action_id: Optional[str] = Field(None, description="Action ID if this is an action node")
status: Optional[str] = Field(None, description="Execution status")
reasoning: Optional[str] = Field(None, description="Reasoning for the action")
started_at: Optional[float] = Field(None, description="Start timestamp")
completed_at: Optional[float] = Field(None, description="Completion timestamp")
FlameGraphNode.model_rebuild() # Needed for recursive model
class CriticalPathAction(BaseModel):
"""Model for a critical path action"""
action_id: str = Field(..., description="Action identifier")
tool_name: str = Field(..., description="Tool used for the action")
duration: float = Field(..., description="Duration in seconds")
start_time: float = Field(..., description="Start timestamp")
end_time: float = Field(..., description="End timestamp")
class WorkflowMetrics(BaseModel):
"""Workflow performance metrics"""
total_actions: int = Field(..., description="Total number of actions in workflow")
total_cpu_time: float = Field(..., description="Total CPU time (sum of all action durations)")
wall_clock_time: float = Field(..., description="Total wall clock time from start to end")
parallelization_efficiency: float = Field(..., description="Efficiency percentage (0-100)")
avg_action_duration: float = Field(..., description="Average duration per action")
workflow_start: float = Field(..., description="Workflow start timestamp")
workflow_end: float = Field(..., description="Workflow end timestamp")
class WorkflowAnalysis(BaseModel):
"""Analysis results for workflow optimization"""
bottleneck_tool: Optional[str] = Field(None, description="Tool causing the main bottleneck")
parallelization_potential: float = Field(..., description="Potential time savings through parallelization")
optimization_score: float = Field(..., description="Overall optimization score (0-10)")
class FlameGraphResponse(BaseModel):
"""Response model for flame graph generation"""
flame_graph: FlameGraphNode = Field(..., description="Hierarchical flame graph data")
metrics: WorkflowMetrics = Field(..., description="Workflow performance metrics")
critical_path: List[CriticalPathAction] = Field(..., description="Critical path through the workflow")
analysis: WorkflowAnalysis = Field(..., description="Workflow optimization analysis")
timestamp: str = Field(..., description="Response generation timestamp")
class DailyTrend(BaseModel):
"""Model for daily performance metrics"""
date: str = Field(..., description="Date in YYYY-MM-DD format")
action_count: int = Field(..., description="Number of actions executed")
avg_duration: Optional[float] = Field(None, description="Average action duration in seconds")
success_rate: float = Field(..., description="Success rate percentage (0-100)")
throughput: float = Field(..., description="Actions per hour")
error_rate: float = Field(..., description="Error rate percentage (0-100)")
successful_actions: int = Field(..., description="Number of successful actions")
failed_actions: int = Field(..., description="Number of failed actions")
workflow_count: int = Field(..., description="Number of unique workflows")
tool_count: int = Field(..., description="Number of unique tools used")
class ToolTrend(BaseModel):
"""Model for tool-specific performance trends"""
tool_name: str = Field(..., description="Name of the tool")
date: str = Field(..., description="Date in YYYY-MM-DD format")
usage_count: int = Field(..., description="Number of times used")
avg_duration: Optional[float] = Field(None, description="Average execution duration")
success_count: int = Field(..., description="Number of successful executions")
class WorkflowComplexityTrend(BaseModel):
"""Model for workflow complexity trends"""
date: str = Field(..., description="Date in YYYY-MM-DD format")
workflow_id: str = Field(..., description="Workflow identifier")
action_count: int = Field(..., description="Number of actions in workflow")
total_duration: Optional[float] = Field(None, description="Total workflow duration")
elapsed_time: Optional[float] = Field(None, description="Wall clock time")
class TrendAnalysis(BaseModel):
"""Trend analysis results"""
performance_trend: str = Field(..., description="Overall performance trend (improving/degrading/stable/insufficient_data)")
success_trend: str = Field(..., description="Success rate trend (improving/degrading/stable/insufficient_data)")
data_points: int = Field(..., description="Number of data points analyzed")
analysis_period_days: int = Field(..., description="Analysis period in days")
class InsightMetrics(BaseModel):
"""Performance insight metrics"""
best_performing_day: Optional[DailyTrend] = Field(None, description="Day with best performance")
worst_performing_day: Optional[DailyTrend] = Field(None, description="Day with worst performance")
peak_throughput_day: Optional[DailyTrend] = Field(None, description="Day with highest throughput")
avg_daily_actions: float = Field(..., description="Average actions per day")
class PerformanceTrendsResponse(BaseModel):
"""Response model for performance trends analysis"""
daily_trends: List[DailyTrend] = Field(..., description="Daily performance metrics")
tool_trends: List[ToolTrend] = Field(..., description="Tool-specific performance trends")
workflow_complexity: List[WorkflowComplexityTrend] = Field(..., description="Workflow complexity trends")
trend_analysis: TrendAnalysis = Field(..., description="Overall trend analysis")
patterns: List[PerformancePattern] = Field(..., description="Detected performance patterns")
insights: InsightMetrics = Field(..., description="Key performance insights")
timestamp: str = Field(..., description="Response generation timestamp")
class ImpactEstimate(BaseModel):
"""Model for recommendation impact estimates"""
time_savings_potential: float = Field(..., description="Estimated time savings in seconds")
affected_actions: int = Field(..., description="Number of actions that would benefit")
cost_benefit_ratio: float = Field(..., description="Ratio of benefit to implementation cost")
affected_workflows: Optional[int] = Field(None, description="Number of affected workflows")
efficiency_improvement: Optional[float] = Field(None, description="Percentage efficiency improvement")
reliability_improvement: Optional[float] = Field(None, description="Percentage reliability improvement")
user_experience_impact: Optional[str] = Field(None, description="Impact on user experience (high/medium/low)")
class PerformanceRecommendation(BaseModel):
"""Model for a single performance recommendation"""
id: str = Field(..., description="Unique identifier for the recommendation")
type: str = Field(..., description="Type of recommendation (tool_optimization, parallelization, reliability_improvement)")
priority: str = Field(..., description="Priority level (high, medium, low)")
title: str = Field(..., description="Brief title of the recommendation")
description: str = Field(..., description="Detailed description of the issue and recommendation")
impact_estimate: ImpactEstimate = Field(..., description="Estimated impact of implementing this recommendation")
implementation_steps: List[str] = Field(..., description="Step-by-step implementation guide")
estimated_effort: str = Field(..., description="Estimated implementation effort (low, medium, high)")
prerequisites: List[str] = Field(..., description="Prerequisites for implementation")
metrics_to_track: List[str] = Field(..., description="Metrics to track after implementation")
class RecommendationSummary(BaseModel):
"""Summary statistics for recommendations"""
total_recommendations: int = Field(..., description="Total number of recommendations generated")
high_priority: int = Field(..., description="Number of high priority recommendations")
medium_priority: int = Field(..., description="Number of medium priority recommendations")
low_priority: int = Field(..., description="Number of low priority recommendations")
estimated_total_savings: float = Field(..., description="Total estimated time savings in seconds")
analysis_period_hours: int = Field(..., description="Hours of data analyzed")
class ImplementationRoadmap(BaseModel):
"""Categorized implementation roadmap"""
quick_wins: List[PerformanceRecommendation] = Field(..., description="Low effort, high impact recommendations")
major_improvements: List[PerformanceRecommendation] = Field(..., description="High effort, high impact recommendations")
maintenance_tasks: List[PerformanceRecommendation] = Field(..., description="Low priority maintenance recommendations")
class PerformanceRecommendationsResponse(BaseModel):
"""Response model for performance recommendations"""
recommendations: List[PerformanceRecommendation] = Field(..., description="List of actionable recommendations")
summary: RecommendationSummary = Field(..., description="Summary statistics")
implementation_roadmap: ImplementationRoadmap = Field(..., description="Recommendations organized by implementation strategy")
timestamp: str = Field(..., description="ISO timestamp of analysis")
# ---------- Workflow Management Models ----------
class WorkflowScheduleRequest(BaseModel):
"""Request model for scheduling a workflow"""
scheduled_at: datetime = Field(
...,
description="ISO timestamp for when to execute the workflow",
example="2024-01-01T12:00:00Z"
)
priority: int = Field(
default=5,
ge=1,
le=10,
description="Execution priority (1=highest, 10=lowest)",
example=3
)
class ScheduleData(BaseModel):
"""Schedule data for the workflow"""
workflow_id: str = Field(..., description="ID of the scheduled workflow")
scheduled_at: str = Field(..., description="Scheduled execution time")
priority: int = Field(..., description="Execution priority")
status: str = Field(..., description="Schedule status")
created_at: str = Field(..., description="When the schedule was created")
class WorkflowScheduleResponse(BaseModel):
"""Response model for workflow scheduling"""
success: bool = Field(..., description="Whether scheduling was successful")
schedule_id: str = Field(..., description="Unique identifier for this schedule")
message: str = Field(..., description="Success or error message")
schedule_data: ScheduleData = Field(..., description="Details of the created schedule")
class RestoreStateRequest(BaseModel):
"""Request model for restoring a cognitive state"""
restore_mode: str = Field(
default="full",
regex="^(full|partial|snapshot)$",
description="Type of restoration to perform",
example="full"
)
class RestoreData(BaseModel):
"""Restoration data"""
state_id: str = Field(..., description="ID of the state being restored")
restore_mode: str = Field(..., description="Restoration mode used")
restored_at: str = Field(..., description="When the restoration occurred")
original_timestamp: Optional[float] = Field(None, description="Original state timestamp")
class RestoreStateResponse(BaseModel):
"""Response model for state restoration"""
success: bool = Field(..., description="Whether restoration was successful")
message: str = Field(..., description="Success or error message")
restore_data: RestoreData = Field(..., description="Details of the restoration")
# ---------- Health Check Models ----------
class HealthResponse(BaseModel):
"""Health check response"""
status: str = Field(..., description="Health status indicator", example="ok")
version: str = Field(..., description="Server version string", example="0.1.0")
# ---------- Performance Trends Models ----------
class PerformancePattern(BaseModel):
"""Detected performance pattern"""
type: str = Field(..., description="Type of pattern detected")
description: str = Field(..., description="Description of the pattern")
impact: str = Field(..., description="Impact level (high/medium/low)")
recommendation: str = Field(..., description="Recommended action")
date: Optional[str] = Field(None, description="Date of occurrence for anomalies")
```