This is page 16 of 35. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│ ├── __init__.py
│ ├── advanced_agent_flows_using_unified_memory_system_demo.py
│ ├── advanced_extraction_demo.py
│ ├── advanced_unified_memory_system_demo.py
│ ├── advanced_vector_search_demo.py
│ ├── analytics_reporting_demo.py
│ ├── audio_transcription_demo.py
│ ├── basic_completion_demo.py
│ ├── cache_demo.py
│ ├── claude_integration_demo.py
│ ├── compare_synthesize_demo.py
│ ├── cost_optimization.py
│ ├── data
│ │ ├── sample_event.txt
│ │ ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│ │ └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│ ├── docstring_refiner_demo.py
│ ├── document_conversion_and_processing_demo.py
│ ├── entity_relation_graph_demo.py
│ ├── filesystem_operations_demo.py
│ ├── grok_integration_demo.py
│ ├── local_text_tools_demo.py
│ ├── marqo_fused_search_demo.py
│ ├── measure_model_speeds.py
│ ├── meta_api_demo.py
│ ├── multi_provider_demo.py
│ ├── ollama_integration_demo.py
│ ├── prompt_templates_demo.py
│ ├── python_sandbox_demo.py
│ ├── rag_example.py
│ ├── research_workflow_demo.py
│ ├── sample
│ │ ├── article.txt
│ │ ├── backprop_paper.pdf
│ │ ├── buffett.pdf
│ │ ├── contract_link.txt
│ │ ├── legal_contract.txt
│ │ ├── medical_case.txt
│ │ ├── northwind.db
│ │ ├── research_paper.txt
│ │ ├── sample_data.json
│ │ └── text_classification_samples
│ │ ├── email_classification.txt
│ │ ├── news_samples.txt
│ │ ├── product_reviews.txt
│ │ └── support_tickets.txt
│ ├── sample_docs
│ │ └── downloaded
│ │ └── attention_is_all_you_need.pdf
│ ├── sentiment_analysis_demo.py
│ ├── simple_completion_demo.py
│ ├── single_shot_synthesis_demo.py
│ ├── smart_browser_demo.py
│ ├── sql_database_demo.py
│ ├── sse_client_demo.py
│ ├── test_code_extraction.py
│ ├── test_content_detection.py
│ ├── test_ollama.py
│ ├── text_classification_demo.py
│ ├── text_redline_demo.py
│ ├── tool_composition_examples.py
│ ├── tournament_code_demo.py
│ ├── tournament_text_demo.py
│ ├── unified_memory_system_demo.py
│ ├── vector_search_demo.py
│ ├── web_automation_instruction_packs.py
│ └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│ └── smart_browser_internal
│ ├── locator_cache.db
│ ├── readability.js
│ └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── integration
│ │ ├── __init__.py
│ │ └── test_server.py
│ ├── manual
│ │ ├── test_extraction_advanced.py
│ │ └── test_extraction.py
│ └── unit
│ ├── __init__.py
│ ├── test_cache.py
│ ├── test_providers.py
│ └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│ ├── __init__.py
│ ├── __main__.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── __main__.py
│ │ ├── commands.py
│ │ ├── helpers.py
│ │ └── typer_cli.py
│ ├── clients
│ │ ├── __init__.py
│ │ ├── completion_client.py
│ │ └── rag_client.py
│ ├── config
│ │ └── examples
│ │ └── filesystem_config.yaml
│ ├── config.py
│ ├── constants.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── evaluation
│ │ │ ├── base.py
│ │ │ └── evaluators.py
│ │ ├── providers
│ │ │ ├── __init__.py
│ │ │ ├── anthropic.py
│ │ │ ├── base.py
│ │ │ ├── deepseek.py
│ │ │ ├── gemini.py
│ │ │ ├── grok.py
│ │ │ ├── ollama.py
│ │ │ ├── openai.py
│ │ │ └── openrouter.py
│ │ ├── server.py
│ │ ├── state_store.py
│ │ ├── tournaments
│ │ │ ├── manager.py
│ │ │ ├── tasks.py
│ │ │ └── utils.py
│ │ └── ums_api
│ │ ├── __init__.py
│ │ ├── ums_database.py
│ │ ├── ums_endpoints.py
│ │ ├── ums_models.py
│ │ └── ums_services.py
│ ├── exceptions.py
│ ├── graceful_shutdown.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── analytics
│ │ │ ├── __init__.py
│ │ │ ├── metrics.py
│ │ │ └── reporting.py
│ │ ├── cache
│ │ │ ├── __init__.py
│ │ │ ├── cache_service.py
│ │ │ ├── persistence.py
│ │ │ ├── strategies.py
│ │ │ └── utils.py
│ │ ├── cache.py
│ │ ├── document.py
│ │ ├── knowledge_base
│ │ │ ├── __init__.py
│ │ │ ├── feedback.py
│ │ │ ├── manager.py
│ │ │ ├── rag_engine.py
│ │ │ ├── retriever.py
│ │ │ └── utils.py
│ │ ├── prompts
│ │ │ ├── __init__.py
│ │ │ ├── repository.py
│ │ │ └── templates.py
│ │ ├── prompts.py
│ │ └── vector
│ │ ├── __init__.py
│ │ ├── embeddings.py
│ │ └── vector_service.py
│ ├── tool_token_counter.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── audio_transcription.py
│ │ ├── base.py
│ │ ├── completion.py
│ │ ├── docstring_refiner.py
│ │ ├── document_conversion_and_processing.py
│ │ ├── enhanced-ums-lookbook.html
│ │ ├── entity_relation_graph.py
│ │ ├── excel_spreadsheet_automation.py
│ │ ├── extraction.py
│ │ ├── filesystem.py
│ │ ├── html_to_markdown.py
│ │ ├── local_text_tools.py
│ │ ├── marqo_fused_search.py
│ │ ├── meta_api_tool.py
│ │ ├── ocr_tools.py
│ │ ├── optimization.py
│ │ ├── provider.py
│ │ ├── pyodide_boot_template.html
│ │ ├── python_sandbox.py
│ │ ├── rag.py
│ │ ├── redline-compiled.css
│ │ ├── sentiment_analysis.py
│ │ ├── single_shot_synthesis.py
│ │ ├── smart_browser.py
│ │ ├── sql_databases.py
│ │ ├── text_classification.py
│ │ ├── text_redline_tools.py
│ │ ├── tournament.py
│ │ ├── ums_explorer.html
│ │ └── unified_memory_system.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── async_utils.py
│ │ ├── display.py
│ │ ├── logging
│ │ │ ├── __init__.py
│ │ │ ├── console.py
│ │ │ ├── emojis.py
│ │ │ ├── formatter.py
│ │ │ ├── logger.py
│ │ │ ├── panels.py
│ │ │ ├── progress.py
│ │ │ └── themes.py
│ │ ├── parse_yaml.py
│ │ ├── parsing.py
│ │ ├── security.py
│ │ └── text.py
│ └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/examples/unified_memory_system_demo.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
import asyncio
import sys
import time
import traceback
from pathlib import Path
from typing import Any, Dict, Optional
def _fmt_id(val: Any, length: int = 8) -> str:
"""Return a short id string safe for logs."""
s = str(val) if val is not None else "?"
# Ensure slicing doesn't go out of bounds if string is shorter than length
return s[: min(length, len(s))]
# --- Project Setup ---
# Add project root to path for imports when running as script
# Adjust this path if your script location relative to the project root differs
try:
SCRIPT_DIR = Path(__file__).resolve().parent
# Navigate up until we find a directory likely containing the project modules
PROJECT_ROOT = SCRIPT_DIR
while (
not (PROJECT_ROOT / "ultimate_mcp_server").is_dir()
and not (PROJECT_ROOT / "ultimate_mcp_client").is_dir()
and PROJECT_ROOT.parent != PROJECT_ROOT
): # Prevent infinite loop
PROJECT_ROOT = PROJECT_ROOT.parent
if (
not (PROJECT_ROOT / "ultimate_mcp_server").is_dir()
and not (PROJECT_ROOT / "ultimate_mcp_client").is_dir()
):
print(
f"Error: Could not reliably determine project root from {SCRIPT_DIR}.", file=sys.stderr
)
# Fallback: Add script dir anyway, maybe it's flat structure
if str(SCRIPT_DIR) not in sys.path:
sys.path.insert(0, str(SCRIPT_DIR))
print(
f"Warning: Added script directory {SCRIPT_DIR} to path as fallback.",
file=sys.stderr,
)
else:
sys.exit(1) # Give up if markers not found after traversing up
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
except Exception as e:
print(f"Error setting up sys.path: {e}", file=sys.stderr)
sys.exit(1)
from rich.console import Console # noqa: E402
from rich.markup import escape # noqa: E402
from rich.panel import Panel # noqa: E402
from rich.pretty import pretty_repr # noqa: E402
from rich.rule import Rule # noqa: E402
from rich.traceback import install as install_rich_traceback # noqa: E402
from ultimate_mcp_server.config import get_config # noqa: E402
# Tools and related components from unified_memory
from ultimate_mcp_server.tools.unified_memory_system import ( # noqa: E402
ActionStatus,
ActionType,
ArtifactType,
# Utilities/Enums/Exceptions needed
DBConnection,
LinkType,
MemoryLevel,
MemoryType,
ThoughtType,
ToolError,
ToolInputError,
# Action Dependency Tools
add_action_dependency,
auto_update_focus,
compute_memory_statistics,
consolidate_memories,
create_memory_link,
# Thought
create_thought_chain,
# Workflow
create_workflow,
delete_expired_memories,
focus_memory,
generate_reflection,
generate_workflow_report,
get_action_dependencies,
get_action_details,
get_artifact_by_id,
get_artifacts,
get_linked_memories,
get_memory_by_id,
get_recent_actions,
get_thought_chain,
get_workflow_context,
get_workflow_details,
# Working Memory / State
get_working_memory,
hybrid_search_memories,
# Initialization
initialize_memory_system,
list_workflows,
load_cognitive_state,
optimize_working_memory,
promote_memory_level,
query_memories,
record_action_completion,
# Action
record_action_start,
# Artifacts
record_artifact,
record_thought,
save_cognitive_state,
search_semantic_memories,
# Core Memory
store_memory,
summarize_text,
update_memory,
update_workflow_status,
visualize_memory_network,
visualize_reasoning_chain,
)
# Utilities from the project
from ultimate_mcp_server.utils import get_logger # noqa: E402
console = Console()
logger = get_logger("demo.unified_memory")
config = get_config() # Load config to ensure provider keys might be found
install_rich_traceback(show_locals=False, width=console.width)
DEMO_DB_FILE: Optional[str] = config.agent_memory.db_path # Global to hold the DB path being used
async def safe_tool_call(func, args: Dict, description: str, suppress_output: bool = False):
"""Helper to call a tool function, catch errors, and display results."""
display_title = not suppress_output
display_args = not suppress_output
display_result_panel = not suppress_output
if display_title:
title = f"DEMO: {description}"
console.print(Rule(f"[bold blue]{escape(title)}[/bold blue]", style="blue"))
if display_args:
if args:
console.print(f"[dim]Calling [bold cyan]{func.__name__}[/] with args:[/]")
try:
# Filter out db_path if it matches the global default for cleaner logs
args_to_print = {
k: v for k, v in args.items() if k != "db_path" or v != DEMO_DB_FILE
}
args_repr = pretty_repr(args_to_print, max_length=120, max_string=100)
except Exception:
args_repr = str(args)[:300]
console.print(args_repr)
else:
console.print(f"[dim]Calling [bold cyan]{func.__name__}[/] (no arguments)[/]")
start_time = time.monotonic()
result = None
try:
# Use the global DEMO_DB_FILE if db_path isn't explicitly in args
if "db_path" not in args and DEMO_DB_FILE:
args["db_path"] = DEMO_DB_FILE
result = await func(**args)
processing_time = time.monotonic() - start_time
log_func = getattr(logger, "debug", print)
log_func(f"Tool '{func.__name__}' execution time: {processing_time:.4f}s")
if display_result_panel:
success = isinstance(result, dict) and result.get("success", False)
panel_title = f"[bold {'green' if success else 'yellow'}]Result: {func.__name__} {'✅' if success else '❔'}[/]"
panel_border = "green" if success else "yellow"
try:
# Handle specific large/complex outputs
if func.__name__ == "generate_workflow_report" and result.get("report"):
report_preview = str(result["report"])[:500] + (
"..." if len(str(result["report"])) > 500 else ""
)
result_repr = f"Report Format: {result.get('format')}\nStyle: {result.get('style_used')}\nPreview:\n---\n{report_preview}\n---"
elif func.__name__ in [
"visualize_reasoning_chain",
"visualize_memory_network",
] and result.get("visualization"):
viz_preview = str(result["visualization"])[:500] + (
"..." if len(str(result["visualization"])) > 500 else ""
)
result_repr = f"Visualization Format: {result.get('format')}\nContent Preview:\n---\n{viz_preview}\n---"
elif func.__name__ == "summarize_text" and result.get("summary"):
summary_preview = str(result["summary"])[:500] + (
"..." if len(str(result["summary"])) > 500 else ""
)
result_repr = f"Summary Preview:\n---\n{summary_preview}\n---"
elif func.__name__ == "consolidate_memories" and result.get("consolidated_content"):
content_preview = str(result["consolidated_content"])[:500] + (
"..." if len(str(result["consolidated_content"])) > 500 else ""
)
result_repr = f"Consolidated Content Preview:\n---\n{content_preview}\n---"
elif func.__name__ == "generate_reflection" and result.get("content"):
content_preview = str(result["content"])[:500] + (
"..." if len(str(result["content"])) > 500 else ""
)
result_repr = f"Reflection Content Preview:\n---\n{content_preview}\n---"
else:
result_repr = pretty_repr(result, max_length=200, max_string=150)
except Exception:
result_repr = f"(Could not represent result of type {type(result)} fully)\n{str(result)[:500]}"
console.print(
Panel(
escape(result_repr), title=panel_title, border_style=panel_border, expand=False
)
)
return result
except (ToolInputError, ToolError) as e:
processing_time = time.monotonic() - start_time
log_func_error = getattr(logger, "error", print)
log_func_error(f"Tool '{func.__name__}' failed: {e}", exc_info=False)
if display_result_panel:
error_title = f"[bold red]Error: {func.__name__} Failed ❌[/]"
error_content = f"[bold red]{type(e).__name__}:[/] {escape(str(e))}"
details = None
if hasattr(e, "details") and e.details:
details = e.details
elif hasattr(e, "context") and e.context:
details = e.context
if details:
try:
details_repr = pretty_repr(details)
except Exception:
details_repr = str(details)
error_content += f"\n\n[yellow]Details:[/]\n{escape(details_repr)}"
console.print(Panel(error_content, title=error_title, border_style="red", expand=False))
return {
"success": False,
"error": str(e),
"error_code": getattr(e, "error_code", "TOOL_ERROR"),
"error_type": type(e).__name__,
"details": details or {},
"isError": True,
}
except Exception as e:
processing_time = time.monotonic() - start_time
log_func_critical = getattr(logger, "critical", print)
log_func_critical(f"Unexpected error calling '{func.__name__}': {e}", exc_info=True)
if display_result_panel:
console.print(f"\n[bold red]CRITICAL UNEXPECTED ERROR in {func.__name__}:[/bold red]")
console.print_exception(show_locals=False)
return {
"success": False,
"error": f"Unexpected: {str(e)}",
"error_code": "UNEXPECTED_ERROR",
"error_type": type(e).__name__,
"details": {"traceback": traceback.format_exc()},
"isError": True,
}
finally:
if display_title:
console.print()
# --- Demo Setup & Teardown ---
async def setup_demo_environment():
"""Initialize the memory system using the DEFAULT database file."""
global DEMO_DB_FILE
DEMO_DB_FILE = config.agent_memory.db_path
log_func_info = getattr(logger, "info", print)
log_func_info(f"Using default database for demo: {DEMO_DB_FILE}")
console.print(
Panel(
f"Using default database: [cyan]{DEMO_DB_FILE}[/]\n"
f"[yellow]NOTE:[/yellow] This demo will operate on the actual development database.",
title="Demo Setup",
border_style="yellow",
)
)
init_result = await safe_tool_call(
initialize_memory_system,
{"db_path": DEMO_DB_FILE},
"Initialize Memory System",
)
if not init_result or not init_result.get("success"):
console.print(
"[bold red]CRITICAL:[/bold red] Failed to initialize memory system. Aborting demo."
)
console.print(
"[yellow]Check DB access and potentially API key configuration/network if init requires them.[/yellow]"
)
await cleanup_demo_environment()
sys.exit(1)
async def cleanup_demo_environment():
"""Close DB connection."""
global DEMO_DB_FILE
log_func_info = getattr(logger, "info", print)
log_func_warn = getattr(logger, "warning", print)
try:
await DBConnection.close_connection()
log_func_info("Closed database connection.")
except Exception as e:
log_func_warn(f"Error closing DB connection during cleanup: {e}")
if DEMO_DB_FILE:
log_func_info(f"Demo finished using database: {DEMO_DB_FILE}")
console.print(f"Demo finished using database: [dim]{DEMO_DB_FILE}[/dim]")
DEMO_DB_FILE = None
# --- Individual Demo Sections ---
# (Keep existing sections 1-8 as they are)
async def demonstrate_basic_workflows():
"""Demonstrate basic workflow CRUD and listing operations."""
console.print(Rule("[bold green]1. Basic Workflow Operations[/bold green]", style="green"))
# Create
create_args = {
"title": "Enhanced WF Demo",
"goal": "Demonstrate core workflow, action, artifact, and memory linking.",
"tags": ["enhanced", "demo", "core"],
}
wf_result = await safe_tool_call(create_workflow, create_args, "Create Enhanced Workflow")
wf_id = wf_result.get("workflow_id") if wf_result.get("success") else None
if not wf_id:
console.print("[bold red]CRITICAL DEMO FAILURE:[/bold red] Failed to create workflow. Cannot continue basic workflow demo.")
return None # Return None to signal failure
# Get Details
await safe_tool_call(
get_workflow_details, {"workflow_id": wf_id}, f"Get Workflow Details ({_fmt_id(wf_id)})"
)
# List (should show the one we created)
await safe_tool_call(list_workflows, {"limit": 5}, "List Workflows (Limit 5)")
# List Filtered by Tag
await safe_tool_call(list_workflows, {"tag": "enhanced"}, "List Workflows Tagged 'enhanced'")
# Update Status (to active for subsequent steps)
await safe_tool_call(
update_workflow_status,
{"workflow_id": wf_id, "status": "active"},
f"Ensure Workflow Status is Active ({_fmt_id(wf_id)})",
)
return wf_id
async def demonstrate_basic_actions(wf_id: Optional[str]):
"""Demonstrate basic action recording, completion, and retrieval."""
console.print(Rule("[bold green]2. Basic Action Operations[/bold green]", style="green"))
if not wf_id:
console.print("[yellow]Skipping action demo: No valid workflow ID provided.[/yellow]")
return {} # Return empty dict
action_ids = {}
# Record Action 1 Start (e.g., Planning)
start_args_1 = {
"workflow_id": wf_id,
"action_type": ActionType.PLANNING.value,
"reasoning": "Initial planning phase for the enhanced demo.",
"title": "Plan Demo Steps",
"tags": ["planning"],
}
action_res_1 = await safe_tool_call(
record_action_start, start_args_1, "Record Action 1 Start (Planning)"
)
action_id_1 = action_res_1.get("action_id") if action_res_1 and action_res_1.get("success") else None # More robust check
if not action_id_1:
console.print("[bold red]CRITICAL DEMO FAILURE:[/bold red] Failed to record start for Action 1. Aborting action demo.")
return {} # Return empty dict
action_ids["action1_id"] = action_id_1
# Record Action 1 Completion (Needs action_id_1, which is now checked)
complete_args_1 = {
"action_id": action_id_1,
"status": ActionStatus.COMPLETED.value,
"summary": "Planning complete. Next step: data simulation.",
}
await safe_tool_call(
record_action_completion,
complete_args_1,
f"Record Action 1 Completion ({_fmt_id(action_id_1)})",
)
# Record Action 2 Start (e.g., Tool Use - simulated)
start_args_2 = {
"workflow_id": wf_id,
"action_type": ActionType.TOOL_USE.value,
"reasoning": "Simulating data generation based on the plan.",
"tool_name": "simulate_data",
"tool_args": {"rows": 100, "type": "random"},
"title": "Simulate Demo Data",
"tags": ["data", "simulation"],
"parent_action_id": action_id_1, # Link to planning action
}
action_res_2 = await safe_tool_call(
record_action_start, start_args_2, "Record Action 2 Start (Simulate Data)"
)
action_id_2 = action_res_2.get("action_id") if action_res_2 and action_res_2.get("success") else None # More robust check
if action_id_2:
action_ids["action2_id"] = action_id_2
# Moved inside the 'if action_id_2:' block:
await safe_tool_call(
get_action_details,
{"action_ids": [action_id_1, action_id_2]}, # Both IDs are valid here
"Get Action Details (Multiple Actions)",
)
complete_args_2 = {
"action_id": action_id_2, # Now guaranteed to be non-None
"status": ActionStatus.FAILED.value,
"summary": "Simulation failed due to resource limit.",
"tool_result": {"error": "Timeout", "code": 504},
}
await safe_tool_call(
record_action_completion,
complete_args_2,
f"Record Action 2 Completion (Failed - {_fmt_id(action_id_2)})",
)
else:
# Keep the existing else block for handling Action 2 start failure
console.print("[bold red]Action 2 failed to start. Skipping completion and dependency tests involving Action 2.[/bold red]")
# Ensure action_id_2 is not added to the dict if it's None
if "action2_id" in action_ids:
del action_ids["action2_id"]
# Potentially skip dependency demo if action2_id is needed? (The demo logic does skip if action2_id is missing)
# Get Action Details (Only Action 1 if Action 2 failed) - Moved outside check block
if action_id_1 and not action_id_2: # Only fetch Action 1 if Action 2 failed
await safe_tool_call(
get_action_details,
{"action_id": action_id_1},
f"Get Action Details (Action 1 Only - {_fmt_id(action_id_1)})",
)
# Get Recent Actions (should show both)
await safe_tool_call(
get_recent_actions, {"workflow_id": wf_id, "limit": 5}, "Get Recent Actions"
)
return action_ids
async def demonstrate_action_dependencies(wf_id: Optional[str], action_ids: Dict):
"""Demonstrate adding and retrieving action dependencies."""
console.print(Rule("[bold green]3. Action Dependency Operations[/bold green]", style="green"))
if not wf_id:
console.print("[yellow]Skipping dependency demo: No valid workflow ID.[/yellow]")
return
action1_id = action_ids.get("action1_id")
action2_id = action_ids.get("action2_id")
if not action1_id or not action2_id:
console.print("[yellow]Skipping dependency demo: Need at least two valid action IDs.[/yellow]")
return
# Add Dependency (Action 2 requires Action 1)
await safe_tool_call(
add_action_dependency,
{
"source_action_id": action2_id,
"target_action_id": action1_id,
"dependency_type": "requires",
},
f"Add Dependency ({_fmt_id(action2_id)} requires {_fmt_id(action1_id)})",
)
# Get Dependencies for Action 1 (Should show Action 2 depends on it - Downstream)
await safe_tool_call(
get_action_dependencies,
{"action_id": action1_id, "direction": "downstream"},
f"Get Dependencies (Downstream of Action 1 - {_fmt_id(action1_id)})",
)
# Get Dependencies for Action 2 (Should show it depends on Action 1 - Upstream)
await safe_tool_call(
get_action_dependencies,
{"action_id": action2_id, "direction": "upstream", "include_details": True},
f"Get Dependencies (Upstream of Action 2 - {_fmt_id(action2_id)}, with Details)",
)
# Get Action 1 Details (Include Dependencies)
await safe_tool_call(
get_action_details,
{"action_id": action1_id, "include_dependencies": True},
f"Get Action 1 Details ({_fmt_id(action1_id)}), Include Dependencies",
)
async def demonstrate_artifacts(wf_id: Optional[str], action_ids: Dict):
"""Demonstrate artifact recording and retrieval."""
console.print(Rule("[bold green]4. Artifact Operations[/bold green]", style="green"))
if not wf_id:
console.print("[yellow]Skipping artifact demo: No valid workflow ID provided.[/yellow]")
return {} # Return empty dict
action1_id = action_ids.get("action1_id")
action2_id = action_ids.get("action2_id") # May be None if Action 2 failed
artifact_ids = {}
# Record Artifact 1 (e.g., Plan document from Action 1)
artifact_args_1 = {
"workflow_id": wf_id,
"action_id": action1_id,
"name": "demo_plan.txt",
"artifact_type": ArtifactType.FILE.value, # Use enum value
"description": "Initial plan for the demo steps.",
"path": "/path/to/demo_plan.txt",
"content": "Step 1: Plan\nStep 2: Simulate\nStep 3: Analyze", # Small content example
"tags": ["planning", "document"],
"is_output": False,
}
art_res_1 = await safe_tool_call(
record_artifact, artifact_args_1, "Record Artifact 1 (Plan Doc)"
)
art_id_1 = art_res_1.get("artifact_id") if art_res_1 and art_res_1.get("success") else None # Robust check
if not art_id_1:
console.print("[bold red]DEMO WARNING:[/bold red] Failed to record Artifact 1. Subsequent steps needing art1_id might fail.")
# Don't abort, but warn
else:
artifact_ids["art1_id"] = art_id_1
# Record Artifact 2 (e.g., Error log from Action 2)
artifact_args_2 = {
"workflow_id": wf_id,
"action_id": action2_id,
"name": "simulation_error.log",
"artifact_type": ArtifactType.TEXT.value,
"description": "Error log from the failed data simulation.",
"content": "ERROR: Timeout waiting for resource. Code 504.",
"tags": ["error", "log", "simulation"],
}
    art_res_2 = None
    art_id_2 = None
    # Only attempt to record Artifact 2 if Action 2 actually started (action2_id may be None)
if not action2_id:
console.print("[yellow]Skipping Artifact 2 recording: Action 2 ID is not available (likely failed to start).[/yellow]")
else:
# Proceed with recording Artifact 2 only if action2_id exists
artifact_args_2["action_id"] = action2_id # Assign the valid ID
art_res_2 = await safe_tool_call(
record_artifact, artifact_args_2, "Record Artifact 2 (Error Log)"
)
art_id_2 = art_res_2.get("artifact_id") if art_res_2 and art_res_2.get("success") else None
if art_id_2:
artifact_ids["art2_id"] = art_id_2
else:
console.print("[bold red]DEMO WARNING:[/bold red] Failed to record Artifact 2.")
# Get Artifacts (List all for workflow)
await safe_tool_call(
get_artifacts, {"workflow_id": wf_id, "limit": 5}, "Get Artifacts (List for Workflow)"
)
# Get Artifacts (Filter by tag 'planning')
await safe_tool_call(
get_artifacts,
{"workflow_id": wf_id, "tag": "planning"},
"Get Artifacts (Filter by Tag 'planning')",
)
# Get Artifact by ID (Get the plan doc)
if art_id_1:
await safe_tool_call(
get_artifact_by_id,
{"artifact_id": art_id_1, "include_content": True},
f"Get Artifact by ID ({_fmt_id(art_id_1)}, Include Content)",
)
else:
console.print("[yellow]Skipping 'Get Artifact by ID' for Artifact 1 as it failed to record.[/yellow]")
return artifact_ids
async def demonstrate_thoughts_and_linking(
wf_id: Optional[str], action_ids: Dict, artifact_ids: Dict
):
"""Demonstrate thought chains, recording thoughts, and linking them."""
console.print(Rule("[bold green]5. Thought Operations & Linking[/bold green]", style="green"))
if not wf_id:
console.print("[yellow]Skipping thought demo: No valid workflow ID provided.[/yellow]")
return None
action1_id = action_ids.get("action1_id") # noqa: F841
action2_id = action_ids.get("action2_id") # Might be None
art1_id = artifact_ids.get("art1_id") # Might be None if artifact demo failed
# Check if prerequisite artifact exists before linking
if not art1_id:
console.print("[yellow]Skipping thought demo: Planning artifact ID (art1_id) not available.[/yellow]")
return None
# Create a new thought chain
chain_args = {
"workflow_id": wf_id,
"title": "Analysis Thought Chain",
"initial_thought": "Review the plan artifact.",
"initial_thought_type": ThoughtType.PLAN.value,
}
chain_res = await safe_tool_call(
create_thought_chain, chain_args, "Create New Thought Chain (Analysis)"
)
chain_id = chain_res.get("thought_chain_id") if chain_res and chain_res.get("success") else None # Robust check
if not chain_id:
console.print("[bold red]CRITICAL DEMO FAILURE:[/bold red] Failed to create thought chain. Aborting thought demo.")
return None
# Record a thought linked to the plan artifact
thought_args_1 = {
"workflow_id": wf_id,
"thought_chain_id": chain_id,
"content": "The plan seems straightforward but lacks detail on simulation parameters.",
"thought_type": ThoughtType.CRITIQUE.value,
"relevant_artifact_id": art1_id, # Link to the plan artifact
}
thought_res_1 = await safe_tool_call(
record_thought, thought_args_1, "Record Thought (Critique Linked to Artifact)"
)
thought1_id = thought_res_1.get("thought_id") if thought_res_1.get("success") else None
if not thought1_id:
console.print("[bold red]DEMO WARNING:[/bold red] Failed to record thought 1. Subsequent linked thought might fail or be unlinked.")
# Record second thought (depends on action2_id, thought1_id)
if not action2_id:
console.print("[yellow]Skipping recording thought 2: Action 2 ID is missing.[/yellow]")
elif not thought1_id:
console.print("[yellow]Skipping recording thought 2: Thought 1 ID is missing.[/yellow]")
# Record thought 2 without parent link if action2_id exists but thought1_id doesn't?
thought_args_2_no_parent = {
"workflow_id": wf_id,
"thought_chain_id": chain_id,
"content": "The simulation failure needs investigation. Was it transient or configuration?",
"thought_type": ThoughtType.QUESTION.value,
"relevant_action_id": action2_id, # Action 2 ID exists here
}
await safe_tool_call(
record_thought, thought_args_2_no_parent, "Record Thought (Question Linked to Action, NO PARENT)"
)
else:
# Record another thought linked to the failed action
thought_args_2 = {
"workflow_id": wf_id,
"thought_chain_id": chain_id,
"content": "The simulation failure needs investigation. Was it transient or configuration?",
"thought_type": ThoughtType.QUESTION.value,
"relevant_action_id": action_ids.get("action2_id"), # Link to failed action
"parent_thought_id": thought1_id, # Link to previous thought
}
await safe_tool_call(
record_thought, thought_args_2, "Record Thought (Question Linked to Action)"
)
# Get the thought chain details (should show linked thoughts)
await safe_tool_call(
get_thought_chain,
{"thought_chain_id": chain_id},
f"Get Analysis Thought Chain Details ({_fmt_id(chain_id)})",
)
return chain_id
async def demonstrate_memory_operations(wf_id: Optional[str], action_ids: Dict, thought_ids: Dict):
"""Demonstrate memory storage, querying, linking."""
console.print(Rule("[bold green]6. Memory Operations & Querying[/bold green]", style="green"))
if not wf_id:
console.print("[yellow]Skipping memory demo: No valid workflow ID provided.[/yellow]")
return {} # Return empty dict
mem_ids = {}
action1_id = action_ids.get("action1_id") # Might be None # noqa: F841
action2_id = action_ids.get("action2_id") # Might be None # noqa: F841
# Store Memory 1 (Related to Planning Action)
store_args_1 = {
"workflow_id": wf_id,
"action_id": action_ids.get("action1_id"),
"content": "The initial plan involves simulation and analysis.",
"memory_type": MemoryType.SUMMARY.value,
"memory_level": MemoryLevel.EPISODIC.value,
"description": "Summary of initial plan",
"tags": ["planning", "summary"],
"generate_embedding": False, # Set False explicitly for baseline
}
mem_res_1 = await safe_tool_call(store_memory, store_args_1, "Store Memory 1 (Plan Summary)")
mem1_id = mem_res_1.get("memory_id") if mem_res_1.get("success") else None
if mem1_id:
mem_ids["mem1_id"] = mem1_id
# Store Memory 2 (Related to Failed Action)
store_args_2 = {
"workflow_id": wf_id,
"action_id": action_ids.get("action2_id"),
"content": "Data simulation failed with a timeout error (Code 504).",
"memory_type": MemoryType.OBSERVATION.value,
"memory_level": MemoryLevel.EPISODIC.value,
"description": "Simulation failure detail",
"importance": 7.0, # Failed actions might be important
"tags": ["error", "simulation", "observation"],
"generate_embedding": False,
}
mem_res_2 = await safe_tool_call(
store_memory, store_args_2, "Store Memory 2 (Simulation Error)"
)
mem2_id = mem_res_2.get("memory_id") if mem_res_2.get("success") else None
if mem2_id:
mem_ids["mem2_id"] = mem2_id
# Store Memory 3 (A more general fact)
store_args_3 = {
"workflow_id": wf_id,
"content": "Timeout errors often indicate resource contention or configuration issues.",
"memory_type": MemoryType.FACT.value,
"memory_level": MemoryLevel.SEMANTIC.value,
"description": "General knowledge about timeouts",
"importance": 6.0,
"confidence": 0.9,
"tags": ["error", "knowledge", "fact"],
"generate_embedding": False,
}
mem_res_3 = await safe_tool_call(store_memory, store_args_3, "Store Memory 3 (Timeout Fact)")
mem3_id = mem_res_3.get("memory_id") if mem_res_3.get("success") else None
if mem3_id:
mem_ids["mem3_id"] = mem3_id
# Link Memory 2 (Error) -> Memory 3 (Fact)
if mem2_id and mem3_id:
await safe_tool_call(
create_memory_link,
{
"source_memory_id": mem2_id,
"target_memory_id": mem3_id,
"link_type": LinkType.REFERENCES.value,
"description": "Error relates to general timeout knowledge",
},
f"Link Error ({_fmt_id(mem2_id)}) to Fact ({_fmt_id(mem3_id)})",
)
# Get Linked Memories for the Error Memory
await safe_tool_call(
get_linked_memories,
{"memory_id": mem2_id, "direction": "outgoing", "include_memory_details": True},
f"Get Outgoing Linked Memories for Error ({_fmt_id(mem2_id)})",
)
# Query Memories using FTS
await safe_tool_call(
query_memories,
{"workflow_id": wf_id, "search_text": "simulation error timeout"},
"Query Memories (FTS: 'simulation error timeout')",
)
# Query Memories by Importance Range
await safe_tool_call(
query_memories,
{"workflow_id": wf_id, "min_importance": 6.5, "sort_by": "importance"},
"Query Memories (Importance >= 6.5)",
)
# Query Memories by Memory Type
await safe_tool_call(
query_memories,
{"workflow_id": wf_id, "memory_type": MemoryType.FACT.value},
"Query Memories (Type: Fact)",
)
# Update Memory 1's tags
if mem1_id:
await safe_tool_call(
update_memory,
{"memory_id": mem1_id, "tags": ["planning", "summary", "initial_phase"]},
f"Update Memory 1 Tags ({_fmt_id(mem1_id)})",
)
# Verify update
await safe_tool_call(
get_memory_by_id,
{"memory_id": mem1_id},
f"Get Memory 1 After Tag Update ({_fmt_id(mem1_id)})",
)
# Example: Record a thought linked to a memory
if mem3_id and thought_ids: # Assuming demonstrate_thoughts ran successfully
thought_chain_id_str = thought_ids.get("main_chain_id")
if not thought_chain_id_str:
console.print(
"[yellow]Skipping thought link to memory: main_chain_id not found in thought_ids dict.[/yellow]"
)
else:
thought_args_link = {
"workflow_id": wf_id,
"thought_chain_id": thought_chain_id_str, # Pass the string ID
"content": "Based on the general knowledge about timeouts, need to check server logs.",
"thought_type": ThoughtType.PLAN.value,
"relevant_memory_id": mem3_id, # Link to the Fact memory
}
await safe_tool_call(
record_thought,
thought_args_link,
f"Record Thought Linked to Memory ({_fmt_id(mem3_id)})",
)
elif not thought_ids:
console.print(
"[yellow]Skipping thought link to memory: thought_ids dict is empty or None.[/yellow]"
)
return mem_ids
async def demonstrate_embedding_and_search(wf_id: Optional[str], mem_ids: Dict, thought_ids: Dict):
"""Demonstrate embedding generation and semantic/hybrid search."""
console.print(Rule("[bold green]7. Embedding & Semantic Search[/bold green]", style="green"))
if not wf_id:
console.print("[yellow]Skipping embedding demo: No valid workflow ID.[/yellow]")
return # Return immediately if no workflow ID
mem1_id = mem_ids.get("mem1_id") # Plan summary
mem2_id = mem_ids.get("mem2_id") # Simulation error
mem3_id = mem_ids.get("mem3_id") # Timeout fact
if not mem1_id or not mem2_id or not mem3_id:
console.print(
"[yellow]Skipping embedding demo: Missing required memory IDs from previous steps.[/yellow]"
)
return # Return immediately if prerequisite memories are missing
# 1. Update Memory 2 (Error) to generate embedding
# This relies on the embedding service being functional (API key configured)
console.print(
"[yellow]Attempting to generate embeddings. Requires configured Embedding Service (e.g., OpenAI API key).[/yellow]"
)
update_res = await safe_tool_call(
update_memory,
{
"memory_id": mem2_id,
"regenerate_embedding": True,
},
f"Update Memory 2 ({_fmt_id(mem2_id)}) to Generate Embedding",
)
if not (update_res and update_res.get("success") and update_res.get("embedding_regenerated")):
console.print(
"[red] -> Failed to generate embedding for Memory 2. Semantic/Hybrid search may not work as expected.[/red]"
)
# 2. Store a new memory WITH embedding generation enabled
store_args_4 = {
"workflow_id": wf_id,
"content": "Investigating the root cause of the simulation timeout is the next priority.",
"memory_type": MemoryType.PLAN.value,
"memory_level": MemoryLevel.EPISODIC.value,
"description": "Next step planning",
"importance": 7.5,
"tags": ["investigation", "planning", "error_handling"],
"generate_embedding": True, # Explicitly enable
}
mem_res_4 = await safe_tool_call(
store_memory, store_args_4, "Store Memory 4 (Next Step Plan) with Embedding"
)
mem4_id = mem_res_4.get("memory_id") if mem_res_4.get("success") else None
if mem4_id:
mem_ids["mem4_id"] = mem4_id # Add to our tracked IDs
# Check if embedding was actually generated for Mem4
if mem4_id:
mem4_details = await safe_tool_call(
get_memory_by_id,
{"memory_id": mem4_id},
f"Check Memory 4 Details ({_fmt_id(mem4_id)})",
suppress_output=True,
)
if mem4_details and mem4_details.get("success") and mem4_details.get("embedding_id"):
console.print(
f"[green] -> Embedding ID confirmed for Memory 4: {_fmt_id(mem4_details['embedding_id'])}[/green]"
)
else:
console.print(
"[yellow] -> Warning: Embedding ID missing for Memory 4. Embedding generation likely failed.[/yellow]"
)
console.print("[dim] (Semantic/Hybrid search results may be limited.)[/dim]")
# 3. Semantic Search
await safe_tool_call(
search_semantic_memories,
{
"workflow_id": wf_id,
"query": "problems with simulation performance",
"limit": 3,
"threshold": 0.5,
},
"Semantic Search: 'problems with simulation performance'",
)
await safe_tool_call(
search_semantic_memories,
{
"workflow_id": wf_id,
"query": "next actions to take",
"limit": 2,
"memory_level": MemoryLevel.EPISODIC.value,
},
"Semantic Search: 'next actions to take' (Episodic only)",
)
# 4. Hybrid Search
await safe_tool_call(
hybrid_search_memories,
{
"workflow_id": wf_id,
"query": "investigate timeout simulation",
"limit": 4,
"semantic_weight": 0.6,
"keyword_weight": 0.4,
"tags": ["error"],
"include_content": False,
},
"Hybrid Search: 'investigate timeout simulation' + tag 'error'",
)
# 5. Demonstrate link suggestions
# Update Mem3 (Timeout fact) to generate embedding
update_res_3 = await safe_tool_call(
update_memory,
{"memory_id": mem3_id, "regenerate_embedding": True},
f"Update Memory 3 ({_fmt_id(mem3_id)}) to Generate Embedding",
)
if not (
update_res_3 and update_res_3.get("success") and update_res_3.get("embedding_regenerated")
):
console.print(
"[red] -> Failed to generate embedding for Memory 3. Link suggestion test might fail.[/red]"
)
# --- Store Memory 5 (Hypothesis) ---
hypothesis_content = "Resource limits on the simulation server might be too low."
thought_chain_id = thought_ids.get("main_chain_id") if isinstance(thought_ids, dict) else None
hypothesis_thought_id = None
if thought_chain_id:
thought_args_hyp = {
"workflow_id": wf_id,
"thought_chain_id": thought_chain_id,
"content": hypothesis_content,
"thought_type": ThoughtType.HYPOTHESIS.value,
"relevant_memory_id": mem3_id,
}
hyp_thought_res = await safe_tool_call(
record_thought, thought_args_hyp, "Record Hypothesis Thought"
)
hypothesis_thought_id = (
hyp_thought_res.get("thought_id") if hyp_thought_res.get("success") else None
)
else:
console.print(
"[yellow]Skipping hypothesis memory storage: Could not get thought chain ID.[/yellow]"
)
mem5_id = None
mem_res_5 = None
if hypothesis_thought_id:
store_args_5 = {
"workflow_id": wf_id,
"thought_id": hypothesis_thought_id,
"content": hypothesis_content,
"memory_type": MemoryType.REASONING_STEP.value,
"memory_level": MemoryLevel.SEMANTIC.value,
"description": "Hypothesis on timeout cause",
"importance": 6.5,
"confidence": 0.6,
"tags": ["hypothesis", "resource", "error", "reasoning_step"],
"generate_embedding": True,
"suggest_links": True, # Explicitly ask for suggestions
"max_suggested_links": 2,
}
mem_res_5 = await safe_tool_call(
store_memory, store_args_5, "Store Memory 5 (Hypothesis Reasoning) - Suggest Links"
)
mem5_id = mem_res_5.get("memory_id") if mem_res_5.get("success") else None
if mem5_id:
mem_ids["mem5_id"] = mem5_id
# Check suggestions result
if mem_res_5 and mem_res_5.get("success") and mem_res_5.get("suggested_links"):
console.print("[cyan] -> Link suggestions received for Memory 5:[/]")
console.print(pretty_repr(mem_res_5["suggested_links"]))
elif mem_res_5 and mem_res_5.get("success"):
console.print(
"[dim] -> No link suggestions returned for Memory 5 (or embedding failed).[/dim]"
)
elif mem_res_5 and not mem_res_5.get("success"):
console.print(
"[yellow] -> Failed to store Memory 5, cannot check suggestions.[/yellow]"
)
else:
console.print(
"[yellow]Skipping Memory 5 storage: Hypothesis thought recording failed.[/yellow]"
)
async def demonstrate_state_and_working_memory(
wf_id: str,
mem_ids_dict: Dict[str, str],
action_ids_dict: Dict[str, str],
thought_ids_dict: Dict[str, Any],
state_ids_dict: Dict[str, str],
):
"""Demonstrate saving/loading state and working memory operations."""
console.print(
Rule("[bold green]8. Cognitive State & Working Memory[/bold green]", style="green")
)
# --- Retrieve necessary IDs from previous steps ---
main_wf_id = wf_id
main_chain_id = thought_ids_dict.get("main_chain_id") # noqa: F841
plan_action_id = action_ids_dict.get("action1_id")
sim_action_id = action_ids_dict.get("action2_id")
mem1_id = mem_ids_dict.get("mem1_id")
mem2_id = mem_ids_dict.get("mem2_id")
mem3_id = mem_ids_dict.get("mem3_id")
mem4_id = mem_ids_dict.get("mem4_id")
mem5_id = mem_ids_dict.get("mem5_id")
hypothesis_thought_id = None
if mem5_id and main_wf_id:
mem5_details = await safe_tool_call(
get_memory_by_id,
{"memory_id": mem5_id},
f"Get Memory 5 Details ({_fmt_id(mem5_id)}) for Thought ID",
suppress_output=True,
)
if mem5_details and mem5_details.get("success"):
hypothesis_thought_id = mem5_details.get("thought_id")
if hypothesis_thought_id:
console.print(
f"[cyan] -> Retrieved Hypothesis Thought ID: {_fmt_id(hypothesis_thought_id)}[/cyan]"
)
else:
console.print(
"[yellow] -> Could not retrieve hypothesis thought ID from Memory 5 details.[/yellow]"
)
else:
# Handle case where get_memory_by_id failed or didn't return success
console.print(
f"[yellow] -> Failed to get details for Memory 5 ({_fmt_id(mem5_id)}) to find Thought ID.[/yellow]"
)
# --- Check if we have enough *critical* data to proceed ---
# Hypothesis thought ID is critical for saving the intended state goals
if not (
main_wf_id
and mem1_id
and mem2_id
and mem3_id
and mem4_id
and plan_action_id
and hypothesis_thought_id # Ensure this critical ID exists
):
console.print(
"[bold yellow]Skipping state/working memory demo:[/bold yellow] Missing critical IDs (workflow, mem1-4, plan_action, hypothesis_thought) from previous steps."
)
return # Exit if critical IDs are missing
# Prepare IDs for saving state - check individually for non-critical ones
working_mems = [mem_id for mem_id in [mem2_id, mem3_id, mem4_id, mem5_id] if mem_id] # Filter None
focus_mems = [mem4_id] if mem4_id else [] # Filter None
context_actions = [action_id for action_id in [plan_action_id, sim_action_id] if action_id] # Filter None
goal_thoughts = [hypothesis_thought_id] # Already checked above
# 1. Save Cognitive State
save_args = {
"workflow_id": wf_id,
"title": "State after simulation failure and hypothesis",
"working_memory_ids": working_mems,
"focus_area_ids": focus_mems,
"context_action_ids": context_actions,
"current_goal_thought_ids": goal_thoughts,
}
state_res = await safe_tool_call(save_cognitive_state, save_args, "Save Cognitive State")
state_id = state_res.get("state_id") if state_res and state_res.get("success") else None # More robust check
if state_id:
state_ids_dict["saved_state_id"] = state_id
console.print(f"[green] -> Cognitive state saved successfully with ID: {_fmt_id(state_id)}[/green]")
else:
console.print("[bold red]CRITICAL DEMO FAILURE:[/bold red] Failed to save cognitive state. Cannot proceed with working memory tests.")
return # Exit if state saving failed
# 2. Load Cognitive State (by ID) - Use the confirmed state_id
await safe_tool_call(
load_cognitive_state,
{"workflow_id": wf_id, "state_id": state_id}, # Use state_id directly
f"Load Cognitive State ({_fmt_id(state_id)}) by ID",
)
# 3. Load Cognitive State (Latest)
await safe_tool_call(
load_cognitive_state,
{"workflow_id": wf_id},
"Load Latest Cognitive State",
)
# --- Working Memory Operations using the saved state_id as the context_id ---
# The variable 'state_id' now holds the context ID we need for the rest of this section.
console.print(f"\n[dim]Using saved state ID '{_fmt_id(state_id)}' as context_id for working memory tests...[/dim]\n")
# 4. Focus Memory (Focus on the 'hypothesis' memory if it exists)
focus_target_id = mem_ids_dict.get("mem5_id") # Get mem5_id again here
if focus_target_id:
await safe_tool_call(
focus_memory,
{
"memory_id": focus_target_id,
"context_id": state_id, # USE state_id
"add_to_working": False, # Assume it's already there from save_state
},
f"Focus Memory ({_fmt_id(focus_target_id)}) in Context ({_fmt_id(state_id)})", # USE state_id
)
else:
console.print(
"[bold yellow]Skipping focus memory test: Hypothesis memory ID (mem5_id) not available.[/bold yellow]"
)
# 5. Get Working Memory (Should reflect the saved state initially)
await safe_tool_call(
get_working_memory,
{
"context_id": state_id, # USE state_id
"include_links": False, # Keep output cleaner for this demo step
},
f"Get Working Memory for Context ({_fmt_id(state_id)})", # USE state_id
)
# 6. Optimize Working Memory (Reduce size, using 'balanced' strategy)
wm_details = await safe_tool_call(
get_working_memory,
{"context_id": state_id}, # USE state_id
"Get WM Size Before Optimization",
suppress_output=True,
)
current_wm_size = (
len(wm_details.get("working_memories", []))
if wm_details and wm_details.get("success")
else 0
)
if current_wm_size > 2: # Only optimize if we have more than 2 memories
target_optimize_size = max(1, current_wm_size // 2)
console.print(
f"[cyan] -> Optimizing working memory from {current_wm_size} down to {target_optimize_size}...[/cyan]"
)
await safe_tool_call(
optimize_working_memory,
{
"context_id": state_id, # USE state_id
"target_size": target_optimize_size,
"strategy": "balanced",
},
f"Optimize Working Memory (Context: {_fmt_id(state_id)}, Target: {target_optimize_size})", # USE state_id
)
await safe_tool_call(
get_working_memory,
{"context_id": state_id, "include_links": False}, # USE state_id
f"Get Working Memory After Optimization (Context: {_fmt_id(state_id)})", # USE state_id
)
else:
console.print(
f"[dim]Skipping working memory optimization: Current size ({current_wm_size}) is too small.[/dim]"
)
async def demonstrate_metacognition(wf_id: str, mem_ids: Dict, state_ids: Dict):
"""Demonstrate context retrieval, auto-focus, promotion, consolidation, reflection, summarization."""
console.print(Rule("[bold green]9. Meta-Cognition & Summarization[/bold green]", style="green"))
# 1. Get Workflow Context
await safe_tool_call(get_workflow_context, {"workflow_id": wf_id}, "Get Full Workflow Context")
# 2. Auto Update Focus
context_id = state_ids.get("saved_state_id")
if context_id:
await safe_tool_call(
auto_update_focus,
{"context_id": context_id},
f"Auto Update Focus for Context ({_fmt_id(context_id)})",
)
else:
console.print("[yellow]Skipping auto-focus: No context_id (state_id) available.[/yellow]")
# 3. Promote Memory Level
mem1_id = mem_ids.get("mem1_id") # Episodic summary
mem3_id = mem_ids.get("mem3_id") # Semantic fact
if mem1_id:
console.print(
f"[cyan] -> Manually increasing access_count for Memory 1 ({_fmt_id(mem1_id)}) to test promotion...[/cyan]"
)
try:
async with DBConnection(DEMO_DB_FILE) as conn:
await conn.execute(
"UPDATE memories SET access_count = 10, confidence = 0.9 WHERE memory_id = ?",
(mem1_id,),
)
await conn.commit()
await safe_tool_call(
promote_memory_level,
{"memory_id": mem1_id},
f"Attempt Promote Memory 1 ({_fmt_id(mem1_id)}) from Episodic",
)
except Exception as e:
console.print(f"[red] -> Error updating access count for promotion test: {e}[/red]")
if mem3_id:
await safe_tool_call(
promote_memory_level,
{"memory_id": mem3_id},
f"Attempt Promote Memory 3 ({_fmt_id(mem3_id)}) from Semantic (Should Fail)",
)
# 4. Consolidate Memories (requires LLM)
mem_ids_for_consolidation = [
mid
for mid in [mem_ids.get("mem1_id"), mem_ids.get("mem2_id"), mem_ids.get("mem3_id")]
if mid
]
if len(mem_ids_for_consolidation) >= 2:
console.print(
"[yellow]Attempting memory consolidation. Requires configured LLM provider (e.g., OpenAI API key).[/yellow]"
)
await safe_tool_call(
consolidate_memories,
{
"workflow_id": wf_id,
"target_memories": mem_ids_for_consolidation,
"consolidation_type": "summary",
"store_result": True,
"provider": config.default_provider or "openai",
},
"Consolidate Memories (Summary)",
)
else:
console.print(
"[yellow]Skipping consolidation: Not enough source memories available.[/yellow]"
)
# 5. Generate Reflection (requires LLM)
console.print(
"[yellow]Attempting reflection generation. Requires configured LLM provider.[/yellow]"
)
await safe_tool_call(
generate_reflection,
{
"workflow_id": wf_id,
"reflection_type": "gaps",
"provider": config.default_provider
or "openai", # Use configured default from GatewayConfig
},
"Generate Reflection (Gaps)",
)
# 6. Summarize Text (requires LLM)
console.print(
"[yellow]Attempting text summarization. Requires configured LLM provider.[/yellow]"
)
sample_text = """
The Unified Memory System integrates several components for advanced agent cognition.
It tracks workflows, actions, artifacts, and thoughts. A multi-level memory hierarchy
(working, episodic, semantic, procedural) allows for different types of knowledge storage.
Vector embeddings enable semantic search capabilities. Associative links connect related
memory items. Cognitive states can be saved and loaded, preserving the agent's context.
Maintenance tools help manage memory expiration and provide statistics. Reporting and
visualization tools offer insights into the agent's processes. This system aims to provide
a robust foundation for complex autonomous agents.
"""
await safe_tool_call(
summarize_text,
{
"text_to_summarize": sample_text,
"target_tokens": 50,
"record_summary": True,
"workflow_id": wf_id,
"provider": config.default_provider or "openai",
},
"Summarize Sample Text and Record Memory",
)
async def demonstrate_maintenance_and_stats(wf_id: str):
"""Demonstrate memory deletion and statistics computation."""
console.print(Rule("[bold green]10. Maintenance & Statistics[/bold green]", style="green"))
# 1. Delete Expired Memories
# Store a temporary memory with a short TTL
console.print("[cyan] -> Storing a temporary memory with TTL=1 second...[/cyan]")
ttl_mem_args = {
"workflow_id": wf_id,
"content": "This memory should expire quickly.",
"memory_type": "observation",
"ttl": 1, # 1 second TTL
}
ttl_mem_res = await safe_tool_call(
store_memory, # Pass the function object
ttl_mem_args, # Pass the arguments dictionary
"Store Temporary Memory",
suppress_output=True,
)
if ttl_mem_res and ttl_mem_res.get("success"):
console.print("[cyan] -> Waiting 2 seconds for memory to expire...[/cyan]")
await asyncio.sleep(2)
await safe_tool_call(
delete_expired_memories, {}, "Delete Expired Memories (Should delete 1)"
)
else:
console.print(
"[yellow] -> Failed to store temporary memory for expiration test.[/yellow]"
)
if ttl_mem_res:
console.print(f"[yellow] Error: {ttl_mem_res.get('error')}[/yellow]")
# 2. Compute Statistics (Workflow Specific)
await safe_tool_call(
compute_memory_statistics,
{"workflow_id": wf_id},
f"Compute Statistics for Workflow ({_fmt_id(wf_id)})",
)
# 3. Compute Statistics (Global)
await safe_tool_call(compute_memory_statistics, {}, "Compute Global Statistics")
async def demonstrate_reporting_and_viz(wf_id: str, thought_chain_id: str, mem_ids: Dict):
"""Demonstrate report generation and visualization."""
console.print(Rule("[bold green]11. Reporting & Visualization[/bold green]", style="green"))
# 1. Generate Workflow Reports
await safe_tool_call(
generate_workflow_report,
{"workflow_id": wf_id, "report_format": "markdown", "style": "professional"},
"Generate Workflow Report (Markdown, Professional)",
)
await safe_tool_call(
generate_workflow_report,
{"workflow_id": wf_id, "report_format": "html", "style": "concise"},
"Generate Workflow Report (HTML, Concise)",
)
await safe_tool_call(
generate_workflow_report,
{"workflow_id": wf_id, "report_format": "json"},
"Generate Workflow Report (JSON)",
)
await safe_tool_call(
generate_workflow_report,
{"workflow_id": wf_id, "report_format": "mermaid"},
"Generate Workflow Report (Mermaid Diagram)",
)
# 2. Visualize Reasoning Chain
if thought_chain_id:
await safe_tool_call(
visualize_reasoning_chain,
{"thought_chain_id": thought_chain_id},
f"Visualize Reasoning Chain ({_fmt_id(thought_chain_id)})",
)
else:
console.print(
"[yellow]Skipping reasoning visualization: No thought_chain_id available.[/yellow]"
)
# 3. Visualize Memory Network
# Visualize around the 'error' memory
center_mem_id = mem_ids.get("mem2_id")
if center_mem_id:
await safe_tool_call(
visualize_memory_network,
{"center_memory_id": center_mem_id, "depth": 1, "max_nodes": 15},
f"Visualize Memory Network (Centered on Error Mem {_fmt_id(center_mem_id)}, Depth 1)",
)
else:
console.print(
"[yellow]Skipping centered memory visualization: Error memory ID not available.[/yellow]"
)
# Visualize top memories for the workflow
await safe_tool_call(
visualize_memory_network,
{"workflow_id": wf_id, "max_nodes": 20},
f"Visualize Memory Network (Workflow {_fmt_id(wf_id)}, Top 20 Relevant)",
)
# --- Main Execution Logic ---
async def main():
"""Run the extended Unified Memory System demonstration suite."""
console.print(
Rule(
"[bold magenta]Unified Memory System Tools Demo (Extended)[/bold magenta]",
style="white",
)
)
exit_code = 0
# Dictionaries to store IDs created during the demo
wf_ids = {}
action_ids = {}
artifact_ids = {}
thought_ids = {} # Store chain ID
mem_ids = {}
state_ids = {} # Store state ID
try:
await setup_demo_environment()
# --- Run Demo Sections in Order ---
wf_id = await demonstrate_basic_workflows()
if wf_id:
wf_ids["main_wf_id"] = wf_id
else:
raise RuntimeError("Workflow creation failed, cannot continue demo.")
action_ids = await demonstrate_basic_actions(wf_ids.get("main_wf_id"))
await demonstrate_action_dependencies(wf_ids.get("main_wf_id"), action_ids)
artifact_ids = await demonstrate_artifacts(wf_ids.get("main_wf_id"), action_ids)
chain_id = await demonstrate_thoughts_and_linking(
wf_ids.get("main_wf_id"), action_ids, artifact_ids
)
if chain_id:
thought_ids["main_chain_id"] = chain_id
mem_ids = await demonstrate_memory_operations(
wf_ids.get("main_wf_id"), action_ids, thought_ids
) # Pass thought_ids dict
await demonstrate_embedding_and_search(wf_ids.get("main_wf_id"), mem_ids, thought_ids)
# State/Working Memory depends on previous steps creating IDs
# Pass all collected ID dictionaries
await demonstrate_state_and_working_memory(
wf_id=wf_ids["main_wf_id"],
mem_ids_dict=mem_ids,
action_ids_dict=action_ids,
thought_ids_dict=thought_ids, # Contains chain_id and potentially specific thought IDs if needed later
state_ids_dict=state_ids, # Pass dict to store the created state_id
)
# --- Run NEW Advanced Demo Sections ---
await demonstrate_metacognition(wf_ids["main_wf_id"], mem_ids, state_ids)
await demonstrate_maintenance_and_stats(wf_ids["main_wf_id"])
await demonstrate_reporting_and_viz(
wf_ids["main_wf_id"], thought_ids.get("main_chain_id"), mem_ids
)
# --- End NEW Sections ---
logger.success(
"Unified Memory System Demo completed successfully!",
emoji_key="complete",
)
console.print(Rule("[bold green]Demo Finished[/bold green]", style="green"))
except Exception as e:
logger.critical(f"Demo crashed unexpectedly: {str(e)}", emoji_key="critical", exc_info=True)
console.print(f"\n[bold red]CRITICAL ERROR:[/bold red] {escape(str(e))}")
console.print_exception(show_locals=False)
exit_code = 1
finally:
# Clean up the demo environment
console.print(Rule("Cleanup", style="dim"))
await cleanup_demo_environment()
return exit_code
if __name__ == "__main__":
# Run the demo
final_exit_code = asyncio.run(main())
sys.exit(final_exit_code)
```
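For quick orientation, the essential call pattern exercised by the demo above can be reduced to a few lines. The sketch below is illustrative only: it assumes the same `ultimate_mcp_server.tools.unified_memory_system` API imported at the top of the demo, the `db_path` and literal argument values are placeholders, and (as the demo's `safe_tool_call` helper expects) each tool returns a plain dict carrying a `success` flag plus the relevant IDs.
```python
# Minimal sketch (not part of the repository); argument values are illustrative placeholders.
import asyncio
from ultimate_mcp_server.tools.unified_memory_system import (
    create_workflow,
    initialize_memory_system,
    query_memories,
    store_memory,
)
async def minimal_flow(db_path: str = "./demo_memory.db") -> None:
    # 1. Initialize the memory system against a SQLite database file.
    await initialize_memory_system(db_path=db_path)
    # 2. Create a workflow; the result dict carries "success" and "workflow_id".
    wf = await create_workflow(title="Sketch WF", goal="Show the basic call pattern", db_path=db_path)
    wf_id = wf["workflow_id"]
    # 3. Store an observation memory inside that workflow.
    mem = await store_memory(
        workflow_id=wf_id,
        content="Example observation stored by the sketch.",
        memory_type="observation",
        db_path=db_path,
    )
    print("stored memory:", mem.get("memory_id"))
    # 4. Query it back with full-text search, as section 6 of the demo does.
    results = await query_memories(workflow_id=wf_id, search_text="example observation", db_path=db_path)
    print("query success:", results.get("success"))
if __name__ == "__main__":
    asyncio.run(minimal_flow())
```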
--------------------------------------------------------------------------------
/mcp_tool_context_estimator.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
"""
MCP Tool Context Estimator
This script connects to an already running MCP server and estimates how much
of an LLM's context window would be consumed by the registered tools when
they're sent to the model via the Model Context Protocol.
"""
import argparse
import asyncio
import json
import os
import sys
import traceback
from typing import Any, Dict, List, Optional
import aiohttp
import tiktoken
from mcp import ClientSession
from mcp.client.sse import sse_client
from mcp.client.stdio import stdio_client
from rich.console import Console
from rich.table import Table
# Add this script's directory (the project root) to the Python path so project imports
# resolve regardless of where the repository is checked out
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# Import the existing decouple configuration from the project
from ultimate_mcp_server.config import decouple_config
# Import actual model pricing from constants
from ultimate_mcp_server.constants import COST_PER_MILLION_TOKENS
# Removed dependency on STANDALONE_TOOL_FUNCTIONS to avoid circular imports
# from ultimate_mcp_server.tools import STANDALONE_TOOL_FUNCTIONS
# Define a function to read tool names from a file generated by the server
def read_tool_names_from_file(filename='tools_list.json', quiet=False):
"""Read tool names from a JSON file generated by the server"""
console = Console()
try:
if os.path.exists(filename):
with open(filename, 'r') as f:
tool_data = json.load(f)
if not quiet:
console.print(f"[green]Successfully loaded {len(tool_data)} tools from {filename}[/green]")
return tool_data
else:
if not quiet:
console.print(f"[yellow]Tool list file {filename} not found. Will use server-provided tools only.[/yellow]")
return []
except Exception as e:
if not quiet:
console.print(f"[red]Error reading tool list: {str(e)}[/red]")
return []
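# Assumed file format (illustrative): tools_list.json is expected to contain a flat JSON
# array of tool-name strings, e.g. ["generate_completion", "read_file", "browse_url"],
# since get_complete_toolset() below iterates over the returned value as tool names.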
# Module-level flags: build the simulated --load-all-tools comparison and include tool descriptions in the output
RUN_LOAD_ALL_TOOLS_COMPARISON = True
SHOW_DESCRIPTIONS = True
async def detect_server_transport(host: str, port: str, quiet: bool = False) -> tuple[str, str]:
"""
Detect what transport mode the server is running and return the appropriate URL and transport type.
Args:
host: Server hostname
port: Server port
quiet: If True, suppress detection messages
Returns:
        Tuple of (url, transport_type) where transport_type is 'sse' or 'streamable-http'
"""
console = Console()
if not quiet:
console.print(f"[blue]Detecting transport mode for server at {host}:{port}...[/blue]")
# Test MCP protocol endpoints with proper requests
endpoints_to_try = [
(f"http://{host}:{port}/mcp/", "streamable-http"),
(f"http://{host}:{port}/sse", "sse"),
(f"http://{host}:{port}", "sse"), # fallback for sse
]
# Create a simple MCP initialization message for testing
test_message = {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": {"name": "mcp-detector", "version": "1.0.0"}
}
}
for url, transport in endpoints_to_try:
try:
timeout = aiohttp.ClientTimeout(total=5)
async with aiohttp.ClientSession(timeout=timeout) as session:
if transport == "streamable-http":
# Test streamable-http with POST + MCP message
headers = {
"Content-Type": "application/json",
"Accept": "application/json, text/event-stream"
}
async with session.post(url, json=test_message, headers=headers) as response:
if response.status == 200:
# Check if response looks like MCP
try:
data = await response.text()
if '"jsonrpc":"2.0"' in data or '"result"' in data:
if not quiet:
console.print(f"[green]Detected {transport} transport at {url}[/green]")
return url, transport
except Exception:
pass
elif response.status in [400, 404, 405, 406]:
# Server exists but doesn't support this transport
if not quiet:
console.print(f"[dim]Endpoint {url} returned {response.status}[/dim]")
continue
else:
# Test SSE endpoints - they might respond to GET or POST
# Try GET first for SSE
try:
async with session.get(url) as response:
if response.status == 200:
content_type = response.headers.get('content-type', '').lower()
if 'text/event-stream' in content_type:
if not quiet:
console.print(f"[green]Detected {transport} transport at {url}[/green]")
return url, transport
except Exception:
pass
# If GET failed, try POST for SSE (some servers might expect it)
try:
async with session.post(url, json=test_message) as response:
if response.status == 200:
content_type = response.headers.get('content-type', '').lower()
if 'text/event-stream' in content_type or 'application/json' in content_type:
if not quiet:
console.print(f"[green]Detected {transport} transport at {url}[/green]")
return url, transport
except Exception:
pass
except Exception as e:
if not quiet:
console.print(f"[dim]Could not connect to {url}: {str(e)}[/dim]")
continue
# If HTTP detection fails, try to guess based on what we know
# Check if port 8013 responds at all
try:
timeout = aiohttp.ClientTimeout(total=2)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(f"http://{host}:{port}/") as response:
if response.status == 200:
# Server is running, probably streamable-http since that's the new default
default_url = f"http://{host}:{port}/mcp/"
if not quiet:
console.print(f"[yellow]Server detected but transport unclear, defaulting to streamable-http at {default_url}[/yellow]")
return default_url, "streamable-http"
except Exception:
pass
# Final fallback to SSE for backwards compatibility
fallback_url = f"http://{host}:{port}/sse"
if not quiet:
console.print(f"[yellow]Could not detect transport mode, defaulting to SSE at {fallback_url}[/yellow]")
return fallback_url, "sse"
def get_server_url_and_transport() -> tuple[str, str]:
"""
    Get the MCP server host and port from the .env file or environment variables
    Returns:
        Tuple of (host, port); the transport type is resolved separately via detect_server_transport()
"""
# Try to get from python-decouple (.env file)
try:
host = decouple_config('MCP_SERVER_HOST', default='localhost')
port = decouple_config('MCP_SERVER_PORT', default='8013')
# Try to detect transport type - this will be resolved in the async context
return host, port
except Exception:
# Fallback to environment variables if decouple fails
if "MCP_SERVER_HOST" in os.environ and "MCP_SERVER_PORT" in os.environ:
host = os.environ["MCP_SERVER_HOST"]
port = os.environ["MCP_SERVER_PORT"]
return host, port
# Default fallback
return "localhost", "8013"
# Calculate token counts for different models
def count_tokens(text: str) -> int:
"""Count tokens using tiktoken with cl100k_base encoding (used by most modern models)"""
encoding = tiktoken.get_encoding("cl100k_base")
return len(encoding.encode(text))
# Use real pricing imported from constants.py
# Convert from dollars per million tokens to dollars per 1000 tokens for our calculations
MODEL_PRICES = {
model: price_info["input"] / 1000 # Convert from per million to per thousand
for model, price_info in COST_PER_MILLION_TOKENS.items()
}
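# Illustrative sketch (defined but never called): how the per-1K prices above turn a token
# count into an estimated input cost. For example, at $0.003 per 1K tokens a 12,000-token
# prompt costs 0.003 * 12000 / 1000 = $0.036. The model name is whatever key exists in
# MODEL_PRICES; unknown models fall back to a cost of 0.0.
def _example_cost_estimate(prompt_tokens: int, model: str) -> float:
    """Return the estimated input cost in dollars for a prompt of the given token count."""
    return MODEL_PRICES.get(model, 0.0) * prompt_tokens / 1000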
def format_capabilities(capabilities):
"""Safely format capabilities object to string for display"""
result = {}
# Check for specific capabilities we know about
if hasattr(capabilities, "tools"):
result["tools"] = "Available" if capabilities.tools else "Not available"
if hasattr(capabilities, "prompts"):
result["prompts"] = "Available" if capabilities.prompts else "Not available"
if hasattr(capabilities, "resources"):
result["resources"] = "Available" if capabilities.resources else "Not available"
if hasattr(capabilities, "logging"):
result["logging"] = "Available" if capabilities.logging else "Not available"
if hasattr(capabilities, "completions"):
result["completions"] = "Available" if capabilities.completions else "Not available"
if hasattr(capabilities, "experimental"):
result["experimental"] = "Available" if capabilities.experimental else "Not available"
return json.dumps(result, indent=2)
async def get_mcp_server_tools_streamable_http(server_url: str, include_tools: Optional[List[str]] = None, console: Console = None, quiet: bool = False) -> Dict[str, Any]:
"""
Connect to an MCP server running in streamable-http mode and fetch all registered tools.
Args:
server_url: The URL of the running MCP server (should be http://host:port/mcp)
include_tools: Optional list of tool names to include (if None, get all tools)
console: Optional console for output
quiet: If True, only show most important output
Returns:
Dictionary with server info and tool definitions
"""
if console is None:
console = Console()
if not quiet:
console.print(f"[bold blue]Connecting to streamable-http MCP server at {server_url}...[/bold blue]")
try:
timeout = aiohttp.ClientTimeout(total=30)
async with aiohttp.ClientSession(timeout=timeout) as session:
# First, try to initialize the MCP connection
init_data = {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {"roots": {"listChanged": True}},
"clientInfo": {"name": "mcp-tool-context-estimator", "version": "1.0.0"}
}
}
headers = {
"Content-Type": "application/json",
"Accept": "application/json, text/event-stream"
}
if not quiet:
console.print("[bold blue]Initializing MCP protocol via streamable-http...[/bold blue]")
async with session.post(server_url, json=init_data, headers=headers) as response:
if response.status != 200:
raise Exception(f"Failed to initialize: HTTP {response.status}")
# Capture session ID from response headers
session_id = response.headers.get('mcp-session-id')
if not session_id:
raise Exception("No session ID returned from server")
# Handle SSE-formatted response
response_text = await response.text()
if response.content_type == "text/event-stream":
# Parse SSE format
lines = response_text.strip().split('\n')
json_data = None
for line in lines:
if line.startswith('data: '):
json_data = line[6:] # Remove 'data: ' prefix
break
if json_data:
init_result = json.loads(json_data)
else:
raise Exception("No JSON data found in SSE response")
else:
init_result = await response.json()
if "error" in init_result:
raise Exception(f"MCP initialization error: {init_result['error']}")
if "result" not in init_result:
raise Exception("Invalid MCP initialization response")
result = init_result["result"]
server_info = result.get("serverInfo", {})
server_name = server_info.get("name", "Unknown Server")
server_version = server_info.get("version", "Unknown Version")
if not quiet:
console.print(f"[green]Connected to server:[/green] {server_name} v{server_version}")
# Show server capabilities
capabilities = result.get("capabilities", {})
if not quiet:
console.print("[bold blue]Server capabilities:[/bold blue]")
console.print(json.dumps(capabilities, indent=2))
# Check if tools capability is present
has_tools = capabilities.get("tools", False)
if not quiet and not has_tools:
console.print("[bold yellow]Warning: This server does not advertise tools capability![/bold yellow]")
console.print("The server might not support tool listing, but we'll try anyway.")
# Get server instructions (from server info)
server_instructions = server_info.get("instructions", "")
if server_instructions and not quiet:
console.print(f"[green]Server provides instructions of length {len(server_instructions):,} chars[/green]")
elif not quiet:
console.print("[yellow]Server does not provide instructions[/yellow]")
# Update headers to include session ID for subsequent requests
headers["mcp-session-id"] = session_id
# Send initialized notification
init_notify_data = {
"jsonrpc": "2.0",
"method": "notifications/initialized"
}
async with session.post(server_url, json=init_notify_data, headers=headers) as response:
# This is a notification, so we don't expect a response
pass
# Now list the tools
if not quiet:
console.print("[bold blue]Retrieving tool definitions...[/bold blue]")
list_tools_data = {
"jsonrpc": "2.0",
"id": 2,
"method": "tools/list"
}
async with session.post(server_url, json=list_tools_data, headers=headers) as response:
if response.status != 200:
raise Exception(f"Failed to list tools: HTTP {response.status}")
# Handle SSE-formatted response for tools list
response_text = await response.text()
if response.content_type == "text/event-stream":
# Parse SSE format
lines = response_text.strip().split('\n')
json_data = None
for line in lines:
if line.startswith('data: '):
json_data = line[6:] # Remove 'data: ' prefix
break
if json_data:
tools_result = json.loads(json_data)
else:
raise Exception("No JSON data found in SSE response")
else:
tools_result = await response.json()
if "error" in tools_result:
raise Exception(f"MCP tools/list error: {tools_result['error']}")
if "result" not in tools_result:
raise Exception("Invalid MCP tools/list response")
tools_data = tools_result["result"]
tools = tools_data.get("tools", [])
# Count tools
tool_count = len(tools) if tools else 0
if not quiet:
console.print(f"[green]Found {tool_count} tools[/green]")
if tool_count == 0:
console.print("[bold yellow]No tools found on the server.[/bold yellow]")
return {}
# Convert tools to their JSON representation (exactly as sent to LLMs)
tool_defs = []
# Add debug information about descriptions
has_descriptions = 0
total_desc_length = 0
for tool in tools:
# Convert to dict that matches the MCP protocol spec for tool definitions
tool_dict = {
"name": tool.get("name"),
"inputSchema": tool.get("inputSchema")
}
# Debug description handling
if tool.get("description"):
desc = tool["description"]
has_descriptions += 1
total_desc_length += len(desc)
if not quiet:
console.print(f"[dim]Tool '{tool['name']}' has description ({len(desc):,} chars)[/dim]")
tool_dict["description"] = desc
elif not quiet:
console.print(f"[dim yellow]Tool '{tool['name']}' has no description[/dim yellow]")
if tool.get("annotations"):
tool_dict["annotations"] = tool["annotations"]
tool_defs.append(tool_dict)
# Print description statistics
if not quiet:
console.print(f"[green]{has_descriptions} out of {tool_count} tools have descriptions[/green]")
if has_descriptions > 0:
console.print(f"[green]Average description length: {total_desc_length/has_descriptions:,.1f} chars[/green]")
# Include server info in the result to be used for creating the complete LLM prompt
return {
"tools": tool_defs,
"server_name": server_name,
"server_version": server_version,
"server_instructions": server_instructions
}
except Exception as e:
console.print(f"[bold red]Error connecting to streamable-http MCP server:[/bold red] {str(e)}")
if not quiet:
console.print("[bold yellow]Stack trace:[/bold yellow]")
console.print(traceback.format_exc())
raise
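# Minimal sketch (not wired into the functions above or below): the 'data: ' line extraction
# that get_mcp_server_tools_streamable_http performs twice could be factored into a helper
# like this; behavior is assumed to match the inline parsing above.
def _parse_sse_json_payload(response_text: str) -> Dict[str, Any]:
    """Return the JSON object carried by the first 'data: ' line of an SSE response body."""
    for line in response_text.strip().split('\n'):
        if line.startswith('data: '):
            return json.loads(line[6:])  # strip the 'data: ' prefix
    raise Exception("No JSON data found in SSE response")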
async def get_mcp_server_tools_stdio(command: str, args: Optional[List[str]] = None, include_tools: Optional[List[str]] = None, console: Console = None, quiet: bool = False) -> Dict[str, Any]:
"""
Connect to an MCP server via stdio transport and fetch all registered tools.
Args:
command: Command to run the MCP server
args: Additional arguments for the command
include_tools: Optional list of tool names to include (if None, get all tools)
console: Optional console for output
quiet: If True, only show most important output
Returns:
Dictionary with server info and tool definitions
"""
if console is None:
console = Console()
if not quiet:
console.print(f"[bold blue]Connecting to MCP server via stdio: {command} {' '.join(args or [])}[/bold blue]")
try:
        # Build the command array and wrap it in StdioServerParameters, as required by stdio_client
        cmd = command.split() if isinstance(command, str) else [command]
        if args:
            cmd.extend(args)
        server_params = StdioServerParameters(command=cmd[0], args=cmd[1:])
        async with stdio_client(server_params) as (read, write):
# Create a client session
async with ClientSession(read, write) as session:
# Initialize connection to server
if not quiet:
console.print("[bold blue]Initializing MCP protocol via stdio...[/bold blue]")
init_result = await session.initialize()
# Get server info
server_name = init_result.serverInfo.name
server_version = init_result.serverInfo.version
if not quiet:
console.print(f"[green]Connected to server:[/green] {server_name} v{server_version}")
# Show server capabilities safely
if not quiet:
console.print("[bold blue]Server capabilities:[/bold blue]")
console.print(format_capabilities(init_result.capabilities))
# Check if tools capability is present
has_tools = False
if hasattr(init_result.capabilities, "tools") and init_result.capabilities.tools:
has_tools = True
if not quiet and not has_tools:
console.print("[bold yellow]Warning: This server does not advertise tools capability![/bold yellow]")
console.print("The server might not support tool listing, but we'll try anyway.")
# Get server instructions (will be used in the LLM prompt)
server_instructions = ""
if hasattr(init_result, "instructions") and init_result.instructions:
server_instructions = init_result.instructions
if not quiet:
console.print(f"[green]Server provides instructions of length {len(server_instructions):,} chars[/green]")
elif not quiet:
console.print("[yellow]Server does not provide instructions[/yellow]")
# List available tools
if not quiet:
console.print("[bold blue]Retrieving tool definitions...[/bold blue]")
try:
tools_result = await session.list_tools()
# Handle ListToolsResult object
tools = []
if hasattr(tools_result, "tools"):
tools = tools_result.tools
else:
if not quiet:
console.print("[bold yellow]Tools result doesn't have expected structure. Trying alternatives...[/bold yellow]")
if hasattr(tools_result, "__iter__"):
tools = list(tools_result)
else:
if not quiet:
console.print(f"[bold yellow]Tools result type: {type(tools_result)}[/bold yellow]")
console.print(f"Tools result attributes: {dir(tools_result)}")
raise ValueError("Unable to extract tools from server response")
# Count tools
tool_count = len(tools) if tools else 0
if not quiet:
console.print(f"[green]Found {tool_count} tools[/green]")
if tool_count == 0:
console.print("[bold yellow]No tools found on the server.[/bold yellow]")
return {}
# Convert tools to their JSON representation (exactly as sent to LLMs)
tool_defs = []
# Add debug information about descriptions
has_descriptions = 0
total_desc_length = 0
for tool in tools:
# Convert to dict that matches the MCP protocol spec for tool definitions
tool_dict = {
"name": tool.name,
"inputSchema": tool.inputSchema
}
# Debug description handling
if hasattr(tool, "description") and tool.description:
desc = tool.description
has_descriptions += 1
total_desc_length += len(desc)
if not quiet:
console.print(f"[dim]Tool '{tool.name}' has description ({len(desc):,} chars)[/dim]")
tool_dict["description"] = desc
elif not quiet:
console.print(f"[dim yellow]Tool '{tool.name}' has no description[/dim yellow]")
if hasattr(tool, "annotations") and tool.annotations:
tool_dict["annotations"] = tool.annotations
tool_defs.append(tool_dict)
# Print description statistics
if not quiet:
console.print(f"[green]{has_descriptions} out of {tool_count} tools have descriptions[/green]")
if has_descriptions > 0:
console.print(f"[green]Average description length: {total_desc_length/has_descriptions:,.1f} chars[/green]")
# Include server info in the result to be used for creating the complete LLM prompt
return {
"tools": tool_defs,
"server_name": server_name,
"server_version": server_version,
"server_instructions": server_instructions
}
except Exception as e:
console.print(f"[bold red]Error listing tools:[/bold red] {str(e)}")
if not quiet:
console.print("[bold yellow]Stack trace:[/bold yellow]")
console.print(traceback.format_exc())
raise
except Exception as e:
console.print(f"[bold red]Error connecting to MCP server via stdio:[/bold red] {str(e)}")
if not quiet:
console.print("[bold yellow]Stack trace:[/bold yellow]")
console.print(traceback.format_exc())
raise
async def get_mcp_server_tools(server_url: str, transport_type: str, include_tools: Optional[List[str]] = None, console: Console = None, quiet: bool = False, command: Optional[str] = None, args: Optional[List[str]] = None) -> Dict[str, Any]:
"""
Connect to an already running MCP server and fetch all registered tools.
Args:
server_url: The URL of the running MCP server (ignored for stdio)
transport_type: The transport type ('sse', 'streamable-http', or 'stdio')
include_tools: Optional list of tool names to include (if None, get all tools)
console: Optional console for output
quiet: If True, only show most important output
command: Command to run for stdio transport
args: Additional arguments for stdio command
Returns:
Dictionary with server info and tool definitions
"""
if console is None:
console = Console()
if transport_type == "streamable-http":
return await get_mcp_server_tools_streamable_http(server_url, include_tools, console, quiet)
elif transport_type == "stdio":
if not command:
raise ValueError("Command must be provided for stdio transport")
return await get_mcp_server_tools_stdio(command, args, include_tools, console, quiet)
# Original SSE implementation
if not quiet:
console.print(f"[bold blue]Connecting to MCP server at {server_url}...[/bold blue]")
try:
async with sse_client(server_url) as (read, write):
# Create a client session
async with ClientSession(read, write) as session:
# Initialize connection to server
if not quiet:
console.print("[bold blue]Initializing MCP protocol...[/bold blue]")
init_result = await session.initialize()
# Get server info
server_name = init_result.serverInfo.name
server_version = init_result.serverInfo.version
if not quiet:
console.print(f"[green]Connected to server:[/green] {server_name} v{server_version}")
# Show server capabilities safely
if not quiet:
console.print("[bold blue]Server capabilities:[/bold blue]")
console.print(format_capabilities(init_result.capabilities))
# Check if tools capability is present
has_tools = False
if hasattr(init_result.capabilities, "tools") and init_result.capabilities.tools:
has_tools = True
if not quiet and not has_tools:
console.print("[bold yellow]Warning: This server does not advertise tools capability![/bold yellow]")
console.print("The server might not support tool listing, but we'll try anyway.")
# Get server instructions (will be used in the LLM prompt)
server_instructions = ""
if hasattr(init_result, "instructions") and init_result.instructions:
server_instructions = init_result.instructions
if not quiet:
console.print(f"[green]Server provides instructions of length {len(server_instructions):,} chars[/green]")
elif not quiet:
console.print("[yellow]Server does not provide instructions[/yellow]")
# List available tools
if not quiet:
console.print("[bold blue]Retrieving tool definitions...[/bold blue]")
try:
tools_result = await session.list_tools()
# Handle ListToolsResult object
# The result should have a 'tools' attribute which is the actual list
tools = []
if hasattr(tools_result, "tools"):
tools = tools_result.tools
else:
# If it doesn't have a tools attribute, try to access it as a list directly
# or check other common patterns
if not quiet:
console.print("[bold yellow]Tools result doesn't have expected structure. Trying alternatives...[/bold yellow]")
if hasattr(tools_result, "__iter__"):
tools = list(tools_result)
else:
# Print the object to help diagnose
if not quiet:
console.print(f"[bold yellow]Tools result type: {type(tools_result)}[/bold yellow]")
console.print(f"Tools result attributes: {dir(tools_result)}")
raise ValueError("Unable to extract tools from server response")
# Count tools
tool_count = len(tools) if tools else 0
if not quiet:
console.print(f"[green]Found {tool_count} tools[/green]")
if tool_count == 0:
console.print("[bold yellow]No tools found on the server.[/bold yellow]")
return {}
# Convert tools to their JSON representation (exactly as sent to LLMs)
tool_defs = []
# Add debug information about descriptions
has_descriptions = 0
total_desc_length = 0
for tool in tools:
# Convert to dict that matches the MCP protocol spec for tool definitions
tool_dict = {
"name": tool.name,
"inputSchema": tool.inputSchema
}
# Debug description handling
if hasattr(tool, "description") and tool.description:
desc = tool.description
has_descriptions += 1
total_desc_length += len(desc)
if not quiet:
console.print(f"[dim]Tool '{tool.name}' has description ({len(desc):,} chars)[/dim]")
tool_dict["description"] = desc
elif not quiet:
console.print(f"[dim yellow]Tool '{tool.name}' has no description[/dim yellow]")
if hasattr(tool, "annotations") and tool.annotations:
tool_dict["annotations"] = tool.annotations
tool_defs.append(tool_dict)
# Print description statistics
if not quiet:
console.print(f"[green]{has_descriptions} out of {tool_count} tools have descriptions[/green]")
if has_descriptions > 0:
console.print(f"[green]Average description length: {total_desc_length/has_descriptions:,.1f} chars[/green]")
# Include server info in the result to be used for creating the complete LLM prompt
return {
"tools": tool_defs,
"server_name": server_name,
"server_version": server_version,
"server_instructions": server_instructions
}
except Exception as e:
console.print(f"[bold red]Error listing tools:[/bold red] {str(e)}")
if not quiet:
console.print("[bold yellow]Stack trace:[/bold yellow]")
console.print(traceback.format_exc())
# Try retrieving server details to help diagnose
if not quiet:
try:
console.print("[bold blue]Getting additional server information...[/bold blue]")
if hasattr(init_result.capabilities, "prompts") and init_result.capabilities.prompts:
prompts_result = await session.list_prompts()
prompt_count = 0
if hasattr(prompts_result, "prompts"):
prompt_count = len(prompts_result.prompts)
console.print(f"Server has {prompt_count} prompts available")
except Exception:
pass
raise
except Exception as e:
console.print(f"[bold red]Error connecting to MCP server:[/bold red] {str(e)}")
if not quiet:
console.print("[bold yellow]Stack trace:[/bold yellow]")
console.print(traceback.format_exc())
# Provide guidance based on the error
if "Connection refused" in str(e):
console.print("[bold yellow]The server doesn't appear to be running at the specified URL.[/bold yellow]")
console.print("Make sure your MCP server is running and available at the URL you specified.")
elif "401" in str(e):
console.print("[bold yellow]Authentication error - the server requires credentials.[/bold yellow]")
elif "404" in str(e):
console.print("[bold yellow]The server endpoint was not found.[/bold yellow]")
console.print("Check if you need to use a different URL path (e.g., /sse or /mcp)")
console.print("Try using /sse instead of just the port number.")
sys.exit(1)
def create_full_tool_registration_prompt(server_info, tools=None, quiet=False):
"""
Create a full, realistic prompt as would be sent to an LLM when registering MCP tools.
This generates the exact format used in the MCP client's format_tools_for_anthropic method
which sends tools to the Anthropic API.
Args:
server_info: Dictionary with server information
tools: List of tool definitions to include (if None, use all tools)
quiet: If True, only show most important output
Returns:
String with the serialized JSON representation of tools as sent to the API
"""
if tools is None:
tools = server_info["tools"]
# The actual format sent to Anthropic API is just:
# {
# "name": sanitized_name,
# "input_schema": tool.input_schema,
# "description": tool.description # only if present
# }
formatted_tools = []
# Track description statistics
desc_count = 0
total_desc_len = 0
console = Console()
for tool in tools:
# Create the tool dict exactly as in format_tools_for_anthropic
tool_dict_for_api = {
"name": tool["name"],
"input_schema": tool["inputSchema"]
}
if SHOW_DESCRIPTIONS:
# Add description only if it exists and is not empty
if "description" in tool and tool["description"]:
desc = tool["description"]
tool_dict_for_api["description"] = desc
desc_count += 1
total_desc_len += len(desc)
if not quiet and len(desc) > 100:
# Show abbreviated version for long descriptions
abbrev = desc[:50] + "..." + desc[-50:]
console.print(f"[dim]Including description for {tool['name']}: {abbrev}[/dim]")
elif not quiet:
console.print(f"[dim]Including description for {tool['name']}: {desc}[/dim]")
elif not quiet:
console.print(f"[dim yellow]No description for {tool['name']}[/dim yellow]")
formatted_tools.append(tool_dict_for_api)
# Final description statistics - ALWAYS show these since they're part of the requested output
console.print(f"[green]Included {desc_count} descriptions out of {len(tools)} tools in final output[/green]")
if desc_count > 0:
console.print(f"[green]Average description length in final output: {total_desc_len/desc_count:,.1f} chars[/green]")
# Return the serialized JSON that would be sent to the API
return json.dumps(formatted_tools, indent=2)
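# Illustrative only: one element of the JSON array produced above, using a hypothetical
# "read_file" tool rather than one of the server's real tools.
_EXAMPLE_ANTHROPIC_TOOL_ENTRY = {
    "name": "read_file",
    "input_schema": {
        "type": "object",
        "properties": {"path": {"type": "string", "description": "Path of the file to read"}},
        "required": ["path"],
    },
    "description": "Read a UTF-8 text file and return its contents.",
}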
def format_tool_for_llm(tool: Dict[str, Any]) -> str:
"""
Format a tool definition exactly as it would be presented to an LLM.
This should match the format used in actual LLM prompt construction.
"""
# This is how tools are typically formatted for LLMs in the JSON format
return json.dumps(tool, indent=2)
def analyze_tools_token_usage(current_tools: Dict[str, Any], all_tools: Dict[str, Any], quiet: bool = False):
"""
Analyze token usage for a complete MCP tool registration prompt
Args:
current_tools: Current active toolset info
all_tools: Complete toolset info (with --load-all-tools)
quiet: If True, only show most important output
"""
console = Console()
# Format tools as they would be sent to an LLM
current_tools_subset = current_tools["tools"]
all_tools_subset = all_tools["tools"]
# Determine if we're likely comparing the same set vs different sets
same_toolsets = len(current_tools_subset) == len(all_tools_subset)
if same_toolsets and not quiet:
console.print("[yellow]Warning: Current tool count equals all tools count.[/yellow]")
console.print("[yellow]This suggests the server is already running with --load-all-tools[/yellow]")
    # Labels for the two columns being compared
    current_label = "Current Tools"
    all_label = "All Tools"
# Get JSON representations
current_tools_json = "\n".join(format_tool_for_llm(tool) for tool in current_tools_subset)
all_tools_json = "\n".join(format_tool_for_llm(tool) for tool in all_tools_subset)
# Create the full prompts
current_tools_prompt = create_full_tool_registration_prompt(current_tools, current_tools_subset, quiet)
all_tools_prompt = create_full_tool_registration_prompt(all_tools, all_tools_subset, quiet)
# Calculate sizes for raw JSON
current_tools_size_kb = len(current_tools_json.encode('utf-8')) / 1024
all_tools_size_kb = len(all_tools_json.encode('utf-8')) / 1024
# Calculate sizes for full prompts
current_tools_prompt_size_kb = len(current_tools_prompt.encode('utf-8')) / 1024
all_tools_prompt_size_kb = len(all_tools_prompt.encode('utf-8')) / 1024
# Count tokens for raw JSON
current_tools_tokens = count_tokens(current_tools_json)
all_tools_tokens = count_tokens(all_tools_json)
# Count tokens for full prompts
current_tools_prompt_tokens = count_tokens(current_tools_prompt)
all_tools_prompt_tokens = count_tokens(all_tools_prompt)
# Calculate costs for different models (using full prompt tokens)
current_tools_costs = {model: (price * current_tools_prompt_tokens / 1000)
for model, price in MODEL_PRICES.items()}
all_tools_costs = {model: (price * all_tools_prompt_tokens / 1000)
for model, price in MODEL_PRICES.items()}
# Save the complete, untruncated text to files
with open("current_tools_sent_to_llm.json", "w", encoding="utf-8") as f:
f.write(current_tools_prompt)
console.print("[green]Saved current tools JSON to current_tools_sent_to_llm.json[/green]")
with open("all_tools_sent_to_llm.json", "w", encoding="utf-8") as f:
f.write(all_tools_prompt)
console.print("[green]Saved all tools JSON to all_tools_sent_to_llm.json[/green]\n\n")
# Create data for display - ensure the data is correct and consistent
data = {
"current_tools": {
"count": len(current_tools_subset),
"raw_size_kb": current_tools_size_kb,
"raw_tokens": current_tools_tokens,
"full_size_kb": current_tools_prompt_size_kb,
"full_tokens": current_tools_prompt_tokens,
"costs": current_tools_costs
},
"all_tools": {
"count": len(all_tools_subset),
"raw_size_kb": all_tools_size_kb,
"raw_tokens": all_tools_tokens,
"full_size_kb": all_tools_prompt_size_kb,
"full_tokens": all_tools_prompt_tokens,
"costs": all_tools_costs
}
}
# Create comparison table
table = Table(title="Tool Registration Token Usage")
# Add columns - including percentage column
table.add_column("Metric", style="white")
table.add_column(current_label, style="cyan")
table.add_column(all_label, style="magenta")
table.add_column("Difference", style="yellow")
table.add_column(f"{current_label} as % of {all_label}", style="green")
# SECTION 1: Number of Tools
# Calculate percentage for count
count_percentage = (data["current_tools"]["count"] / data["all_tools"]["count"]) * 100 if data["all_tools"]["count"] > 0 else 100
# Add rows - keep consistent format with other rows for the number of tools
table.add_row(
"Number of Tools",
str(data["current_tools"]["count"]),
str(data["all_tools"]["count"]),
str(data["current_tools"]["count"] - data["all_tools"]["count"]),
f"{count_percentage:.2f}%"
)
# Add a divider after Number of Tools
table.add_section()
# SECTION 2: Full Prompt stats
# Calculate percentage for full prompt size
full_size_percentage = (data["current_tools"]["full_size_kb"] / data["all_tools"]["full_size_kb"]) * 100 if data["all_tools"]["full_size_kb"] > 0 else 100
table.add_row(
"Full Prompt Size (KB)",
f"{data['current_tools']['full_size_kb']:,.2f}",
f"{data['all_tools']['full_size_kb']:,.2f}",
f"{data['current_tools']['full_size_kb'] - data['all_tools']['full_size_kb']:,.2f}",
f"{full_size_percentage:.2f}%"
)
# Calculate percentage for full tokens
full_tokens_percentage = (data["current_tools"]["full_tokens"] / data["all_tools"]["full_tokens"]) * 100 if data["all_tools"]["full_tokens"] > 0 else 100
table.add_row(
"Full Prompt Token Count",
f"{data['current_tools']['full_tokens']:,}",
f"{data['all_tools']['full_tokens']:,}",
f"{data['current_tools']['full_tokens'] - data['all_tools']['full_tokens']:,}",
f"{full_tokens_percentage:.2f}%"
)
# Add a divider after Full Prompt stats
table.add_section()
# SECTION 3: Model costs
# Specify the models to include and their order
models_to_include = [
"claude-3-7-sonnet-20250219",
"gpt-4.1",
"gemini-2.5-pro-preview-03-25",
"grok-3-latest"
]
# Add cost rows for selected models only, in specified order
for model in models_to_include:
if model in MODEL_PRICES:
current_cost = data["current_tools"]["costs"][model]
all_cost = data["all_tools"]["costs"][model]
diff_cost = current_cost - all_cost
# Calculate percentage
cost_percentage = (current_cost / all_cost) * 100 if all_cost > 0 else 100
table.add_row(
f"Cost ({model})",
f"${current_cost:.4f}",
f"${all_cost:.4f}",
f"${diff_cost:.4f}",
f"{cost_percentage:.2f}%"
)
# Print table
console.print(table)
# Print raw data as JSON (only if not in quiet mode)
if not quiet:
console.print("\nRaw token usage data:")
console.print(json.dumps(data, indent=2))
return data
async def get_complete_toolset(quiet: bool = False) -> Dict[str, Any]:
"""
Generate the complete toolset that would be available with --load-all-tools
This uses a list of tool names read from a file generated by the server.
If the file doesn't exist, it will use a list of common tools from the current server.
Args:
quiet: If True, only show most important output
Returns:
Dictionary with server info and simulated complete toolset
"""
console = Console()
if not quiet:
console.print("[bold blue]Analyzing complete toolset (--load-all-tools)[/bold blue]")
# First get the current server's tools to extract real descriptions where possible
try:
# Get server connection details
host, port = get_server_url_and_transport()
server_url, transport_type = await detect_server_transport(host, port, quiet=quiet)
current_tools_info = await get_mcp_server_tools(server_url, transport_type, quiet=quiet, command=None, args=None)
current_tools = {tool["name"]: tool for tool in current_tools_info["tools"]} if current_tools_info else {}
if not quiet:
console.print(f"[green]Retrieved {len(current_tools)} tools from current server to use their real descriptions[/green]")
except Exception as e:
if not quiet:
console.print(f"[yellow]Could not get current tools: {str(e)}[/yellow]")
current_tools = {}
# Read tool names from file created by the server
all_tool_names = read_tool_names_from_file(quiet=quiet)
# If no tools found in file, use the tools we got from the server
if not all_tool_names and current_tools:
if not quiet:
console.print("[yellow]No tools found in file. Using current server tools and adding some common ones.[/yellow]")
all_tool_names = list(current_tools.keys())
# Add some common tool names that might not be in the current server
additional_tools = [
"excel_create_workbook", "excel_open_workbook", "excel_add_worksheet",
"excel_set_cell_value", "excel_get_cell_value", "excel_save_workbook",
"excel_get_worksheet_names", "excel_create_chart", "excel_set_range_format",
"smart_browser.autopilot", "smart_browser.parallel", "smart_browser.download_site_pdfs",
"generate_image", "analyze_image", "transcribe_audio"
]
# Add them if not already present
for tool in additional_tools:
if tool not in all_tool_names:
all_tool_names.append(tool)
if not quiet:
console.print(f"[green]Using complete list of {len(all_tool_names)} tools for all-tools mode[/green]")
# Create tool entries based on real data
tool_defs = []
for tool_name in all_tool_names:
# First check if we have real data for this tool
if tool_name in current_tools:
# Use the actual tool definition from the server
tool_def = current_tools[tool_name]
if not quiet:
console.print(f"[dim]Using real definition for tool '{tool_name}'[/dim]")
else:
# Create a definition with a realistic description based on the tool name
tool_desc = f"The {tool_name} tool provides functionality for {tool_name.replace('_', ' ')}. " + \
"This would be the actual docstring from the function when loaded with --load-all-tools."
# Create a basic definition
tool_def = {
"name": tool_name,
"inputSchema": {
"type": "object",
"properties": {
"param1": {"type": "string", "description": "First parameter"},
"param2": {"type": "string", "description": "Second parameter"}
},
"required": ["param1"]
},
"description": tool_desc
}
if not quiet:
console.print(f"[dim yellow]Created placeholder for tool '{tool_name}'[/dim yellow]")
tool_defs.append(tool_def)
# Return a similar structure to what get_mcp_server_tools returns
return {
"tools": tool_defs,
"server_name": "Ultimate MCP Server (with --load-all-tools)",
"server_version": "1.6.0",
"server_instructions": """This server provides access to the complete set of tools available in the Ultimate MCP Server.
When running with --load-all-tools, all tools from all categories are available, including:
- Completion tools for text generation
- Provider tools for model management
- Filesystem tools for file operations
- Optimization tools for cost and performance
- Text processing tools for manipulating text
- Meta tools for accessing tool information
- Search tools for querying databases
- Browser automation tools
- Web research tools
- HTML processing tools
- Extraction tools
- SQL database tools
- Document processing tools
- Audio transcription tools
- Excel spreadsheet tools
- OCR tools
- Sentiment analysis tools
"""
}
def parse_args():
"""Parse command line arguments"""
parser = argparse.ArgumentParser(description="MCP Tool Context Estimator")
parser.add_argument("--url", default=None,
help="URL of the MCP server (default: auto-detected)")
parser.add_argument("--transport", default=None,
choices=["sse", "streamable-http", "stdio"],
help="Force specific transport type (default: auto-detect)")
parser.add_argument("--command", default=None,
help="Command to run for stdio transport (e.g., 'python -m ultimate_mcp_server')")
parser.add_argument("--args", default=None, nargs="*",
help="Additional arguments for stdio command")
parser.add_argument("--no-all-tools", action="store_true",
help="Skip comparison with all tools")
parser.add_argument("--quiet", "-q", action="store_true",
help="Only show most important information and final table")
return parser.parse_args()
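# Typical invocations (illustrative; the flags correspond to parse_args above):
#   python mcp_tool_context_estimator.py --quiet
#   python mcp_tool_context_estimator.py --url http://localhost:8013 --transport sse
#   python mcp_tool_context_estimator.py --transport stdio --command "python -m ultimate_mcp_server"
#   python mcp_tool_context_estimator.py --no-all-tools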
async def main():
"""Main function"""
console = Console()
args = parse_args()
# Handle stdio transport
if args.transport == "stdio":
if not args.command:
console.print("[bold red]Error: --command is required for stdio transport[/bold red]")
console.print("Example: --transport stdio --command 'python -m ultimate_mcp_server'")
sys.exit(1)
server_url = None # Not used for stdio
transport_type = "stdio"
command = args.command
stdio_args = args.args or []
if not args.quiet:
console.print(f"[blue]Using stdio transport with command: {command} {' '.join(stdio_args)}[/blue]")
else:
# Get server connection details for HTTP transports
if args.url:
# Parse URL to extract host and port
import urllib.parse
parsed = urllib.parse.urlparse(args.url)
host = parsed.hostname or "localhost"
port = str(parsed.port or 8013)
if args.transport:
transport_type = args.transport
if transport_type == "sse":
server_url = f"http://{host}:{port}/sse"
else: # streamable-http
server_url = f"http://{host}:{port}/mcp/"
else:
# Auto-detect transport for manually specified URL
server_url, transport_type = await detect_server_transport(host, port, quiet=args.quiet)
else:
# Auto-detect everything
host, port = get_server_url_and_transport()
if args.transport:
transport_type = args.transport
if transport_type == "sse":
server_url = f"http://{host}:{port}/sse"
else: # streamable-http
server_url = f"http://{host}:{port}/mcp/"
else:
server_url, transport_type = await detect_server_transport(host, port, quiet=args.quiet)
command = None
stdio_args = None
quiet_mode = args.quiet
try:
# Get the active toolset from the running server
current_tools = await get_mcp_server_tools(
server_url,
transport_type,
quiet=quiet_mode,
command=command,
args=stdio_args
)
if not current_tools or "tools" not in current_tools or not current_tools["tools"]:
console.print("[bold yellow]No tools found on the server.[/bold yellow]")
return
if args.no_all_tools:
# If we're not doing the comparison, create a meaningful subset for comparison
if not quiet_mode:
console.print("[yellow]Skipping comparison with full --load-all-tools[/yellow]")
console.print("[green]Creating an artificial subset of current tools for comparison[/green]")
            # Use roughly half of the tools, but never fewer than 4 (or all of them if the server exposes fewer than 4)
total_tools = len(current_tools["tools"])
subset_size = max(total_tools // 2, min(total_tools, 4))
subset_tools = current_tools["tools"][:subset_size]
if not quiet_mode:
console.print(f"[green]Created subset with {subset_size} tools out of {total_tools} total[/green]")
# Create subset version
subset_data = {
"tools": subset_tools,
"server_name": current_tools["server_name"] + " (Subset)",
"server_version": current_tools["server_version"],
"server_instructions": current_tools["server_instructions"]
}
# Analyze token usage with the artificial subset vs full
analyze_tools_token_usage(subset_data, current_tools, quiet=quiet_mode)
else:
# Get the complete toolset that would be available with --load-all-tools
all_tools = await get_complete_toolset(quiet=quiet_mode)
# Check if current server is likely already running with all tools
current_tool_count = len(current_tools["tools"])
all_tool_count = len(all_tools["tools"])
if abs(current_tool_count - all_tool_count) <= 2: # Allow small difference
if not quiet_mode:
console.print(f"[yellow]Warning: Current server has {current_tool_count} tools, "
f"which is very close to the expected all-tools count of {all_tool_count}[/yellow]")
console.print("[yellow]This suggests the server is already running with --load-all-tools[/yellow]")
# For accurate comparison when counts are the same, we should just use the same data for both
# to ensure metrics are consistent
same_tools_data = { # noqa: F841
"tools": current_tools["tools"].copy(),
"server_name": "Current Server",
"server_version": current_tools["server_version"],
"server_instructions": current_tools["server_instructions"]
}
# Create a deep copy to ensure they're exactly the same
all_tools = {
"tools": current_tools["tools"].copy(),
"server_name": "All Tools",
"server_version": current_tools["server_version"],
"server_instructions": current_tools["server_instructions"]
}
# Analyze token usage with full prompt simulation
analyze_tools_token_usage(current_tools, all_tools, quiet=quiet_mode)
except KeyboardInterrupt:
console.print("[bold yellow]Operation cancelled by user[/bold yellow]")
except Exception as e:
console.print(f"[bold red]Unexpected error:[/bold red] {str(e)}")
if not quiet_mode:
console.print(traceback.format_exc())
if __name__ == "__main__":
asyncio.run(main())
```
--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/base.py:
--------------------------------------------------------------------------------
```python
"""Base tool classes and decorators for Ultimate MCP Server."""
import asyncio
import functools
import inspect
import time
from typing import Any, Callable, Dict, List, Optional, Type, Union
try:
    from fastmcp import Tool
except ImportError:
    # fastmcp is optional here; if it is missing, the Tool class is supplied by the mcp_server at runtime
    Tool = None
from ultimate_mcp_server.exceptions import (
ResourceError,
ToolError,
ToolExecutionError,
ToolInputError,
format_error_response,
)
# from ultimate_mcp_server.services.cache import with_cache
from ultimate_mcp_server.utils import get_logger
logger = get_logger("ultimate_mcp_server.tools.base")
def tool(name=None, description=None):
"""
Decorator that marks a BaseTool class method as an MCP tool.
This decorator adds metadata to a method, identifying it as a tool that should be
registered with the MCP server when the containing BaseTool class is initialized.
It allows customizing the tool's name and description, which are used in tool
discoverability and documentation.
Unlike the register_tool function which directly registers standalone functions,
this decorator only marks methods for later registration, allowing BaseTool subclasses
to organize multiple related tools together in a single class.
The decorator adds three attributes to the method:
- _tool: A boolean flag indicating this is a tool method
- _tool_name: The name to use when registering the tool (or original method name)
- _tool_description: The description to use for the tool (or method docstring)
These attributes are used during the tool registration process, typically in the
_register_tools method of BaseTool subclasses.
Args:
name: Custom name for the tool (defaults to the method name if not provided)
description: Custom description for the tool (defaults to the method's docstring)
Returns:
A decorator function that adds tool metadata attributes to the decorated method
Example:
```python
class MyToolSet(BaseTool):
tool_name = "my_toolset"
@tool(name="custom_operation", description="Performs a customized operation")
async def perform_operation(self, param1: str, param2: int) -> Dict[str, Any]:
# Implementation
return {"result": "success"}
```
Notes:
- This decorator should be used on methods of classes that inherit from BaseTool
- Decorated methods should be async
- The decorated method must take self as its first parameter
- This decorator does not apply error handling or other middleware automatically
"""
def decorator(func):
@functools.wraps(func)
async def wrapper(self, *args, **kwargs):
return await func(self, *args, **kwargs)
wrapper._tool = True
wrapper._tool_name = name
wrapper._tool_description = description
return wrapper
return decorator
def with_resource(resource_type, allow_creation=False, require_existence=True):
"""
Decorator for standardizing resource access and validation in tool methods.
This decorator provides consistent resource handling for tool methods that
access or create persistent resources in the MCP ecosystem. It enforces resource
validation rules, handles resource registration, and provides unified error handling
for resource-related operations.
Core functionalities:
1. Resource existence validation - Ensures resources exist before allowing access
2. Resource creation tracking - Registers newly created resources with the system
3. Resource type validation - Confirms resources match expected types
4. Standardized error handling - Produces consistent error responses for resource issues
The decorator identifies resource IDs by looking for common parameter names like
'{resource_type}_id', 'id', or 'resource_id' in the function's keyword arguments.
When a resource ID is found, it performs the configured validation checks before
allowing the function to execute. After execution, it can optionally register
newly created resources.
Args:
resource_type: Type category for the resource (e.g., "document", "embedding",
"database"). Used for validation and registration.
allow_creation: Whether the tool is allowed to create new resources of this type.
When True, the decorator will register any created resources.
require_existence: Whether the resource must exist before the tool is called.
When True, the decorator will verify resource existence.
Returns:
A decorator function that applies resource handling to tool methods.
Raises:
ResourceError: When resource validation fails (e.g., resource not found,
resource type mismatch, or unauthorized resource access).
Example:
```python
class DocumentTools(BaseTool):
@tool()
@with_resource("document", require_existence=True, allow_creation=False)
async def get_document_summary(self, document_id: str):
# This method will fail with ResourceError if document_id doesn't exist
# Resource existence is checked before this code runs
...
@tool()
@with_resource("document", require_existence=False, allow_creation=True)
async def create_document(self, content: str, metadata: Dict[str, Any] = None):
# Process content and create document
doc_id = str(uuid.uuid4())
# ... processing logic ...
# Return created resource with resource_id key to trigger registration
return {
"resource_id": doc_id, # This triggers resource registration
"status": "created",
"metadata": {"content_length": len(content), "created_at": time.time()}
}
# The resource is automatically registered with the returned metadata
```
Notes:
- This decorator should be applied after @tool but before other decorators
like @with_error_handling to ensure proper execution order
- Resources created with allow_creation=True must include a "resource_id"
key in their result dictionary to trigger registration
- The resource registry must be accessible via the tool's mcp server instance
"""
def decorator(func):
@functools.wraps(func)
async def wrapper(self, *args, **kwargs):
# Get resource ID from kwargs (common parameter names)
resource_id = None
for param_name in [f"{resource_type}_id", "id", "resource_id"]:
if param_name in kwargs:
resource_id = kwargs[param_name]
break
# Check if resource exists if required
if require_existence and resource_id:
# Get resource registry from MCP server
resource_registry = getattr(self.mcp, "resources", None)
if resource_registry is None:
logger.warning(
f"Resource registry not available, skipping existence check for {resource_type}/{resource_id}",
emoji_key="warning"
)
else:
# Check if resource exists
exists = await resource_registry.exists(resource_type, resource_id)
if not exists:
raise ResourceError(
f"{resource_type.capitalize()} not found: {resource_id}",
resource_type=resource_type,
resource_id=resource_id
)
# Call function
result = await func(self, *args, **kwargs)
# If the function returns a new resource ID, register it
if allow_creation and isinstance(result, dict) and "resource_id" in result:
new_resource_id = result["resource_id"]
# Get resource registry from MCP server
resource_registry = getattr(self.mcp, "resources", None)
if resource_registry is not None:
# Register new resource
metadata = {
"created_at": time.time(),
"creator": kwargs.get("ctx", {}).get("user_id", "unknown"),
"resource_type": resource_type
}
# Add other metadata from result if available
if "metadata" in result:
metadata.update(result["metadata"])
await resource_registry.register(
resource_type,
new_resource_id,
metadata=metadata
)
logger.info(
f"Registered new {resource_type}: {new_resource_id}",
emoji_key="resource",
resource_type=resource_type,
resource_id=new_resource_id
)
return result
# Add resource metadata to function
wrapper._resource_type = resource_type
wrapper._allow_creation = allow_creation
wrapper._require_existence = require_existence
return wrapper
return decorator
class ResourceRegistry:
"""
Registry that tracks and manages resources used by MCP tools.
The ResourceRegistry provides a centralized system for tracking resources created or
accessed by tools within the MCP ecosystem. It maintains resource metadata, handles
persistence of resource information, and provides methods for registering, looking up,
and deleting resources.
Resources in the MCP ecosystem represent persistent or semi-persistent objects that
may be accessed across multiple tool calls or sessions. Examples include documents,
knowledge bases, embeddings, file paths, and database connections. The registry helps
manage the lifecycle of these resources and prevents issues like resource leaks or
unauthorized access.
Key features:
- In-memory caching of resource metadata for fast lookups
- Optional persistent storage via pluggable storage backends
- Resource type categorization (documents, embeddings, etc.)
- Resource existence checking for access control
- Simple CRUD operations for resource metadata
Resources are organized by type and identified by unique IDs within those types.
Each resource has associated metadata that can include creation time, owner information,
and resource-specific attributes.
The registry is typically initialized by the MCP server and made available to all tools.
Tools that create resources should register them, and tools that access resources should
verify their existence before proceeding.
"""
def __init__(self, storage_backend=None):
"""Initialize the resource registry.
Args:
storage_backend: Backend for persistent storage (if None, in-memory only)
"""
self.resources = {}
self.storage = storage_backend
self.logger = get_logger("ultimate_mcp_server.resources")
async def register(self, resource_type, resource_id, metadata=None):
"""Register a resource in the registry.
Args:
resource_type: Type of resource (e.g., "document", "embedding")
resource_id: Resource identifier
metadata: Additional metadata about the resource
Returns:
True if registration was successful
"""
# Initialize resource type if not exists
if resource_type not in self.resources:
self.resources[resource_type] = {}
# Register resource
self.resources[resource_type][resource_id] = {
"id": resource_id,
"type": resource_type,
"metadata": metadata or {},
"registered_at": time.time()
}
# Persist to storage backend if available
if self.storage:
try:
await self.storage.save_resource(
resource_type,
resource_id,
self.resources[resource_type][resource_id]
)
except Exception as e:
self.logger.error(
f"Failed to persist resource {resource_type}/{resource_id}: {str(e)}",
emoji_key="error",
exc_info=True
)
return True
async def exists(self, resource_type, resource_id):
"""Check if a resource exists in the registry.
Args:
resource_type: Type of resource
resource_id: Resource identifier
Returns:
True if the resource exists
"""
# Check in-memory registry first
if resource_type in self.resources and resource_id in self.resources[resource_type]:
return True
# Check storage backend if available
if self.storage:
try:
return await self.storage.resource_exists(resource_type, resource_id)
except Exception as e:
self.logger.error(
f"Failed to check resource existence {resource_type}/{resource_id}: {str(e)}",
emoji_key="error",
exc_info=True
)
return False
async def get(self, resource_type, resource_id):
"""Get resource metadata from the registry.
Args:
resource_type: Type of resource
resource_id: Resource identifier
Returns:
Resource metadata or None if not found
"""
# Check in-memory registry first
if resource_type in self.resources and resource_id in self.resources[resource_type]:
return self.resources[resource_type][resource_id]
# Check storage backend if available
if self.storage:
try:
resource = await self.storage.get_resource(resource_type, resource_id)
if resource:
# Cache in memory for future access
if resource_type not in self.resources:
self.resources[resource_type] = {}
self.resources[resource_type][resource_id] = resource
return resource
except Exception as e:
self.logger.error(
f"Failed to get resource {resource_type}/{resource_id}: {str(e)}",
emoji_key="error",
exc_info=True
)
return None
async def list(self, resource_type, limit=100, offset=0, filters=None):
"""List resources of a specific type.
Args:
resource_type: Type of resource to list
limit: Maximum number of resources to return
offset: Offset for pagination
filters: Dictionary of filters to apply
Returns:
List of resource metadata
"""
result = []
# Get from storage backend first if available
if self.storage:
try:
resources = await self.storage.list_resources(
resource_type,
limit=limit,
offset=offset,
filters=filters
)
# Cache in memory for future access
if resources:
if resource_type not in self.resources:
self.resources[resource_type] = {}
for resource in resources:
resource_id = resource.get("id")
if resource_id:
self.resources[resource_type][resource_id] = resource
return resources
except Exception as e:
self.logger.error(
f"Failed to list resources of type {resource_type}: {str(e)}",
emoji_key="error",
exc_info=True
)
# Fallback to in-memory registry
if resource_type in self.resources:
# Apply filters if provided
filtered_resources = self.resources[resource_type].values()
if filters:
for key, value in filters.items():
filtered_resources = [
r for r in filtered_resources
if r.get("metadata", {}).get(key) == value
]
# Apply pagination
result = list(filtered_resources)[offset:offset+limit]
return result
async def delete(self, resource_type, resource_id):
"""Delete a resource from the registry.
Args:
resource_type: Type of resource
resource_id: Resource identifier
Returns:
True if deletion was successful
"""
# Delete from in-memory registry
if resource_type in self.resources and resource_id in self.resources[resource_type]:
del self.resources[resource_type][resource_id]
# Delete from storage backend if available
if self.storage:
try:
return await self.storage.delete_resource(resource_type, resource_id)
except Exception as e:
self.logger.error(
f"Failed to delete resource {resource_type}/{resource_id}: {str(e)}",
emoji_key="error",
exc_info=True
)
return True
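# Usage sketch (illustrative, not part of the original module): exercising the
# registry purely in memory. ResourceRegistry() is constructed with no storage
# backend (exactly as BaseTool.__init__ below does), so every call takes the
# in-memory path and the storage-backend branches are skipped.
async def _example_resource_registry_usage() -> None:
    registry = ResourceRegistry()
    await registry.register("document", "doc-123", metadata={"title": "Q3 report"})
    assert await registry.exists("document", "doc-123")
    resource = await registry.get("document", "doc-123")
    print(resource["metadata"]["title"])  # -> Q3 report
    # list() filters compare against metadata keys, then paginates
    docs = await registry.list("document", limit=10, filters={"title": "Q3 report"})
    print(len(docs))  # -> 1
    await registry.delete("document", "doc-123")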
class BaseToolMetrics:
"""
Metrics collection and aggregation system for tool execution statistics.
The BaseToolMetrics class provides a standardized way to track and aggregate performance
metrics for tool executions. It maintains cumulative statistics about calls to a tool,
including execution counts, success rates, timing information, and optional token usage
and cost data when available.
This class is used both internally by BaseTool instances and by the with_tool_metrics
decorator to provide consistent metrics tracking across the entire MCP ecosystem. The
collected metrics enable monitoring, debugging, and optimization of tool performance
and usage patterns.
Metrics tracked:
- Total number of calls
- Number of successful and failed calls
- Success rate
- Total, minimum, and maximum execution duration
- Total token usage (for LLM-based tools)
- Total cost (for tools with cost accounting)
The metrics are aggregated in memory and can be retrieved at any time via the get_stats()
method. They represent the lifetime statistics of the tool since the metrics object
was created.
Example:
```python
# Accessing metrics from a tool
my_tool = MyToolClass(mcp_server)
metrics = my_tool.metrics.get_stats()
print(f"Success rate: {metrics['success_rate']:.2%}")
print(f"Average duration: {metrics['average_duration']:.2f}s")
```
"""
def __init__(self):
"""Initialize metrics tracking."""
self.total_calls = 0
self.successful_calls = 0
self.failed_calls = 0
self.total_duration = 0.0
self.min_duration = float('inf')
self.max_duration = 0.0
self.total_tokens = 0
self.total_cost = 0.0
def record_call(
self,
success: bool,
duration: float,
tokens: Optional[int] = None,
cost: Optional[float] = None
) -> None:
"""Record metrics for a tool call.
Args:
success: Whether the call was successful
duration: Duration of the call in seconds
tokens: Number of tokens used (if applicable)
cost: Cost of the call (if applicable)
"""
self.total_calls += 1
if success:
self.successful_calls += 1
else:
self.failed_calls += 1
self.total_duration += duration
self.min_duration = min(self.min_duration, duration)
self.max_duration = max(self.max_duration, duration)
if tokens is not None:
self.total_tokens += tokens
if cost is not None:
self.total_cost += cost
def get_stats(self) -> Dict[str, Any]:
"""Get current metrics.
Returns:
Dictionary of metrics
"""
if self.total_calls == 0:
return {
"total_calls": 0,
"success_rate": 0.0,
"average_duration": 0.0,
"min_duration": 0.0,
"max_duration": 0.0,
"total_tokens": 0,
"total_cost": 0.0,
}
return {
"total_calls": self.total_calls,
"successful_calls": self.successful_calls,
"failed_calls": self.failed_calls,
"success_rate": self.successful_calls / self.total_calls,
"average_duration": self.total_duration / self.total_calls,
"min_duration": self.min_duration if self.min_duration != float('inf') else 0.0,
"max_duration": self.max_duration,
"total_tokens": self.total_tokens,
"total_cost": self.total_cost,
}
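# Usage sketch (illustrative, not part of the original module): BaseToolMetrics
# on its own. record_call() feeds the aggregates and get_stats() reads them back;
# the decorators below do exactly this under the hood.
def _example_metrics_aggregation() -> None:
    metrics = BaseToolMetrics()
    metrics.record_call(success=True, duration=0.42, tokens=150, cost=0.0003)
    metrics.record_call(success=False, duration=1.10)
    stats = metrics.get_stats()
    print(f"{stats['success_rate']:.0%}")       # -> 50%
    print(f"{stats['average_duration']:.2f}s")  # -> 0.76s
    print(stats["total_tokens"], stats["total_cost"])  # -> 150 0.0003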
class BaseTool:
"""
Foundation class for all tool implementations in the Ultimate MCP Server.
The BaseTool class serves as the fundamental building block for creating tools that
can be registered with and executed by the MCP server. It provides core functionality
for metrics tracking, logging, resource management, and tool execution.
Tools in the Ultimate MCP Server ecosystem are designed to provide specific capabilities
that can be invoked by clients (typically LLMs) to perform various operations like
document processing, vector search, file operations, etc. The BaseTool architecture
ensures all tools have consistent behavior for error handling, metrics collection,
and server integration.
Key features:
- Standardized tool registration via decorators
- Consistent metrics tracking for all tool executions
- Unified error handling and response formatting
- Integration with the server's resource registry
- Logger setup with tool-specific naming
Tool classes should inherit from BaseTool and define their tools using the @tool
decorator. Each tool method should be async and follow the standard pattern of
accepting parameters, performing operations, and returning results in a structured
format.
Example:
```python
class MyCustomTools(BaseTool):
tool_name = "my_custom_tools"
description = "Provides custom tools for specific operations"
@tool(name="custom_operation")
@with_tool_metrics
@with_error_handling
async def perform_operation(self, param1: str, param2: int) -> Dict[str, Any]:
# Implementation
return {"result": "success", "data": some_data}
```
"""
tool_name: str = "base_tool"
description: str = "Base tool class for Ultimate MCP Server."
def __init__(self, mcp_server):
"""Initialize the tool.
Args:
mcp_server: MCP server instance
"""
# If mcp_server is a Gateway instance, get the MCP object
self.mcp = mcp_server.mcp if hasattr(mcp_server, 'mcp') else mcp_server
self.logger = get_logger(f"tool.{self.tool_name}")
self.metrics = BaseToolMetrics()
# Initialize resource registry if not already available
if not hasattr(self.mcp, "resources"):
self.mcp.resources = ResourceRegistry()
def _register_tools(self):
"""Register tools with MCP server.
Override this method in subclasses to register specific tools.
This method is no longer called by the base class constructor.
Registration is now handled externally, e.g., in register_all_tools.
"""
pass
async def execute(self, tool_name: str, params: Dict[str, Any]) -> Any:
"""
Execute a tool method by name with the given parameters.
This method provides the core execution mechanism for BaseTool subclasses,
dynamically dispatching calls to the appropriate tool method based on the
tool_name parameter. It handles parameter validation, metrics collection,
and error standardization to ensure consistent behavior across all tools.
Execution flow:
1. Looks up the requested tool method in the class
2. Validates that the method is properly marked as a tool
3. Applies metrics tracking via _wrap_with_metrics
4. Executes the tool with the provided parameters
5. Returns the tool's response or a standardized error
Args:
tool_name: Name of the specific tool method to execute
params: Dictionary of parameters to pass to the tool method
(These parameters will be unpacked as kwargs)
Returns:
The result returned by the tool method, or a standardized error response
if execution fails
Raises:
ToolError: If the specified tool_name is not found or not properly
marked as a tool method
Example:
```python
# Direct execution of a tool method
result = await my_tool_instance.execute(
"analyze_document",
{"document_id": "doc123", "analysis_type": "sentiment"}
)
# Error handling
if "isError" in result and result["isError"]:
print(f"Tool execution failed: {result['error']['message']}")
else:
print(f"Analysis result: {result['analysis_score']}")
```
"""
# Find method with tool name
method_name = tool_name.split(".")[-1] # Handle namespaced tools
method = getattr(self, method_name, None)
if not method or not hasattr(method, "_tool"):
raise ToolError(
f"Tool not found: {tool_name}",
error_code="tool_not_found"
)
# Execute tool with metrics wrapper
return await self._wrap_with_metrics(method, **params)
async def _wrap_with_metrics(
self,
func: Callable,
*args,
**kwargs
) -> Any:
"""
Internal method that wraps a function call with metrics tracking.
This method provides a standardized way to execute tool functions while capturing
performance metrics such as execution duration, success/failure status, token usage,
and cost. These metrics are stored in the BaseTool instance's metrics object for
later analysis and reporting.
The method performs the following steps:
1. Records the start time of the operation
2. Executes the provided function with the supplied arguments
3. If successful, extracts metrics data from the result (if available)
4. Records the execution metrics in the BaseTool's metrics object
5. Returns the original result or propagates any exceptions that occurred
Metrics extraction:
- If the result is a dictionary, it will attempt to extract:
- Token usage from either result["tokens"]["total"] or result["total_tokens"]
- Cost information from result["cost"]
Args:
func: Async function to execute with metrics tracking
*args: Positional arguments to pass to the function
**kwargs: Keyword arguments to pass to the function
Returns:
The result of the wrapped function call
Raises:
Any exception raised by the wrapped function (after logging it)
Notes:
- This method is typically called internally by BaseTool subclasses
- Related to but different from the standalone with_tool_metrics decorator
            - Exceptions are logged and re-raised rather than swallowed (so outer handlers such as with_error_handling can still process them)
"""
start_time = time.time()
success = False
tokens = None
cost = None
try:
# Call function
result = await func(*args, **kwargs)
# Extract metrics if available
if isinstance(result, dict):
if "tokens" in result and isinstance(result["tokens"], dict):
tokens = result["tokens"].get("total")
elif "total_tokens" in result:
tokens = result["total_tokens"]
cost = result.get("cost")
success = True
return result
except Exception as e:
self.logger.error(
f"Tool execution failed: {func.__name__}: {str(e)}",
emoji_key="error",
tool=func.__name__,
exc_info=True
)
raise
finally:
# Record metrics
duration = time.time() - start_time
self.metrics.record_call(
success=success,
duration=duration,
tokens=tokens,
cost=cost
)
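# Usage sketch (illustrative, not part of the original module): a minimal BaseTool
# subclass. execute() only dispatches to methods marked by the project's @tool
# decorator (which sets the _tool attribute checked above and is defined elsewhere),
# so this sketch calls _wrap_with_metrics directly to show the metrics plumbing.
# The mcp_server argument is assumed to be either a Gateway exposing .mcp or the
# MCP instance itself, as handled in BaseTool.__init__.
class _EchoTool(BaseTool):
    tool_name = "echo_tool"
    description = "Echoes its input back to the caller."
    async def echo(self, text: str) -> Dict[str, Any]:
        return {"echo": text, "total_tokens": 0, "cost": 0.0}
async def _example_base_tool_usage(mcp_server) -> None:
    tool = _EchoTool(mcp_server)
    result = await tool._wrap_with_metrics(tool.echo, text="hello")
    print(result["echo"], tool.metrics.get_stats()["total_calls"])  # -> hello 1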
def with_tool_metrics(func):
"""
Decorator that automatically tracks performance metrics for tool functions.
This decorator captures and records execution metrics for both class methods and
standalone functions. It adapts its behavior based on whether the decorated function
is a method on a BaseTool instance or a standalone function.
Metrics captured include:
- Execution time (duration in seconds)
- Success/failure state
- Token usage (extracted from result if available)
- Cost information (extracted from result if available)
The decorator performs several functions:
1. Captures start time before execution
2. Executes the wrapped function, preserving all args/kwargs
3. Extracts metrics from the result dictionary if available
4. Logs execution statistics
5. Updates metrics in the BaseTool.metrics object if available
When used with other decorators:
- Should be applied before with_error_handling to ensure metrics are
captured even when errors occur
- Works well with with_cache, tracking metrics for both cache hits and misses
- Compatible with with_retry, recording each attempt separately
Args:
func: The async function to decorate (can be a method or standalone function)
Returns:
Wrapped async function that captures and records metrics
Example:
```python
@with_tool_metrics
@with_error_handling
async def my_tool_function(param1, param2):
# Function implementation
```
"""
@functools.wraps(func)
async def wrapper(*args, **kwargs):
# Check if the first arg looks like a BaseTool instance
self_obj = args[0] if args and isinstance(args[0], BaseTool) else None
tool_name = getattr(self_obj, 'tool_name', func.__name__)
start_time = time.time()
success = False
tokens = None
cost = None
result = None
try:
# Call original function, passing self_obj if it exists
if self_obj:
# Assumes if self_obj exists, it's the first positional arg expected by func
result = func(self_obj, *args[1:], **kwargs)
else:
# Pass only the args/kwargs received, assuming func is standalone
result = func(*args, **kwargs)
# Only await when necessary
if inspect.isawaitable(result):
result = await result
# result is now either a ToolResult _or_ an async iterator
# Extract metrics if available from result
if isinstance(result, dict):
if "tokens" in result and isinstance(result["tokens"], dict):
tokens = result["tokens"].get("total")
elif "total_tokens" in result:
tokens = result["total_tokens"]
cost = result.get("cost")
success = True
return result
except Exception as e:
logger.error(
f"Tool execution failed: {tool_name}: {str(e)}",
emoji_key="error",
tool=tool_name,
exc_info=True
)
raise # Re-raise exception for other handlers (like with_error_handling)
finally:
# Record metrics
duration = time.time() - start_time
# Log execution stats
logger.debug(
f"Tool execution: {tool_name} ({'success' if success else 'failed'})",
emoji_key="tool" if success else "error",
tool=tool_name,
time=duration,
cost=cost
)
# Update metrics if we found a self object with a metrics attribute
if self_obj and hasattr(self_obj, 'metrics'):
self_obj.metrics.record_call(
success=success,
duration=duration,
tokens=tokens,
cost=cost
)
return wrapper
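# Usage sketch (illustrative, not part of the original module): with_tool_metrics
# on a standalone async function. With no BaseTool instance in play, the wrapper
# just times the call and logs the outcome; token and cost figures are picked up
# from the returned dict ("tokens": {"total": ...} or "total_tokens", plus "cost").
@with_tool_metrics
async def _example_summarize(text: str) -> Dict[str, Any]:
    summary = text[:40]  # stand-in for a real LLM call
    return {"summary": summary, "total_tokens": len(text.split()), "cost": 0.0001}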
def with_retry(
max_retries: int = 3,
retry_delay: float = 1.0,
backoff_factor: float = 2.0,
retry_exceptions: List[Type[Exception]] = None
):
"""
Decorator that adds exponential backoff retry logic to async tool functions.
This decorator wraps an async function with retry logic that will automatically
re-execute the function if it fails with certain exceptions. It implements an
exponential backoff strategy to progressively increase the wait time between
retry attempts, reducing load during transient failures.
Retry behavior:
1. When the decorated function raises an exception, the decorator checks if it's a
retriable exception type (based on the retry_exceptions parameter)
2. If retriable, it waits for a delay period (which increases with each attempt)
3. After waiting, it retries the function with the same arguments
4. This process repeats until either the function succeeds or max_retries is reached
Args:
max_retries: Maximum number of retry attempts before giving up (default: 3)
retry_delay: Initial delay in seconds before first retry (default: 1.0)
backoff_factor: Multiplier for delay between retries (default: 2.0)
Each retry's delay is calculated as: retry_delay * (backoff_factor ^ attempt)
retry_exceptions: List of exception types that should trigger retries.
If None, all exceptions will trigger retries.
Returns:
A decorator function that wraps the given async function with retry logic.
Example:
```python
@with_retry(max_retries=3, retry_delay=2.0, backoff_factor=3.0,
retry_exceptions=[ConnectionError, TimeoutError])
async def fetch_data(url):
# This function will retry up to 3 times if it raises ConnectionError or TimeoutError
# Delays between retries: 2s, 6s, 18s
# For other exceptions, it will fail immediately
return await some_api_call(url)
```
Notes:
- This decorator only works with async functions
- The decorated function must be idempotent (safe to call multiple times)
- Retries are logged at WARNING level, final failures at ERROR level
- The final exception is re-raised after all retries are exhausted
"""
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
last_exception = None
delay = retry_delay
for attempt in range(max_retries + 1):
try:
# Call original function
return await func(*args, **kwargs)
except Exception as e:
# Only retry on specified exceptions
if retry_exceptions and not any(
isinstance(e, exc_type) for exc_type in retry_exceptions
):
raise
last_exception = e
# Log retry attempt
if attempt < max_retries:
logger.warning(
f"Tool execution failed, retrying ({attempt+1}/{max_retries}): {str(e)}",
emoji_key="warning",
tool=func.__name__,
attempt=attempt+1,
max_retries=max_retries,
delay=delay
)
# Wait before retrying
await asyncio.sleep(delay)
# Increase delay for next retry
delay *= backoff_factor
else:
# Log final failure
logger.error(
f"Tool execution failed after {max_retries} retries: {str(e)}",
emoji_key="error",
tool=func.__name__,
exc_info=True
)
# If we get here, all retries failed
raise last_exception
return wrapper
return decorator
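# Usage sketch (illustrative, not part of the original module): with_retry around
# a deliberately flaky coroutine. The first two calls raise ConnectionError and
# are retried after 0.5s and 1.0s (retry_delay * backoff_factor ** attempt); the
# third attempt succeeds. Exceptions outside retry_exceptions fail immediately.
_example_retry_state = {"attempts": 0}
@with_retry(max_retries=3, retry_delay=0.5, backoff_factor=2.0,
            retry_exceptions=[ConnectionError])
async def _example_fetch_with_retry(url: str) -> Dict[str, Any]:
    _example_retry_state["attempts"] += 1
    if _example_retry_state["attempts"] < 3:
        raise ConnectionError("transient network hiccup")
    return {"url": url, "attempts": _example_retry_state["attempts"]}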
def with_error_handling(func):
"""
Decorator that transforms tool function exceptions into standardized error responses.
This decorator intercepts any exceptions raised during tool execution and converts them
into a structured error response format following the MCP protocol standards. It ensures
that clients receive consistent, actionable error information regardless of how or where
the error occurred.
The decorator performs several key functions:
1. Detects if it's decorating a BaseTool method or standalone function and adapts accordingly
2. Reconstructs function call arguments appropriately based on function signature
3. Catches exceptions raised during execution and transforms them into structured responses
4. Maps different exception types to corresponding MCP error types with appropriate metadata
5. Logs detailed error information while providing a clean, standardized response to clients
Exception handling:
- ToolError: Passed through with logging (assumes already formatted correctly)
- ValueError: Converted to ToolInputError with detailed context
- Other exceptions: Converted to ToolExecutionError with execution context
All error responses have the same structure:
```
{
"success": False,
"isError": True,
"error": {
"type": "<error_type>",
"message": "<human-readable message>",
"details": {<context-specific details>},
"retriable": <boolean>,
"suggestions": [<optional recovery suggestions>],
"timestamp": <current_time>
}
}
```
Args:
func: The async function to decorate (can be a method or standalone function)
Returns:
Decorated async function that catches exceptions and returns structured error responses
Example:
```python
@with_error_handling
async def my_tool_function(param1, param2):
# If this raises an exception, it will be transformed into a structured response
# rather than propagating up to the caller
# ...
```
"""
@functools.wraps(func)
async def wrapper(*args, **kwargs):
# Check if the first arg looks like a BaseTool instance
self_obj = args[0] if args and isinstance(args[0], BaseTool) else None
# Determine tool_name based on instance or func name
tool_name = getattr(self_obj, 'tool_name', func.__name__)
sig = inspect.signature(func)
func_params = set(sig.parameters.keys()) # noqa: F841
call_args = []
call_kwargs = {}
if self_obj:
expected_params = list(sig.parameters.values())
if expected_params and expected_params[0].name == 'self':
call_args.append(self_obj)
start_index = 1 if self_obj and call_args else 0
call_args.extend(args[start_index:])
# Pass all original kwargs through
call_kwargs.update(kwargs)
try:
# Call original function with reconstructed args/kwargs
# This version passes *all* kwargs received by the wrapper,
# trusting FastMCP to pass the correct ones including 'ctx'.
result = func(*call_args, **call_kwargs)
# Only await when necessary
if inspect.isawaitable(result):
result = await result
# result is now either a ToolResult _or_ an async iterator
return result
except ToolError as e:
# Already a tool error, log and return
logger.error(
f"Tool error in {tool_name}: {str(e)} ({e.error_code})",
emoji_key="error",
tool=tool_name,
error_code=e.error_code,
details=e.details
)
# Debug log the formatted error response
error_response = format_error_response(e)
logger.debug(f"Formatted error response for {tool_name}: {error_response}")
# Return standardized error response
return error_response
except ValueError as e:
# Convert ValueError to ToolInputError with more detailed information
error = ToolInputError(
f"Invalid input to {tool_name}: {str(e)}",
details={
"tool_name": tool_name,
"exception_type": "ValueError",
"original_error": str(e)
}
)
logger.error(
f"Invalid input to {tool_name}: {str(e)}",
emoji_key="error",
tool=tool_name,
error_code=error.error_code
)
# Return standardized error response
return format_error_response(error)
except Exception as e:
# Create a more specific error message that includes the tool name
specific_message = f"Execution error in {tool_name}: {str(e)}"
# Convert to ToolExecutionError for other exceptions
error = ToolExecutionError(
specific_message,
cause=e,
details={
"tool_name": tool_name,
"exception_type": type(e).__name__,
"original_message": str(e)
}
)
logger.error(
specific_message,
emoji_key="error",
tool=tool_name,
exc_info=True
)
# Return standardized error response
return format_error_response(error)
return wrapper
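# Usage sketch (illustrative, not part of the original module): with_error_handling
# on a standalone async function. The ValueError raised inside is converted to a
# ToolInputError and returned as the standardized error dict documented above, so
# callers branch on result["isError"] instead of wrapping every call in try/except.
@with_error_handling
async def _example_divide(numerator: float, denominator: float) -> Dict[str, Any]:
    if denominator == 0:
        raise ValueError("denominator must be non-zero")
    return {"quotient": numerator / denominator}
async def _example_error_handling_usage() -> None:
    result = await _example_divide(1.0, 0.0)
    if result.get("isError"):
        print(result["error"]["type"], result["error"]["message"])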
def register_tool(mcp_server, name=None, description=None, cache_ttl=None):
"""
Register a standalone function as an MCP tool with optional caching and error handling.
This function creates a decorator that registers the decorated function with the MCP server,
automatically applying error handling and optional result caching. It provides a simpler
alternative to class-based tool registration via the BaseTool class, allowing standalone
functions to be exposed as MCP tools without creating a full tool class.
The decorator handles:
1. Tool registration with the MCP server using the provided name (or function name)
2. Documentation via the provided description (or function docstring)
3. Optional result caching with the specified TTL
4. Standardized error handling via the with_error_handling decorator
Args:
mcp_server: MCP server instance to register the tool with
name: Tool name used for registration (defaults to the function name if not provided)
description: Tool description for documentation (defaults to function docstring if not provided)
cache_ttl: Optional time-to-live in seconds for caching tool results. If provided, the tool results
will be cached for this duration to improve performance for identical calls.
Returns:
Decorator function that transforms the decorated function into a registered MCP tool
Example:
```python
# Initialize MCP server
mcp_server = FastMCP()
# Register a function as a tool
@register_tool(mcp_server, name="get_weather", cache_ttl=300)
async def get_weather_data(location: str, units: str = "metric"):
'''Get current weather data for a location.'''
# Implementation
return {"temperature": 22, "conditions": "sunny"}
# The function is now registered as an MCP tool named "get_weather"
# with 5-minute result caching and standardized error handling
```
Notes:
- The decorated function must be async
        - cache_ttl is accepted, but the caching hook in this implementation is currently
          commented out, so identical calls re-execute the function rather than return cached results
- Function signature is preserved, making it transparent to callers
- For more complex tools with multiple methods, use the BaseTool class instead
"""
def decorator(func):
# Get function name and docstring
tool_name = name or func.__name__
tool_description = description or func.__doc__ or f"Tool: {tool_name}" # noqa: F841
# Apply caching if specified
# if cache_ttl is not None:
# func = with_cache(ttl=cache_ttl)(func)
# Apply error handling
func = with_error_handling(func)
# Register with MCP server
mcp_server.tool(name=tool_name)(func)
return func
return decorator
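# Usage sketch (illustrative, not part of the original module): registering a
# standalone coroutine as an MCP tool. The server argument is assumed to be a
# FastMCP instance created elsewhere, as in the docstring example above.
def _example_register_echo_tool(server) -> None:
    @register_tool(server, name="echo_text", description="Echo text back unchanged.")
    async def echo_text(text: str) -> Dict[str, Any]:
        return {"echo": text}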
def _get_json_schema_type(type_annotation):
"""
Convert Python type annotations to JSON Schema type definitions.
This utility function translates Python's typing annotations into equivalent JSON Schema
type definitions, enabling automatic generation of API documentation and client interfaces
from Python function signatures. It handles basic types, Optional types, Lists, and
provides reasonable defaults for complex types.
The function is primarily used internally by the MCP framework to generate JSON Schema
definitions for tool parameters, allowing clients to understand the expected input types
and structures for each tool.
Type mappings:
- str -> {"type": "string"}
- int -> {"type": "integer"}
- float -> {"type": "number"}
- bool -> {"type": "boolean"}
- Optional[T] -> Same as T, but adds "null" to "type" array
- List[T] -> {"type": "array", "items": <schema for T>}
- Dict -> {"type": "object"}
- Other complex types -> {"type": "object"}
Args:
type_annotation: A Python type annotation (from typing module or built-in types)
Returns:
A dictionary containing the equivalent JSON Schema type definition
Notes:
- This function provides only type information, not complete JSON Schema validation
rules like minimum/maximum values, string patterns, etc.
- Complex nested types (e.g., List[Dict[str, List[int]]]) are handled, but deeply
nested structures may be simplified in the output schema
- This function is meant for internal use by the tool registration system
Examples:
```python
# Basic types
_get_json_schema_type(str) # -> {"type": "string"}
_get_json_schema_type(int) # -> {"type": "integer"}
# Optional types
from typing import Optional
_get_json_schema_type(Optional[str]) # -> {"type": ["string", "null"]}
# List types
from typing import List
_get_json_schema_type(List[int]) # -> {"type": "array", "items": {"type": "integer"}}
# Complex types
from typing import Dict, List
_get_json_schema_type(Dict[str, List[int]]) # -> {"type": "object"}
```
"""
import typing
# Handle basic types
if type_annotation is str:
return {"type": "string"}
elif type_annotation is int:
return {"type": "integer"}
elif type_annotation is float:
return {"type": "number"}
elif type_annotation is bool:
return {"type": "boolean"}
# Handle Optional types
origin = typing.get_origin(type_annotation)
args = typing.get_args(type_annotation)
    if origin is Union and type(None) in args:
        # Optional type - get the non-None type
        non_none_args = [arg for arg in args if arg is not type(None)]
        if len(non_none_args) == 1:
            inner_type = _get_json_schema_type(non_none_args[0])
            # Add "null" to reflect optionality, matching the mapping documented above
            if isinstance(inner_type.get("type"), str):
                inner_type = {**inner_type, "type": [inner_type["type"], "null"]}
            return inner_type
# Handle lists
if origin is list or origin is List:
if args:
item_type = _get_json_schema_type(args[0])
return {
"type": "array",
"items": item_type
}
return {"type": "array"}
# Handle dictionaries
if origin is dict or origin is Dict:
return {"type": "object"}
# Default to object for complex types
return {"type": "object"}
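# Usage sketch (illustrative, not part of the original module): the schema
# fragments this helper produces for a few common annotations.
def _example_schema_mapping() -> None:
    assert _get_json_schema_type(str) == {"type": "string"}
    assert _get_json_schema_type(int) == {"type": "integer"}
    assert _get_json_schema_type(List[int]) == {"type": "array", "items": {"type": "integer"}}
    assert _get_json_schema_type(Dict[str, int]) == {"type": "object"}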
def with_state_management(namespace: str):
"""
Decorator that provides persistent state management capabilities to tool functions.
This decorator enables stateful behavior in otherwise stateless tool functions by
injecting state access methods that allow reading, writing, and deleting values
from a persistent, namespace-based state store. This is essential for tools that
need to maintain context across multiple invocations, manage session data, or
build features with memory capabilities.
The state management system provides:
- Namespace isolation: Each tool can use its own namespace to prevent key collisions
- Thread-safe concurrency: Built-in locks ensure safe parallel access to the same state
- Optional persistence: State can be backed by disk storage for durability across restarts
- Lazy loading: State is loaded from disk only when accessed, improving performance
    State accessor functions injected into the decorated function:
- get_state(key, default=None) → Any: Retrieve a value by key, with optional default
- set_state(key, value) → None: Store a value under the specified key
- delete_state(key) → None: Remove a value from the state store
All state operations are async, allowing the tool to continue processing while
state operations are pending.
Args:
namespace: A unique string identifying this tool's state namespace. This
should be chosen carefully to avoid collisions with other tools.
Recommended format: "<tool_category>.<specific_feature>"
Examples: "conversation.history", "user.preferences", "document.cache"
Returns:
A decorator function that wraps the original tool function, adding state
management capabilities via injected parameters.
Examples:
Basic usage with conversation history:
```python
@with_state_management("conversation.history")
async def chat_with_memory(message: str, ctx=None, get_state=None, set_state=None, delete_state=None):
# Get previous messages from persistent store
history = await get_state("messages", [])
# Add new message
history.append({"role": "user", "content": message})
# Generate response based on all previous conversation context
response = generate_response(message, history)
# Add AI response to history
history.append({"role": "assistant", "content": response})
# Store updated history for future calls
await set_state("messages", history)
return {"response": response}
```
Advanced pattern with conversational memory and user customization:
```python
@with_state_management("assistant.settings")
async def personalized_assistant(
query: str,
update_preferences: bool = False,
preferences: Dict[str, Any] = None,
ctx=None,
get_state=None,
set_state=None,
delete_state=None
):
# Get user ID from context
user_id = ctx.get("user_id", "default_user")
# Retrieve user-specific preferences
user_prefs = await get_state(f"prefs:{user_id}", {
"tone": "professional",
"verbosity": "concise",
"expertise_level": "intermediate"
})
# Update preferences if requested
if update_preferences and preferences:
user_prefs.update(preferences)
await set_state(f"prefs:{user_id}", user_prefs)
# Get conversation history
history = await get_state(f"history:{user_id}", [])
# Process query using preferences and history
response = process_personalized_query(
query,
user_preferences=user_prefs,
conversation_history=history
)
# Update conversation history
history.append({"query": query, "response": response})
if len(history) > 20: # Keep only recent history
history = history[-20:]
await set_state(f"history:{user_id}", history)
return {
"response": response,
"preferences": user_prefs
}
```
State persistence across server restarts:
```python
# First call to the tool
@with_state_management("task.progress")
async def long_running_task(task_id: str, step: int = None, ctx=None,
get_state=None, set_state=None, delete_state=None):
# Get current progress
progress = await get_state(task_id, {"completed_steps": [], "current_step": 0})
# Update progress if a new step is provided
if step is not None:
progress["current_step"] = step
progress["completed_steps"].append({
"step": step,
"timestamp": time.time()
})
await set_state(task_id, progress)
# Even if the server restarts, the next call will retrieve the saved progress
return {
"task_id": task_id,
"progress": progress,
"completed": len(progress["completed_steps"]),
"current_step": progress["current_step"]
}
```
Implementation Pattern:
The decorator works by injecting three async state management functions into the
decorated function's keyword arguments:
1. `get_state(key, default=None)`:
- Retrieves state values from the persistent store
- If key doesn't exist, returns the provided default value
- Example: `user_data = await get_state("user:12345", {})`
2. `set_state(key, value)`:
- Stores a value in the persistent state store
- Automatically serializes complex Python objects (dicts, lists, etc.)
- Example: `await set_state("session:abc", {"authenticated": True})`
3. `delete_state(key)`:
- Removes a key and its associated value from the store
- Example: `await delete_state("temporary_data")`
Notes:
- The decorated function must accept get_state, set_state, delete_state, and ctx
parameters, either explicitly or via **kwargs.
- State persistence depends on the MCP server configuration. If persistence is
enabled, state will survive server restarts.
- For large objects, consider storing only references or identifiers in the state
and using a separate storage system for the actual data.
- The state store is shared across all server instances, so state keys should be
chosen to avoid collisions between different tools and features.
"""
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
# Get context from kwargs
context = kwargs.get('ctx')
if not context or not hasattr(context, 'fastmcp'):
raise ValueError("Context with FastMCP server required")
# Access StateStore via the FastMCP 2.0+ pattern
if not hasattr(context.fastmcp, '_state_store'):
raise ValueError("FastMCP server does not have a state store attached")
state_store = context.fastmcp._state_store
# Add state accessors to kwargs
kwargs['get_state'] = lambda key, default=None: state_store.get(namespace, key, default)
kwargs['set_state'] = lambda key, value: state_store.set(namespace, key, value)
kwargs['delete_state'] = lambda key: state_store.delete(namespace, key)
return await func(*args, **kwargs)
# Update signature to include context parameter if not already present
sig = inspect.signature(func)
if 'ctx' not in sig.parameters:
wrapped_params = list(sig.parameters.values())
wrapped_params.append(
inspect.Parameter('ctx', inspect.Parameter.KEYWORD_ONLY,
annotation='Optional[Dict[str, Any]]', default=None)
)
wrapper.__signature__ = sig.replace(parameters=wrapped_params)
return wrapper
return decorator
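# Usage sketch (illustrative, not part of the original module): a counter tool
# whose count survives across invocations via the injected state accessors. The
# ctx argument is assumed to be the FastMCP context object the server passes in,
# carrying the _state_store that the wrapper above looks up.
@with_state_management("examples.counter")
async def _example_counter(increment: int = 1, ctx=None,
                           get_state=None, set_state=None, delete_state=None):
    count = await get_state("count", 0)
    count += increment
    await set_state("count", count)
    return {"count": count}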
```