This is page 8 of 35. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?page={x} to view the full context.
# Directory Structure
```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│ ├── __init__.py
│ ├── advanced_agent_flows_using_unified_memory_system_demo.py
│ ├── advanced_extraction_demo.py
│ ├── advanced_unified_memory_system_demo.py
│ ├── advanced_vector_search_demo.py
│ ├── analytics_reporting_demo.py
│ ├── audio_transcription_demo.py
│ ├── basic_completion_demo.py
│ ├── cache_demo.py
│ ├── claude_integration_demo.py
│ ├── compare_synthesize_demo.py
│ ├── cost_optimization.py
│ ├── data
│ │ ├── sample_event.txt
│ │ ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│ │ └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│ ├── docstring_refiner_demo.py
│ ├── document_conversion_and_processing_demo.py
│ ├── entity_relation_graph_demo.py
│ ├── filesystem_operations_demo.py
│ ├── grok_integration_demo.py
│ ├── local_text_tools_demo.py
│ ├── marqo_fused_search_demo.py
│ ├── measure_model_speeds.py
│ ├── meta_api_demo.py
│ ├── multi_provider_demo.py
│ ├── ollama_integration_demo.py
│ ├── prompt_templates_demo.py
│ ├── python_sandbox_demo.py
│ ├── rag_example.py
│ ├── research_workflow_demo.py
│ ├── sample
│ │ ├── article.txt
│ │ ├── backprop_paper.pdf
│ │ ├── buffett.pdf
│ │ ├── contract_link.txt
│ │ ├── legal_contract.txt
│ │ ├── medical_case.txt
│ │ ├── northwind.db
│ │ ├── research_paper.txt
│ │ ├── sample_data.json
│ │ └── text_classification_samples
│ │ ├── email_classification.txt
│ │ ├── news_samples.txt
│ │ ├── product_reviews.txt
│ │ └── support_tickets.txt
│ ├── sample_docs
│ │ └── downloaded
│ │ └── attention_is_all_you_need.pdf
│ ├── sentiment_analysis_demo.py
│ ├── simple_completion_demo.py
│ ├── single_shot_synthesis_demo.py
│ ├── smart_browser_demo.py
│ ├── sql_database_demo.py
│ ├── sse_client_demo.py
│ ├── test_code_extraction.py
│ ├── test_content_detection.py
│ ├── test_ollama.py
│ ├── text_classification_demo.py
│ ├── text_redline_demo.py
│ ├── tool_composition_examples.py
│ ├── tournament_code_demo.py
│ ├── tournament_text_demo.py
│ ├── unified_memory_system_demo.py
│ ├── vector_search_demo.py
│ ├── web_automation_instruction_packs.py
│ └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│ └── smart_browser_internal
│ ├── locator_cache.db
│ ├── readability.js
│ └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── integration
│ │ ├── __init__.py
│ │ └── test_server.py
│ ├── manual
│ │ ├── test_extraction_advanced.py
│ │ └── test_extraction.py
│ └── unit
│ ├── __init__.py
│ ├── test_cache.py
│ ├── test_providers.py
│ └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│ ├── __init__.py
│ ├── __main__.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── __main__.py
│ │ ├── commands.py
│ │ ├── helpers.py
│ │ └── typer_cli.py
│ ├── clients
│ │ ├── __init__.py
│ │ ├── completion_client.py
│ │ └── rag_client.py
│ ├── config
│ │ └── examples
│ │ └── filesystem_config.yaml
│ ├── config.py
│ ├── constants.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── evaluation
│ │ │ ├── base.py
│ │ │ └── evaluators.py
│ │ ├── providers
│ │ │ ├── __init__.py
│ │ │ ├── anthropic.py
│ │ │ ├── base.py
│ │ │ ├── deepseek.py
│ │ │ ├── gemini.py
│ │ │ ├── grok.py
│ │ │ ├── ollama.py
│ │ │ ├── openai.py
│ │ │ └── openrouter.py
│ │ ├── server.py
│ │ ├── state_store.py
│ │ ├── tournaments
│ │ │ ├── manager.py
│ │ │ ├── tasks.py
│ │ │ └── utils.py
│ │ └── ums_api
│ │ ├── __init__.py
│ │ ├── ums_database.py
│ │ ├── ums_endpoints.py
│ │ ├── ums_models.py
│ │ └── ums_services.py
│ ├── exceptions.py
│ ├── graceful_shutdown.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── analytics
│ │ │ ├── __init__.py
│ │ │ ├── metrics.py
│ │ │ └── reporting.py
│ │ ├── cache
│ │ │ ├── __init__.py
│ │ │ ├── cache_service.py
│ │ │ ├── persistence.py
│ │ │ ├── strategies.py
│ │ │ └── utils.py
│ │ ├── cache.py
│ │ ├── document.py
│ │ ├── knowledge_base
│ │ │ ├── __init__.py
│ │ │ ├── feedback.py
│ │ │ ├── manager.py
│ │ │ ├── rag_engine.py
│ │ │ ├── retriever.py
│ │ │ └── utils.py
│ │ ├── prompts
│ │ │ ├── __init__.py
│ │ │ ├── repository.py
│ │ │ └── templates.py
│ │ ├── prompts.py
│ │ └── vector
│ │ ├── __init__.py
│ │ ├── embeddings.py
│ │ └── vector_service.py
│ ├── tool_token_counter.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── audio_transcription.py
│ │ ├── base.py
│ │ ├── completion.py
│ │ ├── docstring_refiner.py
│ │ ├── document_conversion_and_processing.py
│ │ ├── enhanced-ums-lookbook.html
│ │ ├── entity_relation_graph.py
│ │ ├── excel_spreadsheet_automation.py
│ │ ├── extraction.py
│ │ ├── filesystem.py
│ │ ├── html_to_markdown.py
│ │ ├── local_text_tools.py
│ │ ├── marqo_fused_search.py
│ │ ├── meta_api_tool.py
│ │ ├── ocr_tools.py
│ │ ├── optimization.py
│ │ ├── provider.py
│ │ ├── pyodide_boot_template.html
│ │ ├── python_sandbox.py
│ │ ├── rag.py
│ │ ├── redline-compiled.css
│ │ ├── sentiment_analysis.py
│ │ ├── single_shot_synthesis.py
│ │ ├── smart_browser.py
│ │ ├── sql_databases.py
│ │ ├── text_classification.py
│ │ ├── text_redline_tools.py
│ │ ├── tournament.py
│ │ ├── ums_explorer.html
│ │ └── unified_memory_system.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── async_utils.py
│ │ ├── display.py
│ │ ├── logging
│ │ │ ├── __init__.py
│ │ │ ├── console.py
│ │ │ ├── emojis.py
│ │ │ ├── formatter.py
│ │ │ ├── logger.py
│ │ │ ├── panels.py
│ │ │ ├── progress.py
│ │ │ └── themes.py
│ │ ├── parse_yaml.py
│ │ ├── parsing.py
│ │ ├── security.py
│ │ └── text.py
│ └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/examples/data/Steve_Jobs_Introducing_The_iPhone_compressed.md:
--------------------------------------------------------------------------------
```markdown
# Transcript: Steve_Jobs_Introducing_The_iPhone_compressed.mp3
## Metadata
- **Duration:** 14:00.46
- **Language:** en (confidence: 1.00)
- **Transcription Model:** large-v3
- **Device:** cuda
- **Processing Time:** 99.07 seconds
## Full Transcript
This is a day I've been looking forward to for two and a half years. Every once in a while, a revolutionary product comes along that changes everything. And Apple has been, well, first of all, one's very fortunate if you get to work on just one of these in your career. Apple's been very fortunate. It's been able to introduce a few of these into the world. In 1984, we introduced the Macintosh. It didn't just change Apple. It changed the whole computer industry. In 2001, we introduced the first iPod. And it didn't just change the way we all listen to music. It changed the entire music industry. Well, today. Today. We're introducing three revolutionary products of this class. The first one is a widescreen iPod with touch controls. The second is a revolutionary mobile phone. And the third is a breakthrough internet communications device. So, three things. A widescreen iPod with touch controls. A revolutionary mobile phone. And a breakthrough internet communications device. An iPod. A phone. And an internet communicator. An iPod. A phone. These are not three separate devices. This is one device. We are calling it iPhone. Today. Today. Today, Apple is going to reinvent the phone. And here it is. Actually, here it is, but we're going to leave it there for now. So, before we get into it, let me talk about a category of things. The most advanced phones are called smart phones. So they say. And they typically combine a phone plus some email capability. Plus, they say it's the internet, sort of the baby internet. Into one device. And they all have these plastic little keyboards on them. And the problem is that they're not so smart. And they're not so easy to use. So, if you kind of make a, you know, business school 101 graph of the smart axis and the easy to use axis. Phones, regular cell phones are kind of right there. They're not so smart. And they're, you know, not so easy to use. But smart phones are definitely a little smarter. But they actually are harder to use. They're really complicated. Just for the basic stuff, people have a hard time figuring out how to use them. Well, we don't want to do either one of these things. What we want to do is make a leapfrog product that is way smarter than any mobile device has ever been. And super easy to use. This is what iPhone is. Okay. So, we're going to reinvent the phone. Now, we're going to start. We're going to start with a revolutionary user interface. Is the result of years of research and development. And, of course, it's an interplay of hardware and software. Now, why do we need a revolutionary user interface? I mean, here's four smart phones, right? Motorola Q, the Blackberry, Palm Treo, Nokia E62, the usual suspects. And what's wrong with their user interfaces? Well, the problem with them. Is really sort of in the bottom 40 there. It's this stuff right here. They all have these keyboards that are there whether you need them or not to be there. And they all have these control buttons that are fixed in plastic. And are the same for every application. Well, every application wants a slightly different user interface. A slightly optimized set of buttons just for it. And what happens if you think of a great idea six months from now? You can't run around and add a button to these things. They're already shipped. So, what do you do? It doesn't work because the buttons and the controls can't change. They can't change for each application. And they can't change down the road if you think of another great idea you want to add to this product. 
Well, how do you solve this? Hmm. It turns out we have solved it. We solved it in computers 20 years ago. We solved it with a bitmap screen that could display anything we want. Put any user interface. And a pointing device. We solved it with the mouse. Right? We solved this problem. So, how are we going to take this to a mobile device? Well, what we're going to do is get rid of all these buttons and just make a giant screen. A giant screen. Now, how are we going to communicate this? We don't want to carry around a mouse. Right? So, what are we going to do? Oh, a stylus. Right? We're going to use a stylus. No. Who wants a stylus? You have to get them and put them away and you lose them. Yuck. Nobody wants a stylus. So, let's not use a stylus. We're going to use the best pointing device in the world. We're going to use a pointing device that we're all born with. We're born with 10 of them. We're going to use our fingers. We're going to touch this with our fingers. And we have invented a new technology called multi-touch, which is phenomenal. It works like magic. You don't need a stylus. It's far more accurate than any touch display that's ever been shipped. It ignores unintended touches. It's super smart. You can do multi-finger gestures on it. And boy, have we patented it. We've been very lucky to have brought a few revolutionary user interfaces to the market in our time. First was the mouse. The second was the click wheel. And now we're going to bring multi-touch to the market. And each of these revolutionary user interfaces has made possible a revolutionary product. The Mac, the iPod, and now the iPhone. So, a revolutionary user interface. We're going to build on top of that with software. Now, software on mobile phones is like software. It's like baby software. It's not so powerful. And today, we're going to show you a software breakthrough. Software that's at least five years ahead of what's on any other phone. Now, how do we do this? Well, we start with a strong foundation. iPhone runs OS X. Now, why would we want to run such a sophisticated operating system on a mobile device? Well, because it's got everything we need. It's got multi-tasking. It's got the best networking. It already knows how to power manage. We've been doing this on mobile computers for years. It's got awesome security. And to write apps. It's got everything from Coco and the graphics. And it's got core animation built in. And it's got the audio and video that OS X is famous for. It's got all the stuff we want. And it's built right in to iPhone. And that has let us create desktop class applications and networking. Right? Not the crippled stuff that you find on most phones. This is real desktop class applications. Now, you know, one of the pioneers of our industry, Alan Kay, has had a lot of great quotes throughout the years. And I ran across one of them recently that explains how we look at this. Explains why we go about doing things the way we do. Because we love software. And here's the quote. People who are really serious about software should make their own hardware. You know? Alan said this 30 years ago. And this is how we feel about it. And so we're bringing breakthrough software to a mobile device for the first time. It's five years ahead of anything on any other phone. The second thing we're doing is we're learning from the iPod. Syncing with iTunes. You know, we're going to ship our hundred millionth iPod this year. 
And that's tens of millions of people that know how to sync these devices with their PCs or Mac and sync all of their media right on to their iPod. Right? So you just drop your iPod in and it automatically syncs. You're going to do the same thing with iPhone. It automatically syncs to your PC or Mac right through iTunes. And iTunes is going to sync all your media onto your iPhone. Your music, your audiobooks, podcasts, movies, TV shows, music videos. But it also syncs a ton of data. Your contacts, your calendars, and your photos, which you can get on your iPod today, your notes, your bookmarks from your web browser, your email accounts, your whole email setup, all that stuff can be moved over to iPhone completely automatically. It's really nice. And we do it through iTunes. Again, you go to iTunes and you set it up, just like you'd set up an iPod or an Apple TV. And you set up what you want synced to your iPhone. And it's just like an iPod. Charge and sync. So sync with iTunes. Third thing I want to talk about a little is design. We've designed something wonderful for your hand. Just wonderful. And this is what it looks like. It's got a three and a half inch screen. It's really big. And it's the highest resolution screen we've ever shipped. It's 160 pixels per inch. Highest we've ever shipped. It's gorgeous. And on the front, there's only one button down there. We call it the home button. It takes you home from wherever you are. And that's it. Let's take a look at the side. It's really thin. It's thinner than any smartphone out there at 11.6 millimeters. Thinner than the Q, thinner than the Blackjack, thinner than all of them. It's really nice. And we've got some controls on the side. We've got a little switch for ring and silent. We've got a volume up and down control. Let's look at the back. On the back, the biggest thing of note is we've got a two megapixel camera built right in. The other side, and we're back on the front. So let's take a look at the top now. We've got a headset jack. Three and a half millimeter. All your iPod headphones fit right in. We've got a place, a little tray for your SIM card. And we've got one switch for sleep and wake. Just push it to go to sleep, push it to wake up. Let's take a look at the bottom. We've got a speaker. We've got a microphone. And we've got our 30-pin iPod connector. So that's the bottom. Now, we've also got some stuff you can't see. We've got three really advanced sensors built into this phone. The first one is a proximity sensor. It senses when physical objects get close. So when you bring iPhone up to your ear, take a phone call, it turns off the display, and it turns off the touch sensor instantly. Why do you want to do that? Well, one, to save battery, but two, so you don't get spurious inputs from your face into the touchscreen. It just automatically turns them off. Take it away, boom, it's back on. So it's got a proximity sensor built in. It's got an ambient light sensor as well. We sense the ambient lighting conditions and adjust the brightness of the display to match the ambient lighting conditions. Again, better user experience saves power. And the third thing we've got is an accelerometer so that we can tell when you switch from portrait to landscape.
## Segments
**[00:04.11 → 00:08.89]** This is a day I've been looking forward to for two and a half years.
**[00:15.64 → 00:21.86]** Every once in a while, a revolutionary product comes along that changes everything.
**[00:25.66 → 00:33.30]** And Apple has been, well, first of all, one's very fortunate if you get to work on just one of these in your career.
**[00:35.67 → 00:37.01]** Apple's been very fortunate.
**[00:37.81 → 00:41.91]** It's been able to introduce a few of these into the world.
**[00:41.91 → 00:46.87]** In 1984, we introduced the Macintosh.
**[00:47.39 → 00:49.05]** It didn't just change Apple.
**[00:49.53 → 00:51.49]** It changed the whole computer industry.
**[01:03.15 → 01:07.53]** In 2001, we introduced the first iPod.
**[01:09.55 → 01:15.37]** And it didn't just change the way we all listen to music.
**[01:15.65 → 01:17.85]** It changed the entire music industry.
**[01:20.00 → 01:22.68]** Well, today.
**[01:22.78 → 01:23.18]** Today.
**[01:24.68 → 01:29.66]** We're introducing three revolutionary products of this class.
**[01:32.32 → 01:39.27]** The first one is a widescreen iPod with touch controls.
**[01:39.61 → 01:54.70]** The second is a revolutionary mobile phone.
**[02:04.40 → 02:10.24]** And the third is a breakthrough internet communications device.
**[02:14.88 → 02:17.46]** So, three things.
**[02:17.74 → 02:20.56]** A widescreen iPod with touch controls.
**[02:20.56 → 02:22.90]** A revolutionary mobile phone.
**[02:23.04 → 02:26.26]** And a breakthrough internet communications device.
**[02:26.60 → 02:29.55]** An iPod.
**[02:30.33 → 02:31.65]** A phone.
**[02:33.16 → 02:35.24]** And an internet communicator.
**[02:36.10 → 02:37.84]** An iPod.
**[02:38.38 → 02:39.56]** A phone.
**[02:39.88 → 02:49.96]** These are not three separate devices.
**[02:50.54 → 02:52.34]** This is one device.
**[02:52.70 → 03:00.14]** We are calling it iPhone.
**[03:02.07 → 03:02.95]** Today.
**[03:05.06 → 03:05.50]** Today.
**[03:05.50 → 03:08.64]** Today, Apple is going to reinvent the phone.
**[03:10.35 → 03:11.29]** And here it is.
**[03:20.06 → 03:22.86]** Actually, here it is, but we're going to leave it there for now.
**[03:24.36 → 03:33.58]** So, before we get into it, let me talk about a category of things.
**[03:33.68 → 03:36.10]** The most advanced phones are called smart phones.
**[03:37.28 → 03:38.16]** So they say.
**[03:39.24 → 03:43.64]** And they typically combine a phone plus some email capability.
**[03:43.96 → 03:46.68]** Plus, they say it's the internet, sort of the baby internet.
**[03:46.68 → 03:47.82]** Into one device.
**[03:47.96 → 03:50.78]** And they all have these plastic little keyboards on them.
**[03:51.48 → 03:55.46]** And the problem is that they're not so smart.
**[03:55.82 → 03:57.68]** And they're not so easy to use.
**[03:57.74 → 04:04.70]** So, if you kind of make a, you know, business school 101 graph of the smart axis and the easy to use axis.
**[04:05.14 → 04:07.24]** Phones, regular cell phones are kind of right there.
**[04:07.32 → 04:08.38]** They're not so smart.
**[04:08.44 → 04:10.84]** And they're, you know, not so easy to use.
**[04:12.44 → 04:14.72]** But smart phones are definitely a little smarter.
**[04:14.72 → 04:16.80]** But they actually are harder to use.
**[04:17.12 → 04:18.60]** They're really complicated.
**[04:18.86 → 04:22.44]** Just for the basic stuff, people have a hard time figuring out how to use them.
**[04:23.44 → 04:25.82]** Well, we don't want to do either one of these things.
**[04:26.04 → 04:33.46]** What we want to do is make a leapfrog product that is way smarter than any mobile device has ever been.
**[04:33.68 → 04:35.50]** And super easy to use.
**[04:35.64 → 04:37.48]** This is what iPhone is.
**[04:37.82 → 04:38.36]** Okay.
**[04:40.49 → 04:43.09]** So, we're going to reinvent the phone.
**[04:44.45 → 04:46.09]** Now, we're going to start.
**[04:46.35 → 04:52.85]** We're going to start with a revolutionary user interface.
**[04:54.23 → 04:58.55]** Is the result of years of research and development.
**[05:00.19 → 05:03.61]** And, of course, it's an interplay of hardware and software.
**[05:04.67 → 05:07.49]** Now, why do we need a revolutionary user interface?
**[05:07.77 → 05:10.15]** I mean, here's four smart phones, right?
**[05:10.25 → 05:15.09]** Motorola Q, the Blackberry, Palm Treo, Nokia E62, the usual suspects.
**[05:15.41 → 05:18.15]** And what's wrong with their user interfaces?
**[05:18.19 → 05:19.95]** Well, the problem with them.
**[05:20.45 → 05:22.57]** Is really sort of in the bottom 40 there.
**[05:23.05 → 05:24.67]** It's this stuff right here.
**[05:25.69 → 05:30.13]** They all have these keyboards that are there whether you need them or not to be there.
**[05:30.49 → 05:34.41]** And they all have these control buttons that are fixed in plastic.
**[05:34.71 → 05:36.93]** And are the same for every application.
**[05:37.21 → 05:39.93]** Well, every application wants a slightly different user interface.
**[05:40.17 → 05:43.35]** A slightly optimized set of buttons just for it.
**[05:43.81 → 05:46.77]** And what happens if you think of a great idea six months from now?
**[05:46.93 → 05:49.21]** You can't run around and add a button to these things.
**[05:49.21 → 05:49.89]** They're already shipped.
**[05:50.19 → 05:51.27]** So, what do you do?
**[05:51.96 → 05:56.41]** It doesn't work because the buttons and the controls can't change.
**[05:56.67 → 05:58.27]** They can't change for each application.
**[05:58.55 → 06:03.87]** And they can't change down the road if you think of another great idea you want to add to this product.
**[06:04.55 → 06:05.99]** Well, how do you solve this?
**[06:06.43 → 06:06.81]** Hmm.
**[06:07.09 → 06:09.37]** It turns out we have solved it.
**[06:09.43 → 06:12.19]** We solved it in computers 20 years ago.
**[06:12.81 → 06:17.55]** We solved it with a bitmap screen that could display anything we want.
**[06:17.71 → 06:19.11]** Put any user interface.
**[06:19.11 → 06:21.79]** And a pointing device.
**[06:22.23 → 06:23.57]** We solved it with the mouse.
**[06:24.07 → 06:24.45]** Right?
**[06:25.01 → 06:26.13]** We solved this problem.
**[06:26.25 → 06:28.61]** So, how are we going to take this to a mobile device?
**[06:29.35 → 06:34.51]** Well, what we're going to do is get rid of all these buttons and just make a giant screen.
**[06:35.47 → 06:36.57]** A giant screen.
**[06:38.39 → 06:40.39]** Now, how are we going to communicate this?
**[06:40.45 → 06:41.81]** We don't want to carry around a mouse.
**[06:41.95 → 06:42.19]** Right?
**[06:42.31 → 06:43.31]** So, what are we going to do?
**[06:43.91 → 06:44.95]** Oh, a stylus.
**[06:45.03 → 06:45.25]** Right?
**[06:45.65 → 06:46.81]** We're going to use a stylus.
**[06:47.83 → 06:48.23]** No.
**[06:50.35 → 06:51.69]** Who wants a stylus?
**[06:52.69 → 06:55.15]** You have to get them and put them away and you lose them.
**[06:55.27 → 06:55.67]** Yuck.
**[06:56.27 → 06:57.57]** Nobody wants a stylus.
**[06:57.71 → 06:59.11]** So, let's not use a stylus.
**[07:00.19 → 07:02.73]** We're going to use the best pointing device in the world.
**[07:02.79 → 07:05.47]** We're going to use a pointing device that we're all born with.
**[07:05.57 → 07:06.67]** We're born with 10 of them.
**[07:06.71 → 07:07.53]** We're going to use our fingers.
**[07:08.49 → 07:09.95]** We're going to touch this with our fingers.
**[07:10.13 → 07:15.47]** And we have invented a new technology called multi-touch, which is phenomenal.
**[07:16.27 → 07:17.83]** It works like magic.
**[07:19.45 → 07:21.31]** You don't need a stylus.
**[07:22.05 → 07:26.09]** It's far more accurate than any touch display that's ever been shipped.
**[07:26.65 → 07:29.03]** It ignores unintended touches.
**[07:29.15 → 07:30.07]** It's super smart.
**[07:31.13 → 07:33.79]** You can do multi-finger gestures on it.
**[07:34.39 → 07:36.41]** And boy, have we patented it.
**[07:46.21 → 07:53.01]** We've been very lucky to have brought a few revolutionary user interfaces to the market in our time.
**[07:53.73 → 07:54.73]** First was the mouse.
**[07:55.73 → 07:58.55]** The second was the click wheel.
**[07:58.75 → 08:02.11]** And now we're going to bring multi-touch to the market.
**[08:02.61 → 08:09.01]** And each of these revolutionary user interfaces has made possible a revolutionary product.
**[08:09.23 → 08:12.79]** The Mac, the iPod, and now the iPhone.
**[08:13.25 → 08:15.85]** So, a revolutionary user interface.
**[08:16.09 → 08:20.09]** We're going to build on top of that with software.
**[08:20.29 → 08:23.93]** Now, software on mobile phones is like software.
**[08:23.93 → 08:25.55]** It's like baby software.
**[08:26.47 → 08:28.17]** It's not so powerful.
**[08:28.65 → 08:32.01]** And today, we're going to show you a software breakthrough.
**[08:32.27 → 08:37.15]** Software that's at least five years ahead of what's on any other phone.
**[08:37.41 → 08:38.45]** Now, how do we do this?
**[08:38.51 → 08:41.03]** Well, we start with a strong foundation.
**[08:41.79 → 08:43.87]** iPhone runs OS X.
**[08:47.68 → 08:57.76]** Now, why would we want to run such a sophisticated operating system
**[08:57.76 → 08:59.26]** on a mobile device?
**[08:59.46 → 09:01.38]** Well, because it's got everything we need.
**[09:01.94 → 09:03.50]** It's got multi-tasking.
**[09:03.72 → 09:05.18]** It's got the best networking.
**[09:05.52 → 09:07.70]** It already knows how to power manage.
**[09:07.78 → 09:09.96]** We've been doing this on mobile computers for years.
**[09:10.30 → 09:11.74]** It's got awesome security.
**[09:12.06 → 09:13.28]** And to write apps.
**[09:13.72 → 09:17.30]** It's got everything from Coco and the graphics.
**[09:17.38 → 09:19.50]** And it's got core animation built in.
**[09:19.66 → 09:23.58]** And it's got the audio and video that OS X is famous for.
**[09:23.66 → 09:25.10]** It's got all the stuff we want.
**[09:25.10 → 09:27.52]** And it's built right in to iPhone.
**[09:27.76 → 09:32.82]** And that has let us create desktop class applications and networking.
**[09:33.92 → 09:34.46]** Right?
**[09:35.66 → 09:38.78]** Not the crippled stuff that you find on most phones.
**[09:38.94 → 09:41.58]** This is real desktop class applications.
**[09:42.28 → 09:46.74]** Now, you know, one of the pioneers of our industry, Alan Kay,
**[09:46.98 → 09:49.44]** has had a lot of great quotes throughout the years.
**[09:49.60 → 09:55.00]** And I ran across one of them recently that explains how we look at this.
**[09:56.06 → 09:59.18]** Explains why we go about doing things the way we do.
**[09:59.30 → 10:00.72]** Because we love software.
**[10:01.90 → 10:02.96]** And here's the quote.
**[10:03.44 → 10:07.50]** People who are really serious about software should make their own hardware.
**[10:08.32 → 10:08.98]** You know?
**[10:09.46 → 10:11.40]** Alan said this 30 years ago.
**[10:11.96 → 10:13.76]** And this is how we feel about it.
**[10:13.82 → 10:18.64]** And so we're bringing breakthrough software to a mobile device for the first time.
**[10:18.76 → 10:22.14]** It's five years ahead of anything on any other phone.
**[10:22.84 → 10:24.14]** The second thing we're doing
**[10:24.14 → 10:25.84]** is we're learning from the iPod.
**[10:26.24 → 10:27.30]** Syncing with iTunes.
**[10:27.48 → 10:30.76]** You know, we're going to ship our hundred millionth iPod this year.
**[10:31.20 → 10:34.78]** And that's tens of millions of people
**[10:34.78 → 10:38.10]** that know how to sync these devices with their PCs or Mac
**[10:38.10 → 10:41.72]** and sync all of their media right on to their iPod.
**[10:41.82 → 10:42.34]** Right?
**[10:42.46 → 10:44.72]** So you just drop your iPod in
**[10:44.72 → 10:46.38]** and it automatically syncs.
**[10:46.50 → 10:48.68]** You're going to do the same thing with iPhone.
**[10:48.96 → 10:51.20]** It automatically syncs to your PC or Mac
**[10:51.96 → 10:52.74]** right through iTunes.
**[10:52.74 → 10:56.34]** And iTunes is going to sync all your media onto your iPhone.
**[10:56.50 → 11:01.64]** Your music, your audiobooks, podcasts, movies, TV shows, music videos.
**[11:01.92 → 11:03.96]** But it also syncs a ton of data.
**[11:04.64 → 11:06.84]** Your contacts, your calendars, and your photos,
**[11:06.94 → 11:08.28]** which you can get on your iPod today,
**[11:08.44 → 11:11.86]** your notes, your bookmarks from your web browser,
**[11:12.34 → 11:14.68]** your email accounts, your whole email setup,
**[11:14.74 → 11:17.62]** all that stuff can be moved over to iPhone completely automatically.
**[11:18.44 → 11:19.46]** It's really nice.
**[11:19.64 → 11:22.44]** And we do it through iTunes.
**[11:23.76 → 11:26.16]** Again, you go to iTunes and you set it up,
**[11:26.22 → 11:28.46]** just like you'd set up an iPod or an Apple TV.
**[11:28.96 → 11:31.46]** And you set up what you want synced to your iPhone.
**[11:32.30 → 11:34.26]** And it's just like an iPod.
**[11:35.14 → 11:36.30]** Charge and sync.
**[11:36.78 → 11:37.92]** So sync with iTunes.
**[11:40.02 → 11:42.24]** Third thing I want to talk about a little is design.
**[11:43.08 → 11:46.38]** We've designed something wonderful for your hand.
**[11:47.02 → 11:47.86]** Just wonderful.
**[11:48.60 → 11:49.90]** And this is what it looks like.
**[11:52.64 → 11:54.52]** It's got a three and a half inch screen.
**[11:54.52 → 11:55.90]** It's really big.
**[11:56.98 → 12:00.24]** And it's the highest resolution screen we've ever shipped.
**[12:00.38 → 12:02.18]** It's 160 pixels per inch.
**[12:02.82 → 12:04.14]** Highest we've ever shipped.
**[12:04.26 → 12:04.82]** It's gorgeous.
**[12:05.22 → 12:07.78]** And on the front, there's only one button down there.
**[12:07.88 → 12:08.80]** We call it the home button.
**[12:08.90 → 12:10.54]** It takes you home from wherever you are.
**[12:10.70 → 12:11.56]** And that's it.
**[12:12.50 → 12:13.66]** Let's take a look at the side.
**[12:14.02 → 12:15.40]** It's really thin.
**[12:15.60 → 12:20.26]** It's thinner than any smartphone out there at 11.6 millimeters.
**[12:20.42 → 12:22.94]** Thinner than the Q, thinner than the Blackjack,
**[12:23.02 → 12:24.18]** thinner than all of them.
**[12:25.06 → 12:25.92]** It's really nice.
**[12:26.54 → 12:28.28]** And we've got some controls on the side.
**[12:28.40 → 12:29.88]** We've got a little switch for ring and silent.
**[12:29.98 → 12:31.74]** We've got a volume up and down control.
**[12:33.14 → 12:33.96]** Let's look at the back.
**[12:34.76 → 12:36.44]** On the back, the biggest thing of note
**[12:36.44 → 12:38.42]** is we've got a two megapixel camera built right in.
**[12:42.40 → 12:44.12]** The other side, and we're back on the front.
**[12:44.24 → 12:45.46]** So let's take a look at the top now.
**[12:46.34 → 12:48.58]** We've got a headset jack.
**[12:49.18 → 12:50.18]** Three and a half millimeter.
**[12:50.38 → 12:52.02]** All your iPod headphones fit right in.
**[12:53.32 → 12:55.62]** We've got a place, a little tray for your SIM card.
**[12:56.20 → 12:57.44]** And we've got one switch
**[12:57.78 → 12:58.36]** for sleep and wake.
**[12:58.52 → 13:00.64]** Just push it to go to sleep, push it to wake up.
**[13:01.84 → 13:02.86]** Let's take a look at the bottom.
**[13:04.74 → 13:05.82]** We've got a speaker.
**[13:07.12 → 13:08.20]** We've got a microphone.
**[13:08.82 → 13:11.32]** And we've got our 30-pin iPod connector.
**[13:12.40 → 13:13.46]** So that's the bottom.
**[13:14.40 → 13:16.62]** Now, we've also got some stuff you can't see.
**[13:17.32 → 13:20.88]** We've got three really advanced sensors built into this phone.
**[13:21.14 → 13:22.84]** The first one is a proximity sensor.
**[13:23.22 → 13:25.48]** It senses when physical objects get close.
**[13:25.58 → 13:27.62]** So when you bring iPhone up to your ear,
**[13:28.08 → 13:30.82]** take a phone call, it turns off the display,
**[13:31.04 → 13:33.02]** and it turns off the touch sensor instantly.
**[13:33.36 → 13:34.38]** Why do you want to do that?
**[13:34.48 → 13:35.90]** Well, one, to save battery, but two,
**[13:36.04 → 13:38.98]** so you don't get spurious inputs from your face into the touchscreen.
**[13:39.22 → 13:40.40]** It just automatically turns them off.
**[13:40.42 → 13:41.50]** Take it away, boom, it's back on.
**[13:41.82 → 13:43.76]** So it's got a proximity sensor built in.
**[13:43.78 → 13:45.28]** It's got an ambient light sensor as well.
**[13:45.58 → 13:47.42]** We sense the ambient lighting conditions
**[13:47.42 → 13:49.50]** and adjust the brightness of the display
**[13:49.50 → 13:51.04]** to match the ambient lighting conditions.
**[13:51.10 → 13:53.52]** Again, better user experience saves power.
**[13:53.96 → 13:56.58]** And the third thing we've got is an accelerometer
**[13:56.58 → 13:59.88]** so that we can tell when you switch from portrait to landscape.
```
--------------------------------------------------------------------------------
/completion_support.py:
--------------------------------------------------------------------------------
```python
"""
Completion support for MCP servers.
This module implements the argument completion system for the MCP (Model Context Protocol)
servers, enabling interactive, context-aware autocompletion for tool arguments. The completion
system helps users and LLMs efficiently use tools by suggesting valid values for arguments
based on the current context.
Key Components:
- CompletionProvider (abstract): Base class for all completion providers
- StaticCompletionProvider: Provides completions from predefined, static lists
- DynamicCompletionProvider: Generates completions on-demand through callback functions
- CompletionRegistry: Central registry managing providers for different tools
- Utility functions: Helper functions for common completion scenarios (file paths, etc.)
The module supports a flexible, extensible architecture where:
1. Each tool can register its own providers for different arguments
2. Static lists can be used for enumerated options (e.g., formats, modes)
3. Dynamic functions can be used for context-dependent values (files, users, etc.)
4. A fallback provider can handle common arguments across tools
Usage Example:
```python
# Create completion registry
registry = CompletionRegistry()
# Register dynamic provider for a tool
registry.register_provider("database_tool", DynamicCompletionProvider({
"table_name": async_db_tables_function,
"column_name": async_table_columns_function
}))
# Set default provider for common arguments across all tools
registry.set_default_provider(COMMON_COMPLETIONS)
# Later, when handling MCP completion requests:
completions = await registry.get_completions(
tool_name="document_tool",
argument_name="format",
current_value="pd" # User has typed "pd" so far
)
# Returns: {"values": ["pdf"], "hasMore": False, "total": 1}
```
This system integrates with the MCP server to provide real-time completion
suggestions as users type, significantly improving usability and reducing errors.
"""
from typing import Any, Callable, Dict, List, Optional
class CompletionProvider:
"""
Abstract base class defining the interface for argument completion providers.
CompletionProvider serves as the foundation for all completion mechanisms in the MCP
system. It defines a consistent interface that all provider implementations must follow,
ensuring that consumers of completions can work with different providers interchangeably.
To implement a custom completion provider:
1. Subclass CompletionProvider
2. Implement the get_completions() method to return suggestions
3. Implement the supports_argument() method to indicate what arguments your provider handles
The framework includes two standard implementations:
- StaticCompletionProvider: Uses predefined lists of values
- DynamicCompletionProvider: Generates values dynamically via callback functions
Custom implementations might include providers that:
- Query external APIs for suggestions
- Read from databases or other data sources
- Implement complex filtering or ranking logic
- Cache results for performance optimization
Providers should handle any internal errors gracefully and return an empty list rather
than raising exceptions that would disrupt the completion flow.
"""
async def get_completions(self, argument_name: str, current_value: str, **context) -> List[str]:
"""
Get completion suggestions for an argument.
This method is called when the MCP system needs completions for an argument.
It should return relevant suggestions based on the provided information.
Args:
argument_name: Name of the argument being completed (e.g., "file_path", "format")
current_value: Current partial value entered by the user (may be empty)
**context: Additional context that may affect completions, such as:
- tool_name: The name of the tool requesting completions
- Other argument values from the same tool call
- User information, preferences, or permissions
- Environment information
Returns:
List of string suggestions that are valid completions for the current_value.
Return an empty list if no suggestions are available.
Raises:
NotImplementedError: If not implemented by a subclass
"""
raise NotImplementedError("Subclasses must implement get_completions")
def supports_argument(self, argument_name: str) -> bool:
"""
Check if this provider supports completion for a given argument.
This method allows the CompletionRegistry to quickly determine if this
provider can handle completions for a specific argument without having
to call get_completions and risk exceptions or empty results.
Args:
argument_name: Name of the argument to check for support
Returns:
True if this provider can provide completions for the argument,
False otherwise
Raises:
NotImplementedError: If not implemented by a subclass
"""
raise NotImplementedError("Subclasses must implement supports_argument")
class StaticCompletionProvider(CompletionProvider):
"""
Completion provider that returns predefined, static suggestion lists for arguments.
This provider implements a straightforward approach to argument completion using
predefined lists of suggestions for each supported argument name. When queried,
it filters these static lists based on the current user input prefix. This makes it
ideal for arguments with a fixed, known set of possible values like:
- Enumerated options (e.g., file formats, provider names)
- Common settings or modes (e.g., analysis types, priority levels)
- Frequently used values that rarely change
The provider automatically performs case-insensitive prefix matching on the
predefined suggestions when a partial value is provided. For example, if
completions include ["openai", "anthropic"] and the current_value is "open",
only "openai" will be returned.
Usage example:
```python
# Create a provider with predefined completions for common arguments
provider = StaticCompletionProvider({
"format": ["json", "csv", "xml", "yaml"],
"provider": ["openai", "anthropic", "cohere", "azure"],
"priority": ["low", "medium", "high"]
})
# Later, get all format options
completions = await provider.get_completions("format", "") # Returns all formats
# Or get filtered provider options
completions = await provider.get_completions("provider", "co") # Returns ["cohere"]
```
For arguments that require dynamic or context-sensitive completions (like file paths
or current database tables), use DynamicCompletionProvider instead.
"""
def __init__(self, completions: Dict[str, List[str]]):
"""
Initialize with static completion values.
Args:
completions: Dictionary mapping argument names to suggestion lists
"""
self.completions = completions
async def get_completions(self, argument_name: str, current_value: str, **context) -> List[str]:
"""Get completion suggestions from static values."""
if not self.supports_argument(argument_name):
return []
# Filter suggestions based on current value
suggestions = self.completions.get(argument_name, [])
if current_value:
return [s for s in suggestions if s.lower().startswith(current_value.lower())]
return suggestions
def supports_argument(self, argument_name: str) -> bool:
"""Check if static completions exist for this argument."""
return argument_name in self.completions
class DynamicCompletionProvider(CompletionProvider):
"""
Completion provider that generates suggestions dynamically using callback functions.
Unlike the StaticCompletionProvider which uses fixed lists, this provider calls
specialized functions to generate completion suggestions on demand. This approach
is essential for arguments whose valid values:
- Depend on the current system state (e.g., existing files, running processes)
- Vary based on user context or previous selections
- Are too numerous to predefine (e.g., all possible file paths)
- Require external API calls or database queries to determine
The provider maps argument names to async callback functions that are responsible
for generating appropriate suggestions based on the current partial input and context.
Each callback function should accept at least two parameters:
- current_value: The current partial input string
- **context: Additional context information that may be useful for generating completions
Usage example:
```python
# Define completion functions
async def complete_files(current_value, **context):
# Custom logic to find matching files
return ["file1.txt", "file2.txt", "folder/"]
async def complete_users(current_value, **context):
# Query database for matching users
db = context.get("database")
users = await db.query(f"SELECT username FROM users WHERE username LIKE '{current_value}%'")
return [user.username for user in users]
# Create dynamic provider with these functions
provider = DynamicCompletionProvider({
"file_path": complete_files,
"username": complete_users
})
```
Each completion function should handle errors gracefully and return an empty list
rather than raising exceptions. The provider will log errors but won't propagate them
to the MCP completion API response.
"""
def __init__(self, completion_functions: Dict[str, Callable]):
"""
Initialize with dynamic completion functions.
Args:
completion_functions: Dictionary mapping argument names to completion functions.
Each function should be an async function that accepts at least
(current_value: str, **context) and returns a List[str] of suggestions.
"""
self.completion_functions = completion_functions
async def get_completions(self, argument_name: str, current_value: str, **context) -> List[str]:
"""Get completion suggestions by calling appropriate function."""
if not self.supports_argument(argument_name):
return []
# Call the function to get suggestions
func = self.completion_functions.get(argument_name)
if func:
try:
suggestions = await func(current_value, **context)
return suggestions
except Exception as e:
# Log error and return empty list
print(f"Error getting completions for {argument_name}: {str(e)}")
return []
return []
def supports_argument(self, argument_name: str) -> bool:
"""Check if a completion function exists for this argument."""
return argument_name in self.completion_functions
class CompletionRegistry:
"""
Central registry managing completion providers for different tools and arguments.
The CompletionRegistry serves as the core orchestration component of the MCP completion
system, providing a unified interface for registration, management, and access to
completion providers. It implements:
1. Tool-specific provider registration and lookup
2. A fallback mechanism through a default provider
3. Standardized response formatting according to the MCP specification
4. Error handling and graceful degradation
This registry is designed to be the single entry point for all completion requests
in an MCP server. Client code can register different providers for different tools,
set up a default provider for common arguments, and then handle all completion requests
through a single, consistent interface.
Each tool in the system can have its own dedicated completion provider that understands
the specific requirements and valid values for that tool's arguments. When no specific
provider is registered for a tool, the registry falls back to the default provider,
which typically handles common arguments like formats, providers, and models.
Usage workflow:
1. Create a registry during server initialization
2. Register tool-specific providers for specialized tools
3. Set a default provider for common arguments
4. Use get_completions() to handle MCP completion protocol requests
The registry ensures that all responses follow the MCP completion protocol format,
even when errors occur or no completions are available, providing a consistent
experience for clients.
"""
def __init__(self):
"""Initialize the registry."""
self.tool_providers = {} # Map of tool_name -> provider
self.default_provider = None
def register_provider(self, tool_name: str, provider: CompletionProvider):
"""
Register a completion provider for a specific tool.
This method associates a completion provider with a specific tool name in the registry.
When the system receives completion requests for this tool, the registered provider
will be used to generate suggestions for its arguments.
Each tool can have exactly one registered provider. If a provider is already
registered for the specified tool, it will be replaced by the new provider.
This allows for dynamic reconfiguration of completion sources as needed.
The provider can be either a StaticCompletionProvider for fixed option lists,
a DynamicCompletionProvider for context-dependent suggestions, or any custom
implementation of the CompletionProvider interface.
Args:
tool_name: Identifier of the tool to register a provider for. This should match
the tool name used in MCP requests (e.g., "search_documents",
"analyze_image").
provider: The completion provider instance that will handle suggestion requests
for this tool's arguments. Must implement the CompletionProvider interface.
Note:
If a tool requires completions for only some of its arguments, the provider
should still be registered here, and its supports_argument() method should
return False for unsupported arguments.
Example:
```python
# Register provider for search_documents tool
registry.register_provider(
"search_documents",
StaticCompletionProvider({
"source": ["web", "database", "local_files"],
"sort_by": ["relevance", "date", "title"]
})
)
```
"""
self.tool_providers[tool_name] = provider
def set_default_provider(self, provider: CompletionProvider):
"""
Set a default provider to handle arguments for tools without specific providers.
This method establishes a fallback completion provider that is used when:
1. No tool-specific provider is registered for a requested tool
2. A registered provider exists but doesn't support the requested argument
The default provider is typically configured to handle common arguments that
appear across multiple tools, such as:
- Provider names (e.g., "openai", "anthropic", "azure")
- Model identifiers (e.g., "gpt-4o", "claude-3-5-haiku-20241022")
- Common formats (e.g., "json", "csv", "markdown")
- Universal settings (e.g., "temperature", "max_tokens")
Only one default provider can be active at a time. Setting a new default
provider replaces any previously set default.
Args:
provider: The completion provider instance to use as the default fallback
for all tools without specific providers or for arguments not
supported by their specific providers.
Note:
When no default provider is set and no tool-specific provider is found,
the system will return an empty completion result rather than raising
an error.
Example:
```python
# Set default provider for common arguments across all tools
registry.set_default_provider(
StaticCompletionProvider({
"provider": ["openai", "anthropic", "cohere"],
"temperature": ["0.0", "0.5", "0.7", "1.0"],
"format": ["json", "text", "markdown"]
})
)
```
"""
self.default_provider = provider
def get_provider(self, tool_name: str) -> Optional[CompletionProvider]:
"""
Retrieve the appropriate completion provider for a specific tool.
This method implements the provider resolution logic for the registry,
determining which completion provider should handle a given tool's
argument completions. It follows this resolution sequence:
1. Look for a provider specifically registered for the requested tool
2. If no tool-specific provider exists, fall back to the default provider
3. If neither exists, return None
This lookup process encapsulates the registry's fallback mechanism,
allowing tool-specific providers to take precedence while ensuring
that common arguments can still be handled by a default provider.
Args:
tool_name: The identifier of the tool to find a provider for, matching
the name used when registering the provider.
Returns:
CompletionProvider: The appropriate provider to handle completions for the tool.
This will be either:
- The tool's specifically registered provider, if one exists
- The default provider, if no tool-specific provider exists
- None, if no applicable provider is found
Note:
This method is primarily used internally by get_completions(), but can also
be called directly to check provider availability without requesting completions.
"""
return self.tool_providers.get(tool_name, self.default_provider)
async def get_completions(
self,
tool_name: str,
argument_name: str,
current_value: str,
**context
) -> Dict[str, Any]:
"""
Get completion suggestions for a tool argument with standardized response format.
This method serves as the main API endpoint for the MCP completion protocol.
It provides argument suggestions by:
1. Finding the appropriate provider for the requested tool
2. Checking if that provider supports the requested argument
3. Calling the provider's get_completions method if supported
4. Formatting the results according to the MCP specification
If no provider exists for the tool, or the provider doesn't support the
argument, an empty result structure is returned rather than raising an error.
Additionally, any exceptions in the provider's completion logic are caught
and result in an empty response with error logging.
Args:
tool_name: Name of the tool requesting completion suggestions
argument_name: Name of the argument within the tool to provide completions for
current_value: Current value or partial input entered by the user
**context: Additional context that may be useful for generating completions,
such as values of other arguments, user preferences, or environment info
Returns:
Dictionary conforming to the MCP completion protocol with these keys:
- values: List of suggested completion values (strings), limited to 100 items
- hasMore: Boolean indicating if more than 100 suggestions were available
- total: Total number of suggestions actually included in the 'values' list
Example response:
```python
{
"values": ["option1", "option2", "option3"],
"hasMore": False,
"total": 3
}
```
Note:
The response is always structured as a valid MCP completion response, even
when errors occur or no suggestions are available. This ensures clients
always receive a predictable format.
"""
provider = self.get_provider(tool_name)
if not provider or not provider.supports_argument(argument_name):
return {
"values": [],
"hasMore": False,
"total": 0
}
try:
# Get suggestions from provider
suggestions = await provider.get_completions(
argument_name=argument_name,
current_value=current_value,
tool_name=tool_name,
**context
)
# Limit to 100 items as per MCP spec
has_more = len(suggestions) > 100
suggestions = suggestions[:100]
return {
"values": suggestions,
"hasMore": has_more,
"total": len(suggestions)
}
except Exception as e:
# Log error and return empty result
print(f"Error getting completions: {str(e)}")
return {
"values": [],
"hasMore": False,
"total": 0
}
# Example usage for file path completion
async def complete_file_paths(current_value: str, **context) -> List[str]:
"""
Generate filesystem path completion suggestions based on the current input.
This utility function provides intelligent path suggestions for file-related arguments,
making it easier for users to navigate and select files or directories in the filesystem.
It handles various path formats including relative paths, absolute paths, and user home
directory references.
Behavior:
- For empty input: Returns common starting points ("./", "../", "/")
- For partial paths: Performs glob matching to find all matching files and directories
- For directories: Appends a trailing slash to distinguish them from files
- Expands user directory references (e.g., "~/documents" becomes "/home/user/documents")
Path matching is case-sensitive or case-insensitive depending on the underlying filesystem.
On Windows, matching is typically case-insensitive, while on Unix-like systems it's case-sensitive.
The function handles permission errors gracefully - if a directory cannot be accessed due to
permission restrictions, it will be excluded from results without raising an exception.
Args:
current_value: The current path string provided by the user (can be empty, partial, or complete)
**context: Additional context that may influence completions:
- working_directory: Optional alternative directory to use as the base for relative paths
- file_extensions: Optional list of file extensions to filter results (e.g., [".py", ".txt"])
- include_hidden: Optional boolean to include hidden files/directories (default: False)
Returns:
List of path suggestions that match or extend the current_value. Each suggestion is formatted as:
- Regular files: The full path to the file (e.g., "./src/main.py")
- Directories: The full path with a trailing slash (e.g., "./src/utils/")
Examples:
- Input: "" → Output: ["./", "../", "/"]
- Input: "doc" → Output: ["documents/", "docker-compose.yml", "dockerfile"]
- Input: "~/Down" → Output: ["/home/user/Downloads/"]
- Input: "./src/" → Output: ["./src/main.py", "./src/utils/", "./src/tests/"]
Edge Cases:
- Symlinks: Followed to their target with the symlink path in the results
- Special files: Included in results, treated as regular files
- Non-existent paths: Returns partial matches based on the parent directory, if any
- Permission errors: Silently skips directories that cannot be accessed
Notes:
- Suggestions are not truncated here; CompletionRegistry.get_completions caps responses at 100 items per the MCP specification
- Directory suggestions always end with a trailing slash
- The function handles filesystem errors gracefully, returning empty list on access errors
"""
import glob
import os
# Handle empty value
if not current_value:
return ["./", "../", "/"]
# Expand user directory if needed
path = os.path.expanduser(current_value)
# Get the directory to search in
directory = os.path.dirname(path) if os.path.basename(path) else path
if not directory:
directory = "."
# Get matching files/directories
pattern = os.path.join(directory, f"{os.path.basename(path)}*")
matches = glob.glob(pattern)
# Format results
results = []
for match in matches:
if os.path.isdir(match):
results.append(f"{match}/")
else:
results.append(match)
return results
# Example completions for common arguments
"""
Predefined completion provider for common argument types used across multiple tools.
This global provider instance contains standardized suggestion lists for frequently
used arguments in the MCP system. It serves as a convenient default provider that
can be registered with the CompletionRegistry for tools that don't need specialized
completion providers.
The included completion lists cover:
- provider: Common LLM provider names (e.g., "openai", "anthropic")
- model: Popular model identifiers from various providers
- format: Standard data formats for input/output
- source_type: Common data source types for analysis tools
- analysis_type: Standard categories of analysis operations
Usage example:
```python
# Set as the default provider in a registry to handle common arguments
registry = CompletionRegistry()
registry.set_default_provider(COMMON_COMPLETIONS)
# Later, even for tools without specific providers, common arguments will work:
await registry.get_completions(
tool_name="any_tool",
argument_name="provider",
current_value="open" # Will match "openai"
)
```
This provider can be extended with additional arguments by creating a new
StaticCompletionProvider that combines these defaults with tool-specific completions.
"""
COMMON_COMPLETIONS = StaticCompletionProvider({
"provider": ["openai", "anthropic", "gemini", "mistral", "custom"],
"model": [
"gpt-4-turbo", "gpt-4o", "claude-3-5-sonnet", "claude-3-opus",
"gemini-1.5-pro", "gemini-1.5-flash", "mistral-large"
],
"format": ["json", "text", "markdown", "html", "csv"],
"source_type": ["csv", "json", "excel", "database", "api"],
"analysis_type": ["general", "sentiment", "entities", "summary"]
})
```
--------------------------------------------------------------------------------
/ultimate_mcp_server/core/ums_api/ums_services.py:
--------------------------------------------------------------------------------
```python
"""Business logic and service functions for UMS API."""
import json
import math
import sqlite3
from collections import Counter, defaultdict, deque
from datetime import datetime
from pathlib import Path
from threading import Lock
from typing import Any, Dict, List, Optional
from .ums_models import (
MemoryDetail,
PreviewMemory,
CriticalPathAction,
FlameGraphNode,
)
from .ums_database import get_db_connection
# ---------- Utility Functions ----------
def format_file_size(size_bytes: int) -> str:
"""Format file size in human readable format"""
if size_bytes == 0:
return "0 B"
size_names = ["B", "KB", "MB", "GB", "TB"]
i = int(math.floor(math.log(size_bytes, 1024)))
p = math.pow(1024, i)
s = round(size_bytes / p, 2)
return f"{s} {size_names[i]}"
def _dict_depth(d: Dict[str, Any], depth: int = 0) -> int:
if not isinstance(d, dict) or not d:
return depth
return max(_dict_depth(v, depth + 1) for v in d.values())
def _count_values(d: Dict[str, Any]) -> int:
cnt = 0
for v in d.values():
if isinstance(v, dict):
cnt += _count_values(v)
elif isinstance(v, list):
cnt += len(v)
else:
cnt += 1
return cnt
def calculate_state_complexity(state_data: Dict[str, Any]) -> float:
if not state_data:
return 0.0
comp = (
len(state_data) * 5 + _dict_depth(state_data) * 10 + _count_values(state_data) * 0.5
)
return round(min(100.0, comp), 2)
def compute_state_diff(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]:
diff = {"added": {}, "removed": {}, "modified": {}, "magnitude": 0.0}
keys = set(a) | set(b)
changed = 0
for k in keys:
if k not in a:
diff["added"][k] = b[k]
changed += 1
elif k not in b:
diff["removed"][k] = a[k]
changed += 1
elif a[k] != b[k]:
diff["modified"][k] = {"before": a[k], "after": b[k]}
changed += 1
if keys:
diff["magnitude"] = (changed / len(keys)) * 100
return diff
# ---------- Action Monitor Helper Functions ----------
def get_action_status_indicator(status: str, execution_time: float) -> dict:
"""Get status indicator with color and icon for action status"""
indicators = {
"running": {"color": "blue", "icon": "play", "label": "Running"},
"executing": {"color": "blue", "icon": "cpu", "label": "Executing"},
"in_progress": {"color": "orange", "icon": "clock", "label": "In Progress"},
"completed": {"color": "green", "icon": "check", "label": "Completed"},
"failed": {"color": "red", "icon": "x", "label": "Failed"},
"cancelled": {"color": "gray", "icon": "stop", "label": "Cancelled"},
"timeout": {"color": "yellow", "icon": "timer-off", "label": "Timeout"},
}
indicator = indicators.get(
status, {"color": "gray", "icon": "help", "label": "Unknown"}
)
# Add urgency flag for long-running actions
if (
status in ["running", "executing", "in_progress"] and execution_time > 120
): # 2 minutes
indicator["urgency"] = "high"
elif (
status in ["running", "executing", "in_progress"] and execution_time > 60
): # 1 minute
indicator["urgency"] = "medium"
else:
indicator["urgency"] = "low"
return indicator
def categorize_action_performance(execution_time: float, estimated_duration: float) -> str:
"""Categorize action performance based on execution time vs estimate"""
if estimated_duration <= 0:
return "unknown"
ratio = execution_time / estimated_duration
if ratio <= 0.5:
return "excellent"
elif ratio <= 0.8:
return "good"
elif ratio <= 1.2:
return "acceptable"
elif ratio <= 2.0:
return "slow"
else:
return "very_slow"
def get_action_resource_usage(action_id: str) -> dict:
"""Get resource usage for an action (placeholder implementation)"""
# This is a placeholder - in a real implementation, you'd fetch actual metrics
return {"cpu_usage": 0.0, "memory_usage": 0.0, "network_io": 0.0, "disk_io": 0.0}
def estimate_wait_time(position: int, queue: list) -> float:
"""Estimate wait time based on queue position and historical data"""
if position == 0:
return 0.0
# Average action time of 30 seconds (this could be calculated from historical data)
avg_action_time = 30.0
return position * avg_action_time
def get_priority_label(priority: int) -> str:
"""Get human-readable priority label"""
if priority <= 1:
return "Critical"
elif priority <= 3:
return "High"
elif priority <= 5:
return "Normal"
elif priority <= 7:
return "Low"
else:
return "Very Low"
def calculate_action_performance_score(action: dict) -> float:
"""Calculate performance score for a completed action"""
if action["status"] != "completed":
return 0.0
execution_time = action.get("execution_duration", 0)
if execution_time <= 0:
return 100.0
if execution_time <= 5:
return 100.0
elif execution_time <= 15:
return 90.0
elif execution_time <= 30:
return 80.0
elif execution_time <= 60:
return 70.0
elif execution_time <= 120:
return 60.0
else:
return max(50.0, 100.0 - (execution_time / 10))
def calculate_efficiency_rating(execution_time: float, result_size: int) -> str:
"""Calculate efficiency rating based on time and output"""
if execution_time <= 0:
return "unknown"
efficiency_score = result_size / execution_time if execution_time > 0 else 0
if efficiency_score >= 100:
return "excellent"
elif efficiency_score >= 50:
return "good"
elif efficiency_score >= 20:
return "fair"
else:
return "poor"
def calculate_performance_summary(actions: list) -> dict:
"""Calculate performance summary from action history"""
if not actions:
return {
"avg_score": 0.0,
"top_performer": None,
"worst_performer": None,
"efficiency_distribution": {},
}
scores = [a.get("performance_score", 0) for a in actions]
avg_score = sum(scores) / len(scores)
best_action = max(actions, key=lambda a: a.get("performance_score", 0))
worst_action = min(actions, key=lambda a: a.get("performance_score", 0))
efficiency_counts = Counter(a.get("efficiency_rating", "unknown") for a in actions)
return {
"avg_score": round(avg_score, 2),
"top_performer": {
"tool_name": best_action.get("tool_name", ""),
"score": best_action.get("performance_score", 0),
},
"worst_performer": {
"tool_name": worst_action.get("tool_name", ""),
"score": worst_action.get("performance_score", 0),
},
"efficiency_distribution": dict(efficiency_counts),
}
def generate_performance_insights(
overall_stats: dict, tool_stats: list, hourly_metrics: list
) -> list:
"""Generate actionable performance insights"""
insights = []
success_rate = (
overall_stats.get("successful_actions", 0) / overall_stats.get("total_actions", 1)
) * 100
if success_rate < 80:
insights.append(
{
"type": "warning",
"title": "Low Success Rate",
"message": f"Current success rate is {success_rate:.1f}%. Consider investigating failing tools.",
"severity": "high",
}
)
if tool_stats:
slowest_tool = max(tool_stats, key=lambda t: t.get("avg_duration", 0))
if slowest_tool.get("avg_duration", 0) > 60:
insights.append(
{
"type": "info",
"title": "Performance Optimization",
"message": f"{slowest_tool['tool_name']} is taking {slowest_tool['avg_duration']:.1f}s on average. Consider optimization.",
"severity": "medium",
}
)
if hourly_metrics:
peak_hour = max(hourly_metrics, key=lambda h: h.get("action_count", 0))
insights.append(
{
"type": "info",
"title": "Peak Usage",
"message": f"Peak usage occurs at {peak_hour['hour']}:00 with {peak_hour['action_count']} actions.",
"severity": "low",
}
)
return insights
# ---------- Memory Quality Functions ----------
def find_cognitive_patterns(
states: List[Dict[str, Any]], min_length: int, similarity_threshold: float
) -> List[Dict[str, Any]]:
"""Find recurring patterns in cognitive states"""
patterns = []
type_sequences = defaultdict(list)
for state in states:
type_sequences[state["state_type"]].append(state)
for state_type, sequence in type_sequences.items():
if len(sequence) >= min_length * 2:
for length in range(min_length, len(sequence) // 2 + 1):
for start in range(len(sequence) - length * 2 + 1):
subseq1 = sequence[start : start + length]
subseq2 = sequence[start + length : start + length * 2]
similarity = calculate_sequence_similarity(subseq1, subseq2)
if similarity >= similarity_threshold:
patterns.append(
{
"type": f"repeating_{state_type}",
"length": length,
"similarity": similarity,
"occurrences": 2,
"first_occurrence": subseq1[0]["timestamp"],
"pattern_description": f"Repeating {state_type} sequence of {length} states",
}
)
return sorted(patterns, key=lambda p: p["similarity"], reverse=True)
def calculate_sequence_similarity(
seq1: List[Dict[str, Any]], seq2: List[Dict[str, Any]]
) -> float:
"""Calculate similarity between two state sequences"""
if len(seq1) != len(seq2):
return 0.0
total_similarity = 0.0
for s1, s2 in zip(seq1, seq2, strict=False):
state_sim = calculate_single_state_similarity(s1, s2)
total_similarity += state_sim
return total_similarity / len(seq1)
def calculate_single_state_similarity(
state1: Dict[str, Any], state2: Dict[str, Any]
) -> float:
"""Calculate similarity between two individual states"""
data1 = state1.get("state_data", {})
data2 = state2.get("state_data", {})
if not data1 and not data2:
return 1.0
if not data1 or not data2:
return 0.0
keys1 = set(data1.keys())
keys2 = set(data2.keys())
key_similarity = len(keys1 & keys2) / len(keys1 | keys2) if keys1 | keys2 else 1.0
common_keys = keys1 & keys2
value_similarity = 0.0
if common_keys:
matching_values = sum(1 for key in common_keys if data1[key] == data2[key])
value_similarity = matching_values / len(common_keys)
return (key_similarity + value_similarity) / 2
def analyze_state_transitions(states: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Analyze transitions between cognitive states"""
transitions = defaultdict(int)
for i in range(len(states) - 1):
current_type = states[i]["state_type"]
next_type = states[i + 1]["state_type"]
transition = f"{current_type} → {next_type}"
transitions[transition] += 1
sorted_transitions = sorted(transitions.items(), key=lambda x: x[1], reverse=True)
return [
{
"transition": transition,
"count": count,
"percentage": (count / (len(states) - 1)) * 100 if len(states) > 1 else 0,
}
for transition, count in sorted_transitions
]
def detect_cognitive_anomalies(states: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Detect anomalous cognitive states"""
anomalies = []
if len(states) < 3:
return anomalies
complexities = [calculate_state_complexity(s.get("state_data", {})) for s in states]
avg_complexity = sum(complexities) / len(complexities)
std_complexity = (
sum((c - avg_complexity) ** 2 for c in complexities) / len(complexities)
) ** 0.5
for i, state in enumerate(states):
complexity = complexities[i]
z_score = (
(complexity - avg_complexity) / std_complexity if std_complexity > 0 else 0
)
if abs(z_score) > 2:
anomalies.append(
{
"state_id": state["state_id"],
"timestamp": state["timestamp"],
"anomaly_type": "complexity_outlier",
"z_score": z_score,
"description": f"Unusual complexity: {complexity:.1f} (avg: {avg_complexity:.1f})",
"severity": "high" if abs(z_score) > 3 else "medium",
}
)
return anomalies
# ---------- Working Memory System ----------
class WorkingMemorySystem:
"""
Working memory system for managing active memories with focus capabilities.
This system maintains a pool of recent memories with relevance scoring
and focus mode for filtering based on keywords or patterns.
"""
def __init__(self, capacity: int = 100, focus_threshold: float = 0.7):
self.capacity = capacity
self.focus_threshold = focus_threshold
self.memory_pool = deque(maxlen=capacity)
self.focus_mode_enabled = False
self.focus_keywords = []
self.memory_index = {} # memory_id -> memory mapping
self.category_index = defaultdict(list) # category -> [memory_ids]
self.access_counts = defaultdict(int) # memory_id -> access count
self.relevance_scores = {} # memory_id -> relevance score
self.initialized_at = datetime.now()
self.last_optimization = datetime.now()
self.optimization_count = 0
def add_memory(self, memory_id: str, content: str, category: str, importance: float = 5.0):
"""Add a memory to the working pool"""
memory = {
'memory_id': memory_id,
'content': content,
'category': category,
'importance': importance,
'added_at': datetime.now().timestamp(),
'last_accessed': datetime.now().timestamp()
}
# Remove old memory if exists
if memory_id in self.memory_index:
self.remove_memory(memory_id)
# Add to pool
self.memory_pool.append(memory)
self.memory_index[memory_id] = memory
self.category_index[category].append(memory_id)
# Calculate initial relevance
self._calculate_relevance(memory)
def remove_memory(self, memory_id: str):
"""Remove a memory from the working pool"""
if memory_id in self.memory_index:
memory = self.memory_index[memory_id]
self.memory_pool.remove(memory)
del self.memory_index[memory_id]
self.category_index[memory['category']].remove(memory_id)
if memory_id in self.relevance_scores:
del self.relevance_scores[memory_id]
if memory_id in self.access_counts:
del self.access_counts[memory_id]
def access_memory(self, memory_id: str):
"""Record memory access and update relevance"""
if memory_id in self.memory_index:
self.access_counts[memory_id] += 1
self.memory_index[memory_id]['last_accessed'] = datetime.now().timestamp()
self._calculate_relevance(self.memory_index[memory_id])
def set_focus_mode(self, enabled: bool, keywords: List[str] = None):
"""Enable or disable focus mode with optional keywords"""
self.focus_mode_enabled = enabled
self.focus_keywords = keywords or []
# Recalculate relevance for all memories
for memory in self.memory_pool:
self._calculate_relevance(memory)
def _calculate_relevance(self, memory: dict):
"""Calculate relevance score for a memory"""
base_score = memory['importance'] / 10.0 # Normalize to 0-1
# Recency factor
age_hours = (datetime.now().timestamp() - memory['added_at']) / 3600
recency_factor = max(0.1, 1.0 - (age_hours / 24)) # Decay over 24 hours
# Access frequency factor
access_factor = min(1.0, self.access_counts[memory['memory_id']] / 10.0)
# Focus mode factor
focus_factor = 1.0
if self.focus_mode_enabled and self.focus_keywords:
content_lower = memory['content'].lower()
keyword_matches = sum(1 for kw in self.focus_keywords if kw.lower() in content_lower)
focus_factor = min(2.0, 1.0 + (keyword_matches * 0.5))
# Calculate final score
relevance = base_score * recency_factor * (0.5 + 0.5 * access_factor) * focus_factor
self.relevance_scores[memory['memory_id']] = min(1.0, relevance)
def get_active_memories(self, limit: int = None) -> List[dict]:
"""Get active memories sorted by relevance"""
memories = list(self.memory_pool)
# Filter by focus threshold if in focus mode
if self.focus_mode_enabled:
memories = [m for m in memories if self.relevance_scores.get(m['memory_id'], 0) >= self.focus_threshold]
# Sort by relevance
memories.sort(key=lambda m: self.relevance_scores.get(m['memory_id'], 0), reverse=True)
if limit:
memories = memories[:limit]
return memories
def get_statistics(self) -> dict:
"""Get working memory statistics"""
active_memories = self.get_active_memories()
# Category distribution
category_dist = {}
for category, memory_ids in self.category_index.items():
category_dist[category] = len(memory_ids)
# Calculate average relevance
relevance_values = list(self.relevance_scores.values())
avg_relevance = sum(relevance_values) / len(relevance_values) if relevance_values else 0
return {
'total_memories': len(self.memory_pool),
'active_memories': len(active_memories),
'capacity_used': len(self.memory_pool) / self.capacity * 100,
'avg_relevance_score': avg_relevance,
'category_distribution': category_dist,
'total_accesses': sum(self.access_counts.values()),
'optimization_suggestions': self._get_optimization_suggestions()
}
def _get_optimization_suggestions(self) -> int:
"""Count optimization suggestions"""
suggestions = 0
# Check for low relevance memories
low_relevance = sum(1 for score in self.relevance_scores.values() if score < 0.3)
if low_relevance > self.capacity * 0.2: # More than 20% low relevance
suggestions += 1
# Check for stale memories
now = datetime.now().timestamp()
stale_memories = sum(1 for m in self.memory_pool if (now - m['last_accessed']) > 3600) # 1 hour
if stale_memories > self.capacity * 0.3: # More than 30% stale
suggestions += 1
# Check for unbalanced categories
if self.category_index:
sizes = [len(ids) for ids in self.category_index.values()]
if max(sizes) > sum(sizes) * 0.5: # One category has more than 50%
suggestions += 1
return suggestions
def optimize(self):
"""Optimize working memory by removing low-relevance memories"""
# Remove memories below threshold
to_remove = [
m['memory_id'] for m in self.memory_pool
if self.relevance_scores.get(m['memory_id'], 0) < 0.2
]
for memory_id in to_remove:
self.remove_memory(memory_id)
self.last_optimization = datetime.now()
self.optimization_count += 1
return len(to_remove)
# Global working memory instance
_working_memory_system = None
_working_memory_lock = Lock()
def get_working_memory_system() -> WorkingMemorySystem:
"""Get or create the global working memory system instance"""
global _working_memory_system
with _working_memory_lock:
if _working_memory_system is None:
_working_memory_system = WorkingMemorySystem()
return _working_memory_system
# ---------- Timeline Analysis Functions ----------
def generate_timeline_segments(
timeline_data: List[Dict[str, Any]], granularity: str, hours: int
) -> List[Dict[str, Any]]:
"""Generate timeline segments summarising state counts / complexity over time."""
if not timeline_data:
return []
start_ts = min(item["timestamp"] for item in timeline_data)
end_ts = max(item["timestamp"] for item in timeline_data)
seg_seconds = 1 if granularity == "second" else 60 if granularity == "minute" else 3600
segments: List[Dict[str, Any]] = []
current = start_ts
while current < end_ts:
seg_end = current + seg_seconds
seg_states = [it for it in timeline_data if current <= it["timestamp"] < seg_end]
if seg_states:
segments.append(
{
"start_time": current,
"end_time": seg_end,
"state_count": len(seg_states),
"avg_complexity": sum(s["complexity_score"] for s in seg_states)
/ len(seg_states),
"max_change_magnitude": max(s["change_magnitude"] for s in seg_states),
"dominant_type": Counter(
s["state_type"] for s in seg_states
).most_common(1)[0][0],
}
)
current = seg_end
return segments
def calculate_timeline_stats(timeline_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Return aggregate stats about timeline complexity / changes."""
if not timeline_data:
return {}
complexities = [it["complexity_score"] for it in timeline_data]
changes = [it["change_magnitude"] for it in timeline_data if it["change_magnitude"] > 0]
stypes = Counter(it["state_type"] for it in timeline_data)
return {
"avg_complexity": sum(complexities) / len(complexities),
"max_complexity": max(complexities),
"avg_change_magnitude": (sum(changes) / len(changes)) if changes else 0,
"max_change_magnitude": max(changes) if changes else 0,
"most_common_type": stypes.most_common(1)[0][0] if stypes else None,
"type_distribution": dict(stypes),
}
# ---------- Flame Graph Functions ----------
def build_flame_graph_structure(actions: List[Dict], workflow_id: str) -> Dict:
"""Build hierarchical flame graph structure from actions"""
total_duration = sum(action.get('duration', 0) for action in actions if action.get('duration'))
flame_graph_data = {
'name': f'Workflow {workflow_id}',
'value': total_duration,
'children': []
}
# Group actions by tool for flame graph hierarchy
tool_groups = {}
for action in actions:
tool_name = action.get('tool_name', 'unknown')
if tool_name not in tool_groups:
tool_groups[tool_name] = []
tool_groups[tool_name].append(action)
# Build hierarchical structure
for tool_name, tool_actions in tool_groups.items():
tool_duration = sum(action.get('duration', 0) for action in tool_actions if action.get('duration'))
tool_node = {
'name': tool_name,
'value': tool_duration,
'children': []
}
# Add individual actions as children
for action in tool_actions:
if action.get('duration'):
action_node = {
'name': f"Action {action['action_id']}",
'value': action['duration'],
'action_id': action['action_id'],
'status': action.get('status'),
'reasoning': action.get('reasoning', ''),
'started_at': action.get('started_at'),
'completed_at': action.get('completed_at')
}
tool_node['children'].append(action_node)
flame_graph_data['children'].append(tool_node)
return flame_graph_data
def calculate_critical_path(actions: List[Dict]) -> List[Dict]:
"""Calculate the critical path through the workflow"""
if not actions:
return []
# Sort actions by start time
sorted_actions = sorted(actions, key=lambda x: x.get('started_at', 0))
critical_path = []
current_time = min(action['started_at'] for action in sorted_actions if action.get('started_at'))
workflow_end = max(action['completed_at'] for action in sorted_actions if action.get('completed_at'))
while current_time < workflow_end:
# Find action that was running at current_time and ends latest
running_actions = [
a for a in sorted_actions
if a.get('started_at', 0) <= current_time and a.get('completed_at', 0) > current_time
]
if running_actions:
# Find the action that ends latest (most critical)
critical_action = max(running_actions, key=lambda x: x.get('completed_at', 0))
if critical_action not in [cp['action_id'] for cp in critical_path]:
critical_path.append({
'action_id': critical_action['action_id'],
'tool_name': critical_action.get('tool_name'),
'duration': critical_action.get('duration', 0),
'start_time': critical_action.get('started_at'),
'end_time': critical_action.get('completed_at')
})
current_time = critical_action.get('completed_at', current_time + 1)
else:
# No action running, find next action start
future_actions = [a for a in sorted_actions if a.get('started_at', 0) > current_time]
if future_actions:
current_time = min(a['started_at'] for a in future_actions)
else:
break
return critical_path
def convert_to_model(node: Dict) -> FlameGraphNode:
"""Convert flame graph dictionary to Pydantic model"""
return FlameGraphNode(
name=node['name'],
value=node['value'],
children=[convert_to_model(child) for child in node.get('children', [])],
action_id=node.get('action_id'),
status=node.get('status'),
reasoning=node.get('reasoning'),
started_at=node.get('started_at'),
completed_at=node.get('completed_at')
)
# ---------- Performance Recommendation Functions ----------
def calculate_tool_reliability_score(tool_stats: dict) -> float:
"""Calculate reliability score for a tool"""
total_calls = tool_stats.get('total_calls', 0)
successful_calls = tool_stats.get('successful_calls', 0)
if total_calls == 0:
return 0.0
success_rate = successful_calls / total_calls
volume_factor = min(1.0, total_calls / 100) # Normalize by 100 calls
return round(success_rate * volume_factor * 100, 2)
def categorize_tool_performance(avg_execution_time: float) -> str:
"""Categorize tool performance based on average execution time"""
if avg_execution_time is None:
return 'unknown'
if avg_execution_time <= 5:
return 'fast'
elif avg_execution_time <= 15:
return 'normal'
elif avg_execution_time <= 30:
return 'slow'
else:
return 'very_slow'
```
--------------------------------------------------------------------------------
/ultimate_mcp_server/services/prompts.py:
--------------------------------------------------------------------------------
```python
"""Prompt template service for managing and rendering prompt templates."""
import asyncio
import json
import os
import threading
from pathlib import Path
from typing import Any, Dict, Optional
from ultimate_mcp_server.config import get_config
from ultimate_mcp_server.exceptions import PromptTemplateError
from ultimate_mcp_server.utils.logging import get_logger
logger = get_logger(__name__)
# Singleton instance
_prompt_service = None
def get_prompt_service():
"""Get the global prompt service instance."""
global _prompt_service
if _prompt_service is None:
_prompt_service = PromptService()
return _prompt_service
class PromptService:
"""
Service for managing, storing, rendering, and versioning prompt templates.
The PromptService provides a centralized system for handling prompt templates
used throughout the MCP application. It manages the entire lifecycle of prompt
templates, from loading them from disk to rendering them with variables for use
with language models. The service provides both persistent storage and runtime
management of templates.
Key Features:
- File-based template storage using both .txt and .json formats
- Runtime template registration and modification
- Variable substitution for dynamic prompt generation
- Categorization of templates for organizational purposes
- Asynchronous persistence to avoid blocking operations
- Error handling and logging for template issues
Template Organization:
Templates are organized using a naming convention where the prefix before the
first underscore represents the category (e.g., "rag_query" belongs to the "rag"
category). This categorization is used when saving templates to disk, with each
category stored in its own JSON file.
File Formats:
- Individual .txt files: One template per file, filename is the template name
- JSON files: Multiple templates in a single file, typically grouped by category
This service employs a singleton pattern, ensuring only one instance exists
across the application. Always use the get_prompt_service() or get_prompt_manager()
functions to access it, rather than instantiating directly.
Usage Example:
```python
# Get the service
prompt_service = get_prompt_service()
# Retrieve a template
template = prompt_service.get_template("rag_query")
# Register a new template
prompt_service.register_template(
"greeting",
"Hello {name}, welcome to {service_name}!"
)
# Render a template with variables
greeting = prompt_service.render_template(
"greeting",
{"name": "Alice", "service_name": "Ultimate MCP"}
)
```
Note:
All file operations are handled with proper error handling to ensure
the service continues functioning even if individual template files
are corrupted or missing.
"""
def __init__(self):
"""Initialize the prompt service.
Args:
templates_dir: Directory containing template files
"""
self.templates: Dict[str, str] = {}
try:
config = get_config()
self.templates_dir = config.prompt_templates_directory
logger.info(f"Initializing PromptService. Looking for templates in: {self.templates_dir}")
self._load_templates()
except Exception as e:
logger.error(f"Failed to initialize PromptService: {e}", exc_info=True)
# Allow service to exist even if loading fails, get_template will raise errors
# Create templates directory if it doesn't exist
os.makedirs(self.templates_dir, exist_ok=True)
# Read templates from files
self._read_templates()
logger.info(f"Prompt service initialized with {len(self.templates)} templates")
def _load_templates(self):
"""
Load prompt templates from individual .txt files in the templates directory.
This method scans the configured templates directory for .txt files and loads
each file as a separate template. It uses the filename (without extension)
as the template name and the file content as the template text. This provides
a simple way to manage templates as individual files, which can be useful for
version control and template organization.
The loading process:
1. Verifies the templates directory exists and is accessible
2. Scans for all .txt files using glob pattern matching
3. For each file:
- Extracts the template name from the filename
- Reads the file content as the template text
- Adds the template to the in-memory template dictionary
- Logs the successful load
4. Handles exceptions for each file individually to prevent a single corrupted
file from blocking all template loading
5. Logs summary information about the loading process
This approach allows templates to be:
- Managed individually in separate files
- Edited directly using text editors
- Organized in a flat structure for simplicity
- Added or removed without changing code
The method is called during service initialization but can be called again
to refresh templates from disk if needed.
Note:
This method only processes .txt files. JSON format templates are handled
by the separate _read_templates method. Both methods work together to
provide a complete template loading solution.
"""
if not Path(self.templates_dir).is_dir():
logger.warning(f"Prompt templates directory not found or not a directory: {self.templates_dir}")
return
loaded_count = 0
for filepath in Path(self.templates_dir).glob('*.txt'):
try:
template_name = filepath.stem # Use filename without extension as name
with open(filepath, 'r', encoding='utf-8') as f:
content = f.read()
self.templates[template_name] = content
logger.debug(f"Loaded prompt template: {template_name}")
loaded_count += 1
except Exception as e:
logger.error(f"Failed to load prompt template {filepath.name}: {e}")
if loaded_count > 0:
logger.info(f"Successfully loaded {loaded_count} prompt templates.")
else:
logger.info("No prompt templates found or loaded.")
def _read_templates(self) -> None:
"""
Load prompt templates from JSON files in the templates directory.
This method complements _load_templates by handling template collections
stored in JSON format. It scans for .json files in the templates directory
and processes each file to extract multiple templates. Each JSON file can
contain multiple templates, organized as a dictionary with template names
as keys and template content as values.
The loading process:
1. Scans the templates directory for all .json files
2. For each JSON file:
- Parses the JSON content
- Extracts each key-value pair as a template name and content
- Handles both simple string templates and structured template objects
- Adds valid templates to the in-memory template collection
3. Logs detailed information about successful and failed loads
Template JSON Format Support:
- Simple format: `{"template_name": "Template content with {variables}"}`
- Structured format: `{"template_name": {"text": "Template content", ...}}`
where the template text is extracted from the "text" field
The JSON format is particularly useful for:
- Storing multiple related templates in a single file
- Organizing templates by category or function
- Including metadata or configuration alongside templates
- Efficiently managing large collections of templates
Error Handling:
- Each JSON file is processed independently, so errors in one file won't
prevent loading from other files
- Invalid template formats trigger warnings but don't halt processing
- JSON parse errors are logged with file information for easier debugging
Note:
This method works in conjunction with _load_templates to provide a
comprehensive template loading system supporting both individual
.txt files and collections in .json files.
"""
try:
template_files = list(Path(self.templates_dir).glob("*.json"))
logger.info(f"Found {len(template_files)} template files")
for template_file in template_files:
try:
with open(template_file, "r", encoding="utf-8") as f:
templates_data = json.load(f)
# Add templates from file
for template_name, template_content in templates_data.items():
if isinstance(template_content, str):
self.templates[template_name] = template_content
elif isinstance(template_content, dict) and "text" in template_content:
self.templates[template_name] = template_content["text"]
else:
logger.warning(f"Invalid template format for {template_name}")
logger.info(f"Loaded templates from {template_file.name}")
except Exception as e:
logger.error(f"Error loading template file {template_file.name}: {str(e)}")
except Exception as e:
logger.error(f"Error reading templates: {str(e)}")
def _save_templates(self) -> None:
"""
Persist all in-memory templates to disk in categorized JSON files.
This method implements the template persistence strategy, organizing templates
by category and saving them to appropriate JSON files on disk. It ensures that
any runtime changes to templates (additions, modifications, or deletions) are
preserved across application restarts.
The saving process:
1. Groups templates into categories based on naming conventions
- Extracts the category from the template name (prefix before first underscore)
- Templates without underscores go to the "general" category
2. For each category:
- Creates or updates a JSON file named "{category}_templates.json"
- Writes all templates in that category as a formatted JSON object
- Uses proper indentation for human readability
3. Logs detailed information about the save operation
Template Categorization:
The method uses a convention-based approach to categorize templates:
- "rag_query" → Category: "rag", saved to "rag_templates.json"
- "chat_system" → Category: "chat", saved to "chat_templates.json"
- "greeting" → Category: "general", saved to "general_templates.json"
This categorization approach:
- Keeps related templates together for easier management
- Avoids a single monolithic file for all templates
- Makes it easier to locate templates by purpose
- Reduces the chance of merge conflicts in version control
Error Handling:
- The entire save operation is wrapped in exception handling to prevent
crashes due to file system issues
- Detailed error information is logged for debugging
- Even if saving fails, the in-memory templates remain intact
Note:
This method is called both directly and asynchronously through
_async_save_templates to provide both immediate and non-blocking
persistence options.
"""
try:
# Group templates by category
categorized_templates: Dict[str, Dict[str, Any]] = {}
for template_name, template_text in self.templates.items():
# Extract category from template name (before first _)
parts = template_name.split("_", 1)
category = parts[0] if len(parts) > 1 else "general"
if category not in categorized_templates:
categorized_templates[category] = {}
categorized_templates[category][template_name] = template_text
# Save each category to its own file
for category, templates in categorized_templates.items():
file_path = Path(self.templates_dir) / f"{category}_templates.json"
with open(file_path, "w", encoding="utf-8") as f:
json.dump(templates, f, indent=2)
logger.info(f"Saved {len(templates)} templates to {file_path.name}")
except Exception as e:
logger.error(f"Error saving templates: {str(e)}")
def get_template(self, template_name: str) -> Optional[str]:
"""
Retrieve a specific prompt template by its name.
This method provides access to individual templates stored in the service.
It performs a simple dictionary lookup, returning the template text if found
or None if the requested template doesn't exist in the collection.
The lookup is exact and case-sensitive, with no fuzzy matching or fallback
behavior. This design ensures predictable template resolution, which is
important for maintaining consistent prompt behavior in production systems.
Args:
template_name: The exact name of the template to retrieve
Returns:
The template text as a string if found, None if not found
Usage Example:
```python
template = prompt_service.get_template("rag_query")
if template:
# Template found, use it
prompt = template.format(query="What is machine learning?")
else:
# Template not found, handle the error
logger.error(f"Template 'rag_query' not found")
prompt = "Default fallback prompt: {query}"
```
Note:
Consider checking the return value for None before using the template,
or use a default template as a fallback to handle missing templates
gracefully.
"""
return self.templates.get(template_name)
def get_all_templates(self) -> Dict[str, str]:
"""
Retrieve a copy of all available prompt templates.
This method returns a dictionary containing all templates currently loaded
in the service, with template names as keys and template texts as values.
The returned dictionary is a shallow copy of the internal templates collection,
ensuring that modifications to the returned dictionary won't affect the
service's template storage.
Use cases for this method include:
- Listing available templates in an admin interface
- Analyzing or processing multiple templates at once
- Creating a template catalog or documentation
- Debugging template availability issues
Returns:
A dictionary mapping template names to their content
Usage Example:
```python
all_templates = prompt_service.get_all_templates()
# Display available templates
print(f"Available templates ({len(all_templates)}): ")
for name in sorted(all_templates.keys()):
print(f" - {name}")
# Find templates by pattern
rag_templates = {
name: content
for name, content in all_templates.items()
if name.startswith("rag_")
}
```
Note:
While the dictionary is a copy, the template strings themselves
are not deep-copied. This is generally not an issue since strings
are immutable in Python.
"""
return self.templates.copy()
def register_template(self, template_name: str, template_text: str) -> bool:
"""
Register a new template or update an existing one in the template collection.
This method adds a new template to the in-memory template collection or updates
an existing template if the name already exists. After adding or updating the
template, it initiates an asynchronous save operation to persist the changes
to disk, ensuring durability without blocking the calling code.
The template registration process:
1. Adds or updates the template in the in-memory dictionary
2. Schedules an asynchronous task to save all templates to disk
3. Returns a success indicator
This method is the primary way to programmatically add or modify templates
at runtime, enabling dynamic template management without requiring file
system access or application restarts.
Template Naming Conventions:
While not enforced, it's recommended to follow these naming conventions:
- Use lowercase names with underscores for readability
- Prefix with category name for organizational purposes (e.g., "rag_query")
- Use descriptive names that indicate the template's purpose
Args:
template_name: Name for the template (used for later retrieval)
template_text: Content of the template with variable placeholders
Returns:
True if the template was successfully registered, False if an error occurred
Usage Example:
```python
# Register a simple greeting template
success = prompt_service.register_template(
"greeting_formal",
"Dear {title} {last_name},\n\nI hope this message finds you well."
)
# Register a more complex template with formatting options
success = prompt_service.register_template(
"invoice_summary",
"Invoice #{invoice_id}\nDate: {date}\nTotal: ${amount:.2f}\n\n{items}"
)
```
Note:
This method handles the persistence automatically through an asynchronous
save operation. The changes are immediately available in memory but may
take a moment to be written to disk.
"""
try:
self.templates[template_name] = template_text
# Schedule template save
asyncio.create_task(self._async_save_templates())
return True
except Exception as e:
logger.error(f"Error registering template {template_name}: {str(e)}")
return False
async def _async_save_templates(self) -> None:
"""
Asynchronously persist templates to disk without blocking the main execution flow.
This method provides a non-blocking way to save templates by delegating to
the synchronous _save_templates method. It's designed to be called from
contexts where immediate persistence is desired but blocking operations
would be problematic, such as during API request handling.
When called:
- The method executes _save_templates directly rather than creating a task
- Despite being async, it doesn't actually perform any async operations
- This approach simplifies the interface while maintaining consistent
method signatures
Usage Context:
This method is typically called after template modifications to ensure
changes are persisted, such as after:
- Registering new templates
- Updating existing templates
- Removing templates
Since saving templates is an I/O-bound operation that involves disk writes,
this async wrapper helps to:
- Prevent UI freezing in interactive contexts
- Avoid blocking the event loop in server contexts
- Return control quickly to the calling code
- Ensure template persistence happens reliably in the background
Note:
While designed for asynchronous usage, this implementation currently
performs blocking I/O. In a future optimization, this could be changed
to use true async file I/O using libraries like aiofiles.
"""
self._save_templates()
def remove_template(self, template_name: str) -> bool:
"""
Remove a template from the collection if it exists.
This method deletes a template from the in-memory template collection
and initiates an asynchronous save operation to persist the deletion to disk.
If the specified template doesn't exist, the method returns False but
doesn't raise an exception, following a fail-soft approach for easier
error handling.
The template removal process:
1. Checks if the template exists in the collection
2. If found, removes it from the in-memory dictionary
3. Schedules an asynchronous task to save the updated template collection
4. Returns a boolean indicating success or failure
This method enables runtime management of templates, allowing obsolete
or incorrect templates to be removed without requiring file system access
or application restarts.
Args:
template_name: Name of the template to remove
Returns:
True if the template was found and removed, False if it wasn't found
Usage Example:
```python
# Check if removal was successful
if prompt_service.remove_template("outdated_template"):
logger.info("Template successfully removed")
else:
logger.warning("Template not found, nothing to remove")
# Unconditional removal attempt (ignoring result)
prompt_service.remove_template("temporary_template")
```
Note:
The template is immediately removed from memory but the disk
persistence happens asynchronously. If the application crashes
immediately after this call, the template might still exist in
the persisted files when the application restarts.
"""
if template_name in self.templates:
del self.templates[template_name]
# Schedule template save
asyncio.create_task(self._async_save_templates())
return True
return False
def render_template(
self,
template_name: str,
variables: Dict[str, Any]
) -> Optional[str]:
"""
Render a prompt template by substituting variables into the template text.
This method performs dynamic template rendering using Python's string formatting
system. It takes a template by name and a dictionary of variables, substitutes
the variables into the template placeholders, and returns the fully rendered text
ready for use with language models or other downstream components.
The rendering process:
1. Retrieves the template by name from the template repository
2. Validates that the template exists
3. Performs variable substitution using Python's str.format() method
4. Handles any errors that occur during substitution
Template Format:
Templates use Python's string formatting syntax with curly braces:
- Simple variables: "Hello, {name}!"
- Nested attributes: "Author: {book.author}"
- Formatting options: "Score: {score:.2f}"
Error Handling:
The method has comprehensive error handling for common issues:
- Missing templates: Returns None with a warning log
- Missing variables: Logs the specific missing variable and returns None
- Format errors: Logs the formatting error and returns None
Variable handling:
- All variables must be provided in the variables dictionary
- Variable types should be compatible with string formatting
- Complex objects can be used if they have string representations
Args:
template_name: Name of the template to render
variables: Dictionary mapping variable names to their values
Returns:
Rendered template text with variables substituted, or None if rendering fails
Example:
```python
# Define a template
service.register_template(
"user_profile",
"Name: {name}\nAge: {age}\nRole: {role}"
)
# Render with variables
profile = service.render_template(
"user_profile",
{"name": "Alice", "age": 30, "role": "Administrator"}
)
# Result: "Name: Alice\nAge: 30\nRole: Administrator"
```
"""
template = self.get_template(template_name)
if not template:
logger.warning(f"Template {template_name} not found")
return None
try:
return template.format(**variables)
except KeyError as e:
logger.error(f"Missing variable in template {template_name}: {str(e)}")
return None
except Exception as e:
logger.error(f"Error rendering template {template_name}: {str(e)}")
return None
# Global instance
_prompt_manager_instance = None
_prompt_manager_lock = threading.Lock()
def get_prompt_manager() -> PromptService:
"""
Get or create the global thread-safe singleton PromptService instance.
This function implements a thread-safe singleton pattern for the PromptService,
ensuring that only one instance is created and shared across the entire application,
regardless of which thread accesses it. It uses a mutex lock to prevent race conditions
when multiple threads attempt to create the instance simultaneously.
The singleton pattern ensures all components throughout the application use the same
prompt template repository and caching system, providing consistent behavior across
different threads and request contexts.
Returns:
PromptService: The global singleton PromptService instance.
Example:
```python
# This will always return the same instance, even from different threads
prompt_manager = get_prompt_manager()
template = prompt_manager.render_template("greeting", {"name": "User"})
```
"""
global _prompt_manager_instance
if _prompt_manager_instance is None:
with _prompt_manager_lock:
if _prompt_manager_instance is None:
_prompt_manager_instance = PromptService()
return _prompt_manager_instance
# Example Usage
if __name__ == '__main__':
from ultimate_mcp_server.utils.logging import setup_logging
setup_logging(log_level="DEBUG")
# Create dummy templates dir and file for example
EXAMPLE_TEMPLATES_DIR = Path("./temp_prompt_templates_example")
EXAMPLE_TEMPLATES_DIR.mkdir(exist_ok=True)
(EXAMPLE_TEMPLATES_DIR / "greeting.txt").write_text("Hello, {{name}}! How are you today?")
(EXAMPLE_TEMPLATES_DIR / "summary.txt").write_text("Summarize the following text:\n\n{{text}}")
# Set env var to use this temp dir
os.environ['GATEWAY_PROMPT_TEMPLATES_DIR'] = str(EXAMPLE_TEMPLATES_DIR.resolve())
os.environ['GATEWAY_FORCE_CONFIG_RELOAD'] = 'true' # Force reload
try:
manager = get_prompt_manager()
print(f"Templates directory: {manager.templates_dir}")
print(f"Available templates: {manager.list_templates()}")
greeting_template = manager.get_template('greeting')
print(f"Greeting Template: {greeting_template}")
try:
manager.get_template('non_existent')
except PromptTemplateError as e:
print(f"Caught expected error: {e}")
finally:
# Clean up
import shutil
shutil.rmtree(EXAMPLE_TEMPLATES_DIR)
print(f"Cleaned up {EXAMPLE_TEMPLATES_DIR}")
if 'GATEWAY_PROMPT_TEMPLATES_DIR' in os.environ:
del os.environ['GATEWAY_PROMPT_TEMPLATES_DIR']
if 'GATEWAY_FORCE_CONFIG_RELOAD' in os.environ:
del os.environ['GATEWAY_FORCE_CONFIG_RELOAD']
```
--------------------------------------------------------------------------------
/ultimate_mcp_server/core/tournaments/utils.py:
--------------------------------------------------------------------------------
```python
"""
Utility functions for tournament functionality.
"""
import difflib
import logging
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional
from ultimate_mcp_server.core.models.tournament import (
EvaluatorConfig,
ModelResponseData,
TournamentData,
TournamentStatus,
)
# For file write, if using a tool:
from ultimate_mcp_server.tools.filesystem import write_file
logger = logging.getLogger(__name__)
def create_round_prompt(
tournament: TournamentData,
round_num: int,
previous_round_variant_responses: Dict[
str, ModelResponseData
], # Now takes full ModelResponseData
target_model_variant_id: Optional[str] = None, # For per-variant system prompts etc. (future)
) -> str:
"""Creates the prompt for a specific round."""
if round_num == 0:
return tournament.config.prompt
# --- Build prompt with previous round's responses ---
base_prompt_header = f"""This is Round {round_num} of an iterative refinement process.
Original Problem:
---
{tournament.config.prompt}
---
In the previous round (Round {round_num - 1}), different model variants produced the following outputs.
Your goal is to synthesize the best aspects, address weaknesses, and produce a superior solution.
"""
responses_section = []
for variant_id, resp_data in previous_round_variant_responses.items():
# For code tournaments, use extracted code if available and valid for prompting.
# For text, use full response_text.
content_to_show = ""
if tournament.config.tournament_type == "code":
# Prioritize clean extracted code for next round's prompt
content_to_show = (
resp_data.extracted_code if resp_data.extracted_code else resp_data.response_text
)
if (
not content_to_show or len(content_to_show.strip()) < 10
): # Heuristic for empty/trivial code
content_to_show = resp_data.response_text # Fallback to full text if code is bad
content_to_show = (
f"```python\n{content_to_show.strip()}\n```"
if content_to_show
else "[No valid code extracted]"
)
else: # Text tournament
content_to_show = (
resp_data.response_text if resp_data.response_text else "[No response text]"
)
if resp_data.error:
content_to_show += f"\n[Note: This variant encountered an error: {resp_data.error}]"
# Show overall score if available
score_info = ""
if resp_data.overall_score is not None:
score_info = f" (Overall Score: {resp_data.overall_score:.2f})"
responses_section.append(
f"--- Output from Variant: {variant_id}{score_info} ---\n{content_to_show.strip()}\n"
)
# --- Add type-specific instructions ---
if tournament.config.tournament_type == "code":
instructions = """
Carefully analyze all previous code solutions. Consider correctness, efficiency, readability, robustness, and how well they integrate good ideas.
Produce a NEW, complete Python implementation that is demonstrably better than any single prior solution.
Provide ONLY the Python code block, enclosed in triple backticks (```python ... ```).
Do not include any explanations outside of code comments.
"""
else: # Text tournament
instructions = """
Analyze each previous response based on the original problem. Consider relevance, accuracy, completeness, clarity, conciseness, and style.
Synthesize the best aspects of ALL responses into a single, improved response.
Your new response should be superior to any individual response from the previous round.
You MAY optionally start your response with a brief (1-2 sentences) explanation of your synthesis choices, enclosed in <thinking>...</thinking> tags.
Then, provide the improved text response itself.
"""
final_prompt = f"{base_prompt_header}\n{''.join(responses_section)}\n---Refinement Instructions---\n{instructions}"
return final_prompt.strip()
async def extract_thinking(response_text: str) -> Optional[str]:
"""Extracts <thinking>...</thinking> block. More robust extraction can be added."""
if not response_text:
return None
match = re.search(r"<thinking>(.*?)</thinking>", response_text, re.DOTALL | re.IGNORECASE)
return match.group(1).strip() if match else None
async def save_model_response_content(
tournament_storage_path: Path,
round_num: int,
variant_id: str,
response_text: Optional[str],
extracted_code: Optional[str],
thinking_process: Optional[str],
metrics: Dict[str, Any],
tournament_type: Literal["code", "text"],
) -> Dict[str, Optional[str]]:
"""Saves response text, extracted code, and metadata to files."""
round_dir = tournament_storage_path / f"round_{round_num}"
round_dir.mkdir(parents=True, exist_ok=True)
sanitized_variant_id = re.sub(r"[^a-zA-Z0-9_\-.]", "_", variant_id)
base_filename = f"{sanitized_variant_id}_r{round_num}"
# --- Main Markdown Report File ---
md_content = f"# Response: {variant_id} - Round {round_num}\n\n"
md_content += "## Metrics\n"
for k, v in metrics.items():
if isinstance(v, float):
md_content += f"- **{k.replace('_', ' ').title()}:** {v:.4f}\n"
else:
md_content += f"- **{k.replace('_', ' ').title()}:** {v}\n"
if thinking_process:
md_content += f"\n## Thinking Process\n```\n{thinking_process}\n```\n"
md_content += f"\n## Full Response Text\n```\n{response_text or '[No response text]'}\n```\n"
if tournament_type == "code" and extracted_code:
md_content += f"\n## Extracted Code\n```python\n{extracted_code}\n```\n"
md_file_path = round_dir / f"{base_filename}_report.md"
md_file_path.write_text(md_content, encoding="utf-8")
saved_paths = {"markdown_file": str(md_file_path), "code_file": None}
# --- Save Raw Extracted Code (if any) ---
if tournament_type == "code" and extracted_code:
code_file_path = round_dir / f"{base_filename}.py"
code_file_path.write_text(extracted_code, encoding="utf-8")
saved_paths["code_file"] = str(code_file_path)
logger.debug(f"Saved response artifacts for {variant_id} to {round_dir}")
return saved_paths
def generate_comparison_file_content(tournament: TournamentData, round_num: int) -> Optional[str]:
if round_num < 0 or round_num >= len(tournament.rounds_results):
return None
round_result = tournament.rounds_results[round_num]
if not round_result.responses:
return None
content = f"# Tournament Comparison Report - Round {round_num}\n\n"
content += f"**Tournament:** {tournament.name} (ID: {tournament.tournament_id})\n"
content += f"**Type:** {tournament.config.tournament_type}\n"
content += f"**Generated:** {datetime.now(timezone.utc).isoformat()}\n\n"
content += "## Round Summary & Scores\n"
content += (
"| Variant ID | Overall Score | Key Metrics (e.g., Cost, Latency) | Evaluator Scores |\n"
)
content += (
"|------------|---------------|-----------------------------------|------------------|\n"
)
sorted_responses = sorted(
round_result.responses.items(),
key=lambda item: item[1].overall_score if item[1].overall_score is not None else -1,
reverse=True,
)
for variant_id, resp_data in sorted_responses:
score_str = (
f"{resp_data.overall_score:.2f}" if resp_data.overall_score is not None else "N/A"
)
cost = resp_data.metrics.get("cost", 0.0)
latency = resp_data.metrics.get("latency_ms", "N/A")
key_metrics = f"Cost: ${cost:.4f}, Latency: {latency}ms"
eval_scores_str = (
"; ".join(
[
f"{eval_id}: {s_data.get('score', 'N/A')}"
for eval_id, s_data in resp_data.scores.items()
]
)
if resp_data.scores
else "N/A"
)
content += f"| {variant_id} | {score_str} | {key_metrics} | {eval_scores_str} |\n"
content += "\n"
# --- Add Diffs (Proposal 6) ---
if round_num > 0 and tournament.config.tournament_type == "code":
content += "## Code Diffs from Previous Best (if applicable)\n"
prev_round_best_code = None
# Find best code from previous round (simplistic: first non-error, highest score)
if round_num - 1 >= 0:
prev_round_data = tournament.rounds_results[round_num - 1]
best_prev_resp = max(
filter(
lambda r: r.extracted_code and r.overall_score is not None,
prev_round_data.responses.values(),
),
key=lambda r: r.overall_score,
default=None,
)
if best_prev_resp:
prev_round_best_code = best_prev_resp.extracted_code
current_best_resp = max(
filter(
lambda r: r.extracted_code and r.overall_score is not None,
round_result.responses.values(),
),
key=lambda r: r.overall_score,
default=None,
)
current_best_code = current_best_resp.extracted_code if current_best_resp else None
if prev_round_best_code and current_best_code:
diff = difflib.unified_diff(
prev_round_best_code.splitlines(keepends=True),
current_best_code.splitlines(keepends=True),
fromfile=f"round_{round_num - 1}_best.py",
tofile=f"round_{round_num}_best.py",
lineterm="",
)
content += f"### Diff: Best of Round {round_num - 1} vs Best of Round {round_num}\n"
content += "```diff\n"
content += "".join(diff)
content += "\n```\n\n"
elif current_best_code:
content += "Could not determine previous round's best code for diffing, or this is the first round with code.\n"
# TODO: Add HTML diff for text tournaments if a library is available.
content += "## Detailed Variant Responses\n"
for variant_id, resp_data in sorted_responses:
content += f"### Variant: {variant_id}\n"
content += f"- **Original Model:** {resp_data.model_id_original}\n"
content += (
f"- **Overall Score:** {resp_data.overall_score:.2f}\n"
if resp_data.overall_score is not None
else "- **Overall Score:** N/A\n"
)
content += "#### Metrics:\n"
for k, v in resp_data.metrics.items():
content += f" - {k}: {v}\n"
content += "#### Evaluator Scores:\n"
if resp_data.scores:
for eval_id, s_data in resp_data.scores.items():
content += f" - **{eval_id}**: Score: {s_data.get('score', 'N/A')}\n - Details: {s_data.get('details', 'N/A')[:200]}...\n" # Truncate details
else:
content += " - No scores available.\n"
if resp_data.thinking_process:
content += f"#### Thinking Process:\n```\n{resp_data.thinking_process}\n```\n"
content_key = (
"Extracted Code" if tournament.config.tournament_type == "code" else "Response Text"
)
code_lang_hint = "python" if tournament.config.tournament_type == "code" else ""
actual_content = (
resp_data.extracted_code
if tournament.config.tournament_type == "code" and resp_data.extracted_code
else resp_data.response_text
)
content += f"#### {content_key}:\n```{code_lang_hint}\n{actual_content or '[Content not available]'}\n```\n"
if resp_data.response_file_path: # Link to the full report for this variant
# Make path relative to tournament storage root for portability
try:
tournament_root = Path(tournament.storage_path)
relative_path = Path(resp_data.response_file_path).relative_to(
tournament_root.parent
) # one level up for `round_X/file`
content += f"\n[View Full Variant Report](./{relative_path})\n"
except ValueError: # If not relative (e.g. absolute path)
content += f"\n[View Full Variant Report]({resp_data.response_file_path})\n"
content += "\n---\n"
return content
def generate_leaderboard_file_content(tournament: TournamentData, round_num: int) -> Optional[str]:
"""Generates a leaderboard summary for the current round."""
if round_num < 0 or round_num >= len(tournament.rounds_results):
return None
round_result = tournament.rounds_results[round_num]
if not round_result.responses:
return None
content = f"# Leaderboard - Round {round_num}\n\n"
content += f"**Tournament:** {tournament.name}\n"
content += f"**Primary Metric(s):** {', '.join([e.evaluator_id for e in tournament.config.evaluators if e.primary_metric]) or 'Overall Score'}\n\n"
content += "| Rank | Variant ID | Overall Score | Primary Metric Score(s) |\n"
content += "|------|------------|---------------|-------------------------|\n"
# Sort by overall_score, then by primary metric if tied (more complex sorting can be added)
sorted_responses = sorted(
round_result.responses.values(),
key=lambda r: r.overall_score if r.overall_score is not None else -float("inf"),
reverse=True,
)
for i, resp_data in enumerate(sorted_responses):
rank = i + 1
score_str = (
f"{resp_data.overall_score:.2f}" if resp_data.overall_score is not None else "N/A"
)
primary_scores_list = []
for eval_cfg in tournament.config.evaluators:
if eval_cfg.primary_metric and eval_cfg.evaluator_id in resp_data.scores:
primary_scores_list.append(
f"{eval_cfg.evaluator_id}: {resp_data.scores[eval_cfg.evaluator_id].get('score', 'N/A')}"
)
primary_metrics_str = "; ".join(primary_scores_list) or "N/A"
content += (
f"| {rank} | {resp_data.model_id_variant} | {score_str} | {primary_metrics_str} |\n"
)
return content
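# Illustrative sketch (hypothetical data, not emitted by this module): the leaderboard
# rendered above takes a shape like
#
#   | Rank | Variant ID                | Overall Score | Primary Metric Score(s) |
#   |------|---------------------------|---------------|--------------------------|
#   | 1    | openai/gpt-4o-mini/v0     | 0.87          | llm_code_grader: 88      |
#   | 2    | anthropic/claude-haiku/v0 | 0.79          | llm_code_grader: 81      |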
def calculate_weighted_score(
scores: Dict[str, Dict[str, Any]], evaluator_configs: List[EvaluatorConfig]
) -> Optional[float]:
"""Calculates a single weighted overall score from multiple evaluator scores."""
if not scores or not evaluator_configs:
return None
total_score = 0.0
total_weight = 0.0
for eval_cfg in evaluator_configs:
eval_id = eval_cfg.evaluator_id
if eval_id in scores:
score_data = scores[eval_id]
# Assuming 'score' is the primary numerical output of an evaluator
numerical_score = score_data.get("score")
if isinstance(numerical_score, (int, float)):
total_score += numerical_score * eval_cfg.weight
total_weight += eval_cfg.weight
else:
logger.warning(
f"Evaluator '{eval_id}' provided non-numeric score: {numerical_score}"
)
if total_weight == 0:
        # No weighted scores were accumulated; fall back to an unweighted average of any numeric scores
valid_scores = [
s.get("score") for s in scores.values() if isinstance(s.get("score"), (int, float))
]
return sum(valid_scores) / len(valid_scores) if valid_scores else None
return total_score / total_weight
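# Worked example (illustrative values only): with evaluator configs weighted 0.7 and 0.2,
# and scores {"llm_grader": {"score": 80.0}, "syntax_check": {"score": 1.0}}, the result
# is (80.0 * 0.7 + 1.0 * 0.2) / (0.7 + 0.2) ≈ 62.44; if neither evaluator_id matched a
# numeric score, the fallback above would return the plain average instead.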
def update_overall_best_response(tournament: TournamentData):
"""Identifies and updates the tournament's overall best response across all completed rounds."""
current_best_score = -float("inf")
if (
tournament.overall_best_response
and tournament.overall_best_response.overall_score is not None
):
current_best_score = tournament.overall_best_response.overall_score
new_best_found = False
for round_result in tournament.rounds_results:
if round_result.status == TournamentStatus.COMPLETED:
for _, resp_data in round_result.responses.items():
if resp_data.overall_score is not None and not resp_data.error:
if resp_data.overall_score > current_best_score:
tournament.overall_best_response = resp_data
current_best_score = resp_data.overall_score
new_best_found = True
if new_best_found:
logger.info(
f"New overall best response for tournament '{tournament.name}' found: {tournament.overall_best_response.model_id_variant} with score {current_best_score:.2f}"
)
def calculate_code_metrics(code: Optional[str]) -> dict:
"""
Calculates basic metrics about a code string.
"""
if not code:
return {
"code_lines": 0,
"code_size_kb": 0.0,
"function_count": 0,
"class_count": 0,
"import_count": 0,
}
code_lines = code.count("\n") + 1
code_size_bytes = len(code.encode("utf-8"))
code_size_kb = round(code_size_bytes / 1024, 2)
function_count = len(re.findall(r"\bdef\s+\w+", code))
class_count = len(re.findall(r"\bclass\s+\w+", code))
    import_count = len(re.findall(r"^\s*(?:import|from)\s+", code, re.MULTILINE))
return {
"code_lines": code_lines,
"code_size_kb": code_size_kb,
"function_count": function_count,
"class_count": class_count,
"import_count": import_count,
}
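# Example (hypothetical snippet): calculate_code_metrics("import csv\n\ndef parse(row):\n    return row.split(',')\n")
# would report code_lines=5, function_count=1, class_count=0, import_count=1, and a
# code_size_kb of roughly 0.05.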
def generate_comparison_file(tournament: TournamentData, round_num: int) -> Optional[str]:
"""Generate a markdown comparison file for the given round.
Args:
tournament: The tournament data.
round_num: The round number to generate the comparison for.
Returns:
The markdown content string, or None if data is missing.
"""
if round_num < 0 or round_num >= len(tournament.rounds_results):
logger.warning(f"Cannot generate comparison for invalid round {round_num}")
return None
round_result = tournament.rounds_results[round_num]
if not round_result.responses:
logger.warning(f"Cannot generate comparison for round {round_num}, no responses found.")
return None
previous_round = tournament.rounds_results[round_num - 1] if round_num > 0 else None
is_code_tournament = tournament.config.tournament_type == "code"
# Start with a comprehensive header
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
comparison_content = f"# Tournament Comparison - Round {round_num}\n\n"
comparison_content += f"**Generated:** {timestamp}\n"
comparison_content += f"**Tournament ID:** {tournament.tournament_id}\n"
comparison_content += f"**Tournament Name:** {tournament.config.name}\n"
comparison_content += f"**Type:** {tournament.config.tournament_type}\n"
comparison_content += f"**Current Round:** {round_num} of {tournament.config.rounds}\n"
comparison_content += (
f"**Models:** {', '.join(model.model_id for model in tournament.config.models)}\n\n"
)
# Add original prompt section
if round_num == 0:
comparison_content += f"## Original Prompt\n\n```\n{tournament.config.prompt}\n```\n\n"
else:
# For later rounds, show what was provided to the models
comparison_content += f"## Round {round_num} Prompt\n\n"
# Get a sample prompt - all models get the same prompt in a round
sample_prompt = create_round_prompt(tournament, round_num)
comparison_content += f"```\n{sample_prompt[:500]}...\n```\n\n"
# Summarize overall metrics
comparison_content += "## Summary Metrics\n\n"
comparison_content += "| Model | Tokens In | Tokens Out | Cost | Latency (ms) |\n"
comparison_content += "|-------|-----------|------------|------|-------------|\n"
for model_id, response_data in sorted(round_result.responses.items()):
metrics = response_data.metrics
tokens_in = metrics.get("input_tokens", "N/A")
tokens_out = metrics.get("output_tokens", "N/A")
cost = metrics.get("cost", "N/A")
latency = metrics.get("latency_ms", "N/A")
display_model_id = model_id.split(":")[-1] if ":" in model_id else model_id
cost_display = f"${cost:.6f}" if isinstance(cost, (int, float)) else cost
comparison_content += (
f"| {display_model_id} | {tokens_in} | {tokens_out} | {cost_display} | {latency} |\n"
)
comparison_content += "\n## Detailed Model Responses\n\n"
for model_id, response_data in sorted(round_result.responses.items()):
metrics = response_data.metrics
display_model_id = model_id.split(":")[-1] if ":" in model_id else model_id
comparison_content += f"### {display_model_id}\n\n"
# Display detailed metrics as a subsection
comparison_content += "#### Metrics\n\n"
tokens_in = metrics.get("input_tokens", "N/A")
tokens_out = metrics.get("output_tokens", "N/A")
total_tokens = metrics.get("total_tokens", "N/A")
cost = metrics.get("cost", "N/A")
latency = metrics.get("latency_ms", "N/A")
comparison_content += (
f"- **Tokens:** {tokens_in} in, {tokens_out} out, {total_tokens} total\n"
)
if isinstance(cost, (int, float)):
comparison_content += f"- **Cost:** ${cost:.6f}\n"
else:
comparison_content += f"- **Cost:** {cost}\n"
comparison_content += f"- **Latency:** {latency}ms\n"
# Code-specific metrics
if is_code_tournament:
code_lines = metrics.get("code_lines", "N/A")
code_size = metrics.get("code_size_kb", "N/A")
comparison_content += f"- **Code Stats:** {code_lines} lines, {code_size} KB\n"
comparison_content += "\n"
# Display thinking process if available
if response_data.thinking_process:
comparison_content += "#### Thinking Process\n\n"
comparison_content += f"```\n{response_data.thinking_process}\n```\n\n"
# Display response content
if is_code_tournament:
comparison_content += "#### Extracted Code\n\n"
comparison_content += "```python\n"
comparison_content += response_data.extracted_code or "# No code extracted"
comparison_content += "\n```\n\n"
else:
# For text tournaments, display the raw response
comparison_content += "#### Response Text\n\n"
comparison_content += "```\n"
comparison_content += response_data.response_text or "[No response text]"
comparison_content += "\n```\n\n"
# Add link to the full response file
if response_data.response_file_path:
comparison_content += (
f"[View full response file]({response_data.response_file_path})\n\n"
)
# Add a section comparing changes from previous round if this isn't round 0
if previous_round and previous_round.responses:
comparison_content += "## Changes from Previous Round\n\n"
for model_id, response_data in sorted(round_result.responses.items()):
if model_id in previous_round.responses:
display_model_id = model_id.split(":")[-1] if ":" in model_id else model_id
comparison_content += f"### {display_model_id}\n\n"
# Compare metrics
current_metrics = response_data.metrics
previous_metrics = previous_round.responses[model_id].metrics
current_tokens_out = current_metrics.get("output_tokens", 0)
previous_tokens_out = previous_metrics.get("output_tokens", 0)
token_change = (
current_tokens_out - previous_tokens_out
if isinstance(current_tokens_out, (int, float))
and isinstance(previous_tokens_out, (int, float))
else "N/A"
)
comparison_content += f"- **Token Change:** {token_change} tokens\n"
# Note: Here you could add more sophisticated text comparison/diff
comparison_content += "- Review the full responses to see detailed changes\n\n"
return comparison_content.strip()
async def save_model_response(
tournament: TournamentData,
round_num: int,
model_id: str,
response_text: str,
thinking: Optional[str] = None,
timestamp: Optional[str] = None,
) -> str:
"""Save model response to a file using standardized filesystem tools.
Args:
tournament: Tournament data
round_num: Round number
model_id: Model ID that generated this response
response_text: The text response to save
thinking: Optional thinking process from the model
timestamp: Optional timestamp (defaults to current time if not provided)
Returns:
Path to saved response file
"""
if not timestamp:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Get path to tournament storage directory
storage_dir = Path(tournament.storage_path)
round_dir = storage_dir / f"round_{round_num}"
round_dir.mkdir(exist_ok=True)
# Create a safe filename from model ID
safe_model_id = model_id.replace(":", "_").replace("/", "_")
response_file = round_dir / f"{safe_model_id}_response.md"
# Construct the markdown file with basic metadata header
content = f"""# Response from {model_id}
## Metadata
- Tournament: {tournament.name}
- Round: {round_num}
- Model: {model_id}
- Timestamp: {timestamp}
## Response:
{response_text}
"""
# Add thinking process if available
if thinking:
content += f"\n\n## Thinking Process:\n\n{thinking}\n"
# Use the standard filesystem write tool
try:
# Properly use the async write_file tool
result = await write_file(path=str(response_file), content=content)
if not result.get("success", False):
logger.warning(f"Standard write_file tool reported failure: {result.get('error')}")
# Fall back to direct write
with open(response_file, "w", encoding="utf-8") as f:
f.write(content)
except Exception as e:
logger.error(f"Error using standardized file writer: {e}. Using direct file write.")
# Fall back to direct write in case of errors
with open(response_file, "w", encoding="utf-8") as f:
f.write(content)
return str(response_file)
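# Example (hypothetical values): for round_num=1 and model_id="openai:gpt-4o-mini", the
# sanitized filename is "openai_gpt-4o-mini_response.md" and the file lands at
# "<storage_path>/round_1/openai_gpt-4o-mini_response.md".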
def get_round_dir(tournament: TournamentData, round_num: int) -> Path:
"""Get the directory path for a specific tournament round.
Args:
tournament: The tournament data.
round_num: The round number.
Returns:
Path to the round directory.
"""
tournament_dir = Path(tournament.storage_path)
round_dir = tournament_dir / f"round_{round_num}"
return round_dir
def get_word_count(text: str) -> int:
"""Get the word count of a text string.
Args:
text: The text to count words in.
Returns:
The number of words.
"""
if not text:
return 0
return len(text.split())
def generate_synthesis_prompt(
tournament: TournamentData, previous_responses: Dict[str, str]
) -> str:
"""Generate the prompt for the synthesis round.
Args:
tournament: The tournament data
previous_responses: A dictionary mapping model IDs to their responses
Returns:
The synthesis prompt for the next round.
"""
    # Letters used to refer to models, to avoid biasing toward specific model names
letters = ["A", "B", "C", "D", "E", "F", "G", "H"]
# Start with a base prompt instructing the model what to do
prompt = f"""# {tournament.name} - Synthesis Round
Your task is to create an improved version based on the responses from multiple models.
Original task:
{tournament.config.prompt}
Below are responses from different models. Review them and create a superior response
that combines the strengths of each model's approach while addressing any weaknesses.
"""
# Add each model's response
for i, (model_id, response) in enumerate(previous_responses.items()):
if i < len(letters):
letter = letters[i]
model_name = model_id.split(":")[-1] if ":" in model_id else model_id
prompt += f"""
## Model {letter} ({model_name}) Response:
{response}
"""
# Add synthesis instructions
prompt += """
# Your Task
Based on the responses above:
1. Create a single, unified response that represents the best synthesis of the information
2. Incorporate the strengths of each model's approach
3. Improve upon any weaknesses or omissions
4. Your response should be more comprehensive, accurate, and well-structured than any individual response
## Thinking Process
Start by briefly analyzing the strengths and weaknesses of each model's response, then explain your synthesis approach.
Example: "I synthesized the structured approach of Model A with the comprehensive detail from Model B, ensuring..."
"""
return prompt
```
--------------------------------------------------------------------------------
/ultimate_mcp_server/utils/async_utils.py:
--------------------------------------------------------------------------------
```python
"""Async utilities for Ultimate MCP Server."""
import asyncio
import functools
import time
from contextlib import asynccontextmanager
from typing import Any, Callable, List, Optional, Type, TypeVar, Union
from ultimate_mcp_server.utils import get_logger
logger = get_logger(__name__)
# Type definitions
T = TypeVar('T')
AsyncCallable = Callable[..., Any]
class RateLimiter:
"""
Rate limiter for controlling request rates to external services.
    This class enforces API rate limits using a sliding window of recent call timestamps,
preventing too many requests in a short period of time. It's designed for use
in asynchronous code and will automatically pause execution when limits are reached.
The rate limiter tracks the timestamps of recent calls and blocks new calls
if they would exceed the configured rate limit. When the limit is reached,
the acquire() method blocks until enough time has passed to allow another call.
This is useful for:
- Respecting API rate limits of external services
- Preventing service overload in high-concurrency applications
- Implementing polite crawling/scraping behavior
- Managing resource access in distributed systems
Usage example:
```python
# Create a rate limiter that allows 5 calls per second
limiter = RateLimiter(max_calls=5, period=1.0)
async def make_api_call():
# This will automatically wait if we're over the limit
await limiter.acquire()
# Now make the actual API call...
return await actual_api_call()
```
"""
def __init__(self, max_calls: int, period: float):
"""
Initialize the rate limiter.
Args:
max_calls: Maximum number of calls allowed within the specified period.
For example, 100 calls per period.
period: Time period in seconds over which the max_calls limit applies.
For example, 60.0 for a per-minute rate limit.
"""
self.max_calls = max_calls
self.period = period
self.calls = []
self.lock = asyncio.Lock()
async def acquire(self):
"""
Acquire permission to make a call, waiting if necessary.
This method blocks until a call is allowed based on the rate limit. When the
limit has been reached, it will sleep until enough time has passed to allow
another call, respecting the configured max_calls within the period.
The method ensures thread-safety through an asyncio lock, making it safe to use
across multiple tasks. It also handles the case where waiting for rate limit
permissions overlaps with multiple concurrent requests.
Returns:
None. When this method returns, the caller is allowed to proceed.
Raises:
asyncio.CancelledError: If the task is cancelled while waiting.
"""
        while True:
            async with self.lock:
                now = time.time()
                # Remove expired timestamps
                self.calls = [t for t in self.calls if now - t < self.period]
                # Check if we're under the limit
                if len(self.calls) < self.max_calls:
                    self.calls.append(now)
                    return
                # Calculate how long until the oldest recorded call expires
                wait_time = self.period - (now - self.calls[0])
            # Sleep outside the lock so other tasks are not blocked, then re-check
            # the window on the next loop iteration.
            if wait_time > 0:
                logger.debug(
                    f"Rate limit reached, waiting {wait_time:.2f}s",
                    emoji_key="warning"
                )
                await asyncio.sleep(wait_time)
@asynccontextmanager
async def timed_context(name: str):
"""
Async context manager for measuring and logging operation duration.
This utility provides a simple way to time asynchronous operations and log their
duration upon completion. It's useful for performance monitoring, debugging,
and identifying bottlenecks in asynchronous code.
The context manager:
1. Records the start time when entering the context
2. Allows the wrapped code to execute
3. Calculates the elapsed time when the context exits
4. Logs the operation name and duration with appropriate formatting
This works with any async code, including API calls, database operations,
file I/O, or computational tasks.
Args:
name: Descriptive name of the operation being timed. This name will appear
in log messages for easy identification.
Yields:
None - This context manager doesn't provide any additional context variables.
Example usage:
```python
async def fetch_user_data(user_id):
async with timed_context("Fetch user data"):
return await database.get_user(user_id)
async def process_document(doc_id):
async with timed_context("Document processing"):
# Multiple operations can be timed together
doc = await fetch_document(doc_id)
results = await analyze_document(doc)
return results
```
"""
start_time = time.time()
try:
yield
finally:
duration = time.time() - start_time
logger.debug(
f"{name} completed in {duration:.3f}s",
emoji_key="time",
time=duration
)
async def gather_with_concurrency(
n: int,
*tasks,
return_exceptions: bool = False
) -> List[Any]:
"""
Run multiple async tasks with a controlled concurrency limit.
This function provides a way to execute multiple asynchronous tasks in parallel
while ensuring that no more than a specified number of tasks run simultaneously.
It's similar to asyncio.gather() but with an added concurrency control mechanism.
This is particularly valuable for:
- Preventing resource exhaustion when processing many tasks
- Respecting service capacity limitations
- Managing memory usage by limiting parallel execution
- Implementing "worker pool" patterns in async code
The function uses a semaphore internally to control the number of concurrently
executing tasks. Tasks beyond the concurrency limit will wait until a running
task completes and releases the semaphore.
Args:
n: Maximum number of tasks to run concurrently. This controls resource usage
and prevents overloading the system or external services.
*tasks: Any number of awaitable coroutine objects to execute.
return_exceptions: If True, exceptions are returned as results rather than being
raised. If False, the first raised exception will propagate.
This matches the behavior of asyncio.gather().
Returns:
List of task results in the same order as the tasks were provided, regardless
of the order in which they completed.
Example usage:
```python
# Process a list of URLs with at most 5 concurrent requests
urls = ["https://example.com/1", "https://example.com/2", ...]
tasks = [fetch_url(url) for url in urls]
results = await gather_with_concurrency(5, *tasks)
# With exception handling
try:
results = await gather_with_concurrency(10, *tasks, return_exceptions=False)
except Exception as e:
# Handle first exception
pass
# Or capture exceptions in results
results = await gather_with_concurrency(10, *tasks, return_exceptions=True)
for result in results:
if isinstance(result, Exception):
# Handle this exception
pass
```
"""
semaphore = asyncio.Semaphore(n)
async def run_task_with_semaphore(task):
async with semaphore:
return await task
return await asyncio.gather(
*(run_task_with_semaphore(task) for task in tasks),
return_exceptions=return_exceptions
)
async def run_with_timeout(
coro: Any,
timeout: float,
default: Optional[T] = None,
log_timeout: bool = True
) -> Union[Any, T]:
"""
Run an async coroutine with a timeout, returning a default value if time expires.
This utility function executes an async operation with a strict time limit and
provides graceful handling of timeouts. If the operation completes within the
specified timeout, its result is returned normally. If the timeout is exceeded,
the specified default value is returned instead, and the operation is cancelled.
This functionality is particularly useful for:
- Making external API calls that might hang or take too long
- Implementing responsive UIs that can't wait indefinitely
- Handling potentially slow operations in time-sensitive contexts
- Creating fallback behavior for unreliable services
The function uses asyncio.wait_for internally and properly handles the
TimeoutError, converting it to a controlled return of the default value.
Args:
coro: The coroutine (awaitable) to execute with a timeout. This can be
any async function call or awaitable object.
timeout: Maximum execution time in seconds before timing out. Must be a
positive number.
default: The value to return if the operation times out. Defaults to None.
This can be any type, and will be returned exactly as provided.
log_timeout: Whether to log a warning message when a timeout occurs. Set
to False to suppress the warning. Default is True.
Returns:
The result of the coroutine if it completes within the timeout period,
otherwise the specified default value.
Raises:
Exception: Any exception raised by the coroutine other than TimeoutError
will be propagated to the caller.
Example usage:
```python
# Basic usage with default fallback
result = await run_with_timeout(
fetch_data_from_api("https://example.com/data"),
timeout=5.0,
default={"status": "timeout", "data": None}
)
# Without logging timeouts
result = await run_with_timeout(
slow_operation(),
timeout=10.0,
default=None,
log_timeout=False
)
# With type checking (using TypeVar)
data: Optional[List[str]] = await run_with_timeout(
get_string_list(),
timeout=3.0,
default=None
)
```
"""
try:
return await asyncio.wait_for(coro, timeout=timeout)
except asyncio.TimeoutError:
if log_timeout:
logger.warning(
f"Operation timed out after {timeout}s",
emoji_key="time",
time=timeout
)
return default
def async_retry(
max_retries: int = 3,
retry_delay: float = 1.0,
backoff_factor: float = 2.0,
retry_exceptions: Optional[List[Type[Exception]]] = None,
max_backoff: Optional[float] = None
):
"""
Decorator for automatically retrying async functions when they raise exceptions.
This decorator implements a configurable exponential backoff retry strategy for
asynchronous functions. When the decorated function raises a specified exception,
the decorator will automatically wait and retry the operation, with an increasing
delay between attempts.
The retry behavior includes:
- A configurable number of maximum retry attempts
- Initial delay between retries
- Exponential backoff (each retry waits longer than the previous one)
- Optional filtering of which exception types trigger retries
- Optional maximum backoff time to cap the exponential growth
- Detailed logging of retry attempts and final failures
This pattern is especially useful for:
- Network operations that may fail temporarily
- API calls subject to rate limiting or intermittent failures
- Database operations that may encounter transient errors
- Any resource access that may be temporarily unavailable
Args:
max_retries: Maximum number of retry attempts after the initial call
(default: 3). Total attempts will be max_retries + 1.
retry_delay: Initial delay between retries in seconds (default: 1.0).
This is the wait time after the first failure.
backoff_factor: Multiplier applied to delay between retries (default: 2.0).
Each retry will wait backoff_factor times longer than the previous.
retry_exceptions: List of exception types that should trigger a retry.
If None (default), all exceptions trigger retries.
max_backoff: Maximum delay between retries in seconds, regardless of the
backoff calculation. None (default) means no maximum.
Returns:
A decorator function that wraps the target async function with retry logic.
Example usage:
```python
# Basic usage - retry any exception up to 3 times
@async_retry()
async def fetch_data(url):
return await make_request(url)
# Custom configuration - retry specific exceptions with longer delays
@async_retry(
max_retries=5,
retry_delay=2.0,
backoff_factor=3.0,
retry_exceptions=[ConnectionError, TimeoutError],
max_backoff=30.0
)
async def send_to_service(data):
return await service.process(data)
```
Note:
The decorated function's signature and return type are preserved, making this
decorator compatible with static type checking.
"""
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
exceptions = []
delay = retry_delay
for attempt in range(max_retries + 1):
try:
return await func(*args, **kwargs)
except Exception as e:
# Check if we should retry this exception
if retry_exceptions and not any(
isinstance(e, exc_type) for exc_type in retry_exceptions
):
raise
exceptions.append(e)
# If this was the last attempt, reraise
if attempt >= max_retries:
if len(exceptions) > 1:
logger.error(
f"Function {func.__name__} failed after {max_retries+1} attempts",
emoji_key="error",
attempts=max_retries+1
)
raise
# Log retry
logger.warning(
f"Retrying {func.__name__} after error: {str(e)} "
f"(attempt {attempt+1}/{max_retries+1})",
emoji_key="warning",
attempt=attempt+1,
max_attempts=max_retries+1,
error=str(e)
)
# Wait before retrying
await asyncio.sleep(delay)
# Increase delay for next retry
delay *= backoff_factor
if max_backoff:
delay = min(delay, max_backoff)
# Shouldn't get here, but just in case
raise exceptions[-1]
return wrapper
return decorator
async def map_async(
func: Callable[[Any], Any],
items: List[Any],
concurrency: int = 10,
chunk_size: Optional[int] = None
) -> List[Any]:
"""Map a function over items with limited concurrency.
This utility provides efficient parallel processing of a list of items while controlling the
maximum number of concurrent operations. It applies the provided async function to each item
in the list, respecting the concurrency limit set by the semaphore.
The function supports two processing modes:
1. Chunked processing: When chunk_size is provided, items are processed in batches to improve
memory efficiency when dealing with large lists.
2. Full parallel processing: When chunk_size is omitted, all items are processed in parallel
but still limited by the concurrency parameter.
Args:
func: Async function to apply to each item. This function should accept a single item
and return a result.
items: List of items to process. Each item will be passed individually to the function.
concurrency: Maximum number of concurrent tasks allowed. This controls the load on system
resources. Default is 10.
chunk_size: Optional batch size for processing. If provided, items are processed in chunks
of this size to limit memory usage. If None, all items are processed at once
(but still constrained by concurrency).
Returns:
List of results from applying the function to each item, in the same order as the input items.
Examples:
```python
# Define an async function to process an item
async def process_item(item):
await asyncio.sleep(0.1) # Simulate I/O or processing time
return item * 2
# Process a list of 100 items with max 5 concurrent tasks
items = list(range(100))
results = await map_async(process_item, items, concurrency=5)
# Process a large list in chunks to manage memory usage
large_list = list(range(10000))
results = await map_async(process_item, large_list, concurrency=10, chunk_size=500)
```
Notes:
- If the items list is empty, an empty list is returned immediately.
- The function preserves the original order of items in the result list.
        - For CPU-bound tasks, consider running the work in a ProcessPoolExecutor via
          loop.run_in_executor instead of this function, as this is optimized for I/O-bound tasks.
"""
if not items:
return []
# Create semaphore for concurrency control
semaphore = asyncio.Semaphore(concurrency)
# Define task function
async def process_item(item):
async with semaphore:
return await func(item)
# If using chunks, process in batches
if chunk_size:
results = []
# Process in chunks for better memory management
for i in range(0, len(items), chunk_size):
chunk = items[i:i+chunk_size]
chunk_results = await asyncio.gather(
*(process_item(item) for item in chunk)
)
results.extend(chunk_results)
return results
else:
# Process all at once with concurrency limit
return await asyncio.gather(
*(process_item(item) for item in items)
)
class AsyncBatchProcessor:
"""
Processor for efficiently batching async operations to optimize throughput.
This class provides a framework for batch processing asynchronous operations,
which is useful for optimizing I/O-bound tasks such as database writes, API calls,
or other operations where batching improves efficiency. It automatically collects
individual items and processes them in batches when:
1. The batch reaches a specified size (controlled by batch_size)
2. A specified time interval elapses (controlled by flush_interval)
3. A manual flush is requested
The processor also controls concurrency, allowing multiple batches to be processed
simultaneously while limiting the maximum number of concurrent operations to prevent
overwhelming system resources or external services.
Common use cases include:
- Batching database inserts or updates for better throughput
- Aggregating API calls to services that support bulk operations
- Optimizing data processing pipelines with chunked operations
- Building high-performance ETL (Extract, Transform, Load) processes
Usage involves extending this class to implement custom batch processing logic
by overriding the _process_batch method with specific implementation details.
This class implements the async context manager protocol, allowing for use in
async with statements to ensure proper resource cleanup.
"""
def __init__(
self,
batch_size: int = 100,
max_concurrency: int = 5,
flush_interval: Optional[float] = None
):
"""
Initialize the batch processor with configuration settings.
Args:
batch_size: Maximum number of items to collect before processing a batch.
Higher values generally improve throughput at the cost of increased
latency and memory usage. Default is 100.
max_concurrency: Maximum number of concurrent batch operations allowed.
This prevents overwhelming external services or system
resources. Default is 5 concurrent batch operations.
flush_interval: Optional automatic flush interval in seconds. When specified,
any collected items will be flushed after this interval,
regardless of whether the batch_size has been reached.
Set to None (default) to disable automatic flushing.
"""
self.batch_size = batch_size
self.max_concurrency = max_concurrency
self.flush_interval = flush_interval
self.items = []
self.flush_task = None
self.semaphore = asyncio.Semaphore(max_concurrency)
async def add(self, item: Any):
"""
Add an item to the current batch for processing.
This method adds the provided item to the internal collection batch.
The item will be processed when either:
1. The batch reaches the configured batch_size
2. The flush_interval elapses (if configured)
3. A manual flush() is called
The method automatically triggers a flush operation if the number of
collected items reaches the configured batch_size.
If a flush_interval is set and no auto-flush task is running, this method
also initializes a background task to automatically flush items after
the specified interval.
Args:
item: The item to add to the batch. Can be any type that your
_process_batch implementation can handle.
Returns:
None
Example:
```python
processor = MyBatchProcessor(batch_size=50)
await processor.add({"id": 1, "value": "data"})
```
"""
self.items.append(item)
# Start flush task if needed
if self.flush_interval and not self.flush_task:
self.flush_task = asyncio.create_task(self._auto_flush())
# Flush if batch is full
if len(self.items) >= self.batch_size:
await self.flush()
async def flush(self) -> List[Any]:
"""
Process all currently batched items immediately.
This method forces processing of all currently collected items, regardless
of whether the batch is full or the flush interval has elapsed. It's useful
when you need to ensure all items are processed without waiting, such as:
- When shutting down the application
- Before a checkpoint or commit operation
- When immediate processing is needed for time-sensitive data
- At the end of a processing cycle
The method handles empty batches gracefully, returning an empty list
when there are no items to process.
Returns:
List of results from processing the batch. The exact content depends
on what the _process_batch implementation returns. Returns an empty
list if there were no items to process.
Example:
```python
# Process any pending items immediately
results = await processor.flush()
```
"""
if not self.items:
return []
# Get current items
items = self.items
self.items = []
# Cancel flush task if running
if self.flush_task:
self.flush_task.cancel()
self.flush_task = None
# Process the batch
return await self._process_batch(items)
async def _auto_flush(self):
"""
Internal background task for automatic periodic flushing.
This method implements the auto-flush functionality that runs periodically
when flush_interval is set. It sleeps for the configured interval and then
triggers a flush operation if any items are pending.
The task continues running until:
1. It's cancelled (typically when a manual flush occurs)
2. The processor is shut down (via __aexit__)
This method is not intended to be called directly but is started automatically
by the add() method when needed and a flush_interval is configured.
Raises:
asyncio.CancelledError: When the task is cancelled. This is caught
internally and used to terminate the loop.
"""
try:
while True:
await asyncio.sleep(self.flush_interval)
if self.items:
await self.flush()
except asyncio.CancelledError:
# Task was cancelled, which is expected
pass
async def _process_batch(self, batch: List[Any]) -> List[Any]:
"""
Process a batch of items (to be overridden by subclasses).
This method should be overridden by subclasses to implement the actual
batch processing logic. The base implementation simply returns the batch
unchanged and logs a warning, as it's not meant to be used directly.
When implementing this method in a subclass, typical patterns include:
- Sending a bulk API request with all batch items
- Executing a batch database operation
- Processing items in parallel with controlled concurrency
- Aggregating items for a combined operation
Args:
batch: List of items to process that were collected via add()
Returns:
List of processed results, where each result corresponds to an item
in the input batch. The actual return type depends on the specific
implementation in the subclass.
Example implementation:
```python
async def _process_batch(self, batch: List[dict]) -> List[dict]:
# Add a batch_id to each item
batch_id = str(uuid.uuid4())
for item in batch:
item['batch_id'] = batch_id
# Send to database in a single operation
results = await self.db.insert_many(batch)
return results
```
"""
# This should be overridden by subclasses
logger.warning(
f"Default batch processing used for {len(batch)} items",
emoji_key="warning"
)
return batch
async def __aenter__(self):
"""
Enter the async context manager.
Allows the batch processor to be used in an async with statement,
which ensures proper cleanup when the context is exited.
Returns:
The batch processor instance, ready for use.
Example:
```python
async with MyBatchProcessor(batch_size=100) as processor:
for item in items:
await processor.add(item)
# All pending items are automatically flushed when the context exits
```
"""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""
Exit the async context manager.
This method is called when exiting an async with block. It ensures
that any pending items are flushed before the context manager completes,
preventing data loss when the processor goes out of scope.
Args:
exc_type: Exception type if an exception was raised in the context
exc_val: Exception value if an exception was raised
exc_tb: Exception traceback if an exception was raised
Returns:
            None (a falsy value), so any exception raised inside the context is propagated.
"""
# Flush any remaining items
if self.items:
await self.flush()
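# Minimal usage sketch (illustrative only; the subclass and function names below are
# hypothetical): override _process_batch, then feed items through the async context
# manager so anything still pending is flushed on exit.
#
#   class LoggingBatchProcessor(AsyncBatchProcessor):
#       async def _process_batch(self, batch):
#           logger.info(f"Processing batch of {len(batch)} items", emoji_key="info")
#           return batch
#
#   async def ingest(items):
#       async with LoggingBatchProcessor(batch_size=10, flush_interval=2.0) as processor:
#           for item in items:
#               await processor.add(item)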
```
--------------------------------------------------------------------------------
/examples/tournament_code_demo.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Tournament Code Demo - Demonstrates running a code improvement tournament
This script shows how to:
1. Create a tournament with multiple models, including diversity and evaluators.
2. Track progress across multiple rounds.
3. Retrieve and analyze the improved code and evaluation scores.
The tournament task is to write and iteratively improve a Python function for
parsing messy CSV data, handling various edge cases.
Usage:
python examples/tournament_code_demo.py [--task TASK] [--rounds N]
Options:
--task TASK Specify a coding task (default: parse_csv)
--rounds N Number of tournament rounds (default: 2)
"""
import argparse
import asyncio
import json
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional # Added List
# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from rich.markup import escape
from rich.panel import Panel
from rich.rule import Rule
from rich.syntax import Syntax # For displaying code
# Add these imports to fix undefined names
from ultimate_mcp_server.core.models.tournament import TournamentStatus
# Assuming Gateway, PromptTemplate, etc. are correctly located
from ultimate_mcp_server.core.server import Gateway
from ultimate_mcp_server.exceptions import ProviderError, ToolError
from ultimate_mcp_server.services.prompts import PromptTemplate # If used
# Import tournament tools for manual registration
from ultimate_mcp_server.tools.tournament import (
create_tournament,
get_tournament_results,
get_tournament_status,
)
from ultimate_mcp_server.utils import (
get_logger,
process_mcp_result,
) # process_mcp_result might need updates
from ultimate_mcp_server.utils.display import ( # Ensure these are updated for new TournamentData structure
CostTracker,
display_tournament_results, # This will need significant updates
display_tournament_status, # This likely needs updates too
)
from ultimate_mcp_server.utils.logging.console import console
# Initialize logger
logger = get_logger("example.tournament_code")
# Global gateway instance (initialized in setup_gateway)
gateway: Optional[Gateway] = None
# --- Configuration ---
DEFAULT_MODEL_CONFIGS: List[Dict[str, Any]] = [
{
"model_id": "openai/gpt-4o-mini", # Example, use your actual model IDs
"diversity_count": 2, # Generate 2 variants from this model
"temperature": 0.7,
},
{
"model_id": "anthropic/claude-3-5-haiku-20241022",
"diversity_count": 1,
"temperature": 0.6,
},
# Add more models as available/desired
# {
# "model_id": "google/gemini-1.5-flash-latest",
# "diversity_count": 1,
# },
]
DEFAULT_NUM_ROUNDS = 2
DEFAULT_TOURNAMENT_NAME = "Advanced Code Improvement Tournament"
# Default Evaluators
DEFAULT_EVALUATORS: List[Dict[str, Any]] = [
{
"evaluator_id": "python_syntax_checker",
"type": "regex_match",
"params": {
"patterns": [r"^\s*def\s+\w+\(.*\):|^\s*class\s+\w+:"], # Basic check for def/class
"target_field": "extracted_code",
"match_mode": "any_can_match",
},
"weight": 0.2,
"primary_metric": False,
},
{
"evaluator_id": "code_length_penalty", # Example: Penalize overly short/long code
"type": "regex_match", # Could be a custom evaluator
"params": {
# This regex means: content has between 5 and 500 lines (approx)
"patterns": [r"^(?:[^\n]*\n){4,499}[^\n]*$"],
"target_field": "extracted_code",
"match_mode": "all_must_match", # Must be within the line range
"regex_flag_options": ["MULTILINE", "DOTALL"],
},
"weight": 0.1,
},
{
"evaluator_id": "llm_code_grader",
"type": "llm_grader",
"params": {
"model_id": "anthropic/claude-3-5-haiku-20241022", # Use a cost-effective grader
"rubric": (
"Evaluate the provided Python code based on the original prompt. "
"Score from 0-100 considering: \n"
"1. Correctness & Robustness (does it likely solve the problem, handle edges?).\n"
"2. Efficiency (algorithmic complexity, resource usage).\n"
"3. Readability & Maintainability (clarity, comments, Pythonic style).\n"
"4. Completeness (are all requirements addressed?).\n"
"Provide a 'Score: XX' line and a brief justification."
),
},
"weight": 0.7, # Main evaluator
"primary_metric": True,
},
]
# The generic code prompt template
TEMPLATE_CODE = """
# GENERIC CODE TOURNAMENT PROMPT TEMPLATE
Write a {{code_type}} that {{task_description}}.
{{context}}
Your solution should:
{% for requirement in requirements %}
{{ loop.index }}. {{requirement}}
{% endfor %}
{% if example_inputs %}
Example inputs:
```
{{example_inputs}}
```
{% endif %}
{% if example_outputs %}
Expected outputs:
```
{{example_outputs}}
```
{% endif %}
Provide ONLY the Python code for your solution, enclosed in triple backticks (```python ... ```).
No explanations before or after the code block, unless they are comments within the code itself.
"""
# Define predefined tasks
TASKS = {
"parse_csv": {
"code_type": "Python function",
"task_description": "parses a CSV string that may use different delimiters and contains various edge cases",
"context": "Your function should be robust enough to handle real-world messy CSV data.",
"requirements": [
"Implement `parse_csv_string(csv_data: str) -> list[dict]`",
"Accept a string `csv_data` which might contain CSV data",
"Automatically detect the delimiter (comma, semicolon, or tab)",
"Handle quoted fields correctly, including escaped quotes within fields",
"Treat the first row as the header",
"Return a list of dictionaries, where each dictionary represents a row",
"Handle errors gracefully by logging warnings and skipping problematic rows",
"Return an empty list if the input is empty or only contains a header",
"Include necessary imports (e.g., `csv`, `io`).",
"Be efficient for moderately large inputs (e.g., up to 1MB).",
],
"example_inputs": """name,age,city\n"Smith, John",42,New York\n"Doe, Jane";39;"Los Angeles, CA"\n"\\"Williams\\", Bob"\t65\t"Chicago" """,
"example_outputs": """[\n {"name": "Smith, John", "age": "42", "city": "New York"},\n {"name": "Doe, Jane", "age": "39", "city": "Los Angeles, CA"},\n {"name": "\\"Williams\\", Bob", "age": "65", "city": "Chicago"}\n]""",
},
# Add other tasks (calculator, string_util) here if needed, similar structure
}
def create_custom_task_variables(task_description_custom: str):
return {
"code_type": "Python function",
"task_description": task_description_custom,
"context": "Ensure your solution is well-documented and handles potential edge cases.",
"requirements": [
"Implement the solution as specified in the task description.",
"Write clean, readable, and efficient Python code.",
"Include type hints and comprehensive docstrings.",
"Handle potential errors gracefully.",
"Make sure all necessary imports are included.",
],
"example_inputs": "# Provide relevant example inputs if applicable",
"example_outputs": "# Provide expected outputs for the examples if applicable",
}
def parse_arguments():
parser = argparse.ArgumentParser(description="Run a code improvement tournament demo")
parser.add_argument(
"--task",
type=str,
default="parse_csv",
choices=list(TASKS.keys()) + ["custom"],
help="Coding task (default: parse_csv)",
)
parser.add_argument(
"--custom-task", type=str, help="Custom coding task description (used when --task=custom)"
)
parser.add_argument(
"--rounds",
type=int,
default=DEFAULT_NUM_ROUNDS,
help=f"Number of tournament rounds (default: {DEFAULT_NUM_ROUNDS})",
)
parser.add_argument(
"--models",
type=str,
nargs="+",
default=[mc["model_id"] for mc in DEFAULT_MODEL_CONFIGS], # Pass only model_id strings
help="List of model IDs to participate (e.g., 'openai/gpt-4o-mini'). Overrides default models.",
)
return parser.parse_args()
async def setup_gateway_for_demo():
global gateway
if gateway:
return
logger.info("Initializing gateway for code tournament demo...", emoji_key="rocket")
# Assuming Gateway constructor and _initialize_providers are async
# The actual Gateway might not need to be created this way if it's a singleton managed by server.py
# For a standalone demo, this direct instantiation is okay.
# For integration, you'd likely get the existing gateway instance.
try:
# This is a simplified setup. In a real server, Gateway might be a singleton.
# Here, we create a new one for the demo.
# Ensure your actual Gateway class can be instantiated and initialized like this.
# from ultimate_mcp_server.core import get_gateway_instance, async_init_gateway
# gateway = get_gateway_instance()
# if not gateway:
# gateway = await async_init_gateway() # This sets the global instance
# For a script, direct instantiation:
gateway = Gateway(
name="code_tournament_demo_gateway", register_tools=False # Changed: register_tools=False, removed load_all_tools
)
# In a script, you might need to manually initialize providers if not done by Gateway constructor
if not gateway.providers: # Check if providers are already initialized
await gateway._initialize_providers()
# Manually register tournament tools
mcp = gateway.mcp
mcp.tool()(create_tournament)
mcp.tool()(get_tournament_status)
mcp.tool()(get_tournament_results)
logger.info("Manually registered tournament tools for the demo.")
except Exception as e:
logger.critical(f"Failed to initialize Gateway: {e}", exc_info=True)
raise
    # Verify tournament tools are registered (they should be, given the manual registration above)
# This check is more for sanity.
mcp_tools = await gateway.mcp.list_tools()
registered_tool_names = [t.name for t in mcp_tools]
required_tournament_tools = [
"create_tournament",
"get_tournament_status",
"get_tournament_results",
]
missing_tools = [
tool for tool in required_tournament_tools if tool not in registered_tool_names
]
if missing_tools:
logger.error(
f"Gateway initialized, but required tournament tools are missing: {missing_tools}",
emoji_key="error",
)
logger.info(f"Available tools: {registered_tool_names}")
raise RuntimeError(f"Required tournament tools not registered: {missing_tools}")
logger.success(
"Gateway for demo initialized and tournament tools verified.", emoji_key="heavy_check_mark"
)
async def poll_tournament_status_enhanced(
tournament_id: str, storage_path: Optional[str] = None, interval: int = 10
) -> Optional[str]:
logger.info(
f"Polling status for tournament {tournament_id} (storage: {storage_path})...",
emoji_key="hourglass",
)
final_states = [
status.value
for status in [
TournamentStatus.COMPLETED,
TournamentStatus.FAILED,
TournamentStatus.CANCELLED,
]
]
while True:
status_input = {"tournament_id": tournament_id}
status_result_raw = await gateway.mcp.call_tool("get_tournament_status", status_input)
# Process MCP result to get the dictionary
status_data_dict = await process_mcp_result(status_result_raw) # Ensure this returns a dict
if "error" in status_data_dict or not status_data_dict.get(
"success", True
): # Check for tool call error
error_message = status_data_dict.get(
"error_message", status_data_dict.get("error", "Unknown error fetching status")
)
if storage_path and "not found" in error_message.lower():
# Fallback to direct file reading
state_file = Path(storage_path) / "tournament_state.json"
logger.debug(f"Tournament not found via API, trying direct read: {state_file}")
if state_file.exists():
try:
direct_state = json.loads(state_file.read_text())
current_status = direct_state.get("status")
# Reconstruct a compatible status_data_dict for display
status_data_dict = {
"tournament_id": tournament_id,
"name": direct_state.get("name"),
"tournament_type": direct_state.get("config", {}).get(
"tournament_type"
),
"status": current_status,
"current_round": direct_state.get("current_round", -1)
+ 1, # Adjust for display
"total_rounds": direct_state.get("config", {}).get("rounds", 0),
"progress_summary": f"Read from file. Round {direct_state.get('current_round', -1) + 1}.",
"created_at": direct_state.get("created_at"),
"updated_at": direct_state.get("updated_at"),
"error_message": direct_state.get("error_message"),
}
logger.info(f"Successfully read direct state from file: {current_status}")
except Exception as e:
logger.error(f"Error reading state file directly: {e}")
# Keep original error message
else:
logger.warning(f"Fallback state file not found: {state_file}")
else: # Non-"not found" error or no storage path
logger.error(
f"Error fetching tournament status: {error_message}", emoji_key="error"
)
return None # Indicate polling error
# Display status using the utility function (ensure it handles the new dict structure)
display_tournament_status(status_data_dict) # Expects a dict
current_status_val = status_data_dict.get("status")
if current_status_val in final_states:
logger.success(
f"Tournament {tournament_id} reached final state: {current_status_val}",
emoji_key="heavy_check_mark",
)
return current_status_val
await asyncio.sleep(interval)
# --- Robust result processing for demo ---
async def robust_process_mcp_result(result_raw, storage_path=None):
from ultimate_mcp_server.utils import process_mcp_result
try:
processed = await process_mcp_result(result_raw)
# If no error, or error is not about JSON, return as is
if not processed.get("error") or "LLM repair" not in processed.get("error", ""):
return processed
except Exception as e:
processed = {"error": f"Exception in process_mcp_result: {e}"}
# Fallback: try to load from file if storage_path is provided
if storage_path:
state_file = Path(storage_path) / "tournament_state.json"
if state_file.exists():
try:
with open(state_file, "r", encoding="utf-8") as f:
return json.load(f)
except Exception as file_e:
return {"error": f"Failed to parse both API and file: {file_e}"}
# Otherwise, return a clear error
return {"error": f"API did not return JSON. Raw: {str(result_raw)[:200]}"}
async def run_code_tournament_demo(tracker: CostTracker, args: argparse.Namespace):
if args.task == "custom":
if not args.custom_task:
console.print(
"[bold red]Error:[/bold red] --custom-task description must be provided when --task=custom."
)
return 1
task_name = "custom_task"
task_vars = create_custom_task_variables(args.custom_task)
task_description_log = args.custom_task
elif args.task in TASKS:
task_name = args.task
task_vars = TASKS[task_name]
task_description_log = task_vars["task_description"]
else: # Should not happen due to argparse choices
console.print(f"[bold red]Error:[/bold red] Unknown task '{args.task}'.")
return 1
console.print(
Rule(
f"[bold blue]{DEFAULT_TOURNAMENT_NAME} - Task: {task_name.replace('_', ' ').title()}[/bold blue]"
)
)
console.print(f"Task Description: [yellow]{escape(task_description_log)}[/yellow]")
# Prepare model_configs based on CLI input or defaults
# The tool now expects list of dicts for model_configs
current_model_configs = []
if args.models == [mc["model_id"] for mc in DEFAULT_MODEL_CONFIGS]: # Default models used
current_model_configs = DEFAULT_MODEL_CONFIGS
console.print(
f"Using default models: [cyan]{', '.join([mc['model_id'] for mc in current_model_configs])}[/cyan]"
)
else: # Custom models from CLI
for model_id_str in args.models:
# Find if this model_id_str matches any default config to get its diversity/temp
# This is a simple way; a more complex CLI could allow full ModelConfig dicts
default_mc = next(
(mc for mc in DEFAULT_MODEL_CONFIGS if mc["model_id"] == model_id_str), None
)
if default_mc:
current_model_configs.append(default_mc)
else: # Model from CLI not in defaults, use basic config
current_model_configs.append({"model_id": model_id_str, "diversity_count": 1})
console.print(f"Using CLI specified models: [cyan]{', '.join(args.models)}[/cyan]")
console.print(f"Rounds: [cyan]{args.rounds}[/cyan]")
console.print(
f"Evaluators: [cyan]{', '.join([e['evaluator_id'] for e in DEFAULT_EVALUATORS])}[/cyan]"
)
code_prompt_template = PromptTemplate(
template=TEMPLATE_CODE,
template_id="demo_code_prompt",
required_vars=["code_type", "task_description", "context", "requirements", "example_inputs", "example_outputs"]
)
try:
initial_prompt = code_prompt_template.render(task_vars)
except Exception as e:
logger.error(f"Failed to render prompt template: {e}", exc_info=True)
return 1
console.print(
Panel(
escape(initial_prompt[:500] + "..."),
title="[bold]Initial Prompt Preview[/bold]",
border_style="dim",
)
)
create_input = {
"name": f"{DEFAULT_TOURNAMENT_NAME} - {task_name.replace('_', ' ').title()}",
"prompt": initial_prompt,
"models": current_model_configs, # This should be List[Dict] for the tool
"rounds": args.rounds,
"tournament_type": "code",
"evaluators": DEFAULT_EVALUATORS, # Pass evaluator configs
# Add other new config params if desired, e.g., extraction_model_id
"extraction_model_id": "anthropic/claude-3-5-haiku-20241022",
"max_retries_per_model_call": 2,
"max_concurrent_model_calls": 3,
}
tournament_id: Optional[str] = None
storage_path: Optional[str] = None
try:
logger.info("Creating code tournament...", emoji_key="gear")
create_result_raw = await gateway.mcp.call_tool("create_tournament", create_input)
create_data = await process_mcp_result(
create_result_raw
) # process_mcp_result must return dict
# Corrected error handling, similar to tournament_text_demo.py
if "error" in create_data:
error_msg = create_data.get("error_message", create_data.get("error", "Unknown error creating tournament"))
logger.error(f"Failed to create tournament: {error_msg}", emoji_key="cross_mark")
console.print(f"[bold red]Error creating tournament:[/bold red] {escape(error_msg)}")
return 1
tournament_id = create_data.get("tournament_id")
storage_path = create_data.get("storage_path") # Get storage_path
if not tournament_id:
logger.error(
"No tournament ID returned from create_tournament call.", emoji_key="cross_mark"
)
console.print("[bold red]Error: No tournament ID returned.[/bold red]")
return 1
console.print(
f"Tournament [bold green]'{create_input['name']}'[/bold green] created successfully!"
)
console.print(f" ID: [yellow]{tournament_id}[/yellow]")
console.print(f" Status: [magenta]{create_data.get('status')}[/magenta]")
if storage_path:
console.print(f" Storage Path: [blue underline]{storage_path}[/blue underline]")
await asyncio.sleep(1) # Brief pause for task scheduling
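# Poll until the tournament reaches a terminal status (COMPLETED, FAILED, or CANCELLED);
# a falsy return value below is treated as a polling failure.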
final_status_val = await poll_tournament_status_enhanced(
tournament_id, storage_path, interval=10
)
if final_status_val == TournamentStatus.COMPLETED.value:
logger.info(
f"Tournament {tournament_id} completed. Fetching final results...",
emoji_key="sports_medal",
)
results_input = {"tournament_id": tournament_id}
results_raw = await gateway.mcp.call_tool("get_tournament_results", results_input)
processed_results_dict = await robust_process_mcp_result(
results_raw, storage_path
)
results_data_dict = processed_results_dict
workaround_applied_successfully = False
# If process_mcp_result itself signals an error
# (This will be true if JSON parsing failed and LLM repair also failed to produce valid JSON)
if "error" in processed_results_dict: # Simpler check for any error from process_mcp_result
original_error_msg = processed_results_dict.get("error", "Unknown error processing results")
logger.warning(
f"Initial processing of 'get_tournament_results' failed with: {original_error_msg}"
)
# Attempt workaround if it's a code tournament, storage path is known,
# AND the initial processing via MCP failed.
current_tournament_type = create_input.get("tournament_type", "unknown")
if current_tournament_type == "code" and storage_path:
logger.info(
f"Applying workaround for 'get_tournament_results' failure. "
f"Attempting to load results directly from storage: {storage_path}"
)
state_file_path = Path(storage_path) / "tournament_state.json"
if state_file_path.exists():
try:
with open(state_file_path, 'r', encoding='utf-8') as f:
results_data_dict = json.load(f) # Override with data from file
logger.success(
f"Workaround successful: Loaded results from {state_file_path}"
)
workaround_applied_successfully = True
except Exception as e:
logger.error(
f"Workaround failed: Could not load or parse {state_file_path}: {e}"
)
# results_data_dict remains processed_results_dict (the error dict from initial processing)
else:
logger.warning(
f"Workaround failed: State file not found at {state_file_path}"
)
# results_data_dict remains processed_results_dict (the error dict from initial processing)
# If this is not a code tournament, no storage path is known, or the workaround failed,
# results_data_dict still holds the original error dict from processed_results_dict.
# Check the final results_data_dict (from the tool call or a successful workaround) for errors.
if "error" in results_data_dict:
# This block will be hit if:
# 1. Original tool call failed AND it wasn't the specific known issue for the workaround.
# 2. Original tool call failed with the known issue, BUT the workaround also failed (e.g., file not found, parse error).
final_error_msg = results_data_dict.get("error_message", results_data_dict.get("error", "Unknown error"))
logger.error(
f"Failed to get tournament results (workaround_applied_successfully={workaround_applied_successfully}): {final_error_msg}",
emoji_key="cross_mark"
)
console.print(f"[bold red]Error fetching results:[/bold red] {escape(final_error_msg)}")
else:
# Successfully got data, either from tool or workaround
if workaround_applied_successfully:
console.print(
"[yellow i](Workaround applied: Results loaded directly from tournament_state.json)[/yellow i]"
)
# Pass the full dictionary results_data_dict to display_tournament_results
display_tournament_results(
results_data_dict, console
) # Ensure this function handles the new structure
# Example of accessing overall best response
overall_best_resp_data = results_data_dict.get("overall_best_response")
if overall_best_resp_data:
console.print(
Rule("[bold green]Overall Best Response Across All Rounds[/bold green]")
)
best_variant_id = overall_best_resp_data.get("model_id_variant", "N/A")
best_score = overall_best_resp_data.get("overall_score", "N/A")
best_score_str = f"{best_score:.2f}" if isinstance(best_score, float) else "N/A"
console.print(
f"Best Variant: [cyan]{best_variant_id}[/cyan] (Score: {best_score_str})"
)
best_code = overall_best_resp_data.get("extracted_code")
if best_code:
console.print(
Panel(
Syntax(best_code, "python", theme="monokai", line_numbers=True),
title=f"Code from {best_variant_id}",
border_style="green",
)
)
else:
console.print(
"[yellow]No extracted code found for the overall best response.[/yellow]"
)
# Try to find and mention the leaderboard file from the last round
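# Round indices are 0-based, so the final round is config["rounds"] - 1.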
last_round_num = results_data_dict.get("config", {}).get("rounds", 0) - 1
if 0 <= last_round_num < len(results_data_dict.get("rounds_results", [])):
last_round_data = results_data_dict["rounds_results"][last_round_num]
leaderboard_file = last_round_data.get("leaderboard_file_path")
if leaderboard_file:
console.print(
f"\nCheck the final leaderboard: [blue underline]{leaderboard_file}[/blue underline]"
)
comparison_file = last_round_data.get("comparison_file_path")
if comparison_file:
console.print(
f"Check the final round comparison: [blue underline]{comparison_file}[/blue underline]"
)
elif final_status_val: # FAILED or CANCELLED
logger.warning(
f"Tournament {tournament_id} ended with status: {final_status_val}",
emoji_key="warning",
)
console.print(
f"[bold yellow]Tournament ended with status: {final_status_val}[/bold yellow]"
)
# Optionally fetch results for FAILED tournaments to see partial data / error
if final_status_val == TournamentStatus.FAILED.value:
results_input = {"tournament_id": tournament_id}
results_raw = await gateway.mcp.call_tool("get_tournament_results", results_input)
results_data_dict = await robust_process_mcp_result(
results_raw, storage_path
)
if results_data_dict and not (
results_data_dict.get("error") or results_data_dict.get("error_message")
): # Check success of get_tournament_results (either error key signals failure)
display_tournament_results(results_data_dict, console) # Display what we have
else: # Polling failed
logger.error(f"Polling failed for tournament {tournament_id}.", emoji_key="cross_mark")
console.print(f"[bold red]Polling failed for tournament {tournament_id}.[/bold red]")
except Exception as e: # Broad catch: covers ToolError, ProviderError, and anything unexpected
logger.error(
f"An error occurred during the code tournament demo: {e}",
exc_info=True,
emoji_key="error",
)
console.print(f"[bold red]Demo Error:[/bold red] {escape(str(e))}")
return 1
finally:
tracker.display_summary(console)
logger.info("Code tournament demo finished.", emoji_key="party_popper")
return 0
async def main_async():
args = parse_arguments()
tracker = CostTracker()
exit_code = 1 # Default to error
try:
await setup_gateway_for_demo()
exit_code = await run_code_tournament_demo(tracker, args)
except Exception as e:
console.print(
f"[bold red]Critical error in demo setup or execution:[/bold red] {escape(str(e))}"
)
logger.critical(f"Demo main_async failed: {e}", exc_info=True)
finally:
# Simplified cleanup, mirroring tournament_text_demo.py.
if gateway:
# No gateway-specific cleanup is required yet. Explicit sandbox shutdown was
# removed because it caused issues, and repl_python is not registered by this
# demo (the gateway is initialized with register_tools=False).
pass
logger.info("Demo finished.")
return exit_code
if __name__ == "__main__":
try:
final_exit_code = asyncio.run(main_async())
except KeyboardInterrupt:
console.print("\n[bold yellow]Demo interrupted by user.[/bold yellow]")
final_exit_code = 130 # Standard exit code for Ctrl+C
sys.exit(final_exit_code)
```