This is page 5 of 45. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│   ├── __init__.py
│   ├── advanced_agent_flows_using_unified_memory_system_demo.py
│   ├── advanced_extraction_demo.py
│   ├── advanced_unified_memory_system_demo.py
│   ├── advanced_vector_search_demo.py
│   ├── analytics_reporting_demo.py
│   ├── audio_transcription_demo.py
│   ├── basic_completion_demo.py
│   ├── cache_demo.py
│   ├── claude_integration_demo.py
│   ├── compare_synthesize_demo.py
│   ├── cost_optimization.py
│   ├── data
│   │   ├── sample_event.txt
│   │   ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│   │   └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│   ├── docstring_refiner_demo.py
│   ├── document_conversion_and_processing_demo.py
│   ├── entity_relation_graph_demo.py
│   ├── filesystem_operations_demo.py
│   ├── grok_integration_demo.py
│   ├── local_text_tools_demo.py
│   ├── marqo_fused_search_demo.py
│   ├── measure_model_speeds.py
│   ├── meta_api_demo.py
│   ├── multi_provider_demo.py
│   ├── ollama_integration_demo.py
│   ├── prompt_templates_demo.py
│   ├── python_sandbox_demo.py
│   ├── rag_example.py
│   ├── research_workflow_demo.py
│   ├── sample
│   │   ├── article.txt
│   │   ├── backprop_paper.pdf
│   │   ├── buffett.pdf
│   │   ├── contract_link.txt
│   │   ├── legal_contract.txt
│   │   ├── medical_case.txt
│   │   ├── northwind.db
│   │   ├── research_paper.txt
│   │   ├── sample_data.json
│   │   └── text_classification_samples
│   │       ├── email_classification.txt
│   │       ├── news_samples.txt
│   │       ├── product_reviews.txt
│   │       └── support_tickets.txt
│   ├── sample_docs
│   │   └── downloaded
│   │       └── attention_is_all_you_need.pdf
│   ├── sentiment_analysis_demo.py
│   ├── simple_completion_demo.py
│   ├── single_shot_synthesis_demo.py
│   ├── smart_browser_demo.py
│   ├── sql_database_demo.py
│   ├── sse_client_demo.py
│   ├── test_code_extraction.py
│   ├── test_content_detection.py
│   ├── test_ollama.py
│   ├── text_classification_demo.py
│   ├── text_redline_demo.py
│   ├── tool_composition_examples.py
│   ├── tournament_code_demo.py
│   ├── tournament_text_demo.py
│   ├── unified_memory_system_demo.py
│   ├── vector_search_demo.py
│   ├── web_automation_instruction_packs.py
│   └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│   └── smart_browser_internal
│       ├── locator_cache.db
│       ├── readability.js
│       └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── integration
│   │   ├── __init__.py
│   │   └── test_server.py
│   ├── manual
│   │   ├── test_extraction_advanced.py
│   │   └── test_extraction.py
│   └── unit
│       ├── __init__.py
│       ├── test_cache.py
│       ├── test_providers.py
│       └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│   ├── __init__.py
│   ├── __main__.py
│   ├── cli
│   │   ├── __init__.py
│   │   ├── __main__.py
│   │   ├── commands.py
│   │   ├── helpers.py
│   │   └── typer_cli.py
│   ├── clients
│   │   ├── __init__.py
│   │   ├── completion_client.py
│   │   └── rag_client.py
│   ├── config
│   │   └── examples
│   │       └── filesystem_config.yaml
│   ├── config.py
│   ├── constants.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── evaluation
│   │   │   ├── base.py
│   │   │   └── evaluators.py
│   │   ├── providers
│   │   │   ├── __init__.py
│   │   │   ├── anthropic.py
│   │   │   ├── base.py
│   │   │   ├── deepseek.py
│   │   │   ├── gemini.py
│   │   │   ├── grok.py
│   │   │   ├── ollama.py
│   │   │   ├── openai.py
│   │   │   └── openrouter.py
│   │   ├── server.py
│   │   ├── state_store.py
│   │   ├── tournaments
│   │   │   ├── manager.py
│   │   │   ├── tasks.py
│   │   │   └── utils.py
│   │   └── ums_api
│   │       ├── __init__.py
│   │       ├── ums_database.py
│   │       ├── ums_endpoints.py
│   │       ├── ums_models.py
│   │       └── ums_services.py
│   ├── exceptions.py
│   ├── graceful_shutdown.py
│   ├── services
│   │   ├── __init__.py
│   │   ├── analytics
│   │   │   ├── __init__.py
│   │   │   ├── metrics.py
│   │   │   └── reporting.py
│   │   ├── cache
│   │   │   ├── __init__.py
│   │   │   ├── cache_service.py
│   │   │   ├── persistence.py
│   │   │   ├── strategies.py
│   │   │   └── utils.py
│   │   ├── cache.py
│   │   ├── document.py
│   │   ├── knowledge_base
│   │   │   ├── __init__.py
│   │   │   ├── feedback.py
│   │   │   ├── manager.py
│   │   │   ├── rag_engine.py
│   │   │   ├── retriever.py
│   │   │   └── utils.py
│   │   ├── prompts
│   │   │   ├── __init__.py
│   │   │   ├── repository.py
│   │   │   └── templates.py
│   │   ├── prompts.py
│   │   └── vector
│   │       ├── __init__.py
│   │       ├── embeddings.py
│   │       └── vector_service.py
│   ├── tool_token_counter.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── audio_transcription.py
│   │   ├── base.py
│   │   ├── completion.py
│   │   ├── docstring_refiner.py
│   │   ├── document_conversion_and_processing.py
│   │   ├── enhanced-ums-lookbook.html
│   │   ├── entity_relation_graph.py
│   │   ├── excel_spreadsheet_automation.py
│   │   ├── extraction.py
│   │   ├── filesystem.py
│   │   ├── html_to_markdown.py
│   │   ├── local_text_tools.py
│   │   ├── marqo_fused_search.py
│   │   ├── meta_api_tool.py
│   │   ├── ocr_tools.py
│   │   ├── optimization.py
│   │   ├── provider.py
│   │   ├── pyodide_boot_template.html
│   │   ├── python_sandbox.py
│   │   ├── rag.py
│   │   ├── redline-compiled.css
│   │   ├── sentiment_analysis.py
│   │   ├── single_shot_synthesis.py
│   │   ├── smart_browser.py
│   │   ├── sql_databases.py
│   │   ├── text_classification.py
│   │   ├── text_redline_tools.py
│   │   ├── tournament.py
│   │   ├── ums_explorer.html
│   │   └── unified_memory_system.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── async_utils.py
│   │   ├── display.py
│   │   ├── logging
│   │   │   ├── __init__.py
│   │   │   ├── console.py
│   │   │   ├── emojis.py
│   │   │   ├── formatter.py
│   │   │   ├── logger.py
│   │   │   ├── panels.py
│   │   │   ├── progress.py
│   │   │   └── themes.py
│   │   ├── parse_yaml.py
│   │   ├── parsing.py
│   │   ├── security.py
│   │   └── text.py
│   └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/test_stdio_client.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | Stdio Test Client for Ultimate MCP Server
4 | Tests server functionality over stdio transport
5 | """
6 |
7 | import asyncio
8 | import json
9 | import os
10 | import subprocess
11 | from pathlib import Path
12 |
13 | from fastmcp import Client
14 |
15 |
16 | async def test_stdio_server():
17 | """Test Ultimate MCP Server over stdio transport."""
18 | print("📡 Ultimate MCP Server Stdio Test Client")
19 | print("=" * 50)
20 | print("🔗 Starting Ultimate MCP Server in stdio mode...")
21 |
22 | # Find the umcp command
23 | umcp_cmd = None
24 | if os.path.exists("uv.lock"):
25 | # Try uv run first
26 | umcp_cmd = ["uv", "run", "umcp", "run"]
27 | else:
28 | # Try direct umcp command
29 | umcp_cmd = ["umcp", "run"]
30 |
31 | print(f"📡 Command: {' '.join(umcp_cmd)}")
32 |
33 | try:
34 | # Start the server process in stdio mode
35 | # Note: stdio is the default mode, so no -t flag needed
36 | process = subprocess.Popen(
37 | umcp_cmd,
38 | stdin=subprocess.PIPE,
39 | stdout=subprocess.PIPE,
40 | stderr=subprocess.PIPE,
41 | text=True,
42 | bufsize=0, # Unbuffered
43 | cwd=Path.cwd(),
44 | env=os.environ.copy()
45 | )
46 |
47 | print("✅ Server process started, connecting via stdio...")
48 |
49 | # Create FastMCP client for stdio transport
50 | # Use the process stdin/stdout for communication
51 | async with Client.stdio(
52 | process.stdin,
53 | process.stdout
54 | ) as client:
55 | print("✅ Successfully connected to stdio server")
56 |
57 | # Test 1: List available tools
58 | print("\n📋 Testing tool discovery via stdio...")
59 | tools = await client.list_tools()
60 | print(f"Found {len(tools)} tools via stdio transport:")
61 | for i, tool in enumerate(tools[:10]): # Show first 10
62 | print(f" {i+1:2d}. {tool.name}")
63 | if len(tools) > 10:
64 | print(f" ... and {len(tools) - 10} more tools")
65 |
66 | # Test 2: List available resources
67 | print("\n📚 Testing resource discovery via stdio...")
68 | resources = await client.list_resources()
69 | print(f"Found {len(resources)} resources:")
70 | for resource in resources:
71 | print(f" - {resource.uri}")
72 |
73 | # Test 3: Echo tool test
74 | print("\n🔊 Testing echo tool via stdio...")
75 | echo_result = await client.call_tool("echo", {"message": "Hello from stdio client!"})
76 | if echo_result:
77 | echo_data = json.loads(echo_result[0].text)
78 | print(f"✅ Echo response: {json.dumps(echo_data, indent=2)}")
79 |
80 | # Test 4: Provider status test
81 | print("\n🔌 Testing provider status via stdio...")
82 | try:
83 | provider_result = await client.call_tool("get_provider_status", {})
84 | if provider_result:
85 | provider_data = json.loads(provider_result[0].text)
86 | providers = provider_data.get("providers", {})
87 | print(f"✅ Found {len(providers)} providers via stdio:")
88 | for name, status in providers.items():
89 | available = "✅" if status.get("available") else "❌"
90 | model_count = len(status.get("models", []))
91 | print(f" {available} {name}: {model_count} models")
92 | except Exception as e:
93 | print(f"❌ Provider status failed: {e}")
94 |
95 | # Test 5: Resource reading test
96 | print("\n📖 Testing resource reading via stdio...")
97 | if resources:
98 | try:
99 | resource_uri = resources[0].uri
100 | resource_content = await client.read_resource(resource_uri)
101 | if resource_content:
102 | content = resource_content[0].text
103 | preview = content[:200] + "..." if len(content) > 200 else content
104 | print(f"✅ Resource {resource_uri} content preview:")
105 | print(f" {preview}")
106 | except Exception as e:
107 | print(f"❌ Resource reading failed: {e}")
108 |
109 | # Test 6: Simple completion test (if providers available)
110 | print("\n🤖 Testing completion via stdio...")
111 | try:
112 | completion_result = await client.call_tool(
113 | "generate_completion",
114 | {
115 | "prompt": "Say hello in exactly 3 words",
116 | "provider": "ollama",
117 | "model": "mix_77/gemma3-qat-tools:27b",
118 | "max_tokens": 10,
119 | },
120 | )
121 | if completion_result:
122 | result_data = json.loads(completion_result[0].text)
123 | print("✅ Completion via stdio:")
124 | print(f" Text: '{result_data.get('text', 'No text')}'")
125 | print(f" Model: {result_data.get('model', 'Unknown')}")
126 | print(f" Success: {result_data.get('success', False)}")
127 | print(f" Processing time: {result_data.get('processing_time', 0):.2f}s")
128 | except Exception as e:
129 | print(f"⚠️ Completion test failed (expected if no providers): {e}")
130 |
131 | # Test 7: Filesystem tool test
132 | print("\n📁 Testing filesystem tools via stdio...")
133 | try:
134 | dirs_result = await client.call_tool("list_allowed_directories", {})
135 | if dirs_result:
136 | dirs_data = json.loads(dirs_result[0].text)
137 | print(f"✅ Allowed directories via stdio: {dirs_data.get('count', 0)} directories")
138 | except Exception as e:
139 | print(f"❌ Filesystem test failed: {e}")
140 |
141 | # Test 8: Text processing tool test
142 | print("\n📝 Testing text processing via stdio...")
143 | try:
144 | ripgrep_result = await client.call_tool(
145 | "run_ripgrep",
146 | {
147 | "args_str": "'import' . -t py --max-count 3",
148 | "input_dir": "."
149 | }
150 | )
151 | if ripgrep_result:
152 | ripgrep_data = json.loads(ripgrep_result[0].text)
153 | if ripgrep_data.get("success"):
154 | lines = ripgrep_data.get("output", "").split('\n')
155 | line_count = len([l for l in lines if l.strip()]) # noqa: E741
156 | print(f"✅ Ripgrep via stdio found {line_count} matching lines")
157 | else:
158 | print("⚠️ Ripgrep completed but found no matches")
159 | except Exception as e:
160 | print(f"❌ Text processing test failed: {e}")
161 |
162 | print("\n🎉 Stdio transport functionality test completed!")
163 |
164 | # Clean up process
165 | print("\n🔄 Shutting down server process...")
166 | process.terminate()
167 | try:
168 | process.wait(timeout=5)
169 | print("✅ Server process terminated cleanly")
170 | except subprocess.TimeoutExpired:
171 | print("⚠️ Server process didn't terminate, forcing kill...")
172 | process.kill()
173 | process.wait()
174 |
175 | return True
176 |
177 | except FileNotFoundError:
178 | print("❌ Could not find umcp command")
179 | print("\nTroubleshooting:")
180 | print("1. Make sure you're in the Ultimate MCP Server directory")
181 | print("2. Make sure umcp is installed and in PATH")
182 | print("3. Try running 'uv run umcp run' manually to test")
183 | return False
184 | except Exception as e:
185 | print(f"❌ Stdio connection failed: {e}")
186 | print("\nTroubleshooting:")
187 | print("1. Make sure the server can start in stdio mode")
188 | print("2. Check for any startup errors in stderr")
189 | print("3. Verify all dependencies are installed")
190 |
191 | # Try to get stderr from process if available
192 | if 'process' in locals():
193 | try:
194 | stderr_output = process.stderr.read() if process.stderr else ""
195 | if stderr_output:
196 | print(f"\nServer stderr:\n{stderr_output}")
197 | process.terminate()
198 | process.wait(timeout=5)
199 | except Exception:
200 | pass
201 |
202 | return False
203 |
204 |
205 | async def test_stdio_interactive():
206 | """Interactive stdio testing mode."""
207 | print("\n🎮 Entering stdio interactive mode...")
208 | print("⚠️ Note: Interactive mode with stdio requires careful process management")
209 | print("Type 'list' to see available tools, 'quit' to exit")
210 |
211 | # Find the umcp command
212 | umcp_cmd = None
213 | if os.path.exists("uv.lock"):
214 | umcp_cmd = ["uv", "run", "umcp", "run"]
215 | else:
216 | umcp_cmd = ["umcp", "run"]
217 |
218 | try:
219 | # Start the server process
220 | process = subprocess.Popen(
221 | umcp_cmd,
222 | stdin=subprocess.PIPE,
223 | stdout=subprocess.PIPE,
224 | stderr=subprocess.PIPE,
225 | text=True,
226 | bufsize=0,
227 | cwd=Path.cwd(),
228 | env=os.environ.copy()
229 | )
230 |
231 | async with Client.stdio(process.stdin, process.stdout) as client:
232 | tools = await client.list_tools()
233 | resources = await client.list_resources()
234 |
235 | while True:
236 | try:
237 | command = input("\nStdio> ").strip()
238 |
239 | if command.lower() in ['quit', 'exit', 'q']:
240 | print("👋 Goodbye!")
241 | break
242 | elif command.lower() == 'list':
243 | print("Available tools:")
244 | for i, tool in enumerate(tools[:20]):
245 | print(f" {i+1:2d}. {tool.name}")
246 | if len(tools) > 20:
247 | print(f" ... and {len(tools) - 20} more")
248 | elif command.lower() == 'resources':
249 | print("Available resources:")
250 | for resource in resources:
251 | print(f" - {resource.uri}")
252 | elif command.startswith("tool "):
253 | # Call tool: tool <tool_name> <json_params>
254 | parts = command[5:].split(' ', 1)
255 | tool_name = parts[0]
256 | params = json.loads(parts[1]) if len(parts) > 1 else {}
257 |
258 | try:
259 | result = await client.call_tool(tool_name, params)
260 | if result:
261 | print(f"✅ Tool result: {result[0].text}")
262 | else:
263 | print("❌ No result returned")
264 | except Exception as e:
265 | print(f"❌ Tool call failed: {e}")
266 | elif command.startswith("read "):
267 | # Read resource: read <resource_uri>
268 | resource_uri = command[5:].strip()
269 | try:
270 | result = await client.read_resource(resource_uri)
271 | if result:
272 | content = result[0].text
273 | preview = content[:500] + "..." if len(content) > 500 else content
274 | print(f"✅ Resource content: {preview}")
275 | else:
276 | print("❌ No content returned")
277 | except Exception as e:
278 | print(f"❌ Resource read failed: {e}")
279 | else:
280 | print("Commands:")
281 | print(" list - List available tools")
282 | print(" resources - List available resources")
283 | print(" tool <name> <params> - Call a tool with JSON params")
284 | print(" read <uri> - Read a resource")
285 | print(" quit - Exit interactive mode")
286 |
287 | except KeyboardInterrupt:
288 | print("\n👋 Goodbye!")
289 | break
290 | except Exception as e:
291 | print(f"❌ Command error: {e}")
292 |
293 | # Clean up
294 | process.terminate()
295 | try:
296 | process.wait(timeout=5)
297 | except subprocess.TimeoutExpired:
298 | process.kill()
299 | process.wait()
300 |
301 | except Exception as e:
302 | print(f"❌ Stdio interactive mode failed: {e}")
303 |
304 |
305 | def check_prerequisites():
306 | """Check if prerequisites are available."""
307 | print("🔍 Checking prerequisites...")
308 |
309 | # Check if we're in the right directory
310 | if not Path("pyproject.toml").exists():
311 | print("❌ Not in Ultimate MCP Server directory (no pyproject.toml found)")
312 | return False
313 |
314 | # Check if umcp is available
315 | try:
316 | if Path("uv.lock").exists():
317 | result = subprocess.run(["uv", "run", "umcp", "--version"],
318 | capture_output=True, text=True, timeout=10)
319 | else:
320 | result = subprocess.run(["umcp", "--version"],
321 | capture_output=True, text=True, timeout=10)
322 |
323 | if result.returncode == 0:
324 | print("✅ umcp command is available")
325 | return True
326 | else:
327 | print(f"❌ umcp command failed: {result.stderr}")
328 | return False
329 | except FileNotFoundError:
330 | print("❌ umcp command not found")
331 | print("Try: pip install -e . or uv sync")
332 | return False
333 | except subprocess.TimeoutExpired:
334 | print("❌ umcp command timed out")
335 | return False
336 | except Exception as e:
337 | print(f"❌ Error checking umcp: {e}")
338 | return False
339 |
340 |
341 | async def main():
342 | """Main test function."""
343 | # Check prerequisites first
344 | if not check_prerequisites():
345 | print("\n❌ Prerequisites not met. Please fix the issues above.")
346 | return
347 |
348 | print("✅ Prerequisites check passed\n")
349 |
350 | # Run basic functionality test
351 | success = await test_stdio_server()
352 |
353 | if success:
354 | # Ask if user wants interactive mode
355 | try:
356 | response = input("\nWould you like to enter stdio interactive mode? (y/n): ").strip().lower()
357 | if response in ['y', 'yes']:
358 | await test_stdio_interactive()
359 | except KeyboardInterrupt:
360 | print("\n👋 Goodbye!")
361 | else:
362 | print("\n❌ Basic stdio test failed. Skipping interactive mode.")
363 |
364 |
365 | if __name__ == "__main__":
366 | asyncio.run(main())
```
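The test tears the server process down with the same terminate-then-wait-then-kill sequence in three separate places. A small helper along the lines of the sketch below (the `shutdown_process` name is hypothetical, not part of the repository) captures that pattern using only the standard library and the same 5-second timeout the tests already use:

```python
import subprocess


def shutdown_process(process: subprocess.Popen, timeout: float = 5.0) -> None:
    """Terminate a child process, escalating to a hard kill if it ignores the request."""
    if process.poll() is not None:
        return  # Process already exited; nothing to do
    process.terminate()  # Polite request first (SIGTERM / TerminateProcess)
    try:
        process.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        process.kill()  # Escalate if the server does not exit within the timeout
        process.wait()
```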
--------------------------------------------------------------------------------
/ultimate_mcp_server/utils/logging/formatter.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Log formatters for Gateway logging system.
3 |
4 | This module provides formatters that convert log records into Rich renderables
5 | with consistent styling and visual elements.
6 | """
7 | import logging
8 | import time
9 | import traceback
10 | from datetime import datetime
11 | from typing import Any, Dict, Optional, Tuple
12 |
13 | from rich.columns import Columns
14 | from rich.console import Console, ConsoleRenderable, Group
15 | from rich.logging import RichHandler
16 | from rich.panel import Panel
17 | from rich.style import Style
18 | from rich.table import Table
19 | from rich.text import Text
20 | from rich.traceback import Traceback
21 |
22 | from .console import get_rich_console # Import the console factory
23 |
24 | # Use relative imports for utils within the same package
25 | from .emojis import LEVEL_EMOJIS, get_emoji
26 | from .themes import get_component_style, get_level_style
27 |
28 |
29 | class GatewayLogRecord:
30 | """Enhanced log record simulation using standard LogRecord attributes.
31 |
32 | This class is mostly for documentation and conceptual clarity.
33 | The actual data comes from the standard logging.LogRecord,
34 | populated via the 'extra' dictionary in the Logger._log method.
35 | """
36 |
37 | def __init__(self, record: logging.LogRecord):
38 | """Initialize from a standard logging.LogRecord."""
39 | self.record = record
40 |
41 | @property
42 | def level(self) -> str:
43 | """Get the original Gateway log level name (e.g., 'success')."""
44 | return getattr(self.record, 'gateway_level', self.record.levelname.lower())
45 |
46 | @property
47 | def message(self) -> str:
48 | """Get the log message."""
49 | return self.record.getMessage()
50 |
51 | @property
52 | def component(self) -> Optional[str]:
53 | """Get the Gateway component."""
54 | comp = getattr(self.record, 'component', None)
55 | return comp.lower() if comp else None
56 |
57 | @property
58 | def operation(self) -> Optional[str]:
59 | """Get the Gateway operation."""
60 | op = getattr(self.record, 'operation', None)
61 | return op.lower() if op else None
62 |
63 | @property
64 | def custom_emoji(self) -> Optional[str]:
65 | """Get the custom emoji override."""
66 | return getattr(self.record, 'custom_emoji', None)
67 |
68 | @property
69 | def context(self) -> Optional[Dict[str, Any]]:
70 | """Get the additional context data."""
71 | return getattr(self.record, 'log_context', None)
72 |
73 | @property
74 | def timestamp(self) -> float:
75 | """Get the log record creation time."""
76 | return self.record.created
77 |
78 | @property
79 | def exception_info(self) -> Optional[Tuple]:
80 | """Get the exception info tuple."""
81 | return self.record.exc_info
82 |
83 | @property
84 | def emoji(self) -> str:
85 | """Get the appropriate emoji for this log record."""
86 | if self.custom_emoji:
87 | return self.custom_emoji
88 |
89 | # Use operation emoji if available
90 | if self.operation:
91 | operation_emoji = get_emoji("operation", self.operation)
92 | if operation_emoji != "❓": # If not unknown
93 | return operation_emoji
94 |
95 | # Fall back to level emoji (use gateway_level if available)
96 | return LEVEL_EMOJIS.get(self.level, "❓")
97 |
98 | @property
99 | def style(self) -> Style:
100 | """Get the appropriate style for this log record."""
101 | return get_level_style(self.level)
102 |
103 | @property
104 | def component_style(self) -> Style:
105 | """Get the style for this record's component."""
106 | if not self.component:
107 | return self.style
108 | return get_component_style(self.component)
109 |
110 | @property
111 | def format_time(self) -> str:
112 | """Format the timestamp for display."""
113 | dt = datetime.fromtimestamp(self.timestamp)
114 | return dt.strftime("%H:%M:%S.%f")[:-3] # Trim microseconds to milliseconds
115 |
116 | def has_exception(self) -> bool:
117 | """Check if this record contains exception information."""
118 | return self.record.exc_info is not None
119 |
120 | class GatewayLogFormatter(logging.Formatter):
121 | """Base formatter for Gateway logs that converts to Rich renderables.
122 | Adapts standard Formatter for Rich output.
123 | """
124 |
125 | def __init__(
126 | self,
127 | fmt: Optional[str] = None,
128 | datefmt: Optional[str] = None,
129 | style: str = '%',
130 | show_time: bool = True,
131 | show_level: bool = True,
132 | show_component: bool = True,
133 | show_path: bool = False,
134 | **kwargs
135 | ):
136 | """Initialize the formatter.
137 |
138 | Args:
139 | fmt: Format string (standard logging format)
140 | datefmt: Date format string
141 | style: Formatting style ('%', '{', '$')
142 | show_time: Whether to show timestamp in Rich output
143 | show_level: Whether to show log level in Rich output
144 | show_component: Whether to show component in Rich output
145 | show_path: Whether to show path/lineno in Rich output
146 | **kwargs: Additional args for base Formatter
147 | """
148 | super().__init__(fmt=fmt, datefmt=datefmt, style=style, **kwargs)
149 | self.show_time = show_time
150 | self.show_level = show_level
151 | self.show_component = show_component
152 | self.show_path = show_path
153 |
154 | def format(self, record: logging.LogRecord) -> str:
155 | """Format the record into a string (for non-Rich handlers)."""
156 | # Use default formatting for file/non-rich output
157 | # Add custom fields to the record temporarily if needed
158 | record.gateway_component = getattr(record, 'component', '')
159 | record.gateway_operation = getattr(record, 'operation', '')
160 | # Use the standard Formatter implementation
161 | return super().format(record)
162 |
163 | def format_rich(self, record: logging.LogRecord) -> ConsoleRenderable:
164 | """Format a standard logging.LogRecord into a Rich renderable.
165 |
166 | Args:
167 | record: The log record to format
168 |
169 | Returns:
170 | A Rich renderable object
171 | """
172 | # Subclasses should implement this
173 | raise NotImplementedError("Subclasses must implement format_rich")
174 |
175 | class SimpleLogFormatter(GatewayLogFormatter):
176 | """Simple single-line log formatter for Rich console output."""
177 |
178 | def format_rich(self, record: logging.LogRecord) -> Text:
179 | """Format a record as a single line of rich text.
180 |
181 | Args:
182 | record: The log record to format
183 |
184 | Returns:
185 | Formatted Text object
186 | """
187 | gateway_record = GatewayLogRecord(record) # Wrap for easier access
188 | result = Text()
189 |
190 | # Add timestamp if requested
191 | if self.show_time:
192 | result.append(f"[{gateway_record.format_time}] ", style="timestamp")
193 |
194 | # Add emoji
195 | result.append(f"{gateway_record.emoji} ", style=gateway_record.style)
196 |
197 | # Add level if requested
198 | if self.show_level:
199 | level_text = f"[{gateway_record.level.upper()}] "
200 | result.append(level_text, style=gateway_record.style)
201 |
202 | # Add component if available and requested
203 | if self.show_component and gateway_record.component:
204 | component_text = f"[{gateway_record.component}] "
205 | result.append(component_text, style=gateway_record.component_style)
206 |
207 | # Add operation if available
208 | if gateway_record.operation:
209 | operation_text = f"{gateway_record.operation}: "
210 | result.append(operation_text, style="operation")
211 |
212 | # Add message
213 | result.append(gateway_record.message)
214 |
215 | # Add path/line number if requested
216 | if self.show_path:
217 | path_text = f" ({record.pathname}:{record.lineno})"
218 | result.append(path_text, style="dim")
219 |
220 | # Add Exception/Traceback if present (handled by RichHandler.render)
221 |
222 | return result
223 |
224 | class DetailedLogFormatter(GatewayLogFormatter):
225 | """Multi-line formatter that can include context data (Placeholder)."""
226 |
227 | def format_rich(self, record: logging.LogRecord) -> ConsoleRenderable:
228 | """Format a record with potentially detailed information.
229 |
230 | Args:
231 | record: The log record to format
232 |
233 | Returns:
234 | Formatted Panel or Text object
235 | """
236 | # Fallback to simple formatting for now
237 | formatter = SimpleLogFormatter(
238 | show_time=self.show_time,
239 | show_level=self.show_level,
240 | show_component=self.show_component,
241 | show_path=self.show_path
242 | )
243 | return formatter.format_rich(record)
244 |
245 | class RichLoggingHandler(RichHandler):
246 | """Custom RichHandler that uses GatewayLogFormatter.
247 |
248 | Overrides render to use the custom formatter.
249 | """
250 |
251 | def __init__(
252 | self,
253 | level: int = logging.NOTSET,
254 | console: Optional[Console] = None,
255 | formatter: Optional[GatewayLogFormatter] = None,
256 | show_path: bool = False, # Control path display via handler
257 | **kwargs
258 | ):
259 | """Initialize the Rich handler.
260 |
261 | Args:
262 | level: Log level for this handler
263 | console: Rich console instance (uses global if None)
264 | formatter: Custom Gateway formatter (creates default if None)
265 | show_path: Whether to show path/lineno in the logs
266 | **kwargs: Additional args for RichHandler
267 | """
268 | # Use the provided console or the default from console.py
269 | effective_console = console or get_rich_console()
270 |
271 | super().__init__(level=level, console=effective_console, **kwargs)
272 |
273 | # Create a default SimpleLogFormatter if none is provided
274 | self.formatter = formatter or SimpleLogFormatter(show_path=show_path)
275 |
276 | def emit(self, record: logging.LogRecord) -> None:
277 | """Emit a log record using Rich formatting."""
278 | try:
279 | # Let the custom formatter create the Rich renderable
280 | message_renderable = self.format_rich(record)
281 |
282 | # Get the traceback if there is one
283 | traceback_renderable = None
284 | if record.exc_info:
285 | traceback_renderable = Traceback.from_exception(
286 | *record.exc_info,
287 | width=self.console.width if self.console else None, # Check if console exists
288 | extra_lines=self.tracebacks_extra_lines,
289 | theme=self.tracebacks_theme,
290 | word_wrap=self.tracebacks_word_wrap,
291 | show_locals=self.tracebacks_show_locals,
292 | locals_max_length=self.locals_max_length,
293 | locals_max_string=self.locals_max_string,
294 | suppress=self.tracebacks_suppress,
295 | )
296 |
297 | # Use the render method to combine message and traceback
298 | renderable = self.render(
299 | record=record,
300 | traceback=traceback_renderable, # Pass the Traceback instance
301 | message_renderable=message_renderable
302 | )
303 | if self.console:
304 | self.console.print(renderable)
305 | except Exception:
306 | self.handleError(record)
307 |
308 | def format_rich(self, record: logging.LogRecord) -> ConsoleRenderable:
309 | """Format the record using the assigned GatewayLogFormatter."""
310 | # Ensure formatter is of the correct type before calling format_rich
311 | if isinstance(self.formatter, GatewayLogFormatter):
312 | # Indentation corrected: 4 spaces
313 | return self.formatter.format_rich(record)
314 | elif isinstance(self.formatter, logging.Formatter):
315 | # Indentation corrected: 4 spaces
316 | # Fallback for standard formatter (e.g., if assigned incorrectly)
317 | return Text(self.formatter.format(record))
318 | else:
319 | # Indentation corrected: 4 spaces
320 | # Fallback if formatter is None or unexpected type
321 | return Text(record.getMessage())
322 |
323 | def render(
324 | self,
325 | *, # Make args keyword-only
326 | record: logging.LogRecord,
327 | traceback: Optional[Traceback],
328 | message_renderable: ConsoleRenderable,
329 | ) -> ConsoleRenderable:
330 | """Renders log message and Traceback.
331 | Overridden to ensure our formatted message_renderable is used correctly.
332 |
333 | Args:
334 | record: logging Record.
335 | traceback: Traceback instance or None for no Traceback.
336 | message_renderable: Renderable representing log message.
337 |
338 | Returns:
339 | Renderable to be written to console.
340 | """
341 | # message_renderable is already formatted by format_rich
342 | # We just need to potentially append the traceback
343 | if traceback:
344 | # If the message is simple Text, append newline and traceback
345 | if isinstance(message_renderable, Text):
346 | # Check if message already ends with newline for cleaner separation
347 | if not str(message_renderable).endswith("\n"):
348 | message_renderable = Text.assemble(message_renderable, "\n") # Use assemble for safety
349 | return Group(message_renderable, traceback)
350 | else:
351 | # For Panels or other renderables, group them
352 | return Group(message_renderable, traceback)
353 | else:
354 | return message_renderable
355 |
356 | def create_rich_console_handler(**kwargs):
357 | """Factory function to create a RichLoggingHandler.
358 | Used in dictConfig.
359 |
360 | Args:
361 | **kwargs: Arguments passed from dictConfig, forwarded to RichLoggingHandler.
362 | Includes level, formatter (if specified), show_path, etc.
363 |
364 | Returns:
365 | Instance of RichLoggingHandler.
366 | """
367 | # Ensure console is not passed directly if we want the shared one
368 | kwargs.pop('console', None)
369 |
370 | # Extract formatter config if provided (though unlikely needed with custom handler)
371 | formatter_config = kwargs.pop('formatter', None)
372 | # We expect the handler config to specify the formatter directly or rely on default
373 |
374 | # Extract level, default to NOTSET if not provided
375 | level_name = kwargs.pop('level', 'NOTSET').upper()
376 | level = logging.getLevelName(level_name)
377 |
378 | # Extract show_path flag
379 | show_path = kwargs.pop('show_path', False)
380 |
381 | # Create the handler instance
382 | # Pass relevant args like show_path
383 | # Also pass RichHandler specific args if they exist in kwargs
384 | rich_handler_args = {
385 | k: v for k, v in kwargs.items()
386 | if k in (
387 | 'show_time', 'show_level', 'markup', 'rich_tracebacks',
388 | 'tracebacks_width', 'tracebacks_extra_lines', 'tracebacks_theme',
389 | 'tracebacks_word_wrap', 'tracebacks_show_locals',
390 | 'locals_max_length', 'locals_max_string', 'tracebacks_suppress'
391 | )
392 | }
393 | # Add show_path explicitly as it's specific to our handler/formatter logic here
394 | handler = RichLoggingHandler(level=level, show_path=show_path, **rich_handler_args)
395 |
396 | # Note: Setting a specific formatter via dictConfig for this custom handler
397 | # might require more complex logic here to instantiate the correct GatewayLogFormatter.
398 | # For now, it defaults to SimpleLogFormatter controlled by show_path.
399 |
400 | return handler
```
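As the docstring of `create_rich_console_handler` notes, the factory is meant to be referenced from `logging.config.dictConfig`. A minimal sketch of that wiring is below; the `"()"` factory key and the `extra` fields read by `GatewayLogRecord` come from the code above, while the logger name and levels are illustrative only:

```python
import logging
import logging.config

LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {
        "rich_console": {
            # dictConfig's "()" key points at a custom handler factory.
            "()": "ultimate_mcp_server.utils.logging.formatter.create_rich_console_handler",
            "level": "DEBUG",
            "show_path": True,
        },
    },
    "root": {"level": "DEBUG", "handlers": ["rich_console"]},
}

logging.config.dictConfig(LOGGING_CONFIG)

# GatewayLogRecord reads 'component', 'operation', 'gateway_level', 'custom_emoji'
# and 'log_context' from the record's extra attributes.
logging.getLogger("ultimate_mcp_server.cache").info(
    "Cache warm-up finished",
    extra={"component": "cache", "operation": "startup", "gateway_level": "success"},
)
```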
--------------------------------------------------------------------------------
/ultimate_mcp_server/services/knowledge_base/feedback.py:
--------------------------------------------------------------------------------
```python
1 | """Feedback and adaptive learning service for RAG."""
2 | import json
3 | import time
4 | from pathlib import Path
5 | from typing import Any, Dict, List, Optional, Set
6 |
7 | import numpy as np
8 |
9 | from ultimate_mcp_server.services.vector import get_embedding_service
10 | from ultimate_mcp_server.utils import get_logger
11 |
12 | logger = get_logger(__name__)
13 |
14 |
15 | class RAGFeedbackService:
16 | """Service for collecting and utilizing feedback for RAG."""
17 |
18 | def __init__(self, storage_dir: Optional[str] = None):
19 | """Initialize the feedback service.
20 |
21 | Args:
22 | storage_dir: Directory to store feedback data
23 | """
24 | if storage_dir:
25 | self.storage_dir = Path(storage_dir)
26 | else:
27 | self.storage_dir = Path("storage") / "rag_feedback"
28 |
29 | # Create storage directory
30 | self.storage_dir.mkdir(parents=True, exist_ok=True)
31 |
32 | # Feedback data structure
33 | self.document_feedback = {} # Knowledge base -> document_id -> feedback
34 | self.query_feedback = {} # Knowledge base -> query -> feedback
35 | self.retrieval_stats = {} # Knowledge base -> document_id -> usage stats
36 |
37 | # Load existing feedback data
38 | self._load_feedback_data()
39 |
40 | # Get embedding service for similarity calculations
41 | self.embedding_service = get_embedding_service()
42 |
43 | # Improvement factors (weights for feedback)
44 | self.feedback_weights = {
45 | "thumbs_up": 0.1, # Positive explicit feedback
46 | "thumbs_down": -0.15, # Negative explicit feedback
47 | "used_in_answer": 0.05, # Document was used in the answer
48 | "not_used": -0.02, # Document was retrieved but not used
49 | "time_decay": 0.001, # Decay factor for time
50 | }
51 |
52 | logger.info("RAG feedback service initialized", extra={"emoji_key": "success"})
53 |
54 | def _get_feedback_file(self, kb_name: str) -> Path:
55 | """Get path to feedback file for a knowledge base.
56 |
57 | Args:
58 | kb_name: Knowledge base name
59 |
60 | Returns:
61 | Path to feedback file
62 | """
63 | return self.storage_dir / f"{kb_name}_feedback.json"
64 |
65 | def _load_feedback_data(self):
66 | """Load feedback data from storage."""
67 | try:
68 | # Load all feedback files
69 | for file_path in self.storage_dir.glob("*_feedback.json"):
70 | try:
71 | kb_name = file_path.stem.replace("_feedback", "")
72 |
73 | with open(file_path, "r") as f:
74 | data = json.load(f)
75 |
76 | self.document_feedback[kb_name] = data.get("document_feedback", {})
77 | self.query_feedback[kb_name] = data.get("query_feedback", {})
78 | self.retrieval_stats[kb_name] = data.get("retrieval_stats", {})
79 |
80 | logger.debug(
81 | f"Loaded feedback data for knowledge base '{kb_name}'",
82 | extra={"emoji_key": "cache"}
83 | )
84 | except Exception as e:
85 | logger.error(
86 | f"Error loading feedback data from {file_path}: {str(e)}",
87 | extra={"emoji_key": "error"}
88 | )
89 | except Exception as e:
90 | logger.error(
91 | f"Error loading feedback data: {str(e)}",
92 | extra={"emoji_key": "error"}
93 | )
94 |
95 | def _save_feedback_data(self, kb_name: str):
96 | """Save feedback data to storage.
97 |
98 | Args:
99 | kb_name: Knowledge base name
100 | """
101 | try:
102 | file_path = self._get_feedback_file(kb_name)
103 |
104 | # Prepare data
105 | data = {
106 | "document_feedback": self.document_feedback.get(kb_name, {}),
107 | "query_feedback": self.query_feedback.get(kb_name, {}),
108 | "retrieval_stats": self.retrieval_stats.get(kb_name, {}),
109 | "last_updated": time.time()
110 | }
111 |
112 | # Save to file
113 | with open(file_path, "w") as f:
114 | json.dump(data, f, indent=2)
115 |
116 | logger.debug(
117 | f"Saved feedback data for knowledge base '{kb_name}'",
118 | extra={"emoji_key": "cache"}
119 | )
120 | except Exception as e:
121 | logger.error(
122 | f"Error saving feedback data for knowledge base '{kb_name}': {str(e)}",
123 | extra={"emoji_key": "error"}
124 | )
125 |
126 | async def record_retrieval_feedback(
127 | self,
128 | knowledge_base_name: str,
129 | query: str,
130 | retrieved_documents: List[Dict[str, Any]],
131 | used_document_ids: Optional[Set[str]] = None,
132 | explicit_feedback: Optional[Dict[str, str]] = None
133 | ) -> Dict[str, Any]:
134 | """Record feedback about retrieval results.
135 |
136 | Args:
137 | knowledge_base_name: Knowledge base name
138 | query: Query text
139 | retrieved_documents: List of retrieved documents with IDs and scores
140 | used_document_ids: Set of document IDs that were used in the answer
141 | explicit_feedback: Optional explicit feedback (document_id -> feedback)
142 |
143 | Returns:
144 | Feedback recording result
145 | """
146 | # Initialize structures if needed
147 | if knowledge_base_name not in self.document_feedback:
148 | self.document_feedback[knowledge_base_name] = {}
149 |
150 | if knowledge_base_name not in self.query_feedback:
151 | self.query_feedback[knowledge_base_name] = {}
152 |
153 | if knowledge_base_name not in self.retrieval_stats:
154 | self.retrieval_stats[knowledge_base_name] = {}
155 |
156 | # Set default for used_document_ids
157 | if used_document_ids is None:
158 | used_document_ids = set()
159 |
160 | # Set default for explicit_feedback
161 | if explicit_feedback is None:
162 | explicit_feedback = {}
163 |
164 | # Record query
165 | query_hash = query[:100] # Use prefix as key
166 |
167 | if query_hash not in self.query_feedback[knowledge_base_name]:
168 | self.query_feedback[knowledge_base_name][query_hash] = {
169 | "query": query,
170 | "count": 0,
171 | "last_used": time.time(),
172 | "retrieved_docs": []
173 | }
174 |
175 | # Update query stats
176 | self.query_feedback[knowledge_base_name][query_hash]["count"] += 1
177 | self.query_feedback[knowledge_base_name][query_hash]["last_used"] = time.time()
178 |
179 | # Process each retrieved document
180 | for doc in retrieved_documents:
181 | doc_id = doc["id"]
182 |
183 | # Initialize document feedback if not exists
184 | if doc_id not in self.document_feedback[knowledge_base_name]:
185 | self.document_feedback[knowledge_base_name][doc_id] = {
186 | "relevance_adjustment": 0.0,
187 | "positive_feedback_count": 0,
188 | "negative_feedback_count": 0,
189 | "used_count": 0,
190 | "retrieved_count": 0,
191 | "last_used": time.time()
192 | }
193 |
194 | # Update document stats
195 | doc_feedback = self.document_feedback[knowledge_base_name][doc_id]
196 | doc_feedback["retrieved_count"] += 1
197 | doc_feedback["last_used"] = time.time()
198 |
199 | # Record if document was used in the answer
200 | if doc_id in used_document_ids:
201 | doc_feedback["used_count"] += 1
202 | doc_feedback["relevance_adjustment"] += self.feedback_weights["used_in_answer"]
203 | else:
204 | doc_feedback["relevance_adjustment"] += self.feedback_weights["not_used"]
205 |
206 | # Apply explicit feedback if provided
207 | if doc_id in explicit_feedback:
208 | feedback_type = explicit_feedback[doc_id]
209 |
210 | if feedback_type == "thumbs_up":
211 | doc_feedback["positive_feedback_count"] += 1
212 | doc_feedback["relevance_adjustment"] += self.feedback_weights["thumbs_up"]
213 | elif feedback_type == "thumbs_down":
214 | doc_feedback["negative_feedback_count"] += 1
215 | doc_feedback["relevance_adjustment"] += self.feedback_weights["thumbs_down"]
216 |
217 | # Keep adjustment within bounds
218 | doc_feedback["relevance_adjustment"] = max(-0.5, min(0.5, doc_feedback["relevance_adjustment"]))
219 |
220 | # Record document in query feedback
221 | if doc_id not in self.query_feedback[knowledge_base_name][query_hash]["retrieved_docs"]:
222 | self.query_feedback[knowledge_base_name][query_hash]["retrieved_docs"].append(doc_id)
223 |
224 | # Save feedback data
225 | self._save_feedback_data(knowledge_base_name)
226 |
227 | logger.info(
228 | f"Recorded feedback for {len(retrieved_documents)} documents in knowledge base '{knowledge_base_name}'",
229 | extra={"emoji_key": "success"}
230 | )
231 |
232 | return {
233 | "status": "success",
234 | "knowledge_base": knowledge_base_name,
235 | "query": query,
236 | "documents_count": len(retrieved_documents),
237 | "used_documents_count": len(used_document_ids)
238 | }
239 |
240 | async def get_document_boost(
241 | self,
242 | knowledge_base_name: str,
243 | document_id: str
244 | ) -> float:
245 | """Get relevance boost for a document based on feedback.
246 |
247 | Args:
248 | knowledge_base_name: Knowledge base name
249 | document_id: Document ID
250 |
251 | Returns:
252 | Relevance boost factor
253 | """
254 | if (knowledge_base_name not in self.document_feedback or
255 | document_id not in self.document_feedback[knowledge_base_name]):
256 | return 0.0
257 |
258 | # Get document feedback
259 | doc_feedback = self.document_feedback[knowledge_base_name][document_id]
260 |
261 | # Calculate time decay
262 | time_since_last_use = time.time() - doc_feedback.get("last_used", 0)
263 | time_decay = min(1.0, time_since_last_use / (86400 * 30)) # 30 days max decay
264 |
265 | # Apply decay to adjustment
266 | adjustment = doc_feedback["relevance_adjustment"] * (1.0 - time_decay * self.feedback_weights["time_decay"])
267 |
268 | return adjustment
269 |
270 | async def get_similar_queries(
271 | self,
272 | knowledge_base_name: str,
273 | query: str,
274 | top_k: int = 3,
275 | threshold: float = 0.8
276 | ) -> List[Dict[str, Any]]:
277 | """Find similar previous queries.
278 |
279 | Args:
280 | knowledge_base_name: Knowledge base name
281 | query: Query text
282 | top_k: Number of similar queries to return
283 | threshold: Similarity threshold
284 |
285 | Returns:
286 | List of similar queries with metadata
287 | """
288 | if knowledge_base_name not in self.query_feedback:
289 | return []
290 |
291 | query_feedback = self.query_feedback[knowledge_base_name]
292 |
293 | if not query_feedback:
294 | return []
295 |
296 | # Get embedding for the query
297 | query_embedding = await self.embedding_service.get_embedding(query)
298 |
299 | # Calculate similarity with all previous queries
300 | similarities = []
301 |
302 | for _query_hash, data in query_feedback.items():
303 | try:
304 | prev_query = data["query"]
305 | prev_embedding = await self.embedding_service.get_embedding(prev_query)
306 |
307 | # Calculate cosine similarity
308 | similarity = np.dot(query_embedding, prev_embedding) / (
309 | np.linalg.norm(query_embedding) * np.linalg.norm(prev_embedding)
310 | )
311 |
312 | if similarity >= threshold:
313 | similarities.append({
314 | "query": prev_query,
315 | "similarity": float(similarity),
316 | "count": data["count"],
317 | "last_used": data["last_used"],
318 | "retrieved_docs": data["retrieved_docs"]
319 | })
320 | except Exception as e:
321 | logger.error(
322 | f"Error calculating similarity for query: {str(e)}",
323 | extra={"emoji_key": "error"}
324 | )
325 |
326 | # Sort by similarity (descending)
327 | similarities.sort(key=lambda x: x["similarity"], reverse=True)
328 |
329 | return similarities[:top_k]
330 |
331 | async def apply_feedback_adjustments(
332 | self,
333 | knowledge_base_name: str,
334 | results: List[Dict[str, Any]],
335 | query: str
336 | ) -> List[Dict[str, Any]]:
337 | """Apply feedback-based adjustments to retrieval results.
338 |
339 | Args:
340 | knowledge_base_name: Knowledge base name
341 | results: List of retrieval results
342 | query: Query text
343 |
344 | Returns:
345 | Adjusted retrieval results
346 | """
347 | # Check if we have feedback data
348 | if knowledge_base_name not in self.document_feedback:
349 | return results
350 |
351 | # Get similar queries
352 | similar_queries = await self.get_similar_queries(
353 | knowledge_base_name=knowledge_base_name,
354 | query=query,
355 | top_k=3,
356 | threshold=0.8
357 | )
358 |
359 | # Collect document IDs from similar queries
360 | similar_doc_ids = set()
361 | for sq in similar_queries:
362 | similar_doc_ids.update(sq["retrieved_docs"])
363 |
364 | # Apply boosts to results
365 | adjusted_results = []
366 |
367 | for result in results:
368 | doc_id = result["id"]
369 | score = result["score"]
370 |
371 | # Apply document-specific boost
372 | doc_boost = await self.get_document_boost(knowledge_base_name, doc_id)
373 |
374 | # Apply boost for documents from similar queries
375 | similar_query_boost = 0.05 if doc_id in similar_doc_ids else 0.0
376 |
377 | # Calculate final score with boosts
378 | adjusted_score = min(1.0, score + doc_boost + similar_query_boost)
379 |
380 | # Update result
381 | adjusted_result = result.copy()
382 | adjusted_result["original_score"] = score
383 | adjusted_result["feedback_boost"] = doc_boost
384 | adjusted_result["similar_query_boost"] = similar_query_boost
385 | adjusted_result["score"] = adjusted_score
386 |
387 | adjusted_results.append(adjusted_result)
388 |
389 | # Sort by adjusted score
390 | adjusted_results.sort(key=lambda x: x["score"], reverse=True)
391 |
392 | return adjusted_results
393 |
394 |
395 | # Singleton instance
396 | _rag_feedback_service = None
397 |
398 |
399 | def get_rag_feedback_service() -> RAGFeedbackService:
400 | """Get or create a RAG feedback service instance.
401 |
402 | Returns:
403 | RAGFeedbackService: RAG feedback service instance
404 | """
405 | global _rag_feedback_service
406 |
407 | if _rag_feedback_service is None:
408 | _rag_feedback_service = RAGFeedbackService()
409 |
410 | return _rag_feedback_service
```
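A short usage sketch for the feedback service follows. It assumes the gateway's configuration is already loaded so that `get_embedding_service()` can initialize inside the constructor; the knowledge base name and document IDs are made up for illustration:

```python
import asyncio

from ultimate_mcp_server.services.knowledge_base.feedback import get_rag_feedback_service


async def main() -> None:
    feedback = get_rag_feedback_service()

    # Record one retrieval round: doc-1 was cited in the answer and explicitly
    # up-voted, doc-2 was retrieved but never used.
    await feedback.record_retrieval_feedback(
        knowledge_base_name="demo_kb",
        query="What were the key findings of the report?",
        retrieved_documents=[
            {"id": "doc-1", "score": 0.82},
            {"id": "doc-2", "score": 0.74},
        ],
        used_document_ids={"doc-1"},
        explicit_feedback={"doc-1": "thumbs_up"},
    )

    # Later retrievals can fold the accumulated feedback back into ranking.
    boost = await feedback.get_document_boost("demo_kb", "doc-1")
    print(f"doc-1 relevance boost: {boost:+.3f}")


if __name__ == "__main__":
    asyncio.run(main())
```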
--------------------------------------------------------------------------------
/error_handling.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Comprehensive error handling framework for Model Context Protocol (MCP) systems.
3 |
4 | This module implements a consistent, standardized approach to error handling for MCP tools
5 | and services. It provides decorators, formatters, and utilities that transform Python
6 | exceptions into structured, protocol-compliant error responses that LLMs and client
7 | applications can reliably interpret and respond to.
8 |
9 | The framework is designed around several key principles:
10 |
11 | 1. CONSISTENCY: All errors follow the same structured format regardless of their source
12 | 2. RECOVERABILITY: Errors include explicit information on whether operations can be retried
13 | 3. ACTIONABILITY: Error responses provide specific suggestions for resolving issues
14 | 4. DEBUGGABILITY: Rich error details are preserved for troubleshooting
15 | 5. CATEGORIZATION: Errors are mapped to standardized types for consistent handling
16 |
17 | Key components:
18 | - ErrorType enum: Categorization system for different error conditions
19 | - format_error_response(): Creates standardized error response dictionaries
20 | - with_error_handling: Decorator that catches exceptions and formats responses
21 | - validate_inputs: Decorator for declarative parameter validation
22 | - Validator functions: Reusable validation logic for common parameter types
23 |
24 | Usage example:
25 | ```python
26 | @with_error_handling
27 | @validate_inputs(
28 | prompt=non_empty_string,
29 | temperature=in_range(0.0, 1.0)
30 | )
31 | async def generate_text(prompt, temperature=0.7):
32 | # Implementation...
33 | # Any exceptions thrown here will be caught and formatted
34 | # Input validation happens before execution
35 | if external_service_down:
36 | raise Exception("External service unavailable")
37 | return result
38 | ```
39 |
40 | The error handling pattern is designed to work seamlessly with async functions and
41 | integrates with the MCP protocol's expected error response structure.
42 | """
43 | import functools
44 | import inspect
45 | import time
46 | import traceback
47 | from enum import Enum
48 | from typing import Any, Callable, Dict, Optional, Union
49 |
50 |
51 | class ErrorType(str, Enum):
52 | """Types of errors that can occur in MCP tools."""
53 |
54 | VALIDATION_ERROR = "validation_error" # Input validation failed
55 | EXECUTION_ERROR = "execution_error" # Error during execution
56 | PERMISSION_ERROR = "permission_error" # Insufficient permissions
57 | NOT_FOUND_ERROR = "not_found_error" # Resource not found
58 | TIMEOUT_ERROR = "timeout_error" # Operation timed out
59 | RATE_LIMIT_ERROR = "rate_limit_error" # Rate limit exceeded
60 | EXTERNAL_ERROR = "external_error" # Error in external service
61 | UNKNOWN_ERROR = "unknown_error" # Unknown error
62 |
63 |
64 | def format_error_response(
65 | error_type: Union[ErrorType, str],
66 | message: str,
67 | details: Optional[Dict[str, Any]] = None,
68 | retriable: bool = False,
69 | suggestions: Optional[list] = None
70 | ) -> Dict[str, Any]:
71 | """
72 | Format a standardized error response.
73 |
74 | Args:
75 | error_type: Type of error
76 | message: Human-readable error message
77 | details: Additional error details
78 | retriable: Whether the operation can be retried
79 | suggestions: List of suggestions for resolving the error
80 |
81 | Returns:
82 | Formatted error response
83 | """
84 | return {
85 | "success": False,
86 | "isError": True, # MCP protocol flag
87 | "error": {
88 | "type": error_type if isinstance(error_type, str) else error_type.value,
89 | "message": message,
90 | "details": details or {},
91 | "retriable": retriable,
92 | "suggestions": suggestions or [],
93 | "timestamp": time.time()
94 | }
95 | }
96 |
97 |
98 | def with_error_handling(func: Callable) -> Callable:
99 | """
100 | Decorator that provides standardized exception handling for MCP tool functions.
101 |
102 | This decorator intercepts any exceptions raised by the wrapped function and transforms
103 | them into a structured error response format that follows the MCP protocol. The response
104 | includes consistent error categorization, helpful suggestions for recovery, and details
105 | to aid debugging.
106 |
107 | Key features:
108 | - Automatically categorizes exceptions into appropriate ErrorType values
109 | - Preserves the original exception message and stack trace
110 | - Adds relevant suggestions based on the error type
111 | - Indicates whether the operation can be retried
112 | - Adds a timestamp for error logging/tracking
113 |
114 | The error response structure always includes:
115 | - success: False
116 | - isError: True (MCP protocol flag)
117 | - error: A dictionary with type, message, details, retriable flag, and suggestions
118 |
119 | Exception mapping:
120 | - ValueError, TypeError, KeyError, AttributeError → VALIDATION_ERROR (retriable)
121 |     - FileNotFoundError, IndexError → NOT_FOUND_ERROR (not retriable; KeyError is caught by the validation branch above)
122 | - PermissionError, AccessError → PERMISSION_ERROR (not retriable)
123 | - TimeoutError → TIMEOUT_ERROR (retriable)
124 | - Exceptions with "rate limit" in message → RATE_LIMIT_ERROR (retriable)
125 | - All other exceptions → UNKNOWN_ERROR (not retriable)
126 |
127 | Args:
128 | func: The async function to wrap with error handling
129 |
130 | Returns:
131 | Decorated async function that catches exceptions and returns structured error responses
132 |
133 | Example:
134 | ```python
135 | @with_error_handling
136 | async def my_tool_function(param1, param2):
137 | # Function implementation that might raise exceptions
138 | # If an exception occurs, it will be transformed into a structured response
139 | ```
140 | """
141 | @functools.wraps(func)
142 | async def wrapper(*args, **kwargs):
143 | try:
144 | # Call the original function
145 | return await func(*args, **kwargs)
146 | except Exception as e:
147 | # Get exception details
148 | exc_type = type(e).__name__
149 | exc_message = str(e)
150 | exc_traceback = traceback.format_exc()
151 |
152 | # Determine error type
153 | error_type = ErrorType.UNKNOWN_ERROR
154 | retriable = False
155 |
156 | # Map common exceptions to error types
157 | if exc_type in ("ValueError", "TypeError", "KeyError", "AttributeError"):
158 | error_type = ErrorType.VALIDATION_ERROR
159 | retriable = True
160 | elif exc_type in ("FileNotFoundError", "KeyError", "IndexError"):
161 | error_type = ErrorType.NOT_FOUND_ERROR
162 | retriable = False
163 | elif exc_type in ("PermissionError", "AccessError"):
164 | error_type = ErrorType.PERMISSION_ERROR
165 | retriable = False
166 |             elif exc_type in ("TimeoutError",):
167 | error_type = ErrorType.TIMEOUT_ERROR
168 | retriable = True
169 | elif "rate limit" in exc_message.lower():
170 | error_type = ErrorType.RATE_LIMIT_ERROR
171 | retriable = True
172 |
173 | # Generate suggestions based on error type
174 | suggestions = []
175 | if error_type == ErrorType.VALIDATION_ERROR:
176 | suggestions = [
177 | "Check that all required parameters are provided",
178 | "Verify parameter types and formats",
179 | "Ensure parameter values are within allowed ranges"
180 | ]
181 | elif error_type == ErrorType.NOT_FOUND_ERROR:
182 | suggestions = [
183 | "Verify the resource ID or path exists",
184 | "Check for typos in identifiers",
185 | "Ensure the resource hasn't been deleted"
186 | ]
187 | elif error_type == ErrorType.RATE_LIMIT_ERROR:
188 | suggestions = [
189 | "Wait before retrying the request",
190 | "Reduce the frequency of requests",
191 | "Implement backoff strategy for retries"
192 | ]
193 |
194 | # Format and return error response
195 | return format_error_response(
196 | error_type=error_type,
197 | message=exc_message,
198 | details={
199 | "exception_type": exc_type,
200 | "traceback": exc_traceback
201 | },
202 | retriable=retriable,
203 | suggestions=suggestions
204 | )
205 |
206 | return wrapper
207 |
208 |
209 | def validate_inputs(**validators):
210 | """
211 | Decorator for validating tool input parameters against custom validation rules.
212 |
213 | This decorator enables declarative input validation for async tool functions by applying
214 | validator functions to specified parameters before the decorated function is called.
215 | If any validation fails, the function returns a standardized error response instead
216 | of executing, preventing errors from propagating and providing clear feedback on the issue.
217 |
218 | The validation approach supports:
219 | - Applying different validation rules to different parameters
220 | - Detailed error messages explaining which parameter failed and why
221 | - Custom validation logic via any callable that raises ValueError on failure
222 | - Zero validation overhead for parameters not explicitly validated
223 |
224 | Validator functions should:
225 | 1. Take a single parameter (the value to validate)
226 | 2. Raise a ValueError with a descriptive message if validation fails
227 | 3. Return None or any value (which is ignored) if validation passes
228 | 4. Include a docstring that describes the constraint (used in error messages)
229 |
230 | Args:
231 | **validators: A mapping of parameter names to validator functions.
232 | Each key should match a parameter name in the decorated function.
233 | Each value should be a callable that validates the corresponding parameter.
234 |
235 | Returns:
236 | Decorator function that wraps an async function with input validation
237 |
238 | Example:
239 | ```
240 | # Define validators (or use the provided ones like non_empty_string)
241 | def validate_temperature(value):
242 | '''Temperature must be between 0.0 and 1.0.'''
243 | if not isinstance(value, float) or value < 0.0 or value > 1.0:
244 | raise ValueError("Temperature must be between 0.0 and 1.0")
245 |
246 | # Apply validation to specific parameters
247 | @validate_inputs(
248 | prompt=non_empty_string,
249 | temperature=validate_temperature,
250 | max_tokens=positive_number
251 | )
252 | async def generate_text(prompt, temperature=0.7, max_tokens=None):
253 | # This function will only be called if all validations pass
254 | # Otherwise a standardized error response is returned
255 | ...
256 |
257 | # The response structure when validation fails:
258 | # {
259 | # "success": False,
260 | # "isError": True,
261 | # "error": {
262 | # "type": "validation_error",
263 | # "message": "Invalid value for parameter 'prompt': Value must be a non-empty string",
264 | # "details": { ... },
265 | # "retriable": true,
266 | # "suggestions": [ ... ]
267 | # }
268 | # }
269 | ```
270 |
271 | Note:
272 | This decorator should typically be applied before other decorators like
273 | with_error_handling so that validation errors are correctly formatted.
274 | """
275 | def decorator(func):
276 | @functools.wraps(func)
277 | async def wrapper(*args, **kwargs):
278 | # Get function signature
279 | sig = inspect.signature(func)
280 |
281 | # Build mapping of parameter names to values
282 | bound_args = sig.bind(*args, **kwargs)
283 | bound_args.apply_defaults()
284 |
285 | # Validate inputs
286 | for param_name, validator in validators.items():
287 | if param_name in bound_args.arguments:
288 | value = bound_args.arguments[param_name]
289 | try:
290 | # Run validation
291 | validator(value)
292 | except Exception as e:
293 | # Return validation error
294 | return format_error_response(
295 | error_type=ErrorType.VALIDATION_ERROR,
296 | message=f"Invalid value for parameter '{param_name}': {str(e)}",
297 | details={
298 | "parameter": param_name,
299 | "value": str(value),
300 | "constraint": str(validator.__doc__ or "")
301 | },
302 | retriable=True,
303 | suggestions=[
304 | f"Provide a valid value for '{param_name}'",
305 | "Check the parameter constraints in the tool description"
306 | ]
307 | )
308 |
309 | # Call the original function if validation passes
310 | return await func(*args, **kwargs)
311 |
312 | return wrapper
313 |
314 | return decorator
315 |
316 |
317 | # Example validators
318 | def non_empty_string(value):
319 | """
320 | Validates that a value is a non-empty string.
321 |
322 | This validator checks that the input is a string type and contains at least
323 | one non-whitespace character. Empty strings or strings containing only
324 | whitespace characters are rejected. This is useful for validating required
325 | text inputs where blank values should not be allowed.
326 |
327 | Args:
328 | value: The value to validate
329 |
330 | Raises:
331 | ValueError: If the value is not a string or is empty/whitespace-only
332 | """
333 | if not isinstance(value, str) or not value.strip():
334 | raise ValueError("Value must be a non-empty string")
335 |
336 | def positive_number(value):
337 | """
338 | Validates that a value is a positive number (greater than zero).
339 |
340 | This validator ensures that the input is either an integer or float
341 | and has a value greater than zero. Zero or negative values are rejected.
342 | This is useful for validating inputs like quantities, counts, or rates
343 | that must be positive.
344 |
345 | Args:
346 | value: The value to validate
347 |
348 | Raises:
349 | ValueError: If the value is not a number or is not positive
350 | """
351 | if not isinstance(value, (int, float)) or value <= 0:
352 | raise ValueError("Value must be a positive number")
353 |
354 | def in_range(min_val, max_val):
355 | """
356 | Creates a validator function for checking if a number falls within a specified range.
357 |
358 | This is a validator factory that returns a custom validator function
359 | configured with the given minimum and maximum bounds. The returned function
360 | checks that a value is a number and falls within the inclusive range
361 | [min_val, max_val]. This is useful for validating inputs that must fall
362 | within specific limits, such as probabilities, temperatures, or indexes.
363 |
364 | Args:
365 | min_val: The minimum allowed value (inclusive)
366 | max_val: The maximum allowed value (inclusive)
367 |
368 | Returns:
369 | A validator function that checks if values are within the specified range
370 |
371 | Example:
372 | ```python
373 | # Create a validator for temperature (0.0 to 1.0)
374 | validate_temperature = in_range(0.0, 1.0)
375 |
376 | # Use in validation decorator
377 | @validate_inputs(temperature=validate_temperature)
378 | async def generate_text(prompt, temperature=0.7):
379 | # Function body
380 | ...
381 | ```
382 | """
383 |     def validator(value):
384 |         """Value must be within the inclusive range [min_val, max_val]."""
385 |         if not isinstance(value, (int, float)) or value < min_val or value > max_val:
386 |             raise ValueError(f"Value must be between {min_val} and {max_val}")
387 |     # Interpolate the bounds so the constraint reported in error details is specific.
388 |     validator.__doc__ = f"Value must be between {min_val} and {max_val}."
389 |     return validator
```
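As a quick illustration of how the pieces above compose, here is a minimal sketch (not part of the module) that stacks `with_error_handling` and `validate_inputs` on a hypothetical tool function. The tool name and parameters are invented for the example, and the response shape mirrors the one documented in the `validate_inputs` docstring.

```python
# Illustrative sketch only: decorator stacking for a hypothetical tool.
# Assumes with_error_handling, validate_inputs, non_empty_string,
# positive_number, and in_range are importable from this module.

@with_error_handling                 # outermost: catches unexpected runtime errors
@validate_inputs(                    # innermost: parameter checks run before the body
    query=non_empty_string,
    top_k=positive_number,
    temperature=in_range(0.0, 1.0),
)
async def search_documents(query: str, top_k: int = 5, temperature: float = 0.2):
    ...  # body only runs if every validator passes

# A failing call returns a structured dict instead of raising, e.g.
#   await search_documents(query="", top_k=3)
# yields a response whose error block has type "validation_error" and a
# message like "Invalid value for parameter 'query': Value must be a
# non-empty string".
```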
--------------------------------------------------------------------------------
/examples/vector_search_demo.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python
2 | """Vector database and semantic search demonstration for Ultimate MCP Server."""
3 | import asyncio
4 | import sys
5 | import time
6 | from pathlib import Path
7 |
8 | # Add project root to path for imports when running as script
9 | sys.path.insert(0, str(Path(__file__).parent.parent))
10 |
11 | from rich import box
12 | from rich.markup import escape
13 | from rich.panel import Panel
14 | from rich.rule import Rule
15 | from rich.table import Table
16 |
17 | from ultimate_mcp_server.constants import Provider
18 | from ultimate_mcp_server.core.providers.base import get_provider
19 | from ultimate_mcp_server.services.vector import get_embedding_service, get_vector_db_service
20 | from ultimate_mcp_server.utils import get_logger
21 | from ultimate_mcp_server.utils.display import CostTracker
22 |
23 | # --- Add Rich Imports ---
24 | from ultimate_mcp_server.utils.logging.console import console
25 |
26 | # ----------------------
27 |
28 | # Initialize logger
29 | logger = get_logger("example.vector_search")
30 |
31 |
32 | async def demonstrate_vector_operations():
33 | """Demonstrate basic vector database operations using Rich."""
34 | console.print(Rule("[bold blue]Vector Database Operations Demo[/bold blue]"))
35 | logger.info("Starting vector database demonstration", emoji_key="start")
36 |
37 | embedding_service = get_embedding_service()
38 | vector_db = get_vector_db_service()
39 |
40 | if not embedding_service or not hasattr(embedding_service, 'client'):
41 | logger.critical("Failed to initialize embedding service. Is OPENAI_API_KEY configured correctly?", emoji_key="critical")
42 | console.print("[bold red]Error:[/bold red] Embedding service (likely OpenAI) failed to initialize. Check API key.")
43 | return False
44 |
45 | console.print(f"[dim]Vector DB Storage Path: {vector_db.base_dir}[/dim]")
46 |
47 | collection_name = "semantic_search_demo_rich"
48 | embedding_dimension = 1536 # Default for text-embedding-ada-002 / 3-small
49 |
50 | # --- Setup Collection ---
51 | try:
52 | logger.info(f"Creating/resetting collection: {collection_name}", emoji_key="db")
53 | await vector_db.create_collection(
54 | name=collection_name,
55 | dimension=embedding_dimension,
56 | overwrite=True,
57 | metadata={"description": "Rich Demo collection"}
58 | )
59 |
60 | documents = [
61 | "Machine learning is a field of study in artificial intelligence concerned with the development of algorithms that can learn from data.",
62 | "Natural language processing (NLP) is a subfield of linguistics and AI focused on interactions between computers and human language.",
63 | "Neural networks are computing systems inspired by the biological neural networks that constitute animal brains.",
64 | "Deep learning is part of a broader family of machine learning methods based on artificial neural networks.",
65 | "Transformer models have revolutionized natural language processing with their self-attention mechanism.",
66 | "Vector databases store and retrieve high-dimensional vectors for tasks like semantic search and recommendation systems.",
67 | "Embeddings are numerical representations that capture semantic meanings and relationships between objects.",
68 | "Clustering algorithms group data points into clusters based on similarity metrics.",
69 | "Reinforcement learning is about how software agents should take actions to maximize cumulative reward.",
70 | "Knowledge graphs represent knowledge in graph form with entities as nodes and relationships as edges."
71 | ]
72 | document_ids = [f"doc_{i}" for i in range(len(documents))]
73 | document_metadata = [
74 | {"domain": "machine_learning", "type": "concept", "id": document_ids[i]}
75 | for i, doc in enumerate(documents)
76 | ]
77 |
78 | logger.info(f"Adding {len(documents)} documents...", emoji_key="processing")
79 | add_start_time = time.time()
80 | ids = await vector_db.add_texts(
81 | collection_name=collection_name,
82 | texts=documents,
83 | metadatas=document_metadata,
84 | ids=document_ids
85 | )
86 | add_time = time.time() - add_start_time
87 | logger.success(f"Added {len(ids)} documents in {add_time:.2f}s", emoji_key="success")
88 |
89 | # --- Basic Search ---
90 | console.print(Rule("[green]Semantic Search[/green]"))
91 | query = "How do neural networks work?"
92 | logger.info(f"Searching for: '{escape(query)}'...", emoji_key="search")
93 | search_start_time = time.time()
94 | results = await vector_db.search_by_text(
95 | collection_name=collection_name,
96 | query_text=query,
97 | top_k=3,
98 | include_vectors=False
99 | )
100 | search_time = time.time() - search_start_time
101 | logger.success(f"Search completed in {search_time:.3f}s", emoji_key="success")
102 |
103 | results_table = Table(title=f'Search Results for: "{escape(query)}"', box=box.ROUNDED)
104 | results_table.add_column("#", style="dim", justify="right")
105 | results_table.add_column("Score", style="green", justify="right")
106 | results_table.add_column("Domain", style="cyan")
107 | results_table.add_column("Text Snippet", style="white")
108 |
109 | if results:
110 | for i, res in enumerate(results):
111 | metadata = res.get("metadata", {})
112 | text_snippet = escape(res.get("text", "")[:120] + ( "..." if len(res.get("text", "")) > 120 else ""))
113 | results_table.add_row(
114 | str(i+1),
115 | f"{res.get('similarity', 0.0):.4f}",
116 | escape(metadata.get("domain", "N/A")),
117 | text_snippet
118 | )
119 | else:
120 | results_table.add_row("-","-","-", "[dim]No results found.[/dim]")
121 | console.print(results_table)
122 | console.print()
123 |
124 | # --- Filtered Search ---
125 | console.print(Rule("[green]Filtered Semantic Search[/green]"))
126 | filter_query = "embeddings"
127 | domain_filter = {"domain": "machine_learning"} # Example filter
128 | logger.info(f"Searching for '{escape(filter_query)}' with filter {escape(str(domain_filter))}...", emoji_key="filter")
129 |
130 | f_search_start_time = time.time()
131 | filtered_results = await vector_db.search_by_text(
132 | collection_name=collection_name,
133 | query_text=filter_query,
134 | top_k=3,
135 | filter=domain_filter
136 | )
137 | f_search_time = time.time() - f_search_start_time
138 | logger.success(f"Filtered search completed in {f_search_time:.3f}s", emoji_key="success")
139 |
140 | f_results_table = Table(title=f'Filtered Results (domain=machine_learning) for: "{escape(filter_query)}"', box=box.ROUNDED)
141 | f_results_table.add_column("#", style="dim", justify="right")
142 | f_results_table.add_column("Score", style="green", justify="right")
143 | f_results_table.add_column("Domain", style="cyan")
144 | f_results_table.add_column("Text Snippet", style="white")
145 |
146 | if filtered_results:
147 | for i, res in enumerate(filtered_results):
148 | metadata = res.get("metadata", {})
149 | text_snippet = escape(res.get("text", "")[:120] + ( "..." if len(res.get("text", "")) > 120 else ""))
150 | f_results_table.add_row(
151 | str(i+1),
152 | f"{res.get('similarity', 0.0):.4f}",
153 | escape(metadata.get("domain", "N/A")),
154 | text_snippet
155 | )
156 | else:
157 | f_results_table.add_row("-","-","-", "[dim]No results found.[/dim]")
158 | console.print(f_results_table)
159 | console.print()
160 |
161 | # --- Direct Embedding ---
162 | console.print(Rule("[green]Direct Embedding Generation[/green]"))
163 | logger.info("Demonstrating direct embedding generation", emoji_key="vector")
164 | sample_text = "Semantic search helps find conceptually similar content."
165 | console.print(f"[cyan]Input Text:[/cyan] {escape(sample_text)}")
166 |
167 | emb_start_time = time.time()
168 | embeddings_list = await embedding_service.create_embeddings([sample_text])
169 | embedding = embeddings_list[0]
170 | emb_time = time.time() - emb_start_time
171 | logger.success(f"Generated embedding in {emb_time:.3f}s", emoji_key="success")
172 |
173 | # Use embedding display utility
174 | from ultimate_mcp_server.utils.display import _display_embeddings_info
175 | _display_embeddings_info([embedding], "text-embedding-3-small", console)
176 |
177 | # Also show sample values in a simple format for demo clarity
178 | console.print(f"[cyan]Sample Values (first 5):[/cyan] [dim]{escape(str(embedding[:5]))}...[/dim]")
179 | console.print()
180 |
181 | return True
182 |
183 | except Exception as e:
184 | logger.error(f"Error in vector operations: {e}", emoji_key="error", exc_info=True)
185 | console.print(f"[bold red]Error:[/bold red] {escape(str(e))}")
186 | return False
187 | finally:
188 | # Clean up collection
189 | try:
190 | logger.info(f"Deleting collection: {collection_name}", emoji_key="db")
191 | await vector_db.delete_collection(collection_name)
192 | except Exception as del_e:
193 | logger.warning(f"Could not delete collection {collection_name}: {del_e}", emoji_key="warning")
194 |
195 |
196 | async def demonstrate_llm_with_vector_retrieval(tracker: CostTracker):
197 | """Demonstrate RAG using vector search and LLM with Rich display."""
198 | console.print(Rule("[bold blue]Retrieval-Augmented Generation (RAG) Demo[/bold blue]"))
199 | logger.info("Starting RAG demo", emoji_key="start")
200 |
201 | vector_db = get_vector_db_service()
202 | # Let get_provider handle key loading internally AND await it
203 | provider = await get_provider(Provider.OPENAI.value)
204 |
205 | if not provider:
206 | logger.critical("OpenAI provider failed to initialize for RAG demo. Is OPENAI_API_KEY configured?", emoji_key="critical")
207 | console.print("[bold red]Error:[/bold red] OpenAI provider failed to initialize. Check API key.")
208 | return False
209 |
210 | # Re-create collection and add docs for this demo part
211 | collection_name = "rag_demo_collection_rich"
212 | embedding_dimension = 1536
213 | try:
214 | logger.info(f"Setting up collection: {collection_name}", emoji_key="db")
215 | await vector_db.create_collection(name=collection_name, dimension=embedding_dimension, overwrite=True)
216 | documents = [
217 | "Deep learning uses artificial neural networks with many layers (deep architectures).",
218 | "Neural networks are inspired by biological brains and consist of interconnected nodes or neurons.",
219 | "While deep learning is a type of machine learning that uses neural networks, not all neural networks qualify as deep learning (e.g., shallow networks).",
220 | "Key difference: Deep learning implies significant depth (many layers) allowing hierarchical feature learning."
221 | ]
222 | doc_ids = [f"rag_doc_{i}" for i in range(len(documents))]
223 | doc_metadatas = [
224 | {"topic": "machine_learning", "source": "demo_document", "id": doc_ids[i]}
225 | for i in range(len(documents))
226 | ]
227 | await vector_db.add_texts(
228 | collection_name=collection_name,
229 | texts=documents,
230 | metadatas=doc_metadatas,
231 | ids=doc_ids
232 | )
233 | logger.success(f"Added {len(documents)} documents for RAG.", emoji_key="success")
234 |
235 | question = "What is the difference between deep learning and neural networks?"
236 | console.print(f"[cyan]User Question:[/cyan] {escape(question)}")
237 |
238 | # Step 1: Retrieve Context
239 | logger.info("Retrieving relevant context...", emoji_key="search")
240 | search_start_time = time.time()
241 | search_results = await vector_db.search_by_text(
242 | collection_name=collection_name,
243 | query_text=question,
244 | top_k=3
245 | )
246 | search_time = time.time() - search_start_time
247 |
248 | logger.success(f"Retrieved {len(search_results)} context snippets in {search_time:.3f}s.", emoji_key="success")
249 |
250 | # Use vector results display utility
251 | from ultimate_mcp_server.utils.display import _display_vector_results
252 | _display_vector_results(search_results, console)
253 |
254 | # Join context for LLM
255 | context_texts = [result["text"] for result in search_results]
256 | context = "\n\n".join(context_texts)
257 | console.print(Panel(escape(context), title="[yellow]Retrieved Context[/yellow]", border_style="dim yellow", expand=False))
258 |
259 | # Step 2: Generate Answer with Context
260 | prompt = f"""Answer the following question based *only* on the provided context:
261 |
262 | Context:
263 | {context}
264 |
265 | Question: {question}
266 |
267 | Answer:"""
268 |
269 | logger.info("Generating answer using retrieved context...", emoji_key="processing")
270 | gen_start_time = time.time()
271 | result = await provider.generate_completion(
272 | prompt=prompt,
273 | model="gpt-4.1-mini", # Use a capable model
274 | temperature=0.2, # Lower temperature for factual answer
275 | max_tokens=200
276 | )
277 | gen_time = time.time() - gen_start_time
278 | logger.success("Answer generated.", emoji_key="success")
279 |
280 | # Track cost for the generation step
281 | tracker.add_call(result)
282 |
283 | # --- Display RAG Result ---
284 | console.print(Panel(
285 | escape(result.text.strip()),
286 | title="[bold green]Generated Answer (RAG)[/bold green]",
287 | border_style="green",
288 | expand=False
289 | ))
290 |
291 | stats_table = Table(title="RAG Stats", box=box.MINIMAL, show_header=False)
292 | stats_table.add_column("Metric", style="cyan")
293 | stats_table.add_column("Value", style="white")
294 | stats_table.add_row("Search Time", f"{search_time:.3f}s")
295 | stats_table.add_row("Generation Time", f"{gen_time:.3f}s")
296 | stats_table.add_row("Input Tokens", str(result.input_tokens))
297 | stats_table.add_row("Output Tokens", str(result.output_tokens))
298 | stats_table.add_row("Total Cost", f"${result.cost:.6f}")
299 | console.print(stats_table)
300 | console.print()
301 |
302 | return True
303 |
304 | except Exception as e:
305 | logger.error(f"Error in RAG demo: {e}", emoji_key="error", exc_info=True)
306 | console.print(f"[bold red]Error:[/bold red] {escape(str(e))}")
307 | return False
308 | finally:
309 | # Clean up collection
310 | try:
311 | logger.info(f"Deleting collection: {collection_name}", emoji_key="db")
312 | await vector_db.delete_collection(collection_name)
313 | except Exception as del_e:
314 | logger.warning(f"Could not delete collection {collection_name}: {del_e}", emoji_key="warning")
315 |
316 | async def main():
317 | """Run all vector search demonstrations."""
318 | console.print(Rule("[bold magenta]Vector Search & RAG Demos Starting[/bold magenta]"))
319 | success = False
320 | tracker = CostTracker()
321 | try:
322 | operations_ok = await demonstrate_vector_operations()
323 | if operations_ok:
324 | rag_ok = await demonstrate_llm_with_vector_retrieval(tracker)
325 | success = rag_ok
326 | else:
327 | logger.warning("Skipping RAG demo due to vector operation errors.", emoji_key="skip")
328 |
329 | except Exception as e:
330 | logger.critical(f"Vector search demo failed: {str(e)}", emoji_key="critical", exc_info=True)
331 | console.print(f"[bold red]Critical Demo Error:[/bold red] {escape(str(e))}")
332 | return 1
333 |
334 | if success:
335 | logger.success("Vector Search & RAG Demos Finished Successfully!", emoji_key="complete")
336 | console.print(Rule("[bold magenta]Vector Search & RAG Demos Complete[/bold magenta]"))
337 | tracker.display_summary(console)
338 | return 0
339 | else:
340 | logger.error("One or more vector search demos failed.", emoji_key="error")
341 | console.print(Rule("[bold red]Vector Search & RAG Demos Finished with Errors[/bold red]"))
342 | tracker.display_summary(console)
343 | return 1
344 |
345 |
346 | if __name__ == "__main__":
347 | # Run the demonstration
348 | exit_code = asyncio.run(main())
349 | sys.exit(exit_code)
```
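The similarity scores shown in the demo's result tables come from the vector store's nearest-neighbour search over embeddings, typically a cosine-similarity ranking. As a conceptual aside (not how the project's embedding or vector DB services are implemented internally), the sketch below uses random stand-in vectors to show what that ranking step amounts to:

```python
# Conceptual sketch only: cosine-similarity ranking over embedding vectors.
# Random vectors stand in for real embeddings; the demo above delegates this
# work to the embedding service and vector DB service.
import numpy as np

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine of the angle between two vectors (1.0 = identical direction)."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

rng = np.random.default_rng(42)
doc_vectors = rng.normal(size=(10, 1536))   # stand-ins for 10 document embeddings
query_vector = rng.normal(size=1536)        # stand-in for the query embedding

scores = [cosine_similarity(query_vector, d) for d in doc_vectors]
top_3 = sorted(range(len(scores)), key=scores.__getitem__, reverse=True)[:3]
print("top-3 document indices:", top_3, [round(scores[i], 4) for i in top_3])
```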
--------------------------------------------------------------------------------
/ultimate_mcp_server/exceptions.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Comprehensive exception system for the Ultimate MCP Server.
3 |
4 | This module implements a hierarchical, structured exception system designed to provide
5 | consistent error handling, reporting, and formatting across the MCP ecosystem. The exceptions
6 | are designed to support both internal error handling and MCP protocol-compliant error responses
7 | for client applications and LLMs.
8 |
9 | Key design principles:
10 | 1. HIERARCHICAL: Exception types form a logical inheritance tree with ToolError at the root
11 | 2. CONTEXTUAL: Exceptions carry rich metadata including error codes, details, and context
12 | 3. FORMATTABLE: All exceptions can be converted to standard response dictionaries
13 | 4. TRACEABLE: Original causes and stack traces are preserved for debugging
14 | 5. ACTIONABLE: Error responses include specific information to aid recovery
15 |
16 | Exception hierarchy:
17 | - ToolError (base class)
18 | ├── ToolInputError (parameter validation issues)
19 | ├── ToolExecutionError (runtime execution failures)
20 | ├── ProviderError (LLM provider issues)
21 | │ ├── RateLimitError (provider throttling)
22 | │ └── AuthenticationError (auth failures)
23 | ├── ResourceError (resource access/manipulation)
24 | ├── ValidationError (general validation issues)
25 | ├── ConfigurationError (config problems)
26 | └── StorageError (storage operation failures)
27 |
28 | The module also provides a format_error_response() function that standardizes any
29 | exception (including non-ToolError exceptions) into a consistent error response
30 | format compatible with the MCP protocol.
31 |
32 | Example usage:
33 | ```python
34 | # Raising a specific exception with context
35 | if not os.path.exists(file_path):
36 | raise ResourceError(
37 | message="Cannot access required resource",
38 | resource_type="file",
39 | resource_id=file_path
40 | )
41 |
42 | # Catching and formatting an error
43 | try:
44 | result = process_data(data)
45 | except Exception as e:
46 | # Convert to standard response format for API
47 | error_response = format_error_response(e)
48 | return error_response
49 | ```
50 | """
51 | import traceback
52 | from typing import Any, Dict
53 |
54 |
55 | class ToolError(Exception):
56 | """
57 | Base exception class for all tool-related errors in the Ultimate MCP Server.
58 |
59 | ToolError serves as the foundation of the MCP error handling system, providing a
60 | consistent interface for reporting, formatting, and categorizing errors that occur
61 | during tool execution. All specialized error types in the system inherit from this
62 | class, ensuring consistent error handling across the codebase.
63 |
64 | This exception class enhances Python's standard Exception with:
65 |
66 | - Error codes: Standardized identifiers for error categorization and programmatic handling
67 | - HTTP status mapping: Optional association with HTTP status codes for API responses
68 | - Detailed context: Support for rich error details and contextual information
69 | - Structured formatting: Conversion to standardized error response dictionaries
70 |
71 | The error hierarchy is designed to provide increasingly specific error types while
72 | maintaining a consistent structure that can be easily interpreted by error handlers,
73 | logging systems, and API responses.
74 |
75 | Error responses created from ToolError instances follow the MCP protocol format and
76 | include consistent fields for error type, message, details, and context.
77 |
78 | Usage example:
79 | ```python
80 | try:
81 | # Some operation that might fail
82 | result = process_data(data)
83 | except ToolInputError as e:
84 | # Handle input validation errors specifically
85 | log_validation_error(e)
86 | except ToolError as e:
87 | # Handle all other tool errors generically
88 | report_tool_error(e)
89 | ```
90 | """
91 |
92 | def __init__(self, message, error_code=None, details=None, context=None, http_status_code: int | None = None):
93 | """Initialize the tool error.
94 |
95 | Args:
96 | message: Error message
97 | error_code: Error code (for categorization)
98 | details: Additional error details dictionary
99 | context: Context dictionary (will be merged into details and stored)
100 | http_status_code: Optional HTTP status code associated with the error.
101 | """
102 | self.error_code = error_code or "TOOL_ERROR"
103 | self.http_status_code = http_status_code
104 |
105 | # Combine details and context, giving precedence to context if keys overlap
106 | combined_details = details.copy() if details else {} # Start with a copy of details or empty dict
107 | if context and isinstance(context, dict):
108 | combined_details.update(context) # Merge context into the combined dict
109 |
110 | self.details = combined_details # Store the combined dictionary
111 | self.context = context or {} # Also store original context separately for compatibility
112 |
113 | super().__init__(message)
114 |
115 | class ToolInputError(ToolError):
116 | """Exception raised for errors in the tool input parameters."""
117 |
118 | def __init__(self, message, param_name=None, expected_type=None, provided_value=None, details=None):
119 | """Initialize the tool input error.
120 |
121 | Args:
122 | message: Error message
123 | param_name: Name of the problematic parameter
124 | expected_type: Expected parameter type
125 | provided_value: Value that was provided
126 | details: Additional error details
127 | """
128 | error_details = details or {}
129 | if param_name:
130 | error_details["param_name"] = param_name
131 | if expected_type:
132 | error_details["expected_type"] = str(expected_type)
133 | if provided_value is not None:
134 | error_details["provided_value"] = str(provided_value)
135 |
136 | super().__init__(
137 | message,
138 | error_code="INVALID_PARAMETER",
139 | details=error_details
140 | )
141 |
142 |
143 | class ToolExecutionError(ToolError):
144 | """Exception raised when a tool execution fails."""
145 |
146 | def __init__(self, message, cause=None, details=None):
147 | """Initialize the tool execution error.
148 |
149 | Args:
150 | message: Error message
151 | cause: Original exception that caused the error
152 | details: Additional error details
153 | """
154 | error_details = details or {}
155 | if cause:
156 | error_details["cause"] = str(cause)
157 | error_details["traceback"] = traceback.format_exc()
158 |
159 | super().__init__(
160 | message,
161 | error_code="EXECUTION_ERROR",
162 | details=error_details
163 | )
164 |
165 |
166 | class ProviderError(ToolError):
167 | """Exception raised for provider-specific errors."""
168 |
169 |     def __init__(self, message, provider=None, model=None, cause=None, error_code=None, details=None):
170 | """Initialize the provider error.
171 |
172 | Args:
173 | message: Error message
174 | provider: Name of the provider
175 | model: Model name
176 | cause: Original exception that caused the error
177 | details: Additional error details
178 | """
179 | error_details = details or {}
180 | if provider:
181 | error_details["provider"] = provider
182 | if model:
183 | error_details["model"] = model
184 | if cause:
185 | error_details["cause"] = str(cause)
186 | error_details["traceback"] = traceback.format_exc()
187 |
188 | super().__init__(
189 | message,
190 |             error_code=error_code or "PROVIDER_ERROR",
191 | details=error_details
192 | )
193 |
194 |
195 | class ResourceError(ToolError):
196 | """Exception raised for resource-related errors."""
197 |
198 | def __init__(self, message, resource_type=None, resource_id=None, cause=None, details=None):
199 | """Initialize the resource error.
200 |
201 | Args:
202 | message: Error message
203 | resource_type: Type of resource (e.g., "document", "embedding")
204 | resource_id: Resource identifier
205 | cause: Original exception that caused the error
206 | details: Additional error details
207 | """
208 | error_details = details or {}
209 | if resource_type:
210 | error_details["resource_type"] = resource_type
211 | if resource_id:
212 | error_details["resource_id"] = resource_id
213 | if cause:
214 | error_details["cause"] = str(cause)
215 |
216 | super().__init__(
217 | message,
218 | error_code="RESOURCE_ERROR",
219 | details=error_details
220 | )
221 |
222 |
223 | class RateLimitError(ProviderError):
224 | """Exception raised when a provider's rate limit is reached."""
225 |
226 | def __init__(self, message, provider=None, retry_after=None, details=None):
227 | """Initialize the rate limit error.
228 |
229 | Args:
230 | message: Error message
231 | provider: Name of the provider
232 | retry_after: Seconds to wait before retrying
233 | details: Additional error details
234 | """
235 | error_details = details or {}
236 | if retry_after is not None:
237 | error_details["retry_after"] = retry_after
238 |
239 | super().__init__(
240 | message,
241 | provider=provider,
242 | error_code="RATE_LIMIT_ERROR",
243 | details=error_details
244 | )
245 |
246 |
247 | class AuthenticationError(ProviderError):
248 | """Exception raised when authentication with a provider fails."""
249 |
250 | def __init__(self, message, provider=None, details=None):
251 | """Initialize the authentication error.
252 |
253 | Args:
254 | message: Error message
255 | provider: Name of the provider
256 | details: Additional error details
257 | """
258 | super().__init__(
259 | message,
260 | provider=provider,
261 | error_code="AUTHENTICATION_ERROR",
262 | details=details
263 | )
264 |
265 |
266 | class ValidationError(ToolError):
267 | """Exception raised when validation of input/output fails."""
268 |
269 | def __init__(self, message, field_errors=None, details=None):
270 | """Initialize the validation error.
271 |
272 | Args:
273 | message: Error message
274 | field_errors: Dictionary of field-specific errors
275 | details: Additional error details
276 | """
277 | error_details = details or {}
278 | if field_errors:
279 | error_details["field_errors"] = field_errors
280 |
281 | super().__init__(
282 | message,
283 | error_code="VALIDATION_ERROR",
284 | details=error_details
285 | )
286 |
287 |
288 | class ConfigurationError(ToolError):
289 | """Exception raised when there is an issue with configuration."""
290 |
291 | def __init__(self, message, config_key=None, details=None):
292 | """Initialize the configuration error.
293 |
294 | Args:
295 | message: Error message
296 | config_key: Key of the problematic configuration
297 | details: Additional error details
298 | """
299 | error_details = details or {}
300 | if config_key:
301 | error_details["config_key"] = config_key
302 |
303 | super().__init__(
304 | message,
305 | error_code="CONFIGURATION_ERROR",
306 | details=error_details
307 | )
308 |
309 |
310 | class StorageError(ToolError):
311 | """Exception raised when there is an issue with storage operations."""
312 |
313 | def __init__(self, message, operation=None, location=None, details=None):
314 | """Initialize the storage error.
315 |
316 | Args:
317 | message: Error message
318 | operation: Storage operation that failed
319 | location: Location of the storage operation
320 | details: Additional error details
321 | """
322 | error_details = details or {}
323 | if operation:
324 | error_details["operation"] = operation
325 | if location:
326 | error_details["location"] = location
327 |
328 | super().__init__(
329 | message,
330 | error_code="STORAGE_ERROR",
331 | details=error_details
332 | )
333 |
334 |
335 | def format_error_response(error: Exception) -> Dict[str, Any]:
336 | """
337 | Format any exception into a standardized MCP-compliant error response dictionary.
338 |
339 | This utility function creates a structured error response that follows the MCP protocol
340 | format, ensuring consistency in error reporting across different components. It handles
341 | both ToolError instances (with their rich error metadata) and standard Python exceptions,
342 | automatically extracting relevant information to create detailed, actionable error responses.
343 |
344 | The function performs special processing for different error types:
345 |
346 | - For ToolError and subclasses: Extracts error code, details, and context from the exception
347 | - For ToolInputError with path validation: Enhances messages with more user-friendly text
348 | - For standard Python exceptions: Captures traceback and generates appropriate error codes
349 |
350 | The resulting dictionary always contains these standardized fields:
351 | - error: Human-readable error message (string)
352 | - error_code: Categorized error code (string)
353 | - error_type: Name of the exception class (string)
354 | - details: Dictionary with additional error information (object)
355 | - success: Always false for errors (boolean)
356 | - isError: Always true, used by MCP protocol handlers (boolean)
357 |
358 | Args:
359 | error: Any exception instance to format into a response
360 |
361 | Returns:
362 | Dictionary containing standardized error information following the MCP protocol
363 |
364 | Example:
365 | ```python
366 | try:
367 | result = perform_operation()
368 | except Exception as e:
369 | error_response = format_error_response(e)
370 | return error_response # Ready for API response
371 | ```
372 | """
373 | if isinstance(error, ToolError):
374 | # For ToolError instances, extract structured information
375 | error_type = error.__class__.__name__
376 | error_message = str(error)
377 | error_details = error.details or {}
378 |
379 | # Include context in the message for better clarity in user-facing errors
380 | context = getattr(error, 'context', None)
381 | if context and isinstance(context, dict):
382 | # Create a more specific error message based on error type
383 | if isinstance(error, ToolInputError):
384 | # For path validation errors, add more helpful information
385 | if 'path' in context and error_message.endswith('does not exist.'):
386 | error_message = f"File not found: {context.get('path')}"
387 | elif 'path' in context and 'is not a regular file' in error_message:
388 | if 'directory' in error_message.lower():
389 | error_message = f"Cannot read directory as file: {context.get('path')}. Use list_directory instead."
390 | else:
391 | error_message = f"Path exists but is not a file: {context.get('path')}"
392 |
393 | # Add context to details for more information
394 | error_details["context"] = context
395 |
396 | # Look for error_type in details if available
397 | if "error_type" in error_details:
398 | error_type_from_details = error_details["error_type"]
399 | # Use this in the response directly
400 | response_error_type = error_type_from_details
401 | else:
402 | response_error_type = error_type
403 |
404 | # Create a standard error response that the demo can easily process
405 | return {
406 | "error": error_message,
407 | "error_code": error.error_code,
408 | "error_type": response_error_type,
409 | "details": error_details,
410 | "success": False,
411 | "isError": True
412 | }
413 | else:
414 | # For unknown errors, use the actual error message instead of a generic message
415 | error_message = str(error)
416 | if not error_message or error_message.strip() == "":
417 | error_message = f"Unknown error of type {type(error).__name__}"
418 |
419 | # Match the same response structure for consistency
420 | return {
421 | "error": error_message,
422 | "error_code": "UNKNOWN_ERROR",
423 | "error_type": type(error).__name__,
424 | "details": {
425 | "type": type(error).__name__,
426 | "message": error_message,
427 | "traceback": traceback.format_exc()
428 | },
429 | "success": False,
430 | "isError": True
431 | }
```
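To make the response contract concrete, here is a minimal sketch of the round trip from a raised `ResourceError` to the dictionary produced by `format_error_response`. The field values are invented for illustration; the exact `details` content depends on what the caller supplies.

```python
# Minimal sketch: raising a structured exception and formatting it for a response.
from ultimate_mcp_server.exceptions import ResourceError, format_error_response

try:
    raise ResourceError(
        message="Document not found",
        resource_type="document",
        resource_id="doc_42",
    )
except Exception as exc:
    response = format_error_response(exc)

# response is now shaped like:
# {
#     "error": "Document not found",
#     "error_code": "RESOURCE_ERROR",
#     "error_type": "ResourceError",
#     "details": {"resource_type": "document", "resource_id": "doc_42"},
#     "success": False,
#     "isError": True,
# }
```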
--------------------------------------------------------------------------------
/ultimate_mcp_server/core/providers/grok.py:
--------------------------------------------------------------------------------
```python
1 | """Grok (xAI) provider implementation."""
2 | import time
3 | from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple
4 |
5 | from openai import AsyncOpenAI
6 |
7 | from ultimate_mcp_server.config import get_config
8 | from ultimate_mcp_server.constants import Provider
9 | from ultimate_mcp_server.core.providers.base import BaseProvider, ModelResponse
10 | from ultimate_mcp_server.utils import get_logger
11 |
12 | # Use the same naming scheme everywhere: logger at module level
13 | logger = get_logger("ultimate_mcp_server.providers.grok")
14 |
15 |
16 | class GrokProvider(BaseProvider):
17 | """Provider implementation for xAI's Grok API."""
18 |
19 | provider_name = Provider.GROK.value
20 |
21 | def __init__(self, api_key: Optional[str] = None, **kwargs):
22 | """Initialize the Grok provider.
23 |
24 | Args:
25 | api_key: xAI API key
26 | **kwargs: Additional options
27 | """
28 | super().__init__(api_key=api_key, **kwargs)
29 | self.base_url = kwargs.get("base_url", "https://api.x.ai/v1")
30 | self.models_cache = None
31 |
32 | async def initialize(self) -> bool:
33 | """Initialize the Grok client.
34 |
35 | Returns:
36 | bool: True if initialization was successful
37 | """
38 | try:
39 | self.client = AsyncOpenAI(
40 | api_key=self.api_key,
41 | base_url=self.base_url,
42 | )
43 |
44 | # Skip API call if using a mock key (for tests)
45 | if self.api_key and "mock-" in self.api_key:
46 | self.logger.info(
47 | "Using mock Grok key - skipping API validation",
48 | emoji_key="mock"
49 | )
50 | return True
51 |
52 | # Test connection by listing models
53 | await self.list_models()
54 |
55 | self.logger.success(
56 | "Grok provider initialized successfully",
57 | emoji_key="provider"
58 | )
59 | return True
60 |
61 | except Exception as e:
62 | self.logger.error(
63 | f"Failed to initialize Grok provider: {str(e)}",
64 | emoji_key="error"
65 | )
66 | return False
67 |
68 | async def generate_completion(
69 | self,
70 | prompt: str,
71 | model: Optional[str] = None,
72 | max_tokens: Optional[int] = None,
73 | temperature: float = 0.7,
74 | **kwargs
75 | ) -> ModelResponse:
76 | """Generate a completion using Grok.
77 |
78 | Args:
79 | prompt: Text prompt to send to the model
80 | model: Model name to use (e.g., "grok-3-latest")
81 | max_tokens: Maximum tokens to generate
82 | temperature: Temperature parameter (0.0-1.0)
83 | **kwargs: Additional model-specific parameters
84 |
85 | Returns:
86 | ModelResponse with completion result
87 |
88 | Raises:
89 | Exception: If API call fails
90 | """
91 | if not self.client:
92 | await self.initialize()
93 |
94 | # Use default model if not specified
95 | model = model or self.get_default_model()
96 |
97 | # Strip provider prefix if present (e.g., "grok/grok-3" -> "grok-3")
98 | if model.startswith(f"{self.provider_name}/"):
99 | original_model = model
100 | model = model.split("/", 1)[1]
101 | self.logger.debug(f"Stripped provider prefix from model name: {original_model} -> {model}")
102 |
103 | # Create messages
104 | messages = kwargs.pop("messages", None) or [{"role": "user", "content": prompt}]
105 |
106 | # Get system message if provided
107 | system_message = kwargs.pop("system", None)
108 | if system_message and not any(msg.get("role") == "system" for msg in messages):
109 | messages.insert(0, {"role": "system", "content": system_message})
110 |
111 | # Handle tool support (function calling)
112 | tools = kwargs.pop("tools", None)
113 | tool_choice = kwargs.pop("tool_choice", None)
114 |
115 | # Handle reasoning effort for grok-3-mini models
116 | reasoning_effort = None
117 | if model.startswith("grok-3-mini") and "reasoning_effort" in kwargs:
118 | reasoning_effort = kwargs.pop("reasoning_effort")
119 |
120 | # Prepare API call parameters
121 | params = {
122 | "model": model,
123 | "messages": messages,
124 | "temperature": temperature,
125 | }
126 |
127 | # Add max_tokens if specified
128 | if max_tokens is not None:
129 | params["max_tokens"] = max_tokens
130 |
131 | # Add tools and tool_choice if specified
132 | if tools:
133 | params["tools"] = tools
134 | if tool_choice:
135 | params["tool_choice"] = tool_choice
136 |
137 | # Handle JSON mode
138 | json_mode = kwargs.pop("json_mode", False)
139 | if json_mode:
140 | params["response_format"] = {"type": "json_object"}
141 | self.logger.debug("Setting response_format to JSON mode for Grok")
142 |
143 | # Add reasoning_effort for mini models if specified
144 | if reasoning_effort:
145 | params["reasoning_effort"] = reasoning_effort
146 |
147 | # Add any additional parameters
148 | params.update(kwargs)
149 |
150 | # Log request
151 | self.logger.info(
152 | f"Generating completion with Grok model {model}",
153 | emoji_key=self.provider_name,
154 | prompt_length=len(prompt),
155 | json_mode_requested=json_mode
156 | )
157 |
158 | try:
159 | # API call with timing
160 | response, processing_time = await self.process_with_timer(
161 | self.client.chat.completions.create, **params
162 | )
163 |
164 | # Extract response text
165 | completion_text = response.choices[0].message.content
166 |
167 | # Extract reasoning content for grok-3-mini models if available
168 | reasoning_content = None
169 | if hasattr(response.choices[0].message, "reasoning_content"):
170 | reasoning_content = response.choices[0].message.reasoning_content
171 |
172 | # Get usage statistics
173 | input_tokens = response.usage.prompt_tokens
174 | output_tokens = response.usage.completion_tokens
175 | total_tokens = response.usage.total_tokens
176 |
177 | # Extract reasoning tokens if available
178 | reasoning_tokens = None
179 | if hasattr(response.usage, "completion_tokens_details") and \
180 | hasattr(response.usage.completion_tokens_details, "reasoning_tokens"):
181 | reasoning_tokens = response.usage.completion_tokens_details.reasoning_tokens
182 |
183 | # Create metadata with reasoning information
184 | metadata = {}
185 | if reasoning_content:
186 | metadata["reasoning_content"] = reasoning_content
187 | if reasoning_tokens:
188 | metadata["reasoning_tokens"] = reasoning_tokens
189 |
190 | # Create standardized response
191 | result = ModelResponse(
192 | text=completion_text,
193 | model=model,
194 | provider=self.provider_name,
195 | input_tokens=input_tokens,
196 | output_tokens=output_tokens,
197 | total_tokens=total_tokens,
198 | processing_time=processing_time,
199 | raw_response=response,
200 | metadata=metadata,
201 | )
202 |
203 | # Log success
204 | self.logger.success(
205 | "Grok completion successful",
206 | emoji_key="success",
207 | model=model,
208 | tokens={
209 | "input": result.input_tokens,
210 | "output": result.output_tokens
211 | },
212 | cost=result.cost,
213 | time=result.processing_time
214 | )
215 |
216 | return result
217 |
218 | except Exception as e:
219 | self.logger.error(
220 | f"Grok completion failed: {str(e)}",
221 | emoji_key="error",
222 | model=model
223 | )
224 | raise
225 |
226 | async def generate_completion_stream(
227 | self,
228 | prompt: str,
229 | model: Optional[str] = None,
230 | max_tokens: Optional[int] = None,
231 | temperature: float = 0.7,
232 | **kwargs
233 | ) -> AsyncGenerator[Tuple[str, Dict[str, Any]], None]:
234 | """Generate a streaming completion using Grok.
235 |
236 | Args:
237 | prompt: Text prompt to send to the model
238 | model: Model name to use (e.g., "grok-3-latest")
239 | max_tokens: Maximum tokens to generate
240 | temperature: Temperature parameter (0.0-1.0)
241 | **kwargs: Additional model-specific parameters
242 |
243 | Yields:
244 | Tuple of (text_chunk, metadata)
245 |
246 | Raises:
247 | Exception: If API call fails
248 | """
249 | if not self.client:
250 | await self.initialize()
251 |
252 | # Use default model if not specified
253 | model = model or self.get_default_model()
254 |
255 | # Strip provider prefix if present (e.g., "grok/grok-3" -> "grok-3")
256 | if model.startswith(f"{self.provider_name}/"):
257 | original_model = model
258 | model = model.split("/", 1)[1]
259 | self.logger.debug(f"Stripped provider prefix from model name (stream): {original_model} -> {model}")
260 |
261 | # Create messages
262 | messages = kwargs.pop("messages", None) or [{"role": "user", "content": prompt}]
263 |
264 | # Get system message if provided
265 | system_message = kwargs.pop("system", None)
266 | if system_message and not any(msg.get("role") == "system" for msg in messages):
267 | messages.insert(0, {"role": "system", "content": system_message})
268 |
269 | # Handle reasoning effort for grok-3-mini models
270 | reasoning_effort = None
271 | if model.startswith("grok-3-mini") and "reasoning_effort" in kwargs:
272 | reasoning_effort = kwargs.pop("reasoning_effort")
273 |
274 | # Prepare API call parameters
275 | params = {
276 | "model": model,
277 | "messages": messages,
278 | "temperature": temperature,
279 | "stream": True,
280 | }
281 |
282 | # Add max_tokens if specified
283 | if max_tokens is not None:
284 | params["max_tokens"] = max_tokens
285 |
286 | # Handle JSON mode
287 | json_mode = kwargs.pop("json_mode", False)
288 | if json_mode:
289 | params["response_format"] = {"type": "json_object"}
290 | self.logger.debug("Setting response_format to JSON mode for Grok streaming")
291 |
292 | # Add reasoning_effort for mini models if specified
293 | if reasoning_effort:
294 | params["reasoning_effort"] = reasoning_effort
295 |
296 | # Add any additional parameters
297 | params.update(kwargs)
298 |
299 | # Log request
300 | self.logger.info(
301 | f"Generating streaming completion with Grok model {model}",
302 | emoji_key=self.provider_name,
303 | prompt_length=len(prompt),
304 | json_mode_requested=json_mode
305 | )
306 |
307 | start_time = time.time()
308 | total_chunks = 0
309 |
310 | try:
311 | # Make streaming API call
312 | stream = await self.client.chat.completions.create(**params)
313 |
314 | # Process the stream
315 | async for chunk in stream:
316 | total_chunks += 1
317 |
318 | # Extract content from the chunk
319 | delta = chunk.choices[0].delta
320 | content = delta.content or ""
321 |
322 | # Extract reasoning content for grok-3-mini models if available
323 | reasoning_content = None
324 | if hasattr(delta, "reasoning_content"):
325 | reasoning_content = delta.reasoning_content
326 |
327 | # Metadata for this chunk
328 | metadata = {
329 | "model": model,
330 | "provider": self.provider_name,
331 | "chunk_index": total_chunks,
332 | "finish_reason": chunk.choices[0].finish_reason,
333 | }
334 |
335 | # Add reasoning content to metadata if available
336 | if reasoning_content:
337 | metadata["reasoning_content"] = reasoning_content
338 |
339 | yield content, metadata
340 |
341 | # Log success
342 | processing_time = time.time() - start_time
343 | self.logger.success(
344 | "Grok streaming completion successful",
345 | emoji_key="success",
346 | model=model,
347 | chunks=total_chunks,
348 | time=processing_time
349 | )
350 |
351 | except Exception as e:
352 | self.logger.error(
353 | f"Grok streaming completion failed: {str(e)}",
354 | emoji_key="error",
355 | model=model
356 | )
357 | raise
358 |
359 | async def list_models(self) -> List[Dict[str, Any]]:
360 | """List available Grok models.
361 |
362 | Returns:
363 | List of model information dictionaries
364 | """
365 | if self.models_cache:
366 | return self.models_cache
367 |
368 | try:
369 | if not self.client:
370 | await self.initialize()
371 |
372 | # Fetch models from API (Grok API uses the same endpoint as OpenAI)
373 | response = await self.client.models.list()
374 |
375 | # Process response
376 | models = []
377 | for model in response.data:
378 | # Filter to only include grok-3 models
379 | if model.id.startswith("grok-3"):
380 | models.append({
381 | "id": model.id,
382 | "provider": self.provider_name,
383 | "created": model.created,
384 | "owned_by": model.owned_by,
385 | })
386 |
387 | # Cache results
388 | self.models_cache = models
389 |
390 | return models
391 |
392 | except Exception as e:
393 | self.logger.error(
394 | f"Failed to list Grok models: {str(e)}",
395 | emoji_key="error"
396 | )
397 |
398 | # Return basic grok-3 models on error based on documentation
399 | return [
400 | {
401 | "id": "grok-3-latest",
402 | "provider": self.provider_name,
403 | "description": "Flagship model for enterprise tasks (latest version)",
404 | },
405 | {
406 | "id": "grok-3-beta",
407 | "provider": self.provider_name,
408 | "description": "Flagship model that excels at enterprise tasks, domain knowledge",
409 | },
410 | {
411 | "id": "grok-3-fast-latest",
412 | "provider": self.provider_name,
413 | "description": "Fast version of grok-3, same quality with higher cost",
414 | },
415 | {
416 | "id": "grok-3-mini-latest",
417 | "provider": self.provider_name,
418 | "description": "Lightweight model with thinking capabilities",
419 | },
420 | {
421 | "id": "grok-3-mini-fast-latest",
422 | "provider": self.provider_name,
423 | "description": "Fast version of grok-3-mini, same quality with higher cost",
424 | }
425 | ]
426 |
427 | def get_default_model(self) -> str:
428 | """Get the default Grok model.
429 |
430 | Returns:
431 | Default model name
432 | """
433 | # Safely get from config if available
434 | try:
435 | config = get_config()
436 | provider_config = getattr(config, 'providers', {}).get(self.provider_name, None)
437 | if provider_config and provider_config.default_model:
438 | return provider_config.default_model
439 | except (AttributeError, TypeError):
440 | # Handle case when providers attribute doesn't exist or isn't a dict
441 | pass
442 |
443 | # Otherwise return hard-coded default
444 | return "grok-3-beta"
445 |
446 | async def check_api_key(self) -> bool:
447 | """Check if the Grok API key is valid.
448 |
449 | Returns:
450 | bool: True if API key is valid
451 | """
452 | try:
453 | # Just list models as a simple validation
454 | await self.list_models()
455 | return True
456 | except Exception:
457 | return False
```
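A usage sketch, not part of the module: obtaining the Grok provider the same way the example scripts do and requesting a completion from a grok-3-mini model with a `reasoning_effort` hint. This assumes the xAI API key is configured for the server; the effort value and printed fields are illustrative.

```python
# Illustrative usage sketch. Assumes the xAI/Grok API key is available to the
# configuration system, as in the other example scripts.
import asyncio

from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.providers.base import get_provider

async def main():
    provider = await get_provider(Provider.GROK.value)
    result = await provider.generate_completion(
        prompt="Briefly explain what a vector database is.",
        model="grok-3-mini-latest",
        temperature=0.3,
        max_tokens=200,
        reasoning_effort="low",   # only forwarded for grok-3-mini* models
    )
    print(result.text)
    # Reasoning trace (if the API returned one) is surfaced via metadata:
    print(result.metadata.get("reasoning_content"))

asyncio.run(main())
```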
--------------------------------------------------------------------------------
/examples/analytics_reporting_demo.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python
2 | """Analytics and reporting demonstration for Ultimate MCP Server."""
3 | import asyncio
4 | import sys
5 | import time
6 | from pathlib import Path
7 |
8 | # Add project root to path for imports when running as script
9 | sys.path.insert(0, str(Path(__file__).parent.parent))
10 |
11 | from rich import box
12 | from rich.live import Live
13 | from rich.markup import escape
14 | from rich.rule import Rule
15 | from rich.table import Table
16 |
17 | from ultimate_mcp_server.constants import Provider
18 | from ultimate_mcp_server.core.providers.base import get_provider
19 | from ultimate_mcp_server.services.analytics.metrics import get_metrics_tracker
20 | from ultimate_mcp_server.utils import get_logger
21 | from ultimate_mcp_server.utils.display import CostTracker, display_analytics_metrics
22 |
23 | # --- Add Rich Imports ---
24 | from ultimate_mcp_server.utils.logging.console import console
25 |
26 | # ----------------------
27 |
28 | # Initialize logger
29 | logger = get_logger("example.analytics_reporting")
30 |
31 |
32 | async def simulate_llm_usage(tracker: CostTracker = None):
33 | """Simulate various LLM API calls to generate analytics data."""
34 | console.print(Rule("[bold blue]Simulating LLM Usage[/bold blue]"))
35 | logger.info("Simulating LLM usage to generate analytics data", emoji_key="start")
36 |
37 | metrics = get_metrics_tracker()
38 | providers_info = []
39 |
40 | # Setup providers - Get keys from loaded config via get_provider
41 | # REMOVED provider_configs dict and direct decouple calls
42 |
43 | # Iterate through defined Provider enums
44 | for provider_enum in Provider:
45 | # Skip if provider doesn't make sense to simulate here, e.g., OpenRouter might need extra config
46 | if provider_enum == Provider.OPENROUTER:
47 | logger.info(f"Skipping {provider_enum.value} in simulation.", emoji_key="skip")
48 | continue
49 |
50 | try:
51 | # Let get_provider handle config/key lookup internally
52 | provider = await get_provider(provider_enum.value) # REMOVED api_key argument
53 | if provider:
54 | # Ensure initialization (get_provider might not initialize)
55 | # await provider.initialize() # Initialization might be done by get_provider or completion call
56 | providers_info.append((provider_enum.value, provider))
57 | logger.info(f"Provider instance obtained for: {provider_enum.value}", emoji_key="provider")
58 | else:
59 | logger.info(f"Provider {provider_enum.value} not configured or key missing, skipping simulation.", emoji_key="skip")
60 | except Exception as e:
61 | logger.warning(f"Failed to get/initialize {provider_enum.value}: {e}", emoji_key="warning")
62 |
63 | if not providers_info:
64 | logger.error("No providers could be initialized. Cannot simulate usage.", emoji_key="error")
65 | console.print("[bold red]Error:[/bold red] No LLM providers could be initialized. Please check your API keys.")
66 | return metrics # Return empty metrics
67 |
68 | console.print(f"Simulating usage with [cyan]{len(providers_info)}[/cyan] providers.")
69 |
70 | prompts = [
71 | "What is machine learning?",
72 | "Explain the concept of neural networks in simple terms.",
73 | "Write a short story about a robot that learns to love.",
74 | "Summarize the key innovations in artificial intelligence over the past decade.",
75 | "What are the ethical considerations in developing advanced AI systems?"
76 | ]
77 |
78 | total_calls = len(providers_info) * len(prompts)
79 | call_count = 0
80 |
81 | for provider_name, provider in providers_info:
82 | # Use default model unless specific logic requires otherwise
83 | model_to_use = provider.get_default_model()
84 | if not model_to_use:
85 | logger.warning(f"No default model found for {provider_name}, skipping provider.", emoji_key="warning")
86 | continue # Skip this provider if no default model
87 |
88 | for prompt in prompts:
89 | call_count += 1
90 | logger.info(
91 | f"Simulating call ({call_count}/{total_calls}) for {provider_name}",
92 | emoji_key="processing"
93 | )
94 |
95 | try:
96 | start_time = time.time()
97 | result = await provider.generate_completion(
98 | prompt=prompt,
99 | model=model_to_use, # Use determined model
100 | temperature=0.7,
101 | max_tokens=150
102 | )
103 | completion_time = time.time() - start_time
104 |
105 | # Track costs if tracker provided
106 | if tracker:
107 | tracker.add_call(result)
108 |
109 | # Record metrics using the actual model returned in the result
110 | metrics.record_request(
111 | provider=provider_name,
112 | model=result.model, # Use model from result
113 | input_tokens=result.input_tokens,
114 | output_tokens=result.output_tokens,
115 | cost=result.cost,
116 | duration=completion_time,
117 | success=True
118 | )
119 |
120 | # Log less verbosely during simulation
121 | # logger.success("Completion generated", emoji_key="success", provider=provider_name, model=result.model)
122 |
123 | await asyncio.sleep(0.2) # Shorter delay
124 |
125 | except Exception as e:
126 | logger.error(f"Error simulating completion for {provider_name}: {str(e)}", emoji_key="error")
127 | metrics.record_request(
128 | provider=provider_name,
129 | model=model_to_use, # Log error against intended model
130 | input_tokens=0, # Assume 0 tokens on error for simplicity
131 | output_tokens=0,
132 | cost=0.0,
133 | duration=time.time() - start_time, # Log duration even on error
134 | success=False # Mark as failed
135 | )
136 |
137 | logger.info("Finished simulating LLM usage", emoji_key="complete")
138 | return metrics
139 |
140 |
141 | async def demonstrate_metrics_tracking(tracker: CostTracker = None):
142 | """Demonstrate metrics tracking functionality using Rich."""
143 | console.print(Rule("[bold blue]Metrics Tracking Demonstration[/bold blue]"))
144 | logger.info("Starting metrics tracking demonstration", emoji_key="start")
145 |
146 | metrics = get_metrics_tracker(reset_on_start=True)
147 | await simulate_llm_usage(tracker)
148 | stats = metrics.get_stats()
149 |
150 | # Use the standardized display utility instead of custom code
151 | display_analytics_metrics(stats)
152 |
153 | return stats
154 |
155 |
156 | async def demonstrate_analytics_reporting():
157 | """Demonstrate analytics reporting functionality."""
158 | console.print(Rule("[bold blue]Analytics Reporting Demonstration[/bold blue]"))
159 | logger.info("Starting analytics reporting demonstration", emoji_key="start")
160 |
161 | metrics = get_metrics_tracker()
162 | stats = metrics.get_stats()
163 | if stats["general"]["requests_total"] == 0:
164 | logger.warning("No metrics data found. Running simulation first.", emoji_key="warning")
165 | await simulate_llm_usage()
166 | stats = metrics.get_stats()
167 |
168 | # --- Perform calculations directly from stats ---
169 | general_stats = stats.get("general", {})
170 | provider_stats = stats.get("providers", {})
171 | model_stats = stats.get("models", {})
172 | daily_usage_stats = stats.get("daily_usage", [])
173 | total_cost = general_stats.get("cost_total", 0.0)
174 | total_tokens = general_stats.get("tokens_total", 0)
175 |
176 | # Calculate cost by provider
177 | cost_by_provider = []
178 | if total_cost > 0:
179 | cost_by_provider = [
180 | {
181 | "name": provider,
182 | "value": data.get("cost", 0.0),
183 | "percentage": data.get("cost", 0.0) / total_cost * 100 if total_cost > 0 else 0,
184 | }
185 | for provider, data in provider_stats.items()
186 | ]
187 | cost_by_provider.sort(key=lambda x: x["value"], reverse=True)
188 |
189 | # Calculate cost by model
190 | cost_by_model = []
191 | if total_cost > 0:
192 | cost_by_model = [
193 | {
194 | "name": model,
195 | "value": data.get("cost", 0.0),
196 | "percentage": data.get("cost", 0.0) / total_cost * 100 if total_cost > 0 else 0,
197 | }
198 | for model, data in model_stats.items()
199 | ]
200 | cost_by_model.sort(key=lambda x: x["value"], reverse=True)
201 |
202 | # Calculate tokens by provider
203 | tokens_by_provider = []
204 | if total_tokens > 0:
205 | tokens_by_provider = [
206 | {
207 | "name": provider,
208 | "value": data.get("tokens", 0),
209 | "percentage": data.get("tokens", 0) / total_tokens * 100 if total_tokens > 0 else 0,
210 | }
211 | for provider, data in provider_stats.items()
212 | ]
213 | tokens_by_provider.sort(key=lambda x: x["value"], reverse=True)
214 |
215 | # Calculate tokens by model
216 | tokens_by_model = []
217 | if total_tokens > 0:
218 | tokens_by_model = [
219 | {
220 | "name": model,
221 | "value": data.get("tokens", 0),
222 | "percentage": data.get("tokens", 0) / total_tokens * 100 if total_tokens > 0 else 0,
223 | }
224 | for model, data in model_stats.items()
225 | ]
226 | tokens_by_model.sort(key=lambda x: x["value"], reverse=True)
227 |
228 | # Calculate daily cost trend (simplified: just show daily cost, no % change)
229 | daily_cost_trend = [
230 | {
231 | "date": day.get("date"),
232 | "cost": day.get("cost", 0.0)
233 | }
234 | for day in daily_usage_stats
235 | ]
236 | daily_cost_trend.sort(key=lambda x: x["date"]) # Sort by date
237 | # --------------------------------------------------
238 |
239 | # Display reports using tables (using the calculated data)
240 | # Provider cost report
241 | if cost_by_provider:
242 | provider_cost_table = Table(title="[bold green]Cost by Provider[/bold green]", box=box.ROUNDED)
243 | provider_cost_table.add_column("Provider", style="magenta")
244 | provider_cost_table.add_column("Cost", style="green", justify="right")
245 | provider_cost_table.add_column("Percentage", style="cyan", justify="right")
246 |
247 | for item in cost_by_provider:
248 | provider_cost_table.add_row(
249 | escape(item["name"]),
250 | f"${item['value']:.6f}",
251 | f"{item['percentage']:.1f}%"
252 | )
253 | console.print(provider_cost_table)
254 | console.print()
255 |
256 | # Model cost report
257 | if cost_by_model:
258 | model_cost_table = Table(title="[bold green]Cost by Model[/bold green]", box=box.ROUNDED)
259 | model_cost_table.add_column("Model", style="blue")
260 | model_cost_table.add_column("Cost", style="green", justify="right")
261 | model_cost_table.add_column("Percentage", style="cyan", justify="right")
262 |
263 | for item in cost_by_model:
264 | model_cost_table.add_row(
265 | escape(item["name"]),
266 | f"${item['value']:.6f}",
267 | f"{item['percentage']:.1f}%"
268 | )
269 | console.print(model_cost_table)
270 | console.print()
271 |
272 | # Tokens by provider report
273 | if tokens_by_provider:
274 | tokens_provider_table = Table(title="[bold green]Tokens by Provider[/bold green]", box=box.ROUNDED)
275 | tokens_provider_table.add_column("Provider", style="magenta")
276 | tokens_provider_table.add_column("Tokens", style="white", justify="right")
277 | tokens_provider_table.add_column("Percentage", style="cyan", justify="right")
278 |
279 | for item in tokens_by_provider:
280 | tokens_provider_table.add_row(
281 | escape(item["name"]),
282 | f"{item['value']:,}",
283 | f"{item['percentage']:.1f}%"
284 | )
285 | console.print(tokens_provider_table)
286 | console.print()
287 |
288 | # Tokens by model report
289 | if tokens_by_model:
290 | tokens_model_table = Table(title="[bold green]Tokens by Model[/bold green]", box=box.ROUNDED)
291 | tokens_model_table.add_column("Model", style="blue")
292 | tokens_model_table.add_column("Tokens", style="white", justify="right")
293 | tokens_model_table.add_column("Percentage", style="cyan", justify="right")
294 |
295 | for item in tokens_by_model:
296 | tokens_model_table.add_row(
297 | escape(item["name"]),
298 | f"{item['value']:,}",
299 | f"{item['percentage']:.1f}%"
300 | )
301 | console.print(tokens_model_table)
302 | console.print()
303 |
304 | # Daily cost trend report
305 | if daily_cost_trend:
306 | daily_trend_table = Table(title="[bold green]Daily Cost Trend[/bold green]", box=box.ROUNDED)
307 | daily_trend_table.add_column("Date", style="yellow")
308 | daily_trend_table.add_column("Cost", style="green", justify="right")
309 | # daily_trend_table.add_column("Change", style="cyan", justify="right") # Removed change calculation for simplicity
310 |
311 | for item in daily_cost_trend:
312 | # change_str = f"{item.get('change', 0):.1f}%" if 'change' in item else "N/A"
313 | # change_style = ""
314 | # if 'change' in item:
315 | # if item['change'] > 0:
316 | # change_style = "[red]+"
317 | # elif item['change'] < 0:
318 | # change_style = "[green]"
319 |
320 | daily_trend_table.add_row(
321 | item["date"],
322 | f"${item['cost']:.6f}"
323 | # f"{change_style}{change_str}[/]" if change_style else change_str
324 | )
325 | console.print(daily_trend_table)
326 | console.print()
327 |
328 | # Return the calculated data instead of None
329 | return {
330 | "cost_by_provider": cost_by_provider,
331 | "cost_by_model": cost_by_model,
332 | "tokens_by_provider": tokens_by_provider,
333 | "tokens_by_model": tokens_by_model,
334 | "daily_cost_trend": daily_cost_trend
335 | }
336 |
337 |
338 | async def demonstrate_real_time_monitoring():
339 | """Demonstrate real-time metrics monitoring using Rich Live."""
340 | console.print(Rule("[bold blue]Real-Time Monitoring Demonstration[/bold blue]"))
341 | logger.info("Starting real-time monitoring (updates every 2s for 10s)", emoji_key="start")
342 |
343 | metrics = get_metrics_tracker() # Use existing tracker
344 |
345 | def generate_stats_table() -> Table:
346 | """Generates a Rich Table with current stats."""
347 | stats = metrics.get_stats()["general"]
348 | table = Table(title="Live LLM Usage Stats", box=box.ROUNDED)
349 | table.add_column("Metric", style="cyan")
350 | table.add_column("Value", style="white", justify="right")
351 | table.add_row("Total Requests", f"{stats['requests_total']:,}")
352 | table.add_row("Total Tokens", f"{stats['tokens_total']:,}")
353 | table.add_row("Total Cost", f"${stats['cost_total']:.6f}")
354 | table.add_row("Total Errors", f"{stats['errors_total']:,}")
355 | return table
356 |
357 | try:
358 | with Live(generate_stats_table(), refresh_per_second=0.5, console=console) as live:
359 | # Simulate some activity in the background while monitoring
360 | # We could run simulate_llm_usage again, but let's just wait for demo purposes
361 | end_time = time.time() + 10 # Monitor for 10 seconds
362 | while time.time() < end_time:
363 | # In a real app, other tasks would be modifying metrics here
364 | live.update(generate_stats_table())
365 | await asyncio.sleep(2) # Update display every 2 seconds
366 |
367 | # Final update
368 | live.update(generate_stats_table())
369 |
370 | except Exception as e:
371 | logger.error(f"Error during live monitoring: {e}", emoji_key="error", exc_info=True)
372 |
373 | logger.info("Finished real-time monitoring demonstration", emoji_key="complete")
374 | console.print()
375 |
376 |
377 | async def main():
378 | """Run all analytics and reporting demonstrations."""
379 | tracker = CostTracker() # Create cost tracker instance
380 | try:
381 | # Demonstrate metrics tracking (includes simulation)
382 | await demonstrate_metrics_tracking(tracker)
383 |
384 | # Demonstrate report generation
385 | await demonstrate_analytics_reporting()
386 |
387 | # Demonstrate real-time monitoring
388 | await demonstrate_real_time_monitoring()
389 |
390 | # Display final cost summary
391 | tracker.display_summary(console)
392 |
393 | except Exception as e:
394 | logger.critical(f"Analytics demo failed: {str(e)}", emoji_key="critical", exc_info=True)
395 | return 1
396 |
397 | logger.success("Analytics & Reporting Demo Finished Successfully!", emoji_key="complete")
398 | return 0
399 |
400 |
401 | if __name__ == "__main__":
402 | exit_code = asyncio.run(main())
403 | sys.exit(exit_code)
```
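For readers who want to try the `Live`-refresh pattern from `demonstrate_real_time_monitoring` outside the demo, the snippet below is a minimal, self-contained sketch. The `fake_stats()` helper is a hypothetical stand-in for `metrics.get_stats()["general"]` and is not part of the project; everything else uses only the `rich` APIs the demo already imports.

```python
import asyncio
import random
import time

from rich.console import Console
from rich.live import Live
from rich.table import Table

console = Console()


def fake_stats() -> dict:
    # Hypothetical stand-in for the real metrics tracker's general stats.
    return {
        "requests_total": random.randint(0, 50),
        "tokens_total": random.randint(0, 20_000),
        "cost_total": random.random() / 100,
    }


def build_table(stats: dict) -> Table:
    table = Table(title="Live LLM Usage Stats")
    table.add_column("Metric")
    table.add_column("Value", justify="right")
    table.add_row("Total Requests", f"{stats['requests_total']:,}")
    table.add_row("Total Tokens", f"{stats['tokens_total']:,}")
    table.add_row("Total Cost", f"${stats['cost_total']:.6f}")
    return table


async def monitor(duration_s: float = 10.0, interval_s: float = 2.0) -> None:
    # Same loop shape as the demo: poll, rebuild the table, update the Live view.
    end_time = time.time() + duration_s
    with Live(build_table(fake_stats()), console=console, refresh_per_second=0.5) as live:
        while time.time() < end_time:
            live.update(build_table(fake_stats()))
            await asyncio.sleep(interval_s)


if __name__ == "__main__":
    asyncio.run(monitor())
```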
--------------------------------------------------------------------------------
/examples/cost_optimization.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python
2 | """Cost optimization examples for Ultimate MCP Server."""
3 | import asyncio
4 | import sys
5 | from pathlib import Path
6 |
7 | # Add project root to path for imports when running as script
8 | sys.path.insert(0, str(Path(__file__).parent.parent))
9 |
10 | from fastmcp import FastMCP
11 | from rich import box
12 | from rich.markup import escape
13 | from rich.panel import Panel
14 | from rich.rule import Rule
15 | from rich.table import Table
16 |
17 | from ultimate_mcp_server.constants import COST_PER_MILLION_TOKENS, DEFAULT_MODELS, Provider
18 | from ultimate_mcp_server.core.providers.base import get_provider
19 | from ultimate_mcp_server.tools.optimization import estimate_cost, recommend_model
20 | from ultimate_mcp_server.utils import get_logger
21 |
22 | # --- Import display utilities ---
23 | from ultimate_mcp_server.utils.display import CostTracker, parse_and_display_result
24 |
25 | # --- Add Rich Imports ---
26 | from ultimate_mcp_server.utils.logging.console import console
27 | from ultimate_mcp_server.utils.text import count_tokens # Import proper token counting function
28 |
29 | # ----------------------
30 |
31 | # Initialize logger
32 | logger = get_logger("example.cost_optimization")
33 |
34 | # Initialize FastMCP server
35 | mcp = FastMCP("Cost Optimization Demo")
36 |
37 | # Create optimization tools instance with MCP server - this registers all the tools
38 | # OptimizationTools(mcp)
39 |
40 | # Manually register the tools needed for this demo on the local MCP instance
41 | mcp.tool()(estimate_cost)
42 | mcp.tool()(recommend_model)
43 | logger.info("Manually registered optimization tools (estimate_cost, recommend_model).")
44 |
45 | # Helper function to unpack tool results that might be returned as a list
46 | def unpack_tool_result(result):
47 | """
48 | Handles the case where tool results are returned as a list instead of a dictionary.
49 |
50 | Args:
51 | result: Result from an MCP tool call
52 |
53 | Returns:
54 | Unpacked result as a dictionary if possible, or the original result
55 | """
56 | # Check if result is a list with content
57 | if isinstance(result, list) and result:
58 | # Try to get the first item if it's a dictionary
59 | first_item = result[0]
60 | if isinstance(first_item, dict):
61 | return first_item
62 |
63 | # Handle case where item might be an object with a text attribute
64 | if hasattr(first_item, 'text'):
65 | try:
66 | import json
67 | # Try to parse as JSON
68 | return json.loads(first_item.text)
69 | except (json.JSONDecodeError, AttributeError):
70 | pass
71 |
72 | # Return the original result if we can't unpack
73 | return result
74 |
75 | # Modified to use provider system directly
76 | async def _execute_with_recommended_model(balanced_rec, prompt, estimated_output_tokens, tracker: CostTracker):
77 | """Execute completion with the recommended model using provider system."""
78 | if not balanced_rec:
79 | logger.error("No recommended model provided", emoji_key="error")
80 | return
81 |
82 | provider_name = _get_provider_for_model(balanced_rec)
83 | if not provider_name:
84 | logger.error(f"Could not determine provider for recommended model {balanced_rec}", emoji_key="error")
85 | return
86 |
87 | try:
88 | # Get the provider without explicitly passing an API key (let the provider system handle it)
89 | provider = await get_provider(provider_name)
90 | await provider.initialize()
91 |
92 | # Generate the completion and get accurate token counts from the response
93 | result = await provider.generate_completion(
94 | prompt=prompt,
95 | model=balanced_rec,
96 | temperature=0.7,
97 | max_tokens=estimated_output_tokens
98 | )
99 |
100 | # Track cost
101 | tracker.add_call(result)
102 |
103 | # Display result information using actual token counts from the API response
104 | logger.success(f"Completion with {balanced_rec} successful", emoji_key="success")
105 | logger.info(f"Actual input tokens: {result.input_tokens}, output tokens: {result.output_tokens}", emoji_key="token")
106 | logger.info(f"Cost based on actual usage: ${result.cost:.6f}", emoji_key="cost")
107 |
108 | console.print(Panel(
109 | escape(result.text.strip()),
110 | title=f"[bold green]Response from {escape(balanced_rec)}[/bold green]",
111 | border_style="green"
112 | ))
113 |
114 | # Display Stats using display utility
115 | parse_and_display_result(
116 | f"Execution Stats for {balanced_rec}",
117 | None,
118 | {
119 | "model": balanced_rec,
120 | "provider": provider_name,
121 | "cost": result.cost,
122 | "tokens": {
123 | "input": result.input_tokens,
124 | "output": result.output_tokens,
125 | "total": result.input_tokens + result.output_tokens
126 | },
127 | "processing_time": result.processing_time
128 | }
129 | )
130 |
131 | except Exception as e:
132 | logger.error(f"Error running completion with {balanced_rec}: {e}", emoji_key="error", exc_info=True)
133 |
134 | async def demonstrate_cost_optimization(tracker: CostTracker):
135 | """Demonstrate cost optimization features using Rich."""
136 | console.print(Rule("[bold blue]Cost Optimization Demonstration[/bold blue]"))
137 | logger.info("Starting cost optimization demonstration", emoji_key="start")
138 |
139 | prompt = """
140 | Write a comprehensive analysis of how machine learning is being applied in the healthcare industry,
141 | focusing on diagnostic tools, treatment optimization, and administrative efficiency.
142 | Include specific examples and potential future developments.
143 | """
144 |
145 | # Note for the demo: Use proper token counting, not character estimation
146 | logger.info("Calculating tokens for the prompt with tiktoken", emoji_key="info")
147 | # Use default models from constants for the initial token count display
148 | models_to_show = list(DEFAULT_MODELS.values())
149 | for model_name in models_to_show:
150 | token_count = count_tokens(prompt, model_name)
151 | logger.info(f"Model {model_name}: {token_count} input tokens", emoji_key="token")
152 |
153 | estimated_output_tokens = 500 # Estimate output length for the prompt
154 |
155 | # --- Cost Estimation ---
156 | console.print(Rule("[cyan]Cost Estimation[/cyan]"))
157 | logger.info("Estimating costs for different models", emoji_key="cost")
158 |
159 | # Dynamically get models from the constants file
160 | models_to_compare = list(COST_PER_MILLION_TOKENS.keys())
161 |
162 | cost_table = Table(title="Estimated Costs", box=box.ROUNDED, show_header=True)
163 | cost_table.add_column("Model", style="magenta")
164 | cost_table.add_column("Input Tokens", style="white", justify="right")
165 | cost_table.add_column("Output Tokens", style="white", justify="right")
166 | cost_table.add_column("Input Rate ($/M)", style="dim blue", justify="right")
167 | cost_table.add_column("Output Rate ($/M)", style="dim blue", justify="right")
168 | cost_table.add_column("Estimated Cost", style="green", justify="right")
169 |
170 | cost_estimates = []
171 | for model_name_only in models_to_compare: # Renamed variable for clarity
172 | try:
173 | # Determine provider and construct full model name
174 | provider_name = _get_provider_for_model(model_name_only)
175 | if not provider_name:
176 | logger.warning(f"Could not determine provider for model '{model_name_only}'. Skipping cost estimation.", emoji_key="warning")
177 | cost_table.add_row(escape(model_name_only), "-", "-", "-", "-", "[dim yellow]Unknown provider[/dim yellow]")
178 | continue
179 |
180 | full_model_name = f"{provider_name}/{model_name_only}"
181 |
182 | # Call the estimate_cost tool with the prefixed model name
183 | raw_result = await mcp.call_tool("estimate_cost", {
184 | "prompt": prompt,
185 | "model": full_model_name, # Use the prefixed name
186 | "max_tokens": estimated_output_tokens
187 | })
188 |
189 | # Unpack the result
190 | estimate_result = unpack_tool_result(raw_result)
191 |
192 | if "error" in estimate_result:
193 | # Log the error with the original model name for clarity in logs
194 | logger.warning(f"Could not estimate cost for {model_name_only}: {estimate_result['error']}", emoji_key="warning")
195 | cost_table.add_row(escape(model_name_only), "-", "-", "-", "-", f"[dim red]{estimate_result['error']}[/dim red]")
196 | else:
197 | cost_estimates.append(estimate_result) # Store for later use if needed
198 | cost_table.add_row(
199 | escape(model_name_only), # Display original model name in table
200 | str(estimate_result["tokens"]["input"]),
201 | str(estimate_result["tokens"]["output"]),
202 | f"${estimate_result['rate']['input']:.2f}",
203 | f"${estimate_result['rate']['output']:.2f}",
204 | f"${estimate_result['cost']:.6f}"
205 | )
206 | except Exception as e:
207 | logger.error(f"Error calling estimate_cost for {model_name_only}: {e}", emoji_key="error", exc_info=True)
208 | cost_table.add_row(escape(model_name_only), "-", "-", "-", "-", "[red]Error[/red]")
209 |
210 | console.print(cost_table)
211 | console.print()
212 |
213 | # --- Model Recommendation ---
214 | console.print(Rule("[cyan]Model Recommendation[/cyan]"))
215 | logger.info("Getting model recommendations based on different priorities", emoji_key="recommend")
216 |
217 | # Define task parameters for recommendation
218 | task_info = {
219 | "task_type": "analysis_generation",
220 | "expected_input_length": len(prompt),
221 | "expected_output_length": estimated_output_tokens * 4, # Convert tokens back to chars approx
222 | "required_capabilities": ["reasoning", "knowledge"],
223 | "max_cost": 0.005 # Example max cost constraint
224 | }
225 |
226 | priorities = ["balanced", "cost", "quality", "speed"]
227 |
228 | recommendation_table = Table(title="Model Recommendations", box=box.ROUNDED, show_header=True)
229 | recommendation_table.add_column("Priority", style="yellow")
230 | recommendation_table.add_column("1st Rec", style="magenta")
231 | recommendation_table.add_column("Cost", style="green", justify="right")
232 | recommendation_table.add_column("Quality", style="blue", justify="right")
233 | recommendation_table.add_column("Speed", style="cyan", justify="right")
234 | recommendation_table.add_column("Score", style="white", justify="right")
235 | recommendation_table.add_column("Other Recs", style="dim")
236 |
237 | recommendation_results = {}
238 |
239 | for priority in priorities:
240 | try:
241 | # Call the recommend_model tool
242 | raw_result = await mcp.call_tool("recommend_model", {
243 | **task_info,
244 | "priority": priority
245 | })
246 |
247 | # Unpack the result
248 | rec_result = unpack_tool_result(raw_result)
249 |
250 | if "error" in rec_result:
251 | logger.warning(f"Could not get recommendations for priority '{priority}': {rec_result['error']}", emoji_key="warning")
252 | recommendation_table.add_row(priority, "-", "-", "-", "-", "-", f"[dim red]{rec_result['error']}[/dim red]")
253 | elif not rec_result.get("recommendations"):
254 | logger.info(f"No models met criteria for priority '{priority}'", emoji_key="info")
255 | recommendation_table.add_row(priority, "[dim]None[/dim]", "-", "-", "-", "-", "No models fit criteria")
256 | else:
257 | recs = rec_result["recommendations"]
258 | top_rec = recs[0]
259 | other_recs_str = ", ".join([escape(r["model"]) for r in recs[1:]]) if len(recs) > 1 else "None"
260 |
261 | cost_key = "estimated_cost"
262 | quality_key = "quality_score"
263 | speed_key = "speed_score"
264 |
265 | if 'score' not in top_rec:
266 | if priority == "cost":
267 | # Lower cost is better
268 | score = 10.0 / (float(top_rec.get(cost_key, 1.0)) + 0.001) # Use .get with default
269 | elif priority == "quality":
270 | # Higher quality is better
271 | score = float(top_rec.get(quality_key, 0))
272 | elif priority == "speed":
273 | # Lower speed value is better
274 | score = 10.0 - float(top_rec.get(speed_key, 5))
275 | else: # balanced
276 | # Balanced score - use .get for safety
277 | q = float(top_rec.get(quality_key, 5))
278 | c = float(top_rec.get(cost_key, 0.001))
279 | s = float(top_rec.get(speed_key, 3))
280 | score = (q * 0.5 - c * 100.0 - s * 0.3)
281 | else:
282 | score = top_rec['score']
283 |
284 | recommendation_table.add_row(
285 | priority,
286 | escape(top_rec["model"]),
287 | f"${top_rec.get(cost_key, 0.0):.6f}", # Use .get
288 | str(top_rec.get(quality_key, '-')), # Use .get
289 | str(top_rec.get(speed_key, '-')), # Use .get
290 | f"{score:.2f}",
291 | other_recs_str
292 | )
293 |
294 | # Store for later use
295 | recommendation_results[priority] = rec_result
296 |
297 | except Exception as e:
298 | logger.error(f"Error calling recommend_model for priority {priority}: {e}", emoji_key="error", exc_info=True)
299 | recommendation_table.add_row(priority, "-", "-", "-", "-", "-", "[red]Error[/red]")
300 |
301 | console.print(recommendation_table)
302 | console.print()
303 |
304 | # --- Run with Recommended Model (Example) ---
305 | # Find the balanced recommendation
306 | balanced_rec = None
307 | try:
308 | # Use stored result if available
309 | if "balanced" in recommendation_results:
310 | rec_result = recommendation_results["balanced"]
311 | if rec_result.get("recommendations"):
312 | balanced_rec = rec_result["recommendations"][0]["model"]
313 | else:
314 | # Otherwise, try to get a fresh recommendation
315 | raw_result = await mcp.call_tool("recommend_model", {
316 | **task_info,
317 | "priority": "balanced"
318 | })
319 |
320 | # Unpack the result
321 | rec_result = unpack_tool_result(raw_result)
322 |
323 | if rec_result.get("recommendations"):
324 | balanced_rec = rec_result["recommendations"][0]["model"]
325 | except Exception as e:
326 | logger.error(f"Error getting balanced recommendation: {e}", emoji_key="error")
327 | pass # Ignore errors here, just trying to get a model
328 |
329 | if balanced_rec:
330 | console.print(Rule(f"[cyan]Executing with Recommended Model ({escape(balanced_rec)})[/cyan]"))
331 | logger.info(f"Running completion with balanced recommendation: {balanced_rec}", emoji_key="processing")
332 |
333 | # Use the new helper function instead of direct API key handling
334 | await _execute_with_recommended_model(balanced_rec, prompt, estimated_output_tokens, tracker)
335 | else:
336 | logger.info("Could not get a balanced recommendation to execute.", emoji_key="info")
337 |
338 | # Display cost summary at the end of the demonstration
339 | tracker.display_summary(console)
340 |
341 |
342 | def _get_provider_for_model(model_name: str) -> str:
343 | """Helper to determine provider from model name (handles prefixed names)."""
344 | if '/' in model_name:
345 | # If already prefixed, extract provider
346 | provider = model_name.split('/')[0]
347 | # Validate against known providers if necessary
348 | known_providers = [p.value for p in Provider]
349 | if provider in known_providers:
350 | return provider
351 | else:
352 | logger.warning(f"Unknown or ambiguous provider prefix in '{model_name}'")
353 | return None
354 |
355 | # Fallback for non-prefixed names (original logic)
356 | if model_name.startswith("gpt-"):
357 | return Provider.OPENAI.value
358 | elif model_name.startswith("claude-"):
359 | return Provider.ANTHROPIC.value
360 | elif model_name.startswith("deepseek-"):
361 | return Provider.DEEPSEEK.value
362 | elif model_name.startswith("gemini-"):
363 | return Provider.GEMINI.value
364 | elif model_name.startswith("grok-"):
365 | return Provider.GROK.value
366 |
367 | # Add specific non-prefixed model checks if needed
368 | if model_name in ["o1-preview", "o3-mini"]: # Example
369 | return Provider.OPENAI.value
370 |
371 | logger.warning(f"Could not determine provider for model '{model_name}'")
372 | return None
373 |
374 |
375 | async def main():
376 | """Run cost optimization examples."""
377 | tracker = CostTracker() # Instantiate tracker
378 | try:
379 | await demonstrate_cost_optimization(tracker) # Pass tracker
380 |
381 | except Exception as e:
382 | logger.critical(f"Example failed: {str(e)}", emoji_key="critical")
383 | return 1
384 |
385 | return 0
386 |
387 |
388 | if __name__ == "__main__":
389 | # Run the examples
390 | exit_code = asyncio.run(main())
391 | sys.exit(exit_code)
```
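As a rough aid for reading the "Estimated Costs" table produced above, the sketch below shows the per-million-token arithmetic such an estimate is typically based on. The rate values are made-up placeholders rather than entries from `COST_PER_MILLION_TOKENS`, and the authoritative calculation remains the `estimate_cost` tool the demo calls.

```python
def estimate_cost_usd(
    input_tokens: int,
    output_tokens: int,
    input_rate_per_million: float,
    output_rate_per_million: float,
) -> float:
    # Cost = tokens / 1M * rate-per-million, summed over input and output.
    return (
        input_tokens / 1_000_000 * input_rate_per_million
        + output_tokens / 1_000_000 * output_rate_per_million
    )


if __name__ == "__main__":
    # Example: ~60 prompt tokens and the demo's 500-token output estimate at
    # hypothetical rates of $0.50 (input) / $1.50 (output) per million tokens.
    print(f"${estimate_cost_usd(60, 500, 0.50, 1.50):.6f}")  # -> $0.000780
```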
--------------------------------------------------------------------------------
/ultimate_mcp_server/core/ums_api/ums_database.py:
--------------------------------------------------------------------------------
```python
1 | """Database utilities for UMS API."""
2 |
3 | import sqlite3
4 | import math
5 | from pathlib import Path
6 | from collections import Counter, defaultdict
7 | from typing import Any, Dict, List
8 |
9 | # Database path configuration
10 | def get_database_path() -> str:
11 | """Get the path to the unified agent memory database."""
12 | project_root = Path(__file__).resolve().parent.parent.parent.parent
13 | storage_dir = project_root / "storage"
14 | return str(storage_dir / "unified_agent_memory.db")
15 |
16 |
17 | def get_db_connection() -> sqlite3.Connection:
18 | """
19 | Return a SQLite connection with row factory.
20 |
21 | This function creates a connection to the unified agent memory database
22 | and configures it with a row factory for easier data access.
23 |
24 | Returns:
25 | sqlite3.Connection: Database connection with row factory configured
26 | """
27 | conn = sqlite3.connect(get_database_path())
28 | conn.row_factory = sqlite3.Row
29 | return conn
30 |
31 |
32 | def execute_query(query: str, params: tuple = None) -> list:
33 | """
34 | Execute a SELECT query and return results as a list of dictionaries.
35 |
36 | Args:
37 | query: SQL SELECT query to execute
38 | params: Optional parameters for the query
39 |
40 | Returns:
41 | List of dictionaries representing the query results
42 | """
43 | conn = get_db_connection()
44 | cursor = conn.cursor()
45 |
46 | try:
47 | if params:
48 | cursor.execute(query, params)
49 | else:
50 | cursor.execute(query)
51 |
52 | columns = [description[0] for description in cursor.description]
53 | results = [dict(zip(columns, row, strict=False)) for row in cursor.fetchall()]
54 |
55 | return results
56 | finally:
57 | conn.close()
58 |
59 |
60 | def execute_update(query: str, params: tuple = None) -> int:
61 | """
62 | Execute an INSERT, UPDATE, or DELETE query and return the number of affected rows.
63 |
64 | Args:
65 | query: SQL query to execute
66 | params: Optional parameters for the query
67 |
68 | Returns:
69 | Number of rows affected by the query
70 | """
71 | conn = get_db_connection()
72 | cursor = conn.cursor()
73 |
74 | try:
75 | if params:
76 | cursor.execute(query, params)
77 | else:
78 | cursor.execute(query)
79 |
80 | conn.commit()
81 | return cursor.rowcount
82 | finally:
83 | conn.close()
84 |
85 |
86 | def ensure_database_exists() -> bool:
87 | """
88 | Ensure the database file exists and is accessible.
89 |
90 | Returns:
91 | True if the database exists and is accessible, False otherwise
92 | """
93 | try:
94 | db_path = get_database_path()
95 | return Path(db_path).exists()
96 | except Exception:
97 | return False
98 | # ---------- Helper Functions for Data Processing ----------
99 | def _dict_depth(d: Dict[str, Any], depth: int = 0) -> int:
100 | if not isinstance(d, dict) or not d:
101 | return depth
102 | return max(_dict_depth(v, depth + 1) for v in d.values())
103 | def _count_values(d: Dict[str, Any]) -> int:
104 | cnt = 0
105 | for v in d.values():
106 | if isinstance(v, dict):
107 | cnt += _count_values(v)
108 | elif isinstance(v, list):
109 | cnt += len(v)
110 | else:
111 | cnt += 1
112 | return cnt
113 | def calculate_state_complexity(state_data: Dict[str, Any]) -> float:
114 | if not state_data:
115 | return 0.0
116 | comp = (
117 | len(state_data) * 5 + _dict_depth(state_data) * 10 + _count_values(state_data) * 0.5
118 | )
119 | return round(min(100.0, comp), 2)
120 | def compute_state_diff(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]:
121 | diff = {"added": {}, "removed": {}, "modified": {}, "magnitude": 0.0}
122 | keys = set(a) | set(b)
123 | changed = 0
124 | for k in keys:
125 | if k not in a:
126 | diff["added"][k] = b[k]
127 | changed += 1
128 | elif k not in b:
129 | diff["removed"][k] = a[k]
130 | changed += 1
131 | elif a[k] != b[k]:
132 | diff["modified"][k] = {"before": a[k], "after": b[k]}
133 | changed += 1
134 | if keys:
135 | diff["magnitude"] = (changed / len(keys)) * 100
136 | return diff
137 |
138 |
139 | # ---------- Timeline Analysis Functions ----------
140 | def generate_timeline_segments(
141 | timeline_data: List[Dict[str, Any]], granularity: str, hours: int
142 | ) -> List[Dict[str, Any]]:
143 | """Generate timeline segments summarising state counts / complexity over time."""
144 | if not timeline_data:
145 | return []
146 |
147 | start_ts = min(item["timestamp"] for item in timeline_data)
148 | end_ts = max(item["timestamp"] for item in timeline_data)
149 |
150 | seg_seconds = 1 if granularity == "second" else 60 if granularity == "minute" else 3600
151 | segments: List[Dict[str, Any]] = []
152 | current = start_ts
153 |
154 | while current < end_ts:
155 | seg_end = current + seg_seconds
156 | seg_states = [it for it in timeline_data if current <= it["timestamp"] < seg_end]
157 | if seg_states:
158 | segments.append(
159 | {
160 | "start_time": current,
161 | "end_time": seg_end,
162 | "state_count": len(seg_states),
163 | "avg_complexity": sum(s["complexity_score"] for s in seg_states)
164 | / len(seg_states),
165 | "max_change_magnitude": max(s["change_magnitude"] for s in seg_states),
166 | "dominant_type": Counter(
167 | s["state_type"] for s in seg_states
168 | ).most_common(1)[0][0],
169 | }
170 | )
171 | current = seg_end
172 | return segments
173 | def calculate_timeline_stats(timeline_data: List[Dict[str, Any]]) -> Dict[str, Any]:
174 | """Return aggregate stats about timeline complexity / changes."""
175 | if not timeline_data:
176 | return {}
177 |
178 | complexities = [it["complexity_score"] for it in timeline_data]
179 | changes = [it["change_magnitude"] for it in timeline_data if it["change_magnitude"] > 0]
180 | stypes = Counter(it["state_type"] for it in timeline_data)
181 | return {
182 | "avg_complexity": sum(complexities) / len(complexities),
183 | "max_complexity": max(complexities),
184 | "avg_change_magnitude": (sum(changes) / len(changes)) if changes else 0,
185 | "max_change_magnitude": max(changes) if changes else 0,
186 | "most_common_type": stypes.most_common(1)[0][0] if stypes else None,
187 | "type_distribution": dict(stypes),
188 | }
189 |
190 | # ---------- Action Monitoring Helper Functions ----------
191 | def get_action_status_indicator(status: str, execution_time: float) -> dict:
192 | """Get status indicator with color and icon for action status"""
193 | indicators = {
194 | "running": {"color": "blue", "icon": "play", "label": "Running"},
195 | "executing": {"color": "blue", "icon": "cpu", "label": "Executing"},
196 | "in_progress": {"color": "orange", "icon": "clock", "label": "In Progress"},
197 | "completed": {"color": "green", "icon": "check", "label": "Completed"},
198 | "failed": {"color": "red", "icon": "x", "label": "Failed"},
199 | "cancelled": {"color": "gray", "icon": "stop", "label": "Cancelled"},
200 | "timeout": {"color": "yellow", "icon": "timer-off", "label": "Timeout"},
201 | }
202 |
203 | indicator = indicators.get(
204 | status, {"color": "gray", "icon": "help", "label": "Unknown"}
205 | )
206 |
207 | # Add urgency flag for long-running actions
208 | if (
209 | status in ["running", "executing", "in_progress"] and execution_time > 120
210 | ): # 2 minutes
211 | indicator["urgency"] = "high"
212 | elif (
213 | status in ["running", "executing", "in_progress"] and execution_time > 60
214 | ): # 1 minute
215 | indicator["urgency"] = "medium"
216 | else:
217 | indicator["urgency"] = "low"
218 |
219 | return indicator
220 | def categorize_action_performance(execution_time: float, estimated_duration: float) -> str:
221 | """Categorize action performance based on execution time vs estimate"""
222 | if estimated_duration <= 0:
223 | return "unknown"
224 |
225 | ratio = execution_time / estimated_duration
226 |
227 | if ratio <= 0.5:
228 | return "excellent"
229 | elif ratio <= 0.8:
230 | return "good"
231 | elif ratio <= 1.2:
232 | return "acceptable"
233 | elif ratio <= 2.0:
234 | return "slow"
235 | else:
236 | return "very_slow"
237 | def get_action_resource_usage(action_id: str) -> dict:
238 | """Get resource usage for an action (placeholder implementation)"""
239 | # This is a placeholder - in a real implementation, you'd fetch actual metrics
240 | return {"cpu_usage": 0.0, "memory_usage": 0.0, "network_io": 0.0, "disk_io": 0.0}
241 | def estimate_wait_time(position: int, queue: list) -> float:
242 | """Estimate wait time based on queue position and historical data"""
243 | if position == 0:
244 | return 0.0
245 | # Average action time of 30 seconds (this could be calculated from historical data)
246 | avg_action_time = 30.0
247 | return position * avg_action_time
248 | def get_priority_label(priority: int) -> str:
249 | """Get human-readable priority label"""
250 | if priority <= 1:
251 | return "Critical"
252 | elif priority <= 3:
253 | return "High"
254 | elif priority <= 5:
255 | return "Normal"
256 | elif priority <= 7:
257 | return "Low"
258 | else:
259 | return "Very Low"
260 | def calculate_action_performance_score(action: dict) -> float:
261 | """Calculate performance score for a completed action"""
262 | if action["status"] != "completed":
263 | return 0.0
264 |
265 | execution_time = action.get("execution_duration", 0)
266 | if execution_time <= 0:
267 | return 100.0
268 |
269 | if execution_time <= 5:
270 | return 100.0
271 | elif execution_time <= 15:
272 | return 90.0
273 | elif execution_time <= 30:
274 | return 80.0
275 | elif execution_time <= 60:
276 | return 70.0
277 | elif execution_time <= 120:
278 | return 60.0
279 | else:
280 | return max(50.0, 100.0 - (execution_time / 10))
281 | def calculate_efficiency_rating(execution_time: float, result_size: int) -> str:
282 | """Calculate efficiency rating based on time and output"""
283 | if execution_time <= 0:
284 | return "unknown"
285 |
286 | efficiency_score = result_size / execution_time if execution_time > 0 else 0
287 |
288 | if efficiency_score >= 100:
289 | return "excellent"
290 | elif efficiency_score >= 50:
291 | return "good"
292 | elif efficiency_score >= 20:
293 | return "fair"
294 | else:
295 | return "poor"
296 |
297 | # ---------- File and Data Utilities ----------
298 | def format_file_size(size_bytes: int) -> str:
299 | """Format file size in human readable format"""
300 | if size_bytes == 0:
301 | return "0 B"
302 |
303 | size_names = ["B", "KB", "MB", "GB", "TB"]
304 | i = int(math.floor(math.log(size_bytes, 1024)))
305 | p = math.pow(1024, i)
306 | s = round(size_bytes / p, 2)
307 | return f"{s} {size_names[i]}"
308 |
309 | # ---------- Performance Analysis Functions ----------
310 | def calculate_performance_summary(actions: list) -> dict:
311 | """Calculate performance summary from action history"""
312 | if not actions:
313 | return {
314 | "avg_score": 0.0,
315 | "top_performer": None,
316 | "worst_performer": None,
317 | "efficiency_distribution": {},
318 | }
319 |
320 | scores = [a.get("performance_score", 0) for a in actions]
321 | avg_score = sum(scores) / len(scores)
322 |
323 | best_action = max(actions, key=lambda a: a.get("performance_score", 0))
324 | worst_action = min(actions, key=lambda a: a.get("performance_score", 0))
325 |
326 |
327 | efficiency_counts = Counter(a.get("efficiency_rating", "unknown") for a in actions)
328 |
329 | return {
330 | "avg_score": round(avg_score, 2),
331 | "top_performer": {
332 | "tool_name": best_action.get("tool_name", ""),
333 | "score": best_action.get("performance_score", 0),
334 | },
335 | "worst_performer": {
336 | "tool_name": worst_action.get("tool_name", ""),
337 | "score": worst_action.get("performance_score", 0),
338 | },
339 | "efficiency_distribution": dict(efficiency_counts),
340 | }
341 | def generate_performance_insights(
342 | overall_stats: dict, tool_stats: list, hourly_metrics: list
343 | ) -> list:
344 | """Generate actionable performance insights"""
345 | insights = []
346 |
347 | success_rate = (
348 |         overall_stats.get("successful_actions", 0) / (overall_stats.get("total_actions") or 1)  # guard against div-by-zero
349 | ) * 100
350 | if success_rate < 80:
351 | insights.append(
352 | {
353 | "type": "warning",
354 | "title": "Low Success Rate",
355 | "message": f"Current success rate is {success_rate:.1f}%. Consider investigating failing tools.",
356 | "severity": "high",
357 | }
358 | )
359 |
360 | if tool_stats:
361 | slowest_tool = max(tool_stats, key=lambda t: t.get("avg_duration", 0))
362 | if slowest_tool.get("avg_duration", 0) > 60:
363 | insights.append(
364 | {
365 | "type": "info",
366 | "title": "Performance Optimization",
367 | "message": f"{slowest_tool['tool_name']} is taking {slowest_tool['avg_duration']:.1f}s on average. Consider optimization.",
368 | "severity": "medium",
369 | }
370 | )
371 |
372 | if hourly_metrics:
373 | peak_hour = max(hourly_metrics, key=lambda h: h.get("action_count", 0))
374 | insights.append(
375 | {
376 | "type": "info",
377 | "title": "Peak Usage",
378 | "message": f"Peak usage occurs at {peak_hour['hour']}:00 with {peak_hour['action_count']} actions.",
379 | "severity": "low",
380 | }
381 | )
382 |
383 | return insights
384 |
385 |
386 | # ---------- Cognitive Pattern Analysis Functions ----------
387 | def find_cognitive_patterns(
388 | states: List[Dict[str, Any]], min_length: int, similarity_threshold: float
389 | ) -> List[Dict[str, Any]]:
390 | """Find recurring patterns in cognitive states"""
391 | patterns = []
392 | from collections import defaultdict
393 |
394 | type_sequences = defaultdict(list)
395 | for state in states:
396 | type_sequences[state["state_type"]].append(state)
397 | for state_type, sequence in type_sequences.items():
398 | if len(sequence) >= min_length * 2:
399 | for length in range(min_length, len(sequence) // 2 + 1):
400 | for start in range(len(sequence) - length * 2 + 1):
401 | subseq1 = sequence[start : start + length]
402 | subseq2 = sequence[start + length : start + length * 2]
403 | similarity = calculate_sequence_similarity(subseq1, subseq2)
404 | if similarity >= similarity_threshold:
405 | patterns.append(
406 | {
407 | "type": f"repeating_{state_type}",
408 | "length": length,
409 | "similarity": similarity,
410 | "occurrences": 2,
411 | "first_occurrence": subseq1[0]["timestamp"],
412 | "pattern_description": f"Repeating {state_type} sequence of {length} states",
413 | }
414 | )
415 | return sorted(patterns, key=lambda p: p["similarity"], reverse=True)
416 | def calculate_sequence_similarity(
417 | seq1: List[Dict[str, Any]], seq2: List[Dict[str, Any]]
418 | ) -> float:
419 | """Calculate similarity between two state sequences"""
420 | if len(seq1) != len(seq2):
421 | return 0.0
422 | total_similarity = 0.0
423 | for s1, s2 in zip(seq1, seq2, strict=False):
424 | state_sim = calculate_single_state_similarity(s1, s2)
425 | total_similarity += state_sim
426 | return total_similarity / len(seq1)
427 | def calculate_single_state_similarity(
428 | state1: Dict[str, Any], state2: Dict[str, Any]
429 | ) -> float:
430 | """Calculate similarity between two individual states"""
431 | data1 = state1.get("state_data", {})
432 | data2 = state2.get("state_data", {})
433 | if not data1 and not data2:
434 | return 1.0
435 | if not data1 or not data2:
436 | return 0.0
437 | keys1 = set(data1.keys())
438 | keys2 = set(data2.keys())
439 | key_similarity = len(keys1 & keys2) / len(keys1 | keys2) if keys1 | keys2 else 1.0
440 | common_keys = keys1 & keys2
441 | value_similarity = 0.0
442 | if common_keys:
443 | matching_values = sum(1 for key in common_keys if data1[key] == data2[key])
444 | value_similarity = matching_values / len(common_keys)
445 | return (key_similarity + value_similarity) / 2
446 | def analyze_state_transitions(states: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
447 | """Analyze transitions between cognitive states"""
448 | from collections import defaultdict
449 |
450 | transitions = defaultdict(int)
451 | for i in range(len(states) - 1):
452 | current_type = states[i]["state_type"]
453 | next_type = states[i + 1]["state_type"]
454 | transition = f"{current_type} → {next_type}"
455 | transitions[transition] += 1
456 | sorted_transitions = sorted(transitions.items(), key=lambda x: x[1], reverse=True)
457 | return [
458 | {
459 | "transition": transition,
460 | "count": count,
461 | "percentage": (count / (len(states) - 1)) * 100 if len(states) > 1 else 0,
462 | }
463 | for transition, count in sorted_transitions
464 | ]
465 | def detect_cognitive_anomalies(states: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
466 | """Detect anomalous cognitive states"""
467 | anomalies = []
468 | if len(states) < 3:
469 | return anomalies
470 | complexities = [calculate_state_complexity(s.get("state_data", {})) for s in states]
471 | avg_complexity = sum(complexities) / len(complexities)
472 | std_complexity = (
473 | sum((c - avg_complexity) ** 2 for c in complexities) / len(complexities)
474 | ) ** 0.5
475 | for i, state in enumerate(states):
476 | complexity = complexities[i]
477 | z_score = (
478 | (complexity - avg_complexity) / std_complexity if std_complexity > 0 else 0
479 | )
480 | if abs(z_score) > 2:
481 | anomalies.append(
482 | {
483 | "state_id": state["state_id"],
484 | "timestamp": state["timestamp"],
485 | "anomaly_type": "complexity_outlier",
486 | "z_score": z_score,
487 | "description": f"Unusual complexity: {complexity:.1f} (avg: {avg_complexity:.1f})",
488 | "severity": "high" if abs(z_score) > 3 else "medium",
489 | }
490 | )
491 | return anomalies
492 |
493 | # ---------- Pattern analysis models ----------
494 |
```
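As a quick usage sketch of the state-analysis helpers defined above (assuming the module is importable at the path shown in this listing), the snippet compares two toy cognitive-state dictionaries:

```python
from ultimate_mcp_server.core.ums_api.ums_database import (
    calculate_state_complexity,
    compute_state_diff,
)

before = {"goal": "summarize report", "steps_done": 2, "open_questions": ["cost?"]}
after = {"goal": "summarize report", "steps_done": 3, "notes": {"status": "on track"}}

# Heuristic 0-100 score based on key count, nesting depth, and value count.
print(calculate_state_complexity(after))

diff = compute_state_diff(before, after)
print(diff["added"])       # {'notes': {'status': 'on track'}}
print(diff["removed"])     # {'open_questions': ['cost?']}
print(diff["modified"])    # {'steps_done': {'before': 2, 'after': 3}}
print(f"{diff['magnitude']:.1f}% of keys changed")  # 75.0% (3 of 4 keys differ)
```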
--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/single_shot_synthesis.py:
--------------------------------------------------------------------------------
```python
1 | # --- tools/single_shot_synthesis.py (NEW) ---
2 | import asyncio
3 | import json
4 | import random
5 | import re
6 | import time
7 | import uuid
8 | from datetime import datetime
9 | from pathlib import Path
10 | from typing import Any, Dict, List, Literal, Optional
11 |
12 | from ultimate_mcp_server.exceptions import ToolError
13 | from pydantic import ValidationError
14 |
15 | from ultimate_mcp_server.core.models.tournament import ( # Reusing models from tournament where appropriate
16 | SingleShotGeneratorModelConfig,
17 | SingleShotIndividualResponse,
18 | SingleShotSynthesisInput,
19 | SingleShotSynthesisOutput,
20 | )
21 | from ultimate_mcp_server.core.tournaments.utils import extract_thinking
22 | from ultimate_mcp_server.tools.base import with_error_handling, with_tool_metrics
23 | from ultimate_mcp_server.tools.completion import generate_completion
24 | from ultimate_mcp_server.tools.extraction import extract_code_from_response
25 | from ultimate_mcp_server.utils import get_logger
26 |
27 | logger = get_logger("ultimate_mcp_server.tools.single_shot_synthesis")
28 |
29 | STORAGE_DIR_BASE = Path(__file__).resolve().parent.parent.parent / "storage" / "single_shot_synthesis"
30 | STORAGE_DIR_BASE.mkdir(parents=True, exist_ok=True)
31 |
32 | def _get_single_shot_storage_path(name: str, request_id: str) -> Path:
33 | timestamp_str = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
34 | safe_name = re.sub(r'[^\w\s-]', '', name).strip().replace(' ', '_')
35 | safe_name = re.sub(r'[-\s]+', '-', safe_name)[:50]
36 | uuid_suffix = request_id.split('-')[0]
37 | folder_name = f"{timestamp_str}_{safe_name}_{uuid_suffix}"
38 | path = STORAGE_DIR_BASE / folder_name
39 | path.mkdir(parents=True, exist_ok=True)
40 | return path
41 |
42 | async def _generate_expert_response(
43 | prompt: str,
44 | config: SingleShotGeneratorModelConfig,
45 | global_retry_config: Optional[Dict] = None # e.g. {"max_retries": 2, "backoff_base": 1.0}
46 | ) -> SingleShotIndividualResponse:
47 |
48 | start_time = time.monotonic()
49 | response_data = SingleShotIndividualResponse(model_id=config.model_id)
50 |
51 | # Simple retry logic here, could be more sophisticated or rely on generate_completion
52 | max_retries = global_retry_config.get("max_retries", 1) if global_retry_config else 1
53 | backoff_base = global_retry_config.get("backoff_base", 1.0) if global_retry_config else 1.0
54 |
55 | # Determine provider and apply specific logic
56 | derived_provider = None
57 | if '/' in config.model_id:
58 | provider_prefix = config.model_id.split('/')[0].lower()
59 | if provider_prefix == "google":
60 | derived_provider = "gemini" # Map 'google' to 'gemini'
61 | elif provider_prefix == "anthropic":
62 | derived_provider = "anthropic"
63 | # Add other explicit mappings here if needed in the future
64 | else:
65 | derived_provider = provider_prefix # Default to the prefix as is
66 |
67 | current_max_tokens = config.max_tokens
68 | if derived_provider == "anthropic" and current_max_tokens is None:
69 | logger.info(f"Anthropic model {config.model_id} called without max_tokens, defaulting to 2048 for timeout calculation.")
70 | current_max_tokens = 2048 # Default for Anthropic if not specified to prevent TypeError
71 |
72 | for attempt in range(max_retries + 1):
73 | try:
74 | # provider was originally derived inside the loop, now passed explicitly
75 | completion_result = await generate_completion(
76 | prompt=prompt,
77 | model=config.model_id,
78 | provider=derived_provider, # Use the determined/mapped provider
79 | temperature=config.temperature,
80 | max_tokens=current_max_tokens, # Use the potentially defaulted max_tokens
81 | # TODO: Add seed if SingleShotGeneratorModelConfig includes it
82 | )
83 |
84 | response_data.metrics["cost"] = completion_result.get("cost", 0.0)
85 | response_data.metrics["input_tokens"] = completion_result.get("tokens", {}).get("input")
86 | response_data.metrics["output_tokens"] = completion_result.get("tokens", {}).get("output")
87 | response_data.metrics["api_latency_ms"] = int(completion_result.get("processing_time", 0) * 1000)
88 | response_data.metrics["api_model_id_used"] = completion_result.get("model", config.model_id)
89 |
90 | if completion_result.get("success"):
91 | response_data.response_text = completion_result.get("text")
92 | break # Success
93 | else:
94 | response_data.error = completion_result.get("error", f"Generation failed on attempt {attempt+1}")
95 | if attempt == max_retries: # Last attempt
96 | logger.error(f"Expert {config.model_id} failed after {max_retries+1} attempts: {response_data.error}")
97 |
98 | except Exception as e:
99 | logger.error(f"Exception during expert call {config.model_id} (attempt {attempt+1}): {e}", exc_info=True)
100 | response_data.error = str(e)
101 | if attempt == max_retries:
102 | logger.error(f"Expert {config.model_id} failed with exception after {max_retries+1} attempts.")
103 |
104 | if attempt < max_retries:
105 | sleep_duration = random.uniform(backoff_base, backoff_base * 1.5) * (2 ** attempt)
106 | sleep_duration = min(sleep_duration, 15.0) # Max 15s sleep
107 | logger.info(f"Expert {config.model_id} attempt {attempt+1} failed. Retrying in {sleep_duration:.2f}s...")
108 | await asyncio.sleep(sleep_duration)
109 |
110 | response_data.metrics["total_task_time_ms"] = int((time.monotonic() - start_time) * 1000)
111 | return response_data
112 |
113 |
114 | @with_tool_metrics # This decorator will add its own overall metrics
115 | @with_error_handling
116 | async def single_shot_synthesis(
117 | name: str,
118 | prompt: str,
119 | expert_models: List[Dict[str, Any]], # CHANGED from expert_model_configs
120 | synthesizer_model: Dict[str, Any], # CHANGED from synthesizer_model_config
121 | tournament_type: Literal["code", "text"] = "text",
122 | synthesis_instructions: Optional[str] = None
123 | ) -> Dict[str, Any]:
124 | """
125 | Performs a single-shot, multi-model synthesis:
126 | 1. Sends the prompt to multiple "expert" LLMs in parallel.
127 | 2. Collects their responses.
128 | 3. Feeds the original prompt and all expert responses to a "synthesizer" LLM.
129 | 4. The synthesizer LLM produces a final, fused response.
130 | Useful for quick brainstorming and merging diverse perspectives.
131 |
132 | Args:
133 | name: A descriptive name for this synthesis task (e.g., "Product Description Brainstorm").
134 | prompt: The initial challenge prompt or question for all expert models.
135 | expert_models: A list of configurations for the "expert" models. Each config is a dict:
136 | - model_id (str, required): e.g., 'openai/gpt-3.5-turbo'.
137 | - temperature (float, optional): Model-specific temperature.
138 | - max_tokens (int, optional): Model-specific max tokens.
139 | synthesizer_model: Configuration for the "synthesizer" model. Dict fields:
140 |             - model_id (str, default 'anthropic/claude-3-7-sonnet-20250219'): e.g., 'google/gemini-1.5-pro-latest'.
141 | - temperature (float, optional): Synthesizer-specific temperature.
142 | - max_tokens (int, optional): Synthesizer-specific max tokens.
143 | - system_prompt (str, optional): System prompt for the synthesizer.
144 | tournament_type: 'code' or 'text'. Influences synthesis instructions and output processing (default 'text').
145 | synthesis_instructions: Custom instructions for the synthesizer. If None, default instructions are used.
146 |
147 | Returns:
148 | A dictionary containing the request_id, status, individual expert responses,
149 | the synthesized response, metrics, and storage path for artifacts.
150 | """
151 | task_start_time = time.monotonic()
152 | request_id = str(uuid.uuid4())
153 | storage_path = _get_single_shot_storage_path(name, request_id)
154 |
155 | output_data = SingleShotSynthesisOutput(
156 | request_id=request_id,
157 | name=name,
158 | status="FAILED", # Default to FAILED, update on success
159 | expert_responses=[],
160 | storage_path=str(storage_path)
161 | )
162 |
163 | try:
164 | # Validate inputs using Pydantic model with aliases
165 | validated_input = SingleShotSynthesisInput(
166 | name=name,
167 | prompt=prompt,
168 | expert_models=expert_models, # Pass using alias
169 | synthesizer_model=synthesizer_model, # Pass using alias
170 | tournament_type=tournament_type,
171 | synthesis_instructions=synthesis_instructions
172 | )
173 | parsed_expert_configs = validated_input.expert_model_configs
174 | parsed_synthesizer_config = validated_input.synthesizer_model_config
175 | retry_cfg_experts = {"max_retries": 1, "backoff_base": 1.0}
176 |
177 | except ValidationError as e:
178 | output_data.error_message = f"Input validation error: {e.errors()}"
179 | output_data.total_metrics["total_task_time_ms"] = int((time.monotonic() - task_start_time) * 1000)
180 | logger.error(f"SingleShotSynthesis input validation error for '{name}': {e.json(indent=2)}")
181 | raise ToolError(f"Invalid input for single_shot_synthesis: {e.errors()}", status_code=400) from e
182 |
183 |
184 | # 1. Parallel fan-out to expert models
185 | logger.info(f"[{request_id}] Starting expert model responses for '{name}'. Count: {len(parsed_expert_configs)}")
186 | expert_tasks = [
187 | _generate_expert_response(prompt, config, retry_cfg_experts) for config in parsed_expert_configs
188 | ]
189 | output_data.expert_responses = await asyncio.gather(*expert_tasks, return_exceptions=False) # Exceptions handled in _generate_expert_response
190 |
191 | # Persist expert responses
192 | for i, resp in enumerate(output_data.expert_responses):
193 | expert_file_name = f"expert_{i+1}_{re.sub(r'[^a-zA-Z0-9_-]', '_', resp.model_id)}.md"
194 | expert_file_path = storage_path / expert_file_name
195 | content = f"# Expert Response: {resp.model_id}\n\n"
196 | content += f"## Metrics\nCost: ${resp.metrics.get('cost',0):.6f}\nLatency: {resp.metrics.get('api_latency_ms','N/A')}ms\n\n"
197 | if resp.error:
198 | content += f"## Error\n```\n{resp.error}\n```\n"
199 | if resp.response_text:
200 | content += f"## Response Text\n```\n{resp.response_text}\n```\n"
201 | expert_file_path.write_text(content, encoding='utf-8')
202 |
203 | # 2. Aggregation prompt builder
204 | agg_prompt_parts = [f"You are an advanced AI tasked with synthesizing a definitive answer based on multiple expert inputs.\n\nOriginal Problem/Prompt:\n---\n{prompt}\n---\n"]
205 | agg_prompt_parts.append("Below are responses from several expert models. Review them critically:\n")
206 |
207 | for i, resp in enumerate(output_data.expert_responses):
208 | agg_prompt_parts.append(f"-- Response from Expert Model {i+1} ({resp.model_id}) --")
209 | if resp.response_text and not resp.error:
210 | agg_prompt_parts.append(resp.response_text)
211 | elif resp.error:
212 | agg_prompt_parts.append(f"[This model encountered an error: {resp.error}]")
213 | else:
214 | agg_prompt_parts.append("[This model provided no content.]")
215 | agg_prompt_parts.append("-------------------------------------\n")
216 |
217 | # Synthesis Instructions
218 | if synthesis_instructions:
219 | agg_prompt_parts.append(f"\nSynthesis Instructions:\n{synthesis_instructions}\n")
220 | else: # Default instructions
221 | default_instr = "Your Task:\n1. Identify unique insights, complementary information, and key arguments from each expert response.\n"
222 | default_instr += "2. Resolve any contradictions or discrepancies, prioritizing verifiable facts and logical consistency.\n"
223 | default_instr += "3. Produce a single, coherent, and comprehensive response that is strictly superior to any individual expert response.\n"
224 | if tournament_type == "code":
225 | default_instr += "4. If the task involves code, provide ONLY the complete, final code block (e.g., ```python ... ```). Do not include explanations outside of code comments.\n"
226 | else: # Text
227 | default_instr += "4. You MAY optionally begin your output with a brief <thinking>...</thinking> block explaining your synthesis strategy, then provide the final synthesized text.\n"
228 | default_instr += "\n### Final Synthesized Answer:\n"
229 | agg_prompt_parts.append(default_instr)
230 |
231 | aggregation_prompt = "\n".join(agg_prompt_parts)
232 | (storage_path / "synthesis_prompt.md").write_text(aggregation_prompt, encoding='utf-8')
233 |
234 |
235 | # 3. Fan-in call to synthesizer_id
236 | logger.info(f"[{request_id}] Requesting synthesis from {parsed_synthesizer_config.model_id} for '{name}'.")
237 | synthesizer_success = False
238 | try:
239 | # Simple retry for synthesizer
240 | retry_cfg_synth = {"max_retries": 1, "backoff_base": 2.0}
241 | synth_response_raw = await _generate_expert_response( # Reuse for basic call structure
242 | aggregation_prompt,
243 | SingleShotGeneratorModelConfig( # Adapt to expected input
244 | model_id=parsed_synthesizer_config.model_id,
245 | temperature=parsed_synthesizer_config.temperature,
246 | max_tokens=parsed_synthesizer_config.max_tokens
247 | ),
248 | retry_cfg_synth
249 | )
250 | output_data.synthesizer_metrics = synth_response_raw.metrics
251 |
252 | if synth_response_raw.response_text and not synth_response_raw.error:
253 | output_data.synthesized_response_text = synth_response_raw.response_text
254 | output_data.synthesizer_thinking_process = await extract_thinking(output_data.synthesized_response_text)
255 | if output_data.synthesizer_thinking_process and output_data.synthesized_response_text:
256 | # Remove thinking block from main response if present
257 | output_data.synthesized_response_text = output_data.synthesized_response_text.replace(f"<thinking>{output_data.synthesizer_thinking_process}</thinking>", "").strip()
258 |
259 | if tournament_type == "code":
260 | # Extraction model can be fixed or configurable for this tool
261 | extraction_model_id = "anthropic/claude-3-5-haiku-20241022" # Example default
262 | code_extraction_result = await extract_code_from_response(
263 | response_text=output_data.synthesized_response_text,
264 | language_hint="python", # Assuming Python for now
265 | extraction_model_id=extraction_model_id
266 | )
267 | if code_extraction_result.get("success"):
268 | output_data.synthesized_extracted_code = code_extraction_result.get("code_block")
269 | else: # Log if extraction failed but don't mark whole thing as failure
270 | logger.warning(f"[{request_id}] Code extraction from synthesizer output failed: {code_extraction_result.get('error')}")
271 | synthesizer_success = True
272 | else:
273 | output_data.error_message = f"Synthesizer ({parsed_synthesizer_config.model_id}) failed: {synth_response_raw.error or 'No response text'}"
274 | logger.error(f"[{request_id}] {output_data.error_message}")
275 |
276 | except Exception as e:
277 | output_data.error_message = f"Exception during synthesis call: {str(e)}"
278 | logger.error(f"[{request_id}] {output_data.error_message}", exc_info=True)
279 |
280 | # 4. Persist synthesized artifact
281 | synth_content = f"# Synthesized Response\n\n**Synthesizer Model:** {parsed_synthesizer_config.model_id}\n"
282 | synth_content += f"## Metrics\nCost: ${output_data.synthesizer_metrics.get('cost',0):.6f}\nLatency: {output_data.synthesizer_metrics.get('api_latency_ms','N/A')}ms\n\n"
283 | if output_data.synthesizer_thinking_process:
284 | synth_content += f"## Thinking Process\n```\n{output_data.synthesizer_thinking_process}\n```\n"
285 | if tournament_type == "code" and output_data.synthesized_extracted_code:
286 | synth_content += f"## Extracted Code\n```python\n{output_data.synthesized_extracted_code}\n```\n"
287 | elif output_data.synthesized_response_text: # Fallback to full text if code not extracted or text type
288 | synth_content += f"## Response Text\n```\n{output_data.synthesized_response_text}\n```\n"
289 | if output_data.error_message and not synthesizer_success : # If synthesizer specifically failed
290 | synth_content += f"\n## Synthesizer Error\n```\n{output_data.error_message}\n```"
291 |
292 | (storage_path / "synthesized_response.md").write_text(synth_content, encoding='utf-8')
293 | if tournament_type == "code" and output_data.synthesized_extracted_code:
294 | (storage_path / "synthesized_code.py").write_text(output_data.synthesized_extracted_code, encoding='utf-8')
295 |
296 |
297 | # 5. Finalize output
298 | output_data.status = "SUCCESS" if synthesizer_success else ("PARTIAL_SUCCESS" if any(er.response_text for er in output_data.expert_responses) else "FAILED")
299 | if not synthesizer_success and not output_data.error_message: # General failure if no specific error
300 | output_data.error_message = output_data.error_message or "Synthesis process encountered an issue."
301 |
302 | # Aggregate total metrics
303 | total_cost_agg = output_data.synthesizer_metrics.get("cost", 0.0)
304 | total_input_tokens_agg = output_data.synthesizer_metrics.get("input_tokens", 0) or 0
305 | total_output_tokens_agg = output_data.synthesizer_metrics.get("output_tokens", 0) or 0
306 |
307 | for resp in output_data.expert_responses:
308 | total_cost_agg += resp.metrics.get("cost", 0.0)
309 | total_input_tokens_agg += resp.metrics.get("input_tokens", 0) or 0
310 | total_output_tokens_agg += resp.metrics.get("output_tokens", 0) or 0
311 |
312 | output_data.total_metrics = {
313 | "total_cost": total_cost_agg,
314 | "total_input_tokens": total_input_tokens_agg,
315 | "total_output_tokens": total_output_tokens_agg,
316 | "overall_task_time_ms": int((time.monotonic() - task_start_time) * 1000)
317 | }
318 |
319 | # Save overall metrics
320 | (storage_path / "overall_metrics.json").write_text(json.dumps(output_data.total_metrics, indent=2), encoding='utf-8')
321 |
322 | logger.info(f"[{request_id}] Single-shot synthesis '{name}' completed. Status: {output_data.status}. Cost: ${total_cost_agg:.4f}")
323 | return output_data.model_dump()
324 |
```