#
tokens: 49114/50000 8/207 files (page 8/45)
lines: on (toggle) GitHub
raw markdown copy reset
This is page 8 of 45. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?lines=true&page={x} to view the full context.

# Directory Structure

```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│   ├── __init__.py
│   ├── advanced_agent_flows_using_unified_memory_system_demo.py
│   ├── advanced_extraction_demo.py
│   ├── advanced_unified_memory_system_demo.py
│   ├── advanced_vector_search_demo.py
│   ├── analytics_reporting_demo.py
│   ├── audio_transcription_demo.py
│   ├── basic_completion_demo.py
│   ├── cache_demo.py
│   ├── claude_integration_demo.py
│   ├── compare_synthesize_demo.py
│   ├── cost_optimization.py
│   ├── data
│   │   ├── sample_event.txt
│   │   ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│   │   └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│   ├── docstring_refiner_demo.py
│   ├── document_conversion_and_processing_demo.py
│   ├── entity_relation_graph_demo.py
│   ├── filesystem_operations_demo.py
│   ├── grok_integration_demo.py
│   ├── local_text_tools_demo.py
│   ├── marqo_fused_search_demo.py
│   ├── measure_model_speeds.py
│   ├── meta_api_demo.py
│   ├── multi_provider_demo.py
│   ├── ollama_integration_demo.py
│   ├── prompt_templates_demo.py
│   ├── python_sandbox_demo.py
│   ├── rag_example.py
│   ├── research_workflow_demo.py
│   ├── sample
│   │   ├── article.txt
│   │   ├── backprop_paper.pdf
│   │   ├── buffett.pdf
│   │   ├── contract_link.txt
│   │   ├── legal_contract.txt
│   │   ├── medical_case.txt
│   │   ├── northwind.db
│   │   ├── research_paper.txt
│   │   ├── sample_data.json
│   │   └── text_classification_samples
│   │       ├── email_classification.txt
│   │       ├── news_samples.txt
│   │       ├── product_reviews.txt
│   │       └── support_tickets.txt
│   ├── sample_docs
│   │   └── downloaded
│   │       └── attention_is_all_you_need.pdf
│   ├── sentiment_analysis_demo.py
│   ├── simple_completion_demo.py
│   ├── single_shot_synthesis_demo.py
│   ├── smart_browser_demo.py
│   ├── sql_database_demo.py
│   ├── sse_client_demo.py
│   ├── test_code_extraction.py
│   ├── test_content_detection.py
│   ├── test_ollama.py
│   ├── text_classification_demo.py
│   ├── text_redline_demo.py
│   ├── tool_composition_examples.py
│   ├── tournament_code_demo.py
│   ├── tournament_text_demo.py
│   ├── unified_memory_system_demo.py
│   ├── vector_search_demo.py
│   ├── web_automation_instruction_packs.py
│   └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│   └── smart_browser_internal
│       ├── locator_cache.db
│       ├── readability.js
│       └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── integration
│   │   ├── __init__.py
│   │   └── test_server.py
│   ├── manual
│   │   ├── test_extraction_advanced.py
│   │   └── test_extraction.py
│   └── unit
│       ├── __init__.py
│       ├── test_cache.py
│       ├── test_providers.py
│       └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│   ├── __init__.py
│   ├── __main__.py
│   ├── cli
│   │   ├── __init__.py
│   │   ├── __main__.py
│   │   ├── commands.py
│   │   ├── helpers.py
│   │   └── typer_cli.py
│   ├── clients
│   │   ├── __init__.py
│   │   ├── completion_client.py
│   │   └── rag_client.py
│   ├── config
│   │   └── examples
│   │       └── filesystem_config.yaml
│   ├── config.py
│   ├── constants.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── evaluation
│   │   │   ├── base.py
│   │   │   └── evaluators.py
│   │   ├── providers
│   │   │   ├── __init__.py
│   │   │   ├── anthropic.py
│   │   │   ├── base.py
│   │   │   ├── deepseek.py
│   │   │   ├── gemini.py
│   │   │   ├── grok.py
│   │   │   ├── ollama.py
│   │   │   ├── openai.py
│   │   │   └── openrouter.py
│   │   ├── server.py
│   │   ├── state_store.py
│   │   ├── tournaments
│   │   │   ├── manager.py
│   │   │   ├── tasks.py
│   │   │   └── utils.py
│   │   └── ums_api
│   │       ├── __init__.py
│   │       ├── ums_database.py
│   │       ├── ums_endpoints.py
│   │       ├── ums_models.py
│   │       └── ums_services.py
│   ├── exceptions.py
│   ├── graceful_shutdown.py
│   ├── services
│   │   ├── __init__.py
│   │   ├── analytics
│   │   │   ├── __init__.py
│   │   │   ├── metrics.py
│   │   │   └── reporting.py
│   │   ├── cache
│   │   │   ├── __init__.py
│   │   │   ├── cache_service.py
│   │   │   ├── persistence.py
│   │   │   ├── strategies.py
│   │   │   └── utils.py
│   │   ├── cache.py
│   │   ├── document.py
│   │   ├── knowledge_base
│   │   │   ├── __init__.py
│   │   │   ├── feedback.py
│   │   │   ├── manager.py
│   │   │   ├── rag_engine.py
│   │   │   ├── retriever.py
│   │   │   └── utils.py
│   │   ├── prompts
│   │   │   ├── __init__.py
│   │   │   ├── repository.py
│   │   │   └── templates.py
│   │   ├── prompts.py
│   │   └── vector
│   │       ├── __init__.py
│   │       ├── embeddings.py
│   │       └── vector_service.py
│   ├── tool_token_counter.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── audio_transcription.py
│   │   ├── base.py
│   │   ├── completion.py
│   │   ├── docstring_refiner.py
│   │   ├── document_conversion_and_processing.py
│   │   ├── enhanced-ums-lookbook.html
│   │   ├── entity_relation_graph.py
│   │   ├── excel_spreadsheet_automation.py
│   │   ├── extraction.py
│   │   ├── filesystem.py
│   │   ├── html_to_markdown.py
│   │   ├── local_text_tools.py
│   │   ├── marqo_fused_search.py
│   │   ├── meta_api_tool.py
│   │   ├── ocr_tools.py
│   │   ├── optimization.py
│   │   ├── provider.py
│   │   ├── pyodide_boot_template.html
│   │   ├── python_sandbox.py
│   │   ├── rag.py
│   │   ├── redline-compiled.css
│   │   ├── sentiment_analysis.py
│   │   ├── single_shot_synthesis.py
│   │   ├── smart_browser.py
│   │   ├── sql_databases.py
│   │   ├── text_classification.py
│   │   ├── text_redline_tools.py
│   │   ├── tournament.py
│   │   ├── ums_explorer.html
│   │   └── unified_memory_system.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── async_utils.py
│   │   ├── display.py
│   │   ├── logging
│   │   │   ├── __init__.py
│   │   │   ├── console.py
│   │   │   ├── emojis.py
│   │   │   ├── formatter.py
│   │   │   ├── logger.py
│   │   │   ├── panels.py
│   │   │   ├── progress.py
│   │   │   └── themes.py
│   │   ├── parse_yaml.py
│   │   ├── parsing.py
│   │   ├── security.py
│   │   └── text.py
│   └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/ultimate_mcp_server/services/document.py:
--------------------------------------------------------------------------------

```python
  1 | """Document processing service for chunking and analyzing text documents."""
  2 | import re
  3 | from typing import List
  4 | 
  5 | from ultimate_mcp_server.utils import get_logger
  6 | 
  7 | logger = get_logger(__name__)
  8 | 
  9 | 
 10 | class DocumentProcessor:
 11 |     """
 12 |     Service for intelligent text document processing, chunking, and preparation.
 13 |     
 14 |     The DocumentProcessor provides sophisticated document handling capabilities
 15 |     focused on breaking down long documents into meaningful, properly-sized chunks
 16 |     optimized for various downstream NLP tasks such as embedding generation,
 17 |     semantic search, and RAG (Retrieval Augmented Generation).
 18 |     
 19 |     Key Features:
 20 |     - Multiple chunking strategies optimized for different content types
 21 |     - Configurable chunk size and overlap parameters
 22 |     - Semantic-aware chunking that preserves context and meaning
 23 |     - Sentence boundary detection for natural text segmentation
 24 |     - Token-based chunking for precise size control
 25 |     - Singleton implementation for efficient resource usage
 26 |     
 27 |     Chunking Methods:
 28 |     1. Semantic Chunking: Preserves paragraph structure and semantic meaning,
 29 |        preventing splits that would break logical content boundaries. Best for
 30 |        maintaining context in well-structured documents.
 31 |     
 32 |     2. Sentence Chunking: Splits documents at sentence boundaries, ensuring
 33 |        no sentence is broken across chunks. Ideal for natural language text
 34 |        where sentence integrity is important.
 35 |     
 36 |     3. Token Chunking: Divides text based on approximate token counts without
 37 |        special consideration for semantic boundaries. Provides the most precise
 38 |        control over chunk size for token-limited systems.
 39 |     
 40 |     Each method implements configurable overlap between chunks to maintain
 41 |     context across chunk boundaries, ensuring information isn't lost when a
 42 |     concept spans multiple chunks.
 43 |     
 44 |     Usage Example:
 45 |     ```python
 46 |     processor = get_document_processor()
 47 |     
 48 |     # Chunk a document with default settings (token-based)
 49 |     chunks = await processor.chunk_document(
 50 |         document=long_text,
 51 |         chunk_size=1000,
 52 |         chunk_overlap=200
 53 |     )
 54 |     
 55 |     # Use semantic chunking for a well-structured document
 56 |     semantic_chunks = await processor.chunk_document(
 57 |         document=article_text,
 58 |         chunk_size=1500,
 59 |         chunk_overlap=150,
 60 |         method="semantic"
 61 |     )
 62 |     
 63 |     # Process chunks for embedding or RAG
 64 |     for chunk in chunks:
 65 |         # Process each chunk...
 66 |     ```
 67 |     
 68 |     Note:
 69 |         This service implements the singleton pattern, ensuring only one instance
 70 |         exists throughout the application. Always use the get_document_processor()
 71 |         function to obtain the shared instance rather than creating instances directly.
 72 |     """
 73 |     
 74 |     _instance = None
 75 |     
 76 |     def __new__(cls, *args, **kwargs):
 77 |         """Create a singleton instance."""
 78 |         if cls._instance is None:
 79 |             cls._instance = super(DocumentProcessor, cls).__new__(cls)
 80 |             cls._instance._initialized = False
 81 |         return cls._instance
 82 |     
 83 |     def __init__(self):
 84 |         """Initialize the document processor."""
 85 |         # Only initialize once for singleton
 86 |         if getattr(self, "_initialized", False):
 87 |             return
 88 |             
 89 |         logger.info("Document processor initialized", extra={"emoji_key": "success"})
 90 |         self._initialized = True
 91 |     
 92 |     async def chunk_document(
 93 |         self,
 94 |         document: str,
 95 |         chunk_size: int = 1000,
 96 |         chunk_overlap: int = 200,
 97 |         method: str = "token"
 98 |     ) -> List[str]:
 99 |         """
100 |         Split a document into optimally sized, potentially overlapping chunks.
101 |         
102 |         This method intelligently divides a document into smaller segments using
103 |         one of several chunking strategies, balancing chunk size requirements with
104 |         preserving semantic coherence. The chunking process is critical for preparing
105 |         documents for embedding, retrieval, and other NLP operations that have
106 |         input size limitations or benefit from focused context.
107 |         
108 |         Chunking Methods:
109 |         - "token": (Default) Splits text based on approximate token count.
110 |           Simple and precise for size control, but may break semantic units.
111 |         - "sentence": Preserves sentence boundaries, ensuring no sentence is broken
112 |           across chunks. Better for maintaining local context and readability.
113 |         - "semantic": Most sophisticated approach that attempts to preserve paragraph
114 |           structure and semantic coherence. Best for maintaining document meaning
115 |           but may result in more size variation between chunks.
116 |         
117 |         The chunk_size parameter is approximate for all methods, as they prioritize
118 |         maintaining semantic boundaries where appropriate. The actual size of returned
119 |         chunks may vary, especially when using sentence or semantic methods.
120 |         
121 |         Chunk overlap creates a sliding window effect, where the end of one chunk
122 |         overlaps with the beginning of the next. This helps maintain context across
123 |         chunk boundaries and improves retrieval quality by ensuring concepts that
124 |         span multiple chunks can still be found.
125 |         
126 |         Selecting Parameters:
127 |         - For embedding models with strict token limits: Use "token" with chunk_size
128 |           set safely below the model's limit
129 |         - For maximizing context preservation: Use "semantic" with larger overlap
130 |         - For balancing size precision and sentence integrity: Use "sentence"
131 |         - Larger overlap (25-50% of chunk_size) improves retrieval quality but
132 |           increases storage and processing requirements
133 |         
134 |         Args:
135 |             document: Text content to be chunked
136 |             chunk_size: Target size of each chunk in approximate tokens (default: 1000)
137 |             chunk_overlap: Number of tokens to overlap between chunks (default: 200)
138 |             method: Chunking strategy to use ("token", "sentence", or "semantic")
139 |             
140 |         Returns:
141 |             List of text chunks derived from the original document
142 |             
143 |         Note:
144 |             Returns an empty list if the input document is empty or None.
145 |             The token estimation is approximate and based on whitespace splitting,
146 |             not a true tokenizer, so actual token counts may differ when processed
147 |             by specific models.
148 |         """
149 |         if not document:
150 |             return []
151 |             
152 |         logger.debug(
153 |             f"Chunking document using method '{method}' (size: {chunk_size}, overlap: {chunk_overlap})",
154 |             extra={"emoji_key": "processing"}
155 |         )
156 |         
157 |         if method == "semantic":
158 |             return await self._chunk_semantic(document, chunk_size, chunk_overlap)
159 |         elif method == "sentence":
160 |             return await self._chunk_by_sentence(document, chunk_size, chunk_overlap)
161 |         else:
162 |             # Default to token-based chunking
163 |             return await self._chunk_by_tokens(document, chunk_size, chunk_overlap)
164 |     
165 |     async def _chunk_by_tokens(
166 |         self,
167 |         document: str,
168 |         chunk_size: int = 1000,
169 |         chunk_overlap: int = 200
170 |     ) -> List[str]:
171 |         """
172 |         Split document into chunks by approximate token count without preserving semantic structures.
173 |         
174 |         This is the most straightforward chunking method, dividing text based solely
175 |         on approximate token counts without special consideration for sentence or
176 |         paragraph boundaries. It provides the most predictable and precise control
177 |         over chunk sizes at the cost of potentially breaking semantic units like
178 |         sentences or paragraphs.
179 |         
180 |         Algorithm implementation:
181 |         1. Approximates tokens by splitting text on whitespace (creating "words")
182 |         2. Divides the document into chunks of specified token length
183 |         3. Implements sliding window overlaps between consecutive chunks
184 |         4. Handles edge cases like empty documents and final chunks
185 |         
186 |         The token approximation used is simple whitespace splitting, which provides
187 |         a reasonable estimation for most Western languages and common tokenization
188 |         schemes. While not as accurate as model-specific tokenizers, it offers a
189 |         good balance between performance and approximation quality for general use.
190 |         
191 |         Chunk overlap is implemented by including tokens from the end of one chunk
192 |         at the beginning of the next, creating a sliding window effect that helps
193 |         maintain context across chunk boundaries.
194 |         
195 |         This method is ideal for:
196 |         - Working with strict token limits in downstream models
197 |         - Processing text where exact chunk sizes are more important than 
198 |           preserving semantic structures
199 |         - High-volume processing where simplicity and performance are priorities
200 |         - Text with unusual or inconsistent formatting where sentence/paragraph
201 |           detection might fail
202 |         
203 |         Args:
204 |             document: Text content to split by tokens
205 |             chunk_size: Number of tokens (words) per chunk
206 |             chunk_overlap: Number of tokens to overlap between chunks
207 |             
208 |         Returns:
209 |             List of text chunks of approximately equal token counts
210 |             
211 |         Note:
212 |             True token counts in NLP models may differ from this approximation,
213 |             especially for models with subword tokenization. For applications
214 |             requiring exact token counts, consider using the model's specific
215 |             tokenizer for more accurate size estimates.
216 |         """
217 |         # Simple token estimation (split by whitespace)
218 |         words = document.split()
219 |         
220 |         # No words, return empty list
221 |         if not words:
222 |             return []
223 |             
224 |         # Simple chunking
225 |         chunks = []
226 |         start = 0
227 |         
228 |         while start < len(words):
229 |             # Calculate end position with potential overlap
230 |             end = min(start + chunk_size, len(words))
231 |             
232 |             # Create chunk
233 |             chunk = " ".join(words[start:end])
234 |             chunks.append(chunk)
235 |             
236 |             # Move to next chunk with overlap
237 |             start = end - chunk_overlap
238 |             
239 |             # Avoid getting stuck at the end
240 |             if start >= len(words) - chunk_overlap:
241 |                 break
242 |         
243 |         logger.debug(
244 |             f"Split document into {len(chunks)} chunks by token",
245 |             extra={"emoji_key": "processing"}
246 |         )
247 |         
248 |         return chunks
249 |     
250 |     async def _chunk_by_sentence(
251 |         self,
252 |         document: str,
253 |         chunk_size: int = 1000,
254 |         chunk_overlap: int = 200
255 |     ) -> List[str]:
256 |         """
257 |         Split document into chunks by preserving complete sentences.
258 |         
259 |         This chunking method respects sentence boundaries when dividing documents,
260 |         ensuring that no sentence is fragmented across multiple chunks. It balances
261 |         chunk size requirements with maintaining the integrity of natural language
262 |         structures, producing more readable and semantically coherent chunks than
263 |         simple token-based approaches.
264 |         
265 |         Algorithm details:
266 |         1. Detects sentence boundaries using regular expressions that handle:
267 |            - Standard end punctuation (.!?)
268 |            - Common abbreviations (Mr., Dr., etc.)
269 |            - Edge cases like decimal numbers or acronyms
270 |         2. Builds chunks by adding complete sentences until the target chunk size
271 |            is approached
272 |         3. Creates overlap between chunks by including ending sentences from the
273 |            previous chunk at the beginning of the next chunk
274 |         4. Maintains approximate token count targets while prioritizing sentence
275 |            integrity
276 |         
277 |         The sentence detection uses a regex pattern that aims to balance accuracy
278 |         with simplicity and efficiency. It identifies likely sentence boundaries by:
279 |         - Looking for punctuation marks followed by whitespace
280 |         - Excluding common patterns that are not sentence boundaries (e.g., "Mr.")
281 |         - Handling basic cases like quotes and parentheses
282 |         
283 |         This method is ideal for:
284 |         - Natural language text where sentence flow is important
285 |         - Content where breaking mid-sentence would harm readability or context
286 |         - General purpose document processing where semantic units matter
287 |         - Documents that don't have clear paragraph structure
288 |         
289 |         Args:
290 |             document: Text content to split by sentences
291 |             chunk_size: Target approximate size per chunk in tokens
292 |             chunk_overlap: Number of tokens to overlap between chunks
293 |             
294 |         Returns:
295 |             List of document chunks with complete sentences
296 |             
297 |         Note:
298 |             The sentence detection uses regex patterns that work well for most
299 |             standard English text but may not handle all edge cases perfectly.
300 |             For specialized text with unusual punctuation patterns, additional
301 |             customization may be needed.
302 |         """
303 |         # Simple sentence splitting (not perfect but works for most cases)
304 |         sentence_delimiters = r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?|\!)\s'
305 |         sentences = re.split(sentence_delimiters, document)
306 |         sentences = [s.strip() for s in sentences if s.strip()]
307 |         
308 |         # No sentences, return empty list
309 |         if not sentences:
310 |             return []
311 |             
312 |         # Chunk by sentences, trying to reach target size
313 |         chunks = []
314 |         current_chunk = []
315 |         current_size = 0
316 |         
317 |         for sentence in sentences:
318 |             # Estimate size in tokens (approximate)
319 |             sentence_size = len(sentence.split())
320 |             
321 |             # If adding this sentence exceeds the chunk size and we have content,
322 |             # finalize the current chunk
323 |             if current_chunk and current_size + sentence_size > chunk_size:
324 |                 chunks.append(" ".join(current_chunk))
325 |                 
326 |                 # Start new chunk with overlap
327 |                 overlap_size = 0
328 |                 overlap_chunk = []
329 |                 
330 |                 # Add sentences from the end of previous chunk for overlap
331 |                 for s in reversed(current_chunk):
332 |                     s_size = len(s.split())
333 |                     if overlap_size + s_size <= chunk_overlap:
334 |                         overlap_chunk.insert(0, s)
335 |                         overlap_size += s_size
336 |                     else:
337 |                         break
338 |                 
339 |                 current_chunk = overlap_chunk
340 |                 current_size = overlap_size
341 |             
342 |             # Add current sentence
343 |             current_chunk.append(sentence)
344 |             current_size += sentence_size
345 |         
346 |         # Add the last chunk if not empty
347 |         if current_chunk:
348 |             chunks.append(" ".join(current_chunk))
349 |         
350 |         logger.debug(
351 |             f"Split document into {len(chunks)} chunks by sentence",
352 |             extra={"emoji_key": "processing"}
353 |         )
354 |         
355 |         return chunks
356 |     
357 |     async def _chunk_semantic(
358 |         self,
359 |         document: str,
360 |         chunk_size: int = 1000,
361 |         chunk_overlap: int = 200
362 |     ) -> List[str]:
363 |         """
364 |         Split document into chunks by semantic meaning, preserving paragraph structure.
365 |         
366 |         This advanced chunking method attempts to maintain the semantic coherence and
367 |         natural structure of the document by respecting paragraph boundaries whenever
368 |         possible. It implements a hierarchical approach that:
369 |         
370 |         1. First divides the document by paragraph breaks (blank lines)
371 |         2. Evaluates each paragraph for length
372 |         3. Keeps short and medium paragraphs intact to preserve their meaning
373 |         4. Further splits overly long paragraphs using sentence boundary detection
374 |         5. Assembles chunks with appropriate overlap for context continuity
375 |         
376 |         The algorithm prioritizes three key aspects of document structure:
377 |         - Paragraph integrity: Treats paragraphs as coherent units of thought
378 |         - Logical flow: Maintains document organization when possible
379 |         - Size constraints: Respects chunk size limitations for downstream processing
380 |         
381 |         Implementation details:
382 |         - Double newlines (\n\n) are treated as paragraph boundaries
383 |         - If a document lacks clear paragraph structure (e.g., single paragraph),
384 |           it falls back to sentence-based chunking
385 |         - For paragraphs exceeding the chunk size, sentence-based chunking is applied
386 |         - Context preservation is achieved by ensuring the last paragraph of a chunk
387 |           becomes the first paragraph of the next chunk (when appropriate)
388 |         
389 |         This method is ideal for:
390 |         - Well-structured documents like articles, papers, or reports
391 |         - Content where paragraph organization conveys meaning
392 |         - Documents where natural breaks exist between conceptual sections
393 |         - Cases where preserving document structure improves retrieval quality
394 |         
395 |         Args:
396 |             document: Text content to split semantically
397 |             chunk_size: Maximum approximate size per chunk in tokens
398 |             chunk_overlap: Number of tokens to overlap between chunks
399 |             
400 |         Returns:
401 |             List of semantic chunks with paragraph structure preserved
402 |             
403 |         Note:
404 |             Chunk sizes may vary more with semantic chunking than with other methods,
405 |             as maintaining coherent paragraph groups takes precedence over exact
406 |             size enforcement. For strict size control, use token-based chunking.
407 |         """
408 |         # For simplicity, this implementation is similar to sentence chunking
409 |         # but with paragraph awareness
410 |         
411 |         # Split by paragraphs first
412 |         paragraphs = [p.strip() for p in document.split("\n\n") if p.strip()]
413 |         
414 |         # Fallback to sentence chunking if no clear paragraphs
415 |         if len(paragraphs) <= 1:
416 |             return await self._chunk_by_sentence(document, chunk_size, chunk_overlap)
417 |         
418 |         # Process each paragraph and create semantic chunks
419 |         chunks = []
420 |         current_chunk = []
421 |         current_size = 0
422 |         
423 |         for paragraph in paragraphs:
424 |             # Estimate size in tokens
425 |             paragraph_size = len(paragraph.split())
426 |             
427 |             # If paragraph is very large, chunk it further
428 |             if paragraph_size > chunk_size:
429 |                 # Add current chunk if not empty
430 |                 if current_chunk:
431 |                     chunks.append("\n\n".join(current_chunk))
432 |                     current_chunk = []
433 |                     current_size = 0
434 |                 
435 |                 # Chunk large paragraph by sentences
436 |                 paragraph_chunks = await self._chunk_by_sentence(
437 |                     paragraph, chunk_size, chunk_overlap
438 |                 )
439 |                 chunks.extend(paragraph_chunks)
440 |                 continue
441 |             
442 |             # If adding this paragraph exceeds the chunk size and we have content,
443 |             # finalize the current chunk
444 |             if current_chunk and current_size + paragraph_size > chunk_size:
445 |                 chunks.append("\n\n".join(current_chunk))
446 |                 
447 |                 # Start new chunk with last paragraph for better context
448 |                 if current_chunk[-1] != paragraph and len(current_chunk) > 0:
449 |                     current_chunk = [current_chunk[-1]]
450 |                     current_size = len(current_chunk[-1].split())
451 |                 else:
452 |                     current_chunk = []
453 |                     current_size = 0
454 |             
455 |             # Add current paragraph
456 |             current_chunk.append(paragraph)
457 |             current_size += paragraph_size
458 |         
459 |         # Add the last chunk if not empty
460 |         if current_chunk:
461 |             chunks.append("\n\n".join(current_chunk))
462 |         
463 |         logger.debug(
464 |             f"Split document into {len(chunks)} chunks semantically",
465 |             extra={"emoji_key": "processing"}
466 |         )
467 |         
468 |         return chunks
469 | 
470 | 
# Lazily created shared processor; populated on the first accessor call.
_document_processor = None


