#
tokens: 48467/50000 10/207 files (page 6/35)
lines: off (toggle) GitHub
raw markdown copy
This is page 6 of 35. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│   ├── __init__.py
│   ├── advanced_agent_flows_using_unified_memory_system_demo.py
│   ├── advanced_extraction_demo.py
│   ├── advanced_unified_memory_system_demo.py
│   ├── advanced_vector_search_demo.py
│   ├── analytics_reporting_demo.py
│   ├── audio_transcription_demo.py
│   ├── basic_completion_demo.py
│   ├── cache_demo.py
│   ├── claude_integration_demo.py
│   ├── compare_synthesize_demo.py
│   ├── cost_optimization.py
│   ├── data
│   │   ├── sample_event.txt
│   │   ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│   │   └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│   ├── docstring_refiner_demo.py
│   ├── document_conversion_and_processing_demo.py
│   ├── entity_relation_graph_demo.py
│   ├── filesystem_operations_demo.py
│   ├── grok_integration_demo.py
│   ├── local_text_tools_demo.py
│   ├── marqo_fused_search_demo.py
│   ├── measure_model_speeds.py
│   ├── meta_api_demo.py
│   ├── multi_provider_demo.py
│   ├── ollama_integration_demo.py
│   ├── prompt_templates_demo.py
│   ├── python_sandbox_demo.py
│   ├── rag_example.py
│   ├── research_workflow_demo.py
│   ├── sample
│   │   ├── article.txt
│   │   ├── backprop_paper.pdf
│   │   ├── buffett.pdf
│   │   ├── contract_link.txt
│   │   ├── legal_contract.txt
│   │   ├── medical_case.txt
│   │   ├── northwind.db
│   │   ├── research_paper.txt
│   │   ├── sample_data.json
│   │   └── text_classification_samples
│   │       ├── email_classification.txt
│   │       ├── news_samples.txt
│   │       ├── product_reviews.txt
│   │       └── support_tickets.txt
│   ├── sample_docs
│   │   └── downloaded
│   │       └── attention_is_all_you_need.pdf
│   ├── sentiment_analysis_demo.py
│   ├── simple_completion_demo.py
│   ├── single_shot_synthesis_demo.py
│   ├── smart_browser_demo.py
│   ├── sql_database_demo.py
│   ├── sse_client_demo.py
│   ├── test_code_extraction.py
│   ├── test_content_detection.py
│   ├── test_ollama.py
│   ├── text_classification_demo.py
│   ├── text_redline_demo.py
│   ├── tool_composition_examples.py
│   ├── tournament_code_demo.py
│   ├── tournament_text_demo.py
│   ├── unified_memory_system_demo.py
│   ├── vector_search_demo.py
│   ├── web_automation_instruction_packs.py
│   └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│   └── smart_browser_internal
│       ├── locator_cache.db
│       ├── readability.js
│       └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── integration
│   │   ├── __init__.py
│   │   └── test_server.py
│   ├── manual
│   │   ├── test_extraction_advanced.py
│   │   └── test_extraction.py
│   └── unit
│       ├── __init__.py
│       ├── test_cache.py
│       ├── test_providers.py
│       └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│   ├── __init__.py
│   ├── __main__.py
│   ├── cli
│   │   ├── __init__.py
│   │   ├── __main__.py
│   │   ├── commands.py
│   │   ├── helpers.py
│   │   └── typer_cli.py
│   ├── clients
│   │   ├── __init__.py
│   │   ├── completion_client.py
│   │   └── rag_client.py
│   ├── config
│   │   └── examples
│   │       └── filesystem_config.yaml
│   ├── config.py
│   ├── constants.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── evaluation
│   │   │   ├── base.py
│   │   │   └── evaluators.py
│   │   ├── providers
│   │   │   ├── __init__.py
│   │   │   ├── anthropic.py
│   │   │   ├── base.py
│   │   │   ├── deepseek.py
│   │   │   ├── gemini.py
│   │   │   ├── grok.py
│   │   │   ├── ollama.py
│   │   │   ├── openai.py
│   │   │   └── openrouter.py
│   │   ├── server.py
│   │   ├── state_store.py
│   │   ├── tournaments
│   │   │   ├── manager.py
│   │   │   ├── tasks.py
│   │   │   └── utils.py
│   │   └── ums_api
│   │       ├── __init__.py
│   │       ├── ums_database.py
│   │       ├── ums_endpoints.py
│   │       ├── ums_models.py
│   │       └── ums_services.py
│   ├── exceptions.py
│   ├── graceful_shutdown.py
│   ├── services
│   │   ├── __init__.py
│   │   ├── analytics
│   │   │   ├── __init__.py
│   │   │   ├── metrics.py
│   │   │   └── reporting.py
│   │   ├── cache
│   │   │   ├── __init__.py
│   │   │   ├── cache_service.py
│   │   │   ├── persistence.py
│   │   │   ├── strategies.py
│   │   │   └── utils.py
│   │   ├── cache.py
│   │   ├── document.py
│   │   ├── knowledge_base
│   │   │   ├── __init__.py
│   │   │   ├── feedback.py
│   │   │   ├── manager.py
│   │   │   ├── rag_engine.py
│   │   │   ├── retriever.py
│   │   │   └── utils.py
│   │   ├── prompts
│   │   │   ├── __init__.py
│   │   │   ├── repository.py
│   │   │   └── templates.py
│   │   ├── prompts.py
│   │   └── vector
│   │       ├── __init__.py
│   │       ├── embeddings.py
│   │       └── vector_service.py
│   ├── tool_token_counter.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── audio_transcription.py
│   │   ├── base.py
│   │   ├── completion.py
│   │   ├── docstring_refiner.py
│   │   ├── document_conversion_and_processing.py
│   │   ├── enhanced-ums-lookbook.html
│   │   ├── entity_relation_graph.py
│   │   ├── excel_spreadsheet_automation.py
│   │   ├── extraction.py
│   │   ├── filesystem.py
│   │   ├── html_to_markdown.py
│   │   ├── local_text_tools.py
│   │   ├── marqo_fused_search.py
│   │   ├── meta_api_tool.py
│   │   ├── ocr_tools.py
│   │   ├── optimization.py
│   │   ├── provider.py
│   │   ├── pyodide_boot_template.html
│   │   ├── python_sandbox.py
│   │   ├── rag.py
│   │   ├── redline-compiled.css
│   │   ├── sentiment_analysis.py
│   │   ├── single_shot_synthesis.py
│   │   ├── smart_browser.py
│   │   ├── sql_databases.py
│   │   ├── text_classification.py
│   │   ├── text_redline_tools.py
│   │   ├── tournament.py
│   │   ├── ums_explorer.html
│   │   └── unified_memory_system.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── async_utils.py
│   │   ├── display.py
│   │   ├── logging
│   │   │   ├── __init__.py
│   │   │   ├── console.py
│   │   │   ├── emojis.py
│   │   │   ├── formatter.py
│   │   │   ├── logger.py
│   │   │   ├── panels.py
│   │   │   ├── progress.py
│   │   │   └── themes.py
│   │   ├── parse_yaml.py
│   │   ├── parsing.py
│   │   ├── security.py
│   │   └── text.py
│   └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/redline-compiled.css:
--------------------------------------------------------------------------------

```css
*,:after,:before{--tw-border-spacing-x:0;--tw-border-spacing-y:0;--tw-translate-x:0;--tw-translate-y:0;--tw-rotate:0;--tw-skew-x:0;--tw-skew-y:0;--tw-scale-x:1;--tw-scale-y:1;--tw-pan-x: ;--tw-pan-y: ;--tw-pinch-zoom: ;--tw-scroll-snap-strictness:proximity;--tw-gradient-from-position: ;--tw-gradient-via-position: ;--tw-gradient-to-position: ;--tw-ordinal: ;--tw-slashed-zero: ;--tw-numeric-figure: ;--tw-numeric-spacing: ;--tw-numeric-fraction: ;--tw-ring-inset: ;--tw-ring-offset-width:0px;--tw-ring-offset-color:#fff;--tw-ring-color:rgba(59,130,246,.5);--tw-ring-offset-shadow:0 0 #0000;--tw-ring-shadow:0 0 #0000;--tw-shadow:0 0 #0000;--tw-shadow-colored:0 0 #0000;--tw-blur: ;--tw-brightness: ;--tw-contrast: ;--tw-grayscale: ;--tw-hue-rotate: ;--tw-invert: ;--tw-saturate: ;--tw-sepia: ;--tw-drop-shadow: ;--tw-backdrop-blur: ;--tw-backdrop-brightness: ;--tw-backdrop-contrast: ;--tw-backdrop-grayscale: ;--tw-backdrop-hue-rotate: ;--tw-backdrop-invert: ;--tw-backdrop-opacity: ;--tw-backdrop-saturate: ;--tw-backdrop-sepia: ;--tw-contain-size: ;--tw-contain-layout: ;--tw-contain-paint: ;--tw-contain-style: }::backdrop{--tw-border-spacing-x:0;--tw-border-spacing-y:0;--tw-translate-x:0;--tw-translate-y:0;--tw-rotate:0;--tw-skew-x:0;--tw-skew-y:0;--tw-scale-x:1;--tw-scale-y:1;--tw-pan-x: ;--tw-pan-y: ;--tw-pinch-zoom: ;--tw-scroll-snap-strictness:proximity;--tw-gradient-from-position: ;--tw-gradient-via-position: ;--tw-gradient-to-position: ;--tw-ordinal: ;--tw-slashed-zero: ;--tw-numeric-figure: ;--tw-numeric-spacing: ;--tw-numeric-fraction: ;--tw-ring-inset: ;--tw-ring-offset-width:0px;--tw-ring-offset-color:#fff;--tw-ring-color:rgba(59,130,246,.5);--tw-ring-offset-shadow:0 0 #0000;--tw-ring-shadow:0 0 #0000;--tw-shadow:0 0 #0000;--tw-shadow-colored:0 0 #0000;--tw-blur: ;--tw-brightness: ;--tw-contrast: ;--tw-grayscale: ;--tw-hue-rotate: ;--tw-invert: ;--tw-saturate: ;--tw-sepia: ;--tw-drop-shadow: ;--tw-backdrop-blur: ;--tw-backdrop-brightness: ;--tw-backdrop-contrast: ;--tw-backdrop-grayscale: ;--tw-backdrop-hue-rotate: ;--tw-backdrop-invert: ;--tw-backdrop-opacity: ;--tw-backdrop-saturate: ;--tw-backdrop-sepia: ;--tw-contain-size: ;--tw-contain-layout: ;--tw-contain-paint: ;--tw-contain-style: }
/*! tailwindcss v3.4.17 | MIT License | https://tailwindcss.com*/*,:after,:before{box-sizing:border-box;border:0 solid #e5e7eb}:after,:before{--tw-content:""}:host,html{line-height:1.5;-webkit-text-size-adjust:100%;-moz-tab-size:4;-o-tab-size:4;tab-size:4;font-family:ui-sans-serif,system-ui,sans-serif,Apple Color Emoji,Segoe UI Emoji,Segoe UI Symbol,Noto Color Emoji;font-feature-settings:normal;font-variation-settings:normal;-webkit-tap-highlight-color:transparent}body{margin:0;line-height:inherit}hr{height:0;color:inherit;border-top-width:1px}abbr:where([title]){-webkit-text-decoration:underline dotted;text-decoration:underline dotted}h1,h2,h3,h4,h5,h6{font-size:inherit;font-weight:inherit}a{color:inherit;text-decoration:inherit}b,strong{font-weight:bolder}code,kbd,pre,samp{font-family:ui-monospace,SFMono-Regular,Menlo,Monaco,Consolas,Liberation Mono,Courier New,monospace;font-feature-settings:normal;font-variation-settings:normal;font-size:1em}small{font-size:80%}sub,sup{font-size:75%;line-height:0;position:relative;vertical-align:baseline}sub{bottom:-.25em}sup{top:-.5em}table{text-indent:0;border-color:inherit;border-collapse:collapse}button,input,optgroup,select,textarea{font-family:inherit;font-feature-settings:inherit;font-variation-settings:inherit;font-size:100%;font-weight:inherit;line-height:inherit;letter-spacing:inherit;color:inherit;margin:0;padding:0}button,select{text-transform:none}button,input:where([type=button]),input:where([type=reset]),input:where([type=submit]){-webkit-appearance:button;background-color:transparent;background-image:none}:-moz-focusring{outline:auto}:-moz-ui-invalid{box-shadow:none}progress{vertical-align:baseline}::-webkit-inner-spin-button,::-webkit-outer-spin-button{height:auto}[type=search]{-webkit-appearance:textfield;outline-offset:-2px}::-webkit-search-decoration{-webkit-appearance:none}::-webkit-file-upload-button{-webkit-appearance:button;font:inherit}summary{display:list-item}blockquote,dd,dl,figure,h1,h2,h3,h4,h5,h6,hr,p,pre{margin:0}fieldset{margin:0}fieldset,legend{padding:0}menu,ol,ul{list-style:none;margin:0;padding:0}dialog{padding:0}textarea{resize:vertical}input::-moz-placeholder,textarea::-moz-placeholder{opacity:1;color:#9ca3af}input::placeholder,textarea::placeholder{opacity:1;color:#9ca3af}[role=button],button{cursor:pointer}:disabled{cursor:default}audio,canvas,embed,iframe,img,object,svg,video{display:block;vertical-align:middle}img,video{max-width:100%;height:auto}[hidden]:where(:not([hidden=until-found])){display:none}.\!container{width:100%!important}.container{width:100%}@media (min-width:640px){.\!container{max-width:640px!important}.container{max-width:640px}}@media (min-width:768px){.\!container{max-width:768px!important}.container{max-width:768px}}@media (min-width:1024px){.\!container{max-width:1024px!important}.container{max-width:1024px}}@media (min-width:1280px){.\!container{max-width:1280px!important}.container{max-width:1280px}}@media (min-width:1536px){.\!container{max-width:1536px!important}.container{max-width:1536px}}.diff-insert,ins.diff-insert,ins.diff-insert-text{--tw-bg-opacity:1;background-color:rgb(239 246 255/var(--tw-bg-opacity,1));--tw-text-opacity:1;color:rgb(30 64 175/var(--tw-text-opacity,1));text-decoration-line:none;--tw-ring-offset-shadow:var(--tw-ring-inset) 0 0 0 var(--tw-ring-offset-width) var(--tw-ring-offset-color);--tw-ring-shadow:var(--tw-ring-inset) 0 0 0 calc(1px + var(--tw-ring-offset-width)) var(--tw-ring-color);--tw-ring-inset:inset;--tw-ring-color:rgba(147,197,253,.6)}.diff-delete,.diff-insert,del.diff-delete,del.diff-delete-text,ins.diff-insert,ins.diff-insert-text{border-radius:.125rem;padding-left:.125rem;padding-right:.125rem;box-shadow:var(--tw-ring-offset-shadow),var(--tw-ring-shadow),var(--tw-shadow,0 0 #0000)}.diff-delete,del.diff-delete,del.diff-delete-text{--tw-bg-opacity:1;background-color:rgb(255 241 242/var(--tw-bg-opacity,1));--tw-text-opacity:1;color:rgb(159 18 57/var(--tw-text-opacity,1));text-decoration-line:line-through;--tw-ring-offset-shadow:var(--tw-ring-inset) 0 0 0 var(--tw-ring-offset-width) var(--tw-ring-offset-color);--tw-ring-shadow:var(--tw-ring-inset) 0 0 0 calc(1px + var(--tw-ring-offset-width)) var(--tw-ring-color);--tw-ring-inset:inset;--tw-ring-color:rgba(253,164,175,.6)}.diff-move-target,ins.diff-move-target{border-radius:.125rem;border-width:1px;--tw-border-opacity:1;border-color:rgb(110 231 183/var(--tw-border-opacity,1));--tw-bg-opacity:1;background-color:rgb(236 253 245/var(--tw-bg-opacity,1));padding-left:.125rem;padding-right:.125rem;--tw-text-opacity:1;color:rgb(6 78 59/var(--tw-text-opacity,1));--tw-ring-offset-shadow:var(--tw-ring-inset) 0 0 0 var(--tw-ring-offset-width) var(--tw-ring-offset-color);--tw-ring-shadow:var(--tw-ring-inset) 0 0 0 calc(1px + var(--tw-ring-offset-width)) var(--tw-ring-color);box-shadow:var(--tw-ring-offset-shadow),var(--tw-ring-shadow),var(--tw-shadow,0 0 #0000);--tw-ring-color:rgba(52,211,153,.6)}.diff-move-source,del.diff-move-source{border-radius:.125rem;border:1px dashed rgba(52,211,153,.4);background-color:rgba(236,253,245,.5);padding-left:.125rem;padding-right:.125rem;color:rgba(6,95,70,.6);text-decoration-line:line-through}span.diff-attrib-change{border-bottom-width:1px;border-style:dotted;border-color:rgb(251 146 60/var(--tw-border-opacity,1));background-color:rgba(255,247,237,.4)}span.diff-attrib-change,span.diff-rename-node{--tw-border-opacity:1;padding-left:.125rem;padding-right:.125rem}span.diff-rename-node{border-bottom-width:2px;border-style:dotted;border-color:rgb(192 132 252/var(--tw-border-opacity,1));background-color:rgba(245,243,255,.4)}.visible{visibility:visible}.collapse{visibility:collapse}.static{position:static}.fixed{position:fixed}.absolute{position:absolute}.relative{position:relative}.bottom-10{bottom:2.5rem}.bottom-2{bottom:.5rem}.left-2{left:.5rem}.right-1{right:.25rem}.right-2{right:.5rem}.top-10{top:2.5rem}.top-2{top:.5rem}.isolate{isolation:isolate}.z-40{z-index:40}.z-50{z-index:50}.mx-auto{margin-left:auto;margin-right:auto}.ml-1{margin-left:.25rem}.ml-2{margin-left:.5rem}.mr-1{margin-right:.25rem}.block{display:block}.inline-block{display:inline-block}.inline{display:inline}.flex{display:flex}.\!table{display:table!important}.table{display:table}.grid{display:grid}.contents{display:contents}.hidden{display:none}.h-0\.5{height:.125rem}.h-3{height:.75rem}.h-4{height:1rem}.w-1{width:.25rem}.w-3{width:.75rem}.w-4{width:1rem}.max-w-3xl{max-width:48rem}.shrink{flex-shrink:1}.grow{flex-grow:1}.border-collapse{border-collapse:collapse}.transform{transform:translate(var(--tw-translate-x),var(--tw-translate-y)) rotate(var(--tw-rotate)) skewX(var(--tw-skew-x)) skewY(var(--tw-skew-y)) scaleX(var(--tw-scale-x)) scaleY(var(--tw-scale-y))}.cursor-pointer{cursor:pointer}.resize{resize:both}.flex-col{flex-direction:column}.flex-wrap{flex-wrap:wrap}.items-center{align-items:center}.gap-2{gap:.5rem}.truncate{overflow:hidden;text-overflow:ellipsis;white-space:nowrap}.rounded{border-radius:.25rem}.rounded-lg{border-radius:.5rem}.rounded-sm{border-radius:.125rem}.border{border-width:1px}.border-b{border-bottom-width:1px}.border-b-2{border-bottom-width:2px}.border-dashed{border-style:dashed}.border-dotted{border-style:dotted}.border-emerald-300{--tw-border-opacity:1;border-color:rgb(110 231 183/var(--tw-border-opacity,1))}.border-emerald-400{--tw-border-opacity:1;border-color:rgb(52 211 153/var(--tw-border-opacity,1))}.border-emerald-400\/40{border-color:rgba(52,211,153,.4)}.border-gray-300{--tw-border-opacity:1;border-color:rgb(209 213 219/var(--tw-border-opacity,1))}.border-orange-400{--tw-border-opacity:1;border-color:rgb(251 146 60/var(--tw-border-opacity,1))}.border-orange-500{--tw-border-opacity:1;border-color:rgb(249 115 22/var(--tw-border-opacity,1))}.border-purple-400{--tw-border-opacity:1;border-color:rgb(192 132 252/var(--tw-border-opacity,1))}.border-purple-500{--tw-border-opacity:1;border-color:rgb(168 85 247/var(--tw-border-opacity,1))}.bg-blue-100{--tw-bg-opacity:1;background-color:rgb(219 234 254/var(--tw-bg-opacity,1))}.bg-blue-50{--tw-bg-opacity:1;background-color:rgb(239 246 255/var(--tw-bg-opacity,1))}.bg-blue-500{--tw-bg-opacity:1;background-color:rgb(59 130 246/var(--tw-bg-opacity,1))}.bg-blue-900\/40{background-color:rgba(30,58,138,.4)}.bg-emerald-100{--tw-bg-opacity:1;background-color:rgb(209 250 229/var(--tw-bg-opacity,1))}.bg-emerald-100\/70{background-color:rgba(209,250,229,.7)}.bg-emerald-50{--tw-bg-opacity:1;background-color:rgb(236 253 245/var(--tw-bg-opacity,1))}.bg-emerald-50\/50{background-color:rgba(236,253,245,.5)}.bg-emerald-500{--tw-bg-opacity:1;background-color:rgb(16 185 129/var(--tw-bg-opacity,1))}.bg-emerald-900\/30{background-color:rgba(6,78,59,.3)}.bg-emerald-900\/40{background-color:rgba(6,78,59,.4)}.bg-gray-100{--tw-bg-opacity:1;background-color:rgb(243 244 246/var(--tw-bg-opacity,1))}.bg-gray-200{--tw-bg-opacity:1;background-color:rgb(229 231 235/var(--tw-bg-opacity,1))}.bg-orange-50{--tw-bg-opacity:1;background-color:rgb(255 247 237/var(--tw-bg-opacity,1))}.bg-orange-50\/40{background-color:rgba(255,247,237,.4)}.bg-rose-100{--tw-bg-opacity:1;background-color:rgb(255 228 230/var(--tw-bg-opacity,1))}.bg-rose-50{--tw-bg-opacity:1;background-color:rgb(255 241 242/var(--tw-bg-opacity,1))}.bg-rose-500{--tw-bg-opacity:1;background-color:rgb(244 63 94/var(--tw-bg-opacity,1))}.bg-rose-900\/40{background-color:rgba(136,19,55,.4)}.bg-transparent{background-color:transparent}.bg-violet-50\/40{background-color:rgba(245,243,255,.4)}.bg-white{--tw-bg-opacity:1;background-color:rgb(255 255 255/var(--tw-bg-opacity,1))}.bg-white\/90{background-color:hsla(0,0%,100%,.9)}.p-2{padding:.5rem}.px-0\.5{padding-left:.125rem;padding-right:.125rem}.px-1{padding-left:.25rem;padding-right:.25rem}.px-2{padding-left:.5rem;padding-right:.5rem}.px-4{padding-left:1rem;padding-right:1rem}.py-1{padding-top:.25rem;padding-bottom:.25rem}.py-8{padding-top:2rem;padding-bottom:2rem}.font-\[\'Newsreader\'\]{font-family:Newsreader}.text-\[0\.95em\]{font-size:.95em}.text-xs{font-size:.75rem;line-height:1rem}.uppercase{text-transform:uppercase}.lowercase{text-transform:lowercase}.capitalize{text-transform:capitalize}.italic{font-style:italic}.ordinal{--tw-ordinal:ordinal;font-variant-numeric:var(--tw-ordinal) var(--tw-slashed-zero) var(--tw-numeric-figure) var(--tw-numeric-spacing) var(--tw-numeric-fraction)}.text-black{--tw-text-opacity:1;color:rgb(0 0 0/var(--tw-text-opacity,1))}.text-blue-200{--tw-text-opacity:1;color:rgb(191 219 254/var(--tw-text-opacity,1))}.text-blue-800{--tw-text-opacity:1;color:rgb(30 64 175/var(--tw-text-opacity,1))}.text-emerald-200{--tw-text-opacity:1;color:rgb(167 243 208/var(--tw-text-opacity,1))}.text-emerald-300\/60{color:rgba(110,231,183,.6)}.text-emerald-800\/60{color:rgba(6,95,70,.6)}.text-emerald-900{--tw-text-opacity:1;color:rgb(6 78 59/var(--tw-text-opacity,1))}.text-gray-500{--tw-text-opacity:1;color:rgb(107 114 128/var(--tw-text-opacity,1))}.text-rose-200{--tw-text-opacity:1;color:rgb(254 205 211/var(--tw-text-opacity,1))}.text-rose-800{--tw-text-opacity:1;color:rgb(159 18 57/var(--tw-text-opacity,1))}.underline{text-decoration-line:underline}.overline{text-decoration-line:overline}.line-through{text-decoration-line:line-through}.no-underline{text-decoration-line:none}.decoration-2{text-decoration-thickness:2px}.underline-offset-4{text-underline-offset:4px}.shadow{--tw-shadow:0 1px 3px 0 rgba(0,0,0,.1),0 1px 2px -1px rgba(0,0,0,.1);--tw-shadow-colored:0 1px 3px 0 var(--tw-shadow-color),0 1px 2px -1px var(--tw-shadow-color)}.shadow,.shadow-lg{box-shadow:var(--tw-ring-offset-shadow,0 0 #0000),var(--tw-ring-shadow,0 0 #0000),var(--tw-shadow)}.shadow-lg{--tw-shadow:0 10px 15px -3px rgba(0,0,0,.1),0 4px 6px -4px rgba(0,0,0,.1);--tw-shadow-colored:0 10px 15px -3px var(--tw-shadow-color),0 4px 6px -4px var(--tw-shadow-color)}.outline{outline-style:solid}.ring{--tw-ring-offset-shadow:var(--tw-ring-inset) 0 0 0 var(--tw-ring-offset-width) var(--tw-ring-offset-color);--tw-ring-shadow:var(--tw-ring-inset) 0 0 0 calc(3px + var(--tw-ring-offset-width)) var(--tw-ring-color)}.ring,.ring-0{box-shadow:var(--tw-ring-offset-shadow),var(--tw-ring-shadow),var(--tw-shadow,0 0 #0000)}.ring-0{--tw-ring-offset-shadow:var(--tw-ring-inset) 0 0 0 var(--tw-ring-offset-width) var(--tw-ring-offset-color);--tw-ring-shadow:var(--tw-ring-inset) 0 0 0 calc(var(--tw-ring-offset-width)) var(--tw-ring-color)}.ring-1{--tw-ring-offset-shadow:var(--tw-ring-inset) 0 0 0 var(--tw-ring-offset-width) var(--tw-ring-offset-color);--tw-ring-shadow:var(--tw-ring-inset) 0 0 0 calc(1px + var(--tw-ring-offset-width)) var(--tw-ring-color)}.ring-1,.ring-2{box-shadow:var(--tw-ring-offset-shadow),var(--tw-ring-shadow),var(--tw-shadow,0 0 #0000)}.ring-2{--tw-ring-offset-shadow:var(--tw-ring-inset) 0 0 0 var(--tw-ring-offset-width) var(--tw-ring-offset-color);--tw-ring-shadow:var(--tw-ring-inset) 0 0 0 calc(2px + var(--tw-ring-offset-width)) var(--tw-ring-color)}.ring-inset{--tw-ring-inset:inset}.ring-black\/10{--tw-ring-color:rgba(0,0,0,.1)}.ring-blue-300{--tw-ring-opacity:1;--tw-ring-color:rgb(147 197 253/var(--tw-ring-opacity,1))}.ring-blue-300\/60{--tw-ring-color:rgba(147,197,253,.6)}.ring-emerald-300{--tw-ring-opacity:1;--tw-ring-color:rgb(110 231 183/var(--tw-ring-opacity,1))}.ring-emerald-400\/60{--tw-ring-color:rgba(52,211,153,.6)}.ring-emerald-500\/30{--tw-ring-color:rgba(16,185,129,.3)}.ring-orange-300{--tw-ring-opacity:1;--tw-ring-color:rgb(253 186 116/var(--tw-ring-opacity,1))}.ring-rose-300{--tw-ring-opacity:1;--tw-ring-color:rgb(253 164 175/var(--tw-ring-opacity,1))}.ring-rose-300\/60{--tw-ring-color:rgba(253,164,175,.6)}.ring-offset-1{--tw-ring-offset-width:1px}.blur{--tw-blur:blur(8px)}.blur,.grayscale{filter:var(--tw-blur) var(--tw-brightness) var(--tw-contrast) var(--tw-grayscale) var(--tw-hue-rotate) var(--tw-invert) var(--tw-saturate) var(--tw-sepia) var(--tw-drop-shadow)}.grayscale{--tw-grayscale:grayscale(100%)}.invert{--tw-invert:invert(100%)}.filter,.invert{filter:var(--tw-blur) var(--tw-brightness) var(--tw-contrast) var(--tw-grayscale) var(--tw-hue-rotate) var(--tw-invert) var(--tw-saturate) var(--tw-sepia) var(--tw-drop-shadow)}.backdrop-blur-sm{--tw-backdrop-blur:blur(4px);-webkit-backdrop-filter:var(--tw-backdrop-blur) var(--tw-backdrop-brightness) var(--tw-backdrop-contrast) var(--tw-backdrop-grayscale) var(--tw-backdrop-hue-rotate) var(--tw-backdrop-invert) var(--tw-backdrop-opacity) var(--tw-backdrop-saturate) var(--tw-backdrop-sepia);backdrop-filter:var(--tw-backdrop-blur) var(--tw-backdrop-brightness) var(--tw-backdrop-contrast) var(--tw-backdrop-grayscale) var(--tw-backdrop-hue-rotate) var(--tw-backdrop-invert) var(--tw-backdrop-opacity) var(--tw-backdrop-saturate) var(--tw-backdrop-sepia)}.transition{transition-property:color,background-color,border-color,text-decoration-color,fill,stroke,opacity,box-shadow,transform,filter,-webkit-backdrop-filter;transition-property:color,background-color,border-color,text-decoration-color,fill,stroke,opacity,box-shadow,transform,filter,backdrop-filter;transition-property:color,background-color,border-color,text-decoration-color,fill,stroke,opacity,box-shadow,transform,filter,backdrop-filter,-webkit-backdrop-filter;transition-timing-function:cubic-bezier(.4,0,.2,1);transition-duration:.15s}.transition-all{transition-property:all;transition-timing-function:cubic-bezier(.4,0,.2,1);transition-duration:.15s}.transition-colors{transition-property:color,background-color,border-color,text-decoration-color,fill,stroke;transition-timing-function:cubic-bezier(.4,0,.2,1);transition-duration:.15s}.duration-200{transition-duration:.2s}.ease-out{transition-timing-function:cubic-bezier(0,0,.2,1)}.\[_hdr_start\:_hdr_end\]{_hdr_start:hdr end}.\[_nonce_start\:_nonce_end\]{_nonce_start:nonce end}.\[a\:a\]{a:a}.\[ctx_start\:ctx_end\]{ctx_start:ctx end}.\[current_pos\:best_split_pos\]{current_pos:best split pos}.\[end-1\:end\]{end-1:end}.\[hl_start\:hl_end\]{hl_start:hl end}.\[i1\:i2\]{i1:i2}.\[i\:i\+3\]{i:i+3}.\[i\:i\+chunk_size\]{i:i+chunk size}.\[i\:i\+len\(pattern\)\]{i:i+len(pattern)}.\[inherit_ndx\:inherit_ndx\+1\]{inherit_ndx:inherit ndx+1}.\[j1\:j2\]{j1:j2}.\[json_start\:json_end\]{json_start:json end}.\[last\:start\]{last:start}.\[last_end\:start\]{last_end:start}.\[last_section_end\:first_match_start\]{last_section_end:first match start}.\[left\:right\]{left:right}.\[line_offset\:end_line\]{line_offset:end line}.\[offset\:next_offset\]{offset:next offset}.\[offset\:offset\+limit\]{offset:offset+limit}.\[pos\:end_pos\]{pos:end pos}.\[pos\:new_pos\]{pos:new pos}.\[position\:offset\]{position:offset}.\[position\:start\]{position:start}.\[search_region_start\:end_index\]{search_region_start:end index}.\[section_content_start\:section_content_end\]{section_content_start:section content end}.\[section_start\:section_end\]{section_start:section end}.\[start\:end\]{start:end}.\[start\:start\+1\]{start:start+1}.\[start_index\:best_split_index\]{start_index:best split index}.\[start_index\:end_index\]{start_index:end index}.\[start_pos\:pos\]{start_pos:pos}.\[user\:passwd\@\]{user:passwd@}.hover\:bg-gray-200:hover{--tw-bg-opacity:1;background-color:rgb(229 231 235/var(--tw-bg-opacity,1))}.hover\:bg-gray-300:hover{--tw-bg-opacity:1;background-color:rgb(209 213 219/var(--tw-bg-opacity,1))}.hover\:text-gray-700:hover{--tw-text-opacity:1;color:rgb(55 65 81/var(--tw-text-opacity,1))}@media (min-width:640px){.sm\:inline{display:inline}.sm\:hidden{display:none}}@media (min-width:768px){.md\:flex{display:flex}}@media (prefers-color-scheme:dark){.dark\:inline{display:inline}.dark\:hidden{display:none}.dark\:bg-blue-900\/60{background-color:rgba(30,58,138,.6)}.dark\:bg-emerald-900\/60{background-color:rgba(6,78,59,.6)}.dark\:bg-gray-700{--tw-bg-opacity:1;background-color:rgb(55 65 81/var(--tw-bg-opacity,1))}.dark\:bg-gray-800{--tw-bg-opacity:1;background-color:rgb(31 41 55/var(--tw-bg-opacity,1))}.dark\:bg-gray-800\/90{background-color:rgba(31,41,55,.9)}.dark\:bg-gray-900{--tw-bg-opacity:1;background-color:rgb(17 24 39/var(--tw-bg-opacity,1))}.dark\:bg-orange-900\/60{background-color:rgba(124,45,18,.6)}.dark\:bg-rose-900\/60{background-color:rgba(136,19,55,.6)}.dark\:text-gray-200{--tw-text-opacity:1;color:rgb(229 231 235/var(--tw-text-opacity,1))}.dark\:ring-blue-700{--tw-ring-opacity:1;--tw-ring-color:rgb(29 78 216/var(--tw-ring-opacity,1))}.dark\:ring-emerald-700{--tw-ring-opacity:1;--tw-ring-color:rgb(4 120 87/var(--tw-ring-opacity,1))}.dark\:ring-orange-700{--tw-ring-opacity:1;--tw-ring-color:rgb(194 65 12/var(--tw-ring-opacity,1))}.dark\:ring-rose-700{--tw-ring-opacity:1;--tw-ring-color:rgb(190 18 60/var(--tw-ring-opacity,1))}.dark\:hover\:bg-gray-600:hover{--tw-bg-opacity:1;background-color:rgb(75 85 99/var(--tw-bg-opacity,1))}}
```

