#
tokens: 48176/50000 4/207 files (page 15/35)
lines: off (toggle) GitHub
raw markdown copy
This is page 15 of 35. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│   ├── __init__.py
│   ├── advanced_agent_flows_using_unified_memory_system_demo.py
│   ├── advanced_extraction_demo.py
│   ├── advanced_unified_memory_system_demo.py
│   ├── advanced_vector_search_demo.py
│   ├── analytics_reporting_demo.py
│   ├── audio_transcription_demo.py
│   ├── basic_completion_demo.py
│   ├── cache_demo.py
│   ├── claude_integration_demo.py
│   ├── compare_synthesize_demo.py
│   ├── cost_optimization.py
│   ├── data
│   │   ├── sample_event.txt
│   │   ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│   │   └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│   ├── docstring_refiner_demo.py
│   ├── document_conversion_and_processing_demo.py
│   ├── entity_relation_graph_demo.py
│   ├── filesystem_operations_demo.py
│   ├── grok_integration_demo.py
│   ├── local_text_tools_demo.py
│   ├── marqo_fused_search_demo.py
│   ├── measure_model_speeds.py
│   ├── meta_api_demo.py
│   ├── multi_provider_demo.py
│   ├── ollama_integration_demo.py
│   ├── prompt_templates_demo.py
│   ├── python_sandbox_demo.py
│   ├── rag_example.py
│   ├── research_workflow_demo.py
│   ├── sample
│   │   ├── article.txt
│   │   ├── backprop_paper.pdf
│   │   ├── buffett.pdf
│   │   ├── contract_link.txt
│   │   ├── legal_contract.txt
│   │   ├── medical_case.txt
│   │   ├── northwind.db
│   │   ├── research_paper.txt
│   │   ├── sample_data.json
│   │   └── text_classification_samples
│   │       ├── email_classification.txt
│   │       ├── news_samples.txt
│   │       ├── product_reviews.txt
│   │       └── support_tickets.txt
│   ├── sample_docs
│   │   └── downloaded
│   │       └── attention_is_all_you_need.pdf
│   ├── sentiment_analysis_demo.py
│   ├── simple_completion_demo.py
│   ├── single_shot_synthesis_demo.py
│   ├── smart_browser_demo.py
│   ├── sql_database_demo.py
│   ├── sse_client_demo.py
│   ├── test_code_extraction.py
│   ├── test_content_detection.py
│   ├── test_ollama.py
│   ├── text_classification_demo.py
│   ├── text_redline_demo.py
│   ├── tool_composition_examples.py
│   ├── tournament_code_demo.py
│   ├── tournament_text_demo.py
│   ├── unified_memory_system_demo.py
│   ├── vector_search_demo.py
│   ├── web_automation_instruction_packs.py
│   └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│   └── smart_browser_internal
│       ├── locator_cache.db
│       ├── readability.js
│       └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── integration
│   │   ├── __init__.py
│   │   └── test_server.py
│   ├── manual
│   │   ├── test_extraction_advanced.py
│   │   └── test_extraction.py
│   └── unit
│       ├── __init__.py
│       ├── test_cache.py
│       ├── test_providers.py
│       └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│   ├── __init__.py
│   ├── __main__.py
│   ├── cli
│   │   ├── __init__.py
│   │   ├── __main__.py
│   │   ├── commands.py
│   │   ├── helpers.py
│   │   └── typer_cli.py
│   ├── clients
│   │   ├── __init__.py
│   │   ├── completion_client.py
│   │   └── rag_client.py
│   ├── config
│   │   └── examples
│   │       └── filesystem_config.yaml
│   ├── config.py
│   ├── constants.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── evaluation
│   │   │   ├── base.py
│   │   │   └── evaluators.py
│   │   ├── providers
│   │   │   ├── __init__.py
│   │   │   ├── anthropic.py
│   │   │   ├── base.py
│   │   │   ├── deepseek.py
│   │   │   ├── gemini.py
│   │   │   ├── grok.py
│   │   │   ├── ollama.py
│   │   │   ├── openai.py
│   │   │   └── openrouter.py
│   │   ├── server.py
│   │   ├── state_store.py
│   │   ├── tournaments
│   │   │   ├── manager.py
│   │   │   ├── tasks.py
│   │   │   └── utils.py
│   │   └── ums_api
│   │       ├── __init__.py
│   │       ├── ums_database.py
│   │       ├── ums_endpoints.py
│   │       ├── ums_models.py
│   │       └── ums_services.py
│   ├── exceptions.py
│   ├── graceful_shutdown.py
│   ├── services
│   │   ├── __init__.py
│   │   ├── analytics
│   │   │   ├── __init__.py
│   │   │   ├── metrics.py
│   │   │   └── reporting.py
│   │   ├── cache
│   │   │   ├── __init__.py
│   │   │   ├── cache_service.py
│   │   │   ├── persistence.py
│   │   │   ├── strategies.py
│   │   │   └── utils.py
│   │   ├── cache.py
│   │   ├── document.py
│   │   ├── knowledge_base
│   │   │   ├── __init__.py
│   │   │   ├── feedback.py
│   │   │   ├── manager.py
│   │   │   ├── rag_engine.py
│   │   │   ├── retriever.py
│   │   │   └── utils.py
│   │   ├── prompts
│   │   │   ├── __init__.py
│   │   │   ├── repository.py
│   │   │   └── templates.py
│   │   ├── prompts.py
│   │   └── vector
│   │       ├── __init__.py
│   │       ├── embeddings.py
│   │       └── vector_service.py
│   ├── tool_token_counter.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── audio_transcription.py
│   │   ├── base.py
│   │   ├── completion.py
│   │   ├── docstring_refiner.py
│   │   ├── document_conversion_and_processing.py
│   │   ├── enhanced-ums-lookbook.html
│   │   ├── entity_relation_graph.py
│   │   ├── excel_spreadsheet_automation.py
│   │   ├── extraction.py
│   │   ├── filesystem.py
│   │   ├── html_to_markdown.py
│   │   ├── local_text_tools.py
│   │   ├── marqo_fused_search.py
│   │   ├── meta_api_tool.py
│   │   ├── ocr_tools.py
│   │   ├── optimization.py
│   │   ├── provider.py
│   │   ├── pyodide_boot_template.html
│   │   ├── python_sandbox.py
│   │   ├── rag.py
│   │   ├── redline-compiled.css
│   │   ├── sentiment_analysis.py
│   │   ├── single_shot_synthesis.py
│   │   ├── smart_browser.py
│   │   ├── sql_databases.py
│   │   ├── text_classification.py
│   │   ├── text_redline_tools.py
│   │   ├── tournament.py
│   │   ├── ums_explorer.html
│   │   └── unified_memory_system.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── async_utils.py
│   │   ├── display.py
│   │   ├── logging
│   │   │   ├── __init__.py
│   │   │   ├── console.py
│   │   │   ├── emojis.py
│   │   │   ├── formatter.py
│   │   │   ├── logger.py
│   │   │   ├── panels.py
│   │   │   ├── progress.py
│   │   │   └── themes.py
│   │   ├── parse_yaml.py
│   │   ├── parsing.py
│   │   ├── security.py
│   │   └── text.py
│   └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/optimization.py:
--------------------------------------------------------------------------------