def get_document_processor() -> DocumentProcessor:
    """
    Return the shared DocumentProcessor, creating it on first use.

    The instance is cached in the module-level ``_document_processor``
    variable, so every caller receives the same object and construction
    happens only when the processor is first requested.

    Returns:
        The shared DocumentProcessor instance

    Usage Example:
    ```python
    # Get the document processor from anywhere in the codebase
    processor = get_document_processor()

    # Use the processor's methods
    chunks = await processor.chunk_document(document_text)
    ```

    Note:
        DocumentProcessor also enforces a single instance in its own
        ``__new__``; this accessor is the conventional entry point and keeps
        the module-level reference in sync with it.
    """
    global _document_processor

    processor = _document_processor
    if processor is None:
        processor = DocumentProcessor()
        _document_processor = processor

    return processor
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/rag.py:
--------------------------------------------------------------------------------

```python
  1 | """MCP tools for Retrieval-Augmented Generation (RAG).
  2 | 
  3 | Provides functions to create, manage, and query knowledge bases (vector stores)
  4 | and generate text responses augmented with retrieved context.
  5 | """
  6 | import re
  7 | from typing import Any, Dict, List, Optional
  8 | 
  9 | # Import specific exceptions for better error handling hints
 10 | from ultimate_mcp_server.exceptions import ProviderError, ResourceError, ToolInputError
 11 | from ultimate_mcp_server.services import get_rag_engine
 12 | 
 13 | # Moved imports for services to the top level
 14 | from ultimate_mcp_server.services.knowledge_base import (
 15 |     get_knowledge_base_manager,
 16 |     get_knowledge_base_retriever,
 17 | )
 18 | from ultimate_mcp_server.tools.base import with_error_handling, with_tool_metrics
 19 | from ultimate_mcp_server.utils import get_logger
 20 | 
 21 | logger = get_logger(__name__)
 22 | 
 23 | # --- Service Lazy Initialization ---
 24 | 
 25 | _kb_manager = None
 26 | _kb_retriever = None
 27 | _rag_engine = None
 28 | 
 29 | def _get_kb_manager():
 30 |     """Lazily initializes and returns the Knowledge Base Manager."""
 31 |     global _kb_manager
 32 |     if _kb_manager is None:
 33 |         logger.debug("Initializing KnowledgeBaseManager...")
 34 |         _kb_manager = get_knowledge_base_manager()
 35 |         logger.info("KnowledgeBaseManager initialized.")
 36 |     return _kb_manager
 37 | 
 38 | def _get_kb_retriever():
 39 |     """Lazily initializes and returns the Knowledge Base Retriever."""
 40 |     global _kb_retriever
 41 |     if _kb_retriever is None:
 42 |         logger.debug("Initializing KnowledgeBaseRetriever...")
 43 |         _kb_retriever = get_knowledge_base_retriever()
 44 |         logger.info("KnowledgeBaseRetriever initialized.")
 45 |     return _kb_retriever
 46 | 
 47 | def _get_rag_engine():
 48 |     """Lazily initializes and returns the RAG Engine."""
 49 |     global _rag_engine
 50 |     if _rag_engine is None:
 51 |         logger.debug("Initializing RAGEngine...")
 52 |         _rag_engine = get_rag_engine()
 53 |         logger.info("RAGEngine initialized.")
 54 |     return _rag_engine
 55 | 
 56 | # --- Standalone Tool Functions ---
 57 | 
 58 | @with_tool_metrics
 59 | @with_error_handling
 60 | async def create_knowledge_base(
 61 |     name: str,
 62 |     description: Optional[str] = None,
 63 |     embedding_model: Optional[str] = None,
 64 |     overwrite: bool = False
 65 | ) -> Dict[str, Any]:
 66 |     """Creates a new, empty knowledge base (vector store) to hold documents.
 67 | 
 68 |     This is the first step before adding documents.
 69 | 
 70 |     Args:
 71 |         name: A unique name for the knowledge base (e.g., "project_docs_v1").
 72 |               Must be a valid identifier (letters, numbers, underscores).
 73 |         description: (Optional) A brief description of the knowledge base's content or purpose.
 74 |         embedding_model: (Optional) The specific embedding model ID to use for this knowledge base
 75 |                          (e.g., "openai/text-embedding-3-small"). If None, uses the system default.
 76 |                          Consistency is important; use the same model when adding documents later.
 77 |         overwrite: (Optional) If True, deletes and recreates the knowledge base if one with the
 78 |                    same name already exists. Defaults to False (raises an error if exists).
 79 | 
 80 |     Returns:
 81 |         A dictionary confirming the creation:
 82 |         {
 83 |             "success": true,
 84 |             "name": "project_docs_v1",
 85 |             "message": "Knowledge base 'project_docs_v1' created successfully."
 86 |         }
 87 |         or an error dictionary if creation failed:
 88 |         {
 89 |             "success": false,
 90 |             "name": "project_docs_v1",
 91 |             "error": "Knowledge base 'project_docs_v1' already exists."
 92 |         }
 93 | 
 94 |     Raises:
 95 |         ResourceError: If the knowledge base already exists (and overwrite=False) or
 96 |                         if there's an issue during creation (e.g., invalid name).
 97 |         ToolInputError: If the provided name is invalid.
 98 |     """
 99 |     # Input validation (basic example)
100 |     if not name or not re.match(r"^[a-zA-Z0-9_]+$", name):
101 |         raise ToolInputError(f"Invalid knowledge base name: '{name}'. Use only letters, numbers, underscores.")
102 | 
103 |     kb_manager = _get_kb_manager() # Use lazy getter
104 |     try:
105 |         result = await kb_manager.create_knowledge_base(
106 |             name=name,
107 |             description=description,
108 |             embedding_model=embedding_model,
109 |             overwrite=overwrite
110 |         )
111 |         return result
112 |     except Exception as e:
113 |         logger.error(f"Failed to create knowledge base '{name}': {e}", exc_info=True)
114 |         # Re-raise specific error if possible, otherwise wrap
115 |         if isinstance(e, (ResourceError, ToolInputError)): 
116 |             raise
117 |         raise ResourceError(f"Failed to create knowledge base '{name}': {str(e)}", resource_type="knowledge_base", resource_id=name, cause=e) from e
118 | 
@with_tool_metrics
@with_error_handling
async def list_knowledge_bases() -> Dict[str, Any]:
    """Lists all available knowledge bases and their metadata.

    The call is delegated to the knowledge base manager and its result is
    returned unchanged.

    Returns:
        A dictionary containing a list of knowledge base details, documented as:
        {
            "success": true,
            "knowledge_bases": [
                {
                    "name": "project_docs_v1",
                    "description": "Documentation for Project X",
                    "embedding_model": "openai/text-embedding-3-small",
                    "document_count": 150,
                    "created_at": "2023-10-27T10:00:00Z"
                },
                { ... } # Other knowledge bases
            ]
        }
        or an error dictionary:
        {
            "success": false,
            "error": "Failed to retrieve knowledge base list."
        }

    Raises:
        ResourceError: If there's an issue retrieving the list from the backend.
                       Every exception raised by the manager is wrapped in this type.
    """
    manager = _get_kb_manager()
    try:
        return await manager.list_knowledge_bases()
    except Exception as exc:
        logger.error(f"Failed to list knowledge bases: {exc}", exc_info=True)
        raise ResourceError(
            f"Failed to list knowledge bases: {str(exc)}",
            resource_type="knowledge_base",
            cause=exc,
        ) from exc
154 | 
@with_tool_metrics
@with_error_handling
async def delete_knowledge_base(name: str) -> Dict[str, Any]:
    """Deletes an existing knowledge base and all its documents.

    Warning: This action is irreversible.

    Args:
        name: The exact name of the knowledge base to delete. Must not be empty.

    Returns:
        The knowledge base manager's result, returned unchanged. It is documented
        as a dictionary confirming the deletion:
        {
            "success": true,
            "name": "project_docs_v1",
            "message": "Knowledge base 'project_docs_v1' deleted successfully."
        }
        or an error dictionary:
        {
            "success": false,
            "name": "project_docs_v1",
            "error": "Knowledge base 'project_docs_v1' not found."
        }

    Raises:
        ResourceError: If the knowledge base doesn't exist or if deletion fails.
        ToolInputError: If the provided name is empty.
    """
    if not name:
        raise ToolInputError("Knowledge base name cannot be empty.")

    manager = _get_kb_manager()
    try:
        return await manager.delete_knowledge_base(name)
    except Exception as exc:
        logger.error(f"Failed to delete knowledge base '{name}': {exc}", exc_info=True)
        # Only unexpected exception types need wrapping; domain errors go up as-is.
        if not isinstance(exc, (ResourceError, ToolInputError)):
            raise ResourceError(
                f"Failed to delete knowledge base '{name}': {str(exc)}",
                resource_type="knowledge_base",
                resource_id=name,
                cause=exc,
            ) from exc
        raise
195 | 
@with_tool_metrics
@with_error_handling
async def add_documents(
    knowledge_base_name: str,
    documents: List[str],
    metadatas: Optional[List[Dict[str, Any]]] = None,
    chunk_size: int = 1000,
    chunk_overlap: int = 200,
    chunk_method: str = "semantic",
    embedding_model: Optional[str] = None
) -> Dict[str, Any]:
    """Adds one or more documents to a specified knowledge base.

    The documents are split into chunks, embedded, and stored for later retrieval.

    Args:
        knowledge_base_name: The name of the existing knowledge base to add documents to.
        documents: A non-empty list of strings, where each string is the full text content
                   of a document.
        metadatas: (Optional) A list of dictionaries, one for each document in the `documents` list.
                   Each dictionary should contain metadata relevant to the corresponding document
                   (e.g., {"source": "filename.pdf", "page": 1, "author": "Alice"}).
                   This metadata is stored alongside the document chunks and can be used for filtering during retrieval.
                   If provided, `len(metadatas)` MUST equal `len(documents)`. Note that an empty
                   list (or None) is treated as "not provided" and skips this check.
        chunk_size: (Optional) The target size for document chunks. Interpretation depends on `chunk_method`
                    (e.g., tokens for "token" method, characters for "character", approximate size for "semantic").
                    Defaults to 1000.
        chunk_overlap: (Optional) The number of units (tokens, characters) to overlap between consecutive chunks.
                       Helps maintain context across chunk boundaries. Defaults to 200.
        chunk_method: (Optional) The method used for splitting documents into chunks.
                      Accepted values: "semantic" (attempts to split at meaningful semantic boundaries,
                      recommended), "token" (splits by token count), "sentence" (splits by sentence),
                      "character" (splits by character count) and "paragraph" (splits by paragraph).
                      Defaults to "semantic".
        embedding_model: (Optional) The specific embedding model ID to use. If None, uses the model
                         associated with the knowledge base (or the system default if none was specified
                         at creation). It's best practice to ensure this matches the KB's model.

    Returns:
        The knowledge base manager's result, returned unchanged. It is documented as a
        dictionary summarizing the addition process:
        {
            "success": true,
            "knowledge_base_name": "project_docs_v1",
            "documents_added": 5,
            "chunks_created": 75,
            "message": "Successfully added 5 documents (75 chunks) to 'project_docs_v1'."
        }
        or an error dictionary:
        {
            "success": false,
            "knowledge_base_name": "project_docs_v1",
            "error": "Knowledge base 'project_docs_v1' not found."
        }

    Raises:
        ResourceError: If the knowledge base doesn't exist or if there's an error during processing/storage.
                       Unexpected exceptions from the manager are wrapped in this type.
        ToolInputError: If inputs are invalid (e.g., documents/metadatas length mismatch, invalid chunk_method).
        ProviderError: Propagated unchanged when raised by the knowledge base manager
                       (presumably when the embedding provider fails while the chunks are embedded).
    """
    allowed_chunk_methods = ("semantic", "token", "sentence", "character", "paragraph")

    if not knowledge_base_name:
        raise ToolInputError("Knowledge base name cannot be empty.")
    if not documents or not isinstance(documents, list) or not all(isinstance(d, str) for d in documents):
        raise ToolInputError("'documents' must be a non-empty list of strings.")
    # NOTE(review): the truthiness test means metadatas=[] bypasses the length check
    # and is forwarded as-is; kept for backward compatibility with existing callers.
    if metadatas and (not isinstance(metadatas, list) or len(metadatas) != len(documents)):
        raise ToolInputError("'metadatas', if provided, must be a list with the same length as 'documents'.")
    if chunk_method not in allowed_chunk_methods:
        raise ToolInputError(f"Invalid chunk_method: '{chunk_method}'. Must be one of: semantic, token, sentence, character, paragraph.")

    kb_manager = _get_kb_manager()
    try:
        return await kb_manager.add_documents(
            knowledge_base_name=knowledge_base_name,
            documents=documents,
            metadatas=metadatas,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            chunk_method=chunk_method,
            embedding_model=embedding_model
        )
    except Exception as e:
        logger.error(f"Failed to add documents to knowledge base '{knowledge_base_name}': {e}", exc_info=True)
        # Domain errors keep their type; everything else is wrapped below.
        if isinstance(e, (ResourceError, ToolInputError, ProviderError)):
            raise
        raise ResourceError(f"Failed to add documents to knowledge base '{knowledge_base_name}': {str(e)}", resource_type="knowledge_base", resource_id=knowledge_base_name, cause=e) from e
279 | 
@with_tool_metrics
@with_error_handling
async def retrieve_context(
    knowledge_base_name: str,
    query: str,
    top_k: int = 5,
    retrieval_method: str = "vector",
    min_score: Optional[float] = None, # Changed default to None
    metadata_filter: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Retrieves relevant document chunks (context) from a knowledge base based on a query.

    Searches the specified knowledge base for chunks semantically similar to the query.
    "hybrid" requests go to the retriever's `retrieve_hybrid`; every other accepted
    method (i.e. "vector") goes to `retrieve` with reranking switched on.

    Args:
        knowledge_base_name: The name of the knowledge base to query.
        query: The text query to search for relevant context. Must be a non-empty string.
        top_k: (Optional) The maximum number of relevant chunks to retrieve. Defaults to 5.
        retrieval_method: (Optional) The method used for retrieval.
                          Options: "vector" (semantic similarity search), "hybrid" (combines vector search
                          with keyword-based search, may require specific backend support).
                          Defaults to "vector".
        min_score: (Optional) The minimum similarity score (typically between 0 and 1) for a chunk
                   to be included in the results. Higher values mean stricter relevance.
                   If None (default), the backend decides or no filtering is applied.
        metadata_filter: (Optional) A dictionary used to filter results based on metadata associated
                         with the chunks during `add_documents`. Filters use exact matches.
                         Example: {"source": "filename.pdf", "page": 5}
                         Example: {"author": "Alice"}
                         Defaults to None (no metadata filtering).

    Returns:
        The retriever's result, returned unchanged. It is documented as a dictionary
        containing the retrieved context:
        {
            "success": true,
            "query": "What are the project goals?",
            "knowledge_base_name": "project_docs_v1",
            "retrieved_chunks": [
                {
                    "content": "The main goal of Project X is to improve user engagement...",
                    "score": 0.85,
                    "metadata": {"source": "project_plan.docx", "page": 1}
                },
                { ... } # Other relevant chunks
            ]
        }
        or an error dictionary:
        {
            "success": false,
            "knowledge_base_name": "project_docs_v1",
            "error": "Knowledge base 'project_docs_v1' not found."
        }

    Raises:
        ResourceError: If the knowledge base doesn't exist or retrieval fails.
        ToolInputError: If inputs are invalid (e.g., invalid retrieval_method).
    """
    if not knowledge_base_name:
        raise ToolInputError("Knowledge base name cannot be empty.")
    if not (query and isinstance(query, str)):
        raise ToolInputError("Query must be a non-empty string.")
    if retrieval_method not in ("vector", "hybrid"):  # Extend if the backend gains more methods
        raise ToolInputError(f"Invalid retrieval_method: '{retrieval_method}'. Must be one of: vector, hybrid.")

    retriever = _get_kb_retriever()

    # Arguments common to both retrieval entry points.
    search_args = {
        "knowledge_base_name": knowledge_base_name,
        "query": query,
        "top_k": top_k,
        "min_score": min_score,
        "metadata_filter": metadata_filter,
    }

    try:
        # NOTE(review): this assumes the retriever exposes a dedicated
        # retrieve_hybrid coroutine; adjust if the service handles the method
        # through a flag on retrieve() instead.
        if retrieval_method == "hybrid":
            logger.debug(f"Attempting hybrid retrieval for '{knowledge_base_name}'")
            return await retriever.retrieve_hybrid(**search_args)

        # Vector retrieval: reranking is requested on the assumption that it is
        # usually wanted for pure similarity search.
        logger.debug(f"Attempting vector retrieval for '{knowledge_base_name}'")
        return await retriever.retrieve(rerank=True, **search_args)
    except Exception as exc:
        logger.error(f"Failed to retrieve context from knowledge base '{knowledge_base_name}' for query '{query}': {exc}", exc_info=True)
        if not isinstance(exc, (ResourceError, ToolInputError)):
            raise ResourceError(
                f"Failed to retrieve context from knowledge base '{knowledge_base_name}': {str(exc)}",
                resource_type="knowledge_base",
                resource_id=knowledge_base_name,
                cause=exc,
            ) from exc
        raise
376 | 
@with_tool_metrics
@with_error_handling
async def generate_with_rag(
    knowledge_base_name: str,
    query: str,
    provider: Optional[str] = None,
    model: Optional[str] = None,
    template: str = "rag_default",
    max_tokens: int = 1000,
    temperature: float = 0.3,
    top_k: int = 5,
    retrieval_method: str = "vector",
    min_score: Optional[float] = None, # Changed default to None
    include_sources: bool = True
) -> Dict[str, Any]:
    """Generates a response to a query using context retrieved from a knowledge base (RAG).

    The whole request is delegated to the RAG engine, which first retrieves relevant
    document chunks (using the retrieval parameters below) and then feeds the query and
    the retrieved context into an LLM, using the named prompt template, to generate a
    final, context-aware answer.

    Args:
        knowledge_base_name: The name of the knowledge base to retrieve context from.
        query: The user's query or question to be answered. Must be a non-empty string.
        provider: (Optional) The LLM provider for the generation step (e.g., "openai", "anthropic").
                  If None, the RAG engine selects a default provider.
        model: (Optional) The specific LLM model ID for generation (e.g., "openai/gpt-4.1-mini").
               If None, the RAG engine selects a default model.
        template: (Optional) The name of the prompt template to use for combining the query and context.
                  Available templates might include: "rag_default" (standard Q&A), "rag_with_sources"
                  (includes source attribution), "rag_summarize" (summarizes retrieved context
                  based on query), "rag_analysis" (performs analysis based on context).
                  Defaults to "rag_default", which is the value this function passes to the engine.
        max_tokens: (Optional) Maximum number of tokens for the generated LLM response. Defaults to 1000.
        temperature: (Optional) Sampling temperature for the LLM generation (0.0 to 1.0). Lower values
                     are more deterministic, higher values more creative. Defaults to 0.3.
        top_k: (Optional) Maximum number of context chunks to retrieve (passed to retrieval). Defaults to 5.
        retrieval_method: (Optional) Method for retrieving context ("vector", "hybrid"). Defaults to "vector".
                          It is forwarded to the engine without being validated here.
        min_score: (Optional) Minimum similarity score for retrieved chunks. Defaults to None.
        include_sources: (Optional) Whether the final response object should explicitly include details
                         of the source chunks used for generation. Defaults to True.

    Returns:
        The RAG engine's result, returned unchanged. It is documented as a dictionary
        containing the generated response and related information:
        {
            "success": true,
            "query": "What are the project goals?",
            "knowledge_base_name": "project_docs_v1",
            "generated_response": "The main goal of Project X is to improve user engagement by implementing features A, B, and C.",
            "sources": [ # Included if include_sources=True
                {
                    "content": "The main goal of Project X is to improve user engagement...",
                    "score": 0.85,
                    "metadata": {"source": "project_plan.docx", "page": 1}
                },
                { ... } # Other source chunks used
            ],
            "model": "openai/gpt-4.1-mini", # Actual model used
            "provider": "openai",
            "tokens": { "input": ..., "output": ..., "total": ... }, # Generation tokens
            "cost": 0.000120,
            "processing_time": 5.2,
            "retrieval_time": 0.8, # Time spent only on retrieval
            "generation_time": 4.4 # Time spent only on generation
        }
        or an error dictionary:
        {
            "success": false,
            "knowledge_base_name": "project_docs_v1",
            "error": "RAG generation failed: Knowledge base 'project_docs_v1' not found."
        }

    Raises:
        ResourceError: If the knowledge base doesn't exist or retrieval fails. Unexpected
                       exceptions from the engine are wrapped in this type.
        ProviderError: If the LLM provider fails during generation.
        ToolInputError: If inputs are invalid.
    """
    if not knowledge_base_name:
        raise ToolInputError("Knowledge base name cannot be empty.")
    if not query or not isinstance(query, str):
        raise ToolInputError("Query must be a non-empty string.")

    rag_engine = _get_rag_engine()
    try:
        return await rag_engine.generate_with_rag(
            knowledge_base_name=knowledge_base_name,
            query=query,
            provider=provider,
            model=model,
            template=template,
            max_tokens=max_tokens,
            temperature=temperature,
            top_k=top_k,
            retrieval_method=retrieval_method,
            min_score=min_score,
            include_sources=include_sources
        )
    except Exception as e:
        logger.error(f"RAG generation failed for query on '{knowledge_base_name}': {e}", exc_info=True)
        # Domain errors keep their type so callers can tell them apart.
        if isinstance(e, (ResourceError, ProviderError, ToolInputError)):
            raise
        # Wrap generic errors
        raise ResourceError(f"RAG generation failed: {str(e)}", resource_type="knowledge_base", resource_id=knowledge_base_name, cause=e) from e
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/services/prompts/templates.py:
--------------------------------------------------------------------------------

```python
  1 | """Prompt template management and rendering for Ultimate MCP Server."""
  2 | import json
  3 | import re
  4 | from enum import Enum
  5 | from functools import lru_cache
  6 | from pathlib import Path
  7 | from typing import Any, Dict, List, Optional, Set, Tuple, Union
  8 | 
  9 | from jinja2 import Environment, FileSystemLoader, Template, select_autoescape
 10 | 
 11 | from ultimate_mcp_server.constants import Provider
 12 | from ultimate_mcp_server.services.prompts.repository import get_prompt_repository
 13 | from ultimate_mcp_server.utils import get_logger
 14 | 
 15 | logger = get_logger(__name__)
 16 | 
 17 | 
 18 | class TemplateFormat(str, Enum):
 19 |     """Template format options."""
 20 |     JINJA = "jinja"
 21 |     SIMPLE = "simple"
 22 |     MARKDOWN = "markdown"
 23 |     JSON = "json"
 24 | 
 25 | 
 26 | class TemplateType(str, Enum):
 27 |     """Template type options."""
 28 |     COMPLETION = "completion"
 29 |     CHAT = "chat"
 30 |     SYSTEM = "system"
 31 |     USER = "user"
 32 |     FUNCTION = "function"
 33 |     EXTRACTION = "extraction"
 34 | 
 35 | 
 36 | class PromptTemplate:
 37 |     """Template for generating prompts for LLM providers."""
 38 |     
 39 |     def __init__(
 40 |         self,
 41 |         template: str,
 42 |         template_id: str,
 43 |         format: Union[str, TemplateFormat] = TemplateFormat.JINJA,
 44 |         type: Union[str, TemplateType] = TemplateType.COMPLETION,
 45 |         metadata: Optional[Dict[str, Any]] = None,
 46 |         provider_defaults: Optional[Dict[str, Any]] = None,
 47 |         description: Optional[str] = None,
 48 |         required_vars: Optional[List[str]] = None,
 49 |         example_vars: Optional[Dict[str, Any]] = None,
 50 |     ):
 51 |         """Initialize a prompt template.
 52 |         
 53 |         Args:
 54 |             template: Template string
 55 |             template_id: Unique identifier for this template
 56 |             format: Template format (jinja, simple, markdown, or json)
 57 |             type: Template type (completion, chat, system, user, function, extraction)
 58 |             metadata: Optional metadata for the template
 59 |             provider_defaults: Optional provider-specific defaults
 60 |             description: Optional description of the template
 61 |             required_vars: Optional list of required variables
 62 |             example_vars: Optional example variables for testing
 63 |         """
 64 |         self.template = template
 65 |         self.template_id = template_id
 66 |         
 67 |         # Normalize format and type to enum values
 68 |         self.format = TemplateFormat(format) if isinstance(format, str) else format
 69 |         self.type = TemplateType(type) if isinstance(type, str) else type
 70 |         
 71 |         # Store additional attributes
 72 |         self.metadata = metadata or {}
 73 |         self.provider_defaults = provider_defaults or {}
 74 |         self.description = description
 75 |         self.example_vars = example_vars or {}
 76 |         
 77 |         # Extract required variables based on format
 78 |         self.required_vars = required_vars or self._extract_required_vars()
 79 |         
 80 |         # Compile template if using Jinja format
 81 |         self._compiled_template: Optional[Template] = None
 82 |         if self.format == TemplateFormat.JINJA:
 83 |             self._compiled_template = self._compile_template()
 84 |     
 85 |     def _extract_required_vars(self) -> List[str]:
 86 |         """Extract required variables from template based on format.
 87 |         
 88 |         Returns:
 89 |             List of required variable names
 90 |         """
 91 |         if self.format == TemplateFormat.JINJA:
 92 |             # Extract variables using regex for basic Jinja pattern
 93 |             matches = re.findall(r'{{(.*?)}}', self.template)
 94 |             vars_set: Set[str] = set()
 95 |             
 96 |             for match in matches:
 97 |                 # Extract variable name (removing filters and whitespace)
 98 |                 var_name = match.split('|')[0].strip()
 99 |                 if var_name and not var_name.startswith('_'):