--------------------------------------------------------------------------------
/examples/ollama_integration_demo.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
"""Ollama integration demonstration using Ultimate MCP Server."""
import asyncio
import sys
import time
from pathlib import Path

# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))

# Import configuration system instead of using update_ollama_env
from ultimate_mcp_server.config import get_config

# Load the config to ensure environment variables from .env are read
config = get_config()

# Third-party imports
# These imports need to be below sys.path modification, which is why they have noqa comments
from rich import box  # noqa: E402
from rich.markup import escape  # noqa: E402
from rich.panel import Panel  # noqa: E402
from rich.rule import Rule  # noqa: E402
from rich.table import Table  # noqa: E402

# Project imports
from ultimate_mcp_server.constants import Provider  # noqa: E402
from ultimate_mcp_server.core.server import Gateway  # noqa: E402
from ultimate_mcp_server.utils import get_logger  # noqa: E402
from ultimate_mcp_server.utils.display import CostTracker  # Import CostTracker  # noqa: E402
from ultimate_mcp_server.utils.logging.console import console  # noqa: E402

# Initialize logger
logger = get_logger("example.ollama_integration_demo")


async def compare_ollama_models(tracker: CostTracker):
    """Compare different Ollama models."""
    console.print(Rule("[bold blue]🦙 Ollama Model Comparison[/bold blue]"))
    logger.info("Starting Ollama models comparison", emoji_key="start")
    
    # Create Gateway instance - this handles provider initialization
    gateway = Gateway("ollama-demo", register_tools=False)
    
    try:
        # Initialize providers
        logger.info("Initializing providers...", emoji_key="provider")
        await gateway._initialize_providers()
        
        provider_name = Provider.OLLAMA.value
        try:
            # Get the provider from the gateway
            provider = gateway.providers.get(provider_name)
            if not provider:
                logger.error(f"Provider {provider_name} not available or initialized", emoji_key="error")
                return
            
            logger.info(f"Using provider: {provider_name}", emoji_key="provider")
            
            models = await provider.list_models()
            model_names = [m["id"] for m in models] # Extract names from model dictionaries
            console.print(f"Found {len(model_names)} Ollama models: [cyan]{escape(str(model_names))}[/cyan]")
            
            # Select specific models to compare (adjust these based on what you have installed locally)
            ollama_models = [
                "mix_77/gemma3-qat-tools:27b", 
                "JollyLlama/GLM-Z1-32B-0414-Q4_K_M:latest",
                "llama3.2-vision:latest"
            ]
            # Filter based on available models
            models_to_compare = [m for m in ollama_models if m in model_names]
            if not models_to_compare:
                logger.error("None of the selected models for comparison are available. Please use 'ollama pull MODEL' to download models first.", emoji_key="error")
                console.print("[red]Selected models not found. Use 'ollama pull mix_77/gemma3-qat-tools:27b' to download models first.[/red]")
                return
            console.print(f"Comparing models: [yellow]{escape(str(models_to_compare))}[/yellow]")
            
            prompt = """
            Explain the concept of quantum entanglement in a way that a high school student would understand.
            Keep your response brief and accessible.
            """
            console.print(f"[cyan]Using Prompt:[/cyan] {escape(prompt.strip())[:100]}...")
            
            results_data = []
            
            for model_name in models_to_compare:
                try:
                    logger.info(f"Testing model: {model_name}", emoji_key="model")
                    start_time = time.time()
                    result = await provider.generate_completion(
                        prompt=prompt,
                        model=model_name,
                        temperature=0.3,
                        max_tokens=300
                    )
                    processing_time = time.time() - start_time
                    
                    # Track the cost
                    tracker.add_call(result)
                    
                    results_data.append({
                        "model": model_name,
                        "text": result.text,
                        "tokens": {
                            "input": result.input_tokens,
                            "output": result.output_tokens,
                            "total": result.total_tokens
                        },
                        "cost": result.cost,
                        "time": processing_time
                    })
                    
                    logger.success(
                        f"Completion for {model_name} successful",
                        emoji_key="success",
                        # Tokens/cost/time logged implicitly by storing in results_data
                    )
                    
                except Exception as e:
                    logger.error(f"Error testing model {model_name}: {str(e)}", emoji_key="error", exc_info=True)
                    # Optionally add an error entry to results_data if needed
            
            # Display comparison results using Rich
            if results_data:
                console.print(Rule("[bold green]Comparison Results[/bold green]"))
                
                for result_item in results_data:
                    model = result_item["model"]
                    time_s = result_item["time"]
                    tokens = result_item.get("tokens", {}).get("total", 0)
                    
                    # If tokens is 0 but we have text, estimate based on text length
                    text = result_item.get("text", "").strip()
                    if tokens == 0 and text:
                        # Rough estimate: ~1.3 tokens per word plus some for punctuation
                        tokens = len(text.split()) + len(text) // 10
                        # Update the result item for cost tracking
                        result_item["tokens"]["output"] = tokens
                        result_item["tokens"]["total"] = tokens + result_item["tokens"]["input"]
                        
                    tokens_per_second = tokens / time_s if time_s > 0 and tokens > 0 else 0
                    cost = result_item.get("cost", 0.0)
                    
                    stats_line = (
                        f"Time: [yellow]{time_s:.2f}s[/yellow] | "
                        f"Tokens: [cyan]{tokens}[/cyan] | "
                        f"Speed: [blue]{tokens_per_second:.1f} tok/s[/blue] | "
                        f"Cost: [green]${cost:.6f}[/green]"
                    )
                    
                    console.print(Panel(
                        escape(text),
                        title=f"[bold magenta]{escape(model)}[/bold magenta]",
                        subtitle=stats_line,
                        border_style="blue",
                        expand=False
                    ))
                console.print()
            
        except Exception as e:
            logger.error(f"Error in model comparison: {str(e)}", emoji_key="error", exc_info=True)
    finally:
        # Ensure resources are cleaned up
        if hasattr(gateway, 'shutdown'):
            await gateway.shutdown()


async def demonstrate_system_prompt(tracker: CostTracker):
    """Demonstrate Ollama with system prompts."""
    console.print(Rule("[bold blue]🦙 Ollama System Prompt Demonstration[/bold blue]"))
    logger.info("Demonstrating Ollama with system prompts", emoji_key="start")
    
    # Create Gateway instance - this handles provider initialization
    gateway = Gateway("ollama-demo", register_tools=False)
    
    try:
        # Initialize providers
        logger.info("Initializing providers...", emoji_key="provider")
        await gateway._initialize_providers()
        
        provider_name = Provider.OLLAMA.value
        try:
            # Get the provider from the gateway
            provider = gateway.providers.get(provider_name)
            if not provider:
                logger.error(f"Provider {provider_name} not available or initialized", emoji_key="error")
                return
            
            # Use mix_77/gemma3-qat-tools:27b (ensure it's available)
            model = "mix_77/gemma3-qat-tools:27b"
            available_models = await provider.list_models()
            model_names = [m["id"] for m in available_models]
            
            if model not in model_names:
                logger.warning(f"Model {model} not available, please run 'ollama pull mix_77/gemma3-qat-tools:27b'", emoji_key="warning")
                console.print("[yellow]Model not found. Please run 'ollama pull mix_77/gemma3-qat-tools:27b' to download it.[/yellow]")
                return
            
            logger.info(f"Using model: {model}", emoji_key="model")
            
            system_prompt = """
You are a helpful assistant with expertise in physics.
Keep all explanations accurate but very concise.
Always provide real-world examples to illustrate concepts.
"""
            user_prompt = "Explain the concept of gravity."
            
            logger.info("Generating completion with system prompt", emoji_key="processing")
            
            result = await provider.generate_completion(
                prompt=user_prompt,
                model=model,
                temperature=0.7,
                system=system_prompt,
                max_tokens=1000  # Increased max_tokens
            )
            
            # Track the cost
            tracker.add_call(result)
            
            logger.success("Completion with system prompt successful", emoji_key="success")
            
            # Display result using Rich Panels
            console.print(Panel(
                escape(system_prompt.strip()),
                title="[bold cyan]System Prompt[/bold cyan]",
                border_style="dim cyan",
                expand=False
            ))
            console.print(Panel(
                escape(user_prompt.strip()),
                title="[bold yellow]User Prompt[/bold yellow]",
                border_style="dim yellow",
                expand=False
            ))
            console.print(Panel(
                escape(result.text.strip()),
                title="[bold green]Ollama Response[/bold green]",
                border_style="green",
                expand=False
            ))
            
            # Display stats in a small table
            stats_table = Table(title="Execution Stats", show_header=False, box=box.MINIMAL, expand=False)
            stats_table.add_column("Metric", style="cyan")
            stats_table.add_column("Value", style="white")
            stats_table.add_row("Input Tokens", str(result.input_tokens))
            stats_table.add_row("Output Tokens", str(result.output_tokens))
            stats_table.add_row("Cost", f"${result.cost:.6f}")
            stats_table.add_row("Processing Time", f"{result.processing_time:.3f}s")
            console.print(stats_table)
            console.print()
            
        except Exception as e:
            logger.error(f"Error in system prompt demonstration: {str(e)}", emoji_key="error", exc_info=True)
            # Optionally re-raise or handle differently
    finally:
        # Ensure resources are cleaned up
        if hasattr(gateway, 'shutdown'):
            await gateway.shutdown()


async def demonstrate_streaming(tracker: CostTracker):
    """Demonstrate Ollama streaming capabilities."""
    console.print(Rule("[bold blue]🦙 Ollama Streaming Demonstration[/bold blue]"))
    logger.info("Demonstrating Ollama streaming capabilities", emoji_key="start")
    
    # Create Gateway instance - this handles provider initialization
    gateway = Gateway("ollama-demo", register_tools=False)
    
    try:
        # Initialize providers
        logger.info("Initializing providers...", emoji_key="provider")
        await gateway._initialize_providers()
        
        provider_name = Provider.OLLAMA.value
        try:
            # Get the provider from the gateway
            provider = gateway.providers.get(provider_name)
            if not provider:
                logger.error(f"Provider {provider_name} not available or initialized", emoji_key="error")
                return
            
            # Use any available Ollama model
            model = provider.get_default_model()
            logger.info(f"Using model: {model}", emoji_key="model")
            
            prompt = "Write a short poem about programming with AI"
            console.print(Panel(
                escape(prompt.strip()),
                title="[bold yellow]Prompt[/bold yellow]",
                border_style="dim yellow",
                expand=False
            ))
            
            logger.info("Generating streaming completion", emoji_key="processing")
            
            console.print("[bold green]Streaming response:[/bold green]")
            
            # Stream completion and display tokens as they arrive
            output_text = ""
            token_count = 0
            start_time = time.time()
            
            stream = provider.generate_completion_stream(
                prompt=prompt,
                model=model,
                temperature=0.7,
                max_tokens=200
            )
            
            final_metadata = None
            async for chunk, metadata in stream:
                # Display the streaming chunk
                console.print(chunk, end="", highlight=False)
                output_text += chunk
                token_count += 1
                final_metadata = metadata
            
            # Newline after streaming is complete
            console.print()
            console.print()
            
            processing_time = time.time() - start_time
            
            if final_metadata:
                # Track the cost at the end
                if tracker:
                    # Create a simple object with the necessary attributes for the tracker
                    class StreamingCall:
                        def __init__(self, metadata):
                            self.model = metadata.get("model", "")
                            self.provider = metadata.get("provider", "")
                            self.input_tokens = metadata.get("input_tokens", 0)
                            self.output_tokens = metadata.get("output_tokens", 0)
                            self.total_tokens = metadata.get("total_tokens", 0)
                            self.cost = metadata.get("cost", 0.0)
                            self.processing_time = metadata.get("processing_time", 0.0)
                    
                    tracker.add_call(StreamingCall(final_metadata))
                
                # Display stats
                stats_table = Table(title="Streaming Stats", show_header=False, box=box.MINIMAL, expand=False)
                stats_table.add_column("Metric", style="cyan")
                stats_table.add_column("Value", style="white")
                stats_table.add_row("Input Tokens", str(final_metadata.get("input_tokens", 0)))
                stats_table.add_row("Output Tokens", str(final_metadata.get("output_tokens", 0)))
                stats_table.add_row("Cost", f"${final_metadata.get('cost', 0.0):.6f}")
                stats_table.add_row("Processing Time", f"{processing_time:.3f}s")
                stats_table.add_row("Tokens per Second", f"{final_metadata.get('output_tokens', 0) / processing_time if processing_time > 0 else 0:.1f}")
                console.print(stats_table)
            
            logger.success("Streaming demonstration completed", emoji_key="success")
            
        except Exception as e:
            logger.error(f"Error in streaming demonstration: {str(e)}", emoji_key="error", exc_info=True)
    finally:
        # Ensure resources are cleaned up
        if hasattr(gateway, 'shutdown'):
            await gateway.shutdown()


