This is page 16 of 35. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│   ├── __init__.py
│   ├── advanced_agent_flows_using_unified_memory_system_demo.py
│   ├── advanced_extraction_demo.py
│   ├── advanced_unified_memory_system_demo.py
│   ├── advanced_vector_search_demo.py
│   ├── analytics_reporting_demo.py
│   ├── audio_transcription_demo.py
│   ├── basic_completion_demo.py
│   ├── cache_demo.py
│   ├── claude_integration_demo.py
│   ├── compare_synthesize_demo.py
│   ├── cost_optimization.py
│   ├── data
│   │   ├── sample_event.txt
│   │   ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│   │   └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│   ├── docstring_refiner_demo.py
│   ├── document_conversion_and_processing_demo.py
│   ├── entity_relation_graph_demo.py
│   ├── filesystem_operations_demo.py
│   ├── grok_integration_demo.py
│   ├── local_text_tools_demo.py
│   ├── marqo_fused_search_demo.py
│   ├── measure_model_speeds.py
│   ├── meta_api_demo.py
│   ├── multi_provider_demo.py
│   ├── ollama_integration_demo.py
│   ├── prompt_templates_demo.py
│   ├── python_sandbox_demo.py
│   ├── rag_example.py
│   ├── research_workflow_demo.py
│   ├── sample
│   │   ├── article.txt
│   │   ├── backprop_paper.pdf
│   │   ├── buffett.pdf
│   │   ├── contract_link.txt
│   │   ├── legal_contract.txt
│   │   ├── medical_case.txt
│   │   ├── northwind.db
│   │   ├── research_paper.txt
│   │   ├── sample_data.json
│   │   └── text_classification_samples
│   │       ├── email_classification.txt
│   │       ├── news_samples.txt
│   │       ├── product_reviews.txt
│   │       └── support_tickets.txt
│   ├── sample_docs
│   │   └── downloaded
│   │       └── attention_is_all_you_need.pdf
│   ├── sentiment_analysis_demo.py
│   ├── simple_completion_demo.py
│   ├── single_shot_synthesis_demo.py
│   ├── smart_browser_demo.py
│   ├── sql_database_demo.py
│   ├── sse_client_demo.py
│   ├── test_code_extraction.py
│   ├── test_content_detection.py
│   ├── test_ollama.py
│   ├── text_classification_demo.py
│   ├── text_redline_demo.py
│   ├── tool_composition_examples.py
│   ├── tournament_code_demo.py
│   ├── tournament_text_demo.py
│   ├── unified_memory_system_demo.py
│   ├── vector_search_demo.py
│   ├── web_automation_instruction_packs.py
│   └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│   └── smart_browser_internal
│       ├── locator_cache.db
│       ├── readability.js
│       └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── integration
│   │   ├── __init__.py
│   │   └── test_server.py
│   ├── manual
│   │   ├── test_extraction_advanced.py
│   │   └── test_extraction.py
│   └── unit
│       ├── __init__.py
│       ├── test_cache.py
│       ├── test_providers.py
│       └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│   ├── __init__.py
│   ├── __main__.py
│   ├── cli
│   │   ├── __init__.py
│   │   ├── __main__.py
│   │   ├── commands.py
│   │   ├── helpers.py
│   │   └── typer_cli.py
│   ├── clients
│   │   ├── __init__.py
│   │   ├── completion_client.py
│   │   └── rag_client.py
│   ├── config
│   │   └── examples
│   │       └── filesystem_config.yaml
│   ├── config.py
│   ├── constants.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── evaluation
│   │   │   ├── base.py
│   │   │   └── evaluators.py
│   │   ├── providers
│   │   │   ├── __init__.py
│   │   │   ├── anthropic.py
│   │   │   ├── base.py
│   │   │   ├── deepseek.py
│   │   │   ├── gemini.py
│   │   │   ├── grok.py
│   │   │   ├── ollama.py
│   │   │   ├── openai.py
│   │   │   └── openrouter.py
│   │   ├── server.py
│   │   ├── state_store.py
│   │   ├── tournaments
│   │   │   ├── manager.py
│   │   │   ├── tasks.py
│   │   │   └── utils.py
│   │   └── ums_api
│   │       ├── __init__.py
│   │       ├── ums_database.py
│   │       ├── ums_endpoints.py
│   │       ├── ums_models.py
│   │       └── ums_services.py
│   ├── exceptions.py
│   ├── graceful_shutdown.py
│   ├── services
│   │   ├── __init__.py
│   │   ├── analytics
│   │   │   ├── __init__.py
│   │   │   ├── metrics.py
│   │   │   └── reporting.py
│   │   ├── cache
│   │   │   ├── __init__.py
│   │   │   ├── cache_service.py
│   │   │   ├── persistence.py
│   │   │   ├── strategies.py
│   │   │   └── utils.py
│   │   ├── cache.py
│   │   ├── document.py
│   │   ├── knowledge_base
│   │   │   ├── __init__.py
│   │   │   ├── feedback.py
│   │   │   ├── manager.py
│   │   │   ├── rag_engine.py
│   │   │   ├── retriever.py
│   │   │   └── utils.py
│   │   ├── prompts
│   │   │   ├── __init__.py
│   │   │   ├── repository.py
│   │   │   └── templates.py
│   │   ├── prompts.py
│   │   └── vector
│   │       ├── __init__.py
│   │       ├── embeddings.py
│   │       └── vector_service.py
│   ├── tool_token_counter.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── audio_transcription.py
│   │   ├── base.py
│   │   ├── completion.py
│   │   ├── docstring_refiner.py
│   │   ├── document_conversion_and_processing.py
│   │   ├── enhanced-ums-lookbook.html
│   │   ├── entity_relation_graph.py
│   │   ├── excel_spreadsheet_automation.py
│   │   ├── extraction.py
│   │   ├── filesystem.py
│   │   ├── html_to_markdown.py
│   │   ├── local_text_tools.py
│   │   ├── marqo_fused_search.py
│   │   ├── meta_api_tool.py
│   │   ├── ocr_tools.py
│   │   ├── optimization.py
│   │   ├── provider.py
│   │   ├── pyodide_boot_template.html
│   │   ├── python_sandbox.py
│   │   ├── rag.py
│   │   ├── redline-compiled.css
│   │   ├── sentiment_analysis.py
│   │   ├── single_shot_synthesis.py
│   │   ├── smart_browser.py
│   │   ├── sql_databases.py
│   │   ├── text_classification.py
│   │   ├── text_redline_tools.py
│   │   ├── tournament.py
│   │   ├── ums_explorer.html
│   │   └── unified_memory_system.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── async_utils.py
│   │   ├── display.py
│   │   ├── logging
│   │   │   ├── __init__.py
│   │   │   ├── console.py
│   │   │   ├── emojis.py
│   │   │   ├── formatter.py
│   │   │   ├── logger.py
│   │   │   ├── panels.py
│   │   │   ├── progress.py
│   │   │   └── themes.py
│   │   ├── parse_yaml.py
│   │   ├── parsing.py
│   │   ├── security.py
│   │   └── text.py
│   └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/examples/unified_memory_system_demo.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
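"""Demo script for the Unified Memory System tools.

Walks through the major tool groups in sequence: workflow CRUD, action recording
and dependencies, artifacts, thought chains, memory storage and querying,
embeddings with semantic/hybrid search, cognitive state and working memory,
meta-cognition (consolidation, reflection, summarization), and maintenance
statistics. Operates on the default agent-memory database from the project config.
"""
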
import asyncio
import sys
import time
import traceback
from pathlib import Path
from typing import Any, Dict, Optional


def _fmt_id(val: Any, length: int = 8) -> str:
    """Return a short id string safe for logs."""
    s = str(val) if val is not None else "?"
    # Ensure slicing doesn't go out of bounds if string is shorter than length
    return s[: min(length, len(s))]


# --- Project Setup ---
# Add project root to path for imports when running as script
# Adjust this path if your script location relative to the project root differs
try:
    SCRIPT_DIR = Path(__file__).resolve().parent
    # Navigate up until we find a directory likely containing the project modules
    PROJECT_ROOT = SCRIPT_DIR
    while (
        not (PROJECT_ROOT / "ultimate_mcp_server").is_dir()
        and not (PROJECT_ROOT / "ultimate_mcp_client").is_dir()
        and PROJECT_ROOT.parent != PROJECT_ROOT
    ):  # Prevent infinite loop
        PROJECT_ROOT = PROJECT_ROOT.parent

    if (
        not (PROJECT_ROOT / "ultimate_mcp_server").is_dir()
        and not (PROJECT_ROOT / "ultimate_mcp_client").is_dir()
    ):
        print(
            f"Error: Could not reliably determine project root from {SCRIPT_DIR}.", file=sys.stderr
        )
        # Fallback: Add script dir anyway, maybe it's flat structure
        if str(SCRIPT_DIR) not in sys.path:
            sys.path.insert(0, str(SCRIPT_DIR))
            print(
                f"Warning: Added script directory {SCRIPT_DIR} to path as fallback.",
                file=sys.stderr,
            )
        else:
            sys.exit(1)  # Give up if markers not found after traversing up

    if str(PROJECT_ROOT) not in sys.path:
        sys.path.insert(0, str(PROJECT_ROOT))

except Exception as e:
    print(f"Error setting up sys.path: {e}", file=sys.stderr)
    sys.exit(1)

from rich.console import Console  # noqa: E402
from rich.markup import escape  # noqa: E402 
from rich.panel import Panel  # noqa: E402
from rich.pretty import pretty_repr  # noqa: E402
from rich.rule import Rule  # noqa: E402
from rich.traceback import install as install_rich_traceback  # noqa: E402

from ultimate_mcp_server.config import get_config  # noqa: E402

# Tools and related components from unified_memory
from ultimate_mcp_server.tools.unified_memory_system import (  # noqa: E402
    ActionStatus,
    ActionType,
    ArtifactType,
    # Utilities/Enums/Exceptions needed
    DBConnection,
    LinkType,
    MemoryLevel,
    MemoryType,
    ThoughtType,
    ToolError,
    ToolInputError,
    # Action Dependency Tools
    add_action_dependency,
    auto_update_focus,
    compute_memory_statistics,
    consolidate_memories,
    create_memory_link,
    # Thought
    create_thought_chain,
    # Workflow
    create_workflow,
    delete_expired_memories,
    focus_memory,
    generate_reflection,
    generate_workflow_report,
    get_action_dependencies,
    get_action_details,
    get_artifact_by_id,
    get_artifacts,
    get_linked_memories,
    get_memory_by_id,
    get_recent_actions,
    get_thought_chain,
    get_workflow_context,
    get_workflow_details,
    # Working Memory / State
    get_working_memory,
    hybrid_search_memories,
    # Initialization
    initialize_memory_system,
    list_workflows,
    load_cognitive_state,
    optimize_working_memory,
    promote_memory_level,
    query_memories,
    record_action_completion,
    # Action
    record_action_start,
    # Artifacts
    record_artifact,
    record_thought,
    save_cognitive_state,
    search_semantic_memories,
    # Core Memory
    store_memory,
    summarize_text,
    update_memory,
    update_workflow_status,
    visualize_memory_network,
    visualize_reasoning_chain,
)

# Utilities from the project
from ultimate_mcp_server.utils import get_logger  # noqa: E402

console = Console()
logger = get_logger("demo.unified_memory")
config = get_config()  # Load config to ensure provider keys might be found

install_rich_traceback(show_locals=False, width=console.width)

DEMO_DB_FILE: Optional[str] = config.agent_memory.db_path  # Global to hold the DB path being used


async def safe_tool_call(func, args: Dict, description: str, suppress_output: bool = False):
    """Helper to call a tool function, catch errors, and display results."""
    display_title = not suppress_output
    display_args = not suppress_output
    display_result_panel = not suppress_output

    if display_title:
        title = f"DEMO: {description}"
        console.print(Rule(f"[bold blue]{escape(title)}[/bold blue]", style="blue"))
    if display_args:
        if args:
            console.print(f"[dim]Calling [bold cyan]{func.__name__}[/] with args:[/]")
            try:
                # Filter out db_path if it matches the global default for cleaner logs
                args_to_print = {
                    k: v for k, v in args.items() if k != "db_path" or v != DEMO_DB_FILE
                }
                args_repr = pretty_repr(args_to_print, max_length=120, max_string=100)
            except Exception:
                args_repr = str(args)[:300]
            console.print(args_repr)
        else:
            console.print(f"[dim]Calling [bold cyan]{func.__name__}[/] (no arguments)[/]")

    start_time = time.monotonic()
    result = None
    try:
        # Use the global DEMO_DB_FILE if db_path isn't explicitly in args
        if "db_path" not in args and DEMO_DB_FILE:
            args["db_path"] = DEMO_DB_FILE

        result = await func(**args)

        processing_time = time.monotonic() - start_time
        log_func = getattr(logger, "debug", print)
        log_func(f"Tool '{func.__name__}' execution time: {processing_time:.4f}s")

        if display_result_panel:
            success = isinstance(result, dict) and result.get("success", False)
            panel_title = f"[bold {'green' if success else 'yellow'}]Result: {func.__name__} {'✅' if success else '❔'}[/]"
            panel_border = "green" if success else "yellow"

            try:
                # Handle specific large/complex outputs
                if func.__name__ == "generate_workflow_report" and result.get("report"):
                    report_preview = str(result["report"])[:500] + (
                        "..." if len(str(result["report"])) > 500 else ""
                    )
                    result_repr = f"Report Format: {result.get('format')}\nStyle: {result.get('style_used')}\nPreview:\n---\n{report_preview}\n---"
                elif func.__name__ in [
                    "visualize_reasoning_chain",
                    "visualize_memory_network",
                ] and result.get("visualization"):
                    viz_preview = str(result["visualization"])[:500] + (
                        "..." if len(str(result["visualization"])) > 500 else ""
                    )
                    result_repr = f"Visualization Format: {result.get('format')}\nContent Preview:\n---\n{viz_preview}\n---"
                elif func.__name__ == "summarize_text" and result.get("summary"):
                    summary_preview = str(result["summary"])[:500] + (
                        "..." if len(str(result["summary"])) > 500 else ""
                    )
                    result_repr = f"Summary Preview:\n---\n{summary_preview}\n---"
                elif func.__name__ == "consolidate_memories" and result.get("consolidated_content"):
                    content_preview = str(result["consolidated_content"])[:500] + (
                        "..." if len(str(result["consolidated_content"])) > 500 else ""
                    )
                    result_repr = f"Consolidated Content Preview:\n---\n{content_preview}\n---"
                elif func.__name__ == "generate_reflection" and result.get("content"):
                    content_preview = str(result["content"])[:500] + (
                        "..." if len(str(result["content"])) > 500 else ""
                    )
                    result_repr = f"Reflection Content Preview:\n---\n{content_preview}\n---"
                else:
                    result_repr = pretty_repr(result, max_length=200, max_string=150)
            except Exception:
                result_repr = f"(Could not represent result of type {type(result)} fully)\n{str(result)[:500]}"

            console.print(
                Panel(
                    escape(result_repr), title=panel_title, border_style=panel_border, expand=False
                )
            )

        return result

    except (ToolInputError, ToolError) as e:
        processing_time = time.monotonic() - start_time
        log_func_error = getattr(logger, "error", print)
        log_func_error(f"Tool '{func.__name__}' failed: {e}", exc_info=False)
        # Extract error details up front so the returned dict always includes them,
        # even when panel output is suppressed.
        details = None
        if hasattr(e, "details") and e.details:
            details = e.details
        elif hasattr(e, "context") and e.context:
            details = e.context
        if display_result_panel:
            error_title = f"[bold red]Error: {func.__name__} Failed ❌[/]"
            error_content = f"[bold red]{type(e).__name__}:[/] {escape(str(e))}"
            if details:
                try:
                    details_repr = pretty_repr(details)
                except Exception:
                    details_repr = str(details)
                error_content += f"\n\n[yellow]Details:[/]\n{escape(details_repr)}"
            console.print(Panel(error_content, title=error_title, border_style="red", expand=False))
        return {
            "success": False,
            "error": str(e),
            "error_code": getattr(e, "error_code", "TOOL_ERROR"),
            "error_type": type(e).__name__,
            "details": details or {},
            "isError": True,
        }
    except Exception as e:
        processing_time = time.monotonic() - start_time
        log_func_critical = getattr(logger, "critical", print)
        log_func_critical(f"Unexpected error calling '{func.__name__}': {e}", exc_info=True)
        if display_result_panel:
            console.print(f"\n[bold red]CRITICAL UNEXPECTED ERROR in {func.__name__}:[/bold red]")
            console.print_exception(show_locals=False)
        return {
            "success": False,
            "error": f"Unexpected: {str(e)}",
            "error_code": "UNEXPECTED_ERROR",
            "error_type": type(e).__name__,
            "details": {"traceback": traceback.format_exc()},
            "isError": True,
        }
    finally:
        if display_title:
            console.print()


# --- Demo Setup & Teardown ---


async def setup_demo_environment():
    """Initialize the memory system using the DEFAULT database file."""
    global DEMO_DB_FILE
    DEMO_DB_FILE = config.agent_memory.db_path
    log_func_info = getattr(logger, "info", print)
    log_func_info(f"Using default database for demo: {DEMO_DB_FILE}")
    console.print(
        Panel(
            f"Using default database: [cyan]{DEMO_DB_FILE}[/]\n"
            f"[yellow]NOTE:[/yellow] This demo will operate on the actual development database.",
            title="Demo Setup",
            border_style="yellow",
        )
    )

    init_result = await safe_tool_call(
        initialize_memory_system,
        {"db_path": DEMO_DB_FILE},
        "Initialize Memory System",
    )
    if not init_result or not init_result.get("success"):
        console.print(
            "[bold red]CRITICAL:[/bold red] Failed to initialize memory system. Aborting demo."
        )
        console.print(
            "[yellow]Check DB access and potentially API key configuration/network if init requires them.[/yellow]"
        )
        await cleanup_demo_environment()
        sys.exit(1)


async def cleanup_demo_environment():
    """Close DB connection."""
    global DEMO_DB_FILE
    log_func_info = getattr(logger, "info", print)
    log_func_warn = getattr(logger, "warning", print)

    try:
        await DBConnection.close_connection()
        log_func_info("Closed database connection.")
    except Exception as e:
        log_func_warn(f"Error closing DB connection during cleanup: {e}")

    if DEMO_DB_FILE:
        log_func_info(f"Demo finished using database: {DEMO_DB_FILE}")
        console.print(f"Demo finished using database: [dim]{DEMO_DB_FILE}[/dim]")
        DEMO_DB_FILE = None


# --- Individual Demo Sections ---
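# Each section returns the IDs of the objects it creates (workflow, actions,
# artifacts, thought chain, memories) so later sections can build on them; every
# section also checks for missing prerequisite IDs and skips itself gracefully.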


async def demonstrate_basic_workflows():
    """Demonstrate basic workflow CRUD and listing operations."""
    console.print(Rule("[bold green]1. Basic Workflow Operations[/bold green]", style="green"))

    # Create
    create_args = {
        "title": "Enhanced WF Demo",
        "goal": "Demonstrate core workflow, action, artifact, and memory linking.",
        "tags": ["enhanced", "demo", "core"],
    }
    wf_result = await safe_tool_call(create_workflow, create_args, "Create Enhanced Workflow")
    wf_id = wf_result.get("workflow_id") if wf_result.get("success") else None

    if not wf_id:
        console.print("[bold red]CRITICAL DEMO FAILURE:[/bold red] Failed to create workflow. Cannot continue basic workflow demo.")
        return None # Return None to signal failure

    # Get Details
    await safe_tool_call(
        get_workflow_details, {"workflow_id": wf_id}, f"Get Workflow Details ({_fmt_id(wf_id)})"
    )

    # List (should show the one we created)
    await safe_tool_call(list_workflows, {"limit": 5}, "List Workflows (Limit 5)")

    # List Filtered by Tag
    await safe_tool_call(list_workflows, {"tag": "enhanced"}, "List Workflows Tagged 'enhanced'")

    # Update Status (to active for subsequent steps)
    await safe_tool_call(
        update_workflow_status,
        {"workflow_id": wf_id, "status": "active"},
        f"Ensure Workflow Status is Active ({_fmt_id(wf_id)})",
    )

    return wf_id


async def demonstrate_basic_actions(wf_id: Optional[str]):
    """Demonstrate basic action recording, completion, and retrieval."""
    console.print(Rule("[bold green]2. Basic Action Operations[/bold green]", style="green"))
    if not wf_id:
        console.print("[yellow]Skipping action demo: No valid workflow ID provided.[/yellow]")
        return {}  # Return empty dict

    action_ids = {}

    # Record Action 1 Start (e.g., Planning)
    start_args_1 = {
        "workflow_id": wf_id,
        "action_type": ActionType.PLANNING.value,
        "reasoning": "Initial planning phase for the enhanced demo.",
        "title": "Plan Demo Steps",
        "tags": ["planning"],
    }
    action_res_1 = await safe_tool_call(
        record_action_start, start_args_1, "Record Action 1 Start (Planning)"
    )
    action_id_1 = action_res_1.get("action_id") if action_res_1 and action_res_1.get("success") else None # More robust check
    if not action_id_1:
        console.print("[bold red]CRITICAL DEMO FAILURE:[/bold red] Failed to record start for Action 1. Aborting action demo.")
        return {} # Return empty dict
    action_ids["action1_id"] = action_id_1

    # Record Action 1 Completion (Needs action_id_1, which is now checked)
    complete_args_1 = {
        "action_id": action_id_1,
        "status": ActionStatus.COMPLETED.value,
        "summary": "Planning complete. Next step: data simulation.",
    }
    await safe_tool_call(
        record_action_completion,
        complete_args_1,
        f"Record Action 1 Completion ({_fmt_id(action_id_1)})",
    )

    # Record Action 2 Start (e.g., Tool Use - simulated)
    start_args_2 = {
        "workflow_id": wf_id,
        "action_type": ActionType.TOOL_USE.value,
        "reasoning": "Simulating data generation based on the plan.",
        "tool_name": "simulate_data",
        "tool_args": {"rows": 100, "type": "random"},
        "title": "Simulate Demo Data",
        "tags": ["data", "simulation"],
        "parent_action_id": action_id_1,  # Link to planning action
    }
    action_res_2 = await safe_tool_call(
        record_action_start, start_args_2, "Record Action 2 Start (Simulate Data)"
    )
    action_id_2 = action_res_2.get("action_id") if action_res_2 and action_res_2.get("success") else None # More robust check
    if action_id_2:
        action_ids["action2_id"] = action_id_2
        await safe_tool_call(
            get_action_details,
            {"action_ids": [action_id_1, action_id_2]}, # Both IDs are valid here
            "Get Action Details (Multiple Actions)",
        )
        complete_args_2 = {
            "action_id": action_id_2,  # Now guaranteed to be non-None
            "status": ActionStatus.FAILED.value,
            "summary": "Simulation failed due to resource limit.",
            "tool_result": {"error": "Timeout", "code": 504},
        }
        await safe_tool_call(
            record_action_completion,
            complete_args_2,
            f"Record Action 2 Completion (Failed - {_fmt_id(action_id_2)})",
        )
    else:
        # Action 2 failed to start; skip its completion. The dependency demo later
        # skips itself when action2_id is missing.
        console.print("[bold red]Action 2 failed to start. Skipping completion and dependency tests involving Action 2.[/bold red]")
        # Ensure action_id_2 is not added to the dict if it's None
        if "action2_id" in action_ids:
            del action_ids["action2_id"]

    # Get Action Details (Action 1 only, when Action 2 failed to start)
    if action_id_1 and not action_id_2:  # Only fetch Action 1 if Action 2 failed
        await safe_tool_call(
            get_action_details,
            {"action_id": action_id_1},
            f"Get Action Details (Action 1 Only - {_fmt_id(action_id_1)})",
        )

    # Get Recent Actions (should show both)
    await safe_tool_call(
        get_recent_actions, {"workflow_id": wf_id, "limit": 5}, "Get Recent Actions"
    )

    return action_ids


async def demonstrate_action_dependencies(wf_id: Optional[str], action_ids: Dict):
    """Demonstrate adding and retrieving action dependencies."""
    console.print(Rule("[bold green]3. Action Dependency Operations[/bold green]", style="green"))
    if not wf_id:
        console.print("[yellow]Skipping dependency demo: No valid workflow ID.[/yellow]")
        return
    action1_id = action_ids.get("action1_id")
    action2_id = action_ids.get("action2_id")
    if not action1_id or not action2_id:
        console.print("[yellow]Skipping dependency demo: Need at least two valid action IDs.[/yellow]")
        return

    # Add Dependency (Action 2 requires Action 1)
    await safe_tool_call(
        add_action_dependency,
        {
            "source_action_id": action2_id,
            "target_action_id": action1_id,
            "dependency_type": "requires",
        },
        f"Add Dependency ({_fmt_id(action2_id)} requires {_fmt_id(action1_id)})",
    )

    # Get Dependencies for Action 1 (Should show Action 2 depends on it - Downstream)
    await safe_tool_call(
        get_action_dependencies,
        {"action_id": action1_id, "direction": "downstream"},
        f"Get Dependencies (Downstream of Action 1 - {_fmt_id(action1_id)})",
    )

    # Get Dependencies for Action 2 (Should show it depends on Action 1 - Upstream)
    await safe_tool_call(
        get_action_dependencies,
        {"action_id": action2_id, "direction": "upstream", "include_details": True},
        f"Get Dependencies (Upstream of Action 2 - {_fmt_id(action2_id)}, with Details)",
    )

    # Get Action 1 Details (Include Dependencies)
    await safe_tool_call(
        get_action_details,
        {"action_id": action1_id, "include_dependencies": True},
        f"Get Action 1 Details ({_fmt_id(action1_id)}), Include Dependencies",
    )


async def demonstrate_artifacts(wf_id: Optional[str], action_ids: Dict):
    """Demonstrate artifact recording and retrieval."""
    console.print(Rule("[bold green]4. Artifact Operations[/bold green]", style="green"))
    if not wf_id:
        console.print("[yellow]Skipping artifact demo: No valid workflow ID provided.[/yellow]")
        return {}  # Return empty dict
    action1_id = action_ids.get("action1_id")
    action2_id = action_ids.get("action2_id") # May be None if Action 2 failed

    artifact_ids = {}

    # Record Artifact 1 (e.g., Plan document from Action 1)
    artifact_args_1 = {
        "workflow_id": wf_id,
        "action_id": action1_id,
        "name": "demo_plan.txt",
        "artifact_type": ArtifactType.FILE.value,  # Use enum value
        "description": "Initial plan for the demo steps.",
        "path": "/path/to/demo_plan.txt",
        "content": "Step 1: Plan\nStep 2: Simulate\nStep 3: Analyze",  # Small content example
        "tags": ["planning", "document"],
        "is_output": False,
    }
    art_res_1 = await safe_tool_call(
        record_artifact, artifact_args_1, "Record Artifact 1 (Plan Doc)"
    )
    art_id_1 = art_res_1.get("artifact_id") if art_res_1 and art_res_1.get("success") else None # Robust check
    if not art_id_1:
        console.print("[bold red]DEMO WARNING:[/bold red] Failed to record Artifact 1. Subsequent steps needing art1_id might fail.")
        # Don't abort, but warn
    else:
        artifact_ids["art1_id"] = art_id_1

    # Record Artifact 2 (e.g., Error log from Action 2) - only if Action 2 started
    if not action2_id:
        console.print("[yellow]Skipping Artifact 2 recording: Action 2 ID is not available (likely failed to start).[/yellow]")
        art_id_2 = None
    else:
        artifact_args_2 = {
            "workflow_id": wf_id,
            "action_id": action2_id,
            "name": "simulation_error.log",
            "artifact_type": ArtifactType.TEXT.value,
            "description": "Error log from the failed data simulation.",
            "content": "ERROR: Timeout waiting for resource. Code 504.",
            "tags": ["error", "log", "simulation"],
        }
        art_res_2 = await safe_tool_call(
            record_artifact, artifact_args_2, "Record Artifact 2 (Error Log)"
        )
        art_id_2 = art_res_2.get("artifact_id") if art_res_2 and art_res_2.get("success") else None
        if art_id_2:
            artifact_ids["art2_id"] = art_id_2
        else:
            console.print("[bold red]DEMO WARNING:[/bold red] Failed to record Artifact 2.")

    # Get Artifacts (List all for workflow)
    await safe_tool_call(
        get_artifacts, {"workflow_id": wf_id, "limit": 5}, "Get Artifacts (List for Workflow)"
    )

    # Get Artifacts (Filter by tag 'planning')
    await safe_tool_call(
        get_artifacts,
        {"workflow_id": wf_id, "tag": "planning"},
        "Get Artifacts (Filter by Tag 'planning')",
    )

    # Get Artifact by ID (Get the plan doc)
    if art_id_1:
        await safe_tool_call(
            get_artifact_by_id,
            {"artifact_id": art_id_1, "include_content": True},
            f"Get Artifact by ID ({_fmt_id(art_id_1)}, Include Content)",
        )
    else:
        console.print("[yellow]Skipping 'Get Artifact by ID' for Artifact 1 as it failed to record.[/yellow]")

    return artifact_ids


async def demonstrate_thoughts_and_linking(
    wf_id: Optional[str], action_ids: Dict, artifact_ids: Dict
):
    """Demonstrate thought chains, recording thoughts, and linking them."""
    console.print(Rule("[bold green]5. Thought Operations & Linking[/bold green]", style="green"))
    if not wf_id:
        console.print("[yellow]Skipping thought demo: No valid workflow ID provided.[/yellow]")
        return None
    action1_id = action_ids.get("action1_id")  # noqa: F841
    action2_id = action_ids.get("action2_id") # Might be None
    art1_id = artifact_ids.get("art1_id") # Might be None if artifact demo failed

    # Check if prerequisite artifact exists before linking
    if not art1_id:
        console.print("[yellow]Skipping thought demo: Planning artifact ID (art1_id) not available.[/yellow]")
        return None
    
    # Create a new thought chain
    chain_args = {
        "workflow_id": wf_id,
        "title": "Analysis Thought Chain",
        "initial_thought": "Review the plan artifact.",
        "initial_thought_type": ThoughtType.PLAN.value,
    }
    chain_res = await safe_tool_call(
        create_thought_chain, chain_args, "Create New Thought Chain (Analysis)"
    )
    chain_id = chain_res.get("thought_chain_id") if chain_res and chain_res.get("success") else None # Robust check
    
    if not chain_id:
       console.print("[bold red]CRITICAL DEMO FAILURE:[/bold red] Failed to create thought chain. Aborting thought demo.")
       return None

    # Record a thought linked to the plan artifact
    thought_args_1 = {
        "workflow_id": wf_id,
        "thought_chain_id": chain_id,
        "content": "The plan seems straightforward but lacks detail on simulation parameters.",
        "thought_type": ThoughtType.CRITIQUE.value,
        "relevant_artifact_id": art1_id,  # Link to the plan artifact
    }
    thought_res_1 = await safe_tool_call(
        record_thought, thought_args_1, "Record Thought (Critique Linked to Artifact)"
    )
    thought1_id = thought_res_1.get("thought_id") if thought_res_1.get("success") else None

    if not thought1_id:
        console.print("[bold red]DEMO WARNING:[/bold red] Failed to record thought 1. Subsequent linked thought might fail or be unlinked.")
    
    # Record second thought (depends on action2_id, thought1_id)
    if not action2_id:
        console.print("[yellow]Skipping recording thought 2: Action 2 ID is missing.[/yellow]")
    elif not thought1_id:
        # Action 2 exists but thought 1 failed: record thought 2 without a parent link.
        console.print("[yellow]Thought 1 ID missing; recording thought 2 without a parent link.[/yellow]")
        thought_args_2_no_parent = {
            "workflow_id": wf_id,
            "thought_chain_id": chain_id,
            "content": "The simulation failure needs investigation. Was it transient or configuration?",
            "thought_type": ThoughtType.QUESTION.value,
            "relevant_action_id": action2_id,  # Action 2 ID exists here
        }
        await safe_tool_call(
            record_thought, thought_args_2_no_parent, "Record Thought (Question Linked to Action, NO PARENT)"
        )
    else:
        # Record another thought linked to the failed action and to the previous thought
        thought_args_2 = {
            "workflow_id": wf_id,
            "thought_chain_id": chain_id,
            "content": "The simulation failure needs investigation. Was it transient or configuration?",
            "thought_type": ThoughtType.QUESTION.value,
            "relevant_action_id": action2_id,  # Link to failed action
            "parent_thought_id": thought1_id,  # Link to previous thought
        }
        await safe_tool_call(
            record_thought, thought_args_2, "Record Thought (Question Linked to Action)"
        )

    # Get the thought chain details (should show linked thoughts)
    await safe_tool_call(
        get_thought_chain,
        {"thought_chain_id": chain_id},
        f"Get Analysis Thought Chain Details ({_fmt_id(chain_id)})",
    )

    return chain_id


async def demonstrate_memory_operations(wf_id: Optional[str], action_ids: Dict, thought_ids: Dict):
    """Demonstrate memory storage, querying, linking."""
    console.print(Rule("[bold green]6. Memory Operations & Querying[/bold green]", style="green"))
    if not wf_id:
        console.print("[yellow]Skipping memory demo: No valid workflow ID provided.[/yellow]")
        return {}  # Return empty dict

    mem_ids = {}

    action1_id = action_ids.get("action1_id") # Might be None  # noqa: F841
    action2_id = action_ids.get("action2_id") # Might be None  # noqa: F841


    # Store Memory 1 (Related to Planning Action)
    store_args_1 = {
        "workflow_id": wf_id,
        "action_id": action_ids.get("action1_id"),
        "content": "The initial plan involves simulation and analysis.",
        "memory_type": MemoryType.SUMMARY.value,
        "memory_level": MemoryLevel.EPISODIC.value,
        "description": "Summary of initial plan",
        "tags": ["planning", "summary"],
        "generate_embedding": False,  # Set False explicitly for baseline
    }
    mem_res_1 = await safe_tool_call(store_memory, store_args_1, "Store Memory 1 (Plan Summary)")
    mem1_id = mem_res_1.get("memory_id") if mem_res_1.get("success") else None
    if mem1_id:
        mem_ids["mem1_id"] = mem1_id

    # Store Memory 2 (Related to Failed Action)
    store_args_2 = {
        "workflow_id": wf_id,
        "action_id": action_ids.get("action2_id"),
        "content": "Data simulation failed with a timeout error (Code 504).",
        "memory_type": MemoryType.OBSERVATION.value,
        "memory_level": MemoryLevel.EPISODIC.value,
        "description": "Simulation failure detail",
        "importance": 7.0,  # Failed actions might be important
        "tags": ["error", "simulation", "observation"],
        "generate_embedding": False,
    }
    mem_res_2 = await safe_tool_call(
        store_memory, store_args_2, "Store Memory 2 (Simulation Error)"
    )
    mem2_id = mem_res_2.get("memory_id") if mem_res_2.get("success") else None
    if mem2_id:
        mem_ids["mem2_id"] = mem2_id

    # Store Memory 3 (A more general fact)
    store_args_3 = {
        "workflow_id": wf_id,
        "content": "Timeout errors often indicate resource contention or configuration issues.",
        "memory_type": MemoryType.FACT.value,
        "memory_level": MemoryLevel.SEMANTIC.value,
        "description": "General knowledge about timeouts",
        "importance": 6.0,
        "confidence": 0.9,
        "tags": ["error", "knowledge", "fact"],
        "generate_embedding": False,
    }
    mem_res_3 = await safe_tool_call(store_memory, store_args_3, "Store Memory 3 (Timeout Fact)")
    mem3_id = mem_res_3.get("memory_id") if mem_res_3.get("success") else None
    if mem3_id:
        mem_ids["mem3_id"] = mem3_id

    # Link Memory 2 (Error) -> Memory 3 (Fact)
    if mem2_id and mem3_id:
        await safe_tool_call(
            create_memory_link,
            {
                "source_memory_id": mem2_id,
                "target_memory_id": mem3_id,
                "link_type": LinkType.REFERENCES.value,
                "description": "Error relates to general timeout knowledge",
            },
            f"Link Error ({_fmt_id(mem2_id)}) to Fact ({_fmt_id(mem3_id)})",
        )

        # Get Linked Memories for the Error Memory
        await safe_tool_call(
            get_linked_memories,
            {"memory_id": mem2_id, "direction": "outgoing", "include_memory_details": True},
            f"Get Outgoing Linked Memories for Error ({_fmt_id(mem2_id)})",
        )

    # Query Memories using FTS
    await safe_tool_call(
        query_memories,
        {"workflow_id": wf_id, "search_text": "simulation error timeout"},
        "Query Memories (FTS: 'simulation error timeout')",
    )

    # Query Memories by Importance Range
    await safe_tool_call(
        query_memories,
        {"workflow_id": wf_id, "min_importance": 6.5, "sort_by": "importance"},
        "Query Memories (Importance >= 6.5)",
    )

    # Query Memories by Memory Type
    await safe_tool_call(
        query_memories,
        {"workflow_id": wf_id, "memory_type": MemoryType.FACT.value},
        "Query Memories (Type: Fact)",
    )

    # Update Memory 1's tags
    if mem1_id:
        await safe_tool_call(
            update_memory,
            {"memory_id": mem1_id, "tags": ["planning", "summary", "initial_phase"]},
            f"Update Memory 1 Tags ({_fmt_id(mem1_id)})",
        )
        # Verify update
        await safe_tool_call(
            get_memory_by_id,
            {"memory_id": mem1_id},
            f"Get Memory 1 After Tag Update ({_fmt_id(mem1_id)})",
        )

    # Example: Record a thought linked to a memory
    if mem3_id and thought_ids:  # Assuming demonstrate_thoughts ran successfully
        thought_chain_id_str = thought_ids.get("main_chain_id")
        if not thought_chain_id_str:
            console.print(
                "[yellow]Skipping thought link to memory: main_chain_id not found in thought_ids dict.[/yellow]"
            )
        else:
            thought_args_link = {
                "workflow_id": wf_id,
                "thought_chain_id": thought_chain_id_str,  # Pass the string ID
                "content": "Based on the general knowledge about timeouts, need to check server logs.",
                "thought_type": ThoughtType.PLAN.value,
                "relevant_memory_id": mem3_id,  # Link to the Fact memory
            }
            await safe_tool_call(
                record_thought,
                thought_args_link,
                f"Record Thought Linked to Memory ({_fmt_id(mem3_id)})",
            )
    elif not thought_ids:
        console.print(
            "[yellow]Skipping thought link to memory: thought_ids dict is empty or None.[/yellow]"
        )

    return mem_ids


async def demonstrate_embedding_and_search(wf_id: Optional[str], mem_ids: Dict, thought_ids: Dict):
    """Demonstrate embedding generation and semantic/hybrid search."""
    console.print(Rule("[bold green]7. Embedding & Semantic Search[/bold green]", style="green"))
    if not wf_id:
        console.print("[yellow]Skipping embedding demo: No valid workflow ID.[/yellow]")
        return  # Return immediately if no workflow ID
    mem1_id = mem_ids.get("mem1_id")  # Plan summary
    mem2_id = mem_ids.get("mem2_id")  # Simulation error
    mem3_id = mem_ids.get("mem3_id")  # Timeout fact

    if not mem1_id or not mem2_id or not mem3_id:
        console.print(
            "[yellow]Skipping embedding demo: Missing required memory IDs from previous steps.[/yellow]"
        )
        return  # Return immediately if prerequisite memories are missing

    # 1. Update Memory 2 (Error) to generate embedding
    # This relies on the embedding service being functional (API key configured)
    console.print(
        "[yellow]Attempting to generate embeddings. Requires configured Embedding Service (e.g., OpenAI API key).[/yellow]"
    )
    update_res = await safe_tool_call(
        update_memory,
        {
            "memory_id": mem2_id,
            "regenerate_embedding": True,
        },
        f"Update Memory 2 ({_fmt_id(mem2_id)}) to Generate Embedding",
    )
    if not (update_res and update_res.get("success") and update_res.get("embedding_regenerated")):
        console.print(
            "[red]   -> Failed to generate embedding for Memory 2. Semantic/Hybrid search may not work as expected.[/red]"
        )

    # 2. Store a new memory WITH embedding generation enabled
    store_args_4 = {
        "workflow_id": wf_id,
        "content": "Investigating the root cause of the simulation timeout is the next priority.",
        "memory_type": MemoryType.PLAN.value,
        "memory_level": MemoryLevel.EPISODIC.value,
        "description": "Next step planning",
        "importance": 7.5,
        "tags": ["investigation", "planning", "error_handling"],
        "generate_embedding": True,  # Explicitly enable
    }
    mem_res_4 = await safe_tool_call(
        store_memory, store_args_4, "Store Memory 4 (Next Step Plan) with Embedding"
    )
    mem4_id = mem_res_4.get("memory_id") if mem_res_4.get("success") else None
    if mem4_id:
        mem_ids["mem4_id"] = mem4_id  # Add to our tracked IDs

    # Check if embedding was actually generated for Mem4
    if mem4_id:
        mem4_details = await safe_tool_call(
            get_memory_by_id,
            {"memory_id": mem4_id},
            f"Check Memory 4 Details ({_fmt_id(mem4_id)})",
            suppress_output=True,
        )
        if mem4_details and mem4_details.get("success") and mem4_details.get("embedding_id"):
            console.print(
                f"[green]   -> Embedding ID confirmed for Memory 4: {_fmt_id(mem4_details['embedding_id'])}[/green]"
            )
        else:
            console.print(
                "[yellow]   -> Warning: Embedding ID missing for Memory 4. Embedding generation likely failed.[/yellow]"
            )
            console.print("[dim]      (Semantic/Hybrid search results may be limited.)[/dim]")

    # 3. Semantic Search
    await safe_tool_call(
        search_semantic_memories,
        {
            "workflow_id": wf_id,
            "query": "problems with simulation performance",
            "limit": 3,
            "threshold": 0.5,
        },
        "Semantic Search: 'problems with simulation performance'",
    )
    await safe_tool_call(
        search_semantic_memories,
        {
            "workflow_id": wf_id,
            "query": "next actions to take",
            "limit": 2,
            "memory_level": MemoryLevel.EPISODIC.value,
        },
        "Semantic Search: 'next actions to take' (Episodic only)",
    )

    # 4. Hybrid Search
    await safe_tool_call(
        hybrid_search_memories,
        {
            "workflow_id": wf_id,
            "query": "investigate timeout simulation",
            "limit": 4,
            "semantic_weight": 0.6,
            "keyword_weight": 0.4,
            "tags": ["error"],
            "include_content": False,
        },
        "Hybrid Search: 'investigate timeout simulation' + tag 'error'",
    )

    # 5. Demonstrate link suggestions
    # Update Mem3 (Timeout fact) to generate embedding
    update_res_3 = await safe_tool_call(
        update_memory,
        {"memory_id": mem3_id, "regenerate_embedding": True},
        f"Update Memory 3 ({_fmt_id(mem3_id)}) to Generate Embedding",
    )
    if not (
        update_res_3 and update_res_3.get("success") and update_res_3.get("embedding_regenerated")
    ):
        console.print(
            "[red]   -> Failed to generate embedding for Memory 3. Link suggestion test might fail.[/red]"
        )

    # --- Store Memory 5 (Hypothesis) ---
    hypothesis_content = "Resource limits on the simulation server might be too low."
    thought_chain_id = thought_ids.get("main_chain_id") if isinstance(thought_ids, dict) else None
    hypothesis_thought_id = None
    if thought_chain_id:
        thought_args_hyp = {
            "workflow_id": wf_id,
            "thought_chain_id": thought_chain_id,
            "content": hypothesis_content,
            "thought_type": ThoughtType.HYPOTHESIS.value,
            "relevant_memory_id": mem3_id,
        }
        hyp_thought_res = await safe_tool_call(
            record_thought, thought_args_hyp, "Record Hypothesis Thought"
        )
        hypothesis_thought_id = (
            hyp_thought_res.get("thought_id") if hyp_thought_res.get("success") else None
        )
    else:
        console.print(
            "[yellow]Skipping hypothesis memory storage: Could not get thought chain ID.[/yellow]"
        )

    mem5_id = None
    mem_res_5 = None
    if hypothesis_thought_id:
        store_args_5 = {
            "workflow_id": wf_id,
            "thought_id": hypothesis_thought_id,
            "content": hypothesis_content,
            "memory_type": MemoryType.REASONING_STEP.value,
            "memory_level": MemoryLevel.SEMANTIC.value,
            "description": "Hypothesis on timeout cause",
            "importance": 6.5,
            "confidence": 0.6,
            "tags": ["hypothesis", "resource", "error", "reasoning_step"],
            "generate_embedding": True,
            "suggest_links": True,  # Explicitly ask for suggestions
            "max_suggested_links": 2,
        }
        mem_res_5 = await safe_tool_call(
            store_memory, store_args_5, "Store Memory 5 (Hypothesis Reasoning) - Suggest Links"
        )
        mem5_id = mem_res_5.get("memory_id") if mem_res_5.get("success") else None
        if mem5_id:
            mem_ids["mem5_id"] = mem5_id

        # Check suggestions result
        if mem_res_5 and mem_res_5.get("success") and mem_res_5.get("suggested_links"):
            console.print("[cyan]   -> Link suggestions received for Memory 5:[/]")
            console.print(pretty_repr(mem_res_5["suggested_links"]))
        elif mem_res_5 and mem_res_5.get("success"):
            console.print(
                "[dim]   -> No link suggestions returned for Memory 5 (or embedding failed).[/dim]"
            )
        elif mem_res_5 and not mem_res_5.get("success"):
            console.print(
                "[yellow]   -> Failed to store Memory 5, cannot check suggestions.[/yellow]"
            )
    else:
        console.print(
            "[yellow]Skipping Memory 5 storage: Hypothesis thought recording failed.[/yellow]"
        )


async def demonstrate_state_and_working_memory(
    wf_id: str,
    mem_ids_dict: Dict[str, str],
    action_ids_dict: Dict[str, str],
    thought_ids_dict: Dict[str, Any],
    state_ids_dict: Dict[str, str],
):
    """Demonstrate saving/loading state and working memory operations."""
    console.print(
        Rule("[bold green]8. Cognitive State & Working Memory[/bold green]", style="green")
    )

    # --- Retrieve necessary IDs from previous steps ---
    main_wf_id = wf_id
    main_chain_id = thought_ids_dict.get("main_chain_id")  # noqa: F841
    plan_action_id = action_ids_dict.get("action1_id")
    sim_action_id = action_ids_dict.get("action2_id")
    mem1_id = mem_ids_dict.get("mem1_id")
    mem2_id = mem_ids_dict.get("mem2_id")
    mem3_id = mem_ids_dict.get("mem3_id")
    mem4_id = mem_ids_dict.get("mem4_id")
    mem5_id = mem_ids_dict.get("mem5_id")

    hypothesis_thought_id = None
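    # Memory 5 (the hypothesis) was stored from a recorded thought in section 7;
    # recover that thought's ID from the memory record so it can be saved below
    # as a current goal of the cognitive state.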
    if mem5_id and main_wf_id:
        mem5_details = await safe_tool_call(
            get_memory_by_id,
            {"memory_id": mem5_id},
            f"Get Memory 5 Details ({_fmt_id(mem5_id)}) for Thought ID",
            suppress_output=True,
        )
        if mem5_details and mem5_details.get("success"):
            hypothesis_thought_id = mem5_details.get("thought_id")
            if hypothesis_thought_id:
                console.print(
                    f"[cyan]   -> Retrieved Hypothesis Thought ID: {_fmt_id(hypothesis_thought_id)}[/cyan]"
                )
            else:
                console.print(
                    "[yellow]   -> Could not retrieve hypothesis thought ID from Memory 5 details.[/yellow]"
                )
        else:
            # Handle case where get_memory_by_id failed or didn't return success
            console.print(
                f"[yellow]   -> Failed to get details for Memory 5 ({_fmt_id(mem5_id)}) to find Thought ID.[/yellow]"
            )

    # --- Check if we have enough *critical* data to proceed ---
    # Hypothesis thought ID is critical for saving the intended state goals
    if not (
        main_wf_id
        and mem1_id
        and mem2_id
        and mem3_id
        and mem4_id
        and plan_action_id
        and hypothesis_thought_id # Ensure this critical ID exists
    ):
        console.print(
            "[bold yellow]Skipping state/working memory demo:[/bold yellow] Missing critical IDs (workflow, mem1-4, plan_action, hypothesis_thought) from previous steps."
        )
        return # Exit if critical IDs are missing

    # Prepare IDs for saving state - check individually for non-critical ones
    working_mems = [mem_id for mem_id in [mem2_id, mem3_id, mem4_id, mem5_id] if mem_id] # Filter None
    focus_mems = [mem4_id] if mem4_id else [] # Filter None
    context_actions = [action_id for action_id in [plan_action_id, sim_action_id] if action_id] # Filter None
    goal_thoughts = [hypothesis_thought_id] # Already checked above

    # 1. Save Cognitive State
    save_args = {
        "workflow_id": wf_id,
        "title": "State after simulation failure and hypothesis",
        "working_memory_ids": working_mems,
        "focus_area_ids": focus_mems,
        "context_action_ids": context_actions,
        "current_goal_thought_ids": goal_thoughts,
    }
    state_res = await safe_tool_call(save_cognitive_state, save_args, "Save Cognitive State")
    state_id = state_res.get("state_id") if state_res and state_res.get("success") else None # More robust check

    if state_id:
        state_ids_dict["saved_state_id"] = state_id
        console.print(f"[green]   -> Cognitive state saved successfully with ID: {_fmt_id(state_id)}[/green]")
    else:
        console.print("[bold red]CRITICAL DEMO FAILURE:[/bold red] Failed to save cognitive state. Cannot proceed with working memory tests.")
        return # Exit if state saving failed

    # 2. Load Cognitive State (by ID) - Use the confirmed state_id
    await safe_tool_call(
        load_cognitive_state,
        {"workflow_id": wf_id, "state_id": state_id}, # Use state_id directly
        f"Load Cognitive State ({_fmt_id(state_id)}) by ID",
    )

    # 3. Load Cognitive State (Latest)
    await safe_tool_call(
        load_cognitive_state,
        {"workflow_id": wf_id},
        "Load Latest Cognitive State",
    )

    # --- Working Memory Operations using the saved state_id as the context_id ---
    # The variable 'state_id' now holds the context ID we need for the rest of this section.
    console.print(f"\n[dim]Using saved state ID '{_fmt_id(state_id)}' as context_id for working memory tests...[/dim]\n")

    # 4. Focus Memory (Focus on the 'hypothesis' memory if it exists)
    focus_target_id = mem_ids_dict.get("mem5_id") # Get mem5_id again here
    if focus_target_id:
        await safe_tool_call(
            focus_memory,
            {
                "memory_id": focus_target_id,
                "context_id": state_id, # USE state_id
                "add_to_working": False, # Assume it's already there from save_state
            },
            f"Focus Memory ({_fmt_id(focus_target_id)}) in Context ({_fmt_id(state_id)})", # USE state_id
        )
    else:
        console.print(
            "[bold yellow]Skipping focus memory test: Hypothesis memory ID (mem5_id) not available.[/bold yellow]"
        )

    # 5. Get Working Memory (Should reflect the saved state initially)
    await safe_tool_call(
        get_working_memory,
        {
            "context_id": state_id, # USE state_id
            "include_links": False, # Keep output cleaner for this demo step
        },
        f"Get Working Memory for Context ({_fmt_id(state_id)})", # USE state_id
    )

    # 6. Optimize Working Memory (Reduce size, using 'balanced' strategy)
    wm_details = await safe_tool_call(
        get_working_memory,
        {"context_id": state_id}, # USE state_id
        "Get WM Size Before Optimization",
        suppress_output=True,
    )
    current_wm_size = (
        len(wm_details.get("working_memories", []))
        if wm_details and wm_details.get("success")
        else 0
    )

    if current_wm_size > 2: # Only optimize if we have more than 2 memories
        target_optimize_size = max(1, current_wm_size // 2)
        console.print(
            f"[cyan]   -> Optimizing working memory from {current_wm_size} down to {target_optimize_size}...[/cyan]"
        )
        await safe_tool_call(
            optimize_working_memory,
            {
                "context_id": state_id, # USE state_id
                "target_size": target_optimize_size,
                "strategy": "balanced",
            },
            f"Optimize Working Memory (Context: {_fmt_id(state_id)}, Target: {target_optimize_size})", # USE state_id
        )
        await safe_tool_call(
            get_working_memory,
            {"context_id": state_id, "include_links": False}, # USE state_id
            f"Get Working Memory After Optimization (Context: {_fmt_id(state_id)})", # USE state_id
        )
    else:
        console.print(
            f"[dim]Skipping working memory optimization: Current size ({current_wm_size}) is too small.[/dim]"
        )


async def demonstrate_metacognition(wf_id: str, mem_ids: Dict, state_ids: Dict):
    """Demonstrate context retrieval, auto-focus, promotion, consolidation, reflection, summarization."""
    console.print(Rule("[bold green]9. Meta-Cognition & Summarization[/bold green]", style="green"))

    # 1. Get Workflow Context
    await safe_tool_call(get_workflow_context, {"workflow_id": wf_id}, "Get Full Workflow Context")

    # 2. Auto Update Focus
    context_id = state_ids.get("saved_state_id")
    if context_id:
        await safe_tool_call(
            auto_update_focus,
            {"context_id": context_id},
            f"Auto Update Focus for Context ({_fmt_id(context_id)})",
        )
    else:
        console.print("[yellow]Skipping auto-focus: No context_id (state_id) available.[/yellow]")

    # 3. Promote Memory Level
    mem1_id = mem_ids.get("mem1_id")  # Episodic summary
    mem3_id = mem_ids.get("mem3_id")  # Semantic fact
    if mem1_id:
        console.print(
            f"[cyan]   -> Manually increasing access_count for Memory 1 ({_fmt_id(mem1_id)}) to test promotion...[/cyan]"
        )
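        # Promotion out of the episodic level presumably depends on usage and
        # confidence thresholds, so bump access_count and confidence directly in
        # the database to give promote_memory_level a realistic chance to succeed.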
        try:
            async with DBConnection(DEMO_DB_FILE) as conn:
                await conn.execute(
                    "UPDATE memories SET access_count = 10, confidence = 0.9 WHERE memory_id = ?",
                    (mem1_id,),
                )
                await conn.commit()
            await safe_tool_call(
                promote_memory_level,
                {"memory_id": mem1_id},
                f"Attempt Promote Memory 1 ({_fmt_id(mem1_id)}) from Episodic",
            )
        except Exception as e:
            console.print(f"[red]   -> Error updating access count for promotion test: {e}[/red]")

    if mem3_id:
        await safe_tool_call(
            promote_memory_level,
            {"memory_id": mem3_id},
            f"Attempt Promote Memory 3 ({_fmt_id(mem3_id)}) from Semantic (Should Fail)",
        )

    # 4. Consolidate Memories (requires LLM)
    mem_ids_for_consolidation = [
        mid
        for mid in [mem_ids.get("mem1_id"), mem_ids.get("mem2_id"), mem_ids.get("mem3_id")]
        if mid
    ]
    if len(mem_ids_for_consolidation) >= 2:
        console.print(
            "[yellow]Attempting memory consolidation. Requires configured LLM provider (e.g., OpenAI API key).[/yellow]"
        )
        await safe_tool_call(
            consolidate_memories,
            {
                "workflow_id": wf_id,
                "target_memories": mem_ids_for_consolidation,
                "consolidation_type": "summary",
                "store_result": True,
                "provider": config.default_provider or "openai",
            },
            "Consolidate Memories (Summary)",
        )
    else:
        console.print(
            "[yellow]Skipping consolidation: Not enough source memories available.[/yellow]"
        )

    # 5. Generate Reflection (requires LLM)
    console.print(
        "[yellow]Attempting reflection generation. Requires configured LLM provider.[/yellow]"
    )
    await safe_tool_call(
        generate_reflection,
        {
            "workflow_id": wf_id,
            "reflection_type": "gaps",
            "provider": config.default_provider
            or "openai",  # Use configured default from GatewayConfig
        },
        "Generate Reflection (Gaps)",
    )

    # 6. Summarize Text (requires LLM)
    console.print(
        "[yellow]Attempting text summarization. Requires configured LLM provider.[/yellow]"
    )
    sample_text = """
    The Unified Memory System integrates several components for advanced agent cognition.
    It tracks workflows, actions, artifacts, and thoughts. A multi-level memory hierarchy
    (working, episodic, semantic, procedural) allows for different types of knowledge storage.
    Vector embeddings enable semantic search capabilities. Associative links connect related
    memory items. Cognitive states can be saved and loaded, preserving the agent's context.
    Maintenance tools help manage memory expiration and provide statistics. Reporting and
    visualization tools offer insights into the agent's processes. This system aims to provide
    a robust foundation for complex autonomous agents.
    """
    await safe_tool_call(
        summarize_text,
        {
            "text_to_summarize": sample_text,
            "target_tokens": 50,
            "record_summary": True,
            "workflow_id": wf_id,
            "provider": config.default_provider or "openai",
        },
        "Summarize Sample Text and Record Memory",
    )


async def demonstrate_maintenance_and_stats(wf_id: str):
    """Demonstrate memory deletion and statistics computation."""
    console.print(Rule("[bold green]10. Maintenance & Statistics[/bold green]", style="green"))

    # 1. Delete Expired Memories
    # Store a temporary memory with a short TTL
    console.print("[cyan]   -> Storing a temporary memory with TTL=1 second...[/cyan]")
    ttl_mem_args = {
        "workflow_id": wf_id,
        "content": "This memory should expire quickly.",
        "memory_type": "observation",
        "ttl": 1,  # 1 second TTL
    }
    ttl_mem_res = await safe_tool_call(
        store_memory,  # Pass the function object
        ttl_mem_args,  # Pass the arguments dictionary
        "Store Temporary Memory",
        suppress_output=True,
    )

    if ttl_mem_res and ttl_mem_res.get("success"):
        console.print("[cyan]   -> Waiting 2 seconds for memory to expire...[/cyan]")
        await asyncio.sleep(2)
        await safe_tool_call(
            delete_expired_memories, {}, "Delete Expired Memories (Should delete 1)"
        )
    else:
        console.print(
            "[yellow]   -> Failed to store temporary memory for expiration test.[/yellow]"
        )
        if ttl_mem_res:
            console.print(f"[yellow]      Error: {ttl_mem_res.get('error')}[/yellow]")

    # 2. Compute Statistics (Workflow Specific)
    await safe_tool_call(
        compute_memory_statistics,
        {"workflow_id": wf_id},
        f"Compute Statistics for Workflow ({_fmt_id(wf_id)})",
    )

    # 3. Compute Statistics (Global)
    await safe_tool_call(compute_memory_statistics, {}, "Compute Global Statistics")


async def demonstrate_reporting_and_viz(wf_id: str, thought_chain_id: str, mem_ids: Dict):
    """Demonstrate report generation and visualization."""
    console.print(Rule("[bold green]11. Reporting & Visualization[/bold green]", style="green"))

    # 1. Generate Workflow Reports
    await safe_tool_call(
        generate_workflow_report,
        {"workflow_id": wf_id, "report_format": "markdown", "style": "professional"},
        "Generate Workflow Report (Markdown, Professional)",
    )
    await safe_tool_call(
        generate_workflow_report,
        {"workflow_id": wf_id, "report_format": "html", "style": "concise"},
        "Generate Workflow Report (HTML, Concise)",
    )
    await safe_tool_call(
        generate_workflow_report,
        {"workflow_id": wf_id, "report_format": "json"},
        "Generate Workflow Report (JSON)",
    )
    await safe_tool_call(
        generate_workflow_report,
        {"workflow_id": wf_id, "report_format": "mermaid"},
        "Generate Workflow Report (Mermaid Diagram)",
    )

    # 2. Visualize Reasoning Chain
    if thought_chain_id:
        await safe_tool_call(
            visualize_reasoning_chain,
            {"thought_chain_id": thought_chain_id},
            f"Visualize Reasoning Chain ({_fmt_id(thought_chain_id)})",
        )
    else:
        console.print(
            "[yellow]Skipping reasoning visualization: No thought_chain_id available.[/yellow]"
        )

    # 3. Visualize Memory Network
    # Visualize around the 'error' memory
    center_mem_id = mem_ids.get("mem2_id")
    if center_mem_id:
        await safe_tool_call(
            visualize_memory_network,
            {"center_memory_id": center_mem_id, "depth": 1, "max_nodes": 15},
            f"Visualize Memory Network (Centered on Error Mem {_fmt_id(center_mem_id)}, Depth 1)",
        )
    else:
        console.print(
            "[yellow]Skipping centered memory visualization: Error memory ID not available.[/yellow]"
        )

    # Visualize top memories for the workflow
    await safe_tool_call(
        visualize_memory_network,
        {"workflow_id": wf_id, "max_nodes": 20},
        f"Visualize Memory Network (Workflow {_fmt_id(wf_id)}, Top 20 Relevant)",
    )


# --- Main Execution Logic ---
async def main():
    """Run the extended Unified Memory System demonstration suite."""
    console.print(
        Rule(
            "[bold magenta]Unified Memory System Tools Demo (Extended)[/bold magenta]",
            style="white",
        )
    )
    exit_code = 0
    # Dictionaries to store IDs created during the demo
    wf_ids = {}
    action_ids = {}
    artifact_ids = {}
    thought_ids = {}  # Store chain ID
    mem_ids = {}
    state_ids = {}  # Store state ID

    try:
        await setup_demo_environment()

        # --- Run Demo Sections in Order ---
        wf_id = await demonstrate_basic_workflows()
        if wf_id:
            wf_ids["main_wf_id"] = wf_id
        else:
            raise RuntimeError("Workflow creation failed, cannot continue demo.")

        action_ids = await demonstrate_basic_actions(wf_ids.get("main_wf_id"))
        await demonstrate_action_dependencies(wf_ids.get("main_wf_id"), action_ids)
        artifact_ids = await demonstrate_artifacts(wf_ids.get("main_wf_id"), action_ids)

        chain_id = await demonstrate_thoughts_and_linking(
            wf_ids.get("main_wf_id"), action_ids, artifact_ids
        )
        if chain_id:
            thought_ids["main_chain_id"] = chain_id

        mem_ids = await demonstrate_memory_operations(
            wf_ids.get("main_wf_id"), action_ids, thought_ids
        )  # Pass thought_ids dict
        await demonstrate_embedding_and_search(wf_ids.get("main_wf_id"), mem_ids, thought_ids)

        # State/Working Memory depends on previous steps creating IDs
        # Pass all collected ID dictionaries
        await demonstrate_state_and_working_memory(
            wf_id=wf_ids["main_wf_id"],
            mem_ids_dict=mem_ids,
            action_ids_dict=action_ids,
            thought_ids_dict=thought_ids,  # Contains chain_id and potentially specific thought IDs if needed later
            state_ids_dict=state_ids,  # Pass dict to store the created state_id
        )

        # --- Run NEW Advanced Demo Sections ---
        await demonstrate_metacognition(wf_ids["main_wf_id"], mem_ids, state_ids)
        await demonstrate_maintenance_and_stats(wf_ids["main_wf_id"])
        await demonstrate_reporting_and_viz(
            wf_ids["main_wf_id"], thought_ids.get("main_chain_id"), mem_ids
        )
        # --- End NEW Sections ---

        logger.success(
            "Unified Memory System Demo completed successfully!",
            emoji_key="complete",
        )
        console.print(Rule("[bold green]Demo Finished[/bold green]", style="green"))

    except Exception as e:
        logger.critical(f"Demo crashed unexpectedly: {str(e)}", emoji_key="critical", exc_info=True)
        console.print(f"\n[bold red]CRITICAL ERROR:[/bold red] {escape(str(e))}")
        console.print_exception(show_locals=False)
        exit_code = 1

    finally:
        # Clean up the demo environment
        console.print(Rule("Cleanup", style="dim"))
        await cleanup_demo_environment()

    return exit_code


if __name__ == "__main__":
    # Run the demo
    final_exit_code = asyncio.run(main())
    sys.exit(final_exit_code)

```

--------------------------------------------------------------------------------
/mcp_tool_context_estimator.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
"""
MCP Tool Context Estimator

This script connects to an already running MCP server and estimates how much 
of an LLM's context window would be consumed by the registered tools when 
they're sent to the model via the Model Context Protocol.
"""

import argparse
import asyncio
import json
import os
import sys
import traceback
from typing import Any, Dict, List, Optional

import aiohttp
import tiktoken
from mcp import ClientSession, StdioServerParameters
from mcp.client.sse import sse_client
from mcp.client.stdio import stdio_client
from rich.console import Console
from rich.table import Table

# Add the project root to the Python path to ensure we can import project modules
sys.path.append("/data/projects/ultimate_mcp_server")

# Import the existing decouple configuration from the project
from ultimate_mcp_server.config import decouple_config

# Import actual model pricing from constants
from ultimate_mcp_server.constants import COST_PER_MILLION_TOKENS

# Removed dependency on STANDALONE_TOOL_FUNCTIONS to avoid circular imports
# from ultimate_mcp_server.tools import STANDALONE_TOOL_FUNCTIONS

# Define a function to read tool names from a file generated by the server
def read_tool_names_from_file(filename='tools_list.json', quiet=False):
    """Read tool names from a JSON file generated by the server"""
    console = Console()
    try:
        if os.path.exists(filename):
            with open(filename, 'r') as f:
                tool_data = json.load(f)
                if not quiet:
                    console.print(f"[green]Successfully loaded {len(tool_data)} tools from {filename}[/green]")
                return tool_data
        else:
            if not quiet:
                console.print(f"[yellow]Tool list file {filename} not found. Will use server-provided tools only.[/yellow]")
            return []
    except Exception as e:
        if not quiet:
            console.print(f"[red]Error reading tool list: {str(e)}[/red]")
        return []
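
# Hedged illustration of what tools_list.json is assumed to contain: a flat JSON list
# of tool-name strings, inferred from how the loaded value is used as a list of names
# later in this script. The names below are placeholders, not actual server tools.
#
#     ["tool_alpha", "tool_beta", "tool_gamma"]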

# Run another server with --load-all-tools for comparison
RUN_LOAD_ALL_TOOLS_COMPARISON = True
SHOW_DESCRIPTIONS = True

async def detect_server_transport(host: str, port: str, quiet: bool = False) -> tuple[str, str]:
    """
    Detect what transport mode the server is running and return the appropriate URL and transport type.
    
    Args:
        host: Server hostname
        port: Server port
        quiet: If True, suppress detection messages
        
    Returns:
        Tuple of (url, transport_type) where transport_type is 'sse' or 'streamable-http'
    """
    console = Console()
    
    if not quiet:
        console.print(f"[blue]Detecting transport mode for server at {host}:{port}...[/blue]")
    
    # Test MCP protocol endpoints with proper requests
    endpoints_to_try = [
        (f"http://{host}:{port}/mcp/", "streamable-http"),
        (f"http://{host}:{port}/sse", "sse"),
        (f"http://{host}:{port}", "sse"),  # fallback for sse
    ]
    
    # Create a simple MCP initialization message for testing
    test_message = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": "mcp-detector", "version": "1.0.0"}
        }
    }
    
    for url, transport in endpoints_to_try:
        try:
            timeout = aiohttp.ClientTimeout(total=5)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                if transport == "streamable-http":
                    # Test streamable-http with POST + MCP message
                    headers = {
                        "Content-Type": "application/json",
                        "Accept": "application/json, text/event-stream"
                    }
                    async with session.post(url, json=test_message, headers=headers) as response:
                        if response.status == 200:
                            # Check if response looks like MCP
                            try:
                                data = await response.text()
                                if '"jsonrpc":"2.0"' in data or '"result"' in data:
                                    if not quiet:
                                        console.print(f"[green]Detected {transport} transport at {url}[/green]")
                                    return url, transport
                            except Exception:
                                pass
                        elif response.status in [400, 404, 405, 406]:
                            # Server exists but doesn't support this transport
                            if not quiet:
                                console.print(f"[dim]Endpoint {url} returned {response.status}[/dim]")
                            continue
                else:
                    # Test SSE endpoints - they might respond to GET or POST
                    # Try GET first for SSE
                    try:
                        async with session.get(url) as response:
                            if response.status == 200:
                                content_type = response.headers.get('content-type', '').lower()
                                if 'text/event-stream' in content_type:
                                    if not quiet:
                                        console.print(f"[green]Detected {transport} transport at {url}[/green]")
                                    return url, transport
                    except Exception:
                        pass
                    
                    # If GET failed, try POST for SSE (some servers might expect it)
                    try:
                        async with session.post(url, json=test_message) as response:
                            if response.status == 200:
                                content_type = response.headers.get('content-type', '').lower()
                                if 'text/event-stream' in content_type or 'application/json' in content_type:
                                    if not quiet:
                                        console.print(f"[green]Detected {transport} transport at {url}[/green]")
                                    return url, transport
                    except Exception:
                        pass
                        
        except Exception as e:
            if not quiet:
                console.print(f"[dim]Could not connect to {url}: {str(e)}[/dim]")
            continue
    
    # If HTTP detection fails, try to guess based on what we know
    # Check if port 8013 responds at all
    try:
        timeout = aiohttp.ClientTimeout(total=2)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(f"http://{host}:{port}/") as response:
                if response.status == 200:
                    # Server is running, probably streamable-http since that's the new default
                    default_url = f"http://{host}:{port}/mcp/"
                    if not quiet:
                        console.print(f"[yellow]Server detected but transport unclear, defaulting to streamable-http at {default_url}[/yellow]")
                    return default_url, "streamable-http"
    except Exception:
        pass
    
    # Final fallback to SSE for backwards compatibility
    fallback_url = f"http://{host}:{port}/sse"
    if not quiet:
        console.print(f"[yellow]Could not detect transport mode, defaulting to SSE at {fallback_url}[/yellow]")
    return fallback_url, "sse"
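
# Hedged usage sketch (never called in this script): how the detection helper above is
# typically combined with host/port values; "localhost"/"8013" are illustrative
# defaults, not values read from this project's configuration.
async def _example_detect_transport() -> None:
    url, transport = await detect_server_transport("localhost", "8013", quiet=True)
    # For a default streamable-http server this yields something like
    # ("http://localhost:8013/mcp/", "streamable-http")
    print(url, transport)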

def get_server_url_and_transport() -> tuple[str, str]:
    """
    Get the MCP server host and port from the .env file or environment variables.

    Returns:
        Tuple of (host, port); the transport type is detected later via detect_server_transport()
    """
    # Try to get from python-decouple (.env file)
    try:
        host = decouple_config('MCP_SERVER_HOST', default='localhost')
        port = decouple_config('MCP_SERVER_PORT', default='8013')
        
        # Try to detect transport type - this will be resolved in the async context
        return host, port
    except Exception:
        # Fallback to environment variables if decouple fails
        if "MCP_SERVER_HOST" in os.environ and "MCP_SERVER_PORT" in os.environ:
            host = os.environ["MCP_SERVER_HOST"]
            port = os.environ["MCP_SERVER_PORT"]
            return host, port
        
        # Default fallback
        return "localhost", "8013"

# Calculate token counts for different models
def count_tokens(text: str) -> int:
    """Count tokens using tiktoken with cl100k_base encoding (used by most modern models)"""
    encoding = tiktoken.get_encoding("cl100k_base")
    return len(encoding.encode(text))
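
# Hedged usage sketch (never called in this script): demonstrates the counter above on
# a short literal string; the exact count depends on tiktoken's cl100k_base vocabulary.
def _example_count_tokens() -> int:
    sample = "Estimate how many tokens this short sentence uses."
    return count_tokens(sample)  # a small number, roughly one token per word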

# Use real pricing imported from constants.py
# Convert from dollars per million tokens to dollars per 1000 tokens for our calculations
MODEL_PRICES = {
    model: price_info["input"] / 1000  # Convert from per million to per thousand
    for model, price_info in COST_PER_MILLION_TOKENS.items()
}
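
# Worked example of the conversion above (the price is hypothetical, not taken from
# constants.py): an input price of $2.50 per million tokens becomes 2.50 / 1000 =
# $0.0025 per thousand tokens, so the cost formula used later
# (price_per_1k * tokens / 1000) gives 0.0025 * 10_000 / 1000 = $0.025 for a
# 10,000-token prompt.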

def format_capabilities(capabilities):
    """Safely format capabilities object to string for display"""
    result = {}
    # Check for specific capabilities we know about
    if hasattr(capabilities, "tools"):
        result["tools"] = "Available" if capabilities.tools else "Not available"
    if hasattr(capabilities, "prompts"):
        result["prompts"] = "Available" if capabilities.prompts else "Not available"
    if hasattr(capabilities, "resources"):
        result["resources"] = "Available" if capabilities.resources else "Not available"
    if hasattr(capabilities, "logging"):
        result["logging"] = "Available" if capabilities.logging else "Not available"
    if hasattr(capabilities, "completions"):
        result["completions"] = "Available" if capabilities.completions else "Not available"
    if hasattr(capabilities, "experimental"):
        result["experimental"] = "Available" if capabilities.experimental else "Not available"
    
    return json.dumps(result, indent=2)

async def get_mcp_server_tools_streamable_http(server_url: str, include_tools: Optional[List[str]] = None, console: Console = None, quiet: bool = False) -> Dict[str, Any]:
    """
    Connect to an MCP server running in streamable-http mode and fetch all registered tools.
    
    Args:
        server_url: The URL of the running MCP server (should be http://host:port/mcp)
        include_tools: Optional list of tool names to include (if None, get all tools)
        console: Optional console for output
        quiet: If True, only show most important output
        
    Returns:
        Dictionary with server info and tool definitions
    """
    if console is None:
        console = Console()
        
    if not quiet:
        console.print(f"[bold blue]Connecting to streamable-http MCP server at {server_url}...[/bold blue]")
    
    try:
        timeout = aiohttp.ClientTimeout(total=30)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            # First, try to initialize the MCP connection
            init_data = {
                "jsonrpc": "2.0",
                "id": 1,
                "method": "initialize",
                "params": {
                    "protocolVersion": "2024-11-05",
                    "capabilities": {"roots": {"listChanged": True}},
                    "clientInfo": {"name": "mcp-tool-context-estimator", "version": "1.0.0"}
                }
            }
            
            headers = {
                "Content-Type": "application/json",
                "Accept": "application/json, text/event-stream"
            }
            
            if not quiet:
                console.print("[bold blue]Initializing MCP protocol via streamable-http...[/bold blue]")
            
            async with session.post(server_url, json=init_data, headers=headers) as response:
                if response.status != 200:
                    raise Exception(f"Failed to initialize: HTTP {response.status}")
                
                # Capture session ID from response headers
                session_id = response.headers.get('mcp-session-id')
                if not session_id:
                    raise Exception("No session ID returned from server")
                
                # Handle SSE-formatted response
                response_text = await response.text()
                if response.content_type == "text/event-stream":
                    # Parse SSE format
                    lines = response_text.strip().split('\n')
                    json_data = None
                    for line in lines:
                        if line.startswith('data: '):
                            json_data = line[6:]  # Remove 'data: ' prefix
                            break
                    if json_data:
                        init_result = json.loads(json_data)
                    else:
                        raise Exception("No JSON data found in SSE response")
                else:
                    init_result = await response.json()
                
                if "error" in init_result:
                    raise Exception(f"MCP initialization error: {init_result['error']}")
                
                if "result" not in init_result:
                    raise Exception("Invalid MCP initialization response")
                
                result = init_result["result"]
                server_info = result.get("serverInfo", {})
                server_name = server_info.get("name", "Unknown Server")
                server_version = server_info.get("version", "Unknown Version")
                
                if not quiet:
                    console.print(f"[green]Connected to server:[/green] {server_name} v{server_version}")
                
                # Show server capabilities
                capabilities = result.get("capabilities", {})
                if not quiet:
                    console.print("[bold blue]Server capabilities:[/bold blue]")
                    console.print(json.dumps(capabilities, indent=2))
                
                # Check if tools capability is present
                has_tools = capabilities.get("tools", False)
                    
                if not quiet and not has_tools:
                    console.print("[bold yellow]Warning: This server does not advertise tools capability![/bold yellow]")
                    console.print("The server might not support tool listing, but we'll try anyway.")
                
                # Get server instructions (from server info)
                server_instructions = server_info.get("instructions", "")
                if server_instructions and not quiet:
                    console.print(f"[green]Server provides instructions of length {len(server_instructions):,} chars[/green]")
                elif not quiet:
                    console.print("[yellow]Server does not provide instructions[/yellow]")
            
            # Update headers to include session ID for subsequent requests
            headers["mcp-session-id"] = session_id
            
            # Send initialized notification
            init_notify_data = {
                "jsonrpc": "2.0",
                "method": "notifications/initialized"
            }
            
            async with session.post(server_url, json=init_notify_data, headers=headers) as response:
                # This is a notification, so we don't expect a response
                pass
            
            # Now list the tools
            if not quiet:
                console.print("[bold blue]Retrieving tool definitions...[/bold blue]")
            
            list_tools_data = {
                "jsonrpc": "2.0",
                "id": 2,
                "method": "tools/list"
            }
            
            async with session.post(server_url, json=list_tools_data, headers=headers) as response:
                if response.status != 200:
                    raise Exception(f"Failed to list tools: HTTP {response.status}")
                
                # Handle SSE-formatted response for tools list
                response_text = await response.text()
                if response.content_type == "text/event-stream":
                    # Parse SSE format
                    lines = response_text.strip().split('\n')
                    json_data = None
                    for line in lines:
                        if line.startswith('data: '):
                            json_data = line[6:]  # Remove 'data: ' prefix
                            break
                    if json_data:
                        tools_result = json.loads(json_data)
                    else:
                        raise Exception("No JSON data found in SSE response")
                else:
                    tools_result = await response.json()
                
                if "error" in tools_result:
                    raise Exception(f"MCP tools/list error: {tools_result['error']}")
                
                if "result" not in tools_result:
                    raise Exception("Invalid MCP tools/list response")
                
                tools_data = tools_result["result"]
                tools = tools_data.get("tools", [])
                
                # Count tools
                tool_count = len(tools) if tools else 0
                if not quiet:
                    console.print(f"[green]Found {tool_count} tools[/green]")
                
                if tool_count == 0:
                    console.print("[bold yellow]No tools found on the server.[/bold yellow]")
                    return {}
                
                # Convert tools to their JSON representation (exactly as sent to LLMs)
                tool_defs = []
                
                # Add debug information about descriptions
                has_descriptions = 0
                total_desc_length = 0
                
                for tool in tools:
                    # Convert to dict that matches the MCP protocol spec for tool definitions
                    tool_dict = {
                        "name": tool.get("name"),
                        "inputSchema": tool.get("inputSchema")
                    }
                    
                    # Debug description handling
                    if tool.get("description"):
                        desc = tool["description"]
                        has_descriptions += 1
                        total_desc_length += len(desc)
                        if not quiet:
                            console.print(f"[dim]Tool '{tool['name']}' has description ({len(desc):,} chars)[/dim]")
                        tool_dict["description"] = desc
                    elif not quiet:
                        console.print(f"[dim yellow]Tool '{tool['name']}' has no description[/dim yellow]")
                        
                    if tool.get("annotations"):
                        tool_dict["annotations"] = tool["annotations"]
                    
                    tool_defs.append(tool_dict)
                
                # Print description statistics
                if not quiet:
                    console.print(f"[green]{has_descriptions} out of {tool_count} tools have descriptions[/green]")
                    if has_descriptions > 0:
                        console.print(f"[green]Average description length: {total_desc_length/has_descriptions:,.1f} chars[/green]")
                
                # Include server info in the result to be used for creating the complete LLM prompt
                return {
                    "tools": tool_defs,
                    "server_name": server_name,
                    "server_version": server_version,
                    "server_instructions": server_instructions
                }
                
    except Exception as e:
        console.print(f"[bold red]Error connecting to streamable-http MCP server:[/bold red] {str(e)}")
        if not quiet:
            console.print("[bold yellow]Stack trace:[/bold yellow]")
            console.print(traceback.format_exc())
        raise

async def get_mcp_server_tools_stdio(command: str, args: Optional[List[str]] = None, include_tools: Optional[List[str]] = None, console: Console = None, quiet: bool = False) -> Dict[str, Any]:
    """
    Connect to an MCP server via stdio transport and fetch all registered tools.
    
    Args:
        command: Command to run the MCP server
        args: Additional arguments for the command
        include_tools: Optional list of tool names to include (if None, get all tools)
        console: Optional console for output
        quiet: If True, only show most important output
        
    Returns:
        Dictionary with server info and tool definitions
    """
    if console is None:
        console = Console()
    
    if not quiet:
        console.print(f"[bold blue]Connecting to MCP server via stdio: {command} {' '.join(args or [])}[/bold blue]")
    
    try:
        # stdio_client expects a StdioServerParameters object describing how to spawn
        # the server process, so split the command string into executable + arguments
        cmd_parts = command.split() if isinstance(command, str) else [command]
        server_params = StdioServerParameters(
            command=cmd_parts[0],
            args=cmd_parts[1:] + list(args or []),
        )

        async with stdio_client(server_params) as (read, write):
            # Create a client session
            async with ClientSession(read, write) as session:
                # Initialize connection to server
                if not quiet:
                    console.print("[bold blue]Initializing MCP protocol via stdio...[/bold blue]")
                init_result = await session.initialize()
                
                # Get server info
                server_name = init_result.serverInfo.name
                server_version = init_result.serverInfo.version
                if not quiet:
                    console.print(f"[green]Connected to server:[/green] {server_name} v{server_version}")
                
                # Show server capabilities safely
                if not quiet:
                    console.print("[bold blue]Server capabilities:[/bold blue]")
                    console.print(format_capabilities(init_result.capabilities))
                
                # Check if tools capability is present
                has_tools = False
                if hasattr(init_result.capabilities, "tools") and init_result.capabilities.tools:
                    has_tools = True
                    
                if not quiet and not has_tools:
                    console.print("[bold yellow]Warning: This server does not advertise tools capability![/bold yellow]")
                    console.print("The server might not support tool listing, but we'll try anyway.")
                
                # Get server instructions (will be used in the LLM prompt)
                server_instructions = ""
                if hasattr(init_result, "instructions") and init_result.instructions:
                    server_instructions = init_result.instructions
                    if not quiet:
                        console.print(f"[green]Server provides instructions of length {len(server_instructions):,} chars[/green]")
                elif not quiet:
                    console.print("[yellow]Server does not provide instructions[/yellow]")
                
                # List available tools
                if not quiet:
                    console.print("[bold blue]Retrieving tool definitions...[/bold blue]")
                try:
                    tools_result = await session.list_tools()
                    
                    # Handle ListToolsResult object
                    tools = []
                    if hasattr(tools_result, "tools"):
                        tools = tools_result.tools
                    else:
                        if not quiet:
                            console.print("[bold yellow]Tools result doesn't have expected structure. Trying alternatives...[/bold yellow]")
                        if hasattr(tools_result, "__iter__"):
                            tools = list(tools_result)
                        else:
                            if not quiet:
                                console.print(f"[bold yellow]Tools result type: {type(tools_result)}[/bold yellow]")
                                console.print(f"Tools result attributes: {dir(tools_result)}")
                            raise ValueError("Unable to extract tools from server response")
                    
                    # Count tools
                    tool_count = len(tools) if tools else 0
                    if not quiet:
                        console.print(f"[green]Found {tool_count} tools[/green]")
                    
                    if tool_count == 0:
                        console.print("[bold yellow]No tools found on the server.[/bold yellow]")
                        return {}
                    
                    # Convert tools to their JSON representation (exactly as sent to LLMs)
                    tool_defs = []
                    
                    # Add debug information about descriptions
                    has_descriptions = 0
                    total_desc_length = 0
                    
                    for tool in tools:
                        # Convert to dict that matches the MCP protocol spec for tool definitions
                        tool_dict = {
                            "name": tool.name,
                            "inputSchema": tool.inputSchema
                        }
                        
                        # Debug description handling
                        if hasattr(tool, "description") and tool.description:
                            desc = tool.description
                            has_descriptions += 1
                            total_desc_length += len(desc)
                            if not quiet:
                                console.print(f"[dim]Tool '{tool.name}' has description ({len(desc):,} chars)[/dim]")
                            tool_dict["description"] = desc
                        elif not quiet:
                            console.print(f"[dim yellow]Tool '{tool.name}' has no description[/dim yellow]")
                            
                        if hasattr(tool, "annotations") and tool.annotations:
                            tool_dict["annotations"] = tool.annotations
                        
                        tool_defs.append(tool_dict)
                    
                    # Print description statistics
                    if not quiet:
                        console.print(f"[green]{has_descriptions} out of {tool_count} tools have descriptions[/green]")
                        if has_descriptions > 0:
                            console.print(f"[green]Average description length: {total_desc_length/has_descriptions:,.1f} chars[/green]")
                    
                    # Include server info in the result to be used for creating the complete LLM prompt
                    return {
                        "tools": tool_defs,
                        "server_name": server_name,
                        "server_version": server_version,
                        "server_instructions": server_instructions
                    }
                except Exception as e:
                    console.print(f"[bold red]Error listing tools:[/bold red] {str(e)}")
                    if not quiet:
                        console.print("[bold yellow]Stack trace:[/bold yellow]")
                        console.print(traceback.format_exc())
                    raise
    except Exception as e:
        console.print(f"[bold red]Error connecting to MCP server via stdio:[/bold red] {str(e)}")
        if not quiet:
            console.print("[bold yellow]Stack trace:[/bold yellow]")
            console.print(traceback.format_exc())
        raise

async def get_mcp_server_tools(server_url: str, transport_type: str, include_tools: Optional[List[str]] = None, console: Console = None, quiet: bool = False, command: Optional[str] = None, args: Optional[List[str]] = None) -> Dict[str, Any]:
    """
    Connect to an already running MCP server and fetch all registered tools.
    
    Args:
        server_url: The URL of the running MCP server (ignored for stdio)
        transport_type: The transport type ('sse', 'streamable-http', or 'stdio')
        include_tools: Optional list of tool names to include (if None, get all tools)
        console: Optional console for output
        quiet: If True, only show most important output
        command: Command to run for stdio transport
        args: Additional arguments for stdio command
        
    Returns:
        Dictionary with server info and tool definitions
    """
    if console is None:
        console = Console()
    
    if transport_type == "streamable-http":
        return await get_mcp_server_tools_streamable_http(server_url, include_tools, console, quiet)
    elif transport_type == "stdio":
        if not command:
            raise ValueError("Command must be provided for stdio transport")
        return await get_mcp_server_tools_stdio(command, args, include_tools, console, quiet)
    
    # Original SSE implementation
    if not quiet:
        console.print(f"[bold blue]Connecting to MCP server at {server_url}...[/bold blue]")
    
    try:
        async with sse_client(server_url) as (read, write):
            # Create a client session
            async with ClientSession(read, write) as session:
                # Initialize connection to server
                if not quiet:
                    console.print("[bold blue]Initializing MCP protocol...[/bold blue]")
                init_result = await session.initialize()
                
                # Get server info
                server_name = init_result.serverInfo.name
                server_version = init_result.serverInfo.version
                if not quiet:
                    console.print(f"[green]Connected to server:[/green] {server_name} v{server_version}")
                
                # Show server capabilities safely
                if not quiet:
                    console.print("[bold blue]Server capabilities:[/bold blue]")
                    console.print(format_capabilities(init_result.capabilities))
                
                # Check if tools capability is present
                has_tools = False
                if hasattr(init_result.capabilities, "tools") and init_result.capabilities.tools:
                    has_tools = True
                    
                if not quiet and not has_tools:
                    console.print("[bold yellow]Warning: This server does not advertise tools capability![/bold yellow]")
                    console.print("The server might not support tool listing, but we'll try anyway.")
                
                # Get server instructions (will be used in the LLM prompt)
                server_instructions = ""
                if hasattr(init_result, "instructions") and init_result.instructions:
                    server_instructions = init_result.instructions
                    if not quiet:
                        console.print(f"[green]Server provides instructions of length {len(server_instructions):,} chars[/green]")
                elif not quiet:
                    console.print("[yellow]Server does not provide instructions[/yellow]")
                
                # List available tools
                if not quiet:
                    console.print("[bold blue]Retrieving tool definitions...[/bold blue]")
                try:
                    tools_result = await session.list_tools()
                    
                    # Handle ListToolsResult object
                    # The result should have a 'tools' attribute which is the actual list
                    tools = []
                    if hasattr(tools_result, "tools"):
                        tools = tools_result.tools
                    else:
                        # If it doesn't have a tools attribute, try to access it as a list directly
                        # or check other common patterns
                        if not quiet:
                            console.print("[bold yellow]Tools result doesn't have expected structure. Trying alternatives...[/bold yellow]")
                        if hasattr(tools_result, "__iter__"):
                            tools = list(tools_result)
                        else:
                            # Print the object to help diagnose
                            if not quiet:
                                console.print(f"[bold yellow]Tools result type: {type(tools_result)}[/bold yellow]")
                                console.print(f"Tools result attributes: {dir(tools_result)}")
                            raise ValueError("Unable to extract tools from server response")
                    
                    # Count tools
                    tool_count = len(tools) if tools else 0
                    if not quiet:
                        console.print(f"[green]Found {tool_count} tools[/green]")
                    
                    if tool_count == 0:
                        console.print("[bold yellow]No tools found on the server.[/bold yellow]")
                        return {}
                    
                    # Convert tools to their JSON representation (exactly as sent to LLMs)
                    tool_defs = []
                    
                    # Add debug information about descriptions
                    has_descriptions = 0
                    total_desc_length = 0
                    
                    for tool in tools:
                        # Convert to dict that matches the MCP protocol spec for tool definitions
                        tool_dict = {
                            "name": tool.name,
                            "inputSchema": tool.inputSchema
                        }
                        
                        # Debug description handling
                        if hasattr(tool, "description") and tool.description:
                            desc = tool.description
                            has_descriptions += 1
                            total_desc_length += len(desc)
                            if not quiet:
                                console.print(f"[dim]Tool '{tool.name}' has description ({len(desc):,} chars)[/dim]")
                            tool_dict["description"] = desc
                        elif not quiet:
                            console.print(f"[dim yellow]Tool '{tool.name}' has no description[/dim yellow]")
                            
                        if hasattr(tool, "annotations") and tool.annotations:
                            tool_dict["annotations"] = tool.annotations
                        
                        tool_defs.append(tool_dict)
                    
                    # Print description statistics
                    if not quiet:
                        console.print(f"[green]{has_descriptions} out of {tool_count} tools have descriptions[/green]")
                        if has_descriptions > 0:
                            console.print(f"[green]Average description length: {total_desc_length/has_descriptions:,.1f} chars[/green]")
                    
                    # Include server info in the result to be used for creating the complete LLM prompt
                    return {
                        "tools": tool_defs,
                        "server_name": server_name,
                        "server_version": server_version,
                        "server_instructions": server_instructions
                    }
                except Exception as e:
                    console.print(f"[bold red]Error listing tools:[/bold red] {str(e)}")
                    if not quiet:
                        console.print("[bold yellow]Stack trace:[/bold yellow]")
                        console.print(traceback.format_exc())
                    
                    # Try retrieving server details to help diagnose
                    if not quiet:
                        try:
                            console.print("[bold blue]Getting additional server information...[/bold blue]")
                            if hasattr(init_result.capabilities, "prompts") and init_result.capabilities.prompts:
                                prompts_result = await session.list_prompts()
                                prompt_count = 0
                                if hasattr(prompts_result, "prompts"):
                                    prompt_count = len(prompts_result.prompts)
                                console.print(f"Server has {prompt_count} prompts available")
                        except Exception:
                            pass
                        
                    raise
    except Exception as e:
        console.print(f"[bold red]Error connecting to MCP server:[/bold red] {str(e)}")
        if not quiet:
            console.print("[bold yellow]Stack trace:[/bold yellow]")
            console.print(traceback.format_exc())
        
        # Provide guidance based on the error
        if "Connection refused" in str(e):
            console.print("[bold yellow]The server doesn't appear to be running at the specified URL.[/bold yellow]")
            console.print("Make sure your MCP server is running and available at the URL you specified.")
        elif "401" in str(e):
            console.print("[bold yellow]Authentication error - the server requires credentials.[/bold yellow]")
        elif "404" in str(e):
            console.print("[bold yellow]The server endpoint was not found.[/bold yellow]")
            console.print("Check if you need to use a different URL path (e.g., /sse or /mcp)")
            console.print("Try using /sse instead of just the port number.")
        
        sys.exit(1)
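
# Hedged dispatch sketch (never called in this script): illustrates how the wrapper
# above routes between transports. The URL and the stdio command string are
# placeholders, not the actual way this repository launches its server.
async def _example_fetch_tools() -> None:
    # HTTP-based transports take the detected URL and transport type
    http_info = await get_mcp_server_tools(
        "http://localhost:8013/mcp/", "streamable-http", quiet=True
    )
    # stdio ignores the URL and spawns the server from the given command
    stdio_info = await get_mcp_server_tools(
        "", "stdio", quiet=True, command="python my_mcp_server.py", args=None
    )
    print(len(http_info.get("tools", [])), len(stdio_info.get("tools", [])))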

def create_full_tool_registration_prompt(server_info, tools=None, quiet=False):
    """
    Create a full, realistic prompt as would be sent to an LLM when registering MCP tools.
    
    This generates the exact format used in the MCP client's format_tools_for_anthropic method
    which sends tools to the Anthropic API.
    
    Args:
        server_info: Dictionary with server information
        tools: List of tool definitions to include (if None, use all tools)
        quiet: If True, only show most important output
        
    Returns:
        String with the serialized JSON representation of tools as sent to the API
    """
    if tools is None:
        tools = server_info["tools"]
        
    # The actual format sent to Anthropic API is just:
    # {
    #   "name": sanitized_name,
    #   "input_schema": tool.input_schema,
    #   "description": tool.description  # only if present
    # }
    formatted_tools = []
    
    # Track description statistics
    desc_count = 0
    total_desc_len = 0
    
    console = Console()
    
    for tool in tools:
        # Create the tool dict exactly as in format_tools_for_anthropic
        tool_dict_for_api = {
            "name": tool["name"],
            "input_schema": tool["inputSchema"]
        }
        if SHOW_DESCRIPTIONS:
            # Add description only if it exists and is not empty
            if "description" in tool and tool["description"]:
                desc = tool["description"]
                tool_dict_for_api["description"] = desc
                desc_count += 1
                total_desc_len += len(desc)
                if not quiet and len(desc) > 100:
                    # Show abbreviated version for long descriptions
                    abbrev = desc[:50] + "..." + desc[-50:]
                    console.print(f"[dim]Including description for {tool['name']}: {abbrev}[/dim]")
                elif not quiet:
                    console.print(f"[dim]Including description for {tool['name']}: {desc}[/dim]")
            elif not quiet:
                console.print(f"[dim yellow]No description for {tool['name']}[/dim yellow]")
                
        formatted_tools.append(tool_dict_for_api)
    
    # Final description statistics - ALWAYS show these since they're part of the requested output
    console.print(f"[green]Included {desc_count} descriptions out of {len(tools)} tools in final output[/green]")
    if desc_count > 0:
        console.print(f"[green]Average description length in final output: {total_desc_len/desc_count:,.1f} chars[/green]")
    
    # Return the serialized JSON that would be sent to the API
    return json.dumps(formatted_tools, indent=2)
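
# Minimal illustration of the per-tool shape serialized above; the name, schema, and
# description are invented placeholders rather than tools from this server:
#
#     {
#       "name": "example_tool",
#       "input_schema": {"type": "object", "properties": {"query": {"type": "string"}}},
#       "description": "Included only when the server supplies one and SHOW_DESCRIPTIONS is True."
#     }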

def format_tool_for_llm(tool: Dict[str, Any]) -> str:
    """
    Format a tool definition exactly as it would be presented to an LLM.
    This should match the format used in actual LLM prompt construction.
    """
    # This is how tools are typically formatted for LLMs in the JSON format
    return json.dumps(tool, indent=2)

def analyze_tools_token_usage(current_tools: Dict[str, Any], all_tools: Dict[str, Any], quiet: bool = False):
    """
    Analyze token usage for a complete MCP tool registration prompt
    
    Args:
        current_tools: Current active toolset info
        all_tools: Complete toolset info (with --load-all-tools)
        quiet: If True, only show most important output
    """
    console = Console()
    
    # Format tools as they would be sent to an LLM
    current_tools_subset = current_tools["tools"]
    all_tools_subset = all_tools["tools"]
    
    # Determine if we're likely comparing the same set vs different sets
    same_toolsets = len(current_tools_subset) == len(all_tools_subset)
    if same_toolsets and not quiet:
        console.print("[yellow]Warning: Current tool count equals all tools count.[/yellow]")
        console.print("[yellow]This suggests the server is already running with --load-all-tools[/yellow]")
    
    # Column labels used in the comparison table
    current_label = "Current Tools"
    all_label = "All Tools"
    
    # Get JSON representations
    current_tools_json = "\n".join(format_tool_for_llm(tool) for tool in current_tools_subset)
    all_tools_json = "\n".join(format_tool_for_llm(tool) for tool in all_tools_subset)
    
    # Create the full prompts
    current_tools_prompt = create_full_tool_registration_prompt(current_tools, current_tools_subset, quiet)
    all_tools_prompt = create_full_tool_registration_prompt(all_tools, all_tools_subset, quiet)
    
    # Calculate sizes for raw JSON
    current_tools_size_kb = len(current_tools_json.encode('utf-8')) / 1024
    all_tools_size_kb = len(all_tools_json.encode('utf-8')) / 1024
    
    # Calculate sizes for full prompts
    current_tools_prompt_size_kb = len(current_tools_prompt.encode('utf-8')) / 1024
    all_tools_prompt_size_kb = len(all_tools_prompt.encode('utf-8')) / 1024
    
    # Count tokens for raw JSON
    current_tools_tokens = count_tokens(current_tools_json)
    all_tools_tokens = count_tokens(all_tools_json)
    
    # Count tokens for full prompts
    current_tools_prompt_tokens = count_tokens(current_tools_prompt)
    all_tools_prompt_tokens = count_tokens(all_tools_prompt)
    
    # Calculate costs for different models (using full prompt tokens)
    current_tools_costs = {model: (price * current_tools_prompt_tokens / 1000) 
                       for model, price in MODEL_PRICES.items()}
    all_tools_costs = {model: (price * all_tools_prompt_tokens / 1000) 
                          for model, price in MODEL_PRICES.items()}
    
    # Save the complete, untruncated text to files
    with open("current_tools_sent_to_llm.json", "w", encoding="utf-8") as f:
        f.write(current_tools_prompt)
    console.print("[green]Saved current tools JSON to current_tools_sent_to_llm.json[/green]")
    
    with open("all_tools_sent_to_llm.json", "w", encoding="utf-8") as f:
        f.write(all_tools_prompt)
    console.print("[green]Saved all tools JSON to all_tools_sent_to_llm.json[/green]\n\n")
    
    # Create data for display - ensure the data is correct and consistent
    data = {
        "current_tools": {
            "count": len(current_tools_subset),
            "raw_size_kb": current_tools_size_kb,
            "raw_tokens": current_tools_tokens,
            "full_size_kb": current_tools_prompt_size_kb,
            "full_tokens": current_tools_prompt_tokens,
            "costs": current_tools_costs
        },
        "all_tools": {
            "count": len(all_tools_subset),
            "raw_size_kb": all_tools_size_kb,
            "raw_tokens": all_tools_tokens,
            "full_size_kb": all_tools_prompt_size_kb,
            "full_tokens": all_tools_prompt_tokens,
            "costs": all_tools_costs
        }
    }
    
    # Create comparison table
    table = Table(title="Tool Registration Token Usage")
    
    # Add columns - including percentage column 
    table.add_column("Metric", style="white")
    table.add_column(current_label, style="cyan")
    table.add_column(all_label, style="magenta")
    table.add_column("Difference", style="yellow")
    table.add_column(f"{current_label} as % of {all_label}", style="green")
    
    # SECTION 1: Number of Tools
    # Calculate percentage for count
    count_percentage = (data["current_tools"]["count"] / data["all_tools"]["count"]) * 100 if data["all_tools"]["count"] > 0 else 100
    
    # Add rows - keep consistent format with other rows for the number of tools
    table.add_row(
        "Number of Tools", 
        str(data["current_tools"]["count"]), 
        str(data["all_tools"]["count"]),
        str(data["current_tools"]["count"] - data["all_tools"]["count"]),
        f"{count_percentage:.2f}%"
    )
    
    # Add a divider after Number of Tools
    table.add_section()
    
    # SECTION 2: Full Prompt stats
    # Calculate percentage for full prompt size
    full_size_percentage = (data["current_tools"]["full_size_kb"] / data["all_tools"]["full_size_kb"]) * 100 if data["all_tools"]["full_size_kb"] > 0 else 100
    
    table.add_row(
        "Full Prompt Size (KB)", 
        f"{data['current_tools']['full_size_kb']:,.2f}", 
        f"{data['all_tools']['full_size_kb']:,.2f}",
        f"{data['current_tools']['full_size_kb'] - data['all_tools']['full_size_kb']:,.2f}",
        f"{full_size_percentage:.2f}%"
    )
    
    # Calculate percentage for full tokens
    full_tokens_percentage = (data["current_tools"]["full_tokens"] / data["all_tools"]["full_tokens"]) * 100 if data["all_tools"]["full_tokens"] > 0 else 100
    
    table.add_row(
        "Full Prompt Token Count", 
        f"{data['current_tools']['full_tokens']:,}", 
        f"{data['all_tools']['full_tokens']:,}",
        f"{data['current_tools']['full_tokens'] - data['all_tools']['full_tokens']:,}",
        f"{full_tokens_percentage:.2f}%"
    )
    
    # Add a divider after Full Prompt stats
    table.add_section()
    
    # SECTION 3: Model costs
    # Specify the models to include and their order
    models_to_include = [
        "claude-3-7-sonnet-20250219",
        "gpt-4.1",
        "gemini-2.5-pro-preview-03-25",
        "grok-3-latest"
    ]
    
    # Add cost rows for selected models only, in specified order
    for model in models_to_include:
        if model in MODEL_PRICES:
            current_cost = data["current_tools"]["costs"][model]
            all_cost = data["all_tools"]["costs"][model]
            diff_cost = current_cost - all_cost
            
            # Calculate percentage
            cost_percentage = (current_cost / all_cost) * 100 if all_cost > 0 else 100
            
            table.add_row(
                f"Cost ({model})",
                f"${current_cost:.4f}",
                f"${all_cost:.4f}",
                f"${diff_cost:.4f}",
                f"{cost_percentage:.2f}%"
            )
    
    # Print table
    console.print(table)
    
    # Print raw data as JSON (only if not in quiet mode)
    if not quiet:
        console.print("\nRaw token usage data:")
        console.print(json.dumps(data, indent=2))
    
    return data

async def get_complete_toolset(quiet: bool = False) -> Dict[str, Any]:
    """
    Generate the complete toolset that would be available with --load-all-tools
    
    This uses a list of tool names read from a file generated by the server.
    If the file is missing or empty, it falls back to the current server's tools,
    supplemented with a few common tool names that may not currently be loaded.
    
    Args:
        quiet: If True, only show most important output
    
    Returns:
        Dictionary with server info and simulated complete toolset
    """
    console = Console()
    if not quiet:
        console.print("[bold blue]Analyzing complete toolset (--load-all-tools)[/bold blue]")
    
    # First get the current server's tools to extract real descriptions where possible
    try:
        # Get server connection details
        host, port = get_server_url_and_transport()
        server_url, transport_type = await detect_server_transport(host, port, quiet=quiet)
        current_tools_info = await get_mcp_server_tools(server_url, transport_type, quiet=quiet, command=None, args=None)
        current_tools = {tool["name"]: tool for tool in current_tools_info["tools"]} if current_tools_info else {}
        if not quiet:
            console.print(f"[green]Retrieved {len(current_tools)} tools from current server to use their real descriptions[/green]")
    except Exception as e:
        if not quiet:
            console.print(f"[yellow]Could not get current tools: {str(e)}[/yellow]")
        current_tools = {}
    
    # Read tool names from file created by the server
    all_tool_names = read_tool_names_from_file(quiet=quiet)
    
    # If no tools found in file, use the tools we got from the server
    if not all_tool_names and current_tools:
        if not quiet:
            console.print("[yellow]No tools found in file. Using current server tools and adding some common ones.[/yellow]")
        all_tool_names = list(current_tools.keys())
        
        # Add some common tool names that might not be in the current server
        additional_tools = [
            "excel_create_workbook", "excel_open_workbook", "excel_add_worksheet",
            "excel_set_cell_value", "excel_get_cell_value", "excel_save_workbook",
            "excel_get_worksheet_names", "excel_create_chart", "excel_set_range_format",
            "smart_browser.autopilot", "smart_browser.parallel", "smart_browser.download_site_pdfs",
            "generate_image", "analyze_image", "transcribe_audio"
        ]
        
        # Add them if not already present
        for tool in additional_tools:
            if tool not in all_tool_names:
                all_tool_names.append(tool)
    
    if not quiet:
        console.print(f"[green]Using complete list of {len(all_tool_names)} tools for all-tools mode[/green]")
    
    # Create tool entries based on real data
    tool_defs = []
    
    for tool_name in all_tool_names:
        # First check if we have real data for this tool
        if tool_name in current_tools:
            # Use the actual tool definition from the server
            tool_def = current_tools[tool_name]
            if not quiet:
                console.print(f"[dim]Using real definition for tool '{tool_name}'[/dim]")
        else:
            # Create a definition with a realistic description based on the tool name
            tool_desc = f"The {tool_name} tool provides functionality for {tool_name.replace('_', ' ')}. " + \
                       "This would be the actual docstring from the function when loaded with --load-all-tools."
            
            # Create a basic definition
            tool_def = {
                "name": tool_name,
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "param1": {"type": "string", "description": "First parameter"},
                        "param2": {"type": "string", "description": "Second parameter"}
                    },
                    "required": ["param1"]
                },
                "description": tool_desc
            }
            if not quiet:
                console.print(f"[dim yellow]Created placeholder for tool '{tool_name}'[/dim yellow]")
        
        tool_defs.append(tool_def)
    
    # Return a similar structure to what get_mcp_server_tools returns
    return {
        "tools": tool_defs,
        "server_name": "Ultimate MCP Server (with --load-all-tools)",
        "server_version": "1.6.0",
        "server_instructions": """This server provides access to the complete set of tools available in the Ultimate MCP Server.
When running with --load-all-tools, all tools from all categories are available, including:
- Completion tools for text generation
- Provider tools for model management
- Filesystem tools for file operations
- Optimization tools for cost and performance
- Text processing tools for manipulating text
- Meta tools for accessing tool information
- Search tools for querying databases
- Browser automation tools
- Web research tools
- HTML processing tools
- Extraction tools
- SQL database tools
- Document processing tools
- Audio transcription tools
- Excel spreadsheet tools
- OCR tools
- Sentiment analysis tools
"""
    }

def parse_args():
    """Parse command line arguments"""
    parser = argparse.ArgumentParser(description="MCP Tool Context Estimator")
    parser.add_argument("--url", default=None, 
                        help="URL of the MCP server (default: auto-detected)")
    parser.add_argument("--transport", default=None,
                        choices=["sse", "streamable-http", "stdio"],
                        help="Force specific transport type (default: auto-detect)")
    parser.add_argument("--command", default=None,
                        help="Command to run for stdio transport (e.g., 'python -m ultimate_mcp_server')")
    parser.add_argument("--args", default=None, nargs="*",
                        help="Additional arguments for stdio command")
    parser.add_argument("--no-all-tools", action="store_true",
                        help="Skip comparison with all tools")
    parser.add_argument("--quiet", "-q", action="store_true",
                        help="Only show most important information and final table")
    return parser.parse_args()
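
# Example invocations (illustrative; assumes the default server on localhost:8013):
#   python mcp_tool_context_estimator.py                     # auto-detect transport and URL
#   python mcp_tool_context_estimator.py --url http://localhost:8013 --transport sse
#   python mcp_tool_context_estimator.py --transport stdio --command "python -m ultimate_mcp_server"
#   python mcp_tool_context_estimator.py --no-all-tools --quiet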

async def main():
    """Main function"""
    console = Console()
    args = parse_args()
    
    # Handle stdio transport
    if args.transport == "stdio":
        if not args.command:
            console.print("[bold red]Error: --command is required for stdio transport[/bold red]")
            console.print("Example: --transport stdio --command 'python -m ultimate_mcp_server'")
            sys.exit(1)
        
        server_url = None  # Not used for stdio
        transport_type = "stdio"
        command = args.command
        stdio_args = args.args or []
        
        if not args.quiet:
            console.print(f"[blue]Using stdio transport with command: {command} {' '.join(stdio_args)}[/blue]")
    else:
        # Get server connection details for HTTP transports
        if args.url:
            # Parse URL to extract host and port
            import urllib.parse
            parsed = urllib.parse.urlparse(args.url)
            host = parsed.hostname or "localhost"
            port = str(parsed.port or 8013)
            if args.transport:
                transport_type = args.transport
                if transport_type == "sse":
                    server_url = f"http://{host}:{port}/sse"
                else:  # streamable-http
                    server_url = f"http://{host}:{port}/mcp/"
            else:
                # Auto-detect transport for manually specified URL
                server_url, transport_type = await detect_server_transport(host, port, quiet=args.quiet)
        else:
            # Auto-detect everything
            host, port = get_server_url_and_transport()
            if args.transport:
                transport_type = args.transport
                if transport_type == "sse":
                    server_url = f"http://{host}:{port}/sse"
                else:  # streamable-http
                    server_url = f"http://{host}:{port}/mcp/"
            else:
                server_url, transport_type = await detect_server_transport(host, port, quiet=args.quiet)
        
        command = None
        stdio_args = None
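        
        # Resulting endpoint shapes (illustrative, default port 8013):
        #   sse              -> http://localhost:8013/sse
        #   streamable-http  -> http://localhost:8013/mcp/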
    
    quiet_mode = args.quiet
    
    try:
        # Get the active toolset from the running server
        current_tools = await get_mcp_server_tools(
            server_url, 
            transport_type, 
            quiet=quiet_mode,
            command=command,
            args=stdio_args
        )
        
        if not current_tools or "tools" not in current_tools or not current_tools["tools"]:
            console.print("[bold yellow]No tools found on the server.[/bold yellow]")
            return
        
        if args.no_all_tools:
            # If we're not doing the comparison, create a meaningful subset for comparison
            if not quiet_mode:
                console.print("[yellow]Skipping comparison with full --load-all-tools[/yellow]")
                console.print("[green]Creating an artificial subset of current tools for comparison[/green]")
            
            # Create a meaningful subset: take half the tools, but at least 4
            # (or all of them if the server has fewer than 4) to avoid an empty subset
            total_tools = len(current_tools["tools"])
            subset_size = max(total_tools // 2, min(total_tools, 4))
            subset_tools = current_tools["tools"][:subset_size]
            
            if not quiet_mode:
                console.print(f"[green]Created subset with {subset_size} tools out of {total_tools} total[/green]")
            
            # Create subset version
            subset_data = {
                "tools": subset_tools, 
                "server_name": current_tools["server_name"] + " (Subset)",
                "server_version": current_tools["server_version"],
                "server_instructions": current_tools["server_instructions"]
            }
            
            # Analyze token usage with the artificial subset vs full
            analyze_tools_token_usage(subset_data, current_tools, quiet=quiet_mode)
        else:
            # Get the complete toolset that would be available with --load-all-tools
            all_tools = await get_complete_toolset(quiet=quiet_mode)
            
            # Check if current server is likely already running with all tools
            current_tool_count = len(current_tools["tools"])
            all_tool_count = len(all_tools["tools"])
            
            if abs(current_tool_count - all_tool_count) <= 2:  # Allow small difference
                if not quiet_mode:
                    console.print(f"[yellow]Warning: Current server has {current_tool_count} tools, "
                                 f"which is very close to the expected all-tools count of {all_tool_count}[/yellow]")
                    console.print("[yellow]This suggests the server is already running with --load-all-tools[/yellow]")
                
                # For accurate comparison when counts are the same, we should just use the same data for both
                # to ensure metrics are consistent
                same_tools_data = {  # noqa: F841
                    "tools": current_tools["tools"].copy(),
                    "server_name": "Current Server",
                    "server_version": current_tools["server_version"],
                    "server_instructions": current_tools["server_instructions"]
                }
                
                # Use a copy of the current tool list so both sides are exactly the same
                all_tools = {
                    "tools": current_tools["tools"].copy(),
                    "server_name": "All Tools",
                    "server_version": current_tools["server_version"],
                    "server_instructions": current_tools["server_instructions"]
                }
            
            # Analyze token usage with full prompt simulation
            analyze_tools_token_usage(current_tools, all_tools, quiet=quiet_mode)
    except KeyboardInterrupt:
        console.print("[bold yellow]Operation cancelled by user[/bold yellow]")
    except Exception as e:
        console.print(f"[bold red]Unexpected error:[/bold red] {str(e)}")
        if not quiet_mode:
            console.print(traceback.format_exc())

if __name__ == "__main__":
    asyncio.run(main()) 
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/base.py:
--------------------------------------------------------------------------------

```python
"""Base tool classes and decorators for Ultimate MCP Server."""
import asyncio
import functools
import inspect
import time
from typing import Any, Callable, Dict, List, Optional, Type, Union

try:
    from fastmcp import Tool
except ImportError:
    # fastmcp not installed; Tool will be provided by the mcp_server
    Tool = None

from ultimate_mcp_server.exceptions import (
    ResourceError,
    ToolError,
    ToolExecutionError,
    ToolInputError,
    format_error_response,
)

# from ultimate_mcp_server.services.cache import with_cache
from ultimate_mcp_server.utils import get_logger

logger = get_logger("ultimate_mcp_server.tools.base")


def tool(name=None, description=None):
    """
    Decorator that marks a BaseTool class method as an MCP tool.
    
    This decorator adds metadata to a method, identifying it as a tool that should be
    registered with the MCP server when the containing BaseTool class is initialized.
    It allows customizing the tool's name and description, which are used in tool
    discoverability and documentation.
    
    Unlike the register_tool function which directly registers standalone functions,
    this decorator only marks methods for later registration, allowing BaseTool subclasses
    to organize multiple related tools together in a single class.
    
    The decorator adds three attributes to the method:
    - _tool: A boolean flag indicating this is a tool method
    - _tool_name: The name to use when registering the tool (or original method name)
    - _tool_description: The description to use for the tool (or method docstring)
    
    These attributes are used during the tool registration process, typically in the
    _register_tools method of BaseTool subclasses.
    
    Args:
        name: Custom name for the tool (defaults to the method name if not provided)
        description: Custom description for the tool (defaults to the method's docstring)
        
    Returns:
        A decorator function that adds tool metadata attributes to the decorated method
        
    Example:
        ```python
        class MyToolSet(BaseTool):
            tool_name = "my_toolset"
            
            @tool(name="custom_operation", description="Performs a customized operation")
            async def perform_operation(self, param1: str, param2: int) -> Dict[str, Any]:
                # Implementation
                return {"result": "success"}
        ```
        
    Notes:
        - This decorator should be used on methods of classes that inherit from BaseTool
        - Decorated methods should be async
        - The decorated method must take self as its first parameter
        - This decorator does not apply error handling or other middleware automatically
    """
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(self, *args, **kwargs):
            return await func(self, *args, **kwargs)
        
        wrapper._tool = True
        wrapper._tool_name = name or func.__name__
        wrapper._tool_description = description or func.__doc__
        
        return wrapper
    
    return decorator


def with_resource(resource_type, allow_creation=False, require_existence=True):
    """
    Decorator for standardizing resource access and validation in tool methods.
    
    This decorator provides consistent resource handling for tool methods that
    access or create persistent resources in the MCP ecosystem. It enforces resource
    validation rules, handles resource registration, and provides unified error handling
    for resource-related operations.
    
    Core functionalities:
    1. Resource existence validation - Ensures resources exist before allowing access
    2. Resource creation tracking - Registers newly created resources with the system
    3. Resource type validation - Confirms resources match expected types
    4. Standardized error handling - Produces consistent error responses for resource issues
    
    The decorator identifies resource IDs by looking for common parameter names like
    '{resource_type}_id', 'id', or 'resource_id' in the function's keyword arguments.
    When a resource ID is found, it performs the configured validation checks before
    allowing the function to execute. After execution, it can optionally register
    newly created resources.
    
    Args:
        resource_type: Type category for the resource (e.g., "document", "embedding", 
                      "database"). Used for validation and registration.
        allow_creation: Whether the tool is allowed to create new resources of this type.
                       When True, the decorator will register any created resources.
        require_existence: Whether the resource must exist before the tool is called.
                          When True, the decorator will verify resource existence.
        
    Returns:
        A decorator function that applies resource handling to tool methods.
        
    Raises:
        ResourceError: When resource validation fails (e.g., resource not found,
                      resource type mismatch, or unauthorized resource access).
        
    Example:
        ```python
        class DocumentTools(BaseTool):
            @tool()
            @with_resource("document", require_existence=True, allow_creation=False)
            async def get_document_summary(self, document_id: str):
                # This method will fail with ResourceError if document_id doesn't exist
                # Resource existence is checked before this code runs
                ...
                
            @tool()
            @with_resource("document", require_existence=False, allow_creation=True)
            async def create_document(self, content: str, metadata: Dict[str, Any] = None):
                # Process content and create document
                doc_id = str(uuid.uuid4())
                # ... processing logic ...
                
                # Return created resource with resource_id key to trigger registration
                return {
                    "resource_id": doc_id,  # This triggers resource registration
                    "status": "created",
                    "metadata": {"content_length": len(content), "created_at": time.time()}
                }
                # The resource is automatically registered with the returned metadata
        ```
    
    Notes:
        - This decorator should be applied after @tool but before other decorators
          like @with_error_handling to ensure proper execution order
        - Resources created with allow_creation=True must include a "resource_id" 
          key in their result dictionary to trigger registration
        - The resource registry must be accessible via the tool's mcp server instance
    """
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(self, *args, **kwargs):
            # Get resource ID from kwargs (common parameter names)
            resource_id = None
            for param_name in [f"{resource_type}_id", "id", "resource_id"]:
                if param_name in kwargs:
                    resource_id = kwargs[param_name]
                    break
            
            # Check if resource exists if required
            if require_existence and resource_id:
                # Get resource registry from MCP server
                resource_registry = getattr(self.mcp, "resources", None)
                if resource_registry is None:
                    logger.warning(
                        f"Resource registry not available, skipping existence check for {resource_type}/{resource_id}",
                        emoji_key="warning"
                    )
                else:
                    # Check if resource exists
                    exists = await resource_registry.exists(resource_type, resource_id)
                    if not exists:
                        raise ResourceError(
                            f"{resource_type.capitalize()} not found: {resource_id}",
                            resource_type=resource_type,
                            resource_id=resource_id
                        )
            
            # Call function
            result = await func(self, *args, **kwargs)
            
            # If the function returns a new resource ID, register it
            if allow_creation and isinstance(result, dict) and "resource_id" in result:
                new_resource_id = result["resource_id"]
                # Get resource registry from MCP server
                resource_registry = getattr(self.mcp, "resources", None)
                if resource_registry is not None:
                    # Register new resource
                    # Build registration metadata; 'ctx' may be absent or not a plain dict
                    ctx = kwargs.get("ctx")
                    creator = ctx.get("user_id", "unknown") if isinstance(ctx, dict) else "unknown"
                    metadata = {
                        "created_at": time.time(),
                        "creator": creator,
                        "resource_type": resource_type
                    }
                    
                    # Add other metadata from result if available
                    if "metadata" in result:
                        metadata.update(result["metadata"])
                    
                    await resource_registry.register(
                        resource_type, 
                        new_resource_id, 
                        metadata=metadata
                    )
                    
                    logger.info(
                        f"Registered new {resource_type}: {new_resource_id}",
                        emoji_key="resource",
                        resource_type=resource_type,
                        resource_id=new_resource_id
                    )
            
            return result
                
        # Add resource metadata to function
        wrapper._resource_type = resource_type
        wrapper._allow_creation = allow_creation
        wrapper._require_existence = require_existence
        
        return wrapper
    
    return decorator


class ResourceRegistry:
    """
    Registry that tracks and manages resources used by MCP tools.
    
    The ResourceRegistry provides a centralized system for tracking resources created or
    accessed by tools within the MCP ecosystem. It maintains resource metadata, handles
    persistence of resource information, and provides methods for registering, looking up,
    and deleting resources.
    
    Resources in the MCP ecosystem represent persistent or semi-persistent objects that
    may be accessed across multiple tool calls or sessions. Examples include documents,
    knowledge bases, embeddings, file paths, and database connections. The registry helps
    manage the lifecycle of these resources and prevents issues like resource leaks or
    unauthorized access.
    
    Key features:
    - In-memory caching of resource metadata for fast lookups
    - Optional persistent storage via pluggable storage backends
    - Resource type categorization (documents, embeddings, etc.)
    - Resource existence checking for access control
    - Simple CRUD operations for resource metadata
    
    Resources are organized by type and identified by unique IDs within those types.
    Each resource has associated metadata that can include creation time, owner information,
    and resource-specific attributes.
    
    The registry is typically initialized by the MCP server and made available to all tools.
    Tools that create resources should register them, and tools that access resources should
    verify their existence before proceeding.
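    
    Example (illustrative sketch; assumes an in-memory registry with no storage backend,
    and the "document" type and "doc-123" ID are made-up values):
    ```python
    import asyncio

    registry = ResourceRegistry()

    async def demo():
        await registry.register("document", "doc-123", metadata={"title": "Q3 report"})
        assert await registry.exists("document", "doc-123")
        info = await registry.get("document", "doc-123")
        print(info["metadata"]["title"])  # -> Q3 report
        await registry.delete("document", "doc-123")

    asyncio.run(demo())
    ```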
    """
    
    def __init__(self, storage_backend=None):
        """Initialize the resource registry.
        
        Args:
            storage_backend: Backend for persistent storage (if None, in-memory only)
        """
        self.resources = {}
        self.storage = storage_backend
        self.logger = get_logger("ultimate_mcp_server.resources")
    
    async def register(self, resource_type, resource_id, metadata=None):
        """Register a resource in the registry.
        
        Args:
            resource_type: Type of resource (e.g., "document", "embedding")
            resource_id: Resource identifier
            metadata: Additional metadata about the resource
            
        Returns:
            True if registration was successful
        """
        # Initialize resource type if not exists
        if resource_type not in self.resources:
            self.resources[resource_type] = {}
        
        # Register resource
        self.resources[resource_type][resource_id] = {
            "id": resource_id,
            "type": resource_type,
            "metadata": metadata or {},
            "registered_at": time.time()
        }
        
        # Persist to storage backend if available
        if self.storage:
            try:
                await self.storage.save_resource(
                    resource_type, 
                    resource_id, 
                    self.resources[resource_type][resource_id]
                )
            except Exception as e:
                self.logger.error(
                    f"Failed to persist resource {resource_type}/{resource_id}: {str(e)}",
                    emoji_key="error",
                    exc_info=True
                )
        
        return True
    
    async def exists(self, resource_type, resource_id):
        """Check if a resource exists in the registry.
        
        Args:
            resource_type: Type of resource
            resource_id: Resource identifier
            
        Returns:
            True if the resource exists
        """
        # Check in-memory registry first
        if resource_type in self.resources and resource_id in self.resources[resource_type]:
            return True
        
        # Check storage backend if available
        if self.storage:
            try:
                return await self.storage.resource_exists(resource_type, resource_id)
            except Exception as e:
                self.logger.error(
                    f"Failed to check resource existence {resource_type}/{resource_id}: {str(e)}",
                    emoji_key="error",
                    exc_info=True
                )
        
        return False
    
    async def get(self, resource_type, resource_id):
        """Get resource metadata from the registry.
        
        Args:
            resource_type: Type of resource
            resource_id: Resource identifier
            
        Returns:
            Resource metadata or None if not found
        """
        # Check in-memory registry first
        if resource_type in self.resources and resource_id in self.resources[resource_type]:
            return self.resources[resource_type][resource_id]
        
        # Check storage backend if available
        if self.storage:
            try:
                resource = await self.storage.get_resource(resource_type, resource_id)
                if resource:
                    # Cache in memory for future access
                    if resource_type not in self.resources:
                        self.resources[resource_type] = {}
                    self.resources[resource_type][resource_id] = resource
                    return resource
            except Exception as e:
                self.logger.error(
                    f"Failed to get resource {resource_type}/{resource_id}: {str(e)}",
                    emoji_key="error",
                    exc_info=True
                )
        
        return None
    
    async def list(self, resource_type, limit=100, offset=0, filters=None):
        """List resources of a specific type.
        
        Args:
            resource_type: Type of resource to list
            limit: Maximum number of resources to return
            offset: Offset for pagination
            filters: Dictionary of filters to apply
            
        Returns:
            List of resource metadata
        """
        result = []
        
        # Get from storage backend first if available
        if self.storage:
            try:
                resources = await self.storage.list_resources(
                    resource_type, 
                    limit=limit, 
                    offset=offset, 
                    filters=filters
                )
                
                # Cache in memory for future access
                if resources:
                    if resource_type not in self.resources:
                        self.resources[resource_type] = {}
                    
                    for resource in resources:
                        resource_id = resource.get("id")
                        if resource_id:
                            self.resources[resource_type][resource_id] = resource
                    
                    return resources
            except Exception as e:
                self.logger.error(
                    f"Failed to list resources of type {resource_type}: {str(e)}",
                    emoji_key="error",
                    exc_info=True
                )
        
        # Fallback to in-memory registry
        if resource_type in self.resources:
            # Apply filters if provided
            filtered_resources = self.resources[resource_type].values()
            if filters:
                for key, value in filters.items():
                    filtered_resources = [
                        r for r in filtered_resources 
                        if r.get("metadata", {}).get(key) == value
                    ]
            
            # Apply pagination
            result = list(filtered_resources)[offset:offset+limit]
        
        return result
    
    async def delete(self, resource_type, resource_id):
        """Delete a resource from the registry.
        
        Args:
            resource_type: Type of resource
            resource_id: Resource identifier
            
        Returns:
            True if deletion was successful
        """
        # Delete from in-memory registry
        if resource_type in self.resources and resource_id in self.resources[resource_type]:
            del self.resources[resource_type][resource_id]
        
        # Delete from storage backend if available
        if self.storage:
            try:
                return await self.storage.delete_resource(resource_type, resource_id)
            except Exception as e:
                self.logger.error(
                    f"Failed to delete resource {resource_type}/{resource_id}: {str(e)}",
                    emoji_key="error",
                    exc_info=True
                )
        
        return True


class BaseToolMetrics:
    """
    Metrics collection and aggregation system for tool execution statistics.
    
    The BaseToolMetrics class provides a standardized way to track and aggregate performance
    metrics for tool executions. It maintains cumulative statistics about calls to a tool,
    including execution counts, success rates, timing information, and optional token usage
    and cost data when available.
    
    This class is used both internally by BaseTool instances and by the with_tool_metrics
    decorator to provide consistent metrics tracking across the entire MCP ecosystem. The
    collected metrics enable monitoring, debugging, and optimization of tool performance
    and usage patterns.
    
    Metrics tracked:
    - Total number of calls
    - Number of successful and failed calls
    - Success rate
    - Total, minimum, and maximum execution duration
    - Total token usage (for LLM-based tools)
    - Total cost (for tools with cost accounting)
    
    The metrics are aggregated in memory and can be retrieved at any time via the get_stats()
    method. They represent the lifetime statistics of the tool since the metrics object
    was created.
    
    Example:
    ```python
    # Accessing metrics from a tool
    my_tool = MyToolClass(mcp_server)
    metrics = my_tool.metrics.get_stats()
    print(f"Success rate: {metrics['success_rate']:.2%}")
    print(f"Average duration: {metrics['average_duration']:.2f}s")
    ```
    """
    
    def __init__(self):
        """Initialize metrics tracking."""
        self.total_calls = 0
        self.successful_calls = 0
        self.failed_calls = 0
        self.total_duration = 0.0
        self.min_duration = float('inf')
        self.max_duration = 0.0
        self.total_tokens = 0
        self.total_cost = 0.0
        
    def record_call(
        self,
        success: bool,
        duration: float,
        tokens: Optional[int] = None,
        cost: Optional[float] = None
    ) -> None:
        """Record metrics for a tool call.
        
        Args:
            success: Whether the call was successful
            duration: Duration of the call in seconds
            tokens: Number of tokens used (if applicable)
            cost: Cost of the call (if applicable)
        """
        self.total_calls += 1
        
        if success:
            self.successful_calls += 1
        else:
            self.failed_calls += 1
            
        self.total_duration += duration
        self.min_duration = min(self.min_duration, duration)
        self.max_duration = max(self.max_duration, duration)
        
        if tokens is not None:
            self.total_tokens += tokens
            
        if cost is not None:
            self.total_cost += cost
    
    def get_stats(self) -> Dict[str, Any]:
        """Get current metrics.
        
        Returns:
            Dictionary of metrics
        """
        if self.total_calls == 0:
            return {
                "total_calls": 0,
                "success_rate": 0.0,
                "average_duration": 0.0,
                "min_duration": 0.0,
                "max_duration": 0.0,
                "total_tokens": 0,
                "total_cost": 0.0,
            }
            
        return {
            "total_calls": self.total_calls,
            "successful_calls": self.successful_calls,
            "failed_calls": self.failed_calls,
            "success_rate": self.successful_calls / self.total_calls,
            "average_duration": self.total_duration / self.total_calls,
            "min_duration": self.min_duration if self.min_duration != float('inf') else 0.0,
            "max_duration": self.max_duration,
            "total_tokens": self.total_tokens,
            "total_cost": self.total_cost,
        }


class BaseTool:
    """
    Foundation class for all tool implementations in the Ultimate MCP Server.
    
    The BaseTool class serves as the fundamental building block for creating tools that 
    can be registered with and executed by the MCP server. It provides core functionality
    for metrics tracking, logging, resource management, and tool execution.
    
    Tools in the Ultimate MCP Server ecosystem are designed to provide specific capabilities
    that can be invoked by clients (typically LLMs) to perform various operations like
    document processing, vector search, file operations, etc. The BaseTool architecture
    ensures all tools have consistent behavior for error handling, metrics collection,
    and server integration.
    
    Key features:
    - Standardized tool registration via decorators
    - Consistent metrics tracking for all tool executions
    - Unified error handling and response formatting
    - Integration with the server's resource registry
    - Logger setup with tool-specific naming
    
    Tool classes should inherit from BaseTool and define their tools using the @tool
    decorator. Each tool method should be async and follow the standard pattern of
    accepting parameters, performing operations, and returning results in a structured
    format.
    
    Example:
    ```python
    class MyCustomTools(BaseTool):
        tool_name = "my_custom_tools"
        description = "Provides custom tools for specific operations"
        
        @tool(name="custom_operation")
        @with_tool_metrics
        @with_error_handling
        async def perform_operation(self, param1: str, param2: int) -> Dict[str, Any]:
            # Implementation
            return {"result": "success", "data": some_data}
    ```
    """
    
    tool_name: str = "base_tool"
    description: str = "Base tool class for Ultimate MCP Server."
    
    def __init__(self, mcp_server):
        """Initialize the tool.
        
        Args:
            mcp_server: MCP server instance
        """
        # If mcp_server is a Gateway instance, get the MCP object
        self.mcp = mcp_server.mcp if hasattr(mcp_server, 'mcp') else mcp_server
        self.logger = get_logger(f"tool.{self.tool_name}")
        self.metrics = BaseToolMetrics()
        
        # Initialize resource registry if not already available
        if not hasattr(self.mcp, "resources"):
            self.mcp.resources = ResourceRegistry()
        
    def _register_tools(self):
        """Register tools with MCP server.
        
        Override this method in subclasses to register specific tools.
        This method is no longer called by the base class constructor.
        Registration is now handled externally, e.g., in register_all_tools.
        """
        pass
        
    async def execute(self, tool_name: str, params: Dict[str, Any]) -> Any:
        """
        Execute a tool method by name with the given parameters.
        
        This method provides the core execution mechanism for BaseTool subclasses,
        dynamically dispatching calls to the appropriate tool method based on the
        tool_name parameter. It handles parameter validation, metrics collection,
        and error standardization to ensure consistent behavior across all tools.
        
        Execution flow:
        1. Looks up the requested tool method in the class
        2. Validates that the method is properly marked as a tool
        3. Applies metrics tracking via _wrap_with_metrics
        4. Executes the tool with the provided parameters
        5. Returns the tool's response or a standardized error
        
        Args:
            tool_name: Name of the specific tool method to execute
            params: Dictionary of parameters to pass to the tool method
                    (These parameters will be unpacked as kwargs)
        
        Returns:
            The result returned by the tool method, or a standardized error response
            if execution fails
            
        Raises:
            ToolError: If the specified tool_name is not found or not properly
                       marked as a tool method
                       
        Example:
            ```python
            # Direct execution of a tool method
            result = await my_tool_instance.execute(
                "analyze_document", 
                {"document_id": "doc123", "analysis_type": "sentiment"}
            )
            
            # Error handling
            if "isError" in result and result["isError"]:
                print(f"Tool execution failed: {result['error']['message']}")
            else:
                print(f"Analysis result: {result['analysis_score']}")
            ```
        """
        # Find method with tool name
        method_name = tool_name.split(".")[-1]  # Handle namespaced tools
        method = getattr(self, method_name, None)
        
        if not method or not hasattr(method, "_tool"):
            raise ToolError(
                f"Tool not found: {tool_name}",
                error_code="tool_not_found"
            )
        
        # Execute tool with metrics wrapper
        return await self._wrap_with_metrics(method, **params)

    async def _wrap_with_metrics(
        self,
        func: Callable,
        *args,
        **kwargs
    ) -> Any:
        """
        Internal method that wraps a function call with metrics tracking.
        
        This method provides a standardized way to execute tool functions while capturing
        performance metrics such as execution duration, success/failure status, token usage,
        and cost. These metrics are stored in the BaseTool instance's metrics object for
        later analysis and reporting.
        
        The method performs the following steps:
        1. Records the start time of the operation
        2. Executes the provided function with the supplied arguments
        3. If successful, extracts metrics data from the result (if available)
        4. Records the execution metrics in the BaseTool's metrics object
        5. Returns the original result or propagates any exceptions that occurred
        
        Metrics extraction:
        - If the result is a dictionary, it will attempt to extract:
          - Token usage from either result["tokens"]["total"] or result["total_tokens"]
          - Cost information from result["cost"]
        
        Args:
            func: Async function to execute with metrics tracking
            *args: Positional arguments to pass to the function
            **kwargs: Keyword arguments to pass to the function
            
        Returns:
            The result of the wrapped function call
            
        Raises:
            Any exception raised by the wrapped function (after logging it)
            
        Notes:
            - This method is typically called internally by BaseTool subclasses
            - Related to but different from the standalone with_tool_metrics decorator
            - Exceptions are logged but not caught (to allow proper error handling)
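            
        Example:
            A result dict shaped like the following (illustrative field values) would have
            its token and cost figures recorded automatically:
            
            ```python
            result = {
                "text": "...",
                "tokens": {"input": 1200, "output": 300, "total": 1500},
                "cost": 0.0045,
            }
            ```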
        """
        start_time = time.time()
        success = False
        tokens = None
        cost = None
        
        try:
            # Call function
            result = await func(*args, **kwargs)
            
            # Extract metrics if available
            if isinstance(result, dict):
                if "tokens" in result and isinstance(result["tokens"], dict):
                    tokens = result["tokens"].get("total")
                elif "total_tokens" in result:
                    tokens = result["total_tokens"]
                    
                cost = result.get("cost")
                
            success = True
            return result
            
        except Exception as e:
            self.logger.error(
                f"Tool execution failed: {func.__name__}: {str(e)}",
                emoji_key="error",
                tool=func.__name__,
                exc_info=True
            )
            raise
            
        finally:
            # Record metrics
            duration = time.time() - start_time
            self.metrics.record_call(
                success=success,
                duration=duration,
                tokens=tokens,
                cost=cost
            )


def with_tool_metrics(func):
    """
    Decorator that automatically tracks performance metrics for tool functions.
    
    This decorator captures and records execution metrics for both class methods and
    standalone functions. It adapts its behavior based on whether the decorated function
    is a method on a BaseTool instance or a standalone function.
    
    Metrics captured include:
    - Execution time (duration in seconds)
    - Success/failure state
    - Token usage (extracted from result if available)
    - Cost information (extracted from result if available)
    
    The decorator performs several functions:
    1. Captures start time before execution
    2. Executes the wrapped function, preserving all args/kwargs
    3. Extracts metrics from the result dictionary if available
    4. Logs execution statistics
    5. Updates metrics in the BaseTool.metrics object if available
    
    When used with other decorators:
    - Should be applied before with_error_handling to ensure metrics are 
      captured even when errors occur
    - Works well with with_cache, tracking metrics for both cache hits and misses
    - Compatible with with_retry, recording each attempt separately
    
    Args:
        func: The async function to decorate (can be a method or standalone function)
        
    Returns:
        Wrapped async function that captures and records metrics
        
    Example:
        ```python
        @with_tool_metrics
        @with_error_handling
        async def my_tool_function(param1, param2):
            # Function implementation
        ```
    """
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # Check if the first arg looks like a BaseTool instance
        self_obj = args[0] if args and isinstance(args[0], BaseTool) else None
        tool_name = getattr(self_obj, 'tool_name', func.__name__)

        start_time = time.time()
        success = False
        tokens = None
        cost = None
        result = None
        
        try:
            # Call original function, passing self_obj if it exists
            if self_obj:
                # Assumes if self_obj exists, it's the first positional arg expected by func
                result = func(self_obj, *args[1:], **kwargs)
            else:
                # Pass only the args/kwargs received, assuming func is standalone
                result = func(*args, **kwargs)
            
            # Only await when necessary
            if inspect.isawaitable(result):
                result = await result
            # result is now either a ToolResult _or_ an async iterator
            
            # Extract metrics if available from result
            if isinstance(result, dict):
                if "tokens" in result and isinstance(result["tokens"], dict):
                    tokens = result["tokens"].get("total")
                elif "total_tokens" in result:
                    tokens = result["total_tokens"]
                cost = result.get("cost")
                
            success = True
            return result
            
        except Exception as e:
            logger.error(
                f"Tool execution failed: {tool_name}: {str(e)}",
                emoji_key="error",
                tool=tool_name,
                exc_info=True
            )
            raise # Re-raise exception for other handlers (like with_error_handling)
            
        finally:
            # Record metrics
            duration = time.time() - start_time
            
            # Log execution stats
            logger.debug(
                f"Tool execution: {tool_name} ({'success' if success else 'failed'})",
                emoji_key="tool" if success else "error",
                tool=tool_name,
                time=duration,
                cost=cost
            )
            
            # Update metrics if we found a self object with a metrics attribute
            if self_obj and hasattr(self_obj, 'metrics'):
                self_obj.metrics.record_call(
                    success=success,
                    duration=duration,
                    tokens=tokens,
                    cost=cost
                )
                
    return wrapper


def with_retry(
    max_retries: int = 3,
    retry_delay: float = 1.0,
    backoff_factor: float = 2.0,
    retry_exceptions: List[Type[Exception]] = None
):
    """
    Decorator that adds exponential backoff retry logic to async tool functions.
    
    This decorator wraps an async function with retry logic that will automatically
    re-execute the function if it fails with certain exceptions. It implements an
    exponential backoff strategy to progressively increase the wait time between
    retry attempts, reducing load during transient failures.
    
    Retry behavior:
    1. When the decorated function raises an exception, the decorator checks if it's a
       retriable exception type (based on the retry_exceptions parameter)
    2. If retriable, it waits for a delay period (which increases with each attempt)
    3. After waiting, it retries the function with the same arguments
    4. This process repeats until either the function succeeds or max_retries is reached
    
    Args:
        max_retries: Maximum number of retry attempts before giving up (default: 3)
        retry_delay: Initial delay in seconds before first retry (default: 1.0)
        backoff_factor: Multiplier for delay between retries (default: 2.0)
                       Each retry's delay is calculated as: retry_delay * (backoff_factor ^ attempt)
        retry_exceptions: List of exception types that should trigger retries.
                         If None, all exceptions will trigger retries.
    
    Returns:
        A decorator function that wraps the given async function with retry logic.
        
    Example:
        ```python
        @with_retry(max_retries=3, retry_delay=2.0, backoff_factor=3.0,
                   retry_exceptions=[ConnectionError, TimeoutError])
        async def fetch_data(url):
            # This function will retry up to 3 times if it raises ConnectionError or TimeoutError
            # Delays between retries: 2s, 6s, 18s
            # For other exceptions, it will fail immediately
            return await some_api_call(url)
        ```
        
    Notes:
        - This decorator only works with async functions
        - The decorated function must be idempotent (safe to call multiple times)
        - Retries are logged at WARNING level, final failures at ERROR level
        - The final exception is re-raised after all retries are exhausted
    """
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None
            delay = retry_delay
            
            for attempt in range(max_retries + 1):
                try:
                    # Call original function
                    return await func(*args, **kwargs)
                    
                except Exception as e:
                    # Only retry on specified exceptions
                    if retry_exceptions and not any(
                        isinstance(e, exc_type) for exc_type in retry_exceptions
                    ):
                        raise
                        
                    last_exception = e
                    
                    # Log retry attempt
                    if attempt < max_retries:
                        logger.warning(
                            f"Tool execution failed, retrying ({attempt+1}/{max_retries}): {str(e)}",
                            emoji_key="warning",
                            tool=func.__name__,
                            attempt=attempt+1,
                            max_retries=max_retries,
                            delay=delay
                        )
                        
                        # Wait before retrying
                        await asyncio.sleep(delay)
                        
                        # Increase delay for next retry
                        delay *= backoff_factor
                    else:
                        # Log final failure
                        logger.error(
                            f"Tool execution failed after {max_retries} retries: {str(e)}",
                            emoji_key="error",
                            tool=func.__name__,
                            exc_info=True
                        )
                        
            # If we get here, all retries failed
            raise last_exception
                
        return wrapper
    return decorator
    

def with_error_handling(func):
    """
    Decorator that transforms tool function exceptions into standardized error responses.
    
    This decorator intercepts any exceptions raised during tool execution and converts them
    into a structured error response format following the MCP protocol standards. It ensures
    that clients receive consistent, actionable error information regardless of how or where
    the error occurred.
    
    The decorator performs several key functions:
    1. Detects if it's decorating a BaseTool method or standalone function and adapts accordingly
    2. Reconstructs function call arguments appropriately based on function signature
    3. Catches exceptions raised during execution and transforms them into structured responses
    4. Maps different exception types to corresponding MCP error types with appropriate metadata
    5. Logs detailed error information while providing a clean, standardized response to clients
    
    Exception handling:
    - ToolError: Passed through with logging (assumes already formatted correctly)
    - ValueError: Converted to ToolInputError with detailed context
    - Other exceptions: Converted to ToolExecutionError with execution context
    
    All error responses have the same structure:
    ```
    {
        "success": False,
        "isError": True,
        "error": {
            "type": "<error_type>",
            "message": "<human-readable message>",
            "details": {<context-specific details>},
            "retriable": <boolean>,
            "suggestions": [<optional recovery suggestions>],
            "timestamp": <current_time>
        }
    }
    ```
    
    Args:
        func: The async function to decorate (can be a method or standalone function)
        
    Returns:
        Decorated async function that catches exceptions and returns structured error responses
        
    Example:
        ```python
        @with_error_handling
        async def my_tool_function(param1, param2):
            # If this raises an exception, it will be transformed into a structured response
            # rather than propagating up to the caller
            # ...
        ```
    """
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # Check if the first arg looks like a BaseTool instance
        self_obj = args[0] if args and isinstance(args[0], BaseTool) else None
        # Determine tool_name based on instance or func name
        tool_name = getattr(self_obj, 'tool_name', func.__name__) 
        
        sig = inspect.signature(func)
        func_params = set(sig.parameters.keys())  # noqa: F841
        
        call_args = []
        call_kwargs = {}

        if self_obj:
            expected_params = list(sig.parameters.values())
            if expected_params and expected_params[0].name == 'self':
                call_args.append(self_obj)
        
        start_index = 1 if self_obj and call_args else 0
        call_args.extend(args[start_index:])

        # Pass all original kwargs through
        call_kwargs.update(kwargs)
            
        try:
            # Call original function with reconstructed args/kwargs
            # This version passes *all* kwargs received by the wrapper,
            # trusting FastMCP to pass the correct ones including 'ctx'.
            result = func(*call_args, **call_kwargs)
            
            # Only await when necessary
            if inspect.isawaitable(result):
                result = await result
            # result is now either a ToolResult _or_ an async iterator
            return result
            
        except ToolError as e:
            # Already a tool error, log and return
            logger.error(
                f"Tool error in {tool_name}: {str(e)} ({e.error_code})",
                emoji_key="error",
                tool=tool_name,
                error_code=e.error_code,
                details=e.details
            )
            
            # Debug log the formatted error response
            error_response = format_error_response(e)
            logger.debug(f"Formatted error response for {tool_name}: {error_response}")
            
            # Return standardized error response
            return error_response
            
        except ValueError as e:
            # Convert ValueError to ToolInputError with more detailed information
            error = ToolInputError(
                f"Invalid input to {tool_name}: {str(e)}",
                details={
                    "tool_name": tool_name,
                    "exception_type": "ValueError",
                    "original_error": str(e)
                }
            )
            
            logger.error(
                f"Invalid input to {tool_name}: {str(e)}",
                emoji_key="error",
                tool=tool_name,
                error_code=error.error_code
            )
            
            # Return standardized error response
            return format_error_response(error)
            
        except Exception as e:
            # Create a more specific error message that includes the tool name
            specific_message = f"Execution error in {tool_name}: {str(e)}"
            
            # Convert to ToolExecutionError for other exceptions
            error = ToolExecutionError(
                specific_message,
                cause=e,
                details={
                    "tool_name": tool_name,
                    "exception_type": type(e).__name__,
                    "original_message": str(e)
                }
            )
            
            logger.error(
                specific_message,
                emoji_key="error",
                tool=tool_name,
                exc_info=True
            )
            
            # Return standardized error response
            return format_error_response(error)
                
    return wrapper
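

# Illustrative sketch of `with_error_handling` in use. The tool below is
# hypothetical and exists only to show the shape of the structured response
# produced when a wrapped function raises; it is not part of the gateway's
# tool set.
@with_error_handling
async def _example_divide(a: float, b: float) -> dict:
    """Divide a by b; a zero divisor raises ValueError."""
    if b == 0:
        raise ValueError("divisor must be non-zero")
    return {"quotient": a / b}

# Awaiting `_example_divide(1, 0)` does not raise. The ValueError is converted
# into a ToolInputError and returned as a structured error response following
# the format documented above:
#   {"success": False, "isError": True,
#    "error": {"type": ..., "message": "Invalid input to _example_divide: ...",
#              "details": {...}, "retriable": ..., "timestamp": ...}}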


def register_tool(mcp_server, name=None, description=None, cache_ttl=None):
    """
    Register a standalone function as an MCP tool with optional caching and error handling.
    
    This function creates a decorator that registers the decorated function with the MCP server,
    automatically applying error handling and optional result caching. It provides a simpler
    alternative to class-based tool registration via the BaseTool class, allowing standalone
    functions to be exposed as MCP tools without creating a full tool class.
    
    The decorator handles:
    1. Tool registration with the MCP server using the provided name (or function name)
    2. Documentation via the provided description (or function docstring)
    3. Optional result caching with the specified TTL
    4. Standardized error handling via the with_error_handling decorator
    
    Args:
        mcp_server: MCP server instance to register the tool with
        name: Tool name used for registration (defaults to the function name if not provided)
        description: Tool description for documentation (defaults to function docstring if not provided)
        cache_ttl: Optional time-to-live in seconds for caching tool results. If provided, the tool results
                  will be cached for this duration to improve performance for identical calls.
        
    Returns:
        Decorator function that transforms the decorated function into a registered MCP tool
        
    Example:
        ```python
        # Initialize MCP server
        mcp_server = FastMCP()
        
        # Register a function as a tool
        @register_tool(mcp_server, name="get_weather", cache_ttl=300)
        async def get_weather_data(location: str, units: str = "metric"):
            '''Get current weather data for a location.'''
            # Implementation
            return {"temperature": 22, "conditions": "sunny"}
            
        # The function is now registered as an MCP tool named "get_weather"
        # with 5-minute result caching and standardized error handling
        ```
        
    Notes:
        - The decorated function must be async
        - If cache_ttl is provided, identical calls will return cached results 
          rather than re-executing the function
        - Function signature is preserved, making it transparent to callers
        - For more complex tools with multiple methods, use the BaseTool class instead
    """
    def decorator(func):
        # Get function name and docstring
        tool_name = name or func.__name__
        tool_description = description or func.__doc__ or f"Tool: {tool_name}"
        
        # Apply caching if specified (currently a placeholder: the with_cache
        # decorator is not applied here)
        # if cache_ttl is not None:
        #     func = with_cache(ttl=cache_ttl)(func)
        
        # Apply error handling
        func = with_error_handling(func)
        
        # Register with MCP server, passing the resolved description through
        mcp_server.tool(name=tool_name, description=tool_description)(func)
        
        return func
    
    return decorator
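

# Illustrative sketch: `register_tool` can also be applied programmatically to a
# function that already exists, rather than via decorator syntax. The server
# argument and the echo tool below are hypothetical; the point is simply that
# the registered function is returned (wrapped in error handling), so it remains
# directly callable from Python as well as via MCP.
def _example_programmatic_registration(mcp_server):
    """Register a tiny echo tool on the given server and return it."""
    async def echo_text(text: str) -> dict:
        """Echo the provided text back to the caller."""
        return {"text": text}

    # Equivalent to decorating echo_text with @register_tool(mcp_server, name="echo_text")
    return register_tool(mcp_server, name="echo_text")(echo_text)
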

def _get_json_schema_type(type_annotation):
    """
    Convert Python type annotations to JSON Schema type definitions.
    
    This utility function translates Python's typing annotations into equivalent JSON Schema
    type definitions, enabling automatic generation of API documentation and client interfaces
    from Python function signatures. It handles basic types, Optional types, Lists, and 
    provides reasonable defaults for complex types.
    
    The function is primarily used internally by the MCP framework to generate JSON Schema
    definitions for tool parameters, allowing clients to understand the expected input types
    and structures for each tool.
    
    Type mappings:
    - str -> {"type": "string"}
    - int -> {"type": "integer"}
    - float -> {"type": "number"}
    - bool -> {"type": "boolean"}
    - Optional[T] -> Same as T, but adds "null" to "type" array
    - List[T] -> {"type": "array", "items": <schema for T>}
    - Dict -> {"type": "object"}
    - Other complex types -> {"type": "object"}
    
    Args:
        type_annotation: A Python type annotation (from typing module or built-in types)
        
    Returns:
        A dictionary containing the equivalent JSON Schema type definition
        
    Notes:
        - This function provides only type information, not complete JSON Schema validation
          rules like minimum/maximum values, string patterns, etc.
        - Complex nested types (e.g., List[Dict[str, List[int]]]) are handled, but deeply 
          nested structures may be simplified in the output schema
        - This function is meant for internal use by the tool registration system
        
    Examples:
        ```python
        # Basic types
        _get_json_schema_type(str)  # -> {"type": "string"}
        _get_json_schema_type(int)  # -> {"type": "integer"}
        
        # Optional types
        from typing import Optional
        _get_json_schema_type(Optional[str])  # -> {"type": ["string", "null"]}
        
        # List types
        from typing import List
        _get_json_schema_type(List[int])  # -> {"type": "array", "items": {"type": "integer"}}
        
        # Complex types
        from typing import Dict, List
        _get_json_schema_type(Dict[str, List[int]])  # -> {"type": "object"}
        ```
    """
    import typing
    
    # Handle basic types
    if type_annotation is str:
        return {"type": "string"}
    elif type_annotation is int:
        return {"type": "integer"}
    elif type_annotation is float:
        return {"type": "number"}
    elif type_annotation is bool:
        return {"type": "boolean"}
    
    # Handle Optional types
    origin = typing.get_origin(type_annotation)
    args = typing.get_args(type_annotation)
    
    if origin is Union and type(None) in args:
        # Optional type - get the non-None type and mark the schema as nullable,
        # matching the documented mapping (e.g. Optional[str] -> ["string", "null"])
        non_none_args = [arg for arg in args if arg is not type(None)]
        if len(non_none_args) == 1:
            inner_type = _get_json_schema_type(non_none_args[0])
            schema_type = inner_type.get("type")
            if isinstance(schema_type, str):
                inner_type["type"] = [schema_type, "null"]
            elif isinstance(schema_type, list) and "null" not in schema_type:
                inner_type["type"] = schema_type + ["null"]
            return inner_type
    
    # Handle lists
    if origin is list or origin is List:
        if args:
            item_type = _get_json_schema_type(args[0])
            return {
                "type": "array",
                "items": item_type
            }
        return {"type": "array"}
    
    # Handle dictionaries
    if origin is dict or origin is Dict:
        return {"type": "object"}
    
    # Default to object for complex types
    return {"type": "object"}
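

# Illustrative sketch of how `_get_json_schema_type` is typically combined with
# `inspect.signature` to describe a tool's parameters. The helper below is
# hypothetical (the framework's actual schema generation lives elsewhere); it
# only demonstrates the intended use of the type-mapping function above.
def _example_build_parameter_schema(func) -> dict:
    """Build a minimal JSON Schema object describing `func`'s parameters."""
    sig = inspect.signature(func)
    properties = {}
    required = []
    for param_name, param in sig.parameters.items():
        if param_name in ("self", "ctx"):
            continue  # instance/context parameters are not part of the tool schema
        annotation = param.annotation if param.annotation is not inspect.Parameter.empty else str
        properties[param_name] = _get_json_schema_type(annotation)
        if param.default is inspect.Parameter.empty:
            required.append(param_name)
    schema = {"type": "object", "properties": properties}
    if required:
        schema["required"] = required
    return schema
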

def with_state_management(namespace: str):
    """
    Decorator that provides persistent state management capabilities to tool functions.
    
    This decorator enables stateful behavior in otherwise stateless tool functions by
    injecting state access methods that allow reading, writing, and deleting values
    from a persistent, namespace-based state store. This is essential for tools that
    need to maintain context across multiple invocations, manage session data, or 
    build features with memory capabilities.
    
    The state management system provides:
    - Namespace isolation: Each tool can use its own namespace to prevent key collisions
    - Thread-safe concurrency: Built-in locks ensure safe parallel access to the same state
    - Optional persistence: State can be backed by disk storage for durability across restarts
    - Lazy loading: State is loaded from disk only when accessed, improving performance
    
    State accessor functions injected into the decorated function:
    - get_state(key, default=None) → Any: Retrieve a value by key, with optional default
    - set_state(key, value) → None: Store a value under the specified key
    - delete_state(key) → None: Remove a value from the state store
    
    All state operations are async and must be awaited; they cooperate with the
    event loop rather than blocking it while the underlying store is accessed.
    
    Args:
        namespace: A unique string identifying this tool's state namespace. This 
                  should be chosen carefully to avoid collisions with other tools.
                  Recommended format: "<tool_category>.<specific_feature>"
                  Examples: "conversation.history", "user.preferences", "document.cache"
    
    Returns:
        A decorator function that wraps the original tool function, adding state
        management capabilities via injected parameters.
        
    Examples:
        Basic usage with conversation history:
        ```python
        @with_state_management("conversation.history")
        async def chat_with_memory(message: str, ctx=None, get_state=None, set_state=None, delete_state=None):
            # Get previous messages from persistent store
            history = await get_state("messages", [])
            
            # Add new message
            history.append({"role": "user", "content": message})
            
            # Generate response based on all previous conversation context
            response = generate_response(message, history)
            
            # Add AI response to history
            history.append({"role": "assistant", "content": response})
            
            # Store updated history for future calls
            await set_state("messages", history)
            return {"response": response}
        ```
        
        Advanced pattern with conversational memory and user customization:
        ```python
        @with_state_management("assistant.settings")
        async def personalized_assistant(
            query: str, 
            update_preferences: bool = False,
            preferences: Dict[str, Any] = None,
            ctx=None, 
            get_state=None, 
            set_state=None, 
            delete_state=None
        ):
            # Get user ID from context
            user_id = ctx.get("user_id", "default_user")
            
            # Retrieve user-specific preferences
            user_prefs = await get_state(f"prefs:{user_id}", {
                "tone": "professional",
                "verbosity": "concise",
                "expertise_level": "intermediate"
            })
            
            # Update preferences if requested
            if update_preferences and preferences:
                user_prefs.update(preferences)
                await set_state(f"prefs:{user_id}", user_prefs)
            
            # Get conversation history
            history = await get_state(f"history:{user_id}", [])
            
            # Process query using preferences and history
            response = process_personalized_query(
                query, 
                user_preferences=user_prefs,
                conversation_history=history
            )
            
            # Update conversation history
            history.append({"query": query, "response": response})
            if len(history) > 20:  # Keep only recent history
                history = history[-20:]
            await set_state(f"history:{user_id}", history)
            
            return {
                "response": response,
                "preferences": user_prefs
            }
        ```
        
        State persistence across server restarts:
        ```python
        # First call to the tool
        @with_state_management("task.progress")
        async def long_running_task(task_id: str, step: int = None, ctx=None, 
                                   get_state=None, set_state=None, delete_state=None):
            # Get current progress
            progress = await get_state(task_id, {"completed_steps": [], "current_step": 0})
            
            # Update progress if a new step is provided
            if step is not None:
                progress["current_step"] = step
                progress["completed_steps"].append({
                    "step": step,
                    "timestamp": time.time()
                })
                await set_state(task_id, progress)
            
            # Even if the server restarts, the next call will retrieve the saved progress
            return {
                "task_id": task_id,
                "progress": progress,
                "completed": len(progress["completed_steps"]),
                "current_step": progress["current_step"]
            }
        ```
        
    Implementation Pattern:
    The decorator works by injecting three async state management functions into the
    decorated function's keyword arguments:
    
    1. `get_state(key, default=None)`:
       - Retrieves state values from the persistent store
       - If key doesn't exist, returns the provided default value
       - Example: `user_data = await get_state("user:12345", {})`
    
    2. `set_state(key, value)`: 
       - Stores a value in the persistent state store
       - Automatically serializes complex Python objects (dicts, lists, etc.)
       - Example: `await set_state("session:abc", {"authenticated": True})`
    
    3. `delete_state(key)`:
       - Removes a key and its associated value from the store
       - Example: `await delete_state("temporary_data")`
    
    Notes:
        - The decorated function must accept get_state, set_state, delete_state, and ctx
          parameters, either explicitly or via **kwargs.
        - State persistence depends on the MCP server configuration. If persistence is
          enabled, state will survive server restarts.
        - For large objects, consider storing only references or identifiers in the state
          and using a separate storage system for the actual data.
        - The state store is shared across all server instances, so state keys should be
          chosen to avoid collisions between different tools and features.
    """
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Get context from kwargs
            context = kwargs.get('ctx')
            if not context or not hasattr(context, 'fastmcp'):
                raise ValueError("Context with FastMCP server required")
            
            # Access StateStore via the FastMCP 2.0+ pattern
            if not hasattr(context.fastmcp, '_state_store'):
                raise ValueError("FastMCP server does not have a state store attached")
            
            state_store = context.fastmcp._state_store
            
            # Add state accessors to kwargs
            kwargs['get_state'] = lambda key, default=None: state_store.get(namespace, key, default)
            kwargs['set_state'] = lambda key, value: state_store.set(namespace, key, value)
            kwargs['delete_state'] = lambda key: state_store.delete(namespace, key)
            
            return await func(*args, **kwargs)
        
        # Update signature to include context parameter if not already present
        sig = inspect.signature(func)
        if 'ctx' not in sig.parameters:
            wrapped_params = list(sig.parameters.values())
            wrapped_params.append(
                inspect.Parameter('ctx', inspect.Parameter.KEYWORD_ONLY, 
                                 annotation='Optional[Dict[str, Any]]', default=None)
            )
            wrapper.__signature__ = sig.replace(parameters=wrapped_params)
        
        return wrapper
    return decorator
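

# Illustrative sketch of the interface `with_state_management` assumes from the
# object attached at `fastmcp._state_store`. The real StateStore used by this
# codebase is defined elsewhere (and may add locking and disk persistence); the
# minimal in-memory stand-in below only shows the expected async method
# signatures that the accessor lambdas above rely on.
class _ExampleInMemoryStateStore:
    """Namespace-scoped async key-value store (illustration only)."""

    def __init__(self):
        self._data = {}  # {namespace: {key: value}}

    async def get(self, namespace, key, default=None):
        return self._data.get(namespace, {}).get(key, default)

    async def set(self, namespace, key, value):
        self._data.setdefault(namespace, {})[key] = value

    async def delete(self, namespace, key):
        self._data.get(namespace, {}).pop(key, None)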

```