This is page 13 of 19. Use http://codebase.md/beehiveinnovations/gemini-mcp-server?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .claude
│   ├── commands
│   │   └── fix-github-issue.md
│   └── settings.json
├── .coveragerc
├── .dockerignore
├── .env.example
├── .gitattributes
├── .github
│   ├── FUNDING.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.yml
│   │   ├── config.yml
│   │   ├── documentation.yml
│   │   ├── feature_request.yml
│   │   └── tool_addition.yml
│   ├── pull_request_template.md
│   └── workflows
│       ├── docker-pr.yml
│       ├── docker-release.yml
│       ├── semantic-pr.yml
│       ├── semantic-release.yml
│       └── test.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CHANGELOG.md
├── claude_config_example.json
├── CLAUDE.md
├── clink
│   ├── __init__.py
│   ├── agents
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── claude.py
│   │   ├── codex.py
│   │   └── gemini.py
│   ├── constants.py
│   ├── models.py
│   ├── parsers
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── claude.py
│   │   ├── codex.py
│   │   └── gemini.py
│   └── registry.py
├── code_quality_checks.ps1
├── code_quality_checks.sh
├── communication_simulator_test.py
├── conf
│   ├── __init__.py
│   ├── azure_models.json
│   ├── cli_clients
│   │   ├── claude.json
│   │   ├── codex.json
│   │   └── gemini.json
│   ├── custom_models.json
│   ├── dial_models.json
│   ├── gemini_models.json
│   ├── openai_models.json
│   ├── openrouter_models.json
│   └── xai_models.json
├── config.py
├── docker
│   ├── README.md
│   └── scripts
│       ├── build.ps1
│       ├── build.sh
│       ├── deploy.ps1
│       ├── deploy.sh
│       └── healthcheck.py
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── adding_providers.md
│   ├── adding_tools.md
│   ├── advanced-usage.md
│   ├── ai_banter.md
│   ├── ai-collaboration.md
│   ├── azure_openai.md
│   ├── configuration.md
│   ├── context-revival.md
│   ├── contributions.md
│   ├── custom_models.md
│   ├── docker-deployment.md
│   ├── gemini-setup.md
│   ├── getting-started.md
│   ├── index.md
│   ├── locale-configuration.md
│   ├── logging.md
│   ├── model_ranking.md
│   ├── testing.md
│   ├── tools
│   │   ├── analyze.md
│   │   ├── apilookup.md
│   │   ├── challenge.md
│   │   ├── chat.md
│   │   ├── clink.md
│   │   ├── codereview.md
│   │   ├── consensus.md
│   │   ├── debug.md
│   │   ├── docgen.md
│   │   ├── listmodels.md
│   │   ├── planner.md
│   │   ├── precommit.md
│   │   ├── refactor.md
│   │   ├── secaudit.md
│   │   ├── testgen.md
│   │   ├── thinkdeep.md
│   │   ├── tracer.md
│   │   └── version.md
│   ├── troubleshooting.md
│   ├── vcr-testing.md
│   └── wsl-setup.md
├── examples
│   ├── claude_config_macos.json
│   └── claude_config_wsl.json
├── LICENSE
├── providers
│   ├── __init__.py
│   ├── azure_openai.py
│   ├── base.py
│   ├── custom.py
│   ├── dial.py
│   ├── gemini.py
│   ├── openai_compatible.py
│   ├── openai.py
│   ├── openrouter.py
│   ├── registries
│   │   ├── __init__.py
│   │   ├── azure.py
│   │   ├── base.py
│   │   ├── custom.py
│   │   ├── dial.py
│   │   ├── gemini.py
│   │   ├── openai.py
│   │   ├── openrouter.py
│   │   └── xai.py
│   ├── registry_provider_mixin.py
│   ├── registry.py
│   ├── shared
│   │   ├── __init__.py
│   │   ├── model_capabilities.py
│   │   ├── model_response.py
│   │   ├── provider_type.py
│   │   └── temperature.py
│   └── xai.py
├── pyproject.toml
├── pytest.ini
├── README.md
├── requirements-dev.txt
├── requirements.txt
├── run_integration_tests.ps1
├── run_integration_tests.sh
├── run-server.ps1
├── run-server.sh
├── scripts
│   └── sync_version.py
├── server.py
├── simulator_tests
│   ├── __init__.py
│   ├── base_test.py
│   ├── conversation_base_test.py
│   ├── log_utils.py
│   ├── test_analyze_validation.py
│   ├── test_basic_conversation.py
│   ├── test_chat_simple_validation.py
│   ├── test_codereview_validation.py
│   ├── test_consensus_conversation.py
│   ├── test_consensus_three_models.py
│   ├── test_consensus_workflow_accurate.py
│   ├── test_content_validation.py
│   ├── test_conversation_chain_validation.py
│   ├── test_cross_tool_comprehensive.py
│   ├── test_cross_tool_continuation.py
│   ├── test_debug_certain_confidence.py
│   ├── test_debug_validation.py
│   ├── test_line_number_validation.py
│   ├── test_logs_validation.py
│   ├── test_model_thinking_config.py
│   ├── test_o3_model_selection.py
│   ├── test_o3_pro_expensive.py
│   ├── test_ollama_custom_url.py
│   ├── test_openrouter_fallback.py
│   ├── test_openrouter_models.py
│   ├── test_per_tool_deduplication.py
│   ├── test_planner_continuation_history.py
│   ├── test_planner_validation_old.py
│   ├── test_planner_validation.py
│   ├── test_precommitworkflow_validation.py
│   ├── test_prompt_size_limit_bug.py
│   ├── test_refactor_validation.py
│   ├── test_secaudit_validation.py
│   ├── test_testgen_validation.py
│   ├── test_thinkdeep_validation.py
│   ├── test_token_allocation_validation.py
│   ├── test_vision_capability.py
│   └── test_xai_models.py
├── systemprompts
│   ├── __init__.py
│   ├── analyze_prompt.py
│   ├── chat_prompt.py
│   ├── clink
│   │   ├── codex_codereviewer.txt
│   │   ├── default_codereviewer.txt
│   │   ├── default_planner.txt
│   │   └── default.txt
│   ├── codereview_prompt.py
│   ├── consensus_prompt.py
│   ├── debug_prompt.py
│   ├── docgen_prompt.py
│   ├── generate_code_prompt.py
│   ├── planner_prompt.py
│   ├── precommit_prompt.py
│   ├── refactor_prompt.py
│   ├── secaudit_prompt.py
│   ├── testgen_prompt.py
│   ├── thinkdeep_prompt.py
│   └── tracer_prompt.py
├── tests
│   ├── __init__.py
│   ├── CASSETTE_MAINTENANCE.md
│   ├── conftest.py
│   ├── gemini_cassettes
│   │   ├── chat_codegen
│   │   │   └── gemini25_pro_calculator
│   │   │       └── mldev.json
│   │   ├── chat_cross
│   │   │   └── step1_gemini25_flash_number
│   │   │       └── mldev.json
│   │   └── consensus
│   │       └── step2_gemini25_flash_against
│   │           └── mldev.json
│   ├── http_transport_recorder.py
│   ├── mock_helpers.py
│   ├── openai_cassettes
│   │   ├── chat_cross_step2_gpt5_reminder.json
│   │   ├── chat_gpt5_continuation.json
│   │   ├── chat_gpt5_moon_distance.json
│   │   ├── consensus_step1_gpt5_for.json
│   │   └── o3_pro_basic_math.json
│   ├── pii_sanitizer.py
│   ├── sanitize_cassettes.py
│   ├── test_alias_target_restrictions.py
│   ├── test_auto_mode_comprehensive.py
│   ├── test_auto_mode_custom_provider_only.py
│   ├── test_auto_mode_model_listing.py
│   ├── test_auto_mode_provider_selection.py
│   ├── test_auto_mode.py
│   ├── test_auto_model_planner_fix.py
│   ├── test_azure_openai_provider.py
│   ├── test_buggy_behavior_prevention.py
│   ├── test_cassette_semantic_matching.py
│   ├── test_challenge.py
│   ├── test_chat_codegen_integration.py
│   ├── test_chat_cross_model_continuation.py
│   ├── test_chat_openai_integration.py
│   ├── test_chat_simple.py
│   ├── test_clink_claude_agent.py
│   ├── test_clink_claude_parser.py
│   ├── test_clink_codex_agent.py
│   ├── test_clink_gemini_agent.py
│   ├── test_clink_gemini_parser.py
│   ├── test_clink_integration.py
│   ├── test_clink_parsers.py
│   ├── test_clink_tool.py
│   ├── test_collaboration.py
│   ├── test_config.py
│   ├── test_consensus_integration.py
│   ├── test_consensus_schema.py
│   ├── test_consensus.py
│   ├── test_conversation_continuation_integration.py
│   ├── test_conversation_field_mapping.py
│   ├── test_conversation_file_features.py
│   ├── test_conversation_memory.py
│   ├── test_conversation_missing_files.py
│   ├── test_custom_openai_temperature_fix.py
│   ├── test_custom_provider.py
│   ├── test_debug.py
│   ├── test_deploy_scripts.py
│   ├── test_dial_provider.py
│   ├── test_directory_expansion_tracking.py
│   ├── test_disabled_tools.py
│   ├── test_docker_claude_desktop_integration.py
│   ├── test_docker_config_complete.py
│   ├── test_docker_healthcheck.py
│   ├── test_docker_implementation.py
│   ├── test_docker_mcp_validation.py
│   ├── test_docker_security.py
│   ├── test_docker_volume_persistence.py
│   ├── test_file_protection.py
│   ├── test_gemini_token_usage.py
│   ├── test_image_support_integration.py
│   ├── test_image_validation.py
│   ├── test_integration_utf8.py
│   ├── test_intelligent_fallback.py
│   ├── test_issue_245_simple.py
│   ├── test_large_prompt_handling.py
│   ├── test_line_numbers_integration.py
│   ├── test_listmodels_restrictions.py
│   ├── test_listmodels.py
│   ├── test_mcp_error_handling.py
│   ├── test_model_enumeration.py
│   ├── test_model_metadata_continuation.py
│   ├── test_model_resolution_bug.py
│   ├── test_model_restrictions.py
│   ├── test_o3_pro_output_text_fix.py
│   ├── test_o3_temperature_fix_simple.py
│   ├── test_openai_compatible_token_usage.py
│   ├── test_openai_provider.py
│   ├── test_openrouter_provider.py
│   ├── test_openrouter_registry.py
│   ├── test_parse_model_option.py
│   ├── test_per_tool_model_defaults.py
│   ├── test_pii_sanitizer.py
│   ├── test_pip_detection_fix.py
│   ├── test_planner.py
│   ├── test_precommit_workflow.py
│   ├── test_prompt_regression.py
│   ├── test_prompt_size_limit_bug_fix.py
│   ├── test_provider_retry_logic.py
│   ├── test_provider_routing_bugs.py
│   ├── test_provider_utf8.py
│   ├── test_providers.py
│   ├── test_rate_limit_patterns.py
│   ├── test_refactor.py
│   ├── test_secaudit.py
│   ├── test_server.py
│   ├── test_supported_models_aliases.py
│   ├── test_thinking_modes.py
│   ├── test_tools.py
│   ├── test_tracer.py
│   ├── test_utf8_localization.py
│   ├── test_utils.py
│   ├── test_uvx_resource_packaging.py
│   ├── test_uvx_support.py
│   ├── test_workflow_file_embedding.py
│   ├── test_workflow_metadata.py
│   ├── test_workflow_prompt_size_validation_simple.py
│   ├── test_workflow_utf8.py
│   ├── test_xai_provider.py
│   ├── transport_helpers.py
│   └── triangle.png
├── tools
│   ├── __init__.py
│   ├── analyze.py
│   ├── apilookup.py
│   ├── challenge.py
│   ├── chat.py
│   ├── clink.py
│   ├── codereview.py
│   ├── consensus.py
│   ├── debug.py
│   ├── docgen.py
│   ├── listmodels.py
│   ├── models.py
│   ├── planner.py
│   ├── precommit.py
│   ├── refactor.py
│   ├── secaudit.py
│   ├── shared
│   │   ├── __init__.py
│   │   ├── base_models.py
│   │   ├── base_tool.py
│   │   ├── exceptions.py
│   │   └── schema_builders.py
│   ├── simple
│   │   ├── __init__.py
│   │   └── base.py
│   ├── testgen.py
│   ├── thinkdeep.py
│   ├── tracer.py
│   ├── version.py
│   └── workflow
│       ├── __init__.py
│       ├── base.py
│       ├── schema_builders.py
│       └── workflow_mixin.py
├── utils
│   ├── __init__.py
│   ├── client_info.py
│   ├── conversation_memory.py
│   ├── env.py
│   ├── file_types.py
│   ├── file_utils.py
│   ├── image_utils.py
│   ├── model_context.py
│   ├── model_restrictions.py
│   ├── security_config.py
│   ├── storage_backend.py
│   └── token_utils.py
└── zen-mcp-server
```

# Files

--------------------------------------------------------------------------------
/tools/analyze.py:
--------------------------------------------------------------------------------

```python
"""
AnalyzeWorkflow tool - Step-by-step code analysis with systematic investigation

This tool provides a structured workflow for comprehensive code and file analysis.
It guides the CLI agent through systematic investigation steps with forced pauses between each step
to ensure thorough code examination, pattern identification, and architectural assessment before proceeding.
The tool supports complex analysis scenarios including architectural review, performance analysis,
security assessment, and maintainability evaluation.

Key features:
- Step-by-step analysis workflow with progress tracking
- Context-aware file embedding (references during investigation, full content for analysis)
- Automatic pattern and insight tracking with categorization
- Expert analysis integration with external models
- Support for focused analysis (architecture, performance, security, quality)
- Confidence-based workflow optimization
"""

import logging
from typing import TYPE_CHECKING, Any, Literal, Optional

from pydantic import Field, model_validator

if TYPE_CHECKING:
    from tools.models import ToolModelCategory

from config import TEMPERATURE_ANALYTICAL
from systemprompts import ANALYZE_PROMPT
from tools.shared.base_models import WorkflowRequest

from .workflow.base import WorkflowTool

logger = logging.getLogger(__name__)

# Tool-specific field descriptions for analyze workflow
ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS = {
    "step": (
        "The analysis plan. Step 1: State your strategy, including how you will map the codebase structure, "
        "understand business logic, and assess code quality, performance implications, and architectural patterns. "
        "Later steps: Report findings and adapt the approach as new insights emerge."
    ),
    "step_number": (
        "The index of the current step in the analysis sequence, beginning at 1. Each step should build upon or "
        "revise the previous one."
    ),
    "total_steps": (
        "Your current estimate for how many steps will be needed to complete the analysis. "
        "Adjust as new findings emerge."
    ),
    "next_step_required": (
        "Set to true if you plan to continue the investigation with another step. False means you believe the "
        "analysis is complete and ready for expert validation."
    ),
    "findings": (
        "Summary of discoveries from this step, including architectural patterns, tech stack assessment, scalability characteristics, "
        "performance implications, maintainability factors, and strategic improvement opportunities. "
        "IMPORTANT: Document both strengths (good patterns, solid architecture) and concerns (tech debt, overengineering, unnecessary complexity). "
        "In later steps, confirm or update past findings with additional evidence."
    ),
    "files_checked": (
        "List all files examined (absolute paths). Include even ruled-out files to track exploration path."
    ),
    "relevant_files": (
        "Subset of files_checked directly relevant to analysis findings (absolute paths). Include files with "
        "significant patterns, architectural decisions, or strategic improvement opportunities."
    ),
    "relevant_context": (
        "List methods/functions central to analysis findings, in 'ClassName.methodName' or 'functionName' format. "
        "Prioritize those demonstrating key patterns, architectural decisions, or improvement opportunities."
    ),
    "images": (
        "Optional absolute paths to architecture diagrams or visual references that help with analysis context."
    ),
    "confidence": (
        "Your confidence in the analysis: exploring, low, medium, high, very_high, almost_certain, or certain. "
        "'certain' indicates the analysis is complete and ready for validation."
    ),
    "analysis_type": "Type of analysis to perform (architecture, performance, security, quality, general)",
    "output_format": "How to format the output (summary, detailed, actionable)",
}
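
# Illustrative example (values are hypothetical): a step-1 request consistent
# with the descriptions above might look like:
#
#   {
#       "step": "Map the architecture and assess scalability of the API layer.",
#       "step_number": 1,
#       "total_steps": 3,
#       "next_step_required": True,
#       "findings": "Initial survey of entry points, routing, and data access.",
#       "relevant_files": ["/absolute/path/to/src/api"],
#       "analysis_type": "architecture",
#       "output_format": "actionable",
#   }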


class AnalyzeWorkflowRequest(WorkflowRequest):
    """Request model for analyze workflow investigation steps"""

    # Required fields for each investigation step
    step: str = Field(..., description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["step"])
    step_number: int = Field(..., description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["step_number"])
    total_steps: int = Field(..., description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["total_steps"])
    next_step_required: bool = Field(..., description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["next_step_required"])

    # Investigation tracking fields
    findings: str = Field(..., description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["findings"])
    files_checked: list[str] = Field(
        default_factory=list, description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["files_checked"]
    )
    relevant_files: list[str] = Field(
        default_factory=list, description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["relevant_files"]
    )
    relevant_context: list[str] = Field(
        default_factory=list, description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["relevant_context"]
    )

    # Issues found during analysis (structured with severity)
    issues_found: list[dict] = Field(
        default_factory=list,
        description="Issues or concerns identified during analysis, each with severity level (critical, high, medium, low)",
    )

    # Optional images for visual context
    images: Optional[list[str]] = Field(default=None, description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["images"])

    # Analyze-specific fields (only used in step 1 to initialize)
    # Note: Use relevant_files field instead of files for consistency across workflow tools
    analysis_type: Optional[Literal["architecture", "performance", "security", "quality", "general"]] = Field(
        "general", description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["analysis_type"]
    )
    output_format: Optional[Literal["summary", "detailed", "actionable"]] = Field(
        "detailed", description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["output_format"]
    )

    # Keep thinking_mode from original analyze tool; temperature is inherited from WorkflowRequest

    @model_validator(mode="after")
    def validate_step_one_requirements(self):
        """Ensure step 1 has required relevant_files."""
        if self.step_number == 1:
            if not self.relevant_files:
                raise ValueError("Step 1 requires 'relevant_files' field to specify files or directories to analyze")
        return self


class AnalyzeTool(WorkflowTool):
    """
    Analyze workflow tool for step-by-step code analysis and expert validation.

    This tool implements a structured analysis workflow that guides users through
    methodical investigation steps, ensuring thorough code examination, pattern identification,
    and architectural assessment before reaching conclusions. It supports complex analysis scenarios
    including architectural review, performance analysis, security assessment, and maintainability evaluation.
    """

    def __init__(self):
        super().__init__()
        self.initial_request = None
        self.analysis_config = {}

    def get_name(self) -> str:
        return "analyze"

    def get_description(self) -> str:
        return (
            "Performs comprehensive code analysis with systematic investigation and expert validation. "
            "Use for architecture, performance, maintainability, and pattern analysis. "
            "Guides through structured code review and strategic planning."
        )

    def get_system_prompt(self) -> str:
        return ANALYZE_PROMPT

    def get_default_temperature(self) -> float:
        return TEMPERATURE_ANALYTICAL

    def get_model_category(self) -> "ToolModelCategory":
        """Analyze workflow requires thorough analysis and reasoning"""
        from tools.models import ToolModelCategory

        return ToolModelCategory.EXTENDED_REASONING

    def get_workflow_request_model(self):
        """Return the analyze workflow-specific request model."""
        return AnalyzeWorkflowRequest

    def get_input_schema(self) -> dict[str, Any]:
        """Generate input schema using WorkflowSchemaBuilder with analyze-specific overrides."""
        from .workflow.schema_builders import WorkflowSchemaBuilder

        # Fields to exclude from analyze workflow (inherited from WorkflowRequest but not used)
        excluded_fields = {"hypothesis", "confidence"}

        # Analyze workflow-specific field overrides
        analyze_field_overrides = {
            "step": {
                "type": "string",
                "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["step"],
            },
            "step_number": {
                "type": "integer",
                "minimum": 1,
                "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["step_number"],
            },
            "total_steps": {
                "type": "integer",
                "minimum": 1,
                "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["total_steps"],
            },
            "next_step_required": {
                "type": "boolean",
                "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["next_step_required"],
            },
            "findings": {
                "type": "string",
                "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["findings"],
            },
            "files_checked": {
                "type": "array",
                "items": {"type": "string"},
                "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["files_checked"],
            },
            "relevant_files": {
                "type": "array",
                "items": {"type": "string"},
                "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["relevant_files"],
            },
            "confidence": {
                "type": "string",
                "enum": ["exploring", "low", "medium", "high", "very_high", "almost_certain", "certain"],
                "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["confidence"],
            },
            "images": {
                "type": "array",
                "items": {"type": "string"},
                "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["images"],
            },
            "issues_found": {
                "type": "array",
                "items": {"type": "object"},
                "description": "Issues or concerns identified during analysis, each with severity level (critical, high, medium, low)",
            },
            "analysis_type": {
                "type": "string",
                "enum": ["architecture", "performance", "security", "quality", "general"],
                "default": "general",
                "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["analysis_type"],
            },
            "output_format": {
                "type": "string",
                "enum": ["summary", "detailed", "actionable"],
                "default": "detailed",
                "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["output_format"],
            },
        }

        # Use WorkflowSchemaBuilder with analyze-specific tool fields
        return WorkflowSchemaBuilder.build_schema(
            tool_specific_fields=analyze_field_overrides,
            model_field_schema=self.get_model_field_schema(),
            auto_mode=self.is_effective_auto_mode(),
            tool_name=self.get_name(),
            excluded_workflow_fields=list(excluded_fields),
        )

    def get_required_actions(
        self, step_number: int, confidence: str, findings: str, total_steps: int, request=None
    ) -> list[str]:
        """Define required actions for each investigation phase."""
        if step_number == 1:
            # Initial analysis investigation tasks
            return [
                "Read and understand the code files specified for analysis",
                "Map the tech stack, frameworks, and overall architecture",
                "Identify the main components, modules, and their relationships",
                "Understand the business logic and intended functionality",
                "Examine architectural patterns and design decisions used",
                "Look for strengths, risks, and strategic improvement areas",
            ]
        elif step_number < total_steps:
            # Need deeper investigation
            return [
                "Examine specific architectural patterns and design decisions in detail",
                "Analyze scalability characteristics and performance implications",
                "Assess maintainability factors: module cohesion, coupling, tech debt",
                "Identify security posture and potential systemic vulnerabilities",
                "Look for overengineering, unnecessary complexity, or missing abstractions",
                "Evaluate how well the architecture serves business and scaling goals",
            ]
        else:
            # Close to completion - need final verification
            return [
                "Verify all significant architectural insights have been documented",
                "Confirm strategic improvement opportunities are comprehensively captured",
                "Ensure both strengths and risks are properly identified with evidence",
                "Validate that findings align with the analysis type and goals specified",
                "Check that recommendations are actionable and proportional to the codebase",
                "Confirm the analysis provides clear guidance for strategic decisions",
            ]

    def should_call_expert_analysis(self, consolidated_findings, request=None) -> bool:
        """
        Always call expert analysis for comprehensive validation.

        Analysis benefits from a second opinion to ensure completeness.
        """
        # Check if user explicitly requested to skip assistant model
        if request and not self.get_request_use_assistant_model(request):
            return False

        # For analysis, we always want expert validation if we have any meaningful data
        return len(consolidated_findings.relevant_files) > 0 or len(consolidated_findings.findings) >= 1

    def prepare_expert_analysis_context(self, consolidated_findings) -> str:
        """Prepare context for external model call for final analysis validation."""
        context_parts = [
            f"=== ANALYSIS REQUEST ===\\n{self.initial_request or 'Code analysis workflow initiated'}\\n=== END REQUEST ==="
        ]

        # Add investigation summary
        investigation_summary = self._build_analysis_summary(consolidated_findings)
        context_parts.append(
            f"\\n=== AGENT'S ANALYSIS INVESTIGATION ===\\n{investigation_summary}\\n=== END INVESTIGATION ==="
        )

        # Add analysis configuration context if available
        if self.analysis_config:
            config_text = "\\n".join(f"- {key}: {value}" for key, value in self.analysis_config.items() if value)
            context_parts.append(f"\\n=== ANALYSIS CONFIGURATION ===\\n{config_text}\\n=== END CONFIGURATION ===")

        # Add relevant code elements if available
        if consolidated_findings.relevant_context:
            methods_text = "\\n".join(f"- {method}" for method in consolidated_findings.relevant_context)
            context_parts.append(f"\\n=== RELEVANT CODE ELEMENTS ===\\n{methods_text}\\n=== END CODE ELEMENTS ===")

        # Add assessment evolution if available
        if consolidated_findings.hypotheses:
            assessments_text = "\\n".join(
                f"Step {h['step']}: {h['hypothesis']}" for h in consolidated_findings.hypotheses
            )
            context_parts.append(f"\\n=== ASSESSMENT EVOLUTION ===\\n{assessments_text}\\n=== END ASSESSMENTS ===")

        # Add images if available
        if consolidated_findings.images:
            images_text = "\\n".join(f"- {img}" for img in consolidated_findings.images)
            context_parts.append(
                f"\\n=== VISUAL ANALYSIS INFORMATION ===\\n{images_text}\\n=== END VISUAL INFORMATION ==="
            )

        return "\\n".join(context_parts)

    def _build_analysis_summary(self, consolidated_findings) -> str:
        """Prepare a comprehensive summary of the analysis investigation."""
        summary_parts = [
            "=== SYSTEMATIC ANALYSIS INVESTIGATION SUMMARY ===",
            f"Total steps: {len(consolidated_findings.findings)}",
            f"Files examined: {len(consolidated_findings.files_checked)}",
            f"Relevant files identified: {len(consolidated_findings.relevant_files)}",
            f"Code elements analyzed: {len(consolidated_findings.relevant_context)}",
            "",
            "=== INVESTIGATION PROGRESSION ===",
        ]

        for finding in consolidated_findings.findings:
            summary_parts.append(finding)

        return "\\n".join(summary_parts)

    def should_include_files_in_expert_prompt(self) -> bool:
        """Include files in expert analysis for comprehensive validation."""
        return True

    def should_embed_system_prompt(self) -> bool:
        """Embed system prompt in expert analysis for proper context."""
        return True

    def get_expert_thinking_mode(self) -> str:
        """Use high thinking mode for thorough analysis."""
        return "high"

    def get_expert_analysis_instruction(self) -> str:
        """Get specific instruction for analysis expert validation."""
        return (
            "Please provide comprehensive analysis validation based on the investigation findings. "
            "Focus on identifying any remaining architectural insights, validating the completeness of the analysis, "
            "and providing final strategic recommendations following the structured format specified in the system prompt."
        )

    # Hook method overrides for analyze-specific behavior

    def prepare_step_data(self, request) -> dict:
        """
        Map analyze-specific fields for internal processing.
        """
        step_data = {
            "step": request.step,
            "step_number": request.step_number,
            "findings": request.findings,
            "files_checked": request.files_checked,
            "relevant_files": request.relevant_files,
            "relevant_context": request.relevant_context,
            "issues_found": request.issues_found,  # Analyze workflow uses issues_found for structured problem tracking
            "confidence": "medium",  # Fixed value for workflow compatibility
            "hypothesis": request.findings,  # Map findings to hypothesis for compatibility
            "images": request.images or [],
        }
        return step_data

    def should_skip_expert_analysis(self, request, consolidated_findings) -> bool:
        """
        Analyze workflow always uses expert analysis for comprehensive validation.

        Analysis benefits from a second opinion to ensure completeness and catch
        any missed insights or alternative perspectives.
        """
        return False

    def store_initial_issue(self, step_description: str):
        """Store initial request for expert analysis."""
        self.initial_request = step_description

    # Override inheritance hooks for analyze-specific behavior

    def get_completion_status(self) -> str:
        """Analyze tools use analysis-specific status."""
        return "analysis_complete_ready_for_implementation"

    def get_completion_data_key(self) -> str:
        """Analyze uses 'complete_analysis' key."""
        return "complete_analysis"

    def get_final_analysis_from_request(self, request):
        """Analyze tools use 'findings' field."""
        return request.findings

    def get_confidence_level(self, request) -> str:
        """Analyze tools use fixed confidence for consistency."""
        return "medium"

    def get_completion_message(self) -> str:
        """Analyze-specific completion message."""
        return (
            "Analysis complete. You have identified all significant patterns, "
            "architectural insights, and strategic opportunities. MANDATORY: Present the user with the complete "
            "analysis results organized by strategic impact, and IMMEDIATELY proceed with implementing the "
            "highest priority recommendations or provide specific guidance for improvements. Focus on actionable "
            "strategic insights."
        )

    def get_skip_reason(self) -> str:
        """Analyze-specific skip reason."""
        return "Completed comprehensive analysis locally"

    def get_skip_expert_analysis_status(self) -> str:
        """Analyze-specific expert analysis skip status."""
        return "skipped_due_to_complete_analysis"

    def prepare_work_summary(self) -> str:
        """Analyze-specific work summary."""
        return self._build_analysis_summary(self.consolidated_findings)

    def get_completion_next_steps_message(self, expert_analysis_used: bool = False) -> str:
        """
        Analyze-specific completion message.
        """
        base_message = (
            "ANALYSIS IS COMPLETE. You MUST now summarize and present ALL analysis findings organized by "
            "strategic impact (Critical → High → Medium → Low), specific architectural insights with code references, "
            "and exact recommendations for improvement. Clearly prioritize the top 3 strategic opportunities that need "
            "immediate attention. Provide concrete, actionable guidance for each finding—make it easy for a developer "
            "to understand exactly what strategic improvements to implement and how to approach them."
        )

        # Add expert analysis guidance only when expert analysis was actually used
        if expert_analysis_used:
            expert_guidance = self.get_expert_analysis_guidance()
            if expert_guidance:
                return f"{base_message}\n\n{expert_guidance}"

        return base_message

    def get_expert_analysis_guidance(self) -> str:
        """
        Provide specific guidance for handling expert analysis in code analysis.
        """
        return (
            "IMPORTANT: Analysis from an assistant model has been provided above. You MUST thoughtfully evaluate and validate "
            "the expert insights rather than treating them as definitive conclusions. Cross-reference the expert "
            "analysis with your own systematic investigation, verify that architectural recommendations are "
            "appropriate for this codebase's scale and context, and ensure suggested improvements align with "
            "the project's goals and constraints. Present a comprehensive synthesis that combines your detailed "
            "analysis with validated expert perspectives, clearly distinguishing between patterns you've "
            "independently identified and additional strategic insights from expert validation."
        )

    def get_step_guidance_message(self, request) -> str:
        """
        Analyze-specific step guidance with detailed investigation instructions.
        """
        step_guidance = self.get_analyze_step_guidance(request.step_number, request)
        return step_guidance["next_steps"]

    def get_analyze_step_guidance(self, step_number: int, request) -> dict[str, Any]:
        """
        Provide step-specific guidance for analyze workflow.
        """
        # Generate the next steps instruction based on required actions
        required_actions = self.get_required_actions(step_number, "medium", request.findings, request.total_steps)

        if step_number == 1:
            next_steps = (
                f"MANDATORY: DO NOT call the {self.get_name()} tool again immediately. You MUST first examine "
                f"the code files thoroughly using appropriate tools. CRITICAL AWARENESS: You need to understand "
                f"the architectural patterns, assess scalability and performance characteristics, identify strategic "
                f"improvement areas, and look for systemic risks, overengineering, and missing abstractions. "
                f"Use file reading tools, code analysis, and systematic examination to gather comprehensive information. "
                f"Only call {self.get_name()} again AFTER completing your investigation. When you call "
                f"{self.get_name()} next time, use step_number: {step_number + 1} and report specific "
                f"files examined, architectural insights found, and strategic assessment discoveries."
            )
        elif step_number < request.total_steps:
            next_steps = (
                f"STOP! Do NOT call {self.get_name()} again yet. Based on your findings, you've identified areas that need "
                f"deeper analysis. MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\\n"
                + "\\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
                + f"\\n\\nOnly call {self.get_name()} again with step_number: {step_number + 1} AFTER "
                + "completing these analysis tasks."
            )
        else:
            next_steps = (
                f"WAIT! Your analysis needs final verification. DO NOT call {self.get_name()} immediately. REQUIRED ACTIONS:\\n"
                + "\\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
                + f"\\n\\nREMEMBER: Ensure you have identified all significant architectural insights and strategic "
                f"opportunities across all areas. Document findings with specific file references and "
                f"code examples where applicable, then call {self.get_name()} with step_number: {step_number + 1}."
            )

        return {"next_steps": next_steps}

    def customize_workflow_response(self, response_data: dict, request) -> dict:
        """
        Customize response to match analyze workflow format.
        """
        # Store initial request on first step
        if request.step_number == 1:
            self.initial_request = request.step
            # Store analysis configuration for expert analysis
            if request.relevant_files:
                self.analysis_config = {
                    "relevant_files": request.relevant_files,
                    "analysis_type": request.analysis_type,
                    "output_format": request.output_format,
                }

        # Convert generic status names to analyze-specific ones
        tool_name = self.get_name()
        status_mapping = {
            f"{tool_name}_in_progress": "analysis_in_progress",
            f"pause_for_{tool_name}": "pause_for_analysis",
            f"{tool_name}_required": "analysis_required",
            f"{tool_name}_complete": "analysis_complete",
        }
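        # With get_name() == "analyze", this rewrites e.g. "analyze_in_progress"
        # to "analysis_in_progress" and "pause_for_analyze" to "pause_for_analysis".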

        if response_data["status"] in status_mapping:
            response_data["status"] = status_mapping[response_data["status"]]

        # Rename status field to match analyze workflow
        if f"{tool_name}_status" in response_data:
            response_data["analysis_status"] = response_data.pop(f"{tool_name}_status")
            # Add analyze-specific status fields
            response_data["analysis_status"]["insights_by_severity"] = {}
            for insight in self.consolidated_findings.issues_found:
                severity = insight.get("severity", "unknown")
                if severity not in response_data["analysis_status"]["insights_by_severity"]:
                    response_data["analysis_status"]["insights_by_severity"][severity] = 0
                response_data["analysis_status"]["insights_by_severity"][severity] += 1
            response_data["analysis_status"]["analysis_confidence"] = self.get_request_confidence(request)

        # Map complete_analyze to complete_analysis
        if f"complete_{tool_name}" in response_data:
            response_data["complete_analysis"] = response_data.pop(f"complete_{tool_name}")

        # Map the completion flag to match analyze workflow
        if f"{tool_name}_complete" in response_data:
            response_data["analysis_complete"] = response_data.pop(f"{tool_name}_complete")

        return response_data

    # Required abstract methods from BaseTool
    def get_request_model(self):
        """Return the analyze workflow-specific request model."""
        return AnalyzeWorkflowRequest

    async def prepare_prompt(self, request) -> str:
        """Not used - workflow tools use execute_workflow()."""
        return ""  # Workflow tools use execute_workflow() directly

```
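
The request model's main addition on top of `WorkflowRequest` is the step-1 guard in `validate_step_one_requirements`. Below is a minimal, self-contained sketch of that pattern, assuming only pydantic v2 is installed; the field names mirror `AnalyzeWorkflowRequest`, while the inherited `WorkflowRequest` fields and the analyze-specific options are omitted for brevity.

```python
from pydantic import BaseModel, Field, ValidationError, model_validator


class StepOneRequestSketch(BaseModel):
    """Condensed stand-in for AnalyzeWorkflowRequest (illustration only)."""

    step: str
    step_number: int
    total_steps: int
    next_step_required: bool
    findings: str
    relevant_files: list[str] = Field(default_factory=list)

    @model_validator(mode="after")
    def validate_step_one_requirements(self):
        # Same rule as the real model: step 1 must name files or directories to analyze.
        if self.step_number == 1 and not self.relevant_files:
            raise ValueError("Step 1 requires 'relevant_files' field to specify files or directories to analyze")
        return self


try:
    StepOneRequestSketch(
        step="Map the module structure and assess coupling.",
        step_number=1,
        total_steps=3,
        next_step_required=True,
        findings="Initial survey of the package layout.",
    )
except ValidationError as exc:
    print(exc)  # rejected: step 1 was submitted without relevant_files
```

Later steps (`step_number` > 1) pass without `relevant_files`, which matches the workflow above: the initial file set is only required when the investigation starts.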

--------------------------------------------------------------------------------
/simulator_tests/test_planner_validation.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
PlannerWorkflow Tool Validation Test

Tests the planner tool's capabilities using the new workflow architecture.
This validates that the new workflow-based implementation maintains all the
functionality of the original planner tool while using the workflow pattern
like the debug tool.
"""

import json
from typing import Optional

from .conversation_base_test import ConversationBaseTest


class PlannerValidationTest(ConversationBaseTest):
    """Test planner tool with new workflow architecture"""

    @property
    def test_name(self) -> str:
        return "planner_validation"

    @property
    def test_description(self) -> str:
        return "PlannerWorkflow tool validation with new workflow architecture"

    def run_test(self) -> bool:
        """Test planner tool capabilities"""
        # Set up the test environment
        self.setUp()

        try:
            self.logger.info("Test: PlannerWorkflow tool validation (new architecture)")

            # Test 1: Single planning session with workflow architecture
            if not self._test_single_planning_session():
                return False

            # Test 2: Planning with continuation using workflow
            if not self._test_planning_with_continuation():
                return False

            # Test 3: Complex plan with deep thinking pauses
            if not self._test_complex_plan_deep_thinking():
                return False

            # Test 4: Self-contained completion (no expert analysis)
            if not self._test_self_contained_completion():
                return False

            # Test 5: Branching and revision with workflow
            if not self._test_branching_and_revision():
                return False

            # Test 6: Workflow file context behavior
            if not self._test_workflow_file_context():
                return False

            self.logger.info("  ✅ All planner validation tests passed")
            return True

        except Exception as e:
            self.logger.error(f"PlannerWorkflow validation test failed: {e}")
            return False

    def _test_single_planning_session(self) -> bool:
        """Test a complete planning session with workflow architecture"""
        try:
            self.logger.info("  1.1: Testing single planning session with workflow")

            # Step 1: Start planning
            self.logger.info("    1.1.1: Step 1 - Initial planning step")
            response1, continuation_id = self.call_mcp_tool(
                "planner",
                {
                    "step": "I need to plan a comprehensive API redesign for our legacy system. Let me start by analyzing the current state and identifying key requirements for the new API architecture.",
                    "step_number": 1,
                    "total_steps": 4,
                    "next_step_required": True,
                    "model": "flash",
                },
            )

            if not response1 or not continuation_id:
                self.logger.error("Failed to get initial planning response")
                return False

            # Parse and validate JSON response
            response1_data = self._parse_planner_response(response1)
            if not response1_data:
                return False

            # Validate step 1 response structure - expect pause_for_planner for next_step_required=True
            if not self._validate_step_response(response1_data, 1, 4, True, "pause_for_planner"):
                return False

            # Debug: Log the actual response structure to see what we're getting
            self.logger.debug(f"Response structure: {list(response1_data.keys())}")

            # Check workflow-specific response structure (more flexible)
            status_key = None
            for key in response1_data.keys():
                if key.endswith("_status"):
                    status_key = key
                    break

            if not status_key:
                self.logger.error(f"Missing workflow status field in response: {list(response1_data.keys())}")
                return False

            self.logger.debug(f"Found status field: {status_key}")

            # Check required_actions for workflow guidance
            if not response1_data.get("required_actions"):
                self.logger.error("Missing required_actions in workflow response")
                return False

            self.logger.info(f"    ✅ Step 1 successful with workflow, continuation_id: {continuation_id}")

            # Step 2: Continue planning
            self.logger.info("    1.1.2: Step 2 - API domain analysis")
            response2, _ = self.call_mcp_tool(
                "planner",
                {
                    "step": "After analyzing the current API, I can identify three main domains: User Management, Content Management, and Analytics. Let me design the new API structure with RESTful endpoints and proper versioning.",
                    "step_number": 2,
                    "total_steps": 4,
                    "next_step_required": True,
                    "continuation_id": continuation_id,
                    "model": "flash",
                },
            )

            if not response2:
                self.logger.error("Failed to continue planning to step 2")
                return False

            response2_data = self._parse_planner_response(response2)
            if not self._validate_step_response(response2_data, 2, 4, True, "pause_for_planner"):
                return False

            # Check step history tracking in workflow (more flexible)
            status_key = None
            for key in response2_data.keys():
                if key.endswith("_status"):
                    status_key = key
                    break

            if status_key:
                workflow_status = response2_data.get(status_key, {})
                step_history_length = workflow_status.get("step_history_length", 0)
                if step_history_length < 2:
                    self.logger.error(f"Step history not properly tracked in workflow: {step_history_length}")
                    return False
                self.logger.debug(f"Step history length: {step_history_length}")
            else:
                self.logger.warning("No workflow status found, skipping step history check")

            self.logger.info("    ✅ Step 2 successful with workflow tracking")

            # Step 3: Final step - should trigger completion
            self.logger.info("    1.1.3: Step 3 - Final planning step")
            response3, _ = self.call_mcp_tool(
                "planner",
                {
                    "step": "API redesign plan complete: Phase 1 - User Management API, Phase 2 - Content Management API, Phase 3 - Analytics API. Each phase includes proper authentication, rate limiting, and comprehensive documentation.",
                    "step_number": 3,
                    "total_steps": 3,  # Adjusted total
                    "next_step_required": False,  # Final step - should complete without expert analysis
                    "continuation_id": continuation_id,
                    "model": "flash",
                },
            )

            if not response3:
                self.logger.error("Failed to complete planning session")
                return False

            response3_data = self._parse_planner_response(response3)
            if not response3_data:
                return False

            # Validate final response structure - should be self-contained completion
            if response3_data.get("status") != "planner_complete":
                self.logger.error(f"Expected status 'planner_complete', got '{response3_data.get('status')}'")
                return False

            if not response3_data.get("planning_complete"):
                self.logger.error("Expected planning_complete=true for final step")
                return False

            # Should NOT have expert_analysis (self-contained)
            if "expert_analysis" in response3_data:
                self.logger.error("PlannerWorkflow should be self-contained without expert analysis")
                return False

            # Check plan_summary exists
            if not response3_data.get("plan_summary"):
                self.logger.error("Missing plan_summary in final step")
                return False

            self.logger.info("    ✅ Planning session completed successfully with workflow architecture")

            # Store continuation_id for next test
            self.api_continuation_id = continuation_id
            return True

        except Exception as e:
            self.logger.error(f"Single planning session test failed: {e}")
            return False

    def _test_planning_with_continuation(self) -> bool:
        """Test planning continuation with workflow architecture"""
        try:
            self.logger.info("  1.2: Testing planning continuation with workflow")

            # Use continuation from previous test if available
            continuation_id = getattr(self, "api_continuation_id", None)
            if not continuation_id:
                # Start fresh if no continuation available
                self.logger.info("    1.2.0: Starting fresh planning session")
                response0, continuation_id = self.call_mcp_tool(
                    "planner",
                    {
                        "step": "Planning API security strategy",
                        "step_number": 1,
                        "total_steps": 2,
                        "next_step_required": True,
                        "model": "flash",
                    },
                )
                if not response0 or not continuation_id:
                    self.logger.error("Failed to start fresh planning session")
                    return False

            # Test continuation step
            self.logger.info("    1.2.1: Continue planning session")
            response1, _ = self.call_mcp_tool(
                "planner",
                {
                    "step": "Building on the API redesign, let me now plan the security implementation with OAuth 2.0, API keys, and rate limiting strategies.",
                    "step_number": 2,
                    "total_steps": 2,
                    "next_step_required": True,
                    "continuation_id": continuation_id,
                    "model": "flash",
                },
            )

            if not response1:
                self.logger.error("Failed to continue planning")
                return False

            response1_data = self._parse_planner_response(response1)
            if not response1_data:
                return False

            # Validate continuation behavior
            if not self._validate_step_response(response1_data, 2, 2, True, "pause_for_planner"):
                return False

            # Check that continuation_id is preserved
            if response1_data.get("continuation_id") != continuation_id:
                self.logger.error("Continuation ID not preserved in workflow")
                return False

            self.logger.info("    ✅ Planning continuation working with workflow")
            return True

        except Exception as e:
            self.logger.error(f"Planning continuation test failed: {e}")
            return False

    def _test_complex_plan_deep_thinking(self) -> bool:
        """Test complex plan with deep thinking pauses"""
        try:
            self.logger.info("  1.3: Testing complex plan with deep thinking pauses")

            # Start complex plan (≥5 steps) - should trigger deep thinking
            self.logger.info("    1.3.1: Step 1 of complex plan (should trigger deep thinking)")
            response1, continuation_id = self.call_mcp_tool(
                "planner",
                {
                    "step": "I need to plan a complete digital transformation for our enterprise organization, including cloud migration, process automation, and cultural change management.",
                    "step_number": 1,
                    "total_steps": 8,  # Complex plan ≥5 steps
                    "next_step_required": True,
                    "model": "flash",
                },
            )

            if not response1 or not continuation_id:
                self.logger.error("Failed to start complex planning")
                return False

            response1_data = self._parse_planner_response(response1)
            if not response1_data:
                return False

            # Should trigger deep thinking pause for complex plan
            if response1_data.get("status") != "pause_for_deep_thinking":
                self.logger.error("Expected deep thinking pause for complex plan step 1")
                return False

            if not response1_data.get("thinking_required"):
                self.logger.error("Expected thinking_required=true for complex plan")
                return False

            # Check required thinking actions
            required_thinking = response1_data.get("required_thinking", [])
            if len(required_thinking) < 4:
                self.logger.error("Expected comprehensive thinking requirements for complex plan")
                return False

            # Check for deep thinking guidance in next_steps
            next_steps = response1_data.get("next_steps", "")
            if "MANDATORY" not in next_steps or "deep thinking" not in next_steps.lower():
                self.logger.error("Expected mandatory deep thinking guidance")
                return False

            self.logger.info("    ✅ Complex plan step 1 correctly triggered deep thinking pause")

            # Step 2 of complex plan - should also trigger deep thinking
            self.logger.info("    1.3.2: Step 2 of complex plan (should trigger deep thinking)")
            response2, _ = self.call_mcp_tool(
                "planner",
                {
                    "step": "After deep analysis, I can see this transformation requires three parallel tracks: Technical Infrastructure, Business Process, and Human Capital. Let me design the coordination strategy.",
                    "step_number": 2,
                    "total_steps": 8,
                    "next_step_required": True,
                    "continuation_id": continuation_id,
                    "model": "flash",
                },
            )

            if not response2:
                self.logger.error("Failed to continue complex planning")
                return False

            response2_data = self._parse_planner_response(response2)
            if not response2_data:
                return False

            # Step 2 should also trigger deep thinking for complex plans
            if response2_data.get("status") != "pause_for_deep_thinking":
                self.logger.error("Expected deep thinking pause for complex plan step 2")
                return False

            self.logger.info("    ✅ Complex plan step 2 correctly triggered deep thinking pause")

            # Step 4 of complex plan - should use normal flow (after step 3)
            self.logger.info("    1.3.3: Step 4 of complex plan (should use normal flow)")
            response4, _ = self.call_mcp_tool(
                "planner",
                {
                    "step": "Now moving to tactical planning: Phase 1 execution details with specific timelines and resource allocation for the technical infrastructure track.",
                    "step_number": 4,
                    "total_steps": 8,
                    "next_step_required": True,
                    "continuation_id": continuation_id,
                    "model": "flash",
                },
            )

            if not response4:
                self.logger.error("Failed to continue to step 4")
                return False

            response4_data = self._parse_planner_response(response4)
            if not response4_data:
                return False

            # Step 4 should use normal flow (no more deep thinking pauses)
            if response4_data.get("status") != "pause_for_planner":
                self.logger.error("Expected normal planning flow for step 4")
                return False

            if response4_data.get("thinking_required"):
                self.logger.error("Step 4 should not require special thinking pause")
                return False

            self.logger.info("    ✅ Complex plan transitions to normal flow after step 3")
            return True

        except Exception as e:
            self.logger.error(f"Complex plan deep thinking test failed: {e}")
            return False

    def _test_self_contained_completion(self) -> bool:
        """Test self-contained completion without expert analysis"""
        try:
            self.logger.info("  1.4: Testing self-contained completion")

            # Simple planning session that should complete without expert analysis
            self.logger.info("    1.4.1: Simple planning session")
            response1, continuation_id = self.call_mcp_tool(
                "planner",
                {
                    "step": "Planning a simple website redesign with new color scheme and improved navigation.",
                    "step_number": 1,
                    "total_steps": 2,
                    "next_step_required": True,
                    "model": "flash",
                },
            )

            if not response1 or not continuation_id:
                self.logger.error("Failed to start simple planning")
                return False

            # Final step - should complete without expert analysis
            self.logger.info("    1.4.2: Final step - self-contained completion")
            response2, _ = self.call_mcp_tool(
                "planner",
                {
                    "step": "Website redesign plan complete: Phase 1 - Update color palette and typography, Phase 2 - Redesign navigation structure and user flows.",
                    "step_number": 2,
                    "total_steps": 2,
                    "next_step_required": False,  # Final step
                    "continuation_id": continuation_id,
                    "model": "flash",
                },
            )

            if not response2:
                self.logger.error("Failed to complete simple planning")
                return False

            response2_data = self._parse_planner_response(response2)
            if not response2_data:
                return False

            # Validate self-contained completion
            if response2_data.get("status") != "planner_complete":
                self.logger.error("Expected self-contained completion status")
                return False

            # Should NOT call expert analysis
            if "expert_analysis" in response2_data:
                self.logger.error("PlannerWorkflow should not call expert analysis")
                return False

            # Should have planning_complete flag
            if not response2_data.get("planning_complete"):
                self.logger.error("Expected planning_complete=true")
                return False

            # Should have plan_summary
            if not response2_data.get("plan_summary"):
                self.logger.error("Expected plan_summary in completion")
                return False

            # Check completion instructions
            output = response2_data.get("output", {})
            if not output.get("instructions"):
                self.logger.error("Missing output instructions for plan presentation")
                return False

            self.logger.info("    ✅ Self-contained completion working correctly")
            return True

        except Exception as e:
            self.logger.error(f"Self-contained completion test failed: {e}")
            return False

    def _test_branching_and_revision(self) -> bool:
        """Test branching and revision with workflow architecture"""
        try:
            self.logger.info("  1.5: Testing branching and revision with workflow")

            # Start planning session for branching test
            self.logger.info("    1.5.1: Start planning for branching test")
            response1, continuation_id = self.call_mcp_tool(
                "planner",
                {
                    "step": "Planning mobile app development strategy with different technology options to evaluate.",
                    "step_number": 1,
                    "total_steps": 4,
                    "next_step_required": True,
                    "model": "flash",
                },
            )

            if not response1 or not continuation_id:
                self.logger.error("Failed to start branching test")
                return False

            # Create branch
            self.logger.info("    1.5.2: Create branch for React Native approach")
            response2, _ = self.call_mcp_tool(
                "planner",
                {
                    "step": "Branch A: React Native approach - cross-platform development with shared codebase, faster development cycle, and consistent UI across platforms.",
                    "step_number": 2,
                    "total_steps": 4,
                    "next_step_required": True,
                    "is_branch_point": True,
                    "branch_from_step": 1,
                    "branch_id": "react-native",
                    "continuation_id": continuation_id,
                    "model": "flash",
                },
            )

            if not response2:
                self.logger.error("Failed to create branch")
                return False

            response2_data = self._parse_planner_response(response2)
            if not response2_data:
                return False

            # Validate branching in workflow
            metadata = response2_data.get("metadata", {})
            if not metadata.get("is_branch_point"):
                self.logger.error("Branch point not recorded in workflow")
                return False

            if metadata.get("branch_id") != "react-native":
                self.logger.error("Branch ID not properly recorded")
                return False

            if "react-native" not in metadata.get("branches", []):
                self.logger.error("Branch not added to branches list")
                return False

            self.logger.info("    ✅ Branching working with workflow architecture")

            # Test revision
            self.logger.info("    1.5.3: Test revision capability")
            response3, _ = self.call_mcp_tool(
                "planner",
                {
                    "step": "Revision of step 2: After consideration, let me revise the React Native approach to include performance optimizations and native module integration for critical features.",
                    "step_number": 3,
                    "total_steps": 4,
                    "next_step_required": True,
                    "is_step_revision": True,
                    "revises_step_number": 2,
                    "continuation_id": continuation_id,
                    "model": "flash",
                },
            )

            if not response3:
                self.logger.error("Failed to create revision")
                return False

            response3_data = self._parse_planner_response(response3)
            if not response3_data:
                return False

            # Validate revision in workflow
            metadata = response3_data.get("metadata", {})
            if not metadata.get("is_step_revision"):
                self.logger.error("Step revision not recorded in workflow")
                return False

            if metadata.get("revises_step_number") != 2:
                self.logger.error("Revised step number not properly recorded")
                return False

            self.logger.info("    ✅ Revision working with workflow architecture")
            return True

        except Exception as e:
            self.logger.error(f"Branching and revision test failed: {e}")
            return False

    def _test_workflow_file_context(self) -> bool:
        """Test workflow file context behavior (should be minimal for planner)"""
        try:
            self.logger.info("  1.6: Testing workflow file context behavior")

            # Planner typically doesn't use files, but verify the workflow handles this correctly
            self.logger.info("    1.6.1: Planning step with no files (normal case)")
            response1, continuation_id = self.call_mcp_tool(
                "planner",
                {
                    "step": "Planning data architecture for analytics platform.",
                    "step_number": 1,
                    "total_steps": 2,
                    "next_step_required": True,
                    "model": "flash",
                },
            )

            if not response1 or not continuation_id:
                self.logger.error("Failed to start workflow file context test")
                return False

            response1_data = self._parse_planner_response(response1)
            if not response1_data:
                return False

            # Planner workflow should not have file_context since it doesn't use files
            if "file_context" in response1_data:
                self.logger.info("    ℹ️ Workflow file context present but should be minimal for planner")

            # Final step
            self.logger.info("    1.6.2: Final step (should complete without file embedding)")
            response2, _ = self.call_mcp_tool(
                "planner",
                {
                    "step": "Data architecture plan complete with data lakes, processing pipelines, and analytics layers.",
                    "step_number": 2,
                    "total_steps": 2,
                    "next_step_required": False,
                    "continuation_id": continuation_id,
                    "model": "flash",
                },
            )

            if not response2:
                self.logger.error("Failed to complete workflow file context test")
                return False

            response2_data = self._parse_planner_response(response2)
            if not response2_data:
                return False

            # Final step should complete self-contained
            if response2_data.get("status") != "planner_complete":
                self.logger.error("Expected self-contained completion for planner workflow")
                return False

            self.logger.info("    ✅ Workflow file context behavior appropriate for planner")
            return True

        except Exception as e:
            self.logger.error(f"Workflow file context test failed: {e}")
            return False

    def call_mcp_tool(self, tool_name: str, params: dict) -> tuple[Optional[str], Optional[str]]:
        """Call an MCP tool in-process - override for planner-specific response handling"""
        # Use in-process implementation to maintain conversation memory
        response_text, _ = self.call_mcp_tool_direct(tool_name, params)

        if not response_text:
            return None, None

        # Extract continuation_id from planner response specifically
        continuation_id = self._extract_planner_continuation_id(response_text)

        return response_text, continuation_id

    def _extract_planner_continuation_id(self, response_text: str) -> Optional[str]:
        """Extract continuation_id from planner response"""
        try:
            # Parse the response
            response_data = json.loads(response_text)
            return response_data.get("continuation_id")

        except json.JSONDecodeError as e:
            self.logger.debug(f"Failed to parse response for planner continuation_id: {e}")
            return None

    def _parse_planner_response(self, response_text: str) -> dict:
        """Parse planner tool JSON response"""
        try:
            # Parse the response - it should be direct JSON
            return json.loads(response_text)

        except json.JSONDecodeError as e:
            self.logger.error(f"Failed to parse planner response as JSON: {e}")
            self.logger.error(f"Response text: {response_text[:500]}...")
            return {}

    def _validate_step_response(
        self,
        response_data: dict,
        expected_step: int,
        expected_total: int,
        expected_next_required: bool,
        expected_status: str,
    ) -> bool:
        """Validate a planner step response structure"""
        try:
            # Check status
            if response_data.get("status") != expected_status:
                self.logger.error(f"Expected status '{expected_status}', got '{response_data.get('status')}'")
                return False

            # Check step number
            if response_data.get("step_number") != expected_step:
                self.logger.error(f"Expected step_number {expected_step}, got {response_data.get('step_number')}")
                return False

            # Check total steps
            if response_data.get("total_steps") != expected_total:
                self.logger.error(f"Expected total_steps {expected_total}, got {response_data.get('total_steps')}")
                return False

            # Check next_step_required
            if response_data.get("next_step_required") != expected_next_required:
                self.logger.error(
                    f"Expected next_step_required {expected_next_required}, got {response_data.get('next_step_required')}"
                )
                return False

            # Check step_content exists
            if not response_data.get("step_content"):
                self.logger.error("Missing step_content in response")
                return False

            # Check next_steps guidance
            if not response_data.get("next_steps"):
                self.logger.error("Missing next_steps guidance in response")
                return False

            return True

        except Exception as e:
            self.logger.error(f"Error validating step response: {e}")
            return False

```
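
For reference, the helper methods above (`_parse_planner_response`, `_validate_step_response`) treat each planner step as plain JSON. A minimal sketch of the two response shapes these tests assert on, using only the field names checked above (values are illustrative, not verbatim tool output):

```python
# Intermediate step: the planner pauses and hands control back to the agent.
intermediate_step = {
    "status": "pause_for_planner",
    "step_number": 2,
    "total_steps": 4,
    "next_step_required": True,
    "step_content": "Branch A: React Native approach ...",
    "next_steps": "Think through the branch, then call planner with step 3 ...",
    "continuation_id": "12345678-1234-1234-1234-123456789012",  # reused on the next call
    "metadata": {"is_branch_point": True, "branch_id": "react-native", "branches": ["react-native"]},
}

# Final step: self-contained completion, no expert analysis involved.
final_step = {
    "status": "planner_complete",
    "planning_complete": True,
    "plan_summary": "Website redesign plan: new palette, typography, navigation ...",
    "output": {"instructions": "Present the complete plan to the user ..."},
}
```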

--------------------------------------------------------------------------------
/tools/docgen.py:
--------------------------------------------------------------------------------

```python
"""
Documentation Generation tool - Automated code documentation with complexity analysis

This tool provides a structured workflow for adding comprehensive documentation to codebases.
It guides you through systematic code analysis to generate modern documentation with:
- Function/method parameter documentation
- Big O complexity analysis
- Call flow and dependency documentation
- Inline comments for complex logic
- Smart updating of existing documentation

Key features:
- Step-by-step documentation workflow with progress tracking
- Context-aware file embedding (references during analysis, full content for documentation)
- Automatic conversation threading and history preservation
- Expert analysis integration with external models
- Support for multiple programming languages and documentation styles
- Configurable documentation features via parameters
"""

import logging
from typing import TYPE_CHECKING, Any, Optional

from pydantic import Field

if TYPE_CHECKING:
    from tools.models import ToolModelCategory

from config import TEMPERATURE_ANALYTICAL
from systemprompts import DOCGEN_PROMPT
from tools.shared.base_models import WorkflowRequest

from .workflow.base import WorkflowTool

logger = logging.getLogger(__name__)

# Tool-specific field descriptions for documentation generation
DOCGEN_FIELD_DESCRIPTIONS = {
    "step": (
        "Step 1 (Discovery): list every file that needs documentation and record the total. Do not write docs yet. "
        "Steps 2+: document exactly one file per step. Never change code logic; log bugs separately. Keep the counters accurate."
    ),
    "step_number": "Current documentation step (starts at 1).",
    "total_steps": "1 discovery step + one step per file documented (tracks via `total_files_to_document`).",
    "next_step_required": "True while more files still need documentation; False once everything is complete.",
    "findings": "Summarize documentation gaps, complexity, call flows, and well-documented areas. Stop and report immediately if you uncover a bug.",
    "relevant_files": "Absolute paths for the file(s) you are documenting this step—stick to a single file per step.",
    "relevant_context": "Functions or methods needing documentation (e.g. 'Class.method', 'function_name'), especially complex or user-facing areas.",
    "num_files_documented": "Count of files finished so far. Increment only when a file is fully documented.",
    "total_files_to_document": "Total files identified in discovery; completion requires matching this count.",
    "document_complexity": "Include algorithmic complexity (Big O) analysis when True (default).",
    "document_flow": "Include call flow/dependency notes when True (default).",
    "update_existing": "True (default) to polish inaccurate or outdated docs instead of leaving them untouched.",
    "comments_on_complex_logic": "True (default) to add inline comments around non-obvious logic.",
}


class DocgenRequest(WorkflowRequest):
    """Request model for documentation generation steps"""

    # Required workflow fields
    step: str = Field(..., description=DOCGEN_FIELD_DESCRIPTIONS["step"])
    step_number: int = Field(..., description=DOCGEN_FIELD_DESCRIPTIONS["step_number"])
    total_steps: int = Field(..., description=DOCGEN_FIELD_DESCRIPTIONS["total_steps"])
    next_step_required: bool = Field(..., description=DOCGEN_FIELD_DESCRIPTIONS["next_step_required"])

    # Documentation analysis tracking fields
    findings: str = Field(..., description=DOCGEN_FIELD_DESCRIPTIONS["findings"])
    relevant_files: list[str] = Field(default_factory=list, description=DOCGEN_FIELD_DESCRIPTIONS["relevant_files"])
    relevant_context: list[str] = Field(default_factory=list, description=DOCGEN_FIELD_DESCRIPTIONS["relevant_context"])

    # Critical completion tracking counters
    num_files_documented: int = Field(0, description=DOCGEN_FIELD_DESCRIPTIONS["num_files_documented"])
    total_files_to_document: int = Field(0, description=DOCGEN_FIELD_DESCRIPTIONS["total_files_to_document"])

    # Documentation generation configuration parameters
    document_complexity: Optional[bool] = Field(True, description=DOCGEN_FIELD_DESCRIPTIONS["document_complexity"])
    document_flow: Optional[bool] = Field(True, description=DOCGEN_FIELD_DESCRIPTIONS["document_flow"])
    update_existing: Optional[bool] = Field(True, description=DOCGEN_FIELD_DESCRIPTIONS["update_existing"])
    comments_on_complex_logic: Optional[bool] = Field(
        True, description=DOCGEN_FIELD_DESCRIPTIONS["comments_on_complex_logic"]
    )


class DocgenTool(WorkflowTool):
    """
    Documentation generation tool for automated code documentation with complexity analysis.

    This tool implements a structured documentation workflow that guides users through
    methodical code analysis to generate comprehensive documentation including:
    - Function/method signatures and parameter descriptions
    - Algorithmic complexity (Big O) analysis
    - Call flow and dependency documentation
    - Inline comments for complex logic
    - Modern documentation style appropriate for the language/platform
    """

    def __init__(self):
        super().__init__()
        self.initial_request = None

    def get_name(self) -> str:
        return "docgen"

    def get_description(self) -> str:
        return (
            "Generates comprehensive code documentation with systematic analysis of functions, classes, and complexity. "
            "Use for documentation generation, code analysis, complexity assessment, and API documentation. "
            "Analyzes code structure and patterns to create thorough documentation."
        )

    def get_system_prompt(self) -> str:
        return DOCGEN_PROMPT

    def get_default_temperature(self) -> float:
        return TEMPERATURE_ANALYTICAL

    def get_model_category(self) -> "ToolModelCategory":
        """Docgen requires analytical and reasoning capabilities"""
        from tools.models import ToolModelCategory

        return ToolModelCategory.EXTENDED_REASONING

    def requires_model(self) -> bool:
        """
        Docgen tool doesn't require model resolution at the MCP boundary.

        The docgen tool is a self-contained workflow tool that guides the CLI agent through
        systematic documentation generation without calling external AI models.

        Returns:
            bool: False - docgen doesn't need external AI model access
        """
        return False

    def requires_expert_analysis(self) -> bool:
        """Docgen is self-contained and doesn't need expert analysis."""
        return False

    def get_workflow_request_model(self):
        """Return the docgen-specific request model."""
        return DocgenRequest

    def get_tool_fields(self) -> dict[str, dict[str, Any]]:
        """Return the tool-specific fields for docgen."""
        return {
            "document_complexity": {
                "type": "boolean",
                "default": True,
                "description": DOCGEN_FIELD_DESCRIPTIONS["document_complexity"],
            },
            "document_flow": {
                "type": "boolean",
                "default": True,
                "description": DOCGEN_FIELD_DESCRIPTIONS["document_flow"],
            },
            "update_existing": {
                "type": "boolean",
                "default": True,
                "description": DOCGEN_FIELD_DESCRIPTIONS["update_existing"],
            },
            "comments_on_complex_logic": {
                "type": "boolean",
                "default": True,
                "description": DOCGEN_FIELD_DESCRIPTIONS["comments_on_complex_logic"],
            },
            "num_files_documented": {
                "type": "integer",
                "default": 0,
                "minimum": 0,
                "description": DOCGEN_FIELD_DESCRIPTIONS["num_files_documented"],
            },
            "total_files_to_document": {
                "type": "integer",
                "default": 0,
                "minimum": 0,
                "description": DOCGEN_FIELD_DESCRIPTIONS["total_files_to_document"],
            },
        }

    def get_required_fields(self) -> list[str]:
        """Return additional required fields beyond the standard workflow requirements."""
        return [
            "document_complexity",
            "document_flow",
            "update_existing",
            "comments_on_complex_logic",
            "num_files_documented",
            "total_files_to_document",
        ]

    def get_input_schema(self) -> dict[str, Any]:
        """Generate input schema using WorkflowSchemaBuilder with field exclusions."""
        from .workflow.schema_builders import WorkflowSchemaBuilder

        # Exclude workflow fields that documentation generation doesn't need
        excluded_workflow_fields = [
            "confidence",  # Documentation doesn't use confidence levels
            "hypothesis",  # Documentation doesn't use hypothesis
            "files_checked",  # Documentation uses doc_files and doc_methods instead for better tracking
        ]

        # Exclude common fields that documentation generation doesn't need
        excluded_common_fields = [
            "model",  # Documentation doesn't need external model selection
            "temperature",  # Documentation doesn't need temperature control
            "thinking_mode",  # Documentation doesn't need thinking mode
            "images",  # Documentation doesn't use images
        ]

        return WorkflowSchemaBuilder.build_schema(
            tool_specific_fields=self.get_tool_fields(),
            required_fields=self.get_required_fields(),  # Include docgen-specific required fields
            model_field_schema=None,  # Exclude model field - docgen doesn't need external model selection
            auto_mode=False,  # Force non-auto mode to prevent model field addition
            tool_name=self.get_name(),
            excluded_workflow_fields=excluded_workflow_fields,
            excluded_common_fields=excluded_common_fields,
        )

    def get_required_actions(
        self, step_number: int, confidence: str, findings: str, total_steps: int, request=None
    ) -> list[str]:
        """Define required actions for comprehensive documentation analysis with step-by-step file focus."""
        if step_number == 1:
            # Initial discovery ONLY - no documentation yet
            return [
                "CRITICAL: DO NOT ALTER ANY CODE LOGIC! Only add documentation (docstrings, comments)",
                "Discover ALL files in the current directory (not nested) that need documentation",
                "COUNT the exact number of files that need documentation",
                "LIST all the files you found that need documentation by name",
                "IDENTIFY the programming language(s) to use MODERN documentation style (/// for Objective-C, /** */ for Java/JavaScript, etc.)",
                "DO NOT start documenting any files yet - this is discovery phase only",
                "Report the total count and file list clearly to the user",
                "IMMEDIATELY call docgen step 2 after discovery to begin documentation phase",
                "WHEN CALLING DOCGEN step 2: Set total_files_to_document to the exact count you found",
                "WHEN CALLING DOCGEN step 2: Set num_files_documented to 0 (haven't started yet)",
            ]
        elif step_number == 2:
            # Start documentation phase with first file
            return [
                "CRITICAL: DO NOT ALTER ANY CODE LOGIC! Only add documentation (docstrings, comments)",
                "Choose the FIRST file from your discovered list to start documentation",
                "For the chosen file: identify ALL functions, classes, and methods within it",
                'USE MODERN documentation style for the programming language (/// for Objective-C, /** */ for Java/JavaScript, """ for Python, etc.)',
                "Document ALL functions/methods in the chosen file - don't skip any - DOCUMENTATION ONLY",
                "When file is 100% documented, increment num_files_documented from 0 to 1",
                "Note any dependencies this file has (what it imports/calls) and what calls into it",
                "CRITICAL: If you find ANY bugs/logic errors, STOP documenting and report to user immediately",
                "Report which specific functions you documented in this step for accountability",
                "Report progress: num_files_documented (1) out of total_files_to_document",
            ]
        elif step_number <= 4:
            # Continue with focused file-by-file approach
            return [
                "CRITICAL: DO NOT ALTER ANY CODE LOGIC! Only add documentation (docstrings, comments)",
                "Choose the NEXT undocumented file from your discovered list",
                "For the chosen file: identify ALL functions, classes, and methods within it",
                "USE MODERN documentation style for the programming language (NEVER use legacy /* */ style for languages with modern alternatives)",
                "Document ALL functions/methods in the chosen file - don't skip any - DOCUMENTATION ONLY",
                "When file is 100% documented, increment num_files_documented by 1",
                "Verify that EVERY function in the current file has proper documentation (no skipping)",
                "CRITICAL: If you find ANY bugs/logic errors, STOP documenting and report to user immediately",
                "Report specific function names you documented for verification",
                "Report progress: current num_files_documented out of total_files_to_document",
            ]
        else:
            # Continue systematic file-by-file coverage
            return [
                "CRITICAL: DO NOT ALTER ANY CODE LOGIC! Only add documentation (docstrings, comments)",
                "Check counters: num_files_documented vs total_files_to_document",
                "If num_files_documented < total_files_to_document: choose NEXT undocumented file",
                "USE MODERN documentation style appropriate for each programming language (NEVER legacy styles)",
                "Document every function, method, and class in current file with no exceptions",
                "When file is 100% documented, increment num_files_documented by 1",
                "CRITICAL: If you find ANY bugs/logic errors, STOP documenting and report to user immediately",
                "Report progress: current num_files_documented out of total_files_to_document",
                "If num_files_documented < total_files_to_document: RESTART docgen with next step",
                "ONLY set next_step_required=false when num_files_documented equals total_files_to_document",
                "For nested dependencies: check if functions call into subdirectories and document those too",
                "CRITICAL: If ANY bugs/logic errors were found, STOP and ask user before proceeding",
            ]
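
    # Illustrative cadence only (assuming discovery finds 3 files; not part of the runtime logic):
    #   step 1 -> discovery:        total_files_to_document=3, num_files_documented=0
    #   step 2 -> document file A:  num_files_documented=1
    #   step 3 -> document file B:  num_files_documented=2
    #   step 4 -> document file C:  num_files_documented=3, next_step_required=False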

    def should_call_expert_analysis(self, consolidated_findings, request=None) -> bool:
        """Docgen is self-contained and doesn't need expert analysis."""
        return False

    def prepare_expert_analysis_context(self, consolidated_findings) -> str:
        """Docgen doesn't use expert analysis."""
        return ""

    def get_step_guidance(self, step_number: int, confidence: str, request) -> dict[str, Any]:
        """
        Provide step-specific guidance for documentation generation workflow.

        This method generates docgen-specific guidance used by get_step_guidance_message().
        """
        # Generate the next steps instruction based on required actions
        # Calculate dynamic total_steps based on files to document
        total_files_to_document = self.get_request_total_files_to_document(request)
        calculated_total_steps = 1 + total_files_to_document if total_files_to_document > 0 else request.total_steps

        required_actions = self.get_required_actions(step_number, confidence, request.findings, calculated_total_steps)

        if step_number == 1:
            next_steps = (
                f"DISCOVERY PHASE ONLY - DO NOT START DOCUMENTING YET!\n"
                f"MANDATORY: DO NOT call the {self.get_name()} tool again immediately. You MUST first perform "
                f"FILE DISCOVERY step by step. DO NOT DOCUMENT ANYTHING YET. "
                f"MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\n"
                + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
                + f"\n\nCRITICAL: When you call {self.get_name()} step 2, set total_files_to_document to the exact count "
                f"of files needing documentation and set num_files_documented to 0 (haven't started documenting yet). "
                f"Your total_steps will be automatically calculated as 1 (discovery) + number of files to document. "
                f"Step 2 will BEGIN the documentation phase. Report the count clearly and then IMMEDIATELY "
                f"proceed to call {self.get_name()} step 2 to start documenting the first file."
            )
        elif step_number == 2:
            next_steps = (
                f"DOCUMENTATION PHASE BEGINS! ABSOLUTE RULE: DO NOT ALTER ANY CODE LOGIC! DOCUMENTATION ONLY!\n"
                f"START FILE-BY-FILE APPROACH! Focus on ONE file until 100% complete. "
                f"MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\n"
                + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
                + f"\n\nREPORT your progress: which specific functions did you document? Update num_files_documented from 0 to 1 when first file complete. "
                f"REPORT counters: current num_files_documented out of total_files_to_document. "
                f"CRITICAL: If you found ANY bugs/logic errors, STOP documenting and ask user what to do before continuing. "
                f"Do NOT move to a new file until the current one is completely documented. "
                f"When ready for step {step_number + 1}, report completed work with updated counters."
            )
        elif step_number <= 4:
            next_steps = (
                f"ABSOLUTE RULE: DO NOT ALTER ANY CODE LOGIC! DOCUMENTATION ONLY!\n"
                f"CONTINUE FILE-BY-FILE APPROACH! Focus on ONE file until 100% complete. "
                f"MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\n"
                + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
                + f"\n\nREPORT your progress: which specific functions did you document? Update num_files_documented when file complete. "
                f"REPORT counters: current num_files_documented out of total_files_to_document. "
                f"CRITICAL: If you found ANY bugs/logic errors, STOP documenting and ask user what to do before continuing. "
                f"Do NOT move to a new file until the current one is completely documented. "
                f"When ready for step {step_number + 1}, report completed work with updated counters."
            )
        else:
            next_steps = (
                f"ABSOLUTE RULE: DO NOT ALTER ANY CODE LOGIC! DOCUMENTATION ONLY!\n"
                f"CRITICAL: Check if MORE FILES need documentation before finishing! "
                f"REQUIRED ACTIONS before calling {self.get_name()} step {step_number + 1}:\n"
                + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
                + f"\n\nREPORT which functions you documented and update num_files_documented when file complete. "
                f"CHECK: If num_files_documented < total_files_to_document, RESTART {self.get_name()} with next step! "
                f"CRITICAL: Only set next_step_required=false when num_files_documented equals total_files_to_document! "
                f"REPORT counters: current num_files_documented out of total_files_to_document. "
                f"CRITICAL: If ANY bugs/logic errors were found during documentation, STOP and ask user before proceeding. "
                f"NO recursive {self.get_name()} calls without actual documentation work!"
            )

        return {"next_steps": next_steps}

    # Hook method overrides for docgen-specific behavior

    async def handle_work_completion(self, response_data: dict, request, arguments: dict) -> dict:
        """
        Override work completion to enforce counter validation.

        The docgen tool MUST complete ALL files before finishing. If counters don't match,
        force continuation regardless of next_step_required setting.
        """
        # CRITICAL VALIDATION: Check if all files have been documented using proper inheritance hooks
        num_files_documented = self.get_request_num_files_documented(request)
        total_files_to_document = self.get_request_total_files_to_document(request)

        if num_files_documented < total_files_to_document:
            # Counters don't match - force continuation!
            logger.warning(
                f"Docgen stopping early: {num_files_documented} < {total_files_to_document}. "
                f"Forcing continuation to document remaining files."
            )

            # Override to continuation mode
            response_data["status"] = "documentation_analysis_required"
            response_data[f"pause_for_{self.get_name()}"] = True
            response_data["next_steps"] = (
                f"CRITICAL ERROR: You attempted to finish documentation with only {num_files_documented} "
                f"out of {total_files_to_document} files documented! You MUST continue documenting "
                f"the remaining {total_files_to_document - num_files_documented} files. "
                f"Call {self.get_name()} again with step {request.step_number + 1} and continue documentation "
                f"of the next undocumented file. DO NOT set next_step_required=false until ALL files are documented!"
            )
            return response_data

        # If counters match, proceed with normal completion
        return await super().handle_work_completion(response_data, request, arguments)
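
    # Example of the guard above (illustrative numbers): with num_files_documented=2 and
    # total_files_to_document=5, status is rewritten to "documentation_analysis_required"
    # and next_steps tells the agent to document the remaining 3 files before it may set
    # next_step_required=False.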

    def prepare_step_data(self, request) -> dict:
        """
        Prepare docgen-specific step data for processing.

        Calculates total_steps dynamically based on number of files to document:
        - Step 1: Discovery phase
        - Steps 2+: One step per file to document
        """
        # Calculate dynamic total_steps based on files to document
        total_files_to_document = self.get_request_total_files_to_document(request)
        if total_files_to_document > 0:
            # Discovery step (1) + one step per file
            calculated_total_steps = 1 + total_files_to_document
        else:
            # Fallback to request total_steps if no file count available
            calculated_total_steps = request.total_steps

        step_data = {
            "step": request.step,
            "step_number": request.step_number,
            "total_steps": calculated_total_steps,  # Use calculated value
            "findings": request.findings,
            "relevant_files": request.relevant_files,
            "relevant_context": request.relevant_context,
            "num_files_documented": request.num_files_documented,
            "total_files_to_document": request.total_files_to_document,
            "issues_found": [],  # Docgen uses this for documentation gaps
            "confidence": "medium",  # Default confidence for docgen
            "hypothesis": "systematic_documentation_needed",  # Default hypothesis
            "images": [],  # Docgen doesn't typically use images
            # CRITICAL: Include documentation configuration parameters so the model can see them
            "document_complexity": request.document_complexity,
            "document_flow": request.document_flow,
            "update_existing": request.update_existing,
            "comments_on_complex_logic": request.comments_on_complex_logic,
        }
        return step_data

    def should_skip_expert_analysis(self, request, consolidated_findings) -> bool:
        """
        Docgen tool skips expert analysis when the CLI agent has "certain" confidence.
        """
        return request.confidence == "certain" and not request.next_step_required

    # Override inheritance hooks for docgen-specific behavior

    def get_completion_status(self) -> str:
        """Docgen tools use docgen-specific status."""
        return "documentation_analysis_complete"

    def get_completion_data_key(self) -> str:
        """Docgen uses 'complete_documentation_analysis' key."""
        return "complete_documentation_analysis"

    def get_final_analysis_from_request(self, request):
        """Docgen tools use 'hypothesis' field for documentation strategy."""
        return request.hypothesis

    def get_confidence_level(self, request) -> str:
        """Docgen tools use 'certain' for high confidence."""
        return request.confidence or "high"

    def get_completion_message(self) -> str:
        """Docgen-specific completion message."""
        return (
            "Documentation analysis complete with high confidence. You have identified the comprehensive "
            "documentation needs and strategy. MANDATORY: Present the user with the documentation plan "
            "and IMMEDIATELY proceed with implementing the documentation without requiring further "
            "consultation. Focus on the precise documentation improvements needed."
        )

    def get_skip_reason(self) -> str:
        """Docgen-specific skip reason."""
        return "Completed comprehensive documentation analysis locally"

    def get_request_relevant_context(self, request) -> list:
        """Get relevant_context for docgen tool."""
        try:
            return request.relevant_context or []
        except AttributeError:
            return []

    def get_request_num_files_documented(self, request) -> int:
        """Get num_files_documented from request. Override for custom handling."""
        try:
            return request.num_files_documented or 0
        except AttributeError:
            return 0

    def get_request_total_files_to_document(self, request) -> int:
        """Get total_files_to_document from request. Override for custom handling."""
        try:
            return request.total_files_to_document or 0
        except AttributeError:
            return 0

    def get_skip_expert_analysis_status(self) -> str:
        """Docgen-specific expert analysis skip status."""
        return "skipped_due_to_complete_analysis"

    def prepare_work_summary(self) -> str:
        """Docgen-specific work summary."""
        try:
            return f"Completed {len(self.work_history)} documentation analysis steps"
        except AttributeError:
            return "Completed documentation analysis"

    def get_completion_next_steps_message(self, expert_analysis_used: bool = False) -> str:
        """
        Docgen-specific completion message.
        """
        return (
            "DOCUMENTATION ANALYSIS IS COMPLETE FOR ALL FILES (num_files_documented equals total_files_to_document). "
            "MANDATORY FINAL VERIFICATION: Before presenting your summary, you MUST perform a final verification scan. "
            "Read through EVERY file you documented and check EVERY function, method, class, and property to confirm "
            "it has proper documentation including complexity analysis and call flow information. If ANY items lack "
            "documentation, document them immediately before finishing. "
            "THEN present a clear summary showing: 1) Final counters: num_files_documented out of total_files_to_document, "
            "2) Complete accountability list of ALL files you documented with verification status, "
            "3) Detailed list of EVERY function/method you documented in each file (proving complete coverage), "
            "4) Any dependency relationships you discovered between files, 5) Recommended documentation improvements with concrete examples including "
            "complexity analysis and call flow information. 6) **CRITICAL**: List any bugs or logic issues you found "
            "during documentation but did NOT fix - present these to the user and ask what they'd like to do about them. "
            "Make it easy for a developer to see the complete documentation status across the entire codebase with full accountability."
        )

    def get_step_guidance_message(self, request) -> str:
        """
        Docgen-specific step guidance with detailed analysis instructions.
        """
        step_guidance = self.get_step_guidance(request.step_number, request.confidence, request)
        return step_guidance["next_steps"]

    def customize_workflow_response(self, response_data: dict, request) -> dict:
        """
        Customize response to match docgen tool format.
        """
        # Store initial request on first step
        if request.step_number == 1:
            self.initial_request = request.step

        # Convert generic status names to docgen-specific ones
        tool_name = self.get_name()
        status_mapping = {
            f"{tool_name}_in_progress": "documentation_analysis_in_progress",
            f"pause_for_{tool_name}": "pause_for_documentation_analysis",
            f"{tool_name}_required": "documentation_analysis_required",
            f"{tool_name}_complete": "documentation_analysis_complete",
        }

        if response_data["status"] in status_mapping:
            response_data["status"] = status_mapping[response_data["status"]]

        # Rename status field to match docgen tool
        if f"{tool_name}_status" in response_data:
            response_data["documentation_analysis_status"] = response_data.pop(f"{tool_name}_status")
            # Add docgen-specific status fields
            response_data["documentation_analysis_status"]["documentation_strategies"] = len(
                self.consolidated_findings.hypotheses
            )

        # Rename complete documentation analysis data
        if f"complete_{tool_name}" in response_data:
            response_data["complete_documentation_analysis"] = response_data.pop(f"complete_{tool_name}")

        # Map the completion flag to match docgen tool
        if f"{tool_name}_complete" in response_data:
            response_data["documentation_analysis_complete"] = response_data.pop(f"{tool_name}_complete")

        # Map the required flag to match docgen tool
        if f"{tool_name}_required" in response_data:
            response_data["documentation_analysis_required"] = response_data.pop(f"{tool_name}_required")

        return response_data

    # Required abstract methods from BaseTool
    def get_request_model(self):
        """Return the docgen-specific request model."""
        return DocgenRequest

    async def prepare_prompt(self, request) -> str:
        """Not used - workflow tools use execute_workflow()."""
        return ""  # Workflow tools use execute_workflow() directly

```
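
A quick usage sketch for the request model above (hypothetical paths and findings; it assumes the repository package is importable as `tools.docgen` and that the remaining `WorkflowRequest` fields all carry defaults):

```python
from tools.docgen import DocgenRequest, DocgenTool

# Hypothetical step-2 payload: discovery found 3 files and the first is now documented.
request = DocgenRequest(
    step="Documented every function in utils/parser.py (docstrings, Big O, call flow notes).",
    step_number=2,
    total_steps=4,  # 1 discovery step + 3 files, mirroring prepare_step_data()
    next_step_required=True,
    findings="utils/parser.py fully documented; two files remain.",
    relevant_files=["/abs/path/utils/parser.py"],
    num_files_documented=1,
    total_files_to_document=3,
)

tool = DocgenTool()
assert tool.requires_model() is False            # no external model at the MCP boundary
assert tool.requires_expert_analysis() is False  # self-contained workflow
```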

--------------------------------------------------------------------------------
/tests/test_conversation_memory.py:
--------------------------------------------------------------------------------

```python
"""
Test suite for conversation memory system

Tests the Redis-based conversation persistence needed for AI-to-AI multi-turn
discussions in stateless MCP environments.
"""

import os
from unittest.mock import Mock, patch

import pytest

from server import get_follow_up_instructions
from utils.conversation_memory import (
    CONVERSATION_TIMEOUT_SECONDS,
    MAX_CONVERSATION_TURNS,
    ConversationTurn,
    ThreadContext,
    add_turn,
    build_conversation_history,
    create_thread,
    get_thread,
)
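
# High-level lifecycle exercised below (the storage backend is mocked in every test):
#   thread_id = create_thread(tool_name, initial_context)    -> new UUID persisted with a TTL
#   context = get_thread(thread_id)                           -> ThreadContext, or None if expired/invalid
#   add_turn(thread_id, role, content)                        -> False once MAX_CONVERSATION_TURNS is reached
#   history, tokens = build_conversation_history(context, model_context=None)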


class TestConversationMemory:
    """Test the conversation memory system for stateless MCP requests"""

    @patch("utils.conversation_memory.get_storage")
    def test_create_thread(self, mock_storage):
        """Test creating a new thread"""
        mock_client = Mock()
        mock_storage.return_value = mock_client

        thread_id = create_thread("chat", {"prompt": "Hello", "absolute_file_paths": ["/test.py"]})

        assert thread_id is not None
        assert len(thread_id) == 36  # UUID4 string length (32 hex chars + 4 hyphens)

        # Verify Redis was called
        mock_client.setex.assert_called_once()
        call_args = mock_client.setex.call_args
        assert call_args[0][0] == f"thread:{thread_id}"  # key
        assert call_args[0][1] == CONVERSATION_TIMEOUT_SECONDS  # TTL from configuration

    @patch("utils.conversation_memory.get_storage")
    def test_get_thread_valid(self, mock_storage):
        """Test retrieving an existing thread"""
        mock_client = Mock()
        mock_storage.return_value = mock_client

        test_uuid = "12345678-1234-1234-1234-123456789012"

        # Create valid ThreadContext and serialize it
        context_obj = ThreadContext(
            thread_id=test_uuid,
            created_at="2023-01-01T00:00:00Z",
            last_updated_at="2023-01-01T00:01:00Z",
            tool_name="chat",
            turns=[],
            initial_context={"prompt": "test"},
        )
        mock_client.get.return_value = context_obj.model_dump_json()

        context = get_thread(test_uuid)

        assert context is not None
        assert context.thread_id == test_uuid
        assert context.tool_name == "chat"
        mock_client.get.assert_called_once_with(f"thread:{test_uuid}")

    @patch("utils.conversation_memory.get_storage")
    def test_get_thread_invalid_uuid(self, mock_storage):
        """Test handling invalid UUID"""
        context = get_thread("invalid-uuid")
        assert context is None

    @patch("utils.conversation_memory.get_storage")
    def test_get_thread_not_found(self, mock_storage):
        """Test handling thread not found"""
        mock_client = Mock()
        mock_storage.return_value = mock_client
        mock_client.get.return_value = None

        context = get_thread("12345678-1234-1234-1234-123456789012")
        assert context is None

    @patch("utils.conversation_memory.get_storage")
    def test_add_turn_success(self, mock_storage):
        """Test adding a turn to existing thread"""
        mock_client = Mock()
        mock_storage.return_value = mock_client

        test_uuid = "12345678-1234-1234-1234-123456789012"

        # Create valid ThreadContext
        context_obj = ThreadContext(
            thread_id=test_uuid,
            created_at="2023-01-01T00:00:00Z",
            last_updated_at="2023-01-01T00:01:00Z",
            tool_name="chat",
            turns=[],
            initial_context={"prompt": "test"},
        )
        mock_client.get.return_value = context_obj.model_dump_json()

        success = add_turn(test_uuid, "user", "Hello there")

        assert success is True
        # Verify Redis get and setex were called
        mock_client.get.assert_called_once()
        mock_client.setex.assert_called_once()

    @patch("utils.conversation_memory.get_storage")
    def test_add_turn_max_limit(self, mock_storage):
        """Test turn limit enforcement"""
        mock_client = Mock()
        mock_storage.return_value = mock_client

        test_uuid = "12345678-1234-1234-1234-123456789012"

        # Create thread with MAX_CONVERSATION_TURNS turns (at limit)
        turns = [
            ConversationTurn(role="user", content=f"Turn {i}", timestamp="2023-01-01T00:00:00Z")
            for i in range(MAX_CONVERSATION_TURNS)
        ]
        context_obj = ThreadContext(
            thread_id=test_uuid,
            created_at="2023-01-01T00:00:00Z",
            last_updated_at="2023-01-01T00:01:00Z",
            tool_name="chat",
            turns=turns,
            initial_context={"prompt": "test"},
        )
        mock_client.get.return_value = context_obj.model_dump_json()

        success = add_turn(test_uuid, "user", "This should fail")

        assert success is False

    @patch.dict(os.environ, {"GEMINI_API_KEY": "test-key", "OPENAI_API_KEY": ""}, clear=False)
    def test_build_conversation_history(self, project_path):
        """Test building conversation history format with files and speaker identification"""
        from providers.registry import ModelProviderRegistry

        ModelProviderRegistry.clear_cache()

        # Create real test files to test actual file embedding functionality
        main_file = project_path / "main.py"
        readme_file = project_path / "docs" / "readme.md"
        examples_dir = project_path / "examples"
        examples_file = examples_dir / "example.py"

        # Create directories and files
        readme_file.parent.mkdir(parents=True, exist_ok=True)
        examples_dir.mkdir(parents=True, exist_ok=True)

        main_file.write_text("def main():\n    print('Hello world')\n")
        readme_file.write_text("# Project Documentation\nThis is a test project.\n")
        examples_file.write_text("# Example code\nprint('Example')\n")

        test_uuid = "12345678-1234-1234-1234-123456789012"

        turns = [
            ConversationTurn(
                role="user",
                content="What is Python?",
                timestamp="2023-01-01T00:00:00Z",
                files=[str(main_file), str(readme_file)],
            ),
            ConversationTurn(
                role="assistant",
                content="Python is a programming language",
                timestamp="2023-01-01T00:01:00Z",
                files=[str(examples_dir)],  # Directory will be expanded to files
                tool_name="chat",
                model_name="gpt-5",
                model_provider="openai",
            ),
        ]

        context = ThreadContext(
            thread_id=test_uuid,
            created_at="2023-01-01T00:00:00Z",
            last_updated_at="2023-01-01T00:01:00Z",
            tool_name="chat",
            turns=turns,
            initial_context={},
        )

        history, tokens = build_conversation_history(context, model_context=None)

        # Test basic structure
        assert "CONVERSATION HISTORY" in history
        assert f"Thread: {test_uuid}" in history
        assert "Tool: chat" in history
        assert f"Turn 2/{MAX_CONVERSATION_TURNS}" in history

        # Test speaker identification
        assert "--- Turn 1 (Agent) ---" in history
        assert "--- Turn 2 (gpt-5 using chat via openai) ---" in history

        # Test content
        assert "What is Python?" in history
        assert "Python is a programming language" in history

        # Test file tracking
        # Check that the new file embedding section is included
        assert "=== FILES REFERENCED IN THIS CONVERSATION ===" in history
        assert "The following files have been shared and analyzed during our conversation." in history

        # Check that file context from previous turns is included (now shows files used per turn)
        assert f"Files used in this turn: {main_file}, {readme_file}" in history
        assert f"Files used in this turn: {examples_dir}" in history

        # Verify actual file content is embedded
        assert "def main():" in history
        assert "Hello world" in history
        assert "Project Documentation" in history

    def test_build_conversation_history_empty(self):
        """Test building history with no turns"""
        test_uuid = "12345678-1234-1234-1234-123456789012"

        context = ThreadContext(
            thread_id=test_uuid,
            created_at="2023-01-01T00:00:00Z",
            last_updated_at="2023-01-01T00:00:00Z",
            tool_name="chat",
            turns=[],
            initial_context={},
        )

        history, tokens = build_conversation_history(context, model_context=None)
        assert history == ""
        assert tokens == 0
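        # An empty thread costs nothing: callers can skip history injection entirely
        # when build_conversation_history returns ("", 0).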


class TestConversationFlow:
    """Test complete conversation flows simulating stateless MCP requests"""

    @patch("utils.conversation_memory.get_storage")
    def test_complete_conversation_cycle(self, mock_storage):
        """Test a complete 5-turn conversation until limit reached"""
        mock_client = Mock()
        mock_storage.return_value = mock_client

        # Simulate independent MCP request cycles

        # REQUEST 1: Initial request creates thread
        thread_id = create_thread("chat", {"prompt": "Analyze this code"})
        initial_context = ThreadContext(
            thread_id=thread_id,
            created_at="2023-01-01T00:00:00Z",
            last_updated_at="2023-01-01T00:00:00Z",
            tool_name="chat",
            turns=[],
            initial_context={"prompt": "Analyze this code"},
        )
        mock_client.get.return_value = initial_context.model_dump_json()

        # Add assistant response
        success = add_turn(
            thread_id,
            "assistant",
            "Code analysis complete",
        )
        assert success is True

        # REQUEST 2: User responds to follow-up (independent request cycle)
        # Simulate retrieving updated context from Redis
        context_after_1 = ThreadContext(
            thread_id=thread_id,
            created_at="2023-01-01T00:00:00Z",
            last_updated_at="2023-01-01T00:01:00Z",
            tool_name="chat",
            turns=[
                ConversationTurn(
                    role="assistant",
                    content="Code analysis complete",
                    timestamp="2023-01-01T00:00:30Z",
                )
            ],
            initial_context={"prompt": "Analyze this code"},
        )
        mock_client.get.return_value = context_after_1.model_dump_json()

        success = add_turn(thread_id, "user", "Yes, check error handling")
        assert success is True

        success = add_turn(thread_id, "assistant", "Error handling reviewed")
        assert success is True

        # REQUEST 3-5: Continue conversation (simulating independent cycles)
        # After turn 3
        context_after_3 = ThreadContext(
            thread_id=thread_id,
            created_at="2023-01-01T00:00:00Z",
            last_updated_at="2023-01-01T00:03:00Z",
            tool_name="chat",
            turns=[
                ConversationTurn(
                    role="assistant",
                    content="Code analysis complete",
                    timestamp="2023-01-01T00:00:30Z",
                ),
                ConversationTurn(role="user", content="Yes, check error handling", timestamp="2023-01-01T00:01:30Z"),
                ConversationTurn(
                    role="assistant",
                    content="Error handling reviewed",
                    timestamp="2023-01-01T00:02:30Z",
                ),
            ],
            initial_context={"prompt": "Analyze this code"},
        )
        mock_client.get.return_value = context_after_3.model_dump_json()

        success = add_turn(thread_id, "user", "Yes, check tests")
        assert success is True

        success = add_turn(thread_id, "assistant", "Test coverage analyzed")
        assert success is True

        # REQUEST 6: Try to exceed MAX_CONVERSATION_TURNS limit - should fail
        turns_at_limit = [
            ConversationTurn(
                role="assistant" if i % 2 == 0 else "user", content=f"Turn {i + 1}", timestamp="2023-01-01T00:00:30Z"
            )
            for i in range(MAX_CONVERSATION_TURNS)
        ]

        context_at_limit = ThreadContext(
            thread_id=thread_id,
            created_at="2023-01-01T00:00:00Z",
            last_updated_at="2023-01-01T00:05:00Z",
            tool_name="chat",
            turns=turns_at_limit,
            initial_context={"prompt": "Analyze this code"},
        )
        mock_client.get.return_value = context_at_limit.model_dump_json()

        # This should fail - conversation has reached limit
        success = add_turn(thread_id, "user", "This should be rejected")
        assert success is False  # CONVERSATION STOPS HERE

    @patch("utils.conversation_memory.get_storage")
    def test_invalid_continuation_id_error(self, mock_storage):
        """Test that invalid continuation IDs raise proper error for restart"""
        from server import reconstruct_thread_context

        mock_client = Mock()
        mock_storage.return_value = mock_client
        mock_client.get.return_value = None  # Thread not found

        arguments = {"continuation_id": "invalid-uuid-12345", "prompt": "Continue conversation"}

        # Should raise ValueError asking to restart
        with pytest.raises(ValueError) as exc_info:
            import asyncio

            asyncio.run(reconstruct_thread_context(arguments))

        error_msg = str(exc_info.value)
        assert "Conversation thread 'invalid-uuid-12345' was not found or has expired" in error_msg
        assert (
            "Please restart the conversation by providing your full question/prompt without the continuation_id"
            in error_msg
        )

    @patch.dict(os.environ, {"GEMINI_API_KEY": "test-key", "OPENAI_API_KEY": ""}, clear=False)
    def test_dynamic_max_turns_configuration(self):
        """Test that all functions respect MAX_CONVERSATION_TURNS configuration"""
        from providers.registry import ModelProviderRegistry

        ModelProviderRegistry.clear_cache()

        # This test ensures that if MAX_CONVERSATION_TURNS changes, the turn counter in the history updates with it

        # Build conversations of several different lengths
        test_values = [3, 7, 10]

        for test_max in test_values:
            # Create turns up to the test limit
            turns = [
                ConversationTurn(role="user", content=f"Turn {i}", timestamp="2023-01-01T00:00:00Z")
                for i in range(test_max)
            ]

            # Test history building respects the limit
            test_uuid = "12345678-1234-1234-1234-123456789012"
            context = ThreadContext(
                thread_id=test_uuid,
                created_at="2023-01-01T00:00:00Z",
                last_updated_at="2023-01-01T00:00:00Z",
                tool_name="chat",
                turns=turns,
                initial_context={},
            )

            history, tokens = build_conversation_history(context, model_context=None)
            expected_turn_text = f"Turn {test_max}/{MAX_CONVERSATION_TURNS}"
            assert expected_turn_text in history

    def test_follow_up_instructions_dynamic_behavior(self):
        """Test that follow-up instructions change correctly based on turn count and max setting"""
        # Test with default MAX_CONVERSATION_TURNS
        max_turns = MAX_CONVERSATION_TURNS

        # Test early conversation (should allow follow-ups)
        early_instructions = get_follow_up_instructions(0, max_turns)
        assert "CONVERSATION CONTINUATION" in early_instructions
        assert f"({max_turns - 1} exchanges remaining)" in early_instructions
        assert "Feel free to ask clarifying questions" in early_instructions

        # Test mid conversation
        mid_instructions = get_follow_up_instructions(2, max_turns)
        assert "CONVERSATION CONTINUATION" in mid_instructions
        assert f"({max_turns - 3} exchanges remaining)" in mid_instructions
        assert "Feel free to ask clarifying questions" in mid_instructions

        # Test approaching limit (should stop follow-ups)
        limit_instructions = get_follow_up_instructions(max_turns - 1, max_turns)
        assert "Do NOT include any follow-up questions" in limit_instructions
        assert "final exchange" in limit_instructions

        # Test at limit
        at_limit_instructions = get_follow_up_instructions(max_turns, max_turns)
        assert "Do NOT include any follow-up questions" in at_limit_instructions

        # Test with custom max_turns to ensure dynamic behavior
        custom_max = 3
        custom_early = get_follow_up_instructions(0, custom_max)
        assert f"({custom_max - 1} exchanges remaining)" in custom_early

        custom_limit = get_follow_up_instructions(custom_max - 1, custom_max)
        assert "Do NOT include any follow-up questions" in custom_limit

    def test_follow_up_instructions_defaults_to_config(self):
        """Test that follow-up instructions use MAX_CONVERSATION_TURNS when max_turns not provided"""
        instructions = get_follow_up_instructions(0)  # No max_turns parameter
        expected_remaining = MAX_CONVERSATION_TURNS - 1
        assert f"({expected_remaining} exchanges remaining)" in instructions

    @patch("utils.conversation_memory.get_storage")
    def test_complete_conversation_with_dynamic_turns(self, mock_storage):
        """Test complete conversation respecting MAX_CONVERSATION_TURNS dynamically"""
        mock_client = Mock()
        mock_storage.return_value = mock_client

        thread_id = create_thread("chat", {"prompt": "Start conversation"})

        # Simulate conversation up to MAX_CONVERSATION_TURNS - 1
        for turn_num in range(MAX_CONVERSATION_TURNS - 1):
            # Mock context with current turns
            turns = [
                ConversationTurn(
                    role="user" if i % 2 == 0 else "assistant",
                    content=f"Turn {i + 1}",
                    timestamp="2023-01-01T00:00:00Z",
                )
                for i in range(turn_num)
            ]

            context = ThreadContext(
                thread_id=thread_id,
                created_at="2023-01-01T00:00:00Z",
                last_updated_at="2023-01-01T00:00:00Z",
                tool_name="chat",
                turns=turns,
                initial_context={"prompt": "Start conversation"},
            )
            mock_client.get.return_value = context.model_dump_json()

            # Should succeed
            success = add_turn(thread_id, "user", f"User turn {turn_num + 1}")
            assert success is True, f"Turn {turn_num + 1} should succeed"

        # Now we should be at the limit - create final context
        final_turns = [
            ConversationTurn(
                role="user" if i % 2 == 0 else "assistant", content=f"Turn {i + 1}", timestamp="2023-01-01T00:00:00Z"
            )
            for i in range(MAX_CONVERSATION_TURNS)
        ]

        final_context = ThreadContext(
            thread_id=thread_id,
            created_at="2023-01-01T00:00:00Z",
            last_updated_at="2023-01-01T00:00:00Z",
            tool_name="chat",
            turns=final_turns,
            initial_context={"prompt": "Start conversation"},
        )
        mock_client.get.return_value = final_context.model_dump_json()

        # This should fail - at the limit
        success = add_turn(thread_id, "user", "This should fail")
        assert success is False, f"Turn {MAX_CONVERSATION_TURNS + 1} should fail"

    @patch("utils.conversation_memory.get_storage")
    @patch.dict(os.environ, {"GEMINI_API_KEY": "test-key", "OPENAI_API_KEY": ""}, clear=False)
    def test_conversation_with_files_and_context_preservation(self, mock_storage):
        """Test complete conversation flow with file tracking and context preservation"""
        from providers.registry import ModelProviderRegistry

        ModelProviderRegistry.clear_cache()

        mock_client = Mock()
        mock_storage.return_value = mock_client

        # Start conversation with files using a simple tool
        thread_id = create_thread("chat", {"prompt": "Analyze this codebase", "absolute_file_paths": ["/project/src/"]})

        # Turn 1: Claude provides context with multiple files
        initial_context = ThreadContext(
            thread_id=thread_id,
            created_at="2023-01-01T00:00:00Z",
            last_updated_at="2023-01-01T00:00:00Z",
            tool_name="chat",
            turns=[],
            initial_context={
                "prompt": "Analyze this codebase",
                "absolute_file_paths": ["/project/src/"],
            },
        )
        mock_client.get.return_value = initial_context.model_dump_json()

        # Add Gemini's response
        success = add_turn(
            thread_id,
            "assistant",
            "I've analyzed your codebase structure.",
            files=["/project/src/main.py", "/project/src/utils.py"],
            tool_name="analyze",
            model_name="gemini-2.5-flash",
            model_provider="google",
        )
        assert success is True

        # Turn 2: Claude responds with different files
        context_turn_1 = ThreadContext(
            thread_id=thread_id,
            created_at="2023-01-01T00:00:00Z",
            last_updated_at="2023-01-01T00:01:00Z",
            tool_name="analyze",
            turns=[
                ConversationTurn(
                    role="assistant",
                    content="I've analyzed your codebase structure.",
                    timestamp="2023-01-01T00:00:30Z",
                    files=["/project/src/main.py", "/project/src/utils.py"],
                    tool_name="analyze",
                    model_name="gemini-2.5-flash",
                    model_provider="google",
                )
            ],
            initial_context={"prompt": "Analyze this codebase", "relevant_files": ["/project/src/"]},
        )
        mock_client.get.return_value = context_turn_1.model_dump_json()

        # User responds with test files
        success = add_turn(
            thread_id, "user", "Yes, check the test coverage", files=["/project/tests/", "/project/test_main.py"]
        )
        assert success is True

        # Turn 3: Gemini analyzes tests
        context_turn_2 = ThreadContext(
            thread_id=thread_id,
            created_at="2023-01-01T00:00:00Z",
            last_updated_at="2023-01-01T00:02:00Z",
            tool_name="analyze",
            turns=[
                ConversationTurn(
                    role="assistant",
                    content="I've analyzed your codebase structure.",
                    timestamp="2023-01-01T00:00:30Z",
                    files=["/project/src/main.py", "/project/src/utils.py"],
                    tool_name="analyze",
                ),
                ConversationTurn(
                    role="user",
                    content="Yes, check the test coverage",
                    timestamp="2023-01-01T00:01:30Z",
                    files=["/project/tests/", "/project/test_main.py"],
                ),
            ],
            initial_context={"prompt": "Analyze this codebase", "relevant_files": ["/project/src/"]},
        )
        mock_client.get.return_value = context_turn_2.model_dump_json()

        success = add_turn(
            thread_id,
            "assistant",
            "Test coverage analysis complete. Coverage is 85%.",
            files=["/project/tests/test_utils.py", "/project/coverage.html"],
            tool_name="analyze",
            model_name="gemini-2.5-flash",
            model_provider="google",
        )
        assert success is True

        # Build conversation history and verify chronological file preservation
        final_context = ThreadContext(
            thread_id=thread_id,
            created_at="2023-01-01T00:00:00Z",
            last_updated_at="2023-01-01T00:03:00Z",
            tool_name="analyze",
            turns=[
                ConversationTurn(
                    role="assistant",
                    content="I've analyzed your codebase structure.",
                    timestamp="2023-01-01T00:00:30Z",
                    files=["/project/src/main.py", "/project/src/utils.py"],
                    tool_name="analyze",
                    model_name="gemini-2.5-flash",
                    model_provider="google",
                ),
                ConversationTurn(
                    role="user",
                    content="Yes, check the test coverage",
                    timestamp="2023-01-01T00:01:30Z",
                    files=["/project/tests/", "/project/test_main.py"],
                ),
                ConversationTurn(
                    role="assistant",
                    content="Test coverage analysis complete. Coverage is 85%.",
                    timestamp="2023-01-01T00:02:30Z",
                    files=["/project/tests/test_utils.py", "/project/coverage.html"],
                    tool_name="analyze",
                    model_name="gemini-2.5-flash",
                    model_provider="google",
                ),
            ],
            initial_context={"prompt": "Analyze this codebase", "relevant_files": ["/project/src/"]},
        )

        history, tokens = build_conversation_history(final_context)

        # Verify chronological order and speaker identification
        assert "--- Turn 1 (gemini-2.5-flash using analyze via google) ---" in history
        assert "--- Turn 2 (Agent) ---" in history
        assert "--- Turn 3 (gemini-2.5-flash using analyze via google) ---" in history

        # Verify all files are preserved in chronological order
        turn_1_files = "Files used in this turn: /project/src/main.py, /project/src/utils.py"
        turn_2_files = "Files used in this turn: /project/tests/, /project/test_main.py"
        turn_3_files = "Files used in this turn: /project/tests/test_utils.py, /project/coverage.html"

        assert turn_1_files in history
        assert turn_2_files in history
        assert turn_3_files in history

        # Verify content
        assert "I've analyzed your codebase structure." in history
        assert "Yes, check the test coverage" in history
        assert "Test coverage analysis complete. Coverage is 85%." in history

        # Verify chronological ordering (turn 1 appears before turn 2, etc.)
        turn_1_pos = history.find("--- Turn 1 (gemini-2.5-flash using analyze via google) ---")
        turn_2_pos = history.find("--- Turn 2 (Agent) ---")
        turn_3_pos = history.find("--- Turn 3 (gemini-2.5-flash using analyze via google) ---")

        assert turn_1_pos < turn_2_pos < turn_3_pos

    @patch("utils.conversation_memory.get_storage")
    def test_stateless_request_isolation(self, mock_storage):
        """Test that each request cycle is independent but shares context via Redis"""
        mock_client = Mock()
        mock_storage.return_value = mock_client

        # Simulate two different "processes" accessing same thread
        thread_id = "12345678-1234-1234-1234-123456789012"

        # Process 1: Creates thread
        initial_context = ThreadContext(
            thread_id=thread_id,
            created_at="2023-01-01T00:00:00Z",
            last_updated_at="2023-01-01T00:00:00Z",
            tool_name="thinkdeep",
            turns=[],
            initial_context={"prompt": "Think about architecture"},
        )
        mock_client.get.return_value = initial_context.model_dump_json()

        success = add_turn(thread_id, "assistant", "Architecture analysis")
        assert success is True

        # Process 2: Different "request cycle" accesses same thread
        context_from_redis = ThreadContext(
            thread_id=thread_id,
            created_at="2023-01-01T00:00:00Z",
            last_updated_at="2023-01-01T00:01:00Z",
            tool_name="thinkdeep",
            turns=[
                ConversationTurn(
                    role="assistant",
                    content="Architecture analysis",
                    timestamp="2023-01-01T00:00:30Z",
                )
            ],
            initial_context={"prompt": "Think about architecture"},
        )
        mock_client.get.return_value = context_from_redis.model_dump_json()

        # Verify context continuity across "processes"
        retrieved_context = get_thread(thread_id)
        assert retrieved_context is not None
        assert len(retrieved_context.turns) == 1

    @patch.dict(os.environ, {"GEMINI_API_KEY": "test-key", "OPENAI_API_KEY": ""}, clear=False)
    def test_token_limit_optimization_in_conversation_history(self):
        """Test that build_conversation_history efficiently handles token limits"""
        import os
        import tempfile

        from providers.registry import ModelProviderRegistry

        ModelProviderRegistry.clear_cache()

        from utils.conversation_memory import build_conversation_history

        # Create test files with known content sizes
        with tempfile.TemporaryDirectory() as temp_dir:
            # Create small and large test files
            small_file = os.path.join(temp_dir, "small.py")
            large_file = os.path.join(temp_dir, "large.py")

            small_content = "# Small file\nprint('hello')\n"
            large_content = "# Large file\n" + "x = 1\n" * 10000  # Very large file

            with open(small_file, "w") as f:
                f.write(small_content)
            with open(large_file, "w") as f:
                f.write(large_content)

            # Create context with files that would exceed token limit
            context = ThreadContext(
                thread_id="test-token-limit",
                created_at="2023-01-01T00:00:00Z",
                last_updated_at="2023-01-01T00:01:00Z",
                tool_name="analyze",
                turns=[
                    ConversationTurn(
                        role="user",
                        content="Analyze these files",
                        timestamp="2023-01-01T00:00:30Z",
                        files=[small_file, large_file],  # Large file should be truncated
                    )
                ],
                initial_context={"prompt": "Analyze code"},
            )

            # Build conversation history (should handle token limits gracefully)
            history, tokens = build_conversation_history(context, model_context=None)

            # Verify the history was built successfully
            assert "=== CONVERSATION HISTORY" in history
            assert "=== FILES REFERENCED IN THIS CONVERSATION ===" in history

            # The small file should be included, but large file might be truncated
            # At minimum, verify no crashes and history is generated
            assert len(history) > 0

            # If truncation occurred, there should be a note about it
            if "additional file(s) were truncated due to token limit" in history:
                assert small_file in history or large_file in history
            else:
                # Both files fit within limit
                assert small_file in history
                assert large_file in history


if __name__ == "__main__":
    pytest.main([__file__])

```

--------------------------------------------------------------------------------
/tests/test_large_prompt_handling.py:
--------------------------------------------------------------------------------

```python
"""
Tests for large prompt handling functionality.

This test module verifies that the MCP server correctly handles
prompts that exceed the 50,000-character limit by requesting that
Claude save them to a file and resend the request; an illustrative
payload sketch follows the imports below.
"""

import json
import os
import shutil
import tempfile
from unittest.mock import MagicMock, patch

import pytest

from config import MCP_PROMPT_SIZE_LIMIT
from tools.chat import ChatTool
from tools.codereview import CodeReviewTool
from tools.shared.exceptions import ToolExecutionError

# from tools.debug import DebugIssueTool  # Commented out - debug tool refactored

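# Illustrative sketch only (not used by the tests; the wording of "content" is an
# assumption): the "resend_prompt" payload parsed throughout this module has roughly
# this shape. The tests assert the "status" value, the "metadata" fields, and that
# the formatted character limit appears somewhere in "content".
_EXAMPLE_RESEND_PROMPT_PAYLOAD = {
    "status": "resend_prompt",
    "content": f"Prompt is too large for MCP's token limits ({MCP_PROMPT_SIZE_LIMIT:,} characters max).",
    "metadata": {"prompt_size": MCP_PROMPT_SIZE_LIMIT + 1000, "limit": MCP_PROMPT_SIZE_LIMIT},
}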

class TestLargePromptHandling:
    """Test suite for large prompt handling across all tools."""

    def teardown_method(self):
        """Clean up after each test to prevent state pollution."""
        # Clear provider registry singleton
        from providers.registry import ModelProviderRegistry

        ModelProviderRegistry._instance = None

    @pytest.fixture
    def large_prompt(self):
        """Create a prompt larger than MCP_PROMPT_SIZE_LIMIT characters."""
        return "x" * (MCP_PROMPT_SIZE_LIMIT + 1000)

    @pytest.fixture
    def normal_prompt(self):
        """Create a normal-sized prompt."""
        return "This is a normal prompt that should work fine."

    @pytest.fixture
    def temp_prompt_file(self, large_prompt):
        """Create a temporary prompt.txt file with large content."""
        # Create temp file with exact name "prompt.txt"
        temp_dir = tempfile.mkdtemp()
        file_path = os.path.join(temp_dir, "prompt.txt")
        with open(file_path, "w") as f:
            f.write(large_prompt)
        return file_path

    @pytest.mark.asyncio
    async def test_chat_large_prompt_detection(self, large_prompt):
        """Test that chat tool detects large prompts."""
        tool = ChatTool()
        temp_dir = tempfile.mkdtemp()
        try:
            with pytest.raises(ToolExecutionError) as exc_info:
                await tool.execute({"prompt": large_prompt, "working_directory_absolute_path": temp_dir})
        finally:
            shutil.rmtree(temp_dir, ignore_errors=True)

        output = json.loads(exc_info.value.payload)
        assert output["status"] == "resend_prompt"
        assert f"{MCP_PROMPT_SIZE_LIMIT:,} characters" in output["content"]
        # The prompt size should match the user input since we check at MCP transport boundary before adding internal content
        assert output["metadata"]["prompt_size"] == len(large_prompt)
        assert output["metadata"]["limit"] == MCP_PROMPT_SIZE_LIMIT

    @pytest.mark.asyncio
    async def test_chat_normal_prompt_works(self, normal_prompt):
        """Test that chat tool works normally with regular prompts."""
        tool = ChatTool()

        temp_dir = tempfile.mkdtemp()

        # This test runs in the test environment which uses dummy keys
        # The chat tool will return an error for dummy keys, which is expected
        try:
            try:
                result = await tool.execute(
                    {"prompt": normal_prompt, "model": "gemini-2.5-flash", "working_directory_absolute_path": temp_dir}
                )
            except ToolExecutionError as exc:
                output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
            else:
                assert len(result) == 1
                output = json.loads(result[0].text)
        finally:
            shutil.rmtree(temp_dir, ignore_errors=True)

        # Whether provider succeeds or fails, we should not hit the resend_prompt branch
        assert output["status"] != "resend_prompt"

    @pytest.mark.asyncio
    async def test_chat_prompt_file_handling(self):
        """Test that chat tool correctly handles prompt.txt files with reasonable size."""
        tool = ChatTool()
        # Use a smaller prompt that won't exceed limit when combined with system prompt
        reasonable_prompt = "This is a reasonable sized prompt for testing prompt.txt file handling."

        # Create a temp file with reasonable content
        temp_dir = tempfile.mkdtemp()
        temp_prompt_file = os.path.join(temp_dir, "prompt.txt")
        with open(temp_prompt_file, "w") as f:
            f.write(reasonable_prompt)

        try:
            try:
                result = await tool.execute(
                    {
                        "prompt": "",
                        "absolute_file_paths": [temp_prompt_file],
                        "model": "gemini-2.5-flash",
                        "working_directory_absolute_path": temp_dir,
                    }
                )
            except ToolExecutionError as exc:
                output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
            else:
                assert len(result) == 1
                output = json.loads(result[0].text)

            # The test may fail with dummy API keys, which is expected behavior.
            # We're mainly testing that the tool processes prompt files correctly without size errors.
            assert output["status"] != "resend_prompt"
        finally:
            # Cleanup
            shutil.rmtree(temp_dir)

    @pytest.mark.asyncio
    async def test_codereview_large_focus(self, large_prompt):
        """Test that codereview tool detects large focus_on field using real integration testing."""
        import importlib
        import os

        tool = CodeReviewTool()

        # Save original environment
        original_env = {
            "OPENAI_API_KEY": os.environ.get("OPENAI_API_KEY"),
            "DEFAULT_MODEL": os.environ.get("DEFAULT_MODEL"),
        }

        try:
            # Set up environment for real provider resolution
            os.environ["OPENAI_API_KEY"] = "sk-test-key-large-focus-test-not-real"
            os.environ["DEFAULT_MODEL"] = "o3-mini"

            # Clear other provider keys to isolate to OpenAI
            for key in ["GEMINI_API_KEY", "XAI_API_KEY", "OPENROUTER_API_KEY"]:
                os.environ.pop(key, None)

            # Reload config and clear registry
            import config

            importlib.reload(config)
            from providers.registry import ModelProviderRegistry

            ModelProviderRegistry._instance = None

            # Test with real provider resolution
            try:
                args = {
                    "step": "initial review setup",
                    "step_number": 1,
                    "total_steps": 1,
                    "next_step_required": False,
                    "findings": "Initial testing",
                    "relevant_files": ["/some/file.py"],
                    "files_checked": ["/some/file.py"],
                    "focus_on": large_prompt,
                    "prompt": "Test code review for validation purposes",
                    "model": "o3-mini",
                }

                try:
                    result = await tool.execute(args)
                except ToolExecutionError as exc:
                    output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
                else:
                    assert len(result) == 1
                    output = json.loads(result[0].text)

                # The large focus_on may trigger the resend_prompt guard before provider access.
                # When the guard does not trigger, auto-mode falls back to provider selection and
                # returns an error about the unavailable model. Both behaviors are acceptable for this test.
                if output.get("status") == "resend_prompt":
                    assert output["metadata"]["prompt_size"] == len(large_prompt)
                else:
                    assert output.get("status") == "error"
                    assert "Model" in output.get("content", "")

            except Exception as e:
                # If we get an unexpected exception, ensure it's not a mock artifact
                error_msg = str(e)
                assert "MagicMock" not in error_msg
                assert "'<' not supported between instances" not in error_msg

                # Should be a real provider error (API, authentication, etc.)
                assert any(
                    phrase in error_msg
                    for phrase in ["API", "key", "authentication", "provider", "network", "connection"]
                )

        finally:
            # Restore environment
            for key, value in original_env.items():
                if value is not None:
                    os.environ[key] = value
                else:
                    os.environ.pop(key, None)

            # Reload config and clear registry
            importlib.reload(config)
            ModelProviderRegistry._instance = None

    # NOTE: Precommit test has been removed because the precommit tool has been
    # refactored to use a workflow-based pattern instead of accepting simple prompt/path fields.
    # The new precommit tool requires workflow fields like: step, step_number, total_steps,
    # next_step_required, findings, etc. See simulator_tests/test_precommitworkflow_validation.py
    # for comprehensive workflow testing including large prompt handling.

    # NOTE: Debug tool tests have been commented out because the debug tool has been
    # refactored to use a self-investigation pattern instead of accepting a prompt field.
    # The new debug tool requires fields like: step, step_number, total_steps, next_step_required, findings
    # and doesn't have the "resend_prompt" functionality for large prompts.

    # @pytest.mark.asyncio
    # async def test_debug_large_error_description(self, large_prompt):
    #     """Test that debug tool detects large error_description."""
    #     tool = DebugIssueTool()
    #     result = await tool.execute({"prompt": large_prompt})
    #
    #     assert len(result) == 1
    #     output = json.loads(result[0].text)
    #     assert output["status"] == "resend_prompt"

    # @pytest.mark.asyncio
    # async def test_debug_large_error_context(self, large_prompt, normal_prompt):
    #     """Test that debug tool detects large error_context."""
    #     tool = DebugIssueTool()
    #     result = await tool.execute({"prompt": normal_prompt, "error_context": large_prompt})
    #
    #     assert len(result) == 1
    #     output = json.loads(result[0].text)
    #     assert output["status"] == "resend_prompt"

    # Removed: test_analyze_large_question - workflow tool handles large prompts differently

    @pytest.mark.asyncio
    async def test_multiple_files_with_prompt_txt(self, temp_prompt_file):
        """Test handling of prompt.txt alongside other files."""
        tool = ChatTool()
        other_file = "/some/other/file.py"

        with (
            patch("utils.model_context.ModelContext") as mock_model_context_cls,
            patch.object(tool, "handle_prompt_file") as mock_handle_prompt,
            patch.object(tool, "_prepare_file_content_for_prompt") as mock_prepare_files,
        ):
            mock_provider = MagicMock()
            mock_provider.get_provider_type.return_value = MagicMock(value="google")
            mock_provider.generate_content.return_value = MagicMock(
                content="Success",
                usage={"input_tokens": 10, "output_tokens": 20, "total_tokens": 30},
                model_name="gemini-2.5-flash",
                metadata={"finish_reason": "STOP"},
            )

            from utils.model_context import TokenAllocation

            mock_model_context = MagicMock()
            mock_model_context.model_name = "gemini-2.5-flash"
            mock_model_context.provider = mock_provider
            mock_model_context.capabilities = MagicMock(supports_extended_thinking=False)
            mock_model_context.calculate_token_allocation.return_value = TokenAllocation(
                total_tokens=1_000_000,
                content_tokens=800_000,
                response_tokens=200_000,
                file_tokens=320_000,
                history_tokens=320_000,
            )
            mock_model_context_cls.return_value = mock_model_context

            # Return the prompt content and updated files list (without prompt.txt)
            mock_handle_prompt.return_value = ("Large prompt content from file", [other_file])

            # Mock the centralized file preparation method
            mock_prepare_files.return_value = ("File content", [other_file])

            # Use a small prompt to avoid triggering size limit
            await tool.execute(
                {
                    "prompt": "Test prompt",
                    "absolute_file_paths": [temp_prompt_file, other_file],
                    "working_directory_absolute_path": os.path.dirname(temp_prompt_file),
                }
            )

            # Verify handle_prompt_file was called with the original files list
            mock_handle_prompt.assert_called_once_with([temp_prompt_file, other_file])

            # Verify _prepare_file_content_for_prompt was called with the updated files list (without prompt.txt)
            mock_prepare_files.assert_called_once()
            files_arg = mock_prepare_files.call_args[0][0]
            assert len(files_arg) == 1
            assert files_arg[0] == other_file

        temp_dir = os.path.dirname(temp_prompt_file)
        shutil.rmtree(temp_dir)

    @pytest.mark.asyncio
    async def test_boundary_case_exactly_at_limit(self):
        """Test prompt exactly at MCP_PROMPT_SIZE_LIMIT characters (should pass with the fix)."""
        tool = ChatTool()
        exact_prompt = "x" * MCP_PROMPT_SIZE_LIMIT

        # Mock the model provider to avoid real API calls
        with patch.object(tool, "get_model_provider") as mock_get_provider:
            mock_provider = MagicMock()
            mock_provider.get_provider_type.return_value = MagicMock(value="google")
            mock_provider.get_capabilities.return_value = MagicMock(supports_extended_thinking=False)
            mock_provider.generate_content.return_value = MagicMock(
                content="Response to the large prompt",
                usage={"input_tokens": 12000, "output_tokens": 10, "total_tokens": 12010},
                model_name="gemini-2.5-flash",
                metadata={"finish_reason": "STOP"},
            )
            mock_get_provider.return_value = mock_provider

            # With the fix, this should now pass because we check at MCP transport boundary before adding internal content
            temp_dir = tempfile.mkdtemp()
            try:
                try:
                    result = await tool.execute({"prompt": exact_prompt, "working_directory_absolute_path": temp_dir})
                except ToolExecutionError as exc:
                    output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
                else:
                    output = json.loads(result[0].text)
            finally:
                shutil.rmtree(temp_dir, ignore_errors=True)
            assert output["status"] != "resend_prompt"

    @pytest.mark.asyncio
    async def test_boundary_case_just_over_limit(self):
        """Test prompt just over MCP_PROMPT_SIZE_LIMIT characters (should trigger file request)."""
        tool = ChatTool()
        over_prompt = "x" * (MCP_PROMPT_SIZE_LIMIT + 1)

        temp_dir = tempfile.mkdtemp()
        try:
            try:
                result = await tool.execute({"prompt": over_prompt, "working_directory_absolute_path": temp_dir})
            except ToolExecutionError as exc:
                output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
            else:
                output = json.loads(result[0].text)
        finally:
            shutil.rmtree(temp_dir, ignore_errors=True)
        assert output["status"] == "resend_prompt"

    @pytest.mark.asyncio
    async def test_empty_prompt_no_file(self):
        """Test empty prompt without prompt.txt file."""
        tool = ChatTool()

        with patch.object(tool, "get_model_provider") as mock_get_provider:
            mock_provider = MagicMock()
            mock_provider.get_provider_type.return_value = MagicMock(value="google")
            mock_provider.get_capabilities.return_value = MagicMock(supports_extended_thinking=False)
            mock_provider.generate_content.return_value = MagicMock(
                content="Success",
                usage={"input_tokens": 10, "output_tokens": 20, "total_tokens": 30},
                model_name="gemini-2.5-flash",
                metadata={"finish_reason": "STOP"},
            )
            mock_get_provider.return_value = mock_provider

            temp_dir = tempfile.mkdtemp()
            try:
                try:
                    result = await tool.execute({"prompt": "", "working_directory_absolute_path": temp_dir})
                except ToolExecutionError as exc:
                    output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
                else:
                    output = json.loads(result[0].text)
            finally:
                shutil.rmtree(temp_dir, ignore_errors=True)
            assert output["status"] != "resend_prompt"

    @pytest.mark.asyncio
    async def test_prompt_file_read_error(self):
        """Test handling when prompt.txt can't be read."""
        from tests.mock_helpers import create_mock_provider

        tool = ChatTool()
        bad_file = "/nonexistent/prompt.txt"

        with (
            patch.object(tool, "get_model_provider") as mock_get_provider,
            patch("utils.model_context.ModelContext") as mock_model_context_class,
        ):

            mock_provider = create_mock_provider(model_name="gemini-2.5-flash", context_window=1_048_576)
            mock_provider.generate_content.return_value.content = "Success"
            mock_get_provider.return_value = mock_provider

            # Mock ModelContext to avoid the comparison issue
            from utils.model_context import TokenAllocation

            mock_model_context = MagicMock()
            mock_model_context.model_name = "gemini-2.5-flash"
            mock_model_context.calculate_token_allocation.return_value = TokenAllocation(
                total_tokens=1_048_576,
                content_tokens=838_861,
                response_tokens=209_715,
                file_tokens=335_544,
                history_tokens=335_544,
            )
            mock_model_context_class.return_value = mock_model_context

            # Should continue with empty prompt when file can't be read
            temp_dir = tempfile.mkdtemp()
            try:
                try:
                    result = await tool.execute(
                        {"prompt": "", "absolute_file_paths": [bad_file], "working_directory_absolute_path": temp_dir}
                    )
                except ToolExecutionError as exc:
                    output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
                else:
                    output = json.loads(result[0].text)
            finally:
                shutil.rmtree(temp_dir, ignore_errors=True)
            assert output["status"] != "resend_prompt"

    @pytest.mark.asyncio
    async def test_large_file_context_does_not_trigger_mcp_prompt_limit(self, tmp_path):
        """Large context files should not be blocked by MCP prompt limit enforcement."""
        from tests.mock_helpers import create_mock_provider
        from utils.model_context import TokenAllocation

        tool = ChatTool()

        # Create a file significantly larger than MCP_PROMPT_SIZE_LIMIT characters
        large_content = "A" * (MCP_PROMPT_SIZE_LIMIT * 5)
        large_file = tmp_path / "huge_context.txt"
        large_file.write_text(large_content)

        mock_provider = create_mock_provider(model_name="flash")

        class DummyModelContext:
            def __init__(self, provider):
                self.model_name = "flash"
                self._provider = provider
                self.capabilities = provider.get_capabilities("flash")

            @property
            def provider(self):
                return self._provider

            def calculate_token_allocation(self):
                return TokenAllocation(
                    total_tokens=1_048_576,
                    content_tokens=838_861,
                    response_tokens=209_715,
                    file_tokens=335_544,
                    history_tokens=335_544,
                )

        dummy_context = DummyModelContext(mock_provider)

        with patch.object(tool, "get_model_provider", return_value=mock_provider):
            result = await tool.execute(
                {
                    "prompt": "Summarize the design decisions",
                    "absolute_file_paths": [str(large_file)],
                    "model": "flash",
                    "working_directory_absolute_path": str(tmp_path),
                    "_model_context": dummy_context,
                }
            )

        output = json.loads(result[0].text)
        assert output["status"] != "resend_prompt"

    @pytest.mark.asyncio
    async def test_mcp_boundary_with_large_internal_context(self):
        """
        Critical test: Ensure MCP_PROMPT_SIZE_LIMIT only applies to user input (MCP boundary),
        NOT to internal context like conversation history, system prompts, or file content.

        This test verifies that even if our internal prompt (with system prompts, history, etc.)
        exceeds MCP_PROMPT_SIZE_LIMIT, it should still work as long as the user's input is small.
        """

        tool = ChatTool()

        # Small user input that should pass MCP boundary check
        small_user_prompt = "What is the weather like?"

        # Mock a huge conversation history that would exceed MCP limits if incorrectly checked
        huge_history = "x" * (MCP_PROMPT_SIZE_LIMIT * 2)  # 100K chars = way over 50K limit

        temp_dir = tempfile.mkdtemp()
        original_prepare_prompt = tool.prepare_prompt

        try:
            with (
                patch.object(tool, "get_model_provider") as mock_get_provider,
                patch("utils.model_context.ModelContext") as mock_model_context_class,
            ):
                from tests.mock_helpers import create_mock_provider
                from utils.model_context import TokenAllocation

                mock_provider = create_mock_provider(model_name="flash")
                mock_get_provider.return_value = mock_provider

                mock_model_context = MagicMock()
                mock_model_context.model_name = "flash"
                mock_model_context.provider = mock_provider
                mock_model_context.calculate_token_allocation.return_value = TokenAllocation(
                    total_tokens=1_048_576,
                    content_tokens=838_861,
                    response_tokens=209_715,
                    file_tokens=335_544,
                    history_tokens=335_544,
                )
                mock_model_context_class.return_value = mock_model_context

                async def mock_prepare_prompt(request):
                    normal_prompt = await original_prepare_prompt(request)
                    huge_internal_prompt = f"{normal_prompt}\n\n=== HUGE INTERNAL CONTEXT ===\n{huge_history}"
                    assert len(huge_internal_prompt) > MCP_PROMPT_SIZE_LIMIT
                    return huge_internal_prompt

                tool.prepare_prompt = mock_prepare_prompt

                result = await tool.execute(
                    {"prompt": small_user_prompt, "model": "flash", "working_directory_absolute_path": temp_dir}
                )
                output = json.loads(result[0].text)

                assert output["status"] != "resend_prompt"

                mock_provider.generate_content.assert_called_once()
                call_kwargs = mock_provider.generate_content.call_args[1]
                actual_prompt = call_kwargs.get("prompt")

                assert len(actual_prompt) > MCP_PROMPT_SIZE_LIMIT
                assert huge_history in actual_prompt
                assert small_user_prompt in actual_prompt
        finally:
            tool.prepare_prompt = original_prepare_prompt
            shutil.rmtree(temp_dir, ignore_errors=True)

    @pytest.mark.asyncio
    async def test_mcp_boundary_vs_internal_processing_distinction(self):
        """
        Test that clearly demonstrates the distinction between:
        1. MCP transport boundary (user input - SHOULD be limited)
        2. Internal processing (system prompts, files, history - should NOT be limited)
        """
        tool = ChatTool()

        # Test case 1: Large user input should fail at MCP boundary
        large_user_input = "x" * (MCP_PROMPT_SIZE_LIMIT + 1000)
        temp_dir = tempfile.mkdtemp()
        try:
            try:
                result = await tool.execute(
                    {"prompt": large_user_input, "model": "flash", "working_directory_absolute_path": temp_dir}
                )
            except ToolExecutionError as exc:
                output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
            else:
                output = json.loads(result[0].text)

            assert output["status"] == "resend_prompt"  # Should fail
            assert "too large for MCP's token limits" in output["content"]

            # Test case 2: Small user input should succeed even with huge internal processing
            small_user_input = "Hello"

            try:
                result = await tool.execute(
                    {
                        "prompt": small_user_input,
                        "model": "gemini-2.5-flash",
                        "working_directory_absolute_path": temp_dir,
                    }
                )
            except ToolExecutionError as exc:
                output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
            else:
                output = json.loads(result[0].text)

            # The test will fail with dummy API keys, which is expected behavior
            # We're mainly testing that the tool processes small prompts correctly without size errors
            assert output["status"] != "resend_prompt"
        finally:
            shutil.rmtree(temp_dir, ignore_errors=True)

    @pytest.mark.asyncio
    async def test_continuation_with_huge_conversation_history(self):
        """
        Test that continuation calls with huge conversation history work correctly.
        This simulates the exact scenario where conversation history builds up and exceeds
        MCP_PROMPT_SIZE_LIMIT but should still work since history is internal processing.
        """
        tool = ChatTool()

        # Small user input for continuation
        small_continuation_prompt = "Continue the discussion"

        # Mock huge conversation history (simulates many turns of conversation)
        # Calculate repetitions needed to exceed MCP_PROMPT_SIZE_LIMIT
        base_text = "=== CONVERSATION HISTORY ===\n"
        repeat_text = "Previous message content\n"
        # Add buffer to ensure we exceed the limit
        target_size = MCP_PROMPT_SIZE_LIMIT + 1000
        available_space = target_size - len(base_text)
        repetitions_needed = (available_space // len(repeat_text)) + 1

        huge_conversation_history = base_text + (repeat_text * repetitions_needed)

        # Ensure the history exceeds MCP limits
        assert len(huge_conversation_history) > MCP_PROMPT_SIZE_LIMIT

        temp_dir = tempfile.mkdtemp()

        with (
            patch.object(tool, "get_model_provider") as mock_get_provider,
            patch("utils.model_context.ModelContext") as mock_model_context_class,
        ):
            from tests.mock_helpers import create_mock_provider

            mock_provider = create_mock_provider(model_name="flash")
            mock_provider.generate_content.return_value.content = "Continuing our conversation..."
            mock_get_provider.return_value = mock_provider

            # Mock ModelContext to avoid the comparison issue
            from utils.model_context import TokenAllocation

            mock_model_context = MagicMock()
            mock_model_context.model_name = "flash"
            mock_model_context.provider = mock_provider
            mock_model_context.calculate_token_allocation.return_value = TokenAllocation(
                total_tokens=1_048_576,
                content_tokens=838_861,
                response_tokens=209_715,
                file_tokens=335_544,
                history_tokens=335_544,
            )
            mock_model_context_class.return_value = mock_model_context

            # Simulate continuation by having the request contain embedded conversation history
            # This mimics what server.py does when it embeds conversation history
            request_with_history = {
                "prompt": f"{huge_conversation_history}\n\n=== CURRENT REQUEST ===\n{small_continuation_prompt}",
                "model": "flash",
                "continuation_id": "test_thread_123",
                "working_directory_absolute_path": temp_dir,
            }

            # Mock the conversation history embedding to simulate server.py behavior
            original_execute = tool.__class__.execute

            async def mock_execute_with_history(self, arguments):
                # Check if this has continuation_id (simulating server.py logic)
                if arguments.get("continuation_id"):
                    # Simulate the case where conversation history is already embedded in prompt
                    # by server.py before calling the tool
                    field_value = arguments.get("prompt", "")
                    if "=== CONVERSATION HISTORY ===" in field_value:
                        # Set the flag that history is embedded
                        self._has_embedded_history = True

                        # The prompt field contains both history AND user input
                        # But we should only check the user input part for MCP boundary
                        # (This is what our fix ensures happens in prepare_prompt)

                # Call original execute
                return await original_execute(self, arguments)

            tool.__class__.execute = mock_execute_with_history

            try:
                # This should succeed because:
                # 1. The actual user input is small (passes MCP boundary check)
                # 2. The huge conversation history is internal processing (not subject to MCP limits)
                result = await tool.execute(request_with_history)
                output = json.loads(result[0].text)

                # Should succeed even though total prompt with history is huge
                assert output["status"] != "resend_prompt"
                assert "Continuing our conversation" in output["content"]

                # Verify the model was called with the complete prompt (including huge history)
                mock_provider.generate_content.assert_called_once()
                call_kwargs = mock_provider.generate_content.call_args[1]
                final_prompt = call_kwargs.get("prompt")

                # The final prompt should contain both history and user input
                assert huge_conversation_history in final_prompt
                assert small_continuation_prompt in final_prompt
                # And it should be huge (proving we don't limit internal processing)
                assert len(final_prompt) > MCP_PROMPT_SIZE_LIMIT

            finally:
                # Restore original execute method
                tool.__class__.execute = original_execute
                shutil.rmtree(temp_dir, ignore_errors=True)


if __name__ == "__main__":
    pytest.main([__file__, "-v"])

```

--------------------------------------------------------------------------------
/tools/refactor.py:
--------------------------------------------------------------------------------

```python
"""
Refactor tool - Step-by-step refactoring analysis with expert validation

This tool provides a structured workflow for comprehensive code refactoring analysis.
It guides the CLI agent through systematic investigation steps with forced pauses between each step
to ensure thorough code examination, refactoring opportunity identification, and quality
assessment before proceeding. The tool supports complex refactoring scenarios including
code smell detection, decomposition planning, modernization opportunities, and organization improvements.

Key features:
- Step-by-step refactoring investigation workflow with progress tracking
- Context-aware file embedding (references during investigation, full content for analysis)
- Automatic refactoring opportunity tracking with type and severity classification
- Expert analysis integration with external models
- Support for focused refactoring types (codesmells, decompose, modernize, organization)
- Confidence-based workflow optimization with refactor completion tracking
"""

import logging
from typing import TYPE_CHECKING, Any, Literal, Optional

from pydantic import Field, model_validator

if TYPE_CHECKING:
    from tools.models import ToolModelCategory

from config import TEMPERATURE_ANALYTICAL
from systemprompts import REFACTOR_PROMPT
from tools.shared.base_models import WorkflowRequest

from .workflow.base import WorkflowTool

logger = logging.getLogger(__name__)

# Tool-specific field descriptions for refactor tool
REFACTOR_FIELD_DESCRIPTIONS = {
    "step": (
        "The refactoring plan. Step 1: State strategy. Later steps: Report findings. "
        "CRITICAL: Examine code for smells, and opportunities for decomposition, modernization, and organization. "
        "Use 'relevant_files' for code. FORBIDDEN: Large code snippets."
    ),
    "step_number": (
        "The index of the current step in the refactoring investigation sequence, beginning at 1. Each step should "
        "build upon or revise the previous one."
    ),
    "total_steps": (
        "Your current estimate for how many steps will be needed to complete the refactoring investigation. "
        "Adjust as new opportunities emerge."
    ),
    "next_step_required": (
        "Set to true if you plan to continue the investigation with another step. False means you believe the "
        "refactoring analysis is complete and ready for expert validation."
    ),
    "findings": (
        "Summary of discoveries from this step, including code smells and opportunities for decomposition, modernization, or organization. "
        "Document both strengths and weaknesses. In later steps, confirm or update past findings."
    ),
    "files_checked": (
        "List all files examined (absolute paths). Include even ruled-out files to track exploration path."
    ),
    "relevant_files": (
        "Subset of files_checked with code requiring refactoring (absolute paths). Include files with "
        "code smells, decomposition needs, or improvement opportunities."
    ),
    "relevant_context": (
        "List methods/functions central to refactoring opportunities, in 'ClassName.methodName' or 'functionName' format. "
        "Prioritize those with code smells or needing improvement."
    ),
    "issues_found": (
        "Refactoring opportunities as dictionaries with 'severity' (critical/high/medium/low), "
        "'type' (codesmells/decompose/modernize/organization), and 'description'. "
        "Include all improvement opportunities found."
    ),
    "confidence": (
        "Your confidence in refactoring analysis: exploring (starting), incomplete (significant work remaining), "
        "partial (some opportunities found, more analysis needed), complete (comprehensive analysis finished, "
        "all major opportunities identified). "
        "WARNING: Use 'complete' ONLY when fully analyzed and can provide recommendations without expert help. "
        "'complete' PREVENTS expert validation. Use 'partial' for large files or uncertain analysis."
    ),
    "images": (
        "Optional list of absolute paths to architecture diagrams, UI mockups, design documents, or visual references "
        "that help with refactoring context. Only include if they materially assist understanding or assessment."
    ),
    "refactor_type": "Type of refactoring analysis to perform (codesmells, decompose, modernize, organization)",
    "focus_areas": "Specific areas to focus on (e.g., 'performance', 'readability', 'maintainability', 'security')",
    "style_guide_examples": (
        "Optional existing code files to use as style/pattern reference (must be FULL absolute paths to real files / "
        "folders - DO NOT SHORTEN). These files represent the target coding style and patterns for the project."
    ),
}


class RefactorRequest(WorkflowRequest):
    """Request model for refactor workflow investigation steps"""

    # Required fields for each investigation step
    step: str = Field(..., description=REFACTOR_FIELD_DESCRIPTIONS["step"])
    step_number: int = Field(..., description=REFACTOR_FIELD_DESCRIPTIONS["step_number"])
    total_steps: int = Field(..., description=REFACTOR_FIELD_DESCRIPTIONS["total_steps"])
    next_step_required: bool = Field(..., description=REFACTOR_FIELD_DESCRIPTIONS["next_step_required"])

    # Investigation tracking fields
    findings: str = Field(..., description=REFACTOR_FIELD_DESCRIPTIONS["findings"])
    files_checked: list[str] = Field(default_factory=list, description=REFACTOR_FIELD_DESCRIPTIONS["files_checked"])
    relevant_files: list[str] = Field(default_factory=list, description=REFACTOR_FIELD_DESCRIPTIONS["relevant_files"])
    relevant_context: list[str] = Field(
        default_factory=list, description=REFACTOR_FIELD_DESCRIPTIONS["relevant_context"]
    )
    issues_found: list[dict] = Field(default_factory=list, description=REFACTOR_FIELD_DESCRIPTIONS["issues_found"])
    confidence: Optional[Literal["exploring", "incomplete", "partial", "complete"]] = Field(
        "incomplete", description=REFACTOR_FIELD_DESCRIPTIONS["confidence"]
    )

    # Optional images for visual context
    images: Optional[list[str]] = Field(default=None, description=REFACTOR_FIELD_DESCRIPTIONS["images"])

    # Refactor-specific fields (only used in step 1 to initialize)
    refactor_type: Optional[Literal["codesmells", "decompose", "modernize", "organization"]] = Field(
        "codesmells", description=REFACTOR_FIELD_DESCRIPTIONS["refactor_type"]
    )
    focus_areas: Optional[list[str]] = Field(None, description=REFACTOR_FIELD_DESCRIPTIONS["focus_areas"])
    style_guide_examples: Optional[list[str]] = Field(
        None, description=REFACTOR_FIELD_DESCRIPTIONS["style_guide_examples"]
    )

    # Override inherited fields to exclude them from schema (except model which needs to be available)
    temperature: Optional[float] = Field(default=None, exclude=True)
    thinking_mode: Optional[str] = Field(default=None, exclude=True)

    @model_validator(mode="after")
    def validate_step_one_requirements(self):
        """Ensure step 1 has required relevant_files field."""
        if self.step_number == 1 and not self.relevant_files:
            raise ValueError(
                "Step 1 requires 'relevant_files' field to specify code files or directories to analyze for refactoring"
            )
        return self


class RefactorTool(WorkflowTool):
    """
    Refactor tool for step-by-step refactoring analysis and expert validation.

    This tool implements a structured refactoring workflow that guides users through
    methodical investigation steps, ensuring thorough code examination, refactoring opportunity
    identification, and improvement assessment before reaching conclusions. It supports complex
    refactoring scenarios including code smell detection, decomposition planning, modernization
    opportunities, and organization improvements.
    """

    def __init__(self):
        super().__init__()
        self.initial_request = None
        self.refactor_config = {}

    def get_name(self) -> str:
        return "refactor"

    def get_description(self) -> str:
        return (
            "Analyzes code for refactoring opportunities with systematic investigation. "
            "Use for code smell detection, decomposition planning, modernization, and maintainability improvements. "
            "Guides through structured analysis with expert validation."
        )

    def get_system_prompt(self) -> str:
        return REFACTOR_PROMPT

    def get_default_temperature(self) -> float:
        return TEMPERATURE_ANALYTICAL

    def get_model_category(self) -> "ToolModelCategory":
        """Refactor workflow requires thorough analysis and reasoning"""
        from tools.models import ToolModelCategory

        return ToolModelCategory.EXTENDED_REASONING

    def get_workflow_request_model(self):
        """Return the refactor workflow-specific request model."""
        return RefactorRequest

    def get_input_schema(self) -> dict[str, Any]:
        """Generate input schema using WorkflowSchemaBuilder with refactor-specific overrides."""
        from .workflow.schema_builders import WorkflowSchemaBuilder

        # Refactor workflow-specific field overrides
        refactor_field_overrides = {
            "step": {
                "type": "string",
                "description": REFACTOR_FIELD_DESCRIPTIONS["step"],
            },
            "step_number": {
                "type": "integer",
                "minimum": 1,
                "description": REFACTOR_FIELD_DESCRIPTIONS["step_number"],
            },
            "total_steps": {
                "type": "integer",
                "minimum": 1,
                "description": REFACTOR_FIELD_DESCRIPTIONS["total_steps"],
            },
            "next_step_required": {
                "type": "boolean",
                "description": REFACTOR_FIELD_DESCRIPTIONS["next_step_required"],
            },
            "findings": {
                "type": "string",
                "description": REFACTOR_FIELD_DESCRIPTIONS["findings"],
            },
            "files_checked": {
                "type": "array",
                "items": {"type": "string"},
                "description": REFACTOR_FIELD_DESCRIPTIONS["files_checked"],
            },
            "relevant_files": {
                "type": "array",
                "items": {"type": "string"},
                "description": REFACTOR_FIELD_DESCRIPTIONS["relevant_files"],
            },
            "confidence": {
                "type": "string",
                "enum": ["exploring", "incomplete", "partial", "complete"],
                "default": "incomplete",
                "description": REFACTOR_FIELD_DESCRIPTIONS["confidence"],
            },
            "issues_found": {
                "type": "array",
                "items": {"type": "object"},
                "description": REFACTOR_FIELD_DESCRIPTIONS["issues_found"],
            },
            "images": {
                "type": "array",
                "items": {"type": "string"},
                "description": REFACTOR_FIELD_DESCRIPTIONS["images"],
            },
            # Refactor-specific fields (for step 1)
            # Note: Use relevant_files field instead of files for consistency
            "refactor_type": {
                "type": "string",
                "enum": ["codesmells", "decompose", "modernize", "organization"],
                "default": "codesmells",
                "description": REFACTOR_FIELD_DESCRIPTIONS["refactor_type"],
            },
            "focus_areas": {
                "type": "array",
                "items": {"type": "string"},
                "description": REFACTOR_FIELD_DESCRIPTIONS["focus_areas"],
            },
            "style_guide_examples": {
                "type": "array",
                "items": {"type": "string"},
                "description": REFACTOR_FIELD_DESCRIPTIONS["style_guide_examples"],
            },
        }

        # Use WorkflowSchemaBuilder with refactor-specific tool fields
        return WorkflowSchemaBuilder.build_schema(
            tool_specific_fields=refactor_field_overrides,
            model_field_schema=self.get_model_field_schema(),
            auto_mode=self.is_effective_auto_mode(),
            tool_name=self.get_name(),
        )

    def get_required_actions(
        self, step_number: int, confidence: str, findings: str, total_steps: int, request=None
    ) -> list[str]:
        """Define required actions for each investigation phase."""
        if step_number == 1:
            # Initial refactoring investigation tasks
            return [
                "Read and understand the code files specified for refactoring analysis",
                "Examine the overall structure, architecture, and design patterns used",
                "Identify potential code smells: long methods, large classes, duplicate code, complex conditionals",
                "Look for decomposition opportunities: oversized components that could be broken down",
                "Check for modernization opportunities: outdated patterns, deprecated features, newer language constructs",
                "Assess organization: logical grouping, file structure, naming conventions, module boundaries",
                "Document specific refactoring opportunities with file locations and line numbers",
            ]
        elif confidence in ["exploring", "incomplete"]:
            # Need deeper investigation
            return [
                "Examine specific code sections you've identified as needing refactoring",
                "Analyze code smells in detail: complexity, coupling, cohesion issues",
                "Investigate decomposition opportunities: identify natural breaking points for large components",
                "Look for modernization possibilities: language features, patterns, libraries that could improve the code",
                "Check organization issues: related functionality that could be better grouped or structured",
                "Trace dependencies and relationships between components to understand refactoring impact",
                "Prioritize refactoring opportunities by impact and effort required",
            ]
        elif confidence == "partial":
            # Close to completion - need final verification
            return [
                "Verify all identified refactoring opportunities have been properly documented with locations",
                "Check for any missed opportunities in areas not yet thoroughly examined",
                "Confirm that refactoring suggestions align with the specified refactor_type and focus_areas",
                "Ensure refactoring opportunities are prioritized by severity and impact",
                "Validate that proposed changes would genuinely improve code quality without breaking functionality",
                "Double-check that all relevant files and code elements are captured in your analysis",
            ]
        else:
            # General investigation needed
            return [
                "Continue examining the codebase for additional refactoring opportunities",
                "Gather more evidence using appropriate code analysis techniques",
                "Test your assumptions about code quality and improvement possibilities",
                "Look for patterns that confirm or refute your current refactoring assessment",
                "Focus on areas that haven't been thoroughly examined for refactoring potential",
            ]

    def should_call_expert_analysis(self, consolidated_findings, request=None) -> bool:
        """
        Decide when to call external model based on investigation completeness.

        Skip expert analysis when the CLI agent reports "complete" confidence in its refactoring analysis - trust its judgment.
        """
        # Check if user requested to skip assistant model
        if request and not self.get_request_use_assistant_model(request):
            return False

        # Check if refactoring work is complete
        if request and request.confidence == "complete":
            return False

        # Check if we have meaningful investigation data
        return (
            len(consolidated_findings.relevant_files) > 0
            or len(consolidated_findings.findings) >= 2
            or len(consolidated_findings.issues_found) > 0
        )

    def prepare_expert_analysis_context(self, consolidated_findings) -> str:
        """Prepare context for external model call for final refactoring validation."""
        context_parts = [
            f"=== REFACTORING ANALYSIS REQUEST ===\\n{self.initial_request or 'Refactoring workflow initiated'}\\n=== END REQUEST ==="
        ]

        # Add investigation summary
        investigation_summary = self._build_refactoring_summary(consolidated_findings)
        context_parts.append(
            f"\\n=== AGENT'S REFACTORING INVESTIGATION ===\\n{investigation_summary}\\n=== END INVESTIGATION ==="
        )

        # Add refactor configuration context if available
        if self.refactor_config:
            config_text = "\\n".join(f"- {key}: {value}" for key, value in self.refactor_config.items() if value)
            context_parts.append(f"\\n=== REFACTOR CONFIGURATION ===\\n{config_text}\\n=== END CONFIGURATION ===")

        # Add relevant code elements if available
        if consolidated_findings.relevant_context:
            methods_text = "\\n".join(f"- {method}" for method in consolidated_findings.relevant_context)
            context_parts.append(f"\\n=== RELEVANT CODE ELEMENTS ===\\n{methods_text}\\n=== END CODE ELEMENTS ===")

        # Add refactoring opportunities found if available
        if consolidated_findings.issues_found:
            opportunities_text = "\\n".join(
                f"[{issue.get('severity', 'unknown').upper()}] {issue.get('type', 'unknown').upper()}: {issue.get('description', 'No description')}"
                for issue in consolidated_findings.issues_found
            )
            context_parts.append(
                f"\\n=== REFACTORING OPPORTUNITIES ===\\n{opportunities_text}\\n=== END OPPORTUNITIES ==="
            )

        # Add assessment evolution if available
        if consolidated_findings.hypotheses:
            assessments_text = "\\n".join(
                f"Step {h['step']} ({h['confidence']} confidence): {h['hypothesis']}"
                for h in consolidated_findings.hypotheses
            )
            context_parts.append(f"\\n=== ASSESSMENT EVOLUTION ===\\n{assessments_text}\\n=== END ASSESSMENTS ===")

        # Add images if available
        if consolidated_findings.images:
            images_text = "\\n".join(f"- {img}" for img in consolidated_findings.images)
            context_parts.append(
                f"\\n=== VISUAL REFACTORING INFORMATION ===\\n{images_text}\\n=== END VISUAL INFORMATION ==="
            )

        return "\\n".join(context_parts)

    def _build_refactoring_summary(self, consolidated_findings) -> str:
        """Prepare a comprehensive summary of the refactoring investigation."""
        summary_parts = [
            "=== SYSTEMATIC REFACTORING INVESTIGATION SUMMARY ===",
            f"Total steps: {len(consolidated_findings.findings)}",
            f"Files examined: {len(consolidated_findings.files_checked)}",
            f"Relevant files identified: {len(consolidated_findings.relevant_files)}",
            f"Code elements analyzed: {len(consolidated_findings.relevant_context)}",
            f"Refactoring opportunities identified: {len(consolidated_findings.issues_found)}",
            "",
            "=== INVESTIGATION PROGRESSION ===",
        ]

        for finding in consolidated_findings.findings:
            summary_parts.append(finding)

        return "\\n".join(summary_parts)

    def should_include_files_in_expert_prompt(self) -> bool:
        """Include files in expert analysis for comprehensive refactoring validation."""
        return True

    def should_embed_system_prompt(self) -> bool:
        """Embed system prompt in expert analysis for proper context."""
        return True

    def get_expert_thinking_mode(self) -> str:
        """Use high thinking mode for thorough refactoring analysis."""
        return "high"

    def get_expert_analysis_instruction(self) -> str:
        """Get specific instruction for refactoring expert analysis."""
        return (
            "Please provide comprehensive refactoring analysis based on the investigation findings. "
            "Focus on validating the identified opportunities, ensuring completeness of the analysis, "
            "and providing final recommendations for refactoring implementation, following the structured "
            "format specified in the system prompt."
        )

    # Hook method overrides for refactor-specific behavior

    def prepare_step_data(self, request) -> dict:
        """
        Map refactor workflow-specific fields for internal processing.
        """
        step_data = {
            "step": request.step,
            "step_number": request.step_number,
            "findings": request.findings,
            "files_checked": request.files_checked,
            "relevant_files": request.relevant_files,
            "relevant_context": request.relevant_context,
            "issues_found": request.issues_found,
            "confidence": request.confidence,
            "hypothesis": request.findings,  # Map findings to hypothesis for compatibility
            "images": request.images or [],
        }
        return step_data

    def should_skip_expert_analysis(self, request, consolidated_findings) -> bool:
        """
        Refactor workflow skips expert analysis when the CLI agent has "complete" confidence.
        """
        return request.confidence == "complete" and not request.next_step_required

    def store_initial_issue(self, step_description: str):
        """Store initial request for expert analysis."""
        self.initial_request = step_description

    # Override inheritance hooks for refactor-specific behavior

    def get_completion_status(self) -> str:
        """Refactor tools use refactor-specific status."""
        return "refactoring_analysis_complete_ready_for_implementation"

    def get_completion_data_key(self) -> str:
        """Refactor uses 'complete_refactoring' key."""
        return "complete_refactoring"

    def get_final_analysis_from_request(self, request):
        """Refactor tools use 'findings' field."""
        return request.findings

    def get_confidence_level(self, request) -> str:
        """Refactor tools use 'complete' for high confidence."""
        return "complete"

    def get_completion_message(self) -> str:
        """Refactor-specific completion message."""
        return (
            "Refactoring analysis complete with COMPLETE confidence. You have identified all significant "
            "refactoring opportunities and provided comprehensive analysis. MANDATORY: Present the user with "
            "the complete refactoring results organized by type and severity, and IMMEDIATELY proceed with "
            "implementing the highest priority refactoring opportunities or provide specific guidance for "
            "improvements. Focus on actionable refactoring steps."
        )

    def get_skip_reason(self) -> str:
        """Refactor-specific skip reason."""
        return "Completed comprehensive refactoring analysis with full confidence locally"

    def get_skip_expert_analysis_status(self) -> str:
        """Refactor-specific expert analysis skip status."""
        return "skipped_due_to_complete_refactoring_confidence"

    def prepare_work_summary(self) -> str:
        """Refactor-specific work summary."""
        return self._build_refactoring_summary(self.consolidated_findings)

    def get_completion_next_steps_message(self, expert_analysis_used: bool = False) -> str:
        """
        Refactor-specific completion message.

        Args:
            expert_analysis_used: True if expert analysis was successfully executed
        """
        base_message = (
            "REFACTORING ANALYSIS IS COMPLETE. You MUST now summarize and present ALL refactoring opportunities "
            "organized by type (codesmells → decompose → modernize → organization) and severity (Critical → High → "
            "Medium → Low), specific code locations with line numbers, and exact recommendations for improvement. "
            "Clearly prioritize the top 3 refactoring opportunities that need immediate attention. Provide concrete, "
            "actionable guidance for each opportunity—make it easy for a developer to understand exactly what needs "
            "to be refactored and how to implement the improvements."
        )

        # Add expert analysis guidance only when expert analysis was actually used
        if expert_analysis_used:
            expert_guidance = self.get_expert_analysis_guidance()
            if expert_guidance:
                return f"{base_message}\n\n{expert_guidance}"

        return base_message

    def get_expert_analysis_guidance(self) -> str:
        """
        Get additional guidance for handling expert analysis results in refactor context.

        Returns:
            Additional guidance text for validating and using expert analysis findings
        """
        return (
            "IMPORTANT: Expert refactoring analysis has been provided above. You MUST review "
            "the expert's architectural insights and refactoring recommendations. Consider whether "
            "the expert's suggestions align with the codebase's evolution trajectory and current "
            "team priorities. Pay special attention to any breaking changes, migration complexity, "
            "or performance implications highlighted by the expert. Present a balanced view that "
            "considers both immediate benefits and long-term maintainability."
        )

    def get_step_guidance_message(self, request) -> str:
        """
        Refactor-specific step guidance with detailed investigation instructions.
        """
        step_guidance = self.get_refactor_step_guidance(request.step_number, request.confidence, request)
        return step_guidance["next_steps"]

    def get_refactor_step_guidance(self, step_number: int, confidence: str, request) -> dict[str, Any]:
        """
        Provide step-specific guidance for refactor workflow.
        """
        # Generate the next steps instruction based on required actions
        required_actions = self.get_required_actions(step_number, confidence, request.findings, request.total_steps)

        if step_number == 1:
            next_steps = (
                f"MANDATORY: DO NOT call the {self.get_name()} tool again immediately. You MUST first examine "
                f"the code files thoroughly for refactoring opportunities using appropriate tools. CRITICAL AWARENESS: "
                f"You need to identify code smells, decomposition opportunities, modernization possibilities, and "
                f"organization improvements across the specified refactor_type. Look for complexity issues, outdated "
                f"patterns, oversized components, and structural problems. Use file reading tools, code analysis, and "
                f"systematic examination to gather comprehensive refactoring information. Only call {self.get_name()} "
                f"again AFTER completing your investigation. When you call {self.get_name()} next time, use "
                f"step_number: {step_number + 1} and report specific files examined, refactoring opportunities found, "
                f"and improvement assessments discovered."
            )
        elif confidence in ["exploring", "incomplete"]:
            next_steps = (
                f"STOP! Do NOT call {self.get_name()} again yet. Based on your findings, you've identified areas that need "
                f"deeper refactoring analysis. MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\\n"
                + "\\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
                + f"\\n\\nOnly call {self.get_name()} again with step_number: {step_number + 1} AFTER "
                + "completing these refactoring analysis tasks."
            )
        elif confidence == "partial":
            next_steps = (
                f"WAIT! Your refactoring analysis needs final verification. DO NOT call {self.get_name()} immediately. REQUIRED ACTIONS:\\n"
                + "\\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
                + f"\\n\\nREMEMBER: Ensure you have identified all significant refactoring opportunities across all types and "
                f"verified the completeness of your analysis. Document opportunities with specific file references and "
                f"line numbers where applicable, then call {self.get_name()} with step_number: {step_number + 1}."
            )
        else:
            next_steps = (
                f"PAUSE REFACTORING ANALYSIS. Before calling {self.get_name()} step {step_number + 1}, you MUST examine more code thoroughly. "
                + "Required: "
                + ", ".join(required_actions[:2])
                + ". "
                + f"Your next {self.get_name()} call (step_number: {step_number + 1}) must include "
                f"NEW evidence from actual refactoring analysis, not just theories. NO recursive {self.get_name()} calls "
                f"without investigation work!"
            )

        return {"next_steps": next_steps}

    def customize_workflow_response(self, response_data: dict, request) -> dict:
        """
        Customize response to match refactor workflow format.
        """
        # Store initial request on first step
        if request.step_number == 1:
            self.initial_request = request.step
            # Store refactor configuration for expert analysis
            if request.relevant_files:
                self.refactor_config = {
                    "relevant_files": request.relevant_files,
                    "refactor_type": request.refactor_type,
                    "focus_areas": request.focus_areas,
                    "style_guide_examples": request.style_guide_examples,
                }

        # Convert generic status names to refactor-specific ones
        tool_name = self.get_name()
        status_mapping = {
            f"{tool_name}_in_progress": "refactoring_analysis_in_progress",
            f"pause_for_{tool_name}": "pause_for_refactoring_analysis",
            f"{tool_name}_required": "refactoring_analysis_required",
            f"{tool_name}_complete": "refactoring_analysis_complete",
        }

        if response_data["status"] in status_mapping:
            response_data["status"] = status_mapping[response_data["status"]]

        # Rename status field to match refactor workflow
        if f"{tool_name}_status" in response_data:
            response_data["refactoring_status"] = response_data.pop(f"{tool_name}_status")
            # Add refactor-specific status fields
            refactor_types = {}
            for issue in self.consolidated_findings.issues_found:
                issue_type = issue.get("type", "unknown")
                if issue_type not in refactor_types:
                    refactor_types[issue_type] = 0
                refactor_types[issue_type] += 1
            response_data["refactoring_status"]["opportunities_by_type"] = refactor_types
            response_data["refactoring_status"]["refactor_confidence"] = request.confidence

        # Map complete_refactor to complete_refactoring
        if f"complete_{tool_name}" in response_data:
            response_data["complete_refactoring"] = response_data.pop(f"complete_{tool_name}")

        # Map the completion flag to match refactor workflow
        if f"{tool_name}_complete" in response_data:
            response_data["refactoring_complete"] = response_data.pop(f"{tool_name}_complete")

        return response_data

    # Required abstract methods from BaseTool
    def get_request_model(self):
        """Return the refactor workflow-specific request model."""
        return RefactorRequest

    async def prepare_prompt(self, request) -> str:
        """Not used - workflow tools use execute_workflow()."""
        return ""  # Workflow tools use execute_workflow() directly

```
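
A minimal usage sketch of the request model above, assuming the module is importable as `tools.refactor` (its relative imports suggest it lives under `tools/`) and that the inherited `WorkflowRequest` adds no further required fields beyond those re-declared here:

```python
from pydantic import ValidationError

from tools.refactor import RefactorRequest  # assumed import path

# Step 1 without `relevant_files` is rejected by validate_step_one_requirements.
try:
    RefactorRequest(
        step="Plan a code-smell review of the billing module",
        step_number=1,
        total_steps=3,
        next_step_required=True,
        findings="Initial strategy outlined",
    )
except ValidationError as exc:
    print(exc)  # mentions: Step 1 requires 'relevant_files' field ...

# Supplying absolute paths in `relevant_files` satisfies the validator;
# refactor_type falls back to its "codesmells" default.
request = RefactorRequest(
    step="Plan a code-smell review of the billing module",
    step_number=1,
    total_steps=3,
    next_step_required=True,
    findings="Initial strategy outlined",
    relevant_files=["/home/alice/project/billing/service.py"],  # hypothetical path
)
print(request.refactor_type)  # -> codesmells
```

Later steps may omit `relevant_files` entirely; the check only applies to step 1.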

--------------------------------------------------------------------------------
/utils/file_utils.py:
--------------------------------------------------------------------------------

```python
"""
File reading utilities with directory support and token management

This module provides secure file access functionality for the MCP server.
It implements critical security measures to prevent unauthorized file access
and manages token limits to ensure efficient API usage.

Key Features:
- Path validation and sandboxing to prevent directory traversal attacks
- Support for both individual files and recursive directory reading
- Token counting and management to stay within API limits
- Automatic file type detection and filtering
- Comprehensive error handling with informative messages

Security Model:
- All file access is restricted to PROJECT_ROOT and its subdirectories
- Absolute paths are required to prevent ambiguity
- Symbolic links are resolved to ensure they stay within bounds

CONVERSATION MEMORY INTEGRATION:
This module works with the conversation memory system to support efficient
multi-turn file handling:

1. DEDUPLICATION SUPPORT:
   - File reading functions are called by conversation-aware tools
   - Supports newest-first file prioritization by providing accurate token estimation
   - Enables efficient file content caching and token budget management

2. TOKEN BUDGET OPTIMIZATION:
   - Provides accurate token estimation for file content before reading
   - Supports the dual prioritization strategy by enabling precise budget calculations
   - Enables tools to make informed decisions about which files to include

3. CROSS-TOOL FILE PERSISTENCE:
   - File reading results are used across different tools in conversation chains
   - Consistent file access patterns support conversation continuation scenarios
   - Error handling preserves conversation flow when files become unavailable
"""

import json
import logging
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

from .file_types import BINARY_EXTENSIONS, CODE_EXTENSIONS, IMAGE_EXTENSIONS, TEXT_EXTENSIONS
from .security_config import EXCLUDED_DIRS, is_dangerous_path
from .token_utils import DEFAULT_CONTEXT_WINDOW, estimate_tokens


def _is_builtin_custom_models_config(path_str: str) -> bool:
    """
    Check if path points to the server's built-in custom_models.json config file.

    This only matches the server's internal config, not user-specified CUSTOM_MODELS_CONFIG_PATH.
    We identify the built-in config by checking if it resolves to the server's conf directory.

    Args:
        path_str: Path to check

    Returns:
        True if this is the server's built-in custom_models.json config file
    """
    try:
        path = Path(path_str)

        # Get the server root by going up from this file: utils/file_utils.py -> server_root
        server_root = Path(__file__).parent.parent
        builtin_config = server_root / "conf" / "custom_models.json"

        # Check if the path resolves to the same file as our built-in config
        # This handles both relative and absolute paths to the same file
        return path.resolve() == builtin_config.resolve()

    except Exception:
        # If path resolution fails, it's not our built-in config
        return False


logger = logging.getLogger(__name__)


def is_mcp_directory(path: Path) -> bool:
    """
    Check if a directory is the MCP server's own directory.

    This prevents the MCP from including its own code when scanning projects
    where the MCP has been cloned as a subdirectory.

    Args:
        path: Directory path to check

    Returns:
        True if this is the MCP server directory or a subdirectory
    """
    if not path.is_dir():
        return False

    # Get the directory where the MCP server is running from
    # __file__ is utils/file_utils.py, so parent.parent is the MCP root
    mcp_server_dir = Path(__file__).parent.parent.resolve()

    # Check if the given path is the MCP server directory or a subdirectory
    try:
        path.resolve().relative_to(mcp_server_dir)
        logger.info(f"Detected MCP server directory at {path}, will exclude from scanning")
        return True
    except ValueError:
        # Not a subdirectory of MCP server
        return False


def get_user_home_directory() -> Optional[Path]:
    """
    Get the user's home directory.

    Returns:
        User's home directory path
    """
    return Path.home()


def is_home_directory_root(path: Path) -> bool:
    """
    Check if the given path is the user's home directory root.

    This prevents scanning the entire home directory which could include
    sensitive data and non-project files.

    Args:
        path: Directory path to check

    Returns:
        True if this is the home directory root
    """
    user_home = get_user_home_directory()
    if not user_home:
        return False

    try:
        resolved_path = path.resolve()
        resolved_home = user_home.resolve()

        # Check if this is exactly the home directory
        if resolved_path == resolved_home:
            logger.warning(
                f"Attempted to scan user home directory root: {path}. Please specify a subdirectory instead."
            )
            return True

        # Also check common home directory patterns
        path_str = str(resolved_path).lower()
        home_patterns = [
            "/users/",  # macOS
            "/home/",  # Linux
            "c:\\users\\",  # Windows
            "c:/users/",  # Windows with forward slashes
        ]

        for pattern in home_patterns:
            if pattern in path_str:
                # Extract the user directory path
                # e.g., /Users/fahad or /home/username
                parts = path_str.split(pattern)
                if len(parts) > 1:
                    # Get the part after the pattern
                    after_pattern = parts[1]
                    # Check if we're at the user's root (no subdirectories)
                    if "/" not in after_pattern and "\\" not in after_pattern:
                        logger.warning(
                            f"Attempted to scan user home directory root: {path}. "
                            f"Please specify a subdirectory instead."
                        )
                        return True

    except Exception as e:
        logger.debug(f"Error checking if path is home directory: {e}")

    return False
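
# Usage sketch for is_home_directory_root (hypothetical paths, illustrative only):
#
#   is_home_directory_root(Path("/home/alice"))              # True  - home root is refused
#   is_home_directory_root(Path("/home/alice/projects/app")) # False - subdirectories are fine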


def detect_file_type(file_path: str) -> str:
    """
    Detect file type for appropriate processing strategy.

    This function is intended for specific file type handling (e.g., image processing,
    binary file analysis, or enhanced file filtering).

    Args:
        file_path: Path to the file to analyze

    Returns:
        str: "text", "binary", or "image"
    """
    path = Path(file_path)

    # Check extension first (fast)
    extension = path.suffix.lower()
    if extension in TEXT_EXTENSIONS:
        return "text"
    elif extension in IMAGE_EXTENSIONS:
        return "image"
    elif extension in BINARY_EXTENSIONS:
        return "binary"

    # Fallback: check magic bytes for text vs binary
    # This is helpful for files without extensions or unknown extensions
    try:
        with open(path, "rb") as f:
            chunk = f.read(1024)
            # Simple heuristic: if we can decode as UTF-8, likely text
            chunk.decode("utf-8")
            return "text"
    except UnicodeDecodeError:
        return "binary"
    except (FileNotFoundError, PermissionError) as e:
        logger.warning(f"Could not access file {file_path} for type detection: {e}")
        return "unknown"


def should_add_line_numbers(file_path: str, include_line_numbers: Optional[bool] = None) -> bool:
    """
    Determine if line numbers should be added to a file.

    Args:
        file_path: Path to the file
        include_line_numbers: Explicit preference, or None for auto-detection

    Returns:
        bool: True if line numbers should be added
    """
    if include_line_numbers is not None:
        return include_line_numbers

    # Default: DO NOT add line numbers
    # Tools that want line numbers must explicitly request them
    return False


def _normalize_line_endings(content: str) -> str:
    """
    Normalize line endings for consistent line numbering.

    Args:
        content: File content with potentially mixed line endings

    Returns:
        str: Content with normalized LF line endings
    """
    # Normalize all line endings to LF for consistent counting
    return content.replace("\r\n", "\n").replace("\r", "\n")


def _add_line_numbers(content: str) -> str:
    """
    Add line numbers to text content for precise referencing.

    Args:
        content: Text content to number

    Returns:
        str: Content with line numbers in format "  45│ actual code line".
        Width grows dynamically with the line count (minimum 4 digits), so
        files of any length are supported.
    """
    # Normalize line endings first
    normalized_content = _normalize_line_endings(content)
    lines = normalized_content.split("\n")

    # Dynamic width allocation based on total line count
    # This supports files of any size by computing required width
    total_lines = len(lines)
    width = len(str(total_lines))
    width = max(width, 4)  # Minimum padding for readability

    # Format with dynamic width and clear separator
    numbered_lines = [f"{i + 1:{width}d}│ {line}" for i, line in enumerate(lines)]

    return "\n".join(numbered_lines)


def resolve_and_validate_path(path_str: str) -> Path:
    """
    Resolves and validates a path against security policies.

    This function ensures safe file access by:
    1. Requiring absolute paths (no ambiguity)
    2. Resolving symlinks to prevent deception
    3. Blocking access to dangerous system directories

    Args:
        path_str: Path string (must be absolute)

    Returns:
        Resolved Path object that is safe to access

    Raises:
        ValueError: If path is not absolute or otherwise invalid
        PermissionError: If path is in a dangerous location
    """
    # Step 1: Create a Path object
    user_path = Path(path_str)

    # Step 2: Security Policy - Require absolute paths
    # Relative paths could be interpreted differently depending on working directory
    if not user_path.is_absolute():
        raise ValueError(f"Relative paths are not supported. Please provide an absolute path.\nReceived: {path_str}")

    # Step 3: Resolve the absolute path (follows symlinks, removes .. and .)
    # This is critical for security as it reveals the true destination of symlinks
    resolved_path = user_path.resolve()

    # Step 4: Check against dangerous paths
    if is_dangerous_path(resolved_path):
        logger.warning(f"Access denied - dangerous path: {resolved_path}")
        raise PermissionError(f"Access to system directory denied: {path_str}")

    # Step 5: Check if it's the home directory root
    if is_home_directory_root(resolved_path):
        raise PermissionError(
            f"Cannot scan entire home directory: {path_str}\n" f"Please specify a subdirectory within your home folder."
        )

    return resolved_path
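
# Usage sketch (hypothetical paths, illustrative only):
#
#   resolve_and_validate_path("src/main.py")          # raises ValueError (relative path)
#   resolve_and_validate_path("/home/alice")          # raises PermissionError (home directory root)
#   resolve_and_validate_path("/home/alice/app/x.py") # returns the resolved Path if not flagged dangerous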


def expand_paths(paths: list[str], extensions: Optional[set[str]] = None) -> list[str]:
    """
    Expand paths to individual files, handling both files and directories.

    This function recursively walks directories to find all matching files.
    It automatically filters out hidden files and common non-code directories
    like __pycache__ to avoid including generated or system files.

    Args:
        paths: List of file or directory paths (must be absolute)
        extensions: Optional set of file extensions to include (defaults to CODE_EXTENSIONS)

    Returns:
        List of individual file paths, sorted for consistent ordering
    """
    if extensions is None:
        extensions = CODE_EXTENSIONS

    expanded_files = []
    seen = set()

    for path in paths:
        try:
            # Validate each path for security before processing
            path_obj = resolve_and_validate_path(path)
        except (ValueError, PermissionError):
            # Skip invalid paths silently to allow partial success
            continue

        if not path_obj.exists():
            continue

        # Safety checks for directory scanning
        if path_obj.is_dir():
            # Check 1: Prevent scanning user's home directory root
            if is_home_directory_root(path_obj):
                logger.warning(f"Skipping home directory root: {path}. Please specify a project subdirectory instead.")
                continue

            # Check 2: Skip if this is the MCP's own directory
            if is_mcp_directory(path_obj):
                logger.info(
                    f"Skipping MCP server directory: {path}. The MCP server code is excluded from project scans."
                )
                continue

        if path_obj.is_file():
            # Add file directly
            if str(path_obj) not in seen:
                expanded_files.append(str(path_obj))
                seen.add(str(path_obj))

        elif path_obj.is_dir():
            # Walk directory recursively to find all files
            for root, dirs, files in os.walk(path_obj):
                # Filter directories in-place to skip hidden and excluded directories
                # This prevents descending into .git, .venv, __pycache__, node_modules, etc.
                original_dirs = dirs[:]
                dirs[:] = []
                for d in original_dirs:
                    # Skip hidden directories
                    if d.startswith("."):
                        continue
                    # Skip excluded directories
                    if d in EXCLUDED_DIRS:
                        continue
                    # Skip MCP directories found during traversal
                    dir_path = Path(root) / d
                    if is_mcp_directory(dir_path):
                        logger.debug(f"Skipping MCP directory during traversal: {dir_path}")
                        continue
                    dirs.append(d)

                for file in files:
                    # Skip hidden files (e.g., .DS_Store, .gitignore)
                    if file.startswith("."):
                        continue

                    file_path = Path(root) / file

                    # Filter by extension if specified
                    if not extensions or file_path.suffix.lower() in extensions:
                        full_path = str(file_path)
                        # Use set to prevent duplicates
                        if full_path not in seen:
                            expanded_files.append(full_path)
                            seen.add(full_path)

    # Sort for consistent ordering across different runs
    # This makes output predictable and easier to debug
    expanded_files.sort()
    return expanded_files
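
# Usage sketch (hypothetical project layout, illustrative only):
#
#   files = expand_paths(["/home/alice/project/src"])
#   # -> sorted absolute paths of code files under src/, with hidden entries,
#   #    EXCLUDED_DIRS (e.g. __pycache__, node_modules) and the MCP server's
#   #    own directory filtered out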


def read_file_content(
    file_path: str, max_size: int = 1_000_000, *, include_line_numbers: Optional[bool] = None
) -> tuple[str, int]:
    """
    Read a single file and format it for inclusion in AI prompts.

    This function handles various error conditions gracefully and always
    returns formatted content, even for errors. This ensures the AI model
    gets context about what files were attempted but couldn't be read.

    Args:
        file_path: Path to file (must be absolute)
        max_size: Maximum file size to read (default 1MB to prevent memory issues)
        include_line_numbers: Whether to add line numbers. If None, auto-detects based on file type

    Returns:
        Tuple of (formatted_content, estimated_tokens)
        Content is wrapped with clear delimiters for AI parsing
    """
    logger.debug(f"[FILES] read_file_content called for: {file_path}")
    try:
        # Validate path security before any file operations
        path = resolve_and_validate_path(file_path)
        logger.debug(f"[FILES] Path validated and resolved: {path}")
    except (ValueError, PermissionError) as e:
        # Return error in a format that provides context to the AI
        logger.debug(f"[FILES] Path validation failed for {file_path}: {type(e).__name__}: {e}")
        error_msg = str(e)
        content = f"\n--- ERROR ACCESSING FILE: {file_path} ---\nError: {error_msg}\n--- END FILE ---\n"
        tokens = estimate_tokens(content)
        logger.debug(f"[FILES] Returning error content for {file_path}: {tokens} tokens")
        return content, tokens

    try:
        # Validate file existence and type
        if not path.exists():
            logger.debug(f"[FILES] File does not exist: {file_path}")
            content = f"\n--- FILE NOT FOUND: {file_path} ---\nError: File does not exist\n--- END FILE ---\n"
            return content, estimate_tokens(content)

        if not path.is_file():
            logger.debug(f"[FILES] Path is not a file: {file_path}")
            content = f"\n--- NOT A FILE: {file_path} ---\nError: Path is not a file\n--- END FILE ---\n"
            return content, estimate_tokens(content)

        # Check file size to prevent memory exhaustion
        stat_result = path.stat()
        file_size = stat_result.st_size
        logger.debug(f"[FILES] File size for {file_path}: {file_size:,} bytes")
        if file_size > max_size:
            logger.debug(f"[FILES] File too large: {file_path} ({file_size:,} > {max_size:,} bytes)")
            modified_at = datetime.fromtimestamp(stat_result.st_mtime, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
            content = (
                f"\n--- FILE TOO LARGE: {file_path} (Last modified: {modified_at}) ---\n"
                f"File size: {file_size:,} bytes (max: {max_size:,})\n"
                "--- END FILE ---\n"
            )
            return content, estimate_tokens(content)

        # Determine if we should add line numbers
        add_line_numbers = should_add_line_numbers(file_path, include_line_numbers)
        logger.debug(f"[FILES] Line numbers for {file_path}: {'enabled' if add_line_numbers else 'disabled'}")

        # Read the file with UTF-8 encoding, replacing invalid characters
        # This ensures we can handle files with mixed encodings
        logger.debug(f"[FILES] Reading file content for {file_path}")
        with open(path, encoding="utf-8", errors="replace") as f:
            file_content = f.read()

        logger.debug(f"[FILES] Successfully read {len(file_content)} characters from {file_path}")

        # Add line numbers if requested or auto-detected
        if add_line_numbers:
            file_content = _add_line_numbers(file_content)
            logger.debug(f"[FILES] Added line numbers to {file_path}")
        else:
            # Still normalize line endings for consistency
            file_content = _normalize_line_endings(file_content)

        # Format with clear delimiters that help the AI understand file boundaries
        # Using consistent markers makes it easier for the model to parse
        # NOTE: These markers ("--- BEGIN FILE: ... ---") are distinct from git diff markers
        # ("--- BEGIN DIFF: ... ---") to allow AI to distinguish between complete file content
        # vs. partial diff content when files appear in both sections
        modified_at = datetime.fromtimestamp(stat_result.st_mtime, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
        formatted = (
            f"\n--- BEGIN FILE: {file_path} (Last modified: {modified_at}) ---\n"
            f"{file_content}\n"
            f"--- END FILE: {file_path} ---\n"
        )
        tokens = estimate_tokens(formatted)
        logger.debug(f"[FILES] Formatted content for {file_path}: {len(formatted)} chars, {tokens} tokens")
        return formatted, tokens

    except Exception as e:
        logger.debug(f"[FILES] Exception reading file {file_path}: {type(e).__name__}: {e}")
        content = f"\n--- ERROR READING FILE: {file_path} ---\nError: {str(e)}\n--- END FILE ---\n"
        tokens = estimate_tokens(content)
        logger.debug(f"[FILES] Returning error content for {file_path}: {tokens} tokens")
        return content, tokens
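
# Usage sketch (hypothetical path, illustrative only):
#
#   content, tokens = read_file_content("/home/alice/project/app.py")
#   # `content` is wrapped in "--- BEGIN FILE: ... ---" / "--- END FILE: ... ---"
#   # markers (or an error marker if the file is missing or too large), and
#   # `tokens` is the estimate_tokens() count for that formatted block.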


def read_files(
    file_paths: list[str],
    code: Optional[str] = None,
    max_tokens: Optional[int] = None,
    reserve_tokens: int = 50_000,
    *,
    include_line_numbers: bool = False,
) -> str:
    """
    Read multiple files and optional direct code with smart token management.

    This function implements intelligent token budgeting to maximize the amount
    of relevant content that can be included in an AI prompt while staying
    within token limits. It prioritizes direct code and reads files until
    the token budget is exhausted.

    Args:
        file_paths: List of file or directory paths (absolute paths required)
        code: Optional direct code to include (prioritized over files)
        max_tokens: Maximum tokens to use (defaults to DEFAULT_CONTEXT_WINDOW)
        reserve_tokens: Tokens to reserve for prompt and response (default 50K)
        include_line_numbers: Whether to add line numbers to file content

    Returns:
        str: All file contents formatted for AI consumption
    """
    if max_tokens is None:
        max_tokens = DEFAULT_CONTEXT_WINDOW

    logger.debug(f"[FILES] read_files called with {len(file_paths)} paths")
    logger.debug(
        f"[FILES] Token budget: max={max_tokens:,}, reserve={reserve_tokens:,}, available={max_tokens - reserve_tokens:,}"
    )

    content_parts = []
    total_tokens = 0
    available_tokens = max_tokens - reserve_tokens

    files_skipped = []

    # Priority 1: Handle direct code if provided
    # Direct code is prioritized because it's explicitly provided by the user
    if code:
        formatted_code = f"\n--- BEGIN DIRECT CODE ---\n{code}\n--- END DIRECT CODE ---\n"
        code_tokens = estimate_tokens(formatted_code)

        if code_tokens <= available_tokens:
            content_parts.append(formatted_code)
            total_tokens += code_tokens
            available_tokens -= code_tokens

    # Priority 2: Process file paths
    if file_paths:
        # Expand directories to get all individual files
        logger.debug(f"[FILES] Expanding {len(file_paths)} file paths")
        all_files = expand_paths(file_paths)
        logger.debug(f"[FILES] After expansion: {len(all_files)} individual files")

        if not all_files and file_paths:
            # No files found but paths were provided
            logger.debug("[FILES] No files found from provided paths")
            content_parts.append(f"\n--- NO FILES FOUND ---\nProvided paths: {', '.join(file_paths)}\n--- END ---\n")
        else:
            # Read files sequentially until token limit is reached
            logger.debug(f"[FILES] Reading {len(all_files)} files with token budget {available_tokens:,}")
            for i, file_path in enumerate(all_files):
                if total_tokens >= available_tokens:
                    logger.debug(f"[FILES] Token budget exhausted, skipping remaining {len(all_files) - i} files")
                    files_skipped.extend(all_files[i:])
                    break

                file_content, file_tokens = read_file_content(file_path, include_line_numbers=include_line_numbers)
                logger.debug(f"[FILES] File {file_path}: {file_tokens:,} tokens")

                # Check if adding this file would exceed limit
                if total_tokens + file_tokens <= available_tokens:
                    content_parts.append(file_content)
                    total_tokens += file_tokens
                    logger.debug(f"[FILES] Added file {file_path}, total tokens: {total_tokens:,}")
                else:
                    # File too large for remaining budget
                    logger.debug(
                        f"[FILES] File {file_path} too large for remaining budget ({file_tokens:,} tokens, {available_tokens - total_tokens:,} remaining)"
                    )
                    files_skipped.append(file_path)

    # Add informative note about skipped files to help users understand
    # what was omitted and why
    if files_skipped:
        logger.debug(f"[FILES] {len(files_skipped)} files skipped due to token limits")
        skip_note = "\n\n--- SKIPPED FILES (TOKEN LIMIT) ---\n"
        skip_note += f"Total skipped: {len(files_skipped)}\n"
        # Show first 10 skipped files as examples
        for _i, file_path in enumerate(files_skipped[:10]):
            skip_note += f"  - {file_path}\n"
        if len(files_skipped) > 10:
            skip_note += f"  ... and {len(files_skipped) - 10} more\n"
        skip_note += "--- END SKIPPED FILES ---\n"
        content_parts.append(skip_note)

    result = "\n\n".join(content_parts) if content_parts else ""
    logger.debug(f"[FILES] read_files complete: {len(result)} chars, {total_tokens:,} tokens used")
    return result
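
# Usage sketch (hypothetical paths and budget, illustrative only):
#
#   prompt_block = read_files(
#       ["/home/alice/project/src", "/home/alice/project/README.md"],
#       max_tokens=200_000,
#       reserve_tokens=50_000,
#       include_line_numbers=True,
#   )
#   # Files are embedded until the 150K-token budget is spent; anything left
#   # over is listed under "--- SKIPPED FILES (TOKEN LIMIT) ---".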


def estimate_file_tokens(file_path: str) -> int:
    """
    Estimate tokens for a file using file-type aware ratios.

    Args:
        file_path: Path to the file

    Returns:
        Estimated token count for the file
    """
    try:
        if not os.path.exists(file_path) or not os.path.isfile(file_path):
            return 0

        file_size = os.path.getsize(file_path)

        # Get the appropriate ratio for this file type
        from .file_types import get_token_estimation_ratio

        ratio = get_token_estimation_ratio(file_path)

        return int(file_size / ratio)
    except Exception:
        return 0


def check_files_size_limit(files: list[str], max_tokens: int, threshold_percent: float = 1.0) -> tuple[bool, int, int]:
    """
    Check if a list of files would exceed token limits.

    Args:
        files: List of file paths to check
        max_tokens: Maximum allowed tokens
        threshold_percent: Percentage of max_tokens to use as threshold (0.0-1.0)

    Returns:
        Tuple of (within_limit, total_estimated_tokens, file_count)
    """
    if not files:
        return True, 0, 0

    total_estimated_tokens = 0
    file_count = 0
    threshold = int(max_tokens * threshold_percent)

    for file_path in files:
        try:
            estimated_tokens = estimate_file_tokens(file_path)
            total_estimated_tokens += estimated_tokens
            if estimated_tokens > 0:  # Only count accessible files
                file_count += 1
        except Exception:
            # Skip files that can't be accessed for size check
            continue

    within_limit = total_estimated_tokens <= threshold
    return within_limit, total_estimated_tokens, file_count
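
# Usage sketch (illustrative numbers):
#
#   ok, estimated_tokens, file_count = check_files_size_limit(
#       files, max_tokens=100_000, threshold_percent=0.8
#   )
#   # ok is True only while the summed per-file estimates stay within 80,000 tokens.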


def read_json_file(file_path: str) -> Optional[dict]:
    """
    Read and parse a JSON file with proper error handling.

    Args:
        file_path: Path to the JSON file

    Returns:
        Parsed JSON data as dict, or None if file doesn't exist or invalid
    """
    try:
        if not os.path.exists(file_path):
            return None

        with open(file_path, encoding="utf-8") as f:
            return json.load(f)
    except (json.JSONDecodeError, OSError):
        return None


def write_json_file(file_path: str, data: dict, indent: int = 2) -> bool:
    """
    Write data to a JSON file with proper formatting.

    Args:
        file_path: Path to write the JSON file
        data: Dictionary data to serialize
        indent: JSON indentation level

    Returns:
        True if successful, False otherwise
    """
    try:
        os.makedirs(os.path.dirname(file_path), exist_ok=True)

        with open(file_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=indent, ensure_ascii=False)
        return True
    except (OSError, TypeError):
        return False


def get_file_size(file_path: str) -> int:
    """
    Get file size in bytes with proper error handling.

    Args:
        file_path: Path to the file

    Returns:
        File size in bytes, or 0 if file doesn't exist or error
    """
    try:
        if os.path.exists(file_path) and os.path.isfile(file_path):
            return os.path.getsize(file_path)
        return 0
    except OSError:
        return 0


def ensure_directory_exists(file_path: str) -> bool:
    """
    Ensure the parent directory of a file path exists.

    Args:
        file_path: Path to file (directory will be created for parent)

    Returns:
        True if directory exists or was created, False on error
    """
    try:
        directory = os.path.dirname(file_path)
        if directory:
            os.makedirs(directory, exist_ok=True)
        return True
    except OSError:
        return False


def is_text_file(file_path: str) -> bool:
    """
    Check if a file is likely a text file based on extension and content.

    Args:
        file_path: Path to the file

    Returns:
        True if file appears to be text, False otherwise
    """
    from .file_types import is_text_file as check_text_type

    return check_text_type(file_path)


def read_file_safely(file_path: str, max_size: int = 10 * 1024 * 1024) -> Optional[str]:
    """
    Read a file with size limits and encoding handling.

    Args:
        file_path: Path to the file
        max_size: Maximum file size in bytes (default 10MB)

    Returns:
        File content as a string, or None if the file is too large or unreadable
    """
    try:
        if not os.path.exists(file_path) or not os.path.isfile(file_path):
            return None

        file_size = os.path.getsize(file_path)
        if file_size > max_size:
            return None

        with open(file_path, encoding="utf-8", errors="ignore") as f:
            return f.read()
    except OSError:
        return None


def check_total_file_size(files: list[str], model_name: str) -> Optional[dict]:
    """
    Check if total file sizes would exceed token threshold before embedding.

    IMPORTANT: This performs STRICT REJECTION at MCP boundary.
    No partial inclusion - either all files fit or request is rejected.
    This forces the CLI to make better file selection decisions.

    This function MUST be called with the effective model name (after resolution).
    It should never receive 'auto' or None - model resolution happens earlier.

    Args:
        files: List of file paths to check
        model_name: The resolved model name for context-aware thresholds (required)

    Returns:
        Dict with `code_too_large` response if too large, None if acceptable
    """
    if not files:
        return None

    # Validate we have a proper model name (not auto or None)
    if not model_name or model_name.lower() == "auto":
        raise ValueError(
            f"check_total_file_size called with unresolved model: '{model_name}'. "
            "Model must be resolved before file size checking."
        )

    logger.info(f"File size check: Using model '{model_name}' for token limit calculation")

    from utils.model_context import ModelContext

    model_context = ModelContext(model_name)
    token_allocation = model_context.calculate_token_allocation()

    # Dynamic threshold based on model capacity
    context_window = token_allocation.total_tokens
    if context_window >= 1_000_000:  # Gemini-class models
        threshold_percent = 0.8  # Can be more generous
    elif context_window >= 500_000:  # Mid-range models
        threshold_percent = 0.7  # Moderate
    else:  # OpenAI-class models (200K)
        threshold_percent = 0.6  # Conservative

    max_file_tokens = int(token_allocation.file_tokens * threshold_percent)

    # Use centralized file size checking (threshold already applied to max_file_tokens)
    within_limit, total_estimated_tokens, file_count = check_files_size_limit(files, max_file_tokens)

    if not within_limit:
        return {
            "status": "code_too_large",
            "content": (
                f"The selected files are too large for analysis "
                f"(estimated {total_estimated_tokens:,} tokens, limit {max_file_tokens:,}). "
                f"Please select fewer, more specific files that are most relevant "
                f"to your question, then invoke the tool again."
            ),
            "content_type": "text",
            "metadata": {
                "total_estimated_tokens": total_estimated_tokens,
                "limit": max_file_tokens,
                "file_count": file_count,
                "threshold_percent": threshold_percent,
                "model_context_window": context_window,
                "model_name": model_name,
                "instructions": "Reduce file selection and try again - all files must fit within budget. If this persists, please use a model with a larger context window where available.",
            },
        }

    return None  # Proceed with ALL files

```
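
To make the rejection contract concrete, here is a minimal sketch of how a caller might gate file embedding on `check_total_file_size` before reading anything with `read_file_safely`. The module path (`utils.file_utils`), the wrapper name `embed_files_or_reject`, and the model name in the usage comment are assumptions for illustration, not code taken from this repository.

```python
# Illustrative sketch only - the import path and model name are assumptions.
from utils.file_utils import check_total_file_size, read_file_safely


def embed_files_or_reject(files: list[str], resolved_model: str) -> dict | str:
    """Gate file embedding behind the strict size check, then read the files."""
    # Strict rejection at the MCP boundary: either every file fits within the
    # budget or the `code_too_large` payload is returned unchanged to the CLI.
    rejection = check_total_file_size(files, resolved_model)
    if rejection is not None:
        return rejection

    # All files fit - read each one with the size-capped, encoding-tolerant
    # reader and skip anything that cannot be read.
    parts: list[str] = []
    for path in files:
        content = read_file_safely(path)
        if content is not None:
            parts.append(f"--- {path} ---\n{content}")
    return "\n\n".join(parts)


# Usage (model must already be resolved - never "auto" or None):
# result = embed_files_or_reject(["/abs/path/server.py"], "gemini-2.5-pro")
```

Note that the sketch never trims the file list itself: consistent with the strict-rejection design above, the caller either embeds everything or surfaces the rejection so the CLI can pick a smaller, more relevant set of files.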