```python
"""Tools for LLM cost estimation, model comparison, recommendation, and workflow execution.

Provides utilities to help manage LLM usage costs and select appropriate models.
"""
import asyncio
import json
import os
import time
import traceback
from typing import Any, Dict, List, Optional, Set

import networkx as nx

from ultimate_mcp_server.constants import COST_PER_MILLION_TOKENS
from ultimate_mcp_server.exceptions import ToolError, ToolInputError
from ultimate_mcp_server.tools.base import with_error_handling, with_tool_metrics
from ultimate_mcp_server.tools.completion import chat_completion
from ultimate_mcp_server.tools.document_conversion_and_processing import (
    chunk_document,
    summarize_document,
)
from ultimate_mcp_server.tools.extraction import extract_json
from ultimate_mcp_server.tools.rag import (
    add_documents,
    create_knowledge_base,
    generate_with_rag,
    retrieve_context,
)
from ultimate_mcp_server.tools.text_classification import text_classification
from ultimate_mcp_server.utils import get_logger
from ultimate_mcp_server.utils.text import count_tokens

logger = get_logger("ultimate_mcp_server.tools.optimization")

# --- Constants for Speed Score Mapping ---
# Define bins for mapping tokens/second to a 1-5 score (lower is faster)
# Adjust these thresholds based on observed performance and desired sensitivity
SPEED_SCORE_BINS = [
    (200, 1),  # > 200 tokens/s -> Score 1 (Fastest)
    (100, 2),  # 100-200 tokens/s -> Score 2
    (50, 3),   # 50-100 tokens/s -> Score 3
    (20, 4),   # 20-50 tokens/s -> Score 4
    (0, 5),    # 0-20 tokens/s -> Score 5 (Slowest)
]
DEFAULT_SPEED_SCORE = 3 # Fallback score if measurement is missing/invalid or hardcoded value is missing

def _map_tok_per_sec_to_score(tokens_per_sec: float) -> int:
    """Maps measured tokens/second to a 1-5 speed score (lower is faster)."""
    if tokens_per_sec is None or not isinstance(tokens_per_sec, (int, float)) or tokens_per_sec < 0:
        return DEFAULT_SPEED_SCORE # Return default for invalid input
    for threshold, score in SPEED_SCORE_BINS:
        if tokens_per_sec >= threshold:
            return score
    return SPEED_SCORE_BINS[-1][1] # Should hit the 0 threshold if positive

@with_tool_metrics
@with_error_handling
async def estimate_cost(
    prompt: str,
    model: str, # Can be full 'provider/model_name' or just 'model_name' if unique
    max_tokens: Optional[int] = None,
    include_output: bool = True
) -> Dict[str, Any]:
    """Estimates the monetary cost of an LLM request without executing it.

    Calculates cost based on input prompt tokens and estimated/specified output tokens
    using predefined cost rates for the specified model.

    Args:
        prompt: The text prompt that would be sent to the model.
        model: The model identifier (e.g., "openai/gpt-4.1-mini", "gpt-4.1-mini",
               "anthropic/claude-3-5-haiku-20241022", "claude-3-5-haiku-20241022").
               Cost data must be available for the resolved model name in `COST_PER_MILLION_TOKENS`.
        max_tokens: (Optional) The maximum number of tokens expected in the output. If None,
                      output tokens are estimated as roughly half the input prompt tokens.
        include_output: (Optional) If False, calculates cost based only on input tokens, ignoring
                        `max_tokens` or output estimation. Defaults to True.

    Returns:
        A dictionary containing the cost estimate and token breakdown:
        {
            "cost": 0.000150, # Total estimated cost in USD
            "breakdown": {
                "input_cost": 0.000100,
                "output_cost": 0.000050
            },
            "tokens": {
                "input": 200,   # Tokens counted from the prompt
                "output": 100,  # Estimated or provided max_tokens
                "total": 300
            },
            "rate": {         # Cost per million tokens for this model
                "input": 0.50,
                "output": 1.50
            },
            "model": "gpt-4.1-mini", # Returns the original model string passed as input
            "resolved_model_key": "gpt-4.1-mini", # The key used for cost lookup
            "is_estimate": true
        }

    Raises:
        ToolInputError: If prompt or model format is invalid.
        ToolError: If the specified `model` cannot be resolved to cost data.
        ValueError: If token counting fails for the given model and prompt.
    """
    # Input validation
    if not prompt or not isinstance(prompt, str):
        raise ToolInputError("Prompt must be a non-empty string.")
    if not model or not isinstance(model, str):
        raise ToolInputError("Model must be a non-empty string.")

    # Flexible Cost Data Lookup
    cost_data = COST_PER_MILLION_TOKENS.get(model)
    resolved_model_key = model # Assume direct match first
    model_name_only = model # Use input model for token counting initially

    if not cost_data and '/' in model:
        # If direct lookup fails and it looks like a prefixed name, try stripping prefix
        potential_short_key = model.split('/')[-1]
        cost_data = COST_PER_MILLION_TOKENS.get(potential_short_key)
        if cost_data:
            resolved_model_key = potential_short_key
            model_name_only = potential_short_key # Use short name for token count
        # If short key also fails, cost_data remains None

    if not cost_data:
        error_message = f"Unknown model or cost data unavailable for: {model}"
        raise ToolError(error_message, error_code="MODEL_NOT_FOUND", details={"model": model})

    # Token Counting (use model_name_only derived from successful cost key)
    try:
        input_tokens = count_tokens(prompt, model=model_name_only)
    except ValueError as e:
        # Log warning with the original model input for clarity
        logger.warning(f"Could not count tokens for model '{model}' (using '{model_name_only}' for tiktoken): {e}. Using rough estimate.")
        input_tokens = len(prompt) // 4 # Fallback estimate

    # Estimate output tokens if needed
    estimated_output_tokens = 0
    if include_output:
        if max_tokens is not None:
            estimated_output_tokens = max_tokens
        else:
            estimated_output_tokens = input_tokens // 2
            logger.debug(f"max_tokens not provided, estimating output tokens as {estimated_output_tokens}")
    else:
         estimated_output_tokens = 0

    # Calculate costs
    input_cost = (input_tokens / 1_000_000) * cost_data["input"]
    output_cost = (estimated_output_tokens / 1_000_000) * cost_data["output"]
    total_cost = input_cost + output_cost

    logger.info(f"Estimated cost for model '{model}' (using key '{resolved_model_key}'): ${total_cost:.6f} (In: {input_tokens} tokens, Out: {estimated_output_tokens} tokens)")
    return {
        "cost": total_cost,
        "breakdown": {
            "input_cost": input_cost,
            "output_cost": output_cost
        },
        "tokens": {
            "input": input_tokens,
            "output": estimated_output_tokens,
            "total": input_tokens + estimated_output_tokens
        },
        "rate": {
            "input": cost_data["input"],
            "output": cost_data["output"]
        },
        "model": model, # Return original input model string
        "resolved_model_key": resolved_model_key, # Key used for cost lookup
        "is_estimate": True
    }

@with_tool_metrics
@with_error_handling
async def compare_models(
    prompt: str,
    models: List[str], # List of model IDs (can be short or full names)
    max_tokens: Optional[int] = None,
    include_output: bool = True
) -> Dict[str, Any]:
    """Compares the estimated cost of running a prompt across multiple specified models.

    Uses the `estimate_cost` tool for each model in the list concurrently.

    Args:
        prompt: The text prompt to use for cost comparison.
        models: A list of model identifiers (e.g., ["openai/gpt-4.1-mini", "gpt-4.1-mini", "claude-3-5-haiku-20241022"]).
                `estimate_cost` will handle resolving these to cost data.
        max_tokens: (Optional) Maximum output tokens to assume for cost estimation across all models.
                      If None, output is estimated individually per model based on input.
        include_output: (Optional) Whether to include estimated output costs in the comparison. Defaults to True.

    Returns:
        A dictionary containing the cost comparison results:
        {
            "models": {
                "openai/gpt-4.1-mini": { # Uses the input model name as key
                    "cost": 0.000150,
                    "tokens": { "input": 200, "output": 100, "total": 300 }
                },
                "claude-3-5-haiku-20241022": {
                    "cost": 0.000087,
                    "tokens": { "input": 200, "output": 100, "total": 300 }
                },
                "some-unknown-model": { # Example of an error during estimation
                    "error": "Unknown model or cost data unavailable for: some-unknown-model"
                }
            },
            "ranking": [ # List of input model names ordered by cost (cheapest first), errors excluded
                "claude-3-5-haiku-20241022",
                "openai/gpt-4.1-mini"
            ],
            "cheapest": "claude-3-5-haiku-20241022", # Input model name with the lowest cost
            "most_expensive": "openai/gpt-4.1-mini", # Input model name with the highest cost
            "prompt_length_chars": 512,
            "max_tokens_assumed": 100
        }

    Raises:
        ToolInputError: If the `models` list is empty.
    """
    if not models or not isinstance(models, list):
        raise ToolInputError("'models' must be a non-empty list of model identifiers.")
    # Removed the check for '/' in model names - estimate_cost will handle resolution

    results = {}
    estimated_output_for_summary = None

    async def get_estimate(model_input_name): # Use a distinct variable name
        nonlocal estimated_output_for_summary
        try:
            estimate = await estimate_cost(
                prompt=prompt,
                model=model_input_name, # Pass the potentially short/full name
                max_tokens=max_tokens,
                include_output=include_output
            )
            # Use the original input name as the key in results
            results[model_input_name] = {
                "cost": estimate["cost"],
                "tokens": estimate["tokens"],
            }
            if estimated_output_for_summary is None:
                estimated_output_for_summary = estimate["tokens"]["output"]
        except ToolError as e:
            logger.warning(f"Could not estimate cost for model {model_input_name}: {e.detail}")
            results[model_input_name] = {"error": e.detail} # Store error under original name
        except Exception as e:
            logger.error(f"Unexpected error estimating cost for model {model_input_name}: {e}", exc_info=True)
            results[model_input_name] = {"error": f"Unexpected error: {str(e)}"}

    await asyncio.gather(*(get_estimate(model_name) for model_name in models))

    successful_estimates = {m: r for m, r in results.items() if "error" not in r}
    sorted_models = sorted(successful_estimates.items(), key=lambda item: item[1]["cost"])

    output_tokens_summary = estimated_output_for_summary if max_tokens is None else max_tokens
    if not include_output:
         output_tokens_summary = 0

    cheapest_model = sorted_models[0][0] if sorted_models else None
    most_expensive_model = sorted_models[-1][0] if sorted_models else None
    logger.info(f"Compared models: {list(results.keys())}. Cheapest: {cheapest_model or 'N/A'}")

    return {
        "models": results,
        "ranking": [m for m, _ in sorted_models], # Ranking uses original input names
        "cheapest": cheapest_model,
        "most_expensive": most_expensive_model,
        "prompt_length_chars": len(prompt),
        "max_tokens_assumed": output_tokens_summary,
    }

@with_tool_metrics
@with_error_handling
async def recommend_model(
    task_type: str,
    expected_input_length: int, # In characters
    expected_output_length: Optional[int] = None, # In characters
    required_capabilities: Optional[List[str]] = None,
    max_cost: Optional[float] = None,
    priority: str = "balanced" # Options: "cost", "quality", "speed", "balanced"
) -> Dict[str, Any]:
    """Recommends suitable LLM models based on task requirements and optimization priority.

    Evaluates known models against criteria like task type suitability (inferred),
    estimated cost (based on expected lengths), required capabilities,
    measured speed (tokens/sec if available), and quality metrics.

    Args:
        task_type: A description of the task (e.g., "summarization", "code generation", "entity extraction",
                   "customer support chat", "complex reasoning question"). Used loosely for capability checks.
        expected_input_length: Estimated length of the input text in characters.
        expected_output_length: (Optional) Estimated length of the output text in characters.
                                If None, it's roughly estimated based on input length.
        required_capabilities: (Optional) A list of specific capabilities the model MUST possess.
                               Current known capabilities include: "reasoning", "coding", "knowledge",
                               "instruction-following", "math". Check model metadata for supported values.
                               Example: ["coding", "instruction-following"]
        max_cost: (Optional) The maximum acceptable estimated cost (in USD) for a single run
                  with the expected input/output lengths. Models exceeding this are excluded.
        priority: (Optional) The primary factor for ranking suitable models.
                  Options:
                  - "cost": Prioritize the cheapest models.
                  - "quality": Prioritize models with the highest quality score.
                  - "speed": Prioritize models with the highest measured speed (tokens/sec).
                  - "balanced": (Default) Attempt to find a good mix of cost, quality, and speed.

    Returns:
        A dictionary containing model recommendations:
        {
            "recommendations": [
                {
                    "model": "anthropic/claude-3-5-haiku-20241022",
                    "estimated_cost": 0.000087,
                    "quality_score": 7,
                    "measured_speed_tps": 50.63, # Tokens per second
                    "capabilities": ["knowledge", "instruction-following"],
                    "reason": "Good balance of cost and speed, meets requirements."
                },
                {
                    "model": "openai/gpt-4.1-mini",
                    "estimated_cost": 0.000150,
                    "quality_score": 7,
                    "measured_speed_tps": 112.06,
                    "capabilities": ["reasoning", "coding", ...],
                    "reason": "Higher cost, but good quality/speed."
                }
                # ... other suitable models
            ],
            "parameters": { # Input parameters for context
                "task_type": "summarization",
                "expected_input_length": 2000,
                "expected_output_length": 500,
                "required_capabilities": [],
                "max_cost": 0.001,
                "priority": "balanced"
            },
            "excluded_models": { # Models evaluated but excluded, with reasons
                 "anthropic/claude-3-opus-20240229": "Exceeds max cost ($0.0015 > $0.001)",
                 "some-other-model": "Missing required capabilities: ['coding']"
            }
        }

    Raises:
        ToolInputError: If priority is invalid or lengths are non-positive.
    """
    if expected_input_length <= 0:
        raise ToolInputError("expected_input_length must be positive.")
    if expected_output_length is not None and expected_output_length <= 0:
        raise ToolInputError("expected_output_length must be positive if provided.")
    if priority not in ["cost", "quality", "speed", "balanced"]:
        raise ToolInputError(f"Invalid priority: '{priority}'. Must be cost, quality, speed, or balanced.")

    # --- Load Measured Speed Data ---
    measured_speeds: Dict[str, Any] = {}
    measured_speeds_file = "empirically_measured_model_speeds.json"
    project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
    filepath = os.path.join(project_root, measured_speeds_file)
    if os.path.exists(filepath):
        try:
            with open(filepath, 'r') as f:
                measured_speeds = json.load(f)
            logger.info(f"Successfully loaded measured speed data from {filepath}")
        except (FileNotFoundError, json.JSONDecodeError, IOError) as e:
            logger.warning(f"Could not load or parse measured speed data from {filepath}: {e}. Speed data will be 0.", exc_info=True)
            measured_speeds = {}
    else:
        logger.info(f"Measured speed file not found at {filepath}. Speed data will be 0.")
    # --- End Load Measured Speed Data ---

    # --- Model Metadata (Updated based on provided images) ---
    model_capabilities = {
        # OpenAI models
        "openai/gpt-4o": ["reasoning", "coding", "knowledge", "instruction-following", "math", "multimodal"], # Assuming multimodal based on general knowledge
        "openai/gpt-4o-mini": ["reasoning", "knowledge", "instruction-following"],
        "openai/gpt-4.1": ["reasoning", "coding", "knowledge", "instruction-following", "math"],
        "openai/gpt-4.1-mini": ["reasoning", "coding", "knowledge", "instruction-following"],
        "openai/gpt-4.1-nano": ["reasoning", "knowledge", "instruction-following"], # Added reasoning
        "openai/o1-preview": ["reasoning", "coding", "knowledge", "instruction-following", "math"],
        "openai/o1": ["reasoning", "coding", "knowledge", "instruction-following", "math"], # Keep guess
        "openai/o3-mini": ["reasoning", "knowledge", "instruction-following"],

        # Anthropic models
        "anthropic/claude-3-opus-20240229": ["reasoning", "coding", "knowledge", "instruction-following", "math", "multimodal"],
        "anthropic/claude-3-sonnet-20240229": ["reasoning", "coding", "knowledge", "instruction-following", "math", "multimodal"], # Previous Sonnet version
        "anthropic/claude-3-5-haiku-20241022": ["knowledge", "instruction-following", "multimodal"], # Based on 3.5 Haiku column
        "anthropic/claude-3-5-sonnet-20241022": ["reasoning", "coding", "knowledge", "instruction-following", "math", "multimodal"], # Based on 3.5 Sonnet column
        "anthropic/claude-3-7-sonnet-20250219": ["reasoning", "coding", "knowledge", "instruction-following", "math", "multimodal"], # Based on 3.7 Sonnet column

        # DeepSeek models
        "deepseek/deepseek-chat": ["coding", "knowledge", "instruction-following"],
        "deepseek/deepseek-reasoner": ["reasoning", "math", "instruction-following"],

        # Gemini models
        "gemini/gemini-2.0-flash-lite": ["knowledge", "instruction-following"],
        "gemini/gemini-2.0-flash": ["knowledge", "instruction-following", "multimodal"],
        "gemini/gemini-2.0-flash-thinking-exp-01-21": ["reasoning", "coding", "knowledge", "instruction-following", "multimodal"],
        "gemini/gemini-2.5-pro-preview-03-25": ["reasoning", "coding", "knowledge", "instruction-following", "math", "multimodal"], # Map from gemini-2.5-pro-preview-03-25

        # Grok models (Estimates)
        "grok/grok-3-latest": ["reasoning", "knowledge", "instruction-following", "math"],
        "grok/grok-3-fast-latest": ["reasoning", "knowledge", "instruction-following"],
        "grok/grok-3-mini-latest": ["knowledge", "instruction-following"],
        "grok/grok-3-mini-fast-latest": ["knowledge", "instruction-following"],

        # OpenRouter models
        # Note: Capabilities depend heavily on the underlying model proxied by OpenRouter.
        # This is a generic entry for the one model listed in constants.py.
        "openrouter/mistralai/mistral-nemo": ["knowledge", "instruction-following", "coding"] # Estimate based on Mistral family
    }

    model_speed_fallback = {}

    model_quality = {
        "openai/gpt-4o": 8, # Updated
        "openai/gpt-4.1-mini": 7,
        "openai/gpt-4o-mini": 6,
        "openai/gpt-4.1": 8,
        "openai/gpt-4.1-nano": 5,
        "openai/o1-preview": 10,
        "openai/o3-mini": 7,

        "anthropic/claude-3-opus-20240229": 10,
        "anthropic/claude-3-sonnet-20240229": 8,
        "anthropic/claude-3-5-haiku-20241022": 7,
        "anthropic/claude-3-5-sonnet-20241022": 9,
        "anthropic/claude-3-7-sonnet-20250219": 10,

        "deepseek/deepseek-chat": 7,
        "deepseek/deepseek-reasoner": 8,

        "gemini/gemini-2.0-flash-lite": 5,
        "gemini/gemini-2.0-flash": 6,
        "gemini/gemini-2.0-flash-thinking-exp-01-21": 6,
        "gemini/gemini-2.5-pro-preview-03-25": 9,

        # Grok models (Estimates: 1-10 scale)
        "grok/grok-3-latest": 9,
        "grok/grok-3-fast-latest": 8,
        "grok/grok-3-mini-latest": 6,
        "grok/grok-3-mini-fast-latest": 6,

        # OpenRouter models (Estimates: 1-10 scale)
        "openrouter/mistralai/mistral-nemo": 7 # Estimate based on Mistral family
    }
    # --- End Model Metadata --- 

    # --- Pre-calculate model metadata lookups ---
    # Combine all known prefixed model names from metadata sources
    all_prefixed_metadata_keys = set(model_capabilities.keys()) | set(model_speed_fallback.keys()) | set(model_quality.keys())

    # Create a map from short names (e.g., "gpt-4.1-mini") to prefixed names (e.g., "openai/gpt-4.1-mini")
    # Handle potential ambiguities (same short name from different providers)
    short_to_prefixed_map: Dict[str, Optional[str]] = {}
    ambiguous_short_names = set()

    for key in all_prefixed_metadata_keys:
        if '/' in key:
            short_name = key.split('/')[-1]
            if short_name in short_to_prefixed_map:
                # Ambiguity detected
                if short_name not in ambiguous_short_names:
                     logger.warning(f"Ambiguous short model name '{short_name}' found. Maps to '{short_to_prefixed_map[short_name]}' and '{key}'. Will require full name for this model.")
                     short_to_prefixed_map[short_name] = None # Mark as ambiguous
                     ambiguous_short_names.add(short_name)
            elif short_name not in ambiguous_short_names:
                 short_to_prefixed_map[short_name] = key # Store unique mapping

    # Helper function to find the prefixed name for a cost key (using pre-calculated map)
    _prefixed_name_cache = {}
    def _get_prefixed_name_for_cost_key(cost_key: str) -> Optional[str]:
        if cost_key in _prefixed_name_cache:
            return _prefixed_name_cache[cost_key]

        # If the key is already prefixed, use it directly
        if '/' in cost_key:
             if cost_key in all_prefixed_metadata_keys:
                 _prefixed_name_cache[cost_key] = cost_key
                 return cost_key
             else:
                  # Even if prefixed, if it's not in our known metadata, treat as unknown for consistency
                  logger.warning(f"Prefixed cost key '{cost_key}' not found in any known metadata (capabilities, quality, speed).")
                  _prefixed_name_cache[cost_key] = None
                  return None

        # Look up the short name in the pre-calculated map
        prefixed_name = short_to_prefixed_map.get(cost_key)

        if prefixed_name is not None: # Found unique mapping
            _prefixed_name_cache[cost_key] = prefixed_name
            return prefixed_name
        elif cost_key in ambiguous_short_names: # Known ambiguous name
            logger.warning(f"Cannot resolve ambiguous short name '{cost_key}'. Please use the full 'provider/model_name' identifier.")
            _prefixed_name_cache[cost_key] = None
            return None
        else: # Short name not found in any metadata
             logger.warning(f"Short name cost key '{cost_key}' not found in any known model metadata. Cannot determine provider/full name.")
             _prefixed_name_cache[cost_key] = None
             return None
    # --- End Pre-calculation ---

    # Use a simple placeholder text based on length for cost estimation
    sample_text = "a" * expected_input_length
    required_capabilities = required_capabilities or []

    # Rough estimate for output length if not provided
    if expected_output_length is None:
        # Adjust this heuristic as needed (e.g., summarization shortens, generation might lengthen)
        estimated_output_length_chars = expected_input_length // 4
    else:
         estimated_output_length_chars = expected_output_length
    # Estimate max_tokens based on character length (very rough)
    estimated_max_tokens = estimated_output_length_chars // 3

    candidate_models_data = []
    excluded_models_reasons = {}
    all_cost_keys = list(COST_PER_MILLION_TOKENS.keys())

    async def evaluate_model(cost_key: str):
        # 1. Find prefixed name
        prefixed_model_name = _get_prefixed_name_for_cost_key(cost_key)
        if not prefixed_model_name:
             excluded_models_reasons[cost_key] = "Could not reliably determine provider/full name for metadata lookup."
             return

        # 2. Check capabilities
        capabilities = model_capabilities.get(prefixed_model_name, [])
        missing_caps = [cap for cap in required_capabilities if cap not in capabilities]
        if missing_caps:
            excluded_models_reasons[prefixed_model_name] = f"Missing required capabilities: {missing_caps}"
            return

        # 3. Estimate cost
        try:
            cost_estimate = await estimate_cost(
                prompt=sample_text,
                model=cost_key, # Use the key from COST_PER_MILLION_TOKENS
                max_tokens=estimated_max_tokens,
                include_output=True
            )
            estimated_cost_value = cost_estimate["cost"]
        except ToolError as e:
            excluded_models_reasons[prefixed_model_name] = f"Cost estimation failed: {e.detail}"
            return
        except Exception as e:
            logger.error(f"Unexpected error estimating cost for {cost_key} (prefixed: {prefixed_model_name}) in recommendation: {e}", exc_info=True)
            excluded_models_reasons[prefixed_model_name] = f"Cost estimation failed unexpectedly: {str(e)}"
            return

        # 4. Check max cost constraint
        if max_cost is not None and estimated_cost_value > max_cost:
            excluded_models_reasons[prefixed_model_name] = f"Exceeds max cost (${estimated_cost_value:.6f} > ${max_cost:.6f})"
            return

        # --- 5. Get Measured Speed (Tokens/Second) ---
        measured_tps = 0.0 # Default to 0.0 if no data
        speed_source = "unavailable"

        measured_data = measured_speeds.get(prefixed_model_name) or measured_speeds.get(cost_key)

        if measured_data and isinstance(measured_data, dict) and "error" not in measured_data:
            tokens_per_sec = measured_data.get("output_tokens_per_second")
            if tokens_per_sec is not None and isinstance(tokens_per_sec, (int, float)) and tokens_per_sec >= 0:
                measured_tps = float(tokens_per_sec)
                speed_source = f"measured ({measured_tps:.1f} t/s)"
            else:
                 speed_source = "no t/s in measurement"
        elif measured_data and "error" in measured_data:
                 speed_source = "measurement error"

        logger.debug(f"Speed for {prefixed_model_name}: {measured_tps:.1f} t/s (Source: {speed_source})")
        # --- End Get Measured Speed ---

        # 6. Gather data for scoring
        candidate_models_data.append({
            "model": prefixed_model_name,
            "cost_key": cost_key,
            "cost": estimated_cost_value,
            "quality": model_quality.get(prefixed_model_name, 5),
            "measured_speed_tps": measured_tps, # Store raw TPS
            "capabilities": capabilities,
            "speed_source": speed_source # Store source for potential debugging/output
        })

    # Evaluate all models
    await asyncio.gather(*(evaluate_model(key) for key in all_cost_keys))

    # --- Scoring Logic (Updated for raw TPS) ---
    def calculate_score(model_data, min_cost, cost_range, min_tps, tps_range):
        cost = model_data['cost']
        quality = model_data['quality']
        measured_tps = model_data['measured_speed_tps']

        # Normalize cost (1 is cheapest, 0 is most expensive)
        norm_cost_score = 1.0 - ((cost - min_cost) / cost_range) if cost_range > 0 else 1.0

        # Normalize quality (scale 1-10)
        norm_quality_score = quality / 10.0

        # Normalize speed (measured TPS - higher is better)
        # (1 is fastest, 0 is slowest/0)
        norm_speed_score_tps = (measured_tps - min_tps) / tps_range if tps_range > 0 else 0.0

        # Calculate final score based on priority
        if priority == "cost":
            # Lower weight for speed if using TPS, as cost is main driver
            score = norm_cost_score * 0.7 + norm_quality_score * 0.2 + norm_speed_score_tps * 0.1
        elif priority == "quality":
            score = norm_cost_score * 0.15 + norm_quality_score * 0.7 + norm_speed_score_tps * 0.15
        elif priority == "speed":
            score = norm_cost_score * 0.1 + norm_quality_score * 0.2 + norm_speed_score_tps * 0.7
        else: # balanced
            score = norm_cost_score * 0.34 + norm_quality_score * 0.33 + norm_speed_score_tps * 0.33

        return score
    # --- End Scoring Logic ---

    # Calculate scores for all candidates
    if not candidate_models_data:
        logger.warning("No candidate models found after filtering.")
    else:
        # Get min/max for normalization *before* scoring loop
        all_costs = [m['cost'] for m in candidate_models_data if m['cost'] > 0]
        min_cost = min(all_costs) if all_costs else 0.000001
        max_cost_found = max(all_costs) if all_costs else 0.000001
        cost_range = max_cost_found - min_cost

        all_tps = [m['measured_speed_tps'] for m in candidate_models_data]
        min_tps = min(all_tps) if all_tps else 0.0
        max_tps_found = max(all_tps) if all_tps else 0.0
        tps_range = max_tps_found - min_tps

        for model_data in candidate_models_data:
            # Pass normalization ranges to scoring function
            model_data['score'] = calculate_score(model_data, min_cost, cost_range, min_tps, tps_range)

    # Sort candidates by score (highest first)
    sorted_candidates = sorted(candidate_models_data, key=lambda x: x.get('score', 0), reverse=True)

    # Format recommendations
    recommendations_list = []
    if candidate_models_data:
        # Get min/max across candidates *after* filtering
        min_candidate_cost = min(m['cost'] for m in candidate_models_data)
        max_candidate_quality = max(m['quality'] for m in candidate_models_data)
        max_candidate_tps = max(m['measured_speed_tps'] for m in candidate_models_data)

        for cand in sorted_candidates:
            reason = f"High overall score ({cand['score']:.2f}) according to '{priority}' priority."
            # Adjust reason phrasing for TPS
            if priority == 'cost' and cand['cost'] <= min_candidate_cost:
                reason = f"Lowest estimated cost (${cand['cost']:.6f}) and meets requirements."
            elif priority == 'quality' and cand['quality'] >= max_candidate_quality:
                 reason = f"Highest quality score ({cand['quality']}/10) and meets requirements."
            elif priority == 'speed' and cand['measured_speed_tps'] >= max_candidate_tps:
                 reason = f"Fastest measured speed ({cand['measured_speed_tps']:.1f} t/s) and meets requirements."

            recommendations_list.append({
                "model": cand['model'],
                "estimated_cost": cand['cost'],
                "quality_score": cand['quality'],
                "measured_speed_tps": cand['measured_speed_tps'], # Add raw TPS
                "capabilities": cand['capabilities'],
                "reason": reason
            })

    logger.info(f"Recommended models (priority: {priority}): {[r['model'] for r in recommendations_list]}")
    return {
        "recommendations": recommendations_list,
        "parameters": { # Include input parameters for context
             "task_type": task_type,
             "expected_input_length": expected_input_length,
             "expected_output_length": estimated_output_length_chars,
             "required_capabilities": required_capabilities,
             "max_cost": max_cost,
             "priority": priority
         },
        "excluded_models": excluded_models_reasons
    }

@with_tool_metrics
@with_error_handling
async def execute_optimized_workflow(
    documents: Optional[List[str]] = None, # Make documents optional, workflow might not need them
    workflow: List[Dict[str, Any]] = None, # Require workflow definition
    max_concurrency: int = 5
) -> Dict[str, Any]:
    """Executes a predefined workflow consisting of multiple tool calls.

    Processes a list of documents (optional) through a sequence of stages defined in the workflow.
    Handles dependencies between stages (output of one stage as input to another) and allows
    for concurrent execution of independent stages or document processing within stages.

    Args:
        documents: (Optional) A list of input document strings. Required if the workflow references
                   'documents' as input for any stage.
        workflow: A list of dictionaries, where each dictionary defines a stage (a tool call).
                  Required keys per stage:
                  - `stage_id`: A unique identifier for this stage (e.g., "summarize_chunks").
                  - `tool_name`: The name of the tool function to call (e.g., "summarize_document").
                  - `params`: A dictionary of parameters to pass to the tool function.
                     Parameter values can be literal values (strings, numbers, lists) or references
                     to outputs from previous stages using the format `"${stage_id}.output_key"`
                     (e.g., `{"text": "${chunk_stage}.chunks"}`).
                     Special inputs: `"${documents}"` refers to the input `documents` list.
                  Optional keys per stage:
                  - `depends_on`: A list of `stage_id`s that must complete before this stage starts.
                  - `iterate_on`: The key from a previous stage's output list over which this stage
                                  should iterate (e.g., `"${chunk_stage}.chunks"`). The tool will be
                                  called once for each item in the list.
                  - `optimization_hints`: (Future use) Hints for model selection or cost saving for this stage.
        max_concurrency: (Optional) The maximum number of concurrent tasks (tool calls) to run.
                         Defaults to 5.

    Returns:
        A dictionary containing the results of all successful workflow stages:
        {
            "success": true,
            "results": {
                "chunk_stage": { "output": { "chunks": ["chunk1...", "chunk2..."] } },
                "summarize_chunks": { # Example of an iterated stage
                     "output": [
                         { "summary": "Summary of chunk 1..." },
                         { "summary": "Summary of chunk 2..." }
                     ]
                },
                "final_summary": { "output": { "summary": "Overall summary..." } }
            },
            "status": "Workflow completed successfully.",
            "total_processing_time": 15.8
        }
        or an error dictionary if the workflow fails:
        {
            "success": false,
            "results": { ... }, # Results up to the point of failure
            "status": "Workflow failed at stage 'stage_id'.",
            "error": "Error details from the failed stage...",
            "total_processing_time": 8.2
        }

    Raises:
        ToolInputError: If the workflow definition is invalid (missing keys, bad references,
                        circular dependencies - basic checks).
        ToolError: If a tool call within the workflow fails.
        Exception: For unexpected errors during workflow execution.
    """
    start_time = time.time()
    if not workflow or not isinstance(workflow, list):
        raise ToolInputError("'workflow' must be a non-empty list of stage dictionaries.")

    # --- Tool Mapping --- (Dynamically import or map tool names to functions)
    # Ensure all tools listed in workflows are mapped here correctly.
    
    try:
        api_meta_tool = None # Placeholder - this needs to be the actual instance
        
        if api_meta_tool: # Only add if instance is available
             meta_api_tools = {
                 "register_api": api_meta_tool.register_api,
                 "list_registered_apis": api_meta_tool.list_registered_apis,
                 "get_api_details": api_meta_tool.get_api_details,
                 "unregister_api": api_meta_tool.unregister_api,
                 "call_dynamic_tool": api_meta_tool.call_dynamic_tool,
                 "refresh_api": api_meta_tool.refresh_api,
                 "get_tool_details": api_meta_tool.get_tool_details,
                 "list_available_tools": api_meta_tool.list_available_tools,
             }
        else:
            logger.warning("APIMetaTool instance not available in execute_optimized_workflow. Meta API tools will not be callable in workflows.")
            meta_api_tools = {}
    except ImportError:
        logger.warning("APIMetaTool not found (meta_api_tool.py). Meta API tools cannot be used in workflows.")
        meta_api_tools = {}
        
    # Import extract_entity_graph lazily to avoid circular imports
    try:
        from ultimate_mcp_server.tools.entity_relation_graph import extract_entity_graph
    except ImportError:
        logger.warning("entity_relation_graph module not found. extract_entity_graph will not be available in workflows.")
        extract_entity_graph = None
        
    tool_functions = {
        # Core Gateway Tools
        "estimate_cost": estimate_cost,
        "compare_models": compare_models,
        "recommend_model": recommend_model,
        "chat_completion": chat_completion,
        "chunk_document": chunk_document,
        "summarize_document": summarize_document,
        "extract_json": extract_json,
        # Add extract_entity_graph conditionally
        **({"extract_entity_graph": extract_entity_graph} if extract_entity_graph else {}),
        # RAG Tools
        "create_knowledge_base": create_knowledge_base,
        "add_documents": add_documents,
        "retrieve_context": retrieve_context,
        "generate_with_rag": generate_with_rag,
        # Classification tools
        "text_classification": text_classification,
        
        # Merge Meta API tools
        **meta_api_tools,
        
        # Add other tools as needed...
    }

    # --- Advanced Workflow Validation Using NetworkX ---
    # Build directed graph from workflow
    dag = nx.DiGraph()
    
    # Add all stages as nodes
    for i, stage in enumerate(workflow):
        # Validate required keys
        if not all(k in stage for k in ["stage_id", "tool_name", "params"]):
            raise ToolInputError(f"Workflow stage {i} missing required keys (stage_id, tool_name, params).")
        
        stage_id = stage["stage_id"]
        
        # Validate params is a dictionary
        if not isinstance(stage["params"], dict):
            raise ToolInputError(f"Stage '{stage_id}' params must be a dictionary.")
        
        # Check for duplicate stage IDs
        if stage_id in dag:
            raise ToolInputError(f"Duplicate stage_id found: '{stage_id}'.")
        
        # Validate tool exists
        tool_name = stage["tool_name"]
        if tool_name not in tool_functions:
            raise ToolInputError(f"Unknown tool '{tool_name}' specified in stage '{stage_id}'.")
        
        # Validate depends_on is a list
        depends_on = stage.get("depends_on", [])
        if not isinstance(depends_on, list):
            raise ToolInputError(f"Stage '{stage_id}' depends_on must be a list.")
        
        # Add node with full stage data
        dag.add_node(stage_id, stage=stage)
    
    # Add dependency edges
    for stage in workflow:
        stage_id = stage["stage_id"]
        depends_on = stage.get("depends_on", [])
        
        for dep_id in depends_on:
            if dep_id not in dag:
                raise ToolInputError(f"Stage '{stage_id}' depends on non-existent stage '{dep_id}'.")
            dag.add_edge(dep_id, stage_id)
    
    # Detect circular dependencies
    try:
        cycles = list(nx.simple_cycles(dag))
        if cycles:
            cycle_str = " -> ".join(cycles[0]) + " -> " + cycles[0][0]
            raise ToolInputError(f"Circular dependency detected in workflow: {cycle_str}")
    except nx.NetworkXNoCycle:
        # No cycles found, this is good
        pass
    
    # Dictionary to store results of each stage
    stage_results: Dict[str, Any] = {}
    # Set to keep track of completed stages
    completed_stages: Set[str] = set()
    # Dictionary to hold active tasks
    active_tasks: Dict[str, asyncio.Task] = {}  # noqa: F841
    # Semaphore to control concurrency
    concurrency_semaphore = asyncio.Semaphore(max_concurrency)
    
    # --- Workflow Execution Logic with NetworkX ---
    async def execute_stage(stage_id: str) -> None:
        """Execute a single workflow stage."""
        async with concurrency_semaphore:
            # Get stage definition
            stage = dag.nodes[stage_id]["stage"]
            tool_name = stage["tool_name"]
            params = stage["params"]
            iterate_on_ref = stage.get("iterate_on")
            
            logger.info(f"Starting workflow stage '{stage_id}' (Tool: {tool_name})")
            
            tool_func = tool_functions[tool_name]
            
            try:
                # Resolve parameters and handle iteration
                resolved_params, is_iteration, iteration_list = _resolve_params(
                    stage_id, params, iterate_on_ref, stage_results, documents
                )
                
                # Execute tool function(s)
                if is_iteration:
                    # Handle iteration case
                    iteration_tasks = []
                    
                    for i, item in enumerate(iteration_list):
                        # Create a new semaphore release for each iteration to allow other stages to run
                        # while keeping track of total concurrency
                        async def run_iteration(item_idx, item_value):
                            async with concurrency_semaphore:
                                iter_params = _inject_iteration_item(resolved_params, item_value)
                                try:
                                    result = await tool_func(**iter_params)
                                    return result
                                except Exception as e:
                                    # Capture exception details for individual iteration
                                    error_msg = f"Iteration {item_idx} failed: {type(e).__name__}: {str(e)}"
                                    logger.error(error_msg, exc_info=True)
                                    raise  # Re-raise to be caught by gather
                        
                        task = asyncio.create_task(run_iteration(i, item))
                        iteration_tasks.append(task)
                    
                    # Gather all iteration results (may raise if any iteration fails)
                    results = await asyncio.gather(*iteration_tasks)
                    stage_results[stage_id] = {"output": results}
                else:
                    # Single execution case
                    result = await tool_func(**resolved_params)
                    stage_results[stage_id] = {"output": result}
                
                # Mark stage as completed
                completed_stages.add(stage_id)
                logger.info(f"Workflow stage '{stage_id}' completed successfully")
                
            except Exception as e:
                error_msg = f"Workflow failed at stage '{stage_id}'. Error: {type(e).__name__}: {str(e)}"
                logger.error(error_msg, exc_info=True)
                stage_results[stage_id] = {
                    "error": error_msg,
                    "traceback": traceback.format_exc()
                }
                # Re-raise to signal failure to main execution loop
                raise
    
    async def execute_dag() -> Dict[str, Any]:
        """Execute the entire workflow DAG with proper dependency handling."""
        try:
            # Start with a topological sort to get execution order respecting dependencies
            try:
                execution_order = list(nx.topological_sort(dag))
                logger.debug(f"Workflow execution order (respecting dependencies): {execution_order}")
            except nx.NetworkXUnfeasible as e:
                # Should never happen as we already checked for cycles
                raise ToolInputError("Workflow contains circular dependencies that were not detected earlier.") from e
            
            # Process stages in waves of parallelizable tasks
            while len(completed_stages) < len(dag):
                # Find stages ready to execute (all dependencies satisfied)
                ready_stages = [
                    stage_id for stage_id in execution_order
                    if (stage_id not in completed_stages and 
                        all(pred in completed_stages for pred in dag.predecessors(stage_id)))
                ]
                
                if not ready_stages:
                    if len(completed_stages) < len(dag):
                        # This should never happen with a valid DAG that was topologically sorted
                        unfinished = set(execution_order) - completed_stages
                        logger.error(f"Workflow execution stalled. Unfinished stages: {unfinished}")
                        raise ToolError("Workflow execution stalled due to unresolvable dependencies.")
                    break
                
                # Launch tasks for all ready stages
                tasks = [execute_stage(stage_id) for stage_id in ready_stages]
                
                # Wait for all tasks to complete or for the first error
                try:
                    await asyncio.gather(*tasks)
                except Exception as e:
                    # Any stage failure will be caught here
                    # The specific error details are already in stage_results
                    logger.error(f"Workflow wave execution failed: {str(e)}")
                    
                    # Find the first failed stage for error reporting
                    failed_stage = next(
                        (s for s in ready_stages if s in stage_results and "error" in stage_results[s]),
                        ready_stages[0]  # Fallback if we can't identify the specific failed stage
                    )
                    
                    error_info = stage_results.get(failed_stage, {}).get("error", f"Unknown error in stage '{failed_stage}'")
                    
                    return {
                        "success": False,
                        "results": stage_results,
                        "status": f"Workflow failed at stage '{failed_stage}'.",
                        "error": error_info,
                        "total_processing_time": time.time() - start_time
                    }
                
                # If we reach here, all stages in this wave completed successfully
            
            # All stages completed successfully
            return {
                "success": True,
                "results": stage_results,
                "status": "Workflow completed successfully.",
                "total_processing_time": time.time() - start_time
            }
            
        except Exception as e:
            # Catch any unexpected errors in the main execution loop
            error_msg = f"Unexpected error in workflow execution: {type(e).__name__}: {str(e)}"
            logger.error(error_msg, exc_info=True)
            return {
                "success": False,
                "results": stage_results,
                "status": "Workflow failed with an unexpected error.",
                "error": error_msg,
                "total_processing_time": time.time() - start_time
            }
    
    # Execute the workflow DAG
    result = await execute_dag()
    
    total_time = time.time() - start_time
    if result["success"]:
        logger.info(f"Workflow completed successfully in {total_time:.2f}s")
    else:
        logger.error(f"Workflow failed after {total_time:.2f}s: {result.get('error', 'Unknown error')}")
    
    return result

# --- Helper functions for workflow execution --- 
# These need careful implementation for robustness

def _resolve_params(stage_id: str, params: Dict, iterate_on_ref: Optional[str], stage_results: Dict, documents: Optional[List[str]]) -> tuple[Dict, bool, Optional[List]]:
    """Resolves parameter values, handling references and iteration.
    Returns resolved_params, is_iteration, iteration_list.
    Raises ValueError on resolution errors.
    """
    resolved = {}
    is_iteration = False
    iteration_list = None
    iteration_param_name = None

    # Check for iteration first
    if iterate_on_ref:
         if not iterate_on_ref.startswith("${") or not iterate_on_ref.endswith("}"):
              raise ValueError(f"Invalid iterate_on reference format: '{iterate_on_ref}'")
         ref_key = iterate_on_ref[2:-1]
         
         if ref_key == "documents":
              if documents is None:
                   raise ValueError(f"Stage '{stage_id}' iterates on documents, but no documents were provided.")
              iteration_list = documents
         else:
              dep_stage_id, output_key = _parse_ref(ref_key)
              if dep_stage_id not in stage_results or "output" not in stage_results[dep_stage_id]:
                   raise ValueError(f"Dependency '{dep_stage_id}' for iteration not found or failed.")
              dep_output = stage_results[dep_stage_id]["output"]
              if not isinstance(dep_output, dict) or output_key not in dep_output:
                   raise ValueError(f"Output key '{output_key}' not found in dependency '{dep_stage_id}' for iteration.")
              iteration_list = dep_output[output_key]
              if not isinstance(iteration_list, list):
                  raise ValueError(f"Iteration target '{ref_key}' is not a list.")
         
         is_iteration = True
         # We still resolve other params, the iteration item is injected later
         logger.debug(f"Stage '{stage_id}' will iterate over {len(iteration_list)} items from '{iterate_on_ref}'")

    # Resolve individual parameters
    for key, value in params.items():
        if isinstance(value, str) and value.startswith("${") and value.endswith("}"):
            ref_key = value[2:-1]
            if ref_key == "documents":
                 if documents is None:
                      raise ValueError(f"Parameter '{key}' references documents, but no documents provided.")
                 resolved[key] = documents
            else:
                dep_stage_id, output_key = _parse_ref(ref_key)
                if dep_stage_id not in stage_results or "output" not in stage_results[dep_stage_id]:
                    raise ValueError(f"Dependency '{dep_stage_id}' for parameter '{key}' not found or failed.")
                dep_output = stage_results[dep_stage_id]["output"]
                # Handle potential nested keys in output_key later if needed
                if not isinstance(dep_output, dict) or output_key not in dep_output:
                    raise ValueError(f"Output key '{output_key}' not found in dependency '{dep_stage_id}' for parameter '{key}'. Available keys: {list(dep_output.keys()) if isinstance(dep_output, dict) else 'N/A'}")
                resolved[key] = dep_output[output_key]
                # If this resolved param is the one we iterate on, store its name
                if is_iteration and iterate_on_ref == value:
                     iteration_param_name = key
        else:
            resolved[key] = value # Literal value
            
    # Validation: If iterating, one parameter must match the iterate_on reference
    if is_iteration and iteration_param_name is None:
         # This means iterate_on pointed to something not used directly as a param value
         # We need a convention here, e.g., assume the tool takes a list or find the param name
         # For now, let's assume the tool expects the *list* if iterate_on isn't directly a param value.
         # This might need refinement based on tool behavior. A clearer workflow definition could help.
         # Alternative: Raise error if iterate_on target isn't explicitly mapped to a param. 
         # logger.warning(f"Iteration target '{iterate_on_ref}' not directly mapped to a parameter in stage '{stage_id}'. Tool must handle list input.")
         # Let's require the iteration target to be mapped for clarity:
          raise ValueError(f"Iteration target '{iterate_on_ref}' must correspond to a parameter value in stage '{stage_id}'.")

    # Remove the iteration parameter itself from the base resolved params if iterating
    # It will be injected per-item later
    if is_iteration and iteration_param_name in resolved:
        del resolved[iteration_param_name] 
        resolved["_iteration_param_name"] = iteration_param_name # Store the name for injection

    return resolved, is_iteration, iteration_list

def _parse_ref(ref_key: str) -> tuple[str, str]:
    """Parses a reference like 'stage_id.output_key'"""
    parts = ref_key.split('.', 1)
    if len(parts) != 2:
        raise ValueError(f"Invalid reference format: '{ref_key}'. Expected 'stage_id.output_key'.")
    return parts[0], parts[1]

def _inject_iteration_item(base_params: Dict, item: Any) -> Dict:
     """Injects the current iteration item into the parameter dict."""
     injected_params = base_params.copy()
     iter_param_name = injected_params.pop("_iteration_param_name", None)
     if iter_param_name:
          injected_params[iter_param_name] = item
     else:
          # This case should be prevented by validation in _resolve_params
          logger.error("Cannot inject iteration item: Iteration parameter name not found in resolved params.")
          # Handle error appropriately, maybe raise
     return injected_params

async def _gather_iteration_results(stage_id: str, tasks: List[asyncio.Task]) -> List[Any]:
     """Gathers results from iteration sub-tasks. Raises exception if any sub-task failed."""
     results = []
     try:
          raw_results = await asyncio.gather(*tasks)
          # Assume each task returns the direct output dictionary
          results = list(raw_results) # gather preserves order
          logger.debug(f"Iteration stage '{stage_id}' completed with {len(results)} results.")
          return results
     except Exception:
          # If any sub-task failed, gather will raise the first exception
          logger.error(f"Iteration stage '{stage_id}' failed: One or more sub-tasks raised an error.", exc_info=True)
          # Cancel any remaining tasks in this iteration group if needed (gather might do this)
          for task in tasks:
               if not task.done(): 
                   task.cancel()
          raise # Re-raise the exception to fail the main workflow stage
```

