This is page 6 of 35. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│   ├── __init__.py
│   ├── advanced_agent_flows_using_unified_memory_system_demo.py
│   ├── advanced_extraction_demo.py
│   ├── advanced_unified_memory_system_demo.py
│   ├── advanced_vector_search_demo.py
│   ├── analytics_reporting_demo.py
│   ├── audio_transcription_demo.py
│   ├── basic_completion_demo.py
│   ├── cache_demo.py
│   ├── claude_integration_demo.py
│   ├── compare_synthesize_demo.py
│   ├── cost_optimization.py
│   ├── data
│   │   ├── sample_event.txt
│   │   ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│   │   └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│   ├── docstring_refiner_demo.py
│   ├── document_conversion_and_processing_demo.py
│   ├── entity_relation_graph_demo.py
│   ├── filesystem_operations_demo.py
│   ├── grok_integration_demo.py
│   ├── local_text_tools_demo.py
│   ├── marqo_fused_search_demo.py
│   ├── measure_model_speeds.py
│   ├── meta_api_demo.py
│   ├── multi_provider_demo.py
│   ├── ollama_integration_demo.py
│   ├── prompt_templates_demo.py
│   ├── python_sandbox_demo.py
│   ├── rag_example.py
│   ├── research_workflow_demo.py
│   ├── sample
│   │   ├── article.txt
│   │   ├── backprop_paper.pdf
│   │   ├── buffett.pdf
│   │   ├── contract_link.txt
│   │   ├── legal_contract.txt
│   │   ├── medical_case.txt
│   │   ├── northwind.db
│   │   ├── research_paper.txt
│   │   ├── sample_data.json
│   │   └── text_classification_samples
│   │       ├── email_classification.txt
│   │       ├── news_samples.txt
│   │       ├── product_reviews.txt
│   │       └── support_tickets.txt
│   ├── sample_docs
│   │   └── downloaded
│   │       └── attention_is_all_you_need.pdf
│   ├── sentiment_analysis_demo.py
│   ├── simple_completion_demo.py
│   ├── single_shot_synthesis_demo.py
│   ├── smart_browser_demo.py
│   ├── sql_database_demo.py
│   ├── sse_client_demo.py
│   ├── test_code_extraction.py
│   ├── test_content_detection.py
│   ├── test_ollama.py
│   ├── text_classification_demo.py
│   ├── text_redline_demo.py
│   ├── tool_composition_examples.py
│   ├── tournament_code_demo.py
│   ├── tournament_text_demo.py
│   ├── unified_memory_system_demo.py
│   ├── vector_search_demo.py
│   ├── web_automation_instruction_packs.py
│   └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│   └── smart_browser_internal
│       ├── locator_cache.db
│       ├── readability.js
│       └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── integration
│   │   ├── __init__.py
│   │   └── test_server.py
│   ├── manual
│   │   ├── test_extraction_advanced.py
│   │   └── test_extraction.py
│   └── unit
│       ├── __init__.py
│       ├── test_cache.py
│       ├── test_providers.py
│       └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│   ├── __init__.py
│   ├── __main__.py
│   ├── cli
│   │   ├── __init__.py
│   │   ├── __main__.py
│   │   ├── commands.py
│   │   ├── helpers.py
│   │   └── typer_cli.py
│   ├── clients
│   │   ├── __init__.py
│   │   ├── completion_client.py
│   │   └── rag_client.py
│   ├── config
│   │   └── examples
│   │       └── filesystem_config.yaml
│   ├── config.py
│   ├── constants.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── evaluation
│   │   │   ├── base.py
│   │   │   └── evaluators.py
│   │   ├── providers
│   │   │   ├── __init__.py
│   │   │   ├── anthropic.py
│   │   │   ├── base.py
│   │   │   ├── deepseek.py
│   │   │   ├── gemini.py
│   │   │   ├── grok.py
│   │   │   ├── ollama.py
│   │   │   ├── openai.py
│   │   │   └── openrouter.py
│   │   ├── server.py
│   │   ├── state_store.py
│   │   ├── tournaments
│   │   │   ├── manager.py
│   │   │   ├── tasks.py
│   │   │   └── utils.py
│   │   └── ums_api
│   │       ├── __init__.py
│   │       ├── ums_database.py
│   │       ├── ums_endpoints.py
│   │       ├── ums_models.py
│   │       └── ums_services.py
│   ├── exceptions.py
│   ├── graceful_shutdown.py
│   ├── services
│   │   ├── __init__.py
│   │   ├── analytics
│   │   │   ├── __init__.py
│   │   │   ├── metrics.py
│   │   │   └── reporting.py
│   │   ├── cache
│   │   │   ├── __init__.py
│   │   │   ├── cache_service.py
│   │   │   ├── persistence.py
│   │   │   ├── strategies.py
│   │   │   └── utils.py
│   │   ├── cache.py
│   │   ├── document.py
│   │   ├── knowledge_base
│   │   │   ├── __init__.py
│   │   │   ├── feedback.py
│   │   │   ├── manager.py
│   │   │   ├── rag_engine.py
│   │   │   ├── retriever.py
│   │   │   └── utils.py
│   │   ├── prompts
│   │   │   ├── __init__.py
│   │   │   ├── repository.py
│   │   │   └── templates.py
│   │   ├── prompts.py
│   │   └── vector
│   │       ├── __init__.py
│   │       ├── embeddings.py
│   │       └── vector_service.py
│   ├── tool_token_counter.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── audio_transcription.py
│   │   ├── base.py
│   │   ├── completion.py
│   │   ├── docstring_refiner.py
│   │   ├── document_conversion_and_processing.py
│   │   ├── enhanced-ums-lookbook.html
│   │   ├── entity_relation_graph.py
│   │   ├── excel_spreadsheet_automation.py
│   │   ├── extraction.py
│   │   ├── filesystem.py
│   │   ├── html_to_markdown.py
│   │   ├── local_text_tools.py
│   │   ├── marqo_fused_search.py
│   │   ├── meta_api_tool.py
│   │   ├── ocr_tools.py
│   │   ├── optimization.py
│   │   ├── provider.py
│   │   ├── pyodide_boot_template.html
│   │   ├── python_sandbox.py
│   │   ├── rag.py
│   │   ├── redline-compiled.css
│   │   ├── sentiment_analysis.py
│   │   ├── single_shot_synthesis.py
│   │   ├── smart_browser.py
│   │   ├── sql_databases.py
│   │   ├── text_classification.py
│   │   ├── text_redline_tools.py
│   │   ├── tournament.py
│   │   ├── ums_explorer.html
│   │   └── unified_memory_system.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── async_utils.py
│   │   ├── display.py
│   │   ├── logging
│   │   │   ├── __init__.py
│   │   │   ├── console.py
│   │   │   ├── emojis.py
│   │   │   ├── formatter.py
│   │   │   ├── logger.py
│   │   │   ├── panels.py
│   │   │   ├── progress.py
│   │   │   └── themes.py
│   │   ├── parse_yaml.py
│   │   ├── parsing.py
│   │   ├── security.py
│   │   └── text.py
│   └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/redline-compiled.css:
--------------------------------------------------------------------------------
```css
*,:after,:before{--tw-border-spacing-x:0;--tw-border-spacing-y:0;--tw-translate-x:0;--tw-translate-y:0;--tw-rotate:0;--tw-skew-x:0;--tw-skew-y:0;--tw-scale-x:1;--tw-scale-y:1;--tw-pan-x: ;--tw-pan-y: ;--tw-pinch-zoom: ;--tw-scroll-snap-strictness:proximity;--tw-gradient-from-position: ;--tw-gradient-via-position: ;--tw-gradient-to-position: ;--tw-ordinal: ;--tw-slashed-zero: ;--tw-numeric-figure: ;--tw-numeric-spacing: ;--tw-numeric-fraction: ;--tw-ring-inset: ;--tw-ring-offset-width:0px;--tw-ring-offset-color:#fff;--tw-ring-color:rgba(59,130,246,.5);--tw-ring-offset-shadow:0 0 #0000;--tw-ring-shadow:0 0 #0000;--tw-shadow:0 0 #0000;--tw-shadow-colored:0 0 #0000;--tw-blur: ;--tw-brightness: ;--tw-contrast: ;--tw-grayscale: ;--tw-hue-rotate: ;--tw-invert: ;--tw-saturate: ;--tw-sepia: ;--tw-drop-shadow: ;--tw-backdrop-blur: ;--tw-backdrop-brightness: ;--tw-backdrop-contrast: ;--tw-backdrop-grayscale: ;--tw-backdrop-hue-rotate: ;--tw-backdrop-invert: ;--tw-backdrop-opacity: ;--tw-backdrop-saturate: ;--tw-backdrop-sepia: ;--tw-contain-size: ;--tw-contain-layout: ;--tw-contain-paint: ;--tw-contain-style: }::backdrop{--tw-border-spacing-x:0;--tw-border-spacing-y:0;--tw-translate-x:0;--tw-translate-y:0;--tw-rotate:0;--tw-skew-x:0;--tw-skew-y:0;--tw-scale-x:1;--tw-scale-y:1;--tw-pan-x: ;--tw-pan-y: ;--tw-pinch-zoom: ;--tw-scroll-snap-strictness:proximity;--tw-gradient-from-position: ;--tw-gradient-via-position: ;--tw-gradient-to-position: ;--tw-ordinal: ;--tw-slashed-zero: ;--tw-numeric-figure: ;--tw-numeric-spacing: ;--tw-numeric-fraction: ;--tw-ring-inset: ;--tw-ring-offset-width:0px;--tw-ring-offset-color:#fff;--tw-ring-color:rgba(59,130,246,.5);--tw-ring-offset-shadow:0 0 #0000;--tw-ring-shadow:0 0 #0000;--tw-shadow:0 0 #0000;--tw-shadow-colored:0 0 #0000;--tw-blur: ;--tw-brightness: ;--tw-contrast: ;--tw-grayscale: ;--tw-hue-rotate: ;--tw-invert: ;--tw-saturate: ;--tw-sepia: ;--tw-drop-shadow: ;--tw-backdrop-blur: ;--tw-backdrop-brightness: ;--tw-backdrop-contrast: ;--tw-backdrop-grayscale: ;--tw-backdrop-hue-rotate: ;--tw-backdrop-invert: ;--tw-backdrop-opacity: ;--tw-backdrop-saturate: ;--tw-backdrop-sepia: ;--tw-contain-size: ;--tw-contain-layout: ;--tw-contain-paint: ;--tw-contain-style: }
/*! tailwindcss v3.4.17 | MIT License | https://tailwindcss.com*/*,:after,:before{box-sizing:border-box;border:0 solid #e5e7eb}:after,:before{--tw-content:""}:host,html{line-height:1.5;-webkit-text-size-adjust:100%;-moz-tab-size:4;-o-tab-size:4;tab-size:4;font-family:ui-sans-serif,system-ui,sans-serif,Apple Color Emoji,Segoe UI Emoji,Segoe UI Symbol,Noto Color Emoji;font-feature-settings:normal;font-variation-settings:normal;-webkit-tap-highlight-color:transparent}body{margin:0;line-height:inherit}hr{height:0;color:inherit;border-top-width:1px}abbr:where([title]){-webkit-text-decoration:underline dotted;text-decoration:underline dotted}h1,h2,h3,h4,h5,h6{font-size:inherit;font-weight:inherit}a{color:inherit;text-decoration:inherit}b,strong{font-weight:bolder}code,kbd,pre,samp{font-family:ui-monospace,SFMono-Regular,Menlo,Monaco,Consolas,Liberation Mono,Courier New,monospace;font-feature-settings:normal;font-variation-settings:normal;font-size:1em}small{font-size:80%}sub,sup{font-size:75%;line-height:0;position:relative;vertical-align:baseline}sub{bottom:-.25em}sup{top:-.5em}table{text-indent:0;border-color:inherit;border-collapse:collapse}button,input,optgroup,select,textarea{font-family:inherit;font-feature-settings:inherit;font-variation-settings:inherit;font-size:100%;font-weight:inherit;line-height:inherit;letter-spacing:inherit;color:inherit;margin:0;padding:0}button,select{text-transform:none}button,input:where([type=button]),input:where([type=reset]),input:where([type=submit]){-webkit-appearance:button;background-color:transparent;background-image:none}:-moz-focusring{outline:auto}:-moz-ui-invalid{box-shadow:none}progress{vertical-align:baseline}::-webkit-inner-spin-button,::-webkit-outer-spin-button{height:auto}[type=search]{-webkit-appearance:textfield;outline-offset:-2px}::-webkit-search-decoration{-webkit-appearance:none}::-webkit-file-upload-button{-webkit-appearance:button;font:inherit}summary{display:list-item}blockquote,dd,dl,figure,h1,h2,h3,h4,h5,h6,hr,p,pre{margin:0}fieldset{margin:0}fieldset,legend{padding:0}menu,ol,ul{list-style:none;margin:0;padding:0}dialog{padding:0}textarea{resize:vertical}input::-moz-placeholder,textarea::-moz-placeholder{opacity:1;color:#9ca3af}input::placeholder,textarea::placeholder{opacity:1;color:#9ca3af}[role=button],button{cursor:pointer}:disabled{cursor:default}audio,canvas,embed,iframe,img,object,svg,video{display:block;vertical-align:middle}img,video{max-width:100%;height:auto}[hidden]:where(:not([hidden=until-found])){display:none}.\!container{width:100%!important}.container{width:100%}@media (min-width:640px){.\!container{max-width:640px!important}.container{max-width:640px}}@media (min-width:768px){.\!container{max-width:768px!important}.container{max-width:768px}}@media (min-width:1024px){.\!container{max-width:1024px!important}.container{max-width:1024px}}@media (min-width:1280px){.\!container{max-width:1280px!important}.container{max-width:1280px}}@media (min-width:1536px){.\!container{max-width:1536px!important}.container{max-width:1536px}}.diff-insert,ins.diff-insert,ins.diff-insert-text{--tw-bg-opacity:1;background-color:rgb(239 246 255/var(--tw-bg-opacity,1));--tw-text-opacity:1;color:rgb(30 64 175/var(--tw-text-opacity,1));text-decoration-line:none;--tw-ring-offset-shadow:var(--tw-ring-inset) 0 0 0 var(--tw-ring-offset-width) var(--tw-ring-offset-color);--tw-ring-shadow:var(--tw-ring-inset) 0 0 0 calc(1px + var(--tw-ring-offset-width)) 
var(--tw-ring-color);--tw-ring-inset:inset;--tw-ring-color:rgba(147,197,253,.6)}.diff-delete,.diff-insert,del.diff-delete,del.diff-delete-text,ins.diff-insert,ins.diff-insert-text{border-radius:.125rem;padding-left:.125rem;padding-right:.125rem;box-shadow:var(--tw-ring-offset-shadow),var(--tw-ring-shadow),var(--tw-shadow,0 0 #0000)}.diff-delete,del.diff-delete,del.diff-delete-text{--tw-bg-opacity:1;background-color:rgb(255 241 242/var(--tw-bg-opacity,1));--tw-text-opacity:1;color:rgb(159 18 57/var(--tw-text-opacity,1));text-decoration-line:line-through;--tw-ring-offset-shadow:var(--tw-ring-inset) 0 0 0 var(--tw-ring-offset-width) var(--tw-ring-offset-color);--tw-ring-shadow:var(--tw-ring-inset) 0 0 0 calc(1px + var(--tw-ring-offset-width)) var(--tw-ring-color);--tw-ring-inset:inset;--tw-ring-color:rgba(253,164,175,.6)}.diff-move-target,ins.diff-move-target{border-radius:.125rem;border-width:1px;--tw-border-opacity:1;border-color:rgb(110 231 183/var(--tw-border-opacity,1));--tw-bg-opacity:1;background-color:rgb(236 253 245/var(--tw-bg-opacity,1));padding-left:.125rem;padding-right:.125rem;--tw-text-opacity:1;color:rgb(6 78 59/var(--tw-text-opacity,1));--tw-ring-offset-shadow:var(--tw-ring-inset) 0 0 0 var(--tw-ring-offset-width) var(--tw-ring-offset-color);--tw-ring-shadow:var(--tw-ring-inset) 0 0 0 calc(1px + var(--tw-ring-offset-width)) var(--tw-ring-color);box-shadow:var(--tw-ring-offset-shadow),var(--tw-ring-shadow),var(--tw-shadow,0 0 #0000);--tw-ring-color:rgba(52,211,153,.6)}.diff-move-source,del.diff-move-source{border-radius:.125rem;border:1px dashed rgba(52,211,153,.4);background-color:rgba(236,253,245,.5);padding-left:.125rem;padding-right:.125rem;color:rgba(6,95,70,.6);text-decoration-line:line-through}span.diff-attrib-change{border-bottom-width:1px;border-style:dotted;border-color:rgb(251 146 60/var(--tw-border-opacity,1));background-color:rgba(255,247,237,.4)}span.diff-attrib-change,span.diff-rename-node{--tw-border-opacity:1;padding-left:.125rem;padding-right:.125rem}span.diff-rename-node{border-bottom-width:2px;border-style:dotted;border-color:rgb(192 132 252/var(--tw-border-opacity,1));background-color:rgba(245,243,255,.4)}.visible{visibility:visible}.collapse{visibility:collapse}.static{position:static}.fixed{position:fixed}.absolute{position:absolute}.relative{position:relative}.bottom-10{bottom:2.5rem}.bottom-2{bottom:.5rem}.left-2{left:.5rem}.right-1{right:.25rem}.right-2{right:.5rem}.top-10{top:2.5rem}.top-2{top:.5rem}.isolate{isolation:isolate}.z-40{z-index:40}.z-50{z-index:50}.mx-auto{margin-left:auto;margin-right:auto}.ml-1{margin-left:.25rem}.ml-2{margin-left:.5rem}.mr-1{margin-right:.25rem}.block{display:block}.inline-block{display:inline-block}.inline{display:inline}.flex{display:flex}.\!table{display:table!important}.table{display:table}.grid{display:grid}.contents{display:contents}.hidden{display:none}.h-0\.5{height:.125rem}.h-3{height:.75rem}.h-4{height:1rem}.w-1{width:.25rem}.w-3{width:.75rem}.w-4{width:1rem}.max-w-3xl{max-width:48rem}.shrink{flex-shrink:1}.grow{flex-grow:1}.border-collapse{border-collapse:collapse}.transform{transform:translate(var(--tw-translate-x),var(--tw-translate-y)) rotate(var(--tw-rotate)) skewX(var(--tw-skew-x)) skewY(var(--tw-skew-y)) scaleX(var(--tw-scale-x)) 
scaleY(var(--tw-scale-y))}.cursor-pointer{cursor:pointer}.resize{resize:both}.flex-col{flex-direction:column}.flex-wrap{flex-wrap:wrap}.items-center{align-items:center}.gap-2{gap:.5rem}.truncate{overflow:hidden;text-overflow:ellipsis;white-space:nowrap}.rounded{border-radius:.25rem}.rounded-lg{border-radius:.5rem}.rounded-sm{border-radius:.125rem}.border{border-width:1px}.border-b{border-bottom-width:1px}.border-b-2{border-bottom-width:2px}.border-dashed{border-style:dashed}.border-dotted{border-style:dotted}.border-emerald-300{--tw-border-opacity:1;border-color:rgb(110 231 183/var(--tw-border-opacity,1))}.border-emerald-400{--tw-border-opacity:1;border-color:rgb(52 211 153/var(--tw-border-opacity,1))}.border-emerald-400\/40{border-color:rgba(52,211,153,.4)}.border-gray-300{--tw-border-opacity:1;border-color:rgb(209 213 219/var(--tw-border-opacity,1))}.border-orange-400{--tw-border-opacity:1;border-color:rgb(251 146 60/var(--tw-border-opacity,1))}.border-orange-500{--tw-border-opacity:1;border-color:rgb(249 115 22/var(--tw-border-opacity,1))}.border-purple-400{--tw-border-opacity:1;border-color:rgb(192 132 252/var(--tw-border-opacity,1))}.border-purple-500{--tw-border-opacity:1;border-color:rgb(168 85 247/var(--tw-border-opacity,1))}.bg-blue-100{--tw-bg-opacity:1;background-color:rgb(219 234 254/var(--tw-bg-opacity,1))}.bg-blue-50{--tw-bg-opacity:1;background-color:rgb(239 246 255/var(--tw-bg-opacity,1))}.bg-blue-500{--tw-bg-opacity:1;background-color:rgb(59 130 246/var(--tw-bg-opacity,1))}.bg-blue-900\/40{background-color:rgba(30,58,138,.4)}.bg-emerald-100{--tw-bg-opacity:1;background-color:rgb(209 250 229/var(--tw-bg-opacity,1))}.bg-emerald-100\/70{background-color:rgba(209,250,229,.7)}.bg-emerald-50{--tw-bg-opacity:1;background-color:rgb(236 253 245/var(--tw-bg-opacity,1))}.bg-emerald-50\/50{background-color:rgba(236,253,245,.5)}.bg-emerald-500{--tw-bg-opacity:1;background-color:rgb(16 185 129/var(--tw-bg-opacity,1))}.bg-emerald-900\/30{background-color:rgba(6,78,59,.3)}.bg-emerald-900\/40{background-color:rgba(6,78,59,.4)}.bg-gray-100{--tw-bg-opacity:1;background-color:rgb(243 244 246/var(--tw-bg-opacity,1))}.bg-gray-200{--tw-bg-opacity:1;background-color:rgb(229 231 235/var(--tw-bg-opacity,1))}.bg-orange-50{--tw-bg-opacity:1;background-color:rgb(255 247 237/var(--tw-bg-opacity,1))}.bg-orange-50\/40{background-color:rgba(255,247,237,.4)}.bg-rose-100{--tw-bg-opacity:1;background-color:rgb(255 228 230/var(--tw-bg-opacity,1))}.bg-rose-50{--tw-bg-opacity:1;background-color:rgb(255 241 242/var(--tw-bg-opacity,1))}.bg-rose-500{--tw-bg-opacity:1;background-color:rgb(244 63 94/var(--tw-bg-opacity,1))}.bg-rose-900\/40{background-color:rgba(136,19,55,.4)}.bg-transparent{background-color:transparent}.bg-violet-50\/40{background-color:rgba(245,243,255,.4)}.bg-white{--tw-bg-opacity:1;background-color:rgb(255 255 
255/var(--tw-bg-opacity,1))}.bg-white\/90{background-color:hsla(0,0%,100%,.9)}.p-2{padding:.5rem}.px-0\.5{padding-left:.125rem;padding-right:.125rem}.px-1{padding-left:.25rem;padding-right:.25rem}.px-2{padding-left:.5rem;padding-right:.5rem}.px-4{padding-left:1rem;padding-right:1rem}.py-1{padding-top:.25rem;padding-bottom:.25rem}.py-8{padding-top:2rem;padding-bottom:2rem}.font-\[\'Newsreader\'\]{font-family:Newsreader}.text-\[0\.95em\]{font-size:.95em}.text-xs{font-size:.75rem;line-height:1rem}.uppercase{text-transform:uppercase}.lowercase{text-transform:lowercase}.capitalize{text-transform:capitalize}.italic{font-style:italic}.ordinal{--tw-ordinal:ordinal;font-variant-numeric:var(--tw-ordinal) var(--tw-slashed-zero) var(--tw-numeric-figure) var(--tw-numeric-spacing) var(--tw-numeric-fraction)}.text-black{--tw-text-opacity:1;color:rgb(0 0 0/var(--tw-text-opacity,1))}.text-blue-200{--tw-text-opacity:1;color:rgb(191 219 254/var(--tw-text-opacity,1))}.text-blue-800{--tw-text-opacity:1;color:rgb(30 64 175/var(--tw-text-opacity,1))}.text-emerald-200{--tw-text-opacity:1;color:rgb(167 243 208/var(--tw-text-opacity,1))}.text-emerald-300\/60{color:rgba(110,231,183,.6)}.text-emerald-800\/60{color:rgba(6,95,70,.6)}.text-emerald-900{--tw-text-opacity:1;color:rgb(6 78 59/var(--tw-text-opacity,1))}.text-gray-500{--tw-text-opacity:1;color:rgb(107 114 128/var(--tw-text-opacity,1))}.text-rose-200{--tw-text-opacity:1;color:rgb(254 205 211/var(--tw-text-opacity,1))}.text-rose-800{--tw-text-opacity:1;color:rgb(159 18 57/var(--tw-text-opacity,1))}.underline{text-decoration-line:underline}.overline{text-decoration-line:overline}.line-through{text-decoration-line:line-through}.no-underline{text-decoration-line:none}.decoration-2{text-decoration-thickness:2px}.underline-offset-4{text-underline-offset:4px}.shadow{--tw-shadow:0 1px 3px 0 rgba(0,0,0,.1),0 1px 2px -1px rgba(0,0,0,.1);--tw-shadow-colored:0 1px 3px 0 var(--tw-shadow-color),0 1px 2px -1px var(--tw-shadow-color)}.shadow,.shadow-lg{box-shadow:var(--tw-ring-offset-shadow,0 0 #0000),var(--tw-ring-shadow,0 0 #0000),var(--tw-shadow)}.shadow-lg{--tw-shadow:0 10px 15px -3px rgba(0,0,0,.1),0 4px 6px -4px rgba(0,0,0,.1);--tw-shadow-colored:0 10px 15px -3px var(--tw-shadow-color),0 4px 6px -4px var(--tw-shadow-color)}.outline{outline-style:solid}.ring{--tw-ring-offset-shadow:var(--tw-ring-inset) 0 0 0 var(--tw-ring-offset-width) var(--tw-ring-offset-color);--tw-ring-shadow:var(--tw-ring-inset) 0 0 0 calc(3px + var(--tw-ring-offset-width)) var(--tw-ring-color)}.ring,.ring-0{box-shadow:var(--tw-ring-offset-shadow),var(--tw-ring-shadow),var(--tw-shadow,0 0 #0000)}.ring-0{--tw-ring-offset-shadow:var(--tw-ring-inset) 0 0 0 var(--tw-ring-offset-width) var(--tw-ring-offset-color);--tw-ring-shadow:var(--tw-ring-inset) 0 0 0 calc(var(--tw-ring-offset-width)) var(--tw-ring-color)}.ring-1{--tw-ring-offset-shadow:var(--tw-ring-inset) 0 0 0 var(--tw-ring-offset-width) var(--tw-ring-offset-color);--tw-ring-shadow:var(--tw-ring-inset) 0 0 0 calc(1px + var(--tw-ring-offset-width)) var(--tw-ring-color)}.ring-1,.ring-2{box-shadow:var(--tw-ring-offset-shadow),var(--tw-ring-shadow),var(--tw-shadow,0 0 #0000)}.ring-2{--tw-ring-offset-shadow:var(--tw-ring-inset) 0 0 0 var(--tw-ring-offset-width) var(--tw-ring-offset-color);--tw-ring-shadow:var(--tw-ring-inset) 0 0 0 calc(2px + var(--tw-ring-offset-width)) var(--tw-ring-color)}.ring-inset{--tw-ring-inset:inset}.ring-black\/10{--tw-ring-color:rgba(0,0,0,.1)}.ring-blue-300{--tw-ring-opacity:1;--tw-ring-color:rgb(147 197 
253/var(--tw-ring-opacity,1))}.ring-blue-300\/60{--tw-ring-color:rgba(147,197,253,.6)}.ring-emerald-300{--tw-ring-opacity:1;--tw-ring-color:rgb(110 231 183/var(--tw-ring-opacity,1))}.ring-emerald-400\/60{--tw-ring-color:rgba(52,211,153,.6)}.ring-emerald-500\/30{--tw-ring-color:rgba(16,185,129,.3)}.ring-orange-300{--tw-ring-opacity:1;--tw-ring-color:rgb(253 186 116/var(--tw-ring-opacity,1))}.ring-rose-300{--tw-ring-opacity:1;--tw-ring-color:rgb(253 164 175/var(--tw-ring-opacity,1))}.ring-rose-300\/60{--tw-ring-color:rgba(253,164,175,.6)}.ring-offset-1{--tw-ring-offset-width:1px}.blur{--tw-blur:blur(8px)}.blur,.grayscale{filter:var(--tw-blur) var(--tw-brightness) var(--tw-contrast) var(--tw-grayscale) var(--tw-hue-rotate) var(--tw-invert) var(--tw-saturate) var(--tw-sepia) var(--tw-drop-shadow)}.grayscale{--tw-grayscale:grayscale(100%)}.invert{--tw-invert:invert(100%)}.filter,.invert{filter:var(--tw-blur) var(--tw-brightness) var(--tw-contrast) var(--tw-grayscale) var(--tw-hue-rotate) var(--tw-invert) var(--tw-saturate) var(--tw-sepia) var(--tw-drop-shadow)}.backdrop-blur-sm{--tw-backdrop-blur:blur(4px);-webkit-backdrop-filter:var(--tw-backdrop-blur) var(--tw-backdrop-brightness) var(--tw-backdrop-contrast) var(--tw-backdrop-grayscale) var(--tw-backdrop-hue-rotate) var(--tw-backdrop-invert) var(--tw-backdrop-opacity) var(--tw-backdrop-saturate) var(--tw-backdrop-sepia);backdrop-filter:var(--tw-backdrop-blur) var(--tw-backdrop-brightness) var(--tw-backdrop-contrast) var(--tw-backdrop-grayscale) var(--tw-backdrop-hue-rotate) var(--tw-backdrop-invert) var(--tw-backdrop-opacity) var(--tw-backdrop-saturate) var(--tw-backdrop-sepia)}.transition{transition-property:color,background-color,border-color,text-decoration-color,fill,stroke,opacity,box-shadow,transform,filter,-webkit-backdrop-filter;transition-property:color,background-color,border-color,text-decoration-color,fill,stroke,opacity,box-shadow,transform,filter,backdrop-filter;transition-property:color,background-color,border-color,text-decoration-color,fill,stroke,opacity,box-shadow,transform,filter,backdrop-filter,-webkit-backdrop-filter;transition-timing-function:cubic-bezier(.4,0,.2,1);transition-duration:.15s}.transition-all{transition-property:all;transition-timing-function:cubic-bezier(.4,0,.2,1);transition-duration:.15s}.transition-colors{transition-property:color,background-color,border-color,text-decoration-color,fill,stroke;transition-timing-function:cubic-bezier(.4,0,.2,1);transition-duration:.15s}.duration-200{transition-duration:.2s}.ease-out{transition-timing-function:cubic-bezier(0,0,.2,1)}.\[_hdr_start\:_hdr_end\]{_hdr_start:hdr end}.\[_nonce_start\:_nonce_end\]{_nonce_start:nonce end}.\[a\:a\]{a:a}.\[ctx_start\:ctx_end\]{ctx_start:ctx end}.\[current_pos\:best_split_pos\]{current_pos:best split pos}.\[end-1\:end\]{end-1:end}.\[hl_start\:hl_end\]{hl_start:hl end}.\[i1\:i2\]{i1:i2}.\[i\:i\+3\]{i:i+3}.\[i\:i\+chunk_size\]{i:i+chunk size}.\[i\:i\+len\(pattern\)\]{i:i+len(pattern)}.\[inherit_ndx\:inherit_ndx\+1\]{inherit_ndx:inherit ndx+1}.\[j1\:j2\]{j1:j2}.\[json_start\:json_end\]{json_start:json end}.\[last\:start\]{last:start}.\[last_end\:start\]{last_end:start}.\[last_section_end\:first_match_start\]{last_section_end:first match start}.\[left\:right\]{left:right}.\[line_offset\:end_line\]{line_offset:end line}.\[offset\:next_offset\]{offset:next offset}.\[offset\:offset\+limit\]{offset:offset+limit}.\[pos\:end_pos\]{pos:end pos}.\[pos\:new_pos\]{pos:new 
pos}.\[position\:offset\]{position:offset}.\[position\:start\]{position:start}.\[search_region_start\:end_index\]{search_region_start:end index}.\[section_content_start\:section_content_end\]{section_content_start:section content end}.\[section_start\:section_end\]{section_start:section end}.\[start\:end\]{start:end}.\[start\:start\+1\]{start:start+1}.\[start_index\:best_split_index\]{start_index:best split index}.\[start_index\:end_index\]{start_index:end index}.\[start_pos\:pos\]{start_pos:pos}.\[user\:passwd\@\]{user:passwd@}.hover\:bg-gray-200:hover{--tw-bg-opacity:1;background-color:rgb(229 231 235/var(--tw-bg-opacity,1))}.hover\:bg-gray-300:hover{--tw-bg-opacity:1;background-color:rgb(209 213 219/var(--tw-bg-opacity,1))}.hover\:text-gray-700:hover{--tw-text-opacity:1;color:rgb(55 65 81/var(--tw-text-opacity,1))}@media (min-width:640px){.sm\:inline{display:inline}.sm\:hidden{display:none}}@media (min-width:768px){.md\:flex{display:flex}}@media (prefers-color-scheme:dark){.dark\:inline{display:inline}.dark\:hidden{display:none}.dark\:bg-blue-900\/60{background-color:rgba(30,58,138,.6)}.dark\:bg-emerald-900\/60{background-color:rgba(6,78,59,.6)}.dark\:bg-gray-700{--tw-bg-opacity:1;background-color:rgb(55 65 81/var(--tw-bg-opacity,1))}.dark\:bg-gray-800{--tw-bg-opacity:1;background-color:rgb(31 41 55/var(--tw-bg-opacity,1))}.dark\:bg-gray-800\/90{background-color:rgba(31,41,55,.9)}.dark\:bg-gray-900{--tw-bg-opacity:1;background-color:rgb(17 24 39/var(--tw-bg-opacity,1))}.dark\:bg-orange-900\/60{background-color:rgba(124,45,18,.6)}.dark\:bg-rose-900\/60{background-color:rgba(136,19,55,.6)}.dark\:text-gray-200{--tw-text-opacity:1;color:rgb(229 231 235/var(--tw-text-opacity,1))}.dark\:ring-blue-700{--tw-ring-opacity:1;--tw-ring-color:rgb(29 78 216/var(--tw-ring-opacity,1))}.dark\:ring-emerald-700{--tw-ring-opacity:1;--tw-ring-color:rgb(4 120 87/var(--tw-ring-opacity,1))}.dark\:ring-orange-700{--tw-ring-opacity:1;--tw-ring-color:rgb(194 65 12/var(--tw-ring-opacity,1))}.dark\:ring-rose-700{--tw-ring-opacity:1;--tw-ring-color:rgb(190 18 60/var(--tw-ring-opacity,1))}.dark\:hover\:bg-gray-600:hover{--tw-bg-opacity:1;background-color:rgb(75 85 99/var(--tw-bg-opacity,1))}}
```
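The minified stylesheet above appears to be the compiled Tailwind bundle for the redline/diff HTML output; its `diff-insert`, `diff-delete`, `diff-move-*`, `diff-attrib-change`, and `diff-rename-node` selectors style `<ins>`, `<del>`, and `<span>` elements in generated documents. As a toy illustration only (not the actual output path of `text_redline_tools.py`), markup of the following shape is what those selectors target:

```python
# Hypothetical helper, for illustration only: produces the kind of <ins>/<del>
# fragment that redline-compiled.css knows how to style.
def redline_fragment(deleted: str, inserted: str) -> str:
    return (
        f'<del class="diff-delete">{deleted}</del> '
        f'<ins class="diff-insert">{inserted}</ins>'
    )

print(redline_fragment("net 30 days", "net 45 days"))
```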
--------------------------------------------------------------------------------
/examples/ollama_integration_demo.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
"""Ollama integration demonstration using Ultimate MCP Server."""
import asyncio
import sys
import time
from pathlib import Path

# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))

# Import configuration system instead of using update_ollama_env
from ultimate_mcp_server.config import get_config

# Load the config to ensure environment variables from .env are read
config = get_config()

# Third-party imports
# These imports need to be below sys.path modification, which is why they have noqa comments
from rich import box  # noqa: E402
from rich.markup import escape  # noqa: E402
from rich.panel import Panel  # noqa: E402
from rich.rule import Rule  # noqa: E402
from rich.table import Table  # noqa: E402

# Project imports
from ultimate_mcp_server.constants import Provider  # noqa: E402
from ultimate_mcp_server.core.server import Gateway  # noqa: E402
from ultimate_mcp_server.utils import get_logger  # noqa: E402
from ultimate_mcp_server.utils.display import CostTracker  # Import CostTracker  # noqa: E402
from ultimate_mcp_server.utils.logging.console import console  # noqa: E402

# Initialize logger
logger = get_logger("example.ollama_integration_demo")


async def compare_ollama_models(tracker: CostTracker):
    """Compare different Ollama models."""
    console.print(Rule("[bold blue]🦙 Ollama Model Comparison[/bold blue]"))
    logger.info("Starting Ollama models comparison", emoji_key="start")
    # Create Gateway instance - this handles provider initialization
    gateway = Gateway("ollama-demo", register_tools=False)
    try:
        # Initialize providers
        logger.info("Initializing providers...", emoji_key="provider")
        await gateway._initialize_providers()
        provider_name = Provider.OLLAMA.value
        try:
            # Get the provider from the gateway
            provider = gateway.providers.get(provider_name)
            if not provider:
                logger.error(f"Provider {provider_name} not available or initialized", emoji_key="error")
                return
            logger.info(f"Using provider: {provider_name}", emoji_key="provider")
            models = await provider.list_models()
            model_names = [m["id"] for m in models]  # Extract names from model dictionaries
            console.print(f"Found {len(model_names)} Ollama models: [cyan]{escape(str(model_names))}[/cyan]")
            # Select specific models to compare (adjust these based on what you have installed locally)
            ollama_models = [
                "mix_77/gemma3-qat-tools:27b",
                "JollyLlama/GLM-Z1-32B-0414-Q4_K_M:latest",
                "llama3.2-vision:latest"
            ]
            # Filter based on available models
            models_to_compare = [m for m in ollama_models if m in model_names]
            if not models_to_compare:
                logger.error("None of the selected models for comparison are available. Please use 'ollama pull MODEL' to download models first.", emoji_key="error")
                console.print("[red]Selected models not found. Use 'ollama pull mix_77/gemma3-qat-tools:27b' to download models first.[/red]")
                return
            console.print(f"Comparing models: [yellow]{escape(str(models_to_compare))}[/yellow]")
            prompt = """
Explain the concept of quantum entanglement in a way that a high school student would understand.
Keep your response brief and accessible.
"""
            console.print(f"[cyan]Using Prompt:[/cyan] {escape(prompt.strip())[:100]}...")
            results_data = []
            for model_name in models_to_compare:
                try:
                    logger.info(f"Testing model: {model_name}", emoji_key="model")
                    start_time = time.time()
                    result = await provider.generate_completion(
                        prompt=prompt,
                        model=model_name,
                        temperature=0.3,
                        max_tokens=300
                    )
                    processing_time = time.time() - start_time
                    # Track the cost
                    tracker.add_call(result)
                    results_data.append({
                        "model": model_name,
                        "text": result.text,
                        "tokens": {
                            "input": result.input_tokens,
                            "output": result.output_tokens,
                            "total": result.total_tokens
                        },
                        "cost": result.cost,
                        "time": processing_time
                    })
                    logger.success(
                        f"Completion for {model_name} successful",
                        emoji_key="success",
                        # Tokens/cost/time logged implicitly by storing in results_data
                    )
                except Exception as e:
                    logger.error(f"Error testing model {model_name}: {str(e)}", emoji_key="error", exc_info=True)
                    # Optionally add an error entry to results_data if needed
            # Display comparison results using Rich
            if results_data:
                console.print(Rule("[bold green]Comparison Results[/bold green]"))
                for result_item in results_data:
                    model = result_item["model"]
                    time_s = result_item["time"]
                    tokens = result_item.get("tokens", {}).get("total", 0)
                    # If tokens is 0 but we have text, estimate based on text length
                    text = result_item.get("text", "").strip()
                    if tokens == 0 and text:
                        # Rough estimate: ~1.3 tokens per word plus some for punctuation
                        tokens = len(text.split()) + len(text) // 10
                        # Update the result item for cost tracking
                        result_item["tokens"]["output"] = tokens
                        result_item["tokens"]["total"] = tokens + result_item["tokens"]["input"]
                    tokens_per_second = tokens / time_s if time_s > 0 and tokens > 0 else 0
                    cost = result_item.get("cost", 0.0)
                    stats_line = (
                        f"Time: [yellow]{time_s:.2f}s[/yellow] | "
                        f"Tokens: [cyan]{tokens}[/cyan] | "
                        f"Speed: [blue]{tokens_per_second:.1f} tok/s[/blue] | "
                        f"Cost: [green]${cost:.6f}[/green]"
                    )
                    console.print(Panel(
                        escape(text),
                        title=f"[bold magenta]{escape(model)}[/bold magenta]",
                        subtitle=stats_line,
                        border_style="blue",
                        expand=False
                    ))
                    console.print()
        except Exception as e:
            logger.error(f"Error in model comparison: {str(e)}", emoji_key="error", exc_info=True)
    finally:
        # Ensure resources are cleaned up
        if hasattr(gateway, 'shutdown'):
            await gateway.shutdown()


async def demonstrate_system_prompt(tracker: CostTracker):
    """Demonstrate Ollama with system prompts."""
    console.print(Rule("[bold blue]🦙 Ollama System Prompt Demonstration[/bold blue]"))
    logger.info("Demonstrating Ollama with system prompts", emoji_key="start")
    # Create Gateway instance - this handles provider initialization
    gateway = Gateway("ollama-demo", register_tools=False)
    try:
        # Initialize providers
        logger.info("Initializing providers...", emoji_key="provider")
        await gateway._initialize_providers()
        provider_name = Provider.OLLAMA.value
        try:
            # Get the provider from the gateway
            provider = gateway.providers.get(provider_name)
            if not provider:
                logger.error(f"Provider {provider_name} not available or initialized", emoji_key="error")
                return
            # Use mix_77/gemma3-qat-tools:27b (ensure it's available)
            model = "mix_77/gemma3-qat-tools:27b"
            available_models = await provider.list_models()
            model_names = [m["id"] for m in available_models]
            if model not in model_names:
                logger.warning(f"Model {model} not available, please run 'ollama pull mix_77/gemma3-qat-tools:27b'", emoji_key="warning")
                console.print("[yellow]Model not found. Please run 'ollama pull mix_77/gemma3-qat-tools:27b' to download it.[/yellow]")
                return
            logger.info(f"Using model: {model}", emoji_key="model")
            system_prompt = """
You are a helpful assistant with expertise in physics.
Keep all explanations accurate but very concise.
Always provide real-world examples to illustrate concepts.
"""
            user_prompt = "Explain the concept of gravity."
            logger.info("Generating completion with system prompt", emoji_key="processing")
            result = await provider.generate_completion(
                prompt=user_prompt,
                model=model,
                temperature=0.7,
                system=system_prompt,
                max_tokens=1000  # Increased max_tokens
            )
            # Track the cost
            tracker.add_call(result)
            logger.success("Completion with system prompt successful", emoji_key="success")
            # Display result using Rich Panels
            console.print(Panel(
                escape(system_prompt.strip()),
                title="[bold cyan]System Prompt[/bold cyan]",
                border_style="dim cyan",
                expand=False
            ))
            console.print(Panel(
                escape(user_prompt.strip()),
                title="[bold yellow]User Prompt[/bold yellow]",
                border_style="dim yellow",
                expand=False
            ))
            console.print(Panel(
                escape(result.text.strip()),
                title="[bold green]Ollama Response[/bold green]",
                border_style="green",
                expand=False
            ))
            # Display stats in a small table
            stats_table = Table(title="Execution Stats", show_header=False, box=box.MINIMAL, expand=False)
            stats_table.add_column("Metric", style="cyan")
            stats_table.add_column("Value", style="white")
            stats_table.add_row("Input Tokens", str(result.input_tokens))
            stats_table.add_row("Output Tokens", str(result.output_tokens))
            stats_table.add_row("Cost", f"${result.cost:.6f}")
            stats_table.add_row("Processing Time", f"{result.processing_time:.3f}s")
            console.print(stats_table)
            console.print()
        except Exception as e:
            logger.error(f"Error in system prompt demonstration: {str(e)}", emoji_key="error", exc_info=True)
            # Optionally re-raise or handle differently
    finally:
        # Ensure resources are cleaned up
        if hasattr(gateway, 'shutdown'):
            await gateway.shutdown()


async def demonstrate_streaming(tracker: CostTracker):
    """Demonstrate Ollama streaming capabilities."""
    console.print(Rule("[bold blue]🦙 Ollama Streaming Demonstration[/bold blue]"))
    logger.info("Demonstrating Ollama streaming capabilities", emoji_key="start")
    # Create Gateway instance - this handles provider initialization
    gateway = Gateway("ollama-demo", register_tools=False)
    try:
        # Initialize providers
        logger.info("Initializing providers...", emoji_key="provider")
        await gateway._initialize_providers()
        provider_name = Provider.OLLAMA.value
        try:
            # Get the provider from the gateway
            provider = gateway.providers.get(provider_name)
            if not provider:
                logger.error(f"Provider {provider_name} not available or initialized", emoji_key="error")
                return
            # Use any available Ollama model
            model = provider.get_default_model()
            logger.info(f"Using model: {model}", emoji_key="model")
            prompt = "Write a short poem about programming with AI"
            console.print(Panel(
                escape(prompt.strip()),
                title="[bold yellow]Prompt[/bold yellow]",
                border_style="dim yellow",
                expand=False
            ))
            logger.info("Generating streaming completion", emoji_key="processing")
            console.print("[bold green]Streaming response:[/bold green]")
            # Stream completion and display tokens as they arrive
            output_text = ""
            token_count = 0
            start_time = time.time()
            stream = provider.generate_completion_stream(
                prompt=prompt,
                model=model,
                temperature=0.7,
                max_tokens=200
            )
            final_metadata = None
            async for chunk, metadata in stream:
                # Display the streaming chunk
                console.print(chunk, end="", highlight=False)
                output_text += chunk
                token_count += 1
                final_metadata = metadata
            # Newline after streaming is complete
            console.print()
            console.print()
            processing_time = time.time() - start_time
            if final_metadata:
                # Track the cost at the end
                if tracker:
                    # Create a simple object with the necessary attributes for the tracker
                    class StreamingCall:
                        def __init__(self, metadata):
                            self.model = metadata.get("model", "")
                            self.provider = metadata.get("provider", "")
                            self.input_tokens = metadata.get("input_tokens", 0)
                            self.output_tokens = metadata.get("output_tokens", 0)
                            self.total_tokens = metadata.get("total_tokens", 0)
                            self.cost = metadata.get("cost", 0.0)
                            self.processing_time = metadata.get("processing_time", 0.0)
                    tracker.add_call(StreamingCall(final_metadata))
                # Display stats
                stats_table = Table(title="Streaming Stats", show_header=False, box=box.MINIMAL, expand=False)
                stats_table.add_column("Metric", style="cyan")
                stats_table.add_column("Value", style="white")
                stats_table.add_row("Input Tokens", str(final_metadata.get("input_tokens", 0)))
                stats_table.add_row("Output Tokens", str(final_metadata.get("output_tokens", 0)))
                stats_table.add_row("Cost", f"${final_metadata.get('cost', 0.0):.6f}")
                stats_table.add_row("Processing Time", f"{processing_time:.3f}s")
                stats_table.add_row("Tokens per Second", f"{final_metadata.get('output_tokens', 0) / processing_time if processing_time > 0 else 0:.1f}")
                console.print(stats_table)
            logger.success("Streaming demonstration completed", emoji_key="success")
        except Exception as e:
            logger.error(f"Error in streaming demonstration: {str(e)}", emoji_key="error", exc_info=True)
    finally:
        # Ensure resources are cleaned up
        if hasattr(gateway, 'shutdown'):
            await gateway.shutdown()


async def explore_ollama_models():
    """Display available Ollama models."""
    console.print(Rule("[bold cyan]🦙 Available Ollama Models[/bold cyan]"))
    # Create Gateway instance - this handles provider initialization
    gateway = Gateway("ollama-demo", register_tools=False)
    try:
        # Initialize providers
        logger.info("Initializing providers...", emoji_key="provider")
        await gateway._initialize_providers()
        # Get provider from the gateway
        provider = gateway.providers.get(Provider.OLLAMA.value)
        if not provider:
            logger.error(f"Provider {Provider.OLLAMA.value} not available or initialized", emoji_key="error")
            console.print("[red]Ollama provider not available. Make sure Ollama is installed and running on your machine.[/red]")
            console.print("[yellow]Visit https://ollama.com/download for installation instructions.[/yellow]")
            return
        # Get list of available models
        try:
            models = await provider.list_models()
            if not models:
                console.print("[yellow]No Ollama models found. Use 'ollama pull MODEL' to download models.[/yellow]")
                console.print("Example: [green]ollama pull mix_77/gemma3-qat-tools:27b[/green]")
                return
            # Create a table to display model information
            table = Table(title="Local Ollama Models")
            table.add_column("Model ID", style="cyan")
            table.add_column("Description", style="green")
            table.add_column("Size", style="yellow")
            for model in models:
                # Extract size from description if available
                size_str = "Unknown"
                description = model.get("description", "")
                # Check if size information is in the description (format: "... (X.XX GB)")
                import re
                size_match = re.search(r'\((\d+\.\d+) GB\)', description)
                if size_match:
                    size_gb = float(size_match.group(1))
                    size_str = f"{size_gb:.2f} GB"
                table.add_row(
                    model["id"],
                    description,
                    size_str
                )
            console.print(table)
            console.print("\n[dim]Note: To add more models, use 'ollama pull MODEL_NAME'[/dim]")
        except Exception as e:
            logger.error(f"Error listing Ollama models: {str(e)}", emoji_key="error")
            console.print(f"[red]Failed to list Ollama models: {str(e)}[/red]")
            console.print("[yellow]Make sure Ollama is installed and running on your machine.[/yellow]")
            console.print("[yellow]Visit https://ollama.com/download for installation instructions.[/yellow]")
    finally:
        # Ensure resources are cleaned up
        if hasattr(gateway, 'shutdown'):
            await gateway.shutdown()


async def main():
    """Run Ollama integration examples."""
    console.print(Panel(
        "[bold]This demonstration shows how to use Ollama with the Ultimate MCP Server.[/bold]\n"
        "Ollama allows you to run LLMs locally on your own machine without sending data to external services.\n\n"
        "[yellow]Make sure you have Ollama installed and running:[/yellow]\n"
        "- Download from [link]https://ollama.com/download[/link]\n"
        "- Pull models with [green]ollama pull mix_77/gemma3-qat-tools:27b[/green] or similar commands",
        title="[bold blue]🦙 Ollama Integration Demo[/bold blue]",
        border_style="blue",
        expand=False
    ))
    tracker = CostTracker()  # Instantiate tracker here
    try:
        # First show available models
        await explore_ollama_models()
        console.print()  # Add space between sections
        # Run model comparison
        await compare_ollama_models(tracker)  # Pass tracker
        console.print()  # Add space between sections
        # Run system prompt demonstration
        await demonstrate_system_prompt(tracker)  # Pass tracker
        console.print()  # Add space between sections
        # Run streaming demonstration
        await demonstrate_streaming(tracker)  # Pass tracker
        # Display final summary
        tracker.display_summary(console)  # Display summary at the end
    except Exception as e:
        logger.critical(f"Example failed: {str(e)}", emoji_key="critical", exc_info=True)
        return 1
    finally:
        # Clean up any remaining aiohttp resources by forcing garbage collection
        import gc
        gc.collect()

    logger.success("Ollama Integration Demo Finished Successfully!", emoji_key="complete")
    return 0


if __name__ == "__main__":
    exit_code = asyncio.run(main())
    sys.exit(exit_code)
```
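The demo above exercises the full provider lifecycle: model discovery, multi-model comparison, system prompts, and streaming. Stripped to its essentials, the pattern it uses for a single completion against a local Ollama model looks roughly like the sketch below. This is a condensed reading of the functions above, not a separate API; it assumes Ollama is running locally and that the named model has already been pulled (the default model name here is just a placeholder taken from the demo's own list).

```python
import asyncio

from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.core.server import Gateway


async def quick_ollama_completion(prompt: str, model: str = "llama3.2-vision:latest") -> str:
    """Minimal sketch: one completion via the Ollama provider, mirroring the demo above."""
    gateway = Gateway("ollama-quickstart", register_tools=False)
    try:
        # Same private initialization helper the demo relies on.
        await gateway._initialize_providers()
        provider = gateway.providers.get(Provider.OLLAMA.value)
        if not provider:
            raise RuntimeError("Ollama provider not available; is Ollama installed and running?")
        result = await provider.generate_completion(prompt=prompt, model=model, max_tokens=200)
        return result.text
    finally:
        if hasattr(gateway, "shutdown"):
            await gateway.shutdown()


if __name__ == "__main__":
    print(asyncio.run(quick_ollama_completion("Say hello in one short sentence.")))
```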
--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/pyodide_boot_template.html:
--------------------------------------------------------------------------------
```html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width,initial-scale=1" />
<meta http-equiv="Referrer-Policy" content="strict-origin-when-cross-origin" />
<title>Pyodide Sandbox v__PYODIDE_VERSION__ (Full)</title>
<script>
/* SessionStorage stub */
try { if(typeof window!=='undefined' && typeof window.sessionStorage!=='undefined'){void window.sessionStorage;}else{throw new Error();} }
catch(_){ if(typeof window!=='undefined'){Object.defineProperty(window, "sessionStorage", {value:{getItem:()=>null,setItem:()=>{},removeItem:()=>{},clear:()=>{},key:()=>null,length:0},configurable:true,writable:false});console.warn("sessionStorage stubbed.");} }
</script>
<style>
/* Basic styles */
body { font-family: system-ui, sans-serif; margin: 15px; background-color: #f8f9fa; color: #212529; }
.status { padding: 10px 15px; border: 1px solid #dee2e6; margin-bottom: 15px; border-radius: 0.25rem; font-size: 0.9em; }
.status-loading { background-color: #e9ecef; border-color: #ced4da; color: #495057; }
.status-ready { background-color: #d1e7dd; border-color: #a3cfbb; color: #0a3622;}
.status-error { background-color: #f8d7da; border-color: #f5c2c7; color: #842029; font-weight: bold; }
</style>
</head>
<body>
<div id="status-indicator" class="status status-loading">Initializing Sandbox...</div>
<script type="module">
// --- Constants ---
const CDN_BASE = "__CDN_BASE__";
const PYODIDE_VERSION = "__PYODIDE_VERSION__";
// *** Use the constant for packages loaded AT STARTUP ***
const CORE_PACKAGES_JSON = '__CORE_PACKAGES_JSON__';
const MEM_LIMIT_MB = parseInt("__MEM_LIMIT_MB__", 10);
const logPrefix = "[PyodideFull]"; // Updated prefix
// --- DOM Element ---
const statusIndicator = document.getElementById('status-indicator');
// --- Performance & Heap Helpers ---
const perf = (typeof performance !== 'undefined') ? performance : { now: () => Date.now() };
const now = () => perf.now();
const heapMB = () => (performance?.memory?.usedJSHeapSize ?? 0) / 1048576;
// --- Safe Proxy Destruction Helper ---
function safeDestroy(proxy, name, handlerLogPrefix) {
try {
if (proxy && typeof proxy === 'object' && typeof proxy.destroy === 'function') {
const proxyId = proxy.toString ? proxy.toString() : '(proxy)';
console.log(`${handlerLogPrefix} Destroying ${name} proxy: ${proxyId}`);
proxy.destroy();
console.log(`${handlerLogPrefix} ${name} proxy destroyed.`);
return true;
}
} catch (e) { console.warn(`${handlerLogPrefix} Error destroying ${name}:`, e?.message || e); }
return false;
}
// --- Python Runner Code Template (Reads code from its own global scope) ---
// This should be the same simple runner that worked before
const pythonRunnerTemplate = `
import sys, io, contextlib, traceback, time, base64, json
print(f"[PyRunnerFull] Starting execution...")
_stdout = io.StringIO(); _stderr = io.StringIO()
result_value = None; error_info = None; execution_ok = False; elapsed_ms = 0.0; user_code = None
try:
# Get code from the global scope *this script is run in*
print("[PyRunnerFull] Getting code from _USER_CODE_TO_EXEC...")
user_code = globals().get("_USER_CODE_TO_EXEC") # Read code set by JS onto the *passed* globals proxy
if user_code is None: raise ValueError("_USER_CODE_TO_EXEC global not found in execution scope.")
print(f"[PyRunnerFull] Code retrieved ({len(user_code)} chars). Ready to execute.")
start_time = time.time()
print(f"[PyRunnerFull] Executing user code...")
try:
with contextlib.redirect_stdout(_stdout), contextlib.redirect_stderr(_stderr):
compiled_code = compile(source=user_code, filename='<user_code>', mode='exec')
exec(compiled_code, globals()) # Execute in the provided globals
if 'result' in globals(): result_value = globals()['result']
execution_ok = True
print("[PyRunnerFull] User code execution finished successfully.")
except Exception as e:
exc_type=type(e).__name__; exc_msg=str(e); tb_str=traceback.format_exc()
error_info = {'type':exc_type, 'message':exc_msg, 'traceback':tb_str}
print(f"[PyRunnerFull] User code execution failed: {exc_type}: {exc_msg}\\n{tb_str}")
finally: elapsed_ms = (time.time() - start_time) * 1000 if 'start_time' in locals() else 0
print(f"[PyRunnerFull] Exec phase took: {elapsed_ms:.1f}ms")
except Exception as outer_err:
tb_str = traceback.format_exc(); error_info = {'type': type(outer_err).__name__, 'message': str(outer_err), 'traceback': tb_str}
print(f"[PyRunnerFull] Setup/GetCode Error: {outer_err}\\n{tb_str}")
execution_ok = False
payload_dict = {'ok':execution_ok,'stdout':_stdout.getvalue(),'stderr':_stderr.getvalue(),'elapsed':elapsed_ms,'result':result_value,'error':error_info}
print("[PyRunnerFull] Returning payload dictionary.")
payload_dict # Return value
`; // End of pythonRunnerTemplate
// --- Main Async IIFE ---
(async () => {
let BOOT_MS = 0; let t0 = 0;
let corePackagesToLoad = []; // Store parsed core packages list
try {
// === Step 1: Prepare for Load ===
t0 = now(); console.log(`${logPrefix} Boot script starting at t0=${t0}`);
statusIndicator.textContent = `Importing Pyodide v${PYODIDE_VERSION}...`;
// Parse CORE packages list BEFORE calling loadPyodide
try {
corePackagesToLoad = JSON.parse(CORE_PACKAGES_JSON);
if (!Array.isArray(corePackagesToLoad)) { corePackagesToLoad = []; }
console.log(`${logPrefix} Core packages requested for init:`, corePackagesToLoad.length > 0 ? corePackagesToLoad : '(none)');
} catch (parseErr) {
console.error(`${logPrefix} Error parsing core packages JSON:`, parseErr);
statusIndicator.textContent += ' (Error parsing core package list!)';
corePackagesToLoad = [];
}
// === Step 2: Load Pyodide (with packages option) ===
const { loadPyodide } = await import(`${CDN_BASE}/pyodide.mjs`);
console.log(`${logPrefix} Calling loadPyodide with core packages...`);
statusIndicator.textContent = `Loading Pyodide runtime & core packages...`;
window.pyodide = await loadPyodide({
indexURL: `${CDN_BASE}/`,
packages: corePackagesToLoad // Load core packages during initialization
});
const pyodide = window.pyodide;
console.log(`${logPrefix} Pyodide core and initial packages loaded. Version: ${pyodide.version}`);
statusIndicator.textContent = 'Pyodide core & packages loaded.';
// === Step 3: Verify Loaded Packages (Optional Debugging) ===
if (corePackagesToLoad.length > 0) {
const loaded = pyodide.loadedPackages ? Object.keys(pyodide.loadedPackages) : [];
console.log(`${logPrefix} Currently loaded packages:`, loaded);
corePackagesToLoad.forEach(pkg => {
if (!loaded.includes(pkg)) {
console.warn(`${logPrefix} Core package '${pkg}' requested but not loaded! Check CDN/package name.`);
statusIndicator.textContent += ` (Warn: ${pkg} failed load)`;
}
});
}
BOOT_MS = now() - t0;
console.log(`${logPrefix} Pyodide setup complete in ${BOOT_MS.toFixed(0)}ms. Heap: ${heapMB().toFixed(1)} MB`);
statusIndicator.textContent = `Pyodide Ready (${BOOT_MS.toFixed(0)}ms). Awaiting commands...`;
statusIndicator.className = 'status status-ready';
Object.freeze(corePackagesToLoad);
Object.freeze(statusIndicator); // prevents accidental re-assign
// ================== Main Message Handler ==================
console.log(`${logPrefix} Setting up main message listener...`);
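// Host <-> sandbox message protocol, as inferred from the handlers below (summary only):
//   - {id, type: "exec", code_b64, packages?, wheels?, repl_mode?} runs base64-decoded Python,
//     loading any extra packages/wheels first and reusing the persistent _MCP_REPL_NS namespace
//     when repl_mode is set (otherwise a fresh temporary namespace is used).
//   - {id, type: "reset"} deletes the persistent _MCP_REPL_NS namespace.
//   - Every request is answered through window._deliverReplyToHost with
//     {id, ok, stdout, stderr, result, elapsed, wall_ms, error}.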
window.addEventListener("message", async (ev) => {
const msg = ev.data;
if (typeof msg !== 'object' || msg === null || !msg.id) { return; }
const handlerLogPrefix = `${logPrefix}[Handler id:${msg.id}]`;
console.log(`${handlerLogPrefix} Received: type=${msg.type}`);
const wall0 = now();
const reply = { id: msg.id, ok: false, stdout:'', stderr:'', result:null, elapsed:0, wall_ms:0, error:null };
let pyResultProxy = null;
let namespaceProxy = null; // Holds the target execution scope
let micropipProxy = null;
let persistentReplProxyToDestroy = null;
try { // Outer try for message handling
if (!window.pyodide) { throw new Error('Pyodide instance lost!'); }
const pyodide = window.pyodide;
// === Handle Reset Message ===
if (msg.type === "reset") {
console.log(`${handlerLogPrefix} Reset request received.`);
try {
if (pyodide.globals.has("_MCP_REPL_NS")) {
console.log(`${handlerLogPrefix} Found _MCP_REPL_NS, attempting deletion.`);
persistentReplProxyToDestroy = pyodide.globals.get("_MCP_REPL_NS");
pyodide.globals.delete("_MCP_REPL_NS");
reply.cleared = true; console.log(`${handlerLogPrefix} _MCP_REPL_NS deleted.`);
} else {
reply.cleared = false; console.log(`${handlerLogPrefix} No _MCP_REPL_NS found.`);
}
reply.ok = true;
} catch (err) {
console.error(`${handlerLogPrefix} Error during reset operation:`, err);
reply.ok = false; reply.error = { type: err.name || 'ResetError', message: `Reset failed: ${err.message || err}`, traceback: err.stack };
} finally {
safeDestroy(persistentReplProxyToDestroy, "Persistent REPL (on reset)", handlerLogPrefix);
console.log(`${handlerLogPrefix} Delivering reset response via callback (ok=${reply.ok})`);
if(typeof window._deliverReplyToHost === 'function') { window._deliverReplyToHost(reply); }
else { console.error(`${handlerLogPrefix} Host callback _deliverReplyToHost not found!`); }
}
return; // Exit handler
} // End Reset
// === Ignore Non-Exec ===
if (msg.type !== "exec") { console.log(`${handlerLogPrefix} Ignoring non-exec type: ${msg.type}`); return; }
// ================== Handle Exec Message ==================
console.log(`${handlerLogPrefix} Processing exec request (repl=${msg.repl_mode})`);
/* === Step 1: Load *Additional* Packages/Wheels === */
// Filter out packages already loaded during init
const currentlyLoaded = pyodide.loadedPackages ? Object.keys(pyodide.loadedPackages) : [];
const additionalPackagesToLoad = msg.packages?.filter(p => !currentlyLoaded.includes(p)) || [];
if (additionalPackagesToLoad.length > 0) {
const pkgs = additionalPackagesToLoad.join(", ");
console.log(`${handlerLogPrefix} Loading additional packages: ${pkgs}`);
await pyodide.loadPackage(additionalPackagesToLoad).catch(err => {
throw new Error(`Additional package loading failed: ${pkgs} - ${err?.message || err}`);
});
console.log(`${handlerLogPrefix} Additional packages loaded: ${pkgs}`);
}
// Load wheels (ensure micropip is available)
if (msg.wheels?.length) {
const whls = msg.wheels.join(", ");
console.log(`${handlerLogPrefix} Loading wheels: ${whls}`);
// Check if micropip needs loading (it might be a core package now)
if (!pyodide.loadedPackages || !pyodide.loadedPackages['micropip']) {
console.log(`${handlerLogPrefix} Loading micropip for wheels...`);
await pyodide.loadPackage("micropip").catch(err => {
throw new Error(`Failed to load micropip: ${err?.message || err}`);
});
}
micropipProxy = pyodide.pyimport("micropip");
console.log(`${handlerLogPrefix} Installing wheels via micropip...`);
for (const whl of msg.wheels) {
console.log(`${handlerLogPrefix} Installing wheel: ${whl}`);
await micropipProxy.install(whl).catch(err => {
let pyError = ""; if (err instanceof pyodide.ffi.PythonError) pyError = `${err.type}: `;
throw new Error(`Wheel install failed for ${whl}: ${pyError}${err?.message || err}`);
});
console.log(`${handlerLogPrefix} Wheel installed: ${whl}`);
}
// Micropip proxy destroyed in finally
}
/* === Step 2: Prepare Namespace Proxy (REPL aware) === */
if (msg.repl_mode) {
if (pyodide.globals.has("_MCP_REPL_NS")) {
console.log(`${handlerLogPrefix} Reusing persistent REPL namespace.`);
namespaceProxy = pyodide.globals.get("_MCP_REPL_NS");
if (!namespaceProxy || typeof namespaceProxy.set !== 'function') {
console.warn(`${handlerLogPrefix} REPL namespace invalid. Resetting.`);
safeDestroy(namespaceProxy, "Invalid REPL", handlerLogPrefix);
namespaceProxy = pyodide.toPy({'__name__': '__main__'});
pyodide.globals.set("_MCP_REPL_NS", namespaceProxy);
}
} else {
console.log(`${handlerLogPrefix} Initializing new persistent REPL namespace.`);
namespaceProxy = pyodide.toPy({'__name__': '__main__'});
pyodide.globals.set("_MCP_REPL_NS", namespaceProxy);
}
} else {
console.log(`${handlerLogPrefix} Creating fresh temporary namespace.`);
namespaceProxy = pyodide.toPy({'__name__': '__main__'});
}
if (!namespaceProxy || typeof namespaceProxy.set !== 'function') { // Final check
throw new Error("Failed to obtain valid namespace proxy.");
}
/* === Step 3: Prepare and Set User Code INTO Namespace Proxy === */
let userCode = '';
try {
if (typeof msg.code_b64 !== 'string' || msg.code_b64 === '') throw new Error("Missing/empty code_b64");
userCode = atob(msg.code_b64); // JS base64 decode
} catch (decodeErr) { throw new Error(`Base64 decode failed: ${decodeErr.message}`); }
console.log(`${handlerLogPrefix} Setting _USER_CODE_TO_EXEC on target namespace proxy...`);
namespaceProxy.set("_USER_CODE_TO_EXEC", userCode); // Set ON THE TARGET PROXY
/* === Step 4: Execute Runner === */
console.log(`${handlerLogPrefix} Executing Python runner...`);
// Pass the namespaceProxy (which now contains the code) as globals
pyResultProxy = await pyodide.runPythonAsync(pythonRunnerTemplate, { globals: namespaceProxy });
// Cleanup the code variable from the namespaceProxy afterwards
console.log(`${handlerLogPrefix} Deleting _USER_CODE_TO_EXEC from namespace proxy...`);
if (namespaceProxy.has && namespaceProxy.has("_USER_CODE_TO_EXEC")) { // Check if method exists
namespaceProxy.delete("_USER_CODE_TO_EXEC");
} else { console.warn(`${handlerLogPrefix} Could not check/delete _USER_CODE_TO_EXEC from namespace.`); }
reply.wall_ms = now() - wall0;
console.log(`${handlerLogPrefix} Python runner finished. Wall: ${reply.wall_ms.toFixed(0)}ms`);
/* === Step 5: Process Result Proxy === */
if (!pyResultProxy || typeof pyResultProxy.toJs !== 'function') { throw new Error(`Runner returned invalid result.`); }
console.log(`${handlerLogPrefix} Converting Python result payload...`);
let jsResultPayload = pyResultProxy.toJs({ dict_converter: Object.fromEntries });
Object.assign(reply, jsResultPayload); // Merge python results
reply.ok = jsResultPayload.ok;
} catch (err) { // Catch ANY error during the process
reply.wall_ms = now() - wall0;
console.error(`${handlerLogPrefix} *** ERROR DURING EXECUTION PROCESS ***:`, err);
reply.ok = false;
reply.error = { type: err.name || 'JavaScriptError', message: err.message || String(err), traceback: err.stack || null };
if (err instanceof pyodide.ffi.PythonError) { reply.error.type = err.type || 'PythonError'; }
reply.stdout = reply.stdout || ''; reply.stderr = reply.stderr || ''; reply.result = reply.result || null; reply.elapsed = reply.elapsed || 0;
} finally {
/* === Step 6: Cleanup Proxies === */
console.log(`${handlerLogPrefix} Entering finally block for cleanup.`);
safeDestroy(pyResultProxy, "Result", handlerLogPrefix);
safeDestroy(micropipProxy, "Micropip", handlerLogPrefix);
// Only destroy namespace if it was temporary (non-REPL)
if (!msg?.repl_mode) { // Use optional chaining
safeDestroy(namespaceProxy, "Temporary Namespace", handlerLogPrefix);
} else {
console.log(`${handlerLogPrefix} Skipping destruction of persistent REPL namespace proxy.`);
}
console.log(`${handlerLogPrefix} Cleanup finished.`);
/* === Step 7: Send Reply via Exposed Function === */
console.log(`${handlerLogPrefix} *** Delivering final response via exposed function *** (ok=${reply.ok})`);
console.log(`${handlerLogPrefix} Reply payload:`, JSON.stringify(reply, null, 2));
try {
if (typeof window._deliverReplyToHost === 'function') { window._deliverReplyToHost(reply); console.log(`${handlerLogPrefix} Reply delivered.`); }
else { console.error(`${handlerLogPrefix} !!! ERROR: Host function _deliverReplyToHost not found!`); }
} catch (deliveryErr) { console.error(`${handlerLogPrefix} !!! FAILED TO DELIVER REPLY !!!`, deliveryErr); }
} // End finally block
/* Step 8: Heap Watchdog */
const currentHeapMB = heapMB();
if (Number.isFinite(currentHeapMB) && currentHeapMB > MEM_LIMIT_MB) {
console.warn(`${handlerLogPrefix}[WATCHDOG] Heap ${currentHeapMB.toFixed(0)}MB > limit ${MEM_LIMIT_MB}MB. Closing!`);
statusIndicator.textContent = `Heap limit exceeded (${currentHeapMB.toFixed(0)}MB). Closing...`;
statusIndicator.className = 'status status-error';
setTimeout(() => window.close(), 200);
}
}); // End message listener
console.log(`${logPrefix} Main message listener active.`);
window.postMessage({ ready: true, boot_ms: BOOT_MS, id: "pyodide_ready" });
console.log(`${logPrefix} Full Sandbox Ready.`);
} catch (err) { // Catch Initialization Errors
const initErrorMsg = `FATAL: Pyodide init failed: ${err.message || err}`;
console.error(`${logPrefix} ${initErrorMsg}`, err.stack || '(no stack)', err);
statusIndicator.textContent = initErrorMsg; statusIndicator.className = 'status status-error';
try { window.postMessage({ id: "pyodide_init_error", ok: false, error: { type: err.name || 'InitError', message: initErrorMsg, traceback: err.stack || null } });
} catch (postErr) { console.error(`${logPrefix} Failed to post init error:`, postErr); }
}
})(); // End main IIFE
</script>
</body>
</html>
```
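The handler above reads `code_b64`, `packages`, `wheels`, and `repl_mode` from the incoming message and hands its reply back through `window._deliverReplyToHost`. A minimal Python-side sketch of that payload shape follows; the field names are taken from the handler itself, while the surrounding transport (how the host actually posts the message into the sandbox page) is an assumption and not shown.

```python
import base64

# Sketch of the message shape the sandbox handler expects (transport omitted).
user_code = "import sys\nprint(sys.version)"

sandbox_message = {
    # Base64-encoded source; the handler decodes it with atob() and runs it
    # via the Python runner template.
    "code_b64": base64.b64encode(user_code.encode("utf-8")).decode("ascii"),
    "packages": ["numpy"],   # extra Pyodide packages loaded via pyodide.loadPackage
    "wheels": [],            # wheel URLs installed via micropip
    "repl_mode": True,       # reuse the persistent _MCP_REPL_NS namespace between calls
}

# The reply delivered via _deliverReplyToHost carries at least:
# ok, stdout, stderr, result, error, elapsed, and wall_ms.
```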
--------------------------------------------------------------------------------
/ultimate_mcp_server/clients/rag_client.py:
--------------------------------------------------------------------------------
```python
"""High-level client for RAG (Retrieval-Augmented Generation) operations."""
from typing import Any, Dict, List, Optional
from ultimate_mcp_server.services.knowledge_base import (
get_knowledge_base_manager,
get_knowledge_base_retriever,
get_rag_service,
)
from ultimate_mcp_server.utils import get_logger
logger = get_logger("ultimate_mcp_server.clients.rag")
class RAGClient:
"""
High-level client for Retrieval-Augmented Generation (RAG) operations.
The RAGClient provides a simplified, unified interface for building and using
RAG systems within the MCP ecosystem. It encapsulates all the key operations in
the RAG workflow, from knowledge base creation and document ingestion to context
retrieval and LLM-augmented generation.
RAG is a technique that enhances LLM capabilities by retrieving relevant information
from external knowledge bases before generation, allowing models to access information
beyond their training data and produce more accurate, up-to-date responses.
Key capabilities:
- Knowledge base creation and management
- Document ingestion with automatic chunking
- Semantic retrieval of relevant context
- LLM generation with retrieved context
- Various retrieval methods (vector, hybrid, keyword)
Architecture:
The client follows a modular architecture with three main components:
1. Knowledge Base Manager: Handles creation, deletion, and document ingestion
2. Knowledge Base Retriever: Responsible for context retrieval using various methods
3. RAG Service: Combines retrieval with LLM generation for complete RAG workflow
Performance Considerations:
- Document chunking size affects both retrieval quality and storage requirements
- Retrieval method selection impacts accuracy vs. speed tradeoffs:
* Vector search: Fast with good semantic understanding but may miss keyword matches
* Keyword search: Good for exact matches but misses semantic similarities
* Hybrid search: Most comprehensive but computationally more expensive
    - The top_k parameter trades off providing sufficient context against diluting relevance
- Different LLM models may require different prompt templates for optimal performance
This client abstracts away the complexity of the underlying vector stores,
embeddings, and retrieval mechanisms, providing a simple API for RAG operations.
Example usage:
```python
# Create a RAG client
client = RAGClient()
# Create a knowledge base
await client.create_knowledge_base(
"company_docs",
"Company documentation and policies"
)
# Add documents to the knowledge base with metadata
await client.add_documents(
"company_docs",
documents=[
"Our return policy allows returns within 30 days of purchase with receipt.",
"Product warranties cover manufacturing defects for a period of one year."
],
metadatas=[
{"source": "policies/returns.pdf", "page": 1, "department": "customer_service"},
{"source": "policies/warranty.pdf", "page": 3, "department": "legal"}
],
chunk_size=500,
chunk_method="semantic"
)
# Retrieve context without generation (for inspection or custom handling)
context = await client.retrieve(
"company_docs",
query="What is our return policy?",
top_k=3,
retrieval_method="hybrid"
)
# Print retrieved context and sources
for i, (doc, meta) in enumerate(zip(context["documents"], context["metadatas"])):
print(f"Source: {meta.get('source', 'unknown')} | Score: {context['distances'][i]:.3f}")
print(f"Content: {doc[:100]}...\n")
# Generate a response using RAG with specific provider and template
result = await client.generate_with_rag(
"company_docs",
query="Explain our warranty coverage for electronics",
provider="openai",
model="gpt-4",
template="customer_service_response",
temperature=0.3,
retrieval_method="hybrid"
)
print(result["response"])
print("\nSources:")
for source in result.get("sources", []):
print(f"- {source['metadata'].get('source')}")
```
"""
def __init__(self):
"""Initialize the RAG client."""
self.kb_manager = get_knowledge_base_manager()
self.kb_retriever = get_knowledge_base_retriever()
self.rag_service = get_rag_service()
async def create_knowledge_base(
self,
name: str,
description: Optional[str] = None,
overwrite: bool = False
) -> Dict[str, Any]:
"""Create a knowledge base.
Args:
name: The name of the knowledge base
description: Optional description
overwrite: Whether to overwrite an existing KB with the same name
Returns:
Result of the operation
"""
logger.info(f"Creating knowledge base: {name}", emoji_key="processing")
try:
result = await self.kb_manager.create_knowledge_base(
name=name,
description=description,
overwrite=overwrite
)
logger.success(f"Knowledge base created: {name}", emoji_key="success")
return result
except Exception as e:
logger.error(f"Failed to create knowledge base: {str(e)}", emoji_key="error")
raise
async def add_documents(
self,
knowledge_base_name: str,
documents: List[str],
metadatas: Optional[List[Dict[str, Any]]] = None,
chunk_size: int = 1000,
chunk_method: str = "semantic"
) -> Dict[str, Any]:
"""Add documents to the knowledge base.
Args:
knowledge_base_name: Name of the knowledge base to add to
documents: List of document texts
metadatas: Optional list of metadata dictionaries
chunk_size: Size of chunks to split documents into
chunk_method: Method to use for chunking ('simple', 'semantic', etc.)
Returns:
Result of the operation
"""
logger.info(f"Adding documents to knowledge base: {knowledge_base_name}", emoji_key="processing")
try:
result = await self.kb_manager.add_documents(
knowledge_base_name=knowledge_base_name,
documents=documents,
metadatas=metadatas,
chunk_size=chunk_size,
chunk_method=chunk_method
)
added_count = result.get("added_count", 0)
logger.success(f"Added {added_count} documents to knowledge base", emoji_key="success")
return result
except Exception as e:
logger.error(f"Failed to add documents: {str(e)}", emoji_key="error")
raise
async def list_knowledge_bases(self) -> List[Any]:
"""List all knowledge bases.
Returns:
List of knowledge base information
"""
logger.info("Retrieving list of knowledge bases", emoji_key="processing")
try:
knowledge_bases = await self.kb_manager.list_knowledge_bases()
return knowledge_bases
except Exception as e:
logger.error(f"Failed to list knowledge bases: {str(e)}", emoji_key="error")
raise
async def retrieve(
self,
knowledge_base_name: str,
query: str,
top_k: int = 3,
retrieval_method: str = "vector"
) -> Dict[str, Any]:
"""
Retrieve relevant documents from a knowledge base for a given query.
This method performs the retrieval stage of RAG, finding the most relevant
documents in the knowledge base based on the query. The method is useful
for standalone retrieval operations or when you want to examine retrieved
context before generating a response.
The retrieval process works by:
1. Converting the query into a vector representation (embedding)
2. Finding documents with similar embeddings and/or matching keywords
3. Ranking and returning the most relevant documents
Available retrieval methods:
- "vector": Embedding-based similarity search using cosine distance
Best for conceptual/semantic queries where exact wording may differ
- "keyword": Traditional text search using BM25 or similar algorithms
Best for queries with specific terms that must be matched
- "hybrid": Combines vector and keyword approaches with a weighted blend
Good general-purpose approach that balances semantic and keyword matching
- "rerank": Two-stage retrieval that first gets candidates, then reranks them
More computationally intensive but often more accurate
Optimization strategies:
- For factual queries with specific terminology, use "keyword" or "hybrid"
- For conceptual or paraphrased queries, use "vector"
- For highest accuracy at cost of performance, use "rerank"
- Adjust top_k based on document length; shorter documents may need higher top_k
- Pre-filter by metadata before retrieval when targeting specific sections
Understanding retrieval metrics:
- "distances" represent similarity scores where lower values indicate higher similarity
for vector search, and higher values indicate better matches for keyword search
- Scores are normalized differently between retrieval methods, so direct
comparison between methods is not meaningful
- Score thresholds for "good matches" vary based on embedding model and content domain
Args:
knowledge_base_name: Name of the knowledge base to search
query: The search query (question or keywords)
top_k: Maximum number of documents to retrieve (default: 3)
retrieval_method: Method to use for retrieval ("vector", "keyword", "hybrid", "rerank")
Returns:
Dictionary containing:
- "documents": List of retrieved documents (text chunks)
- "metadatas": List of metadata for each document (source info, etc.)
- "distances": List of similarity scores or relevance metrics
(interpretation depends on retrieval_method)
- "query": The original query
- "retrieval_method": The method used for retrieval
- "processing_time_ms": Time taken for retrieval in milliseconds
Raises:
ValueError: If knowledge_base_name doesn't exist or query is invalid
Exception: If the retrieval process fails
Example:
```python
# Retrieve context about product returns
results = await rag_client.retrieve(
knowledge_base_name="support_docs",
query="How do I return a product?",
top_k=5,
retrieval_method="hybrid"
)
# Check if we got high-quality matches
if results["documents"] and min(results["distances"]) < 0.3: # Good match threshold
print("Found relevant information!")
else:
print("No strong matches found, consider reformulating the query")
# Display retrieved documents and their sources with scores
for i, (doc, meta) in enumerate(zip(results["documents"], results["metadatas"])):
score = results["distances"][i]
score_indicator = "🟢" if score < 0.3 else "🟡" if score < 0.6 else "🔴"
print(f"{score_indicator} Result {i+1} (score: {score:.3f}):")
print(f"Source: {meta.get('source', 'unknown')}")
print(doc[:100] + "...\n")
```
"""
logger.info(f"Retrieving context for query: '{query}'", emoji_key="processing")
try:
results = await self.kb_retriever.retrieve(
knowledge_base_name=knowledge_base_name,
query=query,
top_k=top_k,
retrieval_method=retrieval_method
)
return results
except Exception as e:
logger.error(f"Failed to retrieve context: {str(e)}", emoji_key="error")
raise
async def generate_with_rag(
self,
knowledge_base_name: str,
query: str,
provider: str = "gemini",
model: Optional[str] = None,
template: str = "rag_default",
temperature: float = 0.3,
top_k: int = 3,
retrieval_method: str = "hybrid",
include_sources: bool = True
) -> Dict[str, Any]:
"""
Generate a response using Retrieval-Augmented Generation (RAG).
This method performs the complete RAG process:
1. Retrieves relevant documents from the specified knowledge base based on the query
2. Constructs a prompt that includes both the query and retrieved context
3. Sends the augmented prompt to the LLM to generate a response
4. Optionally includes source information for transparency and citation
The retrieval process can use different methods:
- "vector": Pure semantic/embedding-based similarity search (good for conceptual queries)
- "keyword": Traditional keyword-based search (good for specific terms or phrases)
- "hybrid": Combines vector and keyword approaches (good general-purpose approach)
- "rerank": Uses a two-stage approach with retrieval and reranking
Args:
knowledge_base_name: Name of the knowledge base to query for relevant context
query: The user's question or request
provider: LLM provider to use for generation (e.g., "openai", "anthropic", "gemini")
model: Specific model to use (if None, uses provider's default)
template: Prompt template name that defines how to format the RAG prompt
Different templates can be optimized for different use cases
temperature: Sampling temperature for controlling randomness (0.0-1.0)
Lower values recommended for factual RAG responses
top_k: Number of relevant documents to retrieve and include in the context
Higher values provide more context but may dilute relevance
retrieval_method: The method to use for retrieving documents ("vector", "keyword", "hybrid")
include_sources: Whether to include source information in the output for citations
Returns:
Dictionary containing:
- "response": The generated text response from the LLM
- "sources": List of source documents and their metadata (if include_sources=True)
- "context": The retrieved context that was used for generation
- "prompt": The full prompt that was sent to the LLM (useful for debugging)
- "tokens": Token usage information (input, output, total)
- "processing_time_ms": Total processing time in milliseconds
Raises:
ValueError: If knowledge_base_name or query is invalid
Exception: If retrieval or generation fails
Example:
```python
# Generate a response using RAG with custom parameters
result = await rag_client.generate_with_rag(
knowledge_base_name="financial_docs",
query="What were our Q3 earnings?",
provider="openai",
model="gpt-4",
temperature=0.1,
top_k=5,
retrieval_method="hybrid"
)
# Access the response and sources
print(result["response"])
for src in result["sources"]:
print(f"- {src['metadata']['source']} (relevance: {src['score']})")
```
"""
logger.info(f"Generating RAG response for: '{query}'", emoji_key="processing")
try:
result = await self.rag_service.generate_with_rag(
knowledge_base_name=knowledge_base_name,
query=query,
provider=provider,
model=model,
template=template,
temperature=temperature,
top_k=top_k,
retrieval_method=retrieval_method,
include_sources=include_sources
)
return result
except Exception as e:
logger.error(f"Failed to call RAG service: {str(e)}", emoji_key="error")
raise
async def delete_knowledge_base(self, name: str) -> Dict[str, Any]:
"""Delete a knowledge base.
Args:
name: Name of the knowledge base to delete
Returns:
Result of the operation
"""
logger.info(f"Deleting knowledge base: {name}", emoji_key="processing")
try:
result = await self.kb_manager.delete_knowledge_base(name=name)
logger.success(f"Knowledge base {name} deleted successfully", emoji_key="success")
return result
except Exception as e:
logger.error(f"Failed to delete knowledge base: {str(e)}", emoji_key="error")
raise
async def reset_knowledge_base(self, knowledge_base_name: str) -> None:
"""
Reset (delete and recreate) a knowledge base.
This method completely removes an existing knowledge base and creates a new
empty one with the same name. This is useful when you need to:
- Remove all documents from a knowledge base efficiently
- Fix a corrupted knowledge base
- Change the underlying embedding model without renaming the knowledge base
- Update document chunking strategy for an entire collection
- Clear outdated information before a complete refresh
Performance Considerations:
- Resetting is significantly faster than removing documents individually
- For large knowledge bases, resetting and bulk re-adding documents can be
orders of magnitude more efficient than incremental updates
- New documents added after reset will use any updated embedding models or
chunking strategies configured in the system
Data Integrity:
- This operation preserves knowledge base configuration but removes all content
- The knowledge base name and any associated permissions remain intact
- Custom configuration settings on the knowledge base will be preserved
if the knowledge base service supports configuration persistence
WARNING: This operation is irreversible. All documents and their embeddings
will be permanently deleted. Consider backing up important data before resetting.
Disaster Recovery:
Before resetting a production knowledge base, consider these strategies:
1. Create a backup by exporting documents and metadata if the feature is available
2. Maintain source documents in original form outside the knowledge base
3. Document the ingestion pipeline to reproduce the knowledge base if needed
4. Consider creating a temporary duplicate before resetting critical knowledge bases
The reset process:
1. Deletes the entire knowledge base collection/index
2. Creates a new empty knowledge base with the same name
3. Re-initializes any associated metadata or settings
Args:
knowledge_base_name: Name of the knowledge base to reset
Returns:
None
Raises:
ValueError: If the knowledge base doesn't exist
Exception: If deletion or recreation fails
Example:
```python
# Backup critical metadata before reset (if needed)
kb_info = await rag_client.list_knowledge_bases()
kb_config = next((kb for kb in kb_info if kb['name'] == 'product_documentation'), None)
# Reset a knowledge base that may have outdated or corrupted data
await rag_client.reset_knowledge_base("product_documentation")
# After resetting, re-add documents with potentially improved chunking strategy
await rag_client.add_documents(
knowledge_base_name="product_documentation",
documents=updated_docs,
metadatas=updated_metadatas,
chunk_size=800, # Updated chunk size better suited for the content
chunk_method="semantic" # Using semantic chunking for better results
)
```
"""
logger.info(f"Deleting knowledge base: {knowledge_base_name}", emoji_key="processing")
try:
result = await self.kb_manager.delete_knowledge_base(name=knowledge_base_name)
logger.success(f"Knowledge base {knowledge_base_name} deleted successfully", emoji_key="success")
return result
except Exception as e:
logger.error(f"Failed to delete knowledge base: {str(e)}", emoji_key="error")
raise
```
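The docstring examples above use bare `await`; the sketch below shows one way to drive the client from a plain script. The knowledge-base name and documents are placeholders, and only methods defined on the class above are used.

```python
import asyncio

from ultimate_mcp_server.clients.rag_client import RAGClient


async def main() -> None:
    client = RAGClient()
    await client.create_knowledge_base("demo_kb", "Scratch knowledge base", overwrite=True)
    await client.add_documents(
        "demo_kb",
        documents=["Returns are accepted within 30 days of purchase."],
        metadatas=[{"source": "policy.txt"}],
    )
    results = await client.retrieve("demo_kb", query="What is the return policy?", top_k=2)
    for doc in results.get("documents", []):
        print(doc[:80])


if __name__ == "__main__":
    asyncio.run(main())
```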
--------------------------------------------------------------------------------
/ultimate_mcp_server/services/document.py:
--------------------------------------------------------------------------------
```python
"""Document processing service for chunking and analyzing text documents."""
import re
from typing import List
from ultimate_mcp_server.utils import get_logger
logger = get_logger(__name__)
class DocumentProcessor:
"""
Service for intelligent text document processing, chunking, and preparation.
The DocumentProcessor provides sophisticated document handling capabilities
focused on breaking down long documents into meaningful, properly-sized chunks
optimized for various downstream NLP tasks such as embedding generation,
semantic search, and RAG (Retrieval Augmented Generation).
Key Features:
- Multiple chunking strategies optimized for different content types
- Configurable chunk size and overlap parameters
- Semantic-aware chunking that preserves context and meaning
- Sentence boundary detection for natural text segmentation
- Token-based chunking for precise size control
- Singleton implementation for efficient resource usage
Chunking Methods:
1. Semantic Chunking: Preserves paragraph structure and semantic meaning,
preventing splits that would break logical content boundaries. Best for
maintaining context in well-structured documents.
2. Sentence Chunking: Splits documents at sentence boundaries, ensuring
no sentence is broken across chunks. Ideal for natural language text
where sentence integrity is important.
3. Token Chunking: Divides text based on approximate token counts without
special consideration for semantic boundaries. Provides the most precise
control over chunk size for token-limited systems.
Each method implements configurable overlap between chunks to maintain
context across chunk boundaries, ensuring information isn't lost when a
concept spans multiple chunks.
Usage Example:
```python
processor = get_document_processor()
# Chunk a document with default settings (token-based)
chunks = await processor.chunk_document(
document=long_text,
chunk_size=1000,
chunk_overlap=200
)
# Use semantic chunking for a well-structured document
semantic_chunks = await processor.chunk_document(
document=article_text,
chunk_size=1500,
chunk_overlap=150,
method="semantic"
)
# Process chunks for embedding or RAG
for chunk in chunks:
# Process each chunk...
```
Note:
This service implements the singleton pattern, ensuring only one instance
exists throughout the application. Always use the get_document_processor()
function to obtain the shared instance rather than creating instances directly.
"""
_instance = None
def __new__(cls, *args, **kwargs):
"""Create a singleton instance."""
if cls._instance is None:
cls._instance = super(DocumentProcessor, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
"""Initialize the document processor."""
# Only initialize once for singleton
if getattr(self, "_initialized", False):
return
logger.info("Document processor initialized", extra={"emoji_key": "success"})
self._initialized = True
async def chunk_document(
self,
document: str,
chunk_size: int = 1000,
chunk_overlap: int = 200,
method: str = "token"
) -> List[str]:
"""
Split a document into optimally sized, potentially overlapping chunks.
This method intelligently divides a document into smaller segments using
one of several chunking strategies, balancing chunk size requirements with
preserving semantic coherence. The chunking process is critical for preparing
documents for embedding, retrieval, and other NLP operations that have
input size limitations or benefit from focused context.
Chunking Methods:
- "token": (Default) Splits text based on approximate token count.
Simple and precise for size control, but may break semantic units.
- "sentence": Preserves sentence boundaries, ensuring no sentence is broken
across chunks. Better for maintaining local context and readability.
- "semantic": Most sophisticated approach that attempts to preserve paragraph
structure and semantic coherence. Best for maintaining document meaning
but may result in more size variation between chunks.
The chunk_size parameter is approximate for all methods, as they prioritize
maintaining semantic boundaries where appropriate. The actual size of returned
chunks may vary, especially when using sentence or semantic methods.
Chunk overlap creates a sliding window effect, where the end of one chunk
overlaps with the beginning of the next. This helps maintain context across
chunk boundaries and improves retrieval quality by ensuring concepts that
span multiple chunks can still be found.
Selecting Parameters:
- For embedding models with strict token limits: Use "token" with chunk_size
set safely below the model's limit
- For maximizing context preservation: Use "semantic" with larger overlap
- For balancing size precision and sentence integrity: Use "sentence"
- Larger overlap (25-50% of chunk_size) improves retrieval quality but
increases storage and processing requirements
Args:
document: Text content to be chunked
chunk_size: Target size of each chunk in approximate tokens (default: 1000)
chunk_overlap: Number of tokens to overlap between chunks (default: 200)
method: Chunking strategy to use ("token", "sentence", or "semantic")
Returns:
List of text chunks derived from the original document
Note:
Returns an empty list if the input document is empty or None.
The token estimation is approximate and based on whitespace splitting,
not a true tokenizer, so actual token counts may differ when processed
by specific models.
"""
if not document:
return []
logger.debug(
f"Chunking document using method '{method}' (size: {chunk_size}, overlap: {chunk_overlap})",
extra={"emoji_key": "processing"}
)
if method == "semantic":
return await self._chunk_semantic(document, chunk_size, chunk_overlap)
elif method == "sentence":
return await self._chunk_by_sentence(document, chunk_size, chunk_overlap)
else:
# Default to token-based chunking
return await self._chunk_by_tokens(document, chunk_size, chunk_overlap)
async def _chunk_by_tokens(
self,
document: str,
chunk_size: int = 1000,
chunk_overlap: int = 200
) -> List[str]:
"""
Split document into chunks by approximate token count without preserving semantic structures.
This is the most straightforward chunking method, dividing text based solely
on approximate token counts without special consideration for sentence or
paragraph boundaries. It provides the most predictable and precise control
over chunk sizes at the cost of potentially breaking semantic units like
sentences or paragraphs.
Algorithm implementation:
1. Approximates tokens by splitting text on whitespace (creating "words")
2. Divides the document into chunks of specified token length
3. Implements sliding window overlaps between consecutive chunks
4. Handles edge cases like empty documents and final chunks
The token approximation used is simple whitespace splitting, which provides
a reasonable estimation for most Western languages and common tokenization
schemes. While not as accurate as model-specific tokenizers, it offers a
good balance between performance and approximation quality for general use.
Chunk overlap is implemented by including tokens from the end of one chunk
at the beginning of the next, creating a sliding window effect that helps
maintain context across chunk boundaries.
This method is ideal for:
- Working with strict token limits in downstream models
- Processing text where exact chunk sizes are more important than
preserving semantic structures
- High-volume processing where simplicity and performance are priorities
- Text with unusual or inconsistent formatting where sentence/paragraph
detection might fail
Args:
document: Text content to split by tokens
chunk_size: Number of tokens (words) per chunk
chunk_overlap: Number of tokens to overlap between chunks
Returns:
List of text chunks of approximately equal token counts
Note:
True token counts in NLP models may differ from this approximation,
especially for models with subword tokenization. For applications
requiring exact token counts, consider using the model's specific
tokenizer for more accurate size estimates.
"""
# Simple token estimation (split by whitespace)
words = document.split()
# No words, return empty list
if not words:
return []
# Simple chunking
chunks = []
start = 0
while start < len(words):
# Calculate end position with potential overlap
end = min(start + chunk_size, len(words))
# Create chunk
chunk = " ".join(words[start:end])
chunks.append(chunk)
# Move to next chunk with overlap
start = end - chunk_overlap
# Avoid getting stuck at the end
if start >= len(words) - chunk_overlap:
break
logger.debug(
f"Split document into {len(chunks)} chunks by token",
extra={"emoji_key": "processing"}
)
return chunks
async def _chunk_by_sentence(
self,
document: str,
chunk_size: int = 1000,
chunk_overlap: int = 200
) -> List[str]:
"""
Split document into chunks by preserving complete sentences.
This chunking method respects sentence boundaries when dividing documents,
ensuring that no sentence is fragmented across multiple chunks. It balances
chunk size requirements with maintaining the integrity of natural language
structures, producing more readable and semantically coherent chunks than
simple token-based approaches.
Algorithm details:
1. Detects sentence boundaries using regular expressions that handle:
- Standard end punctuation (.!?)
- Common abbreviations (Mr., Dr., etc.)
- Edge cases like decimal numbers or acronyms
2. Builds chunks by adding complete sentences until the target chunk size
is approached
3. Creates overlap between chunks by including ending sentences from the
previous chunk at the beginning of the next chunk
4. Maintains approximate token count targets while prioritizing sentence
integrity
The sentence detection uses a regex pattern that aims to balance accuracy
with simplicity and efficiency. It identifies likely sentence boundaries by:
- Looking for punctuation marks followed by whitespace
- Excluding common patterns that are not sentence boundaries (e.g., "Mr.")
- Handling basic cases like quotes and parentheses
This method is ideal for:
- Natural language text where sentence flow is important
- Content where breaking mid-sentence would harm readability or context
- General purpose document processing where semantic units matter
- Documents that don't have clear paragraph structure
Args:
document: Text content to split by sentences
chunk_size: Target approximate size per chunk in tokens
chunk_overlap: Number of tokens to overlap between chunks
Returns:
List of document chunks with complete sentences
Note:
The sentence detection uses regex patterns that work well for most
standard English text but may not handle all edge cases perfectly.
For specialized text with unusual punctuation patterns, additional
customization may be needed.
"""
# Simple sentence splitting (not perfect but works for most cases)
sentence_delimiters = r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?|\!)\s'
sentences = re.split(sentence_delimiters, document)
sentences = [s.strip() for s in sentences if s.strip()]
# No sentences, return empty list
if not sentences:
return []
# Chunk by sentences, trying to reach target size
chunks = []
current_chunk = []
current_size = 0
for sentence in sentences:
# Estimate size in tokens (approximate)
sentence_size = len(sentence.split())
# If adding this sentence exceeds the chunk size and we have content,
# finalize the current chunk
if current_chunk and current_size + sentence_size > chunk_size:
chunks.append(" ".join(current_chunk))
# Start new chunk with overlap
overlap_size = 0
overlap_chunk = []
# Add sentences from the end of previous chunk for overlap
for s in reversed(current_chunk):
s_size = len(s.split())
if overlap_size + s_size <= chunk_overlap:
overlap_chunk.insert(0, s)
overlap_size += s_size
else:
break
current_chunk = overlap_chunk
current_size = overlap_size
# Add current sentence
current_chunk.append(sentence)
current_size += sentence_size
# Add the last chunk if not empty
if current_chunk:
chunks.append(" ".join(current_chunk))
logger.debug(
f"Split document into {len(chunks)} chunks by sentence",
extra={"emoji_key": "processing"}
)
return chunks
async def _chunk_semantic(
self,
document: str,
chunk_size: int = 1000,
chunk_overlap: int = 200
) -> List[str]:
"""
Split document into chunks by semantic meaning, preserving paragraph structure.
This advanced chunking method attempts to maintain the semantic coherence and
natural structure of the document by respecting paragraph boundaries whenever
possible. It implements a hierarchical approach that:
1. First divides the document by paragraph breaks (blank lines)
2. Evaluates each paragraph for length
3. Keeps short and medium paragraphs intact to preserve their meaning
4. Further splits overly long paragraphs using sentence boundary detection
5. Assembles chunks with appropriate overlap for context continuity
The algorithm prioritizes three key aspects of document structure:
- Paragraph integrity: Treats paragraphs as coherent units of thought
- Logical flow: Maintains document organization when possible
- Size constraints: Respects chunk size limitations for downstream processing
Implementation details:
- Double newlines (\n\n) are treated as paragraph boundaries
- If a document lacks clear paragraph structure (e.g., single paragraph),
it falls back to sentence-based chunking
- For paragraphs exceeding the chunk size, sentence-based chunking is applied
- Context preservation is achieved by ensuring the last paragraph of a chunk
becomes the first paragraph of the next chunk (when appropriate)
This method is ideal for:
- Well-structured documents like articles, papers, or reports
- Content where paragraph organization conveys meaning
- Documents where natural breaks exist between conceptual sections
- Cases where preserving document structure improves retrieval quality
Args:
document: Text content to split semantically
chunk_size: Maximum approximate size per chunk in tokens
chunk_overlap: Number of tokens to overlap between chunks
Returns:
List of semantic chunks with paragraph structure preserved
Note:
Chunk sizes may vary more with semantic chunking than with other methods,
as maintaining coherent paragraph groups takes precedence over exact
size enforcement. For strict size control, use token-based chunking.
"""
# For simplicity, this implementation is similar to sentence chunking
# but with paragraph awareness
# Split by paragraphs first
paragraphs = [p.strip() for p in document.split("\n\n") if p.strip()]
# Fallback to sentence chunking if no clear paragraphs
if len(paragraphs) <= 1:
return await self._chunk_by_sentence(document, chunk_size, chunk_overlap)
# Process each paragraph and create semantic chunks
chunks = []
current_chunk = []
current_size = 0
for paragraph in paragraphs:
# Estimate size in tokens
paragraph_size = len(paragraph.split())
# If paragraph is very large, chunk it further
if paragraph_size > chunk_size:
# Add current chunk if not empty
if current_chunk:
chunks.append("\n\n".join(current_chunk))
current_chunk = []
current_size = 0
# Chunk large paragraph by sentences
paragraph_chunks = await self._chunk_by_sentence(
paragraph, chunk_size, chunk_overlap
)
chunks.extend(paragraph_chunks)
continue
# If adding this paragraph exceeds the chunk size and we have content,
# finalize the current chunk
if current_chunk and current_size + paragraph_size > chunk_size:
chunks.append("\n\n".join(current_chunk))
# Start new chunk with last paragraph for better context
if current_chunk[-1] != paragraph and len(current_chunk) > 0:
current_chunk = [current_chunk[-1]]
current_size = len(current_chunk[-1].split())
else:
current_chunk = []
current_size = 0
# Add current paragraph
current_chunk.append(paragraph)
current_size += paragraph_size
# Add the last chunk if not empty
if current_chunk:
chunks.append("\n\n".join(current_chunk))
logger.debug(
f"Split document into {len(chunks)} chunks semantically",
extra={"emoji_key": "processing"}
)
return chunks
# Singleton instance
_document_processor = None
def get_document_processor() -> DocumentProcessor:
"""
Get or create the singleton DocumentProcessor instance.
This function implements the singleton pattern for the DocumentProcessor class,
ensuring that only one instance is created and shared throughout the application.
It provides a consistent, centralized access point for document processing
capabilities while conserving system resources.
Using a singleton for the DocumentProcessor offers several benefits:
- Resource efficiency: Prevents multiple instantiations of the processor
- Consistency: Ensures all components use the same processing configuration
- Centralized access: Provides a clean API for obtaining the processor
- Lazy initialization: Creates the instance only when first needed
This function should be used instead of directly instantiating the
DocumentProcessor class to maintain the singleton pattern and ensure
proper initialization.
Returns:
The shared DocumentProcessor instance
Usage Example:
```python
# Get the document processor from anywhere in the codebase
processor = get_document_processor()
# Use the processor's methods
chunks = await processor.chunk_document(document_text)
```
Note:
Even though the DocumentProcessor class itself implements singleton logic in
its __new__ method, this function is the preferred access point as it handles
the global instance management and follows the established pattern used
throughout the MCP server codebase.
"""
global _document_processor
if _document_processor is None:
_document_processor = DocumentProcessor()
return _document_processor
```
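To make the sliding-window arithmetic in `_chunk_by_tokens` concrete, here is a standalone sketch of its core loop on a tiny input (it assumes, as the implementation does, that `chunk_overlap` is smaller than `chunk_size`):

```python
# Standalone re-implementation of the whitespace-token sliding window above,
# shown on a tiny input so the overlap between consecutive chunks is visible.
words = [f"w{i}" for i in range(1, 11)]  # w1 .. w10
chunk_size, chunk_overlap = 4, 2

chunks = []
start = 0
while start < len(words):
    end = min(start + chunk_size, len(words))
    chunks.append(" ".join(words[start:end]))
    start = end - chunk_overlap          # slide back by the overlap
    if start >= len(words) - chunk_overlap:
        break                            # avoid emitting a redundant tail chunk

print(chunks)
# ['w1 w2 w3 w4', 'w3 w4 w5 w6', 'w5 w6 w7 w8', 'w7 w8 w9 w10']
```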
--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/rag.py:
--------------------------------------------------------------------------------
```python
"""MCP tools for Retrieval-Augmented Generation (RAG).
Provides functions to create, manage, and query knowledge bases (vector stores)
and generate text responses augmented with retrieved context.
"""
import re
from typing import Any, Dict, List, Optional
# Import specific exceptions for better error handling hints
from ultimate_mcp_server.exceptions import ProviderError, ResourceError, ToolInputError
from ultimate_mcp_server.services import get_rag_engine
# Moved imports for services to the top level
from ultimate_mcp_server.services.knowledge_base import (
get_knowledge_base_manager,
get_knowledge_base_retriever,
)
from ultimate_mcp_server.tools.base import with_error_handling, with_tool_metrics
from ultimate_mcp_server.utils import get_logger
logger = get_logger(__name__)
# --- Service Lazy Initialization ---
_kb_manager = None
_kb_retriever = None
_rag_engine = None
def _get_kb_manager():
"""Lazily initializes and returns the Knowledge Base Manager."""
global _kb_manager
if _kb_manager is None:
logger.debug("Initializing KnowledgeBaseManager...")
_kb_manager = get_knowledge_base_manager()
logger.info("KnowledgeBaseManager initialized.")
return _kb_manager
def _get_kb_retriever():
"""Lazily initializes and returns the Knowledge Base Retriever."""
global _kb_retriever
if _kb_retriever is None:
logger.debug("Initializing KnowledgeBaseRetriever...")
_kb_retriever = get_knowledge_base_retriever()
logger.info("KnowledgeBaseRetriever initialized.")
return _kb_retriever
def _get_rag_engine():
"""Lazily initializes and returns the RAG Engine."""
global _rag_engine
if _rag_engine is None:
logger.debug("Initializing RAGEngine...")
_rag_engine = get_rag_engine()
logger.info("RAGEngine initialized.")
return _rag_engine
# --- Standalone Tool Functions ---
@with_tool_metrics
@with_error_handling
async def create_knowledge_base(
name: str,
description: Optional[str] = None,
embedding_model: Optional[str] = None,
overwrite: bool = False
) -> Dict[str, Any]:
"""Creates a new, empty knowledge base (vector store) to hold documents.
This is the first step before adding documents.
Args:
name: A unique name for the knowledge base (e.g., "project_docs_v1").
Must be a valid identifier (letters, numbers, underscores).
description: (Optional) A brief description of the knowledge base's content or purpose.
embedding_model: (Optional) The specific embedding model ID to use for this knowledge base
(e.g., "openai/text-embedding-3-small"). If None, uses the system default.
Consistency is important; use the same model when adding documents later.
overwrite: (Optional) If True, deletes and recreates the knowledge base if one with the
same name already exists. Defaults to False (raises an error if exists).
Returns:
A dictionary confirming the creation:
{
"success": true,
"name": "project_docs_v1",
"message": "Knowledge base 'project_docs_v1' created successfully."
}
or an error dictionary if creation failed:
{
"success": false,
"name": "project_docs_v1",
"error": "Knowledge base 'project_docs_v1' already exists."
}
Raises:
ResourceError: If the knowledge base already exists (and overwrite=False) or
if there's an issue during creation (e.g., invalid name).
ToolInputError: If the provided name is invalid.
"""
# Input validation (basic example)
if not name or not re.match(r"^[a-zA-Z0-9_]+$", name):
raise ToolInputError(f"Invalid knowledge base name: '{name}'. Use only letters, numbers, underscores.")
kb_manager = _get_kb_manager() # Use lazy getter
try:
result = await kb_manager.create_knowledge_base(
name=name,
description=description,
embedding_model=embedding_model,
overwrite=overwrite
)
return result
except Exception as e:
logger.error(f"Failed to create knowledge base '{name}': {e}", exc_info=True)
# Re-raise specific error if possible, otherwise wrap
if isinstance(e, (ResourceError, ToolInputError)):
raise
raise ResourceError(f"Failed to create knowledge base '{name}': {str(e)}", resource_type="knowledge_base", resource_id=name, cause=e) from e
@with_tool_metrics
@with_error_handling
async def list_knowledge_bases() -> Dict[str, Any]:
"""Lists all available knowledge bases and their metadata.
Returns:
A dictionary containing a list of knowledge base details:
{
"success": true,
"knowledge_bases": [
{
"name": "project_docs_v1",
"description": "Documentation for Project X",
"embedding_model": "openai/text-embedding-3-small",
"document_count": 150,
"created_at": "2023-10-27T10:00:00Z"
},
{ ... } # Other knowledge bases
]
}
or an error dictionary:
{
"success": false,
"error": "Failed to retrieve knowledge base list."
}
Raises:
ResourceError: If there's an issue retrieving the list from the backend.
"""
kb_manager = _get_kb_manager()
try:
result = await kb_manager.list_knowledge_bases()
return result
except Exception as e:
logger.error(f"Failed to list knowledge bases: {e}", exc_info=True)
raise ResourceError(f"Failed to list knowledge bases: {str(e)}", resource_type="knowledge_base", cause=e) from e
@with_tool_metrics
@with_error_handling
async def delete_knowledge_base(name: str) -> Dict[str, Any]:
"""Deletes an existing knowledge base and all its documents.
Warning: This action is irreversible.
Args:
name: The exact name of the knowledge base to delete.
Returns:
A dictionary confirming the deletion:
{
"success": true,
"name": "project_docs_v1",
"message": "Knowledge base 'project_docs_v1' deleted successfully."
}
or an error dictionary:
{
"success": false,
"name": "project_docs_v1",
"error": "Knowledge base 'project_docs_v1' not found."
}
Raises:
ResourceError: If the knowledge base doesn't exist or if deletion fails.
ToolInputError: If the provided name is invalid.
"""
if not name:
raise ToolInputError("Knowledge base name cannot be empty.")
kb_manager = _get_kb_manager()
try:
result = await kb_manager.delete_knowledge_base(name)
return result
except Exception as e:
logger.error(f"Failed to delete knowledge base '{name}': {e}", exc_info=True)
if isinstance(e, (ResourceError, ToolInputError)):
raise
raise ResourceError(f"Failed to delete knowledge base '{name}': {str(e)}", resource_type="knowledge_base", resource_id=name, cause=e) from e
@with_tool_metrics
@with_error_handling
async def add_documents(
knowledge_base_name: str,
documents: List[str],
metadatas: Optional[List[Dict[str, Any]]] = None,
chunk_size: int = 1000,
chunk_overlap: int = 200,
chunk_method: str = "semantic",
embedding_model: Optional[str] = None
) -> Dict[str, Any]:
"""Adds one or more documents to a specified knowledge base.
The documents are split into chunks, embedded, and stored for later retrieval.
Args:
knowledge_base_name: The name of the existing knowledge base to add documents to.
documents: A list of strings, where each string is the full text content of a document.
metadatas: (Optional) A list of dictionaries, one for each document in the `documents` list.
Each dictionary should contain metadata relevant to the corresponding document
(e.g., {"source": "filename.pdf", "page": 1, "author": "Alice"}).
This metadata is stored alongside the document chunks and can be used for filtering during retrieval.
If provided, `len(metadatas)` MUST equal `len(documents)`.
chunk_size: (Optional) The target size for document chunks. Interpretation depends on `chunk_method`
(e.g., tokens for "token" method, characters for "character", approximate size for "semantic").
Defaults to 1000.
chunk_overlap: (Optional) The number of units (tokens, characters) to overlap between consecutive chunks.
Helps maintain context across chunk boundaries. Defaults to 200.
        chunk_method: (Optional) The method used for splitting documents into chunks.
                      Options: "semantic" (attempts to split at meaningful semantic boundaries, recommended),
                      "token" (splits by token count using tiktoken), "sentence" (splits by sentence),
                      "character", or "paragraph". Defaults to "semantic".
embedding_model: (Optional) The specific embedding model ID to use. If None, uses the model
associated with the knowledge base (or the system default if none was specified
at creation). It's best practice to ensure this matches the KB's model.
Returns:
A dictionary summarizing the addition process:
{
"success": true,
"knowledge_base_name": "project_docs_v1",
"documents_added": 5,
"chunks_created": 75,
"message": "Successfully added 5 documents (75 chunks) to 'project_docs_v1'."
}
or an error dictionary:
{
"success": false,
"knowledge_base_name": "project_docs_v1",
"error": "Knowledge base 'project_docs_v1' not found."
}
Raises:
ResourceError: If the knowledge base doesn't exist or if there's an error during processing/storage.
ToolInputError: If inputs are invalid (e.g., documents/metadatas length mismatch, invalid chunk_method).
        ProviderError: If the embedding provider fails while generating embeddings.
"""
if not knowledge_base_name:
raise ToolInputError("Knowledge base name cannot be empty.")
if not documents or not isinstance(documents, list) or not all(isinstance(d, str) for d in documents):
raise ToolInputError("'documents' must be a non-empty list of strings.")
if metadatas and (not isinstance(metadatas, list) or len(metadatas) != len(documents)):
raise ToolInputError("'metadatas', if provided, must be a list with the same length as 'documents'.")
if chunk_method not in ["semantic", "token", "sentence", "character", "paragraph"]: # Added more methods
raise ToolInputError(f"Invalid chunk_method: '{chunk_method}'. Must be one of: semantic, token, sentence, character, paragraph.")
kb_manager = _get_kb_manager()
try:
result = await kb_manager.add_documents(
knowledge_base_name=knowledge_base_name,
documents=documents,
metadatas=metadatas,
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
chunk_method=chunk_method,
embedding_model=embedding_model
)
return result
except Exception as e:
logger.error(f"Failed to add documents to knowledge base '{knowledge_base_name}': {e}", exc_info=True)
if isinstance(e, (ResourceError, ToolInputError, ProviderError)):
raise
raise ResourceError(f"Failed to add documents to knowledge base '{knowledge_base_name}': {str(e)}", resource_type="knowledge_base", resource_id=knowledge_base_name, cause=e) from e
@with_tool_metrics
@with_error_handling
async def retrieve_context(
knowledge_base_name: str,
query: str,
top_k: int = 5,
retrieval_method: str = "vector",
    min_score: Optional[float] = None,  # No minimum score filter by default
metadata_filter: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""Retrieves relevant document chunks (context) from a knowledge base based on a query.
Searches the specified knowledge base for chunks semantically similar to the query.
Args:
knowledge_base_name: The name of the knowledge base to query.
query: The text query to search for relevant context.
top_k: (Optional) The maximum number of relevant chunks to retrieve. Defaults to 5.
retrieval_method: (Optional) The method used for retrieval.
Options: "vector" (semantic similarity search), "hybrid" (combines vector search
with keyword-based search, may require specific backend support).
Defaults to "vector".
min_score: (Optional) The minimum similarity score (typically between 0 and 1) for a chunk
to be included in the results. Higher values mean stricter relevance.
If None (default), the backend decides or no filtering is applied.
metadata_filter: (Optional) A dictionary used to filter results based on metadata associated
with the chunks during `add_documents`. Filters use exact matches.
Example: {"source": "filename.pdf", "page": 5}
Example: {"author": "Alice"}
Defaults to None (no metadata filtering).
Returns:
A dictionary containing the retrieved context:
{
"success": true,
"query": "What are the project goals?",
"knowledge_base_name": "project_docs_v1",
"retrieved_chunks": [
{
"content": "The main goal of Project X is to improve user engagement...",
"score": 0.85,
"metadata": {"source": "project_plan.docx", "page": 1}
},
{ ... } # Other relevant chunks
]
}
or an error dictionary:
{
"success": false,
"knowledge_base_name": "project_docs_v1",
"error": "Knowledge base 'project_docs_v1' not found."
}
Raises:
ResourceError: If the knowledge base doesn't exist or retrieval fails.
ToolInputError: If inputs are invalid (e.g., invalid retrieval_method).
"""
if not knowledge_base_name:
raise ToolInputError("Knowledge base name cannot be empty.")
if not query or not isinstance(query, str):
raise ToolInputError("Query must be a non-empty string.")
if retrieval_method not in ["vector", "hybrid"]: # Add more methods if supported by backend
raise ToolInputError(f"Invalid retrieval_method: '{retrieval_method}'. Must be one of: vector, hybrid.")
kb_retriever = _get_kb_retriever()
try:
# Note: The actual implementation might vary based on the retriever service
# Here we assume the service handles different methods via parameters or distinct functions
# Keeping the previous logic structure for now.
if retrieval_method == "hybrid":
# Assuming a specific hybrid method exists or the main retrieve handles it
# This might need adjustment based on the actual service implementation
logger.debug(f"Attempting hybrid retrieval for '{knowledge_base_name}'")
result = await kb_retriever.retrieve_hybrid( # Or potentially kb_retriever.retrieve with a method flag
knowledge_base_name=knowledge_base_name,
query=query,
top_k=top_k,
min_score=min_score,
metadata_filter=metadata_filter
)
else: # Default to vector
logger.debug(f"Attempting vector retrieval for '{knowledge_base_name}'")
result = await kb_retriever.retrieve(
knowledge_base_name=knowledge_base_name,
query=query,
top_k=top_k,
rerank=True, # Assuming rerank is often desired for vector
min_score=min_score,
metadata_filter=metadata_filter
)
return result
except Exception as e:
logger.error(f"Failed to retrieve context from knowledge base '{knowledge_base_name}' for query '{query}': {e}", exc_info=True)
if isinstance(e, (ResourceError, ToolInputError)):
raise
raise ResourceError(f"Failed to retrieve context from knowledge base '{knowledge_base_name}': {str(e)}", resource_type="knowledge_base", resource_id=knowledge_base_name, cause=e) from e
@with_tool_metrics
@with_error_handling
async def generate_with_rag(
knowledge_base_name: str,
query: str,
provider: Optional[str] = None,
model: Optional[str] = None,
template: str = "rag_default",
max_tokens: int = 1000,
temperature: float = 0.3,
top_k: int = 5,
retrieval_method: str = "vector",
    min_score: Optional[float] = None,  # No minimum score filter by default
include_sources: bool = True
) -> Dict[str, Any]:
"""Generates a response to a query using context retrieved from a knowledge base (RAG).
This function first retrieves relevant document chunks using `retrieve_context` parameters,
then feeds the query and the retrieved context into an LLM using a specified prompt template
to generate a final, context-aware answer.
Args:
knowledge_base_name: The name of the knowledge base to retrieve context from.
query: The user's query or question to be answered.
provider: (Optional) The LLM provider for the generation step (e.g., "openai", "anthropic").
If None, the RAG engine selects a default provider.
model: (Optional) The specific LLM model ID for generation (e.g., "openai/gpt-4.1-mini").
If None, the RAG engine selects a default model.
template: (Optional) The name of the prompt template to use for combining the query and context.
                  Available templates might include: "rag_default" (standard Q&A), "rag_with_sources"
                  (includes source attribution), "rag_summarize" (summarizes retrieved context
                  based on query), "rag_analysis" (performs analysis based on context).
                  Defaults to "rag_default".
max_tokens: (Optional) Maximum number of tokens for the generated LLM response. Defaults to 1000.
temperature: (Optional) Sampling temperature for the LLM generation (0.0 to 1.0). Lower values
are more deterministic, higher values more creative. Defaults to 0.3.
top_k: (Optional) Maximum number of context chunks to retrieve (passed to retrieval). Defaults to 5.
retrieval_method: (Optional) Method for retrieving context ("vector", "hybrid"). Defaults to "vector".
min_score: (Optional) Minimum similarity score for retrieved chunks. Defaults to None.
include_sources: (Optional) Whether the final response object should explicitly include details
of the source chunks used for generation. Defaults to True.
Returns:
A dictionary containing the generated response and related information:
{
"success": true,
"query": "What are the project goals?",
"knowledge_base_name": "project_docs_v1",
"generated_response": "The main goal of Project X is to improve user engagement by implementing features A, B, and C.",
"sources": [ # Included if include_sources=True
{
"content": "The main goal of Project X is to improve user engagement...",
"score": 0.85,
"metadata": {"source": "project_plan.docx", "page": 1}
},
{ ... } # Other source chunks used
],
"model": "openai/gpt-4.1-mini", # Actual model used
"provider": "openai",
"tokens": { "input": ..., "output": ..., "total": ... }, # Generation tokens
"cost": 0.000120,
"processing_time": 5.2,
"retrieval_time": 0.8, # Time spent only on retrieval
"generation_time": 4.4 # Time spent only on generation
}
or an error dictionary:
{
"success": false,
"knowledge_base_name": "project_docs_v1",
"error": "RAG generation failed: Knowledge base 'project_docs_v1' not found."
}
Raises:
ResourceError: If the knowledge base doesn't exist or retrieval fails.
ProviderError: If the LLM provider fails during generation.
ToolInputError: If inputs are invalid.
"""
if not knowledge_base_name:
raise ToolInputError("Knowledge base name cannot be empty.")
if not query or not isinstance(query, str):
raise ToolInputError("Query must be a non-empty string.")
rag_engine = _get_rag_engine()
try:
result = await rag_engine.generate_with_rag(
knowledge_base_name=knowledge_base_name,
query=query,
provider=provider,
model=model,
template=template,
max_tokens=max_tokens,
temperature=temperature,
top_k=top_k,
retrieval_method=retrieval_method,
min_score=min_score,
include_sources=include_sources
)
return result
except Exception as e:
logger.error(f"RAG generation failed for query on '{knowledge_base_name}': {e}", exc_info=True)
if isinstance(e, (ResourceError, ProviderError, ToolInputError)):
raise
# Wrap generic errors
raise ResourceError(f"RAG generation failed: {str(e)}", resource_type="knowledge_base", resource_id=knowledge_base_name, cause=e) from e
```
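As a usage sketch (not part of the module above): the call below assumes the tool coroutine defined above is importable as `generate_with_rag` from `ultimate_mcp_server.tools.rag`; the name, import path, and knowledge base name are all illustrative guesses.
```python
import asyncio

# Hypothetical import path and tool name; adjust to match the actual module above.
from ultimate_mcp_server.tools.rag import generate_with_rag

async def demo_rag_answer():
    result = await generate_with_rag(
        knowledge_base_name="project_docs_v1",  # assumes this knowledge base already exists
        query="What are the project goals?",
        retrieval_method="vector",
        top_k=5,
        include_sources=True,
    )
    if result.get("success"):
        print(result["generated_response"])
        for source in result.get("sources", []):
            print(f"- {source['metadata'].get('source')} (score: {source['score']:.2f})")
    else:
        print(f"RAG generation failed: {result.get('error')}")

if __name__ == "__main__":
    asyncio.run(demo_rag_answer())
```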
--------------------------------------------------------------------------------
/ultimate_mcp_server/services/prompts/templates.py:
--------------------------------------------------------------------------------
```python
"""Prompt template management and rendering for Ultimate MCP Server."""
import json
import re
from enum import Enum
from functools import lru_cache
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple, Union
from jinja2 import Environment, FileSystemLoader, Template, select_autoescape
from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.services.prompts.repository import get_prompt_repository
from ultimate_mcp_server.utils import get_logger
logger = get_logger(__name__)
class TemplateFormat(str, Enum):
"""Template format options."""
JINJA = "jinja"
SIMPLE = "simple"
MARKDOWN = "markdown"
JSON = "json"
class TemplateType(str, Enum):
"""Template type options."""
COMPLETION = "completion"
CHAT = "chat"
SYSTEM = "system"
USER = "user"
FUNCTION = "function"
EXTRACTION = "extraction"
class PromptTemplate:
"""Template for generating prompts for LLM providers."""
def __init__(
self,
template: str,
template_id: str,
format: Union[str, TemplateFormat] = TemplateFormat.JINJA,
type: Union[str, TemplateType] = TemplateType.COMPLETION,
metadata: Optional[Dict[str, Any]] = None,
provider_defaults: Optional[Dict[str, Any]] = None,
description: Optional[str] = None,
required_vars: Optional[List[str]] = None,
example_vars: Optional[Dict[str, Any]] = None,
):
"""Initialize a prompt template.
Args:
template: Template string
template_id: Unique identifier for this template
format: Template format (jinja, simple, markdown, or json)
type: Template type (completion, chat, system, user, function, extraction)
metadata: Optional metadata for the template
provider_defaults: Optional provider-specific defaults
description: Optional description of the template
required_vars: Optional list of required variables
example_vars: Optional example variables for testing
"""
self.template = template
self.template_id = template_id
# Normalize format and type to enum values
self.format = TemplateFormat(format) if isinstance(format, str) else format
self.type = TemplateType(type) if isinstance(type, str) else type
# Store additional attributes
self.metadata = metadata or {}
self.provider_defaults = provider_defaults or {}
self.description = description
self.example_vars = example_vars or {}
# Extract required variables based on format
self.required_vars = required_vars or self._extract_required_vars()
# Compile template if using Jinja format
self._compiled_template: Optional[Template] = None
if self.format == TemplateFormat.JINJA:
self._compiled_template = self._compile_template()
def _extract_required_vars(self) -> List[str]:
"""Extract required variables from template based on format.
Returns:
List of required variable names
"""
if self.format == TemplateFormat.JINJA:
# Extract variables using regex for basic Jinja pattern
matches = re.findall(r'{{(.*?)}}', self.template)
vars_set: Set[str] = set()
for match in matches:
# Extract variable name (removing filters and whitespace)
var_name = match.split('|')[0].strip()
if var_name and not var_name.startswith('_'):
vars_set.add(var_name)
return sorted(list(vars_set))
elif self.format == TemplateFormat.SIMPLE:
# Extract variables from {variable} format
matches = re.findall(r'{([^{}]*)}', self.template)
return sorted(list(set(matches)))
elif self.format == TemplateFormat.JSON:
# Try to find JSON template variables
try:
# Parse as JSON to find potential variables
template_dict = json.loads(self.template)
return self._extract_json_vars(template_dict)
except json.JSONDecodeError:
logger.warning(
f"Failed to parse JSON template: {self.template_id}",
emoji_key="warning"
)
return []
# Default: no variables detected
return []
def _extract_json_vars(self, obj: Any, prefix: str = "") -> List[str]:
"""Recursively extract variables from a JSON object.
Args:
obj: JSON object to extract variables from
prefix: Prefix for nested variables
Returns:
List of variable names
"""
vars_list = []
if isinstance(obj, dict):
for key, value in obj.items():
if isinstance(value, str) and value.startswith("${") and value.endswith("}"):
# This is a variable placeholder
var_name = value[2:-1] # Remove ${ and }
vars_list.append(f"{prefix}{var_name}")
elif isinstance(value, (dict, list)):
# Recursively extract from nested structures
nested_prefix = f"{prefix}{key}." if prefix else f"{key}."
vars_list.extend(self._extract_json_vars(value, nested_prefix))
elif isinstance(obj, list):
for _i, item in enumerate(obj):
if isinstance(item, (dict, list)):
vars_list.extend(self._extract_json_vars(item, prefix))
elif isinstance(item, str) and item.startswith("${") and item.endswith("}"):
var_name = item[2:-1]
vars_list.append(f"{prefix}{var_name}")
return sorted(list(set(vars_list)))
def _compile_template(self) -> Template:
"""Compile the Jinja template.
Returns:
Compiled Jinja template
Raises:
ValueError: If template compilation fails
"""
try:
env = Environment(autoescape=select_autoescape(['html', 'xml']))
return env.from_string(self.template)
except Exception as e:
logger.error(
f"Failed to compile template {self.template_id}: {str(e)}",
emoji_key="error"
)
raise ValueError(f"Invalid template format: {str(e)}") from e
def render(self, variables: Dict[str, Any]) -> str:
"""Render the template with the provided variables.
Args:
variables: Dictionary of variables to render with
Returns:
Rendered template string
Raises:
ValueError: If required variables are missing
"""
# Check for required variables
missing_vars = [var for var in self.required_vars if var not in variables]
if missing_vars:
raise ValueError(
f"Missing required variables for template {self.template_id}: {', '.join(missing_vars)}"
)
# Render based on format
if self.format == TemplateFormat.JINJA:
if not self._compiled_template:
self._compiled_template = self._compile_template()
return self._compiled_template.render(**variables)
elif self.format == TemplateFormat.SIMPLE:
# Simple variable substitution with {var} syntax
result = self.template
for var_name, var_value in variables.items():
result = result.replace(f"{{{var_name}}}", str(var_value))
return result
elif self.format == TemplateFormat.JSON:
try:
# Parse template as JSON
template_dict = json.loads(self.template)
# Replace variables in the JSON structure
rendered_dict = self._render_json_vars(template_dict, variables)
# Convert back to JSON string
return json.dumps(rendered_dict)
except json.JSONDecodeError:
logger.error(
f"Failed to parse JSON template: {self.template_id}",
emoji_key="error"
)
# Fall back to simple replacement
return self.template
elif self.format == TemplateFormat.MARKDOWN:
# Process markdown with simple variable substitution
result = self.template
for var_name, var_value in variables.items():
result = result.replace(f"{{{var_name}}}", str(var_value))
return result
# Default: return template as is
return self.template
def _render_json_vars(self, obj: Any, variables: Dict[str, Any]) -> Any:
"""Recursively render variables in a JSON object.
Args:
obj: JSON object to render variables in
variables: Dictionary of variables to render with
Returns:
Rendered JSON object
"""
if isinstance(obj, dict):
return {
key: self._render_json_vars(value, variables)
for key, value in obj.items()
}
elif isinstance(obj, list):
return [self._render_json_vars(item, variables) for item in obj]
elif isinstance(obj, str) and obj.startswith("${") and obj.endswith("}"):
# This is a variable placeholder
var_name = obj[2:-1] # Remove ${ and }
# Get the variable value, or keep placeholder if not found
return variables.get(var_name, obj)
else:
return obj
def validate_variables(self, variables: Dict[str, Any]) -> Tuple[bool, List[str]]:
"""Validate that all required variables are provided.
Args:
variables: Dictionary of variables to validate
Returns:
Tuple of (is_valid, missing_variables)
"""
missing_vars = [var for var in self.required_vars if var not in variables]
return len(missing_vars) == 0, missing_vars
def to_dict(self) -> Dict[str, Any]:
"""Convert template to dictionary representation.
Returns:
Dictionary representation of template
"""
return {
"template_id": self.template_id,
"template": self.template,
"format": self.format.value,
"type": self.type.value,
"metadata": self.metadata,
"provider_defaults": self.provider_defaults,
"description": self.description,
"required_vars": self.required_vars,
"example_vars": self.example_vars,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "PromptTemplate":
"""Create a template from dictionary representation.
Args:
data: Dictionary representation of template
Returns:
PromptTemplate instance
"""
return cls(
template=data["template"],
template_id=data["template_id"],
format=data.get("format", TemplateFormat.JINJA),
type=data.get("type", TemplateType.COMPLETION),
metadata=data.get("metadata"),
provider_defaults=data.get("provider_defaults"),
description=data.get("description"),
required_vars=data.get("required_vars"),
example_vars=data.get("example_vars"),
)
def get_provider_defaults(self, provider: str) -> Dict[str, Any]:
"""Get provider-specific default parameters.
Args:
provider: Provider name
Returns:
Dictionary of default parameters
"""
return self.provider_defaults.get(provider, {})
class PromptTemplateRenderer:
"""Service for rendering prompt templates."""
def __init__(self, template_dir: Optional[Union[str, Path]] = None):
"""Initialize the prompt template renderer.
Args:
template_dir: Optional directory containing template files
"""
# Set template directory
if template_dir:
self.template_dir = Path(template_dir)
else:
            # Default to the user's template directory (~/.ultimate/templates)
self.template_dir = Path.home() / ".ultimate" / "templates"
# Create directory if it doesn't exist
self.template_dir.mkdir(parents=True, exist_ok=True)
# Set up Jinja environment for file-based templates
self.jinja_env = Environment(
loader=FileSystemLoader(str(self.template_dir)),
autoescape=select_autoescape(['html', 'xml']),
trim_blocks=True,
lstrip_blocks=True,
)
# Get prompt repository for template storage
self.repository = get_prompt_repository()
# Template cache
self._template_cache: Dict[str, PromptTemplate] = {}
async def get_template(self, template_id: str) -> Optional[PromptTemplate]:
"""Get a template by ID.
Args:
template_id: Template identifier
Returns:
PromptTemplate instance or None if not found
"""
# Check cache first
if template_id in self._template_cache:
return self._template_cache[template_id]
# Look up in repository
template_data = await self.repository.get_prompt(template_id)
if template_data:
template = PromptTemplate.from_dict(template_data)
# Cache for future use
self._template_cache[template_id] = template
return template
# Try to load from file if not in repository
template_path = self.template_dir / f"{template_id}.j2"
if template_path.exists():
# Load template from file
with open(template_path, "r", encoding="utf-8") as f:
template_content = f.read()
# Create template instance
template = PromptTemplate(
template=template_content,
template_id=template_id,
format=TemplateFormat.JINJA,
)
# Cache for future use
self._template_cache[template_id] = template
return template
return None
async def render_template(
self,
template_id: str,
variables: Dict[str, Any],
provider: Optional[str] = None
) -> str:
"""Render a template with variables.
Args:
template_id: Template identifier
variables: Variables to render the template with
provider: Optional provider name for provider-specific adjustments
Returns:
Rendered template string
Raises:
ValueError: If template not found or rendering fails
"""
# Get the template
template = await self.get_template(template_id)
if not template:
raise ValueError(f"Template not found: {template_id}")
# Check if all required variables are provided
is_valid, missing_vars = template.validate_variables(variables)
if not is_valid:
raise ValueError(
f"Missing required variables for template {template_id}: {', '.join(missing_vars)}"
)
# Render the template
rendered = template.render(variables)
# Apply provider-specific adjustments if provided
if provider:
rendered = self._apply_provider_adjustments(rendered, provider, template)
return rendered
def _apply_provider_adjustments(
self,
rendered: str,
provider: str,
template: PromptTemplate
) -> str:
"""Apply provider-specific adjustments to rendered template.
Args:
rendered: Rendered template string
provider: Provider name
template: Template being rendered
Returns:
Adjusted template string
"""
# Apply provider-specific transformations
if provider == Provider.ANTHROPIC.value:
# Anthropic-specific adjustments
if template.type == TemplateType.SYSTEM:
# Ensure no trailing newlines for system prompts
rendered = rendered.rstrip()
elif provider == Provider.OPENAI.value:
# OpenAI-specific adjustments
pass
elif provider == Provider.GEMINI.value:
# Gemini-specific adjustments
pass
return rendered
async def save_template(self, template: PromptTemplate) -> bool:
"""Save a template to the repository.
Args:
template: Template to save
Returns:
True if successful
"""
# Update cache
self._template_cache[template.template_id] = template
# Save to repository
return await self.repository.save_prompt(
prompt_id=template.template_id,
prompt_data=template.to_dict()
)
async def delete_template(self, template_id: str) -> bool:
"""Delete a template from the repository.
Args:
template_id: Template identifier
Returns:
True if successful
"""
# Remove from cache
if template_id in self._template_cache:
del self._template_cache[template_id]
# Delete from repository
return await self.repository.delete_prompt(template_id)
async def list_templates(self) -> List[str]:
"""List available templates.
Returns:
List of template IDs
"""
# Get templates from repository
return await self.repository.list_prompts()
def clear_cache(self) -> None:
"""Clear the template cache."""
self._template_cache.clear()
# Global template renderer instance
_template_renderer: Optional[PromptTemplateRenderer] = None
def get_template_renderer() -> PromptTemplateRenderer:
"""Get the global template renderer instance.
Returns:
PromptTemplateRenderer instance
"""
global _template_renderer
if _template_renderer is None:
_template_renderer = PromptTemplateRenderer()
return _template_renderer
@lru_cache(maxsize=32)
def get_template_path(template_id: str) -> Optional[Path]:
"""Get the path to a template file.
Args:
template_id: Template identifier
Returns:
Path to template file or None if not found
"""
# Try standard locations
template_dirs = [
# First check the user's template directory
Path.home() / ".ultimate" / "templates",
# Then check the package's template directory
Path(__file__).parent.parent.parent.parent / "templates",
]
for template_dir in template_dirs:
# Check for .j2 extension first
template_path = template_dir / f"{template_id}.j2"
if template_path.exists():
return template_path
# Check for .tpl extension
template_path = template_dir / f"{template_id}.tpl"
if template_path.exists():
return template_path
# Check for .md extension
template_path = template_dir / f"{template_id}.md"
if template_path.exists():
return template_path
# Check for .json extension
template_path = template_dir / f"{template_id}.json"
if template_path.exists():
return template_path
return None
async def render_prompt_template(
template_id: str,
variables: Dict[str, Any],
provider: Optional[str] = None
) -> str:
"""Render a prompt template.
Args:
template_id: Template identifier
variables: Variables to render the template with
provider: Optional provider name for provider-specific adjustments
Returns:
Rendered template string
Raises:
ValueError: If template not found or rendering fails
"""
renderer = get_template_renderer()
return await renderer.render_template(template_id, variables, provider)
async def render_prompt(
template_content: str,
variables: Dict[str, Any],
format: Union[str, TemplateFormat] = TemplateFormat.JINJA,
) -> str:
"""Render a prompt from template content.
Args:
template_content: Template content string
variables: Variables to render the template with
format: Template format
Returns:
Rendered template string
Raises:
ValueError: If rendering fails
"""
# Create a temporary template
template = PromptTemplate(
template=template_content,
template_id="_temp_template",
format=format,
)
# Render and return
return template.render(variables)
async def get_template_defaults(
template_id: str,
provider: str
) -> Dict[str, Any]:
"""Get provider-specific default parameters for a template.
Args:
template_id: Template identifier
provider: Provider name
Returns:
Dictionary of default parameters
Raises:
ValueError: If template not found
"""
renderer = get_template_renderer()
template = await renderer.get_template(template_id)
if not template:
raise ValueError(f"Template not found: {template_id}")
return template.get_provider_defaults(provider)
```
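A short usage sketch of the template layer above; the template strings, IDs, and variable values are made up for illustration.
```python
from ultimate_mcp_server.services.prompts.templates import (
    PromptTemplate,
    TemplateFormat,
)

# SIMPLE format: {var} placeholders; required variables are inferred from the template text.
simple = PromptTemplate(
    template="Summarize the following text in {style} style:\n\n{text}",
    template_id="summarize_simple",
    format=TemplateFormat.SIMPLE,
)
print(simple.required_vars)                                   # ['style', 'text']
print(simple.render({"style": "bullet-point", "text": "..."}))

# JINJA format: compiled at construction time; validate_variables reports what is missing.
qa = PromptTemplate(
    template="Answer using only the context.\nContext: {{ context }}\nQuestion: {{ question }}",
    template_id="qa_jinja",
    format=TemplateFormat.JINJA,
)
print(qa.validate_variables({"question": "What is RAG?"}))    # (False, ['context'])

# JSON format: "${var}" placeholders are discovered and substituted recursively.
extract = PromptTemplate(
    template='{"task": "extract", "document": "${document}", "schema": "${schema}"}',
    template_id="extract_json",
    format=TemplateFormat.JSON,
)
print(extract.required_vars)                                  # ['document', 'schema']
```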
--------------------------------------------------------------------------------
/ultimate_mcp_server/core/providers/openrouter.py:
--------------------------------------------------------------------------------
```python
# ultimate/core/providers/openrouter.py
"""OpenRouter provider implementation."""
import os
import time
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple, Union
from openai import AsyncOpenAI
from ultimate_mcp_server.config import get_config
from ultimate_mcp_server.constants import DEFAULT_MODELS, Provider
from ultimate_mcp_server.core.providers.base import BaseProvider, ModelResponse
from ultimate_mcp_server.utils import get_logger
# Use the same naming scheme everywhere: logger at module level
logger = get_logger("ultimate_mcp_server.providers.openrouter")
# Default OpenRouter Base URL (can be overridden by config)
DEFAULT_OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1"
class OpenRouterProvider(BaseProvider):
"""Provider implementation for OpenRouter API (using OpenAI-compatible interface)."""
provider_name = Provider.OPENROUTER.value
def __init__(self, **kwargs):
"""Initialize the OpenRouter provider.
Args:
**kwargs: Additional options:
- base_url (str): Override the default OpenRouter API base URL.
- http_referer (str): Optional HTTP-Referer header.
- x_title (str): Optional X-Title header.
"""
config = get_config().providers.openrouter
super().__init__(**kwargs)
self.name = "openrouter"
# Use config default first, then fallback to constants
self.default_model = config.default_model or DEFAULT_MODELS.get(Provider.OPENROUTER)
if not config.default_model:
logger.debug(f"No default model set in config for OpenRouter, using fallback from constants: {self.default_model}")
# Get base_url from config, fallback to kwargs, then constant
self.base_url = config.base_url or kwargs.get("base_url", DEFAULT_OPENROUTER_BASE_URL)
# Get additional headers from config's additional_params
self.http_referer = config.additional_params.get("http_referer") or kwargs.get("http_referer")
self.x_title = config.additional_params.get("x_title") or kwargs.get("x_title")
# We'll create the client in initialize() instead
self.client = None
self.available_models = []
async def initialize(self) -> bool:
"""Initialize the OpenRouter client.
Returns:
bool: True if initialization was successful
"""
try:
# Create headers dictionary
headers = {}
if self.http_referer:
headers["HTTP-Referer"] = self.http_referer
if self.x_title:
headers["X-Title"] = self.x_title
# Get timeout from config
config = get_config().providers.openrouter
timeout = config.timeout or 30.0 # Default timeout 30s
# Check if API key is available
if not self.api_key:
logger.warning(f"{self.name} API key not found in configuration. Provider will be unavailable.")
return False
# Create the client
self.client = AsyncOpenAI(
base_url=self.base_url,
api_key=self.api_key,
default_headers=headers,
timeout=timeout
)
# Pre-fetch available models
try:
self.available_models = await self.list_models()
logger.info(f"Loaded {len(self.available_models)} models from OpenRouter")
except Exception as model_err:
logger.warning(f"Failed to fetch models from OpenRouter: {str(model_err)}")
# Use hardcoded fallback models
self.available_models = self._get_fallback_models()
logger.success(
"OpenRouter provider initialized successfully",
emoji_key="provider"
)
return True
except Exception as e:
logger.error(
f"Failed to initialize OpenRouter provider: {str(e)}",
emoji_key="error"
)
return False
def _initialize_client(self, **kwargs):
"""Initialize the OpenAI async client with OpenRouter specifics."""
# This method is now deprecated - use initialize() instead
logger.warning("_initialize_client() is deprecated, use initialize() instead")
return False
async def generate_completion(
self,
prompt: str,
model: Optional[str] = None,
max_tokens: Optional[int] = None,
temperature: float = 0.7,
**kwargs
) -> ModelResponse:
"""Generate a completion using OpenRouter.
Args:
prompt: Text prompt to send to the model
model: Model name (e.g., "openai/gpt-4.1-mini", "google/gemini-flash-1.5")
max_tokens: Maximum tokens to generate
temperature: Temperature parameter (0.0-1.0)
**kwargs: Additional model-specific parameters, including:
- extra_headers (Dict): Additional headers for this specific call.
- extra_body (Dict): OpenRouter-specific arguments.
Returns:
ModelResponse with completion result
Raises:
Exception: If API call fails
"""
if not self.client:
            initialized = await self.initialize()  # initialize() is the supported async setup path; _initialize_client() is deprecated and synchronous
if not initialized:
raise RuntimeError(f"{self.provider_name} provider not initialized.")
# Use default model if not specified
model = model or self.default_model
# Ensure we have a model name before proceeding
if model is None:
logger.error("Completion failed: No model specified and no default model configured for OpenRouter.")
raise ValueError("No model specified and no default model configured for OpenRouter.")
# Strip provider prefix only if it matches OUR provider name
if model.startswith(f"{self.provider_name}:"):
model = model.split(":", 1)[1]
logger.debug(f"Stripped provider prefix from model name: {model}")
# Note: Keep prefixes like 'openai/' or 'google/' as OpenRouter uses them.
# DO NOT strip other provider prefixes as they're needed for OpenRouter routing
# Create messages
messages = kwargs.pop("messages", None) or [{"role": "user", "content": prompt}]
# Prepare API call parameters
params = {
"model": model,
"messages": messages,
"temperature": temperature,
}
if max_tokens is not None:
params["max_tokens"] = max_tokens
# Extract OpenRouter specific args from kwargs
extra_headers = kwargs.pop("extra_headers", {})
extra_body = kwargs.pop("extra_body", {})
json_mode = kwargs.pop("json_mode", False)
if json_mode:
# OpenRouter uses OpenAI-compatible API
params["response_format"] = {"type": "json_object"}
self.logger.debug("Setting response_format to JSON mode for OpenRouter")
# Add any remaining kwargs to the main params (standard OpenAI args)
params.update(kwargs)
self.logger.info(
f"Generating completion with {self.provider_name} model {model}",
emoji_key=self.provider_name,
prompt_length=len(prompt),
json_mode_requested=json_mode
)
try:
# Make API call with timing
response, processing_time = await self.process_with_timer(
self.client.chat.completions.create, **params, extra_headers=extra_headers, extra_body=extra_body
)
# Extract response text
completion_text = response.choices[0].message.content
# Create standardized response
result = ModelResponse(
text=completion_text,
model=response.model, # Use model returned by API
provider=self.provider_name,
input_tokens=response.usage.prompt_tokens,
output_tokens=response.usage.completion_tokens,
total_tokens=response.usage.total_tokens,
processing_time=processing_time,
raw_response=response,
)
self.logger.success(
f"{self.provider_name} completion successful",
emoji_key="success",
model=result.model,
tokens={
"input": result.input_tokens,
"output": result.output_tokens
},
cost=result.cost, # Will be calculated by ModelResponse
time=result.processing_time
)
return result
except Exception as e:
self.logger.error(
f"{self.provider_name} completion failed for model {model}: {str(e)}",
emoji_key="error",
model=model
)
raise
async def generate_completion_stream(
self,
prompt: str,
model: Optional[str] = None,
max_tokens: Optional[int] = None,
temperature: float = 0.7,
**kwargs
) -> AsyncGenerator[Tuple[str, Dict[str, Any]], None]:
"""Generate a streaming completion using OpenRouter.
Args:
prompt: Text prompt to send to the model
model: Model name (e.g., "openai/gpt-4.1-mini")
max_tokens: Maximum tokens to generate
temperature: Temperature parameter (0.0-1.0)
**kwargs: Additional model-specific parameters, including:
- extra_headers (Dict): Additional headers for this specific call.
- extra_body (Dict): OpenRouter-specific arguments.
Yields:
Tuple of (text_chunk, metadata)
Raises:
Exception: If API call fails
"""
if not self.client:
            initialized = await self.initialize()  # use the async initialize() path; _initialize_client() is deprecated
if not initialized:
raise RuntimeError(f"{self.provider_name} provider not initialized.")
        model = model or self.default_model
        if model is None:
            raise ValueError("No model specified and no default model configured for OpenRouter.")
        if model.startswith(f"{self.provider_name}:"):
            model = model.split(":", 1)[1]
# DO NOT strip other provider prefixes as they're needed for OpenRouter routing
messages = kwargs.pop("messages", None) or [{"role": "user", "content": prompt}]
params = {
"model": model,
"messages": messages,
"temperature": temperature,
"stream": True,
}
if max_tokens is not None:
params["max_tokens"] = max_tokens
extra_headers = kwargs.pop("extra_headers", {})
extra_body = kwargs.pop("extra_body", {})
json_mode = kwargs.pop("json_mode", False)
if json_mode:
# OpenRouter uses OpenAI-compatible API
params["response_format"] = {"type": "json_object"}
self.logger.debug("Setting response_format to JSON mode for OpenRouter streaming")
params.update(kwargs)
self.logger.info(
f"Generating streaming completion with {self.provider_name} model {model}",
emoji_key=self.provider_name,
prompt_length=len(prompt),
json_mode_requested=json_mode
)
start_time = time.time()
total_chunks = 0
final_model_name = model # Store initially requested model
try:
stream = await self.client.chat.completions.create(**params, extra_headers=extra_headers, extra_body=extra_body)
async for chunk in stream:
total_chunks += 1
delta = chunk.choices[0].delta
content = delta.content or ""
# Try to get model name from the chunk if available (some providers include it)
if chunk.model:
final_model_name = chunk.model
metadata = {
"model": final_model_name,
"provider": self.provider_name,
"chunk_index": total_chunks,
"finish_reason": chunk.choices[0].finish_reason,
}
yield content, metadata
processing_time = time.time() - start_time
self.logger.success(
f"{self.provider_name} streaming completion successful",
emoji_key="success",
model=final_model_name,
chunks=total_chunks,
time=processing_time
)
except Exception as e:
self.logger.error(
f"{self.provider_name} streaming completion failed for model {model}: {str(e)}",
emoji_key="error",
model=model
)
raise
async def list_models(self) -> List[Dict[str, Any]]:
"""List available OpenRouter models (provides examples, not exhaustive).
OpenRouter offers a vast number of models. This list provides common examples.
Refer to OpenRouter documentation for the full list.
Returns:
List of example model information dictionaries
"""
if self.available_models:
return self.available_models
models = self._get_fallback_models()
return models
def get_default_model(self) -> str:
"""Get the default OpenRouter model.
Returns:
Default model name (e.g., "openai/gpt-4.1-mini")
"""
# Allow override via environment variable
default_model_env = os.environ.get("OPENROUTER_DEFAULT_MODEL")
if default_model_env:
return default_model_env
# Fallback to constants
return DEFAULT_MODELS.get(self.provider_name, "openai/gpt-4.1-mini")
async def check_api_key(self) -> bool:
"""Check if the OpenRouter API key is valid by attempting a small request."""
if not self.client:
# Try to initialize if not already done
            if not await self.initialize():
return False # Initialization failed
try:
            # Validate the key and base URL with a minimal, low-cost completion request;
            # any authentication failure will surface as an exception below.
await self.client.chat.completions.create(
model=self.get_default_model(),
messages=[{"role": "user", "content": "test"}],
max_tokens=1,
temperature=0
)
return True
except Exception as e:
logger.warning(f"API key check failed for {self.provider_name}: {str(e)}", emoji_key="warning")
return False
def get_available_models(self) -> List[str]:
"""Return a list of available model names."""
return [model["id"] for model in self.available_models]
def is_model_available(self, model_name: str) -> bool:
"""Check if a specific model is available."""
available_model_ids = [model["id"] for model in self.available_models]
return model_name in available_model_ids
async def create_completion(self, model: str, messages: List[Dict[str, str]], stream: bool = False, **kwargs) -> Union[str, AsyncGenerator[str, None]]:
"""Create a completion using the specified model."""
if not self.client:
raise RuntimeError("OpenRouter client not initialized (likely missing API key).")
# Check if model is available
if not self.is_model_available(model):
            # The requested model is not in the fetched list; fall back to the default
            # model if that one is confirmed available, otherwise raise below.
if self.default_model and self.is_model_available(self.default_model):
logger.warning(f"Model '{model}' not found in available list. Falling back to default '{self.default_model}'.")
model = self.default_model
else:
# If even the default isn't available or set, raise error
raise ValueError(f"Model '{model}' is not available via OpenRouter according to fetched list, and no valid default model is set.")
merged_kwargs = {**kwargs}
# OpenRouter uses standard OpenAI params like max_tokens, temperature, etc.
# Ensure essential params are passed
if 'max_tokens' not in merged_kwargs:
merged_kwargs['max_tokens'] = get_config().providers.openrouter.max_tokens or 1024 # Use config or default
if stream:
logger.debug(f"Creating stream completion: Model={model}, Params={merged_kwargs}")
return self._stream_completion_generator(model, messages, **merged_kwargs)
else:
logger.debug(f"Creating completion: Model={model}, Params={merged_kwargs}")
try:
response = await self.client.chat.completions.create(
model=model,
messages=messages,
stream=False,
**merged_kwargs
)
# Extract content based on OpenAI library version
if hasattr(response, 'choices') and response.choices:
choice = response.choices[0]
if hasattr(choice, 'message') and hasattr(choice.message, 'content'):
return choice.message.content or "" # Return empty string if content is None
elif hasattr(choice, 'delta') and hasattr(choice.delta, 'content'): # Should not happen for stream=False but check
return choice.delta.content or ""
logger.warning("Could not extract content from OpenRouter response.")
return "" # Return empty string if no content found
except Exception as e:
logger.error(f"OpenRouter completion failed: {e}", exc_info=True)
raise RuntimeError(f"OpenRouter API call failed: {e}") from e
async def _stream_completion_generator(self, model: str, messages: List[Dict[str, str]], **kwargs) -> AsyncGenerator[str, None]:
"""Async generator for streaming completions."""
if not self.client:
raise RuntimeError("OpenRouter client not initialized (likely missing API key).")
try:
stream = await self.client.chat.completions.create(
model=model,
messages=messages,
stream=True,
**kwargs
)
async for chunk in stream:
# Extract content based on OpenAI library version
content = ""
if hasattr(chunk, 'choices') and chunk.choices:
choice = chunk.choices[0]
if hasattr(choice, 'delta') and hasattr(choice.delta, 'content'):
content = choice.delta.content
elif hasattr(choice, 'message') and hasattr(choice.message, 'content'): # Should not happen for stream=True
content = choice.message.content
if content:
yield content
except Exception as e:
logger.error(f"OpenRouter stream completion failed: {e}", exc_info=True)
# Depending on desired behavior, either raise or yield an error message
# yield f"Error during stream: {e}"
raise RuntimeError(f"OpenRouter API stream failed: {e}") from e
# --- Cost Calculation (Needs OpenRouter Specific Data) ---
def get_cost(self, model: str, prompt_tokens: int, completion_tokens: int) -> Optional[float]:
"""Calculate the cost of a request based on OpenRouter pricing.
        Note: Accurate costing requires detailed per-model pricing data, which is not
        fetched by default when listing models. This implementation is a placeholder
        with a small hardcoded pricing table and needs enhancement.
"""
# Placeholder: Need to fetch and store detailed pricing from OpenRouter
# Example structure (needs actual data):
openrouter_pricing = {
# "model_id": {"prompt_cost_per_mtok": X, "completion_cost_per_mtok": Y},
"openai/gpt-4o": {"prompt_cost_per_mtok": 5.0, "completion_cost_per_mtok": 15.0},
"google/gemini-pro-1.5": {"prompt_cost_per_mtok": 3.5, "completion_cost_per_mtok": 10.5},
"anthropic/claude-3-opus": {"prompt_cost_per_mtok": 15.0, "completion_cost_per_mtok": 75.0},
# ... add more model costs from openrouter.ai/docs#models ...
}
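        # Worked example (using the illustrative placeholder prices above, not live
        # OpenRouter pricing): 1,500 prompt tokens and 500 completion tokens on
        # "openai/gpt-4o" -> (1_500 / 1_000_000) * 5.0 + (500 / 1_000_000) * 15.0
        # = 0.0075 + 0.0075 = $0.015 total.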
model_cost = openrouter_pricing.get(model)
if model_cost:
prompt_cost = (prompt_tokens / 1_000_000) * model_cost.get("prompt_cost_per_mtok", 0)
completion_cost = (completion_tokens / 1_000_000) * model_cost.get("completion_cost_per_mtok", 0)
return prompt_cost + completion_cost
else:
logger.warning(f"Cost calculation not available for OpenRouter model: {model}")
# Return None if cost cannot be calculated
return None
# --- Prompt Formatting --- #
def format_prompt(self, messages: List[Dict[str, str]]) -> Any:
"""Use standard list of dictionaries format for OpenRouter (like OpenAI)."""
# OpenRouter generally uses the same format as OpenAI
return messages
def _get_fallback_models(self) -> List[Dict[str, Any]]:
"""Return a list of fallback models when API is not accessible."""
return [
{
"id": "mistralai/mistral-large",
"provider": self.provider_name,
"description": "Mistral: Strong open-weight model.",
},
{
"id": "mistralai/mistral-nemo",
"provider": self.provider_name,
"description": "Mistral: Strong open-weight model.",
},
{
"id": "meta-llama/llama-3-70b-instruct",
"provider": self.provider_name,
"description": "Meta: Powerful open-source instruction-tuned model.",
},
]
# Make available via discovery
__all__ = ["OpenRouterProvider"]
```
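A hedged usage sketch of the provider above. It assumes the OpenRouter API key is already supplied through the server's provider configuration (how the key reaches `BaseProvider` is outside this excerpt), and the model ID is only an example.
```python
import asyncio

from ultimate_mcp_server.core.providers.openrouter import OpenRouterProvider

async def demo_openrouter():
    provider = OpenRouterProvider()
    if not await provider.initialize():
        raise RuntimeError("OpenRouter provider unavailable (missing or invalid API key?)")

    # Model IDs keep their upstream prefix ("openai/...", "google/...", ...);
    # only an "openrouter:" prefix would be stripped by the provider itself.
    response = await provider.generate_completion(
        prompt="In one sentence, what is retrieval-augmented generation?",
        model="openai/gpt-4.1-mini",
        max_tokens=64,
        temperature=0.3,
    )
    print(response.text)
    print(f"tokens: {response.input_tokens} in / {response.output_tokens} out, "
          f"time: {response.processing_time:.2f}s")

if __name__ == "__main__":
    asyncio.run(demo_openrouter())
```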
--------------------------------------------------------------------------------
/ultimate_mcp_server/utils/parsing.py:
--------------------------------------------------------------------------------
```python
"""Parsing utilities for Ultimate MCP Server.
This module provides utility functions for parsing and processing
results from Ultimate MCP Server operations that were previously defined in
example scripts but are now part of the library.
"""
import json
import re
from typing import Any, Dict
from ultimate_mcp_server.utils import get_logger
# Initialize logger
logger = get_logger("ultimate_mcp_server.utils.parsing")
def extract_json_from_markdown(text: str) -> str:
"""Extracts a JSON string embedded within markdown code fences.
Handles various markdown code block formats and edge cases:
- Complete code blocks: ```json ... ``` or ``` ... ```
- Alternative fence styles: ~~~json ... ~~~
- Incomplete/truncated blocks with only opening fence
- Multiple code blocks (chooses the first valid JSON)
- Extensive JSON repair for common LLM output issues:
- Unterminated strings
- Trailing commas
- Missing closing brackets
- Unquoted keys
- Truncated content
If no valid JSON-like content is found in fences, returns the original string.
Args:
text: The input string possibly containing markdown-fenced JSON.
Returns:
The extracted JSON string or the stripped original string.
"""
if not text:
return ""
cleaned_text = text.strip()
possible_json_candidates = []
# Try to find JSON inside complete code blocks with various fence styles
# Look for backtick fences (most common)
backtick_matches = re.finditer(r"```(?:json)?\s*(.*?)\s*```", cleaned_text, re.DOTALL | re.IGNORECASE)
for match in backtick_matches:
possible_json_candidates.append(match.group(1).strip())
# Look for tilde fences (less common but valid in some markdown)
tilde_matches = re.finditer(r"~~~(?:json)?\s*(.*?)\s*~~~", cleaned_text, re.DOTALL | re.IGNORECASE)
for match in tilde_matches:
possible_json_candidates.append(match.group(1).strip())
# If no complete blocks found, check for blocks with only opening fence
if not possible_json_candidates:
# Try backtick opening fence
backtick_start = re.search(r"```(?:json)?\s*", cleaned_text, re.IGNORECASE)
if backtick_start:
content_after_fence = cleaned_text[backtick_start.end():].strip()
possible_json_candidates.append(content_after_fence)
# Try tilde opening fence
tilde_start = re.search(r"~~~(?:json)?\s*", cleaned_text, re.IGNORECASE)
if tilde_start:
content_after_fence = cleaned_text[tilde_start.end():].strip()
possible_json_candidates.append(content_after_fence)
# If still no candidates, add the original text as last resort
if not possible_json_candidates:
possible_json_candidates.append(cleaned_text)
# Try each candidate, returning the first one that looks like valid JSON
for candidate in possible_json_candidates:
# Apply advanced JSON repair
repaired = _repair_json(candidate)
try:
# Validate if it's actually parseable JSON
json.loads(repaired)
return repaired # Return the first valid JSON
except json.JSONDecodeError:
# If repair didn't work, continue to the next candidate
continue
# If no candidate worked with regular repair, try more aggressive repair on the first candidate
if possible_json_candidates:
aggressive_repair = _repair_json(possible_json_candidates[0], aggressive=True)
try:
json.loads(aggressive_repair)
return aggressive_repair
except json.JSONDecodeError:
# Return the best we can - the first candidate with basic cleaning
# This will still fail in json.loads, but at least we tried
return possible_json_candidates[0]
# Absolute fallback - return the original text
return cleaned_text
def _repair_json(text: str, aggressive=False) -> str:
"""
Repair common JSON formatting issues in LLM-generated output.
This internal utility function applies a series of transformations to fix common
JSON formatting problems that frequently occur in LLM outputs. It can operate in
two modes: standard and aggressive.
In standard mode (aggressive=False), it applies basic repairs like:
- Removing trailing commas before closing brackets/braces
- Ensuring property names are properly quoted
- Basic structure validation
In aggressive mode (aggressive=True), it applies more extensive repairs:
- Fixing unterminated string literals by adding missing quotes
- Balancing unmatched brackets and braces
- Adding missing values for dangling properties
- Handling truncated JSON at the end of strings
- Attempting to recover partial JSON structures
The aggressive repairs are particularly useful when dealing with outputs from
models that have been truncated mid-generation or contain structural errors
that would normally make the JSON unparseable.
Args:
text: The JSON-like string to repair, potentially containing formatting errors
aggressive: Whether to apply more extensive repair techniques beyond basic
formatting fixes. Default is False (basic repairs only).
Returns:
A repaired JSON string that is more likely to be parseable. Note that even
with aggressive repairs, the function cannot guarantee valid JSON for
severely corrupted inputs.
Note:
This function is intended for internal use by extract_json_from_markdown.
While it attempts to fix common issues, it may not address all possible
JSON formatting problems, especially in severely malformed inputs.
"""
if not text:
return text
# Step 1: Basic cleanup
result = text.strip()
# Quick check if it even remotely looks like JSON
if not (result.startswith('{') or result.startswith('[')):
return result
# Step 2: Fix common issues
# Fix trailing commas before closing brackets
result = re.sub(r',\s*([\}\]])', r'\1', result)
# Ensure property names are quoted
result = re.sub(r'([{,]\s*)([a-zA-Z0-9_$]+)(\s*:)', r'\1"\2"\3', result)
# If we're not in aggressive mode, return after basic fixes
if not aggressive:
return result
# Step 3: Aggressive repairs for truncated/malformed JSON
# Track opening/closing brackets to detect imbalance
open_braces = result.count('{')
close_braces = result.count('}')
open_brackets = result.count('[')
close_brackets = result.count(']')
# Count quotes to check if we have an odd number (indicating unterminated strings)
quote_count = result.count('"')
if quote_count % 2 != 0:
# We have an odd number of quotes, meaning at least one string is unterminated
# This is a much more aggressive approach to fix strings
# First, try to find all strings that are properly terminated
proper_strings = []
pos = 0
in_string = False
string_start = 0
# This helps track properly formed strings and identify problematic ones
while pos < len(result):
if result[pos] == '"' and (pos == 0 or result[pos-1] != '\\'):
if not in_string:
# Start of a string
in_string = True
string_start = pos
else:
# End of a string
in_string = False
proper_strings.append((string_start, pos))
pos += 1
# If we're still in a string at the end, we found an unterminated string
if in_string:
# Force terminate it at the end
result += '"'
# Even more aggressive string fixing
# This regexp looks for a quote followed by any characters not containing a quote
# followed by a comma, closing brace, or bracket, without a quote in between
# This indicates an unterminated string
result = re.sub(r'"([^"]*?)(?=,|\s*[\]}]|$)', r'"\1"', result)
# Fix cases where value might be truncated mid-word just before closing quote
# If we find something that looks like it's in the middle of a string, terminate it
result = re.sub(r'"([^"]+)(\s*[\]}]|,|$)', lambda m:
f'"{m.group(1)}"{"" if m.group(2).startswith(",") or m.group(2) in "]}," else m.group(2)}',
result)
# Fix dangling quotes at the end of the string - these usually indicate a truncated string
if result.rstrip().endswith('"'):
# Add closing quote and appropriate structure depending on context
result = result.rstrip() + '"'
# Look at the previous few characters to determine if we need a comma or not
context = result[-20:] if len(result) > 20 else result
# If string ends with x": " it's likely a property name
if re.search(r'"\s*:\s*"$', context):
# Add a placeholder value and closing structure for the property
result += "unknown"
# Check for dangling property (property name with colon but no value)
result = re.sub(r'"([^"]+)"\s*:(?!\s*["{[\w-])', r'"\1": null', result)
# Add missing closing brackets/braces if needed
if open_braces > close_braces:
result += '}' * (open_braces - close_braces)
if open_brackets > close_brackets:
result += ']' * (open_brackets - close_brackets)
# Handle truncated JSON structure - look for incomplete objects at the end
# This is complex, but we'll try some common patterns
# If JSON ends with a property name and colon but no value
if re.search(r'"[^"]+"\s*:\s*$', result):
result += 'null'
# If JSON ends with a comma, it needs another value - add a null
if re.search(r',\s*$', result):
result += 'null'
# If the JSON structure is fundamentally corrupted at the end (common in truncation)
# Close any unclosed objects or arrays
if not (result.endswith('}') or result.endswith(']') or result.endswith('"')):
# Count unmatched opening brackets
stack = []
for char in result:
if char in '{[':
stack.append(char)
elif char in '}]':
if stack and ((stack[-1] == '{' and char == '}') or (stack[-1] == '[' and char == ']')):
stack.pop()
# Close any unclosed structures
for bracket in reversed(stack):
if bracket == '{':
result += '}'
elif bracket == '[':
result += ']'
# As a final safeguard, try to eval the JSON with a permissive parser
# This won't fix deep structural issues but catches cases our regexes missed
try:
import simplejson
simplejson.loads(result, parse_constant=lambda x: x)
    except (ImportError, ValueError):  # JSONDecodeError subclasses ValueError; naming simplejson.JSONDecodeError here would raise NameError if the import failed
try:
# Try one last time with the more permissive custom JSON parser
_scan_once = json.scanner.py_make_scanner(json.JSONDecoder())
try:
_scan_once(result, 0)
except StopIteration:
# Likely unterminated JSON - do one final pass of common fixups
# Check for unterminated strings of various forms one more time
if re.search(r'(?<!")"(?:[^"\\]|\\.)*[^"\\](?!")(?=,|\s*[\]}]|$)', result):
# Even more aggressive fixes, replacing with generic values
result = re.sub(r'(?<!")"(?:[^"\\]|\\.)*[^"\\](?!")(?=,|\s*[\]}]|$)',
r'"invalid_string"', result)
# Ensure valid JSON-like structure
if not (result.endswith('}') or result.endswith(']')):
if result.count('{') > result.count('}'):
result += '}'
if result.count('[') > result.count(']'):
result += ']'
except Exception:
# Something else is wrong, but we've tried our best
pass
except Exception:
# We've done all we reasonably can
pass
return result
async def parse_result(result: Any) -> Dict[str, Any]:
"""Parse the result from a tool call into a usable dictionary.
Handles various return types from MCP tools, including TextContent objects,
list results, and direct dictionaries. Attempts to extract JSON from
markdown code fences if present.
Args:
result: Result from an MCP tool call or provider operation
Returns:
Parsed dictionary containing the result data
"""
try:
text_to_parse = None
# Handle TextContent object (which has a .text attribute)
if hasattr(result, 'text'):
text_to_parse = result.text
# Handle list result
elif isinstance(result, list):
if result:
first_item = result[0]
if hasattr(first_item, 'text'):
text_to_parse = first_item.text
elif isinstance(first_item, dict):
# NEW: Check if it's an MCP-style text content dict
if first_item.get("type") == "text" and "text" in first_item:
text_to_parse = first_item["text"]
else:
# It's some other dictionary, return it directly
return first_item
elif isinstance(first_item, str):
text_to_parse = first_item
else:
logger.warning(f"List item type not directly parseable: {type(first_item)}")
return {"error": f"List item type not directly parseable: {type(first_item)}", "original_result_type": str(type(result))}
else: # Empty list
return {} # Or perhaps an error/warning? For now, empty dict.
# Handle dictionary directly
elif isinstance(result, dict):
return result
# Handle string directly
elif isinstance(result, str):
text_to_parse = result
# If text_to_parse is still None or is empty/whitespace after potential assignments
if text_to_parse is None or not text_to_parse.strip():
logger.warning(f"No parsable text content found in result (type: {type(result)}, content preview: \'{str(text_to_parse)[:100]}...\').")
return {"error": "No parsable text content found in result", "result_type": str(type(result)), "content_preview": str(text_to_parse)[:100] if text_to_parse else None}
# At this point, text_to_parse should be a non-empty string.
# Attempt to extract JSON from markdown (if any)
# If no markdown, or extraction fails, json_to_parse will be text_to_parse itself.
json_to_parse = extract_json_from_markdown(text_to_parse)
if not json_to_parse.strip(): # If extraction resulted in an empty string (e.g. from "``` ```")
logger.warning(f"JSON extraction from text_to_parse yielded an empty string. Original text_to_parse: \'{text_to_parse[:200]}...\'")
# Fallback to trying the original text_to_parse if extraction gave nothing useful
# This covers cases where text_to_parse might be pure JSON without fences.
if text_to_parse.strip(): # Ensure original text_to_parse wasn't also empty
json_to_parse = text_to_parse
else: # Should have been caught by the earlier check, but as a safeguard:
return {"error": "Content became empty after attempting JSON extraction", "original_text_to_parse": text_to_parse}
# Now, json_to_parse should be the best candidate string for JSON parsing.
# Only attempt to parse if it's not empty/whitespace.
if not json_to_parse.strip():
logger.warning(f"Final string to parse is empty. Original text_to_parse: \'{text_to_parse[:200]}...\'")
return {"error": "Final string for JSON parsing is empty", "original_text_to_parse": text_to_parse}
try:
return json.loads(json_to_parse)
except json.JSONDecodeError as e:
problematic_text_for_repair = json_to_parse # This is the string that failed json.loads
logger.warning(f"Initial JSON parsing failed for: '{problematic_text_for_repair[:200]}...' Error: {e}. Attempting LLM repair...", emoji_key="warning")
try:
from ultimate_mcp_server.tools.completion import generate_completion
system_message_content = "You are an expert JSON repair assistant. Your goal is to return only valid JSON."
# Prepend system instruction to the main prompt for completion models
# (as generate_completion with openai provider doesn't natively use a separate system_prompt field in its current design)
user_repair_request = (
f"The following text is supposed to be valid JSON but failed parsing. "
f"Please correct it and return *only* the raw, valid JSON string. "
f"Do not include any explanations or markdown formatting. "
f"If it's impossible to extract or repair to valid JSON, return an empty JSON object {{}}. "
f"Problematic text:\n\n```text\n{problematic_text_for_repair}\n```"
)
combined_prompt = f"{system_message_content}\n\n{user_repair_request}"
llm_repair_result = await generate_completion(
prompt=combined_prompt, # Use the combined prompt
provider="openai",
model="gpt-4.1-mini",
temperature=0.0,
                    additional_params={}  # the system instruction is prepended to the prompt above, so no extra params are needed
)
text_from_llm_repair = llm_repair_result.get("text", "")
if not text_from_llm_repair.strip():
logger.error("LLM repair attempt returned empty string.")
return {"error": "LLM repair returned empty string", "original_text": problematic_text_for_repair}
# Re-extract from LLM response, as it might add fences
final_repaired_json_str = extract_json_from_markdown(text_from_llm_repair)
if not final_repaired_json_str.strip():
logger.error(f"LLM repair extracted an empty JSON string from LLM response: {text_from_llm_repair[:200]}...")
return {"error": "LLM repair extracted empty JSON", "llm_response": text_from_llm_repair, "original_text": problematic_text_for_repair}
try:
logger.debug(f"Attempting to parse LLM-repaired JSON: {final_repaired_json_str[:200]}...")
parsed_llm_result = json.loads(final_repaired_json_str)
logger.success("LLM JSON repair successful.", emoji_key="success")
return parsed_llm_result
except json.JSONDecodeError as llm_e:
logger.error(f"LLM repair attempt failed. LLM response could not be parsed as JSON: {llm_e}. LLM response (after extraction): '{final_repaired_json_str[:200]}' Original LLM text: '{text_from_llm_repair[:500]}...'")
return {"error": "LLM repair failed to produce valid JSON", "detail": str(llm_e), "llm_response_extracted": final_repaired_json_str, "llm_response_raw": text_from_llm_repair, "original_text": problematic_text_for_repair}
except Exception as repair_ex:
logger.error(f"Exception during LLM repair process: {repair_ex}", exc_info=True)
return {"error": "Exception during LLM repair", "detail": str(repair_ex), "original_text": problematic_text_for_repair}
except Exception as e: # General error in parse_result
logger.error(f"Critical error in parse_result: {e}", exc_info=True)
return {"error": "Critical error during result parsing", "detail": str(e)}
async def process_mcp_result(result: Any) -> Dict[str, Any]:
"""
Process and normalize results from MCP tool calls into a consistent dictionary format.
This function serves as a user-friendly interface for handling and normalizing
the various types of results that can be returned from MCP tools and provider operations.
It acts as a bridge between the raw MCP tool outputs and downstream application code
that expects a consistent dictionary structure.
The function handles multiple return formats:
- TextContent objects with a .text attribute
- List results containing TextContent objects or dictionaries
- Direct dictionary returns
- JSON-like strings embedded in markdown code blocks
Key features:
- Automatic extraction of JSON from markdown code fences
- JSON repair for malformed or truncated LLM outputs
- Fallback to LLM-based repair for difficult parsing cases
- Consistent error handling and reporting
This function is especially useful in:
- Handling results from completion tools where LLMs may return JSON in various formats
- Processing tool responses that contain structured data embedded in text
- Creating a consistent interface for downstream processing of MCP tool results
- Simplifying error handling in client applications
Args:
result: The raw result from an MCP tool call or provider operation, which could
be a TextContent object, a list, a dictionary, or another structure
Returns:
A dictionary containing either:
- The successfully parsed result data
- An error description with diagnostic information if parsing failed
Example:
```python
result = await some_mcp_tool()
processed_data = await process_mcp_result(result)
# Check for errors in the processed result
if "error" in processed_data:
print(f"Error processing result: {processed_data['error']}")
else:
# Use the normalized data
print(f"Processed data: {processed_data}")
```
"""
return await parse_result(result)
```
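A short sketch of how the parsing helpers above behave on a typical LLM payload; the fenced JSON string and the MCP-style list wrapper are made-up inputs.
```python
import asyncio

from ultimate_mcp_server.utils.parsing import (
    extract_json_from_markdown,
    process_mcp_result,
)

# Build a markdown-fenced JSON payload (fence assembled programmatically so this
# example can itself sit inside a markdown code block).
fence = "`" * 3
raw = f'{fence}json\n{{"entities": ["Acme Corp", "Jane Doe"], "confidence": 0.92,}}\n{fence}'

# Extraction strips the fence; the basic repair pass removes the trailing comma.
print(extract_json_from_markdown(raw))
# -> {"entities": ["Acme Corp", "Jane Doe"], "confidence": 0.92}

async def main():
    # MCP-style list-of-text-content result, as returned by many tool calls.
    mcp_result = [{"type": "text", "text": raw}]
    parsed = await process_mcp_result(mcp_result)
    print(parsed["confidence"])  # 0.92

if __name__ == "__main__":
    asyncio.run(main())
```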
--------------------------------------------------------------------------------
/ultimate_mcp_server/services/knowledge_base/retriever.py:
--------------------------------------------------------------------------------
```python
"""Knowledge base retriever for RAG functionality."""
import time
from typing import Any, Dict, List, Optional
from ultimate_mcp_server.services.knowledge_base.feedback import get_rag_feedback_service
from ultimate_mcp_server.services.knowledge_base.utils import build_metadata_filter
from ultimate_mcp_server.services.vector import VectorDatabaseService
from ultimate_mcp_server.utils import get_logger
logger = get_logger(__name__)
class KnowledgeBaseRetriever:
"""
Advanced retrieval engine for knowledge base collections in RAG applications.
The KnowledgeBaseRetriever provides sophisticated search capabilities for finding
the most relevant documents within knowledge bases. It offers multiple retrieval
strategies optimized for different search scenarios, from pure semantic vector
search to hybrid approaches combining vector and keyword matching.
Key Features:
- Multiple retrieval methods (vector, hybrid, keyword)
- Metadata filtering for targeted searches
- Content-based filtering for keyword matching
- Configurable similarity thresholds and relevance scoring
- Feedback mechanisms for continuous retrieval improvement
- Performance monitoring and diagnostics
- Advanced parameter tuning for specialized search needs
Retrieval Methods:
1. Vector Search: Uses embeddings for semantic similarity matching
- Best for finding conceptually related content
- Handles paraphrasing and semantic equivalence
- Computationally efficient for large collections
2. Hybrid Search: Combines vector and keyword matching with weighted scoring
- Balances semantic understanding with exact term matching
- Addresses vocabulary mismatch problems
- Provides more robust retrieval across diverse query types
3. Keyword Filtering: Limits results to those containing specific text
- Used for explicit term presence requirements
- Can be combined with other search methods
Architecture:
The retriever operates as a higher-level service above the vector database,
working in concert with:
- Embedding services for query vectorization
- Vector database services for efficient similarity search
- Feedback services for result quality improvement
- Metadata filters for context-aware retrieval
Usage in RAG Applications:
This retriever is a critical component in RAG pipelines, responsible for
the quality and relevance of context provided to LLMs. Tuning retrieval
parameters significantly impacts the quality of generated responses.
Example Usage:
```python
# Get retriever instance
retriever = get_knowledge_base_retriever()
# Simple vector search
results = await retriever.retrieve(
knowledge_base_name="company_policies",
query="What is our remote work policy?",
top_k=3,
min_score=0.7
)
# Hybrid search with metadata filtering
dept_results = await retriever.retrieve_hybrid(
knowledge_base_name="company_policies",
query="security requirements for customer data",
top_k=5,
vector_weight=0.6,
keyword_weight=0.4,
metadata_filter={"department": "security", "status": "active"}
)
# Process and use the retrieved documents
for item in results["results"]:
print(f"Document (score: {item['score']:.2f}): {item['document'][:100]}...")
print(f"Source: {item['metadata'].get('source', 'unknown')}")
# Record which documents were actually useful
await retriever.record_feedback(
knowledge_base_name="company_policies",
query="What is our remote work policy?",
retrieved_documents=results["results"],
used_document_ids=["doc123", "doc456"]
)
```
"""
def __init__(self, vector_service: VectorDatabaseService):
"""Initialize the knowledge base retriever.
Args:
vector_service: Vector database service for retrieving embeddings
"""
self.vector_service = vector_service
self.feedback_service = get_rag_feedback_service()
# Get embedding service for generating query embeddings
from ultimate_mcp_server.services.vector.embeddings import get_embedding_service
self.embedding_service = get_embedding_service()
logger.info("Knowledge base retriever initialized", extra={"emoji_key": "success"})
async def _validate_knowledge_base(self, name: str) -> Dict[str, Any]:
"""Validate that a knowledge base exists.
Args:
name: Knowledge base name
Returns:
Validation result
"""
# Check if knowledge base exists
collections = await self.vector_service.list_collections()
if name not in collections:
logger.warning(
f"Knowledge base '{name}' not found",
extra={"emoji_key": "warning"}
)
return {"status": "not_found", "name": name}
# Get metadata
metadata = await self.vector_service.get_collection_metadata(name)
if metadata.get("type") != "knowledge_base":
logger.warning(
f"Collection '{name}' is not a knowledge base",
extra={"emoji_key": "warning"}
)
return {"status": "not_knowledge_base", "name": name}
return {
"status": "valid",
"name": name,
"metadata": metadata
}
async def retrieve(
self,
knowledge_base_name: str,
query: str,
top_k: int = 5,
min_score: float = 0.6,
metadata_filter: Optional[Dict[str, Any]] = None,
content_filter: Optional[str] = None,
embedding_model: Optional[str] = None,
apply_feedback: bool = True,
search_params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""Retrieve documents from a knowledge base using vector search.
Args:
knowledge_base_name: Knowledge base name
query: Query text
top_k: Number of results to return
min_score: Minimum similarity score
metadata_filter: Optional metadata filter (field->value or field->{op:value})
content_filter: Text to search for in documents
embedding_model: Optional embedding model name
apply_feedback: Whether to apply feedback adjustments
search_params: Optional ChromaDB search parameters
Returns:
Retrieved documents with metadata
"""
start_time = time.time()
# Validate knowledge base
kb_info = await self._validate_knowledge_base(knowledge_base_name)
if kb_info["status"] != "valid":
logger.warning(
f"Knowledge base '{knowledge_base_name}' not found or invalid",
extra={"emoji_key": "warning"}
)
return {
"status": "error",
"message": f"Knowledge base '{knowledge_base_name}' not found or invalid"
}
logger.debug(f"DEBUG: Knowledge base validated - metadata: {kb_info['metadata']}")
# Use the same embedding model that was used to create the knowledge base
if not embedding_model and kb_info["metadata"].get("embedding_model"):
embedding_model = kb_info["metadata"]["embedding_model"]
logger.debug(f"Using embedding model from knowledge base metadata: {embedding_model}")
# If embedding model is specified, ensure it's saved in the metadata for future use
if embedding_model and not kb_info["metadata"].get("embedding_model"):
try:
await self.vector_service.update_collection_metadata(
name=knowledge_base_name,
metadata={
**kb_info["metadata"],
"embedding_model": embedding_model
}
)
logger.debug(f"Updated knowledge base metadata with embedding model: {embedding_model}")
except Exception as e:
logger.warning(f"Failed to update knowledge base metadata with embedding model: {str(e)}")
# Get or create ChromaDB collection
collection = await self.vector_service.get_collection(knowledge_base_name)
logger.debug(f"DEBUG: Retrieved collection type: {type(collection)}")
# Set search parameters if provided
if search_params:
await self.vector_service.update_collection_metadata(
collection_name=knowledge_base_name,
metadata={
**kb_info["metadata"],
**{f"hnsw:{k}": v for k, v in search_params.items()}
}
)
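# e.g., search_params={"search_ef": 128} is stored as {"hnsw:search_ef": 128}; ChromaDB
# treats "hnsw:"-prefixed collection metadata as HNSW index settings (key name assumed here).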
# Create includes parameter
includes = ["documents", "metadatas", "distances"]
# Create where_document parameter for content filtering
where_document = {"$contains": content_filter} if content_filter else None
# Convert metadata filter format if provided
chroma_filter = build_metadata_filter(metadata_filter) if metadata_filter else None
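# Illustrative shapes only: content_filter="remote work" becomes
# where_document={"$contains": "remote work"} (as above), while a metadata filter such as
# {"status": "active"} is assumed to be translated by build_metadata_filter into a
# Chroma-style where clause like {"status": {"$eq": "active"}} (exact form depends on that helper).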
logger.debug(f"DEBUG: Search parameters - top_k: {top_k}, min_score: {min_score}, filter: {chroma_filter}, where_document: {where_document}")
try:
# Generate embedding directly with our embedding service
# Call create_embeddings with a list and get the first result
query_embeddings = await self.embedding_service.create_embeddings(
texts=[query],
# model=embedding_model # Model is set during service init
)
if not query_embeddings:
logger.error(f"Failed to generate embedding for query: {query}")
return { "status": "error", "message": "Failed to generate query embedding" }
query_embedding = query_embeddings[0]
logger.debug(f"Generated query embedding with model: {self.embedding_service.model_name}, dimension: {len(query_embedding)}")
# Use correct query method based on collection type
if hasattr(collection, 'query') and not hasattr(collection, 'search_by_text'):
# ChromaDB collection
logger.debug("Using ChromaDB direct query with embeddings")
try:
search_results = collection.query(
query_embeddings=[query_embedding], # Use our embedding directly
n_results=top_k * 2,
where=chroma_filter,
where_document=where_document,
include=includes
)
except Exception as e:
logger.error(f"ChromaDB query error: {str(e)}")
raise
else:
# Our custom VectorCollection
logger.debug("Using VectorCollection search method")
search_results = await collection.query(
query_texts=[query],
n_results=top_k * 2,
where=chroma_filter,
where_document=where_document,
include=includes,
embedding_model=embedding_model
)
# Debug raw results
logger.debug(f"DEBUG: Raw search results - keys: {search_results.keys()}")
logger.debug(f"DEBUG: Documents count: {len(search_results.get('documents', [[]])[0])}")
logger.debug(f"DEBUG: IDs: {search_results.get('ids', [[]])[0]}")
logger.debug(f"DEBUG: Distances: {search_results.get('distances', [[]])[0]}")
# Process results
results = []
for i, doc in enumerate(search_results["documents"][0]):
# Convert distance to a similarity score (1 = exact match, 0 = completely different).
# Distance metrics return 0 for an exact match, so we use 1 - distance.
# This is only a true similarity for distances bounded in [0, 1] (e.g., cosine distance);
# unbounded metrics such as raw L2 can yield negative values here.
similarity = 1.0 - float(search_results["distances"][0][i])
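# e.g., a cosine distance of 0.25 yields a similarity of 0.75, which clears the default
# min_score of 0.6, while a distance of 0.5 (similarity 0.5) is filtered out below.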
# Debug each document
logger.debug(f"DEBUG: Document {i} - ID: {search_results['ids'][0][i]}")
logger.debug(f"DEBUG: Similarity: {similarity} (min required: {min_score})")
logger.debug(f"DEBUG: Document content (first 100 chars): {doc[:100] if doc else 'Empty'}")
if search_results["metadatas"] and i < len(search_results["metadatas"][0]):
metadata = search_results["metadatas"][0][i]
logger.debug(f"DEBUG: Metadata: {metadata}")
# Skip results below minimum score
if similarity < min_score:
logger.debug(f"DEBUG: Skipping document {i} due to low similarity: {similarity} < {min_score}")
continue
results.append({
"id": search_results["ids"][0][i],
"document": doc,
"metadata": search_results["metadatas"][0][i] if search_results["metadatas"] else {},
"score": similarity
})
logger.debug(f"DEBUG: After filtering, {len(results)} documents remain.")
# Apply feedback adjustments if requested
if apply_feedback:
results = await self.feedback_service.apply_feedback_adjustments(
knowledge_base_name=knowledge_base_name,
results=results,
query=query
)
# Limit to top_k
results = results[:top_k]
# Track retrieval time
retrieval_time = time.time() - start_time
logger.info(
f"Retrieved {len(results)} documents from '{knowledge_base_name}' in {retrieval_time:.2f}s",
extra={"emoji_key": "success"}
)
return {
"status": "success",
"query": query,
"results": results,
"count": len(results),
"retrieval_time": retrieval_time
}
except Exception as e:
logger.error(
f"Error retrieving from knowledge base '{knowledge_base_name}': {str(e)}",
extra={"emoji_key": "error"}
)
return {
"status": "error",
"message": str(e)
}
async def retrieve_hybrid(
self,
knowledge_base_name: str,
query: str,
top_k: int = 5,
vector_weight: float = 0.7,
keyword_weight: float = 0.3,
min_score: float = 0.6,
metadata_filter: Optional[Dict[str, Any]] = None,
additional_keywords: Optional[List[str]] = None,
apply_feedback: bool = True,
search_params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""Retrieve documents using hybrid search.
Args:
knowledge_base_name: Knowledge base name
query: Query text
top_k: Number of documents to retrieve
vector_weight: Weight for vector search component
keyword_weight: Weight for keyword search component
min_score: Minimum similarity score
metadata_filter: Optional metadata filter
additional_keywords: Additional keywords to include in search
apply_feedback: Whether to apply feedback adjustments
search_params: Optional ChromaDB search parameters
Returns:
Retrieved documents with metadata
"""
start_time = time.time()
# Validate knowledge base
kb_info = await self._validate_knowledge_base(knowledge_base_name)
if kb_info["status"] != "valid":
logger.warning(
f"Knowledge base '{knowledge_base_name}' not found or invalid",
extra={"emoji_key": "warning"}
)
return {
"status": "error",
"message": f"Knowledge base '{knowledge_base_name}' not found or invalid"
}
# Get or create ChromaDB collection
collection = await self.vector_service.get_collection(knowledge_base_name)
# Set search parameters if provided
if search_params:
await self.vector_service.update_collection_metadata(
collection_name=knowledge_base_name,
metadata={
**kb_info["metadata"],
**{f"hnsw:{k}": v for k, v in search_params.items()}
}
)
# Convert metadata filter format if provided
chroma_filter = build_metadata_filter(metadata_filter) if metadata_filter else None
# Create content filter based on query and additional keywords
content_text = query
if additional_keywords:
content_text = f"{query} {' '.join(additional_keywords)}"
# Approximate hybrid search: run a vector query constrained by a document content filter,
# then add a keyword-only pass below and merge the two result sets with weighted scores
try:
# Vector search results with content filter
search_results = await collection.query(
query_texts=[query],
n_results=top_k * 3, # Get more results for combining
where=chroma_filter,
where_document={"$contains": content_text} if content_text else None,
include=["documents", "metadatas", "distances"],
embedding_model=None # Use default embedding model
)
# Process results
combined_results = {}
# Process vector search results
for i, doc in enumerate(search_results["documents"][0]):
doc_id = search_results["ids"][0][i]
vector_score = 1.0 - float(search_results["distances"][0][i])
combined_results[doc_id] = {
"id": doc_id,
"document": doc,
"metadata": search_results["metadatas"][0][i] if search_results["metadatas"] else {},
"vector_score": vector_score,
"keyword_score": 0.0,
"score": vector_score * vector_weight
}
# Run a keyword-only search when a keyword component is requested
if keyword_weight > 0:
keyword_results = await collection.query(
query_texts=None, # No vector query
n_results=top_k * 3,
where=chroma_filter,
where_document={"$contains": content_text},
include=["documents", "metadatas"],
embedding_model=None # No embedding model needed for keyword-only search
)
# Process keyword results
for i, doc in enumerate(keyword_results["documents"][0]):
doc_id = keyword_results["ids"][0][i]
# Approximate keyword score based on position (best = 1.0)
keyword_score = 1.0 - (i / len(keyword_results["documents"][0]))
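# e.g., with 10 keyword matches, ranks 0..9 map to scores 1.0, 0.9, ..., 0.1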
if doc_id in combined_results:
# Update existing result
combined_results[doc_id]["keyword_score"] = keyword_score
combined_results[doc_id]["score"] += keyword_score * keyword_weight
else:
# Add new result
combined_results[doc_id] = {
"id": doc_id,
"document": doc,
"metadata": keyword_results["metadatas"][0][i] if keyword_results["metadatas"] else {},
"vector_score": 0.0,
"keyword_score": keyword_score,
"score": keyword_score * keyword_weight
}
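# Worked example of the weighted combination: with vector_weight=0.7 and keyword_weight=0.3,
# a document with vector_score=0.80 and keyword_score=0.50 gets
# score = 0.7 * 0.80 + 0.3 * 0.50 = 0.71, which clears the default min_score of 0.6.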
# Convert to list and filter by min_score
results = [r for r in combined_results.values() if r["score"] >= min_score]
# Apply feedback adjustments if requested
if apply_feedback:
results = await self.feedback_service.apply_feedback_adjustments(
knowledge_base_name=knowledge_base_name,
results=results,
query=query
)
# Sort by score and limit to top_k
results.sort(key=lambda x: x["score"], reverse=True)
results = results[:top_k]
# Track retrieval time
retrieval_time = time.time() - start_time
logger.info(
f"Hybrid search retrieved {len(results)} documents from '{knowledge_base_name}' in {retrieval_time:.2f}s",
extra={"emoji_key": "success"}
)
return {
"status": "success",
"query": query,
"results": results,
"count": len(results),
"retrieval_time": retrieval_time
}
except Exception as e:
logger.error(
f"Error performing hybrid search on '{knowledge_base_name}': {str(e)}",
extra={"emoji_key": "error"}
)
return {
"status": "error",
"message": str(e)
}
async def record_feedback(
self,
knowledge_base_name: str,
query: str,
retrieved_documents: List[Dict[str, Any]],
used_document_ids: Optional[List[str]] = None,
explicit_feedback: Optional[Dict[str, str]] = None
) -> Dict[str, Any]:
"""Record feedback for retrieval results.
Args:
knowledge_base_name: Knowledge base name
query: Query text
retrieved_documents: List of retrieved documents
used_document_ids: List of document IDs used in the response
explicit_feedback: Explicit feedback for documents
Returns:
Feedback recording result
"""
# Convert list to set if provided
used_ids_set = set(used_document_ids) if used_document_ids else None
# Record feedback
result = await self.feedback_service.record_retrieval_feedback(
knowledge_base_name=knowledge_base_name,
query=query,
retrieved_documents=retrieved_documents,
used_document_ids=used_ids_set,
explicit_feedback=explicit_feedback
)
return result
```
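Both `retrieve` and `retrieve_hybrid` return either `{"status": "error", "message": ...}` or `{"status": "success", ...}` with `results`, `count`, and `retrieval_time`, so callers should branch on `status` before consuming results. A minimal sketch of that pattern, assuming the `get_knowledge_base_retriever()` factory referenced in the class docstring (its import path and wiring are not shown in this file):
```python
import asyncio

# Sketch only: get_knowledge_base_retriever() is the factory referenced in the class
# docstring above; its import path is assumed and not shown in this file.
async def answer_with_feedback() -> None:
    retriever = get_knowledge_base_retriever()

    response = await retriever.retrieve_hybrid(
        knowledge_base_name="company_policies",
        query="security requirements for customer data",
        top_k=5,
        vector_weight=0.6,
        keyword_weight=0.4,
    )
    if response["status"] != "success":
        # Error payloads carry a human-readable message instead of results
        raise RuntimeError(f"Retrieval failed: {response['message']}")

    # Each hybrid result carries id, document, metadata, and the combined score
    for item in response["results"]:
        print(f"{item['id']}: {item['score']:.2f} {item['document'][:80]}...")

    # Report which retrieved documents were actually used so future rankings improve
    if response["results"]:
        await retriever.record_feedback(
            knowledge_base_name="company_policies",
            query="security requirements for customer data",
            retrieved_documents=response["results"],
            used_document_ids=[response["results"][0]["id"]],
        )

# asyncio.run(answer_with_feedback())
```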