100 |                     vars_set.add(var_name)
101 |                     
102 |             return sorted(list(vars_set))
103 |             
104 |         elif self.format == TemplateFormat.SIMPLE:
105 |             # Extract variables from {variable} format
106 |             matches = re.findall(r'{([^{}]*)}', self.template)
107 |             return sorted(list(set(matches)))
108 |             
109 |         elif self.format == TemplateFormat.JSON:
110 |             # Try to find JSON template variables
111 |             try:
112 |                 # Parse as JSON to find potential variables
113 |                 template_dict = json.loads(self.template)
114 |                 return self._extract_json_vars(template_dict)
115 |             except json.JSONDecodeError:
116 |                 logger.warning(
117 |                     f"Failed to parse JSON template: {self.template_id}",
118 |                     emoji_key="warning"
119 |                 )
120 |                 return []
121 |                 
122 |         # Default: no variables detected
123 |         return []
124 |     
125 |     def _extract_json_vars(self, obj: Any, prefix: str = "") -> List[str]:
126 |         """Recursively extract variables from a JSON object.
127 |         
128 |         Args:
129 |             obj: JSON object to extract variables from
130 |             prefix: Prefix for nested variables
131 |             
132 |         Returns:
133 |             List of variable names
134 |         """
135 |         vars_list = []
136 |         
137 |         if isinstance(obj, dict):
138 |             for key, value in obj.items():
139 |                 if isinstance(value, str) and value.startswith("${") and value.endswith("}"):
140 |                     # This is a variable placeholder
141 |                     var_name = value[2:-1]  # Remove ${ and }
142 |                     vars_list.append(f"{prefix}{var_name}")
143 |                 elif isinstance(value, (dict, list)):
144 |                     # Recursively extract from nested structures
145 |                     nested_prefix = f"{prefix}{key}." if prefix else f"{key}."
146 |                     vars_list.extend(self._extract_json_vars(value, nested_prefix))
147 |         elif isinstance(obj, list):
148 |             for _i, item in enumerate(obj):
149 |                 if isinstance(item, (dict, list)):
150 |                     vars_list.extend(self._extract_json_vars(item, prefix))
151 |                 elif isinstance(item, str) and item.startswith("${") and item.endswith("}"):
152 |                     var_name = item[2:-1]
153 |                     vars_list.append(f"{prefix}{var_name}")
154 |         
155 |         return sorted(list(set(vars_list)))
156 |     
157 |     def _compile_template(self) -> Template:
158 |         """Compile the Jinja template.
159 |         
160 |         Returns:
161 |             Compiled Jinja template
162 |             
163 |         Raises:
164 |             ValueError: If template compilation fails
165 |         """
166 |         try:
167 |             env = Environment(autoescape=select_autoescape(['html', 'xml']))
168 |             return env.from_string(self.template)
169 |         except Exception as e:
170 |             logger.error(
171 |                 f"Failed to compile template {self.template_id}: {str(e)}",
172 |                 emoji_key="error"
173 |             )
174 |             raise ValueError(f"Invalid template format: {str(e)}") from e
175 |     
176 |     def render(self, variables: Dict[str, Any]) -> str:
177 |         """Render the template with the provided variables.
178 |         
179 |         Args:
180 |             variables: Dictionary of variables to render with
181 |             
182 |         Returns:
183 |             Rendered template string
184 |             
185 |         Raises:
186 |             ValueError: If required variables are missing
187 |         """
188 |         # Check for required variables
189 |         missing_vars = [var for var in self.required_vars if var not in variables]
190 |         if missing_vars:
191 |             raise ValueError(
192 |                 f"Missing required variables for template {self.template_id}: {', '.join(missing_vars)}"
193 |             )
194 |         
195 |         # Render based on format
196 |         if self.format == TemplateFormat.JINJA:
197 |             if not self._compiled_template:
198 |                 self._compiled_template = self._compile_template()
199 |             return self._compiled_template.render(**variables)
200 |             
201 |         elif self.format == TemplateFormat.SIMPLE:
202 |             # Simple variable substitution with {var} syntax
203 |             result = self.template
204 |             for var_name, var_value in variables.items():
205 |                 result = result.replace(f"{{{var_name}}}", str(var_value))
206 |             return result
207 |             
208 |         elif self.format == TemplateFormat.JSON:
209 |             try:
210 |                 # Parse template as JSON
211 |                 template_dict = json.loads(self.template)
212 |                 
213 |                 # Replace variables in the JSON structure
214 |                 rendered_dict = self._render_json_vars(template_dict, variables)
215 |                 
216 |                 # Convert back to JSON string
217 |                 return json.dumps(rendered_dict)
218 |                 
219 |             except json.JSONDecodeError:
220 |                 logger.error(
221 |                     f"Failed to parse JSON template: {self.template_id}",
222 |                     emoji_key="error"
223 |                 )
224 |                 # Fall back to simple replacement
225 |                 return self.template
226 |             
227 |         elif self.format == TemplateFormat.MARKDOWN:
228 |             # Process markdown with simple variable substitution
229 |             result = self.template
230 |             for var_name, var_value in variables.items():
231 |                 result = result.replace(f"{{{var_name}}}", str(var_value))
232 |             return result
233 |             
234 |         # Default: return template as is
235 |         return self.template
236 |     
237 |     def _render_json_vars(self, obj: Any, variables: Dict[str, Any]) -> Any:
238 |         """Recursively render variables in a JSON object.
239 |         
240 |         Args:
241 |             obj: JSON object to render variables in
242 |             variables: Dictionary of variables to render with
243 |             
244 |         Returns:
245 |             Rendered JSON object
246 |         """
247 |         if isinstance(obj, dict):
248 |             return {
249 |                 key: self._render_json_vars(value, variables) 
250 |                 for key, value in obj.items()
251 |             }
252 |         elif isinstance(obj, list):
253 |             return [self._render_json_vars(item, variables) for item in obj]
254 |         elif isinstance(obj, str) and obj.startswith("${") and obj.endswith("}"):
255 |             # This is a variable placeholder
256 |             var_name = obj[2:-1]  # Remove ${ and }
257 |             # Get the variable value, or keep placeholder if not found
258 |             return variables.get(var_name, obj)
259 |         else:
260 |             return obj
261 |     
262 |     def validate_variables(self, variables: Dict[str, Any]) -> Tuple[bool, List[str]]:
263 |         """Validate that all required variables are provided.
264 |         
265 |         Args:
266 |             variables: Dictionary of variables to validate
267 |             
268 |         Returns:
269 |             Tuple of (is_valid, missing_variables)
270 |         """
271 |         missing_vars = [var for var in self.required_vars if var not in variables]
272 |         return len(missing_vars) == 0, missing_vars
273 |     
274 |     def to_dict(self) -> Dict[str, Any]:
275 |         """Convert template to dictionary representation.
276 |         
277 |         Returns:
278 |             Dictionary representation of template
279 |         """
280 |         return {
281 |             "template_id": self.template_id,
282 |             "template": self.template,
283 |             "format": self.format.value,
284 |             "type": self.type.value,
285 |             "metadata": self.metadata,
286 |             "provider_defaults": self.provider_defaults,
287 |             "description": self.description,
288 |             "required_vars": self.required_vars,
289 |             "example_vars": self.example_vars,
290 |         }
291 |     
292 |     @classmethod
293 |     def from_dict(cls, data: Dict[str, Any]) -> "PromptTemplate":
294 |         """Create a template from dictionary representation.
295 |         
296 |         Args:
297 |             data: Dictionary representation of template
298 |             
299 |         Returns:
300 |             PromptTemplate instance
301 |         """
302 |         return cls(
303 |             template=data["template"],
304 |             template_id=data["template_id"],
305 |             format=data.get("format", TemplateFormat.JINJA),
306 |             type=data.get("type", TemplateType.COMPLETION),
307 |             metadata=data.get("metadata"),
308 |             provider_defaults=data.get("provider_defaults"),
309 |             description=data.get("description"),
310 |             required_vars=data.get("required_vars"),
311 |             example_vars=data.get("example_vars"),
312 |         )
313 |     
314 |     def get_provider_defaults(self, provider: str) -> Dict[str, Any]:
315 |         """Get provider-specific default parameters.
316 |         
317 |         Args:
318 |             provider: Provider name
319 |             
320 |         Returns:
321 |             Dictionary of default parameters
322 |         """
323 |         return self.provider_defaults.get(provider, {})
324 | 
325 | 
class PromptTemplateRenderer:
    """Service for loading, caching, rendering and persisting prompt templates.

    Templates are looked up in an in-memory cache first, then in the prompt
    repository, and finally as ``<template_id>.j2`` files inside
    ``template_dir``.
    """

    def __init__(self, template_dir: Optional[Union[str, Path]] = None):
        """Initialize the prompt template renderer.

        Creates the template directory if it does not exist yet.

        Args:
            template_dir: Optional directory containing template files;
                defaults to ``~/.ultimate/templates``
        """
        if template_dir:
            self.template_dir = Path(template_dir)
        else:
            # Default to the per-user directory ~/.ultimate/templates
            self.template_dir = Path.home() / ".ultimate" / "templates"

        # Create directory if it doesn't exist
        self.template_dir.mkdir(parents=True, exist_ok=True)

        # Set up Jinja environment for file-based templates
        self.jinja_env = Environment(
            loader=FileSystemLoader(str(self.template_dir)),
            autoescape=select_autoescape(['html', 'xml']),
            trim_blocks=True,
            lstrip_blocks=True,
        )

        # Get prompt repository for template storage
        self.repository = get_prompt_repository()

        # Template cache, keyed by template ID
        self._template_cache: Dict[str, PromptTemplate] = {}

    async def get_template(self, template_id: str) -> Optional[PromptTemplate]:
        """Get a template by ID.

        Lookup order: in-memory cache, prompt repository, then a
        ``<template_id>.j2`` file in ``template_dir`` (loaded as a Jinja
        template). Anything found is cached for later calls.

        Args:
            template_id: Template identifier

        Returns:
            PromptTemplate instance or None if not found
        """
        # Check cache first
        if template_id in self._template_cache:
            return self._template_cache[template_id]

        # Look up in repository
        template_data = await self.repository.get_prompt(template_id)
        if template_data:
            template = PromptTemplate.from_dict(template_data)
            self._template_cache[template_id] = template
            return template

        # Try to load from file if not in repository
        template_path = self.template_dir / f"{template_id}.j2"
        if template_path.exists():
            with open(template_path, "r", encoding="utf-8") as f:
                template_content = f.read()

            template = PromptTemplate(
                template=template_content,
                template_id=template_id,
                format=TemplateFormat.JINJA,
            )
            self._template_cache[template_id] = template
            return template

        return None

    async def render_template(
        self,
        template_id: str,
        variables: Dict[str, Any],
        provider: Optional[str] = None
    ) -> str:
        """Render a template with variables.

        Args:
            template_id: Template identifier
            variables: Variables to render the template with
            provider: Optional provider name for provider-specific adjustments

        Returns:
            Rendered template string

        Raises:
            ValueError: If template not found or required variables are missing
        """
        template = await self.get_template(template_id)
        if not template:
            raise ValueError(f"Template not found: {template_id}")

        # Check if all required variables are provided
        is_valid, missing_vars = template.validate_variables(variables)
        if not is_valid:
            raise ValueError(
                f"Missing required variables for template {template_id}: {', '.join(missing_vars)}"
            )

        rendered = template.render(variables)

        # Apply provider-specific adjustments if provided
        if provider:
            rendered = self._apply_provider_adjustments(rendered, provider, template)

        return rendered

    def _apply_provider_adjustments(
        self,
        rendered: str,
        provider: str,
        template: PromptTemplate
    ) -> str:
        """Apply provider-specific adjustments to rendered template.

        Currently only Anthropic system prompts are changed (trailing
        whitespace is stripped); the OpenAI and Gemini branches are
        placeholders.

        Args:
            rendered: Rendered template string
            provider: Provider name
            template: Template being rendered

        Returns:
            Adjusted template string
        """
        if provider == Provider.ANTHROPIC.value:
            if template.type == TemplateType.SYSTEM:
                # Ensure no trailing newlines for system prompts
                rendered = rendered.rstrip()
        elif provider == Provider.OPENAI.value:
            # OpenAI-specific adjustments
            pass
        elif provider == Provider.GEMINI.value:
            # Gemini-specific adjustments
            pass

        return rendered

    async def save_template(self, template: PromptTemplate) -> bool:
        """Save a template to the repository.

        The in-memory cache is only updated once the repository reports
        success. If the save raises, the cache is left untouched; if it
        reports failure, any cached entry for this ID is dropped so the next
        lookup re-reads the repository instead of serving a template that
        was never persisted.

        Args:
            template: Template to save

        Returns:
            True if successful
        """
        saved = await self.repository.save_prompt(
            prompt_id=template.template_id,
            prompt_data=template.to_dict()
        )

        if saved:
            self._template_cache[template.template_id] = template
        else:
            self._template_cache.pop(template.template_id, None)

        return saved

    async def delete_template(self, template_id: str) -> bool:
        """Delete a template from the repository.

        The cached copy, if any, is removed first.

        Args:
            template_id: Template identifier

        Returns:
            True if successful
        """
        # Remove from cache
        self._template_cache.pop(template_id, None)

        # Delete from repository
        return await self.repository.delete_prompt(template_id)

    async def list_templates(self) -> List[str]:
        """List available templates.

        Returns:
            List of template IDs known to the repository
        """
        return await self.repository.list_prompts()

    def clear_cache(self) -> None:
        """Clear the template cache."""
        self._template_cache.clear()
517 | 
518 | 
# Process-wide renderer, created lazily by get_template_renderer()
_template_renderer: Optional[PromptTemplateRenderer] = None


def get_template_renderer() -> PromptTemplateRenderer:
    """Return the shared template renderer, creating it on first use.

    Returns:
        PromptTemplateRenderer instance
    """
    global _template_renderer
    if _template_renderer is not None:
        return _template_renderer

    _template_renderer = PromptTemplateRenderer()
    return _template_renderer
533 | 
534 | 
@lru_cache(maxsize=32)
def get_template_path(template_id: str) -> Optional[Path]:
    """Locate the file that backs a template ID.

    The user's ``~/.ultimate/templates`` directory is searched first, then
    the ``templates`` directory four levels above this module. Within each
    directory the extensions ``.j2``, ``.tpl``, ``.md`` and ``.json`` are
    tried in that order, and the first existing file wins.

    NOTE(review): results are memoised by ``lru_cache``, including misses,
    so a file created after a failed lookup is not seen until the cache
    entry is evicted or cleared.

    Args:
        template_id: Template identifier

    Returns:
        Path to template file or None if not found
    """
    search_roots = (
        # First check the user's template directory
        Path.home() / ".ultimate" / "templates",
        # Then check the package's template directory
        Path(__file__).parent.parent.parent.parent / "templates",
    )
    suffixes = (".j2", ".tpl", ".md", ".json")

    candidates = (
        root / f"{template_id}{suffix}"
        for root in search_roots
        for suffix in suffixes
    )
    return next((path for path in candidates if path.exists()), None)
575 | 
576 | 
async def render_prompt_template(
    template_id: str,
    variables: Dict[str, Any],
    provider: Optional[str] = None
) -> str:
    """Render a stored prompt template through the shared renderer.

    Args:
        template_id: Template identifier
        variables: Variables to render the template with
        provider: Optional provider name for provider-specific adjustments

    Returns:
        Rendered template string

    Raises:
        ValueError: If template not found or rendering fails
    """
    return await get_template_renderer().render_template(
        template_id, variables, provider
    )
597 | 
598 | 
async def render_prompt(
    template_content: str,
    variables: Dict[str, Any],
    format: Union[str, TemplateFormat] = TemplateFormat.JINJA,
) -> str:
    """Render ad-hoc template text without storing it anywhere.

    A throwaway ``PromptTemplate`` with the ID ``_temp_template`` is built
    from ``template_content`` and rendered immediately.

    Args:
        template_content: Template content string
        variables: Variables to render the template with
        format: Template format

    Returns:
        Rendered template string

    Raises:
        ValueError: If rendering fails
    """
    throwaway = PromptTemplate(
        template=template_content,
        template_id="_temp_template",
        format=format,
    )
    rendered = throwaway.render(variables)
    return rendered
626 | 
627 | 
async def get_template_defaults(
    template_id: str,
    provider: str
) -> Dict[str, Any]:
    """Fetch a template's default parameters for one provider.

    Args:
        template_id: Template identifier
        provider: Provider name

    Returns:
        Dictionary of default parameters

    Raises:
        ValueError: If template not found
    """
    template = await get_template_renderer().get_template(template_id)
    if template:
        return template.get_provider_defaults(provider)

    raise ValueError(f"Template not found: {template_id}")
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/core/providers/openrouter.py:
--------------------------------------------------------------------------------

