This is page 8 of 45. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│ ├── __init__.py
│ ├── advanced_agent_flows_using_unified_memory_system_demo.py
│ ├── advanced_extraction_demo.py
│ ├── advanced_unified_memory_system_demo.py
│ ├── advanced_vector_search_demo.py
│ ├── analytics_reporting_demo.py
│ ├── audio_transcription_demo.py
│ ├── basic_completion_demo.py
│ ├── cache_demo.py
│ ├── claude_integration_demo.py
│ ├── compare_synthesize_demo.py
│ ├── cost_optimization.py
│ ├── data
│ │ ├── sample_event.txt
│ │ ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│ │ └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│ ├── docstring_refiner_demo.py
│ ├── document_conversion_and_processing_demo.py
│ ├── entity_relation_graph_demo.py
│ ├── filesystem_operations_demo.py
│ ├── grok_integration_demo.py
│ ├── local_text_tools_demo.py
│ ├── marqo_fused_search_demo.py
│ ├── measure_model_speeds.py
│ ├── meta_api_demo.py
│ ├── multi_provider_demo.py
│ ├── ollama_integration_demo.py
│ ├── prompt_templates_demo.py
│ ├── python_sandbox_demo.py
│ ├── rag_example.py
│ ├── research_workflow_demo.py
│ ├── sample
│ │ ├── article.txt
│ │ ├── backprop_paper.pdf
│ │ ├── buffett.pdf
│ │ ├── contract_link.txt
│ │ ├── legal_contract.txt
│ │ ├── medical_case.txt
│ │ ├── northwind.db
│ │ ├── research_paper.txt
│ │ ├── sample_data.json
│ │ └── text_classification_samples
│ │ ├── email_classification.txt
│ │ ├── news_samples.txt
│ │ ├── product_reviews.txt
│ │ └── support_tickets.txt
│ ├── sample_docs
│ │ └── downloaded
│ │ └── attention_is_all_you_need.pdf
│ ├── sentiment_analysis_demo.py
│ ├── simple_completion_demo.py
│ ├── single_shot_synthesis_demo.py
│ ├── smart_browser_demo.py
│ ├── sql_database_demo.py
│ ├── sse_client_demo.py
│ ├── test_code_extraction.py
│ ├── test_content_detection.py
│ ├── test_ollama.py
│ ├── text_classification_demo.py
│ ├── text_redline_demo.py
│ ├── tool_composition_examples.py
│ ├── tournament_code_demo.py
│ ├── tournament_text_demo.py
│ ├── unified_memory_system_demo.py
│ ├── vector_search_demo.py
│ ├── web_automation_instruction_packs.py
│ └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│ └── smart_browser_internal
│ ├── locator_cache.db
│ ├── readability.js
│ └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── integration
│ │ ├── __init__.py
│ │ └── test_server.py
│ ├── manual
│ │ ├── test_extraction_advanced.py
│ │ └── test_extraction.py
│ └── unit
│ ├── __init__.py
│ ├── test_cache.py
│ ├── test_providers.py
│ └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│ ├── __init__.py
│ ├── __main__.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── __main__.py
│ │ ├── commands.py
│ │ ├── helpers.py
│ │ └── typer_cli.py
│ ├── clients
│ │ ├── __init__.py
│ │ ├── completion_client.py
│ │ └── rag_client.py
│ ├── config
│ │ └── examples
│ │ └── filesystem_config.yaml
│ ├── config.py
│ ├── constants.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── evaluation
│ │ │ ├── base.py
│ │ │ └── evaluators.py
│ │ ├── providers
│ │ │ ├── __init__.py
│ │ │ ├── anthropic.py
│ │ │ ├── base.py
│ │ │ ├── deepseek.py
│ │ │ ├── gemini.py
│ │ │ ├── grok.py
│ │ │ ├── ollama.py
│ │ │ ├── openai.py
│ │ │ └── openrouter.py
│ │ ├── server.py
│ │ ├── state_store.py
│ │ ├── tournaments
│ │ │ ├── manager.py
│ │ │ ├── tasks.py
│ │ │ └── utils.py
│ │ └── ums_api
│ │ ├── __init__.py
│ │ ├── ums_database.py
│ │ ├── ums_endpoints.py
│ │ ├── ums_models.py
│ │ └── ums_services.py
│ ├── exceptions.py
│ ├── graceful_shutdown.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── analytics
│ │ │ ├── __init__.py
│ │ │ ├── metrics.py
│ │ │ └── reporting.py
│ │ ├── cache
│ │ │ ├── __init__.py
│ │ │ ├── cache_service.py
│ │ │ ├── persistence.py
│ │ │ ├── strategies.py
│ │ │ └── utils.py
│ │ ├── cache.py
│ │ ├── document.py
│ │ ├── knowledge_base
│ │ │ ├── __init__.py
│ │ │ ├── feedback.py
│ │ │ ├── manager.py
│ │ │ ├── rag_engine.py
│ │ │ ├── retriever.py
│ │ │ └── utils.py
│ │ ├── prompts
│ │ │ ├── __init__.py
│ │ │ ├── repository.py
│ │ │ └── templates.py
│ │ ├── prompts.py
│ │ └── vector
│ │ ├── __init__.py
│ │ ├── embeddings.py
│ │ └── vector_service.py
│ ├── tool_token_counter.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── audio_transcription.py
│ │ ├── base.py
│ │ ├── completion.py
│ │ ├── docstring_refiner.py
│ │ ├── document_conversion_and_processing.py
│ │ ├── enhanced-ums-lookbook.html
│ │ ├── entity_relation_graph.py
│ │ ├── excel_spreadsheet_automation.py
│ │ ├── extraction.py
│ │ ├── filesystem.py
│ │ ├── html_to_markdown.py
│ │ ├── local_text_tools.py
│ │ ├── marqo_fused_search.py
│ │ ├── meta_api_tool.py
│ │ ├── ocr_tools.py
│ │ ├── optimization.py
│ │ ├── provider.py
│ │ ├── pyodide_boot_template.html
│ │ ├── python_sandbox.py
│ │ ├── rag.py
│ │ ├── redline-compiled.css
│ │ ├── sentiment_analysis.py
│ │ ├── single_shot_synthesis.py
│ │ ├── smart_browser.py
│ │ ├── sql_databases.py
│ │ ├── text_classification.py
│ │ ├── text_redline_tools.py
│ │ ├── tournament.py
│ │ ├── ums_explorer.html
│ │ └── unified_memory_system.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── async_utils.py
│ │ ├── display.py
│ │ ├── logging
│ │ │ ├── __init__.py
│ │ │ ├── console.py
│ │ │ ├── emojis.py
│ │ │ ├── formatter.py
│ │ │ ├── logger.py
│ │ │ ├── panels.py
│ │ │ ├── progress.py
│ │ │ └── themes.py
│ │ ├── parse_yaml.py
│ │ ├── parsing.py
│ │ ├── security.py
│ │ └── text.py
│ └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/ultimate_mcp_server/services/document.py:
--------------------------------------------------------------------------------
```python
1 | """Document processing service for chunking and analyzing text documents."""
2 | import re
3 | from typing import List
4 |
5 | from ultimate_mcp_server.utils import get_logger
6 |
7 | logger = get_logger(__name__)
8 |
9 |
10 | class DocumentProcessor:
11 | """
12 | Service for intelligent text document processing, chunking, and preparation.
13 |
14 | The DocumentProcessor provides sophisticated document handling capabilities
15 | focused on breaking down long documents into meaningful, properly-sized chunks
16 | optimized for various downstream NLP tasks such as embedding generation,
17 | semantic search, and RAG (Retrieval Augmented Generation).
18 |
19 | Key Features:
20 | - Multiple chunking strategies optimized for different content types
21 | - Configurable chunk size and overlap parameters
22 | - Semantic-aware chunking that preserves context and meaning
23 | - Sentence boundary detection for natural text segmentation
24 | - Token-based chunking for precise size control
25 | - Singleton implementation for efficient resource usage
26 |
27 | Chunking Methods:
28 | 1. Semantic Chunking: Preserves paragraph structure and semantic meaning,
29 | preventing splits that would break logical content boundaries. Best for
30 | maintaining context in well-structured documents.
31 |
32 | 2. Sentence Chunking: Splits documents at sentence boundaries, ensuring
33 | no sentence is broken across chunks. Ideal for natural language text
34 | where sentence integrity is important.
35 |
36 | 3. Token Chunking: Divides text based on approximate token counts without
37 | special consideration for semantic boundaries. Provides the most precise
38 | control over chunk size for token-limited systems.
39 |
40 | Each method implements configurable overlap between chunks to maintain
41 | context across chunk boundaries, ensuring information isn't lost when a
42 | concept spans multiple chunks.
43 |
44 | Usage Example:
45 | ```python
46 | processor = get_document_processor()
47 |
48 | # Chunk a document with default settings (token-based)
49 | chunks = await processor.chunk_document(
50 | document=long_text,
51 | chunk_size=1000,
52 | chunk_overlap=200
53 | )
54 |
55 | # Use semantic chunking for a well-structured document
56 | semantic_chunks = await processor.chunk_document(
57 | document=article_text,
58 | chunk_size=1500,
59 | chunk_overlap=150,
60 | method="semantic"
61 | )
62 |
63 | # Process chunks for embedding or RAG
64 | for chunk in chunks:
65 | # Process each chunk...
66 | ```
67 |
68 | Note:
69 | This service implements the singleton pattern, ensuring only one instance
70 | exists throughout the application. Always use the get_document_processor()
71 | function to obtain the shared instance rather than creating instances directly.
72 | """
73 |
74 | _instance = None
75 |
76 | def __new__(cls, *args, **kwargs):
77 | """Create a singleton instance."""
78 | if cls._instance is None:
79 | cls._instance = super(DocumentProcessor, cls).__new__(cls)
80 | cls._instance._initialized = False
81 | return cls._instance
82 |
83 | def __init__(self):
84 | """Initialize the document processor."""
85 | # Only initialize once for singleton
86 | if getattr(self, "_initialized", False):
87 | return
88 |
89 | logger.info("Document processor initialized", extra={"emoji_key": "success"})
90 | self._initialized = True
91 |
92 | async def chunk_document(
93 | self,
94 | document: str,
95 | chunk_size: int = 1000,
96 | chunk_overlap: int = 200,
97 | method: str = "token"
98 | ) -> List[str]:
99 | """
100 | Split a document into optimally sized, potentially overlapping chunks.
101 |
102 | This method intelligently divides a document into smaller segments using
103 | one of several chunking strategies, balancing chunk size requirements with
104 | preserving semantic coherence. The chunking process is critical for preparing
105 | documents for embedding, retrieval, and other NLP operations that have
106 | input size limitations or benefit from focused context.
107 |
108 | Chunking Methods:
109 | - "token": (Default) Splits text based on approximate token count.
110 | Simple and precise for size control, but may break semantic units.
111 | - "sentence": Preserves sentence boundaries, ensuring no sentence is broken
112 | across chunks. Better for maintaining local context and readability.
113 | - "semantic": Most sophisticated approach that attempts to preserve paragraph
114 | structure and semantic coherence. Best for maintaining document meaning
115 | but may result in more size variation between chunks.
116 |
117 | The chunk_size parameter is approximate for all methods, as they prioritize
118 | maintaining semantic boundaries where appropriate. The actual size of returned
119 | chunks may vary, especially when using sentence or semantic methods.
120 |
121 | Chunk overlap creates a sliding window effect, where the end of one chunk
122 | overlaps with the beginning of the next. This helps maintain context across
123 | chunk boundaries and improves retrieval quality by ensuring concepts that
124 | span multiple chunks can still be found.
125 |
126 | Selecting Parameters:
127 | - For embedding models with strict token limits: Use "token" with chunk_size
128 | set safely below the model's limit
129 | - For maximizing context preservation: Use "semantic" with larger overlap
130 | - For balancing size precision and sentence integrity: Use "sentence"
131 | - Larger overlap (25-50% of chunk_size) improves retrieval quality but
132 | increases storage and processing requirements
133 |
134 | Args:
135 | document: Text content to be chunked
136 | chunk_size: Target size of each chunk in approximate tokens (default: 1000)
137 | chunk_overlap: Number of tokens to overlap between chunks (default: 200)
138 | method: Chunking strategy to use ("token", "sentence", or "semantic")
139 |
140 | Returns:
141 | List of text chunks derived from the original document
142 |
143 | Note:
144 | Returns an empty list if the input document is empty or None.
145 | The token estimation is approximate and based on whitespace splitting,
146 | not a true tokenizer, so actual token counts may differ when processed
147 | by specific models.
148 | """
149 | if not document:
150 | return []
151 |
152 | logger.debug(
153 | f"Chunking document using method '{method}' (size: {chunk_size}, overlap: {chunk_overlap})",
154 | extra={"emoji_key": "processing"}
155 | )
156 |
157 | if method == "semantic":
158 | return await self._chunk_semantic(document, chunk_size, chunk_overlap)
159 | elif method == "sentence":
160 | return await self._chunk_by_sentence(document, chunk_size, chunk_overlap)
161 | else:
162 | # Default to token-based chunking
163 | return await self._chunk_by_tokens(document, chunk_size, chunk_overlap)
164 |
165 | async def _chunk_by_tokens(
166 | self,
167 | document: str,
168 | chunk_size: int = 1000,
169 | chunk_overlap: int = 200
170 | ) -> List[str]:
171 | """
172 | Split document into chunks by approximate token count without preserving semantic structures.
173 |
174 | This is the most straightforward chunking method, dividing text based solely
175 | on approximate token counts without special consideration for sentence or
176 | paragraph boundaries. It provides the most predictable and precise control
177 | over chunk sizes at the cost of potentially breaking semantic units like
178 | sentences or paragraphs.
179 |
180 | Algorithm implementation:
181 | 1. Approximates tokens by splitting text on whitespace (creating "words")
182 | 2. Divides the document into chunks of specified token length
183 | 3. Implements sliding window overlaps between consecutive chunks
184 | 4. Handles edge cases like empty documents and final chunks
185 |
186 | The token approximation used is simple whitespace splitting, which provides
187 | a reasonable estimation for most Western languages and common tokenization
188 | schemes. While not as accurate as model-specific tokenizers, it offers a
189 | good balance between performance and approximation quality for general use.
190 |
191 | Chunk overlap is implemented by including tokens from the end of one chunk
192 | at the beginning of the next, creating a sliding window effect that helps
193 | maintain context across chunk boundaries.
194 |
195 | This method is ideal for:
196 | - Working with strict token limits in downstream models
197 | - Processing text where exact chunk sizes are more important than
198 | preserving semantic structures
199 | - High-volume processing where simplicity and performance are priorities
200 | - Text with unusual or inconsistent formatting where sentence/paragraph
201 | detection might fail
202 |
203 | Args:
204 | document: Text content to split by tokens
205 | chunk_size: Number of tokens (words) per chunk
206 | chunk_overlap: Number of tokens to overlap between chunks
207 |
208 | Returns:
209 | List of text chunks of approximately equal token counts
210 |
211 | Note:
212 | True token counts in NLP models may differ from this approximation,
213 | especially for models with subword tokenization. For applications
214 | requiring exact token counts, consider using the model's specific
215 | tokenizer for more accurate size estimates.
216 | """
217 | # Simple token estimation (split by whitespace)
218 | words = document.split()
219 |
220 | # No words, return empty list
221 | if not words:
222 | return []
223 |
224 | # Simple chunking
225 | chunks = []
226 | start = 0
227 |
228 | while start < len(words):
229 | # Calculate end position with potential overlap
230 | end = min(start + chunk_size, len(words))
231 |
232 | # Create chunk
233 | chunk = " ".join(words[start:end])
234 | chunks.append(chunk)
235 |
236 | # Move to next chunk with overlap
237 | start = end - chunk_overlap
238 |
239 | # Avoid getting stuck at the end
240 | if start >= len(words) - chunk_overlap:
241 | break
242 |
243 | logger.debug(
244 | f"Split document into {len(chunks)} chunks by token",
245 | extra={"emoji_key": "processing"}
246 | )
247 |
248 | return chunks
249 |
250 | async def _chunk_by_sentence(
251 | self,
252 | document: str,
253 | chunk_size: int = 1000,
254 | chunk_overlap: int = 200
255 | ) -> List[str]:
256 | """
257 | Split document into chunks by preserving complete sentences.
258 |
259 | This chunking method respects sentence boundaries when dividing documents,
260 | ensuring that no sentence is fragmented across multiple chunks. It balances
261 | chunk size requirements with maintaining the integrity of natural language
262 | structures, producing more readable and semantically coherent chunks than
263 | simple token-based approaches.
264 |
265 | Algorithm details:
266 | 1. Detects sentence boundaries using regular expressions that handle:
267 | - Standard end punctuation (.!?)
268 | - Common abbreviations (Mr., Dr., etc.)
269 | - Edge cases like decimal numbers or acronyms
270 | 2. Builds chunks by adding complete sentences until the target chunk size
271 | is approached
272 | 3. Creates overlap between chunks by including ending sentences from the
273 | previous chunk at the beginning of the next chunk
274 | 4. Maintains approximate token count targets while prioritizing sentence
275 | integrity
276 |
277 | The sentence detection uses a regex pattern that aims to balance accuracy
278 | with simplicity and efficiency. It identifies likely sentence boundaries by:
279 | - Looking for punctuation marks followed by whitespace
280 | - Excluding common patterns that are not sentence boundaries (e.g., "Mr.")
281 | - Handling basic cases like quotes and parentheses
282 |
283 | This method is ideal for:
284 | - Natural language text where sentence flow is important
285 | - Content where breaking mid-sentence would harm readability or context
286 | - General purpose document processing where semantic units matter
287 | - Documents that don't have clear paragraph structure
288 |
289 | Args:
290 | document: Text content to split by sentences
291 | chunk_size: Target approximate size per chunk in tokens
292 | chunk_overlap: Number of tokens to overlap between chunks
293 |
294 | Returns:
295 | List of document chunks with complete sentences
296 |
297 | Note:
298 | The sentence detection uses regex patterns that work well for most
299 | standard English text but may not handle all edge cases perfectly.
300 | For specialized text with unusual punctuation patterns, additional
301 | customization may be needed.
302 | """
303 | # Simple sentence splitting (not perfect but works for most cases)
304 | sentence_delimiters = r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?|\!)\s'
305 | sentences = re.split(sentence_delimiters, document)
306 | sentences = [s.strip() for s in sentences if s.strip()]
307 |
308 | # No sentences, return empty list
309 | if not sentences:
310 | return []
311 |
312 | # Chunk by sentences, trying to reach target size
313 | chunks = []
314 | current_chunk = []
315 | current_size = 0
316 |
317 | for sentence in sentences:
318 | # Estimate size in tokens (approximate)
319 | sentence_size = len(sentence.split())
320 |
321 | # If adding this sentence exceeds the chunk size and we have content,
322 | # finalize the current chunk
323 | if current_chunk and current_size + sentence_size > chunk_size:
324 | chunks.append(" ".join(current_chunk))
325 |
326 | # Start new chunk with overlap
327 | overlap_size = 0
328 | overlap_chunk = []
329 |
330 | # Add sentences from the end of previous chunk for overlap
331 | for s in reversed(current_chunk):
332 | s_size = len(s.split())
333 | if overlap_size + s_size <= chunk_overlap:
334 | overlap_chunk.insert(0, s)
335 | overlap_size += s_size
336 | else:
337 | break
338 |
339 | current_chunk = overlap_chunk
340 | current_size = overlap_size
341 |
342 | # Add current sentence
343 | current_chunk.append(sentence)
344 | current_size += sentence_size
345 |
346 | # Add the last chunk if not empty
347 | if current_chunk:
348 | chunks.append(" ".join(current_chunk))
349 |
350 | logger.debug(
351 | f"Split document into {len(chunks)} chunks by sentence",
352 | extra={"emoji_key": "processing"}
353 | )
354 |
355 | return chunks
356 |
357 | async def _chunk_semantic(
358 | self,
359 | document: str,
360 | chunk_size: int = 1000,
361 | chunk_overlap: int = 200
362 | ) -> List[str]:
363 | """
364 | Split document into chunks by semantic meaning, preserving paragraph structure.
365 |
366 | This advanced chunking method attempts to maintain the semantic coherence and
367 | natural structure of the document by respecting paragraph boundaries whenever
368 | possible. It implements a hierarchical approach that:
369 |
370 | 1. First divides the document by paragraph breaks (blank lines)
371 | 2. Evaluates each paragraph for length
372 | 3. Keeps short and medium paragraphs intact to preserve their meaning
373 | 4. Further splits overly long paragraphs using sentence boundary detection
374 | 5. Assembles chunks with appropriate overlap for context continuity
375 |
376 | The algorithm prioritizes three key aspects of document structure:
377 | - Paragraph integrity: Treats paragraphs as coherent units of thought
378 | - Logical flow: Maintains document organization when possible
379 | - Size constraints: Respects chunk size limitations for downstream processing
380 |
381 | Implementation details:
382 | - Double newlines (\n\n) are treated as paragraph boundaries
383 | - If a document lacks clear paragraph structure (e.g., single paragraph),
384 | it falls back to sentence-based chunking
385 | - For paragraphs exceeding the chunk size, sentence-based chunking is applied
386 | - Context preservation is achieved by ensuring the last paragraph of a chunk
387 | becomes the first paragraph of the next chunk (when appropriate)
388 |
389 | This method is ideal for:
390 | - Well-structured documents like articles, papers, or reports
391 | - Content where paragraph organization conveys meaning
392 | - Documents where natural breaks exist between conceptual sections
393 | - Cases where preserving document structure improves retrieval quality
394 |
395 | Args:
396 | document: Text content to split semantically
397 | chunk_size: Maximum approximate size per chunk in tokens
398 | chunk_overlap: Number of tokens to overlap between chunks
399 |
400 | Returns:
401 | List of semantic chunks with paragraph structure preserved
402 |
403 | Note:
404 | Chunk sizes may vary more with semantic chunking than with other methods,
405 | as maintaining coherent paragraph groups takes precedence over exact
406 | size enforcement. For strict size control, use token-based chunking.
407 | """
408 | # For simplicity, this implementation is similar to sentence chunking
409 | # but with paragraph awareness
410 |
411 | # Split by paragraphs first
412 | paragraphs = [p.strip() for p in document.split("\n\n") if p.strip()]
413 |
414 | # Fallback to sentence chunking if no clear paragraphs
415 | if len(paragraphs) <= 1:
416 | return await self._chunk_by_sentence(document, chunk_size, chunk_overlap)
417 |
418 | # Process each paragraph and create semantic chunks
419 | chunks = []
420 | current_chunk = []
421 | current_size = 0
422 |
423 | for paragraph in paragraphs:
424 | # Estimate size in tokens
425 | paragraph_size = len(paragraph.split())
426 |
427 | # If paragraph is very large, chunk it further
428 | if paragraph_size > chunk_size:
429 | # Add current chunk if not empty
430 | if current_chunk:
431 | chunks.append("\n\n".join(current_chunk))
432 | current_chunk = []
433 | current_size = 0
434 |
435 | # Chunk large paragraph by sentences
436 | paragraph_chunks = await self._chunk_by_sentence(
437 | paragraph, chunk_size, chunk_overlap
438 | )
439 | chunks.extend(paragraph_chunks)
440 | continue
441 |
442 | # If adding this paragraph exceeds the chunk size and we have content,
443 | # finalize the current chunk
444 | if current_chunk and current_size + paragraph_size > chunk_size:
445 | chunks.append("\n\n".join(current_chunk))
446 |
447 | # Start new chunk with last paragraph for better context
448 | if current_chunk[-1] != paragraph and len(current_chunk) > 0:
449 | current_chunk = [current_chunk[-1]]
450 | current_size = len(current_chunk[-1].split())
451 | else:
452 | current_chunk = []
453 | current_size = 0
454 |
455 | # Add current paragraph
456 | current_chunk.append(paragraph)
457 | current_size += paragraph_size
458 |
459 | # Add the last chunk if not empty
460 | if current_chunk:
461 | chunks.append("\n\n".join(current_chunk))
462 |
463 | logger.debug(
464 | f"Split document into {len(chunks)} chunks semantically",
465 | extra={"emoji_key": "processing"}
466 | )
467 |
468 | return chunks
469 |
470 |
471 | # Singleton instance
472 | _document_processor = None
473 |
474 |
475 | def get_document_processor() -> DocumentProcessor:
476 | """
477 | Get or create the singleton DocumentProcessor instance.
478 |
479 | This function implements the singleton pattern for the DocumentProcessor class,
480 | ensuring that only one instance is created and shared throughout the application.
481 | It provides a consistent, centralized access point for document processing
482 | capabilities while conserving system resources.
483 |
484 | Using a singleton for the DocumentProcessor offers several benefits:
485 | - Resource efficiency: Prevents multiple instantiations of the processor
486 | - Consistency: Ensures all components use the same processing configuration
487 | - Centralized access: Provides a clean API for obtaining the processor
488 | - Lazy initialization: Creates the instance only when first needed
489 |
490 | This function should be used instead of directly instantiating the
491 | DocumentProcessor class to maintain the singleton pattern and ensure
492 | proper initialization.
493 |
494 | Returns:
495 | The shared DocumentProcessor instance
496 |
497 | Usage Example:
498 | ```python
499 | # Get the document processor from anywhere in the codebase
500 | processor = get_document_processor()
501 |
502 | # Use the processor's methods
503 | chunks = await processor.chunk_document(document_text)
504 | ```
505 |
506 | Note:
507 | Even though the DocumentProcessor class itself implements singleton logic in
508 | its __new__ method, this function is the preferred access point as it handles
509 | the global instance management and follows the established pattern used
510 | throughout the MCP server codebase.
511 | """
512 | global _document_processor
513 |
514 | if _document_processor is None:
515 | _document_processor = DocumentProcessor()
516 |
517 | return _document_processor
```
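For quick reference, a minimal usage sketch of the chunking service above (assuming the package is importable as `ultimate_mcp_server`; the sample text and tiny size parameters are illustrative only, and chunk counts will vary because sizes are approximate):

```python
import asyncio

from ultimate_mcp_server.services.document import get_document_processor


async def main() -> None:
    processor = get_document_processor()  # shared singleton instance

    text = (
        "Paragraph one introduces the project.\n\n"
        "Paragraph two adds more detail. It contains two sentences."
    )

    # Token chunking: fixed-size sliding window over whitespace-split words.
    token_chunks = await processor.chunk_document(
        document=text, chunk_size=8, chunk_overlap=2, method="token"
    )

    # Sentence chunking: never splits a sentence across chunks.
    sentence_chunks = await processor.chunk_document(
        document=text, chunk_size=8, chunk_overlap=2, method="sentence"
    )

    # Semantic chunking: keeps paragraphs intact, falling back to sentences.
    semantic_chunks = await processor.chunk_document(
        document=text, chunk_size=8, chunk_overlap=2, method="semantic"
    )

    print(len(token_chunks), len(sentence_chunks), len(semantic_chunks))


if __name__ == "__main__":
    asyncio.run(main())
```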
--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/rag.py:
--------------------------------------------------------------------------------
```python
1 | """MCP tools for Retrieval-Augmented Generation (RAG).
2 |
3 | Provides functions to create, manage, and query knowledge bases (vector stores)
4 | and generate text responses augmented with retrieved context.
5 | """
6 | import re
7 | from typing import Any, Dict, List, Optional
8 |
9 | # Import specific exceptions for better error handling hints
10 | from ultimate_mcp_server.exceptions import ProviderError, ResourceError, ToolInputError
11 | from ultimate_mcp_server.services import get_rag_engine
12 |
13 | # Moved imports for services to the top level
14 | from ultimate_mcp_server.services.knowledge_base import (
15 | get_knowledge_base_manager,
16 | get_knowledge_base_retriever,
17 | )
18 | from ultimate_mcp_server.tools.base import with_error_handling, with_tool_metrics
19 | from ultimate_mcp_server.utils import get_logger
20 |
21 | logger = get_logger(__name__)
22 |
23 | # --- Service Lazy Initialization ---
24 |
25 | _kb_manager = None
26 | _kb_retriever = None
27 | _rag_engine = None
28 |
29 | def _get_kb_manager():
30 | """Lazily initializes and returns the Knowledge Base Manager."""
31 | global _kb_manager
32 | if _kb_manager is None:
33 | logger.debug("Initializing KnowledgeBaseManager...")
34 | _kb_manager = get_knowledge_base_manager()
35 | logger.info("KnowledgeBaseManager initialized.")
36 | return _kb_manager
37 |
38 | def _get_kb_retriever():
39 | """Lazily initializes and returns the Knowledge Base Retriever."""
40 | global _kb_retriever
41 | if _kb_retriever is None:
42 | logger.debug("Initializing KnowledgeBaseRetriever...")
43 | _kb_retriever = get_knowledge_base_retriever()
44 | logger.info("KnowledgeBaseRetriever initialized.")
45 | return _kb_retriever
46 |
47 | def _get_rag_engine():
48 | """Lazily initializes and returns the RAG Engine."""
49 | global _rag_engine
50 | if _rag_engine is None:
51 | logger.debug("Initializing RAGEngine...")
52 | _rag_engine = get_rag_engine()
53 | logger.info("RAGEngine initialized.")
54 | return _rag_engine
55 |
56 | # --- Standalone Tool Functions ---
57 |
58 | @with_tool_metrics
59 | @with_error_handling
60 | async def create_knowledge_base(
61 | name: str,
62 | description: Optional[str] = None,
63 | embedding_model: Optional[str] = None,
64 | overwrite: bool = False
65 | ) -> Dict[str, Any]:
66 | """Creates a new, empty knowledge base (vector store) to hold documents.
67 |
68 | This is the first step before adding documents.
69 |
70 | Args:
71 | name: A unique name for the knowledge base (e.g., "project_docs_v1").
72 | Must be a valid identifier (letters, numbers, underscores).
73 | description: (Optional) A brief description of the knowledge base's content or purpose.
74 | embedding_model: (Optional) The specific embedding model ID to use for this knowledge base
75 | (e.g., "openai/text-embedding-3-small"). If None, uses the system default.
76 | Consistency is important; use the same model when adding documents later.
77 | overwrite: (Optional) If True, deletes and recreates the knowledge base if one with the
78 | same name already exists. Defaults to False (raises an error if exists).
79 |
80 | Returns:
81 | A dictionary confirming the creation:
82 | {
83 | "success": true,
84 | "name": "project_docs_v1",
85 | "message": "Knowledge base 'project_docs_v1' created successfully."
86 | }
87 | or an error dictionary if creation failed:
88 | {
89 | "success": false,
90 | "name": "project_docs_v1",
91 | "error": "Knowledge base 'project_docs_v1' already exists."
92 | }
93 |
94 | Raises:
95 | ResourceError: If the knowledge base already exists (and overwrite=False) or
96 | if there's an issue during creation (e.g., invalid name).
97 | ToolInputError: If the provided name is invalid.
98 | """
99 | # Input validation (basic example)
100 | if not name or not re.match(r"^[a-zA-Z0-9_]+$", name):
101 | raise ToolInputError(f"Invalid knowledge base name: '{name}'. Use only letters, numbers, underscores.")
102 |
103 | kb_manager = _get_kb_manager() # Use lazy getter
104 | try:
105 | result = await kb_manager.create_knowledge_base(
106 | name=name,
107 | description=description,
108 | embedding_model=embedding_model,
109 | overwrite=overwrite
110 | )
111 | return result
112 | except Exception as e:
113 | logger.error(f"Failed to create knowledge base '{name}': {e}", exc_info=True)
114 | # Re-raise specific error if possible, otherwise wrap
115 | if isinstance(e, (ResourceError, ToolInputError)):
116 | raise
117 | raise ResourceError(f"Failed to create knowledge base '{name}': {str(e)}", resource_type="knowledge_base", resource_id=name, cause=e) from e
118 |
119 | @with_tool_metrics
120 | @with_error_handling
121 | async def list_knowledge_bases() -> Dict[str, Any]:
122 | """Lists all available knowledge bases and their metadata.
123 |
124 | Returns:
125 | A dictionary containing a list of knowledge base details:
126 | {
127 | "success": true,
128 | "knowledge_bases": [
129 | {
130 | "name": "project_docs_v1",
131 | "description": "Documentation for Project X",
132 | "embedding_model": "openai/text-embedding-3-small",
133 | "document_count": 150,
134 | "created_at": "2023-10-27T10:00:00Z"
135 | },
136 | { ... } # Other knowledge bases
137 | ]
138 | }
139 | or an error dictionary:
140 | {
141 | "success": false,
142 | "error": "Failed to retrieve knowledge base list."
143 | }
144 | Raises:
145 | ResourceError: If there's an issue retrieving the list from the backend.
146 | """
147 | kb_manager = _get_kb_manager()
148 | try:
149 | result = await kb_manager.list_knowledge_bases()
150 | return result
151 | except Exception as e:
152 | logger.error(f"Failed to list knowledge bases: {e}", exc_info=True)
153 | raise ResourceError(f"Failed to list knowledge bases: {str(e)}", resource_type="knowledge_base", cause=e) from e
154 |
155 | @with_tool_metrics
156 | @with_error_handling
157 | async def delete_knowledge_base(name: str) -> Dict[str, Any]:
158 | """Deletes an existing knowledge base and all its documents.
159 |
160 | Warning: This action is irreversible.
161 |
162 | Args:
163 | name: The exact name of the knowledge base to delete.
164 |
165 | Returns:
166 | A dictionary confirming the deletion:
167 | {
168 | "success": true,
169 | "name": "project_docs_v1",
170 | "message": "Knowledge base 'project_docs_v1' deleted successfully."
171 | }
172 | or an error dictionary:
173 | {
174 | "success": false,
175 | "name": "project_docs_v1",
176 | "error": "Knowledge base 'project_docs_v1' not found."
177 | }
178 |
179 | Raises:
180 | ResourceError: If the knowledge base doesn't exist or if deletion fails.
181 | ToolInputError: If the provided name is invalid.
182 | """
183 | if not name:
184 | raise ToolInputError("Knowledge base name cannot be empty.")
185 |
186 | kb_manager = _get_kb_manager()
187 | try:
188 | result = await kb_manager.delete_knowledge_base(name)
189 | return result
190 | except Exception as e:
191 | logger.error(f"Failed to delete knowledge base '{name}': {e}", exc_info=True)
192 | if isinstance(e, (ResourceError, ToolInputError)):
193 | raise
194 | raise ResourceError(f"Failed to delete knowledge base '{name}': {str(e)}", resource_type="knowledge_base", resource_id=name, cause=e) from e
195 |
196 | @with_tool_metrics
197 | @with_error_handling
198 | async def add_documents(
199 | knowledge_base_name: str,
200 | documents: List[str],
201 | metadatas: Optional[List[Dict[str, Any]]] = None,
202 | chunk_size: int = 1000,
203 | chunk_overlap: int = 200,
204 | chunk_method: str = "semantic",
205 | embedding_model: Optional[str] = None
206 | ) -> Dict[str, Any]:
207 | """Adds one or more documents to a specified knowledge base.
208 |
209 | The documents are split into chunks, embedded, and stored for later retrieval.
210 |
211 | Args:
212 | knowledge_base_name: The name of the existing knowledge base to add documents to.
213 | documents: A list of strings, where each string is the full text content of a document.
214 | metadatas: (Optional) A list of dictionaries, one for each document in the `documents` list.
215 | Each dictionary should contain metadata relevant to the corresponding document
216 | (e.g., {"source": "filename.pdf", "page": 1, "author": "Alice"}).
217 | This metadata is stored alongside the document chunks and can be used for filtering during retrieval.
218 | If provided, `len(metadatas)` MUST equal `len(documents)`.
219 | chunk_size: (Optional) The target size for document chunks. Interpretation depends on `chunk_method`
220 | (e.g., tokens for "token" method, characters for "character", approximate size for "semantic").
221 | Defaults to 1000.
222 | chunk_overlap: (Optional) The number of units (tokens, characters) to overlap between consecutive chunks.
223 | Helps maintain context across chunk boundaries. Defaults to 200.
224 | chunk_method: (Optional) The method used for splitting documents into chunks.
225 | Options: "semantic" (attempts to split at meaningful semantic boundaries, recommended),
226 | "token" (splits by token count using tiktoken), "sentence" (splits by sentence).
227 | Defaults to "semantic".
228 | embedding_model: (Optional) The specific embedding model ID to use. If None, uses the model
229 | associated with the knowledge base (or the system default if none was specified
230 | at creation). It's best practice to ensure this matches the KB's model.
231 |
232 | Returns:
233 | A dictionary summarizing the addition process:
234 | {
235 | "success": true,
236 | "knowledge_base_name": "project_docs_v1",
237 | "documents_added": 5,
238 | "chunks_created": 75,
239 | "message": "Successfully added 5 documents (75 chunks) to 'project_docs_v1'."
240 | }
241 | or an error dictionary:
242 | {
243 | "success": false,
244 | "knowledge_base_name": "project_docs_v1",
245 | "error": "Knowledge base 'project_docs_v1' not found."
246 | }
247 |
248 | Raises:
249 | ResourceError: If the knowledge base doesn't exist or if there's an error during processing/storage.
250 | ToolInputError: If inputs are invalid (e.g., documents/metadatas length mismatch, invalid chunk_method).
251 |         ProviderError: If the embedding provider fails while generating embeddings for the chunks.
252 | """
253 | if not knowledge_base_name:
254 | raise ToolInputError("Knowledge base name cannot be empty.")
255 | if not documents or not isinstance(documents, list) or not all(isinstance(d, str) for d in documents):
256 | raise ToolInputError("'documents' must be a non-empty list of strings.")
257 | if metadatas and (not isinstance(metadatas, list) or len(metadatas) != len(documents)):
258 | raise ToolInputError("'metadatas', if provided, must be a list with the same length as 'documents'.")
259 | if chunk_method not in ["semantic", "token", "sentence", "character", "paragraph"]: # Added more methods
260 | raise ToolInputError(f"Invalid chunk_method: '{chunk_method}'. Must be one of: semantic, token, sentence, character, paragraph.")
261 |
262 | kb_manager = _get_kb_manager()
263 | try:
264 | result = await kb_manager.add_documents(
265 | knowledge_base_name=knowledge_base_name,
266 | documents=documents,
267 | metadatas=metadatas,
268 | chunk_size=chunk_size,
269 | chunk_overlap=chunk_overlap,
270 | chunk_method=chunk_method,
271 | embedding_model=embedding_model
272 | )
273 | return result
274 | except Exception as e:
275 | logger.error(f"Failed to add documents to knowledge base '{knowledge_base_name}': {e}", exc_info=True)
276 | if isinstance(e, (ResourceError, ToolInputError, ProviderError)):
277 | raise
278 | raise ResourceError(f"Failed to add documents to knowledge base '{knowledge_base_name}': {str(e)}", resource_type="knowledge_base", resource_id=knowledge_base_name, cause=e) from e
279 |
280 | @with_tool_metrics
281 | @with_error_handling
282 | async def retrieve_context(
283 | knowledge_base_name: str,
284 | query: str,
285 | top_k: int = 5,
286 | retrieval_method: str = "vector",
287 | min_score: Optional[float] = None, # Changed default to None
288 | metadata_filter: Optional[Dict[str, Any]] = None
289 | ) -> Dict[str, Any]:
290 | """Retrieves relevant document chunks (context) from a knowledge base based on a query.
291 |
292 | Searches the specified knowledge base for chunks semantically similar to the query.
293 |
294 | Args:
295 | knowledge_base_name: The name of the knowledge base to query.
296 | query: The text query to search for relevant context.
297 | top_k: (Optional) The maximum number of relevant chunks to retrieve. Defaults to 5.
298 | retrieval_method: (Optional) The method used for retrieval.
299 | Options: "vector" (semantic similarity search), "hybrid" (combines vector search
300 | with keyword-based search, may require specific backend support).
301 | Defaults to "vector".
302 | min_score: (Optional) The minimum similarity score (typically between 0 and 1) for a chunk
303 | to be included in the results. Higher values mean stricter relevance.
304 | If None (default), the backend decides or no filtering is applied.
305 | metadata_filter: (Optional) A dictionary used to filter results based on metadata associated
306 | with the chunks during `add_documents`. Filters use exact matches.
307 | Example: {"source": "filename.pdf", "page": 5}
308 | Example: {"author": "Alice"}
309 | Defaults to None (no metadata filtering).
310 |
311 | Returns:
312 | A dictionary containing the retrieved context:
313 | {
314 | "success": true,
315 | "query": "What are the project goals?",
316 | "knowledge_base_name": "project_docs_v1",
317 | "retrieved_chunks": [
318 | {
319 | "content": "The main goal of Project X is to improve user engagement...",
320 | "score": 0.85,
321 | "metadata": {"source": "project_plan.docx", "page": 1}
322 | },
323 | { ... } # Other relevant chunks
324 | ]
325 | }
326 | or an error dictionary:
327 | {
328 | "success": false,
329 | "knowledge_base_name": "project_docs_v1",
330 | "error": "Knowledge base 'project_docs_v1' not found."
331 | }
332 |
333 | Raises:
334 | ResourceError: If the knowledge base doesn't exist or retrieval fails.
335 | ToolInputError: If inputs are invalid (e.g., invalid retrieval_method).
336 | """
337 | if not knowledge_base_name:
338 | raise ToolInputError("Knowledge base name cannot be empty.")
339 | if not query or not isinstance(query, str):
340 | raise ToolInputError("Query must be a non-empty string.")
341 | if retrieval_method not in ["vector", "hybrid"]: # Add more methods if supported by backend
342 | raise ToolInputError(f"Invalid retrieval_method: '{retrieval_method}'. Must be one of: vector, hybrid.")
343 |
344 | kb_retriever = _get_kb_retriever()
345 | try:
346 | # Note: The actual implementation might vary based on the retriever service
347 | # Here we assume the service handles different methods via parameters or distinct functions
348 | # Keeping the previous logic structure for now.
349 | if retrieval_method == "hybrid":
350 | # Assuming a specific hybrid method exists or the main retrieve handles it
351 | # This might need adjustment based on the actual service implementation
352 | logger.debug(f"Attempting hybrid retrieval for '{knowledge_base_name}'")
353 | result = await kb_retriever.retrieve_hybrid( # Or potentially kb_retriever.retrieve with a method flag
354 | knowledge_base_name=knowledge_base_name,
355 | query=query,
356 | top_k=top_k,
357 | min_score=min_score,
358 | metadata_filter=metadata_filter
359 | )
360 | else: # Default to vector
361 | logger.debug(f"Attempting vector retrieval for '{knowledge_base_name}'")
362 | result = await kb_retriever.retrieve(
363 | knowledge_base_name=knowledge_base_name,
364 | query=query,
365 | top_k=top_k,
366 | rerank=True, # Assuming rerank is often desired for vector
367 | min_score=min_score,
368 | metadata_filter=metadata_filter
369 | )
370 | return result
371 | except Exception as e:
372 | logger.error(f"Failed to retrieve context from knowledge base '{knowledge_base_name}' for query '{query}': {e}", exc_info=True)
373 | if isinstance(e, (ResourceError, ToolInputError)):
374 | raise
375 | raise ResourceError(f"Failed to retrieve context from knowledge base '{knowledge_base_name}': {str(e)}", resource_type="knowledge_base", resource_id=knowledge_base_name, cause=e) from e
376 |
377 | @with_tool_metrics
378 | @with_error_handling
379 | async def generate_with_rag(
380 | knowledge_base_name: str,
381 | query: str,
382 | provider: Optional[str] = None,
383 | model: Optional[str] = None,
384 | template: str = "rag_default",
385 | max_tokens: int = 1000,
386 | temperature: float = 0.3,
387 | top_k: int = 5,
388 | retrieval_method: str = "vector",
389 | min_score: Optional[float] = None, # Changed default to None
390 | include_sources: bool = True
391 | ) -> Dict[str, Any]:
392 | """Generates a response to a query using context retrieved from a knowledge base (RAG).
393 |
394 | This function first retrieves relevant document chunks using `retrieve_context` parameters,
395 | then feeds the query and the retrieved context into an LLM using a specified prompt template
396 | to generate a final, context-aware answer.
397 |
398 | Args:
399 | knowledge_base_name: The name of the knowledge base to retrieve context from.
400 | query: The user's query or question to be answered.
401 | provider: (Optional) The LLM provider for the generation step (e.g., "openai", "anthropic").
402 | If None, the RAG engine selects a default provider.
403 | model: (Optional) The specific LLM model ID for generation (e.g., "openai/gpt-4.1-mini").
404 | If None, the RAG engine selects a default model.
405 | template: (Optional) The name of the prompt template to use for combining the query and context.
406 | Available templates might include: "rag_default" (standard Q&A), "rag_with_sources"
407 |             (includes source attribution), "rag_summarize" (summarizes retrieved context
408 | based on query), "rag_analysis" (performs analysis based on context).
409 | Defaults to "rag_default" (or potentially "rag_with_sources" depending on engine default).
410 | max_tokens: (Optional) Maximum number of tokens for the generated LLM response. Defaults to 1000.
411 | temperature: (Optional) Sampling temperature for the LLM generation (0.0 to 1.0). Lower values
412 | are more deterministic, higher values more creative. Defaults to 0.3.
413 | top_k: (Optional) Maximum number of context chunks to retrieve (passed to retrieval). Defaults to 5.
414 | retrieval_method: (Optional) Method for retrieving context ("vector", "hybrid"). Defaults to "vector".
415 | min_score: (Optional) Minimum similarity score for retrieved chunks. Defaults to None.
416 | include_sources: (Optional) Whether the final response object should explicitly include details
417 | of the source chunks used for generation. Defaults to True.
418 |
419 | Returns:
420 | A dictionary containing the generated response and related information:
421 | {
422 | "success": true,
423 | "query": "What are the project goals?",
424 | "knowledge_base_name": "project_docs_v1",
425 | "generated_response": "The main goal of Project X is to improve user engagement by implementing features A, B, and C.",
426 | "sources": [ # Included if include_sources=True
427 | {
428 | "content": "The main goal of Project X is to improve user engagement...",
429 | "score": 0.85,
430 | "metadata": {"source": "project_plan.docx", "page": 1}
431 | },
432 | { ... } # Other source chunks used
433 | ],
434 | "model": "openai/gpt-4.1-mini", # Actual model used
435 | "provider": "openai",
436 | "tokens": { "input": ..., "output": ..., "total": ... }, # Generation tokens
437 | "cost": 0.000120,
438 | "processing_time": 5.2,
439 | "retrieval_time": 0.8, # Time spent only on retrieval
440 | "generation_time": 4.4 # Time spent only on generation
441 | }
442 | or an error dictionary:
443 | {
444 | "success": false,
445 | "knowledge_base_name": "project_docs_v1",
446 | "error": "RAG generation failed: Knowledge base 'project_docs_v1' not found."
447 | }
448 |
449 | Raises:
450 | ResourceError: If the knowledge base doesn't exist or retrieval fails.
451 | ProviderError: If the LLM provider fails during generation.
452 | ToolInputError: If inputs are invalid.
453 | """
454 | if not knowledge_base_name:
455 | raise ToolInputError("Knowledge base name cannot be empty.")
456 | if not query or not isinstance(query, str):
457 | raise ToolInputError("Query must be a non-empty string.")
458 |
459 | rag_engine = _get_rag_engine()
460 | try:
461 | result = await rag_engine.generate_with_rag(
462 | knowledge_base_name=knowledge_base_name,
463 | query=query,
464 | provider=provider,
465 | model=model,
466 | template=template,
467 | max_tokens=max_tokens,
468 | temperature=temperature,
469 | top_k=top_k,
470 | retrieval_method=retrieval_method,
471 | min_score=min_score,
472 | include_sources=include_sources
473 | )
474 | return result
475 | except Exception as e:
476 | logger.error(f"RAG generation failed for query on '{knowledge_base_name}': {e}", exc_info=True)
477 | if isinstance(e, (ResourceError, ProviderError, ToolInputError)):
478 | raise
479 | # Wrap generic errors
480 | raise ResourceError(f"RAG generation failed: {str(e)}", resource_type="knowledge_base", resource_id=knowledge_base_name, cause=e) from e
```
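Taken together, these tools form a simple create → add → retrieve → generate pipeline. Below is a minimal end-to-end sketch, assuming the decorated tool functions can be awaited directly in-process and that the embedding/LLM providers and vector store backing the knowledge base services are already configured (names and sample content are illustrative only):

```python
import asyncio

from ultimate_mcp_server.tools.rag import (
    add_documents,
    create_knowledge_base,
    generate_with_rag,
    retrieve_context,
)


async def main() -> None:
    # 1. Create an empty knowledge base (overwrite=True makes reruns idempotent).
    await create_knowledge_base(name="demo_kb", description="Demo knowledge base", overwrite=True)

    # 2. Add documents; they are chunked ("semantic" by default) and embedded.
    await add_documents(
        knowledge_base_name="demo_kb",
        documents=["Project X aims to improve user engagement through features A, B, and C."],
        metadatas=[{"source": "project_plan.docx", "page": 1}],
    )

    # 3. Optionally inspect the raw retrieved context for a query.
    context = await retrieve_context(
        knowledge_base_name="demo_kb",
        query="What are the project goals?",
        top_k=3,
    )
    print(context)

    # 4. Generate a context-grounded answer (RAG) with source attribution.
    answer = await generate_with_rag(
        knowledge_base_name="demo_kb",
        query="What are the project goals?",
        include_sources=True,
    )
    print(answer.get("generated_response"))


if __name__ == "__main__":
    asyncio.run(main())
```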
--------------------------------------------------------------------------------
/ultimate_mcp_server/services/prompts/templates.py:
--------------------------------------------------------------------------------
```python
1 | """Prompt template management and rendering for Ultimate MCP Server."""
2 | import json
3 | import re
4 | from enum import Enum
5 | from functools import lru_cache
6 | from pathlib import Path
7 | from typing import Any, Dict, List, Optional, Set, Tuple, Union
8 |
9 | from jinja2 import Environment, FileSystemLoader, Template, select_autoescape
10 |
11 | from ultimate_mcp_server.constants import Provider
12 | from ultimate_mcp_server.services.prompts.repository import get_prompt_repository
13 | from ultimate_mcp_server.utils import get_logger
14 |
15 | logger = get_logger(__name__)
16 |
17 |
18 | class TemplateFormat(str, Enum):
19 | """Template format options."""
20 | JINJA = "jinja"
21 | SIMPLE = "simple"
22 | MARKDOWN = "markdown"
23 | JSON = "json"
24 |
25 |
26 | class TemplateType(str, Enum):
27 | """Template type options."""
28 | COMPLETION = "completion"
29 | CHAT = "chat"
30 | SYSTEM = "system"
31 | USER = "user"
32 | FUNCTION = "function"
33 | EXTRACTION = "extraction"
34 |
35 |
36 | class PromptTemplate:
37 | """Template for generating prompts for LLM providers."""
38 |
39 | def __init__(
40 | self,
41 | template: str,
42 | template_id: str,
43 | format: Union[str, TemplateFormat] = TemplateFormat.JINJA,
44 | type: Union[str, TemplateType] = TemplateType.COMPLETION,
45 | metadata: Optional[Dict[str, Any]] = None,
46 | provider_defaults: Optional[Dict[str, Any]] = None,
47 | description: Optional[str] = None,
48 | required_vars: Optional[List[str]] = None,
49 | example_vars: Optional[Dict[str, Any]] = None,
50 | ):
51 | """Initialize a prompt template.
52 |
53 | Args:
54 | template: Template string
55 | template_id: Unique identifier for this template
56 | format: Template format (jinja, simple, markdown, or json)
57 | type: Template type (completion, chat, system, user, function, extraction)
58 | metadata: Optional metadata for the template
59 | provider_defaults: Optional provider-specific defaults
60 | description: Optional description of the template
61 | required_vars: Optional list of required variables
62 | example_vars: Optional example variables for testing
63 | """
64 | self.template = template
65 | self.template_id = template_id
66 |
67 | # Normalize format and type to enum values
68 | self.format = TemplateFormat(format) if isinstance(format, str) else format
69 | self.type = TemplateType(type) if isinstance(type, str) else type
70 |
71 | # Store additional attributes
72 | self.metadata = metadata or {}
73 | self.provider_defaults = provider_defaults or {}
74 | self.description = description
75 | self.example_vars = example_vars or {}
76 |
77 | # Extract required variables based on format
78 | self.required_vars = required_vars or self._extract_required_vars()
79 |
80 | # Compile template if using Jinja format
81 | self._compiled_template: Optional[Template] = None
82 | if self.format == TemplateFormat.JINJA:
83 | self._compiled_template = self._compile_template()
84 |
85 | def _extract_required_vars(self) -> List[str]:
86 | """Extract required variables from template based on format.
87 |
88 | Returns:
89 | List of required variable names
90 | """
91 | if self.format == TemplateFormat.JINJA:
92 | # Extract variables using regex for basic Jinja pattern
93 | matches = re.findall(r'{{(.*?)}}', self.template)
94 | vars_set: Set[str] = set()
95 |
96 | for match in matches:
97 | # Extract variable name (removing filters and whitespace)
98 | var_name = match.split('|')[0].strip()
99 | if var_name and not var_name.startswith('_'):
100 | vars_set.add(var_name)
101 |
102 | return sorted(list(vars_set))
103 |
104 | elif self.format == TemplateFormat.SIMPLE:
105 | # Extract variables from {variable} format
106 | matches = re.findall(r'{([^{}]*)}', self.template)
107 | return sorted(list(set(matches)))
108 |
109 | elif self.format == TemplateFormat.JSON:
110 | # Try to find JSON template variables
111 | try:
112 | # Parse as JSON to find potential variables
113 | template_dict = json.loads(self.template)
114 | return self._extract_json_vars(template_dict)
115 | except json.JSONDecodeError:
116 | logger.warning(
117 | f"Failed to parse JSON template: {self.template_id}",
118 | emoji_key="warning"
119 | )
120 | return []
121 |
122 | # Default: no variables detected
123 | return []
124 |
125 | def _extract_json_vars(self, obj: Any, prefix: str = "") -> List[str]:
126 | """Recursively extract variables from a JSON object.
127 |
128 | Args:
129 | obj: JSON object to extract variables from
130 | prefix: Prefix for nested variables
131 |
132 | Returns:
133 | List of variable names
134 | """
135 | vars_list = []
136 |
137 | if isinstance(obj, dict):
138 | for key, value in obj.items():
139 | if isinstance(value, str) and value.startswith("${") and value.endswith("}"):
140 | # This is a variable placeholder
141 | var_name = value[2:-1] # Remove ${ and }
142 | vars_list.append(f"{prefix}{var_name}")
143 | elif isinstance(value, (dict, list)):
144 | # Recursively extract from nested structures
145 | nested_prefix = f"{prefix}{key}." if prefix else f"{key}."
146 | vars_list.extend(self._extract_json_vars(value, nested_prefix))
147 | elif isinstance(obj, list):
148 | for _i, item in enumerate(obj):
149 | if isinstance(item, (dict, list)):
150 | vars_list.extend(self._extract_json_vars(item, prefix))
151 | elif isinstance(item, str) and item.startswith("${") and item.endswith("}"):
152 | var_name = item[2:-1]
153 | vars_list.append(f"{prefix}{var_name}")
154 |
155 | return sorted(list(set(vars_list)))
156 |
157 | def _compile_template(self) -> Template:
158 | """Compile the Jinja template.
159 |
160 | Returns:
161 | Compiled Jinja template
162 |
163 | Raises:
164 | ValueError: If template compilation fails
165 | """
166 | try:
167 | env = Environment(autoescape=select_autoescape(['html', 'xml']))
168 | return env.from_string(self.template)
169 | except Exception as e:
170 | logger.error(
171 | f"Failed to compile template {self.template_id}: {str(e)}",
172 | emoji_key="error"
173 | )
174 | raise ValueError(f"Invalid template format: {str(e)}") from e
175 |
176 | def render(self, variables: Dict[str, Any]) -> str:
177 | """Render the template with the provided variables.
178 |
179 | Args:
180 | variables: Dictionary of variables to render with
181 |
182 | Returns:
183 | Rendered template string
184 |
185 | Raises:
186 | ValueError: If required variables are missing
187 | """
188 | # Check for required variables
189 | missing_vars = [var for var in self.required_vars if var not in variables]
190 | if missing_vars:
191 | raise ValueError(
192 | f"Missing required variables for template {self.template_id}: {', '.join(missing_vars)}"
193 | )
194 |
195 | # Render based on format
196 | if self.format == TemplateFormat.JINJA:
197 | if not self._compiled_template:
198 | self._compiled_template = self._compile_template()
199 | return self._compiled_template.render(**variables)
200 |
201 | elif self.format == TemplateFormat.SIMPLE:
202 | # Simple variable substitution with {var} syntax
203 | result = self.template
204 | for var_name, var_value in variables.items():
205 | result = result.replace(f"{{{var_name}}}", str(var_value))
206 | return result
207 |
208 | elif self.format == TemplateFormat.JSON:
209 | try:
210 | # Parse template as JSON
211 | template_dict = json.loads(self.template)
212 |
213 | # Replace variables in the JSON structure
214 | rendered_dict = self._render_json_vars(template_dict, variables)
215 |
216 | # Convert back to JSON string
217 | return json.dumps(rendered_dict)
218 |
219 | except json.JSONDecodeError:
220 | logger.error(
221 | f"Failed to parse JSON template: {self.template_id}",
222 | emoji_key="error"
223 | )
224 |                 # Fall back to returning the raw template unchanged
225 | return self.template
226 |
227 | elif self.format == TemplateFormat.MARKDOWN:
228 | # Process markdown with simple variable substitution
229 | result = self.template
230 | for var_name, var_value in variables.items():
231 | result = result.replace(f"{{{var_name}}}", str(var_value))
232 | return result
233 |
234 | # Default: return template as is
235 | return self.template
236 |
237 | def _render_json_vars(self, obj: Any, variables: Dict[str, Any]) -> Any:
238 | """Recursively render variables in a JSON object.
239 |
240 | Args:
241 | obj: JSON object to render variables in
242 | variables: Dictionary of variables to render with
243 |
244 | Returns:
245 | Rendered JSON object
246 | """
247 | if isinstance(obj, dict):
248 | return {
249 | key: self._render_json_vars(value, variables)
250 | for key, value in obj.items()
251 | }
252 | elif isinstance(obj, list):
253 | return [self._render_json_vars(item, variables) for item in obj]
254 | elif isinstance(obj, str) and obj.startswith("${") and obj.endswith("}"):
255 | # This is a variable placeholder
256 | var_name = obj[2:-1] # Remove ${ and }
257 | # Get the variable value, or keep placeholder if not found
258 | return variables.get(var_name, obj)
259 | else:
260 | return obj
261 |
262 | def validate_variables(self, variables: Dict[str, Any]) -> Tuple[bool, List[str]]:
263 | """Validate that all required variables are provided.
264 |
265 | Args:
266 | variables: Dictionary of variables to validate
267 |
268 | Returns:
269 | Tuple of (is_valid, missing_variables)
270 | """
271 | missing_vars = [var for var in self.required_vars if var not in variables]
272 | return len(missing_vars) == 0, missing_vars
273 |
274 | def to_dict(self) -> Dict[str, Any]:
275 | """Convert template to dictionary representation.
276 |
277 | Returns:
278 | Dictionary representation of template
279 | """
280 | return {
281 | "template_id": self.template_id,
282 | "template": self.template,
283 | "format": self.format.value,
284 | "type": self.type.value,
285 | "metadata": self.metadata,
286 | "provider_defaults": self.provider_defaults,
287 | "description": self.description,
288 | "required_vars": self.required_vars,
289 | "example_vars": self.example_vars,
290 | }
291 |
292 | @classmethod
293 | def from_dict(cls, data: Dict[str, Any]) -> "PromptTemplate":
294 | """Create a template from dictionary representation.
295 |
296 | Args:
297 | data: Dictionary representation of template
298 |
299 | Returns:
300 | PromptTemplate instance
301 | """
302 | return cls(
303 | template=data["template"],
304 | template_id=data["template_id"],
305 | format=data.get("format", TemplateFormat.JINJA),
306 | type=data.get("type", TemplateType.COMPLETION),
307 | metadata=data.get("metadata"),
308 | provider_defaults=data.get("provider_defaults"),
309 | description=data.get("description"),
310 | required_vars=data.get("required_vars"),
311 | example_vars=data.get("example_vars"),
312 | )
313 |
314 | def get_provider_defaults(self, provider: str) -> Dict[str, Any]:
315 | """Get provider-specific default parameters.
316 |
317 | Args:
318 | provider: Provider name
319 |
320 | Returns:
321 | Dictionary of default parameters
322 | """
323 | return self.provider_defaults.get(provider, {})
324 |
325 |
326 | class PromptTemplateRenderer:
327 | """Service for rendering prompt templates."""
328 |
329 | def __init__(self, template_dir: Optional[Union[str, Path]] = None):
330 | """Initialize the prompt template renderer.
331 |
332 | Args:
333 | template_dir: Optional directory containing template files
334 | """
335 | # Set template directory
336 | if template_dir:
337 | self.template_dir = Path(template_dir)
338 | else:
339 |             # Default to the user's template directory (~/.ultimate/templates)
340 | self.template_dir = Path.home() / ".ultimate" / "templates"
341 |
342 | # Create directory if it doesn't exist
343 | self.template_dir.mkdir(parents=True, exist_ok=True)
344 |
345 | # Set up Jinja environment for file-based templates
346 | self.jinja_env = Environment(
347 | loader=FileSystemLoader(str(self.template_dir)),
348 | autoescape=select_autoescape(['html', 'xml']),
349 | trim_blocks=True,
350 | lstrip_blocks=True,
351 | )
352 |
353 | # Get prompt repository for template storage
354 | self.repository = get_prompt_repository()
355 |
356 | # Template cache
357 | self._template_cache: Dict[str, PromptTemplate] = {}
358 |
359 | async def get_template(self, template_id: str) -> Optional[PromptTemplate]:
360 | """Get a template by ID.
361 |
362 | Args:
363 | template_id: Template identifier
364 |
365 | Returns:
366 | PromptTemplate instance or None if not found
367 | """
368 | # Check cache first
369 | if template_id in self._template_cache:
370 | return self._template_cache[template_id]
371 |
372 | # Look up in repository
373 | template_data = await self.repository.get_prompt(template_id)
374 | if template_data:
375 | template = PromptTemplate.from_dict(template_data)
376 | # Cache for future use
377 | self._template_cache[template_id] = template
378 | return template
379 |
380 | # Try to load from file if not in repository
381 | template_path = self.template_dir / f"{template_id}.j2"
382 | if template_path.exists():
383 | # Load template from file
384 | with open(template_path, "r", encoding="utf-8") as f:
385 | template_content = f.read()
386 |
387 | # Create template instance
388 | template = PromptTemplate(
389 | template=template_content,
390 | template_id=template_id,
391 | format=TemplateFormat.JINJA,
392 | )
393 |
394 | # Cache for future use
395 | self._template_cache[template_id] = template
396 | return template
397 |
398 | return None
399 |
400 | async def render_template(
401 | self,
402 | template_id: str,
403 | variables: Dict[str, Any],
404 | provider: Optional[str] = None
405 | ) -> str:
406 | """Render a template with variables.
407 |
408 | Args:
409 | template_id: Template identifier
410 | variables: Variables to render the template with
411 | provider: Optional provider name for provider-specific adjustments
412 |
413 | Returns:
414 | Rendered template string
415 |
416 | Raises:
417 | ValueError: If template not found or rendering fails
418 | """
419 | # Get the template
420 | template = await self.get_template(template_id)
421 | if not template:
422 | raise ValueError(f"Template not found: {template_id}")
423 |
424 | # Check if all required variables are provided
425 | is_valid, missing_vars = template.validate_variables(variables)
426 | if not is_valid:
427 | raise ValueError(
428 | f"Missing required variables for template {template_id}: {', '.join(missing_vars)}"
429 | )
430 |
431 | # Render the template
432 | rendered = template.render(variables)
433 |
434 | # Apply provider-specific adjustments if provided
435 | if provider:
436 | rendered = self._apply_provider_adjustments(rendered, provider, template)
437 |
438 | return rendered
439 |
440 | def _apply_provider_adjustments(
441 | self,
442 | rendered: str,
443 | provider: str,
444 | template: PromptTemplate
445 | ) -> str:
446 | """Apply provider-specific adjustments to rendered template.
447 |
448 | Args:
449 | rendered: Rendered template string
450 | provider: Provider name
451 | template: Template being rendered
452 |
453 | Returns:
454 | Adjusted template string
455 | """
456 | # Apply provider-specific transformations
457 | if provider == Provider.ANTHROPIC.value:
458 | # Anthropic-specific adjustments
459 | if template.type == TemplateType.SYSTEM:
460 | # Ensure no trailing newlines for system prompts
461 | rendered = rendered.rstrip()
462 | elif provider == Provider.OPENAI.value:
463 | # OpenAI-specific adjustments
464 | pass
465 | elif provider == Provider.GEMINI.value:
466 | # Gemini-specific adjustments
467 | pass
468 |
469 | return rendered
470 |
471 | async def save_template(self, template: PromptTemplate) -> bool:
472 | """Save a template to the repository.
473 |
474 | Args:
475 | template: Template to save
476 |
477 | Returns:
478 | True if successful
479 | """
480 | # Update cache
481 | self._template_cache[template.template_id] = template
482 |
483 | # Save to repository
484 | return await self.repository.save_prompt(
485 | prompt_id=template.template_id,
486 | prompt_data=template.to_dict()
487 | )
488 |
489 | async def delete_template(self, template_id: str) -> bool:
490 | """Delete a template from the repository.
491 |
492 | Args:
493 | template_id: Template identifier
494 |
495 | Returns:
496 | True if successful
497 | """
498 | # Remove from cache
499 | if template_id in self._template_cache:
500 | del self._template_cache[template_id]
501 |
502 | # Delete from repository
503 | return await self.repository.delete_prompt(template_id)
504 |
505 | async def list_templates(self) -> List[str]:
506 | """List available templates.
507 |
508 | Returns:
509 | List of template IDs
510 | """
511 | # Get templates from repository
512 | return await self.repository.list_prompts()
513 |
514 | def clear_cache(self) -> None:
515 | """Clear the template cache."""
516 | self._template_cache.clear()
517 |
518 |
519 | # Global template renderer instance
520 | _template_renderer: Optional[PromptTemplateRenderer] = None
521 |
522 |
523 | def get_template_renderer() -> PromptTemplateRenderer:
524 | """Get the global template renderer instance.
525 |
526 | Returns:
527 | PromptTemplateRenderer instance
528 | """
529 | global _template_renderer
530 | if _template_renderer is None:
531 | _template_renderer = PromptTemplateRenderer()
532 | return _template_renderer
533 |
534 |
535 | @lru_cache(maxsize=32)
536 | def get_template_path(template_id: str) -> Optional[Path]:
537 | """Get the path to a template file.
538 |
539 | Args:
540 | template_id: Template identifier
541 |
542 | Returns:
543 | Path to template file or None if not found
544 | """
545 | # Try standard locations
546 | template_dirs = [
547 | # First check the user's template directory
548 | Path.home() / ".ultimate" / "templates",
549 | # Then check the package's template directory
550 | Path(__file__).parent.parent.parent.parent / "templates",
551 | ]
552 |
553 | for template_dir in template_dirs:
554 | # Check for .j2 extension first
555 | template_path = template_dir / f"{template_id}.j2"
556 | if template_path.exists():
557 | return template_path
558 |
559 | # Check for .tpl extension
560 | template_path = template_dir / f"{template_id}.tpl"
561 | if template_path.exists():
562 | return template_path
563 |
564 | # Check for .md extension
565 | template_path = template_dir / f"{template_id}.md"
566 | if template_path.exists():
567 | return template_path
568 |
569 | # Check for .json extension
570 | template_path = template_dir / f"{template_id}.json"
571 | if template_path.exists():
572 | return template_path
573 |
574 | return None
575 |
576 |
577 | async def render_prompt_template(
578 | template_id: str,
579 | variables: Dict[str, Any],
580 | provider: Optional[str] = None
581 | ) -> str:
582 | """Render a prompt template.
583 |
584 | Args:
585 | template_id: Template identifier
586 | variables: Variables to render the template with
587 | provider: Optional provider name for provider-specific adjustments
588 |
589 | Returns:
590 | Rendered template string
591 |
592 | Raises:
593 | ValueError: If template not found or rendering fails
594 | """
595 | renderer = get_template_renderer()
596 | return await renderer.render_template(template_id, variables, provider)
597 |
598 |
599 | async def render_prompt(
600 | template_content: str,
601 | variables: Dict[str, Any],
602 | format: Union[str, TemplateFormat] = TemplateFormat.JINJA,
603 | ) -> str:
604 | """Render a prompt from template content.
605 |
606 | Args:
607 | template_content: Template content string
608 | variables: Variables to render the template with
609 | format: Template format
610 |
611 | Returns:
612 | Rendered template string
613 |
614 | Raises:
615 | ValueError: If rendering fails
616 | """
617 | # Create a temporary template
618 | template = PromptTemplate(
619 | template=template_content,
620 | template_id="_temp_template",
621 | format=format,
622 | )
623 |
624 | # Render and return
625 | return template.render(variables)
626 |
627 |
628 | async def get_template_defaults(
629 | template_id: str,
630 | provider: str
631 | ) -> Dict[str, Any]:
632 | """Get provider-specific default parameters for a template.
633 |
634 | Args:
635 | template_id: Template identifier
636 | provider: Provider name
637 |
638 | Returns:
639 | Dictionary of default parameters
640 |
641 | Raises:
642 | ValueError: If template not found
643 | """
644 | renderer = get_template_renderer()
645 | template = await renderer.get_template(template_id)
646 | if not template:
647 | raise ValueError(f"Template not found: {template_id}")
648 |
649 | return template.get_provider_defaults(provider)
```
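
A minimal usage sketch of the template utilities above. The import path is an assumption (the module's own path is not shown on this page), and the template strings, IDs, and variables are illustrative only: `render_prompt` renders one-off content without touching the repository, while a `PromptTemplate` instance can validate its required variables before rendering.

```python
# Hedged sketch: the import path below is assumed; adjust it to wherever this
# module lives in your installation of ultimate_mcp_server.
import asyncio

from ultimate_mcp_server.services.prompts.templates import (  # assumed path
    PromptTemplate,
    TemplateFormat,
    render_prompt,
)


async def main() -> None:
    # One-off rendering from raw template content (no repository lookup involved).
    text = await render_prompt(
        template_content="Summarize the following text:\n\n{{ document }}",
        variables={"document": "LLM gateways route requests across providers."},
        format=TemplateFormat.JINJA,
    )
    print(text)

    # Reusable template with simple {var} substitution and up-front validation.
    template = PromptTemplate(
        template="Classify the sentiment of: {text}",
        template_id="sentiment_simple",
        format=TemplateFormat.SIMPLE,
        required_vars=["text"],
    )
    is_valid, missing = template.validate_variables({"text": "Great product!"})
    if is_valid:
        print(template.render({"text": "Great product!"}))
    else:
        print(f"Missing variables: {missing}")


asyncio.run(main())
```
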
--------------------------------------------------------------------------------
/ultimate_mcp_server/core/providers/openrouter.py:
--------------------------------------------------------------------------------
```python
1 | # ultimate_mcp_server/core/providers/openrouter.py
2 | """OpenRouter provider implementation."""
3 | import os
4 | import time
5 | from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple, Union
6 |
7 | from openai import AsyncOpenAI
8 |
9 | from ultimate_mcp_server.config import get_config
10 | from ultimate_mcp_server.constants import DEFAULT_MODELS, Provider
11 | from ultimate_mcp_server.core.providers.base import BaseProvider, ModelResponse
12 | from ultimate_mcp_server.utils import get_logger
13 |
14 | # Use the same naming scheme everywhere: logger at module level
15 | logger = get_logger("ultimate_mcp_server.providers.openrouter")
16 |
17 | # Default OpenRouter Base URL (can be overridden by config)
18 | DEFAULT_OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1"
19 |
20 | class OpenRouterProvider(BaseProvider):
21 | """Provider implementation for OpenRouter API (using OpenAI-compatible interface)."""
22 |
23 | provider_name = Provider.OPENROUTER.value
24 |
25 | def __init__(self, **kwargs):
26 | """Initialize the OpenRouter provider.
27 |
28 | Args:
29 | **kwargs: Additional options:
30 | - base_url (str): Override the default OpenRouter API base URL.
31 | - http_referer (str): Optional HTTP-Referer header.
32 | - x_title (str): Optional X-Title header.
33 | """
34 | config = get_config().providers.openrouter
35 | super().__init__(**kwargs)
36 | self.name = "openrouter"
37 |
38 | # Use config default first, then fallback to constants
39 | self.default_model = config.default_model or DEFAULT_MODELS.get(Provider.OPENROUTER)
40 | if not config.default_model:
41 | logger.debug(f"No default model set in config for OpenRouter, using fallback from constants: {self.default_model}")
42 |
43 | # Get base_url from config, fallback to kwargs, then constant
44 | self.base_url = config.base_url or kwargs.get("base_url", DEFAULT_OPENROUTER_BASE_URL)
45 |
46 | # Get additional headers from config's additional_params
47 | self.http_referer = config.additional_params.get("http_referer") or kwargs.get("http_referer")
48 | self.x_title = config.additional_params.get("x_title") or kwargs.get("x_title")
49 |
50 | # We'll create the client in initialize() instead
51 | self.client = None
52 | self.available_models = []
53 |
54 | async def initialize(self) -> bool:
55 | """Initialize the OpenRouter client.
56 |
57 | Returns:
58 | bool: True if initialization was successful
59 | """
60 | try:
61 | # Create headers dictionary
62 | headers = {}
63 | if self.http_referer:
64 | headers["HTTP-Referer"] = self.http_referer
65 | if self.x_title:
66 | headers["X-Title"] = self.x_title
67 |
68 | # Get timeout from config
69 | config = get_config().providers.openrouter
70 | timeout = config.timeout or 30.0 # Default timeout 30s
71 |
72 | # Check if API key is available
73 | if not self.api_key:
74 | logger.warning(f"{self.name} API key not found in configuration. Provider will be unavailable.")
75 | return False
76 |
77 | # Create the client
78 | self.client = AsyncOpenAI(
79 | base_url=self.base_url,
80 | api_key=self.api_key,
81 | default_headers=headers,
82 | timeout=timeout
83 | )
84 |
85 | # Pre-fetch available models
86 | try:
87 | self.available_models = await self.list_models()
88 | logger.info(f"Loaded {len(self.available_models)} models from OpenRouter")
89 | except Exception as model_err:
90 | logger.warning(f"Failed to fetch models from OpenRouter: {str(model_err)}")
91 | # Use hardcoded fallback models
92 | self.available_models = self._get_fallback_models()
93 |
94 | logger.success(
95 | "OpenRouter provider initialized successfully",
96 | emoji_key="provider"
97 | )
98 | return True
99 |
100 | except Exception as e:
101 | logger.error(
102 | f"Failed to initialize OpenRouter provider: {str(e)}",
103 | emoji_key="error"
104 | )
105 | return False
106 |
107 | def _initialize_client(self, **kwargs):
108 | """Initialize the OpenAI async client with OpenRouter specifics."""
109 | # This method is now deprecated - use initialize() instead
110 | logger.warning("_initialize_client() is deprecated, use initialize() instead")
111 | return False
112 |
113 | async def generate_completion(
114 | self,
115 | prompt: str,
116 | model: Optional[str] = None,
117 | max_tokens: Optional[int] = None,
118 | temperature: float = 0.7,
119 | **kwargs
120 | ) -> ModelResponse:
121 | """Generate a completion using OpenRouter.
122 |
123 | Args:
124 | prompt: Text prompt to send to the model
125 | model: Model name (e.g., "openai/gpt-4.1-mini", "google/gemini-flash-1.5")
126 | max_tokens: Maximum tokens to generate
127 | temperature: Temperature parameter (0.0-1.0)
128 | **kwargs: Additional model-specific parameters, including:
129 | - extra_headers (Dict): Additional headers for this specific call.
130 | - extra_body (Dict): OpenRouter-specific arguments.
131 |
132 | Returns:
133 | ModelResponse with completion result
134 |
135 | Raises:
136 | Exception: If API call fails
137 | """
138 | if not self.client:
139 |             initialized = await self.initialize()
140 | if not initialized:
141 | raise RuntimeError(f"{self.provider_name} provider not initialized.")
142 |
143 | # Use default model if not specified
144 | model = model or self.default_model
145 |
146 | # Ensure we have a model name before proceeding
147 | if model is None:
148 | logger.error("Completion failed: No model specified and no default model configured for OpenRouter.")
149 | raise ValueError("No model specified and no default model configured for OpenRouter.")
150 |
151 | # Strip provider prefix only if it matches OUR provider name
152 | if model.startswith(f"{self.provider_name}:"):
153 | model = model.split(":", 1)[1]
154 | logger.debug(f"Stripped provider prefix from model name: {model}")
155 | # Note: Keep prefixes like 'openai/' or 'google/' as OpenRouter uses them.
156 | # DO NOT strip other provider prefixes as they're needed for OpenRouter routing
157 |
158 | # Create messages
159 | messages = kwargs.pop("messages", None) or [{"role": "user", "content": prompt}]
160 |
161 | # Prepare API call parameters
162 | params = {
163 | "model": model,
164 | "messages": messages,
165 | "temperature": temperature,
166 | }
167 |
168 | if max_tokens is not None:
169 | params["max_tokens"] = max_tokens
170 |
171 | # Extract OpenRouter specific args from kwargs
172 | extra_headers = kwargs.pop("extra_headers", {})
173 | extra_body = kwargs.pop("extra_body", {})
174 |
175 | json_mode = kwargs.pop("json_mode", False)
176 | if json_mode:
177 | # OpenRouter uses OpenAI-compatible API
178 | params["response_format"] = {"type": "json_object"}
179 | self.logger.debug("Setting response_format to JSON mode for OpenRouter")
180 |
181 | # Add any remaining kwargs to the main params (standard OpenAI args)
182 | params.update(kwargs)
183 |
184 | self.logger.info(
185 | f"Generating completion with {self.provider_name} model {model}",
186 | emoji_key=self.provider_name,
187 | prompt_length=len(prompt),
188 | json_mode_requested=json_mode
189 | )
190 |
191 | try:
192 | # Make API call with timing
193 | response, processing_time = await self.process_with_timer(
194 | self.client.chat.completions.create, **params, extra_headers=extra_headers, extra_body=extra_body
195 | )
196 |
197 | # Extract response text
198 | completion_text = response.choices[0].message.content
199 |
200 | # Create standardized response
201 | result = ModelResponse(
202 | text=completion_text,
203 | model=response.model, # Use model returned by API
204 | provider=self.provider_name,
205 | input_tokens=response.usage.prompt_tokens,
206 | output_tokens=response.usage.completion_tokens,
207 | total_tokens=response.usage.total_tokens,
208 | processing_time=processing_time,
209 | raw_response=response,
210 | )
211 |
212 | self.logger.success(
213 | f"{self.provider_name} completion successful",
214 | emoji_key="success",
215 | model=result.model,
216 | tokens={
217 | "input": result.input_tokens,
218 | "output": result.output_tokens
219 | },
220 | cost=result.cost, # Will be calculated by ModelResponse
221 | time=result.processing_time
222 | )
223 |
224 | return result
225 |
226 | except Exception as e:
227 | self.logger.error(
228 | f"{self.provider_name} completion failed for model {model}: {str(e)}",
229 | emoji_key="error",
230 | model=model
231 | )
232 | raise
233 |
234 | async def generate_completion_stream(
235 | self,
236 | prompt: str,
237 | model: Optional[str] = None,
238 | max_tokens: Optional[int] = None,
239 | temperature: float = 0.7,
240 | **kwargs
241 | ) -> AsyncGenerator[Tuple[str, Dict[str, Any]], None]:
242 | """Generate a streaming completion using OpenRouter.
243 |
244 | Args:
245 | prompt: Text prompt to send to the model
246 | model: Model name (e.g., "openai/gpt-4.1-mini")
247 | max_tokens: Maximum tokens to generate
248 | temperature: Temperature parameter (0.0-1.0)
249 | **kwargs: Additional model-specific parameters, including:
250 | - extra_headers (Dict): Additional headers for this specific call.
251 | - extra_body (Dict): OpenRouter-specific arguments.
252 |
253 | Yields:
254 | Tuple of (text_chunk, metadata)
255 |
256 | Raises:
257 | Exception: If API call fails
258 | """
259 | if not self.client:
260 |             initialized = await self.initialize()
261 | if not initialized:
262 | raise RuntimeError(f"{self.provider_name} provider not initialized.")
263 |
264 | model = model or self.default_model
265 | if model.startswith(f"{self.provider_name}:"):
266 | model = model.split(":", 1)[1]
267 | # DO NOT strip other provider prefixes as they're needed for OpenRouter routing
268 |
269 | messages = kwargs.pop("messages", None) or [{"role": "user", "content": prompt}]
270 |
271 | params = {
272 | "model": model,
273 | "messages": messages,
274 | "temperature": temperature,
275 | "stream": True,
276 | }
277 | if max_tokens is not None:
278 | params["max_tokens"] = max_tokens
279 |
280 | extra_headers = kwargs.pop("extra_headers", {})
281 | extra_body = kwargs.pop("extra_body", {})
282 |
283 | json_mode = kwargs.pop("json_mode", False)
284 | if json_mode:
285 | # OpenRouter uses OpenAI-compatible API
286 | params["response_format"] = {"type": "json_object"}
287 | self.logger.debug("Setting response_format to JSON mode for OpenRouter streaming")
288 |
289 | params.update(kwargs)
290 |
291 | self.logger.info(
292 | f"Generating streaming completion with {self.provider_name} model {model}",
293 | emoji_key=self.provider_name,
294 | prompt_length=len(prompt),
295 | json_mode_requested=json_mode
296 | )
297 |
298 | start_time = time.time()
299 | total_chunks = 0
300 | final_model_name = model # Store initially requested model
301 |
302 | try:
303 | stream = await self.client.chat.completions.create(**params, extra_headers=extra_headers, extra_body=extra_body)
304 |
305 | async for chunk in stream:
306 | total_chunks += 1
307 | delta = chunk.choices[0].delta
308 | content = delta.content or ""
309 |
310 | # Try to get model name from the chunk if available (some providers include it)
311 | if chunk.model:
312 | final_model_name = chunk.model
313 |
314 | metadata = {
315 | "model": final_model_name,
316 | "provider": self.provider_name,
317 | "chunk_index": total_chunks,
318 | "finish_reason": chunk.choices[0].finish_reason,
319 | }
320 |
321 | yield content, metadata
322 |
323 | processing_time = time.time() - start_time
324 | self.logger.success(
325 | f"{self.provider_name} streaming completion successful",
326 | emoji_key="success",
327 | model=final_model_name,
328 | chunks=total_chunks,
329 | time=processing_time
330 | )
331 |
332 | except Exception as e:
333 | self.logger.error(
334 | f"{self.provider_name} streaming completion failed for model {model}: {str(e)}",
335 | emoji_key="error",
336 | model=model
337 | )
338 | raise
339 |
340 | async def list_models(self) -> List[Dict[str, Any]]:
341 | """List available OpenRouter models (provides examples, not exhaustive).
342 |
343 | OpenRouter offers a vast number of models. This list provides common examples.
344 | Refer to OpenRouter documentation for the full list.
345 |
346 | Returns:
347 | List of example model information dictionaries
348 | """
349 | if self.available_models:
350 | return self.available_models
351 | models = self._get_fallback_models()
352 | return models
353 |
354 | def get_default_model(self) -> str:
355 | """Get the default OpenRouter model.
356 |
357 | Returns:
358 | Default model name (e.g., "openai/gpt-4.1-mini")
359 | """
360 | # Allow override via environment variable
361 | default_model_env = os.environ.get("OPENROUTER_DEFAULT_MODEL")
362 | if default_model_env:
363 | return default_model_env
364 |
365 | # Fallback to constants
366 | return DEFAULT_MODELS.get(self.provider_name, "openai/gpt-4.1-mini")
367 |
368 | async def check_api_key(self) -> bool:
369 | """Check if the OpenRouter API key is valid by attempting a small request."""
370 | if not self.client:
371 | # Try to initialize if not already done
372 |             if not await self.initialize():
373 | return False # Initialization failed
374 |
375 | try:
376 |             # Validate the key with a minimal, low-cost request: a one-token completion
377 |             # against the default model (an auth error here indicates an invalid key/URL).
378 | await self.client.chat.completions.create(
379 | model=self.get_default_model(),
380 | messages=[{"role": "user", "content": "test"}],
381 | max_tokens=1,
382 | temperature=0
383 | )
384 | return True
385 | except Exception as e:
386 | logger.warning(f"API key check failed for {self.provider_name}: {str(e)}", emoji_key="warning")
387 | return False
388 |
389 | def get_available_models(self) -> List[str]:
390 | """Return a list of available model names."""
391 | return [model["id"] for model in self.available_models]
392 |
393 | def is_model_available(self, model_name: str) -> bool:
394 | """Check if a specific model is available."""
395 | available_model_ids = [model["id"] for model in self.available_models]
396 | return model_name in available_model_ids
397 |
398 | async def create_completion(self, model: str, messages: List[Dict[str, str]], stream: bool = False, **kwargs) -> Union[str, AsyncGenerator[str, None]]:
399 | """Create a completion using the specified model."""
400 | if not self.client:
401 | raise RuntimeError("OpenRouter client not initialized (likely missing API key).")
402 |
403 | # Check if model is available
404 | if not self.is_model_available(model):
405 |             # The requested model is not in the fetched list; prefer falling back to
406 |             # the configured default model rather than failing outright.
407 | if self.default_model and self.is_model_available(self.default_model):
408 | logger.warning(f"Model '{model}' not found in available list. Falling back to default '{self.default_model}'.")
409 | model = self.default_model
410 | else:
411 | # If even the default isn't available or set, raise error
412 | raise ValueError(f"Model '{model}' is not available via OpenRouter according to fetched list, and no valid default model is set.")
413 |
414 | merged_kwargs = {**kwargs}
415 | # OpenRouter uses standard OpenAI params like max_tokens, temperature, etc.
416 | # Ensure essential params are passed
417 | if 'max_tokens' not in merged_kwargs:
418 | merged_kwargs['max_tokens'] = get_config().providers.openrouter.max_tokens or 1024 # Use config or default
419 |
420 | if stream:
421 | logger.debug(f"Creating stream completion: Model={model}, Params={merged_kwargs}")
422 | return self._stream_completion_generator(model, messages, **merged_kwargs)
423 | else:
424 | logger.debug(f"Creating completion: Model={model}, Params={merged_kwargs}")
425 | try:
426 | response = await self.client.chat.completions.create(
427 | model=model,
428 | messages=messages,
429 | stream=False,
430 | **merged_kwargs
431 | )
432 | # Extract content based on OpenAI library version
433 | if hasattr(response, 'choices') and response.choices:
434 | choice = response.choices[0]
435 | if hasattr(choice, 'message') and hasattr(choice.message, 'content'):
436 | return choice.message.content or "" # Return empty string if content is None
437 | elif hasattr(choice, 'delta') and hasattr(choice.delta, 'content'): # Should not happen for stream=False but check
438 | return choice.delta.content or ""
439 | logger.warning("Could not extract content from OpenRouter response.")
440 | return "" # Return empty string if no content found
441 | except Exception as e:
442 | logger.error(f"OpenRouter completion failed: {e}", exc_info=True)
443 | raise RuntimeError(f"OpenRouter API call failed: {e}") from e
444 |
445 | async def _stream_completion_generator(self, model: str, messages: List[Dict[str, str]], **kwargs) -> AsyncGenerator[str, None]:
446 | """Async generator for streaming completions."""
447 | if not self.client:
448 | raise RuntimeError("OpenRouter client not initialized (likely missing API key).")
449 | try:
450 | stream = await self.client.chat.completions.create(
451 | model=model,
452 | messages=messages,
453 | stream=True,
454 | **kwargs
455 | )
456 | async for chunk in stream:
457 | # Extract content based on OpenAI library version
458 | content = ""
459 | if hasattr(chunk, 'choices') and chunk.choices:
460 | choice = chunk.choices[0]
461 | if hasattr(choice, 'delta') and hasattr(choice.delta, 'content'):
462 | content = choice.delta.content
463 | elif hasattr(choice, 'message') and hasattr(choice.message, 'content'): # Should not happen for stream=True
464 | content = choice.message.content
465 |
466 | if content:
467 | yield content
468 | except Exception as e:
469 | logger.error(f"OpenRouter stream completion failed: {e}", exc_info=True)
470 | # Depending on desired behavior, either raise or yield an error message
471 | # yield f"Error during stream: {e}"
472 | raise RuntimeError(f"OpenRouter API stream failed: {e}") from e
473 |
474 | # --- Cost Calculation (Needs OpenRouter Specific Data) ---
475 | def get_cost(self, model: str, prompt_tokens: int, completion_tokens: int) -> Optional[float]:
476 | """Calculate the cost of a request based on OpenRouter pricing.
477 |
478 | Note: Requires loading detailed model pricing info, which is not
479 | done by default in fetch_available_models.
480 | This is a placeholder and needs enhancement.
481 | """
482 | # Placeholder: Need to fetch and store detailed pricing from OpenRouter
483 | # Example structure (needs actual data):
484 | openrouter_pricing = {
485 | # "model_id": {"prompt_cost_per_mtok": X, "completion_cost_per_mtok": Y},
486 | "openai/gpt-4o": {"prompt_cost_per_mtok": 5.0, "completion_cost_per_mtok": 15.0},
487 | "google/gemini-pro-1.5": {"prompt_cost_per_mtok": 3.5, "completion_cost_per_mtok": 10.5},
488 | "anthropic/claude-3-opus": {"prompt_cost_per_mtok": 15.0, "completion_cost_per_mtok": 75.0},
489 | # ... add more model costs from openrouter.ai/docs#models ...
490 | }
491 |
492 | model_cost = openrouter_pricing.get(model)
493 | if model_cost:
494 | prompt_cost = (prompt_tokens / 1_000_000) * model_cost.get("prompt_cost_per_mtok", 0)
495 | completion_cost = (completion_tokens / 1_000_000) * model_cost.get("completion_cost_per_mtok", 0)
496 | return prompt_cost + completion_cost
497 | else:
498 | logger.warning(f"Cost calculation not available for OpenRouter model: {model}")
499 | # Return None if cost cannot be calculated
500 | return None
501 |
502 | # --- Prompt Formatting --- #
503 | def format_prompt(self, messages: List[Dict[str, str]]) -> Any:
504 | """Use standard list of dictionaries format for OpenRouter (like OpenAI)."""
505 | # OpenRouter generally uses the same format as OpenAI
506 | return messages
507 |
508 | def _get_fallback_models(self) -> List[Dict[str, Any]]:
509 | """Return a list of fallback models when API is not accessible."""
510 | return [
511 | {
512 | "id": "mistralai/mistral-large",
513 | "provider": self.provider_name,
514 | "description": "Mistral: Strong open-weight model.",
515 | },
516 | {
517 | "id": "mistralai/mistral-nemo",
518 | "provider": self.provider_name,
519 | "description": "Mistral: Strong open-weight model.",
520 | },
521 | {
522 | "id": "meta-llama/llama-3-70b-instruct",
523 | "provider": self.provider_name,
524 | "description": "Meta: Powerful open-source instruction-tuned model.",
525 | },
526 | ]
527 |
528 | # Make available via discovery
529 | __all__ = ["OpenRouterProvider"]
```
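
A hedged sketch of driving `OpenRouterProvider` directly, outside the gateway's provider registry. It assumes the gateway configuration has been loaded and an OpenRouter API key is available to the base provider (for example via `OPENROUTER_API_KEY`); the model id is illustrative and keeps its `openai/` prefix because OpenRouter uses such prefixes for routing.

```python
# Hedged sketch: assumes gateway config (and the OpenRouter API key) is already
# set up; model ids are illustrative examples only.
import asyncio

from ultimate_mcp_server.core.providers.openrouter import OpenRouterProvider


async def main() -> None:
    provider = OpenRouterProvider()
    if not await provider.initialize():
        raise RuntimeError("OpenRouter provider not initialized (check the API key).")

    # Non-streaming completion: returns a ModelResponse with text and token usage.
    result = await provider.generate_completion(
        prompt="List three uses of an LLM gateway.",
        model="openai/gpt-4.1-mini",
        max_tokens=128,
        temperature=0.3,
    )
    print(result.text)

    # Streaming completion: yields (text_chunk, metadata) tuples as they arrive.
    async for chunk, _meta in provider.generate_completion_stream(
        prompt="Say hello in five languages.",
        model="openai/gpt-4.1-mini",
        max_tokens=64,
    ):
        print(chunk, end="", flush=True)
    print()


asyncio.run(main())
```
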
--------------------------------------------------------------------------------
/ultimate_mcp_server/utils/parsing.py:
--------------------------------------------------------------------------------
```python
1 | """Parsing utilities for Ultimate MCP Server.
2 |
3 | This module provides utility functions for parsing and processing
4 | results from Ultimate MCP Server operations that were previously defined in
5 | example scripts but are now part of the library.
6 | """
7 |
8 | import json
9 | import re
10 | from typing import Any, Dict
11 |
12 | from ultimate_mcp_server.utils import get_logger
13 |
14 | # Initialize logger
15 | logger = get_logger("ultimate_mcp_server.utils.parsing")
16 |
17 | def extract_json_from_markdown(text: str) -> str:
18 | """Extracts a JSON string embedded within markdown code fences.
19 |
20 | Handles various markdown code block formats and edge cases:
21 | - Complete code blocks: ```json ... ``` or ``` ... ```
22 | - Alternative fence styles: ~~~json ... ~~~
23 | - Incomplete/truncated blocks with only opening fence
24 | - Multiple code blocks (chooses the first valid JSON)
25 | - Extensive JSON repair for common LLM output issues:
26 | - Unterminated strings
27 | - Trailing commas
28 | - Missing closing brackets
29 | - Unquoted keys
30 | - Truncated content
31 |
32 | If no valid JSON-like content is found in fences, returns the original string.
33 |
34 | Args:
35 | text: The input string possibly containing markdown-fenced JSON.
36 |
37 | Returns:
38 | The extracted JSON string or the stripped original string.
39 | """
40 | if not text:
41 | return ""
42 |
43 | cleaned_text = text.strip()
44 | possible_json_candidates = []
45 |
46 | # Try to find JSON inside complete code blocks with various fence styles
47 | # Look for backtick fences (most common)
48 | backtick_matches = re.finditer(r"```(?:json)?\s*(.*?)\s*```", cleaned_text, re.DOTALL | re.IGNORECASE)
49 | for match in backtick_matches:
50 | possible_json_candidates.append(match.group(1).strip())
51 |
52 | # Look for tilde fences (less common but valid in some markdown)
53 | tilde_matches = re.finditer(r"~~~(?:json)?\s*(.*?)\s*~~~", cleaned_text, re.DOTALL | re.IGNORECASE)
54 | for match in tilde_matches:
55 | possible_json_candidates.append(match.group(1).strip())
56 |
57 | # If no complete blocks found, check for blocks with only opening fence
58 | if not possible_json_candidates:
59 | # Try backtick opening fence
60 | backtick_start = re.search(r"```(?:json)?\s*", cleaned_text, re.IGNORECASE)
61 | if backtick_start:
62 | content_after_fence = cleaned_text[backtick_start.end():].strip()
63 | possible_json_candidates.append(content_after_fence)
64 |
65 | # Try tilde opening fence
66 | tilde_start = re.search(r"~~~(?:json)?\s*", cleaned_text, re.IGNORECASE)
67 | if tilde_start:
68 | content_after_fence = cleaned_text[tilde_start.end():].strip()
69 | possible_json_candidates.append(content_after_fence)
70 |
71 | # If still no candidates, add the original text as last resort
72 | if not possible_json_candidates:
73 | possible_json_candidates.append(cleaned_text)
74 |
75 | # Try each candidate, returning the first one that looks like valid JSON
76 | for candidate in possible_json_candidates:
77 | # Apply advanced JSON repair
78 | repaired = _repair_json(candidate)
79 | try:
80 | # Validate if it's actually parseable JSON
81 | json.loads(repaired)
82 | return repaired # Return the first valid JSON
83 | except json.JSONDecodeError:
84 | # If repair didn't work, continue to the next candidate
85 | continue
86 |
87 | # If no candidate worked with regular repair, try more aggressive repair on the first candidate
88 | if possible_json_candidates:
89 | aggressive_repair = _repair_json(possible_json_candidates[0], aggressive=True)
90 | try:
91 | json.loads(aggressive_repair)
92 | return aggressive_repair
93 | except json.JSONDecodeError:
94 | # Return the best we can - the first candidate with basic cleaning
95 | # This will still fail in json.loads, but at least we tried
96 | return possible_json_candidates[0]
97 |
98 | # Absolute fallback - return the original text
99 | return cleaned_text
100 |
101 | def _repair_json(text: str, aggressive=False) -> str:
102 | """
103 | Repair common JSON formatting issues in LLM-generated output.
104 |
105 | This internal utility function applies a series of transformations to fix common
106 | JSON formatting problems that frequently occur in LLM outputs. It can operate in
107 | two modes: standard and aggressive.
108 |
109 | In standard mode (aggressive=False), it applies basic repairs like:
110 | - Removing trailing commas before closing brackets/braces
111 | - Ensuring property names are properly quoted
112 | - Basic structure validation
113 |
114 | In aggressive mode (aggressive=True), it applies more extensive repairs:
115 | - Fixing unterminated string literals by adding missing quotes
116 | - Balancing unmatched brackets and braces
117 | - Adding missing values for dangling properties
118 | - Handling truncated JSON at the end of strings
119 | - Attempting to recover partial JSON structures
120 |
121 | The aggressive repairs are particularly useful when dealing with outputs from
122 | models that have been truncated mid-generation or contain structural errors
123 | that would normally make the JSON unparseable.
124 |
125 | Args:
126 | text: The JSON-like string to repair, potentially containing formatting errors
127 | aggressive: Whether to apply more extensive repair techniques beyond basic
128 | formatting fixes. Default is False (basic repairs only).
129 |
130 | Returns:
131 | A repaired JSON string that is more likely to be parseable. Note that even
132 | with aggressive repairs, the function cannot guarantee valid JSON for
133 | severely corrupted inputs.
134 |
135 | Note:
136 | This function is intended for internal use by extract_json_from_markdown.
137 | While it attempts to fix common issues, it may not address all possible
138 | JSON formatting problems, especially in severely malformed inputs.
139 | """
140 | if not text:
141 | return text
142 |
143 | # Step 1: Basic cleanup
144 | result = text.strip()
145 |
146 | # Quick check if it even remotely looks like JSON
147 | if not (result.startswith('{') or result.startswith('[')):
148 | return result
149 |
150 | # Step 2: Fix common issues
151 |
152 | # Fix trailing commas before closing brackets
153 | result = re.sub(r',\s*([\}\]])', r'\1', result)
154 |
155 | # Ensure property names are quoted
156 | result = re.sub(r'([{,]\s*)([a-zA-Z0-9_$]+)(\s*:)', r'\1"\2"\3', result)
157 |
158 | # If we're not in aggressive mode, return after basic fixes
159 | if not aggressive:
160 | return result
161 |
162 | # Step 3: Aggressive repairs for truncated/malformed JSON
163 |
164 | # Track opening/closing brackets to detect imbalance
165 | open_braces = result.count('{')
166 | close_braces = result.count('}')
167 | open_brackets = result.count('[')
168 | close_brackets = result.count(']')
169 |
170 | # Count quotes to check if we have an odd number (indicating unterminated strings)
171 | quote_count = result.count('"')
172 | if quote_count % 2 != 0:
173 | # We have an odd number of quotes, meaning at least one string is unterminated
174 | # This is a much more aggressive approach to fix strings
175 |
176 | # First, try to find all strings that are properly terminated
177 | proper_strings = []
178 | pos = 0
179 | in_string = False
180 | string_start = 0
181 |
182 | # This helps track properly formed strings and identify problematic ones
183 | while pos < len(result):
184 | if result[pos] == '"' and (pos == 0 or result[pos-1] != '\\'):
185 | if not in_string:
186 | # Start of a string
187 | in_string = True
188 | string_start = pos
189 | else:
190 | # End of a string
191 | in_string = False
192 | proper_strings.append((string_start, pos))
193 | pos += 1
194 |
195 | # If we're still in a string at the end, we found an unterminated string
196 | if in_string:
197 | # Force terminate it at the end
198 | result += '"'
199 |
200 |         # Even more aggressive string fixing: look for an opening quote whose
201 |         # content runs up to a comma, a closing brace/bracket, or the end of the
202 |         # input without a matching closing quote. That pattern indicates an
203 |         # unterminated string, so terminate it in place.
204 | result = re.sub(r'"([^"]*?)(?=,|\s*[\]}]|$)', r'"\1"', result)
205 |
206 | # Fix cases where value might be truncated mid-word just before closing quote
207 | # If we find something that looks like it's in the middle of a string, terminate it
208 | result = re.sub(r'"([^"]+)(\s*[\]}]|,|$)', lambda m:
209 | f'"{m.group(1)}"{"" if m.group(2).startswith(",") or m.group(2) in "]}," else m.group(2)}',
210 | result)
211 |
212 | # Fix dangling quotes at the end of the string - these usually indicate a truncated string
213 | if result.rstrip().endswith('"'):
214 | # Add closing quote and appropriate structure depending on context
215 | result = result.rstrip() + '"'
216 |
217 | # Look at the previous few characters to determine if we need a comma or not
218 | context = result[-20:] if len(result) > 20 else result
219 | # If string ends with x": " it's likely a property name
220 | if re.search(r'"\s*:\s*"$', context):
221 | # Add a placeholder value and closing structure for the property
222 | result += "unknown"
223 |
224 | # Check for dangling property (property name with colon but no value)
225 | result = re.sub(r'"([^"]+)"\s*:(?!\s*["{[\w-])', r'"\1": null', result)
226 |
227 | # Add missing closing brackets/braces if needed
228 | if open_braces > close_braces:
229 | result += '}' * (open_braces - close_braces)
230 | if open_brackets > close_brackets:
231 | result += ']' * (open_brackets - close_brackets)
232 |
233 | # Handle truncated JSON structure - look for incomplete objects at the end
234 | # This is complex, but we'll try some common patterns
235 |
236 | # If JSON ends with a property name and colon but no value
237 | if re.search(r'"[^"]+"\s*:\s*$', result):
238 | result += 'null'
239 |
240 | # If JSON ends with a comma, it needs another value - add a null
241 | if re.search(r',\s*$', result):
242 | result += 'null'
243 |
244 | # If the JSON structure is fundamentally corrupted at the end (common in truncation)
245 | # Close any unclosed objects or arrays
246 | if not (result.endswith('}') or result.endswith(']') or result.endswith('"')):
247 | # Count unmatched opening brackets
248 | stack = []
249 | for char in result:
250 | if char in '{[':
251 | stack.append(char)
252 | elif char in '}]':
253 | if stack and ((stack[-1] == '{' and char == '}') or (stack[-1] == '[' and char == ']')):
254 | stack.pop()
255 |
256 | # Close any unclosed structures
257 | for bracket in reversed(stack):
258 | if bracket == '{':
259 | result += '}'
260 | elif bracket == '[':
261 | result += ']'
262 |
263 | # As a final safeguard, try to eval the JSON with a permissive parser
264 | # This won't fix deep structural issues but catches cases our regexes missed
265 | try:
266 | import simplejson
267 | simplejson.loads(result, parse_constant=lambda x: x)
268 |     except (ImportError, ValueError):  # ValueError covers simplejson.JSONDecodeError; simplejson may be unbound if the import failed
269 | try:
270 | # Try one last time with the more permissive custom JSON parser
271 | _scan_once = json.scanner.py_make_scanner(json.JSONDecoder())
272 | try:
273 | _scan_once(result, 0)
274 | except StopIteration:
275 | # Likely unterminated JSON - do one final pass of common fixups
276 |
277 | # Check for unterminated strings of various forms one more time
278 | if re.search(r'(?<!")"(?:[^"\\]|\\.)*[^"\\](?!")(?=,|\s*[\]}]|$)', result):
279 | # Even more aggressive fixes, replacing with generic values
280 | result = re.sub(r'(?<!")"(?:[^"\\]|\\.)*[^"\\](?!")(?=,|\s*[\]}]|$)',
281 | r'"invalid_string"', result)
282 |
283 | # Ensure valid JSON-like structure
284 | if not (result.endswith('}') or result.endswith(']')):
285 | if result.count('{') > result.count('}'):
286 | result += '}'
287 | if result.count('[') > result.count(']'):
288 | result += ']'
289 | except Exception:
290 | # Something else is wrong, but we've tried our best
291 | pass
292 | except Exception:
293 | # We've done all we reasonably can
294 | pass
295 |
296 | return result
297 |
298 | async def parse_result(result: Any) -> Dict[str, Any]:
299 | """Parse the result from a tool call into a usable dictionary.
300 |
301 | Handles various return types from MCP tools, including TextContent objects,
302 | list results, and direct dictionaries. Attempts to extract JSON from
303 | markdown code fences if present.
304 |
305 | Args:
306 | result: Result from an MCP tool call or provider operation
307 |
308 | Returns:
309 | Parsed dictionary containing the result data
310 | """
311 | try:
312 | text_to_parse = None
313 | # Handle TextContent object (which has a .text attribute)
314 | if hasattr(result, 'text'):
315 | text_to_parse = result.text
316 |
317 | # Handle list result
318 | elif isinstance(result, list):
319 | if result:
320 | first_item = result[0]
321 | if hasattr(first_item, 'text'):
322 | text_to_parse = first_item.text
323 | elif isinstance(first_item, dict):
324 | # NEW: Check if it's an MCP-style text content dict
325 | if first_item.get("type") == "text" and "text" in first_item:
326 | text_to_parse = first_item["text"]
327 | else:
328 | # It's some other dictionary, return it directly
329 | return first_item
330 | elif isinstance(first_item, str):
331 | text_to_parse = first_item
332 | else:
333 | logger.warning(f"List item type not directly parseable: {type(first_item)}")
334 | return {"error": f"List item type not directly parseable: {type(first_item)}", "original_result_type": str(type(result))}
335 | else: # Empty list
336 | return {} # Or perhaps an error/warning? For now, empty dict.
337 |
338 | # Handle dictionary directly
339 | elif isinstance(result, dict):
340 | return result
341 |
342 | # Handle string directly
343 | elif isinstance(result, str):
344 | text_to_parse = result
345 |
346 | # If text_to_parse is still None or is empty/whitespace after potential assignments
347 | if text_to_parse is None or not text_to_parse.strip():
348 | logger.warning(f"No parsable text content found in result (type: {type(result)}, content preview: \'{str(text_to_parse)[:100]}...\').")
349 | return {"error": "No parsable text content found in result", "result_type": str(type(result)), "content_preview": str(text_to_parse)[:100] if text_to_parse else None}
350 |
351 | # At this point, text_to_parse should be a non-empty string.
352 | # Attempt to extract JSON from markdown (if any)
353 | # If no markdown, or extraction fails, json_to_parse will be text_to_parse itself.
354 | json_to_parse = extract_json_from_markdown(text_to_parse)
355 |
356 | if not json_to_parse.strip(): # If extraction resulted in an empty string (e.g. from "``` ```")
357 | logger.warning(f"JSON extraction from text_to_parse yielded an empty string. Original text_to_parse: \'{text_to_parse[:200]}...\'")
358 | # Fallback to trying the original text_to_parse if extraction gave nothing useful
359 | # This covers cases where text_to_parse might be pure JSON without fences.
360 | if text_to_parse.strip(): # Ensure original text_to_parse wasn't also empty
361 | json_to_parse = text_to_parse
362 | else: # Should have been caught by the earlier check, but as a safeguard:
363 | return {"error": "Content became empty after attempting JSON extraction", "original_text_to_parse": text_to_parse}
364 |
365 |
366 | # Now, json_to_parse should be the best candidate string for JSON parsing.
367 | # Only attempt to parse if it's not empty/whitespace.
368 | if not json_to_parse.strip():
369 | logger.warning(f"Final string to parse is empty. Original text_to_parse: \'{text_to_parse[:200]}...\'")
370 | return {"error": "Final string for JSON parsing is empty", "original_text_to_parse": text_to_parse}
371 |
372 | try:
373 | return json.loads(json_to_parse)
374 | except json.JSONDecodeError as e:
375 | problematic_text_for_repair = json_to_parse # This is the string that failed json.loads
376 | logger.warning(f"Initial JSON parsing failed for: '{problematic_text_for_repair[:200]}...' Error: {e}. Attempting LLM repair...", emoji_key="warning")
377 | try:
378 | from ultimate_mcp_server.tools.completion import generate_completion
379 |
380 | system_message_content = "You are an expert JSON repair assistant. Your goal is to return only valid JSON."
381 | # Prepend system instruction to the main prompt for completion models
382 | # (as generate_completion with openai provider doesn't natively use a separate system_prompt field in its current design)
383 | user_repair_request = (
384 | f"The following text is supposed to be valid JSON but failed parsing. "
385 | f"Please correct it and return *only* the raw, valid JSON string. "
386 | f"Do not include any explanations or markdown formatting. "
387 | f"If it's impossible to extract or repair to valid JSON, return an empty JSON object {{}}. "
388 | f"Problematic text:\n\n```text\n{problematic_text_for_repair}\n```"
389 | )
390 | combined_prompt = f"{system_message_content}\n\n{user_repair_request}"
391 |
392 | llm_repair_result = await generate_completion(
393 | prompt=combined_prompt, # Use the combined prompt
394 | provider="openai",
395 | model="gpt-4.1-mini",
396 | temperature=0.0,
397 | additional_params={} # Remove system_prompt from here
398 | )
399 |
400 | text_from_llm_repair = llm_repair_result.get("text", "")
401 | if not text_from_llm_repair.strip():
402 | logger.error("LLM repair attempt returned empty string.")
403 | return {"error": "LLM repair returned empty string", "original_text": problematic_text_for_repair}
404 |
405 | # Re-extract from LLM response, as it might add fences
406 | final_repaired_json_str = extract_json_from_markdown(text_from_llm_repair)
407 |
408 | if not final_repaired_json_str.strip():
409 | logger.error(f"LLM repair extracted an empty JSON string from LLM response: {text_from_llm_repair[:200]}...")
410 | return {"error": "LLM repair extracted empty JSON", "llm_response": text_from_llm_repair, "original_text": problematic_text_for_repair}
411 |
412 | try:
413 | logger.debug(f"Attempting to parse LLM-repaired JSON: {final_repaired_json_str[:200]}...")
414 | parsed_llm_result = json.loads(final_repaired_json_str)
415 | logger.success("LLM JSON repair successful.", emoji_key="success")
416 | return parsed_llm_result
417 | except json.JSONDecodeError as llm_e:
418 | logger.error(f"LLM repair attempt failed. LLM response could not be parsed as JSON: {llm_e}. LLM response (after extraction): '{final_repaired_json_str[:200]}' Original LLM text: '{text_from_llm_repair[:500]}...'")
419 | return {"error": "LLM repair failed to produce valid JSON", "detail": str(llm_e), "llm_response_extracted": final_repaired_json_str, "llm_response_raw": text_from_llm_repair, "original_text": problematic_text_for_repair}
420 | except Exception as repair_ex:
421 | logger.error(f"Exception during LLM repair process: {repair_ex}", exc_info=True)
422 | return {"error": "Exception during LLM repair", "detail": str(repair_ex), "original_text": problematic_text_for_repair}
423 |
424 | except Exception as e: # General error in parse_result
425 | logger.error(f"Critical error in parse_result: {e}", exc_info=True)
426 | return {"error": "Critical error during result parsing", "detail": str(e)}
427 |
428 | async def process_mcp_result(result: Any) -> Dict[str, Any]:
429 | """
430 | Process and normalize results from MCP tool calls into a consistent dictionary format.
431 |
432 | This function serves as a user-friendly interface for handling and normalizing
433 | the various types of results that can be returned from MCP tools and provider operations.
434 | It acts as a bridge between the raw MCP tool outputs and downstream application code
435 | that expects a consistent dictionary structure.
436 |
437 | The function handles multiple return formats:
438 | - TextContent objects with a .text attribute
439 | - List results containing TextContent objects or dictionaries
440 | - Direct dictionary returns
441 | - JSON-like strings embedded in markdown code blocks
442 |
443 | Key features:
444 | - Automatic extraction of JSON from markdown code fences
445 | - JSON repair for malformed or truncated LLM outputs
446 | - Fallback to LLM-based repair for difficult parsing cases
447 | - Consistent error handling and reporting
448 |
449 | This function is especially useful in:
450 | - Handling results from completion tools where LLMs may return JSON in various formats
451 | - Processing tool responses that contain structured data embedded in text
452 | - Creating a consistent interface for downstream processing of MCP tool results
453 | - Simplifying error handling in client applications
454 |
455 | Args:
456 | result: The raw result from an MCP tool call or provider operation, which could
457 | be a TextContent object, a list, a dictionary, or another structure
458 |
459 | Returns:
460 | A dictionary containing either:
461 | - The successfully parsed result data
462 | - An error description with diagnostic information if parsing failed
463 |
464 | Example:
465 | ```python
466 | result = await some_mcp_tool()
467 | processed_data = await process_mcp_result(result)
468 |
469 | # Check for errors in the processed result
470 | if "error" in processed_data:
471 | print(f"Error processing result: {processed_data['error']}")
472 | else:
473 | # Use the normalized data
474 | print(f"Processed data: {processed_data}")
475 | ```
476 | """
477 | return await parse_result(result)
```
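
A short sketch of how the parsing helpers behave on a typical fenced LLM reply. The trailing comma in the sample payload is removed by the basic repair pass inside `extract_json_from_markdown`, so neither the aggressive repair nor the LLM-based fallback in `parse_result` is reached; the code fence in the sample is assembled from characters only to avoid nesting markdown fences in this document.

```python
import asyncio
import json

from ultimate_mcp_server.utils.parsing import extract_json_from_markdown, process_mcp_result

# Build a typical LLM reply containing a fenced JSON block with a trailing comma.
fence = "`" * 3
llm_output = (
    "Here is the classification:\n"
    f"{fence}json\n"
    '{"label": "positive", "confidence": 0.92,}\n'
    f"{fence}"
)

# extract_json_from_markdown() strips the fence and repairs the trailing comma.
extracted = extract_json_from_markdown(llm_output)
print(json.loads(extracted))  # {'label': 'positive', 'confidence': 0.92}


async def main() -> None:
    # process_mcp_result() accepts raw tool results (TextContent, list, dict, or str)
    # and returns a normalized dict, with an "error" key on failure.
    parsed = await process_mcp_result(llm_output)
    if "error" in parsed:
        print(f"Parse failed: {parsed['error']}")
    else:
        print(parsed)


asyncio.run(main())
```
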
--------------------------------------------------------------------------------
/ultimate_mcp_server/services/knowledge_base/retriever.py:
--------------------------------------------------------------------------------
```python
1 | """Knowledge base retriever for RAG functionality."""
2 | import time
3 | from typing import Any, Dict, List, Optional
4 |
5 | from ultimate_mcp_server.services.knowledge_base.feedback import get_rag_feedback_service
6 | from ultimate_mcp_server.services.knowledge_base.utils import build_metadata_filter
7 | from ultimate_mcp_server.services.vector import VectorDatabaseService
8 | from ultimate_mcp_server.utils import get_logger
9 |
10 | logger = get_logger(__name__)
11 |
12 |
13 | class KnowledgeBaseRetriever:
14 | """
15 | Advanced retrieval engine for knowledge base collections in RAG applications.
16 |
17 | The KnowledgeBaseRetriever provides sophisticated search capabilities for finding
18 | the most relevant documents within knowledge bases. It offers multiple retrieval
19 | strategies optimized for different search scenarios, from pure semantic vector
20 | search to hybrid approaches combining vector and keyword matching.
21 |
22 | Key Features:
23 | - Multiple retrieval methods (vector, hybrid, keyword)
24 | - Metadata filtering for targeted searches
25 | - Content-based filtering for keyword matching
26 | - Configurable similarity thresholds and relevance scoring
27 | - Feedback mechanisms for continuous retrieval improvement
28 | - Performance monitoring and diagnostics
29 | - Advanced parameter tuning for specialized search needs
30 |
31 | Retrieval Methods:
32 | 1. Vector Search: Uses embeddings for semantic similarity matching
33 | - Best for finding conceptually related content
34 | - Handles paraphrasing and semantic equivalence
35 | - Computationally efficient for large collections
36 |
37 | 2. Hybrid Search: Combines vector and keyword matching with weighted scoring
38 | - Balances semantic understanding with exact term matching
39 | - Addresses vocabulary mismatch problems
40 | - Provides more robust retrieval across diverse query types
41 |
42 | 3. Keyword Filtering: Limits results to those containing specific text
43 | - Used for explicit term presence requirements
44 | - Can be combined with other search methods
45 |
46 | Architecture:
47 | The retriever operates as a higher-level service above the vector database,
48 | working in concert with:
49 | - Embedding services for query vectorization
50 | - Vector database services for efficient similarity search
51 | - Feedback services for result quality improvement
52 | - Metadata filters for context-aware retrieval
53 |
54 | Usage in RAG Applications:
55 | This retriever is a critical component in RAG pipelines, responsible for
56 | the quality and relevance of context provided to LLMs. Tuning retrieval
57 | parameters significantly impacts the quality of generated responses.
58 |
59 | Example Usage:
60 | ```python
61 | # Get retriever instance
62 | retriever = get_knowledge_base_retriever()
63 |
64 | # Simple vector search
65 | results = await retriever.retrieve(
66 | knowledge_base_name="company_policies",
67 | query="What is our remote work policy?",
68 | top_k=3,
69 | min_score=0.7
70 | )
71 |
72 | # Hybrid search with metadata filtering
73 | dept_results = await retriever.retrieve_hybrid(
74 | knowledge_base_name="company_policies",
75 | query="security requirements for customer data",
76 | top_k=5,
77 | vector_weight=0.6,
78 | keyword_weight=0.4,
79 | metadata_filter={"department": "security", "status": "active"}
80 | )
81 |
82 | # Process and use the retrieved documents
83 | for item in results["results"]:
84 | print(f"Document (score: {item['score']:.2f}): {item['document'][:100]}...")
85 | print(f"Source: {item['metadata'].get('source', 'unknown')}")
86 |
87 | # Record which documents were actually useful
88 | await retriever.record_feedback(
89 | knowledge_base_name="company_policies",
90 | query="What is our remote work policy?",
91 | retrieved_documents=results["results"],
92 | used_document_ids=["doc123", "doc456"]
93 | )
94 | ```
95 | """
96 |
97 | def __init__(self, vector_service: VectorDatabaseService):
98 | """Initialize the knowledge base retriever.
99 |
100 | Args:
101 | vector_service: Vector database service for retrieving embeddings
102 | """
103 | self.vector_service = vector_service
104 | self.feedback_service = get_rag_feedback_service()
105 |
106 | # Get embedding service for generating query embeddings
107 | from ultimate_mcp_server.services.vector.embeddings import get_embedding_service
108 | self.embedding_service = get_embedding_service()
109 |
110 | logger.info("Knowledge base retriever initialized", extra={"emoji_key": "success"})
111 |
112 | async def _validate_knowledge_base(self, name: str) -> Dict[str, Any]:
113 | """Validate that a knowledge base exists.
114 |
115 | Args:
116 | name: Knowledge base name
117 |
118 | Returns:
119 | Validation result
120 | """
121 | # Check if knowledge base exists
122 | collections = await self.vector_service.list_collections()
123 |
124 | if name not in collections:
125 | logger.warning(
126 | f"Knowledge base '{name}' not found",
127 | extra={"emoji_key": "warning"}
128 | )
129 | return {"status": "not_found", "name": name}
130 |
131 | # Get metadata
132 | metadata = await self.vector_service.get_collection_metadata(name)
133 |
134 | if metadata.get("type") != "knowledge_base":
135 | logger.warning(
136 | f"Collection '{name}' is not a knowledge base",
137 | extra={"emoji_key": "warning"}
138 | )
139 | return {"status": "not_knowledge_base", "name": name}
140 |
141 | return {
142 | "status": "valid",
143 | "name": name,
144 | "metadata": metadata
145 | }
146 |
147 | async def retrieve(
148 | self,
149 | knowledge_base_name: str,
150 | query: str,
151 | top_k: int = 5,
152 | min_score: float = 0.6,
153 | metadata_filter: Optional[Dict[str, Any]] = None,
154 | content_filter: Optional[str] = None,
155 | embedding_model: Optional[str] = None,
156 | apply_feedback: bool = True,
157 | search_params: Optional[Dict[str, Any]] = None
158 | ) -> Dict[str, Any]:
159 | """Retrieve documents from a knowledge base using vector search.
160 |
161 | Args:
162 | knowledge_base_name: Knowledge base name
163 | query: Query text
164 | top_k: Number of results to return
165 | min_score: Minimum similarity score
166 | metadata_filter: Optional metadata filter (field->value or field->{op:value})
167 | content_filter: Text to search for in documents
168 | embedding_model: Optional embedding model name
169 | apply_feedback: Whether to apply feedback adjustments
170 | search_params: Optional ChromaDB search parameters
171 |
172 | Returns:
173 | Retrieved documents with metadata
174 | """
175 | start_time = time.time()
176 |
177 | # Validate knowledge base
178 | kb_info = await self._validate_knowledge_base(knowledge_base_name)
179 |
180 | if kb_info["status"] != "valid":
181 | logger.warning(
182 | f"Knowledge base '{knowledge_base_name}' not found or invalid",
183 | extra={"emoji_key": "warning"}
184 | )
185 | return {
186 | "status": "error",
187 | "message": f"Knowledge base '{knowledge_base_name}' not found or invalid"
188 | }
189 |
190 | logger.debug(f"DEBUG: Knowledge base validated - metadata: {kb_info['metadata']}")
191 |
192 | # Use the same embedding model that was used to create the knowledge base
193 | if not embedding_model and kb_info["metadata"].get("embedding_model"):
194 | embedding_model = kb_info["metadata"]["embedding_model"]
195 | logger.debug(f"Using embedding model from knowledge base metadata: {embedding_model}")
196 |
197 | # If embedding model is specified, ensure it's saved in the metadata for future use
198 | if embedding_model and not kb_info["metadata"].get("embedding_model"):
199 | try:
200 | await self.vector_service.update_collection_metadata(
201 | name=knowledge_base_name,
202 | metadata={
203 | **kb_info["metadata"],
204 | "embedding_model": embedding_model
205 | }
206 | )
207 | logger.debug(f"Updated knowledge base metadata with embedding model: {embedding_model}")
208 | except Exception as e:
209 | logger.warning(f"Failed to update knowledge base metadata with embedding model: {str(e)}")
210 |
211 | # Get or create ChromaDB collection
212 | collection = await self.vector_service.get_collection(knowledge_base_name)
213 | logger.debug(f"DEBUG: Retrieved collection type: {type(collection)}")
214 |
215 | # Set search parameters if provided
216 | if search_params:
217 | await self.vector_service.update_collection_metadata(
218 | collection_name=knowledge_base_name,
219 | metadata={
220 | **kb_info["metadata"],
221 | **{f"hnsw:{k}": v for k, v in search_params.items()}
222 | }
223 | )
224 |
225 | # Create includes parameter
226 | includes = ["documents", "metadatas", "distances"]
227 |
228 | # Create where_document parameter for content filtering
229 | where_document = {"$contains": content_filter} if content_filter else None
230 |
231 | # Convert metadata filter format if provided
232 | chroma_filter = build_metadata_filter(metadata_filter) if metadata_filter else None
233 |
234 | logger.debug(f"DEBUG: Search parameters - top_k: {top_k}, min_score: {min_score}, filter: {chroma_filter}, where_document: {where_document}")
235 |
236 | try:
237 | # Generate embedding directly with our embedding service
238 | # Call create_embeddings with a list and get the first result
239 | query_embeddings = await self.embedding_service.create_embeddings(
240 | texts=[query],
241 | # model=embedding_model # Model is set during service init
242 | )
243 | if not query_embeddings:
244 | logger.error(f"Failed to generate embedding for query: {query}")
245 | return { "status": "error", "message": "Failed to generate query embedding" }
246 | query_embedding = query_embeddings[0]
247 |
248 | logger.debug(f"Generated query embedding with model: {self.embedding_service.model_name}, dimension: {len(query_embedding)}")
249 |
250 | # Use correct query method based on collection type
251 | if hasattr(collection, 'query') and not hasattr(collection, 'search_by_text'):
252 | # ChromaDB collection
253 | logger.debug("Using ChromaDB direct query with embeddings")
254 | try:
255 | search_results = collection.query(
256 | query_embeddings=[query_embedding], # Use our embedding directly
257 | n_results=top_k * 2,
258 | where=chroma_filter,
259 | where_document=where_document,
260 | include=includes
261 | )
262 | except Exception as e:
263 | logger.error(f"ChromaDB query error: {str(e)}")
264 | raise
265 | else:
266 | # Our custom VectorCollection
267 | logger.debug("Using VectorCollection search method")
268 | search_results = await collection.query(
269 | query_texts=[query],
270 | n_results=top_k * 2,
271 | where=chroma_filter,
272 | where_document=where_document,
273 | include=includes,
274 | embedding_model=embedding_model
275 | )
276 |
277 | # Debug raw results
278 | logger.debug(f"DEBUG: Raw search results - keys: {search_results.keys()}")
279 | logger.debug(f"DEBUG: Documents count: {len(search_results.get('documents', [[]])[0])}")
280 | logger.debug(f"DEBUG: IDs: {search_results.get('ids', [[]])[0]}")
281 | logger.debug(f"DEBUG: Distances: {search_results.get('distances', [[]])[0]}")
282 |
283 | # Process results
284 | results = []
285 | for i, doc in enumerate(search_results["documents"][0]):
286 | # Convert distance to similarity score (1 = exact match, 0 = completely different)
287 | # Most distance metrics return 0 for exact match, so we use 1 - distance
288 |                 # This is exact for cosine distance on normalized embeddings; for unbounded metrics like L2 it is only a rough heuristic
289 | similarity = 1.0 - float(search_results["distances"][0][i])
290 |
291 | # Debug each document
292 | logger.debug(f"DEBUG: Document {i} - ID: {search_results['ids'][0][i]}")
293 | logger.debug(f"DEBUG: Similarity: {similarity} (min required: {min_score})")
294 | logger.debug(f"DEBUG: Document content (first 100 chars): {doc[:100] if doc else 'Empty'}")
295 |
296 | if search_results["metadatas"] and i < len(search_results["metadatas"][0]):
297 | metadata = search_results["metadatas"][0][i]
298 | logger.debug(f"DEBUG: Metadata: {metadata}")
299 |
300 | # Skip results below minimum score
301 | if similarity < min_score:
302 | logger.debug(f"DEBUG: Skipping document {i} due to low similarity: {similarity} < {min_score}")
303 | continue
304 |
305 | results.append({
306 | "id": search_results["ids"][0][i],
307 | "document": doc,
308 | "metadata": search_results["metadatas"][0][i] if search_results["metadatas"] else {},
309 | "score": similarity
310 | })
311 |
312 | logger.debug(f"DEBUG: After filtering, {len(results)} documents remain.")
313 |
314 | # Apply feedback adjustments if requested
315 | if apply_feedback:
316 | results = await self.feedback_service.apply_feedback_adjustments(
317 | knowledge_base_name=knowledge_base_name,
318 | results=results,
319 | query=query
320 | )
321 |
322 | # Limit to top_k
323 | results = results[:top_k]
324 |
325 | # Track retrieval time
326 | retrieval_time = time.time() - start_time
327 |
328 | logger.info(
329 | f"Retrieved {len(results)} documents from '{knowledge_base_name}' in {retrieval_time:.2f}s",
330 | extra={"emoji_key": "success"}
331 | )
332 |
333 | return {
334 | "status": "success",
335 | "query": query,
336 | "results": results,
337 | "count": len(results),
338 | "retrieval_time": retrieval_time
339 | }
340 |
341 | except Exception as e:
342 | logger.error(
343 | f"Error retrieving from knowledge base '{knowledge_base_name}': {str(e)}",
344 | extra={"emoji_key": "error"}
345 | )
346 |
347 | return {
348 | "status": "error",
349 | "message": str(e)
350 | }
351 |
352 | async def retrieve_hybrid(
353 | self,
354 | knowledge_base_name: str,
355 | query: str,
356 | top_k: int = 5,
357 | vector_weight: float = 0.7,
358 | keyword_weight: float = 0.3,
359 | min_score: float = 0.6,
360 | metadata_filter: Optional[Dict[str, Any]] = None,
361 | additional_keywords: Optional[List[str]] = None,
362 | apply_feedback: bool = True,
363 | search_params: Optional[Dict[str, Any]] = None
364 | ) -> Dict[str, Any]:
365 | """Retrieve documents using hybrid search.
366 |
367 | Args:
368 | knowledge_base_name: Knowledge base name
369 | query: Query text
370 | top_k: Number of documents to retrieve
371 | vector_weight: Weight for vector search component
372 | keyword_weight: Weight for keyword search component
373 | min_score: Minimum similarity score
374 | metadata_filter: Optional metadata filter
375 | additional_keywords: Additional keywords to include in search
376 | apply_feedback: Whether to apply feedback adjustments
377 | search_params: Optional ChromaDB search parameters
378 |
379 | Returns:
380 | Retrieved documents with metadata
381 | """
382 | start_time = time.time()
383 |
384 | # Validate knowledge base
385 | kb_info = await self._validate_knowledge_base(knowledge_base_name)
386 |
387 | if kb_info["status"] != "valid":
388 | logger.warning(
389 | f"Knowledge base '{knowledge_base_name}' not found or invalid",
390 | extra={"emoji_key": "warning"}
391 | )
392 | return {
393 | "status": "error",
394 | "message": f"Knowledge base '{knowledge_base_name}' not found or invalid"
395 | }
396 |
397 | # Get or create ChromaDB collection
398 | collection = await self.vector_service.get_collection(knowledge_base_name)
399 |
400 | # Set search parameters if provided
401 | if search_params:
402 | await self.vector_service.update_collection_metadata(
403 | collection_name=knowledge_base_name,
404 | metadata={
405 | **kb_info["metadata"],
406 | **{f"hnsw:{k}": v for k, v in search_params.items()}
407 | }
408 | )
409 |
410 | # Convert metadata filter format if provided
411 | chroma_filter = build_metadata_filter(metadata_filter) if metadata_filter else None
412 |
413 | # Create content filter based on query and additional keywords
414 | content_text = query
415 | if additional_keywords:
416 | content_text = f"{query} {' '.join(additional_keywords)}"
417 |
418 | # Use ChromaDB's hybrid search by providing both query text and content filter
419 | try:
420 | # Vector search results with content filter
421 | search_results = await collection.query(
422 | query_texts=[query],
423 | n_results=top_k * 3, # Get more results for combining
424 | where=chroma_filter,
425 | where_document={"$contains": content_text} if content_text else None,
426 | include=["documents", "metadatas", "distances"],
427 | embedding_model=None # Use default embedding model
428 | )
429 |
430 | # Process results
431 | combined_results = {}
432 |
433 | # Process vector search results
434 | for i, doc in enumerate(search_results["documents"][0]):
435 | doc_id = search_results["ids"][0][i]
436 | vector_score = 1.0 - float(search_results["distances"][0][i])
437 |
438 | combined_results[doc_id] = {
439 | "id": doc_id,
440 | "document": doc,
441 | "metadata": search_results["metadatas"][0][i] if search_results["metadatas"] else {},
442 | "vector_score": vector_score,
443 | "keyword_score": 0.0,
444 | "score": vector_score * vector_weight
445 | }
446 |
447 | # Now do a keyword-only search if we have keywords component
448 | if keyword_weight > 0:
449 | keyword_results = await collection.query(
450 | query_texts=None, # No vector query
451 | n_results=top_k * 3,
452 | where=chroma_filter,
453 | where_document={"$contains": content_text},
454 | include=["documents", "metadatas"],
455 | embedding_model=None # No embedding model needed for keyword-only search
456 | )
457 |
458 | # Process keyword results
459 | for i, doc in enumerate(keyword_results["documents"][0]):
460 | doc_id = keyword_results["ids"][0][i]
461 | # Approximate keyword score based on position (best = 1.0)
462 | keyword_score = 1.0 - (i / len(keyword_results["documents"][0]))
463 |
464 | if doc_id in combined_results:
465 | # Update existing result
466 | combined_results[doc_id]["keyword_score"] = keyword_score
467 | combined_results[doc_id]["score"] += keyword_score * keyword_weight
468 | else:
469 | # Add new result
470 | combined_results[doc_id] = {
471 | "id": doc_id,
472 | "document": doc,
473 | "metadata": keyword_results["metadatas"][0][i] if keyword_results["metadatas"] else {},
474 | "vector_score": 0.0,
475 | "keyword_score": keyword_score,
476 | "score": keyword_score * keyword_weight
477 | }
478 |
479 | # Convert to list and filter by min_score
480 | results = [r for r in combined_results.values() if r["score"] >= min_score]
481 |
482 | # Apply feedback adjustments if requested
483 | if apply_feedback:
484 | results = await self.feedback_service.apply_feedback_adjustments(
485 | knowledge_base_name=knowledge_base_name,
486 | results=results,
487 | query=query
488 | )
489 |
490 | # Sort by score and limit to top_k
491 | results.sort(key=lambda x: x["score"], reverse=True)
492 | results = results[:top_k]
493 |
494 | # Track retrieval time
495 | retrieval_time = time.time() - start_time
496 |
497 | logger.info(
498 | f"Hybrid search retrieved {len(results)} documents from '{knowledge_base_name}' in {retrieval_time:.2f}s",
499 | extra={"emoji_key": "success"}
500 | )
501 |
502 | return {
503 | "status": "success",
504 | "query": query,
505 | "results": results,
506 | "count": len(results),
507 | "retrieval_time": retrieval_time
508 | }
509 |
510 | except Exception as e:
511 | logger.error(
512 | f"Error performing hybrid search on '{knowledge_base_name}': {str(e)}",
513 | extra={"emoji_key": "error"}
514 | )
515 |
516 | return {
517 | "status": "error",
518 | "message": str(e)
519 | }
520 |
521 | async def record_feedback(
522 | self,
523 | knowledge_base_name: str,
524 | query: str,
525 | retrieved_documents: List[Dict[str, Any]],
526 | used_document_ids: Optional[List[str]] = None,
527 | explicit_feedback: Optional[Dict[str, str]] = None
528 | ) -> Dict[str, Any]:
529 | """Record feedback for retrieval results.
530 |
531 | Args:
532 | knowledge_base_name: Knowledge base name
533 | query: Query text
534 | retrieved_documents: List of retrieved documents
535 | used_document_ids: List of document IDs used in the response
536 | explicit_feedback: Explicit feedback for documents
537 |
538 | Returns:
539 | Feedback recording result
540 | """
541 | # Convert list to set if provided
542 | used_ids_set = set(used_document_ids) if used_document_ids else None
543 |
544 | # Record feedback
545 | result = await self.feedback_service.record_retrieval_feedback(
546 | knowledge_base_name=knowledge_base_name,
547 | query=query,
548 | retrieved_documents=retrieved_documents,
549 | used_document_ids=used_ids_set,
550 | explicit_feedback=explicit_feedback
551 | )
552 |
553 | return result
```
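A minimal sketch of how the retriever above might be wired into a RAG loop, using the `retrieve` and `record_feedback` signatures defined in this file. The `get_knowledge_base_retriever()` factory is the one referenced in the class docstring (import path omitted), and `generate_answer` stands in for whatever LLM completion call the application uses.

```python
async def answer_with_context(question: str) -> str:
    retriever = get_knowledge_base_retriever()  # factory referenced in the class docstring

    retrieval = await retriever.retrieve(
        knowledge_base_name="company_policies",
        query=question,
        top_k=5,
        min_score=0.6,
    )
    if retrieval["status"] != "success":
        raise RuntimeError(retrieval.get("message", "retrieval failed"))

    # Concatenate retrieved documents into a context block for the prompt
    context = "\n\n".join(item["document"] for item in retrieval["results"])
    answer = await generate_answer(question=question, context=context)  # hypothetical LLM call

    # Tell the feedback service which documents actually informed the answer
    await retriever.record_feedback(
        knowledge_base_name="company_policies",
        query=question,
        retrieved_documents=retrieval["results"],
        used_document_ids=[item["id"] for item in retrieval["results"][:2]],
    )
    return answer
```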
--------------------------------------------------------------------------------
/ultimate_mcp_server/core/providers/gemini.py:
--------------------------------------------------------------------------------
```python
1 | """Google Gemini provider implementation."""
2 | import time
3 | from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple
4 |
5 | from google import genai
6 |
7 | from ultimate_mcp_server.constants import Provider
8 | from ultimate_mcp_server.core.providers.base import BaseProvider, ModelResponse
9 | from ultimate_mcp_server.utils import get_logger
10 |
11 | # Use the same naming scheme everywhere: logger at module level
12 | logger = get_logger("ultimate_mcp_server.providers.gemini")
13 |
14 |
15 | class GeminiProvider(BaseProvider):
16 | """Provider implementation for Google Gemini API."""
17 |
18 | provider_name = Provider.GEMINI.value
19 |
20 | def __init__(self, api_key: Optional[str] = None, **kwargs):
21 | """Initialize the Gemini provider.
22 |
23 | Args:
24 | api_key: Google API key
25 | **kwargs: Additional options
26 | """
27 | super().__init__(api_key=api_key, **kwargs)
28 | self.models_cache = None
29 |
30 | async def initialize(self) -> bool:
31 | """Initialize the Gemini client.
32 |
33 | Returns:
34 | bool: True if initialization was successful
35 | """
36 | try:
37 | # Skip real API calls if using mock key for tests
38 | if self.api_key and "mock-" in self.api_key:
39 | self.logger.info(
40 | "Using mock Gemini key - skipping API initialization",
41 | emoji_key="mock"
42 | )
43 | self.client = {"mock_client": True}
44 | return True
45 |
46 | # Create a client instance instead of configuring globally
47 | self.client = genai.Client(
48 | api_key=self.api_key,
49 | http_options={"api_version": "v1alpha"}
50 | )
51 |
52 | self.logger.success(
53 | "Gemini provider initialized successfully",
54 | emoji_key="provider"
55 | )
56 | return True
57 |
58 | except Exception as e:
59 | self.logger.error(
60 | f"Failed to initialize Gemini provider: {str(e)}",
61 | emoji_key="error"
62 | )
63 | return False
64 |
65 | async def generate_completion(
66 | self,
67 | prompt: Optional[str] = None,
68 | messages: Optional[List[Dict[str, Any]]] = None,
69 | model: Optional[str] = None,
70 | max_tokens: Optional[int] = None,
71 | temperature: float = 0.7,
72 | **kwargs
73 | ) -> ModelResponse:
74 | """Generate a completion using Google Gemini.
75 |
76 | Args:
77 | prompt: Text prompt to send to the model (or None if messages provided)
78 | messages: List of message dictionaries (alternative to prompt)
79 | model: Model name to use (e.g., "gemini-2.0-flash-lite")
80 | max_tokens: Maximum tokens to generate
81 | temperature: Temperature parameter (0.0-1.0)
82 | **kwargs: Additional model-specific parameters
83 |
84 | Returns:
85 | ModelResponse: Standardized response
86 |
87 | Raises:
88 | Exception: If API call fails
89 | """
90 | if not self.client:
91 | await self.initialize()
92 |
93 | # Use default model if not specified
94 | model = model or self.get_default_model()
95 |
96 | # Strip provider prefix if present (e.g., "gemini:gemini-2.0-pro" -> "gemini-2.0-pro")
97 | if ":" in model:
98 | original_model = model
99 | model = model.split(":", 1)[1]
100 | self.logger.debug(f"Stripped provider prefix from model name: {original_model} -> {model}")
101 |
102 | # Validate that either prompt or messages is provided
103 | if prompt is None and not messages:
104 | raise ValueError("Either 'prompt' or 'messages' must be provided")
105 |
106 | # Prepare generation config and API call kwargs
107 | config = {
108 | "temperature": temperature,
109 | }
110 | if max_tokens is not None:
111 | config["max_output_tokens"] = max_tokens # Gemini uses max_output_tokens
112 |
113 | # Pop json_mode flag
114 | json_mode = kwargs.pop("json_mode", False)
115 |
116 | # Set up JSON mode in config dict per Gemini API docs
117 | if json_mode:
118 | # For Gemini, JSON mode is set via response_mime_type in the config dict
119 | config["response_mime_type"] = "application/json"
120 | self.logger.debug("Setting response_mime_type to application/json for Gemini in config")
121 |
122 | # Add remaining kwargs to config
123 | for key in list(kwargs.keys()):
124 | if key in ["top_p", "top_k", "candidate_count", "stop_sequences"]:
125 | config[key] = kwargs.pop(key)
126 |
127 | # Store other kwargs that might need to be passed directly
128 | request_params = {}
129 | for key in list(kwargs.keys()):
130 | if key in ["safety_settings", "tools", "system"]:
131 | request_params[key] = kwargs.pop(key)
132 |
133 | # Prepare content based on input type (prompt or messages)
134 | content = None
135 | if prompt:
136 | content = prompt
137 | log_input_size = len(prompt)
138 | elif messages:
139 | # Convert messages to Gemini format
140 | content = []
141 | log_input_size = 0
142 | for msg in messages:
143 | role = msg.get("role", "").lower()
144 | text = msg.get("content", "")
145 | log_input_size += len(text)
146 |
147 | # Map roles to Gemini's expectations
148 | if role == "system":
149 | # For system messages, prepend to user input or add as user message
150 | system_text = text
151 | # Find the next user message to prepend to
152 | for _i, future_msg in enumerate(messages[messages.index(msg) + 1:], messages.index(msg) + 1):
153 | if future_msg.get("role", "").lower() == "user":
154 | # Leave this system message to be handled when we reach the user message
155 | # Just track its content for now
156 | break
157 | else:
158 | # No user message found after system, add as separate user message
159 | content.append({"role": "user", "parts": [{"text": system_text}]})
160 | continue
161 |
162 | elif role == "user":
163 | # Check if previous message was a system message
164 | prev_system_text = ""
165 | if messages.index(msg) > 0:
166 | prev_msg = messages[messages.index(msg) - 1]
167 | if prev_msg.get("role", "").lower() == "system":
168 | prev_system_text = prev_msg.get("content", "")
169 |
170 | # If there was a system message before, prepend it to the user message
171 | if prev_system_text:
172 | gemini_role = "user"
173 | gemini_text = f"{prev_system_text}\n\n{text}"
174 | else:
175 | gemini_role = "user"
176 | gemini_text = text
177 |
178 | elif role == "assistant":
179 | gemini_role = "model"
180 | gemini_text = text
181 | else:
182 | self.logger.warning(f"Unsupported message role '{role}', treating as user")
183 | gemini_role = "user"
184 | gemini_text = text
185 |
186 | content.append({"role": gemini_role, "parts": [{"text": gemini_text}]})
187 |
188 | # Log request
189 | self.logger.info(
190 | f"Generating completion with Gemini model {model}",
191 | emoji_key=self.provider_name,
192 | prompt_length=log_input_size,
193 | json_mode_requested=json_mode
194 | )
195 |
196 | start_time = time.time()
197 |
198 | try:
199 | # Check if we're using a mock client for testing
200 | if isinstance(self.client, dict) and self.client.get("mock_client"):
201 | # Return mock response for tests
202 | completion_text = "Mock Gemini response for testing"
203 | processing_time = 0.1
204 | response = None
205 | else:
206 | # Pass everything in the correct structure according to the API
207 | if isinstance(content, list): # messages format
208 | response = self.client.models.generate_content(
209 | model=model,
210 | contents=content,
211 | config=config, # Pass config dict containing temperature, max_output_tokens, etc.
212 | **request_params # Pass other params directly if needed
213 | )
214 | else: # prompt format (string)
215 | response = self.client.models.generate_content(
216 | model=model,
217 | contents=content,
218 | config=config, # Pass config dict containing temperature, max_output_tokens, etc.
219 | **request_params # Pass other params directly if needed
220 | )
221 |
222 | processing_time = time.time() - start_time
223 |
224 | # Extract response text
225 | completion_text = response.text
226 |
227 | # Estimate token usage (Gemini doesn't provide token counts)
228 | # Roughly 4 characters per token as a crude approximation
229 | char_to_token_ratio = 4.0
230 | estimated_input_tokens = log_input_size / char_to_token_ratio
231 | estimated_output_tokens = len(completion_text) / char_to_token_ratio
232 |
233 | # Create standardized response
234 | result = ModelResponse(
235 | text=completion_text,
236 | model=model,
237 | provider=self.provider_name,
238 | input_tokens=int(estimated_input_tokens),
239 | output_tokens=int(estimated_output_tokens),
240 | processing_time=processing_time,
241 | raw_response=None, # Don't need raw response for tests
242 | metadata={"token_count_estimated": True}
243 | )
244 |
245 | # Add message for consistency with other providers
246 | result.message = {"role": "assistant", "content": completion_text}
247 |
248 | # Log success
249 | self.logger.success(
250 | "Gemini completion successful",
251 | emoji_key="success",
252 | model=model,
253 | tokens={
254 | "input": result.input_tokens,
255 | "output": result.output_tokens
256 | },
257 | cost=result.cost,
258 | time=result.processing_time
259 | )
260 |
261 | return result
262 |
263 | except Exception as e:
264 | self.logger.error(
265 | f"Gemini completion failed: {str(e)}",
266 | emoji_key="error",
267 | model=model
268 | )
269 | raise
270 |
271 | async def generate_completion_stream(
272 | self,
273 | prompt: Optional[str] = None,
274 | messages: Optional[List[Dict[str, Any]]] = None,
275 | model: Optional[str] = None,
276 | max_tokens: Optional[int] = None,
277 | temperature: float = 0.7,
278 | **kwargs
279 | ) -> AsyncGenerator[Tuple[str, Dict[str, Any]], None]:
280 | """Generate a streaming completion using Google Gemini.
281 |
282 | Args:
283 | prompt: Text prompt to send to the model (or None if messages provided)
284 | messages: List of message dictionaries (alternative to prompt)
285 | model: Model name to use (e.g., "gemini-2.0-flash-lite")
286 | max_tokens: Maximum tokens to generate
287 | temperature: Temperature parameter (0.0-1.0)
288 | **kwargs: Additional model-specific parameters
289 |
290 | Yields:
291 | Tuple of (text_chunk, metadata)
292 |
293 | Raises:
294 | Exception: If API call fails
295 | """
296 | if not self.client:
297 | await self.initialize()
298 |
299 | # Use default model if not specified
300 | model = model or self.get_default_model()
301 |
302 | # Strip provider prefix if present (e.g., "gemini:gemini-2.0-pro" -> "gemini-2.0-pro")
303 | if ":" in model:
304 | original_model = model
305 | model = model.split(":", 1)[1]
306 | self.logger.debug(f"Stripped provider prefix from model name (stream): {original_model} -> {model}")
307 |
308 | # Validate that either prompt or messages is provided
309 | if prompt is None and not messages:
310 | raise ValueError("Either 'prompt' or 'messages' must be provided")
311 |
312 | # Prepare config dict per Gemini API
313 | config = {
314 | "temperature": temperature,
315 | }
316 | if max_tokens is not None:
317 | config["max_output_tokens"] = max_tokens
318 |
319 | # Pop json_mode flag
320 | json_mode = kwargs.pop("json_mode", False)
321 |
322 | # Set up JSON mode in config dict
323 | if json_mode:
324 | # For Gemini, JSON mode is set via response_mime_type in the config dict
325 | config["response_mime_type"] = "application/json"
326 | self.logger.debug("Setting response_mime_type to application/json for Gemini streaming in config")
327 |
328 | # Add remaining kwargs to config
329 | for key in list(kwargs.keys()):
330 | if key in ["top_p", "top_k", "candidate_count", "stop_sequences"]:
331 | config[key] = kwargs.pop(key)
332 |
333 | # Store other kwargs that might need to be passed directly
334 | request_params = {}
335 | for key in list(kwargs.keys()):
336 | if key in ["safety_settings", "tools", "system"]:
337 | request_params[key] = kwargs.pop(key)
338 |
339 | # Prepare content based on input type (prompt or messages)
340 | content = None
341 | if prompt:
342 | content = prompt
343 | log_input_size = len(prompt)
344 | elif messages:
345 | # Convert messages to Gemini format
346 | content = []
347 | log_input_size = 0
348 | for msg in messages:
349 | role = msg.get("role", "").lower()
350 | text = msg.get("content", "")
351 | log_input_size += len(text)
352 |
353 | # Map roles to Gemini's expectations
354 | if role == "system":
355 | # For system messages, prepend to user input or add as user message
356 | system_text = text
357 | # Find the next user message to prepend to
358 | for _i, future_msg in enumerate(messages[messages.index(msg) + 1:], messages.index(msg) + 1):
359 | if future_msg.get("role", "").lower() == "user":
360 | # Leave this system message to be handled when we reach the user message
361 | # Just track its content for now
362 | break
363 | else:
364 | # No user message found after system, add as separate user message
365 | content.append({"role": "user", "parts": [{"text": system_text}]})
366 | continue
367 |
368 | elif role == "user":
369 | # Check if previous message was a system message
370 | prev_system_text = ""
371 | if messages.index(msg) > 0:
372 | prev_msg = messages[messages.index(msg) - 1]
373 | if prev_msg.get("role", "").lower() == "system":
374 | prev_system_text = prev_msg.get("content", "")
375 |
376 | # If there was a system message before, prepend it to the user message
377 | if prev_system_text:
378 | gemini_role = "user"
379 | gemini_text = f"{prev_system_text}\n\n{text}"
380 | else:
381 | gemini_role = "user"
382 | gemini_text = text
383 |
384 | elif role == "assistant":
385 | gemini_role = "model"
386 | gemini_text = text
387 | else:
388 | self.logger.warning(f"Unsupported message role '{role}', treating as user")
389 | gemini_role = "user"
390 | gemini_text = text
391 |
392 | content.append({"role": gemini_role, "parts": [{"text": gemini_text}]})
393 |
394 | # Log request
395 | self.logger.info(
396 | f"Generating streaming completion with Gemini model {model}",
397 | emoji_key=self.provider_name,
398 | input_type=f"{'prompt' if prompt else 'messages'} ({log_input_size} chars)",
399 | json_mode_requested=json_mode
400 | )
401 |
402 | start_time = time.time()
403 | total_chunks = 0
404 |
405 | try:
406 | # Use the dedicated streaming method as per Google's documentation
407 | try:
408 | if isinstance(content, list): # messages format
409 | stream_response = self.client.models.generate_content_stream(
410 | model=model,
411 | contents=content,
412 | config=config,
413 | **request_params
414 | )
415 | else: # prompt format (string)
416 | stream_response = self.client.models.generate_content_stream(
417 | model=model,
418 | contents=content,
419 | config=config,
420 | **request_params
421 | )
422 |
423 | # Process the stream - iterating over chunks
424 | async def iterate_response():
425 | # Convert sync iterator to async
426 | for chunk in stream_response:
427 | yield chunk
428 |
429 | async for chunk in iterate_response():
430 | total_chunks += 1
431 |
432 | # Extract text from the chunk
433 | chunk_text = ""
434 | if hasattr(chunk, 'text'):
435 | chunk_text = chunk.text
436 | elif hasattr(chunk, 'candidates') and chunk.candidates:
437 | if hasattr(chunk.candidates[0], 'content') and chunk.candidates[0].content:
438 | if hasattr(chunk.candidates[0].content, 'parts') and chunk.candidates[0].content.parts:
439 | chunk_text = chunk.candidates[0].content.parts[0].text
440 |
441 | # Metadata for this chunk
442 | metadata = {
443 | "model": model,
444 | "provider": self.provider_name,
445 | "chunk_index": total_chunks,
446 | }
447 |
448 | yield chunk_text, metadata
449 |
450 | # Log success
451 | processing_time = time.time() - start_time
452 | self.logger.success(
453 | "Gemini streaming completion successful",
454 | emoji_key="success",
455 | model=model,
456 | chunks=total_chunks,
457 | time=processing_time
458 | )
459 |
460 | # Yield final metadata chunk
461 | yield "", {
462 | "model": model,
463 | "provider": self.provider_name,
464 | "chunk_index": total_chunks + 1,
465 | "processing_time": processing_time,
466 | "finish_reason": "stop", # Gemini doesn't provide this directly
467 | }
468 |
469 | except (AttributeError, TypeError) as e:
470 | # If streaming isn't supported, fall back to non-streaming
471 | self.logger.warning(f"Streaming not supported for current Gemini API: {e}. Falling back to non-streaming.")
472 |
473 | # Call generate_completion and yield the entire result as one chunk
474 | completion = await self.generate_completion(
475 | prompt=prompt,
476 | messages=messages,
477 | model=model,
478 | max_tokens=max_tokens,
479 | temperature=temperature,
480 | json_mode=json_mode,
481 | **kwargs
482 | )
483 |
484 | yield completion.text, {
485 | "model": model,
486 | "provider": self.provider_name,
487 | "chunk_index": 1,
488 | "is_fallback": True
489 | }
490 | total_chunks = 1
491 |
492 |                 # Return to end the stream; raising StopAsyncIteration here would be caught by the outer except and emit a spurious error chunk
493 |                 return
494 |
495 | except Exception as e:
496 | self.logger.error(
497 | f"Gemini streaming completion failed: {str(e)}",
498 | emoji_key="error",
499 | model=model
500 | )
501 | # Yield error info in final chunk
502 | yield "", {
503 | "model": model,
504 | "provider": self.provider_name,
505 | "chunk_index": total_chunks + 1,
506 | "error": f"{type(e).__name__}: {str(e)}",
507 | "processing_time": time.time() - start_time,
508 | "finish_reason": "error"
509 | }
510 |
511 | async def list_models(self) -> List[Dict[str, Any]]:
512 | """List available Gemini models.
513 |
514 | Returns:
515 | List of model information dictionaries
516 | """
517 |         # Return a curated static list of known models rather than querying the models endpoint
518 | if self.models_cache:
519 | return self.models_cache
520 |
521 | models = [
522 | {
523 | "id": "gemini-2.0-flash-lite",
524 | "provider": self.provider_name,
525 | "description": "Fastest and most efficient Gemini model",
526 | },
527 | {
528 | "id": "gemini-2.0-flash",
529 | "provider": self.provider_name,
530 | "description": "Fast Gemini model with good quality",
531 | },
532 | {
533 | "id": "gemini-2.0-pro",
534 | "provider": self.provider_name,
535 | "description": "More capable Gemini model",
536 | },
537 | {
538 | "id": "gemini-2.5-pro-preview-03-25",
539 | "provider": self.provider_name,
540 | "description": "Most capable Gemini model",
541 | },
542 | ]
543 |
544 | # Cache results
545 | self.models_cache = models
546 |
547 | return models
548 |
549 | def get_default_model(self) -> str:
550 | """Get the default Gemini model.
551 |
552 | Returns:
553 | Default model name
554 | """
555 | from ultimate_mcp_server.config import get_config
556 |
557 | # Safely get from config if available
558 | try:
559 | config = get_config()
560 | provider_config = getattr(config, 'providers', {}).get(self.provider_name, None)
561 | if provider_config and provider_config.default_model:
562 | return provider_config.default_model
563 | except (AttributeError, TypeError):
564 | # Handle case when providers attribute doesn't exist or isn't a dict
565 | pass
566 |
567 | # Otherwise return hard-coded default
568 | return "gemini-2.0-flash-lite"
569 |
570 | async def check_api_key(self) -> bool:
571 | """Check if the Gemini API key is valid.
572 |
573 | Returns:
574 | bool: True if API key is valid
575 | """
576 | try:
577 | # Try listing models to validate the API key
578 | # Use the client's models API to check if API key is valid
579 | self.client.models.list()
580 | return True
581 | except Exception:
582 | return False
```
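A usage sketch for the provider above, based only on the constructor, `initialize`, `generate_completion`, and `generate_completion_stream` signatures defined in this file. Reading the key from the `GOOGLE_API_KEY` environment variable is an assumption for the example.

```python
import asyncio
import os


async def main() -> None:
    provider = GeminiProvider(api_key=os.environ.get("GOOGLE_API_KEY"))
    if not await provider.initialize():
        raise RuntimeError("Gemini provider failed to initialize")

    # One-shot completion (token counts are estimated, per the metadata flag)
    result = await provider.generate_completion(
        prompt="Summarize the Model Context Protocol in one sentence.",
        model="gemini-2.0-flash-lite",
        max_tokens=128,
    )
    print(result.text, result.metadata)

    # Streaming: chunks arrive as (text, metadata) tuples; the final chunk has empty text
    async for chunk_text, metadata in provider.generate_completion_stream(
        prompt="List three uses of MCP.",
        temperature=0.3,
    ):
        if chunk_text:
            print(chunk_text, end="", flush=True)
        elif "processing_time" in metadata:
            print(f"\n[done in {metadata['processing_time']:.2f}s]")


if __name__ == "__main__":
    asyncio.run(main())
```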
--------------------------------------------------------------------------------
/mcp_python_lib_docs.md:
--------------------------------------------------------------------------------
```markdown
1 | # MCP Python SDK
2 |
3 | <div align="center">
4 |
5 | <strong>Python implementation of the Model Context Protocol (MCP)</strong>
6 |
7 | [![PyPI][pypi-badge]][pypi-url]
8 | [![MIT licensed][mit-badge]][mit-url]
9 | [![Python Version][python-badge]][python-url]
10 | [![Documentation][docs-badge]][docs-url]
11 | [![Specification][spec-badge]][spec-url]
12 | [![GitHub Discussions][discussions-badge]][discussions-url]
13 |
14 | </div>
15 |
16 | <!-- omit in toc -->
17 | ## Table of Contents
18 |
19 | - [MCP Python SDK](#mcp-python-sdk)
20 | - [Overview](#overview)
21 | - [Installation](#installation)
22 | - [Adding MCP to your python project](#adding-mcp-to-your-python-project)
23 | - [Running the standalone MCP development tools](#running-the-standalone-mcp-development-tools)
24 | - [Quickstart](#quickstart)
25 | - [What is MCP?](#what-is-mcp)
26 | - [Core Concepts](#core-concepts)
27 | - [Server](#server)
28 | - [Resources](#resources)
29 | - [Tools](#tools)
30 | - [Prompts](#prompts)
31 | - [Images](#images)
32 | - [Context](#context)
33 | - [Running Your Server](#running-your-server)
34 | - [Development Mode](#development-mode)
35 | - [Claude Desktop Integration](#claude-desktop-integration)
36 | - [Direct Execution](#direct-execution)
37 | - [Mounting to an Existing ASGI Server](#mounting-to-an-existing-asgi-server)
38 | - [Examples](#examples)
39 | - [Echo Server](#echo-server)
40 | - [SQLite Explorer](#sqlite-explorer)
41 | - [Advanced Usage](#advanced-usage)
42 | - [Low-Level Server](#low-level-server)
43 | - [Writing MCP Clients](#writing-mcp-clients)
44 | - [MCP Primitives](#mcp-primitives)
45 | - [Server Capabilities](#server-capabilities)
46 | - [Documentation](#documentation)
47 | - [Contributing](#contributing)
48 | - [License](#license)
49 |
50 | [pypi-badge]: https://img.shields.io/pypi/v/mcp.svg
51 | [pypi-url]: https://pypi.org/project/mcp/
52 | [mit-badge]: https://img.shields.io/pypi/l/mcp.svg
53 | [mit-url]: https://github.com/modelcontextprotocol/python-sdk/blob/main/LICENSE
54 | [python-badge]: https://img.shields.io/pypi/pyversions/mcp.svg
55 | [python-url]: https://www.python.org/downloads/
56 | [docs-badge]: https://img.shields.io/badge/docs-modelcontextprotocol.io-blue.svg
57 | [docs-url]: https://modelcontextprotocol.io
58 | [spec-badge]: https://img.shields.io/badge/spec-spec.modelcontextprotocol.io-blue.svg
59 | [spec-url]: https://spec.modelcontextprotocol.io
60 | [discussions-badge]: https://img.shields.io/github/discussions/modelcontextprotocol/python-sdk
61 | [discussions-url]: https://github.com/modelcontextprotocol/python-sdk/discussions
62 |
63 | ## Overview
64 |
65 | The Model Context Protocol allows applications to provide context for LLMs in a standardized way, separating the concerns of providing context from the actual LLM interaction. This Python SDK implements the full MCP specification, making it easy to:
66 |
67 | - Build MCP clients that can connect to any MCP server
68 | - Create MCP servers that expose resources, prompts and tools
69 | - Use standard transports like stdio and SSE
70 | - Handle all MCP protocol messages and lifecycle events
71 |
72 | ## Installation
73 |
74 | ### Adding MCP to your python project
75 |
76 | We recommend using [uv](https://docs.astral.sh/uv/) to manage your Python projects. In a uv managed python project, add mcp to dependencies by:
77 |
78 | ```bash
79 | uv add "mcp[cli]"
80 | ```
81 |
82 | Alternatively, for projects using pip for dependencies:
83 | ```bash
84 | pip install mcp
85 | ```
86 |
87 | ### Running the standalone MCP development tools
88 |
89 | To run the mcp command with uv:
90 |
91 | ```bash
92 | uv run mcp
93 | ```
94 |
95 | ## Quickstart
96 |
97 | Let's create a simple MCP server that exposes a calculator tool and some data:
98 |
99 | ```python
100 | # server.py
101 | from mcp.server.fastmcp import FastMCP
102 |
103 | # Create an MCP server
104 | mcp = FastMCP("Demo")
105 |
106 |
107 | # Add an addition tool
108 | @mcp.tool()
109 | def add(a: int, b: int) -> int:
110 | """Add two numbers"""
111 | return a + b
112 |
113 |
114 | # Add a dynamic greeting resource
115 | @mcp.resource("greeting://{name}")
116 | def get_greeting(name: str) -> str:
117 | """Get a personalized greeting"""
118 | return f"Hello, {name}!"
119 | ```
120 |
121 | You can install this server in [Claude Desktop](https://claude.ai/download) and interact with it right away by running:
122 | ```bash
123 | mcp install server.py
124 | ```
125 |
126 | Alternatively, you can test it with the MCP Inspector:
127 | ```bash
128 | mcp dev server.py
129 | ```
130 |
131 | ## What is MCP?
132 |
133 | The [Model Context Protocol (MCP)](https://modelcontextprotocol.io) lets you build servers that expose data and functionality to LLM applications in a secure, standardized way. Think of it like a web API, but specifically designed for LLM interactions. MCP servers can:
134 |
135 | - Expose data through **Resources** (think of these sort of like GET endpoints; they are used to load information into the LLM's context)
136 | - Provide functionality through **Tools** (sort of like POST endpoints; they are used to execute code or otherwise produce a side effect)
137 | - Define interaction patterns through **Prompts** (reusable templates for LLM interactions)
138 | - And more!
139 |
140 | ## Core Concepts
141 |
142 | ### Server
143 |
144 | The FastMCP server is your core interface to the MCP protocol. It handles connection management, protocol compliance, and message routing:
145 |
146 | ```python
147 | # Add lifespan support for startup/shutdown with strong typing
148 | from contextlib import asynccontextmanager
149 | from collections.abc import AsyncIterator
150 | from dataclasses import dataclass
151 |
152 | from fake_database import Database # Replace with your actual DB type
153 |
154 | from mcp.server.fastmcp import Context, FastMCP
155 |
156 | # Create a named server
157 | mcp = FastMCP("My App")
158 |
159 | # Specify dependencies for deployment and development
160 | mcp = FastMCP("My App", dependencies=["pandas", "numpy"])
161 |
162 |
163 | @dataclass
164 | class AppContext:
165 | db: Database
166 |
167 |
168 | @asynccontextmanager
169 | async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
170 | """Manage application lifecycle with type-safe context"""
171 | # Initialize on startup
172 | db = await Database.connect()
173 | try:
174 | yield AppContext(db=db)
175 | finally:
176 | # Cleanup on shutdown
177 | await db.disconnect()
178 |
179 |
180 | # Pass lifespan to server
181 | mcp = FastMCP("My App", lifespan=app_lifespan)
182 |
183 |
184 | # Access type-safe lifespan context in tools
185 | @mcp.tool()
186 | def query_db(ctx: Context) -> str:
187 | """Tool that uses initialized resources"""
188 | db = ctx.request_context.lifespan_context["db"]
189 | return db.query()
190 | ```
191 |
192 | ### Resources
193 |
194 | Resources are how you expose data to LLMs. They're similar to GET endpoints in a REST API - they provide data but shouldn't perform significant computation or have side effects:
195 |
196 | ```python
197 | from mcp.server.fastmcp import FastMCP
198 |
199 | mcp = FastMCP("My App")
200 |
201 |
202 | @mcp.resource("config://app")
203 | def get_config() -> str:
204 | """Static configuration data"""
205 | return "App configuration here"
206 |
207 |
208 | @mcp.resource("users://{user_id}/profile")
209 | def get_user_profile(user_id: str) -> str:
210 | """Dynamic user data"""
211 | return f"Profile data for user {user_id}"
212 | ```
213 |
214 | ### Tools
215 |
216 | Tools let LLMs take actions through your server. Unlike resources, tools are expected to perform computation and have side effects:
217 |
218 | ```python
219 | import httpx
220 | from mcp.server.fastmcp import FastMCP
221 |
222 | mcp = FastMCP("My App")
223 |
224 |
225 | @mcp.tool()
226 | def calculate_bmi(weight_kg: float, height_m: float) -> float:
227 | """Calculate BMI given weight in kg and height in meters"""
228 | return weight_kg / (height_m**2)
229 |
230 |
231 | @mcp.tool()
232 | async def fetch_weather(city: str) -> str:
233 | """Fetch current weather for a city"""
234 | async with httpx.AsyncClient() as client:
235 | response = await client.get(f"https://api.weather.com/{city}")
236 | return response.text
237 | ```
238 |
239 | ### Prompts
240 |
241 | Prompts are reusable templates that help LLMs interact with your server effectively:
242 |
243 | ```python
244 | from mcp.server.fastmcp import FastMCP
245 | from mcp.server.fastmcp.prompts import base
246 |
247 | mcp = FastMCP("My App")
248 |
249 |
250 | @mcp.prompt()
251 | def review_code(code: str) -> str:
252 | return f"Please review this code:\n\n{code}"
253 |
254 |
255 | @mcp.prompt()
256 | def debug_error(error: str) -> list[base.Message]:
257 | return [
258 | base.UserMessage("I'm seeing this error:"),
259 | base.UserMessage(error),
260 | base.AssistantMessage("I'll help debug that. What have you tried so far?"),
261 | ]
262 | ```
263 |
264 | ### Images
265 |
266 | FastMCP provides an `Image` class that automatically handles image data:
267 |
268 | ```python
269 | from mcp.server.fastmcp import FastMCP, Image
270 | from PIL import Image as PILImage
271 |
272 | mcp = FastMCP("My App")
273 |
274 |
275 | @mcp.tool()
276 | def create_thumbnail(image_path: str) -> Image:
277 | """Create a thumbnail from an image"""
278 | img = PILImage.open(image_path)
279 | img.thumbnail((100, 100))
280 | return Image(data=img.tobytes(), format="png")
281 | ```
282 |
283 | ### Context
284 |
285 | The Context object gives your tools and resources access to MCP capabilities:
286 |
287 | ```python
288 | from mcp.server.fastmcp import FastMCP, Context
289 |
290 | mcp = FastMCP("My App")
291 |
292 |
293 | @mcp.tool()
294 | async def long_task(files: list[str], ctx: Context) -> str:
295 | """Process multiple files with progress tracking"""
296 | for i, file in enumerate(files):
297 | ctx.info(f"Processing {file}")
298 | await ctx.report_progress(i, len(files))
299 | data, mime_type = await ctx.read_resource(f"file://{file}")
300 | return "Processing complete"
301 | ```
302 |
303 | ## Running Your Server
304 |
305 | ### Development Mode
306 |
307 | The fastest way to test and debug your server is with the MCP Inspector:
308 |
309 | ```bash
310 | mcp dev server.py
311 |
312 | # Add dependencies
313 | mcp dev server.py --with pandas --with numpy
314 |
315 | # Mount local code
316 | mcp dev server.py --with-editable .
317 | ```
318 |
319 | ### Claude Desktop Integration
320 |
321 | Once your server is ready, install it in Claude Desktop:
322 |
323 | ```bash
324 | mcp install server.py
325 |
326 | # Custom name
327 | mcp install server.py --name "My Analytics Server"
328 |
329 | # Environment variables
330 | mcp install server.py -v API_KEY=abc123 -v DB_URL=postgres://...
331 | mcp install server.py -f .env
332 | ```
333 |
334 | ### Direct Execution
335 |
336 | For advanced scenarios like custom deployments:
337 |
338 | ```python
339 | from mcp.server.fastmcp import FastMCP
340 |
341 | mcp = FastMCP("My App")
342 |
343 | if __name__ == "__main__":
344 | mcp.run()
345 | ```
346 |
347 | Run it with:
348 | ```bash
349 | python server.py
350 | # or
351 | mcp run server.py
352 | ```
353 |
354 | ### Mounting to an Existing ASGI Server
355 |
356 | You can mount the SSE server to an existing ASGI server using the `sse_app` method. This allows you to integrate the SSE server with other ASGI applications.
357 |
358 | ```python
359 | from starlette.applications import Starlette
360 | from starlette.routing import Mount, Host
361 | from mcp.server.fastmcp import FastMCP
362 |
363 |
364 | mcp = FastMCP("My App")
365 |
366 | # Mount the SSE server to the existing ASGI server
367 | app = Starlette(
368 | routes=[
369 | Mount('/', app=mcp.sse_app()),
370 | ]
371 | )
372 |
373 | # or dynamically mount as host
374 | app.router.routes.append(Host('mcp.acme.corp', app=mcp.sse_app()))
375 | ```
376 |
377 | For more information on mounting applications in Starlette, see the [Starlette documentation](https://www.starlette.io/routing/#submounting-routes).
378 |
379 | ## Examples
380 |
381 | ### Echo Server
382 |
383 | A simple server demonstrating resources, tools, and prompts:
384 |
385 | ```python
386 | from mcp.server.fastmcp import FastMCP
387 |
388 | mcp = FastMCP("Echo")
389 |
390 |
391 | @mcp.resource("echo://{message}")
392 | def echo_resource(message: str) -> str:
393 | """Echo a message as a resource"""
394 | return f"Resource echo: {message}"
395 |
396 |
397 | @mcp.tool()
398 | def echo_tool(message: str) -> str:
399 | """Echo a message as a tool"""
400 | return f"Tool echo: {message}"
401 |
402 |
403 | @mcp.prompt()
404 | def echo_prompt(message: str) -> str:
405 | """Create an echo prompt"""
406 | return f"Please process this message: {message}"
407 | ```
408 |
409 | ### SQLite Explorer
410 |
411 | A more complex example showing database integration:
412 |
413 | ```python
414 | import sqlite3
415 |
416 | from mcp.server.fastmcp import FastMCP
417 |
418 | mcp = FastMCP("SQLite Explorer")
419 |
420 |
421 | @mcp.resource("schema://main")
422 | def get_schema() -> str:
423 | """Provide the database schema as a resource"""
424 | conn = sqlite3.connect("database.db")
425 | schema = conn.execute("SELECT sql FROM sqlite_master WHERE type='table'").fetchall()
426 | return "\n".join(sql[0] for sql in schema if sql[0])
427 |
428 |
429 | @mcp.tool()
430 | def query_data(sql: str) -> str:
431 | """Execute SQL queries safely"""
432 | conn = sqlite3.connect("database.db")
433 | try:
434 | result = conn.execute(sql).fetchall()
435 | return "\n".join(str(row) for row in result)
436 | except Exception as e:
437 | return f"Error: {str(e)}"
438 | ```
439 |
440 | ## Advanced Usage
441 |
442 | ### Low-Level Server
443 |
444 | For more control, you can use the low-level server implementation directly. This gives you full access to the protocol and allows you to customize every aspect of your server, including lifecycle management through the lifespan API:
445 |
446 | ```python
447 | from contextlib import asynccontextmanager
448 | from collections.abc import AsyncIterator
449 |
450 | from fake_database import Database # Replace with your actual DB type
451 |
452 | from mcp.server import Server
453 |
454 |
455 | @asynccontextmanager
456 | async def server_lifespan(server: Server) -> AsyncIterator[dict]:
457 | """Manage server startup and shutdown lifecycle."""
458 | # Initialize resources on startup
459 | db = await Database.connect()
460 | try:
461 | yield {"db": db}
462 | finally:
463 | # Clean up on shutdown
464 | await db.disconnect()
465 |
466 |
467 | # Pass lifespan to server
468 | server = Server("example-server", lifespan=server_lifespan)
469 |
470 |
471 | # Access lifespan context in handlers
472 | @server.call_tool()
473 | async def query_db(name: str, arguments: dict) -> list:
474 | ctx = server.request_context
475 | db = ctx.lifespan_context["db"]
476 | return await db.query(arguments["query"])
477 | ```
478 |
479 | The lifespan API provides:
480 | - A way to initialize resources when the server starts and clean them up when it stops
481 | - Access to initialized resources through the request context in handlers
482 | - Type-safe context passing between lifespan and request handlers
483 |
484 | ```python
485 | import mcp.server.stdio
486 | import mcp.types as types
487 | from mcp.server.lowlevel import NotificationOptions, Server
488 | from mcp.server.models import InitializationOptions
489 |
490 | # Create a server instance
491 | server = Server("example-server")
492 |
493 |
494 | @server.list_prompts()
495 | async def handle_list_prompts() -> list[types.Prompt]:
496 | return [
497 | types.Prompt(
498 | name="example-prompt",
499 | description="An example prompt template",
500 | arguments=[
501 | types.PromptArgument(
502 | name="arg1", description="Example argument", required=True
503 | )
504 | ],
505 | )
506 | ]
507 |
508 |
509 | @server.get_prompt()
510 | async def handle_get_prompt(
511 | name: str, arguments: dict[str, str] | None
512 | ) -> types.GetPromptResult:
513 | if name != "example-prompt":
514 | raise ValueError(f"Unknown prompt: {name}")
515 |
516 | return types.GetPromptResult(
517 | description="Example prompt",
518 | messages=[
519 | types.PromptMessage(
520 | role="user",
521 | content=types.TextContent(type="text", text="Example prompt text"),
522 | )
523 | ],
524 | )
525 |
526 |
527 | async def run():
528 | async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
529 | await server.run(
530 | read_stream,
531 | write_stream,
532 | InitializationOptions(
533 | server_name="example",
534 | server_version="0.1.0",
535 | capabilities=server.get_capabilities(
536 | notification_options=NotificationOptions(),
537 | experimental_capabilities={},
538 | ),
539 | ),
540 | )
541 |
542 |
543 | if __name__ == "__main__":
544 | import asyncio
545 |
546 | asyncio.run(run())
547 | ```
548 |
549 | ### Writing MCP Clients
550 |
551 | The SDK provides a high-level client interface for connecting to MCP servers:
552 |
553 | ```python
554 | from mcp import ClientSession, StdioServerParameters, types
555 | from mcp.client.stdio import stdio_client
556 |
557 | # Create server parameters for stdio connection
558 | server_params = StdioServerParameters(
559 | command="python", # Executable
560 | args=["example_server.py"], # Optional command line arguments
561 | env=None, # Optional environment variables
562 | )
563 |
564 |
565 | # Optional: create a sampling callback
566 | async def handle_sampling_message(
567 | message: types.CreateMessageRequestParams,
568 | ) -> types.CreateMessageResult:
569 | return types.CreateMessageResult(
570 | role="assistant",
571 | content=types.TextContent(
572 | type="text",
573 | text="Hello, world! from model",
574 | ),
575 | model="gpt-4.1-mini",
576 | stopReason="endTurn",
577 | )
578 |
579 |
580 | async def run():
581 | async with stdio_client(server_params) as (read, write):
582 | async with ClientSession(
583 | read, write, sampling_callback=handle_sampling_message
584 | ) as session:
585 | # Initialize the connection
586 | await session.initialize()
587 |
588 | # List available prompts
589 | prompts = await session.list_prompts()
590 |
591 | # Get a prompt
592 | prompt = await session.get_prompt(
593 | "example-prompt", arguments={"arg1": "value"}
594 | )
595 |
596 | # List available resources
597 | resources = await session.list_resources()
598 |
599 | # List available tools
600 | tools = await session.list_tools()
601 |
602 | # Read a resource
603 | content, mime_type = await session.read_resource("file://some/path")
604 |
605 | # Call a tool
606 | result = await session.call_tool("tool-name", arguments={"arg1": "value"})
607 |
608 |
609 | if __name__ == "__main__":
610 | import asyncio
611 |
612 | asyncio.run(run())
613 | ```
614 |
615 | ### MCP Primitives
616 |
617 | The MCP protocol defines three core primitives that servers can implement:
618 |
619 | | Primitive | Control | Description | Example Use |
620 | |-----------|-----------------------|-----------------------------------------------------|------------------------------|
621 | | Prompts | User-controlled | Interactive templates invoked by user choice | Slash commands, menu options |
622 | | Resources | Application-controlled| Contextual data managed by the client application | File contents, API responses |
623 | | Tools | Model-controlled | Functions exposed to the LLM to take actions | API calls, data updates |
624 |
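For orientation, here is a minimal FastMCP sketch that declares one primitive of each kind side by side. The names (`word_count`, `config://app`, `review_text`) are illustrative and not part of this project's tool set:

```python
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Primitives Demo")

# Tool: model-controlled; the LLM decides when to invoke it
@mcp.tool()
def word_count(text: str) -> int:
    """Count the words in a piece of text"""
    return len(text.split())

# Resource: application-controlled; the client decides what context to attach
@mcp.resource("config://app")
def app_config() -> str:
    """Expose static configuration data"""
    return "debug=false\nregion=us-east-1"

# Prompt: user-controlled; surfaced as a reusable template (e.g. a slash command)
@mcp.prompt()
def review_text(text: str) -> str:
    """Ask the model to review a piece of text for clarity"""
    return f"Please review the following text for clarity:\n\n{text}"

if __name__ == "__main__":
    mcp.run()
```

Which primitive to use generally follows the control column above: anything the model should be able to call on its own is a tool, while data the host application curates belongs in resources.
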
625 | ### Server Capabilities
626 |
627 | MCP servers declare capabilities during initialization:
628 |
629 | | Capability | Feature Flag | Description |
630 | |-------------|------------------------------|------------------------------------|
631 | | `prompts` | `listChanged` | Prompt template management |
632 | | `resources` | `subscribe`<br/>`listChanged`| Resource exposure and updates |
633 | | `tools` | `listChanged` | Tool discovery and execution |
634 | | `logging` | - | Server logging configuration |
635 | | `completion`| - | Argument completion suggestions |
636 |
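On the client side, the declared capabilities arrive in the result of `initialize()`. Below is a sketch, under the assumption that the `example_server.py` from the client example above is being launched, of a defensive client that only calls the listing APIs the server actually advertises:

```python
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

server_params = StdioServerParameters(command="python", args=["example_server.py"])


async def check_capabilities():
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            init_result = await session.initialize()
            caps = init_result.capabilities

            # Only call the listing APIs that the server declared during initialization
            if caps.tools is not None:
                tools = await session.list_tools()
                print("tools:", [t.name for t in tools.tools])
            if caps.resources is not None:
                resources = await session.list_resources()
                print("resources:", [str(r.uri) for r in resources.resources])
            if caps.prompts is not None:
                prompts = await session.list_prompts()
                print("prompts:", [p.name for p in prompts.prompts])


if __name__ == "__main__":
    import asyncio

    asyncio.run(check_capabilities())
```
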
637 | ## Tool Composition Patterns
638 |
639 | When building complex workflows with MCP, tools often need to be chained so that the output of one step becomes the input of the next:
640 |
641 | ```python
642 | from mcp.server.fastmcp import FastMCP, Context
643 |
644 | mcp = FastMCP("Analytics Pipeline")
645 |
646 | @mcp.tool()
647 | async def fetch_data(source: str, date_range: str, ctx: Context) -> str:
648 | """Fetch raw data from a source for analysis"""
649 | # Fetch operation that might be slow
650 | await ctx.report_progress(0.3, 1.0)
651 | return f"Data from {source} for {date_range}"
652 |
653 | @mcp.tool()
654 | def transform_data(raw_data: str, format_type: str = "json") -> dict:
655 | """Transform raw data into structured format"""
656 | # Data transformation logic
657 | return {"processed": raw_data, "format": format_type}
658 |
659 | @mcp.tool()
660 | def analyze_data(data: dict, metric: str) -> str:
661 | """Analyze transformed data with specific metrics"""
662 | # Analysis logic
663 | return f"Analysis of {metric}: Result based on {data['processed']}"
664 |
665 | # Usage pattern (for LLMs):
666 | # 1. First fetch the raw data
667 | # 2. Transform the fetched data
668 | # 3. Then analyze the transformed result
669 | ```
670 |
671 | **Pattern: Sequential Dependency Chain**
672 | ```
673 | fetch_data → transform_data → analyze_data
674 | ```
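
If the chain always runs in the same order, it can also be exposed as a single composite tool so the model only needs one call. A minimal sketch reusing the three functions above (the composite name `run_pipeline` is illustrative):

```python
@mcp.tool()
async def run_pipeline(source: str, date_range: str, metric: str, ctx: Context) -> str:
    """Run the full fetch -> transform -> analyze chain as a single call"""
    raw = await fetch_data(source, date_range, ctx)  # step 1: fetch
    structured = transform_data(raw)                 # step 2: transform
    return analyze_data(structured, metric)          # step 3: analyze
```

The trade-off: a composite tool saves round-trips, while keeping the tools separate gives the model visibility into intermediate results and the freedom to branch between steps.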
675 |
676 | **Pattern: Parallel Processing with Aggregation**
677 | ```python
678 | import asyncio  # for asyncio.gather
679 | 
680 | @mcp.tool()
681 | async def parallel_process(sources: list[str], ctx: Context) -> dict:
682 |     """Process multiple sources concurrently and aggregate the results"""
683 |     async def process_one(source: str) -> tuple[str, dict]:
684 |         # Fetch and transform each source (these could be separate tool calls)
685 |         data = await fetch_data(source, "last_week", ctx)
686 |         return source, transform_data(data)
687 |     # asyncio.gather runs every source concurrently; aggregate into one dict
688 |     return dict(await asyncio.gather(*(process_one(s) for s in sources)))
689 | ```
690 |
691 | ## Error Recovery Strategies
692 |
693 | When tools fail or return unexpected results, LLMs should follow these recovery patterns:
694 |
695 | **Strategy: Retry with Backoff**
696 | ```python
697 | @mcp.tool()
698 | async def resilient_operation(resource_id: str, ctx: Context) -> str:
699 |     """Example of a resilient operation with retry logic"""
700 |     MAX_ATTEMPTS = 3
701 |     for attempt in range(1, MAX_ATTEMPTS + 1):
702 |         try:
703 |             # Attempt the operation (real work that can raise goes here)
704 |             return f"Successfully processed {resource_id}"
705 |         except Exception as e:
706 |             if attempt == MAX_ATTEMPTS:
707 |                 # On the final attempt, report the failure clearly
708 |                 await ctx.warning(f"Operation failed after {MAX_ATTEMPTS} attempts: {str(e)}")
709 |                 return f"ERROR: Could not process {resource_id} - {str(e)}"
710 |             # For earlier attempts, log and retry
711 |             await ctx.info(f"Attempt {attempt} failed, retrying...")
712 |             await asyncio.sleep(2 ** attempt)  # Exponential backoff (assumes "import asyncio")
713 | ```
714 |
715 | **Strategy: Fallback Chain**
716 | ```python
717 | @mcp.tool()
718 | async def get_data_with_fallbacks(primary_source: str, fallback_sources: list[str] | None = None) -> dict:
719 |     """Try multiple data sources in order until one succeeds"""
720 |     sources = [primary_source] + (fallback_sources or [])
721 | 
722 |     errors = []
723 |     for source in sources:
724 |         try:
725 |             # Try to get data from this source (a real fetch that may raise goes here)
726 |             result = {"source": source, "data": f"Data from {source}"}
727 |             return result
728 |         except Exception as e:
729 |             # Record the error and try the next source
730 |             errors.append(f"{source}: {str(e)}")
731 | 
732 |     # If all sources failed, return a clear error with history
733 |     return {"error": "All sources failed", "attempts": errors}
734 | ```
735 |
736 | **Error Reporting Best Practices** (see the sketch after this list)
737 | - Always return structured error information (not just exception text)
738 | - Include specific error codes when possible
739 | - Provide actionable suggestions for recovery
740 | - Log detailed error context for debugging
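
As a rough illustration of these points, here is a hypothetical tool that returns a structured error payload instead of bare exception text. It reuses the `mcp` server and `Context` from the pipeline example above, and the `code`/`suggestion` fields are just one possible shape:

```python
@mcp.tool()
async def delete_record(record_id: str, ctx: Context) -> dict:
    """Delete a record, returning structured error information on failure"""
    try:
        if not record_id.startswith("rec_"):  # stand-in for a real failure mode
            raise ValueError(f"malformed record id: {record_id}")
        # ... the real delete would happen here ...
        return {"status": "ok", "record_id": record_id}
    except ValueError as e:
        # Log detailed context for debugging, then return a structured, actionable error
        await ctx.warning(f"delete_record failed: {e}")
        return {
            "status": "error",
            "code": "INVALID_RECORD_ID",
            "message": str(e),
            "suggestion": "Record ids look like 'rec_123'; check the id and retry",
        }
```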
741 |
742 | ## Resource Selection Optimization
743 |
744 | Efficiently managing resources within context limits requires strategic selection:
745 |
746 | **Progressive Loading Pattern**
747 | ```python
748 | @mcp.tool()
749 | async def analyze_document(doc_uri: str, ctx: Context) -> str:
750 | """Analyze a document with progressively loaded sections"""
751 | # First load metadata for quick access
752 | metadata = await ctx.read_resource(f"{doc_uri}/metadata")
753 |
754 | # Based on metadata, selectively load relevant sections
755 |     relevant_sections = identify_relevant_sections(metadata)  # app-specific helper, not shown here
756 |
757 | # Only load sections that are actually needed
758 | section_data = {}
759 | for section in relevant_sections:
760 | section_data[section] = await ctx.read_resource(f"{doc_uri}/sections/{section}")
761 |
762 | # Process with only the necessary context
763 | return f"Analysis of {len(section_data)} relevant sections"
764 | ```
765 |
766 | **Context Budget Management**
767 | ```python
768 | @mcp.tool()
769 | async def summarize_large_dataset(dataset_uri: str, ctx: Context) -> str:
770 | """Summarize a large dataset while respecting context limits"""
771 | # Get total size to plan the approach
772 | metadata = await ctx.read_resource(f"{dataset_uri}/metadata")
773 | total_size = metadata.get("size_kb", 0)
774 |
775 | if total_size > 100: # Arbitrary threshold
776 | # For large datasets, use chunking approach
777 | chunks = await ctx.read_resource(f"{dataset_uri}/summary_chunks")
778 | return f"Summary of {len(chunks)} chunks: {', '.join(chunks)}"
779 | else:
780 | # For smaller datasets, process everything at once
781 | full_data = await ctx.read_resource(dataset_uri)
782 | return f"Complete analysis of {dataset_uri}"
783 | ```
784 |
785 | **Resource Relevance Filtering** (see the sketch after this list)
786 | - Focus on the most recent/relevant data first
787 | - Filter resources to match the specific query intent
788 | - Use metadata to decide which resources to load
789 | - Prefer sampling representative data over loading everything
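
A rough sketch of metadata-based filtering, reusing the `mcp` server and `Context` from the earlier examples. The `/metadata` sub-resource and the keyword heuristic are assumptions for illustration; a real implementation might rank by embeddings or a search index instead:

```python
@mcp.tool()
async def answer_from_reports(question: str, report_uris: list[str], ctx: Context) -> str:
    """Answer a question using only the most relevant reports"""
    # Cheap pass: read lightweight metadata for every report first
    candidates = []
    for uri in report_uris:
        meta = await ctx.read_resource(f"{uri}/metadata")
        candidates.append((uri, str(meta).lower()))

    # Keep only reports whose metadata mentions a keyword from the question,
    # capped at three to protect the context budget
    keywords = {word.lower() for word in question.split() if len(word) > 3}
    relevant = [uri for uri, meta in candidates if any(k in meta for k in keywords)][:3]

    # Expensive pass: load full content only for the selected reports
    bodies = [await ctx.read_resource(uri) for uri in relevant]
    return f"Answering from {len(bodies)} of {len(report_uris)} reports"
```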
790 |
791 | ## Documentation
792 |
793 | - [Model Context Protocol documentation](https://modelcontextprotocol.io)
794 | - [Model Context Protocol specification](https://spec.modelcontextprotocol.io)
795 | - [Officially supported servers](https://github.com/modelcontextprotocol/servers)
796 |
797 | ## Contributing
798 |
799 | We are passionate about supporting contributors of all levels of experience and would love to see you get involved in the project. See the [contributing guide](CONTRIBUTING.md) to get started.
800 |
801 | ## License
802 |
803 | This project is licensed under the MIT License - see the LICENSE file for details.
```