--------------------------------------------------------------------------------
/examples/research_workflow_demo.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
"""
Advanced Research Assistant Workflow Demo

This script demonstrates a realistic research workflow using the DAG-based 
workflow execution system. It processes research documents through multiple 
analysis stages and produces visualizations of the results.
"""
import asyncio
import os
import sys
from collections import namedtuple  # Import namedtuple

# Add the project root to path so we can import ultimate_mcp_server
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from rich.console import Console
from rich.layout import Layout
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.rule import Rule
from rich.syntax import Syntax
from rich.table import Table
from rich.tree import Tree

from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.tools.optimization import execute_optimized_workflow
from ultimate_mcp_server.utils import get_logger  # Import get_logger
from ultimate_mcp_server.utils.display import CostTracker  # Import CostTracker

# Initialize rich console
console = Console()

# Initialize logger here so it's available in main()
logger = get_logger("example.research_workflow")

# Create a simple structure for cost tracking from dict (tokens might be missing)
TrackableResult = namedtuple("TrackableResult", ["cost", "input_tokens", "output_tokens", "provider", "model", "processing_time"])

# Sample research documents
SAMPLE_DOCS = [
    """
    # The Impact of Climate Change on Coastal Communities: A Multi-Regional Analysis
    
    ## Abstract
    This comprehensive study examines the cascading effects of climate change on 570+ coastal cities globally, with projections extending to 2050. Using data from the IPCC AR6 report and economic models from the World Bank (2021), we identify adaptation costs exceeding $71 billion annually. The research incorporates satellite data from NASA's GRACE mission and economic vulnerability indices developed by Stern et al. (2019) to assess regional disparities.
    
    ## Vulnerable Regions and Economic Impact Assessment
    
    ### 1. Southeast Asia
    The Mekong Delta region, home to 17 million people, faces submersion threats to 38% of its landmass by 2050. Ho Chi Minh City has invested $1.42 billion in flood prevention infrastructure, while Bangkok's $2.3 billion flood management system remains partially implemented. The Asian Development Bank (ADB) estimates adaptation costs will reach $5.7 billion annually for Vietnam alone.
    
    ### 2. Pacific Islands
    Kiribati, Tuvalu, and the Marshall Islands face existential threats, with projected displacement of 25-35% of their populations by 2050 according to UN estimates. Australia's "Pacific Resilience Fund" ($2.1 billion) supports adaptation, but President Maamau of Kiribati has criticized its scope as "drastically insufficient." The 2022 Wellington Accords established migration pathways for climate refugees, though implementation remains fragmented.
    
    ### 3. North American Coastal Zones
    Miami-Dade County's $6 billion "Rising Above" initiative represents the largest municipal climate adaptation budget in North America. The U.S. Army Corps of Engineers projects that without intervention, coastal erosion will affect 31% of Florida's beaches by 2040. Economic models by Greenstone and Carleton (2020) indicate property devaluation between $15-27 billion in Florida alone.
    
    ## Adaptation Strategies and Cost-Benefit Analysis
    
    ### Infrastructure Hardening
    The Netherlands' Room for the River program ($2.6 billion) has demonstrated 300% ROI through prevented flood damage. Conversely, New Orleans' post-Katrina $14.5 billion levee system upgrades show more modest returns (130% ROI) due to maintenance requirements and subsidence issues highlighted by Professor Sarah Jenkins (MIT).
    
    ### Managed Retreat
    Indonesia's capital relocation from Jakarta to Borneo (est. cost $34 billion) represents the largest planned managed retreat globally. Smaller programs in Alaska (Newtok and Shishmaref villages) provide case studies with per-capita costs exceeding $380,000. Dr. Robert Chen's longitudinal studies show significant social cohesion challenges, with 47% of relocated communities reporting decreased quality-of-life metrics despite improved safety.
    
    ### Ecosystem-Based Approaches
    Vietnam's mangrove restoration initiative ($220 million) reduces storm surge impacts by 20-50% and provides $8-$20 million in annual aquaculture benefits. The Nature Conservancy's coral reef insurance programs in Mexico demonstrate innovative financing mechanisms while providing co-benefits for local tourism economies valued at $320 million annually.
    
    ## Cross-Disciplinary Implications
    
    Climate migration pathways identified by the UNHCR will increase urban population pressures in receiving cities, particularly in Manila, Dhaka, and Lagos. Healthcare systems in coastal regions report increasing cases of waterborne diseases (62% increase since 2010) and mental health challenges associated with displacement anxiety as documented by the WHO Southeast Asia regional office.
    
    ## References
    
    1. IPCC (2021). AR6 Climate Change 2021: Impacts, Adaptation and Vulnerability
    2. Stern, N., et al. (2019). Economic vulnerability indices for coastal communities
    3. Asian Development Bank. (2022). Southeast Asia Climate Adaptation Report
    4. Greenstone, M., & Carleton, T. (2020). Coastal property value projections 2020-2050
    5. Jenkins, S. (2022). Engineering limitations in climate adaptation infrastructure
    6. Chen, R. (2021). Social dimensions of community relocation programs
    7. World Health Organization. (2021). Climate change health vulnerability assessments
    """,
    
    """
    # Renewable Energy Transition: Economic Implications and Policy Frameworks
    
    ## Executive Summary
    This multi-phase analysis examines the economic transformation accompanying the global renewable energy transition, projecting the creation of 42.3 million new jobs by 2050 while identifying significant regional disparities and transition barriers. Drawing on data from 157 countries, this research provides comprehensive policy recommendations for equitable implementation paths.
    
    ## Methodological Framework
    
    Our modeling utilizes a modified integrated assessment model combining economic inputs from the International Energy Agency (IEA), IRENA's Renewable Jobs Tracker database, and McKinsey's Global Energy Perspective 2022. Labor market projections incorporate automation factors derived from Oxford Economics' Workforce Displacement Index, providing more nuanced job creation estimates than previous studies by Zhang et al. (2019).
    
    ## Employment Transformation Analysis by Sector
    
    ### Solar Photovoltaic Industry
    Employment projections indicate 18.7 million jobs by 2045, concentrated in manufacturing (32%), installation (41%), and operations/maintenance (27%). Regional distribution analysis reveals concerning inequities, with China capturing 41% of manufacturing roles while sub-Saharan Africa secures only 2.3% despite having 16% of global solar potential. The Skill Transferability Index suggests 73% of displaced fossil fuel workers could transition to solar with targeted 6-month reskilling programs.
    
    ### Wind Energy Sector
    Offshore wind development led by Ørsted, Vestas, and General Electric is projected to grow at 24% CAGR through 2035, creating 6.8 million jobs. Supply chain bottlenecks in rare earth elements (particularly neodymium and dysprosium) represent critical vulnerabilities, with 83% of processing controlled by three Chinese companies. Professor Tanaka's analysis suggests price volatilities of 120-350% are possible under geopolitical tensions.
    
    ### Energy Storage Revolution
    Recent lithium-ferro-phosphate (LFP) battery innovations by CATL have reduced implementation costs by 27% while boosting cycle life by 4,000 cycles. Grid-scale storage installations are projected to grow from 17GW (2022) to 220GW by 2035, employing 5.3 million in manufacturing and installation. The MIT Battery Initiative under Dr. Viswanathan has demonstrated promising alternative chemistries using earth-abundant materials that could further accelerate adoption if commercialized by 2025.
    
    ### Hydrogen Economy Emergence
    Green hydrogen production costs have declined from $5.70/kg in 2018 to $3.80/kg in 2023, with projected cost parity with natural gas achievable by 2028 according to BloombergNEF. The European Hydrogen Backbone initiative, requiring €43 billion in infrastructure investment, could generate 3.8 million jobs while reducing EU natural gas imports by 30%. Significant technological challenges remain in storage density and transport infrastructure, as highlighted in critical analyses by Professors Wilson and Leibreich.
    
    ## Transition Barriers and Regional Disparities
    
    ### Financial Constraints
    Developing economies face investment gaps of $730 billion annually according to the Climate Policy Initiative's 2022 report. The African Development Bank estimates that 72% of sub-Saharan African energy projects stall at the planning phase due to financing constraints despite IRRs exceeding 11.5%. Innovative financing mechanisms through the Global Climate Fund have mobilized only 23% of pledged capital as of Q1 2023.
    
    ### Policy Framework Effectiveness
    
    Cross-jurisdictional analysis of 87 renewable portfolio standards reveals three dominant policy approaches:
    
    1. **Carbon Pricing Mechanisms**: The EU ETS carbon price of €85/ton has driven 16.5% emissions reduction in the power sector, while Canada's escalating carbon price schedule ($170/ton by 2030) provides investment certainty. Econometric modeling by Dr. Elizabeth Warren (LSE) indicates prices must reach €120/ton to fully internalize climate externalities.
    
    2. **Direct Subsidies**: Germany's Energiewende subsidies (€238 billion cumulative) achieved 44% renewable penetration but at high consumer costs. Targeted manufacturing incentives under the U.S. Inflation Reduction Act demonstrate improved cost-efficiency with 3.2x private capital mobilization according to analysis by Resources for the Future.
    
    3. **Phased Transition Approaches**: Denmark's offshore wind cluster development model produced the highest success metrics in our analysis, reducing LCOE by 67% while creating domestic supply chains capturing 82% of economic value. This approach has been partially replicated in Taiwan and Vietnam with similar success indicators.
    
    ## Visualized Outcomes Under Various Scenarios
    
    Under an accelerated transition (consistent with 1.5°C warming), global GDP would increase by 2.4% beyond baseline by 2050, while air pollution-related healthcare costs would decline by $780 billion annually. Conversely, our "delayed action" scenario projects stranded fossil assets exceeding $14 trillion, concentrated in 8 petrostate economies, potentially triggering financial contagion comparable to 2008.
    
    ## References
    
    1. International Energy Agency. (2022). World Energy Outlook 2022
    2. IRENA. (2023). Renewable Energy Jobs Annual Review
    3. McKinsey & Company. (2022). Global Energy Perspective
    4. Zhang, F., et al. (2019). Employment impacts of renewable expansion
    5. Oxford Economics. (2021). Workforce Displacement Index
    6. Tanaka, K. (2022). Critical material supply chains in energy transition
    7. Viswanathan, V. (2023). Next-generation grid-scale storage technologies
    8. BloombergNEF. (2023). Hydrogen Economy Outlook
    9. Climate Policy Initiative. (2022). Global Landscape of Climate Finance
    10. Warren, E. (2022). Carbon pricing efficiency and distributional impacts
    11. Resources for the Future. (2023). IRA Impact Assessment
    """,
    
    """
    # Artificial Intelligence Applications in Healthcare Diagnostics: Implementation Challenges and Economic Analysis
    
    ## Abstract
    This comprehensive evaluation examines the integration of artificial intelligence into clinical diagnostic workflows, with particular focus on deep learning systems demonstrating 94.2% accuracy in early-stage cancer detection across 14 cancer types. The analysis spans technical validation, implementation barriers, regulatory frameworks, and economic implications based on data from 137 healthcare systems across 42 countries.
    
    ## Technological Capabilities Assessment
    
    ### Diagnostic Performance Metrics
    
    Google Health's melanoma detection algorithm demonstrated sensitivity of 95.3% and specificity of 92.7% in prospective trials, exceeding dermatologist accuracy by 18 percentage points with consistent performance across Fitzpatrick skin types I-VI. This represents significant improvement over earlier algorithms criticized for performance disparities across demographic groups as documented by Dr. Abigail Johnson in JAMA Dermatology (2021).
    
    The Mayo Clinic's AI-enhanced colonoscopy system increased adenoma detection rates from 30% to 47% in their 2022 clinical implementation study (n=3,812). This translates to approximately 68 prevented colorectal cancer cases per 1,000 screened patients according to the predictive model developed by Dr. Singh at Memorial Sloan Kettering.
    
    Stanford Medicine's deep learning algorithm for chest radiograph interpretation identified 14 pathological conditions with average AUC of 0.91, reducing false negative rates for subtle pneumothorax by 43% and pulmonary nodules by 29% in their multi-center validation study across five hospital systems with diverse patient populations.
    
    ### Architectural Innovations
    
    Recent advancements in foundation models have transformed medical AI capabilities:
    
    1. **Multi-modal integration**: Microsoft/Nuance's DAX system combines speech recognition, natural language processing, and computer vision, enabling real-time clinical documentation with 96.4% accuracy while reducing physician documentation time by 78 minutes daily according to their 16-site implementation study published in Health Affairs.
    
    2. **Explainable AI approaches**: PathAI's interpretable convolutional neural networks provide visualization of decision-making factors in histopathology, addressing the "black box" concern highlighted by regulatory agencies. Their GradCAM implementation allows pathologists to review the specific cellular features informing algorithmic conclusions, increasing adoption willingness by 67% in surveyed practitioners (n=245).
    
    3. **Federated learning**: The MELLODDY consortium's federated approach enables algorithm training across 10 pharmaceutical companies' proprietary datasets without data sharing, demonstrating how privacy-preserving computation can accelerate biomarker discovery. This approach increased available training data by 720% while maintaining data sovereignty.
    
    ## Implementation Challenges
    
    ### Clinical Workflow Integration
    
    Field studies at Massachusetts General Hospital identified five critical integration failure points that reduce AI effectiveness by 30-70% compared to validation performance:
    
    1. Alert fatigue – 52% of clinical recommendations were dismissed when AI systems generated more than 8 alerts per hour
    2. Workflow disruption – Systems requiring more than 15 seconds of additional process time saw 68% lower adoption
    3. Interface design issues – Poorly designed UI elements reduced effective utilization by 47%
    4. Confirmation bias – Clinicians were 3.4× more likely to accept AI suggestions matching their preliminary conclusion
    5. Trust calibration – 64% of clinicians struggled to appropriately weight algorithmic recommendations against their clinical judgment
    
    The Cleveland Clinic's "AI Integration Framework" addresses these challenges through graduated autonomy, contextual presentation, and embedded calibration metrics, increasing sustained adoption rates to 84% compared to the industry average of 31%.
    
    ### Data Infrastructure Requirements
    
    Analysis of implementation failures reveals data architecture as the primary barrier in 68% of stalled healthcare AI initiatives. Specific challenges include:
    
    - Legacy system integration – 73% of U.S. hospitals utilize EHR systems with insufficient API capabilities for real-time AI integration
    - Data standardization – Only 12% of clinical data meets FHIR standards without requiring significant transformation
    - Computational infrastructure – 57% of healthcare systems lack edge computing capabilities necessary for low-latency applications
    
    Kaiser Permanente's successful enterprise-wide implementation demonstrates a viable pathway through their "data fabric" architecture connecting 39 hospitals while maintaining HIPAA compliance. Their staged implementation required $43 million in infrastructure investment but delivered $126 million in annual efficiency gains by year three.
    
    ### Training Requirements for Medical Personnel
    
    Harvard Medical School's "Technology Integration in Medicine" study identified critical competency gaps among practitioners:
    
    - Only 17% of physicians could correctly interpret AI-generated confidence intervals
    - 73% overestimated algorithm capabilities in transfer scenarios
    - 81% lacked understanding of common algorithmic biases
    
    The American Medical Association's AI curriculum module has demonstrated 82% improvement in AI literacy metrics but has reached only a fraction of practitioners. Training economics present a significant barrier, with health systems reporting that comprehensive AI education requires 18-24 hours per clinician at an average opportunity cost of $5,800.
    
    ## Economic and Policy Dimensions
    
    ### Cost-Benefit Model
    
    Our economic modeling based on Medicare claims data projects net healthcare savings of $36.7 billion annually when AI diagnostic systems reach 65% market penetration. These savings derive from:
    
    - Earlier cancer detection: $14.3 billion through stage migration
    - Reduced diagnostic errors: $9.8 billion in avoided misdiagnosis costs
    - Workflow efficiency: $6.2 billion in provider time optimization
    - Avoided unnecessary procedures: $6.4 billion by reducing false positives
    
    Implementation costs average $175,000-$390,000 per facility with 3.1-year average payback periods. Rural and critical access hospitals face disproportionately longer ROI timelines (5.7 years), exacerbating healthcare disparities.
    
    ### Regulatory Framework Analysis
    
    Comparative analysis of regulatory approaches across jurisdictions reveals critical inconsistencies:
    
    | Jurisdiction | Approval Pathway | Post-Market Requirements | Algorithm Update Handling |
    |--------------|------------------|--------------------------|---------------------------|
    | FDA (US) | 510(k)/De Novo | Limited continuous monitoring | Predetermined change protocol |
    | EMA (EU) | MDR risk-based | PMCF with periodic reporting | Significant modification framework |
    | PMDA (Japan) | SAKIGAKE pathway | Mandatory registry participation | Version control system |
    | NMPA (China) | Special approval | Real-world data collection | Annual recertification |
    
    The European Medical Device Regulation's requirement for "human oversight of automated systems" creates implementation ambiguities interpreted differently across member states. The FDA's proposed "Predetermined Change Control Plan" offers the most promising framework for AI's iterative improvement nature but remains in draft status.
    
    ## Conclusions and Future Directions
    
    AI diagnosis systems demonstrate significant technical capabilities but face complex implementation barriers that transcend technological challenges. Our analysis suggests a "sociotechnical systems approach" is essential, recognizing that successful implementation depends equally on technical performance, clinical workflow integration, organizational change management, and policy frameworks.
    
    The Cleveland Clinic-Mayo Clinic consortium's phased implementation approach, beginning with augmentative rather than autonomous functionality, provides a template for successful adoption. Their experience indicates that progressive automation on a 3-5 year timeline produces superior outcomes compared to transformative implementation approaches.
    
    ## References
    
    1. Johnson, A. (2021). Demographic performance disparities in dermatological AI. JAMA Dermatology, 157(2)
    2. Mayo Clinic. (2022). AI-enhanced colonoscopy outcomes study. Journal of Gastrointestinal Endoscopy, 95(3)
    3. Singh, K. (2021). Predictive modeling of prevented colorectal cancer cases. NEJM, 384
    4. Stanford Medicine. (2022). Multi-center validation of deep learning for radiograph interpretation. Radiology, 302(1)
    5. Nuance Communications. (2023). DAX system implementation outcomes. Health Affairs, 42(1)
    6. PathAI. (2022). Pathologist adoption of explainable AI systems. Modern Pathology, 35
    7. MELLODDY Consortium. (2022). Federated learning for pharmaceutical research. Nature Machine Intelligence, 4
    8. Massachusetts General Hospital. (2021). Clinical workflow integration failure points for AI. JAMIA, 28(9)
    9. Cleveland Clinic. (2023). AI Integration Framework outcomes. Healthcare Innovation, 11(2)
    10. American Medical Association. (2022). Physician AI literacy assessment. Journal of Medical Education, 97(6)
    11. Centers for Medicare & Medicaid Services. (2023). Healthcare AI economic impact analysis
    12. FDA. (2023). Proposed framework for AI/ML-based SaMD. Regulatory Science Forum
    """,
    
    """
    # Quantum Computing Applications in Pharmaceutical Discovery: Capabilities, Limitations, and Industry Transformation
    
    ## Executive Summary
    
    This analysis evaluates the integration of quantum computing technologies into pharmaceutical R&D workflows, examining current capabilities, near-term applications, and long-term industry transformation potential. Based on benchmarking across 17 pharmaceutical companies and 8 quantum technology providers, we provide a comprehensive assessment of this emerging computational paradigm and its implications for drug discovery economics.
    
    ## Current Quantum Computing Capabilities
    
    ### Hardware Platforms Assessment
    
    **Superconducting quantum processors** (IBM, Google, Rigetti) currently provide the most mature platform with IBM's 433-qubit Osprey system demonstrating quantum volume of 128 and error rates approaching 10^-3 per gate operation. While impressive relative to 2018 benchmarks, these systems remain limited by coherence times (averaging 114 microseconds) and require operating temperatures near absolute zero, creating significant infrastructure requirements.
    
    **Trapped-ion quantum computers** (IonQ, Quantinuum) offer superior coherence times exceeding 10 seconds and all-to-all connectivity but operate at slower gate speeds. IonQ's 32-qubit system achieved algorithmic qubits (#AQ) of 20, setting a record for effective computational capability when error mitigation is considered. Quantinuum's H-Series demonstrated the first logical qubit with real-time quantum error correction, a significant milestone towards fault-tolerant quantum computing.
    
    **Photonic quantum systems** (Xanadu, PsiQuantum) represent an alternative approach with potentially simpler scaling requirements. Xanadu's Borealis processor demonstrated quantum advantage for specific sampling problems but lacks the gate-based universality required for most pharmaceutical applications. PsiQuantum's fault-tolerant silicon photonic approach continues rapid development with semiconductor manufacturing partner GlobalFoundries but remains pre-commercial.
    
    **Neutral atom platforms** (QuEra, Pasqal) entered commercial accessibility in 2023, offering unprecedented qubit counts (QuEra: 256 atoms) with programmable geometries particularly suited for quantum simulation of molecular systems. Recent demonstrations of 3D atom arrangements provide promising avenues for simulating protein-ligand interactions.
    
    ### Quantum Algorithm Development
    
    Pharmaceutical applications currently focus on three quantum algorithm classes:
    
    1. **Variational Quantum Eigensolver (VQE)** algorithms have progressed significantly for molecular ground state energy calculations, with Riverlane's enhanced VQE implementations demonstrating accuracy within 1.5 kcal/mol for molecules up to 20 atoms on IBM's 127-qubit processors. Merck's collaboration with Zapata Computing improved convergence rates by 300% through adaptive ansatz methods.
    
    2. **Quantum Machine Learning (QML)** approaches for binding affinity prediction have shown mixed results. Pfizer's implementation of quantum convolutional neural networks (QCNN) demonstrated a 22% improvement in binding affinity predictions for their kinase inhibitor library, while AstraZeneca's quantum support vector machine approach showed no significant advantage over classical methods for their dataset.
    
    3. **Quantum Annealing** for conformational search remains dominated by D-Wave's 5,000+ qubit systems, with Boehringer Ingelheim reporting successful applications in peptide folding predictions. However, comparisons with enhanced classical methods (particularly those using modern GPUs) show quantum advantage remains elusive for most production cases.
    
    ## Pharmaceutical Applications Landscape
    
    ### Virtual Screening Transformation
    
    GSK's quantum computing team achieved a significant milestone in 2022 through quantum-classical hybrid algorithms that accelerated screening of 10^7 compounds against novel SARS-CoV-2 targets. Their approach used classical computers for initial filtering followed by quantum evaluation of 10^4 promising candidates, identifying 12 compounds with nanomolar binding affinities subsequently confirmed by experimental assays. While impressive, the computational requirements exceeded $1.2M and required specialized expertise from partners at Quantinuum.
    
    ### Molecular Property Prediction
    
    Roche's collaboration with Cambridge Quantum Computing (now Quantinuum) demonstrated quantum advantage for dipole moment calculations in drug-like molecules, achieving accuracy improvements of 16% compared to density functional theory methods while potentially offering asymptotic speedup as molecule size increases. Their hybrid quantum-classical approach requires significantly fewer qubits than full quantum simulation, making it commercially relevant within the NISQ (Noisy Intermediate-Scale Quantum) era of hardware.
    
    ### Retrosynthesis Planning
    
    Quantum approaches to synthetic route planning remain largely theoretical with limited experimental validation. MIT-Takeda research demonstrated proof-of-concept for mapping retrosynthesis to quantum walks on Johnson graphs, with preliminary results showing promise for identifying non-obvious synthetic pathways. Commercial application appears distant (5-8 years) given current hardware limitations.
    
    ## Economic Implications Analysis
    
    Our economic model quantifies four significant impacts on pharmaceutical R&D:
    
    1. **Preclinical timeline compression**: Currently estimated at 2-5% (0.5-1.3 months) but projected to reach 15-30% by 2030 as quantum hardware capabilities expand, potentially reducing time-to-market by up to 9 months for novel compounds
    
    2. **Candidate quality improvements**: Quantum-enhanced binding affinity and ADMET property predictions demonstrate 7-18% higher success rates in early clinical phases across our analysis of 87 compounds that utilized quantum computational methods in preclinical development
    
    3. **Novel mechanism identification**: Quantum simulation of previously intractable biological targets (particularly intrinsically disordered proteins and complex protein-protein interactions) could expand the druggable proteome by an estimated 8-14% according to our analysis of protein data bank targets
    
    4. **R&D productivity impacts**: A 10% improvement in candidate quality translates to approximately $310M in reduced clinical development costs per approved drug by reducing late-stage failures
    
    ## Investment and Adoption Patterns
    
    Pharmaceutical quantum computing investment has accelerated dramatically, with cumulative industry investment growing from $18M (2018) to $597M (2023). Investment strategies fall into three categories:
    
    1. **Direct infrastructure investment** (Roche, Merck): Building internal quantum teams and securing dedicated quantum hardware access
    
    2. **Collaborative research partnerships** (GSK, Biogen, Novartis): Forming multi-year academic and commercial partnerships focused on specific computational challenges
    
    3. **Quantum-as-a-service utilization** (Majority approach): Accessing quantum capabilities through cloud providers with limited internal expertise development
    
    Our analysis of 23 pharmaceutical companies indicates:
    - 19% have established dedicated quantum computing teams
    - 43% have active research collaborations with quantum providers
    - 78% report evaluating quantum capabilities for specific workflows
    - 100% express concerns about quantum talent acquisition challenges
    
    ## Future Outlook and Strategic Recommendations
    
    The pharmaceutical quantum computing landscape will evolve through three distinct phases:
    
    **Near-term (1-3 years)**: Hybrid quantum-classical algorithms will demonstrate incremental value in specific niches, particularly molecular property calculations and conformational analysis of small to medium-sized molecules. Successful organizations will combine quantum capabilities with enhanced classical methods rather than seeking immediate quantum advantage.
    
    **Mid-term (3-7 years)**: Error-corrected logical qubits will enable more robust quantum chemistry applications with demonstrable advantage for drug discovery workflows. Companies with established quantum capabilities will gain first-mover advantages in applying these technologies to proprietary chemical matter.
    
    **Long-term (7+ years)**: Fault-tolerant quantum computers with thousands of logical qubits could transform pharmaceutical R&D by enabling full quantum mechanical simulation of protein-drug interactions and previously intractable biological systems. This capability could fundamentally alter drug discovery economics by dramatically reducing empirical screening requirements.
    
    ## References
    
    1. IBM Quantum. (2023). Osprey processor technical specifications and benchmarking
    2. IonQ. (2023). Algorithmic qubit benchmarking methodology and results
    3. Quantinuum. (2022). H-Series logical qubit demonstration
    4. Xanadu. (2022). Borealis quantum advantage results. Nature Physics, 18
    5. QuEra. (2023). Neutral atom quantum processor capabilities. Science, 377
    6. Riverlane & Merck. (2022). Enhanced VQE implementations for molecular ground state calculations
    7. Pfizer Quantum Team. (2023). QCNN for binding affinity prediction. Journal of Chemical Information and Modeling
    8. AstraZeneca. (2022). Comparative analysis of quantum and classical ML methods
    9. Boehringer Ingelheim. (2023). Quantum annealing for peptide conformational search
    10. GSK Quantum Computing Team. (2022). Quantum-classical hybrid screening against SARS-CoV-2
    11. Roche & Cambridge Quantum Computing. (2023). Quantum advantage for dipole moment calculations
    12. MIT-Takeda Quantum Research. (2022). Mapping retrosynthesis to quantum walks
    13. PhRMA Quantum Computing Working Group. (2023). Pharmaceutical R&D impact analysis
    """,
    
    """
    # Neuroplasticity in Cognitive Rehabilitation: Mechanisms, Interventions, and Clinical Applications
    
    ## Abstract
    
    This multidisciplinary review synthesizes current understanding of neuroplasticity mechanisms underlying cognitive rehabilitation, evaluating intervention efficacies across five domains of cognitive function following acquired brain injury. Integrating data from 142 clinical studies with advanced neuroimaging findings, we present evidence-based recommendations for clinical practice and identify promising emerging approaches.
    
    ## Neurobiological Foundations of Rehabilitation-Induced Plasticity
    
    ### Cellular and Molecular Mechanisms
    
    Recent advances in understanding activity-dependent plasticity have revolutionized rehabilitation approaches. The pioneering work of Dr. Alvarez-Buylla at UCSF has demonstrated that even the adult human brain maintains neurogenic capabilities in the hippocampus and subventricular zone, with newly generated neurons integrating into existing neural circuits following injury. Transcriptomic studies by Zhang et al. (2021) identified 37 genes significantly upregulated during rehabilitation-induced recovery, with brain-derived neurotrophic factor (BDNF) and insulin-like growth factor-1 (IGF-1) showing particularly strong associations with positive outcomes.
    
    Post-injury plasticity occurs through multiple complementary mechanisms:
    
    1. **Synaptic remodeling**: Two-photon microscopy studies in animal models reveal extensive dendritic spine turnover within peri-lesional cortex during the first 3-8 weeks post-injury. The pioneering work of Professor Li-Huei Tsai demonstrates that enriched rehabilitation environments increase spine formation rates by 47-68% compared to standard housing conditions.
    
    2. **Network reorganization**: Professor Nicholas Schiff's research at Weill Cornell demonstrates that dormant neural pathways can be functionally recruited following injury through targeted stimulation. Their multimodal imaging studies identified specific thalamocortical circuits that, when engaged through non-invasive stimulation, facilitated motor recovery in 72% of chronic stroke patients previously classified as "plateaued."
    
    3. **Myelination dynamics**: Recent discoveries by Dr. Fields at NIH demonstrate activity-dependent myelination as a previously unrecognized form of neuroplasticity. Diffusion tensor imaging studies by Wang et al. (2022) show significant increases in white matter integrity following intensive cognitive training, correlating with functional improvements (r=0.62, p<0.001).
    
    ### Neuroimaging Correlates of Successful Rehabilitation
    
    Longitudinal multimodal neuroimaging studies have identified several biomarkers of successful cognitive rehabilitation:
    
    - **Functional connectivity reorganization**: Using resting-state fMRI, Northoff's laboratory documented that successful attention training in 67 TBI patients correlated with increased connectivity between the dorsolateral prefrontal cortex and posterior parietal regions (change in z-score: 0.43 ± 0.12), while unsuccessful cases showed no significant connectivity changes.
    
    - **Cortical thickness preservation**: Dr. Gabrieli's team at MIT found that cognitive rehabilitation initiated within 30 days of injury preserved cortical thickness in vulnerable regions, with each week of delay associated with 0.8% additional atrophy in domain-relevant cortical regions.
    
    - **Default mode network modulation**: Advanced network analyses by Dr. Marcus Raichle demonstrate that cognitive rehabilitation success correlates with restoration of appropriate task-related deactivation of the default mode network, suggesting intervention effectiveness can be monitored through this biomarker.
    
    ## Evidence-Based Intervention Analysis
    
    ### Attention and Executive Function Rehabilitation
    
    Our meta-analysis of 42 randomized controlled trials evaluating attention training programs reveals three intervention approaches with significant effect sizes:
    
    1. **Adaptive computerized training** (Hedges' g = 0.68, 95% CI: 0.54-0.82): Programs like Attention Process Training showed transfer to untrained measures when training adapts in real-time to performance. The NYU-Columbia adaptive attention protocol demonstrated maintenance of gains at 18-month follow-up (retention rate: 83%).
    
    2. **Metacognitive strategy training** (Hedges' g = 0.57, 95% CI: 0.41-0.73): The Toronto Hospital's Strategic Training for Executive Control program resulted in significant improvements on ecological measures of executive function. Moderator analyses indicate effectiveness increases when combined with daily strategy implementation exercises (interaction effect: p=0.002).
    
    3. **Neurostimulation-enhanced approaches**: Combined tDCS-cognitive training protocols developed at Harvard demonstrate 37% greater improvement compared to cognitive training alone. Targeting the right inferior frontal gyrus with 2mA anodal stimulation during inhibitory control training shows particular promise for impulsivity reduction (Cohen's d = 0.74).
    
    ### Memory Rehabilitation Approaches
    
    Memory intervention effectiveness varies substantially by memory system affected and etiology:
    
    - **Episodic memory**: For medial temporal lobe damage, compensatory approaches using spaced retrieval and errorless learning demonstrate the strongest evidence. Dr. Schacter's laboratory protocol combining elaborative encoding with distributed practice shows a remarkable 247% improvement in functional memory measures compared to intensive rehearsal techniques.
    
    - **Prospective memory**: Implementation intention protocols developed by Professor Gollwitzer show transfer to daily functioning with large effect sizes (d = 0.92) when combined with environmental restructuring. Smartphone-based reminder systems increased medication adherence by 43% in our 12-month community implementation study.
    
    - **Working memory**: Recent controversy surrounding n-back training was addressed in Professor Klingberg's definitive multi-site study demonstrating domain-specific transfer effects. Their adaptive protocol produced sustainable working memory improvements (40% above baseline at 6-month follow-up) when training exceeded 20 hours and incorporated gradually increasing interference control demands.
    
    ## Clinical Application Framework
    
    ### Precision Rehabilitation Medicine Approach
    
    Our analysis indicates rehabilitation effectiveness increases substantially when protocols are tailored using a precision medicine framework:
    
    1. **Comprehensive neurocognitive phenotyping**: The McGill Cognitive Rehabilitation Battery enables identification of specific processing deficits, allowing intervention targeting. Machine learning analysis of 1,247 patient profiles identified 11 distinct neurocognitive phenotypes that respond differentially to specific interventions.
    
    2. **Biomarker-guided protocol selection**: EEG connectivity measures predicted response to attention training with 76% accuracy in our validation cohort, potentially reducing non-response rates. Professor Knight's laboratory demonstrated that P300 latency specifically predicts processing speed training response (AUC = 0.81).
    
    3. **Adaptive progression algorithms**: Real-time difficulty adjustment based on multiple performance parameters rather than accuracy alone increased transfer effects by 34% compared to standard adaptive approaches. The computational model developed by Stanford's Poldrack laboratory dynamically optimizes challenge levels to maintain engagement while maximizing error-based learning.
    
    ### Implementation Science Considerations
    
    Our implementation analysis across 24 rehabilitation facilities identified critical factors for successful cognitive rehabilitation programs:
    
    - **Rehabilitation intensity and timing**: Early intervention (< 6 weeks post-injury) with high intensity (minimum 15 hours/week of direct treatment) demonstrated superior outcomes (NNT = 3.2 for clinically significant improvement).
    
    - **Therapist expertise effects**: Specialized certification in cognitive rehabilitation was associated with 28% larger treatment effects compared to general rehabilitation credentials.
    
    - **Technology augmentation**: Hybrid models combining therapist-directed sessions with home-based digital practice demonstrated optimal cost-effectiveness (ICER = $12,430/QALY) while addressing access barriers.
    
    ## Future Directions and Emerging Approaches
    
    Several innovative approaches show promise for enhancing neuroplasticity during cognitive rehabilitation:
    
    1. **Closed-loop neurostimulation**: Dr. Suthana's team at UCLA demonstrated that theta-burst stimulation delivered precisely during specific phases of hippocampal activity enhanced associative memory formation by 37% in patients with mild cognitive impairment.
    
    2. **Pharmacologically augmented rehabilitation**: The RESTORE trial combining daily atomoxetine with executive function training demonstrated synergistic effects (interaction p<0.001) compared to either intervention alone. Professor Feeney's research suggests a critical 30-minute window where noradrenergic enhancement specifically promotes task-relevant plasticity.
    
    3. **Virtual reality cognitive training**: Immersive VR protocols developed at ETH Zurich demonstrated transfer to real-world functioning by simulating ecologically relevant scenarios with graduated difficulty. Their randomized trial showed 3.2× greater functional improvement compared to matched non-immersive training.
    
    4. **Sleep optimization protocols**: The Northwestern sleep-enhanced memory consolidation protocol increased rehabilitation effectiveness by 41% by delivering targeted memory reactivation during slow-wave sleep, suggesting rehabilitation schedules should specifically incorporate sleep architecture considerations.
    
    ## Conclusion
    
    Cognitive rehabilitation effectiveness has improved substantially through integration of neuroplasticity principles, advanced technology, and precision intervention approaches. Optimal outcomes occur when interventions target specific neurocognitive mechanisms with sufficient intensity and are tailored to individual patient profiles. Emerging approaches leveraging closed-loop neurotechnology and multimodal enhancement strategies represent promising directions for further advancing rehabilitation outcomes.
    
    ## References
    
    1. Alvarez-Buylla, A., & Lim, D. A. (2022). Neurogenesis in the adult human brain following injury
    2. Zhang, Y., et al. (2021). Transcriptomic analysis of rehabilitation-responsive genes
    3. Tsai, L. H., et al. (2023). Environmental enrichment effects on dendritic spine dynamics
    4. Schiff, N. D. (2022). Recruitment of dormant neural pathways following brain injury
    5. Fields, R. D. (2021). Activity-dependent myelination as a form of neuroplasticity
    6. Wang, X., et al. (2022). White matter integrity changes following cognitive training
    7. Northoff, G., et al. (2023). Functional connectivity reorganization during attention training
    8. Gabrieli, J. D., et al. (2021). Relationship between intervention timing and cortical preservation
    9. Raichle, M. E. (2022). Default mode network dynamics as a biomarker of rehabilitation efficacy
    10. NYU-Columbia Collaborative. (2023). Adaptive attention protocol long-term outcomes
    11. Schacter, D. L., et al. (2021). Elaborative encoding with distributed practice for episodic memory
    12. Gollwitzer, P. M., & Oettingen, G. (2022). Implementation intentions for prospective memory
    13. Klingberg, T., et al. (2023). Multi-site study of adaptive working memory training
    14. Poldrack, R. A., et al. (2022). Computational models for optimizing learning parameters
    15. Suthana, N., et al. (2023). Phase-specific closed-loop stimulation for memory enhancement
    16. Feeney, D. M., & Sutton, R. L. (2022). Pharmacological enhancement of rehabilitation
    17. ETH Zurich Rehabilitation Engineering Group. (2023). Virtual reality cognitive training
    18. Northwestern Memory & Cognition Laboratory. (2022). Sleep-enhanced memory consolidation
    """
]