```python
  1 | # ultimate/core/providers/openrouter.py
  2 | """OpenRouter provider implementation."""
  3 | import os
  4 | import time
  5 | from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple, Union
  6 | 
  7 | from openai import AsyncOpenAI
  8 | 
  9 | from ultimate_mcp_server.config import get_config
 10 | from ultimate_mcp_server.constants import DEFAULT_MODELS, Provider
 11 | from ultimate_mcp_server.core.providers.base import BaseProvider, ModelResponse
 12 | from ultimate_mcp_server.utils import get_logger
 13 | 
 14 | # Use the same naming scheme everywhere: logger at module level
 15 | logger = get_logger("ultimate_mcp_server.providers.openrouter")
 16 | 
 17 | # Default OpenRouter Base URL (can be overridden by config)
 18 | DEFAULT_OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1"
 19 | 
 20 | class OpenRouterProvider(BaseProvider):
 21 |     """Provider implementation for OpenRouter API (using OpenAI-compatible interface)."""
 22 | 
 23 |     provider_name = Provider.OPENROUTER.value
 24 | 
 25 |     def __init__(self, **kwargs):
 26 |         """Initialize the OpenRouter provider.
 27 | 
 28 |         Args:
 29 |             **kwargs: Additional options:
 30 |                 - base_url (str): Override the default OpenRouter API base URL.
 31 |                 - http_referer (str): Optional HTTP-Referer header.
 32 |                 - x_title (str): Optional X-Title header.
 33 |         """
 34 |         config = get_config().providers.openrouter
 35 |         super().__init__(**kwargs)
 36 |         self.name = "openrouter"
 37 |         
 38 |         # Use config default first, then fallback to constants
 39 |         self.default_model = config.default_model or DEFAULT_MODELS.get(Provider.OPENROUTER)
 40 |         if not config.default_model:
 41 |             logger.debug(f"No default model set in config for OpenRouter, using fallback from constants: {self.default_model}")
 42 | 
 43 |         # Get base_url from config, fallback to kwargs, then constant
 44 |         self.base_url = config.base_url or kwargs.get("base_url", DEFAULT_OPENROUTER_BASE_URL)
 45 | 
 46 |         # Get additional headers from config's additional_params
 47 |         self.http_referer = config.additional_params.get("http_referer") or kwargs.get("http_referer")
 48 |         self.x_title = config.additional_params.get("x_title") or kwargs.get("x_title")
 49 |         
 50 |         # We'll create the client in initialize() instead
 51 |         self.client = None
 52 |         self.available_models = []
 53 |     
 54 |     async def initialize(self) -> bool:
 55 |         """Initialize the OpenRouter client.
 56 |         
 57 |         Returns:
 58 |             bool: True if initialization was successful
 59 |         """
 60 |         try:
 61 |             # Create headers dictionary
 62 |             headers = {}
 63 |             if self.http_referer:
 64 |                 headers["HTTP-Referer"] = self.http_referer
 65 |             if self.x_title:
 66 |                 headers["X-Title"] = self.x_title
 67 |             
 68 |             # Get timeout from config
 69 |             config = get_config().providers.openrouter
 70 |             timeout = config.timeout or 30.0  # Default timeout 30s
 71 |             
 72 |             # Check if API key is available
 73 |             if not self.api_key:
 74 |                 logger.warning(f"{self.name} API key not found in configuration. Provider will be unavailable.")
 75 |                 return False
 76 |                 
 77 |             # Create the client
 78 |             self.client = AsyncOpenAI(
 79 |                 base_url=self.base_url,
 80 |                 api_key=self.api_key,
 81 |                 default_headers=headers,
 82 |                 timeout=timeout
 83 |             )
 84 |             
 85 |             # Pre-fetch available models
 86 |             try:
 87 |                 self.available_models = await self.list_models()
 88 |                 logger.info(f"Loaded {len(self.available_models)} models from OpenRouter")
 89 |             except Exception as model_err:
 90 |                 logger.warning(f"Failed to fetch models from OpenRouter: {str(model_err)}")
 91 |                 # Use hardcoded fallback models
 92 |                 self.available_models = self._get_fallback_models()
 93 |             
 94 |             logger.success(
 95 |                 "OpenRouter provider initialized successfully", 
 96 |                 emoji_key="provider"
 97 |             )
 98 |             return True
 99 |             
100 |         except Exception as e:
101 |             logger.error(
102 |                 f"Failed to initialize OpenRouter provider: {str(e)}", 
103 |                 emoji_key="error"
104 |             )
105 |             return False
106 | 
107 |     def _initialize_client(self, **kwargs):
108 |         """Initialize the OpenAI async client with OpenRouter specifics."""
109 |         # This method is now deprecated - use initialize() instead
110 |         logger.warning("_initialize_client() is deprecated, use initialize() instead")
111 |         return False
112 | 
113 |     async def generate_completion(
114 |         self,
115 |         prompt: str,
116 |         model: Optional[str] = None,
117 |         max_tokens: Optional[int] = None,
118 |         temperature: float = 0.7,
119 |         **kwargs
120 |     ) -> ModelResponse:
121 |         """Generate a completion using OpenRouter.
122 | 
123 |         Args:
124 |             prompt: Text prompt to send to the model
125 |             model: Model name (e.g., "openai/gpt-4.1-mini", "google/gemini-flash-1.5")
126 |             max_tokens: Maximum tokens to generate
127 |             temperature: Temperature parameter (0.0-1.0)
128 |             **kwargs: Additional model-specific parameters, including:
129 |                 - extra_headers (Dict): Additional headers for this specific call.
130 |                 - extra_body (Dict): OpenRouter-specific arguments.
131 | 
132 |         Returns:
133 |             ModelResponse with completion result
134 | 
135 |         Raises:
136 |             Exception: If API call fails
137 |         """
138 |         if not self.client:
139 |             initialized = await self._initialize_client()
140 |             if not initialized:
141 |                 raise RuntimeError(f"{self.provider_name} provider not initialized.")
142 | 
143 |         # Use default model if not specified
144 |         model = model or self.default_model
145 | 
146 |         # Ensure we have a model name before proceeding
147 |         if model is None:
148 |             logger.error("Completion failed: No model specified and no default model configured for OpenRouter.")
149 |             raise ValueError("No model specified and no default model configured for OpenRouter.")
150 | 
151 |         # Strip provider prefix only if it matches OUR provider name
152 |         if model.startswith(f"{self.provider_name}:"):
153 |             model = model.split(":", 1)[1]
154 |             logger.debug(f"Stripped provider prefix from model name: {model}")
155 |         # Note: Keep prefixes like 'openai/' or 'google/' as OpenRouter uses them.
156 |         # DO NOT strip other provider prefixes as they're needed for OpenRouter routing
157 | 
158 |         # Create messages
159 |         messages = kwargs.pop("messages", None) or [{"role": "user", "content": prompt}]
160 | 
161 |         # Prepare API call parameters
162 |         params = {
163 |             "model": model,
164 |             "messages": messages,
165 |             "temperature": temperature,
166 |         }
167 | 
168 |         if max_tokens is not None:
169 |             params["max_tokens"] = max_tokens
170 | 
171 |         # Extract OpenRouter specific args from kwargs
172 |         extra_headers = kwargs.pop("extra_headers", {})
173 |         extra_body = kwargs.pop("extra_body", {})
174 | 
175 |         json_mode = kwargs.pop("json_mode", False)
176 |         if json_mode:
177 |             # OpenRouter uses OpenAI-compatible API
178 |             params["response_format"] = {"type": "json_object"}
179 |             self.logger.debug("Setting response_format to JSON mode for OpenRouter")
180 | 
181 |         # Add any remaining kwargs to the main params (standard OpenAI args)
182 |         params.update(kwargs)
183 | 
184 |         self.logger.info(
185 |             f"Generating completion with {self.provider_name} model {model}",
186 |             emoji_key=self.provider_name,
187 |             prompt_length=len(prompt),
188 |             json_mode_requested=json_mode
189 |         )
190 | 
191 |         try:
192 |             # Make API call with timing
193 |             response, processing_time = await self.process_with_timer(
194 |                 self.client.chat.completions.create, **params, extra_headers=extra_headers, extra_body=extra_body
195 |             )
196 | 
197 |             # Extract response text
198 |             completion_text = response.choices[0].message.content
199 | 
200 |             # Create standardized response
201 |             result = ModelResponse(
202 |                 text=completion_text,
203 |                 model=response.model, # Use model returned by API
204 |                 provider=self.provider_name,
205 |                 input_tokens=response.usage.prompt_tokens,
206 |                 output_tokens=response.usage.completion_tokens,
207 |                 total_tokens=response.usage.total_tokens,
208 |                 processing_time=processing_time,
209 |                 raw_response=response,
210 |             )
211 | 
212 |             self.logger.success(
213 |                 f"{self.provider_name} completion successful",
214 |                 emoji_key="success",
215 |                 model=result.model,
216 |                 tokens={
217 |                     "input": result.input_tokens,
218 |                     "output": result.output_tokens
219 |                 },
220 |                 cost=result.cost, # Will be calculated by ModelResponse
221 |                 time=result.processing_time
222 |             )
223 | 
224 |             return result
225 | 
226 |         except Exception as e:
227 |             self.logger.error(
228 |                 f"{self.provider_name} completion failed for model {model}: {str(e)}",
229 |                 emoji_key="error",
230 |                 model=model
231 |             )
232 |             raise
233 | 
234 |     async def generate_completion_stream(
235 |         self,
236 |         prompt: str,
237 |         model: Optional[str] = None,
238 |         max_tokens: Optional[int] = None,
239 |         temperature: float = 0.7,
240 |         **kwargs
241 |     ) -> AsyncGenerator[Tuple[str, Dict[str, Any]], None]:
242 |         """Generate a streaming completion using OpenRouter.
243 | 
244 |         Args:
245 |             prompt: Text prompt to send to the model
246 |             model: Model name (e.g., "openai/gpt-4.1-mini")
247 |             max_tokens: Maximum tokens to generate
248 |             temperature: Temperature parameter (0.0-1.0)
249 |             **kwargs: Additional model-specific parameters, including:
250 |                 - extra_headers (Dict): Additional headers for this specific call.
251 |                 - extra_body (Dict): OpenRouter-specific arguments.
252 | 
253 |         Yields:
254 |             Tuple of (text_chunk, metadata)
255 | 
256 |         Raises:
257 |             Exception: If API call fails
258 |         """
259 |         if not self.client:
260 |             initialized = await self._initialize_client()
261 |             if not initialized:
262 |                 raise RuntimeError(f"{self.provider_name} provider not initialized.")
263 | 
264 |         model = model or self.default_model
265 |         if model.startswith(f"{self.provider_name}:"):
266 |             model = model.split(":", 1)[1]
267 |         # DO NOT strip other provider prefixes as they're needed for OpenRouter routing
268 | 
269 |         messages = kwargs.pop("messages", None) or [{"role": "user", "content": prompt}]
270 | 
271 |         params = {
272 |             "model": model,
273 |             "messages": messages,
274 |             "temperature": temperature,
275 |             "stream": True,
276 |         }
277 |         if max_tokens is not None:
278 |             params["max_tokens"] = max_tokens
279 | 
280 |         extra_headers = kwargs.pop("extra_headers", {})
281 |         extra_body = kwargs.pop("extra_body", {})
282 | 
283 |         json_mode = kwargs.pop("json_mode", False)
284 |         if json_mode:
285 |             # OpenRouter uses OpenAI-compatible API
286 |             params["response_format"] = {"type": "json_object"}
287 |             self.logger.debug("Setting response_format to JSON mode for OpenRouter streaming")
288 | 
289 |         params.update(kwargs)
290 | 
291 |         self.logger.info(
292 |             f"Generating streaming completion with {self.provider_name} model {model}",
293 |             emoji_key=self.provider_name,
294 |             prompt_length=len(prompt),
295 |             json_mode_requested=json_mode
296 |         )
297 | 
298 |         start_time = time.time()
299 |         total_chunks = 0
300 |         final_model_name = model # Store initially requested model
301 | 
302 |         try:
303 |             stream = await self.client.chat.completions.create(**params, extra_headers=extra_headers, extra_body=extra_body)
304 | 
305 |             async for chunk in stream:
306 |                 total_chunks += 1
307 |                 delta = chunk.choices[0].delta
308 |                 content = delta.content or ""
309 | 
310 |                 # Try to get model name from the chunk if available (some providers include it)
311 |                 if chunk.model:
312 |                     final_model_name = chunk.model
313 | 
314 |                 metadata = {
315 |                     "model": final_model_name,
316 |                     "provider": self.provider_name,
317 |                     "chunk_index": total_chunks,
318 |                     "finish_reason": chunk.choices[0].finish_reason,
319 |                 }
320 | 
321 |                 yield content, metadata
322 | 
323 |             processing_time = time.time() - start_time
324 |             self.logger.success(
325 |                 f"{self.provider_name} streaming completion successful",
326 |                 emoji_key="success",
327 |                 model=final_model_name,
328 |                 chunks=total_chunks,
329 |                 time=processing_time
330 |             )
331 | 
332 |         except Exception as e:
333 |             self.logger.error(
334 |                 f"{self.provider_name} streaming completion failed for model {model}: {str(e)}",
335 |                 emoji_key="error",
336 |                 model=model
337 |             )
338 |             raise
339 | 
340 |     async def list_models(self) -> List[Dict[str, Any]]:
341 |         """List available OpenRouter models (provides examples, not exhaustive).
342 | 
343 |         OpenRouter offers a vast number of models. This list provides common examples.
344 |         Refer to OpenRouter documentation for the full list.
345 | 
346 |         Returns:
347 |             List of example model information dictionaries
348 |         """
349 |         if self.available_models:
350 |             return self.available_models
351 |         models = self._get_fallback_models()
352 |         return models
353 | 
354 |     def get_default_model(self) -> str:
355 |         """Get the default OpenRouter model.
356 | 
357 |         Returns:
358 |             Default model name (e.g., "openai/gpt-4.1-mini")
359 |         """
360 |         # Allow override via environment variable
361 |         default_model_env = os.environ.get("OPENROUTER_DEFAULT_MODEL")
362 |         if default_model_env:
363 |             return default_model_env
364 | 
365 |         # Fallback to constants
366 |         return DEFAULT_MODELS.get(self.provider_name, "openai/gpt-4.1-mini")
367 | 
368 |     async def check_api_key(self) -> bool:
369 |         """Check if the OpenRouter API key is valid by attempting a small request."""
370 |         if not self.client:
371 |             # Try to initialize if not already done
372 |             if not await self._initialize_client():
373 |                 return False # Initialization failed
374 | 
375 |         try:
376 |             # Attempt a simple, low-cost operation, e.g., list models (even if it returns 404/permission error, it validates the key/URL)
377 |             # Or use a very small completion request
378 |             await self.client.chat.completions.create(
379 |                 model=self.get_default_model(),
380 |                 messages=[{"role": "user", "content": "test"}],
381 |                 max_tokens=1,
382 |                 temperature=0
383 |             )
384 |             return True
385 |         except Exception as e:
386 |             logger.warning(f"API key check failed for {self.provider_name}: {str(e)}", emoji_key="warning")
387 |             return False
388 | 
389 |     def get_available_models(self) -> List[str]:
390 |         """Return a list of available model names."""
391 |         return [model["id"] for model in self.available_models]
392 | 
393 |     def is_model_available(self, model_name: str) -> bool:
394 |         """Check if a specific model is available."""
395 |         available_model_ids = [model["id"] for model in self.available_models]
396 |         return model_name in available_model_ids
397 | 
398 |     async def create_completion(self, model: str, messages: List[Dict[str, str]], stream: bool = False, **kwargs) -> Union[str, AsyncGenerator[str, None]]:
399 |         """Create a completion using the specified model."""
400 |         if not self.client:
401 |             raise RuntimeError("OpenRouter client not initialized (likely missing API key).")
402 |             
403 |         # Check if model is available
404 |         if not self.is_model_available(model):
405 |             # Fallback to default if provided model isn't listed? Or raise error?
406 |             # Let's try the default model if the requested one isn't confirmed available.
407 |             if self.default_model and self.is_model_available(self.default_model):
408 |                 logger.warning(f"Model '{model}' not found in available list. Falling back to default '{self.default_model}'.")
409 |                 model = self.default_model
410 |             else:
411 |                 # If even the default isn't available or set, raise error
412 |                 raise ValueError(f"Model '{model}' is not available via OpenRouter according to fetched list, and no valid default model is set.")
413 | 
414 |         merged_kwargs = {**kwargs}
415 |         # OpenRouter uses standard OpenAI params like max_tokens, temperature, etc.
416 |         # Ensure essential params are passed
417 |         if 'max_tokens' not in merged_kwargs:
418 |             merged_kwargs['max_tokens'] = get_config().providers.openrouter.max_tokens or 1024 # Use config or default
419 | 
420 |         if stream:
421 |             logger.debug(f"Creating stream completion: Model={model}, Params={merged_kwargs}")
422 |             return self._stream_completion_generator(model, messages, **merged_kwargs)
423 |         else:
424 |             logger.debug(f"Creating completion: Model={model}, Params={merged_kwargs}")
425 |             try:
426 |                 response = await self.client.chat.completions.create(
427 |                     model=model,
428 |                     messages=messages,
429 |                     stream=False,
430 |                     **merged_kwargs
431 |                 )
432 |                 # Extract content based on OpenAI library version
433 |                 if hasattr(response, 'choices') and response.choices:
434 |                     choice = response.choices[0]
435 |                     if hasattr(choice, 'message') and hasattr(choice.message, 'content'):
436 |                         return choice.message.content or "" # Return empty string if content is None
437 |                     elif hasattr(choice, 'delta') and hasattr(choice.delta, 'content'): # Should not happen for stream=False but check
438 |                         return choice.delta.content or ""
439 |                 logger.warning("Could not extract content from OpenRouter response.")
440 |                 return "" # Return empty string if no content found
441 |             except Exception as e:
442 |                 logger.error(f"OpenRouter completion failed: {e}", exc_info=True)
443 |                 raise RuntimeError(f"OpenRouter API call failed: {e}") from e
444 | 
445 |     async def _stream_completion_generator(self, model: str, messages: List[Dict[str, str]], **kwargs) -> AsyncGenerator[str, None]:
446 |         """Async generator for streaming completions."""
447 |         if not self.client:
448 |             raise RuntimeError("OpenRouter client not initialized (likely missing API key).")
449 |         try:
450 |             stream = await self.client.chat.completions.create(
451 |                 model=model,
452 |                 messages=messages,
453 |                 stream=True,
454 |                 **kwargs
455 |             )
456 |             async for chunk in stream:
457 |                 # Extract content based on OpenAI library version
458 |                 content = ""
459 |                 if hasattr(chunk, 'choices') and chunk.choices:
460 |                      choice = chunk.choices[0]
461 |                      if hasattr(choice, 'delta') and hasattr(choice.delta, 'content'):
462 |                           content = choice.delta.content
463 |                      elif hasattr(choice, 'message') and hasattr(choice.message, 'content'): # Should not happen for stream=True
464 |                           content = choice.message.content
465 | 
466 |                 if content:
467 |                      yield content
468 |         except Exception as e:
469 |             logger.error(f"OpenRouter stream completion failed: {e}", exc_info=True)
470 |             # Depending on desired behavior, either raise or yield an error message
471 |             # yield f"Error during stream: {e}"
472 |             raise RuntimeError(f"OpenRouter API stream failed: {e}") from e
473 | 
474 |     # --- Cost Calculation (Needs OpenRouter Specific Data) ---
475 |     def get_cost(self, model: str, prompt_tokens: int, completion_tokens: int) -> Optional[float]:
476 |         """Calculate the cost of a request based on OpenRouter pricing.
477 | 
478 |         Note: Requires loading detailed model pricing info, which is not
479 |               done by default in fetch_available_models.
480 |               This is a placeholder and needs enhancement.
481 |         """
482 |         # Placeholder: Need to fetch and store detailed pricing from OpenRouter
483 |         # Example structure (needs actual data):
484 |         openrouter_pricing = {
485 |              # "model_id": {"prompt_cost_per_mtok": X, "completion_cost_per_mtok": Y},
486 |              "openai/gpt-4o": {"prompt_cost_per_mtok": 5.0, "completion_cost_per_mtok": 15.0},
487 |              "google/gemini-pro-1.5": {"prompt_cost_per_mtok": 3.5, "completion_cost_per_mtok": 10.5},
488 |              "anthropic/claude-3-opus": {"prompt_cost_per_mtok": 15.0, "completion_cost_per_mtok": 75.0},
489 |              # ... add more model costs from openrouter.ai/docs#models ...
490 |         }
491 | 
492 |         model_cost = openrouter_pricing.get(model)
493 |         if model_cost:
494 |             prompt_cost = (prompt_tokens / 1_000_000) * model_cost.get("prompt_cost_per_mtok", 0)
495 |             completion_cost = (completion_tokens / 1_000_000) * model_cost.get("completion_cost_per_mtok", 0)
496 |             return prompt_cost + completion_cost
497 |         else:
498 |             logger.warning(f"Cost calculation not available for OpenRouter model: {model}")
499 |             # Return None if cost cannot be calculated
500 |             return None
501 | 
502 |     # --- Prompt Formatting --- #
503 |     def format_prompt(self, messages: List[Dict[str, str]]) -> Any:
504 |         """Use standard list of dictionaries format for OpenRouter (like OpenAI)."""
505 |         # OpenRouter generally uses the same format as OpenAI
506 |         return messages
507 | 
508 |     def _get_fallback_models(self) -> List[Dict[str, Any]]:
509 |         """Return a list of fallback models when API is not accessible."""
510 |         return [
511 |             {
512 |                 "id": "mistralai/mistral-large",
513 |                 "provider": self.provider_name,
514 |                 "description": "Mistral: Strong open-weight model.",
515 |             },
516 |             {
517 |                 "id": "mistralai/mistral-nemo",
518 |                 "provider": self.provider_name,
519 |                 "description": "Mistral: Strong open-weight model.",
520 |             },
521 |             {
522 |                 "id": "meta-llama/llama-3-70b-instruct",
523 |                 "provider": self.provider_name,
524 |                 "description": "Meta: Powerful open-source instruction-tuned model.",
525 |             },
526 |         ]
527 | 
528 | # Make available via discovery
529 | __all__ = ["OpenRouterProvider"]
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/utils/parsing.py:
--------------------------------------------------------------------------------

```python
  1 | """Parsing utilities for Ultimate MCP Server.
  2 | 
  3 | This module provides utility functions for parsing and processing 
  4 | results from Ultimate MCP Server operations that were previously defined in
  5 | example scripts but are now part of the library.
  6 | """
  7 | 
  8 | import json
  9 | import re
 10 | from typing import Any, Dict
 11 | 
 12 | from ultimate_mcp_server.utils import get_logger
 13 | 
 14 | # Initialize logger
 15 | logger = get_logger("ultimate_mcp_server.utils.parsing")
 16 | 
 17 | def extract_json_from_markdown(text: str) -> str:
 18 |     """Extracts a JSON string embedded within markdown code fences.
 19 | 
 20 |     Handles various markdown code block formats and edge cases:
 21 |     - Complete code blocks: ```json ... ``` or ``` ... ```
 22 |     - Alternative fence styles: ~~~json ... ~~~ 
 23 |     - Incomplete/truncated blocks with only opening fence
 24 |     - Multiple code blocks (chooses the first valid JSON)
 25 |     - Extensive JSON repair for common LLM output issues:
 26 |         - Unterminated strings
 27 |         - Trailing commas
 28 |         - Missing closing brackets
 29 |         - Unquoted keys
 30 |         - Truncated content
 31 | 
 32 |     If no valid JSON-like content is found in fences, returns the original string.
 33 | 
 34 |     Args:
 35 |         text: The input string possibly containing markdown-fenced JSON.
 36 | 
 37 |     Returns:
 38 |         The extracted JSON string or the stripped original string.
 39 |     """
 40 |     if not text:
 41 |         return ""
 42 |         
 43 |     cleaned_text = text.strip()
 44 |     possible_json_candidates = []
 45 |     
 46 |     # Try to find JSON inside complete code blocks with various fence styles
 47 |     # Look for backtick fences (most common)
 48 |     backtick_matches = re.finditer(r"```(?:json)?\s*(.*?)\s*```", cleaned_text, re.DOTALL | re.IGNORECASE)
 49 |     for match in backtick_matches:
 50 |         possible_json_candidates.append(match.group(1).strip())
 51 |     
 52 |     # Look for tilde fences (less common but valid in some markdown)
 53 |     tilde_matches = re.finditer(r"~~~(?:json)?\s*(.*?)\s*~~~", cleaned_text, re.DOTALL | re.IGNORECASE)
 54 |     for match in tilde_matches:
 55 |         possible_json_candidates.append(match.group(1).strip())
 56 |     
 57 |     # If no complete blocks found, check for blocks with only opening fence
 58 |     if not possible_json_candidates:
 59 |         # Try backtick opening fence
 60 |         backtick_start = re.search(r"```(?:json)?\s*", cleaned_text, re.IGNORECASE)
 61 |         if backtick_start:
 62 |             content_after_fence = cleaned_text[backtick_start.end():].strip()
 63 |             possible_json_candidates.append(content_after_fence)
 64 |         
 65 |         # Try tilde opening fence
 66 |         tilde_start = re.search(r"~~~(?:json)?\s*", cleaned_text, re.IGNORECASE)
 67 |         if tilde_start:
 68 |             content_after_fence = cleaned_text[tilde_start.end():].strip()
 69 |             possible_json_candidates.append(content_after_fence)
 70 |     
 71 |     # If still no candidates, add the original text as last resort
 72 |     if not possible_json_candidates:
 73 |         possible_json_candidates.append(cleaned_text)
 74 |     
 75 |     # Try each candidate, returning the first one that looks like valid JSON
 76 |     for candidate in possible_json_candidates:
 77 |         # Apply advanced JSON repair
 78 |         repaired = _repair_json(candidate)
 79 |         try:
 80 |             # Validate if it's actually parseable JSON
 81 |             json.loads(repaired)
 82 |             return repaired  # Return the first valid JSON
 83 |         except json.JSONDecodeError:
 84 |             # If repair didn't work, continue to the next candidate
 85 |             continue
 86 |     
 87 |     # If no candidate worked with regular repair, try more aggressive repair on the first candidate
 88 |     if possible_json_candidates:
 89 |         aggressive_repair = _repair_json(possible_json_candidates[0], aggressive=True)
 90 |         try:
 91 |             json.loads(aggressive_repair)
 92 |             return aggressive_repair
 93 |         except json.JSONDecodeError:
 94 |             # Return the best we can - the first candidate with basic cleaning
 95 |             # This will still fail in json.loads, but at least we tried
 96 |             return possible_json_candidates[0]
 97 |     
 98 |     # Absolute fallback - return the original text
 99 |     return cleaned_text
