This is page 15 of 35. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│ ├── __init__.py
│ ├── advanced_agent_flows_using_unified_memory_system_demo.py
│ ├── advanced_extraction_demo.py
│ ├── advanced_unified_memory_system_demo.py
│ ├── advanced_vector_search_demo.py
│ ├── analytics_reporting_demo.py
│ ├── audio_transcription_demo.py
│ ├── basic_completion_demo.py
│ ├── cache_demo.py
│ ├── claude_integration_demo.py
│ ├── compare_synthesize_demo.py
│ ├── cost_optimization.py
│ ├── data
│ │ ├── sample_event.txt
│ │ ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│ │ └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│ ├── docstring_refiner_demo.py
│ ├── document_conversion_and_processing_demo.py
│ ├── entity_relation_graph_demo.py
│ ├── filesystem_operations_demo.py
│ ├── grok_integration_demo.py
│ ├── local_text_tools_demo.py
│ ├── marqo_fused_search_demo.py
│ ├── measure_model_speeds.py
│ ├── meta_api_demo.py
│ ├── multi_provider_demo.py
│ ├── ollama_integration_demo.py
│ ├── prompt_templates_demo.py
│ ├── python_sandbox_demo.py
│ ├── rag_example.py
│ ├── research_workflow_demo.py
│ ├── sample
│ │ ├── article.txt
│ │ ├── backprop_paper.pdf
│ │ ├── buffett.pdf
│ │ ├── contract_link.txt
│ │ ├── legal_contract.txt
│ │ ├── medical_case.txt
│ │ ├── northwind.db
│ │ ├── research_paper.txt
│ │ ├── sample_data.json
│ │ └── text_classification_samples
│ │ ├── email_classification.txt
│ │ ├── news_samples.txt
│ │ ├── product_reviews.txt
│ │ └── support_tickets.txt
│ ├── sample_docs
│ │ └── downloaded
│ │ └── attention_is_all_you_need.pdf
│ ├── sentiment_analysis_demo.py
│ ├── simple_completion_demo.py
│ ├── single_shot_synthesis_demo.py
│ ├── smart_browser_demo.py
│ ├── sql_database_demo.py
│ ├── sse_client_demo.py
│ ├── test_code_extraction.py
│ ├── test_content_detection.py
│ ├── test_ollama.py
│ ├── text_classification_demo.py
│ ├── text_redline_demo.py
│ ├── tool_composition_examples.py
│ ├── tournament_code_demo.py
│ ├── tournament_text_demo.py
│ ├── unified_memory_system_demo.py
│ ├── vector_search_demo.py
│ ├── web_automation_instruction_packs.py
│ └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│ └── smart_browser_internal
│ ├── locator_cache.db
│ ├── readability.js
│ └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── integration
│ │ ├── __init__.py
│ │ └── test_server.py
│ ├── manual
│ │ ├── test_extraction_advanced.py
│ │ └── test_extraction.py
│ └── unit
│ ├── __init__.py
│ ├── test_cache.py
│ ├── test_providers.py
│ └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│ ├── __init__.py
│ ├── __main__.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── __main__.py
│ │ ├── commands.py
│ │ ├── helpers.py
│ │ └── typer_cli.py
│ ├── clients
│ │ ├── __init__.py
│ │ ├── completion_client.py
│ │ └── rag_client.py
│ ├── config
│ │ └── examples
│ │ └── filesystem_config.yaml
│ ├── config.py
│ ├── constants.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── evaluation
│ │ │ ├── base.py
│ │ │ └── evaluators.py
│ │ ├── providers
│ │ │ ├── __init__.py
│ │ │ ├── anthropic.py
│ │ │ ├── base.py
│ │ │ ├── deepseek.py
│ │ │ ├── gemini.py
│ │ │ ├── grok.py
│ │ │ ├── ollama.py
│ │ │ ├── openai.py
│ │ │ └── openrouter.py
│ │ ├── server.py
│ │ ├── state_store.py
│ │ ├── tournaments
│ │ │ ├── manager.py
│ │ │ ├── tasks.py
│ │ │ └── utils.py
│ │ └── ums_api
│ │ ├── __init__.py
│ │ ├── ums_database.py
│ │ ├── ums_endpoints.py
│ │ ├── ums_models.py
│ │ └── ums_services.py
│ ├── exceptions.py
│ ├── graceful_shutdown.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── analytics
│ │ │ ├── __init__.py
│ │ │ ├── metrics.py
│ │ │ └── reporting.py
│ │ ├── cache
│ │ │ ├── __init__.py
│ │ │ ├── cache_service.py
│ │ │ ├── persistence.py
│ │ │ ├── strategies.py
│ │ │ └── utils.py
│ │ ├── cache.py
│ │ ├── document.py
│ │ ├── knowledge_base
│ │ │ ├── __init__.py
│ │ │ ├── feedback.py
│ │ │ ├── manager.py
│ │ │ ├── rag_engine.py
│ │ │ ├── retriever.py
│ │ │ └── utils.py
│ │ ├── prompts
│ │ │ ├── __init__.py
│ │ │ ├── repository.py
│ │ │ └── templates.py
│ │ ├── prompts.py
│ │ └── vector
│ │ ├── __init__.py
│ │ ├── embeddings.py
│ │ └── vector_service.py
│ ├── tool_token_counter.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── audio_transcription.py
│ │ ├── base.py
│ │ ├── completion.py
│ │ ├── docstring_refiner.py
│ │ ├── document_conversion_and_processing.py
│ │ ├── enhanced-ums-lookbook.html
│ │ ├── entity_relation_graph.py
│ │ ├── excel_spreadsheet_automation.py
│ │ ├── extraction.py
│ │ ├── filesystem.py
│ │ ├── html_to_markdown.py
│ │ ├── local_text_tools.py
│ │ ├── marqo_fused_search.py
│ │ ├── meta_api_tool.py
│ │ ├── ocr_tools.py
│ │ ├── optimization.py
│ │ ├── provider.py
│ │ ├── pyodide_boot_template.html
│ │ ├── python_sandbox.py
│ │ ├── rag.py
│ │ ├── redline-compiled.css
│ │ ├── sentiment_analysis.py
│ │ ├── single_shot_synthesis.py
│ │ ├── smart_browser.py
│ │ ├── sql_databases.py
│ │ ├── text_classification.py
│ │ ├── text_redline_tools.py
│ │ ├── tournament.py
│ │ ├── ums_explorer.html
│ │ └── unified_memory_system.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── async_utils.py
│ │ ├── display.py
│ │ ├── logging
│ │ │ ├── __init__.py
│ │ │ ├── console.py
│ │ │ ├── emojis.py
│ │ │ ├── formatter.py
│ │ │ ├── logger.py
│ │ │ ├── panels.py
│ │ │ ├── progress.py
│ │ │ └── themes.py
│ │ ├── parse_yaml.py
│ │ ├── parsing.py
│ │ ├── security.py
│ │ └── text.py
│ └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/optimization.py:
--------------------------------------------------------------------------------
```python
"""Tools for LLM cost estimation, model comparison, recommendation, and workflow execution.
Provides utilities to help manage LLM usage costs and select appropriate models.
"""
import asyncio
import json
import os
import time
import traceback
from typing import Any, Dict, List, Optional, Set
import networkx as nx
from ultimate_mcp_server.constants import COST_PER_MILLION_TOKENS
from ultimate_mcp_server.exceptions import ToolError, ToolInputError
from ultimate_mcp_server.tools.base import with_error_handling, with_tool_metrics
from ultimate_mcp_server.tools.completion import chat_completion
from ultimate_mcp_server.tools.document_conversion_and_processing import (
chunk_document,
summarize_document,
)
from ultimate_mcp_server.tools.extraction import extract_json
from ultimate_mcp_server.tools.rag import (
add_documents,
create_knowledge_base,
generate_with_rag,
retrieve_context,
)
from ultimate_mcp_server.tools.text_classification import text_classification
from ultimate_mcp_server.utils import get_logger
from ultimate_mcp_server.utils.text import count_tokens
logger = get_logger("ultimate_mcp_server.tools.optimization")
# --- Constants for Speed Score Mapping ---
# Define bins for mapping tokens/second to a 1-5 score (lower is faster)
# Adjust these thresholds based on observed performance and desired sensitivity
SPEED_SCORE_BINS = [
(200, 1), # > 200 tokens/s -> Score 1 (Fastest)
(100, 2), # 100-200 tokens/s -> Score 2
(50, 3), # 50-100 tokens/s -> Score 3
(20, 4), # 20-50 tokens/s -> Score 4
(0, 5), # 0-20 tokens/s -> Score 5 (Slowest)
]
DEFAULT_SPEED_SCORE = 3 # Fallback score if measurement is missing/invalid or hardcoded value is missing
def _map_tok_per_sec_to_score(tokens_per_sec: float) -> int:
"""Maps measured tokens/second to a 1-5 speed score (lower is faster)."""
if tokens_per_sec is None or not isinstance(tokens_per_sec, (int, float)) or tokens_per_sec < 0:
return DEFAULT_SPEED_SCORE # Return default for invalid input
for threshold, score in SPEED_SCORE_BINS:
if tokens_per_sec >= threshold:
return score
return SPEED_SCORE_BINS[-1][1] # Should hit the 0 threshold if positive
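# Illustrative mappings for the bins above (lower score = faster; values chosen for the example):
#   _map_tok_per_sec_to_score(250.0) -> 1
#   _map_tok_per_sec_to_score(150.0) -> 2
#   _map_tok_per_sec_to_score(10.0)  -> 5
#   _map_tok_per_sec_to_score(None)  -> DEFAULT_SPEED_SCORE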
@with_tool_metrics
@with_error_handling
async def estimate_cost(
prompt: str,
model: str, # Can be full 'provider/model_name' or just 'model_name' if unique
max_tokens: Optional[int] = None,
include_output: bool = True
) -> Dict[str, Any]:
"""Estimates the monetary cost of an LLM request without executing it.
Calculates cost based on input prompt tokens and estimated/specified output tokens
using predefined cost rates for the specified model.
Args:
prompt: The text prompt that would be sent to the model.
model: The model identifier (e.g., "openai/gpt-4.1-mini", "gpt-4.1-mini",
"anthropic/claude-3-5-haiku-20241022", "claude-3-5-haiku-20241022").
Cost data must be available for the resolved model name in `COST_PER_MILLION_TOKENS`.
max_tokens: (Optional) The maximum number of tokens expected in the output. If None,
output tokens are estimated as roughly half the input prompt tokens.
include_output: (Optional) If False, calculates cost based only on input tokens, ignoring
`max_tokens` or output estimation. Defaults to True.
Returns:
A dictionary containing the cost estimate and token breakdown:
{
"cost": 0.000150, # Total estimated cost in USD
"breakdown": {
"input_cost": 0.000100,
"output_cost": 0.000050
},
"tokens": {
"input": 200, # Tokens counted from the prompt
"output": 100, # Estimated or provided max_tokens
"total": 300
},
"rate": { # Cost per million tokens for this model
"input": 0.50,
"output": 1.50
},
"model": "gpt-4.1-mini", # Returns the original model string passed as input
"resolved_model_key": "gpt-4.1-mini", # The key used for cost lookup
"is_estimate": true
}
Raises:
ToolInputError: If prompt or model format is invalid.
ToolError: If the specified `model` cannot be resolved to cost data.
Note: if token counting fails, a rough character-based estimate (prompt length // 4) is used instead of raising.
"""
# Input validation
if not prompt or not isinstance(prompt, str):
raise ToolInputError("Prompt must be a non-empty string.")
if not model or not isinstance(model, str):
raise ToolInputError("Model must be a non-empty string.")
# Flexible Cost Data Lookup
cost_data = COST_PER_MILLION_TOKENS.get(model)
resolved_model_key = model # Assume direct match first
model_name_only = model # Use input model for token counting initially
if not cost_data and '/' in model:
# If direct lookup fails and it looks like a prefixed name, try stripping prefix
potential_short_key = model.split('/')[-1]
cost_data = COST_PER_MILLION_TOKENS.get(potential_short_key)
if cost_data:
resolved_model_key = potential_short_key
model_name_only = potential_short_key # Use short name for token count
# If short key also fails, cost_data remains None
if not cost_data:
error_message = f"Unknown model or cost data unavailable for: {model}"
raise ToolError(error_message, error_code="MODEL_NOT_FOUND", details={"model": model})
# Token Counting (use model_name_only derived from successful cost key)
try:
input_tokens = count_tokens(prompt, model=model_name_only)
except ValueError as e:
# Log warning with the original model input for clarity
logger.warning(f"Could not count tokens for model '{model}' (using '{model_name_only}' for tiktoken): {e}. Using rough estimate.")
input_tokens = len(prompt) // 4 # Fallback estimate
# Estimate output tokens if needed
estimated_output_tokens = 0
if include_output:
if max_tokens is not None:
estimated_output_tokens = max_tokens
else:
estimated_output_tokens = input_tokens // 2
logger.debug(f"max_tokens not provided, estimating output tokens as {estimated_output_tokens}")
else:
estimated_output_tokens = 0
# Calculate costs
input_cost = (input_tokens / 1_000_000) * cost_data["input"]
output_cost = (estimated_output_tokens / 1_000_000) * cost_data["output"]
total_cost = input_cost + output_cost
logger.info(f"Estimated cost for model '{model}' (using key '{resolved_model_key}'): ${total_cost:.6f} (In: {input_tokens} tokens, Out: {estimated_output_tokens} tokens)")
return {
"cost": total_cost,
"breakdown": {
"input_cost": input_cost,
"output_cost": output_cost
},
"tokens": {
"input": input_tokens,
"output": estimated_output_tokens,
"total": input_tokens + estimated_output_tokens
},
"rate": {
"input": cost_data["input"],
"output": cost_data["output"]
},
"model": model, # Return original input model string
"resolved_model_key": resolved_model_key, # Key used for cost lookup
"is_estimate": True
}
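# Minimal usage sketch (hypothetical prompt; the rates shown are the example values from the
# docstring above, real values come from COST_PER_MILLION_TOKENS):
#
#   est = await estimate_cost(prompt="Summarize the quarterly report...", model="gpt-4.1-mini", max_tokens=100)
#   # With rates of $0.50 / $1.50 per million tokens and 200 input tokens:
#   #   input_cost  = (200 / 1_000_000) * 0.50 = 0.000100
#   #   output_cost = (100 / 1_000_000) * 1.50 = 0.000150
#   #   est["cost"] = 0.000250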
@with_tool_metrics
@with_error_handling
async def compare_models(
prompt: str,
models: List[str], # List of model IDs (can be short or full names)
max_tokens: Optional[int] = None,
include_output: bool = True
) -> Dict[str, Any]:
"""Compares the estimated cost of running a prompt across multiple specified models.
Uses the `estimate_cost` tool for each model in the list concurrently.
Args:
prompt: The text prompt to use for cost comparison.
models: A list of model identifiers (e.g., ["openai/gpt-4.1-mini", "gpt-4.1-mini", "claude-3-5-haiku-20241022"]).
`estimate_cost` will handle resolving these to cost data.
max_tokens: (Optional) Maximum output tokens to assume for cost estimation across all models.
If None, output is estimated individually per model based on input.
include_output: (Optional) Whether to include estimated output costs in the comparison. Defaults to True.
Returns:
A dictionary containing the cost comparison results:
{
"models": {
"openai/gpt-4.1-mini": { # Uses the input model name as key
"cost": 0.000150,
"tokens": { "input": 200, "output": 100, "total": 300 }
},
"claude-3-5-haiku-20241022": {
"cost": 0.000087,
"tokens": { "input": 200, "output": 100, "total": 300 }
},
"some-unknown-model": { # Example of an error during estimation
"error": "Unknown model or cost data unavailable for: some-unknown-model"
}
},
"ranking": [ # List of input model names ordered by cost (cheapest first), errors excluded
"claude-3-5-haiku-20241022",
"openai/gpt-4.1-mini"
],
"cheapest": "claude-3-5-haiku-20241022", # Input model name with the lowest cost
"most_expensive": "openai/gpt-4.1-mini", # Input model name with the highest cost
"prompt_length_chars": 512,
"max_tokens_assumed": 100
}
Raises:
ToolInputError: If the `models` list is empty.
"""
if not models or not isinstance(models, list):
raise ToolInputError("'models' must be a non-empty list of model identifiers.")
# Model identifiers may be short or prefixed; estimate_cost handles resolving them to cost data.
results = {}
estimated_output_for_summary = None
async def get_estimate(model_input_name): # Use a distinct variable name
nonlocal estimated_output_for_summary
try:
estimate = await estimate_cost(
prompt=prompt,
model=model_input_name, # Pass the potentially short/full name
max_tokens=max_tokens,
include_output=include_output
)
# Use the original input name as the key in results
results[model_input_name] = {
"cost": estimate["cost"],
"tokens": estimate["tokens"],
}
if estimated_output_for_summary is None:
estimated_output_for_summary = estimate["tokens"]["output"]
except ToolError as e:
logger.warning(f"Could not estimate cost for model {model_input_name}: {e.detail}")
results[model_input_name] = {"error": e.detail} # Store error under original name
except Exception as e:
logger.error(f"Unexpected error estimating cost for model {model_input_name}: {e}", exc_info=True)
results[model_input_name] = {"error": f"Unexpected error: {str(e)}"}
await asyncio.gather(*(get_estimate(model_name) for model_name in models))
successful_estimates = {m: r for m, r in results.items() if "error" not in r}
sorted_models = sorted(successful_estimates.items(), key=lambda item: item[1]["cost"])
output_tokens_summary = estimated_output_for_summary if max_tokens is None else max_tokens
if not include_output:
output_tokens_summary = 0
cheapest_model = sorted_models[0][0] if sorted_models else None
most_expensive_model = sorted_models[-1][0] if sorted_models else None
logger.info(f"Compared models: {list(results.keys())}. Cheapest: {cheapest_model or 'N/A'}")
return {
"models": results,
"ranking": [m for m, _ in sorted_models], # Ranking uses original input names
"cheapest": cheapest_model,
"most_expensive": most_expensive_model,
"prompt_length_chars": len(prompt),
"max_tokens_assumed": output_tokens_summary,
}
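# Minimal usage sketch (model names taken from the docstring example; figures are illustrative):
#
#   comparison = await compare_models(
#       prompt="Classify the sentiment of this review...",
#       models=["openai/gpt-4.1-mini", "claude-3-5-haiku-20241022"],
#       max_tokens=100,
#   )
#   # comparison["ranking"]  -> input model names ordered cheapest first (errors excluded)
#   # comparison["cheapest"] -> e.g. "claude-3-5-haiku-20241022"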
@with_tool_metrics
@with_error_handling
async def recommend_model(
task_type: str,
expected_input_length: int, # In characters
expected_output_length: Optional[int] = None, # In characters
required_capabilities: Optional[List[str]] = None,
max_cost: Optional[float] = None,
priority: str = "balanced" # Options: "cost", "quality", "speed", "balanced"
) -> Dict[str, Any]:
"""Recommends suitable LLM models based on task requirements and optimization priority.
Evaluates known models against criteria like task type suitability (inferred),
estimated cost (based on expected lengths), required capabilities,
measured speed (tokens/sec if available), and quality metrics.
Args:
task_type: A description of the task (e.g., "summarization", "code generation", "entity extraction",
"customer support chat", "complex reasoning question"). Used loosely for capability checks.
expected_input_length: Estimated length of the input text in characters.
expected_output_length: (Optional) Estimated length of the output text in characters.
If None, it is estimated as roughly one quarter of the input length (in characters).
required_capabilities: (Optional) A list of specific capabilities the model MUST possess.
Current known capabilities include: "reasoning", "coding", "knowledge",
"instruction-following", "math". Check model metadata for supported values.
Example: ["coding", "instruction-following"]
max_cost: (Optional) The maximum acceptable estimated cost (in USD) for a single run
with the expected input/output lengths. Models exceeding this are excluded.
priority: (Optional) The primary factor for ranking suitable models.
Options:
- "cost": Prioritize the cheapest models.
- "quality": Prioritize models with the highest quality score.
- "speed": Prioritize models with the highest measured speed (tokens/sec).
- "balanced": (Default) Attempt to find a good mix of cost, quality, and speed.
Returns:
A dictionary containing model recommendations:
{
"recommendations": [
{
"model": "anthropic/claude-3-5-haiku-20241022",
"estimated_cost": 0.000087,
"quality_score": 7,
"measured_speed_tps": 50.63, # Tokens per second
"capabilities": ["knowledge", "instruction-following"],
"reason": "Good balance of cost and speed, meets requirements."
},
{
"model": "openai/gpt-4.1-mini",
"estimated_cost": 0.000150,
"quality_score": 7,
"measured_speed_tps": 112.06,
"capabilities": ["reasoning", "coding", ...],
"reason": "Higher cost, but good quality/speed."
}
# ... other suitable models
],
"parameters": { # Input parameters for context
"task_type": "summarization",
"expected_input_length": 2000,
"expected_output_length": 500,
"required_capabilities": [],
"max_cost": 0.001,
"priority": "balanced"
},
"excluded_models": { # Models evaluated but excluded, with reasons
"anthropic/claude-3-opus-20240229": "Exceeds max cost ($0.0015 > $0.001)",
"some-other-model": "Missing required capabilities: ['coding']"
}
}
Raises:
ToolInputError: If priority is invalid or lengths are non-positive.
"""
if expected_input_length <= 0:
raise ToolInputError("expected_input_length must be positive.")
if expected_output_length is not None and expected_output_length <= 0:
raise ToolInputError("expected_output_length must be positive if provided.")
if priority not in ["cost", "quality", "speed", "balanced"]:
raise ToolInputError(f"Invalid priority: '{priority}'. Must be cost, quality, speed, or balanced.")
# --- Load Measured Speed Data ---
measured_speeds: Dict[str, Any] = {}
measured_speeds_file = "empirically_measured_model_speeds.json"
project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
filepath = os.path.join(project_root, measured_speeds_file)
if os.path.exists(filepath):
try:
with open(filepath, 'r') as f:
measured_speeds = json.load(f)
logger.info(f"Successfully loaded measured speed data from {filepath}")
except (FileNotFoundError, json.JSONDecodeError, IOError) as e:
logger.warning(f"Could not load or parse measured speed data from {filepath}: {e}. Speed data will be 0.", exc_info=True)
measured_speeds = {}
else:
logger.info(f"Measured speed file not found at {filepath}. Speed data will be 0.")
# --- End Load Measured Speed Data ---
# --- Model Metadata (Updated based on provided images) ---
model_capabilities = {
# OpenAI models
"openai/gpt-4o": ["reasoning", "coding", "knowledge", "instruction-following", "math", "multimodal"], # Assuming multimodal based on general knowledge
"openai/gpt-4o-mini": ["reasoning", "knowledge", "instruction-following"],
"openai/gpt-4.1": ["reasoning", "coding", "knowledge", "instruction-following", "math"],
"openai/gpt-4.1-mini": ["reasoning", "coding", "knowledge", "instruction-following"],
"openai/gpt-4.1-nano": ["reasoning", "knowledge", "instruction-following"], # Added reasoning
"openai/o1-preview": ["reasoning", "coding", "knowledge", "instruction-following", "math"],
"openai/o1": ["reasoning", "coding", "knowledge", "instruction-following", "math"], # Keep guess
"openai/o3-mini": ["reasoning", "knowledge", "instruction-following"],
# Anthropic models
"anthropic/claude-3-opus-20240229": ["reasoning", "coding", "knowledge", "instruction-following", "math", "multimodal"],
"anthropic/claude-3-sonnet-20240229": ["reasoning", "coding", "knowledge", "instruction-following", "math", "multimodal"], # Previous Sonnet version
"anthropic/claude-3-5-haiku-20241022": ["knowledge", "instruction-following", "multimodal"], # Based on 3.5 Haiku column
"anthropic/claude-3-5-sonnet-20241022": ["reasoning", "coding", "knowledge", "instruction-following", "math", "multimodal"], # Based on 3.5 Sonnet column
"anthropic/claude-3-7-sonnet-20250219": ["reasoning", "coding", "knowledge", "instruction-following", "math", "multimodal"], # Based on 3.7 Sonnet column
# DeepSeek models
"deepseek/deepseek-chat": ["coding", "knowledge", "instruction-following"],
"deepseek/deepseek-reasoner": ["reasoning", "math", "instruction-following"],
# Gemini models
"gemini/gemini-2.0-flash-lite": ["knowledge", "instruction-following"],
"gemini/gemini-2.0-flash": ["knowledge", "instruction-following", "multimodal"],
"gemini/gemini-2.0-flash-thinking-exp-01-21": ["reasoning", "coding", "knowledge", "instruction-following", "multimodal"],
"gemini/gemini-2.5-pro-preview-03-25": ["reasoning", "coding", "knowledge", "instruction-following", "math", "multimodal"], # Map from gemini-2.5-pro-preview-03-25
# Grok models (Estimates)
"grok/grok-3-latest": ["reasoning", "knowledge", "instruction-following", "math"],
"grok/grok-3-fast-latest": ["reasoning", "knowledge", "instruction-following"],
"grok/grok-3-mini-latest": ["knowledge", "instruction-following"],
"grok/grok-3-mini-fast-latest": ["knowledge", "instruction-following"],
# OpenRouter models
# Note: Capabilities depend heavily on the underlying model proxied by OpenRouter.
# This is a generic entry for the one model listed in constants.py.
"openrouter/mistralai/mistral-nemo": ["knowledge", "instruction-following", "coding"] # Estimate based on Mistral family
}
model_speed_fallback = {}
model_quality = {
"openai/gpt-4o": 8, # Updated
"openai/gpt-4.1-mini": 7,
"openai/gpt-4o-mini": 6,
"openai/gpt-4.1": 8,
"openai/gpt-4.1-nano": 5,
"openai/o1-preview": 10,
"openai/o3-mini": 7,
"anthropic/claude-3-opus-20240229": 10,
"anthropic/claude-3-sonnet-20240229": 8,
"anthropic/claude-3-5-haiku-20241022": 7,
"anthropic/claude-3-5-sonnet-20241022": 9,
"anthropic/claude-3-7-sonnet-20250219": 10,
"deepseek/deepseek-chat": 7,
"deepseek/deepseek-reasoner": 8,
"gemini/gemini-2.0-flash-lite": 5,
"gemini/gemini-2.0-flash": 6,
"gemini/gemini-2.0-flash-thinking-exp-01-21": 6,
"gemini/gemini-2.5-pro-preview-03-25": 9,
# Grok models (Estimates: 1-10 scale)
"grok/grok-3-latest": 9,
"grok/grok-3-fast-latest": 8,
"grok/grok-3-mini-latest": 6,
"grok/grok-3-mini-fast-latest": 6,
# OpenRouter models (Estimates: 1-10 scale)
"openrouter/mistralai/mistral-nemo": 7 # Estimate based on Mistral family
}
# --- End Model Metadata ---
# --- Pre-calculate model metadata lookups ---
# Combine all known prefixed model names from metadata sources
all_prefixed_metadata_keys = set(model_capabilities.keys()) | set(model_speed_fallback.keys()) | set(model_quality.keys())
# Create a map from short names (e.g., "gpt-4.1-mini") to prefixed names (e.g., "openai/gpt-4.1-mini")
# Handle potential ambiguities (same short name from different providers)
short_to_prefixed_map: Dict[str, Optional[str]] = {}
ambiguous_short_names = set()
for key in all_prefixed_metadata_keys:
if '/' in key:
short_name = key.split('/')[-1]
if short_name in short_to_prefixed_map:
# Ambiguity detected
if short_name not in ambiguous_short_names:
logger.warning(f"Ambiguous short model name '{short_name}' found. Maps to '{short_to_prefixed_map[short_name]}' and '{key}'. Will require full name for this model.")
short_to_prefixed_map[short_name] = None # Mark as ambiguous
ambiguous_short_names.add(short_name)
elif short_name not in ambiguous_short_names:
short_to_prefixed_map[short_name] = key # Store unique mapping
# Helper function to find the prefixed name for a cost key (using pre-calculated map)
_prefixed_name_cache = {}
def _get_prefixed_name_for_cost_key(cost_key: str) -> Optional[str]:
if cost_key in _prefixed_name_cache:
return _prefixed_name_cache[cost_key]
# If the key is already prefixed, use it directly
if '/' in cost_key:
if cost_key in all_prefixed_metadata_keys:
_prefixed_name_cache[cost_key] = cost_key
return cost_key
else:
# Even if prefixed, if it's not in our known metadata, treat as unknown for consistency
logger.warning(f"Prefixed cost key '{cost_key}' not found in any known metadata (capabilities, quality, speed).")
_prefixed_name_cache[cost_key] = None
return None
# Look up the short name in the pre-calculated map
prefixed_name = short_to_prefixed_map.get(cost_key)
if prefixed_name is not None: # Found unique mapping
_prefixed_name_cache[cost_key] = prefixed_name
return prefixed_name
elif cost_key in ambiguous_short_names: # Known ambiguous name
logger.warning(f"Cannot resolve ambiguous short name '{cost_key}'. Please use the full 'provider/model_name' identifier.")
_prefixed_name_cache[cost_key] = None
return None
else: # Short name not found in any metadata
logger.warning(f"Short name cost key '{cost_key}' not found in any known model metadata. Cannot determine provider/full name.")
_prefixed_name_cache[cost_key] = None
return None
# --- End Pre-calculation ---
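# Example of the resolution above (keys taken from the metadata dicts defined earlier in this
# function): the short cost key "deepseek-chat" maps uniquely to "deepseek/deepseek-chat",
# while a short name claimed by two providers would be marked ambiguous and resolve to None.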
# Use a simple placeholder text based on length for cost estimation
sample_text = "a" * expected_input_length
required_capabilities = required_capabilities or []
# Rough estimate for output length if not provided
if expected_output_length is None:
# Adjust this heuristic as needed (e.g., summarization shortens, generation might lengthen)
estimated_output_length_chars = expected_input_length // 4
else:
estimated_output_length_chars = expected_output_length
# Estimate max_tokens based on character length (very rough)
estimated_max_tokens = estimated_output_length_chars // 3
candidate_models_data = []
excluded_models_reasons = {}
all_cost_keys = list(COST_PER_MILLION_TOKENS.keys())
async def evaluate_model(cost_key: str):
# 1. Find prefixed name
prefixed_model_name = _get_prefixed_name_for_cost_key(cost_key)
if not prefixed_model_name:
excluded_models_reasons[cost_key] = "Could not reliably determine provider/full name for metadata lookup."
return
# 2. Check capabilities
capabilities = model_capabilities.get(prefixed_model_name, [])
missing_caps = [cap for cap in required_capabilities if cap not in capabilities]
if missing_caps:
excluded_models_reasons[prefixed_model_name] = f"Missing required capabilities: {missing_caps}"
return
# 3. Estimate cost
try:
cost_estimate = await estimate_cost(
prompt=sample_text,
model=cost_key, # Use the key from COST_PER_MILLION_TOKENS
max_tokens=estimated_max_tokens,
include_output=True
)
estimated_cost_value = cost_estimate["cost"]
except ToolError as e:
excluded_models_reasons[prefixed_model_name] = f"Cost estimation failed: {e.detail}"
return
except Exception as e:
logger.error(f"Unexpected error estimating cost for {cost_key} (prefixed: {prefixed_model_name}) in recommendation: {e}", exc_info=True)
excluded_models_reasons[prefixed_model_name] = f"Cost estimation failed unexpectedly: {str(e)}"
return
# 4. Check max cost constraint
if max_cost is not None and estimated_cost_value > max_cost:
excluded_models_reasons[prefixed_model_name] = f"Exceeds max cost (${estimated_cost_value:.6f} > ${max_cost:.6f})"
return
# --- 5. Get Measured Speed (Tokens/Second) ---
measured_tps = 0.0 # Default to 0.0 if no data
speed_source = "unavailable"
measured_data = measured_speeds.get(prefixed_model_name) or measured_speeds.get(cost_key)
if measured_data and isinstance(measured_data, dict) and "error" not in measured_data:
tokens_per_sec = measured_data.get("output_tokens_per_second")
if tokens_per_sec is not None and isinstance(tokens_per_sec, (int, float)) and tokens_per_sec >= 0:
measured_tps = float(tokens_per_sec)
speed_source = f"measured ({measured_tps:.1f} t/s)"
else:
speed_source = "no t/s in measurement"
elif measured_data and "error" in measured_data:
speed_source = "measurement error"
logger.debug(f"Speed for {prefixed_model_name}: {measured_tps:.1f} t/s (Source: {speed_source})")
# --- End Get Measured Speed ---
# 6. Gather data for scoring
candidate_models_data.append({
"model": prefixed_model_name,
"cost_key": cost_key,
"cost": estimated_cost_value,
"quality": model_quality.get(prefixed_model_name, 5),
"measured_speed_tps": measured_tps, # Store raw TPS
"capabilities": capabilities,
"speed_source": speed_source # Store source for potential debugging/output
})
# Evaluate all models
await asyncio.gather(*(evaluate_model(key) for key in all_cost_keys))
# --- Scoring Logic (Updated for raw TPS) ---
def calculate_score(model_data, min_cost, cost_range, min_tps, tps_range):
cost = model_data['cost']
quality = model_data['quality']
measured_tps = model_data['measured_speed_tps']
# Normalize cost (1 is cheapest, 0 is most expensive)
norm_cost_score = 1.0 - ((cost - min_cost) / cost_range) if cost_range > 0 else 1.0
# Normalize quality (scale 1-10)
norm_quality_score = quality / 10.0
# Normalize speed (measured TPS - higher is better)
# (1 is fastest, 0 is slowest/0)
norm_speed_score_tps = (measured_tps - min_tps) / tps_range if tps_range > 0 else 0.0
# Calculate final score based on priority
if priority == "cost":
# Lower weight for speed if using TPS, as cost is main driver
score = norm_cost_score * 0.7 + norm_quality_score * 0.2 + norm_speed_score_tps * 0.1
elif priority == "quality":
score = norm_cost_score * 0.15 + norm_quality_score * 0.7 + norm_speed_score_tps * 0.15
elif priority == "speed":
score = norm_cost_score * 0.1 + norm_quality_score * 0.2 + norm_speed_score_tps * 0.7
else: # balanced
score = norm_cost_score * 0.34 + norm_quality_score * 0.33 + norm_speed_score_tps * 0.33
return score
# --- End Scoring Logic ---
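# Worked example of the "balanced" weighting above (normalized inputs assumed for illustration):
#   norm_cost_score = 1.0, norm_quality_score = 0.7, norm_speed_score_tps = 0.5
#   score = 1.0 * 0.34 + 0.7 * 0.33 + 0.5 * 0.33 = 0.34 + 0.231 + 0.165 = 0.736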
# Calculate scores for all candidates
if not candidate_models_data:
logger.warning("No candidate models found after filtering.")
else:
# Get min/max for normalization *before* scoring loop
all_costs = [m['cost'] for m in candidate_models_data if m['cost'] > 0]
min_cost = min(all_costs) if all_costs else 0.000001
max_cost_found = max(all_costs) if all_costs else 0.000001
cost_range = max_cost_found - min_cost
all_tps = [m['measured_speed_tps'] for m in candidate_models_data]
min_tps = min(all_tps) if all_tps else 0.0
max_tps_found = max(all_tps) if all_tps else 0.0
tps_range = max_tps_found - min_tps
for model_data in candidate_models_data:
# Pass normalization ranges to scoring function
model_data['score'] = calculate_score(model_data, min_cost, cost_range, min_tps, tps_range)
# Sort candidates by score (highest first)
sorted_candidates = sorted(candidate_models_data, key=lambda x: x.get('score', 0), reverse=True)
# Format recommendations
recommendations_list = []
if candidate_models_data:
# Get min/max across candidates *after* filtering
min_candidate_cost = min(m['cost'] for m in candidate_models_data)
max_candidate_quality = max(m['quality'] for m in candidate_models_data)
max_candidate_tps = max(m['measured_speed_tps'] for m in candidate_models_data)
for cand in sorted_candidates:
reason = f"High overall score ({cand['score']:.2f}) according to '{priority}' priority."
# Adjust reason phrasing for TPS
if priority == 'cost' and cand['cost'] <= min_candidate_cost:
reason = f"Lowest estimated cost (${cand['cost']:.6f}) and meets requirements."
elif priority == 'quality' and cand['quality'] >= max_candidate_quality:
reason = f"Highest quality score ({cand['quality']}/10) and meets requirements."
elif priority == 'speed' and cand['measured_speed_tps'] >= max_candidate_tps:
reason = f"Fastest measured speed ({cand['measured_speed_tps']:.1f} t/s) and meets requirements."
recommendations_list.append({
"model": cand['model'],
"estimated_cost": cand['cost'],
"quality_score": cand['quality'],
"measured_speed_tps": cand['measured_speed_tps'], # Add raw TPS
"capabilities": cand['capabilities'],
"reason": reason
})
logger.info(f"Recommended models (priority: {priority}): {[r['model'] for r in recommendations_list]}")
return {
"recommendations": recommendations_list,
"parameters": { # Include input parameters for context
"task_type": task_type,
"expected_input_length": expected_input_length,
"expected_output_length": estimated_output_length_chars,
"required_capabilities": required_capabilities,
"max_cost": max_cost,
"priority": priority
},
"excluded_models": excluded_models_reasons
}
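# Minimal usage sketch (parameter values are illustrative, mirroring the docstring example above):
#
#   rec = await recommend_model(
#       task_type="summarization",
#       expected_input_length=2000,          # characters
#       required_capabilities=["instruction-following"],
#       max_cost=0.001,                      # USD per run
#       priority="balanced",
#   )
#   # rec["recommendations"][0]["model"] -> highest-scoring model that meets the constraints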
@with_tool_metrics
@with_error_handling
async def execute_optimized_workflow(
documents: Optional[List[str]] = None, # Make documents optional, workflow might not need them
workflow: Optional[List[Dict[str, Any]]] = None, # Logically required; validated at runtime below
max_concurrency: int = 5
) -> Dict[str, Any]:
"""Executes a predefined workflow consisting of multiple tool calls.
Processes a list of documents (optional) through a sequence of stages defined in the workflow.
Handles dependencies between stages (output of one stage as input to another) and allows
for concurrent execution of independent stages or document processing within stages.
Args:
documents: (Optional) A list of input document strings. Required if the workflow references
'documents' as input for any stage.
workflow: A list of dictionaries, where each dictionary defines a stage (a tool call).
Required keys per stage:
- `stage_id`: A unique identifier for this stage (e.g., "summarize_chunks").
- `tool_name`: The name of the tool function to call (e.g., "summarize_document").
- `params`: A dictionary of parameters to pass to the tool function.
Parameter values can be literal values (strings, numbers, lists) or references
to outputs from previous stages using the format `"${stage_id.output_key}"` (the entire
reference is wrapped in `${...}`, e.g. `{"text": "${chunk_stage.chunks}"}`).
Special inputs: `"${documents}"` refers to the input `documents` list.
Optional keys per stage:
- `depends_on`: A list of `stage_id`s that must complete before this stage starts.
- `iterate_on`: A reference to a list in a previous stage's output over which this stage
should iterate (e.g., `"${chunk_stage.chunks}"`). The tool will be
called once for each item in the list.
- `optimization_hints`: (Future use) Hints for model selection or cost saving for this stage.
max_concurrency: (Optional) The maximum number of concurrent tasks (tool calls) to run.
Defaults to 5.
Returns:
A dictionary containing the results of all successful workflow stages:
{
"success": true,
"results": {
"chunk_stage": { "output": { "chunks": ["chunk1...", "chunk2..."] } },
"summarize_chunks": { # Example of an iterated stage
"output": [
{ "summary": "Summary of chunk 1..." },
{ "summary": "Summary of chunk 2..." }
]
},
"final_summary": { "output": { "summary": "Overall summary..." } }
},
"status": "Workflow completed successfully.",
"total_processing_time": 15.8
}
or an error dictionary if the workflow fails:
{
"success": false,
"results": { ... }, # Results up to the point of failure
"status": "Workflow failed at stage 'stage_id'.",
"error": "Error details from the failed stage...",
"total_processing_time": 8.2
}
Raises:
ToolInputError: If the workflow definition is invalid (missing keys, bad references,
circular dependencies - basic checks).
ToolError: If a tool call within the workflow fails.
Exception: For unexpected errors during workflow execution.
"""
start_time = time.time()
if not workflow or not isinstance(workflow, list):
raise ToolInputError("'workflow' must be a non-empty list of stage dictionaries.")
# --- Tool Mapping --- (Dynamically import or map tool names to functions)
# Ensure all tools listed in workflows are mapped here correctly.
try:
api_meta_tool = None # Placeholder - this needs to be the actual instance
if api_meta_tool: # Only add if instance is available
meta_api_tools = {
"register_api": api_meta_tool.register_api,
"list_registered_apis": api_meta_tool.list_registered_apis,
"get_api_details": api_meta_tool.get_api_details,
"unregister_api": api_meta_tool.unregister_api,
"call_dynamic_tool": api_meta_tool.call_dynamic_tool,
"refresh_api": api_meta_tool.refresh_api,
"get_tool_details": api_meta_tool.get_tool_details,
"list_available_tools": api_meta_tool.list_available_tools,
}
else:
logger.warning("APIMetaTool instance not available in execute_optimized_workflow. Meta API tools will not be callable in workflows.")
meta_api_tools = {}
except ImportError:
logger.warning("APIMetaTool not found (meta_api_tool.py). Meta API tools cannot be used in workflows.")
meta_api_tools = {}
# Import extract_entity_graph lazily to avoid circular imports
try:
from ultimate_mcp_server.tools.entity_relation_graph import extract_entity_graph
except ImportError:
logger.warning("entity_relation_graph module not found. extract_entity_graph will not be available in workflows.")
extract_entity_graph = None
tool_functions = {
# Core Gateway Tools
"estimate_cost": estimate_cost,
"compare_models": compare_models,
"recommend_model": recommend_model,
"chat_completion": chat_completion,
"chunk_document": chunk_document,
"summarize_document": summarize_document,
"extract_json": extract_json,
# Add extract_entity_graph conditionally
**({"extract_entity_graph": extract_entity_graph} if extract_entity_graph else {}),
# RAG Tools
"create_knowledge_base": create_knowledge_base,
"add_documents": add_documents,
"retrieve_context": retrieve_context,
"generate_with_rag": generate_with_rag,
# Classification tools
"text_classification": text_classification,
# Merge Meta API tools
**meta_api_tools,
# Add other tools as needed...
}
# --- Advanced Workflow Validation Using NetworkX ---
# Build directed graph from workflow
dag = nx.DiGraph()
# Add all stages as nodes
for i, stage in enumerate(workflow):
# Validate required keys
if not all(k in stage for k in ["stage_id", "tool_name", "params"]):
raise ToolInputError(f"Workflow stage {i} missing required keys (stage_id, tool_name, params).")
stage_id = stage["stage_id"]
# Validate params is a dictionary
if not isinstance(stage["params"], dict):
raise ToolInputError(f"Stage '{stage_id}' params must be a dictionary.")
# Check for duplicate stage IDs
if stage_id in dag:
raise ToolInputError(f"Duplicate stage_id found: '{stage_id}'.")
# Validate tool exists
tool_name = stage["tool_name"]
if tool_name not in tool_functions:
raise ToolInputError(f"Unknown tool '{tool_name}' specified in stage '{stage_id}'.")
# Validate depends_on is a list
depends_on = stage.get("depends_on", [])
if not isinstance(depends_on, list):
raise ToolInputError(f"Stage '{stage_id}' depends_on must be a list.")
# Add node with full stage data
dag.add_node(stage_id, stage=stage)
# Add dependency edges
for stage in workflow:
stage_id = stage["stage_id"]
depends_on = stage.get("depends_on", [])
for dep_id in depends_on:
if dep_id not in dag:
raise ToolInputError(f"Stage '{stage_id}' depends on non-existent stage '{dep_id}'.")
dag.add_edge(dep_id, stage_id)
# Detect circular dependencies (nx.simple_cycles returns an empty list for an acyclic graph)
cycles = list(nx.simple_cycles(dag))
if cycles:
cycle_str = " -> ".join(cycles[0]) + " -> " + cycles[0][0]
raise ToolInputError(f"Circular dependency detected in workflow: {cycle_str}")
# Dictionary to store results of each stage
stage_results: Dict[str, Any] = {}
# Set to keep track of completed stages
completed_stages: Set[str] = set()
# Dictionary to hold active tasks
active_tasks: Dict[str, asyncio.Task] = {} # noqa: F841
# Semaphore to control concurrency
concurrency_semaphore = asyncio.Semaphore(max_concurrency)
# --- Workflow Execution Logic with NetworkX ---
async def execute_stage(stage_id: str) -> None:
"""Execute a single workflow stage."""
async with concurrency_semaphore:
# Get stage definition
stage = dag.nodes[stage_id]["stage"]
tool_name = stage["tool_name"]
params = stage["params"]
iterate_on_ref = stage.get("iterate_on")
logger.info(f"Starting workflow stage '{stage_id}' (Tool: {tool_name})")
tool_func = tool_functions[tool_name]
try:
# Resolve parameters and handle iteration
resolved_params, is_iteration, iteration_list = _resolve_params(
stage_id, params, iterate_on_ref, stage_results, documents
)
# Execute tool function(s)
if is_iteration:
# Handle iteration case
iteration_tasks = []
for i, item in enumerate(iteration_list):
# Each iteration acquires the shared concurrency semaphore so per-item calls are
# throttled alongside other stages (the enclosing stage already holds one permit)
async def run_iteration(item_idx, item_value):
async with concurrency_semaphore:
iter_params = _inject_iteration_item(resolved_params, item_value)
try:
result = await tool_func(**iter_params)
return result
except Exception as e:
# Capture exception details for individual iteration
error_msg = f"Iteration {item_idx} failed: {type(e).__name__}: {str(e)}"
logger.error(error_msg, exc_info=True)
raise # Re-raise to be caught by gather
task = asyncio.create_task(run_iteration(i, item))
iteration_tasks.append(task)
# Gather all iteration results (may raise if any iteration fails)
results = await asyncio.gather(*iteration_tasks)
stage_results[stage_id] = {"output": results}
else:
# Single execution case
result = await tool_func(**resolved_params)
stage_results[stage_id] = {"output": result}
# Mark stage as completed
completed_stages.add(stage_id)
logger.info(f"Workflow stage '{stage_id}' completed successfully")
except Exception as e:
error_msg = f"Workflow failed at stage '{stage_id}'. Error: {type(e).__name__}: {str(e)}"
logger.error(error_msg, exc_info=True)
stage_results[stage_id] = {
"error": error_msg,
"traceback": traceback.format_exc()
}
# Re-raise to signal failure to main execution loop
raise
async def execute_dag() -> Dict[str, Any]:
"""Execute the entire workflow DAG with proper dependency handling."""
try:
# Start with a topological sort to get execution order respecting dependencies
try:
execution_order = list(nx.topological_sort(dag))
logger.debug(f"Workflow execution order (respecting dependencies): {execution_order}")
except nx.NetworkXUnfeasible as e:
# Should never happen as we already checked for cycles
raise ToolInputError("Workflow contains circular dependencies that were not detected earlier.") from e
# Process stages in waves of parallelizable tasks
while len(completed_stages) < len(dag):
# Find stages ready to execute (all dependencies satisfied)
ready_stages = [
stage_id for stage_id in execution_order
if (stage_id not in completed_stages and
all(pred in completed_stages for pred in dag.predecessors(stage_id)))
]
if not ready_stages:
if len(completed_stages) < len(dag):
# This should never happen with a valid DAG that was topologically sorted
unfinished = set(execution_order) - completed_stages
logger.error(f"Workflow execution stalled. Unfinished stages: {unfinished}")
raise ToolError("Workflow execution stalled due to unresolvable dependencies.")
break
# Launch tasks for all ready stages
tasks = [execute_stage(stage_id) for stage_id in ready_stages]
# Wait for all tasks to complete or for the first error
try:
await asyncio.gather(*tasks)
except Exception as e:
# Any stage failure will be caught here
# The specific error details are already in stage_results
logger.error(f"Workflow wave execution failed: {str(e)}")
# Find the first failed stage for error reporting
failed_stage = next(
(s for s in ready_stages if s in stage_results and "error" in stage_results[s]),
ready_stages[0] # Fallback if we can't identify the specific failed stage
)
error_info = stage_results.get(failed_stage, {}).get("error", f"Unknown error in stage '{failed_stage}'")
return {
"success": False,
"results": stage_results,
"status": f"Workflow failed at stage '{failed_stage}'.",
"error": error_info,
"total_processing_time": time.time() - start_time
}
# If we reach here, all stages in this wave completed successfully
# All stages completed successfully
return {
"success": True,
"results": stage_results,
"status": "Workflow completed successfully.",
"total_processing_time": time.time() - start_time
}
except Exception as e:
# Catch any unexpected errors in the main execution loop
error_msg = f"Unexpected error in workflow execution: {type(e).__name__}: {str(e)}"
logger.error(error_msg, exc_info=True)
return {
"success": False,
"results": stage_results,
"status": "Workflow failed with an unexpected error.",
"error": error_msg,
"total_processing_time": time.time() - start_time
}
# Execute the workflow DAG
result = await execute_dag()
total_time = time.time() - start_time
if result["success"]:
logger.info(f"Workflow completed successfully in {total_time:.2f}s")
else:
logger.error(f"Workflow failed after {total_time:.2f}s: {result.get('error', 'Unknown error')}")
return result
# --- Helper functions for workflow execution ---
# These need careful implementation for robustness
def _resolve_params(stage_id: str, params: Dict, iterate_on_ref: Optional[str], stage_results: Dict, documents: Optional[List[str]]) -> tuple[Dict, bool, Optional[List]]:
"""Resolves parameter values, handling references and iteration.
Returns resolved_params, is_iteration, iteration_list.
Raises ValueError on resolution errors.
"""
resolved = {}
is_iteration = False
iteration_list = None
iteration_param_name = None
# Check for iteration first
if iterate_on_ref:
if not iterate_on_ref.startswith("${") or not iterate_on_ref.endswith("}"):
raise ValueError(f"Invalid iterate_on reference format: '{iterate_on_ref}'")
ref_key = iterate_on_ref[2:-1]
if ref_key == "documents":
if documents is None:
raise ValueError(f"Stage '{stage_id}' iterates on documents, but no documents were provided.")
iteration_list = documents
else:
dep_stage_id, output_key = _parse_ref(ref_key)
if dep_stage_id not in stage_results or "output" not in stage_results[dep_stage_id]:
raise ValueError(f"Dependency '{dep_stage_id}' for iteration not found or failed.")
dep_output = stage_results[dep_stage_id]["output"]
if not isinstance(dep_output, dict) or output_key not in dep_output:
raise ValueError(f"Output key '{output_key}' not found in dependency '{dep_stage_id}' for iteration.")
iteration_list = dep_output[output_key]
if not isinstance(iteration_list, list):
raise ValueError(f"Iteration target '{ref_key}' is not a list.")
is_iteration = True
# We still resolve other params, the iteration item is injected later
logger.debug(f"Stage '{stage_id}' will iterate over {len(iteration_list)} items from '{iterate_on_ref}'")
# Resolve individual parameters
for key, value in params.items():
if isinstance(value, str) and value.startswith("${") and value.endswith("}"):
ref_key = value[2:-1]
if ref_key == "documents":
if documents is None:
raise ValueError(f"Parameter '{key}' references documents, but no documents provided.")
resolved[key] = documents
else:
dep_stage_id, output_key = _parse_ref(ref_key)
if dep_stage_id not in stage_results or "output" not in stage_results[dep_stage_id]:
raise ValueError(f"Dependency '{dep_stage_id}' for parameter '{key}' not found or failed.")
dep_output = stage_results[dep_stage_id]["output"]
# Handle potential nested keys in output_key later if needed
if not isinstance(dep_output, dict) or output_key not in dep_output:
raise ValueError(f"Output key '{output_key}' not found in dependency '{dep_stage_id}' for parameter '{key}'. Available keys: {list(dep_output.keys()) if isinstance(dep_output, dict) else 'N/A'}")
resolved[key] = dep_output[output_key]
# If this resolved param is the one we iterate on, store its name
if is_iteration and iterate_on_ref == value:
iteration_param_name = key
else:
resolved[key] = value # Literal value
# Validation: If iterating, one parameter must match the iterate_on reference
if is_iteration and iteration_param_name is None:
# iterate_on referenced an output that is not used as any parameter value, so we cannot
# tell which parameter should receive each iteration item. Require the iteration target
# to be explicitly mapped to a parameter for clarity.
raise ValueError(f"Iteration target '{iterate_on_ref}' must correspond to a parameter value in stage '{stage_id}'.")
# Remove the iteration parameter itself from the base resolved params if iterating
# It will be injected per-item later
if is_iteration and iteration_param_name in resolved:
del resolved[iteration_param_name]
resolved["_iteration_param_name"] = iteration_param_name # Store the name for injection
return resolved, is_iteration, iteration_list
def _parse_ref(ref_key: str) -> tuple[str, str]:
"""Parses a reference like 'stage_id.output_key'"""
parts = ref_key.split('.', 1)
if len(parts) != 2:
raise ValueError(f"Invalid reference format: '{ref_key}'. Expected 'stage_id.output_key'.")
return parts[0], parts[1]
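# e.g. _parse_ref("chunk_stage.chunks") -> ("chunk_stage", "chunks");
#      _parse_ref("stage.result.items") -> ("stage", "result.items")  (split on the first '.' only)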
def _inject_iteration_item(base_params: Dict, item: Any) -> Dict:
"""Injects the current iteration item into the parameter dict."""
injected_params = base_params.copy()
iter_param_name = injected_params.pop("_iteration_param_name", None)
if iter_param_name:
injected_params[iter_param_name] = item
else:
# This case should be prevented by validation in _resolve_params
logger.error("Cannot inject iteration item: Iteration parameter name not found in resolved params.")
# Reaching here indicates an upstream programming error; parameters are returned unchanged.
return injected_params
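# e.g. (hypothetical parameter names):
#   _inject_iteration_item({"style": "bullet", "_iteration_param_name": "document"}, "chunk text")
#   -> {"style": "bullet", "document": "chunk text"}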
async def _gather_iteration_results(stage_id: str, tasks: List[asyncio.Task]) -> List[Any]:
"""Gathers results from iteration sub-tasks. Raises exception if any sub-task failed."""
results = []
try:
raw_results = await asyncio.gather(*tasks)
# Assume each task returns the direct output dictionary
results = list(raw_results) # gather preserves order
logger.debug(f"Iteration stage '{stage_id}' completed with {len(results)} results.")
return results
except Exception:
# If any sub-task failed, gather will raise the first exception
logger.error(f"Iteration stage '{stage_id}' failed: One or more sub-tasks raised an error.", exc_info=True)
# asyncio.gather does not cancel the remaining tasks when one fails, so cancel them explicitly
for task in tasks:
if not task.done():
task.cancel()
raise # Re-raise the exception to fail the main workflow stage
```
--------------------------------------------------------------------------------
/examples/research_workflow_demo.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
"""
Advanced Research Assistant Workflow Demo
This script demonstrates a realistic research workflow using the DAG-based
workflow execution system. It processes research documents through multiple
analysis stages and produces visualizations of the results.
"""
import asyncio
import os
import sys
from collections import namedtuple # Import namedtuple
# Add the project root to path so we can import ultimate_mcp_server
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from rich.console import Console
from rich.layout import Layout
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.rule import Rule
from rich.syntax import Syntax
from rich.table import Table
from rich.tree import Tree
from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.tools.optimization import execute_optimized_workflow
from ultimate_mcp_server.utils import get_logger # Import get_logger
from ultimate_mcp_server.utils.display import CostTracker # Import CostTracker
# Initialize rich console
console = Console()
# Initialize logger here so it's available in main()
logger = get_logger("example.research_workflow")
# Simple structure for tracking cost from result dicts (token counts may be missing)
TrackableResult = namedtuple("TrackableResult", ["cost", "input_tokens", "output_tokens", "provider", "model", "processing_time"])
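# e.g. (hypothetical figures): TrackableResult(cost=0.0002, input_tokens=150, output_tokens=80,
#                                              provider="openai", model="gpt-4.1-mini", processing_time=1.2)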
# Sample research documents
SAMPLE_DOCS = [
"""
# The Impact of Climate Change on Coastal Communities: A Multi-Regional Analysis
## Abstract
This comprehensive study examines the cascading effects of climate change on 570+ coastal cities globally, with projections extending to 2050. Using data from the IPCC AR6 report and economic models from the World Bank (2021), we identify adaptation costs exceeding $71 billion annually. The research incorporates satellite data from NASA's GRACE mission and economic vulnerability indices developed by Stern et al. (2019) to assess regional disparities.
## Vulnerable Regions and Economic Impact Assessment
### 1. Southeast Asia
The Mekong Delta region, home to 17 million people, faces submersion threats to 38% of its landmass by 2050. Ho Chi Minh City has invested $1.42 billion in flood prevention infrastructure, while Bangkok's $2.3 billion flood management system remains partially implemented. The Asian Development Bank (ADB) estimates adaptation costs will reach $5.7 billion annually for Vietnam alone.
### 2. Pacific Islands
Kiribati, Tuvalu, and the Marshall Islands face existential threats, with projected displacement of 25-35% of their populations by 2050 according to UN estimates. Australia's "Pacific Resilience Fund" ($2.1 billion) supports adaptation, but President Maamau of Kiribati has criticized its scope as "drastically insufficient." The 2022 Wellington Accords established migration pathways for climate refugees, though implementation remains fragmented.
### 3. North American Coastal Zones
Miami-Dade County's $6 billion "Rising Above" initiative represents the largest municipal climate adaptation budget in North America. The U.S. Army Corps of Engineers projects that without intervention, coastal erosion will affect 31% of Florida's beaches by 2040. Economic models by Greenstone and Carleton (2020) indicate property devaluation between $15-27 billion in Florida alone.
## Adaptation Strategies and Cost-Benefit Analysis
### Infrastructure Hardening
The Netherlands' Room for the River program ($2.6 billion) has demonstrated 300% ROI through prevented flood damage. Conversely, New Orleans' post-Katrina $14.5 billion levee system upgrades show more modest returns (130% ROI) due to maintenance requirements and subsidence issues highlighted by Professor Sarah Jenkins (MIT).
### Managed Retreat
Indonesia's capital relocation from Jakarta to Borneo (est. cost $34 billion) represents the largest planned managed retreat globally. Smaller programs in Alaska (Newtok and Shishmaref villages) provide case studies with per-capita costs exceeding $380,000. Dr. Robert Chen's longitudinal studies show significant social cohesion challenges, with 47% of relocated communities reporting decreased quality-of-life metrics despite improved safety.
### Ecosystem-Based Approaches
Vietnam's mangrove restoration initiative ($220 million) reduces storm surge impacts by 20-50% and provides $8-$20 million in annual aquaculture benefits. The Nature Conservancy's coral reef insurance programs in Mexico demonstrate innovative financing mechanisms while providing co-benefits for local tourism economies valued at $320 million annually.
## Cross-Disciplinary Implications
Climate migration pathways identified by the UNHCR will increase urban population pressures in receiving cities, particularly in Manila, Dhaka, and Lagos. Healthcare systems in coastal regions report increasing cases of waterborne diseases (62% increase since 2010) and mental health challenges associated with displacement anxiety as documented by the WHO Southeast Asia regional office.
## References
1. IPCC (2021). AR6 Climate Change 2021: Impacts, Adaptation and Vulnerability
2. Stern, N., et al. (2019). Economic vulnerability indices for coastal communities
3. Asian Development Bank. (2022). Southeast Asia Climate Adaptation Report
4. Greenstone, M., & Carleton, T. (2020). Coastal property value projections 2020-2050
5. Jenkins, S. (2022). Engineering limitations in climate adaptation infrastructure
6. Chen, R. (2021). Social dimensions of community relocation programs
7. World Health Organization. (2021). Climate change health vulnerability assessments
""",
"""
# Renewable Energy Transition: Economic Implications and Policy Frameworks
## Executive Summary
This multi-phase analysis examines the economic transformation accompanying the global renewable energy transition, projecting the creation of 42.3 million new jobs by 2050 while identifying significant regional disparities and transition barriers. Drawing on data from 157 countries, this research provides comprehensive policy recommendations for equitable implementation paths.
## Methodological Framework
Our modeling utilizes a modified integrated assessment model combining economic inputs from the International Energy Agency (IEA), IRENA's Renewable Jobs Tracker database, and McKinsey's Global Energy Perspective 2022. Labor market projections incorporate automation factors derived from Oxford Economics' Workforce Displacement Index, providing more nuanced job creation estimates than previous studies by Zhang et al. (2019).
## Employment Transformation Analysis by Sector
### Solar Photovoltaic Industry
Employment projections indicate 18.7 million jobs by 2045, concentrated in manufacturing (32%), installation (41%), and operations/maintenance (27%). Regional distribution analysis reveals concerning inequities, with China capturing 41% of manufacturing roles while sub-Saharan Africa secures only 2.3% despite having 16% of global solar potential. The Skill Transferability Index suggests 73% of displaced fossil fuel workers could transition to solar with targeted 6-month reskilling programs.
### Wind Energy Sector
Offshore wind development led by Ørsted, Vestas, and General Electric is projected to grow at 24% CAGR through 2035, creating 6.8 million jobs. Supply chain bottlenecks in rare earth elements (particularly neodymium and dysprosium) represent critical vulnerabilities, with 83% of processing controlled by three Chinese companies. Professor Tanaka's analysis suggests price volatilities of 120-350% are possible under geopolitical tensions.
### Energy Storage Revolution
Recent lithium-ferro-phosphate (LFP) battery innovations by CATL have reduced implementation costs by 27% while boosting cycle life by 4,000 cycles. Grid-scale storage installations are projected to grow from 17GW (2022) to 220GW by 2035, employing 5.3 million in manufacturing and installation. The MIT Battery Initiative under Dr. Viswanathan has demonstrated promising alternative chemistries using earth-abundant materials that could further accelerate adoption if commercialized by 2025.
### Hydrogen Economy Emergence
Green hydrogen production costs have declined from $5.70/kg in 2018 to $3.80/kg in 2023, with projected cost parity with natural gas achievable by 2028 according to BloombergNEF. The European Hydrogen Backbone initiative, requiring €43 billion in infrastructure investment, could generate 3.8 million jobs while reducing EU natural gas imports by 30%. Significant technological challenges remain in storage density and transport infrastructure, as highlighted in critical analyses by Professors Wilson and Leibreich.
## Transition Barriers and Regional Disparities
### Financial Constraints
Developing economies face investment gaps of $730 billion annually according to the Climate Policy Initiative's 2022 report. The African Development Bank estimates that 72% of sub-Saharan African energy projects stall at the planning phase due to financing constraints despite IRRs exceeding 11.5%. Innovative financing mechanisms through the Global Climate Fund have mobilized only 23% of pledged capital as of Q1 2023.
### Policy Framework Effectiveness
Cross-jurisdictional analysis of 87 renewable portfolio standards reveals three dominant policy approaches:
1. **Carbon Pricing Mechanisms**: The EU ETS carbon price of €85/ton has driven 16.5% emissions reduction in the power sector, while Canada's escalating carbon price schedule ($170/ton by 2030) provides investment certainty. Econometric modeling by Dr. Elizabeth Warren (LSE) indicates prices must reach €120/ton to fully internalize climate externalities.
2. **Direct Subsidies**: Germany's Energiewende subsidies (€238 billion cumulative) achieved 44% renewable penetration but at high consumer costs. Targeted manufacturing incentives under the U.S. Inflation Reduction Act demonstrate improved cost-efficiency with 3.2x private capital mobilization according to analysis by Resources for the Future.
3. **Phased Transition Approaches**: Denmark's offshore wind cluster development model produced the highest success metrics in our analysis, reducing LCOE by 67% while creating domestic supply chains capturing 82% of economic value. This approach has been partially replicated in Taiwan and Vietnam with similar success indicators.
## Visualized Outcomes Under Various Scenarios
Under an accelerated transition (consistent with 1.5°C warming), global GDP would increase by 2.4% beyond baseline by 2050, while air pollution-related healthcare costs would decline by $780 billion annually. Conversely, our "delayed action" scenario projects stranded fossil assets exceeding $14 trillion, concentrated in 8 petrostate economies, potentially triggering financial contagion comparable to 2008.
## References
1. International Energy Agency. (2022). World Energy Outlook 2022
2. IRENA. (2023). Renewable Energy Jobs Annual Review
3. McKinsey & Company. (2022). Global Energy Perspective
4. Zhang, F., et al. (2019). Employment impacts of renewable expansion
5. Oxford Economics. (2021). Workforce Displacement Index
6. Tanaka, K. (2022). Critical material supply chains in energy transition
7. Viswanathan, V. (2023). Next-generation grid-scale storage technologies
8. BloombergNEF. (2023). Hydrogen Economy Outlook
9. Climate Policy Initiative. (2022). Global Landscape of Climate Finance
10. Warren, E. (2022). Carbon pricing efficiency and distributional impacts
11. Resources for the Future. (2023). IRA Impact Assessment
""",
"""
# Artificial Intelligence Applications in Healthcare Diagnostics: Implementation Challenges and Economic Analysis
## Abstract
This comprehensive evaluation examines the integration of artificial intelligence into clinical diagnostic workflows, with particular focus on deep learning systems demonstrating 94.2% accuracy in early-stage cancer detection across 14 cancer types. The analysis spans technical validation, implementation barriers, regulatory frameworks, and economic implications based on data from 137 healthcare systems across 42 countries.
## Technological Capabilities Assessment
### Diagnostic Performance Metrics
Google Health's melanoma detection algorithm demonstrated sensitivity of 95.3% and specificity of 92.7% in prospective trials, exceeding dermatologist accuracy by 18 percentage points with consistent performance across Fitzpatrick skin types I-VI. This represents significant improvement over earlier algorithms criticized for performance disparities across demographic groups as documented by Dr. Abigail Johnson in JAMA Dermatology (2021).
The Mayo Clinic's AI-enhanced colonoscopy system increased adenoma detection rates from 30% to 47% in their 2022 clinical implementation study (n=3,812). This translates to approximately 68 prevented colorectal cancer cases per 1,000 screened patients according to the predictive model developed by Dr. Singh at Memorial Sloan Kettering.
Stanford Medicine's deep learning algorithm for chest radiograph interpretation identified 14 pathological conditions with average AUC of 0.91, reducing false negative rates for subtle pneumothorax by 43% and pulmonary nodules by 29% in their multi-center validation study across five hospital systems with diverse patient populations.
### Architectural Innovations
Recent advancements in foundation models have transformed medical AI capabilities:
1. **Multi-modal integration**: Microsoft/Nuance's DAX system combines speech recognition, natural language processing, and computer vision, enabling real-time clinical documentation with 96.4% accuracy while reducing physician documentation time by 78 minutes daily according to their 16-site implementation study published in Health Affairs.
2. **Explainable AI approaches**: PathAI's interpretable convolutional neural networks provide visualization of decision-making factors in histopathology, addressing the "black box" concern highlighted by regulatory agencies. Their GradCAM implementation allows pathologists to review the specific cellular features informing algorithmic conclusions, increasing adoption willingness by 67% in surveyed practitioners (n=245).
3. **Federated learning**: The MELLODDY consortium's federated approach enables algorithm training across 10 pharmaceutical companies' proprietary datasets without data sharing, demonstrating how privacy-preserving computation can accelerate biomarker discovery. This approach increased available training data by 720% while maintaining data sovereignty.
## Implementation Challenges
### Clinical Workflow Integration
Field studies at Massachusetts General Hospital identified five critical integration failure points that reduce AI effectiveness by 30-70% compared to validation performance:
1. Alert fatigue – 52% of clinical recommendations were dismissed when AI systems generated more than 8 alerts per hour
2. Workflow disruption – Systems requiring more than 15 seconds of additional process time saw 68% lower adoption
3. Interface design issues – Poorly designed UI elements reduced effective utilization by 47%
4. Confirmation bias – Clinicians were 3.4× more likely to accept AI suggestions matching their preliminary conclusion
5. Trust calibration – 64% of clinicians struggled to appropriately weight algorithmic recommendations against their clinical judgment
The Cleveland Clinic's "AI Integration Framework" addresses these challenges through graduated autonomy, contextual presentation, and embedded calibration metrics, increasing sustained adoption rates to 84% compared to the industry average of 31%.
### Data Infrastructure Requirements
Analysis of implementation failures reveals data architecture as the primary barrier in 68% of stalled healthcare AI initiatives. Specific challenges include:
- Legacy system integration – 73% of U.S. hospitals utilize EHR systems with insufficient API capabilities for real-time AI integration
- Data standardization – Only 12% of clinical data meets FHIR standards without requiring significant transformation
- Computational infrastructure – 57% of healthcare systems lack edge computing capabilities necessary for low-latency applications
Kaiser Permanente's successful enterprise-wide implementation demonstrates a viable pathway through their "data fabric" architecture connecting 39 hospitals while maintaining HIPAA compliance. Their staged implementation required $43 million in infrastructure investment but delivered $126 million in annual efficiency gains by year three.
### Training Requirements for Medical Personnel
Harvard Medical School's "Technology Integration in Medicine" study identified critical competency gaps among practitioners:
- Only 17% of physicians could correctly interpret AI-generated confidence intervals
- 73% overestimated algorithm capabilities in transfer scenarios
- 81% lacked understanding of common algorithmic biases
The American Medical Association's AI curriculum module has demonstrated 82% improvement in AI literacy metrics but has reached only a fraction of practitioners. Training economics present a significant barrier, with health systems reporting that comprehensive AI education requires 18-24 hours per clinician at an average opportunity cost of $5,800.
## Economic and Policy Dimensions
### Cost-Benefit Model
Our economic modeling based on Medicare claims data projects net healthcare savings of $36.7 billion annually when AI diagnostic systems reach 65% market penetration. These savings derive from:
- Earlier cancer detection: $14.3 billion through stage migration
- Reduced diagnostic errors: $9.8 billion in avoided misdiagnosis costs
- Workflow efficiency: $6.2 billion in provider time optimization
- Avoided unnecessary procedures: $6.4 billion by reducing false positives
Implementation costs average $175,000-$390,000 per facility with 3.1-year average payback periods. Rural and critical access hospitals face disproportionately longer ROI timelines (5.7 years), exacerbating healthcare disparities.
### Regulatory Framework Analysis
Comparative analysis of regulatory approaches across jurisdictions reveals critical inconsistencies:
| Jurisdiction | Approval Pathway | Post-Market Requirements | Algorithm Update Handling |
|--------------|------------------|--------------------------|---------------------------|
| FDA (US) | 510(k)/De Novo | Limited continuous monitoring | Predetermined change protocol |
| EMA (EU) | MDR risk-based | PMCF with periodic reporting | Significant modification framework |
| PMDA (Japan) | SAKIGAKE pathway | Mandatory registry participation | Version control system |
| NMPA (China) | Special approval | Real-world data collection | Annual recertification |
The European Medical Device Regulation's requirement for "human oversight of automated systems" creates implementation ambiguities interpreted differently across member states. The FDA's proposed "Predetermined Change Control Plan" offers the most promising framework for AI's iterative improvement nature but remains in draft status.
## Conclusions and Future Directions
AI diagnosis systems demonstrate significant technical capabilities but face complex implementation barriers that transcend technological challenges. Our analysis suggests a "sociotechnical systems approach" is essential, recognizing that successful implementation depends equally on technical performance, clinical workflow integration, organizational change management, and policy frameworks.
The Cleveland Clinic-Mayo Clinic consortium's phased implementation approach, beginning with augmentative rather than autonomous functionality, provides a template for successful adoption. Their experience indicates that progressive automation on a 3-5 year timeline produces superior outcomes compared to transformative implementation approaches.
## References
1. Johnson, A. (2021). Demographic performance disparities in dermatological AI. JAMA Dermatology, 157(2)
2. Mayo Clinic. (2022). AI-enhanced colonoscopy outcomes study. Journal of Gastrointestinal Endoscopy, 95(3)
3. Singh, K. (2021). Predictive modeling of prevented colorectal cancer cases. NEJM, 384
4. Stanford Medicine. (2022). Multi-center validation of deep learning for radiograph interpretation. Radiology, 302(1)
5. Nuance Communications. (2023). DAX system implementation outcomes. Health Affairs, 42(1)
6. PathAI. (2022). Pathologist adoption of explainable AI systems. Modern Pathology, 35
7. MELLODDY Consortium. (2022). Federated learning for pharmaceutical research. Nature Machine Intelligence, 4
8. Massachusetts General Hospital. (2021). Clinical workflow integration failure points for AI. JAMIA, 28(9)
9. Cleveland Clinic. (2023). AI Integration Framework outcomes. Healthcare Innovation, 11(2)
10. American Medical Association. (2022). Physician AI literacy assessment. Journal of Medical Education, 97(6)
11. Centers for Medicare & Medicaid Services. (2023). Healthcare AI economic impact analysis
12. FDA. (2023). Proposed framework for AI/ML-based SaMD. Regulatory Science Forum
""",
"""
# Quantum Computing Applications in Pharmaceutical Discovery: Capabilities, Limitations, and Industry Transformation
## Executive Summary
This analysis evaluates the integration of quantum computing technologies into pharmaceutical R&D workflows, examining current capabilities, near-term applications, and long-term industry transformation potential. Based on benchmarking across 17 pharmaceutical companies and 8 quantum technology providers, we provide a comprehensive assessment of this emerging computational paradigm and its implications for drug discovery economics.
## Current Quantum Computing Capabilities
### Hardware Platforms Assessment
**Superconducting quantum processors** (IBM, Google, Rigetti) currently provide the most mature platform with IBM's 433-qubit Osprey system demonstrating quantum volume of 128 and error rates approaching 10^-3 per gate operation. While impressive relative to 2018 benchmarks, these systems remain limited by coherence times (averaging 114 microseconds) and require operating temperatures near absolute zero, creating significant infrastructure requirements.
**Trapped-ion quantum computers** (IonQ, Quantinuum) offer superior coherence times exceeding 10 seconds and all-to-all connectivity but operate at slower gate speeds. IonQ's 32-qubit system achieved algorithmic qubits (#AQ) of 20, setting a record for effective computational capability when error mitigation is considered. Quantinuum's H-Series demonstrated the first logical qubit with real-time quantum error correction, a significant milestone towards fault-tolerant quantum computing.
**Photonic quantum systems** (Xanadu, PsiQuantum) represent an alternative approach with potentially simpler scaling requirements. Xanadu's Borealis processor demonstrated quantum advantage for specific sampling problems but lacks the gate-based universality required for most pharmaceutical applications. PsiQuantum's fault-tolerant silicon photonic approach continues rapid development with semiconductor manufacturing partner GlobalFoundries but remains pre-commercial.
**Neutral atom platforms** (QuEra, Pasqal) entered commercial accessibility in 2023, offering unprecedented qubit counts (QuEra: 256 atoms) with programmable geometries particularly suited for quantum simulation of molecular systems. Recent demonstrations of 3D atom arrangements provide promising avenues for simulating protein-ligand interactions.
### Quantum Algorithm Development
Pharmaceutical applications currently focus on three quantum algorithm classes:
1. **Variational Quantum Eigensolver (VQE)** algorithms have progressed significantly for molecular ground state energy calculations, with Riverlane's enhanced VQE implementations demonstrating accuracy within 1.5 kcal/mol for molecules up to 20 atoms on IBM's 127-qubit processors. Merck's collaboration with Zapata Computing improved convergence rates by 300% through adaptive ansatz methods.
2. **Quantum Machine Learning (QML)** approaches for binding affinity prediction have shown mixed results. Pfizer's implementation of quantum convolutional neural networks (QCNN) demonstrated a 22% improvement in binding affinity predictions for their kinase inhibitor library, while AstraZeneca's quantum support vector machine approach showed no significant advantage over classical methods for their dataset.
3. **Quantum Annealing** for conformational search remains dominated by D-Wave's 5,000+ qubit systems, with Boehringer Ingelheim reporting successful applications in peptide folding predictions. However, comparisons with enhanced classical methods (particularly those using modern GPUs) show quantum advantage remains elusive for most production cases.
## Pharmaceutical Applications Landscape
### Virtual Screening Transformation
GSK's quantum computing team achieved a significant milestone in 2022 through quantum-classical hybrid algorithms that accelerated screening of 10^7 compounds against novel SARS-CoV-2 targets. Their approach used classical computers for initial filtering followed by quantum evaluation of 10^4 promising candidates, identifying 12 compounds with nanomolar binding affinities subsequently confirmed by experimental assays. While impressive, the computational requirements exceeded $1.2M and required specialized expertise from partners at Quantinuum.
### Molecular Property Prediction
Roche's collaboration with Cambridge Quantum Computing (now Quantinuum) demonstrated quantum advantage for dipole moment calculations in drug-like molecules, achieving accuracy improvements of 16% compared to density functional theory methods while potentially offering asymptotic speedup as molecule size increases. Their hybrid quantum-classical approach requires significantly fewer qubits than full quantum simulation, making it commercially relevant within the NISQ (Noisy Intermediate-Scale Quantum) era of hardware.
### Retrosynthesis Planning
Quantum approaches to synthetic route planning remain largely theoretical with limited experimental validation. MIT-Takeda research demonstrated proof-of-concept for mapping retrosynthesis to quantum walks on Johnson graphs, with preliminary results showing promise for identifying non-obvious synthetic pathways. Commercial application appears distant (5-8 years) given current hardware limitations.
## Economic Implications Analysis
Our economic model quantifies four significant impacts on pharmaceutical R&D:
1. **Preclinical timeline compression**: Currently estimated at 2-5% (0.5-1.3 months) but projected to reach 15-30% by 2030 as quantum hardware capabilities expand, potentially reducing time-to-market by up to 9 months for novel compounds
2. **Candidate quality improvements**: Quantum-enhanced binding affinity and ADMET property predictions demonstrate 7-18% higher success rates in early clinical phases across our analysis of 87 compounds that utilized quantum computational methods in preclinical development
3. **Novel mechanism identification**: Quantum simulation of previously intractable biological targets (particularly intrinsically disordered proteins and complex protein-protein interactions) could expand the druggable proteome by an estimated 8-14% according to our analysis of protein data bank targets
4. **R&D productivity impacts**: A 10% improvement in candidate quality translates to approximately $310M in reduced clinical development costs per approved drug by reducing late-stage failures
## Investment and Adoption Patterns
Pharmaceutical quantum computing investment has accelerated dramatically, with cumulative industry investment growing from $18M (2018) to $597M (2023). Investment strategies fall into three categories:
1. **Direct infrastructure investment** (Roche, Merck): Building internal quantum teams and securing dedicated quantum hardware access
2. **Collaborative research partnerships** (GSK, Biogen, Novartis): Forming multi-year academic and commercial partnerships focused on specific computational challenges
3. **Quantum-as-a-service utilization** (Majority approach): Accessing quantum capabilities through cloud providers with limited internal expertise development
Our analysis of 23 pharmaceutical companies indicates:
- 19% have established dedicated quantum computing teams
- 43% have active research collaborations with quantum providers
- 78% report evaluating quantum capabilities for specific workflows
- 100% express concerns about quantum talent acquisition challenges
## Future Outlook and Strategic Recommendations
The pharmaceutical quantum computing landscape will evolve through three distinct phases:
**Near-term (1-3 years)**: Hybrid quantum-classical algorithms will demonstrate incremental value in specific niches, particularly molecular property calculations and conformational analysis of small to medium-sized molecules. Successful organizations will combine quantum capabilities with enhanced classical methods rather than seeking immediate quantum advantage.
**Mid-term (3-7 years)**: Error-corrected logical qubits will enable more robust quantum chemistry applications with demonstrable advantage for drug discovery workflows. Companies with established quantum capabilities will gain first-mover advantages in applying these technologies to proprietary chemical matter.
**Long-term (7+ years)**: Fault-tolerant quantum computers with thousands of logical qubits could transform pharmaceutical R&D by enabling full quantum mechanical simulation of protein-drug interactions and previously intractable biological systems. This capability could fundamentally alter drug discovery economics by dramatically reducing empirical screening requirements.
## References
1. IBM Quantum. (2023). Osprey processor technical specifications and benchmarking
2. IonQ. (2023). Algorithmic qubit benchmarking methodology and results
3. Quantinuum. (2022). H-Series logical qubit demonstration
4. Xanadu. (2022). Borealis quantum advantage results. Nature Physics, 18
5. QuEra. (2023). Neutral atom quantum processor capabilities. Science, 377
6. Riverlane & Merck. (2022). Enhanced VQE implementations for molecular ground state calculations
7. Pfizer Quantum Team. (2023). QCNN for binding affinity prediction. Journal of Chemical Information and Modeling
8. AstraZeneca. (2022). Comparative analysis of quantum and classical ML methods
9. Boehringer Ingelheim. (2023). Quantum annealing for peptide conformational search
10. GSK Quantum Computing Team. (2022). Quantum-classical hybrid screening against SARS-CoV-2
11. Roche & Cambridge Quantum Computing. (2023). Quantum advantage for dipole moment calculations
12. MIT-Takeda Quantum Research. (2022). Mapping retrosynthesis to quantum walks
13. PhRMA Quantum Computing Working Group. (2023). Pharmaceutical R&D impact analysis
""",
"""
# Neuroplasticity in Cognitive Rehabilitation: Mechanisms, Interventions, and Clinical Applications
## Abstract
This multidisciplinary review synthesizes current understanding of neuroplasticity mechanisms underlying cognitive rehabilitation, evaluating intervention efficacies across five domains of cognitive function following acquired brain injury. Integrating data from 142 clinical studies with advanced neuroimaging findings, we present evidence-based recommendations for clinical practice and identify promising emerging approaches.
## Neurobiological Foundations of Rehabilitation-Induced Plasticity
### Cellular and Molecular Mechanisms
Recent advances in understanding activity-dependent plasticity have revolutionized rehabilitation approaches. The pioneering work of Dr. Alvarez-Buylla at UCSF has demonstrated that even the adult human brain maintains neurogenic capabilities in the hippocampus and subventricular zone, with newly generated neurons integrating into existing neural circuits following injury. Transcriptomic studies by Zhang et al. (2021) identified 37 genes significantly upregulated during rehabilitation-induced recovery, with brain-derived neurotrophic factor (BDNF) and insulin-like growth factor-1 (IGF-1) showing particularly strong associations with positive outcomes.
Post-injury plasticity occurs through multiple complementary mechanisms:
1. **Synaptic remodeling**: Two-photon microscopy studies in animal models reveal extensive dendritic spine turnover within peri-lesional cortex during the first 3-8 weeks post-injury. The pioneering work of Professor Li-Huei Tsai demonstrates that enriched rehabilitation environments increase spine formation rates by 47-68% compared to standard housing conditions.
2. **Network reorganization**: Professor Nicholas Schiff's research at Weill Cornell demonstrates that dormant neural pathways can be functionally recruited following injury through targeted stimulation. Their multimodal imaging studies identified specific thalamocortical circuits that, when engaged through non-invasive stimulation, facilitated motor recovery in 72% of chronic stroke patients previously classified as "plateaued."
3. **Myelination dynamics**: Recent discoveries by Dr. Fields at NIH demonstrate activity-dependent myelination as a previously unrecognized form of neuroplasticity. Diffusion tensor imaging studies by Wang et al. (2022) show significant increases in white matter integrity following intensive cognitive training, correlating with functional improvements (r=0.62, p<0.001).
### Neuroimaging Correlates of Successful Rehabilitation
Longitudinal multimodal neuroimaging studies have identified several biomarkers of successful cognitive rehabilitation:
- **Functional connectivity reorganization**: Using resting-state fMRI, Northoff's laboratory documented that successful attention training in 67 TBI patients correlated with increased connectivity between the dorsolateral prefrontal cortex and posterior parietal regions (change in z-score: 0.43 ± 0.12), while unsuccessful cases showed no significant connectivity changes.
- **Cortical thickness preservation**: Dr. Gabrieli's team at MIT found that cognitive rehabilitation initiated within 30 days of injury preserved cortical thickness in vulnerable regions, with each week of delay associated with 0.8% additional atrophy in domain-relevant cortical regions.
- **Default mode network modulation**: Advanced network analyses by Dr. Marcus Raichle demonstrate that cognitive rehabilitation success correlates with restoration of appropriate task-related deactivation of the default mode network, suggesting intervention effectiveness can be monitored through this biomarker.
## Evidence-Based Intervention Analysis
### Attention and Executive Function Rehabilitation
Our meta-analysis of 42 randomized controlled trials evaluating attention training programs reveals three intervention approaches with significant effect sizes:
1. **Adaptive computerized training** (Hedges' g = 0.68, 95% CI: 0.54-0.82): Programs like Attention Process Training showed transfer to untrained measures when training adapts in real-time to performance. The NYU-Columbia adaptive attention protocol demonstrated maintenance of gains at 18-month follow-up (retention rate: 83%).
2. **Metacognitive strategy training** (Hedges' g = 0.57, 95% CI: 0.41-0.73): The Toronto Hospital's Strategic Training for Executive Control program resulted in significant improvements on ecological measures of executive function. Moderator analyses indicate effectiveness increases when combined with daily strategy implementation exercises (interaction effect: p=0.002).
3. **Neurostimulation-enhanced approaches**: Combined tDCS-cognitive training protocols developed at Harvard demonstrate 37% greater improvement compared to cognitive training alone. Targeting the right inferior frontal gyrus with 2mA anodal stimulation during inhibitory control training shows particular promise for impulsivity reduction (Cohen's d = 0.74).
### Memory Rehabilitation Approaches
Memory intervention effectiveness varies substantially by memory system affected and etiology:
- **Episodic memory**: For medial temporal lobe damage, compensatory approaches using spaced retrieval and errorless learning demonstrate the strongest evidence. Dr. Schacter's laboratory protocol combining elaborative encoding with distributed practice shows a remarkable 247% improvement in functional memory measures compared to intensive rehearsal techniques.
- **Prospective memory**: Implementation intention protocols developed by Professor Gollwitzer show transfer to daily functioning with large effect sizes (d = 0.92) when combined with environmental restructuring. Smartphone-based reminder systems increased medication adherence by 43% in our 12-month community implementation study.
- **Working memory**: Recent controversy surrounding n-back training was addressed in Professor Klingberg's definitive multi-site study demonstrating domain-specific transfer effects. Their adaptive protocol produced sustainable working memory improvements (40% above baseline at 6-month follow-up) when training exceeded 20 hours and incorporated gradually increasing interference control demands.
## Clinical Application Framework
### Precision Rehabilitation Medicine Approach
Our analysis indicates rehabilitation effectiveness increases substantially when protocols are tailored using a precision medicine framework:
1. **Comprehensive neurocognitive phenotyping**: The McGill Cognitive Rehabilitation Battery enables identification of specific processing deficits, allowing intervention targeting. Machine learning analysis of 1,247 patient profiles identified 11 distinct neurocognitive phenotypes that respond differentially to specific interventions.
2. **Biomarker-guided protocol selection**: EEG connectivity measures predicted response to attention training with 76% accuracy in our validation cohort, potentially reducing non-response rates. Professor Knight's laboratory demonstrated that P300 latency specifically predicts processing speed training response (AUC = 0.81).
3. **Adaptive progression algorithms**: Real-time difficulty adjustment based on multiple performance parameters rather than accuracy alone increased transfer effects by 34% compared to standard adaptive approaches. The computational model developed by Stanford's Poldrack laboratory dynamically optimizes challenge levels to maintain engagement while maximizing error-based learning.
### Implementation Science Considerations
Our implementation analysis across 24 rehabilitation facilities identified critical factors for successful cognitive rehabilitation programs:
- **Rehabilitation intensity and timing**: Early intervention (< 6 weeks post-injury) with high intensity (minimum 15 hours/week of direct treatment) demonstrated superior outcomes (NNT = 3.2 for clinically significant improvement).
- **Therapist expertise effects**: Specialized certification in cognitive rehabilitation was associated with 28% larger treatment effects compared to general rehabilitation credentials.
- **Technology augmentation**: Hybrid models combining therapist-directed sessions with home-based digital practice demonstrated optimal cost-effectiveness (ICER = $12,430/QALY) while addressing access barriers.
## Future Directions and Emerging Approaches
Several innovative approaches show promise for enhancing neuroplasticity during cognitive rehabilitation:
1. **Closed-loop neurostimulation**: Dr. Suthana's team at UCLA demonstrated that theta-burst stimulation delivered precisely during specific phases of hippocampal activity enhanced associative memory formation by 37% in patients with mild cognitive impairment.
2. **Pharmacologically augmented rehabilitation**: The RESTORE trial combining daily atomoxetine with executive function training demonstrated synergistic effects (interaction p<0.001) compared to either intervention alone. Professor Feeney's research suggests a critical 30-minute window where noradrenergic enhancement specifically promotes task-relevant plasticity.
3. **Virtual reality cognitive training**: Immersive VR protocols developed at ETH Zurich demonstrated transfer to real-world functioning by simulating ecologically relevant scenarios with graduated difficulty. Their randomized trial showed 3.2× greater functional improvement compared to matched non-immersive training.
4. **Sleep optimization protocols**: The Northwestern sleep-enhanced memory consolidation protocol increased rehabilitation effectiveness by 41% by delivering targeted memory reactivation during slow-wave sleep, suggesting rehabilitation schedules should specifically incorporate sleep architecture considerations.
## Conclusion
Cognitive rehabilitation effectiveness has improved substantially through integration of neuroplasticity principles, advanced technology, and precision intervention approaches. Optimal outcomes occur when interventions target specific neurocognitive mechanisms with sufficient intensity and are tailored to individual patient profiles. Emerging approaches leveraging closed-loop neurotechnology and multimodal enhancement strategies represent promising directions for further advancing rehabilitation outcomes.
## References
1. Alvarez-Buylla, A., & Lim, D. A. (2022). Neurogenesis in the adult human brain following injury
2. Zhang, Y., et al. (2021). Transcriptomic analysis of rehabilitation-responsive genes
3. Tsai, L. H., et al. (2023). Environmental enrichment effects on dendritic spine dynamics
4. Schiff, N. D. (2022). Recruitment of dormant neural pathways following brain injury
5. Fields, R. D. (2021). Activity-dependent myelination as a form of neuroplasticity
6. Wang, X., et al. (2022). White matter integrity changes following cognitive training
7. Northoff, G., et al. (2023). Functional connectivity reorganization during attention training
8. Gabrieli, J. D., et al. (2021). Relationship between intervention timing and cortical preservation
9. Raichle, M. E. (2022). Default mode network dynamics as a biomarker of rehabilitation efficacy
10. NYU-Columbia Collaborative. (2023). Adaptive attention protocol long-term outcomes
11. Schacter, D. L., et al. (2021). Elaborative encoding with distributed practice for episodic memory
12. Gollwitzer, P. M., & Oettingen, G. (2022). Implementation intentions for prospective memory
13. Klingberg, T., et al. (2023). Multi-site study of adaptive working memory training
14. Poldrack, R. A., et al. (2022). Computational models for optimizing learning parameters
15. Suthana, N., et al. (2023). Phase-specific closed-loop stimulation for memory enhancement
16. Feeney, D. M., & Sutton, R. L. (2022). Pharmacological enhancement of rehabilitation
17. ETH Zurich Rehabilitation Engineering Group. (2023). Virtual reality cognitive training
18. Northwestern Memory & Cognition Laboratory. (2022). Sleep-enhanced memory consolidation
"""
]
async def display_workflow_diagram(workflow):
"""Display a visual representation of the workflow DAG."""
console.print("\n[bold cyan]Workflow Execution Plan[/bold cyan]")
# Create a tree representation of the workflow
tree = Tree("[bold yellow]Research Analysis Workflow[/bold yellow]")
# Track dependencies for visualization
dependencies = {}
for stage in workflow:
stage_id = stage["stage_id"]
deps = stage.get("depends_on", [])
for dep in deps:
if dep not in dependencies:
dependencies[dep] = []
dependencies[dep].append(stage_id)
# Add stages without dependencies first (roots)
root_stages = [s for s in workflow if not s.get("depends_on")]
stage_map = {s["stage_id"]: s for s in workflow}
def add_stage_to_tree(parent_tree, stage_id):
stage = stage_map[stage_id]
tool = stage["tool_name"]
node_text = f"[bold green]{stage_id}[/bold green] ([cyan]{tool}[/cyan])"
if "iterate_on" in stage:
node_text += " [italic](iterative)[/italic]"
stage_node = parent_tree.add(node_text)
# Add children (stages that depend on this one)
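        # Note: a stage that depends on multiple upstream stages will appear once under each
        # of its parents in this tree view, since the tree is built by walking dependents.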
children = dependencies.get(stage_id, [])
for child in children:
add_stage_to_tree(stage_node, child)
# Build the tree
for root in root_stages:
add_stage_to_tree(tree, root["stage_id"])
# Print the tree
console.print(tree)
# Display additional workflow statistics
table = Table(title="Workflow Statistics")
table.add_column("Metric", style="cyan")
table.add_column("Value", style="green")
table.add_row("Total Stages", str(len(workflow)))
table.add_row("Parallel Stages", str(len(root_stages)))
table.add_row("Iterative Stages", str(sum(1 for s in workflow if "iterate_on" in s)))
console.print(table)
async def display_execution_progress(workflow_future):
"""Display a live progress indicator while the workflow executes."""
with Progress(
SpinnerColumn(),
TextColumn("[bold blue]{task.description}"),
console=console
) as progress:
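        # total=None makes this an indeterminate task: the spinner keeps running until the
        # awaited workflow future resolves, then the description is swapped to "completed".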
task = progress.add_task("[yellow]Executing workflow...", total=None)
result = await workflow_future
progress.update(task, completed=True, description="[green]Workflow completed!")
return result
async def visualize_results(results):
"""Create visualizations of the workflow results."""
console.print("\n[bold magenta]Research Analysis Results[/bold magenta]")
# Set up layout
layout = Layout()
layout.split_column(
Layout(name="header"),
Layout(name="statistics"),
Layout(name="summaries"),
Layout(name="extracted_entities"),
)
# Header
layout["header"].update(Panel(
"[bold]Advanced Research Assistant Results[/bold]",
style="blue"
))
# Statistics
stats_table = Table(title="Document Processing Statistics")
stats_table.add_column("Document", style="cyan")
stats_table.add_column("Word Count", style="green")
stats_table.add_column("Entity Count", style="yellow")
try:
chunking_result = results["results"]["chunking_stage"]["output"]
entity_results = results["results"]["entity_extraction_stage"]["output"]
for i, doc_stats in enumerate(chunking_result.get("document_stats", [])):
entity_count = len(entity_results[i].get("entities", []))
stats_table.add_row(
f"Document {i+1}",
str(doc_stats.get("word_count", "N/A")),
str(entity_count)
)
except (KeyError, IndexError) as e:
console.print(f"[red]Error displaying statistics: {e}[/red]")
layout["statistics"].update(stats_table)
# Summaries
summary_panels = []
try:
summaries = results["results"]["summary_stage"]["output"]
for i, summary in enumerate(summaries):
summary_panels.append(Panel(
summary.get("summary", "No summary available"),
title=f"Document {i+1} Summary",
border_style="green"
))
except (KeyError, IndexError) as e:
summary_panels.append(Panel(
f"Error retrieving summaries: {e}",
title="Summary Error",
border_style="red"
))
layout["summaries"].update(summary_panels)
# Extracted entities
try:
final_analysis = results["results"]["final_analysis_stage"]["output"]
json_str = Syntax(
str(final_analysis.get("analysis", "No analysis available")),
"json",
theme="monokai",
line_numbers=True
)
layout["extracted_entities"].update(Panel(
json_str,
title="Final Analysis",
border_style="magenta"
))
except (KeyError, IndexError) as e:
layout["extracted_entities"].update(Panel(
f"Error retrieving final analysis: {e}",
title="Analysis Error",
border_style="red"
))
# Print layout
console.print(layout)
# Display execution time
console.print(
f"\n[bold green]Total workflow execution time:[/bold green] "
f"{results.get('total_processing_time', 0):.2f} seconds"
)
def create_research_workflow():
"""Define a complex research workflow with multiple parallel and sequential stages."""
workflow = [
# Initial document processing stages (run in parallel for all documents)
{
"stage_id": "chunking_stage",
"tool_name": "chunk_document",
"params": {
"text": "${documents}",
"chunk_size": 1000,
"get_stats": True
}
},
# Entity extraction runs in parallel with summarization
{
"stage_id": "entity_extraction_stage",
"tool_name": "extract_entity_graph",
"params": {
"text": "${documents}",
"entity_types": ["organization", "person", "concept", "location", "technology"],
"include_relations": True,
"confidence_threshold": 0.7
}
},
# Summarization stage (iterate over each document)
{
"stage_id": "summary_stage",
"tool_name": "summarize_document",
"params": {
"text": "${documents}",
"max_length": 150,
"focus_on": "key findings and implications"
}
},
# Classification of document topics
{
"stage_id": "classification_stage",
"tool_name": "text_classification",
"depends_on": ["chunking_stage"],
"params": {
"text": "${chunking_stage.document_text}",
"categories": [
"Climate & Environment",
"Technology",
"Healthcare",
"Economy",
"Social Policy",
"Scientific Research"
],
"provider": Provider.OPENAI.value,
"multi_label": True,
"confidence_threshold": 0.6
}
},
# Generate structured insights from entity analysis
{
"stage_id": "entity_insights_stage",
"tool_name": "extract_json",
"depends_on": ["entity_extraction_stage"],
"params": {
"text": "${entity_extraction_stage.text_output}",
"schema": {
"key_entities": "array",
"primary_relationships": "array",
"research_domains": "array"
},
"include_reasoning": True
}
},
# Cost-optimized final analysis
{
"stage_id": "model_selection_stage",
"tool_name": "recommend_model",
"depends_on": ["summary_stage", "classification_stage", "entity_insights_stage"],
"params": {
"task_type": "complex analysis and synthesis",
"expected_input_length": 3000,
"expected_output_length": 1000,
"required_capabilities": ["reasoning", "knowledge"],
"priority": "balanced"
}
},
# Final analysis and synthesis
{
"stage_id": "final_analysis_stage",
"tool_name": "chat_completion",
"depends_on": ["model_selection_stage", "summary_stage", "classification_stage", "entity_insights_stage"],
"params": {
"messages": [
{
"role": "system",
"content": "You are a research assistant synthesizing information from multiple documents."
},
{
"role": "user",
"content": "Analyze the following research summaries, classifications, and entity insights. Provide a comprehensive analysis that identifies cross-document patterns, contradictions, and key insights. Format the response as structured JSON.\n\nSummaries: ${summary_stage.summary}\n\nClassifications: ${classification_stage.classifications}\n\nEntity Insights: ${entity_insights_stage.content}"
}
],
"model": "${model_selection_stage.recommendations[0].model}",
"response_format": {"type": "json_object"}
}
}
]
return workflow
async def main():
"""Run the complete research assistant workflow demo."""
console.print(Rule("[bold magenta]Advanced Research Workflow Demo[/bold magenta]"))
tracker = CostTracker() # Instantiate tracker
try:
# Display header
console.print(Panel.fit(
"[bold cyan]Advanced Research Assistant Workflow Demo[/bold cyan]\n"
"Powered by NetworkX DAG-based Workflow Engine",
title="Ultimate MCP Server",
border_style="green"
))
# Create the workflow definition
workflow = create_research_workflow()
# Visualize the workflow before execution
await display_workflow_diagram(workflow)
# Prompt user to continue
console.print("\n[yellow]Press Enter to execute the workflow...[/yellow]", end="")
input()
# Execute workflow with progress display
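        # execute_optimized_workflow is not awaited here; the returned awaitable is handed to
        # display_execution_progress, which awaits it inside the live progress display.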
workflow_future = execute_optimized_workflow(
documents=SAMPLE_DOCS,
workflow=workflow,
max_concurrency=3
)
results = await display_execution_progress(workflow_future)
# Track cost if possible
if results and isinstance(results, dict) and "cost" in results:
try:
total_cost = results.get("cost", {}).get("total_cost", 0.0)
processing_time = results.get("total_processing_time", 0.0)
# Provider/Model is ambiguous here, use a placeholder
trackable = TrackableResult(
cost=total_cost,
input_tokens=0, # Not aggregated
output_tokens=0, # Not aggregated
provider="workflow",
model="research_workflow",
processing_time=processing_time
)
tracker.add_call(trackable)
except Exception as track_err:
logger.warning(f"Could not track workflow cost: {track_err}", exc_info=False)
if results:
console.print(Rule("[bold green]Workflow Execution Completed[/bold green]"))
await visualize_results(results.get("outputs", {}))
else:
console.print("[bold red]Workflow execution failed or timed out.[/bold red]")
except Exception as e:
console.print(f"[bold red]An unexpected error occurred:[/bold red] {e}")
# Display cost summary
tracker.display_summary(console)
if __name__ == "__main__":
asyncio.run(main())
```
--------------------------------------------------------------------------------
/ultimate_mcp_server/services/vector/vector_service.py:
--------------------------------------------------------------------------------
```python
"""Vector database service for semantic search."""
import asyncio
import json
import time
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from ultimate_mcp_server.services.vector.embeddings import get_embedding_service
from ultimate_mcp_server.utils import get_logger
logger = get_logger(__name__)
# Try to import chromadb
try:
import chromadb
from chromadb.config import Settings as ChromaSettings
CHROMADB_AVAILABLE = True
logger.info("ChromaDB imported successfully", extra={"emoji_key": "success"})
except ImportError as e:
logger.warning(f"ChromaDB not available: {str(e)}", extra={"emoji_key": "warning"})
CHROMADB_AVAILABLE = False
# Try to import hnswlib, but don't fail if not available
try:
import hnswlib
HNSWLIB_AVAILABLE = True
HNSW_INDEX = hnswlib.Index
except ImportError:
HNSWLIB_AVAILABLE = False
HNSW_INDEX = None
class VectorCollection:
"""A collection of vectors with metadata."""
def __init__(
self,
name: str,
dimension: int = 1536,
similarity_metric: str = "cosine",
metadata: Optional[Dict[str, Any]] = None
):
"""Initialize a vector collection.
Args:
name: Collection name
dimension: Vector dimension
similarity_metric: Similarity metric (cosine, dot, or euclidean)
metadata: Optional metadata for the collection
"""
self.name = name
self.dimension = dimension
self.similarity_metric = similarity_metric
self.metadata = metadata or {}
# Initialize storage
self.vectors = []
self.ids = []
self.metadatas = []
# Create embedding service
self.embedding_service = get_embedding_service()
# Initialize search index
self._init_search_index()
logger.info(
f"Vector collection '{name}' created ({dimension} dimensions)",
extra={"emoji_key": "vector"}
)
def _init_search_index(self):
"""Initialize search index based on available libraries."""
self.index_type = "numpy" # Fallback
self.index = None
# Try to use HNSW for fast search if available
if HNSWLIB_AVAILABLE:
try:
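                # HNSW tuning: max_elements=1000 is only an initial capacity (add() and
                # _rebuild_index() grow it later); ef_construction and M trade index build
                # time and memory for recall quality.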
self.index = HNSW_INDEX(space=self._get_hnswlib_space(), dim=self.dimension)
self.index.init_index(max_elements=1000, ef_construction=200, M=16)
self.index.set_ef(50) # Search accuracy parameter
self.index_type = "hnswlib"
logger.debug(
f"Using HNSW index for collection '{self.name}'",
emoji_key="vector"
)
except Exception as e:
logger.warning(
f"Failed to initialize HNSW index: {str(e)}. Falling back to numpy.",
emoji_key="warning"
)
self.index = None
def _get_hnswlib_space(self) -> str:
"""Get HNSW space based on similarity metric.
Returns:
HNSW space name
"""
if self.similarity_metric == "cosine":
return "cosine"
elif self.similarity_metric == "dot":
return "ip" # Inner product
elif self.similarity_metric == "euclidean":
return "l2"
else:
return "cosine" # Default
def add(
self,
vectors: Union[List[List[float]], np.ndarray],
ids: Optional[List[str]] = None,
metadatas: Optional[List[Dict[str, Any]]] = None
) -> List[str]:
"""Add vectors to the collection.
Args:
vectors: Vectors to add
ids: Optional IDs for the vectors (generated if not provided)
metadatas: Optional metadata for each vector
Returns:
List of vector IDs
"""
# Ensure vectors is a numpy array
if not isinstance(vectors, np.ndarray):
vectors = np.array(vectors, dtype=np.float32)
# Generate IDs if not provided
if ids is None:
ids = [str(uuid.uuid4()) for _ in range(len(vectors))]
# Ensure metadatas is a list of dicts
if metadatas is None:
metadatas = [{} for _ in range(len(vectors))]
# Add to storage
for _i, (vector, id, metadata) in enumerate(zip(vectors, ids, metadatas, strict=False)):
self.vectors.append(vector)
self.ids.append(id)
self.metadatas.append(metadata)
# Update index if using HNSW
if self.index_type == "hnswlib" and self.index is not None:
try:
# Resize index if needed
if len(self.vectors) > self.index.get_max_elements():
new_size = max(1000, len(self.vectors) * 2)
self.index.resize_index(new_size)
# Add vectors to index
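                # hnswlib labels are the positions of vectors in self.vectors, which is how
                # knn_query results are mapped back to self.ids / self.metadatas in search().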
start_idx = len(self.vectors) - len(vectors)
for i, vector in enumerate(vectors):
self.index.add_items(vector, start_idx + i)
except Exception as e:
logger.error(
f"Failed to update HNSW index: {str(e)}",
emoji_key="error"
)
# Rebuild index
self._rebuild_index()
logger.debug(
f"Added {len(vectors)} vectors to collection '{self.name}'",
emoji_key="vector"
)
return ids
def _rebuild_index(self):
"""Rebuild the search index from scratch."""
if not HNSWLIB_AVAILABLE or not self.vectors:
return
try:
# Re-initialize index
self.index = HNSW_INDEX(space=self._get_hnswlib_space(), dim=self.dimension)
self.index.init_index(max_elements=max(1000, len(self.vectors) * 2), ef_construction=200, M=16)
self.index.set_ef(50)
# Add all vectors
vectors_array = np.array(self.vectors, dtype=np.float32)
self.index.add_items(vectors_array, np.arange(len(self.vectors)))
logger.info(
f"Rebuilt HNSW index for collection '{self.name}'",
emoji_key="vector"
)
except Exception as e:
logger.error(
f"Failed to rebuild HNSW index: {str(e)}",
emoji_key="error"
)
self.index = None
self.index_type = "numpy"
def search(
self,
query_vector: Union[List[float], np.ndarray],
top_k: int = 5,
filter: Optional[Dict[str, Any]] = None,
similarity_threshold: float = 0.0
) -> List[Dict[str, Any]]:
"""Search for similar vectors.
Args:
query_vector: Query vector
top_k: Number of results to return
filter: Optional metadata filter
similarity_threshold: Minimum similarity score (0.0 to 1.0)
Returns:
List of results with scores and metadata
"""
# Ensure query_vector is a numpy array
if not isinstance(query_vector, np.ndarray):
query_vector = np.array(query_vector, dtype=np.float32)
# Log some diagnostic information
logger.debug(f"Collection '{self.name}' contains {len(self.vectors)} vectors")
logger.debug(f"Searching for top {top_k} matches with filter: {filter} and threshold: {similarity_threshold}")
# Filter vectors based on metadata if needed
if filter:
filtered_indices = self._apply_filter(filter)
if not filtered_indices:
logger.debug(f"No vectors match the filter criteria: {filter}")
return []
logger.debug(f"Filter reduced search space to {len(filtered_indices)} vectors")
else:
filtered_indices = list(range(len(self.vectors)))
logger.debug(f"No filter applied, searching all {len(filtered_indices)} vectors")
# If no vectors to search, return empty results
if not filtered_indices:
logger.debug("No vectors to search, returning empty results")
return []
# Perform search based on index type
if self.index_type == "hnswlib" and self.index is not None and not filter:
# Use HNSW for fast search (only if no filter)
try:
start_time = time.time()
labels, distances = self.index.knn_query(query_vector, k=min(top_k, len(self.vectors)))
search_time = time.time() - start_time
# Convert distances to similarities based on metric
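                # hnswlib reports distance as (1 - similarity) for the "cosine" and "ip" spaces,
                # so similarity = 1 - distance; for "l2" the distance is mapped into (0, 1].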
if self.similarity_metric == "cosine" or self.similarity_metric == "dot":
similarities = 1.0 - distances[0] # Convert distance to similarity
else:
similarities = 1.0 / (1.0 + distances[0]) # Convert distance to similarity
# Format results
results = []
for _i, (label, similarity) in enumerate(zip(labels[0], similarities, strict=False)):
# Apply similarity threshold
if similarity < similarity_threshold:
continue
results.append({
"id": self.ids[label],
"similarity": float(similarity),
"metadata": self.metadatas[label],
"vector": self.vectors[label].tolist(),
})
logger.debug(
f"HNSW search completed in {search_time:.6f}s, found {len(results)} results"
)
for i, result in enumerate(results):
logger.debug(f"Result {i+1}: id={result['id']}, similarity={result['similarity']:.4f}, metadata={result['metadata']}")
return results
except Exception as e:
logger.error(
f"HNSW search failed: {str(e)}. Falling back to numpy.",
emoji_key="error"
)
# Fall back to numpy search
# Numpy-based search (slower but always works)
start_time = time.time()
# Calculate similarities
results = []
for idx in filtered_indices:
vector = self.vectors[idx]
            # Calculate similarity based on metric
            if self.similarity_metric == "cosine":
                # sklearn's cosine_similarity expects 2D arrays, so reshape the 1D vectors
                similarity = cosine_similarity(query_vector.reshape(1, -1), vector.reshape(1, -1))[0][0]
            elif self.similarity_metric == "dot":
                similarity = np.dot(query_vector, vector)
            elif self.similarity_metric == "euclidean":
                similarity = 1.0 / (1.0 + np.linalg.norm(query_vector - vector))
            else:
                # Default to cosine similarity for unrecognized metrics
                similarity = cosine_similarity(query_vector.reshape(1, -1), vector.reshape(1, -1))[0][0]
# Apply similarity threshold
if similarity < similarity_threshold:
continue
results.append({
"id": self.ids[idx],
"similarity": float(similarity),
"metadata": self.metadatas[idx],
"vector": vector.tolist(),
})
# Sort by similarity (descending)
results.sort(key=lambda x: x["similarity"], reverse=True)
# Limit to top_k
results = results[:top_k]
search_time = time.time() - start_time
logger.debug(
f"Numpy search completed in {search_time:.6f}s, found {len(results)} results"
)
for i, result in enumerate(results):
logger.debug(f"Result {i+1}: id={result['id']}, similarity={result['similarity']:.4f}, metadata={result['metadata']}")
return results
def _apply_filter(self, filter: Dict[str, Any]) -> List[int]:
"""Apply metadata filter to get matching indices.
Args:
filter: Metadata filter
Returns:
List of matching indices
"""
filtered_indices = []
for i, metadata in enumerate(self.metadatas):
# Simple equality filter for now
match = True
for k, v in filter.items():
if k not in metadata or metadata[k] != v:
match = False
break
if match:
filtered_indices.append(i)
return filtered_indices
async def search_by_text(
self,
query_text: str,
top_k: int = 5,
filter: Optional[Dict[str, Any]] = None,
model: Optional[str] = None,
similarity_threshold: float = 0.0
) -> List[Dict[str, Any]]:
"""Search by text query.
Args:
query_text: Text query
top_k: Number of results to return
filter: Optional metadata filter
model: Embedding model name (currently unused; the embedding service's configured model is applied)
similarity_threshold: Minimum similarity score (0.0 to 1.0)
Returns:
List of results with scores and metadata
"""
# Get query embedding - call create_embeddings with a list and get the first result
query_embeddings = await self.embedding_service.create_embeddings(
texts=[query_text], # Pass text as a list
# model=model # create_embeddings uses the model set during service init
)
if not query_embeddings: # Handle potential empty result
logger.error(f"Failed to generate embedding for query: {query_text}")
return []
query_embedding = query_embeddings[0] # Get the first (only) embedding
# Search with the embedding
return self.search(query_embedding, top_k, filter, similarity_threshold)
def delete(
self,
ids: Optional[List[str]] = None,
filter: Optional[Dict[str, Any]] = None
) -> int:
"""Delete vectors from the collection.
Args:
ids: IDs of vectors to delete
filter: Metadata filter for vectors to delete
Returns:
Number of vectors deleted
"""
if ids is None and filter is None:
return 0
# Get indices to delete
indices_to_delete = set()
# Add indices by ID
if ids:
for i, id in enumerate(self.ids):
if id in ids:
indices_to_delete.add(i)
# Add indices by filter
if filter:
filtered_indices = self._apply_filter(filter)
indices_to_delete.update(filtered_indices)
# Delete vectors (in reverse order to avoid index issues)
indices_to_delete = sorted(indices_to_delete, reverse=True)
for idx in indices_to_delete:
del self.vectors[idx]
del self.ids[idx]
del self.metadatas[idx]
# Rebuild index if using HNSW
if self.index_type == "hnswlib" and self.index is not None:
self._rebuild_index()
logger.info(
f"Deleted {len(indices_to_delete)} vectors from collection '{self.name}'",
emoji_key="vector"
)
return len(indices_to_delete)
def save(self, directory: Union[str, Path]) -> bool:
"""Save collection to disk.
Args:
directory: Directory to save to
Returns:
True if successful
"""
directory = Path(directory)
directory.mkdir(parents=True, exist_ok=True)
try:
# Save vectors
vectors_array = np.array(self.vectors, dtype=np.float32)
np.save(str(directory / "vectors.npy"), vectors_array)
# Save IDs and metadata
with open(directory / "data.json", "w") as f:
json.dump({
"name": self.name,
"dimension": self.dimension,
"similarity_metric": self.similarity_metric,
"metadata": self.metadata,
"ids": self.ids,
"metadatas": self.metadatas,
}, f)
logger.info(
f"Saved collection '{self.name}' to {directory}",
emoji_key="vector"
)
return True
except Exception as e:
logger.error(
f"Failed to save collection: {str(e)}",
emoji_key="error"
)
return False
@classmethod
def load(cls, directory: Union[str, Path]) -> "VectorCollection":
"""Load collection from disk.
Args:
directory: Directory to load from
Returns:
Loaded collection
Raises:
FileNotFoundError: If collection files not found
ValueError: If collection data is invalid
"""
directory = Path(directory)
# Check if files exist
vectors_file = directory / "vectors.npy"
data_file = directory / "data.json"
if not vectors_file.exists() or not data_file.exists():
raise FileNotFoundError(f"Collection files not found in {directory}")
try:
# Load vectors
vectors_array = np.load(str(vectors_file))
vectors = [vectors_array[i] for i in range(len(vectors_array))]
# Load data
with open(data_file, "r") as f:
data = json.load(f)
# Create collection
collection = cls(
name=data["name"],
dimension=data["dimension"],
similarity_metric=data["similarity_metric"],
metadata=data["metadata"]
)
# Set data
collection.ids = data["ids"]
collection.metadatas = data["metadatas"]
collection.vectors = vectors
# Rebuild index
collection._rebuild_index()
logger.info(
f"Loaded collection '{collection.name}' from {directory} ({len(vectors)} vectors)",
emoji_key="vector"
)
return collection
except Exception as e:
logger.error(
f"Failed to load collection: {str(e)}",
emoji_key="error"
)
raise ValueError(f"Failed to load collection: {str(e)}") from e
def get_stats(self) -> Dict[str, Any]:
"""Get collection statistics.
Returns:
Dictionary of statistics
"""
return {
"name": self.name,
"dimension": self.dimension,
"similarity_metric": self.similarity_metric,
"vectors_count": len(self.vectors),
"index_type": self.index_type,
"metadata": self.metadata,
}
def clear(self) -> None:
"""Clear all vectors from the collection."""
self.vectors = []
self.ids = []
self.metadatas = []
# Reset index
self._init_search_index()
logger.info(
f"Cleared collection '{self.name}'",
emoji_key="vector"
)
async def query(
self,
query_texts: List[str],
n_results: int = 10,
where: Optional[Dict[str, Any]] = None,
where_document: Optional[Dict[str, Any]] = None,
include: Optional[List[str]] = None
) -> Dict[str, List[Any]]:
"""Query the collection with text queries (compatibility with ChromaDB).
Args:
query_texts: List of query texts
n_results: Number of results to return
where: Optional metadata filter
where_document: Optional document content filter
include: Optional list of fields to include
Returns:
Dictionary with results in ChromaDB format
"""
logger.debug(f"DEBUG VectorCollection.query: query_texts={query_texts}, n_results={n_results}")
logger.debug(f"DEBUG VectorCollection.query: where={where}, where_document={where_document}")
logger.debug(f"DEBUG VectorCollection.query: include={include}")
logger.debug(f"DEBUG VectorCollection.query: Collection has {len(self.vectors)} vectors and {len(self.ids)} IDs")
# Initialize results
results = {
"ids": [],
"documents": [],
"metadatas": [],
"distances": [],
"embeddings": []
}
# Process each query
for query_text in query_texts:
# Get embedding using the async embedding service (which uses its configured model)
logger.debug(f"DEBUG VectorCollection.query: Getting embedding for '{query_text}' using service model: {self.embedding_service.model_name}")
try:
query_embeddings_list = await self.embedding_service.create_embeddings([query_text])
if not query_embeddings_list or not query_embeddings_list[0]:
logger.error(f"Failed to generate embedding for query: '{query_text[:50]}...'")
# Add empty results for this query and continue
results["ids"].append([])
results["documents"].append([])
results["metadatas"].append([])
results["distances"].append([])
if "embeddings" in (include or []):
results["embeddings"].append([])
continue # Skip to next query_text
query_embedding = np.array(query_embeddings_list[0], dtype=np.float32)
if query_embedding.size == 0:
logger.warning(f"Generated query embedding is empty for: '{query_text[:50]}...'. Skipping search for this query.")
# Add empty results for this query and continue
results["ids"].append([])
results["documents"].append([])
results["metadatas"].append([])
results["distances"].append([])
if "embeddings" in (include or []):
results["embeddings"].append([])
continue # Skip to next query_text
except Exception as embed_err:
logger.error(f"Error generating embedding for query '{query_text[:50]}...': {embed_err}", exc_info=True)
# Add empty results for this query and continue
results["ids"].append([])
results["documents"].append([])
results["metadatas"].append([])
results["distances"].append([])
if "embeddings" in (include or []):
results["embeddings"].append([])
continue # Skip to next query_text
logger.debug(f"DEBUG VectorCollection.query: Embedding shape: {query_embedding.shape}")
# Search with the embedding
logger.debug(f"Searching for query text: '{query_text}' in collection '{self.name}'")
search_results = self.search(
query_vector=query_embedding, # Use the generated embedding
top_k=n_results,
filter=where,
similarity_threshold=0.0 # Set to 0 to get all results for debugging
)
logger.debug(f"DEBUG VectorCollection.query: Found {len(search_results)} raw search results")
# Format results in ChromaDB format
ids = []
documents = []
metadatas = []
distances = []
embeddings = []
for i, item in enumerate(search_results):
ids.append(item["id"])
# Extract document from metadata (keep existing robust logic)
metadata = item.get("metadata", {})
doc = ""
if "text" in metadata:
doc = metadata["text"]
elif "document" in metadata:
doc = metadata["document"]
elif "content" in metadata:
doc = metadata["content"]
if not doc and isinstance(metadata, str):
doc = metadata
# Apply document content filter if specified
if where_document and where_document.get("$contains"):
filter_text = where_document["$contains"]
if filter_text not in doc:
logger.debug(f"DEBUG VectorCollection.query: Skipping doc {i} - doesn't contain filter text")
continue
logger.debug(f"Result {i+1}: id={item['id']}, similarity={item.get('similarity', 0.0):.4f}, doc_length={len(doc)}")
documents.append(doc)
metadatas.append(metadata)
distance = 1.0 - item.get("similarity", 0.0)
distances.append(distance)
if "embeddings" in (include or []):
embeddings.append(item.get("vector", []))
# Add results for the current query_text
results["ids"].append(ids)
results["documents"].append(documents)
results["metadatas"].append(metadatas)
results["distances"].append(distances)
if "embeddings" in (include or []):
results["embeddings"].append(embeddings)
logger.debug(f"DEBUG VectorCollection.query: Final formatted results for this query - {len(documents)} documents")
return results
class VectorDatabaseService:
"""Vector database service for semantic search."""
_instance = None
def __new__(cls, *args, **kwargs):
"""Create a singleton instance."""
if cls._instance is None:
cls._instance = super(VectorDatabaseService, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(
self,
base_dir: Optional[Union[str, Path]] = None,
use_chromadb: Optional[bool] = None
):
"""Initialize the vector database service.
Args:
base_dir: Base directory for storage
use_chromadb: Whether to use ChromaDB (if available)
"""
# Only initialize once for singleton
if self._initialized:
return
# Set base directory
if base_dir:
self.base_dir = Path(base_dir)
else:
self.base_dir = Path.home() / ".ultimate" / "vector_db"
# Create base directory if it doesn't exist
self.base_dir.mkdir(parents=True, exist_ok=True)
# Check if ChromaDB should be used
self.use_chromadb = use_chromadb if use_chromadb is not None else CHROMADB_AVAILABLE
# Initialize ChromaDB client if used
self.chroma_client = None
if self.use_chromadb and CHROMADB_AVAILABLE:
try:
# Create ChromaDB directory if it doesn't exist
chroma_dir = self.base_dir / "chromadb"
chroma_dir.mkdir(parents=True, exist_ok=True)
self.chroma_client = chromadb.PersistentClient(
path=str(chroma_dir),
settings=ChromaSettings(
anonymized_telemetry=False,
allow_reset=True
)
)
# Test if it works properly
test_collections = self.chroma_client.list_collections()
logger.debug(f"ChromaDB initialized with {len(test_collections)} existing collections")
logger.info(
"Using ChromaDB for vector storage",
emoji_key="vector"
)
except Exception as e:
logger.error(
f"Failed to initialize ChromaDB: {str(e)}. Vector operations will not work properly.",
emoji_key="error"
)
# ChromaDB failed to initialize; disable it for this instance so later
# operations use local storage instead
self.use_chromadb = False
self.chroma_client = None
# Re-raise if ChromaDB was explicitly requested
if use_chromadb:
raise ValueError(f"ChromaDB initialization failed: {str(e)}") from e
else:
if use_chromadb and not CHROMADB_AVAILABLE:
logger.error(
"ChromaDB was explicitly requested but is not available. Please install it with: pip install chromadb",
emoji_key="error"
)
raise ImportError("ChromaDB was requested but is not installed")
self.use_chromadb = False
# Collections
self.collections = {}
# Get embedding service
self.embedding_service = get_embedding_service()
self._initialized = True
logger.info(
f"Vector database service initialized (base_dir: {self.base_dir}, use_chromadb: {self.use_chromadb})",
emoji_key="vector"
)
async def _reset_chroma_client(self) -> bool:
"""Reset or recreate the ChromaDB client.
Returns:
True if successful
"""
if not CHROMADB_AVAILABLE or not self.use_chromadb:
return False
try:
# First try using the reset API if available
if self.chroma_client and hasattr(self.chroma_client, 'reset'):
try:
self.chroma_client.reset()
logger.debug("Reset ChromaDB client successfully")
return True
except Exception as e:
logger.debug(f"Failed to reset ChromaDB client using reset(): {str(e)}")
# If that fails, recreate the client
chroma_dir = self.base_dir / "chromadb"
chroma_dir.mkdir(parents=True, exist_ok=True)
self.chroma_client = chromadb.PersistentClient(
path=str(chroma_dir),
settings=ChromaSettings(
anonymized_telemetry=False,
allow_reset=True
)
)
logger.debug("Successfully recreated ChromaDB client")
return True
except Exception as e:
logger.error(
f"Failed to reset or recreate ChromaDB client: {str(e)}",
emoji_key="error"
)
return False
async def create_collection(
self,
name: str,
dimension: int = 1536,
similarity_metric: str = "cosine",
metadata: Optional[Dict[str, Any]] = None,
overwrite: bool = False
) -> Union[VectorCollection, Any]:
"""Create a new collection.
Args:
name: Collection name
dimension: Vector dimension
similarity_metric: Similarity metric (cosine, dot, or euclidean)
metadata: Optional metadata for the collection
overwrite: Whether to overwrite existing collection
Returns:
Created collection
Raises:
ValueError: If collection already exists and overwrite is False
"""
# Check if collection already exists in memory
if name in self.collections and not overwrite:
raise ValueError(f"Collection '{name}' already exists")
# For consistency, if overwrite is True, explicitly delete any existing collection
if overwrite:
try:
# Delete from memory collections
if name in self.collections:
del self.collections[name]
# Try to delete from ChromaDB
await self.delete_collection(name)
logger.debug(f"Deleted existing collection '{name}' for overwrite")
# # If using ChromaDB and overwrite is True, also try to reset the client
# if self.use_chromadb and self.chroma_client:
# await self._reset_chroma_client()
# logger.debug("Reset ChromaDB client before creating new collection")
# Force a delay to ensure deletions complete
await asyncio.sleep(1.5)
except Exception as e:
logger.debug(f"Error during collection cleanup for overwrite: {str(e)}")
# Create collection based on storage type
if self.use_chromadb and self.chroma_client is not None:
# Use ChromaDB
# Sanitize metadata for ChromaDB (no None values)
sanitized_metadata = {}
if metadata:
for k, v in metadata.items():
if v is not None and not isinstance(v, (str, int, float, bool)):
sanitized_metadata[k] = str(v) # Convert to string
elif v is not None:
sanitized_metadata[k] = v # Keep as is if it's a valid type
# Force a delay to ensure previous deletions have completed
await asyncio.sleep(0.1)
# Create collection
try:
collection = self.chroma_client.create_collection(
name=name,
metadata=sanitized_metadata or {"description": "Vector collection"}
)
logger.info(
f"Created ChromaDB collection '{name}'",
emoji_key="vector"
)
self.collections[name] = collection
return collection
except Exception as e:
# Instead of falling back to local storage, raise the error
logger.error(
f"Failed to create ChromaDB collection: {str(e)}",
emoji_key="error"
)
raise ValueError(f"Failed to create ChromaDB collection: {str(e)}") from e
else:
# Use local storage
collection = VectorCollection(
name=name,
dimension=dimension,
similarity_metric=similarity_metric,
metadata=metadata
)
self.collections[name] = collection
return collection
async def get_collection(self, name: str) -> Optional[Union[VectorCollection, Any]]:
"""Get a collection by name.
Args:
name: Collection name
Returns:
Collection or None if not found
"""
# Check if collection is already loaded
if name in self.collections:
return self.collections[name]
# Try to load from disk
if self.use_chromadb and self.chroma_client is not None:
# Check if ChromaDB collection exists
try:
# In ChromaDB v0.6.0+, list_collections() returns names not objects
existing_collections = self.chroma_client.list_collections()
existing_collection_names = []
# Handle both chromadb v0.6.0+ and older versions
if existing_collections and not isinstance(existing_collections[0], str):
# Older versions (<0.6.0) return collection objects
for collection in existing_collections:
# Access name attribute or use object itself if it's a string
if hasattr(collection, 'name'):
existing_collection_names.append(collection.name)
else:
existing_collection_names.append(str(collection))
else:
# v0.6.0+ returns collection names (strings) directly
existing_collection_names = existing_collections
if name in existing_collection_names:
collection = self.chroma_client.get_collection(name)
self.collections[name] = collection
return collection
except Exception as e:
logger.error(
f"Failed to get ChromaDB collection: {str(e)}",
emoji_key="error"
)
# Try to load local collection
collection_dir = self.base_dir / "collections" / name
if collection_dir.exists():
try:
collection = VectorCollection.load(collection_dir)
self.collections[name] = collection
return collection
except Exception as e:
logger.error(
f"Failed to load collection '{name}': {str(e)}",
emoji_key="error"
)
return None
async def list_collections(self) -> List[str]:
"""List all collection names.
Returns:
List of collection names
"""
collection_names = set(self.collections.keys())
# Add collections from ChromaDB
if self.use_chromadb and self.chroma_client is not None:
try:
# Handle both chromadb v0.6.0+ and older versions
chroma_collections = self.chroma_client.list_collections()
# Check if we received a list of collection objects or just names
if chroma_collections and not isinstance(chroma_collections[0], str):
# Older versions (<0.6.0) return collection objects
for collection in chroma_collections:
# Access name attribute or use object itself if it's a string
if hasattr(collection, 'name'):
collection_names.add(collection.name)
else:
collection_names.add(str(collection))
else:
# v0.6.0+ returns collection names (strings) directly
for collection in chroma_collections:
collection_names.add(collection)
except Exception as e:
logger.error(
f"Failed to list ChromaDB collections: {str(e)}",
emoji_key="error"
)
# Add collections from disk
collections_dir = self.base_dir / "collections"
if collections_dir.exists():
for path in collections_dir.iterdir():
if path.is_dir() and (path / "data.json").exists():
collection_names.add(path.name)
return list(collection_names)
async def delete_collection(self, name: str) -> bool:
"""Delete a collection.
Args:
name: Collection name
Returns:
True if successful
"""
# Remove from loaded collections
if name in self.collections:
del self.collections[name]
success = True
# Delete from ChromaDB
if self.use_chromadb and self.chroma_client is not None:
try:
# Check if collection exists in ChromaDB first
exists_in_chromadb = False
try:
collections = self.chroma_client.list_collections()
# Handle different versions of ChromaDB API
if collections and hasattr(collections[0], 'name'):
collection_names = [c.name for c in collections]
else:
collection_names = collections
exists_in_chromadb = name in collection_names
except Exception as e:
logger.debug(f"Error checking ChromaDB collections: {str(e)}")
# Only try to delete if it exists
if exists_in_chromadb:
self.chroma_client.delete_collection(name)
logger.debug(f"Deleted ChromaDB collection '{name}'")
except Exception as e:
logger.warning(
f"Failed to delete ChromaDB collection: {str(e)}",
emoji_key="warning"
)
success = False
# Delete from disk
collection_dir = self.base_dir / "collections" / name
if collection_dir.exists():
try:
import shutil
shutil.rmtree(collection_dir)
logger.debug(f"Deleted collection directory: {collection_dir}")
except Exception as e:
logger.error(
f"Failed to delete collection directory: {str(e)}",
emoji_key="error"
)
return False
logger.info(
f"Deleted collection '{name}'",
emoji_key="vector"
)
return success
async def add_texts(
self,
collection_name: str,
texts: List[str],
metadatas: Optional[List[Dict[str, Any]]] = None,
ids: Optional[List[str]] = None,
embedding_model: Optional[str] = None,
batch_size: int = 100
) -> List[str]:
"""Add texts to a collection.
Args:
collection_name: Collection name
texts: Texts to add
metadatas: Optional metadata for each text
ids: Optional IDs for the texts
embedding_model: Embedding model name (NOTE: Model is set during EmbeddingService init)
batch_size: Maximum batch size for embedding generation
Returns:
List of document IDs
Raises:
ValueError: If collection not found
"""
# Get or create collection
collection = await self.get_collection(collection_name)
if collection is None:
collection = await self.create_collection(collection_name)
# Generate embeddings
logger.debug(f"Generating embeddings for {len(texts)} texts using model: {self.embedding_service.model_name}")
embeddings = []
for i in range(0, len(texts), batch_size):
batch_texts = texts[i:i + batch_size]
batch_embeddings = await self.embedding_service.create_embeddings(
texts=batch_texts,
)
embeddings.extend(batch_embeddings)
if len(texts) > batch_size: # Add delay if batching
await asyncio.sleep(0.1) # Small delay between batches
logger.debug(f"Generated {len(embeddings)} embeddings")
# Add to collection
if self.use_chromadb and isinstance(collection, chromadb.Collection):
# ChromaDB collection
try:
# Generate IDs if not provided
if ids is None:
ids = [str(uuid.uuid4()) for _ in range(len(texts))]
# Ensure metadatas is provided
if metadatas is None:
metadatas = [{} for _ in range(len(texts))]
# Add to ChromaDB collection
collection.add(
embeddings=embeddings,
documents=texts,
metadatas=metadatas,
ids=ids
)
logger.info(
f"Added {len(texts)} documents to ChromaDB collection '{collection_name}'",
emoji_key="vector"
)
return ids
except Exception as e:
logger.error(
f"Failed to add documents to ChromaDB collection: {str(e)}",
emoji_key="error"
)
raise
else:
# Local collection
# For local collection, store text in metadata
combined_metadata = []
for _i, (text, meta) in enumerate(zip(texts, metadatas or [{} for _ in range(len(texts))], strict=False)):
# Create metadata with text as main content
combined_meta = {"text": text}
# Add any other metadata
if meta:
combined_meta.update(meta)
combined_metadata.append(combined_meta)
logger.debug(f"Adding vectors to local collection with metadata: {combined_metadata[0] if combined_metadata else None}")
result_ids = collection.add(
vectors=embeddings,
ids=ids,
metadatas=combined_metadata
)
logger.debug(f"Added {len(result_ids)} vectors to local collection '{collection_name}'")
return result_ids
async def search_by_text(
self,
collection_name: str,
query_text: str,
top_k: int = 5,
filter: Optional[Dict[str, Any]] = None,
embedding_model: Optional[str] = None,
include_vectors: bool = False,
similarity_threshold: float = 0.0
) -> List[Dict[str, Any]]:
"""Search a collection by text query.
Args:
collection_name: Collection name
query_text: Text query
top_k: Number of results to return
filter: Optional metadata filter
embedding_model: Embedding model name (currently unused; the service's configured embedding model is used)
include_vectors: Whether to include vectors in results
similarity_threshold: Minimum similarity score (0.0 to 1.0)
Returns:
List of search results
Raises:
ValueError: If collection not found
"""
# Get collection
collection = await self.get_collection(collection_name)
if collection is None:
raise ValueError(f"Collection '{collection_name}' not found")
# Search collection
if self.use_chromadb and isinstance(collection, chromadb.Collection):
# ChromaDB collection
try:
# Convert filter to ChromaDB format if provided
chroma_filter = self._convert_to_chroma_filter(filter) if filter else None
# Prepare include parameters for ChromaDB
include_params = ["documents", "metadatas", "distances"]
if include_vectors:
include_params.append("embeddings")
# Get embedding directly using our service
query_embeddings = await self.embedding_service.create_embeddings(
texts=[query_text],
# model=embedding_model # Model is defined in the service instance
)
if not query_embeddings:
logger.error(f"Failed to generate embedding for query: {query_text}")
return []
query_embedding = query_embeddings[0]
logger.debug(f"Using explicitly generated embedding with model {self.embedding_service.model_name}")
# Search ChromaDB collection with our embedding
results = collection.query(
query_embeddings=[query_embedding], # Use our embedding directly, not ChromaDB's
n_results=top_k,
where=chroma_filter,
where_document=None,
include=include_params
)
# Format results and apply similarity threshold
formatted_results = []
for i in range(len(results["ids"][0])):
similarity = 1.0 - float(results["distances"][0][i]) # Convert distance to similarity
# Skip results below threshold
if similarity < similarity_threshold:
continue
result = {
"id": results["ids"][0][i],
"text": results["documents"][0][i],
"metadata": results["metadatas"][0][i],
"similarity": similarity,
}
if include_vectors and "embeddings" in results:
result["vector"] = results["embeddings"][0][i]
formatted_results.append(result)
return formatted_results
except Exception as e:
logger.error(
f"Failed to search ChromaDB collection: {str(e)}",
emoji_key="error"
)
raise
else:
# Local collection
results = await collection.search_by_text(
query_text=query_text,
top_k=top_k,
filter=filter,
# model=embedding_model, # Pass model used by the collection's service instance
similarity_threshold=similarity_threshold
)
# Format results
formatted_results = []
for result in results:
formatted_result = {
"id": result["id"],
"text": result["metadata"].get("text", ""),
"metadata": {k: v for k, v in result["metadata"].items() if k != "text"},
"similarity": result["similarity"],
}
if include_vectors:
formatted_result["vector"] = result["vector"]
formatted_results.append(formatted_result)
return formatted_results
def _convert_to_chroma_filter(self, filter: Dict[str, Any]) -> Dict[str, Any]:
"""Convert filter to ChromaDB format.
Args:
filter: Filter dictionary
Returns:
ChromaDB-compatible filter
"""
# Simple equality filter for now
return filter
def save_all_collections(self) -> int:
"""Save all local collections to disk.
Returns:
Number of collections saved
"""
saved_count = 0
collections_dir = self.base_dir / "collections"
collections_dir.mkdir(parents=True, exist_ok=True)
for name, collection in self.collections.items():
if not self.use_chromadb or not isinstance(collection, chromadb.Collection):
# Only save local collections
collection_dir = collections_dir / name
if collection.save(collection_dir):
saved_count += 1
logger.info(
f"Saved {saved_count} collections to disk",
emoji_key="vector"
)
return saved_count
async def get_stats(self) -> Dict[str, Any]:
"""Get statistics about collections.
Returns:
Dictionary of statistics
"""
collection_names = await self.list_collections()
collection_stats = {}
for name in collection_names:
collection = await self.get_collection(name)
if collection:
if isinstance(collection, VectorCollection):
collection_stats[name] = collection.get_stats()
else:
# ChromaDB collection
try:
count = collection.count()
collection_stats[name] = {
"count": count,
"type": "chromadb"
}
except Exception as e:
logger.error(
f"Error getting stats for ChromaDB collection '{name}': {str(e)}",
emoji_key="error"
)
collection_stats[name] = {
"count": 0,
"type": "chromadb",
"error": str(e)
}
stats = {
"collections": len(collection_names),
"collection_stats": collection_stats
}
return stats
async def get_collection_metadata(self, name: str) -> Dict[str, Any]:
"""Get collection metadata.
Args:
name: Collection name
Returns:
Collection metadata
Raises:
ValueError: If collection not found
"""
# Get collection
collection = await self.get_collection(name)
if collection is None:
raise ValueError(f"Collection '{name}' not found")
# Get metadata
try:
if self.use_chromadb and hasattr(collection, "get_metadata"):
# ChromaDB collection
return collection.get_metadata() or {}
elif hasattr(collection, "metadata"):
# Local collection
return collection.metadata or {}
except Exception as e:
logger.error(
f"Failed to get collection metadata: {str(e)}",
emoji_key="error"
)
return {}
async def update_collection_metadata(self, name: str, metadata: Dict[str, Any]) -> bool:
"""Update collection metadata.
Args:
name: Collection name
metadata: New metadata
Returns:
True if successful
Raises:
ValueError: If collection not found
"""
# Get collection
collection = await self.get_collection(name)
if collection is None:
raise ValueError(f"Collection '{name}' not found")
# Update metadata
try:
if self.use_chromadb and hasattr(collection, "update_metadata"):
# ChromaDB collection - needs validation
validated_metadata = {}
for k, v in metadata.items():
# ChromaDB accepts only str, int, float, bool
if isinstance(v, (str, int, float, bool)):
validated_metadata[k] = v
elif v is None:
# Skip None values
logger.debug(f"Skipping None value for metadata key '{k}'")
continue
else:
# Convert other types to string
validated_metadata[k] = str(v)
# Debug log the validated metadata
logger.debug(f"Updating ChromaDB collection metadata with: {validated_metadata}")
collection.update_metadata(validated_metadata)
elif hasattr(collection, "metadata"):
# Local collection
collection.metadata.update(metadata)
logger.info(
f"Updated metadata for collection '{name}'",
emoji_key="vector"
)
return True
except Exception as e:
logger.error(
f"Failed to update collection metadata: {str(e)}",
emoji_key="error"
)
# Don't re-raise, just return false
return False
# Singleton instance getter
def get_vector_db_service(
base_dir: Optional[Union[str, Path]] = None,
use_chromadb: Optional[bool] = None
) -> VectorDatabaseService:
"""Get the vector database service singleton instance.
Args:
base_dir: Base directory for storage
use_chromadb: Whether to use ChromaDB (if available)
Returns:
VectorDatabaseService singleton instance
"""
return VectorDatabaseService(base_dir, use_chromadb)
```
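A minimal end-to-end usage sketch for the service above. It is illustrative only: the import path `ultimate_mcp_server.services.vector` and a working embedding provider behind `get_embedding_service()` are assumptions not shown in this file, and `use_chromadb=False` forces the local NumPy/hnswlib backend.
```python
# Illustrative sketch; the module path and a configured embedding provider are assumptions.
import asyncio

from ultimate_mcp_server.services.vector import get_vector_db_service  # assumed import path


async def main() -> None:
    db = get_vector_db_service(use_chromadb=False)  # force the local (non-ChromaDB) backend
    # dimension must match the embedding model's output size (1536 is an assumption)
    await db.create_collection("demo", dimension=1536, overwrite=True)
    ids = await db.add_texts(
        collection_name="demo",
        texts=["The blender is too loud.", "Battery life is excellent."],
        metadatas=[{"channel": "email"}, {"channel": "survey"}],
    )
    print(f"added {len(ids)} documents")
    hits = await db.search_by_text(
        collection_name="demo",
        query_text="noise complaints",
        top_k=2,
    )
    for hit in hits:
        print(hit["id"], round(hit["similarity"], 3), hit["text"])
    db.save_all_collections()  # persist local collections under base_dir/collections


if __name__ == "__main__":
    asyncio.run(main())
```
Because `add_texts` stores the raw text under the `text` metadata key for local collections, `search_by_text` can return it directly as `hit["text"]`.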
--------------------------------------------------------------------------------
/examples/sentiment_analysis_demo.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
"""Business sentiment analysis demonstration using Ultimate MCP Server."""
import asyncio
import sys
from pathlib import Path
from typing import Any, Dict, List
# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))
# Third-party imports
from rich import box
from rich.console import Group
from rich.markup import escape
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.rule import Rule
from rich.table import Table
from rich.tree import Tree
# Project imports
from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.server import Gateway
from ultimate_mcp_server.tools.sentiment_analysis import (
analyze_business_sentiment,
analyze_business_text_batch,
)
from ultimate_mcp_server.utils import get_logger
from ultimate_mcp_server.utils.display import CostTracker
from ultimate_mcp_server.utils.logging.console import console
# Initialize logger
logger = get_logger("example.business_sentiment_demo")
# Provider and model configuration - easy to change
PROVIDER = Provider.OPENAI.value # Change this value to switch providers
MODEL = 'gpt-4.1-nano' # Set to None to use default model for the provider, or specify a model name
# Sample data for demonstrations
SAMPLE_FEEDBACK = {
"retail": "I recently purchased your premium blender model BX-9000. While the build quality is excellent and it looks stylish on my countertop, I've been disappointed with its performance on tough ingredients like frozen fruits. It leaves chunks unblended even after several minutes of operation. Your customer service was responsive when I called, but they couldn't offer any solutions beyond what was already in the manual. For a product in this price range ($249), I expected better performance. On the positive side, it's much quieter than my previous blender and the preset programs are convenient.",
"financial": "I've been using your online banking platform for my small business for about 6 months now. The transaction categorization feature has saved me hours of bookkeeping time, and the integration with my accounting software is seamless. However, I've experienced the mobile app crashing during check deposits at least once a week, forcing me to restart the process. This has caused delays in funds availability that have impacted my cash flow. Your support team acknowledged the issue but said a fix wouldn't be available until the next quarterly update. The competitive rates and fee structure are keeping me as a customer for now, but I'm actively evaluating alternatives.",
"healthcare": "My recent stay at Memorial Care Hospital exceeded expectations. The nursing staff was exceptionally attentive and checked on me regularly. Dr. Thompson took time to thoroughly explain my procedure and answered all my questions without rushing. The facility was immaculately clean, though the room temperature was difficult to regulate. The discharge process was a bit disorganized—I waited over 3 hours and received conflicting information from different staff members about my follow-up care. The billing department was efficient and transparent about costs, which I appreciated. Overall, my health outcome was positive and I would recommend this hospital despite the discharge issues.",
"b2b_tech": "We implemented your enterprise resource planning solution across our manufacturing division last quarter. The system has successfully centralized our previously fragmented data processes, and we've measured a 17% reduction in order processing time. However, the implementation took 2 months longer than projected in your timeline, causing significant operational disruptions. Some of the customizations we paid for ($27,500 additional) still don't work as specified in our contract. Your technical support has been responsive, but they often escalate issues to developers who take days to respond. We're achieving ROI more slowly than anticipated but expect to reach our efficiency targets by Q3. Training materials for new staff are excellent.",
"support_ticket": "URGENT: Critical system outage affecting all users in EU region. Monitoring dashboard shows 100% packet loss to EU servers since 3:15 PM CET. This is impacting approximately 3,200 enterprise users across 14 countries. We've attempted standard troubleshooting steps including restarting services and verifying network routes, but nothing has resolved the issue. Need immediate assistance as this is affecting production systems and SLA violations will begin accruing in approximately 45 minutes. Our technical contact is Jan Kowalski (+48 555 123 456). This is the third outage this month, following similar incidents on the 7th and 15th. Reference case numbers: INC-7723 and INC-8105.",
}
BATCH_FEEDBACK = [
{
"customer_id": "AB-10293",
"channel": "Email Survey",
"product": "CloudSync Pro",
"text": "Your automated onboarding process was a game-changer for our IT department. We deployed to 50+ employees in one afternoon instead of the week it would have taken manually. The admin dashboard is intuitive although the reporting functionality is somewhat limited compared to your competitor ServiceDesk+. We've already recommended your solution to several partner companies.",
},
{
"customer_id": "XY-58204",
"channel": "Support Ticket",
"product": "CloudSync Pro",
"text": "We've been experiencing intermittent synchronization failures for the past 3 days. Data from approximately 20% of our field employees isn't being captured, which is affecting our ability to bill clients accurately. This is creating significant revenue leakage. Your tier 1 support hasn't been able to resolve the issue despite multiple calls. We need escalation to engineering ASAP. Our contract SLA guarantees 99.9% reliability and we're well below that threshold currently.",
},
{
"customer_id": "LM-39157",
"channel": "NPS Survey",
"product": "CloudSync Basic",
"text": "I find the mobile app version significantly less functional than the desktop version. Critical features like approval workflows and document history are buried in submenus or entirely missing from the mobile experience. It's frustrating when I'm traveling and need to approve time-sensitive requests. That said, when everything works on desktop, it's a solid product that has streamlined our operations considerably. Your recent price increase of 12% seems excessive given the lack of significant new features in the past year.",
},
{
"customer_id": "PQ-73046",
"channel": "Sales Follow-up",
"product": "CloudSync Enterprise",
"text": "The ROI analysis your team provided convinced our CFO to approve the upgrade to Enterprise tier. We're particularly excited about the advanced security features and dedicated support representative. The timeline you've proposed for migration from our legacy system looks reasonable, but we'll need detailed documentation for training our global teams across different time zones. We're concerned about potential downtime during the transition since we operate 24/7 manufacturing facilities. Your competitor offered a slightly lower price point, but your solution's integration capabilities with our existing tech stack ultimately won us over.",
},
]
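# Note: run_batch_analysis passes only each item's "text" field to
# analyze_business_text_batch; customer_id, channel and product are used solely
# for the Rich overview table.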
async def analyze_single_feedback(gateway, tracker: CostTracker):
"""Demonstrate analysis of a single piece of business feedback."""
console.print(Rule("[bold blue]Individual Business Feedback Analysis[/bold blue]"))
logger.info("Starting individual feedback analysis", emoji_key="start")
# Select a feedback sample
industry = "retail"
feedback_text = SAMPLE_FEEDBACK[industry]
# Display the feedback
console.print(
Panel(
escape(feedback_text),
title=f"[bold magenta]Sample {industry.capitalize()} Customer Feedback[/bold magenta]",
border_style="magenta",
expand=False,
)
)
# Analysis configuration
analysis_config = {
"industry": industry,
"analysis_mode": "comprehensive",
"entity_extraction": True,
"aspect_based": True,
"competitive_analysis": False,
"intent_detection": True,
"risk_assessment": True,
}
# Display configuration
config_table = Table(title="Analysis Configuration", show_header=True, box=box.ROUNDED)
config_table.add_column("Parameter", style="cyan")
config_table.add_column("Value", style="green")
for key, value in analysis_config.items():
config_table.add_row(key, str(value))
console.print(config_table)
try:
# Show progress during analysis
with Progress(
SpinnerColumn(),
TextColumn("[bold blue]Analyzing business sentiment..."),
transient=True,
) as progress:
task = progress.add_task("Analyzing...", total=None) # noqa: F841
# Directly call analyze_business_sentiment with proper parameters
result = await analyze_business_sentiment(
text=feedback_text,
provider=PROVIDER,
model=MODEL,
**analysis_config,
)
# Track cost
if "meta" in result:
tracker.record_call(
provider=result["meta"]["provider"],
model=result["meta"]["model"],
input_tokens=result["meta"]["tokens"]["input"],
output_tokens=result["meta"]["tokens"]["output"],
cost=result["meta"]["cost"],
)
# Display results
if result["success"]:
logger.success("Sentiment analysis completed successfully", emoji_key="success")
# Core metrics panel
core_metrics = result.get("core_metrics", {})
metrics_table = Table(box=box.SIMPLE)
metrics_table.add_column("Metric", style="cyan")
metrics_table.add_column("Value", style="white")
metrics_table.add_row(
"Sentiment", f"[bold]{core_metrics.get('primary_sentiment', 'N/A')}[/bold]"
)
metrics_table.add_row(
"Sentiment Score", f"{core_metrics.get('sentiment_score', 0.0):.2f}"
)
metrics_table.add_row(
"Satisfaction",
f"{result.get('business_dimensions', {}).get('customer_satisfaction', 0.0):.2f}",
)
metrics_table.add_row("Urgency", core_metrics.get("urgency", "N/A"))
# Business dimension visualization
dimensions = result.get("business_dimensions", {})
viz_table = Table(show_header=False, box=None)
viz_table.add_column("Dimension", style="blue")
viz_table.add_column("Score", style="white")
viz_table.add_column("Visual", style="yellow")
max_bar_length = 20
for key, value in dimensions.items():
if isinstance(value, (int, float)):
# Create visual bar based on score
bar_length = int(value * max_bar_length)
bar = "█" * bar_length + "░" * (max_bar_length - bar_length)
viz_table.add_row(key.replace("_", " ").title(), f"{value:.2f}", bar)
# Aspect sentiment visualization
aspects = result.get("aspect_sentiment", {})
aspect_table = Table(title="Aspect-Based Sentiment", box=box.ROUNDED)
aspect_table.add_column("Aspect", style="cyan")
aspect_table.add_column("Sentiment", style="white")
aspect_table.add_column("Visual", style="yellow")
for aspect, score in aspects.items():
# Create visual bar with color
if score >= 0:
bar_length = int(score * 10)
bar = f"[green]{'█' * bar_length}{'░' * (10 - bar_length)}[/green]"
else:
bar_length = int(abs(score) * 10)
bar = f"[red]{'█' * bar_length}{'░' * (10 - bar_length)}[/red]"
aspect_table.add_row(aspect.replace("_", " ").title(), f"{score:.2f}", bar)
# Display all visualizations
console.print(
Panel(
Group(metrics_table, Rule(style="dim"), viz_table),
title="[bold green]Core Business Metrics[/bold green]",
border_style="green",
)
)
console.print(aspect_table)
# Entity extraction
if "entity_extraction" in result:
entity_panel = Panel(
_format_entities(result["entity_extraction"]),
title="[bold blue]Extracted Entities[/bold blue]",
border_style="blue",
)
console.print(entity_panel)
# Intent analysis
if "intent_analysis" in result:
intent_panel = _display_intent_analysis(result["intent_analysis"])
console.print(intent_panel)
# Risk assessment
if "risk_assessment" in result:
risk_panel = _display_risk_assessment(result["risk_assessment"])
console.print(risk_panel)
# Recommended actions
if "recommended_actions" in result:
actions = result["recommended_actions"]
if actions:
# Format and display actions
formatted_actions = []
for i, action in enumerate(actions):
if isinstance(action, dict):
# Format dictionary as readable string
if "action" in action:
action_text = f"[bold]{i + 1}.[/bold] {action['action']}"
# Add additional fields if available
details = []
for key, value in action.items():
if key != "action": # Skip the action field we already added
details.append(f"{key}: {value}")
if details:
action_text += f" ({', '.join(details)})"
formatted_actions.append(action_text)
else:
# Generic dictionary formatting
action_text = f"[bold]{i + 1}.[/bold] " + ", ".join(
[f"{k}: {v}" for k, v in action.items()]
)
formatted_actions.append(action_text)
else:
formatted_actions.append(f"[bold]{i + 1}.[/bold] {action}")
console.print(
Panel(
"\n".join(formatted_actions),
title="[bold yellow]Prioritized Action Plan[/bold yellow]",
border_style="yellow",
expand=False,
)
)
# Execution metrics
meta = result.get("meta", {})
exec_table = Table(title="Execution Metrics", box=box.SIMPLE, show_header=False)
exec_table.add_column("Metric", style="dim cyan")
exec_table.add_column("Value", style="dim white")
exec_table.add_row(
"Provider/Model", f"{meta.get('provider', 'N/A')}/{meta.get('model', 'N/A')}"
)
exec_table.add_row("Processing Time", f"{meta.get('processing_time', 0.0):.2f}s")
exec_table.add_row(
"Tokens",
f"Input: {meta.get('tokens', {}).get('input', 0)}, Output: {meta.get('tokens', {}).get('output', 0)}",
)
exec_table.add_row("Cost", f"${meta.get('cost', 0.0):.6f}")
console.print(exec_table)
else:
logger.error(
f"Sentiment analysis failed: {result.get('error', 'Unknown error')}",
emoji_key="error",
)
except Exception as e:
logger.error(
f"Error in individual feedback analysis: {str(e)}", emoji_key="error", exc_info=True
)
async def compare_analysis_modes(gateway, tracker: CostTracker):
"""Compare different analysis modes for the same feedback."""
console.print(Rule("[bold blue]Analysis Mode Comparison[/bold blue]"))
logger.info("Comparing different analysis modes", emoji_key="start")
# Select a feedback sample
industry = "b2b_tech"
feedback_text = SAMPLE_FEEDBACK[industry]
# Display the feedback
console.print(
Panel(
escape(feedback_text),
title="[bold magenta]B2B Technology Feedback[/bold magenta]",
border_style="magenta",
expand=False,
)
)
# Analysis modes to compare
analysis_modes = ["standard", "product_feedback", "customer_experience", "sales_opportunity"]
# Results storage
mode_results = {}
try:
# Show progress during analysis
with Progress(
SpinnerColumn(),
TextColumn("[bold blue]Comparing analysis modes..."),
transient=False,
) as progress:
# Create tasks for each mode
tasks = {
mode: progress.add_task(f"[cyan]Analyzing {mode}...", total=None)
for mode in analysis_modes
}
# Process each mode
for mode in analysis_modes:
try:
logger.info(f"Trying analysis mode: {mode}", emoji_key="processing")
# Analysis configuration
analysis_config = {
"industry": industry,
"analysis_mode": mode,
"entity_extraction": False, # Simplified for mode comparison
"aspect_based": True,
"competitive_analysis": False,
"intent_detection": False,
"risk_assessment": False,
}
# Directly call the analyze_business_sentiment function
result = await analyze_business_sentiment(
text=feedback_text,
provider=PROVIDER,
model=MODEL,
**analysis_config,
)
# Track cost
if "meta" in result and result["success"]:
tracker.record_call(
provider=result["meta"]["provider"],
model=result["meta"]["model"],
input_tokens=result["meta"]["tokens"]["input"],
output_tokens=result["meta"]["tokens"]["output"],
cost=result["meta"]["cost"],
)
# Store result
mode_results[mode] = result
# Complete the task
progress.update(tasks[mode], completed=True)
except Exception as e:
logger.warning(f"Error analyzing mode {mode}: {str(e)}", emoji_key="warning")
# Create mock result if analysis fails
mode_results[mode] = {
"success": False,
"error": str(e),
"core_metrics": {
"primary_sentiment": f"Error in {mode}",
"sentiment_score": 0.0,
},
"business_dimensions": {},
"aspect_sentiment": {},
"recommended_actions": [],
}
progress.update(tasks[mode], completed=True)
# Compare the results
comparison_table = Table(title="Analysis Mode Comparison", box=box.ROUNDED)
comparison_table.add_column("Metric", style="white")
for mode in analysis_modes:
comparison_table.add_column(mode.replace("_", " ").title(), style="cyan")
# Add sentiment rows
comparison_table.add_row(
"Primary Sentiment",
*[
mode_results[mode].get("core_metrics", {}).get("primary_sentiment", "N/A")
for mode in analysis_modes
],
)
# Add score rows
comparison_table.add_row(
"Sentiment Score",
*[
f"{mode_results[mode].get('core_metrics', {}).get('sentiment_score', 0.0):.2f}"
for mode in analysis_modes
],
)
# Add satisfaction rows
comparison_table.add_row(
"Satisfaction",
*[
f"{mode_results[mode].get('business_dimensions', {}).get('customer_satisfaction', 0.0):.2f}"
for mode in analysis_modes
],
)
# Display top aspects for each mode
aspect_trees = {}
for mode in analysis_modes:
aspects = mode_results[mode].get("aspect_sentiment", {})
if aspects:
tree = Tree(f"[bold]{mode.replace('_', ' ').title()} Aspects[/bold]")
sorted_aspects = sorted(aspects.items(), key=lambda x: abs(x[1]), reverse=True)
for aspect, score in sorted_aspects[:3]: # Top 3 aspects
color = "green" if score >= 0 else "red"
tree.add(f"[{color}]{aspect.replace('_', ' ').title()}: {score:.2f}[/{color}]")
aspect_trees[mode] = tree
# Add recommended actions comparison
action_trees = {}
for mode in analysis_modes:
actions = mode_results[mode].get("recommended_actions", [])
if actions:
tree = Tree(f"[bold]{mode.replace('_', ' ').title()} Actions[/bold]")
for action in actions[:2]: # Top 2 actions
# Handle case where action is a dictionary
if isinstance(action, dict):
# Format dictionary as readable string
if "action" in action:
action_text = f"{action['action']}"
if "priority" in action:
action_text += f" (Priority: {action['priority']})"
tree.add(action_text)
else:
# Generic dictionary formatting
action_text = ", ".join([f"{k}: {v}" for k, v in action.items()])
tree.add(action_text)
else:
tree.add(str(action))
action_trees[mode] = tree
# Display comparison table
console.print(comparison_table)
# Display aspects side by side if possible
if aspect_trees:
console.print("\n[bold cyan]Top Aspects by Analysis Mode[/bold cyan]")
# Print trees based on available width
for _mode, tree in aspect_trees.items():
console.print(tree)
# Display recommended actions
if action_trees:
console.print("\n[bold yellow]Recommended Actions by Analysis Mode[/bold yellow]")
for _mode, tree in action_trees.items():
console.print(tree)
# Display execution metrics
exec_table = Table(title="Execution Metrics by Mode", box=box.SIMPLE)
exec_table.add_column("Mode", style="cyan")
exec_table.add_column("Processing Time", style="dim white")
exec_table.add_column("Tokens (In/Out)", style="dim white")
exec_table.add_column("Cost", style="green")
for mode in analysis_modes:
meta = mode_results[mode].get("meta", {})
if meta:
exec_table.add_row(
mode.replace("_", " ").title(),
f"{meta.get('processing_time', 0.0):.2f}s",
f"{meta.get('tokens', {}).get('input', 0)}/{meta.get('tokens', {}).get('output', 0)}",
f"${meta.get('cost', 0.0):.6f}",
)
console.print(exec_table)
except Exception as e:
logger.error(
f"Error in analysis mode comparison: {str(e)}", emoji_key="error", exc_info=True
)
async def analyze_support_ticket_with_risk(gateway, tracker: CostTracker):
"""Analyze a support ticket with focus on risk assessment."""
console.print(Rule("[bold blue]Support Ticket Risk Assessment[/bold blue]"))
logger.info("Analyzing support ticket with risk focus", emoji_key="start")
# Use the support ticket sample
ticket_text = SAMPLE_FEEDBACK["support_ticket"]
# Display the ticket
console.print(
Panel(
escape(ticket_text),
title="[bold red]URGENT Support Ticket[/bold red]",
border_style="red",
expand=False,
)
)
# Analysis configuration focusing on risk and urgency
analysis_config = {
"industry": "technology",
"analysis_mode": "support_ticket",
"entity_extraction": True,
"aspect_based": False,
"competitive_analysis": False,
"intent_detection": True,
"risk_assessment": True,
"threshold_config": {
"urgency": 0.7, # Higher threshold for urgency
"churn_risk": 0.5, # Standard threshold for churn risk
},
}
try:
# Show progress during analysis
with Progress(
SpinnerColumn(),
TextColumn("[bold red]Analyzing support ticket..."),
transient=True,
) as progress:
task = progress.add_task("Analyzing...", total=None) # noqa: F841
# Directly call analyze_business_sentiment
result = await analyze_business_sentiment(
text=ticket_text,
provider=PROVIDER,
model=MODEL,
**analysis_config,
)
# Track cost
if "meta" in result:
tracker.record_call(
provider=result["meta"]["provider"],
model=result["meta"]["model"],
input_tokens=result["meta"]["tokens"]["input"],
output_tokens=result["meta"]["tokens"]["output"],
cost=result["meta"]["cost"],
)
# Display results focusing on risk assessment
if result["success"]:
logger.success("Support ticket analysis completed", emoji_key="success")
# Core urgency metrics
core_metrics = result.get("core_metrics", {})
urgency = core_metrics.get("urgency", "medium")
urgency_color = {
"low": "green",
"medium": "yellow",
"high": "orange",
"critical": "red",
}.get(urgency.lower(), "yellow")
# Risk assessment panel
risk_data = result.get("risk_assessment", {})
# If risk_data is empty, add a default escalation probability
if not risk_data or not any(
key in risk_data
for key in [
"response_urgency",
"churn_probability",
"pr_risk",
"escalation_probability",
]
):
risk_data["escalation_probability"] = 0.95
if risk_data:
risk_table = Table(box=box.ROUNDED)
risk_table.add_column("Risk Factor", style="white")
risk_table.add_column("Level", style="cyan")
risk_table.add_column("Details", style="yellow")
# Add risk factors
if "response_urgency" in risk_data:
risk_table.add_row(
"Response Urgency",
f"[{urgency_color}]{risk_data.get('response_urgency', 'medium').upper()}[/{urgency_color}]",
"Ticket requires timely response",
)
if "churn_probability" in risk_data:
churn_prob = risk_data["churn_probability"]
churn_color = (
"green" if churn_prob < 0.3 else "yellow" if churn_prob < 0.6 else "red"
)
risk_table.add_row(
"Churn Risk",
f"[{churn_color}]{churn_prob:.2f}[/{churn_color}]",
"Probability of customer churn",
)
if "pr_risk" in risk_data:
pr_risk = risk_data["pr_risk"]
pr_color = (
"green" if pr_risk == "low" else "yellow" if pr_risk == "medium" else "red"
)
risk_table.add_row(
"PR/Reputation Risk",
f"[{pr_color}]{pr_risk.upper()}[/{pr_color}]",
"Potential for negative publicity",
)
if "escalation_probability" in risk_data:
esc_prob = risk_data["escalation_probability"]
esc_color = "green" if esc_prob < 0.3 else "yellow" if esc_prob < 0.6 else "red"
risk_table.add_row(
"Escalation Probability",
f"[{esc_color}]{esc_prob:.2f}[/{esc_color}]",
"Likelihood issue will escalate",
)
# Add compliance flags
if "legal_compliance_flags" in risk_data and risk_data["legal_compliance_flags"]:
flags = risk_data["legal_compliance_flags"]
risk_table.add_row(
"Compliance Flags", f"[red]{len(flags)}[/red]", ", ".join(flags)
)
# Display risk table
console.print(
Panel(
risk_table,
title=f"[bold {urgency_color}]Risk Assessment ({urgency.upper()})[/bold {urgency_color}]",
border_style=urgency_color,
)
)
# Entity extraction (focusing on technical details)
if "entity_extraction" in result:
entity_tree = Tree("[bold cyan]Extracted Technical Entities[/bold cyan]")
entities = result["entity_extraction"]
for category, items in entities.items():
if items: # Only add non-empty categories
branch = entity_tree.add(
f"[bold]{category.replace('_', ' ').title()}[/bold]"
)
for item in items:
# Handle case where item is a dictionary
if isinstance(item, dict):
# Format dictionary items appropriately
if "name" in item and "phone" in item:
branch.add(f"{item.get('name', '')} ({item.get('phone', '')})")
else:
# Format other dictionary types as name: value pairs
formatted_item = ", ".join(
[f"{k}: {v}" for k, v in item.items()]
)
branch.add(formatted_item)
else:
branch.add(str(item))
console.print(entity_tree)
# Intent analysis focusing on support needs
if "intent_analysis" in result:
intent_data = result["intent_analysis"]
support_needed = intent_data.get("support_needed", 0.0)
feedback_type = intent_data.get("feedback_type", "N/A")
intent_table = Table(box=box.SIMPLE)
intent_table.add_column("Intent Indicator", style="cyan")
intent_table.add_column("Value", style="white")
intent_table.add_row("Support Needed", f"{support_needed:.2f}")
intent_table.add_row("Feedback Type", feedback_type.capitalize())
if "information_request" in intent_data:
intent_table.add_row(
"Information Request", str(intent_data["information_request"])
)
console.print(
Panel(
intent_table,
title="[bold blue]Support Intent Analysis[/bold blue]",
border_style="blue",
)
)
# Action plan for high urgency tickets
if "recommended_actions" in result:
actions = result["recommended_actions"]
if actions:
# Format and display actions
formatted_actions = []
for i, action in enumerate(actions):
if isinstance(action, dict):
# Format dictionary as readable string
if "action" in action:
action_text = f"[bold]{i + 1}.[/bold] {action['action']}"
# Add additional fields if available
details = []
for key, value in action.items():
if key != "action": # Skip the action field we already added
details.append(f"{key}: {value}")
if details:
action_text += f" ({', '.join(details)})"
formatted_actions.append(action_text)
else:
# Generic dictionary formatting
action_text = f"[bold]{i + 1}.[/bold] " + ", ".join(
[f"{k}: {v}" for k, v in action.items()]
)
formatted_actions.append(action_text)
else:
formatted_actions.append(f"[bold]{i + 1}.[/bold] {action}")
console.print(
Panel(
"\n".join(formatted_actions),
title="[bold yellow]Prioritized Action Plan[/bold yellow]",
border_style="yellow",
expand=False,
)
)
# SLA impact assessment
sla_panel = Panel(
"Based on the urgency assessment, this ticket requires immediate attention to prevent SLA violations. "
"The system outage reported impacts 3,200 enterprise users and has a critical business impact. "
"Previous related incidents (case numbers INC-7723 and INC-8105) suggest a recurring issue pattern.",
title="[bold red]SLA Impact Assessment[/bold red]",
border_style="red",
)
console.print(sla_panel)
# Execution metrics
meta = result.get("meta", {})
exec_table = Table(title="Execution Metrics", box=box.SIMPLE, show_header=False)
exec_table.add_column("Metric", style="dim cyan")
exec_table.add_column("Value", style="dim white")
exec_table.add_row(
"Provider/Model", f"{meta.get('provider', 'N/A')}/{meta.get('model', 'N/A')}"
)
exec_table.add_row("Processing Time", f"{meta.get('processing_time', 0.0):.2f}s")
exec_table.add_row(
"Tokens",
f"Input: {meta.get('tokens', {}).get('input', 0)}, Output: {meta.get('tokens', {}).get('output', 0)}",
)
exec_table.add_row("Cost", f"${meta.get('cost', 0.0):.6f}")
console.print(exec_table)
else:
logger.error(
f"Support ticket analysis failed: {result.get('error', 'Unknown error')}",
emoji_key="error",
)
except Exception as e:
logger.error(
f"Error in support ticket analysis: {str(e)}", emoji_key="error", exc_info=True
)
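# Note: run_batch_analysis below assumes each entry in the module-level BATCH_FEEDBACK
# sample data is a dict with "customer_id", "channel", "product", and "text" keys; it
# fans the raw texts out to analyze_business_text_batch with a small max_concurrency so
# the demo stays within typical provider rate limits.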
async def run_batch_analysis(gateway, tracker: CostTracker):
"""Analyze a batch of customer feedback and show aggregated insights."""
console.print(Rule("[bold blue]Batch Feedback Analysis[/bold blue]"))
logger.info("Starting batch feedback analysis", emoji_key="start")
# Display batch summary
feedback_table = Table(title="Customer Feedback Batch Overview", box=box.ROUNDED)
feedback_table.add_column("Customer ID", style="cyan")
feedback_table.add_column("Channel", style="magenta")
feedback_table.add_column("Product", style="yellow")
feedback_table.add_column("Preview", style="white")
for item in BATCH_FEEDBACK:
feedback_table.add_row(
item["customer_id"], item["channel"], item["product"], item["text"][:50] + "..."
)
console.print(feedback_table)
# Analysis configuration
analysis_config = {
"industry": "technology",
"analysis_mode": "comprehensive",
"entity_extraction": True,
"aspect_based": True,
"competitive_analysis": True,
"intent_detection": True,
"risk_assessment": True,
}
# List of texts for batch processing
texts = [item["text"] for item in BATCH_FEEDBACK]
try:
# Show progress during batch analysis
with Progress(
SpinnerColumn(),
TextColumn("[bold blue]Processing feedback batch..."),
transient=True,
) as progress:
task = progress.add_task("Processing...", total=None) # noqa: F841
# Directly call the analyze_business_text_batch function
result = await analyze_business_text_batch(
texts=texts,
analysis_config=analysis_config,
aggregate_results=True,
max_concurrency=3,
provider=PROVIDER,
model=MODEL,
)
# Track cost
if "meta" in result and "total_cost" in result["meta"]:
tracker.add_custom_cost(
"Batch Analysis",
PROVIDER,
MODEL,
result["meta"]["total_cost"],
)
# Display batch results
if result["success"]:
logger.success(
f"Successfully analyzed {len(texts)} feedback items", emoji_key="success"
)
# Display aggregate insights
if "aggregate_insights" in result:
_display_aggregate_insights(result["aggregate_insights"])
# Display high-risk feedback
_display_high_risk_items(result["individual_results"])
# Display execution metrics
meta = result.get("meta", {})
exec_table = Table(title="Batch Processing Metrics", box=box.SIMPLE, show_header=False)
exec_table.add_column("Metric", style="dim cyan")
exec_table.add_column("Value", style="dim white")
exec_table.add_row("Batch Size", str(meta.get("batch_size", 0)))
exec_table.add_row(
"Success Rate", f"{meta.get('success_count', 0)}/{meta.get('batch_size', 0)}"
)
exec_table.add_row("Processing Time", f"{meta.get('processing_time', 0.0):.2f}s")
exec_table.add_row("Total Cost", f"${meta.get('total_cost', 0.0):.6f}")
console.print(exec_table)
# Generate business recommendations based on batch insights
if "aggregate_insights" in result and result["aggregate_insights"]:
insights = result["aggregate_insights"]
recommendations = []
# Extract top issues from aggregate insights
if "top_aspects" in insights and insights["top_aspects"]:
for aspect in insights["top_aspects"]:
if "avg_sentiment" in aspect and aspect["avg_sentiment"] < 0:
recommendations.append(
f"Address issues with {aspect['name'].replace('_', ' ')}: mentioned {aspect['mention_count']} times with sentiment {aspect['avg_sentiment']:.2f}"
)
if "key_topics" in insights and insights["key_topics"]:
for topic in insights["key_topics"]:
if "avg_sentiment" in topic and topic["avg_sentiment"] < 0:
recommendations.append(
f"Investigate concerns about '{topic['topic']}': mentioned {topic['mention_count']} times"
)
# If we don't have enough recommendations, add some generic ones
if len(recommendations) < 3:
recommendations.append("Review product features with highest mention counts")
recommendations.append("Follow up with customers who reported critical issues")
# Format and display recommendations
formatted_recommendations = []
for i, rec in enumerate(recommendations[:4]): # Limit to top 4
formatted_recommendations.append(f"{i + 1}. **{rec}**")
if formatted_recommendations:
console.print(
Panel(
"\n".join(formatted_recommendations),
title="[bold green]Business Intelligence Insights[/bold green]",
border_style="green",
expand=False,
)
)
else:
logger.error(
f"Batch analysis failed: {result.get('error', 'Unknown error')}", emoji_key="error"
)
except Exception as e:
logger.error(f"Error in batch analysis: {str(e)}", emoji_key="error", exc_info=True)
# Helper functions
def _format_entities(entities: Dict[str, List[str]]) -> str:
"""Format extracted entities for display."""
output = ""
for category, items in entities.items():
if items:
output += f"[bold]{category.replace('_', ' ').title()}[/bold]: "
output += ", ".join([f"[cyan]{item}[/cyan]" for item in items])
output += "\n"
return output
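# Illustrative call (not executed by the demo): given {"products": ["CloudSync Pro"]},
# _format_entities returns "[bold]Products[/bold]: [cyan]CloudSync Pro[/cyan]\n",
# i.e. one Rich-markup line per non-empty category.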
def _display_intent_analysis(intent_data: Dict[str, Any]) -> Panel:
"""Display intent analysis in a formatted panel."""
intent_table = Table(box=box.SIMPLE)
intent_table.add_column("Intent Indicator", style="blue")
intent_table.add_column("Value", style="white")
# Purchase intent
if "purchase_intent" in intent_data:
purchase_intent = intent_data["purchase_intent"]
# Check if purchase_intent is a dictionary instead of a float
if isinstance(purchase_intent, dict):
# Extract the value or use a default
purchase_intent = float(purchase_intent.get("score", 0.0))
elif not isinstance(purchase_intent, (int, float)):
# Handle any other unexpected types
purchase_intent = 0.0
else:
purchase_intent = float(purchase_intent)
color = "green" if purchase_intent > 0.5 else "yellow" if purchase_intent > 0.2 else "red"
intent_table.add_row("Purchase Intent", f"[{color}]{purchase_intent:.2f}[/{color}]")
# Churn risk
if "churn_risk" in intent_data:
churn_risk = intent_data["churn_risk"]
# Similar type checking for churn_risk
if isinstance(churn_risk, dict):
churn_risk = float(churn_risk.get("score", 0.0))
elif not isinstance(churn_risk, (int, float)):
churn_risk = 0.0
else:
churn_risk = float(churn_risk)
color = "red" if churn_risk > 0.5 else "yellow" if churn_risk > 0.2 else "green"
intent_table.add_row("Churn Risk", f"[{color}]{churn_risk:.2f}[/{color}]")
# Support needed
if "support_needed" in intent_data:
support_needed = intent_data["support_needed"]
# Similar type checking for support_needed
if isinstance(support_needed, dict):
support_needed = float(support_needed.get("score", 0.0))
elif not isinstance(support_needed, (int, float)):
support_needed = 0.0
else:
support_needed = float(support_needed)
color = "yellow" if support_needed > 0.5 else "green"
intent_table.add_row("Support Needed", f"[{color}]{support_needed:.2f}[/{color}]")
# Feedback type
if "feedback_type" in intent_data:
feedback_type = intent_data["feedback_type"]
# Handle if feedback_type is a dict
if isinstance(feedback_type, dict):
feedback_type = feedback_type.get("type", "unknown")
elif not isinstance(feedback_type, str):
feedback_type = "unknown"
color = (
"red"
if feedback_type == "complaint"
else "green"
if feedback_type == "praise"
else "blue"
)
intent_table.add_row("Feedback Type", f"[{color}]{feedback_type.capitalize()}[/{color}]")
# Information request
if "information_request" in intent_data:
intent_table.add_row("Information Request", str(intent_data["information_request"]))
return Panel(
intent_table,
title="[bold cyan]Customer Intent Analysis[/bold cyan]",
border_style="cyan",
expand=False,
)
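# The three score fields above repeat the same dict/float normalization. A hypothetical
# helper like the sketch below could centralize that pattern; it is illustrative only
# and is not used by the display functions in this demo.
def _coerce_score(value: Any, key: str = "score") -> float:
    """Best-effort conversion of an LLM-returned score (float, int, or dict) to a float."""
    if isinstance(value, dict):
        # Some providers wrap scores in an object, e.g. {"score": 0.7, "reason": "..."}
        return float(value.get(key, 0.0))
    if isinstance(value, (int, float)):
        return float(value)
    # Anything else (string, None, list) falls back to a neutral 0.0
    return 0.0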
def _display_risk_assessment(risk_data: Dict[str, Any]) -> Panel:
"""Display risk assessment in a formatted panel."""
risk_table = Table(box=box.SIMPLE)
risk_table.add_column("Risk Factor", style="red")
risk_table.add_column("Level", style="white")
# Churn probability
if "churn_probability" in risk_data:
churn_prob = risk_data["churn_probability"]
color = "green" if churn_prob < 0.3 else "yellow" if churn_prob < 0.6 else "red"
risk_table.add_row("Churn Probability", f"[{color}]{churn_prob:.2f}[/{color}]")
# Response urgency
if "response_urgency" in risk_data:
urgency = risk_data["response_urgency"]
color = "green" if urgency == "low" else "yellow" if urgency == "medium" else "red"
risk_table.add_row("Response Urgency", f"[{color}]{urgency.upper()}[/{color}]")
# PR risk
if "pr_risk" in risk_data:
pr_risk = risk_data["pr_risk"]
color = "green" if pr_risk == "low" else "yellow" if pr_risk == "medium" else "red"
risk_table.add_row("PR/Reputation Risk", f"[{color}]{pr_risk.upper()}[/{color}]")
# Escalation probability
if "escalation_probability" in risk_data:
esc_prob = risk_data["escalation_probability"]
color = "green" if esc_prob < 0.3 else "yellow" if esc_prob < 0.6 else "red"
risk_table.add_row("Escalation Probability", f"[{color}]{esc_prob:.2f}[/{color}]")
# Legal flags
if "legal_compliance_flags" in risk_data and risk_data["legal_compliance_flags"]:
flags = risk_data["legal_compliance_flags"]
risk_table.add_row("Legal/Compliance Flags", ", ".join(flags))
return Panel(
risk_table,
title="[bold red]Business Risk Assessment[/bold red]",
border_style="red",
expand=False,
)
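# Threshold conventions used above: probability fields are colored green below 0.3,
# yellow below 0.6, and red otherwise, while the categorical urgency/PR fields treat
# "low" as green, "medium" as yellow, and anything else as red. Similar cutoffs appear
# in _display_high_risk_items further below.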
def _display_aggregate_insights(insights: Dict[str, Any]) -> None:
"""Display aggregate insights from batch analysis."""
console.print(Rule("[bold green]Aggregate Customer Feedback Insights[/bold green]"))
# Ensure we have some insights data even if empty
if not insights or len(insights) == 0:
insights = {
"sentiment_distribution": {"positive": 0.4, "neutral": 0.4, "negative": 0.2},
"top_aspects": [
{"name": "mobile_app", "avg_sentiment": -0.2, "mention_count": 3},
{"name": "customer_support", "avg_sentiment": 0.5, "mention_count": 2},
{"name": "sync_functionality", "avg_sentiment": -0.3, "mention_count": 2},
],
"key_topics": [
{"topic": "mobile experience", "mention_count": 3, "avg_sentiment": -0.2},
{"topic": "implementation", "mention_count": 2, "avg_sentiment": -0.1},
{"topic": "support quality", "mention_count": 2, "avg_sentiment": 0.6},
],
"entity_mention_frequencies": {
"products": {"CloudSync Pro": 2, "CloudSync Basic": 1, "CloudSync Enterprise": 1}
},
"average_metrics": {
"customer_satisfaction": 0.6,
"product_satisfaction": 0.5,
"service_satisfaction": 0.7,
"value_perception": 0.4,
},
}
# Sentiment distribution
if "sentiment_distribution" in insights:
dist = insights["sentiment_distribution"]
# Create a visual sentiment distribution
sentiment_table = Table(title="Sentiment Distribution", box=box.ROUNDED)
sentiment_table.add_column("Sentiment", style="cyan")
sentiment_table.add_column("Percentage", style="white")
sentiment_table.add_column("Distribution", style="yellow")
for sentiment, percentage in dist.items():
# Create bar
bar_length = int(percentage * 30)
color = (
"green"
if sentiment == "positive"
else "yellow"
if sentiment == "neutral"
else "red"
)
bar = f"[{color}]{'█' * bar_length}[/{color}]"
sentiment_table.add_row(sentiment.capitalize(), f"{percentage:.0%}", bar)
console.print(sentiment_table)
# Top aspects
if "top_aspects" in insights:
aspects = insights["top_aspects"]
aspect_table = Table(title="Top Product/Service Aspects", box=box.ROUNDED)
aspect_table.add_column("Aspect", style="cyan")
aspect_table.add_column("Sentiment", style="white")
aspect_table.add_column("Mentions", style="white", justify="right")
aspect_table.add_column("Sentiment", style="yellow")
for aspect in aspects:
name = aspect.get("name", "unknown").replace("_", " ").title()
score = aspect.get("avg_sentiment", 0.0)
mentions = aspect.get("mention_count", 0)
# Create color-coded score visualization
if score >= 0:
color = "green"
bar_length = int(min(score * 10, 10))
bar = f"[{color}]{'█' * bar_length}{'░' * (10 - bar_length)}[/{color}]"
else:
color = "red"
bar_length = int(min(abs(score) * 10, 10))
bar = f"[{color}]{'█' * bar_length}{'░' * (10 - bar_length)}[/{color}]"
aspect_table.add_row(name, f"[{color}]{score:.2f}[/{color}]", str(mentions), bar)
console.print(aspect_table)
# Key topics
if "key_topics" in insights:
topics = insights["key_topics"]
topic_table = Table(title="Key Topics Mentioned", box=box.ROUNDED)
topic_table.add_column("Topic", style="cyan")
topic_table.add_column("Mentions", style="white", justify="right")
topic_table.add_column("Avg Sentiment", style="white")
for topic in topics:
topic_name = topic.get("topic", "unknown")
mentions = topic.get("mention_count", 0)
sentiment = topic.get("avg_sentiment", 0.0)
# Color based on sentiment
color = "green" if sentiment > 0.2 else "red" if sentiment < -0.2 else "yellow"
topic_table.add_row(topic_name, str(mentions), f"[{color}]{sentiment:.2f}[/{color}]")
console.print(topic_table)
# Entity mention frequencies (products, features)
if "entity_mention_frequencies" in insights:
entity_freqs = insights["entity_mention_frequencies"]
# Create product mentions visualization
if "products" in entity_freqs and entity_freqs["products"]:
product_table = Table(title="Product Mentions", box=box.ROUNDED)
product_table.add_column("Product", style="cyan")
product_table.add_column("Mentions", style="white", justify="right")
product_table.add_column("Distribution", style="yellow")
# Find max mentions for scaling
max_mentions = max(entity_freqs["products"].values())
for product, count in sorted(
entity_freqs["products"].items(), key=lambda x: x[1], reverse=True
):
# Create bar
bar_length = int((count / max_mentions) * 20)
bar = "█" * bar_length
product_table.add_row(product, str(count), bar)
console.print(product_table)
# Average metrics
if "average_metrics" in insights:
avg_metrics = insights["average_metrics"]
metrics_table = Table(title="Average Business Metrics", box=box.SIMPLE)
metrics_table.add_column("Metric", style="cyan")
metrics_table.add_column("Value", style="white")
for key, value in avg_metrics.items():
metrics_table.add_row(key.replace("_", " ").title(), f"{value:.2f}")
console.print(Panel(metrics_table, border_style="green"))
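# Note: the hard-coded fallback insights at the top of _display_aggregate_insights are
# placeholder values so the demo still renders every table when the batch call returns
# no aggregate data; they are not real measurements.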
def _display_high_risk_items(individual_results: List[Dict[str, Any]]) -> None:
"""Display high-risk items from batch analysis."""
# Find high-risk items
high_risk_items = []
for item in individual_results:
if "analysis" in item and "risk_assessment" in item["analysis"]:
risk_assessment = item["analysis"]["risk_assessment"]
# Check various risk indicators
churn_risk = False
if (
"churn_probability" in risk_assessment
and risk_assessment["churn_probability"] > 0.6
):
churn_risk = True
urgent_response = False
if "response_urgency" in risk_assessment and risk_assessment["response_urgency"] in [
"high",
"critical",
]:
urgent_response = True
# Add to high risk if any conditions met
if churn_risk or urgent_response:
high_risk_items.append(
{
"text_id": item["text_id"],
"text_preview": item["text_preview"],
"churn_risk": risk_assessment.get("churn_probability", 0.0),
"urgency": risk_assessment.get("response_urgency", "low"),
}
)
# Display high-risk items if any found
if high_risk_items:
console.print(Rule("[bold red]High-Risk Feedback Items[/bold red]"))
risk_table = Table(box=box.ROUNDED)
risk_table.add_column("ID", style="dim")
risk_table.add_column("Preview", style="white")
risk_table.add_column("Churn Risk", style="red")
risk_table.add_column("Response Urgency", style="yellow")
for item in high_risk_items:
churn_risk = item["churn_risk"]
churn_color = "red" if churn_risk > 0.6 else "yellow" if churn_risk > 0.3 else "green"
urgency = item["urgency"]
urgency_color = (
"red" if urgency == "critical" else "orange" if urgency == "high" else "yellow"
)
risk_table.add_row(
str(item["text_id"]),
item["text_preview"],
f"[{churn_color}]{churn_risk:.2f}[/{churn_color}]",
f"[{urgency_color}]{urgency.upper()}[/{urgency_color}]",
)
console.print(risk_table)
# Add suggestion for high-risk items
console.print(
Panel(
"⚠️ [bold]Attention needed![/bold] The highlighted feedback items indicate significant business risks and should be addressed immediately by the appropriate teams.",
border_style="red",
)
)
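# An item is flagged as high risk when its churn_probability exceeds 0.6 or its
# response_urgency is "high" or "critical"; everything else is omitted from the table above.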
async def main():
"""Run business sentiment analysis demos."""
print("Starting sentiment analysis demo...")
tracker = CostTracker() # Instantiate cost tracker
try:
# Create a gateway instance for all examples to share
gateway = Gateway("business-sentiment-demo", register_tools=False)
# Initialize providers
logger.info("Initializing providers...", emoji_key="provider")
await gateway._initialize_providers()
# Run individual analysis example
print("Running individual feedback analysis...")
await analyze_single_feedback(gateway, tracker)
console.print() # Add space
# Run analysis mode comparison
print("Running analysis mode comparison...")
await compare_analysis_modes(gateway, tracker)
console.print() # Add space
# Run support ticket risk analysis
print("Running support ticket risk analysis...")
await analyze_support_ticket_with_risk(gateway, tracker)
console.print() # Add space
# Run batch analysis example
print("Running batch analysis...")
await run_batch_analysis(gateway, tracker)
# Display cost summary at the end
tracker.display_summary(console)
except Exception as e:
# Use logger for critical errors
logger.critical(f"Demo failed: {str(e)}", emoji_key="critical", exc_info=True)
print(f"Demo failed with error: {str(e)}")
import traceback
traceback.print_exc()
return 1
logger.success("Business sentiment analysis demo completed successfully", emoji_key="complete")
print("Demo completed successfully!")
return 0
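# main() returns a process exit code (0 on success, 1 on failure), which asyncio.run()
# hands back so sys.exit() below lets shell scripts and CI detect demo failures.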
if __name__ == "__main__":
# Run the demo
exit_code = asyncio.run(main())
sys.exit(exit_code)
```