This is page 11 of 45. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│ ├── __init__.py
│ ├── advanced_agent_flows_using_unified_memory_system_demo.py
│ ├── advanced_extraction_demo.py
│ ├── advanced_unified_memory_system_demo.py
│ ├── advanced_vector_search_demo.py
│ ├── analytics_reporting_demo.py
│ ├── audio_transcription_demo.py
│ ├── basic_completion_demo.py
│ ├── cache_demo.py
│ ├── claude_integration_demo.py
│ ├── compare_synthesize_demo.py
│ ├── cost_optimization.py
│ ├── data
│ │ ├── sample_event.txt
│ │ ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│ │ └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│ ├── docstring_refiner_demo.py
│ ├── document_conversion_and_processing_demo.py
│ ├── entity_relation_graph_demo.py
│ ├── filesystem_operations_demo.py
│ ├── grok_integration_demo.py
│ ├── local_text_tools_demo.py
│ ├── marqo_fused_search_demo.py
│ ├── measure_model_speeds.py
│ ├── meta_api_demo.py
│ ├── multi_provider_demo.py
│ ├── ollama_integration_demo.py
│ ├── prompt_templates_demo.py
│ ├── python_sandbox_demo.py
│ ├── rag_example.py
│ ├── research_workflow_demo.py
│ ├── sample
│ │ ├── article.txt
│ │ ├── backprop_paper.pdf
│ │ ├── buffett.pdf
│ │ ├── contract_link.txt
│ │ ├── legal_contract.txt
│ │ ├── medical_case.txt
│ │ ├── northwind.db
│ │ ├── research_paper.txt
│ │ ├── sample_data.json
│ │ └── text_classification_samples
│ │ ├── email_classification.txt
│ │ ├── news_samples.txt
│ │ ├── product_reviews.txt
│ │ └── support_tickets.txt
│ ├── sample_docs
│ │ └── downloaded
│ │ └── attention_is_all_you_need.pdf
│ ├── sentiment_analysis_demo.py
│ ├── simple_completion_demo.py
│ ├── single_shot_synthesis_demo.py
│ ├── smart_browser_demo.py
│ ├── sql_database_demo.py
│ ├── sse_client_demo.py
│ ├── test_code_extraction.py
│ ├── test_content_detection.py
│ ├── test_ollama.py
│ ├── text_classification_demo.py
│ ├── text_redline_demo.py
│ ├── tool_composition_examples.py
│ ├── tournament_code_demo.py
│ ├── tournament_text_demo.py
│ ├── unified_memory_system_demo.py
│ ├── vector_search_demo.py
│ ├── web_automation_instruction_packs.py
│ └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│ └── smart_browser_internal
│ ├── locator_cache.db
│ ├── readability.js
│ └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── integration
│ │ ├── __init__.py
│ │ └── test_server.py
│ ├── manual
│ │ ├── test_extraction_advanced.py
│ │ └── test_extraction.py
│ └── unit
│ ├── __init__.py
│ ├── test_cache.py
│ ├── test_providers.py
│ └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│ ├── __init__.py
│ ├── __main__.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── __main__.py
│ │ ├── commands.py
│ │ ├── helpers.py
│ │ └── typer_cli.py
│ ├── clients
│ │ ├── __init__.py
│ │ ├── completion_client.py
│ │ └── rag_client.py
│ ├── config
│ │ └── examples
│ │ └── filesystem_config.yaml
│ ├── config.py
│ ├── constants.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── evaluation
│ │ │ ├── base.py
│ │ │ └── evaluators.py
│ │ ├── providers
│ │ │ ├── __init__.py
│ │ │ ├── anthropic.py
│ │ │ ├── base.py
│ │ │ ├── deepseek.py
│ │ │ ├── gemini.py
│ │ │ ├── grok.py
│ │ │ ├── ollama.py
│ │ │ ├── openai.py
│ │ │ └── openrouter.py
│ │ ├── server.py
│ │ ├── state_store.py
│ │ ├── tournaments
│ │ │ ├── manager.py
│ │ │ ├── tasks.py
│ │ │ └── utils.py
│ │ └── ums_api
│ │ ├── __init__.py
│ │ ├── ums_database.py
│ │ ├── ums_endpoints.py
│ │ ├── ums_models.py
│ │ └── ums_services.py
│ ├── exceptions.py
│ ├── graceful_shutdown.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── analytics
│ │ │ ├── __init__.py
│ │ │ ├── metrics.py
│ │ │ └── reporting.py
│ │ ├── cache
│ │ │ ├── __init__.py
│ │ │ ├── cache_service.py
│ │ │ ├── persistence.py
│ │ │ ├── strategies.py
│ │ │ └── utils.py
│ │ ├── cache.py
│ │ ├── document.py
│ │ ├── knowledge_base
│ │ │ ├── __init__.py
│ │ │ ├── feedback.py
│ │ │ ├── manager.py
│ │ │ ├── rag_engine.py
│ │ │ ├── retriever.py
│ │ │ └── utils.py
│ │ ├── prompts
│ │ │ ├── __init__.py
│ │ │ ├── repository.py
│ │ │ └── templates.py
│ │ ├── prompts.py
│ │ └── vector
│ │ ├── __init__.py
│ │ ├── embeddings.py
│ │ └── vector_service.py
│ ├── tool_token_counter.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── audio_transcription.py
│ │ ├── base.py
│ │ ├── completion.py
│ │ ├── docstring_refiner.py
│ │ ├── document_conversion_and_processing.py
│ │ ├── enhanced-ums-lookbook.html
│ │ ├── entity_relation_graph.py
│ │ ├── excel_spreadsheet_automation.py
│ │ ├── extraction.py
│ │ ├── filesystem.py
│ │ ├── html_to_markdown.py
│ │ ├── local_text_tools.py
│ │ ├── marqo_fused_search.py
│ │ ├── meta_api_tool.py
│ │ ├── ocr_tools.py
│ │ ├── optimization.py
│ │ ├── provider.py
│ │ ├── pyodide_boot_template.html
│ │ ├── python_sandbox.py
│ │ ├── rag.py
│ │ ├── redline-compiled.css
│ │ ├── sentiment_analysis.py
│ │ ├── single_shot_synthesis.py
│ │ ├── smart_browser.py
│ │ ├── sql_databases.py
│ │ ├── text_classification.py
│ │ ├── text_redline_tools.py
│ │ ├── tournament.py
│ │ ├── ums_explorer.html
│ │ └── unified_memory_system.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── async_utils.py
│ │ ├── display.py
│ │ ├── logging
│ │ │ ├── __init__.py
│ │ │ ├── console.py
│ │ │ ├── emojis.py
│ │ │ ├── formatter.py
│ │ │ ├── logger.py
│ │ │ ├── panels.py
│ │ │ ├── progress.py
│ │ │ └── themes.py
│ │ ├── parse_yaml.py
│ │ ├── parsing.py
│ │ ├── security.py
│ │ └── text.py
│ └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/ultimate_mcp_server/utils/async_utils.py:
--------------------------------------------------------------------------------
```python
1 | """Async utilities for Ultimate MCP Server."""
2 | import asyncio
3 | import functools
4 | import time
5 | from contextlib import asynccontextmanager
6 | from typing import Any, Callable, List, Optional, Type, TypeVar, Union
7 |
8 | from ultimate_mcp_server.utils import get_logger
9 |
10 | logger = get_logger(__name__)
11 |
12 | # Type definitions
13 | T = TypeVar('T')
14 | AsyncCallable = Callable[..., Any]
15 |
16 |
17 | class RateLimiter:
18 | """
19 | Rate limiter for controlling request rates to external services.
20 |
21 | This class implements a token bucket algorithm to enforce API rate limits,
22 | preventing too many requests in a short period of time. It's designed for use
23 | in asynchronous code and will automatically pause execution when limits are reached.
24 |
25 | The rate limiter tracks the timestamps of recent calls and blocks new calls
26 | if they would exceed the configured rate limit. When the limit is reached,
27 | the acquire() method blocks until enough time has passed to allow another call.
28 |
29 | This is useful for:
30 | - Respecting API rate limits of external services
31 | - Preventing service overload in high-concurrency applications
32 | - Implementing polite crawling/scraping behavior
33 | - Managing resource access in distributed systems
34 |
35 | Usage example:
36 | ```python
37 | # Create a rate limiter that allows 5 calls per second
38 | limiter = RateLimiter(max_calls=5, period=1.0)
39 |
40 | async def make_api_call():
41 | # This will automatically wait if we're over the limit
42 | await limiter.acquire()
43 | # Now make the actual API call...
44 | return await actual_api_call()
45 | ```
46 | """
47 |
48 | def __init__(self, max_calls: int, period: float):
49 | """
50 | Initialize the rate limiter.
51 |
52 | Args:
53 | max_calls: Maximum number of calls allowed within the specified period.
54 | For example, 100 calls per period.
55 | period: Time period in seconds over which the max_calls limit applies.
56 | For example, 60.0 for a per-minute rate limit.
57 | """
58 | self.max_calls = max_calls
59 | self.period = period
60 | self.calls = []
61 | self.lock = asyncio.Lock()
62 |
63 | async def acquire(self):
64 | """
65 | Acquire permission to make a call, waiting if necessary.
66 |
67 | This method blocks until a call is allowed based on the rate limit. When the
68 | limit has been reached, it will sleep until enough time has passed to allow
69 | another call, respecting the configured max_calls within the period.
70 |
71 | The method ensures thread-safety through an asyncio lock, making it safe to use
72 | across multiple tasks. It also handles the case where waiting for rate limit
73 | permissions overlaps with multiple concurrent requests.
74 |
75 | Returns:
76 | None. When this method returns, the caller is allowed to proceed.
77 |
78 | Raises:
79 | asyncio.CancelledError: If the task is cancelled while waiting.
80 | """
81 | async with self.lock:
82 | now = time.time()
83 |
84 | # Remove expired timestamps
85 | self.calls = [t for t in self.calls if now - t < self.period]
86 |
87 | # Check if we're under the limit
88 | if len(self.calls) < self.max_calls:
89 | self.calls.append(now)
90 | return
91 |
92 | # Calculate wait time
93 | wait_time = self.period - (now - self.calls[0])
94 | if wait_time > 0:
95 | # Release lock while waiting
96 | self.lock.release()
97 | try:
98 | logger.debug(
99 | f"Rate limit reached, waiting {wait_time:.2f}s",
100 | emoji_key="warning"
101 | )
102 | await asyncio.sleep(wait_time)
103 |                     # Retry without holding the lock; asyncio.Lock is not
104 |                     # reentrant, so recursing while it is held would deadlock
105 |                     await self.acquire()
106 |                 finally:
107 |                     # Reacquire lock so the enclosing context manager can release it
108 |                     await self.lock.acquire()
109 | else:
110 | # Oldest call just expired, record new call
111 | self.calls = self.calls[1:] + [now]
112 |
113 |
114 | @asynccontextmanager
115 | async def timed_context(name: str):
116 | """
117 | Async context manager for measuring and logging operation duration.
118 |
119 | This utility provides a simple way to time asynchronous operations and log their
120 | duration upon completion. It's useful for performance monitoring, debugging,
121 | and identifying bottlenecks in asynchronous code.
122 |
123 | The context manager:
124 | 1. Records the start time when entering the context
125 | 2. Allows the wrapped code to execute
126 | 3. Calculates the elapsed time when the context exits
127 | 4. Logs the operation name and duration with appropriate formatting
128 |
129 | This works with any async code, including API calls, database operations,
130 | file I/O, or computational tasks.
131 |
132 | Args:
133 | name: Descriptive name of the operation being timed. This name will appear
134 | in log messages for easy identification.
135 |
136 | Yields:
137 | None - This context manager doesn't provide any additional context variables.
138 |
139 | Example usage:
140 | ```python
141 | async def fetch_user_data(user_id):
142 | async with timed_context("Fetch user data"):
143 | return await database.get_user(user_id)
144 |
145 | async def process_document(doc_id):
146 | async with timed_context("Document processing"):
147 | # Multiple operations can be timed together
148 | doc = await fetch_document(doc_id)
149 | results = await analyze_document(doc)
150 | return results
151 | ```
152 | """
153 | start_time = time.time()
154 | try:
155 | yield
156 | finally:
157 | duration = time.time() - start_time
158 | logger.debug(
159 | f"{name} completed in {duration:.3f}s",
160 | emoji_key="time",
161 | time=duration
162 | )
163 |
164 |
165 | async def gather_with_concurrency(
166 | n: int,
167 | *tasks,
168 | return_exceptions: bool = False
169 | ) -> List[Any]:
170 | """
171 | Run multiple async tasks with a controlled concurrency limit.
172 |
173 | This function provides a way to execute multiple asynchronous tasks in parallel
174 | while ensuring that no more than a specified number of tasks run simultaneously.
175 | It's similar to asyncio.gather() but with an added concurrency control mechanism.
176 |
177 | This is particularly valuable for:
178 | - Preventing resource exhaustion when processing many tasks
179 | - Respecting service capacity limitations
180 | - Managing memory usage by limiting parallel execution
181 | - Implementing "worker pool" patterns in async code
182 |
183 | The function uses a semaphore internally to control the number of concurrently
184 | executing tasks. Tasks beyond the concurrency limit will wait until a running
185 | task completes and releases the semaphore.
186 |
187 | Args:
188 | n: Maximum number of tasks to run concurrently. This controls resource usage
189 | and prevents overloading the system or external services.
190 | *tasks: Any number of awaitable coroutine objects to execute.
191 | return_exceptions: If True, exceptions are returned as results rather than being
192 | raised. If False, the first raised exception will propagate.
193 | This matches the behavior of asyncio.gather().
194 |
195 | Returns:
196 | List of task results in the same order as the tasks were provided, regardless
197 | of the order in which they completed.
198 |
199 | Example usage:
200 | ```python
201 | # Process a list of URLs with at most 5 concurrent requests
202 | urls = ["https://example.com/1", "https://example.com/2", ...]
203 | tasks = [fetch_url(url) for url in urls]
204 | results = await gather_with_concurrency(5, *tasks)
205 |
206 | # With exception handling
207 | try:
208 | results = await gather_with_concurrency(10, *tasks, return_exceptions=False)
209 | except Exception as e:
210 | # Handle first exception
211 | pass
212 |
213 | # Or capture exceptions in results
214 | results = await gather_with_concurrency(10, *tasks, return_exceptions=True)
215 | for result in results:
216 | if isinstance(result, Exception):
217 | # Handle this exception
218 | pass
219 | ```
220 | """
221 | semaphore = asyncio.Semaphore(n)
222 |
223 | async def run_task_with_semaphore(task):
224 | async with semaphore:
225 | return await task
226 |
227 | return await asyncio.gather(
228 | *(run_task_with_semaphore(task) for task in tasks),
229 | return_exceptions=return_exceptions
230 | )
231 |
232 |
233 | async def run_with_timeout(
234 | coro: Any,
235 | timeout: float,
236 | default: Optional[T] = None,
237 | log_timeout: bool = True
238 | ) -> Union[Any, T]:
239 | """
240 | Run an async coroutine with a timeout, returning a default value if time expires.
241 |
242 | This utility function executes an async operation with a strict time limit and
243 | provides graceful handling of timeouts. If the operation completes within the
244 | specified timeout, its result is returned normally. If the timeout is exceeded,
245 | the specified default value is returned instead, and the operation is cancelled.
246 |
247 | This functionality is particularly useful for:
248 | - Making external API calls that might hang or take too long
249 | - Implementing responsive UIs that can't wait indefinitely
250 | - Handling potentially slow operations in time-sensitive contexts
251 | - Creating fallback behavior for unreliable services
252 |
253 | The function uses asyncio.wait_for internally and properly handles the
254 | TimeoutError, converting it to a controlled return of the default value.
255 |
256 | Args:
257 | coro: The coroutine (awaitable) to execute with a timeout. This can be
258 | any async function call or awaitable object.
259 | timeout: Maximum execution time in seconds before timing out. Must be a
260 | positive number.
261 | default: The value to return if the operation times out. Defaults to None.
262 | This can be any type, and will be returned exactly as provided.
263 | log_timeout: Whether to log a warning message when a timeout occurs. Set
264 | to False to suppress the warning. Default is True.
265 |
266 | Returns:
267 | The result of the coroutine if it completes within the timeout period,
268 | otherwise the specified default value.
269 |
270 | Raises:
271 | Exception: Any exception raised by the coroutine other than TimeoutError
272 | will be propagated to the caller.
273 |
274 | Example usage:
275 | ```python
276 | # Basic usage with default fallback
277 | result = await run_with_timeout(
278 | fetch_data_from_api("https://example.com/data"),
279 | timeout=5.0,
280 | default={"status": "timeout", "data": None}
281 | )
282 |
283 | # Without logging timeouts
284 | result = await run_with_timeout(
285 | slow_operation(),
286 | timeout=10.0,
287 | default=None,
288 | log_timeout=False
289 | )
290 |
291 | # With type checking (using TypeVar)
292 | data: Optional[List[str]] = await run_with_timeout(
293 | get_string_list(),
294 | timeout=3.0,
295 | default=None
296 | )
297 | ```
298 | """
299 | try:
300 | return await asyncio.wait_for(coro, timeout=timeout)
301 | except asyncio.TimeoutError:
302 | if log_timeout:
303 | logger.warning(
304 | f"Operation timed out after {timeout}s",
305 | emoji_key="time",
306 | time=timeout
307 | )
308 | return default
309 |
310 |
311 | def async_retry(
312 | max_retries: int = 3,
313 | retry_delay: float = 1.0,
314 | backoff_factor: float = 2.0,
315 | retry_exceptions: Optional[List[Type[Exception]]] = None,
316 | max_backoff: Optional[float] = None
317 | ):
318 | """
319 | Decorator for automatically retrying async functions when they raise exceptions.
320 |
321 | This decorator implements a configurable exponential backoff retry strategy for
322 | asynchronous functions. When the decorated function raises a specified exception,
323 | the decorator will automatically wait and retry the operation, with an increasing
324 | delay between attempts.
325 |
326 | The retry behavior includes:
327 | - A configurable number of maximum retry attempts
328 | - Initial delay between retries
329 | - Exponential backoff (each retry waits longer than the previous one)
330 | - Optional filtering of which exception types trigger retries
331 | - Optional maximum backoff time to cap the exponential growth
332 | - Detailed logging of retry attempts and final failures
333 |
334 | This pattern is especially useful for:
335 | - Network operations that may fail temporarily
336 | - API calls subject to rate limiting or intermittent failures
337 | - Database operations that may encounter transient errors
338 | - Any resource access that may be temporarily unavailable
339 |
340 | Args:
341 | max_retries: Maximum number of retry attempts after the initial call
342 | (default: 3). Total attempts will be max_retries + 1.
343 | retry_delay: Initial delay between retries in seconds (default: 1.0).
344 | This is the wait time after the first failure.
345 | backoff_factor: Multiplier applied to delay between retries (default: 2.0).
346 | Each retry will wait backoff_factor times longer than the previous.
347 | retry_exceptions: List of exception types that should trigger a retry.
348 | If None (default), all exceptions trigger retries.
349 | max_backoff: Maximum delay between retries in seconds, regardless of the
350 | backoff calculation. None (default) means no maximum.
351 |
352 | Returns:
353 | A decorator function that wraps the target async function with retry logic.
354 |
355 | Example usage:
356 | ```python
357 | # Basic usage - retry any exception up to 3 times
358 | @async_retry()
359 | async def fetch_data(url):
360 | return await make_request(url)
361 |
362 | # Custom configuration - retry specific exceptions with longer delays
363 | @async_retry(
364 | max_retries=5,
365 | retry_delay=2.0,
366 | backoff_factor=3.0,
367 | retry_exceptions=[ConnectionError, TimeoutError],
368 | max_backoff=30.0
369 | )
370 | async def send_to_service(data):
371 | return await service.process(data)
372 | ```
373 |
374 | Note:
375 | The decorated function's signature and return type are preserved, making this
376 | decorator compatible with static type checking.
377 | """
378 | def decorator(func):
379 | @functools.wraps(func)
380 | async def wrapper(*args, **kwargs):
381 | exceptions = []
382 | delay = retry_delay
383 |
384 | for attempt in range(max_retries + 1):
385 | try:
386 | return await func(*args, **kwargs)
387 | except Exception as e:
388 | # Check if we should retry this exception
389 | if retry_exceptions and not any(
390 | isinstance(e, exc_type) for exc_type in retry_exceptions
391 | ):
392 | raise
393 |
394 | exceptions.append(e)
395 |
396 | # If this was the last attempt, reraise
397 | if attempt >= max_retries:
398 | if len(exceptions) > 1:
399 | logger.error(
400 | f"Function {func.__name__} failed after {max_retries+1} attempts",
401 | emoji_key="error",
402 | attempts=max_retries+1
403 | )
404 | raise
405 |
406 | # Log retry
407 | logger.warning(
408 | f"Retrying {func.__name__} after error: {str(e)} "
409 | f"(attempt {attempt+1}/{max_retries+1})",
410 | emoji_key="warning",
411 | attempt=attempt+1,
412 | max_attempts=max_retries+1,
413 | error=str(e)
414 | )
415 |
416 | # Wait before retrying
417 | await asyncio.sleep(delay)
418 |
419 | # Increase delay for next retry
420 | delay *= backoff_factor
421 | if max_backoff:
422 | delay = min(delay, max_backoff)
423 |
424 | # Shouldn't get here, but just in case
425 | raise exceptions[-1]
426 |
427 | return wrapper
428 | return decorator
429 |
430 |
431 | async def map_async(
432 | func: Callable[[Any], Any],
433 | items: List[Any],
434 | concurrency: int = 10,
435 | chunk_size: Optional[int] = None
436 | ) -> List[Any]:
437 | """Map a function over items with limited concurrency.
438 |
439 | This utility provides efficient parallel processing of a list of items while controlling the
440 | maximum number of concurrent operations. It applies the provided async function to each item
441 | in the list, respecting the concurrency limit set by the semaphore.
442 |
443 | The function supports two processing modes:
444 | 1. Chunked processing: When chunk_size is provided, items are processed in batches to improve
445 | memory efficiency when dealing with large lists.
446 | 2. Full parallel processing: When chunk_size is omitted, all items are processed in parallel
447 | but still limited by the concurrency parameter.
448 |
449 | Args:
450 | func: Async function to apply to each item. This function should accept a single item
451 | and return a result.
452 | items: List of items to process. Each item will be passed individually to the function.
453 | concurrency: Maximum number of concurrent tasks allowed. This controls the load on system
454 | resources. Default is 10.
455 | chunk_size: Optional batch size for processing. If provided, items are processed in chunks
456 | of this size to limit memory usage. If None, all items are processed at once
457 | (but still constrained by concurrency).
458 |
459 | Returns:
460 | List of results from applying the function to each item, in the same order as the input items.
461 |
462 | Examples:
463 | ```python
464 | # Define an async function to process an item
465 | async def process_item(item):
466 | await asyncio.sleep(0.1) # Simulate I/O or processing time
467 | return item * 2
468 |
469 | # Process a list of 100 items with max 5 concurrent tasks
470 | items = list(range(100))
471 | results = await map_async(process_item, items, concurrency=5)
472 |
473 | # Process a large list in chunks to manage memory usage
474 | large_list = list(range(10000))
475 | results = await map_async(process_item, large_list, concurrency=10, chunk_size=500)
476 | ```
477 |
478 | Notes:
479 | - If the items list is empty, an empty list is returned immediately.
480 | - The function preserves the original order of items in the result list.
481 |         - For CPU-bound tasks, consider running the work in a ProcessPoolExecutor via
482 |           loop.run_in_executor instead of this function, which is optimized for I/O-bound tasks.
483 | """
484 | if not items:
485 | return []
486 |
487 | # Create semaphore for concurrency control
488 | semaphore = asyncio.Semaphore(concurrency)
489 |
490 | # Define task function
491 | async def process_item(item):
492 | async with semaphore:
493 | return await func(item)
494 |
495 | # If using chunks, process in batches
496 | if chunk_size:
497 | results = []
498 | # Process in chunks for better memory management
499 | for i in range(0, len(items), chunk_size):
500 | chunk = items[i:i+chunk_size]
501 | chunk_results = await asyncio.gather(
502 | *(process_item(item) for item in chunk)
503 | )
504 | results.extend(chunk_results)
505 | return results
506 | else:
507 | # Process all at once with concurrency limit
508 | return await asyncio.gather(
509 | *(process_item(item) for item in items)
510 | )
511 |
512 |
513 | class AsyncBatchProcessor:
514 | """
515 | Processor for efficiently batching async operations to optimize throughput.
516 |
517 | This class provides a framework for batch processing asynchronous operations,
518 | which is useful for optimizing I/O-bound tasks such as database writes, API calls,
519 | or other operations where batching improves efficiency. It automatically collects
520 | individual items and processes them in batches when:
521 |
522 | 1. The batch reaches a specified size (controlled by batch_size)
523 | 2. A specified time interval elapses (controlled by flush_interval)
524 | 3. A manual flush is requested
525 |
526 | The processor also controls concurrency, allowing multiple batches to be processed
527 | simultaneously while limiting the maximum number of concurrent operations to prevent
528 | overwhelming system resources or external services.
529 |
530 | Common use cases include:
531 | - Batching database inserts or updates for better throughput
532 | - Aggregating API calls to services that support bulk operations
533 | - Optimizing data processing pipelines with chunked operations
534 | - Building high-performance ETL (Extract, Transform, Load) processes
535 |
536 | Usage involves extending this class to implement custom batch processing logic
537 | by overriding the _process_batch method with specific implementation details.
538 |
539 | This class implements the async context manager protocol, allowing for use in
540 | async with statements to ensure proper resource cleanup.
541 | """
542 |
543 | def __init__(
544 | self,
545 | batch_size: int = 100,
546 | max_concurrency: int = 5,
547 | flush_interval: Optional[float] = None
548 | ):
549 | """
550 | Initialize the batch processor with configuration settings.
551 |
552 | Args:
553 | batch_size: Maximum number of items to collect before processing a batch.
554 | Higher values generally improve throughput at the cost of increased
555 | latency and memory usage. Default is 100.
556 | max_concurrency: Maximum number of concurrent batch operations allowed.
557 | This prevents overwhelming external services or system
558 | resources. Default is 5 concurrent batch operations.
559 | flush_interval: Optional automatic flush interval in seconds. When specified,
560 | any collected items will be flushed after this interval,
561 | regardless of whether the batch_size has been reached.
562 | Set to None (default) to disable automatic flushing.
563 | """
564 | self.batch_size = batch_size
565 | self.max_concurrency = max_concurrency
566 | self.flush_interval = flush_interval
567 |
568 | self.items = []
569 | self.flush_task = None
570 | self.semaphore = asyncio.Semaphore(max_concurrency)
571 |
572 | async def add(self, item: Any):
573 | """
574 | Add an item to the current batch for processing.
575 |
576 | This method adds the provided item to the internal collection batch.
577 | The item will be processed when either:
578 | 1. The batch reaches the configured batch_size
579 | 2. The flush_interval elapses (if configured)
580 | 3. A manual flush() is called
581 |
582 | The method automatically triggers a flush operation if the number of
583 | collected items reaches the configured batch_size.
584 |
585 | If a flush_interval is set and no auto-flush task is running, this method
586 | also initializes a background task to automatically flush items after
587 | the specified interval.
588 |
589 | Args:
590 | item: The item to add to the batch. Can be any type that your
591 | _process_batch implementation can handle.
592 |
593 | Returns:
594 | None
595 |
596 | Example:
597 | ```python
598 | processor = MyBatchProcessor(batch_size=50)
599 | await processor.add({"id": 1, "value": "data"})
600 | ```
601 | """
602 | self.items.append(item)
603 |
604 | # Start flush task if needed
605 | if self.flush_interval and not self.flush_task:
606 | self.flush_task = asyncio.create_task(self._auto_flush())
607 |
608 | # Flush if batch is full
609 | if len(self.items) >= self.batch_size:
610 | await self.flush()
611 |
612 | async def flush(self) -> List[Any]:
613 | """
614 | Process all currently batched items immediately.
615 |
616 | This method forces processing of all currently collected items, regardless
617 | of whether the batch is full or the flush interval has elapsed. It's useful
618 | when you need to ensure all items are processed without waiting, such as:
619 |
620 | - When shutting down the application
621 | - Before a checkpoint or commit operation
622 | - When immediate processing is needed for time-sensitive data
623 | - At the end of a processing cycle
624 |
625 | The method handles empty batches gracefully, returning an empty list
626 | when there are no items to process.
627 |
628 | Returns:
629 | List of results from processing the batch. The exact content depends
630 | on what the _process_batch implementation returns. Returns an empty
631 | list if there were no items to process.
632 |
633 | Example:
634 | ```python
635 | # Process any pending items immediately
636 | results = await processor.flush()
637 | ```
638 | """
639 | if not self.items:
640 | return []
641 |
642 | # Get current items
643 | items = self.items
644 | self.items = []
645 |
646 | # Cancel flush task if running
647 | if self.flush_task:
648 | self.flush_task.cancel()
649 | self.flush_task = None
650 |
651 | # Process the batch
652 | return await self._process_batch(items)
653 |
654 | async def _auto_flush(self):
655 | """
656 | Internal background task for automatic periodic flushing.
657 |
658 | This method implements the auto-flush functionality that runs periodically
659 | when flush_interval is set. It sleeps for the configured interval and then
660 | triggers a flush operation if any items are pending.
661 |
662 | The task continues running until:
663 | 1. It's cancelled (typically when a manual flush occurs)
664 | 2. The processor is shut down (via __aexit__)
665 |
666 | This method is not intended to be called directly but is started automatically
667 | by the add() method when needed and a flush_interval is configured.
668 |
669 | Raises:
670 | asyncio.CancelledError: When the task is cancelled. This is caught
671 | internally and used to terminate the loop.
672 | """
673 | try:
674 | while True:
675 | await asyncio.sleep(self.flush_interval)
676 | if self.items:
677 | await self.flush()
678 | except asyncio.CancelledError:
679 | # Task was cancelled, which is expected
680 | pass
681 |
682 | async def _process_batch(self, batch: List[Any]) -> List[Any]:
683 | """
684 | Process a batch of items (to be overridden by subclasses).
685 |
686 | This method should be overridden by subclasses to implement the actual
687 | batch processing logic. The base implementation simply returns the batch
688 | unchanged and logs a warning, as it's not meant to be used directly.
689 |
690 | When implementing this method in a subclass, typical patterns include:
691 | - Sending a bulk API request with all batch items
692 | - Executing a batch database operation
693 | - Processing items in parallel with controlled concurrency
694 | - Aggregating items for a combined operation
695 |
696 | Args:
697 | batch: List of items to process that were collected via add()
698 |
699 | Returns:
700 | List of processed results, where each result corresponds to an item
701 | in the input batch. The actual return type depends on the specific
702 | implementation in the subclass.
703 |
704 | Example implementation:
705 | ```python
706 | async def _process_batch(self, batch: List[dict]) -> List[dict]:
707 | # Add a batch_id to each item
708 | batch_id = str(uuid.uuid4())
709 | for item in batch:
710 | item['batch_id'] = batch_id
711 |
712 | # Send to database in a single operation
713 | results = await self.db.insert_many(batch)
714 | return results
715 | ```
716 | """
717 | # This should be overridden by subclasses
718 | logger.warning(
719 | f"Default batch processing used for {len(batch)} items",
720 | emoji_key="warning"
721 | )
722 | return batch
723 |
724 | async def __aenter__(self):
725 | """
726 | Enter the async context manager.
727 |
728 | Allows the batch processor to be used in an async with statement,
729 | which ensures proper cleanup when the context is exited.
730 |
731 | Returns:
732 | The batch processor instance, ready for use.
733 |
734 | Example:
735 | ```python
736 | async with MyBatchProcessor(batch_size=100) as processor:
737 | for item in items:
738 | await processor.add(item)
739 | # All pending items are automatically flushed when the context exits
740 | ```
741 | """
742 | return self
743 |
744 | async def __aexit__(self, exc_type, exc_val, exc_tb):
745 | """
746 | Exit the async context manager.
747 |
748 | This method is called when exiting an async with block. It ensures
749 | that any pending items are flushed before the context manager completes,
750 | preventing data loss when the processor goes out of scope.
751 |
752 | Args:
753 | exc_type: Exception type if an exception was raised in the context
754 | exc_val: Exception value if an exception was raised
755 | exc_tb: Exception traceback if an exception was raised
756 |
757 | Returns:
758 | False, indicating that any exceptions should be propagated.
759 | """
760 | # Flush any remaining items
761 | if self.items:
762 | await self.flush()
```
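The docstrings above show each helper in isolation; in practice they are meant to compose, and `AsyncBatchProcessor` is explicitly intended to be subclassed with a concrete `_process_batch`. Below is a minimal sketch of that composition, assuming the `ultimate_mcp_server` package from this dump is importable; `fetch_item`, its simulated delay, and `PrintBatchProcessor` are hypothetical stand-ins for real I/O and batch logic, not part of the repository.

```python
# Minimal usage sketch (not part of the repository): composes RateLimiter,
# async_retry, gather_with_concurrency, and an AsyncBatchProcessor subclass.
import asyncio
from typing import Any, List

from ultimate_mcp_server.utils.async_utils import (
    AsyncBatchProcessor,
    RateLimiter,
    async_retry,
    gather_with_concurrency,
)

# Allow at most 5 upstream calls per second (hypothetical limit).
limiter = RateLimiter(max_calls=5, period=1.0)


@async_retry(max_retries=3, retry_delay=0.5, backoff_factor=2.0)
async def fetch_item(item_id: int) -> dict:
    """Hypothetical I/O-bound call; replace with a real API or DB query."""
    await limiter.acquire()        # waits here once the rate limit is hit
    await asyncio.sleep(0.05)      # stand-in for the network round trip
    return {"id": item_id, "ok": True}


class PrintBatchProcessor(AsyncBatchProcessor):
    """Toy subclass: 'processes' a batch by printing its size."""

    async def _process_batch(self, batch: List[Any]) -> List[Any]:
        print(f"processing batch of {len(batch)} items")
        return batch


async def main() -> None:
    # Fan out 20 fetches with at most 10 in flight at any moment.
    results = await gather_with_concurrency(10, *(fetch_item(i) for i in range(20)))

    # Collect results and flush them in batches of 8 (plus a final flush on exit).
    async with PrintBatchProcessor(batch_size=8) as processor:
        for item in results:
            await processor.add(item)


if __name__ == "__main__":
    asyncio.run(main())
```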
--------------------------------------------------------------------------------
/examples/tournament_code_demo.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | Tournament Code Demo - Demonstrates running a code improvement tournament
4 |
5 | This script shows how to:
6 | 1. Create a tournament with multiple models, including diversity and evaluators.
7 | 2. Track progress across multiple rounds.
8 | 3. Retrieve and analyze the improved code and evaluation scores.
9 |
10 | The tournament task is to write and iteratively improve a Python function for
11 | parsing messy CSV data, handling various edge cases.
12 |
13 | Usage:
14 | python examples/tournament_code_demo.py [--task TASK] [--rounds N]
15 |
16 | Options:
17 | --task TASK Specify a coding task (default: parse_csv)
18 | --rounds N Number of tournament rounds (default: 2)
19 | """
20 |
21 | import argparse
22 | import asyncio
23 | import json
24 | import sys
25 | from pathlib import Path
26 | from typing import Any, Dict, List, Optional # Added List
27 |
28 | # Add project root to path for imports when running as script
29 | sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
30 |
31 | from rich.markup import escape
32 | from rich.panel import Panel
33 | from rich.rule import Rule
34 | from rich.syntax import Syntax # For displaying code
35 |
36 | # Add these imports to fix undefined names
37 | from ultimate_mcp_server.core.models.tournament import TournamentStatus
38 |
39 | # Assuming Gateway, PromptTemplate, etc. are correctly located
40 | from ultimate_mcp_server.core.server import Gateway
41 | from ultimate_mcp_server.exceptions import ProviderError, ToolError
42 | from ultimate_mcp_server.services.prompts import PromptTemplate # If used
43 |
44 | # Import tournament tools for manual registration
45 | from ultimate_mcp_server.tools.tournament import (
46 | create_tournament,
47 | get_tournament_results,
48 | get_tournament_status,
49 | )
50 | from ultimate_mcp_server.utils import (
51 | get_logger,
52 | process_mcp_result,
53 | ) # process_mcp_result might need updates
54 | from ultimate_mcp_server.utils.display import ( # Ensure these are updated for new TournamentData structure
55 | CostTracker,
56 | display_tournament_results, # This will need significant updates
57 | display_tournament_status, # This likely needs updates too
58 | )
59 | from ultimate_mcp_server.utils.logging.console import console
60 |
61 | # Initialize logger
62 | logger = get_logger("example.tournament_code")
63 |
64 | # Global gateway instance (initialized in setup_gateway)
65 | gateway: Optional[Gateway] = None
66 |
67 | # --- Configuration ---
68 | DEFAULT_MODEL_CONFIGS: List[Dict[str, Any]] = [
69 | {
70 | "model_id": "openai/gpt-4o-mini", # Example, use your actual model IDs
71 | "diversity_count": 2, # Generate 2 variants from this model
72 | "temperature": 0.7,
73 | },
74 | {
75 | "model_id": "anthropic/claude-3-5-haiku-20241022",
76 | "diversity_count": 1,
77 | "temperature": 0.6,
78 | },
79 | # Add more models as available/desired
80 | # {
81 | # "model_id": "google/gemini-1.5-flash-latest",
82 | # "diversity_count": 1,
83 | # },
84 | ]
85 | DEFAULT_NUM_ROUNDS = 2
86 | DEFAULT_TOURNAMENT_NAME = "Advanced Code Improvement Tournament"
87 |
88 | # Default Evaluators
89 | DEFAULT_EVALUATORS: List[Dict[str, Any]] = [
90 | {
91 | "evaluator_id": "python_syntax_checker",
92 | "type": "regex_match",
93 | "params": {
94 | "patterns": [r"^\s*def\s+\w+\(.*\):|^\s*class\s+\w+:"], # Basic check for def/class
95 | "target_field": "extracted_code",
96 | "match_mode": "any_can_match",
97 | },
98 | "weight": 0.2,
99 | "primary_metric": False,
100 | },
101 | {
102 | "evaluator_id": "code_length_penalty", # Example: Penalize overly short/long code
103 | "type": "regex_match", # Could be a custom evaluator
104 | "params": {
105 | # This regex means: content has between 5 and 500 lines (approx)
106 | "patterns": [r"^(?:[^\n]*\n){4,499}[^\n]*$"],
107 | "target_field": "extracted_code",
108 | "match_mode": "all_must_match", # Must be within the line range
109 | "regex_flag_options": ["MULTILINE", "DOTALL"],
110 | },
111 | "weight": 0.1,
112 | },
113 | {
114 | "evaluator_id": "llm_code_grader",
115 | "type": "llm_grader",
116 | "params": {
117 | "model_id": "anthropic/claude-3-5-haiku-20241022", # Use a cost-effective grader
118 | "rubric": (
119 | "Evaluate the provided Python code based on the original prompt. "
120 | "Score from 0-100 considering: \n"
121 | "1. Correctness & Robustness (does it likely solve the problem, handle edges?).\n"
122 | "2. Efficiency (algorithmic complexity, resource usage).\n"
123 | "3. Readability & Maintainability (clarity, comments, Pythonic style).\n"
124 | "4. Completeness (are all requirements addressed?).\n"
125 | "Provide a 'Score: XX' line and a brief justification."
126 | ),
127 | },
128 | "weight": 0.7, # Main evaluator
129 | "primary_metric": True,
130 | },
131 | ]
132 |
133 |
134 | # The generic code prompt template
135 | TEMPLATE_CODE = """
136 | # GENERIC CODE TOURNAMENT PROMPT TEMPLATE
137 |
138 | Write a {{code_type}} that {{task_description}}.
139 |
140 | {{context}}
141 |
142 | Your solution should:
143 | {% for requirement in requirements %}
144 | {{ loop.index }}. {{requirement}}
145 | {% endfor %}
146 |
147 | {% if example_inputs %}
148 | Example inputs:
149 | ```
150 | {{example_inputs}}
151 | ```
152 | {% endif %}
153 |
154 | {% if example_outputs %}
155 | Expected outputs:
156 | ```
157 | {{example_outputs}}
158 | ```
159 | {% endif %}
160 |
161 | Provide ONLY the Python code for your solution, enclosed in triple backticks (```python ... ```).
162 | No explanations before or after the code block, unless they are comments within the code itself.
163 | """
164 |
165 | # Define predefined tasks
166 | TASKS = {
167 | "parse_csv": {
168 | "code_type": "Python function",
169 | "task_description": "parses a CSV string that may use different delimiters and contains various edge cases",
170 | "context": "Your function should be robust enough to handle real-world messy CSV data.",
171 | "requirements": [
172 | "Implement `parse_csv_string(csv_data: str) -> list[dict]`",
173 | "Accept a string `csv_data` which might contain CSV data",
174 | "Automatically detect the delimiter (comma, semicolon, or tab)",
175 | "Handle quoted fields correctly, including escaped quotes within fields",
176 | "Treat the first row as the header",
177 | "Return a list of dictionaries, where each dictionary represents a row",
178 | "Handle errors gracefully by logging warnings and skipping problematic rows",
179 | "Return an empty list if the input is empty or only contains a header",
180 | "Include necessary imports (e.g., `csv`, `io`).",
181 | "Be efficient for moderately large inputs (e.g., up to 1MB).",
182 | ],
183 | "example_inputs": """name,age,city\n"Smith, John",42,New York\n"Doe, Jane";39;"Los Angeles, CA"\n"\\"Williams\\", Bob"\t65\t"Chicago" """,
184 | "example_outputs": """[\n {"name": "Smith, John", "age": "42", "city": "New York"},\n {"name": "Doe, Jane", "age": "39", "city": "Los Angeles, CA"},\n {"name": "\\"Williams\\", Bob", "age": "65", "city": "Chicago"}\n]""",
185 | },
186 | # Add other tasks (calculator, string_util) here if needed, similar structure
187 | }
188 |
189 |
190 | def create_custom_task_variables(task_description_custom: str):
191 | return {
192 | "code_type": "Python function",
193 | "task_description": task_description_custom,
194 | "context": "Ensure your solution is well-documented and handles potential edge cases.",
195 | "requirements": [
196 | "Implement the solution as specified in the task description.",
197 | "Write clean, readable, and efficient Python code.",
198 | "Include type hints and comprehensive docstrings.",
199 | "Handle potential errors gracefully.",
200 | "Make sure all necessary imports are included.",
201 | ],
202 | "example_inputs": "# Provide relevant example inputs if applicable",
203 | "example_outputs": "# Provide expected outputs for the examples if applicable",
204 | }
205 |
206 |
207 | def parse_arguments():
208 | parser = argparse.ArgumentParser(description="Run a code improvement tournament demo")
209 | parser.add_argument(
210 | "--task",
211 | type=str,
212 | default="parse_csv",
213 | choices=list(TASKS.keys()) + ["custom"],
214 | help="Coding task (default: parse_csv)",
215 | )
216 | parser.add_argument(
217 | "--custom-task", type=str, help="Custom coding task description (used when --task=custom)"
218 | )
219 | parser.add_argument(
220 | "--rounds",
221 | type=int,
222 | default=DEFAULT_NUM_ROUNDS,
223 | help=f"Number of tournament rounds (default: {DEFAULT_NUM_ROUNDS})",
224 | )
225 | parser.add_argument(
226 | "--models",
227 | type=str,
228 | nargs="+",
229 | default=[mc["model_id"] for mc in DEFAULT_MODEL_CONFIGS], # Pass only model_id strings
230 | help="List of model IDs to participate (e.g., 'openai/gpt-4o-mini'). Overrides default models.",
231 | )
232 | return parser.parse_args()
233 |
234 |
235 | async def setup_gateway_for_demo():
236 | global gateway
237 | if gateway:
238 | return
239 |
240 | logger.info("Initializing gateway for code tournament demo...", emoji_key="rocket")
241 | # Assuming Gateway constructor and _initialize_providers are async
242 | # The actual Gateway might not need to be created this way if it's a singleton managed by server.py
243 | # For a standalone demo, this direct instantiation is okay.
244 | # For integration, you'd likely get the existing gateway instance.
245 | try:
246 | # This is a simplified setup. In a real server, Gateway might be a singleton.
247 | # Here, we create a new one for the demo.
248 | # Ensure your actual Gateway class can be instantiated and initialized like this.
249 | # from ultimate_mcp_server.core import get_gateway_instance, async_init_gateway
250 | # gateway = get_gateway_instance()
251 | # if not gateway:
252 | # gateway = await async_init_gateway() # This sets the global instance
253 |
254 | # For a script, direct instantiation:
255 | gateway = Gateway(
256 | name="code_tournament_demo_gateway", register_tools=False # Changed: register_tools=False, removed load_all_tools
257 | )
258 | # In a script, you might need to manually initialize providers if not done by Gateway constructor
259 | if not gateway.providers: # Check if providers are already initialized
260 | await gateway._initialize_providers()
261 |
262 | # Manually register tournament tools
263 | mcp = gateway.mcp
264 | mcp.tool()(create_tournament)
265 | mcp.tool()(get_tournament_status)
266 | mcp.tool()(get_tournament_results)
267 | logger.info("Manually registered tournament tools for the demo.")
268 |
269 | except Exception as e:
270 | logger.critical(f"Failed to initialize Gateway: {e}", exc_info=True)
271 | raise
272 |
273 |     # Verify tournament tools are registered (they were registered manually above).
274 | # This check is more for sanity.
275 | mcp_tools = await gateway.mcp.list_tools()
276 | registered_tool_names = [t.name for t in mcp_tools]
277 | required_tournament_tools = [
278 | "create_tournament",
279 | "get_tournament_status",
280 | "get_tournament_results",
281 | ]
282 | missing_tools = [
283 | tool for tool in required_tournament_tools if tool not in registered_tool_names
284 | ]
285 |
286 | if missing_tools:
287 | logger.error(
288 | f"Gateway initialized, but required tournament tools are missing: {missing_tools}",
289 | emoji_key="error",
290 | )
291 | logger.info(f"Available tools: {registered_tool_names}")
292 | raise RuntimeError(f"Required tournament tools not registered: {missing_tools}")
293 |
294 | logger.success(
295 | "Gateway for demo initialized and tournament tools verified.", emoji_key="heavy_check_mark"
296 | )
297 |
298 |
299 | async def poll_tournament_status_enhanced(
300 | tournament_id: str, storage_path: Optional[str] = None, interval: int = 10
301 | ) -> Optional[str]:
302 | logger.info(
303 | f"Polling status for tournament {tournament_id} (storage: {storage_path})...",
304 | emoji_key="hourglass",
305 | )
306 | final_states = [
307 | status.value
308 | for status in [
309 | TournamentStatus.COMPLETED,
310 | TournamentStatus.FAILED,
311 | TournamentStatus.CANCELLED,
312 | ]
313 | ]
314 |
315 | while True:
316 | status_input = {"tournament_id": tournament_id}
317 | status_result_raw = await gateway.mcp.call_tool("get_tournament_status", status_input)
318 |
319 | # Process MCP result to get the dictionary
320 | status_data_dict = await process_mcp_result(status_result_raw) # Ensure this returns a dict
321 |
322 | if "error" in status_data_dict or not status_data_dict.get(
323 | "success", True
324 | ): # Check for tool call error
325 | error_message = status_data_dict.get(
326 | "error_message", status_data_dict.get("error", "Unknown error fetching status")
327 | )
328 | if storage_path and "not found" in error_message.lower():
329 | # Fallback to direct file reading
330 | state_file = Path(storage_path) / "tournament_state.json"
331 | logger.debug(f"Tournament not found via API, trying direct read: {state_file}")
332 | if state_file.exists():
333 | try:
334 | direct_state = json.loads(state_file.read_text())
335 | current_status = direct_state.get("status")
336 | # Reconstruct a compatible status_data_dict for display
337 | status_data_dict = {
338 | "tournament_id": tournament_id,
339 | "name": direct_state.get("name"),
340 | "tournament_type": direct_state.get("config", {}).get(
341 | "tournament_type"
342 | ),
343 | "status": current_status,
344 | "current_round": direct_state.get("current_round", -1)
345 | + 1, # Adjust for display
346 | "total_rounds": direct_state.get("config", {}).get("rounds", 0),
347 | "progress_summary": f"Read from file. Round {direct_state.get('current_round', -1) + 1}.",
348 | "created_at": direct_state.get("created_at"),
349 | "updated_at": direct_state.get("updated_at"),
350 | "error_message": direct_state.get("error_message"),
351 | }
352 | logger.info(f"Successfully read direct state from file: {current_status}")
353 | except Exception as e:
354 | logger.error(f"Error reading state file directly: {e}")
355 | # Keep original error message
356 | else:
357 | logger.warning(f"Fallback state file not found: {state_file}")
358 | else: # Non-"not found" error or no storage path
359 | logger.error(
360 | f"Error fetching tournament status: {error_message}", emoji_key="error"
361 | )
362 | return None # Indicate polling error
363 |
364 | # Display status using the utility function (ensure it handles the new dict structure)
365 | display_tournament_status(status_data_dict) # Expects a dict
366 |
367 | current_status_val = status_data_dict.get("status")
368 | if current_status_val in final_states:
369 | logger.success(
370 | f"Tournament {tournament_id} reached final state: {current_status_val}",
371 | emoji_key="heavy_check_mark",
372 | )
373 | return current_status_val
374 |
375 | await asyncio.sleep(interval)
376 |
377 |
378 | # --- Robust result processing for demo ---
379 | async def robust_process_mcp_result(result_raw, storage_path=None):
380 | from ultimate_mcp_server.utils import process_mcp_result
381 | try:
382 | processed = await process_mcp_result(result_raw)
383 | # If no error, or error is not about JSON, return as is
384 | if not processed.get("error") or "LLM repair" not in processed.get("error", ""):
385 | return processed
386 | except Exception as e:
387 | processed = {"error": f"Exception in process_mcp_result: {e}"}
388 |
389 | # Fallback: try to load from file if storage_path is provided
390 | if storage_path:
391 | state_file = Path(storage_path) / "tournament_state.json"
392 | if state_file.exists():
393 | try:
394 | with open(state_file, "r", encoding="utf-8") as f:
395 | return json.load(f)
396 | except Exception as file_e:
397 | return {"error": f"Failed to parse both API and file: {file_e}"}
398 | # Otherwise, return a clear error
399 | return {"error": f"API did not return JSON. Raw: {str(result_raw)[:200]}"}
400 |
401 |
402 | async def run_code_tournament_demo(tracker: CostTracker, args: argparse.Namespace):
403 | if args.task == "custom":
404 | if not args.custom_task:
405 | console.print(
406 | "[bold red]Error:[/bold red] --custom-task description must be provided when --task=custom."
407 | )
408 | return 1
409 | task_name = "custom_task"
410 | task_vars = create_custom_task_variables(args.custom_task)
411 | task_description_log = args.custom_task
412 | elif args.task in TASKS:
413 | task_name = args.task
414 | task_vars = TASKS[task_name]
415 | task_description_log = task_vars["task_description"]
416 | else: # Should not happen due to argparse choices
417 | console.print(f"[bold red]Error:[/bold red] Unknown task '{args.task}'.")
418 | return 1
419 |
420 | console.print(
421 | Rule(
422 | f"[bold blue]{DEFAULT_TOURNAMENT_NAME} - Task: {task_name.replace('_', ' ').title()}[/bold blue]"
423 | )
424 | )
425 | console.print(f"Task Description: [yellow]{escape(task_description_log)}[/yellow]")
426 |
427 | # Prepare model_configs based on CLI input or defaults
428 | # The tool now expects list of dicts for model_configs
429 | current_model_configs = []
430 | if args.models == [mc["model_id"] for mc in DEFAULT_MODEL_CONFIGS]: # Default models used
431 | current_model_configs = DEFAULT_MODEL_CONFIGS
432 | console.print(
433 | f"Using default models: [cyan]{', '.join([mc['model_id'] for mc in current_model_configs])}[/cyan]"
434 | )
435 | else: # Custom models from CLI
436 | for model_id_str in args.models:
437 | # Find if this model_id_str matches any default config to get its diversity/temp
438 | # This is a simple way; a more complex CLI could allow full ModelConfig dicts
439 | default_mc = next(
440 | (mc for mc in DEFAULT_MODEL_CONFIGS if mc["model_id"] == model_id_str), None
441 | )
442 | if default_mc:
443 | current_model_configs.append(default_mc)
444 | else: # Model from CLI not in defaults, use basic config
445 | current_model_configs.append({"model_id": model_id_str, "diversity_count": 1})
446 | console.print(f"Using CLI specified models: [cyan]{', '.join(args.models)}[/cyan]")
447 |
448 | console.print(f"Rounds: [cyan]{args.rounds}[/cyan]")
449 | console.print(
450 | f"Evaluators: [cyan]{', '.join([e['evaluator_id'] for e in DEFAULT_EVALUATORS])}[/cyan]"
451 | )
452 |
453 | code_prompt_template = PromptTemplate(
454 | template=TEMPLATE_CODE,
455 | template_id="demo_code_prompt",
456 | required_vars=["code_type", "task_description", "context", "requirements", "example_inputs", "example_outputs"]
457 | )
458 | try:
459 | initial_prompt = code_prompt_template.render(task_vars)
460 | except Exception as e:
461 | logger.error(f"Failed to render prompt template: {e}", exc_info=True)
462 | return 1
463 |
464 | console.print(
465 | Panel(
466 | escape(initial_prompt[:500] + "..."),
467 | title="[bold]Initial Prompt Preview[/bold]",
468 | border_style="dim",
469 | )
470 | )
471 |
472 | create_input = {
473 | "name": f"{DEFAULT_TOURNAMENT_NAME} - {task_name.replace('_', ' ').title()}",
474 | "prompt": initial_prompt,
475 | "models": current_model_configs, # This should be List[Dict] for the tool
476 | "rounds": args.rounds,
477 | "tournament_type": "code",
478 | "evaluators": DEFAULT_EVALUATORS, # Pass evaluator configs
479 | # Add other new config params if desired, e.g., extraction_model_id
480 | "extraction_model_id": "anthropic/claude-3-5-haiku-20241022",
481 | "max_retries_per_model_call": 2,
482 | "max_concurrent_model_calls": 3,
483 | }
484 |
485 | tournament_id: Optional[str] = None
486 | storage_path: Optional[str] = None
487 |
488 | try:
489 | logger.info("Creating code tournament...", emoji_key="gear")
490 | create_result_raw = await gateway.mcp.call_tool("create_tournament", create_input)
491 | create_data = await process_mcp_result(
492 | create_result_raw
493 | ) # process_mcp_result must return dict
494 |
495 | # Corrected error handling, similar to tournament_text_demo.py
496 | if "error" in create_data:
497 | error_msg = create_data.get("error_message", create_data.get("error", "Unknown error creating tournament"))
498 | logger.error(f"Failed to create tournament: {error_msg}", emoji_key="cross_mark")
499 | console.print(f"[bold red]Error creating tournament:[/bold red] {escape(error_msg)}")
500 | return 1
501 |
502 | tournament_id = create_data.get("tournament_id")
503 | storage_path = create_data.get("storage_path") # Get storage_path
504 |
505 | if not tournament_id:
506 | logger.error(
507 | "No tournament ID returned from create_tournament call.", emoji_key="cross_mark"
508 | )
509 | console.print("[bold red]Error: No tournament ID returned.[/bold red]")
510 | return 1
511 |
512 | console.print(
513 | f"Tournament [bold green]'{create_input['name']}'[/bold green] created successfully!"
514 | )
515 | console.print(f" ID: [yellow]{tournament_id}[/yellow]")
516 | console.print(f" Status: [magenta]{create_data.get('status')}[/magenta]")
517 | if storage_path:
518 | console.print(f" Storage Path: [blue underline]{storage_path}[/blue underline]")
519 |
520 | await asyncio.sleep(1) # Brief pause for task scheduling
521 |
522 | final_status_val = await poll_tournament_status_enhanced(
523 | tournament_id, storage_path, interval=10
524 | )
525 |
526 | if final_status_val == TournamentStatus.COMPLETED.value:
527 | logger.info(
528 | f"Tournament {tournament_id} completed. Fetching final results...",
529 | emoji_key="sports_medal",
530 | )
531 | results_input = {"tournament_id": tournament_id}
532 | results_raw = await gateway.mcp.call_tool("get_tournament_results", results_input)
533 | processed_results_dict = await robust_process_mcp_result(
534 | results_raw, storage_path
535 | )
536 |
537 | results_data_dict = processed_results_dict
538 | workaround_applied_successfully = False
539 |
540 | # If process_mcp_result itself signals an error
541 | # (This will be true if JSON parsing failed and LLM repair also failed to produce valid JSON)
542 | if "error" in processed_results_dict: # Simpler check for any error from process_mcp_result
543 | original_error_msg = processed_results_dict.get("error", "Unknown error processing results")
544 | logger.warning(
545 | f"Initial processing of 'get_tournament_results' failed with: {original_error_msg}"
546 | )
547 |
548 | # Attempt workaround if it's a code tournament, storage path is known,
549 | # AND the initial processing via MCP failed.
550 | current_tournament_type = create_input.get("tournament_type", "unknown")
551 | if current_tournament_type == "code" and storage_path:
552 | logger.info(
553 | f"Applying workaround for 'get_tournament_results' failure. "
554 | f"Attempting to load results directly from storage: {storage_path}"
555 | )
556 | state_file_path = Path(storage_path) / "tournament_state.json"
557 | if state_file_path.exists():
558 | try:
559 | with open(state_file_path, 'r', encoding='utf-8') as f:
560 | results_data_dict = json.load(f) # Override with data from file
561 | logger.success(
562 | f"Workaround successful: Loaded results from {state_file_path}"
563 | )
564 | workaround_applied_successfully = True
565 | except Exception as e:
566 | logger.error(
567 | f"Workaround failed: Could not load or parse {state_file_path}: {e}"
568 | )
569 | # results_data_dict remains processed_results_dict (the error dict from initial processing)
570 | else:
571 | logger.warning(
572 | f"Workaround failed: State file not found at {state_file_path}"
573 | )
574 | # results_data_dict remains processed_results_dict (the error dict from initial processing)
575 | # If not a code tournament, or no storage path, or workaround failed,
576 | # results_data_dict is still the original error dict from processed_results_dict
577 |
578 | # Now, check the final results_data_dict (either from tool or successful workaround)
579 | # This outer check sees if results_data_dict *still* has an error after potential workaround
580 | if "error" in results_data_dict:
581 | # This block will be hit if:
582 | # 1. Original tool call failed AND it wasn't the specific known issue for the workaround.
583 | # 2. Original tool call failed with the known issue, BUT the workaround also failed (e.g., file not found, parse error).
584 | final_error_msg = results_data_dict.get("error_message", results_data_dict.get("error", "Unknown error"))
585 | logger.error(
586 | f"Failed to get tournament results (workaround_applied_successfully={workaround_applied_successfully}): {final_error_msg}",
587 | emoji_key="cross_mark"
588 | )
589 | console.print(f"[bold red]Error fetching results:[/bold red] {escape(final_error_msg)}")
590 | else:
591 | # Successfully got data, either from tool or workaround
592 | if workaround_applied_successfully:
593 | console.print(
594 | "[yellow i](Workaround applied: Results loaded directly from tournament_state.json)[/yellow i]"
595 | )
596 |
597 | # Pass the full dictionary results_data_dict to display_tournament_results
598 | display_tournament_results(
599 | results_data_dict, console
600 | ) # Ensure this function handles the new structure
601 |
602 | # Example of accessing overall best response
603 | overall_best_resp_data = results_data_dict.get("overall_best_response")
604 | if overall_best_resp_data:
605 | console.print(
606 | Rule("[bold green]Overall Best Response Across All Rounds[/bold green]")
607 | )
608 | best_variant_id = overall_best_resp_data.get("model_id_variant", "N/A")
609 | best_score = overall_best_resp_data.get("overall_score", "N/A")
610 | console.print(
611 |                     f"Best Variant: [cyan]{best_variant_id}[/cyan] (Score: {f'{best_score:.2f}' if isinstance(best_score, float) else 'N/A'})"
612 | )
613 | best_code = overall_best_resp_data.get("extracted_code")
614 | if best_code:
615 | console.print(
616 | Panel(
617 | Syntax(best_code, "python", theme="monokai", line_numbers=True),
618 | title=f"Code from {best_variant_id}",
619 | border_style="green",
620 | )
621 | )
622 | else:
623 | console.print(
624 | "[yellow]No extracted code found for the overall best response.[/yellow]"
625 | )
626 |
627 | # Try to find and mention the leaderboard file from the last round
628 | last_round_num = results_data_dict.get("config", {}).get("rounds", 0) - 1
629 | if last_round_num >= 0 and last_round_num < len(
630 | results_data_dict.get("rounds_results", [])
631 | ):
632 | last_round_data = results_data_dict["rounds_results"][last_round_num]
633 | leaderboard_file = last_round_data.get("leaderboard_file_path")
634 | if leaderboard_file:
635 | console.print(
636 | f"\nCheck the final leaderboard: [blue underline]{leaderboard_file}[/blue underline]"
637 | )
638 | comparison_file = last_round_data.get("comparison_file_path")
639 | if comparison_file:
640 | console.print(
641 | f"Check the final round comparison: [blue underline]{comparison_file}[/blue underline]"
642 | )
643 |
644 | elif final_status_val: # FAILED or CANCELLED
645 | logger.warning(
646 | f"Tournament {tournament_id} ended with status: {final_status_val}",
647 | emoji_key="warning",
648 | )
649 | console.print(
650 | f"[bold yellow]Tournament ended with status: {final_status_val}[/bold yellow]"
651 | )
652 | # Optionally fetch results for FAILED tournaments to see partial data / error
653 | if final_status_val == TournamentStatus.FAILED.value:
654 | results_input = {"tournament_id": tournament_id}
655 | results_raw = await gateway.mcp.call_tool("get_tournament_results", results_input)
656 | results_data_dict = await robust_process_mcp_result(
657 | results_raw, storage_path
658 | )
659 |                 if results_data_dict and "error" not in results_data_dict and not results_data_dict.get(
660 |                     "error_message"
661 |                 ): # Check success of get_tournament_results
662 | display_tournament_results(results_data_dict, console) # Display what we have
663 | else: # Polling failed
664 | logger.error(f"Polling failed for tournament {tournament_id}.", emoji_key="cross_mark")
665 | console.print(f"[bold red]Polling failed for tournament {tournament_id}.[/bold red]")
666 |
667 | except (ToolError, ProviderError, Exception) as e: # Catch more general errors
668 | logger.error(
669 | f"An error occurred during the code tournament demo: {e}",
670 | exc_info=True,
671 | emoji_key="error",
672 | )
673 | console.print(f"[bold red]Demo Error:[/bold red] {escape(str(e))}")
674 | return 1
675 | finally:
676 | tracker.display_summary(console)
677 | logger.info("Code tournament demo finished.", emoji_key="party_popper")
678 |     return 0
679 |
680 |
681 | async def main_async():
682 | args = parse_arguments()
683 | tracker = CostTracker()
684 | exit_code = 1 # Default to error
685 | try:
686 | await setup_gateway_for_demo()
687 | exit_code = await run_code_tournament_demo(tracker, args)
688 | except Exception as e:
689 | console.print(
690 | f"[bold red]Critical error in demo setup or execution:[/bold red] {escape(str(e))}"
691 | )
692 | logger.critical(f"Demo main_async failed: {e}", exc_info=True)
693 | finally:
694 | # Simplified cleanup, similar to tournament_text_demo.py
695 | if gateway:
696 | # Perform any necessary general gateway cleanup if available in the future
697 | # For now, specific sandbox closing is removed as it caused issues and
698 | # repl_python is not explicitly registered/used by this demo with register_tools=False
699 | pass
700 | logger.info("Demo finished.")
701 |     return exit_code
702 |
703 |
704 | if __name__ == "__main__":
705 | try:
706 | final_exit_code = asyncio.run(main_async())
707 | except KeyboardInterrupt:
708 | console.print("\n[bold yellow]Demo interrupted by user.[/bold yellow]")
709 | final_exit_code = 130 # Standard exit code for Ctrl+C
710 | sys.exit(final_exit_code)
711 |
```
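The error-recovery path above reduces to a single reusable pattern: if the `get_tournament_results` output cannot be parsed, fall back to reading `tournament_state.json` from the tournament's storage path, and keep the original error dict if that also fails. Below is a minimal standalone sketch of that fallback, assuming the tool result has already been normalized to a dict that carries an `error` key on failure:

```python
import json
from pathlib import Path
from typing import Any, Dict, Optional


def load_results_with_fallback(
    processed: Dict[str, Any], storage_path: Optional[str]
) -> Dict[str, Any]:
    """Return tournament results, falling back to tournament_state.json on error."""
    if "error" not in processed or not storage_path:
        return processed  # tool result is usable as-is (or there is no path to fall back to)

    state_file = Path(storage_path) / "tournament_state.json"
    if not state_file.exists():
        return processed  # keep the original error dict for diagnostics

    try:
        with open(state_file, "r", encoding="utf-8") as f:
            return json.load(f)  # state file the server writes alongside results
    except (OSError, json.JSONDecodeError):
        return processed  # fallback failed; surface the original error
```

Keeping the original error dict when the fallback also fails preserves the first diagnostic message, which is what the logging in the demo above relies on.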
--------------------------------------------------------------------------------
/ultimate_mcp_server/cli/typer_cli.py:
--------------------------------------------------------------------------------
```python
1 | """Typer CLI implementation for the Ultimate MCP Server."""
2 | import asyncio
3 | import os
4 | import sys
5 | from enum import Enum
6 | from pathlib import Path
7 | from typing import Dict, List, Optional
8 |
9 | import typer
10 | from rich.console import Console
11 | from rich.panel import Panel
12 | from rich.prompt import Confirm
13 | from rich.table import Table
14 |
15 | # Get version hardcoded to avoid import errors
16 | __version__ = "0.1.0" # Hardcode since there are import issues
17 |
18 | from ultimate_mcp_server.cli.commands import (
19 | benchmark_providers,
20 | check_cache,
21 | generate_completion,
22 | list_providers,
23 | run_server,
24 | test_provider,
25 | )
26 | from ultimate_mcp_server.constants import BASE_TOOLSET_CATEGORIES
27 | from ultimate_mcp_server.utils import get_logger
28 |
29 | # Use consistent namespace and get console for Rich output
30 | logger = get_logger("ultimate_mcp_server.cli")
31 | console = Console(file=sys.stderr) # Use stderr to avoid interfering with MCP protocol
32 |
33 | # Create typer app
34 | app = typer.Typer(
35 | name="umcp",
36 |     help=(
37 | "[bold green]Ultimate MCP Server[/bold green]: Multi-provider LLM management server\n"
38 | "[italic]Unified CLI to run your server, manage providers, and more.[/italic]"
39 | ),
40 | rich_markup_mode="rich",
41 | no_args_is_help=True,
42 | add_completion=True,
43 | )
44 |
45 |
46 | def version_callback(value: bool):
47 | """Show the version information and exit.
48 |
49 | This callback is triggered by the --version/-v flag and displays
50 | the current version of Ultimate MCP Server before exiting.
51 | """
52 | if value:
53 | console.print(f"Ultimate MCP Server version: [bold]{__version__}[/bold]")
54 | raise typer.Exit()
55 |
56 |
57 | class TransportMode(str, Enum):
58 | """Transport mode for the server."""
59 |
60 | SSE = "sse"
61 | STDIO = "stdio"
62 | STREAMABLE_HTTP = "streamable-http"
63 | SHTTP = "shttp" # Short alias for streamable-http
64 |
65 |
66 | # Define tool-to-example mapping
67 | TOOL_TO_EXAMPLE_MAP: Dict[str, str] = {
68 | # Completion tools
69 | "generate_completion": "simple_completion_demo.py",
70 | "stream_completion": "simple_completion_demo.py",
71 | "chat_completion": "claude_integration_demo.py",
72 | "multi_completion": "multi_provider_demo.py",
73 |
74 | # Provider tools
75 | "get_provider_status": "multi_provider_demo.py",
76 | "list_models": "multi_provider_demo.py",
77 |
78 | # Document tools
79 | "summarize_document": "document_conversion_and_processing_demo.py",
80 | "extract_entities": "document_conversion_and_processing_demo.py",
81 | "chunk_document": "document_conversion_and_processing_demo.py",
82 | "process_document_batch": "document_conversion_and_processing_demo.py",
83 | "extract_text_from_pdf": "document_conversion_and_processing_demo.py",
84 | "process_image_ocr": "document_conversion_and_processing_demo.py",
85 |
86 | # Extraction tools
87 | "extract_json": "advanced_extraction_demo.py",
88 | "extract_table": "advanced_extraction_demo.py",
89 | "extract_key_value_pairs": "advanced_extraction_demo.py",
90 | "extract_semantic_schema": "advanced_extraction_demo.py",
91 |
92 | # Entity graph tools
93 | "extract_entity_graph": "entity_relation_graph_demo.py",
94 |
95 | # RAG tools
96 | "create_knowledge_base": "rag_example.py",
97 | "add_documents": "rag_example.py",
98 | "retrieve_context": "rag_example.py",
99 | "generate_with_rag": "rag_example.py",
100 |
101 | # Classification tools
102 | "text_classification": "text_classification_demo.py",
103 |
104 | # Tournament tools
105 | "create_tournament": "tournament_text_demo.py",
106 | "list_tournaments": "tournament_text_demo.py",
107 | "get_tournament_results": "tournament_text_demo.py",
108 |
109 | # Optimization tools
110 | "estimate_cost": "cost_optimization.py",
111 | "compare_models": "cost_optimization.py",
112 | "recommend_model": "cost_optimization.py",
113 |
114 | # Filesystem tools
115 | "read_file": "filesystem_operations_demo.py",
116 | "write_file": "filesystem_operations_demo.py",
117 | "list_directory": "filesystem_operations_demo.py",
118 | "search_files": "filesystem_operations_demo.py",
119 |
120 |
121 | # HTML tools
122 | "clean_and_format_text_as_markdown": "html_to_markdown_demo.py",
123 |
124 | # Text comparison tools
125 | "compare_documents_redline": "text_redline_demo.py",
126 |
127 | # Marqo search tools
128 | "marqo_fused_search": "marqo_fused_search_demo.py",
129 |
130 | # SQL tools
131 | "connect_to_database": "sql_database_interactions_demo.py",
132 | "execute_query": "sql_database_interactions_demo.py",
133 |
134 | # Audio tools
135 | "transcribe_audio": "audio_transcription_demo.py",
136 |
137 | # Browser automation tools
138 | "browser_init": "browser_automation_demo.py",
139 | "execute_web_workflow": "browser_automation_demo.py",
140 | }
141 |
142 | # Group examples by category
143 | EXAMPLE_CATEGORIES: Dict[str, List[str]] = {
144 | "text-generation": [
145 | "simple_completion_demo.py",
146 | "claude_integration_demo.py",
147 | "multi_provider_demo.py",
148 | "grok_integration_demo.py",
149 | ],
150 | "document-conversion-and-processing": [
151 | "document_conversion_and_processing_demo.py",
152 | "advanced_extraction_demo.py",
153 | ],
154 | "search-and-retrieval": [
155 | "rag_example.py",
156 | "vector_search_demo.py",
157 | "advanced_vector_search_demo.py",
158 | "marqo_fused_search_demo.py",
159 | ],
160 | "browser-automation": [
161 | "browser_automation_demo.py",
162 | "sse_client_demo.py",
163 | ],
164 | "data-analysis": [
165 | "sql_database_interactions_demo.py",
166 | "analytics_reporting_demo.py",
167 | ],
168 | "specialized-tools": [
169 | "audio_transcription_demo.py",
170 | "text_redline_demo.py",
171 | "html_to_markdown_demo.py",
172 | "entity_relation_graph_demo.py",
173 | ],
174 | "workflows": [
175 | "workflow_delegation_demo.py",
176 | "tool_composition_examples.py",
177 | "research_workflow_demo.py",
178 | ],
179 | }
180 |
181 |
182 | # Define option constants to avoid function calls in default arguments
183 | HOST_OPTION = typer.Option(
184 | None,
185 | "-h",
186 | "--host",
187 | help="[cyan]Host[/cyan] or [cyan]IP address[/cyan] to bind the server to (-h shortcut). Defaults from config.",
188 | rich_help_panel="Server Options",
189 | )
190 | PORT_OPTION = typer.Option(
191 | None,
192 | "-p",
193 | "--port",
194 | help="[cyan]Port[/cyan] to listen on (-p shortcut). Defaults from config.",
195 | rich_help_panel="Server Options",
196 | )
197 | WORKERS_OPTION = typer.Option(
198 | None,
199 | "-w",
200 | "--workers",
201 | help="[cyan]Number of worker[/cyan] processes to spawn (-w shortcut). Defaults from config.",
202 | rich_help_panel="Server Options",
203 | )
204 | TRANSPORT_MODE_OPTION = typer.Option(
205 | TransportMode.STREAMABLE_HTTP,
206 | "-t",
207 | "--transport-mode",
208 | help="[cyan]Transport mode[/cyan] for server communication (-t shortcut). Options: 'sse', 'stdio', 'streamable-http', 'shttp'.",
209 | rich_help_panel="Server Options",
210 | )
211 | DEBUG_OPTION = typer.Option(
212 | False,
213 | "-d",
214 | "--debug",
215 | help="[yellow]Enable debug logging[/yellow] for detailed output (-d shortcut).",
216 | rich_help_panel="Server Options",
217 | )
218 | INCLUDE_TOOLS_OPTION = typer.Option(
219 | None,
220 | "--include-tools",
221 | help="[green]List of tool names to include[/green] when running the server. Adds to the 'Base Toolset' by default, or to all tools if --load-all-tools is used.",
222 | rich_help_panel="Server Options",
223 | )
224 | EXCLUDE_TOOLS_OPTION = typer.Option(
225 | None,
226 | "--exclude-tools",
227 | help="[red]List of tool names to exclude[/red] when running the server. Applies after including tools.",
228 | rich_help_panel="Server Options",
229 | )
230 | LOAD_ALL_TOOLS_OPTION = typer.Option(
231 | False,
232 | "-a",
233 | "--load-all-tools",
234 | help="[yellow]Load all available tools[/yellow] instead of just the default 'Base Toolset' (-a shortcut).",
235 | rich_help_panel="Server Options",
236 | )
237 |
238 | CHECK_OPTION = typer.Option(
239 | False,
240 | "-c",
241 | "--check",
242 | help="[yellow]Check API keys[/yellow] for all configured providers (-c shortcut).",
243 | rich_help_panel="Provider Options",
244 | )
245 | MODELS_OPTION = typer.Option(
246 | False,
247 | "--models",
248 | help="[green]List available models[/green] for each provider.",
249 | rich_help_panel="Provider Options",
250 | )
251 |
252 | MODEL_OPTION = typer.Option(
253 | None,
254 | "--model",
255 | help="[cyan]Model ID[/cyan] to test (defaults to the provider's default).",
256 | rich_help_panel="Test Options",
257 | )
258 | PROMPT_OPTION = typer.Option(
259 | "Hello, world!",
260 | "--prompt",
261 | help="[magenta]Prompt text[/magenta] to send to the provider.",
262 | rich_help_panel="Test Options",
263 | )
264 |
265 | PROVIDER_OPTION = typer.Option(
266 | "openai",
267 | "--provider",
268 | help="[cyan]Provider[/cyan] to use (default: openai)",
269 | rich_help_panel="Completion Options",
270 | )
271 | COMPLETION_MODEL_OPTION = typer.Option(
272 | None,
273 | "--model",
274 | help="[blue]Model ID[/blue] for completion (defaults to the provider's default)",
275 | rich_help_panel="Completion Options",
276 | )
277 | COMPLETION_PROMPT_OPTION = typer.Option(
278 | None,
279 | "--prompt",
280 | help="[magenta]Prompt text[/magenta] for generation (reads from stdin if not provided)",
281 | rich_help_panel="Completion Options",
282 | )
283 | TEMPERATURE_OPTION = typer.Option(
284 | 0.7,
285 | "--temperature",
286 | help="[yellow]Sampling temperature[/yellow] (0.0 - 2.0, default: 0.7)",
287 | rich_help_panel="Completion Options",
288 | )
289 | MAX_TOKENS_OPTION = typer.Option(
290 | None,
291 | "--max-tokens",
292 | help="[green]Max tokens[/green] to generate (defaults to provider's setting)",
293 | rich_help_panel="Completion Options",
294 | )
295 | SYSTEM_OPTION = typer.Option(
296 | None,
297 | "--system",
298 | help="[blue]System prompt[/blue] for providers that support it.",
299 | rich_help_panel="Completion Options",
300 | )
301 | STREAM_OPTION = typer.Option(
302 | False,
303 | "-s",
304 | "--stream",
305 | help="[cyan]Stream[/cyan] the response token by token (-s shortcut).",
306 | rich_help_panel="Completion Options",
307 | )
308 |
309 | STATUS_OPTION = typer.Option(
310 | True,
311 | "--status",
312 | help="[green]Show cache status[/green]",
313 | rich_help_panel="Cache Options",
314 | )
315 | CLEAR_OPTION = typer.Option(
316 | False,
317 | "--clear",
318 | help="[red]Clear the cache[/red]",
319 | rich_help_panel="Cache Options",
320 | )
321 |
322 | DEFAULT_PROVIDERS = ["openai", "anthropic", "deepseek", "gemini", "openrouter"]
323 | PROVIDERS_OPTION = typer.Option(
324 | DEFAULT_PROVIDERS,
325 | "--providers",
326 | help="[cyan]Providers list[/cyan] to benchmark (default: all)",
327 | rich_help_panel="Benchmark Options",
328 | )
329 | BENCHMARK_MODELS_OPTION = typer.Option(
330 | None,
331 | "--models",
332 | help="[blue]Model IDs[/blue] to benchmark (defaults to default model of each provider)",
333 | rich_help_panel="Benchmark Options",
334 | )
335 | BENCHMARK_PROMPT_OPTION = typer.Option(
336 | None,
337 | "--prompt",
338 | help="[magenta]Prompt text[/magenta] to use for benchmarking (default built-in)",
339 | rich_help_panel="Benchmark Options",
340 | )
341 | RUNS_OPTION = typer.Option(
342 | 3,
343 | "-r",
344 | "--runs",
345 | help="[green]Number of runs[/green] per provider/model (-r shortcut, default: 3)",
346 | rich_help_panel="Benchmark Options",
347 | )
348 |
349 | VERSION_OPTION = typer.Option(
350 | False,
351 | "--version",
352 | "-v",
353 | callback=version_callback,
354 | is_eager=True,
355 | help="[yellow]Show the application version and exit.[/yellow]",
356 | rich_help_panel="Global Options",
357 | )
358 |
359 | # Options for tools command
360 | CATEGORY_OPTION = typer.Option(
361 | None,
362 | "--category",
363 | help="[cyan]Filter category[/cyan] when listing tools.",
364 | rich_help_panel="Tools Options",
365 | )
366 | CATEGORY_FILTER_OPTION = typer.Option(
367 | None,
368 | "--category",
369 | help="[cyan]Filter category[/cyan] when listing examples.",
370 | rich_help_panel="Examples Options",
371 | )
372 |
373 | # Options for examples command
374 | SHOW_EXAMPLES_OPTION = typer.Option(
375 | False,
376 | "--examples",
377 | help="[magenta]Show example scripts[/magenta] alongside tools.",
378 | rich_help_panel="Tools Options",
379 | )
380 |
381 | LIST_OPTION = typer.Option(
382 | False,
383 | "-l",
384 | "--list",
385 | help="[green]List examples[/green] instead of running one (-l shortcut).",
386 | rich_help_panel="Examples Options",
387 | )
388 |
389 |
390 | @app.command(name="run")
391 | def run(
392 | host: Optional[str] = HOST_OPTION,
393 | port: Optional[int] = PORT_OPTION,
394 | workers: Optional[int] = WORKERS_OPTION,
395 | transport_mode: TransportMode = TRANSPORT_MODE_OPTION,
396 | debug: bool = DEBUG_OPTION,
397 | include_tools: List[str] = INCLUDE_TOOLS_OPTION,
398 | exclude_tools: List[str] = EXCLUDE_TOOLS_OPTION,
399 | load_all_tools: bool = LOAD_ALL_TOOLS_OPTION,
400 | ):
401 | """
402 | [bold green]Run the Ultimate MCP Server[/bold green]
403 |
404 | Start the MCP server with configurable networking, performance, and tool options.
405 | The server exposes MCP-protocol compatible endpoints that AI agents can use to
406 | access various tools and capabilities.
407 |
408 | By default, only the [yellow]'Base Toolset'[/yellow] is loaded to optimize context window usage.
409 | Use `--load-all-tools` to load all available tools.
410 |
411 | Network settings control server accessibility, workers affect concurrency,
412 | and tool filtering lets you customize which capabilities are exposed.
413 |
414 | [bold]Examples:[/bold]
415 | [cyan]umcp run --host 0.0.0.0 --port 8000 --workers 4[/cyan] (Runs with Base Toolset)
416 | [cyan]umcp run --load-all-tools --debug[/cyan] (Runs with all tools and debug logging)
417 | [cyan]umcp run --include-tools browser,audio[/cyan] (Adds browser and audio tools to the Base Toolset)
418 | [cyan]umcp run --load-all-tools --exclude-tools filesystem[/cyan] (Loads all tools except filesystem)
419 | """
420 | # Set debug mode if requested
421 | if debug:
422 | os.environ["LOG_LEVEL"] = "DEBUG"
423 |
424 | # Print server info
425 | server_info_str = (
426 | f"Host: [cyan]{host or 'default from config'}[/cyan]\n"
427 | f"Port: [cyan]{port or 'default from config'}[/cyan]\n"
428 | f"Workers: [cyan]{workers or 'default from config'}[/cyan]\n"
429 | f"Transport mode: [cyan]{transport_mode}[/cyan]"
430 | )
431 |
432 | # Tool Loading Status
433 | if load_all_tools:
434 | server_info_str += "\nTool Loading: [yellow]All Available Tools[/yellow]"
435 | else:
436 | server_info_str += "\nTool Loading: [yellow]Base Toolset Only[/yellow] (Use --load-all-tools to load all)"
437 | # Format the categories for display
438 | category_lines = []
439 | for category, tools in BASE_TOOLSET_CATEGORIES.items():
440 | category_lines.append(f" [cyan]{category}[/cyan]: {', '.join(tools)}")
441 |
442 | server_info_str += "\n [bold]Includes:[/bold]\n" + "\n".join(category_lines)
443 |
444 |
445 | # Print tool filtering info if enabled
446 | if include_tools or exclude_tools:
447 | server_info_str += "\n[bold]Tool Filtering:[/bold]"
448 | if include_tools:
449 | server_info_str += f"\nIncluding: [cyan]{', '.join(include_tools)}[/cyan]"
450 | if exclude_tools:
451 | server_info_str += f"\nExcluding: [red]{', '.join(exclude_tools)}[/red]"
452 |
453 | console.print(Panel(server_info_str, title="[bold blue]Starting Ultimate MCP Server[/bold blue]", expand=False))
454 | console.print() # Add a newline for spacing
455 |
456 | # Convert transport_mode enum to string and handle aliases
457 | if transport_mode == TransportMode.SHTTP:
458 | actual_transport_mode = "streamable-http"
459 | else:
460 | # Convert enum to string value (e.g., TransportMode.SSE -> "sse")
461 | actual_transport_mode = transport_mode.value
462 |
463 | # Run the server
464 | run_server(
465 | host=host,
466 | port=port,
467 | workers=workers,
468 | transport_mode=actual_transport_mode,
469 | include_tools=include_tools,
470 | exclude_tools=exclude_tools,
471 | load_all_tools=load_all_tools,
472 | )
473 |
474 |
475 | @app.command(name="providers")
476 | def providers(
477 | check: bool = CHECK_OPTION,
478 | models: bool = MODELS_OPTION,
479 | ):
480 | """
481 | [bold green]List Available Providers[/bold green]
482 |
483 | Display configured LLM providers (OpenAI, Anthropic, Gemini, etc.)
484 | with their connection status, default models, and API key validation.
485 |
486 | Use this command to verify your configuration, troubleshoot API keys,
487 | or explore available models across all providers.
488 |
489 | Usage:
490 | umcp providers # Basic provider listing
491 | umcp providers --check # Validate API keys with providers
492 | umcp providers --models # List available models for each provider
493 |
494 | Examples:
495 | umcp providers --check --models # Comprehensive provider diagnostics
496 | """
497 | asyncio.run(list_providers(check_keys=check, list_models=models))
498 |
499 |
500 | @app.command(name="test")
501 | def test(
502 | provider: str = typer.Argument(..., help="Provider to test (openai, anthropic, deepseek, gemini)", rich_help_panel="Test Options"),
503 | model: Optional[str] = MODEL_OPTION,
504 | prompt: str = PROMPT_OPTION,
505 | ):
506 | """
507 | [bold green]Test a Specific Provider[/bold green]
508 |
509 | Verify connectivity and functionality of an LLM provider by sending a test
510 | prompt and displaying the response. This command performs a full API round-trip
511 | to validate your credentials, model availability, and proper configuration.
512 |
513 | The output includes the response text, token counts, cost estimate,
514 | and response time metrics to help diagnose performance issues.
515 |
516 | Usage:
517 | umcp test openai # Test default OpenAI model
518 | umcp test anthropic --model claude-3-5-haiku-20241022 # Test specific model
519 | umcp test gemini --prompt "Summarize quantum computing" # Custom prompt
520 |
521 | Examples:
522 | umcp test openai # Quick health check with default settings
523 | """
524 | with console.status(f"[bold green]Testing provider '{provider}'..."):
525 | try:
526 | asyncio.run(test_provider(provider=provider, model=model, prompt=prompt))
527 | except Exception as e:
528 | console.print(Panel(f"Failed to test provider '{provider}':\n{str(e)}", title="[bold red]Test Error[/bold red]", border_style="red"))
529 | raise typer.Exit(code=1) from e
530 |
531 |
532 | @app.command(name="complete")
533 | def complete(
534 | provider: str = PROVIDER_OPTION,
535 | model: Optional[str] = COMPLETION_MODEL_OPTION,
536 | prompt: Optional[str] = COMPLETION_PROMPT_OPTION,
537 | temperature: float = TEMPERATURE_OPTION,
538 | max_tokens: Optional[int] = MAX_TOKENS_OPTION,
539 | system: Optional[str] = SYSTEM_OPTION,
540 | stream: bool = STREAM_OPTION,
541 | ):
542 | """
543 | [bold green]Generate Text Completion[/bold green]
544 |
545 | Request text generation directly from an LLM provider through the CLI.
546 | This command bypasses the server's MCP endpoint and sends requests
547 | directly to the provider's API, useful for testing or quick generations.
548 |
549 | Supports input from arguments, stdin (piped content), or interactive prompt.
550 | The command provides full control over provider selection, model choice,
551 | and generation parameters. Results include token counts and cost estimates.
552 |
553 | Usage:
554 | echo "Tell me about Mars" | umcp complete # Pipe content as prompt
555 | umcp complete --prompt "Write a haiku" # Direct prompt
556 | umcp complete --provider anthropic --model claude-3-5-sonnet-20241022 # Specify model
557 | umcp complete --temperature 1.5 --max-tokens 250 # Adjust generation params
558 | umcp complete --system "You are a helpful assistant" # Set system prompt
559 | umcp complete --stream # Stream tokens in real-time
560 |
561 | Examples:
562 | umcp complete --prompt "Write a haiku about autumn." --stream
563 | """
564 | # Get prompt from stdin if not provided
565 | if prompt is None:
566 | if sys.stdin.isatty():
567 | console.print("Enter prompt (Ctrl+D to finish):")
568 | prompt = sys.stdin.read().strip()
569 |
570 | asyncio.run(
571 | generate_completion(
572 | provider=provider,
573 | model=model,
574 | prompt=prompt,
575 | temperature=temperature,
576 | max_tokens=max_tokens,
577 | system=system,
578 | stream=stream,
579 | )
580 | )
581 |
582 |
583 | @app.command(name="cache")
584 | def cache(
585 | status: bool = STATUS_OPTION,
586 | clear: bool = CLEAR_OPTION,
587 | ):
588 | """
589 | [bold green]Cache Management[/bold green]
590 |
591 | Monitor and maintain the server's response cache system.
592 | Caching stores previous LLM responses to avoid redundant API calls,
593 | significantly reducing costs and latency for repeated or similar requests.
594 |
595 | The status view shows backend type, item count, hit rate percentage,
596 | and estimated cost savings from cache hits. Clearing the cache removes
597 | all stored responses, which may be necessary after configuration changes
598 | or to force fresh responses.
599 |
600 | Usage:
601 | umcp cache # View cache statistics and status
602 | umcp cache --status # Explicitly request status view
603 | umcp cache --clear # Remove all cached entries (with confirmation)
604 | umcp cache --status --clear # View stats before clearing
605 |
606 | Examples:
607 | umcp cache # Check current cache performance and hit rate
608 | """
609 | should_clear = False
610 | if clear:
611 | if Confirm.ask("[bold yellow]Are you sure you want to clear the cache?[/bold yellow]"):
612 | should_clear = True
613 | else:
614 | console.print("[yellow]Cache clear aborted.[/yellow]")
615 | raise typer.Exit()
616 |
617 | # Only run the async part if needed
618 | if status or should_clear:
619 | with console.status("[bold green]Accessing cache..."):
620 | try:
621 | asyncio.run(check_cache(show_status=status, clear=should_clear))
622 | except Exception as e:
623 | console.print(Panel(f"Failed to access cache:\n{str(e)}", title="[bold red]Cache Error[/bold red]", border_style="red"))
624 | raise typer.Exit(code=1) from e
625 |     elif clear: # --clear was requested but aborted above; nothing more to do
626 |         pass
627 |     else: # Neither --status nor --clear requested
628 |         console.print("Use --status to view status or --clear to clear the cache.")
629 |
630 |
631 | @app.command(name="benchmark")
632 | def benchmark(
633 | providers: List[str] = PROVIDERS_OPTION,
634 | models: Optional[List[str]] = BENCHMARK_MODELS_OPTION,
635 | prompt: Optional[str] = BENCHMARK_PROMPT_OPTION,
636 | runs: int = RUNS_OPTION,
637 | ):
638 | """
639 | [bold green]Benchmark Providers[/bold green]
640 |
641 | Compare performance metrics and costs across different LLM providers and models.
642 | The benchmark sends identical prompts to each selected provider/model combination
643 | and measures response time, token processing speed, and cost per request.
644 |
645 | Results are presented in a table format showing average metrics across
646 | multiple runs to ensure statistical validity. This helps identify the
647 | most performant or cost-effective options for your specific use cases.
648 |
649 | Usage:
650 | umcp benchmark # Test all configured providers
651 | umcp benchmark --providers openai,anthropic # Test specific providers
652 | umcp benchmark --models gpt-4o,claude-3-5-haiku # Test specific models
653 | umcp benchmark --prompt "Explain quantum computing" --runs 5 # Custom benchmark
654 |
655 | Examples:
656 | umcp benchmark --runs 3 --providers openai,gemini # Compare top providers
657 | """
658 | asyncio.run(benchmark_providers(providers=providers, models=models, prompt=prompt, runs=runs))
659 |
660 |
661 | @app.command(name="tools")
662 | def tools(
663 | category: Optional[str] = CATEGORY_OPTION,
664 | show_examples: bool = SHOW_EXAMPLES_OPTION,
665 | ):
666 | """
667 | [bold green]List Available Tools[/bold green]
668 |
669 | Display the MCP tools registered in the server, organized by functional categories.
670 | These tools represent the server's capabilities that can be invoked by AI agents
671 | through the Model Context Protocol (MCP) interface.
672 |
673 | Tools are grouped into logical categories like completion, document processing,
674 | filesystem access, browser automation, and more. For each tool, you can
675 | optionally view associated example scripts that demonstrate its usage patterns.
676 |
677 | Usage:
678 | umcp tools # List all tools across all categories
679 | umcp tools --category document # Show only document-related tools
680 | umcp tools --examples # Show example scripts for each tool
681 |
682 | Examples:
683 | umcp tools --category filesystem --examples # Learn filesystem tools with examples
684 | """
685 | # Manually list tools by category for demonstration
686 | tool_categories: Dict[str, List[str]] = {
687 | "completion": [
688 | "generate_completion",
689 | "stream_completion",
690 | "chat_completion",
691 | "multi_completion"
692 | ],
693 | "document": [
694 | "summarize_document",
695 | "extract_entities",
696 | "chunk_document",
697 | "process_document_batch"
698 | ],
699 | "extraction": [
700 | "extract_json",
701 | "extract_table",
702 | "extract_key_value_pairs",
703 | "extract_semantic_schema"
704 | ],
705 | "rag": [
706 | "create_knowledge_base",
707 | "add_documents",
708 | "retrieve_context",
709 | "generate_with_rag"
710 | ],
711 | "filesystem": [
712 | "read_file",
713 | "write_file",
714 | "list_directory",
715 | "search_files"
716 | ],
717 | "browser": [
718 | "browser_init",
719 | "browser_navigate",
720 | "browser_click",
721 | "execute_web_workflow"
722 | ]
723 | }
724 |
725 | # Filter by category if specified
726 | if category and category in tool_categories:
727 | categories_to_show = {category: tool_categories[category]}
728 | else:
729 | categories_to_show = tool_categories
730 |
731 | # Create Rich table for display
732 | table = Table(title="Ultimate MCP Server Tools")
733 | table.add_column("Category", style="cyan")
734 | table.add_column("Tool", style="green")
735 |
736 | if show_examples:
737 | table.add_column("Example Script", style="yellow")
738 |
739 | # Add rows to table
740 | for module_name, tool_names in sorted(categories_to_show.items()):
741 | for tool_name in sorted(tool_names):
742 | example_script = TOOL_TO_EXAMPLE_MAP.get(tool_name, "")
743 |
744 | if show_examples:
745 | table.add_row(
746 | module_name,
747 | tool_name,
748 | example_script if example_script else "N/A"
749 | )
750 | else:
751 | table.add_row(module_name, tool_name)
752 |
753 | console.print(table)
754 |
755 | # Print help for running examples
756 | if show_examples:
757 | console.print("\n[bold]Tip:[/bold] Run examples using the command:")
758 | console.print(" [cyan]umcp examples <example_name>[/cyan]")
759 |
760 |
761 | @app.command(name="examples")
762 | def examples(
763 | example_name: Optional[str] = typer.Argument(None, help="Name of the example to run"),
764 | category: Optional[str] = CATEGORY_FILTER_OPTION,
765 | list_examples: bool = LIST_OPTION,
766 | ):
767 | """
768 | [bold green]Run or List Example Scripts[/bold green]
769 |
770 | Browse and execute the demonstration Python scripts included with Ultimate MCP Server.
771 | These examples showcase real-world usage patterns and integration techniques for
772 | different server capabilities, from basic completions to complex workflows.
773 |
774 | Examples are organized by functional category (text-generation, document-processing,
775 | browser-automation, etc.) and contain fully functional code that interacts with
776 | a running MCP server. They serve as both educational resources and starting
777 | points for your own implementations.
778 |
779 | Usage:
780 | umcp examples # List all available example scripts
781 | umcp examples --list # List-only mode (same as above)
782 | umcp examples --category browser-automation # Filter by category
783 | umcp examples rag_example # Run specific example (with or without .py extension)
784 | umcp examples rag_example.py # Explicit extension version
785 |
786 | Examples:
787 | umcp examples simple_completion_demo # Run the basic completion example
788 | """
789 | # Ensure we have the examples directory
790 | project_root = Path(__file__).parent.parent.parent
791 | examples_dir = project_root / "examples"
792 |
793 | if not examples_dir.exists() or not examples_dir.is_dir():
794 | console.print(f"[bold red]Error:[/bold red] Examples directory not found at: {examples_dir}")
795 | console.print(Panel(f"Examples directory not found at: {examples_dir}", title="[bold red]Error[/bold red]", border_style="red"))
796 | return 1
797 |
798 | # If just listing examples
799 | if list_examples or not example_name:
800 | # Create Rich table for display
801 | table = Table(title="Ultimate MCP Server Example Scripts")
802 | table.add_column("Category", style="cyan")
803 | table.add_column("Example Script", style="green")
804 |
805 | # List available examples by category
806 | for category_name, script_names in sorted(EXAMPLE_CATEGORIES.items()):
807 | for script_name in sorted(script_names):
808 | table.add_row(category_name, script_name)
809 |
810 | console.print(table)
811 |
812 | # Print help for running examples
813 | console.print("\n[bold]Run an example:[/bold]")
814 | console.print(" [cyan]umcp examples <example_name>[/cyan]")
815 |
816 | return 0
817 |
818 | # Run the specified example
819 | example_file = None
820 |
821 | # Check if .py extension was provided
822 | if example_name.endswith('.py'):
823 | example_path = examples_dir / example_name
824 | if example_path.exists():
825 | example_file = example_path
826 | else:
827 | # Try with .py extension
828 | example_path = examples_dir / f"{example_name}.py"
829 | if example_path.exists():
830 | example_file = example_path
831 | else:
832 | # Try finding in the tool map
833 | if example_name in TOOL_TO_EXAMPLE_MAP:
834 | example_script = TOOL_TO_EXAMPLE_MAP[example_name]
835 | example_path = examples_dir / example_script
836 | if example_path.exists():
837 | example_file = example_path
838 |
839 | if not example_file:
840 | console.print(f"[bold red]Error:[/bold red] Example '{example_name}' not found")
841 | console.print(Panel(f"Example script '{example_name}' not found in {examples_dir}", title="[bold red]Error[/bold red]", border_style="red"))
842 | return 1
843 |
844 | # Run the example script
845 | console.print(f"[bold blue]Running example:[/bold blue] {example_file.name}")
846 |
847 | # Change to the project root directory to ensure imports work
848 | os.chdir(project_root)
849 |
850 | # Use subprocess to run the script
851 | import subprocess
852 | try:
853 | # Execute the Python script
854 | result = subprocess.run(
855 | [sys.executable, str(example_file)],
856 | check=True
857 | )
858 | return result.returncode
859 | except subprocess.CalledProcessError as e:
860 | console.print(f"[bold red]Error:[/bold red] Example script failed with exit code {e.returncode}")
861 | console.print(Panel(f"Example script '{example_file.name}' failed with exit code {e.returncode}", title="[bold red]Execution Error[/bold red]", border_style="red"))
862 | return e.returncode
863 | except Exception as e:
864 | console.print(f"[bold red]Error:[/bold red] Failed to run example: {str(e)}")
865 | console.print(Panel(f"Failed to run example '{example_file.name}':\n{str(e)}", title="[bold red]Execution Error[/bold red]", border_style="red"))
866 | return 1
867 |
868 |
869 | @app.callback()
870 | def main(
871 | version: bool = VERSION_OPTION,
872 | ):
873 | """Ultimate MCP Server - A comprehensive AI agent operating system.
874 |
875 | The Ultimate MCP Server provides a unified interface to manage LLM providers,
876 | tools, and capabilities through the Model Context Protocol (MCP). It enables
877 | AI agents to access dozens of powerful capabilities including file operations,
878 | browser automation, document processing, database access, and much more.
879 |
880 | This CLI provides commands to:
881 | • Start and configure the server
882 | • Manage and test LLM providers
883 | • Generate text completions directly
884 | • View and clear the response cache
885 | • Benchmark provider performance
886 | • List available tools and capabilities
887 | • Run example scripts demonstrating usage patterns
888 | """
889 | # This function will be called before any command
890 | pass
891 |
892 |
893 | def cli():
894 | """Entry point for CLI package installation.
895 |
896 | This function serves as the main entry point when the package is installed
897 | and the 'umcp' command is invoked. It's referenced in pyproject.toml's
898 | [project.scripts] section to create the command-line executable.
899 | """
900 | app()
901 |
902 |
903 | if __name__ == "__main__":
904 | app()
```
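The `umcp` app defined above can also be exercised in-process with Typer's test runner, which is a convenient smoke test of the CLI wiring without starting a server. A minimal sketch, assuming the package is installed so the module is importable and touching only commands that do not call out to providers (note that the module routes Rich output to stderr, so captured stdout stays mostly empty):

```python
from typer.testing import CliRunner

from ultimate_mcp_server.cli.typer_cli import app

runner = CliRunner()

# Listing tools is purely local: it only builds and prints a Rich table.
result = runner.invoke(app, ["tools", "--examples"])
assert result.exit_code == 0

# --version is an eager option, so it short-circuits before any command runs.
result = runner.invoke(app, ["--version"])
assert result.exit_code == 0
```

Commands such as `providers --check` or `test openai` perform real API round-trips, so they are better left out of a fast smoke test.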
--------------------------------------------------------------------------------
/examples/tool_composition_examples.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Tool composition patterns for MCP servers.
3 |
4 | This module demonstrates how to design tools that work together effectively
5 | in sequences and patterns, making it easier for LLMs to understand how to
6 | compose tools for multi-step operations.
7 | """
8 | import csv
9 | import io
10 | import json
11 | from pathlib import Path
12 | from typing import Any, Dict, List, Optional
13 |
14 | from error_handling import non_empty_string, validate_inputs, with_error_handling
15 | from tool_annotations import QUERY_TOOL, READONLY_TOOL
16 | from ultimate_mcp_server.exceptions import ToolExecutionError, ToolInputError
17 | from ultimate_mcp_server.tools.document_conversion_and_processing import (
18 | summarize_document_standalone,
19 | )
20 | from ultimate_mcp_server.tools.filesystem import delete_file, read_file, write_file
21 | from ultimate_mcp_server.tools.local_text_tools import run_sed
22 | from ultimate_mcp_server.utils import get_logger
23 |
24 | logger = get_logger("tool_composition_examples")
25 |
26 |
27 | class DocumentProcessingExample:
28 | """
29 | Example of tool composition for document processing.
30 |
31 | This class demonstrates a pattern where multiple tools work together
32 | to process a document through multiple stages:
33 | 1. Chunking - Break large document into manageable pieces
34 | 2. Analysis - Process each chunk individually
35 | 3. Aggregation - Combine results into a final output
36 |
37 | This pattern is ideal for working with large documents that exceed
38 | context windows.
39 | """
40 |
41 | def __init__(self, mcp_server):
42 | """Initialize with an MCP server instance."""
43 | self.mcp = mcp_server
44 | self._register_tools()
45 |
46 | def _register_tools(self):
47 | """Register document processing tools with the MCP server."""
48 |
49 | @self.mcp.tool(
50 | description=(
51 | "Split a document into manageable chunks for processing. "
52 | "This is the FIRST step in processing large documents that exceed context windows. "
53 | "After chunking, process each chunk separately with analyze_chunk()."
54 | ),
55 | annotations=READONLY_TOOL.to_dict(),
56 | examples=[
57 | {
58 | "name": "Chunk research paper",
59 | "description": "Split a research paper into chunks",
60 | "input": {"document": "This is a long research paper...", "chunk_size": 1000},
61 | "output": {
62 | "chunks": ["Chunk 1...", "Chunk 2..."],
63 | "chunk_count": 2,
64 | "chunk_ids": ["doc123_chunk_1", "doc123_chunk_2"]
65 | }
66 | }
67 | ]
68 | )
69 | @with_error_handling
70 | @validate_inputs(document=non_empty_string)
71 | async def chunk_document(
72 | document: str,
73 | chunk_size: int = 1000,
74 | overlap: int = 100,
75 | ctx=None
76 | ) -> Dict[str, Any]:
77 | """
78 | Split a document into manageable chunks for processing.
79 |
80 | This tool is the first step in a multi-step document processing workflow:
81 | 1. First, chunk the document with chunk_document() (this tool)
82 | 2. Then, process each chunk with analyze_chunk()
83 | 3. Finally, combine results with aggregate_chunks()
84 |
85 | Args:
86 | document: The document text to split
87 | chunk_size: Maximum size of each chunk in characters
88 | overlap: Number of characters to overlap between chunks
89 | ctx: Context object passed by the MCP server
90 |
91 | Returns:
92 | Dictionary containing the chunks and their metadata
93 | """
94 | # Simple chunking strategy - split by character count with overlap
95 | chunks = []
96 | chunk_ids = []
97 | doc_id = f"doc_{hash(document) % 10000}"
98 |
99 | # Create chunks with overlap
100 | for i in range(0, len(document), chunk_size - overlap):
101 | chunk_text = document[i:i + chunk_size]
102 | if chunk_text:
103 | chunk_id = f"{doc_id}_chunk_{len(chunks) + 1}"
104 | chunks.append(chunk_text)
105 | chunk_ids.append(chunk_id)
106 |
107 | return {
108 | "chunks": chunks,
109 | "chunk_count": len(chunks),
110 | "chunk_ids": chunk_ids,
111 | "document_id": doc_id,
112 | "next_step": "analyze_chunk" # Hint for the next tool to use
113 | }
114 |
115 | @self.mcp.tool(
116 | description=(
117 | "Analyze a single document chunk by summarizing it. "
118 | "This is the SECOND step in the document processing workflow. "
119 | "Use after chunk_document() and before aggregate_chunks()."
120 | ),
121 | annotations=READONLY_TOOL.to_dict(),
122 | examples=[
123 | {
124 | "name": "Analyze document chunk",
125 | "description": "Analyze a single chunk from a research paper",
126 | "input": {"chunk": "This chunk discusses methodology...", "chunk_id": "doc123_chunk_1"},
127 | "output": {
128 | "analysis": {"key_topics": ["methodology", "experiment design"]},
129 | "chunk_id": "doc123_chunk_1"
130 | }
131 | }
132 | ]
133 | )
134 | @with_error_handling
135 | @validate_inputs(chunk=non_empty_string)
136 | async def analyze_chunk(
137 | chunk: str,
138 | chunk_id: str,
139 | analysis_type: str = "general",
140 | ctx=None
141 | ) -> Dict[str, Any]:
142 | """
143 | Analyze a single document chunk by summarizing it.
144 |
145 | This tool is the second step in a multi-step document processing workflow:
146 | 1. First, chunk the document with chunk_document()
147 | 2. Then, process each chunk with analyze_chunk() (this tool)
148 | 3. Finally, combine results with aggregate_chunks()
149 |
150 | Args:
151 | chunk: The text chunk to analyze
152 | chunk_id: The ID of the chunk (from chunk_document)
153 | analysis_type: Type of analysis to perform
154 | ctx: Context object passed by the MCP server
155 |
156 | Returns:
157 | Dictionary containing the analysis results
158 | """
159 | # --- Call the actual summarize_document tool ---
160 | logger.info(f"Analyzing chunk {chunk_id} with summarize_document...")
161 | try:
162 | # Use a concise summary for chunk analysis
163 | summary_result = await summarize_document_standalone(
164 | document=chunk,
165 | summary_format="key_points", # Use key points for chunk analysis
166 | max_length=100 # Keep chunk summaries relatively short
167 | # We might need to specify provider/model if defaults aren't suitable
168 | )
169 |
170 | if summary_result.get("success"):
171 | analysis = {
172 | "summary": summary_result.get("summary", "[Summary Unavailable]"),
173 | "analysis_type": "summary", # Indicate the type of analysis performed
174 | "metrics": { # Include metrics from the summary call
175 | "cost": summary_result.get("cost", 0.0),
176 | "tokens": summary_result.get("tokens", {}),
177 | "processing_time": summary_result.get("processing_time", 0.0)
178 | }
179 | }
180 | logger.success(f"Chunk {chunk_id} analyzed successfully.")
181 | else:
182 | logger.warning(f"Summarize tool failed for chunk {chunk_id}: {summary_result.get('error')}")
183 | analysis = {"error": f"Analysis failed: {summary_result.get('error')}"}
184 | except Exception as e:
185 | logger.error(f"Error calling summarize_document for chunk {chunk_id}: {e}", exc_info=True)
186 | analysis = {"error": f"Analysis error: {str(e)}"}
187 | # -------------------------------------------------
188 |
189 | return {
190 | "analysis": analysis,
191 | "chunk_id": chunk_id,
192 | "next_step": "aggregate_chunks" # Hint for the next tool to use
193 | }
194 |
195 | @self.mcp.tool(
196 | description=(
197 | "Aggregate analysis results from multiple document chunks. "
198 | "This is the FINAL step in the document processing workflow. "
199 | "Use after analyzing individual chunks with analyze_chunk()."
200 | ),
201 | annotations=READONLY_TOOL.to_dict(),
202 | examples=[
203 | {
204 | "name": "Aggregate analysis results",
205 | "description": "Combine analysis results from multiple chunks",
206 | "input": {
207 | "analysis_results": [
208 | {"analysis": {"key_topics": ["methodology"]}, "chunk_id": "doc123_chunk_1"},
209 | {"analysis": {"key_topics": ["results"]}, "chunk_id": "doc123_chunk_2"}
210 | ]
211 | },
212 | "output": {
213 | "document_summary": "This document covers methodology and results...",
214 | "overall_statistics": {"total_chunks": 2, "word_count": 2500}
215 | }
216 | }
217 | ]
218 | )
219 | @with_error_handling
220 | async def aggregate_chunks(
221 | analysis_results: List[Dict[str, Any]],
222 | aggregation_type: str = "summary",
223 | ctx=None
224 | ) -> Dict[str, Any]:
225 | """
226 | Aggregate analysis results from multiple document chunks.
227 |
228 | This tool is the final step in a multi-step document processing workflow:
229 | 1. First, chunk the document with chunk_document()
230 | 2. Then, process each chunk with analyze_chunk()
231 | 3. Finally, combine results with aggregate_chunks() (this tool)
232 |
233 | Args:
234 | analysis_results: List of analysis results from analyze_chunk
235 | aggregation_type: Type of aggregation to perform
236 | ctx: Context object passed by the MCP server
237 |
238 | Returns:
239 | Dictionary containing the aggregated results
240 | """
241 | # Validate input
242 | if not analysis_results or not isinstance(analysis_results, list):
243 | return {
244 | "error": "Invalid analysis_results. Must provide a non-empty list of analysis results."
245 | }
246 |
247 | # Extract all analyses
248 | all_analyses = [result.get("analysis", {}) for result in analysis_results if "analysis" in result]
249 | total_chunks = len(all_analyses)
250 |
251 | # Calculate overall statistics
252 | total_word_count = sum(analysis.get("word_count", 0) for analysis in all_analyses)
253 | all_key_sentences = [sentence for analysis in all_analyses
254 | for sentence in analysis.get("key_sentences", [])]
255 |
256 | # Generate summary based on aggregation type
257 | if aggregation_type == "summary":
258 | summary = f"Document contains {total_chunks} chunks with {total_word_count} words total."
259 | if all_key_sentences:
260 | summary += f" Key points include: {' '.join(all_key_sentences[:3])}..."
261 | elif aggregation_type == "sentiment":
262 | # Aggregate sentiment scores if available
263 | sentiment_scores = [analysis.get("sentiment_score", 0.5) for analysis in all_analyses
264 | if "sentiment_score" in analysis]
265 | avg_sentiment = sum(sentiment_scores) / len(sentiment_scores) if sentiment_scores else 0.5
266 | sentiment_label = "positive" if avg_sentiment > 0.6 else "neutral" if avg_sentiment > 0.4 else "negative"
267 | summary = f"Document has an overall {sentiment_label} sentiment (score: {avg_sentiment:.2f})."
268 | else:
269 | summary = f"Aggregated {total_chunks} chunks with {total_word_count} total words."
270 |
271 | return {
272 | "document_summary": summary,
273 | "overall_statistics": {
274 | "total_chunks": total_chunks,
275 | "word_count": total_word_count,
276 | "key_sentences_count": len(all_key_sentences)
277 | },
278 | "workflow_complete": True # Indicate this is the end of the workflow
279 | }
280 |
281 |
282 | # --- Helper: Get a temporary path within allowed storage ---
283 | # Assume storage directory exists and is allowed for this demo context
284 | STORAGE_DIR = Path(__file__).resolve().parent.parent / "storage"
285 | TEMP_DATA_DIR = STORAGE_DIR / "temp_pipeline_data"
286 |
287 | async def _setup_temp_data_files():
288 | """Create temporary data files for the pipeline demo."""
289 | TEMP_DATA_DIR.mkdir(exist_ok=True)
290 | # Sample CSV Data
291 | csv_data = io.StringIO()
292 | writer = csv.writer(csv_data)
293 | writer.writerow(["date", "amount", "category"])
294 | writer.writerow(["2023-01-01", "1,200", "electronics"]) # Note: Amount as string with comma
295 | writer.writerow(["2023-01-02", "950", "clothing"])
296 | writer.writerow(["2023-01-03", "1500", "electronics"])
297 | writer.writerow(["2023-01-04", "800", "food"])
298 | csv_content = csv_data.getvalue()
299 | csv_path = TEMP_DATA_DIR / "temp_sales.csv"
300 | await write_file(path=str(csv_path), content=csv_content) # Use write_file tool implicitly
301 |
302 | # Sample JSON Data
303 | json_data = [
304 | {"user_id": 101, "name": "Alice", "active": True, "last_login": "2023-01-10"},
305 | {"user_id": 102, "name": "Bob", "active": False, "last_login": "2022-12-15"},
306 | {"user_id": 103, "name": "Charlie", "active": True, "last_login": "2023-01-05"},
307 | ]
308 | json_content = json.dumps(json_data, indent=2)
309 | json_path = TEMP_DATA_DIR / "temp_users.json"
310 | await write_file(path=str(json_path), content=json_content) # Use write_file tool implicitly
311 |
312 | return {"csv": str(csv_path), "json": str(json_path)}
313 |
314 | async def _cleanup_temp_data_files(temp_files: Dict[str, str]):
315 | """Remove temporary data files."""
316 | for file_path in temp_files.values():
317 | try:
318 | await delete_file(path=file_path) # Use delete_file tool implicitly
319 | logger.debug(f"Cleaned up temp file: {file_path}")
320 | except Exception as e:
321 | logger.warning(f"Failed to clean up temp file {file_path}: {e}")
322 | try:
323 | # Attempt to remove the directory if empty
324 | if TEMP_DATA_DIR.exists() and not any(TEMP_DATA_DIR.iterdir()):
325 | TEMP_DATA_DIR.rmdir()
326 | logger.debug(f"Cleaned up temp directory: {TEMP_DATA_DIR}")
327 | except Exception as e:
328 | logger.warning(f"Failed to remove temp directory {TEMP_DATA_DIR}: {e}")
329 |
330 | # --- End Helper ---
331 |
332 | class DataPipelineExample:
333 | """
334 | Example of tool composition for data processing pipelines.
335 |
336 | This class demonstrates a pattern where tools form a processing
337 | pipeline to transform, filter, and analyze data:
338 | 1. Fetch - Get data from a source
339 | 2. Transform - Clean and process the data
340 | 3. Filter - Select relevant data
341 | 4. Analyze - Perform analysis on filtered data
342 |
343 | This pattern is ideal for working with structured data that
344 | needs multiple processing steps.
345 | """
346 |
347 | def __init__(self, mcp_server):
348 | """Initialize with an MCP server instance."""
349 | self.mcp = mcp_server
350 | self._register_tools()
351 |
352 | def _register_tools(self):
353 | """Register data pipeline tools with the MCP server."""
354 |
355 | @self.mcp.tool(
356 | description=(
357 | "Fetch data from a temporary source file based on type. "
358 | "This is the FIRST step in the data pipeline. "
359 | "Continue with transform_data() to clean the fetched data."
360 | ),
361 | annotations=QUERY_TOOL.to_dict(),
362 | examples=[
363 | {
364 | "name": "Fetch CSV data",
365 | "description": "Fetch data from a CSV source",
366 | "input": {"source_type": "csv"},
367 | "output": {
368 | "data": [{"date": "2023-01-01", "amount": 1200}, {"date": "2023-01-02", "amount": 950}],
369 | "record_count": 2,
370 | "schema": {"date": "string", "amount": "number"}
371 | }
372 | }
373 | ]
374 | )
375 | @with_error_handling
376 | async def fetch_data(
377 | source_type: str,
378 | limit: Optional[int] = None,
379 | ctx=None
380 | ) -> Dict[str, Any]:
381 | """
382 | Fetch data from a temporary source file based on type.
383 |
384 | This tool is the first step in a data processing pipeline:
385 | 1. First, fetch data with fetch_data() (this tool) - Creates temp files if needed.
386 | 2. Then, clean the data with transform_data()
387 | 3. Then, filter the data with filter_data()
388 | 4. Finally, analyze the data with analyze_data()
389 |
390 | Args:
391 | source_type: Type of data source (csv or json for this demo).
392 | limit: Maximum number of records to fetch/read (applied after reading).
393 | ctx: Context object passed by the MCP server.
394 |
395 | Returns:
396 | Dictionary containing the fetched data and metadata
397 | """
398 | # Ensure temp files exist
399 | temp_files = await _setup_temp_data_files()
400 | source_path = temp_files.get(source_type.lower())
401 |
402 | if not source_path:
403 | raise ToolInputError(f"Unsupported source_type for demo: {source_type}. Use 'csv' or 'json'.")
404 |
405 | logger.info(f"Fetching data from temporary file: {source_path}")
406 |
407 | # Use read_file tool implicitly to get content
408 | read_result = await read_file(path=source_path)
409 | if not read_result.get("success"):
410 | raise ToolExecutionError(f"Failed to read temporary file {source_path}: {read_result.get('error')}")
411 |
412 | # Assuming read_file returns content in a predictable way (e.g., result['content'][0]['text'])
413 | # Adjust parsing based on actual read_file output structure
414 | content = read_result.get("content", [])
415 | if not content or not isinstance(content, list) or "text" not in content[0]:
416 | raise ToolExecutionError(f"Unexpected content structure from read_file for {source_path}")
417 |
418 | file_content = content[0]["text"]
419 | data = []
420 | schema = {}
421 |
422 | try:
423 | if source_type.lower() == "csv":
424 | # Parse CSV data
425 | csv_reader = csv.reader(io.StringIO(file_content))
426 | headers = next(csv_reader)
427 | for row in csv_reader:
428 | if row: # Skip empty rows
429 | data.append(dict(zip(headers, row, strict=False)))
430 | elif source_type.lower() == "json":
431 | # Parse JSON data
432 | data = json.loads(file_content)
433 | else:
434 | # Default dummy data if somehow type is wrong despite check
435 | data = [{"id": i, "value": f"Sample {i}"} for i in range(1, 6)]
436 | except Exception as parse_error:
437 | raise ToolExecutionError(f"Failed to parse content from {source_path}: {parse_error}") from parse_error
438 |
439 | # Apply limit if specified AFTER reading/parsing
440 | if limit and limit > 0 and len(data) > limit:
441 | data = data[:limit]
442 |
443 | # Infer schema from first record
444 | if data:
445 | first_record = data[0]
446 | for key, value in first_record.items():
447 | value_type = "string"
448 |                     if isinstance(value, bool):
449 |                         value_type = "boolean"
450 |                     elif isinstance(value, (int, float)):
451 |                         value_type = "number"
452 | schema[key] = value_type
453 |
454 | return {
455 | "data": data,
456 | "record_count": len(data),
457 | "schema": schema,
458 | "source_info": {"type": source_type, "path": source_path},
459 | "next_step": "transform_data" # Hint for the next tool to use
460 | }
461 |
462 | @self.mcp.tool(
463 | description=(
464 | "Transform and clean data using basic text processing tools (sed). "
465 | "This is the SECOND step in the data pipeline. "
466 | "Use after fetch_data() and before filter_data()."
467 | ),
468 | annotations=READONLY_TOOL.to_dict(),
469 | examples=[
470 | {
471 | "name": "Transform sales data",
472 | "description": "Clean and transform sales data",
473 | "input": {
474 | "data": [{"date": "2023-01-01", "amount": "1,200"}, {"date": "2023-01-02", "amount": "950"}],
475 | "transformations": ["convert_dates", "normalize_numbers"]
476 | },
477 | "output": {
478 | "transformed_data": [{"date": "2023-01-01", "amount": 1200}, {"date": "2023-01-02", "amount": 950}],
479 | "transformation_log": ["Converted 2 dates", "Normalized 2 numbers"]
480 | }
481 | }
482 | ]
483 | )
484 | @with_error_handling
485 | async def transform_data(
486 | data: List[Dict[str, Any]],
487 |             transformations: Optional[List[str]] = None,
488 |             custom_transformations: Optional[Dict[str, str]] = None,
489 | ctx=None
490 | ) -> Dict[str, Any]:
491 | """
492 | Transform and clean data using basic text processing tools (sed).
493 |
494 | This tool is the second step in a data processing pipeline:
495 | 1. First, fetch data with fetch_data()
496 | 2. Then, clean the data with transform_data() (this tool)
497 | 3. Then, filter the data with filter_data()
498 | 4. Finally, analyze the data with analyze_data()
499 |
500 | Args:
501 | data: List of data records to transform
502 | transformations: List of built-in transformations to apply
503 |                 custom_transformations: Dictionary of field->transform_expression (accepted but not applied in this demo)
504 | ctx: Context object passed by the MCP server
505 |
506 | Returns:
507 | Dictionary containing the transformed data and transformation log
508 | """
509 | # Validate input
510 | if not data or not isinstance(data, list) or not all(isinstance(r, dict) for r in data):
511 | return {
512 | "error": "Invalid data. Must provide a non-empty list of records (dictionaries)."
513 | }
514 |
515 | transformation_log = []
516 | # Convert input data (list of dicts) to a string format suitable for sed (e.g., JSON lines)
517 | try:
518 | input_text = "\n".join(json.dumps(record) for record in data)
519 | except Exception as e:
520 | return {"error": f"Could not serialize input data for transformation: {e}"}
521 |
522 | current_text = input_text
523 | sed_scripts = [] # Accumulate sed commands
524 |
525 | # Apply standard transformations if specified
526 | transformations = transformations or []
527 | for transform in transformations:
528 | if transform == "convert_dates":
529 | # Use sed to replace '/' with '-' in date-like fields (heuristic)
530 | # This is complex with JSON structure, better done after parsing.
531 | # For demo, we apply a simple global substitution (less robust)
532 | sed_scripts.append("s|/|-|g")
533 | transformation_log.append("Applied date conversion (sed: s|/|-|g)")
534 |
535 | elif transform == "normalize_numbers":
536 | # Use sed to remove commas from numbers (heuristic)
537 | # Example: "amount": "1,200" -> "amount": "1200"
538 |                 sed_scripts.append(r's/\([0-9]\),\([0-9]\)/\1\2/g')  # Strip commas only between digits, e.g. "1,200" -> "1200"
539 |                 transformation_log.append("Applied number normalization (sed: remove commas between digits)")
540 |
541 | # --- Execute accumulated sed scripts ---
542 | if sed_scripts:
543 | # Combine scripts with -e for each
544 | combined_script = " ".join([f"-e '{s}'" for s in sed_scripts])
545 | logger.info(f"Running sed transformation with script: {combined_script}")
546 | try:
547 | sed_result = await run_sed(
548 | args_str=combined_script, # Pass combined script
549 | input_data=current_text
550 | )
551 |
552 | if sed_result.get("success"):
553 | current_text = sed_result["stdout"]
554 | logger.success("Sed transformation completed successfully.")
555 | else:
556 | error_msg = sed_result.get("error", "Sed command failed")
557 | logger.error(f"Sed transformation failed: {error_msg}")
558 | return {"error": f"Transformation failed: {error_msg}"}
559 | except Exception as e:
560 | logger.error(f"Error running sed transformation: {e}", exc_info=True)
561 | return {"error": f"Transformation execution error: {e}"}
562 |
563 | # --- Attempt to parse back to list of dicts ---
564 | try:
565 | transformed_data = []
566 | for line in current_text.strip().split("\n"):
567 | if line:
568 | record = json.loads(line)
569 | # Post-processing for number normalization (sed only removes commas)
570 | if "normalize_numbers" in transformations:
571 | for key, value in record.items():
572 | if isinstance(value, str) and value.replace(".", "", 1).isdigit():
573 | try:
574 | record[key] = float(value) if "." in value else int(value)
575 | except ValueError:
576 | pass # Keep as string if conversion fails
577 | transformed_data.append(record)
578 | logger.success("Successfully parsed transformed data back to JSON objects.")
579 | except Exception as e:
580 | logger.error(f"Could not parse transformed data back to JSON: {e}", exc_info=True)
581 | # Return raw text if parsing fails
582 | return {
583 | "transformed_data_raw": current_text,
584 | "transformation_log": transformation_log,
585 | "warning": "Could not parse final data back to JSON records"
586 | }
587 | # ---------------------------------------------------
588 |
589 | return {
590 | "transformed_data": transformed_data,
591 | "transformation_log": transformation_log,
592 | "record_count": len(transformed_data),
593 | "next_step": "filter_data" # Hint for the next tool to use
594 | }
595 |
596 | @self.mcp.tool(
597 | description=(
598 | "Filter data based on criteria. "
599 | "This is the THIRD step in the data pipeline. "
600 | "Use after transform_data() and before analyze_data()."
601 | ),
602 | annotations=READONLY_TOOL.to_dict(),
603 | examples=[
604 | {
605 | "name": "Filter data",
606 | "description": "Filter data based on criteria",
607 | "input": {
608 | "data": [{"date": "2023-01-01", "amount": 1200}, {"date": "2023-01-02", "amount": 950}],
609 |                         "filter_criteria": {"amount": 1200}
610 | },
611 | "output": {
612 | "filtered_data": [{"date": "2023-01-01", "amount": 1200}],
613 |                         "filter_criteria": {"amount": 1200}
614 | }
615 | }
616 | ]
617 | )
618 | @with_error_handling
619 | async def filter_data(
620 | data: List[Dict[str, Any]],
621 | filter_criteria: Dict[str, Any],
622 | ctx=None
623 | ) -> Dict[str, Any]:
624 | """
625 | Filter data based on criteria.
626 |
627 | This tool is the third step in a data processing pipeline:
628 | 1. First, fetch data with fetch_data()
629 | 2. Then, clean the data with transform_data()
630 | 3. Then, filter the data with filter_data() (this tool)
631 | 4. Finally, analyze the data with analyze_data()
632 |
633 | Args:
634 | data: List of data records to filter
635 |                 filter_criteria: Mapping of field name to required value; a record is kept only if every listed field matches exactly
636 | ctx: Context object passed by the MCP server
637 |
638 | Returns:
639 | Dictionary containing the filtered data and filter criteria
640 | """
641 |         # Keep records whose fields equal every value in filter_criteria (exact match only)
642 | filtered_data = [record for record in data if all(record.get(key) == value for key, value in filter_criteria.items())]
643 |
644 | return {
645 | "filtered_data": filtered_data,
646 | "filter_criteria": filter_criteria,
647 | "record_count": len(filtered_data)
648 | }
649 |
650 | @self.mcp.tool(
651 | description=(
652 |                 "Analyze the filtered data. "
653 | "This is the FINAL step in the data pipeline. "
654 | "Use after filtering data with filter_data()."
655 | ),
656 | annotations=READONLY_TOOL.to_dict(),
657 | examples=[
658 | {
659 | "name": "Analyze data",
660 | "description": "Analyze filtered data",
661 | "input": {
662 | "data": [{"date": "2023-01-01", "amount": 1200}, {"date": "2023-01-02", "amount": 950}],
663 | "analysis_type": "summary"
664 | },
665 | "output": {
666 | "analysis_results": [
667 | {"analysis": {"key_topics": ["methodology"]}, "chunk_id": "doc123_chunk_1"},
668 | {"analysis": {"key_topics": ["results"]}, "chunk_id": "doc123_chunk_2"}
669 | ],
670 | "analysis_type": "summary"
671 | }
672 | }
673 | ]
674 | )
675 | @with_error_handling
676 | async def analyze_data(
677 | data: List[Dict[str, Any]],
678 | analysis_type: str = "summary",
679 | ctx=None
680 | ) -> Dict[str, Any]:
681 | """
682 | Analyze data.
683 |
684 | This tool is the final step in a data processing pipeline:
685 | 1. First, fetch data with fetch_data()
686 | 2. Then, clean the data with transform_data()
687 | 3. Then, filter the data with filter_data()
688 | 4. Finally, analyze the data with analyze_data() (this tool)
689 |
690 | Args:
691 | data: List of data records to analyze
692 | analysis_type: Type of analysis to perform
693 | ctx: Context object passed by the MCP server
694 |
695 | Returns:
696 | Dictionary containing the analysis results
697 | """
698 | # Simulate analysis based on analysis_type
699 | if analysis_type == "summary":
700 |             # Hard-coded placeholder results for the demo (not derived from the input records)
701 | analysis_results = [{"analysis": {"key_topics": ["methodology"]}, "chunk_id": "doc123_chunk_1"},
702 | {"analysis": {"key_topics": ["results"]}, "chunk_id": "doc123_chunk_2"}]
703 | else:
704 | # Placeholder for other analysis types
705 | analysis_results = []
706 |
707 | return {
708 | "analysis_results": analysis_results,
709 | "analysis_type": analysis_type
710 | }
711 |
712 | async def cleanup_pipeline_data(self):
713 | """Cleans up temporary data files created by fetch_data."""
714 | await _cleanup_temp_data_files({"csv": str(TEMP_DATA_DIR / "temp_sales.csv"), "json": str(TEMP_DATA_DIR / "temp_users.json")})
715 |
716 | # Example usage (if this file were run directly or imported)
717 | # async def run_pipeline_example():
718 | # # ... initialize MCP server ...
719 | # pipeline = DataPipelineExample(mcp_server)
720 | # try:
721 | #         # ... run pipeline steps; fetch_data / transform_data are registered as
722 | #         #     MCP tools inside _register_tools(), so they are invoked through the
723 | #         #     server's tool-calling interface, not as methods of this class ...
724 | # # ... etc ...
725 | # finally:
726 | # await pipeline.cleanup_pipeline_data()
727 |
728 | # asyncio.run(run_pipeline_example())
```
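
The four pipeline tools above are registered on an MCP server and are normally invoked through a client's tool-calling interface, so they cannot be called as plain methods of `DataPipelineExample`. As a rough, self-contained sketch of the same fetch → transform → filter → analyze flow (plain Python with inline sample data standing in for the temporary files; no MCP server involved), the data shapes each stage expects look roughly like this:

```python
import csv
import io
from typing import Any, Dict, List

# Inline sample standing in for the temp CSV created by fetch_data(); values are illustrative.
SAMPLE_CSV = 'date,amount\n2023-01-01,"1,200"\n2023-01-02,950\n'

def fetch(csv_text: str) -> List[Dict[str, Any]]:
    """Step 1: parse raw CSV text into a list of records."""
    return [dict(row) for row in csv.DictReader(io.StringIO(csv_text))]

def transform(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Step 2: normalize date separators and comma-grouped numbers ("1,200" -> 1200)."""
    cleaned = []
    for rec in records:
        new_rec = {}
        for key, value in rec.items():
            if isinstance(value, str):
                value = value.replace("/", "-")                # date separators
                stripped = value.replace(",", "")
                if stripped.replace(".", "", 1).isdigit():     # looks numeric
                    value = float(stripped) if "." in stripped else int(stripped)
            new_rec[key] = value
        cleaned.append(new_rec)
    return cleaned

def filter_records(records: List[Dict[str, Any]], criteria: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Step 3: keep records whose fields exactly match all criteria."""
    return [r for r in records if all(r.get(k) == v for k, v in criteria.items())]

def analyze(records: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Step 4: minimal summary over the numeric 'amount' field."""
    amounts = [r["amount"] for r in records if isinstance(r.get("amount"), (int, float))]
    return {"count": len(records), "total_amount": sum(amounts)}

if __name__ == "__main__":
    data = filter_records(transform(fetch(SAMPLE_CSV)), {"date": "2023-01-01"})
    print(analyze(data))  # {'count': 1, 'total_amount': 1200}
```

The sketch deliberately replaces the sed-based text transforms with in-memory cleaning; the MCP version routes the same stages through registered tools so an agent can chain them step by step.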
--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/completion.py:
--------------------------------------------------------------------------------
```python
1 | """Text completion tools for Ultimate MCP Server."""
2 | import asyncio
3 | import time
4 | from typing import Any, AsyncGenerator, Dict, List, Optional
5 |
6 | from ultimate_mcp_server.constants import Provider, TaskType
7 | from ultimate_mcp_server.core.providers.base import get_provider, parse_model_string
8 | from ultimate_mcp_server.exceptions import ProviderError, ToolError, ToolInputError
9 | from ultimate_mcp_server.services.cache import with_cache
10 | from ultimate_mcp_server.tools.base import with_error_handling, with_retry, with_tool_metrics
11 | from ultimate_mcp_server.utils import get_logger
12 |
13 | logger = get_logger("ultimate_mcp_server.tools.completion")
14 |
15 | # --- Tool Functions (Standalone, Decorated) ---
16 |
17 | @with_tool_metrics
18 | @with_error_handling
19 | async def generate_completion(
20 | prompt: str,
21 | provider: str = Provider.OPENAI.value,
22 | model: Optional[str] = None,
23 | max_tokens: Optional[int] = None,
24 | temperature: float = 0.7,
25 | stream: bool = False,
26 | json_mode: bool = False,
27 | additional_params: Optional[Dict[str, Any]] = None
28 | ) -> Dict[str, Any]:
29 | """Generates a single, complete text response for a given prompt (non-streaming).
30 |
31 | Use this tool for single-turn tasks where the entire response is needed at once,
32 | such as answering a question, summarizing text, translating, or classifying content.
33 | It waits for the full response from the LLM before returning.
34 |
35 | If you need the response to appear incrementally (e.g., for user interfaces or long generations),
36 | use the `stream_completion` tool instead.
37 |
38 | Args:
39 | prompt: The input text prompt for the LLM.
40 | provider: The name of the LLM provider (e.g., "openai", "anthropic", "gemini"). Defaults to "openai".
41 | Use `list_models` or `get_provider_status` to see available providers.
42 | model: The specific model ID (e.g., "openai/gpt-4.1-mini", "anthropic/claude-3-5-haiku-20241022").
43 | If None, the provider's default model is used. Use `list_models` to find available IDs.
44 | max_tokens: (Optional) Maximum number of tokens to generate in the response.
45 | temperature: (Optional) Controls response randomness (0.0=deterministic, 1.0=creative). Default 0.7.
46 |         stream: Must be False for this tool. If True, a ToolInputError is raised directing you to `stream_completion`.
47 | json_mode: (Optional) When True, instructs the model to return a valid JSON response. Default False.
48 | Note: Support and behavior varies by provider.
49 | additional_params: (Optional) Dictionary of additional provider-specific parameters (e.g., `{"top_p": 0.9}`).
50 |
51 | Returns:
52 | A dictionary containing the full completion and metadata:
53 | {
54 | "text": "The generated completion text...",
55 | "model": "provider/model-used",
56 | "provider": "provider-name",
57 | "tokens": {
58 | "input": 15,
59 | "output": 150,
60 | "total": 165
61 | },
62 | "cost": 0.000123, # Estimated cost in USD
63 | "processing_time": 1.23, # Execution time in seconds
64 | "success": true
65 | }
66 |
67 | Raises:
68 | ToolInputError: If `stream` is set to True.
69 | ProviderError: If the provider is unavailable or the LLM request fails.
70 | ToolError: For other internal errors.
71 | """
72 | # Streaming not supported for this endpoint
73 | if stream:
74 | raise ToolInputError(
75 | "Streaming is not supported for `generate_completion`. Use the `stream_completion` tool instead.",
76 | param_name="stream",
77 | provided_value=stream
78 | )
79 |
80 | start_time = time.time()
81 |
82 | # Check if model contains a provider prefix (e.g., "openai/gpt-4.1-mini")
83 | if model:
84 | extracted_provider, extracted_model = parse_model_string(model)
85 | if extracted_provider:
86 | provider = extracted_provider # Override provider with the one from the model string
87 | model = extracted_model # Use the model name without the provider prefix
88 | logger.debug(f"Using provider '{provider}' and model '{model}' extracted from model string")
89 |
90 | # Get provider instance
91 | try:
92 | # Use provider name directly, get_provider handles splitting if needed
93 | provider_instance = await get_provider(provider)
94 | except Exception as e:
95 | raise ProviderError(
96 | f"Failed to initialize provider '{provider}': {str(e)}",
97 | provider=provider,
98 | cause=e
99 | ) from e
100 |
101 | # Set default additional params
102 | additional_params = additional_params or {}
103 |
104 | # Conditionally construct parameters for the provider call
105 | params_for_provider = {
106 | "prompt": prompt,
107 | "model": model, # model here is already stripped of provider prefix if applicable
108 | "temperature": temperature,
109 | "json_mode": json_mode,
110 | # messages will be handled by chat_completion, this is for simple completion
111 | }
112 | if max_tokens is not None:
113 | params_for_provider["max_tokens"] = max_tokens
114 |
115 |     # Merge additional_params into the conditionally built dict. Keys supplied in
116 |     # additional_params intentionally override the core params above, which lets
117 |     # callers pass provider-specific settings (e.g. top_p) or replace defaults.
118 |     # If overriding core params is ever undesirable, filter additional_params to
119 |     # exclude keys already present in params_for_provider before merging.
120 | final_provider_params = {**params_for_provider, **additional_params}
121 |
122 | try:
123 | # Generate completion
124 | result = await provider_instance.generate_completion(
125 | **final_provider_params
126 | )
127 |
128 | # Calculate processing time
129 | processing_time = time.time() - start_time
130 |
131 | # Log success
132 | logger.success(
133 | f"Completion generated successfully with {provider}/{result.model}",
134 | emoji_key=TaskType.COMPLETION.value,
135 | tokens={
136 | "input": result.input_tokens,
137 | "output": result.output_tokens
138 | },
139 | cost=result.cost,
140 | time=processing_time
141 | )
142 |
143 | # Return standardized result
144 | return {
145 | "text": result.text,
146 | "model": result.model, # Return the actual model used (might differ from input if default)
147 | "provider": provider,
148 | "tokens": {
149 | "input": result.input_tokens,
150 | "output": result.output_tokens,
151 | "total": result.total_tokens,
152 | },
153 | "cost": result.cost,
154 | "processing_time": processing_time,
155 | "success": True
156 | }
157 |
158 | except Exception as e:
159 | # Convert to provider error
160 | # Use the potentially prefixed model name in the error context
161 | error_model = model or f"{provider}/default"
162 | raise ProviderError(
163 | f"Completion generation failed for model '{error_model}': {str(e)}",
164 | provider=provider,
165 | model=error_model,
166 | cause=e
167 | ) from e
168 |
169 | @with_tool_metrics
170 | @with_error_handling
171 | async def stream_completion(
172 | prompt: str,
173 | provider: str = Provider.OPENAI.value,
174 | model: Optional[str] = None,
175 | max_tokens: Optional[int] = None,
176 | temperature: float = 0.7,
177 | json_mode: bool = False,
178 | additional_params: Optional[Dict[str, Any]] = None
179 | ) -> AsyncGenerator[Dict[str, Any], None]:
180 | """Generates a text completion for a prompt and streams the response chunk by chunk.
181 |
182 | Use this tool when you need to display the LLM's response progressively as it's generated,
183 | improving perceived responsiveness for users, especially for longer outputs.
184 |
185 | If you need the entire response at once, use `generate_completion`.
186 |
187 | Args:
188 | prompt: The input text prompt for the LLM.
189 | provider: The name of the LLM provider (e.g., "openai", "anthropic", "gemini"). Defaults to "openai".
190 | Use `list_models` or `get_provider_status` to see available providers.
191 | model: The specific model ID (e.g., "openai/gpt-4.1-mini", "anthropic/claude-3-5-haiku-20241022").
192 | If None, the provider's default model is used. Use `list_models` to find available IDs.
193 | max_tokens: (Optional) Maximum number of tokens to generate in the response.
194 | temperature: (Optional) Controls response randomness (0.0=deterministic, 1.0=creative). Default 0.7.
195 | json_mode: (Optional) When True, instructs the model to return a valid JSON response. Default False.
196 | additional_params: (Optional) Dictionary of additional provider-specific parameters (e.g., `{"top_p": 0.9}`).
197 |
198 | Yields:
199 | A stream of dictionary chunks. Each chunk contains:
200 | {
201 | "text": "The incremental piece of generated text...",
202 | "chunk_index": 1, # Sequence number of this chunk (starts at 1)
203 | "provider": "provider-name",
204 | "model": "provider/model-used",
205 | "finish_reason": null, # Reason generation stopped (e.g., "stop", "length"), null until the end
206 | "finished": false # True only for the very last yielded dictionary
207 | }
208 | The *final* yielded dictionary will have `finished: true` and may also contain aggregate metadata:
209 | {
210 | "text": "", # Final chunk might be empty
211 | "chunk_index": 10,
212 | "provider": "provider-name",
213 | "model": "provider/model-used",
214 | "finish_reason": "stop",
215 | "finished": true,
216 | "full_text": "The complete generated text...", # Full response concatenated
217 | "processing_time": 5.67, # Total time in seconds
218 | "tokens": { "input": ..., "output": ..., "total": ... }, # Final token counts
219 |                 "cost": 0.000543, # Final estimated cost
220 | "error": null # Error message if one occurred during streaming
221 | }
222 |
223 | Raises:
224 | ProviderError: If the provider is unavailable or the LLM stream request fails initially.
225 | Errors during the stream yield an error message in the final chunk.
226 | """
227 | start_time = time.time()
228 |
229 | # Add MCP annotations for audience and priority
230 | # annotations = { # noqa: F841
231 | # "audience": ["assistant", "user"], # Useful for both assistant and user
232 | # "priority": 0.8 # High priority but not required (generate_completion is the primary tool)
233 | # }
234 |
235 | # Check if model contains a provider prefix (e.g., "openai/gpt-4.1-mini")
236 | if model:
237 | extracted_provider, extracted_model = parse_model_string(model)
238 | if extracted_provider:
239 | provider = extracted_provider # Override provider with the one from the model string
240 | model = extracted_model # Use the model name without the provider prefix
241 | logger.debug(f"Using provider '{provider}' and model '{model}' extracted from model string")
242 |
243 | # Get provider instance
244 | try:
245 | provider_instance = await get_provider(provider)
246 | except Exception as e:
247 | logger.error(
248 | f"Failed to initialize provider '{provider}': {str(e)}",
249 | emoji_key="error",
250 | provider=provider
251 | )
252 | # Yield a single error chunk if provider init fails
253 | yield {
254 | "error": f"Failed to initialize provider '{provider}': {str(e)}",
255 | "text": None,
256 | "finished": True,
257 | "provider": provider,
258 | "model": model
259 | }
260 | return
261 |
262 | # Set default additional params
263 | additional_params = additional_params or {}
264 |
265 | logger.info(
266 | f"Starting streaming completion with {provider}",
267 | emoji_key=TaskType.COMPLETION.value,
268 | prompt_length=len(prompt),
269 | json_mode_requested=json_mode # Log the request
270 | )
271 |
272 | chunk_count = 0
273 | full_text = ""
274 | final_metadata = {} # To store final metadata like model, cost etc.
275 | error_during_stream = None
276 | actual_model_used = model # Keep track of the actual model used
277 |
278 | try:
279 | # Get stream, passing json_mode directly
280 | stream = provider_instance.generate_completion_stream(
281 | prompt=prompt,
282 | model=model,
283 | max_tokens=max_tokens,
284 | temperature=temperature,
285 | json_mode=json_mode, # Pass the flag here
286 | **additional_params
287 | )
288 |
289 | async for chunk, metadata in stream:
290 | chunk_count += 1
291 | full_text += chunk
292 | final_metadata.update(metadata) # Keep track of latest metadata
293 | actual_model_used = metadata.get("model", actual_model_used) # Update if metadata provides it
294 |
295 | # Yield chunk with metadata
296 | yield {
297 | "text": chunk,
298 | "chunk_index": chunk_count,
299 | "provider": provider,
300 | "model": actual_model_used,
301 | "finish_reason": metadata.get("finish_reason"),
302 | "finished": False,
303 | }
304 |
305 | except Exception as e:
306 | error_during_stream = f"Error during streaming after {chunk_count} chunks: {type(e).__name__}: {str(e)}"
307 | logger.error(
308 | f"Error during streaming completion with {provider}/{actual_model_used or 'default'}: {error_during_stream}",
309 | emoji_key="error"
310 | )
311 | # Don't return yet, yield the final chunk with the error
312 |
313 | # --- Final Chunk ---
314 | processing_time = time.time() - start_time
315 |
316 | # Log completion (success or failure based on error_during_stream)
317 | log_level = logger.error if error_during_stream else logger.success
318 | log_message = f"Streaming completion finished ({chunk_count} chunks)" if not error_during_stream else f"Streaming completion failed after {chunk_count} chunks"
319 |
320 | log_level(
321 | log_message,
322 | emoji_key="error" if error_during_stream else "success",
323 | provider=provider,
324 | model=actual_model_used,
325 | tokens={
326 | "input": final_metadata.get("input_tokens"),
327 | "output": final_metadata.get("output_tokens")
328 | },
329 | cost=final_metadata.get("cost"),
330 | time=processing_time,
331 | error=error_during_stream
332 | )
333 |
334 | # Yield the final aggregated chunk
335 | yield {
336 | "text": "", # No new text in the final summary chunk
337 | "chunk_index": chunk_count + 1,
338 | "provider": provider,
339 | "model": actual_model_used,
340 | "finish_reason": final_metadata.get("finish_reason"),
341 | "finished": True,
342 | "full_text": full_text,
343 | "processing_time": processing_time,
344 | "tokens": {
345 | "input": final_metadata.get("input_tokens"),
346 | "output": final_metadata.get("output_tokens"),
347 | "total": final_metadata.get("total_tokens")
348 | },
349 | "cost": final_metadata.get("cost"),
350 | "error": error_during_stream
351 | }
352 |
353 | @with_tool_metrics
354 | @with_error_handling
355 | async def generate_completion_stream(
356 | prompt: str,
357 | provider: str = Provider.OPENAI.value,
358 | model: Optional[str] = None,
359 | max_tokens: Optional[int] = None,
360 | temperature: float = 0.7,
361 | json_mode: bool = False,
362 | additional_params: Optional[Dict[str, Any]] = None
363 | ) -> AsyncGenerator[Dict[str, Any], None]:
364 | """Generates a text response in a streaming fashion for a given prompt.
365 |
366 | Use this tool when you want to display the response as it's being generated
367 | without waiting for the entire response to be completed. It yields chunks of text
368 | as they become available, allowing for more interactive user experiences.
369 |
370 | Args:
371 | prompt: The text prompt to send to the LLM.
372 | provider: The LLM provider to use (default: "openai").
373 | model: The specific model to use (if None, uses provider's default).
374 | max_tokens: Maximum tokens to generate in the response.
375 | temperature: Controls randomness in the output (0.0-1.0).
376 | json_mode: Whether to request JSON formatted output from the model.
377 | additional_params: Additional provider-specific parameters.
378 |
379 | Yields:
380 | Dictionary containing the generated text chunk and metadata:
381 | {
382 | "text": str, # The text chunk
383 | "metadata": {...}, # Additional information about the generation
384 | "done": bool # Whether this is the final chunk
385 | }
386 |
387 | Raises:
388 | ToolError: If an error occurs during text generation.
389 | """
390 | # Initialize variables to track metrics
391 | start_time = time.time()
392 |
393 | try:
394 | # Get provider instance
395 | provider_instance = await get_provider(provider)
396 | if not provider_instance:
397 | raise ValueError(f"Invalid provider: {provider}")
398 |
399 | # Add json_mode to additional_params if specified
400 | params = additional_params.copy() if additional_params else {}
401 | if json_mode:
402 | params["json_mode"] = True
403 |
404 | # Stream the completion
405 | async for chunk, metadata in provider_instance.generate_completion_stream(
406 | prompt=prompt,
407 | model=model,
408 | max_tokens=max_tokens,
409 | temperature=temperature,
410 | **params
411 | ):
412 | # Calculate elapsed time for each chunk
413 | elapsed_time = time.time() - start_time
414 |
415 | # Include additional metadata with each chunk
416 | response = {
417 | "text": chunk,
418 | "metadata": {
419 | **metadata,
420 | "elapsed_time": elapsed_time,
421 | },
422 | "done": metadata.get("finish_reason") is not None
423 | }
424 |
425 | yield response
426 |
427 | except Exception as e:
428 | logger.error(f"Error in generate_completion_stream: {str(e)}", exc_info=True)
429 | raise ToolError(f"Failed to generate streaming completion: {str(e)}") from e
430 |
431 | @with_cache(ttl=24 * 60 * 60) # Cache results for 24 hours
432 | @with_tool_metrics
433 | @with_retry(max_retries=2, retry_delay=1.0) # Retry up to 2 times on failure
434 | @with_error_handling
435 | async def chat_completion(
436 | messages: List[Dict[str, Any]],
437 | provider: str = Provider.OPENAI.value,
438 | model: Optional[str] = None,
439 | max_tokens: Optional[int] = None,
440 | temperature: float = 0.7,
441 | system_prompt: Optional[str] = None,
442 | json_mode: bool = False,
443 | additional_params: Optional[Dict[str, Any]] = None
444 | ) -> Dict[str, Any]:
445 | """Generates a response within a conversational context (multi-turn chat).
446 |
447 | Use this tool for chatbot interactions, instruction following, or any task requiring
448 | the LLM to consider previous turns in a conversation. It takes a list of messages
449 | (user, assistant, system roles) as input.
450 |
451 | This tool automatically retries on transient failures and caches results for identical requests
452 | (based on messages, model, etc.) for 24 hours to save costs and time.
453 | Streaming is NOT supported; this tool returns the complete chat response at once.
454 |
455 | Args:
456 | messages: A list of message dictionaries representing the conversation history.
457 | Each dictionary must have "role" ("user", "assistant", or "system") and "content" (string).
458 | Example: `[{"role": "user", "content": "Hello!"}, {"role": "assistant", "content": "Hi there!"}]`
459 | provider: The name of the LLM provider (e.g., "openai", "anthropic", "gemini"). Defaults to "openai".
460 | model: The specific model ID (e.g., "openai/gpt-4o", "anthropic/claude-3-7-sonnet-20250219").
461 | If None, the provider's default model is used. Use `list_models` to find available IDs.
462 | max_tokens: (Optional) Maximum number of tokens for the *assistant's* response.
463 | temperature: (Optional) Controls response randomness (0.0=deterministic, 1.0=creative). Default 0.7.
464 | system_prompt: (Optional) An initial system message to guide the model's behavior (e.g., persona, instructions).
465 | If provided, it's effectively prepended to the `messages` list as a system message.
466 | json_mode: (Optional) Request structured JSON output from the LLM. Default False.
467 | additional_params: (Optional) Dictionary of additional provider-specific parameters (e.g., `{"top_p": 0.9}`).
468 |
469 | Returns:
470 | A dictionary containing the assistant's response message and metadata:
471 | {
472 | "message": {
473 | "role": "assistant",
474 | "content": "The assistant's generated response..."
475 | },
476 | "model": "provider/model-used",
477 | "provider": "provider-name",
478 | "tokens": {
479 | "input": 55, # Includes all input messages
480 | "output": 120, # Assistant's response only
481 | "total": 175
482 | },
483 | "cost": 0.000150, # Estimated cost in USD
484 | "processing_time": 2.50, # Execution time in seconds
485 | "cached_result": false, # True if the result was served from cache
486 | "success": true
487 | }
488 |
489 | Raises:
490 | ToolInputError: If the `messages` format is invalid.
491 | ProviderError: If the provider is unavailable or the LLM request fails (after retries).
492 | ToolError: For other internal errors.
493 | """
494 | start_time = time.time()
495 |
496 | # Validate messages format
497 | if not isinstance(messages, list) or not all(isinstance(m, dict) and 'role' in m and 'content' in m for m in messages):
498 | raise ToolInputError(
499 | "Invalid messages format. Must be a list of dictionaries, each with 'role' and 'content'.",
500 | param_name="messages",
501 | provided_value=messages
502 | )
503 |
504 | # Prepend system prompt if provided
505 | if system_prompt:
506 | # Avoid modifying the original list if called multiple times
507 | processed_messages = [{"role": "system", "content": system_prompt}] + messages
508 | else:
509 | processed_messages = messages
510 |
511 | # Check if model contains a provider prefix (e.g., "openai/gpt-4.1-mini")
512 | if model:
513 | extracted_provider, extracted_model = parse_model_string(model)
514 | if extracted_provider:
515 | provider = extracted_provider # Override provider with the one from the model string
516 | model = extracted_model # Use the model name without the provider prefix
517 | logger.debug(f"Using provider '{provider}' and model '{model}' extracted from model string")
518 |
519 | # Get provider instance
520 | try:
521 | provider_instance = await get_provider(provider)
522 | except Exception as e:
523 | raise ProviderError(
524 | f"Failed to initialize provider '{provider}': {str(e)}",
525 | provider=provider,
526 | cause=e
527 | ) from e
528 |
529 | additional_params = additional_params or {}
530 | # Add json_mode to additional params if specified
531 | if json_mode:
532 | additional_params["json_mode"] = True
533 |
534 | try:
535 | result = await provider_instance.generate_completion(
536 | messages=processed_messages,
537 | model=model,
538 | max_tokens=max_tokens,
539 | temperature=temperature,
540 | **additional_params
541 | )
542 |
543 | processing_time = time.time() - start_time
544 |
545 | logger.success(
546 | f"Chat completion generated successfully with {provider}/{result.model}",
547 | emoji_key=TaskType.CHAT.value,
548 | tokens={
549 | "input": result.input_tokens,
550 | "output": result.output_tokens
551 | },
552 | cost=result.cost,
553 | time=processing_time
554 | )
555 |
556 | return {
557 | "message": result.message.dict() if hasattr(result.message, 'dict') else result.message, # Return message as dict
558 | "model": result.model,
559 | "provider": provider,
560 | "tokens": {
561 | "input": result.input_tokens,
562 | "output": result.output_tokens,
563 | "total": result.total_tokens,
564 | },
565 | "cost": result.cost,
566 | "processing_time": processing_time,
567 | # Note: cached_result is automatically added by the @with_cache decorator if applicable
568 | "success": True
569 | }
570 |
571 | except Exception as e:
572 | error_model = model or f"{provider}/default"
573 | # Check if the exception has the model attribute, otherwise use the determined error_model
574 | error_model_from_exception = getattr(e, 'model', None)
575 | final_error_model = error_model_from_exception or error_model
576 |
577 | raise ProviderError(
578 | f"Chat completion generation failed for model '{final_error_model}': {str(e)}",
579 | provider=provider,
580 | model=final_error_model,
581 | cause=e
582 | ) from e
583 |
584 |
585 | @with_cache(ttl=7 * 24 * 60 * 60) # Cache results for 7 days
586 | @with_tool_metrics
587 | @with_error_handling # Error handling should be used
588 | async def multi_completion(
589 | prompt: str,
590 | providers: List[Dict[str, Any]],
591 | max_concurrency: int = 3,
592 | timeout: Optional[float] = 30.0
593 | ) -> Dict[str, Any]:
594 | """Generates completions for the same prompt from multiple LLM providers/models concurrently.
595 |
596 | Use this tool to compare responses, latency, or cost across different models for a specific prompt.
597 | It runs requests in parallel up to `max_concurrency`.
598 |
599 | Results are cached for 7 days based on the prompt and provider configurations.
600 |
601 | Args:
602 | prompt: The input text prompt to send to all specified models.
603 | providers: A list of dictionaries, each specifying a provider and model configuration.
604 | Example: `[{"provider": "openai", "model": "gpt-4.1-mini"}, {"provider": "anthropic", "model": "claude-3-5-haiku-20241022", "max_tokens": 50}]`
605 | Each dict must contain at least "provider". "model" is optional (uses provider default).
606 | Other valid parameters for `generate_completion` (like `max_tokens`, `temperature`) can be included.
607 | max_concurrency: (Optional) Maximum number of provider requests to run in parallel. Default 3.
608 | timeout: (Optional) Maximum time in seconds to wait for each individual provider request. Default 30.0.
609 | Requests exceeding this time will result in a timeout error for that specific provider.
610 |
611 | Returns:
612 | A dictionary containing results from each provider, along with aggregate statistics:
613 | {
614 | "results": {
615 | "openai/gpt-4.1-mini": { # Keyed by provider/model
616 | "text": "Response from OpenAI...",
617 | "model": "openai/gpt-4.1-mini",
618 | "provider": "openai",
619 | "tokens": { ... },
620 | "cost": 0.000123,
621 | "processing_time": 1.5,
622 | "success": true,
623 | "error": null
624 | },
625 | "anthropic/claude-3-5-haiku-20241022": {
626 | "text": null,
627 | "model": "anthropic/claude-3-5-haiku-20241022",
628 | "provider": "anthropic",
629 | "tokens": null,
630 | "cost": 0.0,
631 | "processing_time": 30.0,
632 | "success": false,
633 | "error": "Request timed out after 30.0 seconds"
634 | },
635 | ...
636 | },
637 | "aggregate_stats": {
638 | "total_requests": 2,
639 | "successful_requests": 1,
640 | "failed_requests": 1,
641 | "total_cost": 0.000123,
642 | "total_processing_time": 30.1, # Total wall time for the concurrent execution
643 | "average_processing_time": 1.5 # Average time for successful requests
644 | },
645 | "cached_result": false # True if the entire multi_completion result was cached
646 | }
647 |
648 | Raises:
649 | ToolInputError: If the `providers` list format is invalid.
650 | ToolError: For other internal errors during setup.
651 | Individual provider errors are captured within the "results" dictionary.
652 | """
653 | start_time = time.time()
654 |
655 | # Validate providers format
656 | if not isinstance(providers, list) or not all(isinstance(p, dict) and 'provider' in p for p in providers):
657 | raise ToolInputError(
658 | "Invalid providers format. Must be a list of dictionaries, each with at least a 'provider' key.",
659 | param_name="providers",
660 | provided_value=providers
661 | )
662 |
663 | results = {}
664 | tasks = []
665 | semaphore = asyncio.Semaphore(max_concurrency)
666 | total_cost = 0.0
667 | successful_requests = 0
668 | failed_requests = 0
669 | successful_times = []
670 |
671 | async def process_provider(provider_config):
672 | nonlocal total_cost, successful_requests, failed_requests, successful_times
673 | provider_name = provider_config.get("provider")
674 | model_name = provider_config.get("model")
675 | # Create a unique key for results dictionary, handling cases where model might be None initially
676 | result_key = f"{provider_name}/{model_name or 'default'}"
677 |
678 | async with semaphore:
679 | provider_start_time = time.time()
680 | error_message = None
681 | result_data = None
682 | actual_model_used = model_name # Store the actual model reported by the result
683 |
684 | try:
685 | # Extract specific params for generate_completion
686 | completion_params = {k: v for k, v in provider_config.items() if k not in ["provider"]}
687 | completion_params["prompt"] = prompt # Add the common prompt
688 |
689 | logger.debug(f"Calling generate_completion for {provider_name} / {model_name or 'default'}...")
690 |
691 | # Call generate_completion with timeout
692 | completion_task = generate_completion(provider=provider_name, **completion_params)
693 | result_data = await asyncio.wait_for(completion_task, timeout=timeout)
694 |
695 | provider_processing_time = time.time() - provider_start_time
696 |
697 | if result_data and result_data.get("success"):
698 | cost = result_data.get("cost", 0.0)
699 | total_cost += cost
700 | successful_requests += 1
701 | successful_times.append(provider_processing_time)
702 | actual_model_used = result_data.get("model") # Get the actual model used
703 | logger.info(f"Success from {result_key} in {provider_processing_time:.2f}s")
704 | else:
705 | failed_requests += 1
706 | error_message = result_data.get("error", "Unknown error during completion") if isinstance(result_data, dict) else "Invalid result format"
707 | logger.warning(f"Failure from {result_key}: {error_message}")
708 |
709 | except asyncio.TimeoutError:
710 | provider_processing_time = time.time() - provider_start_time
711 | failed_requests += 1
712 | error_message = f"Request timed out after {timeout:.1f} seconds"
713 | logger.warning(f"Timeout for {result_key} after {timeout:.1f}s")
714 | except ProviderError as pe:
715 | provider_processing_time = time.time() - provider_start_time
716 | failed_requests += 1
717 | error_message = f"ProviderError: {str(pe)}"
718 | logger.warning(f"ProviderError for {result_key}: {str(pe)}")
719 | actual_model_used = pe.model # Get model from exception if available
720 | except Exception as e:
721 | provider_processing_time = time.time() - provider_start_time
722 | failed_requests += 1
723 | error_message = f"Unexpected error: {type(e).__name__}: {str(e)}"
724 | logger.error(f"Unexpected error for {result_key}: {e}", exc_info=True)
725 |
726 | # Store result or error
727 |             # (actual_model_used reflects the model reported by the result or the exception)
728 | results[result_key] = {
729 | "text": result_data.get("text") if result_data else None,
730 | "model": actual_model_used, # Use the actual model name from result or exception
731 | "provider": provider_name,
732 | "tokens": result_data.get("tokens") if result_data else None,
733 | "cost": result_data.get("cost", 0.0) if result_data else 0.0,
734 | "processing_time": provider_processing_time,
735 | "success": error_message is None,
736 | "error": error_message
737 | }
738 |
739 | # Create tasks
740 | for config in providers:
741 | task = asyncio.create_task(process_provider(config))
742 | tasks.append(task)
743 |
744 | # Wait for all tasks to complete
745 | await asyncio.gather(*tasks)
746 |
747 | total_processing_time = time.time() - start_time
748 | average_processing_time = sum(successful_times) / len(successful_times) if successful_times else 0.0
749 |
750 | logger.info(
751 | f"Multi-completion finished. Success: {successful_requests}, Failed: {failed_requests}, Total Cost: ${total_cost:.6f}, Total Time: {total_processing_time:.2f}s",
752 | emoji_key="info"
753 | )
754 |
755 | return {
756 | "results": results,
757 | "aggregate_stats": {
758 | "total_requests": len(providers),
759 | "successful_requests": successful_requests,
760 | "failed_requests": failed_requests,
761 | "total_cost": total_cost,
762 | "total_processing_time": total_processing_time,
763 | "average_processing_time": average_processing_time
764 | }
765 | # Note: cached_result is added by decorator
766 | }
```
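
Assuming the package is installed and provider API keys are configured (for example via the `.env` file), the single-prompt tools above are plain module-level async functions and can be exercised directly; a rough sketch, with provider and model names as placeholders:

```python
import asyncio

from ultimate_mcp_server.tools.completion import generate_completion, stream_completion

async def main() -> None:
    # Non-streaming: returns one dict with the full text plus token and cost metadata.
    result = await generate_completion(
        prompt="Explain the Model Context Protocol in two sentences.",
        provider="openai",        # placeholder; any configured provider works
        temperature=0.3,
        max_tokens=120,
    )
    print(result["text"])
    print("cost:", result["cost"], "tokens:", result["tokens"]["total"])

    # Streaming: iterate the async generator; the final chunk has finished=True
    # and carries the aggregate metadata (full_text, tokens, cost, any error).
    async for chunk in stream_completion(prompt="Write a haiku about caching."):
        if chunk.get("finished"):
            print(f"\n[cost: {chunk.get('cost')}]")
        else:
            print(chunk.get("text", ""), end="", flush=True)

if __name__ == "__main__":
    asyncio.run(main())
```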
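`chat_completion` takes the conversation as a list of role/content messages and prepends `system_prompt` as a system message; caching and retry behavior come from its decorators, so identical requests within 24 hours return the cached result. A brief sketch under the same configuration assumptions, with the provider name as a placeholder:

```python
import asyncio

from ultimate_mcp_server.tools.completion import chat_completion

async def main() -> None:
    response = await chat_completion(
        messages=[
            {"role": "user", "content": "What does the transform_data pipeline tool do?"},
            {"role": "assistant", "content": "It cleans fetched records using sed-based text transforms."},
            {"role": "user", "content": "Which step comes after it?"},
        ],
        system_prompt="You are a terse assistant answering questions about this server's tools.",
        provider="anthropic",     # placeholder; any configured provider works
        temperature=0.2,
    )
    print(response["message"]["content"])
    print("cached:", response.get("cached_result", False))

if __name__ == "__main__":
    asyncio.run(main())
```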
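`multi_completion` fans one prompt out to several provider/model configurations and reports per-model results plus aggregate statistics (the whole result is cached for 7 days). A minimal sketch, reusing the model IDs from the docstring examples:

```python
import asyncio

from ultimate_mcp_server.tools.completion import multi_completion

async def main() -> None:
    comparison = await multi_completion(
        prompt="Summarize the benefit of response caching in one sentence.",
        providers=[
            {"provider": "openai", "model": "gpt-4.1-mini"},
            {"provider": "anthropic", "model": "claude-3-5-haiku-20241022", "max_tokens": 60},
        ],
        max_concurrency=2,
        timeout=30.0,
    )
    # Results are keyed by "provider/model"; failures carry an error string instead of text.
    for key, outcome in comparison["results"].items():
        status = "ok" if outcome["success"] else f"failed: {outcome['error']}"
        print(f"{key}: {status} (${outcome['cost']:.6f}, {outcome['processing_time']:.2f}s)")
    print(comparison["aggregate_stats"])

if __name__ == "__main__":
    asyncio.run(main())
```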