This is page 15 of 19. Use http://codebase.md/beehiveinnovations/gemini-mcp-server?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── commands
│ │ └── fix-github-issue.md
│ └── settings.json
├── .coveragerc
├── .dockerignore
├── .env.example
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ ├── documentation.yml
│ │ ├── feature_request.yml
│ │ └── tool_addition.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── docker-pr.yml
│ ├── docker-release.yml
│ ├── semantic-pr.yml
│ ├── semantic-release.yml
│ └── test.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CHANGELOG.md
├── claude_config_example.json
├── CLAUDE.md
├── clink
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── claude.py
│ │ ├── codex.py
│ │ └── gemini.py
│ ├── constants.py
│ ├── models.py
│ ├── parsers
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── claude.py
│ │ ├── codex.py
│ │ └── gemini.py
│ └── registry.py
├── code_quality_checks.ps1
├── code_quality_checks.sh
├── communication_simulator_test.py
├── conf
│ ├── __init__.py
│ ├── azure_models.json
│ ├── cli_clients
│ │ ├── claude.json
│ │ ├── codex.json
│ │ └── gemini.json
│ ├── custom_models.json
│ ├── dial_models.json
│ ├── gemini_models.json
│ ├── openai_models.json
│ ├── openrouter_models.json
│ └── xai_models.json
├── config.py
├── docker
│ ├── README.md
│ └── scripts
│ ├── build.ps1
│ ├── build.sh
│ ├── deploy.ps1
│ ├── deploy.sh
│ └── healthcheck.py
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── adding_providers.md
│ ├── adding_tools.md
│ ├── advanced-usage.md
│ ├── ai_banter.md
│ ├── ai-collaboration.md
│ ├── azure_openai.md
│ ├── configuration.md
│ ├── context-revival.md
│ ├── contributions.md
│ ├── custom_models.md
│ ├── docker-deployment.md
│ ├── gemini-setup.md
│ ├── getting-started.md
│ ├── index.md
│ ├── locale-configuration.md
│ ├── logging.md
│ ├── model_ranking.md
│ ├── testing.md
│ ├── tools
│ │ ├── analyze.md
│ │ ├── apilookup.md
│ │ ├── challenge.md
│ │ ├── chat.md
│ │ ├── clink.md
│ │ ├── codereview.md
│ │ ├── consensus.md
│ │ ├── debug.md
│ │ ├── docgen.md
│ │ ├── listmodels.md
│ │ ├── planner.md
│ │ ├── precommit.md
│ │ ├── refactor.md
│ │ ├── secaudit.md
│ │ ├── testgen.md
│ │ ├── thinkdeep.md
│ │ ├── tracer.md
│ │ └── version.md
│ ├── troubleshooting.md
│ ├── vcr-testing.md
│ └── wsl-setup.md
├── examples
│ ├── claude_config_macos.json
│ └── claude_config_wsl.json
├── LICENSE
├── providers
│ ├── __init__.py
│ ├── azure_openai.py
│ ├── base.py
│ ├── custom.py
│ ├── dial.py
│ ├── gemini.py
│ ├── openai_compatible.py
│ ├── openai.py
│ ├── openrouter.py
│ ├── registries
│ │ ├── __init__.py
│ │ ├── azure.py
│ │ ├── base.py
│ │ ├── custom.py
│ │ ├── dial.py
│ │ ├── gemini.py
│ │ ├── openai.py
│ │ ├── openrouter.py
│ │ └── xai.py
│ ├── registry_provider_mixin.py
│ ├── registry.py
│ ├── shared
│ │ ├── __init__.py
│ │ ├── model_capabilities.py
│ │ ├── model_response.py
│ │ ├── provider_type.py
│ │ └── temperature.py
│ └── xai.py
├── pyproject.toml
├── pytest.ini
├── README.md
├── requirements-dev.txt
├── requirements.txt
├── run_integration_tests.ps1
├── run_integration_tests.sh
├── run-server.ps1
├── run-server.sh
├── scripts
│ └── sync_version.py
├── server.py
├── simulator_tests
│ ├── __init__.py
│ ├── base_test.py
│ ├── conversation_base_test.py
│ ├── log_utils.py
│ ├── test_analyze_validation.py
│ ├── test_basic_conversation.py
│ ├── test_chat_simple_validation.py
│ ├── test_codereview_validation.py
│ ├── test_consensus_conversation.py
│ ├── test_consensus_three_models.py
│ ├── test_consensus_workflow_accurate.py
│ ├── test_content_validation.py
│ ├── test_conversation_chain_validation.py
│ ├── test_cross_tool_comprehensive.py
│ ├── test_cross_tool_continuation.py
│ ├── test_debug_certain_confidence.py
│ ├── test_debug_validation.py
│ ├── test_line_number_validation.py
│ ├── test_logs_validation.py
│ ├── test_model_thinking_config.py
│ ├── test_o3_model_selection.py
│ ├── test_o3_pro_expensive.py
│ ├── test_ollama_custom_url.py
│ ├── test_openrouter_fallback.py
│ ├── test_openrouter_models.py
│ ├── test_per_tool_deduplication.py
│ ├── test_planner_continuation_history.py
│ ├── test_planner_validation_old.py
│ ├── test_planner_validation.py
│ ├── test_precommitworkflow_validation.py
│ ├── test_prompt_size_limit_bug.py
│ ├── test_refactor_validation.py
│ ├── test_secaudit_validation.py
│ ├── test_testgen_validation.py
│ ├── test_thinkdeep_validation.py
│ ├── test_token_allocation_validation.py
│ ├── test_vision_capability.py
│ └── test_xai_models.py
├── systemprompts
│ ├── __init__.py
│ ├── analyze_prompt.py
│ ├── chat_prompt.py
│ ├── clink
│ │ ├── codex_codereviewer.txt
│ │ ├── default_codereviewer.txt
│ │ ├── default_planner.txt
│ │ └── default.txt
│ ├── codereview_prompt.py
│ ├── consensus_prompt.py
│ ├── debug_prompt.py
│ ├── docgen_prompt.py
│ ├── generate_code_prompt.py
│ ├── planner_prompt.py
│ ├── precommit_prompt.py
│ ├── refactor_prompt.py
│ ├── secaudit_prompt.py
│ ├── testgen_prompt.py
│ ├── thinkdeep_prompt.py
│ └── tracer_prompt.py
├── tests
│ ├── __init__.py
│ ├── CASSETTE_MAINTENANCE.md
│ ├── conftest.py
│ ├── gemini_cassettes
│ │ ├── chat_codegen
│ │ │ └── gemini25_pro_calculator
│ │ │ └── mldev.json
│ │ ├── chat_cross
│ │ │ └── step1_gemini25_flash_number
│ │ │ └── mldev.json
│ │ └── consensus
│ │ └── step2_gemini25_flash_against
│ │ └── mldev.json
│ ├── http_transport_recorder.py
│ ├── mock_helpers.py
│ ├── openai_cassettes
│ │ ├── chat_cross_step2_gpt5_reminder.json
│ │ ├── chat_gpt5_continuation.json
│ │ ├── chat_gpt5_moon_distance.json
│ │ ├── consensus_step1_gpt5_for.json
│ │ └── o3_pro_basic_math.json
│ ├── pii_sanitizer.py
│ ├── sanitize_cassettes.py
│ ├── test_alias_target_restrictions.py
│ ├── test_auto_mode_comprehensive.py
│ ├── test_auto_mode_custom_provider_only.py
│ ├── test_auto_mode_model_listing.py
│ ├── test_auto_mode_provider_selection.py
│ ├── test_auto_mode.py
│ ├── test_auto_model_planner_fix.py
│ ├── test_azure_openai_provider.py
│ ├── test_buggy_behavior_prevention.py
│ ├── test_cassette_semantic_matching.py
│ ├── test_challenge.py
│ ├── test_chat_codegen_integration.py
│ ├── test_chat_cross_model_continuation.py
│ ├── test_chat_openai_integration.py
│ ├── test_chat_simple.py
│ ├── test_clink_claude_agent.py
│ ├── test_clink_claude_parser.py
│ ├── test_clink_codex_agent.py
│ ├── test_clink_gemini_agent.py
│ ├── test_clink_gemini_parser.py
│ ├── test_clink_integration.py
│ ├── test_clink_parsers.py
│ ├── test_clink_tool.py
│ ├── test_collaboration.py
│ ├── test_config.py
│ ├── test_consensus_integration.py
│ ├── test_consensus_schema.py
│ ├── test_consensus.py
│ ├── test_conversation_continuation_integration.py
│ ├── test_conversation_field_mapping.py
│ ├── test_conversation_file_features.py
│ ├── test_conversation_memory.py
│ ├── test_conversation_missing_files.py
│ ├── test_custom_openai_temperature_fix.py
│ ├── test_custom_provider.py
│ ├── test_debug.py
│ ├── test_deploy_scripts.py
│ ├── test_dial_provider.py
│ ├── test_directory_expansion_tracking.py
│ ├── test_disabled_tools.py
│ ├── test_docker_claude_desktop_integration.py
│ ├── test_docker_config_complete.py
│ ├── test_docker_healthcheck.py
│ ├── test_docker_implementation.py
│ ├── test_docker_mcp_validation.py
│ ├── test_docker_security.py
│ ├── test_docker_volume_persistence.py
│ ├── test_file_protection.py
│ ├── test_gemini_token_usage.py
│ ├── test_image_support_integration.py
│ ├── test_image_validation.py
│ ├── test_integration_utf8.py
│ ├── test_intelligent_fallback.py
│ ├── test_issue_245_simple.py
│ ├── test_large_prompt_handling.py
│ ├── test_line_numbers_integration.py
│ ├── test_listmodels_restrictions.py
│ ├── test_listmodels.py
│ ├── test_mcp_error_handling.py
│ ├── test_model_enumeration.py
│ ├── test_model_metadata_continuation.py
│ ├── test_model_resolution_bug.py
│ ├── test_model_restrictions.py
│ ├── test_o3_pro_output_text_fix.py
│ ├── test_o3_temperature_fix_simple.py
│ ├── test_openai_compatible_token_usage.py
│ ├── test_openai_provider.py
│ ├── test_openrouter_provider.py
│ ├── test_openrouter_registry.py
│ ├── test_parse_model_option.py
│ ├── test_per_tool_model_defaults.py
│ ├── test_pii_sanitizer.py
│ ├── test_pip_detection_fix.py
│ ├── test_planner.py
│ ├── test_precommit_workflow.py
│ ├── test_prompt_regression.py
│ ├── test_prompt_size_limit_bug_fix.py
│ ├── test_provider_retry_logic.py
│ ├── test_provider_routing_bugs.py
│ ├── test_provider_utf8.py
│ ├── test_providers.py
│ ├── test_rate_limit_patterns.py
│ ├── test_refactor.py
│ ├── test_secaudit.py
│ ├── test_server.py
│ ├── test_supported_models_aliases.py
│ ├── test_thinking_modes.py
│ ├── test_tools.py
│ ├── test_tracer.py
│ ├── test_utf8_localization.py
│ ├── test_utils.py
│ ├── test_uvx_resource_packaging.py
│ ├── test_uvx_support.py
│ ├── test_workflow_file_embedding.py
│ ├── test_workflow_metadata.py
│ ├── test_workflow_prompt_size_validation_simple.py
│ ├── test_workflow_utf8.py
│ ├── test_xai_provider.py
│ ├── transport_helpers.py
│ └── triangle.png
├── tools
│ ├── __init__.py
│ ├── analyze.py
│ ├── apilookup.py
│ ├── challenge.py
│ ├── chat.py
│ ├── clink.py
│ ├── codereview.py
│ ├── consensus.py
│ ├── debug.py
│ ├── docgen.py
│ ├── listmodels.py
│ ├── models.py
│ ├── planner.py
│ ├── precommit.py
│ ├── refactor.py
│ ├── secaudit.py
│ ├── shared
│ │ ├── __init__.py
│ │ ├── base_models.py
│ │ ├── base_tool.py
│ │ ├── exceptions.py
│ │ └── schema_builders.py
│ ├── simple
│ │ ├── __init__.py
│ │ └── base.py
│ ├── testgen.py
│ ├── thinkdeep.py
│ ├── tracer.py
│ ├── version.py
│ └── workflow
│ ├── __init__.py
│ ├── base.py
│ ├── schema_builders.py
│ └── workflow_mixin.py
├── utils
│ ├── __init__.py
│ ├── client_info.py
│ ├── conversation_memory.py
│ ├── env.py
│ ├── file_types.py
│ ├── file_utils.py
│ ├── image_utils.py
│ ├── model_context.py
│ ├── model_restrictions.py
│ ├── security_config.py
│ ├── storage_backend.py
│ └── token_utils.py
└── zen-mcp-server
```
# Files
--------------------------------------------------------------------------------
/CHANGELOG.md:
--------------------------------------------------------------------------------
```markdown
# CHANGELOG
<!-- version list -->
## v9.1.3 (2025-10-22)
### Bug Fixes
- Reduced token usage, removed parameters from schema that CLIs never seem to use
([`3e27319`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/3e27319e60b0287df918856b58b2bbf042c948a8))
- Telemetry option no longer available in gemini 0.11
([`2a8dff0`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/2a8dff0cc8a3f33111533cdb971d654637ed0578))
### Chores
- Sync version to config.py [skip ci]
([`9e163f9`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/9e163f9dc0654fc28961c9897b7c787a2b96e57d))
- Sync version to config.py [skip ci]
([`557e443`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/557e443a63ffd733fb41faaa8696f6f4bb2c2fd1))
### Refactoring
- Improved precommit system prompt
([`3efff60`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/3efff6056e322ee1531d7bed5601038c129a8b29))
## v9.1.2 (2025-10-21)
### Bug Fixes
- Configure codex with a longer timeout
([`d2773f4`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/d2773f488af28986632846652874de9ff633049c))
- Handle claude's array style JSON https://github.com/BeehiveInnovations/zen-mcp-server/issues/295
([`d5790a9`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/d5790a9bfef719f03d17f2d719f1882e55d13b3b))
### Chores
- Sync version to config.py [skip ci]
([`04132f1`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/04132f1459f1e086afd8e3d456f671b63338f846))
## v9.1.1 (2025-10-17)
### Bug Fixes
- Failing test
([`aed3e3e`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/aed3e3ee80c440ac8ab0d4abbf235b84df723d18))
- Handler for parsing multiple generated code blocks
([`f4c20d2`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/f4c20d2a20e1c57d8b10e8f508e07e2a8d72f94a))
- Improved error reporting; codex cli would at times fail to figure out how to handle plain-text /
JSON errors
([`95e69a7`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/95e69a7cb234305dcd37dcdd2f22be715922e9a8))
### Chores
- Sync version to config.py [skip ci]
([`942757a`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/942757a360a74c021b2a1aa63e394f18f5abcecd))
## v9.1.0 (2025-10-17)
### Chores
- Sync version to config.py [skip ci]
([`3ee0c8f`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/3ee0c8f555cb51b975700290919c2a8e2ada8cc4))
### Features
- Enhance review prompts to emphasize static analysis
([`36e66e2`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/36e66e2e9a44a73a466545d4d3477ecb2cb3e669))
## v9.0.4 (2025-10-17)
### Chores
- Sync version to config.py [skip ci]
([`8c6f653`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/8c6f6532d843f7f1b283ce9b6472e5ba991efe16))
## v9.0.3 (2025-10-16)
### Bug Fixes
- Remove duplicate -o json flag in gemini CLI config
([`3b2eff5`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/3b2eff58ac0e2388045a7442c63f56ce259b54ba))
### Chores
- Sync version to config.py [skip ci]
([`b205d71`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/b205d7159b674ce47ebc11af7255d1e3556fff93))
## v9.0.2 (2025-10-15)
### Bug Fixes
- Update Claude CLI commands to new mcp syntax
([`a2189cb`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/a2189cb88a295ebad6268b9b08c893cd65bc1d89))
### Chores
- Sync version to config.py [skip ci]
([`d08cdc6`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/d08cdc6691e0f68917f2824945905b7256e0e568))
## v9.0.1 (2025-10-14)
### Bug Fixes
- Add JSON output flag to gemini CLI configuration
([`eb3dff8`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/eb3dff845828f60ff2659586883af622b8b035eb))
### Chores
- Sync version to config.py [skip ci]
([`b9408aa`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/b9408aae8860d43b1da0ba67f9db98db7e4de2cf))
## v9.0.0 (2025-10-08)
### Chores
- Sync version to config.py [skip ci]
([`23c9b35`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/23c9b35d5226b07b59a4c4b3d7833ba81b019ea8))
### Features
- Claude Code as a CLI agent now supported. Mix and match: spawn claude code from within claude
code, or claude code from within codex.
([`4cfaa0b`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/4cfaa0b6060769adfbd785a072526a5368421a73))
## v8.0.2 (2025-10-08)
### Bug Fixes
- Restore run-server quote trimming regex
([`1de4542`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/1de454224c105891137134e2a25c2ee4f00dba45))
### Chores
- Sync version to config.py [skip ci]
([`728fb43`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/728fb439b929c9dc37646b24537ae043208fda7d))
## v8.0.1 (2025-10-08)
### Bug Fixes
- Resolve executable path for cross-platform compatibility in CLI agent
([`f98046c`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/f98046c2fccaa7f9a24665a0d705a98006461da5))
### Chores
- Sync version to config.py [skip ci]
([`52245b9`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/52245b91eaa5d720f8c3b21ead55248dd8e8bd57))
### Testing
- Fix clink agent tests to mock shutil.which() for executable resolution
([`4370be3`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/4370be33b4b69a40456527213bcd62321a925a57))
## v8.0.0 (2025-10-07)
### Chores
- Sync version to config.py [skip ci]
([`4c34541`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/4c3454121c3c678cdfe8ea03fa77f4dd414df9bc))
## v7.8.1 (2025-10-07)
### Bug Fixes
- Updated model description to fix test
([`04f7ce5`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/04f7ce5b03804564263f53a765931edba9c320cd))
### Chores
- Sync version to config.py [skip ci]
([`c27e81d`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/c27e81d6d2f22978816f798a161a869d1ab5f025))
### Refactoring
- Moved registries into a separate module and code cleanup
([`7c36b92`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/7c36b9255a13007a10af4fadefc21aadfce482b0))
## v7.8.0 (2025-10-07)
### Chores
- Sync version to config.py [skip ci]
([`3e5fa96`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/3e5fa96c981bbd7b844a9887a518ffe266b78e9b))
### Documentation
- Consensus video
([`2352684`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/23526841922a73c68094e5205e19af04a1f6c8cc))
- Formatting
([`7d7c74b`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/7d7c74b5a38b7d1adf132b8e28034017df7aa852))
- Link to videos from main page
([`e8ef193`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/e8ef193daba393b55a3beaaba49721bb9182378a))
- Update README.md
([`7b13543`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/7b13543824fc0af729daf753ecdddba9ee7d9f1e))
### Features
- All native providers now read from catalog files like OpenRouter / Custom configs. Allows for
greater control over the capabilities
([`2a706d5`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/2a706d5720c0bf97b71c3e0fc95c15f78015bedf))
- Provider cleanup
([`9268dda`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/9268ddad2a07306351765b47098134512739f49f))
### Refactoring
- New base class for model registry / loading
([`02d13da`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/02d13da897016d7491b4a10a1195983385d66654))
## v7.7.0 (2025-10-07)
### Chores
- Sync version to config.py [skip ci]
([`70ae62a`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/70ae62a2cd663c3abcabddd1be1bc6ed9512d7df))
### Documentation
- Video
([`ed5dda7`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/ed5dda7c5a9439c2835cc69d76e6377169ad048a))
### Features
- More aliases
([`5f0aaf5`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/5f0aaf5f69c9d188d817b5ffbf6738c61da40ec7))
## v7.6.0 (2025-10-07)
### Chores
- Sync version to config.py [skip ci]
([`c1c75ba`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/c1c75ba304c2840329650c46273e87eab9b05906))
- Sync version to config.py [skip ci]
([`0fa9b66`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/0fa9b6658099c8e0d79fda0c7d2347f62d0e6137))
### Documentation
- Info about AI client timeouts
([`3ddfed5`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/3ddfed5ef09000791e1c94b041c43dc273ed53a8))
### Features
- Add support for openai/gpt-5-pro model
([`abed075`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/abed075b2eaa99e9618202f47ff921094baae952))
## v7.5.2 (2025-10-06)
### Bug Fixes
- Handle 429 response https://github.com/BeehiveInnovations/zen-mcp-server/issues/273
([`cbe1d79`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/cbe1d7993276bd014b495cbd2d0ece1f5d7583d9))
### Chores
- Sync version to config.py [skip ci]
([`74fdd36`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/74fdd36de92d34681fcc5a2f772c3d05634f0a55))
## v7.5.1 (2025-10-06)
### Chores
- Sync version to config.py [skip ci]
([`004e379`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/004e379cf2f1853829dccb15fa72ec18d282f1a4))
## v7.5.0 (2025-10-06)
### Chores
- Sync version to config.py [skip ci]
([`71e7cd5`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/71e7cd55b1f4955a6d718fddc0de419414d133b6))
### Documentation
- Video
([`775e4d5`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/775e4d50b826858095c5f2a61a07fc01c4a00816))
- Videos
([`bb2066c`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/bb2066c909f6581ba40fc5ddef3870954ae553ab))
### Features
- Support for GPT-5-Pro highest reasoning model
https://github.com/BeehiveInnovations/zen-mcp-server/issues/275
([`a65485a`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/a65485a1e52fc79739000426295a27d096f4c9d8))
## v7.4.0 (2025-10-06)
### Chores
- Sync version to config.py [skip ci]
([`76bf98e`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/76bf98e5cd972dabd3c79b25fcb9b9a717b23f6d))
### Features
- Improved prompt
([`b1e9963`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/b1e9963991a41dff082ec1dce5691c318f105e6d))
## v7.3.0 (2025-10-06)
### Chores
- Sync version to config.py [skip ci]
([`e7920d0`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/e7920d0ed16c0e6de9d1ccaa0b58d3fb5cbd7f2f))
### Documentation
- Fixed typo
([`3ab0aa8`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/3ab0aa8314ad5992bcb00de549a0fab2e522751d))
- Fixed typo
([`c17ce3c`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/c17ce3cf958d488b97fa7127942542ab514b58bd))
- Update apilookup.md
([`1918679`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/19186794edac4fce5523e671310aecff4cbfdc81))
- Update README.md
([`23c6c78`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/23c6c78bf152ede6e7b5f7b7770b12a8442845a3))
### Features
- Codex supports web-search natively but needs to be turned on, run-server script asks if the user
would like this done
([`97ba7e4`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/97ba7e44ce7e3fd874759514ed2f0738033fc801))
## v7.2.0 (2025-10-06)
### Chores
- Sync version to config.py [skip ci]
([`1854b1e`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/1854b1e26b705cda0dc3f4d733647f1454aa0352))
### Documentation
- Updated
([`bb57f71`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/bb57f719666ab6a586d835688ff8086282a5a0dc))
### Features
- New tool to perform apilookup (latest APIs / SDKs / language features etc)
https://github.com/BeehiveInnovations/zen-mcp-server/issues/204
([`5bea595`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/5bea59540f58b3c45044828c10f131aed104dd1c))
### Refactoring
- De-duplicate roles to avoid explosion when more CLIs get added
([`c42e9e9`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/c42e9e9c34d7ae4732e2e4fbed579b681a6d170d))
## v7.1.1 (2025-10-06)
### Bug Fixes
- Clink missing in toml
([`1ff77fa`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/1ff77faa800ad6c2dde49cad98dfa72035fe1c81))
### Chores
- Sync version to config.py [skip ci]
([`e02e78d`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/e02e78d903b35f4c01b8039f4157e97b38d3ec7b))
### Documentation
- Example for codex cli
([`344c42b`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/344c42bcbfb543bfd05cbc27fd5b419c76b77954))
- Example for codex cli
([`c3044de`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/c3044de7424e638dde5c8ec49adb6c3c7c5a60b2))
- Update README.md
([`2e719ae`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/2e719ae35e7979f7b83bd910867e79863a7f9ceb))
## v7.1.0 (2025-10-05)
### Chores
- Sync version to config.py [skip ci]
([`d54bfdd`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/d54bfdd49797d076ec9cade44c56292a8089c744))
### Features
- Support for codex as external CLI
([`561e4aa`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/561e4aaaa8a89eb89c03985b9e7720cc98ef666c))
## v7.0.2 (2025-10-05)
### Chores
- Sync version to config.py [skip ci]
([`f2142a2`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/f2142a22ec50abc54b464eedd6b8239d20c509be))
## v7.0.1 (2025-10-05)
### Bug Fixes
- --yolo needed for running shell commands, documentation added
([`15ae3f2`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/15ae3f24babccf42f43be5028bf8c60c05a6beaf))
### Chores
- Sync version to config.py [skip ci]
([`bc4a27b`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/bc4a27b18a4a3f45afb22178e61ea0be4d6a273c))
### Documentation
- Updated intro
([`fb668c3`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/fb668c39b5f6e3dd37f7027f953f6004f258f2bf))
## v7.0.0 (2025-10-05)
### Chores
- Sync version to config.py [skip ci]
([`0d46976`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/0d46976a8aa85254e4dbe06f5e71161cd3b13938))
- Sync version to config.py [skip ci]
([`8296bf8`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/8296bf871c39597a904c70e7d98c72fcb4dc5a84))
### Documentation
- Instructions for OpenCode
([`bd66622`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/bd666227c8f7557483f7e24fb8544fc0456600dc))
- Updated intro
([`615873c`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/615873c3db2ecf5ce6475caa3445e1da9a2517bd))
### Features
- Huge update - Link another CLI (such as `gemini`) directly from within Claude Code / Codex.
https://github.com/BeehiveInnovations/zen-mcp-server/issues/208
([`a2ccb48`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/a2ccb48e9a5080a75dbfd483b5f09fc719c887e5))
### Refactoring
- Fixed test
([`9c99b9b`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/9c99b9b35219f54db8d7be0958d4390a106631ae))
- Include file modification dates too
([`47973e9`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/47973e945efa2cdbdb8f3404d467d7f1abc62b0a))
## v6.1.0 (2025-10-04)
### Chores
- Sync version to config.py [skip ci]
([`18095d7`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/18095d7d398e4bf3d24c57a52c81ac619acb1b89))
### Documentation
- Updated intro
([`aa65394`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/aa6539472c4ddf1c3c1bac446fdee03e75e1cb50))
### Features
- Support for Qwen Code
([`fe9968b`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/fe9968b633d0312b82426e9ebddfe1d6515be3c5))
## v6.0.0 (2025-10-04)
### Chores
- Sync version to config.py [skip ci]
([`ae8749a`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/ae8749ab37bdaa7e225b5219820adeb74ca9a552))
### Documentation
- Updated
([`e91ed2a`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/e91ed2a924b1702edf9e1417479ac0dee0ca1553))
### Features
- Azure OpenAI / Azure AI Foundry support. Models should be defined in conf/azure_models.json (or a
custom path). See .env.example for environment variables or see readme.
https://github.com/BeehiveInnovations/zen-mcp-server/issues/265
([`ff9a07a`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/ff9a07a37adf7a24aa87c63b3ba9db88bdff467b))
- Breaking change - OpenRouter models are now read from conf/openrouter_models.json while Custom /
Self-hosted models are read from conf/custom_models.json
([`ff9a07a`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/ff9a07a37adf7a24aa87c63b3ba9db88bdff467b))
- OpenAI/compatible models (such as Azure OpenAI) can declare if they use the response API instead
via `use_openai_responses_api`
([`3824d13`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/3824d131618683572e9e8fffa6b25ccfabf4cf50))
- OpenRouter / Custom Models / Azure can separately also use custom config paths now (see
.env.example )
([`ff9a07a`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/ff9a07a37adf7a24aa87c63b3ba9db88bdff467b))
### Refactoring
- Breaking change: `is_custom` property has been removed from model_capabilities.py (and thus
custom_models.json) given models are now read from separate configuration files
([`ff9a07a`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/ff9a07a37adf7a24aa87c63b3ba9db88bdff467b))
- Model registry class made abstract, OpenRouter / Custom Provider / Azure OpenAI now subclass these
([`ff9a07a`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/ff9a07a37adf7a24aa87c63b3ba9db88bdff467b))
## v5.22.0 (2025-10-04)
### Bug Fixes
- CI test
([`bc93b53`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/bc93b5343bbd8657b95ab47c00a2cb99a68a009f))
- Listmodels to always honor restricted models
([`4015e91`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/4015e917ed32ae374ec6493b74993fcb34f4a971))
### Chores
- Sync version to config.py [skip ci]
([`054e34e`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/054e34e31ca5bee5a11c0e3e6537f58e8897c79c))
- Sync version to config.py [skip ci]
([`c0334d7`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/c0334d77922f1b05e3fd755851da112567fb9ae6))
### Features
- Centralized environment handling, ensures ZEN_MCP_FORCE_ENV_OVERRIDE is honored correctly
([`2c534ac`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/2c534ac06e4c6078b96781dfb55c5759b982afe8))
### Refactoring
- Don't retry on 429
([`d184024`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/d18402482087f52b7bd07755c9304ed00ed20592))
- Improved retry logic and moved core logic to base class
([`f955100`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/f955100f3a82973ccd987607e1d8a1bbe07828c8))
- Removed subclass override when the base class should be resolving the model name
([`06d7701`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/06d7701cc3ee09732ab713fa9c7c004199154483))
## v5.21.0 (2025-10-03)
### Chores
- Sync version to config.py [skip ci]
([`ddb20a6`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/ddb20a6cdb8cdeee27c0aacb0b9c794283b5774c))
## v5.20.1 (2025-10-03)
### Chores
- Sync version to config.py [skip ci]
([`03addcf`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/03addcfa2d3aed5086fe4c94e8b9ae56229a93ae))
## v5.20.0 (2025-10-03)
### Chores
- Sync version to config.py [skip ci]
([`539bc72`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/539bc72f1ca2a2cadcccad02de1fd5fc22cd0415))
## v5.19.0 (2025-10-03)
### Bug Fixes
- Add GPT-5-Codex to Responses API routing and simplify comments
([`82b021d`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/82b021d75acc791e68c7afb35f6492f68cf02bec))
### Chores
- Sync version to config.py [skip ci]
([`8e32ef3`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/8e32ef33e3ce7ab2a9d7eb5c90fe5b93b12d5c26))
### Documentation
- Bumped defaults
([`95d98a9`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/95d98a9bc0a5bafadccb9f6d1e4eda97a0dd2ce7))
### Features
- Add GPT-5-Codex support with Responses API integration
([`f265342`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/f2653427ca829368e7145325d20a98df3ee6d6b4))
### Testing
- Cross tool memory recall, testing continuation via cassette recording
([`88493bd`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/88493bd357c6a12477c3160813100dae1bc46493))
## v5.18.3 (2025-10-03)
### Bug Fixes
- External model name now recorded properly in responses
([`d55130a`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/d55130a430401e106cd86f3e830b3d756472b7ff))
### Chores
- Sync version to config.py [skip ci]
([`5714e20`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/5714e2016405f7607b44d78f85081c7ccee706e5))
### Documentation
- Updated docs
([`b4e5090`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/b4e50901ba60c88137a29d00ecf99718582856d3))
### Refactoring
- Generic name for the CLI agent
([`e9b6947`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/e9b69476cd922c12931d62ccc3be9082bbbf6014))
- Generic name for the CLI agent
([`7a6fa0e`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/7a6fa0e77a8c4a682dc11c9bbb16bdaf86d9edf4))
- Generic name for the CLI agent
([`b692da2`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/b692da2a82facce7455b8f2ec0108e1db84c07c3))
- Generic name for the CLI agent
([`f76ebbf`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/f76ebbf280cc78ffcfe17cb4590aeaa231db8aa1))
- Generic name for the CLI agent
([`c05913a`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/c05913a09e53e195b9a108647c09c061ced19d17))
- Generic name for the CLI agent
([`0dfaa63`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/0dfaa6312ed95ac3d1ae0032334ae1286871b15e))
### Testing
- Fixed integration tests, removed magicmock
([`87ccb6b`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/87ccb6b25ba32a3cb9c4cc64fc0e96294f492c04))
## v5.18.2 (2025-10-02)
### Bug Fixes
- Https://github.com/BeehiveInnovations/zen-mcp-server/issues/194
([`8b3a286`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/8b3a2867fb83eccb3a8e8467e7e3fc5b8ebe1d0c))
### Chores
- Sync version to config.py [skip ci]
([`bf2196c`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/bf2196cdd58ae8d8d93597f2be69c798265d678f))
## v5.18.1 (2025-10-02)
### Chores
- Sync version to config.py [skip ci]
([`e434a26`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/e434a2614af82efd15de4dd94b2c30559c91414e))
## v5.18.0 (2025-10-02)
### Chores
- Sync version to config.py [skip ci]
([`e78fe35`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/e78fe35a1b64cc0ed89664440ef7c7b94495d7dc))
### Features
- Added `intelligence_score` to the model capabilities schema; a 1-20 number that can be specified
to influence the sort order of models presented to the CLI in `auto selection` mode
([`6cab9e5`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/6cab9e56fc5373da5c11d4545bcb85371d4803a4))
## v5.17.4 (2025-10-02)
### Chores
- Sync version to config.py [skip ci]
([`a6c9b92`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/a6c9b9212c77852d9e9a8780f4bc3e53b3bfed2f))
## v5.17.3 (2025-10-02)
### Chores
- Sync version to config.py [skip ci]
([`722f6f8`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/722f6f86ae228206ce0094d109a3b20499d4e11a))
## v5.17.2 (2025-10-02)
### Chores
- Sync version to config.py [skip ci]
([`e47a7e8`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/e47a7e89d5bfad0bb0150cb3207f1a37dc91b170))
## v5.17.1 (2025-10-02)
### Bug Fixes
- Baseclass should return MODEL_CAPABILITIES
([`82a03ce`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/82a03ce63f28fece17bfc1d70bdb75aadec4c6bb))
### Chores
- Sync version to config.py [skip ci]
([`7ce66bd`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/7ce66bd9508865cef64dc30936e86e37c1a306d0))
### Documentation
- Document custom timeout values
([`218fbdf`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/218fbdf49cb90f2353f58bbaef567519dd876634))
### Refactoring
- Clean temperature inference
([`9c11ecc`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/9c11ecc4bf37562aa08dc3ecfa70f380e0ead357))
- Cleanup
([`6ec2033`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/6ec2033f34c74ad139036de83a34cf6d374db77b))
- Cleanup provider base class; cleanup shared responsibilities; cleanup public contract
([`693b84d`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/693b84db2b87271ac809abcf02100eee7405720b))
- Cleanup token counting
([`7fe9fc4`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/7fe9fc49f8e3cd92be4c45a6645d5d4ab3014091))
- Code cleanup
([`bb138e2`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/bb138e2fb552f837b0f9f466027580e1feb26f7c))
- Code cleanup
([`182aa62`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/182aa627dfba6c578089f83444882cdd2635a7e3))
- Moved image related code out of base provider into a separate utility
([`14a35af`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/14a35afa1d25408e62b968d9846be7bffaede327))
- Moved temperature method from base provider to model capabilities
([`6d237d0`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/6d237d09709f757a042baf655f47eb4ddfc078ad))
- Moved temperature method from base provider to model capabilities
([`f461cb4`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/f461cb451953f882bbde096a9ecf0584deb1dde8))
- Removed hard coded checks, use model capabilities instead
([`250545e`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/250545e34f8d4f8026bfebb3171f3c2bc40f4692))
- Removed hook from base class, turned into helper static method instead
([`2b10adc`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/2b10adcaf2b8741f0da5de84cc3483eae742a014))
- Removed method from provider, should use model capabilities instead
([`a254ff2`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/a254ff2220ba00ec30f5110c69a4841419917382))
- Renaming to reflect underlying type
([`1dc25f6`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/1dc25f6c3d4cdbf01f041cc424e3b5235c23175b))
## v5.17.0 (2025-10-02)
### Bug Fixes
- Use types.HttpOptions from module imports instead of local import
([`956e8a6`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/956e8a6927837f5c7f031a0db1dd0b0b5483c626))
### Chores
- Sync version to config.py [skip ci]
([`0836213`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/0836213071d0037d8a6d2e64d34ab5df79b8e684))
### Code Style
- Apply Black formatting to use double quotes
([`33ea896`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/33ea896c511764904bf2b6b22df823928f88a148))
### Features
- Add custom Gemini endpoint support
([`462bce0`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/462bce002e2141b342260969588e69f55f8bb46a))
### Refactoring
- Simplify Gemini provider initialization using kwargs dict
([`023940b`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/023940be3e38a7eedbc8bf8404a4a5afc50f8398))
## v5.16.0 (2025-10-01)
### Bug Fixes
- Resolve logging timing and import organization issues
([`d34c299`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/d34c299f02a233af4f17bdcc848219bf07799723))
### Chores
- Sync version to config.py [skip ci]
([`b6c4bca`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/b6c4bca158e4cee1ae4abd08b7e55216ebffba2d))
### Code Style
- Fix ruff import sorting issue
([`4493a69`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/4493a693332e0532d04ad3634de2a2f5b1249b64))
### Features
- Add configurable environment variable override system
([`93ce698`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/93ce6987b6e7d8678ffa5ac51f5106a7a21ce67b))
## v5.15.0 (2025-10-01)
### Chores
- Sync version to config.py [skip ci]
([`b0fe956`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/b0fe956f8a50240507e0fc911f0800634c15e9f7))
### Features
- Depending on the number of tools in use, this change should save ~50% of overall tokens used.
Fixes https://github.com/BeehiveInnovations/zen-mcp-server/issues/255 and also refactors
individual tools to encourage the agent to use the listmodels tool if needed.
([`d9449c7`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/d9449c7bb607caff3f0454f210ddfc36256c738a))
### Performance Improvements
- Tweaks to schema descriptions, aiming to reduce token usage without performance degradation
([`cc8a4df`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/cc8a4dfd21b6f3dae4972a833b619e53c964693b))
### Refactoring
- Trimmed some prompts
([`f69ff03`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/f69ff03c4d10e606a1dfed2a167f3ba2e2236ba8))
## v5.14.1 (2025-10-01)
### Bug Fixes
- Https://github.com/BeehiveInnovations/zen-mcp-server/issues/258
([`696b45f`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/696b45f25e80faccb67034254cf9a8fc4c643dbd))
### Chores
- Sync version to config.py [skip ci]
([`692016c`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/692016c6205ed0a0c3d9e830482d88231aca2e31))
## v5.14.0 (2025-10-01)
### Chores
- Sync version to config.py [skip ci]
([`c0f822f`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/c0f822ffa23292d668f7b5dd3cb62e3f23fb29af))
### Features
- Add Claude Sonnet 4.5 and update alias configuration
([`95c4822`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/95c4822af2dc55f59c0e4ed9454673d6ca964731))
### Testing
- Update tests to match new Claude Sonnet 4.5 alias configuration
([`7efb409`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/7efb4094d4eb7db006340d3d9240b9113ac25cd3))
## v5.13.0 (2025-10-01)
### Bug Fixes
- Add sonnet alias for Claude Sonnet 4.1 to match opus/haiku pattern
([`dc96344`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/dc96344db043e087ee4f8bf264a79c51dc2e0b7a))
- Missing "optenai/" in name
([`7371ed6`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/7371ed6487b7d90a1b225a67dca2a38c1a52f2ad))
### Chores
- Sync version to config.py [skip ci]
([`b8479fc`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/b8479fc638083d6caa4bad6205e3d3fcab830aca))
### Features
- Add comprehensive GPT-5 series model support
([`4930824`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/493082405237e66a2f033481a5f8bf8293b0d553))
## v5.12.1 (2025-10-01)
### Bug Fixes
- Resolve consensus tool model_context parameter missing issue
([`9044b63`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/9044b63809113047fe678d659e4fcd175f58e87a))
### Chores
- Sync version to config.py [skip ci]
([`e3ebf4e`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/e3ebf4e94eba63acdc4df5a0b0493e44e3343dd1))
### Code Style
- Fix trailing whitespace in consensus.py
([`0760b31`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/0760b31f8a6d03c4bea3fd2a94dfbbfab0ad5079))
### Refactoring
- Optimize ModelContext creation in consensus tool
([`30a8952`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/30a8952fbccd22bebebd14eb2c8005404b79bcd6))
## v5.12.0 (2025-10-01)
### Bug Fixes
- Removed use_websearch; this parameter was confusing Codex. It started using this to prompt the
external model to perform searches! web-search is enabled by Claude / Codex etc by default and the
external agent can ask claude to search on its behalf.
([`cff6d89`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/cff6d8998f64b73265c4e31b2352462d6afe377f))
### Chores
- Sync version to config.py [skip ci]
([`28cabe0`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/28cabe0833661b0bab56d4227781ee2da332b00c))
### Features
- Implement semantic cassette matching for o3 models
([`70fa088`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/70fa088c32ac4e6153d5e7b30a3e32022be2f908))
## v5.11.2 (2025-10-01)
### Chores
- Sync version to config.py [skip ci]
([`4d6f1b4`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/4d6f1b41005dee428c955e33f04f8f9f6259e662))
## v5.11.1 (2025-10-01)
### Bug Fixes
- Remove duplicate OpenAI models from listmodels output
([`c29e762`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/c29e7623ace257eb45396cdf8c19e1659e29edb9))
### Chores
- Sync version to config.py [skip ci]
([`1209064`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/12090646ee83f2368311d595d87ae947e46ddacd))
### Testing
- Update OpenAI provider alias tests to match new format
([`d13700c`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/d13700c14c7ee3d092302837cb1726d17bab1ab8))
## v5.11.0 (2025-08-26)
### Chores
- Sync version to config.py [skip ci]
([`9735469`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/973546990f2c45afa93f1aa6de33ff461ecf1a83))
### Features
- Codex CLI support
([`ce56d16`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/ce56d16240ddcc476145a512561efe5c66438f0d))
## v5.10.3 (2025-08-24)
### Bug Fixes
- Address test failures and PR feedback
([`6bd9d67`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/6bd9d6709acfb584ab30a0a4d6891cabdb6d3ccf))
- Resolve temperature handling issues for O3/custom models
([#245](https://github.com/BeehiveInnovations/zen-mcp-server/pull/245),
[`3b4fd88`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/3b4fd88d7e9a3f09fea616a10cb3e9d6c1a0d63b))
### Chores
- Sync version to config.py [skip ci]
([`d6e6808`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/d6e6808be525192ab8388c0f01bc1bbd016fc23a))
## v5.10.2 (2025-08-24)
### Bug Fixes
- Another fix for https://github.com/BeehiveInnovations/zen-mcp-server/issues/251
([`a07036e`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/a07036e6805042895109c00f921c58a09caaa319))
### Chores
- Sync version to config.py [skip ci]
([`9da5c37`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/9da5c37809cbde19d0c7ffed273ae93ca883a016))
## v5.10.0 (2025-08-22)
### Chores
- Sync version to config.py [skip ci]
([`1254205`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/12542054a214022d3f515e53367f5bf3a77fb289))
### Features
- Refactored and tweaked model descriptions / schema to use fewer tokens at launch (average
reduction per field description: 60-80%) without sacrificing tool effectiveness
([`4b202f5`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/4b202f5d1d24cea1394adab26a976188f847bd09))
## v5.9.0 (2025-08-21)
### Documentation
- Update instructions for precommit
([`90821b5`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/90821b51ff653475d9fb1bc70b57951d963e8841))
### Features
- Refactored and improved codereview in line with precommit. Reviews are now either external
(default) or internal. Takes away anxiety and loss of tokens when Claude incorrectly decides to be
'confident' about its own changes and bungle things up.
([`80d21e5`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/80d21e57c0246762c0a306ede5b93d6aeb2315d8))
### Refactoring
- Minor prompt tweaks
([`d30c212`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/d30c212029c05b767d99b5391c1dd4cee78ef336))
## v5.8.6 (2025-08-20)
### Bug Fixes
- Escape backslashes in TOML regex pattern
([`1c973af`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/1c973afb002650b9bbee8a831b756bef848915a1))
- Establish version 5.8.6 and add version sync automation
([`90a4195`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/90a419538128b54fbd30da4b8a8088ac59f8c691))
- Restore proper version 5.8.6
([`340b58f`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/340b58f2e790b84c3736aa96df7f6f5f2d6a13c9))
### Chores
- Sync version to config.py [skip ci]
([`4f82f65`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/4f82f6500502b7b6ba41875a560c41f6a63b683b))
## v1.1.0 (2025-08-20)
### Features
- Improvements to precommit
([`2966dcf`](https://github.com/BeehiveInnovations/zen-mcp-server/commit/2966dcf2682feb7eef4073738d0c225a44ce0533))
## v1.0.0 (2025-08-20)
- Initial Release
```
--------------------------------------------------------------------------------
/tools/consensus.py:
--------------------------------------------------------------------------------
```python
"""
Consensus tool - Step-by-step multi-model consensus with expert analysis
This tool provides a structured workflow for gathering consensus from multiple models.
It guides the CLI agent through systematic steps where the CLI agent first provides its own analysis,
then consults each requested model one by one, and finally synthesizes all perspectives.
Key features:
- Step-by-step consensus workflow with progress tracking
- The CLI agent's initial neutral analysis followed by model-specific consultations
- Context-aware file embedding
- Support for stance-based analysis (for/against/neutral)
- Final synthesis combining all perspectives
"""
from __future__ import annotations
import json
import logging
from typing import TYPE_CHECKING, Any
from pydantic import Field, model_validator
if TYPE_CHECKING:
from tools.models import ToolModelCategory
from mcp.types import TextContent
from config import TEMPERATURE_ANALYTICAL
from systemprompts import CONSENSUS_PROMPT
from tools.shared.base_models import ConsolidatedFindings, WorkflowRequest
from utils.conversation_memory import MAX_CONVERSATION_TURNS, create_thread, get_thread
from .workflow.base import WorkflowTool
logger = logging.getLogger(__name__)
# Tool-specific field descriptions for consensus workflow
CONSENSUS_WORKFLOW_FIELD_DESCRIPTIONS = {
"step": (
"Consensus prompt. Step 1: write the exact proposal/question every model will see (use 'Evaluate…', not meta commentary). "
"Steps 2+: capture internal notes about the latest model response—these notes are NOT sent to other models."
),
"step_number": "Current step index (starts at 1). Step 1 is your analysis; steps 2+ handle each model response.",
"total_steps": "Total steps = number of models consulted plus the final synthesis step.",
"next_step_required": "True if more model consultations remain; set false when ready to synthesize.",
"findings": (
"Step 1: your independent analysis for later synthesis (not shared with other models). Steps 2+: summarize the newest model response."
),
"relevant_files": "Optional supporting files that help the consensus analysis. Must be absolute full, non-abbreviated paths.",
"models": (
"User-specified list of models to consult (provide at least two entries). "
"Each entry may include model, stance (for/against/neutral), and stance_prompt. "
"Each (model, stance) pair must be unique, e.g. [{'model':'gpt5','stance':'for'}, {'model':'pro','stance':'against'}]."
),
"current_model_index": "0-based index of the next model to consult (managed internally).",
"model_responses": "Internal log of responses gathered so far.",
"images": "Optional absolute image paths or base64 references that add helpful visual context.",
}
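# Illustrative comment (not part of the original file): under the field descriptions above,
# a step-1 call might carry tool arguments shaped roughly like this (the proposal text and
# step count are hypothetical; the model names echo the example in the 'models' description):
#
#   {
#       "step": "Evaluate whether we should adopt X over Y ...",   # exact proposal every model sees
#       "step_number": 1,
#       "total_steps": 3,             # models consulted plus the final synthesis step
#       "next_step_required": True,
#       "findings": "My own neutral analysis of the proposal ...", # kept for later synthesis
#       "models": [
#           {"model": "gpt5", "stance": "for"},
#           {"model": "pro", "stance": "against"},
#       ],
#   }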
class ConsensusRequest(WorkflowRequest):
"""Request model for consensus workflow steps"""
# Required fields for each step
step: str = Field(..., description=CONSENSUS_WORKFLOW_FIELD_DESCRIPTIONS["step"])
step_number: int = Field(..., description=CONSENSUS_WORKFLOW_FIELD_DESCRIPTIONS["step_number"])
total_steps: int = Field(..., description=CONSENSUS_WORKFLOW_FIELD_DESCRIPTIONS["total_steps"])
next_step_required: bool = Field(..., description=CONSENSUS_WORKFLOW_FIELD_DESCRIPTIONS["next_step_required"])
# Investigation tracking fields
findings: str = Field(..., description=CONSENSUS_WORKFLOW_FIELD_DESCRIPTIONS["findings"])
confidence: str = Field(default="exploring", exclude=True, description="Not used")
# Consensus-specific fields (only needed in step 1)
models: list[dict] | None = Field(None, description=CONSENSUS_WORKFLOW_FIELD_DESCRIPTIONS["models"])
relevant_files: list[str] | None = Field(
default_factory=list,
description=CONSENSUS_WORKFLOW_FIELD_DESCRIPTIONS["relevant_files"],
)
# Internal tracking fields
current_model_index: int | None = Field(
0,
description=CONSENSUS_WORKFLOW_FIELD_DESCRIPTIONS["current_model_index"],
)
model_responses: list[dict] | None = Field(
default_factory=list,
description=CONSENSUS_WORKFLOW_FIELD_DESCRIPTIONS["model_responses"],
)
# Optional images for visual debugging
images: list[str] | None = Field(default=None, description=CONSENSUS_WORKFLOW_FIELD_DESCRIPTIONS["images"])
# Override inherited fields to exclude them from schema
temperature: float | None = Field(default=None, exclude=True)
thinking_mode: str | None = Field(default=None, exclude=True)
# Not used in consensus workflow
files_checked: list[str] | None = Field(default_factory=list, exclude=True)
relevant_context: list[str] | None = Field(default_factory=list, exclude=True)
issues_found: list[dict] | None = Field(default_factory=list, exclude=True)
hypothesis: str | None = Field(None, exclude=True)
@model_validator(mode="after")
def validate_step_one_requirements(self):
"""Ensure step 1 has required models field and unique model+stance combinations."""
if self.step_number == 1:
if not self.models:
raise ValueError("Step 1 requires 'models' field to specify which models to consult")
# Check for unique model + stance combinations
seen_combinations = set()
for model_config in self.models:
model_name = model_config.get("model", "")
stance = model_config.get("stance", "neutral")
combination = f"{model_name}:{stance}"
if combination in seen_combinations:
raise ValueError(
f"Duplicate model + stance combination found: {model_name} with stance '{stance}'. "
f"Each model + stance combination must be unique."
)
seen_combinations.add(combination)
return self
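# Illustrative comment (not in the original source): the validator above keys uniqueness on the
# (model, stance) pair, so step 1 may consult the same model twice with different stances, e.g.
#   [{"model": "gpt5", "stance": "for"}, {"model": "gpt5", "stance": "against"}]
# whereas repeating an identical pair raises
#   ValueError("Duplicate model + stance combination found: ...").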
class ConsensusTool(WorkflowTool):
"""
Consensus workflow tool for step-by-step multi-model consensus gathering.
This tool implements a structured consensus workflow where the CLI agent first provides
its own neutral analysis, then consults each specified model individually,
and finally synthesizes all perspectives into a unified recommendation.
"""
def __init__(self):
super().__init__()
self.initial_prompt: str | None = None
self.original_proposal: str | None = None # Store the original proposal separately
self.models_to_consult: list[dict] = []
self.accumulated_responses: list[dict] = []
self._current_arguments: dict[str, Any] = {}
def get_name(self) -> str:
return "consensus"
def get_description(self) -> str:
return (
"Builds multi-model consensus through systematic analysis and structured debate. "
"Use for complex decisions, architectural choices, feature proposals, and technology evaluations. "
"Consults multiple models with different stances to synthesize comprehensive recommendations."
)
def get_system_prompt(self) -> str:
# For the CLI agent's initial analysis, use a neutral version of the consensus prompt
return CONSENSUS_PROMPT.replace(
"{stance_prompt}",
"""BALANCED PERSPECTIVE
Provide objective analysis considering both positive and negative aspects. However, if there is overwhelming evidence
that the proposal clearly leans toward being exceptionally good or particularly problematic, you MUST accurately
reflect this reality. Being "balanced" means being truthful about the weight of evidence, not artificially creating
50/50 splits when the reality is 90/10.
Your analysis should:
- Present all significant pros and cons discovered
- Weight them according to actual impact and likelihood
- If evidence strongly favors one conclusion, clearly state this
- Provide proportional coverage based on the strength of arguments
- Help the questioner see the true balance of considerations
Remember: Artificial balance that misrepresents reality is not helpful. True balance means accurate representation
of the evidence, even when it strongly points in one direction.""",
)
def get_default_temperature(self) -> float:
return TEMPERATURE_ANALYTICAL
def get_model_category(self) -> ToolModelCategory:
"""Consensus workflow requires extended reasoning"""
from tools.models import ToolModelCategory
return ToolModelCategory.EXTENDED_REASONING
def get_workflow_request_model(self):
"""Return the consensus workflow-specific request model."""
return ConsensusRequest
def get_input_schema(self) -> dict[str, Any]:
"""Generate input schema for consensus workflow."""
from .workflow.schema_builders import WorkflowSchemaBuilder
# Consensus tool-specific field definitions
consensus_field_overrides = {
# Override standard workflow fields that need consensus-specific descriptions
"step": {
"type": "string",
"description": CONSENSUS_WORKFLOW_FIELD_DESCRIPTIONS["step"],
},
"step_number": {
"type": "integer",
"minimum": 1,
"description": CONSENSUS_WORKFLOW_FIELD_DESCRIPTIONS["step_number"],
},
"total_steps": {
"type": "integer",
"minimum": 1,
"description": CONSENSUS_WORKFLOW_FIELD_DESCRIPTIONS["total_steps"],
},
"next_step_required": {
"type": "boolean",
"description": CONSENSUS_WORKFLOW_FIELD_DESCRIPTIONS["next_step_required"],
},
"findings": {
"type": "string",
"description": CONSENSUS_WORKFLOW_FIELD_DESCRIPTIONS["findings"],
},
"relevant_files": {
"type": "array",
"items": {"type": "string"},
"description": CONSENSUS_WORKFLOW_FIELD_DESCRIPTIONS["relevant_files"],
},
# consensus-specific fields (not in base workflow)
"models": {
"type": "array",
"items": {
"type": "object",
"properties": {
"model": {"type": "string"},
"stance": {"type": "string", "enum": ["for", "against", "neutral"], "default": "neutral"},
"stance_prompt": {"type": "string"},
},
"required": ["model"],
},
"description": (
"User-specified roster of models to consult (provide at least two entries). "
+ CONSENSUS_WORKFLOW_FIELD_DESCRIPTIONS["models"]
),
"minItems": 2,
},
"current_model_index": {
"type": "integer",
"minimum": 0,
"description": CONSENSUS_WORKFLOW_FIELD_DESCRIPTIONS["current_model_index"],
},
"model_responses": {
"type": "array",
"items": {"type": "object"},
"description": CONSENSUS_WORKFLOW_FIELD_DESCRIPTIONS["model_responses"],
},
"images": {
"type": "array",
"items": {"type": "string"},
"description": CONSENSUS_WORKFLOW_FIELD_DESCRIPTIONS["images"],
},
}
# Provide guidance on available models similar to single-model tools
model_description = (
"When the user names a model, you MUST use that exact value or report the "
"provider error—never swap in another option. Use the `listmodels` tool for the full roster."
)
summaries, total, restricted = self._get_ranked_model_summaries()
remainder = max(0, total - len(summaries))
if summaries:
label = "Allowed models" if restricted else "Top models"
top_line = "; ".join(summaries)
if remainder > 0:
top_line = f"{label}: {top_line}; +{remainder} more via `listmodels`."
else:
top_line = f"{label}: {top_line}."
model_description = f"{model_description} {top_line}"
else:
model_description = (
f"{model_description} No models detected—configure provider credentials or use the `listmodels` tool "
"to inspect availability."
)
restriction_note = self._get_restriction_note()
if restriction_note and (remainder > 0 or not summaries):
model_description = f"{model_description} {restriction_note}."
existing_models_desc = consensus_field_overrides["models"]["description"]
consensus_field_overrides["models"]["description"] = f"{existing_models_desc} {model_description}"
# Define excluded fields for consensus workflow
excluded_workflow_fields = [
"files_checked", # Not used in consensus workflow
"relevant_context", # Not used in consensus workflow
"issues_found", # Not used in consensus workflow
"hypothesis", # Not used in consensus workflow
"confidence", # Not used in consensus workflow
]
excluded_common_fields = [
"model", # Consensus uses 'models' field instead
"temperature", # Not used in consensus workflow
"thinking_mode", # Not used in consensus workflow
]
requires_model = self.requires_model()
model_field_schema = self.get_model_field_schema() if requires_model else None
auto_mode = self.is_effective_auto_mode() if requires_model else False
return WorkflowSchemaBuilder.build_schema(
tool_specific_fields=consensus_field_overrides,
model_field_schema=model_field_schema,
auto_mode=auto_mode,
tool_name=self.get_name(),
excluded_workflow_fields=excluded_workflow_fields,
excluded_common_fields=excluded_common_fields,
require_model=requires_model,
)
def get_required_actions(
self, step_number: int, confidence: str, findings: str, total_steps: int, request=None
) -> list[str]: # noqa: ARG002
"""Define required actions for each consensus phase.
Now includes request parameter for continuation-aware decisions.
Note: confidence parameter is kept for compatibility with base class but not used.
"""
if step_number == 1:
# CLI Agent's initial analysis
return [
"You've provided your initial analysis. The tool will now consult other models.",
"Wait for the next step to receive the first model's response.",
]
elif step_number < total_steps - 1:
# Processing individual model responses
return [
"Review the model response provided in this step",
"Note key agreements and disagreements with previous analyses",
"Wait for the next model's response",
]
else:
# Ready for final synthesis
return [
"All models have been consulted",
"Synthesize all perspectives into a comprehensive recommendation",
"Identify key points of agreement and disagreement",
"Provide clear, actionable guidance based on the consensus",
]
def should_call_expert_analysis(self, consolidated_findings, request=None) -> bool:
"""Consensus workflow doesn't use traditional expert analysis - it consults models step by step."""
return False
def prepare_expert_analysis_context(self, consolidated_findings) -> str:
"""Not used in consensus workflow."""
return ""
def requires_expert_analysis(self) -> bool:
"""Consensus workflow handles its own model consultations."""
return False
def requires_model(self) -> bool:
"""
Consensus tool doesn't require model resolution at the MCP boundary.
It uses its own roster of models instead.
Returns:
bool: False
"""
return False
# Hook method overrides for consensus-specific behavior
def prepare_step_data(self, request) -> dict:
"""Prepare consensus-specific step data."""
step_data = {
"step": request.step,
"step_number": request.step_number,
"findings": request.findings,
"files_checked": [], # Not used
"relevant_files": request.relevant_files or [],
"relevant_context": [], # Not used
"issues_found": [], # Not used
"confidence": "exploring", # Not used, kept for compatibility
"hypothesis": None, # Not used
"images": request.images or [], # Now used for visual context
}
return step_data
async def handle_work_completion(self, response_data: dict, request, arguments: dict) -> dict: # noqa: ARG002
"""Handle consensus workflow completion - no expert analysis, just final synthesis."""
response_data["consensus_complete"] = True
response_data["status"] = "consensus_workflow_complete"
# Prepare final synthesis data
response_data["complete_consensus"] = {
"initial_prompt": self.original_proposal if self.original_proposal else self.initial_prompt,
"models_consulted": [m["model"] + ":" + m.get("stance", "neutral") for m in self.accumulated_responses],
"total_responses": len(self.accumulated_responses),
"consensus_confidence": "high", # Consensus complete
}
response_data["next_steps"] = (
"CONSENSUS GATHERING IS COMPLETE. You MUST now synthesize all perspectives and present:\n"
"1. Key points of AGREEMENT across models\n"
"2. Key points of DISAGREEMENT and why they differ\n"
"3. Your final consolidated recommendation\n"
"4. Specific, actionable next steps for implementation\n"
"5. Critical risks or concerns that must be addressed"
)
return response_data
def handle_work_continuation(self, response_data: dict, request) -> dict:
"""Handle continuation between consensus steps."""
current_idx = request.current_model_index or 0
if request.step_number == 1:
# After CLI Agent's initial analysis, prepare to consult first model
response_data["status"] = "consulting_models"
response_data["next_model"] = self.models_to_consult[0] if self.models_to_consult else None
response_data["next_steps"] = (
"Your initial analysis is complete. The tool will now consult the specified models."
)
elif current_idx < len(self.models_to_consult):
next_model = self.models_to_consult[current_idx]
response_data["status"] = "consulting_next_model"
response_data["next_model"] = next_model
response_data["models_remaining"] = len(self.models_to_consult) - current_idx
response_data["next_steps"] = f"Model consultation in progress. Next: {next_model['model']}"
else:
response_data["status"] = "ready_for_synthesis"
response_data["next_steps"] = "All models consulted. Ready for final synthesis."
return response_data
async def execute_workflow(self, arguments: dict[str, Any]) -> list:
"""Override execute_workflow to handle model consultations between steps."""
# Store arguments
self._current_arguments = arguments
# Validate request
request = self.get_workflow_request_model()(**arguments)
# Resolve existing continuation_id or create a new one on first step
continuation_id = request.continuation_id
if request.step_number == 1:
if not continuation_id:
clean_args = {k: v for k, v in arguments.items() if k not in ["_model_context", "_resolved_model_name"]}
continuation_id = create_thread(self.get_name(), clean_args)
request.continuation_id = continuation_id
arguments["continuation_id"] = continuation_id
self.work_history = []
self.consolidated_findings = ConsolidatedFindings()
# Store the original proposal from step 1 - this is what all models should see
self.store_initial_issue(request.step)
self.initial_request = request.step
self.models_to_consult = request.models or []
self.accumulated_responses = []
# Set total steps to the number of models (each step consults one model and records its response)
request.total_steps = len(self.models_to_consult)
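# e.g. a two-model roster gives total_steps == 2: step 1 consults models[0] (alongside the agent's own analysis), step 2 consults models[1]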
# For all steps (1 through total_steps), consult the corresponding model
if request.step_number <= request.total_steps:
# Calculate which model to consult for this step
model_idx = request.step_number - 1 # 0-based index
if model_idx < len(self.models_to_consult):
# Track workflow state for conversation memory
step_data = self.prepare_step_data(request)
self.work_history.append(step_data)
self._update_consolidated_findings(step_data)
# Consult the model for this step
model_response = await self._consult_model(self.models_to_consult[model_idx], request)
# Add to accumulated responses
self.accumulated_responses.append(model_response)
# Include the model response in the step data
response_data = {
"status": "model_consulted",
"step_number": request.step_number,
"total_steps": request.total_steps,
"model_consulted": model_response["model"],
"model_stance": model_response.get("stance", "neutral"),
"model_response": model_response,
"current_model_index": model_idx + 1,
"next_step_required": request.step_number < request.total_steps,
}
# Add the CLI agent's analysis to step 1
if request.step_number == 1:
response_data["agent_analysis"] = {
"initial_analysis": request.step,
"findings": request.findings,
}
response_data["status"] = "analysis_and_first_model_consulted"
# Check if this is the final step
if request.step_number == request.total_steps:
response_data["status"] = "consensus_workflow_complete"
response_data["consensus_complete"] = True
response_data["complete_consensus"] = {
"initial_prompt": self.original_proposal if self.original_proposal else self.initial_prompt,
"models_consulted": [
f"{m['model']}:{m.get('stance', 'neutral')}" for m in self.accumulated_responses
],
"total_responses": len(self.accumulated_responses),
"consensus_confidence": "high",
}
response_data["next_steps"] = (
"CONSENSUS GATHERING IS COMPLETE. Synthesize all perspectives and present:\n"
"1. Key points of AGREEMENT across models\n"
"2. Key points of DISAGREEMENT and why they differ\n"
"3. Your final consolidated recommendation\n"
"4. Specific, actionable next steps for implementation\n"
"5. Critical risks or concerns that must be addressed"
)
else:
response_data["next_steps"] = (
f"Model {model_response['model']} has provided its {model_response.get('stance', 'neutral')} "
f"perspective. Please analyze this response and call {self.get_name()} again with:\n"
f"- step_number: {request.step_number + 1}\n"
f"- findings: Summarize key points from this model's response"
)
# Add continuation information and workflow customization
response_data = self.customize_workflow_response(response_data, request)
# Ensure consensus-specific metadata is attached
self._add_workflow_metadata(response_data, arguments)
if continuation_id:
self.store_conversation_turn(continuation_id, response_data, request)
continuation_offer = self._build_continuation_offer(continuation_id)
if continuation_offer:
response_data["continuation_offer"] = continuation_offer
return [TextContent(type="text", text=json.dumps(response_data, indent=2, ensure_ascii=False))]
# Otherwise, use standard workflow execution
return await super().execute_workflow(arguments)
def _build_continuation_offer(self, continuation_id: str) -> dict[str, Any] | None:
"""Create a continuation offer without exposing prior model responses."""
try:
from tools.models import ContinuationOffer
thread = get_thread(continuation_id)
if thread and thread.turns:
remaining_turns = max(0, MAX_CONVERSATION_TURNS - len(thread.turns))
else:
remaining_turns = MAX_CONVERSATION_TURNS - 1
# Provide a neutral note specific to consensus workflow
note = (
f"Consensus workflow can continue for {remaining_turns} more exchanges."
if remaining_turns > 0
else "Consensus workflow continuation limit reached."
)
continuation_offer = ContinuationOffer(
continuation_id=continuation_id,
note=note,
remaining_turns=remaining_turns,
)
return continuation_offer.model_dump()
except Exception:
return None
async def _consult_model(self, model_config: dict, request) -> dict:
"""Consult a single model and return its response."""
try:
# Import and create ModelContext once at the beginning
from utils.model_context import ModelContext
# Get the provider for this model
model_name = model_config["model"]
provider = self.get_model_provider(model_name)
# Create model context once and reuse for both file processing and temperature validation
model_context = ModelContext(model_name=model_name)
# Prepare the prompt with any relevant files
# Use continuation_id=None for blinded consensus - each model should only see
# original prompt + files, not conversation history or other model responses
# CRITICAL: Use the original proposal from step 1, NOT what's in request.step for steps 2+!
# Steps 2+ contain summaries/notes that must NEVER be sent to other models
prompt = self.original_proposal if self.original_proposal else self.initial_prompt
if request.relevant_files:
file_content, _ = self._prepare_file_content_for_prompt(
request.relevant_files,
None, # Use None instead of request.continuation_id for blinded consensus
"Context files",
model_context=model_context,
)
if file_content:
prompt = f"{prompt}\n\n=== CONTEXT FILES ===\n{file_content}\n=== END CONTEXT ==="
# Get stance-specific system prompt
stance = model_config.get("stance", "neutral")
stance_prompt = model_config.get("stance_prompt")
system_prompt = self._get_stance_enhanced_prompt(stance, stance_prompt)
# Validate temperature against model constraints (respects supports_temperature)
validated_temperature, temp_warnings = self.validate_and_correct_temperature(
self.get_default_temperature(), model_context
)
# Log any temperature corrections
for warning in temp_warnings:
logger.warning(warning)
# Call the model with validated temperature
response = provider.generate_content(
prompt=prompt,
model_name=model_name,
system_prompt=system_prompt,
temperature=validated_temperature,
thinking_mode="medium",
images=request.images if request.images else None,
)
return {
"model": model_name,
"stance": stance,
"status": "success",
"verdict": response.content,
"metadata": {
"provider": provider.get_provider_type().value,
"model_name": model_name,
},
}
except Exception as e:
logger.exception("Error consulting model %s", model_config)
return {
"model": model_config.get("model", "unknown"),
"stance": model_config.get("stance", "neutral"),
"status": "error",
"error": str(e),
}
def _get_stance_enhanced_prompt(self, stance: str, custom_stance_prompt: str | None = None) -> str:
"""Get the system prompt with stance injection."""
base_prompt = CONSENSUS_PROMPT
if custom_stance_prompt:
return base_prompt.replace("{stance_prompt}", custom_stance_prompt)
stance_prompts = {
"for": """SUPPORTIVE PERSPECTIVE WITH INTEGRITY
You are tasked with advocating FOR this proposal, but with CRITICAL GUARDRAILS:
MANDATORY ETHICAL CONSTRAINTS:
- This is NOT a debate for entertainment. You MUST act in good faith and in the best interest of the questioner
- You MUST think deeply about whether supporting this idea is safe, sound, and passes essential requirements
- You MUST be direct and unequivocal in saying "this is a bad idea" when it truly is
- There must be at least ONE COMPELLING reason to be optimistic, otherwise DO NOT support it
WHEN TO REFUSE SUPPORT (MUST OVERRIDE STANCE):
- If the idea is fundamentally harmful to users, project, or stakeholders
- If implementation would violate security, privacy, or ethical standards
- If the proposal is technically infeasible within realistic constraints
- If costs/risks dramatically outweigh any potential benefits
YOUR SUPPORTIVE ANALYSIS SHOULD:
- Identify genuine strengths and opportunities
- Propose solutions to overcome legitimate challenges
- Highlight synergies with existing systems
- Suggest optimizations that enhance value
- Present realistic implementation pathways
Remember: Being "for" means finding the BEST possible version of the idea IF it has merit, not blindly supporting bad ideas.""",
"against": """CRITICAL PERSPECTIVE WITH RESPONSIBILITY
You are tasked with critiquing this proposal, but with ESSENTIAL BOUNDARIES:
MANDATORY FAIRNESS CONSTRAINTS:
- You MUST NOT oppose genuinely excellent, common-sense ideas just to be contrarian
- You MUST acknowledge when a proposal is fundamentally sound and well-conceived
- You CANNOT give harmful advice or recommend against beneficial changes
- If the idea is outstanding, say so clearly while offering constructive refinements
WHEN TO MODERATE CRITICISM (MUST OVERRIDE STANCE):
- If the proposal addresses critical user needs effectively
- If it follows established best practices with good reason
- If benefits clearly and substantially outweigh risks
- If it's the obvious right solution to the problem
YOUR CRITICAL ANALYSIS SHOULD:
- Identify legitimate risks and failure modes
- Point out overlooked complexities
- Suggest more efficient alternatives
- Highlight potential negative consequences
- Question assumptions that may be flawed
Remember: Being "against" means rigorous scrutiny to ensure quality, not undermining good ideas that deserve support.""",
"neutral": """BALANCED PERSPECTIVE
Provide objective analysis considering both positive and negative aspects. However, if there is overwhelming evidence
that the proposal clearly leans toward being exceptionally good or particularly problematic, you MUST accurately
reflect this reality. Being "balanced" means being truthful about the weight of evidence, not artificially creating
50/50 splits when the reality is 90/10.
Your analysis should:
- Present all significant pros and cons discovered
- Weight them according to actual impact and likelihood
- If evidence strongly favors one conclusion, clearly state this
- Provide proportional coverage based on the strength of arguments
- Help the questioner see the true balance of considerations
Remember: Artificial balance that misrepresents reality is not helpful. True balance means accurate representation
of the evidence, even when it strongly points in one direction.""",
}
stance_prompt = stance_prompts.get(stance, stance_prompts["neutral"])
return base_prompt.replace("{stance_prompt}", stance_prompt)
def customize_workflow_response(self, response_data: dict, request) -> dict:
"""Customize response for consensus workflow."""
# Store model responses in the response for tracking
if self.accumulated_responses:
response_data["accumulated_responses"] = self.accumulated_responses
# Add consensus-specific fields
if request.step_number == 1:
response_data["consensus_workflow_status"] = "initial_analysis_complete"
elif request.step_number < request.total_steps - 1:
response_data["consensus_workflow_status"] = "consulting_models"
else:
response_data["consensus_workflow_status"] = "ready_for_synthesis"
# Customize metadata for consensus workflow
self._customize_consensus_metadata(response_data, request)
return response_data
def _customize_consensus_metadata(self, response_data: dict, request) -> None:
"""
Customize metadata for consensus workflow to accurately reflect multi-model nature.
The default workflow metadata reports the single model running the CLI agent's analysis steps,
but consensus is a multi-model tool that consults different models. We need
to provide accurate metadata that reflects this.
"""
if "metadata" not in response_data:
response_data["metadata"] = {}
metadata = response_data["metadata"]
# Always preserve tool_name
metadata["tool_name"] = self.get_name()
if request.step_number == request.total_steps:
# Final step - show comprehensive consensus metadata
models_consulted = []
if self.models_to_consult:
models_consulted = [f"{m['model']}:{m.get('stance', 'neutral')}" for m in self.models_to_consult]
metadata.update(
{
"workflow_type": "multi_model_consensus",
"models_consulted": models_consulted,
"consensus_complete": True,
"total_models": len(self.models_to_consult) if self.models_to_consult else 0,
}
)
# Remove the misleading single model metadata
metadata.pop("model_used", None)
metadata.pop("provider_used", None)
else:
# Intermediate steps - show consensus workflow in progress
models_to_consult = []
if self.models_to_consult:
models_to_consult = [f"{m['model']}:{m.get('stance', 'neutral')}" for m in self.models_to_consult]
metadata.update(
{
"workflow_type": "multi_model_consensus",
"models_to_consult": models_to_consult,
"consultation_step": request.step_number,
"total_consultation_steps": request.total_steps,
}
)
# Remove the misleading single model metadata that shows Agent's execution model
# instead of the models being consulted
metadata.pop("model_used", None)
metadata.pop("provider_used", None)
def _add_workflow_metadata(self, response_data: dict, arguments: dict[str, Any]) -> None:
"""
Override workflow metadata addition for consensus tool.
The consensus tool doesn't use single model metadata because it's a multi-model
workflow. Instead, we provide consensus-specific metadata that accurately
reflects the models being consulted.
"""
# Initialize metadata if not present
if "metadata" not in response_data:
response_data["metadata"] = {}
# Add basic tool metadata
response_data["metadata"]["tool_name"] = self.get_name()
# The consensus-specific metadata is already added by _customize_consensus_metadata
# which is called from customize_workflow_response. We don't add the standard
# single-model metadata (model_used, provider_used) because it's misleading
# for a multi-model consensus workflow.
logger.debug(
f"[CONSENSUS_METADATA] {self.get_name()}: Using consensus-specific metadata instead of single-model metadata"
)
def store_initial_issue(self, step_description: str):
"""Store initial prompt for model consultations."""
self.original_proposal = step_description
self.initial_prompt = step_description # Keep for backward compatibility
# Required abstract methods from BaseTool
def get_request_model(self):
"""Return the consensus workflow-specific request model."""
return ConsensusRequest
async def prepare_prompt(self, request) -> str: # noqa: ARG002
"""Not used - workflow tools use execute_workflow()."""
return "" # Workflow tools use execute_workflow() directly
```
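For reference, the schema and `execute_workflow` above imply a call shape like the following. This is a minimal sketch of a step-1 consensus request, assuming a two-model roster; the model names and file path are hypothetical placeholders (check the `listmodels` tool for real options), and `total_steps` is recomputed on the server from the roster length.
```python
# Hypothetical step-1 arguments for the consensus workflow (illustrative sketch only).
import json

step_one_arguments = {
    "step": "Evaluate whether we should adopt feature flags for the checkout rollout.",
    "step_number": 1,
    "total_steps": 2,  # execute_workflow overrides this with len(models)
    "next_step_required": True,
    "findings": "Initial take: flags reduce rollout risk but add configuration complexity.",
    "relevant_files": ["/abs/path/to/rollout_plan.md"],  # hypothetical absolute path
    "models": [
        {"model": "model-a", "stance": "for"},  # hypothetical model names; the schema requires at least two entries
        {"model": "model-b", "stance": "against"},
    ],
}

print(json.dumps(step_one_arguments, indent=2))
```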
--------------------------------------------------------------------------------
/tools/precommit.py:
--------------------------------------------------------------------------------
```python
"""
Precommit Workflow tool - Step-by-step pre-commit validation with expert analysis
This tool provides a structured workflow for comprehensive pre-commit validation.
It guides the CLI agent through systematic investigation steps with forced pauses between each step
to ensure thorough code examination, git change analysis, and issue detection before proceeding.
The tool supports finding updates and expert analysis integration.
Key features:
- Step-by-step pre-commit investigation workflow with progress tracking
- Context-aware file embedding (references during investigation, full content for analysis)
- Automatic git repository discovery and change analysis
- Expert analysis integration with external models (default)
- Support for multiple repositories and change types
- Configurable validation type (external with expert model or internal only)
"""
import logging
from typing import TYPE_CHECKING, Any, Literal, Optional
from pydantic import Field, model_validator
if TYPE_CHECKING:
from tools.models import ToolModelCategory
from config import TEMPERATURE_ANALYTICAL
from systemprompts import PRECOMMIT_PROMPT
from tools.shared.base_models import WorkflowRequest
from .workflow.base import WorkflowTool
logger = logging.getLogger(__name__)
# Tool-specific field descriptions for precommit workflow
PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS = {
"step": (
"Step 1: outline how you'll validate the git changes. Later steps: report findings. Review diffs and impacts, use `relevant_files`, and avoid pasting large snippets."
),
"step_number": "Current pre-commit step number (starts at 1).",
"total_steps": (
"Planned number of validation steps. External validation: use at most three (analysis → follow-ups → summary). Internal validation: a single step. Honour these limits when resuming via continuation_id."
),
"next_step_required": (
"True to continue with another step, False when validation is complete. "
"CRITICAL: If total_steps>=3 or when `precommit_type = external`, set to True until the final step. "
"When continuation_id is provided: Follow the same validation rules based on precommit_type."
),
"findings": "Record git diff insights, risks, missing tests, security concerns, and positives; update previous notes as you go.",
"files_checked": "Absolute paths for every file examined, including ruled-out candidates.",
"relevant_files": "Absolute paths of files involved in the change or validation (code, configs, tests, docs). Must be absolute full non-abbreviated paths.",
"relevant_context": "Key functions/methods touched by the change (e.g. 'Class.method', 'function_name').",
"issues_found": "List issues with severity (critical/high/medium/low) plus descriptions (bugs, security, performance, coverage).",
"precommit_type": "'external' (default, triggers expert model) or 'internal' (local-only validation).",
"images": "Optional absolute paths to screenshots or diagrams that aid validation.",
"path": "Absolute path to the repository root. Required in step 1.",
"compare_to": "Optional git ref (branch/tag/commit) to diff against; falls back to staged/unstaged changes.",
"include_staged": "Whether to inspect staged changes (ignored when `compare_to` is set).",
"include_unstaged": "Whether to inspect unstaged changes (ignored when `compare_to` is set).",
"focus_on": "Optional emphasis areas such as security, performance, or test coverage.",
"severity_filter": "Lowest severity to include when reporting issues.",
}
class PrecommitRequest(WorkflowRequest):
"""Request model for precommit workflow investigation steps"""
# Required fields for each investigation step
step: str = Field(..., description=PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["step"])
step_number: int = Field(..., description=PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["step_number"])
total_steps: int = Field(..., description=PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["total_steps"])
next_step_required: bool = Field(..., description=PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["next_step_required"])
# Investigation tracking fields
findings: str = Field(..., description=PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["findings"])
files_checked: list[str] = Field(
default_factory=list, description=PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["files_checked"]
)
relevant_files: list[str] = Field(
default_factory=list, description=PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["relevant_files"]
)
relevant_context: list[str] = Field(
default_factory=list, description=PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["relevant_context"]
)
issues_found: list[dict] = Field(
default_factory=list, description=PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["issues_found"]
)
precommit_type: Optional[Literal["external", "internal"]] = Field(
"external", description=PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["precommit_type"]
)
# Optional images for visual validation
images: Optional[list[str]] = Field(default=None, description=PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["images"])
# Precommit-specific fields (only used in step 1 to initialize)
# Required for step 1, validated in model_validator
path: Optional[str] = Field(None, description=PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["path"])
compare_to: Optional[str] = Field(None, description=PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["compare_to"])
include_staged: Optional[bool] = Field(True, description=PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["include_staged"])
include_unstaged: Optional[bool] = Field(
True, description=PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["include_unstaged"]
)
focus_on: Optional[str] = Field(None, description=PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["focus_on"])
severity_filter: Optional[Literal["critical", "high", "medium", "low", "all"]] = Field(
"all", description=PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["severity_filter"]
)
# Override inherited fields to exclude them from schema (except model which needs to be available)
temperature: Optional[float] = Field(default=None, exclude=True)
thinking_mode: Optional[str] = Field(default=None, exclude=True)
@model_validator(mode="after")
def validate_step_one_requirements(self):
"""Ensure step 1 has required path field."""
if self.step_number == 1 and not self.path:
raise ValueError("Step 1 requires 'path' field to specify git repository location")
return self
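# Example with hypothetical values: a step-1 request must include `path`, e.g.
# PrecommitRequest(step="Outline the validation plan", step_number=1, total_steps=3,
# next_step_required=True, findings="", path="/abs/path/to/repo"); omitting `path` on
# step 1 raises "Step 1 requires 'path' field to specify git repository location".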
class PrecommitTool(WorkflowTool):
"""
Precommit workflow tool for step-by-step pre-commit validation and expert analysis.
This tool implements a structured pre-commit validation workflow that guides users through
methodical investigation steps, ensuring thorough change examination, issue identification,
and validation before reaching conclusions. It supports complex validation scenarios including
multi-repository analysis, security review, performance validation, and integration testing.
"""
def __init__(self):
super().__init__()
self.initial_request = None
self.git_config = {}
def get_name(self) -> str:
return "precommit"
def get_description(self) -> str:
return (
"Validates git changes and repository state before committing with systematic analysis. "
"Use for multi-repository validation, security review, change impact assessment, and completeness verification. "
"Guides through structured investigation with expert analysis."
)
def get_system_prompt(self) -> str:
return PRECOMMIT_PROMPT
def get_default_temperature(self) -> float:
return TEMPERATURE_ANALYTICAL
def get_model_category(self) -> "ToolModelCategory":
"""Precommit requires thorough analysis and reasoning"""
from tools.models import ToolModelCategory
return ToolModelCategory.EXTENDED_REASONING
def get_workflow_request_model(self):
"""Return the precommit workflow-specific request model."""
return PrecommitRequest
def get_input_schema(self) -> dict[str, Any]:
"""Generate input schema using WorkflowSchemaBuilder with precommit-specific overrides."""
from .workflow.schema_builders import WorkflowSchemaBuilder
# Precommit workflow-specific field overrides
precommit_field_overrides = {
"step": {
"type": "string",
"description": PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["step"],
},
"step_number": {
"type": "integer",
"minimum": 1,
"description": PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["step_number"],
},
"total_steps": {
"type": "integer",
"minimum": 3,
"description": PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["total_steps"],
},
"next_step_required": {
"type": "boolean",
"description": PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["next_step_required"],
},
"findings": {
"type": "string",
"description": PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["findings"],
},
"files_checked": {
"type": "array",
"items": {"type": "string"},
"description": PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["files_checked"],
},
"relevant_files": {
"type": "array",
"items": {"type": "string"},
"description": PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["relevant_files"],
},
"precommit_type": {
"type": "string",
"enum": ["external", "internal"],
"default": "external",
"description": PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["precommit_type"],
},
"issues_found": {
"type": "array",
"items": {"type": "object"},
"description": PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["issues_found"],
},
"images": {
"type": "array",
"items": {"type": "string"},
"description": PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["images"],
},
# Precommit-specific fields (for step 1)
"path": {
"type": "string",
"description": PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["path"],
},
"compare_to": {
"type": "string",
"description": PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["compare_to"],
},
"include_staged": {
"type": "boolean",
"default": True,
"description": PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["include_staged"],
},
"include_unstaged": {
"type": "boolean",
"default": True,
"description": PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["include_unstaged"],
},
"focus_on": {
"type": "string",
"description": PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["focus_on"],
},
"severity_filter": {
"type": "string",
"enum": ["critical", "high", "medium", "low", "all"],
"default": "all",
"description": PRECOMMIT_WORKFLOW_FIELD_DESCRIPTIONS["severity_filter"],
},
}
# Use WorkflowSchemaBuilder with precommit-specific tool fields
return WorkflowSchemaBuilder.build_schema(
tool_specific_fields=precommit_field_overrides,
model_field_schema=self.get_model_field_schema(),
auto_mode=self.is_effective_auto_mode(),
tool_name=self.get_name(),
)
def get_required_actions(
self, step_number: int, confidence: str, findings: str, total_steps: int, request=None
) -> list[str]:
"""Define required actions for each investigation phase.
Now includes request parameter for continuation-aware decisions.
"""
# Check for continuation - fast track mode
if request:
continuation_id = self.get_request_continuation_id(request)
precommit_type = self.get_precommit_type(request)
if continuation_id and precommit_type == "external":
if step_number == 1:
return [
"Execute git status to see all changes",
"Execute git diff --cached for staged changes (exclude binary files)",
"Execute git diff for unstaged changes (exclude binary files)",
"List any relevant untracked files as well.",
]
else:
return ["Complete validation and proceed to expert analysis with changeset file"]
# Extract counts for normal flow
findings_count = len(findings.split("\n")) if findings else 0
issues_count = self.get_consolidated_issues_count()
if step_number == 1:
# Initial pre-commit investigation tasks
return [
"Search for all git repositories in the specified path using appropriate tools",
"Check git status to identify staged, unstaged, and untracked changes as required",
"Execute git status to see all changes",
"Execute git diff --cached for staged changes (exclude binary files)",
"Execute git diff for unstaged changes (exclude binary files)",
"List any relevant untracked files as well.",
"Understand what functionality was added, modified, or removed",
"Identify the scope and intent of the changes being committed",
"CRITICAL: You are on step 1 - you MUST set next_step_required=True and continue to at least step 3 minimum",
]
elif step_number == 2:
# Need deeper investigation
actions = [
"Examine the specific files you've identified as changed or relevant",
"Analyze the logic and implementation details of modifications",
"Check for potential issues: bugs, security risks, performance problems",
"Verify that changes align with good coding practices and patterns",
"Look for missing tests, documentation, or configuration updates",
]
# Add step validation reminder
if request and request.total_steps >= 3:
actions.append(
f"CRITICAL: You are on step 2 of {request.total_steps} minimum steps - you MUST set next_step_required=True unless this is the final step"
)
return actions
elif step_number >= 2 and (findings_count > 2 or issues_count > 0):
# Close to completion - need final verification
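# (step_number == 2 is handled by the branch above, so this branch effectively applies from step 3 onward)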
actions = [
"Verify all identified issues have been properly documented",
"Check for any missed dependencies or related files that need review",
"Confirm the completeness and correctness of your assessment",
"Ensure all security, performance, and quality concerns are captured",
"Validate that your findings are comprehensive and actionable",
]
# Add step validation reminder
if request and request.total_steps >= 3 and step_number < request.total_steps:
actions.append(
f"CRITICAL: You are on step {step_number} of {request.total_steps} minimum steps - set next_step_required=True to continue"
)
elif request and request.total_steps >= 3 and step_number >= request.total_steps:
actions.append(
f"You are on final step {step_number} - you may now set next_step_required=False to complete"
)
return actions
else:
# General investigation needed
actions = [
"Continue examining the changes and their potential impact",
"Gather more evidence using appropriate investigation tools",
"Test your assumptions about the changes and their effects",
"Look for patterns that confirm or refute your current assessment",
]
# Add step validation reminder for all other cases
if request and request.total_steps >= 3:
if step_number < request.total_steps:
actions.append(
f"CRITICAL: You are on step {step_number} of {request.total_steps} minimum steps - set next_step_required=True to continue"
)
else:
actions.append(
f"You are on final step {step_number} - you may now set next_step_required=False to complete"
)
return actions
def should_call_expert_analysis(self, consolidated_findings, request=None) -> bool:
"""
Decide when to call external model based on investigation completeness.
For continuations with external type, always proceed with expert analysis.
"""
# Check if user requested to skip assistant model
if request and not self.get_request_use_assistant_model(request):
return False
# For continuations with external type, always proceed with expert analysis
continuation_id = self.get_request_continuation_id(request)
if continuation_id and request.precommit_type == "external":
return True # Always perform expert analysis for external continuations
# Check if we have meaningful investigation data
return (
len(consolidated_findings.relevant_files) > 0
or len(consolidated_findings.findings) >= 2
or len(consolidated_findings.issues_found) > 0
)
def prepare_expert_analysis_context(self, consolidated_findings) -> str:
"""Prepare context for external model call for final pre-commit validation."""
context_parts = [
f"=== PRE-COMMIT ANALYSIS REQUEST ===\\n{self.initial_request or 'Pre-commit validation initiated'}\\n=== END REQUEST ==="
]
# Add investigation summary
investigation_summary = self._build_precommit_summary(consolidated_findings)
context_parts.append(
f"\\n=== AGENT'S PRE-COMMIT INVESTIGATION ===\\n{investigation_summary}\\n=== END INVESTIGATION ==="
)
# Add git configuration context if available
if self.git_config:
config_text = "\n".join(f"- {key}: {value}" for key, value in self.git_config.items())
context_parts.append(f"\n=== GIT CONFIGURATION ===\n{config_text}\n=== END CONFIGURATION ===")
# Add relevant methods/functions if available
if consolidated_findings.relevant_context:
methods_text = "\n".join(f"- {method}" for method in consolidated_findings.relevant_context)
context_parts.append(f"\n=== RELEVANT CODE ELEMENTS ===\n{methods_text}\n=== END CODE ELEMENTS ===")
# Add issues found evolution if available
if consolidated_findings.issues_found:
issues_text = "\n".join(
f"[{issue.get('severity', 'unknown').upper()}] {issue.get('description', 'No description')}"
for issue in consolidated_findings.issues_found
)
context_parts.append(f"\\n=== ISSUES IDENTIFIED ===\\n{issues_text}\\n=== END ISSUES ===")
# Add assessment evolution if available
if consolidated_findings.hypotheses:
assessments_text = "\n".join(
f"Step {h['step']}: {h['hypothesis']}" for h in consolidated_findings.hypotheses
)
context_parts.append(f"\\n=== ASSESSMENT EVOLUTION ===\\n{assessments_text}\\n=== END ASSESSMENTS ===")
# Add images if available
if consolidated_findings.images:
images_text = "\n".join(f"- {img}" for img in consolidated_findings.images)
context_parts.append(
f"\\n=== VISUAL VALIDATION INFORMATION ===\\n{images_text}\\n=== END VISUAL INFORMATION ==="
)
return "\\n".join(context_parts)
def _build_precommit_summary(self, consolidated_findings) -> str:
"""Prepare a comprehensive summary of the pre-commit investigation."""
summary_parts = [
"=== SYSTEMATIC PRE-COMMIT INVESTIGATION SUMMARY ===",
f"Total steps: {len(consolidated_findings.findings)}",
f"Files examined: {len(consolidated_findings.files_checked)}",
f"Relevant files identified: {len(consolidated_findings.relevant_files)}",
f"Code elements analyzed: {len(consolidated_findings.relevant_context)}",
f"Issues identified: {len(consolidated_findings.issues_found)}",
"",
"=== INVESTIGATION PROGRESSION ===",
]
for finding in consolidated_findings.findings:
summary_parts.append(finding)
return "\\n".join(summary_parts)
def should_include_files_in_expert_prompt(self) -> bool:
"""Include files in expert analysis for comprehensive validation."""
return True
def should_embed_system_prompt(self) -> bool:
"""Embed system prompt in expert analysis for proper context."""
return True
def get_expert_thinking_mode(self) -> str:
"""Use high thinking mode for thorough pre-commit analysis."""
return "high"
def get_expert_analysis_instruction(self) -> str:
"""Get specific instruction for pre-commit expert analysis."""
return (
"Please provide comprehensive pre-commit validation based on the investigation findings. "
"Focus on identifying any remaining issues, validating the completeness of the analysis, "
"and providing final recommendations for commit readiness."
)
# Hook method overrides for precommit-specific behavior
def prepare_step_data(self, request) -> dict:
"""
Map precommit-specific fields for internal processing.
"""
step_data = {
"step": request.step,
"step_number": request.step_number,
"findings": request.findings,
"files_checked": request.files_checked,
"relevant_files": request.relevant_files,
"relevant_context": request.relevant_context,
"issues_found": request.issues_found,
"precommit_type": request.precommit_type,
"hypothesis": request.findings, # Map findings to hypothesis for compatibility
"images": request.images or [],
"confidence": "high", # Dummy value for workflow_mixin compatibility
}
return step_data
def should_skip_expert_analysis(self, request, consolidated_findings) -> bool:
"""
Precommit workflow skips expert analysis only when precommit_type is "internal".
Default is always to use expert analysis (external).
For continuations with external type, always perform expert analysis immediately.
"""
# If it's a continuation and precommit_type is external, don't skip
continuation_id = self.get_request_continuation_id(request)
if continuation_id and request.precommit_type != "internal":
return False # Always do expert analysis for external continuations
return request.precommit_type == "internal" and not request.next_step_required
def store_initial_issue(self, step_description: str):
"""Store initial request for expert analysis."""
self.initial_request = step_description
# Override inheritance hooks for precommit-specific behavior
def get_completion_status(self) -> str:
"""Precommit tools use precommit-specific status."""
return "validation_complete_ready_for_commit"
def get_completion_data_key(self) -> str:
"""Precommit uses 'complete_validation' key."""
return "complete_validation"
def get_final_analysis_from_request(self, request):
"""Precommit tools use 'findings' field."""
return request.findings
def get_precommit_type(self, request) -> str:
"""Get precommit type from request. Hook method for clean inheritance."""
try:
return request.precommit_type or "external"
except AttributeError:
return "external" # Default to external validation
def get_consolidated_issues_count(self) -> int:
"""Get count of issues from consolidated findings. Hook method for clean access."""
try:
return len(self.consolidated_findings.issues_found)
except AttributeError:
return 0
def get_completion_message(self) -> str:
"""Precommit-specific completion message."""
return (
"Pre-commit validation complete. You have identified all issues "
"and verified commit readiness. MANDATORY: Present the user with the complete validation results "
"and IMMEDIATELY proceed with commit if no critical issues found, or provide specific fix guidance "
"if issues need resolution. Focus on actionable next steps."
)
def get_skip_reason(self) -> str:
"""Precommit-specific skip reason."""
return (
"Completed comprehensive pre-commit validation with internal analysis only (no external model validation)"
)
def get_skip_expert_analysis_status(self) -> str:
"""Precommit-specific expert analysis skip status."""
return "skipped_due_to_internal_analysis_type"
def prepare_work_summary(self) -> str:
"""Precommit-specific work summary."""
return self._build_precommit_summary(self.consolidated_findings)
def get_completion_next_steps_message(self, expert_analysis_used: bool = False) -> str:
"""
Precommit-specific completion message.
Args:
expert_analysis_used: True if expert analysis was successfully executed
"""
base_message = (
"PRE-COMMIT VALIDATION IS COMPLETE. You may delete any `zen_precommit.changeset` created. You MUST now summarize "
"and present ALL validation results, identified issues with their severity levels, and exact commit recommendations. "
"Clearly state whether the changes are ready for commit or require fixes first. Provide concrete, actionable guidance for "
"any issues that need resolution—make it easy for a developer to understand exactly what needs to be "
"done before committing."
)
# Add expert analysis guidance only when expert analysis was actually used
if expert_analysis_used:
expert_guidance = self.get_expert_analysis_guidance()
if expert_guidance:
return f"{base_message}\n\n{expert_guidance}"
return base_message
def get_expert_analysis_guidance(self) -> str:
"""
Get additional guidance for handling expert analysis results in pre-commit context.
Returns:
Additional guidance text for validating and using expert analysis findings
"""
return (
"IMPORTANT: Expert analysis has been provided above. You MUST carefully review "
"the expert's validation findings and security assessments. Cross-reference the "
"expert's analysis with your own investigation to ensure all critical issues are "
"addressed. Pay special attention to any security vulnerabilities, performance "
"concerns, or architectural issues identified by the expert review."
)
def get_step_guidance_message(self, request) -> str:
"""
Precommit-specific step guidance with detailed investigation instructions.
"""
step_guidance = self.get_precommit_step_guidance(request.step_number, request)
return step_guidance["next_steps"]
def get_precommit_step_guidance(self, step_number: int, request) -> dict[str, Any]:
"""
Provide step-specific guidance for precommit workflow.
Uses get_required_actions to determine what needs to be done,
then formats those actions into appropriate guidance messages.
"""
# Get the required actions from the single source of truth
required_actions = self.get_required_actions(
step_number,
request.precommit_type or "external", # Using precommit_type as confidence proxy
request.findings or "",
request.total_steps,
request, # Pass request for continuation-aware decisions
)
# Check if this is a continuation to provide context-aware guidance
continuation_id = self.get_request_continuation_id(request)
is_external_continuation = continuation_id and request.precommit_type == "external"
is_internal_continuation = continuation_id and request.precommit_type == "internal"
# Format the guidance based on step number and continuation status
if step_number == 1:
if is_external_continuation:
# Fast-track mode for external continuations
next_steps = (
"You are on step 1 of MAXIMUM 2 steps. CRITICAL: Gather and save the complete git changeset NOW. "
"MANDATORY ACTIONS:\n"
+ "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
+ "\n\nMANDATORY: The changeset may be large. You MUST save the required changeset as a 'zen_precommit.changeset' file "
"(replacing any existing one) in your work directory and include the FULL absolute path in relevant_files (exclude any "
"binary files). ONLY include the code changes, no extra commentary. "
"Set next_step_required=True and step_number=2 for the next call."
)
elif is_internal_continuation:
# Internal validation mode
next_steps = (
"Continuing previous conversation with internal validation only. The analysis will build "
"upon the prior findings without external model validation. REQUIRED ACTIONS:\n"
+ "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
)
else:
# Normal flow for new validations
next_steps = (
f"MANDATORY: DO NOT call the {self.get_name()} tool again immediately. You MUST first investigate "
f"the git repositories and changes using appropriate tools. CRITICAL AWARENESS: You need to:\\n"
+ "\\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
+ f"\\n\\nOnly call {self.get_name()} again AFTER completing your investigation. "
f"When you call {self.get_name()} next time, use step_number: {step_number + 1} "
f"and report specific files examined, changes analyzed, and validation findings discovered."
)
elif step_number == 2:
# CRITICAL: Check if violating minimum step requirement
if (
request.total_steps >= 3
and request.step_number < request.total_steps
and not request.next_step_required
):
next_steps = (
f"ERROR: You set total_steps={request.total_steps} but next_step_required=False on step {request.step_number}. "
f"This violates the minimum step requirement. You MUST set next_step_required=True until you reach the final step. "
f"Call {self.get_name()} again with next_step_required=True and continue your investigation."
)
elif is_external_continuation or (not request.next_step_required and request.precommit_type == "external"):
# Fast-track completion or about to complete - ensure changeset is saved
next_steps = (
"Proceeding immediately to expert analysis. "
f"MANDATORY: call {self.get_name()} tool immediately again, and set next_step_required=False to "
f"trigger external validation NOW. "
f"MANDATORY: Include the entire changeset! The changeset may be large. You MUST save the required "
f"changeset as a 'zen_precommit.changeset' file (replacing any existing one) in your work directory "
f"and include the FULL absolute path in relevant_files so the expert can access the complete changeset. "
f"ONLY include the code changes, no extra commentary."
)
else:
# Normal flow - deeper analysis needed
next_steps = (
f"STOP! Do NOT call {self.get_name()} again yet. You are on step 2 of {request.total_steps} minimum required steps. "
f"MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\\n"
+ "\\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
+ f"\\n\\nRemember: You MUST set next_step_required=True until step {request.total_steps}. "
+ f"Only call {self.get_name()} again with step_number: {step_number + 1} AFTER completing these validations."
)
elif step_number >= 3:
if not request.next_step_required and request.precommit_type == "external":
# About to complete - ensure changeset is saved
next_steps = (
"Completing validation and proceeding to expert analysis. "
"MANDATORY: Save the complete git changeset as a 'zen_precommit.changeset' file "
"in your work directory and include the FULL absolute path in relevant_files."
)
else:
# Later steps - final verification
next_steps = (
f"WAIT! Your validation needs final verification. DO NOT call {self.get_name()} immediately. REQUIRED ACTIONS:\\n"
+ "\\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
+ f"\\n\\nREMEMBER: Ensure you have identified all potential issues and verified commit readiness. "
f"Document findings with specific file references and issue descriptions, then call {self.get_name()} "
f"with step_number: {step_number + 1}."
)
else:
# Fallback for any other case - check minimum step violation first
if (
request.total_steps >= 3
and request.step_number < request.total_steps
and not request.next_step_required
):
next_steps = (
f"ERROR: You set total_steps={request.total_steps} but next_step_required=False on step {request.step_number}. "
f"This violates the minimum step requirement. You MUST set next_step_required=True until step {request.total_steps}."
)
elif not request.next_step_required and request.precommit_type == "external":
next_steps = (
"Completing validation. "
"MANDATORY: Save complete git changeset as 'zen_precommit.changeset' file and include path in relevant_files, "
"excluding any binary files."
)
else:
next_steps = (
f"PAUSE VALIDATION. Before calling {self.get_name()} step {step_number + 1}, you MUST examine more code and changes. "
+ "Required: "
+ ", ".join(required_actions[:2])
+ ". "
+ f"Your next {self.get_name()} call (step_number: {step_number + 1}) must include "
f"NEW evidence from actual change analysis, not just theories. NO recursive {self.get_name()} calls "
f"without investigation work!"
)
return {"next_steps": next_steps}
def customize_workflow_response(self, response_data: dict, request) -> dict:
"""
Customize response to match precommit workflow format.
"""
# Store initial request on first step
if request.step_number == 1:
self.initial_request = request.step
# Store git configuration for expert analysis
if request.path:
self.git_config = {
"path": request.path,
"compare_to": request.compare_to,
"include_staged": request.include_staged,
"include_unstaged": request.include_unstaged,
"severity_filter": request.severity_filter,
}
# Convert generic status names to precommit-specific ones
tool_name = self.get_name()
status_mapping = {
f"{tool_name}_in_progress": "validation_in_progress",
f"pause_for_{tool_name}": "pause_for_validation",
f"{tool_name}_required": "validation_required",
f"{tool_name}_complete": "validation_complete",
}
if response_data["status"] in status_mapping:
response_data["status"] = status_mapping[response_data["status"]]
# Rename status field to match precommit workflow
if f"{tool_name}_status" in response_data:
response_data["validation_status"] = response_data.pop(f"{tool_name}_status")
# Add precommit-specific status fields
response_data["validation_status"]["issues_identified"] = len(self.consolidated_findings.issues_found)
response_data["validation_status"]["precommit_type"] = request.precommit_type or "external"
# Map complete_precommitworkflow to complete_validation
if f"complete_{tool_name}" in response_data:
response_data["complete_validation"] = response_data.pop(f"complete_{tool_name}")
# Map the completion flag to match precommit workflow
if f"{tool_name}_complete" in response_data:
response_data["validation_complete"] = response_data.pop(f"{tool_name}_complete")
return response_data
# Required abstract methods from BaseTool
def get_request_model(self):
"""Return the precommit workflow-specific request model."""
return PrecommitRequest
async def prepare_prompt(self, request) -> str:
"""Not used - workflow tools use execute_workflow()."""
return "" # Workflow tools use execute_workflow() directly
```
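Along the same lines, here is a minimal sketch of a step-1 precommit request matching `PrecommitRequest` and the schema overrides above. The repository path is a hypothetical placeholder; step 1 must supply `path`, and external validation keeps `next_step_required=True` until the final step.
```python
# Hypothetical step-1 arguments for the precommit workflow (illustrative sketch only).
import json

step_one_arguments = {
    "step": "Plan validation: enumerate staged and unstaged changes, then assess risk areas.",
    "step_number": 1,
    "total_steps": 3,  # the input schema sets a minimum of 3 for total_steps
    "next_step_required": True,
    "findings": "Not yet investigated; git status and diffs still to be collected.",
    "path": "/abs/path/to/repo",  # hypothetical repository root, required on step 1
    "precommit_type": "external",
    "include_staged": True,
    "include_unstaged": True,
    "severity_filter": "all",
}

print(json.dumps(step_one_arguments, indent=2))
```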
--------------------------------------------------------------------------------
/simulator_tests/test_thinkdeep_validation.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
ThinkDeep Tool Validation Test
Tests the thinkdeep tool's capabilities using the new workflow architecture.
This validates that the workflow-based deep thinking implementation provides
step-by-step thinking with expert analysis integration.
"""
import json
from typing import Optional
from .conversation_base_test import ConversationBaseTest
class ThinkDeepWorkflowValidationTest(ConversationBaseTest):
"""Test thinkdeep tool with new workflow architecture"""
@property
def test_name(self) -> str:
return "thinkdeep_validation"
@property
def test_description(self) -> str:
return "ThinkDeep workflow tool validation with new workflow architecture"
def run_test(self) -> bool:
"""Test thinkdeep tool capabilities"""
# Set up the test environment
self.setUp()
try:
self.logger.info("Test: ThinkDeepWorkflow tool validation (new architecture)")
# Create test files for thinking context
self._create_thinking_context()
# Test 1: Single thinking session with multiple steps
if not self._test_single_thinking_session():
return False
# Test 2: Thinking flow that requires refocusing
if not self._test_thinking_refocus_flow():
return False
# Test 3: Complete thinking with expert analysis
if not self._test_complete_thinking_with_analysis():
return False
# Test 4: Certain confidence behavior
if not self._test_certain_confidence():
return False
# Test 5: Context-aware file embedding
if not self._test_context_aware_file_embedding():
return False
# Test 6: Multi-step file context optimization
if not self._test_multi_step_file_context():
return False
self.logger.info(" ✅ All thinkdeep validation tests passed")
return True
except Exception as e:
self.logger.error(f"ThinkDeep validation test failed: {e}")
return False
def _create_thinking_context(self):
"""Create test files for deep thinking context"""
# Create architecture document
architecture_doc = """# Microservices Architecture Design
## Current System
- Monolithic application with 500k LOC
- Single PostgreSQL database
- Peak load: 10k requests/minute
- Team size: 25 developers
- Deployment: Manual, 2-week cycles
## Proposed Migration to Microservices
### Benefits
- Independent deployments
- Technology diversity
- Team autonomy
- Scalability improvements
### Challenges
- Data consistency
- Network latency
- Operational complexity
- Transaction management
### Key Considerations
- Service boundaries
- Data migration strategy
- Communication patterns
- Monitoring and observability
"""
# Create requirements document
requirements_doc = """# Migration Requirements
## Business Goals
- Reduce deployment cycle from 2 weeks to daily
- Support 50k requests/minute by Q4
- Enable A/B testing capabilities
- Improve system resilience
## Technical Constraints
- Zero downtime migration
- Maintain data consistency
- Budget: $200k for infrastructure
- Timeline: 6 months
- Existing team skills: Java, Spring Boot
## Success Metrics
- Deployment frequency: 10x improvement
- System availability: 99.9%
- Response time: <200ms p95
- Developer productivity: 30% improvement
"""
# Create performance analysis
performance_analysis = """# Current Performance Analysis
## Database Bottlenecks
- Connection pool exhaustion during peak hours
- Complex joins affecting query performance
- Lock contention on user_sessions table
- Read replica lag causing data inconsistency
## Application Issues
- Memory leaks in background processing
- Thread pool starvation
- Cache invalidation storms
- Session clustering problems
## Infrastructure Limits
- Single server deployment
- Manual scaling processes
- Limited monitoring capabilities
- No circuit breaker patterns
"""
# Create test files
self.architecture_file = self.create_additional_test_file("architecture_design.md", architecture_doc)
self.requirements_file = self.create_additional_test_file("migration_requirements.md", requirements_doc)
self.performance_file = self.create_additional_test_file("performance_analysis.md", performance_analysis)
self.logger.info(" ✅ Created thinking context files:")
self.logger.info(f" - {self.architecture_file}")
self.logger.info(f" - {self.requirements_file}")
self.logger.info(f" - {self.performance_file}")
def _test_single_thinking_session(self) -> bool:
"""Test a complete thinking session with multiple steps"""
try:
self.logger.info(" 1.1: Testing single thinking session")
# Step 1: Start thinking analysis
self.logger.info(" 1.1.1: Step 1 - Initial thinking analysis")
response1, continuation_id = self.call_mcp_tool(
"thinkdeep",
{
"step": "I need to think deeply about the microservices migration strategy. Let me analyze the trade-offs, risks, and implementation approach systematically.",
"step_number": 1,
"total_steps": 4,
"next_step_required": True,
"findings": "Initial analysis shows significant architectural complexity but potential for major scalability and development velocity improvements. Need to carefully consider migration strategy and service boundaries.",
"files_checked": [self.architecture_file, self.requirements_file],
"relevant_files": [self.architecture_file, self.requirements_file],
"relevant_context": ["microservices_migration", "service_boundaries", "data_consistency"],
"confidence": "low",
"problem_context": "Enterprise application migration from monolith to microservices",
"focus_areas": ["architecture", "scalability", "risk_assessment"],
},
)
if not response1 or not continuation_id:
self.logger.error("Failed to get initial thinking response")
return False
# Parse and validate JSON response
response1_data = self._parse_thinkdeep_response(response1)
if not response1_data:
return False
# Validate step 1 response structure - expect pause_for_thinkdeep for next_step_required=True
if not self._validate_step_response(response1_data, 1, 4, True, "pause_for_thinkdeep"):
return False
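            # For reference, _validate_step_response (defined below) checks roughly this shape
            # (hypothetical example values; only the asserted fields are guaranteed):
            #   {"status": "pause_for_thinkdeep", "step_number": 1, "total_steps": 4,
            #    "next_step_required": true, "thinking_status": {...}, "next_steps": "..."}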
self.logger.info(f" ✅ Step 1 successful, continuation_id: {continuation_id}")
# Step 2: Deep analysis
self.logger.info(" 1.1.2: Step 2 - Deep analysis of alternatives")
response2, _ = self.call_mcp_tool(
"thinkdeep",
{
"step": "Analyzing different migration approaches: strangler fig pattern vs big bang vs gradual extraction. Each has different risk profiles and timelines.",
"step_number": 2,
"total_steps": 4,
"next_step_required": True,
"findings": "Strangler fig pattern emerges as best approach: lower risk, incremental value delivery, team learning curve management. Key insight: start with read-only services to minimize data consistency issues.",
"files_checked": [self.architecture_file, self.requirements_file, self.performance_file],
"relevant_files": [self.architecture_file, self.performance_file],
"relevant_context": ["strangler_fig_pattern", "service_extraction", "risk_mitigation"],
"issues_found": [
{"severity": "high", "description": "Data consistency challenges during migration"},
{"severity": "medium", "description": "Team skill gap in distributed systems"},
],
"confidence": "medium",
"continuation_id": continuation_id,
},
)
if not response2:
self.logger.error("Failed to continue thinking to step 2")
return False
response2_data = self._parse_thinkdeep_response(response2)
if not self._validate_step_response(response2_data, 2, 4, True, "pause_for_thinkdeep"):
return False
# Check thinking status tracking
thinking_status = response2_data.get("thinking_status", {})
if thinking_status.get("files_checked", 0) < 3:
self.logger.error("Files checked count not properly tracked")
return False
if thinking_status.get("thinking_confidence") != "medium":
self.logger.error("Confidence level not properly tracked")
return False
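            # Hypothetical illustration of the thinking_status payload at this point; only the
            # two fields asserted above (files_checked, thinking_confidence) are relied upon:
            #   {"files_checked": 3, "relevant_files": 2, "thinking_confidence": "medium", ...}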
self.logger.info(" ✅ Step 2 successful with proper tracking")
# Store continuation_id for next test
self.thinking_continuation_id = continuation_id
return True
except Exception as e:
self.logger.error(f"Single thinking session test failed: {e}")
return False
def _test_thinking_refocus_flow(self) -> bool:
"""Test thinking workflow that shifts direction mid-analysis"""
try:
self.logger.info(" 1.2: Testing thinking refocus workflow")
            # Start a new thinking session for testing refocus behavior
self.logger.info(" 1.2.1: Start thinking session for refocus test")
response1, continuation_id = self.call_mcp_tool(
"thinkdeep",
{
"step": "Thinking about optimal database architecture for the new microservices",
"step_number": 1,
"total_steps": 4,
"next_step_required": True,
"findings": "Initial thought: each service should have its own database for independence",
"files_checked": [self.architecture_file],
"relevant_files": [self.architecture_file],
"relevant_context": ["database_per_service", "data_independence"],
"confidence": "low",
},
)
if not response1 or not continuation_id:
self.logger.error("Failed to start refocus test thinking")
return False
# Step 2: Initial direction
self.logger.info(" 1.2.2: Step 2 - Initial analysis direction")
response2, _ = self.call_mcp_tool(
"thinkdeep",
{
"step": "Exploring database-per-service pattern implementation",
"step_number": 2,
"total_steps": 4,
"next_step_required": True,
"findings": "Database-per-service creates significant complexity for transactions and reporting",
"files_checked": [self.architecture_file, self.performance_file],
"relevant_files": [self.performance_file],
"relevant_context": ["database_per_service", "transaction_management"],
"issues_found": [
{"severity": "high", "description": "Cross-service transactions become complex"},
{"severity": "medium", "description": "Reporting queries span multiple databases"},
],
"confidence": "low",
"continuation_id": continuation_id,
},
)
if not response2:
self.logger.error("Failed to continue to step 2")
return False
# Step 3: Backtrack and revise approach
self.logger.info(" 1.2.3: Step 3 - Backtrack and revise thinking")
response3, _ = self.call_mcp_tool(
"thinkdeep",
{
"step": "Refocusing - maybe shared database with service-specific schemas is better initially. Then gradually extract databases as services mature.",
"step_number": 3,
"total_steps": 4,
"next_step_required": True,
"findings": "Hybrid approach: shared database with bounded contexts, then gradual extraction. This reduces initial complexity while preserving migration path to full service independence.",
"files_checked": [self.architecture_file, self.requirements_file],
"relevant_files": [self.architecture_file, self.requirements_file],
"relevant_context": ["shared_database", "bounded_contexts", "gradual_extraction"],
"confidence": "medium",
"continuation_id": continuation_id,
},
)
if not response3:
self.logger.error("Failed to refocus")
return False
response3_data = self._parse_thinkdeep_response(response3)
if not self._validate_step_response(response3_data, 3, 4, True, "pause_for_thinkdeep"):
return False
self.logger.info(" ✅ Refocus working correctly")
return True
except Exception as e:
self.logger.error(f"Refocus test failed: {e}")
return False
def _test_complete_thinking_with_analysis(self) -> bool:
"""Test complete thinking ending with expert analysis"""
try:
self.logger.info(" 1.3: Testing complete thinking with expert analysis")
# Use the continuation from first test
continuation_id = getattr(self, "thinking_continuation_id", None)
if not continuation_id:
# Start fresh if no continuation available
self.logger.info(" 1.3.0: Starting fresh thinking session")
response0, continuation_id = self.call_mcp_tool(
"thinkdeep",
{
"step": "Thinking about the complete microservices migration strategy",
"step_number": 1,
"total_steps": 2,
"next_step_required": True,
"findings": "Comprehensive analysis of migration approaches and risks",
"files_checked": [self.architecture_file, self.requirements_file],
"relevant_files": [self.architecture_file, self.requirements_file],
"relevant_context": ["migration_strategy", "risk_assessment"],
},
)
if not response0 or not continuation_id:
self.logger.error("Failed to start fresh thinking session")
return False
# Final step - trigger expert analysis
self.logger.info(" 1.3.1: Final step - complete thinking analysis")
response_final, _ = self.call_mcp_tool(
"thinkdeep",
{
"step": "Thinking analysis complete. I've thoroughly considered the migration strategy, risks, and implementation approach.",
"step_number": 2,
"total_steps": 2,
"next_step_required": False, # Final step - triggers expert analysis
"findings": "Comprehensive migration strategy: strangler fig pattern with shared database initially, gradual service extraction based on business value and technical feasibility. Key success factors: team training, monitoring infrastructure, and incremental rollout.",
"files_checked": [self.architecture_file, self.requirements_file, self.performance_file],
"relevant_files": [self.architecture_file, self.requirements_file, self.performance_file],
"relevant_context": ["strangler_fig", "migration_strategy", "risk_mitigation", "team_readiness"],
"issues_found": [
{"severity": "medium", "description": "Team needs distributed systems training"},
{"severity": "low", "description": "Monitoring tools need upgrade"},
],
"confidence": "high",
"continuation_id": continuation_id,
"model": "flash", # Use flash for expert analysis
},
)
if not response_final:
self.logger.error("Failed to complete thinking")
return False
response_final_data = self._parse_thinkdeep_response(response_final)
if not response_final_data:
return False
# Validate final response structure - accept both expert analysis and special statuses
valid_final_statuses = ["calling_expert_analysis", "files_required_to_continue"]
if response_final_data.get("status") not in valid_final_statuses:
self.logger.error(
f"Expected status in {valid_final_statuses}, got '{response_final_data.get('status')}'"
)
return False
if not response_final_data.get("thinking_complete"):
self.logger.error("Expected thinking_complete=true for final step")
return False
# Check for expert analysis or special status content
if response_final_data.get("status") == "calling_expert_analysis":
if "expert_analysis" not in response_final_data:
self.logger.error("Missing expert_analysis in final response")
return False
expert_analysis = response_final_data.get("expert_analysis", {})
else:
# For special statuses like files_required_to_continue, analysis may be in content
expert_analysis = response_final_data.get("content", "{}")
if isinstance(expert_analysis, str):
try:
expert_analysis = json.loads(expert_analysis)
except (json.JSONDecodeError, TypeError):
expert_analysis = {"analysis": expert_analysis}
# Check for expected analysis content (checking common patterns)
analysis_text = json.dumps(expert_analysis, ensure_ascii=False).lower()
# Look for thinking analysis validation
thinking_indicators = ["migration", "strategy", "microservices", "risk", "approach", "implementation"]
found_indicators = sum(1 for indicator in thinking_indicators if indicator in analysis_text)
if found_indicators >= 3:
self.logger.info(" ✅ Expert analysis validated the thinking correctly")
else:
self.logger.warning(
f" ⚠️ Expert analysis may not have fully validated the thinking (found {found_indicators}/6 indicators)"
)
# Check complete thinking summary
if "complete_thinking" not in response_final_data:
self.logger.error("Missing complete_thinking in final response")
return False
complete_thinking = response_final_data["complete_thinking"]
if not complete_thinking.get("relevant_context"):
self.logger.error("Missing relevant context in complete thinking")
return False
if "migration_strategy" not in complete_thinking["relevant_context"]:
self.logger.error("Expected context not found in thinking summary")
return False
self.logger.info(" ✅ Complete thinking with expert analysis successful")
return True
except Exception as e:
self.logger.error(f"Complete thinking test failed: {e}")
return False
def _test_certain_confidence(self) -> bool:
"""Test certain confidence behavior - should skip expert analysis"""
try:
self.logger.info(" 1.4: Testing certain confidence behavior")
# Test certain confidence - should skip expert analysis
self.logger.info(" 1.4.1: Certain confidence thinking")
response_certain, _ = self.call_mcp_tool(
"thinkdeep",
{
"step": "I have thoroughly analyzed all aspects of the migration strategy with complete certainty.",
"step_number": 1,
"total_steps": 1,
"next_step_required": False, # Final step
"findings": "Definitive conclusion: strangler fig pattern with phased database extraction is the optimal approach. Risk mitigation through team training and robust monitoring. Timeline: 6 months with monthly service extractions.",
"files_checked": [self.architecture_file, self.requirements_file, self.performance_file],
"relevant_files": [self.architecture_file, self.requirements_file],
"relevant_context": ["migration_complete_strategy", "implementation_plan"],
"confidence": "certain", # This should skip expert analysis
"model": "flash",
},
)
if not response_certain:
self.logger.error("Failed to test certain confidence")
return False
response_certain_data = self._parse_thinkdeep_response(response_certain)
if not response_certain_data:
return False
# Validate certain confidence response - should skip expert analysis
if response_certain_data.get("status") != "deep_thinking_complete_ready_for_implementation":
self.logger.error(
f"Expected status 'deep_thinking_complete_ready_for_implementation', got '{response_certain_data.get('status')}'"
)
return False
if not response_certain_data.get("skip_expert_analysis"):
self.logger.error("Expected skip_expert_analysis=true for certain confidence")
return False
expert_analysis = response_certain_data.get("expert_analysis", {})
if expert_analysis.get("status") != "skipped_due_to_certain_thinking_confidence":
self.logger.error("Expert analysis should be skipped for certain confidence")
return False
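            # Sketch of the expected skip response (hypothetical example values; only the
            # fields asserted above are guaranteed by the tool):
            #   {"status": "deep_thinking_complete_ready_for_implementation",
            #    "skip_expert_analysis": true,
            #    "expert_analysis": {"status": "skipped_due_to_certain_thinking_confidence"}}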
self.logger.info(" ✅ Certain confidence behavior working correctly")
return True
except Exception as e:
self.logger.error(f"Certain confidence test failed: {e}")
return False
def call_mcp_tool(self, tool_name: str, params: dict) -> tuple[Optional[str], Optional[str]]:
"""Call an MCP tool in-process - override for thinkdeep-specific response handling"""
# Use in-process implementation to maintain conversation memory
response_text, _ = self.call_mcp_tool_direct(tool_name, params)
if not response_text:
return None, None
# Extract continuation_id from thinkdeep response specifically
continuation_id = self._extract_thinkdeep_continuation_id(response_text)
return response_text, continuation_id
def _extract_thinkdeep_continuation_id(self, response_text: str) -> Optional[str]:
"""Extract continuation_id from thinkdeep response"""
try:
# Parse the response
response_data = json.loads(response_text)
return response_data.get("continuation_id")
except json.JSONDecodeError as e:
self.logger.debug(f"Failed to parse response for thinkdeep continuation_id: {e}")
return None
def _parse_thinkdeep_response(self, response_text: str) -> dict:
"""Parse thinkdeep tool JSON response"""
try:
# Parse the response - it should be direct JSON
return json.loads(response_text)
except json.JSONDecodeError as e:
self.logger.error(f"Failed to parse thinkdeep response as JSON: {e}")
self.logger.error(f"Response text: {response_text[:500]}...")
return {}
def _validate_step_response(
self,
response_data: dict,
expected_step: int,
expected_total: int,
expected_next_required: bool,
expected_status: str,
) -> bool:
"""Validate a thinkdeep thinking step response structure"""
try:
# Check status
if response_data.get("status") != expected_status:
self.logger.error(f"Expected status '{expected_status}', got '{response_data.get('status')}'")
return False
# Check step number
if response_data.get("step_number") != expected_step:
self.logger.error(f"Expected step_number {expected_step}, got {response_data.get('step_number')}")
return False
# Check total steps
if response_data.get("total_steps") != expected_total:
self.logger.error(f"Expected total_steps {expected_total}, got {response_data.get('total_steps')}")
return False
# Check next_step_required
if response_data.get("next_step_required") != expected_next_required:
self.logger.error(
f"Expected next_step_required {expected_next_required}, got {response_data.get('next_step_required')}"
)
return False
# Check thinking_status exists
if "thinking_status" not in response_data:
self.logger.error("Missing thinking_status in response")
return False
# Check next_steps guidance
if not response_data.get("next_steps"):
self.logger.error("Missing next_steps guidance in response")
return False
return True
except Exception as e:
self.logger.error(f"Error validating step response: {e}")
return False
def _test_context_aware_file_embedding(self) -> bool:
"""Test context-aware file embedding optimization"""
try:
self.logger.info(" 1.5: Testing context-aware file embedding")
# Create additional test files for context testing
strategy_doc = """# Implementation Strategy
## Phase 1: Foundation (Month 1-2)
- Set up monitoring and logging infrastructure
- Establish CI/CD pipelines for microservices
- Team training on distributed systems concepts
## Phase 2: Initial Services (Month 3-4)
- Extract read-only services (user profiles, product catalog)
- Implement API gateway
- Set up service discovery
## Phase 3: Core Services (Month 5-6)
- Extract transaction services
- Implement saga patterns for distributed transactions
- Performance optimization and monitoring
"""
tech_stack_doc = """# Technology Stack Decisions
## Service Framework
- Spring Boot 2.7 (team familiarity)
- Docker containers
- Kubernetes orchestration
## Communication
- REST APIs for synchronous communication
- Apache Kafka for asynchronous messaging
- gRPC for high-performance internal communication
## Data Layer
- PostgreSQL (existing expertise)
- Redis for caching
- Elasticsearch for search and analytics
## Monitoring
- Prometheus + Grafana
- Distributed tracing with Jaeger
- Centralized logging with ELK stack
"""
# Create test files
strategy_file = self.create_additional_test_file("implementation_strategy.md", strategy_doc)
tech_stack_file = self.create_additional_test_file("tech_stack.md", tech_stack_doc)
# Test 1: New conversation, intermediate step - should only reference files
self.logger.info(" 1.5.1: New conversation intermediate step (should reference only)")
response1, continuation_id = self.call_mcp_tool(
"thinkdeep",
{
"step": "Starting deep thinking about implementation timeline and technology choices",
"step_number": 1,
"total_steps": 3,
"next_step_required": True, # Intermediate step
"findings": "Initial analysis of implementation strategy and technology stack decisions",
"files_checked": [strategy_file, tech_stack_file],
"relevant_files": [strategy_file], # This should be referenced, not embedded
"relevant_context": ["implementation_timeline", "technology_selection"],
"confidence": "low",
"model": "flash",
},
)
if not response1 or not continuation_id:
self.logger.error("Failed to start context-aware file embedding test")
return False
response1_data = self._parse_thinkdeep_response(response1)
if not response1_data:
return False
# Check file context - should be reference_only for intermediate step
file_context = response1_data.get("file_context", {})
if file_context.get("type") != "reference_only":
self.logger.error(f"Expected reference_only file context, got: {file_context.get('type')}")
return False
if "Files referenced but not embedded" not in file_context.get("context_optimization", ""):
self.logger.error("Expected context optimization message for reference_only")
return False
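            # For reference, an intermediate-step file_context is expected to look roughly like
            # (hypothetical example; only type and context_optimization are asserted here):
            #   {"type": "reference_only",
            #    "context_optimization": "Files referenced but not embedded ..."}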
self.logger.info(" ✅ Intermediate step correctly uses reference_only file context")
# Test 2: Final step - should embed files for expert analysis
self.logger.info(" 1.5.2: Final step (should embed files)")
response2, _ = self.call_mcp_tool(
"thinkdeep",
{
"step": "Thinking analysis complete - comprehensive evaluation of implementation approach",
"step_number": 2,
"total_steps": 2,
"next_step_required": False, # Final step - should embed files
"continuation_id": continuation_id,
"findings": "Complete analysis: phased implementation with proven technology stack minimizes risk while maximizing team effectiveness. Timeline is realistic with proper training and infrastructure setup.",
"files_checked": [strategy_file, tech_stack_file],
"relevant_files": [strategy_file, tech_stack_file], # Should be fully embedded
"relevant_context": ["implementation_plan", "technology_decisions", "risk_management"],
"confidence": "high",
"model": "flash",
},
)
if not response2:
self.logger.error("Failed to complete to final step")
return False
response2_data = self._parse_thinkdeep_response(response2)
if not response2_data:
return False
# Check file context - should be fully_embedded for final step
file_context2 = response2_data.get("file_context", {})
if file_context2.get("type") != "fully_embedded":
self.logger.error(
f"Expected fully_embedded file context for final step, got: {file_context2.get('type')}"
)
return False
if "Full file content embedded for expert analysis" not in file_context2.get("context_optimization", ""):
self.logger.error("Expected expert analysis optimization message for fully_embedded")
return False
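            # Counterpart shape for the final step (hypothetical example; only the two
            # asserted fields are guaranteed):
            #   {"type": "fully_embedded",
            #    "context_optimization": "Full file content embedded for expert analysis ..."}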
self.logger.info(" ✅ Final step correctly uses fully_embedded file context")
# Verify expert analysis was called for final step
if response2_data.get("status") != "calling_expert_analysis":
self.logger.error("Final step should trigger expert analysis")
return False
if "expert_analysis" not in response2_data:
self.logger.error("Expert analysis should be present in final step")
return False
self.logger.info(" ✅ Context-aware file embedding test completed successfully")
return True
except Exception as e:
self.logger.error(f"Context-aware file embedding test failed: {e}")
return False
def _test_multi_step_file_context(self) -> bool:
"""Test multi-step workflow with proper file context transitions"""
try:
self.logger.info(" 1.6: Testing multi-step file context optimization")
# Create a complex scenario with multiple thinking documents
risk_analysis = """# Risk Analysis
## Technical Risks
- Service mesh complexity
- Data consistency challenges
- Performance degradation during migration
- Operational overhead increase
## Business Risks
- Extended development timelines
- Potential system instability
- Team productivity impact
- Customer experience disruption
## Mitigation Strategies
- Gradual rollout with feature flags
- Comprehensive monitoring and alerting
- Rollback procedures for each phase
- Customer communication plan
"""
success_metrics = """# Success Metrics and KPIs
## Development Velocity
- Deployment frequency: Target 10x improvement
- Lead time for changes: <2 hours
- Mean time to recovery: <30 minutes
- Change failure rate: <5%
## System Performance
- Response time: <200ms p95
- System availability: 99.9%
- Throughput: 50k requests/minute
- Resource utilization: 70% optimal
## Business Impact
- Developer satisfaction: >8/10
- Time to market: 50% reduction
- Operational costs: 20% reduction
- System reliability: 99.9% uptime
"""
# Create test files
risk_file = self.create_additional_test_file("risk_analysis.md", risk_analysis)
metrics_file = self.create_additional_test_file("success_metrics.md", success_metrics)
# Step 1: Start thinking analysis (new conversation)
self.logger.info(" 1.6.1: Step 1 - Start thinking analysis")
response1, continuation_id = self.call_mcp_tool(
"thinkdeep",
{
"step": "Beginning comprehensive analysis of migration risks and success criteria",
"step_number": 1,
"total_steps": 4,
"next_step_required": True,
"findings": "Initial assessment of risk factors and success metrics for microservices migration",
"files_checked": [risk_file],
"relevant_files": [risk_file],
"relevant_context": ["risk_assessment", "migration_planning"],
"confidence": "low",
"model": "flash",
},
)
if not response1 or not continuation_id:
self.logger.error("Failed to start multi-step file context test")
return False
response1_data = self._parse_thinkdeep_response(response1)
# Validate step 1 - should use reference_only
file_context1 = response1_data.get("file_context", {})
if file_context1.get("type") != "reference_only":
self.logger.error("Step 1 should use reference_only file context")
return False
self.logger.info(" ✅ Step 1: reference_only file context")
# Step 2: Expand thinking analysis
self.logger.info(" 1.6.2: Step 2 - Expand thinking analysis")
response2, _ = self.call_mcp_tool(
"thinkdeep",
{
"step": "Deepening analysis by correlating risks with success metrics",
"step_number": 2,
"total_steps": 4,
"next_step_required": True,
"continuation_id": continuation_id,
"findings": "Key insight: technical risks directly impact business metrics. Need balanced approach prioritizing high-impact, low-risk improvements first.",
"files_checked": [risk_file, metrics_file],
"relevant_files": [risk_file, metrics_file],
"relevant_context": ["risk_metric_correlation", "priority_matrix"],
"confidence": "medium",
"model": "flash",
},
)
if not response2:
self.logger.error("Failed to continue to step 2")
return False
response2_data = self._parse_thinkdeep_response(response2)
# Validate step 2 - should still use reference_only
file_context2 = response2_data.get("file_context", {})
if file_context2.get("type") != "reference_only":
self.logger.error("Step 2 should use reference_only file context")
return False
self.logger.info(" ✅ Step 2: reference_only file context with multiple files")
# Step 3: Deep analysis
self.logger.info(" 1.6.3: Step 3 - Deep strategic analysis")
response3, _ = self.call_mcp_tool(
"thinkdeep",
{
"step": "Synthesizing risk mitigation strategies with measurable success criteria",
"step_number": 3,
"total_steps": 4,
"next_step_required": True,
"continuation_id": continuation_id,
"findings": "Strategic framework emerging: phase-gate approach with clear go/no-go criteria at each milestone. Emphasis on early wins to build confidence and momentum.",
"files_checked": [risk_file, metrics_file, self.requirements_file],
"relevant_files": [risk_file, metrics_file, self.requirements_file],
"relevant_context": ["phase_gate_approach", "milestone_criteria", "early_wins"],
"confidence": "high",
"model": "flash",
},
)
if not response3:
self.logger.error("Failed to continue to step 3")
return False
response3_data = self._parse_thinkdeep_response(response3)
# Validate step 3 - should still use reference_only
file_context3 = response3_data.get("file_context", {})
if file_context3.get("type") != "reference_only":
self.logger.error("Step 3 should use reference_only file context")
return False
self.logger.info(" ✅ Step 3: reference_only file context")
# Step 4: Final analysis with expert consultation
self.logger.info(" 1.6.4: Step 4 - Final step with expert analysis")
response4, _ = self.call_mcp_tool(
"thinkdeep",
{
"step": "Thinking analysis complete - comprehensive strategic framework developed",
"step_number": 4,
"total_steps": 4,
"next_step_required": False, # Final step - should embed files
"continuation_id": continuation_id,
"findings": "Complete strategic framework: risk-balanced migration with measurable success criteria, phase-gate governance, and clear rollback procedures. Framework aligns technical execution with business objectives.",
"files_checked": [risk_file, metrics_file, self.requirements_file, self.architecture_file],
"relevant_files": [risk_file, metrics_file, self.requirements_file, self.architecture_file],
"relevant_context": ["strategic_framework", "governance_model", "success_measurement"],
"confidence": "high",
"model": "flash",
},
)
if not response4:
self.logger.error("Failed to complete to final step")
return False
response4_data = self._parse_thinkdeep_response(response4)
# Validate step 4 - should use fully_embedded for expert analysis
file_context4 = response4_data.get("file_context", {})
if file_context4.get("type") != "fully_embedded":
self.logger.error("Step 4 (final) should use fully_embedded file context")
return False
if "expert analysis" not in file_context4.get("context_optimization", "").lower():
self.logger.error("Final step should mention expert analysis in context optimization")
return False
# Verify expert analysis was triggered
if response4_data.get("status") != "calling_expert_analysis":
self.logger.error("Final step should trigger expert analysis")
return False
# Check that expert analysis has file context
expert_analysis = response4_data.get("expert_analysis", {})
if not expert_analysis:
self.logger.error("Expert analysis should be present in final step")
return False
self.logger.info(" ✅ Step 4: fully_embedded file context with expert analysis")
# Validate the complete workflow progression
progression_summary = {
"step_1": "reference_only (new conversation, intermediate)",
"step_2": "reference_only (continuation, intermediate)",
"step_3": "reference_only (continuation, intermediate)",
"step_4": "fully_embedded (continuation, final)",
}
self.logger.info(" 📋 File context progression:")
for step, context_type in progression_summary.items():
self.logger.info(f" {step}: {context_type}")
self.logger.info(" ✅ Multi-step file context optimization test completed successfully")
return True
except Exception as e:
self.logger.error(f"Multi-step file context test failed: {e}")
return False
```
--------------------------------------------------------------------------------
/simulator_tests/test_debug_validation.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
DebugWorkflow Tool Validation Test
Tests the debug tool's capabilities using the new workflow architecture.
This validates that the new workflow-based implementation maintains
all the functionality of the original debug tool.
"""
import json
from typing import Optional
from .conversation_base_test import ConversationBaseTest
class DebugValidationTest(ConversationBaseTest):
"""Test debug tool with new workflow architecture"""
@property
def test_name(self) -> str:
return "debug_validation"
@property
def test_description(self) -> str:
return "Debug tool validation with new workflow architecture"
def run_test(self) -> bool:
"""Test debug tool capabilities"""
# Set up the test environment
self.setUp()
try:
self.logger.info("Test: DebugWorkflow tool validation (new architecture)")
# Create a Python file with a subtle but realistic bug
self._create_buggy_code()
# Test 1: Single investigation session with multiple steps
if not self._test_single_investigation_session():
return False
# Test 2: Investigation flow that requires refinement
if not self._test_investigation_refine_flow():
return False
# Test 3: Complete investigation with expert analysis
if not self._test_complete_investigation_with_analysis():
return False
# Test 4: Certain confidence behavior
if not self._test_certain_confidence():
return False
# Test 5: Context-aware file embedding
if not self._test_context_aware_file_embedding():
return False
# Test 6: Multi-step file context optimization
if not self._test_multi_step_file_context():
return False
self.logger.info(" ✅ All debug validation tests passed")
return True
except Exception as e:
self.logger.error(f"DebugWorkflow validation test failed: {e}")
return False
def _create_buggy_code(self):
"""Create test files with a subtle bug for debugging"""
# Create a Python file with dictionary iteration bug
buggy_code = """#!/usr/bin/env python3
import json
from datetime import datetime, timedelta
class SessionManager:
def __init__(self):
self.active_sessions = {}
self.session_timeout = 30 * 60 # 30 minutes in seconds
def create_session(self, user_id, user_data):
\"\"\"Create a new user session\"\"\"
session_id = f"sess_{user_id}_{datetime.now().timestamp()}"
session_info = {
'user_id': user_id,
'user_data': user_data,
'created_at': datetime.now(),
'expires_at': datetime.now() + timedelta(seconds=self.session_timeout)
}
self.active_sessions[session_id] = session_info
return session_id
def validate_session(self, session_id):
\"\"\"Check if session is valid and not expired\"\"\"
if session_id not in self.active_sessions:
return False
session = self.active_sessions[session_id]
current_time = datetime.now()
# Check if session has expired
if current_time > session['expires_at']:
del self.active_sessions[session_id]
return False
return True
def cleanup_expired_sessions(self):
\"\"\"Remove expired sessions from memory\"\"\"
current_time = datetime.now()
expired_count = 0
# BUG: Modifying dictionary while iterating over it
for session_id, session in self.active_sessions.items():
if current_time > session['expires_at']:
del self.active_sessions[session_id] # This causes RuntimeError
expired_count += 1
return expired_count
"""
# Create test file with subtle bug
self.buggy_file = self.create_additional_test_file("session_manager.py", buggy_code)
self.logger.info(f" ✅ Created test file with subtle bug: {self.buggy_file}")
# Create error description
error_description = """ISSUE DESCRIPTION:
Our session management system is experiencing intermittent failures during cleanup operations.
SYMPTOMS:
- Random RuntimeError: dictionary changed size during iteration
- Occurs during high load when many sessions expire simultaneously
- Error happens in cleanup_expired_sessions method
- Affects about 5% of cleanup operations
ERROR LOG:
RuntimeError: dictionary changed size during iteration
File "session_manager.py", line 44, in cleanup_expired_sessions
for session_id, session in self.active_sessions.items():
"""
self.error_file = self.create_additional_test_file("error_description.txt", error_description)
self.logger.info(f" ✅ Created error description file: {self.error_file}")
def _test_single_investigation_session(self) -> bool:
"""Test a complete investigation session with multiple steps"""
try:
self.logger.info(" 1.1: Testing single investigation session")
# Step 1: Start investigation
self.logger.info(" 1.1.1: Step 1 - Initial investigation")
response1, continuation_id = self.call_mcp_tool(
"debug",
{
"step": "I need to investigate intermittent RuntimeError during session cleanup. Let me start by examining the error description and understanding the symptoms.",
"step_number": 1,
"total_steps": 4,
"next_step_required": True,
"findings": "RuntimeError occurs during dictionary iteration in cleanup_expired_sessions method. Error happens intermittently during high load.",
"files_checked": [self.error_file],
"relevant_files": [self.error_file],
},
)
if not response1 or not continuation_id:
self.logger.error("Failed to get initial investigation response")
return False
# Parse and validate JSON response
response1_data = self._parse_debug_response(response1)
if not response1_data:
return False
# Validate step 1 response structure - expect pause_for_investigation for next_step_required=True
if not self._validate_step_response(response1_data, 1, 4, True, "pause_for_investigation"):
return False
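            # For reference, _validate_step_response (defined below) checks roughly this shape
            # (hypothetical example values; only the asserted fields are guaranteed):
            #   {"status": "pause_for_investigation", "step_number": 1, "total_steps": 4,
            #    "next_step_required": true, "investigation_status": {...}, "next_steps": "..."}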
self.logger.info(f" ✅ Step 1 successful, continuation_id: {continuation_id}")
# Step 2: Examine the code
self.logger.info(" 1.1.2: Step 2 - Code examination")
response2, _ = self.call_mcp_tool(
"debug",
{
"step": "Now examining the session_manager.py file to understand the cleanup_expired_sessions implementation and identify the root cause.",
"step_number": 2,
"total_steps": 4,
"next_step_required": True,
"findings": "Found the issue: cleanup_expired_sessions modifies self.active_sessions dictionary while iterating over it with .items(). This causes RuntimeError when del is called during iteration.",
"files_checked": [self.error_file, self.buggy_file],
"relevant_files": [self.buggy_file],
"relevant_context": ["SessionManager.cleanup_expired_sessions"],
"hypothesis": "Dictionary is being modified during iteration causing RuntimeError",
"confidence": "high",
"continuation_id": continuation_id,
},
)
if not response2:
self.logger.error("Failed to continue investigation to step 2")
return False
response2_data = self._parse_debug_response(response2)
if not self._validate_step_response(response2_data, 2, 4, True, "pause_for_investigation"):
return False
# Check investigation status tracking
investigation_status = response2_data.get("investigation_status", {})
if investigation_status.get("files_checked", 0) < 2:
self.logger.error("Files checked count not properly tracked")
return False
if investigation_status.get("relevant_context", 0) != 1:
self.logger.error("Relevant context not properly tracked")
return False
if investigation_status.get("current_confidence") != "high":
self.logger.error("Confidence level not properly tracked")
return False
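            # Hypothetical illustration of investigation_status at this point; only the three
            # fields asserted above (files_checked, relevant_context, current_confidence) matter:
            #   {"files_checked": 2, "relevant_context": 1, "current_confidence": "high", ...}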
self.logger.info(" ✅ Step 2 successful with proper tracking")
# Store continuation_id for next test
self.investigation_continuation_id = continuation_id
return True
except Exception as e:
self.logger.error(f"Single investigation session test failed: {e}")
return False
def _test_investigation_refine_flow(self) -> bool:
"""Test investigation flow that requires refining the approach"""
try:
self.logger.info(" 1.2: Testing investigation refinement workflow")
            # Start a new investigation for testing refinement behavior
self.logger.info(" 1.2.1: Start investigation for refinement test")
response1, continuation_id = self.call_mcp_tool(
"debug",
{
"step": "Investigating performance degradation in data processing pipeline",
"step_number": 1,
"total_steps": 4,
"next_step_required": True,
"findings": "Initial analysis shows slow database queries",
"files_checked": ["/db/queries.py"],
"relevant_files": ["/db/queries.py"],
},
)
if not response1 or not continuation_id:
self.logger.error("Failed to start refinement test investigation")
return False
# Step 2: Wrong direction
self.logger.info(" 1.2.2: Step 2 - Wrong investigation path")
response2, _ = self.call_mcp_tool(
"debug",
{
"step": "Focusing on database optimization strategies",
"step_number": 2,
"total_steps": 4,
"next_step_required": True,
"findings": "Database queries seem optimized, might be looking in wrong place",
"files_checked": ["/db/queries.py", "/db/indexes.py"],
"relevant_files": [],
"hypothesis": "Database performance issues",
"confidence": "low",
"continuation_id": continuation_id,
},
)
if not response2:
self.logger.error("Failed to continue to step 2")
return False
# Step 3: Backtrack from step 2
self.logger.info(" 1.2.3: Step 3 - Refine investigation path")
response3, _ = self.call_mcp_tool(
"debug",
{
"step": "Refocusing - the issue might not be database related. Let me investigate the data processing algorithm instead.",
"step_number": 3,
"total_steps": 4,
"next_step_required": True,
"findings": "Found inefficient nested loops in data processor causing O(n²) complexity",
"files_checked": ["/processor/algorithm.py"],
"relevant_files": ["/processor/algorithm.py"],
"relevant_context": ["DataProcessor.process_batch"],
"hypothesis": "Inefficient algorithm causing performance issues",
"confidence": "medium",
"continuation_id": continuation_id,
},
)
if not response3:
self.logger.error("Failed to refine investigation")
return False
response3_data = self._parse_debug_response(response3)
if not self._validate_step_response(response3_data, 3, 4, True, "pause_for_investigation"):
return False
self.logger.info(" ✅ Investigation refinement working correctly")
return True
except Exception as e:
self.logger.error(f"Investigation refinement test failed: {e}")
return False
def _test_complete_investigation_with_analysis(self) -> bool:
"""Test complete investigation ending with expert analysis"""
try:
self.logger.info(" 1.3: Testing complete investigation with expert analysis")
# Use the continuation from first test
continuation_id = getattr(self, "investigation_continuation_id", None)
if not continuation_id:
# Start fresh if no continuation available
self.logger.info(" 1.3.0: Starting fresh investigation")
response0, continuation_id = self.call_mcp_tool(
"debug",
{
"step": "Investigating the dictionary iteration bug in session cleanup",
"step_number": 1,
"total_steps": 2,
"next_step_required": True,
"findings": "Found dictionary modification during iteration",
"files_checked": [self.buggy_file],
"relevant_files": [self.buggy_file],
"relevant_context": ["SessionManager.cleanup_expired_sessions"],
},
)
if not response0 or not continuation_id:
self.logger.error("Failed to start fresh investigation")
return False
# Final step - trigger expert analysis
self.logger.info(" 1.3.1: Final step - complete investigation")
response_final, _ = self.call_mcp_tool(
"debug",
{
"step": "Investigation complete. The root cause is confirmed: cleanup_expired_sessions modifies the dictionary while iterating, causing RuntimeError.",
"step_number": 2,
"total_steps": 2,
"next_step_required": False, # Final step - triggers expert analysis
"findings": "Root cause identified: del self.active_sessions[session_id] on line 46 modifies dictionary during iteration starting at line 44. Fix: collect expired IDs first, then delete.",
"files_checked": [self.buggy_file],
"relevant_files": [self.buggy_file],
"relevant_context": ["SessionManager.cleanup_expired_sessions"],
"hypothesis": "Dictionary modification during iteration causes RuntimeError in cleanup_expired_sessions",
"confidence": "high",
"continuation_id": continuation_id,
"model": "flash", # Use flash for expert analysis
},
)
if not response_final:
self.logger.error("Failed to complete investigation")
return False
response_final_data = self._parse_debug_response(response_final)
if not response_final_data:
return False
# Validate final response structure - expect calling_expert_analysis for next_step_required=False
if response_final_data.get("status") != "calling_expert_analysis":
self.logger.error(
f"Expected status 'calling_expert_analysis', got '{response_final_data.get('status')}'"
)
return False
if not response_final_data.get("investigation_complete"):
self.logger.error("Expected investigation_complete=true for final step")
return False
# Check for expert analysis
if "expert_analysis" not in response_final_data:
self.logger.error("Missing expert_analysis in final response")
return False
expert_analysis = response_final_data.get("expert_analysis", {})
# Check for expected analysis content (checking common patterns)
analysis_text = json.dumps(expert_analysis, ensure_ascii=False).lower()
# Look for bug identification
bug_indicators = ["dictionary", "iteration", "modify", "runtime", "error", "del"]
found_indicators = sum(1 for indicator in bug_indicators if indicator in analysis_text)
if found_indicators >= 3:
self.logger.info(" ✅ Expert analysis identified the bug correctly")
else:
self.logger.warning(
f" ⚠️ Expert analysis may not have fully identified the bug (found {found_indicators}/6 indicators)"
)
# Check complete investigation summary
if "complete_investigation" not in response_final_data:
self.logger.error("Missing complete_investigation in final response")
return False
complete_investigation = response_final_data["complete_investigation"]
if not complete_investigation.get("relevant_context"):
self.logger.error("Missing relevant context in complete investigation")
return False
if "SessionManager.cleanup_expired_sessions" not in complete_investigation["relevant_context"]:
self.logger.error("Expected method not found in investigation summary")
return False
self.logger.info(" ✅ Complete investigation with expert analysis successful")
return True
except Exception as e:
self.logger.error(f"Complete investigation test failed: {e}")
return False
def _test_certain_confidence(self) -> bool:
"""Test certain confidence behavior - should skip expert analysis"""
try:
self.logger.info(" 1.4: Testing certain confidence behavior")
# Test certain confidence - should skip expert analysis
self.logger.info(" 1.4.1: Certain confidence investigation")
response_certain, _ = self.call_mcp_tool(
"debug",
{
"step": "I have confirmed the exact root cause with 100% certainty: dictionary modification during iteration.",
"step_number": 1,
"total_steps": 1,
"next_step_required": False, # Final step
"findings": "The bug is on line 44-47: for loop iterates over dict.items() while del modifies the dict inside the loop. Fix is simple: collect expired IDs first, then delete after iteration.",
"files_checked": [self.buggy_file],
"relevant_files": [self.buggy_file],
"relevant_context": ["SessionManager.cleanup_expired_sessions"],
"hypothesis": "Dictionary modification during iteration causes RuntimeError - fix is straightforward",
"confidence": "certain", # This should skip expert analysis
"model": "flash",
},
)
if not response_certain:
self.logger.error("Failed to test certain confidence")
return False
response_certain_data = self._parse_debug_response(response_certain)
if not response_certain_data:
return False
# Validate certain confidence response - should skip expert analysis
if response_certain_data.get("status") != "certain_confidence_proceed_with_fix":
self.logger.error(
f"Expected status 'certain_confidence_proceed_with_fix', got '{response_certain_data.get('status')}'"
)
return False
if not response_certain_data.get("skip_expert_analysis"):
self.logger.error("Expected skip_expert_analysis=true for certain confidence")
return False
expert_analysis = response_certain_data.get("expert_analysis", {})
if expert_analysis.get("status") != "skipped_due_to_certain_confidence":
self.logger.error("Expert analysis should be skipped for certain confidence")
return False
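            # Sketch of the expected skip response (hypothetical example values; only the
            # fields asserted above are guaranteed):
            #   {"status": "certain_confidence_proceed_with_fix",
            #    "skip_expert_analysis": true,
            #    "expert_analysis": {"status": "skipped_due_to_certain_confidence"}}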
self.logger.info(" ✅ Certain confidence behavior working correctly")
return True
except Exception as e:
self.logger.error(f"Certain confidence test failed: {e}")
return False
def call_mcp_tool(self, tool_name: str, params: dict) -> tuple[Optional[str], Optional[str]]:
"""Call an MCP tool in-process - override for debug-specific response handling"""
# Use in-process implementation to maintain conversation memory
response_text, _ = self.call_mcp_tool_direct(tool_name, params)
if not response_text:
return None, None
# Extract continuation_id from debug response specifically
continuation_id = self._extract_debug_continuation_id(response_text)
return response_text, continuation_id
def _extract_debug_continuation_id(self, response_text: str) -> Optional[str]:
"""Extract continuation_id from debug response"""
try:
# Parse the response
response_data = json.loads(response_text)
return response_data.get("continuation_id")
except json.JSONDecodeError as e:
self.logger.debug(f"Failed to parse response for debug continuation_id: {e}")
return None
def _parse_debug_response(self, response_text: str) -> dict:
"""Parse debug tool JSON response"""
try:
# Parse the response - it should be direct JSON
return json.loads(response_text)
except json.JSONDecodeError as e:
self.logger.error(f"Failed to parse debug response as JSON: {e}")
self.logger.error(f"Response text: {response_text[:500]}...")
return {}
def _validate_step_response(
self,
response_data: dict,
expected_step: int,
expected_total: int,
expected_next_required: bool,
expected_status: str,
) -> bool:
"""Validate a debug investigation step response structure"""
try:
# Check status
if response_data.get("status") != expected_status:
self.logger.error(f"Expected status '{expected_status}', got '{response_data.get('status')}'")
return False
# Check step number
if response_data.get("step_number") != expected_step:
self.logger.error(f"Expected step_number {expected_step}, got {response_data.get('step_number')}")
return False
# Check total steps
if response_data.get("total_steps") != expected_total:
self.logger.error(f"Expected total_steps {expected_total}, got {response_data.get('total_steps')}")
return False
# Check next_step_required
if response_data.get("next_step_required") != expected_next_required:
self.logger.error(
f"Expected next_step_required {expected_next_required}, got {response_data.get('next_step_required')}"
)
return False
# Check investigation_status exists
if "investigation_status" not in response_data:
self.logger.error("Missing investigation_status in response")
return False
# Check next_steps guidance
if not response_data.get("next_steps"):
self.logger.error("Missing next_steps guidance in response")
return False
return True
except Exception as e:
self.logger.error(f"Error validating step response: {e}")
return False
def _test_context_aware_file_embedding(self) -> bool:
"""Test context-aware file embedding optimization"""
try:
self.logger.info(" 1.5: Testing context-aware file embedding")
# Create multiple test files for context testing
file1_content = """#!/usr/bin/env python3
def process_data(data):
\"\"\"Process incoming data\"\"\"
result = []
for item in data:
if item.get('valid'):
result.append(item['value'])
return result
"""
file2_content = """#!/usr/bin/env python3
def validate_input(data):
\"\"\"Validate input data\"\"\"
if not isinstance(data, list):
raise ValueError("Data must be a list")
for item in data:
if not isinstance(item, dict):
raise ValueError("Items must be dictionaries")
if 'value' not in item:
raise ValueError("Items must have 'value' key")
return True
"""
# Create test files
file1 = self.create_additional_test_file("data_processor.py", file1_content)
file2 = self.create_additional_test_file("validator.py", file2_content)
# Test 1: New conversation, intermediate step - should only reference files
self.logger.info(" 1.5.1: New conversation intermediate step (should reference only)")
response1, continuation_id = self.call_mcp_tool(
"debug",
{
"step": "Starting investigation of data processing pipeline",
"step_number": 1,
"total_steps": 3,
"next_step_required": True, # Intermediate step
"findings": "Initial analysis of data processing components",
"files_checked": [file1, file2],
"relevant_files": [file1], # This should be referenced, not embedded
"relevant_context": ["process_data"],
"hypothesis": "Investigating data flow",
"confidence": "low",
"model": "flash",
},
)
if not response1 or not continuation_id:
self.logger.error("Failed to start context-aware file embedding test")
return False
response1_data = self._parse_debug_response(response1)
if not response1_data:
return False
# Check file context - should be reference_only for intermediate step
file_context = response1_data.get("file_context", {})
if file_context.get("type") != "reference_only":
self.logger.error(f"Expected reference_only file context, got: {file_context.get('type')}")
return False
if "Files referenced but not embedded" not in file_context.get("context_optimization", ""):
self.logger.error("Expected context optimization message for reference_only")
return False
self.logger.info(" ✅ Intermediate step correctly uses reference_only file context")
# Test 2: Intermediate step with continuation - should still only reference
self.logger.info(" 1.5.2: Intermediate step with continuation (should reference only)")
response2, _ = self.call_mcp_tool(
"debug",
{
"step": "Continuing investigation with more detailed analysis",
"step_number": 2,
"total_steps": 3,
"next_step_required": True, # Still intermediate
"continuation_id": continuation_id,
"findings": "Found potential issues in validation logic",
"files_checked": [file1, file2],
"relevant_files": [file1, file2], # Both files referenced
"relevant_context": ["process_data", "validate_input"],
"hypothesis": "Validation might be too strict",
"confidence": "medium",
"model": "flash",
},
)
if not response2:
self.logger.error("Failed to continue to step 2")
return False
response2_data = self._parse_debug_response(response2)
if not response2_data:
return False
# Check file context - should still be reference_only
file_context2 = response2_data.get("file_context", {})
if file_context2.get("type") != "reference_only":
self.logger.error(f"Expected reference_only file context for step 2, got: {file_context2.get('type')}")
return False
# Should include reference note
if not file_context2.get("note"):
self.logger.error("Expected file reference note for intermediate step")
return False
reference_note = file_context2.get("note", "")
if "data_processor.py" not in reference_note or "validator.py" not in reference_note:
self.logger.error("File reference note should mention both files")
return False
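            # Hypothetical note field for a continuation step; only its presence and the two
            # file names are asserted above, not the exact wording:
            #   {"type": "reference_only",
            #    "note": "Files referenced: data_processor.py, validator.py", ...}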
self.logger.info(" ✅ Intermediate step with continuation correctly uses reference_only")
# Test 3: Final step - should embed files for expert analysis
self.logger.info(" 1.5.3: Final step (should embed files)")
response3, _ = self.call_mcp_tool(
"debug",
{
"step": "Investigation complete - identified the root cause",
"step_number": 3,
"total_steps": 3,
"next_step_required": False, # Final step - should embed files
"continuation_id": continuation_id,
"findings": "Root cause: validator is rejecting valid data due to strict type checking",
"files_checked": [file1, file2],
"relevant_files": [file1, file2], # Should be fully embedded
"relevant_context": ["process_data", "validate_input"],
"hypothesis": "Validation logic is too restrictive for valid edge cases",
"confidence": "high",
"model": "flash",
},
)
if not response3:
self.logger.error("Failed to complete to final step")
return False
response3_data = self._parse_debug_response(response3)
if not response3_data:
return False
# Check file context - should be fully_embedded for final step
file_context3 = response3_data.get("file_context", {})
if file_context3.get("type") != "fully_embedded":
self.logger.error(
f"Expected fully_embedded file context for final step, got: {file_context3.get('type')}"
)
return False
if "Full file content embedded for expert analysis" not in file_context3.get("context_optimization", ""):
self.logger.error("Expected expert analysis optimization message for fully_embedded")
return False
# Should show files embedded count
files_embedded = file_context3.get("files_embedded", 0)
if files_embedded == 0:
# This is OK - files might already be in conversation history
self.logger.info(
" ℹ️ Files embedded count is 0 - files already in conversation history (smart deduplication)"
)
else:
self.logger.info(f" ✅ Files embedded count: {files_embedded}")
self.logger.info(" ✅ Final step correctly uses fully_embedded file context")
# Verify expert analysis was called for final step
if response3_data.get("status") != "calling_expert_analysis":
self.logger.error("Final step should trigger expert analysis")
return False
if "expert_analysis" not in response3_data:
self.logger.error("Expert analysis should be present in final step")
return False
self.logger.info(" ✅ Context-aware file embedding test completed successfully")
return True
except Exception as e:
self.logger.error(f"Context-aware file embedding test failed: {e}")
return False
def _test_multi_step_file_context(self) -> bool:
"""Test multi-step workflow with proper file context transitions"""
try:
self.logger.info(" 1.6: Testing multi-step file context optimization")
# Create a complex scenario with multiple files
config_content = """#!/usr/bin/env python3
import os
DATABASE_URL = os.getenv('DATABASE_URL', 'sqlite:///app.db')
DEBUG_MODE = os.getenv('DEBUG', 'False').lower() == 'true'
MAX_CONNECTIONS = int(os.getenv('MAX_CONNECTIONS', '10'))
# Bug: This will cause issues when MAX_CONNECTIONS is not a valid integer
CACHE_SIZE = MAX_CONNECTIONS * 2 # Problematic if MAX_CONNECTIONS is invalid
"""
server_content = """#!/usr/bin/env python3
from config import DATABASE_URL, DEBUG_MODE, CACHE_SIZE
import sqlite3
class DatabaseServer:
def __init__(self):
self.connection_pool = []
self.cache_size = CACHE_SIZE # This will fail if CACHE_SIZE is invalid
def connect(self):
try:
conn = sqlite3.connect(DATABASE_URL)
self.connection_pool.append(conn)
return conn
except Exception as e:
print(f"Connection failed: {e}")
return None
"""
# Create test files
config_file = self.create_additional_test_file("config.py", config_content)
server_file = self.create_additional_test_file("database_server.py", server_content)
            # Step 1: Start investigation (new conversation)
            self.logger.info(" 1.6.1: Step 1 - Start investigation")
            response1, continuation_id = self.call_mcp_tool(
                "debug",
                {
                    "step": "Investigating application startup failures in production environment",
                    "step_number": 1,
                    "total_steps": 4,
                    "next_step_required": True,
                    "findings": "Application fails to start with configuration errors",
                    "files_checked": [config_file],
                    "relevant_files": [config_file],
                    "relevant_context": [],
                    "hypothesis": "Configuration issue causing startup failure",
                    "confidence": "low",
                    "model": "flash",
                },
            )

            if not response1 or not continuation_id:
                self.logger.error("Failed to start multi-step file context test")
                return False

            response1_data = self._parse_debug_response(response1)

            # Validate step 1 - should use reference_only
            file_context1 = response1_data.get("file_context", {})
            if file_context1.get("type") != "reference_only":
                self.logger.error("Step 1 should use reference_only file context")
                return False

            self.logger.info(" ✅ Step 1: reference_only file context")

            # Step 2: Expand investigation
            self.logger.info(" 1.6.2: Step 2 - Expand investigation")
            response2, _ = self.call_mcp_tool(
                "debug",
                {
                    "step": "Found configuration issue - investigating database server initialization",
                    "step_number": 2,
                    "total_steps": 4,
                    "next_step_required": True,
                    "continuation_id": continuation_id,
                    "findings": "MAX_CONNECTIONS environment variable contains invalid value, causing CACHE_SIZE calculation to fail",
                    "files_checked": [config_file, server_file],
                    "relevant_files": [config_file, server_file],
                    "relevant_context": ["DatabaseServer.__init__"],
                    "hypothesis": "Invalid environment variable causing integer conversion error",
                    "confidence": "medium",
                    "model": "flash",
                },
            )

            if not response2:
                self.logger.error("Failed to continue to step 2")
                return False

            response2_data = self._parse_debug_response(response2)

            # Validate step 2 - should still use reference_only
            file_context2 = response2_data.get("file_context", {})
            if file_context2.get("type") != "reference_only":
                self.logger.error("Step 2 should use reference_only file context")
                return False

            # Should reference both files
            reference_note = file_context2.get("note", "")
            if "config.py" not in reference_note or "database_server.py" not in reference_note:
                self.logger.error("Step 2 should reference both files in note")
                return False

            self.logger.info(" ✅ Step 2: reference_only file context with multiple files")

            # Step 3: Deep analysis
            self.logger.info(" 1.6.3: Step 3 - Deep analysis")
            response3, _ = self.call_mcp_tool(
                "debug",
                {
                    "step": "Analyzing the exact error propagation path and impact",
                    "step_number": 3,
                    "total_steps": 4,
                    "next_step_required": True,
                    "continuation_id": continuation_id,
                    "findings": "Error occurs in config.py line 8 when MAX_CONNECTIONS is not numeric, then propagates to DatabaseServer.__init__",
                    "files_checked": [config_file, server_file],
                    "relevant_files": [config_file, server_file],
                    "relevant_context": ["DatabaseServer.__init__"],
                    "hypothesis": "Need proper error handling and validation for environment variables",
                    "confidence": "high",
                    "model": "flash",
                },
            )

            if not response3:
                self.logger.error("Failed to continue to step 3")
                return False

            response3_data = self._parse_debug_response(response3)

            # Validate step 3 - should still use reference_only
            file_context3 = response3_data.get("file_context", {})
            if file_context3.get("type") != "reference_only":
                self.logger.error("Step 3 should use reference_only file context")
                return False

            self.logger.info(" ✅ Step 3: reference_only file context")
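
            # Step 4 is the only call with next_step_required=False, so this is where the tool
            # should switch to fully_embedded context and hand off to expert analysis.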
            # Step 4: Final analysis with expert consultation
            self.logger.info(" 1.6.4: Step 4 - Final step with expert analysis")
            response4, _ = self.call_mcp_tool(
                "debug",
                {
                    "step": "Investigation complete - root cause identified with solution",
                    "step_number": 4,
                    "total_steps": 4,
                    "next_step_required": False,  # Final step - should embed files
                    "continuation_id": continuation_id,
                    "findings": "Root cause: config.py assumes MAX_CONNECTIONS env var is always a valid integer. Fix: add try/except with default value and proper validation.",
                    "files_checked": [config_file, server_file],
                    "relevant_files": [config_file, server_file],
                    "relevant_context": ["DatabaseServer.__init__"],
                    "hypothesis": "Environment variable validation needed with proper error handling",
                    "confidence": "high",
                    "model": "flash",
                },
            )

            if not response4:
                self.logger.error("Failed to complete to final step")
                return False

            response4_data = self._parse_debug_response(response4)

            # Validate step 4 - should use fully_embedded for expert analysis
            file_context4 = response4_data.get("file_context", {})
            if file_context4.get("type") != "fully_embedded":
                self.logger.error("Step 4 (final) should use fully_embedded file context")
                return False

            if "expert analysis" not in file_context4.get("context_optimization", "").lower():
                self.logger.error("Final step should mention expert analysis in context optimization")
                return False

            # Verify expert analysis was triggered
            if response4_data.get("status") != "calling_expert_analysis":
                self.logger.error("Final step should trigger expert analysis")
                return False

            # Check that expert analysis has file context
            expert_analysis = response4_data.get("expert_analysis", {})
            if not expert_analysis:
                self.logger.error("Expert analysis should be present in final step")
                return False

            self.logger.info(" ✅ Step 4: fully_embedded file context with expert analysis")

            # Validate the complete workflow progression
            progression_summary = {
                "step_1": "reference_only (new conversation, intermediate)",
                "step_2": "reference_only (continuation, intermediate)",
                "step_3": "reference_only (continuation, intermediate)",
                "step_4": "fully_embedded (continuation, final)",
            }

            self.logger.info(" 📋 File context progression:")
            for step, context_type in progression_summary.items():
                self.logger.info(f" {step}: {context_type}")

            self.logger.info(" ✅ Multi-step file context optimization test completed successfully")
            return True

        except Exception as e:
            self.logger.error(f"Multi-step file context test failed: {e}")
            return False
```