async def display_workflow_diagram(workflow):
    """Display a visual representation of the workflow DAG."""
    console.print("\n[bold cyan]Workflow Execution Plan[/bold cyan]")
    
    # Create a tree representation of the workflow
    tree = Tree("[bold yellow]Research Analysis Workflow[/bold yellow]")
    
    # Track dependencies for visualization
    dependencies = {}
    for stage in workflow:
        stage_id = stage["stage_id"]
        deps = stage.get("depends_on", [])
        for dep in deps:
            if dep not in dependencies:
                dependencies[dep] = []
            dependencies[dep].append(stage_id)
    
    # Add stages without dependencies first (roots)
    root_stages = [s for s in workflow if not s.get("depends_on")]
    stage_map = {s["stage_id"]: s for s in workflow}
    
    def add_stage_to_tree(parent_tree, stage_id):
        stage = stage_map[stage_id]
        tool = stage["tool_name"]
        node_text = f"[bold green]{stage_id}[/bold green] ([cyan]{tool}[/cyan])"
        
        if "iterate_on" in stage:
            node_text += " [italic](iterative)[/italic]"
            
        stage_node = parent_tree.add(node_text)
        
        # Add children (stages that depend on this one)
        children = dependencies.get(stage_id, [])
        for child in children:
            add_stage_to_tree(stage_node, child)
    
    # Build the tree
    for root in root_stages:
        add_stage_to_tree(tree, root["stage_id"])
    
    # Print the tree
    console.print(tree)
    
    # Display additional workflow statistics
    table = Table(title="Workflow Statistics")
    table.add_column("Metric", style="cyan")
    table.add_column("Value", style="green")
    
    table.add_row("Total Stages", str(len(workflow)))
    table.add_row("Parallel Stages", str(len(root_stages)))
    table.add_row("Iterative Stages", str(sum(1 for s in workflow if "iterate_on" in s)))
    
    console.print(table)

async def display_execution_progress(workflow_future):
    """Display a live progress indicator while the workflow executes."""
    with Progress(
        SpinnerColumn(),
        TextColumn("[bold blue]{task.description}"),
        console=console
    ) as progress:
        task = progress.add_task("[yellow]Executing workflow...", total=None)
        result = await workflow_future
        progress.update(task, completed=True, description="[green]Workflow completed!")
        return result

async def visualize_results(results):
    """Create visualizations of the workflow results."""
    console.print("\n[bold magenta]Research Analysis Results[/bold magenta]")
    
    # Set up layout
    layout = Layout()
    layout.split_column(
        Layout(name="header"),
        Layout(name="statistics"),
        Layout(name="summaries"),
        Layout(name="extracted_entities"),
    )
    
    # Header
    layout["header"].update(Panel(
        "[bold]Advanced Research Assistant Results[/bold]",
        style="blue"
    ))
    
    # Statistics
    stats_table = Table(title="Document Processing Statistics")
    stats_table.add_column("Document", style="cyan")
    stats_table.add_column("Word Count", style="green")
    stats_table.add_column("Entity Count", style="yellow")
    
    try:
        chunking_result = results["results"]["chunking_stage"]["output"]
        entity_results = results["results"]["entity_extraction_stage"]["output"]
        
        for i, doc_stats in enumerate(chunking_result.get("document_stats", [])):
            entity_count = len(entity_results[i].get("entities", []))
            stats_table.add_row(
                f"Document {i+1}", 
                str(doc_stats.get("word_count", "N/A")),
                str(entity_count)
            )
    except (KeyError, IndexError) as e:
        console.print(f"[red]Error displaying statistics: {e}[/red]")
    
    layout["statistics"].update(stats_table)
    
    # Summaries
    summary_panels = []
    try:
        summaries = results["results"]["summary_stage"]["output"]
        for i, summary in enumerate(summaries):
            summary_panels.append(Panel(
                summary.get("summary", "No summary available"),
                title=f"Document {i+1} Summary",
                border_style="green"
            ))
    except (KeyError, IndexError) as e:
        summary_panels.append(Panel(
            f"Error retrieving summaries: {e}",
            title="Summary Error",
            border_style="red"
        ))
    
    layout["summaries"].update(summary_panels)
    
    # Extracted entities
    try:
        final_analysis = results["results"]["final_analysis_stage"]["output"]
        json_str = Syntax(
            str(final_analysis.get("analysis", "No analysis available")),
            "json",
            theme="monokai",
            line_numbers=True
        )
        layout["extracted_entities"].update(Panel(
            json_str,
            title="Final Analysis",
            border_style="magenta"
        ))
    except (KeyError, IndexError) as e:
        layout["extracted_entities"].update(Panel(
            f"Error retrieving final analysis: {e}",
            title="Analysis Error",
            border_style="red"
        ))
    
    # Print layout
    console.print(layout)
    
    # Display execution time
    console.print(
        f"\n[bold green]Total workflow execution time:[/bold green] "
        f"{results.get('total_processing_time', 0):.2f} seconds"
    )

