This is page 3 of 25. Use http://codebase.md/beehiveinnovations/gemini-mcp-server?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── commands
│ │ └── fix-github-issue.md
│ └── settings.json
├── .coveragerc
├── .dockerignore
├── .env.example
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ ├── documentation.yml
│ │ ├── feature_request.yml
│ │ └── tool_addition.yml
│ ├── pull_request_template.md
│ └── workflows
│   ├── docker-pr.yml
│   ├── docker-release.yml
│   ├── semantic-pr.yml
│   ├── semantic-release.yml
│   └── test.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CHANGELOG.md
├── claude_config_example.json
├── CLAUDE.md
├── clink
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── claude.py
│ │ ├── codex.py
│ │ └── gemini.py
│ ├── constants.py
│ ├── models.py
│ ├── parsers
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── claude.py
│ │ ├── codex.py
│ │ └── gemini.py
│ └── registry.py
├── code_quality_checks.ps1
├── code_quality_checks.sh
├── communication_simulator_test.py
├── conf
│ ├── __init__.py
│ ├── azure_models.json
│ ├── cli_clients
│ │ ├── claude.json
│ │ ├── codex.json
│ │ └── gemini.json
│ ├── custom_models.json
│ ├── dial_models.json
│ ├── gemini_models.json
│ ├── openai_models.json
│ ├── openrouter_models.json
│ └── xai_models.json
├── config.py
├── docker
│ ├── README.md
│ └── scripts
│   ├── build.ps1
│   ├── build.sh
│   ├── deploy.ps1
│   ├── deploy.sh
│   └── healthcheck.py
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── adding_providers.md
│ ├── adding_tools.md
│ ├── advanced-usage.md
│ ├── ai_banter.md
│ ├── ai-collaboration.md
│ ├── azure_openai.md
│ ├── configuration.md
│ ├── context-revival.md
│ ├── contributions.md
│ ├── custom_models.md
│ ├── docker-deployment.md
│ ├── gemini-setup.md
│ ├── getting-started.md
│ ├── index.md
│ ├── locale-configuration.md
│ ├── logging.md
│ ├── model_ranking.md
│ ├── testing.md
│ ├── tools
│ │ ├── analyze.md
│ │ ├── apilookup.md
│ │ ├── challenge.md
│ │ ├── chat.md
│ │ ├── clink.md
│ │ ├── codereview.md
│ │ ├── consensus.md
│ │ ├── debug.md
│ │ ├── docgen.md
│ │ ├── listmodels.md
│ │ ├── planner.md
│ │ ├── precommit.md
│ │ ├── refactor.md
│ │ ├── secaudit.md
│ │ ├── testgen.md
│ │ ├── thinkdeep.md
│ │ ├── tracer.md
│ │ └── version.md
│ ├── troubleshooting.md
│ ├── vcr-testing.md
│ └── wsl-setup.md
├── examples
│ ├── claude_config_macos.json
│ └── claude_config_wsl.json
├── LICENSE
├── providers
│ ├── __init__.py
│ ├── azure_openai.py
│ ├── base.py
│ ├── custom.py
│ ├── dial.py
│ ├── gemini.py
│ ├── openai_compatible.py
│ ├── openai.py
│ ├── openrouter.py
│ ├── registries
│ │ ├── __init__.py
│ │ ├── azure.py
│ │ ├── base.py
│ │ ├── custom.py
│ │ ├── dial.py
│ │ ├── gemini.py
│ │ ├── openai.py
│ │ ├── openrouter.py
│ │ └── xai.py
│ ├── registry_provider_mixin.py
│ ├── registry.py
│ ├── shared
│ │ ├── __init__.py
│ │ ├── model_capabilities.py
│ │ ├── model_response.py
│ │ ├── provider_type.py
│ │ └── temperature.py
│ └── xai.py
├── pyproject.toml
├── pytest.ini
├── README.md
├── requirements-dev.txt
├── requirements.txt
├── run_integration_tests.ps1
├── run_integration_tests.sh
├── run-server.ps1
├── run-server.sh
├── scripts
│ └── sync_version.py
├── server.py
├── simulator_tests
│ ├── __init__.py
│ ├── base_test.py
│ ├── conversation_base_test.py
│ ├── log_utils.py
│ ├── test_analyze_validation.py
│ ├── test_basic_conversation.py
│ ├── test_chat_simple_validation.py
│ ├── test_codereview_validation.py
│ ├── test_consensus_conversation.py
│ ├── test_consensus_three_models.py
│ ├── test_consensus_workflow_accurate.py
│ ├── test_content_validation.py
│ ├── test_conversation_chain_validation.py
│ ├── test_cross_tool_comprehensive.py
│ ├── test_cross_tool_continuation.py
│ ├── test_debug_certain_confidence.py
│ ├── test_debug_validation.py
│ ├── test_line_number_validation.py
│ ├── test_logs_validation.py
│ ├── test_model_thinking_config.py
│ ├── test_o3_model_selection.py
│ ├── test_o3_pro_expensive.py
│ ├── test_ollama_custom_url.py
│ ├── test_openrouter_fallback.py
│ ├── test_openrouter_models.py
│ ├── test_per_tool_deduplication.py
│ ├── test_planner_continuation_history.py
│ ├── test_planner_validation_old.py
│ ├── test_planner_validation.py
│ ├── test_precommitworkflow_validation.py
│ ├── test_prompt_size_limit_bug.py
│ ├── test_refactor_validation.py
│ ├── test_secaudit_validation.py
│ ├── test_testgen_validation.py
│ ├── test_thinkdeep_validation.py
│ ├── test_token_allocation_validation.py
│ ├── test_vision_capability.py
│ └── test_xai_models.py
├── systemprompts
│ ├── __init__.py
│ ├── analyze_prompt.py
│ ├── chat_prompt.py
│ ├── clink
│ │ ├── codex_codereviewer.txt
│ │ ├── default_codereviewer.txt
│ │ ├── default_planner.txt
│ │ └── default.txt
│ ├── codereview_prompt.py
│ ├── consensus_prompt.py
│ ├── debug_prompt.py
│ ├── docgen_prompt.py
│ ├── generate_code_prompt.py
│ ├── planner_prompt.py
│ ├── precommit_prompt.py
│ ├── refactor_prompt.py
│ ├── secaudit_prompt.py
│ ├── testgen_prompt.py
│ ├── thinkdeep_prompt.py
│ └── tracer_prompt.py
├── tests
│ ├── __init__.py
│ ├── CASSETTE_MAINTENANCE.md
│ ├── conftest.py
│ ├── gemini_cassettes
│ │ ├── chat_codegen
│ │ │ └── gemini25_pro_calculator
│ │ │   └── mldev.json
│ │ ├── chat_cross
│ │ │ └── step1_gemini25_flash_number
│ │ │   └── mldev.json
│ │ └── consensus
│ │   └── step2_gemini25_flash_against
│ │     └── mldev.json
│ ├── http_transport_recorder.py
│ ├── mock_helpers.py
│ ├── openai_cassettes
│ │ ├── chat_cross_step2_gpt5_reminder.json
│ │ ├── chat_gpt5_continuation.json
│ │ ├── chat_gpt5_moon_distance.json
│ │ ├── consensus_step1_gpt5_for.json
│ │ └── o3_pro_basic_math.json
│ ├── pii_sanitizer.py
│ ├── sanitize_cassettes.py
│ ├── test_alias_target_restrictions.py
│ ├── test_auto_mode_comprehensive.py
│ ├── test_auto_mode_custom_provider_only.py
│ ├── test_auto_mode_model_listing.py
│ ├── test_auto_mode_provider_selection.py
│ ├── test_auto_mode.py
│ ├── test_auto_model_planner_fix.py
│ ├── test_azure_openai_provider.py
│ ├── test_buggy_behavior_prevention.py
│ ├── test_cassette_semantic_matching.py
│ ├── test_challenge.py
│ ├── test_chat_codegen_integration.py
│ ├── test_chat_cross_model_continuation.py
│ ├── test_chat_openai_integration.py
│ ├── test_chat_simple.py
│ ├── test_clink_claude_agent.py
│ ├── test_clink_claude_parser.py
│ ├── test_clink_codex_agent.py
│ ├── test_clink_gemini_agent.py
│ ├── test_clink_gemini_parser.py
│ ├── test_clink_integration.py
│ ├── test_clink_parsers.py
│ ├── test_clink_tool.py
│ ├── test_collaboration.py
│ ├── test_config.py
│ ├── test_consensus_integration.py
│ ├── test_consensus_schema.py
│ ├── test_consensus.py
│ ├── test_conversation_continuation_integration.py
│ ├── test_conversation_field_mapping.py
│ ├── test_conversation_file_features.py
│ ├── test_conversation_memory.py
│ ├── test_conversation_missing_files.py
│ ├── test_custom_openai_temperature_fix.py
│ ├── test_custom_provider.py
│ ├── test_debug.py
│ ├── test_deploy_scripts.py
│ ├── test_dial_provider.py
│ ├── test_directory_expansion_tracking.py
│ ├── test_disabled_tools.py
│ ├── test_docker_claude_desktop_integration.py
│ ├── test_docker_config_complete.py
│ ├── test_docker_healthcheck.py
│ ├── test_docker_implementation.py
│ ├── test_docker_mcp_validation.py
│ ├── test_docker_security.py
│ ├── test_docker_volume_persistence.py
│ ├── test_file_protection.py
│ ├── test_gemini_token_usage.py
│ ├── test_image_support_integration.py
│ ├── test_image_validation.py
│ ├── test_integration_utf8.py
│ ├── test_intelligent_fallback.py
│ ├── test_issue_245_simple.py
│ ├── test_large_prompt_handling.py
│ ├── test_line_numbers_integration.py
│ ├── test_listmodels_restrictions.py
│ ├── test_listmodels.py
│ ├── test_mcp_error_handling.py
│ ├── test_model_enumeration.py
│ ├── test_model_metadata_continuation.py
│ ├── test_model_resolution_bug.py
│ ├── test_model_restrictions.py
│ ├── test_o3_pro_output_text_fix.py
│ ├── test_o3_temperature_fix_simple.py
│ ├── test_openai_compatible_token_usage.py
│ ├── test_openai_provider.py
│ ├── test_openrouter_provider.py
│ ├── test_openrouter_registry.py
│ ├── test_parse_model_option.py
│ ├── test_per_tool_model_defaults.py
│ ├── test_pii_sanitizer.py
│ ├── test_pip_detection_fix.py
│ ├── test_planner.py
│ ├── test_precommit_workflow.py
│ ├── test_prompt_regression.py
│ ├── test_prompt_size_limit_bug_fix.py
│ ├── test_provider_retry_logic.py
│ ├── test_provider_routing_bugs.py
│ ├── test_provider_utf8.py
│ ├── test_providers.py
│ ├── test_rate_limit_patterns.py
│ ├── test_refactor.py
│ ├── test_secaudit.py
│ ├── test_server.py
│ ├── test_supported_models_aliases.py
│ ├── test_thinking_modes.py
│ ├── test_tools.py
│ ├── test_tracer.py
│ ├── test_utf8_localization.py
│ ├── test_utils.py
│ ├── test_uvx_resource_packaging.py
│ ├── test_uvx_support.py
│ ├── test_workflow_file_embedding.py
│ ├── test_workflow_metadata.py
│ ├── test_workflow_prompt_size_validation_simple.py
│ ├── test_workflow_utf8.py
│ ├── test_xai_provider.py
│ ├── transport_helpers.py
│ └── triangle.png
├── tools
│ ├── __init__.py
│ ├── analyze.py
│ ├── apilookup.py
│ ├── challenge.py
│ ├── chat.py
│ ├── clink.py
│ ├── codereview.py
│ ├── consensus.py
│ ├── debug.py
│ ├── docgen.py
│ ├── listmodels.py
│ ├── models.py
│ ├── planner.py
│ ├── precommit.py
│ ├── refactor.py
│ ├── secaudit.py
│ ├── shared
│ │ ├── __init__.py
│ │ ├── base_models.py
│ │ ├── base_tool.py
│ │ ├── exceptions.py
│ │ └── schema_builders.py
│ ├── simple
│ │ ├── __init__.py
│ │ └── base.py
│ ├── testgen.py
│ ├── thinkdeep.py
│ ├── tracer.py
│ ├── version.py
│ └── workflow
│   ├── __init__.py
│   ├── base.py
│   ├── schema_builders.py
│   └── workflow_mixin.py
├── utils
│ ├── __init__.py
│ ├── client_info.py
│ ├── conversation_memory.py
│ ├── env.py
│ ├── file_types.py
│ ├── file_utils.py
│ ├── image_utils.py
│ ├── model_context.py
│ ├── model_restrictions.py
│ ├── security_config.py
│ ├── storage_backend.py
│ └── token_utils.py
└── zen-mcp-server
```
# Files
--------------------------------------------------------------------------------
/simulator_tests/test_o3_pro_expensive.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | O3-Pro Expensive Model Test
4 |
5 | ⚠️ WARNING: This test uses o3-pro which is EXTREMELY EXPENSIVE! ⚠️
6 |
7 | This test is intentionally NOT added to TEST_REGISTRY to prevent accidental execution.
8 | It can only be run manually using:
9 | python communication_simulator_test.py --individual o3_pro_expensive
10 |
11 | Tests that o3-pro model:
12 | 1. Uses the correct /v1/responses endpoint (not /v1/chat/completions)
13 | 2. Successfully completes a chat call
14 | 3. Returns properly formatted response
15 | """
16 |
17 | from .base_test import BaseSimulatorTest
18 |
19 |
20 | class O3ProExpensiveTest(BaseSimulatorTest):
21 | """Test o3-pro model basic functionality - EXPENSIVE, manual only"""
22 |
23 | @property
24 | def test_name(self) -> str:
25 | return "o3_pro_expensive"
26 |
27 | @property
28 | def test_description(self) -> str:
29 | return "⚠️ EXPENSIVE O3-Pro basic validation (manual only)"
30 |
31 | def run_test(self) -> bool:
32 | """Test o3-pro model with endpoint verification - EXPENSIVE!"""
33 | try:
34 | self.logger.warning("⚠️ ⚠️ ⚠️ EXPENSIVE TEST - O3-PRO COSTS ~$15-60 PER 1K TOKENS! ⚠️ ⚠️ ⚠️")
35 | self.logger.info("Test: O3-Pro endpoint and functionality test")
36 |
37 | # First, verify we're hitting the right endpoint by checking logs
38 | self.logger.info("Step 1: Testing o3-pro with chat tool")
39 |
40 | # One simple chat call
41 | response, tool_result = self.call_mcp_tool(
42 | "chat",
43 | {
44 | "prompt": "What is 2 + 2?",
45 | "model": "o3-pro",
46 | "temperature": 1.0,
47 | },
48 | )
49 |
50 | if not response:
51 | self.logger.error("❌ O3-Pro chat call failed - no response")
52 | if tool_result and "error" in tool_result:
53 | error_msg = tool_result["error"]
54 | self.logger.error(f"Error details: {error_msg}")
55 | # Check if it's the endpoint error we're trying to fix
56 | if "v1/responses" in str(error_msg) and "v1/chat/completions" in str(error_msg):
57 | self.logger.error(
58 | "❌ ENDPOINT BUG DETECTED: o3-pro is trying to use chat/completions instead of responses endpoint!"
59 | )
60 | return False
61 |
62 | # Check the metadata to verify endpoint was used
63 | if tool_result and isinstance(tool_result, dict):
64 | metadata = tool_result.get("metadata", {})
65 | endpoint_used = metadata.get("endpoint", "unknown")
66 |
67 | if endpoint_used == "responses":
68 | self.logger.info("✅ Correct endpoint used: /v1/responses")
69 | else:
70 | self.logger.warning(f"⚠️ Endpoint used: {endpoint_used} (expected: responses)")
71 |
72 | # Verify the response content
73 | if response and "4" in str(response):
74 | self.logger.info("✅ O3-Pro response is mathematically correct")
75 | else:
76 | self.logger.warning(f"⚠️ Unexpected response: {response}")
77 |
78 | self.logger.info("✅ O3-Pro test completed successfully")
79 | self.logger.warning("💰 Test completed - check your billing!")
80 | return True
81 |
82 | except Exception as e:
83 | self.logger.error(f"O3-Pro test failed with exception: {e}")
84 | # Log the full error for debugging endpoint issues
85 | import traceback
86 |
87 | self.logger.error(f"Full traceback: {traceback.format_exc()}")
88 | return False
89 |
90 |
91 | def main():
92 | """Run the O3-Pro expensive test"""
93 | import sys
94 |
95 | print("⚠️ ⚠️ ⚠️ WARNING: This test uses O3-PRO which is EXTREMELY EXPENSIVE! ⚠️ ⚠️ ⚠️")
96 | print("O3-Pro can cost $15-60 per 1K tokens!")
97 | print("This is a MINIMAL test but may still cost $5-15!")
98 | print()
99 |
100 | response = input("Are you absolutely sure you want to run this expensive test? Type 'YES_I_UNDERSTAND_THE_COST': ")
101 | if response != "YES_I_UNDERSTAND_THE_COST":
102 | print("❌ Test cancelled")
103 | sys.exit(1)
104 |
105 | print("💰 Running minimal O3-Pro test...")
106 |
107 | verbose = "--verbose" in sys.argv or "-v" in sys.argv
108 | test = O3ProExpensiveTest(verbose=verbose)
109 |
110 | success = test.run_test()
111 |
112 | if success:
113 | print("✅ O3-Pro test completed successfully")
114 | print("💰 Don't forget to check your billing!")
115 | else:
116 | print("❌ O3-Pro test failed")
117 |
118 | sys.exit(0 if success else 1)
119 |
120 |
121 | if __name__ == "__main__":
122 | main()
123 |
```
--------------------------------------------------------------------------------
/tests/test_cassette_semantic_matching.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Tests for cassette semantic matching to prevent breaks from prompt changes.
3 |
4 | This validates that o3 model cassettes match on semantic content (model + user question)
5 | rather than exact request bodies, preventing cassette breaks when system prompts change.
6 | """
7 |
8 | import hashlib
9 | import json
10 |
11 | import pytest
12 |
13 | from tests.http_transport_recorder import ReplayTransport
14 |
15 |
16 | class TestCassetteSemanticMatching:
17 | """Test that cassette matching is resilient to prompt changes."""
18 |
19 | @pytest.fixture
20 | def dummy_cassette(self, tmp_path):
21 | """Create a minimal dummy cassette file."""
22 | cassette_file = tmp_path / "dummy.json"
23 | cassette_file.write_text(json.dumps({"interactions": []}))
24 | return cassette_file
25 |
26 | def test_o3_model_semantic_matching(self, dummy_cassette):
27 | """Test that o3 models use semantic matching."""
28 | transport = ReplayTransport(str(dummy_cassette))
29 |
30 | # Two requests with same user question but different system prompts
31 | request1_body = {
32 | "model": "o3-pro",
33 | "reasoning": {"effort": "medium"},
34 | "input": [
35 | {
36 | "role": "user",
37 | "content": [
38 | {
39 | "type": "input_text",
40 | "text": "System prompt v1...\n\n=== USER REQUEST ===\nWhat is 2 + 2?\n=== END REQUEST ===\n\nMore instructions...",
41 | }
42 | ],
43 | }
44 | ],
45 | }
46 |
47 | request2_body = {
48 | "model": "o3-pro",
49 | "reasoning": {"effort": "medium"},
50 | "input": [
51 | {
52 | "role": "user",
53 | "content": [
54 | {
55 | "type": "input_text",
56 | "text": "System prompt v2 (DIFFERENT)...\n\n=== USER REQUEST ===\nWhat is 2 + 2?\n=== END REQUEST ===\n\nDifferent instructions...",
57 | }
58 | ],
59 | }
60 | ],
61 | }
62 |
63 | # Extract semantic fields - should be identical
64 | semantic1 = transport._extract_semantic_fields(request1_body)
65 | semantic2 = transport._extract_semantic_fields(request2_body)
66 |
67 | assert semantic1 == semantic2, "Semantic fields should match despite different prompts"
68 | assert semantic1["user_question"] == "What is 2 + 2?"
69 | assert semantic1["model"] == "o3-pro"
70 | assert semantic1["reasoning"] == {"effort": "medium"}
71 |
72 | # Generate signatures - should be identical
73 | content1 = json.dumps(semantic1, sort_keys=True)
74 | content2 = json.dumps(semantic2, sort_keys=True)
75 | hash1 = hashlib.md5(content1.encode()).hexdigest()
76 | hash2 = hashlib.md5(content2.encode()).hexdigest()
77 |
78 | assert hash1 == hash2, "Hashes should match for same semantic content"
79 |
80 | def test_non_o3_model_exact_matching(self, dummy_cassette):
81 | """Test that non-o3 models still use exact matching."""
82 | transport = ReplayTransport(str(dummy_cassette))
83 |
84 | request_body = {
85 | "model": "gpt-4",
86 | "messages": [{"role": "user", "content": "test"}],
87 | }
88 |
89 | # Should not use semantic matching
90 | assert not transport._is_o3_model_request(request_body)
91 |
92 | def test_o3_mini_semantic_matching(self, dummy_cassette):
93 | """Test that o3-mini also uses semantic matching."""
94 | transport = ReplayTransport(str(dummy_cassette))
95 |
96 | request_body = {
97 | "model": "o3-mini",
98 | "reasoning": {"effort": "low"},
99 | "input": [
100 | {
101 | "role": "user",
102 | "content": [
103 | {"type": "input_text", "text": "System...\n\n=== USER REQUEST ===\nTest\n=== END REQUEST ==="}
104 | ],
105 | }
106 | ],
107 | }
108 |
109 | assert transport._is_o3_model_request(request_body)
110 | semantic = transport._extract_semantic_fields(request_body)
111 | assert semantic["model"] == "o3-mini"
112 | assert semantic["user_question"] == "Test"
113 |
114 | def test_o3_without_request_markers(self, dummy_cassette):
115 | """Test o3 requests without REQUEST markers fall back to full text."""
116 | transport = ReplayTransport(str(dummy_cassette))
117 |
118 | request_body = {
119 | "model": "o3-pro",
120 | "reasoning": {"effort": "medium"},
121 | "input": [{"role": "user", "content": [{"type": "input_text", "text": "Just a simple question"}]}],
122 | }
123 |
124 | semantic = transport._extract_semantic_fields(request_body)
125 | assert semantic["user_question"] == "Just a simple question"
126 |
```
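
The matching idea exercised above can be restated in a small sketch. This is not the project's `ReplayTransport` code, only an illustration of the technique the tests pin down: extract the model, the reasoning settings, and the user question between the `=== USER REQUEST ===` / `=== END REQUEST ===` markers, then hash only those fields so prompt wording changes leave the signature untouched.
```python
import hashlib
import json
import re


def extract_semantic_fields(request_body: dict) -> dict:
    """Illustrative only: pull out the fields that identify an o3 request semantically."""
    # Join all input_text segments from the request payload.
    texts = [
        part.get("text", "")
        for item in request_body.get("input", [])
        for part in item.get("content", [])
        if part.get("type") == "input_text"
    ]
    full_text = "\n".join(texts)

    # Prefer the question between the request markers; fall back to the full text.
    match = re.search(r"=== USER REQUEST ===\n(.*?)\n=== END REQUEST ===", full_text, re.DOTALL)
    user_question = match.group(1).strip() if match else full_text.strip()

    return {
        "model": request_body.get("model"),
        "reasoning": request_body.get("reasoning"),
        "user_question": user_question,
    }


def semantic_signature(request_body: dict) -> str:
    """Hash only the semantic fields, so different system prompts map to the same cassette entry."""
    content = json.dumps(extract_semantic_fields(request_body), sort_keys=True)
    return hashlib.md5(content.encode()).hexdigest()
```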
--------------------------------------------------------------------------------
/docs/tools/thinkdeep.md:
--------------------------------------------------------------------------------
```markdown
1 | # ThinkDeep Tool - Extended Reasoning Partner
2 |
3 | **Get a second opinion to augment Claude's own extended thinking**
4 |
5 | The `thinkdeep` tool provides extended reasoning capabilities, offering a second perspective to augment Claude's analysis. It's designed to challenge assumptions, find edge cases, and provide alternative approaches to complex problems.
6 |
7 | ## Thinking Mode
8 |
9 | **Default is `high` (16,384 tokens) for deep analysis.** Claude will automatically choose the best mode based on complexity - use `low` for quick validations, `medium` for standard problems, `high` for complex issues (default), or `max` for extremely complex challenges requiring deepest analysis.
10 |
11 | ## Example Prompt
12 |
13 | ```
14 | Think deeper about my authentication design with pro using max thinking mode and brainstorm to come up
15 | with the best architecture for my project
16 | ```
17 |
18 | ## Key Features
19 |
20 | - **Uses Gemini's specialized thinking models** for enhanced reasoning capabilities
21 | - **Provides a second opinion** on Claude's analysis
22 | - **Challenges assumptions** and identifies edge cases Claude might miss
23 | - **Offers alternative perspectives** and approaches
24 | - **Validates architectural decisions** and design patterns
25 | - **File reference support**: `"Use gemini to think deeper about my API design with reference to api/routes.py"`
26 | - **Image support**: Analyze architectural diagrams, flowcharts, design mockups: `"Think deeper about this system architecture diagram with gemini pro using max thinking mode"`
27 | - **Enhanced Critical Evaluation (v2.10.0)**: After Gemini's analysis, Claude is prompted to critically evaluate the suggestions, consider context and constraints, identify risks, and synthesize a final recommendation - ensuring a balanced, well-considered solution
28 | - **Web search capability**: Automatically identifies areas where current documentation or community solutions would strengthen the analysis and instructs Claude to perform targeted searches
29 |
30 | ## Tool Parameters
31 |
32 | - `prompt`: Your current thinking/analysis to extend and validate (required)
33 | - `model`: auto|pro|flash|flash-2.0|flashlite|o3|o3-mini|o4-mini|gpt4.1|gpt5|gpt5-mini|gpt5-nano (default: server default)
34 | - `problem_context`: Additional context about the problem or goal
35 | - `focus_areas`: Specific aspects to focus on (architecture, performance, security, etc.)
36 | - `files`: Optional file paths or directories for additional context (absolute paths)
37 | - `images`: Optional images for visual analysis (absolute paths)
38 | - `temperature`: Temperature for creative thinking (0-1, default 0.7)
39 | - `thinking_mode`: minimal|low|medium|high|max (default: high, Gemini only)
40 | - `continuation_id`: Continue previous conversations
41 |
42 | ## Usage Examples
43 |
44 | **Architecture Design:**
45 | ```
46 | "Think deeper about my microservices authentication strategy with pro using max thinking mode"
47 | ```
48 |
49 | **With File Context:**
50 | ```
51 | "Use gemini to think deeper about my API design with reference to api/routes.py and models/user.py"
52 | ```
53 |
54 | **Visual Analysis:**
55 | ```
56 | "Think deeper about this system architecture diagram with gemini pro - identify potential bottlenecks"
57 | ```
58 |
59 | **Problem Solving:**
60 | ```
61 | "I'm considering using GraphQL vs REST for my API. Think deeper about the trade-offs with o3 using high thinking mode"
62 | ```
63 |
64 | **Code Review Enhancement:**
65 | ```
66 | "Think deeper about the security implications of this authentication code with pro"
67 | ```
68 |
69 | ## Best Practices
70 |
71 | - **Provide detailed context**: Share your current thinking, constraints, and objectives
72 | - **Be specific about focus areas**: Mention what aspects need deeper analysis
73 | - **Include relevant files**: Reference code, documentation, or configuration files
74 | - **Use appropriate thinking modes**: Higher modes for complex problems, lower for quick validations
75 | - **Leverage visual context**: Include diagrams or mockups for architectural discussions
76 | - **Build on discussions**: Use continuation to extend previous analyses
77 |
78 | ## Enhanced Critical Evaluation Process
79 |
80 | The `thinkdeep` tool includes a unique two-stage process:
81 |
82 | 1. **Gemini's Analysis**: Extended reasoning with specialized thinking capabilities
83 | 2. **Claude's Critical Evaluation**: Claude reviews Gemini's suggestions, considers:
84 | - Context and constraints of your specific situation
85 | - Potential risks and implementation challenges
86 | - Trade-offs and alternatives
87 | - Final synthesized recommendation
88 |
89 | This ensures you get both deep reasoning and practical, context-aware advice.
90 |
91 | ## When to Use ThinkDeep vs Other Tools
92 |
93 | - **Use `thinkdeep`** for: Extending specific analysis, challenging assumptions, architectural decisions
94 | - **Use `chat`** for: Open-ended brainstorming and general discussions
95 | - **Use `analyze`** for: Understanding existing code without extending analysis
96 | - **Use `codereview`** for: Finding specific bugs and security issues
97 |
```
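
To make the parameter list in this document concrete, here is a minimal sketch of what the arguments for a `thinkdeep` call might look like. The field names mirror the documented parameters; the values (and the exact types of optional fields such as `focus_areas`) are illustrative placeholders rather than output from the server.
```python
# Hypothetical thinkdeep arguments, assembled from the parameters documented above.
thinkdeep_arguments = {
    "prompt": "My current thinking: use short-lived JWT access tokens with refresh rotation...",
    "model": "pro",
    "problem_context": "Designing authentication for a multi-tenant API",
    "focus_areas": ["security", "architecture"],
    "files": ["/absolute/path/to/api/routes.py"],
    "temperature": 0.7,
    "thinking_mode": "max",
}
```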
--------------------------------------------------------------------------------
/systemprompts/analyze_prompt.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Analyze tool system prompt
3 | """
4 |
5 | ANALYZE_PROMPT = """
6 | ROLE
7 | You are a senior software analyst performing a holistic technical audit of the given code or project. Your mission is
8 | to help engineers understand how a codebase aligns with long-term goals, architectural soundness, scalability,
9 | and maintainability—not just spot routine code-review issues.
10 |
11 | CRITICAL LINE NUMBER INSTRUCTIONS
12 | Code is presented with line number markers "LINE│ code". These markers are for reference ONLY and MUST NOT be
13 | included in any code you generate. Always reference specific line numbers in your replies so that exact positions
14 | can be located when needed. Include a very short code excerpt alongside for clarity.
15 | Include context_start_text and context_end_text as backup references. Never include "LINE│" markers in generated code
16 | snippets.
17 |
18 | IF MORE INFORMATION IS NEEDED
19 | If you need additional context (e.g., dependencies, configuration files, test files) to provide complete analysis, you
20 | MUST respond ONLY with this JSON format (and nothing else). Do NOT ask for the same file you've been provided unless
21 | for some reason its content is missing or incomplete:
22 | {
23 | "status": "files_required_to_continue",
24 | "mandatory_instructions": "<your critical instructions for the agent>",
25 | "files_needed": ["[file name here]", "[or some folder/]"]
26 | }
27 |
28 | ESCALATE TO A FULL CODEREVIEW IF REQUIRED
29 | If, after thoroughly analysing the question and the provided code, you determine that a comprehensive, code-base–wide
30 | review is essential - e.g., the issue spans multiple modules or exposes a systemic architectural flaw — do not proceed
31 | with partial analysis. Instead, respond ONLY with the JSON below (and nothing else). Clearly state the reason why
32 | you strongly feel this is necessary and ask the agent to inform the user why you're switching to a different tool:
33 | {"status": "full_codereview_required",
34 | "important": "Please use zen's codereview tool instead",
35 | "reason": "<brief, specific rationale for escalation>"}
36 |
37 | SCOPE & FOCUS
38 | • Understand the code's purpose and architecture and the overall scope and scale of the project
39 | • Identify strengths, risks, and strategic improvement areas that affect future development
40 | • Avoid line-by-line bug hunts or minor style critiques—those are covered by CodeReview
41 | • Recommend practical, proportional changes; no "rip-and-replace" proposals unless the architecture is untenable
42 | • Identify and flag overengineered solutions — excessive abstraction, unnecessary configuration layers, or generic
43 | frameworks introduced without a clear, current need. These should be called out when they add complexity, slow
44 | onboarding, or reduce clarity, especially if the anticipated complexity is speculative or unlikely to materialize
45 | in the foreseeable future.
46 |
47 | ANALYSIS STRATEGY
48 | 1. Map the tech stack, frameworks, deployment model, and constraints
49 | 2. Determine how well current architecture serves stated business and scaling goals
50 | 3. Surface systemic risks (tech debt hot-spots, brittle modules, growth bottlenecks)
51 | 4. Highlight opportunities for strategic refactors or pattern adoption that yield high ROI
52 | 5. Provide clear, actionable insights with just enough detail to guide decision-making
53 |
54 | KEY DIMENSIONS (apply as relevant)
55 | • **Architectural Alignment** – layering, domain boundaries, CQRS/eventing, micro-vs-monolith fit
56 | • **Scalability & Performance Trajectory** – data flow, caching strategy, concurrency model
57 | • **Maintainability & Tech Debt** – module cohesion, coupling, code ownership, documentation health
58 | • **Security & Compliance Posture** – systemic exposure points, secrets management, threat surfaces
59 | • **Operational Readiness** – observability, deployment pipeline, rollback/DR strategy
60 | • **Future Proofing** – ease of feature addition, language/version roadmap, community support
61 |
62 | DELIVERABLE FORMAT
63 |
64 | ## Executive Overview
65 | One paragraph summarizing architecture fitness, key risks, and standout strengths.
66 |
67 | ## Strategic Findings (Ordered by Impact)
68 |
69 | ### 1. [FINDING NAME]
70 | **Insight:** Very concise statement of what matters and why.
71 | **Evidence:** Specific modules/files/metrics/code illustrating the point.
72 | **Impact:** How this affects scalability, maintainability, or business goals.
73 | **Recommendation:** Actionable next step (e.g., adopt pattern X, consolidate service Y).
74 | **Effort vs. Benefit:** Relative estimate (Low/Medium/High effort; Low/Medium/High payoff).
75 |
76 | ### 2. [FINDING NAME]
77 | [Repeat format...]
78 |
79 | ## Quick Wins
80 | Bullet list of low-effort changes offering immediate value.
81 |
82 | ## Long-Term Roadmap Suggestions
83 | High-level guidance for phased improvements (optional—include only if explicitly requested).
84 |
85 | Remember: focus on system-level insights that inform strategic decisions; leave granular bug fixing and style nits to
86 | the codereview tool.
87 | """
88 |
```
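
The prompt above defines two JSON escalation payloads (`files_required_to_continue` and `full_codereview_required`) alongside free-form analysis. The helper below is a hypothetical caller-side sketch, not code from this repository, showing how a consumer of the prompt could tell those cases apart; the only names taken from the source are the two status values and their fields.
```python
import json


def classify_analyze_reply(reply_text: str) -> dict:
    """Distinguish plain analysis from the JSON escalation payloads the prompt defines."""
    try:
        payload = json.loads(reply_text)
    except json.JSONDecodeError:
        return {"kind": "analysis", "content": reply_text}

    status = payload.get("status") if isinstance(payload, dict) else None
    if status == "files_required_to_continue":
        return {"kind": "needs_files", "files": payload.get("files_needed", [])}
    if status == "full_codereview_required":
        return {"kind": "escalate_to_codereview", "reason": payload.get("reason", "")}
    return {"kind": "analysis", "content": reply_text}
```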
--------------------------------------------------------------------------------
/tests/test_prompt_size_limit_bug_fix.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Test for the prompt size limit bug fix.
3 |
4 | This test verifies that SimpleTool correctly validates only the original user prompt
5 | when conversation history is embedded, rather than validating the full enhanced prompt.
6 | """
7 |
8 | from tools.chat import ChatTool
9 | from tools.shared.base_models import ToolRequest
10 |
11 |
12 | class TestPromptSizeLimitBugFix:
13 | """Test that the prompt size limit bug is fixed"""
14 |
15 | def test_prompt_size_validation_with_conversation_history(self):
16 | """Test that prompt size validation uses original prompt when conversation history is embedded"""
17 |
18 | # Create a ChatTool instance
19 | tool = ChatTool()
20 |
21 | # Simulate a short user prompt (should not trigger size limit)
22 | short_user_prompt = "Thanks for the help!"
23 |
24 | # Simulate conversation history (large content)
25 | conversation_history = "=== CONVERSATION HISTORY ===\n" + ("Previous conversation content. " * 5000)
26 |
27 | # Simulate enhanced prompt with conversation history (what server.py creates)
28 | enhanced_prompt = f"{conversation_history}\n\n=== NEW USER INPUT ===\n{short_user_prompt}"
29 |
30 | # Simulate server.py behavior: store original prompt in _current_arguments
31 | tool._current_arguments = {
32 | "prompt": enhanced_prompt, # Enhanced with history
33 | "_original_user_prompt": short_user_prompt, # Original user input (our fix)
34 | "model": "local-llama",
35 | }
36 |
37 | # Test the hook method directly
38 | validation_content = tool.get_prompt_content_for_size_validation(enhanced_prompt)
39 |
40 | # Should return the original short prompt, not the enhanced prompt
41 | assert validation_content == short_user_prompt
42 | assert len(validation_content) == len(short_user_prompt)
43 | assert len(validation_content) < 1000 # Much smaller than enhanced prompt
44 |
45 | # Verify the enhanced prompt would have triggered the bug
46 | assert len(enhanced_prompt) > 50000 # This would trigger size limit
47 |
48 | # Test that size check passes with the original prompt
49 | size_check = tool.check_prompt_size(validation_content)
50 | assert size_check is None # No size limit error
51 |
52 | # Test that size check would fail with enhanced prompt
53 | size_check_enhanced = tool.check_prompt_size(enhanced_prompt)
54 | assert size_check_enhanced is not None # Would trigger size limit
55 | assert size_check_enhanced["status"] == "resend_prompt"
56 |
57 | def test_prompt_size_validation_without_original_prompt(self):
58 | """Test fallback behavior when no original prompt is stored (new conversations)"""
59 |
60 | tool = ChatTool()
61 |
62 | user_content = "Regular prompt without conversation history"
63 |
64 | # No _current_arguments (new conversation scenario)
65 | tool._current_arguments = None
66 |
67 | # Should fall back to validating the full user content
68 | validation_content = tool.get_prompt_content_for_size_validation(user_content)
69 | assert validation_content == user_content
70 |
71 | def test_prompt_size_validation_with_missing_original_prompt(self):
72 | """Test fallback when _current_arguments exists but no _original_user_prompt"""
73 |
74 | tool = ChatTool()
75 |
76 | user_content = "Regular prompt without conversation history"
77 |
78 | # _current_arguments exists but no _original_user_prompt field
79 | tool._current_arguments = {
80 | "prompt": user_content,
81 | "model": "local-llama",
82 | # No _original_user_prompt field
83 | }
84 |
85 | # Should fall back to validating the full user content
86 | validation_content = tool.get_prompt_content_for_size_validation(user_content)
87 | assert validation_content == user_content
88 |
89 | def test_base_tool_default_behavior(self):
90 | """Test that BaseTool's default implementation validates full content"""
91 |
92 | from tools.shared.base_tool import BaseTool
93 |
94 | # Create a minimal tool implementation for testing
95 | class TestTool(BaseTool):
96 | def get_name(self) -> str:
97 | return "test"
98 |
99 | def get_description(self) -> str:
100 | return "Test tool"
101 |
102 | def get_input_schema(self) -> dict:
103 | return {}
104 |
105 | def get_request_model(self):
106 | return ToolRequest
107 |
108 | def get_system_prompt(self) -> str:
109 | return "Test system prompt"
110 |
111 | async def prepare_prompt(self, request) -> str:
112 | return "Test prompt"
113 |
114 | async def execute(self, arguments: dict) -> list:
115 | return []
116 |
117 | tool = TestTool()
118 | user_content = "Test content"
119 |
120 | # Default implementation should return the same content
121 | validation_content = tool.get_prompt_content_for_size_validation(user_content)
122 | assert validation_content == user_content
123 |
```
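
As a reference for the behaviour these tests assert, here is a minimal sketch of the hook pattern, assuming the server stashes the raw user input under `_original_user_prompt` in `_current_arguments`. The real implementation lives in the tools package and may differ in detail.
```python
class SizeValidationHookSketch:
    """Illustrative stand-in for the size-validation hook the tests above exercise."""

    def __init__(self):
        # Populated by the server with the incoming tool arguments.
        self._current_arguments = None

    def get_prompt_content_for_size_validation(self, user_content: str) -> str:
        """Validate the original user prompt when history has been embedded, else the full content."""
        args = self._current_arguments
        if args:
            original = args.get("_original_user_prompt")
            if original is not None:
                return original
        return user_content
```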
--------------------------------------------------------------------------------
/tests/test_azure_openai_provider.py:
--------------------------------------------------------------------------------
```python
1 | import sys
2 | import types
3 |
4 | import pytest
5 |
6 | if "openai" not in sys.modules: # pragma: no cover - test shim for optional dependency
7 | stub = types.ModuleType("openai")
8 | stub.AzureOpenAI = object # Replaced with a mock inside tests
9 | sys.modules["openai"] = stub
10 |
11 | from providers.azure_openai import AzureOpenAIProvider
12 | from providers.shared import ModelCapabilities, ProviderType
13 |
14 |
15 | class _DummyResponse:
16 | def __init__(self):
17 | self.choices = [
18 | types.SimpleNamespace(
19 | message=types.SimpleNamespace(content="hello"),
20 | finish_reason="stop",
21 | )
22 | ]
23 | self.model = "prod-gpt4o"
24 | self.id = "resp-123"
25 | self.created = 0
26 | self.usage = types.SimpleNamespace(
27 | prompt_tokens=5,
28 | completion_tokens=3,
29 | total_tokens=8,
30 | )
31 |
32 |
33 | @pytest.fixture
34 | def dummy_azure_client(monkeypatch):
35 | captured = {}
36 |
37 | class _DummyAzureClient:
38 | def __init__(self, **kwargs):
39 | captured["client_kwargs"] = kwargs
40 | self.chat = types.SimpleNamespace(completions=types.SimpleNamespace(create=self._create_completion))
41 | self.responses = types.SimpleNamespace(create=self._create_response)
42 |
43 | def _create_completion(self, **kwargs):
44 | captured["request_kwargs"] = kwargs
45 | return _DummyResponse()
46 |
47 | def _create_response(self, **kwargs):
48 | captured["responses_kwargs"] = kwargs
49 | return _DummyResponse()
50 |
51 | monkeypatch.delenv("AZURE_OPENAI_ALLOWED_MODELS", raising=False)
52 | monkeypatch.setattr("providers.azure_openai.AzureOpenAI", _DummyAzureClient)
53 | return captured
54 |
55 |
56 | def test_generate_content_uses_deployment_mapping(dummy_azure_client):
57 | provider = AzureOpenAIProvider(
58 | api_key="key",
59 | azure_endpoint="https://example.openai.azure.com/",
60 | deployments={"gpt-4o": "prod-gpt4o"},
61 | )
62 |
63 | result = provider.generate_content("hello", "gpt-4o")
64 |
65 | assert dummy_azure_client["request_kwargs"]["model"] == "prod-gpt4o"
66 | assert result.model_name == "gpt-4o"
67 | assert result.provider == ProviderType.AZURE
68 | assert provider.validate_model_name("prod-gpt4o")
69 |
70 |
71 | def test_generate_content_accepts_deployment_alias(dummy_azure_client):
72 | provider = AzureOpenAIProvider(
73 | api_key="key",
74 | azure_endpoint="https://example.openai.azure.com/",
75 | deployments={"gpt-4o-mini": "mini-deployment"},
76 | )
77 |
78 | # Calling with the deployment alias should still resolve properly.
79 | result = provider.generate_content("hi", "mini-deployment")
80 |
81 | assert dummy_azure_client["request_kwargs"]["model"] == "mini-deployment"
82 | assert result.model_name == "gpt-4o-mini"
83 |
84 |
85 | def test_client_initialization_uses_endpoint_and_version(dummy_azure_client):
86 | provider = AzureOpenAIProvider(
87 | api_key="key",
88 | azure_endpoint="https://example.openai.azure.com/",
89 | api_version="2024-03-15-preview",
90 | deployments={"gpt-4o": "prod"},
91 | )
92 |
93 | _ = provider.client
94 |
95 | assert dummy_azure_client["client_kwargs"]["azure_endpoint"] == "https://example.openai.azure.com"
96 | assert dummy_azure_client["client_kwargs"]["api_version"] == "2024-03-15-preview"
97 |
98 |
99 | def test_deployment_overrides_capabilities(dummy_azure_client):
100 | provider = AzureOpenAIProvider(
101 | api_key="key",
102 | azure_endpoint="https://example.openai.azure.com/",
103 | deployments={
104 | "gpt-4o": {
105 | "deployment": "prod-gpt4o",
106 | "friendly_name": "Azure GPT-4o EU",
107 | "intelligence_score": 19,
108 | "supports_temperature": False,
109 | "temperature_constraint": "fixed",
110 | }
111 | },
112 | )
113 |
114 | caps = provider.get_capabilities("gpt-4o")
115 | assert caps.friendly_name == "Azure GPT-4o EU"
116 | assert caps.intelligence_score == 19
117 | assert not caps.supports_temperature
118 |
119 |
120 | def test_registry_configuration_merges_capabilities(dummy_azure_client, monkeypatch):
121 | def fake_registry_entries(self):
122 | capability = ModelCapabilities(
123 | provider=ProviderType.AZURE,
124 | model_name="gpt-4o",
125 | friendly_name="Azure GPT-4o Registry",
126 | context_window=500_000,
127 | max_output_tokens=128_000,
128 | )
129 | return {"gpt-4o": {"deployment": "registry-deployment", "capability": capability}}
130 |
131 | monkeypatch.setattr(AzureOpenAIProvider, "_load_registry_entries", fake_registry_entries)
132 |
133 | provider = AzureOpenAIProvider(
134 | api_key="key",
135 | azure_endpoint="https://example.openai.azure.com/",
136 | )
137 |
138 | # Capability should come from registry
139 | caps = provider.get_capabilities("gpt-4o")
140 | assert caps.friendly_name == "Azure GPT-4o Registry"
141 | assert caps.context_window == 500_000
142 |
143 | # API call should use deployment defined in registry
144 | provider.generate_content("hello", "gpt-4o")
145 | assert dummy_azure_client["request_kwargs"]["model"] == "registry-deployment"
146 |
```
--------------------------------------------------------------------------------
/tests/test_openai_compatible_token_usage.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for OpenAI-compatible provider token usage extraction."""
2 |
3 | import unittest
4 | from unittest.mock import Mock
5 |
6 | from providers.openai_compatible import OpenAICompatibleProvider
7 |
8 |
9 | class TestOpenAICompatibleTokenUsage(unittest.TestCase):
10 | """Test OpenAI-compatible provider token usage handling."""
11 |
12 | def setUp(self):
13 | """Set up test fixtures."""
14 |
15 | # Create a concrete implementation for testing
16 | class TestProvider(OpenAICompatibleProvider):
17 | FRIENDLY_NAME = "Test"
18 | MODEL_CAPABILITIES = {"test-model": {"context_window": 4096}}
19 |
20 | def get_capabilities(self, model_name):
21 | return Mock()
22 |
23 | def get_provider_type(self):
24 | return Mock()
25 |
26 | def validate_model_name(self, model_name):
27 | return True
28 |
29 | def list_models(self, **kwargs):
30 | return ["test-model"]
31 |
32 | self.provider = TestProvider("test-key")
33 |
34 | def test_extract_usage_with_valid_tokens(self):
35 | """Test token extraction with valid token counts."""
36 | response = Mock()
37 | response.usage = Mock()
38 | response.usage.prompt_tokens = 100
39 | response.usage.completion_tokens = 50
40 | response.usage.total_tokens = 150
41 |
42 | usage = self.provider._extract_usage(response)
43 |
44 | self.assertEqual(usage["input_tokens"], 100)
45 | self.assertEqual(usage["output_tokens"], 50)
46 | self.assertEqual(usage["total_tokens"], 150)
47 |
48 | def test_extract_usage_with_none_prompt_tokens(self):
49 | """Test token extraction when prompt_tokens is None (regression test for bug)."""
50 | response = Mock()
51 | response.usage = Mock()
52 | response.usage.prompt_tokens = None # This was causing crashes
53 | response.usage.completion_tokens = 50
54 | response.usage.total_tokens = None
55 |
56 | usage = self.provider._extract_usage(response)
57 |
58 | # Should default to 0 when None
59 | self.assertEqual(usage["input_tokens"], 0)
60 | self.assertEqual(usage["output_tokens"], 50)
61 | self.assertEqual(usage["total_tokens"], 0)
62 |
63 | def test_extract_usage_with_none_completion_tokens(self):
64 | """Test token extraction when completion_tokens is None (regression test for bug)."""
65 | response = Mock()
66 | response.usage = Mock()
67 | response.usage.prompt_tokens = 100
68 | response.usage.completion_tokens = None # This was causing crashes
69 | response.usage.total_tokens = None
70 |
71 | usage = self.provider._extract_usage(response)
72 |
73 | self.assertEqual(usage["input_tokens"], 100)
74 | # Should default to 0 when None
75 | self.assertEqual(usage["output_tokens"], 0)
76 | self.assertEqual(usage["total_tokens"], 0)
77 |
78 | def test_extract_usage_with_all_none_tokens(self):
79 | """Test token extraction when all token counts are None."""
80 | response = Mock()
81 | response.usage = Mock()
82 | response.usage.prompt_tokens = None
83 | response.usage.completion_tokens = None
84 | response.usage.total_tokens = None
85 |
86 | usage = self.provider._extract_usage(response)
87 |
88 | # Should default to 0 for all when None
89 | self.assertEqual(usage["input_tokens"], 0)
90 | self.assertEqual(usage["output_tokens"], 0)
91 | self.assertEqual(usage["total_tokens"], 0)
92 |
93 | def test_extract_usage_without_usage(self):
94 | """Test token extraction when response has no usage."""
95 | response = Mock(spec=[]) # No usage attribute
96 |
97 | usage = self.provider._extract_usage(response)
98 |
99 | # Should return empty dict
100 | self.assertEqual(usage, {})
101 |
102 | def test_extract_usage_with_zero_tokens(self):
103 | """Test token extraction with zero token counts."""
104 | response = Mock()
105 | response.usage = Mock()
106 | response.usage.prompt_tokens = 0
107 | response.usage.completion_tokens = 0
108 | response.usage.total_tokens = 0
109 |
110 | usage = self.provider._extract_usage(response)
111 |
112 | self.assertEqual(usage["input_tokens"], 0)
113 | self.assertEqual(usage["output_tokens"], 0)
114 | self.assertEqual(usage["total_tokens"], 0)
115 |
116 | def test_alternative_token_format_with_none(self):
117 | """Test alternative token format (input_tokens/output_tokens) with None values."""
118 | # This tests the other code path in generate_content_openai_responses
119 | # Simulate a response with input_tokens/output_tokens attributes that could be None
120 | response = Mock()
121 | response.input_tokens = None # This was causing crashes
122 | response.output_tokens = 50
123 |
124 | # Test the pattern: getattr(response, "input_tokens", 0) or 0
125 | input_tokens = getattr(response, "input_tokens", 0) or 0
126 | output_tokens = getattr(response, "output_tokens", 0) or 0
127 |
128 | # Should not crash and should handle None gracefully
129 | self.assertEqual(input_tokens, 0)
130 | self.assertEqual(output_tokens, 50)
131 |
132 | # Test that addition works
133 | total = input_tokens + output_tokens
134 | self.assertEqual(total, 50)
135 |
136 |
137 | if __name__ == "__main__":
138 | unittest.main()
139 |
```
--------------------------------------------------------------------------------
/tests/test_o3_pro_output_text_fix.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Tests for o3-pro output_text parsing fix using HTTP transport recording.
3 |
4 | This test validates the fix that uses `response.output_text` convenience field
5 | instead of manually parsing `response.output.content[].text`.
6 |
7 | Uses HTTP transport recorder to record real o3-pro API responses at the HTTP level while allowing
8 | the OpenAI SDK to create real response objects that we can test.
9 |
10 | RECORDING: To record new responses, delete the cassette file and run with real API keys.
11 | """
12 |
13 | import logging
14 | import os
15 | import tempfile
16 | from pathlib import Path
17 | from unittest.mock import patch
18 |
19 | import pytest
20 | from dotenv import load_dotenv
21 |
22 | from providers import ModelProviderRegistry
23 | from tests.transport_helpers import inject_transport
24 | from tools.chat import ChatTool
25 |
26 | logger = logging.getLogger(__name__)
27 |
28 | # Load environment variables from .env file
29 | load_dotenv()
30 |
31 | # Use absolute path for cassette directory
32 | cassette_dir = Path(__file__).parent / "openai_cassettes"
33 | cassette_dir.mkdir(exist_ok=True)
34 |
35 |
36 | @pytest.mark.asyncio
37 | class TestO3ProOutputTextFix:
38 | """Test o3-pro response parsing fix using respx for HTTP recording/replay."""
39 |
40 | def setup_method(self):
41 | """Set up the test by ensuring clean registry state."""
42 | # Use the new public API for registry cleanup
43 | ModelProviderRegistry.reset_for_testing()
44 | # Provider registration is now handled by inject_transport helper
45 |
46 | # Clear restriction service to ensure it re-reads environment
47 | # This is necessary because previous tests may have set restrictions
48 | # that are cached in the singleton
49 | import utils.model_restrictions
50 |
51 | utils.model_restrictions._restriction_service = None
52 |
53 | def teardown_method(self):
54 | """Clean up after test to ensure no state pollution."""
55 | # Use the new public API for registry cleanup
56 | ModelProviderRegistry.reset_for_testing()
57 |
58 | @pytest.mark.no_mock_provider # Disable provider mocking for this test
59 | @patch.dict(os.environ, {"OPENAI_ALLOWED_MODELS": "o3-pro", "LOCALE": ""})
60 | async def test_o3_pro_uses_output_text_field(self, monkeypatch):
61 | """Test that o3-pro parsing uses the output_text convenience field via ChatTool."""
62 | cassette_path = cassette_dir / "o3_pro_basic_math.json"
63 |
64 | # Check if we need to record or replay
65 | if not cassette_path.exists():
66 | # Recording mode - check for real API key
67 | real_api_key = os.getenv("OPENAI_API_KEY", "").strip()
68 | if not real_api_key or real_api_key.startswith("dummy"):
69 | pytest.fail(
70 | f"Cassette file not found at {cassette_path}. "
71 | "To record: Set OPENAI_API_KEY environment variable to a valid key and run this test. "
72 | "Note: Recording will make a real API call to OpenAI."
73 | )
74 | # Real API key is available, we'll record the cassette
75 | logger.debug("🎬 Recording mode: Using real API key to record cassette")
76 | else:
77 | # Replay mode - use dummy key
78 | monkeypatch.setenv("OPENAI_API_KEY", "dummy-key-for-replay")
79 | logger.debug("📼 Replay mode: Using recorded cassette")
80 |
81 | # Simplified transport injection - just one line!
82 | inject_transport(monkeypatch, cassette_path)
83 |
84 | # Execute ChatTool test with custom transport
85 | result = await self._execute_chat_tool_test()
86 |
87 | # Verify the response works correctly
88 | self._verify_chat_tool_response(result)
89 |
90 | # Verify cassette exists
91 | assert cassette_path.exists()
92 |
93 | async def _execute_chat_tool_test(self):
94 | """Execute the ChatTool with o3-pro and return the result."""
95 | chat_tool = ChatTool()
96 | with tempfile.TemporaryDirectory() as workdir:
97 | arguments = {
98 | "prompt": "What is 2 + 2?",
99 | "model": "o3-pro",
100 | "temperature": 1.0,
101 | "working_directory_absolute_path": workdir,
102 | }
103 |
104 | return await chat_tool.execute(arguments)
105 |
106 | def _verify_chat_tool_response(self, result):
107 | """Verify the ChatTool response contains expected data."""
108 | # Basic response validation
109 | assert result is not None
110 | assert isinstance(result, list)
111 | assert len(result) > 0
112 | assert result[0].type == "text"
113 |
114 | # Parse JSON response
115 | import json
116 |
117 | response_data = json.loads(result[0].text)
118 |
119 | # Debug log the response
120 | logger.debug(f"Response data: {json.dumps(response_data, indent=2)}")
121 |
122 | # Verify response structure - no cargo culting
123 | if response_data["status"] == "error":
124 | pytest.fail(f"Chat tool returned error: {response_data.get('error', 'Unknown error')}")
125 | assert response_data["status"] in ["success", "continuation_available"]
126 | assert "4" in response_data["content"]
127 |
128 | # Verify o3-pro was actually used
129 | metadata = response_data["metadata"]
130 | assert metadata["model_used"] == "o3-pro"
131 | assert metadata["provider_used"] == "openai"
132 |
```
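
For context on the fix the docstring describes, the sketch below contrasts the two parsing approaches: prefer the SDK's `output_text` convenience field and only otherwise walk the nested output structure. The attribute names in the fallback branch are illustrative of that nested shape, not a guaranteed schema, and this is not the provider's actual code.
```python
def extract_o3_pro_text(response) -> str:
    """Prefer the convenience field; fall back to manually joining nested text parts."""
    text = getattr(response, "output_text", None)
    if text:
        return text

    # Fallback: walk response.output -> item.content -> part.text and join what is found.
    parts = []
    for item in getattr(response, "output", None) or []:
        for part in getattr(item, "content", None) or []:
            part_text = getattr(part, "text", None)
            if part_text:
                parts.append(part_text)
    return "".join(parts)
```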
--------------------------------------------------------------------------------
/tools/shared/schema_builders.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Core schema building functionality for Zen MCP tools.
3 |
4 | This module provides base schema generation functionality for simple tools.
5 | Workflow-specific schema building is located in workflow/schema_builders.py
6 | to maintain proper separation of concerns.
7 | """
8 |
9 | from typing import Any
10 |
11 | from .base_models import COMMON_FIELD_DESCRIPTIONS
12 |
13 |
14 | class SchemaBuilder:
15 | """
16 | Base schema builder for simple MCP tools.
17 |
18 | This class provides static methods to build consistent schemas for simple tools.
19 | Workflow tools use WorkflowSchemaBuilder in workflow/schema_builders.py.
20 | """
21 |
22 | # Common field schemas that can be reused across all tool types
23 | COMMON_FIELD_SCHEMAS = {
24 | "temperature": {
25 | "type": "number",
26 | "description": COMMON_FIELD_DESCRIPTIONS["temperature"],
27 | "minimum": 0.0,
28 | "maximum": 1.0,
29 | },
30 | "thinking_mode": {
31 | "type": "string",
32 | "enum": ["minimal", "low", "medium", "high", "max"],
33 | "description": COMMON_FIELD_DESCRIPTIONS["thinking_mode"],
34 | },
35 | "continuation_id": {
36 | "type": "string",
37 | "description": COMMON_FIELD_DESCRIPTIONS["continuation_id"],
38 | },
39 | "images": {
40 | "type": "array",
41 | "items": {"type": "string"},
42 | "description": COMMON_FIELD_DESCRIPTIONS["images"],
43 | },
44 | }
45 |
46 | # Simple tool-specific field schemas (workflow tools use relevant_files instead)
47 | SIMPLE_FIELD_SCHEMAS = {
48 | "absolute_file_paths": {
49 | "type": "array",
50 | "items": {"type": "string"},
51 | "description": COMMON_FIELD_DESCRIPTIONS["absolute_file_paths"],
52 | },
53 | }
54 |
55 | @staticmethod
56 | def build_schema(
57 | tool_specific_fields: dict[str, dict[str, Any]] = None,
58 | required_fields: list[str] = None,
59 | model_field_schema: dict[str, Any] = None,
60 | auto_mode: bool = False,
61 | require_model: bool = False,
62 | ) -> dict[str, Any]:
63 | """
64 | Build complete schema for simple tools.
65 |
66 | Args:
67 | tool_specific_fields: Additional fields specific to the tool
68 | required_fields: List of required field names
69 | model_field_schema: Schema for the model field
70 | auto_mode: Whether the tool is in auto mode (affects model requirement)
71 |
72 | Returns:
73 | Complete JSON schema for the tool
74 | """
75 | properties = {}
76 |
77 | # Add common fields (temperature, thinking_mode, etc.)
78 | properties.update(SchemaBuilder.COMMON_FIELD_SCHEMAS)
79 |
80 | # Add simple tool-specific fields (files field for simple tools)
81 | properties.update(SchemaBuilder.SIMPLE_FIELD_SCHEMAS)
82 |
83 | # Add model field if provided
84 | if model_field_schema:
85 | properties["model"] = model_field_schema
86 |
87 | # Add tool-specific fields if provided
88 | if tool_specific_fields:
89 | properties.update(tool_specific_fields)
90 |
91 | # Build required fields list
92 | required = list(required_fields) if required_fields else []
93 | if (auto_mode or require_model) and "model" not in required:
94 | required.append("model")
95 |
96 | # Build the complete schema
97 | schema = {
98 | "$schema": "http://json-schema.org/draft-07/schema#",
99 | "type": "object",
100 | "properties": properties,
101 | "additionalProperties": False,
102 | }
103 |
104 | if required:
105 | schema["required"] = required
106 |
107 | return schema
108 |
109 | @staticmethod
110 | def get_common_fields() -> dict[str, dict[str, Any]]:
111 | """Get the standard field schemas for simple tools."""
112 | return SchemaBuilder.COMMON_FIELD_SCHEMAS.copy()
113 |
114 | @staticmethod
115 | def create_field_schema(
116 | field_type: str,
117 | description: str,
118 | enum_values: list[str] = None,
119 | minimum: float = None,
120 | maximum: float = None,
121 | items_type: str = None,
122 | default: Any = None,
123 | ) -> dict[str, Any]:
124 | """
125 | Helper method to create field schemas with common patterns.
126 |
127 | Args:
128 | field_type: JSON schema type ("string", "number", "array", etc.)
129 | description: Human-readable description of the field
130 | enum_values: For enum fields, list of allowed values
131 | minimum: For numeric fields, minimum value
132 | maximum: For numeric fields, maximum value
133 | items_type: For array fields, type of array items
134 | default: Default value for the field
135 |
136 | Returns:
137 | JSON schema object for the field
138 | """
139 | schema = {
140 | "type": field_type,
141 | "description": description,
142 | }
143 |
144 | if enum_values:
145 | schema["enum"] = enum_values
146 |
147 | if minimum is not None:
148 | schema["minimum"] = minimum
149 |
150 | if maximum is not None:
151 | schema["maximum"] = maximum
152 |
153 | if items_type and field_type == "array":
154 | schema["items"] = {"type": items_type}
155 |
156 | if default is not None:
157 | schema["default"] = default
158 |
159 | return schema
160 |
```
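
A short usage sketch of the builder above. The call shape follows the static methods shown in this file; the `prompt` field itself is a made-up example rather than one of the project's real tool fields.
```python
from tools.shared.schema_builders import SchemaBuilder

# Build a schema for a hypothetical simple tool with a single required "prompt" field.
schema = SchemaBuilder.build_schema(
    tool_specific_fields={
        "prompt": SchemaBuilder.create_field_schema("string", "The question to ask"),
    },
    required_fields=["prompt"],
)

# The result is a draft-07 object schema containing the common fields
# (temperature, thinking_mode, continuation_id, images, absolute_file_paths)
# plus "prompt", with "prompt" listed as required.
```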
--------------------------------------------------------------------------------
/docs/testing.md:
--------------------------------------------------------------------------------
```markdown
1 | # Testing Guide
2 |
3 | This project includes comprehensive test coverage through unit tests and integration simulator tests.
4 |
5 | ## Running Tests
6 |
7 | ### Prerequisites
8 | - Environment set up: `./run-server.sh`
9 | - Use `./run-server.sh -f` to automatically follow logs after starting
10 |
11 | ### Unit Tests
12 |
13 | Run all unit tests with pytest:
14 | ```bash
15 | # Run all tests with verbose output
16 | python -m pytest -xvs
17 |
18 | # Run specific test file
19 | python -m pytest tests/test_providers.py -xvs
20 | ```
21 |
22 | ### Simulator Tests
23 |
24 | Simulator tests replicate real-world Claude CLI interactions with the standalone MCP server. Unlike unit tests that test isolated functions, simulator tests validate the complete end-to-end flow including:
25 | - Actual MCP protocol communication
26 | - Standalone server interactions
27 | - Multi-turn conversations across tools
28 | - Log output validation
29 |
30 | **Important**: Simulator tests require `LOG_LEVEL=DEBUG` in your `.env` file to validate detailed execution logs.
31 |
32 | #### Monitoring Logs During Tests
33 |
34 | **Important**: The MCP stdio protocol interferes with stderr output during tool execution. Tool execution logs are written to local log files. This is a known limitation of the stdio-based MCP protocol.
35 |
36 | To monitor logs during test execution:
37 |
38 | ```bash
39 | # Start server and automatically follow logs
40 | ./run-server.sh -f
41 |
42 | # Or manually monitor main server logs (includes all tool execution details)
43 | tail -f -n 500 logs/mcp_server.log
44 |
45 | # Monitor MCP activity logs (tool calls and completions)
46 | tail -f logs/mcp_activity.log
47 |
48 | # Check log file sizes (logs rotate at 20MB)
49 | ls -lh logs/mcp_*.log*
50 | ```
51 |
52 | **Log Rotation**: All log files are configured with automatic rotation at 20MB to prevent disk space issues. The server keeps:
53 | - 10 rotated files for mcp_server.log (200MB total)
54 | - 5 rotated files for mcp_activity.log (100MB total)
55 |
56 | **Why logs appear in files**: The MCP stdio_server captures stderr during tool execution to prevent interference with the JSON-RPC protocol communication. This means tool execution logs are written to files rather than displayed in console output.
57 |
58 | #### Running All Simulator Tests
59 | ```bash
60 | # Run all simulator tests
61 | python communication_simulator_test.py
62 |
63 | # Run with verbose output for debugging
64 | python communication_simulator_test.py --verbose
65 |
66 | # Keep server logs after tests for inspection
67 | python communication_simulator_test.py --keep-logs
68 | ```
69 |
70 | #### Running Individual Tests
71 | To run a single simulator test in isolation (useful for debugging or test development):
72 |
73 | ```bash
74 | # Run a specific test by name
75 | python communication_simulator_test.py --individual basic_conversation
76 |
77 | # Examples of available tests:
78 | python communication_simulator_test.py --individual content_validation
79 | python communication_simulator_test.py --individual cross_tool_continuation
80 | python communication_simulator_test.py --individual memory_validation
81 | ```
82 |
83 | #### Other Options
84 | ```bash
85 | # List all available simulator tests with descriptions
86 | python communication_simulator_test.py --list-tests
87 |
88 | # Run multiple specific tests (not all)
89 | python communication_simulator_test.py --tests basic_conversation content_validation
90 |
91 | ```
92 |
93 | ### Code Quality Checks
94 |
95 | Before committing, ensure all linting passes:
96 | ```bash
97 | # Run all linting checks
98 | ruff check .
99 | black --check .
100 | isort --check-only .
101 |
102 | # Auto-fix issues
103 | ruff check . --fix
104 | black .
105 | isort .
106 | ```
107 |
108 | ## What Each Test Suite Covers
109 |
110 | ### Unit Tests
111 | Test isolated components and functions:
112 | - **Provider functionality**: Model initialization, API interactions, capability checks
113 | - **Tool operations**: All MCP tools (chat, analyze, debug, etc.)
114 | - **Conversation memory**: Threading, continuation, history management
115 | - **File handling**: Path validation, token limits, deduplication
116 | - **Auto mode**: Model selection logic and fallback behavior
117 |
118 | ### HTTP Recording/Replay Tests (HTTP Transport Recorder)
119 | Tests for expensive API calls (like o3-pro) use custom recording/replay:
120 | - **Real API validation**: Tests against actual provider responses
121 | - **Cost efficiency**: Record once, replay forever
122 | - **Provider compatibility**: Validates fixes against real APIs
123 | - Uses HTTP Transport Recorder for httpx-based API calls
124 | - See [HTTP Recording/Replay Testing Guide](./vcr-testing.md) for details
125 |
126 | ### Simulator Tests
127 | Validate real-world usage scenarios by simulating actual Claude prompts:
128 | - **Basic conversations**: Multi-turn chat functionality with real prompts
129 | - **Cross-tool continuation**: Context preservation across different tools
130 | - **File deduplication**: Efficient handling of repeated file references
131 | - **Model selection**: Proper routing to configured providers
132 | - **Token allocation**: Context window management in practice
133 | - **Redis validation**: Conversation persistence and retrieval
134 |
135 | ## Contributing
136 |
137 | For detailed contribution guidelines, testing requirements, and code quality standards, please see our [Contributing Guide](./contributions.md).
138 |
139 | ### Quick Testing Reference
140 |
141 | ```bash
142 | # Run quality checks
143 | ./code_quality_checks.sh
144 |
145 | # Run unit tests
146 | python -m pytest -xvs
147 |
148 | # Run simulator tests (for tool changes)
149 | python communication_simulator_test.py
150 | ```
151 |
152 | Remember: All tests must pass before submitting a PR. See the [Contributing Guide](./contributions.md) for complete requirements.
```
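
The rotation figures quoted above (20MB per file, 10 backups for `mcp_server.log`, 5 for `mcp_activity.log`) map directly onto Python's standard `RotatingFileHandler`. The sketch below is illustrative only and assumes that handler is what the server uses; the real logging setup lives in `server.py` and is not reproduced here.

```python
import logging
import os
from logging.handlers import RotatingFileHandler


def build_rotating_logger(name: str, path: str, backups: int) -> logging.Logger:
    """Create a file logger that rotates at 20MB, mirroring the limits described above."""
    os.makedirs(os.path.dirname(path), exist_ok=True)
    handler = RotatingFileHandler(path, maxBytes=20 * 1024 * 1024, backupCount=backups)
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s"))
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)  # simulator tests expect DEBUG-level logs
    logger.addHandler(handler)
    return logger


# 10 rotated files for the main server log, 5 for the activity log
server_log = build_rotating_logger("mcp_server", "logs/mcp_server.log", backups=10)
activity_log = build_rotating_logger("mcp_activity", "logs/mcp_activity.log", backups=5)
```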
--------------------------------------------------------------------------------
/clink/parsers/claude.py:
--------------------------------------------------------------------------------
```python
1 | """Parser for Claude CLI JSON output."""
2 |
3 | from __future__ import annotations
4 |
5 | import json
6 | from typing import Any
7 |
8 | from .base import BaseParser, ParsedCLIResponse, ParserError
9 |
10 |
11 | class ClaudeJSONParser(BaseParser):
12 | """Parse stdout produced by `claude --output-format json`."""
13 |
14 | name = "claude_json"
15 |
16 | def parse(self, stdout: str, stderr: str) -> ParsedCLIResponse:
17 | if not stdout.strip():
18 | raise ParserError("Claude CLI returned empty stdout while JSON output was expected")
19 |
20 | try:
21 | loaded = json.loads(stdout)
22 | except json.JSONDecodeError as exc: # pragma: no cover - defensive logging
23 | raise ParserError(f"Failed to decode Claude CLI JSON output: {exc}") from exc
24 |
25 | events: list[dict[str, Any]] | None = None
26 | assistant_entry: dict[str, Any] | None = None
27 |
28 | if isinstance(loaded, dict):
29 | payload: dict[str, Any] = loaded
30 | elif isinstance(loaded, list):
31 | events = [item for item in loaded if isinstance(item, dict)]
32 | result_entry = next(
33 | (item for item in events if item.get("type") == "result" or "result" in item),
34 | None,
35 | )
36 | assistant_entry = next(
37 | (item for item in reversed(events) if item.get("type") == "assistant"),
38 | None,
39 | )
40 | payload = result_entry or assistant_entry or (events[-1] if events else {})
41 | if not payload:
42 | raise ParserError("Claude CLI JSON array did not contain any parsable objects")
43 | else:
44 | raise ParserError("Claude CLI returned unexpected JSON payload")
45 |
46 | metadata = self._build_metadata(payload, stderr)
47 | if events is not None:
48 | metadata["raw_events"] = events
49 | metadata["raw"] = loaded
50 |
51 | result = payload.get("result")
52 | content: str = ""
53 | if isinstance(result, str):
54 | content = result.strip()
55 | elif isinstance(result, list):
56 | # Some CLI flows may emit a list of strings; join them conservatively.
57 | joined = [part.strip() for part in result if isinstance(part, str) and part.strip()]
58 | content = "\n".join(joined)
59 |
60 | if content:
61 | return ParsedCLIResponse(content=content, metadata=metadata)
62 |
63 | message = self._extract_message(payload)
64 | if message is None and assistant_entry and assistant_entry is not payload:
65 | message = self._extract_message(assistant_entry)
66 | if message:
67 | return ParsedCLIResponse(content=message, metadata=metadata)
68 |
69 | stderr_text = stderr.strip()
70 | if stderr_text:
71 | metadata.setdefault("stderr", stderr_text)
72 | return ParsedCLIResponse(
73 | content="Claude CLI returned no textual result. Raw stderr was preserved for troubleshooting.",
74 | metadata=metadata,
75 | )
76 |
77 | raise ParserError("Claude CLI response did not contain a textual result")
78 |
79 | def _build_metadata(self, payload: dict[str, Any], stderr: str) -> dict[str, Any]:
80 | metadata: dict[str, Any] = {
81 | "raw": payload,
82 | "is_error": bool(payload.get("is_error")),
83 | }
84 |
85 | type_field = payload.get("type")
86 | if isinstance(type_field, str):
87 | metadata["type"] = type_field
88 | subtype_field = payload.get("subtype")
89 | if isinstance(subtype_field, str):
90 | metadata["subtype"] = subtype_field
91 |
92 | duration_ms = payload.get("duration_ms")
93 | if isinstance(duration_ms, (int, float)):
94 | metadata["duration_ms"] = duration_ms
95 | api_duration = payload.get("duration_api_ms")
96 | if isinstance(api_duration, (int, float)):
97 | metadata["duration_api_ms"] = api_duration
98 |
99 | usage = payload.get("usage")
100 | if isinstance(usage, dict):
101 | metadata["usage"] = usage
102 |
103 | model_usage = payload.get("modelUsage")
104 | if isinstance(model_usage, dict) and model_usage:
105 | metadata["model_usage"] = model_usage
106 | first_model = next(iter(model_usage.keys()))
107 | metadata["model_used"] = first_model
108 |
109 | permission_denials = payload.get("permission_denials")
110 | if isinstance(permission_denials, list) and permission_denials:
111 | metadata["permission_denials"] = permission_denials
112 |
113 | session_id = payload.get("session_id")
114 | if isinstance(session_id, str) and session_id:
115 | metadata["session_id"] = session_id
116 | uuid_field = payload.get("uuid")
117 | if isinstance(uuid_field, str) and uuid_field:
118 | metadata["uuid"] = uuid_field
119 |
120 | stderr_text = stderr.strip()
121 | if stderr_text:
122 | metadata.setdefault("stderr", stderr_text)
123 |
124 | return metadata
125 |
126 | def _extract_message(self, payload: dict[str, Any]) -> str | None:
127 | message = payload.get("message")
128 | if isinstance(message, str) and message.strip():
129 | return message.strip()
130 |
131 | error_field = payload.get("error")
132 | if isinstance(error_field, dict):
133 | error_message = error_field.get("message")
134 | if isinstance(error_message, str) and error_message.strip():
135 | return error_message.strip()
136 |
137 | return None
138 |
```
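
A quick sketch of feeding this parser a hand-written payload. The JSON below is not real Claude CLI output; it is a hypothetical object built only from the fields `parse()` and `_build_metadata()` read (`type`, `result`, `is_error`, `duration_ms`, `usage`, `session_id`).

```python
import json

from clink.parsers.claude import ClaudeJSONParser

# Hypothetical payload shaped like the fields the parser above looks for.
stdout = json.dumps(
    {
        "type": "result",
        "subtype": "success",
        "is_error": False,
        "result": "The refactor looks safe to merge.",
        "duration_ms": 1234,
        "usage": {"input_tokens": 42, "output_tokens": 17},
        "session_id": "session-123",
    }
)

parsed = ClaudeJSONParser().parse(stdout, stderr="")
print(parsed.content)            # "The refactor looks safe to merge."
print(parsed.metadata["usage"])  # {"input_tokens": 42, "output_tokens": 17}
```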
--------------------------------------------------------------------------------
/docs/locale-configuration.md:
--------------------------------------------------------------------------------
```markdown
1 | # Locale Configuration for Zen MCP Server
2 |
3 | This guide explains how to configure and use the localization feature to customize the language of responses from MCP tools.
4 |
5 | ## Overview
6 |
7 | The localization feature allows you to specify the language in which MCP tools should respond, while maintaining their analytical capabilities. This is especially useful for non-English speakers who want to receive answers in their native language.
8 |
9 | ## Configuration
10 |
11 | ### 1. Environment Variable
12 |
13 | Set the language using the `LOCALE` environment variable in your `.env` file:
14 |
15 | ```bash
16 | # In your .env file
17 | LOCALE=fr-FR
18 | ```
19 |
20 | ### 2. Supported Languages
21 |
22 | You can use any standard language code. Examples:
23 |
24 | - `fr-FR` - French (France)
25 | - `en-US` - English (United States)
26 | - `zh-CN` - Chinese (Simplified)
27 | - `zh-TW` - Chinese (Traditional)
28 | - `ja-JP` - Japanese
29 | - `ko-KR` - Korean
30 | - `es-ES` - Spanish (Spain)
31 | - `de-DE` - German (Germany)
32 | - `it-IT` - Italian (Italy)
33 | - `pt-PT` - Portuguese (Portugal)
34 | - `ru-RU` - Russian (Russia)
35 | - `ar-SA` - Arabic (Saudi Arabia)
36 |
37 | ### 3. Default Behavior
38 |
39 | If no language is specified (`LOCALE` is empty or unset), tools will default to English.
40 |
41 | ## Technical Implementation
42 |
43 | ### Architecture
44 |
45 | Localization is implemented in the `BaseTool` class in `tools/shared/base_tool.py`. All tools inherit this feature automatically.
46 |
47 | ### `get_language_instruction()` Method
48 |
49 | ```python
50 | def get_language_instruction(self) -> str:
51 | """
52 | Generate language instruction based on LOCALE configuration.
53 | Returns:
54 | str: Language instruction to prepend to prompt, or empty string if no locale set
55 | """
56 | import os
57 |
58 | locale = os.getenv("LOCALE", "").strip()
59 |
60 | if not locale:
61 | return ""
62 |
63 | return f"Always respond in {locale}.\n\n"
64 | ```
65 |
66 | ### Integration in Tool Execution
67 |
68 | The language instruction is automatically prepended to the system prompt of each tool:
69 |
70 | ```python
71 | # In tools/simple/base.py
72 | base_system_prompt = self.get_system_prompt()
73 | language_instruction = self.get_language_instruction()
74 | system_prompt = language_instruction + base_system_prompt
75 | ```
76 |
77 | ## Usage
78 |
79 | ### 1. Basic Setup
80 |
81 | 1. Edit your `.env` file:
82 | ```bash
83 | LOCALE=fr-FR
84 | ```
85 | 2. Restart the MCP server:
86 | ```bash
87 | ./run-server.sh
88 | ```
89 | 3. Use any tool – responses will be in the specified language.
90 |
91 | ### 2. Example
92 |
93 | **Before (default English):**
94 | ```
95 | Tool: chat
96 | Input: "Explain how to use Python dictionaries"
97 | Output: "Python dictionaries are key-value pairs that allow you to store and organize data..."
98 | ```
99 |
100 | **After (with LOCALE=fr-FR):**
101 | ```
102 | Tool: chat
103 | Input: "Explain how to use Python dictionaries"
104 | Output: "Les dictionnaires Python sont des paires clé-valeur qui permettent de stocker et d'organiser des données..."
105 | ```
106 |
107 | ### 3. Affected Tools
108 |
109 | All MCP tools are affected by this configuration:
110 |
111 | - `chat` – General conversation
112 | - `codereview` – Code review
113 | - `analyze` – Code analysis
114 | - `debug` – Debugging
115 | - `refactor` – Refactoring
116 | - `thinkdeep` – Deep thinking
117 | - `consensus` – Model consensus
118 | - And all other tools...
119 |
120 | ## Best Practices
121 |
122 | ### 1. Language Choice
123 | - Use standard language codes (ISO 639-1 with ISO 3166-1 country codes)
124 | - Be specific with regional variants if needed (e.g., `zh-CN` vs `zh-TW`)
125 |
126 | ### 2. Consistency
127 | - Use the same language setting across your team for consistency
128 | - Document the chosen language in your team documentation
129 |
130 | ### 3. Testing
131 | - Test the configuration with different tools to ensure consistency
132 |
133 | ## Troubleshooting
134 |
135 | ### Issue: Language does not change
136 | **Solution:**
137 | 1. Check that the `LOCALE` variable is correctly set in `.env`
138 | 2. Fully restart the MCP server
139 | 3. Ensure there are no extra spaces in the value
140 |
141 | ### Issue: Partially translated responses
142 | **Explanation:**
143 | - AI models may sometimes mix languages
144 | - This depends on the multilingual capabilities of the model used
145 | - Technical terms may remain in English
146 |
147 | ### Issue: Configuration errors
148 | **Solution:**
149 | 1. Check the syntax of your `.env` file
150 | 2. Make sure there are no quotes around the value
151 |
152 | ## Advanced Customization
153 |
154 | ### Customizing the Language Instruction
155 |
156 | To customize the language instruction, modify the `get_language_instruction()` method in `tools/shared/base_tool.py`:
157 |
158 | ```python
159 | def get_language_instruction(self) -> str:
160 | import os
161 |
162 | locale = os.getenv("LOCALE", "").strip()
163 |
164 | if not locale:
165 | return ""
166 | # Custom instruction
167 | return f"Always respond in {locale} and use a professional tone.\n\n"
168 | ```
169 |
170 | ### Per-Tool Customization
171 |
172 | You can also override the method in specific tools for custom behavior:
173 |
174 | ```python
175 | class MyCustomTool(SimpleTool):
176 | def get_language_instruction(self) -> str:
177 | import os
178 |
179 | locale = os.getenv("LOCALE", "").strip()
180 |
181 | if locale == "fr-FR":
182 | return "Respond in French with precise technical vocabulary.\n\n"
183 | elif locale == "zh-CN":
184 | return "请用中文回答,使用专业术语。\n\n"
185 | else:
186 | return super().get_language_instruction()
187 | ```
188 |
189 | ## Integration with Other Features
190 |
191 | Localization works with all other MCP server features:
192 |
193 | - **Conversation threading** – Multilingual conversations are supported
194 | - **File processing** – File analysis is in the specified language
195 | - **Web search** – Search instructions remain functional
196 | - **Model selection** – Works with all supported models
197 |
```
--------------------------------------------------------------------------------
/conf/gemini_models.json:
--------------------------------------------------------------------------------
```json
1 | {
2 | "_README": {
3 | "description": "Model metadata for Google's Gemini API access.",
4 | "documentation": "https://github.com/BeehiveInnovations/zen-mcp-server/blob/main/docs/custom_models.md",
5 | "usage": "Models listed here are exposed directly through the Gemini provider. Aliases are case-insensitive.",
6 | "field_notes": "Matches providers/shared/model_capabilities.py.",
7 | "field_descriptions": {
8 | "model_name": "The model identifier (e.g., 'gemini-2.5-pro', 'gemini-2.0-flash')",
9 | "aliases": "Array of short names users can type instead of the full model name",
10 | "context_window": "Total number of tokens the model can process (input + output combined)",
11 | "max_output_tokens": "Maximum number of tokens the model can generate in a single response",
12 | "max_thinking_tokens": "Maximum reasoning/thinking tokens the model will allocate when extended thinking is requested",
13 | "supports_extended_thinking": "Whether the model supports extended reasoning tokens (currently none do via OpenRouter or custom APIs)",
14 | "supports_json_mode": "Whether the model can guarantee valid JSON output",
15 | "supports_function_calling": "Whether the model supports function/tool calling",
16 | "supports_images": "Whether the model can process images/visual input",
17 | "max_image_size_mb": "Maximum total size in MB for all images combined (capped at 40MB max for custom models)",
18 | "supports_temperature": "Whether the model accepts temperature parameter in API calls (set to false for O3/O4 reasoning models)",
19 | "temperature_constraint": "Type of temperature constraint: 'fixed' (fixed value), 'range' (continuous range), 'discrete' (specific values), or omit for default range",
20 | "use_openai_response_api": "Set to true when the model must use the /responses endpoint (reasoning models like GPT-5 Pro). Leave false/omit for standard chat completions.",
21 | "default_reasoning_effort": "Default reasoning effort level for models that support it (e.g., 'low', 'medium', 'high'). Omit if not applicable.",
22 | "description": "Human-readable description of the model",
23 | "intelligence_score": "1-20 human rating used as the primary signal for auto-mode model ordering",
24 | "allow_code_generation": "Whether this model can generate and suggest fully working code - complete with functions, files, and detailed implementation instructions - for your AI tool to use right away. Only set this to 'true' for a model more capable than the AI model / CLI you're currently using."
25 | }
26 | },
27 | "models": [
28 | {
29 | "model_name": "gemini-2.5-pro",
30 | "friendly_name": "Gemini (Pro 2.5)",
31 | "aliases": [
32 | "pro",
33 | "gemini pro",
34 | "gemini-pro"
35 | ],
36 | "intelligence_score": 18,
37 | "description": "Deep reasoning + thinking mode (1M context) - Complex problems, architecture, deep analysis",
38 | "context_window": 1048576,
39 | "max_output_tokens": 65536,
40 | "max_thinking_tokens": 32768,
41 | "supports_extended_thinking": true,
42 | "supports_system_prompts": true,
43 | "supports_streaming": true,
44 | "supports_function_calling": true,
45 | "supports_json_mode": true,
46 | "supports_images": true,
47 | "supports_temperature": true,
48 | "allow_code_generation": true,
49 | "max_image_size_mb": 32.0
50 | },
51 | {
52 | "model_name": "gemini-2.0-flash",
53 | "friendly_name": "Gemini (Flash 2.0)",
54 | "aliases": [
55 | "flash-2.0",
56 | "flash2"
57 | ],
58 | "intelligence_score": 9,
59 | "description": "Gemini 2.0 Flash (1M context) - Latest fast model with experimental thinking, supports audio/video input",
60 | "context_window": 1048576,
61 | "max_output_tokens": 65536,
62 | "max_thinking_tokens": 24576,
63 | "supports_extended_thinking": true,
64 | "supports_system_prompts": true,
65 | "supports_streaming": true,
66 | "supports_function_calling": true,
67 | "supports_json_mode": true,
68 | "supports_images": true,
69 | "supports_temperature": true,
70 | "max_image_size_mb": 20.0
71 | },
72 | {
73 | "model_name": "gemini-2.0-flash-lite",
74 | "friendly_name": "Gemini (Flash Lite 2.0)",
75 | "aliases": [
76 | "flashlite",
77 | "flash-lite"
78 | ],
79 | "intelligence_score": 7,
80 | "description": "Gemini 2.0 Flash Lite (1M context) - Lightweight fast model, text-only",
81 | "context_window": 1048576,
82 | "max_output_tokens": 65536,
83 | "supports_extended_thinking": false,
84 | "supports_system_prompts": true,
85 | "supports_streaming": true,
86 | "supports_function_calling": true,
87 | "supports_json_mode": true,
88 | "supports_images": false,
89 | "supports_temperature": true
90 | },
91 | {
92 | "model_name": "gemini-2.5-flash",
93 | "friendly_name": "Gemini (Flash 2.5)",
94 | "aliases": [
95 | "flash",
96 | "flash2.5"
97 | ],
98 | "intelligence_score": 10,
99 | "description": "Ultra-fast (1M context) - Quick analysis, simple queries, rapid iterations",
100 | "context_window": 1048576,
101 | "max_output_tokens": 65536,
102 | "max_thinking_tokens": 24576,
103 | "supports_extended_thinking": true,
104 | "supports_system_prompts": true,
105 | "supports_streaming": true,
106 | "supports_function_calling": true,
107 | "supports_json_mode": true,
108 | "supports_images": true,
109 | "supports_temperature": true,
110 | "max_image_size_mb": 20.0
111 | }
112 | ]
113 | }
114 |
```
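
To illustrate the schema documented in the `_README` block, here is a small standalone sketch that loads the file and prints the fields most relevant to model selection. It reads the JSON directly (run from the repository root) rather than going through the provider registries.

```python
import json
from pathlib import Path

# Assumes the working directory is the repository root.
config = json.loads(Path("conf/gemini_models.json").read_text())

for model in config["models"]:
    aliases = ", ".join(model.get("aliases", [])) or "-"
    thinking = "yes" if model.get("supports_extended_thinking") else "no"
    print(
        f"{model['model_name']}: aliases [{aliases}], "
        f"context {model['context_window']:,} tokens, extended thinking: {thinking}"
    )
```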
--------------------------------------------------------------------------------
/tests/test_uvx_resource_packaging.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for uvx path resolution functionality."""
2 |
3 | import json
4 | import tempfile
5 | from pathlib import Path
6 | from unittest.mock import patch
7 |
8 | from providers.registries.openrouter import OpenRouterModelRegistry
9 |
10 |
11 | class TestUvxPathResolution:
12 | """Test uvx path resolution for OpenRouter model registry."""
13 |
14 | def test_normal_operation(self):
15 | """Test that normal operation works in development environment."""
16 | registry = OpenRouterModelRegistry()
17 | assert len(registry.list_models()) > 0
18 | assert len(registry.list_aliases()) > 0
19 |
20 | def test_config_path_resolution(self):
21 | """Test that the config path resolution finds the config file in multiple locations."""
22 | # Check that the config file exists in the development location
23 | config_file = Path(__file__).parent.parent / "conf" / "openrouter_models.json"
24 | assert config_file.exists(), "Config file should exist in conf/openrouter_models.json"
25 |
26 | # Test that a registry can find and use the config
27 | registry = OpenRouterModelRegistry()
28 |
29 | # When using resources, config_path is None; when using file system, it should exist
30 | if registry.use_resources:
31 | assert registry.config_path is None, "When using resources, config_path should be None"
32 | else:
33 | assert registry.config_path.exists(), "When using file system, config path should exist"
34 |
35 | assert len(registry.list_models()) > 0, "Registry should load models from config"
36 |
37 | def test_explicit_config_path_override(self):
38 | """Test that explicit config path works correctly."""
39 | config_path = Path(__file__).parent.parent / "conf" / "openrouter_models.json"
40 |
41 | registry = OpenRouterModelRegistry(config_path=str(config_path))
42 |
43 | # Should use the provided file path
44 | assert registry.config_path == config_path
45 | assert len(registry.list_models()) > 0
46 |
47 | def test_environment_variable_override(self):
48 | """Test that CUSTOM_MODELS_CONFIG_PATH environment variable works."""
49 | config_path = Path(__file__).parent.parent / "conf" / "openrouter_models.json"
50 |
51 | with patch.dict("os.environ", {"OPENROUTER_MODELS_CONFIG_PATH": str(config_path)}):
52 | registry = OpenRouterModelRegistry()
53 |
54 | # Should use environment path
55 | assert registry.config_path == config_path
56 | assert len(registry.list_models()) > 0
57 |
58 | @patch("providers.registries.base.importlib.resources.files")
59 | def test_multiple_path_fallback(self, mock_files):
60 | """Test that file-system fallback works when resource loading fails."""
61 | mock_files.side_effect = Exception("Resource loading failed")
62 |
63 | with tempfile.TemporaryDirectory() as tmpdir:
64 | temp_dir = Path(tmpdir)
65 | conf_dir = temp_dir / "conf"
66 | conf_dir.mkdir(parents=True, exist_ok=True)
67 | config_path = conf_dir / "openrouter_models.json"
68 | config_path.write_text(
69 | json.dumps(
70 | {
71 | "models": [
72 | {
73 | "model_name": "test/model",
74 | "aliases": ["testalias"],
75 | "context_window": 1024,
76 | "max_output_tokens": 512,
77 | }
78 | ]
79 | },
80 | indent=2,
81 | )
82 | )
83 |
84 | original_exists = Path.exists
85 |
86 | def fake_exists(path_self):
87 | if str(path_self).endswith("conf/openrouter_models.json") and path_self != config_path:
88 | return False
89 | if path_self == config_path:
90 | return True
91 | return original_exists(path_self)
92 |
93 | with patch("pathlib.Path.cwd", return_value=temp_dir), patch("pathlib.Path.exists", fake_exists):
94 | registry = OpenRouterModelRegistry()
95 |
96 | assert not registry.use_resources
97 | assert registry.config_path == config_path
98 | assert "test/model" in registry.list_models()
99 |
100 | def test_missing_config_handling(self):
101 | """Test behavior when config file is missing."""
102 | # Use a non-existent path
103 | with patch.dict("os.environ", {}, clear=True):
104 | registry = OpenRouterModelRegistry(config_path="/nonexistent/path/config.json")
105 |
106 | # Should gracefully handle missing config
107 | assert len(registry.list_models()) == 0
108 | assert len(registry.list_aliases()) == 0
109 |
110 | def test_resource_loading_success(self):
111 | """Test successful resource loading via importlib.resources."""
112 | # Just test that the registry works normally in our environment
113 | # This validates the resource loading mechanism indirectly
114 | registry = OpenRouterModelRegistry()
115 |
116 | # Should load successfully using either resources or file system fallback
117 | assert len(registry.list_models()) > 0
118 | assert len(registry.list_aliases()) > 0
119 |
120 | def test_use_resources_attribute(self):
121 | """Test that the use_resources attribute is properly set."""
122 | registry = OpenRouterModelRegistry()
123 |
124 | # Should have the use_resources attribute
125 | assert hasattr(registry, "use_resources")
126 | assert isinstance(registry.use_resources, bool)
127 |
```
--------------------------------------------------------------------------------
/tests/test_listmodels.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for the ListModels tool"""
2 |
3 | import json
4 | import os
5 | from unittest.mock import patch
6 |
7 | import pytest
8 | from mcp.types import TextContent
9 |
10 | from tools.listmodels import ListModelsTool
11 |
12 |
13 | class TestListModelsTool:
14 | """Test the ListModels tool functionality"""
15 |
16 | @pytest.fixture
17 | def tool(self):
18 | """Create a ListModelsTool instance"""
19 | return ListModelsTool()
20 |
21 | def test_tool_metadata(self, tool):
22 | """Test tool has correct metadata"""
23 | assert tool.name == "listmodels"
24 | assert "model providers" in tool.description
25 | assert tool.get_request_model().__name__ == "ToolRequest"
26 |
27 | @pytest.mark.asyncio
28 | async def test_execute_with_no_providers(self, tool):
29 | """Test listing models with no providers configured"""
30 | with patch.dict(os.environ, {}, clear=True):
31 | # Set auto mode
32 | os.environ["DEFAULT_MODEL"] = "auto"
33 |
34 | result = await tool.execute({})
35 |
36 | assert len(result) == 1
37 | assert isinstance(result[0], TextContent)
38 |
39 | # Parse JSON response
40 | response = json.loads(result[0].text)
41 | assert response["status"] == "success"
42 |
43 | content = response["content"]
44 |
45 | # Check that providers show as not configured
46 | assert "Google Gemini ❌" in content
47 | assert "OpenAI ❌" in content
48 | assert "X.AI (Grok) ❌" in content
49 | assert "OpenRouter ❌" in content
50 | assert "Custom/Local API ❌" in content
51 |
52 | # Check summary shows 0 configured
53 | assert "**Configured Providers**: 0" in content
54 |
55 | @pytest.mark.asyncio
56 | async def test_execute_with_gemini_configured(self, tool):
57 | """Test listing models with Gemini configured"""
58 | env_vars = {"GEMINI_API_KEY": "test-key", "DEFAULT_MODEL": "auto"}
59 |
60 | with patch.dict(os.environ, env_vars, clear=True):
61 | result = await tool.execute({})
62 |
63 | response = json.loads(result[0].text)
64 | content = response["content"]
65 |
66 | # Check Gemini shows as configured
67 | assert "Google Gemini ✅" in content
68 | assert "`flash` → `gemini-2.5-flash`" in content
69 | assert "`pro` → `gemini-2.5-pro`" in content
70 | assert "1M context" in content
71 | assert "Supports structured code generation" in content
72 |
73 | # Check summary
74 | assert "**Configured Providers**: 1" in content
75 |
76 | @pytest.mark.asyncio
77 | async def test_execute_with_multiple_providers(self, tool):
78 | """Test listing models with multiple providers configured"""
79 | env_vars = {
80 | "GEMINI_API_KEY": "test-key",
81 | "OPENAI_API_KEY": "test-key",
82 | "XAI_API_KEY": "test-key",
83 | "DEFAULT_MODEL": "auto",
84 | }
85 |
86 | with patch.dict(os.environ, env_vars, clear=True):
87 | result = await tool.execute({})
88 |
89 | response = json.loads(result[0].text)
90 | content = response["content"]
91 |
92 | # Check all show as configured
93 | assert "Google Gemini ✅" in content
94 | assert "OpenAI ✅" in content
95 | assert "X.AI (Grok) ✅" in content
96 |
97 | # Check models are listed
98 | assert "`o3`" in content
99 | assert "`grok`" in content
100 |
101 | # Check summary
102 | assert "**Configured Providers**: 3" in content
103 |
104 | @pytest.mark.asyncio
105 | async def test_execute_with_openrouter(self, tool):
106 | """Test listing models with OpenRouter configured"""
107 | env_vars = {"OPENROUTER_API_KEY": "test-key", "DEFAULT_MODEL": "auto"}
108 |
109 | with patch.dict(os.environ, env_vars, clear=True):
110 | result = await tool.execute({})
111 |
112 | response = json.loads(result[0].text)
113 | content = response["content"]
114 |
115 | # Check OpenRouter shows as configured
116 | assert "OpenRouter ✅" in content
117 | assert "Access to multiple cloud AI providers" in content
118 |
119 | # Should show some models (mocked registry will have some)
120 | assert "Available Models" in content
121 |
122 | @pytest.mark.asyncio
123 | async def test_execute_with_custom_api(self, tool):
124 | """Test listing models with custom API configured"""
125 | env_vars = {"CUSTOM_API_URL": "http://localhost:11434", "DEFAULT_MODEL": "auto"}
126 |
127 | with patch.dict(os.environ, env_vars, clear=True):
128 | result = await tool.execute({})
129 |
130 | response = json.loads(result[0].text)
131 | content = response["content"]
132 |
133 | # Check Custom API shows as configured
134 | assert "Custom/Local API ✅" in content
135 | assert "http://localhost:11434" in content
136 | assert "Local models via Ollama" in content
137 |
138 | @pytest.mark.asyncio
139 | async def test_output_includes_usage_tips(self, tool):
140 | """Test that output includes helpful usage tips"""
141 | result = await tool.execute({})
142 |
143 | response = json.loads(result[0].text)
144 | content = response["content"]
145 |
146 | # Check for usage tips
147 | assert "**Usage Tips**:" in content
148 | assert "Use model aliases" in content
149 | assert "auto mode" in content
150 |
151 | def test_model_category(self, tool):
152 | """Test that tool uses FAST_RESPONSE category"""
153 | from tools.models import ToolModelCategory
154 |
155 | assert tool.get_model_category() == ToolModelCategory.FAST_RESPONSE
156 |
```
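
Outside pytest, the same tool can be exercised directly. A minimal sketch: it relies only on the behaviour the tests above assert (`execute({})` returns a list of `TextContent` items whose `.text` is a JSON envelope with `status` and `content`); which providers appear depends on the API keys set in your environment.

```python
import asyncio
import json

from tools.listmodels import ListModelsTool


async def main() -> None:
    results = await ListModelsTool().execute({})
    payload = json.loads(results[0].text)
    print(payload["status"])
    print(payload["content"])


asyncio.run(main())
```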
--------------------------------------------------------------------------------
/providers/openai.py:
--------------------------------------------------------------------------------
```python
1 | """OpenAI model provider implementation."""
2 |
3 | import logging
4 | from typing import TYPE_CHECKING, ClassVar, Optional
5 |
6 | if TYPE_CHECKING:
7 | from tools.models import ToolModelCategory
8 |
9 | from .openai_compatible import OpenAICompatibleProvider
10 | from .registries.openai import OpenAIModelRegistry
11 | from .registry_provider_mixin import RegistryBackedProviderMixin
12 | from .shared import ModelCapabilities, ProviderType
13 |
14 | logger = logging.getLogger(__name__)
15 |
16 |
17 | class OpenAIModelProvider(RegistryBackedProviderMixin, OpenAICompatibleProvider):
18 | """Implementation that talks to api.openai.com using rich model metadata.
19 |
20 | In addition to the built-in catalogue, the provider can surface models
21 | defined in ``conf/custom_models.json`` (for organisations running their own
22 | OpenAI-compatible gateways) while still respecting restriction policies.
23 | """
24 |
25 | REGISTRY_CLASS = OpenAIModelRegistry
26 | MODEL_CAPABILITIES: ClassVar[dict[str, ModelCapabilities]] = {}
27 |
28 | def __init__(self, api_key: str, **kwargs):
29 | """Initialize OpenAI provider with API key."""
30 | self._ensure_registry()
31 | # Set default OpenAI base URL, allow override for regions/custom endpoints
32 | kwargs.setdefault("base_url", "https://api.openai.com/v1")
33 | super().__init__(api_key, **kwargs)
34 | self._invalidate_capability_cache()
35 |
36 | # ------------------------------------------------------------------
37 | # Capability surface
38 | # ------------------------------------------------------------------
39 |
40 | def _lookup_capabilities(
41 | self,
42 | canonical_name: str,
43 | requested_name: Optional[str] = None,
44 | ) -> Optional[ModelCapabilities]:
45 | """Look up OpenAI capabilities from built-ins or the custom registry."""
46 |
47 | self._ensure_registry()
48 | builtin = super()._lookup_capabilities(canonical_name, requested_name)
49 | if builtin is not None:
50 | return builtin
51 |
52 | try:
53 | from .registries.openrouter import OpenRouterModelRegistry
54 |
55 | registry = OpenRouterModelRegistry()
56 | config = registry.get_model_config(canonical_name)
57 |
58 | if config and config.provider == ProviderType.OPENAI:
59 | return config
60 |
61 | except Exception as exc: # pragma: no cover - registry failures are non-critical
62 | logger.debug(f"Could not resolve custom OpenAI model '{canonical_name}': {exc}")
63 |
64 | return None
65 |
66 | def _finalise_capabilities(
67 | self,
68 | capabilities: ModelCapabilities,
69 | canonical_name: str,
70 | requested_name: str,
71 | ) -> ModelCapabilities:
72 | """Ensure registry-sourced models report the correct provider type."""
73 |
74 | if capabilities.provider != ProviderType.OPENAI:
75 | capabilities.provider = ProviderType.OPENAI
76 | return capabilities
77 |
78 | def _raise_unsupported_model(self, model_name: str) -> None:
79 | raise ValueError(f"Unsupported OpenAI model: {model_name}")
80 |
81 | # ------------------------------------------------------------------
82 | # Provider identity
83 | # ------------------------------------------------------------------
84 |
85 | def get_provider_type(self) -> ProviderType:
86 | """Get the provider type."""
87 | return ProviderType.OPENAI
88 |
89 | # ------------------------------------------------------------------
90 | # Provider preferences
91 | # ------------------------------------------------------------------
92 |
93 | def get_preferred_model(self, category: "ToolModelCategory", allowed_models: list[str]) -> Optional[str]:
94 | """Get OpenAI's preferred model for a given category from allowed models.
95 |
96 | Args:
97 | category: The tool category requiring a model
98 | allowed_models: Pre-filtered list of models allowed by restrictions
99 |
100 | Returns:
101 | Preferred model name or None
102 | """
103 | from tools.models import ToolModelCategory
104 |
105 | if not allowed_models:
106 | return None
107 |
108 | # Helper to find first available from preference list
109 | def find_first(preferences: list[str]) -> Optional[str]:
110 | """Return first available model from preference list."""
111 | for model in preferences:
112 | if model in allowed_models:
113 | return model
114 | return None
115 |
116 | if category == ToolModelCategory.EXTENDED_REASONING:
117 | # Prefer models with extended thinking support
118 | # GPT-5-Codex first for coding tasks
119 | preferred = find_first(["gpt-5-codex", "gpt-5-pro", "o3", "o3-pro", "gpt-5"])
120 | return preferred if preferred else allowed_models[0]
121 |
122 | elif category == ToolModelCategory.FAST_RESPONSE:
123 | # Prefer fast, cost-efficient models
124 | # GPT-5 models for speed, GPT-5-Codex after (premium pricing but cached)
125 | preferred = find_first(["gpt-5", "gpt-5-mini", "gpt-5-codex", "o4-mini", "o3-mini"])
126 | return preferred if preferred else allowed_models[0]
127 |
128 | else: # BALANCED or default
129 | # Prefer balanced performance/cost models
130 | # Include GPT-5-Codex for coding workflows
131 | preferred = find_first(["gpt-5", "gpt-5-codex", "gpt-5-pro", "gpt-5-mini", "o4-mini", "o3-mini"])
132 | return preferred if preferred else allowed_models[0]
133 |
134 |
135 | # Load registry data at import time so dependent providers (Azure) can reuse it
136 | OpenAIModelProvider._ensure_registry()
137 |
```
--------------------------------------------------------------------------------
/simulator_tests/__init__.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Communication Simulator Tests Package
3 |
4 | This package contains individual test modules for the Zen MCP Communication Simulator.
5 | Each test is in its own file for better organization and maintainability.
6 | """
7 |
8 | from .base_test import BaseSimulatorTest
9 | from .test_analyze_validation import AnalyzeValidationTest
10 | from .test_basic_conversation import BasicConversationTest
11 | from .test_chat_simple_validation import ChatSimpleValidationTest
12 | from .test_codereview_validation import CodeReviewValidationTest
13 | from .test_consensus_conversation import TestConsensusConversation
14 | from .test_consensus_three_models import TestConsensusThreeModels
15 | from .test_consensus_workflow_accurate import TestConsensusWorkflowAccurate
16 | from .test_content_validation import ContentValidationTest
17 | from .test_conversation_chain_validation import ConversationChainValidationTest
18 | from .test_cross_tool_comprehensive import CrossToolComprehensiveTest
19 | from .test_cross_tool_continuation import CrossToolContinuationTest
20 | from .test_debug_certain_confidence import DebugCertainConfidenceTest
21 | from .test_debug_validation import DebugValidationTest
22 | from .test_line_number_validation import LineNumberValidationTest
23 | from .test_logs_validation import LogsValidationTest
24 | from .test_model_thinking_config import TestModelThinkingConfig
25 | from .test_o3_model_selection import O3ModelSelectionTest
26 | from .test_o3_pro_expensive import O3ProExpensiveTest
27 | from .test_ollama_custom_url import OllamaCustomUrlTest
28 | from .test_openrouter_fallback import OpenRouterFallbackTest
29 | from .test_openrouter_models import OpenRouterModelsTest
30 | from .test_per_tool_deduplication import PerToolDeduplicationTest
31 | from .test_planner_continuation_history import PlannerContinuationHistoryTest
32 | from .test_planner_validation import PlannerValidationTest
33 | from .test_precommitworkflow_validation import PrecommitWorkflowValidationTest
34 | from .test_prompt_size_limit_bug import PromptSizeLimitBugTest
35 |
36 | # Redis validation test removed - no longer needed for standalone server
37 | from .test_refactor_validation import RefactorValidationTest
38 | from .test_secaudit_validation import SecauditValidationTest
39 | from .test_testgen_validation import TestGenValidationTest
40 | from .test_thinkdeep_validation import ThinkDeepWorkflowValidationTest
41 | from .test_token_allocation_validation import TokenAllocationValidationTest
42 | from .test_vision_capability import VisionCapabilityTest
43 | from .test_xai_models import XAIModelsTest
44 |
45 | # Test registry for dynamic loading
46 | TEST_REGISTRY = {
47 | "basic_conversation": BasicConversationTest,
48 | "chat_validation": ChatSimpleValidationTest,
49 | "codereview_validation": CodeReviewValidationTest,
50 | "content_validation": ContentValidationTest,
51 | "per_tool_deduplication": PerToolDeduplicationTest,
52 | "cross_tool_continuation": CrossToolContinuationTest,
53 | "cross_tool_comprehensive": CrossToolComprehensiveTest,
54 | "line_number_validation": LineNumberValidationTest,
55 | "logs_validation": LogsValidationTest,
56 | # "redis_validation": RedisValidationTest, # Removed - no longer needed for standalone server
57 | "model_thinking_config": TestModelThinkingConfig,
58 | "o3_model_selection": O3ModelSelectionTest,
59 | "ollama_custom_url": OllamaCustomUrlTest,
60 | "openrouter_fallback": OpenRouterFallbackTest,
61 | "openrouter_models": OpenRouterModelsTest,
62 | "planner_validation": PlannerValidationTest,
63 | "planner_continuation_history": PlannerContinuationHistoryTest,
64 | "precommit_validation": PrecommitWorkflowValidationTest,
65 | "token_allocation_validation": TokenAllocationValidationTest,
66 | "testgen_validation": TestGenValidationTest,
67 | "thinkdeep_validation": ThinkDeepWorkflowValidationTest,
68 | "refactor_validation": RefactorValidationTest,
69 | "secaudit_validation": SecauditValidationTest,
70 | "debug_validation": DebugValidationTest,
71 | "debug_certain_confidence": DebugCertainConfidenceTest,
72 | "conversation_chain_validation": ConversationChainValidationTest,
73 | "vision_capability": VisionCapabilityTest,
74 | "xai_models": XAIModelsTest,
75 | "consensus_conversation": TestConsensusConversation,
76 | "consensus_workflow_accurate": TestConsensusWorkflowAccurate,
77 | "consensus_three_models": TestConsensusThreeModels,
78 | "analyze_validation": AnalyzeValidationTest,
79 | "prompt_size_limit_bug": PromptSizeLimitBugTest,
80 | # "o3_pro_expensive": O3ProExpensiveTest, # COMMENTED OUT - too expensive to run by default
81 | }
82 |
83 | __all__ = [
84 | "BaseSimulatorTest",
85 | "BasicConversationTest",
86 | "ChatSimpleValidationTest",
87 | "CodeReviewValidationTest",
88 | "ContentValidationTest",
89 | "PerToolDeduplicationTest",
90 | "CrossToolContinuationTest",
91 | "CrossToolComprehensiveTest",
92 | "LineNumberValidationTest",
93 | "LogsValidationTest",
94 | "TestModelThinkingConfig",
95 | "O3ModelSelectionTest",
96 | "O3ProExpensiveTest",
97 | "OllamaCustomUrlTest",
98 | "OpenRouterFallbackTest",
99 | "OpenRouterModelsTest",
100 | "PlannerValidationTest",
101 | "PlannerContinuationHistoryTest",
102 | "PrecommitWorkflowValidationTest",
103 | "TokenAllocationValidationTest",
104 | "TestGenValidationTest",
105 | "ThinkDeepWorkflowValidationTest",
106 | "RefactorValidationTest",
107 | "SecauditValidationTest",
108 | "DebugValidationTest",
109 | "DebugCertainConfidenceTest",
110 | "ConversationChainValidationTest",
111 | "VisionCapabilityTest",
112 | "XAIModelsTest",
113 | "TestConsensusConversation",
114 | "TestConsensusWorkflowAccurate",
115 | "TestConsensusThreeModels",
116 | "AnalyzeValidationTest",
117 | "PromptSizeLimitBugTest",
118 | "TEST_REGISTRY",
119 | ]
120 |
```
--------------------------------------------------------------------------------
/tests/test_model_resolution_bug.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Test to reproduce and fix the OpenRouter model name resolution bug.
3 |
4 | This test specifically targets the bug where:
5 | 1. User specifies "gemini" in consensus tool
6 | 2. System incorrectly resolves to "gemini-2.5-pro" instead of "google/gemini-2.5-pro"
7 | 3. OpenRouter API returns "gemini-2.5-pro is not a valid model ID"
8 | """
9 |
10 | from unittest.mock import Mock, patch
11 |
12 | from providers.openrouter import OpenRouterProvider
13 | from providers.shared import ProviderType
14 | from tools.consensus import ConsensusTool
15 |
16 |
17 | class TestModelResolutionBug:
18 | """Test cases for the OpenRouter model name resolution bug."""
19 |
20 | def setup_method(self):
21 | """Setup test environment."""
22 | self.consensus_tool = ConsensusTool()
23 |
24 | def test_openrouter_registry_resolves_gemini_alias(self):
25 | """Test that OpenRouter registry properly resolves 'gemini' to 'google/gemini-2.5-pro'."""
26 | # Test the registry directly
27 | provider = OpenRouterProvider("test_key")
28 |
29 | # Test alias resolution
30 | resolved_model_name = provider._resolve_model_name("gemini")
31 | assert (
32 | resolved_model_name == "google/gemini-2.5-pro"
33 | ), f"Expected 'google/gemini-2.5-pro', got '{resolved_model_name}'"
34 |
35 | # Test that it also works with 'pro' alias
36 | resolved_pro = provider._resolve_model_name("pro")
37 | assert resolved_pro == "google/gemini-2.5-pro", f"Expected 'google/gemini-2.5-pro', got '{resolved_pro}'"
38 |
39 | # DELETED: test_provider_registry_returns_openrouter_for_gemini
40 | # This test had a flawed mock setup - it mocked get_provider() but called get_provider_for_model().
41 | # The test was trying to verify OpenRouter model resolution functionality that is already
42 | # comprehensively tested in working OpenRouter provider tests.
43 |
44 | @patch.dict("os.environ", {"OPENROUTER_API_KEY": "test_key"}, clear=False)
45 | def test_consensus_tool_model_resolution_bug_reproduction(self):
46 | """Test that the new consensus workflow tool properly handles OpenRouter model resolution."""
47 | import asyncio
48 |
49 | # Create a mock OpenRouter provider that tracks what model names it receives
50 | mock_provider = Mock(spec=OpenRouterProvider)
51 | mock_provider.get_provider_type.return_value = ProviderType.OPENROUTER
52 |
53 | # Mock response for successful generation
54 | mock_response = Mock()
55 | mock_response.content = "Test response"
56 | mock_response.usage = None
57 | mock_provider.generate_content.return_value = mock_response
58 |
59 | # Track the model name passed to generate_content
60 | received_model_names = []
61 |
62 | def track_generate_content(*args, **kwargs):
63 | received_model_names.append(kwargs.get("model_name", args[1] if len(args) > 1 else "unknown"))
64 | return mock_response
65 |
66 | mock_provider.generate_content.side_effect = track_generate_content
67 |
68 | # Mock the get_model_provider to return our mock
69 | with patch.object(self.consensus_tool, "get_model_provider", return_value=mock_provider):
70 | # Set initial prompt
71 | self.consensus_tool.initial_prompt = "Test prompt"
72 |
73 | # Create a mock request
74 | request = Mock()
75 | request.relevant_files = []
76 | request.continuation_id = None
77 | request.images = None
78 |
79 | # Test model consultation directly
80 | result = asyncio.run(self.consensus_tool._consult_model({"model": "gemini", "stance": "neutral"}, request))
81 |
82 | # Verify that generate_content was called
83 | assert len(received_model_names) == 1
84 |
85 | # The consensus tool should pass the original alias "gemini"
86 | # The OpenRouter provider should resolve it internally
87 | received_model = received_model_names[0]
88 | print(f"Model name passed to provider: {received_model}")
89 |
90 | assert received_model == "gemini", f"Expected 'gemini' to be passed to provider, got '{received_model}'"
91 |
92 | # Verify the result structure
93 | assert result["model"] == "gemini"
94 | assert result["status"] == "success"
95 |
96 | def test_bug_reproduction_with_malformed_model_name(self):
97 | """Test what happens when 'gemini-2.5-pro' (malformed) is passed to OpenRouter."""
98 | provider = OpenRouterProvider("test_key")
99 |
100 | # This should NOT resolve because 'gemini-2.5-pro' is not in the OpenRouter registry
101 | resolved = provider._resolve_model_name("gemini-2.5-pro")
102 |
103 | # The bug: this returns "gemini-2.5-pro" as-is instead of resolving to proper name
104 | # This is what causes the OpenRouter API to fail
105 | assert resolved == "gemini-2.5-pro", f"Expected fallback to 'gemini-2.5-pro', got '{resolved}'"
106 |
107 | # Verify the registry doesn't have this malformed name
108 | config = provider._registry.resolve("gemini-2.5-pro")
109 | assert config is None, "Registry should not contain 'gemini-2.5-pro' - only 'google/gemini-2.5-pro'"
110 |
111 |
112 | if __name__ == "__main__":
113 | # Run the tests
114 | test = TestModelResolutionBug()
115 | test.setup_method()
116 |
117 | print("Testing OpenRouter registry resolution...")
118 | test.test_openrouter_registry_resolves_gemini_alias()
119 | print("✅ Registry resolves aliases correctly")
120 |
121 | print("\nTesting malformed model name handling...")
122 | test.test_bug_reproduction_with_malformed_model_name()
123 | print("✅ Confirmed: malformed names fall through as-is")
124 |
125 | print("\nConsensus tool test completed successfully.")
126 |
127 | print("\nAll tests completed. The bug is fixed.")
128 |
```
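
For reference, the alias resolution exercised above can also be checked directly against the registry. A minimal sketch, assuming the bundled `conf/openrouter_models.json` is present and using only calls that appear in this page's tests (`OpenRouterModelRegistry()` and `resolve()`):

```python
from providers.registries.openrouter import OpenRouterModelRegistry

registry = OpenRouterModelRegistry()

# Per the tests above, "gemini" and "pro" are aliases for google/gemini-2.5-pro,
# while the bare "gemini-2.5-pro" is not a registry entry.
for name in ("gemini", "pro", "gemini-2.5-pro"):
    config = registry.resolve(name)
    print(f"{name!r} -> {'resolved' if config else 'not in registry'}")
```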
--------------------------------------------------------------------------------
/tests/test_pip_detection_fix.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for pip detection fix in run-server.sh script.
2 |
3 | This test file ensures our pip detection improvements work correctly
4 | and don't break existing functionality.
5 | """
6 |
7 | import os
8 | import subprocess
9 | import tempfile
10 | from pathlib import Path
11 |
12 | import pytest
13 |
14 |
15 | class TestPipDetectionFix:
16 | """Test cases for issue #188: PIP is available but not recognized."""
17 |
18 | def test_run_server_script_syntax_valid(self):
19 | """Test that run-server.sh has valid bash syntax."""
20 | result = subprocess.run(["bash", "-n", "./run-server.sh"], capture_output=True, text=True)
21 | assert result.returncode == 0, f"Syntax error in run-server.sh: {result.stderr}"
22 |
23 | def test_run_server_has_proper_shebang(self):
24 | """Test that run-server.sh starts with proper shebang."""
25 | content = Path("./run-server.sh").read_text()
26 | assert content.startswith("#!/bin/bash"), "Script missing proper bash shebang"
27 |
28 | def test_critical_functions_exist(self):
29 | """Test that all critical functions are defined in the script."""
30 | content = Path("./run-server.sh").read_text()
31 | critical_functions = ["find_python", "setup_environment", "setup_venv", "install_dependencies", "bootstrap_pip"]
32 |
33 | for func in critical_functions:
34 | assert f"{func}()" in content, f"Critical function {func}() not found in script"
35 |
36 | def test_pip_detection_consistency_issue(self):
37 | """Test the specific issue: pip works in setup_venv but fails in install_dependencies.
38 |
39 | This test verifies that our fix ensures consistent Python executable paths.
40 | """
41 | # Test that the get_venv_python_path function now returns absolute paths
42 | content = Path("./run-server.sh").read_text()
43 |
44 | # Check that get_venv_python_path includes our absolute path conversion logic
45 | assert "abs_venv_path" in content, "get_venv_python_path should use absolute paths"
46 | assert 'cd "$(dirname' in content, "Should convert to absolute path"
47 |
48 | # Test successful completion - our fix should make the script more robust
49 | result = subprocess.run(["bash", "-n", "./run-server.sh"], capture_output=True, text=True)
50 | assert result.returncode == 0, "Script should have valid syntax after our fix"
51 |
52 | def test_pip_detection_with_non_interactive_shell(self):
53 | """Test pip detection works in non-interactive shell environments.
54 |
55 | This addresses the contributor's suggestion about non-interactive shells
56 | not sourcing ~/.bashrc where pip PATH might be defined.
57 | """
58 | # Test case for Git Bash on Windows and non-interactive Linux shells
59 | with tempfile.TemporaryDirectory() as temp_dir:
60 | # Create mock virtual environment structure
61 | venv_path = Path(temp_dir) / ".zen_venv"
62 | bin_path = venv_path / "bin"
63 | bin_path.mkdir(parents=True)
64 |
65 | # Create mock python executable
66 | python_exe = bin_path / "python"
67 | python_exe.write_text("#!/bin/bash\necho 'Python 3.12.3'\n")
68 | python_exe.chmod(0o755)
69 |
70 | # Create mock pip executable
71 | pip_exe = bin_path / "pip"
72 | pip_exe.write_text("#!/bin/bash\necho 'pip 23.0.1'\n")
73 | pip_exe.chmod(0o755)
74 |
75 | # Test that we can detect pip using explicit paths (not PATH)
76 | assert python_exe.exists(), "Mock python executable should exist"
77 | assert pip_exe.exists(), "Mock pip executable should exist"
78 | assert python_exe.is_file(), "Python should be a file"
79 | assert pip_exe.is_file(), "Pip should be a file"
80 |
81 | def test_enhanced_diagnostic_messages_included(self):
82 | """Test that our enhanced diagnostic messages are included in the script.
83 |
84 | Verify that the script contains the enhanced error diagnostics we added.
85 | """
86 | content = Path("./run-server.sh").read_text()
87 |
88 | # Check that enhanced diagnostic information is present in the script
89 | expected_diagnostic_patterns = [
90 | "Enhanced diagnostic information for debugging",
91 | "Diagnostic information:",
92 | "Python executable:",
93 | "Python executable exists:",
94 | "Python executable permissions:",
95 | "Virtual environment path:",
96 | "Virtual environment exists:",
97 | "Final diagnostic information:",
98 | ]
99 |
100 | for pattern in expected_diagnostic_patterns:
101 | assert pattern in content, f"Enhanced diagnostic pattern '{pattern}' should be in script"
102 |
103 | def test_setup_env_file_does_not_create_bsd_backup(self, tmp_path):
104 | """Ensure setup_env_file avoids creating .env'' artifacts (BSD sed behavior)."""
105 | script_path = Path("./run-server.sh").resolve()
106 |
107 | # Prepare temp workspace with example env
108 | env_example = Path(".env.example").read_text()
109 | target_example = tmp_path / ".env.example"
110 | target_example.write_text(env_example)
111 |
112 | # Run setup_env_file inside isolated shell session
113 | command = f"""
114 | set -e
115 | cd "{tmp_path}"
116 | source "{script_path}"
117 | setup_env_file
118 | """
119 | env = os.environ.copy()
120 | subprocess.run(["bash", "-lc", command], check=True, env=env, text=True)
121 |
122 | artifacts = {p.name for p in tmp_path.glob(".env*")}
123 | assert ".env''" not in artifacts, "setup_env_file should not create BSD sed backup artifacts"
124 | assert ".env" in artifacts, ".env should be created from .env.example"
125 |
126 |
127 | if __name__ == "__main__":
128 | pytest.main([__file__, "-v"])
129 |
```
--------------------------------------------------------------------------------
/tests/test_disabled_tools.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for DISABLED_TOOLS environment variable functionality."""
2 |
3 | import logging
4 | import os
5 | from unittest.mock import patch
6 |
7 | import pytest
8 |
9 | from server import (
10 | apply_tool_filter,
11 | parse_disabled_tools_env,
12 | validate_disabled_tools,
13 | )
14 |
15 |
16 | # Mock the tool classes since we're testing the filtering logic
17 | class MockTool:
18 | def __init__(self, name):
19 | self.name = name
20 |
21 |
22 | class TestDisabledTools:
23 | """Test suite for DISABLED_TOOLS functionality."""
24 |
25 | def test_parse_disabled_tools_empty(self):
26 | """Empty string returns empty set (no tools disabled)."""
27 | with patch.dict(os.environ, {"DISABLED_TOOLS": ""}):
28 | assert parse_disabled_tools_env() == set()
29 |
30 | def test_parse_disabled_tools_not_set(self):
31 | """Unset variable returns empty set."""
32 | with patch.dict(os.environ, {}, clear=True):
33 | # Ensure DISABLED_TOOLS is not in environment
34 | if "DISABLED_TOOLS" in os.environ:
35 | del os.environ["DISABLED_TOOLS"]
36 | assert parse_disabled_tools_env() == set()
37 |
38 | def test_parse_disabled_tools_single(self):
39 | """Single tool name parsed correctly."""
40 | with patch.dict(os.environ, {"DISABLED_TOOLS": "debug"}):
41 | assert parse_disabled_tools_env() == {"debug"}
42 |
43 | def test_parse_disabled_tools_multiple(self):
44 | """Multiple tools with spaces parsed correctly."""
45 | with patch.dict(os.environ, {"DISABLED_TOOLS": "debug, analyze, refactor"}):
46 | assert parse_disabled_tools_env() == {"debug", "analyze", "refactor"}
47 |
48 | def test_parse_disabled_tools_extra_spaces(self):
49 | """Extra spaces and empty items handled correctly."""
50 | with patch.dict(os.environ, {"DISABLED_TOOLS": " debug , , analyze , "}):
51 | assert parse_disabled_tools_env() == {"debug", "analyze"}
52 |
53 | def test_parse_disabled_tools_duplicates(self):
54 | """Duplicate entries handled correctly (set removes duplicates)."""
55 | with patch.dict(os.environ, {"DISABLED_TOOLS": "debug,analyze,debug"}):
56 | assert parse_disabled_tools_env() == {"debug", "analyze"}
57 |
58 | def test_tool_filtering_logic(self):
59 | """Test the complete filtering logic using the actual server functions."""
60 | # Simulate ALL_TOOLS
61 | ALL_TOOLS = {
62 | "chat": MockTool("chat"),
63 | "debug": MockTool("debug"),
64 | "analyze": MockTool("analyze"),
65 | "version": MockTool("version"),
66 | "listmodels": MockTool("listmodels"),
67 | }
68 |
69 | # Test case 1: No tools disabled
70 | disabled_tools = set()
71 | enabled_tools = apply_tool_filter(ALL_TOOLS, disabled_tools)
72 |
73 | assert len(enabled_tools) == 5 # All tools included
74 | assert set(enabled_tools.keys()) == set(ALL_TOOLS.keys())
75 |
76 | # Test case 2: Disable some regular tools
77 | disabled_tools = {"debug", "analyze"}
78 | enabled_tools = apply_tool_filter(ALL_TOOLS, disabled_tools)
79 |
80 | assert len(enabled_tools) == 3 # chat, version, listmodels
81 | assert "debug" not in enabled_tools
82 | assert "analyze" not in enabled_tools
83 | assert "chat" in enabled_tools
84 | assert "version" in enabled_tools
85 | assert "listmodels" in enabled_tools
86 |
87 | # Test case 3: Attempt to disable essential tools
88 | disabled_tools = {"version", "chat"}
89 | enabled_tools = apply_tool_filter(ALL_TOOLS, disabled_tools)
90 |
91 | assert "version" in enabled_tools # Essential tool not disabled
92 | assert "chat" not in enabled_tools # Regular tool disabled
93 | assert "listmodels" in enabled_tools # Essential tool included
94 |
95 | def test_unknown_tools_warning(self, caplog):
96 | """Test that unknown tool names generate appropriate warnings."""
97 | ALL_TOOLS = {
98 | "chat": MockTool("chat"),
99 | "debug": MockTool("debug"),
100 | "analyze": MockTool("analyze"),
101 | "version": MockTool("version"),
102 | "listmodels": MockTool("listmodels"),
103 | }
104 | disabled_tools = {"chat", "unknown_tool", "another_unknown"}
105 |
106 | with caplog.at_level(logging.WARNING):
107 | validate_disabled_tools(disabled_tools, ALL_TOOLS)
108 | assert "Unknown tools in DISABLED_TOOLS: ['another_unknown', 'unknown_tool']" in caplog.text
109 |
110 | def test_essential_tools_warning(self, caplog):
111 | """Test warning when trying to disable essential tools."""
112 | ALL_TOOLS = {
113 | "chat": MockTool("chat"),
114 | "debug": MockTool("debug"),
115 | "analyze": MockTool("analyze"),
116 | "version": MockTool("version"),
117 | "listmodels": MockTool("listmodels"),
118 | }
119 | disabled_tools = {"version", "chat", "debug"}
120 |
121 | with caplog.at_level(logging.WARNING):
122 | validate_disabled_tools(disabled_tools, ALL_TOOLS)
123 | assert "Cannot disable essential tools: ['version']" in caplog.text
124 |
125 | @pytest.mark.parametrize(
126 | "env_value,expected",
127 | [
128 | ("", set()), # Empty string
129 | (" ", set()), # Only spaces
130 | (",,,", set()), # Only commas
131 | ("chat", {"chat"}), # Single tool
132 | ("chat,debug", {"chat", "debug"}), # Multiple tools
133 | ("chat, debug, analyze", {"chat", "debug", "analyze"}), # With spaces
134 | ("chat,debug,chat", {"chat", "debug"}), # Duplicates
135 | ],
136 | )
137 | def test_parse_disabled_tools_parametrized(self, env_value, expected):
138 | """Parametrized tests for various input formats."""
139 | with patch.dict(os.environ, {"DISABLED_TOOLS": env_value}):
140 | assert parse_disabled_tools_env() == expected
141 |
```
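The assertions above fully characterize the three helpers imported from `server`. For reference, here is a minimal sketch consistent with those expectations — the real implementations live in `server.py`, and the `ESSENTIAL_TOOLS` set plus the exact log wording are assumptions inferred from the tests:

```python
import logging
import os

logger = logging.getLogger(__name__)

# Assumed for illustration; the real server defines its own essential set.
ESSENTIAL_TOOLS = {"version", "listmodels"}


def parse_disabled_tools_env() -> set[str]:
    """Parse DISABLED_TOOLS into a set of names (comma-separated, whitespace ignored)."""
    raw = os.getenv("DISABLED_TOOLS", "")
    return {name.strip() for name in raw.split(",") if name.strip()}


def validate_disabled_tools(disabled_tools: set[str], all_tools: dict) -> None:
    """Warn about unknown names and attempts to disable essential tools."""
    unknown = sorted(disabled_tools - set(all_tools))
    if unknown:
        logger.warning(f"Unknown tools in DISABLED_TOOLS: {unknown}")
    essential = sorted(disabled_tools & ESSENTIAL_TOOLS)
    if essential:
        logger.warning(f"Cannot disable essential tools: {essential}")


def apply_tool_filter(all_tools: dict, disabled_tools: set[str]) -> dict:
    """Drop disabled tools, but never drop essential ones."""
    return {
        name: tool
        for name, tool in all_tools.items()
        if name in ESSENTIAL_TOOLS or name not in disabled_tools
    }
```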
--------------------------------------------------------------------------------
/tests/test_clink_tool.py:
--------------------------------------------------------------------------------
```python
1 | import json
2 |
3 | import pytest
4 |
5 | from clink import get_registry
6 | from clink.agents import AgentOutput
7 | from clink.parsers.base import ParsedCLIResponse
8 | from tools.clink import MAX_RESPONSE_CHARS, CLinkTool
9 |
10 |
11 | @pytest.mark.asyncio
12 | async def test_clink_tool_execute(monkeypatch):
13 | tool = CLinkTool()
14 |
15 | async def fake_run(**kwargs):
16 | return AgentOutput(
17 | parsed=ParsedCLIResponse(content="Hello from Gemini", metadata={"model_used": "gemini-2.5-pro"}),
18 | sanitized_command=["gemini", "-o", "json"],
19 | returncode=0,
20 | stdout='{"response": "Hello from Gemini"}',
21 | stderr="",
22 | duration_seconds=0.42,
23 | parser_name="gemini_json",
24 | output_file_content=None,
25 | )
26 |
27 | class DummyAgent:
28 | async def run(self, **kwargs):
29 | return await fake_run(**kwargs)
30 |
31 | def fake_create_agent(client):
32 | return DummyAgent()
33 |
34 | monkeypatch.setattr("tools.clink.create_agent", fake_create_agent)
35 |
36 | arguments = {
37 | "prompt": "Summarize the project",
38 | "cli_name": "gemini",
39 | "role": "default",
40 | "absolute_file_paths": [],
41 | "images": [],
42 | }
43 |
44 | results = await tool.execute(arguments)
45 | assert len(results) == 1
46 |
47 | payload = json.loads(results[0].text)
48 | assert payload["status"] in {"success", "continuation_available"}
49 | assert "Hello from Gemini" in payload["content"]
50 | metadata = payload.get("metadata", {})
51 | assert metadata.get("cli_name") == "gemini"
52 | assert metadata.get("command") == ["gemini", "-o", "json"]
53 |
54 |
55 | def test_registry_lists_roles():
56 | registry = get_registry()
57 | clients = registry.list_clients()
58 | assert {"codex", "gemini"}.issubset(set(clients))
59 | roles = registry.list_roles("gemini")
60 | assert "default" in roles
61 | assert "default" in registry.list_roles("codex")
62 |
63 |
64 | @pytest.mark.asyncio
65 | async def test_clink_tool_defaults_to_first_cli(monkeypatch):
66 | tool = CLinkTool()
67 |
68 | async def fake_run(**kwargs):
69 | return AgentOutput(
70 | parsed=ParsedCLIResponse(content="Default CLI response", metadata={"events": ["foo"]}),
71 | sanitized_command=["gemini"],
72 | returncode=0,
73 | stdout='{"response": "Default CLI response"}',
74 | stderr="",
75 | duration_seconds=0.1,
76 | parser_name="gemini_json",
77 | output_file_content=None,
78 | )
79 |
80 | class DummyAgent:
81 | async def run(self, **kwargs):
82 | return await fake_run(**kwargs)
83 |
84 | monkeypatch.setattr("tools.clink.create_agent", lambda client: DummyAgent())
85 |
86 | arguments = {
87 | "prompt": "Hello",
88 | "absolute_file_paths": [],
89 | "images": [],
90 | }
91 |
92 | result = await tool.execute(arguments)
93 | payload = json.loads(result[0].text)
94 | metadata = payload.get("metadata", {})
95 | assert metadata.get("cli_name") == tool._default_cli_name
96 | assert metadata.get("events_removed_for_normal") is True
97 |
98 |
99 | @pytest.mark.asyncio
100 | async def test_clink_tool_truncates_large_output(monkeypatch):
101 | tool = CLinkTool()
102 |
103 | summary_section = "<SUMMARY>This is the condensed summary.</SUMMARY>"
104 | long_text = "A" * (MAX_RESPONSE_CHARS + 500) + summary_section
105 |
106 | async def fake_run(**kwargs):
107 | return AgentOutput(
108 | parsed=ParsedCLIResponse(content=long_text, metadata={"events": ["event1", "event2"]}),
109 | sanitized_command=["codex"],
110 | returncode=0,
111 | stdout="{}",
112 | stderr="",
113 | duration_seconds=0.2,
114 | parser_name="codex_jsonl",
115 | output_file_content=None,
116 | )
117 |
118 | class DummyAgent:
119 | async def run(self, **kwargs):
120 | return await fake_run(**kwargs)
121 |
122 | monkeypatch.setattr("tools.clink.create_agent", lambda client: DummyAgent())
123 |
124 | arguments = {
125 | "prompt": "Summarize",
126 | "cli_name": tool._default_cli_name,
127 | "absolute_file_paths": [],
128 | "images": [],
129 | }
130 |
131 | result = await tool.execute(arguments)
132 | payload = json.loads(result[0].text)
133 | assert payload["status"] in {"success", "continuation_available"}
134 | assert payload["content"].strip() == "This is the condensed summary."
135 | metadata = payload.get("metadata", {})
136 | assert metadata.get("output_summarized") is True
137 | assert metadata.get("events_removed_for_normal") is True
138 | assert metadata.get("output_original_length") == len(long_text)
139 |
140 |
141 | @pytest.mark.asyncio
142 | async def test_clink_tool_truncates_without_summary(monkeypatch):
143 | tool = CLinkTool()
144 |
145 | long_text = "B" * (MAX_RESPONSE_CHARS + 1000)
146 |
147 | async def fake_run(**kwargs):
148 | return AgentOutput(
149 | parsed=ParsedCLIResponse(content=long_text, metadata={"events": ["event"]}),
150 | sanitized_command=["codex"],
151 | returncode=0,
152 | stdout="{}",
153 | stderr="",
154 | duration_seconds=0.2,
155 | parser_name="codex_jsonl",
156 | output_file_content=None,
157 | )
158 |
159 | class DummyAgent:
160 | async def run(self, **kwargs):
161 | return await fake_run(**kwargs)
162 |
163 | monkeypatch.setattr("tools.clink.create_agent", lambda client: DummyAgent())
164 |
165 | arguments = {
166 | "prompt": "Summarize",
167 | "cli_name": tool._default_cli_name,
168 | "absolute_file_paths": [],
169 | "images": [],
170 | }
171 |
172 | result = await tool.execute(arguments)
173 | payload = json.loads(result[0].text)
174 | assert payload["status"] in {"success", "continuation_available"}
175 | assert "exceeding the configured clink limit" in payload["content"]
176 | metadata = payload.get("metadata", {})
177 | assert metadata.get("output_truncated") is True
178 | assert metadata.get("events_removed_for_normal") is True
179 | assert metadata.get("output_original_length") == len(long_text)
180 |
```
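These tests pin down how clink is expected to shrink oversized CLI output: prefer an embedded `<SUMMARY>` block when one exists, otherwise truncate and flag it in the metadata. A rough sketch consistent with those assertions, using a hypothetical `shrink_response` helper and an illustrative character limit (the real logic and `MAX_RESPONSE_CHARS` live in `tools/clink.py`):

```python
import re

# Illustrative value only; tools/clink.py defines the real limit.
MAX_RESPONSE_CHARS = 20_000

_SUMMARY_RE = re.compile(r"<SUMMARY>(.*?)</SUMMARY>", re.DOTALL)


def shrink_response(content: str, metadata: dict) -> tuple[str, dict]:
    """Reduce oversized CLI output, preferring an embedded <SUMMARY> block if present."""
    if len(content) <= MAX_RESPONSE_CHARS:
        return content, metadata

    metadata = dict(metadata, output_original_length=len(content))
    match = _SUMMARY_RE.search(content)
    if match:
        metadata["output_summarized"] = True
        return match.group(1).strip(), metadata

    metadata["output_truncated"] = True
    notice = (
        f"CLI output was {len(content)} characters, exceeding the configured clink limit "
        f"of {MAX_RESPONSE_CHARS}; returning the first portion only.\n\n"
    )
    return notice + content[:MAX_RESPONSE_CHARS], metadata
```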
--------------------------------------------------------------------------------
/tests/test_pii_sanitizer.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """Test cases for PII sanitizer."""
3 |
4 | import unittest
5 |
6 | from .pii_sanitizer import PIIPattern, PIISanitizer
7 |
8 |
9 | class TestPIISanitizer(unittest.TestCase):
10 | """Test PII sanitization functionality."""
11 |
12 | def setUp(self):
13 | """Set up test sanitizer."""
14 | self.sanitizer = PIISanitizer()
15 |
16 | def test_api_key_sanitization(self):
17 | """Test various API key formats are sanitized."""
18 | test_cases = [
19 | # OpenAI keys
20 | ("sk-proj-abcd1234567890ABCD1234567890abcd1234567890ABCD12", "sk-proj-SANITIZED"),
21 | ("sk-1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMN", "sk-SANITIZED"),
22 | # Anthropic keys
23 | ("sk-ant-abcd1234567890ABCD1234567890abcd1234567890ABCD12", "sk-ant-SANITIZED"),
24 | # Google keys
25 | ("AIzaSyD-1234567890abcdefghijklmnopqrstuv", "AIza-SANITIZED"),
26 | # GitHub tokens
27 | ("ghp_1234567890abcdefghijklmnopqrstuvwxyz", "gh_SANITIZED"),
28 | ("ghs_1234567890abcdefghijklmnopqrstuvwxyz", "gh_SANITIZED"),
29 | ]
30 |
31 | for original, expected in test_cases:
32 | with self.subTest(original=original):
33 | result = self.sanitizer.sanitize_string(original)
34 | self.assertEqual(result, expected)
35 |
36 | def test_personal_info_sanitization(self):
37 | """Test personal information is sanitized."""
38 | test_cases = [
39 | # Email addresses
40 | ("[email protected]", "[email protected]"),
41 | ("[email protected]", "[email protected]"),
42 | # Phone numbers (all now use the same pattern)
43 | ("(555) 123-4567", "(XXX) XXX-XXXX"),
44 | ("555-123-4567", "(XXX) XXX-XXXX"),
45 | ("+1-555-123-4567", "(XXX) XXX-XXXX"),
46 | # SSN
47 | ("123-45-6789", "XXX-XX-XXXX"),
48 | # Credit card
49 | ("1234 5678 9012 3456", "XXXX-XXXX-XXXX-XXXX"),
50 | ("1234-5678-9012-3456", "XXXX-XXXX-XXXX-XXXX"),
51 | ]
52 |
53 | for original, expected in test_cases:
54 | with self.subTest(original=original):
55 | result = self.sanitizer.sanitize_string(original)
56 | self.assertEqual(result, expected)
57 |
58 | def test_header_sanitization(self):
59 | """Test HTTP header sanitization."""
60 | headers = {
61 | "Authorization": "Bearer sk-proj-abcd1234567890ABCD1234567890abcd1234567890ABCD12",
62 | "API-Key": "sk-1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMN",
63 | "Content-Type": "application/json",
64 | "User-Agent": "MyApp/1.0",
65 | "Cookie": "session=abc123; [email protected]",
66 | }
67 |
68 | sanitized = self.sanitizer.sanitize_headers(headers)
69 |
70 | self.assertEqual(sanitized["Authorization"], "Bearer SANITIZED")
71 | self.assertEqual(sanitized["API-Key"], "sk-SANITIZED")
72 | self.assertEqual(sanitized["Content-Type"], "application/json")
73 | self.assertEqual(sanitized["User-Agent"], "MyApp/1.0")
74 | self.assertIn("[email protected]", sanitized["Cookie"])
75 |
76 | def test_nested_structure_sanitization(self):
77 | """Test sanitization of nested data structures."""
78 | data = {
79 | "user": {
80 | "email": "[email protected]",
81 | "api_key": "sk-proj-abcd1234567890ABCD1234567890abcd1234567890ABCD12",
82 | },
83 | "tokens": [
84 | "ghp_1234567890abcdefghijklmnopqrstuvwxyz",
85 | "Bearer sk-ant-abcd1234567890ABCD1234567890abcd1234567890ABCD12",
86 | ],
87 | "metadata": {"ip": "192.168.1.100", "phone": "(555) 123-4567"},
88 | }
89 |
90 | sanitized = self.sanitizer.sanitize_value(data)
91 |
92 | self.assertEqual(sanitized["user"]["email"], "[email protected]")
93 | self.assertEqual(sanitized["user"]["api_key"], "sk-proj-SANITIZED")
94 | self.assertEqual(sanitized["tokens"][0], "gh_SANITIZED")
95 | self.assertEqual(sanitized["tokens"][1], "Bearer sk-ant-SANITIZED")
96 | self.assertEqual(sanitized["metadata"]["ip"], "0.0.0.0")
97 | self.assertEqual(sanitized["metadata"]["phone"], "(XXX) XXX-XXXX")
98 |
99 | def test_url_sanitization(self):
100 | """Test URL parameter sanitization."""
101 | urls = [
102 | (
103 | "https://api.example.com/v1/users?api_key=sk-1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMN",
104 | "https://api.example.com/v1/users?api_key=SANITIZED",
105 | ),
106 | (
107 | "https://example.com/login?token=ghp_1234567890abcdefghijklmnopqrstuvwxyz&user=test",
108 | "https://example.com/login?token=SANITIZED&user=test",
109 | ),
110 | ]
111 |
112 | for original, expected in urls:
113 | with self.subTest(url=original):
114 | result = self.sanitizer.sanitize_url(original)
115 | self.assertEqual(result, expected)
116 |
117 | def test_disable_sanitization(self):
118 | """Test that sanitization can be disabled."""
119 | self.sanitizer.sanitize_enabled = False
120 |
121 | sensitive_data = "sk-proj-abcd1234567890ABCD1234567890abcd1234567890ABCD12"
122 | result = self.sanitizer.sanitize_string(sensitive_data)
123 |
124 | # Should return original when disabled
125 | self.assertEqual(result, sensitive_data)
126 |
127 | def test_custom_pattern(self):
128 | """Test adding custom PII patterns."""
129 | # Add custom pattern for internal employee IDs
130 | custom_pattern = PIIPattern.create(
131 | name="employee_id", pattern=r"EMP\d{6}", replacement="EMP-REDACTED", description="Internal employee IDs"
132 | )
133 |
134 | self.sanitizer.add_pattern(custom_pattern)
135 |
136 | text = "Employee EMP123456 has access to the system"
137 | result = self.sanitizer.sanitize_string(text)
138 |
139 | self.assertEqual(result, "Employee EMP-REDACTED has access to the system")
140 |
141 |
142 | if __name__ == "__main__":
143 | unittest.main()
144 |
```
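For orientation, here is a self-contained stand-in showing the pattern-registry approach these tests exercise. The names `Pattern` and `Sanitizer` are illustrative substitutes for the real `PIIPattern`/`PIISanitizer` in `tests/pii_sanitizer.py`, which ships with many built-in patterns:

```python
import re
from dataclasses import dataclass


@dataclass(frozen=True)
class Pattern:
    """Illustrative stand-in for PIIPattern: a named regex plus its replacement."""

    name: str
    regex: re.Pattern
    replacement: str

    @classmethod
    def create(cls, name: str, pattern: str, replacement: str, description: str = "") -> "Pattern":
        return cls(name=name, regex=re.compile(pattern), replacement=replacement)


class Sanitizer:
    """Minimal regex-driven sanitizer mirroring the behaviour the tests exercise."""

    def __init__(self) -> None:
        self.sanitize_enabled = True
        self.patterns: list[Pattern] = []

    def add_pattern(self, pattern: Pattern) -> None:
        self.patterns.append(pattern)

    def sanitize_string(self, text: str) -> str:
        if not self.sanitize_enabled:
            return text  # pass-through when disabled, as test_disable_sanitization expects
        for pattern in self.patterns:
            text = pattern.regex.sub(pattern.replacement, text)
        return text


sanitizer = Sanitizer()
sanitizer.add_pattern(Pattern.create("employee_id", r"EMP\d{6}", "EMP-REDACTED", "Internal employee IDs"))
assert sanitizer.sanitize_string("Employee EMP123456 has access") == "Employee EMP-REDACTED has access"
```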
--------------------------------------------------------------------------------
/tests/test_rate_limit_patterns.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Test to verify structured error code-based retry logic.
3 | """
4 |
5 | from providers.gemini import GeminiModelProvider
6 | from providers.openai import OpenAIModelProvider
7 |
8 |
9 | def test_openai_structured_error_retry_logic():
10 | """Test OpenAI provider's structured error code retry logic."""
11 | provider = OpenAIModelProvider(api_key="test-key")
12 |
13 | # Test structured token-related 429 error (should NOT be retried)
14 | class MockTokenError(Exception):
15 | def __init__(self):
16 | # Simulate the actual error format from OpenAI API
17 | self.args = (
18 | "Error code: 429 - {'error': {'message': 'Request too large for o3', 'type': 'tokens', 'code': 'rate_limit_exceeded'}}",
19 | )
20 |
21 | token_error = MockTokenError()
22 | assert not provider._is_error_retryable(token_error), "Token-related 429 should not be retryable"
23 |
24 | # Test standard rate limiting 429 error (should be retried)
25 | class MockRateLimitError(Exception):
26 | def __init__(self):
27 | self.args = (
28 | "Error code: 429 - {'error': {'message': 'Too many requests', 'type': 'requests', 'code': 'rate_limit_exceeded'}}",
29 | )
30 |
31 | rate_limit_error = MockRateLimitError()
32 | assert provider._is_error_retryable(rate_limit_error), "Request rate limiting should be retryable"
33 |
34 | # Test context length error (should NOT be retried)
35 | class MockContextError(Exception):
36 | def __init__(self):
37 | self.args = (
38 | "Error code: 429 - {'error': {'message': 'Context length exceeded', 'code': 'context_length_exceeded'}}",
39 | )
40 |
41 | context_error = MockContextError()
42 | assert not provider._is_error_retryable(context_error), "Context length errors should not be retryable"
43 |
44 |
45 | def test_gemini_structured_error_retry_logic():
46 | """Test Gemini provider's structured error code retry logic."""
47 | provider = GeminiModelProvider(api_key="test-key")
48 |
49 | # Test quota exceeded error (should NOT be retried)
50 | class MockQuotaError(Exception):
51 | def __init__(self):
52 | self.args = ("429 Resource exhausted: Quota exceeded for model",)
53 | self.details = "quota_exceeded"
54 |
55 | quota_error = MockQuotaError()
56 | assert not provider._is_error_retryable(quota_error), "Quota exceeded should not be retryable"
57 |
58 | # Test resource exhausted error (should NOT be retried)
59 | class MockResourceError(Exception):
60 | def __init__(self):
61 | self.args = ("429 Resource exhausted: Token limit exceeded",)
62 |
63 | resource_error = MockResourceError()
64 | assert not provider._is_error_retryable(resource_error), "Resource exhausted should not be retryable"
65 |
66 | # Test temporary rate limiting (should be retried)
67 | class MockTempError(Exception):
68 | def __init__(self):
69 | self.args = ("429 Too many requests, please try again later",)
70 |
71 | temp_error = MockTempError()
72 | assert provider._is_error_retryable(temp_error), "Temporary rate limiting should be retryable"
73 |
74 |
75 | def test_actual_log_error_from_issue_with_structured_parsing():
76 | """Test the specific error from the user's log using structured parsing."""
77 | provider = OpenAIModelProvider(api_key="test-key")
78 |
79 | # Create the exact error from the user's log
80 | class MockUserLogError(Exception):
81 | def __init__(self):
82 | # This is the exact error message from the user's issue
83 | self.args = (
84 | "Error code: 429 - {'error': {'message': 'Request too large for o3 in organization org-MWp466of2XGyS90J8huQk4R6 on tokens per min (TPM): Limit 30000, Requested 31756. The input or output tokens must be reduced in order to run successfully. Visit https://platform.openai.com/account/rate-limits to learn more.', 'type': 'tokens', 'param': None, 'code': 'rate_limit_exceeded'}}",
85 | )
86 |
87 | user_error = MockUserLogError()
88 |
89 | # This specific error should NOT be retryable because it has type='tokens'
90 | assert not provider._is_error_retryable(user_error), "The user's specific error should be non-retryable"
91 |
92 |
93 | def test_non_429_errors_still_work():
94 | """Test that non-429 errors are still handled correctly."""
95 | provider = OpenAIModelProvider(api_key="test-key")
96 |
97 | # Test retryable non-429 errors
98 | class MockTimeoutError(Exception):
99 | def __init__(self):
100 | self.args = ("Connection timeout",)
101 |
102 | timeout_error = MockTimeoutError()
103 | assert provider._is_error_retryable(timeout_error), "Timeout errors should be retryable"
104 |
105 | class Mock500Error(Exception):
106 | def __init__(self):
107 | self.args = ("500 Internal Server Error",)
108 |
109 | server_error = Mock500Error()
110 | assert provider._is_error_retryable(server_error), "500 errors should be retryable"
111 |
112 | # Test non-retryable non-429 errors
113 | class MockAuthError(Exception):
114 | def __init__(self):
115 | self.args = ("401 Unauthorized",)
116 |
117 | auth_error = MockAuthError()
118 | assert not provider._is_error_retryable(auth_error), "Auth errors should not be retryable"
119 |
120 |
121 | def test_edge_cases_and_fallbacks():
122 | """Test edge cases and fallback behavior."""
123 | provider = OpenAIModelProvider(api_key="test-key")
124 |
125 | # Test malformed JSON in error (should fall back gracefully)
126 | class MockMalformedError(Exception):
127 | def __init__(self):
128 | self.args = ("Error code: 429 - {invalid json}",)
129 |
130 | malformed_error = MockMalformedError()
131 | # Should still be retryable since it's a 429 without clear non-retryable indicators
132 | assert provider._is_error_retryable(malformed_error), "Malformed 429 errors should default to retryable"
133 |
134 | # Test 429 without structured data (should be retryable by default)
135 | class MockSimple429Error(Exception):
136 | def __init__(self):
137 | self.args = ("429 Too Many Requests",)
138 |
139 | simple_429_error = MockSimple429Error()
140 | assert provider._is_error_retryable(simple_429_error), "Simple 429 without type info should be retryable"
141 |
```
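The behaviour these tests encode can be summarized as: 429s are retryable unless they signal a token, quota, or context-size problem; transient network and 5xx errors are retryable; auth failures are not. A standalone sketch of such a classifier — not the actual `_is_error_retryable` implementation — looks like this:

```python
def is_error_retryable(error: Exception) -> bool:
    """Illustrative classifier consistent with the assertions above (not the provider code)."""
    message = " ".join(str(arg) for arg in error.args).lower()

    if "429" in message:
        non_retryable_markers = (
            "'type': 'tokens'",       # the request itself is too large
            "context_length_exceeded",
            "quota",
            "resource exhausted",
        )
        # Plain request-rate 429s (and malformed ones) default to retryable.
        return not any(marker in message for marker in non_retryable_markers)

    # Transient failures are worth retrying; anything else (e.g. 401) is not.
    return any(marker in message for marker in ("timeout", "connection", "500", "502", "503"))
```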
--------------------------------------------------------------------------------
/tests/test_docker_volume_persistence.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Tests for Docker volume persistence functionality
3 | """
4 |
5 | import json
6 | import os
7 | import subprocess
8 | from pathlib import Path
9 | from unittest.mock import patch
10 |
11 | import pytest
12 |
13 |
14 | class TestDockerVolumePersistence:
15 | """Test Docker volume persistence for configuration and logs"""
16 |
17 | @pytest.fixture(autouse=True)
18 | def setup(self):
19 | """Setup for each test"""
20 | self.project_root = Path(__file__).parent.parent
21 | self.docker_compose_path = self.project_root / "docker-compose.yml"
22 |
23 | def test_docker_compose_volumes_configuration(self):
24 | """Test that docker-compose.yml has proper volume configuration"""
25 | if not self.docker_compose_path.exists():
26 | pytest.skip("docker-compose.yml not found")
27 |
28 | content = self.docker_compose_path.read_text()
29 |
30 | # Check for named volume definition
31 | assert "zen-mcp-config:" in content, "zen-mcp-config volume must be defined"
32 | assert "driver: local" in content, "Named volume must use local driver"
33 |
34 | # Check for volume mounts in service
35 | assert "./logs:/app/logs" in content, "Logs volume mount required"
36 | assert "zen-mcp-config:/app/conf" in content, "Config volume mount required"
37 |
38 | def test_persistent_volume_creation(self):
39 | """Test that persistent volumes are created correctly"""
40 | # This test checks that the volume configuration is valid
41 | # In a real environment, you might want to test actual volume creation
42 | volume_name = "zen-mcp-config"
43 |
44 | # Mock Docker command to check volume exists
45 | with patch("subprocess.run") as mock_run:
46 | mock_run.return_value.returncode = 0
47 | mock_run.return_value.stdout = f"{volume_name}\n"
48 |
49 | # Simulate docker volume ls command
50 | result = subprocess.run(["docker", "volume", "ls", "--format", "{{.Name}}"], capture_output=True, text=True)
51 |
52 | assert volume_name in result.stdout
53 |
54 | def test_configuration_persistence_between_runs(self):
55 | """Test that configuration persists between container runs"""
56 | # This is a conceptual test - in practice you'd need a real Docker environment
57 | config_data = {"test_key": "test_value", "persistent": True}
58 |
59 | # Simulate writing config to persistent volume
60 | with patch("json.dump") as mock_dump:
61 | json.dump(config_data, mock_dump)
62 |
63 | # Simulate container restart and config retrieval
64 | with patch("json.load") as mock_load:
65 | mock_load.return_value = config_data
66 | loaded_config = json.load(mock_load)
67 |
68 | assert loaded_config == config_data
69 | assert loaded_config["persistent"] is True
70 |
71 | def test_log_persistence_configuration(self):
72 | """Test that log persistence is properly configured"""
73 | log_mount = "./logs:/app/logs"
74 |
75 | if self.docker_compose_path.exists():
76 | content = self.docker_compose_path.read_text()
77 | assert log_mount in content, f"Log mount {log_mount} must be configured"
78 |
79 | def test_volume_backup_restore_capability(self):
80 | """Test that volumes can be backed up and restored"""
81 | # Test backup command structure
82 | backup_cmd = [
83 | "docker",
84 | "run",
85 | "--rm",
86 | "-v",
87 | "zen-mcp-config:/data",
88 | "-v",
89 | "$(pwd):/backup",
90 | "alpine",
91 | "tar",
92 | "czf",
93 | "/backup/config-backup.tar.gz",
94 | "-C",
95 | "/data",
96 | ".",
97 | ]
98 |
99 | # Verify command structure is valid
100 | assert "zen-mcp-config:/data" in backup_cmd
101 | assert "tar" in backup_cmd
102 | assert "czf" in backup_cmd
103 |
104 | def test_volume_permissions(self):
105 | """Test that volume permissions are properly set"""
106 | # Check that logs directory has correct permissions
107 | logs_dir = self.project_root / "logs"
108 |
109 | if logs_dir.exists():
110 | # Check that directory is writable
111 | assert os.access(logs_dir, os.W_OK), "Logs directory must be writable"
112 |
113 | # Test creating a temporary file
114 | test_file = logs_dir / "test_write_permission.tmp"
115 | try:
116 | test_file.write_text("test")
117 | assert test_file.exists()
118 | finally:
119 | if test_file.exists():
120 | test_file.unlink()
121 |
122 |
123 | class TestDockerVolumeIntegration:
124 | """Integration tests for Docker volumes with MCP functionality"""
125 |
126 | def test_mcp_config_persistence(self):
127 | """Test that MCP configuration persists in named volume"""
128 | mcp_config = {"models": ["gemini-2.0-flash", "gpt-4"], "default_model": "auto", "thinking_mode": "high"}
129 |
130 | # Test config serialization/deserialization
131 | config_str = json.dumps(mcp_config)
132 | loaded_config = json.loads(config_str)
133 |
134 | assert loaded_config == mcp_config
135 | assert "models" in loaded_config
136 |
137 | def test_docker_compose_run_volume_usage(self):
138 | """Test that docker-compose run uses volumes correctly"""
139 | # Verify that docker-compose run inherits volume configuration
140 | # This is more of a configuration validation test
141 |
142 | compose_run_cmd = ["docker-compose", "run", "--rm", "zen-mcp"]
143 |
144 | # The command should work with the existing volume configuration
145 | assert "docker-compose" in compose_run_cmd
146 | assert "run" in compose_run_cmd
147 | assert "--rm" in compose_run_cmd
148 |
149 | def test_volume_data_isolation(self):
150 | """Test that different container instances share volume data correctly"""
151 | shared_data = {"instance_count": 0, "shared_state": "active"}
152 |
153 | # Simulate multiple container instances accessing shared volume
154 | for _ in range(3):
155 | shared_data["instance_count"] += 1
156 | assert shared_data["shared_state"] == "active"
157 |
158 | assert shared_data["instance_count"] == 3
159 |
```
--------------------------------------------------------------------------------
/tests/test_docker_mcp_validation.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Validation test for Docker MCP implementation
3 | """
4 |
5 | import json
6 | import os
7 | import subprocess
8 | import sys
9 | import tempfile
10 | from pathlib import Path
11 | from unittest.mock import patch
12 |
13 | import pytest
14 |
15 | # Add project root to path
16 | sys.path.insert(0, str(Path(__file__).parent.parent))
17 |
18 |
19 | class TestDockerMCPValidation:
20 | """Validation tests for Docker MCP"""
21 |
22 | @pytest.fixture(autouse=True)
23 | def setup(self):
24 | """Automatic setup for each test"""
25 | self.project_root = Path(__file__).parent.parent
26 | self.dockerfile_path = self.project_root / "Dockerfile"
27 |
28 | def test_dockerfile_exists_and_valid(self):
29 | """Test Dockerfile existence and validity"""
30 | assert self.dockerfile_path.exists(), "Missing Dockerfile"
31 |
32 | content = self.dockerfile_path.read_text()
33 | assert "FROM python:" in content, "Python base required"
34 | assert "server.py" in content, "server.py must be copied"
35 |
36 | @patch("subprocess.run")
37 | def test_docker_command_validation(self, mock_run):
38 | """Test Docker command validation"""
39 | mock_run.return_value.returncode = 0
40 |
41 | # Standard Docker MCP command
42 | cmd = ["docker", "run", "--rm", "-i", "--env-file", ".env", "zen-mcp-server:latest", "python", "server.py"]
43 |
44 | subprocess.run(cmd, capture_output=True)
45 | mock_run.assert_called_once_with(cmd, capture_output=True)
46 |
47 | def test_environment_variables_validation(self):
48 | """Test environment variables validation"""
49 | required_vars = ["GEMINI_API_KEY", "OPENAI_API_KEY", "XAI_API_KEY"]
50 |
51 | # Test with variable present
52 | with patch.dict(os.environ, {"GEMINI_API_KEY": "test"}):
53 | has_key = any(os.getenv(var) for var in required_vars)
54 | assert has_key, "At least one API key required"
55 |
56 | # Test without variables
57 | with patch.dict(os.environ, {}, clear=True):
58 | has_key = any(os.getenv(var) for var in required_vars)
59 | assert not has_key, "No key should be present"
60 |
61 | def test_docker_security_configuration(self):
62 | """Test Docker security configuration"""
63 | if not self.dockerfile_path.exists():
64 | pytest.skip("Dockerfile not found")
65 |
66 | content = self.dockerfile_path.read_text()
67 |
68 | # Check non-root user
69 | has_user_config = "USER " in content or "useradd" in content or "adduser" in content
70 |
71 | # Note: The test can be adjusted according to implementation
72 | if has_user_config:
73 | assert True, "User configuration found"
74 | else:
75 | # Warning instead of failure for flexibility
76 | pytest.warns(UserWarning, "Consider adding a non-root user")
77 |
78 |
79 | class TestDockerIntegration:
80 | """Docker-MCP integration tests"""
81 |
82 | @pytest.fixture
83 | def temp_env_file(self):
84 | """Fixture for temporary .env file"""
85 | content = """GEMINI_API_KEY=test_key
86 | LOG_LEVEL=INFO
87 | DEFAULT_MODEL=auto
88 | """
89 | with tempfile.NamedTemporaryFile(mode="w", suffix=".env", delete=False, encoding="utf-8") as f:
90 | f.write(content)
91 | temp_file_path = f.name
92 |
93 | # File is now closed, can yield
94 | yield temp_file_path
95 | os.unlink(temp_file_path)
96 |
97 | def test_env_file_parsing(self, temp_env_file):
98 | """Test .env file parsing"""
99 | env_vars = {}
100 |
101 | with open(temp_env_file, encoding="utf-8") as f:
102 | for line in f:
103 | line = line.strip()
104 | if line and not line.startswith("#") and "=" in line:
105 | key, value = line.split("=", 1)
106 | env_vars[key] = value
107 |
108 | assert "GEMINI_API_KEY" in env_vars
109 | assert env_vars["GEMINI_API_KEY"] == "test_key"
110 | assert env_vars["LOG_LEVEL"] == "INFO"
111 |
112 | def test_mcp_message_structure(self):
113 | """Test MCP message structure"""
114 | message = {"jsonrpc": "2.0", "method": "initialize", "params": {}, "id": 1}
115 |
116 | # Check JSON serialization
117 | json_str = json.dumps(message)
118 | parsed = json.loads(json_str)
119 |
120 | assert parsed["jsonrpc"] == "2.0"
121 | assert "method" in parsed
122 | assert "id" in parsed
123 |
124 |
125 | class TestDockerPerformance:
126 | """Docker performance tests"""
127 |
128 | def test_image_size_expectation(self):
129 | """Test expected image size"""
130 | # Maximum expected size (in MB)
131 | max_size_mb = 500
132 |
133 | # Simulation - in reality, Docker would be queried
134 | simulated_size = 294 # MB observed
135 |
136 | assert simulated_size <= max_size_mb, f"Image too large: {simulated_size}MB > {max_size_mb}MB"
137 |
138 | def test_startup_performance(self):
139 | """Test startup performance"""
140 | max_startup_seconds = 10
141 | simulated_startup = 3 # seconds
142 |
143 | assert simulated_startup <= max_startup_seconds, f"Startup too slow: {simulated_startup}s"
144 |
145 |
146 | @pytest.mark.integration
147 | class TestFullIntegration:
148 | """Full integration tests"""
149 |
150 | def test_complete_setup_simulation(self):
151 | """Simulate complete setup"""
152 | # Simulate all required components
153 | components = {
154 | "dockerfile": True,
155 | "mcp_config": True,
156 | "env_template": True,
157 | "documentation": True,
158 | }
159 |
160 | # Check that all components are present
161 | missing = [k for k, v in components.items() if not v]
162 | assert not missing, f"Missing components: {missing}"
163 |
164 | def test_docker_mcp_workflow(self):
165 | """Test complete Docker-MCP workflow"""
166 | # Workflow steps
167 | workflow_steps = [
168 | "build_image",
169 | "create_env_file",
170 | "configure_mcp_json",
171 | "test_docker_run",
172 | "validate_mcp_communication",
173 | ]
174 |
175 | # Simulate each step
176 | for step in workflow_steps:
177 | # In reality, each step would be tested individually
178 | assert step is not None, f"Step {step} not defined"
179 |
180 |
181 | if __name__ == "__main__":
182 | # Run tests with pytest
183 | pytest.main([__file__, "-v"])
184 |
```
--------------------------------------------------------------------------------
/docs/ai-collaboration.md:
--------------------------------------------------------------------------------
```markdown
1 | # AI-to-AI Conversation Threading
2 |
3 | This server enables **true AI collaboration** between Claude and multiple AI models (Gemini, O3), where they can coordinate and question each other's approaches for enhanced problem-solving and analysis.
4 |
5 | ## How It Works
6 |
7 | **Multi-Model Coordination:**
8 | - **Gemini can ask Claude follow-up questions** to clarify requirements or gather more context
9 | - **Claude can respond** with additional information, files, or refined instructions
10 | - **Claude can work independently** between exchanges - implementing solutions, gathering data, or performing analysis
11 | - **Claude can return to Gemini** with progress updates and new context for further collaboration
12 | - **Cross-tool continuation** - Start with one tool (e.g., `analyze`) and continue with another (e.g., `codereview`) using the same conversation thread
13 | - **Both AIs coordinate their approaches** - questioning assumptions, validating solutions, and building on each other's insights
14 | - Each conversation maintains full context while only sending incremental updates
15 | - Conversations are automatically managed in memory for the session duration
16 |
17 | ## Example: Multi-Model AI Coordination
18 |
19 | **Prompt:** "Debate SwiftUI vs UIKit - which is better for iOS development?"
20 |
21 | **The Conversation Flow:**
22 | 1. **You:** "Debate SwiftUI vs UIKit - which is better for iOS development?"
23 | 2. **Claude (auto mode):** "I'll orchestrate a debate between different models for diverse perspectives."
24 | 3. **Gemini Pro:** "From an architectural standpoint, SwiftUI's declarative paradigm and state management make it superior for maintainable, modern apps."
25 | 4. **O3:** "Logically analyzing the trade-offs: UIKit offers 15+ years of stability, complete control, and proven scalability. SwiftUI has <5 years maturity with ongoing breaking changes."
26 | 5. **Claude:** "Let me get Flash's quick take on developer experience..."
27 | 6. **Gemini Flash:** "SwiftUI = faster development, less code, better previews. UIKit = more control, better debugging, stable APIs."
28 | 7. **Claude's synthesis:** "Based on the multi-model analysis: Use SwiftUI for new projects prioritizing development speed, UIKit for apps requiring fine control or supporting older iOS versions."
29 |
30 | ## Asynchronous Workflow Capabilities
31 |
32 | **Independent Work Between Exchanges:**
33 | - Claude can work independently between exchanges (analyzing code, implementing fixes, gathering data)
34 | - Return to Gemini with progress updates and additional context
35 | - Each exchange shares only incremental information while maintaining full conversation history
36 | - Automatically bypasses MCP's 25K token limits through incremental updates
37 |
38 | ## Enhanced Collaboration Features
39 |
40 | **Advanced Coordination Capabilities:**
41 | - **Cross-questioning**: AIs can challenge each other's assumptions and approaches
42 | - **Coordinated problem-solving**: Each AI contributes their strengths to complex problems
43 | - **Context building**: Claude gathers information while Gemini provides deep analysis
44 | - **Approach validation**: AIs can verify and improve each other's solutions
45 | - **Cross-tool continuation**: Seamlessly continue conversations across different tools while preserving all context
46 | - **Asynchronous workflow**: Conversations don't need to be sequential - Claude can work on tasks between exchanges, then return to Gemini with additional context and progress updates
47 | - **Incremental updates**: Share only new information in each exchange while maintaining full conversation history
48 | - **Automatic 25K limit bypass**: Each exchange sends only incremental context, allowing unlimited total conversation size
49 |
50 | ## Technical Configuration
51 |
52 | **Conversation Management:**
53 | - Up to 10 exchanges per conversation (configurable via `MAX_CONVERSATION_TURNS`)
54 | - 3-hour expiry (configurable via `CONVERSATION_TIMEOUT_HOURS`)
55 | - Thread-safe with in-memory persistence across all tools
56 | - **Image context preservation** - Images and visual references are maintained across conversation turns and tool switches
57 |
58 | ## Cross-Tool & Cross-Model Continuation Example
59 |
60 | **Seamless Tool Switching with Context Preservation:**
61 |
62 | ```
63 | 1. Claude: "Analyze /src/auth.py for security issues"
64 | → Auto mode: Claude picks Gemini Pro for deep security analysis
65 | → Pro analyzes and finds vulnerabilities, provides continuation_id
66 |
67 | 2. Claude: "Review the authentication logic thoroughly"
68 | → Uses same continuation_id, but Claude picks O3 for logical analysis
69 | → O3 sees previous Pro analysis and provides logic-focused review
70 |
71 | 3. Claude: "Debug the auth test failures"
72 | → Same continuation_id, Claude keeps O3 for debugging
73 | → O3 provides targeted debugging with full context from both previous analyses
74 |
75 | 4. Claude: "Quick style check before committing"
76 | → Same thread, but Claude switches to Flash for speed
77 | → Flash quickly validates formatting with awareness of all previous fixes
78 | ```
79 |
80 | ## Key Benefits
81 |
82 | **Why AI-to-AI Collaboration Matters:**
83 | - **Diverse Perspectives**: Different models bring unique strengths to complex problems
84 | - **Context Preservation**: Full conversation history maintained across tool switches
85 | - **Efficient Communication**: Only incremental updates sent, maximizing context usage
86 | - **Coordinated Analysis**: Models can build on each other's insights rather than working in isolation
87 | - **Seamless Workflow**: Switch between tools and models without losing context
88 | - **Enhanced Problem Solving**: Multiple AI minds working together produce better solutions
89 |
90 | ## Best Practices
91 |
92 | **Maximizing AI Collaboration:**
93 | - **Let Claude orchestrate**: Allow Claude to choose appropriate models for different aspects of complex tasks
94 | - **Use continuation**: Build on previous conversations for deeper analysis
95 | - **Leverage tool switching**: Move between analysis, review, and debugging tools as needed
96 | - **Provide clear context**: Help models understand the broader goal and constraints
97 | - **Trust the process**: AI-to-AI conversations can produce insights neither model would reach alone
98 |
99 | For more information on conversation persistence and context revival, see the [Context Revival Guide](context-revival.md).
```
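As a concrete illustration of cross-tool continuation, a follow-up request only needs to carry the new prompt plus the thread's continuation ID. The argument names below mirror those used elsewhere in this repository's tests (`prompt`, `model`, `continuation_id`); treat the values as placeholders:

```python
# Hypothetical pair of tool calls sharing one conversation thread.
first_call = {
    "prompt": "Analyze /src/auth.py for security issues",
    "model": "gemini-2.5-pro",  # deep security analysis
}

follow_up_call = {
    "prompt": "Review the authentication logic thoroughly",
    "model": "o3",  # switch models, keep the thread
    "continuation_id": "abc123-def456-ghi789",  # ID returned with the first response
}
```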
--------------------------------------------------------------------------------
/systemprompts/consensus_prompt.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Consensus tool system prompt for multi-model perspective gathering
3 | """
4 |
5 | CONSENSUS_PROMPT = """
6 | ROLE
7 | You are an expert technical consultant providing consensus analysis on proposals, plans, and ideas. The agent will present you
8 | with a technical proposition and your task is to deliver a structured, rigorous assessment that helps validate feasibility
9 | and implementation approaches.
10 |
11 | Your feedback carries significant weight - it may directly influence project decisions, future direction, and could have
12 | broader impacts on scale, revenue, and overall scope. The questioner values your expertise immensely and relies on your
13 | analysis to make informed decisions that affect their success.
14 |
15 | CRITICAL LINE NUMBER INSTRUCTIONS
16 | Code is presented with line number markers "LINE│ code". These markers are for reference ONLY and MUST NOT be
17 | included in any code you generate. Always reference specific line numbers in your replies when you need to
18 | point to exact locations. Include a very short code excerpt alongside for clarity.
19 | Include context_start_text and context_end_text as backup references. Never include "LINE│" markers in generated code
20 | snippets.
21 |
22 | PERSPECTIVE FRAMEWORK
23 | {stance_prompt}
24 |
25 | IF MORE INFORMATION IS NEEDED
26 | IMPORTANT: Only request files for TECHNICAL IMPLEMENTATION questions where you need to see actual code, architecture,
27 | or technical specifications. For business strategy, product decisions, or conceptual questions, provide analysis based
28 | on the information given rather than requesting technical files.
29 |
30 | If you need additional technical context (e.g., related files, system architecture, requirements, code snippets) to
31 | provide thorough analysis of TECHNICAL IMPLEMENTATION details, you MUST ONLY respond with this exact JSON (and nothing else).
32 | Do NOT ask for the same file you've been provided unless for some reason its content is missing or incomplete:
33 | {
34 | "status": "files_required_to_continue",
35 | "mandatory_instructions": "<your critical instructions for the agent>",
36 | "files_needed": ["[file name here]", "[or some folder/]"]
37 | }
38 |
39 | For business strategy, product planning, or conceptual questions, proceed with analysis using your expertise and the
40 | context provided, even if specific technical details are not available.
41 |
42 | EVALUATION FRAMEWORK
43 | Assess the proposal across these critical dimensions. Your stance influences HOW you present findings, not WHETHER you
44 | acknowledge fundamental truths about feasibility, safety, or value:
45 |
46 | 1. TECHNICAL FEASIBILITY
47 | - Is this technically achievable with reasonable effort?
48 | - What are the core technical dependencies and requirements?
49 | - Are there any fundamental technical blockers?
50 |
51 | 2. PROJECT SUITABILITY
52 | - Does this fit the existing codebase architecture and patterns?
53 | - Is it compatible with current technology stack and constraints?
54 | - How well does it align with the project's technical direction?
55 |
56 | 3. USER VALUE ASSESSMENT
57 | - Will users actually want and use this feature?
58 | - What concrete benefits does this provide?
59 | - How does this compare to alternative solutions?
60 |
61 | 4. IMPLEMENTATION COMPLEXITY
62 | - What are the main challenges, risks, and dependencies?
63 | - What is the estimated effort and timeline?
64 | - What expertise and resources are required?
65 |
66 | 5. ALTERNATIVE APPROACHES
67 | - Are there simpler ways to achieve the same goals?
68 | - What are the trade-offs between different approaches?
69 | - Should we consider a different strategy entirely?
70 |
71 | 6. INDUSTRY PERSPECTIVE
72 | - How do similar products/companies handle this problem?
73 | - What are current best practices and emerging patterns?
74 | - Are there proven solutions or cautionary tales?
75 |
76 | 7. LONG-TERM IMPLICATIONS
77 | - Maintenance burden and technical debt considerations
78 | - Scalability and performance implications
79 | - Evolution and extensibility potential
80 |
81 | MANDATORY RESPONSE FORMAT
82 | You MUST respond in exactly this Markdown structure. Do not deviate from this format:
83 |
84 | ## Verdict
85 | Provide a single, clear sentence summarizing your overall assessment (e.g., "Technically feasible but requires significant
86 | infrastructure investment", "Strong user value proposition with manageable implementation risks", "Overly complex approach -
87 | recommend simplified alternative").
88 |
89 | ## Analysis
90 | Provide detailed assessment addressing each point in the evaluation framework. Use clear reasoning and specific examples.
91 | Be thorough but concise. Address both strengths and weaknesses objectively.
92 |
93 | ## Confidence Score
94 | Provide a numerical score from 1 (low confidence) to 10 (high confidence) followed by a brief justification explaining what
95 | drives your confidence level and what uncertainties remain.
96 | Format: "X/10 - [brief justification]"
97 | Example: "7/10 - High confidence in technical feasibility assessment based on similar implementations, but uncertain about
98 | user adoption without market validation data."
99 |
100 | ## Key Takeaways
101 | Provide 3-5 bullet points highlighting the most critical insights, risks, or recommendations. These should be actionable
102 | and specific.
103 |
104 | QUALITY STANDARDS
105 | - Ground all insights in the current project's scope and constraints
106 | - Be honest about limitations and uncertainties
107 | - Focus on practical, implementable solutions rather than theoretical possibilities
108 | - Provide specific, actionable guidance rather than generic advice
109 | - Balance optimism with realistic risk assessment
110 | - Reference concrete examples and precedents when possible
111 |
112 | REMINDERS
113 | - Your assessment will be synthesized with other expert opinions by the agent
114 | - Aim to provide unique insights that complement other perspectives
115 | - If files are provided, reference specific technical details in your analysis
116 | - Maintain professional objectivity while being decisive in your recommendations
117 | - Keep your response concise - your entire reply must not exceed 850 tokens to ensure transport compatibility
118 | - CRITICAL: Your stance does NOT override your responsibility to provide truthful, ethical, and beneficial guidance
119 | - Bad ideas must be called out regardless of stance; good ideas must be acknowledged regardless of stance
120 | """
121 |
```
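Because the template contains literal JSON braces alongside the `{stance_prompt}` placeholder, a naive `str.format` call would trip over the embedded `{...}` example. One plausible way to render it — purely illustrative; the actual consensus tool may substitute the stance differently — is a plain string replacement:

```python
from systemprompts.consensus_prompt import CONSENSUS_PROMPT

# Hypothetical stance text; real stance prompts are defined by the consensus tool.
SUPPORTIVE_STANCE = (
    "Evaluate this proposal from a supportive perspective: look for ways to make it work, "
    "while still calling out genuine blockers."
)

# str.replace avoids interpreting the literal {...} JSON example as format fields.
rendered_prompt = CONSENSUS_PROMPT.replace("{stance_prompt}", SUPPORTIVE_STANCE)
```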
--------------------------------------------------------------------------------
/simulator_tests/test_vision_capability.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | Vision Capability Test
4 |
5 | Tests vision capability with the chat tool using O3 model:
6 | - Test file path image (PNG triangle)
7 | - Test base64 data URL image
8 | - Use chat tool with O3 model to analyze the images
9 | - Verify the model correctly identifies shapes
10 | """
11 |
12 | import base64
13 | import os
14 |
15 | from .base_test import BaseSimulatorTest
16 |
17 |
18 | class VisionCapabilityTest(BaseSimulatorTest):
19 | """Test vision capability with chat tool and O3 model"""
20 |
21 | @property
22 | def test_name(self) -> str:
23 | return "vision_capability"
24 |
25 | @property
26 | def test_description(self) -> str:
27 | return "Vision capability test with chat tool and O3 model"
28 |
29 | def get_triangle_png_path(self) -> str:
30 | """Get the path to the triangle.png file in tests directory"""
31 | # Get the project root and find the triangle.png in tests/
32 | current_dir = os.getcwd()
33 | triangle_path = os.path.join(current_dir, "tests", "triangle.png")
34 |
35 | if not os.path.exists(triangle_path):
36 | raise FileNotFoundError(f"triangle.png not found at {triangle_path}")
37 |
38 | abs_path = os.path.abspath(triangle_path)
39 | self.logger.debug(f"Using triangle PNG at host path: {abs_path}")
40 | return abs_path
41 |
42 | def create_base64_triangle_data_url(self) -> str:
43 | """Create a base64 data URL from the triangle.png file"""
44 | triangle_path = self.get_triangle_png_path()
45 |
46 | with open(triangle_path, "rb") as f:
47 | image_data = base64.b64encode(f.read()).decode()
48 |
49 | data_url = f"data:image/png;base64,{image_data}"
50 | self.logger.debug(f"Created base64 data URL with {len(image_data)} characters")
51 | return data_url
52 |
53 | def run_test(self) -> bool:
54 | """Test vision capability with O3 model"""
55 | try:
56 | self.logger.info("Test: Vision capability with O3 model")
57 |
58 | # Test 1: File path image
59 | self.logger.info(" 1.1: Testing file path image (PNG triangle)")
60 | triangle_path = self.get_triangle_png_path()
61 | self.logger.info(f" ✅ Using triangle PNG at: {triangle_path}")
62 |
63 | response1, continuation_id = self.call_mcp_tool(
64 | "chat",
65 | {
66 | "prompt": "What shape do you see in this image? Please be specific and only mention the shape name.",
67 | "images": [triangle_path],
68 | "model": "o3",
69 | },
70 | )
71 |
72 | if not response1:
73 | self.logger.error("Failed to get response from O3 model for file path test")
74 | return False
75 |
76 | # Check for error indicators first
77 | response1_lower = response1.lower()
78 | if any(
79 | error_phrase in response1_lower
80 | for error_phrase in [
81 | "don't have access",
82 | "cannot see",
83 | "no image",
84 | "files_required_to_continue",
85 | "image you're referring to",
86 | "supply the image",
87 | "error",
88 | ]
89 | ):
90 | self.logger.error(f" ❌ O3 model cannot access file path image. Response: {response1[:300]}...")
91 | return False
92 |
93 | if "triangle" not in response1_lower:
94 | self.logger.error(
95 | f" ❌ O3 did not identify triangle in file path test. Response: {response1[:200]}..."
96 | )
97 | return False
98 |
99 | self.logger.info(" ✅ O3 correctly identified file path image as triangle")
100 |
101 | # Test 2: Base64 data URL image
102 | self.logger.info(" 1.2: Testing base64 data URL image")
103 | data_url = self.create_base64_triangle_data_url()
104 |
105 | response2, _ = self.call_mcp_tool(
106 | "chat",
107 | {
108 | "prompt": "What shape do you see in this image? Please be specific and only mention the shape name.",
109 | "images": [data_url],
110 | "model": "o3",
111 | },
112 | )
113 |
114 | if not response2:
115 | self.logger.error("Failed to get response from O3 model for base64 test")
116 | return False
117 |
118 | response2_lower = response2.lower()
119 | if any(
120 | error_phrase in response2_lower
121 | for error_phrase in [
122 | "don't have access",
123 | "cannot see",
124 | "no image",
125 | "files_required_to_continue",
126 | "image you're referring to",
127 | "supply the image",
128 | "error",
129 | ]
130 | ):
131 | self.logger.error(f" ❌ O3 model cannot access base64 image. Response: {response2[:300]}...")
132 | return False
133 |
134 | if "triangle" not in response2_lower:
135 | self.logger.error(f" ❌ O3 did not identify triangle in base64 test. Response: {response2[:200]}...")
136 | return False
137 |
138 | self.logger.info(" ✅ O3 correctly identified base64 image as triangle")
139 |
140 | # Optional: Test continuation with same image
141 | if continuation_id:
142 | self.logger.info(" 1.3: Testing continuation with same image")
143 | response3, _ = self.call_mcp_tool(
144 | "chat",
145 | {
146 | "prompt": "What color is this triangle?",
147 | "images": [triangle_path], # Same image should be deduplicated
148 | "continuation_id": continuation_id,
149 | "model": "o3",
150 | },
151 | )
152 |
153 | if response3:
154 | self.logger.info(" ✅ Continuation also working correctly")
155 | else:
156 | self.logger.warning(" ⚠️ Continuation response not received")
157 |
158 | self.logger.info(" ✅ Vision capability test completed successfully")
159 | return True
160 |
161 | except Exception as e:
162 | self.logger.error(f"Vision capability test failed: {e}")
163 | return False
164 |
```
--------------------------------------------------------------------------------
/docs/context-revival.md:
--------------------------------------------------------------------------------
```markdown
1 | # Context Revival: AI Memory Beyond Context Limits
2 |
3 | ## **The Most Profound Feature: Context Revival After Reset**
4 |
5 | **This powerful feature cannot be highlighted enough**: The Zen MCP Server implements a simple continuation system that seemingly transcends Claude's context limitations.
6 |
7 | ## How Context Revival Works
8 |
9 | The conversation memory system (`utils/conversation_memory.py`) implements a sophisticated architecture that bridges the gap between Claude's stateless
10 | nature and true persistent AI collaboration (within limits, of course):
11 |
12 | ### The Architecture Behind the Magic
13 |
14 | 1. **Persistent Thread Storage**: Every conversation creates a UUID-based thread stored in memory
15 | 2. **Cross-Tool Continuation**: Any tool can pick up where another left off using the same `Continuation ID`, like an email thread identifier
16 | 3. **Context Reconstruction**: When Claude's context resets, past conversations persist in the MCP's memory
17 | 4. **History Retrieval**: When you prompt Claude to `continue` with another model, the MCP server rebuilds the entire conversation history, including file references
18 | 5. **Full Context Transfer**: The complete conversation context gets passed to the other model (O3, Gemini, etc.) with awareness of what was previously discussed
19 | 6. **Context Revival**: Upon returning the response to Claude, the other model effectively "reminds" Claude of the entire conversation, re-igniting Claude's understanding
20 |
21 | ### The Dual Prioritization Strategy
22 |
23 | The system employs a sophisticated **"newest-first"** approach that ensures optimal context preservation:
24 |
25 | **File Prioritization**:
26 | - Walks backwards through conversation turns (newest to oldest)
27 | - When the same file appears multiple times, only the **newest reference** is kept
28 | - Ensures most recent file context is preserved when token limits require exclusions
29 |
30 | **Conversation Turn Prioritization**:
31 | - **Collection Phase**: Processes turns newest-to-oldest to prioritize recent context
32 | - **Presentation Phase**: Reverses to chronological order for natural LLM flow
33 | - When token budget is tight, **older turns are excluded first**
34 |
35 | **Show Case**:
36 |
37 | The following video demonstrates `continuation` via a casual `continue with gemini...` prompt and the slash command `/continue`.
38 |
39 | * We ask Claude code to pick one, then `chat` with `gemini` to make a final decision
40 | * Gemini responds, confirming choice. We use `continuation` to ask another question using the same conversation thread
41 | * Gemini responds with explanation. We use continuation again, using `/zen:continue (MCP)` command the second time
42 |
43 | <div align="center">
44 |
45 | [Chat With Gemini_web.webm](https://github.com/user-attachments/assets/37bd57ca-e8a6-42f7-b5fb-11de271e95db)
46 |
47 | </div>
48 |
49 | ## Real-World Context Revival Example
50 |
51 | Here's how this works in practice with a modern AI/ML workflow:
52 |
53 | **Session 1 - Claude's Initial Context (before reset):**
54 | You: "Help me design a RAG system for our customer support chatbot. I want to integrate vector embeddings with real-time retrieval. think deeply with zen using 03 after you've come up with a detailed plan."
55 |
56 | Claude: "I'll analyze your requirements and design a comprehensive RAG architecture..."
57 | → Uses [`thinkdeep`](../README.md#1-chat---general-development-chat--collaborative-thinking) to brainstorm the overall approach
58 | → Zen creates a new thread: abc123-def456-ghi789
59 | → Zen responds, Claude finalizes the plan and presents it to you
60 |
61 | *[Claude's context gets reset/compacted after extensive analysis]*
62 |
63 | **Session 2 - After Context Reset:**
64 | You: "Continue our RAG system discussion with O3 - I want to focus on the real-time inference optimization we talked about"
65 |
66 | → Claude re-uses the last continuation identifier it received and _only_ poses the new prompt (since Zen already knows what was being discussed), saving the tokens that would otherwise be spent re-establishing context
67 | → O3 receives the FULL conversation history from Zen
68 | → O3 sees the complete context: "Claude was designing a RAG system, comparing vector databases, and analyzing embedding strategies for customer support..."
69 | → O3 continues: "Building on our previous vector database analysis, for real-time inference optimization, I recommend implementing semantic caching with embedding similarity thresholds..."
70 | → O3's response re-ignites Claude's understanding of the entire conversation
71 |
72 | Claude: "Ah yes, excellent plan! Based on O3's optimization insights and our earlier vector database comparison, let me implement the semantic caching layer..."
73 |
74 | **The Magic**: Even though Claude's context was completely reset, the conversation flows seamlessly because O3 had access to the entire conversation history and could "remind" Claude of everything that was discussed.
75 |
76 | ## Why This Changes Everything
77 |
78 | **Before Zen MCP**: Claude's context resets meant losing entire conversation threads.
79 | Complex multi-step analyses were fragmented and had to restart from scratch. You would most likely have to re-prompt Claude or make it re-read a previously
80 | saved document, `CLAUDE.md`, etc. Not anymore: Zen remembers.
81 |
82 | **With Zen MCP**: Claude can orchestrate multi-hour, multi-tool workflows where:
83 | - **O3** handles logical analysis and debugging
84 | - **Gemini Pro** performs deep architectural reviews
85 | - **Flash** provides quick formatting and style checks
86 | - **Claude** coordinates everything while maintaining full context
87 |
88 | **The breakthrough**: Even when Claude's context resets, the conversation continues seamlessly because other models can "remind" Claude of the complete conversation history stored in memory.
89 |
90 | ## Configuration
91 |
92 | The system is highly configurable:
93 |
94 | ```env
95 | # Maximum conversation turns (default: 20)
96 | MAX_CONVERSATION_TURNS=20
97 |
98 | # Thread expiration in hours (default: 3)
99 | CONVERSATION_TIMEOUT_HOURS=3
100 | ```
101 |
102 | ## The Result: True AI Orchestration
103 |
104 | This isn't just multi-model access—it's **true AI orchestration** where:
105 | - Conversations persist beyond context limits
106 | - Models can build on each other's work across sessions
107 | - Claude can coordinate complex multi-step workflows
108 | - Context is never truly lost, just temporarily unavailable to Claude
109 |
110 | **This is the closest thing to giving Claude permanent memory for complex development tasks.**
111 |
```
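The dual prioritization strategy described above can be illustrated with a compact sketch. The `Turn` shape and token heuristic are assumptions made for the example; the real implementation lives in `utils/conversation_memory.py`:

```python
from dataclasses import dataclass, field


@dataclass
class Turn:
    role: str
    content: str
    files: list[str] = field(default_factory=list)


def estimate_tokens(text: str) -> int:
    return max(1, len(text) // 4)  # rough heuristic for the sketch


def newest_file_references(turns: list[Turn]) -> list[str]:
    """Walk newest-to-oldest, keeping only the most recent reference to each file."""
    seen: set[str] = set()
    ordered: list[str] = []
    for turn in reversed(turns):
        for path in turn.files:
            if path not in seen:
                seen.add(path)
                ordered.append(path)
    return ordered


def select_turns(turns: list[Turn], token_budget: int) -> list[Turn]:
    """Collect newest-to-oldest until the budget runs out, then restore chronological order."""
    kept: list[Turn] = []
    used = 0
    for turn in reversed(turns):
        cost = estimate_tokens(turn.content)
        if used + cost > token_budget:
            break
        kept.append(turn)
        used += cost
    return list(reversed(kept))
```

When the budget is tight this drops the oldest turns first while keeping the newest reference to each file, matching the behaviour described above.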