This is page 17 of 25. Use http://codebase.md/beehiveinnovations/gemini-mcp-server?lines=true&page={x} to view the full context.

# Directory Structure

```
├── .claude
│   ├── commands
│   │   └── fix-github-issue.md
│   └── settings.json
├── .coveragerc
├── .dockerignore
├── .env.example
├── .gitattributes
├── .github
│   ├── FUNDING.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.yml
│   │   ├── config.yml
│   │   ├── documentation.yml
│   │   ├── feature_request.yml
│   │   └── tool_addition.yml
│   ├── pull_request_template.md
│   └── workflows
│       ├── docker-pr.yml
│       ├── docker-release.yml
│       ├── semantic-pr.yml
│       ├── semantic-release.yml
│       └── test.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CHANGELOG.md
├── claude_config_example.json
├── CLAUDE.md
├── clink
│   ├── __init__.py
│   ├── agents
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── claude.py
│   │   ├── codex.py
│   │   └── gemini.py
│   ├── constants.py
│   ├── models.py
│   ├── parsers
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── claude.py
│   │   ├── codex.py
│   │   └── gemini.py
│   └── registry.py
├── code_quality_checks.ps1
├── code_quality_checks.sh
├── communication_simulator_test.py
├── conf
│   ├── __init__.py
│   ├── azure_models.json
│   ├── cli_clients
│   │   ├── claude.json
│   │   ├── codex.json
│   │   └── gemini.json
│   ├── custom_models.json
│   ├── dial_models.json
│   ├── gemini_models.json
│   ├── openai_models.json
│   ├── openrouter_models.json
│   └── xai_models.json
├── config.py
├── docker
│   ├── README.md
│   └── scripts
│       ├── build.ps1
│       ├── build.sh
│       ├── deploy.ps1
│       ├── deploy.sh
│       └── healthcheck.py
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── adding_providers.md
│   ├── adding_tools.md
│   ├── advanced-usage.md
│   ├── ai_banter.md
│   ├── ai-collaboration.md
│   ├── azure_openai.md
│   ├── configuration.md
│   ├── context-revival.md
│   ├── contributions.md
│   ├── custom_models.md
│   ├── docker-deployment.md
│   ├── gemini-setup.md
│   ├── getting-started.md
│   ├── index.md
│   ├── locale-configuration.md
│   ├── logging.md
│   ├── model_ranking.md
│   ├── testing.md
│   ├── tools
│   │   ├── analyze.md
│   │   ├── apilookup.md
│   │   ├── challenge.md
│   │   ├── chat.md
│   │   ├── clink.md
│   │   ├── codereview.md
│   │   ├── consensus.md
│   │   ├── debug.md
│   │   ├── docgen.md
│   │   ├── listmodels.md
│   │   ├── planner.md
│   │   ├── precommit.md
│   │   ├── refactor.md
│   │   ├── secaudit.md
│   │   ├── testgen.md
│   │   ├── thinkdeep.md
│   │   ├── tracer.md
│   │   └── version.md
│   ├── troubleshooting.md
│   ├── vcr-testing.md
│   └── wsl-setup.md
├── examples
│   ├── claude_config_macos.json
│   └── claude_config_wsl.json
├── LICENSE
├── providers
│   ├── __init__.py
│   ├── azure_openai.py
│   ├── base.py
│   ├── custom.py
│   ├── dial.py
│   ├── gemini.py
│   ├── openai_compatible.py
│   ├── openai.py
│   ├── openrouter.py
│   ├── registries
│   │   ├── __init__.py
│   │   ├── azure.py
│   │   ├── base.py
│   │   ├── custom.py
│   │   ├── dial.py
│   │   ├── gemini.py
│   │   ├── openai.py
│   │   ├── openrouter.py
│   │   └── xai.py
│   ├── registry_provider_mixin.py
│   ├── registry.py
│   ├── shared
│   │   ├── __init__.py
│   │   ├── model_capabilities.py
│   │   ├── model_response.py
│   │   ├── provider_type.py
│   │   └── temperature.py
│   └── xai.py
├── pyproject.toml
├── pytest.ini
├── README.md
├── requirements-dev.txt
├── requirements.txt
├── run_integration_tests.ps1
├── run_integration_tests.sh
├── run-server.ps1
├── run-server.sh
├── scripts
│   └── sync_version.py
├── server.py
├── simulator_tests
│   ├── __init__.py
│   ├── base_test.py
│   ├── conversation_base_test.py
│   ├── log_utils.py
│   ├── test_analyze_validation.py
│   ├── test_basic_conversation.py
│   ├── test_chat_simple_validation.py
│   ├── test_codereview_validation.py
│   ├── test_consensus_conversation.py
│   ├── test_consensus_three_models.py
│   ├── test_consensus_workflow_accurate.py
│   ├── test_content_validation.py
│   ├── test_conversation_chain_validation.py
│   ├── test_cross_tool_comprehensive.py
│   ├── test_cross_tool_continuation.py
│   ├── test_debug_certain_confidence.py
│   ├── test_debug_validation.py
│   ├── test_line_number_validation.py
│   ├── test_logs_validation.py
│   ├── test_model_thinking_config.py
│   ├── test_o3_model_selection.py
│   ├── test_o3_pro_expensive.py
│   ├── test_ollama_custom_url.py
│   ├── test_openrouter_fallback.py
│   ├── test_openrouter_models.py
│   ├── test_per_tool_deduplication.py
│   ├── test_planner_continuation_history.py
│   ├── test_planner_validation_old.py
│   ├── test_planner_validation.py
│   ├── test_precommitworkflow_validation.py
│   ├── test_prompt_size_limit_bug.py
│   ├── test_refactor_validation.py
│   ├── test_secaudit_validation.py
│   ├── test_testgen_validation.py
│   ├── test_thinkdeep_validation.py
│   ├── test_token_allocation_validation.py
│   ├── test_vision_capability.py
│   └── test_xai_models.py
├── systemprompts
│   ├── __init__.py
│   ├── analyze_prompt.py
│   ├── chat_prompt.py
│   ├── clink
│   │   ├── codex_codereviewer.txt
│   │   ├── default_codereviewer.txt
│   │   ├── default_planner.txt
│   │   └── default.txt
│   ├── codereview_prompt.py
│   ├── consensus_prompt.py
│   ├── debug_prompt.py
│   ├── docgen_prompt.py
│   ├── generate_code_prompt.py
│   ├── planner_prompt.py
│   ├── precommit_prompt.py
│   ├── refactor_prompt.py
│   ├── secaudit_prompt.py
│   ├── testgen_prompt.py
│   ├── thinkdeep_prompt.py
│   └── tracer_prompt.py
├── tests
│   ├── __init__.py
│   ├── CASSETTE_MAINTENANCE.md
│   ├── conftest.py
│   ├── gemini_cassettes
│   │   ├── chat_codegen
│   │   │   └── gemini25_pro_calculator
│   │   │       └── mldev.json
│   │   ├── chat_cross
│   │   │   └── step1_gemini25_flash_number
│   │   │       └── mldev.json
│   │   └── consensus
│   │       └── step2_gemini25_flash_against
│   │           └── mldev.json
│   ├── http_transport_recorder.py
│   ├── mock_helpers.py
│   ├── openai_cassettes
│   │   ├── chat_cross_step2_gpt5_reminder.json
│   │   ├── chat_gpt5_continuation.json
│   │   ├── chat_gpt5_moon_distance.json
│   │   ├── consensus_step1_gpt5_for.json
│   │   └── o3_pro_basic_math.json
│   ├── pii_sanitizer.py
│   ├── sanitize_cassettes.py
│   ├── test_alias_target_restrictions.py
│   ├── test_auto_mode_comprehensive.py
│   ├── test_auto_mode_custom_provider_only.py
│   ├── test_auto_mode_model_listing.py
│   ├── test_auto_mode_provider_selection.py
│   ├── test_auto_mode.py
│   ├── test_auto_model_planner_fix.py
│   ├── test_azure_openai_provider.py
│   ├── test_buggy_behavior_prevention.py
│   ├── test_cassette_semantic_matching.py
│   ├── test_challenge.py
│   ├── test_chat_codegen_integration.py
│   ├── test_chat_cross_model_continuation.py
│   ├── test_chat_openai_integration.py
│   ├── test_chat_simple.py
│   ├── test_clink_claude_agent.py
│   ├── test_clink_claude_parser.py
│   ├── test_clink_codex_agent.py
│   ├── test_clink_gemini_agent.py
│   ├── test_clink_gemini_parser.py
│   ├── test_clink_integration.py
│   ├── test_clink_parsers.py
│   ├── test_clink_tool.py
│   ├── test_collaboration.py
│   ├── test_config.py
│   ├── test_consensus_integration.py
│   ├── test_consensus_schema.py
│   ├── test_consensus.py
│   ├── test_conversation_continuation_integration.py
│   ├── test_conversation_field_mapping.py
│   ├── test_conversation_file_features.py
│   ├── test_conversation_memory.py
│   ├── test_conversation_missing_files.py
│   ├── test_custom_openai_temperature_fix.py
│   ├── test_custom_provider.py
│   ├── test_debug.py
│   ├── test_deploy_scripts.py
│   ├── test_dial_provider.py
│   ├── test_directory_expansion_tracking.py
│   ├── test_disabled_tools.py
│   ├── test_docker_claude_desktop_integration.py
│   ├── test_docker_config_complete.py
│   ├── test_docker_healthcheck.py
│   ├── test_docker_implementation.py
│   ├── test_docker_mcp_validation.py
│   ├── test_docker_security.py
│   ├── test_docker_volume_persistence.py
│   ├── test_file_protection.py
│   ├── test_gemini_token_usage.py
│   ├── test_image_support_integration.py
│   ├── test_image_validation.py
│   ├── test_integration_utf8.py
│   ├── test_intelligent_fallback.py
│   ├── test_issue_245_simple.py
│   ├── test_large_prompt_handling.py
│   ├── test_line_numbers_integration.py
│   ├── test_listmodels_restrictions.py
│   ├── test_listmodels.py
│   ├── test_mcp_error_handling.py
│   ├── test_model_enumeration.py
│   ├── test_model_metadata_continuation.py
│   ├── test_model_resolution_bug.py
│   ├── test_model_restrictions.py
│   ├── test_o3_pro_output_text_fix.py
│   ├── test_o3_temperature_fix_simple.py
│   ├── test_openai_compatible_token_usage.py
│   ├── test_openai_provider.py
│   ├── test_openrouter_provider.py
│   ├── test_openrouter_registry.py
│   ├── test_parse_model_option.py
│   ├── test_per_tool_model_defaults.py
│   ├── test_pii_sanitizer.py
│   ├── test_pip_detection_fix.py
│   ├── test_planner.py
│   ├── test_precommit_workflow.py
│   ├── test_prompt_regression.py
│   ├── test_prompt_size_limit_bug_fix.py
│   ├── test_provider_retry_logic.py
│   ├── test_provider_routing_bugs.py
│   ├── test_provider_utf8.py
│   ├── test_providers.py
│   ├── test_rate_limit_patterns.py
│   ├── test_refactor.py
│   ├── test_secaudit.py
│   ├── test_server.py
│   ├── test_supported_models_aliases.py
│   ├── test_thinking_modes.py
│   ├── test_tools.py
│   ├── test_tracer.py
│   ├── test_utf8_localization.py
│   ├── test_utils.py
│   ├── test_uvx_resource_packaging.py
│   ├── test_uvx_support.py
│   ├── test_workflow_file_embedding.py
│   ├── test_workflow_metadata.py
│   ├── test_workflow_prompt_size_validation_simple.py
│   ├── test_workflow_utf8.py
│   ├── test_xai_provider.py
│   ├── transport_helpers.py
│   └── triangle.png
├── tools
│   ├── __init__.py
│   ├── analyze.py
│   ├── apilookup.py
│   ├── challenge.py
│   ├── chat.py
│   ├── clink.py
│   ├── codereview.py
│   ├── consensus.py
│   ├── debug.py
│   ├── docgen.py
│   ├── listmodels.py
│   ├── models.py
│   ├── planner.py
│   ├── precommit.py
│   ├── refactor.py
│   ├── secaudit.py
│   ├── shared
│   │   ├── __init__.py
│   │   ├── base_models.py
│   │   ├── base_tool.py
│   │   ├── exceptions.py
│   │   └── schema_builders.py
│   ├── simple
│   │   ├── __init__.py
│   │   └── base.py
│   ├── testgen.py
│   ├── thinkdeep.py
│   ├── tracer.py
│   ├── version.py
│   └── workflow
│       ├── __init__.py
│       ├── base.py
│       ├── schema_builders.py
│       └── workflow_mixin.py
├── utils
│   ├── __init__.py
│   ├── client_info.py
│   ├── conversation_memory.py
│   ├── env.py
│   ├── file_types.py
│   ├── file_utils.py
│   ├── image_utils.py
│   ├── model_context.py
│   ├── model_restrictions.py
│   ├── security_config.py
│   ├── storage_backend.py
│   └── token_utils.py
└── zen-mcp-server
```

# Files

--------------------------------------------------------------------------------
/tools/docgen.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Documentation Generation tool - Automated code documentation with complexity analysis
  3 | 
  4 | This tool provides a structured workflow for adding comprehensive documentation to codebases.
  5 | It guides you through systematic code analysis to generate modern documentation with:
  6 | - Function/method parameter documentation
  7 | - Big O complexity analysis
  8 | - Call flow and dependency documentation
  9 | - Inline comments for complex logic
 10 | - Smart updating of existing documentation
 11 | 
 12 | Key features:
 13 | - Step-by-step documentation workflow with progress tracking
 14 | - Context-aware file embedding (references during analysis, full content for documentation)
 15 | - Automatic conversation threading and history preservation
 16 | - Expert analysis integration with external models
 17 | - Support for multiple programming languages and documentation styles
 18 | - Configurable documentation features via parameters
 19 | """
 20 | 
 21 | import logging
 22 | from typing import TYPE_CHECKING, Any, Optional
 23 | 
 24 | from pydantic import Field
 25 | 
 26 | if TYPE_CHECKING:
 27 |     from tools.models import ToolModelCategory
 28 | 
 29 | from config import TEMPERATURE_ANALYTICAL
 30 | from systemprompts import DOCGEN_PROMPT
 31 | from tools.shared.base_models import WorkflowRequest
 32 | 
 33 | from .workflow.base import WorkflowTool
 34 | 
 35 | logger = logging.getLogger(__name__)
 36 | 
 37 | # Tool-specific field descriptions for documentation generation
 38 | DOCGEN_FIELD_DESCRIPTIONS = {
 39 |     "step": (
 40 |         "Step 1 (Discovery): list every file that needs documentation and record the total. Do not write docs yet. "
 41 |         "Steps 2+: document exactly one file per step. Never change code logic; log bugs separately. Keep the counters accurate."
 42 |     ),
 43 |     "step_number": "Current documentation step (starts at 1).",
 44 |     "total_steps": "1 discovery step + one step per file documented (tracks via `total_files_to_document`).",
 45 |     "next_step_required": "True while more files still need documentation; False once everything is complete.",
 46 |     "findings": "Summarize documentation gaps, complexity, call flows, and well-documented areas. Stop and report immediately if you uncover a bug.",
 47 |     "relevant_files": "Absolute paths for the file(s) you are documenting this step—stick to a single file per step.",
 48 |     "relevant_context": "Functions or methods needing documentation (e.g. 'Class.method', 'function_name'), especially complex or user-facing areas.",
 49 |     "num_files_documented": "Count of files finished so far. Increment only when a file is fully documented.",
 50 |     "total_files_to_document": "Total files identified in discovery; completion requires matching this count.",
 51 |     "document_complexity": "Include algorithmic complexity (Big O) analysis when True (default).",
 52 |     "document_flow": "Include call flow/dependency notes when True (default).",
 53 |     "update_existing": "True (default) to polish inaccurate or outdated docs instead of leaving them untouched.",
 54 |     "comments_on_complex_logic": "True (default) to add inline comments around non-obvious logic.",
 55 | }
 56 | 
 57 | 
 58 | class DocgenRequest(WorkflowRequest):
 59 |     """Request model for documentation generation steps"""
 60 | 
 61 |     # Required workflow fields
 62 |     step: str = Field(..., description=DOCGEN_FIELD_DESCRIPTIONS["step"])
 63 |     step_number: int = Field(..., description=DOCGEN_FIELD_DESCRIPTIONS["step_number"])
 64 |     total_steps: int = Field(..., description=DOCGEN_FIELD_DESCRIPTIONS["total_steps"])
 65 |     next_step_required: bool = Field(..., description=DOCGEN_FIELD_DESCRIPTIONS["next_step_required"])
 66 | 
 67 |     # Documentation analysis tracking fields
 68 |     findings: str = Field(..., description=DOCGEN_FIELD_DESCRIPTIONS["findings"])
 69 |     relevant_files: list[str] = Field(default_factory=list, description=DOCGEN_FIELD_DESCRIPTIONS["relevant_files"])
 70 |     relevant_context: list[str] = Field(default_factory=list, description=DOCGEN_FIELD_DESCRIPTIONS["relevant_context"])
 71 | 
 72 |     # Critical completion tracking counters
 73 |     num_files_documented: int = Field(0, description=DOCGEN_FIELD_DESCRIPTIONS["num_files_documented"])
 74 |     total_files_to_document: int = Field(0, description=DOCGEN_FIELD_DESCRIPTIONS["total_files_to_document"])
 75 | 
 76 |     # Documentation generation configuration parameters
 77 |     document_complexity: Optional[bool] = Field(True, description=DOCGEN_FIELD_DESCRIPTIONS["document_complexity"])
 78 |     document_flow: Optional[bool] = Field(True, description=DOCGEN_FIELD_DESCRIPTIONS["document_flow"])
 79 |     update_existing: Optional[bool] = Field(True, description=DOCGEN_FIELD_DESCRIPTIONS["update_existing"])
 80 |     comments_on_complex_logic: Optional[bool] = Field(
 81 |         True, description=DOCGEN_FIELD_DESCRIPTIONS["comments_on_complex_logic"]
 82 |     )
 83 | 
 84 | 
 85 | class DocgenTool(WorkflowTool):
 86 |     """
 87 |     Documentation generation tool for automated code documentation with complexity analysis.
 88 | 
 89 |     This tool implements a structured documentation workflow that guides users through
 90 |     methodical code analysis to generate comprehensive documentation including:
 91 |     - Function/method signatures and parameter descriptions
 92 |     - Algorithmic complexity (Big O) analysis
 93 |     - Call flow and dependency documentation
 94 |     - Inline comments for complex logic
 95 |     - Modern documentation style appropriate for the language/platform
 96 |     """
 97 | 
 98 |     def __init__(self):
 99 |         super().__init__()
100 |         self.initial_request = None
101 | 
102 |     def get_name(self) -> str:
103 |         return "docgen"
104 | 
105 |     def get_description(self) -> str:
106 |         return (
107 |             "Generates comprehensive code documentation with systematic analysis of functions, classes, and complexity. "
108 |             "Use for documentation generation, code analysis, complexity assessment, and API documentation. "
109 |             "Analyzes code structure and patterns to create thorough documentation."
110 |         )
111 | 
112 |     def get_system_prompt(self) -> str:
113 |         return DOCGEN_PROMPT
114 | 
115 |     def get_default_temperature(self) -> float:
116 |         return TEMPERATURE_ANALYTICAL
117 | 
118 |     def get_model_category(self) -> "ToolModelCategory":
119 |         """Docgen requires analytical and reasoning capabilities"""
120 |         from tools.models import ToolModelCategory
121 | 
122 |         return ToolModelCategory.EXTENDED_REASONING
123 | 
124 |     def requires_model(self) -> bool:
125 |         """
126 |         Docgen tool doesn't require model resolution at the MCP boundary.
127 | 
128 |         The docgen tool is a self-contained workflow tool that guides the CLI agent through
129 |         systematic documentation generation without calling external AI models.
130 | 
131 |         Returns:
132 |             bool: False - docgen doesn't need external AI model access
133 |         """
134 |         return False
135 | 
136 |     def requires_expert_analysis(self) -> bool:
137 |         """Docgen is self-contained and doesn't need expert analysis."""
138 |         return False
139 | 
140 |     def get_workflow_request_model(self):
141 |         """Return the docgen-specific request model."""
142 |         return DocgenRequest
143 | 
144 |     def get_tool_fields(self) -> dict[str, dict[str, Any]]:
145 |         """Return the tool-specific fields for docgen."""
146 |         return {
147 |             "document_complexity": {
148 |                 "type": "boolean",
149 |                 "default": True,
150 |                 "description": DOCGEN_FIELD_DESCRIPTIONS["document_complexity"],
151 |             },
152 |             "document_flow": {
153 |                 "type": "boolean",
154 |                 "default": True,
155 |                 "description": DOCGEN_FIELD_DESCRIPTIONS["document_flow"],
156 |             },
157 |             "update_existing": {
158 |                 "type": "boolean",
159 |                 "default": True,
160 |                 "description": DOCGEN_FIELD_DESCRIPTIONS["update_existing"],
161 |             },
162 |             "comments_on_complex_logic": {
163 |                 "type": "boolean",
164 |                 "default": True,
165 |                 "description": DOCGEN_FIELD_DESCRIPTIONS["comments_on_complex_logic"],
166 |             },
167 |             "num_files_documented": {
168 |                 "type": "integer",
169 |                 "default": 0,
170 |                 "minimum": 0,
171 |                 "description": DOCGEN_FIELD_DESCRIPTIONS["num_files_documented"],
172 |             },
173 |             "total_files_to_document": {
174 |                 "type": "integer",
175 |                 "default": 0,
176 |                 "minimum": 0,
177 |                 "description": DOCGEN_FIELD_DESCRIPTIONS["total_files_to_document"],
178 |             },
179 |         }
180 | 
181 |     def get_required_fields(self) -> list[str]:
182 |         """Return additional required fields beyond the standard workflow requirements."""
183 |         return [
184 |             "document_complexity",
185 |             "document_flow",
186 |             "update_existing",
187 |             "comments_on_complex_logic",
188 |             "num_files_documented",
189 |             "total_files_to_document",
190 |         ]
191 | 
192 |     def get_input_schema(self) -> dict[str, Any]:
193 |         """Generate input schema using WorkflowSchemaBuilder with field exclusions."""
194 |         from .workflow.schema_builders import WorkflowSchemaBuilder
195 | 
196 |         # Exclude workflow fields that documentation generation doesn't need
197 |         excluded_workflow_fields = [
198 |             "confidence",  # Documentation doesn't use confidence levels
199 |             "hypothesis",  # Documentation doesn't use hypothesis
200 |             "files_checked",  # Documentation uses doc_files and doc_methods instead for better tracking
201 |         ]
202 | 
203 |         # Exclude common fields that documentation generation doesn't need
204 |         excluded_common_fields = [
205 |             "model",  # Documentation doesn't need external model selection
206 |             "temperature",  # Documentation doesn't need temperature control
207 |             "thinking_mode",  # Documentation doesn't need thinking mode
208 |             "images",  # Documentation doesn't use images
209 |         ]
210 | 
211 |         return WorkflowSchemaBuilder.build_schema(
212 |             tool_specific_fields=self.get_tool_fields(),
213 |             required_fields=self.get_required_fields(),  # Include docgen-specific required fields
214 |             model_field_schema=None,  # Exclude model field - docgen doesn't need external model selection
215 |             auto_mode=False,  # Force non-auto mode to prevent model field addition
216 |             tool_name=self.get_name(),
217 |             excluded_workflow_fields=excluded_workflow_fields,
218 |             excluded_common_fields=excluded_common_fields,
219 |         )
220 | 
221 |     def get_required_actions(
222 |         self, step_number: int, confidence: str, findings: str, total_steps: int, request=None
223 |     ) -> list[str]:
224 |         """Define required actions for comprehensive documentation analysis with step-by-step file focus."""
225 |         if step_number == 1:
226 |             # Initial discovery ONLY - no documentation yet
227 |             return [
228 |                 "CRITICAL: DO NOT ALTER ANY CODE LOGIC! Only add documentation (docstrings, comments)",
229 |                 "Discover ALL files in the current directory (not nested) that need documentation",
230 |                 "COUNT the exact number of files that need documentation",
231 |                 "LIST all the files you found that need documentation by name",
232 |                 "IDENTIFY the programming language(s) to use MODERN documentation style (/// for Objective-C, /** */ for Java/JavaScript, etc.)",
233 |                 "DO NOT start documenting any files yet - this is discovery phase only",
234 |                 "Report the total count and file list clearly to the user",
235 |                 "IMMEDIATELY call docgen step 2 after discovery to begin documentation phase",
236 |                 "WHEN CALLING DOCGEN step 2: Set total_files_to_document to the exact count you found",
237 |                 "WHEN CALLING DOCGEN step 2: Set num_files_documented to 0 (haven't started yet)",
238 |             ]
239 |         elif step_number == 2:
240 |             # Start documentation phase with first file
241 |             return [
242 |                 "CRITICAL: DO NOT ALTER ANY CODE LOGIC! Only add documentation (docstrings, comments)",
243 |                 "Choose the FIRST file from your discovered list to start documentation",
244 |                 "For the chosen file: identify ALL functions, classes, and methods within it",
245 |                 'USE MODERN documentation style for the programming language (/// for Objective-C, /** */ for Java/JavaScript, """ for Python, etc.)',
246 |                 "Document ALL functions/methods in the chosen file - don't skip any - DOCUMENTATION ONLY",
247 |                 "When file is 100% documented, increment num_files_documented from 0 to 1",
248 |                 "Note any dependencies this file has (what it imports/calls) and what calls into it",
249 |                 "CRITICAL: If you find ANY bugs/logic errors, STOP documenting and report to user immediately",
250 |                 "Report which specific functions you documented in this step for accountability",
251 |                 "Report progress: num_files_documented (1) out of total_files_to_document",
252 |             ]
253 |         elif step_number <= 4:
254 |             # Continue with focused file-by-file approach
255 |             return [
256 |                 "CRITICAL: DO NOT ALTER ANY CODE LOGIC! Only add documentation (docstrings, comments)",
257 |                 "Choose the NEXT undocumented file from your discovered list",
258 |                 "For the chosen file: identify ALL functions, classes, and methods within it",
259 |                 "USE MODERN documentation style for the programming language (NEVER use legacy /* */ style for languages with modern alternatives)",
260 |                 "Document ALL functions/methods in the chosen file - don't skip any - DOCUMENTATION ONLY",
261 |                 "When file is 100% documented, increment num_files_documented by 1",
262 |                 "Verify that EVERY function in the current file has proper documentation (no skipping)",
263 |                 "CRITICAL: If you find ANY bugs/logic errors, STOP documenting and report to user immediately",
264 |                 "Report specific function names you documented for verification",
265 |                 "Report progress: current num_files_documented out of total_files_to_document",
266 |             ]
267 |         else:
268 |             # Continue systematic file-by-file coverage
269 |             return [
270 |                 "CRITICAL: DO NOT ALTER ANY CODE LOGIC! Only add documentation (docstrings, comments)",
271 |                 "Check counters: num_files_documented vs total_files_to_document",
272 |                 "If num_files_documented < total_files_to_document: choose NEXT undocumented file",
273 |                 "USE MODERN documentation style appropriate for each programming language (NEVER legacy styles)",
274 |                 "Document every function, method, and class in current file with no exceptions",
275 |                 "When file is 100% documented, increment num_files_documented by 1",
276 |                 "CRITICAL: If you find ANY bugs/logic errors, STOP documenting and report to user immediately",
277 |                 "Report progress: current num_files_documented out of total_files_to_document",
278 |                 "If num_files_documented < total_files_to_document: RESTART docgen with next step",
279 |                 "ONLY set next_step_required=false when num_files_documented equals total_files_to_document",
280 |                 "For nested dependencies: check if functions call into subdirectories and document those too",
281 |                 "CRITICAL: If ANY bugs/logic errors were found, STOP and ask user before proceeding",
282 |             ]
283 | 
284 |     def should_call_expert_analysis(self, consolidated_findings, request=None) -> bool:
285 |         """Docgen is self-contained and doesn't need expert analysis."""
286 |         return False
287 | 
288 |     def prepare_expert_analysis_context(self, consolidated_findings) -> str:
289 |         """Docgen doesn't use expert analysis."""
290 |         return ""
291 | 
292 |     def get_step_guidance(self, step_number: int, confidence: str, request) -> dict[str, Any]:
293 |         """
294 |         Provide step-specific guidance for documentation generation workflow.
295 | 
296 |         This method generates docgen-specific guidance used by get_step_guidance_message().
297 |         """
298 |         # Generate the next steps instruction based on required actions
299 |         # Calculate dynamic total_steps based on files to document
300 |         total_files_to_document = self.get_request_total_files_to_document(request)
301 |         calculated_total_steps = 1 + total_files_to_document if total_files_to_document > 0 else request.total_steps
302 | 
303 |         required_actions = self.get_required_actions(step_number, confidence, request.findings, calculated_total_steps)
304 | 
305 |         if step_number == 1:
306 |             next_steps = (
307 |                 f"DISCOVERY PHASE ONLY - DO NOT START DOCUMENTING YET!\n"
308 |                 f"MANDATORY: DO NOT call the {self.get_name()} tool again immediately. You MUST first perform "
309 |                 f"FILE DISCOVERY step by step. DO NOT DOCUMENT ANYTHING YET. "
310 |                 f"MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\n"
311 |                 + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
312 |                 + f"\n\nCRITICAL: When you call {self.get_name()} step 2, set total_files_to_document to the exact count "
313 |                 f"of files needing documentation and set num_files_documented to 0 (haven't started documenting yet). "
314 |                 f"Your total_steps will be automatically calculated as 1 (discovery) + number of files to document. "
315 |                 f"Step 2 will BEGIN the documentation phase. Report the count clearly and then IMMEDIATELY "
316 |                 f"proceed to call {self.get_name()} step 2 to start documenting the first file."
317 |             )
318 |         elif step_number == 2:
319 |             next_steps = (
320 |                 f"DOCUMENTATION PHASE BEGINS! ABSOLUTE RULE: DO NOT ALTER ANY CODE LOGIC! DOCUMENTATION ONLY!\n"
321 |                 f"START FILE-BY-FILE APPROACH! Focus on ONE file until 100% complete. "
322 |                 f"MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\n"
323 |                 + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
324 |                 + f"\n\nREPORT your progress: which specific functions did you document? Update num_files_documented from 0 to 1 when first file complete. "
325 |                 f"REPORT counters: current num_files_documented out of total_files_to_document. "
326 |                 f"CRITICAL: If you found ANY bugs/logic errors, STOP documenting and ask user what to do before continuing. "
327 |                 f"Do NOT move to a new file until the current one is completely documented. "
328 |                 f"When ready for step {step_number + 1}, report completed work with updated counters."
329 |             )
330 |         elif step_number <= 4:
331 |             next_steps = (
332 |                 f"ABSOLUTE RULE: DO NOT ALTER ANY CODE LOGIC! DOCUMENTATION ONLY!\n"
333 |                 f"CONTINUE FILE-BY-FILE APPROACH! Focus on ONE file until 100% complete. "
334 |                 f"MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\n"
335 |                 + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
336 |                 + f"\n\nREPORT your progress: which specific functions did you document? Update num_files_documented when file complete. "
337 |                 f"REPORT counters: current num_files_documented out of total_files_to_document. "
338 |                 f"CRITICAL: If you found ANY bugs/logic errors, STOP documenting and ask user what to do before continuing. "
339 |                 f"Do NOT move to a new file until the current one is completely documented. "
340 |                 f"When ready for step {step_number + 1}, report completed work with updated counters."
341 |             )
342 |         else:
343 |             next_steps = (
344 |                 f"ABSOLUTE RULE: DO NOT ALTER ANY CODE LOGIC! DOCUMENTATION ONLY!\n"
345 |                 f"CRITICAL: Check if MORE FILES need documentation before finishing! "
346 |                 f"REQUIRED ACTIONS before calling {self.get_name()} step {step_number + 1}:\n"
347 |                 + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
348 |                 + f"\n\nREPORT which functions you documented and update num_files_documented when file complete. "
349 |                 f"CHECK: If num_files_documented < total_files_to_document, RESTART {self.get_name()} with next step! "
350 |                 f"CRITICAL: Only set next_step_required=false when num_files_documented equals total_files_to_document! "
351 |                 f"REPORT counters: current num_files_documented out of total_files_to_document. "
352 |                 f"CRITICAL: If ANY bugs/logic errors were found during documentation, STOP and ask user before proceeding. "
353 |                 f"NO recursive {self.get_name()} calls without actual documentation work!"
354 |             )
355 | 
356 |         return {"next_steps": next_steps}
357 | 
358 |     # Hook method overrides for docgen-specific behavior
359 | 
360 |     async def handle_work_completion(self, response_data: dict, request, arguments: dict) -> dict:
361 |         """
362 |         Override work completion to enforce counter validation.
363 | 
364 |         The docgen tool MUST complete ALL files before finishing. If counters don't match,
365 |         force continuation regardless of next_step_required setting.
366 |         """
367 |         # CRITICAL VALIDATION: Check if all files have been documented using proper inheritance hooks
368 |         num_files_documented = self.get_request_num_files_documented(request)
369 |         total_files_to_document = self.get_request_total_files_to_document(request)
370 | 
371 |         if num_files_documented < total_files_to_document:
372 |             # Counters don't match - force continuation!
373 |             logger.warning(
374 |                 f"Docgen stopping early: {num_files_documented} < {total_files_to_document}. "
375 |                 f"Forcing continuation to document remaining files."
376 |             )
377 | 
378 |             # Override to continuation mode
379 |             response_data["status"] = "documentation_analysis_required"
380 |             response_data[f"pause_for_{self.get_name()}"] = True
381 |             response_data["next_steps"] = (
382 |                 f"CRITICAL ERROR: You attempted to finish documentation with only {num_files_documented} "
383 |                 f"out of {total_files_to_document} files documented! You MUST continue documenting "
384 |                 f"the remaining {total_files_to_document - num_files_documented} files. "
385 |                 f"Call {self.get_name()} again with step {request.step_number + 1} and continue documentation "
386 |                 f"of the next undocumented file. DO NOT set next_step_required=false until ALL files are documented!"
387 |             )
388 |             return response_data
389 | 
390 |         # If counters match, proceed with normal completion
391 |         return await super().handle_work_completion(response_data, request, arguments)
392 | 
393 |     def prepare_step_data(self, request) -> dict:
394 |         """
395 |         Prepare docgen-specific step data for processing.
396 | 
397 |         Calculates total_steps dynamically based on number of files to document:
398 |         - Step 1: Discovery phase
399 |         - Steps 2+: One step per file to document
400 |         """
401 |         # Calculate dynamic total_steps based on files to document
402 |         total_files_to_document = self.get_request_total_files_to_document(request)
403 |         if total_files_to_document > 0:
404 |             # Discovery step (1) + one step per file
405 |             calculated_total_steps = 1 + total_files_to_document
406 |         else:
407 |             # Fallback to request total_steps if no file count available
408 |             calculated_total_steps = request.total_steps
409 | 
410 |         step_data = {
411 |             "step": request.step,
412 |             "step_number": request.step_number,
413 |             "total_steps": calculated_total_steps,  # Use calculated value
414 |             "findings": request.findings,
415 |             "relevant_files": request.relevant_files,
416 |             "relevant_context": request.relevant_context,
417 |             "num_files_documented": request.num_files_documented,
418 |             "total_files_to_document": request.total_files_to_document,
419 |             "issues_found": [],  # Docgen uses this for documentation gaps
420 |             "confidence": "medium",  # Default confidence for docgen
421 |             "hypothesis": "systematic_documentation_needed",  # Default hypothesis
422 |             "images": [],  # Docgen doesn't typically use images
423 |             # CRITICAL: Include documentation configuration parameters so the model can see them
424 |             "document_complexity": request.document_complexity,
425 |             "document_flow": request.document_flow,
426 |             "update_existing": request.update_existing,
427 |             "comments_on_complex_logic": request.comments_on_complex_logic,
428 |         }
429 |         return step_data
430 | 
431 |     def should_skip_expert_analysis(self, request, consolidated_findings) -> bool:
432 |         """
433 |         Docgen tool skips expert analysis when the CLI agent has "certain" confidence.
434 |         """
435 |         return request.confidence == "certain" and not request.next_step_required
436 | 
437 |     # Override inheritance hooks for docgen-specific behavior
438 | 
439 |     def get_completion_status(self) -> str:
440 |         """Docgen tools use docgen-specific status."""
441 |         return "documentation_analysis_complete"
442 | 
443 |     def get_completion_data_key(self) -> str:
444 |         """Docgen uses 'complete_documentation_analysis' key."""
445 |         return "complete_documentation_analysis"
446 | 
447 |     def get_final_analysis_from_request(self, request):
448 |         """Docgen tools use 'hypothesis' field for documentation strategy."""
449 |         return request.hypothesis
450 | 
451 |     def get_confidence_level(self, request) -> str:
452 |         """Docgen tools use 'certain' for high confidence."""
453 |         return request.confidence or "high"
454 | 
455 |     def get_completion_message(self) -> str:
456 |         """Docgen-specific completion message."""
457 |         return (
458 |             "Documentation analysis complete with high confidence. You have identified the comprehensive "
459 |             "documentation needs and strategy. MANDATORY: Present the user with the documentation plan "
460 |             "and IMMEDIATELY proceed with implementing the documentation without requiring further "
461 |             "consultation. Focus on the precise documentation improvements needed."
462 |         )
463 | 
464 |     def get_skip_reason(self) -> str:
465 |         """Docgen-specific skip reason."""
466 |         return "Completed comprehensive documentation analysis locally"
467 | 
468 |     def get_request_relevant_context(self, request) -> list:
469 |         """Get relevant_context for docgen tool."""
470 |         try:
471 |             return request.relevant_context or []
472 |         except AttributeError:
473 |             return []
474 | 
475 |     def get_request_num_files_documented(self, request) -> int:
476 |         """Get num_files_documented from request. Override for custom handling."""
477 |         try:
478 |             return request.num_files_documented or 0
479 |         except AttributeError:
480 |             return 0
481 | 
482 |     def get_request_total_files_to_document(self, request) -> int:
483 |         """Get total_files_to_document from request. Override for custom handling."""
484 |         try:
485 |             return request.total_files_to_document or 0
486 |         except AttributeError:
487 |             return 0
488 | 
489 |     def get_skip_expert_analysis_status(self) -> str:
490 |         """Docgen-specific expert analysis skip status."""
491 |         return "skipped_due_to_complete_analysis"
492 | 
493 |     def prepare_work_summary(self) -> str:
494 |         """Docgen-specific work summary."""
495 |         try:
496 |             return f"Completed {len(self.work_history)} documentation analysis steps"
497 |         except AttributeError:
498 |             return "Completed documentation analysis"
499 | 
500 |     def get_completion_next_steps_message(self, expert_analysis_used: bool = False) -> str:
501 |         """
502 |         Docgen-specific completion message.
503 |         """
504 |         return (
505 |             "DOCUMENTATION ANALYSIS IS COMPLETE FOR ALL FILES (num_files_documented equals total_files_to_document). "
506 |             "MANDATORY FINAL VERIFICATION: Before presenting your summary, you MUST perform a final verification scan. "
507 |             "Read through EVERY file you documented and check EVERY function, method, class, and property to confirm "
508 |             "it has proper documentation including complexity analysis and call flow information. If ANY items lack "
509 |             "documentation, document them immediately before finishing. "
510 |             "THEN present a clear summary showing: 1) Final counters: num_files_documented out of total_files_to_document, "
511 |             "2) Complete accountability list of ALL files you documented with verification status, "
512 |             "3) Detailed list of EVERY function/method you documented in each file (proving complete coverage), "
513 |             "4) Any dependency relationships you discovered between files, 5) Recommended documentation improvements with concrete examples including "
514 |             "complexity analysis and call flow information. 6) **CRITICAL**: List any bugs or logic issues you found "
515 |             "during documentation but did NOT fix - present these to the user and ask what they'd like to do about them. "
516 |             "Make it easy for a developer to see the complete documentation status across the entire codebase with full accountability."
517 |         )
518 | 
519 |     def get_step_guidance_message(self, request) -> str:
520 |         """
521 |         Docgen-specific step guidance with detailed analysis instructions.
522 |         """
523 |         step_guidance = self.get_step_guidance(request.step_number, request.confidence, request)
524 |         return step_guidance["next_steps"]
525 | 
526 |     def customize_workflow_response(self, response_data: dict, request) -> dict:
527 |         """
528 |         Customize response to match docgen tool format.
529 |         """
530 |         # Store initial request on first step
531 |         if request.step_number == 1:
532 |             self.initial_request = request.step
533 | 
534 |         # Convert generic status names to docgen-specific ones
535 |         tool_name = self.get_name()
536 |         status_mapping = {
537 |             f"{tool_name}_in_progress": "documentation_analysis_in_progress",
538 |             f"pause_for_{tool_name}": "pause_for_documentation_analysis",
539 |             f"{tool_name}_required": "documentation_analysis_required",
540 |             f"{tool_name}_complete": "documentation_analysis_complete",
541 |         }
542 | 
543 |         if response_data["status"] in status_mapping:
544 |             response_data["status"] = status_mapping[response_data["status"]]
545 | 
546 |         # Rename status field to match docgen tool
547 |         if f"{tool_name}_status" in response_data:
548 |             response_data["documentation_analysis_status"] = response_data.pop(f"{tool_name}_status")
549 |             # Add docgen-specific status fields
550 |             response_data["documentation_analysis_status"]["documentation_strategies"] = len(
551 |                 self.consolidated_findings.hypotheses
552 |             )
553 | 
554 |         # Rename complete documentation analysis data
555 |         if f"complete_{tool_name}" in response_data:
556 |             response_data["complete_documentation_analysis"] = response_data.pop(f"complete_{tool_name}")
557 | 
558 |         # Map the completion flag to match docgen tool
559 |         if f"{tool_name}_complete" in response_data:
560 |             response_data["documentation_analysis_complete"] = response_data.pop(f"{tool_name}_complete")
561 | 
562 |         # Map the required flag to match docgen tool
563 |         if f"{tool_name}_required" in response_data:
564 |             response_data["documentation_analysis_required"] = response_data.pop(f"{tool_name}_required")
565 | 
566 |         return response_data
567 | 
568 |     # Required abstract methods from BaseTool
569 |     def get_request_model(self):
570 |         """Return the docgen-specific request model."""
571 |         return DocgenRequest
572 | 
573 |     async def prepare_prompt(self, request) -> str:
574 |         """Not used - workflow tools use execute_workflow()."""
575 |         return ""  # Workflow tools use execute_workflow() directly
576 | 
```
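
The workflow encoded above is driven by two counters, `num_files_documented` and `total_files_to_document`, with `total_steps` derived as 1 discovery step plus one step per file (see `prepare_step_data`). A minimal standalone sketch of that bookkeeping, using hypothetical file names and no imports from this repository:

```python
# Standalone sketch of the docgen counter bookkeeping (hypothetical file names;
# does not import the repository's DocgenTool/DocgenRequest classes).

# Step 1 (discovery): list the files needing documentation and record the total.
discovered_files = ["parser.py", "renderer.py", "cli.py"]  # hypothetical
total_files_to_document = len(discovered_files)
num_files_documented = 0

# Mirrors prepare_step_data(): 1 discovery step + one step per file.
total_steps = 1 + total_files_to_document  # 4 in this sketch

# Steps 2+: document exactly one file per step, then bump the counter.
for step_number, path in enumerate(discovered_files, start=2):
    # ... add docstrings/comments to `path` here; never change code logic ...
    num_files_documented += 1
    # next_step_required stays True until every discovered file is documented,
    # matching the check enforced by handle_work_completion().
    next_step_required = num_files_documented < total_files_to_document
    print(
        f"step {step_number}/{total_steps}: {path} documented, "
        f"{num_files_documented}/{total_files_to_document} files done, "
        f"next_step_required={next_step_required}"
    )
```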

--------------------------------------------------------------------------------
/tests/test_conversation_memory.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Test suite for conversation memory system
  3 | 
  4 | Tests the Redis-based conversation persistence needed for AI-to-AI multi-turn
  5 | discussions in stateless MCP environments.
  6 | """
  7 | 
  8 | import os
  9 | from unittest.mock import Mock, patch
 10 | 
 11 | import pytest
 12 | 
 13 | from server import get_follow_up_instructions
 14 | from utils.conversation_memory import (
 15 |     CONVERSATION_TIMEOUT_SECONDS,
 16 |     MAX_CONVERSATION_TURNS,
 17 |     ConversationTurn,
 18 |     ThreadContext,
 19 |     add_turn,
 20 |     build_conversation_history,
 21 |     create_thread,
 22 |     get_thread,
 23 | )
 24 | 
 25 | 
 26 | class TestConversationMemory:
 27 |     """Test the conversation memory system for stateless MCP requests"""
 28 | 
 29 |     @patch("utils.conversation_memory.get_storage")
 30 |     def test_create_thread(self, mock_storage):
 31 |         """Test creating a new thread"""
 32 |         mock_client = Mock()
 33 |         mock_storage.return_value = mock_client
 34 | 
 35 |         thread_id = create_thread("chat", {"prompt": "Hello", "absolute_file_paths": ["/test.py"]})
 36 | 
 37 |         assert thread_id is not None
 38 |         assert len(thread_id) == 36  # UUID4 length
 39 | 
 40 |         # Verify Redis was called
 41 |         mock_client.setex.assert_called_once()
 42 |         call_args = mock_client.setex.call_args
 43 |         assert call_args[0][0] == f"thread:{thread_id}"  # key
 44 |         assert call_args[0][1] == CONVERSATION_TIMEOUT_SECONDS  # TTL from configuration
 45 | 
 46 |     @patch("utils.conversation_memory.get_storage")
 47 |     def test_get_thread_valid(self, mock_storage):
 48 |         """Test retrieving an existing thread"""
 49 |         mock_client = Mock()
 50 |         mock_storage.return_value = mock_client
 51 | 
 52 |         test_uuid = "12345678-1234-1234-1234-123456789012"
 53 | 
 54 |         # Create valid ThreadContext and serialize it
 55 |         context_obj = ThreadContext(
 56 |             thread_id=test_uuid,
 57 |             created_at="2023-01-01T00:00:00Z",
 58 |             last_updated_at="2023-01-01T00:01:00Z",
 59 |             tool_name="chat",
 60 |             turns=[],
 61 |             initial_context={"prompt": "test"},
 62 |         )
 63 |         mock_client.get.return_value = context_obj.model_dump_json()
 64 | 
 65 |         context = get_thread(test_uuid)
 66 | 
 67 |         assert context is not None
 68 |         assert context.thread_id == test_uuid
 69 |         assert context.tool_name == "chat"
 70 |         mock_client.get.assert_called_once_with(f"thread:{test_uuid}")
 71 | 
 72 |     @patch("utils.conversation_memory.get_storage")
 73 |     def test_get_thread_invalid_uuid(self, mock_storage):
 74 |         """Test handling invalid UUID"""
 75 |         context = get_thread("invalid-uuid")
 76 |         assert context is None
 77 | 
 78 |     @patch("utils.conversation_memory.get_storage")
 79 |     def test_get_thread_not_found(self, mock_storage):
 80 |         """Test handling thread not found"""
 81 |         mock_client = Mock()
 82 |         mock_storage.return_value = mock_client
 83 |         mock_client.get.return_value = None
 84 | 
 85 |         context = get_thread("12345678-1234-1234-1234-123456789012")
 86 |         assert context is None
 87 | 
 88 |     @patch("utils.conversation_memory.get_storage")
 89 |     def test_add_turn_success(self, mock_storage):
 90 |         """Test adding a turn to existing thread"""
 91 |         mock_client = Mock()
 92 |         mock_storage.return_value = mock_client
 93 | 
 94 |         test_uuid = "12345678-1234-1234-1234-123456789012"
 95 | 
 96 |         # Create valid ThreadContext
 97 |         context_obj = ThreadContext(
 98 |             thread_id=test_uuid,
 99 |             created_at="2023-01-01T00:00:00Z",
100 |             last_updated_at="2023-01-01T00:01:00Z",
101 |             tool_name="chat",
102 |             turns=[],
103 |             initial_context={"prompt": "test"},
104 |         )
105 |         mock_client.get.return_value = context_obj.model_dump_json()
106 | 
107 |         success = add_turn(test_uuid, "user", "Hello there")
108 | 
109 |         assert success is True
110 |         # Verify Redis get and setex were called
111 |         mock_client.get.assert_called_once()
112 |         mock_client.setex.assert_called_once()
113 | 
114 |     @patch("utils.conversation_memory.get_storage")
115 |     def test_add_turn_max_limit(self, mock_storage):
116 |         """Test turn limit enforcement"""
117 |         mock_client = Mock()
118 |         mock_storage.return_value = mock_client
119 | 
120 |         test_uuid = "12345678-1234-1234-1234-123456789012"
121 | 
122 |         # Create thread with MAX_CONVERSATION_TURNS turns (at limit)
123 |         turns = [
124 |             ConversationTurn(role="user", content=f"Turn {i}", timestamp="2023-01-01T00:00:00Z")
125 |             for i in range(MAX_CONVERSATION_TURNS)
126 |         ]
127 |         context_obj = ThreadContext(
128 |             thread_id=test_uuid,
129 |             created_at="2023-01-01T00:00:00Z",
130 |             last_updated_at="2023-01-01T00:01:00Z",
131 |             tool_name="chat",
132 |             turns=turns,
133 |             initial_context={"prompt": "test"},
134 |         )
135 |         mock_client.get.return_value = context_obj.model_dump_json()
136 | 
137 |         success = add_turn(test_uuid, "user", "This should fail")
138 | 
139 |         assert success is False
140 | 
141 |     @patch.dict(os.environ, {"GEMINI_API_KEY": "test-key", "OPENAI_API_KEY": ""}, clear=False)
142 |     def test_build_conversation_history(self, project_path):
143 |         """Test building conversation history format with files and speaker identification"""
144 |         from providers.registry import ModelProviderRegistry
145 | 
146 |         ModelProviderRegistry.clear_cache()
147 | 
148 |         # Create real test files to test actual file embedding functionality
149 |         main_file = project_path / "main.py"
150 |         readme_file = project_path / "docs" / "readme.md"
151 |         examples_dir = project_path / "examples"
152 |         examples_file = examples_dir / "example.py"
153 | 
154 |         # Create directories and files
155 |         readme_file.parent.mkdir(parents=True, exist_ok=True)
156 |         examples_dir.mkdir(parents=True, exist_ok=True)
157 | 
158 |         main_file.write_text("def main():\n    print('Hello world')\n")
159 |         readme_file.write_text("# Project Documentation\nThis is a test project.\n")
160 |         examples_file.write_text("# Example code\nprint('Example')\n")
161 | 
162 |         test_uuid = "12345678-1234-1234-1234-123456789012"
163 | 
164 |         turns = [
165 |             ConversationTurn(
166 |                 role="user",
167 |                 content="What is Python?",
168 |                 timestamp="2023-01-01T00:00:00Z",
169 |                 files=[str(main_file), str(readme_file)],
170 |             ),
171 |             ConversationTurn(
172 |                 role="assistant",
173 |                 content="Python is a programming language",
174 |                 timestamp="2023-01-01T00:01:00Z",
175 |                 files=[str(examples_dir)],  # Directory will be expanded to files
176 |                 tool_name="chat",
177 |                 model_name="gpt-5",
178 |                 model_provider="openai",
179 |             ),
180 |         ]
181 | 
182 |         context = ThreadContext(
183 |             thread_id=test_uuid,
184 |             created_at="2023-01-01T00:00:00Z",
185 |             last_updated_at="2023-01-01T00:01:00Z",
186 |             tool_name="chat",
187 |             turns=turns,
188 |             initial_context={},
189 |         )
190 | 
191 |         history, tokens = build_conversation_history(context, model_context=None)
192 | 
193 |         # Test basic structure
194 |         assert "CONVERSATION HISTORY" in history
195 |         assert f"Thread: {test_uuid}" in history
196 |         assert "Tool: chat" in history
197 |         assert f"Turn 2/{MAX_CONVERSATION_TURNS}" in history
198 | 
199 |         # Test speaker identification
200 |         assert "--- Turn 1 (Agent) ---" in history
201 |         assert "--- Turn 2 (gpt-5 using chat via openai) ---" in history
202 | 
203 |         # Test content
204 |         assert "What is Python?" in history
205 |         assert "Python is a programming language" in history
206 | 
207 |         # Test file tracking
208 |         # Check that the new file embedding section is included
209 |         assert "=== FILES REFERENCED IN THIS CONVERSATION ===" in history
210 |         assert "The following files have been shared and analyzed during our conversation." in history
211 | 
212 |         # Check that file context from previous turns is included (now shows files used per turn)
213 |         assert f"Files used in this turn: {main_file}, {readme_file}" in history
214 |         assert f"Files used in this turn: {examples_dir}" in history
215 | 
216 |         # Verify actual file content is embedded
217 |         assert "def main():" in history
218 |         assert "Hello world" in history
219 |         assert "Project Documentation" in history
220 | 
221 |     def test_build_conversation_history_empty(self):
222 |         """Test building history with no turns"""
223 |         test_uuid = "12345678-1234-1234-1234-123456789012"
224 | 
225 |         context = ThreadContext(
226 |             thread_id=test_uuid,
227 |             created_at="2023-01-01T00:00:00Z",
228 |             last_updated_at="2023-01-01T00:00:00Z",
229 |             tool_name="chat",
230 |             turns=[],
231 |             initial_context={},
232 |         )
233 | 
234 |         history, tokens = build_conversation_history(context, model_context=None)
235 |         assert history == ""
236 |         assert tokens == 0
237 | 
238 | 
239 | class TestConversationFlow:
240 |     """Test complete conversation flows simulating stateless MCP requests"""
241 | 
242 |     @patch("utils.conversation_memory.get_storage")
243 |     def test_complete_conversation_cycle(self, mock_storage):
244 |         """Test a complete 5-turn conversation until limit reached"""
245 |         mock_client = Mock()
246 |         mock_storage.return_value = mock_client
247 | 
248 |         # Simulate independent MCP request cycles
249 | 
250 |         # REQUEST 1: Initial request creates thread
251 |         thread_id = create_thread("chat", {"prompt": "Analyze this code"})
252 |         initial_context = ThreadContext(
253 |             thread_id=thread_id,
254 |             created_at="2023-01-01T00:00:00Z",
255 |             last_updated_at="2023-01-01T00:00:00Z",
256 |             tool_name="chat",
257 |             turns=[],
258 |             initial_context={"prompt": "Analyze this code"},
259 |         )
260 |         mock_client.get.return_value = initial_context.model_dump_json()
261 | 
262 |         # Add assistant response
263 |         success = add_turn(
264 |             thread_id,
265 |             "assistant",
266 |             "Code analysis complete",
267 |         )
268 |         assert success is True
269 | 
270 |         # REQUEST 2: User responds to follow-up (independent request cycle)
271 |         # Simulate retrieving updated context from Redis
272 |         context_after_1 = ThreadContext(
273 |             thread_id=thread_id,
274 |             created_at="2023-01-01T00:00:00Z",
275 |             last_updated_at="2023-01-01T00:01:00Z",
276 |             tool_name="chat",
277 |             turns=[
278 |                 ConversationTurn(
279 |                     role="assistant",
280 |                     content="Code analysis complete",
281 |                     timestamp="2023-01-01T00:00:30Z",
282 |                 )
283 |             ],
284 |             initial_context={"prompt": "Analyze this code"},
285 |         )
286 |         mock_client.get.return_value = context_after_1.model_dump_json()
287 | 
288 |         success = add_turn(thread_id, "user", "Yes, check error handling")
289 |         assert success is True
290 | 
291 |         success = add_turn(thread_id, "assistant", "Error handling reviewed")
292 |         assert success is True
293 | 
294 |         # REQUEST 3-5: Continue conversation (simulating independent cycles)
295 |         # After turn 3
296 |         context_after_3 = ThreadContext(
297 |             thread_id=thread_id,
298 |             created_at="2023-01-01T00:00:00Z",
299 |             last_updated_at="2023-01-01T00:03:00Z",
300 |             tool_name="chat",
301 |             turns=[
302 |                 ConversationTurn(
303 |                     role="assistant",
304 |                     content="Code analysis complete",
305 |                     timestamp="2023-01-01T00:00:30Z",
306 |                 ),
307 |                 ConversationTurn(role="user", content="Yes, check error handling", timestamp="2023-01-01T00:01:30Z"),
308 |                 ConversationTurn(
309 |                     role="assistant",
310 |                     content="Error handling reviewed",
311 |                     timestamp="2023-01-01T00:02:30Z",
312 |                 ),
313 |             ],
314 |             initial_context={"prompt": "Analyze this code"},
315 |         )
316 |         mock_client.get.return_value = context_after_3.model_dump_json()
317 | 
318 |         success = add_turn(thread_id, "user", "Yes, check tests")
319 |         assert success is True
320 | 
321 |         success = add_turn(thread_id, "assistant", "Test coverage analyzed")
322 |         assert success is True
323 | 
324 |         # REQUEST 6: Try to exceed MAX_CONVERSATION_TURNS limit - should fail
325 |         turns_at_limit = [
326 |             ConversationTurn(
327 |                 role="assistant" if i % 2 == 0 else "user", content=f"Turn {i + 1}", timestamp="2023-01-01T00:00:30Z"
328 |             )
329 |             for i in range(MAX_CONVERSATION_TURNS)
330 |         ]
331 | 
332 |         context_at_limit = ThreadContext(
333 |             thread_id=thread_id,
334 |             created_at="2023-01-01T00:00:00Z",
335 |             last_updated_at="2023-01-01T00:05:00Z",
336 |             tool_name="chat",
337 |             turns=turns_at_limit,
338 |             initial_context={"prompt": "Analyze this code"},
339 |         )
340 |         mock_client.get.return_value = context_at_limit.model_dump_json()
341 | 
342 |         # This should fail - conversation has reached limit
343 |         success = add_turn(thread_id, "user", "This should be rejected")
344 |         assert success is False  # CONVERSATION STOPS HERE
345 | 
346 |     @patch("utils.conversation_memory.get_storage")
347 |     def test_invalid_continuation_id_error(self, mock_storage):
348 |         """Test that invalid continuation IDs raise proper error for restart"""
349 |         from server import reconstruct_thread_context
350 | 
351 |         mock_client = Mock()
352 |         mock_storage.return_value = mock_client
353 |         mock_client.get.return_value = None  # Thread not found
354 | 
355 |         arguments = {"continuation_id": "invalid-uuid-12345", "prompt": "Continue conversation"}
356 | 
357 |         # Should raise ValueError asking to restart
358 |         with pytest.raises(ValueError) as exc_info:
359 |             import asyncio
360 | 
361 |             asyncio.run(reconstruct_thread_context(arguments))
362 | 
363 |         error_msg = str(exc_info.value)
364 |         assert "Conversation thread 'invalid-uuid-12345' was not found or has expired" in error_msg
365 |         assert (
366 |             "Please restart the conversation by providing your full question/prompt without the continuation_id"
367 |             in error_msg
368 |         )
369 | 
370 |     @patch.dict(os.environ, {"GEMINI_API_KEY": "test-key", "OPENAI_API_KEY": ""}, clear=False)
371 |     def test_dynamic_max_turns_configuration(self):
372 |         """Test that all functions respect MAX_CONVERSATION_TURNS configuration"""
373 |         from providers.registry import ModelProviderRegistry
374 | 
375 |         ModelProviderRegistry.clear_cache()
376 | 
377 |         # This test ensures that if MAX_CONVERSATION_TURNS changes, everything updates with it
378 | 
379 |         # Exercise several different turn counts; the history header should always report MAX_CONVERSATION_TURNS
380 |         test_values = [3, 7, 10]
381 | 
382 |         for test_max in test_values:
383 |             # Create turns up to the test limit
384 |             turns = [
385 |                 ConversationTurn(role="user", content=f"Turn {i}", timestamp="2023-01-01T00:00:00Z")
386 |                 for i in range(test_max)
387 |             ]
388 | 
389 |             # Test history building respects the limit
390 |             test_uuid = "12345678-1234-1234-1234-123456789012"
391 |             context = ThreadContext(
392 |                 thread_id=test_uuid,
393 |                 created_at="2023-01-01T00:00:00Z",
394 |                 last_updated_at="2023-01-01T00:00:00Z",
395 |                 tool_name="chat",
396 |                 turns=turns,
397 |                 initial_context={},
398 |             )
399 | 
400 |             history, tokens = build_conversation_history(context, model_context=None)
401 |             expected_turn_text = f"Turn {test_max}/{MAX_CONVERSATION_TURNS}"
402 |             assert expected_turn_text in history
403 | 
404 |     def test_follow_up_instructions_dynamic_behavior(self):
405 |         """Test that follow-up instructions change correctly based on turn count and max setting"""
406 |         # Test with default MAX_CONVERSATION_TURNS
407 |         max_turns = MAX_CONVERSATION_TURNS
408 | 
409 |         # Test early conversation (should allow follow-ups)
410 |         early_instructions = get_follow_up_instructions(0, max_turns)
411 |         assert "CONVERSATION CONTINUATION" in early_instructions
412 |         assert f"({max_turns - 1} exchanges remaining)" in early_instructions
413 |         assert "Feel free to ask clarifying questions" in early_instructions
414 | 
415 |         # Test mid conversation
416 |         mid_instructions = get_follow_up_instructions(2, max_turns)
417 |         assert "CONVERSATION CONTINUATION" in mid_instructions
418 |         assert f"({max_turns - 3} exchanges remaining)" in mid_instructions
419 |         assert "Feel free to ask clarifying questions" in mid_instructions
420 | 
421 |         # Test approaching limit (should stop follow-ups)
422 |         limit_instructions = get_follow_up_instructions(max_turns - 1, max_turns)
423 |         assert "Do NOT include any follow-up questions" in limit_instructions
424 |         assert "final exchange" in limit_instructions
425 | 
426 |         # Test at limit
427 |         at_limit_instructions = get_follow_up_instructions(max_turns, max_turns)
428 |         assert "Do NOT include any follow-up questions" in at_limit_instructions
429 | 
430 |         # Test with custom max_turns to ensure dynamic behavior
431 |         custom_max = 3
432 |         custom_early = get_follow_up_instructions(0, custom_max)
433 |         assert f"({custom_max - 1} exchanges remaining)" in custom_early
434 | 
435 |         custom_limit = get_follow_up_instructions(custom_max - 1, custom_max)
436 |         assert "Do NOT include any follow-up questions" in custom_limit
437 | 
438 |     def test_follow_up_instructions_defaults_to_config(self):
439 |         """Test that follow-up instructions use MAX_CONVERSATION_TURNS when max_turns not provided"""
440 |         instructions = get_follow_up_instructions(0)  # No max_turns parameter
441 |         expected_remaining = MAX_CONVERSATION_TURNS - 1
442 |         assert f"({expected_remaining} exchanges remaining)" in instructions
443 | 
444 |     @patch("utils.conversation_memory.get_storage")
445 |     def test_complete_conversation_with_dynamic_turns(self, mock_storage):
446 |         """Test complete conversation respecting MAX_CONVERSATION_TURNS dynamically"""
447 |         mock_client = Mock()
448 |         mock_storage.return_value = mock_client
449 | 
450 |         thread_id = create_thread("chat", {"prompt": "Start conversation"})
451 | 
452 |         # Simulate conversation up to MAX_CONVERSATION_TURNS - 1
453 |         for turn_num in range(MAX_CONVERSATION_TURNS - 1):
454 |             # Mock context with current turns
455 |             turns = [
456 |                 ConversationTurn(
457 |                     role="user" if i % 2 == 0 else "assistant",
458 |                     content=f"Turn {i + 1}",
459 |                     timestamp="2023-01-01T00:00:00Z",
460 |                 )
461 |                 for i in range(turn_num)
462 |             ]
463 | 
464 |             context = ThreadContext(
465 |                 thread_id=thread_id,
466 |                 created_at="2023-01-01T00:00:00Z",
467 |                 last_updated_at="2023-01-01T00:00:00Z",
468 |                 tool_name="chat",
469 |                 turns=turns,
470 |                 initial_context={"prompt": "Start conversation"},
471 |             )
472 |             mock_client.get.return_value = context.model_dump_json()
473 | 
474 |             # Should succeed
475 |             success = add_turn(thread_id, "user", f"User turn {turn_num + 1}")
476 |             assert success is True, f"Turn {turn_num + 1} should succeed"
477 | 
478 |         # Now we should be at the limit - create final context
479 |         final_turns = [
480 |             ConversationTurn(
481 |                 role="user" if i % 2 == 0 else "assistant", content=f"Turn {i + 1}", timestamp="2023-01-01T00:00:00Z"
482 |             )
483 |             for i in range(MAX_CONVERSATION_TURNS)
484 |         ]
485 | 
486 |         final_context = ThreadContext(
487 |             thread_id=thread_id,
488 |             created_at="2023-01-01T00:00:00Z",
489 |             last_updated_at="2023-01-01T00:00:00Z",
490 |             tool_name="chat",
491 |             turns=final_turns,
492 |             initial_context={"prompt": "Start conversation"},
493 |         )
494 |         mock_client.get.return_value = final_context.model_dump_json()
495 | 
496 |         # This should fail - at the limit
497 |         success = add_turn(thread_id, "user", "This should fail")
498 |         assert success is False, f"Turn {MAX_CONVERSATION_TURNS + 1} should fail"
499 | 
500 |     @patch("utils.conversation_memory.get_storage")
501 |     @patch.dict(os.environ, {"GEMINI_API_KEY": "test-key", "OPENAI_API_KEY": ""}, clear=False)
502 |     def test_conversation_with_files_and_context_preservation(self, mock_storage):
503 |         """Test complete conversation flow with file tracking and context preservation"""
504 |         from providers.registry import ModelProviderRegistry
505 | 
506 |         ModelProviderRegistry.clear_cache()
507 | 
508 |         mock_client = Mock()
509 |         mock_storage.return_value = mock_client
510 | 
511 |         # Start conversation with files using a simple tool
512 |         thread_id = create_thread("chat", {"prompt": "Analyze this codebase", "absolute_file_paths": ["/project/src/"]})
513 | 
514 |         # Turn 1: Claude provides context with multiple files
515 |         initial_context = ThreadContext(
516 |             thread_id=thread_id,
517 |             created_at="2023-01-01T00:00:00Z",
518 |             last_updated_at="2023-01-01T00:00:00Z",
519 |             tool_name="chat",
520 |             turns=[],
521 |             initial_context={
522 |                 "prompt": "Analyze this codebase",
523 |                 "absolute_file_paths": ["/project/src/"],
524 |             },
525 |         )
526 |         mock_client.get.return_value = initial_context.model_dump_json()
527 | 
528 |         # Add Gemini's response
529 |         success = add_turn(
530 |             thread_id,
531 |             "assistant",
532 |             "I've analyzed your codebase structure.",
533 |             files=["/project/src/main.py", "/project/src/utils.py"],
534 |             tool_name="analyze",
535 |             model_name="gemini-2.5-flash",
536 |             model_provider="google",
537 |         )
538 |         assert success is True
539 | 
540 |         # Turn 2: Claude responds with different files
541 |         context_turn_1 = ThreadContext(
542 |             thread_id=thread_id,
543 |             created_at="2023-01-01T00:00:00Z",
544 |             last_updated_at="2023-01-01T00:01:00Z",
545 |             tool_name="analyze",
546 |             turns=[
547 |                 ConversationTurn(
548 |                     role="assistant",
549 |                     content="I've analyzed your codebase structure.",
550 |                     timestamp="2023-01-01T00:00:30Z",
551 |                     files=["/project/src/main.py", "/project/src/utils.py"],
552 |                     tool_name="analyze",
553 |                     model_name="gemini-2.5-flash",
554 |                     model_provider="google",
555 |                 )
556 |             ],
557 |             initial_context={"prompt": "Analyze this codebase", "relevant_files": ["/project/src/"]},
558 |         )
559 |         mock_client.get.return_value = context_turn_1.model_dump_json()
560 | 
561 |         # User responds with test files
562 |         success = add_turn(
563 |             thread_id, "user", "Yes, check the test coverage", files=["/project/tests/", "/project/test_main.py"]
564 |         )
565 |         assert success is True
566 | 
567 |         # Turn 3: Gemini analyzes tests
568 |         context_turn_2 = ThreadContext(
569 |             thread_id=thread_id,
570 |             created_at="2023-01-01T00:00:00Z",
571 |             last_updated_at="2023-01-01T00:02:00Z",
572 |             tool_name="analyze",
573 |             turns=[
574 |                 ConversationTurn(
575 |                     role="assistant",
576 |                     content="I've analyzed your codebase structure.",
577 |                     timestamp="2023-01-01T00:00:30Z",
578 |                     files=["/project/src/main.py", "/project/src/utils.py"],
579 |                     tool_name="analyze",
580 |                 ),
581 |                 ConversationTurn(
582 |                     role="user",
583 |                     content="Yes, check the test coverage",
584 |                     timestamp="2023-01-01T00:01:30Z",
585 |                     files=["/project/tests/", "/project/test_main.py"],
586 |                 ),
587 |             ],
588 |             initial_context={"prompt": "Analyze this codebase", "relevant_files": ["/project/src/"]},
589 |         )
590 |         mock_client.get.return_value = context_turn_2.model_dump_json()
591 | 
592 |         success = add_turn(
593 |             thread_id,
594 |             "assistant",
595 |             "Test coverage analysis complete. Coverage is 85%.",
596 |             files=["/project/tests/test_utils.py", "/project/coverage.html"],
597 |             tool_name="analyze",
598 |             model_name="gemini-2.5-flash",
599 |             model_provider="google",
600 |         )
601 |         assert success is True
602 | 
603 |         # Build conversation history and verify chronological file preservation
604 |         final_context = ThreadContext(
605 |             thread_id=thread_id,
606 |             created_at="2023-01-01T00:00:00Z",
607 |             last_updated_at="2023-01-01T00:03:00Z",
608 |             tool_name="analyze",
609 |             turns=[
610 |                 ConversationTurn(
611 |                     role="assistant",
612 |                     content="I've analyzed your codebase structure.",
613 |                     timestamp="2023-01-01T00:00:30Z",
614 |                     files=["/project/src/main.py", "/project/src/utils.py"],
615 |                     tool_name="analyze",
616 |                     model_name="gemini-2.5-flash",
617 |                     model_provider="google",
618 |                 ),
619 |                 ConversationTurn(
620 |                     role="user",
621 |                     content="Yes, check the test coverage",
622 |                     timestamp="2023-01-01T00:01:30Z",
623 |                     files=["/project/tests/", "/project/test_main.py"],
624 |                 ),
625 |                 ConversationTurn(
626 |                     role="assistant",
627 |                     content="Test coverage analysis complete. Coverage is 85%.",
628 |                     timestamp="2023-01-01T00:02:30Z",
629 |                     files=["/project/tests/test_utils.py", "/project/coverage.html"],
630 |                     tool_name="analyze",
631 |                     model_name="gemini-2.5-flash",
632 |                     model_provider="google",
633 |                 ),
634 |             ],
635 |             initial_context={"prompt": "Analyze this codebase", "relevant_files": ["/project/src/"]},
636 |         )
637 | 
638 |         history, tokens = build_conversation_history(final_context)
639 | 
640 |         # Verify chronological order and speaker identification
641 |         assert "--- Turn 1 (gemini-2.5-flash using analyze via google) ---" in history
642 |         assert "--- Turn 2 (Agent) ---" in history
643 |         assert "--- Turn 3 (gemini-2.5-flash using analyze via google) ---" in history
644 | 
645 |         # Verify all files are preserved in chronological order
646 |         turn_1_files = "Files used in this turn: /project/src/main.py, /project/src/utils.py"
647 |         turn_2_files = "Files used in this turn: /project/tests/, /project/test_main.py"
648 |         turn_3_files = "Files used in this turn: /project/tests/test_utils.py, /project/coverage.html"
649 | 
650 |         assert turn_1_files in history
651 |         assert turn_2_files in history
652 |         assert turn_3_files in history
653 | 
654 |         # Verify content
655 |         assert "I've analyzed your codebase structure." in history
656 |         assert "Yes, check the test coverage" in history
657 |         assert "Test coverage analysis complete. Coverage is 85%." in history
658 | 
659 |         # Verify chronological ordering (turn 1 appears before turn 2, etc.)
660 |         turn_1_pos = history.find("--- Turn 1 (gemini-2.5-flash using analyze via google) ---")
661 |         turn_2_pos = history.find("--- Turn 2 (Agent) ---")
662 |         turn_3_pos = history.find("--- Turn 3 (gemini-2.5-flash using analyze via google) ---")
663 | 
664 |         assert turn_1_pos < turn_2_pos < turn_3_pos
665 | 
666 |     @patch("utils.conversation_memory.get_storage")
667 |     def test_stateless_request_isolation(self, mock_storage):
668 |         """Test that each request cycle is independent but shares context via Redis"""
669 |         mock_client = Mock()
670 |         mock_storage.return_value = mock_client
671 | 
672 |         # Simulate two different "processes" accessing same thread
673 |         thread_id = "12345678-1234-1234-1234-123456789012"
674 | 
675 |         # Process 1: Creates thread
676 |         initial_context = ThreadContext(
677 |             thread_id=thread_id,
678 |             created_at="2023-01-01T00:00:00Z",
679 |             last_updated_at="2023-01-01T00:00:00Z",
680 |             tool_name="thinkdeep",
681 |             turns=[],
682 |             initial_context={"prompt": "Think about architecture"},
683 |         )
684 |         mock_client.get.return_value = initial_context.model_dump_json()
685 | 
686 |         success = add_turn(thread_id, "assistant", "Architecture analysis")
687 |         assert success is True
688 | 
689 |         # Process 2: Different "request cycle" accesses same thread
690 |         context_from_redis = ThreadContext(
691 |             thread_id=thread_id,
692 |             created_at="2023-01-01T00:00:00Z",
693 |             last_updated_at="2023-01-01T00:01:00Z",
694 |             tool_name="thinkdeep",
695 |             turns=[
696 |                 ConversationTurn(
697 |                     role="assistant",
698 |                     content="Architecture analysis",
699 |                     timestamp="2023-01-01T00:00:30Z",
700 |                 )
701 |             ],
702 |             initial_context={"prompt": "Think about architecture"},
703 |         )
704 |         mock_client.get.return_value = context_from_redis.model_dump_json()
705 | 
706 |         # Verify context continuity across "processes"
707 |         retrieved_context = get_thread(thread_id)
708 |         assert retrieved_context is not None
709 |         assert len(retrieved_context.turns) == 1
710 | 
711 |     @patch.dict(os.environ, {"GEMINI_API_KEY": "test-key", "OPENAI_API_KEY": ""}, clear=False)
712 |     def test_token_limit_optimization_in_conversation_history(self):
713 |         """Test that build_conversation_history efficiently handles token limits"""
714 |         import os
715 |         import tempfile
716 | 
717 |         from providers.registry import ModelProviderRegistry
718 | 
719 |         ModelProviderRegistry.clear_cache()
720 | 
721 |         from utils.conversation_memory import build_conversation_history
722 | 
723 |         # Create test files with known content sizes
724 |         with tempfile.TemporaryDirectory() as temp_dir:
725 |             # Create small and large test files
726 |             small_file = os.path.join(temp_dir, "small.py")
727 |             large_file = os.path.join(temp_dir, "large.py")
728 | 
729 |             small_content = "# Small file\nprint('hello')\n"
730 |             large_content = "# Large file\n" + "x = 1\n" * 10000  # Very large file
731 | 
732 |             with open(small_file, "w") as f:
733 |                 f.write(small_content)
734 |             with open(large_file, "w") as f:
735 |                 f.write(large_content)
736 | 
737 |             # Create context with files that would exceed token limit
738 |             context = ThreadContext(
739 |                 thread_id="test-token-limit",
740 |                 created_at="2023-01-01T00:00:00Z",
741 |                 last_updated_at="2023-01-01T00:01:00Z",
742 |                 tool_name="analyze",
743 |                 turns=[
744 |                     ConversationTurn(
745 |                         role="user",
746 |                         content="Analyze these files",
747 |                         timestamp="2023-01-01T00:00:30Z",
748 |                         files=[small_file, large_file],  # Large file should be truncated
749 |                     )
750 |                 ],
751 |                 initial_context={"prompt": "Analyze code"},
752 |             )
753 | 
754 |             # Build conversation history (should handle token limits gracefully)
755 |             history, tokens = build_conversation_history(context, model_context=None)
756 | 
757 |             # Verify the history was built successfully
758 |             assert "=== CONVERSATION HISTORY" in history
759 |             assert "=== FILES REFERENCED IN THIS CONVERSATION ===" in history
760 | 
761 |             # The small file should be included, but large file might be truncated
762 |             # At minimum, verify no crashes and history is generated
763 |             assert len(history) > 0
764 | 
765 |             # If truncation occurred, there should be a note about it
766 |             if "additional file(s) were truncated due to token limit" in history:
767 |                 assert small_file in history or large_file in history
768 |             else:
769 |                 # Both files fit within limit
770 |                 assert small_file in history
771 |                 assert large_file in history
772 | 
773 | 
774 | if __name__ == "__main__":
775 |     pytest.main([__file__])
776 | 
```
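
The `TestConversationFlow` cases above all revolve around the same guard: once a thread already holds `MAX_CONVERSATION_TURNS` turns, `add_turn` refuses to append another one and the caller is expected to start a fresh thread. The snippet below is a minimal sketch of only that guard, assuming nothing beyond what the assertions show (the real `utils.conversation_memory.add_turn` also persists the turn, updates timestamps, and records tool/model metadata), and the constant value used here is assumed purely for illustration.

```python
# Minimal sketch (not the project implementation) of the turn-limit guard
# that the tests above assert on.
MAX_CONVERSATION_TURNS = 20  # assumed value for this sketch; the real constant lives in the config module


def can_add_turn(existing_turns: list[str]) -> bool:
    """Return True while the thread is still below the configured turn limit."""
    return len(existing_turns) < MAX_CONVERSATION_TURNS


# Mirrors the tests: the turn that would exceed the limit is rejected.
turns = [f"Turn {i + 1}" for i in range(MAX_CONVERSATION_TURNS)]
assert can_add_turn(turns[:-1]) is True   # one slot remaining -> accepted
assert can_add_turn(turns) is False       # at the limit -> rejected
```

This is why `test_complete_conversation_cycle` ends with `success is False` rather than an exception: hitting the limit is a normal signal to restart the conversation, not an error path.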

--------------------------------------------------------------------------------
/tests/test_large_prompt_handling.py:
--------------------------------------------------------------------------------

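The module below asserts a fixed `resend_prompt` error contract whenever a prompt exceeds `MCP_PROMPT_SIZE_LIMIT`. As a rough, hedged sketch of the payload shape implied by those assertions (field names are taken from the tests; the helper name and the exact `content` wording are assumptions made for illustration):

```python
# Hypothetical payload builder, inferred only from the assertions in the tests below.
import json

MCP_PROMPT_SIZE_LIMIT = 50_000  # assumed default; the real value is imported from config


def make_resend_prompt_payload(prompt: str) -> str:
    """Build the kind of JSON body the tests below parse out of ToolExecutionError."""
    return json.dumps(
        {
            "status": "resend_prompt",
            "content": (
                f"The prompt is too large for MCP's token limits "
                f"(over {MCP_PROMPT_SIZE_LIMIT:,} characters). "
                "Save it to prompt.txt and resend with the file attached."
            ),
            "metadata": {"prompt_size": len(prompt), "limit": MCP_PROMPT_SIZE_LIMIT},
        }
    )


payload = json.loads(make_resend_prompt_payload("x" * (MCP_PROMPT_SIZE_LIMIT + 1000)))
assert payload["status"] == "resend_prompt"
assert payload["metadata"]["prompt_size"] == MCP_PROMPT_SIZE_LIMIT + 1000
```
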
```python
  1 | """
  2 | Tests for large prompt handling functionality.
  3 | 
  4 | This test module verifies that the MCP server correctly handles
  5 | prompts that exceed the 50,000 character limit by requesting
  6 | Claude to save them to a file and resend.
  7 | """
  8 | 
  9 | import json
 10 | import os
 11 | import shutil
 12 | import tempfile
 13 | from unittest.mock import MagicMock, patch
 14 | 
 15 | import pytest
 16 | 
 17 | from config import MCP_PROMPT_SIZE_LIMIT
 18 | from tools.chat import ChatTool
 19 | from tools.codereview import CodeReviewTool
 20 | from tools.shared.exceptions import ToolExecutionError
 21 | 
 22 | # from tools.debug import DebugIssueTool  # Commented out - debug tool refactored
 23 | 
 24 | 
 25 | class TestLargePromptHandling:
 26 |     """Test suite for large prompt handling across all tools."""
 27 | 
 28 |     def teardown_method(self):
 29 |         """Clean up after each test to prevent state pollution."""
 30 |         # Clear provider registry singleton
 31 |         from providers.registry import ModelProviderRegistry
 32 | 
 33 |         ModelProviderRegistry._instance = None
 34 | 
 35 |     @pytest.fixture
 36 |     def large_prompt(self):
 37 |         """Create a prompt larger than MCP_PROMPT_SIZE_LIMIT characters."""
 38 |         return "x" * (MCP_PROMPT_SIZE_LIMIT + 1000)
 39 | 
 40 |     @pytest.fixture
 41 |     def normal_prompt(self):
 42 |         """Create a normal-sized prompt."""
 43 |         return "This is a normal prompt that should work fine."
 44 | 
 45 |     @pytest.fixture
 46 |     def temp_prompt_file(self, large_prompt):
 47 |         """Create a temporary prompt.txt file with large content."""
 48 |         # Create temp file with exact name "prompt.txt"
 49 |         temp_dir = tempfile.mkdtemp()
 50 |         file_path = os.path.join(temp_dir, "prompt.txt")
 51 |         with open(file_path, "w") as f:
 52 |             f.write(large_prompt)
 53 |         return file_path
 54 | 
 55 |     @pytest.mark.asyncio
 56 |     async def test_chat_large_prompt_detection(self, large_prompt):
 57 |         """Test that chat tool detects large prompts."""
 58 |         tool = ChatTool()
 59 |         temp_dir = tempfile.mkdtemp()
 61 |         try:
 62 |             with pytest.raises(ToolExecutionError) as exc_info:
 63 |                 await tool.execute({"prompt": large_prompt, "working_directory_absolute_path": temp_dir})
 64 |         finally:
 65 |             shutil.rmtree(temp_dir, ignore_errors=True)
 66 | 
 67 |         output = json.loads(exc_info.value.payload)
 68 |         assert output["status"] == "resend_prompt"
 69 |         assert f"{MCP_PROMPT_SIZE_LIMIT:,} characters" in output["content"]
 70 |         # The prompt size should match the user input since we check at MCP transport boundary before adding internal content
 71 |         assert output["metadata"]["prompt_size"] == len(large_prompt)
 72 |         assert output["metadata"]["limit"] == MCP_PROMPT_SIZE_LIMIT
 73 | 
 74 |     @pytest.mark.asyncio
 75 |     async def test_chat_normal_prompt_works(self, normal_prompt):
 76 |         """Test that chat tool works normally with regular prompts."""
 77 |         tool = ChatTool()
 78 | 
 79 |         temp_dir = tempfile.mkdtemp()
 80 | 
 81 |         # This test runs in the test environment which uses dummy keys
 82 |         # The chat tool will return an error for dummy keys, which is expected
 83 |         try:
 84 |             try:
 85 |                 result = await tool.execute(
 86 |                     {"prompt": normal_prompt, "model": "gemini-2.5-flash", "working_directory_absolute_path": temp_dir}
 87 |                 )
 88 |             except ToolExecutionError as exc:
 89 |                 output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
 90 |             else:
 91 |                 assert len(result) == 1
 92 |                 output = json.loads(result[0].text)
 93 |         finally:
 94 |             shutil.rmtree(temp_dir, ignore_errors=True)
 95 | 
 96 |         # Whether provider succeeds or fails, we should not hit the resend_prompt branch
 97 |         assert output["status"] != "resend_prompt"
 98 | 
 99 |     @pytest.mark.asyncio
100 |     async def test_chat_prompt_file_handling(self):
101 |         """Test that chat tool correctly handles prompt.txt files with reasonable size."""
102 |         tool = ChatTool()
103 |         # Use a smaller prompt that won't exceed limit when combined with system prompt
104 |         reasonable_prompt = "This is a reasonable sized prompt for testing prompt.txt file handling."
105 | 
106 |         # Create a temp file with reasonable content
107 |         temp_dir = tempfile.mkdtemp()
108 |         temp_prompt_file = os.path.join(temp_dir, "prompt.txt")
109 |         with open(temp_prompt_file, "w") as f:
110 |             f.write(reasonable_prompt)
111 | 
112 |         try:
113 |             try:
114 |                 result = await tool.execute(
115 |                     {
116 |                         "prompt": "",
117 |                         "absolute_file_paths": [temp_prompt_file],
118 |                         "model": "gemini-2.5-flash",
119 |                         "working_directory_absolute_path": temp_dir,
120 |                     }
121 |                 )
122 |             except ToolExecutionError as exc:
123 |                 output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
124 |             else:
125 |                 assert len(result) == 1
126 |                 output = json.loads(result[0].text)
127 | 
128 |             # The test may fail with dummy API keys, which is expected behavior.
129 |             # We're mainly testing that the tool processes prompt files correctly without size errors.
130 |             assert output["status"] != "resend_prompt"
131 |         finally:
132 |             # Cleanup
133 |             shutil.rmtree(temp_dir)
134 | 
135 |     @pytest.mark.asyncio
136 |     async def test_codereview_large_focus(self, large_prompt):
137 |         """Test that codereview tool detects large focus_on field using real integration testing."""
138 |         import importlib
139 |         import os
140 | 
141 |         tool = CodeReviewTool()
142 | 
143 |         # Save original environment
144 |         original_env = {
145 |             "OPENAI_API_KEY": os.environ.get("OPENAI_API_KEY"),
146 |             "DEFAULT_MODEL": os.environ.get("DEFAULT_MODEL"),
147 |         }
148 | 
149 |         try:
150 |             # Set up environment for real provider resolution
151 |             os.environ["OPENAI_API_KEY"] = "sk-test-key-large-focus-test-not-real"
152 |             os.environ["DEFAULT_MODEL"] = "o3-mini"
153 | 
154 |             # Clear other provider keys to isolate to OpenAI
155 |             for key in ["GEMINI_API_KEY", "XAI_API_KEY", "OPENROUTER_API_KEY"]:
156 |                 os.environ.pop(key, None)
157 | 
158 |             # Reload config and clear registry
159 |             import config
160 | 
161 |             importlib.reload(config)
162 |             from providers.registry import ModelProviderRegistry
163 | 
164 |             ModelProviderRegistry._instance = None
165 | 
166 |             # Test with real provider resolution
167 |             try:
168 |                 args = {
169 |                     "step": "initial review setup",
170 |                     "step_number": 1,
171 |                     "total_steps": 1,
172 |                     "next_step_required": False,
173 |                     "findings": "Initial testing",
174 |                     "relevant_files": ["/some/file.py"],
175 |                     "files_checked": ["/some/file.py"],
176 |                     "focus_on": large_prompt,
177 |                     "prompt": "Test code review for validation purposes",
178 |                     "model": "o3-mini",
179 |                 }
180 | 
181 |                 try:
182 |                     result = await tool.execute(args)
183 |                 except ToolExecutionError as exc:
184 |                     output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
185 |                 else:
186 |                     assert len(result) == 1
187 |                     output = json.loads(result[0].text)
188 | 
189 |                 # The large focus_on may trigger the resend_prompt guard before provider access.
190 |                 # When the guard does not trigger, auto-mode falls back to provider selection and
191 |                 # returns an error about the unavailable model. Both behaviors are acceptable for this test.
192 |                 if output.get("status") == "resend_prompt":
193 |                     assert output["metadata"]["prompt_size"] == len(large_prompt)
194 |                 else:
195 |                     assert output.get("status") == "error"
196 |                     assert "Model" in output.get("content", "")
197 | 
198 |             except Exception as e:
199 |                 # If we get an unexpected exception, ensure it's not a mock artifact
200 |                 error_msg = str(e)
201 |                 assert "MagicMock" not in error_msg
202 |                 assert "'<' not supported between instances" not in error_msg
203 | 
204 |                 # Should be a real provider error (API, authentication, etc.)
205 |                 assert any(
206 |                     phrase in error_msg
207 |                     for phrase in ["API", "key", "authentication", "provider", "network", "connection"]
208 |                 )
209 | 
210 |         finally:
211 |             # Restore environment
212 |             for key, value in original_env.items():
213 |                 if value is not None:
214 |                     os.environ[key] = value
215 |                 else:
216 |                     os.environ.pop(key, None)
217 | 
218 |             # Reload config and clear registry
219 |             importlib.reload(config)
220 |             ModelProviderRegistry._instance = None
221 | 
222 |     # NOTE: Precommit test has been removed because the precommit tool has been
223 |     # refactored to use a workflow-based pattern instead of accepting simple prompt/path fields.
224 |     # The new precommit tool requires workflow fields like: step, step_number, total_steps,
225 |     # next_step_required, findings, etc. See simulator_tests/test_precommitworkflow_validation.py
226 |     # for comprehensive workflow testing including large prompt handling.
227 | 
228 |     # NOTE: Debug tool tests have been commented out because the debug tool has been
229 |     # refactored to use a self-investigation pattern instead of accepting a prompt field.
230 |     # The new debug tool requires fields like: step, step_number, total_steps, next_step_required, findings
231 |     # and doesn't have the "resend_prompt" functionality for large prompts.
232 | 
233 |     # @pytest.mark.asyncio
234 |     # async def test_debug_large_error_description(self, large_prompt):
235 |     #     """Test that debug tool detects large error_description."""
236 |     #     tool = DebugIssueTool()
237 |     #     result = await tool.execute({"prompt": large_prompt})
238 |     #
239 |     #     assert len(result) == 1
240 |     #     output = json.loads(result[0].text)
241 |     #     assert output["status"] == "resend_prompt"
242 | 
243 |     # @pytest.mark.asyncio
244 |     # async def test_debug_large_error_context(self, large_prompt, normal_prompt):
245 |     #     """Test that debug tool detects large error_context."""
246 |     #     tool = DebugIssueTool()
247 |     #     result = await tool.execute({"prompt": normal_prompt, "error_context": large_prompt})
248 |     #
249 |     #     assert len(result) == 1
250 |     #     output = json.loads(result[0].text)
251 |     #     assert output["status"] == "resend_prompt"
252 | 
253 |     # Removed: test_analyze_large_question - workflow tool handles large prompts differently
254 | 
255 |     @pytest.mark.asyncio
256 |     async def test_multiple_files_with_prompt_txt(self, temp_prompt_file):
257 |         """Test handling of prompt.txt alongside other files."""
258 |         tool = ChatTool()
259 |         other_file = "/some/other/file.py"
260 | 
261 |         with (
262 |             patch("utils.model_context.ModelContext") as mock_model_context_cls,
263 |             patch.object(tool, "handle_prompt_file") as mock_handle_prompt,
264 |             patch.object(tool, "_prepare_file_content_for_prompt") as mock_prepare_files,
265 |         ):
266 |             mock_provider = MagicMock()
267 |             mock_provider.get_provider_type.return_value = MagicMock(value="google")
268 |             mock_provider.generate_content.return_value = MagicMock(
269 |                 content="Success",
270 |                 usage={"input_tokens": 10, "output_tokens": 20, "total_tokens": 30},
271 |                 model_name="gemini-2.5-flash",
272 |                 metadata={"finish_reason": "STOP"},
273 |             )
274 | 
275 |             from utils.model_context import TokenAllocation
276 | 
277 |             mock_model_context = MagicMock()
278 |             mock_model_context.model_name = "gemini-2.5-flash"
279 |             mock_model_context.provider = mock_provider
280 |             mock_model_context.capabilities = MagicMock(supports_extended_thinking=False)
281 |             mock_model_context.calculate_token_allocation.return_value = TokenAllocation(
282 |                 total_tokens=1_000_000,
283 |                 content_tokens=800_000,
284 |                 response_tokens=200_000,
285 |                 file_tokens=320_000,
286 |                 history_tokens=320_000,
287 |             )
288 |             mock_model_context_cls.return_value = mock_model_context
289 | 
290 |             # Return the prompt content and updated files list (without prompt.txt)
291 |             mock_handle_prompt.return_value = ("Large prompt content from file", [other_file])
292 | 
293 |             # Mock the centralized file preparation method
294 |             mock_prepare_files.return_value = ("File content", [other_file])
295 | 
296 |             # Use a small prompt to avoid triggering size limit
297 |             await tool.execute(
298 |                 {
299 |                     "prompt": "Test prompt",
300 |                     "absolute_file_paths": [temp_prompt_file, other_file],
301 |                     "working_directory_absolute_path": os.path.dirname(temp_prompt_file),
302 |                 }
303 |             )
304 | 
305 |             # Verify handle_prompt_file was called with the original files list
306 |             mock_handle_prompt.assert_called_once_with([temp_prompt_file, other_file])
307 | 
308 |             # Verify _prepare_file_content_for_prompt was called with the updated files list (without prompt.txt)
309 |             mock_prepare_files.assert_called_once()
310 |             files_arg = mock_prepare_files.call_args[0][0]
311 |             assert len(files_arg) == 1
312 |             assert files_arg[0] == other_file
313 | 
314 |         temp_dir = os.path.dirname(temp_prompt_file)
315 |         shutil.rmtree(temp_dir)
316 | 
317 |     @pytest.mark.asyncio
318 |     async def test_boundary_case_exactly_at_limit(self):
319 |         """Test prompt exactly at MCP_PROMPT_SIZE_LIMIT characters (should pass with the fix)."""
320 |         tool = ChatTool()
321 |         exact_prompt = "x" * MCP_PROMPT_SIZE_LIMIT
322 | 
323 |         # Mock the model provider to avoid real API calls
324 |         with patch.object(tool, "get_model_provider") as mock_get_provider:
325 |             mock_provider = MagicMock()
326 |             mock_provider.get_provider_type.return_value = MagicMock(value="google")
327 |             mock_provider.get_capabilities.return_value = MagicMock(supports_extended_thinking=False)
328 |             mock_provider.generate_content.return_value = MagicMock(
329 |                 content="Response to the large prompt",
330 |                 usage={"input_tokens": 12000, "output_tokens": 10, "total_tokens": 12010},
331 |                 model_name="gemini-2.5-flash",
332 |                 metadata={"finish_reason": "STOP"},
333 |             )
334 |             mock_get_provider.return_value = mock_provider
335 | 
336 |             # With the fix, this should now pass because we check at MCP transport boundary before adding internal content
337 |             temp_dir = tempfile.mkdtemp()
338 |             try:
339 |                 try:
340 |                     result = await tool.execute({"prompt": exact_prompt, "working_directory_absolute_path": temp_dir})
341 |                 except ToolExecutionError as exc:
342 |                     output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
343 |                 else:
344 |                     output = json.loads(result[0].text)
345 |             finally:
346 |                 shutil.rmtree(temp_dir, ignore_errors=True)
347 |             assert output["status"] != "resend_prompt"
348 | 
349 |     @pytest.mark.asyncio
350 |     async def test_boundary_case_just_over_limit(self):
351 |         """Test prompt just over MCP_PROMPT_SIZE_LIMIT characters (should trigger file request)."""
352 |         tool = ChatTool()
353 |         over_prompt = "x" * (MCP_PROMPT_SIZE_LIMIT + 1)
354 | 
355 |         temp_dir = tempfile.mkdtemp()
356 |         try:
357 |             try:
358 |                 result = await tool.execute({"prompt": over_prompt, "working_directory_absolute_path": temp_dir})
359 |             except ToolExecutionError as exc:
360 |                 output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
361 |             else:
362 |                 output = json.loads(result[0].text)
363 |         finally:
364 |             shutil.rmtree(temp_dir, ignore_errors=True)
365 |         assert output["status"] == "resend_prompt"
366 | 
367 |     @pytest.mark.asyncio
368 |     async def test_empty_prompt_no_file(self):
369 |         """Test empty prompt without prompt.txt file."""
370 |         tool = ChatTool()
371 | 
372 |         with patch.object(tool, "get_model_provider") as mock_get_provider:
373 |             mock_provider = MagicMock()
374 |             mock_provider.get_provider_type.return_value = MagicMock(value="google")
375 |             mock_provider.get_capabilities.return_value = MagicMock(supports_extended_thinking=False)
376 |             mock_provider.generate_content.return_value = MagicMock(
377 |                 content="Success",
378 |                 usage={"input_tokens": 10, "output_tokens": 20, "total_tokens": 30},
379 |                 model_name="gemini-2.5-flash",
380 |                 metadata={"finish_reason": "STOP"},
381 |             )
382 |             mock_get_provider.return_value = mock_provider
383 | 
384 |             temp_dir = tempfile.mkdtemp()
385 |             try:
386 |                 try:
387 |                     result = await tool.execute({"prompt": "", "working_directory_absolute_path": temp_dir})
388 |                 except ToolExecutionError as exc:
389 |                     output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
390 |                 else:
391 |                     output = json.loads(result[0].text)
392 |             finally:
393 |                 shutil.rmtree(temp_dir, ignore_errors=True)
394 |             assert output["status"] != "resend_prompt"
395 | 
396 |     @pytest.mark.asyncio
397 |     async def test_prompt_file_read_error(self):
398 |         """Test handling when prompt.txt can't be read."""
399 |         from tests.mock_helpers import create_mock_provider
400 | 
401 |         tool = ChatTool()
402 |         bad_file = "/nonexistent/prompt.txt"
403 | 
404 |         with (
405 |             patch.object(tool, "get_model_provider") as mock_get_provider,
406 |             patch("utils.model_context.ModelContext") as mock_model_context_class,
407 |         ):
408 | 
409 |             mock_provider = create_mock_provider(model_name="gemini-2.5-flash", context_window=1_048_576)
410 |             mock_provider.generate_content.return_value.content = "Success"
411 |             mock_get_provider.return_value = mock_provider
412 | 
413 |             # Mock ModelContext to avoid the comparison issue
414 |             from utils.model_context import TokenAllocation
415 | 
416 |             mock_model_context = MagicMock()
417 |             mock_model_context.model_name = "gemini-2.5-flash"
418 |             mock_model_context.calculate_token_allocation.return_value = TokenAllocation(
419 |                 total_tokens=1_048_576,
420 |                 content_tokens=838_861,
421 |                 response_tokens=209_715,
422 |                 file_tokens=335_544,
423 |                 history_tokens=335_544,
424 |             )
425 |             mock_model_context_class.return_value = mock_model_context
426 | 
427 |             # Should continue with empty prompt when file can't be read
428 |             temp_dir = tempfile.mkdtemp()
429 |             try:
430 |                 try:
431 |                     result = await tool.execute(
432 |                         {"prompt": "", "absolute_file_paths": [bad_file], "working_directory_absolute_path": temp_dir}
433 |                     )
434 |                 except ToolExecutionError as exc:
435 |                     output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
436 |                 else:
437 |                     output = json.loads(result[0].text)
438 |             finally:
439 |                 shutil.rmtree(temp_dir, ignore_errors=True)
440 |             assert output["status"] != "resend_prompt"
441 | 
442 |     @pytest.mark.asyncio
443 |     async def test_large_file_context_does_not_trigger_mcp_prompt_limit(self, tmp_path):
444 |         """Large context files should not be blocked by MCP prompt limit enforcement."""
445 |         from tests.mock_helpers import create_mock_provider
446 |         from utils.model_context import TokenAllocation
447 | 
448 |         tool = ChatTool()
449 | 
450 |         # Create a file significantly larger than MCP_PROMPT_SIZE_LIMIT characters
451 |         large_content = "A" * (MCP_PROMPT_SIZE_LIMIT * 5)
452 |         large_file = tmp_path / "huge_context.txt"
453 |         large_file.write_text(large_content)
454 | 
455 |         mock_provider = create_mock_provider(model_name="flash")
456 | 
457 |         class DummyModelContext:
458 |             def __init__(self, provider):
459 |                 self.model_name = "flash"
460 |                 self._provider = provider
461 |                 self.capabilities = provider.get_capabilities("flash")
462 | 
463 |             @property
464 |             def provider(self):
465 |                 return self._provider
466 | 
467 |             def calculate_token_allocation(self):
468 |                 return TokenAllocation(
469 |                     total_tokens=1_048_576,
470 |                     content_tokens=838_861,
471 |                     response_tokens=209_715,
472 |                     file_tokens=335_544,
473 |                     history_tokens=335_544,
474 |                 )
475 | 
476 |         dummy_context = DummyModelContext(mock_provider)
477 | 
478 |         with patch.object(tool, "get_model_provider", return_value=mock_provider):
479 |             result = await tool.execute(
480 |                 {
481 |                     "prompt": "Summarize the design decisions",
482 |                     "absolute_file_paths": [str(large_file)],
483 |                     "model": "flash",
484 |                     "working_directory_absolute_path": str(tmp_path),
485 |                     "_model_context": dummy_context,
486 |                 }
487 |             )
488 | 
489 |         output = json.loads(result[0].text)
490 |         assert output["status"] != "resend_prompt"
491 | 
492 |     @pytest.mark.asyncio
493 |     async def test_mcp_boundary_with_large_internal_context(self):
494 |         """
495 |         Critical test: Ensure MCP_PROMPT_SIZE_LIMIT only applies to user input (MCP boundary),
496 |         NOT to internal context like conversation history, system prompts, or file content.
497 | 
498 |         This test verifies that even if our internal prompt (with system prompts, history, etc.)
499 |         exceeds MCP_PROMPT_SIZE_LIMIT, it should still work as long as the user's input is small.
500 |         """
501 | 
502 |         tool = ChatTool()
503 | 
504 |         # Small user input that should pass MCP boundary check
505 |         small_user_prompt = "What is the weather like?"
506 | 
507 |         # Mock a huge conversation history that would exceed MCP limits if incorrectly checked
508 |         huge_history = "x" * (MCP_PROMPT_SIZE_LIMIT * 2)  # 100K chars = way over 50K limit
509 | 
510 |         temp_dir = tempfile.mkdtemp()
511 |         original_prepare_prompt = tool.prepare_prompt
512 | 
513 |         try:
514 |             with (
515 |                 patch.object(tool, "get_model_provider") as mock_get_provider,
516 |                 patch("utils.model_context.ModelContext") as mock_model_context_class,
517 |             ):
518 |                 from tests.mock_helpers import create_mock_provider
519 |                 from utils.model_context import TokenAllocation
520 | 
521 |                 mock_provider = create_mock_provider(model_name="flash")
522 |                 mock_get_provider.return_value = mock_provider
523 | 
524 |                 mock_model_context = MagicMock()
525 |                 mock_model_context.model_name = "flash"
526 |                 mock_model_context.provider = mock_provider
527 |                 mock_model_context.calculate_token_allocation.return_value = TokenAllocation(
528 |                     total_tokens=1_048_576,
529 |                     content_tokens=838_861,
530 |                     response_tokens=209_715,
531 |                     file_tokens=335_544,
532 |                     history_tokens=335_544,
533 |                 )
534 |                 mock_model_context_class.return_value = mock_model_context
535 | 
536 |                 async def mock_prepare_prompt(request):
537 |                     normal_prompt = await original_prepare_prompt(request)
538 |                     huge_internal_prompt = f"{normal_prompt}\n\n=== HUGE INTERNAL CONTEXT ===\n{huge_history}"
539 |                     assert len(huge_internal_prompt) > MCP_PROMPT_SIZE_LIMIT
540 |                     return huge_internal_prompt
541 | 
542 |                 tool.prepare_prompt = mock_prepare_prompt
543 | 
544 |                 result = await tool.execute(
545 |                     {"prompt": small_user_prompt, "model": "flash", "working_directory_absolute_path": temp_dir}
546 |                 )
547 |                 output = json.loads(result[0].text)
548 | 
549 |                 assert output["status"] != "resend_prompt"
550 | 
551 |                 mock_provider.generate_content.assert_called_once()
552 |                 call_kwargs = mock_provider.generate_content.call_args[1]
553 |                 actual_prompt = call_kwargs.get("prompt")
554 | 
555 |                 assert len(actual_prompt) > MCP_PROMPT_SIZE_LIMIT
556 |                 assert huge_history in actual_prompt
557 |                 assert small_user_prompt in actual_prompt
558 |         finally:
559 |             tool.prepare_prompt = original_prepare_prompt
560 |             shutil.rmtree(temp_dir, ignore_errors=True)
561 | 
562 |     @pytest.mark.asyncio
563 |     async def test_mcp_boundary_vs_internal_processing_distinction(self):
564 |         """
565 |         Test that clearly demonstrates the distinction between:
566 |         1. MCP transport boundary (user input - SHOULD be limited)
567 |         2. Internal processing (system prompts, files, history - should NOT be limited)
568 |         """
569 |         tool = ChatTool()
570 | 
571 |         # Test case 1: Large user input should fail at MCP boundary
572 |         large_user_input = "x" * (MCP_PROMPT_SIZE_LIMIT + 1000)
573 |         temp_dir = tempfile.mkdtemp()
574 |         try:
575 |             try:
576 |                 result = await tool.execute(
577 |                     {"prompt": large_user_input, "model": "flash", "working_directory_absolute_path": temp_dir}
578 |                 )
579 |             except ToolExecutionError as exc:
580 |                 output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
581 |             else:
582 |                 output = json.loads(result[0].text)
583 | 
584 |             assert output["status"] == "resend_prompt"  # Should fail
585 |             assert "too large for MCP's token limits" in output["content"]
586 | 
587 |             # Test case 2: Small user input should succeed even with huge internal processing
588 |             small_user_input = "Hello"
589 | 
590 |             try:
591 |                 result = await tool.execute(
592 |                     {
593 |                         "prompt": small_user_input,
594 |                         "model": "gemini-2.5-flash",
595 |                         "working_directory_absolute_path": temp_dir,
596 |                     }
597 |                 )
598 |             except ToolExecutionError as exc:
599 |                 output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
600 |             else:
601 |                 output = json.loads(result[0].text)
602 | 
603 |             # The test will fail with dummy API keys, which is expected behavior
604 |             # We're mainly testing that the tool processes small prompts correctly without size errors
605 |             assert output["status"] != "resend_prompt"
606 |         finally:
607 |             shutil.rmtree(temp_dir, ignore_errors=True)
608 | 
609 |     @pytest.mark.asyncio
610 |     async def test_continuation_with_huge_conversation_history(self):
611 |         """
612 |         Test that continuation calls with huge conversation history work correctly.
613 |         This simulates the exact scenario where conversation history builds up and exceeds
614 |         MCP_PROMPT_SIZE_LIMIT but should still work since history is internal processing.
615 |         """
616 |         tool = ChatTool()
617 | 
618 |         # Small user input for continuation
619 |         small_continuation_prompt = "Continue the discussion"
620 | 
621 |         # Mock huge conversation history (simulates many turns of conversation)
622 |         # Calculate repetitions needed to exceed MCP_PROMPT_SIZE_LIMIT
623 |         base_text = "=== CONVERSATION HISTORY ===\n"
624 |         repeat_text = "Previous message content\n"
625 |         # Add buffer to ensure we exceed the limit
626 |         target_size = MCP_PROMPT_SIZE_LIMIT + 1000
627 |         available_space = target_size - len(base_text)
628 |         repetitions_needed = (available_space // len(repeat_text)) + 1
629 | 
630 |         huge_conversation_history = base_text + (repeat_text * repetitions_needed)
631 | 
632 |         # Ensure the history exceeds MCP limits
633 |         assert len(huge_conversation_history) > MCP_PROMPT_SIZE_LIMIT
634 | 
635 |         temp_dir = tempfile.mkdtemp()
636 | 
637 |         with (
638 |             patch.object(tool, "get_model_provider") as mock_get_provider,
639 |             patch("utils.model_context.ModelContext") as mock_model_context_class,
640 |         ):
641 |             from tests.mock_helpers import create_mock_provider
642 | 
643 |             mock_provider = create_mock_provider(model_name="flash")
644 |             mock_provider.generate_content.return_value.content = "Continuing our conversation..."
645 |             mock_get_provider.return_value = mock_provider
646 | 
647 |             # Mock ModelContext to avoid the comparison issue
648 |             from utils.model_context import TokenAllocation
649 | 
650 |             mock_model_context = MagicMock()
651 |             mock_model_context.model_name = "flash"
652 |             mock_model_context.provider = mock_provider
653 |             mock_model_context.calculate_token_allocation.return_value = TokenAllocation(
654 |                 total_tokens=1_048_576,
655 |                 content_tokens=838_861,
656 |                 response_tokens=209_715,
657 |                 file_tokens=335_544,
658 |                 history_tokens=335_544,
659 |             )
660 |             mock_model_context_class.return_value = mock_model_context
661 | 
662 |             # Simulate continuation by having the request contain embedded conversation history
663 |             # This mimics what server.py does when it embeds conversation history
664 |             request_with_history = {
665 |                 "prompt": f"{huge_conversation_history}\n\n=== CURRENT REQUEST ===\n{small_continuation_prompt}",
666 |                 "model": "flash",
667 |                 "continuation_id": "test_thread_123",
668 |                 "working_directory_absolute_path": temp_dir,
669 |             }
670 | 
671 |             # Mock the conversation history embedding to simulate server.py behavior
672 |             original_execute = tool.__class__.execute
673 | 
674 |             async def mock_execute_with_history(self, arguments):
675 |                 # Check if this has continuation_id (simulating server.py logic)
676 |                 if arguments.get("continuation_id"):
677 |                     # Simulate the case where conversation history is already embedded in prompt
678 |                     # by server.py before calling the tool
679 |                     field_value = arguments.get("prompt", "")
680 |                     if "=== CONVERSATION HISTORY ===" in field_value:
681 |                         # Set the flag that history is embedded
682 |                         self._has_embedded_history = True
683 | 
684 |                         # The prompt field contains both history AND user input
685 |                         # But we should only check the user input part for MCP boundary
686 |                         # (This is what our fix ensures happens in prepare_prompt)
687 | 
688 |                 # Call original execute
689 |                 return await original_execute(self, arguments)
690 | 
691 |             tool.__class__.execute = mock_execute_with_history
692 | 
693 |             try:
694 |                 # This should succeed because:
695 |                 # 1. The actual user input is small (passes MCP boundary check)
696 |                 # 2. The huge conversation history is internal processing (not subject to MCP limits)
697 |                 result = await tool.execute(request_with_history)
698 |                 output = json.loads(result[0].text)
699 | 
700 |                 # Should succeed even though total prompt with history is huge
701 |                 assert output["status"] != "resend_prompt"
702 |                 assert "Continuing our conversation" in output["content"]
703 | 
704 |                 # Verify the model was called with the complete prompt (including huge history)
705 |                 mock_provider.generate_content.assert_called_once()
706 |                 call_kwargs = mock_provider.generate_content.call_args[1]
707 |                 final_prompt = call_kwargs.get("prompt")
708 | 
709 |                 # The final prompt should contain both history and user input
710 |                 assert huge_conversation_history in final_prompt
711 |                 assert small_continuation_prompt in final_prompt
712 |                 # And it should be huge (proving we don't limit internal processing)
713 |                 assert len(final_prompt) > MCP_PROMPT_SIZE_LIMIT
714 | 
715 |             finally:
716 |                 # Restore original execute method
717 |                 tool.__class__.execute = original_execute
718 |                 shutil.rmtree(temp_dir, ignore_errors=True)
719 | 
720 | 
721 | if __name__ == "__main__":
722 |     pytest.main([__file__, "-v"])
723 | 
```
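
These tests pin down a single rule: only the user-supplied portion of a prompt is checked against `MCP_PROMPT_SIZE_LIMIT` at the MCP transport boundary, while internally assembled content (system prompts, embedded files, conversation history) is allowed to exceed it. The sketch below restates that rule in isolation; `validate_mcp_boundary` and the limit value are hypothetical simplifications, not the server's actual helper or configuration.

```python
MCP_PROMPT_SIZE_LIMIT = 60_000  # illustrative value; the real limit is defined in config.py


def validate_mcp_boundary(user_input: str, assembled_prompt: str) -> str:
    """Hypothetical helper: enforce the MCP size limit on user input only.

    `assembled_prompt` may contain system prompts, file contents, and prior
    conversation history; it never crosses the MCP boundary as user input,
    so its size is deliberately not checked here.
    """
    if len(user_input) > MCP_PROMPT_SIZE_LIMIT:
        raise ValueError("resend_prompt: user input is too large for MCP's token limits")
    return assembled_prompt  # internal processing continues regardless of total size
```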

--------------------------------------------------------------------------------
/tools/refactor.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Refactor tool - Step-by-step refactoring analysis with expert validation
  3 | 
  4 | This tool provides a structured workflow for comprehensive code refactoring analysis.
  5 | It guides the CLI agent through systematic investigation steps with forced pauses between each step
  6 | to ensure thorough code examination, refactoring opportunity identification, and quality
  7 | assessment before proceeding. The tool supports complex refactoring scenarios including
  8 | code smell detection, decomposition planning, modernization opportunities, and organization improvements.
  9 | 
 10 | Key features:
 11 | - Step-by-step refactoring investigation workflow with progress tracking
 12 | - Context-aware file embedding (references during investigation, full content for analysis)
 13 | - Automatic refactoring opportunity tracking with type and severity classification
 14 | - Expert analysis integration with external models
 15 | - Support for focused refactoring types (codesmells, decompose, modernize, organization)
 16 | - Confidence-based workflow optimization with refactor completion tracking
 17 | """
 18 | 
 19 | import logging
 20 | from typing import TYPE_CHECKING, Any, Literal, Optional
 21 | 
 22 | from pydantic import Field, model_validator
 23 | 
 24 | if TYPE_CHECKING:
 25 |     from tools.models import ToolModelCategory
 26 | 
 27 | from config import TEMPERATURE_ANALYTICAL
 28 | from systemprompts import REFACTOR_PROMPT
 29 | from tools.shared.base_models import WorkflowRequest
 30 | 
 31 | from .workflow.base import WorkflowTool
 32 | 
 33 | logger = logging.getLogger(__name__)
 34 | 
 35 | # Tool-specific field descriptions for refactor tool
 36 | REFACTOR_FIELD_DESCRIPTIONS = {
 37 |     "step": (
 38 |         "The refactoring plan. Step 1: State strategy. Later steps: Report findings. "
 39 |         "CRITICAL: Examine code for smells, and opportunities for decomposition, modernization, and organization. "
 40 |         "Use 'relevant_files' for code. FORBIDDEN: Large code snippets."
 41 |     ),
 42 |     "step_number": (
 43 |         "The index of the current step in the refactoring investigation sequence, beginning at 1. Each step should "
 44 |         "build upon or revise the previous one."
 45 |     ),
 46 |     "total_steps": (
 47 |         "Your current estimate for how many steps will be needed to complete the refactoring investigation. "
 48 |         "Adjust as new opportunities emerge."
 49 |     ),
 50 |     "next_step_required": (
 51 |         "Set to true if you plan to continue the investigation with another step. False means you believe the "
 52 |         "refactoring analysis is complete and ready for expert validation."
 53 |     ),
 54 |     "findings": (
 55 |         "Summary of discoveries from this step, including code smells and opportunities for decomposition, modernization, or organization. "
 56 |         "Document both strengths and weaknesses. In later steps, confirm or update past findings."
 57 |     ),
 58 |     "files_checked": (
 59 |         "List all files examined (absolute paths). Include even ruled-out files to track exploration path."
 60 |     ),
 61 |     "relevant_files": (
 62 |         "Subset of files_checked with code requiring refactoring (absolute paths). Include files with "
 63 |         "code smells, decomposition needs, or improvement opportunities."
 64 |     ),
 65 |     "relevant_context": (
 66 |         "List methods/functions central to refactoring opportunities, in 'ClassName.methodName' or 'functionName' format. "
 67 |         "Prioritize those with code smells or needing improvement."
 68 |     ),
 69 |     "issues_found": (
 70 |         "Refactoring opportunities as dictionaries with 'severity' (critical/high/medium/low), "
 71 |         "'type' (codesmells/decompose/modernize/organization), and 'description'. "
 72 |         "Include all improvement opportunities found."
 73 |     ),
 74 |     "confidence": (
 75 |         "Your confidence in refactoring analysis: exploring (starting), incomplete (significant work remaining), "
 76 |         "partial (some opportunities found, more analysis needed), complete (comprehensive analysis finished, "
 77 |         "all major opportunities identified). "
 78 |         "WARNING: Use 'complete' ONLY when fully analyzed and can provide recommendations without expert help. "
 79 |         "'complete' PREVENTS expert validation. Use 'partial' for large files or uncertain analysis."
 80 |     ),
 81 |     "images": (
 82 |         "Optional list of absolute paths to architecture diagrams, UI mockups, design documents, or visual references "
 83 |         "that help with refactoring context. Only include if they materially assist understanding or assessment."
 84 |     ),
 85 |     "refactor_type": "Type of refactoring analysis to perform (codesmells, decompose, modernize, organization)",
 86 |     "focus_areas": "Specific areas to focus on (e.g., 'performance', 'readability', 'maintainability', 'security')",
 87 |     "style_guide_examples": (
 88 |         "Optional existing code files to use as style/pattern reference (must be FULL absolute paths to real files / "
 89 |         "folders - DO NOT SHORTEN). These files represent the target coding style and patterns for the project."
 90 |     ),
 91 | }
 92 | 
 93 | 
 94 | class RefactorRequest(WorkflowRequest):
 95 |     """Request model for refactor workflow investigation steps"""
 96 | 
 97 |     # Required fields for each investigation step
 98 |     step: str = Field(..., description=REFACTOR_FIELD_DESCRIPTIONS["step"])
 99 |     step_number: int = Field(..., description=REFACTOR_FIELD_DESCRIPTIONS["step_number"])
100 |     total_steps: int = Field(..., description=REFACTOR_FIELD_DESCRIPTIONS["total_steps"])
101 |     next_step_required: bool = Field(..., description=REFACTOR_FIELD_DESCRIPTIONS["next_step_required"])
102 | 
103 |     # Investigation tracking fields
104 |     findings: str = Field(..., description=REFACTOR_FIELD_DESCRIPTIONS["findings"])
105 |     files_checked: list[str] = Field(default_factory=list, description=REFACTOR_FIELD_DESCRIPTIONS["files_checked"])
106 |     relevant_files: list[str] = Field(default_factory=list, description=REFACTOR_FIELD_DESCRIPTIONS["relevant_files"])
107 |     relevant_context: list[str] = Field(
108 |         default_factory=list, description=REFACTOR_FIELD_DESCRIPTIONS["relevant_context"]
109 |     )
110 |     issues_found: list[dict] = Field(default_factory=list, description=REFACTOR_FIELD_DESCRIPTIONS["issues_found"])
111 |     confidence: Optional[Literal["exploring", "incomplete", "partial", "complete"]] = Field(
112 |         "incomplete", description=REFACTOR_FIELD_DESCRIPTIONS["confidence"]
113 |     )
114 | 
115 |     # Optional images for visual context
116 |     images: Optional[list[str]] = Field(default=None, description=REFACTOR_FIELD_DESCRIPTIONS["images"])
117 | 
118 |     # Refactor-specific fields (only used in step 1 to initialize)
119 |     refactor_type: Optional[Literal["codesmells", "decompose", "modernize", "organization"]] = Field(
120 |         "codesmells", description=REFACTOR_FIELD_DESCRIPTIONS["refactor_type"]
121 |     )
122 |     focus_areas: Optional[list[str]] = Field(None, description=REFACTOR_FIELD_DESCRIPTIONS["focus_areas"])
123 |     style_guide_examples: Optional[list[str]] = Field(
124 |         None, description=REFACTOR_FIELD_DESCRIPTIONS["style_guide_examples"]
125 |     )
126 | 
127 |     # Override inherited fields to exclude them from schema (except model which needs to be available)
128 |     temperature: Optional[float] = Field(default=None, exclude=True)
129 |     thinking_mode: Optional[str] = Field(default=None, exclude=True)
130 | 
131 |     @model_validator(mode="after")
132 |     def validate_step_one_requirements(self):
133 |         """Ensure step 1 has required relevant_files field."""
134 |         if self.step_number == 1 and not self.relevant_files:
135 |             raise ValueError(
136 |                 "Step 1 requires 'relevant_files' field to specify code files or directories to analyze for refactoring"
137 |             )
138 |         return self
139 | 
140 | 
141 | class RefactorTool(WorkflowTool):
142 |     """
143 |     Refactor tool for step-by-step refactoring analysis and expert validation.
144 | 
145 |     This tool implements a structured refactoring workflow that guides users through
146 |     methodical investigation steps, ensuring thorough code examination, refactoring opportunity
147 |     identification, and improvement assessment before reaching conclusions. It supports complex
148 |     refactoring scenarios including code smell detection, decomposition planning, modernization
149 |     opportunities, and organization improvements.
150 |     """
151 | 
152 |     def __init__(self):
153 |         super().__init__()
154 |         self.initial_request = None
155 |         self.refactor_config = {}
156 | 
157 |     def get_name(self) -> str:
158 |         return "refactor"
159 | 
160 |     def get_description(self) -> str:
161 |         return (
162 |             "Analyzes code for refactoring opportunities with systematic investigation. "
163 |             "Use for code smell detection, decomposition planning, modernization, and maintainability improvements. "
164 |             "Guides through structured analysis with expert validation."
165 |         )
166 | 
167 |     def get_system_prompt(self) -> str:
168 |         return REFACTOR_PROMPT
169 | 
170 |     def get_default_temperature(self) -> float:
171 |         return TEMPERATURE_ANALYTICAL
172 | 
173 |     def get_model_category(self) -> "ToolModelCategory":
174 |         """Refactor workflow requires thorough analysis and reasoning"""
175 |         from tools.models import ToolModelCategory
176 | 
177 |         return ToolModelCategory.EXTENDED_REASONING
178 | 
179 |     def get_workflow_request_model(self):
180 |         """Return the refactor workflow-specific request model."""
181 |         return RefactorRequest
182 | 
183 |     def get_input_schema(self) -> dict[str, Any]:
184 |         """Generate input schema using WorkflowSchemaBuilder with refactor-specific overrides."""
185 |         from .workflow.schema_builders import WorkflowSchemaBuilder
186 | 
187 |         # Refactor workflow-specific field overrides
188 |         refactor_field_overrides = {
189 |             "step": {
190 |                 "type": "string",
191 |                 "description": REFACTOR_FIELD_DESCRIPTIONS["step"],
192 |             },
193 |             "step_number": {
194 |                 "type": "integer",
195 |                 "minimum": 1,
196 |                 "description": REFACTOR_FIELD_DESCRIPTIONS["step_number"],
197 |             },
198 |             "total_steps": {
199 |                 "type": "integer",
200 |                 "minimum": 1,
201 |                 "description": REFACTOR_FIELD_DESCRIPTIONS["total_steps"],
202 |             },
203 |             "next_step_required": {
204 |                 "type": "boolean",
205 |                 "description": REFACTOR_FIELD_DESCRIPTIONS["next_step_required"],
206 |             },
207 |             "findings": {
208 |                 "type": "string",
209 |                 "description": REFACTOR_FIELD_DESCRIPTIONS["findings"],
210 |             },
211 |             "files_checked": {
212 |                 "type": "array",
213 |                 "items": {"type": "string"},
214 |                 "description": REFACTOR_FIELD_DESCRIPTIONS["files_checked"],
215 |             },
216 |             "relevant_files": {
217 |                 "type": "array",
218 |                 "items": {"type": "string"},
219 |                 "description": REFACTOR_FIELD_DESCRIPTIONS["relevant_files"],
220 |             },
221 |             "confidence": {
222 |                 "type": "string",
223 |                 "enum": ["exploring", "incomplete", "partial", "complete"],
224 |                 "default": "incomplete",
225 |                 "description": REFACTOR_FIELD_DESCRIPTIONS["confidence"],
226 |             },
227 |             "issues_found": {
228 |                 "type": "array",
229 |                 "items": {"type": "object"},
230 |                 "description": REFACTOR_FIELD_DESCRIPTIONS["issues_found"],
231 |             },
232 |             "images": {
233 |                 "type": "array",
234 |                 "items": {"type": "string"},
235 |                 "description": REFACTOR_FIELD_DESCRIPTIONS["images"],
236 |             },
237 |             # Refactor-specific fields (for step 1)
238 |             # Note: Use relevant_files field instead of files for consistency
239 |             "refactor_type": {
240 |                 "type": "string",
241 |                 "enum": ["codesmells", "decompose", "modernize", "organization"],
242 |                 "default": "codesmells",
243 |                 "description": REFACTOR_FIELD_DESCRIPTIONS["refactor_type"],
244 |             },
245 |             "focus_areas": {
246 |                 "type": "array",
247 |                 "items": {"type": "string"},
248 |                 "description": REFACTOR_FIELD_DESCRIPTIONS["focus_areas"],
249 |             },
250 |             "style_guide_examples": {
251 |                 "type": "array",
252 |                 "items": {"type": "string"},
253 |                 "description": REFACTOR_FIELD_DESCRIPTIONS["style_guide_examples"],
254 |             },
255 |         }
256 | 
257 |         # Use WorkflowSchemaBuilder with refactor-specific tool fields
258 |         return WorkflowSchemaBuilder.build_schema(
259 |             tool_specific_fields=refactor_field_overrides,
260 |             model_field_schema=self.get_model_field_schema(),
261 |             auto_mode=self.is_effective_auto_mode(),
262 |             tool_name=self.get_name(),
263 |         )
264 | 
265 |     def get_required_actions(
266 |         self, step_number: int, confidence: str, findings: str, total_steps: int, request=None
267 |     ) -> list[str]:
268 |         """Define required actions for each investigation phase."""
269 |         if step_number == 1:
270 |             # Initial refactoring investigation tasks
271 |             return [
272 |                 "Read and understand the code files specified for refactoring analysis",
273 |                 "Examine the overall structure, architecture, and design patterns used",
274 |                 "Identify potential code smells: long methods, large classes, duplicate code, complex conditionals",
275 |                 "Look for decomposition opportunities: oversized components that could be broken down",
276 |                 "Check for modernization opportunities: outdated patterns, deprecated features, newer language constructs",
277 |                 "Assess organization: logical grouping, file structure, naming conventions, module boundaries",
278 |                 "Document specific refactoring opportunities with file locations and line numbers",
279 |             ]
280 |         elif confidence in ["exploring", "incomplete"]:
281 |             # Need deeper investigation
282 |             return [
283 |                 "Examine specific code sections you've identified as needing refactoring",
284 |                 "Analyze code smells in detail: complexity, coupling, cohesion issues",
285 |                 "Investigate decomposition opportunities: identify natural breaking points for large components",
286 |                 "Look for modernization possibilities: language features, patterns, libraries that could improve the code",
287 |                 "Check organization issues: related functionality that could be better grouped or structured",
288 |                 "Trace dependencies and relationships between components to understand refactoring impact",
289 |                 "Prioritize refactoring opportunities by impact and effort required",
290 |             ]
291 |         elif confidence == "partial":
292 |             # Close to completion - need final verification
293 |             return [
294 |                 "Verify all identified refactoring opportunities have been properly documented with locations",
295 |                 "Check for any missed opportunities in areas not yet thoroughly examined",
296 |                 "Confirm that refactoring suggestions align with the specified refactor_type and focus_areas",
297 |                 "Ensure refactoring opportunities are prioritized by severity and impact",
298 |                 "Validate that proposed changes would genuinely improve code quality without breaking functionality",
299 |                 "Double-check that all relevant files and code elements are captured in your analysis",
300 |             ]
301 |         else:
302 |             # General investigation needed
303 |             return [
304 |                 "Continue examining the codebase for additional refactoring opportunities",
305 |                 "Gather more evidence using appropriate code analysis techniques",
306 |                 "Test your assumptions about code quality and improvement possibilities",
307 |                 "Look for patterns that confirm or refute your current refactoring assessment",
308 |                 "Focus on areas that haven't been thoroughly examined for refactoring potential",
309 |             ]
310 | 
311 |     def should_call_expert_analysis(self, consolidated_findings, request=None) -> bool:
312 |         """
313 |         Decide when to call external model based on investigation completeness.
314 | 
315 |         Don't call expert analysis if the CLI agent reports complete confidence in its refactoring analysis - trust its judgment.
316 |         """
317 |         # Check if user requested to skip assistant model
318 |         if request and not self.get_request_use_assistant_model(request):
319 |             return False
320 | 
321 |         # Check if refactoring work is complete
322 |         if request and request.confidence == "complete":
323 |             return False
324 | 
325 |         # Check if we have meaningful investigation data
326 |         return (
327 |             len(consolidated_findings.relevant_files) > 0
328 |             or len(consolidated_findings.findings) >= 2
329 |             or len(consolidated_findings.issues_found) > 0
330 |         )
331 | 
332 |     def prepare_expert_analysis_context(self, consolidated_findings) -> str:
333 |         """Prepare context for external model call for final refactoring validation."""
334 |         context_parts = [
335 |             f"=== REFACTORING ANALYSIS REQUEST ===\n{self.initial_request or 'Refactoring workflow initiated'}\n=== END REQUEST ==="
336 |         ]
337 | 
338 |         # Add investigation summary
339 |         investigation_summary = self._build_refactoring_summary(consolidated_findings)
340 |         context_parts.append(
341 |             f"\n=== AGENT'S REFACTORING INVESTIGATION ===\n{investigation_summary}\n=== END INVESTIGATION ==="
342 |         )
343 | 
344 |         # Add refactor configuration context if available
345 |         if self.refactor_config:
346 |             config_text = "\n".join(f"- {key}: {value}" for key, value in self.refactor_config.items() if value)
347 |             context_parts.append(f"\n=== REFACTOR CONFIGURATION ===\n{config_text}\n=== END CONFIGURATION ===")
348 | 
349 |         # Add relevant code elements if available
350 |         if consolidated_findings.relevant_context:
351 |             methods_text = "\n".join(f"- {method}" for method in consolidated_findings.relevant_context)
352 |             context_parts.append(f"\n=== RELEVANT CODE ELEMENTS ===\n{methods_text}\n=== END CODE ELEMENTS ===")
353 | 
354 |         # Add refactoring opportunities found if available
355 |         if consolidated_findings.issues_found:
356 |             opportunities_text = "\n".join(
357 |                 f"[{issue.get('severity', 'unknown').upper()}] {issue.get('type', 'unknown').upper()}: {issue.get('description', 'No description')}"
358 |                 for issue in consolidated_findings.issues_found
359 |             )
360 |             context_parts.append(
361 |                 f"\n=== REFACTORING OPPORTUNITIES ===\n{opportunities_text}\n=== END OPPORTUNITIES ==="
362 |             )
363 | 
364 |         # Add assessment evolution if available
365 |         if consolidated_findings.hypotheses:
366 |             assessments_text = "\n".join(
367 |                 f"Step {h['step']} ({h['confidence']} confidence): {h['hypothesis']}"
368 |                 for h in consolidated_findings.hypotheses
369 |             )
370 |             context_parts.append(f"\n=== ASSESSMENT EVOLUTION ===\n{assessments_text}\n=== END ASSESSMENTS ===")
371 | 
372 |         # Add images if available
373 |         if consolidated_findings.images:
374 |             images_text = "\n".join(f"- {img}" for img in consolidated_findings.images)
375 |             context_parts.append(
376 |                 f"\n=== VISUAL REFACTORING INFORMATION ===\n{images_text}\n=== END VISUAL INFORMATION ==="
377 |             )
378 | 
379 |         return "\n".join(context_parts)
380 | 
381 |     def _build_refactoring_summary(self, consolidated_findings) -> str:
382 |         """Prepare a comprehensive summary of the refactoring investigation."""
383 |         summary_parts = [
384 |             "=== SYSTEMATIC REFACTORING INVESTIGATION SUMMARY ===",
385 |             f"Total steps: {len(consolidated_findings.findings)}",
386 |             f"Files examined: {len(consolidated_findings.files_checked)}",
387 |             f"Relevant files identified: {len(consolidated_findings.relevant_files)}",
388 |             f"Code elements analyzed: {len(consolidated_findings.relevant_context)}",
389 |             f"Refactoring opportunities identified: {len(consolidated_findings.issues_found)}",
390 |             "",
391 |             "=== INVESTIGATION PROGRESSION ===",
392 |         ]
393 | 
394 |         for finding in consolidated_findings.findings:
395 |             summary_parts.append(finding)
396 | 
397 |         return "\n".join(summary_parts)
398 | 
399 |     def should_include_files_in_expert_prompt(self) -> bool:
400 |         """Include files in expert analysis for comprehensive refactoring validation."""
401 |         return True
402 | 
403 |     def should_embed_system_prompt(self) -> bool:
404 |         """Embed system prompt in expert analysis for proper context."""
405 |         return True
406 | 
407 |     def get_expert_thinking_mode(self) -> str:
408 |         """Use high thinking mode for thorough refactoring analysis."""
409 |         return "high"
410 | 
411 |     def get_expert_analysis_instruction(self) -> str:
412 |         """Get specific instruction for refactoring expert analysis."""
413 |         return (
414 |             "Please provide comprehensive refactoring analysis based on the investigation findings. "
415 |             "Focus on validating the identified opportunities, ensuring completeness of the analysis, "
416 |             "and providing final recommendations for refactoring implementation, following the structured "
417 |             "format specified in the system prompt."
418 |         )
419 | 
420 |     # Hook method overrides for refactor-specific behavior
421 | 
422 |     def prepare_step_data(self, request) -> dict:
423 |         """
424 |         Map refactor workflow-specific fields for internal processing.
425 |         """
426 |         step_data = {
427 |             "step": request.step,
428 |             "step_number": request.step_number,
429 |             "findings": request.findings,
430 |             "files_checked": request.files_checked,
431 |             "relevant_files": request.relevant_files,
432 |             "relevant_context": request.relevant_context,
433 |             "issues_found": request.issues_found,
434 |             "confidence": request.confidence,
435 |             "hypothesis": request.findings,  # Map findings to hypothesis for compatibility
436 |             "images": request.images or [],
437 |         }
438 |         return step_data
439 | 
440 |     def should_skip_expert_analysis(self, request, consolidated_findings) -> bool:
441 |         """
442 |         Refactor workflow skips expert analysis when the CLI agent has "complete" confidence.
443 |         """
444 |         return request.confidence == "complete" and not request.next_step_required
445 | 
446 |     def store_initial_issue(self, step_description: str):
447 |         """Store initial request for expert analysis."""
448 |         self.initial_request = step_description
449 | 
450 |     # Inheritance hook methods for refactor-specific behavior
451 | 
452 |     # Override inheritance hooks for refactor-specific behavior
453 | 
454 |     def get_completion_status(self) -> str:
455 |         """Refactor tools use refactor-specific status."""
456 |         return "refactoring_analysis_complete_ready_for_implementation"
457 | 
458 |     def get_completion_data_key(self) -> str:
459 |         """Refactor uses 'complete_refactoring' key."""
460 |         return "complete_refactoring"
461 | 
462 |     def get_final_analysis_from_request(self, request):
463 |         """Refactor tools use 'findings' field."""
464 |         return request.findings
465 | 
466 |     def get_confidence_level(self, request) -> str:
467 |         """Refactor tools use 'complete' for high confidence."""
468 |         return "complete"
469 | 
470 |     def get_completion_message(self) -> str:
471 |         """Refactor-specific completion message."""
472 |         return (
473 |             "Refactoring analysis complete with COMPLETE confidence. You have identified all significant "
474 |             "refactoring opportunities and provided comprehensive analysis. MANDATORY: Present the user with "
475 |             "the complete refactoring results organized by type and severity, and IMMEDIATELY proceed with "
476 |             "implementing the highest priority refactoring opportunities or provide specific guidance for "
477 |             "improvements. Focus on actionable refactoring steps."
478 |         )
479 | 
480 |     def get_skip_reason(self) -> str:
481 |         """Refactor-specific skip reason."""
482 |         return "Completed comprehensive refactoring analysis with full confidence locally"
483 | 
484 |     def get_skip_expert_analysis_status(self) -> str:
485 |         """Refactor-specific expert analysis skip status."""
486 |         return "skipped_due_to_complete_refactoring_confidence"
487 | 
488 |     def prepare_work_summary(self) -> str:
489 |         """Refactor-specific work summary."""
490 |         return self._build_refactoring_summary(self.consolidated_findings)
491 | 
492 |     def get_completion_next_steps_message(self, expert_analysis_used: bool = False) -> str:
493 |         """
494 |         Refactor-specific completion message.
495 | 
496 |         Args:
497 |             expert_analysis_used: True if expert analysis was successfully executed
498 |         """
499 |         base_message = (
500 |             "REFACTORING ANALYSIS IS COMPLETE. You MUST now summarize and present ALL refactoring opportunities "
501 |             "organized by type (codesmells → decompose → modernize → organization) and severity (Critical → High → "
502 |             "Medium → Low), specific code locations with line numbers, and exact recommendations for improvement. "
503 |             "Clearly prioritize the top 3 refactoring opportunities that need immediate attention. Provide concrete, "
504 |             "actionable guidance for each opportunity—make it easy for a developer to understand exactly what needs "
505 |             "to be refactored and how to implement the improvements."
506 |         )
507 | 
508 |         # Add expert analysis guidance only when expert analysis was actually used
509 |         if expert_analysis_used:
510 |             expert_guidance = self.get_expert_analysis_guidance()
511 |             if expert_guidance:
512 |                 return f"{base_message}\n\n{expert_guidance}"
513 | 
514 |         return base_message
515 | 
516 |     def get_expert_analysis_guidance(self) -> str:
517 |         """
518 |         Get additional guidance for handling expert analysis results in refactor context.
519 | 
520 |         Returns:
521 |             Additional guidance text for validating and using expert analysis findings
522 |         """
523 |         return (
524 |             "IMPORTANT: Expert refactoring analysis has been provided above. You MUST review "
525 |             "the expert's architectural insights and refactoring recommendations. Consider whether "
526 |             "the expert's suggestions align with the codebase's evolution trajectory and current "
527 |             "team priorities. Pay special attention to any breaking changes, migration complexity, "
528 |             "or performance implications highlighted by the expert. Present a balanced view that "
529 |             "considers both immediate benefits and long-term maintainability."
530 |         )
531 | 
532 |     def get_step_guidance_message(self, request) -> str:
533 |         """
534 |         Refactor-specific step guidance with detailed investigation instructions.
535 |         """
536 |         step_guidance = self.get_refactor_step_guidance(request.step_number, request.confidence, request)
537 |         return step_guidance["next_steps"]
538 | 
539 |     def get_refactor_step_guidance(self, step_number: int, confidence: str, request) -> dict[str, Any]:
540 |         """
541 |         Provide step-specific guidance for refactor workflow.
542 |         """
543 |         # Generate the next steps instruction based on required actions
544 |         required_actions = self.get_required_actions(step_number, confidence, request.findings, request.total_steps)
545 | 
546 |         if step_number == 1:
547 |             next_steps = (
548 |                 f"MANDATORY: DO NOT call the {self.get_name()} tool again immediately. You MUST first examine "
549 |                 f"the code files thoroughly for refactoring opportunities using appropriate tools. CRITICAL AWARENESS: "
550 |                 f"You need to identify code smells, decomposition opportunities, modernization possibilities, and "
551 |                 f"organization improvements across the specified refactor_type. Look for complexity issues, outdated "
552 |                 f"patterns, oversized components, and structural problems. Use file reading tools, code analysis, and "
553 |                 f"systematic examination to gather comprehensive refactoring information. Only call {self.get_name()} "
554 |                 f"again AFTER completing your investigation. When you call {self.get_name()} next time, use "
555 |                 f"step_number: {step_number + 1} and report specific files examined, refactoring opportunities found, "
556 |                 f"and improvement assessments discovered."
557 |             )
558 |         elif confidence in ["exploring", "incomplete"]:
559 |             next_steps = (
560 |                 f"STOP! Do NOT call {self.get_name()} again yet. Based on your findings, you've identified areas that need "
561 |                 f"deeper refactoring analysis. MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\n"
562 |                 + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
563 |                 + f"\n\nOnly call {self.get_name()} again with step_number: {step_number + 1} AFTER "
564 |                 + "completing these refactoring analysis tasks."
565 |             )
566 |         elif confidence == "partial":
567 |             next_steps = (
568 |                 f"WAIT! Your refactoring analysis needs final verification. DO NOT call {self.get_name()} immediately. REQUIRED ACTIONS:\n"
569 |                 + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
570 |                 + f"\n\nREMEMBER: Ensure you have identified all significant refactoring opportunities across all types and "
571 |                 f"verified the completeness of your analysis. Document opportunities with specific file references and "
572 |                 f"line numbers where applicable, then call {self.get_name()} with step_number: {step_number + 1}."
573 |             )
574 |         else:
575 |             next_steps = (
576 |                 f"PAUSE REFACTORING ANALYSIS. Before calling {self.get_name()} step {step_number + 1}, you MUST examine more code thoroughly. "
577 |                 + "Required: "
578 |                 + ", ".join(required_actions[:2])
579 |                 + ". "
580 |                 + f"Your next {self.get_name()} call (step_number: {step_number + 1}) must include "
581 |                 f"NEW evidence from actual refactoring analysis, not just theories. NO recursive {self.get_name()} calls "
582 |                 f"without investigation work!"
583 |             )
584 | 
585 |         return {"next_steps": next_steps}
586 | 
587 |     def customize_workflow_response(self, response_data: dict, request) -> dict:
588 |         """
589 |         Customize response to match refactor workflow format.
590 |         """
591 |         # Store initial request on first step
592 |         if request.step_number == 1:
593 |             self.initial_request = request.step
594 |             # Store refactor configuration for expert analysis
595 |             if request.relevant_files:
596 |                 self.refactor_config = {
597 |                     "relevant_files": request.relevant_files,
598 |                     "refactor_type": request.refactor_type,
599 |                     "focus_areas": request.focus_areas,
600 |                     "style_guide_examples": request.style_guide_examples,
601 |                 }
602 | 
603 |         # Convert generic status names to refactor-specific ones
604 |         tool_name = self.get_name()
605 |         status_mapping = {
606 |             f"{tool_name}_in_progress": "refactoring_analysis_in_progress",
607 |             f"pause_for_{tool_name}": "pause_for_refactoring_analysis",
608 |             f"{tool_name}_required": "refactoring_analysis_required",
609 |             f"{tool_name}_complete": "refactoring_analysis_complete",
610 |         }
611 | 
612 |         if response_data["status"] in status_mapping:
613 |             response_data["status"] = status_mapping[response_data["status"]]
614 | 
615 |         # Rename status field to match refactor workflow
616 |         if f"{tool_name}_status" in response_data:
617 |             response_data["refactoring_status"] = response_data.pop(f"{tool_name}_status")
618 |             # Add refactor-specific status fields
619 |             refactor_types = {}
620 |             for issue in self.consolidated_findings.issues_found:
621 |                 issue_type = issue.get("type", "unknown")
622 |                 if issue_type not in refactor_types:
623 |                     refactor_types[issue_type] = 0
624 |                 refactor_types[issue_type] += 1
625 |             response_data["refactoring_status"]["opportunities_by_type"] = refactor_types
626 |             response_data["refactoring_status"]["refactor_confidence"] = request.confidence
627 | 
628 |         # Map complete_refactor to complete_refactoring
629 |         if f"complete_{tool_name}" in response_data:
630 |             response_data["complete_refactoring"] = response_data.pop(f"complete_{tool_name}")
631 | 
632 |         # Map the completion flag to match refactor workflow
633 |         if f"{tool_name}_complete" in response_data:
634 |             response_data["refactoring_complete"] = response_data.pop(f"{tool_name}_complete")
635 | 
636 |         return response_data
637 | 
638 |     # Required abstract methods from BaseTool
639 |     def get_request_model(self):
640 |         """Return the refactor workflow-specific request model."""
641 |         return RefactorRequest
642 | 
643 |     async def prepare_prompt(self, request) -> str:
644 |         """Not used - workflow tools use execute_workflow()."""
645 |         return ""  # Workflow tools use execute_workflow() directly
646 | 
```
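
As the `RefactorRequest` model above enforces, the first workflow step must name the files to analyze; omitting `relevant_files` on step 1 raises a validation error. A minimal step-1 payload might look like the sketch below (paths, step text, and values are illustrative, and the MCP plumbing that delivers these arguments to the tool is omitted):

```python
# Illustrative step-1 arguments for the refactor workflow (values are examples only).
step_one_arguments = {
    "step": "Survey the payment module for code smells and decomposition opportunities",
    "step_number": 1,
    "total_steps": 3,
    "next_step_required": True,
    "findings": "Starting the investigation; no findings recorded yet",
    "relevant_files": ["/abs/path/to/payments/service.py"],  # required on step 1
    "refactor_type": "codesmells",
    "focus_areas": ["maintainability", "readability"],
}
# Dropping "relevant_files" here would trip validate_step_one_requirements in RefactorRequest.
```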

--------------------------------------------------------------------------------
/utils/file_utils.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | File reading utilities with directory support and token management
  3 | 
  4 | This module provides secure file access functionality for the MCP server.
  5 | It implements critical security measures to prevent unauthorized file access
  6 | and manages token limits to ensure efficient API usage.
  7 | 
  8 | Key Features:
  9 | - Path validation and sandboxing to prevent directory traversal attacks
 10 | - Support for both individual files and recursive directory reading
 11 | - Token counting and management to stay within API limits
 12 | - Automatic file type detection and filtering
 13 | - Comprehensive error handling with informative messages
 14 | 
 15 | Security Model:
 16 | - All file access is restricted to PROJECT_ROOT and its subdirectories
 17 | - Absolute paths are required to prevent ambiguity
 18 | - Symbolic links are resolved to ensure they stay within bounds
 19 | 
 20 | CONVERSATION MEMORY INTEGRATION:
 21 | This module works with the conversation memory system to support efficient
 22 | multi-turn file handling:
 23 | 
 24 | 1. DEDUPLICATION SUPPORT:
 25 |    - File reading functions are called by conversation-aware tools
 26 |    - Supports newest-first file prioritization by providing accurate token estimation
 27 |    - Enables efficient file content caching and token budget management
 28 | 
 29 | 2. TOKEN BUDGET OPTIMIZATION:
 30 |    - Provides accurate token estimation for file content before reading
 31 |    - Supports the dual prioritization strategy by enabling precise budget calculations
 32 |    - Enables tools to make informed decisions about which files to include
 33 | 
 34 | 3. CROSS-TOOL FILE PERSISTENCE:
 35 |    - File reading results are used across different tools in conversation chains
 36 |    - Consistent file access patterns support conversation continuation scenarios
 37 |    - Error handling preserves conversation flow when files become unavailable
 38 | """
 39 | 
 40 | import json
 41 | import logging
 42 | import os
 43 | from datetime import datetime, timezone
 44 | from pathlib import Path
 45 | from typing import Optional
 46 | 
 47 | from .file_types import BINARY_EXTENSIONS, CODE_EXTENSIONS, IMAGE_EXTENSIONS, TEXT_EXTENSIONS
 48 | from .security_config import EXCLUDED_DIRS, is_dangerous_path
 49 | from .token_utils import DEFAULT_CONTEXT_WINDOW, estimate_tokens
 50 | 
 51 | 
 52 | def _is_builtin_custom_models_config(path_str: str) -> bool:
 53 |     """
 54 |     Check if path points to the server's built-in custom_models.json config file.
 55 | 
 56 |     This only matches the server's internal config, not user-specified CUSTOM_MODELS_CONFIG_PATH.
 57 |     We identify the built-in config by checking if it resolves to the server's conf directory.
 58 | 
 59 |     Args:
 60 |         path_str: Path to check
 61 | 
 62 |     Returns:
 63 |         True if this is the server's built-in custom_models.json config file
 64 |     """
 65 |     try:
 66 |         path = Path(path_str)
 67 | 
 68 |         # Get the server root by going up from this file: utils/file_utils.py -> server_root
 69 |         server_root = Path(__file__).parent.parent
 70 |         builtin_config = server_root / "conf" / "custom_models.json"
 71 | 
 72 |         # Check if the path resolves to the same file as our built-in config
 73 |         # This handles both relative and absolute paths to the same file
 74 |         return path.resolve() == builtin_config.resolve()
 75 | 
 76 |     except Exception:
 77 |         # If path resolution fails, it's not our built-in config
 78 |         return False
 79 | 
 80 | 
 81 | logger = logging.getLogger(__name__)
 82 | 
 83 | 
 84 | def is_mcp_directory(path: Path) -> bool:
 85 |     """
 86 |     Check if a directory is the MCP server's own directory.
 87 | 
 88 |     This prevents the MCP from including its own code when scanning projects
 89 |     where the MCP has been cloned as a subdirectory.
 90 | 
 91 |     Args:
 92 |         path: Directory path to check
 93 | 
 94 |     Returns:
 95 |         True if this is the MCP server directory or a subdirectory
 96 |     """
 97 |     if not path.is_dir():
 98 |         return False
 99 | 
100 |     # Get the directory where the MCP server is running from
101 |     # __file__ is utils/file_utils.py, so parent.parent is the MCP root
102 |     mcp_server_dir = Path(__file__).parent.parent.resolve()
103 | 
104 |     # Check if the given path is the MCP server directory or a subdirectory
105 |     try:
106 |         path.resolve().relative_to(mcp_server_dir)
107 |         logger.info(f"Detected MCP server directory at {path}, will exclude from scanning")
108 |         return True
109 |     except ValueError:
110 |         # Not a subdirectory of MCP server
111 |         return False
112 | 
113 | 
114 | def get_user_home_directory() -> Optional[Path]:
115 |     """
116 |     Get the user's home directory.
117 | 
118 |     Returns:
119 |         User's home directory path
120 |     """
121 |     return Path.home()
122 | 
123 | 
124 | def is_home_directory_root(path: Path) -> bool:
125 |     """
126 |     Check if the given path is the user's home directory root.
127 | 
128 |     This prevents scanning the entire home directory which could include
129 |     sensitive data and non-project files.
130 | 
131 |     Args:
132 |         path: Directory path to check
133 | 
134 |     Returns:
135 |         True if this is the home directory root
136 |     """
137 |     user_home = get_user_home_directory()
138 |     if not user_home:
139 |         return False
140 | 
141 |     try:
142 |         resolved_path = path.resolve()
143 |         resolved_home = user_home.resolve()
144 | 
145 |         # Check if this is exactly the home directory
146 |         if resolved_path == resolved_home:
147 |             logger.warning(
148 |                 f"Attempted to scan user home directory root: {path}. Please specify a subdirectory instead."
149 |             )
150 |             return True
151 | 
152 |         # Also check common home directory patterns
153 |         path_str = str(resolved_path).lower()
154 |         home_patterns = [
155 |             "/users/",  # macOS
156 |             "/home/",  # Linux
157 |             "c:\\users\\",  # Windows
158 |             "c:/users/",  # Windows with forward slashes
159 |         ]
160 | 
161 |         for pattern in home_patterns:
162 |             if pattern in path_str:
163 |                 # Extract the user directory path
164 |                 # e.g., /Users/fahad or /home/username
165 |                 parts = path_str.split(pattern)
166 |                 if len(parts) > 1:
167 |                     # Get the part after the pattern
168 |                     after_pattern = parts[1]
169 |                     # Check if we're at the user's root (no subdirectories)
170 |                     if "/" not in after_pattern and "\\" not in after_pattern:
171 |                         logger.warning(
172 |                             f"Attempted to scan user home directory root: {path}. "
173 |                             f"Please specify a subdirectory instead."
174 |                         )
175 |                         return True
176 | 
177 |     except Exception as e:
178 |         logger.debug(f"Error checking if path is home directory: {e}")
179 | 
180 |     return False
181 | 
182 | 
183 | def detect_file_type(file_path: str) -> str:
184 |     """
185 |     Detect file type for appropriate processing strategy.
186 | 
187 |     This function is intended for specific file type handling (e.g., image processing,
188 |     binary file analysis, or enhanced file filtering).
189 | 
190 |     Args:
191 |         file_path: Path to the file to analyze
192 | 
193 |     Returns:
194 |         str: "text", "binary", "image", or "unknown" if the file cannot be accessed
195 |     """
196 |     path = Path(file_path)
197 | 
198 |     # Check extension first (fast)
199 |     extension = path.suffix.lower()
200 |     if extension in TEXT_EXTENSIONS:
201 |         return "text"
202 |     elif extension in IMAGE_EXTENSIONS:
203 |         return "image"
204 |     elif extension in BINARY_EXTENSIONS:
205 |         return "binary"
206 | 
207 |     # Fallback: check magic bytes for text vs binary
208 |     # This is helpful for files without extensions or unknown extensions
209 |     try:
210 |         with open(path, "rb") as f:
211 |             chunk = f.read(1024)
212 |             # Simple heuristic: if we can decode as UTF-8, likely text
213 |             chunk.decode("utf-8")
214 |             return "text"
215 |     except UnicodeDecodeError:
216 |         return "binary"
217 |     except (FileNotFoundError, PermissionError) as e:
218 |         logger.warning(f"Could not access file {file_path} for type detection: {e}")
219 |         return "unknown"
220 | 
221 | 
222 | def should_add_line_numbers(file_path: str, include_line_numbers: Optional[bool] = None) -> bool:
223 |     """
224 |     Determine if line numbers should be added to a file.
225 | 
226 |     Args:
227 |         file_path: Path to the file
228 |         include_line_numbers: Explicit preference, or None for auto-detection
229 | 
230 |     Returns:
231 |         bool: True if line numbers should be added
232 |     """
233 |     if include_line_numbers is not None:
234 |         return include_line_numbers
235 | 
236 |     # Default: DO NOT add line numbers
237 |     # Tools that want line numbers must explicitly request them
238 |     return False
239 | 
240 | 
241 | def _normalize_line_endings(content: str) -> str:
242 |     """
243 |     Normalize line endings for consistent line numbering.
244 | 
245 |     Args:
246 |         content: File content with potentially mixed line endings
247 | 
248 |     Returns:
249 |         str: Content with normalized LF line endings
250 |     """
251 |     # Normalize all line endings to LF for consistent counting
252 |     return content.replace("\r\n", "\n").replace("\r", "\n")
253 | 
254 | 
255 | def _add_line_numbers(content: str) -> str:
256 |     """
257 |     Add line numbers to text content for precise referencing.
258 | 
259 |     Args:
260 |         content: Text content to number
261 | 
262 |     Returns:
263 |         str: Content with line numbers in format "  45│ actual code line"
264 |         Dynamic width allocation supports files of any length
265 |     """
266 |     # Normalize line endings first
267 |     normalized_content = _normalize_line_endings(content)
268 |     lines = normalized_content.split("\n")
269 | 
270 |     # Dynamic width allocation based on total line count
271 |     # This supports files of any size by computing required width
272 |     total_lines = len(lines)
273 |     width = len(str(total_lines))
274 |     width = max(width, 4)  # Minimum padding for readability
275 | 
276 |     # Format with dynamic width and clear separator
277 |     numbered_lines = [f"{i + 1:{width}d}│ {line}" for i, line in enumerate(lines)]
278 | 
279 |     return "\n".join(numbered_lines)
280 | 
281 | 
282 | def resolve_and_validate_path(path_str: str) -> Path:
283 |     """
284 |     Resolves and validates a path against security policies.
285 | 
286 |     This function ensures safe file access by:
287 |     1. Requiring absolute paths (no ambiguity)
288 |     2. Resolving symlinks to prevent deception
289 |     3. Blocking access to dangerous system directories
290 | 
291 |     Args:
292 |         path_str: Path string (must be absolute)
293 | 
294 |     Returns:
295 |         Resolved Path object that is safe to access
296 | 
297 |     Raises:
298 |         ValueError: If path is not absolute or otherwise invalid
299 |         PermissionError: If path is in a dangerous location
300 |     """
301 |     # Step 1: Create a Path object
302 |     user_path = Path(path_str)
303 | 
304 |     # Step 2: Security Policy - Require absolute paths
305 |     # Relative paths could be interpreted differently depending on working directory
306 |     if not user_path.is_absolute():
307 |         raise ValueError(f"Relative paths are not supported. Please provide an absolute path.\nReceived: {path_str}")
308 | 
309 |     # Step 3: Resolve the absolute path (follows symlinks, removes .. and .)
310 |     # This is critical for security as it reveals the true destination of symlinks
311 |     resolved_path = user_path.resolve()
312 | 
313 |     # Step 4: Check against dangerous paths
314 |     if is_dangerous_path(resolved_path):
315 |         logger.warning(f"Access denied - dangerous path: {resolved_path}")
316 |         raise PermissionError(f"Access to system directory denied: {path_str}")
317 | 
318 |     # Step 5: Check if it's the home directory root
319 |     if is_home_directory_root(resolved_path):
320 |         raise PermissionError(
321 |             f"Cannot scan entire home directory: {path_str}\n" f"Please specify a subdirectory within your home folder."
322 |         )
323 | 
324 |     return resolved_path
325 | 
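# Illustrative outcomes of resolve_and_validate_path (paths below are examples, not real policy):
#   resolve_and_validate_path("/home/alice/project/src/app.py")  -> resolved Path, assuming it is not a dangerous location
#   resolve_and_validate_path("src/app.py")                      -> ValueError (relative paths are rejected)
#   resolve_and_validate_path("/etc")                            -> PermissionError, assuming is_dangerous_path flags it
#   resolve_and_validate_path("/home/alice")                     -> PermissionError (home directory root)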
326 | 
327 | def expand_paths(paths: list[str], extensions: Optional[set[str]] = None) -> list[str]:
328 |     """
329 |     Expand paths to individual files, handling both files and directories.
330 | 
331 |     This function recursively walks directories to find all matching files.
332 |     It automatically filters out hidden files and common non-code directories
333 |     like __pycache__ to avoid including generated or system files.
334 | 
335 |     Args:
336 |         paths: List of file or directory paths (must be absolute)
337 |         extensions: Optional set of file extensions to include (defaults to CODE_EXTENSIONS)
338 | 
339 |     Returns:
340 |         List of individual file paths, sorted for consistent ordering
341 |     """
342 |     if extensions is None:
343 |         extensions = CODE_EXTENSIONS
344 | 
345 |     expanded_files = []
346 |     seen = set()
347 | 
348 |     for path in paths:
349 |         try:
350 |             # Validate each path for security before processing
351 |             path_obj = resolve_and_validate_path(path)
352 |         except (ValueError, PermissionError):
353 |             # Skip invalid paths silently to allow partial success
354 |             continue
355 | 
356 |         if not path_obj.exists():
357 |             continue
358 | 
359 |         # Safety checks for directory scanning
360 |         if path_obj.is_dir():
361 |             # Check 1: Prevent scanning user's home directory root
362 |             if is_home_directory_root(path_obj):
363 |                 logger.warning(f"Skipping home directory root: {path}. Please specify a project subdirectory instead.")
364 |                 continue
365 | 
366 |             # Check 2: Skip if this is the MCP's own directory
367 |             if is_mcp_directory(path_obj):
368 |                 logger.info(
369 |                     f"Skipping MCP server directory: {path}. The MCP server code is excluded from project scans."
370 |                 )
371 |                 continue
372 | 
373 |         if path_obj.is_file():
374 |             # Add file directly
375 |             if str(path_obj) not in seen:
376 |                 expanded_files.append(str(path_obj))
377 |                 seen.add(str(path_obj))
378 | 
379 |         elif path_obj.is_dir():
380 |             # Walk directory recursively to find all files
381 |             for root, dirs, files in os.walk(path_obj):
382 |                 # Filter directories in-place to skip hidden and excluded directories
383 |                 # This prevents descending into .git, .venv, __pycache__, node_modules, etc.
384 |                 original_dirs = dirs[:]
385 |                 dirs[:] = []
386 |                 for d in original_dirs:
387 |                     # Skip hidden directories
388 |                     if d.startswith("."):
389 |                         continue
390 |                     # Skip excluded directories
391 |                     if d in EXCLUDED_DIRS:
392 |                         continue
393 |                     # Skip MCP directories found during traversal
394 |                     dir_path = Path(root) / d
395 |                     if is_mcp_directory(dir_path):
396 |                         logger.debug(f"Skipping MCP directory during traversal: {dir_path}")
397 |                         continue
398 |                     dirs.append(d)
399 | 
400 |                 for file in files:
401 |                     # Skip hidden files (e.g., .DS_Store, .gitignore)
402 |                     if file.startswith("."):
403 |                         continue
404 | 
405 |                     file_path = Path(root) / file
406 | 
407 |                     # Filter by extension if specified
408 |                     if not extensions or file_path.suffix.lower() in extensions:
409 |                         full_path = str(file_path)
410 |                         # Use set to prevent duplicates
411 |                         if full_path not in seen:
412 |                             expanded_files.append(full_path)
413 |                             seen.add(full_path)
414 | 
415 |     # Sort for consistent ordering across different runs
416 |     # This makes output predictable and easier to debug
417 |     expanded_files.sort()
418 |     return expanded_files
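# --- Illustrative usage (editor annotation, not part of the original module) ---
# Expanding a hypothetical project directory into individual Python source files;
# the path and layout below are assumptions made for illustration:
#
#     expand_paths(["/home/alice/projects/demo"], extensions={".py"})
#     # -> ["/home/alice/projects/demo/app.py",
#     #     "/home/alice/projects/demo/tests/test_app.py", ...]
#
# Hidden entries, EXCLUDED_DIRS members, and any MCP server checkout found during
# the walk are pruned, and the result is de-duplicated and sorted.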
419 | 
420 | 
421 | def read_file_content(
422 |     file_path: str, max_size: int = 1_000_000, *, include_line_numbers: Optional[bool] = None
423 | ) -> tuple[str, int]:
424 |     """
425 |     Read a single file and format it for inclusion in AI prompts.
426 | 
427 |     This function handles various error conditions gracefully and always
428 |     returns formatted content, even for errors. This ensures the AI model
429 |     gets context about what files were attempted but couldn't be read.
430 | 
431 |     Args:
432 |         file_path: Path to file (must be absolute)
433 |         max_size: Maximum file size to read (default 1MB to prevent memory issues)
434 |         include_line_numbers: Whether to add line numbers. If None, auto-detects based on file type
435 | 
436 |     Returns:
437 |         Tuple of (formatted_content, estimated_tokens)
438 |         Content is wrapped with clear delimiters for AI parsing
439 |     """
440 |     logger.debug(f"[FILES] read_file_content called for: {file_path}")
441 |     try:
442 |         # Validate path security before any file operations
443 |         path = resolve_and_validate_path(file_path)
444 |         logger.debug(f"[FILES] Path validated and resolved: {path}")
445 |     except (ValueError, PermissionError) as e:
446 |         # Return error in a format that provides context to the AI
447 |         logger.debug(f"[FILES] Path validation failed for {file_path}: {type(e).__name__}: {e}")
448 |         error_msg = str(e)
449 |         content = f"\n--- ERROR ACCESSING FILE: {file_path} ---\nError: {error_msg}\n--- END FILE ---\n"
450 |         tokens = estimate_tokens(content)
451 |         logger.debug(f"[FILES] Returning error content for {file_path}: {tokens} tokens")
452 |         return content, tokens
453 | 
454 |     try:
455 |         # Validate file existence and type
456 |         if not path.exists():
457 |             logger.debug(f"[FILES] File does not exist: {file_path}")
458 |             content = f"\n--- FILE NOT FOUND: {file_path} ---\nError: File does not exist\n--- END FILE ---\n"
459 |             return content, estimate_tokens(content)
460 | 
461 |         if not path.is_file():
462 |             logger.debug(f"[FILES] Path is not a file: {file_path}")
463 |             content = f"\n--- NOT A FILE: {file_path} ---\nError: Path is not a file\n--- END FILE ---\n"
464 |             return content, estimate_tokens(content)
465 | 
466 |         # Check file size to prevent memory exhaustion
467 |         stat_result = path.stat()
468 |         file_size = stat_result.st_size
469 |         logger.debug(f"[FILES] File size for {file_path}: {file_size:,} bytes")
470 |         if file_size > max_size:
471 |             logger.debug(f"[FILES] File too large: {file_path} ({file_size:,} > {max_size:,} bytes)")
472 |             modified_at = datetime.fromtimestamp(stat_result.st_mtime, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
473 |             content = (
474 |                 f"\n--- FILE TOO LARGE: {file_path} (Last modified: {modified_at}) ---\n"
475 |                 f"File size: {file_size:,} bytes (max: {max_size:,})\n"
476 |                 "--- END FILE ---\n"
477 |             )
478 |             return content, estimate_tokens(content)
479 | 
480 |         # Determine if we should add line numbers
481 |         add_line_numbers = should_add_line_numbers(file_path, include_line_numbers)
482 |         logger.debug(f"[FILES] Line numbers for {file_path}: {'enabled' if add_line_numbers else 'disabled'}")
483 | 
484 |         # Read the file with UTF-8 encoding, replacing invalid characters
485 |         # This ensures we can handle files with mixed encodings
486 |         logger.debug(f"[FILES] Reading file content for {file_path}")
487 |         with open(path, encoding="utf-8", errors="replace") as f:
488 |             file_content = f.read()
489 | 
490 |         logger.debug(f"[FILES] Successfully read {len(file_content)} characters from {file_path}")
491 | 
492 |         # Add line numbers if requested or auto-detected
493 |         if add_line_numbers:
494 |             file_content = _add_line_numbers(file_content)
495 |             logger.debug(f"[FILES] Added line numbers to {file_path}")
496 |         else:
497 |             # Still normalize line endings for consistency
498 |             file_content = _normalize_line_endings(file_content)
499 | 
500 |         # Format with clear delimiters that help the AI understand file boundaries
501 |         # Using consistent markers makes it easier for the model to parse
502 |         # NOTE: These markers ("--- BEGIN FILE: ... ---") are distinct from git diff markers
503 |         # ("--- BEGIN DIFF: ... ---") to allow AI to distinguish between complete file content
504 |         # vs. partial diff content when files appear in both sections
505 |         modified_at = datetime.fromtimestamp(stat_result.st_mtime, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
506 |         formatted = (
507 |             f"\n--- BEGIN FILE: {file_path} (Last modified: {modified_at}) ---\n"
508 |             f"{file_content}\n"
509 |             f"--- END FILE: {file_path} ---\n"
510 |         )
511 |         tokens = estimate_tokens(formatted)
512 |         logger.debug(f"[FILES] Formatted content for {file_path}: {len(formatted)} chars, {tokens} tokens")
513 |         return formatted, tokens
514 | 
515 |     except Exception as e:
516 |         logger.debug(f"[FILES] Exception reading file {file_path}: {type(e).__name__}: {e}")
517 |         content = f"\n--- ERROR READING FILE: {file_path} ---\nError: {str(e)}\n--- END FILE ---\n"
518 |         tokens = estimate_tokens(content)
519 |         logger.debug(f"[FILES] Returning error content for {file_path}: {tokens} tokens")
520 |         return content, tokens
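# --- Illustrative usage (editor annotation, not part of the original module) ---
# The function always returns a (content, tokens) tuple, even on failure, so callers
# never need to special-case errors; the file paths below are hypothetical:
#
#     content, tokens = read_file_content("/home/alice/projects/demo/app.py")
#     # content wraps the source in "--- BEGIN FILE: ... ---" / "--- END FILE: ... ---"
#     # markers and tokens is estimate_tokens(content)
#     content, tokens = read_file_content("/no/such/file.py")
#     # content is a "--- FILE NOT FOUND: ... ---" block instead of an exception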

521 | 
522 | 
523 | def read_files(
524 |     file_paths: list[str],
525 |     code: Optional[str] = None,
526 |     max_tokens: Optional[int] = None,
527 |     reserve_tokens: int = 50_000,
528 |     *,
529 |     include_line_numbers: bool = False,
530 | ) -> str:
531 |     """
532 |     Read multiple files and optional direct code with smart token management.
533 | 
534 |     This function implements intelligent token budgeting to maximize the amount
535 |     of relevant content that can be included in an AI prompt while staying
536 |     within token limits. It prioritizes direct code and reads files until
537 |     the token budget is exhausted.
538 | 
539 |     Args:
540 |         file_paths: List of file or directory paths (absolute paths required)
541 |         code: Optional direct code to include (prioritized over files)
542 |         max_tokens: Maximum tokens to use (defaults to DEFAULT_CONTEXT_WINDOW)
543 |         reserve_tokens: Tokens to reserve for prompt and response (default 50K)
544 |         include_line_numbers: Whether to add line numbers to file content
545 | 
546 |     Returns:
547 |         str: All file contents formatted for AI consumption
548 |     """
549 |     if max_tokens is None:
550 |         max_tokens = DEFAULT_CONTEXT_WINDOW
551 | 
552 |     logger.debug(f"[FILES] read_files called with {len(file_paths)} paths")
553 |     logger.debug(
554 |         f"[FILES] Token budget: max={max_tokens:,}, reserve={reserve_tokens:,}, available={max_tokens - reserve_tokens:,}"
555 |     )
556 | 
557 |     content_parts = []
558 |     total_tokens = 0
559 |     available_tokens = max_tokens - reserve_tokens
560 | 
561 |     files_skipped = []
562 | 
563 |     # Priority 1: Handle direct code if provided
564 |     # Direct code is prioritized because it's explicitly provided by the user
565 |     if code:
566 |         formatted_code = f"\n--- BEGIN DIRECT CODE ---\n{code}\n--- END DIRECT CODE ---\n"
567 |         code_tokens = estimate_tokens(formatted_code)
568 | 
569 |         if code_tokens <= available_tokens:
570 |             content_parts.append(formatted_code)
571 |             total_tokens += code_tokens
572 |             available_tokens -= code_tokens
573 | 
574 |     # Priority 2: Process file paths
575 |     if file_paths:
576 |         # Expand directories to get all individual files
577 |         logger.debug(f"[FILES] Expanding {len(file_paths)} file paths")
578 |         all_files = expand_paths(file_paths)
579 |         logger.debug(f"[FILES] After expansion: {len(all_files)} individual files")
580 | 
581 |         if not all_files and file_paths:
582 |             # No files found but paths were provided
583 |             logger.debug("[FILES] No files found from provided paths")
584 |             content_parts.append(f"\n--- NO FILES FOUND ---\nProvided paths: {', '.join(file_paths)}\n--- END ---\n")
585 |         else:
586 |             # Read files sequentially until token limit is reached
587 |             logger.debug(f"[FILES] Reading {len(all_files)} files with token budget {available_tokens:,}")
588 |             for i, file_path in enumerate(all_files):
589 |                 if total_tokens >= available_tokens:
590 |                     logger.debug(f"[FILES] Token budget exhausted, skipping remaining {len(all_files) - i} files")
591 |                     files_skipped.extend(all_files[i:])
592 |                     break
593 | 
594 |                 file_content, file_tokens = read_file_content(file_path, include_line_numbers=include_line_numbers)
595 |                 logger.debug(f"[FILES] File {file_path}: {file_tokens:,} tokens")
596 | 
597 |                 # Check if adding this file would exceed limit
598 |                 if total_tokens + file_tokens <= available_tokens:
599 |                     content_parts.append(file_content)
600 |                     total_tokens += file_tokens
601 |                     logger.debug(f"[FILES] Added file {file_path}, total tokens: {total_tokens:,}")
602 |                 else:
603 |                     # File too large for remaining budget
604 |                     logger.debug(
605 |                         f"[FILES] File {file_path} too large for remaining budget ({file_tokens:,} tokens, {available_tokens - total_tokens:,} remaining)"
606 |                     )
607 |                     files_skipped.append(file_path)
608 | 
609 |     # Add informative note about skipped files to help users understand
610 |     # what was omitted and why
611 |     if files_skipped:
612 |         logger.debug(f"[FILES] {len(files_skipped)} files skipped due to token limits")
613 |         skip_note = "\n\n--- SKIPPED FILES (TOKEN LIMIT) ---\n"
614 |         skip_note += f"Total skipped: {len(files_skipped)}\n"
615 |         # Show first 10 skipped files as examples
616 |         for file_path in files_skipped[:10]:
617 |             skip_note += f"  - {file_path}\n"
618 |         if len(files_skipped) > 10:
619 |             skip_note += f"  ... and {len(files_skipped) - 10} more\n"
620 |         skip_note += "--- END SKIPPED FILES ---\n"
621 |         content_parts.append(skip_note)
622 | 
623 |     result = "\n\n".join(content_parts) if content_parts else ""
624 |     logger.debug(f"[FILES] read_files complete: {len(result)} chars, {total_tokens:,} tokens used")
625 |     return result
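# --- Illustrative usage (editor annotation, not part of the original module) ---
# A sketch of the token budgeting: with max_tokens=200_000 and the default
# reserve_tokens=50_000, roughly 150_000 tokens remain for content. Direct `code`
# is embedded first, then files until the budget runs out; anything left over is
# listed in the "SKIPPED FILES (TOKEN LIMIT)" note. The paths are made up:
#
#     prompt_block = read_files(
#         ["/home/alice/projects/demo/src"],
#         code="print('hello')",
#         max_tokens=200_000,
#         include_line_numbers=True,
#     )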
626 | 
627 | 
628 | def estimate_file_tokens(file_path: str) -> int:
629 |     """
630 |     Estimate tokens for a file using file-type aware ratios.
631 | 
632 |     Args:
633 |         file_path: Path to the file
634 | 
635 |     Returns:
636 |         Estimated token count for the file
637 |     """
638 |     try:
639 |         if not os.path.exists(file_path) or not os.path.isfile(file_path):
640 |             return 0
641 | 
642 |         file_size = os.path.getsize(file_path)
643 | 
644 |         # Get the appropriate ratio for this file type
645 |         from .file_types import get_token_estimation_ratio
646 | 
647 |         ratio = get_token_estimation_ratio(file_path)
648 | 
649 |         return int(file_size / ratio)
650 |     except Exception:
651 |         return 0
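# --- Illustrative annotation (editor note, not part of the original module) ---
# The estimate is simply int(file_size / ratio), with the ratio supplied by
# file_types.get_token_estimation_ratio(). As a made-up example, if a .py file is
# 12_000 bytes and the ratio for Python were 4 bytes per token, the estimate would
# be int(12_000 / 4) == 3_000 tokens. Missing or unreadable files count as 0.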
652 | 
653 | 
654 | def check_files_size_limit(files: list[str], max_tokens: int, threshold_percent: float = 1.0) -> tuple[bool, int, int]:
655 |     """
656 |     Check if a list of files would exceed token limits.
657 | 
658 |     Args:
659 |         files: List of file paths to check
660 |         max_tokens: Maximum allowed tokens
661 |         threshold_percent: Fraction of max_tokens to use as the threshold (0.0-1.0)
662 | 
663 |     Returns:
664 |         Tuple of (within_limit, total_estimated_tokens, file_count)
665 |     """
666 |     if not files:
667 |         return True, 0, 0
668 | 
669 |     total_estimated_tokens = 0
670 |     file_count = 0
671 |     threshold = int(max_tokens * threshold_percent)
672 | 
673 |     for file_path in files:
674 |         try:
675 |             estimated_tokens = estimate_file_tokens(file_path)
676 |             total_estimated_tokens += estimated_tokens
677 |             if estimated_tokens > 0:  # Only count accessible files
678 |                 file_count += 1
679 |         except Exception:
680 |             # Skip files that can't be accessed for size check
681 |             continue
682 | 
683 |     within_limit = total_estimated_tokens <= threshold
684 |     return within_limit, total_estimated_tokens, file_count
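# --- Illustrative usage (editor annotation, not part of the original module) ---
# With max_tokens=100_000 and threshold_percent=0.8 the effective ceiling is
# int(100_000 * 0.8) == 80_000 estimated tokens. The file list is hypothetical:
#
#     ok, estimated, count = check_files_size_limit(
#         ["/home/alice/projects/demo/app.py"], max_tokens=100_000, threshold_percent=0.8
#     )
#     # ok is True only if the summed per-file estimates stay at or below 80_000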
685 | 
686 | 
687 | def read_json_file(file_path: str) -> Optional[dict]:
688 |     """
689 |     Read and parse a JSON file with proper error handling.
690 | 
691 |     Args:
692 |         file_path: Path to the JSON file
693 | 
694 |     Returns:
695 |         Parsed JSON data as a dict, or None if the file doesn't exist or contains invalid JSON
696 |     """
697 |     try:
698 |         if not os.path.exists(file_path):
699 |             return None
700 | 
701 |         with open(file_path, encoding="utf-8") as f:
702 |             return json.load(f)
703 |     except (json.JSONDecodeError, OSError):
704 |         return None
705 | 
706 | 
707 | def write_json_file(file_path: str, data: dict, indent: int = 2) -> bool:
708 |     """
709 |     Write data to a JSON file with proper formatting.
710 | 
711 |     Args:
712 |         file_path: Path to write the JSON file
713 |         data: Dictionary data to serialize
714 |         indent: JSON indentation level
715 | 
716 |     Returns:
717 |         True if successful, False otherwise
718 |     """
719 |     try:
720 |         os.makedirs(os.path.dirname(file_path) or ".", exist_ok=True)  # "." handles bare filenames with no parent dir
721 | 
722 |         with open(file_path, "w", encoding="utf-8") as f:
723 |             json.dump(data, f, indent=indent, ensure_ascii=False)
724 |         return True
725 |     except (OSError, TypeError):
726 |         return False
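# --- Illustrative usage (editor annotation, not part of the original module) ---
# A simple round trip with the two JSON helpers above; the path is hypothetical and
# both calls fail soft (None / False) rather than raising:
#
#     write_json_file("/tmp/zen_demo/settings.json", {"locale": "en"})  # -> True
#     read_json_file("/tmp/zen_demo/settings.json")                     # -> {"locale": "en"}
#     read_json_file("/tmp/zen_demo/missing.json")                      # -> None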
727 | 
728 | 
729 | def get_file_size(file_path: str) -> int:
730 |     """
731 |     Get file size in bytes with proper error handling.
732 | 
733 |     Args:
734 |         file_path: Path to the file
735 | 
736 |     Returns:
737 |         File size in bytes, or 0 if the file doesn't exist or an error occurs
738 |     """
739 |     try:
740 |         if os.path.exists(file_path) and os.path.isfile(file_path):
741 |             return os.path.getsize(file_path)
742 |         return 0
743 |     except OSError:
744 |         return 0
745 | 
746 | 
747 | def ensure_directory_exists(file_path: str) -> bool:
748 |     """
749 |     Ensure the parent directory of a file path exists.
750 | 
751 |     Args:
752 |         file_path: Path to file (directory will be created for parent)
753 | 
754 |     Returns:
755 |         True if directory exists or was created, False on error
756 |     """
757 |     try:
758 |         directory = os.path.dirname(file_path)
759 |         if directory:
760 |             os.makedirs(directory, exist_ok=True)
761 |         return True
762 |     except OSError:
763 |         return False
764 | 
765 | 
766 | def is_text_file(file_path: str) -> bool:
767 |     """
768 |     Check if a file is likely a text file based on extension and content.
769 | 
770 |     Args:
771 |         file_path: Path to the file
772 | 
773 |     Returns:
774 |         True if file appears to be text, False otherwise
775 |     """
776 |     from .file_types import is_text_file as check_text_type
777 | 
778 |     return check_text_type(file_path)
779 | 
780 | 
781 | def read_file_safely(file_path: str, max_size: int = 10 * 1024 * 1024) -> Optional[str]:
782 |     """
783 |     Read a file with size limits and encoding handling.
784 | 
785 |     Args:
786 |         file_path: Path to the file
787 |         max_size: Maximum file size in bytes (default 10MB)
788 | 
789 |     Returns:
790 |         File content as a string, or None if the file is too large or unreadable
791 |     """
792 |     try:
793 |         if not os.path.exists(file_path) or not os.path.isfile(file_path):
794 |             return None
795 | 
796 |         file_size = os.path.getsize(file_path)
797 |         if file_size > max_size:
798 |             return None
799 | 
800 |         with open(file_path, encoding="utf-8", errors="ignore") as f:
801 |             return f.read()
802 |     except OSError:
803 |         return None
804 | 
805 | 
806 | def check_total_file_size(files: list[str], model_name: str) -> Optional[dict]:
807 |     """
808 |     Check if total file sizes would exceed token threshold before embedding.
809 | 
810 |     IMPORTANT: This performs STRICT REJECTION at the MCP boundary.
811 |     No partial inclusion - either all files fit or the request is rejected.
812 |     This forces the CLI to make better file selection decisions.
813 | 
814 |     This function MUST be called with the effective model name (after resolution).
815 |     It should never receive 'auto' or None - model resolution happens earlier.
816 | 
817 |     Args:
818 |         files: List of file paths to check
819 |         model_name: The resolved model name for context-aware thresholds (required)
820 | 
821 |     Returns:
822 |         Dict with a `code_too_large` response if the files are too large, or None if they fit
823 |     """
824 |     if not files:
825 |         return None
826 | 
827 |     # Validate we have a proper model name (not auto or None)
828 |     if not model_name or model_name.lower() == "auto":
829 |         raise ValueError(
830 |             f"check_total_file_size called with unresolved model: '{model_name}'. "
831 |             "Model must be resolved before file size checking."
832 |         )
833 | 
834 |     logger.info(f"File size check: Using model '{model_name}' for token limit calculation")
835 | 
836 |     from utils.model_context import ModelContext
837 | 
838 |     model_context = ModelContext(model_name)
839 |     token_allocation = model_context.calculate_token_allocation()
840 | 
841 |     # Dynamic threshold based on model capacity
842 |     context_window = token_allocation.total_tokens
843 |     if context_window >= 1_000_000:  # Gemini-class models
844 |         threshold_percent = 0.8  # Can be more generous
845 |     elif context_window >= 500_000:  # Mid-range models
846 |         threshold_percent = 0.7  # Moderate
847 |     else:  # OpenAI-class models (200K)
848 |         threshold_percent = 0.6  # Conservative
849 | 
850 |     max_file_tokens = int(token_allocation.file_tokens * threshold_percent)
851 | 
852 |     # Use centralized file size checking (threshold already applied to max_file_tokens)
853 |     within_limit, total_estimated_tokens, file_count = check_files_size_limit(files, max_file_tokens)
854 | 
855 |     if not within_limit:
856 |         return {
857 |             "status": "code_too_large",
858 |             "content": (
859 |                 f"The selected files are too large for analysis "
860 |                 f"(estimated {total_estimated_tokens:,} tokens, limit {max_file_tokens:,}). "
861 |                 f"Please select fewer, more specific files that are most relevant "
862 |                 f"to your question, then invoke the tool again."
863 |             ),
864 |             "content_type": "text",
865 |             "metadata": {
866 |                 "total_estimated_tokens": total_estimated_tokens,
867 |                 "limit": max_file_tokens,
868 |                 "file_count": file_count,
869 |                 "threshold_percent": threshold_percent,
870 |                 "model_context_window": context_window,
871 |                 "model_name": model_name,
872 |                 "instructions": "Reduce file selection and try again - all files must fit within budget. If this persists, please use a model with a larger context window where available.",
873 |             },
874 |         }
875 | 
876 |     return None  # Proceed with ALL files
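# --- Illustrative annotation (editor note, not part of the original module) ---
# Worked example of the dynamic threshold, using made-up numbers: a model with a
# 200_000-token context window falls in the conservative branch (0.6), so if the
# model's file_tokens allocation were, say, 80_000, the cut-off would be
# int(80_000 * 0.6) == 48_000 estimated tokens. Passing model_name="auto" raises
# ValueError because resolution must happen before this check. The model name and
# file path below are placeholders:
#
#     rejection = check_total_file_size(["/home/alice/projects/demo/app.py"], "some-resolved-model")
#     # -> None if the files fit, otherwise a {"status": "code_too_large", ...} dict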
877 | 
```