This is page 8 of 35. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?page={x} to view the full context.

# Directory Structure

```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│   ├── __init__.py
│   ├── advanced_agent_flows_using_unified_memory_system_demo.py
│   ├── advanced_extraction_demo.py
│   ├── advanced_unified_memory_system_demo.py
│   ├── advanced_vector_search_demo.py
│   ├── analytics_reporting_demo.py
│   ├── audio_transcription_demo.py
│   ├── basic_completion_demo.py
│   ├── cache_demo.py
│   ├── claude_integration_demo.py
│   ├── compare_synthesize_demo.py
│   ├── cost_optimization.py
│   ├── data
│   │   ├── sample_event.txt
│   │   ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│   │   └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│   ├── docstring_refiner_demo.py
│   ├── document_conversion_and_processing_demo.py
│   ├── entity_relation_graph_demo.py
│   ├── filesystem_operations_demo.py
│   ├── grok_integration_demo.py
│   ├── local_text_tools_demo.py
│   ├── marqo_fused_search_demo.py
│   ├── measure_model_speeds.py
│   ├── meta_api_demo.py
│   ├── multi_provider_demo.py
│   ├── ollama_integration_demo.py
│   ├── prompt_templates_demo.py
│   ├── python_sandbox_demo.py
│   ├── rag_example.py
│   ├── research_workflow_demo.py
│   ├── sample
│   │   ├── article.txt
│   │   ├── backprop_paper.pdf
│   │   ├── buffett.pdf
│   │   ├── contract_link.txt
│   │   ├── legal_contract.txt
│   │   ├── medical_case.txt
│   │   ├── northwind.db
│   │   ├── research_paper.txt
│   │   ├── sample_data.json
│   │   └── text_classification_samples
│   │       ├── email_classification.txt
│   │       ├── news_samples.txt
│   │       ├── product_reviews.txt
│   │       └── support_tickets.txt
│   ├── sample_docs
│   │   └── downloaded
│   │       └── attention_is_all_you_need.pdf
│   ├── sentiment_analysis_demo.py
│   ├── simple_completion_demo.py
│   ├── single_shot_synthesis_demo.py
│   ├── smart_browser_demo.py
│   ├── sql_database_demo.py
│   ├── sse_client_demo.py
│   ├── test_code_extraction.py
│   ├── test_content_detection.py
│   ├── test_ollama.py
│   ├── text_classification_demo.py
│   ├── text_redline_demo.py
│   ├── tool_composition_examples.py
│   ├── tournament_code_demo.py
│   ├── tournament_text_demo.py
│   ├── unified_memory_system_demo.py
│   ├── vector_search_demo.py
│   ├── web_automation_instruction_packs.py
│   └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│   └── smart_browser_internal
│       ├── locator_cache.db
│       ├── readability.js
│       └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── integration
│   │   ├── __init__.py
│   │   └── test_server.py
│   ├── manual
│   │   ├── test_extraction_advanced.py
│   │   └── test_extraction.py
│   └── unit
│       ├── __init__.py
│       ├── test_cache.py
│       ├── test_providers.py
│       └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│   ├── __init__.py
│   ├── __main__.py
│   ├── cli
│   │   ├── __init__.py
│   │   ├── __main__.py
│   │   ├── commands.py
│   │   ├── helpers.py
│   │   └── typer_cli.py
│   ├── clients
│   │   ├── __init__.py
│   │   ├── completion_client.py
│   │   └── rag_client.py
│   ├── config
│   │   └── examples
│   │       └── filesystem_config.yaml
│   ├── config.py
│   ├── constants.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── evaluation
│   │   │   ├── base.py
│   │   │   └── evaluators.py
│   │   ├── providers
│   │   │   ├── __init__.py
│   │   │   ├── anthropic.py
│   │   │   ├── base.py
│   │   │   ├── deepseek.py
│   │   │   ├── gemini.py
│   │   │   ├── grok.py
│   │   │   ├── ollama.py
│   │   │   ├── openai.py
│   │   │   └── openrouter.py
│   │   ├── server.py
│   │   ├── state_store.py
│   │   ├── tournaments
│   │   │   ├── manager.py
│   │   │   ├── tasks.py
│   │   │   └── utils.py
│   │   └── ums_api
│   │       ├── __init__.py
│   │       ├── ums_database.py
│   │       ├── ums_endpoints.py
│   │       ├── ums_models.py
│   │       └── ums_services.py
│   ├── exceptions.py
│   ├── graceful_shutdown.py
│   ├── services
│   │   ├── __init__.py
│   │   ├── analytics
│   │   │   ├── __init__.py
│   │   │   ├── metrics.py
│   │   │   └── reporting.py
│   │   ├── cache
│   │   │   ├── __init__.py
│   │   │   ├── cache_service.py
│   │   │   ├── persistence.py
│   │   │   ├── strategies.py
│   │   │   └── utils.py
│   │   ├── cache.py
│   │   ├── document.py
│   │   ├── knowledge_base
│   │   │   ├── __init__.py
│   │   │   ├── feedback.py
│   │   │   ├── manager.py
│   │   │   ├── rag_engine.py
│   │   │   ├── retriever.py
│   │   │   └── utils.py
│   │   ├── prompts
│   │   │   ├── __init__.py
│   │   │   ├── repository.py
│   │   │   └── templates.py
│   │   ├── prompts.py
│   │   └── vector
│   │       ├── __init__.py
│   │       ├── embeddings.py
│   │       └── vector_service.py
│   ├── tool_token_counter.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── audio_transcription.py
│   │   ├── base.py
│   │   ├── completion.py
│   │   ├── docstring_refiner.py
│   │   ├── document_conversion_and_processing.py
│   │   ├── enhanced-ums-lookbook.html
│   │   ├── entity_relation_graph.py
│   │   ├── excel_spreadsheet_automation.py
│   │   ├── extraction.py
│   │   ├── filesystem.py
│   │   ├── html_to_markdown.py
│   │   ├── local_text_tools.py
│   │   ├── marqo_fused_search.py
│   │   ├── meta_api_tool.py
│   │   ├── ocr_tools.py
│   │   ├── optimization.py
│   │   ├── provider.py
│   │   ├── pyodide_boot_template.html
│   │   ├── python_sandbox.py
│   │   ├── rag.py
│   │   ├── redline-compiled.css
│   │   ├── sentiment_analysis.py
│   │   ├── single_shot_synthesis.py
│   │   ├── smart_browser.py
│   │   ├── sql_databases.py
│   │   ├── text_classification.py
│   │   ├── text_redline_tools.py
│   │   ├── tournament.py
│   │   ├── ums_explorer.html
│   │   └── unified_memory_system.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── async_utils.py
│   │   ├── display.py
│   │   ├── logging
│   │   │   ├── __init__.py
│   │   │   ├── console.py
│   │   │   ├── emojis.py
│   │   │   ├── formatter.py
│   │   │   ├── logger.py
│   │   │   ├── panels.py
│   │   │   ├── progress.py
│   │   │   └── themes.py
│   │   ├── parse_yaml.py
│   │   ├── parsing.py
│   │   ├── security.py
│   │   └── text.py
│   └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/examples/data/Steve_Jobs_Introducing_The_iPhone_compressed.md:
--------------------------------------------------------------------------------

```markdown
# Transcript: Steve_Jobs_Introducing_The_iPhone_compressed.mp3

## Metadata
- **Duration:** 14:00.46
- **Language:** en (confidence: 1.00)
- **Transcription Model:** large-v3
- **Device:** cuda
- **Processing Time:** 99.07 seconds

## Full Transcript

 This is a day I've been looking forward to for two and a half years.  Every once in a while, a revolutionary product comes along that changes everything.  And Apple has been, well, first of all, one's very fortunate if you get to work on just one of these in your career.  Apple's been very fortunate.  It's been able to introduce a few of these into the world.  In 1984, we introduced the Macintosh.  It didn't just change Apple.  It changed the whole computer industry.  In 2001, we introduced the first iPod.  And it didn't just change the way we all listen to music.  It changed the entire music industry.  Well, today.  Today.  We're introducing three revolutionary products of this class.  The first one is a widescreen iPod with touch controls.  The second is a revolutionary mobile phone.  And the third is a breakthrough internet communications device.  So, three things.  A widescreen iPod with touch controls.  A revolutionary mobile phone.  And a breakthrough internet communications device.  An iPod.  A phone.  And an internet communicator.  An iPod.  A phone.  These are not three separate devices.  This is one device.  We are calling it iPhone.  Today.  Today.  Today, Apple is going to reinvent the phone.  And here it is.  Actually, here it is, but we're going to leave it there for now.  So, before we get into it, let me talk about a category of things.  The most advanced phones are called smart phones.  So they say.  And they typically combine a phone plus some email capability.  Plus, they say it's the internet, sort of the baby internet.  Into one device.  And they all have these plastic little keyboards on them.  And the problem is that they're not so smart.  And they're not so easy to use.  So, if you kind of make a, you know, business school 101 graph of the smart axis and the easy to use axis.  Phones, regular cell phones are kind of right there.  They're not so smart.  And they're, you know, not so easy to use.  But smart phones are definitely a little smarter.  But they actually are harder to use.  They're really complicated.  Just for the basic stuff, people have a hard time figuring out how to use them.  Well, we don't want to do either one of these things.  What we want to do is make a leapfrog product that is way smarter than any mobile device has ever been.  And super easy to use.  This is what iPhone is.  Okay.  So, we're going to reinvent the phone.  Now, we're going to start.  We're going to start with a revolutionary user interface.  Is the result of years of research and development.  And, of course, it's an interplay of hardware and software.  Now, why do we need a revolutionary user interface?  I mean, here's four smart phones, right?  Motorola Q, the Blackberry, Palm Treo, Nokia E62, the usual suspects.  And what's wrong with their user interfaces?  Well, the problem with them.  Is really sort of in the bottom 40 there.  It's this stuff right here.  They all have these keyboards that are there whether you need them or not to be there.  And they all have these control buttons that are fixed in plastic.  And are the same for every application.  Well, every application wants a slightly different user interface.  A slightly optimized set of buttons just for it.  And what happens if you think of a great idea six months from now?  You can't run around and add a button to these things.  They're already shipped.  So, what do you do?  It doesn't work because the buttons and the controls can't change.  They can't change for each application.  
And they can't change down the road if you think of another great idea you want to add to this product.  Well, how do you solve this?  Hmm.  It turns out we have solved it.  We solved it in computers 20 years ago.  We solved it with a bitmap screen that could display anything we want.  Put any user interface.  And a pointing device.  We solved it with the mouse.  Right?  We solved this problem.  So, how are we going to take this to a mobile device?  Well, what we're going to do is get rid of all these buttons and just make a giant screen.  A giant screen.  Now, how are we going to communicate this?  We don't want to carry around a mouse.  Right?  So, what are we going to do?  Oh, a stylus.  Right?  We're going to use a stylus.  No.  Who wants a stylus?  You have to get them and put them away and you lose them.  Yuck.  Nobody wants a stylus.  So, let's not use a stylus.  We're going to use the best pointing device in the world.  We're going to use a pointing device that we're all born with.  We're born with 10 of them.  We're going to use our fingers.  We're going to touch this with our fingers.  And we have invented a new technology called multi-touch, which is phenomenal.  It works like magic.  You don't need a stylus.  It's far more accurate than any touch display that's ever been shipped.  It ignores unintended touches.  It's super smart.  You can do multi-finger gestures on it.  And boy, have we patented it.  We've been very lucky to have brought a few revolutionary user interfaces to the market in our time.  First was the mouse.  The second was the click wheel.  And now we're going to bring multi-touch to the market.  And each of these revolutionary user interfaces has made possible a revolutionary product.  The Mac, the iPod, and now the iPhone.  So, a revolutionary user interface.  We're going to build on top of that with software.  Now, software on mobile phones is like software.  It's like baby software.  It's not so powerful.  And today, we're going to show you a software breakthrough.  Software that's at least five years ahead of what's on any other phone.  Now, how do we do this?  Well, we start with a strong foundation.  iPhone runs OS X.  Now, why would we want to run such a sophisticated operating system  on a mobile device?  Well, because it's got everything we need.  It's got multi-tasking.  It's got the best networking.  It already knows how to power manage.  We've been doing this on mobile computers for years.  It's got awesome security.  And to write apps.  It's got everything from Coco and the graphics.  And it's got core animation built in.  And it's got the audio and video that OS X is famous for.  It's got all the stuff we want.  And it's built right in to iPhone.  And that has let us create desktop class applications and networking.  Right?  Not the crippled stuff that you find on most phones.  This is real desktop class applications.  Now, you know, one of the pioneers of our industry, Alan Kay,  has had a lot of great quotes throughout the years.  And I ran across one of them recently that explains how we look at this.  Explains why we go about doing things the way we do.  Because we love software.  And here's the quote.  People who are really serious about software should make their own hardware.  You know?  Alan said this 30 years ago.  And this is how we feel about it.  And so we're bringing breakthrough software to a mobile device for the first time.  It's five years ahead of anything on any other phone.  
The second thing we're doing  is we're learning from the iPod.  Syncing with iTunes.  You know, we're going to ship our hundred millionth iPod this year.  And that's tens of millions of people  that know how to sync these devices with their PCs or Mac  and sync all of their media right on to their iPod.  Right?  So you just drop your iPod in  and it automatically syncs.  You're going to do the same thing with iPhone.  It automatically syncs to your PC or Mac  right through iTunes.  And iTunes is going to sync all your media onto your iPhone.  Your music, your audiobooks, podcasts, movies, TV shows, music videos.  But it also syncs a ton of data.  Your contacts, your calendars, and your photos,  which you can get on your iPod today,  your notes, your bookmarks from your web browser,  your email accounts, your whole email setup,  all that stuff can be moved over to iPhone completely automatically.  It's really nice.  And we do it through iTunes.  Again, you go to iTunes and you set it up,  just like you'd set up an iPod or an Apple TV.  And you set up what you want synced to your iPhone.  And it's just like an iPod.  Charge and sync.  So sync with iTunes.  Third thing I want to talk about a little is design.  We've designed something wonderful for your hand.  Just wonderful.  And this is what it looks like.  It's got a three and a half inch screen.  It's really big.  And it's the highest resolution screen we've ever shipped.  It's 160 pixels per inch.  Highest we've ever shipped.  It's gorgeous.  And on the front, there's only one button down there.  We call it the home button.  It takes you home from wherever you are.  And that's it.  Let's take a look at the side.  It's really thin.  It's thinner than any smartphone out there at 11.6 millimeters.  Thinner than the Q, thinner than the Blackjack,  thinner than all of them.  It's really nice.  And we've got some controls on the side.  We've got a little switch for ring and silent.  We've got a volume up and down control.  Let's look at the back.  On the back, the biggest thing of note  is we've got a two megapixel camera built right in.  The other side, and we're back on the front.  So let's take a look at the top now.  We've got a headset jack.  Three and a half millimeter.  All your iPod headphones fit right in.  We've got a place, a little tray for your SIM card.  And we've got one switch  for sleep and wake.  Just push it to go to sleep, push it to wake up.  Let's take a look at the bottom.  We've got a speaker.  We've got a microphone.  And we've got our 30-pin iPod connector.  So that's the bottom.  Now, we've also got some stuff you can't see.  We've got three really advanced sensors built into this phone.  The first one is a proximity sensor.  It senses when physical objects get close.  So when you bring iPhone up to your ear,  take a phone call, it turns off the display,  and it turns off the touch sensor instantly.  Why do you want to do that?  Well, one, to save battery, but two,  so you don't get spurious inputs from your face into the touchscreen.  It just automatically turns them off.  Take it away, boom, it's back on.  So it's got a proximity sensor built in.  It's got an ambient light sensor as well.  We sense the ambient lighting conditions  and adjust the brightness of the display  to match the ambient lighting conditions.  Again, better user experience saves power.  And the third thing we've got is an accelerometer  so that we can tell when you switch from portrait to landscape.

## Segments

**[00:04.11 → 00:08.89]**  This is a day I've been looking forward to for two and a half years.

**[00:15.64 → 00:21.86]**  Every once in a while, a revolutionary product comes along that changes everything.

**[00:25.66 → 00:33.30]**  And Apple has been, well, first of all, one's very fortunate if you get to work on just one of these in your career.

**[00:35.67 → 00:37.01]**  Apple's been very fortunate.

**[00:37.81 → 00:41.91]**  It's been able to introduce a few of these into the world.

**[00:41.91 → 00:46.87]**  In 1984, we introduced the Macintosh.

**[00:47.39 → 00:49.05]**  It didn't just change Apple.

**[00:49.53 → 00:51.49]**  It changed the whole computer industry.

**[01:03.15 → 01:07.53]**  In 2001, we introduced the first iPod.

**[01:09.55 → 01:15.37]**  And it didn't just change the way we all listen to music.

**[01:15.65 → 01:17.85]**  It changed the entire music industry.

**[01:20.00 → 01:22.68]**  Well, today.

**[01:22.78 → 01:23.18]**  Today.

**[01:24.68 → 01:29.66]**  We're introducing three revolutionary products of this class.

**[01:32.32 → 01:39.27]**  The first one is a widescreen iPod with touch controls.

**[01:39.61 → 01:54.70]**  The second is a revolutionary mobile phone.

**[02:04.40 → 02:10.24]**  And the third is a breakthrough internet communications device.

**[02:14.88 → 02:17.46]**  So, three things.

**[02:17.74 → 02:20.56]**  A widescreen iPod with touch controls.

**[02:20.56 → 02:22.90]**  A revolutionary mobile phone.

**[02:23.04 → 02:26.26]**  And a breakthrough internet communications device.

**[02:26.60 → 02:29.55]**  An iPod.

**[02:30.33 → 02:31.65]**  A phone.

**[02:33.16 → 02:35.24]**  And an internet communicator.

**[02:36.10 → 02:37.84]**  An iPod.

**[02:38.38 → 02:39.56]**  A phone.

**[02:39.88 → 02:49.96]**  These are not three separate devices.

**[02:50.54 → 02:52.34]**  This is one device.

**[02:52.70 → 03:00.14]**  We are calling it iPhone.

**[03:02.07 → 03:02.95]**  Today.

**[03:05.06 → 03:05.50]**  Today.

**[03:05.50 → 03:08.64]**  Today, Apple is going to reinvent the phone.

**[03:10.35 → 03:11.29]**  And here it is.

**[03:20.06 → 03:22.86]**  Actually, here it is, but we're going to leave it there for now.

**[03:24.36 → 03:33.58]**  So, before we get into it, let me talk about a category of things.

**[03:33.68 → 03:36.10]**  The most advanced phones are called smart phones.

**[03:37.28 → 03:38.16]**  So they say.

**[03:39.24 → 03:43.64]**  And they typically combine a phone plus some email capability.

**[03:43.96 → 03:46.68]**  Plus, they say it's the internet, sort of the baby internet.

**[03:46.68 → 03:47.82]**  Into one device.

**[03:47.96 → 03:50.78]**  And they all have these plastic little keyboards on them.

**[03:51.48 → 03:55.46]**  And the problem is that they're not so smart.

**[03:55.82 → 03:57.68]**  And they're not so easy to use.

**[03:57.74 → 04:04.70]**  So, if you kind of make a, you know, business school 101 graph of the smart axis and the easy to use axis.

**[04:05.14 → 04:07.24]**  Phones, regular cell phones are kind of right there.

**[04:07.32 → 04:08.38]**  They're not so smart.

**[04:08.44 → 04:10.84]**  And they're, you know, not so easy to use.

**[04:12.44 → 04:14.72]**  But smart phones are definitely a little smarter.

**[04:14.72 → 04:16.80]**  But they actually are harder to use.

**[04:17.12 → 04:18.60]**  They're really complicated.

**[04:18.86 → 04:22.44]**  Just for the basic stuff, people have a hard time figuring out how to use them.

**[04:23.44 → 04:25.82]**  Well, we don't want to do either one of these things.

**[04:26.04 → 04:33.46]**  What we want to do is make a leapfrog product that is way smarter than any mobile device has ever been.

**[04:33.68 → 04:35.50]**  And super easy to use.

**[04:35.64 → 04:37.48]**  This is what iPhone is.

**[04:37.82 → 04:38.36]**  Okay.

**[04:40.49 → 04:43.09]**  So, we're going to reinvent the phone.

**[04:44.45 → 04:46.09]**  Now, we're going to start.

**[04:46.35 → 04:52.85]**  We're going to start with a revolutionary user interface.

**[04:54.23 → 04:58.55]**  Is the result of years of research and development.

**[05:00.19 → 05:03.61]**  And, of course, it's an interplay of hardware and software.

**[05:04.67 → 05:07.49]**  Now, why do we need a revolutionary user interface?

**[05:07.77 → 05:10.15]**  I mean, here's four smart phones, right?

**[05:10.25 → 05:15.09]**  Motorola Q, the Blackberry, Palm Treo, Nokia E62, the usual suspects.

**[05:15.41 → 05:18.15]**  And what's wrong with their user interfaces?

**[05:18.19 → 05:19.95]**  Well, the problem with them.

**[05:20.45 → 05:22.57]**  Is really sort of in the bottom 40 there.

**[05:23.05 → 05:24.67]**  It's this stuff right here.

**[05:25.69 → 05:30.13]**  They all have these keyboards that are there whether you need them or not to be there.

**[05:30.49 → 05:34.41]**  And they all have these control buttons that are fixed in plastic.

**[05:34.71 → 05:36.93]**  And are the same for every application.

**[05:37.21 → 05:39.93]**  Well, every application wants a slightly different user interface.

**[05:40.17 → 05:43.35]**  A slightly optimized set of buttons just for it.

**[05:43.81 → 05:46.77]**  And what happens if you think of a great idea six months from now?

**[05:46.93 → 05:49.21]**  You can't run around and add a button to these things.

**[05:49.21 → 05:49.89]**  They're already shipped.

**[05:50.19 → 05:51.27]**  So, what do you do?

**[05:51.96 → 05:56.41]**  It doesn't work because the buttons and the controls can't change.

**[05:56.67 → 05:58.27]**  They can't change for each application.

**[05:58.55 → 06:03.87]**  And they can't change down the road if you think of another great idea you want to add to this product.

**[06:04.55 → 06:05.99]**  Well, how do you solve this?

**[06:06.43 → 06:06.81]**  Hmm.

**[06:07.09 → 06:09.37]**  It turns out we have solved it.

**[06:09.43 → 06:12.19]**  We solved it in computers 20 years ago.

**[06:12.81 → 06:17.55]**  We solved it with a bitmap screen that could display anything we want.

**[06:17.71 → 06:19.11]**  Put any user interface.

**[06:19.11 → 06:21.79]**  And a pointing device.

**[06:22.23 → 06:23.57]**  We solved it with the mouse.

**[06:24.07 → 06:24.45]**  Right?

**[06:25.01 → 06:26.13]**  We solved this problem.

**[06:26.25 → 06:28.61]**  So, how are we going to take this to a mobile device?

**[06:29.35 → 06:34.51]**  Well, what we're going to do is get rid of all these buttons and just make a giant screen.

**[06:35.47 → 06:36.57]**  A giant screen.

**[06:38.39 → 06:40.39]**  Now, how are we going to communicate this?

**[06:40.45 → 06:41.81]**  We don't want to carry around a mouse.

**[06:41.95 → 06:42.19]**  Right?

**[06:42.31 → 06:43.31]**  So, what are we going to do?

**[06:43.91 → 06:44.95]**  Oh, a stylus.

**[06:45.03 → 06:45.25]**  Right?

**[06:45.65 → 06:46.81]**  We're going to use a stylus.

**[06:47.83 → 06:48.23]**  No.

**[06:50.35 → 06:51.69]**  Who wants a stylus?

**[06:52.69 → 06:55.15]**  You have to get them and put them away and you lose them.

**[06:55.27 → 06:55.67]**  Yuck.

**[06:56.27 → 06:57.57]**  Nobody wants a stylus.

**[06:57.71 → 06:59.11]**  So, let's not use a stylus.

**[07:00.19 → 07:02.73]**  We're going to use the best pointing device in the world.

**[07:02.79 → 07:05.47]**  We're going to use a pointing device that we're all born with.

**[07:05.57 → 07:06.67]**  We're born with 10 of them.

**[07:06.71 → 07:07.53]**  We're going to use our fingers.

**[07:08.49 → 07:09.95]**  We're going to touch this with our fingers.

**[07:10.13 → 07:15.47]**  And we have invented a new technology called multi-touch, which is phenomenal.

**[07:16.27 → 07:17.83]**  It works like magic.

**[07:19.45 → 07:21.31]**  You don't need a stylus.

**[07:22.05 → 07:26.09]**  It's far more accurate than any touch display that's ever been shipped.

**[07:26.65 → 07:29.03]**  It ignores unintended touches.

**[07:29.15 → 07:30.07]**  It's super smart.

**[07:31.13 → 07:33.79]**  You can do multi-finger gestures on it.

**[07:34.39 → 07:36.41]**  And boy, have we patented it.

**[07:46.21 → 07:53.01]**  We've been very lucky to have brought a few revolutionary user interfaces to the market in our time.

**[07:53.73 → 07:54.73]**  First was the mouse.

**[07:55.73 → 07:58.55]**  The second was the click wheel.

**[07:58.75 → 08:02.11]**  And now we're going to bring multi-touch to the market.

**[08:02.61 → 08:09.01]**  And each of these revolutionary user interfaces has made possible a revolutionary product.

**[08:09.23 → 08:12.79]**  The Mac, the iPod, and now the iPhone.

**[08:13.25 → 08:15.85]**  So, a revolutionary user interface.

**[08:16.09 → 08:20.09]**  We're going to build on top of that with software.

**[08:20.29 → 08:23.93]**  Now, software on mobile phones is like software.

**[08:23.93 → 08:25.55]**  It's like baby software.

**[08:26.47 → 08:28.17]**  It's not so powerful.

**[08:28.65 → 08:32.01]**  And today, we're going to show you a software breakthrough.

**[08:32.27 → 08:37.15]**  Software that's at least five years ahead of what's on any other phone.

**[08:37.41 → 08:38.45]**  Now, how do we do this?

**[08:38.51 → 08:41.03]**  Well, we start with a strong foundation.

**[08:41.79 → 08:43.87]**  iPhone runs OS X.

**[08:47.68 → 08:57.76]**  Now, why would we want to run such a sophisticated operating system

**[08:57.76 → 08:59.26]**  on a mobile device?

**[08:59.46 → 09:01.38]**  Well, because it's got everything we need.

**[09:01.94 → 09:03.50]**  It's got multi-tasking.

**[09:03.72 → 09:05.18]**  It's got the best networking.

**[09:05.52 → 09:07.70]**  It already knows how to power manage.

**[09:07.78 → 09:09.96]**  We've been doing this on mobile computers for years.

**[09:10.30 → 09:11.74]**  It's got awesome security.

**[09:12.06 → 09:13.28]**  And to write apps.

**[09:13.72 → 09:17.30]**  It's got everything from Coco and the graphics.

**[09:17.38 → 09:19.50]**  And it's got core animation built in.

**[09:19.66 → 09:23.58]**  And it's got the audio and video that OS X is famous for.

**[09:23.66 → 09:25.10]**  It's got all the stuff we want.

**[09:25.10 → 09:27.52]**  And it's built right in to iPhone.

**[09:27.76 → 09:32.82]**  And that has let us create desktop class applications and networking.

**[09:33.92 → 09:34.46]**  Right?

**[09:35.66 → 09:38.78]**  Not the crippled stuff that you find on most phones.

**[09:38.94 → 09:41.58]**  This is real desktop class applications.

**[09:42.28 → 09:46.74]**  Now, you know, one of the pioneers of our industry, Alan Kay,

**[09:46.98 → 09:49.44]**  has had a lot of great quotes throughout the years.

**[09:49.60 → 09:55.00]**  And I ran across one of them recently that explains how we look at this.

**[09:56.06 → 09:59.18]**  Explains why we go about doing things the way we do.

**[09:59.30 → 10:00.72]**  Because we love software.

**[10:01.90 → 10:02.96]**  And here's the quote.

**[10:03.44 → 10:07.50]**  People who are really serious about software should make their own hardware.

**[10:08.32 → 10:08.98]**  You know?

**[10:09.46 → 10:11.40]**  Alan said this 30 years ago.

**[10:11.96 → 10:13.76]**  And this is how we feel about it.

**[10:13.82 → 10:18.64]**  And so we're bringing breakthrough software to a mobile device for the first time.

**[10:18.76 → 10:22.14]**  It's five years ahead of anything on any other phone.

**[10:22.84 → 10:24.14]**  The second thing we're doing

**[10:24.14 → 10:25.84]**  is we're learning from the iPod.

**[10:26.24 → 10:27.30]**  Syncing with iTunes.

**[10:27.48 → 10:30.76]**  You know, we're going to ship our hundred millionth iPod this year.

**[10:31.20 → 10:34.78]**  And that's tens of millions of people

**[10:34.78 → 10:38.10]**  that know how to sync these devices with their PCs or Mac

**[10:38.10 → 10:41.72]**  and sync all of their media right on to their iPod.

**[10:41.82 → 10:42.34]**  Right?

**[10:42.46 → 10:44.72]**  So you just drop your iPod in

**[10:44.72 → 10:46.38]**  and it automatically syncs.

**[10:46.50 → 10:48.68]**  You're going to do the same thing with iPhone.

**[10:48.96 → 10:51.20]**  It automatically syncs to your PC or Mac

**[10:51.96 → 10:52.74]**  right through iTunes.

**[10:52.74 → 10:56.34]**  And iTunes is going to sync all your media onto your iPhone.

**[10:56.50 → 11:01.64]**  Your music, your audiobooks, podcasts, movies, TV shows, music videos.

**[11:01.92 → 11:03.96]**  But it also syncs a ton of data.

**[11:04.64 → 11:06.84]**  Your contacts, your calendars, and your photos,

**[11:06.94 → 11:08.28]**  which you can get on your iPod today,

**[11:08.44 → 11:11.86]**  your notes, your bookmarks from your web browser,

**[11:12.34 → 11:14.68]**  your email accounts, your whole email setup,

**[11:14.74 → 11:17.62]**  all that stuff can be moved over to iPhone completely automatically.

**[11:18.44 → 11:19.46]**  It's really nice.

**[11:19.64 → 11:22.44]**  And we do it through iTunes.

**[11:23.76 → 11:26.16]**  Again, you go to iTunes and you set it up,

**[11:26.22 → 11:28.46]**  just like you'd set up an iPod or an Apple TV.

**[11:28.96 → 11:31.46]**  And you set up what you want synced to your iPhone.

**[11:32.30 → 11:34.26]**  And it's just like an iPod.

**[11:35.14 → 11:36.30]**  Charge and sync.

**[11:36.78 → 11:37.92]**  So sync with iTunes.

**[11:40.02 → 11:42.24]**  Third thing I want to talk about a little is design.

**[11:43.08 → 11:46.38]**  We've designed something wonderful for your hand.

**[11:47.02 → 11:47.86]**  Just wonderful.

**[11:48.60 → 11:49.90]**  And this is what it looks like.

**[11:52.64 → 11:54.52]**  It's got a three and a half inch screen.

**[11:54.52 → 11:55.90]**  It's really big.

**[11:56.98 → 12:00.24]**  And it's the highest resolution screen we've ever shipped.

**[12:00.38 → 12:02.18]**  It's 160 pixels per inch.

**[12:02.82 → 12:04.14]**  Highest we've ever shipped.

**[12:04.26 → 12:04.82]**  It's gorgeous.

**[12:05.22 → 12:07.78]**  And on the front, there's only one button down there.

**[12:07.88 → 12:08.80]**  We call it the home button.

**[12:08.90 → 12:10.54]**  It takes you home from wherever you are.

**[12:10.70 → 12:11.56]**  And that's it.

**[12:12.50 → 12:13.66]**  Let's take a look at the side.

**[12:14.02 → 12:15.40]**  It's really thin.

**[12:15.60 → 12:20.26]**  It's thinner than any smartphone out there at 11.6 millimeters.

**[12:20.42 → 12:22.94]**  Thinner than the Q, thinner than the Blackjack,

**[12:23.02 → 12:24.18]**  thinner than all of them.

**[12:25.06 → 12:25.92]**  It's really nice.

**[12:26.54 → 12:28.28]**  And we've got some controls on the side.

**[12:28.40 → 12:29.88]**  We've got a little switch for ring and silent.

**[12:29.98 → 12:31.74]**  We've got a volume up and down control.

**[12:33.14 → 12:33.96]**  Let's look at the back.

**[12:34.76 → 12:36.44]**  On the back, the biggest thing of note

**[12:36.44 → 12:38.42]**  is we've got a two megapixel camera built right in.

**[12:42.40 → 12:44.12]**  The other side, and we're back on the front.

**[12:44.24 → 12:45.46]**  So let's take a look at the top now.

**[12:46.34 → 12:48.58]**  We've got a headset jack.

**[12:49.18 → 12:50.18]**  Three and a half millimeter.

**[12:50.38 → 12:52.02]**  All your iPod headphones fit right in.

**[12:53.32 → 12:55.62]**  We've got a place, a little tray for your SIM card.

**[12:56.20 → 12:57.44]**  And we've got one switch

**[12:57.78 → 12:58.36]**  for sleep and wake.

**[12:58.52 → 13:00.64]**  Just push it to go to sleep, push it to wake up.

**[13:01.84 → 13:02.86]**  Let's take a look at the bottom.

**[13:04.74 → 13:05.82]**  We've got a speaker.

**[13:07.12 → 13:08.20]**  We've got a microphone.

**[13:08.82 → 13:11.32]**  And we've got our 30-pin iPod connector.

**[13:12.40 → 13:13.46]**  So that's the bottom.

**[13:14.40 → 13:16.62]**  Now, we've also got some stuff you can't see.

**[13:17.32 → 13:20.88]**  We've got three really advanced sensors built into this phone.

**[13:21.14 → 13:22.84]**  The first one is a proximity sensor.

**[13:23.22 → 13:25.48]**  It senses when physical objects get close.

**[13:25.58 → 13:27.62]**  So when you bring iPhone up to your ear,

**[13:28.08 → 13:30.82]**  take a phone call, it turns off the display,

**[13:31.04 → 13:33.02]**  and it turns off the touch sensor instantly.

**[13:33.36 → 13:34.38]**  Why do you want to do that?

**[13:34.48 → 13:35.90]**  Well, one, to save battery, but two,

**[13:36.04 → 13:38.98]**  so you don't get spurious inputs from your face into the touchscreen.

**[13:39.22 → 13:40.40]**  It just automatically turns them off.

**[13:40.42 → 13:41.50]**  Take it away, boom, it's back on.

**[13:41.82 → 13:43.76]**  So it's got a proximity sensor built in.

**[13:43.78 → 13:45.28]**  It's got an ambient light sensor as well.

**[13:45.58 → 13:47.42]**  We sense the ambient lighting conditions

**[13:47.42 → 13:49.50]**  and adjust the brightness of the display

**[13:49.50 → 13:51.04]**  to match the ambient lighting conditions.

**[13:51.10 → 13:53.52]**  Again, better user experience saves power.

**[13:53.96 → 13:56.58]**  And the third thing we've got is an accelerometer

**[13:56.58 → 13:59.88]**  so that we can tell when you switch from portrait to landscape.

```

--------------------------------------------------------------------------------
/completion_support.py:
--------------------------------------------------------------------------------

```python
"""
Completion support for MCP servers.

This module implements the argument completion system for MCP (Model Context Protocol)
servers, enabling interactive, context-aware autocompletion for tool arguments. The completion
system helps users and LLMs efficiently use tools by suggesting valid values for arguments
based on the current context.

Key Components:
- CompletionProvider (abstract): Base class for all completion providers
- StaticCompletionProvider: Provides completions from predefined, static lists
- DynamicCompletionProvider: Generates completions on-demand through callback functions
- CompletionRegistry: Central registry managing providers for different tools
- Utility functions: Helper functions for common completion scenarios (file paths, etc.)

The module supports a flexible, extensible architecture where:
1. Each tool can register its own providers for different arguments
2. Static lists can be used for enumerated options (e.g., formats, modes)
3. Dynamic functions can be used for context-dependent values (files, users, etc.)
4. A fallback provider can handle common arguments across tools

Usage Example:
```python
# Create completion registry
registry = CompletionRegistry()

# Register dynamic provider for a tool
registry.register_provider("database_tool", DynamicCompletionProvider({
    "table_name": async_db_tables_function,
    "column_name": async_table_columns_function
}))

# Set default provider for common arguments across all tools
registry.set_default_provider(COMMON_COMPLETIONS)

# Later, when handling MCP completion requests:
completions = await registry.get_completions(
    tool_name="document_tool", 
    argument_name="format",
    current_value="pd"  # User has typed "pd" so far
)
# Returns: {"values": ["pdf"], "hasMore": False, "total": 1}
```

This system integrates with the MCP server to provide real-time completion
suggestions as users type, significantly improving usability and reducing errors.
"""
from typing import Any, Callable, Dict, List, Optional


class CompletionProvider:
    """
    Abstract base class defining the interface for argument completion providers.
    
    CompletionProvider serves as the foundation for all completion mechanisms in the MCP
    system. It defines a consistent interface that all provider implementations must follow,
    ensuring that consumers of completions can work with different providers interchangeably.
    
    To implement a custom completion provider:
    1. Subclass CompletionProvider
    2. Implement the get_completions() method to return suggestions
    3. Implement the supports_argument() method to indicate what arguments your provider handles
    
    The framework includes two standard implementations:
    - StaticCompletionProvider: Uses predefined lists of values
    - DynamicCompletionProvider: Generates values dynamically via callback functions
    
    Custom implementations might include providers that:
    - Query external APIs for suggestions
    - Read from databases or other data sources
    - Implement complex filtering or ranking logic
    - Cache results for performance optimization
    
    Providers should handle any internal errors gracefully and return an empty list rather
    than raising exceptions that would disrupt the completion flow.
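
    Example (illustrative sketch only; the environment-variable provider below is
    hypothetical and not part of this module):
        ```python
        import os

        class EnvVarCompletionProvider(CompletionProvider):
            # Suggests environment variable names for an "env_var" argument.

            def supports_argument(self, argument_name: str) -> bool:
                return argument_name == "env_var"

            async def get_completions(self, argument_name: str, current_value: str, **context) -> List[str]:
                # Simple case-insensitive prefix match, mirroring StaticCompletionProvider
                names = sorted(os.environ)
                return [n for n in names if n.lower().startswith(current_value.lower())]
        ```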
    """
    
    async def get_completions(self, argument_name: str, current_value: str, **context) -> List[str]:
        """
        Get completion suggestions for an argument.
        
        This method is called when the MCP system needs completions for an argument.
        It should return relevant suggestions based on the provided information.
        
        Args:
            argument_name: Name of the argument being completed (e.g., "file_path", "format")
            current_value: Current partial value entered by the user (may be empty)
            **context: Additional context that may affect completions, such as:
                - tool_name: The name of the tool requesting completions
                - Other argument values from the same tool call
                - User information, preferences, or permissions
                - Environment information
            
        Returns:
            List of string suggestions that are valid completions for the current_value.
            Return an empty list if no suggestions are available.
            
        Raises:
            NotImplementedError: If not implemented by a subclass
        """
        raise NotImplementedError("Subclasses must implement get_completions")
        
    def supports_argument(self, argument_name: str) -> bool:
        """
        Check if this provider supports completion for a given argument.
        
        This method allows the CompletionRegistry to quickly determine if this
        provider can handle completions for a specific argument without having
        to call get_completions and risk exceptions or empty results.
        
        Args:
            argument_name: Name of the argument to check for support
            
        Returns:
            True if this provider can provide completions for the argument,
            False otherwise
            
        Raises:
            NotImplementedError: If not implemented by a subclass
        """
        raise NotImplementedError("Subclasses must implement supports_argument")


class StaticCompletionProvider(CompletionProvider):
    """
    Completion provider that returns predefined, static suggestion lists for arguments.
    
    This provider implements a straightforward approach to argument completion using
    predefined lists of suggestions for each supported argument name. When queried,
    it filters these static lists based on the current user input prefix. This makes it
    ideal for arguments with a fixed, known set of possible values like:
    
    - Enumerated options (e.g., file formats, provider names)
    - Common settings or modes (e.g., analysis types, priority levels)
    - Frequently used values that rarely change
    
    The provider automatically performs case-insensitive prefix matching on the
    predefined suggestions when a partial value is provided. For example, if 
    completions include ["openai", "anthropic"] and the current_value is "open",
    only "openai" will be returned.
    
    Usage example:
        ```python
        # Create a provider with predefined completions for common arguments
        provider = StaticCompletionProvider({
            "format": ["json", "csv", "xml", "yaml"],
            "provider": ["openai", "anthropic", "cohere", "azure"],
            "priority": ["low", "medium", "high"]
        })
        
        # Later, get all format options
        completions = await provider.get_completions("format", "")  # Returns all formats
        
        # Or get filtered provider options
        completions = await provider.get_completions("provider", "co")  # Returns ["cohere"]
        ```
    
    For arguments that require dynamic or context-sensitive completions (like file paths 
    or current database tables), use DynamicCompletionProvider instead.
    """
    
    def __init__(self, completions: Dict[str, List[str]]):
        """
        Initialize with static completion values.
        
        Args:
            completions: Dictionary mapping argument names to suggestion lists
        """
        self.completions = completions
        
    async def get_completions(self, argument_name: str, current_value: str, **context) -> List[str]:
        """Get completion suggestions from static values."""
        if not self.supports_argument(argument_name):
            return []
            
        # Filter suggestions based on current value
        suggestions = self.completions.get(argument_name, [])
        if current_value:
            return [s for s in suggestions if s.lower().startswith(current_value.lower())]
        return suggestions
        
    def supports_argument(self, argument_name: str) -> bool:
        """Check if static completions exist for this argument."""
        return argument_name in self.completions


class DynamicCompletionProvider(CompletionProvider):
    """
    Completion provider that generates suggestions dynamically using callback functions.
    
    Unlike the StaticCompletionProvider which uses fixed lists, this provider calls
    specialized functions to generate completion suggestions on demand. This approach
    is essential for arguments whose valid values:
    
    - Depend on the current system state (e.g., existing files, running processes)
    - Vary based on user context or previous selections
    - Are too numerous to predefine (e.g., all possible file paths)
    - Require external API calls or database queries to determine
    
    The provider maps argument names to async callback functions that are responsible
    for generating appropriate suggestions based on the current partial input and context.
    Each callback function should accept at least two parameters:
    - current_value: The current partial input string
    - **context: Additional context information that may be useful for generating completions
    
    Usage example:
        ```python
        # Define completion functions
        async def complete_files(current_value, **context):
            # Custom logic to find matching files
            return ["file1.txt", "file2.txt", "folder/"]
            
        async def complete_users(current_value, **context):
            # Query database for matching users (illustrative; use parameterized queries in real code)
            db = context.get("database")
            users = await db.query(f"SELECT username FROM users WHERE username LIKE '{current_value}%'")
            return [user.username for user in users]
        
        # Create dynamic provider with these functions
        provider = DynamicCompletionProvider({
            "file_path": complete_files,
            "username": complete_users
        })
        ```
    
    Each completion function should handle errors gracefully and return an empty list
    rather than raising exceptions. The provider will log errors but won't propagate them
    to the MCP completion API response.
    """
    
    def __init__(self, completion_functions: Dict[str, Callable]):
        """
        Initialize with dynamic completion functions.
        
        Args:
            completion_functions: Dictionary mapping argument names to completion functions.
                Each function should be an async function that accepts at least 
                (current_value: str, **context) and returns a List[str] of suggestions.
        """
        self.completion_functions = completion_functions
        
    async def get_completions(self, argument_name: str, current_value: str, **context) -> List[str]:
        """Get completion suggestions by calling appropriate function."""
        if not self.supports_argument(argument_name):
            return []
            
        # Call the function to get suggestions
        func = self.completion_functions.get(argument_name)
        if func:
            try:
                suggestions = await func(current_value, **context)
                return suggestions
            except Exception as e:
                # Log error and return empty list
                print(f"Error getting completions for {argument_name}: {str(e)}")
                return []
        return []
        
    def supports_argument(self, argument_name: str) -> bool:
        """Check if a completion function exists for this argument."""
        return argument_name in self.completion_functions


class CompletionRegistry:
    """
    Central registry managing completion providers for different tools and arguments.
    
    The CompletionRegistry serves as the core orchestration component of the MCP completion
    system, providing a unified interface for registration, management, and access to
    completion providers. It implements:
    
    1. Tool-specific provider registration and lookup
    2. A fallback mechanism through a default provider
    3. Standardized response formatting according to the MCP specification
    4. Error handling and graceful degradation
    
    This registry is designed to be the single entry point for all completion requests
    in an MCP server. Client code can register different providers for different tools,
    set up a default provider for common arguments, and then handle all completion requests
    through a single, consistent interface.
    
    Each tool in the system can have its own dedicated completion provider that understands
    the specific requirements and valid values for that tool's arguments. When no specific
    provider is registered for a tool, the registry falls back to the default provider,
    which typically handles common arguments like formats, providers, and models.
    
    Usage workflow:
    1. Create a registry during server initialization
    2. Register tool-specific providers for specialized tools
    3. Set a default provider for common arguments
    4. Use get_completions() to handle MCP completion protocol requests
    
    The registry ensures that all responses follow the MCP completion protocol format,
    even when errors occur or no completions are available, providing a consistent
    experience for clients.
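
    Example (illustrative sketch of the workflow above; the tool and argument
    names are placeholders, not actual server tools):
        ```python
        registry = CompletionRegistry()

        # 1-2. Register a tool-specific provider for a hypothetical "export_report" tool
        registry.register_provider("export_report", StaticCompletionProvider({
            "format": ["pdf", "html", "markdown"],
        }))

        # 3. Default provider for arguments shared across tools
        registry.set_default_provider(StaticCompletionProvider({
            "provider": ["openai", "anthropic", "ollama"],
        }))

        # 4. Handle an MCP completion request
        result = await registry.get_completions(
            tool_name="export_report", argument_name="format", current_value="p"
        )
        # -> {"values": ["pdf"], "hasMore": False, "total": 1}
        ```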
    """
    
    def __init__(self):
        """Initialize the registry."""
        self.tool_providers = {}  # Map of tool_name -> provider
        self.default_provider = None
        
    def register_provider(self, tool_name: str, provider: CompletionProvider):
        """
        Register a completion provider for a specific tool.
        
        This method associates a completion provider with a specific tool name in the registry.
        When the system receives completion requests for this tool, the registered provider 
        will be used to generate suggestions for its arguments.
        
        Each tool can have exactly one registered provider. If a provider is already
        registered for the specified tool, it will be replaced by the new provider.
        This allows for dynamic reconfiguration of completion sources as needed.
        
        The provider can be either a StaticCompletionProvider for fixed option lists,
        a DynamicCompletionProvider for context-dependent suggestions, or any custom
        implementation of the CompletionProvider interface.
        
        Args:
            tool_name: Identifier of the tool to register a provider for. This should match
                      the tool name used in MCP requests (e.g., "search_documents", 
                      "analyze_image").
            provider: The completion provider instance that will handle suggestion requests
                     for this tool's arguments. Must implement the CompletionProvider interface.
                     
        Note:
            If a tool requires completions for only some of its arguments, the provider
            should still be registered here, and its supports_argument() method should
            return False for unsupported arguments.
        
        Example:
            ```python
            # Register provider for search_documents tool
            registry.register_provider(
                "search_documents",
                StaticCompletionProvider({
                    "source": ["web", "database", "local_files"],
                    "sort_by": ["relevance", "date", "title"]
                })
            )
            ```
        """
        self.tool_providers[tool_name] = provider
        
    def set_default_provider(self, provider: CompletionProvider):
        """
        Set a default provider to handle arguments for tools without specific providers.
        
        This method establishes a fallback completion provider that is used when:
        1. No tool-specific provider is registered for a requested tool
        2. A registered provider exists but doesn't support the requested argument
        
        The default provider is typically configured to handle common arguments that
        appear across multiple tools, such as:
        - Provider names (e.g., "openai", "anthropic", "azure")
        - Model identifiers (e.g., "gpt-4o", "claude-3-5-haiku-20241022")
        - Common formats (e.g., "json", "csv", "markdown")
        - Universal settings (e.g., "temperature", "max_tokens")
        
        Only one default provider can be active at a time. Setting a new default
        provider replaces any previously set default.
        
        Args:
            provider: The completion provider instance to use as the default fallback
                     for all tools without specific providers or for arguments not
                     supported by their specific providers.
                     
        Note:
            When no default provider is set and no tool-specific provider is found,
            the system will return an empty completion result rather than raising
            an error.
            
        Example:
            ```python
            # Set default provider for common arguments across all tools
            registry.set_default_provider(
                StaticCompletionProvider({
                    "provider": ["openai", "anthropic", "cohere"],
                    "temperature": ["0.0", "0.5", "0.7", "1.0"],
                    "format": ["json", "text", "markdown"]
                })
            )
            ```
        """
        self.default_provider = provider
        
    def get_provider(self, tool_name: str) -> Optional[CompletionProvider]:
        """
        Retrieve the appropriate completion provider for a specific tool.
        
        This method implements the provider resolution logic for the registry,
        determining which completion provider should handle a given tool's
        argument completions. It follows this resolution sequence:
        
        1. Look for a provider specifically registered for the requested tool
        2. If no tool-specific provider exists, fall back to the default provider
        3. If neither exists, return None
        
        This lookup process encapsulates the registry's fallback mechanism,
        allowing tool-specific providers to take precedence while ensuring
        that common arguments can still be handled by a default provider.
        
        Args:
            tool_name: The identifier of the tool to find a provider for, matching
                      the name used when registering the provider.
            
        Returns:
            CompletionProvider: The appropriate provider to handle completions for the tool.
                               This will be either:
                               - The tool's specifically registered provider, if one exists
                               - The default provider, if no tool-specific provider exists
                               - None, if no applicable provider is found
                               
        Note:
            This method is primarily used internally by get_completions(), but can also
            be called directly to check provider availability without requesting completions.
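
        Example (illustrative; assumes the registrations shown in the register_provider
        and set_default_provider examples above):
            ```python
            # "unknown_tool" has no provider of its own, so this falls back to the
            # default provider (or None if no default has been set).
            provider = registry.get_provider("unknown_tool")
            ```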
        """
        return self.tool_providers.get(tool_name, self.default_provider)
        
    async def get_completions(
        self, 
        tool_name: str, 
        argument_name: str, 
        current_value: str, 
        **context
    ) -> Dict[str, Any]:
        """
        Get completion suggestions for a tool argument with standardized response format.
        
        This method serves as the main API endpoint for the MCP completion protocol.
        It provides argument suggestions by:
        1. Finding the appropriate provider for the requested tool
        2. Checking if that provider supports the requested argument
        3. Calling the provider's get_completions method if supported
        4. Formatting the results according to the MCP specification
        
        If no provider exists for the tool, or the provider doesn't support the
        argument, an empty result structure is returned rather than raising an error.
        Additionally, any exceptions in the provider's completion logic are caught
        and result in an empty response with error logging.
        
        Args:
            tool_name: Name of the tool requesting completion suggestions
            argument_name: Name of the argument within the tool to provide completions for
            current_value: Current value or partial input entered by the user
            **context: Additional context that may be useful for generating completions,
                such as values of other arguments, user preferences, or environment info
            
        Returns:
            Dictionary conforming to the MCP completion protocol with these keys:
            - values: List of suggested completion values (strings), limited to 100 items
            - hasMore: Boolean indicating if more than 100 suggestions were available
            - total: Total number of suggestions actually included in the 'values' list
            
        Example response:
            ```python
            {
                "values": ["option1", "option2", "option3"],
                "hasMore": False,
                "total": 3
            }
            ```
            
        Note:
            The response is always structured as a valid MCP completion response, even
            when errors occur or no suggestions are available. This ensures clients
            always receive a predictable format.
        """
        provider = self.get_provider(tool_name)
        
        if not provider or not provider.supports_argument(argument_name):
            return {
                "values": [],
                "hasMore": False,
                "total": 0
            }
            
        try:
            # Get suggestions from provider
            suggestions = await provider.get_completions(
                argument_name=argument_name,
                current_value=current_value,
                tool_name=tool_name,
                **context
            )
            
            # Limit to 100 items as per MCP spec
            has_more = len(suggestions) > 100
            suggestions = suggestions[:100]
            
            return {
                "values": suggestions,
                "hasMore": has_more,
                "total": len(suggestions)
            }
        except Exception as e:
            # Log error and return empty result
            print(f"Error getting completions: {str(e)}")
            return {
                "values": [],
                "hasMore": False,
                "total": 0
            }


# Example usage for file path completion
async def complete_file_paths(current_value: str, **context) -> List[str]:
    """
    Generate filesystem path completion suggestions based on the current input.
    
    This utility function provides intelligent path suggestions for file-related arguments,
    making it easier for users to navigate and select files or directories in the filesystem.
    It handles various path formats including relative paths, absolute paths, and user home
    directory references.
    
    Behavior:
    - For empty input: Returns common starting points ("./", "../", "/")
    - For partial paths: Performs glob matching to find all matching files and directories
    - For directories: Appends a trailing slash to distinguish them from files
    - Expands user directory references (e.g., "~/documents" becomes "/home/user/documents")
    
    Path matching is case-sensitive or case-insensitive depending on the underlying filesystem.
    On Windows, matching is typically case-insensitive, while on Unix-like systems it's case-sensitive.
    
    The function handles permission errors gracefully - if a directory cannot be accessed due to
    permission restrictions, it will be excluded from results without raising an exception.
    
    Args:
        current_value: The current path string provided by the user (can be empty, partial, or complete)
        **context: Additional context that may influence completions, for example:
            - working_directory: Optional alternative directory to use as the base for relative paths
            - file_extensions: Optional list of file extensions to filter results (e.g., [".py", ".txt"])
            - include_hidden: Optional boolean to include hidden files/directories (default: False)
            These keys are accepted for forward compatibility but are not yet applied
            by the current implementation.
        
    Returns:
        List of path suggestions that match or extend the current_value. Each suggestion is formatted as:
        - Regular files: The full path to the file (e.g., "./src/main.py")
        - Directories: The full path with a trailing slash (e.g., "./src/utils/")
        
    Examples:
        - Input: "" → Output: ["./", "../", "/"]
        - Input: "doc" → Output: ["documents/", "docker-compose.yml", "dockerfile"]
        - Input: "~/Down" → Output: ["/home/user/Downloads/"]
        - Input: "./src/" → Output: ["./src/main.py", "./src/utils/", "./src/tests/"]
        
    Edge Cases:
        - Symlinks: Followed to their target with the symlink path in the results
        - Special files: Included in results, treated as regular files
        - Non-existent paths: Returns partial matches based on the parent directory, if any
        - Permission errors: Silently skips directories that cannot be accessed
        
    Notes:
        - The CompletionRegistry caps results at 100 items to comply with the MCP specification
        - Directory suggestions always end with a trailing slash
        - Filesystem errors are handled gracefully; inaccessible paths are simply omitted
    """
    import glob
    import os
    
    # Handle empty value
    if not current_value:
        return ["./", "../", "/"]
        
    # Expand user directory if needed
    path = os.path.expanduser(current_value)
    
    # Get the directory to search in
    directory = os.path.dirname(path) if os.path.basename(path) else path
    if not directory:
        directory = "."
        
    # Get matching files/directories
    pattern = os.path.join(directory, f"{os.path.basename(path)}*")
    matches = glob.glob(pattern)
    
    # Format results
    results = []
    for match in matches:
        if os.path.isdir(match):
            results.append(f"{match}/")
        else:
            results.append(match)
            
    return results


# Example completions for common arguments
"""
Predefined completion provider for common argument types used across multiple tools.

This global provider instance contains standardized suggestion lists for frequently
used arguments in the MCP system. It serves as a convenient default provider that
can be registered with the CompletionRegistry for tools that don't need specialized
completion providers.

The included completion lists cover:
- provider: Common LLM provider names (e.g., "openai", "anthropic")
- model: Popular model identifiers from various providers
- format: Standard data formats for input/output
- source_type: Common data source types for analysis tools
- analysis_type: Standard categories of analysis operations

Usage example:
```python
# Set as the default provider in a registry to handle common arguments
registry = CompletionRegistry()
registry.set_default_provider(COMMON_COMPLETIONS)

# Later, even for tools without specific providers, common arguments will work:
await registry.get_completions(
    tool_name="any_tool",
    argument_name="provider",
    current_value="open"  # Will match "openai"
)
```

This provider can be extended with additional arguments by creating a new
StaticCompletionProvider that combines these defaults with tool-specific completions.
"""
COMMON_COMPLETIONS = StaticCompletionProvider({
    "provider": ["openai", "anthropic", "gemini", "mistral", "custom"],
    "model": [
        "gpt-4-turbo", "gpt-4o", "claude-3-5-sonnet", "claude-3-opus", 
        "gemini-1.5-pro", "gemini-1.5-flash", "mistral-large"
    ],
    "format": ["json", "text", "markdown", "html", "csv"],
    "source_type": ["csv", "json", "excel", "database", "api"],
    "analysis_type": ["general", "sentiment", "entities", "summary"]
}) 
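

# Illustrative only: a minimal, script-style sketch of wiring the common
# completions into a registry. The tool and argument names below are examples,
# not values required by the registry.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        registry = CompletionRegistry()
        registry.set_default_provider(COMMON_COMPLETIONS)
        result = await registry.get_completions(
            tool_name="any_tool",
            argument_name="provider",
            current_value="open",
        )
        print(result)  # MCP-shaped dict: {"values": [...], "hasMore": ..., "total": ...}
        print(await complete_file_paths("./"))

    asyncio.run(_demo())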
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/core/ums_api/ums_services.py:
--------------------------------------------------------------------------------

```python
"""Business logic and service functions for UMS API."""

import json
import math
import sqlite3
from collections import Counter, defaultdict, deque
from datetime import datetime
from pathlib import Path
from threading import Lock
from typing import Any, Dict, List, Optional

from .ums_models import (
    MemoryDetail,
    PreviewMemory,
    CriticalPathAction,
    FlameGraphNode,
)
from .ums_database import get_db_connection


# ---------- Utility Functions ----------

def format_file_size(size_bytes: int) -> str:
    """Format file size in human readable format"""
    if size_bytes == 0:
        return "0 B"

    size_names = ["B", "KB", "MB", "GB", "TB"]
    i = min(int(math.floor(math.log(size_bytes, 1024))), len(size_names) - 1)  # clamp to known units
    p = math.pow(1024, i)
    s = round(size_bytes / p, 2)
    return f"{s} {size_names[i]}"


def _dict_depth(d: Dict[str, Any], depth: int = 0) -> int:
    if not isinstance(d, dict) or not d:
        return depth
    return max(_dict_depth(v, depth + 1) for v in d.values())


def _count_values(d: Dict[str, Any]) -> int:
    cnt = 0
    for v in d.values():
        if isinstance(v, dict):
            cnt += _count_values(v)
        elif isinstance(v, list):
            cnt += len(v)
        else:
            cnt += 1
    return cnt


def calculate_state_complexity(state_data: Dict[str, Any]) -> float:
    if not state_data:
        return 0.0
    comp = (
        len(state_data) * 5 + _dict_depth(state_data) * 10 + _count_values(state_data) * 0.5
    )
    return round(min(100.0, comp), 2)
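
# Quick sanity check (illustrative values only):
#   calculate_state_complexity({"goal": "draft", "progress": {"step": 2}})
#   -> 2 keys * 5 + depth 2 * 10 + 2 leaf values * 0.5 = 31.0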


def compute_state_diff(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]:
    diff = {"added": {}, "removed": {}, "modified": {}, "magnitude": 0.0}
    keys = set(a) | set(b)
    changed = 0
    for k in keys:
        if k not in a:
            diff["added"][k] = b[k]
            changed += 1
        elif k not in b:
            diff["removed"][k] = a[k]
            changed += 1
        elif a[k] != b[k]:
            diff["modified"][k] = {"before": a[k], "after": b[k]}
            changed += 1
    if keys:
        diff["magnitude"] = (changed / len(keys)) * 100
    return diff
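
# Worked example (illustrative dicts only):
#   compute_state_diff({"x": 1, "y": 2}, {"y": 3, "z": 4})
#   -> added={"z": 4}, removed={"x": 1}, modified={"y": {"before": 2, "after": 3}}
#      magnitude = 3 changed keys / 3 total keys * 100 = 100.0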


# ---------- Action Monitor Helper Functions ----------

def get_action_status_indicator(status: str, execution_time: float) -> dict:
    """Get status indicator with color and icon for action status"""
    indicators = {
        "running": {"color": "blue", "icon": "play", "label": "Running"},
        "executing": {"color": "blue", "icon": "cpu", "label": "Executing"},
        "in_progress": {"color": "orange", "icon": "clock", "label": "In Progress"},
        "completed": {"color": "green", "icon": "check", "label": "Completed"},
        "failed": {"color": "red", "icon": "x", "label": "Failed"},
        "cancelled": {"color": "gray", "icon": "stop", "label": "Cancelled"},
        "timeout": {"color": "yellow", "icon": "timer-off", "label": "Timeout"},
    }

    indicator = indicators.get(
        status, {"color": "gray", "icon": "help", "label": "Unknown"}
    )

    # Add urgency flag for long-running actions
    if (
        status in ["running", "executing", "in_progress"] and execution_time > 120
    ):  # 2 minutes
        indicator["urgency"] = "high"
    elif (
        status in ["running", "executing", "in_progress"] and execution_time > 60
    ):  # 1 minute
        indicator["urgency"] = "medium"
    else:
        indicator["urgency"] = "low"

    return indicator
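
# Example (illustrative inputs): get_action_status_indicator("running", 150.0)
# returns {"color": "blue", "icon": "play", "label": "Running", "urgency": "high"}
# because the action has been running for more than 120 seconds.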


def categorize_action_performance(execution_time: float, estimated_duration: float) -> str:
    """Categorize action performance based on execution time vs estimate"""
    if estimated_duration <= 0:
        return "unknown"

    ratio = execution_time / estimated_duration

    if ratio <= 0.5:
        return "excellent"
    elif ratio <= 0.8:
        return "good"
    elif ratio <= 1.2:
        return "acceptable"
    elif ratio <= 2.0:
        return "slow"
    else:
        return "very_slow"


def get_action_resource_usage(action_id: str) -> dict:
    """Get resource usage for an action (placeholder implementation)"""
    # This is a placeholder - in a real implementation, you'd fetch actual metrics
    return {"cpu_usage": 0.0, "memory_usage": 0.0, "network_io": 0.0, "disk_io": 0.0}


def estimate_wait_time(position: int, queue: list) -> float:
    """Estimate wait time based on queue position and historical data"""
    if position == 0:
        return 0.0
    # Average action time of 30 seconds (this could be calculated from historical data)
    avg_action_time = 30.0
    return position * avg_action_time


def get_priority_label(priority: int) -> str:
    """Get human-readable priority label"""
    if priority <= 1:
        return "Critical"
    elif priority <= 3:
        return "High"
    elif priority <= 5:
        return "Normal"
    elif priority <= 7:
        return "Low"
    else:
        return "Very Low"


def calculate_action_performance_score(action: dict) -> float:
    """Calculate performance score for a completed action"""
    if action["status"] != "completed":
        return 0.0

    execution_time = action.get("execution_duration", 0)
    if execution_time <= 0:
        return 100.0

    if execution_time <= 5:
        return 100.0
    elif execution_time <= 15:
        return 90.0
    elif execution_time <= 30:
        return 80.0
    elif execution_time <= 60:
        return 70.0
    elif execution_time <= 120:
        return 60.0
    else:
        # Clamp to [50, 60] so very long actions never outscore the 120-second tier
        return min(60.0, max(50.0, 100.0 - (execution_time / 10)))


def calculate_efficiency_rating(execution_time: float, result_size: int) -> str:
    """Calculate efficiency rating based on time and output"""
    if execution_time <= 0:
        return "unknown"

    efficiency_score = result_size / execution_time if execution_time > 0 else 0

    if efficiency_score >= 100:
        return "excellent"
    elif efficiency_score >= 50:
        return "good"
    elif efficiency_score >= 20:
        return "fair"
    else:
        return "poor"


def calculate_performance_summary(actions: list) -> dict:
    """Calculate performance summary from action history"""
    if not actions:
        return {
            "avg_score": 0.0,
            "top_performer": None,
            "worst_performer": None,
            "efficiency_distribution": {},
        }

    scores = [a.get("performance_score", 0) for a in actions]
    avg_score = sum(scores) / len(scores)

    best_action = max(actions, key=lambda a: a.get("performance_score", 0))
    worst_action = min(actions, key=lambda a: a.get("performance_score", 0))

    efficiency_counts = Counter(a.get("efficiency_rating", "unknown") for a in actions)

    return {
        "avg_score": round(avg_score, 2),
        "top_performer": {
            "tool_name": best_action.get("tool_name", ""),
            "score": best_action.get("performance_score", 0),
        },
        "worst_performer": {
            "tool_name": worst_action.get("tool_name", ""),
            "score": worst_action.get("performance_score", 0),
        },
        "efficiency_distribution": dict(efficiency_counts),
    }


def generate_performance_insights(
    overall_stats: dict, tool_stats: list, hourly_metrics: list
) -> list:
    """Generate actionable performance insights"""
    insights = []

    total_actions = overall_stats.get("total_actions", 0) or 1  # guard against division by zero
    success_rate = (overall_stats.get("successful_actions", 0) / total_actions) * 100
    if success_rate < 80:
        insights.append(
            {
                "type": "warning",
                "title": "Low Success Rate",
                "message": f"Current success rate is {success_rate:.1f}%. Consider investigating failing tools.",
                "severity": "high",
            }
        )

    if tool_stats:
        slowest_tool = max(tool_stats, key=lambda t: t.get("avg_duration", 0))
        if slowest_tool.get("avg_duration", 0) > 60:
            insights.append(
                {
                    "type": "info",
                    "title": "Performance Optimization",
                    "message": f"{slowest_tool['tool_name']} is taking {slowest_tool['avg_duration']:.1f}s on average. Consider optimization.",
                    "severity": "medium",
                }
            )

    if hourly_metrics:
        peak_hour = max(hourly_metrics, key=lambda h: h.get("action_count", 0))
        insights.append(
            {
                "type": "info",
                "title": "Peak Usage",
                "message": f"Peak usage occurs at {peak_hour['hour']}:00 with {peak_hour['action_count']} actions.",
                "severity": "low",
            }
        )

    return insights


# ---------- Memory Quality Functions ----------

def find_cognitive_patterns(
    states: List[Dict[str, Any]], min_length: int, similarity_threshold: float
) -> List[Dict[str, Any]]:
    """Find recurring patterns in cognitive states"""
    patterns = []
    type_sequences = defaultdict(list)
    for state in states:
        type_sequences[state["state_type"]].append(state)
    for state_type, sequence in type_sequences.items():
        if len(sequence) >= min_length * 2:
            for length in range(min_length, len(sequence) // 2 + 1):
                for start in range(len(sequence) - length * 2 + 1):
                    subseq1 = sequence[start : start + length]
                    subseq2 = sequence[start + length : start + length * 2]
                    similarity = calculate_sequence_similarity(subseq1, subseq2)
                    if similarity >= similarity_threshold:
                        patterns.append(
                            {
                                "type": f"repeating_{state_type}",
                                "length": length,
                                "similarity": similarity,
                                "occurrences": 2,
                                "first_occurrence": subseq1[0]["timestamp"],
                                "pattern_description": f"Repeating {state_type} sequence of {length} states",
                            }
                        )
    return sorted(patterns, key=lambda p: p["similarity"], reverse=True)


def calculate_sequence_similarity(
    seq1: List[Dict[str, Any]], seq2: List[Dict[str, Any]]
) -> float:
    """Calculate similarity between two state sequences"""
    if len(seq1) != len(seq2):
        return 0.0
    total_similarity = 0.0
    for s1, s2 in zip(seq1, seq2, strict=False):
        state_sim = calculate_single_state_similarity(s1, s2)
        total_similarity += state_sim
    return total_similarity / len(seq1)


def calculate_single_state_similarity(
    state1: Dict[str, Any], state2: Dict[str, Any]
) -> float:
    """Calculate similarity between two individual states"""
    data1 = state1.get("state_data", {})
    data2 = state2.get("state_data", {})
    if not data1 and not data2:
        return 1.0
    if not data1 or not data2:
        return 0.0
    keys1 = set(data1.keys())
    keys2 = set(data2.keys())
    key_similarity = len(keys1 & keys2) / len(keys1 | keys2) if keys1 | keys2 else 1.0
    common_keys = keys1 & keys2
    value_similarity = 0.0
    if common_keys:
        matching_values = sum(1 for key in common_keys if data1[key] == data2[key])
        value_similarity = matching_values / len(common_keys)
    return (key_similarity + value_similarity) / 2
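
# Worked example (illustrative states): for state_data {"a": 1, "b": 2} vs {"b": 2, "c": 3}
#   key_similarity   = |{b}| / |{a, b, c}| = 1/3
#   value_similarity = 1/1 matching common values = 1.0
#   result           = (1/3 + 1.0) / 2 ≈ 0.67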


def analyze_state_transitions(states: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Analyze transitions between cognitive states"""
    transitions = defaultdict(int)
    for i in range(len(states) - 1):
        current_type = states[i]["state_type"]
        next_type = states[i + 1]["state_type"]
        transition = f"{current_type} → {next_type}"
        transitions[transition] += 1
    sorted_transitions = sorted(transitions.items(), key=lambda x: x[1], reverse=True)
    return [
        {
            "transition": transition,
            "count": count,
            "percentage": (count / (len(states) - 1)) * 100 if len(states) > 1 else 0,
        }
        for transition, count in sorted_transitions
    ]


def detect_cognitive_anomalies(states: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Detect anomalous cognitive states"""
    anomalies = []
    if len(states) < 3:
        return anomalies
    complexities = [calculate_state_complexity(s.get("state_data", {})) for s in states]
    avg_complexity = sum(complexities) / len(complexities)
    std_complexity = (
        sum((c - avg_complexity) ** 2 for c in complexities) / len(complexities)
    ) ** 0.5
    for i, state in enumerate(states):
        complexity = complexities[i]
        z_score = (
            (complexity - avg_complexity) / std_complexity if std_complexity > 0 else 0
        )
        if abs(z_score) > 2:
            anomalies.append(
                {
                    "state_id": state["state_id"],
                    "timestamp": state["timestamp"],
                    "anomaly_type": "complexity_outlier",
                    "z_score": z_score,
                    "description": f"Unusual complexity: {complexity:.1f} (avg: {avg_complexity:.1f})",
                    "severity": "high" if abs(z_score) > 3 else "medium",
                }
            )
    return anomalies


# ---------- Working Memory System ----------

class WorkingMemorySystem:
    """
    Working memory system for managing active memories with focus capabilities.
    
    This system maintains a pool of recent memories with relevance scoring
    and focus mode for filtering based on keywords or patterns.
    """
    
    def __init__(self, capacity: int = 100, focus_threshold: float = 0.7):
        self.capacity = capacity
        self.focus_threshold = focus_threshold
        self.memory_pool = deque(maxlen=capacity)
        self.focus_mode_enabled = False
        self.focus_keywords = []
        self.memory_index = {}  # memory_id -> memory mapping
        self.category_index = defaultdict(list)  # category -> [memory_ids]
        self.access_counts = defaultdict(int)  # memory_id -> access count
        self.relevance_scores = {}  # memory_id -> relevance score
        self.initialized_at = datetime.now()
        self.last_optimization = datetime.now()
        self.optimization_count = 0
        
    def add_memory(self, memory_id: str, content: str, category: str, importance: float = 5.0):
        """Add a memory to the working pool"""
        memory = {
            'memory_id': memory_id,
            'content': content,
            'category': category,
            'importance': importance,
            'added_at': datetime.now().timestamp(),
            'last_accessed': datetime.now().timestamp()
        }
        
        # Remove old memory if it already exists
        if memory_id in self.memory_index:
            self.remove_memory(memory_id)
        
        # Evict the oldest entry explicitly when at capacity so the indexes stay
        # consistent (the deque's maxlen would otherwise drop it without cleanup)
        if len(self.memory_pool) >= self.capacity:
            oldest = self.memory_pool[0]
            self.remove_memory(oldest['memory_id'])
        
        # Add to pool
        self.memory_pool.append(memory)
        self.memory_index[memory_id] = memory
        self.category_index[category].append(memory_id)
        
        # Calculate initial relevance
        self._calculate_relevance(memory)
        
    def remove_memory(self, memory_id: str):
        """Remove a memory from the working pool"""
        if memory_id in self.memory_index:
            memory = self.memory_index[memory_id]
            self.memory_pool.remove(memory)
            del self.memory_index[memory_id]
            self.category_index[memory['category']].remove(memory_id)
            if memory_id in self.relevance_scores:
                del self.relevance_scores[memory_id]
            if memory_id in self.access_counts:
                del self.access_counts[memory_id]
    
    def access_memory(self, memory_id: str):
        """Record memory access and update relevance"""
        if memory_id in self.memory_index:
            self.access_counts[memory_id] += 1
            self.memory_index[memory_id]['last_accessed'] = datetime.now().timestamp()
            self._calculate_relevance(self.memory_index[memory_id])
    
    def set_focus_mode(self, enabled: bool, keywords: Optional[List[str]] = None):
        """Enable or disable focus mode with optional keywords"""
        self.focus_mode_enabled = enabled
        self.focus_keywords = keywords or []
        
        # Recalculate relevance for all memories
        for memory in self.memory_pool:
            self._calculate_relevance(memory)
    
    def _calculate_relevance(self, memory: dict):
        """Calculate relevance score for a memory"""
        base_score = memory['importance'] / 10.0  # Normalize to 0-1
        
        # Recency factor
        age_hours = (datetime.now().timestamp() - memory['added_at']) / 3600
        recency_factor = max(0.1, 1.0 - (age_hours / 24))  # Decay over 24 hours
        
        # Access frequency factor
        access_factor = min(1.0, self.access_counts[memory['memory_id']] / 10.0)
        
        # Focus mode factor
        focus_factor = 1.0
        if self.focus_mode_enabled and self.focus_keywords:
            content_lower = memory['content'].lower()
            keyword_matches = sum(1 for kw in self.focus_keywords if kw.lower() in content_lower)
            focus_factor = min(2.0, 1.0 + (keyword_matches * 0.5))
        
        # Calculate final score
        relevance = base_score * recency_factor * (0.5 + 0.5 * access_factor) * focus_factor
        self.relevance_scores[memory['memory_id']] = min(1.0, relevance)
    
    def get_active_memories(self, limit: Optional[int] = None) -> List[dict]:
        """Get active memories sorted by relevance"""
        memories = list(self.memory_pool)
        
        # Filter by focus threshold if in focus mode
        if self.focus_mode_enabled:
            memories = [m for m in memories if self.relevance_scores.get(m['memory_id'], 0) >= self.focus_threshold]
        
        # Sort by relevance
        memories.sort(key=lambda m: self.relevance_scores.get(m['memory_id'], 0), reverse=True)
        
        if limit:
            memories = memories[:limit]
        
        return memories
    
    def get_statistics(self) -> dict:
        """Get working memory statistics"""
        active_memories = self.get_active_memories()
        
        # Category distribution
        category_dist = {}
        for category, memory_ids in self.category_index.items():
            category_dist[category] = len(memory_ids)
        
        # Calculate average relevance
        relevance_values = list(self.relevance_scores.values())
        avg_relevance = sum(relevance_values) / len(relevance_values) if relevance_values else 0
        
        return {
            'total_memories': len(self.memory_pool),
            'active_memories': len(active_memories),
            'capacity_used': len(self.memory_pool) / self.capacity * 100,
            'avg_relevance_score': avg_relevance,
            'category_distribution': category_dist,
            'total_accesses': sum(self.access_counts.values()),
            'optimization_suggestions': self._get_optimization_suggestions()
        }
    
    def _get_optimization_suggestions(self) -> int:
        """Count optimization suggestions"""
        suggestions = 0
        
        # Check for low relevance memories
        low_relevance = sum(1 for score in self.relevance_scores.values() if score < 0.3)
        if low_relevance > self.capacity * 0.2:  # More than 20% low relevance
            suggestions += 1
        
        # Check for stale memories
        now = datetime.now().timestamp()
        stale_memories = sum(1 for m in self.memory_pool if (now - m['last_accessed']) > 3600)  # 1 hour
        if stale_memories > self.capacity * 0.3:  # More than 30% stale
            suggestions += 1
        
        # Check for unbalanced categories
        if self.category_index:
            sizes = [len(ids) for ids in self.category_index.values()]
            if max(sizes) > sum(sizes) * 0.5:  # One category has more than 50%
                suggestions += 1
        
        return suggestions
    
    def optimize(self):
        """Optimize working memory by removing low-relevance memories"""
        # Remove memories below threshold
        to_remove = [
            m['memory_id'] for m in self.memory_pool 
            if self.relevance_scores.get(m['memory_id'], 0) < 0.2
        ]
        
        for memory_id in to_remove:
            self.remove_memory(memory_id)
        
        self.last_optimization = datetime.now()
        self.optimization_count += 1
        
        return len(to_remove)


# Global working memory instance
_working_memory_system = None
_working_memory_lock = Lock()


def get_working_memory_system() -> WorkingMemorySystem:
    """Get or create the global working memory system instance"""
    global _working_memory_system
    
    with _working_memory_lock:
        if _working_memory_system is None:
            _working_memory_system = WorkingMemorySystem()
        return _working_memory_system
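

# Illustrative only: a small sketch (not invoked anywhere) of how the working
# memory system is typically exercised; the memory ids and contents below are
# hypothetical.
def _working_memory_usage_sketch() -> dict:
    wm = get_working_memory_system()
    wm.add_memory("mem-1", "User asked about quarterly revenue", "conversation", importance=7.0)
    wm.add_memory("mem-2", "Revenue report parsed from CSV", "analysis", importance=6.0)
    wm.access_memory("mem-1")             # bumps access count and recalculates relevance
    wm.set_focus_mode(True, ["revenue"])  # boost memories mentioning "revenue"
    top = wm.get_active_memories(limit=5)  # relevance-sorted, focus-filtered
    return {"top_ids": [m["memory_id"] for m in top], "stats": wm.get_statistics()}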


# ---------- Timeline Analysis Functions ----------

def generate_timeline_segments(
    timeline_data: List[Dict[str, Any]], granularity: str, hours: int
) -> List[Dict[str, Any]]:
    """Generate timeline segments summarising state counts / complexity over time."""
    if not timeline_data:
        return []

    start_ts = min(item["timestamp"] for item in timeline_data)
    end_ts = max(item["timestamp"] for item in timeline_data)

    seg_seconds = 1 if granularity == "second" else 60 if granularity == "minute" else 3600
    segments: List[Dict[str, Any]] = []
    current = start_ts

    while current < end_ts:
        seg_end = current + seg_seconds
        seg_states = [it for it in timeline_data if current <= it["timestamp"] < seg_end]
        if seg_states:
            segments.append(
                {
                    "start_time": current,
                    "end_time": seg_end,
                    "state_count": len(seg_states),
                    "avg_complexity": sum(s["complexity_score"] for s in seg_states)
                    / len(seg_states),
                    "max_change_magnitude": max(s["change_magnitude"] for s in seg_states),
                    "dominant_type": Counter(
                        s["state_type"] for s in seg_states
                    ).most_common(1)[0][0],
                }
            )
        current = seg_end
    return segments


def calculate_timeline_stats(timeline_data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Return aggregate stats about timeline complexity / changes."""
    if not timeline_data:
        return {}

    complexities = [it["complexity_score"] for it in timeline_data]
    changes = [it["change_magnitude"] for it in timeline_data if it["change_magnitude"] > 0]
    stypes = Counter(it["state_type"] for it in timeline_data)
    return {
        "avg_complexity": sum(complexities) / len(complexities),
        "max_complexity": max(complexities),
        "avg_change_magnitude": (sum(changes) / len(changes)) if changes else 0,
        "max_change_magnitude": max(changes) if changes else 0,
        "most_common_type": stypes.most_common(1)[0][0] if stypes else None,
        "type_distribution": dict(stypes),
    }


# ---------- Flame Graph Functions ----------

def build_flame_graph_structure(actions: List[Dict], workflow_id: str) -> Dict:
    """Build hierarchical flame graph structure from actions"""
    total_duration = sum(action.get('duration', 0) for action in actions if action.get('duration'))
    
    flame_graph_data = {
        'name': f'Workflow {workflow_id}',
        'value': total_duration,
        'children': []
    }
    
    # Group actions by tool for flame graph hierarchy
    tool_groups = {}
    for action in actions:
        tool_name = action.get('tool_name', 'unknown')
        if tool_name not in tool_groups:
            tool_groups[tool_name] = []
        tool_groups[tool_name].append(action)
    
    # Build hierarchical structure
    for tool_name, tool_actions in tool_groups.items():
        tool_duration = sum(action.get('duration', 0) for action in tool_actions if action.get('duration'))
        
        tool_node = {
            'name': tool_name,
            'value': tool_duration,
            'children': []
        }
        
        # Add individual actions as children
        for action in tool_actions:
            if action.get('duration'):
                action_node = {
                    'name': f"Action {action['action_id']}",
                    'value': action['duration'],
                    'action_id': action['action_id'],
                    'status': action.get('status'),
                    'reasoning': action.get('reasoning', ''),
                    'started_at': action.get('started_at'),
                    'completed_at': action.get('completed_at')
                }
                tool_node['children'].append(action_node)
        
        flame_graph_data['children'].append(tool_node)
    
    return flame_graph_data
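
# Shape of the result (hypothetical workflow "wf-1" with two actions of one tool):
# {
#     "name": "Workflow wf-1", "value": 12.5, "children": [
#         {"name": "search_web", "value": 12.5, "children": [
#             {"name": "Action a1", "value": 7.5, ...},
#             {"name": "Action a2", "value": 5.0, ...},
#         ]},
#     ],
# }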


def calculate_critical_path(actions: List[Dict]) -> List[Dict]:
    """Calculate the critical path through the workflow"""
    if not actions:
        return []
    
    # Sort actions by start time
    sorted_actions = sorted(actions, key=lambda x: x.get('started_at', 0))
    
    critical_path = []
    current_time = min(action['started_at'] for action in sorted_actions if action.get('started_at'))
    workflow_end = max(action['completed_at'] for action in sorted_actions if action.get('completed_at'))
    
    while current_time < workflow_end:
        # Find action that was running at current_time and ends latest
        running_actions = [
            a for a in sorted_actions 
            if a.get('started_at', 0) <= current_time and a.get('completed_at', 0) > current_time
        ]
        
        if running_actions:
            # Find the action that ends latest (most critical)
            critical_action = max(running_actions, key=lambda x: x.get('completed_at', 0))
            if critical_action['action_id'] not in [cp['action_id'] for cp in critical_path]:
                critical_path.append({
                    'action_id': critical_action['action_id'],
                    'tool_name': critical_action.get('tool_name'),
                    'duration': critical_action.get('duration', 0),
                    'start_time': critical_action.get('started_at'),
                    'end_time': critical_action.get('completed_at')
                })
            current_time = critical_action.get('completed_at', current_time + 1)
        else:
            # No action running, find next action start
            future_actions = [a for a in sorted_actions if a.get('started_at', 0) > current_time]
            if future_actions:
                current_time = min(a['started_at'] for a in future_actions)
            else:
                break
    
    return critical_path
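
# Walk-through (hypothetical timestamps): with action A running 0-10s and action B
# running 5-20s, the scan picks A at t=0 (the latest-ending action running then),
# jumps to t=10, picks B, and stops at the workflow end, yielding the path [A, B].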


def convert_to_model(node: Dict) -> FlameGraphNode:
    """Convert flame graph dictionary to Pydantic model"""
    return FlameGraphNode(
        name=node['name'],
        value=node['value'],
        children=[convert_to_model(child) for child in node.get('children', [])],
        action_id=node.get('action_id'),
        status=node.get('status'),
        reasoning=node.get('reasoning'),
        started_at=node.get('started_at'),
        completed_at=node.get('completed_at')
    )


# ---------- Performance Recommendation Functions ----------

def calculate_tool_reliability_score(tool_stats: dict) -> float:
    """Calculate reliability score for a tool"""
    total_calls = tool_stats.get('total_calls', 0)
    successful_calls = tool_stats.get('successful_calls', 0)
    
    if total_calls == 0:
        return 0.0
    
    success_rate = successful_calls / total_calls
    volume_factor = min(1.0, total_calls / 100)  # Normalize by 100 calls
    
    return round(success_rate * volume_factor * 100, 2)
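
# Worked example (illustrative stats): {"total_calls": 50, "successful_calls": 45}
#   success_rate = 0.9, volume_factor = min(1.0, 50 / 100) = 0.5
#   score = 0.9 * 0.5 * 100 = 45.0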


def categorize_tool_performance(avg_execution_time: float) -> str:
    """Categorize tool performance based on average execution time"""
    if avg_execution_time is None:
        return 'unknown'
    
    if avg_execution_time <= 5:
        return 'fast'
    elif avg_execution_time <= 15:
        return 'normal'
    elif avg_execution_time <= 30:
        return 'slow'
    else:
        return 'very_slow' 
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/services/prompts.py:
--------------------------------------------------------------------------------

```python
"""Prompt template service for managing and rendering prompt templates."""
import asyncio
import json
import os
import threading
from pathlib import Path
from typing import Any, Dict, Optional

from ultimate_mcp_server.config import get_config
from ultimate_mcp_server.exceptions import PromptTemplateError
from ultimate_mcp_server.utils.logging import get_logger

logger = get_logger(__name__)

# Singleton instance
_prompt_service = None


def get_prompt_service():
    """Get the global prompt service instance."""
    global _prompt_service
    if _prompt_service is None:
        _prompt_service = PromptService()
    return _prompt_service


class PromptService:
    """
    Service for managing, storing, rendering, and versioning prompt templates.
    
    The PromptService provides a centralized system for handling prompt templates
    used throughout the MCP application. It manages the entire lifecycle of prompt
    templates, from loading them from disk to rendering them with variables for use
    with language models. The service provides both persistent storage and runtime
    management of templates.
    
    Key Features:
    - File-based template storage using both .txt and .json formats
    - Runtime template registration and modification
    - Variable substitution for dynamic prompt generation
    - Categorization of templates for organizational purposes
    - Asynchronous persistence to avoid blocking operations
    - Error handling and logging for template issues
    
    Template Organization:
    Templates are organized using a naming convention where the prefix before the
    first underscore represents the category (e.g., "rag_query" belongs to the "rag"
    category). This categorization is used when saving templates to disk, with each
    category stored in its own JSON file.
    
    File Formats:
    - Individual .txt files: One template per file, filename is the template name
    - JSON files: Multiple templates in a single file, typically grouped by category
    
    This service employs a singleton pattern, ensuring only one instance exists
    across the application. Always use the get_prompt_service() or get_prompt_manager()
    functions to access it, rather than instantiating directly.
    
    Usage Example:
    ```python
    # Get the service
    prompt_service = get_prompt_service()
    
    # Retrieve a template
    template = prompt_service.get_template("rag_query")
    
    # Register a new template
    prompt_service.register_template(
        "greeting",
        "Hello {name}, welcome to {service_name}!"
    )
    
    # Render a template with variables
    greeting = prompt_service.render_template(
        "greeting",
        {"name": "Alice", "service_name": "Ultimate MCP"}
    )
    ```
    
    Note:
        All file operations are handled with proper error handling to ensure
        the service continues functioning even if individual template files
        are corrupted or missing.
    """
    
    def __init__(self):
        """Initialize the prompt service.
        
        Args:
            templates_dir: Directory containing template files
        """
        self.templates: Dict[str, str] = {}
        self.templates_dir: Optional[str] = None
        try:
            config = get_config()
            self.templates_dir = config.prompt_templates_directory
            logger.info(f"Initializing PromptService. Looking for templates in: {self.templates_dir}")
            self._load_templates()
        except Exception as e:
            logger.error(f"Failed to initialize PromptService: {e}", exc_info=True)
            # Allow the service to exist even if loading fails; get_template will simply return None
        
        if self.templates_dir:
            # Create templates directory if it doesn't exist
            os.makedirs(self.templates_dir, exist_ok=True)
            
            # Read templates from JSON collection files
            self._read_templates()
        logger.info(f"Prompt service initialized with {len(self.templates)} templates")
    
    def _load_templates(self):
        """
        Load prompt templates from individual .txt files in the templates directory.
        
        This method scans the configured templates directory for .txt files and loads
        each file as a separate template. It uses the filename (without extension)
        as the template name and the file content as the template text. This provides
        a simple way to manage templates as individual files, which can be useful for
        version control and template organization.
        
        The loading process:
        1. Verifies the templates directory exists and is accessible
        2. Scans for all .txt files using glob pattern matching
        3. For each file:
           - Extracts the template name from the filename
           - Reads the file content as the template text
           - Adds the template to the in-memory template dictionary
           - Logs the successful load
        4. Handles exceptions for each file individually to prevent a single corrupted
           file from blocking all template loading
        5. Logs summary information about the loading process
        
        This approach allows templates to be:
        - Managed individually in separate files
        - Edited directly using text editors
        - Organized in a flat structure for simplicity
        - Added or removed without changing code
        
        The method is called during service initialization but can be called again
        to refresh templates from disk if needed.
        
        Note:
            This method only processes .txt files. JSON format templates are handled
            by the separate _read_templates method. Both methods work together to 
            provide a complete template loading solution.
        """
        if not Path(self.templates_dir).is_dir():
            logger.warning(f"Prompt templates directory not found or not a directory: {self.templates_dir}")
            return

        loaded_count = 0
        for filepath in Path(self.templates_dir).glob('*.txt'):
            try:
                template_name = filepath.stem # Use filename without extension as name
                with open(filepath, 'r', encoding='utf-8') as f:
                    content = f.read()
                self.templates[template_name] = content
                logger.debug(f"Loaded prompt template: {template_name}")
                loaded_count += 1
            except Exception as e:
                logger.error(f"Failed to load prompt template {filepath.name}: {e}")

        if loaded_count > 0:
            logger.info(f"Successfully loaded {loaded_count} prompt templates.")
        else:
            logger.info("No prompt templates found or loaded.")
    
    def _read_templates(self) -> None:
        """
        Load prompt templates from JSON files in the templates directory.
        
        This method complements _load_templates by handling template collections
        stored in JSON format. It scans for .json files in the templates directory
        and processes each file to extract multiple templates. Each JSON file can
        contain multiple templates, organized as a dictionary with template names
        as keys and template content as values.
        
        The loading process:
        1. Scans the templates directory for all .json files
        2. For each JSON file:
           - Parses the JSON content
           - Extracts each key-value pair as a template name and content
           - Handles both simple string templates and structured template objects
           - Adds valid templates to the in-memory template collection
        3. Logs detailed information about successful and failed loads
        
        Template JSON Format Support:
        - Simple format: `{"template_name": "Template content with {variables}"}` 
        - Structured format: `{"template_name": {"text": "Template content", ...}}`
          where the template text is extracted from the "text" field
        
        The JSON format is particularly useful for:
        - Storing multiple related templates in a single file
        - Organizing templates by category or function
        - Including metadata or configuration alongside templates
        - Efficiently managing large collections of templates
        
        Error Handling:
        - Each JSON file is processed independently, so errors in one file won't
          prevent loading from other files
        - Invalid template formats trigger warnings but don't halt processing
        - JSON parse errors are logged with file information for easier debugging
        
        Note:
            This method works in conjunction with _load_templates to provide a
            comprehensive template loading system supporting both individual
            .txt files and collections in .json files.
        """
        try:
            template_files = list(Path(self.templates_dir).glob("*.json"))
            logger.info(f"Found {len(template_files)} template files")
            
            for template_file in template_files:
                try:
                    with open(template_file, "r", encoding="utf-8") as f:
                        templates_data = json.load(f)
                    
                    # Add templates from file
                    for template_name, template_content in templates_data.items():
                        if isinstance(template_content, str):
                            self.templates[template_name] = template_content
                        elif isinstance(template_content, dict) and "text" in template_content:
                            self.templates[template_name] = template_content["text"]
                        else:
                            logger.warning(f"Invalid template format for {template_name}")
                            
                    logger.info(f"Loaded templates from {template_file.name}")
                except Exception as e:
                    logger.error(f"Error loading template file {template_file.name}: {str(e)}")
        except Exception as e:
            logger.error(f"Error reading templates: {str(e)}")
    
    def _save_templates(self) -> None:
        """
        Persist all in-memory templates to disk in categorized JSON files.
        
        This method implements the template persistence strategy, organizing templates
        by category and saving them to appropriate JSON files on disk. It ensures that
        any runtime changes to templates (additions, modifications, or deletions) are
        preserved across application restarts.
        
        The saving process:
        1. Groups templates into categories based on naming conventions
           - Extracts the category from the template name (prefix before first underscore)
           - Templates without underscores go to the "general" category
        2. For each category:
           - Creates or updates a JSON file named "{category}_templates.json"
           - Writes all templates in that category as a formatted JSON object
           - Uses proper indentation for human readability
        3. Logs detailed information about the save operation
        
        Template Categorization:
        The method uses a convention-based approach to categorize templates:
        - "rag_query" → Category: "rag", saved to "rag_templates.json"
        - "chat_system" → Category: "chat", saved to "chat_templates.json"
        - "greeting" → Category: "general", saved to "general_templates.json"
        
        This categorization approach:
        - Keeps related templates together for easier management
        - Avoids a single monolithic file for all templates
        - Makes it easier to locate templates by purpose
        - Reduces the chance of merge conflicts in version control
        
        Error Handling:
        - The entire save operation is wrapped in exception handling to prevent
          crashes due to file system issues
        - Detailed error information is logged for debugging
        - Even if saving fails, the in-memory templates remain intact
        
        Note:
            This method is called both directly and asynchronously through
            _async_save_templates to provide both immediate and non-blocking
            persistence options.
        """
        try:
            # Group templates by category
            categorized_templates: Dict[str, Dict[str, Any]] = {}
            
            for template_name, template_text in self.templates.items():
                # Extract category from template name (before first _)
                parts = template_name.split("_", 1)
                category = parts[0] if len(parts) > 1 else "general"
                
                if category not in categorized_templates:
                    categorized_templates[category] = {}
                
                categorized_templates[category][template_name] = template_text
            
            # Save each category to its own file
            for category, templates in categorized_templates.items():
                file_path = Path(self.templates_dir) / f"{category}_templates.json"
                
                with open(file_path, "w", encoding="utf-8") as f:
                    json.dump(templates, f, indent=2)
                
                logger.info(f"Saved {len(templates)} templates to {file_path.name}")
                
        except Exception as e:
            logger.error(f"Error saving templates: {str(e)}")
    
    def get_template(self, template_name: str) -> Optional[str]:
        """
        Retrieve a specific prompt template by its name.
        
        This method provides access to individual templates stored in the service.
        It performs a simple dictionary lookup, returning the template text if found
        or None if the requested template doesn't exist in the collection.
        
        The lookup is exact and case-sensitive, with no fuzzy matching or fallback
        behavior. This design ensures predictable template resolution, which is
        important for maintaining consistent prompt behavior in production systems.
        
        Args:
            template_name: The exact name of the template to retrieve
            
        Returns:
            The template text as a string if found, None if not found
            
        Usage Example:
        ```python
        template = prompt_service.get_template("rag_query")
        if template:
            # Template found, use it
            prompt = template.format(query="What is machine learning?")
        else:
            # Template not found, handle the error
            logger.error(f"Template 'rag_query' not found")
            prompt = "Default fallback prompt: {query}"
        ```
        
        Note:
            Consider checking the return value for None before using the template,
            or use a default template as a fallback to handle missing templates
            gracefully.
        """
        return self.templates.get(template_name)
    
    def get_all_templates(self) -> Dict[str, str]:
        """
        Retrieve a copy of all available prompt templates.
        
        This method returns a dictionary containing all templates currently loaded
        in the service, with template names as keys and template texts as values.
        The returned dictionary is a shallow copy of the internal templates collection,
        ensuring that modifications to the returned dictionary won't affect the
        service's template storage.
        
        Use cases for this method include:
        - Listing available templates in an admin interface
        - Analyzing or processing multiple templates at once
        - Creating a template catalog or documentation
        - Debugging template availability issues
        
        Returns:
            A dictionary mapping template names to their content
            
        Usage Example:
        ```python
        all_templates = prompt_service.get_all_templates()
        
        # Display available templates
        print(f"Available templates ({len(all_templates)}): ")
        for name in sorted(all_templates.keys()):
            print(f" - {name}")
            
        # Find templates by pattern
        rag_templates = {
            name: content 
            for name, content in all_templates.items()
            if name.startswith("rag_")
        }
        ```
        
        Note:
            While the dictionary is a copy, the template strings themselves
            are not deep-copied. This is generally not an issue since strings
            are immutable in Python.
        """
        return self.templates.copy()
    
    def register_template(self, template_name: str, template_text: str) -> bool:
        """
        Register a new template or update an existing one in the template collection.
        
        This method adds a new template to the in-memory template collection or updates
        an existing template if the name already exists. After adding or updating the
        template, it initiates an asynchronous save operation to persist the changes
        to disk, ensuring durability without blocking the calling code.
        
        The template registration process:
        1. Adds or updates the template in the in-memory dictionary
        2. Schedules an asynchronous task to save all templates to disk
        3. Returns a success indicator
        
        This method is the primary way to programmatically add or modify templates
        at runtime, enabling dynamic template management without requiring file
        system access or application restarts.
        
        Template Naming Conventions:
        While not enforced, it's recommended to follow these naming conventions:
        - Use lowercase names with underscores for readability
        - Prefix with category name for organizational purposes (e.g., "rag_query")
        - Use descriptive names that indicate the template's purpose
        
        Args:
            template_name: Name for the template (used for later retrieval)
            template_text: Content of the template with variable placeholders
            
        Returns:
            True if the template was successfully registered, False if an error occurred
            
        Usage Example:
        ```python
        # Register a simple greeting template
        success = prompt_service.register_template(
            "greeting_formal",
            "Dear {title} {last_name},\n\nI hope this message finds you well."
        )
        
        # Register a more complex template with formatting options
        success = prompt_service.register_template(
            "invoice_summary",
            "Invoice #{invoice_id}\nDate: {date}\nTotal: ${amount:.2f}\n\n{items}"
        )
        ```
        
        Note:
            This method handles the persistence automatically through an asynchronous
            save operation. The changes are immediately available in memory but may
            take a moment to be written to disk.
        """
        try:
            self.templates[template_name] = template_text
            
            # Schedule template save
            asyncio.create_task(self._async_save_templates())
            
            return True
        except Exception as e:
            logger.error(f"Error registering template {template_name}: {str(e)}")
            return False
    
    async def _async_save_templates(self) -> None:
        """
        Asynchronously persist templates to disk without blocking the main execution flow.
        
        This method provides a non-blocking way to save templates by delegating to
        the synchronous _save_templates method. It's designed to be called from
        contexts where immediate persistence is desired but blocking operations
        would be problematic, such as during API request handling.
        
        When called:
        - The method executes _save_templates directly rather than creating a task
        - Despite being async, it doesn't actually perform any async operations
        - This approach simplifies the interface while maintaining consistent
          method signatures
        
        Usage Context:
        This method is typically called after template modifications to ensure
        changes are persisted, such as after:
        - Registering new templates
        - Updating existing templates
        - Removing templates
        
        Since saving templates is an I/O-bound operation that involves disk writes,
        this async wrapper helps to:
        - Prevent UI freezing in interactive contexts
        - Avoid blocking the event loop in server contexts
        - Return control quickly to the calling code
        - Ensure template persistence happens reliably in the background
        
        Note:
            While designed for asynchronous usage, this implementation currently
            performs blocking I/O. In a future optimization, this could be changed
            to use true async file I/O using libraries like aiofiles.
        """
        self._save_templates()
    
    def remove_template(self, template_name: str) -> bool:
        """
        Remove a template from the collection if it exists.
        
        This method deletes a template from the in-memory template collection
        and initiates an asynchronous save operation to persist the deletion to disk.
        If the specified template doesn't exist, the method returns False but
        doesn't raise an exception, following a fail-soft approach for easier
        error handling.
        
        The template removal process:
        1. Checks if the template exists in the collection
        2. If found, removes it from the in-memory dictionary
        3. Schedules an asynchronous task to save the updated template collection
        4. Returns a boolean indicating success or failure
        
        This method enables runtime management of templates, allowing obsolete
        or incorrect templates to be removed without requiring file system access
        or application restarts.
        
        Args:
            template_name: Name of the template to remove
            
        Returns:
            True if the template was found and removed, False if it wasn't found
            
        Usage Example:
        ```python
        # Check if removal was successful
        if prompt_service.remove_template("outdated_template"):
            logger.info("Template successfully removed")
        else:
            logger.warning("Template not found, nothing to remove")
            
        # Unconditional removal attempt (ignoring result)
        prompt_service.remove_template("temporary_template")
        ```
        
        Note:
            The template is immediately removed from memory but the disk
            persistence happens asynchronously. If the application crashes
            immediately after this call, the template might still exist in
            the persisted files when the application restarts.
        """
        if template_name in self.templates:
            del self.templates[template_name]
            
            # Schedule template save
            asyncio.create_task(self._async_save_templates())
            
            return True
        return False
    
    def render_template(
        self, 
        template_name: str, 
        variables: Dict[str, Any]
    ) -> Optional[str]:
        """
        Render a prompt template by substituting variables into the template text.
        
        This method performs dynamic template rendering using Python's string formatting
        system. It takes a template by name and a dictionary of variables, substitutes
        the variables into the template placeholders, and returns the fully rendered text
        ready for use with language models or other downstream components.
        
        The rendering process:
        1. Retrieves the template by name from the template repository
        2. Validates that the template exists
        3. Performs variable substitution using Python's str.format() method
        4. Handles any errors that occur during substitution
        
        Template Format:
        Templates use Python's string formatting syntax with curly braces:
        - Simple variables: "Hello, {name}!"
        - Nested attributes: "Author: {book.author}"
        - Formatting options: "Score: {score:.2f}"
        
        Error Handling:
        The method has comprehensive error handling for common issues:
        - Missing templates: Returns None with a warning log
        - Missing variables: Logs the specific missing variable and returns None
        - Format errors: Logs the formatting error and returns None
        
        Variable handling:
        - All variables must be provided in the variables dictionary
        - Variable types should be compatible with string formatting
        - Complex objects can be used if they have string representations
        
        Args:
            template_name: Name of the template to render
            variables: Dictionary mapping variable names to their values
            
        Returns:
            Rendered template text with variables substituted, or None if rendering fails
            
        Example:
            ```python
            # Define a template
            service.register_template(
                "user_profile", 
                "Name: {name}\nAge: {age}\nRole: {role}"
            )
            
            # Render with variables
            profile = service.render_template(
                "user_profile",
                {"name": "Alice", "age": 30, "role": "Administrator"}
            )
            # Result: "Name: Alice\nAge: 30\nRole: Administrator"
            ```
        """
        template = self.get_template(template_name)
        if not template:
            logger.warning(f"Template {template_name} not found")
            return None
        
        try:
            return template.format(**variables)
        except KeyError as e:
            logger.error(f"Missing variable in template {template_name}: {str(e)}")
            return None
        except Exception as e:
            logger.error(f"Error rendering template {template_name}: {str(e)}")
            return None 

# Global instance
_prompt_manager_instance = None
_prompt_manager_lock = threading.Lock()

def get_prompt_manager() -> PromptService:
    """
    Get or create the global thread-safe singleton PromptService instance.
    
    This function implements a thread-safe singleton pattern for the PromptService,
    ensuring that only one instance is created and shared across the entire application,
    regardless of which thread accesses it. It uses a mutex lock to prevent race conditions
    when multiple threads attempt to create the instance simultaneously.
    
    The singleton pattern ensures all components throughout the application use the same
    prompt template repository and caching system, providing consistent behavior across
    different threads and request contexts.
    
    Returns:
        PromptService: The global singleton PromptService instance.
        
    Example:
        ```python
        # This will always return the same instance, even from different threads
        prompt_manager = get_prompt_manager()
        template = prompt_manager.render_template("greeting", {"name": "User"})
        ```
    """
    global _prompt_manager_instance
    if _prompt_manager_instance is None:
        with _prompt_manager_lock:
            if _prompt_manager_instance is None:
                _prompt_manager_instance = PromptService()
    return _prompt_manager_instance

# Example Usage
if __name__ == '__main__':
    from ultimate_mcp_server.utils.logging import setup_logging

    setup_logging(log_level="DEBUG")

    # Create dummy templates dir and file for example
    EXAMPLE_TEMPLATES_DIR = Path("./temp_prompt_templates_example")
    EXAMPLE_TEMPLATES_DIR.mkdir(exist_ok=True)
    (EXAMPLE_TEMPLATES_DIR / "greeting.txt").write_text("Hello, {{name}}! How are you today?")
    (EXAMPLE_TEMPLATES_DIR / "summary.txt").write_text("Summarize the following text:\n\n{{text}}")

    # Set env var to use this temp dir
    os.environ['GATEWAY_PROMPT_TEMPLATES_DIR'] = str(EXAMPLE_TEMPLATES_DIR.resolve())
    os.environ['GATEWAY_FORCE_CONFIG_RELOAD'] = 'true' # Force reload

    try:
        manager = get_prompt_manager()
        print(f"Templates directory: {manager.templates_dir}")
        print(f"Available templates: {manager.list_templates()}")

        greeting_template = manager.get_template('greeting')
        print(f"Greeting Template: {greeting_template}")

        try:
            manager.get_template('non_existent')
        except PromptTemplateError as e:
            print(f"Caught expected error: {e}")

    finally:
        # Clean up
        import shutil
        shutil.rmtree(EXAMPLE_TEMPLATES_DIR)
        print(f"Cleaned up {EXAMPLE_TEMPLATES_DIR}")
        if 'GATEWAY_PROMPT_TEMPLATES_DIR' in os.environ:
            del os.environ['GATEWAY_PROMPT_TEMPLATES_DIR']
        if 'GATEWAY_FORCE_CONFIG_RELOAD' in os.environ:
            del os.environ['GATEWAY_FORCE_CONFIG_RELOAD']
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/core/tournaments/utils.py:
--------------------------------------------------------------------------------

```python
"""
Utility functions for tournament functionality.
"""

import difflib
import logging
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional

from ultimate_mcp_server.core.models.tournament import (
    EvaluatorConfig,
    ModelResponseData,
    TournamentData,
    TournamentStatus,
)

# For file write, if using a tool:
from ultimate_mcp_server.tools.filesystem import write_file

logger = logging.getLogger(__name__)


def create_round_prompt(
    tournament: TournamentData,
    round_num: int,
    previous_round_variant_responses: Dict[
        str, ModelResponseData
    ],  # Now takes full ModelResponseData
    target_model_variant_id: Optional[str] = None,  # For per-variant system prompts etc. (future)
) -> str:
    """Creates the prompt for a specific round."""
    if round_num == 0:
        return tournament.config.prompt

    # --- Build prompt with previous round's responses ---
    base_prompt_header = f"""This is Round {round_num} of an iterative refinement process.
Original Problem:
---
{tournament.config.prompt}
---

In the previous round (Round {round_num - 1}), different model variants produced the following outputs.
Your goal is to synthesize the best aspects, address weaknesses, and produce a superior solution.
"""

    responses_section = []
    for variant_id, resp_data in previous_round_variant_responses.items():
        # For code tournaments, use extracted code if available and valid for prompting.
        # For text, use full response_text.
        content_to_show = ""
        if tournament.config.tournament_type == "code":
            # Prioritize clean extracted code for next round's prompt
            content_to_show = (
                resp_data.extracted_code if resp_data.extracted_code else resp_data.response_text
            )
            if (
                not content_to_show or len(content_to_show.strip()) < 10
            ):  # Heuristic for empty/trivial code
                content_to_show = resp_data.response_text  # Fallback to full text if code is bad
            content_to_show = (
                f"```python\n{content_to_show.strip()}\n```"
                if content_to_show
                else "[No valid code extracted]"
            )
        else:  # Text tournament
            content_to_show = (
                resp_data.response_text if resp_data.response_text else "[No response text]"
            )

        if resp_data.error:
            content_to_show += f"\n[Note: This variant encountered an error: {resp_data.error}]"

        # Show overall score if available
        score_info = ""
        if resp_data.overall_score is not None:
            score_info = f" (Overall Score: {resp_data.overall_score:.2f})"

        responses_section.append(
            f"--- Output from Variant: {variant_id}{score_info} ---\n{content_to_show.strip()}\n"
        )

    # --- Add type-specific instructions ---
    if tournament.config.tournament_type == "code":
        instructions = """
Carefully analyze all previous code solutions. Consider correctness, efficiency, readability, robustness, and how well they integrate good ideas.
Produce a NEW, complete Python implementation that is demonstrably better than any single prior solution.
Provide ONLY the Python code block, enclosed in triple backticks (```python ... ```).
Do not include any explanations outside of code comments.
"""
    else:  # Text tournament
        instructions = """
Analyze each previous response based on the original problem. Consider relevance, accuracy, completeness, clarity, conciseness, and style.
Synthesize the best aspects of ALL responses into a single, improved response.
Your new response should be superior to any individual response from the previous round.
You MAY optionally start your response with a brief (1-2 sentences) explanation of your synthesis choices, enclosed in <thinking>...</thinking> tags.
Then, provide the improved text response itself.
"""

    final_prompt = f"{base_prompt_header}\n{''.join(responses_section)}\n---Refinement Instructions---\n{instructions}"
    return final_prompt.strip()


async def extract_thinking(response_text: str) -> Optional[str]:
    """Extracts <thinking>...</thinking> block. More robust extraction can be added."""
    if not response_text:
        return None
    match = re.search(r"<thinking>(.*?)</thinking>", response_text, re.DOTALL | re.IGNORECASE)
    return match.group(1).strip() if match else None


async def save_model_response_content(
    tournament_storage_path: Path,
    round_num: int,
    variant_id: str,
    response_text: Optional[str],
    extracted_code: Optional[str],
    thinking_process: Optional[str],
    metrics: Dict[str, Any],
    tournament_type: Literal["code", "text"],
) -> Dict[str, Optional[str]]:
    """Saves response text, extracted code, and metadata to files."""
    round_dir = tournament_storage_path / f"round_{round_num}"
    round_dir.mkdir(parents=True, exist_ok=True)

    sanitized_variant_id = re.sub(r"[^a-zA-Z0-9_\-.]", "_", variant_id)
    base_filename = f"{sanitized_variant_id}_r{round_num}"

    # --- Main Markdown Report File ---
    md_content = f"# Response: {variant_id} - Round {round_num}\n\n"
    md_content += "## Metrics\n"
    for k, v in metrics.items():
        if isinstance(v, float):
            md_content += f"- **{k.replace('_', ' ').title()}:** {v:.4f}\n"
        else:
            md_content += f"- **{k.replace('_', ' ').title()}:** {v}\n"

    if thinking_process:
        md_content += f"\n## Thinking Process\n```\n{thinking_process}\n```\n"

    md_content += f"\n## Full Response Text\n```\n{response_text or '[No response text]'}\n```\n"

    if tournament_type == "code" and extracted_code:
        md_content += f"\n## Extracted Code\n```python\n{extracted_code}\n```\n"

    md_file_path = round_dir / f"{base_filename}_report.md"
    md_file_path.write_text(md_content, encoding="utf-8")

    saved_paths = {"markdown_file": str(md_file_path), "code_file": None}

    # --- Save Raw Extracted Code (if any) ---
    if tournament_type == "code" and extracted_code:
        code_file_path = round_dir / f"{base_filename}.py"
        code_file_path.write_text(extracted_code, encoding="utf-8")
        saved_paths["code_file"] = str(code_file_path)

    logger.debug(f"Saved response artifacts for {variant_id} to {round_dir}")
    return saved_paths


def generate_comparison_file_content(tournament: TournamentData, round_num: int) -> Optional[str]:
    if round_num < 0 or round_num >= len(tournament.rounds_results):
        return None
    round_result = tournament.rounds_results[round_num]
    if not round_result.responses:
        return None

    content = f"# Tournament Comparison Report - Round {round_num}\n\n"
    content += f"**Tournament:** {tournament.name} (ID: {tournament.tournament_id})\n"
    content += f"**Type:** {tournament.config.tournament_type}\n"
    content += f"**Generated:** {datetime.now(timezone.utc).isoformat()}\n\n"

    content += "## Round Summary & Scores\n"
    content += (
        "| Variant ID | Overall Score | Key Metrics (e.g., Cost, Latency) | Evaluator Scores |\n"
    )
    content += (
        "|------------|---------------|-----------------------------------|------------------|\n"
    )

    sorted_responses = sorted(
        round_result.responses.items(),
        key=lambda item: item[1].overall_score if item[1].overall_score is not None else -1,
        reverse=True,
    )

    for variant_id, resp_data in sorted_responses:
        score_str = (
            f"{resp_data.overall_score:.2f}" if resp_data.overall_score is not None else "N/A"
        )
        cost = resp_data.metrics.get("cost", 0.0)
        latency = resp_data.metrics.get("latency_ms", "N/A")
        key_metrics = f"Cost: ${cost:.4f}, Latency: {latency}ms"

        eval_scores_str = (
            "; ".join(
                [
                    f"{eval_id}: {s_data.get('score', 'N/A')}"
                    for eval_id, s_data in resp_data.scores.items()
                ]
            )
            if resp_data.scores
            else "N/A"
        )

        content += f"| {variant_id} | {score_str} | {key_metrics} | {eval_scores_str} |\n"
    content += "\n"

    # --- Add Diffs (Proposal 6) ---
    if round_num > 0 and tournament.config.tournament_type == "code":
        content += "## Code Diffs from Previous Best (if applicable)\n"
        prev_round_best_code = None
        # Find best code from previous round (simplistic: first non-error, highest score)
        if round_num - 1 >= 0:
            prev_round_data = tournament.rounds_results[round_num - 1]
            best_prev_resp = max(
                filter(
                    lambda r: r.extracted_code and r.overall_score is not None,
                    prev_round_data.responses.values(),
                ),
                key=lambda r: r.overall_score,
                default=None,
            )
            if best_prev_resp:
                prev_round_best_code = best_prev_resp.extracted_code

        current_best_resp = max(
            filter(
                lambda r: r.extracted_code and r.overall_score is not None,
                round_result.responses.values(),
            ),
            key=lambda r: r.overall_score,
            default=None,
        )
        current_best_code = current_best_resp.extracted_code if current_best_resp else None

        if prev_round_best_code and current_best_code:
            diff = difflib.unified_diff(
                prev_round_best_code.splitlines(keepends=True),
                current_best_code.splitlines(keepends=True),
                fromfile=f"round_{round_num - 1}_best.py",
                tofile=f"round_{round_num}_best.py",
                lineterm="",
            )
            content += f"### Diff: Best of Round {round_num - 1} vs Best of Round {round_num}\n"
            content += "```diff\n"
            content += "".join(diff)
            content += "\n```\n\n"
        elif current_best_code:
            content += "Could not determine previous round's best code for diffing, or this is the first round with code.\n"
        # TODO: Add HTML diff for text tournaments if a library is available.

    content += "## Detailed Variant Responses\n"
    for variant_id, resp_data in sorted_responses:
        content += f"### Variant: {variant_id}\n"
        content += f"- **Original Model:** {resp_data.model_id_original}\n"
        content += (
            f"- **Overall Score:** {resp_data.overall_score:.2f}\n"
            if resp_data.overall_score is not None
            else "- **Overall Score:** N/A\n"
        )
        content += "#### Metrics:\n"
        for k, v in resp_data.metrics.items():
            content += f"  - {k}: {v}\n"
        content += "#### Evaluator Scores:\n"
        if resp_data.scores:
            for eval_id, s_data in resp_data.scores.items():
                content += f"  - **{eval_id}**: Score: {s_data.get('score', 'N/A')}\n    - Details: {s_data.get('details', 'N/A')[:200]}...\n"  # Truncate details
        else:
            content += "  - No scores available.\n"

        if resp_data.thinking_process:
            content += f"#### Thinking Process:\n```\n{resp_data.thinking_process}\n```\n"

        content_key = (
            "Extracted Code" if tournament.config.tournament_type == "code" else "Response Text"
        )
        code_lang_hint = "python" if tournament.config.tournament_type == "code" else ""
        actual_content = (
            resp_data.extracted_code
            if tournament.config.tournament_type == "code" and resp_data.extracted_code
            else resp_data.response_text
        )

        content += f"#### {content_key}:\n```{code_lang_hint}\n{actual_content or '[Content not available]'}\n```\n"
        if resp_data.response_file_path:  # Link to the full report for this variant
            # Make path relative to tournament storage root for portability
            try:
                tournament_root = Path(tournament.storage_path)
                relative_path = Path(resp_data.response_file_path).relative_to(
                    tournament_root.parent
                )  # one level up for `round_X/file`
                content += f"\n[View Full Variant Report](./{relative_path})\n"
            except ValueError:  # If not relative (e.g. absolute path)
                content += f"\n[View Full Variant Report]({resp_data.response_file_path})\n"

        content += "\n---\n"
    return content


def generate_leaderboard_file_content(tournament: TournamentData, round_num: int) -> Optional[str]:
    """Generates a leaderboard summary for the current round."""
    if round_num < 0 or round_num >= len(tournament.rounds_results):
        return None
    round_result = tournament.rounds_results[round_num]
    if not round_result.responses:
        return None

    content = f"# Leaderboard - Round {round_num}\n\n"
    content += f"**Tournament:** {tournament.name}\n"
    content += f"**Primary Metric(s):** {', '.join([e.evaluator_id for e in tournament.config.evaluators if e.primary_metric]) or 'Overall Score'}\n\n"

    content += "| Rank | Variant ID | Overall Score | Primary Metric Score(s) |\n"
    content += "|------|------------|---------------|-------------------------|\n"

    # Sort by overall_score, then by primary metric if tied (more complex sorting can be added)
    sorted_responses = sorted(
        round_result.responses.values(),
        key=lambda r: r.overall_score if r.overall_score is not None else -float("inf"),
        reverse=True,
    )

    for i, resp_data in enumerate(sorted_responses):
        rank = i + 1
        score_str = (
            f"{resp_data.overall_score:.2f}" if resp_data.overall_score is not None else "N/A"
        )

        primary_scores_list = []
        for eval_cfg in tournament.config.evaluators:
            if eval_cfg.primary_metric and eval_cfg.evaluator_id in resp_data.scores:
                primary_scores_list.append(
                    f"{eval_cfg.evaluator_id}: {resp_data.scores[eval_cfg.evaluator_id].get('score', 'N/A')}"
                )
        primary_metrics_str = "; ".join(primary_scores_list) or "N/A"

        content += (
            f"| {rank} | {resp_data.model_id_variant} | {score_str} | {primary_metrics_str} |\n"
        )

    return content


def calculate_weighted_score(
    scores: Dict[str, Dict[str, Any]], evaluator_configs: List[EvaluatorConfig]
) -> Optional[float]:
    """Calculates a single weighted overall score from multiple evaluator scores."""
    if not scores or not evaluator_configs:
        return None

    total_score = 0.0
    total_weight = 0.0

    for eval_cfg in evaluator_configs:
        eval_id = eval_cfg.evaluator_id
        if eval_id in scores:
            score_data = scores[eval_id]
            # Assuming 'score' is the primary numerical output of an evaluator
            numerical_score = score_data.get("score")
            if isinstance(numerical_score, (int, float)):
                total_score += numerical_score * eval_cfg.weight
                total_weight += eval_cfg.weight
            else:
                logger.warning(
                    f"Evaluator '{eval_id}' provided non-numeric score: {numerical_score}"
                )

    if total_weight == 0:
        # If no weights or no valid scores, average non-weighted if any scores present
        valid_scores = [
            s.get("score") for s in scores.values() if isinstance(s.get("score"), (int, float))
        ]
        return sum(valid_scores) / len(valid_scores) if valid_scores else None

    return total_score / total_weight


def update_overall_best_response(tournament: TournamentData):
    """Identifies and updates the tournament's overall best response across all completed rounds."""
    current_best_score = -float("inf")
    if (
        tournament.overall_best_response
        and tournament.overall_best_response.overall_score is not None
    ):
        current_best_score = tournament.overall_best_response.overall_score

    new_best_found = False
    for round_result in tournament.rounds_results:
        if round_result.status == TournamentStatus.COMPLETED:
            for _, resp_data in round_result.responses.items():
                if resp_data.overall_score is not None and not resp_data.error:
                    if resp_data.overall_score > current_best_score:
                        tournament.overall_best_response = resp_data
                        current_best_score = resp_data.overall_score
                        new_best_found = True

    if new_best_found:
        logger.info(
            f"New overall best response for tournament '{tournament.name}' found: {tournament.overall_best_response.model_id_variant} with score {current_best_score:.2f}"
        )


def calculate_code_metrics(code: Optional[str]) -> dict:
    """
    Calculates basic metrics about a code string.
    """
    if not code:
        return {
            "code_lines": 0,
            "code_size_kb": 0.0,
            "function_count": 0,
            "class_count": 0,
            "import_count": 0,
        }

    code_lines = code.count("\n") + 1
    code_size_bytes = len(code.encode("utf-8"))
    code_size_kb = round(code_size_bytes / 1024, 2)
    function_count = len(re.findall(r"\bdef\s+\w+", code))
    class_count = len(re.findall(r"\bclass\s+\w+", code))
    import_count = len(re.findall(r"^import\s+|\bfrom\s+", code, re.MULTILINE))

    return {
        "code_lines": code_lines,
        "code_size_kb": code_size_kb,
        "function_count": function_count,
        "class_count": class_count,
        "import_count": import_count,
    }


def generate_comparison_file(tournament: TournamentData, round_num: int) -> Optional[str]:
    """Generate a markdown comparison file for the given round.

    Args:
        tournament: The tournament data.
        round_num: The round number to generate the comparison for.

    Returns:
        The markdown content string, or None if data is missing.
    """
    if round_num < 0 or round_num >= len(tournament.rounds_results):
        logger.warning(f"Cannot generate comparison for invalid round {round_num}")
        return None

    round_result = tournament.rounds_results[round_num]
    if not round_result.responses:
        logger.warning(f"Cannot generate comparison for round {round_num}, no responses found.")
        return None

    previous_round = tournament.rounds_results[round_num - 1] if round_num > 0 else None
    is_code_tournament = tournament.config.tournament_type == "code"

    # Start with a comprehensive header
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    comparison_content = f"# Tournament Comparison - Round {round_num}\n\n"
    comparison_content += f"**Generated:** {timestamp}\n"
    comparison_content += f"**Tournament ID:** {tournament.tournament_id}\n"
    comparison_content += f"**Tournament Name:** {tournament.config.name}\n"
    comparison_content += f"**Type:** {tournament.config.tournament_type}\n"
    comparison_content += f"**Current Round:** {round_num} of {tournament.config.rounds}\n"
    comparison_content += (
        f"**Models:** {', '.join(model.model_id for model in tournament.config.models)}\n\n"
    )

    # Add original prompt section
    if round_num == 0:
        comparison_content += f"## Original Prompt\n\n```\n{tournament.config.prompt}\n```\n\n"
    else:
        # For later rounds, show what was provided to the models
        comparison_content += f"## Round {round_num} Prompt\n\n"
        # Get a sample prompt - all models get the same prompt in a round
        sample_prompt = create_round_prompt(tournament, round_num)
        comparison_content += f"```\n{sample_prompt[:500]}...\n```\n\n"

    # Summarize overall metrics
    comparison_content += "## Summary Metrics\n\n"
    comparison_content += "| Model | Tokens In | Tokens Out | Cost | Latency (ms) |\n"
    comparison_content += "|-------|-----------|------------|------|-------------|\n"

    for model_id, response_data in sorted(round_result.responses.items()):
        metrics = response_data.metrics
        tokens_in = metrics.get("input_tokens", "N/A")
        tokens_out = metrics.get("output_tokens", "N/A")
        cost = metrics.get("cost", "N/A")
        latency = metrics.get("latency_ms", "N/A")

        display_model_id = model_id.split(":")[-1] if ":" in model_id else model_id
        cost_display = f"${cost:.6f}" if isinstance(cost, (int, float)) else cost

        comparison_content += (
            f"| {display_model_id} | {tokens_in} | {tokens_out} | {cost_display} | {latency} |\n"
        )

    comparison_content += "\n## Detailed Model Responses\n\n"

    for model_id, response_data in sorted(round_result.responses.items()):
        metrics = response_data.metrics
        display_model_id = model_id.split(":")[-1] if ":" in model_id else model_id

        comparison_content += f"### {display_model_id}\n\n"

        # Display detailed metrics as a subsection
        comparison_content += "#### Metrics\n\n"
        tokens_in = metrics.get("input_tokens", "N/A")
        tokens_out = metrics.get("output_tokens", "N/A")
        total_tokens = metrics.get("total_tokens", "N/A")
        cost = metrics.get("cost", "N/A")
        latency = metrics.get("latency_ms", "N/A")

        comparison_content += (
            f"- **Tokens:** {tokens_in} in, {tokens_out} out, {total_tokens} total\n"
        )
        if isinstance(cost, (int, float)):
            comparison_content += f"- **Cost:** ${cost:.6f}\n"
        else:
            comparison_content += f"- **Cost:** {cost}\n"
        comparison_content += f"- **Latency:** {latency}ms\n"

        # Code-specific metrics
        if is_code_tournament:
            code_lines = metrics.get("code_lines", "N/A")
            code_size = metrics.get("code_size_kb", "N/A")
            comparison_content += f"- **Code Stats:** {code_lines} lines, {code_size} KB\n"

        comparison_content += "\n"

        # Display thinking process if available
        if response_data.thinking_process:
            comparison_content += "#### Thinking Process\n\n"
            comparison_content += f"```\n{response_data.thinking_process}\n```\n\n"

        # Display response content
        if is_code_tournament:
            comparison_content += "#### Extracted Code\n\n"
            comparison_content += "```python\n"
            comparison_content += response_data.extracted_code or "# No code extracted"
            comparison_content += "\n```\n\n"
        else:
            # For text tournaments, display the raw response
            comparison_content += "#### Response Text\n\n"
            comparison_content += "```\n"
            comparison_content += response_data.response_text or "[No response text]"
            comparison_content += "\n```\n\n"

        # Add link to the full response file
        if response_data.response_file_path:
            comparison_content += (
                f"[View full response file]({response_data.response_file_path})\n\n"
            )

    # Add a section comparing changes from previous round if this isn't round 0
    if previous_round and previous_round.responses:
        comparison_content += "## Changes from Previous Round\n\n"
        for model_id, response_data in sorted(round_result.responses.items()):
            if model_id in previous_round.responses:
                display_model_id = model_id.split(":")[-1] if ":" in model_id else model_id
                comparison_content += f"### {display_model_id}\n\n"

                # Compare metrics
                current_metrics = response_data.metrics
                previous_metrics = previous_round.responses[model_id].metrics

                current_tokens_out = current_metrics.get("output_tokens", 0)
                previous_tokens_out = previous_metrics.get("output_tokens", 0)
                token_change = (
                    current_tokens_out - previous_tokens_out
                    if isinstance(current_tokens_out, (int, float))
                    and isinstance(previous_tokens_out, (int, float))
                    else "N/A"
                )

                comparison_content += f"- **Token Change:** {token_change} tokens\n"

                # Note: Here you could add more sophisticated text comparison/diff
                comparison_content += "- Review the full responses to see detailed changes\n\n"

    return comparison_content.strip()


async def save_model_response(
    tournament: TournamentData,
    round_num: int,
    model_id: str,
    response_text: str,
    thinking: Optional[str] = None,
    timestamp: Optional[str] = None,
) -> str:
    """Save model response to a file using standardized filesystem tools.

    Args:
        tournament: Tournament data
        round_num: Round number
        model_id: Model ID that generated this response
        response_text: The text response to save
        thinking: Optional thinking process from the model
        timestamp: Optional timestamp (defaults to current time if not provided)

    Returns:
        Path to saved response file
    """
    if not timestamp:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    # Get path to tournament storage directory
    storage_dir = Path(tournament.storage_path)
    round_dir = storage_dir / f"round_{round_num}"
    round_dir.mkdir(exist_ok=True)

    # Create a safe filename from model ID
    safe_model_id = model_id.replace(":", "_").replace("/", "_")
    response_file = round_dir / f"{safe_model_id}_response.md"

    # Construct the markdown file with basic metadata header
    content = f"""# Response from {model_id}

## Metadata
- Tournament: {tournament.name}
- Round: {round_num}
- Model: {model_id}
- Timestamp: {timestamp}

## Response:

{response_text}
"""

    # Add thinking process if available
    if thinking:
        content += f"\n\n## Thinking Process:\n\n{thinking}\n"

    # Use the standard filesystem write tool
    try:
        # Properly use the async write_file tool
        result = await write_file(path=str(response_file), content=content)

        if not result.get("success", False):
            logger.warning(f"Standard write_file tool reported failure: {result.get('error')}")
            # Fall back to direct write
            with open(response_file, "w", encoding="utf-8") as f:
                f.write(content)
    except Exception as e:
        logger.error(f"Error using standardized file writer: {e}. Using direct file write.")
        # Fall back to direct write in case of errors
        with open(response_file, "w", encoding="utf-8") as f:
            f.write(content)

    return str(response_file)


def get_round_dir(tournament: TournamentData, round_num: int) -> Path:
    """Get the directory path for a specific tournament round.

    Args:
        tournament: The tournament data.
        round_num: The round number.

    Returns:
        Path to the round directory.
    """
    tournament_dir = Path(tournament.storage_path)
    round_dir = tournament_dir / f"round_{round_num}"
    return round_dir


def get_word_count(text: str) -> int:
    """Get the word count of a text string.

    Args:
        text: The text to count words in.

    Returns:
        The number of words.
    """
    if not text:
        return 0
    return len(text.split())


def generate_synthesis_prompt(
    tournament: TournamentData, previous_responses: Dict[str, str]
) -> str:
    """Generate the prompt for the synthesis round.

    Args:
        tournament: The tournament data
        previous_responses: A dictionary mapping model IDs to their responses

    Returns:
        The synthesis prompt for the next round.
    """
    # Letter used for referring to models to avoid bias
    letters = ["A", "B", "C", "D", "E", "F", "G", "H"]

    # Start with a base prompt instructing the model what to do
    prompt = f"""# {tournament.name} - Synthesis Round

Your task is to create an improved version based on the responses from multiple models.

Original task:
{tournament.config.prompt}

Below are responses from different models. Review them and create a superior response 
that combines the strengths of each model's approach while addressing any weaknesses.

"""

    # Add each model's response
    for i, (model_id, response) in enumerate(previous_responses.items()):
        if i < len(letters):
            letter = letters[i]
            model_name = model_id.split(":")[-1] if ":" in model_id else model_id

            prompt += f"""
## Model {letter} ({model_name}) Response:

{response}

"""

    # Add synthesis instructions
    prompt += """
# Your Task

Based on the responses above:

1. Create a single, unified response that represents the best synthesis of the information
2. Incorporate the strengths of each model's approach
3. Improve upon any weaknesses or omissions
4. Your response should be more comprehensive, accurate, and well-structured than any individual response

## Thinking Process
Start by briefly analyzing the strengths and weaknesses of each model's response, then explain your synthesis approach.

Example: "I synthesized the structured approach of Model A with the comprehensive detail from Model B, ensuring..."

"""

    return prompt

```

--------------------------------------------------------------------------------
/ultimate_mcp_server/utils/async_utils.py:
--------------------------------------------------------------------------------

```python
"""Async utilities for Ultimate MCP Server."""
import asyncio
import functools
import time
from contextlib import asynccontextmanager
from typing import Any, Callable, List, Optional, Type, TypeVar, Union

from ultimate_mcp_server.utils import get_logger

logger = get_logger(__name__)

# Type definitions
T = TypeVar('T')
AsyncCallable = Callable[..., Any]


class RateLimiter:
    """
    Rate limiter for controlling request rates to external services.
    
    This class implements a token bucket algorithm to enforce API rate limits,
    preventing too many requests in a short period of time. It's designed for use
    in asynchronous code and will automatically pause execution when limits are reached.
    
    The rate limiter tracks the timestamps of recent calls and blocks new calls
    if they would exceed the configured rate limit. When the limit is reached,
    the acquire() method blocks until enough time has passed to allow another call.
    
    This is useful for:
    - Respecting API rate limits of external services
    - Preventing service overload in high-concurrency applications
    - Implementing polite crawling/scraping behavior
    - Managing resource access in distributed systems
    
    Usage example:
        ```python
        # Create a rate limiter that allows 5 calls per second
        limiter = RateLimiter(max_calls=5, period=1.0)
        
        async def make_api_call():
            # This will automatically wait if we're over the limit
            await limiter.acquire()
            # Now make the actual API call...
            return await actual_api_call()
        ```
    """
    
    def __init__(self, max_calls: int, period: float):
        """
        Initialize the rate limiter.
        
        Args:
            max_calls: Maximum number of calls allowed within the specified period.
                       For example, 100 calls per period.
            period: Time period in seconds over which the max_calls limit applies.
                    For example, 60.0 for a per-minute rate limit.
        """
        self.max_calls = max_calls
        self.period = period
        self.calls = []
        self.lock = asyncio.Lock()
        
    async def acquire(self):
        """
        Acquire permission to make a call, waiting if necessary.
        
        This method blocks until a call is allowed based on the rate limit. When the
        limit has been reached, it will sleep until enough time has passed to allow
        another call, respecting the configured max_calls within the period.
        
        The method ensures thread-safety through an asyncio lock, making it safe to use
        across multiple tasks. It also handles the case where waiting for rate limit
        permissions overlaps with multiple concurrent requests.
        
        Returns:
            None. When this method returns, the caller is allowed to proceed.
            
        Raises:
            asyncio.CancelledError: If the task is cancelled while waiting.
        """
        async with self.lock:
            now = time.time()
            
            # Remove expired timestamps
            self.calls = [t for t in self.calls if now - t < self.period]
            
            # Check if we're under the limit
            if len(self.calls) < self.max_calls:
                self.calls.append(now)
                return
                
            # Calculate wait time
            wait_time = self.period - (now - self.calls[0])
            if wait_time > 0:
                # Release lock while waiting
                self.lock.release()
                try:
                    logger.debug(
                        f"Rate limit reached, waiting {wait_time:.2f}s",
                        emoji_key="warning"
                    )
                    await asyncio.sleep(wait_time)
                finally:
                    # Reacquire lock
                    await self.lock.acquire()
                
                # Retry after waiting
                await self.acquire()
            else:
                # Oldest call just expired, record new call
                self.calls = self.calls[1:] + [now]


@asynccontextmanager
async def timed_context(name: str):
    """
    Async context manager for measuring and logging operation duration.
    
    This utility provides a simple way to time asynchronous operations and log their
    duration upon completion. It's useful for performance monitoring, debugging,
    and identifying bottlenecks in asynchronous code.
    
    The context manager:
    1. Records the start time when entering the context
    2. Allows the wrapped code to execute
    3. Calculates the elapsed time when the context exits
    4. Logs the operation name and duration with appropriate formatting
    
    This works with any async code, including API calls, database operations,
    file I/O, or computational tasks.
    
    Args:
        name: Descriptive name of the operation being timed. This name will appear
              in log messages for easy identification.
        
    Yields:
        None - This context manager doesn't provide any additional context variables.
        
    Example usage:
        ```python
        async def fetch_user_data(user_id):
            async with timed_context("Fetch user data"):
                return await database.get_user(user_id)
                
        async def process_document(doc_id):
            async with timed_context("Document processing"):
                # Multiple operations can be timed together
                doc = await fetch_document(doc_id)
                results = await analyze_document(doc)
                return results
        ```
    """
    start_time = time.time()
    try:
        yield
    finally:
        duration = time.time() - start_time
        logger.debug(
            f"{name} completed in {duration:.3f}s",
            emoji_key="time",
            time=duration
        )


async def gather_with_concurrency(
    n: int,
    *tasks,
    return_exceptions: bool = False
) -> List[Any]:
    """
    Run multiple async tasks with a controlled concurrency limit.
    
    This function provides a way to execute multiple asynchronous tasks in parallel
    while ensuring that no more than a specified number of tasks run simultaneously.
    It's similar to asyncio.gather() but with an added concurrency control mechanism.
    
    This is particularly valuable for:
    - Preventing resource exhaustion when processing many tasks
    - Respecting service capacity limitations
    - Managing memory usage by limiting parallel execution
    - Implementing "worker pool" patterns in async code
    
    The function uses a semaphore internally to control the number of concurrently
    executing tasks. Tasks beyond the concurrency limit will wait until a running
    task completes and releases the semaphore.
    
    Args:
        n: Maximum number of tasks to run concurrently. This controls resource usage
           and prevents overloading the system or external services.
        *tasks: Any number of awaitable coroutine objects to execute.
        return_exceptions: If True, exceptions are returned as results rather than being
                          raised. If False, the first raised exception will propagate.
                          This matches the behavior of asyncio.gather().
        
    Returns:
        List of task results in the same order as the tasks were provided, regardless
        of the order in which they completed.
        
    Example usage:
        ```python
        # Process a list of URLs with at most 5 concurrent requests
        urls = ["https://example.com/1", "https://example.com/2", ...]
        tasks = [fetch_url(url) for url in urls]
        results = await gather_with_concurrency(5, *tasks)
        
        # With exception handling
        try:
            results = await gather_with_concurrency(10, *tasks, return_exceptions=False)
        except Exception as e:
            # Handle first exception
            pass
        
        # Or capture exceptions in results
        results = await gather_with_concurrency(10, *tasks, return_exceptions=True)
        for result in results:
            if isinstance(result, Exception):
                # Handle this exception
                pass
        ```
    """
    semaphore = asyncio.Semaphore(n)
    
    async def run_task_with_semaphore(task):
        async with semaphore:
            return await task
            
    return await asyncio.gather(
        *(run_task_with_semaphore(task) for task in tasks),
        return_exceptions=return_exceptions
    )


async def run_with_timeout(
    coro: Any,
    timeout: float,
    default: Optional[T] = None,
    log_timeout: bool = True
) -> Union[Any, T]:
    """
    Run an async coroutine with a timeout, returning a default value if time expires.
    
    This utility function executes an async operation with a strict time limit and
    provides graceful handling of timeouts. If the operation completes within the
    specified timeout, its result is returned normally. If the timeout is exceeded,
    the specified default value is returned instead, and the operation is cancelled.
    
    This functionality is particularly useful for:
    - Making external API calls that might hang or take too long
    - Implementing responsive UIs that can't wait indefinitely
    - Handling potentially slow operations in time-sensitive contexts
    - Creating fallback behavior for unreliable services
    
    The function uses asyncio.wait_for internally and properly handles the
    TimeoutError, converting it to a controlled return of the default value.
    
    Args:
        coro: The coroutine (awaitable) to execute with a timeout. This can be
              any async function call or awaitable object.
        timeout: Maximum execution time in seconds before timing out. Must be a
                positive number.
        default: The value to return if the operation times out. Defaults to None.
                This can be any type, and will be returned exactly as provided.
        log_timeout: Whether to log a warning message when a timeout occurs. Set
                    to False to suppress the warning. Default is True.
        
    Returns:
        The result of the coroutine if it completes within the timeout period,
        otherwise the specified default value.
        
    Raises:
        Exception: Any exception raised by the coroutine other than TimeoutError
                  will be propagated to the caller.
        
    Example usage:
        ```python
        # Basic usage with default fallback
        result = await run_with_timeout(
            fetch_data_from_api("https://example.com/data"),
            timeout=5.0,
            default={"status": "timeout", "data": None}
        )
        
        # Without logging timeouts
        result = await run_with_timeout(
            slow_operation(),
            timeout=10.0,
            default=None,
            log_timeout=False
        )
        
        # With type checking (using TypeVar)
        data: Optional[List[str]] = await run_with_timeout(
            get_string_list(),
            timeout=3.0,
            default=None
        )
        ```
    """
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        if log_timeout:
            logger.warning(
                f"Operation timed out after {timeout}s",
                emoji_key="time",
                time=timeout
            )
        return default


def async_retry(
    max_retries: int = 3,
    retry_delay: float = 1.0,
    backoff_factor: float = 2.0,
    retry_exceptions: Optional[List[Type[Exception]]] = None,
    max_backoff: Optional[float] = None
):
    """
    Decorator for automatically retrying async functions when they raise exceptions.
    
    This decorator implements a configurable exponential backoff retry strategy for
    asynchronous functions. When the decorated function raises a specified exception,
    the decorator will automatically wait and retry the operation, with an increasing
    delay between attempts.
    
    The retry behavior includes:
    - A configurable number of maximum retry attempts
    - Initial delay between retries
    - Exponential backoff (each retry waits longer than the previous one)
    - Optional filtering of which exception types trigger retries
    - Optional maximum backoff time to cap the exponential growth
    - Detailed logging of retry attempts and final failures
    
    This pattern is especially useful for:
    - Network operations that may fail temporarily
    - API calls subject to rate limiting or intermittent failures
    - Database operations that may encounter transient errors
    - Any resource access that may be temporarily unavailable
    
    Args:
        max_retries: Maximum number of retry attempts after the initial call
                    (default: 3). Total attempts will be max_retries + 1.
        retry_delay: Initial delay between retries in seconds (default: 1.0).
                    This is the wait time after the first failure.
        backoff_factor: Multiplier applied to delay between retries (default: 2.0).
                       Each retry will wait backoff_factor times longer than the previous.
        retry_exceptions: List of exception types that should trigger a retry.
                         If None (default), all exceptions trigger retries.
        max_backoff: Maximum delay between retries in seconds, regardless of the
                    backoff calculation. None (default) means no maximum.
        
    Returns:
        A decorator function that wraps the target async function with retry logic.
        
    Example usage:
        ```python
        # Basic usage - retry any exception up to 3 times
        @async_retry()
        async def fetch_data(url):
            return await make_request(url)
            
        # Custom configuration - retry specific exceptions with longer delays
        @async_retry(
            max_retries=5,
            retry_delay=2.0,
            backoff_factor=3.0,
            retry_exceptions=[ConnectionError, TimeoutError],
            max_backoff=30.0
        )
        async def send_to_service(data):
            return await service.process(data)
        ```
        
    Note:
        The decorated function's signature and return type are preserved, making this
        decorator compatible with static type checking.
    """
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            exceptions = []
            delay = retry_delay
            
            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    # Check if we should retry this exception
                    if retry_exceptions and not any(
                        isinstance(e, exc_type) for exc_type in retry_exceptions
                    ):
                        raise
                        
                    exceptions.append(e)
                    
                    # If this was the last attempt, reraise
                    if attempt >= max_retries:
                        if len(exceptions) > 1:
                            logger.error(
                                f"Function {func.__name__} failed after {max_retries+1} attempts",
                                emoji_key="error",
                                attempts=max_retries+1
                            )
                        raise
                    
                    # Log retry
                    logger.warning(
                        f"Retrying {func.__name__} after error: {str(e)} "
                        f"(attempt {attempt+1}/{max_retries+1})",
                        emoji_key="warning",
                        attempt=attempt+1,
                        max_attempts=max_retries+1,
                        error=str(e)
                    )
                    
                    # Wait before retrying
                    await asyncio.sleep(delay)
                    
                    # Increase delay for next retry
                    delay *= backoff_factor
                    if max_backoff:
                        delay = min(delay, max_backoff)
            
            # Shouldn't get here, but just in case
            raise exceptions[-1]
                
        return wrapper
    return decorator


async def map_async(
    func: Callable[[Any], Any],
    items: List[Any],
    concurrency: int = 10,
    chunk_size: Optional[int] = None
) -> List[Any]:
    """Map a function over items with limited concurrency.
    
    This utility provides efficient parallel processing of a list of items while controlling the
    maximum number of concurrent operations. It applies the provided async function to each item 
    in the list, respecting the concurrency limit set by the semaphore.
    
    The function supports two processing modes:
    1. Chunked processing: When chunk_size is provided, items are processed in batches to improve
       memory efficiency when dealing with large lists.
    2. Full parallel processing: When chunk_size is omitted, all items are processed in parallel
       but still limited by the concurrency parameter.
    
    Args:
        func: Async function to apply to each item. This function should accept a single item
              and return a result.
        items: List of items to process. Each item will be passed individually to the function.
        concurrency: Maximum number of concurrent tasks allowed. This controls the load on system
                     resources. Default is 10.
        chunk_size: Optional batch size for processing. If provided, items are processed in chunks
                    of this size to limit memory usage. If None, all items are processed at once
                    (but still constrained by concurrency).
        
    Returns:
        List of results from applying the function to each item, in the same order as the input items.
        
    Examples:
        ```python
        # Define an async function to process an item
        async def process_item(item):
            await asyncio.sleep(0.1)  # Simulate I/O or processing time
            return item * 2
            
        # Process a list of 100 items with max 5 concurrent tasks
        items = list(range(100))
        results = await map_async(process_item, items, concurrency=5)
        
        # Process a large list in chunks to manage memory usage
        large_list = list(range(10000))
        results = await map_async(process_item, large_list, concurrency=10, chunk_size=500)
        ```
        
    Notes:
        - If the items list is empty, an empty list is returned immediately.
        - The function preserves the original order of items in the result list.
        - This helper is optimized for I/O-bound tasks. For CPU-bound work, consider running
          the function in a ProcessPoolExecutor via loop.run_in_executor instead
          (asyncio.to_thread only offloads to a thread pool and does not bypass the GIL).
    """
    if not items:
        return []
    
    # Create semaphore for concurrency control
    semaphore = asyncio.Semaphore(concurrency)
    
    # Define task function
    async def process_item(item):
        async with semaphore:
            return await func(item)
    
    # If using chunks, process in batches
    if chunk_size:
        results = []
        # Process in chunks for better memory management
        for i in range(0, len(items), chunk_size):
            chunk = items[i:i+chunk_size]
            chunk_results = await asyncio.gather(
                *(process_item(item) for item in chunk)
            )
            results.extend(chunk_results)
        return results
    else:
        # Process all at once with concurrency limit
        return await asyncio.gather(
            *(process_item(item) for item in items)
        )


class AsyncBatchProcessor:
    """
    Processor for efficiently batching async operations to optimize throughput.
    
    This class provides a framework for batch processing asynchronous operations,
    which is useful for optimizing I/O-bound tasks such as database writes, API calls,
    or other operations where batching improves efficiency. It automatically collects
    individual items and processes them in batches when:
    
    1. The batch reaches a specified size (controlled by batch_size)
    2. A specified time interval elapses (controlled by flush_interval)
    3. A manual flush is requested
    
    The processor also controls concurrency, allowing multiple batches to be processed
    simultaneously while limiting the maximum number of concurrent operations to prevent
    overwhelming system resources or external services.
    
    Common use cases include:
    - Batching database inserts or updates for better throughput
    - Aggregating API calls to services that support bulk operations
    - Optimizing data processing pipelines with chunked operations
    - Building high-performance ETL (Extract, Transform, Load) processes
    
    Usage involves extending this class to implement custom batch processing logic
    by overriding the _process_batch method with specific implementation details.
    
    This class implements the async context manager protocol, allowing for use in
    async with statements to ensure proper resource cleanup.
    """
    
    def __init__(
        self,
        batch_size: int = 100,
        max_concurrency: int = 5,
        flush_interval: Optional[float] = None
    ):
        """
        Initialize the batch processor with configuration settings.
        
        Args:
            batch_size: Maximum number of items to collect before processing a batch.
                       Higher values generally improve throughput at the cost of increased
                       latency and memory usage. Default is 100.
            max_concurrency: Maximum number of concurrent batch operations allowed.
                            This prevents overwhelming external services or system
                            resources. Default is 5 concurrent batch operations.
            flush_interval: Optional automatic flush interval in seconds. When specified,
                           any collected items will be flushed after this interval,
                           regardless of whether the batch_size has been reached.
                           Set to None (default) to disable automatic flushing.
        """
        self.batch_size = batch_size
        self.max_concurrency = max_concurrency
        self.flush_interval = flush_interval
        
        self.items = []
        self.flush_task = None
        self.semaphore = asyncio.Semaphore(max_concurrency)
        
    async def add(self, item: Any):
        """
        Add an item to the current batch for processing.
        
        This method adds the provided item to the internal collection batch.
        The item will be processed when either:
        1. The batch reaches the configured batch_size
        2. The flush_interval elapses (if configured)
        3. A manual flush() is called
        
        The method automatically triggers a flush operation if the number of
        collected items reaches the configured batch_size.
        
        If a flush_interval is set and no auto-flush task is running, this method
        also initializes a background task to automatically flush items after
        the specified interval.
        
        Args:
            item: The item to add to the batch. Can be any type that your
                 _process_batch implementation can handle.
                 
        Returns:
            None
            
        Example:
            ```python
            processor = MyBatchProcessor(batch_size=50)
            await processor.add({"id": 1, "value": "data"})
            ```
        """
        self.items.append(item)
        
        # Start flush task if needed
        if self.flush_interval and not self.flush_task:
            self.flush_task = asyncio.create_task(self._auto_flush())
            
        # Flush if batch is full
        if len(self.items) >= self.batch_size:
            await self.flush()
            
    async def flush(self) -> List[Any]:
        """
        Process all currently batched items immediately.
        
        This method forces processing of all currently collected items, regardless
        of whether the batch is full or the flush interval has elapsed. It's useful
        when you need to ensure all items are processed without waiting, such as:
        
        - When shutting down the application
        - Before a checkpoint or commit operation
        - When immediate processing is needed for time-sensitive data
        - At the end of a processing cycle
        
        The method handles empty batches gracefully, returning an empty list
        when there are no items to process.
        
        Returns:
            List of results from processing the batch. The exact content depends
            on what the _process_batch implementation returns. Returns an empty
            list if there were no items to process.
            
        Example:
            ```python
            # Process any pending items immediately
            results = await processor.flush()
            ```
        """
        if not self.items:
            return []
            
        # Get current items
        items = self.items
        self.items = []
        
        # Cancel flush task if running
        if self.flush_task:
            self.flush_task.cancel()
            self.flush_task = None
            
        # Process the batch
        return await self._process_batch(items)
        
    async def _auto_flush(self):
        """
        Internal background task for automatic periodic flushing.
        
        This method implements the auto-flush functionality that runs periodically
        when flush_interval is set. It sleeps for the configured interval and then
        triggers a flush operation if any items are pending.
        
        The task continues running until:
        1. It's cancelled (typically when a manual flush occurs)
        2. The processor is shut down (via __aexit__)
        
        This method is not intended to be called directly but is started automatically
        by the add() method when needed and a flush_interval is configured.
        
        Raises:
            asyncio.CancelledError: When the task is cancelled. This is caught
                                   internally and used to terminate the loop.
        """
        try:
            while True:
                await asyncio.sleep(self.flush_interval)
                if self.items:
                    await self.flush()
        except asyncio.CancelledError:
            # Task was cancelled, which is expected
            pass
            
    async def _process_batch(self, batch: List[Any]) -> List[Any]:
        """
        Process a batch of items (to be overridden by subclasses).
        
        This method should be overridden by subclasses to implement the actual
        batch processing logic. The base implementation simply returns the batch
        unchanged and logs a warning, as it's not meant to be used directly.
        
        When implementing this method in a subclass, typical patterns include:
        - Sending a bulk API request with all batch items
        - Executing a batch database operation
        - Processing items in parallel with controlled concurrency
        - Aggregating items for a combined operation
        
        Args:
            batch: List of items to process that were collected via add()
            
        Returns:
            List of processed results, where each result corresponds to an item
            in the input batch. The actual return type depends on the specific
            implementation in the subclass.
            
        Example implementation:
            ```python
            async def _process_batch(self, batch: List[dict]) -> List[dict]:
                # Add a batch_id to each item
                batch_id = str(uuid.uuid4())
                for item in batch:
                    item['batch_id'] = batch_id
                    
                # Send to database in a single operation
                results = await self.db.insert_many(batch)
                return results
            ```
        """
        # This should be overridden by subclasses
        logger.warning(
            f"Default batch processing used for {len(batch)} items",
            emoji_key="warning"
        )
        return batch
        
    async def __aenter__(self):
        """
        Enter the async context manager.
        
        Allows the batch processor to be used in an async with statement,
        which ensures proper cleanup when the context is exited.
        
        Returns:
            The batch processor instance, ready for use.
            
        Example:
            ```python
            async with MyBatchProcessor(batch_size=100) as processor:
                for item in items:
                    await processor.add(item)
            # All pending items are automatically flushed when the context exits
            ```
        """
        return self
        
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """
        Exit the async context manager.
        
        This method is called when exiting an async with block. It ensures
        that any pending items are flushed before the context manager completes,
        preventing data loss when the processor goes out of scope.
        
        Args:
            exc_type: Exception type if an exception was raised in the context
            exc_val: Exception value if an exception was raised
            exc_tb: Exception traceback if an exception was raised
            
        Returns:
            False, indicating that any exceptions should be propagated.
        """
        # Flush any remaining items
        if self.items:
            await self.flush()
        # If nothing was pending, make sure a lingering auto-flush task does not outlive the context
        elif self.flush_task:
            self.flush_task.cancel()
            self.flush_task = None
        return False
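

# --- Example subclass (illustrative sketch, not part of the library API) ---
# AsyncBatchProcessor is meant to be extended; the toy subclass below shows the
# minimal pattern: override _process_batch with the real bulk operation
# (database insert, bulk HTTP call, etc.). The doubling logic is a placeholder.
class _ExampleDoublingProcessor(AsyncBatchProcessor):
    """Toy processor that doubles each numeric item in a batch."""

    async def _process_batch(self, batch: List[Any]) -> List[Any]:
        # A real implementation would issue one bulk operation for the whole batch here.
        return [item * 2 for item in batch]


async def _example_batch_usage() -> None:
    # Hypothetical usage: items are flushed when the batch fills, when the
    # flush_interval elapses, or automatically when the context exits.
    async with _ExampleDoublingProcessor(batch_size=3, flush_interval=1.0) as processor:
        for value in range(10):
            await processor.add(value)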
```

--------------------------------------------------------------------------------
/examples/tournament_code_demo.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Tournament Code Demo - Demonstrates running a code improvement tournament

This script shows how to:
1. Create a tournament with multiple models, including diversity and evaluators.
2. Track progress across multiple rounds.
3. Retrieve and analyze the improved code and evaluation scores.

The tournament task is to write and iteratively improve a Python function for
parsing messy CSV data, handling various edge cases.

Usage:
  python examples/tournament_code_demo.py [--task TASK] [--rounds N]

Options:
  --task TASK       Specify a coding task (default: parse_csv)
  --rounds N        Number of tournament rounds (default: 2)
"""

import argparse
import asyncio
import json
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional  # Added List

# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

from rich.markup import escape
from rich.panel import Panel
from rich.rule import Rule
from rich.syntax import Syntax  # For displaying code

# Add these imports to fix undefined names
from ultimate_mcp_server.core.models.tournament import TournamentStatus

# Assuming Gateway, PromptTemplate, etc. are correctly located
from ultimate_mcp_server.core.server import Gateway
from ultimate_mcp_server.exceptions import ProviderError, ToolError
from ultimate_mcp_server.services.prompts import PromptTemplate  # If used

# Import tournament tools for manual registration
from ultimate_mcp_server.tools.tournament import (
    create_tournament,
    get_tournament_results,
    get_tournament_status,
)
from ultimate_mcp_server.utils import (
    get_logger,
    process_mcp_result,
)  # process_mcp_result might need updates
from ultimate_mcp_server.utils.display import (  # Ensure these are updated for new TournamentData structure
    CostTracker,
    display_tournament_results,  # This will need significant updates
    display_tournament_status,  # This likely needs updates too
)
from ultimate_mcp_server.utils.logging.console import console

# Initialize logger
logger = get_logger("example.tournament_code")

# Global gateway instance (initialized in setup_gateway)
gateway: Optional[Gateway] = None

# --- Configuration ---
DEFAULT_MODEL_CONFIGS: List[Dict[str, Any]] = [
    {
        "model_id": "openai/gpt-4o-mini",  # Example, use your actual model IDs
        "diversity_count": 2,  # Generate 2 variants from this model
        "temperature": 0.7,
    },
    {
        "model_id": "anthropic/claude-3-5-haiku-20241022",
        "diversity_count": 1,
        "temperature": 0.6,
    },
    # Add more models as available/desired
    # {
    #     "model_id": "google/gemini-1.5-flash-latest",
    #     "diversity_count": 1,
    # },
]
DEFAULT_NUM_ROUNDS = 2
DEFAULT_TOURNAMENT_NAME = "Advanced Code Improvement Tournament"

# Default Evaluators
DEFAULT_EVALUATORS: List[Dict[str, Any]] = [
    {
        "evaluator_id": "python_syntax_checker",
        "type": "regex_match",
        "params": {
            "patterns": [r"^\s*def\s+\w+\(.*\):|^\s*class\s+\w+:"],  # Basic check for def/class
            "target_field": "extracted_code",
            "match_mode": "any_can_match",
        },
        "weight": 0.2,
        "primary_metric": False,
    },
    {
        "evaluator_id": "code_length_penalty",  # Example: Penalize overly short/long code
        "type": "regex_match",  # Could be a custom evaluator
        "params": {
            # This regex means: content has between 5 and 500 lines (approx)
            "patterns": [r"^(?:[^\n]*\n){4,499}[^\n]*$"],
            "target_field": "extracted_code",
            "match_mode": "all_must_match",  # Must be within the line range
            "regex_flag_options": ["MULTILINE", "DOTALL"],
        },
        "weight": 0.1,
    },
    {
        "evaluator_id": "llm_code_grader",
        "type": "llm_grader",
        "params": {
            "model_id": "anthropic/claude-3-5-haiku-20241022",  # Use a cost-effective grader
            "rubric": (
                "Evaluate the provided Python code based on the original prompt. "
                "Score from 0-100 considering: \n"
                "1. Correctness & Robustness (does it likely solve the problem, handle edges?).\n"
                "2. Efficiency (algorithmic complexity, resource usage).\n"
                "3. Readability & Maintainability (clarity, comments, Pythonic style).\n"
                "4. Completeness (are all requirements addressed?).\n"
                "Provide a 'Score: XX' line and a brief justification."
            ),
        },
        "weight": 0.7,  # Main evaluator
        "primary_metric": True,
    },
]


# The generic code prompt template
TEMPLATE_CODE = """
# GENERIC CODE TOURNAMENT PROMPT TEMPLATE

Write a {{code_type}} that {{task_description}}.

{{context}}

Your solution should:
{% for requirement in requirements %}
{{ loop.index }}. {{requirement}}
{% endfor %}

{% if example_inputs %}
Example inputs:
```
{{example_inputs}}
```
{% endif %}

{% if example_outputs %}
Expected outputs:
```
{{example_outputs}}
```
{% endif %}

Provide ONLY the Python code for your solution, enclosed in triple backticks (```python ... ```).
No explanations before or after the code block, unless they are comments within the code itself.
"""

# Define predefined tasks
TASKS = {
    "parse_csv": {
        "code_type": "Python function",
        "task_description": "parses a CSV string that may use different delimiters and contains various edge cases",
        "context": "Your function should be robust enough to handle real-world messy CSV data.",
        "requirements": [
            "Implement `parse_csv_string(csv_data: str) -> list[dict]`",
            "Accept a string `csv_data` which might contain CSV data",
            "Automatically detect the delimiter (comma, semicolon, or tab)",
            "Handle quoted fields correctly, including escaped quotes within fields",
            "Treat the first row as the header",
            "Return a list of dictionaries, where each dictionary represents a row",
            "Handle errors gracefully by logging warnings and skipping problematic rows",
            "Return an empty list if the input is empty or only contains a header",
            "Include necessary imports (e.g., `csv`, `io`).",
            "Be efficient for moderately large inputs (e.g., up to 1MB).",
        ],
        "example_inputs": """name,age,city\n"Smith, John",42,New York\n"Doe, Jane";39;"Los Angeles, CA"\n"\\"Williams\\", Bob"\t65\t"Chicago" """,
        "example_outputs": """[\n    {"name": "Smith, John", "age": "42", "city": "New York"},\n    {"name": "Doe, Jane", "age": "39", "city": "Los Angeles, CA"},\n    {"name": "\\"Williams\\", Bob", "age": "65", "city": "Chicago"}\n]""",
    },
    # Add other tasks (calculator, string_util) here if needed, similar structure
}


def create_custom_task_variables(task_description_custom: str):
    return {
        "code_type": "Python function",
        "task_description": task_description_custom,
        "context": "Ensure your solution is well-documented and handles potential edge cases.",
        "requirements": [
            "Implement the solution as specified in the task description.",
            "Write clean, readable, and efficient Python code.",
            "Include type hints and comprehensive docstrings.",
            "Handle potential errors gracefully.",
            "Make sure all necessary imports are included.",
        ],
        "example_inputs": "# Provide relevant example inputs if applicable",
        "example_outputs": "# Provide expected outputs for the examples if applicable",
    }


def parse_arguments():
    parser = argparse.ArgumentParser(description="Run a code improvement tournament demo")
    parser.add_argument(
        "--task",
        type=str,
        default="parse_csv",
        choices=list(TASKS.keys()) + ["custom"],
        help="Coding task (default: parse_csv)",
    )
    parser.add_argument(
        "--custom-task", type=str, help="Custom coding task description (used when --task=custom)"
    )
    parser.add_argument(
        "--rounds",
        type=int,
        default=DEFAULT_NUM_ROUNDS,
        help=f"Number of tournament rounds (default: {DEFAULT_NUM_ROUNDS})",
    )
    parser.add_argument(
        "--models",
        type=str,
        nargs="+",
        default=[mc["model_id"] for mc in DEFAULT_MODEL_CONFIGS],  # Pass only model_id strings
        help="List of model IDs to participate (e.g., 'openai/gpt-4o-mini'). Overrides default models.",
    )
    return parser.parse_args()


async def setup_gateway_for_demo():
    global gateway
    if gateway:
        return

    logger.info("Initializing gateway for code tournament demo...", emoji_key="rocket")
    # Assuming Gateway constructor and _initialize_providers are async
    # The actual Gateway might not need to be created this way if it's a singleton managed by server.py
    # For a standalone demo, this direct instantiation is okay.
    # For integration, you'd likely get the existing gateway instance.
    try:
        # This is a simplified setup. In a real server, Gateway might be a singleton.
        # Here, we create a new one for the demo.
        # Ensure your actual Gateway class can be instantiated and initialized like this.
        # from ultimate_mcp_server.core import get_gateway_instance, async_init_gateway
        # gateway = get_gateway_instance()
        # if not gateway:
        #     gateway = await async_init_gateway() # This sets the global instance

        # For a script, direct instantiation:
        gateway = Gateway(
            name="code_tournament_demo_gateway", register_tools=False  # Changed: register_tools=False, removed load_all_tools
        )
        # In a script, you might need to manually initialize providers if not done by Gateway constructor
        if not gateway.providers:  # Check if providers are already initialized
            await gateway._initialize_providers()

        # Manually register tournament tools
        mcp = gateway.mcp
        mcp.tool()(create_tournament)
        mcp.tool()(get_tournament_status)
        mcp.tool()(get_tournament_results)
        logger.info("Manually registered tournament tools for the demo.")

    except Exception as e:
        logger.critical(f"Failed to initialize Gateway: {e}", exc_info=True)
        raise

    # Verify the manually registered tournament tools are actually available.
    # This check is purely a sanity check.
    mcp_tools = await gateway.mcp.list_tools()
    registered_tool_names = [t.name for t in mcp_tools]
    required_tournament_tools = [
        "create_tournament",
        "get_tournament_status",
        "get_tournament_results",
    ]
    missing_tools = [
        tool for tool in required_tournament_tools if tool not in registered_tool_names
    ]

    if missing_tools:
        logger.error(
            f"Gateway initialized, but required tournament tools are missing: {missing_tools}",
            emoji_key="error",
        )
        logger.info(f"Available tools: {registered_tool_names}")
        raise RuntimeError(f"Required tournament tools not registered: {missing_tools}")

    logger.success(
        "Gateway for demo initialized and tournament tools verified.", emoji_key="heavy_check_mark"
    )


async def poll_tournament_status_enhanced(
    tournament_id: str, storage_path: Optional[str] = None, interval: int = 10
) -> Optional[str]:
    logger.info(
        f"Polling status for tournament {tournament_id} (storage: {storage_path})...",
        emoji_key="hourglass",
    )
    final_states = [
        status.value
        for status in [
            TournamentStatus.COMPLETED,
            TournamentStatus.FAILED,
            TournamentStatus.CANCELLED,
        ]
    ]

    while True:
        status_input = {"tournament_id": tournament_id}
        status_result_raw = await gateway.mcp.call_tool("get_tournament_status", status_input)

        # Process MCP result to get the dictionary
        status_data_dict = await process_mcp_result(status_result_raw)  # Ensure this returns a dict

        if "error" in status_data_dict or not status_data_dict.get(
            "success", True
        ):  # Check for tool call error
            error_message = status_data_dict.get(
                "error_message", status_data_dict.get("error", "Unknown error fetching status")
            )
            if storage_path and "not found" in error_message.lower():
                # Fallback to direct file reading
                state_file = Path(storage_path) / "tournament_state.json"
                logger.debug(f"Tournament not found via API, trying direct read: {state_file}")
                if state_file.exists():
                    try:
                        direct_state = json.loads(state_file.read_text())
                        current_status = direct_state.get("status")
                        # Reconstruct a compatible status_data_dict for display
                        status_data_dict = {
                            "tournament_id": tournament_id,
                            "name": direct_state.get("name"),
                            "tournament_type": direct_state.get("config", {}).get(
                                "tournament_type"
                            ),
                            "status": current_status,
                            "current_round": direct_state.get("current_round", -1)
                            + 1,  # Adjust for display
                            "total_rounds": direct_state.get("config", {}).get("rounds", 0),
                            "progress_summary": f"Read from file. Round {direct_state.get('current_round', -1) + 1}.",
                            "created_at": direct_state.get("created_at"),
                            "updated_at": direct_state.get("updated_at"),
                            "error_message": direct_state.get("error_message"),
                        }
                        logger.info(f"Successfully read direct state from file: {current_status}")
                    except Exception as e:
                        logger.error(f"Error reading state file directly: {e}")
                        # Keep original error message
                else:
                    logger.warning(f"Fallback state file not found: {state_file}")
            else:  # Non-"not found" error or no storage path
                logger.error(
                    f"Error fetching tournament status: {error_message}", emoji_key="error"
                )
                return None  # Indicate polling error

        # Display status using the utility function (ensure it handles the new dict structure)
        display_tournament_status(status_data_dict)  # Expects a dict

        current_status_val = status_data_dict.get("status")
        if current_status_val in final_states:
            logger.success(
                f"Tournament {tournament_id} reached final state: {current_status_val}",
                emoji_key="heavy_check_mark",
            )
            return current_status_val

        await asyncio.sleep(interval)


# --- Robust result processing for demo ---
async def robust_process_mcp_result(result_raw, storage_path=None):
    from ultimate_mcp_server.utils import process_mcp_result
    try:
        processed = await process_mcp_result(result_raw)
        # If no error, or error is not about JSON, return as is
        if not processed.get("error") or "LLM repair" not in processed.get("error", ""):
            return processed
    except Exception as e:
        processed = {"error": f"Exception in process_mcp_result: {e}"}

    # Fallback: try to load from file if storage_path is provided
    if storage_path:
        state_file = Path(storage_path) / "tournament_state.json"
        if state_file.exists():
            try:
                with open(state_file, "r", encoding="utf-8") as f:
                    return json.load(f)
            except Exception as file_e:
                return {"error": f"Failed to parse both API and file: {file_e}"}
    # Otherwise, return a clear error
    return {"error": f"API did not return JSON. Raw: {str(result_raw)[:200]}"}


async def run_code_tournament_demo(tracker: CostTracker, args: argparse.Namespace):
    if args.task == "custom":
        if not args.custom_task:
            console.print(
                "[bold red]Error:[/bold red] --custom-task description must be provided when --task=custom."
            )
            return 1
        task_name = "custom_task"
        task_vars = create_custom_task_variables(args.custom_task)
        task_description_log = args.custom_task
    elif args.task in TASKS:
        task_name = args.task
        task_vars = TASKS[task_name]
        task_description_log = task_vars["task_description"]
    else:  # Should not happen due to argparse choices
        console.print(f"[bold red]Error:[/bold red] Unknown task '{args.task}'.")
        return 1

    console.print(
        Rule(
            f"[bold blue]{DEFAULT_TOURNAMENT_NAME} - Task: {task_name.replace('_', ' ').title()}[/bold blue]"
        )
    )
    console.print(f"Task Description: [yellow]{escape(task_description_log)}[/yellow]")

    # Prepare model_configs based on CLI input or defaults
    # The tool now expects list of dicts for model_configs
    current_model_configs = []
    if args.models == [mc["model_id"] for mc in DEFAULT_MODEL_CONFIGS]:  # Default models used
        current_model_configs = DEFAULT_MODEL_CONFIGS
        console.print(
            f"Using default models: [cyan]{', '.join([mc['model_id'] for mc in current_model_configs])}[/cyan]"
        )
    else:  # Custom models from CLI
        for model_id_str in args.models:
            # Find if this model_id_str matches any default config to get its diversity/temp
            # This is a simple way; a more complex CLI could allow full ModelConfig dicts
            default_mc = next(
                (mc for mc in DEFAULT_MODEL_CONFIGS if mc["model_id"] == model_id_str), None
            )
            if default_mc:
                current_model_configs.append(default_mc)
            else:  # Model from CLI not in defaults, use basic config
                current_model_configs.append({"model_id": model_id_str, "diversity_count": 1})
        console.print(f"Using CLI specified models: [cyan]{', '.join(args.models)}[/cyan]")

    console.print(f"Rounds: [cyan]{args.rounds}[/cyan]")
    console.print(
        f"Evaluators: [cyan]{', '.join([e['evaluator_id'] for e in DEFAULT_EVALUATORS])}[/cyan]"
    )

    code_prompt_template = PromptTemplate(
        template=TEMPLATE_CODE,
        template_id="demo_code_prompt",
        required_vars=["code_type", "task_description", "context", "requirements", "example_inputs", "example_outputs"]
    )
    try:
        initial_prompt = code_prompt_template.render(task_vars)
    except Exception as e:
        logger.error(f"Failed to render prompt template: {e}", exc_info=True)
        return 1

    console.print(
        Panel(
            escape(initial_prompt[:500] + "..."),
            title="[bold]Initial Prompt Preview[/bold]",
            border_style="dim",
        )
    )

    create_input = {
        "name": f"{DEFAULT_TOURNAMENT_NAME} - {task_name.replace('_', ' ').title()}",
        "prompt": initial_prompt,
        "models": current_model_configs,  # This should be List[Dict] for the tool
        "rounds": args.rounds,
        "tournament_type": "code",
        "evaluators": DEFAULT_EVALUATORS,  # Pass evaluator configs
        # Add other new config params if desired, e.g., extraction_model_id
        "extraction_model_id": "anthropic/claude-3-5-haiku-20241022",
        "max_retries_per_model_call": 2,
        "max_concurrent_model_calls": 3,
    }

    tournament_id: Optional[str] = None
    storage_path: Optional[str] = None

    try:
        logger.info("Creating code tournament...", emoji_key="gear")
        create_result_raw = await gateway.mcp.call_tool("create_tournament", create_input)
        create_data = await process_mcp_result(
            create_result_raw
        )  # process_mcp_result must return dict

        # Corrected error handling, similar to tournament_text_demo.py
        if "error" in create_data:
            error_msg = create_data.get("error_message", create_data.get("error", "Unknown error creating tournament"))
            logger.error(f"Failed to create tournament: {error_msg}", emoji_key="cross_mark")
            console.print(f"[bold red]Error creating tournament:[/bold red] {escape(error_msg)}")
            return 1

        tournament_id = create_data.get("tournament_id")
        storage_path = create_data.get("storage_path")  # Get storage_path

        if not tournament_id:
            logger.error(
                "No tournament ID returned from create_tournament call.", emoji_key="cross_mark"
            )
            console.print("[bold red]Error: No tournament ID returned.[/bold red]")
            return 1

        console.print(
            f"Tournament [bold green]'{create_input['name']}'[/bold green] created successfully!"
        )
        console.print(f"  ID: [yellow]{tournament_id}[/yellow]")
        console.print(f"  Status: [magenta]{create_data.get('status')}[/magenta]")
        if storage_path:
            console.print(f"  Storage Path: [blue underline]{storage_path}[/blue underline]")

        await asyncio.sleep(1)  # Brief pause for task scheduling

        final_status_val = await poll_tournament_status_enhanced(
            tournament_id, storage_path, interval=10
        )

        if final_status_val == TournamentStatus.COMPLETED.value:
            logger.info(
                f"Tournament {tournament_id} completed. Fetching final results...",
                emoji_key="sports_medal",
            )
            results_input = {"tournament_id": tournament_id}
            results_raw = await gateway.mcp.call_tool("get_tournament_results", results_input)
            processed_results_dict = await robust_process_mcp_result(
                results_raw, storage_path
            )

            results_data_dict = processed_results_dict
            workaround_applied_successfully = False

            # If process_mcp_result itself signals an error
            # (This will be true if JSON parsing failed and LLM repair also failed to produce valid JSON)
            if "error" in processed_results_dict: # Simpler check for any error from process_mcp_result
                original_error_msg = processed_results_dict.get("error", "Unknown error processing results")
                logger.warning(
                    f"Initial processing of 'get_tournament_results' failed with: {original_error_msg}"
                )

                # Attempt workaround if it's a code tournament, storage path is known, 
                # AND the initial processing via MCP failed.
                current_tournament_type = create_input.get("tournament_type", "unknown")
                if current_tournament_type == "code" and storage_path:
                    logger.info(
                        f"Applying workaround for 'get_tournament_results' failure. "
                        f"Attempting to load results directly from storage: {storage_path}"
                    )
                    state_file_path = Path(storage_path) / "tournament_state.json"
                    if state_file_path.exists():
                        try:
                            with open(state_file_path, 'r', encoding='utf-8') as f:
                                results_data_dict = json.load(f)  # Override with data from file
                            logger.success(
                                f"Workaround successful: Loaded results from {state_file_path}"
                            )
                            workaround_applied_successfully = True
                        except Exception as e:
                            logger.error(
                                f"Workaround failed: Could not load or parse {state_file_path}: {e}"
                            )
                            # results_data_dict remains processed_results_dict (the error dict from initial processing)
                    else:
                        logger.warning(
                            f"Workaround failed: State file not found at {state_file_path}"
                        )
                        # results_data_dict remains processed_results_dict (the error dict from initial processing)
                # If not a code tournament, or no storage path, or workaround failed,
                # results_data_dict is still the original error dict from processed_results_dict
            
            # Now, check the final results_data_dict (either from tool or successful workaround)
            # This outer check sees if results_data_dict *still* has an error after potential workaround
            if "error" in results_data_dict: 
                # This block will be hit if:
                # 1. Original tool call failed AND it wasn't the specific known issue for the workaround.
                # 2. Original tool call failed with the known issue, BUT the workaround also failed (e.g., file not found, parse error).
                final_error_msg = results_data_dict.get("error_message", results_data_dict.get("error", "Unknown error"))
                logger.error(
                    f"Failed to get tournament results (workaround_applied_successfully={workaround_applied_successfully}): {final_error_msg}",
                    emoji_key="cross_mark"
                )
                console.print(f"[bold red]Error fetching results:[/bold red] {escape(final_error_msg)}")
            else:
                # Successfully got data, either from tool or workaround
                if workaround_applied_successfully:
                    console.print(
                        "[yellow i](Workaround applied: Results loaded directly from tournament_state.json)[/yellow i]"
                    )
                
                # Pass the full dictionary results_data_dict to display_tournament_results
                display_tournament_results(
                    results_data_dict, console
                )  # Ensure this function handles the new structure

                # Example of accessing overall best response
                overall_best_resp_data = results_data_dict.get("overall_best_response")
                if overall_best_resp_data:
                    console.print(
                        Rule("[bold green]Overall Best Response Across All Rounds[/bold green]")
                    )
                    best_variant_id = overall_best_resp_data.get("model_id_variant", "N/A")
                    best_score = overall_best_resp_data.get("overall_score", "N/A")
                    best_score_str = (
                        f"{best_score:.2f}" if isinstance(best_score, (int, float)) else str(best_score)
                    )
                    console.print(
                        f"Best Variant: [cyan]{best_variant_id}[/cyan] (Score: {best_score_str})"
                    )
                    best_code = overall_best_resp_data.get("extracted_code")
                    if best_code:
                        console.print(
                            Panel(
                                Syntax(best_code, "python", theme="monokai", line_numbers=True),
                                title=f"Code from {best_variant_id}",
                                border_style="green",
                            )
                        )
                    else:
                        console.print(
                            "[yellow]No extracted code found for the overall best response.[/yellow]"
                        )

                # Try to find and mention the leaderboard file from the last round
                last_round_num = results_data_dict.get("config", {}).get("rounds", 0) - 1
                if last_round_num >= 0 and last_round_num < len(
                    results_data_dict.get("rounds_results", [])
                ):
                    last_round_data = results_data_dict["rounds_results"][last_round_num]
                    leaderboard_file = last_round_data.get("leaderboard_file_path")
                    if leaderboard_file:
                        console.print(
                            f"\nCheck the final leaderboard: [blue underline]{leaderboard_file}[/blue underline]"
                        )
                    comparison_file = last_round_data.get("comparison_file_path")
                    if comparison_file:
                        console.print(
                            f"Check the final round comparison: [blue underline]{comparison_file}[/blue underline]"
                        )

        elif final_status_val:  # FAILED or CANCELLED
            logger.warning(
                f"Tournament {tournament_id} ended with status: {final_status_val}",
                emoji_key="warning",
            )
            console.print(
                f"[bold yellow]Tournament ended with status: {final_status_val}[/bold yellow]"
            )
            # Optionally fetch results for FAILED tournaments to see partial data / error
            if final_status_val == TournamentStatus.FAILED.value:
                results_input = {"tournament_id": tournament_id}
                results_raw = await gateway.mcp.call_tool("get_tournament_results", results_input)
                results_data_dict = await robust_process_mcp_result(
                    results_raw, storage_path
                )
                if results_data_dict and not (
                    results_data_dict.get("error") or results_data_dict.get("error_message")
                ):  # Check success of get_tournament_results
                    display_tournament_results(results_data_dict, console)  # Display what we have
        else:  # Polling failed
            logger.error(f"Polling failed for tournament {tournament_id}.", emoji_key="cross_mark")
            console.print(f"[bold red]Polling failed for tournament {tournament_id}.[/bold red]")

    except (ToolError, ProviderError, Exception) as e:  # Catch more general errors
        logger.error(
            f"An error occurred during the code tournament demo: {e}",
            exc_info=True,
            emoji_key="error",
        )
        console.print(f"[bold red]Demo Error:[/bold red] {escape(str(e))}")
        return 1
    finally:
        tracker.display_summary(console)
        logger.info("Code tournament demo finished.", emoji_key="party_popper")
    return 0


async def main_async():
    args = parse_arguments()
    tracker = CostTracker()
    exit_code = 1  # Default to error
    try:
        await setup_gateway_for_demo()
        exit_code = await run_code_tournament_demo(tracker, args)
    except Exception as e:
        console.print(
            f"[bold red]Critical error in demo setup or execution:[/bold red] {escape(str(e))}"
        )
        logger.critical(f"Demo main_async failed: {e}", exc_info=True)
    finally:
        # Simplified cleanup, similar to tournament_text_demo.py
        if gateway:
            # Perform any necessary general gateway cleanup if available in the future
            # For now, specific sandbox closing is removed as it caused issues and
            # repl_python is not explicitly registered/used by this demo with register_tools=False
            pass
        logger.info("Demo finished.")
    return exit_code


if __name__ == "__main__":
    try:
        final_exit_code = asyncio.run(main_async())
    except KeyboardInterrupt:
        console.print("\n[bold yellow]Demo interrupted by user.[/bold yellow]")
        final_exit_code = 130  # Standard exit code for Ctrl+C
    sys.exit(final_exit_code)

```