def create_research_workflow():
    """Define a complex research workflow with multiple parallel and sequential stages."""
    workflow = [
        # Initial document processing stages (run in parallel for all documents)
        {
            "stage_id": "chunking_stage",
            "tool_name": "chunk_document",
            "params": {
                "text": "${documents}",
                "chunk_size": 1000,
                "get_stats": True
            }
        },
        
        # Entity extraction runs in parallel with summarization
        {
            "stage_id": "entity_extraction_stage",
            "tool_name": "extract_entity_graph",
            "params": {
                "text": "${documents}",
                "entity_types": ["organization", "person", "concept", "location", "technology"],
                "include_relations": True,
                "confidence_threshold": 0.7
            }
        },
        
        # Summarization stage (iterate over each document)
        {
            "stage_id": "summary_stage",
            "tool_name": "summarize_document",
            "params": {
                "text": "${documents}",
                "max_length": 150,
                "focus_on": "key findings and implications"
            }
        },
        
        # Classification of document topics
        {
            "stage_id": "classification_stage",
            "tool_name": "text_classification",
            "depends_on": ["chunking_stage"],
            "params": {
                "text": "${chunking_stage.document_text}",
                "categories": [
                    "Climate & Environment", 
                    "Technology", 
                    "Healthcare", 
                    "Economy", 
                    "Social Policy",
                    "Scientific Research"
                ],
                "provider": Provider.OPENAI.value,
                "multi_label": True,
                "confidence_threshold": 0.6
            }
        },
        
        # Generate structured insights from entity analysis
        {
            "stage_id": "entity_insights_stage",
            "tool_name": "extract_json",
            "depends_on": ["entity_extraction_stage"],
            "params": {
                "text": "${entity_extraction_stage.text_output}",
                "schema": {
                    "key_entities": "array",
                    "primary_relationships": "array",
                    "research_domains": "array"
                },
                "include_reasoning": True
            }
        },
        
        # Cost-optimized final analysis
        {
            "stage_id": "model_selection_stage",
            "tool_name": "recommend_model",
            "depends_on": ["summary_stage", "classification_stage", "entity_insights_stage"],
            "params": {
                "task_type": "complex analysis and synthesis",
                "expected_input_length": 3000,
                "expected_output_length": 1000,
                "required_capabilities": ["reasoning", "knowledge"],
                "priority": "balanced"
            }
        },
        
        # Final analysis and synthesis
        {
            "stage_id": "final_analysis_stage",
            "tool_name": "chat_completion",
            "depends_on": ["model_selection_stage", "summary_stage", "classification_stage", "entity_insights_stage"],
            "params": {
                "messages": [
                    {
                        "role": "system",
                        "content": "You are a research assistant synthesizing information from multiple documents."
                    },
                    {
                        "role": "user",
                        "content": "Analyze the following research summaries, classifications, and entity insights. Provide a comprehensive analysis that identifies cross-document patterns, contradictions, and key insights. Format the response as structured JSON.\n\nSummaries: ${summary_stage.summary}\n\nClassifications: ${classification_stage.classifications}\n\nEntity Insights: ${entity_insights_stage.content}"
                    }
                ],
                "model": "${model_selection_stage.recommendations[0].model}",
                "response_format": {"type": "json_object"}
            }
        }
    ]
    
    return workflow

async def main():
    """Run the complete research assistant workflow demo."""
    console.print(Rule("[bold magenta]Advanced Research Workflow Demo[/bold magenta]"))
    tracker = CostTracker() # Instantiate tracker

    try:
        # Display header
        console.print(Panel.fit(
            "[bold cyan]Advanced Research Assistant Workflow Demo[/bold cyan]\n"
            "Powered by NetworkX DAG-based Workflow Engine",
            title="Ultimate MCP Server",
            border_style="green"
        ))
        
        # Create the workflow definition
        workflow = create_research_workflow()
        
        # Visualize the workflow before execution
        await display_workflow_diagram(workflow)
        
        # Prompt user to continue
        console.print("\n[yellow]Press Enter to execute the workflow...[/yellow]", end="")
        input()
        
        # Execute workflow with progress display
        workflow_future = execute_optimized_workflow(
            documents=SAMPLE_DOCS,
            workflow=workflow,
            max_concurrency=3
        )
        
        results = await display_execution_progress(workflow_future)
        
        # Track cost if possible
        if results and isinstance(results, dict) and "cost" in results:
             try:
                total_cost = results.get("cost", {}).get("total_cost", 0.0)
                processing_time = results.get("total_processing_time", 0.0)
                # Provider/Model is ambiguous here, use a placeholder
                trackable = TrackableResult(
                    cost=total_cost,
                    input_tokens=0, # Not aggregated
                    output_tokens=0, # Not aggregated
                    provider="workflow",
                    model="research_workflow",
                    processing_time=processing_time
                )
                tracker.add_call(trackable)
             except Exception as track_err:
                logger.warning(f"Could not track workflow cost: {track_err}", exc_info=False)

        if results:
            console.print(Rule("[bold green]Workflow Execution Completed[/bold green]"))
            await visualize_results(results.get("outputs", {}))
        else:
            console.print("[bold red]Workflow execution failed or timed out.[/bold red]")

    except Exception as e:
        console.print(f"[bold red]An unexpected error occurred:[/bold red] {e}")
    
    # Display cost summary
    tracker.display_summary(console)

if __name__ == "__main__":
    asyncio.run(main()) 
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/services/vector/vector_service.py:
--------------------------------------------------------------------------------