async def explore_ollama_models():
    """Display available Ollama models."""
    console.print(Rule("[bold cyan]🦙 Available Ollama Models[/bold cyan]"))
    
    # Create Gateway instance - this handles provider initialization
    gateway = Gateway("ollama-demo", register_tools=False)
    
    try:
        # Initialize providers
        logger.info("Initializing providers...", emoji_key="provider")
        await gateway._initialize_providers()
        
        # Get provider from the gateway
        provider = gateway.providers.get(Provider.OLLAMA.value)
        if not provider:
            logger.error(f"Provider {Provider.OLLAMA.value} not available or initialized", emoji_key="error")
            console.print("[red]Ollama provider not available. Make sure Ollama is installed and running on your machine.[/red]")
            console.print("[yellow]Visit https://ollama.com/download for installation instructions.[/yellow]")
            return
        
        # Get list of available models
        try:
            models = await provider.list_models()
            
            if not models:
                console.print("[yellow]No Ollama models found. Use 'ollama pull MODEL' to download models.[/yellow]")
                console.print("Example: [green]ollama pull mix_77/gemma3-qat-tools:27b[/green]")
                return
            
            # Create a table to display model information
            table = Table(title="Local Ollama Models")
            table.add_column("Model ID", style="cyan")
            table.add_column("Description", style="green")
            table.add_column("Size", style="yellow")
            
            for model in models:
                # Extract size from description if available
                size_str = "Unknown"
                description = model.get("description", "")
                
                # Check if size information is in the description (format: "... (X.XX GB)")
                import re
                size_match = re.search(r'\((\d+\.\d+) GB\)', description)
                if size_match:
                    size_gb = float(size_match.group(1))
                    size_str = f"{size_gb:.2f} GB"
                
                table.add_row(
                    model["id"], 
                    description,
                    size_str
                )
            
            console.print(table)
            console.print("\n[dim]Note: To add more models, use 'ollama pull MODEL_NAME'[/dim]")
        except Exception as e:
            logger.error(f"Error listing Ollama models: {str(e)}", emoji_key="error")
            console.print(f"[red]Failed to list Ollama models: {str(e)}[/red]")
            console.print("[yellow]Make sure Ollama is installed and running on your machine.[/yellow]")
            console.print("[yellow]Visit https://ollama.com/download for installation instructions.[/yellow]")
    finally:
        # Ensure resources are cleaned up
        if hasattr(gateway, 'shutdown'):
            await gateway.shutdown()


async def main():
    """Run Ollama integration examples."""
    console.print(Panel(
        "[bold]This demonstration shows how to use Ollama with the Ultimate MCP Server.[/bold]\n"
        "Ollama allows you to run LLMs locally on your own machine without sending data to external services.\n\n"
        "[yellow]Make sure you have Ollama installed and running:[/yellow]\n"
        "- Download from [link]https://ollama.com/download[/link]\n"
        "- Pull models with [green]ollama pull mix_77/gemma3-qat-tools:27b[/green] or similar commands",
        title="[bold blue]🦙 Ollama Integration Demo[/bold blue]",
        border_style="blue",
        expand=False
    ))
    
    tracker = CostTracker() # Instantiate tracker here
    try:
        # First show available models
        await explore_ollama_models()
        
        console.print() # Add space between sections
        
        # Run model comparison 
        await compare_ollama_models(tracker) # Pass tracker
        
        console.print() # Add space between sections
        
        # Run system prompt demonstration
        await demonstrate_system_prompt(tracker) # Pass tracker
        
        console.print() # Add space between sections
        
        # Run streaming demonstration
        await demonstrate_streaming(tracker) # Pass tracker

        # Display final summary
        tracker.display_summary(console) # Display summary at the end
        
    except Exception as e:
        logger.critical(f"Example failed: {str(e)}", emoji_key="critical", exc_info=True)
        return 1
    finally:
        # Clean up any remaining aiohttp resources by forcing garbage collection
        import gc
        gc.collect()
    
    logger.success("Ollama Integration Demo Finished Successfully!", emoji_key="complete")
    return 0


if __name__ == "__main__":
    exit_code = asyncio.run(main())
    sys.exit(exit_code) 
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/pyodide_boot_template.html:
--------------------------------------------------------------------------------

```html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="utf-8" />
    <meta name="viewport" content="width=device-width,initial-scale=1" />
    <meta http-equiv="Referrer-Policy" content="strict-origin-when-cross-origin" />
    <title>Pyodide Sandbox v__PYODIDE_VERSION__ (Full)</title>
    <script>
        /* SessionStorage stub */
        try { if(typeof window!=='undefined' && typeof window.sessionStorage!=='undefined'){void window.sessionStorage;}else{throw new Error();} }
        catch(_){ if(typeof window!=='undefined'){Object.defineProperty(window, "sessionStorage", {value:{getItem:()=>null,setItem:()=>{},removeItem:()=>{},clear:()=>{},key:()=>null,length:0},configurable:true,writable:false});console.warn("sessionStorage stubbed.");} }
    </script>
    <style>
        /* Basic styles */
        body { font-family: system-ui, sans-serif; margin: 15px; background-color: #f8f9fa; color: #212529; }
        .status { padding: 10px 15px; border: 1px solid #dee2e6; margin-bottom: 15px; border-radius: 0.25rem; font-size: 0.9em; }
        .status-loading { background-color: #e9ecef; border-color: #ced4da; color: #495057; }
        .status-ready { background-color: #d1e7dd; border-color: #a3cfbb; color: #0a3622;}
        .status-error { background-color: #f8d7da; border-color: #f5c2c7; color: #842029; font-weight: bold; }
    </style>
</head>
<body>

<div id="status-indicator" class="status status-loading">Initializing Sandbox...</div>

<script type="module">
    // --- Constants ---
    const CDN_BASE = "__CDN_BASE__";
    const PYODIDE_VERSION = "__PYODIDE_VERSION__";
    // *** Use the constant for packages loaded AT STARTUP ***
    const CORE_PACKAGES_JSON = '__CORE_PACKAGES_JSON__';
    const MEM_LIMIT_MB = parseInt("__MEM_LIMIT_MB__", 10);
    const logPrefix = "[PyodideFull]"; // Updated prefix

    // --- DOM Element ---
    const statusIndicator = document.getElementById('status-indicator');

    // --- Performance & Heap Helpers ---
    const perf = (typeof performance !== 'undefined') ? performance : { now: () => Date.now() };
    const now = () => perf.now();
    const heapMB = () => (performance?.memory?.usedJSHeapSize ?? 0) / 1048576;

    // --- Safe Proxy Destruction Helper ---
    function safeDestroy(proxy, name, handlerLogPrefix) {
        try {
            if (proxy && typeof proxy === 'object' && typeof proxy.destroy === 'function') {
                const proxyId = proxy.toString ? proxy.toString() : '(proxy)';
                console.log(`${handlerLogPrefix} Destroying ${name} proxy: ${proxyId}`);
                proxy.destroy();
                console.log(`${handlerLogPrefix} ${name} proxy destroyed.`);
                return true;
            }
        } catch (e) { console.warn(`${handlerLogPrefix} Error destroying ${name}:`, e?.message || e); }
        return false;
    }

    // --- Python Runner Code Template (Reads code from its own global scope) ---
    // This should be the same simple runner that worked before
    const pythonRunnerTemplate = `
import sys, io, contextlib, traceback, time, base64, json
print(f"[PyRunnerFull] Starting execution...")
_stdout = io.StringIO(); _stderr = io.StringIO()
result_value = None; error_info = None; execution_ok = False; elapsed_ms = 0.0; user_code = None
try:
    # Get code from the global scope *this script is run in*
    print("[PyRunnerFull] Getting code from _USER_CODE_TO_EXEC...")
    user_code = globals().get("_USER_CODE_TO_EXEC") # Read code set by JS onto the *passed* globals proxy
    if user_code is None: raise ValueError("_USER_CODE_TO_EXEC global not found in execution scope.")
    print(f"[PyRunnerFull] Code retrieved ({len(user_code)} chars). Ready to execute.")
    start_time = time.time()
    print(f"[PyRunnerFull] Executing user code...")
    try:
        with contextlib.redirect_stdout(_stdout), contextlib.redirect_stderr(_stderr):
            compiled_code = compile(source=user_code, filename='<user_code>', mode='exec')
            exec(compiled_code, globals()) # Execute in the provided globals
        if 'result' in globals(): result_value = globals()['result']
        execution_ok = True
        print("[PyRunnerFull] User code execution finished successfully.")
    except Exception as e:
        exc_type=type(e).__name__; exc_msg=str(e); tb_str=traceback.format_exc()
        error_info = {'type':exc_type, 'message':exc_msg, 'traceback':tb_str}
        print(f"[PyRunnerFull] User code execution failed: {exc_type}: {exc_msg}\\n{tb_str}")
    finally: elapsed_ms = (time.time() - start_time) * 1000 if 'start_time' in locals() else 0
    print(f"[PyRunnerFull] Exec phase took: {elapsed_ms:.1f}ms")
except Exception as outer_err:
     tb_str = traceback.format_exc(); error_info = {'type': type(outer_err).__name__, 'message': str(outer_err), 'traceback': tb_str}
     print(f"[PyRunnerFull] Setup/GetCode Error: {outer_err}\\n{tb_str}")
     execution_ok = False
payload_dict = {'ok':execution_ok,'stdout':_stdout.getvalue(),'stderr':_stderr.getvalue(),'elapsed':elapsed_ms,'result':result_value,'error':error_info}
print("[PyRunnerFull] Returning payload dictionary.")
payload_dict # Return value
`; // End of pythonRunnerTemplate

    // --- Main Async IIFE ---
    (async () => {
        let BOOT_MS = 0; let t0 = 0;
        let corePackagesToLoad = []; // Store parsed core packages list
        try {
            // === Step 1: Prepare for Load ===
            t0 = now(); console.log(`${logPrefix} Boot script starting at t0=${t0}`);
            statusIndicator.textContent = `Importing Pyodide v${PYODIDE_VERSION}...`;

            // Parse CORE packages list BEFORE calling loadPyodide
            try {
                corePackagesToLoad = JSON.parse(CORE_PACKAGES_JSON);
                if (!Array.isArray(corePackagesToLoad)) { corePackagesToLoad = []; }
                console.log(`${logPrefix} Core packages requested for init:`, corePackagesToLoad.length > 0 ? corePackagesToLoad : '(none)');
            } catch (parseErr) {
                console.error(`${logPrefix} Error parsing core packages JSON:`, parseErr);
                statusIndicator.textContent += ' (Error parsing core package list!)';
                corePackagesToLoad = [];
            }

            // === Step 2: Load Pyodide (with packages option) ===
            const { loadPyodide } = await import(`${CDN_BASE}/pyodide.mjs`);
            console.log(`${logPrefix} Calling loadPyodide with core packages...`);
            statusIndicator.textContent = `Loading Pyodide runtime & core packages...`;

            window.pyodide = await loadPyodide({
                indexURL: `${CDN_BASE}/`,
                packages: corePackagesToLoad // Load core packages during initialization
            });
            const pyodide = window.pyodide;
            console.log(`${logPrefix} Pyodide core and initial packages loaded. Version: ${pyodide.version}`);
            statusIndicator.textContent = 'Pyodide core & packages loaded.';

            // === Step 3: Verify Loaded Packages (Optional Debugging) ===
            if (corePackagesToLoad.length > 0) {
                const loaded = pyodide.loadedPackages ? Object.keys(pyodide.loadedPackages) : [];
                console.log(`${logPrefix} Currently loaded packages:`, loaded);
                corePackagesToLoad.forEach(pkg => {
                    if (!loaded.includes(pkg)) {
                         console.warn(`${logPrefix} Core package '${pkg}' requested but not loaded! Check CDN/package name.`);
                         statusIndicator.textContent += ` (Warn: ${pkg} failed load)`;
                    }
                });
            }

            BOOT_MS = now() - t0;
            console.log(`${logPrefix} Pyodide setup complete in ${BOOT_MS.toFixed(0)}ms. Heap: ${heapMB().toFixed(1)} MB`);
            statusIndicator.textContent = `Pyodide Ready (${BOOT_MS.toFixed(0)}ms). Awaiting commands...`;
            statusIndicator.className = 'status status-ready';


            Object.freeze(corePackagesToLoad);
            Object.freeze(statusIndicator);   // prevents accidental re-assign
            
            // ================== Main Message Handler ==================
            console.log(`${logPrefix} Setting up main message listener...`);
            window.addEventListener("message", async (ev) => {
                const msg = ev.data;
                if (typeof msg !== 'object' || msg === null || !msg.id) { return; }
                const handlerLogPrefix = `${logPrefix}[Handler id:${msg.id}]`;
                console.log(`${handlerLogPrefix} Received: type=${msg.type}`);
                const wall0 = now();

                const reply = { id: msg.id, ok: false, stdout:'', stderr:'', result:null, elapsed:0, wall_ms:0, error:null };
                let pyResultProxy = null;
                let namespaceProxy = null; // Holds the target execution scope
                let micropipProxy = null;
                let persistentReplProxyToDestroy = null;

                try { // Outer try for message handling
                    if (!window.pyodide) { throw new Error('Pyodide instance lost!'); }
                    const pyodide = window.pyodide;

                    // === Handle Reset Message ===
                    if (msg.type === "reset") {
                         console.log(`${handlerLogPrefix} Reset request received.`);
                         try {
                             if (pyodide.globals.has("_MCP_REPL_NS")) {
                                 console.log(`${handlerLogPrefix} Found _MCP_REPL_NS, attempting deletion.`);
                                 persistentReplProxyToDestroy = pyodide.globals.get("_MCP_REPL_NS");
                                 pyodide.globals.delete("_MCP_REPL_NS");
                                 reply.cleared = true; console.log(`${handlerLogPrefix} _MCP_REPL_NS deleted.`);
                             } else {
                                 reply.cleared = false; console.log(`${handlerLogPrefix} No _MCP_REPL_NS found.`);
                             }
                             reply.ok = true;
                         } catch (err) {
                              console.error(`${handlerLogPrefix} Error during reset operation:`, err);
                              reply.ok = false; reply.error = { type: err.name || 'ResetError', message: `Reset failed: ${err.message || err}`, traceback: err.stack };
                         } finally {
                              safeDestroy(persistentReplProxyToDestroy, "Persistent REPL (on reset)", handlerLogPrefix);
                              console.log(`${handlerLogPrefix} Delivering reset response via callback (ok=${reply.ok})`);
                              if(typeof window._deliverReplyToHost === 'function') { window._deliverReplyToHost(reply); }
                              else { console.error(`${handlerLogPrefix} Host callback _deliverReplyToHost not found!`); }
                         }
                         return; // Exit handler
                     } // End Reset

                    // === Ignore Non-Exec ===
                    if (msg.type !== "exec") { console.log(`${handlerLogPrefix} Ignoring non-exec type: ${msg.type}`); return; }

                    // ================== Handle Exec Message ==================
                    console.log(`${handlerLogPrefix} Processing exec request (repl=${msg.repl_mode})`);

                    /* === Step 1: Load *Additional* Packages/Wheels === */
                    // Filter out packages already loaded during init
                    const currentlyLoaded = pyodide.loadedPackages ? Object.keys(pyodide.loadedPackages) : [];
                    const additionalPackagesToLoad = msg.packages?.filter(p => !currentlyLoaded.includes(p)) || [];
                    if (additionalPackagesToLoad.length > 0) {
                        const pkgs = additionalPackagesToLoad.join(", ");
                        console.log(`${handlerLogPrefix} Loading additional packages: ${pkgs}`);
                        await pyodide.loadPackage(additionalPackagesToLoad).catch(err => {
                            throw new Error(`Additional package loading failed: ${pkgs} - ${err?.message || err}`);
                        });
                        console.log(`${handlerLogPrefix} Additional packages loaded: ${pkgs}`);
                    }

                    // Load wheels (ensure micropip is available)
                    if (msg.wheels?.length) {
                        const whls = msg.wheels.join(", ");
                        console.log(`${handlerLogPrefix} Loading wheels: ${whls}`);
                        // Check if micropip needs loading (it might be a core package now)
                        if (!pyodide.loadedPackages || !pyodide.loadedPackages['micropip']) {
                            console.log(`${handlerLogPrefix} Loading micropip for wheels...`);
                            await pyodide.loadPackage("micropip").catch(err => {
                               throw new Error(`Failed to load micropip: ${err?.message || err}`);
                            });
                        }
                        micropipProxy = pyodide.pyimport("micropip");
                        console.log(`${handlerLogPrefix} Installing wheels via micropip...`);
                        for (const whl of msg.wheels) {
                             console.log(`${handlerLogPrefix} Installing wheel: ${whl}`);
                             await micropipProxy.install(whl).catch(err => {
                                 let pyError = ""; if (err instanceof pyodide.ffi.PythonError) pyError = `${err.type}: `;
                                 throw new Error(`Wheel install failed for ${whl}: ${pyError}${err?.message || err}`);
                             });
                             console.log(`${handlerLogPrefix} Wheel installed: ${whl}`);
                         }
                         // Micropip proxy destroyed in finally
                    }

                    /* === Step 2: Prepare Namespace Proxy (REPL aware) === */
                    if (msg.repl_mode) {
                        if (pyodide.globals.has("_MCP_REPL_NS")) {
                            console.log(`${handlerLogPrefix} Reusing persistent REPL namespace.`);
                            namespaceProxy = pyodide.globals.get("_MCP_REPL_NS");
                            if (!namespaceProxy || typeof namespaceProxy.set !== 'function') {
                                console.warn(`${handlerLogPrefix} REPL namespace invalid. Resetting.`);
                                safeDestroy(namespaceProxy, "Invalid REPL", handlerLogPrefix);
                                namespaceProxy = pyodide.toPy({'__name__': '__main__'});
                                pyodide.globals.set("_MCP_REPL_NS", namespaceProxy);
                            }
                        } else {
                            console.log(`${handlerLogPrefix} Initializing new persistent REPL namespace.`);
                            namespaceProxy = pyodide.toPy({'__name__': '__main__'});
                            pyodide.globals.set("_MCP_REPL_NS", namespaceProxy);
                        }
                    } else {
                        console.log(`${handlerLogPrefix} Creating fresh temporary namespace.`);
                        namespaceProxy = pyodide.toPy({'__name__': '__main__'});
                    }
                    if (!namespaceProxy || typeof namespaceProxy.set !== 'function') { // Final check
                        throw new Error("Failed to obtain valid namespace proxy.");
                    }

                    /* === Step 3: Prepare and Set User Code INTO Namespace Proxy === */
                    let userCode = '';
                    try {
                        if (typeof msg.code_b64 !== 'string' || msg.code_b64 === '') throw new Error("Missing/empty code_b64");
                        userCode = atob(msg.code_b64); // JS base64 decode
                    } catch (decodeErr) { throw new Error(`Base64 decode failed: ${decodeErr.message}`); }
                    console.log(`${handlerLogPrefix} Setting _USER_CODE_TO_EXEC on target namespace proxy...`);
                    namespaceProxy.set("_USER_CODE_TO_EXEC", userCode); // Set ON THE TARGET PROXY

                    /* === Step 4: Execute Runner === */
                    console.log(`${handlerLogPrefix} Executing Python runner...`);
                    // Pass the namespaceProxy (which now contains the code) as globals
                    pyResultProxy = await pyodide.runPythonAsync(pythonRunnerTemplate, { globals: namespaceProxy });
                    // Cleanup the code variable from the namespaceProxy afterwards
                    console.log(`${handlerLogPrefix} Deleting _USER_CODE_TO_EXEC from namespace proxy...`);
                    if (namespaceProxy.has && namespaceProxy.has("_USER_CODE_TO_EXEC")) { // Check if method exists
                         namespaceProxy.delete("_USER_CODE_TO_EXEC");
                    } else { console.warn(`${handlerLogPrefix} Could not check/delete _USER_CODE_TO_EXEC from namespace.`); }
                    reply.wall_ms = now() - wall0;
                    console.log(`${handlerLogPrefix} Python runner finished. Wall: ${reply.wall_ms.toFixed(0)}ms`);

                    /* === Step 5: Process Result Proxy === */
                    if (!pyResultProxy || typeof pyResultProxy.toJs !== 'function') { throw new Error(`Runner returned invalid result.`); }
                    console.log(`${handlerLogPrefix} Converting Python result payload...`);
                    let jsResultPayload = pyResultProxy.toJs({ dict_converter: Object.fromEntries });
                    Object.assign(reply, jsResultPayload); // Merge python results
                    reply.ok = jsResultPayload.ok;

                } catch (err) { // Catch ANY error during the process
                    reply.wall_ms = now() - wall0;
                    console.error(`${handlerLogPrefix} *** ERROR DURING EXECUTION PROCESS ***:`, err);
                    reply.ok = false;
                    reply.error = { type: err.name || 'JavaScriptError', message: err.message || String(err), traceback: err.stack || null };
                    if (err instanceof pyodide.ffi.PythonError) { reply.error.type = err.type || 'PythonError'; }
                    reply.stdout = reply.stdout || ''; reply.stderr = reply.stderr || ''; reply.result = reply.result || null; reply.elapsed = reply.elapsed || 0;
                } finally {
                     /* === Step 6: Cleanup Proxies === */
                     console.log(`${handlerLogPrefix} Entering finally block for cleanup.`);
                     safeDestroy(pyResultProxy, "Result", handlerLogPrefix);
                     safeDestroy(micropipProxy, "Micropip", handlerLogPrefix);
                     // Only destroy namespace if it was temporary (non-REPL)
                     if (!msg?.repl_mode) { // Use optional chaining
                         safeDestroy(namespaceProxy, "Temporary Namespace", handlerLogPrefix);
                     } else {
                          console.log(`${handlerLogPrefix} Skipping destruction of persistent REPL namespace proxy.`);
                     }
                     console.log(`${handlerLogPrefix} Cleanup finished.`);

                     /* === Step 7: Send Reply via Exposed Function === */
                     console.log(`${handlerLogPrefix} *** Delivering final response via exposed function *** (ok=${reply.ok})`);
                     console.log(`${handlerLogPrefix} Reply payload:`, JSON.stringify(reply, null, 2));
                     try {
                         if (typeof window._deliverReplyToHost === 'function') { window._deliverReplyToHost(reply); console.log(`${handlerLogPrefix} Reply delivered.`); }
                         else { console.error(`${handlerLogPrefix} !!! ERROR: Host function _deliverReplyToHost not found!`); }
                     } catch (deliveryErr) { console.error(`${handlerLogPrefix} !!! FAILED TO DELIVER REPLY !!!`, deliveryErr); }
                } // End finally block

                /* Step 8: Heap Watchdog */
                const currentHeapMB = heapMB();
                if (Number.isFinite(currentHeapMB) && currentHeapMB > MEM_LIMIT_MB) {
                     console.warn(`${handlerLogPrefix}[WATCHDOG] Heap ${currentHeapMB.toFixed(0)}MB > limit ${MEM_LIMIT_MB}MB. Closing!`);
                     statusIndicator.textContent = `Heap limit exceeded (${currentHeapMB.toFixed(0)}MB). Closing...`;
                     statusIndicator.className = 'status status-error';
                     setTimeout(() => window.close(), 200);
                 }

            }); // End message listener

            console.log(`${logPrefix} Main message listener active.`);
            window.postMessage({ ready: true, boot_ms: BOOT_MS, id: "pyodide_ready" });
            console.log(`${logPrefix} Full Sandbox Ready.`);

        } catch (err) { // Catch Initialization Errors
            const initErrorMsg = `FATAL: Pyodide init failed: ${err.message || err}`;
            console.error(`${logPrefix} ${initErrorMsg}`, err.stack || '(no stack)', err);
            statusIndicator.textContent = initErrorMsg; statusIndicator.className = 'status status-error';
            try { window.postMessage({ id: "pyodide_init_error", ok: false, error: { type: err.name || 'InitError', message: initErrorMsg, traceback: err.stack || null } });
            } catch (postErr) { console.error(`${logPrefix} Failed to post init error:`, postErr); }
        }
    })(); // End main IIFE
</script>

</body>
</html>
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/clients/rag_client.py:
--------------------------------------------------------------------------------

```python
"""High-level client for RAG (Retrieval-Augmented Generation) operations."""

from typing import Any, Dict, List, Optional

from ultimate_mcp_server.services.knowledge_base import (
    get_knowledge_base_manager,
    get_knowledge_base_retriever,
    get_rag_service,
)
from ultimate_mcp_server.utils import get_logger

logger = get_logger("ultimate_mcp_server.clients.rag")