100 | 
101 | def _repair_json(text: str, aggressive=False) -> str:
102 |     """
103 |     Repair common JSON formatting issues in LLM-generated output.
104 |     
105 |     This internal utility function applies a series of transformations to fix common
106 |     JSON formatting problems that frequently occur in LLM outputs. It can operate in
107 |     two modes: standard and aggressive.
108 |     
109 |     In standard mode (aggressive=False), it applies basic repairs like:
110 |     - Removing trailing commas before closing brackets/braces
111 |     - Ensuring property names are properly quoted
112 |     - Basic structure validation
113 |     
114 |     In aggressive mode (aggressive=True), it applies more extensive repairs:
115 |     - Fixing unterminated string literals by adding missing quotes
116 |     - Balancing unmatched brackets and braces
117 |     - Adding missing values for dangling properties
118 |     - Handling truncated JSON at the end of strings
119 |     - Attempting to recover partial JSON structures
120 |     
121 |     The aggressive repairs are particularly useful when dealing with outputs from
122 |     models that have been truncated mid-generation or contain structural errors
123 |     that would normally make the JSON unparseable.
124 |     
125 |     Args:
126 |         text: The JSON-like string to repair, potentially containing formatting errors
127 |         aggressive: Whether to apply more extensive repair techniques beyond basic
128 |                    formatting fixes. Default is False (basic repairs only).
129 |         
130 |     Returns:
131 |         A repaired JSON string that is more likely to be parseable. Note that even
132 |         with aggressive repairs, the function cannot guarantee valid JSON for
133 |         severely corrupted inputs.
134 |     
135 |     Note:
136 |         This function is intended for internal use by extract_json_from_markdown.
137 |         While it attempts to fix common issues, it may not address all possible
138 |         JSON formatting problems, especially in severely malformed inputs.
139 |     """
140 |     if not text:
141 |         return text
142 |         
143 |     # Step 1: Basic cleanup
144 |     result = text.strip()
145 |     
146 |     # Quick check if it even remotely looks like JSON
147 |     if not (result.startswith('{') or result.startswith('[')):
148 |         return result
149 |         
150 |     # Step 2: Fix common issues
151 |     
152 |     # Fix trailing commas before closing brackets
153 |     result = re.sub(r',\s*([\}\]])', r'\1', result)
154 |     
155 |     # Ensure property names are quoted
156 |     result = re.sub(r'([{,]\s*)([a-zA-Z0-9_$]+)(\s*:)', r'\1"\2"\3', result)
157 |     
158 |     # If we're not in aggressive mode, return after basic fixes
159 |     if not aggressive:
160 |         return result
161 |         
162 |     # Step 3: Aggressive repairs for truncated/malformed JSON
163 |     
164 |     # Track opening/closing brackets to detect imbalance
165 |     open_braces = result.count('{')
166 |     close_braces = result.count('}')
167 |     open_brackets = result.count('[')
168 |     close_brackets = result.count(']')
169 |     
170 |     # Count quotes to check if we have an odd number (indicating unterminated strings)
171 |     quote_count = result.count('"')
172 |     if quote_count % 2 != 0:
173 |         # We have an odd number of quotes, meaning at least one string is unterminated
174 |         # This is a much more aggressive approach to fix strings
175 |         
176 |         # First, try to find all strings that are properly terminated
177 |         proper_strings = []
178 |         pos = 0
179 |         in_string = False
180 |         string_start = 0
181 |         
182 |         # This helps track properly formed strings and identify problematic ones
183 |         while pos < len(result):
184 |             if result[pos] == '"' and (pos == 0 or result[pos-1] != '\\'):
185 |                 if not in_string:
186 |                     # Start of a string
187 |                     in_string = True
188 |                     string_start = pos
189 |                 else:
190 |                     # End of a string
191 |                     in_string = False
192 |                     proper_strings.append((string_start, pos))
193 |             pos += 1
194 |         
195 |         # If we're still in a string at the end, we found an unterminated string
196 |         if in_string:
197 |             # Force terminate it at the end
198 |             result += '"'
199 |     
200 |     # Even more aggressive string fixing
201 |     # This regexp looks for a quote followed by any characters not containing a quote
202 |     # followed by a comma, closing brace, or bracket, without a quote in between
203 |     # This indicates an unterminated string
204 |     result = re.sub(r'"([^"]*?)(?=,|\s*[\]}]|$)', r'"\1"', result)
205 |     
206 |     # Fix cases where value might be truncated mid-word just before closing quote
207 |     # If we find something that looks like it's in the middle of a string, terminate it
208 |     result = re.sub(r'"([^"]+)(\s*[\]}]|,|$)', lambda m: 
209 |         f'"{m.group(1)}"{"" if m.group(2).startswith(",") or m.group(2) in "]}," else m.group(2)}', 
210 |         result)
211 |     
212 |     # Fix dangling quotes at the end of the string - these usually indicate a truncated string
213 |     if result.rstrip().endswith('"'):
214 |         # Add closing quote and appropriate structure depending on context
215 |         result = result.rstrip() + '"'
216 |         
217 |         # Look at the previous few characters to determine if we need a comma or not
218 |         context = result[-20:] if len(result) > 20 else result
219 |         # If string ends with x": " it's likely a property name
220 |         if re.search(r'"\s*:\s*"$', context):
221 |             # Add a placeholder value and closing structure for the property
222 |             result += "unknown"
223 |             
224 |     # Check for dangling property (property name with colon but no value)
225 |     result = re.sub(r'"([^"]+)"\s*:(?!\s*["{[\w-])', r'"\1": null', result)
226 |     
227 |     # Add missing closing brackets/braces if needed
228 |     if open_braces > close_braces:
229 |         result += '}' * (open_braces - close_braces)
230 |     if open_brackets > close_brackets:
231 |         result += ']' * (open_brackets - close_brackets)
232 |     
233 |     # Handle truncated JSON structure - look for incomplete objects at the end
234 |     # This is complex, but we'll try some common patterns
235 |     
236 |     # If JSON ends with a property name and colon but no value
237 |     if re.search(r'"[^"]+"\s*:\s*$', result):
238 |         result += 'null'
239 |     
240 |     # If JSON ends with a comma, it needs another value - add a null
241 |     if re.search(r',\s*$', result):
242 |         result += 'null'
243 |         
244 |     # If the JSON structure is fundamentally corrupted at the end (common in truncation)
245 |     # Close any unclosed objects or arrays
246 |     if not (result.endswith('}') or result.endswith(']') or result.endswith('"')):
247 |         # Count unmatched opening brackets
248 |         stack = []
249 |         for char in result:
250 |             if char in '{[':
251 |                 stack.append(char)
252 |             elif char in '}]':
253 |                 if stack and ((stack[-1] == '{' and char == '}') or (stack[-1] == '[' and char == ']')):
254 |                     stack.pop()
255 |                     
256 |         # Close any unclosed structures
257 |         for bracket in reversed(stack):
258 |             if bracket == '{':
259 |                 result += '}'
260 |             elif bracket == '[':
261 |                 result += ']'
262 |     
263 |     # As a final safeguard, try to eval the JSON with a permissive parser
264 |     # This won't fix deep structural issues but catches cases our regexes missed
265 |     try:
266 |         import simplejson
267 |         simplejson.loads(result, parse_constant=lambda x: x)
268 |     except (ImportError, simplejson.JSONDecodeError):
269 |         try:
270 |             # Try one last time with the more permissive custom JSON parser
271 |             _scan_once = json.scanner.py_make_scanner(json.JSONDecoder())
272 |             try:
273 |                 _scan_once(result, 0)
274 |             except StopIteration:
275 |                 # Likely unterminated JSON - do one final pass of common fixups
276 |                 
277 |                 # Check for unterminated strings of various forms one more time
278 |                 if re.search(r'(?<!")"(?:[^"\\]|\\.)*[^"\\](?!")(?=,|\s*[\]}]|$)', result):
279 |                     # Even more aggressive fixes, replacing with generic values
280 |                     result = re.sub(r'(?<!")"(?:[^"\\]|\\.)*[^"\\](?!")(?=,|\s*[\]}]|$)', 
281 |                                     r'"invalid_string"', result)
282 |                 
283 |                 # Ensure valid JSON-like structure
284 |                 if not (result.endswith('}') or result.endswith(']')):
285 |                     if result.count('{') > result.count('}'):
286 |                         result += '}'
287 |                     if result.count('[') > result.count(']'):
288 |                         result += ']'
289 |             except Exception:
290 |                 # Something else is wrong, but we've tried our best
291 |                 pass
292 |         except Exception:
293 |             # We've done all we reasonably can
294 |             pass
295 |     
296 |     return result
297 | 
298 | async def parse_result(result: Any) -> Dict[str, Any]:
299 |     """Parse the result from a tool call into a usable dictionary.
300 |     
301 |     Handles various return types from MCP tools, including TextContent objects,
302 |     list results, and direct dictionaries. Attempts to extract JSON from
303 |     markdown code fences if present.
304 |     
305 |     Args:
306 |         result: Result from an MCP tool call or provider operation
307 |             
308 |     Returns:
309 |         Parsed dictionary containing the result data
310 |     """
311 |     try:
312 |         text_to_parse = None
313 |         # Handle TextContent object (which has a .text attribute)
314 |         if hasattr(result, 'text'):
315 |             text_to_parse = result.text
316 |                 
317 |         # Handle list result
318 |         elif isinstance(result, list):
319 |             if result:
320 |                 first_item = result[0]
321 |                 if hasattr(first_item, 'text'): 
322 |                     text_to_parse = first_item.text
323 |                 elif isinstance(first_item, dict):
324 |                     # NEW: Check if it's an MCP-style text content dict
325 |                     if first_item.get("type") == "text" and "text" in first_item:
326 |                         text_to_parse = first_item["text"]
327 |                     else:
328 |                         # It's some other dictionary, return it directly
329 |                         return first_item 
330 |                 elif isinstance(first_item, str): 
331 |                     text_to_parse = first_item
332 |                 else:
333 |                     logger.warning(f"List item type not directly parseable: {type(first_item)}")
334 |                     return {"error": f"List item type not directly parseable: {type(first_item)}", "original_result_type": str(type(result))}
335 |             else: # Empty list
336 |                 return {} # Or perhaps an error/warning? For now, empty dict.
337 |             
338 |         # Handle dictionary directly
339 |         elif isinstance(result, dict):
340 |             return result
341 | 
342 |         # Handle string directly
343 |         elif isinstance(result, str):
344 |             text_to_parse = result
345 | 
346 |         # If text_to_parse is still None or is empty/whitespace after potential assignments
347 |         if text_to_parse is None or not text_to_parse.strip():
348 |             logger.warning(f"No parsable text content found in result (type: {type(result)}, content preview: \'{str(text_to_parse)[:100]}...\').")
349 |             return {"error": "No parsable text content found in result", "result_type": str(type(result)), "content_preview": str(text_to_parse)[:100] if text_to_parse else None}
350 | 
351 |         # At this point, text_to_parse should be a non-empty string.
352 |         # Attempt to extract JSON from markdown (if any)
353 |         # If no markdown, or extraction fails, json_to_parse will be text_to_parse itself.
354 |         json_to_parse = extract_json_from_markdown(text_to_parse)
355 | 
356 |         if not json_to_parse.strip(): # If extraction resulted in an empty string (e.g. from "``` ```")
357 |             logger.warning(f"JSON extraction from text_to_parse yielded an empty string. Original text_to_parse: \'{text_to_parse[:200]}...\'")
358 |             # Fallback to trying the original text_to_parse if extraction gave nothing useful
359 |             # This covers cases where text_to_parse might be pure JSON without fences.
360 |             if text_to_parse.strip(): # Ensure original text_to_parse wasn't also empty
361 |                  json_to_parse = text_to_parse 
362 |             else: # Should have been caught by the earlier check, but as a safeguard:
363 |                 return {"error": "Content became empty after attempting JSON extraction", "original_text_to_parse": text_to_parse}
364 | 
365 | 
366 |         # Now, json_to_parse should be the best candidate string for JSON parsing.
367 |         # Only attempt to parse if it's not empty/whitespace.
368 |         if not json_to_parse.strip():
369 |             logger.warning(f"Final string to parse is empty. Original text_to_parse: \'{text_to_parse[:200]}...\'")
370 |             return {"error": "Final string for JSON parsing is empty", "original_text_to_parse": text_to_parse}
371 | 
372 |         try:
373 |             return json.loads(json_to_parse)
374 |         except json.JSONDecodeError as e:
375 |             problematic_text_for_repair = json_to_parse # This is the string that failed json.loads
376 |             logger.warning(f"Initial JSON parsing failed for: '{problematic_text_for_repair[:200]}...' Error: {e}. Attempting LLM repair...", emoji_key="warning")
377 |             try:
378 |                 from ultimate_mcp_server.tools.completion import generate_completion
379 |                 
380 |                 system_message_content = "You are an expert JSON repair assistant. Your goal is to return only valid JSON."
381 |                 # Prepend system instruction to the main prompt for completion models
382 |                 # (as generate_completion with openai provider doesn't natively use a separate system_prompt field in its current design)
383 |                 user_repair_request = (
384 |                     f"The following text is supposed to be valid JSON but failed parsing. "
385 |                     f"Please correct it and return *only* the raw, valid JSON string. "
386 |                     f"Do not include any explanations or markdown formatting. "
387 |                     f"If it's impossible to extract or repair to valid JSON, return an empty JSON object {{}}. "
388 |                     f"Problematic text:\n\n```text\n{problematic_text_for_repair}\n```"
389 |                 )
390 |                 combined_prompt = f"{system_message_content}\n\n{user_repair_request}"
391 | 
392 |                 llm_repair_result = await generate_completion(
393 |                     prompt=combined_prompt, # Use the combined prompt
394 |                     provider="openai",
395 |                     model="gpt-4.1-mini", 
396 |                     temperature=0.0,
397 |                     additional_params={} # Remove system_prompt from here
398 |                 )
399 | 
400 |                 text_from_llm_repair = llm_repair_result.get("text", "")
401 |                 if not text_from_llm_repair.strip():
402 |                     logger.error("LLM repair attempt returned empty string.")
403 |                     return {"error": "LLM repair returned empty string", "original_text": problematic_text_for_repair}
404 | 
405 |                 # Re-extract from LLM response, as it might add fences
406 |                 final_repaired_json_str = extract_json_from_markdown(text_from_llm_repair)
407 |                 
408 |                 if not final_repaired_json_str.strip():
409 |                     logger.error(f"LLM repair extracted an empty JSON string from LLM response: {text_from_llm_repair[:200]}...")
410 |                     return {"error": "LLM repair extracted empty JSON", "llm_response": text_from_llm_repair, "original_text": problematic_text_for_repair}
411 | 
412 |                 try:
413 |                     logger.debug(f"Attempting to parse LLM-repaired JSON: {final_repaired_json_str[:200]}...")
414 |                     parsed_llm_result = json.loads(final_repaired_json_str)
415 |                     logger.success("LLM JSON repair successful.", emoji_key="success")
416 |                     return parsed_llm_result
417 |                 except json.JSONDecodeError as llm_e:
418 |                     logger.error(f"LLM repair attempt failed. LLM response could not be parsed as JSON: {llm_e}. LLM response (after extraction): '{final_repaired_json_str[:200]}' Original LLM text: '{text_from_llm_repair[:500]}...'")
419 |                     return {"error": "LLM repair failed to produce valid JSON", "detail": str(llm_e), "llm_response_extracted": final_repaired_json_str, "llm_response_raw": text_from_llm_repair, "original_text": problematic_text_for_repair}
420 |             except Exception as repair_ex:
421 |                 logger.error(f"Exception during LLM repair process: {repair_ex}", exc_info=True)
422 |                 return {"error": "Exception during LLM repair", "detail": str(repair_ex), "original_text": problematic_text_for_repair}
423 | 
424 |     except Exception as e: # General error in parse_result
425 |         logger.error(f"Critical error in parse_result: {e}", exc_info=True)
426 |         return {"error": "Critical error during result parsing", "detail": str(e)}
427 | 
async def process_mcp_result(result: Any) -> Dict[str, Any]:
    """
    Normalize the raw output of an MCP tool call into a dictionary.

    This coroutine is a thin public entry point: ``result`` is forwarded
    unchanged to ``parse_result`` and whatever that coroutine produces is
    returned as-is. All of the actual work happens there, including:

    - returning dictionaries directly,
    - unwrapping list results by looking at their first item,
    - pulling JSON out of markdown code fences in string content,
    - decoding the JSON text, and
    - falling back to an LLM-based repair request when decoding fails.

    Failures are reported in-band rather than raised: the returned dictionary
    then carries an ``"error"`` key (plus diagnostic fields) instead of the
    parsed payload.

    Args:
        result: The raw value returned by an MCP tool or provider call, for
            example a dictionary, a string, or a list wrapping one of those.

    Returns:
        The parsed data, or an error dictionary describing why parsing failed.
        Note that when the text decodes to valid JSON that is not an object,
        the decoded value is passed through unchanged.

    Example:
        raw = await some_mcp_tool()
        data = await process_mcp_result(raw)
        if "error" in data:
            print(f"Error processing result: {data['error']}")
        else:
            print(f"Processed data: {data}")
    """
    normalized = await parse_result(result)
    return normalized
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/services/knowledge_base/retriever.py:
--------------------------------------------------------------------------------