```python
"""Vector database service for semantic search."""
import asyncio
import json
import time
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

from ultimate_mcp_server.services.vector.embeddings import get_embedding_service
from ultimate_mcp_server.utils import get_logger

logger = get_logger(__name__)

# Try to import chromadb
try:
    import chromadb
    from chromadb.config import Settings as ChromaSettings
    CHROMADB_AVAILABLE = True
    logger.info("ChromaDB imported successfully", extra={"emoji_key": "success"})
except ImportError as e:
    logger.warning(f"ChromaDB not available: {str(e)}", extra={"emoji_key": "warning"})
    CHROMADB_AVAILABLE = False

# Try to import hnswlib, but don't fail if not available
try:
    import hnswlib
    HNSWLIB_AVAILABLE = True
    HNSW_INDEX = hnswlib.Index
except ImportError:
    HNSWLIB_AVAILABLE = False
    HNSW_INDEX = None


class VectorCollection:
    """A collection of vectors with metadata."""
    
    def __init__(
        self,
        name: str,
        dimension: int = 1536,
        similarity_metric: str = "cosine",
        metadata: Optional[Dict[str, Any]] = None
    ):
        """Initialize a vector collection.
        
        Args:
            name: Collection name
            dimension: Vector dimension
            similarity_metric: Similarity metric (cosine, dot, or euclidean)
            metadata: Optional metadata for the collection
        """
        self.name = name
        self.dimension = dimension
        self.similarity_metric = similarity_metric
        self.metadata = metadata or {}
        
        # Initialize storage
        self.vectors = []
        self.ids = []
        self.metadatas = []
        
        # Create embedding service
        self.embedding_service = get_embedding_service()
        
        # Initialize search index
        self._init_search_index()
        
        logger.info(
            f"Vector collection '{name}' created ({dimension} dimensions)",
            extra={"emoji_key": "vector"}
        )
        
    def _init_search_index(self):
        """Initialize search index based on available libraries."""
        self.index_type = "numpy"  # Fallback
        self.index = None
        
        # Try to use HNSW for fast search if available
        if HNSWLIB_AVAILABLE:
            try:
                self.index = HNSW_INDEX(space=self._get_hnswlib_space(), dim=self.dimension)
                self.index.init_index(max_elements=1000, ef_construction=200, M=16)
                self.index.set_ef(50)  # Search accuracy parameter
                self.index_type = "hnswlib"
                logger.debug(
                    f"Using HNSW index for collection '{self.name}'",
                    emoji_key="vector"
                )
            except Exception as e:
                logger.warning(
                    f"Failed to initialize HNSW index: {str(e)}. Falling back to numpy.",
                    emoji_key="warning"
                )
                self.index = None
    
    def _get_hnswlib_space(self) -> str:
        """Get HNSW space based on similarity metric.
        
        Returns:
            HNSW space name
        """
        if self.similarity_metric == "cosine":
            return "cosine"
        elif self.similarity_metric == "dot":
            return "ip"  # Inner product
        elif self.similarity_metric == "euclidean":
            return "l2"
        else:
            return "cosine"  # Default
    
    def add(
        self,
        vectors: Union[List[List[float]], np.ndarray],
        ids: Optional[List[str]] = None,
        metadatas: Optional[List[Dict[str, Any]]] = None
    ) -> List[str]:
        """Add vectors to the collection.
        
        Args:
            vectors: Vectors to add
            ids: Optional IDs for the vectors (generated if not provided)
            metadatas: Optional metadata for each vector
            
        Returns:
            List of vector IDs
        """
        # Ensure vectors is a numpy array
        if not isinstance(vectors, np.ndarray):
            vectors = np.array(vectors, dtype=np.float32)
        
        # Generate IDs if not provided
        if ids is None:
            ids = [str(uuid.uuid4()) for _ in range(len(vectors))]
        
        # Ensure metadatas is a list of dicts
        if metadatas is None:
            metadatas = [{} for _ in range(len(vectors))]
        
        # Add to storage
        for _i, (vector, id, metadata) in enumerate(zip(vectors, ids, metadatas, strict=False)):
            self.vectors.append(vector)
            self.ids.append(id)
            self.metadatas.append(metadata)
        
        # Update index if using HNSW
        if self.index_type == "hnswlib" and self.index is not None:
            try:
                # Resize index if needed
                if len(self.vectors) > self.index.get_max_elements():
                    new_size = max(1000, len(self.vectors) * 2)
                    self.index.resize_index(new_size)
                
                # Add vectors to index
                start_idx = len(self.vectors) - len(vectors)
                for i, vector in enumerate(vectors):
                    self.index.add_items(vector, start_idx + i)
            except Exception as e:
                logger.error(
                    f"Failed to update HNSW index: {str(e)}",
                    emoji_key="error"
                )
                # Rebuild index
                self._rebuild_index()
        
        logger.debug(
            f"Added {len(vectors)} vectors to collection '{self.name}'",
            emoji_key="vector"
        )
        
        return ids
    
    def _rebuild_index(self):
        """Rebuild the search index from scratch."""
        if not HNSWLIB_AVAILABLE or not self.vectors:
            return
            
        try:
            # Re-initialize index
            self.index = HNSW_INDEX(space=self._get_hnswlib_space(), dim=self.dimension)
            self.index.init_index(max_elements=max(1000, len(self.vectors) * 2), ef_construction=200, M=16)
            self.index.set_ef(50)
            
            # Add all vectors
            vectors_array = np.array(self.vectors, dtype=np.float32)
            self.index.add_items(vectors_array, np.arange(len(self.vectors)))
            
            logger.info(
                f"Rebuilt HNSW index for collection '{self.name}'",
                emoji_key="vector"
            )
        except Exception as e:
            logger.error(
                f"Failed to rebuild HNSW index: {str(e)}",
                emoji_key="error"
            )
            self.index = None
            self.index_type = "numpy"
    
    def search(
        self,
        query_vector: Union[List[float], np.ndarray],
        top_k: int = 5,
        filter: Optional[Dict[str, Any]] = None,
        similarity_threshold: float = 0.0
    ) -> List[Dict[str, Any]]:
        """Search for similar vectors.
        
        Args:
            query_vector: Query vector
            top_k: Number of results to return
            filter: Optional metadata filter
            similarity_threshold: Minimum similarity score (0.0 to 1.0)
            
        Returns:
            List of results with scores and metadata
        """
        # Ensure query_vector is a numpy array
        if not isinstance(query_vector, np.ndarray):
            query_vector = np.array(query_vector, dtype=np.float32)
        
        # Log some diagnostic information
        logger.debug(f"Collection '{self.name}' contains {len(self.vectors)} vectors")
        logger.debug(f"Searching for top {top_k} matches with filter: {filter} and threshold: {similarity_threshold}")
        
        # Filter vectors based on metadata if needed
        if filter:
            filtered_indices = self._apply_filter(filter)
            if not filtered_indices:
                logger.debug(f"No vectors match the filter criteria: {filter}")
                return []
            logger.debug(f"Filter reduced search space to {len(filtered_indices)} vectors")
        else:
            filtered_indices = list(range(len(self.vectors)))
            logger.debug(f"No filter applied, searching all {len(filtered_indices)} vectors")
        
        # If no vectors to search, return empty results
        if not filtered_indices:
            logger.debug("No vectors to search, returning empty results")
            return []
        
        # Perform search based on index type
        if self.index_type == "hnswlib" and self.index is not None and not filter:
            # Use HNSW for fast search (only if no filter)
            try:
                start_time = time.time()
                labels, distances = self.index.knn_query(query_vector, k=min(top_k, len(self.vectors)))
                search_time = time.time() - start_time
                
                # Convert distances to similarities based on metric
                if self.similarity_metric == "cosine" or self.similarity_metric == "dot":
                    similarities = 1.0 - distances[0]  # Convert distance to similarity
                else:
                    similarities = 1.0 / (1.0 + distances[0])  # Convert distance to similarity
                
                # Format results
                results = []
                for _i, (label, similarity) in enumerate(zip(labels[0], similarities, strict=False)):
                    # Apply similarity threshold
                    if similarity < similarity_threshold:
                        continue
                        
                    results.append({
                        "id": self.ids[label],
                        "similarity": float(similarity),
                        "metadata": self.metadatas[label],
                        "vector": self.vectors[label].tolist(),
                    })
                
                logger.debug(
                    f"HNSW search completed in {search_time:.6f}s, found {len(results)} results"
                )
                
                for i, result in enumerate(results):
                    logger.debug(f"Result {i+1}: id={result['id']}, similarity={result['similarity']:.4f}, metadata={result['metadata']}")
                
                return results
            except Exception as e:
                logger.error(
                    f"HNSW search failed: {str(e)}. Falling back to numpy.",
                    emoji_key="error"
                )
                # Fall back to numpy search
        
        # Numpy-based search (slower but always works)
        start_time = time.time()
        
        # Calculate similarities
        results = []
        for idx in filtered_indices:
            vector = self.vectors[idx]
            
            # Calculate similarity based on metric
            if self.similarity_metric == "cosine":
                similarity = cosine_similarity(query_vector, vector)
            elif self.similarity_metric == "dot":
                similarity = np.dot(query_vector, vector)
            elif self.similarity_metric == "euclidean":
                similarity = 1.0 / (1.0 + np.linalg.norm(query_vector - vector))
            else:
                similarity = cosine_similarity(query_vector, vector)
            
            # Apply similarity threshold
            if similarity < similarity_threshold:
                continue
                
            results.append({
                "id": self.ids[idx],
                "similarity": float(similarity),
                "metadata": self.metadatas[idx],
                "vector": vector.tolist(),
            })
        
        # Sort by similarity (descending)
        results.sort(key=lambda x: x["similarity"], reverse=True)
        
        # Limit to top_k
        results = results[:top_k]
        
        search_time = time.time() - start_time
        logger.debug(
            f"Numpy search completed in {search_time:.6f}s, found {len(results)} results"
        )
        
        for i, result in enumerate(results):
            logger.debug(f"Result {i+1}: id={result['id']}, similarity={result['similarity']:.4f}, metadata={result['metadata']}")
        
        return results
    
    def _apply_filter(self, filter: Dict[str, Any]) -> List[int]:
        """Apply metadata filter to get matching indices.
        
        Args:
            filter: Metadata filter
            
        Returns:
            List of matching indices
        """
        filtered_indices = []
        for i, metadata in enumerate(self.metadatas):
            # Simple equality filter for now
            match = True
            for k, v in filter.items():
                if k not in metadata or metadata[k] != v:
                    match = False
                    break
            if match:
                filtered_indices.append(i)
        return filtered_indices
    
    async def search_by_text(
        self,
        query_text: str,
        top_k: int = 5,
        filter: Optional[Dict[str, Any]] = None,
        model: Optional[str] = None,
        similarity_threshold: float = 0.0
    ) -> List[Dict[str, Any]]:
        """Search by text query.
        
        Args:
            query_text: Text query
            top_k: Number of results to return
            filter: Optional metadata filter
            model: Embedding model name
            similarity_threshold: Minimum similarity score (0.0 to 1.0)
            
        Returns:
            List of results with scores and metadata
        """
        # Get query embedding - call create_embeddings with a list and get the first result
        query_embeddings = await self.embedding_service.create_embeddings(
            texts=[query_text], # Pass text as a list
            # model=model # create_embeddings uses the model set during service init
        )
        if not query_embeddings: # Handle potential empty result
            logger.error(f"Failed to generate embedding for query: {query_text}")
            return []
            
        query_embedding = query_embeddings[0] # Get the first (only) embedding
        
        # Search with the embedding
        return self.search(query_embedding, top_k, filter, similarity_threshold)
    
    def delete(
        self,
        ids: Optional[List[str]] = None,
        filter: Optional[Dict[str, Any]] = None
    ) -> int:
        """Delete vectors from the collection.
        
        Args:
            ids: IDs of vectors to delete
            filter: Metadata filter for vectors to delete
            
        Returns:
            Number of vectors deleted
        """
        if ids is None and filter is None:
            return 0
        
        # Get indices to delete
        indices_to_delete = set()
        
        # Add indices by ID
        if ids:
            for i, id in enumerate(self.ids):
                if id in ids:
                    indices_to_delete.add(i)
        
        # Add indices by filter
        if filter:
            filtered_indices = self._apply_filter(filter)
            indices_to_delete.update(filtered_indices)
        
        # Delete vectors (in reverse order to avoid index issues)
        indices_to_delete = sorted(indices_to_delete, reverse=True)
        for idx in indices_to_delete:
            del self.vectors[idx]
            del self.ids[idx]
            del self.metadatas[idx]
        
        # Rebuild index if using HNSW
        if self.index_type == "hnswlib" and self.index is not None:
            self._rebuild_index()
        
        logger.info(
            f"Deleted {len(indices_to_delete)} vectors from collection '{self.name}'",
            emoji_key="vector"
        )
        
        return len(indices_to_delete)
    
    def save(self, directory: Union[str, Path]) -> bool:
        """Save collection to disk.
        
        Args:
            directory: Directory to save to
            
        Returns:
            True if successful
        """
        directory = Path(directory)
        directory.mkdir(parents=True, exist_ok=True)
        
        try:
            # Save vectors
            vectors_array = np.array(self.vectors, dtype=np.float32)
            np.save(str(directory / "vectors.npy"), vectors_array)
            
            # Save IDs and metadata
            with open(directory / "data.json", "w") as f:
                json.dump({
                    "name": self.name,
                    "dimension": self.dimension,
                    "similarity_metric": self.similarity_metric,
                    "metadata": self.metadata,
                    "ids": self.ids,
                    "metadatas": self.metadatas,
                }, f)
            
            logger.info(
                f"Saved collection '{self.name}' to {directory}",
                emoji_key="vector"
            )
            return True
        except Exception as e:
            logger.error(
                f"Failed to save collection: {str(e)}",
                emoji_key="error"
            )
            return False
    
    @classmethod
    def load(cls, directory: Union[str, Path]) -> "VectorCollection":
        """Load collection from disk.
        
        Args:
            directory: Directory to load from
            
        Returns:
            Loaded collection
            
        Raises:
            FileNotFoundError: If collection files not found
            ValueError: If collection data is invalid
        """
        directory = Path(directory)
        
        # Check if files exist
        vectors_file = directory / "vectors.npy"
        data_file = directory / "data.json"
        
        if not vectors_file.exists() or not data_file.exists():
            raise FileNotFoundError(f"Collection files not found in {directory}")
        
        try:
            # Load vectors
            vectors_array = np.load(str(vectors_file))
            vectors = [vectors_array[i] for i in range(len(vectors_array))]
            
            # Load data
            with open(data_file, "r") as f:
                data = json.load(f)
            
            # Create collection
            collection = cls(
                name=data["name"],
                dimension=data["dimension"],
                similarity_metric=data["similarity_metric"],
                metadata=data["metadata"]
            )
            
            # Set data
            collection.ids = data["ids"]
            collection.metadatas = data["metadatas"]
            collection.vectors = vectors
            
            # Rebuild index
            collection._rebuild_index()
            
            logger.info(
                f"Loaded collection '{collection.name}' from {directory} ({len(vectors)} vectors)",
                emoji_key="vector"
            )
            
            return collection
        except Exception as e:
            logger.error(
                f"Failed to load collection: {str(e)}",
                emoji_key="error"
            )
            raise ValueError(f"Failed to load collection: {str(e)}") from e
    
    def get_stats(self) -> Dict[str, Any]:
        """Get collection statistics.
        
        Returns:
            Dictionary of statistics
        """
        return {
            "name": self.name,
            "dimension": self.dimension,
            "similarity_metric": self.similarity_metric,
            "vectors_count": len(self.vectors),
            "index_type": self.index_type,
            "metadata": self.metadata,
        }
    
    def clear(self) -> None:
        """Clear all vectors from the collection."""
        self.vectors = []
        self.ids = []
        self.metadatas = []
        
        # Reset index
        self._init_search_index()
        
        logger.info(
            f"Cleared collection '{self.name}'",
            emoji_key="vector"
        )

    async def query(
        self,
        query_texts: List[str],
        n_results: int = 10,
        where: Optional[Dict[str, Any]] = None,
        where_document: Optional[Dict[str, Any]] = None,
        include: Optional[List[str]] = None
    ) -> Dict[str, List[Any]]:
        """Query the collection with text queries (compatibility with ChromaDB).

        Args:
            query_texts: List of query texts
            n_results: Number of results to return
            where: Optional metadata filter
            where_document: Optional document content filter
            include: Optional list of fields to include

        Returns:
            Dictionary with results in ChromaDB format
        """
        logger.debug(f"DEBUG VectorCollection.query: query_texts={query_texts}, n_results={n_results}")
        logger.debug(f"DEBUG VectorCollection.query: where={where}, where_document={where_document}")
        logger.debug(f"DEBUG VectorCollection.query: include={include}")
        logger.debug(f"DEBUG VectorCollection.query: Collection has {len(self.vectors)} vectors and {len(self.ids)} IDs")

        # Initialize results
        results = {
            "ids": [],
            "documents": [],
            "metadatas": [],
            "distances": [],
            "embeddings": []
        }

        # Process each query
        for query_text in query_texts:
            # Get embedding using the async embedding service (which uses its configured model)
            logger.debug(f"DEBUG VectorCollection.query: Getting embedding for '{query_text}' using service model: {self.embedding_service.model_name}")
            try:
                query_embeddings_list = await self.embedding_service.create_embeddings([query_text])
                if not query_embeddings_list or not query_embeddings_list[0]:
                     logger.error(f"Failed to generate embedding for query: '{query_text[:50]}...'")
                     # Add empty results for this query and continue
                     results["ids"].append([])
                     results["documents"].append([])
                     results["metadatas"].append([])
                     results["distances"].append([])
                     if "embeddings" in (include or []):
                         results["embeddings"].append([])
                     continue # Skip to next query_text
                query_embedding = np.array(query_embeddings_list[0], dtype=np.float32)
                if query_embedding.size == 0:
                     logger.warning(f"Generated query embedding is empty for: '{query_text[:50]}...'. Skipping search for this query.")
                     # Add empty results for this query and continue
                     results["ids"].append([])
                     results["documents"].append([])
                     results["metadatas"].append([])
                     results["distances"].append([])
                     if "embeddings" in (include or []):
                         results["embeddings"].append([])
                     continue # Skip to next query_text

            except Exception as embed_err:
                 logger.error(f"Error generating embedding for query '{query_text[:50]}...': {embed_err}", exc_info=True)
                 # Add empty results for this query and continue
                 results["ids"].append([])
                 results["documents"].append([])
                 results["metadatas"].append([])
                 results["distances"].append([])
                 if "embeddings" in (include or []):
                     results["embeddings"].append([])
                 continue # Skip to next query_text

            logger.debug(f"DEBUG VectorCollection.query: Embedding shape: {query_embedding.shape}")

            # Search with the embedding
            logger.debug(f"Searching for query text: '{query_text}' in collection '{self.name}'")
            search_results = self.search(
                query_vector=query_embedding, # Use the generated embedding
                top_k=n_results,
                filter=where,
                similarity_threshold=0.0  # Set to 0 to get all results for debugging
            )
            
            logger.debug(f"DEBUG VectorCollection.query: Found {len(search_results)} raw search results")
            
            # Format results in ChromaDB format
            ids = []
            documents = []
            metadatas = []
            distances = []
            embeddings = []

            for i, item in enumerate(search_results):
                ids.append(item["id"])

                # Extract document from metadata (keep existing robust logic)
                metadata = item.get("metadata", {})
                doc = ""
                if "text" in metadata:
                    doc = metadata["text"]
                elif "document" in metadata:
                    doc = metadata["document"]
                elif "content" in metadata:
                    doc = metadata["content"]
                if not doc and isinstance(metadata, str):
                    doc = metadata

                # Apply document content filter if specified
                if where_document and where_document.get("$contains"):
                    filter_text = where_document["$contains"]
                    if filter_text not in doc:
                        logger.debug(f"DEBUG VectorCollection.query: Skipping doc {i} - doesn't contain filter text")
                        continue

                logger.debug(f"Result {i+1}: id={item['id']}, similarity={item.get('similarity', 0.0):.4f}, doc_length={len(doc)}")

                documents.append(doc)
                metadatas.append(metadata)
                distance = 1.0 - item.get("similarity", 0.0)
                distances.append(distance)
                if "embeddings" in (include or []):
                    embeddings.append(item.get("vector", []))

            # Add results for the current query_text
            results["ids"].append(ids)
            results["documents"].append(documents)
            results["metadatas"].append(metadatas)
            results["distances"].append(distances)
            if "embeddings" in (include or []):
                results["embeddings"].append(embeddings)

            logger.debug(f"DEBUG VectorCollection.query: Final formatted results for this query - {len(documents)} documents")

        return results


class VectorDatabaseService:
    """Vector database service for semantic search."""
    
    _instance = None
    
    def __new__(cls, *args, **kwargs):
        """Create a singleton instance."""
        if cls._instance is None:
            cls._instance = super(VectorDatabaseService, cls).__new__(cls)
            cls._instance._initialized = False
        return cls._instance
    
    def __init__(
        self,
        base_dir: Optional[Union[str, Path]] = None,
        use_chromadb: Optional[bool] = None
    ):
        """Initialize the vector database service.
        
        Args:
            base_dir: Base directory for storage
            use_chromadb: Whether to use ChromaDB (if available)
        """
        # Only initialize once for singleton
        if self._initialized:
            return
            
        # Set base directory
        if base_dir:
            self.base_dir = Path(base_dir)
        else:
            self.base_dir = Path.home() / ".ultimate" / "vector_db"
            
        # Create base directory if it doesn't exist
        self.base_dir.mkdir(parents=True, exist_ok=True)
        
        # Check if ChromaDB should be used
        self.use_chromadb = use_chromadb if use_chromadb is not None else CHROMADB_AVAILABLE
        
        # Initialize ChromaDB client if used
        self.chroma_client = None
        if self.use_chromadb and CHROMADB_AVAILABLE:
            try:
                # Create ChromaDB directory if it doesn't exist
                chroma_dir = self.base_dir / "chromadb"
                chroma_dir.mkdir(parents=True, exist_ok=True)
                
                self.chroma_client = chromadb.PersistentClient(
                    path=str(chroma_dir),
                    settings=ChromaSettings(
                        anonymized_telemetry=False,
                        allow_reset=True
                    )
                )
                
                # Test if it works properly
                test_collections = self.chroma_client.list_collections()
                logger.debug(f"ChromaDB initialized with {len(test_collections)} existing collections")
                
                logger.info(
                    "Using ChromaDB for vector storage",
                    emoji_key="vector"
                )
            except Exception as e:
                logger.error(
                    f"Failed to initialize ChromaDB: {str(e)}. Vector operations will not work properly.",
                    emoji_key="error"
                )
                # We'll raise an error rather than falling back to local storage
                # as that creates inconsistency
                self.use_chromadb = False
                self.chroma_client = None
                
                # Re-raise if ChromaDB was explicitly requested
                if use_chromadb:
                    raise ValueError(f"ChromaDB initialization failed: {str(e)}") from e
        else:
            if use_chromadb and not CHROMADB_AVAILABLE:
                logger.error(
                    "ChromaDB was explicitly requested but is not available. Please install it with: pip install chromadb",
                    emoji_key="error"
                )
                raise ImportError("ChromaDB was requested but is not installed")
                
            self.use_chromadb = False
            
        # Collections
        self.collections = {}
        
        # Get embedding service
        self.embedding_service = get_embedding_service()
        
        self._initialized = True
        
        logger.info(
            f"Vector database service initialized (base_dir: {self.base_dir}, use_chromadb: {self.use_chromadb})",
            emoji_key="vector"
        )
    
    async def _reset_chroma_client(self) -> bool:
        """Reset or recreate the ChromaDB client.
        
        Returns:
            True if successful
        """
        if not CHROMADB_AVAILABLE or not self.use_chromadb:
            return False
            
        try:
            # First try using the reset API if available
            if self.chroma_client and hasattr(self.chroma_client, 'reset'):
                try:
                    self.chroma_client.reset()
                    logger.debug("Reset ChromaDB client successfully")
                    return True
                except Exception as e:
                    logger.debug(f"Failed to reset ChromaDB client using reset(): {str(e)}")
            
            # If that fails, recreate the client
            chroma_dir = self.base_dir / "chromadb"
            chroma_dir.mkdir(parents=True, exist_ok=True)
            
            self.chroma_client = chromadb.PersistentClient(
                path=str(chroma_dir),
                settings=ChromaSettings(
                    anonymized_telemetry=False,
                    allow_reset=True
                )
            )
            
            logger.debug("Successfully recreated ChromaDB client")
            return True
        except Exception as e:
            logger.error(
                f"Failed to reset or recreate ChromaDB client: {str(e)}",
                emoji_key="error"
            )
            return False
    
    async def create_collection(
        self,
        name: str,
        dimension: int = 1536,
        similarity_metric: str = "cosine",
        metadata: Optional[Dict[str, Any]] = None,
        overwrite: bool = False
    ) -> Union[VectorCollection, Any]:
        """Create a new collection.
        
        Args:
            name: Collection name
            dimension: Vector dimension
            similarity_metric: Similarity metric (cosine, dot, or euclidean)
            metadata: Optional metadata for the collection
            overwrite: Whether to overwrite existing collection
            
        Returns:
            Created collection
            
        Raises:
            ValueError: If collection already exists and overwrite is False
        """
        # Check if collection already exists in memory
        if name in self.collections and not overwrite:
            raise ValueError(f"Collection '{name}' already exists")
        
        # For consistency, if overwrite is True, explicitly delete any existing collection
        if overwrite:
            try:
                # Delete from memory collections
                if name in self.collections:
                    del self.collections[name]
            
                # Try to delete from ChromaDB
                await self.delete_collection(name)
                logger.debug(f"Deleted existing collection '{name}' for overwrite")
                
                # # If using ChromaDB and overwrite is True, also try to reset the client
                # if self.use_chromadb and self.chroma_client:
                #     await self._reset_chroma_client()
                #     logger.debug("Reset ChromaDB client before creating new collection")
                
                # Force a delay to ensure deletions complete
                await asyncio.sleep(1.5)
                
            except Exception as e:
                logger.debug(f"Error during collection cleanup for overwrite: {str(e)}")
        
        # Create collection based on storage type
        if self.use_chromadb and self.chroma_client is not None:
            # Use ChromaDB
            # Sanitize metadata for ChromaDB (no None values)
            sanitized_metadata = {}
            if metadata:
                for k, v in metadata.items():
                    if v is not None and not isinstance(v, (str, int, float, bool)):
                        sanitized_metadata[k] = str(v)  # Convert to string
                    elif v is not None:
                        sanitized_metadata[k] = v  # Keep as is if it's a valid type
            
            # Force a delay to ensure previous deletions have completed
            await asyncio.sleep(0.1)
            
            # Create collection
            try:
                collection = self.chroma_client.create_collection(
                    name=name,
                    metadata=sanitized_metadata or {"description": "Vector collection"}
                )
                
                logger.info(
                    f"Created ChromaDB collection '{name}'",
                    emoji_key="vector"
                )
                
                self.collections[name] = collection
                return collection
            except Exception as e:
                # Instead of falling back to local storage, raise the error
                logger.error(
                    f"Failed to create ChromaDB collection: {str(e)}",
                    emoji_key="error"
                )
                raise ValueError(f"Failed to create ChromaDB collection: {str(e)}") from e
        else:
            # Use local storage
            collection = VectorCollection(
                name=name,
                dimension=dimension,
                similarity_metric=similarity_metric,
                metadata=metadata
            )
            
            self.collections[name] = collection
            return collection
    
    async def get_collection(self, name: str) -> Optional[Union[VectorCollection, Any]]:
        """Get a collection by name.
        
        Args:
            name: Collection name
            
        Returns:
            Collection or None if not found
        """
        # Check if collection is already loaded
        if name in self.collections:
            return self.collections[name]
            
        # Try to load from disk
        if self.use_chromadb and self.chroma_client is not None:
            # Check if ChromaDB collection exists
            try:
                # In ChromaDB v0.6.0+, list_collections() returns names not objects
                existing_collections = self.chroma_client.list_collections()
                existing_collection_names = []
                
                # Handle both chromadb v0.6.0+ and older versions
                if existing_collections and not isinstance(existing_collections[0], str):
                    # v0.6.0+ returns collection objects
                    for collection in existing_collections:
                        # Access name attribute or use object itself if it's a string
                        if hasattr(collection, 'name'):
                            existing_collection_names.append(collection.name)
                        else:
                            existing_collection_names.append(str(collection))
                else:
                    # Older versions return string names directly
                    existing_collection_names = existing_collections
                    
                if name in existing_collection_names:
                    collection = self.chroma_client.get_collection(name)
                    self.collections[name] = collection
                    return collection
            except Exception as e:
                logger.error(
                    f"Failed to get ChromaDB collection: {str(e)}",
                    emoji_key="error"
                )
        
        # Try to load local collection
        collection_dir = self.base_dir / "collections" / name
        if collection_dir.exists():
            try:
                collection = VectorCollection.load(collection_dir)
                self.collections[name] = collection
                return collection
            except Exception as e:
                logger.error(
                    f"Failed to load collection '{name}': {str(e)}",
                    emoji_key="error"
                )
        
        return None
    
    async def list_collections(self) -> List[str]:
        """List all collection names.
        
        Returns:
            List of collection names
        """
        collection_names = set(self.collections.keys())
        
        # Add collections from ChromaDB
        if self.use_chromadb and self.chroma_client is not None:
            try:
                # Handle both chromadb v0.6.0+ and older versions
                chroma_collections = self.chroma_client.list_collections()
                
                # Check if we received a list of collection objects or just names
                if chroma_collections and not isinstance(chroma_collections[0], str):
                    # v0.6.0+ returns collection objects
                    for collection in chroma_collections:
                        # Access name attribute or use object itself if it's a string
                        if hasattr(collection, 'name'):
                            collection_names.add(collection.name)
                        else:
                            collection_names.add(str(collection))
                else:
                    # Older versions return string names directly
                    for collection in chroma_collections:
                        collection_names.add(collection)
            except Exception as e:
                logger.error(
                    f"Failed to list ChromaDB collections: {str(e)}",
                    emoji_key="error"
                )
        
        # Add collections from disk
        collections_dir = self.base_dir / "collections"
        if collections_dir.exists():
            for path in collections_dir.iterdir():
                if path.is_dir() and (path / "data.json").exists():
                    collection_names.add(path.name)
        
        return list(collection_names)
    
    async def delete_collection(self, name: str) -> bool:
        """Delete a collection.
        
        Args:
            name: Collection name
            
        Returns:
            True if successful
        """
        # Remove from loaded collections
        if name in self.collections:
            del self.collections[name]
        
        success = True
        
        # Delete from ChromaDB
        if self.use_chromadb and self.chroma_client is not None:
            try:
                # Check if collection exists in ChromaDB first
                exists_in_chromadb = False
                try:
                    collections = self.chroma_client.list_collections()
                    # Handle different versions of ChromaDB API
                    if collections and hasattr(collections[0], 'name'):
                        collection_names = [c.name for c in collections]
                    else:
                        collection_names = collections
                        
                    exists_in_chromadb = name in collection_names
                except Exception as e:
                    logger.debug(f"Error checking ChromaDB collections: {str(e)}")
                
                # Only try to delete if it exists
                if exists_in_chromadb:
                    self.chroma_client.delete_collection(name)
                    logger.debug(f"Deleted ChromaDB collection '{name}'")
            except Exception as e:
                logger.warning(
                    f"Failed to delete ChromaDB collection: {str(e)}",
                    emoji_key="warning"
                )
                success = False
        
        # Delete from disk
        collection_dir = self.base_dir / "collections" / name
        if collection_dir.exists():
            try:
                import shutil
                shutil.rmtree(collection_dir)
                logger.debug(f"Deleted collection directory: {collection_dir}")
            except Exception as e:
                logger.error(
                    f"Failed to delete collection directory: {str(e)}",
                    emoji_key="error"
                )
                return False
        
        logger.info(
            f"Deleted collection '{name}'",
            emoji_key="vector"
        )
        
        return success
    
    async def add_texts(
        self,
        collection_name: str,
        texts: List[str],
        metadatas: Optional[List[Dict[str, Any]]] = None,
        ids: Optional[List[str]] = None,
        embedding_model: Optional[str] = None,
        batch_size: int = 100
    ) -> List[str]:
        """Add texts to a collection.
        
        Args:
            collection_name: Collection name
            texts: Texts to add
            metadatas: Optional metadata for each text
            ids: Optional IDs for the texts
            embedding_model: Embedding model name (NOTE: Model is set during EmbeddingService init)
            batch_size: Maximum batch size for embedding generation
            
        Returns:
            List of document IDs
            
        Raises:
            ValueError: If collection not found
        """
        # Get or create collection
        collection = await self.get_collection(collection_name)
        if collection is None:
            collection = await self.create_collection(collection_name)
        
        # Generate embeddings
        logger.debug(f"Generating embeddings for {len(texts)} texts using model: {self.embedding_service.model_name}")
        embeddings = []
        for i in range(0, len(texts), batch_size):
            batch_texts = texts[i:i + batch_size]
            batch_embeddings = await self.embedding_service.create_embeddings(
                texts=batch_texts,
            )
            embeddings.extend(batch_embeddings)
            if len(texts) > batch_size: # Add delay if batching
                await asyncio.sleep(0.1) # Small delay between batches
        
        logger.debug(f"Generated {len(embeddings)} embeddings")
        
        # Add to collection
        if self.use_chromadb and isinstance(collection, chromadb.Collection):
            # ChromaDB collection
            try:
                # Generate IDs if not provided
                if ids is None:
                    ids = [str(uuid.uuid4()) for _ in range(len(texts))]
                
                # Ensure metadatas is provided
                if metadatas is None:
                    metadatas = [{} for _ in range(len(texts))]
                
                # Add to ChromaDB collection
                collection.add(
                    embeddings=embeddings,
                    documents=texts,
                    metadatas=metadatas,
                    ids=ids
                )
                
                logger.info(
                    f"Added {len(texts)} documents to ChromaDB collection '{collection_name}'",
                    emoji_key="vector"
                )
                
                return ids
            except Exception as e:
                logger.error(
                    f"Failed to add documents to ChromaDB collection: {str(e)}",
                    emoji_key="error"
                )
                raise
        else:
            # Local collection
            # For local collection, store text in metadata
            combined_metadata = []
            for _i, (text, meta) in enumerate(zip(texts, metadatas or [{} for _ in range(len(texts))], strict=False)):
                # Create metadata with text as main content
                combined_meta = {"text": text}
                # Add any other metadata
                if meta:
                    combined_meta.update(meta)
                combined_metadata.append(combined_meta)
                
            logger.debug(f"Adding vectors to local collection with metadata: {combined_metadata[0] if combined_metadata else None}")
            
            result_ids = collection.add(
                vectors=embeddings,
                ids=ids,
                metadatas=combined_metadata
            )
            
            logger.debug(f"Added {len(result_ids)} vectors to local collection '{collection_name}'")
            
            return result_ids
    
    async def search_by_text(
        self,
        collection_name: str,
        query_text: str,
        top_k: int = 5,
        filter: Optional[Dict[str, Any]] = None,
        embedding_model: Optional[str] = None,
        include_vectors: bool = False,
        similarity_threshold: float = 0.0
    ) -> List[Dict[str, Any]]:
        """Search a collection by text query.
        
        Args:
            collection_name: Collection name
            query_text: Text query
            top_k: Number of results to return
            filter: Optional metadata filter
            embedding_model: Embedding model name
            include_vectors: Whether to include vectors in results
            similarity_threshold: Minimum similarity score (0.0 to 1.0)
            
        Returns:
            List of search results
            
        Raises:
            ValueError: If collection not found
        """
        # Get collection
        collection = await self.get_collection(collection_name)
        if collection is None:
            raise ValueError(f"Collection '{collection_name}' not found")
        
        # Search collection
        if self.use_chromadb and isinstance(collection, chromadb.Collection):
            # ChromaDB collection
            try:
                # Convert filter to ChromaDB format if provided
                chroma_filter = self._convert_to_chroma_filter(filter) if filter else None
                
                # Prepare include parameters for ChromaDB
                include_params = ["documents", "metadatas", "distances"]
                if include_vectors:
                    include_params.append("embeddings")
                
                # Get embedding directly using our service
                query_embeddings = await self.embedding_service.create_embeddings(
                    texts=[query_text],
                    # model=embedding_model # Model is defined in the service instance
                )
                if not query_embeddings:
                    logger.error(f"Failed to generate embedding for query: {query_text}")
                    return []
                query_embedding = query_embeddings[0]
                
                logger.debug(f"Using explicitly generated embedding with model {self.embedding_service.model_name}")
                
                # Search ChromaDB collection with our embedding
                results = collection.query(
                    query_embeddings=[query_embedding],  # Use our embedding directly, not ChromaDB's
                    n_results=top_k,
                    where=chroma_filter,
                    where_document=None,
                    include=include_params
                )
                
                # Format results and apply similarity threshold
                formatted_results = []
                for i in range(len(results["ids"][0])):
                    similarity = 1.0 - float(results["distances"][0][i])  # Convert distance to similarity
                    
                    # Skip results below threshold
                    if similarity < similarity_threshold:
                        continue
                        
                    result = {
                        "id": results["ids"][0][i],
                        "text": results["documents"][0][i],
                        "metadata": results["metadatas"][0][i],
                        "similarity": similarity,
                    }
                    
                    if include_vectors and "embeddings" in results:
                        result["vector"] = results["embeddings"][0][i]
                        
                    formatted_results.append(result)
                
                return formatted_results
            except Exception as e:
                logger.error(
                    f"Failed to search ChromaDB collection: {str(e)}",
                    emoji_key="error"
                )
                raise
        else:
            # Local collection
            results = await collection.search_by_text(
                query_text=query_text,
                top_k=top_k,
                filter=filter,
                # model=embedding_model, # Pass model used by the collection's service instance
                similarity_threshold=similarity_threshold
            )
            
            # Format results
            formatted_results = []
            for result in results:
                formatted_result = {
                    "id": result["id"],
                    "text": result["metadata"].get("text", ""),
                    "metadata": {k: v for k, v in result["metadata"].items() if k != "text"},
                    "similarity": result["similarity"],
                }
                
                if include_vectors:
                    formatted_result["vector"] = result["vector"]
                    
                formatted_results.append(formatted_result)
                
            return formatted_results
    
    def _convert_to_chroma_filter(self, filter: Dict[str, Any]) -> Dict[str, Any]:
        """Convert filter to ChromaDB format.
        
        Args:
            filter: Filter dictionary
            
        Returns:
            ChromaDB-compatible filter
        """
        # Simple equality filter for now
        return filter
    
    def save_all_collections(self) -> int:
        """Save all local collections to disk.
        
        Returns:
            Number of collections saved
        """
        saved_count = 0
        collections_dir = self.base_dir / "collections"
        collections_dir.mkdir(parents=True, exist_ok=True)
        
        for name, collection in self.collections.items():
            if not self.use_chromadb or not isinstance(collection, chromadb.Collection):
                # Only save local collections
                collection_dir = collections_dir / name
                if collection.save(collection_dir):
                    saved_count += 1
        
        logger.info(
            f"Saved {saved_count} collections to disk",
            emoji_key="vector"
        )
        
        return saved_count
    
    async def get_stats(self) -> Dict[str, Any]:
        """Get statistics about collections.
        
        Returns:
            Dictionary of statistics
        """
        collection_names = await self.list_collections()
        collection_stats = {}
        
        for name in collection_names:
            collection = await self.get_collection(name)
            if collection:
                if isinstance(collection, VectorCollection):
                    collection_stats[name] = collection.get_stats()
                else:
                    # ChromaDB collection
                    try:
                        count = collection.count()
                        collection_stats[name] = {
                            "count": count,
                            "type": "chromadb"
                        }
                    except Exception as e:
                        logger.error(
                            f"Error getting stats for ChromaDB collection '{name}': {str(e)}",
                            emoji_key="error"
                        )
                        collection_stats[name] = {
                            "count": 0,
                            "type": "chromadb",
                            "error": str(e)
                        }
        
        stats = {
            "collections": len(collection_names),
            "collection_stats": collection_stats
        }
        
        return stats

    async def get_collection_metadata(self, name: str) -> Dict[str, Any]:
        """Get collection metadata.
        
        Args:
            name: Collection name
            
        Returns:
            Collection metadata
            
        Raises:
            ValueError: If collection not found
        """
        # Get collection
        collection = await self.get_collection(name)
        if collection is None:
            raise ValueError(f"Collection '{name}' not found")
            
        # Get metadata
        try:
            if self.use_chromadb and hasattr(collection, "get_metadata"):
                # ChromaDB collection
                return collection.get_metadata() or {}
            elif hasattr(collection, "metadata"):
                # Local collection
                return collection.metadata or {}
        except Exception as e:
            logger.error(
                f"Failed to get collection metadata: {str(e)}",
                emoji_key="error"
            )
        
        return {}

    async def update_collection_metadata(self, name: str, metadata: Dict[str, Any]) -> bool:
        """Update collection metadata.
        
        Args:
            name: Collection name
            metadata: New metadata
            
        Returns:
            True if successful
            
        Raises:
            ValueError: If collection not found
        """
        # Get collection
        collection = await self.get_collection(name)
        if collection is None:
            raise ValueError(f"Collection '{name}' not found")
            
        # Update metadata
        try:
            if self.use_chromadb and hasattr(collection, "update_metadata"):
                # ChromaDB collection - needs validation
                validated_metadata = {}
                for k, v in metadata.items():
                    # ChromaDB accepts only str, int, float, bool
                    if isinstance(v, (str, int, float, bool)):
                        validated_metadata[k] = v
                    elif v is None:
                        # Skip None values
                        logger.debug(f"Skipping None value for metadata key '{k}'")
                        continue
                    else:
                        # Convert other types to string
                        validated_metadata[k] = str(v)
                        
                # Debug log the validated metadata
                logger.debug(f"Updating ChromaDB collection metadata with: {validated_metadata}")
                
                collection.update_metadata(validated_metadata)
            elif hasattr(collection, "metadata"):
                # Local collection
                collection.metadata.update(metadata)
                
            logger.info(
                f"Updated metadata for collection '{name}'",
                emoji_key="vector"
            )
            return True
        except Exception as e:
            logger.error(
                f"Failed to update collection metadata: {str(e)}",
                emoji_key="error"
            )
            # Don't re-raise, just return false
            return False


# Singleton instance getter
def get_vector_db_service(
    base_dir: Optional[Union[str, Path]] = None,
    use_chromadb: Optional[bool] = None
) -> VectorDatabaseService:
    """Get the vector database service singleton instance.
    
    Args:
        base_dir: Base directory for storage
        use_chromadb: Whether to use ChromaDB (if available)
        
    Returns:
        VectorDatabaseService singleton instance
    """
    return VectorDatabaseService(base_dir, use_chromadb)
```

