This is page 20 of 25. Use http://codebase.md/beehiveinnovations/gemini-mcp-server?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── commands
│ │ └── fix-github-issue.md
│ └── settings.json
├── .coveragerc
├── .dockerignore
├── .env.example
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ ├── documentation.yml
│ │ ├── feature_request.yml
│ │ └── tool_addition.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── docker-pr.yml
│ ├── docker-release.yml
│ ├── semantic-pr.yml
│ ├── semantic-release.yml
│ └── test.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CHANGELOG.md
├── claude_config_example.json
├── CLAUDE.md
├── clink
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── claude.py
│ │ ├── codex.py
│ │ └── gemini.py
│ ├── constants.py
│ ├── models.py
│ ├── parsers
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── claude.py
│ │ ├── codex.py
│ │ └── gemini.py
│ └── registry.py
├── code_quality_checks.ps1
├── code_quality_checks.sh
├── communication_simulator_test.py
├── conf
│ ├── __init__.py
│ ├── azure_models.json
│ ├── cli_clients
│ │ ├── claude.json
│ │ ├── codex.json
│ │ └── gemini.json
│ ├── custom_models.json
│ ├── dial_models.json
│ ├── gemini_models.json
│ ├── openai_models.json
│ ├── openrouter_models.json
│ └── xai_models.json
├── config.py
├── docker
│ ├── README.md
│ └── scripts
│ ├── build.ps1
│ ├── build.sh
│ ├── deploy.ps1
│ ├── deploy.sh
│ └── healthcheck.py
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── adding_providers.md
│ ├── adding_tools.md
│ ├── advanced-usage.md
│ ├── ai_banter.md
│ ├── ai-collaboration.md
│ ├── azure_openai.md
│ ├── configuration.md
│ ├── context-revival.md
│ ├── contributions.md
│ ├── custom_models.md
│ ├── docker-deployment.md
│ ├── gemini-setup.md
│ ├── getting-started.md
│ ├── index.md
│ ├── locale-configuration.md
│ ├── logging.md
│ ├── model_ranking.md
│ ├── testing.md
│ ├── tools
│ │ ├── analyze.md
│ │ ├── apilookup.md
│ │ ├── challenge.md
│ │ ├── chat.md
│ │ ├── clink.md
│ │ ├── codereview.md
│ │ ├── consensus.md
│ │ ├── debug.md
│ │ ├── docgen.md
│ │ ├── listmodels.md
│ │ ├── planner.md
│ │ ├── precommit.md
│ │ ├── refactor.md
│ │ ├── secaudit.md
│ │ ├── testgen.md
│ │ ├── thinkdeep.md
│ │ ├── tracer.md
│ │ └── version.md
│ ├── troubleshooting.md
│ ├── vcr-testing.md
│ └── wsl-setup.md
├── examples
│ ├── claude_config_macos.json
│ └── claude_config_wsl.json
├── LICENSE
├── providers
│ ├── __init__.py
│ ├── azure_openai.py
│ ├── base.py
│ ├── custom.py
│ ├── dial.py
│ ├── gemini.py
│ ├── openai_compatible.py
│ ├── openai.py
│ ├── openrouter.py
│ ├── registries
│ │ ├── __init__.py
│ │ ├── azure.py
│ │ ├── base.py
│ │ ├── custom.py
│ │ ├── dial.py
│ │ ├── gemini.py
│ │ ├── openai.py
│ │ ├── openrouter.py
│ │ └── xai.py
│ ├── registry_provider_mixin.py
│ ├── registry.py
│ ├── shared
│ │ ├── __init__.py
│ │ ├── model_capabilities.py
│ │ ├── model_response.py
│ │ ├── provider_type.py
│ │ └── temperature.py
│ └── xai.py
├── pyproject.toml
├── pytest.ini
├── README.md
├── requirements-dev.txt
├── requirements.txt
├── run_integration_tests.ps1
├── run_integration_tests.sh
├── run-server.ps1
├── run-server.sh
├── scripts
│ └── sync_version.py
├── server.py
├── simulator_tests
│ ├── __init__.py
│ ├── base_test.py
│ ├── conversation_base_test.py
│ ├── log_utils.py
│ ├── test_analyze_validation.py
│ ├── test_basic_conversation.py
│ ├── test_chat_simple_validation.py
│ ├── test_codereview_validation.py
│ ├── test_consensus_conversation.py
│ ├── test_consensus_three_models.py
│ ├── test_consensus_workflow_accurate.py
│ ├── test_content_validation.py
│ ├── test_conversation_chain_validation.py
│ ├── test_cross_tool_comprehensive.py
│ ├── test_cross_tool_continuation.py
│ ├── test_debug_certain_confidence.py
│ ├── test_debug_validation.py
│ ├── test_line_number_validation.py
│ ├── test_logs_validation.py
│ ├── test_model_thinking_config.py
│ ├── test_o3_model_selection.py
│ ├── test_o3_pro_expensive.py
│ ├── test_ollama_custom_url.py
│ ├── test_openrouter_fallback.py
│ ├── test_openrouter_models.py
│ ├── test_per_tool_deduplication.py
│ ├── test_planner_continuation_history.py
│ ├── test_planner_validation_old.py
│ ├── test_planner_validation.py
│ ├── test_precommitworkflow_validation.py
│ ├── test_prompt_size_limit_bug.py
│ ├── test_refactor_validation.py
│ ├── test_secaudit_validation.py
│ ├── test_testgen_validation.py
│ ├── test_thinkdeep_validation.py
│ ├── test_token_allocation_validation.py
│ ├── test_vision_capability.py
│ └── test_xai_models.py
├── systemprompts
│ ├── __init__.py
│ ├── analyze_prompt.py
│ ├── chat_prompt.py
│ ├── clink
│ │ ├── codex_codereviewer.txt
│ │ ├── default_codereviewer.txt
│ │ ├── default_planner.txt
│ │ └── default.txt
│ ├── codereview_prompt.py
│ ├── consensus_prompt.py
│ ├── debug_prompt.py
│ ├── docgen_prompt.py
│ ├── generate_code_prompt.py
│ ├── planner_prompt.py
│ ├── precommit_prompt.py
│ ├── refactor_prompt.py
│ ├── secaudit_prompt.py
│ ├── testgen_prompt.py
│ ├── thinkdeep_prompt.py
│ └── tracer_prompt.py
├── tests
│ ├── __init__.py
│ ├── CASSETTE_MAINTENANCE.md
│ ├── conftest.py
│ ├── gemini_cassettes
│ │ ├── chat_codegen
│ │ │ └── gemini25_pro_calculator
│ │ │ └── mldev.json
│ │ ├── chat_cross
│ │ │ └── step1_gemini25_flash_number
│ │ │ └── mldev.json
│ │ └── consensus
│ │ └── step2_gemini25_flash_against
│ │ └── mldev.json
│ ├── http_transport_recorder.py
│ ├── mock_helpers.py
│ ├── openai_cassettes
│ │ ├── chat_cross_step2_gpt5_reminder.json
│ │ ├── chat_gpt5_continuation.json
│ │ ├── chat_gpt5_moon_distance.json
│ │ ├── consensus_step1_gpt5_for.json
│ │ └── o3_pro_basic_math.json
│ ├── pii_sanitizer.py
│ ├── sanitize_cassettes.py
│ ├── test_alias_target_restrictions.py
│ ├── test_auto_mode_comprehensive.py
│ ├── test_auto_mode_custom_provider_only.py
│ ├── test_auto_mode_model_listing.py
│ ├── test_auto_mode_provider_selection.py
│ ├── test_auto_mode.py
│ ├── test_auto_model_planner_fix.py
│ ├── test_azure_openai_provider.py
│ ├── test_buggy_behavior_prevention.py
│ ├── test_cassette_semantic_matching.py
│ ├── test_challenge.py
│ ├── test_chat_codegen_integration.py
│ ├── test_chat_cross_model_continuation.py
│ ├── test_chat_openai_integration.py
│ ├── test_chat_simple.py
│ ├── test_clink_claude_agent.py
│ ├── test_clink_claude_parser.py
│ ├── test_clink_codex_agent.py
│ ├── test_clink_gemini_agent.py
│ ├── test_clink_gemini_parser.py
│ ├── test_clink_integration.py
│ ├── test_clink_parsers.py
│ ├── test_clink_tool.py
│ ├── test_collaboration.py
│ ├── test_config.py
│ ├── test_consensus_integration.py
│ ├── test_consensus_schema.py
│ ├── test_consensus.py
│ ├── test_conversation_continuation_integration.py
│ ├── test_conversation_field_mapping.py
│ ├── test_conversation_file_features.py
│ ├── test_conversation_memory.py
│ ├── test_conversation_missing_files.py
│ ├── test_custom_openai_temperature_fix.py
│ ├── test_custom_provider.py
│ ├── test_debug.py
│ ├── test_deploy_scripts.py
│ ├── test_dial_provider.py
│ ├── test_directory_expansion_tracking.py
│ ├── test_disabled_tools.py
│ ├── test_docker_claude_desktop_integration.py
│ ├── test_docker_config_complete.py
│ ├── test_docker_healthcheck.py
│ ├── test_docker_implementation.py
│ ├── test_docker_mcp_validation.py
│ ├── test_docker_security.py
│ ├── test_docker_volume_persistence.py
│ ├── test_file_protection.py
│ ├── test_gemini_token_usage.py
│ ├── test_image_support_integration.py
│ ├── test_image_validation.py
│ ├── test_integration_utf8.py
│ ├── test_intelligent_fallback.py
│ ├── test_issue_245_simple.py
│ ├── test_large_prompt_handling.py
│ ├── test_line_numbers_integration.py
│ ├── test_listmodels_restrictions.py
│ ├── test_listmodels.py
│ ├── test_mcp_error_handling.py
│ ├── test_model_enumeration.py
│ ├── test_model_metadata_continuation.py
│ ├── test_model_resolution_bug.py
│ ├── test_model_restrictions.py
│ ├── test_o3_pro_output_text_fix.py
│ ├── test_o3_temperature_fix_simple.py
│ ├── test_openai_compatible_token_usage.py
│ ├── test_openai_provider.py
│ ├── test_openrouter_provider.py
│ ├── test_openrouter_registry.py
│ ├── test_parse_model_option.py
│ ├── test_per_tool_model_defaults.py
│ ├── test_pii_sanitizer.py
│ ├── test_pip_detection_fix.py
│ ├── test_planner.py
│ ├── test_precommit_workflow.py
│ ├── test_prompt_regression.py
│ ├── test_prompt_size_limit_bug_fix.py
│ ├── test_provider_retry_logic.py
│ ├── test_provider_routing_bugs.py
│ ├── test_provider_utf8.py
│ ├── test_providers.py
│ ├── test_rate_limit_patterns.py
│ ├── test_refactor.py
│ ├── test_secaudit.py
│ ├── test_server.py
│ ├── test_supported_models_aliases.py
│ ├── test_thinking_modes.py
│ ├── test_tools.py
│ ├── test_tracer.py
│ ├── test_utf8_localization.py
│ ├── test_utils.py
│ ├── test_uvx_resource_packaging.py
│ ├── test_uvx_support.py
│ ├── test_workflow_file_embedding.py
│ ├── test_workflow_metadata.py
│ ├── test_workflow_prompt_size_validation_simple.py
│ ├── test_workflow_utf8.py
│ ├── test_xai_provider.py
│ ├── transport_helpers.py
│ └── triangle.png
├── tools
│ ├── __init__.py
│ ├── analyze.py
│ ├── apilookup.py
│ ├── challenge.py
│ ├── chat.py
│ ├── clink.py
│ ├── codereview.py
│ ├── consensus.py
│ ├── debug.py
│ ├── docgen.py
│ ├── listmodels.py
│ ├── models.py
│ ├── planner.py
│ ├── precommit.py
│ ├── refactor.py
│ ├── secaudit.py
│ ├── shared
│ │ ├── __init__.py
│ │ ├── base_models.py
│ │ ├── base_tool.py
│ │ ├── exceptions.py
│ │ └── schema_builders.py
│ ├── simple
│ │ ├── __init__.py
│ │ └── base.py
│ ├── testgen.py
│ ├── thinkdeep.py
│ ├── tracer.py
│ ├── version.py
│ └── workflow
│ ├── __init__.py
│ ├── base.py
│ ├── schema_builders.py
│ └── workflow_mixin.py
├── utils
│ ├── __init__.py
│ ├── client_info.py
│ ├── conversation_memory.py
│ ├── env.py
│ ├── file_types.py
│ ├── file_utils.py
│ ├── image_utils.py
│ ├── model_context.py
│ ├── model_restrictions.py
│ ├── security_config.py
│ ├── storage_backend.py
│ └── token_utils.py
└── zen-mcp-server
```
# Files
--------------------------------------------------------------------------------
/simulator_tests/test_thinkdeep_validation.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | ThinkDeep Tool Validation Test
4 |
5 | Tests the thinkdeep tool's capabilities using the new workflow architecture.
6 | This validates that the workflow-based deep thinking implementation provides
7 | step-by-step thinking with expert analysis integration.
8 | """
9 |
10 | import json
11 | from typing import Optional
12 |
13 | from .conversation_base_test import ConversationBaseTest
14 |
15 |
16 | class ThinkDeepWorkflowValidationTest(ConversationBaseTest):
17 | """Test thinkdeep tool with new workflow architecture"""
18 |
19 | @property
20 | def test_name(self) -> str:
21 | return "thinkdeep_validation"
22 |
23 | @property
24 | def test_description(self) -> str:
25 | return "ThinkDeep workflow tool validation with new workflow architecture"
26 |
27 | def run_test(self) -> bool:
28 | """Test thinkdeep tool capabilities"""
29 | # Set up the test environment
30 | self.setUp()
31 |
32 | try:
33 | self.logger.info("Test: ThinkDeepWorkflow tool validation (new architecture)")
34 |
35 | # Create test files for thinking context
36 | self._create_thinking_context()
37 |
38 | # Test 1: Single thinking session with multiple steps
39 | if not self._test_single_thinking_session():
40 | return False
41 |
42 | # Test 2: Thinking flow that requires refocusing
43 | if not self._test_thinking_refocus_flow():
44 | return False
45 |
46 | # Test 3: Complete thinking with expert analysis
47 | if not self._test_complete_thinking_with_analysis():
48 | return False
49 |
50 | # Test 4: Certain confidence behavior
51 | if not self._test_certain_confidence():
52 | return False
53 |
54 | # Test 5: Context-aware file embedding
55 | if not self._test_context_aware_file_embedding():
56 | return False
57 |
58 | # Test 6: Multi-step file context optimization
59 | if not self._test_multi_step_file_context():
60 | return False
61 |
62 | self.logger.info(" ✅ All thinkdeep validation tests passed")
63 | return True
64 |
65 | except Exception as e:
66 | self.logger.error(f"ThinkDeep validation test failed: {e}")
67 | return False
68 |
69 | def _create_thinking_context(self):
70 | """Create test files for deep thinking context"""
71 | # Create architecture document
72 | architecture_doc = """# Microservices Architecture Design
73 |
74 | ## Current System
75 | - Monolithic application with 500k LOC
76 | - Single PostgreSQL database
77 | - Peak load: 10k requests/minute
78 | - Team size: 25 developers
79 | - Deployment: Manual, 2-week cycles
80 |
81 | ## Proposed Migration to Microservices
82 |
83 | ### Benefits
84 | - Independent deployments
85 | - Technology diversity
86 | - Team autonomy
87 | - Scalability improvements
88 |
89 | ### Challenges
90 | - Data consistency
91 | - Network latency
92 | - Operational complexity
93 | - Transaction management
94 |
95 | ### Key Considerations
96 | - Service boundaries
97 | - Data migration strategy
98 | - Communication patterns
99 | - Monitoring and observability
100 | """
101 |
102 | # Create requirements document
103 | requirements_doc = """# Migration Requirements
104 |
105 | ## Business Goals
106 | - Reduce deployment cycle from 2 weeks to daily
107 | - Support 50k requests/minute by Q4
108 | - Enable A/B testing capabilities
109 | - Improve system resilience
110 |
111 | ## Technical Constraints
112 | - Zero downtime migration
113 | - Maintain data consistency
114 | - Budget: $200k for infrastructure
115 | - Timeline: 6 months
116 | - Existing team skills: Java, Spring Boot
117 |
118 | ## Success Metrics
119 | - Deployment frequency: 10x improvement
120 | - System availability: 99.9%
121 | - Response time: <200ms p95
122 | - Developer productivity: 30% improvement
123 | """
124 |
125 | # Create performance analysis
126 | performance_analysis = """# Current Performance Analysis
127 |
128 | ## Database Bottlenecks
129 | - Connection pool exhaustion during peak hours
130 | - Complex joins affecting query performance
131 | - Lock contention on user_sessions table
132 | - Read replica lag causing data inconsistency
133 |
134 | ## Application Issues
135 | - Memory leaks in background processing
136 | - Thread pool starvation
137 | - Cache invalidation storms
138 | - Session clustering problems
139 |
140 | ## Infrastructure Limits
141 | - Single server deployment
142 | - Manual scaling processes
143 | - Limited monitoring capabilities
144 | - No circuit breaker patterns
145 | """
146 |
147 | # Create test files
148 | self.architecture_file = self.create_additional_test_file("architecture_design.md", architecture_doc)
149 | self.requirements_file = self.create_additional_test_file("migration_requirements.md", requirements_doc)
150 | self.performance_file = self.create_additional_test_file("performance_analysis.md", performance_analysis)
151 |
152 | self.logger.info(" ✅ Created thinking context files:")
153 | self.logger.info(f" - {self.architecture_file}")
154 | self.logger.info(f" - {self.requirements_file}")
155 | self.logger.info(f" - {self.performance_file}")
156 |
157 | def _test_single_thinking_session(self) -> bool:
158 | """Test a complete thinking session with multiple steps"""
159 | try:
160 | self.logger.info(" 1.1: Testing single thinking session")
161 |
162 | # Step 1: Start thinking analysis
163 | self.logger.info(" 1.1.1: Step 1 - Initial thinking analysis")
164 | response1, continuation_id = self.call_mcp_tool(
165 | "thinkdeep",
166 | {
167 | "step": "I need to think deeply about the microservices migration strategy. Let me analyze the trade-offs, risks, and implementation approach systematically.",
168 | "step_number": 1,
169 | "total_steps": 4,
170 | "next_step_required": True,
171 | "findings": "Initial analysis shows significant architectural complexity but potential for major scalability and development velocity improvements. Need to carefully consider migration strategy and service boundaries.",
172 | "files_checked": [self.architecture_file, self.requirements_file],
173 | "relevant_files": [self.architecture_file, self.requirements_file],
174 | "relevant_context": ["microservices_migration", "service_boundaries", "data_consistency"],
175 | "confidence": "low",
176 | "problem_context": "Enterprise application migration from monolith to microservices",
177 | "focus_areas": ["architecture", "scalability", "risk_assessment"],
178 | },
179 | )
180 |
181 | if not response1 or not continuation_id:
182 | self.logger.error("Failed to get initial thinking response")
183 | return False
184 |
185 | # Parse and validate JSON response
186 | response1_data = self._parse_thinkdeep_response(response1)
187 | if not response1_data:
188 | return False
189 |
190 | # Validate step 1 response structure - expect pause_for_thinkdeep for next_step_required=True
191 | if not self._validate_step_response(response1_data, 1, 4, True, "pause_for_thinkdeep"):
192 | return False
193 |
194 | self.logger.info(f" ✅ Step 1 successful, continuation_id: {continuation_id}")
195 |
196 | # Step 2: Deep analysis
197 | self.logger.info(" 1.1.2: Step 2 - Deep analysis of alternatives")
198 | response2, _ = self.call_mcp_tool(
199 | "thinkdeep",
200 | {
201 | "step": "Analyzing different migration approaches: strangler fig pattern vs big bang vs gradual extraction. Each has different risk profiles and timelines.",
202 | "step_number": 2,
203 | "total_steps": 4,
204 | "next_step_required": True,
205 | "findings": "Strangler fig pattern emerges as best approach: lower risk, incremental value delivery, team learning curve management. Key insight: start with read-only services to minimize data consistency issues.",
206 | "files_checked": [self.architecture_file, self.requirements_file, self.performance_file],
207 | "relevant_files": [self.architecture_file, self.performance_file],
208 | "relevant_context": ["strangler_fig_pattern", "service_extraction", "risk_mitigation"],
209 | "issues_found": [
210 | {"severity": "high", "description": "Data consistency challenges during migration"},
211 | {"severity": "medium", "description": "Team skill gap in distributed systems"},
212 | ],
213 | "confidence": "medium",
214 | "continuation_id": continuation_id,
215 | },
216 | )
217 |
218 | if not response2:
219 | self.logger.error("Failed to continue thinking to step 2")
220 | return False
221 |
222 | response2_data = self._parse_thinkdeep_response(response2)
223 | if not self._validate_step_response(response2_data, 2, 4, True, "pause_for_thinkdeep"):
224 | return False
225 |
226 | # Check thinking status tracking
227 | thinking_status = response2_data.get("thinking_status", {})
228 | if thinking_status.get("files_checked", 0) < 3:
229 | self.logger.error("Files checked count not properly tracked")
230 | return False
231 |
232 | if thinking_status.get("thinking_confidence") != "medium":
233 | self.logger.error("Confidence level not properly tracked")
234 | return False
235 |
236 | self.logger.info(" ✅ Step 2 successful with proper tracking")
237 |
238 | # Store continuation_id for next test
239 | self.thinking_continuation_id = continuation_id
240 | return True
241 |
242 | except Exception as e:
243 | self.logger.error(f"Single thinking session test failed: {e}")
244 | return False
245 |
246 | def _test_thinking_refocus_flow(self) -> bool:
247 | """Test thinking workflow that shifts direction mid-analysis"""
248 | try:
249 | self.logger.info(" 1.2: Testing thinking refocus workflow")
250 |
251 | # Start a new thinking session for testing refocus behavior
252 | self.logger.info(" 1.2.1: Start thinking session for refocus test")
253 | response1, continuation_id = self.call_mcp_tool(
254 | "thinkdeep",
255 | {
256 | "step": "Thinking about optimal database architecture for the new microservices",
257 | "step_number": 1,
258 | "total_steps": 4,
259 | "next_step_required": True,
260 | "findings": "Initial thought: each service should have its own database for independence",
261 | "files_checked": [self.architecture_file],
262 | "relevant_files": [self.architecture_file],
263 | "relevant_context": ["database_per_service", "data_independence"],
264 | "confidence": "low",
265 | },
266 | )
267 |
268 | if not response1 or not continuation_id:
269 | self.logger.error("Failed to start refocus test thinking")
270 | return False
271 |
272 | # Step 2: Initial direction
273 | self.logger.info(" 1.2.2: Step 2 - Initial analysis direction")
274 | response2, _ = self.call_mcp_tool(
275 | "thinkdeep",
276 | {
277 | "step": "Exploring database-per-service pattern implementation",
278 | "step_number": 2,
279 | "total_steps": 4,
280 | "next_step_required": True,
281 | "findings": "Database-per-service creates significant complexity for transactions and reporting",
282 | "files_checked": [self.architecture_file, self.performance_file],
283 | "relevant_files": [self.performance_file],
284 | "relevant_context": ["database_per_service", "transaction_management"],
285 | "issues_found": [
286 | {"severity": "high", "description": "Cross-service transactions become complex"},
287 | {"severity": "medium", "description": "Reporting queries span multiple databases"},
288 | ],
289 | "confidence": "low",
290 | "continuation_id": continuation_id,
291 | },
292 | )
293 |
294 | if not response2:
295 | self.logger.error("Failed to continue to step 2")
296 | return False
297 |
298 | # Step 3: Backtrack and revise approach
299 | self.logger.info(" 1.2.3: Step 3 - Backtrack and revise thinking")
300 | response3, _ = self.call_mcp_tool(
301 | "thinkdeep",
302 | {
303 | "step": "Refocusing - maybe shared database with service-specific schemas is better initially. Then gradually extract databases as services mature.",
304 | "step_number": 3,
305 | "total_steps": 4,
306 | "next_step_required": True,
307 | "findings": "Hybrid approach: shared database with bounded contexts, then gradual extraction. This reduces initial complexity while preserving migration path to full service independence.",
308 | "files_checked": [self.architecture_file, self.requirements_file],
309 | "relevant_files": [self.architecture_file, self.requirements_file],
310 | "relevant_context": ["shared_database", "bounded_contexts", "gradual_extraction"],
311 | "confidence": "medium",
312 | "continuation_id": continuation_id,
313 | },
314 | )
315 |
316 | if not response3:
317 | self.logger.error("Failed to refocus")
318 | return False
319 |
320 | response3_data = self._parse_thinkdeep_response(response3)
321 | if not self._validate_step_response(response3_data, 3, 4, True, "pause_for_thinkdeep"):
322 | return False
323 |
324 | self.logger.info(" ✅ Refocus working correctly")
325 | return True
326 |
327 | except Exception as e:
328 | self.logger.error(f"Refocus test failed: {e}")
329 | return False
330 |
331 | def _test_complete_thinking_with_analysis(self) -> bool:
332 | """Test complete thinking ending with expert analysis"""
333 | try:
334 | self.logger.info(" 1.3: Testing complete thinking with expert analysis")
335 |
336 | # Use the continuation from first test
337 | continuation_id = getattr(self, "thinking_continuation_id", None)
338 | if not continuation_id:
339 | # Start fresh if no continuation available
340 | self.logger.info(" 1.3.0: Starting fresh thinking session")
341 | response0, continuation_id = self.call_mcp_tool(
342 | "thinkdeep",
343 | {
344 | "step": "Thinking about the complete microservices migration strategy",
345 | "step_number": 1,
346 | "total_steps": 2,
347 | "next_step_required": True,
348 | "findings": "Comprehensive analysis of migration approaches and risks",
349 | "files_checked": [self.architecture_file, self.requirements_file],
350 | "relevant_files": [self.architecture_file, self.requirements_file],
351 | "relevant_context": ["migration_strategy", "risk_assessment"],
352 | },
353 | )
354 | if not response0 or not continuation_id:
355 | self.logger.error("Failed to start fresh thinking session")
356 | return False
357 |
358 | # Final step - trigger expert analysis
359 | self.logger.info(" 1.3.1: Final step - complete thinking analysis")
360 | response_final, _ = self.call_mcp_tool(
361 | "thinkdeep",
362 | {
363 | "step": "Thinking analysis complete. I've thoroughly considered the migration strategy, risks, and implementation approach.",
364 | "step_number": 2,
365 | "total_steps": 2,
366 | "next_step_required": False, # Final step - triggers expert analysis
367 | "findings": "Comprehensive migration strategy: strangler fig pattern with shared database initially, gradual service extraction based on business value and technical feasibility. Key success factors: team training, monitoring infrastructure, and incremental rollout.",
368 | "files_checked": [self.architecture_file, self.requirements_file, self.performance_file],
369 | "relevant_files": [self.architecture_file, self.requirements_file, self.performance_file],
370 | "relevant_context": ["strangler_fig", "migration_strategy", "risk_mitigation", "team_readiness"],
371 | "issues_found": [
372 | {"severity": "medium", "description": "Team needs distributed systems training"},
373 | {"severity": "low", "description": "Monitoring tools need upgrade"},
374 | ],
375 | "confidence": "high",
376 | "continuation_id": continuation_id,
377 | "model": "flash", # Use flash for expert analysis
378 | },
379 | )
380 |
381 | if not response_final:
382 | self.logger.error("Failed to complete thinking")
383 | return False
384 |
385 | response_final_data = self._parse_thinkdeep_response(response_final)
386 | if not response_final_data:
387 | return False
388 |
389 | # Validate final response structure - accept both expert analysis and special statuses
390 | valid_final_statuses = ["calling_expert_analysis", "files_required_to_continue"]
391 | if response_final_data.get("status") not in valid_final_statuses:
392 | self.logger.error(
393 | f"Expected status in {valid_final_statuses}, got '{response_final_data.get('status')}'"
394 | )
395 | return False
396 |
397 | if not response_final_data.get("thinking_complete"):
398 | self.logger.error("Expected thinking_complete=true for final step")
399 | return False
400 |
401 | # Check for expert analysis or special status content
402 | if response_final_data.get("status") == "calling_expert_analysis":
403 | if "expert_analysis" not in response_final_data:
404 | self.logger.error("Missing expert_analysis in final response")
405 | return False
406 | expert_analysis = response_final_data.get("expert_analysis", {})
407 | else:
408 | # For special statuses like files_required_to_continue, analysis may be in content
409 | expert_analysis = response_final_data.get("content", "{}")
410 | if isinstance(expert_analysis, str):
411 | try:
412 | expert_analysis = json.loads(expert_analysis)
413 | except (json.JSONDecodeError, TypeError):
414 | expert_analysis = {"analysis": expert_analysis}
415 |
416 | # Check for expected analysis content (checking common patterns)
417 | analysis_text = json.dumps(expert_analysis, ensure_ascii=False).lower()
418 |
419 | # Look for thinking analysis validation
420 | thinking_indicators = ["migration", "strategy", "microservices", "risk", "approach", "implementation"]
421 | found_indicators = sum(1 for indicator in thinking_indicators if indicator in analysis_text)
422 |
423 | if found_indicators >= 3:
424 | self.logger.info(" ✅ Expert analysis validated the thinking correctly")
425 | else:
426 | self.logger.warning(
427 | f" ⚠️ Expert analysis may not have fully validated the thinking (found {found_indicators}/6 indicators)"
428 | )
429 |
430 | # Check complete thinking summary
431 | if "complete_thinking" not in response_final_data:
432 | self.logger.error("Missing complete_thinking in final response")
433 | return False
434 |
435 | complete_thinking = response_final_data["complete_thinking"]
436 | if not complete_thinking.get("relevant_context"):
437 | self.logger.error("Missing relevant context in complete thinking")
438 | return False
439 |
440 | if "migration_strategy" not in complete_thinking["relevant_context"]:
441 | self.logger.error("Expected context not found in thinking summary")
442 | return False
443 |
444 | self.logger.info(" ✅ Complete thinking with expert analysis successful")
445 | return True
446 |
447 | except Exception as e:
448 | self.logger.error(f"Complete thinking test failed: {e}")
449 | return False
450 |
451 | def _test_certain_confidence(self) -> bool:
452 | """Test certain confidence behavior - should skip expert analysis"""
453 | try:
454 | self.logger.info(" 1.4: Testing certain confidence behavior")
455 |
456 | # Test certain confidence - should skip expert analysis
457 | self.logger.info(" 1.4.1: Certain confidence thinking")
458 | response_certain, _ = self.call_mcp_tool(
459 | "thinkdeep",
460 | {
461 | "step": "I have thoroughly analyzed all aspects of the migration strategy with complete certainty.",
462 | "step_number": 1,
463 | "total_steps": 1,
464 | "next_step_required": False, # Final step
465 | "findings": "Definitive conclusion: strangler fig pattern with phased database extraction is the optimal approach. Risk mitigation through team training and robust monitoring. Timeline: 6 months with monthly service extractions.",
466 | "files_checked": [self.architecture_file, self.requirements_file, self.performance_file],
467 | "relevant_files": [self.architecture_file, self.requirements_file],
468 | "relevant_context": ["migration_complete_strategy", "implementation_plan"],
469 | "confidence": "certain", # This should skip expert analysis
470 | "model": "flash",
471 | },
472 | )
473 |
474 | if not response_certain:
475 | self.logger.error("Failed to test certain confidence")
476 | return False
477 |
478 | response_certain_data = self._parse_thinkdeep_response(response_certain)
479 | if not response_certain_data:
480 | return False
481 |
482 | # Validate certain confidence response - should skip expert analysis
483 | if response_certain_data.get("status") != "deep_thinking_complete_ready_for_implementation":
484 | self.logger.error(
485 | f"Expected status 'deep_thinking_complete_ready_for_implementation', got '{response_certain_data.get('status')}'"
486 | )
487 | return False
488 |
489 | if not response_certain_data.get("skip_expert_analysis"):
490 | self.logger.error("Expected skip_expert_analysis=true for certain confidence")
491 | return False
492 |
493 | expert_analysis = response_certain_data.get("expert_analysis", {})
494 | if expert_analysis.get("status") != "skipped_due_to_certain_thinking_confidence":
495 | self.logger.error("Expert analysis should be skipped for certain confidence")
496 | return False
497 |
498 | self.logger.info(" ✅ Certain confidence behavior working correctly")
499 | return True
500 |
501 | except Exception as e:
502 | self.logger.error(f"Certain confidence test failed: {e}")
503 | return False
504 |
505 | def call_mcp_tool(self, tool_name: str, params: dict) -> tuple[Optional[str], Optional[str]]:
506 | """Call an MCP tool in-process - override for thinkdeep-specific response handling"""
507 | # Use in-process implementation to maintain conversation memory
508 | response_text, _ = self.call_mcp_tool_direct(tool_name, params)
509 |
510 | if not response_text:
511 | return None, None
512 |
513 | # Extract continuation_id from thinkdeep response specifically
514 | continuation_id = self._extract_thinkdeep_continuation_id(response_text)
515 |
516 | return response_text, continuation_id
517 |
518 | def _extract_thinkdeep_continuation_id(self, response_text: str) -> Optional[str]:
519 | """Extract continuation_id from thinkdeep response"""
520 | try:
521 | # Parse the response
522 | response_data = json.loads(response_text)
523 | return response_data.get("continuation_id")
524 |
525 | except json.JSONDecodeError as e:
526 | self.logger.debug(f"Failed to parse response for thinkdeep continuation_id: {e}")
527 | return None
528 |
529 | def _parse_thinkdeep_response(self, response_text: str) -> dict:
530 | """Parse thinkdeep tool JSON response"""
531 | try:
532 | # Parse the response - it should be direct JSON
533 | return json.loads(response_text)
534 |
535 | except json.JSONDecodeError as e:
536 | self.logger.error(f"Failed to parse thinkdeep response as JSON: {e}")
537 | self.logger.error(f"Response text: {response_text[:500]}...")
538 | return {}
539 |
540 | def _validate_step_response(
541 | self,
542 | response_data: dict,
543 | expected_step: int,
544 | expected_total: int,
545 | expected_next_required: bool,
546 | expected_status: str,
547 | ) -> bool:
548 | """Validate a thinkdeep thinking step response structure"""
549 | try:
550 | # Check status
551 | if response_data.get("status") != expected_status:
552 | self.logger.error(f"Expected status '{expected_status}', got '{response_data.get('status')}'")
553 | return False
554 |
555 | # Check step number
556 | if response_data.get("step_number") != expected_step:
557 | self.logger.error(f"Expected step_number {expected_step}, got {response_data.get('step_number')}")
558 | return False
559 |
560 | # Check total steps
561 | if response_data.get("total_steps") != expected_total:
562 | self.logger.error(f"Expected total_steps {expected_total}, got {response_data.get('total_steps')}")
563 | return False
564 |
565 | # Check next_step_required
566 | if response_data.get("next_step_required") != expected_next_required:
567 | self.logger.error(
568 | f"Expected next_step_required {expected_next_required}, got {response_data.get('next_step_required')}"
569 | )
570 | return False
571 |
572 | # Check thinking_status exists
573 | if "thinking_status" not in response_data:
574 | self.logger.error("Missing thinking_status in response")
575 | return False
576 |
577 | # Check next_steps guidance
578 | if not response_data.get("next_steps"):
579 | self.logger.error("Missing next_steps guidance in response")
580 | return False
581 |
582 | return True
583 |
584 | except Exception as e:
585 | self.logger.error(f"Error validating step response: {e}")
586 | return False
587 |
588 | def _test_context_aware_file_embedding(self) -> bool:
589 | """Test context-aware file embedding optimization"""
590 | try:
591 | self.logger.info(" 1.5: Testing context-aware file embedding")
592 |
593 | # Create additional test files for context testing
594 | strategy_doc = """# Implementation Strategy
595 |
596 | ## Phase 1: Foundation (Month 1-2)
597 | - Set up monitoring and logging infrastructure
598 | - Establish CI/CD pipelines for microservices
599 | - Team training on distributed systems concepts
600 |
601 | ## Phase 2: Initial Services (Month 3-4)
602 | - Extract read-only services (user profiles, product catalog)
603 | - Implement API gateway
604 | - Set up service discovery
605 |
606 | ## Phase 3: Core Services (Month 5-6)
607 | - Extract transaction services
608 | - Implement saga patterns for distributed transactions
609 | - Performance optimization and monitoring
610 | """
611 |
612 | tech_stack_doc = """# Technology Stack Decisions
613 |
614 | ## Service Framework
615 | - Spring Boot 2.7 (team familiarity)
616 | - Docker containers
617 | - Kubernetes orchestration
618 |
619 | ## Communication
620 | - REST APIs for synchronous communication
621 | - Apache Kafka for asynchronous messaging
622 | - gRPC for high-performance internal communication
623 |
624 | ## Data Layer
625 | - PostgreSQL (existing expertise)
626 | - Redis for caching
627 | - Elasticsearch for search and analytics
628 |
629 | ## Monitoring
630 | - Prometheus + Grafana
631 | - Distributed tracing with Jaeger
632 | - Centralized logging with ELK stack
633 | """
634 |
635 | # Create test files
636 | strategy_file = self.create_additional_test_file("implementation_strategy.md", strategy_doc)
637 | tech_stack_file = self.create_additional_test_file("tech_stack.md", tech_stack_doc)
638 |
639 | # Test 1: New conversation, intermediate step - should only reference files
640 | self.logger.info(" 1.5.1: New conversation intermediate step (should reference only)")
641 | response1, continuation_id = self.call_mcp_tool(
642 | "thinkdeep",
643 | {
644 | "step": "Starting deep thinking about implementation timeline and technology choices",
645 | "step_number": 1,
646 | "total_steps": 3,
647 | "next_step_required": True, # Intermediate step
648 | "findings": "Initial analysis of implementation strategy and technology stack decisions",
649 | "files_checked": [strategy_file, tech_stack_file],
650 | "relevant_files": [strategy_file], # This should be referenced, not embedded
651 | "relevant_context": ["implementation_timeline", "technology_selection"],
652 | "confidence": "low",
653 | "model": "flash",
654 | },
655 | )
656 |
657 | if not response1 or not continuation_id:
658 | self.logger.error("Failed to start context-aware file embedding test")
659 | return False
660 |
661 | response1_data = self._parse_thinkdeep_response(response1)
662 | if not response1_data:
663 | return False
664 |
665 | # Check file context - should be reference_only for intermediate step
666 | file_context = response1_data.get("file_context", {})
667 | if file_context.get("type") != "reference_only":
668 | self.logger.error(f"Expected reference_only file context, got: {file_context.get('type')}")
669 | return False
670 |
671 | if "Files referenced but not embedded" not in file_context.get("context_optimization", ""):
672 | self.logger.error("Expected context optimization message for reference_only")
673 | return False
674 |
675 | self.logger.info(" ✅ Intermediate step correctly uses reference_only file context")
676 |
677 | # Test 2: Final step - should embed files for expert analysis
678 | self.logger.info(" 1.5.2: Final step (should embed files)")
679 | response2, _ = self.call_mcp_tool(
680 | "thinkdeep",
681 | {
682 | "step": "Thinking analysis complete - comprehensive evaluation of implementation approach",
683 | "step_number": 2,
684 | "total_steps": 2,
685 | "next_step_required": False, # Final step - should embed files
686 | "continuation_id": continuation_id,
687 | "findings": "Complete analysis: phased implementation with proven technology stack minimizes risk while maximizing team effectiveness. Timeline is realistic with proper training and infrastructure setup.",
688 | "files_checked": [strategy_file, tech_stack_file],
689 | "relevant_files": [strategy_file, tech_stack_file], # Should be fully embedded
690 | "relevant_context": ["implementation_plan", "technology_decisions", "risk_management"],
691 | "confidence": "high",
692 | "model": "flash",
693 | },
694 | )
695 |
696 | if not response2:
697 | self.logger.error("Failed to complete to final step")
698 | return False
699 |
700 | response2_data = self._parse_thinkdeep_response(response2)
701 | if not response2_data:
702 | return False
703 |
704 | # Check file context - should be fully_embedded for final step
705 | file_context2 = response2_data.get("file_context", {})
706 | if file_context2.get("type") != "fully_embedded":
707 | self.logger.error(
708 | f"Expected fully_embedded file context for final step, got: {file_context2.get('type')}"
709 | )
710 | return False
711 |
712 | if "Full file content embedded for expert analysis" not in file_context2.get("context_optimization", ""):
713 | self.logger.error("Expected expert analysis optimization message for fully_embedded")
714 | return False
715 |
716 | self.logger.info(" ✅ Final step correctly uses fully_embedded file context")
717 |
718 | # Verify expert analysis was called for final step
719 | if response2_data.get("status") != "calling_expert_analysis":
720 | self.logger.error("Final step should trigger expert analysis")
721 | return False
722 |
723 | if "expert_analysis" not in response2_data:
724 | self.logger.error("Expert analysis should be present in final step")
725 | return False
726 |
727 | self.logger.info(" ✅ Context-aware file embedding test completed successfully")
728 | return True
729 |
730 | except Exception as e:
731 | self.logger.error(f"Context-aware file embedding test failed: {e}")
732 | return False
733 |
734 | def _test_multi_step_file_context(self) -> bool:
735 | """Test multi-step workflow with proper file context transitions"""
736 | try:
737 | self.logger.info(" 1.6: Testing multi-step file context optimization")
738 |
739 | # Create a complex scenario with multiple thinking documents
740 | risk_analysis = """# Risk Analysis
741 |
742 | ## Technical Risks
743 | - Service mesh complexity
744 | - Data consistency challenges
745 | - Performance degradation during migration
746 | - Operational overhead increase
747 |
748 | ## Business Risks
749 | - Extended development timelines
750 | - Potential system instability
751 | - Team productivity impact
752 | - Customer experience disruption
753 |
754 | ## Mitigation Strategies
755 | - Gradual rollout with feature flags
756 | - Comprehensive monitoring and alerting
757 | - Rollback procedures for each phase
758 | - Customer communication plan
759 | """
760 |
761 | success_metrics = """# Success Metrics and KPIs
762 |
763 | ## Development Velocity
764 | - Deployment frequency: Target 10x improvement
765 | - Lead time for changes: <2 hours
766 | - Mean time to recovery: <30 minutes
767 | - Change failure rate: <5%
768 |
769 | ## System Performance
770 | - Response time: <200ms p95
771 | - System availability: 99.9%
772 | - Throughput: 50k requests/minute
773 | - Resource utilization: 70% optimal
774 |
775 | ## Business Impact
776 | - Developer satisfaction: >8/10
777 | - Time to market: 50% reduction
778 | - Operational costs: 20% reduction
779 | - System reliability: 99.9% uptime
780 | """
781 |
782 | # Create test files
783 | risk_file = self.create_additional_test_file("risk_analysis.md", risk_analysis)
784 | metrics_file = self.create_additional_test_file("success_metrics.md", success_metrics)
785 |
786 | # Step 1: Start thinking analysis (new conversation)
787 | self.logger.info(" 1.6.1: Step 1 - Start thinking analysis")
788 | response1, continuation_id = self.call_mcp_tool(
789 | "thinkdeep",
790 | {
791 | "step": "Beginning comprehensive analysis of migration risks and success criteria",
792 | "step_number": 1,
793 | "total_steps": 4,
794 | "next_step_required": True,
795 | "findings": "Initial assessment of risk factors and success metrics for microservices migration",
796 | "files_checked": [risk_file],
797 | "relevant_files": [risk_file],
798 | "relevant_context": ["risk_assessment", "migration_planning"],
799 | "confidence": "low",
800 | "model": "flash",
801 | },
802 | )
803 |
804 | if not response1 or not continuation_id:
805 | self.logger.error("Failed to start multi-step file context test")
806 | return False
807 |
808 | response1_data = self._parse_thinkdeep_response(response1)
809 |
810 | # Validate step 1 - should use reference_only
811 | file_context1 = response1_data.get("file_context", {})
812 | if file_context1.get("type") != "reference_only":
813 | self.logger.error("Step 1 should use reference_only file context")
814 | return False
815 |
816 | self.logger.info(" ✅ Step 1: reference_only file context")
817 |
818 | # Step 2: Expand thinking analysis
819 | self.logger.info(" 1.6.2: Step 2 - Expand thinking analysis")
820 | response2, _ = self.call_mcp_tool(
821 | "thinkdeep",
822 | {
823 | "step": "Deepening analysis by correlating risks with success metrics",
824 | "step_number": 2,
825 | "total_steps": 4,
826 | "next_step_required": True,
827 | "continuation_id": continuation_id,
828 | "findings": "Key insight: technical risks directly impact business metrics. Need balanced approach prioritizing high-impact, low-risk improvements first.",
829 | "files_checked": [risk_file, metrics_file],
830 | "relevant_files": [risk_file, metrics_file],
831 | "relevant_context": ["risk_metric_correlation", "priority_matrix"],
832 | "confidence": "medium",
833 | "model": "flash",
834 | },
835 | )
836 |
837 | if not response2:
838 | self.logger.error("Failed to continue to step 2")
839 | return False
840 |
841 | response2_data = self._parse_thinkdeep_response(response2)
842 |
843 | # Validate step 2 - should still use reference_only
844 | file_context2 = response2_data.get("file_context", {})
845 | if file_context2.get("type") != "reference_only":
846 | self.logger.error("Step 2 should use reference_only file context")
847 | return False
848 |
849 | self.logger.info(" ✅ Step 2: reference_only file context with multiple files")
850 |
851 | # Step 3: Deep analysis
852 | self.logger.info(" 1.6.3: Step 3 - Deep strategic analysis")
853 | response3, _ = self.call_mcp_tool(
854 | "thinkdeep",
855 | {
856 | "step": "Synthesizing risk mitigation strategies with measurable success criteria",
857 | "step_number": 3,
858 | "total_steps": 4,
859 | "next_step_required": True,
860 | "continuation_id": continuation_id,
861 | "findings": "Strategic framework emerging: phase-gate approach with clear go/no-go criteria at each milestone. Emphasis on early wins to build confidence and momentum.",
862 | "files_checked": [risk_file, metrics_file, self.requirements_file],
863 | "relevant_files": [risk_file, metrics_file, self.requirements_file],
864 | "relevant_context": ["phase_gate_approach", "milestone_criteria", "early_wins"],
865 | "confidence": "high",
866 | "model": "flash",
867 | },
868 | )
869 |
870 | if not response3:
871 | self.logger.error("Failed to continue to step 3")
872 | return False
873 |
874 | response3_data = self._parse_thinkdeep_response(response3)
875 |
876 | # Validate step 3 - should still use reference_only
877 | file_context3 = response3_data.get("file_context", {})
878 | if file_context3.get("type") != "reference_only":
879 | self.logger.error("Step 3 should use reference_only file context")
880 | return False
881 |
882 | self.logger.info(" ✅ Step 3: reference_only file context")
883 |
884 | # Step 4: Final analysis with expert consultation
885 | self.logger.info(" 1.6.4: Step 4 - Final step with expert analysis")
886 | response4, _ = self.call_mcp_tool(
887 | "thinkdeep",
888 | {
889 | "step": "Thinking analysis complete - comprehensive strategic framework developed",
890 | "step_number": 4,
891 | "total_steps": 4,
892 | "next_step_required": False, # Final step - should embed files
893 | "continuation_id": continuation_id,
894 | "findings": "Complete strategic framework: risk-balanced migration with measurable success criteria, phase-gate governance, and clear rollback procedures. Framework aligns technical execution with business objectives.",
895 | "files_checked": [risk_file, metrics_file, self.requirements_file, self.architecture_file],
896 | "relevant_files": [risk_file, metrics_file, self.requirements_file, self.architecture_file],
897 | "relevant_context": ["strategic_framework", "governance_model", "success_measurement"],
898 | "confidence": "high",
899 | "model": "flash",
900 | },
901 | )
902 |
903 | if not response4:
904 | self.logger.error("Failed to complete to final step")
905 | return False
906 |
907 | response4_data = self._parse_thinkdeep_response(response4)
908 |
909 | # Validate step 4 - should use fully_embedded for expert analysis
910 | file_context4 = response4_data.get("file_context", {})
911 | if file_context4.get("type") != "fully_embedded":
912 | self.logger.error("Step 4 (final) should use fully_embedded file context")
913 | return False
914 |
915 | if "expert analysis" not in file_context4.get("context_optimization", "").lower():
916 | self.logger.error("Final step should mention expert analysis in context optimization")
917 | return False
918 |
919 | # Verify expert analysis was triggered
920 | if response4_data.get("status") != "calling_expert_analysis":
921 | self.logger.error("Final step should trigger expert analysis")
922 | return False
923 |
924 | # Check that expert analysis has file context
925 | expert_analysis = response4_data.get("expert_analysis", {})
926 | if not expert_analysis:
927 | self.logger.error("Expert analysis should be present in final step")
928 | return False
929 |
930 | self.logger.info(" ✅ Step 4: fully_embedded file context with expert analysis")
931 |
932 | # Validate the complete workflow progression
933 | progression_summary = {
934 | "step_1": "reference_only (new conversation, intermediate)",
935 | "step_2": "reference_only (continuation, intermediate)",
936 | "step_3": "reference_only (continuation, intermediate)",
937 | "step_4": "fully_embedded (continuation, final)",
938 | }
939 |
940 | self.logger.info(" 📋 File context progression:")
941 | for step, context_type in progression_summary.items():
942 | self.logger.info(f" {step}: {context_type}")
943 |
944 | self.logger.info(" ✅ Multi-step file context optimization test completed successfully")
945 | return True
946 |
947 | except Exception as e:
948 | self.logger.error(f"Multi-step file context test failed: {e}")
949 | return False
950 |
```
--------------------------------------------------------------------------------
/simulator_tests/test_debug_validation.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | DebugWorkflow Tool Validation Test
4 |
5 | Tests the debug tool's capabilities using the new workflow architecture.
6 | This validates that the new workflow-based implementation maintains
7 | all the functionality of the original debug tool.
8 | """
9 |
10 | import json
11 | from typing import Optional
12 |
13 | from .conversation_base_test import ConversationBaseTest
14 |
15 |
16 | class DebugValidationTest(ConversationBaseTest):
17 | """Test debug tool with new workflow architecture"""
18 |
19 | @property
20 | def test_name(self) -> str:
21 | return "debug_validation"
22 |
23 | @property
24 | def test_description(self) -> str:
25 | return "Debug tool validation with new workflow architecture"
26 |
27 | def run_test(self) -> bool:
28 | """Test debug tool capabilities"""
29 | # Set up the test environment
30 | self.setUp()
31 |
32 | try:
33 | self.logger.info("Test: DebugWorkflow tool validation (new architecture)")
34 |
35 | # Create a Python file with a subtle but realistic bug
36 | self._create_buggy_code()
37 |
38 | # Test 1: Single investigation session with multiple steps
39 | if not self._test_single_investigation_session():
40 | return False
41 |
42 | # Test 2: Investigation flow that requires refinement
43 | if not self._test_investigation_refine_flow():
44 | return False
45 |
46 | # Test 3: Complete investigation with expert analysis
47 | if not self._test_complete_investigation_with_analysis():
48 | return False
49 |
50 | # Test 4: Certain confidence behavior
51 | if not self._test_certain_confidence():
52 | return False
53 |
54 | # Test 5: Context-aware file embedding
55 | if not self._test_context_aware_file_embedding():
56 | return False
57 |
58 | # Test 6: Multi-step file context optimization
59 | if not self._test_multi_step_file_context():
60 | return False
61 |
62 | self.logger.info(" ✅ All debug validation tests passed")
63 | return True
64 |
65 | except Exception as e:
66 | self.logger.error(f"DebugWorkflow validation test failed: {e}")
67 | return False
68 |
69 | def _create_buggy_code(self):
70 | """Create test files with a subtle bug for debugging"""
71 | # Create a Python file with dictionary iteration bug
72 | buggy_code = """#!/usr/bin/env python3
73 | import json
74 | from datetime import datetime, timedelta
75 |
76 | class SessionManager:
77 | def __init__(self):
78 | self.active_sessions = {}
79 | self.session_timeout = 30 * 60 # 30 minutes in seconds
80 |
81 | def create_session(self, user_id, user_data):
82 | \"\"\"Create a new user session\"\"\"
83 | session_id = f"sess_{user_id}_{datetime.now().timestamp()}"
84 |
85 | session_info = {
86 | 'user_id': user_id,
87 | 'user_data': user_data,
88 | 'created_at': datetime.now(),
89 | 'expires_at': datetime.now() + timedelta(seconds=self.session_timeout)
90 | }
91 |
92 | self.active_sessions[session_id] = session_info
93 | return session_id
94 |
95 | def validate_session(self, session_id):
96 | \"\"\"Check if session is valid and not expired\"\"\"
97 | if session_id not in self.active_sessions:
98 | return False
99 |
100 | session = self.active_sessions[session_id]
101 | current_time = datetime.now()
102 |
103 | # Check if session has expired
104 | if current_time > session['expires_at']:
105 | del self.active_sessions[session_id]
106 | return False
107 |
108 | return True
109 |
110 | def cleanup_expired_sessions(self):
111 | \"\"\"Remove expired sessions from memory\"\"\"
112 | current_time = datetime.now()
113 | expired_count = 0
114 |
115 | # BUG: Modifying dictionary while iterating over it
116 | for session_id, session in self.active_sessions.items():
117 | if current_time > session['expires_at']:
118 | del self.active_sessions[session_id] # This causes RuntimeError
119 | expired_count += 1
120 |
121 | return expired_count
122 | """
123 |
124 | # Create test file with subtle bug
125 | self.buggy_file = self.create_additional_test_file("session_manager.py", buggy_code)
126 | self.logger.info(f" ✅ Created test file with subtle bug: {self.buggy_file}")
127 |
128 | # Create error description
129 | error_description = """ISSUE DESCRIPTION:
130 | Our session management system is experiencing intermittent failures during cleanup operations.
131 |
132 | SYMPTOMS:
133 | - Random RuntimeError: dictionary changed size during iteration
134 | - Occurs during high load when many sessions expire simultaneously
135 | - Error happens in cleanup_expired_sessions method
136 | - Affects about 5% of cleanup operations
137 |
138 | ERROR LOG:
139 | RuntimeError: dictionary changed size during iteration
140 | File "session_manager.py", line 44, in cleanup_expired_sessions
141 | for session_id, session in self.active_sessions.items():
142 | """
143 |
144 | self.error_file = self.create_additional_test_file("error_description.txt", error_description)
145 | self.logger.info(f" ✅ Created error description file: {self.error_file}")
146 |
147 | def _test_single_investigation_session(self) -> bool:
148 | """Test a complete investigation session with multiple steps"""
149 | try:
150 | self.logger.info(" 1.1: Testing single investigation session")
151 |
152 | # Step 1: Start investigation
153 | self.logger.info(" 1.1.1: Step 1 - Initial investigation")
154 | response1, continuation_id = self.call_mcp_tool(
155 | "debug",
156 | {
157 | "step": "I need to investigate intermittent RuntimeError during session cleanup. Let me start by examining the error description and understanding the symptoms.",
158 | "step_number": 1,
159 | "total_steps": 4,
160 | "next_step_required": True,
161 | "findings": "RuntimeError occurs during dictionary iteration in cleanup_expired_sessions method. Error happens intermittently during high load.",
162 | "files_checked": [self.error_file],
163 | "relevant_files": [self.error_file],
164 | },
165 | )
166 |
167 | if not response1 or not continuation_id:
168 | self.logger.error("Failed to get initial investigation response")
169 | return False
170 |
171 | # Parse and validate JSON response
172 | response1_data = self._parse_debug_response(response1)
173 | if not response1_data:
174 | return False
175 |
176 | # Validate step 1 response structure - expect pause_for_investigation for next_step_required=True
177 | if not self._validate_step_response(response1_data, 1, 4, True, "pause_for_investigation"):
178 | return False
179 |
180 | self.logger.info(f" ✅ Step 1 successful, continuation_id: {continuation_id}")
181 |
182 | # Step 2: Examine the code
183 | self.logger.info(" 1.1.2: Step 2 - Code examination")
184 | response2, _ = self.call_mcp_tool(
185 | "debug",
186 | {
187 | "step": "Now examining the session_manager.py file to understand the cleanup_expired_sessions implementation and identify the root cause.",
188 | "step_number": 2,
189 | "total_steps": 4,
190 | "next_step_required": True,
191 | "findings": "Found the issue: cleanup_expired_sessions modifies self.active_sessions dictionary while iterating over it with .items(). This causes RuntimeError when del is called during iteration.",
192 | "files_checked": [self.error_file, self.buggy_file],
193 | "relevant_files": [self.buggy_file],
194 | "relevant_context": ["SessionManager.cleanup_expired_sessions"],
195 | "hypothesis": "Dictionary is being modified during iteration causing RuntimeError",
196 | "confidence": "high",
197 | "continuation_id": continuation_id,
198 | },
199 | )
200 |
201 | if not response2:
202 | self.logger.error("Failed to continue investigation to step 2")
203 | return False
204 |
205 | response2_data = self._parse_debug_response(response2)
206 | if not self._validate_step_response(response2_data, 2, 4, True, "pause_for_investigation"):
207 | return False
208 |
209 | # Check investigation status tracking
210 | investigation_status = response2_data.get("investigation_status", {})
211 | if investigation_status.get("files_checked", 0) < 2:
212 | self.logger.error("Files checked count not properly tracked")
213 | return False
214 |
215 | if investigation_status.get("relevant_context", 0) != 1:
216 | self.logger.error("Relevant context not properly tracked")
217 | return False
218 |
219 | if investigation_status.get("current_confidence") != "high":
220 | self.logger.error("Confidence level not properly tracked")
221 | return False
222 |
223 | self.logger.info(" ✅ Step 2 successful with proper tracking")
224 |
225 | # Store continuation_id for next test
226 | self.investigation_continuation_id = continuation_id
227 | return True
228 |
229 | except Exception as e:
230 | self.logger.error(f"Single investigation session test failed: {e}")
231 | return False
232 |
233 | def _test_investigation_refine_flow(self) -> bool:
234 | """Test investigation flow that requires refining the approach"""
235 | try:
236 | self.logger.info(" 1.2: Testing investigation refinement workflow")
237 |
238 |             # Start a new investigation for testing refinement behavior
239 | self.logger.info(" 1.2.1: Start investigation for refinement test")
240 | response1, continuation_id = self.call_mcp_tool(
241 | "debug",
242 | {
243 | "step": "Investigating performance degradation in data processing pipeline",
244 | "step_number": 1,
245 | "total_steps": 4,
246 | "next_step_required": True,
247 | "findings": "Initial analysis shows slow database queries",
248 | "files_checked": ["/db/queries.py"],
249 | "relevant_files": ["/db/queries.py"],
250 | },
251 | )
252 |
253 | if not response1 or not continuation_id:
254 | self.logger.error("Failed to start refinement test investigation")
255 | return False
256 |
257 | # Step 2: Wrong direction
258 | self.logger.info(" 1.2.2: Step 2 - Wrong investigation path")
259 | response2, _ = self.call_mcp_tool(
260 | "debug",
261 | {
262 | "step": "Focusing on database optimization strategies",
263 | "step_number": 2,
264 | "total_steps": 4,
265 | "next_step_required": True,
266 | "findings": "Database queries seem optimized, might be looking in wrong place",
267 | "files_checked": ["/db/queries.py", "/db/indexes.py"],
268 | "relevant_files": [],
269 | "hypothesis": "Database performance issues",
270 | "confidence": "low",
271 | "continuation_id": continuation_id,
272 | },
273 | )
274 |
275 | if not response2:
276 | self.logger.error("Failed to continue to step 2")
277 | return False
278 |
279 |             # Step 3: Refine the investigation path after step 2's dead end
280 | self.logger.info(" 1.2.3: Step 3 - Refine investigation path")
281 | response3, _ = self.call_mcp_tool(
282 | "debug",
283 | {
284 | "step": "Refocusing - the issue might not be database related. Let me investigate the data processing algorithm instead.",
285 | "step_number": 3,
286 | "total_steps": 4,
287 | "next_step_required": True,
288 | "findings": "Found inefficient nested loops in data processor causing O(n²) complexity",
289 | "files_checked": ["/processor/algorithm.py"],
290 | "relevant_files": ["/processor/algorithm.py"],
291 | "relevant_context": ["DataProcessor.process_batch"],
292 | "hypothesis": "Inefficient algorithm causing performance issues",
293 | "confidence": "medium",
294 | "continuation_id": continuation_id,
295 | },
296 | )
297 |
298 | if not response3:
299 | self.logger.error("Failed to refine investigation")
300 | return False
301 |
302 | response3_data = self._parse_debug_response(response3)
303 | if not self._validate_step_response(response3_data, 3, 4, True, "pause_for_investigation"):
304 | return False
305 |
306 | self.logger.info(" ✅ Investigation refinement working correctly")
307 | return True
308 |
309 | except Exception as e:
310 | self.logger.error(f"Investigation refinement test failed: {e}")
311 | return False
312 |
313 | def _test_complete_investigation_with_analysis(self) -> bool:
314 | """Test complete investigation ending with expert analysis"""
315 | try:
316 | self.logger.info(" 1.3: Testing complete investigation with expert analysis")
317 |
318 | # Use the continuation from first test
319 | continuation_id = getattr(self, "investigation_continuation_id", None)
320 | if not continuation_id:
321 | # Start fresh if no continuation available
322 | self.logger.info(" 1.3.0: Starting fresh investigation")
323 | response0, continuation_id = self.call_mcp_tool(
324 | "debug",
325 | {
326 | "step": "Investigating the dictionary iteration bug in session cleanup",
327 | "step_number": 1,
328 | "total_steps": 2,
329 | "next_step_required": True,
330 | "findings": "Found dictionary modification during iteration",
331 | "files_checked": [self.buggy_file],
332 | "relevant_files": [self.buggy_file],
333 | "relevant_context": ["SessionManager.cleanup_expired_sessions"],
334 | },
335 | )
336 | if not response0 or not continuation_id:
337 | self.logger.error("Failed to start fresh investigation")
338 | return False
339 |
340 | # Final step - trigger expert analysis
341 | self.logger.info(" 1.3.1: Final step - complete investigation")
342 | response_final, _ = self.call_mcp_tool(
343 | "debug",
344 | {
345 | "step": "Investigation complete. The root cause is confirmed: cleanup_expired_sessions modifies the dictionary while iterating, causing RuntimeError.",
346 | "step_number": 2,
347 | "total_steps": 2,
348 | "next_step_required": False, # Final step - triggers expert analysis
349 | "findings": "Root cause identified: del self.active_sessions[session_id] on line 46 modifies dictionary during iteration starting at line 44. Fix: collect expired IDs first, then delete.",
350 | "files_checked": [self.buggy_file],
351 | "relevant_files": [self.buggy_file],
352 | "relevant_context": ["SessionManager.cleanup_expired_sessions"],
353 | "hypothesis": "Dictionary modification during iteration causes RuntimeError in cleanup_expired_sessions",
354 | "confidence": "high",
355 | "continuation_id": continuation_id,
356 | "model": "flash", # Use flash for expert analysis
357 | },
358 | )
359 |
360 | if not response_final:
361 | self.logger.error("Failed to complete investigation")
362 | return False
363 |
364 | response_final_data = self._parse_debug_response(response_final)
365 | if not response_final_data:
366 | return False
367 |
368 | # Validate final response structure - expect calling_expert_analysis for next_step_required=False
369 | if response_final_data.get("status") != "calling_expert_analysis":
370 | self.logger.error(
371 | f"Expected status 'calling_expert_analysis', got '{response_final_data.get('status')}'"
372 | )
373 | return False
374 |
375 | if not response_final_data.get("investigation_complete"):
376 | self.logger.error("Expected investigation_complete=true for final step")
377 | return False
378 |
379 | # Check for expert analysis
380 | if "expert_analysis" not in response_final_data:
381 | self.logger.error("Missing expert_analysis in final response")
382 | return False
383 |
384 | expert_analysis = response_final_data.get("expert_analysis", {})
385 |
386 | # Check for expected analysis content (checking common patterns)
387 | analysis_text = json.dumps(expert_analysis, ensure_ascii=False).lower()
388 |
389 | # Look for bug identification
390 | bug_indicators = ["dictionary", "iteration", "modify", "runtime", "error", "del"]
391 | found_indicators = sum(1 for indicator in bug_indicators if indicator in analysis_text)
392 |
393 | if found_indicators >= 3:
394 | self.logger.info(" ✅ Expert analysis identified the bug correctly")
395 | else:
396 | self.logger.warning(
397 | f" ⚠️ Expert analysis may not have fully identified the bug (found {found_indicators}/6 indicators)"
398 | )
399 |
400 | # Check complete investigation summary
401 | if "complete_investigation" not in response_final_data:
402 | self.logger.error("Missing complete_investigation in final response")
403 | return False
404 |
405 | complete_investigation = response_final_data["complete_investigation"]
406 | if not complete_investigation.get("relevant_context"):
407 | self.logger.error("Missing relevant context in complete investigation")
408 | return False
409 |
410 | if "SessionManager.cleanup_expired_sessions" not in complete_investigation["relevant_context"]:
411 | self.logger.error("Expected method not found in investigation summary")
412 | return False
413 |
414 | self.logger.info(" ✅ Complete investigation with expert analysis successful")
415 | return True
416 |
417 | except Exception as e:
418 | self.logger.error(f"Complete investigation test failed: {e}")
419 | return False
420 |
421 | def _test_certain_confidence(self) -> bool:
422 | """Test certain confidence behavior - should skip expert analysis"""
423 | try:
424 | self.logger.info(" 1.4: Testing certain confidence behavior")
425 |
426 | # Test certain confidence - should skip expert analysis
427 | self.logger.info(" 1.4.1: Certain confidence investigation")
428 | response_certain, _ = self.call_mcp_tool(
429 | "debug",
430 | {
431 | "step": "I have confirmed the exact root cause with 100% certainty: dictionary modification during iteration.",
432 | "step_number": 1,
433 | "total_steps": 1,
434 | "next_step_required": False, # Final step
435 | "findings": "The bug is on line 44-47: for loop iterates over dict.items() while del modifies the dict inside the loop. Fix is simple: collect expired IDs first, then delete after iteration.",
436 | "files_checked": [self.buggy_file],
437 | "relevant_files": [self.buggy_file],
438 | "relevant_context": ["SessionManager.cleanup_expired_sessions"],
439 | "hypothesis": "Dictionary modification during iteration causes RuntimeError - fix is straightforward",
440 | "confidence": "certain", # This should skip expert analysis
441 | "model": "flash",
442 | },
443 | )
444 |
445 | if not response_certain:
446 | self.logger.error("Failed to test certain confidence")
447 | return False
448 |
449 | response_certain_data = self._parse_debug_response(response_certain)
450 | if not response_certain_data:
451 | return False
452 |
453 | # Validate certain confidence response - should skip expert analysis
454 | if response_certain_data.get("status") != "certain_confidence_proceed_with_fix":
455 | self.logger.error(
456 | f"Expected status 'certain_confidence_proceed_with_fix', got '{response_certain_data.get('status')}'"
457 | )
458 | return False
459 |
460 | if not response_certain_data.get("skip_expert_analysis"):
461 | self.logger.error("Expected skip_expert_analysis=true for certain confidence")
462 | return False
463 |
464 | expert_analysis = response_certain_data.get("expert_analysis", {})
465 | if expert_analysis.get("status") != "skipped_due_to_certain_confidence":
466 | self.logger.error("Expert analysis should be skipped for certain confidence")
467 | return False
468 |
469 | self.logger.info(" ✅ Certain confidence behavior working correctly")
470 | return True
471 |
472 | except Exception as e:
473 | self.logger.error(f"Certain confidence test failed: {e}")
474 | return False
475 |
476 | def call_mcp_tool(self, tool_name: str, params: dict) -> tuple[Optional[str], Optional[str]]:
477 | """Call an MCP tool in-process - override for debug-specific response handling"""
478 | # Use in-process implementation to maintain conversation memory
479 | response_text, _ = self.call_mcp_tool_direct(tool_name, params)
480 |
481 | if not response_text:
482 | return None, None
483 |
484 | # Extract continuation_id from debug response specifically
485 | continuation_id = self._extract_debug_continuation_id(response_text)
486 |
487 | return response_text, continuation_id
488 |
489 | def _extract_debug_continuation_id(self, response_text: str) -> Optional[str]:
490 | """Extract continuation_id from debug response"""
491 | try:
492 | # Parse the response
493 | response_data = json.loads(response_text)
494 | return response_data.get("continuation_id")
495 |
496 | except json.JSONDecodeError as e:
497 | self.logger.debug(f"Failed to parse response for debug continuation_id: {e}")
498 | return None
499 |
500 | def _parse_debug_response(self, response_text: str) -> dict:
501 | """Parse debug tool JSON response"""
502 | try:
503 | # Parse the response - it should be direct JSON
504 | return json.loads(response_text)
505 |
506 | except json.JSONDecodeError as e:
507 | self.logger.error(f"Failed to parse debug response as JSON: {e}")
508 | self.logger.error(f"Response text: {response_text[:500]}...")
509 | return {}
510 |
511 | def _validate_step_response(
512 | self,
513 | response_data: dict,
514 | expected_step: int,
515 | expected_total: int,
516 | expected_next_required: bool,
517 | expected_status: str,
518 | ) -> bool:
519 | """Validate a debug investigation step response structure"""
520 | try:
521 | # Check status
522 | if response_data.get("status") != expected_status:
523 | self.logger.error(f"Expected status '{expected_status}', got '{response_data.get('status')}'")
524 | return False
525 |
526 | # Check step number
527 | if response_data.get("step_number") != expected_step:
528 | self.logger.error(f"Expected step_number {expected_step}, got {response_data.get('step_number')}")
529 | return False
530 |
531 | # Check total steps
532 | if response_data.get("total_steps") != expected_total:
533 | self.logger.error(f"Expected total_steps {expected_total}, got {response_data.get('total_steps')}")
534 | return False
535 |
536 | # Check next_step_required
537 | if response_data.get("next_step_required") != expected_next_required:
538 | self.logger.error(
539 | f"Expected next_step_required {expected_next_required}, got {response_data.get('next_step_required')}"
540 | )
541 | return False
542 |
543 | # Check investigation_status exists
544 | if "investigation_status" not in response_data:
545 | self.logger.error("Missing investigation_status in response")
546 | return False
547 |
548 | # Check next_steps guidance
549 | if not response_data.get("next_steps"):
550 | self.logger.error("Missing next_steps guidance in response")
551 | return False
552 |
553 | return True
554 |
555 | except Exception as e:
556 | self.logger.error(f"Error validating step response: {e}")
557 | return False
558 |
559 | def _test_context_aware_file_embedding(self) -> bool:
560 | """Test context-aware file embedding optimization"""
561 | try:
562 | self.logger.info(" 1.5: Testing context-aware file embedding")
563 |
564 | # Create multiple test files for context testing
565 | file1_content = """#!/usr/bin/env python3
566 | def process_data(data):
567 | \"\"\"Process incoming data\"\"\"
568 | result = []
569 | for item in data:
570 | if item.get('valid'):
571 | result.append(item['value'])
572 | return result
573 | """
574 |
575 | file2_content = """#!/usr/bin/env python3
576 | def validate_input(data):
577 | \"\"\"Validate input data\"\"\"
578 | if not isinstance(data, list):
579 | raise ValueError("Data must be a list")
580 |
581 | for item in data:
582 | if not isinstance(item, dict):
583 | raise ValueError("Items must be dictionaries")
584 | if 'value' not in item:
585 | raise ValueError("Items must have 'value' key")
586 |
587 | return True
588 | """
589 |
590 | # Create test files
591 | file1 = self.create_additional_test_file("data_processor.py", file1_content)
592 | file2 = self.create_additional_test_file("validator.py", file2_content)
593 |
594 | # Test 1: New conversation, intermediate step - should only reference files
595 | self.logger.info(" 1.5.1: New conversation intermediate step (should reference only)")
596 | response1, continuation_id = self.call_mcp_tool(
597 | "debug",
598 | {
599 | "step": "Starting investigation of data processing pipeline",
600 | "step_number": 1,
601 | "total_steps": 3,
602 | "next_step_required": True, # Intermediate step
603 | "findings": "Initial analysis of data processing components",
604 | "files_checked": [file1, file2],
605 | "relevant_files": [file1], # This should be referenced, not embedded
606 | "relevant_context": ["process_data"],
607 | "hypothesis": "Investigating data flow",
608 | "confidence": "low",
609 | "model": "flash",
610 | },
611 | )
612 |
613 | if not response1 or not continuation_id:
614 | self.logger.error("Failed to start context-aware file embedding test")
615 | return False
616 |
617 | response1_data = self._parse_debug_response(response1)
618 | if not response1_data:
619 | return False
620 |
621 | # Check file context - should be reference_only for intermediate step
622 | file_context = response1_data.get("file_context", {})
623 | if file_context.get("type") != "reference_only":
624 | self.logger.error(f"Expected reference_only file context, got: {file_context.get('type')}")
625 | return False
626 |
627 | if "Files referenced but not embedded" not in file_context.get("context_optimization", ""):
628 | self.logger.error("Expected context optimization message for reference_only")
629 | return False
630 |
631 | self.logger.info(" ✅ Intermediate step correctly uses reference_only file context")
632 |
633 | # Test 2: Intermediate step with continuation - should still only reference
634 | self.logger.info(" 1.5.2: Intermediate step with continuation (should reference only)")
635 | response2, _ = self.call_mcp_tool(
636 | "debug",
637 | {
638 | "step": "Continuing investigation with more detailed analysis",
639 | "step_number": 2,
640 | "total_steps": 3,
641 | "next_step_required": True, # Still intermediate
642 | "continuation_id": continuation_id,
643 | "findings": "Found potential issues in validation logic",
644 | "files_checked": [file1, file2],
645 | "relevant_files": [file1, file2], # Both files referenced
646 | "relevant_context": ["process_data", "validate_input"],
647 | "hypothesis": "Validation might be too strict",
648 | "confidence": "medium",
649 | "model": "flash",
650 | },
651 | )
652 |
653 | if not response2:
654 | self.logger.error("Failed to continue to step 2")
655 | return False
656 |
657 | response2_data = self._parse_debug_response(response2)
658 | if not response2_data:
659 | return False
660 |
661 | # Check file context - should still be reference_only
662 | file_context2 = response2_data.get("file_context", {})
663 | if file_context2.get("type") != "reference_only":
664 | self.logger.error(f"Expected reference_only file context for step 2, got: {file_context2.get('type')}")
665 | return False
666 |
667 | # Should include reference note
668 | if not file_context2.get("note"):
669 | self.logger.error("Expected file reference note for intermediate step")
670 | return False
671 |
672 | reference_note = file_context2.get("note", "")
673 | if "data_processor.py" not in reference_note or "validator.py" not in reference_note:
674 | self.logger.error("File reference note should mention both files")
675 | return False
676 |
677 | self.logger.info(" ✅ Intermediate step with continuation correctly uses reference_only")
678 |
679 | # Test 3: Final step - should embed files for expert analysis
680 | self.logger.info(" 1.5.3: Final step (should embed files)")
681 | response3, _ = self.call_mcp_tool(
682 | "debug",
683 | {
684 | "step": "Investigation complete - identified the root cause",
685 | "step_number": 3,
686 | "total_steps": 3,
687 | "next_step_required": False, # Final step - should embed files
688 | "continuation_id": continuation_id,
689 | "findings": "Root cause: validator is rejecting valid data due to strict type checking",
690 | "files_checked": [file1, file2],
691 | "relevant_files": [file1, file2], # Should be fully embedded
692 | "relevant_context": ["process_data", "validate_input"],
693 | "hypothesis": "Validation logic is too restrictive for valid edge cases",
694 | "confidence": "high",
695 | "model": "flash",
696 | },
697 | )
698 |
699 | if not response3:
700 | self.logger.error("Failed to complete to final step")
701 | return False
702 |
703 | response3_data = self._parse_debug_response(response3)
704 | if not response3_data:
705 | return False
706 |
707 | # Check file context - should be fully_embedded for final step
708 | file_context3 = response3_data.get("file_context", {})
709 | if file_context3.get("type") != "fully_embedded":
710 | self.logger.error(
711 | f"Expected fully_embedded file context for final step, got: {file_context3.get('type')}"
712 | )
713 | return False
714 |
715 | if "Full file content embedded for expert analysis" not in file_context3.get("context_optimization", ""):
716 | self.logger.error("Expected expert analysis optimization message for fully_embedded")
717 | return False
718 |
719 | # Should show files embedded count
720 | files_embedded = file_context3.get("files_embedded", 0)
721 | if files_embedded == 0:
722 | # This is OK - files might already be in conversation history
723 | self.logger.info(
724 | " ℹ️ Files embedded count is 0 - files already in conversation history (smart deduplication)"
725 | )
726 | else:
727 | self.logger.info(f" ✅ Files embedded count: {files_embedded}")
728 |
729 | self.logger.info(" ✅ Final step correctly uses fully_embedded file context")
730 |
731 | # Verify expert analysis was called for final step
732 | if response3_data.get("status") != "calling_expert_analysis":
733 | self.logger.error("Final step should trigger expert analysis")
734 | return False
735 |
736 | if "expert_analysis" not in response3_data:
737 | self.logger.error("Expert analysis should be present in final step")
738 | return False
739 |
740 | self.logger.info(" ✅ Context-aware file embedding test completed successfully")
741 | return True
742 |
743 | except Exception as e:
744 | self.logger.error(f"Context-aware file embedding test failed: {e}")
745 | return False
746 |
747 | def _test_multi_step_file_context(self) -> bool:
748 | """Test multi-step workflow with proper file context transitions"""
749 | try:
750 | self.logger.info(" 1.6: Testing multi-step file context optimization")
751 |
752 | # Create a complex scenario with multiple files
753 | config_content = """#!/usr/bin/env python3
754 | import os
755 |
756 | DATABASE_URL = os.getenv('DATABASE_URL', 'sqlite:///app.db')
757 | DEBUG_MODE = os.getenv('DEBUG', 'False').lower() == 'true'
758 | MAX_CONNECTIONS = int(os.getenv('MAX_CONNECTIONS', '10'))
759 |
760 | # Bug: This will cause issues when MAX_CONNECTIONS is not a valid integer
761 | CACHE_SIZE = MAX_CONNECTIONS * 2 # Problematic if MAX_CONNECTIONS is invalid
762 | """
763 |
764 | server_content = """#!/usr/bin/env python3
765 | from config import DATABASE_URL, DEBUG_MODE, CACHE_SIZE
766 | import sqlite3
767 |
768 | class DatabaseServer:
769 | def __init__(self):
770 | self.connection_pool = []
771 | self.cache_size = CACHE_SIZE # This will fail if CACHE_SIZE is invalid
772 |
773 | def connect(self):
774 | try:
775 | conn = sqlite3.connect(DATABASE_URL)
776 | self.connection_pool.append(conn)
777 | return conn
778 | except Exception as e:
779 | print(f"Connection failed: {e}")
780 | return None
781 | """
782 |
783 | # Create test files
784 | config_file = self.create_additional_test_file("config.py", config_content)
785 | server_file = self.create_additional_test_file("database_server.py", server_content)
786 |
787 | # Step 1: Start investigation (new conversation)
788 | self.logger.info(" 1.6.1: Step 1 - Start investigation")
789 | response1, continuation_id = self.call_mcp_tool(
790 | "debug",
791 | {
792 | "step": "Investigating application startup failures in production environment",
793 | "step_number": 1,
794 | "total_steps": 4,
795 | "next_step_required": True,
796 | "findings": "Application fails to start with configuration errors",
797 | "files_checked": [config_file],
798 | "relevant_files": [config_file],
799 | "relevant_context": [],
800 | "hypothesis": "Configuration issue causing startup failure",
801 | "confidence": "low",
802 | "model": "flash",
803 | },
804 | )
805 |
806 | if not response1 or not continuation_id:
807 | self.logger.error("Failed to start multi-step file context test")
808 | return False
809 |
810 | response1_data = self._parse_debug_response(response1)
811 |
812 | # Validate step 1 - should use reference_only
813 | file_context1 = response1_data.get("file_context", {})
814 | if file_context1.get("type") != "reference_only":
815 | self.logger.error("Step 1 should use reference_only file context")
816 | return False
817 |
818 | self.logger.info(" ✅ Step 1: reference_only file context")
819 |
820 | # Step 2: Expand investigation
821 | self.logger.info(" 1.6.2: Step 2 - Expand investigation")
822 | response2, _ = self.call_mcp_tool(
823 | "debug",
824 | {
825 | "step": "Found configuration issue - investigating database server initialization",
826 | "step_number": 2,
827 | "total_steps": 4,
828 | "next_step_required": True,
829 | "continuation_id": continuation_id,
830 | "findings": "MAX_CONNECTIONS environment variable contains invalid value, causing CACHE_SIZE calculation to fail",
831 | "files_checked": [config_file, server_file],
832 | "relevant_files": [config_file, server_file],
833 | "relevant_context": ["DatabaseServer.__init__"],
834 | "hypothesis": "Invalid environment variable causing integer conversion error",
835 | "confidence": "medium",
836 | "model": "flash",
837 | },
838 | )
839 |
840 | if not response2:
841 | self.logger.error("Failed to continue to step 2")
842 | return False
843 |
844 | response2_data = self._parse_debug_response(response2)
845 |
846 | # Validate step 2 - should still use reference_only
847 | file_context2 = response2_data.get("file_context", {})
848 | if file_context2.get("type") != "reference_only":
849 | self.logger.error("Step 2 should use reference_only file context")
850 | return False
851 |
852 | # Should reference both files
853 | reference_note = file_context2.get("note", "")
854 | if "config.py" not in reference_note or "database_server.py" not in reference_note:
855 | self.logger.error("Step 2 should reference both files in note")
856 | return False
857 |
858 | self.logger.info(" ✅ Step 2: reference_only file context with multiple files")
859 |
860 | # Step 3: Deep analysis
861 | self.logger.info(" 1.6.3: Step 3 - Deep analysis")
862 | response3, _ = self.call_mcp_tool(
863 | "debug",
864 | {
865 | "step": "Analyzing the exact error propagation path and impact",
866 | "step_number": 3,
867 | "total_steps": 4,
868 | "next_step_required": True,
869 | "continuation_id": continuation_id,
870 | "findings": "Error occurs in config.py line 8 when MAX_CONNECTIONS is not numeric, then propagates to DatabaseServer.__init__",
871 | "files_checked": [config_file, server_file],
872 | "relevant_files": [config_file, server_file],
873 | "relevant_context": ["DatabaseServer.__init__"],
874 | "hypothesis": "Need proper error handling and validation for environment variables",
875 | "confidence": "high",
876 | "model": "flash",
877 | },
878 | )
879 |
880 | if not response3:
881 | self.logger.error("Failed to continue to step 3")
882 | return False
883 |
884 | response3_data = self._parse_debug_response(response3)
885 |
886 | # Validate step 3 - should still use reference_only
887 | file_context3 = response3_data.get("file_context", {})
888 | if file_context3.get("type") != "reference_only":
889 | self.logger.error("Step 3 should use reference_only file context")
890 | return False
891 |
892 | self.logger.info(" ✅ Step 3: reference_only file context")
893 |
894 | # Step 4: Final analysis with expert consultation
895 | self.logger.info(" 1.6.4: Step 4 - Final step with expert analysis")
896 | response4, _ = self.call_mcp_tool(
897 | "debug",
898 | {
899 | "step": "Investigation complete - root cause identified with solution",
900 | "step_number": 4,
901 | "total_steps": 4,
902 | "next_step_required": False, # Final step - should embed files
903 | "continuation_id": continuation_id,
904 | "findings": "Root cause: config.py assumes MAX_CONNECTIONS env var is always a valid integer. Fix: add try/except with default value and proper validation.",
905 | "files_checked": [config_file, server_file],
906 | "relevant_files": [config_file, server_file],
907 | "relevant_context": ["DatabaseServer.__init__"],
908 | "hypothesis": "Environment variable validation needed with proper error handling",
909 | "confidence": "high",
910 | "model": "flash",
911 | },
912 | )
913 |
914 | if not response4:
915 | self.logger.error("Failed to complete to final step")
916 | return False
917 |
918 | response4_data = self._parse_debug_response(response4)
919 |
920 | # Validate step 4 - should use fully_embedded for expert analysis
921 | file_context4 = response4_data.get("file_context", {})
922 | if file_context4.get("type") != "fully_embedded":
923 | self.logger.error("Step 4 (final) should use fully_embedded file context")
924 | return False
925 |
926 | if "expert analysis" not in file_context4.get("context_optimization", "").lower():
927 | self.logger.error("Final step should mention expert analysis in context optimization")
928 | return False
929 |
930 | # Verify expert analysis was triggered
931 | if response4_data.get("status") != "calling_expert_analysis":
932 | self.logger.error("Final step should trigger expert analysis")
933 | return False
934 |
935 | # Check that expert analysis has file context
936 | expert_analysis = response4_data.get("expert_analysis", {})
937 | if not expert_analysis:
938 | self.logger.error("Expert analysis should be present in final step")
939 | return False
940 |
941 | self.logger.info(" ✅ Step 4: fully_embedded file context with expert analysis")
942 |
943 | # Validate the complete workflow progression
944 | progression_summary = {
945 | "step_1": "reference_only (new conversation, intermediate)",
946 | "step_2": "reference_only (continuation, intermediate)",
947 | "step_3": "reference_only (continuation, intermediate)",
948 | "step_4": "fully_embedded (continuation, final)",
949 | }
950 |
951 | self.logger.info(" 📋 File context progression:")
952 | for step, context_type in progression_summary.items():
953 | self.logger.info(f" {step}: {context_type}")
954 |
955 | self.logger.info(" ✅ Multi-step file context optimization test completed successfully")
956 | return True
957 |
958 | except Exception as e:
959 | self.logger.error(f"Multi-step file context test failed: {e}")
960 | return False
961 |
```
--------------------------------------------------------------------------------
/tools/simple/base.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Base class for simple MCP tools.
3 |
4 | Simple tools follow a straightforward pattern:
5 | 1. Receive request
6 | 2. Prepare prompt (with absolute file paths, context, etc.)
7 | 3. Call AI model
8 | 4. Format and return response
9 |
10 | They use the shared SchemaBuilder for consistent schema generation
11 | and inherit all the conversation, file processing, and model handling
12 | capabilities from BaseTool.
13 | """
14 |
15 | from abc import abstractmethod
16 | from typing import Any, Optional
17 |
18 | from tools.shared.base_models import ToolRequest
19 | from tools.shared.base_tool import BaseTool
20 | from tools.shared.exceptions import ToolExecutionError
21 | from tools.shared.schema_builders import SchemaBuilder
22 |
23 |
24 | class SimpleTool(BaseTool):
25 | """
26 | Base class for simple (non-workflow) tools.
27 |
28 | Simple tools are request/response tools that don't require multi-step workflows.
29 | They benefit from:
30 | - Automatic schema generation using SchemaBuilder
31 | - Inherited conversation handling and file processing
32 | - Standardized model integration
33 | - Consistent error handling and response formatting
34 |
35 | To create a simple tool:
36 | 1. Inherit from SimpleTool
37 | 2. Implement get_tool_fields() to define tool-specific fields
38 | 3. Implement prepare_prompt() for prompt preparation
39 | 4. Optionally override format_response() for custom formatting
40 | 5. Optionally override get_required_fields() for custom requirements
41 |
42 | Example:
43 | class ChatTool(SimpleTool):
44 | def get_name(self) -> str:
45 | return "chat"
46 |
47 |             def get_tool_fields(self) -> dict[str, dict[str, Any]]:
48 | return {
49 | "prompt": {
50 | "type": "string",
51 | "description": "Your question or idea...",
52 | },
53 | "absolute_file_paths": SimpleTool.FILES_FIELD,
54 | }
55 |
56 |             def get_required_fields(self) -> list[str]:
57 | return ["prompt"]
58 | """
59 |
60 | # Common field definitions that simple tools can reuse
61 | FILES_FIELD = SchemaBuilder.SIMPLE_FIELD_SCHEMAS["absolute_file_paths"]
62 | IMAGES_FIELD = SchemaBuilder.COMMON_FIELD_SCHEMAS["images"]
63 |
64 | @abstractmethod
65 | def get_tool_fields(self) -> dict[str, dict[str, Any]]:
66 | """
67 | Return tool-specific field definitions.
68 |
69 | This method should return a dictionary mapping field names to their
70 | JSON schema definitions. Common fields (model, temperature, etc.)
71 | are added automatically by the base class.
72 |
73 | Returns:
74 | Dict mapping field names to JSON schema objects
75 |
76 | Example:
77 | return {
78 | "prompt": {
79 | "type": "string",
80 | "description": "The user's question or request",
81 | },
82 | "absolute_file_paths": SimpleTool.FILES_FIELD, # Reuse common field
83 | "max_tokens": {
84 | "type": "integer",
85 | "minimum": 1,
86 | "description": "Maximum tokens for response",
87 | }
88 | }
89 | """
90 | pass
91 |
92 | def get_required_fields(self) -> list[str]:
93 | """
94 | Return list of required field names.
95 |
96 | Override this to specify which fields are required for your tool.
97 | The model field is automatically added if in auto mode.
98 |
99 | Returns:
100 | List of required field names
101 | """
102 | return []
103 |
104 | def get_annotations(self) -> Optional[dict[str, Any]]:
105 | """
106 | Return tool annotations. Simple tools are read-only by default.
107 |
108 | All simple tools perform operations without modifying the environment.
109 | They may call external AI models for analysis or conversation, but they
110 | don't write files or make system changes.
111 |
112 | Override this method if your simple tool needs different annotations.
113 |
114 | Returns:
115 | Dictionary with readOnlyHint set to True
116 | """
117 | return {"readOnlyHint": True}
118 |
119 | def format_response(self, response: str, request, model_info: Optional[dict] = None) -> str:
120 | """
121 | Format the AI response before returning to the client.
122 |
123 | This is a hook method that subclasses can override to customize
124 | response formatting. The default implementation returns the response as-is.
125 |
126 | Args:
127 | response: The raw response from the AI model
128 | request: The validated request object
129 | model_info: Optional model information dictionary
130 |
131 | Returns:
132 | Formatted response string
133 | """
134 | return response
135 |
136 | def get_input_schema(self) -> dict[str, Any]:
137 | """
138 | Generate the complete input schema using SchemaBuilder.
139 |
140 | This method automatically combines:
141 | - Tool-specific fields from get_tool_fields()
142 | - Common fields (temperature, thinking_mode, etc.)
143 | - Model field with proper auto-mode handling
144 | - Required fields from get_required_fields()
145 |
146 | Tools can override this method for custom schema generation while
147 | still benefiting from SimpleTool's convenience methods.
148 |
149 | Returns:
150 | Complete JSON schema for the tool
151 | """
152 | required_fields = list(self.get_required_fields())
153 | return SchemaBuilder.build_schema(
154 | tool_specific_fields=self.get_tool_fields(),
155 | required_fields=required_fields,
156 | model_field_schema=self.get_model_field_schema(),
157 | auto_mode=self.is_effective_auto_mode(),
158 | )
159 |
160 | def get_request_model(self):
161 | """
162 | Return the request model class.
163 |
164 | Simple tools use the base ToolRequest by default.
165 | Override this if your tool needs a custom request model.
166 | """
167 | return ToolRequest
168 |
169 | # Hook methods for safe attribute access without hasattr/getattr
170 |
171 | def get_request_model_name(self, request) -> Optional[str]:
172 | """Get model name from request. Override for custom model name handling."""
173 | try:
174 | return request.model
175 | except AttributeError:
176 | return None
177 |
178 | def get_request_images(self, request) -> list:
179 | """Get images from request. Override for custom image handling."""
180 | try:
181 | return request.images if request.images is not None else []
182 | except AttributeError:
183 | return []
184 |
185 | def get_request_continuation_id(self, request) -> Optional[str]:
186 | """Get continuation_id from request. Override for custom continuation handling."""
187 | try:
188 | return request.continuation_id
189 | except AttributeError:
190 | return None
191 |
192 | def get_request_prompt(self, request) -> str:
193 | """Get prompt from request. Override for custom prompt handling."""
194 | try:
195 | return request.prompt
196 | except AttributeError:
197 | return ""
198 |
199 | def get_request_temperature(self, request) -> Optional[float]:
200 | """Get temperature from request. Override for custom temperature handling."""
201 | try:
202 | return request.temperature
203 | except AttributeError:
204 | return None
205 |
206 | def get_validated_temperature(self, request, model_context: Any) -> tuple[float, list[str]]:
207 | """
208 | Get temperature from request and validate it against model constraints.
209 |
210 | This is a convenience method that combines temperature extraction and validation
211 | for simple tools. It ensures temperature is within valid range for the model.
212 |
213 | Args:
214 | request: The request object containing temperature
215 | model_context: Model context object containing model info
216 |
217 | Returns:
218 | Tuple of (validated_temperature, warning_messages)
219 | """
220 | temperature = self.get_request_temperature(request)
221 | if temperature is None:
222 | temperature = self.get_default_temperature()
223 | return self.validate_and_correct_temperature(temperature, model_context)
224 |
225 | def get_request_thinking_mode(self, request) -> Optional[str]:
226 | """Get thinking_mode from request. Override for custom thinking mode handling."""
227 | try:
228 | return request.thinking_mode
229 | except AttributeError:
230 | return None
231 |
232 | def get_request_files(self, request) -> list:
233 | """Get absolute file paths from request. Override for custom file handling."""
234 | try:
235 | files = request.absolute_file_paths
236 | except AttributeError:
237 | files = None
238 | if files is None:
239 | return []
240 | return files
241 |
242 | def get_request_as_dict(self, request) -> dict:
243 | """Convert request to dictionary. Override for custom serialization."""
244 | try:
245 | # Try Pydantic v2 method first
246 | return request.model_dump()
247 | except AttributeError:
248 | try:
249 | # Fall back to Pydantic v1 method
250 | return request.dict()
251 | except AttributeError:
252 | # Last resort - convert to dict manually
253 | return {"prompt": self.get_request_prompt(request)}
254 |
255 | def set_request_files(self, request, files: list) -> None:
256 | """Set absolute file paths on request. Override for custom file setting."""
257 | try:
258 | request.absolute_file_paths = files
259 | except AttributeError:
260 | pass
261 |
262 | def get_actually_processed_files(self) -> list:
263 | """Get actually processed files. Override for custom file tracking."""
264 | try:
265 | return self._actually_processed_files
266 | except AttributeError:
267 | return []
268 |
269 | async def execute(self, arguments: dict[str, Any]) -> list:
270 | """
271 | Execute the simple tool using the comprehensive flow from old base.py.
272 |
273 | This method replicates the proven execution pattern while using SimpleTool hooks.
274 | """
275 | import logging
276 |
277 | from mcp.types import TextContent
278 |
279 | from tools.models import ToolOutput
280 |
281 | logger = logging.getLogger(f"tools.{self.get_name()}")
282 |
283 | try:
284 | # Store arguments for access by helper methods
285 | self._current_arguments = arguments
286 |
287 | logger.info(f"🔧 {self.get_name()} tool called with arguments: {list(arguments.keys())}")
288 |
289 | # Validate request using the tool's Pydantic model
290 | request_model = self.get_request_model()
291 | request = request_model(**arguments)
292 | logger.debug(f"Request validation successful for {self.get_name()}")
293 |
294 | # Validate file paths for security
295 | # This prevents path traversal attacks and ensures proper access control
296 | path_error = self._validate_file_paths(request)
297 | if path_error:
298 | error_output = ToolOutput(
299 | status="error",
300 | content=path_error,
301 | content_type="text",
302 | )
303 | logger.error("Path validation failed for %s: %s", self.get_name(), path_error)
304 | raise ToolExecutionError(error_output.model_dump_json())
305 |
306 | # Handle model resolution like old base.py
307 | model_name = self.get_request_model_name(request)
308 | if not model_name:
309 | from config import DEFAULT_MODEL
310 |
311 | model_name = DEFAULT_MODEL
312 |
313 | # Store the current model name for later use
314 | self._current_model_name = model_name
315 |
316 | # Handle model context from arguments (for in-process testing)
317 | if "_model_context" in arguments:
318 | self._model_context = arguments["_model_context"]
319 | logger.debug(f"{self.get_name()}: Using model context from arguments")
320 | else:
321 | # Create model context if not provided
322 | from utils.model_context import ModelContext
323 |
324 | self._model_context = ModelContext(model_name)
325 | logger.debug(f"{self.get_name()}: Created model context for {model_name}")
326 |
327 | # Get images if present
328 | images = self.get_request_images(request)
329 | continuation_id = self.get_request_continuation_id(request)
330 |
331 | # Handle conversation history and prompt preparation
332 | if continuation_id:
333 | # Check if conversation history is already embedded
334 | field_value = self.get_request_prompt(request)
335 | if "=== CONVERSATION HISTORY ===" in field_value:
336 | # Use pre-embedded history
337 | prompt = field_value
338 | logger.debug(f"{self.get_name()}: Using pre-embedded conversation history")
339 | else:
340 | # No embedded history - reconstruct it (for in-process calls)
341 | logger.debug(f"{self.get_name()}: No embedded history found, reconstructing conversation")
342 |
343 | # Get thread context
344 | from utils.conversation_memory import add_turn, build_conversation_history, get_thread
345 |
346 | thread_context = get_thread(continuation_id)
347 |
348 | if thread_context:
349 | # Add user's new input to conversation
350 | user_prompt = self.get_request_prompt(request)
351 | user_files = self.get_request_files(request)
352 | if user_prompt:
353 | add_turn(continuation_id, "user", user_prompt, files=user_files)
354 |
355 | # Get updated thread context after adding the turn
356 | thread_context = get_thread(continuation_id)
357 | logger.debug(
358 | f"{self.get_name()}: Retrieved updated thread with {len(thread_context.turns)} turns"
359 | )
360 |
361 | # Build conversation history with updated thread context
362 | conversation_history, conversation_tokens = build_conversation_history(
363 | thread_context, self._model_context
364 | )
365 |
366 | # Get the base prompt from the tool
367 | base_prompt = await self.prepare_prompt(request)
368 |
369 | # Combine with conversation history
370 | if conversation_history:
371 | prompt = f"{conversation_history}\n\n=== NEW USER INPUT ===\n{base_prompt}"
372 | else:
373 | prompt = base_prompt
374 | else:
375 | # Thread not found, prepare normally
376 | logger.warning(f"Thread {continuation_id} not found, preparing prompt normally")
377 | prompt = await self.prepare_prompt(request)
378 | else:
379 | # New conversation, prepare prompt normally
380 | prompt = await self.prepare_prompt(request)
381 |
382 | # Add follow-up instructions for new conversations
383 | from server import get_follow_up_instructions
384 |
385 | follow_up_instructions = get_follow_up_instructions(0)
386 | prompt = f"{prompt}\n\n{follow_up_instructions}"
387 | logger.debug(
388 | f"Added follow-up instructions for new {self.get_name()} conversation"
389 |                 )
390 |         if images:  # Validate images if any were provided
391 | image_validation_error = self._validate_image_limits(
392 | images, model_context=self._model_context, continuation_id=continuation_id
393 | )
394 | if image_validation_error:
395 | error_output = ToolOutput(
396 | status=image_validation_error.get("status", "error"),
397 | content=image_validation_error.get("content"),
398 | content_type=image_validation_error.get("content_type", "text"),
399 | metadata=image_validation_error.get("metadata"),
400 | )
401 | payload = error_output.model_dump_json()
402 | logger.error("Image validation failed for %s: %s", self.get_name(), payload)
403 | raise ToolExecutionError(payload)
404 |
405 | # Get and validate temperature against model constraints
406 | temperature, temp_warnings = self.get_validated_temperature(request, self._model_context)
407 |
408 | # Log any temperature corrections
409 | for warning in temp_warnings:
410 |                 logger.warning(warning)
411 |             # Get thinking mode with defaults
412 | thinking_mode = self.get_request_thinking_mode(request)
413 | if thinking_mode is None:
414 | thinking_mode = self.get_default_thinking_mode()
415 |
416 | # Get the provider from model context (clean OOP - no re-fetching)
417 | provider = self._model_context.provider
418 | capabilities = self._model_context.capabilities
419 |
420 | # Get system prompt for this tool
421 | base_system_prompt = self.get_system_prompt()
422 | capability_augmented_prompt = self._augment_system_prompt_with_capabilities(
423 | base_system_prompt, capabilities
424 | )
425 | language_instruction = self.get_language_instruction()
426 | system_prompt = language_instruction + capability_augmented_prompt
427 |
428 | # Generate AI response using the provider
429 | logger.info(f"Sending request to {provider.get_provider_type().value} API for {self.get_name()}")
430 | logger.info(
431 | f"Using model: {self._model_context.model_name} via {provider.get_provider_type().value} provider"
432 | )
433 |
434 | # Estimate tokens for logging
435 | from utils.token_utils import estimate_tokens
436 |
437 | estimated_tokens = estimate_tokens(prompt)
438 | logger.debug(f"Prompt length: {len(prompt)} characters (~{estimated_tokens:,} tokens)")
439 |
440 | # Resolve model capabilities for feature gating
441 | supports_thinking = capabilities.supports_extended_thinking
442 |
443 | # Generate content with provider abstraction
444 | model_response = provider.generate_content(
445 | prompt=prompt,
446 | model_name=self._current_model_name,
447 | system_prompt=system_prompt,
448 | temperature=temperature,
449 | thinking_mode=thinking_mode if supports_thinking else None,
450 | images=images if images else None,
451 | )
452 |
453 | logger.info(f"Received response from {provider.get_provider_type().value} API for {self.get_name()}")
454 |
455 | # Process the model's response
456 | if model_response.content:
457 | raw_text = model_response.content
458 |
459 | # Create model info for conversation tracking
460 | model_info = {
461 | "provider": provider,
462 | "model_name": self._current_model_name,
463 | "model_response": model_response,
464 | }
465 |
466 | # Parse response using the same logic as old base.py
467 | tool_output = self._parse_response(raw_text, request, model_info)
468 | logger.info(f"✅ {self.get_name()} tool completed successfully")
469 |
470 | else:
471 | # Handle cases where the model couldn't generate a response
472 | metadata = model_response.metadata or {}
473 | finish_reason = metadata.get("finish_reason", "Unknown")
474 |
475 | if metadata.get("is_blocked_by_safety"):
476 | # Specific handling for content safety blocks
477 | safety_details = metadata.get("safety_feedback") or "details not provided"
478 | logger.warning(
479 | f"Response blocked by content safety policy for {self.get_name()}. "
480 | f"Reason: {finish_reason}, Details: {safety_details}"
481 | )
482 | tool_output = ToolOutput(
483 | status="error",
484 | content="Your request was blocked by the content safety policy. "
485 | "Please try modifying your prompt.",
486 | content_type="text",
487 | )
488 | else:
489 | # Handle other empty responses - could be legitimate completion or unclear blocking
490 | if finish_reason == "STOP":
491 | # Model completed normally but returned empty content - retry with clarification
492 | logger.info(
493 | f"Model completed with empty response for {self.get_name()}, retrying with clarification"
494 | )
495 |
496 | # Retry the same request with modified prompt asking for explicit response
497 | original_prompt = prompt
498 | retry_prompt = f"{original_prompt}\n\nIMPORTANT: Please provide a substantive response. If you cannot respond to the above request, please explain why and suggest alternatives."
499 |
500 | try:
501 | retry_response = provider.generate_content(
502 | prompt=retry_prompt,
503 | model_name=self._current_model_name,
504 | system_prompt=system_prompt,
505 | temperature=temperature,
506 | thinking_mode=thinking_mode if supports_thinking else None,
507 | images=images if images else None,
508 | )
509 |
510 | if retry_response.content:
511 | # Successful retry - use the retry response
512 | logger.info(f"Retry successful for {self.get_name()}")
513 | raw_text = retry_response.content
514 |
515 | # Update model info for the successful retry
516 | model_info = {
517 | "provider": provider,
518 | "model_name": self._current_model_name,
519 | "model_response": retry_response,
520 | }
521 |
522 | # Parse the retry response
523 | tool_output = self._parse_response(raw_text, request, model_info)
524 | logger.info(f"✅ {self.get_name()} tool completed successfully after retry")
525 | else:
526 | # Retry also failed - inspect metadata to find out why
527 | retry_metadata = retry_response.metadata or {}
528 | if retry_metadata.get("is_blocked_by_safety"):
529 | # The retry was blocked by safety filters
530 | safety_details = retry_metadata.get("safety_feedback") or "details not provided"
531 | logger.warning(
532 | f"Retry for {self.get_name()} was blocked by content safety policy. "
533 | f"Details: {safety_details}"
534 | )
535 | tool_output = ToolOutput(
536 | status="error",
537 | content="Your request was also blocked by the content safety policy after a retry. "
538 | "Please try rephrasing your prompt significantly.",
539 | content_type="text",
540 | )
541 | else:
542 | # Retry failed for other reasons (e.g., another STOP)
543 | tool_output = ToolOutput(
544 | status="error",
545 | content="The model repeatedly returned empty responses. This may indicate content filtering or a model issue.",
546 | content_type="text",
547 | )
548 | except Exception as retry_error:
549 | logger.warning(f"Retry failed for {self.get_name()}: {retry_error}")
550 | tool_output = ToolOutput(
551 | status="error",
552 | content=f"Model returned empty response and retry failed: {str(retry_error)}",
553 | content_type="text",
554 | )
555 | else:
556 | # Non-STOP finish reasons are likely actual errors
557 | logger.warning(
558 | f"Response blocked or incomplete for {self.get_name()}. Finish reason: {finish_reason}"
559 | )
560 | tool_output = ToolOutput(
561 | status="error",
562 | content=f"Response blocked or incomplete. Finish reason: {finish_reason}",
563 | content_type="text",
564 | )
565 |
566 | # Return the tool output as TextContent, marking protocol errors appropriately
567 | payload = tool_output.model_dump_json()
568 | if tool_output.status == "error":
569 | logger.error("%s reported error status - raising ToolExecutionError", self.get_name())
570 | raise ToolExecutionError(payload)
571 | return [TextContent(type="text", text=payload)]
572 |
573 | except ToolExecutionError:
574 | raise
575 | except Exception as e:
576 | # Special handling for MCP size check errors
577 | if str(e).startswith("MCP_SIZE_CHECK:"):
578 | # Extract the JSON content after the prefix
579 | json_content = str(e)[len("MCP_SIZE_CHECK:") :]
580 | raise ToolExecutionError(json_content)
581 |
582 | logger.error(f"Error in {self.get_name()}: {str(e)}")
583 | error_output = ToolOutput(
584 | status="error",
585 | content=f"Error in {self.get_name()}: {str(e)}",
586 | content_type="text",
587 | )
588 | raise ToolExecutionError(error_output.model_dump_json()) from e
589 |
590 | def _parse_response(self, raw_text: str, request, model_info: Optional[dict] = None):
591 | """
592 | Parse the raw response and format it using the hook method.
593 |
594 | This simplified version focuses on the SimpleTool pattern: format the response
595 | using the format_response hook, then handle conversation continuation.
596 | """
597 | from tools.models import ToolOutput
598 |
599 | # Format the response using the hook method
600 | formatted_response = self.format_response(raw_text, request, model_info)
601 |
602 | # Handle conversation continuation like old base.py
603 | continuation_id = self.get_request_continuation_id(request)
604 | if continuation_id:
605 | self._record_assistant_turn(continuation_id, raw_text, request, model_info)
606 |
607 | # Create continuation offer like old base.py
608 | continuation_data = self._create_continuation_offer(request, model_info)
609 | if continuation_data:
610 | return self._create_continuation_offer_response(formatted_response, continuation_data, request, model_info)
611 | else:
612 | # Build metadata with model and provider info for success response
613 | metadata = {}
614 | if model_info:
615 | model_name = model_info.get("model_name")
616 | if model_name:
617 | metadata["model_used"] = model_name
618 | provider = model_info.get("provider")
619 | if provider:
620 | # Handle both provider objects and string values
621 | if isinstance(provider, str):
622 | metadata["provider_used"] = provider
623 | else:
624 | try:
625 | metadata["provider_used"] = provider.get_provider_type().value
626 | except AttributeError:
627 | # Fallback if provider doesn't have get_provider_type method
628 | metadata["provider_used"] = str(provider)
629 |
630 | return ToolOutput(
631 | status="success",
632 | content=formatted_response,
633 | content_type="text",
634 | metadata=metadata if metadata else None,
635 | )
636 |
637 | def _create_continuation_offer(self, request, model_info: Optional[dict] = None):
638 | """Create continuation offer following old base.py pattern"""
639 | continuation_id = self.get_request_continuation_id(request)
640 |
641 | try:
642 | from utils.conversation_memory import create_thread, get_thread
643 |
644 | if continuation_id:
645 | # Existing conversation
646 | thread_context = get_thread(continuation_id)
647 | if thread_context and thread_context.turns:
648 | turn_count = len(thread_context.turns)
649 | from utils.conversation_memory import MAX_CONVERSATION_TURNS
650 |
651 | if turn_count >= MAX_CONVERSATION_TURNS - 1:
652 | return None # No more turns allowed
653 |
654 | remaining_turns = MAX_CONVERSATION_TURNS - turn_count - 1
655 | return {
656 | "continuation_id": continuation_id,
657 | "remaining_turns": remaining_turns,
658 | "note": f"You can continue this conversation for {remaining_turns} more exchanges.",
659 | }
660 | else:
661 | # New conversation - create thread and offer continuation
662 | # Convert request to dict for initial_context
663 | initial_request_dict = self.get_request_as_dict(request)
664 |
665 | new_thread_id = create_thread(tool_name=self.get_name(), initial_request=initial_request_dict)
666 |
667 | # Add the initial user turn to the new thread
668 | from utils.conversation_memory import MAX_CONVERSATION_TURNS, add_turn
669 |
670 | user_prompt = self.get_request_prompt(request)
671 | user_files = self.get_request_files(request)
672 | user_images = self.get_request_images(request)
673 |
674 | # Add user's initial turn
675 | add_turn(
676 | new_thread_id, "user", user_prompt, files=user_files, images=user_images, tool_name=self.get_name()
677 | )
678 |
679 | return {
680 | "continuation_id": new_thread_id,
681 | "remaining_turns": MAX_CONVERSATION_TURNS - 1,
682 | "note": f"You can continue this conversation for {MAX_CONVERSATION_TURNS - 1} more exchanges.",
683 | }
684 | except Exception:
685 | return None
686 |
687 | def _create_continuation_offer_response(
688 | self, content: str, continuation_data: dict, request, model_info: Optional[dict] = None
689 | ):
690 | """Create response with continuation offer following old base.py pattern"""
691 | from tools.models import ContinuationOffer, ToolOutput
692 |
693 | try:
694 | if not self.get_request_continuation_id(request):
695 | self._record_assistant_turn(
696 | continuation_data["continuation_id"],
697 | content,
698 | request,
699 | model_info,
700 | )
701 |
702 | continuation_offer = ContinuationOffer(
703 | continuation_id=continuation_data["continuation_id"],
704 | note=continuation_data["note"],
705 | remaining_turns=continuation_data["remaining_turns"],
706 | )
707 |
708 | # Build metadata with model and provider info
709 | metadata = {"tool_name": self.get_name(), "conversation_ready": True}
710 | if model_info:
711 | model_name = model_info.get("model_name")
712 | if model_name:
713 | metadata["model_used"] = model_name
714 | provider = model_info.get("provider")
715 | if provider:
716 | # Handle both provider objects and string values
717 | if isinstance(provider, str):
718 | metadata["provider_used"] = provider
719 | else:
720 | try:
721 | metadata["provider_used"] = provider.get_provider_type().value
722 | except AttributeError:
723 | # Fallback if provider doesn't have get_provider_type method
724 | metadata["provider_used"] = str(provider)
725 |
726 | return ToolOutput(
727 | status="continuation_available",
728 | content=content,
729 | content_type="text",
730 | continuation_offer=continuation_offer,
731 | metadata=metadata,
732 | )
733 | except Exception:
734 | # Fallback to simple success if continuation offer fails
735 | return ToolOutput(status="success", content=content, content_type="text")
736 |
737 | def _record_assistant_turn(
738 | self, continuation_id: str, response_text: str, request, model_info: Optional[dict]
739 | ) -> None:
740 | """Persist an assistant response in conversation memory."""
741 |
742 | if not continuation_id:
743 | return
744 |
745 | from utils.conversation_memory import add_turn
746 |
747 | model_provider = None
748 | model_name = None
749 | model_metadata = None
750 |
751 | if model_info:
752 | provider = model_info.get("provider")
753 | if provider:
754 | if isinstance(provider, str):
755 | model_provider = provider
756 | else:
757 | try:
758 | model_provider = provider.get_provider_type().value
759 | except AttributeError:
760 | model_provider = str(provider)
761 | model_name = model_info.get("model_name")
762 | model_response = model_info.get("model_response")
763 | if model_response:
764 | model_metadata = {"usage": model_response.usage, "metadata": model_response.metadata}
765 |
766 | add_turn(
767 | continuation_id,
768 | "assistant",
769 | response_text,
770 | files=self.get_request_files(request),
771 | images=self.get_request_images(request),
772 | tool_name=self.get_name(),
773 | model_provider=model_provider,
774 | model_name=model_name,
775 | model_metadata=model_metadata,
776 | )
777 |
778 | # Convenience methods for common tool patterns
779 |
780 | def build_standard_prompt(
781 | self, system_prompt: str, user_content: str, request, file_context_title: str = "CONTEXT FILES"
782 | ) -> str:
783 | """
784 | Build a standard prompt with system prompt, user content, and optional files.
785 |
786 | This is a convenience method that handles the common pattern of:
787 |         1. Checking token limits against the raw user input
788 |         2. Adding file content if present
789 | 3. Adding web search instructions
790 | 4. Combining everything into a well-formatted prompt
791 |
792 | Args:
793 | system_prompt: The system prompt for the tool
794 | user_content: The main user request/content
795 | request: The validated request object
796 | file_context_title: Title for the file context section
797 |
798 | Returns:
799 | Complete formatted prompt ready for the AI model
800 | """
801 | # Check size limits against raw user input before enriching with internal context
802 | content_to_validate = self.get_prompt_content_for_size_validation(user_content)
803 | self._validate_token_limit(content_to_validate, "Content")
804 |
805 | # Add context files if provided (does not affect MCP boundary enforcement)
806 | files = self.get_request_files(request)
807 | if files:
808 | file_content, processed_files = self._prepare_file_content_for_prompt(
809 | files,
810 | self.get_request_continuation_id(request),
811 | "Context files",
812 | model_context=getattr(self, "_model_context", None),
813 | )
814 | self._actually_processed_files = processed_files
815 | if file_content:
816 |                 user_content = f"{user_content}\n\n=== {file_context_title} ===\n{file_content}\n=== END CONTEXT ==="
817 |
818 | # Add standardized web search guidance
819 | websearch_instruction = self.get_websearch_instruction(self.get_websearch_guidance())
820 |
821 | # Combine system prompt with user content
822 | full_prompt = f"""{system_prompt}{websearch_instruction}
823 |
824 | === USER REQUEST ===
825 | {user_content}
826 | === END REQUEST ===
827 |
828 | Please provide a thoughtful, comprehensive response:"""
829 |
830 | return full_prompt
831 |
832 | def get_prompt_content_for_size_validation(self, user_content: str) -> str:
833 | """
834 | Override to use original user prompt for size validation when conversation history is embedded.
835 |
836 | When server.py embeds conversation history into the prompt field, it also stores
837 | the original user prompt in _original_user_prompt. We use that for size validation
838 | to avoid incorrectly triggering size limits due to conversation history.
839 |
840 | Args:
841 | user_content: The user content (may include conversation history)
842 |
843 | Returns:
844 | The original user prompt if available, otherwise the full user content
845 | """
846 | # Check if we have the current arguments from execute() method
847 | current_args = getattr(self, "_current_arguments", None)
848 | if current_args:
849 | # If server.py embedded conversation history, it stores original prompt separately
850 | original_user_prompt = current_args.get("_original_user_prompt")
851 | if original_user_prompt is not None:
852 | # Use original user prompt for size validation (excludes conversation history)
853 | return original_user_prompt
854 |
855 | # Fallback to default behavior (validate full user content)
856 | return user_content
857 |
858 | def get_websearch_guidance(self) -> Optional[str]:
859 | """
860 | Return tool-specific web search guidance.
861 |
862 | Override this to provide tool-specific guidance for when web searches
863 | would be helpful. Return None to use the default guidance.
864 |
865 | Returns:
866 | Tool-specific web search guidance or None for default
867 | """
868 | return None
869 |
870 | def handle_prompt_file_with_fallback(self, request) -> str:
871 | """
872 | Handle prompt.txt files with fallback to request field.
873 |
874 | This is a convenience method for tools that accept prompts either
875 | as a field or as a prompt.txt file. It handles the extraction
876 | and validation automatically.
877 |
878 | Args:
879 | request: The validated request object
880 |
881 | Returns:
882 | The effective prompt content
883 |
884 | Raises:
885 | ValueError: If prompt is too large for MCP transport
886 | """
887 | # Check for prompt.txt in provided absolute file paths
888 | files = self.get_request_files(request)
889 | if files:
890 | prompt_content, updated_files = self.handle_prompt_file(files)
891 |
892 | # Update request files list if needed
893 | if updated_files is not None:
894 | self.set_request_files(request, updated_files)
895 | else:
896 | prompt_content = None
897 |
898 | # Use prompt.txt content if available, otherwise use the prompt field
899 | user_content = prompt_content if prompt_content else self.get_request_prompt(request)
900 |
901 | # Check user input size at MCP transport boundary (excluding conversation history)
902 | validation_content = self.get_prompt_content_for_size_validation(user_content)
903 | size_check = self.check_prompt_size(validation_content)
904 | if size_check:
905 | from tools.models import ToolOutput
906 |
907 | raise ValueError(f"MCP_SIZE_CHECK:{ToolOutput(**size_check).model_dump_json()}")
908 |
909 | return user_content
910 |
911 | def get_chat_style_websearch_guidance(self) -> str:
912 | """
913 | Get Chat tool-style web search guidance.
914 |
915 | Returns web search guidance that matches the original Chat tool pattern.
916 | This is useful for tools that want to maintain the same search behavior.
917 |
918 | Returns:
919 | Web search guidance text
920 | """
921 | return """When discussing topics, consider if searches for these would help:
922 | - Documentation for any technologies or concepts mentioned
923 | - Current best practices and patterns
924 | - Recent developments or updates
925 | - Community discussions and solutions"""
926 |
927 | def supports_custom_request_model(self) -> bool:
928 | """
929 | Indicate whether this tool supports custom request models.
930 |
931 |         By default this reports True when get_request_model() returns something
932 |         other than the base ToolRequest. Tools with more specialized detection
933 |         needs can override this method directly.
934 |
935 | Returns:
936 | True if the tool uses a custom request model
937 | """
938 | return self.get_request_model() != ToolRequest
939 |
940 | def _validate_file_paths(self, request) -> Optional[str]:
941 | """
942 | Validate that all file paths in the request are absolute paths.
943 |
944 | This is a security measure to prevent path traversal attacks and ensure
945 |         proper access control. All file paths must be absolute (as reported by os.path.isabs).
946 |
947 | Args:
948 | request: The validated request object
949 |
950 | Returns:
951 | Optional[str]: Error message if validation fails, None if all paths are valid
952 | """
953 | import os
954 |
955 | # Check if request has absolute file paths attribute (legacy tools may still provide 'files')
956 | files = self.get_request_files(request)
957 | if files:
958 | for file_path in files:
959 | if not os.path.isabs(file_path):
960 | return (
961 | f"Error: All file paths must be FULL absolute paths to real files / folders - DO NOT SHORTEN. "
962 | f"Received relative path: {file_path}\n"
963 |                         f"Please provide the full absolute path starting with '/' - DO NOT SHORTEN."
964 | )
965 |
966 | return None
967 |
968 | def prepare_chat_style_prompt(self, request, system_prompt: str = None) -> str:
969 | """
970 | Prepare a prompt using Chat tool-style patterns.
971 |
972 | This convenience method replicates the Chat tool's prompt preparation logic:
973 | 1. Handle prompt.txt file if present
974 | 2. Add file context with specific formatting
975 | 3. Add web search guidance
976 | 4. Format with system prompt
977 |
978 | Args:
979 | request: The validated request object
980 | system_prompt: System prompt to use (uses get_system_prompt() if None)
981 |
982 | Returns:
983 | Complete formatted prompt
984 | """
985 | # Use provided system prompt or get from tool
986 | if system_prompt is None:
987 | system_prompt = self.get_system_prompt()
988 |
989 | # Get user content (handles prompt.txt files)
990 | user_content = self.handle_prompt_file_with_fallback(request)
991 |
992 | # Build standard prompt with Chat-style web search guidance
993 | websearch_guidance = self.get_chat_style_websearch_guidance()
994 |
995 | # Override the websearch guidance temporarily
996 | original_guidance = self.get_websearch_guidance
997 | self.get_websearch_guidance = lambda: websearch_guidance
998 |
999 | try:
1000 | full_prompt = self.build_standard_prompt(system_prompt, user_content, request, "CONTEXT FILES")
1001 | finally:
1002 | # Restore original guidance method
1003 | self.get_websearch_guidance = original_guidance
1004 |
1005 |         if system_prompt:  # build_standard_prompt prepended the system prompt; return only the user-request section
1006 | marker = "\n\n=== USER REQUEST ===\n"
1007 | if marker in full_prompt:
1008 | _, user_section = full_prompt.split(marker, 1)
1009 | return f"=== USER REQUEST ===\n{user_section}"
1010 |
1011 | return full_prompt
1012 |
```
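
The metadata blocks above repeat the same normalization pattern three times (success metadata, continuation-offer metadata, and `_record_assistant_turn`): a `provider` entry may arrive either as a plain string or as a provider object exposing `get_provider_type()`. The helper below is a minimal, hypothetical sketch of that pattern for illustration only; no such function exists in the file.

```python
from typing import Optional


def resolve_provider_name(provider) -> Optional[str]:
    """Normalize a provider entry that may be a string or a provider object.

    Mirrors the inline pattern used when building response metadata:
    strings pass through, provider objects are asked for their provider
    type, and anything without get_provider_type() falls back to str().
    """
    if provider is None:
        return None
    if isinstance(provider, str):
        return provider
    try:
        return provider.get_provider_type().value
    except AttributeError:
        return str(provider)


# Illustrative values only:
assert resolve_provider_name("google") == "google"
assert resolve_provider_name(None) is None
```

Consolidating the three inline copies behind one helper like this would keep the string/object handling in a single place if the fallback behavior ever needs to change.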
--------------------------------------------------------------------------------
/simulator_tests/test_codereview_validation.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | CodeReview Tool Validation Test
4 |
5 | Tests the codereview tool's capabilities using the new workflow architecture.
6 | This validates that the workflow-based code review provides step-by-step
7 | analysis with proper investigation guidance and expert analysis integration.
8 | """
9 |
10 | import json
11 | from typing import Optional
12 |
13 | from .conversation_base_test import ConversationBaseTest
14 |
15 |
16 | class CodeReviewValidationTest(ConversationBaseTest):
17 | """Test codereview tool with new workflow architecture"""
18 |
19 | @property
20 | def test_name(self) -> str:
21 | return "codereview_validation"
22 |
23 | @property
24 | def test_description(self) -> str:
25 | return "CodeReview tool validation with new workflow architecture"
26 |
27 | def run_test(self) -> bool:
28 | """Test codereview tool capabilities"""
29 | # Set up the test environment
30 | self.setUp()
31 |
32 | try:
33 | self.logger.info("Test: CodeReviewWorkflow tool validation (new architecture)")
34 |
35 | # Create test code with various issues for review
36 | self._create_test_code_for_review()
37 |
38 | # Test 1: Single review session with multiple steps
39 | if not self._test_single_review_session():
40 | return False
41 |
42 | # Test 2: Review flow that requires refocusing
43 | if not self._test_review_refocus_flow():
44 | return False
45 |
46 | # Test 3: Complete review with expert analysis
47 | if not self._test_complete_review_with_analysis():
48 | return False
49 |
50 | # Test 4: Certain confidence behavior
51 | if not self._test_certain_confidence():
52 | return False
53 |
54 | # Test 5: Context-aware file embedding
55 | if not self._test_context_aware_file_embedding():
56 | return False
57 |
58 | # Test 6: Multi-step file context optimization
59 | if not self._test_multi_step_file_context():
60 | return False
61 |
62 | self.logger.info(" ✅ All codereview validation tests passed")
63 | return True
64 |
65 | except Exception as e:
66 | self.logger.error(f"CodeReviewWorkflow validation test failed: {e}")
67 | return False
68 |
69 | def _create_test_code_for_review(self):
70 | """Create test files with various code quality issues for review"""
71 | # Create a payment processing module with multiple issues
72 | payment_code = """#!/usr/bin/env python3
73 | import hashlib
74 | import requests
75 | import json
76 | from datetime import datetime
77 |
78 | class PaymentProcessor:
79 | def __init__(self, api_key):
80 | self.api_key = api_key # Security issue: API key stored in plain text
81 | self.base_url = "https://payment-gateway.example.com"
82 | self.session = requests.Session()
83 | self.failed_payments = [] # Performance issue: unbounded list
84 |
85 | def process_payment(self, amount, card_number, cvv, user_id):
86 | \"\"\"Process a payment transaction\"\"\"
87 | # Security issue: No input validation
88 | # Performance issue: Inefficient nested loops
89 | for attempt in range(3):
90 | for retry in range(5):
91 | try:
92 | # Security issue: Logging sensitive data
93 | print(f"Processing payment: {card_number}, CVV: {cvv}")
94 |
95 | # Over-engineering: Complex hashing that's not needed
96 | payment_hash = self._generate_complex_hash(amount, card_number, cvv, user_id, datetime.now())
97 |
98 | # Security issue: Insecure HTTP request construction
99 | url = f"{self.base_url}/charge?amount={amount}&card={card_number}&api_key={self.api_key}"
100 |
101 | response = self.session.get(url) # Security issue: using GET for sensitive data
102 |
103 | if response.status_code == 200:
104 | return {"status": "success", "hash": payment_hash}
105 | else:
106 | # Code smell: Generic exception handling without specific error types
107 | self.failed_payments.append({"amount": amount, "timestamp": datetime.now()})
108 |
109 | except Exception as e:
110 | # Code smell: Bare except clause and poor error handling
111 | print(f"Payment failed: {e}")
112 | continue
113 |
114 | return {"status": "failed"}
115 |
116 | def _generate_complex_hash(self, amount, card_number, cvv, user_id, timestamp):
117 | \"\"\"Over-engineered hash generation with unnecessary complexity\"\"\"
118 | # Over-engineering: Overly complex for no clear benefit
119 | combined = f"{amount}-{card_number}-{cvv}-{user_id}-{timestamp}"
120 |
121 | # Security issue: Weak hashing algorithm
122 | hash1 = hashlib.md5(combined.encode()).hexdigest()
123 | hash2 = hashlib.sha1(hash1.encode()).hexdigest()
124 | hash3 = hashlib.md5(hash2.encode()).hexdigest()
125 |
126 | # Performance issue: Unnecessary string operations in loop
127 | result = ""
128 | for i in range(len(hash3)):
129 | for j in range(3): # Arbitrary nested loop
130 | result += hash3[i] if i % 2 == 0 else hash3[i].upper()
131 |
132 | return result[:32] # Arbitrary truncation
133 |
134 | def get_payment_history(self, user_id):
135 | \"\"\"Get payment history - has scalability issues\"\"\"
136 | # Performance issue: No pagination, could return massive datasets
137 | # Performance issue: Inefficient algorithm O(n²)
138 | all_payments = self._fetch_all_payments() # Could be millions of records
139 | user_payments = []
140 |
141 | for payment in all_payments:
142 | for field in payment: # Unnecessary nested iteration
143 | if field == "user_id" and payment[field] == user_id:
144 | user_payments.append(payment)
145 | break
146 |
147 | return user_payments
148 |
149 | def _fetch_all_payments(self):
150 | \"\"\"Simulated method that would fetch all payments\"\"\"
151 | # Maintainability issue: Hard-coded test data
152 | return [
153 | {"user_id": 1, "amount": 100, "status": "success"},
154 | {"user_id": 2, "amount": 200, "status": "failed"},
155 | {"user_id": 1, "amount": 150, "status": "success"},
156 | ]
157 | """
158 |
159 | # Create test file with multiple issues
160 | self.payment_file = self.create_additional_test_file("payment_processor.py", payment_code)
161 | self.logger.info(f" ✅ Created test file with code issues: {self.payment_file}")
162 |
163 | # Create configuration file with additional issues
164 | config_code = """#!/usr/bin/env python3
165 | import os
166 |
167 | # Security issue: Hardcoded secrets
168 | DATABASE_PASSWORD = "admin123"
169 | SECRET_KEY = "my-secret-key-12345"
170 |
171 | # Over-engineering: Unnecessarily complex configuration class
172 | class ConfigurationManager:
173 | def __init__(self):
174 | self.config_cache = {}
175 | self.config_hierarchy = {}
176 | self.config_validators = {}
177 | self.config_transformers = {}
178 | self.config_listeners = []
179 |
180 | def get_config(self, key, default=None):
181 | # Over-engineering: Complex caching for simple config lookup
182 | if key in self.config_cache:
183 | cached_value = self.config_cache[key]
184 | if self._validate_cached_value(cached_value):
185 | return self._transform_value(key, cached_value)
186 |
187 | # Code smell: Complex nested conditionals
188 | if key in self.config_hierarchy:
189 | hierarchy = self.config_hierarchy[key]
190 | for level in hierarchy:
191 | if level == "env":
192 | value = os.getenv(key.upper(), default)
193 | elif level == "file":
194 | value = self._read_from_file(key, default)
195 | elif level == "database":
196 | value = self._read_from_database(key, default)
197 | else:
198 | value = default
199 |
200 | if value is not None:
201 | self.config_cache[key] = value
202 | return self._transform_value(key, value)
203 |
204 | return default
205 |
206 | def _validate_cached_value(self, value):
207 | # Maintainability issue: Unclear validation logic
208 | if isinstance(value, str) and len(value) > 1000:
209 | return False
210 | return True
211 |
212 | def _transform_value(self, key, value):
213 | # Code smell: Unnecessary abstraction
214 | if key in self.config_transformers:
215 | transformer = self.config_transformers[key]
216 | return transformer(value)
217 | return value
218 |
219 | def _read_from_file(self, key, default):
220 | # Maintainability issue: No error handling for file operations
221 | with open(f"/etc/app/{key}.conf") as f:
222 | return f.read().strip()
223 |
224 | def _read_from_database(self, key, default):
225 | # Performance issue: Database query for every config read
226 | # No connection pooling or caching
227 | import sqlite3
228 | conn = sqlite3.connect("config.db")
229 | cursor = conn.cursor()
230 | cursor.execute("SELECT value FROM config WHERE key = ?", (key,))
231 | result = cursor.fetchone()
232 | conn.close()
233 | return result[0] if result else default
234 | """
235 |
236 | self.config_file = self.create_additional_test_file("config.py", config_code)
237 | self.logger.info(f" ✅ Created configuration file with issues: {self.config_file}")
238 |
239 | def _test_single_review_session(self) -> bool:
240 | """Test a complete code review session with multiple steps"""
241 | try:
242 | self.logger.info(" 1.1: Testing single code review session")
243 |
244 | # Step 1: Start review
245 | self.logger.info(" 1.1.1: Step 1 - Initial review")
246 | response1, continuation_id = self.call_mcp_tool(
247 | "codereview",
248 | {
249 | "step": "I need to perform a comprehensive code review of the payment processing module. Let me start by examining the code structure and identifying potential issues.",
250 | "step_number": 1,
251 | "total_steps": 4,
252 | "next_step_required": True,
253 | "findings": "Initial examination reveals a payment processing class with potential security and performance concerns.",
254 | "files_checked": [self.payment_file],
255 | "relevant_files": [self.payment_file],
256 | "absolute_file_paths": [self.payment_file], # Required for step 1
257 | "review_type": "full",
258 | "severity_filter": "all",
259 | },
260 | )
261 |
262 | if not response1 or not continuation_id:
263 | self.logger.error("Failed to get initial review response")
264 | return False
265 |
266 | # Parse and validate JSON response
267 | response1_data = self._parse_review_response(response1)
268 | if not response1_data:
269 | return False
270 |
271 | # Validate step 1 response structure - expect pause_for_code_review for next_step_required=True
272 | if not self._validate_step_response(response1_data, 1, 4, True, "pause_for_code_review"):
273 | return False
274 |
275 | self.logger.info(f" ✅ Step 1 successful, continuation_id: {continuation_id}")
276 |
277 | # Step 2: Detailed analysis
278 | self.logger.info(" 1.1.2: Step 2 - Detailed security analysis")
279 | response2, _ = self.call_mcp_tool(
280 | "codereview",
281 | {
282 | "step": "Now performing detailed security analysis of the payment processor code to identify vulnerabilities and code quality issues.",
283 | "step_number": 2,
284 | "total_steps": 4,
285 | "next_step_required": True,
286 | "findings": "Found multiple security issues: API key stored in plain text, sensitive data logging, insecure HTTP methods, and weak hashing algorithms.",
287 | "files_checked": [self.payment_file],
288 | "relevant_files": [self.payment_file],
289 | "relevant_context": ["PaymentProcessor.__init__", "PaymentProcessor.process_payment"],
290 | "issues_found": [
291 | {"severity": "critical", "description": "API key stored in plain text in memory"},
292 | {"severity": "critical", "description": "Credit card and CVV logged in plain text"},
293 | {"severity": "high", "description": "Using GET method for sensitive payment data"},
294 | {"severity": "medium", "description": "Weak MD5 hashing algorithm used"},
295 | ],
296 | "confidence": "high",
297 | "continuation_id": continuation_id,
298 | },
299 | )
300 |
301 | if not response2:
302 | self.logger.error("Failed to continue review to step 2")
303 | return False
304 |
305 | response2_data = self._parse_review_response(response2)
306 | if not self._validate_step_response(response2_data, 2, 4, True, "pause_for_code_review"):
307 | return False
308 |
309 | # Check review status tracking
310 | review_status = response2_data.get("code_review_status", {})
311 | if review_status.get("files_checked", 0) < 1:
312 | self.logger.error("Files checked count not properly tracked")
313 | return False
314 |
315 | if review_status.get("relevant_context", 0) != 2:
316 | self.logger.error("Relevant context not properly tracked")
317 | return False
318 |
319 | # Check issues by severity
320 | issues_by_severity = review_status.get("issues_by_severity", {})
321 | if issues_by_severity.get("critical", 0) != 2:
322 | self.logger.error("Critical issues not properly tracked")
323 | return False
324 |
325 | if issues_by_severity.get("high", 0) != 1:
326 | self.logger.error("High severity issues not properly tracked")
327 | return False
328 |
329 | self.logger.info(" ✅ Step 2 successful with proper issue tracking")
330 |
331 | # Store continuation_id for next test
332 | self.review_continuation_id = continuation_id
333 | return True
334 |
335 | except Exception as e:
336 | self.logger.error(f"Single review session test failed: {e}")
337 | return False
338 |
339 | def _test_review_refocus_flow(self) -> bool:
340 | """Test code review flow that revises findings by refocusing"""
341 | try:
342 | self.logger.info(" 1.2: Testing code review refocus workflow")
343 |
344 |             # Start a new review for testing refocus behavior
345 | self.logger.info(" 1.2.1: Start review for refocus test")
346 | response1, continuation_id = self.call_mcp_tool(
347 | "codereview",
348 | {
349 | "step": "Reviewing configuration management code for best practices",
350 | "step_number": 1,
351 | "total_steps": 4,
352 | "next_step_required": True,
353 | "findings": "Initial analysis shows complex configuration class",
354 | "files_checked": [self.config_file],
355 | "relevant_files": [self.config_file],
356 | "absolute_file_paths": [self.config_file],
357 | "review_type": "full",
358 | },
359 | )
360 |
361 | if not response1 or not continuation_id:
362 | self.logger.error("Failed to start refocus test review")
363 | return False
364 |
365 | # Step 2: Initial direction
366 | self.logger.info(" 1.2.2: Step 2 - Initial analysis direction")
367 | response2, _ = self.call_mcp_tool(
368 | "codereview",
369 | {
370 | "step": "Focusing on configuration architecture patterns",
371 | "step_number": 2,
372 | "total_steps": 4,
373 | "next_step_required": True,
374 | "findings": "Architecture seems overly complex, but need to look more carefully at security issues",
375 | "files_checked": [self.config_file],
376 | "relevant_files": [self.config_file],
377 | "issues_found": [
378 | {"severity": "medium", "description": "Complex configuration hierarchy"},
379 | ],
380 | "confidence": "low",
381 | "continuation_id": continuation_id,
382 | },
383 | )
384 |
385 | if not response2:
386 | self.logger.error("Failed to continue to step 2")
387 | return False
388 |
389 | # Step 3: Shift focus based on new evidence
390 | self.logger.info(" 1.2.3: Step 3 - Refocus on security issues")
391 | response3, _ = self.call_mcp_tool(
392 | "codereview",
393 | {
394 | "step": "Refocusing - need to concentrate on the critical security issues I initially missed. Found hardcoded secrets and credentials in plain text.",
395 | "step_number": 3,
396 | "total_steps": 4,
397 | "next_step_required": True,
398 | "findings": "Found critical security vulnerabilities: hardcoded DATABASE_PASSWORD and SECRET_KEY in plain text",
399 | "files_checked": [self.config_file],
400 | "relevant_files": [self.config_file],
401 | "relevant_context": ["ConfigurationManager.__init__"],
402 | "issues_found": [
403 | {"severity": "critical", "description": "Hardcoded database password in source code"},
404 | {"severity": "critical", "description": "Hardcoded secret key in source code"},
405 | {"severity": "high", "description": "Over-engineered configuration system"},
406 | ],
407 | "confidence": "high",
408 | "continuation_id": continuation_id,
409 | },
410 | )
411 |
412 | if not response3:
413 | self.logger.error("Failed to refocus")
414 | return False
415 |
416 | response3_data = self._parse_review_response(response3)
417 | if not self._validate_step_response(response3_data, 3, 4, True, "pause_for_code_review"):
418 | return False
419 |
420 | self.logger.info(" ✅ Refocus flow working correctly")
421 | return True
422 |
423 | except Exception as e:
424 | self.logger.error(f"Refocus test failed: {e}")
425 | return False
426 |
427 | def _test_complete_review_with_analysis(self) -> bool:
428 | """Test complete code review ending with expert analysis"""
429 | try:
430 | self.logger.info(" 1.3: Testing complete review with expert analysis")
431 |
432 | # Use the continuation from first test
433 | continuation_id = getattr(self, "review_continuation_id", None)
434 | if not continuation_id:
435 | # Start fresh if no continuation available
436 | self.logger.info(" 1.3.0: Starting fresh review")
437 | response0, continuation_id = self.call_mcp_tool(
438 | "codereview",
439 | {
440 | "step": "Reviewing payment processor for security and quality issues",
441 | "step_number": 1,
442 | "total_steps": 2,
443 | "next_step_required": True,
444 | "findings": "Found multiple security and performance issues",
445 | "files_checked": [self.payment_file],
446 | "relevant_files": [self.payment_file],
447 | "absolute_file_paths": [self.payment_file],
448 | "relevant_context": ["PaymentProcessor.process_payment"],
449 | },
450 | )
451 | if not response0 or not continuation_id:
452 | self.logger.error("Failed to start fresh review")
453 | return False
454 |
455 | # Final step - trigger expert analysis
456 | self.logger.info(" 1.3.1: Final step - complete review")
457 | response_final, _ = self.call_mcp_tool(
458 | "codereview",
459 | {
460 | "step": "Code review complete. Identified comprehensive security, performance, and maintainability issues throughout the payment processing module.",
461 | "step_number": 2,
462 | "total_steps": 2,
463 | "next_step_required": False, # Final step - triggers expert analysis
464 | "findings": "Complete analysis reveals critical security vulnerabilities, performance bottlenecks, over-engineering patterns, and maintainability concerns. All issues documented with severity levels.",
465 | "files_checked": [self.payment_file],
466 | "relevant_files": [self.payment_file],
467 | "relevant_context": [
468 | "PaymentProcessor.process_payment",
469 | "PaymentProcessor._generate_complex_hash",
470 | "PaymentProcessor.get_payment_history",
471 | ],
472 | "issues_found": [
473 | {"severity": "critical", "description": "API key stored in plain text"},
474 | {"severity": "critical", "description": "Sensitive payment data logged"},
475 | {"severity": "high", "description": "SQL injection vulnerability potential"},
476 | {"severity": "medium", "description": "Over-engineered hash generation"},
477 | {"severity": "low", "description": "Poor error handling patterns"},
478 | ],
479 | "confidence": "high",
480 | "continuation_id": continuation_id,
481 | "model": "flash", # Use flash for expert analysis
482 | },
483 | )
484 |
485 | if not response_final:
486 | self.logger.error("Failed to complete review")
487 | return False
488 |
489 | response_final_data = self._parse_review_response(response_final)
490 | if not response_final_data:
491 | return False
492 |
493 | # Validate final response structure - expect calling_expert_analysis for next_step_required=False
494 | if response_final_data.get("status") != "calling_expert_analysis":
495 | self.logger.error(
496 | f"Expected status 'calling_expert_analysis', got '{response_final_data.get('status')}'"
497 | )
498 | return False
499 |
500 | if not response_final_data.get("code_review_complete"):
501 | self.logger.error("Expected code_review_complete=true for final step")
502 | return False
503 |
504 | # Check for expert analysis
505 | if "expert_analysis" not in response_final_data:
506 | self.logger.error("Missing expert_analysis in final response")
507 | return False
508 |
509 | expert_analysis = response_final_data.get("expert_analysis", {})
510 |
511 | # Check for expected analysis content (checking common patterns)
512 | analysis_text = json.dumps(expert_analysis, ensure_ascii=False).lower()
513 |
514 | # Look for code review identification
515 | review_indicators = ["security", "vulnerability", "performance", "critical", "api", "key"]
516 | found_indicators = sum(1 for indicator in review_indicators if indicator in analysis_text)
517 |
518 | if found_indicators >= 3:
519 | self.logger.info(" ✅ Expert analysis identified the issues correctly")
520 | else:
521 | self.logger.warning(
522 | f" ⚠️ Expert analysis may not have fully identified the issues (found {found_indicators}/6 indicators)"
523 | )
524 |
525 | # Check complete review summary
526 | if "complete_code_review" not in response_final_data:
527 | self.logger.error("Missing complete_code_review in final response")
528 | return False
529 |
530 | complete_review = response_final_data["complete_code_review"]
531 | if not complete_review.get("relevant_context"):
532 | self.logger.error("Missing relevant context in complete review")
533 | return False
534 |
535 | if "PaymentProcessor.process_payment" not in complete_review["relevant_context"]:
536 | self.logger.error("Expected method not found in review summary")
537 | return False
538 |
539 | self.logger.info(" ✅ Complete review with expert analysis successful")
540 | return True
541 |
542 | except Exception as e:
543 | self.logger.error(f"Complete review test failed: {e}")
544 | return False
545 |
546 | def _test_certain_confidence(self) -> bool:
547 | """Test certain confidence behavior - should skip expert analysis"""
548 | try:
549 | self.logger.info(" 1.4: Testing certain confidence behavior")
550 |
551 | # Test certain confidence - should skip expert analysis
552 | self.logger.info(" 1.4.1: Certain confidence review")
553 | response_certain, _ = self.call_mcp_tool(
554 | "codereview",
555 | {
556 | "step": "I have completed a thorough code review with 100% certainty of all issues identified.",
557 | "step_number": 1,
558 | "total_steps": 1,
559 | "next_step_required": False, # Final step
560 | "findings": "Complete review identified all critical security issues, performance problems, and code quality concerns. All issues are documented with clear severity levels and specific recommendations.",
561 | "files_checked": [self.payment_file],
562 | "relevant_files": [self.payment_file],
563 | "absolute_file_paths": [self.payment_file],
564 | "relevant_context": ["PaymentProcessor.process_payment"],
565 | "issues_found": [
566 | {"severity": "critical", "description": "Hardcoded API key security vulnerability"},
567 | {"severity": "high", "description": "Performance bottleneck in payment history"},
568 | ],
569 | "review_validation_type": "internal", # This should skip expert analysis
570 | "model": "flash",
571 | },
572 | )
573 |
574 | if not response_certain:
575 | self.logger.error("Failed to test certain confidence")
576 | return False
577 |
578 | response_certain_data = self._parse_review_response(response_certain)
579 | if not response_certain_data:
580 | return False
581 |
582 | # Validate certain confidence response - should skip expert analysis
583 | if response_certain_data.get("status") != "code_review_complete_ready_for_implementation":
584 | self.logger.error(
585 | f"Expected status 'code_review_complete_ready_for_implementation', got '{response_certain_data.get('status')}'"
586 | )
587 | return False
588 |
589 | if not response_certain_data.get("skip_expert_analysis"):
590 | self.logger.error("Expected skip_expert_analysis=true for certain confidence")
591 | return False
592 |
593 | expert_analysis = response_certain_data.get("expert_analysis", {})
594 | if expert_analysis.get("status") not in [
595 | "skipped_due_to_certain_review_confidence",
596 | "skipped_due_to_internal_analysis_type",
597 | ]:
598 | self.logger.error("Expert analysis should be skipped for certain confidence")
599 | return False
600 |
601 | self.logger.info(" ✅ Certain confidence behavior working correctly")
602 | return True
603 |
604 | except Exception as e:
605 | self.logger.error(f"Certain confidence test failed: {e}")
606 | return False
607 |
608 | def _test_context_aware_file_embedding(self) -> bool:
609 | """Test context-aware file embedding optimization"""
610 | try:
611 | self.logger.info(" 1.5: Testing context-aware file embedding")
612 |
613 | # Create multiple test files for context testing
614 | utils_content = """#!/usr/bin/env python3
615 | def calculate_discount(price, discount_percent):
616 | \"\"\"Calculate discount amount\"\"\"
617 | if discount_percent < 0 or discount_percent > 100:
618 | raise ValueError("Invalid discount percentage")
619 |
620 | return price * (discount_percent / 100)
621 |
622 | def format_currency(amount):
623 | \"\"\"Format amount as currency\"\"\"
624 | return f"${amount:.2f}"
625 | """
626 |
627 | validator_content = """#!/usr/bin/env python3
628 | import re
629 |
630 | def validate_email(email):
631 | \"\"\"Validate email format\"\"\"
632 | pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$'
633 | return re.match(pattern, email) is not None
634 |
635 | def validate_credit_card(card_number):
636 | \"\"\"Basic credit card validation\"\"\"
637 | # Remove spaces and dashes
638 | card_number = re.sub(r'[\\s-]', '', card_number)
639 |
640 | # Check if all digits
641 | if not card_number.isdigit():
642 | return False
643 |
644 | # Basic length check
645 | return len(card_number) in [13, 14, 15, 16]
646 | """
647 |
648 | # Create test files
649 | utils_file = self.create_additional_test_file("utils.py", utils_content)
650 | validator_file = self.create_additional_test_file("validator.py", validator_content)
651 |
652 | # Test 1: New conversation, intermediate step - should only reference files
653 | self.logger.info(" 1.5.1: New conversation intermediate step (should reference only)")
654 | response1, continuation_id = self.call_mcp_tool(
655 | "codereview",
656 | {
657 | "step": "Starting comprehensive code review of utility modules",
658 | "step_number": 1,
659 | "total_steps": 3,
660 | "next_step_required": True, # Intermediate step
661 | "findings": "Initial analysis of utility and validation functions",
662 | "files_checked": [utils_file, validator_file],
663 | "relevant_files": [utils_file], # This should be referenced, not embedded
664 | "absolute_file_paths": [utils_file, validator_file], # Required for step 1
665 | "relevant_context": ["calculate_discount"],
666 | "confidence": "low",
667 | "model": "flash",
668 | },
669 | )
670 |
671 | if not response1 or not continuation_id:
672 | self.logger.error("Failed to start context-aware file embedding test")
673 | return False
674 |
675 | response1_data = self._parse_review_response(response1)
676 | if not response1_data:
677 | return False
678 |
679 | # Check file context - should be reference_only for intermediate step
680 | file_context = response1_data.get("file_context", {})
681 | if file_context.get("type") != "reference_only":
682 | self.logger.error(f"Expected reference_only file context, got: {file_context.get('type')}")
683 | return False
684 |
685 | if "Files referenced but not embedded" not in file_context.get("context_optimization", ""):
686 | self.logger.error("Expected context optimization message for reference_only")
687 | return False
688 |
689 | self.logger.info(" ✅ Intermediate step correctly uses reference_only file context")
690 |
691 | # Test 2: Final step - should embed files for expert analysis
692 | self.logger.info(" 1.5.2: Final step (should embed files)")
693 | response3, _ = self.call_mcp_tool(
694 | "codereview",
695 | {
696 | "step": "Code review complete - identified all issues and recommendations",
697 | "step_number": 3,
698 | "total_steps": 3,
699 | "next_step_required": False, # Final step - should embed files
700 | "continuation_id": continuation_id,
701 | "findings": "Complete review: utility functions have proper error handling, validation functions are robust",
702 | "files_checked": [utils_file, validator_file],
703 | "relevant_files": [utils_file, validator_file], # Should be fully embedded
704 | "relevant_context": ["calculate_discount", "validate_email", "validate_credit_card"],
705 | "issues_found": [
706 | {"severity": "low", "description": "Could add more comprehensive email validation"},
707 | {"severity": "medium", "description": "Credit card validation logic could be more robust"},
708 | ],
709 | "confidence": "medium",
710 | "model": "flash",
711 | },
712 | )
713 |
714 | if not response3:
715 | self.logger.error("Failed to complete to final step")
716 | return False
717 |
718 | response3_data = self._parse_review_response(response3)
719 | if not response3_data:
720 | return False
721 |
722 | # Check file context - should be fully_embedded for final step
723 | file_context3 = response3_data.get("file_context", {})
724 | if file_context3.get("type") != "fully_embedded":
725 | self.logger.error(
726 | f"Expected fully_embedded file context for final step, got: {file_context3.get('type')}"
727 | )
728 | return False
729 |
730 | if "Full file content embedded for expert analysis" not in file_context3.get("context_optimization", ""):
731 | self.logger.error("Expected expert analysis optimization message for fully_embedded")
732 | return False
733 |
734 | self.logger.info(" ✅ Final step correctly uses fully_embedded file context")
735 |
736 | # Verify expert analysis was called for final step
737 | if response3_data.get("status") != "calling_expert_analysis":
738 | self.logger.error("Final step should trigger expert analysis")
739 | return False
740 |
741 | if "expert_analysis" not in response3_data:
742 | self.logger.error("Expert analysis should be present in final step")
743 | return False
744 |
745 | self.logger.info(" ✅ Context-aware file embedding test completed successfully")
746 | return True
747 |
748 | except Exception as e:
749 | self.logger.error(f"Context-aware file embedding test failed: {e}")
750 | return False
751 |
752 | def _test_multi_step_file_context(self) -> bool:
753 | """Test multi-step workflow with proper file context transitions"""
754 | try:
755 | self.logger.info(" 1.6: Testing multi-step file context optimization")
756 |
757 | # Use existing payment and config files for multi-step test
758 | files_to_review = [self.payment_file, self.config_file]
759 |
760 | # Step 1: Start review (new conversation)
761 | self.logger.info(" 1.6.1: Step 1 - Start comprehensive review")
762 | response1, continuation_id = self.call_mcp_tool(
763 | "codereview",
764 | {
765 | "step": "Starting comprehensive security and quality review of payment system components",
766 | "step_number": 1,
767 | "total_steps": 4,
768 | "next_step_required": True,
769 | "findings": "Initial review of payment processor and configuration management modules",
770 | "files_checked": files_to_review,
771 | "relevant_files": [self.payment_file],
772 | "absolute_file_paths": files_to_review,
773 | "relevant_context": [],
774 | "confidence": "low",
775 | "review_type": "security",
776 | "model": "flash",
777 | },
778 | )
779 |
780 | if not response1 or not continuation_id:
781 | self.logger.error("Failed to start multi-step file context test")
782 | return False
783 |
784 | response1_data = self._parse_review_response(response1)
785 |
786 | # Validate step 1 - should use reference_only
787 | file_context1 = response1_data.get("file_context", {})
788 | if file_context1.get("type") != "reference_only":
789 | self.logger.error("Step 1 should use reference_only file context")
790 | return False
791 |
792 | self.logger.info(" ✅ Step 1: reference_only file context")
793 |
794 | # Step 2: Security analysis
795 | self.logger.info(" 1.6.2: Step 2 - Security analysis")
796 | response2, _ = self.call_mcp_tool(
797 | "codereview",
798 | {
799 | "step": "Focusing on critical security vulnerabilities across both modules",
800 | "step_number": 2,
801 | "total_steps": 4,
802 | "next_step_required": True,
803 | "continuation_id": continuation_id,
804 | "findings": "Found critical security issues: hardcoded secrets in config, API key exposure in payment processor",
805 | "files_checked": files_to_review,
806 | "relevant_files": files_to_review,
807 | "relevant_context": ["PaymentProcessor.__init__", "ConfigurationManager"],
808 | "issues_found": [
809 | {"severity": "critical", "description": "Hardcoded database password"},
810 | {"severity": "critical", "description": "API key stored in plain text"},
811 | ],
812 | "confidence": "medium",
813 | "model": "flash",
814 | },
815 | )
816 |
817 | if not response2:
818 | self.logger.error("Failed to continue to step 2")
819 | return False
820 |
821 | response2_data = self._parse_review_response(response2)
822 |
823 | # Validate step 2 - should still use reference_only
824 | file_context2 = response2_data.get("file_context", {})
825 | if file_context2.get("type") != "reference_only":
826 | self.logger.error("Step 2 should use reference_only file context")
827 | return False
828 |
829 | self.logger.info(" ✅ Step 2: reference_only file context")
830 |
831 | # Step 3: Performance and architecture analysis
832 | self.logger.info(" 1.6.3: Step 3 - Performance and architecture analysis")
833 | response3, _ = self.call_mcp_tool(
834 | "codereview",
835 | {
836 | "step": "Analyzing performance bottlenecks and architectural concerns",
837 | "step_number": 3,
838 | "total_steps": 4,
839 | "next_step_required": True,
840 | "continuation_id": continuation_id,
841 | "findings": "Performance issues: unbounded lists, inefficient algorithms, over-engineered patterns",
842 | "files_checked": files_to_review,
843 | "relevant_files": files_to_review,
844 | "relevant_context": [
845 | "PaymentProcessor.get_payment_history",
846 | "PaymentProcessor._generate_complex_hash",
847 | ],
848 | "issues_found": [
849 | {"severity": "high", "description": "O(n²) algorithm in payment history"},
850 | {"severity": "medium", "description": "Over-engineered hash generation"},
851 | {"severity": "medium", "description": "Unbounded failed_payments list"},
852 | ],
853 | "confidence": "high",
854 | "model": "flash",
855 | },
856 | )
857 |
858 | if not response3:
859 | self.logger.error("Failed to continue to step 3")
860 | return False
861 |
862 | response3_data = self._parse_review_response(response3)
863 |
864 | # Validate step 3 - should still use reference_only
865 | file_context3 = response3_data.get("file_context", {})
866 | if file_context3.get("type") != "reference_only":
867 | self.logger.error("Step 3 should use reference_only file context")
868 | return False
869 |
870 | self.logger.info(" ✅ Step 3: reference_only file context")
871 |
872 | # Step 4: Final comprehensive analysis
873 | self.logger.info(" 1.6.4: Step 4 - Final comprehensive analysis")
874 | response4, _ = self.call_mcp_tool(
875 | "codereview",
876 | {
877 | "step": "Code review complete - comprehensive analysis of all security, performance, and quality issues",
878 | "step_number": 4,
879 | "total_steps": 4,
880 | "next_step_required": False, # Final step - should embed files
881 | "continuation_id": continuation_id,
882 | "findings": "Complete review: identified critical security vulnerabilities, performance bottlenecks, over-engineering patterns, and maintainability concerns across payment and configuration modules.",
883 | "files_checked": files_to_review,
884 | "relevant_files": files_to_review,
885 | "relevant_context": ["PaymentProcessor.process_payment", "ConfigurationManager.get_config"],
886 | "issues_found": [
887 | {"severity": "critical", "description": "Multiple hardcoded secrets"},
888 | {"severity": "high", "description": "Performance and security issues in payment processing"},
889 | {"severity": "medium", "description": "Over-engineered architecture patterns"},
890 | ],
891 | "confidence": "high",
892 | "model": "flash",
893 | },
894 | )
895 |
896 | if not response4:
897 | self.logger.error("Failed to complete to final step")
898 | return False
899 |
900 | response4_data = self._parse_review_response(response4)
901 |
902 | # Validate step 4 - should use fully_embedded for expert analysis
903 | file_context4 = response4_data.get("file_context", {})
904 | if file_context4.get("type") != "fully_embedded":
905 | self.logger.error("Step 4 (final) should use fully_embedded file context")
906 | return False
907 |
908 | if "expert analysis" not in file_context4.get("context_optimization", "").lower():
909 | self.logger.error("Final step should mention expert analysis in context optimization")
910 | return False
911 |
912 | # Verify expert analysis was triggered
913 | if response4_data.get("status") != "calling_expert_analysis":
914 | self.logger.error("Final step should trigger expert analysis")
915 | return False
916 |
917 | # Check that expert analysis has content
918 | expert_analysis = response4_data.get("expert_analysis", {})
919 | if not expert_analysis:
920 | self.logger.error("Expert analysis should be present in final step")
921 | return False
922 |
923 | self.logger.info(" ✅ Step 4: fully_embedded file context with expert analysis")
924 |
925 | # Validate the complete workflow progression
926 | progression_summary = {
927 | "step_1": "reference_only (new conversation, intermediate)",
928 | "step_2": "reference_only (continuation, intermediate)",
929 | "step_3": "reference_only (continuation, intermediate)",
930 | "step_4": "fully_embedded (continuation, final)",
931 | }
932 |
933 | self.logger.info(" 📋 File context progression:")
934 | for step, context_type in progression_summary.items():
935 | self.logger.info(f" {step}: {context_type}")
936 |
937 | self.logger.info(" ✅ Multi-step file context optimization test completed successfully")
938 | return True
939 |
940 | except Exception as e:
941 | self.logger.error(f"Multi-step file context test failed: {e}")
942 | return False
943 |
944 | def call_mcp_tool(self, tool_name: str, params: dict) -> tuple[Optional[str], Optional[str]]:
945 | """Call an MCP tool in-process - override for codereview-specific response handling"""
946 | # Use in-process implementation to maintain conversation memory
947 | response_text, _ = self.call_mcp_tool_direct(tool_name, params)
948 |
949 | if not response_text:
950 | return None, None
951 |
952 | # Extract continuation_id from codereview response specifically
953 | continuation_id = self._extract_review_continuation_id(response_text)
954 |
955 | return response_text, continuation_id
956 |
957 | def _extract_review_continuation_id(self, response_text: str) -> Optional[str]:
958 | """Extract continuation_id from codereview response"""
959 | try:
960 | # Parse the response
961 | response_data = json.loads(response_text)
962 | return response_data.get("continuation_id")
963 |
964 | except json.JSONDecodeError as e:
965 | self.logger.debug(f"Failed to parse response for review continuation_id: {e}")
966 | return None
967 |
968 | def _parse_review_response(self, response_text: str) -> dict:
969 | """Parse codereview tool JSON response"""
970 | try:
971 | # Parse the response - it should be direct JSON
972 | return json.loads(response_text)
973 |
974 | except json.JSONDecodeError as e:
975 | self.logger.error(f"Failed to parse review response as JSON: {e}")
976 | self.logger.error(f"Response text: {response_text[:500]}...")
977 | return {}
978 |
979 | def _validate_step_response(
980 | self,
981 | response_data: dict,
982 | expected_step: int,
983 | expected_total: int,
984 | expected_next_required: bool,
985 | expected_status: str,
986 | ) -> bool:
987 | """Validate a codereview step response structure"""
988 | try:
989 | # Check status
990 | if response_data.get("status") != expected_status:
991 | self.logger.error(f"Expected status '{expected_status}', got '{response_data.get('status')}'")
992 | return False
993 |
994 | # Check step number
995 | if response_data.get("step_number") != expected_step:
996 | self.logger.error(f"Expected step_number {expected_step}, got {response_data.get('step_number')}")
997 | return False
998 |
999 | # Check total steps
1000 | if response_data.get("total_steps") != expected_total:
1001 | self.logger.error(f"Expected total_steps {expected_total}, got {response_data.get('total_steps')}")
1002 | return False
1003 |
1004 | # Check next_step_required
1005 | if response_data.get("next_step_required") != expected_next_required:
1006 | self.logger.error(
1007 | f"Expected next_step_required {expected_next_required}, got {response_data.get('next_step_required')}"
1008 | )
1009 | return False
1010 |
1011 | # Check code_review_status exists
1012 | if "code_review_status" not in response_data:
1013 | self.logger.error("Missing code_review_status in response")
1014 | return False
1015 |
1016 | # Check next_steps guidance
1017 | if not response_data.get("next_steps"):
1018 | self.logger.error("Missing next_steps guidance in response")
1019 | return False
1020 |
1021 | return True
1022 |
1023 | except Exception as e:
1024 | self.logger.error(f"Error validating step response: {e}")
1025 | return False
1026 |
```
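
For reference, the response shape `_validate_step_response` expects can be reconstructed from its checks: `status`, `step_number`, `total_steps`, `next_step_required`, `code_review_status`, and `next_steps` must all be present and match. A minimal intermediate-step payload is sketched below with illustrative values only.

```python
import json

# Minimal payload that would pass _validate_step_response(data, 2, 4, True,
# "pause_for_code_review"); the code_review_status counters mirror the
# tracking asserted in _test_single_review_session.
sample_response = {
    "status": "pause_for_code_review",
    "step_number": 2,
    "total_steps": 4,
    "next_step_required": True,
    "code_review_status": {
        "files_checked": 1,
        "relevant_context": 2,
        "issues_by_severity": {"critical": 2, "high": 1},
    },
    "next_steps": "Continue the investigation before calling codereview again.",
}

print(json.dumps(sample_response, indent=2))
```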