class RAGClient:
    """
    High-level client for Retrieval-Augmented Generation (RAG) operations.
    
    The RAGClient provides a simplified, unified interface for building and using
    RAG systems within the MCP ecosystem. It encapsulates all the key operations in
    the RAG workflow, from knowledge base creation and document ingestion to context
    retrieval and LLM-augmented generation.
    
    RAG is a technique that enhances LLM capabilities by retrieving relevant information
    from external knowledge bases before generation, allowing models to access information
    beyond their training data and produce more accurate, up-to-date responses.
    
    Key capabilities:
    - Knowledge base creation and management
    - Document ingestion with automatic chunking
    - Semantic retrieval of relevant context
    - LLM generation with retrieved context
    - Various retrieval methods (vector, hybrid, keyword)
    
    Architecture:
    The client follows a modular architecture with three main components:
    1. Knowledge Base Manager: Handles creation, deletion, and document ingestion
    2. Knowledge Base Retriever: Responsible for context retrieval using various methods
    3. RAG Service: Combines retrieval with LLM generation for complete RAG workflow
    
    Performance Considerations:
    - Document chunking size affects both retrieval quality and storage requirements
    - Retrieval method selection impacts accuracy vs. speed tradeoffs:
      * Vector search: Fast with good semantic understanding but may miss keyword matches
      * Keyword search: Good for exact matches but misses semantic similarities
      * Hybrid search: Most comprehensive but computationally more expensive
    - Top-k parameter balances between providing sufficient context and relevance dilution
    - Different LLM models may require different prompt templates for optimal performance
    
    This client abstracts away the complexity of the underlying vector stores,
    embeddings, and retrieval mechanisms, providing a simple API for RAG operations.
    
    Example usage:
    ```python
    # Create a RAG client
    client = RAGClient()
    
    # Create a knowledge base
    await client.create_knowledge_base(
        "company_docs", 
        "Company documentation and policies"
    )
    
    # Add documents to the knowledge base with metadata
    await client.add_documents(
        "company_docs",
        documents=[
            "Our return policy allows returns within 30 days of purchase with receipt.",
            "Product warranties cover manufacturing defects for a period of one year."
        ],
        metadatas=[
            {"source": "policies/returns.pdf", "page": 1, "department": "customer_service"},
            {"source": "policies/warranty.pdf", "page": 3, "department": "legal"}
        ],
        chunk_size=500,
        chunk_method="semantic"
    )
    
    # Retrieve context without generation (for inspection or custom handling)
    context = await client.retrieve(
        "company_docs",
        query="What is our return policy?",
        top_k=3,
        retrieval_method="hybrid"
    )
    
    # Print retrieved context and sources
    for i, (doc, meta) in enumerate(zip(context["documents"], context["metadatas"])):
        print(f"Source: {meta.get('source', 'unknown')} | Score: {context['distances'][i]:.3f}")
        print(f"Content: {doc[:100]}...\n")
    
    # Generate a response using RAG with specific provider and template
    result = await client.generate_with_rag(
        "company_docs",
        query="Explain our warranty coverage for electronics",
        provider="openai",
        model="gpt-4",
        template="customer_service_response",
        temperature=0.3,
        retrieval_method="hybrid"
    )
    
    print(result["response"])
    print("\nSources:")
    for source in result.get("sources", []):
        print(f"- {source['metadata'].get('source')}")
    ```
    """
    
    def __init__(self):
        """Initialize the RAG client."""
        self.kb_manager = get_knowledge_base_manager()
        self.kb_retriever = get_knowledge_base_retriever()
        self.rag_service = get_rag_service()
    
    async def create_knowledge_base(
        self,
        name: str,
        description: Optional[str] = None,
        overwrite: bool = False
    ) -> Dict[str, Any]:
        """Create a knowledge base.
        
        Args:
            name: The name of the knowledge base
            description: Optional description
            overwrite: Whether to overwrite an existing KB with the same name
            
        Returns:
            Result of the operation
        """
        logger.info(f"Creating knowledge base: {name}", emoji_key="processing")
        
        try:
            result = await self.kb_manager.create_knowledge_base(
                name=name,
                description=description,
                overwrite=overwrite
            )
            
            logger.success(f"Knowledge base created: {name}", emoji_key="success")
            return result
        except Exception as e:
            logger.error(f"Failed to create knowledge base: {str(e)}", emoji_key="error")
            raise
    
    async def add_documents(
        self,
        knowledge_base_name: str,
        documents: List[str],
        metadatas: Optional[List[Dict[str, Any]]] = None,
        chunk_size: int = 1000,
        chunk_method: str = "semantic"
    ) -> Dict[str, Any]:
        """Add documents to the knowledge base.
        
        Args:
            knowledge_base_name: Name of the knowledge base to add to
            documents: List of document texts
            metadatas: Optional list of metadata dictionaries
            chunk_size: Size of chunks to split documents into
            chunk_method: Method to use for chunking ('simple', 'semantic', etc.)
            
        Returns:
            Result of the operation
        """
        logger.info(f"Adding documents to knowledge base: {knowledge_base_name}", emoji_key="processing")
        
        try:
            result = await self.kb_manager.add_documents(
                knowledge_base_name=knowledge_base_name,
                documents=documents,
                metadatas=metadatas,
                chunk_size=chunk_size,
                chunk_method=chunk_method
            )
            
            added_count = result.get("added_count", 0)
            logger.success(f"Added {added_count} documents to knowledge base", emoji_key="success")
            return result
        except Exception as e:
            logger.error(f"Failed to add documents: {str(e)}", emoji_key="error")
            raise
    
    async def list_knowledge_bases(self) -> List[Any]:
        """List all knowledge bases.
        
        Returns:
            List of knowledge base information
        """
        logger.info("Retrieving list of knowledge bases", emoji_key="processing")
        
        try:
            knowledge_bases = await self.kb_manager.list_knowledge_bases()
            return knowledge_bases
        except Exception as e:
            logger.error(f"Failed to list knowledge bases: {str(e)}", emoji_key="error")
            raise
    
    async def retrieve(
        self,
        knowledge_base_name: str,
        query: str,
        top_k: int = 3,
        retrieval_method: str = "vector"
    ) -> Dict[str, Any]:
        """
        Retrieve relevant documents from a knowledge base for a given query.
        
        This method performs the retrieval stage of RAG, finding the most relevant
        documents in the knowledge base based on the query. The method is useful
        for standalone retrieval operations or when you want to examine retrieved
        context before generating a response.
        
        The retrieval process works by:
        1. Converting the query into a vector representation (embedding)
        2. Finding documents with similar embeddings and/or matching keywords
        3. Ranking and returning the most relevant documents
        
        Available retrieval methods:
        - "vector": Embedding-based similarity search using cosine distance
          Best for conceptual/semantic queries where exact wording may differ
        - "keyword": Traditional text search using BM25 or similar algorithms
          Best for queries with specific terms that must be matched
        - "hybrid": Combines vector and keyword approaches with a weighted blend
          Good general-purpose approach that balances semantic and keyword matching
        - "rerank": Two-stage retrieval that first gets candidates, then reranks them
          More computationally intensive but often more accurate
        
        Optimization strategies:
        - For factual queries with specific terminology, use "keyword" or "hybrid"
        - For conceptual or paraphrased queries, use "vector"
        - For highest accuracy at cost of performance, use "rerank"
        - Adjust top_k based on document length; shorter documents may need higher top_k
        - Pre-filter by metadata before retrieval when targeting specific sections
        
        Understanding retrieval metrics:
        - "distances" represent similarity scores where lower values indicate higher similarity
          for vector search, and higher values indicate better matches for keyword search
        - Scores are normalized differently between retrieval methods, so direct
          comparison between methods is not meaningful
        - Score thresholds for "good matches" vary based on embedding model and content domain
        
        Args:
            knowledge_base_name: Name of the knowledge base to search
            query: The search query (question or keywords)
            top_k: Maximum number of documents to retrieve (default: 3)
            retrieval_method: Method to use for retrieval ("vector", "keyword", "hybrid", "rerank")
            
        Returns:
            Dictionary containing:
            - "documents": List of retrieved documents (text chunks)
            - "metadatas": List of metadata for each document (source info, etc.)
            - "distances": List of similarity scores or relevance metrics
              (interpretation depends on retrieval_method)
            - "query": The original query
            - "retrieval_method": The method used for retrieval
            - "processing_time_ms": Time taken for retrieval in milliseconds
            
        Raises:
            ValueError: If knowledge_base_name doesn't exist or query is invalid
            Exception: If the retrieval process fails
            
        Example:
            ```python
            # Retrieve context about product returns
            results = await rag_client.retrieve(
                knowledge_base_name="support_docs",
                query="How do I return a product?",
                top_k=5,
                retrieval_method="hybrid"
            )
            
            # Check if we got high-quality matches
            if results["documents"] and min(results["distances"]) < 0.3:  # Good match threshold
                print("Found relevant information!")
            else:
                print("No strong matches found, consider reformulating the query")
            
            # Display retrieved documents and their sources with scores
            for i, (doc, meta) in enumerate(zip(results["documents"], results["metadatas"])):
                score = results["distances"][i]
                score_indicator = "🟢" if score < 0.3 else "🟡" if score < 0.6 else "🔴"
                print(f"{score_indicator} Result {i+1} (score: {score:.3f}):")
                print(f"Source: {meta.get('source', 'unknown')}")
                print(doc[:100] + "...\n")
            ```
        """
        logger.info(f"Retrieving context for query: '{query}'", emoji_key="processing")
        
        try:
            results = await self.kb_retriever.retrieve(
                knowledge_base_name=knowledge_base_name,
                query=query,
                top_k=top_k,
                retrieval_method=retrieval_method
            )
            
            return results
        except Exception as e:
            logger.error(f"Failed to retrieve context: {str(e)}", emoji_key="error")
            raise
    
    async def generate_with_rag(
        self,
        knowledge_base_name: str,
        query: str,
        provider: str = "gemini",
        model: Optional[str] = None,
        template: str = "rag_default",
        temperature: float = 0.3,
        top_k: int = 3,
        retrieval_method: str = "hybrid",
        include_sources: bool = True
    ) -> Dict[str, Any]:
        """
        Generate a response using Retrieval-Augmented Generation (RAG).
        
        This method performs the complete RAG process:
        1. Retrieves relevant documents from the specified knowledge base based on the query
        2. Constructs a prompt that includes both the query and retrieved context
        3. Sends the augmented prompt to the LLM to generate a response
        4. Optionally includes source information for transparency and citation
        
        The retrieval process can use different methods:
        - "vector": Pure semantic/embedding-based similarity search (good for conceptual queries)
        - "keyword": Traditional keyword-based search (good for specific terms or phrases)
        - "hybrid": Combines vector and keyword approaches (good general-purpose approach)
        - "rerank": Uses a two-stage approach with retrieval and reranking
        
        Args:
            knowledge_base_name: Name of the knowledge base to query for relevant context
            query: The user's question or request
            provider: LLM provider to use for generation (e.g., "openai", "anthropic", "gemini")
            model: Specific model to use (if None, uses provider's default)
            template: Prompt template name that defines how to format the RAG prompt
                      Different templates can be optimized for different use cases
            temperature: Sampling temperature for controlling randomness (0.0-1.0)
                         Lower values recommended for factual RAG responses
            top_k: Number of relevant documents to retrieve and include in the context
                   Higher values provide more context but may dilute relevance
            retrieval_method: The method to use for retrieving documents ("vector", "keyword", "hybrid")
            include_sources: Whether to include source information in the output for citations
            
        Returns:
            Dictionary containing:
            - "response": The generated text response from the LLM
            - "sources": List of source documents and their metadata (if include_sources=True)
            - "context": The retrieved context that was used for generation
            - "prompt": The full prompt that was sent to the LLM (useful for debugging)
            - "tokens": Token usage information (input, output, total)
            - "processing_time_ms": Total processing time in milliseconds
            
        Raises:
            ValueError: If knowledge_base_name or query is invalid
            Exception: If retrieval or generation fails
            
        Example:
            ```python
            # Generate a response using RAG with custom parameters
            result = await rag_client.generate_with_rag(
                knowledge_base_name="financial_docs",
                query="What were our Q3 earnings?",
                provider="openai",
                model="gpt-4",
                temperature=0.1,
                top_k=5,
                retrieval_method="hybrid"
            )
            
            # Access the response and sources
            print(result["response"])
            for src in result["sources"]:
                print(f"- {src['metadata']['source']} (relevance: {src['score']})")
            ```
        """
        logger.info(f"Generating RAG response for: '{query}'", emoji_key="processing")
        
        try:
            result = await self.rag_service.generate_with_rag(
                knowledge_base_name=knowledge_base_name,
                query=query,
                provider=provider,
                model=model,
                template=template,
                temperature=temperature,
                top_k=top_k,
                retrieval_method=retrieval_method,
                include_sources=include_sources
            )
            
            return result
        except Exception as e:
            logger.error(f"Failed to call RAG service: {str(e)}", emoji_key="error")
            raise
    
    async def delete_knowledge_base(self, name: str) -> Dict[str, Any]:
        """Delete a knowledge base.
        
        Args:
            name: Name of the knowledge base to delete
            
        Returns:
            Result of the operation
        """
        logger.info(f"Deleting knowledge base: {name}", emoji_key="processing")
        
        try:
            result = await self.kb_manager.delete_knowledge_base(name=name)
            logger.success(f"Knowledge base {name} deleted successfully", emoji_key="success")
            return result
        except Exception as e:
            logger.error(f"Failed to delete knowledge base: {str(e)}", emoji_key="error")
            raise
    
    async def reset_knowledge_base(self, knowledge_base_name: str) -> None:
        """
        Reset (delete and recreate) a knowledge base.
        
        This method completely removes an existing knowledge base and creates a new
        empty one with the same name. This is useful when you need to:
        - Remove all documents from a knowledge base efficiently
        - Fix a corrupted knowledge base
        - Change the underlying embedding model without renaming the knowledge base
        - Update document chunking strategy for an entire collection
        - Clear outdated information before a complete refresh
        
        Performance Considerations:
        - Resetting is significantly faster than removing documents individually
        - For large knowledge bases, resetting and bulk re-adding documents can be
          orders of magnitude more efficient than incremental updates
        - New documents added after reset will use any updated embedding models or
          chunking strategies configured in the system
        
        Data Integrity:
        - This operation preserves knowledge base configuration but removes all content
        - The knowledge base name and any associated permissions remain intact
        - Custom configuration settings on the knowledge base will be preserved
          if the knowledge base service supports configuration persistence
        
        WARNING: This operation is irreversible. All documents and their embeddings
        will be permanently deleted. Consider backing up important data before resetting.
        
        Disaster Recovery:
        Before resetting a production knowledge base, consider these strategies:
        1. Create a backup by exporting documents and metadata if the feature is available
        2. Maintain source documents in original form outside the knowledge base
        3. Document the ingestion pipeline to reproduce the knowledge base if needed
        4. Consider creating a temporary duplicate before resetting critical knowledge bases
        
        The reset process:
        1. Deletes the entire knowledge base collection/index
        2. Creates a new empty knowledge base with the same name
        3. Re-initializes any associated metadata or settings
        
        Args:
            knowledge_base_name: Name of the knowledge base to reset
            
        Returns:
            None
            
        Raises:
            ValueError: If the knowledge base doesn't exist
            Exception: If deletion or recreation fails
            
        Example:
            ```python
            # Backup critical metadata before reset (if needed)
            kb_info = await rag_client.list_knowledge_bases()
            kb_config = next((kb for kb in kb_info if kb['name'] == 'product_documentation'), None)
            
            # Reset a knowledge base that may have outdated or corrupted data
            await rag_client.reset_knowledge_base("product_documentation")
            
            # After resetting, re-add documents with potentially improved chunking strategy
            await rag_client.add_documents(
                knowledge_base_name="product_documentation",
                documents=updated_docs,
                metadatas=updated_metadatas,
                chunk_size=800,  # Updated chunk size better suited for the content
                chunk_method="semantic"  # Using semantic chunking for better results
            )
            ```
        """
        logger.info(f"Deleting knowledge base: {knowledge_base_name}", emoji_key="processing")
        
        try:
            result = await self.kb_manager.delete_knowledge_base(name=knowledge_base_name)
            logger.success(f"Knowledge base {knowledge_base_name} deleted successfully", emoji_key="success")
            return result
        except Exception as e:
            logger.error(f"Failed to delete knowledge base: {str(e)}", emoji_key="error")
            raise 
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/services/document.py:
--------------------------------------------------------------------------------

```python
"""Document processing service for chunking and analyzing text documents."""
import re
from typing import List

from ultimate_mcp_server.utils import get_logger

logger = get_logger(__name__)


class DocumentProcessor:
    """
    Service for intelligent text document processing, chunking, and preparation.
    
    The DocumentProcessor provides sophisticated document handling capabilities
    focused on breaking down long documents into meaningful, properly-sized chunks
    optimized for various downstream NLP tasks such as embedding generation,
    semantic search, and RAG (Retrieval Augmented Generation).
    
    Key Features:
    - Multiple chunking strategies optimized for different content types
    - Configurable chunk size and overlap parameters
    - Semantic-aware chunking that preserves context and meaning
    - Sentence boundary detection for natural text segmentation
    - Token-based chunking for precise size control
    - Singleton implementation for efficient resource usage
    
    Chunking Methods:
    1. Semantic Chunking: Preserves paragraph structure and semantic meaning,
       preventing splits that would break logical content boundaries. Best for
       maintaining context in well-structured documents.
    
    2. Sentence Chunking: Splits documents at sentence boundaries, ensuring
       no sentence is broken across chunks. Ideal for natural language text
       where sentence integrity is important.
    
    3. Token Chunking: Divides text based on approximate token counts without
       special consideration for semantic boundaries. Provides the most precise
       control over chunk size for token-limited systems.
    
    Each method implements configurable overlap between chunks to maintain
    context across chunk boundaries, ensuring information isn't lost when a
    concept spans multiple chunks.
    
    Usage Example:
    ```python
    processor = get_document_processor()
    
    # Chunk a document with default settings (token-based)
    chunks = await processor.chunk_document(
        document=long_text,
        chunk_size=1000,
        chunk_overlap=200
    )
    
    # Use semantic chunking for a well-structured document
    semantic_chunks = await processor.chunk_document(
        document=article_text,
        chunk_size=1500,
        chunk_overlap=150,
        method="semantic"
    )
    
    # Process chunks for embedding or RAG
    for chunk in chunks:
        # Process each chunk...
    ```
    
    Note:
        This service implements the singleton pattern, ensuring only one instance
        exists throughout the application. Always use the get_document_processor()
        function to obtain the shared instance rather than creating instances directly.
    """
    
    _instance = None
    
    def __new__(cls, *args, **kwargs):
        """Create a singleton instance."""
        if cls._instance is None:
            cls._instance = super(DocumentProcessor, cls).__new__(cls)
            cls._instance._initialized = False
        return cls._instance
    
    def __init__(self):
        """Initialize the document processor."""
        # Only initialize once for singleton
        if getattr(self, "_initialized", False):
            return
            
        logger.info("Document processor initialized", extra={"emoji_key": "success"})
        self._initialized = True
    
    async def chunk_document(
        self,
        document: str,
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        method: str = "token"
    ) -> List[str]:
        """
        Split a document into optimally sized, potentially overlapping chunks.
        
        This method intelligently divides a document into smaller segments using
        one of several chunking strategies, balancing chunk size requirements with
        preserving semantic coherence. The chunking process is critical for preparing
        documents for embedding, retrieval, and other NLP operations that have
        input size limitations or benefit from focused context.
        
        Chunking Methods:
        - "token": (Default) Splits text based on approximate token count.
          Simple and precise for size control, but may break semantic units.
        - "sentence": Preserves sentence boundaries, ensuring no sentence is broken
          across chunks. Better for maintaining local context and readability.
        - "semantic": Most sophisticated approach that attempts to preserve paragraph
          structure and semantic coherence. Best for maintaining document meaning
          but may result in more size variation between chunks.
        
        The chunk_size parameter is approximate for all methods, as they prioritize
        maintaining semantic boundaries where appropriate. The actual size of returned
        chunks may vary, especially when using sentence or semantic methods.
        
        Chunk overlap creates a sliding window effect, where the end of one chunk
        overlaps with the beginning of the next. This helps maintain context across
        chunk boundaries and improves retrieval quality by ensuring concepts that
        span multiple chunks can still be found.
        
        Selecting Parameters:
        - For embedding models with strict token limits: Use "token" with chunk_size
          set safely below the model's limit
        - For maximizing context preservation: Use "semantic" with larger overlap
        - For balancing size precision and sentence integrity: Use "sentence"
        - Larger overlap (25-50% of chunk_size) improves retrieval quality but
          increases storage and processing requirements
        
        Args:
            document: Text content to be chunked
            chunk_size: Target size of each chunk in approximate tokens (default: 1000)
            chunk_overlap: Number of tokens to overlap between chunks (default: 200)
            method: Chunking strategy to use ("token", "sentence", or "semantic")
            
        Returns:
            List of text chunks derived from the original document
            
        Note:
            Returns an empty list if the input document is empty or None.
            The token estimation is approximate and based on whitespace splitting,
            not a true tokenizer, so actual token counts may differ when processed
            by specific models.
        """
        if not document:
            return []
            
        logger.debug(
            f"Chunking document using method '{method}' (size: {chunk_size}, overlap: {chunk_overlap})",
            extra={"emoji_key": "processing"}
        )
        
        if method == "semantic":
            return await self._chunk_semantic(document, chunk_size, chunk_overlap)
        elif method == "sentence":
            return await self._chunk_by_sentence(document, chunk_size, chunk_overlap)
        else:
            # Default to token-based chunking
            return await self._chunk_by_tokens(document, chunk_size, chunk_overlap)
    
    async def _chunk_by_tokens(
        self,
        document: str,
        chunk_size: int = 1000,
        chunk_overlap: int = 200
    ) -> List[str]:
        """
        Split document into chunks by approximate token count without preserving semantic structures.
        
        This is the most straightforward chunking method, dividing text based solely
        on approximate token counts without special consideration for sentence or
        paragraph boundaries. It provides the most predictable and precise control
        over chunk sizes at the cost of potentially breaking semantic units like
        sentences or paragraphs.
        
        Algorithm implementation:
        1. Approximates tokens by splitting text on whitespace (creating "words")
        2. Divides the document into chunks of specified token length
        3. Implements sliding window overlaps between consecutive chunks
        4. Handles edge cases like empty documents and final chunks
        
        The token approximation used is simple whitespace splitting, which provides
        a reasonable estimation for most Western languages and common tokenization
        schemes. While not as accurate as model-specific tokenizers, it offers a
        good balance between performance and approximation quality for general use.
        
        Chunk overlap is implemented by including tokens from the end of one chunk
        at the beginning of the next, creating a sliding window effect that helps
        maintain context across chunk boundaries.
        
        This method is ideal for:
        - Working with strict token limits in downstream models
        - Processing text where exact chunk sizes are more important than 
          preserving semantic structures
        - High-volume processing where simplicity and performance are priorities
        - Text with unusual or inconsistent formatting where sentence/paragraph
          detection might fail
        
        Args:
            document: Text content to split by tokens
            chunk_size: Number of tokens (words) per chunk
            chunk_overlap: Number of tokens to overlap between chunks
            
        Returns:
            List of text chunks of approximately equal token counts
            
        Note:
            True token counts in NLP models may differ from this approximation,
            especially for models with subword tokenization. For applications
            requiring exact token counts, consider using the model's specific
            tokenizer for more accurate size estimates.
        """
        # Simple token estimation (split by whitespace)
        words = document.split()
        
        # No words, return empty list
        if not words:
            return []
            
        # Simple chunking
        chunks = []
        start = 0
        
        while start < len(words):
            # Calculate end position with potential overlap
            end = min(start + chunk_size, len(words))
            
            # Create chunk
            chunk = " ".join(words[start:end])
            chunks.append(chunk)
            
            # Move to next chunk with overlap
            start = end - chunk_overlap
            
            # Avoid getting stuck at the end
            if start >= len(words) - chunk_overlap:
                break
        
        logger.debug(
            f"Split document into {len(chunks)} chunks by token",
            extra={"emoji_key": "processing"}
        )
        
        return chunks
    
    async def _chunk_by_sentence(
        self,
        document: str,
        chunk_size: int = 1000,
        chunk_overlap: int = 200
    ) -> List[str]:
        """
        Split document into chunks by preserving complete sentences.
        
        This chunking method respects sentence boundaries when dividing documents,
        ensuring that no sentence is fragmented across multiple chunks. It balances
        chunk size requirements with maintaining the integrity of natural language
        structures, producing more readable and semantically coherent chunks than
        simple token-based approaches.
        
        Algorithm details:
        1. Detects sentence boundaries using regular expressions that handle:
           - Standard end punctuation (.!?)
           - Common abbreviations (Mr., Dr., etc.)
           - Edge cases like decimal numbers or acronyms
        2. Builds chunks by adding complete sentences until the target chunk size
           is approached
        3. Creates overlap between chunks by including ending sentences from the
           previous chunk at the beginning of the next chunk
        4. Maintains approximate token count targets while prioritizing sentence
           integrity
        
        The sentence detection uses a regex pattern that aims to balance accuracy
        with simplicity and efficiency. It identifies likely sentence boundaries by:
        - Looking for punctuation marks followed by whitespace
        - Excluding common patterns that are not sentence boundaries (e.g., "Mr.")
        - Handling basic cases like quotes and parentheses
        
        This method is ideal for:
        - Natural language text where sentence flow is important
        - Content where breaking mid-sentence would harm readability or context
        - General purpose document processing where semantic units matter
        - Documents that don't have clear paragraph structure
        
        Args:
            document: Text content to split by sentences
            chunk_size: Target approximate size per chunk in tokens
            chunk_overlap: Number of tokens to overlap between chunks
            
        Returns:
            List of document chunks with complete sentences
            
        Note:
            The sentence detection uses regex patterns that work well for most
            standard English text but may not handle all edge cases perfectly.
            For specialized text with unusual punctuation patterns, additional
            customization may be needed.
        """
        # Simple sentence splitting (not perfect but works for most cases)
        sentence_delimiters = r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?|\!)\s'
        sentences = re.split(sentence_delimiters, document)
        sentences = [s.strip() for s in sentences if s.strip()]
        
        # No sentences, return empty list
        if not sentences:
            return []
            
        # Chunk by sentences, trying to reach target size
        chunks = []
        current_chunk = []
        current_size = 0
        
        for sentence in sentences:
            # Estimate size in tokens (approximate)
            sentence_size = len(sentence.split())
            
            # If adding this sentence exceeds the chunk size and we have content,
            # finalize the current chunk
            if current_chunk and current_size + sentence_size > chunk_size:
                chunks.append(" ".join(current_chunk))
                
                # Start new chunk with overlap
                overlap_size = 0
                overlap_chunk = []
                
                # Add sentences from the end of previous chunk for overlap
                for s in reversed(current_chunk):
                    s_size = len(s.split())
                    if overlap_size + s_size <= chunk_overlap:
                        overlap_chunk.insert(0, s)
                        overlap_size += s_size
                    else:
                        break
                
                current_chunk = overlap_chunk
                current_size = overlap_size
            
            # Add current sentence
            current_chunk.append(sentence)
            current_size += sentence_size
        
        # Add the last chunk if not empty
        if current_chunk:
            chunks.append(" ".join(current_chunk))
        
        logger.debug(
            f"Split document into {len(chunks)} chunks by sentence",
            extra={"emoji_key": "processing"}
        )
        
        return chunks
    
    async def _chunk_semantic(
        self,
        document: str,
        chunk_size: int = 1000,
        chunk_overlap: int = 200
    ) -> List[str]:
        """
        Split document into chunks by semantic meaning, preserving paragraph structure.
        
        This advanced chunking method attempts to maintain the semantic coherence and
        natural structure of the document by respecting paragraph boundaries whenever
        possible. It implements a hierarchical approach that:
        
        1. First divides the document by paragraph breaks (blank lines)
        2. Evaluates each paragraph for length
        3. Keeps short and medium paragraphs intact to preserve their meaning
        4. Further splits overly long paragraphs using sentence boundary detection
        5. Assembles chunks with appropriate overlap for context continuity
        
        The algorithm prioritizes three key aspects of document structure:
        - Paragraph integrity: Treats paragraphs as coherent units of thought
        - Logical flow: Maintains document organization when possible
        - Size constraints: Respects chunk size limitations for downstream processing
        
        Implementation details:
        - Double newlines (\n\n) are treated as paragraph boundaries
        - If a document lacks clear paragraph structure (e.g., single paragraph),
          it falls back to sentence-based chunking
        - For paragraphs exceeding the chunk size, sentence-based chunking is applied
        - Context preservation is achieved by ensuring the last paragraph of a chunk
          becomes the first paragraph of the next chunk (when appropriate)
        
        This method is ideal for:
        - Well-structured documents like articles, papers, or reports
        - Content where paragraph organization conveys meaning
        - Documents where natural breaks exist between conceptual sections
        - Cases where preserving document structure improves retrieval quality
        
        Args:
            document: Text content to split semantically
            chunk_size: Maximum approximate size per chunk in tokens
            chunk_overlap: Number of tokens to overlap between chunks
            
        Returns:
            List of semantic chunks with paragraph structure preserved
            
        Note:
            Chunk sizes may vary more with semantic chunking than with other methods,
            as maintaining coherent paragraph groups takes precedence over exact
            size enforcement. For strict size control, use token-based chunking.
        """
        # For simplicity, this implementation is similar to sentence chunking
        # but with paragraph awareness
        
        # Split by paragraphs first
        paragraphs = [p.strip() for p in document.split("\n\n") if p.strip()]
        
        # Fallback to sentence chunking if no clear paragraphs
        if len(paragraphs) <= 1:
            return await self._chunk_by_sentence(document, chunk_size, chunk_overlap)
        
        # Process each paragraph and create semantic chunks
        chunks = []
        current_chunk = []
        current_size = 0
        
        for paragraph in paragraphs:
            # Estimate size in tokens
            paragraph_size = len(paragraph.split())
            
            # If paragraph is very large, chunk it further
            if paragraph_size > chunk_size:
                # Add current chunk if not empty
                if current_chunk:
                    chunks.append("\n\n".join(current_chunk))
                    current_chunk = []
                    current_size = 0
                
                # Chunk large paragraph by sentences
                paragraph_chunks = await self._chunk_by_sentence(
                    paragraph, chunk_size, chunk_overlap
                )
                chunks.extend(paragraph_chunks)
                continue
            
            # If adding this paragraph exceeds the chunk size and we have content,
            # finalize the current chunk
            if current_chunk and current_size + paragraph_size > chunk_size:
                chunks.append("\n\n".join(current_chunk))
                
                # Start new chunk with last paragraph for better context
                if current_chunk[-1] != paragraph and len(current_chunk) > 0:
                    current_chunk = [current_chunk[-1]]
                    current_size = len(current_chunk[-1].split())
                else:
                    current_chunk = []
                    current_size = 0
            
            # Add current paragraph
            current_chunk.append(paragraph)
            current_size += paragraph_size
        
        # Add the last chunk if not empty
        if current_chunk:
            chunks.append("\n\n".join(current_chunk))
        
        logger.debug(
            f"Split document into {len(chunks)} chunks semantically",
            extra={"emoji_key": "processing"}
        )
        
        return chunks


# Singleton instance
_document_processor = None


def get_document_processor() -> DocumentProcessor:
    """
    Get or create the singleton DocumentProcessor instance.
    
    This function implements the singleton pattern for the DocumentProcessor class,
    ensuring that only one instance is created and shared throughout the application.
    It provides a consistent, centralized access point for document processing
    capabilities while conserving system resources.
    
    Using a singleton for the DocumentProcessor offers several benefits:
    - Resource efficiency: Prevents multiple instantiations of the processor
    - Consistency: Ensures all components use the same processing configuration
    - Centralized access: Provides a clean API for obtaining the processor
    - Lazy initialization: Creates the instance only when first needed
    
    This function should be used instead of directly instantiating the
    DocumentProcessor class to maintain the singleton pattern and ensure
    proper initialization.
    
    Returns:
        The shared DocumentProcessor instance
        
    Usage Example:
    ```python
    # Get the document processor from anywhere in the codebase
    processor = get_document_processor()
    
    # Use the processor's methods
    chunks = await processor.chunk_document(document_text)
    ```
    
    Note:
        Even though the DocumentProcessor class itself implements singleton logic in
        its __new__ method, this function is the preferred access point as it handles
        the global instance management and follows the established pattern used
        throughout the MCP server codebase.
    """
    global _document_processor
    
    if _document_processor is None:
        _document_processor = DocumentProcessor()
        
    return _document_processor 
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/rag.py:
--------------------------------------------------------------------------------

```python
"""MCP tools for Retrieval-Augmented Generation (RAG).

Provides functions to create, manage, and query knowledge bases (vector stores)
and generate text responses augmented with retrieved context.
"""
import re
from typing import Any, Dict, List, Optional