```python
  1 | """Knowledge base retriever for RAG functionality."""
  2 | import time
  3 | from typing import Any, Dict, List, Optional
  4 | 
  5 | from ultimate_mcp_server.services.knowledge_base.feedback import get_rag_feedback_service
  6 | from ultimate_mcp_server.services.knowledge_base.utils import build_metadata_filter
  7 | from ultimate_mcp_server.services.vector import VectorDatabaseService
  8 | from ultimate_mcp_server.utils import get_logger
  9 | 
 10 | logger = get_logger(__name__)
 11 | 
 12 | 
 13 | class KnowledgeBaseRetriever:
 14 |     """
 15 |     Advanced retrieval engine for knowledge base collections in RAG applications.
 16 |     
 17 |     The KnowledgeBaseRetriever provides sophisticated search capabilities for finding
 18 |     the most relevant documents within knowledge bases. It offers multiple retrieval
 19 |     strategies optimized for different search scenarios, from pure semantic vector
 20 |     search to hybrid approaches combining vector and keyword matching.
 21 |     
 22 |     Key Features:
 23 |     - Multiple retrieval methods (vector, hybrid, keyword)
 24 |     - Metadata filtering for targeted searches
 25 |     - Content-based filtering for keyword matching
 26 |     - Configurable similarity thresholds and relevance scoring
 27 |     - Feedback mechanisms for continuous retrieval improvement
 28 |     - Performance monitoring and diagnostics
 29 |     - Advanced parameter tuning for specialized search needs
 30 |     
 31 |     Retrieval Methods:
 32 |     1. Vector Search: Uses embeddings for semantic similarity matching
 33 |        - Best for finding conceptually related content
 34 |        - Handles paraphrasing and semantic equivalence
 35 |        - Computationally efficient for large collections
 36 |     
 37 |     2. Hybrid Search: Combines vector and keyword matching with weighted scoring
 38 |        - Balances semantic understanding with exact term matching
 39 |        - Addresses vocabulary mismatch problems
 40 |        - Provides more robust retrieval across diverse query types
 41 |     
 42 |     3. Keyword Filtering: Limits results to those containing specific text
 43 |        - Used for explicit term presence requirements
 44 |        - Can be combined with other search methods
 45 |     
 46 |     Architecture:
 47 |     The retriever operates as a higher-level service above the vector database,
 48 |     working in concert with:
 49 |     - Embedding services for query vectorization
 50 |     - Vector database services for efficient similarity search
 51 |     - Feedback services for result quality improvement
 52 |     - Metadata filters for context-aware retrieval
 53 |     
 54 |     Usage in RAG Applications:
 55 |     This retriever is a critical component in RAG pipelines, responsible for
 56 |     the quality and relevance of context provided to LLMs. Tuning retrieval
 57 |     parameters significantly impacts the quality of generated responses.
 58 |     
 59 |     Example Usage:
 60 |     ```python
 61 |     # Get retriever instance
 62 |     retriever = get_knowledge_base_retriever()
 63 |     
 64 |     # Simple vector search
 65 |     results = await retriever.retrieve(
 66 |         knowledge_base_name="company_policies",
 67 |         query="What is our remote work policy?",
 68 |         top_k=3,
 69 |         min_score=0.7
 70 |     )
 71 |     
 72 |     # Hybrid search with metadata filtering
 73 |     dept_results = await retriever.retrieve_hybrid(
 74 |         knowledge_base_name="company_policies",
 75 |         query="security requirements for customer data",
 76 |         top_k=5,
 77 |         vector_weight=0.6,
 78 |         keyword_weight=0.4,
 79 |         metadata_filter={"department": "security", "status": "active"}
 80 |     )
 81 |     
 82 |     # Process and use the retrieved documents
 83 |     for item in results["results"]:
 84 |         print(f"Document (score: {item['score']:.2f}): {item['document'][:100]}...")
 85 |         print(f"Source: {item['metadata'].get('source', 'unknown')}")
 86 |     
 87 |     # Record which documents were actually useful
 88 |     await retriever.record_feedback(
 89 |         knowledge_base_name="company_policies",
 90 |         query="What is our remote work policy?",
 91 |         retrieved_documents=results["results"],
 92 |         used_document_ids=["doc123", "doc456"]
 93 |     )
 94 |     ```
 95 |     """
 96 |     
 97 |     def __init__(self, vector_service: VectorDatabaseService):
 98 |         """Initialize the knowledge base retriever.
 99 |         
100 |         Args:
101 |             vector_service: Vector database service for retrieving embeddings
102 |         """
103 |         self.vector_service = vector_service
104 |         self.feedback_service = get_rag_feedback_service()
105 |         
106 |         # Get embedding service for generating query embeddings
107 |         from ultimate_mcp_server.services.vector.embeddings import get_embedding_service
108 |         self.embedding_service = get_embedding_service()
109 |         
110 |         logger.info("Knowledge base retriever initialized", extra={"emoji_key": "success"})
111 |     
112 |     async def _validate_knowledge_base(self, name: str) -> Dict[str, Any]:
113 |         """Validate that a knowledge base exists.
114 |         
115 |         Args:
116 |             name: Knowledge base name
117 |             
118 |         Returns:
119 |             Validation result
120 |         """
121 |         # Check if knowledge base exists
122 |         collections = await self.vector_service.list_collections()
123 |         
124 |         if name not in collections:
125 |             logger.warning(
126 |                 f"Knowledge base '{name}' not found", 
127 |                 extra={"emoji_key": "warning"}
128 |             )
129 |             return {"status": "not_found", "name": name}
130 |         
131 |         # Get metadata
132 |         metadata = await self.vector_service.get_collection_metadata(name)
133 |         
134 |         if metadata.get("type") != "knowledge_base":
135 |             logger.warning(
136 |                 f"Collection '{name}' is not a knowledge base", 
137 |                 extra={"emoji_key": "warning"}
138 |             )
139 |             return {"status": "not_knowledge_base", "name": name}
140 |         
141 |         return {
142 |             "status": "valid",
143 |             "name": name,
144 |             "metadata": metadata
145 |         }
146 |     
147 |     async def retrieve(
148 |         self,
149 |         knowledge_base_name: str,
150 |         query: str,
151 |         top_k: int = 5,
152 |         min_score: float = 0.6,
153 |         metadata_filter: Optional[Dict[str, Any]] = None,
154 |         content_filter: Optional[str] = None,
155 |         embedding_model: Optional[str] = None,
156 |         apply_feedback: bool = True,
157 |         search_params: Optional[Dict[str, Any]] = None
158 |     ) -> Dict[str, Any]:
159 |         """Retrieve documents from a knowledge base using vector search.
160 |         
161 |         Args:
162 |             knowledge_base_name: Knowledge base name
163 |             query: Query text
164 |             top_k: Number of results to return
165 |             min_score: Minimum similarity score
166 |             metadata_filter: Optional metadata filter (field->value or field->{op:value})
167 |             content_filter: Text to search for in documents
168 |             embedding_model: Optional embedding model name
169 |             apply_feedback: Whether to apply feedback adjustments
170 |             search_params: Optional ChromaDB search parameters
171 |             
172 |         Returns:
173 |             Retrieved documents with metadata
174 |         """
175 |         start_time = time.time()
176 |         
177 |         # Validate knowledge base
178 |         kb_info = await self._validate_knowledge_base(knowledge_base_name)
179 |         
180 |         if kb_info["status"] != "valid":
181 |             logger.warning(
182 |                 f"Knowledge base '{knowledge_base_name}' not found or invalid", 
183 |                 extra={"emoji_key": "warning"}
184 |             )
185 |             return {
186 |                 "status": "error",
187 |                 "message": f"Knowledge base '{knowledge_base_name}' not found or invalid"
188 |             }
189 |         
190 |         logger.debug(f"DEBUG: Knowledge base validated - metadata: {kb_info['metadata']}")
191 |         
192 |         # Use the same embedding model that was used to create the knowledge base
193 |         if not embedding_model and kb_info["metadata"].get("embedding_model"):
194 |             embedding_model = kb_info["metadata"]["embedding_model"]
195 |             logger.debug(f"Using embedding model from knowledge base metadata: {embedding_model}")
196 |         
197 |         # If embedding model is specified, ensure it's saved in the metadata for future use
198 |         if embedding_model and not kb_info["metadata"].get("embedding_model"):
199 |             try:
200 |                 await self.vector_service.update_collection_metadata(
201 |                     name=knowledge_base_name,
202 |                     metadata={
203 |                         **kb_info["metadata"],
204 |                         "embedding_model": embedding_model
205 |                     }
206 |                 )
207 |                 logger.debug(f"Updated knowledge base metadata with embedding model: {embedding_model}")
208 |             except Exception as e:
209 |                 logger.warning(f"Failed to update knowledge base metadata with embedding model: {str(e)}")
210 |         
211 |         # Get or create ChromaDB collection
212 |         collection = await self.vector_service.get_collection(knowledge_base_name)
213 |         logger.debug(f"DEBUG: Retrieved collection type: {type(collection)}")
214 |         
215 |         # Set search parameters if provided
216 |         if search_params:
217 |             await self.vector_service.update_collection_metadata(
218 |                 collection_name=knowledge_base_name,
219 |                 metadata={
220 |                     **kb_info["metadata"],
221 |                     **{f"hnsw:{k}": v for k, v in search_params.items()}
222 |                 }
223 |             )
224 |         
225 |         # Create includes parameter
226 |         includes = ["documents", "metadatas", "distances"]
227 |         
228 |         # Create where_document parameter for content filtering
229 |         where_document = {"$contains": content_filter} if content_filter else None
230 |         
231 |         # Convert metadata filter format if provided
232 |         chroma_filter = build_metadata_filter(metadata_filter) if metadata_filter else None
233 |         
234 |         logger.debug(f"DEBUG: Search parameters - top_k: {top_k}, min_score: {min_score}, filter: {chroma_filter}, where_document: {where_document}")
235 |         
236 |         try:
237 |             # Generate embedding directly with our embedding service
238 |             # Call create_embeddings with a list and get the first result
239 |             query_embeddings = await self.embedding_service.create_embeddings(
240 |                 texts=[query],
241 |                 # model=embedding_model # Model is set during service init
242 |             )
243 |             if not query_embeddings:
244 |                 logger.error(f"Failed to generate embedding for query: {query}")
245 |                 return { "status": "error", "message": "Failed to generate query embedding" }
246 |             query_embedding = query_embeddings[0]
247 |             
248 |             logger.debug(f"Generated query embedding with model: {self.embedding_service.model_name}, dimension: {len(query_embedding)}")
249 |             
250 |             # Use correct query method based on collection type
251 |             if hasattr(collection, 'query') and not hasattr(collection, 'search_by_text'):
252 |                 # ChromaDB collection
253 |                 logger.debug("Using ChromaDB direct query with embeddings")
254 |                 try:
255 |                     search_results = collection.query(
256 |                         query_embeddings=[query_embedding],  # Use our embedding directly
257 |                         n_results=top_k * 2, 
258 |                         where=chroma_filter,
259 |                         where_document=where_document,
260 |                         include=includes
261 |                     )
262 |                 except Exception as e:
263 |                     logger.error(f"ChromaDB query error: {str(e)}")
264 |                     raise
265 |             else:
266 |                 # Our custom VectorCollection
267 |                 logger.debug("Using VectorCollection search method")
268 |                 search_results = await collection.query(
269 |                     query_texts=[query],
270 |                     n_results=top_k * 2,
271 |                     where=chroma_filter,
272 |                     where_document=where_document,
273 |                     include=includes,
274 |                     embedding_model=embedding_model
275 |                 )
276 |             
277 |             # Debug raw results
278 |             logger.debug(f"DEBUG: Raw search results - keys: {search_results.keys()}")
279 |             logger.debug(f"DEBUG: Documents count: {len(search_results.get('documents', [[]])[0])}")
280 |             logger.debug(f"DEBUG: IDs: {search_results.get('ids', [[]])[0]}")
281 |             logger.debug(f"DEBUG: Distances: {search_results.get('distances', [[]])[0]}")
282 |             
283 |             # Process results
284 |             results = []
285 |             for i, doc in enumerate(search_results["documents"][0]):
286 |                 # Convert distance to similarity score (1 = exact match, 0 = completely different)
287 |                 # Most distance metrics return 0 for exact match, so we use 1 - distance
288 |                 # This works for cosine, l2, etc.
289 |                 similarity = 1.0 - float(search_results["distances"][0][i])
290 |                 
291 |                 # Debug each document
292 |                 logger.debug(f"DEBUG: Document {i} - ID: {search_results['ids'][0][i]}")
293 |                 logger.debug(f"DEBUG: Similarity: {similarity} (min required: {min_score})")
294 |                 logger.debug(f"DEBUG: Document content (first 100 chars): {doc[:100] if doc else 'Empty'}")
295 |                 
296 |                 if search_results["metadatas"] and i < len(search_results["metadatas"][0]):
297 |                     metadata = search_results["metadatas"][0][i]
298 |                     logger.debug(f"DEBUG: Metadata: {metadata}")
299 |                 
300 |                 # Skip results below minimum score
301 |                 if similarity < min_score:
302 |                     logger.debug(f"DEBUG: Skipping document {i} due to low similarity: {similarity} < {min_score}")
303 |                     continue
304 |                 
305 |                 results.append({
306 |                     "id": search_results["ids"][0][i],
307 |                     "document": doc,
308 |                     "metadata": search_results["metadatas"][0][i] if search_results["metadatas"] else {},
309 |                     "score": similarity
310 |                 })
311 |             
312 |             logger.debug(f"DEBUG: After filtering, {len(results)} documents remain.")
313 |             
314 |             # Apply feedback adjustments if requested
315 |             if apply_feedback:
316 |                 results = await self.feedback_service.apply_feedback_adjustments(
317 |                     knowledge_base_name=knowledge_base_name,
318 |                     results=results,
319 |                     query=query
320 |                 )
321 |             
322 |             # Limit to top_k
323 |             results = results[:top_k]
324 |             
325 |             # Track retrieval time
326 |             retrieval_time = time.time() - start_time
327 |             
328 |             logger.info(
329 |                 f"Retrieved {len(results)} documents from '{knowledge_base_name}' in {retrieval_time:.2f}s", 
330 |                 extra={"emoji_key": "success"}
331 |             )
332 |             
333 |             return {
334 |                 "status": "success",
335 |                 "query": query,
336 |                 "results": results,
337 |                 "count": len(results),
338 |                 "retrieval_time": retrieval_time
339 |             }
340 |             
341 |         except Exception as e:
342 |             logger.error(
343 |                 f"Error retrieving from knowledge base '{knowledge_base_name}': {str(e)}", 
344 |                 extra={"emoji_key": "error"}
345 |             )
346 |             
347 |             return {
348 |                 "status": "error",
349 |                 "message": str(e)
350 |             }
351 |     
352 |     async def retrieve_hybrid(
353 |         self,
354 |         knowledge_base_name: str,
355 |         query: str,
356 |         top_k: int = 5,
357 |         vector_weight: float = 0.7,
358 |         keyword_weight: float = 0.3,
359 |         min_score: float = 0.6,
360 |         metadata_filter: Optional[Dict[str, Any]] = None,
361 |         additional_keywords: Optional[List[str]] = None,
362 |         apply_feedback: bool = True,
363 |         search_params: Optional[Dict[str, Any]] = None
364 |     ) -> Dict[str, Any]:
365 |         """Retrieve documents using hybrid search.
366 |         
367 |         Args:
368 |             knowledge_base_name: Knowledge base name
369 |             query: Query text
370 |             top_k: Number of documents to retrieve
371 |             vector_weight: Weight for vector search component
372 |             keyword_weight: Weight for keyword search component
373 |             min_score: Minimum similarity score
374 |             metadata_filter: Optional metadata filter
375 |             additional_keywords: Additional keywords to include in search
376 |             apply_feedback: Whether to apply feedback adjustments
377 |             search_params: Optional ChromaDB search parameters
378 |             
379 |         Returns:
380 |             Retrieved documents with metadata
381 |         """
382 |         start_time = time.time()
383 |         
384 |         # Validate knowledge base
385 |         kb_info = await self._validate_knowledge_base(knowledge_base_name)
386 |         
387 |         if kb_info["status"] != "valid":
388 |             logger.warning(
389 |                 f"Knowledge base '{knowledge_base_name}' not found or invalid", 
390 |                 extra={"emoji_key": "warning"}
391 |             )
392 |             return {
393 |                 "status": "error",
394 |                 "message": f"Knowledge base '{knowledge_base_name}' not found or invalid"
395 |             }
396 |         
397 |         # Get or create ChromaDB collection
398 |         collection = await self.vector_service.get_collection(knowledge_base_name)
399 |         
400 |         # Set search parameters if provided
401 |         if search_params:
402 |             await self.vector_service.update_collection_metadata(
403 |                 collection_name=knowledge_base_name,
404 |                 metadata={
405 |                     **kb_info["metadata"],
406 |                     **{f"hnsw:{k}": v for k, v in search_params.items()}
407 |                 }
408 |             )
409 |         
410 |         # Convert metadata filter format if provided
411 |         chroma_filter = build_metadata_filter(metadata_filter) if metadata_filter else None
412 |         
413 |         # Create content filter based on query and additional keywords
414 |         content_text = query
415 |         if additional_keywords:
416 |             content_text = f"{query} {' '.join(additional_keywords)}"
417 |         
418 |         # Use ChromaDB's hybrid search by providing both query text and content filter
419 |         try:
420 |             # Vector search results with content filter
421 |             search_results = await collection.query(
422 |                 query_texts=[query],
423 |                 n_results=top_k * 3,  # Get more results for combining
424 |                 where=chroma_filter,
425 |                 where_document={"$contains": content_text} if content_text else None,
426 |                 include=["documents", "metadatas", "distances"],
427 |                 embedding_model=None  # Use default embedding model
428 |             )
429 |             
430 |             # Process results
431 |             combined_results = {}
432 |             
433 |             # Process vector search results
434 |             for i, doc in enumerate(search_results["documents"][0]):
435 |                 doc_id = search_results["ids"][0][i]
436 |                 vector_score = 1.0 - float(search_results["distances"][0][i])
437 |                 
438 |                 combined_results[doc_id] = {
439 |                     "id": doc_id,
440 |                     "document": doc,
441 |                     "metadata": search_results["metadatas"][0][i] if search_results["metadatas"] else {},
442 |                     "vector_score": vector_score,
443 |                     "keyword_score": 0.0,
444 |                     "score": vector_score * vector_weight
445 |                 }
446 |             
447 |             # Now do a keyword-only search if we have keywords component
448 |             if keyword_weight > 0:
449 |                 keyword_results = await collection.query(
450 |                     query_texts=None,  # No vector query
451 |                     n_results=top_k * 3,
452 |                     where=chroma_filter,
453 |                     where_document={"$contains": content_text},
454 |                     include=["documents", "metadatas"],
455 |                     embedding_model=None  # No embedding model needed for keyword-only search
456 |                 )
457 |                 
458 |                 # Process keyword results
459 |                 for i, doc in enumerate(keyword_results["documents"][0]):
460 |                     doc_id = keyword_results["ids"][0][i]
461 |                     # Approximate keyword score based on position (best = 1.0)
462 |                     keyword_score = 1.0 - (i / len(keyword_results["documents"][0]))
463 |                     
464 |                     if doc_id in combined_results:
465 |                         # Update existing result
466 |                         combined_results[doc_id]["keyword_score"] = keyword_score
467 |                         combined_results[doc_id]["score"] += keyword_score * keyword_weight
468 |                     else:
469 |                         # Add new result
470 |                         combined_results[doc_id] = {
471 |                             "id": doc_id,
472 |                             "document": doc,
473 |                             "metadata": keyword_results["metadatas"][0][i] if keyword_results["metadatas"] else {},
474 |                             "vector_score": 0.0,
475 |                             "keyword_score": keyword_score,
476 |                             "score": keyword_score * keyword_weight
477 |                         }
478 |             
479 |             # Convert to list and filter by min_score
480 |             results = [r for r in combined_results.values() if r["score"] >= min_score]
481 |             
482 |             # Apply feedback adjustments if requested
483 |             if apply_feedback:
484 |                 results = await self.feedback_service.apply_feedback_adjustments(
485 |                     knowledge_base_name=knowledge_base_name,
486 |                     results=results,
487 |                     query=query
488 |                 )
489 |             
490 |             # Sort by score and limit to top_k
491 |             results.sort(key=lambda x: x["score"], reverse=True)
492 |             results = results[:top_k]
493 |             
494 |             # Track retrieval time
495 |             retrieval_time = time.time() - start_time
496 |             
497 |             logger.info(
498 |                 f"Hybrid search retrieved {len(results)} documents from '{knowledge_base_name}' in {retrieval_time:.2f}s", 
499 |                 extra={"emoji_key": "success"}
500 |             )
501 |             
502 |             return {
503 |                 "status": "success",
504 |                 "query": query,
505 |                 "results": results,
506 |                 "count": len(results),
507 |                 "retrieval_time": retrieval_time
508 |             }
509 |             
510 |         except Exception as e:
511 |             logger.error(
512 |                 f"Error performing hybrid search on '{knowledge_base_name}': {str(e)}", 
513 |                 extra={"emoji_key": "error"}
514 |             )
515 |             
516 |             return {
517 |                 "status": "error",
518 |                 "message": str(e)
519 |             }
520 |     
521 |     async def record_feedback(
522 |         self,
523 |         knowledge_base_name: str,
524 |         query: str,
525 |         retrieved_documents: List[Dict[str, Any]],
526 |         used_document_ids: Optional[List[str]] = None,
527 |         explicit_feedback: Optional[Dict[str, str]] = None
528 |     ) -> Dict[str, Any]:
529 |         """Record feedback for retrieval results.
530 |         
531 |         Args:
532 |             knowledge_base_name: Knowledge base name
533 |             query: Query text
534 |             retrieved_documents: List of retrieved documents
535 |             used_document_ids: List of document IDs used in the response
536 |             explicit_feedback: Explicit feedback for documents
537 |             
538 |         Returns:
539 |             Feedback recording result
540 |         """
541 |         # Convert list to set if provided
542 |         used_ids_set = set(used_document_ids) if used_document_ids else None
543 |         
544 |         # Record feedback
545 |         result = await self.feedback_service.record_retrieval_feedback(
546 |             knowledge_base_name=knowledge_base_name,
547 |             query=query,
548 |             retrieved_documents=retrieved_documents,
549 |             used_document_ids=used_ids_set,
550 |             explicit_feedback=explicit_feedback
551 |         )
552 |         
553 |         return result 
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/core/providers/gemini.py:
--------------------------------------------------------------------------------

```python
  1 | """Google Gemini provider implementation."""
  2 | import time
  3 | from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple
  4 | 
  5 | from google import genai
  6 | 
  7 | from ultimate_mcp_server.constants import Provider
  8 | from ultimate_mcp_server.core.providers.base import BaseProvider, ModelResponse
  9 | from ultimate_mcp_server.utils import get_logger
 10 | 
 11 | # Use the same naming scheme everywhere: logger at module level
 12 | logger = get_logger("ultimate_mcp_server.providers.gemini")
 13 | 
 14 | 
 15 | class GeminiProvider(BaseProvider):
 16 |     """Provider implementation for Google Gemini API."""
 17 |     
 18 |     provider_name = Provider.GEMINI.value
 19 |     
    def __init__(self, api_key: Optional[str] = None, **kwargs) -> None:
        """Initialize the Gemini provider.
        
        Args:
            api_key: Google API key, forwarded to the base provider
            **kwargs: Additional options, forwarded unchanged to the base provider
        """
        super().__init__(api_key=api_key, **kwargs)
        # Starts empty; list_models() stores its static model list here on first use
        self.models_cache = None
 29 |         
 30 |     async def initialize(self) -> bool:
 31 |         """Initialize the Gemini client.
 32 |         
 33 |         Returns:
 34 |             bool: True if initialization was successful
 35 |         """
 36 |         try:
 37 |             # Skip real API calls if using mock key for tests
 38 |             if self.api_key and "mock-" in self.api_key:
 39 |                 self.logger.info(
 40 |                     "Using mock Gemini key - skipping API initialization",
 41 |                     emoji_key="mock"
 42 |                 )
 43 |                 self.client = {"mock_client": True}
 44 |                 return True
 45 |                 
 46 |             # Create a client instance instead of configuring globally
 47 |             self.client = genai.Client(
 48 |                 api_key=self.api_key,
 49 |                 http_options={"api_version": "v1alpha"}
 50 |             )
 51 |             
 52 |             self.logger.success(
 53 |                 "Gemini provider initialized successfully", 
 54 |                 emoji_key="provider"
 55 |             )
 56 |             return True
 57 |             
 58 |         except Exception as e:
 59 |             self.logger.error(
 60 |                 f"Failed to initialize Gemini provider: {str(e)}", 
 61 |                 emoji_key="error"
 62 |             )
 63 |             return False
 64 |         
 65 |     async def generate_completion(
 66 |         self,
 67 |         prompt: Optional[str] = None,
 68 |         messages: Optional[List[Dict[str, Any]]] = None,
 69 |         model: Optional[str] = None,
 70 |         max_tokens: Optional[int] = None,
 71 |         temperature: float = 0.7,
 72 |         **kwargs
 73 |     ) -> ModelResponse:
 74 |         """Generate a completion using Google Gemini.
 75 |         
 76 |         Args:
 77 |             prompt: Text prompt to send to the model (or None if messages provided)
 78 |             messages: List of message dictionaries (alternative to prompt)
 79 |             model: Model name to use (e.g., "gemini-2.0-flash-lite")
 80 |             max_tokens: Maximum tokens to generate
 81 |             temperature: Temperature parameter (0.0-1.0)
 82 |             **kwargs: Additional model-specific parameters
 83 |             
 84 |         Returns:
 85 |             ModelResponse: Standardized response
 86 |             
 87 |         Raises:
 88 |             Exception: If API call fails
 89 |         """
 90 |         if not self.client:
 91 |             await self.initialize()
 92 |             
 93 |         # Use default model if not specified
 94 |         model = model or self.get_default_model()
 95 |         
 96 |         # Strip provider prefix if present (e.g., "gemini:gemini-2.0-pro" -> "gemini-2.0-pro")
 97 |         if ":" in model:
 98 |             original_model = model
 99 |             model = model.split(":", 1)[1]