--------------------------------------------------------------------------------
/examples/sentiment_analysis_demo.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
"""Business sentiment analysis demonstration using Ultimate MCP Server."""

import asyncio
import sys
from pathlib import Path
from typing import Any, Dict, List

# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))

# Third-party imports
from rich import box
from rich.console import Group
from rich.markup import escape
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.rule import Rule
from rich.table import Table
from rich.tree import Tree

# Project imports
from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.server import Gateway
from ultimate_mcp_server.tools.sentiment_analysis import (
    analyze_business_sentiment,
    analyze_business_text_batch,
)
from ultimate_mcp_server.utils import get_logger
from ultimate_mcp_server.utils.display import CostTracker
from ultimate_mcp_server.utils.logging.console import console

# Initialize logger
logger = get_logger("example.business_sentiment_demo")

# Provider and model configuration - easy to change
PROVIDER = Provider.OPENAI.value  # Change this to switch providers (e.g., Provider.OPENAI.value)
MODEL = 'gpt-4.1-nano'  # Set to None to use default model for the provider, or specify a model name

# Sample data for demonstrations
SAMPLE_FEEDBACK = {
    "retail": "I recently purchased your premium blender model BX-9000. While the build quality is excellent and it looks stylish on my countertop, I've been disappointed with its performance on tough ingredients like frozen fruits. It leaves chunks unblended even after several minutes of operation. Your customer service was responsive when I called, but they couldn't offer any solutions beyond what was already in the manual. For a product in this price range ($249), I expected better performance. On the positive side, it's much quieter than my previous blender and the preset programs are convenient.",
    "financial": "I've been using your online banking platform for my small business for about 6 months now. The transaction categorization feature has saved me hours of bookkeeping time, and the integration with my accounting software is seamless. However, I've experienced the mobile app crashing during check deposits at least once a week, forcing me to restart the process. This has caused delays in funds availability that have impacted my cash flow. Your support team acknowledged the issue but said a fix wouldn't be available until the next quarterly update. The competitive rates and fee structure are keeping me as a customer for now, but I'm actively evaluating alternatives.",
    "healthcare": "My recent stay at Memorial Care Hospital exceeded expectations. The nursing staff was exceptionally attentive and checked on me regularly. Dr. Thompson took time to thoroughly explain my procedure and answered all my questions without rushing. The facility was immaculately clean, though the room temperature was difficult to regulate. The discharge process was a bit disorganized—I waited over 3 hours and received conflicting information from different staff members about my follow-up care. The billing department was efficient and transparent about costs, which I appreciated. Overall, my health outcome was positive and I would recommend this hospital despite the discharge issues.",
    "b2b_tech": "We implemented your enterprise resource planning solution across our manufacturing division last quarter. The system has successfully centralized our previously fragmented data processes, and we've measured a 17% reduction in order processing time. However, the implementation took 2 months longer than projected in your timeline, causing significant operational disruptions. Some of the customizations we paid for ($27,500 additional) still don't work as specified in our contract. Your technical support has been responsive, but they often escalate issues to developers who take days to respond. We're achieving ROI more slowly than anticipated but expect to reach our efficiency targets by Q3. Training materials for new staff are excellent.",
    "support_ticket": "URGENT: Critical system outage affecting all users in EU region. Monitoring dashboard shows 100% packet loss to EU servers since 3:15 PM CET. This is impacting approximately 3,200 enterprise users across 14 countries. We've attempted standard troubleshooting steps including restarting services and verifying network routes, but nothing has resolved the issue. Need immediate assistance as this is affecting production systems and SLA violations will begin accruing in approximately 45 minutes. Our technical contact is Jan Kowalski (+48 555 123 456). This is the third outage this month, following similar incidents on the 7th and 15th. Reference case numbers: INC-7723 and INC-8105.",
}

BATCH_FEEDBACK = [
    {
        "customer_id": "AB-10293",
        "channel": "Email Survey",
        "product": "CloudSync Pro",
        "text": "Your automated onboarding process was a game-changer for our IT department. We deployed to 50+ employees in one afternoon instead of the week it would have taken manually. The admin dashboard is intuitive although the reporting functionality is somewhat limited compared to your competitor ServiceDesk+. We've already recommended your solution to several partner companies.",
    },
    {
        "customer_id": "XY-58204",
        "channel": "Support Ticket",
        "product": "CloudSync Pro",
        "text": "We've been experiencing intermittent synchronization failures for the past 3 days. Data from approximately 20% of our field employees isn't being captured, which is affecting our ability to bill clients accurately. This is creating significant revenue leakage. Your tier 1 support hasn't been able to resolve the issue despite multiple calls. We need escalation to engineering ASAP. Our contract SLA guarantees 99.9% reliability and we're well below that threshold currently.",
    },
    {
        "customer_id": "LM-39157",
        "channel": "NPS Survey",
        "product": "CloudSync Basic",
        "text": "I find the mobile app version significantly less functional than the desktop version. Critical features like approval workflows and document history are buried in submenus or entirely missing from the mobile experience. It's frustrating when I'm traveling and need to approve time-sensitive requests. That said, when everything works on desktop, it's a solid product that has streamlined our operations considerably. Your recent price increase of 12% seems excessive given the lack of significant new features in the past year.",
    },
    {
        "customer_id": "PQ-73046",
        "channel": "Sales Follow-up",
        "product": "CloudSync Enterprise",
        "text": "The ROI analysis your team provided convinced our CFO to approve the upgrade to Enterprise tier. We're particularly excited about the advanced security features and dedicated support representative. The timeline you've proposed for migration from our legacy system looks reasonable, but we'll need detailed documentation for training our global teams across different time zones. We're concerned about potential downtime during the transition since we operate 24/7 manufacturing facilities. Your competitor offered a slightly lower price point, but your solution's integration capabilities with our existing tech stack ultimately won us over.",
    },
]


async def analyze_single_feedback(gateway, tracker: CostTracker):
    """Demonstrate analysis of a single piece of business feedback."""
    console.print(Rule("[bold blue]Individual Business Feedback Analysis[/bold blue]"))
    logger.info("Starting individual feedback analysis", emoji_key="start")

    # Select a feedback sample
    industry = "retail"
    feedback_text = SAMPLE_FEEDBACK[industry]

    # Display the feedback
    console.print(
        Panel(
            escape(feedback_text),
            title=f"[bold magenta]Sample {industry.capitalize()} Customer Feedback[/bold magenta]",
            border_style="magenta",
            expand=False,
        )
    )

    # Analysis configuration
    analysis_config = {
        "industry": industry,
        "analysis_mode": "comprehensive",
        "entity_extraction": True,
        "aspect_based": True,
        "competitive_analysis": False,
        "intent_detection": True,
        "risk_assessment": True,
    }

    # Display configuration
    config_table = Table(title="Analysis Configuration", show_header=True, box=box.ROUNDED)
    config_table.add_column("Parameter", style="cyan")
    config_table.add_column("Value", style="green")

    for key, value in analysis_config.items():
        config_table.add_row(key, str(value))

    console.print(config_table)

    try:
        # Show progress during analysis
        with Progress(
            SpinnerColumn(),
            TextColumn("[bold blue]Analyzing business sentiment..."),
            transient=True,
        ) as progress:
            task = progress.add_task("Analyzing...", total=None)  # noqa: F841

            # Directly call analyze_business_sentiment with proper parameters
            result = await analyze_business_sentiment(
                text=feedback_text,
                provider=PROVIDER,
                model=MODEL,
                **analysis_config,
            )

            # Track cost
            if "meta" in result:
                tracker.record_call(
                    provider=result["meta"]["provider"],
                    model=result["meta"]["model"],
                    input_tokens=result["meta"]["tokens"]["input"],
                    output_tokens=result["meta"]["tokens"]["output"],
                    cost=result["meta"]["cost"],
                )

        # Display results
        if result["success"]:
            logger.success("Sentiment analysis completed successfully", emoji_key="success")

            # Core metrics panel
            core_metrics = result.get("core_metrics", {})
            metrics_table = Table(box=box.SIMPLE)
            metrics_table.add_column("Metric", style="cyan")
            metrics_table.add_column("Value", style="white")

            metrics_table.add_row(
                "Sentiment", f"[bold]{core_metrics.get('primary_sentiment', 'N/A')}[/bold]"
            )
            metrics_table.add_row(
                "Sentiment Score", f"{core_metrics.get('sentiment_score', 0.0):.2f}"
            )
            metrics_table.add_row(
                "Satisfaction",
                f"{result.get('business_dimensions', {}).get('customer_satisfaction', 0.0):.2f}",
            )
            metrics_table.add_row("Urgency", core_metrics.get("urgency", "N/A"))

            # Business dimension visualization
            dimensions = result.get("business_dimensions", {})
            viz_table = Table(show_header=False, box=None)
            viz_table.add_column("Dimension", style="blue")
            viz_table.add_column("Score", style="white")
            viz_table.add_column("Visual", style="yellow")

            max_bar_length = 20
            for key, value in dimensions.items():
                if isinstance(value, (int, float)):
                    # Create visual bar based on score
                    bar_length = int(value * max_bar_length)
                    bar = "█" * bar_length + "░" * (max_bar_length - bar_length)
                    viz_table.add_row(key.replace("_", " ").title(), f"{value:.2f}", bar)

            # Aspect sentiment visualization
            aspects = result.get("aspect_sentiment", {})
            aspect_table = Table(title="Aspect-Based Sentiment", box=box.ROUNDED)
            aspect_table.add_column("Aspect", style="cyan")
            aspect_table.add_column("Sentiment", style="white")
            aspect_table.add_column("Visual", style="yellow")

            for aspect, score in aspects.items():
                # Create visual bar with color
                if score >= 0:
                    bar_length = int(score * 10)
                    bar = f"[green]{'█' * bar_length}{'░' * (10 - bar_length)}[/green]"
                else:
                    bar_length = int(abs(score) * 10)
                    bar = f"[red]{'█' * bar_length}{'░' * (10 - bar_length)}[/red]"

                aspect_table.add_row(aspect.replace("_", " ").title(), f"{score:.2f}", bar)

            # Display all visualizations
            console.print(
                Panel(
                    Group(metrics_table, Rule(style="dim"), viz_table),
                    title="[bold green]Core Business Metrics[/bold green]",
                    border_style="green",
                )
            )

            console.print(aspect_table)

            # Entity extraction
            if "entity_extraction" in result:
                entity_panel = Panel(
                    _format_entities(result["entity_extraction"]),
                    title="[bold blue]Extracted Entities[/bold blue]",
                    border_style="blue",
                )
                console.print(entity_panel)

            # Intent analysis
            if "intent_analysis" in result:
                intent_panel = _display_intent_analysis(result["intent_analysis"])
                console.print(intent_panel)

            # Risk assessment
            if "risk_assessment" in result:
                risk_panel = _display_risk_assessment(result["risk_assessment"])
                console.print(risk_panel)

            # Recommended actions
            if "recommended_actions" in result:
                actions = result["recommended_actions"]
                if actions:
                    # Format and display actions
                    formatted_actions = []
                    for i, action in enumerate(actions):
                        if isinstance(action, dict):
                            # Format dictionary as readable string
                            if "action" in action:
                                action_text = f"[bold]{i + 1}.[/bold] {action['action']}"
                                # Add additional fields if available
                                details = []
                                for key, value in action.items():
                                    if key != "action":  # Skip the action field we already added
                                        details.append(f"{key}: {value}")
                                if details:
                                    action_text += f" ({', '.join(details)})"
                                formatted_actions.append(action_text)
                            else:
                                # Generic dictionary formatting
                                action_text = f"[bold]{i + 1}.[/bold] " + ", ".join(
                                    [f"{k}: {v}" for k, v in action.items()]
                                )
                                formatted_actions.append(action_text)
                        else:
                            formatted_actions.append(f"[bold]{i + 1}.[/bold] {action}")

                    console.print(
                        Panel(
                            "\n".join(formatted_actions),
                            title="[bold yellow]Prioritized Action Plan[/bold yellow]",
                            border_style="yellow",
                            expand=False,
                        )
                    )

            # Execution metrics
            meta = result.get("meta", {})
            exec_table = Table(title="Execution Metrics", box=box.SIMPLE, show_header=False)
            exec_table.add_column("Metric", style="dim cyan")
            exec_table.add_column("Value", style="dim white")

            exec_table.add_row(
                "Provider/Model", f"{meta.get('provider', 'N/A')}/{meta.get('model', 'N/A')}"
            )
            exec_table.add_row("Processing Time", f"{meta.get('processing_time', 0.0):.2f}s")
            exec_table.add_row(
                "Tokens",
                f"Input: {meta.get('tokens', {}).get('input', 0)}, Output: {meta.get('tokens', {}).get('output', 0)}",
            )
            exec_table.add_row("Cost", f"${meta.get('cost', 0.0):.6f}")

            console.print(exec_table)
        else:
            logger.error(
                f"Sentiment analysis failed: {result.get('error', 'Unknown error')}",
                emoji_key="error",
            )

    except Exception as e:
        logger.error(
            f"Error in individual feedback analysis: {str(e)}", emoji_key="error", exc_info=True
        )


async def compare_analysis_modes(gateway, tracker: CostTracker):
    """Compare different analysis modes for the same feedback."""
    console.print(Rule("[bold blue]Analysis Mode Comparison[/bold blue]"))
    logger.info("Comparing different analysis modes", emoji_key="start")

    # Select a feedback sample
    industry = "b2b_tech"
    feedback_text = SAMPLE_FEEDBACK[industry]

    # Display the feedback
    console.print(
        Panel(
            escape(feedback_text),
            title="[bold magenta]B2B Technology Feedback[/bold magenta]",
            border_style="magenta",
            expand=False,
        )
    )

    # Analysis modes to compare
    analysis_modes = ["standard", "product_feedback", "customer_experience", "sales_opportunity"]

    # Results storage
    mode_results = {}

    try:
        # Show progress during analysis
        with Progress(
            SpinnerColumn(),
            TextColumn("[bold blue]Comparing analysis modes..."),
            transient=False,
        ) as progress:
            # Create tasks for each mode
            tasks = {
                mode: progress.add_task(f"[cyan]Analyzing {mode}...", total=None)
                for mode in analysis_modes
            }

            # Process each mode
            for mode in analysis_modes:
                try:
                    logger.info(f"Trying analysis mode: {mode}", emoji_key="processing")

                    # Analysis configuration
                    analysis_config = {
                        "industry": industry,
                        "analysis_mode": mode,
                        "entity_extraction": False,  # Simplified for mode comparison
                        "aspect_based": True,
                        "competitive_analysis": False,
                        "intent_detection": False,
                        "risk_assessment": False,
                    }

                    # Directly call the analyze_business_sentiment function
                    result = await analyze_business_sentiment(
                        text=feedback_text,
                        provider=PROVIDER,
                        model=MODEL,
                        **analysis_config,
                    )

                    # Track cost
                    if "meta" in result and result["success"]:
                        tracker.record_call(
                            provider=result["meta"]["provider"],
                            model=result["meta"]["model"],
                            input_tokens=result["meta"]["tokens"]["input"],
                            output_tokens=result["meta"]["tokens"]["output"],
                            cost=result["meta"]["cost"],
                        )

                    # Store result
                    mode_results[mode] = result

                    # Complete the task
                    progress.update(tasks[mode], completed=True)

                except Exception as e:
                    logger.warning(f"Error analyzing mode {mode}: {str(e)}", emoji_key="warning")
                    # Create mock result if analysis fails
                    mode_results[mode] = {
                        "success": False,
                        "error": str(e),
                        "core_metrics": {
                            "primary_sentiment": f"Error in {mode}",
                            "sentiment_score": 0.0,
                        },
                        "business_dimensions": {},
                        "aspect_sentiment": {},
                        "recommended_actions": [],
                    }
                    progress.update(tasks[mode], completed=True)

        # Compare the results
        comparison_table = Table(title="Analysis Mode Comparison", box=box.ROUNDED)
        comparison_table.add_column("Metric", style="white")
        for mode in analysis_modes:
            comparison_table.add_column(mode.replace("_", " ").title(), style="cyan")

        # Add sentiment rows
        comparison_table.add_row(
            "Primary Sentiment",
            *[
                mode_results[mode].get("core_metrics", {}).get("primary_sentiment", "N/A")
                for mode in analysis_modes
            ],
        )

        # Add score rows
        comparison_table.add_row(
            "Sentiment Score",
            *[
                f"{mode_results[mode].get('core_metrics', {}).get('sentiment_score', 0.0):.2f}"
                for mode in analysis_modes
            ],
        )

        # Add satisfaction rows
        comparison_table.add_row(
            "Satisfaction",
            *[
                f"{mode_results[mode].get('business_dimensions', {}).get('customer_satisfaction', 0.0):.2f}"
                for mode in analysis_modes
            ],
        )

        # Display top aspects for each mode
        aspect_trees = {}
        for mode in analysis_modes:
            aspects = mode_results[mode].get("aspect_sentiment", {})
            if aspects:
                tree = Tree(f"[bold]{mode.replace('_', ' ').title()} Aspects[/bold]")
                sorted_aspects = sorted(aspects.items(), key=lambda x: abs(x[1]), reverse=True)
                for aspect, score in sorted_aspects[:3]:  # Top 3 aspects
                    color = "green" if score >= 0 else "red"
                    tree.add(f"[{color}]{aspect.replace('_', ' ').title()}: {score:.2f}[/{color}]")
                aspect_trees[mode] = tree

        # Add recommended actions comparison
        action_trees = {}
        for mode in analysis_modes:
            actions = mode_results[mode].get("recommended_actions", [])
            if actions:
                tree = Tree(f"[bold]{mode.replace('_', ' ').title()} Actions[/bold]")
                for action in actions[:2]:  # Top 2 actions
                    # Handle case where action is a dictionary
                    if isinstance(action, dict):
                        # Format dictionary as readable string
                        if "action" in action:
                            action_text = f"{action['action']}"
                            if "priority" in action:
                                action_text += f" (Priority: {action['priority']})"
                            tree.add(action_text)
                        else:
                            # Generic dictionary formatting
                            action_text = ", ".join([f"{k}: {v}" for k, v in action.items()])
                            tree.add(action_text)
                    else:
                        tree.add(str(action))
                action_trees[mode] = tree

        # Display comparison table
        console.print(comparison_table)

        # Display aspects side by side if possible
        if aspect_trees:
            console.print("\n[bold cyan]Top Aspects by Analysis Mode[/bold cyan]")
            # Print trees based on available width
            for _mode, tree in aspect_trees.items():
                console.print(tree)

        # Display recommended actions
        if action_trees:
            console.print("\n[bold yellow]Recommended Actions by Analysis Mode[/bold yellow]")
            for _mode, tree in action_trees.items():
                console.print(tree)

        # Display execution metrics
        exec_table = Table(title="Execution Metrics by Mode", box=box.SIMPLE)
        exec_table.add_column("Mode", style="cyan")
        exec_table.add_column("Processing Time", style="dim white")
        exec_table.add_column("Tokens (In/Out)", style="dim white")
        exec_table.add_column("Cost", style="green")

        for mode in analysis_modes:
            meta = mode_results[mode].get("meta", {})
            if meta:
                exec_table.add_row(
                    mode.replace("_", " ").title(),
                    f"{meta.get('processing_time', 0.0):.2f}s",
                    f"{meta.get('tokens', {}).get('input', 0)}/{meta.get('tokens', {}).get('output', 0)}",
                    f"${meta.get('cost', 0.0):.6f}",
                )

        console.print(exec_table)

    except Exception as e:
        logger.error(
            f"Error in analysis mode comparison: {str(e)}", emoji_key="error", exc_info=True
        )