# Import specific exceptions for better error handling hints
from ultimate_mcp_server.exceptions import ProviderError, ResourceError, ToolInputError
from ultimate_mcp_server.services import get_rag_engine

# Moved imports for services to the top level
from ultimate_mcp_server.services.knowledge_base import (
    get_knowledge_base_manager,
    get_knowledge_base_retriever,
)
from ultimate_mcp_server.tools.base import with_error_handling, with_tool_metrics
from ultimate_mcp_server.utils import get_logger

logger = get_logger(__name__)

# --- Service Lazy Initialization ---

_kb_manager = None
_kb_retriever = None
_rag_engine = None

def _get_kb_manager():
    """Lazily initializes and returns the Knowledge Base Manager."""
    global _kb_manager
    if _kb_manager is None:
        logger.debug("Initializing KnowledgeBaseManager...")
        _kb_manager = get_knowledge_base_manager()
        logger.info("KnowledgeBaseManager initialized.")
    return _kb_manager

def _get_kb_retriever():
    """Lazily initializes and returns the Knowledge Base Retriever."""
    global _kb_retriever
    if _kb_retriever is None:
        logger.debug("Initializing KnowledgeBaseRetriever...")
        _kb_retriever = get_knowledge_base_retriever()
        logger.info("KnowledgeBaseRetriever initialized.")
    return _kb_retriever

def _get_rag_engine():
    """Lazily initializes and returns the RAG Engine."""
    global _rag_engine
    if _rag_engine is None:
        logger.debug("Initializing RAGEngine...")
        _rag_engine = get_rag_engine()
        logger.info("RAGEngine initialized.")
    return _rag_engine

# --- Standalone Tool Functions ---

@with_tool_metrics
@with_error_handling
async def create_knowledge_base(
    name: str,
    description: Optional[str] = None,
    embedding_model: Optional[str] = None,
    overwrite: bool = False
) -> Dict[str, Any]:
    """Creates a new, empty knowledge base (vector store) to hold documents.

    This is the first step before adding documents.

    Args:
        name: A unique name for the knowledge base (e.g., "project_docs_v1").
              Must be a valid identifier (letters, numbers, underscores).
        description: (Optional) A brief description of the knowledge base's content or purpose.
        embedding_model: (Optional) The specific embedding model ID to use for this knowledge base
                         (e.g., "openai/text-embedding-3-small"). If None, uses the system default.
                         Consistency is important; use the same model when adding documents later.
        overwrite: (Optional) If True, deletes and recreates the knowledge base if one with the
                   same name already exists. Defaults to False (raises an error if exists).

    Returns:
        A dictionary confirming the creation:
        {
            "success": true,
            "name": "project_docs_v1",
            "message": "Knowledge base 'project_docs_v1' created successfully."
        }
        or an error dictionary if creation failed:
        {
            "success": false,
            "name": "project_docs_v1",
            "error": "Knowledge base 'project_docs_v1' already exists."
        }

    Raises:
        ResourceError: If the knowledge base already exists (and overwrite=False) or
                        if there's an issue during creation (e.g., invalid name).
        ToolInputError: If the provided name is invalid.
    """
    # Input validation (basic example)
    if not name or not re.match(r"^[a-zA-Z0-9_]+$", name):
        raise ToolInputError(f"Invalid knowledge base name: '{name}'. Use only letters, numbers, underscores.")

    kb_manager = _get_kb_manager() # Use lazy getter
    try:
        result = await kb_manager.create_knowledge_base(
            name=name,
            description=description,
            embedding_model=embedding_model,
            overwrite=overwrite
        )
        return result
    except Exception as e:
        logger.error(f"Failed to create knowledge base '{name}': {e}", exc_info=True)
        # Re-raise specific error if possible, otherwise wrap
        if isinstance(e, (ResourceError, ToolInputError)): 
            raise
        raise ResourceError(f"Failed to create knowledge base '{name}': {str(e)}", resource_type="knowledge_base", resource_id=name, cause=e) from e

@with_tool_metrics
@with_error_handling
async def list_knowledge_bases() -> Dict[str, Any]:
    """Lists all available knowledge bases and their metadata.

    Returns:
        A dictionary containing a list of knowledge base details:
        {
            "success": true,
            "knowledge_bases": [
                {
                    "name": "project_docs_v1",
                    "description": "Documentation for Project X",
                    "embedding_model": "openai/text-embedding-3-small",
                    "document_count": 150,
                    "created_at": "2023-10-27T10:00:00Z"
                },
                { ... } # Other knowledge bases
            ]
        }
        or an error dictionary:
        {
            "success": false,
            "error": "Failed to retrieve knowledge base list."
        }
    Raises:
        ResourceError: If there's an issue retrieving the list from the backend.
    """
    kb_manager = _get_kb_manager()
    try:
        result = await kb_manager.list_knowledge_bases()
        return result
    except Exception as e:
        logger.error(f"Failed to list knowledge bases: {e}", exc_info=True)
        raise ResourceError(f"Failed to list knowledge bases: {str(e)}", resource_type="knowledge_base", cause=e) from e

@with_tool_metrics
@with_error_handling
async def delete_knowledge_base(name: str) -> Dict[str, Any]:
    """Deletes an existing knowledge base and all its documents.

    Warning: This action is irreversible.

    Args:
        name: The exact name of the knowledge base to delete.

    Returns:
        A dictionary confirming the deletion:
        {
            "success": true,
            "name": "project_docs_v1",
            "message": "Knowledge base 'project_docs_v1' deleted successfully."
        }
        or an error dictionary:
        {
            "success": false,
            "name": "project_docs_v1",
            "error": "Knowledge base 'project_docs_v1' not found."
        }

    Raises:
        ResourceError: If the knowledge base doesn't exist or if deletion fails.
        ToolInputError: If the provided name is invalid.
    """
    if not name:
        raise ToolInputError("Knowledge base name cannot be empty.")

    kb_manager = _get_kb_manager()
    try:
        result = await kb_manager.delete_knowledge_base(name)
        return result
    except Exception as e:
        logger.error(f"Failed to delete knowledge base '{name}': {e}", exc_info=True)
        if isinstance(e, (ResourceError, ToolInputError)): 
            raise
        raise ResourceError(f"Failed to delete knowledge base '{name}': {str(e)}", resource_type="knowledge_base", resource_id=name, cause=e) from e

@with_tool_metrics
@with_error_handling
async def add_documents(
    knowledge_base_name: str,
    documents: List[str],
    metadatas: Optional[List[Dict[str, Any]]] = None,
    chunk_size: int = 1000,
    chunk_overlap: int = 200,
    chunk_method: str = "semantic",
    embedding_model: Optional[str] = None
) -> Dict[str, Any]:
    """Adds one or more documents to a specified knowledge base.

    The documents are split into chunks, embedded, and stored for later retrieval.

    Args:
        knowledge_base_name: The name of the existing knowledge base to add documents to.
        documents: A list of strings, where each string is the full text content of a document.
        metadatas: (Optional) A list of dictionaries, one for each document in the `documents` list.
                   Each dictionary should contain metadata relevant to the corresponding document
                   (e.g., {"source": "filename.pdf", "page": 1, "author": "Alice"}).
                   This metadata is stored alongside the document chunks and can be used for filtering during retrieval.
                   If provided, `len(metadatas)` MUST equal `len(documents)`.
        chunk_size: (Optional) The target size for document chunks. Interpretation depends on `chunk_method`
                    (e.g., tokens for "token" method, characters for "character", approximate size for "semantic").
                    Defaults to 1000.
        chunk_overlap: (Optional) The number of units (tokens, characters) to overlap between consecutive chunks.
                       Helps maintain context across chunk boundaries. Defaults to 200.
        chunk_method: (Optional) The method used for splitting documents into chunks.
                      Options: "semantic" (attempts to split at meaningful semantic boundaries, recommended),
                      "token" (splits by token count using tiktoken), "sentence" (splits by sentence).
                      Defaults to "semantic".
        embedding_model: (Optional) The specific embedding model ID to use. If None, uses the model
                         associated with the knowledge base (or the system default if none was specified
                         at creation). It's best practice to ensure this matches the KB's model.

    Returns:
        A dictionary summarizing the addition process:
        {
            "success": true,
            "knowledge_base_name": "project_docs_v1",
            "documents_added": 5,
            "chunks_created": 75,
            "message": "Successfully added 5 documents (75 chunks) to 'project_docs_v1'."
        }
        or an error dictionary:
        {
            "success": false,
            "knowledge_base_name": "project_docs_v1",
            "error": "Knowledge base 'project_docs_v1' not found."
        }

    Raises:
        ResourceError: If the knowledge base doesn't exist or if there's an error during processing/storage.
        ToolInputError: If inputs are invalid (e.g., documents/metadatas length mismatch, invalid chunk_method).
        ProviderError: If the LLM provider fails during generation.
    """
    if not knowledge_base_name:
        raise ToolInputError("Knowledge base name cannot be empty.")
    if not documents or not isinstance(documents, list) or not all(isinstance(d, str) for d in documents):
        raise ToolInputError("'documents' must be a non-empty list of strings.")
    if metadatas and (not isinstance(metadatas, list) or len(metadatas) != len(documents)):
        raise ToolInputError("'metadatas', if provided, must be a list with the same length as 'documents'.")
    if chunk_method not in ["semantic", "token", "sentence", "character", "paragraph"]: # Added more methods
         raise ToolInputError(f"Invalid chunk_method: '{chunk_method}'. Must be one of: semantic, token, sentence, character, paragraph.")

    kb_manager = _get_kb_manager()
    try:
        result = await kb_manager.add_documents(
            knowledge_base_name=knowledge_base_name,
            documents=documents,
            metadatas=metadatas,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            chunk_method=chunk_method,
            embedding_model=embedding_model
        )
        return result
    except Exception as e:
        logger.error(f"Failed to add documents to knowledge base '{knowledge_base_name}': {e}", exc_info=True)
        if isinstance(e, (ResourceError, ToolInputError, ProviderError)):
            raise
        raise ResourceError(f"Failed to add documents to knowledge base '{knowledge_base_name}': {str(e)}", resource_type="knowledge_base", resource_id=knowledge_base_name, cause=e) from e

