This is page 16 of 25. Use http://codebase.md/beehiveinnovations/gemini-mcp-server?lines=true&page={x} to view the full context.

# Directory Structure

```
├── .claude
│   ├── commands
│   │   └── fix-github-issue.md
│   └── settings.json
├── .coveragerc
├── .dockerignore
├── .env.example
├── .gitattributes
├── .github
│   ├── FUNDING.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.yml
│   │   ├── config.yml
│   │   ├── documentation.yml
│   │   ├── feature_request.yml
│   │   └── tool_addition.yml
│   ├── pull_request_template.md
│   └── workflows
│       ├── docker-pr.yml
│       ├── docker-release.yml
│       ├── semantic-pr.yml
│       ├── semantic-release.yml
│       └── test.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CHANGELOG.md
├── claude_config_example.json
├── CLAUDE.md
├── clink
│   ├── __init__.py
│   ├── agents
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── claude.py
│   │   ├── codex.py
│   │   └── gemini.py
│   ├── constants.py
│   ├── models.py
│   ├── parsers
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── claude.py
│   │   ├── codex.py
│   │   └── gemini.py
│   └── registry.py
├── code_quality_checks.ps1
├── code_quality_checks.sh
├── communication_simulator_test.py
├── conf
│   ├── __init__.py
│   ├── azure_models.json
│   ├── cli_clients
│   │   ├── claude.json
│   │   ├── codex.json
│   │   └── gemini.json
│   ├── custom_models.json
│   ├── dial_models.json
│   ├── gemini_models.json
│   ├── openai_models.json
│   ├── openrouter_models.json
│   └── xai_models.json
├── config.py
├── docker
│   ├── README.md
│   └── scripts
│       ├── build.ps1
│       ├── build.sh
│       ├── deploy.ps1
│       ├── deploy.sh
│       └── healthcheck.py
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── adding_providers.md
│   ├── adding_tools.md
│   ├── advanced-usage.md
│   ├── ai_banter.md
│   ├── ai-collaboration.md
│   ├── azure_openai.md
│   ├── configuration.md
│   ├── context-revival.md
│   ├── contributions.md
│   ├── custom_models.md
│   ├── docker-deployment.md
│   ├── gemini-setup.md
│   ├── getting-started.md
│   ├── index.md
│   ├── locale-configuration.md
│   ├── logging.md
│   ├── model_ranking.md
│   ├── testing.md
│   ├── tools
│   │   ├── analyze.md
│   │   ├── apilookup.md
│   │   ├── challenge.md
│   │   ├── chat.md
│   │   ├── clink.md
│   │   ├── codereview.md
│   │   ├── consensus.md
│   │   ├── debug.md
│   │   ├── docgen.md
│   │   ├── listmodels.md
│   │   ├── planner.md
│   │   ├── precommit.md
│   │   ├── refactor.md
│   │   ├── secaudit.md
│   │   ├── testgen.md
│   │   ├── thinkdeep.md
│   │   ├── tracer.md
│   │   └── version.md
│   ├── troubleshooting.md
│   ├── vcr-testing.md
│   └── wsl-setup.md
├── examples
│   ├── claude_config_macos.json
│   └── claude_config_wsl.json
├── LICENSE
├── providers
│   ├── __init__.py
│   ├── azure_openai.py
│   ├── base.py
│   ├── custom.py
│   ├── dial.py
│   ├── gemini.py
│   ├── openai_compatible.py
│   ├── openai.py
│   ├── openrouter.py
│   ├── registries
│   │   ├── __init__.py
│   │   ├── azure.py
│   │   ├── base.py
│   │   ├── custom.py
│   │   ├── dial.py
│   │   ├── gemini.py
│   │   ├── openai.py
│   │   ├── openrouter.py
│   │   └── xai.py
│   ├── registry_provider_mixin.py
│   ├── registry.py
│   ├── shared
│   │   ├── __init__.py
│   │   ├── model_capabilities.py
│   │   ├── model_response.py
│   │   ├── provider_type.py
│   │   └── temperature.py
│   └── xai.py
├── pyproject.toml
├── pytest.ini
├── README.md
├── requirements-dev.txt
├── requirements.txt
├── run_integration_tests.ps1
├── run_integration_tests.sh
├── run-server.ps1
├── run-server.sh
├── scripts
│   └── sync_version.py
├── server.py
├── simulator_tests
│   ├── __init__.py
│   ├── base_test.py
│   ├── conversation_base_test.py
│   ├── log_utils.py
│   ├── test_analyze_validation.py
│   ├── test_basic_conversation.py
│   ├── test_chat_simple_validation.py
│   ├── test_codereview_validation.py
│   ├── test_consensus_conversation.py
│   ├── test_consensus_three_models.py
│   ├── test_consensus_workflow_accurate.py
│   ├── test_content_validation.py
│   ├── test_conversation_chain_validation.py
│   ├── test_cross_tool_comprehensive.py
│   ├── test_cross_tool_continuation.py
│   ├── test_debug_certain_confidence.py
│   ├── test_debug_validation.py
│   ├── test_line_number_validation.py
│   ├── test_logs_validation.py
│   ├── test_model_thinking_config.py
│   ├── test_o3_model_selection.py
│   ├── test_o3_pro_expensive.py
│   ├── test_ollama_custom_url.py
│   ├── test_openrouter_fallback.py
│   ├── test_openrouter_models.py
│   ├── test_per_tool_deduplication.py
│   ├── test_planner_continuation_history.py
│   ├── test_planner_validation_old.py
│   ├── test_planner_validation.py
│   ├── test_precommitworkflow_validation.py
│   ├── test_prompt_size_limit_bug.py
│   ├── test_refactor_validation.py
│   ├── test_secaudit_validation.py
│   ├── test_testgen_validation.py
│   ├── test_thinkdeep_validation.py
│   ├── test_token_allocation_validation.py
│   ├── test_vision_capability.py
│   └── test_xai_models.py
├── systemprompts
│   ├── __init__.py
│   ├── analyze_prompt.py
│   ├── chat_prompt.py
│   ├── clink
│   │   ├── codex_codereviewer.txt
│   │   ├── default_codereviewer.txt
│   │   ├── default_planner.txt
│   │   └── default.txt
│   ├── codereview_prompt.py
│   ├── consensus_prompt.py
│   ├── debug_prompt.py
│   ├── docgen_prompt.py
│   ├── generate_code_prompt.py
│   ├── planner_prompt.py
│   ├── precommit_prompt.py
│   ├── refactor_prompt.py
│   ├── secaudit_prompt.py
│   ├── testgen_prompt.py
│   ├── thinkdeep_prompt.py
│   └── tracer_prompt.py
├── tests
│   ├── __init__.py
│   ├── CASSETTE_MAINTENANCE.md
│   ├── conftest.py
│   ├── gemini_cassettes
│   │   ├── chat_codegen
│   │   │   └── gemini25_pro_calculator
│   │   │       └── mldev.json
│   │   ├── chat_cross
│   │   │   └── step1_gemini25_flash_number
│   │   │       └── mldev.json
│   │   └── consensus
│   │       └── step2_gemini25_flash_against
│   │           └── mldev.json
│   ├── http_transport_recorder.py
│   ├── mock_helpers.py
│   ├── openai_cassettes
│   │   ├── chat_cross_step2_gpt5_reminder.json
│   │   ├── chat_gpt5_continuation.json
│   │   ├── chat_gpt5_moon_distance.json
│   │   ├── consensus_step1_gpt5_for.json
│   │   └── o3_pro_basic_math.json
│   ├── pii_sanitizer.py
│   ├── sanitize_cassettes.py
│   ├── test_alias_target_restrictions.py
│   ├── test_auto_mode_comprehensive.py
│   ├── test_auto_mode_custom_provider_only.py
│   ├── test_auto_mode_model_listing.py
│   ├── test_auto_mode_provider_selection.py
│   ├── test_auto_mode.py
│   ├── test_auto_model_planner_fix.py
│   ├── test_azure_openai_provider.py
│   ├── test_buggy_behavior_prevention.py
│   ├── test_cassette_semantic_matching.py
│   ├── test_challenge.py
│   ├── test_chat_codegen_integration.py
│   ├── test_chat_cross_model_continuation.py
│   ├── test_chat_openai_integration.py
│   ├── test_chat_simple.py
│   ├── test_clink_claude_agent.py
│   ├── test_clink_claude_parser.py
│   ├── test_clink_codex_agent.py
│   ├── test_clink_gemini_agent.py
│   ├── test_clink_gemini_parser.py
│   ├── test_clink_integration.py
│   ├── test_clink_parsers.py
│   ├── test_clink_tool.py
│   ├── test_collaboration.py
│   ├── test_config.py
│   ├── test_consensus_integration.py
│   ├── test_consensus_schema.py
│   ├── test_consensus.py
│   ├── test_conversation_continuation_integration.py
│   ├── test_conversation_field_mapping.py
│   ├── test_conversation_file_features.py
│   ├── test_conversation_memory.py
│   ├── test_conversation_missing_files.py
│   ├── test_custom_openai_temperature_fix.py
│   ├── test_custom_provider.py
│   ├── test_debug.py
│   ├── test_deploy_scripts.py
│   ├── test_dial_provider.py
│   ├── test_directory_expansion_tracking.py
│   ├── test_disabled_tools.py
│   ├── test_docker_claude_desktop_integration.py
│   ├── test_docker_config_complete.py
│   ├── test_docker_healthcheck.py
│   ├── test_docker_implementation.py
│   ├── test_docker_mcp_validation.py
│   ├── test_docker_security.py
│   ├── test_docker_volume_persistence.py
│   ├── test_file_protection.py
│   ├── test_gemini_token_usage.py
│   ├── test_image_support_integration.py
│   ├── test_image_validation.py
│   ├── test_integration_utf8.py
│   ├── test_intelligent_fallback.py
│   ├── test_issue_245_simple.py
│   ├── test_large_prompt_handling.py
│   ├── test_line_numbers_integration.py
│   ├── test_listmodels_restrictions.py
│   ├── test_listmodels.py
│   ├── test_mcp_error_handling.py
│   ├── test_model_enumeration.py
│   ├── test_model_metadata_continuation.py
│   ├── test_model_resolution_bug.py
│   ├── test_model_restrictions.py
│   ├── test_o3_pro_output_text_fix.py
│   ├── test_o3_temperature_fix_simple.py
│   ├── test_openai_compatible_token_usage.py
│   ├── test_openai_provider.py
│   ├── test_openrouter_provider.py
│   ├── test_openrouter_registry.py
│   ├── test_parse_model_option.py
│   ├── test_per_tool_model_defaults.py
│   ├── test_pii_sanitizer.py
│   ├── test_pip_detection_fix.py
│   ├── test_planner.py
│   ├── test_precommit_workflow.py
│   ├── test_prompt_regression.py
│   ├── test_prompt_size_limit_bug_fix.py
│   ├── test_provider_retry_logic.py
│   ├── test_provider_routing_bugs.py
│   ├── test_provider_utf8.py
│   ├── test_providers.py
│   ├── test_rate_limit_patterns.py
│   ├── test_refactor.py
│   ├── test_secaudit.py
│   ├── test_server.py
│   ├── test_supported_models_aliases.py
│   ├── test_thinking_modes.py
│   ├── test_tools.py
│   ├── test_tracer.py
│   ├── test_utf8_localization.py
│   ├── test_utils.py
│   ├── test_uvx_resource_packaging.py
│   ├── test_uvx_support.py
│   ├── test_workflow_file_embedding.py
│   ├── test_workflow_metadata.py
│   ├── test_workflow_prompt_size_validation_simple.py
│   ├── test_workflow_utf8.py
│   ├── test_xai_provider.py
│   ├── transport_helpers.py
│   └── triangle.png
├── tools
│   ├── __init__.py
│   ├── analyze.py
│   ├── apilookup.py
│   ├── challenge.py
│   ├── chat.py
│   ├── clink.py
│   ├── codereview.py
│   ├── consensus.py
│   ├── debug.py
│   ├── docgen.py
│   ├── listmodels.py
│   ├── models.py
│   ├── planner.py
│   ├── precommit.py
│   ├── refactor.py
│   ├── secaudit.py
│   ├── shared
│   │   ├── __init__.py
│   │   ├── base_models.py
│   │   ├── base_tool.py
│   │   ├── exceptions.py
│   │   └── schema_builders.py
│   ├── simple
│   │   ├── __init__.py
│   │   └── base.py
│   ├── testgen.py
│   ├── thinkdeep.py
│   ├── tracer.py
│   ├── version.py
│   └── workflow
│       ├── __init__.py
│       ├── base.py
│       ├── schema_builders.py
│       └── workflow_mixin.py
├── utils
│   ├── __init__.py
│   ├── client_info.py
│   ├── conversation_memory.py
│   ├── env.py
│   ├── file_types.py
│   ├── file_utils.py
│   ├── image_utils.py
│   ├── model_context.py
│   ├── model_restrictions.py
│   ├── security_config.py
│   ├── storage_backend.py
│   └── token_utils.py
└── zen-mcp-server
```

# Files

--------------------------------------------------------------------------------
/tools/testgen.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | TestGen Workflow tool - Step-by-step test generation with expert validation
  3 | 
  4 | This tool provides a structured workflow for comprehensive test generation.
  5 | It guides the CLI agent through systematic investigation steps, with forced pauses between steps
  6 | to ensure thorough code examination, test planning, and pattern identification before proceeding.
  7 | The tool supports updating findings along the way and integrating expert analysis for comprehensive test suite generation.
  8 | 
  9 | Key features:
 10 | - Step-by-step test generation workflow with progress tracking
 11 | - Context-aware file embedding (references during investigation, full content for analysis)
 12 | - Automatic test pattern detection and framework identification
 13 | - Expert analysis integration with external models for additional test suggestions
 14 | - Support for edge case identification and comprehensive coverage
 15 | - Confidence-based workflow optimization
 16 | """
 17 | 
 18 | import logging
 19 | from typing import TYPE_CHECKING, Any, Optional
 20 | 
 21 | from pydantic import Field, model_validator
 22 | 
 23 | if TYPE_CHECKING:
 24 |     from tools.models import ToolModelCategory
 25 | 
 26 | from config import TEMPERATURE_ANALYTICAL
 27 | from systemprompts import TESTGEN_PROMPT
 28 | from tools.shared.base_models import WorkflowRequest
 29 | 
 30 | from .workflow.base import WorkflowTool
 31 | 
 32 | logger = logging.getLogger(__name__)
 33 | 
 34 | # Tool-specific field descriptions for test generation workflow
 35 | TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS = {
 36 |     "step": (
 37 |         "Test plan for this step. Step 1: outline how you'll analyze structure, business logic, critical paths, and edge cases. Later steps: record findings and new scenarios as they emerge."
 38 |     ),
 39 |     "step_number": "Current test-generation step (starts at 1) — each step should build on prior work.",
 40 |     "total_steps": "Estimated number of steps needed for test planning; adjust as new scenarios appear.",
 41 |     "next_step_required": "True while more investigation or planning remains; set False when test planning is ready for expert validation.",
 42 |     "findings": "Summarize functionality, critical paths, edge cases, boundary conditions, error handling, and existing test patterns. Cover both happy and failure paths.",
 43 |     "files_checked": "Absolute paths of every file examined, including those ruled out.",
 44 |     "relevant_files": "Absolute paths of code that requires new or updated tests (implementation, dependencies, existing test fixtures).",
 45 |     "relevant_context": "Functions/methods needing coverage (e.g. 'Class.method', 'function_name'), with emphasis on critical paths and error-prone code.",
 46 |     "confidence": (
 47 |         "Indicate your current confidence in the test generation assessment. Use: 'exploring' (starting analysis), "
 48 |         "'low' (early investigation), 'medium' (some patterns identified), 'high' (strong understanding), "
 49 |         "'very_high' (very strong understanding), 'almost_certain' (nearly complete test plan), 'certain' "
 50 |         "(100% confidence - test plan is thoroughly complete and all test scenarios are identified with no need for external model validation). "
 51 |         "Do NOT use 'certain' unless the test generation analysis is comprehensively complete; use 'very_high' or 'almost_certain' instead if not 100% sure. "
 52 |         "Using 'certain' means you have complete confidence locally and prevents external model validation."
 53 |     ),
 54 |     "images": "Optional absolute paths to diagrams or visuals that clarify the system under test.",
 55 | }
 56 | 
 57 | 
 58 | class TestGenRequest(WorkflowRequest):
 59 |     """Request model for test generation workflow investigation steps"""
 60 | 
 61 |     # Required fields for each investigation step
 62 |     step: str = Field(..., description=TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["step"])
 63 |     step_number: int = Field(..., description=TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["step_number"])
 64 |     total_steps: int = Field(..., description=TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["total_steps"])
 65 |     next_step_required: bool = Field(..., description=TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["next_step_required"])
 66 | 
 67 |     # Investigation tracking fields
 68 |     findings: str = Field(..., description=TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["findings"])
 69 |     files_checked: list[str] = Field(
 70 |         default_factory=list, description=TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["files_checked"]
 71 |     )
 72 |     relevant_files: list[str] = Field(
 73 |         default_factory=list, description=TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["relevant_files"]
 74 |     )
 75 |     relevant_context: list[str] = Field(
 76 |         default_factory=list, description=TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["relevant_context"]
 77 |     )
 78 |     confidence: Optional[str] = Field("low", description=TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["confidence"])
 79 | 
 80 |     # Optional images for visual context
 81 |     images: Optional[list[str]] = Field(default=None, description=TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["images"])
 82 | 
 83 |     # Override inherited fields to exclude them from schema (except model which needs to be available)
 84 |     temperature: Optional[float] = Field(default=None, exclude=True)
 85 |     thinking_mode: Optional[str] = Field(default=None, exclude=True)
 86 | 
 87 |     @model_validator(mode="after")
 88 |     def validate_step_one_requirements(self):
 89 |         """Ensure step 1 has required relevant_files field."""
 90 |         if self.step_number == 1 and not self.relevant_files:
 91 |             raise ValueError("Step 1 requires 'relevant_files' field to specify code files to generate tests for")
 92 |         return self
 93 | 
 94 | 
 95 | class TestGenTool(WorkflowTool):
 96 |     """
 97 |     Test Generation workflow tool for step-by-step test planning and expert validation.
 98 | 
 99 |     This tool implements a structured test generation workflow that guides users through
100 |     methodical investigation steps, ensuring thorough code examination, pattern identification,
101 |     and test scenario planning before reaching conclusions. It supports complex testing scenarios
102 |     including edge case identification, framework detection, and comprehensive coverage planning.
103 |     """
104 | 
105 |     __test__ = False  # Prevent pytest from collecting this class as a test
106 | 
107 |     def __init__(self):
108 |         super().__init__()
109 |         self.initial_request = None
110 | 
111 |     def get_name(self) -> str:
112 |         return "testgen"
113 | 
114 |     def get_description(self) -> str:
115 |         return (
116 |             "Creates comprehensive test suites with edge case coverage for specific functions, classes, or modules. "
117 |             "Analyzes code paths, identifies failure modes, and generates framework-specific tests. "
118 |             "Be specific about scope - target particular components rather than testing everything."
119 |         )
120 | 
121 |     def get_system_prompt(self) -> str:
122 |         return TESTGEN_PROMPT
123 | 
124 |     def get_default_temperature(self) -> float:
125 |         return TEMPERATURE_ANALYTICAL
126 | 
127 |     def get_model_category(self) -> "ToolModelCategory":
128 |         """Test generation requires thorough analysis and reasoning"""
129 |         from tools.models import ToolModelCategory
130 | 
131 |         return ToolModelCategory.EXTENDED_REASONING
132 | 
133 |     def get_workflow_request_model(self):
134 |         """Return the test generation workflow-specific request model."""
135 |         return TestGenRequest
136 | 
137 |     def get_input_schema(self) -> dict[str, Any]:
138 |         """Generate input schema using WorkflowSchemaBuilder with test generation-specific overrides."""
139 |         from .workflow.schema_builders import WorkflowSchemaBuilder
140 | 
141 |         # Test generation workflow-specific field overrides
142 |         testgen_field_overrides = {
143 |             "step": {
144 |                 "type": "string",
145 |                 "description": TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["step"],
146 |             },
147 |             "step_number": {
148 |                 "type": "integer",
149 |                 "minimum": 1,
150 |                 "description": TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["step_number"],
151 |             },
152 |             "total_steps": {
153 |                 "type": "integer",
154 |                 "minimum": 1,
155 |                 "description": TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["total_steps"],
156 |             },
157 |             "next_step_required": {
158 |                 "type": "boolean",
159 |                 "description": TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["next_step_required"],
160 |             },
161 |             "findings": {
162 |                 "type": "string",
163 |                 "description": TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["findings"],
164 |             },
165 |             "files_checked": {
166 |                 "type": "array",
167 |                 "items": {"type": "string"},
168 |                 "description": TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["files_checked"],
169 |             },
170 |             "relevant_files": {
171 |                 "type": "array",
172 |                 "items": {"type": "string"},
173 |                 "description": TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["relevant_files"],
174 |             },
175 |             "confidence": {
176 |                 "type": "string",
177 |                 "enum": ["exploring", "low", "medium", "high", "very_high", "almost_certain", "certain"],
178 |                 "description": TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["confidence"],
179 |             },
180 |             "images": {
181 |                 "type": "array",
182 |                 "items": {"type": "string"},
183 |                 "description": TESTGEN_WORKFLOW_FIELD_DESCRIPTIONS["images"],
184 |             },
185 |         }
186 | 
187 |         # Use WorkflowSchemaBuilder with test generation-specific tool fields
188 |         return WorkflowSchemaBuilder.build_schema(
189 |             tool_specific_fields=testgen_field_overrides,
190 |             model_field_schema=self.get_model_field_schema(),
191 |             auto_mode=self.is_effective_auto_mode(),
192 |             tool_name=self.get_name(),
193 |         )
194 | 
195 |     def get_required_actions(
196 |         self, step_number: int, confidence: str, findings: str, total_steps: int, request=None
197 |     ) -> list[str]:
198 |         """Define required actions for each investigation phase."""
199 |         if step_number == 1:
200 |             # Initial test generation investigation tasks
201 |             return [
202 |                 "Read and understand the code files specified for test generation",
203 |                 "Analyze the overall structure, public APIs, and main functionality",
204 |                 "Identify critical business logic and complex algorithms that need testing",
205 |                 "Look for existing test patterns or examples if provided",
206 |                 "Understand dependencies, external interactions, and integration points",
207 |                 "Note any potential testability issues or areas that might be hard to test",
208 |             ]
209 |         elif confidence in ["exploring", "low"]:
210 |             # Need deeper investigation
211 |             return [
212 |                 "Examine specific functions and methods to understand their behavior",
213 |                 "Trace through code paths to identify all possible execution flows",
214 |                 "Identify edge cases, boundary conditions, and error scenarios",
215 |                 "Check for async operations, state management, and side effects",
216 |                 "Look for non-deterministic behavior or external dependencies",
217 |                 "Analyze error handling and exception cases that need testing",
218 |             ]
219 |         elif confidence in ["medium", "high"]:
220 |             # Close to completion - need final verification
221 |             return [
222 |                 "Verify all critical paths have been identified for testing",
223 |                 "Confirm edge cases and boundary conditions are comprehensive",
224 |                 "Check that test scenarios cover both success and failure cases",
225 |                 "Ensure async behavior and concurrency issues are addressed",
226 |                 "Validate that the testing strategy aligns with code complexity",
227 |                 "Double-check that findings include actionable test scenarios",
228 |             ]
229 |         else:
230 |             # General investigation needed
231 |             return [
232 |                 "Continue examining the codebase for additional test scenarios",
233 |                 "Gather more evidence about code behavior and dependencies",
234 |                 "Test your assumptions about how the code should be tested",
235 |                 "Look for patterns that confirm your testing strategy",
236 |                 "Focus on areas that haven't been thoroughly examined yet",
237 |             ]
238 | 
239 |     def should_call_expert_analysis(self, consolidated_findings, request=None) -> bool:
240 |         """
241 |         Decide when to call external model based on investigation completeness.
242 | 
243 |         Always call expert analysis for test generation to get additional test ideas.
244 |         """
245 |         # Check if user requested to skip assistant model
246 |         if request and not self.get_request_use_assistant_model(request):
247 |             return False
248 | 
249 |         # Always benefit from expert analysis for comprehensive test coverage
250 |         return len(consolidated_findings.relevant_files) > 0 or len(consolidated_findings.findings) >= 1
251 | 
252 |     def prepare_expert_analysis_context(self, consolidated_findings) -> str:
253 |         """Prepare context for external model call for test generation validation."""
254 |         context_parts = [
255 |             f"=== TEST GENERATION REQUEST ===\n{self.initial_request or 'Test generation workflow initiated'}\n=== END REQUEST ==="
256 |         ]
257 | 
258 |         # Add investigation summary
259 |         investigation_summary = self._build_test_generation_summary(consolidated_findings)
260 |         context_parts.append(
261 |             f"\n=== AGENT'S TEST PLANNING INVESTIGATION ===\n{investigation_summary}\n=== END INVESTIGATION ==="
262 |         )
263 | 
264 |         # Add relevant code elements if available
265 |         if consolidated_findings.relevant_context:
266 |             methods_text = "\n".join(f"- {method}" for method in consolidated_findings.relevant_context)
267 |             context_parts.append(f"\n=== CODE ELEMENTS TO TEST ===\n{methods_text}\n=== END CODE ELEMENTS ===")
268 | 
269 |         # Add images if available
270 |         if consolidated_findings.images:
271 |             images_text = "\n".join(f"- {img}" for img in consolidated_findings.images)
272 |             context_parts.append(f"\n=== VISUAL DOCUMENTATION ===\n{images_text}\n=== END VISUAL DOCUMENTATION ===")
273 | 
274 |         return "\n".join(context_parts)
275 | 
276 |     def _build_test_generation_summary(self, consolidated_findings) -> str:
277 |         """Prepare a comprehensive summary of the test generation investigation."""
278 |         summary_parts = [
279 |             "=== SYSTEMATIC TEST GENERATION INVESTIGATION SUMMARY ===",
280 |             f"Total steps: {len(consolidated_findings.findings)}",
281 |             f"Files examined: {len(consolidated_findings.files_checked)}",
282 |             f"Relevant files identified: {len(consolidated_findings.relevant_files)}",
283 |             f"Code elements to test: {len(consolidated_findings.relevant_context)}",
284 |             "",
285 |             "=== INVESTIGATION PROGRESSION ===",
286 |         ]
287 | 
288 |         for finding in consolidated_findings.findings:
289 |             summary_parts.append(finding)
290 | 
291 |         return "\n".join(summary_parts)
292 | 
293 |     def should_include_files_in_expert_prompt(self) -> bool:
294 |         """Include files in expert analysis for comprehensive test generation."""
295 |         return True
296 | 
297 |     def should_embed_system_prompt(self) -> bool:
298 |         """Embed system prompt in expert analysis for proper context."""
299 |         return True
300 | 
301 |     def get_expert_thinking_mode(self) -> str:
302 |         """Use high thinking mode for thorough test generation analysis."""
303 |         return "high"
304 | 
305 |     def get_expert_analysis_instruction(self) -> str:
306 |         """Get specific instruction for test generation expert analysis."""
307 |         return (
308 |             "Please provide comprehensive test generation guidance based on the investigation findings. "
309 |             "Focus on identifying additional test scenarios, edge cases not yet covered, framework-specific "
310 |             "best practices, and providing concrete test implementation examples following the multi-agent "
311 |             "workflow specified in the system prompt."
312 |         )
313 | 
314 |     # Hook method overrides for test generation-specific behavior
315 | 
316 |     def prepare_step_data(self, request) -> dict:
317 |         """
318 |         Map test generation-specific fields for internal processing.
319 |         """
320 |         step_data = {
321 |             "step": request.step,
322 |             "step_number": request.step_number,
323 |             "findings": request.findings,
324 |             "files_checked": request.files_checked,
325 |             "relevant_files": request.relevant_files,
326 |             "relevant_context": request.relevant_context,
327 |             "confidence": request.confidence,
328 |             "images": request.images or [],
329 |         }
330 |         return step_data
331 | 
332 |     def should_skip_expert_analysis(self, request, consolidated_findings) -> bool:
333 |         """
334 |         Test generation workflow skips expert analysis when the CLI agent has "certain" confidence.
335 |         """
336 |         return request.confidence == "certain" and not request.next_step_required
337 | 
338 |     def store_initial_issue(self, step_description: str):
339 |         """Store initial request for expert analysis."""
340 |         self.initial_request = step_description
341 | 
342 |     # Override inheritance hooks for test generation-specific behavior
343 | 
344 |     def get_completion_status(self) -> str:
345 |         """Test generation tools use test-specific status."""
346 |         return "test_generation_complete_ready_for_implementation"
347 | 
348 |     def get_completion_data_key(self) -> str:
349 |         """Test generation uses 'complete_test_generation' key."""
350 |         return "complete_test_generation"
351 | 
352 |     def get_final_analysis_from_request(self, request):
353 |         """Test generation tools use findings for final analysis."""
354 |         return request.findings
355 | 
356 |     def get_confidence_level(self, request) -> str:
357 |         """Test generation tools use 'certain' for high confidence."""
358 |         return "certain"
359 | 
360 |     def get_completion_message(self) -> str:
361 |         """Test generation-specific completion message."""
362 |         return (
363 |             "Test generation analysis complete with CERTAIN confidence. You have identified all test scenarios "
364 |             "and provided comprehensive coverage strategy. MANDATORY: Present the user with the complete test plan "
365 |             "and IMMEDIATELY proceed with creating the test files following the identified patterns and framework. "
366 |             "Focus on implementing concrete, runnable tests with proper assertions."
367 |         )
368 | 
369 |     def get_skip_reason(self) -> str:
370 |         """Test generation-specific skip reason."""
371 |         return "Completed comprehensive test planning with full confidence locally"
372 | 
373 |     def get_skip_expert_analysis_status(self) -> str:
374 |         """Test generation-specific expert analysis skip status."""
375 |         return "skipped_due_to_certain_test_confidence"
376 | 
377 |     def prepare_work_summary(self) -> str:
378 |         """Test generation-specific work summary."""
379 |         return self._build_test_generation_summary(self.consolidated_findings)
380 | 
381 |     def get_completion_next_steps_message(self, expert_analysis_used: bool = False) -> str:
382 |         """
383 |         Test generation-specific completion message.
384 |         """
385 |         base_message = (
386 |             "TEST GENERATION ANALYSIS IS COMPLETE. You MUST now implement ALL identified test scenarios, "
387 |             "creating comprehensive test files that cover happy paths, edge cases, error conditions, and "
388 |             "boundary scenarios. Organize tests by functionality, use appropriate assertions, and follow "
389 |             "the identified framework patterns. Provide concrete, executable test code—make it easy for "
390 |             "a developer to run the tests and understand what each test validates."
391 |         )
392 | 
393 |         # Add expert analysis guidance only when expert analysis was actually used
394 |         if expert_analysis_used:
395 |             expert_guidance = self.get_expert_analysis_guidance()
396 |             if expert_guidance:
397 |                 return f"{base_message}\n\n{expert_guidance}"
398 | 
399 |         return base_message
400 | 
401 |     def get_expert_analysis_guidance(self) -> str:
402 |         """
403 |         Provide specific guidance for handling expert analysis in test generation.
404 |         """
405 |         return (
406 |             "IMPORTANT: Additional test scenarios and edge cases have been provided by the expert analysis above. "
407 |             "You MUST incorporate these suggestions into your test implementation, ensuring comprehensive coverage. "
408 |             "Validate that the expert's test ideas are practical and align with the codebase structure. Combine "
409 |             "your systematic investigation findings with the expert's additional scenarios to create a thorough "
410 |             "test suite that catches real-world bugs before they reach production."
411 |         )
412 | 
413 |     def get_step_guidance_message(self, request) -> str:
414 |         """
415 |         Test generation-specific step guidance with detailed investigation instructions.
416 |         """
417 |         step_guidance = self.get_test_generation_step_guidance(request.step_number, request.confidence, request)
418 |         return step_guidance["next_steps"]
419 | 
420 |     def get_test_generation_step_guidance(self, step_number: int, confidence: str, request) -> dict[str, Any]:
421 |         """
422 |         Provide step-specific guidance for test generation workflow.
423 |         """
424 |         # Generate the next steps instruction based on required actions
425 |         required_actions = self.get_required_actions(step_number, confidence, request.findings, request.total_steps)
426 | 
427 |         if step_number == 1:
428 |             next_steps = (
429 |                 f"MANDATORY: DO NOT call the {self.get_name()} tool again immediately. You MUST first analyze "
430 |                 f"the code thoroughly using appropriate tools. CRITICAL AWARENESS: You need to understand "
431 |                 f"the code structure, identify testable behaviors, find edge cases and boundary conditions, "
432 |                 f"and determine the appropriate testing strategy. Use file reading tools, code analysis, and "
433 |                 f"systematic examination to gather comprehensive information about what needs to be tested. "
434 |                 f"Only call {self.get_name()} again AFTER completing your investigation. When you call "
435 |                 f"{self.get_name()} next time, use step_number: {step_number + 1} and report specific "
436 |                 f"code paths examined, test scenarios identified, and testing patterns discovered."
437 |             )
438 |         elif confidence in ["exploring", "low"]:
439 |             next_steps = (
440 |                 f"STOP! Do NOT call {self.get_name()} again yet. Based on your findings, you've identified areas that need "
441 |                 f"deeper analysis for test generation. MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\n"
442 |                 + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
443 |                 + f"\n\nOnly call {self.get_name()} again with step_number: {step_number + 1} AFTER "
444 |                 + "completing these test planning tasks."
445 |             )
446 |         elif confidence in ["medium", "high"]:
447 |             next_steps = (
448 |                 f"WAIT! Your test generation analysis needs final verification. DO NOT call {self.get_name()} immediately. REQUIRED ACTIONS:\n"
449 |                 + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
450 |                 + f"\n\nREMEMBER: Ensure you have identified all test scenarios including edge cases and error conditions. "
451 |                 f"Document findings with specific test cases to implement, then call {self.get_name()} "
452 |                 f"with step_number: {step_number + 1}."
453 |             )
454 |         else:
455 |             next_steps = (
456 |                 f"PAUSE ANALYSIS. Before calling {self.get_name()} step {step_number + 1}, you MUST examine more code thoroughly. "
457 |                 + "Required: "
458 |                 + ", ".join(required_actions[:2])
459 |                 + ". "
460 |                 + f"Your next {self.get_name()} call (step_number: {step_number + 1}) must include "
461 |                 f"NEW test scenarios from actual code analysis, not just theories. NO recursive {self.get_name()} calls "
462 |                 f"without investigation work!"
463 |             )
464 | 
465 |         return {"next_steps": next_steps}
466 | 
467 |     def customize_workflow_response(self, response_data: dict, request) -> dict:
468 |         """
469 |         Customize response to match test generation workflow format.
470 |         """
471 |         # Store initial request on first step
472 |         if request.step_number == 1:
473 |             self.initial_request = request.step
474 | 
475 |         # Convert generic status names to test generation-specific ones
476 |         tool_name = self.get_name()
477 |         status_mapping = {
478 |             f"{tool_name}_in_progress": "test_generation_in_progress",
479 |             f"pause_for_{tool_name}": "pause_for_test_analysis",
480 |             f"{tool_name}_required": "test_analysis_required",
481 |             f"{tool_name}_complete": "test_generation_complete",
482 |         }
483 | 
484 |         if response_data["status"] in status_mapping:
485 |             response_data["status"] = status_mapping[response_data["status"]]
486 | 
487 |         # Rename status field to match test generation workflow
488 |         if f"{tool_name}_status" in response_data:
489 |             response_data["test_generation_status"] = response_data.pop(f"{tool_name}_status")
490 |             # Add test generation-specific status fields
491 |             response_data["test_generation_status"]["test_scenarios_identified"] = len(
492 |                 self.consolidated_findings.relevant_context
493 |             )
494 |             response_data["test_generation_status"]["analysis_confidence"] = self.get_request_confidence(request)
495 | 
496 |         # Map complete_testgen to complete_test_generation
497 |         if f"complete_{tool_name}" in response_data:
498 |             response_data["complete_test_generation"] = response_data.pop(f"complete_{tool_name}")
499 | 
500 |         # Map the completion flag to match test generation workflow
501 |         if f"{tool_name}_complete" in response_data:
502 |             response_data["test_generation_complete"] = response_data.pop(f"{tool_name}_complete")
503 | 
504 |         return response_data
505 | 
506 |     # Required abstract methods from BaseTool
507 |     def get_request_model(self):
508 |         """Return the test generation workflow-specific request model."""
509 |         return TestGenRequest
510 | 
511 |     async def prepare_prompt(self, request) -> str:
512 |         """Not used - workflow tools use execute_workflow()."""
513 |         return ""  # Workflow tools use execute_workflow() directly
514 | 
```
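
A quick illustration of the request validation above: `validate_step_one_requirements` rejects a step-1 payload that omits `relevant_files`. The snippet below is a minimal standalone sketch of the same pattern; it re-declares a simplified model rather than importing the real `TestGenRequest`, which inherits additional fields from `WorkflowRequest`.

```python
from pydantic import BaseModel, Field, ValidationError, model_validator


class MiniTestGenRequest(BaseModel):
    """Simplified stand-in for TestGenRequest (illustration only)."""

    step: str
    step_number: int
    relevant_files: list[str] = Field(default_factory=list)

    @model_validator(mode="after")
    def validate_step_one_requirements(self):
        # Mirrors TestGenRequest: step 1 must name the files to generate tests for.
        if self.step_number == 1 and not self.relevant_files:
            raise ValueError("Step 1 requires 'relevant_files' field")
        return self


try:
    MiniTestGenRequest(step="Plan the test suite", step_number=1)
except ValidationError as exc:
    print(exc)  # rejected: step 1 without relevant_files

# Accepted once step 1 names the code to test (path is illustrative)
MiniTestGenRequest(step="Plan the test suite", step_number=1, relevant_files=["/abs/path/auth.py"])
```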
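
For orientation on how these fields arrive over MCP: a first call to this tool carries arguments shaped like the dict below, the same pattern the simulator tests later on this page use for `secaudit` via `call_mcp_tool_direct`. The field names come from the schema built in `get_input_schema`; the concrete path is an assumption for illustration, and the model name simply mirrors the one used in those tests.

```python
# Hypothetical step-1 arguments for the testgen tool; values are illustrative.
testgen_step1_args = {
    "step": "Outline how to analyze structure, critical paths, and edge cases",
    "step_number": 1,
    "total_steps": 3,
    "next_step_required": True,
    "findings": "Starting test planning",
    "relevant_files": ["/abs/path/auth.py"],  # required at step 1 by the validator
    "model": "gemini-2.0-flash-lite",  # model name borrowed from the simulator tests
}
```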

--------------------------------------------------------------------------------
/simulator_tests/test_secaudit_validation.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | """
  3 | SECAUDIT Tool Validation Test
  4 | 
  5 | Tests the secaudit tool's capabilities using the workflow architecture.
  6 | This validates that the workflow-based security audit provides step-by-step
  7 | analysis with proper investigation guidance and expert analysis integration.
  8 | """
  9 | 
 10 | import json
 11 | 
 12 | from .conversation_base_test import ConversationBaseTest
 13 | 
 14 | 
 15 | class SecauditValidationTest(ConversationBaseTest):
 16 |     """Test secaudit tool with workflow architecture"""
 17 | 
 18 |     @property
 19 |     def test_name(self) -> str:
 20 |         return "secaudit_validation"
 21 | 
 22 |     @property
 23 |     def test_description(self) -> str:
 24 |         return "SECAUDIT tool validation with security audit workflow architecture"
 25 | 
 26 |     def run_test(self) -> bool:
 27 |         """Test secaudit tool capabilities"""
 28 |         # Set up the test environment
 29 |         self.setUp()
 30 | 
 31 |         try:
 32 |             self.logger.info("Test: SECAUDIT tool validation (security workflow architecture)")
 33 | 
 34 |             # Create test code with various security vulnerabilities
 35 |             self._create_test_code_for_audit()
 36 | 
 37 |             # Test 1: Single audit session with multiple steps
 38 |             if not self._test_single_audit_session():
 39 |                 return False
 40 | 
 41 |             # Test 2: Audit with specific focus areas
 42 |             if not self._test_focused_security_audit():
 43 |                 return False
 44 | 
 45 |             # Test 3: Complete audit with expert analysis using fast model
 46 |             if not self._test_complete_audit_with_analysis():
 47 |                 return False
 48 | 
 49 |             # Test 4: Certain confidence behavior
 50 |             if not self._test_certain_confidence():
 51 |                 return False
 52 | 
 53 |             # Test 5: Continuation test with chat tool
 54 |             if not self._test_continuation_with_chat():
 55 |                 return False
 56 | 
 57 |             # Test 6: Model selection control
 58 |             if not self._test_model_selection():
 59 |                 return False
 60 | 
 61 |             self.logger.info("  ✅ All secaudit validation tests passed")
 62 |             return True
 63 | 
 64 |         except Exception as e:
 65 |             self.logger.error(f"SECAUDIT validation test failed: {e}")
 66 |             return False
 67 | 
 68 |     def _create_test_code_for_audit(self):
 69 |         """Create test files with various security vulnerabilities"""
 70 |         # Create an authentication module with multiple security issues
 71 |         auth_code = """#!/usr/bin/env python3
 72 | import hashlib
 73 | import pickle
 74 | import sqlite3
 75 | from flask import request, session
 76 | 
 77 | class AuthenticationManager:
 78 |     def __init__(self, db_path="users.db"):
 79 |         # A01: Broken Access Control - No proper session management
 80 |         self.db_path = db_path
 81 |         self.sessions = {}  # In-memory session storage
 82 |     def login(self, username, password):
 83 |         '''User login with various security vulnerabilities'''
 84 |         # A03: Injection - SQL injection vulnerability
 85 |         conn = sqlite3.connect(self.db_path)
 86 |         cursor = conn.cursor()
 87 | 
 88 |         # Direct string interpolation in SQL query
 89 |         query = f"SELECT id, password_hash FROM users WHERE username = '{username}'"
 90 |         cursor.execute(query)
 91 | 
 92 |         user = cursor.fetchone()
 93 |         if not user:
 94 |             return {"status": "failed", "message": "User not found"}
 95 | 
 96 |         # A02: Cryptographic Failures - Weak hashing algorithm
 97 |         password_hash = hashlib.md5(password.encode()).hexdigest()
 98 | 
 99 |         if user[1] == password_hash:
100 |             # A07: Identification and Authentication Failures - Weak session generation
101 |             session_id = hashlib.md5(f"{username}{password}".encode()).hexdigest()
102 |             self.sessions[session_id] = {"user_id": user[0], "username": username}
103 | 
104 |             return {"status": "success", "session_id": session_id}
105 |         else:
106 |             return {"status": "failed", "message": "Invalid password"}
107 | 
108 |     def reset_password(self, email):
109 |         '''Password reset with security issues'''
110 |         # A04: Insecure Design - No rate limiting or validation
111 |         reset_token = hashlib.md5(email.encode()).hexdigest()
112 | 
113 |         # A09: Security Logging and Monitoring Failures - No security event logging
114 |         # Simply returns token without any verification or logging
115 |         return {"reset_token": reset_token, "url": f"/reset?token={reset_token}"}
116 | 
117 |     def deserialize_user_data(self, data):
118 |         '''Unsafe deserialization'''
119 |         # A08: Software and Data Integrity Failures - Insecure deserialization
120 |         return pickle.loads(data)
121 | 
122 |     def get_user_profile(self, user_id):
123 |         '''Get user profile with authorization issues'''
124 |         # A01: Broken Access Control - No authorization check
125 |         conn = sqlite3.connect(self.db_path)
126 |         cursor = conn.cursor()
127 | 
128 |         # Fetches any user profile without checking permissions
129 |         cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
130 |         return cursor.fetchone()
131 | """
132 | 
133 |         # Create authentication file
134 |         self.auth_file = self.create_additional_test_file("auth_manager.py", auth_code)
135 |         self.logger.info(f"  ✅ Created authentication file with security issues: {self.auth_file}")
136 | 
137 |         # Create API endpoint with additional vulnerabilities
138 |         api_code = """#!/usr/bin/env python3
139 | from flask import Flask, request, jsonify
140 | import os
141 | import subprocess
142 | import requests
143 | 
144 | app = Flask(__name__)
145 | 
146 | # A05: Security Misconfiguration - Debug mode enabled
147 | app.config['DEBUG'] = True
148 | app.config['SECRET_KEY'] = 'dev-secret-key'  # Hardcoded secret
149 | 
150 | @app.route('/api/search', methods=['GET'])
151 | def search():
152 |     '''Search endpoint with multiple vulnerabilities'''
153 |     # A03: Injection - XSS vulnerability, no input sanitization
154 |     query = request.args.get('q', '')
155 | 
156 |     # A03: Injection - Command injection vulnerability
157 |     if 'file:' in query:
158 |         filename = query.split('file:')[1]
159 |         # Direct command execution
160 |         result = subprocess.run(f"cat {filename}", shell=True, capture_output=True, text=True)
161 |         return jsonify({"result": result.stdout})
162 | 
163 |     # A10: Server-Side Request Forgery (SSRF)
164 |     if query.startswith('http'):
165 |         # No validation of URL, allows internal network access
166 |         response = requests.get(query)
167 |         return jsonify({"content": response.text})
168 | 
169 |     # Return search results without output encoding
170 |     return f"<h1>Search Results for: {query}</h1>"
171 | 
172 | @app.route('/api/admin', methods=['GET'])
173 | def admin_panel():
174 |     '''Admin panel with broken access control'''
175 |     # A01: Broken Access Control - No authentication check
176 |     # Anyone can access admin functionality
177 |     action = request.args.get('action')
178 | 
179 |     if action == 'delete_user':
180 |         user_id = request.args.get('user_id')
181 |         # Performs privileged action without authorization
182 |         return jsonify({"status": "User deleted", "user_id": user_id})
183 | 
184 |     return jsonify({"status": "Admin panel"})
185 | 
186 | @app.route('/api/upload', methods=['POST'])
187 | def upload_file():
188 |     '''File upload with security issues'''
189 |     # A05: Security Misconfiguration - No file type validation
190 |     file = request.files.get('file')
191 |     if file:
192 |         # Saves any file type to server
193 |         filename = file.filename
194 |         file.save(os.path.join('/tmp', filename))
195 | 
196 |         # A03: Path traversal vulnerability
197 |         return jsonify({"status": "File uploaded", "path": f"/tmp/{filename}"})
198 | 
199 |     return jsonify({"error": "No file provided"})
200 | 
201 | # A06: Vulnerable and Outdated Components
202 | # Using old Flask version with known vulnerabilities (hypothetical)
203 | # requirements.txt: Flask==0.12.2 (known security issues)
204 | 
205 | if __name__ == '__main__':
206 |     # A05: Security Misconfiguration - Running on all interfaces
207 |     app.run(host='0.0.0.0', port=5000, debug=True)
208 | """
209 | 
210 |         # Create API file
211 |         self.api_file = self.create_additional_test_file("api_endpoints.py", api_code)
212 |         self.logger.info(f"  ✅ Created API file with security vulnerabilities: {self.api_file}")
213 | 
214 |     def _test_single_audit_session(self) -> bool:
215 |         """Test a single security audit session with multiple steps"""
216 |         self.logger.info("  🔧 Testing single audit session...")
217 | 
218 |         try:
219 |             # Step 1: Initial security audit request
220 |             response, continuation_id = self.call_mcp_tool_direct(
221 |                 "secaudit",
222 |                 {
223 |                     "step": f"Begin security audit of authentication system in {self.auth_file}",
224 |                     "step_number": 1,
225 |                     "total_steps": 6,
226 |                     "next_step_required": True,
227 |                     "findings": "Starting security assessment",
228 |                     "relevant_files": [self.auth_file],
229 |                     "model": "gemini-2.0-flash-lite",
230 |                 },
231 |             )
232 | 
233 |             if not response:
234 |                 self.logger.error("Failed to call secaudit tool")
235 |                 return False
236 | 
237 |             # Parse and validate the response
238 |             try:
239 |                 response_data = json.loads(response) if response else {}
240 |             except json.JSONDecodeError:
241 |                 response_data = {}
242 | 
243 |             # Check if it's asking for investigation
244 |             status = response_data.get("status", "")
245 |             if status != "pause_for_secaudit":
246 |                 self.logger.error(f"Expected pause_for_secaudit status, got: {status}")
247 |                 return False
248 | 
249 |             # Step 2: Continue with findings
250 |             response2, _ = self.call_mcp_tool_direct(
251 |                 "secaudit",
252 |                 {
253 |                     "step": "Examined authentication module and found critical security vulnerabilities",
254 |                     "step_number": 2,
255 |                     "total_steps": 6,
256 |                     "next_step_required": True,
257 |                     "findings": (
258 |                         "Found multiple OWASP Top 10 vulnerabilities: "
259 |                         "1. SQL injection in login method (line 88) - direct string interpolation in query "
260 |                         "2. Weak MD5 hashing for passwords (line 96) - cryptographically broken "
261 |                         "3. Insecure session management (line 100) - predictable session IDs "
262 |                         "4. Unsafe deserialization (line 119) - pickle.loads without validation"
263 |                     ),
264 |                     "files_checked": [self.auth_file],
265 |                     "relevant_files": [self.auth_file],
266 |                     "relevant_context": ["AuthenticationManager.login", "AuthenticationManager.deserialize_user_data"],
267 |                     "issues_found": [
268 |                         {"severity": "critical", "description": "SQL injection vulnerability in login method"},
269 |                         {"severity": "high", "description": "Weak MD5 password hashing"},
270 |                         {"severity": "high", "description": "Insecure session management"},
271 |                         {"severity": "critical", "description": "Unsafe deserialization vulnerability"},
272 |                     ],
273 |                     "confidence": "medium",
274 |                     "continuation_id": continuation_id,
275 |                     "model": "gemini-2.0-flash-lite",
276 |                 },
277 |             )
278 | 
279 |             if not response2:
280 |                 self.logger.error("Failed to continue to step 2")
281 |                 return False
282 | 
283 |             self.logger.info("  ✅ Single audit session test passed")
284 |             return True
285 | 
286 |         except Exception as e:
287 |             self.logger.error(f"Single audit session test failed: {e}")
288 |             return False
289 | 
290 |     def _test_focused_security_audit(self) -> bool:
291 |         """Test security audit with specific focus areas"""
292 |         self.logger.info("  🔧 Testing focused security audit...")
293 | 
294 |         try:
295 |             # Request OWASP-focused audit
296 |             response, continuation_id = self.call_mcp_tool_direct(
297 |                 "secaudit",
298 |                 {
299 |                     "step": f"Begin OWASP-focused security audit of {self.api_file}",
300 |                     "step_number": 1,
301 |                     "total_steps": 4,
302 |                     "next_step_required": True,
303 |                     "findings": "Starting OWASP Top 10 focused security assessment",
304 |                     "relevant_files": [self.api_file],
305 |                     "security_scope": "Web API endpoints",
306 |                     "threat_level": "high",
307 |                     "audit_focus": "owasp",
308 |                     "model": "gemini-2.0-flash-lite",
309 |                 },
310 |             )
311 | 
312 |             if not response:
313 |                 self.logger.error("Failed to start OWASP-focused audit")
314 |                 return False
315 | 
316 |             # Verify the audit was configured correctly
317 |             try:
318 |                 response_data = json.loads(response)
319 |                 # The tool should acknowledge the OWASP focus
320 |                 if response_data.get("status") == "pause_for_secaudit":
321 |                     self.logger.info("  ✅ Focused security audit test passed")
322 |                     return True
323 |             except json.JSONDecodeError:
324 |                 pass
325 | 
326 |             self.logger.error("Expected proper OWASP-focused configuration")
327 |             return False
328 | 
329 |         except Exception as e:
330 |             self.logger.error(f"Focused security audit test failed: {e}")
331 |             return False
332 | 
333 |     def _test_complete_audit_with_analysis(self) -> bool:
334 |         """Test complete security audit with expert analysis"""
335 |         self.logger.info("  🔧 Testing complete audit with expert analysis...")
336 | 
337 |         try:
338 |             # Step 1: Start fresh audit
339 |             response1, continuation_id = self.call_mcp_tool_direct(
340 |                 "secaudit",
341 |                 {
342 |                     "step": f"Begin comprehensive security audit of {self.auth_file} and {self.api_file}",
343 |                     "step_number": 1,
344 |                     "total_steps": 3,
345 |                     "next_step_required": True,
346 |                     "findings": "Starting OWASP Top 10 security assessment of authentication and API modules",
347 |                     "relevant_files": [self.auth_file, self.api_file],
348 |                     "security_scope": "Web application with authentication and API endpoints",
349 |                     "model": "gemini-2.0-flash-lite",
350 |                 },
351 |             )
352 | 
353 |             if not response1:
354 |                 self.logger.error("Failed to start comprehensive audit")
355 |                 return False
356 | 
357 |             # Step 2: Continue with detailed findings
358 |             response2, _ = self.call_mcp_tool_direct(
359 |                 "secaudit",
360 |                 {
361 |                     "step": "Completed comprehensive security investigation of both modules",
362 |                     "step_number": 2,
363 |                     "total_steps": 3,
364 |                     "next_step_required": True,
365 |                     "findings": (
366 |                         "Found critical OWASP vulnerabilities across both modules: "
367 |                         "A01: Broken Access Control in admin panel, "
368 |                         "A03: SQL injection in login and command injection in search, "
369 |                         "A02: Weak cryptography with MD5 hashing, "
370 |                         "A05: Security misconfiguration with debug mode enabled, "
371 |                         "A07: Weak session management, "
372 |                         "A08: Insecure deserialization, "
373 |                         "A10: SSRF vulnerability in search endpoint"
374 |                     ),
375 |                     "files_checked": [self.auth_file, self.api_file],
376 |                     "relevant_files": [self.auth_file, self.api_file],
377 |                     "relevant_context": [
378 |                         "AuthenticationManager.login",
379 |                         "AuthenticationManager.deserialize_user_data",
380 |                         "api.search",
381 |                         "api.admin_panel",
382 |                     ],
383 |                     "issues_found": [
384 |                         {"severity": "critical", "description": "SQL injection in login method"},
385 |                         {"severity": "critical", "description": "Command injection in search endpoint"},
386 |                         {"severity": "critical", "description": "SSRF vulnerability allowing internal network access"},
387 |                         {"severity": "high", "description": "Broken access control on admin panel"},
388 |                         {"severity": "high", "description": "Insecure deserialization vulnerability"},
389 |                         {"severity": "high", "description": "XSS vulnerability in search results"},
390 |                         {"severity": "medium", "description": "Weak MD5 password hashing"},
391 |                         {"severity": "medium", "description": "Security misconfiguration - debug mode enabled"},
392 |                     ],
393 |                     "confidence": "high",
394 |                     "continuation_id": continuation_id,
395 |                     "model": "gemini-2.0-flash-lite",
396 |                 },
397 |             )
398 | 
399 |             # Final step - complete the audit (high confidence may trigger expert analysis)
400 |             response3, _ = self.call_mcp_tool_direct(
401 |                 "secaudit",
402 |                 {
403 |                     "step": "Complete security assessment with all vulnerabilities documented",
404 |                     "step_number": 3,
405 |                     "total_steps": 3,
406 |                     "next_step_required": False,
407 |                     "findings": "Security audit complete with 8 vulnerabilities identified across OWASP categories",
408 |                     "files_checked": [self.auth_file, self.api_file],
409 |                     "relevant_files": [self.auth_file, self.api_file],
410 |                     "confidence": "high",  # High confidence to trigger expert analysis
411 |                     "continuation_id": continuation_id,
412 |                     "model": "gemini-2.0-flash-lite",
413 |                 },
414 |             )
415 | 
416 |             if response3:
417 |                 # Check for expert analysis or completion status
418 |                 try:
419 |                     response_data = json.loads(response3)
420 |                     status = response_data.get("status", "")
421 |                     # Either expert analysis completed or security analysis complete
422 |                     if status in ["complete", "security_analysis_complete"]:
423 |                         self.logger.info("  ✅ Complete audit with expert analysis test passed")
424 |                         return True
425 |                 except json.JSONDecodeError:
426 |                     # If not JSON, check for security content (expert analysis output)
427 |                     if "security" in response3.lower() or "vulnerability" in response3.lower():
428 |                         self.logger.info("  ✅ Complete audit with expert analysis test passed")
429 |                         return True
430 | 
431 |             self.logger.error("Expected expert security analysis or completion")
432 |             return False
433 | 
434 |         except Exception as e:
435 |             self.logger.error(f"Complete audit with analysis test failed: {e}")
436 |             return False
437 | 
438 |     def _test_certain_confidence(self) -> bool:
439 |         """Test behavior when confidence is certain"""
440 |         self.logger.info("  🔧 Testing certain confidence behavior...")
441 | 
442 |         try:
443 |             # Request with certain confidence
444 |             response, _ = self.call_mcp_tool_direct(
445 |                 "secaudit",
446 |                 {
447 |                     "step": f"Security audit complete for {self.auth_file}",
448 |                     "step_number": 1,
449 |                     "total_steps": 1,
450 |                     "next_step_required": False,
451 |                     "findings": "Critical SQL injection vulnerability confirmed in login method",
452 |                     "files_checked": [self.auth_file],
453 |                     "relevant_files": [self.auth_file],
454 |                     "issues_found": [
455 |                         {"severity": "critical", "description": "SQL injection vulnerability in login method"}
456 |                     ],
457 |                     "confidence": "certain",
458 |                     "model": "gemini-2.0-flash-lite",
459 |                 },
460 |             )
461 | 
462 |             if not response:
463 |                 self.logger.error("Failed to execute certain confidence test")
464 |                 return False
465 | 
466 |             try:
467 |                 response_data = json.loads(response)
468 |                 # With certain confidence, should complete without expert analysis
469 |                 if response_data.get("status") == "security_analysis_complete":
470 |                     self.logger.info("  ✅ Certain confidence correctly completes without expert analysis")
471 |                     return True
472 |             except json.JSONDecodeError:
473 |                 pass
474 | 
475 |             # Check if findings are shown directly
476 |             response_lower = response.lower()
477 |             if "sql injection" in response_lower or "vulnerability" in response_lower:
478 |                 self.logger.info("  ✅ Certain confidence shows findings directly")
479 |                 return True
480 | 
481 |             self.logger.error("Expected completion or direct findings with certain confidence")
482 |             return False
483 | 
484 |         except Exception as e:
485 |             self.logger.error(f"Certain confidence test failed: {e}")
486 |             return False
487 | 
488 |     def _test_continuation_with_chat(self) -> bool:
489 |         """Test continuation functionality with chat tool"""
490 |         self.logger.info("  🔧 Testing continuation with chat tool...")
491 | 
492 |         try:
493 |             # First, run a security audit that generates a continuation_id
494 |             response1, continuation_id = self.call_mcp_tool_direct(
495 |                 "secaudit",
496 |                 {
497 |                     "step": f"Start analyzing {self.auth_file} for authentication vulnerabilities",
498 |                     "step_number": 1,
499 |                     "total_steps": 4,
500 |                     "next_step_required": True,
501 |                     "findings": "Beginning authentication security analysis",
502 |                     "relevant_files": [self.auth_file],
503 |                     "model": "gemini-2.0-flash-lite",
504 |                 },
505 |             )
506 | 
507 |             if not response1:
508 |                 self.logger.error("Failed to start audit for continuation test")
509 |                 return False
510 | 
511 |             # Extract continuation_id if present
512 |             if not continuation_id:
513 |                 self.logger.info("  ⚠️  No continuation_id returned, checking response metadata")
514 |                 try:
515 |                     response_data = json.loads(response1)
516 |                     # Look for thread_id in metadata
517 |                     metadata = response_data.get("metadata", {})
518 |                     continuation_id = metadata.get("thread_id")
519 |                 except json.JSONDecodeError:
520 |                     pass
521 | 
522 |             if continuation_id:
523 |                 # Now test using chat tool with continuation
524 |                 chat_response, _ = self.call_mcp_tool_direct(
525 |                     "chat",
526 |                     {
527 |                         "prompt": "Can you tell me more about the SQL injection vulnerability details found in the security audit?",
528 |                         "continuation_id": continuation_id,
529 |                         "model": "gemini-2.0-flash-lite",
530 |                     },
531 |                 )
532 | 
533 |                 if chat_response:
534 |                     self.logger.info("  ✅ Chat tool continuation test passed")
535 |                     return True
536 |             else:
537 |                 # Without continuation_id, just verify the audit step worked
538 |                 if response1:
539 |                     self.logger.info("  ✅ Audit step completed (continuation test limited)")
540 |                     return True
541 | 
542 |             self.logger.error("Expected successful continuation or audit step")
543 |             return False
544 | 
545 |         except Exception as e:
546 |             self.logger.error(f"Continuation test failed: {e}")
547 |             return False
548 | 
549 |     def _test_model_selection(self) -> bool:
550 |         """Test model selection and the skip-expert-analysis option"""
551 |         self.logger.info("  🔧 Testing model selection control...")
552 | 
553 |         try:
554 |             # Test 1: Explicit model selection
555 |             response1, _ = self.call_mcp_tool_direct(
556 |                 "secaudit",
557 |                 {
558 |                     "step": f"Analyze {self.api_file} for SSRF vulnerabilities",
559 |                     "step_number": 1,
560 |                     "total_steps": 2,
561 |                     "next_step_required": True,
562 |                     "findings": "Starting SSRF vulnerability analysis",
563 |                     "relevant_files": [self.api_file],
564 |                     "audit_focus": "owasp",
565 |                     "model": "gemini-2.0-flash-lite",
566 |                 },
567 |             )
568 | 
569 |             if response1:
570 |                 self.logger.info("  ✅ Model selection recognized")
571 | 
572 |             # Test 2: Skip expert analysis
573 |             response2, _ = self.call_mcp_tool_direct(
574 |                 "secaudit",
575 |                 {
576 |                     "step": f"Complete security investigation of {self.auth_file}",
577 |                     "step_number": 1,
578 |                     "total_steps": 1,
579 |                     "next_step_required": False,
580 |                     "findings": "Security issues documented",
581 |                     "files_checked": [self.auth_file],
582 |                     "relevant_files": [self.auth_file],
583 |                     "confidence": "high",
584 |                     "use_assistant_model": False,  # Skip expert analysis
585 |                     "model": "gemini-2.0-flash-lite",
586 |                 },
587 |             )
588 | 
589 |             if response2:
590 |                 try:
591 |                     response_data = json.loads(response2)
592 |                     # Should complete without expert analysis
593 |                     if response_data.get("status") == "security_analysis_complete":
594 |                         self.logger.info("  ✅ Skip expert analysis option works")
595 |                         return True
596 |                 except json.JSONDecodeError:
597 |                     pass
598 | 
599 |                 # Or might just complete the analysis
600 |                 response_lower = response2.lower()
601 |                 if "complete" in response_lower or "security" in response_lower:
602 |                     self.logger.info("  ✅ Analysis performed without expert model")
603 |                     return True
604 | 
605 |             self.logger.error("Expected model selection or skip behavior")
606 |             return False
607 | 
608 |         except Exception as e:
609 |             self.logger.error(f"Model selection test failed: {e}")
610 |             return False
611 | 
```
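
The tests above all drive secaudit with the same request shape: each step carries bookkeeping fields (`step_number`, `total_steps`, `next_step_required`) alongside findings and file lists, and follow-up steps thread state through `continuation_id`. The sketch below distills that protocol into a minimal two-step driver. It is illustrative only: `call_tool` is a hypothetical stand-in for the simulator's `call_mcp_tool_direct` helper, assumed to return a `(response_text, continuation_id)` pair, and the status strings are the ones asserted in the tests above.

```python
import json


def run_minimal_secaudit(call_tool, target_file: str) -> bool:
    """Illustrative two-step secaudit run; assumes call_tool(name, args)
    returns (response_text, continuation_id) like call_mcp_tool_direct."""
    # Step 1: open the audit; an intermediate step should pause for more investigation.
    response, continuation_id = call_tool(
        "secaudit",
        {
            "step": f"Begin security audit of {target_file}",
            "step_number": 1,
            "total_steps": 2,
            "next_step_required": True,
            "findings": "Starting security assessment",
            "relevant_files": [target_file],
        },
    )
    if not response or json.loads(response).get("status") != "pause_for_secaudit":
        return False

    # Step 2: close the audit on the same thread; next_step_required=False
    # signals the final step.
    response, _ = call_tool(
        "secaudit",
        {
            "step": "Document findings and complete the audit",
            "step_number": 2,
            "total_steps": 2,
            "next_step_required": False,
            "findings": "Audit complete with findings documented",
            "relevant_files": [target_file],
            "confidence": "high",
            "continuation_id": continuation_id,
        },
    )
    return bool(response) and json.loads(response).get("status") in (
        "complete",
        "security_analysis_complete",
    )
```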

--------------------------------------------------------------------------------
/tools/planner.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Interactive Sequential Planner - Break down complex tasks through step-by-step planning
  3 | 
  4 | This tool enables structured planning through an interactive, step-by-step process that builds
  5 | plans incrementally with the ability to revise, branch, and adapt as understanding deepens.
  6 | 
  7 | The planner guides users through sequential thinking with forced pauses between steps to ensure
  8 | thorough consideration of alternatives, dependencies, and strategic decisions before moving to
  9 | tactical implementation details.
 10 | 
 11 | Key features:
 12 | - Sequential planning with full context awareness
 13 | - Forced deep reflection for complex plans (≥5 steps) in early stages
 14 | - Branching capabilities for exploring alternative approaches
 15 | - Revision capabilities to update earlier decisions
 16 | - Dynamic step count adjustment as plans evolve
 17 | - Self-contained completion without external expert analysis
 18 | 
 19 | Perfect for: complex project planning, system design with unknowns, migration strategies,
 20 | architectural decisions, and breaking down large problems into manageable steps.
 21 | """
 22 | 
 23 | import logging
 24 | from typing import TYPE_CHECKING, Any
 25 | 
 26 | from pydantic import Field, field_validator
 27 | 
 28 | if TYPE_CHECKING:
 29 |     from tools.models import ToolModelCategory
 30 | 
 31 | from config import TEMPERATURE_BALANCED
 32 | from systemprompts import PLANNER_PROMPT
 33 | from tools.shared.base_models import WorkflowRequest
 34 | 
 35 | from .workflow.base import WorkflowTool
 36 | 
 37 | logger = logging.getLogger(__name__)
 38 | 
 39 | # Tool-specific field descriptions matching original planner tool
 40 | PLANNER_FIELD_DESCRIPTIONS = {
 41 |     "step": (
 42 |         "Planning content for this step. Step 1: describe the task, problem and scope. Later steps: capture updates, "
 43 |         "revisions, branches, or open questions that shape the plan."
 44 |     ),
 45 |     "step_number": "Current planning step number (starts at 1).",
 46 |     "total_steps": "Estimated number of planning steps; adjust as the plan evolves.",
 47 |     "next_step_required": "Set true when another planning step will follow after this one.",
 48 |     "is_step_revision": "Set true when you are replacing a previously recorded step.",
 49 |     "revises_step_number": "Step number being replaced when revising.",
 50 |     "is_branch_point": "True when this step creates a new branch to explore an alternative path.",
 51 |     "branch_from_step": "If branching, the step number that this branch starts from.",
 52 |     "branch_id": "Name for this branch (e.g. 'approach-A', 'migration-path').",
 53 |     "more_steps_needed": "True when you now expect to add additional steps beyond the prior estimate.",
 54 | }
 55 | 
 56 | 
 57 | class PlannerRequest(WorkflowRequest):
 58 |     """Request model for planner workflow tool matching original planner exactly"""
 59 | 
 60 |     # Required fields for each planning step
 61 |     step: str = Field(..., description=PLANNER_FIELD_DESCRIPTIONS["step"])
 62 |     step_number: int = Field(..., description=PLANNER_FIELD_DESCRIPTIONS["step_number"])
 63 |     total_steps: int = Field(..., description=PLANNER_FIELD_DESCRIPTIONS["total_steps"])
 64 |     next_step_required: bool = Field(..., description=PLANNER_FIELD_DESCRIPTIONS["next_step_required"])
 65 | 
 66 |     # Optional revision/branching fields (planning-specific)
 67 |     is_step_revision: bool | None = Field(False, description=PLANNER_FIELD_DESCRIPTIONS["is_step_revision"])
 68 |     revises_step_number: int | None = Field(None, description=PLANNER_FIELD_DESCRIPTIONS["revises_step_number"])
 69 |     is_branch_point: bool | None = Field(False, description=PLANNER_FIELD_DESCRIPTIONS["is_branch_point"])
 70 |     branch_from_step: int | None = Field(None, description=PLANNER_FIELD_DESCRIPTIONS["branch_from_step"])
 71 |     branch_id: str | None = Field(None, description=PLANNER_FIELD_DESCRIPTIONS["branch_id"])
 72 |     more_steps_needed: bool | None = Field(False, description=PLANNER_FIELD_DESCRIPTIONS["more_steps_needed"])
 73 | 
 74 |     # Exclude all investigation/analysis fields that aren't relevant to planning
 75 |     findings: str = Field(
 76 |         default="", exclude=True, description="Not used for planning - step content serves as findings"
 77 |     )
 78 |     files_checked: list[str] = Field(default_factory=list, exclude=True, description="Planning doesn't examine files")
 79 |     relevant_files: list[str] = Field(default_factory=list, exclude=True, description="Planning doesn't use files")
 80 |     relevant_context: list[str] = Field(
 81 |         default_factory=list, exclude=True, description="Planning doesn't track code context"
 82 |     )
 83 |     issues_found: list[dict] = Field(default_factory=list, exclude=True, description="Planning doesn't find issues")
 84 |     confidence: str = Field(default="planning", exclude=True, description="Planning uses different confidence model")
 85 |     hypothesis: str | None = Field(default=None, exclude=True, description="Planning doesn't use hypothesis")
 86 | 
 87 |     # Exclude other non-planning fields
 88 |     temperature: float | None = Field(default=None, exclude=True)
 89 |     thinking_mode: str | None = Field(default=None, exclude=True)
 90 |     use_assistant_model: bool | None = Field(default=False, exclude=True, description="Planning is self-contained")
 91 |     images: list | None = Field(default=None, exclude=True, description="Planning doesn't use images")
 92 | 
 93 |     @field_validator("step_number")
 94 |     @classmethod
 95 |     def validate_step_number(cls, v):
 96 |         if v < 1:
 97 |             raise ValueError("step_number must be at least 1")
 98 |         return v
 99 | 
100 |     @field_validator("total_steps")
101 |     @classmethod
102 |     def validate_total_steps(cls, v):
103 |         if v < 1:
104 |             raise ValueError("total_steps must be at least 1")
105 |         return v
106 | 
107 | 
108 | class PlannerTool(WorkflowTool):
109 |     """
110 |     Planner workflow tool for step-by-step planning using the workflow architecture.
111 | 
112 |     This tool provides the same planning capabilities as the original planner tool
113 |     but uses the new workflow architecture for consistency with other workflow tools.
114 |     It maintains all the original functionality including:
115 |     - Sequential step-by-step planning
116 |     - Branching and revision capabilities
117 |     - Deep thinking pauses for complex plans
118 |     - Conversation memory integration
119 |     - Self-contained operation (no expert analysis)
120 |     """
121 | 
122 |     def __init__(self):
123 |         super().__init__()
124 |         self.branches = {}
125 | 
126 |     def get_name(self) -> str:
127 |         return "planner"
128 | 
129 |     def get_description(self) -> str:
130 |         return (
131 |             "Breaks down complex tasks through interactive, sequential planning with revision and branching capabilities. "
132 |             "Use for complex project planning, system design, migration strategies, and architectural decisions. "
133 |             "Builds plans incrementally with deep reflection for complex scenarios."
134 |         )
135 | 
136 |     def get_system_prompt(self) -> str:
137 |         return PLANNER_PROMPT
138 | 
139 |     def get_default_temperature(self) -> float:
140 |         return TEMPERATURE_BALANCED
141 | 
142 |     def get_model_category(self) -> "ToolModelCategory":
143 |         """Planner requires deep analysis and reasoning"""
144 |         from tools.models import ToolModelCategory
145 | 
146 |         return ToolModelCategory.EXTENDED_REASONING
147 | 
148 |     def requires_model(self) -> bool:
149 |         """
150 |         Planner tool doesn't require model resolution at the MCP boundary.
151 | 
152 |         The planner is a pure data processing tool that organizes planning steps
153 |         and provides structured guidance without calling external AI models.
154 | 
155 |         Returns:
156 |             bool: False - planner doesn't need AI model access
157 |         """
158 |         return False
159 | 
160 |     def get_workflow_request_model(self):
161 |         """Return the planner-specific request model."""
162 |         return PlannerRequest
163 | 
164 |     def get_input_schema(self) -> dict[str, Any]:
165 |         """Generate input schema for planner workflow using override pattern."""
166 |         from .workflow.schema_builders import WorkflowSchemaBuilder
167 | 
168 |         # Planner tool-specific field definitions
169 |         planner_field_overrides = {
170 |             # Override standard workflow fields that need planning-specific descriptions
171 |             "step": {
172 |                 "type": "string",
173 |                 "description": PLANNER_FIELD_DESCRIPTIONS["step"],  # Very planning-specific instructions
174 |             },
175 |             # NEW planning-specific fields (not in base workflow)
176 |             "is_step_revision": {
177 |                 "type": "boolean",
178 |                 "description": PLANNER_FIELD_DESCRIPTIONS["is_step_revision"],
179 |             },
180 |             "revises_step_number": {
181 |                 "type": "integer",
182 |                 "minimum": 1,
183 |                 "description": PLANNER_FIELD_DESCRIPTIONS["revises_step_number"],
184 |             },
185 |             "is_branch_point": {
186 |                 "type": "boolean",
187 |                 "description": PLANNER_FIELD_DESCRIPTIONS["is_branch_point"],
188 |             },
189 |             "branch_from_step": {
190 |                 "type": "integer",
191 |                 "minimum": 1,
192 |                 "description": PLANNER_FIELD_DESCRIPTIONS["branch_from_step"],
193 |             },
194 |             "branch_id": {
195 |                 "type": "string",
196 |                 "description": PLANNER_FIELD_DESCRIPTIONS["branch_id"],
197 |             },
198 |             "more_steps_needed": {
199 |                 "type": "boolean",
200 |                 "description": PLANNER_FIELD_DESCRIPTIONS["more_steps_needed"],
201 |             },
202 |         }
203 | 
204 |         # Define excluded fields for planner workflow
205 |         excluded_workflow_fields = [
206 |             "findings",  # Planning uses step content instead
207 |             "files_checked",  # Planning doesn't examine files
208 |             "relevant_files",  # Planning doesn't use files
209 |             "relevant_context",  # Planning doesn't track code context
210 |             "issues_found",  # Planning doesn't find issues
211 |             "confidence",  # Planning uses different confidence model
212 |             "hypothesis",  # Planning doesn't use hypothesis
213 |         ]
214 | 
215 |         excluded_common_fields = [
216 |             "temperature",  # Planning doesn't need temperature control
217 |             "thinking_mode",  # Planning doesn't need thinking mode
218 |             "images",  # Planning doesn't use images
219 |             "absolute_file_paths",  # Planning doesn't use file attachments
220 |         ]
221 | 
222 |         # Build schema with proper field exclusion (following consensus pattern)
223 |         return WorkflowSchemaBuilder.build_schema(
224 |             tool_specific_fields=planner_field_overrides,
225 |             required_fields=[],  # No additional required fields beyond workflow defaults
226 |             model_field_schema=self.get_model_field_schema(),
227 |             auto_mode=self.is_effective_auto_mode(),
228 |             tool_name=self.get_name(),
229 |             excluded_workflow_fields=excluded_workflow_fields,
230 |             excluded_common_fields=excluded_common_fields,
231 |         )
232 | 
233 |     # ================================================================================
234 |     # Abstract Methods - Required Implementation from BaseWorkflowMixin
235 |     # ================================================================================
236 | 
237 |     def get_required_actions(
238 |         self, step_number: int, confidence: str, findings: str, total_steps: int, request=None
239 |     ) -> list[str]:
240 |         """Define required actions for each planning phase."""
241 |         if step_number == 1:
242 |             # Initial planning tasks
243 |             return [
244 |                 "Think deeply about the complete scope and complexity of what needs to be planned",
245 |                 "Consider multiple approaches and their trade-offs",
246 |                 "Identify key constraints, dependencies, and potential challenges",
247 |                 "Think about stakeholders, success criteria, and critical requirements",
248 |             ]
249 |         elif step_number <= 3 and total_steps >= 5:
250 |             # Complex plan early stages - force deep thinking
251 |             if step_number == 2:
252 |                 return [
253 |                     "Evaluate the approach from step 1 - are there better alternatives?",
254 |                     "Break down the major phases and identify critical decision points",
255 |                     "Consider resource requirements and potential bottlenecks",
256 |                     "Think about how different parts interconnect and affect each other",
257 |                 ]
258 |             else:  # step_number == 3
259 |                 return [
260 |                     "Validate that the emerging plan addresses the original requirements",
261 |                     "Identify any gaps or assumptions that need clarification",
262 |                     "Consider how to validate progress and adjust course if needed",
263 |                     "Think about what the first concrete steps should be",
264 |                 ]
265 |         else:
266 |             # Later steps or simple plans
267 |             return [
268 |                 "Continue developing the plan with concrete, actionable steps",
269 |                 "Consider implementation details and practical considerations",
270 |                 "Think about how to sequence and coordinate different activities",
271 |                 "Prepare for execution planning and resource allocation",
272 |             ]
273 | 
274 |     def should_call_expert_analysis(self, consolidated_findings, request=None) -> bool:
275 |         """Planner is self-contained and doesn't need expert analysis."""
276 |         return False
277 | 
278 |     def prepare_expert_analysis_context(self, consolidated_findings) -> str:
279 |         """Planner doesn't use expert analysis."""
280 |         return ""
281 | 
282 |     def requires_expert_analysis(self) -> bool:
283 |         """Planner is self-contained like the original planner tool."""
284 |         return False
285 | 
286 |     # ================================================================================
287 |     # Workflow Customization - Match Original Planner Behavior
288 |     # ================================================================================
289 | 
290 |     def prepare_step_data(self, request) -> dict:
291 |         """
292 |         Prepare step data from request with planner-specific fields.
293 |         """
294 |         step_data = {
295 |             "step": request.step,
296 |             "step_number": request.step_number,
297 |             "findings": f"Planning step {request.step_number}: {request.step}",  # Use step content as findings
298 |             "files_checked": [],  # Planner doesn't check files
299 |             "relevant_files": [],  # Planner doesn't use files
300 |             "relevant_context": [],  # Planner doesn't track context like debug
301 |             "issues_found": [],  # Planner doesn't track issues
302 |             "confidence": "planning",  # Planning confidence is different from investigation
303 |             "hypothesis": None,  # Planner doesn't use hypothesis
304 |             "images": [],  # Planner doesn't use images
305 |             # Planner-specific fields
306 |             "is_step_revision": request.is_step_revision or False,
307 |             "revises_step_number": request.revises_step_number,
308 |             "is_branch_point": request.is_branch_point or False,
309 |             "branch_from_step": request.branch_from_step,
310 |             "branch_id": request.branch_id,
311 |             "more_steps_needed": request.more_steps_needed or False,
312 |         }
313 |         return step_data
314 | 
315 |     def build_base_response(self, request, continuation_id: str = None) -> dict:
316 |         """
317 |         Build the base response structure with planner-specific fields.
318 |         """
319 |         # Use work_history from workflow mixin for consistent step tracking
320 |         # Add 1 to account for current step being processed
321 |         current_step_count = len(self.work_history) + 1
322 | 
323 |         response_data = {
324 |             "status": f"{self.get_name()}_in_progress",
325 |             "step_number": request.step_number,
326 |             "total_steps": request.total_steps,
327 |             "next_step_required": request.next_step_required,
328 |             "step_content": request.step,
329 |             f"{self.get_name()}_status": {
330 |                 "files_checked": len(self.consolidated_findings.files_checked),
331 |                 "relevant_files": len(self.consolidated_findings.relevant_files),
332 |                 "relevant_context": len(self.consolidated_findings.relevant_context),
333 |                 "issues_found": len(self.consolidated_findings.issues_found),
334 |                 "images_collected": len(self.consolidated_findings.images),
335 |                 "current_confidence": self.get_request_confidence(request),
336 |                 "step_history_length": current_step_count,  # Use work_history + current step
337 |             },
338 |             "metadata": {
339 |                 "branches": list(self.branches.keys()),
340 |                 "step_history_length": current_step_count,  # Use work_history + current step
341 |                 "is_step_revision": request.is_step_revision or False,
342 |                 "revises_step_number": request.revises_step_number,
343 |                 "is_branch_point": request.is_branch_point or False,
344 |                 "branch_from_step": request.branch_from_step,
345 |                 "branch_id": request.branch_id,
346 |                 "more_steps_needed": request.more_steps_needed or False,
347 |             },
348 |         }
349 | 
350 |         if continuation_id:
351 |             response_data["continuation_id"] = continuation_id
352 | 
353 |         return response_data
354 | 
355 |     def handle_work_continuation(self, response_data: dict, request) -> dict:
356 |         """
357 |         Handle work continuation with planner-specific deep thinking pauses.
358 |         """
359 |         response_data["status"] = f"pause_for_{self.get_name()}"
360 |         response_data[f"{self.get_name()}_required"] = True
361 | 
362 |         # Get planner-specific required actions
363 |         required_actions = self.get_required_actions(request.step_number, "planning", request.step, request.total_steps)
364 |         response_data["required_actions"] = required_actions
365 | 
366 |         # Enhanced deep thinking pauses for complex plans
367 |         if request.total_steps >= 5 and request.step_number <= 3:
368 |             response_data["status"] = "pause_for_deep_thinking"
369 |             response_data["thinking_required"] = True
370 |             response_data["required_thinking"] = required_actions
371 | 
372 |             if request.step_number == 1:
373 |                 response_data["next_steps"] = (
374 |                     f"MANDATORY: DO NOT call the {self.get_name()} tool again immediately. This is a complex plan ({request.total_steps} steps) "
375 |                     f"that requires deep thinking. You MUST first spend time reflecting on the planning challenge:\n\n"
376 |                     f"REQUIRED DEEP THINKING before calling {self.get_name()} step {request.step_number + 1}:\n"
377 |                     f"1. Analyze the FULL SCOPE: What exactly needs to be accomplished?\n"
378 |                     f"2. Consider MULTIPLE APPROACHES: What are 2-3 different ways to tackle this?\n"
379 |                     f"3. Identify CONSTRAINTS & DEPENDENCIES: What limits our options?\n"
380 |                     f"4. Think about SUCCESS CRITERIA: How will we know we've succeeded?\n"
381 |                     f"5. Consider RISKS & MITIGATION: What could go wrong early vs late?\n\n"
382 |                     f"Only call {self.get_name()} again with step_number: {request.step_number + 1} AFTER this deep analysis."
383 |                 )
384 |             elif request.step_number == 2:
385 |                 response_data["next_steps"] = (
386 |                     f"STOP! Complex planning requires reflection between steps. DO NOT call {self.get_name()} immediately.\n\n"
387 |                     f"MANDATORY REFLECTION before {self.get_name()} step {request.step_number + 1}:\n"
388 |                     f"1. EVALUATE YOUR APPROACH: Is the direction from step 1 still the best?\n"
389 |                     f"2. IDENTIFY MAJOR PHASES: What are the 3-5 main chunks of work?\n"
390 |                     f"3. SPOT DEPENDENCIES: What must happen before what?\n"
391 |                     f"4. CONSIDER RESOURCES: What skills, tools, or access do we need?\n"
392 |                     f"5. FIND CRITICAL PATHS: Where could delays hurt the most?\n\n"
393 |                     f"Think deeply about these aspects, then call {self.get_name()} with step_number: {request.step_number + 1}."
394 |                 )
395 |             elif request.step_number == 3:
396 |                 response_data["next_steps"] = (
397 |                     f"PAUSE for final strategic reflection. DO NOT call {self.get_name()} yet.\n\n"
398 |                     f"FINAL DEEP THINKING before {self.get_name()} step {request.step_number + 1}:\n"
399 |                     f"1. VALIDATE COMPLETENESS: Does this plan address all original requirements?\n"
400 |                     f"2. CHECK FOR GAPS: What assumptions need validation? What's unclear?\n"
401 |                     f"3. PLAN FOR ADAPTATION: How will we know if we need to change course?\n"
402 |                     f"4. DEFINE FIRST STEPS: What are the first 2-3 concrete actions?\n"
403 |                     f"5. TRANSITION MINDSET: Ready to shift from strategic to tactical planning?\n\n"
404 |                     f"After this reflection, call {self.get_name()} with step_number: {request.step_number + 1} to continue with tactical details."
405 |                 )
406 |         else:
407 |             # Normal flow for simple plans or later steps
408 |             remaining_steps = request.total_steps - request.step_number
409 |             response_data["next_steps"] = (
410 |                 f"Continue with step {request.step_number + 1}. Approximately {remaining_steps} steps remaining."
411 |             )
412 | 
413 |         return response_data
414 | 
415 |     def customize_workflow_response(self, response_data: dict, request) -> dict:
416 |         """
417 |         Customize response to match original planner tool format.
418 |         """
419 |         # No need to append to step_history since workflow mixin already manages work_history
420 |         # and we calculate step counts from work_history
421 | 
422 |         # Handle branching like original planner
423 |         if request.is_branch_point and request.branch_from_step and request.branch_id:
424 |             if request.branch_id not in self.branches:
425 |                 self.branches[request.branch_id] = []
426 |             step_data = self.prepare_step_data(request)
427 |             self.branches[request.branch_id].append(step_data)
428 | 
429 |         # Ensure metadata exists and preserve existing metadata from build_base_response
430 |         if "metadata" not in response_data:
431 |             response_data["metadata"] = {}
432 | 
433 |         # Store planner-specific metadata that should persist through workflow metadata addition
434 |         planner_metadata = {
435 |             "branches": list(self.branches.keys()),
436 |             "is_step_revision": request.is_step_revision or False,
437 |             "revises_step_number": request.revises_step_number,
438 |             "is_branch_point": request.is_branch_point or False,
439 |             "branch_from_step": request.branch_from_step,
440 |             "branch_id": request.branch_id,
441 |             "more_steps_needed": request.more_steps_needed or False,
442 |         }
443 | 
444 |         # Update metadata while preserving existing values
445 |         response_data["metadata"].update(planner_metadata)
446 | 
447 |         # Add planner-specific output instructions for final steps
448 |         if not request.next_step_required:
449 |             response_data["planning_complete"] = True
450 |             response_data["plan_summary"] = (
451 |                 f"COMPLETE PLAN: {request.step} (Total {request.total_steps} steps completed)"
452 |             )
453 |             response_data["output"] = {
454 |                 "instructions": "This is a structured planning response. Present the step_content as the main planning analysis. If next_step_required is true, continue with the next step. If planning_complete is true, present the complete plan in a well-structured format with clear sections, headings, numbered steps, and visual elements like ASCII charts for phases/dependencies. Use bullet points, sub-steps, sequences, and visual organization to make complex plans easy to understand and follow. IMPORTANT: Do NOT use emojis - use clear text formatting and ASCII characters only. Do NOT mention time estimates or costs unless explicitly requested.",
455 |                 "format": "step_by_step_planning",
456 |                 "presentation_guidelines": {
457 |                     "completed_plans": "Use clear headings, numbered phases, ASCII diagrams for workflows/dependencies, bullet points for sub-tasks, and visual sequences where helpful. No emojis. No time/cost estimates unless requested.",
458 |                     "step_content": "Present as main analysis with clear structure and actionable insights. No emojis. No time/cost estimates unless requested.",
459 |                     "continuation": "Use continuation_id for related planning sessions or implementation planning",
460 |                 },
461 |             }
462 |             response_data["next_steps"] = (
463 |                 "Planning complete. Present the complete plan to the user in a well-structured format with clear sections, "
464 |                 "numbered steps, visual elements (ASCII charts/diagrams where helpful), sub-step breakdowns, and implementation guidance. "
465 |                 "Use headings, bullet points, and visual organization to make the plan easy to follow. "
466 |                 "If there are phases, dependencies, or parallel tracks, show these relationships visually. "
467 |                 "IMPORTANT: Do NOT use emojis - use clear text formatting and ASCII characters only. "
468 |                 "Do NOT mention time estimates or costs unless explicitly requested. "
469 |                 "After presenting the plan, offer to either help implement specific parts or use the continuation_id to start related planning sessions."
470 |             )
471 | 
472 |         # Convert generic status names to planner-specific ones
473 |         tool_name = self.get_name()
474 |         status_mapping = {
475 |             f"{tool_name}_in_progress": "planning_in_progress",
476 |             f"pause_for_{tool_name}": "pause_for_planning",
477 |             f"{tool_name}_required": "planning_required",
478 |             f"{tool_name}_complete": "planning_complete",
479 |         }
480 | 
481 |         if response_data["status"] in status_mapping:
482 |             response_data["status"] = status_mapping[response_data["status"]]
483 | 
484 |         return response_data
485 | 
486 |     # ================================================================================
487 |     # Hook Method Overrides for Planner-Specific Behavior
488 |     # ================================================================================
489 | 
490 |     def get_completion_status(self) -> str:
491 |         """Planner uses planning-specific status."""
492 |         return "planning_complete"
493 | 
494 |     def get_completion_data_key(self) -> str:
495 |         """Planner uses 'complete_planning' key."""
496 |         return "complete_planning"
497 | 
498 |     def get_completion_message(self) -> str:
499 |         """Planner-specific completion message."""
500 |         return (
501 |             "Planning complete. Present the complete plan to the user in a well-structured format "
502 |             "and offer to help implement specific parts or start related planning sessions."
503 |         )
504 | 
505 |     def get_skip_reason(self) -> str:
506 |         """Planner-specific skip reason."""
507 |         return "Planner is self-contained and completes planning without external analysis"
508 | 
509 |     def get_skip_expert_analysis_status(self) -> str:
510 |         """Planner-specific expert analysis skip status."""
511 |         return "skipped_by_tool_design"
512 | 
513 |     def store_initial_issue(self, step_description: str):
514 |         """Store initial planning description."""
515 |         self.initial_planning_description = step_description
516 | 
517 |     def get_initial_request(self, fallback_step: str) -> str:
518 |         """Get initial planning description."""
519 |         try:
520 |             return self.initial_planning_description
521 |         except AttributeError:
522 |             return fallback_step
523 | 
524 |     # Required abstract methods from BaseTool
525 |     def get_request_model(self):
526 |         """Return the planner-specific request model."""
527 |         return PlannerRequest
528 | 
529 |     async def prepare_prompt(self, request) -> str:
530 |         """Not used - workflow tools use execute_workflow()."""
531 |         return ""  # Workflow tools use execute_workflow() directly
532 | 
```
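
Since `requires_model()` returns `False`, the planner is pure bookkeeping: each request is validated against `PlannerRequest` and echoed back with pacing guidance rather than sent to an AI model. As a minimal sketch of what the schema above accepts, the hypothetical payloads below exercise the branching and revision fields; the field names come from `PLANNER_FIELD_DESCRIPTIONS`, while the step content and branch name are invented for illustration, and `total_steps >= 5` is what arms the deep-thinking pauses in `handle_work_continuation`.

```python
from pydantic import ValidationError

from tools.planner import PlannerRequest

# Hypothetical step payloads; field names match PLANNER_FIELD_DESCRIPTIONS above.
step_one = {
    "step": "Plan the billing-service migration to the new message queue",
    "step_number": 1,
    "total_steps": 5,  # >= 5 total steps triggers the forced deep-thinking pauses
    "next_step_required": True,
}

branch_step = {
    # Fork an alternative approach from step 2 onto its own named branch.
    "step": "Explore a dual-write strategy instead of a hard cutover",
    "step_number": 3,
    "total_steps": 5,
    "next_step_required": True,
    "is_branch_point": True,
    "branch_from_step": 2,
    "branch_id": "dual-write",
}

revision_step = {
    # Replace the content previously recorded for step 2.
    "step": "Revise step 2: stage the rollout behind a feature flag",
    "step_number": 4,
    "total_steps": 5,
    "next_step_required": True,
    "is_step_revision": True,
    "revises_step_number": 2,
}

for payload in (step_one, branch_step, revision_step):
    try:
        PlannerRequest(**payload)  # each should validate against the model above
    except ValidationError as exc:
        print(exc)
```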

--------------------------------------------------------------------------------
/tools/debug.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Debug tool - Systematic root cause analysis and debugging assistance
  3 | 
  4 | This tool provides a structured workflow for investigating complex bugs and issues.
  5 | It guides you through systematic investigation steps with forced pauses between each step
  6 | to ensure thorough code examination before proceeding. The tool supports hypothesis evolution
  7 | and expert analysis integration for comprehensive debugging.
  8 | 
  9 | Key features:
 10 | - Step-by-step investigation workflow with progress tracking
 11 | - Context-aware file embedding (references during investigation, full content for analysis)
 12 | - Automatic conversation threading and history preservation
 13 | - Expert analysis integration with external models
 14 | - Support for visual debugging with image context
 15 | - Confidence-based workflow optimization
 16 | """
 17 | 
 18 | import logging
 19 | from typing import TYPE_CHECKING, Any, Optional
 20 | 
 21 | from pydantic import Field
 22 | 
 23 | if TYPE_CHECKING:
 24 |     from tools.models import ToolModelCategory
 25 | 
 26 | from config import TEMPERATURE_ANALYTICAL
 27 | from systemprompts import DEBUG_ISSUE_PROMPT
 28 | from tools.shared.base_models import WorkflowRequest
 29 | 
 30 | from .workflow.base import WorkflowTool
 31 | 
 32 | logger = logging.getLogger(__name__)
 33 | 
 34 | # Tool-specific field descriptions matching original debug tool
 35 | DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS = {
 36 |     "step": (
 37 |         "Investigation step. Step 1: State issue+direction. "
 38 |         "Symptoms misleading; 'no bug' valid. Trace dependencies, verify hypotheses. "
 39 |         "Use relevant_files for code; this for text only."
 40 |     ),
 41 |     "step_number": "Current step index (starts at 1). Build upon previous steps.",
 42 |     "total_steps": (
 43 |         "Estimated total steps needed to complete the investigation. Adjust as new findings emerge. "
 44 |         "IMPORTANT: When continuation_id is provided (continuing a previous conversation), set this to 1 as we're not starting a new multi-step investigation."
 45 |     ),
 46 |     "next_step_required": (
 47 |         "True if you plan to continue the investigation with another step. False means root cause is known or investigation is complete. "
 48 |         "IMPORTANT: When continuation_id is provided (continuing a previous conversation), set this to False to immediately proceed with expert analysis."
 49 |     ),
 50 |     "findings": (
 51 |         "Discoveries: clues, code/log evidence, disproven theories. Be specific. "
 52 |         "If no bug found, document clearly as valid."
 53 |     ),
 54 |     "files_checked": "All examined files (absolute paths), including ruled-out ones.",
 55 |     "relevant_files": "Files directly relevant to issue (absolute paths). Cause, trigger, or manifestation locations.",
 56 |     "relevant_context": "Methods/functions central to issue: 'Class.method' or 'function'. Focus on inputs/branching/state.",
 57 |     "hypothesis": (
 58 |         "Concrete root cause theory from evidence. Can revise. "
 59 |         "Valid: 'No bug found - user misunderstanding' or 'Symptoms unrelated to code' if supported."
 60 |     ),
 61 |     "confidence": (
 62 |         "Your confidence in the hypothesis: exploring (starting out), low (early idea), medium (some evidence), "
 63 |         "high (strong evidence), very_high (very strong evidence), almost_certain (nearly confirmed), "
 64 |         "certain (100% confidence - root cause and fix are both confirmed locally with no need for external validation). "
 65 |         "WARNING: Do NOT use 'certain' unless the issue can be fully resolved with a fix, use 'very_high' or 'almost_certain' instead when not 100% sure. "
 66 |         "Using 'certain' means you have ABSOLUTE confidence locally and PREVENTS external model validation."
 67 |     ),
 68 |     "images": "Optional screenshots/visuals clarifying issue (absolute paths).",
 69 | }
 70 | 
 71 | 
 72 | class DebugInvestigationRequest(WorkflowRequest):
 73 |     """Request model for debug investigation steps matching original debug tool exactly"""
 74 | 
 75 |     # Required fields for each investigation step
 76 |     step: str = Field(..., description=DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["step"])
 77 |     step_number: int = Field(..., description=DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["step_number"])
 78 |     total_steps: int = Field(..., description=DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["total_steps"])
 79 |     next_step_required: bool = Field(..., description=DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["next_step_required"])
 80 | 
 81 |     # Investigation tracking fields
 82 |     findings: str = Field(..., description=DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["findings"])
 83 |     files_checked: list[str] = Field(
 84 |         default_factory=list, description=DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["files_checked"]
 85 |     )
 86 |     relevant_files: list[str] = Field(
 87 |         default_factory=list, description=DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["relevant_files"]
 88 |     )
 89 |     relevant_context: list[str] = Field(
 90 |         default_factory=list, description=DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["relevant_context"]
 91 |     )
 92 |     hypothesis: Optional[str] = Field(None, description=DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["hypothesis"])
 93 |     confidence: Optional[str] = Field("low", description=DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["confidence"])
 94 | 
 95 |     # Optional images for visual debugging
 96 |     images: Optional[list[str]] = Field(default=None, description=DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["images"])
 97 | 
 98 |     # Override inherited fields to exclude them from schema (except model which needs to be available)
 99 |     temperature: Optional[float] = Field(default=None, exclude=True)
100 |     thinking_mode: Optional[str] = Field(default=None, exclude=True)
101 | 
102 | 
103 | class DebugIssueTool(WorkflowTool):
104 |     """
105 |     Debug tool for systematic root cause analysis and issue investigation.
106 | 
107 |     This tool implements a structured debugging workflow that guides users through
108 |     methodical investigation steps, ensuring thorough code examination and evidence
109 |     gathering before reaching conclusions. It supports complex debugging scenarios
110 |     including race conditions, memory leaks, performance issues, and integration problems.
111 |     """
112 | 
113 |     def __init__(self):
114 |         super().__init__()
115 |         self.initial_issue = None
116 | 
117 |     def get_name(self) -> str:
118 |         return "debug"
119 | 
120 |     def get_description(self) -> str:
121 |         return (
122 |             "Performs systematic debugging and root cause analysis for any type of issue. "
123 |             "Use for complex bugs, mysterious errors, performance issues, race conditions, memory leaks, and integration problems. "
124 |             "Guides through structured investigation with hypothesis testing and expert analysis."
125 |         )
126 | 
127 |     def get_system_prompt(self) -> str:
128 |         return DEBUG_ISSUE_PROMPT
129 | 
130 |     def get_default_temperature(self) -> float:
131 |         return TEMPERATURE_ANALYTICAL
132 | 
133 |     def get_model_category(self) -> "ToolModelCategory":
134 |         """Debug requires deep analysis and reasoning"""
135 |         from tools.models import ToolModelCategory
136 | 
137 |         return ToolModelCategory.EXTENDED_REASONING
138 | 
139 |     def get_workflow_request_model(self):
140 |         """Return the debug-specific request model."""
141 |         return DebugInvestigationRequest
142 | 
143 |     def get_input_schema(self) -> dict[str, Any]:
144 |         """Generate input schema using WorkflowSchemaBuilder with debug-specific overrides."""
145 |         from .workflow.schema_builders import WorkflowSchemaBuilder
146 | 
147 |         # Debug-specific field overrides
148 |         debug_field_overrides = {
149 |             "step": {
150 |                 "type": "string",
151 |                 "description": DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["step"],
152 |             },
153 |             "step_number": {
154 |                 "type": "integer",
155 |                 "minimum": 1,
156 |                 "description": DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["step_number"],
157 |             },
158 |             "total_steps": {
159 |                 "type": "integer",
160 |                 "minimum": 1,
161 |                 "description": DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["total_steps"],
162 |             },
163 |             "next_step_required": {
164 |                 "type": "boolean",
165 |                 "description": DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["next_step_required"],
166 |             },
167 |             "findings": {
168 |                 "type": "string",
169 |                 "description": DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["findings"],
170 |             },
171 |             "files_checked": {
172 |                 "type": "array",
173 |                 "items": {"type": "string"},
174 |                 "description": DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["files_checked"],
175 |             },
176 |             "relevant_files": {
177 |                 "type": "array",
178 |                 "items": {"type": "string"},
179 |                 "description": DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["relevant_files"],
180 |             },
181 |             "confidence": {
182 |                 "type": "string",
183 |                 "enum": ["exploring", "low", "medium", "high", "very_high", "almost_certain", "certain"],
184 |                 "description": DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["confidence"],
185 |             },
186 |             "hypothesis": {
187 |                 "type": "string",
188 |                 "description": DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["hypothesis"],
189 |             },
190 |             "images": {
191 |                 "type": "array",
192 |                 "items": {"type": "string"},
193 |                 "description": DEBUG_INVESTIGATION_FIELD_DESCRIPTIONS["images"],
194 |             },
195 |         }
196 | 
197 |         # Use WorkflowSchemaBuilder with debug-specific tool fields
198 |         return WorkflowSchemaBuilder.build_schema(
199 |             tool_specific_fields=debug_field_overrides,
200 |             model_field_schema=self.get_model_field_schema(),
201 |             auto_mode=self.is_effective_auto_mode(),
202 |             tool_name=self.get_name(),
203 |         )
204 | 
205 |     def get_required_actions(
206 |         self, step_number: int, confidence: str, findings: str, total_steps: int, request=None
207 |     ) -> list[str]:
208 |         """Define required actions for each investigation phase."""
209 |         if step_number == 1:
210 |             # Initial investigation tasks
211 |             return [
212 |                 "Search for code related to the reported issue or symptoms",
213 |                 "Examine relevant files and understand the current implementation",
214 |                 "Understand the project structure and locate relevant modules",
215 |                 "Identify how the affected functionality is supposed to work",
216 |             ]
217 |         elif confidence in ["exploring", "low"]:
218 |             # Need deeper investigation
219 |             return [
220 |                 "Examine the specific files you've identified as relevant",
221 |                 "Trace method calls and data flow through the system",
222 |                 "Check for edge cases, boundary conditions, and assumptions in the code",
223 |                 "Look for related configuration, dependencies, or external factors",
224 |             ]
225 |         elif confidence in ["medium", "high", "very_high"]:
226 |             # Close to root cause - need confirmation
227 |             return [
228 |                 "Examine the exact code sections where you believe the issue occurs",
229 |                 "Trace the execution path that leads to the failure",
230 |                 "Verify your hypothesis with concrete code evidence",
231 |                 "Check for any similar patterns elsewhere in the codebase",
232 |             ]
233 |         elif confidence == "almost_certain":
234 |             # Almost certain - final verification before conclusion
235 |             return [
236 |                 "Finalize your root cause analysis with specific evidence",
237 |                 "Document the complete chain of causation from symptom to root cause",
238 |                 "Verify the minimal fix approach is correct",
239 |                 "Consider if expert analysis would provide additional insights",
240 |             ]
241 |         else:
242 |             # General investigation needed
243 |             return [
244 |                 "Continue examining the code paths identified in your hypothesis",
245 |                 "Gather more evidence using appropriate investigation tools",
246 |                 "Test edge cases and boundary conditions",
247 |                 "Look for patterns that confirm or refute your theory",
248 |             ]
249 | 
250 |     def should_call_expert_analysis(self, consolidated_findings, request=None) -> bool:
251 |         """
252 |         Decide when to call external model based on investigation completeness.
253 | 
254 |         Don't call expert analysis if the CLI agent has certain confidence - trust its judgment.
255 |         """
256 |         # Check if user requested to skip assistant model
257 |         if request and not self.get_request_use_assistant_model(request):
258 |             return False
259 | 
260 |         # Check if we have meaningful investigation data
261 |         return (
262 |             len(consolidated_findings.relevant_files) > 0
263 |             or len(consolidated_findings.findings) >= 2
264 |             or len(consolidated_findings.issues_found) > 0
265 |         )
266 | 
267 |     def prepare_expert_analysis_context(self, consolidated_findings) -> str:
268 |         """Prepare context for external model call matching original debug tool format."""
269 |         context_parts = [
270 |             f"=== ISSUE DESCRIPTION ===\n{self.initial_issue or 'Investigation initiated'}\n=== END DESCRIPTION ==="
271 |         ]
272 | 
273 |         # Add special note if confidence is almost_certain
274 |         if consolidated_findings.confidence == "almost_certain":
275 |             context_parts.append(
276 |                 "\n=== IMPORTANT: ALMOST CERTAIN CONFIDENCE ===\n"
277 |                 "The agent has reached 'almost_certain' confidence but has NOT confirmed the bug with 100% certainty. "
278 |                 "Your role is to:\n"
279 |                 "1. Validate the agent's hypothesis and investigation\n"
280 |                 "2. Identify any missing evidence or overlooked aspects\n"
281 |                 "3. Provide additional insights that could confirm or refute the hypothesis\n"
282 |                 "4. Help finalize the root cause analysis with complete certainty\n"
283 |                 "=== END IMPORTANT ==="
284 |             )
285 | 
286 |         # Add investigation summary
287 |         investigation_summary = self._build_investigation_summary(consolidated_findings)
288 |         context_parts.append(f"\n=== AGENT'S INVESTIGATION FINDINGS ===\n{investigation_summary}\n=== END FINDINGS ===")
289 | 
290 |         # Add error context if available
291 |         error_context = self._extract_error_context(consolidated_findings)
292 |         if error_context:
293 |             context_parts.append(f"\n=== ERROR CONTEXT/STACK TRACE ===\n{error_context}\n=== END CONTEXT ===")
294 | 
295 |         # Add relevant methods/functions if available
296 |         if consolidated_findings.relevant_context:
297 |             methods_text = "\n".join(f"- {method}" for method in consolidated_findings.relevant_context)
298 |             context_parts.append(f"\n=== RELEVANT METHODS/FUNCTIONS ===\n{methods_text}\n=== END METHODS ===")
299 | 
300 |         # Add hypothesis evolution if available
301 |         if consolidated_findings.hypotheses:
302 |             hypotheses_text = "\n".join(
303 |                 f"Step {h['step']} ({h['confidence']} confidence): {h['hypothesis']}"
304 |                 for h in consolidated_findings.hypotheses
305 |             )
306 |             context_parts.append(f"\n=== HYPOTHESIS EVOLUTION ===\n{hypotheses_text}\n=== END HYPOTHESES ===")
307 | 
308 |         # Add images if available
309 |         if consolidated_findings.images:
310 |             images_text = "\n".join(f"- {img}" for img in consolidated_findings.images)
311 |             context_parts.append(
312 |                 f"\n=== VISUAL DEBUGGING INFORMATION ===\n{images_text}\n=== END VISUAL INFORMATION ==="
313 |             )
314 | 
315 |         # Add file content if we have relevant files
316 |         if consolidated_findings.relevant_files:
317 |             file_content, _ = self._prepare_file_content_for_prompt(
318 |                 list(consolidated_findings.relevant_files), None, "Essential debugging files"
319 |             )
320 |             if file_content:
321 |                 context_parts.append(
322 |                     f"\n=== ESSENTIAL FILES FOR DEBUGGING ===\n{file_content}\n=== END ESSENTIAL FILES ==="
323 |                 )
324 | 
325 |         return "\n".join(context_parts)
326 | 
327 |     def _build_investigation_summary(self, consolidated_findings) -> str:
328 |         """Prepare a comprehensive summary of the investigation."""
329 |         summary_parts = [
330 |             "=== SYSTEMATIC INVESTIGATION SUMMARY ===",
331 |             f"Total steps: {len(consolidated_findings.findings)}",
332 |             f"Files examined: {len(consolidated_findings.files_checked)}",
333 |             f"Relevant files identified: {len(consolidated_findings.relevant_files)}",
334 |             f"Methods/functions involved: {len(consolidated_findings.relevant_context)}",
335 |             "",
336 |             "=== INVESTIGATION PROGRESSION ===",
337 |         ]
338 | 
339 |         for finding in consolidated_findings.findings:
340 |             summary_parts.append(finding)
341 | 
342 |         return "\n".join(summary_parts)
343 | 
344 |     def _extract_error_context(self, consolidated_findings) -> Optional[str]:
345 |         """Extract error context from investigation findings."""
346 |         error_patterns = ["error", "exception", "stack trace", "traceback", "failure"]
347 |         error_context_parts = []
348 | 
349 |         for finding in consolidated_findings.findings:
350 |             if any(pattern in finding.lower() for pattern in error_patterns):
351 |                 error_context_parts.append(finding)
352 | 
353 |         return "\n".join(error_context_parts) if error_context_parts else None
354 | 
355 |     def get_step_guidance(self, step_number: int, confidence: str, request) -> dict[str, Any]:
356 |         """
357 |         Provide step-specific guidance matching original debug tool behavior.
358 | 
359 |         This method generates debug-specific guidance that's used by get_step_guidance_message().
360 |         """
361 |         # Generate the next steps instruction based on required actions
362 |         required_actions = self.get_required_actions(step_number, confidence, request.findings, request.total_steps)
363 | 
364 |         if step_number == 1:
365 |             next_steps = (
366 |                 f"MANDATORY: DO NOT call the {self.get_name()} tool again immediately. You MUST first investigate "
367 |                 f"the codebase using appropriate tools. CRITICAL AWARENESS: The reported symptoms might be "
368 |                 f"caused by issues elsewhere in the code, not where symptoms appear. Also, after thorough "
369 |                 f"investigation, it's possible NO BUG EXISTS - the issue might be a misunderstanding or "
370 |                 f"user expectation mismatch. Search broadly, examine implementations, understand the logic flow. "
371 |                 f"Only call {self.get_name()} again AFTER gathering concrete evidence. When you call "
372 |                 f"{self.get_name()} next time, "
373 |                 f"use step_number: {step_number + 1} and report specific files examined and findings discovered."
374 |             )
375 |         elif confidence in ["exploring", "low"]:
376 |             next_steps = (
377 |                 f"STOP! Do NOT call {self.get_name()} again yet. Based on your findings, you've identified potential areas "
378 |                 f"but need concrete evidence. MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\n"
379 |                 + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
380 |                 + f"\n\nOnly call {self.get_name()} again with step_number: {step_number + 1} AFTER "
381 |                 + "completing these investigations."
382 |             )
383 |         elif confidence in ["medium", "high", "very_high"]:
384 |             next_steps = (
385 |                 f"WAIT! Your hypothesis needs verification. DO NOT call {self.get_name()} immediately. REQUIRED ACTIONS:\n"
386 |                 + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
387 |                 + f"\n\nREMEMBER: If you cannot find concrete evidence of a bug causing the reported symptoms, "
388 |                 f"'no bug found' is a valid conclusion. Consider suggesting discussion with your thought partner "
389 |                 f"or engineering assistant for clarification. Document findings with specific file:line references, "
390 |                 f"then call {self.get_name()} with step_number: {step_number + 1}."
391 |             )
392 |         elif confidence == "almost_certain":
393 |             next_steps = (
394 |                 "ALMOST CERTAIN - Prepare for final analysis. REQUIRED ACTIONS:\n"
395 |                 + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
396 |                 + "\n\nIMPORTANT: You're almost certain about the root cause. If you have NOT found the bug with "
397 |                 "100% certainty, consider setting next_step_required=false to invoke expert analysis. The expert "
398 |                 "can validate your hypotheses and provide additional insights. If you ARE 100% certain and have "
399 |                 "identified the exact bug and fix, proceed to confidence='certain'. Otherwise, let expert analysis "
400 |                 "help finalize the investigation."
401 |             )
402 |         else:
403 |             next_steps = (
404 |                 f"PAUSE INVESTIGATION. Before calling {self.get_name()} step {step_number + 1}, you MUST examine code. "
405 |                 + "Required: "
406 |                 + ", ".join(required_actions[:2])
407 |                 + ". "
408 |                 + f"Your next {self.get_name()} call (step_number: {step_number + 1}) must include "
409 |                 f"NEW evidence from actual code examination, not just theories. If no bug evidence "
410 |                 f"is found, suggesting "
411 |                 f"collaboration with your thought partner is valuable. NO recursive {self.get_name()} calls "
412 |                 f"without investigation work!"
413 |             )
414 | 
415 |         return {"next_steps": next_steps}
416 | 
417 |     # Hook method overrides for debug-specific behavior
418 | 
419 |     def prepare_step_data(self, request) -> dict:
420 |         """
421 |         Prepare debug-specific step data for processing.
422 |         """
423 |         step_data = {
424 |             "step": request.step,
425 |             "step_number": request.step_number,
426 |             "findings": request.findings,
427 |             "files_checked": request.files_checked,
428 |             "relevant_files": request.relevant_files,
429 |             "relevant_context": request.relevant_context,
430 |             "issues_found": [],  # Debug tool doesn't use issues_found field
431 |             "confidence": request.confidence,
432 |             "hypothesis": request.hypothesis,
433 |             "images": request.images or [],
434 |         }
435 |         return step_data
436 | 
437 |     def should_skip_expert_analysis(self, request, consolidated_findings) -> bool:
438 |         """
439 |         Debug tool skips expert analysis when the agent has "certain" confidence.
440 |         """
441 |         return request.confidence == "certain" and not request.next_step_required
442 | 
443 |     # Override inheritance hooks for debug-specific behavior
444 | 
445 |     def get_completion_status(self) -> str:
446 |         """Debug tools use debug-specific status."""
447 |         return "certain_confidence_proceed_with_fix"
448 | 
449 |     def get_completion_data_key(self) -> str:
450 |         """Debug uses 'complete_investigation' key."""
451 |         return "complete_investigation"
452 | 
453 |     def get_final_analysis_from_request(self, request):
454 |         """Debug tools use 'hypothesis' field."""
455 |         return request.hypothesis
456 | 
457 |     def get_confidence_level(self, request) -> str:
458 |         """Debug tools use 'certain' for high confidence."""
459 |         return "certain"
460 | 
461 |     def get_completion_message(self) -> str:
462 |         """Debug-specific completion message."""
463 |         return (
464 |             "Investigation complete with CERTAIN confidence. You have identified the exact "
465 |             "root cause and a minimal fix. MANDATORY: Present the user with the root cause analysis "
466 |             "and IMMEDIATELY proceed with implementing the simple fix without requiring further "
467 |             "consultation. Focus on the precise, minimal change needed."
468 |         )
469 | 
470 |     def get_skip_reason(self) -> str:
471 |         """Debug-specific skip reason."""
472 |         return "Identified exact root cause with minimal fix requirement locally"
473 | 
474 |     def get_request_relevant_context(self, request) -> list:
475 |         """Get relevant_context for debug tool."""
476 |         try:
477 |             return request.relevant_context or []
478 |         except AttributeError:
479 |             return []
480 | 
481 |     def get_skip_expert_analysis_status(self) -> str:
482 |         """Debug-specific expert analysis skip status."""
483 |         return "skipped_due_to_certain_confidence"
484 | 
485 |     def prepare_work_summary(self) -> str:
486 |         """Debug-specific work summary."""
487 |         return self._build_investigation_summary(self.consolidated_findings)
488 | 
489 |     def get_completion_next_steps_message(self, expert_analysis_used: bool = False) -> str:
490 |         """
491 |         Debug-specific completion message.
492 | 
493 |         Args:
494 |             expert_analysis_used: True if expert analysis was successfully executed
495 |         """
496 |         base_message = (
497 |             "INVESTIGATION IS COMPLETE. YOU MUST now summarize and present ALL key findings, confirmed "
498 |             "hypotheses, and exact recommended fixes. Clearly identify the most likely root cause and "
499 |             "provide concrete, actionable implementation guidance. Highlight affected code paths and display "
500 |             "reasoning that led to this conclusion—make it easy for a developer to understand exactly where "
501 |             "the problem lies. Where necessary, show cause-and-effect / bug-trace call graph."
502 |         )
503 | 
504 |         # Add expert analysis guidance only when expert analysis was actually used
505 |         if expert_analysis_used:
506 |             expert_guidance = self.get_expert_analysis_guidance()
507 |             if expert_guidance:
508 |                 return f"{base_message}\n\n{expert_guidance}"
509 | 
510 |         return base_message
511 | 
512 |     def get_expert_analysis_guidance(self) -> str:
513 |         """
514 |         Get additional guidance for handling expert analysis results in debug context.
515 | 
516 |         Returns:
517 |             Additional guidance text for validating and using expert analysis findings
518 |         """
519 |         return (
520 |             "IMPORTANT: Expert debugging analysis has been provided above. You MUST validate "
521 |             "the expert's root cause analysis and proposed fixes against your own investigation. "
522 |             "Ensure the expert's findings align with the evidence you've gathered and that the "
523 |             "recommended solutions address the actual problem, not just symptoms. If the expert "
524 |             "suggests a different root cause than you identified, carefully consider both perspectives "
525 |             "and present a balanced assessment to the user."
526 |         )
527 | 
528 |     def get_step_guidance_message(self, request) -> str:
529 |         """
530 |         Debug-specific step guidance with detailed investigation instructions.
531 |         """
532 |         step_guidance = self.get_step_guidance(request.step_number, request.confidence, request)
533 |         return step_guidance["next_steps"]
534 | 
535 |     def customize_workflow_response(self, response_data: dict, request) -> dict:
536 |         """
537 |         Customize response to match original debug tool format.
538 |         """
539 |         # Store initial issue on first step
540 |         if request.step_number == 1:
541 |             self.initial_issue = request.step
542 | 
543 |         # Convert generic status names to debug-specific ones
544 |         tool_name = self.get_name()
545 |         status_mapping = {
546 |             f"{tool_name}_in_progress": "investigation_in_progress",
547 |             f"pause_for_{tool_name}": "pause_for_investigation",
548 |             f"{tool_name}_required": "investigation_required",
549 |             f"{tool_name}_complete": "investigation_complete",
550 |         }
551 | 
552 |         if response_data["status"] in status_mapping:
553 |             response_data["status"] = status_mapping[response_data["status"]]
554 | 
555 |         # Rename status field to match debug tool
556 |         if f"{tool_name}_status" in response_data:
557 |             response_data["investigation_status"] = response_data.pop(f"{tool_name}_status")
558 |             # Add debug-specific status fields
559 |             response_data["investigation_status"]["hypotheses_formed"] = len(self.consolidated_findings.hypotheses)
560 | 
561 |         # Rename complete investigation data
562 |         if f"complete_{tool_name}" in response_data:
563 |             response_data["complete_investigation"] = response_data.pop(f"complete_{tool_name}")
564 | 
565 |         # Map the completion flag to match original debug tool
566 |         if f"{tool_name}_complete" in response_data:
567 |             response_data["investigation_complete"] = response_data.pop(f"{tool_name}_complete")
568 | 
569 |         # Map the required flag to match original debug tool
570 |         if f"{tool_name}_required" in response_data:
571 |             response_data["investigation_required"] = response_data.pop(f"{tool_name}_required")
572 | 
573 |         return response_data
574 | 
575 |     # Required abstract methods from BaseTool
576 |     def get_request_model(self):
577 |         """Return the debug-specific request model."""
578 |         return DebugInvestigationRequest
579 | 
580 |     async def prepare_prompt(self, request) -> str:
581 |         """Not used - workflow tools use execute_workflow()."""
582 |         return ""  # Workflow tools use execute_workflow() directly
583 | 
```
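
To make the gate in `should_call_expert_analysis` concrete, here is a minimal, hypothetical sketch (not part of the repository): it mimics the tool's decision with a stand-in findings object, whose shape and file path are assumptions for illustration only.

```python
# Illustrative stand-in for consolidated_findings; the real tool passes its
# accumulated investigation state, not a SimpleNamespace.
from types import SimpleNamespace

findings = SimpleNamespace(
    relevant_files={"/abs/path/service.py"},  # hypothetical path tied to the suspected root cause
    findings=["Step 1: traced the crash to a missing None check"],
    issues_found=[],
)

# Mirrors the gating expression above: any relevant file, two or more
# recorded findings, or a logged issue justifies an expert pass.
needs_expert = (
    len(findings.relevant_files) > 0
    or len(findings.findings) >= 2
    or len(findings.issues_found) > 0
)
print(needs_expert)  # True: one relevant file is enough
```

Note also that a `confidence` of "certain" combined with `next_step_required=False` bypasses this gate entirely via `should_skip_expert_analysis`.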

--------------------------------------------------------------------------------
/tools/analyze.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | AnalyzeWorkflow tool - Step-by-step code analysis with systematic investigation
  3 | 
  4 | This tool provides a structured workflow for comprehensive code and file analysis.
  5 | It guides the CLI agent through systematic investigation steps with forced pauses between each step
  6 | to ensure thorough code examination, pattern identification, and architectural assessment before proceeding.
  7 | The tool supports complex analysis scenarios including architectural review, performance analysis,
  8 | security assessment, and maintainability evaluation.
  9 | 
 10 | Key features:
 11 | - Step-by-step analysis workflow with progress tracking
 12 | - Context-aware file embedding (references during investigation, full content for analysis)
 13 | - Automatic pattern and insight tracking with categorization
 14 | - Expert analysis integration with external models
 15 | - Support for focused analysis (architecture, performance, security, quality)
 16 | - Confidence-based workflow optimization
 17 | """
 18 | 
 19 | import logging
 20 | from typing import TYPE_CHECKING, Any, Literal, Optional
 21 | 
 22 | from pydantic import Field, model_validator
 23 | 
 24 | if TYPE_CHECKING:
 25 |     from tools.models import ToolModelCategory
 26 | 
 27 | from config import TEMPERATURE_ANALYTICAL
 28 | from systemprompts import ANALYZE_PROMPT
 29 | from tools.shared.base_models import WorkflowRequest
 30 | 
 31 | from .workflow.base import WorkflowTool
 32 | 
 33 | logger = logging.getLogger(__name__)
 34 | 
 35 | # Tool-specific field descriptions for analyze workflow
 36 | ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS = {
 37 |     "step": (
 38 |         "The analysis plan. Step 1: State your strategy, including how you will map the codebase structure, "
 39 |         "understand business logic, and assess code quality, performance implications, and architectural patterns. "
 40 |         "Later steps: Report findings and adapt the approach as new insights emerge."
 41 |     ),
 42 |     "step_number": (
 43 |         "The index of the current step in the analysis sequence, beginning at 1. Each step should build upon or "
 44 |         "revise the previous one."
 45 |     ),
 46 |     "total_steps": (
 47 |         "Your current estimate for how many steps will be needed to complete the analysis. "
 48 |         "Adjust as new findings emerge."
 49 |     ),
 50 |     "next_step_required": (
 51 |         "Set to true if you plan to continue the investigation with another step. False means you believe the "
 52 |         "analysis is complete and ready for expert validation."
 53 |     ),
 54 |     "findings": (
 55 |         "Summary of discoveries from this step, including architectural patterns, tech stack assessment, scalability characteristics, "
 56 |         "performance implications, maintainability factors, and strategic improvement opportunities. "
 57 |         "IMPORTANT: Document both strengths (good patterns, solid architecture) and concerns (tech debt, overengineering, unnecessary complexity). "
 58 |         "In later steps, confirm or update past findings with additional evidence."
 59 |     ),
 60 |     "files_checked": (
 61 |         "List all files examined (absolute paths). Include even ruled-out files to track exploration path."
 62 |     ),
 63 |     "relevant_files": (
 64 |         "Subset of files_checked directly relevant to analysis findings (absolute paths). Include files with "
 65 |         "significant patterns, architectural decisions, or strategic improvement opportunities."
 66 |     ),
 67 |     "relevant_context": (
 68 |         "List methods/functions central to analysis findings, in 'ClassName.methodName' or 'functionName' format. "
 69 |         "Prioritize those demonstrating key patterns, architectural decisions, or improvement opportunities."
 70 |     ),
 71 |     "images": (
 72 |         "Optional absolute paths to architecture diagrams or visual references that help with analysis context."
 73 |     ),
 74 |     "confidence": (
 75 |         "Your confidence in the analysis: exploring, low, medium, high, very_high, almost_certain, or certain. "
 76 |         "'certain' indicates the analysis is complete and ready for validation."
 77 |     ),
 78 |     "analysis_type": "Type of analysis to perform (architecture, performance, security, quality, general)",
 79 |     "output_format": "How to format the output (summary, detailed, actionable)",
 80 | }
 81 | 
 82 | 
 83 | class AnalyzeWorkflowRequest(WorkflowRequest):
 84 |     """Request model for analyze workflow investigation steps"""
 85 | 
 86 |     # Required fields for each investigation step
 87 |     step: str = Field(..., description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["step"])
 88 |     step_number: int = Field(..., description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["step_number"])
 89 |     total_steps: int = Field(..., description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["total_steps"])
 90 |     next_step_required: bool = Field(..., description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["next_step_required"])
 91 | 
 92 |     # Investigation tracking fields
 93 |     findings: str = Field(..., description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["findings"])
 94 |     files_checked: list[str] = Field(
 95 |         default_factory=list, description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["files_checked"]
 96 |     )
 97 |     relevant_files: list[str] = Field(
 98 |         default_factory=list, description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["relevant_files"]
 99 |     )
100 |     relevant_context: list[str] = Field(
101 |         default_factory=list, description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["relevant_context"]
102 |     )
103 | 
104 |     # Issues found during analysis (structured with severity)
105 |     issues_found: list[dict] = Field(
106 |         default_factory=list,
107 |         description="Issues or concerns identified during analysis, each with severity level (critical, high, medium, low)",
108 |     )
109 | 
110 |     # Optional images for visual context
111 |     images: Optional[list[str]] = Field(default=None, description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["images"])
112 | 
113 |     # Analyze-specific fields (only used in step 1 to initialize)
114 |     # Note: Use relevant_files field instead of files for consistency across workflow tools
115 |     analysis_type: Optional[Literal["architecture", "performance", "security", "quality", "general"]] = Field(
116 |         "general", description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["analysis_type"]
117 |     )
118 |     output_format: Optional[Literal["summary", "detailed", "actionable"]] = Field(
119 |         "detailed", description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["output_format"]
120 |     )
121 | 
122 |     # Keep thinking_mode from original analyze tool; temperature is inherited from WorkflowRequest
123 | 
124 |     @model_validator(mode="after")
125 |     def validate_step_one_requirements(self):
126 |         """Ensure step 1 has required relevant_files."""
127 |         if self.step_number == 1:
128 |             if not self.relevant_files:
129 |                 raise ValueError("Step 1 requires 'relevant_files' field to specify files or directories to analyze")
130 |         return self
131 | 
132 | 
133 | class AnalyzeTool(WorkflowTool):
134 |     """
135 |     Analyze workflow tool for step-by-step code analysis and expert validation.
136 | 
137 |     This tool implements a structured analysis workflow that guides users through
138 |     methodical investigation steps, ensuring thorough code examination, pattern identification,
139 |     and architectural assessment before reaching conclusions. It supports complex analysis scenarios
140 |     including architectural review, performance analysis, security assessment, and maintainability evaluation.
141 |     """
142 | 
143 |     def __init__(self):
144 |         super().__init__()
145 |         self.initial_request = None
146 |         self.analysis_config = {}
147 | 
148 |     def get_name(self) -> str:
149 |         return "analyze"
150 | 
151 |     def get_description(self) -> str:
152 |         return (
153 |             "Performs comprehensive code analysis with systematic investigation and expert validation. "
154 |             "Use for architecture, performance, maintainability, and pattern analysis. "
155 |             "Guides through structured code review and strategic planning."
156 |         )
157 | 
158 |     def get_system_prompt(self) -> str:
159 |         return ANALYZE_PROMPT
160 | 
161 |     def get_default_temperature(self) -> float:
162 |         return TEMPERATURE_ANALYTICAL
163 | 
164 |     def get_model_category(self) -> "ToolModelCategory":
165 |         """Analyze workflow requires thorough analysis and reasoning"""
166 |         from tools.models import ToolModelCategory
167 | 
168 |         return ToolModelCategory.EXTENDED_REASONING
169 | 
170 |     def get_workflow_request_model(self):
171 |         """Return the analyze workflow-specific request model."""
172 |         return AnalyzeWorkflowRequest
173 | 
174 |     def get_input_schema(self) -> dict[str, Any]:
175 |         """Generate input schema using WorkflowSchemaBuilder with analyze-specific overrides."""
176 |         from .workflow.schema_builders import WorkflowSchemaBuilder
177 | 
178 |         # Fields to exclude from analyze workflow (inherited from WorkflowRequest but not used)
179 |         excluded_fields = {"hypothesis", "confidence"}
180 | 
181 |         # Analyze workflow-specific field overrides
182 |         analyze_field_overrides = {
183 |             "step": {
184 |                 "type": "string",
185 |                 "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["step"],
186 |             },
187 |             "step_number": {
188 |                 "type": "integer",
189 |                 "minimum": 1,
190 |                 "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["step_number"],
191 |             },
192 |             "total_steps": {
193 |                 "type": "integer",
194 |                 "minimum": 1,
195 |                 "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["total_steps"],
196 |             },
197 |             "next_step_required": {
198 |                 "type": "boolean",
199 |                 "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["next_step_required"],
200 |             },
201 |             "findings": {
202 |                 "type": "string",
203 |                 "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["findings"],
204 |             },
205 |             "files_checked": {
206 |                 "type": "array",
207 |                 "items": {"type": "string"},
208 |                 "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["files_checked"],
209 |             },
210 |             "relevant_files": {
211 |                 "type": "array",
212 |                 "items": {"type": "string"},
213 |                 "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["relevant_files"],
214 |             },
215 |             "confidence": {
216 |                 "type": "string",
217 |                 "enum": ["exploring", "low", "medium", "high", "very_high", "almost_certain", "certain"],
218 |                 "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["confidence"],
219 |             },
220 |             "images": {
221 |                 "type": "array",
222 |                 "items": {"type": "string"},
223 |                 "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["images"],
224 |             },
225 |             "issues_found": {
226 |                 "type": "array",
227 |                 "items": {"type": "object"},
228 |                 "description": "Issues or concerns identified during analysis, each with severity level (critical, high, medium, low)",
229 |             },
230 |             "analysis_type": {
231 |                 "type": "string",
232 |                 "enum": ["architecture", "performance", "security", "quality", "general"],
233 |                 "default": "general",
234 |                 "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["analysis_type"],
235 |             },
236 |             "output_format": {
237 |                 "type": "string",
238 |                 "enum": ["summary", "detailed", "actionable"],
239 |                 "default": "detailed",
240 |                 "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["output_format"],
241 |             },
242 |         }
243 | 
244 |         # Use WorkflowSchemaBuilder with analyze-specific tool fields
245 |         return WorkflowSchemaBuilder.build_schema(
246 |             tool_specific_fields=analyze_field_overrides,
247 |             model_field_schema=self.get_model_field_schema(),
248 |             auto_mode=self.is_effective_auto_mode(),
249 |             tool_name=self.get_name(),
250 |             excluded_workflow_fields=list(excluded_fields),
251 |         )
252 | 
253 |     def get_required_actions(
254 |         self, step_number: int, confidence: str, findings: str, total_steps: int, request=None
255 |     ) -> list[str]:
256 |         """Define required actions for each investigation phase."""
257 |         if step_number == 1:
258 |             # Initial analysis investigation tasks
259 |             return [
260 |                 "Read and understand the code files specified for analysis",
261 |                 "Map the tech stack, frameworks, and overall architecture",
262 |                 "Identify the main components, modules, and their relationships",
263 |                 "Understand the business logic and intended functionality",
264 |                 "Examine architectural patterns and design decisions used",
265 |                 "Look for strengths, risks, and strategic improvement areas",
266 |             ]
267 |         elif step_number < total_steps:
268 |             # Need deeper investigation
269 |             return [
270 |                 "Examine specific architectural patterns and design decisions in detail",
271 |                 "Analyze scalability characteristics and performance implications",
272 |                 "Assess maintainability factors: module cohesion, coupling, tech debt",
273 |                 "Identify security posture and potential systemic vulnerabilities",
274 |                 "Look for overengineering, unnecessary complexity, or missing abstractions",
275 |                 "Evaluate how well the architecture serves business and scaling goals",
276 |             ]
277 |         else:
278 |             # Close to completion - need final verification
279 |             return [
280 |                 "Verify all significant architectural insights have been documented",
281 |                 "Confirm strategic improvement opportunities are comprehensively captured",
282 |                 "Ensure both strengths and risks are properly identified with evidence",
283 |                 "Validate that findings align with the analysis type and goals specified",
284 |                 "Check that recommendations are actionable and proportional to the codebase",
285 |                 "Confirm the analysis provides clear guidance for strategic decisions",
286 |             ]
287 | 
288 |     def should_call_expert_analysis(self, consolidated_findings, request=None) -> bool:
289 |         """
290 |         Always call expert analysis for comprehensive validation.
291 | 
292 |         Analysis benefits from a second opinion to ensure completeness.
293 |         """
294 |         # Check if user explicitly requested to skip assistant model
295 |         if request and not self.get_request_use_assistant_model(request):
296 |             return False
297 | 
298 |         # For analysis, we always want expert validation if we have any meaningful data
299 |         return len(consolidated_findings.relevant_files) > 0 or len(consolidated_findings.findings) >= 1
300 | 
301 |     def prepare_expert_analysis_context(self, consolidated_findings) -> str:
302 |         """Prepare context for external model call for final analysis validation."""
303 |         context_parts = [
304 |             f"=== ANALYSIS REQUEST ===\n{self.initial_request or 'Code analysis workflow initiated'}\n=== END REQUEST ==="
305 |         ]
306 | 
307 |         # Add investigation summary
308 |         investigation_summary = self._build_analysis_summary(consolidated_findings)
309 |         context_parts.append(
310 |             f"\n=== AGENT'S ANALYSIS INVESTIGATION ===\n{investigation_summary}\n=== END INVESTIGATION ==="
311 |         )
312 | 
313 |         # Add analysis configuration context if available
314 |         if self.analysis_config:
315 |             config_text = "\n".join(f"- {key}: {value}" for key, value in self.analysis_config.items() if value)
316 |             context_parts.append(f"\n=== ANALYSIS CONFIGURATION ===\n{config_text}\n=== END CONFIGURATION ===")
317 | 
318 |         # Add relevant code elements if available
319 |         if consolidated_findings.relevant_context:
320 |             methods_text = "\n".join(f"- {method}" for method in consolidated_findings.relevant_context)
321 |             context_parts.append(f"\n=== RELEVANT CODE ELEMENTS ===\n{methods_text}\n=== END CODE ELEMENTS ===")
322 | 
323 |         # Add assessment evolution if available
324 |         if consolidated_findings.hypotheses:
325 |             assessments_text = "\n".join(
326 |                 f"Step {h['step']}: {h['hypothesis']}" for h in consolidated_findings.hypotheses
327 |             )
328 |             context_parts.append(f"\n=== ASSESSMENT EVOLUTION ===\n{assessments_text}\n=== END ASSESSMENTS ===")
329 | 
330 |         # Add images if available
331 |         if consolidated_findings.images:
332 |             images_text = "\n".join(f"- {img}" for img in consolidated_findings.images)
333 |             context_parts.append(
334 |                 f"\n=== VISUAL ANALYSIS INFORMATION ===\n{images_text}\n=== END VISUAL INFORMATION ==="
335 |             )
336 | 
337 |         return "\n".join(context_parts)
338 | 
339 |     def _build_analysis_summary(self, consolidated_findings) -> str:
340 |         """Prepare a comprehensive summary of the analysis investigation."""
341 |         summary_parts = [
342 |             "=== SYSTEMATIC ANALYSIS INVESTIGATION SUMMARY ===",
343 |             f"Total steps: {len(consolidated_findings.findings)}",
344 |             f"Files examined: {len(consolidated_findings.files_checked)}",
345 |             f"Relevant files identified: {len(consolidated_findings.relevant_files)}",
346 |             f"Code elements analyzed: {len(consolidated_findings.relevant_context)}",
347 |             "",
348 |             "=== INVESTIGATION PROGRESSION ===",
349 |         ]
350 | 
351 |         for finding in consolidated_findings.findings:
352 |             summary_parts.append(finding)
353 | 
354 |         return "\n".join(summary_parts)
355 | 
356 |     def should_include_files_in_expert_prompt(self) -> bool:
357 |         """Include files in expert analysis for comprehensive validation."""
358 |         return True
359 | 
360 |     def should_embed_system_prompt(self) -> bool:
361 |         """Embed system prompt in expert analysis for proper context."""
362 |         return True
363 | 
364 |     def get_expert_thinking_mode(self) -> str:
365 |         """Use high thinking mode for thorough analysis."""
366 |         return "high"
367 | 
368 |     def get_expert_analysis_instruction(self) -> str:
369 |         """Get specific instruction for analysis expert validation."""
370 |         return (
371 |             "Please provide comprehensive analysis validation based on the investigation findings. "
372 |             "Focus on identifying any remaining architectural insights, validating the completeness of the analysis, "
373 |             "and providing final strategic recommendations following the structured format specified in the system prompt."
374 |         )
375 | 
376 |     # Hook method overrides for analyze-specific behavior
377 | 
378 |     def prepare_step_data(self, request) -> dict:
379 |         """
380 |         Map analyze-specific fields for internal processing.
381 |         """
382 |         step_data = {
383 |             "step": request.step,
384 |             "step_number": request.step_number,
385 |             "findings": request.findings,
386 |             "files_checked": request.files_checked,
387 |             "relevant_files": request.relevant_files,
388 |             "relevant_context": request.relevant_context,
389 |             "issues_found": request.issues_found,  # Analyze workflow uses issues_found for structured problem tracking
390 |             "confidence": "medium",  # Fixed value for workflow compatibility
391 |             "hypothesis": request.findings,  # Map findings to hypothesis for compatibility
392 |             "images": request.images or [],
393 |         }
394 |         return step_data
395 | 
396 |     def should_skip_expert_analysis(self, request, consolidated_findings) -> bool:
397 |         """
398 |         Analyze workflow always uses expert analysis for comprehensive validation.
399 | 
400 |         Analysis benefits from a second opinion to ensure completeness and catch
401 |         any missed insights or alternative perspectives.
402 |         """
403 |         return False
404 | 
405 |     def store_initial_issue(self, step_description: str):
406 |         """Store initial request for expert analysis."""
407 |         self.initial_request = step_description
408 | 
409 |     # Override inheritance hooks for analyze-specific behavior
410 | 
411 |     def get_completion_status(self) -> str:
412 |         """Analyze tools use analysis-specific status."""
413 |         return "analysis_complete_ready_for_implementation"
414 | 
415 |     def get_completion_data_key(self) -> str:
416 |         """Analyze uses 'complete_analysis' key."""
417 |         return "complete_analysis"
418 | 
419 |     def get_final_analysis_from_request(self, request):
420 |         """Analyze tools use 'findings' field."""
421 |         return request.findings
422 | 
423 |     def get_confidence_level(self, request) -> str:
424 |         """Analyze tools use fixed confidence for consistency."""
425 |         return "medium"
426 | 
427 |     def get_completion_message(self) -> str:
428 |         """Analyze-specific completion message."""
429 |         return (
430 |             "Analysis complete. You have identified all significant patterns, "
431 |             "architectural insights, and strategic opportunities. MANDATORY: Present the user with the complete "
432 |             "analysis results organized by strategic impact, and IMMEDIATELY proceed with implementing the "
433 |             "highest priority recommendations or provide specific guidance for improvements. Focus on actionable "
434 |             "strategic insights."
435 |         )
436 | 
437 |     def get_skip_reason(self) -> str:
438 |         """Analyze-specific skip reason."""
439 |         return "Completed comprehensive analysis locally"
440 | 
441 |     def get_skip_expert_analysis_status(self) -> str:
442 |         """Analyze-specific expert analysis skip status."""
443 |         return "skipped_due_to_complete_analysis"
444 | 
445 |     def prepare_work_summary(self) -> str:
446 |         """Analyze-specific work summary."""
447 |         return self._build_analysis_summary(self.consolidated_findings)
448 | 
449 |     def get_completion_next_steps_message(self, expert_analysis_used: bool = False) -> str:
450 |         """
451 |         Analyze-specific completion message.
452 |         """
453 |         base_message = (
454 |             "ANALYSIS IS COMPLETE. You MUST now summarize and present ALL analysis findings organized by "
455 |             "strategic impact (Critical → High → Medium → Low), specific architectural insights with code references, "
456 |             "and exact recommendations for improvement. Clearly prioritize the top 3 strategic opportunities that need "
457 |             "immediate attention. Provide concrete, actionable guidance for each finding—make it easy for a developer "
458 |             "to understand exactly what strategic improvements to implement and how to approach them."
459 |         )
460 | 
461 |         # Add expert analysis guidance only when expert analysis was actually used
462 |         if expert_analysis_used:
463 |             expert_guidance = self.get_expert_analysis_guidance()
464 |             if expert_guidance:
465 |                 return f"{base_message}\n\n{expert_guidance}"
466 | 
467 |         return base_message
468 | 
469 |     def get_expert_analysis_guidance(self) -> str:
470 |         """
471 |         Provide specific guidance for handling expert analysis in code analysis.
472 |         """
473 |         return (
474 |             "IMPORTANT: Analysis from an assistant model has been provided above. You MUST thoughtfully evaluate and validate "
475 |             "the expert insights rather than treating them as definitive conclusions. Cross-reference the expert "
476 |             "analysis with your own systematic investigation, verify that architectural recommendations are "
477 |             "appropriate for this codebase's scale and context, and ensure suggested improvements align with "
478 |             "the project's goals and constraints. Present a comprehensive synthesis that combines your detailed "
479 |             "analysis with validated expert perspectives, clearly distinguishing between patterns you've "
480 |             "independently identified and additional strategic insights from expert validation."
481 |         )
482 | 
483 |     def get_step_guidance_message(self, request) -> str:
484 |         """
485 |         Analyze-specific step guidance with detailed investigation instructions.
486 |         """
487 |         step_guidance = self.get_analyze_step_guidance(request.step_number, request)
488 |         return step_guidance["next_steps"]
489 | 
490 |     def get_analyze_step_guidance(self, step_number: int, request) -> dict[str, Any]:
491 |         """
492 |         Provide step-specific guidance for analyze workflow.
493 |         """
494 |         # Generate the next steps instruction based on required actions
495 |         required_actions = self.get_required_actions(step_number, "medium", request.findings, request.total_steps)
496 | 
497 |         if step_number == 1:
498 |             next_steps = (
499 |                 f"MANDATORY: DO NOT call the {self.get_name()} tool again immediately. You MUST first examine "
500 |                 f"the code files thoroughly using appropriate tools. CRITICAL AWARENESS: You need to understand "
501 |                 f"the architectural patterns, assess scalability and performance characteristics, identify strategic "
502 |                 f"improvement areas, and look for systemic risks, overengineering, and missing abstractions. "
503 |                 f"Use file reading tools, code analysis, and systematic examination to gather comprehensive information. "
504 |                 f"Only call {self.get_name()} again AFTER completing your investigation. When you call "
505 |                 f"{self.get_name()} next time, use step_number: {step_number + 1} and report specific "
506 |                 f"files examined, architectural insights found, and strategic assessment discoveries."
507 |             )
508 |         elif step_number < request.total_steps:
509 |             next_steps = (
510 |                 f"STOP! Do NOT call {self.get_name()} again yet. Based on your findings, you've identified areas that need "
511 |                 f"deeper analysis. MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\n"
512 |                 + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
513 |                 + f"\n\nOnly call {self.get_name()} again with step_number: {step_number + 1} AFTER "
514 |                 + "completing these analysis tasks."
515 |             )
516 |         else:
517 |             next_steps = (
518 |                 f"WAIT! Your analysis needs final verification. DO NOT call {self.get_name()} immediately. REQUIRED ACTIONS:\n"
519 |                 + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
520 |                 + f"\n\nREMEMBER: Ensure you have identified all significant architectural insights and strategic "
521 |                 f"opportunities across all areas. Document findings with specific file references and "
522 |                 f"code examples where applicable, then call {self.get_name()} with step_number: {step_number + 1}."
523 |             )
524 | 
525 |         return {"next_steps": next_steps}
526 | 
527 |     def customize_workflow_response(self, response_data: dict, request) -> dict:
528 |         """
529 |         Customize response to match analyze workflow format.
530 |         """
531 |         # Store initial request on first step
532 |         if request.step_number == 1:
533 |             self.initial_request = request.step
534 |             # Store analysis configuration for expert analysis
535 |             if request.relevant_files:
536 |                 self.analysis_config = {
537 |                     "relevant_files": request.relevant_files,
538 |                     "analysis_type": request.analysis_type,
539 |                     "output_format": request.output_format,
540 |                 }
541 | 
542 |         # Convert generic status names to analyze-specific ones
543 |         tool_name = self.get_name()
544 |         status_mapping = {
545 |             f"{tool_name}_in_progress": "analysis_in_progress",
546 |             f"pause_for_{tool_name}": "pause_for_analysis",
547 |             f"{tool_name}_required": "analysis_required",
548 |             f"{tool_name}_complete": "analysis_complete",
549 |         }
550 | 
551 |         if response_data["status"] in status_mapping:
552 |             response_data["status"] = status_mapping[response_data["status"]]
553 | 
554 |         # Rename status field to match analyze workflow
555 |         if f"{tool_name}_status" in response_data:
556 |             response_data["analysis_status"] = response_data.pop(f"{tool_name}_status")
557 |             # Add analyze-specific status fields
558 |             response_data["analysis_status"]["insights_by_severity"] = {}
559 |             for insight in self.consolidated_findings.issues_found:
560 |                 severity = insight.get("severity", "unknown")
561 |                 if severity not in response_data["analysis_status"]["insights_by_severity"]:
562 |                     response_data["analysis_status"]["insights_by_severity"][severity] = 0
563 |                 response_data["analysis_status"]["insights_by_severity"][severity] += 1
564 |             response_data["analysis_status"]["analysis_confidence"] = self.get_request_confidence(request)
565 | 
566 |         # Map complete_analyze to complete_analysis
567 |         if f"complete_{tool_name}" in response_data:
568 |             response_data["complete_analysis"] = response_data.pop(f"complete_{tool_name}")
569 | 
570 |         # Map the completion flag to match analyze workflow
571 |         if f"{tool_name}_complete" in response_data:
572 |             response_data["analysis_complete"] = response_data.pop(f"{tool_name}_complete")
573 | 
574 |         return response_data
575 | 
576 |     # Required abstract methods from BaseTool
577 |     def get_request_model(self):
578 |         """Return the analyze workflow-specific request model."""
579 |         return AnalyzeWorkflowRequest
580 | 
581 |     async def prepare_prompt(self, request) -> str:
582 |         """Not used - workflow tools use execute_workflow()."""
583 |         return ""  # Workflow tools use execute_workflow() directly
584 | 
```
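
The `validate_step_one_requirements` validator above rejects a step 1 call that omits `relevant_files`. A usage sketch under stated assumptions: the repository root is importable, and Pydantic v2 is in use (where `ValidationError` subclasses `ValueError`); the paths and step text are hypothetical.

```python
# Hypothetical usage sketch of the analyze workflow request model.
from tools.analyze import AnalyzeWorkflowRequest

try:
    AnalyzeWorkflowRequest(
        step="Map the architecture of the billing module.",
        step_number=1,
        total_steps=3,
        next_step_required=True,
        findings="Starting the investigation.",
    )  # step 1 without relevant_files: the validator raises
except ValueError as exc:
    print(exc)

# Supplying relevant_files satisfies the validator; analysis_type and
# output_format fall back to their declared defaults.
request = AnalyzeWorkflowRequest(
    step="Map the architecture of the billing module.",
    step_number=1,
    total_steps=3,
    next_step_required=True,
    findings="Starting the investigation.",
    relevant_files=["/abs/path/billing/service.py"],  # hypothetical path
)
print(request.analysis_type, request.output_format)  # general detailed
```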

--------------------------------------------------------------------------------
/simulator_tests/test_planner_validation.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | """
  3 | PlannerWorkflow Tool Validation Test
  4 | 
  5 | Tests the planner tool's capabilities using the new workflow architecture.
  6 | This validates that the new workflow-based implementation maintains all the
  7 | functionality of the original planner tool while using the workflow pattern
  8 | like the debug tool.
  9 | """
 10 | 
 11 | import json
 12 | from typing import Optional
 13 | 
 14 | from .conversation_base_test import ConversationBaseTest
 15 | 
 16 | 
 17 | class PlannerValidationTest(ConversationBaseTest):
 18 |     """Test planner tool with new workflow architecture"""
 19 | 
 20 |     @property
 21 |     def test_name(self) -> str:
 22 |         return "planner_validation"
 23 | 
 24 |     @property
 25 |     def test_description(self) -> str:
 26 |         return "PlannerWorkflow tool validation with new workflow architecture"
 27 | 
 28 |     def run_test(self) -> bool:
 29 |         """Test planner tool capabilities"""
 30 |         # Set up the test environment
 31 |         self.setUp()
 32 | 
 33 |         try:
 34 |             self.logger.info("Test: PlannerWorkflow tool validation (new architecture)")
 35 | 
 36 |             # Test 1: Single planning session with workflow architecture
 37 |             if not self._test_single_planning_session():
 38 |                 return False
 39 | 
 40 |             # Test 2: Planning with continuation using workflow
 41 |             if not self._test_planning_with_continuation():
 42 |                 return False
 43 | 
 44 |             # Test 3: Complex plan with deep thinking pauses
 45 |             if not self._test_complex_plan_deep_thinking():
 46 |                 return False
 47 | 
 48 |             # Test 4: Self-contained completion (no expert analysis)
 49 |             if not self._test_self_contained_completion():
 50 |                 return False
 51 | 
 52 |             # Test 5: Branching and revision with workflow
 53 |             if not self._test_branching_and_revision():
 54 |                 return False
 55 | 
 56 |             # Test 6: Workflow file context behavior
 57 |             if not self._test_workflow_file_context():
 58 |                 return False
 59 | 
 60 |             self.logger.info("  ✅ All planner validation tests passed")
 61 |             return True
 62 | 
 63 |         except Exception as e:
 64 |             self.logger.error(f"PlannerWorkflow validation test failed: {e}")
 65 |             return False
 66 | 
 67 |     def _test_single_planning_session(self) -> bool:
 68 |         """Test a complete planning session with workflow architecture"""
 69 |         try:
 70 |             self.logger.info("  1.1: Testing single planning session with workflow")
 71 | 
 72 |             # Step 1: Start planning
 73 |             self.logger.info("    1.1.1: Step 1 - Initial planning step")
 74 |             response1, continuation_id = self.call_mcp_tool(
 75 |                 "planner",
 76 |                 {
 77 |                     "step": "I need to plan a comprehensive API redesign for our legacy system. Let me start by analyzing the current state and identifying key requirements for the new API architecture.",
 78 |                     "step_number": 1,
 79 |                     "total_steps": 4,
 80 |                     "next_step_required": True,
 81 |                     "model": "flash",
 82 |                 },
 83 |             )
 84 | 
 85 |             if not response1 or not continuation_id:
 86 |                 self.logger.error("Failed to get initial planning response")
 87 |                 return False
 88 | 
 89 |             # Parse and validate JSON response
 90 |             response1_data = self._parse_planner_response(response1)
 91 |             if not response1_data:
 92 |                 return False
 93 | 
 94 |             # Validate step 1 response structure - expect pause_for_planner for next_step_required=True
 95 |             if not self._validate_step_response(response1_data, 1, 4, True, "pause_for_planner"):
 96 |                 return False
 97 | 
 98 |             # Debug: Log the actual response structure to see what we're getting
 99 |             self.logger.debug(f"Response structure: {list(response1_data.keys())}")
100 | 
101 |             # Check workflow-specific response structure (more flexible)
102 |             status_key = None
103 |             for key in response1_data.keys():
104 |                 if key.endswith("_status"):
105 |                     status_key = key
106 |                     break
107 | 
108 |             if not status_key:
109 |                 self.logger.error(f"Missing workflow status field in response: {list(response1_data.keys())}")
110 |                 return False
111 | 
112 |             self.logger.debug(f"Found status field: {status_key}")
113 | 
114 |             # Check required_actions for workflow guidance
115 |             if not response1_data.get("required_actions"):
116 |                 self.logger.error("Missing required_actions in workflow response")
117 |                 return False
118 | 
119 |             self.logger.info(f"    ✅ Step 1 successful with workflow, continuation_id: {continuation_id}")
120 | 
121 |             # Step 2: Continue planning
122 |             self.logger.info("    1.1.2: Step 2 - API domain analysis")
123 |             response2, _ = self.call_mcp_tool(
124 |                 "planner",
125 |                 {
126 |                     "step": "After analyzing the current API, I can identify three main domains: User Management, Content Management, and Analytics. Let me design the new API structure with RESTful endpoints and proper versioning.",
127 |                     "step_number": 2,
128 |                     "total_steps": 4,
129 |                     "next_step_required": True,
130 |                     "continuation_id": continuation_id,
131 |                     "model": "flash",
132 |                 },
133 |             )
134 | 
135 |             if not response2:
136 |                 self.logger.error("Failed to continue planning to step 2")
137 |                 return False
138 | 
139 |             response2_data = self._parse_planner_response(response2)
140 |             if not self._validate_step_response(response2_data, 2, 4, True, "pause_for_planner"):
141 |                 return False
142 | 
143 |             # Check step history tracking in workflow (again accepting any "*_status" key)
144 |             status_key = None
145 |             for key in response2_data.keys():
146 |                 if key.endswith("_status"):
147 |                     status_key = key
148 |                     break
149 | 
150 |             if status_key:
151 |                 workflow_status = response2_data.get(status_key, {})
152 |                 step_history_length = workflow_status.get("step_history_length", 0)
153 |                 if step_history_length < 2:
154 |                     self.logger.error(f"Step history not properly tracked in workflow: {step_history_length}")
155 |                     return False
156 |                 self.logger.debug(f"Step history length: {step_history_length}")
157 |             else:
158 |                 self.logger.warning("No workflow status found, skipping step history check")
159 | 
160 |             self.logger.info("    ✅ Step 2 successful with workflow tracking")
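            # Illustrative only: the "*_status" object is assumed to carry
            # bookkeeping such as {"step_history_length": 2}. Only
            # step_history_length is asserted here; the enclosing field name
            # is deliberately not pinned down.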
161 | 
162 |             # Step 3: Final step - should trigger completion
163 |             self.logger.info("    1.1.3: Step 3 - Final planning step")
164 |             response3, _ = self.call_mcp_tool(
165 |                 "planner",
166 |                 {
167 |                     "step": "API redesign plan complete: Phase 1 - User Management API, Phase 2 - Content Management API, Phase 3 - Analytics API. Each phase includes proper authentication, rate limiting, and comprehensive documentation.",
168 |                     "step_number": 3,
169 |                     "total_steps": 3,  # Adjusted down from 4: the plan finishes one step early
170 |                     "next_step_required": False,  # Final step - should complete without expert analysis
171 |                     "continuation_id": continuation_id,
172 |                     "model": "flash",
173 |                 },
174 |             )
175 | 
176 |             if not response3:
177 |                 self.logger.error("Failed to complete planning session")
178 |                 return False
179 | 
180 |             response3_data = self._parse_planner_response(response3)
181 |             if not response3_data:
182 |                 return False
183 | 
184 |             # Validate final response structure - should be self-contained completion
185 |             if response3_data.get("status") != "planner_complete":
186 |                 self.logger.error(f"Expected status 'planner_complete', got '{response3_data.get('status')}'")
187 |                 return False
188 | 
189 |             if not response3_data.get("planning_complete"):
190 |                 self.logger.error("Expected planning_complete=true for final step")
191 |                 return False
192 | 
193 |             # Should NOT have expert_analysis (self-contained)
194 |             if "expert_analysis" in response3_data:
195 |                 self.logger.error("PlannerWorkflow should be self-contained without expert analysis")
196 |                 return False
197 | 
198 |             # Check plan_summary exists
199 |             if not response3_data.get("plan_summary"):
200 |                 self.logger.error("Missing plan_summary in final step")
201 |                 return False
202 | 
203 |             self.logger.info("    ✅ Planning session completed successfully with workflow architecture")
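            # Hedged sketch of the terminal payload validated above (values
            # illustrative; only the asserted keys are guaranteed):
            #
            #   {
            #       "status": "planner_complete",
            #       "planning_complete": true,
            #       "plan_summary": "...",
            #       "output": {"instructions": "..."}
            #   }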
204 | 
205 |             # Store continuation_id for next test
206 |             self.api_continuation_id = continuation_id
207 |             return True
208 | 
209 |         except Exception as e:
210 |             self.logger.error(f"Single planning session test failed: {e}")
211 |             return False
212 | 
213 |     def _test_planning_with_continuation(self) -> bool:
214 |         """Test planning continuation with workflow architecture"""
215 |         try:
216 |             self.logger.info("  1.2: Testing planning continuation with workflow")
217 | 
218 |             # Use continuation from previous test if available
219 |             continuation_id = getattr(self, "api_continuation_id", None)
220 |             if not continuation_id:
221 |                 # Start fresh if no continuation available
222 |                 self.logger.info("    1.2.0: Starting fresh planning session")
223 |                 response0, continuation_id = self.call_mcp_tool(
224 |                     "planner",
225 |                     {
226 |                         "step": "Planning API security strategy",
227 |                         "step_number": 1,
228 |                         "total_steps": 2,
229 |                         "next_step_required": True,
230 |                         "model": "flash",
231 |                     },
232 |                 )
233 |                 if not response0 or not continuation_id:
234 |                     self.logger.error("Failed to start fresh planning session")
235 |                     return False
236 | 
237 |             # Test continuation step
238 |             self.logger.info("    1.2.1: Continue planning session")
239 |             response1, _ = self.call_mcp_tool(
240 |                 "planner",
241 |                 {
242 |                     "step": "Building on the API redesign, let me now plan the security implementation with OAuth 2.0, API keys, and rate limiting strategies.",
243 |                     "step_number": 2,
244 |                     "total_steps": 2,
245 |                     "next_step_required": True,
246 |                     "continuation_id": continuation_id,
247 |                     "model": "flash",
248 |                 },
249 |             )
250 | 
251 |             if not response1:
252 |                 self.logger.error("Failed to continue planning")
253 |                 return False
254 | 
255 |             response1_data = self._parse_planner_response(response1)
256 |             if not response1_data:
257 |                 return False
258 | 
259 |             # Validate continuation behavior
260 |             if not self._validate_step_response(response1_data, 2, 2, True, "pause_for_planner"):
261 |                 return False
262 | 
263 |             # Check that continuation_id is preserved
264 |             if response1_data.get("continuation_id") != continuation_id:
265 |                 self.logger.error("Continuation ID not preserved in workflow")
266 |                 return False
267 | 
268 |             self.logger.info("    ✅ Planning continuation working with workflow")
269 |             return True
270 | 
271 |         except Exception as e:
272 |             self.logger.error(f"Planning continuation test failed: {e}")
273 |             return False
274 | 
275 |     def _test_complex_plan_deep_thinking(self) -> bool:
276 |         """Test complex plan with deep thinking pauses"""
277 |         try:
278 |             self.logger.info("  1.3: Testing complex plan with deep thinking pauses")
279 | 
280 |             # Start complex plan (≥5 steps) - should trigger deep thinking
281 |             self.logger.info("    1.3.1: Step 1 of complex plan (should trigger deep thinking)")
282 |             response1, continuation_id = self.call_mcp_tool(
283 |                 "planner",
284 |                 {
285 |                     "step": "I need to plan a complete digital transformation for our enterprise organization, including cloud migration, process automation, and cultural change management.",
286 |                     "step_number": 1,
287 |                     "total_steps": 8,  # Complex plan ≥5 steps
288 |                     "next_step_required": True,
289 |                     "model": "flash",
290 |                 },
291 |             )
292 | 
293 |             if not response1 or not continuation_id:
294 |                 self.logger.error("Failed to start complex planning")
295 |                 return False
296 | 
297 |             response1_data = self._parse_planner_response(response1)
298 |             if not response1_data:
299 |                 return False
300 | 
301 |             # Should trigger deep thinking pause for complex plan
302 |             if response1_data.get("status") != "pause_for_deep_thinking":
303 |                 self.logger.error("Expected deep thinking pause for complex plan step 1")
304 |                 return False
305 | 
306 |             if not response1_data.get("thinking_required"):
307 |                 self.logger.error("Expected thinking_required=true for complex plan")
308 |                 return False
309 | 
310 |             # Check required thinking actions
311 |             required_thinking = response1_data.get("required_thinking", [])
312 |             if len(required_thinking) < 4:
313 |                 self.logger.error("Expected comprehensive thinking requirements for complex plan")
314 |                 return False
315 | 
316 |             # Check for deep thinking guidance in next_steps
317 |             next_steps = response1_data.get("next_steps", "")
318 |             if "MANDATORY" not in next_steps or "deep thinking" not in next_steps.lower():
319 |                 self.logger.error("Expected mandatory deep thinking guidance")
320 |                 return False
321 | 
322 |             self.logger.info("    ✅ Complex plan step 1 correctly triggered deep thinking pause")
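            # Rule exercised by this test (inferred from the assertions, not
            # from a documented spec): plans with total_steps >= 5 should
            # return status "pause_for_deep_thinking" with
            # thinking_required=true and a required_thinking list for the
            # early steps, then fall back to the normal "pause_for_planner"
            # flow from step 4 onward (verified in 1.3.3 below).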
323 | 
324 |             # Step 2 of complex plan - should also trigger deep thinking
325 |             self.logger.info("    1.3.2: Step 2 of complex plan (should trigger deep thinking)")
326 |             response2, _ = self.call_mcp_tool(
327 |                 "planner",
328 |                 {
329 |                     "step": "After deep analysis, I can see this transformation requires three parallel tracks: Technical Infrastructure, Business Process, and Human Capital. Let me design the coordination strategy.",
330 |                     "step_number": 2,
331 |                     "total_steps": 8,
332 |                     "next_step_required": True,
333 |                     "continuation_id": continuation_id,
334 |                     "model": "flash",
335 |                 },
336 |             )
337 | 
338 |             if not response2:
339 |                 self.logger.error("Failed to continue complex planning")
340 |                 return False
341 | 
342 |             response2_data = self._parse_planner_response(response2)
343 |             if not response2_data:
344 |                 return False
345 | 
346 |             # Step 2 should also trigger deep thinking for complex plans
347 |             if response2_data.get("status") != "pause_for_deep_thinking":
348 |                 self.logger.error("Expected deep thinking pause for complex plan step 2")
349 |                 return False
350 | 
351 |             self.logger.info("    ✅ Complex plan step 2 correctly triggered deep thinking pause")
352 | 
353 |             # Step 4 of complex plan - should use normal flow (deep-thinking pauses apply only through step 3)
354 |             self.logger.info("    1.3.3: Step 4 of complex plan (should use normal flow)")
355 |             response4, _ = self.call_mcp_tool(
356 |                 "planner",
357 |                 {
358 |                     "step": "Now moving to tactical planning: Phase 1 execution details with specific timelines and resource allocation for the technical infrastructure track.",
359 |                     "step_number": 4,
360 |                     "total_steps": 8,
361 |                     "next_step_required": True,
362 |                     "continuation_id": continuation_id,
363 |                     "model": "flash",
364 |                 },
365 |             )
366 | 
367 |             if not response4:
368 |                 self.logger.error("Failed to continue to step 4")
369 |                 return False
370 | 
371 |             response4_data = self._parse_planner_response(response4)
372 |             if not response4_data:
373 |                 return False
374 | 
375 |             # Step 4 should use normal flow (no more deep thinking pauses)
376 |             if response4_data.get("status") != "pause_for_planner":
377 |                 self.logger.error("Expected normal planning flow for step 4")
378 |                 return False
379 | 
380 |             if response4_data.get("thinking_required"):
381 |                 self.logger.error("Step 4 should not require special thinking pause")
382 |                 return False
383 | 
384 |             self.logger.info("    ✅ Complex plan transitions to normal flow after step 3")
385 |             return True
386 | 
387 |         except Exception as e:
388 |             self.logger.error(f"Complex plan deep thinking test failed: {e}")
389 |             return False
390 | 
391 |     def _test_self_contained_completion(self) -> bool:
392 |         """Test self-contained completion without expert analysis"""
393 |         try:
394 |             self.logger.info("  1.4: Testing self-contained completion")
395 | 
396 |             # Simple planning session that should complete without expert analysis
397 |             self.logger.info("    1.4.1: Simple planning session")
398 |             response1, continuation_id = self.call_mcp_tool(
399 |                 "planner",
400 |                 {
401 |                     "step": "Planning a simple website redesign with new color scheme and improved navigation.",
402 |                     "step_number": 1,
403 |                     "total_steps": 2,
404 |                     "next_step_required": True,
405 |                     "model": "flash",
406 |                 },
407 |             )
408 | 
409 |             if not response1 or not continuation_id:
410 |                 self.logger.error("Failed to start simple planning")
411 |                 return False
412 | 
413 |             # Final step - should complete without expert analysis
414 |             self.logger.info("    1.4.2: Final step - self-contained completion")
415 |             response2, _ = self.call_mcp_tool(
416 |                 "planner",
417 |                 {
418 |                     "step": "Website redesign plan complete: Phase 1 - Update color palette and typography, Phase 2 - Redesign navigation structure and user flows.",
419 |                     "step_number": 2,
420 |                     "total_steps": 2,
421 |                     "next_step_required": False,  # Final step
422 |                     "continuation_id": continuation_id,
423 |                     "model": "flash",
424 |                 },
425 |             )
426 | 
427 |             if not response2:
428 |                 self.logger.error("Failed to complete simple planning")
429 |                 return False
430 | 
431 |             response2_data = self._parse_planner_response(response2)
432 |             if not response2_data:
433 |                 return False
434 | 
435 |             # Validate self-contained completion
436 |             if response2_data.get("status") != "planner_complete":
437 |                 self.logger.error("Expected self-contained completion status")
438 |                 return False
439 | 
440 |             # Should NOT call expert analysis
441 |             if "expert_analysis" in response2_data:
442 |                 self.logger.error("PlannerWorkflow should not call expert analysis")
443 |                 return False
444 | 
445 |             # Should have planning_complete flag
446 |             if not response2_data.get("planning_complete"):
447 |                 self.logger.error("Expected planning_complete=true")
448 |                 return False
449 | 
450 |             # Should have plan_summary
451 |             if not response2_data.get("plan_summary"):
452 |                 self.logger.error("Expected plan_summary in completion")
453 |                 return False
454 | 
455 |             # Check completion instructions
456 |             output = response2_data.get("output", {})
457 |             if not output.get("instructions"):
458 |                 self.logger.error("Missing output instructions for plan presentation")
459 |                 return False
460 | 
461 |             self.logger.info("    ✅ Self-contained completion working correctly")
462 |             return True
463 | 
464 |         except Exception as e:
465 |             self.logger.error(f"Self-contained completion test failed: {e}")
466 |             return False
467 | 
468 |     def _test_branching_and_revision(self) -> bool:
469 |         """Test branching and revision with workflow architecture"""
470 |         try:
471 |             self.logger.info("  1.5: Testing branching and revision with workflow")
472 | 
473 |             # Start planning session for branching test
474 |             self.logger.info("    1.5.1: Start planning for branching test")
475 |             response1, continuation_id = self.call_mcp_tool(
476 |                 "planner",
477 |                 {
478 |                     "step": "Planning mobile app development strategy with different technology options to evaluate.",
479 |                     "step_number": 1,
480 |                     "total_steps": 4,
481 |                     "next_step_required": True,
482 |                     "model": "flash",
483 |                 },
484 |             )
485 | 
486 |             if not response1 or not continuation_id:
487 |                 self.logger.error("Failed to start branching test")
488 |                 return False
489 | 
490 |             # Create branch
491 |             self.logger.info("    1.5.2: Create branch for React Native approach")
492 |             response2, _ = self.call_mcp_tool(
493 |                 "planner",
494 |                 {
495 |                     "step": "Branch A: React Native approach - cross-platform development with shared codebase, faster development cycle, and consistent UI across platforms.",
496 |                     "step_number": 2,
497 |                     "total_steps": 4,
498 |                     "next_step_required": True,
499 |                     "is_branch_point": True,
500 |                     "branch_from_step": 1,
501 |                     "branch_id": "react-native",
502 |                     "continuation_id": continuation_id,
503 |                     "model": "flash",
504 |                 },
505 |             )
506 | 
507 |             if not response2:
508 |                 self.logger.error("Failed to create branch")
509 |                 return False
510 | 
511 |             response2_data = self._parse_planner_response(response2)
512 |             if not response2_data:
513 |                 return False
514 | 
515 |             # Validate branching in workflow
516 |             metadata = response2_data.get("metadata", {})
517 |             if not metadata.get("is_branch_point"):
518 |                 self.logger.error("Branch point not recorded in workflow")
519 |                 return False
520 | 
521 |             if metadata.get("branch_id") != "react-native":
522 |                 self.logger.error("Branch ID not properly recorded")
523 |                 return False
524 | 
525 |             if "react-native" not in metadata.get("branches", []):
526 |                 self.logger.error("Branch not added to branches list")
527 |                 return False
528 | 
529 |             self.logger.info("    ✅ Branching working with workflow architecture")
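            # Hedged sketch of the branch metadata validated above (shape
            # inferred from the assertions; extra fields may exist, and the
            # branch_from_step echo is assumed rather than asserted):
            #
            #   "metadata": {
            #       "is_branch_point": true,
            #       "branch_from_step": 1,
            #       "branch_id": "react-native",
            #       "branches": ["react-native"]
            #   }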
530 | 
531 |             # Test revision
532 |             self.logger.info("    1.5.3: Test revision capability")
533 |             response3, _ = self.call_mcp_tool(
534 |                 "planner",
535 |                 {
536 |                     "step": "Revision of step 2: After consideration, let me revise the React Native approach to include performance optimizations and native module integration for critical features.",
537 |                     "step_number": 3,
538 |                     "total_steps": 4,
539 |                     "next_step_required": True,
540 |                     "is_step_revision": True,
541 |                     "revises_step_number": 2,
542 |                     "continuation_id": continuation_id,
543 |                     "model": "flash",
544 |                 },
545 |             )
546 | 
547 |             if not response3:
548 |                 self.logger.error("Failed to create revision")
549 |                 return False
550 | 
551 |             response3_data = self._parse_planner_response(response3)
552 |             if not response3_data:
553 |                 return False
554 | 
555 |             # Validate revision in workflow
556 |             metadata = response3_data.get("metadata", {})
557 |             if not metadata.get("is_step_revision"):
558 |                 self.logger.error("Step revision not recorded in workflow")
559 |                 return False
560 | 
561 |             if metadata.get("revises_step_number") != 2:
562 |                 self.logger.error("Revised step number not properly recorded")
563 |                 return False
564 | 
565 |             self.logger.info("    ✅ Revision working with workflow architecture")
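            # Similarly, revisions are expected to surface in metadata as
            # {"is_step_revision": true, "revises_step_number": 2} - inferred
            # from the assertions above rather than from a documented schema.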
566 |             return True
567 | 
568 |         except Exception as e:
569 |             self.logger.error(f"Branching and revision test failed: {e}")
570 |             return False
571 | 
572 |     def _test_workflow_file_context(self) -> bool:
573 |         """Test workflow file context behavior (should be minimal for planner)"""
574 |         try:
575 |             self.logger.info("  1.6: Testing workflow file context behavior")
576 | 
577 |             # Planner typically doesn't use files, but verify that the workflow handles this case correctly
578 |             self.logger.info("    1.6.1: Planning step with no files (normal case)")
579 |             response1, continuation_id = self.call_mcp_tool(
580 |                 "planner",
581 |                 {
582 |                     "step": "Planning data architecture for analytics platform.",
583 |                     "step_number": 1,
584 |                     "total_steps": 2,
585 |                     "next_step_required": True,
586 |                     "model": "flash",
587 |                 },
588 |             )
589 | 
590 |             if not response1 or not continuation_id:
591 |                 self.logger.error("Failed to start workflow file context test")
592 |                 return False
593 | 
594 |             response1_data = self._parse_planner_response(response1)
595 |             if not response1_data:
596 |                 return False
597 | 
598 |             # Planner doesn't embed files, so file_context is not expected; log (but don't fail) if it appears
599 |             if "file_context" in response1_data:
600 |                 self.logger.info("    ℹ️ Workflow file context present but should be minimal for planner")
601 | 
602 |             # Final step
603 |             self.logger.info("    1.6.2: Final step (should complete without file embedding)")
604 |             response2, _ = self.call_mcp_tool(
605 |                 "planner",
606 |                 {
607 |                     "step": "Data architecture plan complete with data lakes, processing pipelines, and analytics layers.",
608 |                     "step_number": 2,
609 |                     "total_steps": 2,
610 |                     "next_step_required": False,
611 |                     "continuation_id": continuation_id,
612 |                     "model": "flash",
613 |                 },
614 |             )
615 | 
616 |             if not response2:
617 |                 self.logger.error("Failed to complete workflow file context test")
618 |                 return False
619 | 
620 |             response2_data = self._parse_planner_response(response2)
621 |             if not response2_data:
622 |                 return False
623 | 
624 |             # Final step should complete self-contained
625 |             if response2_data.get("status") != "planner_complete":
626 |                 self.logger.error("Expected self-contained completion for planner workflow")
627 |                 return False
628 | 
629 |             self.logger.info("    ✅ Workflow file context behavior appropriate for planner")
630 |             return True
631 | 
632 |         except Exception as e:
633 |             self.logger.error(f"Workflow file context test failed: {e}")
634 |             return False
635 | 
636 |     def call_mcp_tool(self, tool_name: str, params: dict) -> tuple[Optional[str], Optional[str]]:
637 |         """Call an MCP tool in-process - override for planner-specific response handling"""
638 |         # Use in-process implementation to maintain conversation memory
639 |         response_text, _ = self.call_mcp_tool_direct(tool_name, params)
640 | 
641 |         if not response_text:
642 |             return None, None
643 | 
644 |         # Extract continuation_id from planner response specifically
645 |         continuation_id = self._extract_planner_continuation_id(response_text)
646 | 
647 |         return response_text, continuation_id
648 | 
649 |     def _extract_planner_continuation_id(self, response_text: str) -> Optional[str]:
650 |         """Extract continuation_id from planner response"""
651 |         try:
652 |             # Parse the response
653 |             response_data = json.loads(response_text)
654 |             return response_data.get("continuation_id")
655 | 
656 |         except json.JSONDecodeError as e:
657 |             self.logger.debug(f"Failed to parse response for planner continuation_id: {e}")
658 |             return None
659 | 
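    # The two JSON helpers here (_extract_planner_continuation_id above and
    # _parse_planner_response below) assume the tool returns the JSON object
    # directly as the response text. A hedged example of the raw input they
    # expect (id value illustrative):
    #
    #   '{"status": "pause_for_planner", "continuation_id": "abc123", ...}'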
660 |     def _parse_planner_response(self, response_text: str) -> dict:
661 |         """Parse planner tool JSON response"""
662 |         try:
663 |             # Parse the response - it should be direct JSON
664 |             return json.loads(response_text)
665 | 
666 |         except json.JSONDecodeError as e:
667 |             self.logger.error(f"Failed to parse planner response as JSON: {e}")
668 |             self.logger.error(f"Response text: {response_text[:500]}...")
669 |             return {}
670 | 
671 |     def _validate_step_response(
672 |         self,
673 |         response_data: dict,
674 |         expected_step: int,
675 |         expected_total: int,
676 |         expected_next_required: bool,
677 |         expected_status: str,
678 |     ) -> bool:
679 |         """Validate a planner step response structure"""
680 |         try:
681 |             # Check status
682 |             if response_data.get("status") != expected_status:
683 |                 self.logger.error(f"Expected status '{expected_status}', got '{response_data.get('status')}'")
684 |                 return False
685 | 
686 |             # Check step number
687 |             if response_data.get("step_number") != expected_step:
688 |                 self.logger.error(f"Expected step_number {expected_step}, got {response_data.get('step_number')}")
689 |                 return False
690 | 
691 |             # Check total steps
692 |             if response_data.get("total_steps") != expected_total:
693 |                 self.logger.error(f"Expected total_steps {expected_total}, got {response_data.get('total_steps')}")
694 |                 return False
695 | 
696 |             # Check next_step_required
697 |             if response_data.get("next_step_required") != expected_next_required:
698 |                 self.logger.error(
699 |                     f"Expected next_step_required {expected_next_required}, got {response_data.get('next_step_required')}"
700 |                 )
701 |                 return False
702 | 
703 |             # Check step_content exists
704 |             if not response_data.get("step_content"):
705 |                 self.logger.error("Missing step_content in response")
706 |                 return False
707 | 
708 |             # Check next_steps guidance
709 |             if not response_data.get("next_steps"):
710 |                 self.logger.error("Missing next_steps guidance in response")
711 |                 return False
712 | 
713 |             return True
714 | 
715 |         except Exception as e:
716 |             self.logger.error(f"Error validating step response: {e}")
717 |             return False
718 | 
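    # Example usage (mirrors the calls in test 1.1 above):
    #
    #   data = self._parse_planner_response(response_text)
    #   if not self._validate_step_response(data, 1, 4, True, "pause_for_planner"):
    #       return False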
```
Page 16/25