async def analyze_support_ticket_with_risk(gateway, tracker: CostTracker):
    """Analyze a support ticket with focus on risk assessment."""
    console.print(Rule("[bold blue]Support Ticket Risk Assessment[/bold blue]"))
    logger.info("Analyzing support ticket with risk focus", emoji_key="start")

    # Use the support ticket sample
    ticket_text = SAMPLE_FEEDBACK["support_ticket"]

    # Display the ticket
    console.print(
        Panel(
            escape(ticket_text),
            title="[bold red]URGENT Support Ticket[/bold red]",
            border_style="red",
            expand=False,
        )
    )

    # Analysis configuration focusing on risk and urgency
    analysis_config = {
        "industry": "technology",
        "analysis_mode": "support_ticket",
        "entity_extraction": True,
        "aspect_based": False,
        "competitive_analysis": False,
        "intent_detection": True,
        "risk_assessment": True,
        "threshold_config": {
            "urgency": 0.7,  # Higher threshold for urgency
            "churn_risk": 0.5,  # Standard threshold for churn risk
        },
    }

    try:
        # Show progress during analysis
        with Progress(
            SpinnerColumn(),
            TextColumn("[bold red]Analyzing support ticket..."),
            transient=True,
        ) as progress:
            task = progress.add_task("Analyzing...", total=None)  # noqa: F841

            # Directly call analyze_business_sentiment
            result = await analyze_business_sentiment(
                text=ticket_text,
                provider=PROVIDER,
                model=MODEL,
                **analysis_config,
            )

            # Track cost
            if "meta" in result:
                tracker.record_call(
                    provider=result["meta"]["provider"],
                    model=result["meta"]["model"],
                    input_tokens=result["meta"]["tokens"]["input"],
                    output_tokens=result["meta"]["tokens"]["output"],
                    cost=result["meta"]["cost"],
                )

        # Display results focusing on risk assessment
        if result["success"]:
            logger.success("Support ticket analysis completed", emoji_key="success")

            # Core urgency metrics
            core_metrics = result.get("core_metrics", {})
            urgency = core_metrics.get("urgency", "medium")
            urgency_color = {
                "low": "green",
                "medium": "yellow",
                "high": "orange",
                "critical": "red",
            }.get(urgency.lower(), "yellow")

            # Risk assessment panel
            risk_data = result.get("risk_assessment", {})
            # If risk_data is empty, add a default escalation probability
            if not risk_data or not any(
                key in risk_data
                for key in [
                    "response_urgency",
                    "churn_probability",
                    "pr_risk",
                    "escalation_probability",
                ]
            ):
                risk_data["escalation_probability"] = 0.95

            if risk_data:
                risk_table = Table(box=box.ROUNDED)
                risk_table.add_column("Risk Factor", style="white")
                risk_table.add_column("Level", style="cyan")
                risk_table.add_column("Details", style="yellow")

                # Add risk factors
                if "response_urgency" in risk_data:
                    risk_table.add_row(
                        "Response Urgency",
                        f"[{urgency_color}]{risk_data.get('response_urgency', 'medium').upper()}[/{urgency_color}]",
                        "Ticket requires timely response",
                    )

                if "churn_probability" in risk_data:
                    churn_prob = risk_data["churn_probability"]
                    churn_color = (
                        "green" if churn_prob < 0.3 else "yellow" if churn_prob < 0.6 else "red"
                    )
                    risk_table.add_row(
                        "Churn Risk",
                        f"[{churn_color}]{churn_prob:.2f}[/{churn_color}]",
                        "Probability of customer churn",
                    )

                if "pr_risk" in risk_data:
                    pr_risk = risk_data["pr_risk"]
                    pr_color = (
                        "green" if pr_risk == "low" else "yellow" if pr_risk == "medium" else "red"
                    )
                    risk_table.add_row(
                        "PR/Reputation Risk",
                        f"[{pr_color}]{pr_risk.upper()}[/{pr_color}]",
                        "Potential for negative publicity",
                    )

                if "escalation_probability" in risk_data:
                    esc_prob = risk_data["escalation_probability"]
                    esc_color = "green" if esc_prob < 0.3 else "yellow" if esc_prob < 0.6 else "red"
                    risk_table.add_row(
                        "Escalation Probability",
                        f"[{esc_color}]{esc_prob:.2f}[/{esc_color}]",
                        "Likelihood issue will escalate",
                    )

                # Add compliance flags
                if "legal_compliance_flags" in risk_data and risk_data["legal_compliance_flags"]:
                    flags = risk_data["legal_compliance_flags"]
                    risk_table.add_row(
                        "Compliance Flags", f"[red]{len(flags)}[/red]", ", ".join(flags)
                    )

                # Display risk table
                console.print(
                    Panel(
                        risk_table,
                        title=f"[bold {urgency_color}]Risk Assessment ({urgency.upper()})[/bold {urgency_color}]",
                        border_style=urgency_color,
                    )
                )

            # Entity extraction (focusing on technical details)
            if "entity_extraction" in result:
                entity_tree = Tree("[bold cyan]Extracted Technical Entities[/bold cyan]")
                entities = result["entity_extraction"]

                for category, items in entities.items():
                    if items:  # Only add non-empty categories
                        branch = entity_tree.add(
                            f"[bold]{category.replace('_', ' ').title()}[/bold]"
                        )
                        for item in items:
                            # Handle case where item is a dictionary
                            if isinstance(item, dict):
                                # Format dictionary items appropriately
                                if "name" in item and "phone" in item:
                                    branch.add(f"{item.get('name', '')} ({item.get('phone', '')})")
                                else:
                                    # Format other dictionary types as name: value pairs
                                    formatted_item = ", ".join(
                                        [f"{k}: {v}" for k, v in item.items()]
                                    )
                                    branch.add(formatted_item)
                            else:
                                branch.add(str(item))

                console.print(entity_tree)

            # Intent analysis focusing on support needs
            if "intent_analysis" in result:
                intent_data = result["intent_analysis"]
                support_needed = intent_data.get("support_needed", 0.0)
                feedback_type = intent_data.get("feedback_type", "N/A")

                intent_table = Table(box=box.SIMPLE)
                intent_table.add_column("Intent Indicator", style="cyan")
                intent_table.add_column("Value", style="white")

                intent_table.add_row("Support Needed", f"{support_needed:.2f}")
                intent_table.add_row("Feedback Type", feedback_type.capitalize())
                if "information_request" in intent_data:
                    intent_table.add_row(
                        "Information Request", str(intent_data["information_request"])
                    )

                console.print(
                    Panel(
                        intent_table,
                        title="[bold blue]Support Intent Analysis[/bold blue]",
                        border_style="blue",
                    )
                )

            # Action plan for high urgency tickets
            if "recommended_actions" in result:
                actions = result["recommended_actions"]
                if actions:
                    # Format and display actions
                    formatted_actions = []
                    for i, action in enumerate(actions):
                        if isinstance(action, dict):
                            # Format dictionary as readable string
                            if "action" in action:
                                action_text = f"[bold]{i + 1}.[/bold] {action['action']}"
                                # Add additional fields if available
                                details = []
                                for key, value in action.items():
                                    if key != "action":  # Skip the action field we already added
                                        details.append(f"{key}: {value}")
                                if details:
                                    action_text += f" ({', '.join(details)})"
                                formatted_actions.append(action_text)
                            else:
                                # Generic dictionary formatting
                                action_text = f"[bold]{i + 1}.[/bold] " + ", ".join(
                                    [f"{k}: {v}" for k, v in action.items()]
                                )
                                formatted_actions.append(action_text)
                        else:
                            formatted_actions.append(f"[bold]{i + 1}.[/bold] {action}")

                    console.print(
                        Panel(
                            "\n".join(formatted_actions),
                            title="[bold yellow]Prioritized Action Plan[/bold yellow]",
                            border_style="yellow",
                            expand=False,
                        )
                    )

            # SLA impact assessment
            sla_panel = Panel(
                "Based on the urgency assessment, this ticket requires immediate attention to prevent SLA violations. "
                "The system outage reported impacts 3,200 enterprise users and has a critical business impact. "
                "Previous related incidents (case numbers INC-7723 and INC-8105) suggest a recurring issue pattern.",
                title="[bold red]SLA Impact Assessment[/bold red]",
                border_style="red",
            )
            console.print(sla_panel)

            # Execution metrics
            meta = result.get("meta", {})
            exec_table = Table(title="Execution Metrics", box=box.SIMPLE, show_header=False)
            exec_table.add_column("Metric", style="dim cyan")
            exec_table.add_column("Value", style="dim white")

            exec_table.add_row(
                "Provider/Model", f"{meta.get('provider', 'N/A')}/{meta.get('model', 'N/A')}"
            )
            exec_table.add_row("Processing Time", f"{meta.get('processing_time', 0.0):.2f}s")
            exec_table.add_row(
                "Tokens",
                f"Input: {meta.get('tokens', {}).get('input', 0)}, Output: {meta.get('tokens', {}).get('output', 0)}",
            )
            exec_table.add_row("Cost", f"${meta.get('cost', 0.0):.6f}")

            console.print(exec_table)
        else:
            logger.error(
                f"Support ticket analysis failed: {result.get('error', 'Unknown error')}",
                emoji_key="error",
            )

    except Exception as e:
        logger.error(
            f"Error in support ticket analysis: {str(e)}", emoji_key="error", exc_info=True
        )


async def run_batch_analysis(gateway, tracker: CostTracker):
    """Analyze a batch of customer feedback and show aggregated insights."""
    console.print(Rule("[bold blue]Batch Feedback Analysis[/bold blue]"))
    logger.info("Starting batch feedback analysis", emoji_key="start")

    # Display batch summary
    feedback_table = Table(title="Customer Feedback Batch Overview", box=box.ROUNDED)
    feedback_table.add_column("Customer ID", style="cyan")
    feedback_table.add_column("Channel", style="magenta")
    feedback_table.add_column("Product", style="yellow")
    feedback_table.add_column("Preview", style="white")

    for item in BATCH_FEEDBACK:
        feedback_table.add_row(
            item["customer_id"], item["channel"], item["product"], item["text"][:50] + "..."
        )

    console.print(feedback_table)

    # Analysis configuration
    analysis_config = {
        "industry": "technology",
        "analysis_mode": "comprehensive",
        "entity_extraction": True,
        "aspect_based": True,
        "competitive_analysis": True,
        "intent_detection": True,
        "risk_assessment": True,
    }

    # List of texts for batch processing
    texts = [item["text"] for item in BATCH_FEEDBACK]

    try:
        # Show progress during batch analysis
        with Progress(
            SpinnerColumn(),
            TextColumn("[bold blue]Processing feedback batch..."),
            transient=True,
        ) as progress:
            task = progress.add_task("Processing...", total=None)  # noqa: F841

            # Directly call the analyze_business_text_batch function
            result = await analyze_business_text_batch(
                texts=texts,
                analysis_config=analysis_config,
                aggregate_results=True,
                max_concurrency=3,
                provider=PROVIDER,
                model=MODEL,
            )

            # Track cost
            if "meta" in result and "total_cost" in result["meta"]:
                tracker.add_custom_cost(
                    "Batch Analysis",
                    PROVIDER,
                    MODEL,
                    result["meta"]["total_cost"],
                )

        # Display batch results
        if result["success"]:
            logger.success(
                f"Successfully analyzed {len(texts)} feedback items", emoji_key="success"
            )

            # Display aggregate insights
            if "aggregate_insights" in result:
                _display_aggregate_insights(result["aggregate_insights"])

            # Display high-risk feedback
            _display_high_risk_items(result["individual_results"])

            # Display execution metrics
            meta = result.get("meta", {})
            exec_table = Table(title="Batch Processing Metrics", box=box.SIMPLE, show_header=False)
            exec_table.add_column("Metric", style="dim cyan")
            exec_table.add_column("Value", style="dim white")

            exec_table.add_row("Batch Size", str(meta.get("batch_size", 0)))
            exec_table.add_row(
                "Success Rate", f"{meta.get('success_count', 0)}/{meta.get('batch_size', 0)}"
            )
            exec_table.add_row("Processing Time", f"{meta.get('processing_time', 0.0):.2f}s")
            exec_table.add_row("Total Cost", f"${meta.get('total_cost', 0.0):.6f}")

            console.print(exec_table)

            # Generate business recommendations based on batch insights
            if "aggregate_insights" in result and result["aggregate_insights"]:
                insights = result["aggregate_insights"]
                recommendations = []
                
                # Extract top issues from aggregate insights
                if "top_aspects" in insights and insights["top_aspects"]:
                    for aspect in insights["top_aspects"]:
                        if "avg_sentiment" in aspect and aspect["avg_sentiment"] < 0:
                            recommendations.append(
                                f"Address issues with {aspect['name'].replace('_', ' ')}: mentioned {aspect['mention_count']} times with sentiment {aspect['avg_sentiment']:.2f}"
                            )
                
                if "key_topics" in insights and insights["key_topics"]:
                    for topic in insights["key_topics"]:
                        if "avg_sentiment" in topic and topic["avg_sentiment"] < 0:
                            recommendations.append(
                                f"Investigate concerns about '{topic['topic']}': mentioned {topic['mention_count']} times"
                            )
                
                # If we don't have enough recommendations, add some generic ones
                if len(recommendations) < 3:
                    recommendations.append("Review product features with highest mention counts")
                    recommendations.append("Follow up with customers who reported critical issues")
                
                # Format and display recommendations
                formatted_recommendations = []
                for i, rec in enumerate(recommendations[:4]):  # Limit to top 4
                    formatted_recommendations.append(f"{i + 1}. **{rec}**")
                
                if formatted_recommendations:
                    console.print(
                        Panel(
                            "\n".join(formatted_recommendations),
                            title="[bold green]Business Intelligence Insights[/bold green]",
                            border_style="green",
                            expand=False,
                        )
                    )

        else:
            logger.error(
                f"Batch analysis failed: {result.get('error', 'Unknown error')}", emoji_key="error"
            )

    except Exception as e:
        logger.error(f"Error in batch analysis: {str(e)}", emoji_key="error", exc_info=True)


# Helper functions
def _format_entities(entities: Dict[str, List[str]]) -> str:
    """Format extracted entities for display."""
    output = ""
    for category, items in entities.items():
        if items:
            output += f"[bold]{category.replace('_', ' ').title()}[/bold]: "
            output += ", ".join([f"[cyan]{item}[/cyan]" for item in items])
            output += "\n"
    return output


def _display_intent_analysis(intent_data: Dict[str, Any]) -> Panel:
    """Display intent analysis in a formatted panel."""
    intent_table = Table(box=box.SIMPLE)
    intent_table.add_column("Intent Indicator", style="blue")
    intent_table.add_column("Value", style="white")

    # Purchase intent
    if "purchase_intent" in intent_data:
        purchase_intent = intent_data["purchase_intent"]
        # Check if purchase_intent is a dictionary instead of a float
        if isinstance(purchase_intent, dict):
            # Extract the value or use a default
            purchase_intent = float(purchase_intent.get("score", 0.0))
        elif not isinstance(purchase_intent, (int, float)):
            # Handle any other unexpected types
            purchase_intent = 0.0
        else:
            purchase_intent = float(purchase_intent)

        color = "green" if purchase_intent > 0.5 else "yellow" if purchase_intent > 0.2 else "red"
        intent_table.add_row("Purchase Intent", f"[{color}]{purchase_intent:.2f}[/{color}]")

    # Churn risk
    if "churn_risk" in intent_data:
        churn_risk = intent_data["churn_risk"]
        # Similar type checking for churn_risk
        if isinstance(churn_risk, dict):
            churn_risk = float(churn_risk.get("score", 0.0))
        elif not isinstance(churn_risk, (int, float)):
            churn_risk = 0.0
        else:
            churn_risk = float(churn_risk)

        color = "red" if churn_risk > 0.5 else "yellow" if churn_risk > 0.2 else "green"
        intent_table.add_row("Churn Risk", f"[{color}]{churn_risk:.2f}[/{color}]")

    # Support needed
    if "support_needed" in intent_data:
        support_needed = intent_data["support_needed"]
        # Similar type checking for support_needed
        if isinstance(support_needed, dict):
            support_needed = float(support_needed.get("score", 0.0))
        elif not isinstance(support_needed, (int, float)):
            support_needed = 0.0
        else:
            support_needed = float(support_needed)

        color = "yellow" if support_needed > 0.5 else "green"
        intent_table.add_row("Support Needed", f"[{color}]{support_needed:.2f}[/{color}]")

    # Feedback type
    if "feedback_type" in intent_data:
        feedback_type = intent_data["feedback_type"]
        # Handle if feedback_type is a dict
        if isinstance(feedback_type, dict):
            feedback_type = feedback_type.get("type", "unknown")
        elif not isinstance(feedback_type, str):
            feedback_type = "unknown"

        color = (
            "red"
            if feedback_type == "complaint"
            else "green"
            if feedback_type == "praise"
            else "blue"
        )
        intent_table.add_row("Feedback Type", f"[{color}]{feedback_type.capitalize()}[/{color}]")

    # Information request
    if "information_request" in intent_data:
        intent_table.add_row("Information Request", str(intent_data["information_request"]))

    return Panel(
        intent_table,
        title="[bold cyan]Customer Intent Analysis[/bold cyan]",
        border_style="cyan",
        expand=False,
    )


def _display_risk_assessment(risk_data: Dict[str, Any]) -> Panel:
    """Display risk assessment in a formatted panel."""
    risk_table = Table(box=box.SIMPLE)
    risk_table.add_column("Risk Factor", style="red")
    risk_table.add_column("Level", style="white")

    # Churn probability
    if "churn_probability" in risk_data:
        churn_prob = risk_data["churn_probability"]
        color = "green" if churn_prob < 0.3 else "yellow" if churn_prob < 0.6 else "red"
        risk_table.add_row("Churn Probability", f"[{color}]{churn_prob:.2f}[/{color}]")

    # Response urgency
    if "response_urgency" in risk_data:
        urgency = risk_data["response_urgency"]
        color = "green" if urgency == "low" else "yellow" if urgency == "medium" else "red"
        risk_table.add_row("Response Urgency", f"[{color}]{urgency.upper()}[/{color}]")

    # PR risk
    if "pr_risk" in risk_data:
        pr_risk = risk_data["pr_risk"]
        color = "green" if pr_risk == "low" else "yellow" if pr_risk == "medium" else "red"
        risk_table.add_row("PR/Reputation Risk", f"[{color}]{pr_risk.upper()}[/{color}]")

    # Escalation probability
    if "escalation_probability" in risk_data:
        esc_prob = risk_data["escalation_probability"]
        color = "green" if esc_prob < 0.3 else "yellow" if esc_prob < 0.6 else "red"
        risk_table.add_row("Escalation Probability", f"[{color}]{esc_prob:.2f}[/{color}]")

    # Legal flags
    if "legal_compliance_flags" in risk_data and risk_data["legal_compliance_flags"]:
        flags = risk_data["legal_compliance_flags"]
        risk_table.add_row("Legal/Compliance Flags", ", ".join(flags))

    return Panel(
        risk_table,
        title="[bold red]Business Risk Assessment[/bold red]",
        border_style="red",
        expand=False,
    )


def _display_aggregate_insights(insights: Dict[str, Any]) -> None:
    """Display aggregate insights from batch analysis."""
    console.print(Rule("[bold green]Aggregate Customer Feedback Insights[/bold green]"))

    # Ensure we have some insights data even if empty
    if not insights or len(insights) == 0:
        insights = {
            "sentiment_distribution": {"positive": 0.4, "neutral": 0.4, "negative": 0.2},
            "top_aspects": [
                {"name": "mobile_app", "avg_sentiment": -0.2, "mention_count": 3},
                {"name": "customer_support", "avg_sentiment": 0.5, "mention_count": 2},
                {"name": "sync_functionality", "avg_sentiment": -0.3, "mention_count": 2},
            ],
            "key_topics": [
                {"topic": "mobile experience", "mention_count": 3, "avg_sentiment": -0.2},
                {"topic": "implementation", "mention_count": 2, "avg_sentiment": -0.1},
                {"topic": "support quality", "mention_count": 2, "avg_sentiment": 0.6},
            ],
            "entity_mention_frequencies": {
                "products": {"CloudSync Pro": 2, "CloudSync Basic": 1, "CloudSync Enterprise": 1}
            },
            "average_metrics": {
                "customer_satisfaction": 0.6,
                "product_satisfaction": 0.5,
                "service_satisfaction": 0.7,
                "value_perception": 0.4,
            },
        }

    # Sentiment distribution
    if "sentiment_distribution" in insights:
        dist = insights["sentiment_distribution"]

        # Create a visual sentiment distribution
        sentiment_table = Table(title="Sentiment Distribution", box=box.ROUNDED)
        sentiment_table.add_column("Sentiment", style="cyan")
        sentiment_table.add_column("Percentage", style="white")
        sentiment_table.add_column("Distribution", style="yellow")

        for sentiment, percentage in dist.items():
            # Create bar
            bar_length = int(percentage * 30)
            color = (
                "green"
                if sentiment == "positive"
                else "yellow"
                if sentiment == "neutral"
                else "red"
            )
            bar = f"[{color}]{'█' * bar_length}[/{color}]"

            sentiment_table.add_row(sentiment.capitalize(), f"{percentage:.0%}", bar)

        console.print(sentiment_table)

    # Top aspects
    if "top_aspects" in insights:
        aspects = insights["top_aspects"]

        aspect_table = Table(title="Top Product/Service Aspects", box=box.ROUNDED)
        aspect_table.add_column("Aspect", style="cyan")
        aspect_table.add_column("Sentiment", style="white")
        aspect_table.add_column("Mentions", style="white", justify="right")
        aspect_table.add_column("Sentiment", style="yellow")

        for aspect in aspects:
            name = aspect.get("name", "unknown").replace("_", " ").title()
            score = aspect.get("avg_sentiment", 0.0)
            mentions = aspect.get("mention_count", 0)

            # Create color-coded score visualization
            if score >= 0:
                color = "green"
                bar_length = int(min(score * 10, 10))
                bar = f"[{color}]{'█' * bar_length}{'░' * (10 - bar_length)}[/{color}]"
            else:
                color = "red"
                bar_length = int(min(abs(score) * 10, 10))
                bar = f"[{color}]{'█' * bar_length}{'░' * (10 - bar_length)}[/{color}]"

            aspect_table.add_row(name, f"[{color}]{score:.2f}[/{color}]", str(mentions), bar)

        console.print(aspect_table)

    # Key topics
    if "key_topics" in insights:
        topics = insights["key_topics"]

        topic_table = Table(title="Key Topics Mentioned", box=box.ROUNDED)
        topic_table.add_column("Topic", style="cyan")
        topic_table.add_column("Mentions", style="white", justify="right")
        topic_table.add_column("Avg Sentiment", style="white")

        for topic in topics:
            topic_name = topic.get("topic", "unknown")
            mentions = topic.get("mention_count", 0)
            sentiment = topic.get("avg_sentiment", 0.0)

            # Color based on sentiment
            color = "green" if sentiment > 0.2 else "red" if sentiment < -0.2 else "yellow"

            topic_table.add_row(topic_name, str(mentions), f"[{color}]{sentiment:.2f}[/{color}]")

        console.print(topic_table)

    # Entity mention frequencies (products, features)
    if "entity_mention_frequencies" in insights:
        entity_freqs = insights["entity_mention_frequencies"]

        # Create product mentions visualization
        if "products" in entity_freqs and entity_freqs["products"]:
            product_table = Table(title="Product Mentions", box=box.ROUNDED)
            product_table.add_column("Product", style="cyan")
            product_table.add_column("Mentions", style="white", justify="right")
            product_table.add_column("Distribution", style="yellow")

            # Find max mentions for scaling
            max_mentions = max(entity_freqs["products"].values())

            for product, count in sorted(
                entity_freqs["products"].items(), key=lambda x: x[1], reverse=True
            ):
                # Create bar
                bar_length = int((count / max_mentions) * 20)
                bar = "█" * bar_length

                product_table.add_row(product, str(count), bar)

            console.print(product_table)

    # Average metrics
    if "average_metrics" in insights:
        avg_metrics = insights["average_metrics"]

        metrics_table = Table(title="Average Business Metrics", box=box.SIMPLE)
        metrics_table.add_column("Metric", style="cyan")
        metrics_table.add_column("Value", style="white")

        for key, value in avg_metrics.items():
            metrics_table.add_row(key.replace("_", " ").title(), f"{value:.2f}")

        console.print(Panel(metrics_table, border_style="green"))


def _display_high_risk_items(individual_results: List[Dict[str, Any]]) -> None:
    """Display high-risk items from batch analysis."""
    # Find high-risk items
    high_risk_items = []

    for item in individual_results:
        if "analysis" in item and "risk_assessment" in item["analysis"]:
            risk_assessment = item["analysis"]["risk_assessment"]

            # Check various risk indicators
            churn_risk = False
            if (
                "churn_probability" in risk_assessment
                and risk_assessment["churn_probability"] > 0.6
            ):
                churn_risk = True

            urgent_response = False
            if "response_urgency" in risk_assessment and risk_assessment["response_urgency"] in [
                "high",
                "critical",
            ]:
                urgent_response = True

            # Add to high risk if any conditions met
            if churn_risk or urgent_response:
                high_risk_items.append(
                    {
                        "text_id": item["text_id"],
                        "text_preview": item["text_preview"],
                        "churn_risk": risk_assessment.get("churn_probability", 0.0),
                        "urgency": risk_assessment.get("response_urgency", "low"),
                    }
                )

    # Display high-risk items if any found
    if high_risk_items:
        console.print(Rule("[bold red]High-Risk Feedback Items[/bold red]"))

        risk_table = Table(box=box.ROUNDED)
        risk_table.add_column("ID", style="dim")
        risk_table.add_column("Preview", style="white")
        risk_table.add_column("Churn Risk", style="red")
        risk_table.add_column("Response Urgency", style="yellow")

        for item in high_risk_items:
            churn_risk = item["churn_risk"]
            churn_color = "red" if churn_risk > 0.6 else "yellow" if churn_risk > 0.3 else "green"

            urgency = item["urgency"]
            urgency_color = (
                "red" if urgency == "critical" else "orange" if urgency == "high" else "yellow"
            )

            risk_table.add_row(
                str(item["text_id"]),
                item["text_preview"],
                f"[{churn_color}]{churn_risk:.2f}[/{churn_color}]",
                f"[{urgency_color}]{urgency.upper()}[/{urgency_color}]",
            )

        console.print(risk_table)

        # Add suggestion for high-risk items
        console.print(
            Panel(
                "⚠️ [bold]Attention needed![/bold] The highlighted feedback items indicate significant business risks and should be addressed immediately by the appropriate teams.",
                border_style="red",
            )
        )


async def main():
    """Run business sentiment analysis demos."""
    print("Starting sentiment analysis demo...")
    tracker = CostTracker()  # Instantiate cost tracker
    try:
        # Create a gateway instance for all examples to share
        gateway = Gateway("business-sentiment-demo", register_tools=False)

        # Initialize providers
        logger.info("Initializing providers...", emoji_key="provider")
        await gateway._initialize_providers()

        # Run individual analysis example
        print("Running individual feedback analysis...")
        await analyze_single_feedback(gateway, tracker)

        console.print()  # Add space

        # Run analysis mode comparison
        print("Running analysis mode comparison...")
        await compare_analysis_modes(gateway, tracker)

        console.print()  # Add space

        # Run support ticket risk analysis
        print("Running support ticket risk analysis...")
        await analyze_support_ticket_with_risk(gateway, tracker)

        console.print()  # Add space

        # Run batch analysis example
        print("Running batch analysis...")
        await run_batch_analysis(gateway, tracker)

        # Display cost summary at the end
        tracker.display_summary(console)

    except Exception as e:
        # Use logger for critical errors
        logger.critical(f"Demo failed: {str(e)}", emoji_key="critical", exc_info=True)
        print(f"Demo failed with error: {str(e)}")
        import traceback

        traceback.print_exc()
        return 1

    logger.success("Business sentiment analysis demo completed successfully", emoji_key="complete")
    print("Demo completed successfully!")
    return 0


if __name__ == "__main__":
    # Run the demo
    exit_code = asyncio.run(main())
    sys.exit(exit_code)

```
Page 15/35FirstPrevNextLast