@with_tool_metrics
@with_error_handling
async def retrieve_context(
    knowledge_base_name: str,
    query: str,
    top_k: int = 5,
    retrieval_method: str = "vector",
    min_score: Optional[float] = None, # Changed default to None
    metadata_filter: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Retrieves relevant document chunks (context) from a knowledge base based on a query.

    Searches the specified knowledge base for chunks semantically similar to the query.

    Args:
        knowledge_base_name: The name of the knowledge base to query.
        query: The text query to search for relevant context.
        top_k: (Optional) The maximum number of relevant chunks to retrieve. Defaults to 5.
        retrieval_method: (Optional) The method used for retrieval.
                          Options: "vector" (semantic similarity search), "hybrid" (combines vector search
                          with keyword-based search, may require specific backend support).
                          Defaults to "vector".
        min_score: (Optional) The minimum similarity score (typically between 0 and 1) for a chunk
                   to be included in the results. Higher values mean stricter relevance.
                   If None (default), the backend decides or no filtering is applied.
        metadata_filter: (Optional) A dictionary used to filter results based on metadata associated
                         with the chunks during `add_documents`. Filters use exact matches.
                         Example: {"source": "filename.pdf", "page": 5}
                         Example: {"author": "Alice"}
                         Defaults to None (no metadata filtering).

    Returns:
        A dictionary containing the retrieved context:
        {
            "success": true,
            "query": "What are the project goals?",
            "knowledge_base_name": "project_docs_v1",
            "retrieved_chunks": [
                {
                    "content": "The main goal of Project X is to improve user engagement...",
                    "score": 0.85,
                    "metadata": {"source": "project_plan.docx", "page": 1}
                },
                { ... } # Other relevant chunks
            ]
        }
        or an error dictionary:
        {
            "success": false,
            "knowledge_base_name": "project_docs_v1",
            "error": "Knowledge base 'project_docs_v1' not found."
        }

    Raises:
        ResourceError: If the knowledge base doesn't exist or retrieval fails.
        ToolInputError: If inputs are invalid (e.g., invalid retrieval_method).
    """
    if not knowledge_base_name:
        raise ToolInputError("Knowledge base name cannot be empty.")
    if not query or not isinstance(query, str):
        raise ToolInputError("Query must be a non-empty string.")
    if retrieval_method not in ["vector", "hybrid"]: # Add more methods if supported by backend
        raise ToolInputError(f"Invalid retrieval_method: '{retrieval_method}'. Must be one of: vector, hybrid.")

    kb_retriever = _get_kb_retriever()
    try:
        # Note: The actual implementation might vary based on the retriever service
        # Here we assume the service handles different methods via parameters or distinct functions
        # Keeping the previous logic structure for now.
        if retrieval_method == "hybrid":
            # Assuming a specific hybrid method exists or the main retrieve handles it
            # This might need adjustment based on the actual service implementation
            logger.debug(f"Attempting hybrid retrieval for '{knowledge_base_name}'")
            result = await kb_retriever.retrieve_hybrid( # Or potentially kb_retriever.retrieve with a method flag
                knowledge_base_name=knowledge_base_name,
                query=query,
                top_k=top_k,
                min_score=min_score,
                metadata_filter=metadata_filter
            )
        else: # Default to vector
            logger.debug(f"Attempting vector retrieval for '{knowledge_base_name}'")
            result = await kb_retriever.retrieve(
                knowledge_base_name=knowledge_base_name,
                query=query,
                top_k=top_k,
                rerank=True, # Assuming rerank is often desired for vector
                min_score=min_score,
                metadata_filter=metadata_filter
            )
        return result
    except Exception as e:
        logger.error(f"Failed to retrieve context from knowledge base '{knowledge_base_name}' for query '{query}': {e}", exc_info=True)
        if isinstance(e, (ResourceError, ToolInputError)):
            raise
        raise ResourceError(f"Failed to retrieve context from knowledge base '{knowledge_base_name}': {str(e)}", resource_type="knowledge_base", resource_id=knowledge_base_name, cause=e) from e

@with_tool_metrics
@with_error_handling
async def generate_with_rag(
    knowledge_base_name: str,
    query: str,
    provider: Optional[str] = None,
    model: Optional[str] = None,
    template: str = "rag_default",
    max_tokens: int = 1000,
    temperature: float = 0.3,
    top_k: int = 5,
    retrieval_method: str = "vector",
    min_score: Optional[float] = None, # Changed default to None
    include_sources: bool = True
) -> Dict[str, Any]:
    """Generates a response to a query using context retrieved from a knowledge base (RAG).

    This function first retrieves relevant document chunks using `retrieve_context` parameters,
    then feeds the query and the retrieved context into an LLM using a specified prompt template
    to generate a final, context-aware answer.

    Args:
        knowledge_base_name: The name of the knowledge base to retrieve context from.
        query: The user's query or question to be answered.
        provider: (Optional) The LLM provider for the generation step (e.g., "openai", "anthropic").
                  If None, the RAG engine selects a default provider.
        model: (Optional) The specific LLM model ID for generation (e.g., "openai/gpt-4.1-mini").
               If None, the RAG engine selects a default model.
        template: (Optional) The name of the prompt template to use for combining the query and context.
                  Available templates might include: "rag_default" (standard Q&A), "rag_with_sources"
                  (default, includes source attribution), "rag_summarize" (summarizes retrieved context
                  based on query), "rag_analysis" (performs analysis based on context).
                  Defaults to "rag_default" (or potentially "rag_with_sources" depending on engine default).
        max_tokens: (Optional) Maximum number of tokens for the generated LLM response. Defaults to 1000.
        temperature: (Optional) Sampling temperature for the LLM generation (0.0 to 1.0). Lower values
                     are more deterministic, higher values more creative. Defaults to 0.3.
        top_k: (Optional) Maximum number of context chunks to retrieve (passed to retrieval). Defaults to 5.
        retrieval_method: (Optional) Method for retrieving context ("vector", "hybrid"). Defaults to "vector".
        min_score: (Optional) Minimum similarity score for retrieved chunks. Defaults to None.
        include_sources: (Optional) Whether the final response object should explicitly include details
                         of the source chunks used for generation. Defaults to True.

    Returns:
        A dictionary containing the generated response and related information:
        {
            "success": true,
            "query": "What are the project goals?",
            "knowledge_base_name": "project_docs_v1",
            "generated_response": "The main goal of Project X is to improve user engagement by implementing features A, B, and C.",
            "sources": [ # Included if include_sources=True
                {
                    "content": "The main goal of Project X is to improve user engagement...",
                    "score": 0.85,
                    "metadata": {"source": "project_plan.docx", "page": 1}
                },
                { ... } # Other source chunks used
            ],
            "model": "openai/gpt-4.1-mini", # Actual model used
            "provider": "openai",
            "tokens": { "input": ..., "output": ..., "total": ... }, # Generation tokens
            "cost": 0.000120,
            "processing_time": 5.2,
            "retrieval_time": 0.8, # Time spent only on retrieval
            "generation_time": 4.4 # Time spent only on generation
        }
        or an error dictionary:
        {
            "success": false,
            "knowledge_base_name": "project_docs_v1",
            "error": "RAG generation failed: Knowledge base 'project_docs_v1' not found."
        }

    Raises:
        ResourceError: If the knowledge base doesn't exist or retrieval fails.
        ProviderError: If the LLM provider fails during generation.
        ToolInputError: If inputs are invalid.
    """
    if not knowledge_base_name:
        raise ToolInputError("Knowledge base name cannot be empty.")
    if not query or not isinstance(query, str):
        raise ToolInputError("Query must be a non-empty string.")

    rag_engine = _get_rag_engine()
    try:
        result = await rag_engine.generate_with_rag(
            knowledge_base_name=knowledge_base_name,
            query=query,
            provider=provider,
            model=model,
            template=template,
            max_tokens=max_tokens,
            temperature=temperature,
            top_k=top_k,
            retrieval_method=retrieval_method,
            min_score=min_score,
            include_sources=include_sources
        )
        return result
    except Exception as e:
        logger.error(f"RAG generation failed for query on '{knowledge_base_name}': {e}", exc_info=True)
        if isinstance(e, (ResourceError, ProviderError, ToolInputError)): 
            raise
        # Wrap generic errors
        raise ResourceError(f"RAG generation failed: {str(e)}", resource_type="knowledge_base", resource_id=knowledge_base_name, cause=e) from e 
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/services/prompts/templates.py:
--------------------------------------------------------------------------------

```python
"""Prompt template management and rendering for Ultimate MCP Server."""
import json
import re
from enum import Enum
from functools import lru_cache
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple, Union

from jinja2 import Environment, FileSystemLoader, Template, select_autoescape

from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.services.prompts.repository import get_prompt_repository
from ultimate_mcp_server.utils import get_logger

logger = get_logger(__name__)


class TemplateFormat(str, Enum):
    """Template format options."""
    JINJA = "jinja"
    SIMPLE = "simple"
    MARKDOWN = "markdown"
    JSON = "json"


class TemplateType(str, Enum):
    """Template type options."""
    COMPLETION = "completion"
    CHAT = "chat"
    SYSTEM = "system"
    USER = "user"
    FUNCTION = "function"
    EXTRACTION = "extraction"


class PromptTemplate:
    """Template for generating prompts for LLM providers."""
    
    def __init__(
        self,
        template: str,
        template_id: str,
        format: Union[str, TemplateFormat] = TemplateFormat.JINJA,
        type: Union[str, TemplateType] = TemplateType.COMPLETION,
        metadata: Optional[Dict[str, Any]] = None,
        provider_defaults: Optional[Dict[str, Any]] = None,
        description: Optional[str] = None,
        required_vars: Optional[List[str]] = None,
        example_vars: Optional[Dict[str, Any]] = None,
    ):
        """Initialize a prompt template.
        
        Args:
            template: Template string
            template_id: Unique identifier for this template
            format: Template format (jinja, simple, markdown, or json)
            type: Template type (completion, chat, system, user, function, extraction)
            metadata: Optional metadata for the template
            provider_defaults: Optional provider-specific defaults
            description: Optional description of the template
            required_vars: Optional list of required variables
            example_vars: Optional example variables for testing
        """
        self.template = template
        self.template_id = template_id
        
        # Normalize format and type to enum values
        self.format = TemplateFormat(format) if isinstance(format, str) else format
        self.type = TemplateType(type) if isinstance(type, str) else type
        
        # Store additional attributes
        self.metadata = metadata or {}
        self.provider_defaults = provider_defaults or {}
        self.description = description
        self.example_vars = example_vars or {}
        
        # Extract required variables based on format
        self.required_vars = required_vars or self._extract_required_vars()
        
        # Compile template if using Jinja format
        self._compiled_template: Optional[Template] = None
        if self.format == TemplateFormat.JINJA:
            self._compiled_template = self._compile_template()
    
    def _extract_required_vars(self) -> List[str]:
        """Extract required variables from template based on format.
        
        Returns:
            List of required variable names
        """
        if self.format == TemplateFormat.JINJA:
            # Extract variables using regex for basic Jinja pattern
            matches = re.findall(r'{{(.*?)}}', self.template)
            vars_set: Set[str] = set()
            
            for match in matches:
                # Extract variable name (removing filters and whitespace)
                var_name = match.split('|')[0].strip()
                if var_name and not var_name.startswith('_'):
                    vars_set.add(var_name)
                    
            return sorted(list(vars_set))
            
        elif self.format == TemplateFormat.SIMPLE:
            # Extract variables from {variable} format
            matches = re.findall(r'{([^{}]*)}', self.template)
            return sorted(list(set(matches)))
            
        elif self.format == TemplateFormat.JSON:
            # Try to find JSON template variables
            try:
                # Parse as JSON to find potential variables
                template_dict = json.loads(self.template)
                return self._extract_json_vars(template_dict)
            except json.JSONDecodeError:
                logger.warning(
                    f"Failed to parse JSON template: {self.template_id}",
                    emoji_key="warning"
                )
                return []
                
        # Default: no variables detected
        return []
    
    def _extract_json_vars(self, obj: Any, prefix: str = "") -> List[str]:
        """Recursively extract variables from a JSON object.
        
        Args:
            obj: JSON object to extract variables from
            prefix: Prefix for nested variables
            
        Returns:
            List of variable names
        """
        vars_list = []
        
        if isinstance(obj, dict):
            for key, value in obj.items():
                if isinstance(value, str) and value.startswith("${") and value.endswith("}"):
                    # This is a variable placeholder
                    var_name = value[2:-1]  # Remove ${ and }
                    vars_list.append(f"{prefix}{var_name}")
                elif isinstance(value, (dict, list)):
                    # Recursively extract from nested structures
                    nested_prefix = f"{prefix}{key}." if prefix else f"{key}."
                    vars_list.extend(self._extract_json_vars(value, nested_prefix))
        elif isinstance(obj, list):
            for _i, item in enumerate(obj):
                if isinstance(item, (dict, list)):
                    vars_list.extend(self._extract_json_vars(item, prefix))
                elif isinstance(item, str) and item.startswith("${") and item.endswith("}"):
                    var_name = item[2:-1]
                    vars_list.append(f"{prefix}{var_name}")
        
        return sorted(list(set(vars_list)))
    
    def _compile_template(self) -> Template:
        """Compile the Jinja template.
        
        Returns:
            Compiled Jinja template
            
        Raises:
            ValueError: If template compilation fails
        """
        try:
            env = Environment(autoescape=select_autoescape(['html', 'xml']))
            return env.from_string(self.template)
        except Exception as e:
            logger.error(
                f"Failed to compile template {self.template_id}: {str(e)}",
                emoji_key="error"
            )
            raise ValueError(f"Invalid template format: {str(e)}") from e
    
    def render(self, variables: Dict[str, Any]) -> str:
        """Render the template with the provided variables.
        
        Args:
            variables: Dictionary of variables to render with
            
        Returns:
            Rendered template string
            
        Raises:
            ValueError: If required variables are missing
        """
        # Check for required variables
        missing_vars = [var for var in self.required_vars if var not in variables]
        if missing_vars:
            raise ValueError(
                f"Missing required variables for template {self.template_id}: {', '.join(missing_vars)}"
            )
        
        # Render based on format
        if self.format == TemplateFormat.JINJA:
            if not self._compiled_template:
                self._compiled_template = self._compile_template()
            return self._compiled_template.render(**variables)
            
        elif self.format == TemplateFormat.SIMPLE:
            # Simple variable substitution with {var} syntax
            result = self.template
            for var_name, var_value in variables.items():
                result = result.replace(f"{{{var_name}}}", str(var_value))
            return result
            
        elif self.format == TemplateFormat.JSON:
            try:
                # Parse template as JSON
                template_dict = json.loads(self.template)
                
                # Replace variables in the JSON structure
                rendered_dict = self._render_json_vars(template_dict, variables)
                
                # Convert back to JSON string
                return json.dumps(rendered_dict)
                
            except json.JSONDecodeError:
                logger.error(
                    f"Failed to parse JSON template: {self.template_id}",
                    emoji_key="error"
                )
                # Fall back to simple replacement
                return self.template
            
        elif self.format == TemplateFormat.MARKDOWN:
            # Process markdown with simple variable substitution
            result = self.template
            for var_name, var_value in variables.items():
                result = result.replace(f"{{{var_name}}}", str(var_value))
            return result
            
        # Default: return template as is
        return self.template
    
    def _render_json_vars(self, obj: Any, variables: Dict[str, Any]) -> Any:
        """Recursively render variables in a JSON object.
        
        Args:
            obj: JSON object to render variables in
            variables: Dictionary of variables to render with
            
        Returns:
            Rendered JSON object
        """
        if isinstance(obj, dict):
            return {
                key: self._render_json_vars(value, variables) 
                for key, value in obj.items()
            }
        elif isinstance(obj, list):
            return [self._render_json_vars(item, variables) for item in obj]
        elif isinstance(obj, str) and obj.startswith("${") and obj.endswith("}"):
            # This is a variable placeholder
            var_name = obj[2:-1]  # Remove ${ and }
            # Get the variable value, or keep placeholder if not found
            return variables.get(var_name, obj)
        else:
            return obj
    
    def validate_variables(self, variables: Dict[str, Any]) -> Tuple[bool, List[str]]:
        """Validate that all required variables are provided.
        
        Args:
            variables: Dictionary of variables to validate
            
        Returns:
            Tuple of (is_valid, missing_variables)
        """
        missing_vars = [var for var in self.required_vars if var not in variables]
        return len(missing_vars) == 0, missing_vars
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert template to dictionary representation.
        
        Returns:
            Dictionary representation of template
        """
        return {
            "template_id": self.template_id,
            "template": self.template,
            "format": self.format.value,
            "type": self.type.value,
            "metadata": self.metadata,
            "provider_defaults": self.provider_defaults,
            "description": self.description,
            "required_vars": self.required_vars,
            "example_vars": self.example_vars,
        }
    
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "PromptTemplate":
        """Create a template from dictionary representation.
        
        Args:
            data: Dictionary representation of template
            
        Returns:
            PromptTemplate instance
        """
        return cls(
            template=data["template"],
            template_id=data["template_id"],
            format=data.get("format", TemplateFormat.JINJA),
            type=data.get("type", TemplateType.COMPLETION),
            metadata=data.get("metadata"),
            provider_defaults=data.get("provider_defaults"),
            description=data.get("description"),
            required_vars=data.get("required_vars"),
            example_vars=data.get("example_vars"),
        )
    
    def get_provider_defaults(self, provider: str) -> Dict[str, Any]:
        """Get provider-specific default parameters.
        
        Args:
            provider: Provider name
            
        Returns:
            Dictionary of default parameters
        """
        return self.provider_defaults.get(provider, {})


class PromptTemplateRenderer:
    """Service for rendering prompt templates."""
    
    def __init__(self, template_dir: Optional[Union[str, Path]] = None):
        """Initialize the prompt template renderer.
        
        Args:
            template_dir: Optional directory containing template files
        """
        # Set template directory
        if template_dir:
            self.template_dir = Path(template_dir)
        else:
            # Default to project directory / templates
            self.template_dir = Path.home() / ".ultimate" / "templates"
            
        # Create directory if it doesn't exist
        self.template_dir.mkdir(parents=True, exist_ok=True)
        
        # Set up Jinja environment for file-based templates
        self.jinja_env = Environment(
            loader=FileSystemLoader(str(self.template_dir)),
            autoescape=select_autoescape(['html', 'xml']),
            trim_blocks=True,
            lstrip_blocks=True,
        )
        
        # Get prompt repository for template storage
        self.repository = get_prompt_repository()
        
        # Template cache
        self._template_cache: Dict[str, PromptTemplate] = {}
    
    async def get_template(self, template_id: str) -> Optional[PromptTemplate]:
        """Get a template by ID.
        
        Args:
            template_id: Template identifier
            
        Returns:
            PromptTemplate instance or None if not found
        """
        # Check cache first
        if template_id in self._template_cache:
            return self._template_cache[template_id]
            
        # Look up in repository
        template_data = await self.repository.get_prompt(template_id)
        if template_data:
            template = PromptTemplate.from_dict(template_data)
            # Cache for future use
            self._template_cache[template_id] = template
            return template
        
        # Try to load from file if not in repository
        template_path = self.template_dir / f"{template_id}.j2"
        if template_path.exists():
            # Load template from file
            with open(template_path, "r", encoding="utf-8") as f:
                template_content = f.read()
                
            # Create template instance
            template = PromptTemplate(
                template=template_content,
                template_id=template_id,
                format=TemplateFormat.JINJA,
            )
            
            # Cache for future use
            self._template_cache[template_id] = template
            return template
            
        return None
    
    async def render_template(
        self,
        template_id: str,
        variables: Dict[str, Any],
        provider: Optional[str] = None
    ) -> str:
        """Render a template with variables.
        
        Args:
            template_id: Template identifier
            variables: Variables to render the template with
            provider: Optional provider name for provider-specific adjustments
            
        Returns:
            Rendered template string
            
        Raises:
            ValueError: If template not found or rendering fails
        """
        # Get the template
        template = await self.get_template(template_id)
        if not template:
            raise ValueError(f"Template not found: {template_id}")
            
        # Check if all required variables are provided
        is_valid, missing_vars = template.validate_variables(variables)
        if not is_valid:
            raise ValueError(
                f"Missing required variables for template {template_id}: {', '.join(missing_vars)}"
            )
        
        # Render the template
        rendered = template.render(variables)
        
        # Apply provider-specific adjustments if provided
        if provider:
            rendered = self._apply_provider_adjustments(rendered, provider, template)
            
        return rendered
    
    def _apply_provider_adjustments(
        self,
        rendered: str,
        provider: str,
        template: PromptTemplate
    ) -> str:
        """Apply provider-specific adjustments to rendered template.
        
        Args:
            rendered: Rendered template string
            provider: Provider name
            template: Template being rendered
            
        Returns:
            Adjusted template string
        """
        # Apply provider-specific transformations
        if provider == Provider.ANTHROPIC.value:
            # Anthropic-specific adjustments
            if template.type == TemplateType.SYSTEM:
                # Ensure no trailing newlines for system prompts
                rendered = rendered.rstrip()
        elif provider == Provider.OPENAI.value:
            # OpenAI-specific adjustments
            pass
        elif provider == Provider.GEMINI.value:
            # Gemini-specific adjustments
            pass
        
        return rendered
    
    async def save_template(self, template: PromptTemplate) -> bool:
        """Save a template to the repository.
        
        Args:
            template: Template to save
            
        Returns:
            True if successful
        """
        # Update cache
        self._template_cache[template.template_id] = template
        
        # Save to repository
        return await self.repository.save_prompt(
            prompt_id=template.template_id,
            prompt_data=template.to_dict()
        )
    
    async def delete_template(self, template_id: str) -> bool:
        """Delete a template from the repository.
        
        Args:
            template_id: Template identifier
            
        Returns:
            True if successful
        """
        # Remove from cache
        if template_id in self._template_cache:
            del self._template_cache[template_id]
            
        # Delete from repository
        return await self.repository.delete_prompt(template_id)
    
    async def list_templates(self) -> List[str]:
        """List available templates.
        
        Returns:
            List of template IDs
        """
        # Get templates from repository
        return await self.repository.list_prompts()
    
    def clear_cache(self) -> None:
        """Clear the template cache."""
        self._template_cache.clear()


# Global template renderer instance
_template_renderer: Optional[PromptTemplateRenderer] = None


def get_template_renderer() -> PromptTemplateRenderer:
    """Get the global template renderer instance.
    
    Returns:
        PromptTemplateRenderer instance
    """
    global _template_renderer
    if _template_renderer is None:
        _template_renderer = PromptTemplateRenderer()
    return _template_renderer


@lru_cache(maxsize=32)
def get_template_path(template_id: str) -> Optional[Path]:
    """Get the path to a template file.
    
    Args:
        template_id: Template identifier
        
    Returns:
        Path to template file or None if not found
    """
    # Try standard locations
    template_dirs = [
        # First check the user's template directory
        Path.home() / ".ultimate" / "templates",
        # Then check the package's template directory
        Path(__file__).parent.parent.parent.parent / "templates",
    ]
    
    for template_dir in template_dirs:
        # Check for .j2 extension first
        template_path = template_dir / f"{template_id}.j2"
        if template_path.exists():
            return template_path
            
        # Check for .tpl extension
        template_path = template_dir / f"{template_id}.tpl"
        if template_path.exists():
            return template_path
            
        # Check for .md extension
        template_path = template_dir / f"{template_id}.md"
        if template_path.exists():
            return template_path
            
        # Check for .json extension
        template_path = template_dir / f"{template_id}.json"
        if template_path.exists():
            return template_path
    
    return None


async def render_prompt_template(
    template_id: str,
    variables: Dict[str, Any],
    provider: Optional[str] = None
) -> str:
    """Render a prompt template.
    
    Args:
        template_id: Template identifier
        variables: Variables to render the template with
        provider: Optional provider name for provider-specific adjustments
        
    Returns:
        Rendered template string
        
    Raises:
        ValueError: If template not found or rendering fails
    """
    renderer = get_template_renderer()
    return await renderer.render_template(template_id, variables, provider)


async def render_prompt(
    template_content: str,
    variables: Dict[str, Any],
    format: Union[str, TemplateFormat] = TemplateFormat.JINJA,
) -> str:
    """Render a prompt from template content.
    
    Args:
        template_content: Template content string
        variables: Variables to render the template with
        format: Template format
        
    Returns:
        Rendered template string
        
    Raises:
        ValueError: If rendering fails
    """
    # Create a temporary template
    template = PromptTemplate(
        template=template_content,
        template_id="_temp_template",
        format=format,
    )
    
    # Render and return
    return template.render(variables)


async def get_template_defaults(
    template_id: str,
    provider: str
) -> Dict[str, Any]:
    """Get provider-specific default parameters for a template.
    
    Args:
        template_id: Template identifier
        provider: Provider name
        
    Returns:
        Dictionary of default parameters
        
    Raises:
        ValueError: If template not found
    """
    renderer = get_template_renderer()
    template = await renderer.get_template(template_id)
    if not template:
        raise ValueError(f"Template not found: {template_id}")
        
    return template.get_provider_defaults(provider)
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/core/providers/openrouter.py:
--------------------------------------------------------------------------------

```python
# ultimate/core/providers/openrouter.py
"""OpenRouter provider implementation."""
import os
import time
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple, Union

from openai import AsyncOpenAI

from ultimate_mcp_server.config import get_config
from ultimate_mcp_server.constants import DEFAULT_MODELS, Provider
from ultimate_mcp_server.core.providers.base import BaseProvider, ModelResponse
from ultimate_mcp_server.utils import get_logger

# Use the same naming scheme everywhere: logger at module level
logger = get_logger("ultimate_mcp_server.providers.openrouter")

# Default OpenRouter Base URL (can be overridden by config)
DEFAULT_OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1"

class OpenRouterProvider(BaseProvider):
    """Provider implementation for OpenRouter API (using OpenAI-compatible interface)."""

    provider_name = Provider.OPENROUTER.value

    def __init__(self, **kwargs):
        """Initialize the OpenRouter provider.

        Args:
            **kwargs: Additional options:
                - base_url (str): Override the default OpenRouter API base URL.
                - http_referer (str): Optional HTTP-Referer header.
                - x_title (str): Optional X-Title header.
        """
        config = get_config().providers.openrouter
        super().__init__(**kwargs)
        self.name = "openrouter"
        
        # Use config default first, then fallback to constants
        self.default_model = config.default_model or DEFAULT_MODELS.get(Provider.OPENROUTER)
        if not config.default_model:
            logger.debug(f"No default model set in config for OpenRouter, using fallback from constants: {self.default_model}")

        # Get base_url from config, fallback to kwargs, then constant
        self.base_url = config.base_url or kwargs.get("base_url", DEFAULT_OPENROUTER_BASE_URL)

        # Get additional headers from config's additional_params
        self.http_referer = config.additional_params.get("http_referer") or kwargs.get("http_referer")
        self.x_title = config.additional_params.get("x_title") or kwargs.get("x_title")
        
        # We'll create the client in initialize() instead
        self.client = None
        self.available_models = []
    
    async def initialize(self) -> bool:
        """Initialize the OpenRouter client.
        
        Returns:
            bool: True if initialization was successful
        """
        try:
            # Create headers dictionary
            headers = {}
            if self.http_referer:
                headers["HTTP-Referer"] = self.http_referer
            if self.x_title:
                headers["X-Title"] = self.x_title
            
            # Get timeout from config
            config = get_config().providers.openrouter
            timeout = config.timeout or 30.0  # Default timeout 30s
            
            # Check if API key is available
            if not self.api_key:
                logger.warning(f"{self.name} API key not found in configuration. Provider will be unavailable.")
                return False
                
            # Create the client
            self.client = AsyncOpenAI(
                base_url=self.base_url,
                api_key=self.api_key,
                default_headers=headers,
                timeout=timeout
            )
            
            # Pre-fetch available models
            try:
                self.available_models = await self.list_models()
                logger.info(f"Loaded {len(self.available_models)} models from OpenRouter")
            except Exception as model_err:
                logger.warning(f"Failed to fetch models from OpenRouter: {str(model_err)}")
                # Use hardcoded fallback models
                self.available_models = self._get_fallback_models()
            
            logger.success(
                "OpenRouter provider initialized successfully", 
                emoji_key="provider"
            )
            return True
            
        except Exception as e:
            logger.error(
                f"Failed to initialize OpenRouter provider: {str(e)}", 
                emoji_key="error"
            )
            return False

    def _initialize_client(self, **kwargs):
        """Initialize the OpenAI async client with OpenRouter specifics."""
        # This method is now deprecated - use initialize() instead
        logger.warning("_initialize_client() is deprecated, use initialize() instead")
        return False

    async def generate_completion(
        self,
        prompt: str,
        model: Optional[str] = None,
        max_tokens: Optional[int] = None,
        temperature: float = 0.7,
        **kwargs
    ) -> ModelResponse:
        """Generate a completion using OpenRouter.

        Args:
            prompt: Text prompt to send to the model
            model: Model name (e.g., "openai/gpt-4.1-mini", "google/gemini-flash-1.5")
            max_tokens: Maximum tokens to generate
            temperature: Temperature parameter (0.0-1.0)
            **kwargs: Additional model-specific parameters, including:
                - extra_headers (Dict): Additional headers for this specific call.
                - extra_body (Dict): OpenRouter-specific arguments.

        Returns:
            ModelResponse with completion result

        Raises:
            Exception: If API call fails
        """
        if not self.client:
            initialized = await self._initialize_client()
            if not initialized:
                raise RuntimeError(f"{self.provider_name} provider not initialized.")

        # Use default model if not specified
        model = model or self.default_model

        # Ensure we have a model name before proceeding
        if model is None:
            logger.error("Completion failed: No model specified and no default model configured for OpenRouter.")
            raise ValueError("No model specified and no default model configured for OpenRouter.")

        # Strip provider prefix only if it matches OUR provider name
        if model.startswith(f"{self.provider_name}:"):
            model = model.split(":", 1)[1]
            logger.debug(f"Stripped provider prefix from model name: {model}")
        # Note: Keep prefixes like 'openai/' or 'google/' as OpenRouter uses them.
        # DO NOT strip other provider prefixes as they're needed for OpenRouter routing

        # Create messages
        messages = kwargs.pop("messages", None) or [{"role": "user", "content": prompt}]

        # Prepare API call parameters
        params = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
        }

        if max_tokens is not None:
            params["max_tokens"] = max_tokens

        # Extract OpenRouter specific args from kwargs
        extra_headers = kwargs.pop("extra_headers", {})
        extra_body = kwargs.pop("extra_body", {})

        json_mode = kwargs.pop("json_mode", False)
        if json_mode:
            # OpenRouter uses OpenAI-compatible API
            params["response_format"] = {"type": "json_object"}
            self.logger.debug("Setting response_format to JSON mode for OpenRouter")

        # Add any remaining kwargs to the main params (standard OpenAI args)
        params.update(kwargs)

        self.logger.info(
            f"Generating completion with {self.provider_name} model {model}",
            emoji_key=self.provider_name,
            prompt_length=len(prompt),
            json_mode_requested=json_mode
        )

        try:
            # Make API call with timing
            response, processing_time = await self.process_with_timer(
                self.client.chat.completions.create, **params, extra_headers=extra_headers, extra_body=extra_body
            )

            # Extract response text
            completion_text = response.choices[0].message.content

            # Create standardized response
            result = ModelResponse(
                text=completion_text,
                model=response.model, # Use model returned by API
                provider=self.provider_name,
                input_tokens=response.usage.prompt_tokens,
                output_tokens=response.usage.completion_tokens,
                total_tokens=response.usage.total_tokens,
                processing_time=processing_time,
                raw_response=response,
            )

            self.logger.success(
                f"{self.provider_name} completion successful",
                emoji_key="success",
                model=result.model,
                tokens={
                    "input": result.input_tokens,
                    "output": result.output_tokens
                },
                cost=result.cost, # Will be calculated by ModelResponse
                time=result.processing_time
            )

            return result

        except Exception as e:
            self.logger.error(
                f"{self.provider_name} completion failed for model {model}: {str(e)}",
                emoji_key="error",
                model=model
            )
            raise

    async def generate_completion_stream(
        self,
        prompt: str,
        model: Optional[str] = None,
        max_tokens: Optional[int] = None,
        temperature: float = 0.7,
        **kwargs
    ) -> AsyncGenerator[Tuple[str, Dict[str, Any]], None]:
        """Generate a streaming completion using OpenRouter.

        Args:
            prompt: Text prompt to send to the model
            model: Model name (e.g., "openai/gpt-4.1-mini")
            max_tokens: Maximum tokens to generate
            temperature: Temperature parameter (0.0-1.0)
            **kwargs: Additional model-specific parameters, including:
                - extra_headers (Dict): Additional headers for this specific call.
                - extra_body (Dict): OpenRouter-specific arguments.

        Yields:
            Tuple of (text_chunk, metadata)

        Raises:
            Exception: If API call fails
        """
        if not self.client:
            initialized = await self._initialize_client()
            if not initialized:
                raise RuntimeError(f"{self.provider_name} provider not initialized.")

        model = model or self.default_model
        if model.startswith(f"{self.provider_name}:"):
            model = model.split(":", 1)[1]
        # DO NOT strip other provider prefixes as they're needed for OpenRouter routing

        messages = kwargs.pop("messages", None) or [{"role": "user", "content": prompt}]

        params = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            "stream": True,
        }
        if max_tokens is not None:
            params["max_tokens"] = max_tokens

        extra_headers = kwargs.pop("extra_headers", {})
        extra_body = kwargs.pop("extra_body", {})

        json_mode = kwargs.pop("json_mode", False)
        if json_mode:
            # OpenRouter uses OpenAI-compatible API
            params["response_format"] = {"type": "json_object"}
            self.logger.debug("Setting response_format to JSON mode for OpenRouter streaming")

        params.update(kwargs)

        self.logger.info(
            f"Generating streaming completion with {self.provider_name} model {model}",
            emoji_key=self.provider_name,
            prompt_length=len(prompt),
            json_mode_requested=json_mode
        )

        start_time = time.time()
        total_chunks = 0
        final_model_name = model # Store initially requested model

        try:
            stream = await self.client.chat.completions.create(**params, extra_headers=extra_headers, extra_body=extra_body)

            async for chunk in stream:
                total_chunks += 1
                delta = chunk.choices[0].delta
                content = delta.content or ""

                # Try to get model name from the chunk if available (some providers include it)
                if chunk.model:
                    final_model_name = chunk.model

                metadata = {
                    "model": final_model_name,
                    "provider": self.provider_name,
                    "chunk_index": total_chunks,
                    "finish_reason": chunk.choices[0].finish_reason,
                }

                yield content, metadata

            processing_time = time.time() - start_time
            self.logger.success(
                f"{self.provider_name} streaming completion successful",
                emoji_key="success",
                model=final_model_name,
                chunks=total_chunks,
                time=processing_time
            )

        except Exception as e:
            self.logger.error(
                f"{self.provider_name} streaming completion failed for model {model}: {str(e)}",
                emoji_key="error",
                model=model
            )
            raise

    async def list_models(self) -> List[Dict[str, Any]]:
        """List available OpenRouter models (provides examples, not exhaustive).

        OpenRouter offers a vast number of models. This list provides common examples.
        Refer to OpenRouter documentation for the full list.

        Returns:
            List of example model information dictionaries
        """
        if self.available_models:
            return self.available_models
        models = self._get_fallback_models()
        return models

    def get_default_model(self) -> str:
        """Get the default OpenRouter model.

        Returns:
            Default model name (e.g., "openai/gpt-4.1-mini")
        """
        # Allow override via environment variable
        default_model_env = os.environ.get("OPENROUTER_DEFAULT_MODEL")
        if default_model_env:
            return default_model_env

        # Fallback to constants
        return DEFAULT_MODELS.get(self.provider_name, "openai/gpt-4.1-mini")

    async def check_api_key(self) -> bool:
        """Check if the OpenRouter API key is valid by attempting a small request."""
        if not self.client:
            # Try to initialize if not already done
            if not await self._initialize_client():
                return False # Initialization failed

        try:
            # Attempt a simple, low-cost operation, e.g., list models (even if it returns 404/permission error, it validates the key/URL)
            # Or use a very small completion request
            await self.client.chat.completions.create(
                model=self.get_default_model(),
                messages=[{"role": "user", "content": "test"}],
                max_tokens=1,
                temperature=0
            )
            return True
        except Exception as e:
            logger.warning(f"API key check failed for {self.provider_name}: {str(e)}", emoji_key="warning")
            return False

    def get_available_models(self) -> List[str]:
        """Return a list of available model names."""
        return [model["id"] for model in self.available_models]

    def is_model_available(self, model_name: str) -> bool:
        """Check if a specific model is available."""
        available_model_ids = [model["id"] for model in self.available_models]
        return model_name in available_model_ids

    async def create_completion(self, model: str, messages: List[Dict[str, str]], stream: bool = False, **kwargs) -> Union[str, AsyncGenerator[str, None]]:
        """Create a completion using the specified model."""
        if not self.client:
            raise RuntimeError("OpenRouter client not initialized (likely missing API key).")
            
        # Check if model is available
        if not self.is_model_available(model):
            # Fallback to default if provided model isn't listed? Or raise error?
            # Let's try the default model if the requested one isn't confirmed available.
            if self.default_model and self.is_model_available(self.default_model):
                logger.warning(f"Model '{model}' not found in available list. Falling back to default '{self.default_model}'.")
                model = self.default_model
            else:
                # If even the default isn't available or set, raise error
                raise ValueError(f"Model '{model}' is not available via OpenRouter according to fetched list, and no valid default model is set.")

        merged_kwargs = {**kwargs}
        # OpenRouter uses standard OpenAI params like max_tokens, temperature, etc.
        # Ensure essential params are passed
        if 'max_tokens' not in merged_kwargs:
            merged_kwargs['max_tokens'] = get_config().providers.openrouter.max_tokens or 1024 # Use config or default

        if stream:
            logger.debug(f"Creating stream completion: Model={model}, Params={merged_kwargs}")
            return self._stream_completion_generator(model, messages, **merged_kwargs)
        else:
            logger.debug(f"Creating completion: Model={model}, Params={merged_kwargs}")
            try:
                response = await self.client.chat.completions.create(
                    model=model,
                    messages=messages,
                    stream=False,
                    **merged_kwargs
                )
                # Extract content based on OpenAI library version
                if hasattr(response, 'choices') and response.choices:
                    choice = response.choices[0]
                    if hasattr(choice, 'message') and hasattr(choice.message, 'content'):
                        return choice.message.content or "" # Return empty string if content is None
                    elif hasattr(choice, 'delta') and hasattr(choice.delta, 'content'): # Should not happen for stream=False but check
                        return choice.delta.content or ""
                logger.warning("Could not extract content from OpenRouter response.")
                return "" # Return empty string if no content found
            except Exception as e:
                logger.error(f"OpenRouter completion failed: {e}", exc_info=True)
                raise RuntimeError(f"OpenRouter API call failed: {e}") from e

    async def _stream_completion_generator(self, model: str, messages: List[Dict[str, str]], **kwargs) -> AsyncGenerator[str, None]:
        """Async generator for streaming completions."""
        if not self.client:
            raise RuntimeError("OpenRouter client not initialized (likely missing API key).")
        try:
            stream = await self.client.chat.completions.create(
                model=model,
                messages=messages,
                stream=True,
                **kwargs
            )
            async for chunk in stream:
                # Extract content based on OpenAI library version
                content = ""
                if hasattr(chunk, 'choices') and chunk.choices:
                     choice = chunk.choices[0]
                     if hasattr(choice, 'delta') and hasattr(choice.delta, 'content'):
                          content = choice.delta.content
                     elif hasattr(choice, 'message') and hasattr(choice.message, 'content'): # Should not happen for stream=True
                          content = choice.message.content

                if content:
                     yield content
        except Exception as e:
            logger.error(f"OpenRouter stream completion failed: {e}", exc_info=True)
            # Depending on desired behavior, either raise or yield an error message
            # yield f"Error during stream: {e}"
            raise RuntimeError(f"OpenRouter API stream failed: {e}") from e

    # --- Cost Calculation (Needs OpenRouter Specific Data) ---
    def get_cost(self, model: str, prompt_tokens: int, completion_tokens: int) -> Optional[float]:
        """Calculate the cost of a request based on OpenRouter pricing.

        Note: Requires loading detailed model pricing info, which is not
              done by default in fetch_available_models.
              This is a placeholder and needs enhancement.
        """
        # Placeholder: Need to fetch and store detailed pricing from OpenRouter
        # Example structure (needs actual data):
        openrouter_pricing = {
             # "model_id": {"prompt_cost_per_mtok": X, "completion_cost_per_mtok": Y},
             "openai/gpt-4o": {"prompt_cost_per_mtok": 5.0, "completion_cost_per_mtok": 15.0},
             "google/gemini-pro-1.5": {"prompt_cost_per_mtok": 3.5, "completion_cost_per_mtok": 10.5},
             "anthropic/claude-3-opus": {"prompt_cost_per_mtok": 15.0, "completion_cost_per_mtok": 75.0},
             # ... add more model costs from openrouter.ai/docs#models ...
        }

        model_cost = openrouter_pricing.get(model)
        if model_cost:
            prompt_cost = (prompt_tokens / 1_000_000) * model_cost.get("prompt_cost_per_mtok", 0)
            completion_cost = (completion_tokens / 1_000_000) * model_cost.get("completion_cost_per_mtok", 0)
            return prompt_cost + completion_cost
        else:
            logger.warning(f"Cost calculation not available for OpenRouter model: {model}")
            # Return None if cost cannot be calculated
            return None

    # --- Prompt Formatting --- #
    def format_prompt(self, messages: List[Dict[str, str]]) -> Any:
        """Use standard list of dictionaries format for OpenRouter (like OpenAI)."""
        # OpenRouter generally uses the same format as OpenAI
        return messages

    def _get_fallback_models(self) -> List[Dict[str, Any]]:
        """Return a list of fallback models when API is not accessible."""
        return [
            {
                "id": "mistralai/mistral-large",
                "provider": self.provider_name,
                "description": "Mistral: Strong open-weight model.",
            },
            {
                "id": "mistralai/mistral-nemo",
                "provider": self.provider_name,
                "description": "Mistral: Strong open-weight model.",
            },
            {
                "id": "meta-llama/llama-3-70b-instruct",
                "provider": self.provider_name,
                "description": "Meta: Powerful open-source instruction-tuned model.",
            },
        ]

# Make available via discovery
__all__ = ["OpenRouterProvider"]
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/utils/parsing.py:
--------------------------------------------------------------------------------

```python
"""Parsing utilities for Ultimate MCP Server.

This module provides utility functions for parsing and processing 
results from Ultimate MCP Server operations that were previously defined in
example scripts but are now part of the library.
"""

import json
import re
from typing import Any, Dict

from ultimate_mcp_server.utils import get_logger

# Initialize logger
logger = get_logger("ultimate_mcp_server.utils.parsing")

def extract_json_from_markdown(text: str) -> str:
    """Extracts a JSON string embedded within markdown code fences.

    Handles various markdown code block formats and edge cases:
    - Complete code blocks: ```json ... ``` or ``` ... ```
    - Alternative fence styles: ~~~json ... ~~~ 
    - Incomplete/truncated blocks with only opening fence
    - Multiple code blocks (chooses the first valid JSON)
    - Extensive JSON repair for common LLM output issues:
        - Unterminated strings
        - Trailing commas
        - Missing closing brackets
        - Unquoted keys
        - Truncated content

    If no valid JSON-like content is found in fences, returns the original string.

    Args:
        text: The input string possibly containing markdown-fenced JSON.

    Returns:
        The extracted JSON string or the stripped original string.
    """
    if not text:
        return ""
        
    cleaned_text = text.strip()
    possible_json_candidates = []
    
    # Try to find JSON inside complete code blocks with various fence styles
    # Look for backtick fences (most common)
    backtick_matches = re.finditer(r"```(?:json)?\s*(.*?)\s*```", cleaned_text, re.DOTALL | re.IGNORECASE)
    for match in backtick_matches:
        possible_json_candidates.append(match.group(1).strip())
    
    # Look for tilde fences (less common but valid in some markdown)
    tilde_matches = re.finditer(r"~~~(?:json)?\s*(.*?)\s*~~~", cleaned_text, re.DOTALL | re.IGNORECASE)
    for match in tilde_matches:
        possible_json_candidates.append(match.group(1).strip())
    
    # If no complete blocks found, check for blocks with only opening fence
    if not possible_json_candidates:
        # Try backtick opening fence
        backtick_start = re.search(r"```(?:json)?\s*", cleaned_text, re.IGNORECASE)
        if backtick_start:
            content_after_fence = cleaned_text[backtick_start.end():].strip()
            possible_json_candidates.append(content_after_fence)
        
        # Try tilde opening fence
        tilde_start = re.search(r"~~~(?:json)?\s*", cleaned_text, re.IGNORECASE)
        if tilde_start:
            content_after_fence = cleaned_text[tilde_start.end():].strip()
            possible_json_candidates.append(content_after_fence)
    
    # If still no candidates, add the original text as last resort
    if not possible_json_candidates:
        possible_json_candidates.append(cleaned_text)
    
    # Try each candidate, returning the first one that looks like valid JSON
    for candidate in possible_json_candidates:
        # Apply advanced JSON repair
        repaired = _repair_json(candidate)
        try:
            # Validate if it's actually parseable JSON
            json.loads(repaired)
            return repaired  # Return the first valid JSON
        except json.JSONDecodeError:
            # If repair didn't work, continue to the next candidate
            continue
    
    # If no candidate worked with regular repair, try more aggressive repair on the first candidate
    if possible_json_candidates:
        aggressive_repair = _repair_json(possible_json_candidates[0], aggressive=True)
        try:
            json.loads(aggressive_repair)
            return aggressive_repair
        except json.JSONDecodeError:
            # Return the best we can - the first candidate with basic cleaning
            # This will still fail in json.loads, but at least we tried
            return possible_json_candidates[0]
    
    # Absolute fallback - return the original text
    return cleaned_text

def _repair_json(text: str, aggressive=False) -> str:
    """
    Repair common JSON formatting issues in LLM-generated output.
    
    This internal utility function applies a series of transformations to fix common
    JSON formatting problems that frequently occur in LLM outputs. It can operate in
    two modes: standard and aggressive.
    
    In standard mode (aggressive=False), it applies basic repairs like:
    - Removing trailing commas before closing brackets/braces
    - Ensuring property names are properly quoted
    - Basic structure validation
    
    In aggressive mode (aggressive=True), it applies more extensive repairs:
    - Fixing unterminated string literals by adding missing quotes
    - Balancing unmatched brackets and braces
    - Adding missing values for dangling properties
    - Handling truncated JSON at the end of strings
    - Attempting to recover partial JSON structures
    
    The aggressive repairs are particularly useful when dealing with outputs from
    models that have been truncated mid-generation or contain structural errors
    that would normally make the JSON unparseable.
    
    Args:
        text: The JSON-like string to repair, potentially containing formatting errors
        aggressive: Whether to apply more extensive repair techniques beyond basic
                   formatting fixes. Default is False (basic repairs only).
        
    Returns:
        A repaired JSON string that is more likely to be parseable. Note that even
        with aggressive repairs, the function cannot guarantee valid JSON for
        severely corrupted inputs.
    
    Note:
        This function is intended for internal use by extract_json_from_markdown.
        While it attempts to fix common issues, it may not address all possible
        JSON formatting problems, especially in severely malformed inputs.
    """
    if not text:
        return text
        
    # Step 1: Basic cleanup
    result = text.strip()
    
    # Quick check if it even remotely looks like JSON
    if not (result.startswith('{') or result.startswith('[')):
        return result
        
    # Step 2: Fix common issues
    
    # Fix trailing commas before closing brackets
    result = re.sub(r',\s*([\}\]])', r'\1', result)
    
    # Ensure property names are quoted
    result = re.sub(r'([{,]\s*)([a-zA-Z0-9_$]+)(\s*:)', r'\1"\2"\3', result)
    
    # If we're not in aggressive mode, return after basic fixes
    if not aggressive:
        return result
        
    # Step 3: Aggressive repairs for truncated/malformed JSON
    
    # Track opening/closing brackets to detect imbalance
    open_braces = result.count('{')
    close_braces = result.count('}')
    open_brackets = result.count('[')
    close_brackets = result.count(']')
    
    # Count quotes to check if we have an odd number (indicating unterminated strings)
    quote_count = result.count('"')
    if quote_count % 2 != 0:
        # We have an odd number of quotes, meaning at least one string is unterminated
        # This is a much more aggressive approach to fix strings
        
        # First, try to find all strings that are properly terminated
        proper_strings = []
        pos = 0
        in_string = False
        string_start = 0
        
        # This helps track properly formed strings and identify problematic ones
        while pos < len(result):
            if result[pos] == '"' and (pos == 0 or result[pos-1] != '\\'):
                if not in_string:
                    # Start of a string
                    in_string = True
                    string_start = pos
                else:
                    # End of a string
                    in_string = False
                    proper_strings.append((string_start, pos))
            pos += 1
        
        # If we're still in a string at the end, we found an unterminated string
        if in_string:
            # Force terminate it at the end
            result += '"'
    
    # Even more aggressive string fixing
    # This regexp looks for a quote followed by any characters not containing a quote
    # followed by a comma, closing brace, or bracket, without a quote in between
    # This indicates an unterminated string
    result = re.sub(r'"([^"]*?)(?=,|\s*[\]}]|$)', r'"\1"', result)
    
    # Fix cases where value might be truncated mid-word just before closing quote
    # If we find something that looks like it's in the middle of a string, terminate it
    result = re.sub(r'"([^"]+)(\s*[\]}]|,|$)', lambda m: 
        f'"{m.group(1)}"{"" if m.group(2).startswith(",") or m.group(2) in "]}," else m.group(2)}', 
        result)
    
    # Fix dangling quotes at the end of the string - these usually indicate a truncated string
    if result.rstrip().endswith('"'):
        # Add closing quote and appropriate structure depending on context
        result = result.rstrip() + '"'
        
        # Look at the previous few characters to determine if we need a comma or not
        context = result[-20:] if len(result) > 20 else result
        # If string ends with x": " it's likely a property name
        if re.search(r'"\s*:\s*"$', context):
            # Add a placeholder value and closing structure for the property
            result += "unknown"
            
    # Check for dangling property (property name with colon but no value)
    result = re.sub(r'"([^"]+)"\s*:(?!\s*["{[\w-])', r'"\1": null', result)
    
    # Add missing closing brackets/braces if needed
    if open_braces > close_braces:
        result += '}' * (open_braces - close_braces)
    if open_brackets > close_brackets:
        result += ']' * (open_brackets - close_brackets)
    
    # Handle truncated JSON structure - look for incomplete objects at the end
    # This is complex, but we'll try some common patterns
    
    # If JSON ends with a property name and colon but no value
    if re.search(r'"[^"]+"\s*:\s*$', result):
        result += 'null'
    
    # If JSON ends with a comma, it needs another value - add a null
    if re.search(r',\s*$', result):
        result += 'null'
        
    # If the JSON structure is fundamentally corrupted at the end (common in truncation)
    # Close any unclosed objects or arrays
    if not (result.endswith('}') or result.endswith(']') or result.endswith('"')):
        # Count unmatched opening brackets
        stack = []
        for char in result:
            if char in '{[':
                stack.append(char)
            elif char in '}]':
                if stack and ((stack[-1] == '{' and char == '}') or (stack[-1] == '[' and char == ']')):
                    stack.pop()
                    
        # Close any unclosed structures
        for bracket in reversed(stack):
            if bracket == '{':
                result += '}'
            elif bracket == '[':
                result += ']'
    
    # As a final safeguard, try to eval the JSON with a permissive parser
    # This won't fix deep structural issues but catches cases our regexes missed
    try:
        import simplejson
        simplejson.loads(result, parse_constant=lambda x: x)
    except (ImportError, simplejson.JSONDecodeError):
        try:
            # Try one last time with the more permissive custom JSON parser
            _scan_once = json.scanner.py_make_scanner(json.JSONDecoder())
            try:
                _scan_once(result, 0)
            except StopIteration:
                # Likely unterminated JSON - do one final pass of common fixups
                
                # Check for unterminated strings of various forms one more time
                if re.search(r'(?<!")"(?:[^"\\]|\\.)*[^"\\](?!")(?=,|\s*[\]}]|$)', result):
                    # Even more aggressive fixes, replacing with generic values
                    result = re.sub(r'(?<!")"(?:[^"\\]|\\.)*[^"\\](?!")(?=,|\s*[\]}]|$)', 
                                    r'"invalid_string"', result)
                
                # Ensure valid JSON-like structure
                if not (result.endswith('}') or result.endswith(']')):
                    if result.count('{') > result.count('}'):
                        result += '}'
                    if result.count('[') > result.count(']'):
                        result += ']'
            except Exception:
                # Something else is wrong, but we've tried our best
                pass
        except Exception:
            # We've done all we reasonably can
            pass
    
    return result

async def parse_result(result: Any) -> Dict[str, Any]:
    """Parse the result from a tool call into a usable dictionary.
    
    Handles various return types from MCP tools, including TextContent objects,
    list results, and direct dictionaries. Attempts to extract JSON from
    markdown code fences if present.
    
    Args:
        result: Result from an MCP tool call or provider operation
            
    Returns:
        Parsed dictionary containing the result data
    """
    try:
        text_to_parse = None
        # Handle TextContent object (which has a .text attribute)
        if hasattr(result, 'text'):
            text_to_parse = result.text
                
        # Handle list result
        elif isinstance(result, list):
            if result:
                first_item = result[0]
                if hasattr(first_item, 'text'): 
                    text_to_parse = first_item.text
                elif isinstance(first_item, dict):
                    # NEW: Check if it's an MCP-style text content dict
                    if first_item.get("type") == "text" and "text" in first_item:
                        text_to_parse = first_item["text"]
                    else:
                        # It's some other dictionary, return it directly
                        return first_item 
                elif isinstance(first_item, str): 
                    text_to_parse = first_item
                else:
                    logger.warning(f"List item type not directly parseable: {type(first_item)}")
                    return {"error": f"List item type not directly parseable: {type(first_item)}", "original_result_type": str(type(result))}
            else: # Empty list
                return {} # Or perhaps an error/warning? For now, empty dict.
            
        # Handle dictionary directly
        elif isinstance(result, dict):
            return result

        # Handle string directly
        elif isinstance(result, str):
            text_to_parse = result

        # If text_to_parse is still None or is empty/whitespace after potential assignments
        if text_to_parse is None or not text_to_parse.strip():
            logger.warning(f"No parsable text content found in result (type: {type(result)}, content preview: \'{str(text_to_parse)[:100]}...\').")
            return {"error": "No parsable text content found in result", "result_type": str(type(result)), "content_preview": str(text_to_parse)[:100] if text_to_parse else None}

        # At this point, text_to_parse should be a non-empty string.
        # Attempt to extract JSON from markdown (if any)
        # If no markdown, or extraction fails, json_to_parse will be text_to_parse itself.
        json_to_parse = extract_json_from_markdown(text_to_parse)

        if not json_to_parse.strip(): # If extraction resulted in an empty string (e.g. from "``` ```")
            logger.warning(f"JSON extraction from text_to_parse yielded an empty string. Original text_to_parse: \'{text_to_parse[:200]}...\'")
            # Fallback to trying the original text_to_parse if extraction gave nothing useful
            # This covers cases where text_to_parse might be pure JSON without fences.
            if text_to_parse.strip(): # Ensure original text_to_parse wasn't also empty
                 json_to_parse = text_to_parse 
            else: # Should have been caught by the earlier check, but as a safeguard:
                return {"error": "Content became empty after attempting JSON extraction", "original_text_to_parse": text_to_parse}


        # Now, json_to_parse should be the best candidate string for JSON parsing.
        # Only attempt to parse if it's not empty/whitespace.
        if not json_to_parse.strip():
            logger.warning(f"Final string to parse is empty. Original text_to_parse: \'{text_to_parse[:200]}...\'")
            return {"error": "Final string for JSON parsing is empty", "original_text_to_parse": text_to_parse}

        try:
            return json.loads(json_to_parse)
        except json.JSONDecodeError as e:
            problematic_text_for_repair = json_to_parse # This is the string that failed json.loads
            logger.warning(f"Initial JSON parsing failed for: '{problematic_text_for_repair[:200]}...' Error: {e}. Attempting LLM repair...", emoji_key="warning")
            try:
                from ultimate_mcp_server.tools.completion import generate_completion
                
                system_message_content = "You are an expert JSON repair assistant. Your goal is to return only valid JSON."
                # Prepend system instruction to the main prompt for completion models
                # (as generate_completion with openai provider doesn't natively use a separate system_prompt field in its current design)
                user_repair_request = (
                    f"The following text is supposed to be valid JSON but failed parsing. "
                    f"Please correct it and return *only* the raw, valid JSON string. "
                    f"Do not include any explanations or markdown formatting. "
                    f"If it's impossible to extract or repair to valid JSON, return an empty JSON object {{}}. "
                    f"Problematic text:\n\n```text\n{problematic_text_for_repair}\n```"
                )
                combined_prompt = f"{system_message_content}\n\n{user_repair_request}"

                llm_repair_result = await generate_completion(
                    prompt=combined_prompt, # Use the combined prompt
                    provider="openai",
                    model="gpt-4.1-mini", 
                    temperature=0.0,
                    additional_params={} # Remove system_prompt from here
                )

                text_from_llm_repair = llm_repair_result.get("text", "")
                if not text_from_llm_repair.strip():
                    logger.error("LLM repair attempt returned empty string.")
                    return {"error": "LLM repair returned empty string", "original_text": problematic_text_for_repair}

                # Re-extract from LLM response, as it might add fences
                final_repaired_json_str = extract_json_from_markdown(text_from_llm_repair)
                
                if not final_repaired_json_str.strip():
                    logger.error(f"LLM repair extracted an empty JSON string from LLM response: {text_from_llm_repair[:200]}...")
                    return {"error": "LLM repair extracted empty JSON", "llm_response": text_from_llm_repair, "original_text": problematic_text_for_repair}

                try:
                    logger.debug(f"Attempting to parse LLM-repaired JSON: {final_repaired_json_str[:200]}...")
                    parsed_llm_result = json.loads(final_repaired_json_str)
                    logger.success("LLM JSON repair successful.", emoji_key="success")
                    return parsed_llm_result
                except json.JSONDecodeError as llm_e:
                    logger.error(f"LLM repair attempt failed. LLM response could not be parsed as JSON: {llm_e}. LLM response (after extraction): '{final_repaired_json_str[:200]}' Original LLM text: '{text_from_llm_repair[:500]}...'")
                    return {"error": "LLM repair failed to produce valid JSON", "detail": str(llm_e), "llm_response_extracted": final_repaired_json_str, "llm_response_raw": text_from_llm_repair, "original_text": problematic_text_for_repair}
            except Exception as repair_ex:
                logger.error(f"Exception during LLM repair process: {repair_ex}", exc_info=True)
                return {"error": "Exception during LLM repair", "detail": str(repair_ex), "original_text": problematic_text_for_repair}

    except Exception as e: # General error in parse_result
        logger.error(f"Critical error in parse_result: {e}", exc_info=True)
        return {"error": "Critical error during result parsing", "detail": str(e)}

async def process_mcp_result(result: Any) -> Dict[str, Any]:
    """
    Process and normalize results from MCP tool calls into a consistent dictionary format.
    
    This function serves as a user-friendly interface for handling and normalizing
    the various types of results that can be returned from MCP tools and provider operations.
    It acts as a bridge between the raw MCP tool outputs and downstream application code
    that expects a consistent dictionary structure.
    
    The function handles multiple return formats:
    - TextContent objects with a .text attribute
    - List results containing TextContent objects or dictionaries
    - Direct dictionary returns
    - JSON-like strings embedded in markdown code blocks
    
    Key features:
    - Automatic extraction of JSON from markdown code fences
    - JSON repair for malformed or truncated LLM outputs
    - Fallback to LLM-based repair for difficult parsing cases
    - Consistent error handling and reporting
    
    This function is especially useful in:
    - Handling results from completion tools where LLMs may return JSON in various formats
    - Processing tool responses that contain structured data embedded in text
    - Creating a consistent interface for downstream processing of MCP tool results
    - Simplifying error handling in client applications
    
    Args:
        result: The raw result from an MCP tool call or provider operation, which could
               be a TextContent object, a list, a dictionary, or another structure
            
    Returns:
        A dictionary containing either:
        - The successfully parsed result data
        - An error description with diagnostic information if parsing failed
        
    Example:
        ```python
        result = await some_mcp_tool()
        processed_data = await process_mcp_result(result)
        
        # Check for errors in the processed result
        if "error" in processed_data:
            print(f"Error processing result: {processed_data['error']}")
        else:
            # Use the normalized data
            print(f"Processed data: {processed_data}")
        ```
    """
    return await parse_result(result) 
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/services/knowledge_base/retriever.py:
--------------------------------------------------------------------------------

```python
"""Knowledge base retriever for RAG functionality."""
import time
from typing import Any, Dict, List, Optional

from ultimate_mcp_server.services.knowledge_base.feedback import get_rag_feedback_service
from ultimate_mcp_server.services.knowledge_base.utils import build_metadata_filter
from ultimate_mcp_server.services.vector import VectorDatabaseService
from ultimate_mcp_server.utils import get_logger

logger = get_logger(__name__)


class KnowledgeBaseRetriever:
    """
    Advanced retrieval engine for knowledge base collections in RAG applications.
    
    The KnowledgeBaseRetriever provides sophisticated search capabilities for finding
    the most relevant documents within knowledge bases. It offers multiple retrieval
    strategies optimized for different search scenarios, from pure semantic vector
    search to hybrid approaches combining vector and keyword matching.
    
    Key Features:
    - Multiple retrieval methods (vector, hybrid, keyword)
    - Metadata filtering for targeted searches
    - Content-based filtering for keyword matching
    - Configurable similarity thresholds and relevance scoring
    - Feedback mechanisms for continuous retrieval improvement
    - Performance monitoring and diagnostics
    - Advanced parameter tuning for specialized search needs
    
    Retrieval Methods:
    1. Vector Search: Uses embeddings for semantic similarity matching
       - Best for finding conceptually related content
       - Handles paraphrasing and semantic equivalence
       - Computationally efficient for large collections
    
    2. Hybrid Search: Combines vector and keyword matching with weighted scoring
       - Balances semantic understanding with exact term matching
       - Addresses vocabulary mismatch problems
       - Provides more robust retrieval across diverse query types
    
    3. Keyword Filtering: Limits results to those containing specific text
       - Used for explicit term presence requirements
       - Can be combined with other search methods
    
    Architecture:
    The retriever operates as a higher-level service above the vector database,
    working in concert with:
    - Embedding services for query vectorization
    - Vector database services for efficient similarity search
    - Feedback services for result quality improvement
    - Metadata filters for context-aware retrieval
    
    Usage in RAG Applications:
    This retriever is a critical component in RAG pipelines, responsible for
    the quality and relevance of context provided to LLMs. Tuning retrieval
    parameters significantly impacts the quality of generated responses.
    
    Example Usage:
    ```python
    # Get retriever instance
    retriever = get_knowledge_base_retriever()
    
    # Simple vector search
    results = await retriever.retrieve(
        knowledge_base_name="company_policies",
        query="What is our remote work policy?",
        top_k=3,
        min_score=0.7
    )
    
    # Hybrid search with metadata filtering
    dept_results = await retriever.retrieve_hybrid(
        knowledge_base_name="company_policies",
        query="security requirements for customer data",
        top_k=5,
        vector_weight=0.6,
        keyword_weight=0.4,
        metadata_filter={"department": "security", "status": "active"}
    )
    
    # Process and use the retrieved documents
    for item in results["results"]:
        print(f"Document (score: {item['score']:.2f}): {item['document'][:100]}...")
        print(f"Source: {item['metadata'].get('source', 'unknown')}")
    
    # Record which documents were actually useful
    await retriever.record_feedback(
        knowledge_base_name="company_policies",
        query="What is our remote work policy?",
        retrieved_documents=results["results"],
        used_document_ids=["doc123", "doc456"]
    )
    ```
    """
    
    def __init__(self, vector_service: VectorDatabaseService):
        """Initialize the knowledge base retriever.
        
        Args:
            vector_service: Vector database service for retrieving embeddings
        """
        self.vector_service = vector_service
        self.feedback_service = get_rag_feedback_service()
        
        # Get embedding service for generating query embeddings
        from ultimate_mcp_server.services.vector.embeddings import get_embedding_service
        self.embedding_service = get_embedding_service()
        
        logger.info("Knowledge base retriever initialized", extra={"emoji_key": "success"})
    
    async def _validate_knowledge_base(self, name: str) -> Dict[str, Any]:
        """Validate that a knowledge base exists.
        
        Args:
            name: Knowledge base name
            
        Returns:
            Validation result
        """
        # Check if knowledge base exists
        collections = await self.vector_service.list_collections()
        
        if name not in collections:
            logger.warning(
                f"Knowledge base '{name}' not found", 
                extra={"emoji_key": "warning"}
            )
            return {"status": "not_found", "name": name}
        
        # Get metadata
        metadata = await self.vector_service.get_collection_metadata(name)
        
        if metadata.get("type") != "knowledge_base":
            logger.warning(
                f"Collection '{name}' is not a knowledge base", 
                extra={"emoji_key": "warning"}
            )
            return {"status": "not_knowledge_base", "name": name}
        
        return {
            "status": "valid",
            "name": name,
            "metadata": metadata
        }
    
    async def retrieve(
        self,
        knowledge_base_name: str,
        query: str,
        top_k: int = 5,
        min_score: float = 0.6,
        metadata_filter: Optional[Dict[str, Any]] = None,
        content_filter: Optional[str] = None,
        embedding_model: Optional[str] = None,
        apply_feedback: bool = True,
        search_params: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Retrieve documents from a knowledge base using vector search.
        
        Args:
            knowledge_base_name: Knowledge base name
            query: Query text
            top_k: Number of results to return
            min_score: Minimum similarity score
            metadata_filter: Optional metadata filter (field->value or field->{op:value})
            content_filter: Text to search for in documents
            embedding_model: Optional embedding model name
            apply_feedback: Whether to apply feedback adjustments
            search_params: Optional ChromaDB search parameters
            
        Returns:
            Retrieved documents with metadata
        """
        start_time = time.time()
        
        # Validate knowledge base
        kb_info = await self._validate_knowledge_base(knowledge_base_name)
        
        if kb_info["status"] != "valid":
            logger.warning(
                f"Knowledge base '{knowledge_base_name}' not found or invalid", 
                extra={"emoji_key": "warning"}
            )
            return {
                "status": "error",
                "message": f"Knowledge base '{knowledge_base_name}' not found or invalid"
            }
        
        logger.debug(f"DEBUG: Knowledge base validated - metadata: {kb_info['metadata']}")
        
        # Use the same embedding model that was used to create the knowledge base
        if not embedding_model and kb_info["metadata"].get("embedding_model"):
            embedding_model = kb_info["metadata"]["embedding_model"]
            logger.debug(f"Using embedding model from knowledge base metadata: {embedding_model}")
        
        # If embedding model is specified, ensure it's saved in the metadata for future use
        if embedding_model and not kb_info["metadata"].get("embedding_model"):
            try:
                await self.vector_service.update_collection_metadata(
                    name=knowledge_base_name,
                    metadata={
                        **kb_info["metadata"],
                        "embedding_model": embedding_model
                    }
                )
                logger.debug(f"Updated knowledge base metadata with embedding model: {embedding_model}")
            except Exception as e:
                logger.warning(f"Failed to update knowledge base metadata with embedding model: {str(e)}")
        
        # Get or create ChromaDB collection
        collection = await self.vector_service.get_collection(knowledge_base_name)
        logger.debug(f"DEBUG: Retrieved collection type: {type(collection)}")
        
        # Set search parameters if provided
        if search_params:
            await self.vector_service.update_collection_metadata(
                collection_name=knowledge_base_name,
                metadata={
                    **kb_info["metadata"],
                    **{f"hnsw:{k}": v for k, v in search_params.items()}
                }
            )
        
        # Create includes parameter
        includes = ["documents", "metadatas", "distances"]
        
        # Create where_document parameter for content filtering
        where_document = {"$contains": content_filter} if content_filter else None
        
        # Convert metadata filter format if provided
        chroma_filter = build_metadata_filter(metadata_filter) if metadata_filter else None
        
        logger.debug(f"DEBUG: Search parameters - top_k: {top_k}, min_score: {min_score}, filter: {chroma_filter}, where_document: {where_document}")
        
        try:
            # Generate embedding directly with our embedding service
            # Call create_embeddings with a list and get the first result
            query_embeddings = await self.embedding_service.create_embeddings(
                texts=[query],
                # model=embedding_model # Model is set during service init
            )
            if not query_embeddings:
                logger.error(f"Failed to generate embedding for query: {query}")
                return { "status": "error", "message": "Failed to generate query embedding" }
            query_embedding = query_embeddings[0]
            
            logger.debug(f"Generated query embedding with model: {self.embedding_service.model_name}, dimension: {len(query_embedding)}")
            
            # Use correct query method based on collection type
            if hasattr(collection, 'query') and not hasattr(collection, 'search_by_text'):
                # ChromaDB collection
                logger.debug("Using ChromaDB direct query with embeddings")
                try:
                    search_results = collection.query(
                        query_embeddings=[query_embedding],  # Use our embedding directly
                        n_results=top_k * 2, 
                        where=chroma_filter,
                        where_document=where_document,
                        include=includes
                    )
                except Exception as e:
                    logger.error(f"ChromaDB query error: {str(e)}")
                    raise
            else:
                # Our custom VectorCollection
                logger.debug("Using VectorCollection search method")
                search_results = await collection.query(
                    query_texts=[query],
                    n_results=top_k * 2,
                    where=chroma_filter,
                    where_document=where_document,
                    include=includes,
                    embedding_model=embedding_model
                )
            
            # Debug raw results
            logger.debug(f"DEBUG: Raw search results - keys: {search_results.keys()}")
            logger.debug(f"DEBUG: Documents count: {len(search_results.get('documents', [[]])[0])}")
            logger.debug(f"DEBUG: IDs: {search_results.get('ids', [[]])[0]}")
            logger.debug(f"DEBUG: Distances: {search_results.get('distances', [[]])[0]}")
            
            # Process results
            results = []
            for i, doc in enumerate(search_results["documents"][0]):
                # Convert distance to similarity score (1 = exact match, 0 = completely different)
                # Most distance metrics return 0 for exact match, so we use 1 - distance
                # This works for cosine, l2, etc.
                similarity = 1.0 - float(search_results["distances"][0][i])
                
                # Debug each document
                logger.debug(f"DEBUG: Document {i} - ID: {search_results['ids'][0][i]}")
                logger.debug(f"DEBUG: Similarity: {similarity} (min required: {min_score})")
                logger.debug(f"DEBUG: Document content (first 100 chars): {doc[:100] if doc else 'Empty'}")
                
                if search_results["metadatas"] and i < len(search_results["metadatas"][0]):
                    metadata = search_results["metadatas"][0][i]
                    logger.debug(f"DEBUG: Metadata: {metadata}")
                
                # Skip results below minimum score
                if similarity < min_score:
                    logger.debug(f"DEBUG: Skipping document {i} due to low similarity: {similarity} < {min_score}")
                    continue
                
                results.append({
                    "id": search_results["ids"][0][i],
                    "document": doc,
                    "metadata": search_results["metadatas"][0][i] if search_results["metadatas"] else {},
                    "score": similarity
                })
            
            logger.debug(f"DEBUG: After filtering, {len(results)} documents remain.")
            
            # Apply feedback adjustments if requested
            if apply_feedback:
                results = await self.feedback_service.apply_feedback_adjustments(
                    knowledge_base_name=knowledge_base_name,
                    results=results,
                    query=query
                )
            
            # Limit to top_k
            results = results[:top_k]
            
            # Track retrieval time
            retrieval_time = time.time() - start_time
            
            logger.info(
                f"Retrieved {len(results)} documents from '{knowledge_base_name}' in {retrieval_time:.2f}s", 
                extra={"emoji_key": "success"}
            )
            
            return {
                "status": "success",
                "query": query,
                "results": results,
                "count": len(results),
                "retrieval_time": retrieval_time
            }
            
        except Exception as e:
            logger.error(
                f"Error retrieving from knowledge base '{knowledge_base_name}': {str(e)}", 
                extra={"emoji_key": "error"}
            )
            
            return {
                "status": "error",
                "message": str(e)
            }
    
    async def retrieve_hybrid(
        self,
        knowledge_base_name: str,
        query: str,
        top_k: int = 5,
        vector_weight: float = 0.7,
        keyword_weight: float = 0.3,
        min_score: float = 0.6,
        metadata_filter: Optional[Dict[str, Any]] = None,
        additional_keywords: Optional[List[str]] = None,
        apply_feedback: bool = True,
        search_params: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Retrieve documents using hybrid search.
        
        Args:
            knowledge_base_name: Knowledge base name
            query: Query text
            top_k: Number of documents to retrieve
            vector_weight: Weight for vector search component
            keyword_weight: Weight for keyword search component
            min_score: Minimum similarity score
            metadata_filter: Optional metadata filter
            additional_keywords: Additional keywords to include in search
            apply_feedback: Whether to apply feedback adjustments
            search_params: Optional ChromaDB search parameters
            
        Returns:
            Retrieved documents with metadata
        """
        start_time = time.time()
        
        # Validate knowledge base
        kb_info = await self._validate_knowledge_base(knowledge_base_name)
        
        if kb_info["status"] != "valid":
            logger.warning(
                f"Knowledge base '{knowledge_base_name}' not found or invalid", 
                extra={"emoji_key": "warning"}
            )
            return {
                "status": "error",
                "message": f"Knowledge base '{knowledge_base_name}' not found or invalid"
            }
        
        # Get or create ChromaDB collection
        collection = await self.vector_service.get_collection(knowledge_base_name)
        
        # Set search parameters if provided
        if search_params:
            await self.vector_service.update_collection_metadata(
                collection_name=knowledge_base_name,
                metadata={
                    **kb_info["metadata"],
                    **{f"hnsw:{k}": v for k, v in search_params.items()}
                }
            )
        
        # Convert metadata filter format if provided
        chroma_filter = build_metadata_filter(metadata_filter) if metadata_filter else None
        
        # Create content filter based on query and additional keywords
        content_text = query
        if additional_keywords:
            content_text = f"{query} {' '.join(additional_keywords)}"
        
        # Use ChromaDB's hybrid search by providing both query text and content filter
        try:
            # Vector search results with content filter
            search_results = await collection.query(
                query_texts=[query],
                n_results=top_k * 3,  # Get more results for combining
                where=chroma_filter,
                where_document={"$contains": content_text} if content_text else None,
                include=["documents", "metadatas", "distances"],
                embedding_model=None  # Use default embedding model
            )
            
            # Process results
            combined_results = {}
            
            # Process vector search results
            for i, doc in enumerate(search_results["documents"][0]):
                doc_id = search_results["ids"][0][i]
                vector_score = 1.0 - float(search_results["distances"][0][i])
                
                combined_results[doc_id] = {
                    "id": doc_id,
                    "document": doc,
                    "metadata": search_results["metadatas"][0][i] if search_results["metadatas"] else {},
                    "vector_score": vector_score,
                    "keyword_score": 0.0,
                    "score": vector_score * vector_weight
                }
            
            # Now do a keyword-only search if we have keywords component
            if keyword_weight > 0:
                keyword_results = await collection.query(
                    query_texts=None,  # No vector query
                    n_results=top_k * 3,
                    where=chroma_filter,
                    where_document={"$contains": content_text},
                    include=["documents", "metadatas"],
                    embedding_model=None  # No embedding model needed for keyword-only search
                )
                
                # Process keyword results
                for i, doc in enumerate(keyword_results["documents"][0]):
                    doc_id = keyword_results["ids"][0][i]
                    # Approximate keyword score based on position (best = 1.0)
                    keyword_score = 1.0 - (i / len(keyword_results["documents"][0]))
                    
                    if doc_id in combined_results:
                        # Update existing result
                        combined_results[doc_id]["keyword_score"] = keyword_score
                        combined_results[doc_id]["score"] += keyword_score * keyword_weight
                    else:
                        # Add new result
                        combined_results[doc_id] = {
                            "id": doc_id,
                            "document": doc,
                            "metadata": keyword_results["metadatas"][0][i] if keyword_results["metadatas"] else {},
                            "vector_score": 0.0,
                            "keyword_score": keyword_score,
                            "score": keyword_score * keyword_weight
                        }
            
            # Convert to list and filter by min_score
            results = [r for r in combined_results.values() if r["score"] >= min_score]
            
            # Apply feedback adjustments if requested
            if apply_feedback:
                results = await self.feedback_service.apply_feedback_adjustments(
                    knowledge_base_name=knowledge_base_name,
                    results=results,
                    query=query
                )
            
            # Sort by score and limit to top_k
            results.sort(key=lambda x: x["score"], reverse=True)
            results = results[:top_k]
            
            # Track retrieval time
            retrieval_time = time.time() - start_time
            
            logger.info(
                f"Hybrid search retrieved {len(results)} documents from '{knowledge_base_name}' in {retrieval_time:.2f}s", 
                extra={"emoji_key": "success"}
            )
            
            return {
                "status": "success",
                "query": query,
                "results": results,
                "count": len(results),
                "retrieval_time": retrieval_time
            }
            
        except Exception as e:
            logger.error(
                f"Error performing hybrid search on '{knowledge_base_name}': {str(e)}", 
                extra={"emoji_key": "error"}
            )
            
            return {
                "status": "error",
                "message": str(e)
            }
    
    async def record_feedback(
        self,
        knowledge_base_name: str,
        query: str,
        retrieved_documents: List[Dict[str, Any]],
        used_document_ids: Optional[List[str]] = None,
        explicit_feedback: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """Record feedback for retrieval results.
        
        Args:
            knowledge_base_name: Knowledge base name
            query: Query text
            retrieved_documents: List of retrieved documents
            used_document_ids: List of document IDs used in the response
            explicit_feedback: Explicit feedback for documents
            
        Returns:
            Feedback recording result
        """
        # Convert list to set if provided
        used_ids_set = set(used_document_ids) if used_document_ids else None
        
        # Record feedback
        result = await self.feedback_service.record_retrieval_feedback(
            knowledge_base_name=knowledge_base_name,
            query=query,
            retrieved_documents=retrieved_documents,
            used_document_ids=used_ids_set,
            explicit_feedback=explicit_feedback
        )
        
        return result 
```
Page 6/35FirstPrevNextLast