This is page 16 of 25. Use http://codebase.md/beehiveinnovations/gemini-mcp-server?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── commands
│ │ └── fix-github-issue.md
│ └── settings.json
├── .coveragerc
├── .dockerignore
├── .env.example
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ ├── documentation.yml
│ │ ├── feature_request.yml
│ │ └── tool_addition.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── docker-pr.yml
│ ├── docker-release.yml
│ ├── semantic-pr.yml
│ ├── semantic-release.yml
│ └── test.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CHANGELOG.md
├── claude_config_example.json
├── CLAUDE.md
├── clink
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── claude.py
│ │ ├── codex.py
│ │ └── gemini.py
│ ├── constants.py
│ ├── models.py
│ ├── parsers
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── claude.py
│ │ ├── codex.py
│ │ └── gemini.py
│ └── registry.py
├── code_quality_checks.ps1
├── code_quality_checks.sh
├── communication_simulator_test.py
├── conf
│ ├── __init__.py
│ ├── azure_models.json
│ ├── cli_clients
│ │ ├── claude.json
│ │ ├── codex.json
│ │ └── gemini.json
│ ├── custom_models.json
│ ├── dial_models.json
│ ├── gemini_models.json
│ ├── openai_models.json
│ ├── openrouter_models.json
│ └── xai_models.json
├── config.py
├── docker
│ ├── README.md
│ └── scripts
│ ├── build.ps1
│ ├── build.sh
│ ├── deploy.ps1
│ ├── deploy.sh
│ └── healthcheck.py
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── adding_providers.md
│ ├── adding_tools.md
│ ├── advanced-usage.md
│ ├── ai_banter.md
│ ├── ai-collaboration.md
│ ├── azure_openai.md
│ ├── configuration.md
│ ├── context-revival.md
│ ├── contributions.md
│ ├── custom_models.md
│ ├── docker-deployment.md
│ ├── gemini-setup.md
│ ├── getting-started.md
│ ├── index.md
│ ├── locale-configuration.md
│ ├── logging.md
│ ├── model_ranking.md
│ ├── testing.md
│ ├── tools
│ │ ├── analyze.md
│ │ ├── apilookup.md
│ │ ├── challenge.md
│ │ ├── chat.md
│ │ ├── clink.md
│ │ ├── codereview.md
│ │ ├── consensus.md
│ │ ├── debug.md
│ │ ├── docgen.md
│ │ ├── listmodels.md
│ │ ├── planner.md
│ │ ├── precommit.md
│ │ ├── refactor.md
│ │ ├── secaudit.md
│ │ ├── testgen.md
│ │ ├── thinkdeep.md
│ │ ├── tracer.md
│ │ └── version.md
│ ├── troubleshooting.md
│ ├── vcr-testing.md
│ └── wsl-setup.md
├── examples
│ ├── claude_config_macos.json
│ └── claude_config_wsl.json
├── LICENSE
├── providers
│ ├── __init__.py
│ ├── azure_openai.py
│ ├── base.py
│ ├── custom.py
│ ├── dial.py
│ ├── gemini.py
│ ├── openai_compatible.py
│ ├── openai.py
│ ├── openrouter.py
│ ├── registries
│ │ ├── __init__.py
│ │ ├── azure.py
│ │ ├── base.py
│ │ ├── custom.py
│ │ ├── dial.py
│ │ ├── gemini.py
│ │ ├── openai.py
│ │ ├── openrouter.py
│ │ └── xai.py
│ ├── registry_provider_mixin.py
│ ├── registry.py
│ ├── shared
│ │ ├── __init__.py
│ │ ├── model_capabilities.py
│ │ ├── model_response.py
│ │ ├── provider_type.py
│ │ └── temperature.py
│ └── xai.py
├── pyproject.toml
├── pytest.ini
├── README.md
├── requirements-dev.txt
├── requirements.txt
├── run_integration_tests.ps1
├── run_integration_tests.sh
├── run-server.ps1
├── run-server.sh
├── scripts
│ └── sync_version.py
├── server.py
├── simulator_tests
│ ├── __init__.py
│ ├── base_test.py
│ ├── conversation_base_test.py
│ ├── log_utils.py
│ ├── test_analyze_validation.py
│ ├── test_basic_conversation.py
│ ├── test_chat_simple_validation.py
│ ├── test_codereview_validation.py
│ ├── test_consensus_conversation.py
│ ├── test_consensus_three_models.py
│ ├── test_consensus_workflow_accurate.py
│ ├── test_content_validation.py
│ ├── test_conversation_chain_validation.py
│ ├── test_cross_tool_comprehensive.py
│ ├── test_cross_tool_continuation.py
│ ├── test_debug_certain_confidence.py
│ ├── test_debug_validation.py
│ ├── test_line_number_validation.py
│ ├── test_logs_validation.py
│ ├── test_model_thinking_config.py
│ ├── test_o3_model_selection.py
│ ├── test_o3_pro_expensive.py
│ ├── test_ollama_custom_url.py
│ ├── test_openrouter_fallback.py
│ ├── test_openrouter_models.py
│ ├── test_per_tool_deduplication.py
│ ├── test_planner_continuation_history.py
│ ├── test_planner_validation_old.py
│ ├── test_planner_validation.py
│ ├── test_precommitworkflow_validation.py
│ ├── test_prompt_size_limit_bug.py
│ ├── test_refactor_validation.py
│ ├── test_secaudit_validation.py
│ ├── test_testgen_validation.py
│ ├── test_thinkdeep_validation.py
│ ├── test_token_allocation_validation.py
│ ├── test_vision_capability.py
│ └── test_xai_models.py
├── systemprompts
│ ├── __init__.py
│ ├── analyze_prompt.py
│ ├── chat_prompt.py
│ ├── clink
│ │ ├── codex_codereviewer.txt
│ │ ├── default_codereviewer.txt
│ │ ├── default_planner.txt
│ │ └── default.txt
│ ├── codereview_prompt.py
│ ├── consensus_prompt.py
│ ├── debug_prompt.py
│ ├── docgen_prompt.py
│ ├── generate_code_prompt.py
│ ├── planner_prompt.py
│ ├── precommit_prompt.py
│ ├── refactor_prompt.py
│ ├── secaudit_prompt.py
│ ├── testgen_prompt.py
│ ├── thinkdeep_prompt.py
│ └── tracer_prompt.py
├── tests
│ ├── __init__.py
│ ├── CASSETTE_MAINTENANCE.md
│ ├── conftest.py
│ ├── gemini_cassettes
│ │ ├── chat_codegen
│ │ │ └── gemini25_pro_calculator
│ │ │ └── mldev.json
│ │ ├── chat_cross
│ │ │ └── step1_gemini25_flash_number
│ │ │ └── mldev.json
│ │ └── consensus
│ │ └── step2_gemini25_flash_against
│ │ └── mldev.json
│ ├── http_transport_recorder.py
│ ├── mock_helpers.py
│ ├── openai_cassettes
│ │ ├── chat_cross_step2_gpt5_reminder.json
│ │ ├── chat_gpt5_continuation.json
│ │ ├── chat_gpt5_moon_distance.json
│ │ ├── consensus_step1_gpt5_for.json
│ │ └── o3_pro_basic_math.json
│ ├── pii_sanitizer.py
│ ├── sanitize_cassettes.py
│ ├── test_alias_target_restrictions.py
│ ├── test_auto_mode_comprehensive.py
│ ├── test_auto_mode_custom_provider_only.py
│ ├── test_auto_mode_model_listing.py
│ ├── test_auto_mode_provider_selection.py
│ ├── test_auto_mode.py
│ ├── test_auto_model_planner_fix.py
│ ├── test_azure_openai_provider.py
│ ├── test_buggy_behavior_prevention.py
│ ├── test_cassette_semantic_matching.py
│ ├── test_challenge.py
│ ├── test_chat_codegen_integration.py
│ ├── test_chat_cross_model_continuation.py
│ ├── test_chat_openai_integration.py
│ ├── test_chat_simple.py
│ ├── test_clink_claude_agent.py
│ ├── test_clink_claude_parser.py
│ ├── test_clink_codex_agent.py
│ ├── test_clink_gemini_agent.py
│ ├── test_clink_gemini_parser.py
│ ├── test_clink_integration.py
│ ├── test_clink_parsers.py
│ ├── test_clink_tool.py
│ ├── test_collaboration.py
│ ├── test_config.py
│ ├── test_consensus_integration.py
│ ├── test_consensus_schema.py
│ ├── test_consensus.py
│ ├── test_conversation_continuation_integration.py
│ ├── test_conversation_field_mapping.py
│ ├── test_conversation_file_features.py
│ ├── test_conversation_memory.py
│ ├── test_conversation_missing_files.py
│ ├── test_custom_openai_temperature_fix.py
│ ├── test_custom_provider.py
│ ├── test_debug.py
│ ├── test_deploy_scripts.py
│ ├── test_dial_provider.py
│ ├── test_directory_expansion_tracking.py
│ ├── test_disabled_tools.py
│ ├── test_docker_claude_desktop_integration.py
│ ├── test_docker_config_complete.py
│ ├── test_docker_healthcheck.py
│ ├── test_docker_implementation.py
│ ├── test_docker_mcp_validation.py
│ ├── test_docker_security.py
│ ├── test_docker_volume_persistence.py
│ ├── test_file_protection.py
│ ├── test_gemini_token_usage.py
│ ├── test_image_support_integration.py
│ ├── test_image_validation.py
│ ├── test_integration_utf8.py
│ ├── test_intelligent_fallback.py
│ ├── test_issue_245_simple.py
│ ├── test_large_prompt_handling.py
│ ├── test_line_numbers_integration.py
│ ├── test_listmodels_restrictions.py
│ ├── test_listmodels.py
│ ├── test_mcp_error_handling.py
│ ├── test_model_enumeration.py
│ ├── test_model_metadata_continuation.py
│ ├── test_model_resolution_bug.py
│ ├── test_model_restrictions.py
│ ├── test_o3_pro_output_text_fix.py
│ ├── test_o3_temperature_fix_simple.py
│ ├── test_openai_compatible_token_usage.py
│ ├── test_openai_provider.py
│ ├── test_openrouter_provider.py
│ ├── test_openrouter_registry.py
│ ├── test_parse_model_option.py
│ ├── test_per_tool_model_defaults.py
│ ├── test_pii_sanitizer.py
│ ├── test_pip_detection_fix.py
│ ├── test_planner.py
│ ├── test_precommit_workflow.py
│ ├── test_prompt_regression.py
│ ├── test_prompt_size_limit_bug_fix.py
│ ├── test_provider_retry_logic.py
│ ├── test_provider_routing_bugs.py
│ ├── test_provider_utf8.py
│ ├── test_providers.py
│ ├── test_rate_limit_patterns.py
│ ├── test_refactor.py
│ ├── test_secaudit.py
│ ├── test_server.py
│ ├── test_supported_models_aliases.py
│ ├── test_thinking_modes.py
│ ├── test_tools.py
│ ├── test_tracer.py
│ ├── test_utf8_localization.py
│ ├── test_utils.py
│ ├── test_uvx_resource_packaging.py
│ ├── test_uvx_support.py
│ ├── test_workflow_file_embedding.py
│ ├── test_workflow_metadata.py
│ ├── test_workflow_prompt_size_validation_simple.py
│ ├── test_workflow_utf8.py
│ ├── test_xai_provider.py
│ ├── transport_helpers.py
│ └── triangle.png
├── tools
│ ├── __init__.py
│ ├── analyze.py
│ ├── apilookup.py
│ ├── challenge.py
│ ├── chat.py
│ ├── clink.py
│ ├── codereview.py
│ ├── consensus.py
│ ├── debug.py
│ ├── docgen.py
│ ├── listmodels.py
│ ├── models.py
│ ├── planner.py
│ ├── precommit.py
│ ├── refactor.py
│ ├── secaudit.py
│ ├── shared
│ │ ├── __init__.py
│ │ ├── base_models.py
│ │ ├── base_tool.py
│ │ ├── exceptions.py
│ │ └── schema_builders.py
│ ├── simple
│ │ ├── __init__.py
│ │ └── base.py
│ ├── testgen.py
│ ├── thinkdeep.py
│ ├── tracer.py
│ ├── version.py
│ └── workflow
│ ├── __init__.py
│ ├── base.py
│ ├── schema_builders.py
│ └── workflow_mixin.py
├── utils
│ ├── __init__.py
│ ├── client_info.py
│ ├── conversation_memory.py
│ ├── env.py
│ ├── file_types.py
│ ├── file_utils.py
│ ├── image_utils.py
│ ├── model_context.py
│ ├── model_restrictions.py
│ ├── security_config.py
│ ├── storage_backend.py
│ └── token_utils.py
└── zen-mcp-server
```
# Files
--------------------------------------------------------------------------------
/tools/testgen.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | TestGen Workflow tool - Step-by-step test generation with expert validation
3 |
4 | This tool provides a structured workflow for comprehensive test generation.
5 | It guides the CLI agent through systematic investigation steps with forced pauses between each step
6 | to ensure thorough code examination, test planning, and pattern identification before proceeding.
7 | The tool supports updating findings as they emerge and integrates expert analysis for comprehensive test suite generation.
8 |
9 | Key features:
10 | - Step-by-step test generation workflow with progress tracking
11 | - Context-aware file embedding (references during investigation, full content for analysis)
12 | - Automatic test pattern detection and framework identification
13 | - Expert analysis integration with external models for additional test suggestions
14 | - Support for edge case identification and comprehensive coverage
15 | - Confidence-based workflow optimization
16 | """
17 |
18 | import logging
19 | from typing import TYPE_CHECKING, Any, Optional
20 |
21 | from pydantic import Field, model_validator
22 |
23 | if TYPE_CHECKING:
24 | from tools.models import ToolModelCategory
25 |
26 | from config import TEMPERATURE_ANALYTICAL
27 | from systemprompts import TESTGEN_PROMPT
28 | from tools.shared.base_models import WorkflowRequest
29 |
30 | from .workflow.base import WorkflowTool
31 |
32 | logger = logging.getLogger(__name__)
33 |
34 | # Tool-specific field descriptions for test generation workflow
35 | TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS = {
36 | "step": (
37 | "Test plan for this step. Step 1: outline how you'll analyse structure, business logic, critical paths, and edge cases. Later steps: record findings and new scenarios as they emerge."
38 | ),
39 | "step_number": "Current test-generation step (starts at 1) — each step should build on prior work.",
40 | "total_steps": "Estimated number of steps needed for test planning; adjust as new scenarios appear.",
41 | "next_step_required": "True while more investigation or planning remains; set False when test planning is ready for expert validation.",
42 | "findings": "Summarise functionality, critical paths, edge cases, boundary conditions, error handling, and existing test patterns. Cover both happy and failure paths.",
43 | "files_checked": "Absolute paths of every file examined, including those ruled out.",
44 | "relevant_files": "Absolute paths of code that requires new or updated tests (implementation, dependencies, existing test fixtures).",
45 | "relevant_context": "Functions/methods needing coverage (e.g. 'Class.method', 'function_name'), with emphasis on critical paths and error-prone code.",
46 | "confidence": (
47 | "Indicate your current confidence in the test generation assessment. Use: 'exploring' (starting analysis), "
48 | "'low' (early investigation), 'medium' (some patterns identified), 'high' (strong understanding), "
49 | "'very_high' (very strong understanding), 'almost_certain' (nearly complete test plan), 'certain' "
50 | "(100% confidence - test plan is thoroughly complete and all test scenarios are identified with no need for external model validation). "
51 | "Do NOT use 'certain' unless the test generation analysis is comprehensively complete, use 'very_high' or 'almost_certain' instead if not 100% sure. "
52 | "Using 'certain' means you have complete confidence locally and prevents external model validation."
53 | ),
54 | "images": "Optional absolute paths to diagrams or visuals that clarify the system under test.",
55 | }
56 |
57 |
58 | class TestGenRequest(WorkflowRequest):
59 | """Request model for test generation workflow investigation steps"""
60 |
61 | # Required fields for each investigation step
62 | step: str = Field(..., description=TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["step"])
63 | step_number: int = Field(..., description=TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["step_number"])
64 | total_steps: int = Field(..., description=TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["total_steps"])
65 | next_step_required: bool = Field(..., description=TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["next_step_required"])
66 |
67 | # Investigation tracking fields
68 | findings: str = Field(..., description=TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["findings"])
69 | files_checked: list[str] = Field(
70 | default_factory=list, description=TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["files_checked"]
71 | )
72 | relevant_files: list[str] = Field(
73 | default_factory=list, description=TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["relevant_files"]
74 | )
75 | relevant_context: list[str] = Field(
76 | default_factory=list, description=TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["relevant_context"]
77 | )
78 | confidence: Optional[str] = Field("low", description=TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["confidence"])
79 |
80 | # Optional images for visual context
81 | images: Optional[list[str]] = Field(default=None, description=TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["images"])
82 |
83 | # Override inherited fields to exclude them from schema (except model which needs to be available)
84 | temperature: Optional[float] = Field(default=None, exclude=True)
85 | thinking_mode: Optional[str] = Field(default=None, exclude=True)
86 |
87 | @model_validator(mode="after")
88 | def validate_step_one_requirements(self):
89 | """Ensure step 1 has required relevant_files field."""
90 | if self.step_number == 1 and not self.relevant_files:
91 | raise ValueError("Step 1 requires 'relevant_files' field to specify code files to generate tests for")
92 | return self
93 |
94 |
95 | class TestGenTool(WorkflowTool):
96 | """
97 | Test Generation workflow tool for step-by-step test planning and expert validation.
98 |
99 | This tool implements a structured test generation workflow that guides users through
100 | methodical investigation steps, ensuring thorough code examination, pattern identification,
101 | and test scenario planning before reaching conclusions. It supports complex testing scenarios
102 | including edge case identification, framework detection, and comprehensive coverage planning.
103 | """
104 |
105 | __test__ = False # Prevent pytest from collecting this class as a test
106 |
107 | def __init__(self):
108 | super().__init__()
109 | self.initial_request = None
110 |
111 | def get_name(self) -> str:
112 | return "testgen"
113 |
114 | def get_description(self) -> str:
115 | return (
116 | "Creates comprehensive test suites with edge case coverage for specific functions, classes, or modules. "
117 | "Analyzes code paths, identifies failure modes, and generates framework-specific tests. "
118 | "Be specific about scope - target particular components rather than testing everything."
119 | )
120 |
121 | def get_system_prompt(self) -> str:
122 | return TESTGEN_PROMPT
123 |
124 | def get_default_temperature(self) -> float:
125 | return TEMPERATURE_ANALYTICAL
126 |
127 | def get_model_category(self) -> "ToolModelCategory":
128 | """Test generation requires thorough analysis and reasoning"""
129 | from tools.models import ToolModelCategory
130 |
131 | return ToolModelCategory.EXTENDED_REASONING
132 |
133 | def get_workflow_request_model(self):
134 | """Return the test generation workflow-specific request model."""
135 | return TestGenRequest
136 |
137 | def get_input_schema(self) -> dict[str, Any]:
138 | """Generate input schema using WorkflowSchemaBuilder with test generation-specific overrides."""
139 | from .workflow.schema_builders import WorkflowSchemaBuilder
140 |
141 | # Test generation workflow-specific field overrides
142 | testgen_field_overrides = {
143 | "step": {
144 | "type": "string",
145 | "description": TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["step"],
146 | },
147 | "step_number": {
148 | "type": "integer",
149 | "minimum": 1,
150 | "description": TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["step_number"],
151 | },
152 | "total_steps": {
153 | "type": "integer",
154 | "minimum": 1,
155 | "description": TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["total_steps"],
156 | },
157 | "next_step_required": {
158 | "type": "boolean",
159 | "description": TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["next_step_required"],
160 | },
161 | "findings": {
162 | "type": "string",
163 | "description": TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["findings"],
164 | },
165 | "files_checked": {
166 | "type": "array",
167 | "items": {"type": "string"},
168 | "description": TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["files_checked"],
169 | },
170 | "relevant_files": {
171 | "type": "array",
172 | "items": {"type": "string"},
173 | "description": TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["relevant_files"],
174 | },
175 | "confidence": {
176 | "type": "string",
177 | "enum": ["exploring", "low", "medium", "high", "very_high", "almost_certain", "certain"],
178 | "description": TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["confidence"],
179 | },
180 | "images": {
181 | "type": "array",
182 | "items": {"type": "string"},
183 | "description": TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["images"],
184 | },
185 | }
186 |
187 | # Use WorkflowSchemaBuilder with test generation-specific tool fields
188 | return WorkflowSchemaBuilder.build_schema(
189 | tool_specific_fields=testgen_field_overrides,
190 | model_field_schema=self.get_model_field_schema(),
191 | auto_mode=self.is_effective_auto_mode(),
192 | tool_name=self.get_name(),
193 | )
194 |
195 | def get_required_actions(
196 | self, step_number: int, confidence: str, findings: str, total_steps: int, request=None
197 | ) -> list[str]:
198 | """Define required actions for each investigation phase."""
199 | if step_number == 1:
200 | # Initial test generation investigation tasks
201 | return [
202 | "Read and understand the code files specified for test generation",
203 | "Analyze the overall structure, public APIs, and main functionality",
204 | "Identify critical business logic and complex algorithms that need testing",
205 | "Look for existing test patterns or examples if provided",
206 | "Understand dependencies, external interactions, and integration points",
207 | "Note any potential testability issues or areas that might be hard to test",
208 | ]
209 | elif confidence in ["exploring", "low"]:
210 | # Need deeper investigation
211 | return [
212 | "Examine specific functions and methods to understand their behavior",
213 | "Trace through code paths to identify all possible execution flows",
214 | "Identify edge cases, boundary conditions, and error scenarios",
215 | "Check for async operations, state management, and side effects",
216 | "Look for non-deterministic behavior or external dependencies",
217 | "Analyze error handling and exception cases that need testing",
218 | ]
219 | elif confidence in ["medium", "high"]:
220 | # Close to completion - need final verification
221 | return [
222 | "Verify all critical paths have been identified for testing",
223 | "Confirm edge cases and boundary conditions are comprehensive",
224 | "Check that test scenarios cover both success and failure cases",
225 | "Ensure async behavior and concurrency issues are addressed",
226 | "Validate that the testing strategy aligns with code complexity",
227 | "Double-check that findings include actionable test scenarios",
228 | ]
229 | else:
230 | # General investigation needed
231 | return [
232 | "Continue examining the codebase for additional test scenarios",
233 | "Gather more evidence about code behavior and dependencies",
234 | "Test your assumptions about how the code should be tested",
235 | "Look for patterns that confirm your testing strategy",
236 | "Focus on areas that haven't been thoroughly examined yet",
237 | ]
238 |
239 | def should_call_expert_analysis(self, consolidated_findings, request=None) -> bool:
240 | """
241 | Decide when to call external model based on investigation completeness.
242 |
243 | Always call expert analysis for test generation to get additional test ideas.
244 | """
245 | # Check if user requested to skip assistant model
246 | if request and not self.get_request_use_assistant_model(request):
247 | return False
248 |
249 | # Always benefit from expert analysis for comprehensive test coverage
250 | return len(consolidated_findings.relevant_files) > 0 or len(consolidated_findings.findings) >= 1
251 |
252 | def prepare_expert_analysis_context(self, consolidated_findings) -> str:
253 | """Prepare context for external model call for test generation validation."""
254 | context_parts = [
255 | f"=== TEST GENERATION REQUEST ===\n{self.initial_request or 'Test generation workflow initiated'}\n=== END REQUEST ==="
256 | ]
257 |
258 | # Add investigation summary
259 | investigation_summary = self._build_test_generation_summary(consolidated_findings)
260 | context_parts.append(
261 | f"\n=== AGENT'S TEST PLANNING INVESTIGATION ===\n{investigation_summary}\n=== END INVESTIGATION ==="
262 | )
263 |
264 | # Add relevant code elements if available
265 | if consolidated_findings.relevant_context:
266 | methods_text = "\n".join(f"- {method}" for method in consolidated_findings.relevant_context)
267 | context_parts.append(f"\n=== CODE ELEMENTS TO TEST ===\n{methods_text}\n=== END CODE ELEMENTS ===")
268 |
269 | # Add images if available
270 | if consolidated_findings.images:
271 | images_text = "\n".join(f"- {img}" for img in consolidated_findings.images)
272 | context_parts.append(f"\n=== VISUAL DOCUMENTATION ===\n{images_text}\n=== END VISUAL DOCUMENTATION ===")
273 |
274 | return "\n".join(context_parts)
275 |
276 | def _build_test_generation_summary(self, consolidated_findings) -> str:
277 | """Prepare a comprehensive summary of the test generation investigation."""
278 | summary_parts = [
279 | "=== SYSTEMATIC TEST GENERATION INVESTIGATION SUMMARY ===",
280 | f"Total steps: {len(consolidated_findings.findings)}",
281 | f"Files examined: {len(consolidated_findings.files_checked)}",
282 | f"Relevant files identified: {len(consolidated_findings.relevant_files)}",
283 | f"Code elements to test: {len(consolidated_findings.relevant_context)}",
284 | "",
285 | "=== INVESTIGATION PROGRESSION ===",
286 | ]
287 |
288 | for finding in consolidated_findings.findings:
289 | summary_parts.append(finding)
290 |
291 | return "\n".join(summary_parts)
292 |
293 | def should_include_files_in_expert_prompt(self) -> bool:
294 | """Include files in expert analysis for comprehensive test generation."""
295 | return True
296 |
297 | def should_embed_system_prompt(self) -> bool:
298 | """Embed system prompt in expert analysis for proper context."""
299 | return True
300 |
301 | def get_expert_thinking_mode(self) -> str:
302 | """Use high thinking mode for thorough test generation analysis."""
303 | return "high"
304 |
305 | def get_expert_analysis_instruction(self) -> str:
306 | """Get specific instruction for test generation expert analysis."""
307 | return (
308 | "Please provide comprehensive test generation guidance based on the investigation findings. "
309 | "Focus on identifying additional test scenarios, edge cases not yet covered, framework-specific "
310 | "best practices, and providing concrete test implementation examples following the multi-agent "
311 | "workflow specified in the system prompt."
312 | )
313 |
314 | # Hook method overrides for test generation-specific behavior
315 |
316 | def prepare_step_data(self, request) -> dict:
317 | """
318 | Map test generation-specific fields for internal processing.
319 | """
320 | step_data = {
321 | "step": request.step,
322 | "step_number": request.step_number,
323 | "findings": request.findings,
324 | "files_checked": request.files_checked,
325 | "relevant_files": request.relevant_files,
326 | "relevant_context": request.relevant_context,
327 | "confidence": request.confidence,
328 | "images": request.images or [],
329 | }
330 | return step_data
331 |
332 | def should_skip_expert_analysis(self, request, consolidated_findings) -> bool:
333 | """
334 | Test generation workflow skips expert analysis when the CLI agent has "certain" confidence.
335 | """
336 | return request.confidence == "certain" and not request.next_step_required
337 |
338 | def store_initial_issue(self, step_description: str):
339 | """Store initial request for expert analysis."""
340 | self.initial_request = step_description
341 |
342 | # Override inheritance hooks for test generation-specific behavior
343 |
344 | def get_completion_status(self) -> str:
345 | """Test generation tools use test-specific status."""
346 | return "test_generation_complete_ready_for_implementation"
347 |
348 | def get_completion_data_key(self) -> str:
349 | """Test generation uses 'complete_test_generation' key."""
350 | return "complete_test_generation"
351 |
352 | def get_final_analysis_from_request(self, request):
353 | """Test generation tools use findings for final analysis."""
354 | return request.findings
355 |
356 | def get_confidence_level(self, request) -> str:
357 | """Test generation tools use 'certain' for high confidence."""
358 | return "certain"
359 |
360 | def get_completion_message(self) -> str:
361 | """Test generation-specific completion message."""
362 | return (
363 | "Test generation analysis complete with CERTAIN confidence. You have identified all test scenarios "
364 | "and provided comprehensive coverage strategy. MANDATORY: Present the user with the complete test plan "
365 | "and IMMEDIATELY proceed with creating the test files following the identified patterns and framework. "
366 | "Focus on implementing concrete, runnable tests with proper assertions."
367 | )
368 |
369 | def get_skip_reason(self) -> str:
370 | """Test generation-specific skip reason."""
371 | return "Completed comprehensive test planning with full confidence locally"
372 |
373 | def get_skip_expert_analysis_status(self) -> str:
374 | """Test generation-specific expert analysis skip status."""
375 | return "skipped_due_to_certain_test_confidence"
376 |
377 | def prepare_work_summary(self) -> str:
378 | """Test generation-specific work summary."""
379 | return self._build_test_generation_summary(self.consolidated_findings)
380 |
381 | def get_completion_next_steps_message(self, expert_analysis_used: bool = False) -> str:
382 | """
383 | Test generation-specific completion message.
384 | """
385 | base_message = (
386 | "TEST GENERATION ANALYSIS IS COMPLETE. You MUST now implement ALL identified test scenarios, "
387 | "creating comprehensive test files that cover happy paths, edge cases, error conditions, and "
388 | "boundary scenarios. Organize tests by functionality, use appropriate assertions, and follow "
389 | "the identified framework patterns. Provide concrete, executable test code—make it easy for "
390 | "a developer to run the tests and understand what each test validates."
391 | )
392 |
393 | # Add expert analysis guidance only when expert analysis was actually used
394 | if expert_analysis_used:
395 | expert_guidance = self.get_expert_analysis_guidance()
396 | if expert_guidance:
397 | return f"{base_message}\\n\\n{expert_guidance}"
398 |
399 | return base_message
400 |
401 | def get_expert_analysis_guidance(self) -> str:
402 | """
403 | Provide specific guidance for handling expert analysis in test generation.
404 | """
405 | return (
406 | "IMPORTANT: Additional test scenarios and edge cases have been provided by the expert analysis above. "
407 | "You MUST incorporate these suggestions into your test implementation, ensuring comprehensive coverage. "
408 | "Validate that the expert's test ideas are practical and align with the codebase structure. Combine "
409 | "your systematic investigation findings with the expert's additional scenarios to create a thorough "
410 | "test suite that catches real-world bugs before they reach production."
411 | )
412 |
413 | def get_step_guidance_message(self, request) -> str:
414 | """
415 | Test generation-specific step guidance with detailed investigation instructions.
416 | """
417 | step_guidance = self.get_test_generation_step_guidance(request.step_number, request.confidence, request)
418 | return step_guidance["next_steps"]
419 |
420 | def get_test_generation_step_guidance(self, step_number: int, confidence: str, request) -> dict[str, Any]:
421 | """
422 | Provide step-specific guidance for test generation workflow.
423 | """
424 | # Generate the next steps instruction based on required actions
425 | required_actions = self.get_required_actions(step_number, confidence, request.findings, request.total_steps)
426 |
427 | if step_number == 1:
428 | next_steps = (
429 | f"MANDATORY: DO NOT call the {self.get_name()} tool again immediately. You MUST first analyze "
430 | f"the code thoroughly using appropriate tools. CRITICAL AWARENESS: You need to understand "
431 | f"the code structure, identify testable behaviors, find edge cases and boundary conditions, "
432 | f"and determine the appropriate testing strategy. Use file reading tools, code analysis, and "
433 | f"systematic examination to gather comprehensive information about what needs to be tested. "
434 | f"Only call {self.get_name()} again AFTER completing your investigation. When you call "
435 | f"{self.get_name()} next time, use step_number: {step_number + 1} and report specific "
436 | f"code paths examined, test scenarios identified, and testing patterns discovered."
437 | )
438 | elif confidence in ["exploring", "low"]:
439 | next_steps = (
440 | f"STOP! Do NOT call {self.get_name()} again yet. Based on your findings, you've identified areas that need "
441 | f"deeper analysis for test generation. MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\\n"
442 | + "\\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
443 | + f"\\n\\nOnly call {self.get_name()} again with step_number: {step_number + 1} AFTER "
444 | + "completing these test planning tasks."
445 | )
446 | elif confidence in ["medium", "high"]:
447 | next_steps = (
448 | f"WAIT! Your test generation analysis needs final verification. DO NOT call {self.get_name()} immediately. REQUIRED ACTIONS:\\n"
449 | + "\\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
450 | + f"\\n\\nREMEMBER: Ensure you have identified all test scenarios including edge cases and error conditions. "
451 | f"Document findings with specific test cases to implement, then call {self.get_name()} "
452 | f"with step_number: {step_number + 1}."
453 | )
454 | else:
455 | next_steps = (
456 | f"PAUSE ANALYSIS. Before calling {self.get_name()} step {step_number + 1}, you MUST examine more code thoroughly. "
457 | + "Required: "
458 | + ", ".join(required_actions[:2])
459 | + ". "
460 | + f"Your next {self.get_name()} call (step_number: {step_number + 1}) must include "
461 | f"NEW test scenarios from actual code analysis, not just theories. NO recursive {self.get_name()} calls "
462 | f"without investigation work!"
463 | )
464 |
465 | return {"next_steps": next_steps}
466 |
467 | def customize_workflow_response(self, response_data: dict, request) -> dict:
468 | """
469 | Customize response to match test generation workflow format.
470 | """
471 | # Store initial request on first step
472 | if request.step_number == 1:
473 | self.initial_request = request.step
474 |
475 | # Convert generic status names to test generation-specific ones
476 | tool_name = self.get_name()
477 | status_mapping = {
478 | f"{tool_name}_in_progress": "test_generation_in_progress",
479 | f"pause_for_{tool_name}": "pause_for_test_analysis",
480 | f"{tool_name}_required": "test_analysis_required",
481 | f"{tool_name}_complete": "test_generation_complete",
482 | }
483 |
484 | if response_data["status"] in status_mapping:
485 | response_data["status"] = status_mapping[response_data["status"]]
486 |
487 | # Rename status field to match test generation workflow
488 | if f"{tool_name}_status" in response_data:
489 | response_data["test_generation_status"] = response_data.pop(f"{tool_name}_status")
490 | # Add test generation-specific status fields
491 | response_data["test_generation_status"]["test_scenarios_identified"] = len(
492 | self.consolidated_findings.relevant_context
493 | )
494 | response_data["test_generation_status"]["analysis_confidence"] = self.get_request_confidence(request)
495 |
496 | # Map complete_testgen to complete_test_generation
497 | if f"complete_{tool_name}" in response_data:
498 | response_data["complete_test_generation"] = response_data.pop(f"complete_{tool_name}")
499 |
500 | # Map the completion flag to match test generation workflow
501 | if f"{tool_name}_complete" in response_data:
502 | response_data["test_generation_complete"] = response_data.pop(f"{tool_name}_complete")
503 |
504 | return response_data
505 |
506 | # Required abstract methods from BaseTool
507 | def get_request_model(self):
508 | """Return the test generation workflow-specific request model."""
509 | return TestGenRequest
510 |
511 | async def prepare_prompt(self, request) -> str:
512 | """Not used - workflow tools use execute_workflow()."""
513 | return "" # Workflow tools use execute_workflow() directly
514 |
```
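A minimal sketch (not part of the repository) of how `TestGenRequest` enforces its step-1 rule via `validate_step_one_requirements`, assuming `WorkflowRequest` requires no fields beyond those shown; the file path is hypothetical:

```python
# Illustrative only: exercises the step-1 validator defined above.
# Assumes WorkflowRequest (tools/shared/base_models.py) adds no extra
# required fields beyond those shown here.
from pydantic import ValidationError

from tools.testgen import TestGenRequest

ok = TestGenRequest(
    step="Outline how to analyse structure, critical paths, and edge cases",
    step_number=1,
    total_steps=3,
    next_step_required=True,
    findings="Starting investigation",
    relevant_files=["/abs/path/to/calculator.py"],  # hypothetical target
)

try:
    TestGenRequest(
        step="Outline analysis",
        step_number=1,
        total_steps=3,
        next_step_required=True,
        findings="Starting investigation",
        # relevant_files omitted on step 1
    )
except ValidationError as exc:
    print(exc)  # wraps: "Step 1 requires 'relevant_files' field ..."
```

Pydantic raises the `ValueError` from the `model_validator` as a `ValidationError` at construction time, so a step-1 request without `relevant_files` never reaches the workflow.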
--------------------------------------------------------------------------------
/simulator_tests/test_secaudit_validation.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | SECAUDIT Tool Validation Test
4 |
5 | Tests the secaudit tool's capabilities using the workflow architecture.
6 | This validates that the workflow-based security audit provides step-by-step
7 | analysis with proper investigation guidance and expert analysis integration.
8 | """
9 |
10 | import json
11 |
12 | from .conversation_base_test import ConversationBaseTest
13 |
14 |
15 | class SecauditValidationTest(ConversationBaseTest):
16 | """Test secaudit tool with workflow architecture"""
17 |
18 | @property
19 | def test_name(self) -> str:
20 | return "secaudit_validation"
21 |
22 | @property
23 | def test_description(self) -> str:
24 | return "SECAUDIT tool validation with security audit workflow architecture"
25 |
26 | def run_test(self) -> bool:
27 | """Test secaudit tool capabilities"""
28 | # Set up the test environment
29 | self.setUp()
30 |
31 | try:
32 | self.logger.info("Test: SECAUDIT tool validation (security workflow architecture)")
33 |
34 | # Create test code with various security vulnerabilities
35 | self._create_test_code_for_audit()
36 |
37 | # Test 1: Single audit session with multiple steps
38 | if not self._test_single_audit_session():
39 | return False
40 |
41 | # Test 2: Audit with specific focus areas
42 | if not self._test_focused_security_audit():
43 | return False
44 |
45 | # Test 3: Complete audit with expert analysis using fast model
46 | if not self._test_complete_audit_with_analysis():
47 | return False
48 |
49 | # Test 4: Certain confidence behavior
50 | if not self._test_certain_confidence():
51 | return False
52 |
53 | # Test 5: Continuation test with chat tool
54 | if not self._test_continuation_with_chat():
55 | return False
56 |
57 | # Test 6: Model selection control
58 | if not self._test_model_selection():
59 | return False
60 |
61 | self.logger.info(" ✅ All secaudit validation tests passed")
62 | return True
63 |
64 | except Exception as e:
65 | self.logger.error(f"SECAUDIT validation test failed: {e}")
66 | return False
67 |
68 | def _create_test_code_for_audit(self):
69 | """Create test files with various security vulnerabilities"""
70 | # Create an authentication module with multiple security issues
71 | auth_code = """#!/usr/bin/env python3
72 | import hashlib
73 | import pickle
74 | import sqlite3
75 | from flask import request, session
76 |
77 | class AuthenticationManager:
78 | def __init__(self, db_path="users.db"):
79 | # A01: Broken Access Control - No proper session management
80 | self.db_path = db_path
81 | self.sessions = {} # In-memory session storage
82 | def login(self, username, password):
83 | '''User login with various security vulnerabilities'''
84 | # A03: Injection - SQL injection vulnerability
85 | conn = sqlite3.connect(self.db_path)
86 | cursor = conn.cursor()
87 |
88 | # Direct string interpolation in SQL query
89 | query = f"SELECT id, password_hash FROM users WHERE username = '{username}'"
90 | cursor.execute(query)
91 |
92 | user = cursor.fetchone()
93 | if not user:
94 | return {"status": "failed", "message": "User not found"}
95 |
96 | # A02: Cryptographic Failures - Weak hashing algorithm
97 | password_hash = hashlib.md5(password.encode()).hexdigest()
98 |
99 | if user[1] == password_hash:
100 | # A07: Identification and Authentication Failures - Weak session generation
101 | session_id = hashlib.md5(f"{username}{password}".encode()).hexdigest()
102 | self.sessions[session_id] = {"user_id": user[0], "username": username}
103 |
104 | return {"status": "success", "session_id": session_id}
105 | else:
106 | return {"status": "failed", "message": "Invalid password"}
107 |
108 | def reset_password(self, email):
109 | '''Password reset with security issues'''
110 | # A04: Insecure Design - No rate limiting or validation
111 | reset_token = hashlib.md5(email.encode()).hexdigest()
112 |
113 | # A09: Security Logging and Monitoring Failures - No security event logging
114 | # Simply returns token without any verification or logging
115 | return {"reset_token": reset_token, "url": f"/reset?token={reset_token}"}
116 |
117 | def deserialize_user_data(self, data):
118 | '''Unsafe deserialization'''
119 | # A08: Software and Data Integrity Failures - Insecure deserialization
120 | return pickle.loads(data)
121 |
122 | def get_user_profile(self, user_id):
123 | '''Get user profile with authorization issues'''
124 | # A01: Broken Access Control - No authorization check
125 | conn = sqlite3.connect(self.db_path)
126 | cursor = conn.cursor()
127 |
128 | # Fetches any user profile without checking permissions
129 | cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
130 | return cursor.fetchone()
131 | """
132 |
133 | # Create authentication file
134 | self.auth_file = self.create_additional_test_file("auth_manager.py", auth_code)
135 | self.logger.info(f" ✅ Created authentication file with security issues: {self.auth_file}")
136 |
137 | # Create API endpoint with additional vulnerabilities
138 | api_code = """#!/usr/bin/env python3
139 | from flask import Flask, request, jsonify
140 | import os
141 | import subprocess
142 | import requests
143 |
144 | app = Flask(__name__)
145 |
146 | # A05: Security Misconfiguration - Debug mode enabled
147 | app.config['DEBUG'] = True
148 | app.config['SECRET_KEY'] = 'dev-secret-key' # Hardcoded secret
149 |
150 | @app.route('/api/search', methods=['GET'])
151 | def search():
152 | '''Search endpoint with multiple vulnerabilities'''
153 | # A03: Injection - XSS vulnerability, no input sanitization
154 | query = request.args.get('q', '')
155 |
156 | # A03: Injection - Command injection vulnerability
157 | if 'file:' in query:
158 | filename = query.split('file:')[1]
159 | # Direct command execution
160 | result = subprocess.run(f"cat {filename}", shell=True, capture_output=True, text=True)
161 | return jsonify({"result": result.stdout})
162 |
163 | # A10: Server-Side Request Forgery (SSRF)
164 | if query.startswith('http'):
165 | # No validation of URL, allows internal network access
166 | response = requests.get(query)
167 | return jsonify({"content": response.text})
168 |
169 | # Return search results without output encoding
170 | return f"<h1>Search Results for: {query}</h1>"
171 |
172 | @app.route('/api/admin', methods=['GET'])
173 | def admin_panel():
174 | '''Admin panel with broken access control'''
175 | # A01: Broken Access Control - No authentication check
176 | # Anyone can access admin functionality
177 | action = request.args.get('action')
178 |
179 | if action == 'delete_user':
180 | user_id = request.args.get('user_id')
181 | # Performs privileged action without authorization
182 | return jsonify({"status": "User deleted", "user_id": user_id})
183 |
184 | return jsonify({"status": "Admin panel"})
185 |
186 | @app.route('/api/upload', methods=['POST'])
187 | def upload_file():
188 | '''File upload with security issues'''
189 | # A05: Security Misconfiguration - No file type validation
190 | file = request.files.get('file')
191 | if file:
192 | # Saves any file type to server
193 | filename = file.filename
194 | file.save(os.path.join('/tmp', filename))
195 |
196 | # A03: Path traversal vulnerability
197 | return jsonify({"status": "File uploaded", "path": f"/tmp/{filename}"})
198 |
199 | return jsonify({"error": "No file provided"})
200 |
201 | # A06: Vulnerable and Outdated Components
202 | # Using old Flask version with known vulnerabilities (hypothetical)
203 | # requirements.txt: Flask==0.12.2 (known security issues)
204 |
205 | if __name__ == '__main__':
206 | # A05: Security Misconfiguration - Running on all interfaces
207 | app.run(host='0.0.0.0', port=5000, debug=True)
208 | """
209 |
210 | # Create API file
211 | self.api_file = self.create_additional_test_file("api_endpoints.py", api_code)
212 | self.logger.info(f" ✅ Created API file with security vulnerabilities: {self.api_file}")
213 |
214 | def _test_single_audit_session(self) -> bool:
215 | """Test a single security audit session with multiple steps"""
216 | self.logger.info(" 🔧 Testing single audit session...")
217 |
218 | try:
219 | # Step 1: Initial security audit request
220 | response, continuation_id = self.call_mcp_tool_direct(
221 | "secaudit",
222 | {
223 | "step": f"Begin security audit of authentication system in {self.auth_file}",
224 | "step_number": 1,
225 | "total_steps": 6,
226 | "next_step_required": True,
227 | "findings": "Starting security assessment",
228 | "relevant_files": [self.auth_file],
229 | "model": "gemini-2.0-flash-lite",
230 | },
231 | )
232 |
233 | if not response:
234 | self.logger.error("Failed to call secaudit tool")
235 | return False
236 |
237 | # Parse and validate the response
238 | try:
239 | response_data = json.loads(response) if response else {}
240 | except json.JSONDecodeError:
241 | response_data = {}
242 |
243 | # Check if it's asking for investigation
244 | status = response_data.get("status", "")
245 | if status != "pause_for_secaudit":
246 | self.logger.error(f"Expected pause_for_secaudit status, got: {status}")
247 | return False
248 |
249 | # Step 2: Continue with findings
250 | response2, _ = self.call_mcp_tool_direct(
251 | "secaudit",
252 | {
253 | "step": "Examined authentication module and found critical security vulnerabilities",
254 | "step_number": 2,
255 | "total_steps": 6,
256 | "next_step_required": True,
257 | "findings": (
258 | "Found multiple OWASP Top 10 vulnerabilities: "
259 | "1. SQL injection in login method (line 88) - direct string interpolation in query "
260 | "2. Weak MD5 hashing for passwords (line 96) - cryptographically broken "
261 | "3. Insecure session management (line 100) - predictable session IDs "
262 | "4. Unsafe deserialization (line 119) - pickle.loads without validation"
263 | ),
264 | "files_checked": [self.auth_file],
265 | "relevant_files": [self.auth_file],
266 | "relevant_context": ["AuthenticationManager.login", "AuthenticationManager.deserialize_user_data"],
267 | "issues_found": [
268 | {"severity": "critical", "description": "SQL injection vulnerability in login method"},
269 | {"severity": "high", "description": "Weak MD5 password hashing"},
270 | {"severity": "high", "description": "Insecure session management"},
271 | {"severity": "critical", "description": "Unsafe deserialization vulnerability"},
272 | ],
273 | "confidence": "medium",
274 | "continuation_id": continuation_id,
275 | "model": "gemini-2.0-flash-lite",
276 | },
277 | )
278 |
279 | if not response2:
280 | self.logger.error("Failed to continue to step 2")
281 | return False
282 |
283 | self.logger.info(" ✅ Single audit session test passed")
284 | return True
285 |
286 | except Exception as e:
287 | self.logger.error(f"Single audit session test failed: {e}")
288 | return False
289 |
290 | def _test_focused_security_audit(self) -> bool:
291 | """Test security audit with specific focus areas"""
292 | self.logger.info(" 🔧 Testing focused security audit...")
293 |
294 | try:
295 | # Request OWASP-focused audit
296 | response, continuation_id = self.call_mcp_tool_direct(
297 | "secaudit",
298 | {
299 | "step": f"Begin OWASP-focused security audit of {self.api_file}",
300 | "step_number": 1,
301 | "total_steps": 4,
302 | "next_step_required": True,
303 | "findings": "Starting OWASP Top 10 focused security assessment",
304 | "relevant_files": [self.api_file],
305 | "security_scope": "Web API endpoints",
306 | "threat_level": "high",
307 | "audit_focus": "owasp",
308 | "model": "gemini-2.0-flash-lite",
309 | },
310 | )
311 |
312 | if not response:
313 | self.logger.error("Failed to start OWASP-focused audit")
314 | return False
315 |
316 | # Verify the audit was configured correctly
317 | try:
318 | response_data = json.loads(response)
319 | # The tool should acknowledge the OWASP focus
320 | if response_data.get("status") == "pause_for_secaudit":
321 | self.logger.info(" ✅ Focused security audit test passed")
322 | return True
323 | except json.JSONDecodeError:
324 | pass
325 |
326 | self.logger.error("Expected proper OWASP-focused configuration")
327 | return False
328 |
329 | except Exception as e:
330 | self.logger.error(f"Focused security audit test failed: {e}")
331 | return False
332 |
333 | def _test_complete_audit_with_analysis(self) -> bool:
334 | """Test complete security audit with expert analysis"""
335 | self.logger.info(" 🔧 Testing complete audit with expert analysis...")
336 |
337 | try:
338 | # Step 1: Start fresh audit
339 | response1, continuation_id = self.call_mcp_tool_direct(
340 | "secaudit",
341 | {
342 | "step": f"Begin comprehensive security audit of {self.auth_file} and {self.api_file}",
343 | "step_number": 1,
344 | "total_steps": 3,
345 | "next_step_required": True,
346 | "findings": "Starting OWASP Top 10 security assessment of authentication and API modules",
347 | "relevant_files": [self.auth_file, self.api_file],
348 | "security_scope": "Web application with authentication and API endpoints",
349 | "model": "gemini-2.0-flash-lite",
350 | },
351 | )
352 |
353 | if not response1:
354 | self.logger.error("Failed to start comprehensive audit")
355 | return False
356 |
357 | # Step 2: Continue with detailed findings
358 | response2, _ = self.call_mcp_tool_direct(
359 | "secaudit",
360 | {
361 | "step": "Completed comprehensive security investigation of both modules",
362 | "step_number": 2,
363 | "total_steps": 3,
364 | "next_step_required": True,
365 | "findings": (
366 | "Found critical OWASP vulnerabilities across both modules: "
367 | "A01: Broken Access Control in admin panel, "
368 | "A03: SQL injection in login and command injection in search, "
369 | "A02: Weak cryptography with MD5 hashing, "
370 | "A05: Security misconfiguration with debug mode enabled, "
371 | "A07: Weak session management, "
372 | "A08: Insecure deserialization, "
373 | "A10: SSRF vulnerability in search endpoint"
374 | ),
375 | "files_checked": [self.auth_file, self.api_file],
376 | "relevant_files": [self.auth_file, self.api_file],
377 | "relevant_context": [
378 | "AuthenticationManager.login",
379 | "AuthenticationManager.deserialize_user_data",
380 | "api.search",
381 | "api.admin_panel",
382 | ],
383 | "issues_found": [
384 | {"severity": "critical", "description": "SQL injection in login method"},
385 | {"severity": "critical", "description": "Command injection in search endpoint"},
386 | {"severity": "critical", "description": "SSRF vulnerability allowing internal network access"},
387 | {"severity": "high", "description": "Broken access control on admin panel"},
388 | {"severity": "high", "description": "Insecure deserialization vulnerability"},
389 | {"severity": "high", "description": "XSS vulnerability in search results"},
390 | {"severity": "medium", "description": "Weak MD5 password hashing"},
391 | {"severity": "medium", "description": "Security misconfiguration - debug mode enabled"},
392 | ],
393 | "confidence": "high",
394 | "continuation_id": continuation_id,
395 | "model": "gemini-2.0-flash-lite",
396 | },
397 | )
398 |
399 | # Final step - complete the audit and allow expert analysis to run
400 | response3, _ = self.call_mcp_tool_direct(
401 | "secaudit",
402 | {
403 | "step": "Complete security assessment with all vulnerabilities documented",
404 | "step_number": 3,
405 | "total_steps": 3,
406 | "next_step_required": False,
407 | "findings": "Security audit complete with 8 vulnerabilities identified across OWASP categories",
408 | "files_checked": [self.auth_file, self.api_file],
409 | "relevant_files": [self.auth_file, self.api_file],
410 | "confidence": "high", # High confidence to trigger expert analysis
411 | "continuation_id": continuation_id,
412 | "model": "gemini-2.0-flash-lite",
413 | },
414 | )
415 |
416 | if response3:
417 | # Check for expert analysis or completion status
418 | try:
419 | response_data = json.loads(response3)
420 | status = response_data.get("status", "")
421 | # Either expert analysis completed or security analysis complete
422 | if status in ["complete", "security_analysis_complete"]:
423 | self.logger.info(" ✅ Complete audit with expert analysis test passed")
424 | return True
425 | except json.JSONDecodeError:
426 | # If not JSON, check for security content (expert analysis output)
427 | if "security" in response3.lower() or "vulnerability" in response3.lower():
428 | self.logger.info(" ✅ Complete audit with expert analysis test passed")
429 | return True
430 |
431 | self.logger.error("Expected expert security analysis or completion")
432 | return False
433 |
434 | except Exception as e:
435 | self.logger.error(f"Complete audit with analysis test failed: {e}")
436 | return False
437 |
438 | def _test_certain_confidence(self) -> bool:
439 | """Test behavior when confidence is certain"""
440 | self.logger.info(" 🔧 Testing certain confidence behavior...")
441 |
442 | try:
443 | # Request with certain confidence
444 | response, _ = self.call_mcp_tool_direct(
445 | "secaudit",
446 | {
447 | "step": f"Security audit complete for {self.auth_file}",
448 | "step_number": 1,
449 | "total_steps": 1,
450 | "next_step_required": False,
451 | "findings": "Critical SQL injection vulnerability confirmed in login method",
452 | "files_checked": [self.auth_file],
453 | "relevant_files": [self.auth_file],
454 | "issues_found": [
455 | {"severity": "critical", "description": "SQL injection vulnerability in login method"}
456 | ],
457 | "confidence": "certain",
458 | "model": "gemini-2.0-flash-lite",
459 | },
460 | )
461 |
462 | if not response:
463 | self.logger.error("Failed to execute certain confidence test")
464 | return False
465 |
466 | try:
467 | response_data = json.loads(response)
468 | # With certain confidence, should complete without expert analysis
469 | if response_data.get("status") == "security_analysis_complete":
470 | self.logger.info(" ✅ Certain confidence correctly completes without expert analysis")
471 | return True
472 | except json.JSONDecodeError:
473 | pass
474 |
475 | # Check if findings are shown directly
476 | response_lower = response.lower()
477 | if "sql injection" in response_lower or "vulnerability" in response_lower:
478 | self.logger.info(" ✅ Certain confidence shows findings directly")
479 | return True
480 |
481 | self.logger.error("Expected completion or direct findings with certain confidence")
482 | return False
483 |
484 | except Exception as e:
485 | self.logger.error(f"Certain confidence test failed: {e}")
486 | return False
487 |
488 | def _test_continuation_with_chat(self) -> bool:
489 | """Test continuation functionality with chat tool"""
490 | self.logger.info(" 🔧 Testing continuation with chat tool...")
491 |
492 | try:
493 | # First, run a security audit that generates a continuation_id
494 | response1, continuation_id = self.call_mcp_tool_direct(
495 | "secaudit",
496 | {
497 | "step": f"Start analyzing {self.auth_file} for authentication vulnerabilities",
498 | "step_number": 1,
499 | "total_steps": 4,
500 | "next_step_required": True,
501 | "findings": "Beginning authentication security analysis",
502 | "relevant_files": [self.auth_file],
503 | "model": "gemini-2.0-flash-lite",
504 | },
505 | )
506 |
507 | if not response1:
508 | self.logger.error("Failed to start audit for continuation test")
509 | return False
510 |
511 | # Extract continuation_id if present
512 | if not continuation_id:
513 | self.logger.info(" ⚠️ No continuation_id returned, checking response")
514 | try:
515 | response_data = json.loads(response1)
516 | # Look for thread_id in metadata
517 | metadata = response_data.get("metadata", {})
518 | continuation_id = metadata.get("thread_id")
519 | except json.JSONDecodeError:
520 | pass
521 |
522 | if continuation_id:
523 | # Now test using chat tool with continuation
524 | chat_response, _ = self.call_mcp_tool_direct(
525 | "chat",
526 | {
527 | "prompt": "Can you tell me more about the SQL injection vulnerability details found in the security audit?",
528 | "continuation_id": continuation_id,
529 | "model": "gemini-2.0-flash-lite",
530 | },
531 | )
532 |
533 | if chat_response:
534 | self.logger.info(" ✅ Chat tool continuation test passed")
535 | return True
536 | else:
537 | # Without continuation_id, just verify the audit step worked
538 | if response1:
539 | self.logger.info(" ✅ Audit step completed (continuation test limited)")
540 | return True
541 |
542 | self.logger.error("Expected successful continuation or audit step")
543 | return False
544 |
545 | except Exception as e:
546 | self.logger.error(f"Continuation test failed: {e}")
547 | return False
548 |
549 | def _test_model_selection(self) -> bool:
550 | """Test model selection and skip expert analysis option"""
551 | self.logger.info(" 🔧 Testing model selection control...")
552 |
553 | try:
554 | # Test 1: Explicit model selection
555 | response1, _ = self.call_mcp_tool_direct(
556 | "secaudit",
557 | {
558 | "step": f"Analyze {self.api_file} for SSRF vulnerabilities",
559 | "step_number": 1,
560 | "total_steps": 2,
561 | "next_step_required": True,
562 | "findings": "Starting SSRF vulnerability analysis",
563 | "relevant_files": [self.api_file],
564 | "audit_focus": "owasp",
565 | "model": "gemini-2.0-flash-lite",
566 | },
567 | )
568 |
569 | if response1:
570 | self.logger.info(" ✅ Model selection recognized")
571 |
572 | # Test 2: Skip expert analysis
573 | response2, _ = self.call_mcp_tool_direct(
574 | "secaudit",
575 | {
576 | "step": f"Complete security investigation of {self.auth_file}",
577 | "step_number": 1,
578 | "total_steps": 1,
579 | "next_step_required": False,
580 | "findings": "Security issues documented",
581 | "files_checked": [self.auth_file],
582 | "relevant_files": [self.auth_file],
583 | "confidence": "high",
584 | "use_assistant_model": False, # Skip expert analysis
585 | "model": "gemini-2.0-flash-lite",
586 | },
587 | )
588 |
589 | if response2:
590 | try:
591 | response_data = json.loads(response2)
592 | # Should complete without expert analysis
593 | if response_data.get("status") == "security_analysis_complete":
594 | self.logger.info(" ✅ Skip expert analysis option works")
595 | return True
596 | except json.JSONDecodeError:
597 | pass
598 |
599 |                 # Fallback: even without the status marker, the response may simply describe a completed analysis
600 | response_lower = response2.lower()
601 | if "complete" in response_lower or "security" in response_lower:
602 | self.logger.info(" ✅ Analysis performed without expert model")
603 | return True
604 |
605 | self.logger.error("Expected model selection or skip behavior")
606 | return False
607 |
608 | except Exception as e:
609 | self.logger.error(f"Model selection test failed: {e}")
610 | return False
611 |
```
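The secaudit validation methods above share one pattern: drive a workflow step through `call_mcp_tool_direct`, then accept either a parsed JSON status or recognizable text in the raw response. As a minimal sketch (assuming only the `_test_*` methods and `logger` shown in this file; the `run_secaudit_validations` helper itself is hypothetical and not part of the repository), a driver could aggregate these checks like so:

```python
# Hypothetical driver sketch - not part of the repository.
# Assumes an instantiated test object exposing the _test_* methods and
# logger seen above; each method returns True on success, False otherwise.

def run_secaudit_validations(test) -> bool:
    """Run the secaudit checks in sequence and report an overall pass/fail."""
    checks = [
        test._test_continuation_with_chat,
        test._test_model_selection,
    ]
    results = {check.__name__: check() for check in checks}
    for name, passed in results.items():
        test.logger.info(f"{name}: {'PASS' if passed else 'FAIL'}")
    return all(results.values())
```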
--------------------------------------------------------------------------------
/tools/planner.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Interactive Sequential Planner - Break down complex tasks through step-by-step planning
3 |
4 | This tool enables structured planning through an interactive, step-by-step process that builds
5 | plans incrementally with the ability to revise, branch, and adapt as understanding deepens.
6 |
7 | The planner guides users through sequential thinking with forced pauses between steps to ensure
8 | thorough consideration of alternatives, dependencies, and strategic decisions before moving to
9 | tactical implementation details.
10 |
11 | Key features:
12 | - Sequential planning with full context awareness
13 | - Forced deep reflection for complex plans (≥5 steps) in early stages
14 | - Branching capabilities for exploring alternative approaches
15 | - Revision capabilities to update earlier decisions
16 | - Dynamic step count adjustment as plans evolve
17 | - Self-contained completion without external expert analysis
18 |
19 | Perfect for: complex project planning, system design with unknowns, migration strategies,
20 | architectural decisions, and breaking down large problems into manageable steps.
21 | """
22 |
23 | import logging
24 | from typing import TYPE_CHECKING, Any
25 |
26 | from pydantic import Field, field_validator
27 |
28 | if TYPE_CHECKING:
29 | from tools.models import ToolModelCategory
30 |
31 | from config import TEMPERATURE_BALANCED
32 | from systemprompts import PLANNER_PROMPT
33 | from tools.shared.base_models import WorkflowRequest
34 |
35 | from .workflow.base import WorkflowTool
36 |
37 | logger = logging.getLogger(__name__)
38 |
39 | # Tool-specific field descriptions matching original planner tool
40 | PLANNER_FIELD_DESCRIPTIONS = {
41 | "step": (
42 | "Planning content for this step. Step 1: describe the task, problem and scope. Later steps: capture updates, "
43 | "revisions, branches, or open questions that shape the plan."
44 | ),
45 | "step_number": "Current planning step number (starts at 1).",
46 | "total_steps": "Estimated number of planning steps; adjust as the plan evolves.",
47 | "next_step_required": "Set true when another planning step will follow after this one.",
48 | "is_step_revision": "Set true when you are replacing a previously recorded step.",
49 | "revises_step_number": "Step number being replaced when revising.",
50 | "is_branch_point": "True when this step creates a new branch to explore an alternative path.",
51 | "branch_from_step": "If branching, the step number that this branch starts from.",
52 | "branch_id": "Name for this branch (e.g. 'approach-A', 'migration-path').",
53 | "more_steps_needed": "True when you now expect to add additional steps beyond the prior estimate.",
54 | }
55 |
56 |
57 | class PlannerRequest(WorkflowRequest):
58 | """Request model for planner workflow tool matching original planner exactly"""
59 |
60 | # Required fields for each planning step
61 | step: str = Field(..., description=PLANNER_FIELD_DESCRIPTIONS["step"])
62 | step_number: int = Field(..., description=PLANNER_FIELD_DESCRIPTIONS["step_number"])
63 | total_steps: int = Field(..., description=PLANNER_FIELD_DESCRIPTIONS["total_steps"])
64 | next_step_required: bool = Field(..., description=PLANNER_FIELD_DESCRIPTIONS["next_step_required"])
65 |
66 | # Optional revision/branching fields (planning-specific)
67 | is_step_revision: bool | None = Field(False, description=PLANNER_FIELD_DESCRIPTIONS["is_step_revision"])
68 | revises_step_number: int | None = Field(None, description=PLANNER_FIELD_DESCRIPTIONS["revises_step_number"])
69 | is_branch_point: bool | None = Field(False, description=PLANNER_FIELD_DESCRIPTIONS["is_branch_point"])
70 | branch_from_step: int | None = Field(None, description=PLANNER_FIELD_DESCRIPTIONS["branch_from_step"])
71 | branch_id: str | None = Field(None, description=PLANNER_FIELD_DESCRIPTIONS["branch_id"])
72 | more_steps_needed: bool | None = Field(False, description=PLANNER_FIELD_DESCRIPTIONS["more_steps_needed"])
73 |
74 | # Exclude all investigation/analysis fields that aren't relevant to planning
75 | findings: str = Field(
76 | default="", exclude=True, description="Not used for planning - step content serves as findings"
77 | )
78 | files_checked: list[str] = Field(default_factory=list, exclude=True, description="Planning doesn't examine files")
79 | relevant_files: list[str] = Field(default_factory=list, exclude=True, description="Planning doesn't use files")
80 | relevant_context: list[str] = Field(
81 | default_factory=list, exclude=True, description="Planning doesn't track code context"
82 | )
83 | issues_found: list[dict] = Field(default_factory=list, exclude=True, description="Planning doesn't find issues")
84 | confidence: str = Field(default="planning", exclude=True, description="Planning uses different confidence model")
85 | hypothesis: str | None = Field(default=None, exclude=True, description="Planning doesn't use hypothesis")
86 |
87 | # Exclude other non-planning fields
88 | temperature: float | None = Field(default=None, exclude=True)
89 | thinking_mode: str | None = Field(default=None, exclude=True)
90 | use_assistant_model: bool | None = Field(default=False, exclude=True, description="Planning is self-contained")
91 | images: list | None = Field(default=None, exclude=True, description="Planning doesn't use images")
92 |
93 | @field_validator("step_number")
94 | @classmethod
95 | def validate_step_number(cls, v):
96 | if v < 1:
97 | raise ValueError("step_number must be at least 1")
98 | return v
99 |
100 | @field_validator("total_steps")
101 | @classmethod
102 | def validate_total_steps(cls, v):
103 | if v < 1:
104 | raise ValueError("total_steps must be at least 1")
105 | return v
106 |
107 |
108 | class PlannerTool(WorkflowTool):
109 | """
110 | Planner workflow tool for step-by-step planning using the workflow architecture.
111 |
112 | This tool provides the same planning capabilities as the original planner tool
113 | but uses the new workflow architecture for consistency with other workflow tools.
114 | It maintains all the original functionality including:
115 | - Sequential step-by-step planning
116 | - Branching and revision capabilities
117 | - Deep thinking pauses for complex plans
118 | - Conversation memory integration
119 | - Self-contained operation (no expert analysis)
120 | """
121 |
122 | def __init__(self):
123 | super().__init__()
124 | self.branches = {}
125 |
126 | def get_name(self) -> str:
127 | return "planner"
128 |
129 | def get_description(self) -> str:
130 | return (
131 | "Breaks down complex tasks through interactive, sequential planning with revision and branching capabilities. "
132 | "Use for complex project planning, system design, migration strategies, and architectural decisions. "
133 | "Builds plans incrementally with deep reflection for complex scenarios."
134 | )
135 |
136 | def get_system_prompt(self) -> str:
137 | return PLANNER_PROMPT
138 |
139 | def get_default_temperature(self) -> float:
140 | return TEMPERATURE_BALANCED
141 |
142 | def get_model_category(self) -> "ToolModelCategory":
143 | """Planner requires deep analysis and reasoning"""
144 | from tools.models import ToolModelCategory
145 |
146 | return ToolModelCategory.EXTENDED_REASONING
147 |
148 | def requires_model(self) -> bool:
149 | """
150 | Planner tool doesn't require model resolution at the MCP boundary.
151 |
152 | The planner is a pure data processing tool that organizes planning steps
153 | and provides structured guidance without calling external AI models.
154 |
155 | Returns:
156 | bool: False - planner doesn't need AI model access
157 | """
158 | return False
159 |
160 | def get_workflow_request_model(self):
161 | """Return the planner-specific request model."""
162 | return PlannerRequest
163 |
164 | def get_input_schema(self) -> dict[str, Any]:
165 | """Generate input schema for planner workflow using override pattern."""
166 | from .workflow.schema_builders import WorkflowSchemaBuilder
167 |
168 | # Planner tool-specific field definitions
169 | planner_field_overrides = {
170 | # Override standard workflow fields that need planning-specific descriptions
171 | "step": {
172 | "type": "string",
173 |                 "description": PLANNER_FIELD_DESCRIPTIONS["step"],  # Planning-specific wording replaces the generic step description
174 | },
175 | # NEW planning-specific fields (not in base workflow)
176 | "is_step_revision": {
177 | "type": "boolean",
178 | "description": PLANNER_FIELD_DESCRIPTIONS["is_step_revision"],
179 | },
180 | "revises_step_number": {
181 | "type": "integer",
182 | "minimum": 1,
183 | "description": PLANNER_FIELD_DESCRIPTIONS["revises_step_number"],
184 | },
185 | "is_branch_point": {
186 | "type": "boolean",
187 | "description": PLANNER_FIELD_DESCRIPTIONS["is_branch_point"],
188 | },
189 | "branch_from_step": {
190 | "type": "integer",
191 | "minimum": 1,
192 | "description": PLANNER_FIELD_DESCRIPTIONS["branch_from_step"],
193 | },
194 | "branch_id": {
195 | "type": "string",
196 | "description": PLANNER_FIELD_DESCRIPTIONS["branch_id"],
197 | },
198 | "more_steps_needed": {
199 | "type": "boolean",
200 | "description": PLANNER_FIELD_DESCRIPTIONS["more_steps_needed"],
201 | },
202 | }
203 |
204 | # Define excluded fields for planner workflow
205 | excluded_workflow_fields = [
206 | "findings", # Planning uses step content instead
207 | "files_checked", # Planning doesn't examine files
208 | "relevant_files", # Planning doesn't use files
209 | "relevant_context", # Planning doesn't track code context
210 | "issues_found", # Planning doesn't find issues
211 | "confidence", # Planning uses different confidence model
212 | "hypothesis", # Planning doesn't use hypothesis
213 | ]
214 |
215 | excluded_common_fields = [
216 | "temperature", # Planning doesn't need temperature control
217 | "thinking_mode", # Planning doesn't need thinking mode
218 | "images", # Planning doesn't use images
219 | "absolute_file_paths", # Planning doesn't use file attachments
220 | ]
221 |
222 | # Build schema with proper field exclusion (following consensus pattern)
223 | return WorkflowSchemaBuilder.build_schema(
224 | tool_specific_fields=planner_field_overrides,
225 | required_fields=[], # No additional required fields beyond workflow defaults
226 | model_field_schema=self.get_model_field_schema(),
227 | auto_mode=self.is_effective_auto_mode(),
228 | tool_name=self.get_name(),
229 | excluded_workflow_fields=excluded_workflow_fields,
230 | excluded_common_fields=excluded_common_fields,
231 | )
232 |
233 | # ================================================================================
234 | # Abstract Methods - Required Implementation from BaseWorkflowMixin
235 | # ================================================================================
236 |
237 | def get_required_actions(
238 | self, step_number: int, confidence: str, findings: str, total_steps: int, request=None
239 | ) -> list[str]:
240 | """Define required actions for each planning phase."""
241 | if step_number == 1:
242 | # Initial planning tasks
243 | return [
244 | "Think deeply about the complete scope and complexity of what needs to be planned",
245 | "Consider multiple approaches and their trade-offs",
246 | "Identify key constraints, dependencies, and potential challenges",
247 | "Think about stakeholders, success criteria, and critical requirements",
248 | ]
249 | elif step_number <= 3 and total_steps >= 5:
250 | # Complex plan early stages - force deep thinking
251 | if step_number == 2:
252 | return [
253 | "Evaluate the approach from step 1 - are there better alternatives?",
254 | "Break down the major phases and identify critical decision points",
255 | "Consider resource requirements and potential bottlenecks",
256 | "Think about how different parts interconnect and affect each other",
257 | ]
258 | else: # step_number == 3
259 | return [
260 | "Validate that the emerging plan addresses the original requirements",
261 | "Identify any gaps or assumptions that need clarification",
262 | "Consider how to validate progress and adjust course if needed",
263 | "Think about what the first concrete steps should be",
264 | ]
265 | else:
266 | # Later steps or simple plans
267 | return [
268 | "Continue developing the plan with concrete, actionable steps",
269 | "Consider implementation details and practical considerations",
270 | "Think about how to sequence and coordinate different activities",
271 | "Prepare for execution planning and resource allocation",
272 | ]
273 |
274 | def should_call_expert_analysis(self, consolidated_findings, request=None) -> bool:
275 | """Planner is self-contained and doesn't need expert analysis."""
276 | return False
277 |
278 | def prepare_expert_analysis_context(self, consolidated_findings) -> str:
279 | """Planner doesn't use expert analysis."""
280 | return ""
281 |
282 | def requires_expert_analysis(self) -> bool:
283 | """Planner is self-contained like the original planner tool."""
284 | return False
285 |
286 | # ================================================================================
287 | # Workflow Customization - Match Original Planner Behavior
288 | # ================================================================================
289 |
290 | def prepare_step_data(self, request) -> dict:
291 | """
292 | Prepare step data from request with planner-specific fields.
293 | """
294 | step_data = {
295 | "step": request.step,
296 | "step_number": request.step_number,
297 | "findings": f"Planning step {request.step_number}: {request.step}", # Use step content as findings
298 | "files_checked": [], # Planner doesn't check files
299 | "relevant_files": [], # Planner doesn't use files
300 | "relevant_context": [], # Planner doesn't track context like debug
301 | "issues_found": [], # Planner doesn't track issues
302 | "confidence": "planning", # Planning confidence is different from investigation
303 | "hypothesis": None, # Planner doesn't use hypothesis
304 | "images": [], # Planner doesn't use images
305 | # Planner-specific fields
306 | "is_step_revision": request.is_step_revision or False,
307 | "revises_step_number": request.revises_step_number,
308 | "is_branch_point": request.is_branch_point or False,
309 | "branch_from_step": request.branch_from_step,
310 | "branch_id": request.branch_id,
311 | "more_steps_needed": request.more_steps_needed or False,
312 | }
313 | return step_data
314 |
315 | def build_base_response(self, request, continuation_id: str = None) -> dict:
316 | """
317 | Build the base response structure with planner-specific fields.
318 | """
319 | # Use work_history from workflow mixin for consistent step tracking
320 | # Add 1 to account for current step being processed
321 | current_step_count = len(self.work_history) + 1
322 |
323 | response_data = {
324 | "status": f"{self.get_name()}_in_progress",
325 | "step_number": request.step_number,
326 | "total_steps": request.total_steps,
327 | "next_step_required": request.next_step_required,
328 | "step_content": request.step,
329 | f"{self.get_name()}_status": {
330 | "files_checked": len(self.consolidated_findings.files_checked),
331 | "relevant_files": len(self.consolidated_findings.relevant_files),
332 | "relevant_context": len(self.consolidated_findings.relevant_context),
333 | "issues_found": len(self.consolidated_findings.issues_found),
334 | "images_collected": len(self.consolidated_findings.images),
335 | "current_confidence": self.get_request_confidence(request),
336 | "step_history_length": current_step_count, # Use work_history + current step
337 | },
338 | "metadata": {
339 | "branches": list(self.branches.keys()),
340 | "step_history_length": current_step_count, # Use work_history + current step
341 | "is_step_revision": request.is_step_revision or False,
342 | "revises_step_number": request.revises_step_number,
343 | "is_branch_point": request.is_branch_point or False,
344 | "branch_from_step": request.branch_from_step,
345 | "branch_id": request.branch_id,
346 | "more_steps_needed": request.more_steps_needed or False,
347 | },
348 | }
349 |
350 | if continuation_id:
351 | response_data["continuation_id"] = continuation_id
352 |
353 | return response_data
354 |
355 | def handle_work_continuation(self, response_data: dict, request) -> dict:
356 | """
357 | Handle work continuation with planner-specific deep thinking pauses.
358 | """
359 | response_data["status"] = f"pause_for_{self.get_name()}"
360 | response_data[f"{self.get_name()}_required"] = True
361 |
362 | # Get planner-specific required actions
363 | required_actions = self.get_required_actions(request.step_number, "planning", request.step, request.total_steps)
364 | response_data["required_actions"] = required_actions
365 |
366 | # Enhanced deep thinking pauses for complex plans
367 | if request.total_steps >= 5 and request.step_number <= 3:
368 | response_data["status"] = "pause_for_deep_thinking"
369 | response_data["thinking_required"] = True
370 | response_data["required_thinking"] = required_actions
371 |
372 | if request.step_number == 1:
373 | response_data["next_steps"] = (
374 | f"MANDATORY: DO NOT call the {self.get_name()} tool again immediately. This is a complex plan ({request.total_steps} steps) "
375 | f"that requires deep thinking. You MUST first spend time reflecting on the planning challenge:\n\n"
376 | f"REQUIRED DEEP THINKING before calling {self.get_name()} step {request.step_number + 1}:\n"
377 | f"1. Analyze the FULL SCOPE: What exactly needs to be accomplished?\n"
378 | f"2. Consider MULTIPLE APPROACHES: What are 2-3 different ways to tackle this?\n"
379 | f"3. Identify CONSTRAINTS & DEPENDENCIES: What limits our options?\n"
380 | f"4. Think about SUCCESS CRITERIA: How will we know we've succeeded?\n"
381 | f"5. Consider RISKS & MITIGATION: What could go wrong early vs late?\n\n"
382 | f"Only call {self.get_name()} again with step_number: {request.step_number + 1} AFTER this deep analysis."
383 | )
384 | elif request.step_number == 2:
385 | response_data["next_steps"] = (
386 | f"STOP! Complex planning requires reflection between steps. DO NOT call {self.get_name()} immediately.\n\n"
387 | f"MANDATORY REFLECTION before {self.get_name()} step {request.step_number + 1}:\n"
388 | f"1. EVALUATE YOUR APPROACH: Is the direction from step 1 still the best?\n"
389 | f"2. IDENTIFY MAJOR PHASES: What are the 3-5 main chunks of work?\n"
390 | f"3. SPOT DEPENDENCIES: What must happen before what?\n"
391 | f"4. CONSIDER RESOURCES: What skills, tools, or access do we need?\n"
392 | f"5. FIND CRITICAL PATHS: Where could delays hurt the most?\n\n"
393 | f"Think deeply about these aspects, then call {self.get_name()} with step_number: {request.step_number + 1}."
394 | )
395 | elif request.step_number == 3:
396 | response_data["next_steps"] = (
397 | f"PAUSE for final strategic reflection. DO NOT call {self.get_name()} yet.\n\n"
398 | f"FINAL DEEP THINKING before {self.get_name()} step {request.step_number + 1}:\n"
399 | f"1. VALIDATE COMPLETENESS: Does this plan address all original requirements?\n"
400 | f"2. CHECK FOR GAPS: What assumptions need validation? What's unclear?\n"
401 | f"3. PLAN FOR ADAPTATION: How will we know if we need to change course?\n"
402 | f"4. DEFINE FIRST STEPS: What are the first 2-3 concrete actions?\n"
403 | f"5. TRANSITION MINDSET: Ready to shift from strategic to tactical planning?\n\n"
404 | f"After this reflection, call {self.get_name()} with step_number: {request.step_number + 1} to continue with tactical details."
405 | )
406 | else:
407 | # Normal flow for simple plans or later steps
408 | remaining_steps = request.total_steps - request.step_number
409 | response_data["next_steps"] = (
410 | f"Continue with step {request.step_number + 1}. Approximately {remaining_steps} steps remaining."
411 | )
412 |
413 | return response_data
414 |
415 | def customize_workflow_response(self, response_data: dict, request) -> dict:
416 | """
417 | Customize response to match original planner tool format.
418 | """
419 | # No need to append to step_history since workflow mixin already manages work_history
420 | # and we calculate step counts from work_history
421 |
422 | # Handle branching like original planner
423 | if request.is_branch_point and request.branch_from_step and request.branch_id:
424 | if request.branch_id not in self.branches:
425 | self.branches[request.branch_id] = []
426 | step_data = self.prepare_step_data(request)
427 | self.branches[request.branch_id].append(step_data)
428 |
429 | # Ensure metadata exists and preserve existing metadata from build_base_response
430 | if "metadata" not in response_data:
431 | response_data["metadata"] = {}
432 |
433 | # Store planner-specific metadata that should persist through workflow metadata addition
434 | planner_metadata = {
435 | "branches": list(self.branches.keys()),
436 | "is_step_revision": request.is_step_revision or False,
437 | "revises_step_number": request.revises_step_number,
438 | "is_branch_point": request.is_branch_point or False,
439 | "branch_from_step": request.branch_from_step,
440 | "branch_id": request.branch_id,
441 | "more_steps_needed": request.more_steps_needed or False,
442 | }
443 |
444 | # Update metadata while preserving existing values
445 | response_data["metadata"].update(planner_metadata)
446 |
447 | # Add planner-specific output instructions for final steps
448 | if not request.next_step_required:
449 | response_data["planning_complete"] = True
450 | response_data["plan_summary"] = (
451 | f"COMPLETE PLAN: {request.step} (Total {request.total_steps} steps completed)"
452 | )
453 | response_data["output"] = {
454 | "instructions": "This is a structured planning response. Present the step_content as the main planning analysis. If next_step_required is true, continue with the next step. If planning_complete is true, present the complete plan in a well-structured format with clear sections, headings, numbered steps, and visual elements like ASCII charts for phases/dependencies. Use bullet points, sub-steps, sequences, and visual organization to make complex plans easy to understand and follow. IMPORTANT: Do NOT use emojis - use clear text formatting and ASCII characters only. Do NOT mention time estimates or costs unless explicitly requested.",
455 | "format": "step_by_step_planning",
456 | "presentation_guidelines": {
457 | "completed_plans": "Use clear headings, numbered phases, ASCII diagrams for workflows/dependencies, bullet points for sub-tasks, and visual sequences where helpful. No emojis. No time/cost estimates unless requested.",
458 | "step_content": "Present as main analysis with clear structure and actionable insights. No emojis. No time/cost estimates unless requested.",
459 | "continuation": "Use continuation_id for related planning sessions or implementation planning",
460 | },
461 | }
462 | response_data["next_steps"] = (
463 | "Planning complete. Present the complete plan to the user in a well-structured format with clear sections, "
464 | "numbered steps, visual elements (ASCII charts/diagrams where helpful), sub-step breakdowns, and implementation guidance. "
465 | "Use headings, bullet points, and visual organization to make the plan easy to follow. "
466 | "If there are phases, dependencies, or parallel tracks, show these relationships visually. "
467 | "IMPORTANT: Do NOT use emojis - use clear text formatting and ASCII characters only. "
468 | "Do NOT mention time estimates or costs unless explicitly requested. "
469 | "After presenting the plan, offer to either help implement specific parts or use the continuation_id to start related planning sessions."
470 | )
471 |
472 | # Convert generic status names to planner-specific ones
473 | tool_name = self.get_name()
474 | status_mapping = {
475 | f"{tool_name}_in_progress": "planning_in_progress",
476 | f"pause_for_{tool_name}": "pause_for_planning",
477 | f"{tool_name}_required": "planning_required",
478 | f"{tool_name}_complete": "planning_complete",
479 | }
480 |
481 | if response_data["status"] in status_mapping:
482 | response_data["status"] = status_mapping[response_data["status"]]
483 |
484 | return response_data
485 |
486 | # ================================================================================
487 | # Hook Method Overrides for Planner-Specific Behavior
488 | # ================================================================================
489 |
490 | def get_completion_status(self) -> str:
491 | """Planner uses planning-specific status."""
492 | return "planning_complete"
493 |
494 | def get_completion_data_key(self) -> str:
495 | """Planner uses 'complete_planning' key."""
496 | return "complete_planning"
497 |
498 | def get_completion_message(self) -> str:
499 | """Planner-specific completion message."""
500 | return (
501 | "Planning complete. Present the complete plan to the user in a well-structured format "
502 | "and offer to help implement specific parts or start related planning sessions."
503 | )
504 |
505 | def get_skip_reason(self) -> str:
506 | """Planner-specific skip reason."""
507 | return "Planner is self-contained and completes planning without external analysis"
508 |
509 | def get_skip_expert_analysis_status(self) -> str:
510 | """Planner-specific expert analysis skip status."""
511 | return "skipped_by_tool_design"
512 |
513 | def store_initial_issue(self, step_description: str):
514 | """Store initial planning description."""
515 | self.initial_planning_description = step_description
516 |
517 | def get_initial_request(self, fallback_step: str) -> str:
518 | """Get initial planning description."""
519 | try:
520 | return self.initial_planning_description
521 | except AttributeError:
522 | return fallback_step
523 |
524 | # Required abstract methods from BaseTool
525 | def get_request_model(self):
526 | """Return the planner-specific request model."""
527 | return PlannerRequest
528 |
529 | async def prepare_prompt(self, request) -> str:
530 | """Not used - workflow tools use execute_workflow()."""
531 | return "" # Workflow tools use execute_workflow() directly
532 |
```
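To make the planner request shape concrete, here is a hedged sketch of two calls: an opening step for a complex plan (≥5 total steps, which `handle_work_continuation` answers with a `pause_for_deep_thinking` status) and a later branching step. The field names come from `PLANNER_FIELD_DESCRIPTIONS` above; the `call_tool` helper is a hypothetical stand-in for whatever MCP transport the caller uses.

```python
# Hypothetical client-side sketch; field names match PlannerRequest.

step_one = {
    "step": "Plan the migration of the billing service to a new queue backend",
    "step_number": 1,
    "total_steps": 6,          # >= 5 total steps triggers forced deep-thinking pauses
    "next_step_required": True,
}

branch_step = {
    "step": "Explore a phased dual-write rollout instead of a hard cutover",
    "step_number": 4,
    "total_steps": 6,
    "next_step_required": True,
    "is_branch_point": True,   # opens an alternative path...
    "branch_from_step": 3,     # ...anchored at step 3
    "branch_id": "dual-write",
}

# call_tool is a hypothetical MCP client helper, not a repository function.
response = call_tool("planner", step_one)  # expected status: pause_for_deep_thinking
```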
--------------------------------------------------------------------------------
/tools/debug.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Debug tool - Systematic root cause analysis and debugging assistance
3 |
4 | This tool provides a structured workflow for investigating complex bugs and issues.
5 | It guides you through systematic investigation steps with forced pauses between each step
6 | to ensure thorough code examination before proceeding. The tool supports hypothesis evolution
7 | and expert analysis integration for comprehensive debugging.
8 |
9 | Key features:
10 | - Step-by-step investigation workflow with progress tracking
11 | - Context-aware file embedding (references during investigation, full content for analysis)
12 | - Automatic conversation threading and history preservation
13 | - Expert analysis integration with external models
14 | - Support for visual debugging with image context
15 | - Confidence-based workflow optimization
16 | """
17 |
18 | import logging
19 | from typing import TYPE_CHECKING, Any, Optional
20 |
21 | from pydantic import Field
22 |
23 | if TYPE_CHECKING:
24 | from tools.models import ToolModelCategory
25 |
26 | from config import TEMPERATURE_ANALYTICAL
27 | from systemprompts import DEBUG_ISSUE_PROMPT
28 | from tools.shared.base_models import WorkflowRequest
29 |
30 | from .workflow.base import WorkflowTool
31 |
32 | logger = logging.getLogger(__name__)
33 |
34 | # Tool-specific field descriptions matching original debug tool
35 | DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS = {
36 | "step": (
37 | "Investigation step. Step 1: State issue+direction. "
38 | "Symptoms misleading; 'no bug' valid. Trace dependencies, verify hypotheses. "
39 | "Use relevant_files for code; this for text only."
40 | ),
41 | "step_number": "Current step index (starts at 1). Build upon previous steps.",
42 | "total_steps": (
43 | "Estimated total steps needed to complete the investigation. Adjust as new findings emerge. "
44 | "IMPORTANT: When continuation_id is provided (continuing a previous conversation), set this to 1 as we're not starting a new multi-step investigation."
45 | ),
46 | "next_step_required": (
47 | "True if you plan to continue the investigation with another step. False means root cause is known or investigation is complete. "
48 | "IMPORTANT: When continuation_id is provided (continuing a previous conversation), set this to False to immediately proceed with expert analysis."
49 | ),
50 | "findings": (
51 | "Discoveries: clues, code/log evidence, disproven theories. Be specific. "
52 | "If no bug found, document clearly as valid."
53 | ),
54 | "files_checked": "All examined files (absolute paths), including ruled-out ones.",
55 | "relevant_files": "Files directly relevant to issue (absolute paths). Cause, trigger, or manifestation locations.",
56 | "relevant_context": "Methods/functions central to issue: 'Class.method' or 'function'. Focus on inputs/branching/state.",
57 | "hypothesis": (
58 | "Concrete root cause theory from evidence. Can revise. "
59 | "Valid: 'No bug found - user misunderstanding' or 'Symptoms unrelated to code' if supported."
60 | ),
61 | "confidence": (
62 | "Your confidence in the hypothesis: exploring (starting out), low (early idea), medium (some evidence), "
63 | "high (strong evidence), very_high (very strong evidence), almost_certain (nearly confirmed), "
64 | "certain (100% confidence - root cause and fix are both confirmed locally with no need for external validation). "
65 | "WARNING: Do NOT use 'certain' unless the issue can be fully resolved with a fix, use 'very_high' or 'almost_certain' instead when not 100% sure. "
66 | "Using 'certain' means you have ABSOLUTE confidence locally and PREVENTS external model validation."
67 | ),
68 | "images": "Optional screenshots/visuals clarifying issue (absolute paths).",
69 | }
70 |
71 |
72 | class DebugInvestigationRequest(WorkflowRequest):
73 | """Request model for debug investigation steps matching original debug tool exactly"""
74 |
75 | # Required fields for each investigation step
76 | step: str = Field(..., description=DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["step"])
77 | step_number: int = Field(..., description=DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["step_number"])
78 | total_steps: int = Field(..., description=DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["total_steps"])
79 | next_step_required: bool = Field(..., description=DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["next_step_required"])
80 |
81 | # Investigation tracking fields
82 | findings: str = Field(..., description=DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["findings"])
83 | files_checked: list[str] = Field(
84 | default_factory=list, description=DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["files_checked"]
85 | )
86 | relevant_files: list[str] = Field(
87 | default_factory=list, description=DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["relevant_files"]
88 | )
89 | relevant_context: list[str] = Field(
90 | default_factory=list, description=DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["relevant_context"]
91 | )
92 | hypothesis: Optional[str] = Field(None, description=DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["hypothesis"])
93 | confidence: Optional[str] = Field("low", description=DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["confidence"])
94 |
95 | # Optional images for visual debugging
96 | images: Optional[list[str]] = Field(default=None, description=DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["images"])
97 |
98 | # Override inherited fields to exclude them from schema (except model which needs to be available)
99 | temperature: Optional[float] = Field(default=None, exclude=True)
100 | thinking_mode: Optional[str] = Field(default=None, exclude=True)
101 |
102 |
103 | class DebugIssueTool(WorkflowTool):
104 | """
105 | Debug tool for systematic root cause analysis and issue investigation.
106 |
107 | This tool implements a structured debugging workflow that guides users through
108 | methodical investigation steps, ensuring thorough code examination and evidence
109 | gathering before reaching conclusions. It supports complex debugging scenarios
110 | including race conditions, memory leaks, performance issues, and integration problems.
111 | """
112 |
113 | def __init__(self):
114 | super().__init__()
115 | self.initial_issue = None
116 |
117 | def get_name(self) -> str:
118 | return "debug"
119 |
120 | def get_description(self) -> str:
121 | return (
122 | "Performs systematic debugging and root cause analysis for any type of issue. "
123 | "Use for complex bugs, mysterious errors, performance issues, race conditions, memory leaks, and integration problems. "
124 | "Guides through structured investigation with hypothesis testing and expert analysis."
125 | )
126 |
127 | def get_system_prompt(self) -> str:
128 | return DEBUG_ISSUE_PROMPT
129 |
130 | def get_default_temperature(self) -> float:
131 | return TEMPERATURE_ANALYTICAL
132 |
133 | def get_model_category(self) -> "ToolModelCategory":
134 | """Debug requires deep analysis and reasoning"""
135 | from tools.models import ToolModelCategory
136 |
137 | return ToolModelCategory.EXTENDED_REASONING
138 |
139 | def get_workflow_request_model(self):
140 | """Return the debug-specific request model."""
141 | return DebugInvestigationRequest
142 |
143 | def get_input_schema(self) -> dict[str, Any]:
144 | """Generate input schema using WorkflowSchemaBuilder with debug-specific overrides."""
145 | from .workflow.schema_builders import WorkflowSchemaBuilder
146 |
147 | # Debug-specific field overrides
148 | debug_field_overrides = {
149 | "step": {
150 | "type": "string",
151 | "description": DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["step"],
152 | },
153 | "step_number": {
154 | "type": "integer",
155 | "minimum": 1,
156 | "description": DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["step_number"],
157 | },
158 | "total_steps": {
159 | "type": "integer",
160 | "minimum": 1,
161 | "description": DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["total_steps"],
162 | },
163 | "next_step_required": {
164 | "type": "boolean",
165 | "description": DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["next_step_required"],
166 | },
167 | "findings": {
168 | "type": "string",
169 | "description": DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["findings"],
170 | },
171 | "files_checked": {
172 | "type": "array",
173 | "items": {"type": "string"},
174 | "description": DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["files_checked"],
175 | },
176 | "relevant_files": {
177 | "type": "array",
178 | "items": {"type": "string"},
179 | "description": DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["relevant_files"],
180 | },
181 | "confidence": {
182 | "type": "string",
183 | "enum": ["exploring", "low", "medium", "high", "very_high", "almost_certain", "certain"],
184 | "description": DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["confidence"],
185 | },
186 | "hypothesis": {
187 | "type": "string",
188 | "description": DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["hypothesis"],
189 | },
190 | "images": {
191 | "type": "array",
192 | "items": {"type": "string"},
193 | "description": DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["images"],
194 | },
195 | }
196 |
197 | # Use WorkflowSchemaBuilder with debug-specific tool fields
198 | return WorkflowSchemaBuilder.build_schema(
199 | tool_specific_fields=debug_field_overrides,
200 | model_field_schema=self.get_model_field_schema(),
201 | auto_mode=self.is_effective_auto_mode(),
202 | tool_name=self.get_name(),
203 | )
204 |
205 | def get_required_actions(
206 | self, step_number: int, confidence: str, findings: str, total_steps: int, request=None
207 | ) -> list[str]:
208 | """Define required actions for each investigation phase."""
209 | if step_number == 1:
210 | # Initial investigation tasks
211 | return [
212 | "Search for code related to the reported issue or symptoms",
213 | "Examine relevant files and understand the current implementation",
214 | "Understand the project structure and locate relevant modules",
215 | "Identify how the affected functionality is supposed to work",
216 | ]
217 | elif confidence in ["exploring", "low"]:
218 | # Need deeper investigation
219 | return [
220 | "Examine the specific files you've identified as relevant",
221 | "Trace method calls and data flow through the system",
222 | "Check for edge cases, boundary conditions, and assumptions in the code",
223 | "Look for related configuration, dependencies, or external factors",
224 | ]
225 | elif confidence in ["medium", "high", "very_high"]:
226 | # Close to root cause - need confirmation
227 | return [
228 | "Examine the exact code sections where you believe the issue occurs",
229 | "Trace the execution path that leads to the failure",
230 | "Verify your hypothesis with concrete code evidence",
231 | "Check for any similar patterns elsewhere in the codebase",
232 | ]
233 | elif confidence == "almost_certain":
234 | # Almost certain - final verification before conclusion
235 | return [
236 | "Finalize your root cause analysis with specific evidence",
237 | "Document the complete chain of causation from symptom to root cause",
238 | "Verify the minimal fix approach is correct",
239 | "Consider if expert analysis would provide additional insights",
240 | ]
241 | else:
242 | # General investigation needed
243 | return [
244 | "Continue examining the code paths identified in your hypothesis",
245 | "Gather more evidence using appropriate investigation tools",
246 | "Test edge cases and boundary conditions",
247 | "Look for patterns that confirm or refute your theory",
248 | ]
249 |
250 | def should_call_expert_analysis(self, consolidated_findings, request=None) -> bool:
251 | """
252 | Decide when to call external model based on investigation completeness.
253 |
254 | Don't call expert analysis if the CLI agent has certain confidence - trust their judgment.
255 | """
256 | # Check if user requested to skip assistant model
257 | if request and not self.get_request_use_assistant_model(request):
258 | return False
259 |
260 | # Check if we have meaningful investigation data
261 | return (
262 | len(consolidated_findings.relevant_files) > 0
263 | or len(consolidated_findings.findings) >= 2
264 | or len(consolidated_findings.issues_found) > 0
265 | )
266 |
267 | def prepare_expert_analysis_context(self, consolidated_findings) -> str:
268 | """Prepare context for external model call matching original debug tool format."""
269 | context_parts = [
270 | f"=== ISSUE DESCRIPTION ===\n{self.initial_issue or 'Investigation initiated'}\n=== END DESCRIPTION ==="
271 | ]
272 |
273 | # Add special note if confidence is almost_certain
274 | if consolidated_findings.confidence == "almost_certain":
275 | context_parts.append(
276 | "\n=== IMPORTANT: ALMOST CERTAIN CONFIDENCE ===\n"
277 | "The agent has reached 'almost_certain' confidence but has NOT confirmed the bug with 100% certainty. "
278 | "Your role is to:\n"
279 | "1. Validate the agent's hypothesis and investigation\n"
280 | "2. Identify any missing evidence or overlooked aspects\n"
281 | "3. Provide additional insights that could confirm or refute the hypothesis\n"
282 | "4. Help finalize the root cause analysis with complete certainty\n"
283 | "=== END IMPORTANT ==="
284 | )
285 |
286 | # Add investigation summary
287 | investigation_summary = self._build_investigation_summary(consolidated_findings)
288 | context_parts.append(f"\n=== AGENT'S INVESTIGATION FINDINGS ===\n{investigation_summary}\n=== END FINDINGS ===")
289 |
290 | # Add error context if available
291 | error_context = self._extract_error_context(consolidated_findings)
292 | if error_context:
293 | context_parts.append(f"\n=== ERROR CONTEXT/STACK TRACE ===\n{error_context}\n=== END CONTEXT ===")
294 |
295 | # Add relevant methods/functions if available
296 | if consolidated_findings.relevant_context:
297 | methods_text = "\n".join(f"- {method}" for method in consolidated_findings.relevant_context)
298 | context_parts.append(f"\n=== RELEVANT METHODS/FUNCTIONS ===\n{methods_text}\n=== END METHODS ===")
299 |
300 | # Add hypothesis evolution if available
301 | if consolidated_findings.hypotheses:
302 | hypotheses_text = "\n".join(
303 | f"Step {h['step']} ({h['confidence']} confidence): {h['hypothesis']}"
304 | for h in consolidated_findings.hypotheses
305 | )
306 | context_parts.append(f"\n=== HYPOTHESIS EVOLUTION ===\n{hypotheses_text}\n=== END HYPOTHESES ===")
307 |
308 | # Add images if available
309 | if consolidated_findings.images:
310 | images_text = "\n".join(f"- {img}" for img in consolidated_findings.images)
311 | context_parts.append(
312 | f"\n=== VISUAL DEBUGGING INFORMATION ===\n{images_text}\n=== END VISUAL INFORMATION ==="
313 | )
314 |
315 | # Add file content if we have relevant files
316 | if consolidated_findings.relevant_files:
317 | file_content, _ = self._prepare_file_content_for_prompt(
318 | list(consolidated_findings.relevant_files), None, "Essential debugging files"
319 | )
320 | if file_content:
321 | context_parts.append(
322 | f"\n=== ESSENTIAL FILES FOR DEBUGGING ===\n{file_content}\n=== END ESSENTIAL FILES ==="
323 | )
324 |
325 | return "\n".join(context_parts)
326 |
327 | def _build_investigation_summary(self, consolidated_findings) -> str:
328 | """Prepare a comprehensive summary of the investigation."""
329 | summary_parts = [
330 | "=== SYSTEMATIC INVESTIGATION SUMMARY ===",
331 | f"Total steps: {len(consolidated_findings.findings)}",
332 | f"Files examined: {len(consolidated_findings.files_checked)}",
333 | f"Relevant files identified: {len(consolidated_findings.relevant_files)}",
334 | f"Methods/functions involved: {len(consolidated_findings.relevant_context)}",
335 | "",
336 | "=== INVESTIGATION PROGRESSION ===",
337 | ]
338 |
339 | for finding in consolidated_findings.findings:
340 | summary_parts.append(finding)
341 |
342 | return "\n".join(summary_parts)
343 |
344 | def _extract_error_context(self, consolidated_findings) -> Optional[str]:
345 | """Extract error context from investigation findings."""
346 | error_patterns = ["error", "exception", "stack trace", "traceback", "failure"]
347 | error_context_parts = []
348 |
349 | for finding in consolidated_findings.findings:
350 | if any(pattern in finding.lower() for pattern in error_patterns):
351 | error_context_parts.append(finding)
352 |
353 | return "\n".join(error_context_parts) if error_context_parts else None
354 |
355 | def get_step_guidance(self, step_number: int, confidence: str, request) -> dict[str, Any]:
356 | """
357 | Provide step-specific guidance matching original debug tool behavior.
358 |
359 | This method generates debug-specific guidance that's used by get_step_guidance_message().
360 | """
361 | # Generate the next steps instruction based on required actions
362 | required_actions = self.get_required_actions(step_number, confidence, request.findings, request.total_steps)
363 |
364 | if step_number == 1:
365 | next_steps = (
366 | f"MANDATORY: DO NOT call the {self.get_name()} tool again immediately. You MUST first investigate "
367 | f"the codebase using appropriate tools. CRITICAL AWARENESS: The reported symptoms might be "
368 | f"caused by issues elsewhere in the code, not where symptoms appear. Also, after thorough "
369 | f"investigation, it's possible NO BUG EXISTS - the issue might be a misunderstanding or "
370 | f"user expectation mismatch. Search broadly, examine implementations, understand the logic flow. "
371 | f"Only call {self.get_name()} again AFTER gathering concrete evidence. When you call "
372 | f"{self.get_name()} next time, "
373 | f"use step_number: {step_number + 1} and report specific files examined and findings discovered."
374 | )
375 | elif confidence in ["exploring", "low"]:
376 | next_steps = (
377 | f"STOP! Do NOT call {self.get_name()} again yet. Based on your findings, you've identified potential areas "
378 | f"but need concrete evidence. MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\n"
379 | + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
380 | + f"\n\nOnly call {self.get_name()} again with step_number: {step_number + 1} AFTER "
381 | + "completing these investigations."
382 | )
383 | elif confidence in ["medium", "high", "very_high"]:
384 | next_steps = (
385 | f"WAIT! Your hypothesis needs verification. DO NOT call {self.get_name()} immediately. REQUIRED ACTIONS:\n"
386 | + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
387 | + f"\n\nREMEMBER: If you cannot find concrete evidence of a bug causing the reported symptoms, "
388 | f"'no bug found' is a valid conclusion. Consider suggesting discussion with your thought partner "
389 | f"or engineering assistant for clarification. Document findings with specific file:line references, "
390 | f"then call {self.get_name()} with step_number: {step_number + 1}."
391 | )
392 | elif confidence == "almost_certain":
393 | next_steps = (
394 | "ALMOST CERTAIN - Prepare for final analysis. REQUIRED ACTIONS:\n"
395 | + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
396 | + "\n\nIMPORTANT: You're almost certain about the root cause. If you have NOT found the bug with "
397 | "100% certainty, consider setting next_step_required=false to invoke expert analysis. The expert "
398 | "can validate your hypotheses and provide additional insights. If you ARE 100% certain and have "
399 | "identified the exact bug and fix, proceed to confidence='certain'. Otherwise, let expert analysis "
400 | "help finalize the investigation."
401 | )
402 | else:
403 | next_steps = (
404 | f"PAUSE INVESTIGATION. Before calling {self.get_name()} step {step_number + 1}, you MUST examine code. "
405 | + "Required: "
406 | + ", ".join(required_actions[:2])
407 | + ". "
408 | + f"Your next {self.get_name()} call (step_number: {step_number + 1}) must include "
409 | f"NEW evidence from actual code examination, not just theories. If no bug evidence "
410 | f"is found, suggesting "
411 | f"collaboration with thought partner is valuable. NO recursive {self.get_name()} calls "
412 | f"without investigation work!"
413 | )
414 |
415 | return {"next_steps": next_steps}
416 |
417 | # Hook method overrides for debug-specific behavior
418 |
419 | def prepare_step_data(self, request) -> dict:
420 | """
421 | Prepare debug-specific step data for processing.
422 | """
423 | step_data = {
424 | "step": request.step,
425 | "step_number": request.step_number,
426 | "findings": request.findings,
427 | "files_checked": request.files_checked,
428 | "relevant_files": request.relevant_files,
429 | "relevant_context": request.relevant_context,
430 | "issues_found": [], # Debug tool doesn't use issues_found field
431 | "confidence": request.confidence,
432 | "hypothesis": request.hypothesis,
433 | "images": request.images or [],
434 | }
435 | return step_data
436 |
437 | def should_skip_expert_analysis(self, request, consolidated_findings) -> bool:
438 | """
439 | Debug tool skips expert analysis when agent has "certain" confidence.
440 | """
441 | return request.confidence == "certain" and not request.next_step_required
442 |
443 | # Override inheritance hooks for debug-specific behavior
444 |
445 | def get_completion_status(self) -> str:
446 | """Debug tools use debug-specific status."""
447 | return "certain_confidence_proceed_with_fix"
448 |
449 | def get_completion_data_key(self) -> str:
450 | """Debug uses 'complete_investigation' key."""
451 | return "complete_investigation"
452 |
453 | def get_final_analysis_from_request(self, request):
454 | """Debug tools use 'hypothesis' field."""
455 | return request.hypothesis
456 |
457 | def get_confidence_level(self, request) -> str:
458 | """Debug tools use 'certain' for high confidence."""
459 | return "certain"
460 |
461 | def get_completion_message(self) -> str:
462 | """Debug-specific completion message."""
463 | return (
464 | "Investigation complete with CERTAIN confidence. You have identified the exact "
465 | "root cause and a minimal fix. MANDATORY: Present the user with the root cause analysis "
466 | "and IMMEDIATELY proceed with implementing the simple fix without requiring further "
467 | "consultation. Focus on the precise, minimal change needed."
468 | )
469 |
470 | def get_skip_reason(self) -> str:
471 | """Debug-specific skip reason."""
472 | return "Identified exact root cause with minimal fix requirement locally"
473 |
474 | def get_request_relevant_context(self, request) -> list:
475 | """Get relevant_context for debug tool."""
476 | try:
477 | return request.relevant_context or []
478 | except AttributeError:
479 | return []
480 |
481 | def get_skip_expert_analysis_status(self) -> str:
482 | """Debug-specific expert analysis skip status."""
483 | return "skipped_due_to_certain_confidence"
484 |
485 | def prepare_work_summary(self) -> str:
486 | """Debug-specific work summary."""
487 | return self._build_investigation_summary(self.consolidated_findings)
488 |
489 | def get_completion_next_steps_message(self, expert_analysis_used: bool = False) -> str:
490 | """
491 | Debug-specific completion message.
492 |
493 | Args:
494 | expert_analysis_used: True if expert analysis was successfully executed
495 | """
496 | base_message = (
497 | "INVESTIGATION IS COMPLETE. YOU MUST now summarize and present ALL key findings, confirmed "
498 | "hypotheses, and exact recommended fixes. Clearly identify the most likely root cause and "
499 | "provide concrete, actionable implementation guidance. Highlight affected code paths and display "
500 | "reasoning that led to this conclusion—make it easy for a developer to understand exactly where "
501 | "the problem lies. Where necessary, show cause-and-effect / bug-trace call graph."
502 | )
503 |
504 | # Add expert analysis guidance only when expert analysis was actually used
505 | if expert_analysis_used:
506 | expert_guidance = self.get_expert_analysis_guidance()
507 | if expert_guidance:
508 | return f"{base_message}\n\n{expert_guidance}"
509 |
510 | return base_message
511 |
512 | def get_expert_analysis_guidance(self) -> str:
513 | """
514 | Get additional guidance for handling expert analysis results in debug context.
515 |
516 | Returns:
517 | Additional guidance text for validating and using expert analysis findings
518 | """
519 | return (
520 | "IMPORTANT: Expert debugging analysis has been provided above. You MUST validate "
521 | "the expert's root cause analysis and proposed fixes against your own investigation. "
522 | "Ensure the expert's findings align with the evidence you've gathered and that the "
523 | "recommended solutions address the actual problem, not just symptoms. If the expert "
524 | "suggests a different root cause than you identified, carefully consider both perspectives "
525 | "and present a balanced assessment to the user."
526 | )
527 |
528 | def get_step_guidance_message(self, request) -> str:
529 | """
530 | Debug-specific step guidance with detailed investigation instructions.
531 | """
532 | step_guidance = self.get_step_guidance(request.step_number, request.confidence, request)
533 | return step_guidance["next_steps"]
534 |
535 | def customize_workflow_response(self, response_data: dict, request) -> dict:
536 | """
537 | Customize response to match original debug tool format.
538 | """
539 | # Store initial issue on first step
540 | if request.step_number == 1:
541 | self.initial_issue = request.step
542 |
543 | # Convert generic status names to debug-specific ones
544 | tool_name = self.get_name()
545 | status_mapping = {
546 | f"{tool_name}_in_progress": "investigation_in_progress",
547 | f"pause_for_{tool_name}": "pause_for_investigation",
548 | f"{tool_name}_required": "investigation_required",
549 | f"{tool_name}_complete": "investigation_complete",
550 | }
551 |
552 | if response_data["status"] in status_mapping:
553 | response_data["status"] = status_mapping[response_data["status"]]
554 |
555 | # Rename status field to match debug tool
556 | if f"{tool_name}_status" in response_data:
557 | response_data["investigation_status"] = response_data.pop(f"{tool_name}_status")
558 | # Add debug-specific status fields
559 | response_data["investigation_status"]["hypotheses_formed"] = len(self.consolidated_findings.hypotheses)
560 |
561 | # Rename complete investigation data
562 | if f"complete_{tool_name}" in response_data:
563 | response_data["complete_investigation"] = response_data.pop(f"complete_{tool_name}")
564 |
565 | # Map the completion flag to match original debug tool
566 | if f"{tool_name}_complete" in response_data:
567 | response_data["investigation_complete"] = response_data.pop(f"{tool_name}_complete")
568 |
569 | # Map the required flag to match original debug tool
570 | if f"{tool_name}_required" in response_data:
571 | response_data["investigation_required"] = response_data.pop(f"{tool_name}_required")
572 |
573 | return response_data
574 |
575 | # Required abstract methods from BaseTool
576 | def get_request_model(self):
577 | """Return the debug-specific request model."""
578 | return DebugInvestigationRequest
579 |
580 | async def prepare_prompt(self, request) -> str:
581 | """Not used - workflow tools use execute_workflow()."""
582 | return "" # Workflow tools use execute_workflow() directly
583 |
```
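The `confidence` field is what separates the two completion paths above: `should_skip_expert_analysis` returns True only when a final step carries `confidence="certain"`, while anything less routes through expert analysis. A hedged sketch of the two final-step variants follows; the `call_tool` helper and the file path are hypothetical, and the field names match `DebugInvestigationRequest`.

```python
# Hypothetical sketch; call_tool is a stand-in MCP client helper and the
# paths/strings below are illustrative only.

final_with_expert = {
    "step": "Confirm the race in the session cache eviction path",
    "step_number": 3,
    "total_steps": 3,
    "next_step_required": False,     # final step...
    "findings": "Eviction runs without holding the session lock",
    "relevant_files": ["/abs/path/session_cache.py"],
    "hypothesis": "Concurrent eviction invalidates live sessions",
    "confidence": "almost_certain",  # ...so expert analysis is still invoked
}

# An identical step with 'certain' skips expert analysis entirely; per
# get_completion_status above, the response then completes with the
# status 'certain_confidence_proceed_with_fix'.
final_certain = {**final_with_expert, "confidence": "certain"}

call_tool("debug", final_with_expert)
call_tool("debug", final_certain)
```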
--------------------------------------------------------------------------------
/tools/analyze.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | AnalyzeWorkflow tool - Step-by-step code analysis with systematic investigation
3 |
4 | This tool provides a structured workflow for comprehensive code and file analysis.
5 | It guides the CLI agent through systematic investigation steps with forced pauses between each step
6 | to ensure thorough code examination, pattern identification, and architectural assessment before proceeding.
7 | The tool supports complex analysis scenarios including architectural review, performance analysis,
8 | security assessment, and maintainability evaluation.
9 |
10 | Key features:
11 | - Step-by-step analysis workflow with progress tracking
12 | - Context-aware file embedding (references during investigation, full content for analysis)
13 | - Automatic pattern and insight tracking with categorization
14 | - Expert analysis integration with external models
15 | - Support for focused analysis (architecture, performance, security, quality)
16 | - Confidence-based workflow optimization
17 | """
18 |
19 | import logging
20 | from typing import TYPE_CHECKING, Any, Literal, Optional
21 |
22 | from pydantic import Field, model_validator
23 |
24 | if TYPE_CHECKING:
25 | from tools.models import ToolModelCategory
26 |
27 | from config import TEMPERATURE_ANALYTICAL
28 | from systemprompts import ANALYZE_PROMPT
29 | from tools.shared.base_models import WorkflowRequest
30 |
31 | from .workflow.base import WorkflowTool
32 |
33 | logger = logging.getLogger(__name__)
34 |
35 | # Tool-specific field descriptions for analyze workflow
36 | ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS = {
37 | "step": (
38 | "The analysis plan. Step 1: State your strategy, including how you will map the codebase structure, "
39 | "understand business logic, and assess code quality, performance implications, and architectural patterns. "
40 | "Later steps: Report findings and adapt the approach as new insights emerge."
41 | ),
42 | "step_number": (
43 | "The index of the current step in the analysis sequence, beginning at 1. Each step should build upon or "
44 | "revise the previous one."
45 | ),
46 | "total_steps": (
47 | "Your current estimate for how many steps will be needed to complete the analysis. "
48 | "Adjust as new findings emerge."
49 | ),
50 | "next_step_required": (
51 | "Set to true if you plan to continue the investigation with another step. False means you believe the "
52 | "analysis is complete and ready for expert validation."
53 | ),
54 | "findings": (
55 | "Summary of discoveries from this step, including architectural patterns, tech stack assessment, scalability characteristics, "
56 | "performance implications, maintainability factors, and strategic improvement opportunities. "
57 | "IMPORTANT: Document both strengths (good patterns, solid architecture) and concerns (tech debt, overengineering, unnecessary complexity). "
58 | "In later steps, confirm or update past findings with additional evidence."
59 | ),
60 | "files_checked": (
61 | "List all files examined (absolute paths). Include even ruled-out files to track exploration path."
62 | ),
63 | "relevant_files": (
64 | "Subset of files_checked directly relevant to analysis findings (absolute paths). Include files with "
65 | "significant patterns, architectural decisions, or strategic improvement opportunities."
66 | ),
67 | "relevant_context": (
68 | "List methods/functions central to analysis findings, in 'ClassName.methodName' or 'functionName' format. "
69 | "Prioritize those demonstrating key patterns, architectural decisions, or improvement opportunities."
70 | ),
71 | "images": (
72 | "Optional absolute paths to architecture diagrams or visual references that help with analysis context."
73 | ),
74 | "confidence": (
75 | "Your confidence in the analysis: exploring, low, medium, high, very_high, almost_certain, or certain. "
76 | "'certain' indicates the analysis is complete and ready for validation."
77 | ),
78 | "analysis_type": "Type of analysis to perform (architecture, performance, security, quality, general)",
79 | "output_format": "How to format the output (summary, detailed, actionable)",
80 | }
81 |
82 |
83 | class AnalyzeWorkflowRequest(WorkflowRequest):
84 | """Request model for analyze workflow investigation steps"""
85 |
86 | # Required fields for each investigation step
87 | step: str = Field(..., description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["step"])
88 | step_number: int = Field(..., description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["step_number"])
89 | total_steps: int = Field(..., description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["total_steps"])
90 | next_step_required: bool = Field(..., description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["next_step_required"])
91 |
92 | # Investigation tracking fields
93 | findings: str = Field(..., description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["findings"])
94 | files_checked: list[str] = Field(
95 | default_factory=list, description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["files_checked"]
96 | )
97 | relevant_files: list[str] = Field(
98 | default_factory=list, description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["relevant_files"]
99 | )
100 | relevant_context: list[str] = Field(
101 | default_factory=list, description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["relevant_context"]
102 | )
103 |
104 | # Issues found during analysis (structured with severity)
105 | issues_found: list[dict] = Field(
106 | default_factory=list,
107 | description="Issues or concerns identified during analysis, each with severity level (critical, high, medium, low)",
108 | )
109 |
110 | # Optional images for visual context
111 | images: Optional[list[str]] = Field(default=None, description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["images"])
112 |
113 | # Analyze-specific fields (only used in step 1 to initialize)
114 | # Note: Use relevant_files field instead of files for consistency across workflow tools
115 | analysis_type: Optional[Literal["architecture", "performance", "security", "quality", "general"]] = Field(
116 | "general", description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["analysis_type"]
117 | )
118 | output_format: Optional[Literal["summary", "detailed", "actionable"]] = Field(
119 | "detailed", description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["output_format"]
120 | )
121 |
122 | # Keep thinking_mode from original analyze tool; temperature is inherited from WorkflowRequest
123 |
124 | @model_validator(mode="after")
125 | def validate_step_one_requirements(self):
126 | """Ensure step 1 has required relevant_files."""
127 | if self.step_number == 1:
128 | if not self.relevant_files:
129 | raise ValueError("Step 1 requires 'relevant_files' field to specify files or directories to analyze")
130 | return self
131 |
132 |
133 | class AnalyzeTool(WorkflowTool):
134 | """
135 | Analyze workflow tool for step-by-step code analysis and expert validation.
136 |
137 | This tool implements a structured analysis workflow that guides users through
138 | methodical investigation steps, ensuring thorough code examination, pattern identification,
139 | and architectural assessment before reaching conclusions. It supports complex analysis scenarios
140 | including architectural review, performance analysis, security assessment, and maintainability evaluation.
141 | """
142 |
143 | def __init__(self):
144 | super().__init__()
145 | self.initial_request = None
146 | self.analysis_config = {}
147 |
148 | def get_name(self) -> str:
149 | return "analyze"
150 |
151 | def get_description(self) -> str:
152 | return (
153 | "Performs comprehensive code analysis with systematic investigation and expert validation. "
154 | "Use for architecture, performance, maintainability, and pattern analysis. "
155 | "Guides through structured code review and strategic planning."
156 | )
157 |
158 | def get_system_prompt(self) -> str:
159 | return ANALYZE_PROMPT
160 |
161 | def get_default_temperature(self) -> float:
162 | return TEMPERATURE_ANALYTICAL
163 |
164 | def get_model_category(self) -> "ToolModelCategory":
165 | """Analyze workflow requires thorough analysis and reasoning"""
166 | from tools.models import ToolModelCategory
167 |
168 | return ToolModelCategory.EXTENDED_REASONING
169 |
170 | def get_workflow_request_model(self):
171 | """Return the analyze workflow-specific request model."""
172 | return AnalyzeWorkflowRequest
173 |
174 | def get_input_schema(self) -> dict[str, Any]:
175 | """Generate input schema using WorkflowSchemaBuilder with analyze-specific overrides."""
176 | from .workflow.schema_builders import WorkflowSchemaBuilder
177 |
178 | # Fields to exclude from analyze workflow (inherited from WorkflowRequest but not used)
179 | excluded_fields = {"hypothesis", "confidence"}
180 |
181 | # Analyze workflow-specific field overrides
182 | analyze_field_overrides = {
183 | "step": {
184 | "type": "string",
185 | "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["step"],
186 | },
187 | "step_number": {
188 | "type": "integer",
189 | "minimum": 1,
190 | "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["step_number"],
191 | },
192 | "total_steps": {
193 | "type": "integer",
194 | "minimum": 1,
195 | "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["total_steps"],
196 | },
197 | "next_step_required": {
198 | "type": "boolean",
199 | "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["next_step_required"],
200 | },
201 | "findings": {
202 | "type": "string",
203 | "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["findings"],
204 | },
205 | "files_checked": {
206 | "type": "array",
207 | "items": {"type": "string"},
208 | "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["files_checked"],
209 | },
210 | "relevant_files": {
211 | "type": "array",
212 | "items": {"type": "string"},
213 | "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["relevant_files"],
214 | },
215 | "confidence": {
216 | "type": "string",
217 | "enum": ["exploring", "low", "medium", "high", "very_high", "almost_certain", "certain"],
218 | "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["confidence"],
219 | },
220 | "images": {
221 | "type": "array",
222 | "items": {"type": "string"},
223 | "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["images"],
224 | },
225 | "issues_found": {
226 | "type": "array",
227 | "items": {"type": "object"},
228 | "description": "Issues or concerns identified during analysis, each with severity level (critical, high, medium, low)",
229 | },
230 | "analysis_type": {
231 | "type": "string",
232 | "enum": ["architecture", "performance", "security", "quality", "general"],
233 | "default": "general",
234 | "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["analysis_type"],
235 | },
236 | "output_format": {
237 | "type": "string",
238 | "enum": ["summary", "detailed", "actionable"],
239 | "default": "detailed",
240 | "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["output_format"],
241 | },
242 | }
243 |
244 | # Use WorkflowSchemaBuilder with analyze-specific tool fields
245 | return WorkflowSchemaBuilder.build_schema(
246 | tool_specific_fields=analyze_field_overrides,
247 | model_field_schema=self.get_model_field_schema(),
248 | auto_mode=self.is_effective_auto_mode(),
249 | tool_name=self.get_name(),
250 | excluded_workflow_fields=list(excluded_fields),
251 | )
252 |
253 | def get_required_actions(
254 | self, step_number: int, confidence: str, findings: str, total_steps: int, request=None
255 | ) -> list[str]:
256 | """Define required actions for each investigation phase."""
257 | if step_number == 1:
258 | # Initial analysis investigation tasks
259 | return [
260 | "Read and understand the code files specified for analysis",
261 | "Map the tech stack, frameworks, and overall architecture",
262 | "Identify the main components, modules, and their relationships",
263 | "Understand the business logic and intended functionality",
264 | "Examine architectural patterns and design decisions used",
265 | "Look for strengths, risks, and strategic improvement areas",
266 | ]
267 | elif step_number < total_steps:
268 | # Need deeper investigation
269 | return [
270 | "Examine specific architectural patterns and design decisions in detail",
271 | "Analyze scalability characteristics and performance implications",
272 | "Assess maintainability factors: module cohesion, coupling, tech debt",
273 | "Identify security posture and potential systemic vulnerabilities",
274 | "Look for overengineering, unnecessary complexity, or missing abstractions",
275 | "Evaluate how well the architecture serves business and scaling goals",
276 | ]
277 | else:
278 | # Close to completion - need final verification
279 | return [
280 | "Verify all significant architectural insights have been documented",
281 | "Confirm strategic improvement opportunities are comprehensively captured",
282 | "Ensure both strengths and risks are properly identified with evidence",
283 | "Validate that findings align with the analysis type and goals specified",
284 | "Check that recommendations are actionable and proportional to the codebase",
285 | "Confirm the analysis provides clear guidance for strategic decisions",
286 | ]
287 |
288 | def should_call_expert_analysis(self, consolidated_findings, request=None) -> bool:
289 | """
290 |         Call expert analysis for comprehensive validation unless explicitly disabled.
291 |
292 | Analysis benefits from a second opinion to ensure completeness.
293 | """
294 | # Check if user explicitly requested to skip assistant model
295 | if request and not self.get_request_use_assistant_model(request):
296 | return False
297 |
298 | # For analysis, we always want expert validation if we have any meaningful data
299 | return len(consolidated_findings.relevant_files) > 0 or len(consolidated_findings.findings) >= 1
300 |
301 | def prepare_expert_analysis_context(self, consolidated_findings) -> str:
302 | """Prepare context for external model call for final analysis validation."""
303 | context_parts = [
304 |             f"=== ANALYSIS REQUEST ===\n{self.initial_request or 'Code analysis workflow initiated'}\n=== END REQUEST ==="
305 | ]
306 |
307 | # Add investigation summary
308 | investigation_summary = self._build_analysis_summary(consolidated_findings)
309 | context_parts.append(
310 |             f"\n=== AGENT'S ANALYSIS INVESTIGATION ===\n{investigation_summary}\n=== END INVESTIGATION ==="
311 | )
312 |
313 | # Add analysis configuration context if available
314 | if self.analysis_config:
315 |             config_text = "\n".join(f"- {key}: {value}" for key, value in self.analysis_config.items() if value)
316 |             context_parts.append(f"\n=== ANALYSIS CONFIGURATION ===\n{config_text}\n=== END CONFIGURATION ===")
317 |
318 | # Add relevant code elements if available
319 | if consolidated_findings.relevant_context:
320 |             methods_text = "\n".join(f"- {method}" for method in consolidated_findings.relevant_context)
321 |             context_parts.append(f"\n=== RELEVANT CODE ELEMENTS ===\n{methods_text}\n=== END CODE ELEMENTS ===")
322 |
323 | # Add assessment evolution if available
324 | if consolidated_findings.hypotheses:
325 |             assessments_text = "\n".join(
326 |                 f"Step {h['step']}: {h['hypothesis']}" for h in consolidated_findings.hypotheses
327 |             )
328 |             context_parts.append(f"\n=== ASSESSMENT EVOLUTION ===\n{assessments_text}\n=== END ASSESSMENTS ===")
329 |
330 | # Add images if available
331 | if consolidated_findings.images:
332 |             images_text = "\n".join(f"- {img}" for img in consolidated_findings.images)
333 |             context_parts.append(
334 |                 f"\n=== VISUAL ANALYSIS INFORMATION ===\n{images_text}\n=== END VISUAL INFORMATION ==="
335 | )
336 |
337 |         return "\n".join(context_parts)
338 |
339 | def _build_analysis_summary(self, consolidated_findings) -> str:
340 | """Prepare a comprehensive summary of the analysis investigation."""
341 | summary_parts = [
342 | "=== SYSTEMATIC ANALYSIS INVESTIGATION SUMMARY ===",
343 | f"Total steps: {len(consolidated_findings.findings)}",
344 | f"Files examined: {len(consolidated_findings.files_checked)}",
345 | f"Relevant files identified: {len(consolidated_findings.relevant_files)}",
346 | f"Code elements analyzed: {len(consolidated_findings.relevant_context)}",
347 | "",
348 | "=== INVESTIGATION PROGRESSION ===",
349 | ]
350 |
351 | for finding in consolidated_findings.findings:
352 | summary_parts.append(finding)
353 |
354 |         return "\n".join(summary_parts)
355 |
356 | def should_include_files_in_expert_prompt(self) -> bool:
357 | """Include files in expert analysis for comprehensive validation."""
358 | return True
359 |
360 | def should_embed_system_prompt(self) -> bool:
361 | """Embed system prompt in expert analysis for proper context."""
362 | return True
363 |
364 | def get_expert_thinking_mode(self) -> str:
365 | """Use high thinking mode for thorough analysis."""
366 | return "high"
367 |
368 | def get_expert_analysis_instruction(self) -> str:
369 | """Get specific instruction for analysis expert validation."""
370 | return (
371 | "Please provide comprehensive analysis validation based on the investigation findings. "
372 | "Focus on identifying any remaining architectural insights, validating the completeness of the analysis, "
373 | "and providing final strategic recommendations following the structured format specified in the system prompt."
374 | )
375 |
376 | # Hook method overrides for analyze-specific behavior
377 |
378 | def prepare_step_data(self, request) -> dict:
379 | """
380 | Map analyze-specific fields for internal processing.
381 | """
382 | step_data = {
383 | "step": request.step,
384 | "step_number": request.step_number,
385 | "findings": request.findings,
386 | "files_checked": request.files_checked,
387 | "relevant_files": request.relevant_files,
388 | "relevant_context": request.relevant_context,
389 | "issues_found": request.issues_found, # Analyze workflow uses issues_found for structured problem tracking
390 | "confidence": "medium", # Fixed value for workflow compatibility
391 | "hypothesis": request.findings, # Map findings to hypothesis for compatibility
392 | "images": request.images or [],
393 | }
394 | return step_data
395 |
396 | def should_skip_expert_analysis(self, request, consolidated_findings) -> bool:
397 | """
398 | Analyze workflow always uses expert analysis for comprehensive validation.
399 |
400 | Analysis benefits from a second opinion to ensure completeness and catch
401 | any missed insights or alternative perspectives.
402 | """
403 | return False
404 |
405 | def store_initial_issue(self, step_description: str):
406 | """Store initial request for expert analysis."""
407 | self.initial_request = step_description
408 |
409 | # Override inheritance hooks for analyze-specific behavior
410 |
411 | def get_completion_status(self) -> str:
412 | """Analyze tools use analysis-specific status."""
413 | return "analysis_complete_ready_for_implementation"
414 |
415 | def get_completion_data_key(self) -> str:
416 | """Analyze uses 'complete_analysis' key."""
417 | return "complete_analysis"
418 |
419 | def get_final_analysis_from_request(self, request):
420 | """Analyze tools use 'findings' field."""
421 | return request.findings
422 |
423 | def get_confidence_level(self, request) -> str:
424 | """Analyze tools use fixed confidence for consistency."""
425 | return "medium"
426 |
427 | def get_completion_message(self) -> str:
428 | """Analyze-specific completion message."""
429 | return (
430 | "Analysis complete. You have identified all significant patterns, "
431 | "architectural insights, and strategic opportunities. MANDATORY: Present the user with the complete "
432 | "analysis results organized by strategic impact, and IMMEDIATELY proceed with implementing the "
433 | "highest priority recommendations or provide specific guidance for improvements. Focus on actionable "
434 | "strategic insights."
435 | )
436 |
437 | def get_skip_reason(self) -> str:
438 | """Analyze-specific skip reason."""
439 | return "Completed comprehensive analysis locally"
440 |
441 | def get_skip_expert_analysis_status(self) -> str:
442 | """Analyze-specific expert analysis skip status."""
443 | return "skipped_due_to_complete_analysis"
444 |
445 | def prepare_work_summary(self) -> str:
446 | """Analyze-specific work summary."""
447 | return self._build_analysis_summary(self.consolidated_findings)
448 |
449 | def get_completion_next_steps_message(self, expert_analysis_used: bool = False) -> str:
450 | """
451 | Analyze-specific completion message.
452 | """
453 | base_message = (
454 | "ANALYSIS IS COMPLETE. You MUST now summarize and present ALL analysis findings organized by "
455 | "strategic impact (Critical → High → Medium → Low), specific architectural insights with code references, "
456 | "and exact recommendations for improvement. Clearly prioritize the top 3 strategic opportunities that need "
457 | "immediate attention. Provide concrete, actionable guidance for each finding—make it easy for a developer "
458 | "to understand exactly what strategic improvements to implement and how to approach them."
459 | )
460 |
461 | # Add expert analysis guidance only when expert analysis was actually used
462 | if expert_analysis_used:
463 | expert_guidance = self.get_expert_analysis_guidance()
464 | if expert_guidance:
465 | return f"{base_message}\n\n{expert_guidance}"
466 |
467 | return base_message
468 |
469 | def get_expert_analysis_guidance(self) -> str:
470 | """
471 | Provide specific guidance for handling expert analysis in code analysis.
472 | """
473 | return (
474 | "IMPORTANT: Analysis from an assistant model has been provided above. You MUST thoughtfully evaluate and validate "
475 | "the expert insights rather than treating them as definitive conclusions. Cross-reference the expert "
476 | "analysis with your own systematic investigation, verify that architectural recommendations are "
477 | "appropriate for this codebase's scale and context, and ensure suggested improvements align with "
478 | "the project's goals and constraints. Present a comprehensive synthesis that combines your detailed "
479 | "analysis with validated expert perspectives, clearly distinguishing between patterns you've "
480 | "independently identified and additional strategic insights from expert validation."
481 | )
482 |
483 | def get_step_guidance_message(self, request) -> str:
484 | """
485 | Analyze-specific step guidance with detailed investigation instructions.
486 | """
487 | step_guidance = self.get_analyze_step_guidance(request.step_number, request)
488 | return step_guidance["next_steps"]
489 |
490 | def get_analyze_step_guidance(self, step_number: int, request) -> dict[str, Any]:
491 | """
492 | Provide step-specific guidance for analyze workflow.
493 | """
494 | # Generate the next steps instruction based on required actions
495 | required_actions = self.get_required_actions(step_number, "medium", request.findings, request.total_steps)
496 |
497 | if step_number == 1:
498 | next_steps = (
499 | f"MANDATORY: DO NOT call the {self.get_name()} tool again immediately. You MUST first examine "
500 | f"the code files thoroughly using appropriate tools. CRITICAL AWARENESS: You need to understand "
501 | f"the architectural patterns, assess scalability and performance characteristics, identify strategic "
502 | f"improvement areas, and look for systemic risks, overengineering, and missing abstractions. "
503 | f"Use file reading tools, code analysis, and systematic examination to gather comprehensive information. "
504 | f"Only call {self.get_name()} again AFTER completing your investigation. When you call "
505 | f"{self.get_name()} next time, use step_number: {step_number + 1} and report specific "
506 | f"files examined, architectural insights found, and strategic assessment discoveries."
507 | )
508 | elif step_number < request.total_steps:
509 | next_steps = (
510 | f"STOP! Do NOT call {self.get_name()} again yet. Based on your findings, you've identified areas that need "
511 |                 f"deeper analysis. MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\n"
512 |                 + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
513 |                 + f"\n\nOnly call {self.get_name()} again with step_number: {step_number + 1} AFTER "
514 | + "completing these analysis tasks."
515 | )
516 | else:
517 | next_steps = (
518 |                 f"WAIT! Your analysis needs final verification. DO NOT call {self.get_name()} immediately. REQUIRED ACTIONS:\n"
519 |                 + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
520 |                 + f"\n\nREMEMBER: Ensure you have identified all significant architectural insights and strategic "
521 | f"opportunities across all areas. Document findings with specific file references and "
522 | f"code examples where applicable, then call {self.get_name()} with step_number: {step_number + 1}."
523 | )
524 |
525 | return {"next_steps": next_steps}
526 |
527 | def customize_workflow_response(self, response_data: dict, request) -> dict:
528 | """
529 | Customize response to match analyze workflow format.
530 | """
531 | # Store initial request on first step
532 | if request.step_number == 1:
533 | self.initial_request = request.step
534 | # Store analysis configuration for expert analysis
535 | if request.relevant_files:
536 | self.analysis_config = {
537 | "relevant_files": request.relevant_files,
538 | "analysis_type": request.analysis_type,
539 | "output_format": request.output_format,
540 | }
541 |
542 | # Convert generic status names to analyze-specific ones
543 | tool_name = self.get_name()
544 | status_mapping = {
545 | f"{tool_name}_in_progress": "analysis_in_progress",
546 | f"pause_for_{tool_name}": "pause_for_analysis",
547 | f"{tool_name}_required": "analysis_required",
548 | f"{tool_name}_complete": "analysis_complete",
549 | }
550 |
551 | if response_data["status"] in status_mapping:
552 | response_data["status"] = status_mapping[response_data["status"]]
553 |
554 | # Rename status field to match analyze workflow
555 | if f"{tool_name}_status" in response_data:
556 | response_data["analysis_status"] = response_data.pop(f"{tool_name}_status")
557 | # Add analyze-specific status fields
558 | response_data["analysis_status"]["insights_by_severity"] = {}
559 | for insight in self.consolidated_findings.issues_found:
560 | severity = insight.get("severity", "unknown")
561 | if severity not in response_data["analysis_status"]["insights_by_severity"]:
562 | response_data["analysis_status"]["insights_by_severity"][severity] = 0
563 | response_data["analysis_status"]["insights_by_severity"][severity] += 1
564 | response_data["analysis_status"]["analysis_confidence"] = self.get_request_confidence(request)
565 |
566 | # Map complete_analyze to complete_analysis
567 | if f"complete_{tool_name}" in response_data:
568 | response_data["complete_analysis"] = response_data.pop(f"complete_{tool_name}")
569 |
570 | # Map the completion flag to match analyze workflow
571 | if f"{tool_name}_complete" in response_data:
572 | response_data["analysis_complete"] = response_data.pop(f"{tool_name}_complete")
573 |
574 | return response_data
575 |
576 | # Required abstract methods from BaseTool
577 | def get_request_model(self):
578 | """Return the analyze workflow-specific request model."""
579 | return AnalyzeWorkflowRequest
580 |
581 | async def prepare_prompt(self, request) -> str:
582 | """Not used - workflow tools use execute_workflow()."""
583 | return "" # Workflow tools use execute_workflow() directly
584 |
```
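The `insights_by_severity` bookkeeping in `customize_workflow_response` above is a plain counting loop over `issues_found`. A minimal sketch of the same aggregation using `collections.Counter`, with hypothetical issue entries (the descriptions are invented for illustration):

```python
# Hypothetical issue entries; only the "severity" key matters for the count.
from collections import Counter

issues_found = [
    {"severity": "high", "description": "N+1 query in report generation"},
    {"severity": "low", "description": "inconsistent helper naming"},
    {"severity": "high", "description": "unbounded cache growth"},
]

# Missing severity keys fall back to "unknown", mirroring the tool's behavior.
insights_by_severity = Counter(issue.get("severity", "unknown") for issue in issues_found)
print(dict(insights_by_severity))  # {'high': 2, 'low': 1}
```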
--------------------------------------------------------------------------------
/simulator_tests/test_planner_validation.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | PlannerWorkflow Tool Validation Test
4 |
5 | Tests the planner tool's capabilities using the new workflow architecture.
  6 | This validates that the new workflow-based implementation maintains all the
  7 | functionality of the original planner tool while following the same workflow
  8 | pattern as the debug tool.
9 | """
10 |
11 | import json
12 | from typing import Optional
13 |
14 | from .conversation_base_test import ConversationBaseTest
15 |
16 |
17 | class PlannerValidationTest(ConversationBaseTest):
18 | """Test planner tool with new workflow architecture"""
19 |
20 | @property
21 | def test_name(self) -> str:
22 | return "planner_validation"
23 |
24 | @property
25 | def test_description(self) -> str:
26 | return "PlannerWorkflow tool validation with new workflow architecture"
27 |
28 | def run_test(self) -> bool:
29 | """Test planner tool capabilities"""
30 | # Set up the test environment
31 | self.setUp()
32 |
33 | try:
34 | self.logger.info("Test: PlannerWorkflow tool validation (new architecture)")
35 |
36 | # Test 1: Single planning session with workflow architecture
37 | if not self._test_single_planning_session():
38 | return False
39 |
40 | # Test 2: Planning with continuation using workflow
41 | if not self._test_planning_with_continuation():
42 | return False
43 |
44 | # Test 3: Complex plan with deep thinking pauses
45 | if not self._test_complex_plan_deep_thinking():
46 | return False
47 |
48 | # Test 4: Self-contained completion (no expert analysis)
49 | if not self._test_self_contained_completion():
50 | return False
51 |
52 | # Test 5: Branching and revision with workflow
53 | if not self._test_branching_and_revision():
54 | return False
55 |
56 | # Test 6: Workflow file context behavior
57 | if not self._test_workflow_file_context():
58 | return False
59 |
60 | self.logger.info(" ✅ All planner validation tests passed")
61 | return True
62 |
63 | except Exception as e:
64 | self.logger.error(f"PlannerWorkflow validation test failed: {e}")
65 | return False
66 |
67 | def _test_single_planning_session(self) -> bool:
68 | """Test a complete planning session with workflow architecture"""
69 | try:
70 | self.logger.info(" 1.1: Testing single planning session with workflow")
71 |
72 | # Step 1: Start planning
73 | self.logger.info(" 1.1.1: Step 1 - Initial planning step")
74 | response1, continuation_id = self.call_mcp_tool(
75 | "planner",
76 | {
77 | "step": "I need to plan a comprehensive API redesign for our legacy system. Let me start by analyzing the current state and identifying key requirements for the new API architecture.",
78 | "step_number": 1,
79 | "total_steps": 4,
80 | "next_step_required": True,
81 | "model": "flash",
82 | },
83 | )
84 |
85 | if not response1 or not continuation_id:
86 | self.logger.error("Failed to get initial planning response")
87 | return False
88 |
89 | # Parse and validate JSON response
90 | response1_data = self._parse_planner_response(response1)
91 | if not response1_data:
92 | return False
93 |
94 | # Validate step 1 response structure - expect pause_for_planner for next_step_required=True
95 | if not self._validate_step_response(response1_data, 1, 4, True, "pause_for_planner"):
96 | return False
97 |
98 | # Debug: Log the actual response structure to see what we're getting
99 | self.logger.debug(f"Response structure: {list(response1_data.keys())}")
100 |
101 | # Check workflow-specific response structure (more flexible)
102 | status_key = None
103 | for key in response1_data.keys():
104 | if key.endswith("_status"):
105 | status_key = key
106 | break
107 |
108 | if not status_key:
109 | self.logger.error(f"Missing workflow status field in response: {list(response1_data.keys())}")
110 | return False
111 |
112 | self.logger.debug(f"Found status field: {status_key}")
113 |
114 | # Check required_actions for workflow guidance
115 | if not response1_data.get("required_actions"):
116 | self.logger.error("Missing required_actions in workflow response")
117 | return False
118 |
119 | self.logger.info(f" ✅ Step 1 successful with workflow, continuation_id: {continuation_id}")
120 |
121 | # Step 2: Continue planning
122 | self.logger.info(" 1.1.2: Step 2 - API domain analysis")
123 | response2, _ = self.call_mcp_tool(
124 | "planner",
125 | {
126 | "step": "After analyzing the current API, I can identify three main domains: User Management, Content Management, and Analytics. Let me design the new API structure with RESTful endpoints and proper versioning.",
127 | "step_number": 2,
128 | "total_steps": 4,
129 | "next_step_required": True,
130 | "continuation_id": continuation_id,
131 | "model": "flash",
132 | },
133 | )
134 |
135 | if not response2:
136 | self.logger.error("Failed to continue planning to step 2")
137 | return False
138 |
139 | response2_data = self._parse_planner_response(response2)
140 | if not self._validate_step_response(response2_data, 2, 4, True, "pause_for_planner"):
141 | return False
142 |
143 | # Check step history tracking in workflow (more flexible)
144 | status_key = None
145 | for key in response2_data.keys():
146 | if key.endswith("_status"):
147 | status_key = key
148 | break
149 |
150 | if status_key:
151 | workflow_status = response2_data.get(status_key, {})
152 | step_history_length = workflow_status.get("step_history_length", 0)
153 | if step_history_length < 2:
154 | self.logger.error(f"Step history not properly tracked in workflow: {step_history_length}")
155 | return False
156 | self.logger.debug(f"Step history length: {step_history_length}")
157 | else:
158 | self.logger.warning("No workflow status found, skipping step history check")
159 |
160 | self.logger.info(" ✅ Step 2 successful with workflow tracking")
161 |
162 | # Step 3: Final step - should trigger completion
163 | self.logger.info(" 1.1.3: Step 3 - Final planning step")
164 | response3, _ = self.call_mcp_tool(
165 | "planner",
166 | {
167 | "step": "API redesign plan complete: Phase 1 - User Management API, Phase 2 - Content Management API, Phase 3 - Analytics API. Each phase includes proper authentication, rate limiting, and comprehensive documentation.",
168 | "step_number": 3,
169 | "total_steps": 3, # Adjusted total
170 | "next_step_required": False, # Final step - should complete without expert analysis
171 | "continuation_id": continuation_id,
172 | "model": "flash",
173 | },
174 | )
175 |
176 | if not response3:
177 | self.logger.error("Failed to complete planning session")
178 | return False
179 |
180 | response3_data = self._parse_planner_response(response3)
181 | if not response3_data:
182 | return False
183 |
184 | # Validate final response structure - should be self-contained completion
185 | if response3_data.get("status") != "planner_complete":
186 | self.logger.error(f"Expected status 'planner_complete', got '{response3_data.get('status')}'")
187 | return False
188 |
189 | if not response3_data.get("planning_complete"):
190 | self.logger.error("Expected planning_complete=true for final step")
191 | return False
192 |
193 | # Should NOT have expert_analysis (self-contained)
194 | if "expert_analysis" in response3_data:
195 | self.logger.error("PlannerWorkflow should be self-contained without expert analysis")
196 | return False
197 |
198 | # Check plan_summary exists
199 | if not response3_data.get("plan_summary"):
200 | self.logger.error("Missing plan_summary in final step")
201 | return False
202 |
203 | self.logger.info(" ✅ Planning session completed successfully with workflow architecture")
204 |
205 | # Store continuation_id for next test
206 | self.api_continuation_id = continuation_id
207 | return True
208 |
209 | except Exception as e:
210 | self.logger.error(f"Single planning session test failed: {e}")
211 | return False
212 |
213 | def _test_planning_with_continuation(self) -> bool:
214 | """Test planning continuation with workflow architecture"""
215 | try:
216 | self.logger.info(" 1.2: Testing planning continuation with workflow")
217 |
218 | # Use continuation from previous test if available
219 | continuation_id = getattr(self, "api_continuation_id", None)
220 | if not continuation_id:
221 | # Start fresh if no continuation available
222 | self.logger.info(" 1.2.0: Starting fresh planning session")
223 | response0, continuation_id = self.call_mcp_tool(
224 | "planner",
225 | {
226 | "step": "Planning API security strategy",
227 | "step_number": 1,
228 | "total_steps": 2,
229 | "next_step_required": True,
230 | "model": "flash",
231 | },
232 | )
233 | if not response0 or not continuation_id:
234 | self.logger.error("Failed to start fresh planning session")
235 | return False
236 |
237 | # Test continuation step
238 | self.logger.info(" 1.2.1: Continue planning session")
239 | response1, _ = self.call_mcp_tool(
240 | "planner",
241 | {
242 | "step": "Building on the API redesign, let me now plan the security implementation with OAuth 2.0, API keys, and rate limiting strategies.",
243 | "step_number": 2,
244 | "total_steps": 2,
245 | "next_step_required": True,
246 | "continuation_id": continuation_id,
247 | "model": "flash",
248 | },
249 | )
250 |
251 | if not response1:
252 | self.logger.error("Failed to continue planning")
253 | return False
254 |
255 | response1_data = self._parse_planner_response(response1)
256 | if not response1_data:
257 | return False
258 |
259 | # Validate continuation behavior
260 | if not self._validate_step_response(response1_data, 2, 2, True, "pause_for_planner"):
261 | return False
262 |
263 | # Check that continuation_id is preserved
264 | if response1_data.get("continuation_id") != continuation_id:
265 | self.logger.error("Continuation ID not preserved in workflow")
266 | return False
267 |
268 | self.logger.info(" ✅ Planning continuation working with workflow")
269 | return True
270 |
271 | except Exception as e:
272 | self.logger.error(f"Planning continuation test failed: {e}")
273 | return False
274 |
275 | def _test_complex_plan_deep_thinking(self) -> bool:
276 | """Test complex plan with deep thinking pauses"""
277 | try:
278 | self.logger.info(" 1.3: Testing complex plan with deep thinking pauses")
279 |
280 | # Start complex plan (≥5 steps) - should trigger deep thinking
281 | self.logger.info(" 1.3.1: Step 1 of complex plan (should trigger deep thinking)")
282 | response1, continuation_id = self.call_mcp_tool(
283 | "planner",
284 | {
285 | "step": "I need to plan a complete digital transformation for our enterprise organization, including cloud migration, process automation, and cultural change management.",
286 | "step_number": 1,
287 | "total_steps": 8, # Complex plan ≥5 steps
288 | "next_step_required": True,
289 | "model": "flash",
290 | },
291 | )
292 |
293 | if not response1 or not continuation_id:
294 | self.logger.error("Failed to start complex planning")
295 | return False
296 |
297 | response1_data = self._parse_planner_response(response1)
298 | if not response1_data:
299 | return False
300 |
301 | # Should trigger deep thinking pause for complex plan
302 | if response1_data.get("status") != "pause_for_deep_thinking":
303 | self.logger.error("Expected deep thinking pause for complex plan step 1")
304 | return False
305 |
306 | if not response1_data.get("thinking_required"):
307 | self.logger.error("Expected thinking_required=true for complex plan")
308 | return False
309 |
310 | # Check required thinking actions
311 | required_thinking = response1_data.get("required_thinking", [])
312 | if len(required_thinking) < 4:
313 | self.logger.error("Expected comprehensive thinking requirements for complex plan")
314 | return False
315 |
316 | # Check for deep thinking guidance in next_steps
317 | next_steps = response1_data.get("next_steps", "")
318 | if "MANDATORY" not in next_steps or "deep thinking" not in next_steps.lower():
319 | self.logger.error("Expected mandatory deep thinking guidance")
320 | return False
321 |
322 | self.logger.info(" ✅ Complex plan step 1 correctly triggered deep thinking pause")
323 |
324 | # Step 2 of complex plan - should also trigger deep thinking
325 | self.logger.info(" 1.3.2: Step 2 of complex plan (should trigger deep thinking)")
326 | response2, _ = self.call_mcp_tool(
327 | "planner",
328 | {
329 | "step": "After deep analysis, I can see this transformation requires three parallel tracks: Technical Infrastructure, Business Process, and Human Capital. Let me design the coordination strategy.",
330 | "step_number": 2,
331 | "total_steps": 8,
332 | "next_step_required": True,
333 | "continuation_id": continuation_id,
334 | "model": "flash",
335 | },
336 | )
337 |
338 | if not response2:
339 | self.logger.error("Failed to continue complex planning")
340 | return False
341 |
342 | response2_data = self._parse_planner_response(response2)
343 | if not response2_data:
344 | return False
345 |
346 | # Step 2 should also trigger deep thinking for complex plans
347 | if response2_data.get("status") != "pause_for_deep_thinking":
348 | self.logger.error("Expected deep thinking pause for complex plan step 2")
349 | return False
350 |
351 | self.logger.info(" ✅ Complex plan step 2 correctly triggered deep thinking pause")
352 |
353 | # Step 4 of complex plan - should use normal flow (after step 3)
354 | self.logger.info(" 1.3.3: Step 4 of complex plan (should use normal flow)")
355 | response4, _ = self.call_mcp_tool(
356 | "planner",
357 | {
358 | "step": "Now moving to tactical planning: Phase 1 execution details with specific timelines and resource allocation for the technical infrastructure track.",
359 | "step_number": 4,
360 | "total_steps": 8,
361 | "next_step_required": True,
362 | "continuation_id": continuation_id,
363 | "model": "flash",
364 | },
365 | )
366 |
367 | if not response4:
368 | self.logger.error("Failed to continue to step 4")
369 | return False
370 |
371 | response4_data = self._parse_planner_response(response4)
372 | if not response4_data:
373 | return False
374 |
375 | # Step 4 should use normal flow (no more deep thinking pauses)
376 | if response4_data.get("status") != "pause_for_planner":
377 | self.logger.error("Expected normal planning flow for step 4")
378 | return False
379 |
380 | if response4_data.get("thinking_required"):
381 | self.logger.error("Step 4 should not require special thinking pause")
382 | return False
383 |
384 | self.logger.info(" ✅ Complex plan transitions to normal flow after step 3")
385 | return True
386 |
387 | except Exception as e:
388 | self.logger.error(f"Complex plan deep thinking test failed: {e}")
389 | return False
390 |
391 | def _test_self_contained_completion(self) -> bool:
392 | """Test self-contained completion without expert analysis"""
393 | try:
394 | self.logger.info(" 1.4: Testing self-contained completion")
395 |
396 | # Simple planning session that should complete without expert analysis
397 | self.logger.info(" 1.4.1: Simple planning session")
398 | response1, continuation_id = self.call_mcp_tool(
399 | "planner",
400 | {
401 | "step": "Planning a simple website redesign with new color scheme and improved navigation.",
402 | "step_number": 1,
403 | "total_steps": 2,
404 | "next_step_required": True,
405 | "model": "flash",
406 | },
407 | )
408 |
409 | if not response1 or not continuation_id:
410 | self.logger.error("Failed to start simple planning")
411 | return False
412 |
413 | # Final step - should complete without expert analysis
414 | self.logger.info(" 1.4.2: Final step - self-contained completion")
415 | response2, _ = self.call_mcp_tool(
416 | "planner",
417 | {
418 | "step": "Website redesign plan complete: Phase 1 - Update color palette and typography, Phase 2 - Redesign navigation structure and user flows.",
419 | "step_number": 2,
420 | "total_steps": 2,
421 | "next_step_required": False, # Final step
422 | "continuation_id": continuation_id,
423 | "model": "flash",
424 | },
425 | )
426 |
427 | if not response2:
428 | self.logger.error("Failed to complete simple planning")
429 | return False
430 |
431 | response2_data = self._parse_planner_response(response2)
432 | if not response2_data:
433 | return False
434 |
435 | # Validate self-contained completion
436 | if response2_data.get("status") != "planner_complete":
437 | self.logger.error("Expected self-contained completion status")
438 | return False
439 |
440 | # Should NOT call expert analysis
441 | if "expert_analysis" in response2_data:
442 | self.logger.error("PlannerWorkflow should not call expert analysis")
443 | return False
444 |
445 | # Should have planning_complete flag
446 | if not response2_data.get("planning_complete"):
447 | self.logger.error("Expected planning_complete=true")
448 | return False
449 |
450 | # Should have plan_summary
451 | if not response2_data.get("plan_summary"):
452 | self.logger.error("Expected plan_summary in completion")
453 | return False
454 |
455 | # Check completion instructions
456 | output = response2_data.get("output", {})
457 | if not output.get("instructions"):
458 | self.logger.error("Missing output instructions for plan presentation")
459 | return False
460 |
461 | self.logger.info(" ✅ Self-contained completion working correctly")
462 | return True
463 |
464 | except Exception as e:
465 | self.logger.error(f"Self-contained completion test failed: {e}")
466 | return False
467 |
468 | def _test_branching_and_revision(self) -> bool:
469 | """Test branching and revision with workflow architecture"""
470 | try:
471 | self.logger.info(" 1.5: Testing branching and revision with workflow")
472 |
473 | # Start planning session for branching test
474 | self.logger.info(" 1.5.1: Start planning for branching test")
475 | response1, continuation_id = self.call_mcp_tool(
476 | "planner",
477 | {
478 | "step": "Planning mobile app development strategy with different technology options to evaluate.",
479 | "step_number": 1,
480 | "total_steps": 4,
481 | "next_step_required": True,
482 | "model": "flash",
483 | },
484 | )
485 |
486 | if not response1 or not continuation_id:
487 | self.logger.error("Failed to start branching test")
488 | return False
489 |
490 | # Create branch
491 | self.logger.info(" 1.5.2: Create branch for React Native approach")
492 | response2, _ = self.call_mcp_tool(
493 | "planner",
494 | {
495 | "step": "Branch A: React Native approach - cross-platform development with shared codebase, faster development cycle, and consistent UI across platforms.",
496 | "step_number": 2,
497 | "total_steps": 4,
498 | "next_step_required": True,
499 | "is_branch_point": True,
500 | "branch_from_step": 1,
501 | "branch_id": "react-native",
502 | "continuation_id": continuation_id,
503 | "model": "flash",
504 | },
505 | )
506 |
507 | if not response2:
508 | self.logger.error("Failed to create branch")
509 | return False
510 |
511 | response2_data = self._parse_planner_response(response2)
512 | if not response2_data:
513 | return False
514 |
515 | # Validate branching in workflow
516 | metadata = response2_data.get("metadata", {})
517 | if not metadata.get("is_branch_point"):
518 | self.logger.error("Branch point not recorded in workflow")
519 | return False
520 |
521 | if metadata.get("branch_id") != "react-native":
522 | self.logger.error("Branch ID not properly recorded")
523 | return False
524 |
525 | if "react-native" not in metadata.get("branches", []):
526 | self.logger.error("Branch not added to branches list")
527 | return False
528 |
529 | self.logger.info(" ✅ Branching working with workflow architecture")
530 |
531 | # Test revision
532 | self.logger.info(" 1.5.3: Test revision capability")
533 | response3, _ = self.call_mcp_tool(
534 | "planner",
535 | {
536 | "step": "Revision of step 2: After consideration, let me revise the React Native approach to include performance optimizations and native module integration for critical features.",
537 | "step_number": 3,
538 | "total_steps": 4,
539 | "next_step_required": True,
540 | "is_step_revision": True,
541 | "revises_step_number": 2,
542 | "continuation_id": continuation_id,
543 | "model": "flash",
544 | },
545 | )
546 |
547 | if not response3:
548 | self.logger.error("Failed to create revision")
549 | return False
550 |
551 | response3_data = self._parse_planner_response(response3)
552 | if not response3_data:
553 | return False
554 |
555 | # Validate revision in workflow
556 | metadata = response3_data.get("metadata", {})
557 | if not metadata.get("is_step_revision"):
558 | self.logger.error("Step revision not recorded in workflow")
559 | return False
560 |
561 | if metadata.get("revises_step_number") != 2:
562 | self.logger.error("Revised step number not properly recorded")
563 | return False
564 |
565 | self.logger.info(" ✅ Revision working with workflow architecture")
566 | return True
567 |
568 | except Exception as e:
569 | self.logger.error(f"Branching and revision test failed: {e}")
570 | return False
571 |
572 | def _test_workflow_file_context(self) -> bool:
573 | """Test workflow file context behavior (should be minimal for planner)"""
574 | try:
575 | self.logger.info(" 1.6: Testing workflow file context behavior")
576 |
577 | # Planner typically doesn't use files, but test the workflow handles this correctly
578 | self.logger.info(" 1.6.1: Planning step with no files (normal case)")
579 | response1, continuation_id = self.call_mcp_tool(
580 | "planner",
581 | {
582 | "step": "Planning data architecture for analytics platform.",
583 | "step_number": 1,
584 | "total_steps": 2,
585 | "next_step_required": True,
586 | "model": "flash",
587 | },
588 | )
589 |
590 | if not response1 or not continuation_id:
591 | self.logger.error("Failed to start workflow file context test")
592 | return False
593 |
594 | response1_data = self._parse_planner_response(response1)
595 | if not response1_data:
596 | return False
597 |
598 | # Planner workflow should not have file_context since it doesn't use files
599 | if "file_context" in response1_data:
600 | self.logger.info(" ℹ️ Workflow file context present but should be minimal for planner")
601 |
602 | # Final step
603 | self.logger.info(" 1.6.2: Final step (should complete without file embedding)")
604 | response2, _ = self.call_mcp_tool(
605 | "planner",
606 | {
607 | "step": "Data architecture plan complete with data lakes, processing pipelines, and analytics layers.",
608 | "step_number": 2,
609 | "total_steps": 2,
610 | "next_step_required": False,
611 | "continuation_id": continuation_id,
612 | "model": "flash",
613 | },
614 | )
615 |
616 | if not response2:
617 | self.logger.error("Failed to complete workflow file context test")
618 | return False
619 |
620 | response2_data = self._parse_planner_response(response2)
621 | if not response2_data:
622 | return False
623 |
624 | # Final step should complete self-contained
625 | if response2_data.get("status") != "planner_complete":
626 | self.logger.error("Expected self-contained completion for planner workflow")
627 | return False
628 |
629 | self.logger.info(" ✅ Workflow file context behavior appropriate for planner")
630 | return True
631 |
632 | except Exception as e:
633 | self.logger.error(f"Workflow file context test failed: {e}")
634 | return False
635 |
636 | def call_mcp_tool(self, tool_name: str, params: dict) -> tuple[Optional[str], Optional[str]]:
637 | """Call an MCP tool in-process - override for planner-specific response handling"""
638 | # Use in-process implementation to maintain conversation memory
639 | response_text, _ = self.call_mcp_tool_direct(tool_name, params)
640 |
641 | if not response_text:
642 | return None, None
643 |
644 | # Extract continuation_id from planner response specifically
645 | continuation_id = self._extract_planner_continuation_id(response_text)
646 |
647 | return response_text, continuation_id
648 |
649 | def _extract_planner_continuation_id(self, response_text: str) -> Optional[str]:
650 | """Extract continuation_id from planner response"""
651 | try:
652 | # Parse the response
653 | response_data = json.loads(response_text)
654 | return response_data.get("continuation_id")
655 |
656 | except json.JSONDecodeError as e:
657 | self.logger.debug(f"Failed to parse response for planner continuation_id: {e}")
658 | return None
659 |
660 | def _parse_planner_response(self, response_text: str) -> dict:
661 | """Parse planner tool JSON response"""
662 | try:
663 | # Parse the response - it should be direct JSON
664 | return json.loads(response_text)
665 |
666 | except json.JSONDecodeError as e:
667 | self.logger.error(f"Failed to parse planner response as JSON: {e}")
668 | self.logger.error(f"Response text: {response_text[:500]}...")
669 | return {}
670 |
671 | def _validate_step_response(
672 | self,
673 | response_data: dict,
674 | expected_step: int,
675 | expected_total: int,
676 | expected_next_required: bool,
677 | expected_status: str,
678 | ) -> bool:
679 | """Validate a planner step response structure"""
680 | try:
681 | # Check status
682 | if response_data.get("status") != expected_status:
683 | self.logger.error(f"Expected status '{expected_status}', got '{response_data.get('status')}'")
684 | return False
685 |
686 | # Check step number
687 | if response_data.get("step_number") != expected_step:
688 | self.logger.error(f"Expected step_number {expected_step}, got {response_data.get('step_number')}")
689 | return False
690 |
691 | # Check total steps
692 | if response_data.get("total_steps") != expected_total:
693 | self.logger.error(f"Expected total_steps {expected_total}, got {response_data.get('total_steps')}")
694 | return False
695 |
696 | # Check next_step_required
697 | if response_data.get("next_step_required") != expected_next_required:
698 | self.logger.error(
699 | f"Expected next_step_required {expected_next_required}, got {response_data.get('next_step_required')}"
700 | )
701 | return False
702 |
703 | # Check step_content exists
704 | if not response_data.get("step_content"):
705 | self.logger.error("Missing step_content in response")
706 | return False
707 |
708 | # Check next_steps guidance
709 | if not response_data.get("next_steps"):
710 | self.logger.error("Missing next_steps guidance in response")
711 | return False
712 |
713 | return True
714 |
715 | except Exception as e:
716 | self.logger.error(f"Error validating step response: {e}")
717 | return False
718 |
```
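The tests above locate the workflow status field by scanning for any response key that ends in `_status`, since the field name is prefixed with the tool name (for example `planner_status`). A minimal standalone sketch of that lookup, assuming a hypothetical response payload (real responses come from `call_mcp_tool`):

```python
# Hypothetical response payload with a tool-prefixed status field.
response_data = {
    "status": "pause_for_planner",
    "planner_status": {"step_history_length": 2},
}

# next() returns the first key ending in "_status", or None if none exists;
# the top-level "status" key does not match because it lacks the underscore.
status_key = next((key for key in response_data if key.endswith("_status")), None)
if status_key is None:
    raise AssertionError(f"Missing workflow status field: {list(response_data)}")

print(status_key, response_data[status_key])
# -> planner_status {'step_history_length': 2}
```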