100 |             self.logger.debug(f"Stripped provider prefix from model name: {original_model} -> {model}")
101 |         
102 |         # Validate that either prompt or messages is provided
103 |         if prompt is None and not messages:
104 |             raise ValueError("Either 'prompt' or 'messages' must be provided")
105 |         
106 |         # Prepare generation config and API call kwargs
107 |         config = {
108 |             "temperature": temperature,
109 |         }
110 |         if max_tokens is not None:
111 |              config["max_output_tokens"] = max_tokens # Gemini uses max_output_tokens
112 | 
113 |         # Pop json_mode flag
114 |         json_mode = kwargs.pop("json_mode", False)
115 |         
116 |         # Set up JSON mode in config dict per Gemini API docs
117 |         if json_mode:
118 |             # For Gemini, JSON mode is set via response_mime_type in the config dict
119 |             config["response_mime_type"] = "application/json"
120 |             self.logger.debug("Setting response_mime_type to application/json for Gemini in config")
121 | 
122 |         # Add remaining kwargs to config
123 |         for key in list(kwargs.keys()):
124 |              if key in ["top_p", "top_k", "candidate_count", "stop_sequences"]:
125 |                  config[key] = kwargs.pop(key)
126 |                  
127 |         # Store other kwargs that might need to be passed directly
128 |         request_params = {}
129 |         for key in list(kwargs.keys()):
130 |             if key in ["safety_settings", "tools", "system"]:
131 |                 request_params[key] = kwargs.pop(key)
132 | 
133 |         # Prepare content based on input type (prompt or messages)
134 |         content = None
135 |         if prompt:
136 |             content = prompt
137 |             log_input_size = len(prompt)
138 |         elif messages:
139 |             # Convert messages to Gemini format
140 |             content = []
141 |             log_input_size = 0
142 |             for msg in messages:
143 |                 role = msg.get("role", "").lower()
144 |                 text = msg.get("content", "")
145 |                 log_input_size += len(text)
146 |                 
147 |                 # Map roles to Gemini's expectations
148 |                 if role == "system":
149 |                     # For system messages, prepend to user input or add as user message
150 |                     system_text = text
151 |                     # Find the next user message to prepend to
152 |                     for _i, future_msg in enumerate(messages[messages.index(msg) + 1:], messages.index(msg) + 1):
153 |                         if future_msg.get("role", "").lower() == "user":
154 |                             # Leave this system message to be handled when we reach the user message
155 |                             # Just track its content for now
156 |                             break
157 |                     else:
158 |                         # No user message found after system, add as separate user message
159 |                         content.append({"role": "user", "parts": [{"text": system_text}]})
160 |                     continue
161 |                 
162 |                 elif role == "user":
163 |                     # Check if previous message was a system message
164 |                     prev_system_text = ""
165 |                     if messages.index(msg) > 0:
166 |                         prev_msg = messages[messages.index(msg) - 1]
167 |                         if prev_msg.get("role", "").lower() == "system":
168 |                             prev_system_text = prev_msg.get("content", "")
169 |                     
170 |                     # If there was a system message before, prepend it to the user message
171 |                     if prev_system_text:
172 |                         gemini_role = "user"
173 |                         gemini_text = f"{prev_system_text}\n\n{text}"
174 |                     else:
175 |                         gemini_role = "user"
176 |                         gemini_text = text
177 |                         
178 |                 elif role == "assistant":
179 |                     gemini_role = "model"
180 |                     gemini_text = text
181 |                 else:
182 |                     self.logger.warning(f"Unsupported message role '{role}', treating as user")
183 |                     gemini_role = "user"
184 |                     gemini_text = text
185 |                 
186 |                 content.append({"role": gemini_role, "parts": [{"text": gemini_text}]})
187 | 
188 |         # Log request
189 |         self.logger.info(
190 |             f"Generating completion with Gemini model {model}",
191 |             emoji_key=self.provider_name,
192 |             prompt_length=log_input_size,
193 |             json_mode_requested=json_mode
194 |         )
195 |         
196 |         start_time = time.time()
197 |         
198 |         try:
199 |             # Check if we're using a mock client for testing
200 |             if isinstance(self.client, dict) and self.client.get("mock_client"):
201 |                 # Return mock response for tests
202 |                 completion_text = "Mock Gemini response for testing"
203 |                 processing_time = 0.1
204 |                 response = None
205 |             else:
206 |                 # Pass everything in the correct structure according to the API
207 |                 if isinstance(content, list):  # messages format
208 |                     response = self.client.models.generate_content(
209 |                         model=model,
210 |                         contents=content,
211 |                         config=config,  # Pass config dict containing temperature, max_output_tokens, etc.
212 |                         **request_params  # Pass other params directly if needed
213 |                     )
214 |                 else:  # prompt format (string)
215 |                     response = self.client.models.generate_content(
216 |                         model=model,
217 |                         contents=content,
218 |                         config=config,  # Pass config dict containing temperature, max_output_tokens, etc.
219 |                         **request_params  # Pass other params directly if needed
220 |                     )
221 |                 
222 |                 processing_time = time.time() - start_time
223 |                 
224 |                 # Extract response text
225 |                 completion_text = response.text
226 |             
227 |             # Estimate token usage (Gemini doesn't provide token counts)
228 |             # Roughly 4 characters per token as a crude approximation
229 |             char_to_token_ratio = 4.0
230 |             estimated_input_tokens = log_input_size / char_to_token_ratio
231 |             estimated_output_tokens = len(completion_text) / char_to_token_ratio
232 |             
233 |             # Create standardized response
234 |             result = ModelResponse(
235 |                 text=completion_text,
236 |                 model=model,
237 |                 provider=self.provider_name,
238 |                 input_tokens=int(estimated_input_tokens),
239 |                 output_tokens=int(estimated_output_tokens),
240 |                 processing_time=processing_time,
241 |                 raw_response=None,  # Don't need raw response for tests
242 |                 metadata={"token_count_estimated": True}
243 |             )
244 |             
245 |             # Add message for consistency with other providers
246 |             result.message = {"role": "assistant", "content": completion_text}
247 |             
248 |             # Log success
249 |             self.logger.success(
250 |                 "Gemini completion successful",
251 |                 emoji_key="success",
252 |                 model=model,
253 |                 tokens={
254 |                     "input": result.input_tokens,
255 |                     "output": result.output_tokens
256 |                 },
257 |                 cost=result.cost,
258 |                 time=result.processing_time
259 |             )
260 |             
261 |             return result
262 |             
263 |         except Exception as e:
264 |             self.logger.error(
265 |                 f"Gemini completion failed: {str(e)}",
266 |                 emoji_key="error",
267 |                 model=model
268 |             )
269 |             raise
270 |             
271 |     async def generate_completion_stream(
272 |         self,
273 |         prompt: Optional[str] = None,
274 |         messages: Optional[List[Dict[str, Any]]] = None,
275 |         model: Optional[str] = None,
276 |         max_tokens: Optional[int] = None,
277 |         temperature: float = 0.7,
278 |         **kwargs
279 |     ) -> AsyncGenerator[Tuple[str, Dict[str, Any]], None]:
280 |         """Generate a streaming completion using Google Gemini.
281 |         
282 |         Args:
283 |             prompt: Text prompt to send to the model (or None if messages provided)
284 |             messages: List of message dictionaries (alternative to prompt)
285 |             model: Model name to use (e.g., "gemini-2.0-flash-lite")
286 |             max_tokens: Maximum tokens to generate
287 |             temperature: Temperature parameter (0.0-1.0)
288 |             **kwargs: Additional model-specific parameters
289 |             
290 |         Yields:
291 |             Tuple of (text_chunk, metadata)
292 |             
293 |         Raises:
294 |             Exception: If API call fails
295 |         """
296 |         if not self.client:
297 |             await self.initialize()
298 |             
299 |         # Use default model if not specified
300 |         model = model or self.get_default_model()
301 |         
302 |         # Strip provider prefix if present (e.g., "gemini:gemini-2.0-pro" -> "gemini-2.0-pro")
303 |         if ":" in model:
304 |             original_model = model
305 |             model = model.split(":", 1)[1]
306 |             self.logger.debug(f"Stripped provider prefix from model name (stream): {original_model} -> {model}")
307 |         
308 |         # Validate that either prompt or messages is provided
309 |         if prompt is None and not messages:
310 |             raise ValueError("Either 'prompt' or 'messages' must be provided")
311 |         
312 |         # Prepare config dict per Gemini API
313 |         config = {
314 |             "temperature": temperature,
315 |         }
316 |         if max_tokens is not None:
317 |             config["max_output_tokens"] = max_tokens
318 | 
319 |         # Pop json_mode flag
320 |         json_mode = kwargs.pop("json_mode", False)
321 |         
322 |         # Set up JSON mode in config dict
323 |         if json_mode:
324 |             # For Gemini, JSON mode is set via response_mime_type in the config dict
325 |             config["response_mime_type"] = "application/json"
326 |             self.logger.debug("Setting response_mime_type to application/json for Gemini streaming in config")
327 | 
328 |         # Add remaining kwargs to config
329 |         for key in list(kwargs.keys()):
330 |              if key in ["top_p", "top_k", "candidate_count", "stop_sequences"]:
331 |                  config[key] = kwargs.pop(key)
332 |                  
333 |         # Store other kwargs that might need to be passed directly
334 |         request_params = {}
335 |         for key in list(kwargs.keys()):
336 |             if key in ["safety_settings", "tools", "system"]:
337 |                 request_params[key] = kwargs.pop(key)
338 | 
339 |         # Prepare content based on input type (prompt or messages)
340 |         content = None
341 |         if prompt:
342 |             content = prompt
343 |             log_input_size = len(prompt)
344 |         elif messages:
345 |             # Convert messages to Gemini format
346 |             content = []
347 |             log_input_size = 0
348 |             for msg in messages:
349 |                 role = msg.get("role", "").lower()
350 |                 text = msg.get("content", "")
351 |                 log_input_size += len(text)
352 |                 
353 |                 # Map roles to Gemini's expectations
354 |                 if role == "system":
355 |                     # For system messages, prepend to user input or add as user message
356 |                     system_text = text
357 |                     # Find the next user message to prepend to
358 |                     for _i, future_msg in enumerate(messages[messages.index(msg) + 1:], messages.index(msg) + 1):
359 |                         if future_msg.get("role", "").lower() == "user":
360 |                             # Leave this system message to be handled when we reach the user message
361 |                             # Just track its content for now
362 |                             break
363 |                     else:
364 |                         # No user message found after system, add as separate user message
365 |                         content.append({"role": "user", "parts": [{"text": system_text}]})
366 |                     continue
367 |                 
368 |                 elif role == "user":
369 |                     # Check if previous message was a system message
370 |                     prev_system_text = ""
371 |                     if messages.index(msg) > 0:
372 |                         prev_msg = messages[messages.index(msg) - 1]
373 |                         if prev_msg.get("role", "").lower() == "system":
374 |                             prev_system_text = prev_msg.get("content", "")
375 |                     
376 |                     # If there was a system message before, prepend it to the user message
377 |                     if prev_system_text:
378 |                         gemini_role = "user"
379 |                         gemini_text = f"{prev_system_text}\n\n{text}"
380 |                     else:
381 |                         gemini_role = "user"
382 |                         gemini_text = text
383 |                         
384 |                 elif role == "assistant":
385 |                     gemini_role = "model"
386 |                     gemini_text = text
387 |                 else:
388 |                     self.logger.warning(f"Unsupported message role '{role}', treating as user")
389 |                     gemini_role = "user"
390 |                     gemini_text = text
391 |                 
392 |                 content.append({"role": gemini_role, "parts": [{"text": gemini_text}]})
393 | 
394 |         # Log request
395 |         self.logger.info(
396 |             f"Generating streaming completion with Gemini model {model}",
397 |             emoji_key=self.provider_name,
398 |             input_type=f"{'prompt' if prompt else 'messages'} ({log_input_size} chars)",
399 |             json_mode_requested=json_mode
400 |         )
401 |         
402 |         start_time = time.time()
403 |         total_chunks = 0
404 |         
405 |         try:
406 |             # Use the dedicated streaming method as per Google's documentation
407 |             try:
408 |                 if isinstance(content, list):  # messages format
409 |                     stream_response = self.client.models.generate_content_stream(
410 |                         model=model,
411 |                         contents=content,
412 |                         config=config,
413 |                         **request_params
414 |                     )
415 |                 else:  # prompt format (string)
416 |                     stream_response = self.client.models.generate_content_stream(
417 |                         model=model,
418 |                         contents=content,
419 |                         config=config,
420 |                         **request_params
421 |                     )
422 |                     
423 |                 # Process the stream - iterating over chunks
424 |                 async def iterate_response():
425 |                     # Convert sync iterator to async
426 |                     for chunk in stream_response:
427 |                         yield chunk
428 |                 
429 |                 async for chunk in iterate_response():
430 |                     total_chunks += 1
431 |                     
432 |                     # Extract text from the chunk
433 |                     chunk_text = ""
434 |                     if hasattr(chunk, 'text'):
435 |                         chunk_text = chunk.text
436 |                     elif hasattr(chunk, 'candidates') and chunk.candidates:
437 |                         if hasattr(chunk.candidates[0], 'content') and chunk.candidates[0].content:
438 |                             if hasattr(chunk.candidates[0].content, 'parts') and chunk.candidates[0].content.parts:
439 |                                 chunk_text = chunk.candidates[0].content.parts[0].text
440 |                     
441 |                     # Metadata for this chunk
442 |                     metadata = {
443 |                         "model": model,
444 |                         "provider": self.provider_name,
445 |                         "chunk_index": total_chunks,
446 |                     }
447 |                     
448 |                     yield chunk_text, metadata
449 |                 
450 |                 # Log success
451 |                 processing_time = time.time() - start_time
452 |                 self.logger.success(
453 |                     "Gemini streaming completion successful",
454 |                     emoji_key="success",
455 |                     model=model,
456 |                     chunks=total_chunks,
457 |                     time=processing_time
458 |                 )
459 |                 
460 |                 # Yield final metadata chunk
461 |                 yield "", {
462 |                     "model": model,
463 |                     "provider": self.provider_name,
464 |                     "chunk_index": total_chunks + 1,
465 |                     "processing_time": processing_time,
466 |                     "finish_reason": "stop",  # Gemini doesn't provide this directly
467 |                 }
468 |                 
469 |             except (AttributeError, TypeError) as e:
470 |                 # If streaming isn't supported, fall back to non-streaming
471 |                 self.logger.warning(f"Streaming not supported for current Gemini API: {e}. Falling back to non-streaming.")
472 |                 
473 |                 # Call generate_completion and yield the entire result as one chunk
474 |                 completion = await self.generate_completion(
475 |                     prompt=prompt,
476 |                     messages=messages,
477 |                     model=model,
478 |                     max_tokens=max_tokens,
479 |                     temperature=temperature,
480 |                     json_mode=json_mode,
481 |                     **kwargs
482 |                 )
483 |                 
484 |                 yield completion.text, {
485 |                     "model": model,
486 |                     "provider": self.provider_name,
487 |                     "chunk_index": 1,
488 |                     "is_fallback": True
489 |                 }
490 |                 total_chunks = 1
491 |                 
492 |                 # Skip the rest of the streaming logic
493 |                 raise StopAsyncIteration() from e
494 |             
495 |         except Exception as e:
496 |             self.logger.error(
497 |                 f"Gemini streaming completion failed: {str(e)}",
498 |                 emoji_key="error",
499 |                 model=model
500 |             )
501 |             # Yield error info in final chunk
502 |             yield "", {
503 |                 "model": model,
504 |                 "provider": self.provider_name,
505 |                 "chunk_index": total_chunks + 1,
506 |                 "error": f"{type(e).__name__}: {str(e)}",
507 |                 "processing_time": time.time() - start_time,
508 |                 "finish_reason": "error"
509 |             }
510 |             
511 |     async def list_models(self) -> List[Dict[str, Any]]:
512 |         """List available Gemini models.
513 |         
514 |         Returns:
515 |             List of model information dictionaries
516 |         """
517 |         # Gemini doesn't have a comprehensive models endpoint, so we return a static list
518 |         if self.models_cache:
519 |             return self.models_cache
520 |             
521 |         models = [
522 |             {
523 |                 "id": "gemini-2.0-flash-lite",
524 |                 "provider": self.provider_name,
525 |                 "description": "Fastest and most efficient Gemini model",
526 |             },
527 |             {
528 |                 "id": "gemini-2.0-flash",
529 |                 "provider": self.provider_name,
530 |                 "description": "Fast Gemini model with good quality",
531 |             },
532 |             {
533 |                 "id": "gemini-2.0-pro",
534 |                 "provider": self.provider_name,
535 |                 "description": "More capable Gemini model",
536 |             },
537 |             {
538 |                 "id": "gemini-2.5-pro-preview-03-25",
539 |                 "provider": self.provider_name,
540 |                 "description": "Most capable Gemini model",
541 |             },
542 |         ]
543 |         
544 |         # Cache results
545 |         self.models_cache = models
546 |         
547 |         return models
548 |             
549 |     def get_default_model(self) -> str:
550 |         """Get the default Gemini model.
551 |         
552 |         Returns:
553 |             Default model name
554 |         """
555 |         from ultimate_mcp_server.config import get_config
556 |         
557 |         # Safely get from config if available
558 |         try:
559 |             config = get_config()
560 |             provider_config = getattr(config, 'providers', {}).get(self.provider_name, None)
561 |             if provider_config and provider_config.default_model:
562 |                 return provider_config.default_model
563 |         except (AttributeError, TypeError):
564 |             # Handle case when providers attribute doesn't exist or isn't a dict
565 |             pass
566 |             
567 |         # Otherwise return hard-coded default
568 |         return "gemini-2.0-flash-lite"
569 |         
570 |     async def check_api_key(self) -> bool:
571 |         """Check if the Gemini API key is valid.
572 |         
573 |         Returns:
574 |             bool: True if API key is valid
575 |         """
576 |         try:
577 |             # Try listing models to validate the API key
578 |             # Use the client's models API to check if API key is valid
579 |             self.client.models.list()
580 |             return True
581 |         except Exception:
582 |             return False
```

--------------------------------------------------------------------------------
/mcp_python_lib_docs.md:
--------------------------------------------------------------------------------

```markdown
  1 | # MCP Python SDK
  2 | 
  3 | <div align="center">
  4 | 
  5 | <strong>Python implementation of the Model Context Protocol (MCP)</strong>
  6 | 
  7 | [![PyPI][pypi-badge]][pypi-url]
  8 | [![MIT licensed][mit-badge]][mit-url]
  9 | [![Python Version][python-badge]][python-url]
 10 | [![Documentation][docs-badge]][docs-url]
 11 | [![Specification][spec-badge]][spec-url]
 12 | [![GitHub Discussions][discussions-badge]][discussions-url]
 13 | 
 14 | </div>
 15 | 
 16 | <!-- omit in toc -->
 17 | ## Table of Contents
 18 | 
 19 | - [MCP Python SDK](#mcp-python-sdk)
 20 |   - [Overview](#overview)
 21 |   - [Installation](#installation)
 22 |     - [Adding MCP to your python project](#adding-mcp-to-your-python-project)
 23 |     - [Running the standalone MCP development tools](#running-the-standalone-mcp-development-tools)
 24 |   - [Quickstart](#quickstart)
 25 |   - [What is MCP?](#what-is-mcp)
 26 |   - [Core Concepts](#core-concepts)
 27 |     - [Server](#server)
 28 |     - [Resources](#resources)
 29 |     - [Tools](#tools)
 30 |     - [Prompts](#prompts)
 31 |     - [Images](#images)
 32 |     - [Context](#context)
 33 |   - [Running Your Server](#running-your-server)
 34 |     - [Development Mode](#development-mode)
 35 |     - [Claude Desktop Integration](#claude-desktop-integration)
 36 |     - [Direct Execution](#direct-execution)
 37 |     - [Mounting to an Existing ASGI Server](#mounting-to-an-existing-asgi-server)
 38 |   - [Examples](#examples)
 39 |     - [Echo Server](#echo-server)
 40 |     - [SQLite Explorer](#sqlite-explorer)
 41 |   - [Advanced Usage](#advanced-usage)
 42 |     - [Low-Level Server](#low-level-server)
 43 |     - [Writing MCP Clients](#writing-mcp-clients)
 44 |     - [MCP Primitives](#mcp-primitives)
 45 |     - [Server Capabilities](#server-capabilities)
 46 |   - [Documentation](#documentation)
 47 |   - [Contributing](#contributing)
 48 |   - [License](#license)
 49 | 
 50 | [pypi-badge]: https://img.shields.io/pypi/v/mcp.svg
 51 | [pypi-url]: https://pypi.org/project/mcp/
 52 | [mit-badge]: https://img.shields.io/pypi/l/mcp.svg
 53 | [mit-url]: https://github.com/modelcontextprotocol/python-sdk/blob/main/LICENSE
 54 | [python-badge]: https://img.shields.io/pypi/pyversions/mcp.svg
 55 | [python-url]: https://www.python.org/downloads/
 56 | [docs-badge]: https://img.shields.io/badge/docs-modelcontextprotocol.io-blue.svg
 57 | [docs-url]: https://modelcontextprotocol.io
 58 | [spec-badge]: https://img.shields.io/badge/spec-spec.modelcontextprotocol.io-blue.svg
 59 | [spec-url]: https://spec.modelcontextprotocol.io
 60 | [discussions-badge]: https://img.shields.io/github/discussions/modelcontextprotocol/python-sdk
 61 | [discussions-url]: https://github.com/modelcontextprotocol/python-sdk/discussions
 62 | 
 63 | ## Overview
 64 | 
 65 | The Model Context Protocol allows applications to provide context for LLMs in a standardized way, separating the concerns of providing context from the actual LLM interaction. This Python SDK implements the full MCP specification, making it easy to:
 66 | 
 67 | - Build MCP clients that can connect to any MCP server
 68 | - Create MCP servers that expose resources, prompts and tools
 69 | - Use standard transports like stdio and SSE
 70 | - Handle all MCP protocol messages and lifecycle events
 71 | 
 72 | ## Installation
 73 | 
 74 | ### Adding MCP to your python project
 75 | 
 76 | We recommend using [uv](https://docs.astral.sh/uv/) to manage your Python projects. In a uv-managed Python project, add mcp to your dependencies with:
 77 | 
 78 | ```bash
 79 | uv add "mcp[cli]"
 80 | ```
 81 | 
 82 | Alternatively, for projects using pip for dependencies:
 83 | ```bash
 84 | pip install mcp
 85 | ```
 86 | 
 87 | ### Running the standalone MCP development tools
 88 | 
 89 | To run the mcp command with uv:
 90 | 
 91 | ```bash
 92 | uv run mcp
 93 | ```
 94 | 
 95 | ## Quickstart
 96 | 
 97 | Let's create a simple MCP server that exposes a calculator tool and some data:
 98 | 
 99 | ```python
100 | # server.py
101 | from mcp.server.fastmcp import FastMCP
102 | 
103 | # Create an MCP server
104 | mcp = FastMCP("Demo")
105 | 
106 | 
107 | # Add an addition tool
108 | @mcp.tool()
109 | def add(a: int, b: int) -> int:
110 |     """Add two numbers"""
111 |     return a + b
112 | 
113 | 
114 | # Add a dynamic greeting resource
115 | @mcp.resource("greeting://{name}")
116 | def get_greeting(name: str) -> str:
117 |     """Get a personalized greeting"""
118 |     return f"Hello, {name}!"
119 | ```
120 | 
121 | You can install this server in [Claude Desktop](https://claude.ai/download) and interact with it right away by running:
122 | ```bash
123 | mcp install server.py
124 | ```
125 | 
126 | Alternatively, you can test it with the MCP Inspector:
127 | ```bash
128 | mcp dev server.py
129 | ```
130 | 
131 | ## What is MCP?
132 | 
133 | The [Model Context Protocol (MCP)](https://modelcontextprotocol.io) lets you build servers that expose data and functionality to LLM applications in a secure, standardized way. Think of it like a web API, but specifically designed for LLM interactions. MCP servers can:
134 | 
135 | - Expose data through **Resources** (think of these sort of like GET endpoints; they are used to load information into the LLM's context)
136 | - Provide functionality through **Tools** (sort of like POST endpoints; they are used to execute code or otherwise produce a side effect)
137 | - Define interaction patterns through **Prompts** (reusable templates for LLM interactions)
138 | - And more!
139 | 
140 | ## Core Concepts
141 | 
142 | ### Server
143 | 
144 | The FastMCP server is your core interface to the MCP protocol. It handles connection management, protocol compliance, and message routing:
145 | 
146 | ```python
147 | # Add lifespan support for startup/shutdown with strong typing
148 | from contextlib import asynccontextmanager
149 | from collections.abc import AsyncIterator
150 | from dataclasses import dataclass
151 | 
152 | from fake_database import Database  # Replace with your actual DB type
153 | 
154 | from mcp.server.fastmcp import Context, FastMCP
155 | 
156 | # Create a named server
157 | mcp = FastMCP("My App")
158 | 
159 | # Specify dependencies for deployment and development
160 | mcp = FastMCP("My App", dependencies=["pandas", "numpy"])
161 | 
162 | 
163 | @dataclass
164 | class AppContext:
165 |     db: Database
166 | 
167 | 
168 | @asynccontextmanager
169 | async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
170 |     """Manage application lifecycle with type-safe context"""
171 |     # Initialize on startup
172 |     db = await Database.connect()
173 |     try:
174 |         yield AppContext(db=db)
175 |     finally:
176 |         # Cleanup on shutdown
177 |         await db.disconnect()
178 | 
179 | 
180 | # Pass lifespan to server
181 | mcp = FastMCP("My App", lifespan=app_lifespan)
182 | 
183 | 
184 | # Access type-safe lifespan context in tools
185 | @mcp.tool()
186 | def query_db(ctx: Context) -> str:
187 |     """Tool that uses initialized resources"""
188 |     db = ctx.request_context.lifespan_context.db
189 |     return db.query()
190 | ```
191 | 
192 | ### Resources
193 | 
194 | Resources are how you expose data to LLMs. They're similar to GET endpoints in a REST API - they provide data but shouldn't perform significant computation or have side effects:
195 | 
196 | ```python
197 | from mcp.server.fastmcp import FastMCP
198 | 
199 | mcp = FastMCP("My App")
200 | 
201 | 
202 | @mcp.resource("config://app")
203 | def get_config() -> str:
204 |     """Static configuration data"""
205 |     return "App configuration here"
206 | 
207 | 
208 | @mcp.resource("users://{user_id}/profile")
209 | def get_user_profile(user_id: str) -> str:
210 |     """Dynamic user data"""
211 |     return f"Profile data for user {user_id}"
212 | ```
213 | 
214 | ### Tools
215 | 
216 | Tools let LLMs take actions through your server. Unlike resources, tools are expected to perform computation and have side effects:
217 | 
218 | ```python
219 | import httpx
220 | from mcp.server.fastmcp import FastMCP
221 | 
222 | mcp = FastMCP("My App")
223 | 
224 | 
225 | @mcp.tool()
226 | def calculate_bmi(weight_kg: float, height_m: float) -> float:
227 |     """Calculate BMI given weight in kg and height in meters"""
228 |     return weight_kg / (height_m**2)
229 | 
230 | 
231 | @mcp.tool()
232 | async def fetch_weather(city: str) -> str:
233 |     """Fetch current weather for a city"""
234 |     async with httpx.AsyncClient() as client:
235 |         response = await client.get(f"https://api.weather.com/{city}")
236 |         return response.text
237 | ```
238 | 
239 | ### Prompts
240 | 
241 | Prompts are reusable templates that help LLMs interact with your server effectively:
242 | 
243 | ```python
244 | from mcp.server.fastmcp import FastMCP
245 | from mcp.server.fastmcp.prompts import base
246 | 
247 | mcp = FastMCP("My App")
248 | 
249 | 
250 | @mcp.prompt()
251 | def review_code(code: str) -> str:
252 |     return f"Please review this code:\n\n{code}"
253 | 
254 | 
255 | @mcp.prompt()
256 | def debug_error(error: str) -> list[base.Message]:
257 |     return [
258 |         base.UserMessage("I'm seeing this error:"),
259 |         base.UserMessage(error),
260 |         base.AssistantMessage("I'll help debug that. What have you tried so far?"),
261 |     ]
262 | ```
263 | 
264 | ### Images
265 | 
266 | FastMCP provides an `Image` class that automatically handles image data:
267 | 
268 | ```python
269 | from mcp.server.fastmcp import FastMCP, Image
270 | from PIL import Image as PILImage
271 | 
272 | mcp = FastMCP("My App")
273 | 
274 | 
275 | @mcp.tool()
276 | def create_thumbnail(image_path: str) -> Image:
277 |     """Create a thumbnail from an image"""
278 |     img = PILImage.open(image_path)
279 |     img.thumbnail((100, 100))
280 |     return Image(data=img.tobytes(), format="png")
281 | ```
282 | 
283 | ### Context
284 | 
285 | The Context object gives your tools and resources access to MCP capabilities:
286 | 
287 | ```python
288 | from mcp.server.fastmcp import FastMCP, Context
289 | 
290 | mcp = FastMCP("My App")
291 | 
292 | 
293 | @mcp.tool()
294 | async def long_task(files: list[str], ctx: Context) -> str:
295 |     """Process multiple files with progress tracking"""
296 |     for i, file in enumerate(files):
297 |         await ctx.info(f"Processing {file}")
298 |         await ctx.report_progress(i, len(files))
299 |         data, mime_type = await ctx.read_resource(f"file://{file}")
300 |     return "Processing complete"
301 | ```
302 | 
303 | ## Running Your Server
304 | 
305 | ### Development Mode
306 | 
307 | The fastest way to test and debug your server is with the MCP Inspector:
308 | 
309 | ```bash
310 | mcp dev server.py
311 | 
312 | # Add dependencies
313 | mcp dev server.py --with pandas --with numpy
314 | 
315 | # Mount local code
316 | mcp dev server.py --with-editable .
317 | ```
318 | 
319 | ### Claude Desktop Integration
320 | 
321 | Once your server is ready, install it in Claude Desktop:
322 | 
323 | ```bash
324 | mcp install server.py
325 | 
326 | # Custom name
327 | mcp install server.py --name "My Analytics Server"
328 | 
329 | # Environment variables
330 | mcp install server.py -v API_KEY=abc123 -v DB_URL=postgres://...
331 | mcp install server.py -f .env
332 | ```
333 | 
334 | ### Direct Execution
335 | 
336 | For advanced scenarios like custom deployments:
337 | 
338 | ```python
339 | from mcp.server.fastmcp import FastMCP
340 | 
341 | mcp = FastMCP("My App")
342 | 
343 | if __name__ == "__main__":
344 |     mcp.run()
345 | ```
346 | 
347 | Run it with:
348 | ```bash
349 | python server.py
350 | # or
351 | mcp run server.py
352 | ```
353 | 
354 | ### Mounting to an Existing ASGI Server
355 | 
356 | You can mount the SSE server to an existing ASGI server using the `sse_app` method. This allows you to integrate the SSE server with other ASGI applications.
357 | 
358 | ```python
359 | from starlette.applications import Starlette
360 | from starlette.routes import Mount, Host
361 | from mcp.server.fastmcp import FastMCP
362 | 
363 | 
364 | mcp = FastMCP("My App")
365 | 
366 | # Mount the SSE server to the existing ASGI server
367 | app = Starlette(
368 |     routes=[
369 |         Mount('/', app=mcp.sse_app()),
370 |     ]
371 | )
372 | 
373 | # or dynamically mount as host
374 | app.router.routes.append(Host('mcp.acme.corp', app=mcp.sse_app()))
375 | ```
376 | 
377 | For more information on mounting applications in Starlette, see the [Starlette documentation](https://www.starlette.io/routing/#submounting-routes).
378 | 
379 | ## Examples
380 | 
381 | ### Echo Server
382 | 
383 | A simple server demonstrating resources, tools, and prompts:
384 | 
385 | ```python
386 | from mcp.server.fastmcp import FastMCP
387 | 
388 | mcp = FastMCP("Echo")
389 | 
390 | 
391 | @mcp.resource("echo://{message}")
392 | def echo_resource(message: str) -> str:
393 |     """Echo a message as a resource"""
394 |     return f"Resource echo: {message}"
395 | 
396 | 
397 | @mcp.tool()
398 | def echo_tool(message: str) -> str:
399 |     """Echo a message as a tool"""
400 |     return f"Tool echo: {message}"
401 | 
402 | 
403 | @mcp.prompt()
404 | def echo_prompt(message: str) -> str:
405 |     """Create an echo prompt"""
406 |     return f"Please process this message: {message}"
407 | ```
408 | 
409 | ### SQLite Explorer
410 | 
411 | A more complex example showing database integration:
412 | 
413 | ```python
414 | import sqlite3
415 | 
416 | from mcp.server.fastmcp import FastMCP
417 | 
418 | mcp = FastMCP("SQLite Explorer")
419 | 
420 | 
421 | @mcp.resource("schema://main")
422 | def get_schema() -> str:
423 |     """Provide the database schema as a resource"""
424 |     conn = sqlite3.connect("database.db")
425 |     schema = conn.execute("SELECT sql FROM sqlite_master WHERE type='table'").fetchall()
426 |     return "\n".join(sql[0] for sql in schema if sql[0])
427 | 
428 | 
429 | @mcp.tool()
430 | def query_data(sql: str) -> str:
431 |     """Execute SQL queries safely"""
432 |     conn = sqlite3.connect("database.db")
433 |     try:
434 |         result = conn.execute(sql).fetchall()
435 |         return "\n".join(str(row) for row in result)
436 |     except Exception as e:
437 |         return f"Error: {str(e)}"
438 | ```
439 | 
440 | ## Advanced Usage
441 | 
442 | ### Low-Level Server
443 | 
444 | For more control, you can use the low-level server implementation directly. This gives you full access to the protocol and allows you to customize every aspect of your server, including lifecycle management through the lifespan API:
445 | 
446 | ```python
447 | from contextlib import asynccontextmanager
448 | from collections.abc import AsyncIterator
449 | 
450 | from fake_database import Database  # Replace with your actual DB type
451 | 
452 | from mcp.server import Server
453 | 
454 | 
455 | @asynccontextmanager
456 | async def server_lifespan(server: Server) -> AsyncIterator[dict]:
457 |     """Manage server startup and shutdown lifecycle."""
458 |     # Initialize resources on startup
459 |     db = await Database.connect()
460 |     try:
461 |         yield {"db": db}
462 |     finally:
463 |         # Clean up on shutdown
464 |         await db.disconnect()
465 | 
466 | 
467 | # Pass lifespan to server
468 | server = Server("example-server", lifespan=server_lifespan)
469 | 
470 | 
471 | # Access lifespan context in handlers
472 | @server.call_tool()
473 | async def query_db(name: str, arguments: dict) -> list:
474 |     ctx = server.request_context
475 |     db = ctx.lifespan_context["db"]
476 |     return await db.query(arguments["query"])
477 | ```
478 | 
479 | The lifespan API provides:
480 | - A way to initialize resources when the server starts and clean them up when it stops
481 | - Access to initialized resources through the request context in handlers
482 | - Type-safe context passing between lifespan and request handlers
483 | 
484 | ```python
485 | import mcp.server.stdio
486 | import mcp.types as types
487 | from mcp.server.lowlevel import NotificationOptions, Server
488 | from mcp.server.models import InitializationOptions
489 | 
490 | # Create a server instance
491 | server = Server("example-server")
492 | 
493 | 
494 | @server.list_prompts()
495 | async def handle_list_prompts() -> list[types.Prompt]:
496 |     return [
497 |         types.Prompt(
498 |             name="example-prompt",
499 |             description="An example prompt template",
500 |             arguments=[
501 |                 types.PromptArgument(
502 |                     name="arg1", description="Example argument", required=True
503 |                 )
504 |             ],
505 |         )
506 |     ]
507 | 
508 | 
509 | @server.get_prompt()
510 | async def handle_get_prompt(
511 |     name: str, arguments: dict[str, str] | None
512 | ) -> types.GetPromptResult:
513 |     if name != "example-prompt":
514 |         raise ValueError(f"Unknown prompt: {name}")
515 | 
516 |     return types.GetPromptResult(
517 |         description="Example prompt",
518 |         messages=[
519 |             types.PromptMessage(
520 |                 role="user",
521 |                 content=types.TextContent(type="text", text="Example prompt text"),
522 |             )
523 |         ],
524 |     )
525 | 
526 | 
527 | async def run():
528 |     async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
529 |         await server.run(
530 |             read_stream,
531 |             write_stream,
532 |             InitializationOptions(
533 |                 server_name="example",
534 |                 server_version="0.1.0",
535 |                 capabilities=server.get_capabilities(
536 |                     notification_options=NotificationOptions(),
537 |                     experimental_capabilities={},
538 |                 ),
539 |             ),
540 |         )
541 | 
542 | 
543 | if __name__ == "__main__":
544 |     import asyncio
545 | 
546 |     asyncio.run(run())
547 | ```
548 | 
549 | ### Writing MCP Clients
550 | 
551 | The SDK provides a high-level client interface for connecting to MCP servers:
552 | 
553 | ```python
554 | from mcp import ClientSession, StdioServerParameters, types
555 | from mcp.client.stdio import stdio_client
556 | 
557 | # Create server parameters for stdio connection
558 | server_params = StdioServerParameters(
559 |     command="python",  # Executable
560 |     args=["example_server.py"],  # Optional command line arguments
561 |     env=None,  # Optional environment variables
562 | )
563 | 
564 | 
565 | # Optional: create a sampling callback
566 | async def handle_sampling_message(
567 |     message: types.CreateMessageRequestParams,
568 | ) -> types.CreateMessageResult:
569 |     return types.CreateMessageResult(
570 |         role="assistant",
571 |         content=types.TextContent(
572 |             type="text",
573 |             text="Hello, world! from model",
574 |         ),
575 |         model="gpt-4.1-mini",
576 |         stopReason="endTurn",
577 |     )
578 | 
579 | 
580 | async def run():
581 |     async with stdio_client(server_params) as (read, write):
582 |         async with ClientSession(
583 |             read, write, sampling_callback=handle_sampling_message
584 |         ) as session:
585 |             # Initialize the connection
586 |             await session.initialize()
587 | 
588 |             # List available prompts
589 |             prompts = await session.list_prompts()
590 | 
591 |             # Get a prompt
592 |             prompt = await session.get_prompt(
593 |                 "example-prompt", arguments={"arg1": "value"}
594 |             )
595 | 
596 |             # List available resources
597 |             resources = await session.list_resources()
598 | 
599 |             # List available tools
600 |             tools = await session.list_tools()
601 | 
602 |             # Read a resource
603 |             content, mime_type = await session.read_resource("file://some/path")
604 | 
605 |             # Call a tool
606 |             result = await session.call_tool("tool-name", arguments={"arg1": "value"})
607 | 
608 | 
609 | if __name__ == "__main__":
610 |     import asyncio
611 | 
612 |     asyncio.run(run())
613 | ```
614 | 
615 | ### MCP Primitives
616 | 
617 | MCP defines three core primitives that servers can implement:
618 | 
619 | | Primitive | Control               | Description                                         | Example Use                  |
620 | |-----------|-----------------------|-----------------------------------------------------|------------------------------|
621 | | Prompts   | User-controlled       | Interactive templates invoked by user choice        | Slash commands, menu options |
622 | | Resources | Application-controlled| Contextual data managed by the client application   | File contents, API responses |
623 | | Tools     | Model-controlled      | Functions exposed to the LLM to take actions        | API calls, data updates      |
624 | 
625 | ### Server Capabilities
626 | 
627 | MCP servers declare capabilities during initialization:
628 | 
629 | | Capability  | Feature Flag                 | Description                        |
630 | |-------------|------------------------------|------------------------------------|
631 | | `prompts`   | `listChanged`                | Prompt template management         |
632 | | `resources` | `subscribe`<br/>`listChanged`| Resource exposure and updates      |
633 | | `tools`     | `listChanged`                | Tool discovery and execution       |
634 | | `logging`   | -                            | Server logging configuration       |
635 | | `completion`| -                            | Argument completion suggestions    |
636 | 
637 | ## Tool Composition Patterns
638 | 
639 | When building complex workflows with MCP, effectively chaining tools together is crucial for success:
640 | 
641 | ```python
642 | from mcp.server.fastmcp import FastMCP, Context
643 | 
644 | mcp = FastMCP("Analytics Pipeline")
645 | 
646 | @mcp.tool()
647 | async def fetch_data(source: str, date_range: str, ctx: Context) -> str:
648 |     """Fetch raw data from a source for analysis"""
649 |     # Fetch operation that might be slow
650 |     await ctx.report_progress(0.3, 1.0)
651 |     return f"Data from {source} for {date_range}"
652 | 
653 | @mcp.tool()
654 | def transform_data(raw_data: str, format_type: str = "json") -> dict:
655 |     """Transform raw data into structured format"""
656 |     # Data transformation logic
657 |     return {"processed": raw_data, "format": format_type}
658 | 
659 | @mcp.tool()
660 | def analyze_data(data: dict, metric: str) -> str:
661 |     """Analyze transformed data with specific metrics"""
662 |     # Analysis logic
663 |     return f"Analysis of {metric}: Result based on {data['processed']}"
664 | 
665 | # Usage pattern (for LLMs):
666 | # 1. First fetch the raw data
667 | # 2. Transform the fetched data
668 | # 3. Then analyze the transformed result
669 | ```
670 | 
671 | **Pattern: Sequential Dependency Chain**
672 | ```
673 | fetch_data → transform_data → analyze_data
674 | ```
675 | 
676 | **Pattern: Parallel Processing with Aggregation**
677 | ```python
678 | @mcp.tool()
679 | async def parallel_process(sources: list[str], ctx: Context) -> dict:
680 |     """Process multiple sources in parallel and aggregate results"""
681 |     results = {}
682 |     for i, source in enumerate(sources):
683 |         # Get data for each source (these could be separate tool calls)
684 |         data = await fetch_data(source, "last_week", ctx)
685 |         transformed = transform_data(data)
686 |         results[source] = transformed
687 |         await ctx.report_progress(i / len(sources), 1.0)
688 |     return results
689 | ```
690 | 
691 | ## Error Recovery Strategies
692 | 
693 | When tools fail or return unexpected results, LLMs should follow these recovery patterns:
694 | 
695 | **Strategy: Retry with Backoff**
696 | ```python
697 | @mcp.tool()
698 | async def resilient_operation(resource_id: str, ctx: Context) -> str:
699 |     """Example of resilient operation with retry logic"""
700 |     MAX_ATTEMPTS = 3
701 |     for attempt in range(1, MAX_ATTEMPTS + 1):
702 |         try:
703 |             # Attempt the operation
704 |             return f"Successfully processed {resource_id}"
705 |         except Exception as e:
706 |             if attempt == MAX_ATTEMPTS:
707 |                 # If final attempt, report the failure clearly
708 |                 ctx.warning(f"Operation failed after {MAX_ATTEMPTS} attempts: {str(e)}")
709 |                 return f"ERROR: Could not process {resource_id} - {str(e)}"
710 |             # For earlier attempts, log and retry
711 |             ctx.info(f"Attempt {attempt} failed, retrying...")
712 |             await asyncio.sleep(2 ** attempt)  # Exponential backoff
713 | ```
714 | 
715 | **Strategy: Fallback Chain**
716 | ```python
717 | @mcp.tool()
718 | async def get_data_with_fallbacks(primary_source: str, fallback_sources: list[str] = None) -> dict:
719 |     """Try multiple data sources in order until one succeeds"""
720 |     sources = [primary_source] + (fallback_sources or [])
721 |     
722 |     errors = []
723 |     for source in sources:
724 |         try:
725 |             # Try to get data from this source
726 |             result = {"source": source, "data": f"Data from {source}"}
727 |             return result
728 |         except Exception as e:
729 |             # Record the error and try the next source
730 |             errors.append(f"{source}: {str(e)}")
731 |     
732 |     # If all sources failed, return a clear error with history
733 |     return {"error": "All sources failed", "attempts": errors}
734 | ```
735 | 
736 | **Error Reporting Best Practices**
737 | - Always return structured error information (not just exception text)
738 | - Include specific error codes when possible
739 | - Provide actionable suggestions for recovery
740 | - Log detailed error context for debugging
741 | 
742 | ## Resource Selection Optimization
743 | 
744 | Efficiently managing resources within context limits requires strategic selection:
745 | 
746 | **Progressive Loading Pattern**
747 | ```python
748 | @mcp.tool()
749 | async def analyze_document(doc_uri: str, ctx: Context) -> str:
750 |     """Analyze a document with progressively loaded sections"""
751 |     # First load metadata for quick access
752 |     metadata = await ctx.read_resource(f"{doc_uri}/metadata")
753 |     
754 |     # Based on metadata, selectively load relevant sections
755 |     relevant_sections = identify_relevant_sections(metadata)
756 |     
757 |     # Only load sections that are actually needed
758 |     section_data = {}
759 |     for section in relevant_sections:
760 |         section_data[section] = await ctx.read_resource(f"{doc_uri}/sections/{section}")
761 |     
762 |     # Process with only the necessary context
763 |     return f"Analysis of {len(section_data)} relevant sections"
764 | ```
765 | 
766 | **Context Budget Management**
767 | ```python
768 | @mcp.tool()
769 | async def summarize_large_dataset(dataset_uri: str, ctx: Context) -> str:
770 |     """Summarize a large dataset while respecting context limits"""
771 |     # Get total size to plan the approach
772 |     metadata = await ctx.read_resource(f"{dataset_uri}/metadata")
773 |     total_size = metadata.get("size_kb", 0)
774 |     
775 |     if total_size > 100:  # Arbitrary threshold
776 |         # For large datasets, use chunking approach
777 |         chunks = await ctx.read_resource(f"{dataset_uri}/summary_chunks")
778 |         return f"Summary of {len(chunks)} chunks: {', '.join(chunks)}"
779 |     else:
780 |         # For smaller datasets, process everything at once
781 |         full_data = await ctx.read_resource(dataset_uri)
782 |         return f"Complete analysis of {dataset_uri}"
783 | ```
784 | 
785 | **Resource Relevance Filtering**
786 | - Focus on the most recent/relevant data first
787 | - Filter resources to match the specific query intent
788 | - Use metadata to decide which resources to load
789 | - Prefer sampling representative data over loading everything
790 | 
791 | ## Documentation
792 | 
793 | - [Model Context Protocol documentation](https://modelcontextprotocol.io)
794 | - [Model Context Protocol specification](https://spec.modelcontextprotocol.io)
795 | - [Officially supported servers](https://github.com/modelcontextprotocol/servers)
796 | 
797 | ## Contributing
798 | 
799 | We are passionate about supporting contributors of all levels of experience and would love to see you get involved in the project. See the [contributing guide](CONTRIBUTING.md) to get started.
800 | 
801 | ## License
802 | 
803 | This project is licensed under the MIT License - see the LICENSE file for details.
```
Page 8/45FirstPrevNextLast