This is page 17 of 25. Use http://codebase.md/beehiveinnovations/gemini-mcp-server?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── commands
│ │ └── fix-github-issue.md
│ └── settings.json
├── .coveragerc
├── .dockerignore
├── .env.example
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ ├── documentation.yml
│ │ ├── feature_request.yml
│ │ └── tool_addition.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── docker-pr.yml
│ ├── docker-release.yml
│ ├── semantic-pr.yml
│ ├── semantic-release.yml
│ └── test.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CHANGELOG.md
├── claude_config_example.json
├── CLAUDE.md
├── clink
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── claude.py
│ │ ├── codex.py
│ │ └── gemini.py
│ ├── constants.py
│ ├── models.py
│ ├── parsers
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── claude.py
│ │ ├── codex.py
│ │ └── gemini.py
│ └── registry.py
├── code_quality_checks.ps1
├── code_quality_checks.sh
├── communication_simulator_test.py
├── conf
│ ├── __init__.py
│ ├── azure_models.json
│ ├── cli_clients
│ │ ├── claude.json
│ │ ├── codex.json
│ │ └── gemini.json
│ ├── custom_models.json
│ ├── dial_models.json
│ ├── gemini_models.json
│ ├── openai_models.json
│ ├── openrouter_models.json
│ └── xai_models.json
├── config.py
├── docker
│ ├── README.md
│ └── scripts
│ ├── build.ps1
│ ├── build.sh
│ ├── deploy.ps1
│ ├── deploy.sh
│ └── healthcheck.py
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── adding_providers.md
│ ├── adding_tools.md
│ ├── advanced-usage.md
│ ├── ai_banter.md
│ ├── ai-collaboration.md
│ ├── azure_openai.md
│ ├── configuration.md
│ ├── context-revival.md
│ ├── contributions.md
│ ├── custom_models.md
│ ├── docker-deployment.md
│ ├── gemini-setup.md
│ ├── getting-started.md
│ ├── index.md
│ ├── locale-configuration.md
│ ├── logging.md
│ ├── model_ranking.md
│ ├── testing.md
│ ├── tools
│ │ ├── analyze.md
│ │ ├── apilookup.md
│ │ ├── challenge.md
│ │ ├── chat.md
│ │ ├── clink.md
│ │ ├── codereview.md
│ │ ├── consensus.md
│ │ ├── debug.md
│ │ ├── docgen.md
│ │ ├── listmodels.md
│ │ ├── planner.md
│ │ ├── precommit.md
│ │ ├── refactor.md
│ │ ├── secaudit.md
│ │ ├── testgen.md
│ │ ├── thinkdeep.md
│ │ ├── tracer.md
│ │ └── version.md
│ ├── troubleshooting.md
│ ├── vcr-testing.md
│ └── wsl-setup.md
├── examples
│ ├── claude_config_macos.json
│ └── claude_config_wsl.json
├── LICENSE
├── providers
│ ├── __init__.py
│ ├── azure_openai.py
│ ├── base.py
│ ├── custom.py
│ ├── dial.py
│ ├── gemini.py
│ ├── openai_compatible.py
│ ├── openai.py
│ ├── openrouter.py
│ ├── registries
│ │ ├── __init__.py
│ │ ├── azure.py
│ │ ├── base.py
│ │ ├── custom.py
│ │ ├── dial.py
│ │ ├── gemini.py
│ │ ├── openai.py
│ │ ├── openrouter.py
│ │ └── xai.py
│ ├── registry_provider_mixin.py
│ ├── registry.py
│ ├── shared
│ │ ├── __init__.py
│ │ ├── model_capabilities.py
│ │ ├── model_response.py
│ │ ├── provider_type.py
│ │ └── temperature.py
│ └── xai.py
├── pyproject.toml
├── pytest.ini
├── README.md
├── requirements-dev.txt
├── requirements.txt
├── run_integration_tests.ps1
├── run_integration_tests.sh
├── run-server.ps1
├── run-server.sh
├── scripts
│ └── sync_version.py
├── server.py
├── simulator_tests
│ ├── __init__.py
│ ├── base_test.py
│ ├── conversation_base_test.py
│ ├── log_utils.py
│ ├── test_analyze_validation.py
│ ├── test_basic_conversation.py
│ ├── test_chat_simple_validation.py
│ ├── test_codereview_validation.py
│ ├── test_consensus_conversation.py
│ ├── test_consensus_three_models.py
│ ├── test_consensus_workflow_accurate.py
│ ├── test_content_validation.py
│ ├── test_conversation_chain_validation.py
│ ├── test_cross_tool_comprehensive.py
│ ├── test_cross_tool_continuation.py
│ ├── test_debug_certain_confidence.py
│ ├── test_debug_validation.py
│ ├── test_line_number_validation.py
│ ├── test_logs_validation.py
│ ├── test_model_thinking_config.py
│ ├── test_o3_model_selection.py
│ ├── test_o3_pro_expensive.py
│ ├── test_ollama_custom_url.py
│ ├── test_openrouter_fallback.py
│ ├── test_openrouter_models.py
│ ├── test_per_tool_deduplication.py
│ ├── test_planner_continuation_history.py
│ ├── test_planner_validation_old.py
│ ├── test_planner_validation.py
│ ├── test_precommitworkflow_validation.py
│ ├── test_prompt_size_limit_bug.py
│ ├── test_refactor_validation.py
│ ├── test_secaudit_validation.py
│ ├── test_testgen_validation.py
│ ├── test_thinkdeep_validation.py
│ ├── test_token_allocation_validation.py
│ ├── test_vision_capability.py
│ └── test_xai_models.py
├── systemprompts
│ ├── __init__.py
│ ├── analyze_prompt.py
│ ├── chat_prompt.py
│ ├── clink
│ │ ├── codex_codereviewer.txt
│ │ ├── default_codereviewer.txt
│ │ ├── default_planner.txt
│ │ └── default.txt
│ ├── codereview_prompt.py
│ ├── consensus_prompt.py
│ ├── debug_prompt.py
│ ├── docgen_prompt.py
│ ├── generate_code_prompt.py
│ ├── planner_prompt.py
│ ├── precommit_prompt.py
│ ├── refactor_prompt.py
│ ├── secaudit_prompt.py
│ ├── testgen_prompt.py
│ ├── thinkdeep_prompt.py
│ └── tracer_prompt.py
├── tests
│ ├── __init__.py
│ ├── CASSETTE_MAINTENANCE.md
│ ├── conftest.py
│ ├── gemini_cassettes
│ │ ├── chat_codegen
│ │ │ └── gemini25_pro_calculator
│ │ │ └── mldev.json
│ │ ├── chat_cross
│ │ │ └── step1_gemini25_flash_number
│ │ │ └── mldev.json
│ │ └── consensus
│ │ └── step2_gemini25_flash_against
│ │ └── mldev.json
│ ├── http_transport_recorder.py
│ ├── mock_helpers.py
│ ├── openai_cassettes
│ │ ├── chat_cross_step2_gpt5_reminder.json
│ │ ├── chat_gpt5_continuation.json
│ │ ├── chat_gpt5_moon_distance.json
│ │ ├── consensus_step1_gpt5_for.json
│ │ └── o3_pro_basic_math.json
│ ├── pii_sanitizer.py
│ ├── sanitize_cassettes.py
│ ├── test_alias_target_restrictions.py
│ ├── test_auto_mode_comprehensive.py
│ ├── test_auto_mode_custom_provider_only.py
│ ├── test_auto_mode_model_listing.py
│ ├── test_auto_mode_provider_selection.py
│ ├── test_auto_mode.py
│ ├── test_auto_model_planner_fix.py
│ ├── test_azure_openai_provider.py
│ ├── test_buggy_behavior_prevention.py
│ ├── test_cassette_semantic_matching.py
│ ├── test_challenge.py
│ ├── test_chat_codegen_integration.py
│ ├── test_chat_cross_model_continuation.py
│ ├── test_chat_openai_integration.py
│ ├── test_chat_simple.py
│ ├── test_clink_claude_agent.py
│ ├── test_clink_claude_parser.py
│ ├── test_clink_codex_agent.py
│ ├── test_clink_gemini_agent.py
│ ├── test_clink_gemini_parser.py
│ ├── test_clink_integration.py
│ ├── test_clink_parsers.py
│ ├── test_clink_tool.py
│ ├── test_collaboration.py
│ ├── test_config.py
│ ├── test_consensus_integration.py
│ ├── test_consensus_schema.py
│ ├── test_consensus.py
│ ├── test_conversation_continuation_integration.py
│ ├── test_conversation_field_mapping.py
│ ├── test_conversation_file_features.py
│ ├── test_conversation_memory.py
│ ├── test_conversation_missing_files.py
│ ├── test_custom_openai_temperature_fix.py
│ ├── test_custom_provider.py
│ ├── test_debug.py
│ ├── test_deploy_scripts.py
│ ├── test_dial_provider.py
│ ├── test_directory_expansion_tracking.py
│ ├── test_disabled_tools.py
│ ├── test_docker_claude_desktop_integration.py
│ ├── test_docker_config_complete.py
│ ├── test_docker_healthcheck.py
│ ├── test_docker_implementation.py
│ ├── test_docker_mcp_validation.py
│ ├── test_docker_security.py
│ ├── test_docker_volume_persistence.py
│ ├── test_file_protection.py
│ ├── test_gemini_token_usage.py
│ ├── test_image_support_integration.py
│ ├── test_image_validation.py
│ ├── test_integration_utf8.py
│ ├── test_intelligent_fallback.py
│ ├── test_issue_245_simple.py
│ ├── test_large_prompt_handling.py
│ ├── test_line_numbers_integration.py
│ ├── test_listmodels_restrictions.py
│ ├── test_listmodels.py
│ ├── test_mcp_error_handling.py
│ ├── test_model_enumeration.py
│ ├── test_model_metadata_continuation.py
│ ├── test_model_resolution_bug.py
│ ├── test_model_restrictions.py
│ ├── test_o3_pro_output_text_fix.py
│ ├── test_o3_temperature_fix_simple.py
│ ├── test_openai_compatible_token_usage.py
│ ├── test_openai_provider.py
│ ├── test_openrouter_provider.py
│ ├── test_openrouter_registry.py
│ ├── test_parse_model_option.py
│ ├── test_per_tool_model_defaults.py
│ ├── test_pii_sanitizer.py
│ ├── test_pip_detection_fix.py
│ ├── test_planner.py
│ ├── test_precommit_workflow.py
│ ├── test_prompt_regression.py
│ ├── test_prompt_size_limit_bug_fix.py
│ ├── test_provider_retry_logic.py
│ ├── test_provider_routing_bugs.py
│ ├── test_provider_utf8.py
│ ├── test_providers.py
│ ├── test_rate_limit_patterns.py
│ ├── test_refactor.py
│ ├── test_secaudit.py
│ ├── test_server.py
│ ├── test_supported_models_aliases.py
│ ├── test_thinking_modes.py
│ ├── test_tools.py
│ ├── test_tracer.py
│ ├── test_utf8_localization.py
│ ├── test_utils.py
│ ├── test_uvx_resource_packaging.py
│ ├── test_uvx_support.py
│ ├── test_workflow_file_embedding.py
│ ├── test_workflow_metadata.py
│ ├── test_workflow_prompt_size_validation_simple.py
│ ├── test_workflow_utf8.py
│ ├── test_xai_provider.py
│ ├── transport_helpers.py
│ └── triangle.png
├── tools
│ ├── __init__.py
│ ├── analyze.py
│ ├── apilookup.py
│ ├── challenge.py
│ ├── chat.py
│ ├── clink.py
│ ├── codereview.py
│ ├── consensus.py
│ ├── debug.py
│ ├── docgen.py
│ ├── listmodels.py
│ ├── models.py
│ ├── planner.py
│ ├── precommit.py
│ ├── refactor.py
│ ├── secaudit.py
│ ├── shared
│ │ ├── __init__.py
│ │ ├── base_models.py
│ │ ├── base_tool.py
│ │ ├── exceptions.py
│ │ └── schema_builders.py
│ ├── simple
│ │ ├── __init__.py
│ │ └── base.py
│ ├── testgen.py
│ ├── thinkdeep.py
│ ├── tracer.py
│ ├── version.py
│ └── workflow
│ ├── __init__.py
│ ├── base.py
│ ├── schema_builders.py
│ └── workflow_mixin.py
├── utils
│ ├── __init__.py
│ ├── client_info.py
│ ├── conversation_memory.py
│ ├── env.py
│ ├── file_types.py
│ ├── file_utils.py
│ ├── image_utils.py
│ ├── model_context.py
│ ├── model_restrictions.py
│ ├── security_config.py
│ ├── storage_backend.py
│ └── token_utils.py
└── zen-mcp-server
```
# Files
--------------------------------------------------------------------------------
/tools/docgen.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Documentation Generation tool - Automated code documentation with complexity analysis
3 |
4 | This tool provides a structured workflow for adding comprehensive documentation to codebases.
5 | It guides you through systematic code analysis to generate modern documentation with:
6 | - Function/method parameter documentation
7 | - Big O complexity analysis
8 | - Call flow and dependency documentation
9 | - Inline comments for complex logic
10 | - Smart updating of existing documentation
11 |
12 | Key features:
13 | - Step-by-step documentation workflow with progress tracking
14 | - Context-aware file embedding (references during analysis, full content for documentation)
15 | - Automatic conversation threading and history preservation
16 | - Expert analysis integration with external models
17 | - Support for multiple programming languages and documentation styles
18 | - Configurable documentation features via parameters
19 | """
20 |
21 | import logging
22 | from typing import TYPE_CHECKING, Any, Optional
23 |
24 | from pydantic import Field
25 |
26 | if TYPE_CHECKING:
27 | from tools.models import ToolModelCategory
28 |
29 | from config import TEMPERATURE_ANALYTICAL
30 | from systemprompts import DOCGEN_PROMPT
31 | from tools.shared.base_models import WorkflowRequest
32 |
33 | from .workflow.base import WorkflowTool
34 |
35 | logger = logging.getLogger(__name__)
36 |
37 | # Tool-specific field descriptions for documentation generation
38 | DOCGEN_FIELD_DESCRIPTIONS = {
39 | "step": (
40 | "Step 1 (Discovery): list every file that needs documentation and record the total. Do not write docs yet. "
41 | "Steps 2+: document exactly one file per step. Never change code logic; log bugs separately. Keep the counters accurate."
42 | ),
43 | "step_number": "Current documentation step (starts at 1).",
44 | "total_steps": "1 discovery step + one step per file documented (tracks via `total_files_to_document`).",
45 | "next_step_required": "True while more files still need documentation; False once everything is complete.",
46 | "findings": "Summarize documentation gaps, complexity, call flows, and well-documented areas. Stop and report immediately if you uncover a bug.",
47 | "relevant_files": "Absolute paths for the file(s) you are documenting this step—stick to a single file per step.",
48 | "relevant_context": "Functions or methods needing documentation (e.g. 'Class.method', 'function_name'), especially complex or user-facing areas.",
49 | "num_files_documented": "Count of files finished so far. Increment only when a file is fully documented.",
50 | "total_files_to_document": "Total files identified in discovery; completion requires matching this count.",
51 | "document_complexity": "Include algorithmic complexity (Big O) analysis when True (default).",
52 | "document_flow": "Include call flow/dependency notes when True (default).",
53 | "update_existing": "True (default) to polish inaccurate or outdated docs instead of leaving them untouched.",
54 | "comments_on_complex_logic": "True (default) to add inline comments around non-obvious logic.",
55 | }
56 |
57 |
58 | class DocgenRequest(WorkflowRequest):
59 | """Request model for documentation generation steps"""
60 |
61 | # Required workflow fields
62 | step: str = Field(..., description=DOCGEN_FIELD_DESCRIPTIONS["step"])
63 | step_number: int = Field(..., description=DOCGEN_FIELD_DESCRIPTIONS["step_number"])
64 | total_steps: int = Field(..., description=DOCGEN_FIELD_DESCRIPTIONS["total_steps"])
65 | next_step_required: bool = Field(..., description=DOCGEN_FIELD_DESCRIPTIONS["next_step_required"])
66 |
67 | # Documentation analysis tracking fields
68 | findings: str = Field(..., description=DOCGEN_FIELD_DESCRIPTIONS["findings"])
69 | relevant_files: list[str] = Field(default_factory=list, description=DOCGEN_FIELD_DESCRIPTIONS["relevant_files"])
70 | relevant_context: list[str] = Field(default_factory=list, description=DOCGEN_FIELD_DESCRIPTIONS["relevant_context"])
71 |
72 | # Critical completion tracking counters
73 | num_files_documented: int = Field(0, description=DOCGEN_FIELD_DESCRIPTIONS["num_files_documented"])
74 | total_files_to_document: int = Field(0, description=DOCGEN_FIELD_DESCRIPTIONS["total_files_to_document"])
75 |
76 | # Documentation generation configuration parameters
77 | document_complexity: Optional[bool] = Field(True, description=DOCGEN_FIELD_DESCRIPTIONS["document_complexity"])
78 | document_flow: Optional[bool] = Field(True, description=DOCGEN_FIELD_DESCRIPTIONS["document_flow"])
79 | update_existing: Optional[bool] = Field(True, description=DOCGEN_FIELD_DESCRIPTIONS["update_existing"])
80 | comments_on_complex_logic: Optional[bool] = Field(
81 | True, description=DOCGEN_FIELD_DESCRIPTIONS["comments_on_complex_logic"]
82 | )
83 |
84 |
85 | class DocgenTool(WorkflowTool):
86 | """
87 | Documentation generation tool for automated code documentation with complexity analysis.
88 |
89 | This tool implements a structured documentation workflow that guides users through
90 | methodical code analysis to generate comprehensive documentation including:
91 | - Function/method signatures and parameter descriptions
92 | - Algorithmic complexity (Big O) analysis
93 | - Call flow and dependency documentation
94 | - Inline comments for complex logic
95 | - Modern documentation style appropriate for the language/platform
96 | """
97 |
98 | def __init__(self):
99 | super().__init__()
100 | self.initial_request = None
101 |
102 | def get_name(self) -> str:
103 | return "docgen"
104 |
105 | def get_description(self) -> str:
106 | return (
107 | "Generates comprehensive code documentation with systematic analysis of functions, classes, and complexity. "
108 | "Use for documentation generation, code analysis, complexity assessment, and API documentation. "
109 | "Analyzes code structure and patterns to create thorough documentation."
110 | )
111 |
112 | def get_system_prompt(self) -> str:
113 | return DOCGEN_PROMPT
114 |
115 | def get_default_temperature(self) -> float:
116 | return TEMPERATURE_ANALYTICAL
117 |
118 | def get_model_category(self) -> "ToolModelCategory":
119 | """Docgen requires analytical and reasoning capabilities"""
120 | from tools.models import ToolModelCategory
121 |
122 | return ToolModelCategory.EXTENDED_REASONING
123 |
124 | def requires_model(self) -> bool:
125 | """
126 | Docgen tool doesn't require model resolution at the MCP boundary.
127 |
128 | The docgen tool is a self-contained workflow tool that guides the CLI agent through
129 | systematic documentation generation without calling external AI models.
130 |
131 | Returns:
132 | bool: False - docgen doesn't need external AI model access
133 | """
134 | return False
135 |
136 | def requires_expert_analysis(self) -> bool:
137 | """Docgen is self-contained and doesn't need expert analysis."""
138 | return False
139 |
140 | def get_workflow_request_model(self):
141 | """Return the docgen-specific request model."""
142 | return DocgenRequest
143 |
144 | def get_tool_fields(self) -> dict[str, dict[str, Any]]:
145 | """Return the tool-specific fields for docgen."""
146 | return {
147 | "document_complexity": {
148 | "type": "boolean",
149 | "default": True,
150 | "description": DOCGEN_FIELD_DESCRIPTIONS["document_complexity"],
151 | },
152 | "document_flow": {
153 | "type": "boolean",
154 | "default": True,
155 | "description": DOCGEN_FIELD_DESCRIPTIONS["document_flow"],
156 | },
157 | "update_existing": {
158 | "type": "boolean",
159 | "default": True,
160 | "description": DOCGEN_FIELD_DESCRIPTIONS["update_existing"],
161 | },
162 | "comments_on_complex_logic": {
163 | "type": "boolean",
164 | "default": True,
165 | "description": DOCGEN_FIELD_DESCRIPTIONS["comments_on_complex_logic"],
166 | },
167 | "num_files_documented": {
168 | "type": "integer",
169 | "default": 0,
170 | "minimum": 0,
171 | "description": DOCGEN_FIELD_DESCRIPTIONS["num_files_documented"],
172 | },
173 | "total_files_to_document": {
174 | "type": "integer",
175 | "default": 0,
176 | "minimum": 0,
177 | "description": DOCGEN_FIELD_DESCRIPTIONS["total_files_to_document"],
178 | },
179 | }
180 |
181 | def get_required_fields(self) -> list[str]:
182 | """Return additional required fields beyond the standard workflow requirements."""
183 | return [
184 | "document_complexity",
185 | "document_flow",
186 | "update_existing",
187 | "comments_on_complex_logic",
188 | "num_files_documented",
189 | "total_files_to_document",
190 | ]
191 |
192 | def get_input_schema(self) -> dict[str, Any]:
193 | """Generate input schema using WorkflowSchemaBuilder with field exclusions."""
194 | from .workflow.schema_builders import WorkflowSchemaBuilder
195 |
196 | # Exclude workflow fields that documentation generation doesn't need
197 | excluded_workflow_fields = [
198 | "confidence", # Documentation doesn't use confidence levels
199 | "hypothesis", # Documentation doesn't use hypothesis
200 | "files_checked", # Documentation uses doc_files and doc_methods instead for better tracking
201 | ]
202 |
203 | # Exclude common fields that documentation generation doesn't need
204 | excluded_common_fields = [
205 | "model", # Documentation doesn't need external model selection
206 | "temperature", # Documentation doesn't need temperature control
207 | "thinking_mode", # Documentation doesn't need thinking mode
208 | "images", # Documentation doesn't use images
209 | ]
210 |
211 | return WorkflowSchemaBuilder.build_schema(
212 | tool_specific_fields=self.get_tool_fields(),
213 | required_fields=self.get_required_fields(), # Include docgen-specific required fields
214 | model_field_schema=None, # Exclude model field - docgen doesn't need external model selection
215 | auto_mode=False, # Force non-auto mode to prevent model field addition
216 | tool_name=self.get_name(),
217 | excluded_workflow_fields=excluded_workflow_fields,
218 | excluded_common_fields=excluded_common_fields,
219 | )
220 |
221 | def get_required_actions(
222 | self, step_number: int, confidence: str, findings: str, total_steps: int, request=None
223 | ) -> list[str]:
224 | """Define required actions for comprehensive documentation analysis with step-by-step file focus."""
225 | if step_number == 1:
226 | # Initial discovery ONLY - no documentation yet
227 | return [
228 | "CRITICAL: DO NOT ALTER ANY CODE LOGIC! Only add documentation (docstrings, comments)",
229 | "Discover ALL files in the current directory (not nested) that need documentation",
230 | "COUNT the exact number of files that need documentation",
231 | "LIST all the files you found that need documentation by name",
232 | "IDENTIFY the programming language(s) to use MODERN documentation style (/// for Objective-C, /** */ for Java/JavaScript, etc.)",
233 | "DO NOT start documenting any files yet - this is discovery phase only",
234 | "Report the total count and file list clearly to the user",
235 | "IMMEDIATELY call docgen step 2 after discovery to begin documentation phase",
236 | "WHEN CALLING DOCGEN step 2: Set total_files_to_document to the exact count you found",
237 | "WHEN CALLING DOCGEN step 2: Set num_files_documented to 0 (haven't started yet)",
238 | ]
239 | elif step_number == 2:
240 | # Start documentation phase with first file
241 | return [
242 | "CRITICAL: DO NOT ALTER ANY CODE LOGIC! Only add documentation (docstrings, comments)",
243 | "Choose the FIRST file from your discovered list to start documentation",
244 | "For the chosen file: identify ALL functions, classes, and methods within it",
245 | 'USE MODERN documentation style for the programming language (/// for Objective-C, /** */ for Java/JavaScript, """ for Python, etc.)',
246 | "Document ALL functions/methods in the chosen file - don't skip any - DOCUMENTATION ONLY",
247 | "When file is 100% documented, increment num_files_documented from 0 to 1",
248 | "Note any dependencies this file has (what it imports/calls) and what calls into it",
249 | "CRITICAL: If you find ANY bugs/logic errors, STOP documenting and report to user immediately",
250 | "Report which specific functions you documented in this step for accountability",
251 | "Report progress: num_files_documented (1) out of total_files_to_document",
252 | ]
253 | elif step_number <= 4:
254 | # Continue with focused file-by-file approach
255 | return [
256 | "CRITICAL: DO NOT ALTER ANY CODE LOGIC! Only add documentation (docstrings, comments)",
257 | "Choose the NEXT undocumented file from your discovered list",
258 | "For the chosen file: identify ALL functions, classes, and methods within it",
259 | "USE MODERN documentation style for the programming language (NEVER use legacy /* */ style for languages with modern alternatives)",
260 | "Document ALL functions/methods in the chosen file - don't skip any - DOCUMENTATION ONLY",
261 | "When file is 100% documented, increment num_files_documented by 1",
262 | "Verify that EVERY function in the current file has proper documentation (no skipping)",
263 | "CRITICAL: If you find ANY bugs/logic errors, STOP documenting and report to user immediately",
264 | "Report specific function names you documented for verification",
265 | "Report progress: current num_files_documented out of total_files_to_document",
266 | ]
267 | else:
268 | # Continue systematic file-by-file coverage
269 | return [
270 | "CRITICAL: DO NOT ALTER ANY CODE LOGIC! Only add documentation (docstrings, comments)",
271 | "Check counters: num_files_documented vs total_files_to_document",
272 | "If num_files_documented < total_files_to_document: choose NEXT undocumented file",
273 | "USE MODERN documentation style appropriate for each programming language (NEVER legacy styles)",
274 | "Document every function, method, and class in current file with no exceptions",
275 | "When file is 100% documented, increment num_files_documented by 1",
276 | "CRITICAL: If you find ANY bugs/logic errors, STOP documenting and report to user immediately",
277 | "Report progress: current num_files_documented out of total_files_to_document",
278 | "If num_files_documented < total_files_to_document: RESTART docgen with next step",
279 | "ONLY set next_step_required=false when num_files_documented equals total_files_to_document",
280 | "For nested dependencies: check if functions call into subdirectories and document those too",
281 | "CRITICAL: If ANY bugs/logic errors were found, STOP and ask user before proceeding",
282 | ]
283 |
284 | def should_call_expert_analysis(self, consolidated_findings, request=None) -> bool:
285 | """Docgen is self-contained and doesn't need expert analysis."""
286 | return False
287 |
288 | def prepare_expert_analysis_context(self, consolidated_findings) -> str:
289 | """Docgen doesn't use expert analysis."""
290 | return ""
291 |
292 | def get_step_guidance(self, step_number: int, confidence: str, request) -> dict[str, Any]:
293 | """
294 | Provide step-specific guidance for documentation generation workflow.
295 |
296 | This method generates docgen-specific guidance used by get_step_guidance_message().
297 | """
298 | # Generate the next steps instruction based on required actions
299 | # Calculate dynamic total_steps based on files to document
300 | total_files_to_document = self.get_request_total_files_to_document(request)
301 | calculated_total_steps = 1 + total_files_to_document if total_files_to_document > 0 else request.total_steps
302 |
303 | required_actions = self.get_required_actions(step_number, confidence, request.findings, calculated_total_steps)
304 |
305 | if step_number == 1:
306 | next_steps = (
307 | f"DISCOVERY PHASE ONLY - DO NOT START DOCUMENTING YET!\n"
308 | f"MANDATORY: DO NOT call the {self.get_name()} tool again immediately. You MUST first perform "
309 | f"FILE DISCOVERY step by step. DO NOT DOCUMENT ANYTHING YET. "
310 | f"MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\n"
311 | + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
312 | + f"\n\nCRITICAL: When you call {self.get_name()} step 2, set total_files_to_document to the exact count "
313 | f"of files needing documentation and set num_files_documented to 0 (haven't started documenting yet). "
314 | f"Your total_steps will be automatically calculated as 1 (discovery) + number of files to document. "
315 | f"Step 2 will BEGIN the documentation phase. Report the count clearly and then IMMEDIATELY "
316 | f"proceed to call {self.get_name()} step 2 to start documenting the first file."
317 | )
318 | elif step_number == 2:
319 | next_steps = (
320 | f"DOCUMENTATION PHASE BEGINS! ABSOLUTE RULE: DO NOT ALTER ANY CODE LOGIC! DOCUMENTATION ONLY!\n"
321 | f"START FILE-BY-FILE APPROACH! Focus on ONE file until 100% complete. "
322 | f"MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\n"
323 | + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
324 | + f"\n\nREPORT your progress: which specific functions did you document? Update num_files_documented from 0 to 1 when first file complete. "
325 | f"REPORT counters: current num_files_documented out of total_files_to_document. "
326 | f"CRITICAL: If you found ANY bugs/logic errors, STOP documenting and ask user what to do before continuing. "
327 | f"Do NOT move to a new file until the current one is completely documented. "
328 | f"When ready for step {step_number + 1}, report completed work with updated counters."
329 | )
330 | elif step_number <= 4:
331 | next_steps = (
332 | f"ABSOLUTE RULE: DO NOT ALTER ANY CODE LOGIC! DOCUMENTATION ONLY!\n"
333 | f"CONTINUE FILE-BY-FILE APPROACH! Focus on ONE file until 100% complete. "
334 | f"MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\n"
335 | + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
336 | + f"\n\nREPORT your progress: which specific functions did you document? Update num_files_documented when file complete. "
337 | f"REPORT counters: current num_files_documented out of total_files_to_document. "
338 | f"CRITICAL: If you found ANY bugs/logic errors, STOP documenting and ask user what to do before continuing. "
339 | f"Do NOT move to a new file until the current one is completely documented. "
340 | f"When ready for step {step_number + 1}, report completed work with updated counters."
341 | )
342 | else:
343 | next_steps = (
344 | f"ABSOLUTE RULE: DO NOT ALTER ANY CODE LOGIC! DOCUMENTATION ONLY!\n"
345 | f"CRITICAL: Check if MORE FILES need documentation before finishing! "
346 | f"REQUIRED ACTIONS before calling {self.get_name()} step {step_number + 1}:\n"
347 | + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
348 | + f"\n\nREPORT which functions you documented and update num_files_documented when file complete. "
349 | f"CHECK: If num_files_documented < total_files_to_document, RESTART {self.get_name()} with next step! "
350 | f"CRITICAL: Only set next_step_required=false when num_files_documented equals total_files_to_document! "
351 | f"REPORT counters: current num_files_documented out of total_files_to_document. "
352 | f"CRITICAL: If ANY bugs/logic errors were found during documentation, STOP and ask user before proceeding. "
353 | f"NO recursive {self.get_name()} calls without actual documentation work!"
354 | )
355 |
356 | return {"next_steps": next_steps}
357 |
358 | # Hook method overrides for docgen-specific behavior
359 |
360 | async def handle_work_completion(self, response_data: dict, request, arguments: dict) -> dict:
361 | """
362 | Override work completion to enforce counter validation.
363 |
364 | The docgen tool MUST complete ALL files before finishing. If counters don't match,
365 | force continuation regardless of next_step_required setting.
366 | """
367 | # CRITICAL VALIDATION: Check if all files have been documented using proper inheritance hooks
368 | num_files_documented = self.get_request_num_files_documented(request)
369 | total_files_to_document = self.get_request_total_files_to_document(request)
370 |
371 | if num_files_documented < total_files_to_document:
372 | # Counters don't match - force continuation!
373 | logger.warning(
374 | f"Docgen stopping early: {num_files_documented} < {total_files_to_document}. "
375 | f"Forcing continuation to document remaining files."
376 | )
377 |
378 | # Override to continuation mode
379 | response_data["status"] = "documentation_analysis_required"
380 | response_data[f"pause_for_{self.get_name()}"] = True
381 | response_data["next_steps"] = (
382 | f"CRITICAL ERROR: You attempted to finish documentation with only {num_files_documented} "
383 | f"out of {total_files_to_document} files documented! You MUST continue documenting "
384 | f"the remaining {total_files_to_document - num_files_documented} files. "
385 | f"Call {self.get_name()} again with step {request.step_number + 1} and continue documentation "
386 | f"of the next undocumented file. DO NOT set next_step_required=false until ALL files are documented!"
387 | )
388 | return response_data
389 |
390 | # If counters match, proceed with normal completion
391 | return await super().handle_work_completion(response_data, request, arguments)
392 |
393 | def prepare_step_data(self, request) -> dict:
394 | """
395 | Prepare docgen-specific step data for processing.
396 |
397 | Calculates total_steps dynamically based on number of files to document:
398 | - Step 1: Discovery phase
399 | - Steps 2+: One step per file to document
400 | """
401 | # Calculate dynamic total_steps based on files to document
402 | total_files_to_document = self.get_request_total_files_to_document(request)
403 | if total_files_to_document > 0:
404 | # Discovery step (1) + one step per file
405 | calculated_total_steps = 1 + total_files_to_document
406 | else:
407 | # Fallback to request total_steps if no file count available
408 | calculated_total_steps = request.total_steps
409 |
410 | step_data = {
411 | "step": request.step,
412 | "step_number": request.step_number,
413 | "total_steps": calculated_total_steps, # Use calculated value
414 | "findings": request.findings,
415 | "relevant_files": request.relevant_files,
416 | "relevant_context": request.relevant_context,
417 | "num_files_documented": request.num_files_documented,
418 | "total_files_to_document": request.total_files_to_document,
419 | "issues_found": [], # Docgen uses this for documentation gaps
420 | "confidence": "medium", # Default confidence for docgen
421 | "hypothesis": "systematic_documentation_needed", # Default hypothesis
422 | "images": [], # Docgen doesn't typically use images
423 | # CRITICAL: Include documentation configuration parameters so the model can see them
424 | "document_complexity": request.document_complexity,
425 | "document_flow": request.document_flow,
426 | "update_existing": request.update_existing,
427 | "comments_on_complex_logic": request.comments_on_complex_logic,
428 | }
429 | return step_data
430 |
431 | def should_skip_expert_analysis(self, request, consolidated_findings) -> bool:
432 | """
433 | Docgen tool skips expert analysis when the CLI agent has "certain" confidence.
434 | """
435 | return request.confidence == "certain" and not request.next_step_required
436 |
437 | # Override inheritance hooks for docgen-specific behavior
438 |
439 | def get_completion_status(self) -> str:
440 | """Docgen tools use docgen-specific status."""
441 | return "documentation_analysis_complete"
442 |
443 | def get_completion_data_key(self) -> str:
444 | """Docgen uses 'complete_documentation_analysis' key."""
445 | return "complete_documentation_analysis"
446 |
447 | def get_final_analysis_from_request(self, request):
448 | """Docgen tools use 'hypothesis' field for documentation strategy."""
449 | return request.hypothesis
450 |
451 | def get_confidence_level(self, request) -> str:
452 | """Docgen tools use 'certain' for high confidence."""
453 | return request.confidence or "high"
454 |
455 | def get_completion_message(self) -> str:
456 | """Docgen-specific completion message."""
457 | return (
458 | "Documentation analysis complete with high confidence. You have identified the comprehensive "
459 | "documentation needs and strategy. MANDATORY: Present the user with the documentation plan "
460 | "and IMMEDIATELY proceed with implementing the documentation without requiring further "
461 | "consultation. Focus on the precise documentation improvements needed."
462 | )
463 |
464 | def get_skip_reason(self) -> str:
465 | """Docgen-specific skip reason."""
466 | return "Completed comprehensive documentation analysis locally"
467 |
468 | def get_request_relevant_context(self, request) -> list:
469 | """Get relevant_context for docgen tool."""
470 | try:
471 | return request.relevant_context or []
472 | except AttributeError:
473 | return []
474 |
475 | def get_request_num_files_documented(self, request) -> int:
476 | """Get num_files_documented from request. Override for custom handling."""
477 | try:
478 | return request.num_files_documented or 0
479 | except AttributeError:
480 | return 0
481 |
482 | def get_request_total_files_to_document(self, request) -> int:
483 | """Get total_files_to_document from request. Override for custom handling."""
484 | try:
485 | return request.total_files_to_document or 0
486 | except AttributeError:
487 | return 0
488 |
489 | def get_skip_expert_analysis_status(self) -> str:
490 | """Docgen-specific expert analysis skip status."""
491 | return "skipped_due_to_complete_analysis"
492 |
493 | def prepare_work_summary(self) -> str:
494 | """Docgen-specific work summary."""
495 | try:
496 | return f"Completed {len(self.work_history)} documentation analysis steps"
497 | except AttributeError:
498 | return "Completed documentation analysis"
499 |
500 | def get_completion_next_steps_message(self, expert_analysis_used: bool = False) -> str:
501 | """
502 | Docgen-specific completion message.
503 | """
504 | return (
505 | "DOCUMENTATION ANALYSIS IS COMPLETE FOR ALL FILES (num_files_documented equals total_files_to_document). "
506 | "MANDATORY FINAL VERIFICATION: Before presenting your summary, you MUST perform a final verification scan. "
507 | "Read through EVERY file you documented and check EVERY function, method, class, and property to confirm "
508 | "it has proper documentation including complexity analysis and call flow information. If ANY items lack "
509 | "documentation, document them immediately before finishing. "
510 | "THEN present a clear summary showing: 1) Final counters: num_files_documented out of total_files_to_document, "
511 | "2) Complete accountability list of ALL files you documented with verification status, "
512 | "3) Detailed list of EVERY function/method you documented in each file (proving complete coverage), "
513 | "4) Any dependency relationships you discovered between files, 5) Recommended documentation improvements with concrete examples including "
514 | "complexity analysis and call flow information. 6) **CRITICAL**: List any bugs or logic issues you found "
515 | "during documentation but did NOT fix - present these to the user and ask what they'd like to do about them. "
516 | "Make it easy for a developer to see the complete documentation status across the entire codebase with full accountability."
517 | )
518 |
519 | def get_step_guidance_message(self, request) -> str:
520 | """
521 | Docgen-specific step guidance with detailed analysis instructions.
522 | """
523 | step_guidance = self.get_step_guidance(request.step_number, request.confidence, request)
524 | return step_guidance["next_steps"]
525 |
526 | def customize_workflow_response(self, response_data: dict, request) -> dict:
527 | """
528 | Customize response to match docgen tool format.
529 | """
530 | # Store initial request on first step
531 | if request.step_number == 1:
532 | self.initial_request = request.step
533 |
534 | # Convert generic status names to docgen-specific ones
535 | tool_name = self.get_name()
536 | status_mapping = {
537 | f"{tool_name}_in_progress": "documentation_analysis_in_progress",
538 | f"pause_for_{tool_name}": "pause_for_documentation_analysis",
539 | f"{tool_name}_required": "documentation_analysis_required",
540 | f"{tool_name}_complete": "documentation_analysis_complete",
541 | }
542 |
543 | if response_data["status"] in status_mapping:
544 | response_data["status"] = status_mapping[response_data["status"]]
545 |
546 | # Rename status field to match docgen tool
547 | if f"{tool_name}_status" in response_data:
548 | response_data["documentation_analysis_status"] = response_data.pop(f"{tool_name}_status")
549 | # Add docgen-specific status fields
550 | response_data["documentation_analysis_status"]["documentation_strategies"] = len(
551 | self.consolidated_findings.hypotheses
552 | )
553 |
554 | # Rename complete documentation analysis data
555 | if f"complete_{tool_name}" in response_data:
556 | response_data["complete_documentation_analysis"] = response_data.pop(f"complete_{tool_name}")
557 |
558 | # Map the completion flag to match docgen tool
559 | if f"{tool_name}_complete" in response_data:
560 | response_data["documentation_analysis_complete"] = response_data.pop(f"{tool_name}_complete")
561 |
562 | # Map the required flag to match docgen tool
563 | if f"{tool_name}_required" in response_data:
564 | response_data["documentation_analysis_required"] = response_data.pop(f"{tool_name}_required")
565 |
566 | return response_data
567 |
568 | # Required abstract methods from BaseTool
569 | def get_request_model(self):
570 | """Return the docgen-specific request model."""
571 | return DocgenRequest
572 |
573 | async def prepare_prompt(self, request) -> str:
574 | """Not used - workflow tools use execute_workflow()."""
575 | return "" # Workflow tools use execute_workflow() directly
576 |
```
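
The listing above drives its workflow through two counters: step 1 is discovery only, and every later step documents exactly one file, with `total_steps` recomputed as `1 + total_files_to_document`. The following is a minimal sketch of what a step-2 payload might look like and how that recalculation plays out. It is an illustration only, not part of the repository, and it assumes the base `WorkflowRequest` requires no fields beyond those declared in `DocgenRequest` and that `tools.docgen` is importable from the project root.

```python
# Hypothetical illustration of the docgen step flow (not repository code).
# Assumes WorkflowRequest adds no required fields beyond those in DocgenRequest.
from tools.docgen import DocgenRequest

# Step 2 payload: documentation phase begins after discovery found 3 files.
request = DocgenRequest(
    step="Document the first file from the discovery list",
    step_number=2,
    total_steps=4,                  # 1 discovery step + 3 files
    next_step_required=True,        # more files remain after this one
    findings="utils.py lacks docstrings for 5 of 7 functions",
    relevant_files=["/abs/path/utils.py"],
    relevant_context=["parse_config", "load_cache"],
    num_files_documented=0,         # nothing finished yet
    total_files_to_document=3,      # exact count reported in step 1
)

# Mirrors prepare_step_data(): total_steps is derived from the file count
# whenever total_files_to_document is known, otherwise the request value is kept.
calculated_total_steps = (
    1 + request.total_files_to_document
    if request.total_files_to_document > 0
    else request.total_steps
)
assert calculated_total_steps == 4

# If the agent tried to finish at this point, handle_work_completion() would
# force continuation, since num_files_documented (0) < total_files_to_document (3).
```

The counter check in `handle_work_completion` is what makes the workflow robust against an agent declaring completion early: the status is rewritten to `documentation_analysis_required` until the two counters match.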
--------------------------------------------------------------------------------
/tests/test_conversation_memory.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Test suite for conversation memory system
3 |
4 | Tests the Redis-based conversation persistence needed for AI-to-AI multi-turn
5 | discussions in stateless MCP environments.
6 | """
7 |
8 | import os
9 | from unittest.mock import Mock, patch
10 |
11 | import pytest
12 |
13 | from server import get_follow_up_instructions
14 | from utils.conversation_memory import (
15 | CONVERSATION_TIMEOUT_SECONDS,
16 | MAX_CONVERSATION_TURNS,
17 | ConversationTurn,
18 | ThreadContext,
19 | add_turn,
20 | build_conversation_history,
21 | create_thread,
22 | get_thread,
23 | )
24 |
25 |
26 | class TestConversationMemory:
27 | """Test the conversation memory system for stateless MCP requests"""
28 |
29 | @patch("utils.conversation_memory.get_storage")
30 | def test_create_thread(self, mock_storage):
31 | """Test creating a new thread"""
32 | mock_client = Mock()
33 | mock_storage.return_value = mock_client
34 |
35 | thread_id = create_thread("chat", {"prompt": "Hello", "absolute_file_paths": ["/test.py"]})
36 |
37 | assert thread_id is not None
38 | assert len(thread_id) == 36 # UUID4 length
39 |
40 | # Verify Redis was called
41 | mock_client.setex.assert_called_once()
42 | call_args = mock_client.setex.call_args
43 | assert call_args[0][0] == f"thread:{thread_id}" # key
44 | assert call_args[0][1] == CONVERSATION_TIMEOUT_SECONDS # TTL from configuration
45 |
46 | @patch("utils.conversation_memory.get_storage")
47 | def test_get_thread_valid(self, mock_storage):
48 | """Test retrieving an existing thread"""
49 | mock_client = Mock()
50 | mock_storage.return_value = mock_client
51 |
52 | test_uuid = "12345678-1234-1234-1234-123456789012"
53 |
54 | # Create valid ThreadContext and serialize it
55 | context_obj = ThreadContext(
56 | thread_id=test_uuid,
57 | created_at="2023-01-01T00:00:00Z",
58 | last_updated_at="2023-01-01T00:01:00Z",
59 | tool_name="chat",
60 | turns=[],
61 | initial_context={"prompt": "test"},
62 | )
63 | mock_client.get.return_value = context_obj.model_dump_json()
64 |
65 | context = get_thread(test_uuid)
66 |
67 | assert context is not None
68 | assert context.thread_id == test_uuid
69 | assert context.tool_name == "chat"
70 | mock_client.get.assert_called_once_with(f"thread:{test_uuid}")
71 |
72 | @patch("utils.conversation_memory.get_storage")
73 | def test_get_thread_invalid_uuid(self, mock_storage):
74 | """Test handling invalid UUID"""
75 | context = get_thread("invalid-uuid")
76 | assert context is None
77 |
78 | @patch("utils.conversation_memory.get_storage")
79 | def test_get_thread_not_found(self, mock_storage):
80 | """Test handling thread not found"""
81 | mock_client = Mock()
82 | mock_storage.return_value = mock_client
83 | mock_client.get.return_value = None
84 |
85 | context = get_thread("12345678-1234-1234-1234-123456789012")
86 | assert context is None
87 |
88 | @patch("utils.conversation_memory.get_storage")
89 | def test_add_turn_success(self, mock_storage):
90 | """Test adding a turn to existing thread"""
91 | mock_client = Mock()
92 | mock_storage.return_value = mock_client
93 |
94 | test_uuid = "12345678-1234-1234-1234-123456789012"
95 |
96 | # Create valid ThreadContext
97 | context_obj = ThreadContext(
98 | thread_id=test_uuid,
99 | created_at="2023-01-01T00:00:00Z",
100 | last_updated_at="2023-01-01T00:01:00Z",
101 | tool_name="chat",
102 | turns=[],
103 | initial_context={"prompt": "test"},
104 | )
105 | mock_client.get.return_value = context_obj.model_dump_json()
106 |
107 | success = add_turn(test_uuid, "user", "Hello there")
108 |
109 | assert success is True
110 | # Verify Redis get and setex were called
111 | mock_client.get.assert_called_once()
112 | mock_client.setex.assert_called_once()
113 |
114 | @patch("utils.conversation_memory.get_storage")
115 | def test_add_turn_max_limit(self, mock_storage):
116 | """Test turn limit enforcement"""
117 | mock_client = Mock()
118 | mock_storage.return_value = mock_client
119 |
120 | test_uuid = "12345678-1234-1234-1234-123456789012"
121 |
122 | # Create thread with MAX_CONVERSATION_TURNS turns (at limit)
123 | turns = [
124 | ConversationTurn(role="user", content=f"Turn {i}", timestamp="2023-01-01T00:00:00Z")
125 | for i in range(MAX_CONVERSATION_TURNS)
126 | ]
127 | context_obj = ThreadContext(
128 | thread_id=test_uuid,
129 | created_at="2023-01-01T00:00:00Z",
130 | last_updated_at="2023-01-01T00:01:00Z",
131 | tool_name="chat",
132 | turns=turns,
133 | initial_context={"prompt": "test"},
134 | )
135 | mock_client.get.return_value = context_obj.model_dump_json()
136 |
137 | success = add_turn(test_uuid, "user", "This should fail")
138 |
139 | assert success is False
140 |
141 | @patch.dict(os.environ, {"GEMINI_API_KEY": "test-key", "OPENAI_API_KEY": ""}, clear=False)
142 | def test_build_conversation_history(self, project_path):
143 | """Test building conversation history format with files and speaker identification"""
144 | from providers.registry import ModelProviderRegistry
145 |
146 | ModelProviderRegistry.clear_cache()
147 |
148 | # Create real test files to test actual file embedding functionality
149 | main_file = project_path / "main.py"
150 | readme_file = project_path / "docs" / "readme.md"
151 | examples_dir = project_path / "examples"
152 | examples_file = examples_dir / "example.py"
153 |
154 | # Create directories and files
155 | readme_file.parent.mkdir(parents=True, exist_ok=True)
156 | examples_dir.mkdir(parents=True, exist_ok=True)
157 |
158 | main_file.write_text("def main():\n print('Hello world')\n")
159 | readme_file.write_text("# Project Documentation\nThis is a test project.\n")
160 | examples_file.write_text("# Example code\nprint('Example')\n")
161 |
162 | test_uuid = "12345678-1234-1234-1234-123456789012"
163 |
164 | turns = [
165 | ConversationTurn(
166 | role="user",
167 | content="What is Python?",
168 | timestamp="2023-01-01T00:00:00Z",
169 | files=[str(main_file), str(readme_file)],
170 | ),
171 | ConversationTurn(
172 | role="assistant",
173 | content="Python is a programming language",
174 | timestamp="2023-01-01T00:01:00Z",
175 | files=[str(examples_dir)], # Directory will be expanded to files
176 | tool_name="chat",
177 | model_name="gpt-5",
178 | model_provider="openai",
179 | ),
180 | ]
181 |
182 | context = ThreadContext(
183 | thread_id=test_uuid,
184 | created_at="2023-01-01T00:00:00Z",
185 | last_updated_at="2023-01-01T00:01:00Z",
186 | tool_name="chat",
187 | turns=turns,
188 | initial_context={},
189 | )
190 |
191 | history, tokens = build_conversation_history(context, model_context=None)
192 |
193 | # Test basic structure
194 | assert "CONVERSATION HISTORY" in history
195 | assert f"Thread: {test_uuid}" in history
196 | assert "Tool: chat" in history
197 | assert f"Turn 2/{MAX_CONVERSATION_TURNS}" in history
198 |
199 | # Test speaker identification
200 | assert "--- Turn 1 (Agent) ---" in history
201 | assert "--- Turn 2 (gpt-5 using chat via openai) ---" in history
202 |
203 | # Test content
204 | assert "What is Python?" in history
205 | assert "Python is a programming language" in history
206 |
207 | # Test file tracking
208 | # Check that the new file embedding section is included
209 | assert "=== FILES REFERENCED IN THIS CONVERSATION ===" in history
210 | assert "The following files have been shared and analyzed during our conversation." in history
211 |
212 | # Check that file context from previous turns is included (now shows files used per turn)
213 | assert f"Files used in this turn: {main_file}, {readme_file}" in history
214 | assert f"Files used in this turn: {examples_dir}" in history
215 |
216 | # Verify actual file content is embedded
217 | assert "def main():" in history
218 | assert "Hello world" in history
219 | assert "Project Documentation" in history
220 |
221 | def test_build_conversation_history_empty(self):
222 | """Test building history with no turns"""
223 | test_uuid = "12345678-1234-1234-1234-123456789012"
224 |
225 | context = ThreadContext(
226 | thread_id=test_uuid,
227 | created_at="2023-01-01T00:00:00Z",
228 | last_updated_at="2023-01-01T00:00:00Z",
229 | tool_name="chat",
230 | turns=[],
231 | initial_context={},
232 | )
233 |
234 | history, tokens = build_conversation_history(context, model_context=None)
235 | assert history == ""
236 | assert tokens == 0
237 |
238 |
239 | class TestConversationFlow:
240 | """Test complete conversation flows simulating stateless MCP requests"""
241 |
242 | @patch("utils.conversation_memory.get_storage")
243 | def test_complete_conversation_cycle(self, mock_storage):
244 | """Test a complete 5-turn conversation until limit reached"""
245 | mock_client = Mock()
246 | mock_storage.return_value = mock_client
247 |
248 | # Simulate independent MCP request cycles
249 |
250 | # REQUEST 1: Initial request creates thread
251 | thread_id = create_thread("chat", {"prompt": "Analyze this code"})
252 | initial_context = ThreadContext(
253 | thread_id=thread_id,
254 | created_at="2023-01-01T00:00:00Z",
255 | last_updated_at="2023-01-01T00:00:00Z",
256 | tool_name="chat",
257 | turns=[],
258 | initial_context={"prompt": "Analyze this code"},
259 | )
260 | mock_client.get.return_value = initial_context.model_dump_json()
261 |
262 | # Add assistant response
263 | success = add_turn(
264 | thread_id,
265 | "assistant",
266 | "Code analysis complete",
267 | )
268 | assert success is True
269 |
270 | # REQUEST 2: User responds to follow-up (independent request cycle)
271 | # Simulate retrieving updated context from Redis
272 | context_after_1 = ThreadContext(
273 | thread_id=thread_id,
274 | created_at="2023-01-01T00:00:00Z",
275 | last_updated_at="2023-01-01T00:01:00Z",
276 | tool_name="chat",
277 | turns=[
278 | ConversationTurn(
279 | role="assistant",
280 | content="Code analysis complete",
281 | timestamp="2023-01-01T00:00:30Z",
282 | )
283 | ],
284 | initial_context={"prompt": "Analyze this code"},
285 | )
286 | mock_client.get.return_value = context_after_1.model_dump_json()
287 |
288 | success = add_turn(thread_id, "user", "Yes, check error handling")
289 | assert success is True
290 |
291 | success = add_turn(thread_id, "assistant", "Error handling reviewed")
292 | assert success is True
293 |
294 | # REQUEST 3-5: Continue conversation (simulating independent cycles)
295 | # After turn 3
296 | context_after_3 = ThreadContext(
297 | thread_id=thread_id,
298 | created_at="2023-01-01T00:00:00Z",
299 | last_updated_at="2023-01-01T00:03:00Z",
300 | tool_name="chat",
301 | turns=[
302 | ConversationTurn(
303 | role="assistant",
304 | content="Code analysis complete",
305 | timestamp="2023-01-01T00:00:30Z",
306 | ),
307 | ConversationTurn(role="user", content="Yes, check error handling", timestamp="2023-01-01T00:01:30Z"),
308 | ConversationTurn(
309 | role="assistant",
310 | content="Error handling reviewed",
311 | timestamp="2023-01-01T00:02:30Z",
312 | ),
313 | ],
314 | initial_context={"prompt": "Analyze this code"},
315 | )
316 | mock_client.get.return_value = context_after_3.model_dump_json()
317 |
318 | success = add_turn(thread_id, "user", "Yes, check tests")
319 | assert success is True
320 |
321 | success = add_turn(thread_id, "assistant", "Test coverage analyzed")
322 | assert success is True
323 |
324 | # REQUEST 6: Try to exceed MAX_CONVERSATION_TURNS limit - should fail
325 | turns_at_limit = [
326 | ConversationTurn(
327 | role="assistant" if i % 2 == 0 else "user", content=f"Turn {i + 1}", timestamp="2023-01-01T00:00:30Z"
328 | )
329 | for i in range(MAX_CONVERSATION_TURNS)
330 | ]
331 |
332 | context_at_limit = ThreadContext(
333 | thread_id=thread_id,
334 | created_at="2023-01-01T00:00:00Z",
335 | last_updated_at="2023-01-01T00:05:00Z",
336 | tool_name="chat",
337 | turns=turns_at_limit,
338 | initial_context={"prompt": "Analyze this code"},
339 | )
340 | mock_client.get.return_value = context_at_limit.model_dump_json()
341 |
342 | # This should fail - conversation has reached limit
343 | success = add_turn(thread_id, "user", "This should be rejected")
344 | assert success is False # CONVERSATION STOPS HERE
345 |
346 | @patch("utils.conversation_memory.get_storage")
347 | def test_invalid_continuation_id_error(self, mock_storage):
348 | """Test that invalid continuation IDs raise proper error for restart"""
349 | from server import reconstruct_thread_context
350 |
351 | mock_client = Mock()
352 | mock_storage.return_value = mock_client
353 | mock_client.get.return_value = None # Thread not found
354 |
355 | arguments = {"continuation_id": "invalid-uuid-12345", "prompt": "Continue conversation"}
356 |
357 | # Should raise ValueError asking to restart
358 | with pytest.raises(ValueError) as exc_info:
359 | import asyncio
360 |
361 | asyncio.run(reconstruct_thread_context(arguments))
362 |
363 | error_msg = str(exc_info.value)
364 | assert "Conversation thread 'invalid-uuid-12345' was not found or has expired" in error_msg
365 | assert (
366 | "Please restart the conversation by providing your full question/prompt without the continuation_id"
367 | in error_msg
368 | )
369 |
370 | @patch.dict(os.environ, {"GEMINI_API_KEY": "test-key", "OPENAI_API_KEY": ""}, clear=False)
371 | def test_dynamic_max_turns_configuration(self):
372 | """Test that all functions respect MAX_CONVERSATION_TURNS configuration"""
373 | from providers.registry import ModelProviderRegistry
374 |
375 | ModelProviderRegistry.clear_cache()
376 |
377 | # This test ensures if we change MAX_CONVERSATION_TURNS, everything updates
378 |
379 | # Test with different max values by patching the constant
380 | test_values = [3, 7, 10]
381 |
382 | for test_max in test_values:
383 | # Create turns up to the test limit
384 | turns = [
385 | ConversationTurn(role="user", content=f"Turn {i}", timestamp="2023-01-01T00:00:00Z")
386 | for i in range(test_max)
387 | ]
388 |
389 | # Test history building respects the limit
390 | test_uuid = "12345678-1234-1234-1234-123456789012"
391 | context = ThreadContext(
392 | thread_id=test_uuid,
393 | created_at="2023-01-01T00:00:00Z",
394 | last_updated_at="2023-01-01T00:00:00Z",
395 | tool_name="chat",
396 | turns=turns,
397 | initial_context={},
398 | )
399 |
400 | history, tokens = build_conversation_history(context, model_context=None)
401 | expected_turn_text = f"Turn {test_max}/{MAX_CONVERSATION_TURNS}"
402 | assert expected_turn_text in history
403 |
404 | def test_follow_up_instructions_dynamic_behavior(self):
405 | """Test that follow-up instructions change correctly based on turn count and max setting"""
406 | # Test with default MAX_CONVERSATION_TURNS
407 | max_turns = MAX_CONVERSATION_TURNS
408 |
409 | # Test early conversation (should allow follow-ups)
410 | early_instructions = get_follow_up_instructions(0, max_turns)
411 | assert "CONVERSATION CONTINUATION" in early_instructions
412 | assert f"({max_turns - 1} exchanges remaining)" in early_instructions
413 | assert "Feel free to ask clarifying questions" in early_instructions
414 |
415 | # Test mid conversation
416 | mid_instructions = get_follow_up_instructions(2, max_turns)
417 | assert "CONVERSATION CONTINUATION" in mid_instructions
418 | assert f"({max_turns - 3} exchanges remaining)" in mid_instructions
419 | assert "Feel free to ask clarifying questions" in mid_instructions
420 |
421 | # Test approaching limit (should stop follow-ups)
422 | limit_instructions = get_follow_up_instructions(max_turns - 1, max_turns)
423 | assert "Do NOT include any follow-up questions" in limit_instructions
424 | assert "final exchange" in limit_instructions
425 |
426 | # Test at limit
427 | at_limit_instructions = get_follow_up_instructions(max_turns, max_turns)
428 | assert "Do NOT include any follow-up questions" in at_limit_instructions
429 |
430 | # Test with custom max_turns to ensure dynamic behavior
431 | custom_max = 3
432 | custom_early = get_follow_up_instructions(0, custom_max)
433 | assert f"({custom_max - 1} exchanges remaining)" in custom_early
434 |
435 | custom_limit = get_follow_up_instructions(custom_max - 1, custom_max)
436 | assert "Do NOT include any follow-up questions" in custom_limit
437 |
438 | def test_follow_up_instructions_defaults_to_config(self):
439 | """Test that follow-up instructions use MAX_CONVERSATION_TURNS when max_turns not provided"""
440 | instructions = get_follow_up_instructions(0) # No max_turns parameter
441 | expected_remaining = MAX_CONVERSATION_TURNS - 1
442 | assert f"({expected_remaining} exchanges remaining)" in instructions
443 |
444 | @patch("utils.conversation_memory.get_storage")
445 | def test_complete_conversation_with_dynamic_turns(self, mock_storage):
446 | """Test complete conversation respecting MAX_CONVERSATION_TURNS dynamically"""
447 | mock_client = Mock()
448 | mock_storage.return_value = mock_client
449 |
450 | thread_id = create_thread("chat", {"prompt": "Start conversation"})
451 |
452 | # Simulate conversation up to MAX_CONVERSATION_TURNS - 1
453 | for turn_num in range(MAX_CONVERSATION_TURNS - 1):
454 | # Mock context with current turns
455 | turns = [
456 | ConversationTurn(
457 | role="user" if i % 2 == 0 else "assistant",
458 | content=f"Turn {i + 1}",
459 | timestamp="2023-01-01T00:00:00Z",
460 | )
461 | for i in range(turn_num)
462 | ]
463 |
464 | context = ThreadContext(
465 | thread_id=thread_id,
466 | created_at="2023-01-01T00:00:00Z",
467 | last_updated_at="2023-01-01T00:00:00Z",
468 | tool_name="chat",
469 | turns=turns,
470 | initial_context={"prompt": "Start conversation"},
471 | )
472 | mock_client.get.return_value = context.model_dump_json()
473 |
474 | # Should succeed
475 | success = add_turn(thread_id, "user", f"User turn {turn_num + 1}")
476 | assert success is True, f"Turn {turn_num + 1} should succeed"
477 |
478 | # Now we should be at the limit - create final context
479 | final_turns = [
480 | ConversationTurn(
481 | role="user" if i % 2 == 0 else "assistant", content=f"Turn {i + 1}", timestamp="2023-01-01T00:00:00Z"
482 | )
483 | for i in range(MAX_CONVERSATION_TURNS)
484 | ]
485 |
486 | final_context = ThreadContext(
487 | thread_id=thread_id,
488 | created_at="2023-01-01T00:00:00Z",
489 | last_updated_at="2023-01-01T00:00:00Z",
490 | tool_name="chat",
491 | turns=final_turns,
492 | initial_context={"prompt": "Start conversation"},
493 | )
494 | mock_client.get.return_value = final_context.model_dump_json()
495 |
496 | # This should fail - at the limit
497 | success = add_turn(thread_id, "user", "This should fail")
498 | assert success is False, f"Turn {MAX_CONVERSATION_TURNS + 1} should fail"
499 |
500 | @patch("utils.conversation_memory.get_storage")
501 | @patch.dict(os.environ, {"GEMINI_API_KEY": "test-key", "OPENAI_API_KEY": ""}, clear=False)
502 | def test_conversation_with_files_and_context_preservation(self, mock_storage):
503 | """Test complete conversation flow with file tracking and context preservation"""
504 | from providers.registry import ModelProviderRegistry
505 |
506 | ModelProviderRegistry.clear_cache()
507 |
508 | mock_client = Mock()
509 | mock_storage.return_value = mock_client
510 |
511 | # Start conversation with files using a simple tool
512 | thread_id = create_thread("chat", {"prompt": "Analyze this codebase", "absolute_file_paths": ["/project/src/"]})
513 |
514 | # Turn 1: Claude provides context with multiple files
515 | initial_context = ThreadContext(
516 | thread_id=thread_id,
517 | created_at="2023-01-01T00:00:00Z",
518 | last_updated_at="2023-01-01T00:00:00Z",
519 | tool_name="chat",
520 | turns=[],
521 | initial_context={
522 | "prompt": "Analyze this codebase",
523 | "absolute_file_paths": ["/project/src/"],
524 | },
525 | )
526 | mock_client.get.return_value = initial_context.model_dump_json()
527 |
528 | # Add Gemini's response
529 | success = add_turn(
530 | thread_id,
531 | "assistant",
532 | "I've analyzed your codebase structure.",
533 | files=["/project/src/main.py", "/project/src/utils.py"],
534 | tool_name="analyze",
535 | model_name="gemini-2.5-flash",
536 | model_provider="google",
537 | )
538 | assert success is True
539 |
540 | # Turn 2: Claude responds with different files
541 | context_turn_1 = ThreadContext(
542 | thread_id=thread_id,
543 | created_at="2023-01-01T00:00:00Z",
544 | last_updated_at="2023-01-01T00:01:00Z",
545 | tool_name="analyze",
546 | turns=[
547 | ConversationTurn(
548 | role="assistant",
549 | content="I've analyzed your codebase structure.",
550 | timestamp="2023-01-01T00:00:30Z",
551 | files=["/project/src/main.py", "/project/src/utils.py"],
552 | tool_name="analyze",
553 | model_name="gemini-2.5-flash",
554 | model_provider="google",
555 | )
556 | ],
557 | initial_context={"prompt": "Analyze this codebase", "relevant_files": ["/project/src/"]},
558 | )
559 | mock_client.get.return_value = context_turn_1.model_dump_json()
560 |
561 | # User responds with test files
562 | success = add_turn(
563 | thread_id, "user", "Yes, check the test coverage", files=["/project/tests/", "/project/test_main.py"]
564 | )
565 | assert success is True
566 |
567 | # Turn 3: Gemini analyzes tests
568 | context_turn_2 = ThreadContext(
569 | thread_id=thread_id,
570 | created_at="2023-01-01T00:00:00Z",
571 | last_updated_at="2023-01-01T00:02:00Z",
572 | tool_name="analyze",
573 | turns=[
574 | ConversationTurn(
575 | role="assistant",
576 | content="I've analyzed your codebase structure.",
577 | timestamp="2023-01-01T00:00:30Z",
578 | files=["/project/src/main.py", "/project/src/utils.py"],
579 | tool_name="analyze",
580 | ),
581 | ConversationTurn(
582 | role="user",
583 | content="Yes, check the test coverage",
584 | timestamp="2023-01-01T00:01:30Z",
585 | files=["/project/tests/", "/project/test_main.py"],
586 | ),
587 | ],
588 | initial_context={"prompt": "Analyze this codebase", "relevant_files": ["/project/src/"]},
589 | )
590 | mock_client.get.return_value = context_turn_2.model_dump_json()
591 |
592 | success = add_turn(
593 | thread_id,
594 | "assistant",
595 | "Test coverage analysis complete. Coverage is 85%.",
596 | files=["/project/tests/test_utils.py", "/project/coverage.html"],
597 | tool_name="analyze",
598 | model_name="gemini-2.5-flash",
599 | model_provider="google",
600 | )
601 | assert success is True
602 |
603 | # Build conversation history and verify chronological file preservation
604 | final_context = ThreadContext(
605 | thread_id=thread_id,
606 | created_at="2023-01-01T00:00:00Z",
607 | last_updated_at="2023-01-01T00:03:00Z",
608 | tool_name="analyze",
609 | turns=[
610 | ConversationTurn(
611 | role="assistant",
612 | content="I've analyzed your codebase structure.",
613 | timestamp="2023-01-01T00:00:30Z",
614 | files=["/project/src/main.py", "/project/src/utils.py"],
615 | tool_name="analyze",
616 | model_name="gemini-2.5-flash",
617 | model_provider="google",
618 | ),
619 | ConversationTurn(
620 | role="user",
621 | content="Yes, check the test coverage",
622 | timestamp="2023-01-01T00:01:30Z",
623 | files=["/project/tests/", "/project/test_main.py"],
624 | ),
625 | ConversationTurn(
626 | role="assistant",
627 | content="Test coverage analysis complete. Coverage is 85%.",
628 | timestamp="2023-01-01T00:02:30Z",
629 | files=["/project/tests/test_utils.py", "/project/coverage.html"],
630 | tool_name="analyze",
631 | model_name="gemini-2.5-flash",
632 | model_provider="google",
633 | ),
634 | ],
635 | initial_context={"prompt": "Analyze this codebase", "relevant_files": ["/project/src/"]},
636 | )
637 |
638 | history, tokens = build_conversation_history(final_context)
639 |
640 | # Verify chronological order and speaker identification
641 | assert "--- Turn 1 (gemini-2.5-flash using analyze via google) ---" in history
642 | assert "--- Turn 2 (Agent) ---" in history
643 | assert "--- Turn 3 (gemini-2.5-flash using analyze via google) ---" in history
644 |
645 | # Verify all files are preserved in chronological order
646 | turn_1_files = "Files used in this turn: /project/src/main.py, /project/src/utils.py"
647 | turn_2_files = "Files used in this turn: /project/tests/, /project/test_main.py"
648 | turn_3_files = "Files used in this turn: /project/tests/test_utils.py, /project/coverage.html"
649 |
650 | assert turn_1_files in history
651 | assert turn_2_files in history
652 | assert turn_3_files in history
653 |
654 | # Verify content
655 | assert "I've analyzed your codebase structure." in history
656 | assert "Yes, check the test coverage" in history
657 | assert "Test coverage analysis complete. Coverage is 85%." in history
658 |
659 | # Verify chronological ordering (turn 1 appears before turn 2, etc.)
660 | turn_1_pos = history.find("--- Turn 1 (gemini-2.5-flash using analyze via google) ---")
661 | turn_2_pos = history.find("--- Turn 2 (Agent) ---")
662 | turn_3_pos = history.find("--- Turn 3 (gemini-2.5-flash using analyze via google) ---")
663 |
664 | assert turn_1_pos < turn_2_pos < turn_3_pos
665 |
666 | @patch("utils.conversation_memory.get_storage")
667 | def test_stateless_request_isolation(self, mock_storage):
668 | """Test that each request cycle is independent but shares context via Redis"""
669 | mock_client = Mock()
670 | mock_storage.return_value = mock_client
671 |
672 | # Simulate two different "processes" accessing same thread
673 | thread_id = "12345678-1234-1234-1234-123456789012"
674 |
675 | # Process 1: Creates thread
676 | initial_context = ThreadContext(
677 | thread_id=thread_id,
678 | created_at="2023-01-01T00:00:00Z",
679 | last_updated_at="2023-01-01T00:00:00Z",
680 | tool_name="thinkdeep",
681 | turns=[],
682 | initial_context={"prompt": "Think about architecture"},
683 | )
684 | mock_client.get.return_value = initial_context.model_dump_json()
685 |
686 | success = add_turn(thread_id, "assistant", "Architecture analysis")
687 | assert success is True
688 |
689 | # Process 2: Different "request cycle" accesses same thread
690 | context_from_redis = ThreadContext(
691 | thread_id=thread_id,
692 | created_at="2023-01-01T00:00:00Z",
693 | last_updated_at="2023-01-01T00:01:00Z",
694 | tool_name="thinkdeep",
695 | turns=[
696 | ConversationTurn(
697 | role="assistant",
698 | content="Architecture analysis",
699 | timestamp="2023-01-01T00:00:30Z",
700 | )
701 | ],
702 | initial_context={"prompt": "Think about architecture"},
703 | )
704 | mock_client.get.return_value = context_from_redis.model_dump_json()
705 |
706 | # Verify context continuity across "processes"
707 | retrieved_context = get_thread(thread_id)
708 | assert retrieved_context is not None
709 | assert len(retrieved_context.turns) == 1
710 |
711 | @patch.dict(os.environ, {"GEMINI_API_KEY": "test-key", "OPENAI_API_KEY": ""}, clear=False)
712 | def test_token_limit_optimization_in_conversation_history(self):
713 | """Test that build_conversation_history efficiently handles token limits"""
714 | import os
715 | import tempfile
716 |
717 | from providers.registry import ModelProviderRegistry
718 |
719 | ModelProviderRegistry.clear_cache()
720 |
721 | from utils.conversation_memory import build_conversation_history
722 |
723 | # Create test files with known content sizes
724 | with tempfile.TemporaryDirectory() as temp_dir:
725 | # Create small and large test files
726 | small_file = os.path.join(temp_dir, "small.py")
727 | large_file = os.path.join(temp_dir, "large.py")
728 |
729 | small_content = "# Small file\nprint('hello')\n"
730 | large_content = "# Large file\n" + "x = 1\n" * 10000 # Very large file
731 |
732 | with open(small_file, "w") as f:
733 | f.write(small_content)
734 | with open(large_file, "w") as f:
735 | f.write(large_content)
736 |
737 | # Create context with files that would exceed token limit
738 | context = ThreadContext(
739 | thread_id="test-token-limit",
740 | created_at="2023-01-01T00:00:00Z",
741 | last_updated_at="2023-01-01T00:01:00Z",
742 | tool_name="analyze",
743 | turns=[
744 | ConversationTurn(
745 | role="user",
746 | content="Analyze these files",
747 | timestamp="2023-01-01T00:00:30Z",
748 | files=[small_file, large_file], # Large file should be truncated
749 | )
750 | ],
751 | initial_context={"prompt": "Analyze code"},
752 | )
753 |
754 | # Build conversation history (should handle token limits gracefully)
755 | history, tokens = build_conversation_history(context, model_context=None)
756 |
757 | # Verify the history was built successfully
758 | assert "=== CONVERSATION HISTORY" in history
759 | assert "=== FILES REFERENCED IN THIS CONVERSATION ===" in history
760 |
761 | # The small file should be included, but large file might be truncated
762 | # At minimum, verify no crashes and history is generated
763 | assert len(history) > 0
764 |
765 | # If truncation occurred, there should be a note about it
766 | if "additional file(s) were truncated due to token limit" in history:
767 | assert small_file in history or large_file in history
768 | else:
769 | # Both files fit within limit
770 | assert small_file in history
771 | assert large_file in history
772 |
773 |
774 | if __name__ == "__main__":
775 | pytest.main([__file__])
776 |
```
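The assertions above pin down the turn-budget thresholds without showing the helper itself. The following is a minimal sketch of the behaviour those assertions imply; the real `get_follow_up_instructions` lives in `utils/conversation_memory.py` and returns fuller prompt text, and the default of 20 turns plus the exact phrases below are assumptions made only for illustration.

```python
from typing import Optional

MAX_CONVERSATION_TURNS = 20  # assumed default for illustration; the real value lives in config.py


def get_follow_up_instructions(current_turn_count: int, max_turns: Optional[int] = None) -> str:
    """Sketch: continuation guidance until the turn budget is nearly spent, then a stop notice."""
    if max_turns is None:
        max_turns = MAX_CONVERSATION_TURNS
    if current_turn_count >= max_turns - 1:
        # Final exchange reached: follow-up questions are no longer allowed.
        return "This is the final exchange. Do NOT include any follow-up questions."
    remaining = max_turns - current_turn_count - 1
    return (
        "CONVERSATION CONTINUATION: Feel free to ask clarifying questions "
        f"({remaining} exchanges remaining)."
    )


assert "(4 exchanges remaining)" in get_follow_up_instructions(0, max_turns=5)
assert "Do NOT include any follow-up questions" in get_follow_up_instructions(4, max_turns=5)
```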
--------------------------------------------------------------------------------
/tests/test_large_prompt_handling.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Tests for large prompt handling functionality.
3 |
4 | This test module verifies that the MCP server correctly handles
5 | prompts that exceed the 50,000 character limit by requesting
6 | Claude to save them to a file and resend.
7 | """
8 |
9 | import json
10 | import os
11 | import shutil
12 | import tempfile
13 | from unittest.mock import MagicMock, patch
14 |
15 | import pytest
16 |
17 | from config import MCP_PROMPT_SIZE_LIMIT
18 | from tools.chat import ChatTool
19 | from tools.codereview import CodeReviewTool
20 | from tools.shared.exceptions import ToolExecutionError
21 |
22 | # from tools.debug import DebugIssueTool # Commented out - debug tool refactored
23 |
24 |
25 | class TestLargePromptHandling:
26 | """Test suite for large prompt handling across all tools."""
27 |
28 | def teardown_method(self):
29 | """Clean up after each test to prevent state pollution."""
30 | # Clear provider registry singleton
31 | from providers.registry import ModelProviderRegistry
32 |
33 | ModelProviderRegistry._instance = None
34 |
35 | @pytest.fixture
36 | def large_prompt(self):
37 | """Create a prompt larger than MCP_PROMPT_SIZE_LIMIT characters."""
38 | return "x" * (MCP_PROMPT_SIZE_LIMIT + 1000)
39 |
40 | @pytest.fixture
41 | def normal_prompt(self):
42 | """Create a normal-sized prompt."""
43 | return "This is a normal prompt that should work fine."
44 |
45 | @pytest.fixture
46 | def temp_prompt_file(self, large_prompt):
47 | """Create a temporary prompt.txt file with large content."""
48 | # Create temp file with exact name "prompt.txt"
49 | temp_dir = tempfile.mkdtemp()
50 | file_path = os.path.join(temp_dir, "prompt.txt")
51 | with open(file_path, "w") as f:
52 | f.write(large_prompt)
53 | return file_path
54 |
55 | @pytest.mark.asyncio
56 | async def test_chat_large_prompt_detection(self, large_prompt):
57 | """Test that chat tool detects large prompts."""
58 | tool = ChatTool()
59 | temp_dir = tempfile.mkdtemp()
61 | try:
62 | with pytest.raises(ToolExecutionError) as exc_info:
63 | await tool.execute({"prompt": large_prompt, "working_directory_absolute_path": temp_dir})
64 | finally:
65 | shutil.rmtree(temp_dir, ignore_errors=True)
66 |
67 | output = json.loads(exc_info.value.payload)
68 | assert output["status"] == "resend_prompt"
69 | assert f"{MCP_PROMPT_SIZE_LIMIT:,} characters" in output["content"]
70 | # The prompt size should match the user input since we check at MCP transport boundary before adding internal content
71 | assert output["metadata"]["prompt_size"] == len(large_prompt)
72 | assert output["metadata"]["limit"] == MCP_PROMPT_SIZE_LIMIT
73 |
74 | @pytest.mark.asyncio
75 | async def test_chat_normal_prompt_works(self, normal_prompt):
76 | """Test that chat tool works normally with regular prompts."""
77 | tool = ChatTool()
78 |
79 | temp_dir = tempfile.mkdtemp()
80 |
81 | # This test runs in the test environment which uses dummy keys
82 | # The chat tool will return an error for dummy keys, which is expected
83 | try:
84 | try:
85 | result = await tool.execute(
86 | {"prompt": normal_prompt, "model": "gemini-2.5-flash", "working_directory_absolute_path": temp_dir}
87 | )
88 | except ToolExecutionError as exc:
89 | output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
90 | else:
91 | assert len(result) == 1
92 | output = json.loads(result[0].text)
93 | finally:
94 | shutil.rmtree(temp_dir, ignore_errors=True)
95 |
96 | # Whether provider succeeds or fails, we should not hit the resend_prompt branch
97 | assert output["status"] != "resend_prompt"
98 |
99 | @pytest.mark.asyncio
100 | async def test_chat_prompt_file_handling(self):
101 | """Test that chat tool correctly handles prompt.txt files with reasonable size."""
102 | tool = ChatTool()
103 | # Use a smaller prompt that won't exceed limit when combined with system prompt
104 | reasonable_prompt = "This is a reasonable sized prompt for testing prompt.txt file handling."
105 |
106 | # Create a temp file with reasonable content
107 | temp_dir = tempfile.mkdtemp()
108 | temp_prompt_file = os.path.join(temp_dir, "prompt.txt")
109 | with open(temp_prompt_file, "w") as f:
110 | f.write(reasonable_prompt)
111 |
112 | try:
113 | try:
114 | result = await tool.execute(
115 | {
116 | "prompt": "",
117 | "absolute_file_paths": [temp_prompt_file],
118 | "model": "gemini-2.5-flash",
119 | "working_directory_absolute_path": temp_dir,
120 | }
121 | )
122 | except ToolExecutionError as exc:
123 | output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
124 | else:
125 | assert len(result) == 1
126 | output = json.loads(result[0].text)
127 |
128 | # The test may fail with dummy API keys, which is expected behavior.
129 | # We're mainly testing that the tool processes prompt files correctly without size errors.
130 | assert output["status"] != "resend_prompt"
131 | finally:
132 | # Cleanup
133 | shutil.rmtree(temp_dir)
134 |
135 | @pytest.mark.asyncio
136 | async def test_codereview_large_focus(self, large_prompt):
137 | """Test that codereview tool detects large focus_on field using real integration testing."""
138 | import importlib
139 | import os
140 |
141 | tool = CodeReviewTool()
142 |
143 | # Save original environment
144 | original_env = {
145 | "OPENAI_API_KEY": os.environ.get("OPENAI_API_KEY"),
146 | "DEFAULT_MODEL": os.environ.get("DEFAULT_MODEL"),
147 | }
148 |
149 | try:
150 | # Set up environment for real provider resolution
151 | os.environ["OPENAI_API_KEY"] = "sk-test-key-large-focus-test-not-real"
152 | os.environ["DEFAULT_MODEL"] = "o3-mini"
153 |
154 | # Clear other provider keys to isolate to OpenAI
155 | for key in ["GEMINI_API_KEY", "XAI_API_KEY", "OPENROUTER_API_KEY"]:
156 | os.environ.pop(key, None)
157 |
158 | # Reload config and clear registry
159 | import config
160 |
161 | importlib.reload(config)
162 | from providers.registry import ModelProviderRegistry
163 |
164 | ModelProviderRegistry._instance = None
165 |
166 | # Test with real provider resolution
167 | try:
168 | args = {
169 | "step": "initial review setup",
170 | "step_number": 1,
171 | "total_steps": 1,
172 | "next_step_required": False,
173 | "findings": "Initial testing",
174 | "relevant_files": ["/some/file.py"],
175 | "files_checked": ["/some/file.py"],
176 | "focus_on": large_prompt,
177 | "prompt": "Test code review for validation purposes",
178 | "model": "o3-mini",
179 | }
180 |
181 | try:
182 | result = await tool.execute(args)
183 | except ToolExecutionError as exc:
184 | output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
185 | else:
186 | assert len(result) == 1
187 | output = json.loads(result[0].text)
188 |
189 | # The large focus_on may trigger the resend_prompt guard before provider access.
190 | # When the guard does not trigger, auto-mode falls back to provider selection and
191 | # returns an error about the unavailable model. Both behaviors are acceptable for this test.
192 | if output.get("status") == "resend_prompt":
193 | assert output["metadata"]["prompt_size"] == len(large_prompt)
194 | else:
195 | assert output.get("status") == "error"
196 | assert "Model" in output.get("content", "")
197 |
198 | except Exception as e:
199 | # If we get an unexpected exception, ensure it's not a mock artifact
200 | error_msg = str(e)
201 | assert "MagicMock" not in error_msg
202 | assert "'<' not supported between instances" not in error_msg
203 |
204 | # Should be a real provider error (API, authentication, etc.)
205 | assert any(
206 | phrase in error_msg
207 | for phrase in ["API", "key", "authentication", "provider", "network", "connection"]
208 | )
209 |
210 | finally:
211 | # Restore environment
212 | for key, value in original_env.items():
213 | if value is not None:
214 | os.environ[key] = value
215 | else:
216 | os.environ.pop(key, None)
217 |
218 | # Reload config and clear registry
219 | importlib.reload(config)
220 | ModelProviderRegistry._instance = None
221 |
222 | # NOTE: Precommit test has been removed because the precommit tool has been
223 | # refactored to use a workflow-based pattern instead of accepting simple prompt/path fields.
224 | # The new precommit tool requires workflow fields like: step, step_number, total_steps,
225 | # next_step_required, findings, etc. See simulator_tests/test_precommitworkflow_validation.py
226 | # for comprehensive workflow testing including large prompt handling.
227 |
228 | # NOTE: Debug tool tests have been commented out because the debug tool has been
229 | # refactored to use a self-investigation pattern instead of accepting a prompt field.
230 | # The new debug tool requires fields like: step, step_number, total_steps, next_step_required, findings
231 | # and doesn't have the "resend_prompt" functionality for large prompts.
232 |
233 | # @pytest.mark.asyncio
234 | # async def test_debug_large_error_description(self, large_prompt):
235 | # """Test that debug tool detects large error_description."""
236 | # tool = DebugIssueTool()
237 | # result = await tool.execute({"prompt": large_prompt})
238 | #
239 | # assert len(result) == 1
240 | # output = json.loads(result[0].text)
241 | # assert output["status"] == "resend_prompt"
242 |
243 | # @pytest.mark.asyncio
244 | # async def test_debug_large_error_context(self, large_prompt, normal_prompt):
245 | # """Test that debug tool detects large error_context."""
246 | # tool = DebugIssueTool()
247 | # result = await tool.execute({"prompt": normal_prompt, "error_context": large_prompt})
248 | #
249 | # assert len(result) == 1
250 | # output = json.loads(result[0].text)
251 | # assert output["status"] == "resend_prompt"
252 |
253 | # Removed: test_analyze_large_question - workflow tool handles large prompts differently
254 |
255 | @pytest.mark.asyncio
256 | async def test_multiple_files_with_prompt_txt(self, temp_prompt_file):
257 | """Test handling of prompt.txt alongside other files."""
258 | tool = ChatTool()
259 | other_file = "/some/other/file.py"
260 |
261 | with (
262 | patch("utils.model_context.ModelContext") as mock_model_context_cls,
263 | patch.object(tool, "handle_prompt_file") as mock_handle_prompt,
264 | patch.object(tool, "_prepare_file_content_for_prompt") as mock_prepare_files,
265 | ):
266 | mock_provider = MagicMock()
267 | mock_provider.get_provider_type.return_value = MagicMock(value="google")
268 | mock_provider.generate_content.return_value = MagicMock(
269 | content="Success",
270 | usage={"input_tokens": 10, "output_tokens": 20, "total_tokens": 30},
271 | model_name="gemini-2.5-flash",
272 | metadata={"finish_reason": "STOP"},
273 | )
274 |
275 | from utils.model_context import TokenAllocation
276 |
277 | mock_model_context = MagicMock()
278 | mock_model_context.model_name = "gemini-2.5-flash"
279 | mock_model_context.provider = mock_provider
280 | mock_model_context.capabilities = MagicMock(supports_extended_thinking=False)
281 | mock_model_context.calculate_token_allocation.return_value = TokenAllocation(
282 | total_tokens=1_000_000,
283 | content_tokens=800_000,
284 | response_tokens=200_000,
285 | file_tokens=320_000,
286 | history_tokens=320_000,
287 | )
288 | mock_model_context_cls.return_value = mock_model_context
289 |
290 | # Return the prompt content and updated files list (without prompt.txt)
291 | mock_handle_prompt.return_value = ("Large prompt content from file", [other_file])
292 |
293 | # Mock the centralized file preparation method
294 | mock_prepare_files.return_value = ("File content", [other_file])
295 |
296 | # Use a small prompt to avoid triggering size limit
297 | await tool.execute(
298 | {
299 | "prompt": "Test prompt",
300 | "absolute_file_paths": [temp_prompt_file, other_file],
301 | "working_directory_absolute_path": os.path.dirname(temp_prompt_file),
302 | }
303 | )
304 |
305 | # Verify handle_prompt_file was called with the original files list
306 | mock_handle_prompt.assert_called_once_with([temp_prompt_file, other_file])
307 |
308 | # Verify _prepare_file_content_for_prompt was called with the updated files list (without prompt.txt)
309 | mock_prepare_files.assert_called_once()
310 | files_arg = mock_prepare_files.call_args[0][0]
311 | assert len(files_arg) == 1
312 | assert files_arg[0] == other_file
313 |
314 | temp_dir = os.path.dirname(temp_prompt_file)
315 | shutil.rmtree(temp_dir)
316 |
317 | @pytest.mark.asyncio
318 | async def test_boundary_case_exactly_at_limit(self):
319 | """Test prompt exactly at MCP_PROMPT_SIZE_LIMIT characters (should pass with the fix)."""
320 | tool = ChatTool()
321 | exact_prompt = "x" * MCP_PROMPT_SIZE_LIMIT
322 |
323 | # Mock the model provider to avoid real API calls
324 | with patch.object(tool, "get_model_provider") as mock_get_provider:
325 | mock_provider = MagicMock()
326 | mock_provider.get_provider_type.return_value = MagicMock(value="google")
327 | mock_provider.get_capabilities.return_value = MagicMock(supports_extended_thinking=False)
328 | mock_provider.generate_content.return_value = MagicMock(
329 | content="Response to the large prompt",
330 | usage={"input_tokens": 12000, "output_tokens": 10, "total_tokens": 12010},
331 | model_name="gemini-2.5-flash",
332 | metadata={"finish_reason": "STOP"},
333 | )
334 | mock_get_provider.return_value = mock_provider
335 |
336 | # With the fix, this should now pass because we check at MCP transport boundary before adding internal content
337 | temp_dir = tempfile.mkdtemp()
338 | try:
339 | try:
340 | result = await tool.execute({"prompt": exact_prompt, "working_directory_absolute_path": temp_dir})
341 | except ToolExecutionError as exc:
342 | output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
343 | else:
344 | output = json.loads(result[0].text)
345 | finally:
346 | shutil.rmtree(temp_dir, ignore_errors=True)
347 | assert output["status"] != "resend_prompt"
348 |
349 | @pytest.mark.asyncio
350 | async def test_boundary_case_just_over_limit(self):
351 | """Test prompt just over MCP_PROMPT_SIZE_LIMIT characters (should trigger file request)."""
352 | tool = ChatTool()
353 | over_prompt = "x" * (MCP_PROMPT_SIZE_LIMIT + 1)
354 |
355 | temp_dir = tempfile.mkdtemp()
356 | try:
357 | try:
358 | result = await tool.execute({"prompt": over_prompt, "working_directory_absolute_path": temp_dir})
359 | except ToolExecutionError as exc:
360 | output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
361 | else:
362 | output = json.loads(result[0].text)
363 | finally:
364 | shutil.rmtree(temp_dir, ignore_errors=True)
365 | assert output["status"] == "resend_prompt"
366 |
367 | @pytest.mark.asyncio
368 | async def test_empty_prompt_no_file(self):
369 | """Test empty prompt without prompt.txt file."""
370 | tool = ChatTool()
371 |
372 | with patch.object(tool, "get_model_provider") as mock_get_provider:
373 | mock_provider = MagicMock()
374 | mock_provider.get_provider_type.return_value = MagicMock(value="google")
375 | mock_provider.get_capabilities.return_value = MagicMock(supports_extended_thinking=False)
376 | mock_provider.generate_content.return_value = MagicMock(
377 | content="Success",
378 | usage={"input_tokens": 10, "output_tokens": 20, "total_tokens": 30},
379 | model_name="gemini-2.5-flash",
380 | metadata={"finish_reason": "STOP"},
381 | )
382 | mock_get_provider.return_value = mock_provider
383 |
384 | temp_dir = tempfile.mkdtemp()
385 | try:
386 | try:
387 | result = await tool.execute({"prompt": "", "working_directory_absolute_path": temp_dir})
388 | except ToolExecutionError as exc:
389 | output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
390 | else:
391 | output = json.loads(result[0].text)
392 | finally:
393 | shutil.rmtree(temp_dir, ignore_errors=True)
394 | assert output["status"] != "resend_prompt"
395 |
396 | @pytest.mark.asyncio
397 | async def test_prompt_file_read_error(self):
398 | """Test handling when prompt.txt can't be read."""
399 | from tests.mock_helpers import create_mock_provider
400 |
401 | tool = ChatTool()
402 | bad_file = "/nonexistent/prompt.txt"
403 |
404 | with (
405 | patch.object(tool, "get_model_provider") as mock_get_provider,
406 | patch("utils.model_context.ModelContext") as mock_model_context_class,
407 | ):
408 |
409 | mock_provider = create_mock_provider(model_name="gemini-2.5-flash", context_window=1_048_576)
410 | mock_provider.generate_content.return_value.content = "Success"
411 | mock_get_provider.return_value = mock_provider
412 |
413 | # Mock ModelContext to avoid the comparison issue
414 | from utils.model_context import TokenAllocation
415 |
416 | mock_model_context = MagicMock()
417 | mock_model_context.model_name = "gemini-2.5-flash"
418 | mock_model_context.calculate_token_allocation.return_value = TokenAllocation(
419 | total_tokens=1_048_576,
420 | content_tokens=838_861,
421 | response_tokens=209_715,
422 | file_tokens=335_544,
423 | history_tokens=335_544,
424 | )
425 | mock_model_context_class.return_value = mock_model_context
426 |
427 | # Should continue with empty prompt when file can't be read
428 | temp_dir = tempfile.mkdtemp()
429 | try:
430 | try:
431 | result = await tool.execute(
432 | {"prompt": "", "absolute_file_paths": [bad_file], "working_directory_absolute_path": temp_dir}
433 | )
434 | except ToolExecutionError as exc:
435 | output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
436 | else:
437 | output = json.loads(result[0].text)
438 | finally:
439 | shutil.rmtree(temp_dir, ignore_errors=True)
440 | assert output["status"] != "resend_prompt"
441 |
442 | @pytest.mark.asyncio
443 | async def test_large_file_context_does_not_trigger_mcp_prompt_limit(self, tmp_path):
444 | """Large context files should not be blocked by MCP prompt limit enforcement."""
445 | from tests.mock_helpers import create_mock_provider
446 | from utils.model_context import TokenAllocation
447 |
448 | tool = ChatTool()
449 |
450 | # Create a file significantly larger than MCP_PROMPT_SIZE_LIMIT characters
451 | large_content = "A" * (MCP_PROMPT_SIZE_LIMIT * 5)
452 | large_file = tmp_path / "huge_context.txt"
453 | large_file.write_text(large_content)
454 |
455 | mock_provider = create_mock_provider(model_name="flash")
456 |
457 | class DummyModelContext:
458 | def __init__(self, provider):
459 | self.model_name = "flash"
460 | self._provider = provider
461 | self.capabilities = provider.get_capabilities("flash")
462 |
463 | @property
464 | def provider(self):
465 | return self._provider
466 |
467 | def calculate_token_allocation(self):
468 | return TokenAllocation(
469 | total_tokens=1_048_576,
470 | content_tokens=838_861,
471 | response_tokens=209_715,
472 | file_tokens=335_544,
473 | history_tokens=335_544,
474 | )
475 |
476 | dummy_context = DummyModelContext(mock_provider)
477 |
478 | with patch.object(tool, "get_model_provider", return_value=mock_provider):
479 | result = await tool.execute(
480 | {
481 | "prompt": "Summarize the design decisions",
482 | "absolute_file_paths": [str(large_file)],
483 | "model": "flash",
484 | "working_directory_absolute_path": str(tmp_path),
485 | "_model_context": dummy_context,
486 | }
487 | )
488 |
489 | output = json.loads(result[0].text)
490 | assert output["status"] != "resend_prompt"
491 |
492 | @pytest.mark.asyncio
493 | async def test_mcp_boundary_with_large_internal_context(self):
494 | """
495 | Critical test: Ensure MCP_PROMPT_SIZE_LIMIT only applies to user input (MCP boundary),
496 | NOT to internal context like conversation history, system prompts, or file content.
497 |
498 | This test verifies that even if our internal prompt (with system prompts, history, etc.)
499 | exceeds MCP_PROMPT_SIZE_LIMIT, it should still work as long as the user's input is small.
500 | """
501 |
502 | tool = ChatTool()
503 |
504 | # Small user input that should pass MCP boundary check
505 | small_user_prompt = "What is the weather like?"
506 |
507 | # Mock a huge conversation history that would exceed MCP limits if incorrectly checked
508 | huge_history = "x" * (MCP_PROMPT_SIZE_LIMIT * 2) # 100K chars = way over 50K limit
509 |
510 | temp_dir = tempfile.mkdtemp()
511 | original_prepare_prompt = tool.prepare_prompt
512 |
513 | try:
514 | with (
515 | patch.object(tool, "get_model_provider") as mock_get_provider,
516 | patch("utils.model_context.ModelContext") as mock_model_context_class,
517 | ):
518 | from tests.mock_helpers import create_mock_provider
519 | from utils.model_context import TokenAllocation
520 |
521 | mock_provider = create_mock_provider(model_name="flash")
522 | mock_get_provider.return_value = mock_provider
523 |
524 | mock_model_context = MagicMock()
525 | mock_model_context.model_name = "flash"
526 | mock_model_context.provider = mock_provider
527 | mock_model_context.calculate_token_allocation.return_value = TokenAllocation(
528 | total_tokens=1_048_576,
529 | content_tokens=838_861,
530 | response_tokens=209_715,
531 | file_tokens=335_544,
532 | history_tokens=335_544,
533 | )
534 | mock_model_context_class.return_value = mock_model_context
535 |
536 | async def mock_prepare_prompt(request):
537 | normal_prompt = await original_prepare_prompt(request)
538 | huge_internal_prompt = f"{normal_prompt}\n\n=== HUGE INTERNAL CONTEXT ===\n{huge_history}"
539 | assert len(huge_internal_prompt) > MCP_PROMPT_SIZE_LIMIT
540 | return huge_internal_prompt
541 |
542 | tool.prepare_prompt = mock_prepare_prompt
543 |
544 | result = await tool.execute(
545 | {"prompt": small_user_prompt, "model": "flash", "working_directory_absolute_path": temp_dir}
546 | )
547 | output = json.loads(result[0].text)
548 |
549 | assert output["status"] != "resend_prompt"
550 |
551 | mock_provider.generate_content.assert_called_once()
552 | call_kwargs = mock_provider.generate_content.call_args[1]
553 | actual_prompt = call_kwargs.get("prompt")
554 |
555 | assert len(actual_prompt) > MCP_PROMPT_SIZE_LIMIT
556 | assert huge_history in actual_prompt
557 | assert small_user_prompt in actual_prompt
558 | finally:
559 | tool.prepare_prompt = original_prepare_prompt
560 | shutil.rmtree(temp_dir, ignore_errors=True)
561 |
562 | @pytest.mark.asyncio
563 | async def test_mcp_boundary_vs_internal_processing_distinction(self):
564 | """
565 | Test that clearly demonstrates the distinction between:
566 | 1. MCP transport boundary (user input - SHOULD be limited)
567 | 2. Internal processing (system prompts, files, history - should NOT be limited)
568 | """
569 | tool = ChatTool()
570 |
571 | # Test case 1: Large user input should fail at MCP boundary
572 | large_user_input = "x" * (MCP_PROMPT_SIZE_LIMIT + 1000)
573 | temp_dir = tempfile.mkdtemp()
574 | try:
575 | try:
576 | result = await tool.execute(
577 | {"prompt": large_user_input, "model": "flash", "working_directory_absolute_path": temp_dir}
578 | )
579 | except ToolExecutionError as exc:
580 | output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
581 | else:
582 | output = json.loads(result[0].text)
583 |
584 | assert output["status"] == "resend_prompt" # Should fail
585 | assert "too large for MCP's token limits" in output["content"]
586 |
587 | # Test case 2: Small user input should succeed even with huge internal processing
588 | small_user_input = "Hello"
589 |
590 | try:
591 | result = await tool.execute(
592 | {
593 | "prompt": small_user_input,
594 | "model": "gemini-2.5-flash",
595 | "working_directory_absolute_path": temp_dir,
596 | }
597 | )
598 | except ToolExecutionError as exc:
599 | output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
600 | else:
601 | output = json.loads(result[0].text)
602 |
603 | # The test will fail with dummy API keys, which is expected behavior
604 | # We're mainly testing that the tool processes small prompts correctly without size errors
605 | assert output["status"] != "resend_prompt"
606 | finally:
607 | shutil.rmtree(temp_dir, ignore_errors=True)
608 |
609 | @pytest.mark.asyncio
610 | async def test_continuation_with_huge_conversation_history(self):
611 | """
612 | Test that continuation calls with huge conversation history work correctly.
613 | This simulates the exact scenario where conversation history builds up and exceeds
614 | MCP_PROMPT_SIZE_LIMIT but should still work since history is internal processing.
615 | """
616 | tool = ChatTool()
617 |
618 | # Small user input for continuation
619 | small_continuation_prompt = "Continue the discussion"
620 |
621 | # Mock huge conversation history (simulates many turns of conversation)
622 | # Calculate repetitions needed to exceed MCP_PROMPT_SIZE_LIMIT
623 | base_text = "=== CONVERSATION HISTORY ===\n"
624 | repeat_text = "Previous message content\n"
625 | # Add buffer to ensure we exceed the limit
626 | target_size = MCP_PROMPT_SIZE_LIMIT + 1000
627 | available_space = target_size - len(base_text)
628 | repetitions_needed = (available_space // len(repeat_text)) + 1
629 |
630 | huge_conversation_history = base_text + (repeat_text * repetitions_needed)
631 |
632 | # Ensure the history exceeds MCP limits
633 | assert len(huge_conversation_history) > MCP_PROMPT_SIZE_LIMIT
634 |
635 | temp_dir = tempfile.mkdtemp()
636 |
637 | with (
638 | patch.object(tool, "get_model_provider") as mock_get_provider,
639 | patch("utils.model_context.ModelContext") as mock_model_context_class,
640 | ):
641 | from tests.mock_helpers import create_mock_provider
642 |
643 | mock_provider = create_mock_provider(model_name="flash")
644 | mock_provider.generate_content.return_value.content = "Continuing our conversation..."
645 | mock_get_provider.return_value = mock_provider
646 |
647 | # Mock ModelContext to avoid the comparison issue
648 | from utils.model_context import TokenAllocation
649 |
650 | mock_model_context = MagicMock()
651 | mock_model_context.model_name = "flash"
652 | mock_model_context.provider = mock_provider
653 | mock_model_context.calculate_token_allocation.return_value = TokenAllocation(
654 | total_tokens=1_048_576,
655 | content_tokens=838_861,
656 | response_tokens=209_715,
657 | file_tokens=335_544,
658 | history_tokens=335_544,
659 | )
660 | mock_model_context_class.return_value = mock_model_context
661 |
662 | # Simulate continuation by having the request contain embedded conversation history
663 | # This mimics what server.py does when it embeds conversation history
664 | request_with_history = {
665 | "prompt": f"{huge_conversation_history}\n\n=== CURRENT REQUEST ===\n{small_continuation_prompt}",
666 | "model": "flash",
667 | "continuation_id": "test_thread_123",
668 | "working_directory_absolute_path": temp_dir,
669 | }
670 |
671 | # Mock the conversation history embedding to simulate server.py behavior
672 | original_execute = tool.__class__.execute
673 |
674 | async def mock_execute_with_history(self, arguments):
675 | # Check if this has continuation_id (simulating server.py logic)
676 | if arguments.get("continuation_id"):
677 | # Simulate the case where conversation history is already embedded in prompt
678 | # by server.py before calling the tool
679 | field_value = arguments.get("prompt", "")
680 | if "=== CONVERSATION HISTORY ===" in field_value:
681 | # Set the flag that history is embedded
682 | self._has_embedded_history = True
683 |
684 | # The prompt field contains both history AND user input
685 | # But we should only check the user input part for MCP boundary
686 | # (This is what our fix ensures happens in prepare_prompt)
687 |
688 | # Call original execute
689 | return await original_execute(self, arguments)
690 |
691 | tool.__class__.execute = mock_execute_with_history
692 |
693 | try:
694 | # This should succeed because:
695 | # 1. The actual user input is small (passes MCP boundary check)
696 | # 2. The huge conversation history is internal processing (not subject to MCP limits)
697 | result = await tool.execute(request_with_history)
698 | output = json.loads(result[0].text)
699 |
700 | # Should succeed even though total prompt with history is huge
701 | assert output["status"] != "resend_prompt"
702 | assert "Continuing our conversation" in output["content"]
703 |
704 | # Verify the model was called with the complete prompt (including huge history)
705 | mock_provider.generate_content.assert_called_once()
706 | call_kwargs = mock_provider.generate_content.call_args[1]
707 | final_prompt = call_kwargs.get("prompt")
708 |
709 | # The final prompt should contain both history and user input
710 | assert huge_conversation_history in final_prompt
711 | assert small_continuation_prompt in final_prompt
712 | # And it should be huge (proving we don't limit internal processing)
713 | assert len(final_prompt) > MCP_PROMPT_SIZE_LIMIT
714 |
715 | finally:
716 | # Restore original execute method
717 | tool.__class__.execute = original_execute
718 | shutil.rmtree(temp_dir, ignore_errors=True)
719 |
720 |
721 | if __name__ == "__main__":
722 | pytest.main([__file__, "-v"])
723 |
```
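The tests above assert on a specific `resend_prompt` payload without showing where it is produced. Below is a minimal sketch of the MCP-boundary check those assertions imply; the field names and wording are taken from the assertions, the 50,000-character value follows the module docstring, and the actual check lives in the shared tool base classes with `MCP_PROMPT_SIZE_LIMIT` defined in `config.py`.

```python
from typing import Optional

MCP_PROMPT_SIZE_LIMIT = 50_000  # per the module docstring above; the canonical value is in config.py


def check_mcp_boundary(user_prompt: str) -> Optional[dict]:
    """Sketch: flag only the user-supplied prompt that crossed the MCP transport.

    Internal additions (system prompts, conversation history, embedded file content)
    are deliberately not counted here, which is what the boundary tests verify.
    """
    if len(user_prompt) <= MCP_PROMPT_SIZE_LIMIT:
        return None
    return {
        "status": "resend_prompt",
        "content": (
            f"The prompt is too large for MCP's token limits ({MCP_PROMPT_SIZE_LIMIT:,} characters max). "
            "Please save it to a prompt.txt file and resend the request with that file attached."
        ),
        "metadata": {"prompt_size": len(user_prompt), "limit": MCP_PROMPT_SIZE_LIMIT},
    }


assert check_mcp_boundary("hello") is None
assert check_mcp_boundary("x" * (MCP_PROMPT_SIZE_LIMIT + 1))["status"] == "resend_prompt"
```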
--------------------------------------------------------------------------------
/tools/refactor.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Refactor tool - Step-by-step refactoring analysis with expert validation
3 |
4 | This tool provides a structured workflow for comprehensive code refactoring analysis.
5 | It guides the CLI agent through systematic investigation steps with forced pauses between each step
6 | to ensure thorough code examination, refactoring opportunity identification, and quality
7 | assessment before proceeding. The tool supports complex refactoring scenarios including
8 | code smell detection, decomposition planning, modernization opportunities, and organization improvements.
9 |
10 | Key features:
11 | - Step-by-step refactoring investigation workflow with progress tracking
12 | - Context-aware file embedding (references during investigation, full content for analysis)
13 | - Automatic refactoring opportunity tracking with type and severity classification
14 | - Expert analysis integration with external models
15 | - Support for focused refactoring types (codesmells, decompose, modernize, organization)
16 | - Confidence-based workflow optimization with refactor completion tracking
17 | """
18 |
19 | import logging
20 | from typing import TYPE_CHECKING, Any, Literal, Optional
21 |
22 | from pydantic import Field, model_validator
23 |
24 | if TYPE_CHECKING:
25 | from tools.models import ToolModelCategory
26 |
27 | from config import TEMPERATURE_ANALYTICAL
28 | from systemprompts import REFACTOR_PROMPT
29 | from tools.shared.base_models import WorkflowRequest
30 |
31 | from .workflow.base import WorkflowTool
32 |
33 | logger = logging.getLogger(__name__)
34 |
35 | # Tool-specific field descriptions for refactor tool
36 | REFACTOR_FIELD_DESCRIPTIONS = {
37 | "step": (
38 | "The refactoring plan. Step 1: State strategy. Later steps: Report findings. "
39 | "CRITICAL: Examine code for smells, and opportunities for decomposition, modernization, and organization. "
40 | "Use 'relevant_files' for code. FORBIDDEN: Large code snippets."
41 | ),
42 | "step_number": (
43 | "The index of the current step in the refactoring investigation sequence, beginning at 1. Each step should "
44 | "build upon or revise the previous one."
45 | ),
46 | "total_steps": (
47 | "Your current estimate for how many steps will be needed to complete the refactoring investigation. "
48 | "Adjust as new opportunities emerge."
49 | ),
50 | "next_step_required": (
51 | "Set to true if you plan to continue the investigation with another step. False means you believe the "
52 | "refactoring analysis is complete and ready for expert validation."
53 | ),
54 | "findings": (
55 | "Summary of discoveries from this step, including code smells and opportunities for decomposition, modernization, or organization. "
56 | "Document both strengths and weaknesses. In later steps, confirm or update past findings."
57 | ),
58 | "files_checked": (
59 | "List all files examined (absolute paths). Include even ruled-out files to track exploration path."
60 | ),
61 | "relevant_files": (
62 | "Subset of files_checked with code requiring refactoring (absolute paths). Include files with "
63 | "code smells, decomposition needs, or improvement opportunities."
64 | ),
65 | "relevant_context": (
66 | "List methods/functions central to refactoring opportunities, in 'ClassName.methodName' or 'functionName' format. "
67 | "Prioritize those with code smells or needing improvement."
68 | ),
69 | "issues_found": (
70 | "Refactoring opportunities as dictionaries with 'severity' (critical/high/medium/low), "
71 | "'type' (codesmells/decompose/modernize/organization), and 'description'. "
72 | "Include all improvement opportunities found."
73 | ),
74 | "confidence": (
75 | "Your confidence in refactoring analysis: exploring (starting), incomplete (significant work remaining), "
76 | "partial (some opportunities found, more analysis needed), complete (comprehensive analysis finished, "
77 | "all major opportunities identified). "
78 | "WARNING: Use 'complete' ONLY when fully analyzed and can provide recommendations without expert help. "
79 | "'complete' PREVENTS expert validation. Use 'partial' for large files or uncertain analysis."
80 | ),
81 | "images": (
82 | "Optional list of absolute paths to architecture diagrams, UI mockups, design documents, or visual references "
83 | "that help with refactoring context. Only include if they materially assist understanding or assessment."
84 | ),
85 | "refactor_type": "Type of refactoring analysis to perform (codesmells, decompose, modernize, organization)",
86 | "focus_areas": "Specific areas to focus on (e.g., 'performance', 'readability', 'maintainability', 'security')",
87 | "style_guide_examples": (
88 | "Optional existing code files to use as style/pattern reference (must be FULL absolute paths to real files / "
89 | "folders - DO NOT SHORTEN). These files represent the target coding style and patterns for the project."
90 | ),
91 | }
92 |
93 |
94 | class RefactorRequest(WorkflowRequest):
95 | """Request model for refactor workflow investigation steps"""
96 |
97 | # Required fields for each investigation step
98 | step: str = Field(..., description=REFACTOR_FIELD_DESCRIPTIONS["step"])
99 | step_number: int = Field(..., description=REFACTOR_FIELD_DESCRIPTIONS["step_number"])
100 | total_steps: int = Field(..., description=REFACTOR_FIELD_DESCRIPTIONS["total_steps"])
101 | next_step_required: bool = Field(..., description=REFACTOR_FIELD_DESCRIPTIONS["next_step_required"])
102 |
103 | # Investigation tracking fields
104 | findings: str = Field(..., description=REFACTOR_FIELD_DESCRIPTIONS["findings"])
105 | files_checked: list[str] = Field(default_factory=list, description=REFACTOR_FIELD_DESCRIPTIONS["files_checked"])
106 | relevant_files: list[str] = Field(default_factory=list, description=REFACTOR_FIELD_DESCRIPTIONS["relevant_files"])
107 | relevant_context: list[str] = Field(
108 | default_factory=list, description=REFACTOR_FIELD_DESCRIPTIONS["relevant_context"]
109 | )
110 | issues_found: list[dict] = Field(default_factory=list, description=REFACTOR_FIELD_DESCRIPTIONS["issues_found"])
111 | confidence: Optional[Literal["exploring", "incomplete", "partial", "complete"]] = Field(
112 | "incomplete", description=REFACTOR_FIELD_DESCRIPTIONS["confidence"]
113 | )
114 |
115 | # Optional images for visual context
116 | images: Optional[list[str]] = Field(default=None, description=REFACTOR_FIELD_DESCRIPTIONS["images"])
117 |
118 | # Refactor-specific fields (only used in step 1 to initialize)
119 | refactor_type: Optional[Literal["codesmells", "decompose", "modernize", "organization"]] = Field(
120 | "codesmells", description=REFACTOR_FIELD_DESCRIPTIONS["refactor_type"]
121 | )
122 | focus_areas: Optional[list[str]] = Field(None, description=REFACTOR_FIELD_DESCRIPTIONS["focus_areas"])
123 | style_guide_examples: Optional[list[str]] = Field(
124 | None, description=REFACTOR_FIELD_DESCRIPTIONS["style_guide_examples"]
125 | )
126 |
127 | # Override inherited fields to exclude them from schema (except model which needs to be available)
128 | temperature: Optional[float] = Field(default=None, exclude=True)
129 | thinking_mode: Optional[str] = Field(default=None, exclude=True)
130 |
131 | @model_validator(mode="after")
132 | def validate_step_one_requirements(self):
133 | """Ensure step 1 has required relevant_files field."""
134 | if self.step_number == 1 and not self.relevant_files:
135 | raise ValueError(
136 | "Step 1 requires 'relevant_files' field to specify code files or directories to analyze for refactoring"
137 | )
138 | return self
139 |
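# Illustrative sketch (not part of the original module): a minimal step-1 request body.
# Per validate_step_one_requirements above, step 1 must include 'relevant_files'; the
# remaining required workflow fields come from WorkflowRequest. Path and wording are placeholders.
_EXAMPLE_STEP_ONE_REQUEST: dict[str, Any] = {
    "step": "Survey the billing package for decomposition opportunities",
    "step_number": 1,
    "total_steps": 3,
    "next_step_required": True,
    "findings": "Initial pass not yet performed",
    "relevant_files": ["/abs/path/to/billing/invoices.py"],
    "refactor_type": "decompose",
}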
140 |
141 | class RefactorTool(WorkflowTool):
142 | """
143 | Refactor tool for step-by-step refactoring analysis and expert validation.
144 |
145 | This tool implements a structured refactoring workflow that guides users through
146 | methodical investigation steps, ensuring thorough code examination, refactoring opportunity
147 | identification, and improvement assessment before reaching conclusions. It supports complex
148 | refactoring scenarios including code smell detection, decomposition planning, modernization
149 | opportunities, and organization improvements.
150 | """
151 |
152 | def __init__(self):
153 | super().__init__()
154 | self.initial_request = None
155 | self.refactor_config = {}
156 |
157 | def get_name(self) -> str:
158 | return "refactor"
159 |
160 | def get_description(self) -> str:
161 | return (
162 | "Analyzes code for refactoring opportunities with systematic investigation. "
163 | "Use for code smell detection, decomposition planning, modernization, and maintainability improvements. "
164 | "Guides through structured analysis with expert validation."
165 | )
166 |
167 | def get_system_prompt(self) -> str:
168 | return REFACTOR_PROMPT
169 |
170 | def get_default_temperature(self) -> float:
171 | return TEMPERATURE_ANALYTICAL
172 |
173 | def get_model_category(self) -> "ToolModelCategory":
174 | """Refactor workflow requires thorough analysis and reasoning"""
175 | from tools.models import ToolModelCategory
176 |
177 | return ToolModelCategory.EXTENDED_REASONING
178 |
179 | def get_workflow_request_model(self):
180 | """Return the refactor workflow-specific request model."""
181 | return RefactorRequest
182 |
183 | def get_input_schema(self) -> dict[str, Any]:
184 | """Generate input schema using WorkflowSchemaBuilder with refactor-specific overrides."""
185 | from .workflow.schema_builders import WorkflowSchemaBuilder
186 |
187 | # Refactor workflow-specific field overrides
188 | refactor_field_overrides = {
189 | "step": {
190 | "type": "string",
191 | "description": REFACTOR_FIELD_DESCRIPTIONS["step"],
192 | },
193 | "step_number": {
194 | "type": "integer",
195 | "minimum": 1,
196 | "description": REFACTOR_FIELD_DESCRIPTIONS["step_number"],
197 | },
198 | "total_steps": {
199 | "type": "integer",
200 | "minimum": 1,
201 | "description": REFACTOR_FIELD_DESCRIPTIONS["total_steps"],
202 | },
203 | "next_step_required": {
204 | "type": "boolean",
205 | "description": REFACTOR_FIELD_DESCRIPTIONS["next_step_required"],
206 | },
207 | "findings": {
208 | "type": "string",
209 | "description": REFACTOR_FIELD_DESCRIPTIONS["findings"],
210 | },
211 | "files_checked": {
212 | "type": "array",
213 | "items": {"type": "string"},
214 | "description": REFACTOR_FIELD_DESCRIPTIONS["files_checked"],
215 | },
216 | "relevant_files": {
217 | "type": "array",
218 | "items": {"type": "string"},
219 | "description": REFACTOR_FIELD_DESCRIPTIONS["relevant_files"],
220 | },
221 | "confidence": {
222 | "type": "string",
223 | "enum": ["exploring", "incomplete", "partial", "complete"],
224 | "default": "incomplete",
225 | "description": REFACTOR_FIELD_DESCRIPTIONS["confidence"],
226 | },
227 | "issues_found": {
228 | "type": "array",
229 | "items": {"type": "object"},
230 | "description": REFACTOR_FIELD_DESCRIPTIONS["issues_found"],
231 | },
232 | "images": {
233 | "type": "array",
234 | "items": {"type": "string"},
235 | "description": REFACTOR_FIELD_DESCRIPTIONS["images"],
236 | },
237 | # Refactor-specific fields (for step 1)
238 | # Note: Use relevant_files field instead of files for consistency
239 | "refactor_type": {
240 | "type": "string",
241 | "enum": ["codesmells", "decompose", "modernize", "organization"],
242 | "default": "codesmells",
243 | "description": REFACTOR_FIELD_DESCRIPTIONS["refactor_type"],
244 | },
245 | "focus_areas": {
246 | "type": "array",
247 | "items": {"type": "string"},
248 | "description": REFACTOR_FIELD_DESCRIPTIONS["focus_areas"],
249 | },
250 | "style_guide_examples": {
251 | "type": "array",
252 | "items": {"type": "string"},
253 | "description": REFACTOR_FIELD_DESCRIPTIONS["style_guide_examples"],
254 | },
255 | }
256 |
257 | # Use WorkflowSchemaBuilder with refactor-specific tool fields
258 | return WorkflowSchemaBuilder.build_schema(
259 | tool_specific_fields=refactor_field_overrides,
260 | model_field_schema=self.get_model_field_schema(),
261 | auto_mode=self.is_effective_auto_mode(),
262 | tool_name=self.get_name(),
263 | )
264 |
265 | def get_required_actions(
266 | self, step_number: int, confidence: str, findings: str, total_steps: int, request=None
267 | ) -> list[str]:
268 | """Define required actions for each investigation phase."""
269 | if step_number == 1:
270 | # Initial refactoring investigation tasks
271 | return [
272 | "Read and understand the code files specified for refactoring analysis",
273 | "Examine the overall structure, architecture, and design patterns used",
274 | "Identify potential code smells: long methods, large classes, duplicate code, complex conditionals",
275 | "Look for decomposition opportunities: oversized components that could be broken down",
276 | "Check for modernization opportunities: outdated patterns, deprecated features, newer language constructs",
277 | "Assess organization: logical grouping, file structure, naming conventions, module boundaries",
278 | "Document specific refactoring opportunities with file locations and line numbers",
279 | ]
280 | elif confidence in ["exploring", "incomplete"]:
281 | # Need deeper investigation
282 | return [
283 | "Examine specific code sections you've identified as needing refactoring",
284 | "Analyze code smells in detail: complexity, coupling, cohesion issues",
285 | "Investigate decomposition opportunities: identify natural breaking points for large components",
286 | "Look for modernization possibilities: language features, patterns, libraries that could improve the code",
287 | "Check organization issues: related functionality that could be better grouped or structured",
288 | "Trace dependencies and relationships between components to understand refactoring impact",
289 | "Prioritize refactoring opportunities by impact and effort required",
290 | ]
291 | elif confidence == "partial":
292 | # Close to completion - need final verification
293 | return [
294 | "Verify all identified refactoring opportunities have been properly documented with locations",
295 | "Check for any missed opportunities in areas not yet thoroughly examined",
296 | "Confirm that refactoring suggestions align with the specified refactor_type and focus_areas",
297 | "Ensure refactoring opportunities are prioritized by severity and impact",
298 | "Validate that proposed changes would genuinely improve code quality without breaking functionality",
299 | "Double-check that all relevant files and code elements are captured in your analysis",
300 | ]
301 | else:
302 | # General investigation needed
303 | return [
304 | "Continue examining the codebase for additional refactoring opportunities",
305 | "Gather more evidence using appropriate code analysis techniques",
306 | "Test your assumptions about code quality and improvement possibilities",
307 | "Look for patterns that confirm or refute your current refactoring assessment",
308 | "Focus on areas that haven't been thoroughly examined for refactoring potential",
309 | ]
310 |
311 | def should_call_expert_analysis(self, consolidated_findings, request=None) -> bool:
312 | """
313 | Decide when to call external model based on investigation completeness.
314 |
315 | Don't call expert analysis if the CLI agent reports 'complete' confidence in the refactoring analysis - trust its judgment.
316 | """
317 | # Check if user requested to skip assistant model
318 | if request and not self.get_request_use_assistant_model(request):
319 | return False
320 |
321 | # Check if refactoring work is complete
322 | if request and request.confidence == "complete":
323 | return False
324 |
325 | # Check if we have meaningful investigation data
326 | return (
327 | len(consolidated_findings.relevant_files) > 0
328 | or len(consolidated_findings.findings) >= 2
329 | or len(consolidated_findings.issues_found) > 0
330 | )
331 |
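# Editorial usage sketch (added for illustration; not part of the source file).
# Assuming a workflow request object with the fields used above:
#     should_call_expert_analysis(findings, request) -> False when
#         request.confidence == "complete" or the assistant model is disabled;
#     otherwise -> True once at least one relevant file, two findings,
#         or one refactoring opportunity has been recorded.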
332 | def prepare_expert_analysis_context(self, consolidated_findings) -> str:
333 | """Prepare context for external model call for final refactoring validation."""
334 | context_parts = [
335 | f"=== REFACTORING ANALYSIS REQUEST ===\\n{self.initial_request or 'Refactoring workflow initiated'}\\n=== END REQUEST ==="
336 | ]
337 |
338 | # Add investigation summary
339 | investigation_summary = self._build_refactoring_summary(consolidated_findings)
340 | context_parts.append(
341 | f"\\n=== AGENT'S REFACTORING INVESTIGATION ===\\n{investigation_summary}\\n=== END INVESTIGATION ==="
342 | )
343 |
344 | # Add refactor configuration context if available
345 | if self.refactor_config:
346 | config_text = "\n".join(f"- {key}: {value}" for key, value in self.refactor_config.items() if value)
347 | context_parts.append(f"\n=== REFACTOR CONFIGURATION ===\n{config_text}\n=== END CONFIGURATION ===")
348 |
349 | # Add relevant code elements if available
350 | if consolidated_findings.relevant_context:
351 | methods_text = "\n".join(f"- {method}" for method in consolidated_findings.relevant_context)
352 | context_parts.append(f"\n=== RELEVANT CODE ELEMENTS ===\n{methods_text}\n=== END CODE ELEMENTS ===")
353 |
354 | # Add refactoring opportunities found if available
355 | if consolidated_findings.issues_found:
356 | opportunities_text = "\n".join(
357 | f"[{issue.get('severity', 'unknown').upper()}] {issue.get('type', 'unknown').upper()}: {issue.get('description', 'No description')}"
358 | for issue in consolidated_findings.issues_found
359 | )
360 | context_parts.append(
361 | f"\\n=== REFACTORING OPPORTUNITIES ===\\n{opportunities_text}\\n=== END OPPORTUNITIES ==="
362 | )
363 |
364 | # Add assessment evolution if available
365 | if consolidated_findings.hypotheses:
366 | assessments_text = "\n".join(
367 | f"Step {h['step']} ({h['confidence']} confidence): {h['hypothesis']}"
368 | for h in consolidated_findings.hypotheses
369 | )
370 | context_parts.append(f"\\n=== ASSESSMENT EVOLUTION ===\\n{assessments_text}\\n=== END ASSESSMENTS ===")
371 |
372 | # Add images if available
373 | if consolidated_findings.images:
374 | images_text = "\n".join(f"- {img}" for img in consolidated_findings.images)
375 | context_parts.append(
376 | f"\\n=== VISUAL REFACTORING INFORMATION ===\\n{images_text}\\n=== END VISUAL INFORMATION ==="
377 | )
378 |
379 | return "\\n".join(context_parts)
380 |
381 | def _build_refactoring_summary(self, consolidated_findings) -> str:
382 | """Prepare a comprehensive summary of the refactoring investigation."""
383 | summary_parts = [
384 | "=== SYSTEMATIC REFACTORING INVESTIGATION SUMMARY ===",
385 | f"Total steps: {len(consolidated_findings.findings)}",
386 | f"Files examined: {len(consolidated_findings.files_checked)}",
387 | f"Relevant files identified: {len(consolidated_findings.relevant_files)}",
388 | f"Code elements analyzed: {len(consolidated_findings.relevant_context)}",
389 | f"Refactoring opportunities identified: {len(consolidated_findings.issues_found)}",
390 | "",
391 | "=== INVESTIGATION PROGRESSION ===",
392 | ]
393 |
394 | for finding in consolidated_findings.findings:
395 | summary_parts.append(finding)
396 |
397 | return "\\n".join(summary_parts)
398 |
399 | def should_include_files_in_expert_prompt(self) -> bool:
400 | """Include files in expert analysis for comprehensive refactoring validation."""
401 | return True
402 |
403 | def should_embed_system_prompt(self) -> bool:
404 | """Embed system prompt in expert analysis for proper context."""
405 | return True
406 |
407 | def get_expert_thinking_mode(self) -> str:
408 | """Use high thinking mode for thorough refactoring analysis."""
409 | return "high"
410 |
411 | def get_expert_analysis_instruction(self) -> str:
412 | """Get specific instruction for refactoring expert analysis."""
413 | return (
414 | "Please provide comprehensive refactoring analysis based on the investigation findings. "
415 | "Focus on validating the identified opportunities, ensuring completeness of the analysis, "
416 | "and providing final recommendations for refactoring implementation, following the structured "
417 | "format specified in the system prompt."
418 | )
419 |
420 | # Hook method overrides for refactor-specific behavior
421 |
422 | def prepare_step_data(self, request) -> dict:
423 | """
424 | Map refactor workflow-specific fields for internal processing.
425 | """
426 | step_data = {
427 | "step": request.step,
428 | "step_number": request.step_number,
429 | "findings": request.findings,
430 | "files_checked": request.files_checked,
431 | "relevant_files": request.relevant_files,
432 | "relevant_context": request.relevant_context,
433 | "issues_found": request.issues_found,
434 | "confidence": request.confidence,
435 | "hypothesis": request.findings, # Map findings to hypothesis for compatibility
436 | "images": request.images or [],
437 | }
438 | return step_data
439 |
440 | def should_skip_expert_analysis(self, request, consolidated_findings) -> bool:
441 | """
442 | Refactor workflow skips expert analysis when the CLI agent has "complete" confidence.
443 | """
444 | return request.confidence == "complete" and not request.next_step_required
445 |
446 | def store_initial_issue(self, step_description: str):
447 | """Store initial request for expert analysis."""
448 | self.initial_request = step_description
449 |
450 |
451 |
452 | # Override inheritance hooks for refactor-specific behavior
453 |
454 | def get_completion_status(self) -> str:
455 | """Refactor tools use refactor-specific status."""
456 | return "refactoring_analysis_complete_ready_for_implementation"
457 |
458 | def get_completion_data_key(self) -> str:
459 | """Refactor uses 'complete_refactoring' key."""
460 | return "complete_refactoring"
461 |
462 | def get_final_analysis_from_request(self, request):
463 | """Refactor tools use 'findings' field."""
464 | return request.findings
465 |
466 | def get_confidence_level(self, request) -> str:
467 | """Refactor tools use 'complete' for high confidence."""
468 | return "complete"
469 |
470 | def get_completion_message(self) -> str:
471 | """Refactor-specific completion message."""
472 | return (
473 | "Refactoring analysis complete with COMPLETE confidence. You have identified all significant "
474 | "refactoring opportunities and provided comprehensive analysis. MANDATORY: Present the user with "
475 | "the complete refactoring results organized by type and severity, and IMMEDIATELY proceed with "
476 | "implementing the highest priority refactoring opportunities or provide specific guidance for "
477 | "improvements. Focus on actionable refactoring steps."
478 | )
479 |
480 | def get_skip_reason(self) -> str:
481 | """Refactor-specific skip reason."""
482 | return "Completed comprehensive refactoring analysis with full confidence locally"
483 |
484 | def get_skip_expert_analysis_status(self) -> str:
485 | """Refactor-specific expert analysis skip status."""
486 | return "skipped_due_to_complete_refactoring_confidence"
487 |
488 | def prepare_work_summary(self) -> str:
489 | """Refactor-specific work summary."""
490 | return self._build_refactoring_summary(self.consolidated_findings)
491 |
492 | def get_completion_next_steps_message(self, expert_analysis_used: bool = False) -> str:
493 | """
494 | Refactor-specific completion message.
495 |
496 | Args:
497 | expert_analysis_used: True if expert analysis was successfully executed
498 | """
499 | base_message = (
500 | "REFACTORING ANALYSIS IS COMPLETE. You MUST now summarize and present ALL refactoring opportunities "
501 | "organized by type (codesmells → decompose → modernize → organization) and severity (Critical → High → "
502 | "Medium → Low), specific code locations with line numbers, and exact recommendations for improvement. "
503 | "Clearly prioritize the top 3 refactoring opportunities that need immediate attention. Provide concrete, "
504 | "actionable guidance for each opportunity—make it easy for a developer to understand exactly what needs "
505 | "to be refactored and how to implement the improvements."
506 | )
507 |
508 | # Add expert analysis guidance only when expert analysis was actually used
509 | if expert_analysis_used:
510 | expert_guidance = self.get_expert_analysis_guidance()
511 | if expert_guidance:
512 | return f"{base_message}\n\n{expert_guidance}"
513 |
514 | return base_message
515 |
516 | def get_expert_analysis_guidance(self) -> str:
517 | """
518 | Get additional guidance for handling expert analysis results in refactor context.
519 |
520 | Returns:
521 | Additional guidance text for validating and using expert analysis findings
522 | """
523 | return (
524 | "IMPORTANT: Expert refactoring analysis has been provided above. You MUST review "
525 | "the expert's architectural insights and refactoring recommendations. Consider whether "
526 | "the expert's suggestions align with the codebase's evolution trajectory and current "
527 | "team priorities. Pay special attention to any breaking changes, migration complexity, "
528 | "or performance implications highlighted by the expert. Present a balanced view that "
529 | "considers both immediate benefits and long-term maintainability."
530 | )
531 |
532 | def get_step_guidance_message(self, request) -> str:
533 | """
534 | Refactor-specific step guidance with detailed investigation instructions.
535 | """
536 | step_guidance = self.get_refactor_step_guidance(request.step_number, request.confidence, request)
537 | return step_guidance["next_steps"]
538 |
539 | def get_refactor_step_guidance(self, step_number: int, confidence: str, request) -> dict[str, Any]:
540 | """
541 | Provide step-specific guidance for refactor workflow.
542 | """
543 | # Generate the next steps instruction based on required actions
544 | required_actions = self.get_required_actions(step_number, confidence, request.findings, request.total_steps)
545 |
546 | if step_number == 1:
547 | next_steps = (
548 | f"MANDATORY: DO NOT call the {self.get_name()} tool again immediately. You MUST first examine "
549 | f"the code files thoroughly for refactoring opportunities using appropriate tools. CRITICAL AWARENESS: "
550 | f"You need to identify code smells, decomposition opportunities, modernization possibilities, and "
551 | f"organization improvements across the specified refactor_type. Look for complexity issues, outdated "
552 | f"patterns, oversized components, and structural problems. Use file reading tools, code analysis, and "
553 | f"systematic examination to gather comprehensive refactoring information. Only call {self.get_name()} "
554 | f"again AFTER completing your investigation. When you call {self.get_name()} next time, use "
555 | f"step_number: {step_number + 1} and report specific files examined, refactoring opportunities found, "
556 | f"and improvement assessments discovered."
557 | )
558 | elif confidence in ["exploring", "incomplete"]:
559 | next_steps = (
560 | f"STOP! Do NOT call {self.get_name()} again yet. Based on your findings, you've identified areas that need "
561 | f"deeper refactoring analysis. MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\\n"
562 | + "\\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
563 | + f"\\n\\nOnly call {self.get_name()} again with step_number: {step_number + 1} AFTER "
564 | + "completing these refactoring analysis tasks."
565 | )
566 | elif confidence == "partial":
567 | next_steps = (
568 | f"WAIT! Your refactoring analysis needs final verification. DO NOT call {self.get_name()} immediately. REQUIRED ACTIONS:\\n"
569 | + "\\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
570 | + f"\\n\\nREMEMBER: Ensure you have identified all significant refactoring opportunities across all types and "
571 | f"verified the completeness of your analysis. Document opportunities with specific file references and "
572 | f"line numbers where applicable, then call {self.get_name()} with step_number: {step_number + 1}."
573 | )
574 | else:
575 | next_steps = (
576 | f"PAUSE REFACTORING ANALYSIS. Before calling {self.get_name()} step {step_number + 1}, you MUST examine more code thoroughly. "
577 | + "Required: "
578 | + ", ".join(required_actions[:2])
579 | + ". "
580 | + f"Your next {self.get_name()} call (step_number: {step_number + 1}) must include "
581 | f"NEW evidence from actual refactoring analysis, not just theories. NO recursive {self.get_name()} calls "
582 | f"without investigation work!"
583 | )
584 |
585 | return {"next_steps": next_steps}
586 |
587 | def customize_workflow_response(self, response_data: dict, request) -> dict:
588 | """
589 | Customize response to match refactor workflow format.
590 | """
591 | # Store initial request on first step
592 | if request.step_number == 1:
593 | self.initial_request = request.step
594 | # Store refactor configuration for expert analysis
595 | if request.relevant_files:
596 | self.refactor_config = {
597 | "relevant_files": request.relevant_files,
598 | "refactor_type": request.refactor_type,
599 | "focus_areas": request.focus_areas,
600 | "style_guide_examples": request.style_guide_examples,
601 | }
602 |
603 | # Convert generic status names to refactor-specific ones
604 | tool_name = self.get_name()
605 | status_mapping = {
606 | f"{tool_name}_in_progress": "refactoring_analysis_in_progress",
607 | f"pause_for_{tool_name}": "pause_for_refactoring_analysis",
608 | f"{tool_name}_required": "refactoring_analysis_required",
609 | f"{tool_name}_complete": "refactoring_analysis_complete",
610 | }
611 |
612 | if response_data["status"] in status_mapping:
613 | response_data["status"] = status_mapping[response_data["status"]]
614 |
615 | # Rename status field to match refactor workflow
616 | if f"{tool_name}_status" in response_data:
617 | response_data["refactoring_status"] = response_data.pop(f"{tool_name}_status")
618 | # Add refactor-specific status fields
619 | refactor_types = {}
620 | for issue in self.consolidated_findings.issues_found:
621 | issue_type = issue.get("type", "unknown")
622 | if issue_type not in refactor_types:
623 | refactor_types[issue_type] = 0
624 | refactor_types[issue_type] += 1
625 | response_data["refactoring_status"]["opportunities_by_type"] = refactor_types
626 | response_data["refactoring_status"]["refactor_confidence"] = request.confidence
627 |
628 | # Map complete_refactor to complete_refactoring
629 | if f"complete_{tool_name}" in response_data:
630 | response_data["complete_refactoring"] = response_data.pop(f"complete_{tool_name}")
631 |
632 | # Map the completion flag to match refactor workflow
633 | if f"{tool_name}_complete" in response_data:
634 | response_data["refactoring_complete"] = response_data.pop(f"{tool_name}_complete")
635 |
636 | return response_data
637 |
638 | # Required abstract methods from BaseTool
639 | def get_request_model(self):
640 | """Return the refactor workflow-specific request model."""
641 | return RefactorRequest
642 |
643 | async def prepare_prompt(self, request) -> str:
644 | """Not used - workflow tools use execute_workflow()."""
645 | return "" # Workflow tools use execute_workflow() directly
646 |
```
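To make the status renaming in `customize_workflow_response` concrete, here is a minimal standalone sketch; the mapping mirrors the code above, while the sample `response_data` values are invented for illustration only.

```python
# Standalone sketch of the status renaming done by customize_workflow_response.
# The sample response_data dict is invented; the mapping mirrors the tool code.
tool_name = "refactor"
status_mapping = {
    f"{tool_name}_in_progress": "refactoring_analysis_in_progress",
    f"pause_for_{tool_name}": "pause_for_refactoring_analysis",
    f"{tool_name}_required": "refactoring_analysis_required",
    f"{tool_name}_complete": "refactoring_analysis_complete",
}

response_data = {"status": "pause_for_refactor", "refactor_status": {"files_checked": 3}}
if response_data["status"] in status_mapping:
    response_data["status"] = status_mapping[response_data["status"]]
if f"{tool_name}_status" in response_data:
    response_data["refactoring_status"] = response_data.pop(f"{tool_name}_status")

print(response_data)
# {'status': 'pause_for_refactoring_analysis', 'refactoring_status': {'files_checked': 3}}
```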
--------------------------------------------------------------------------------
/utils/file_utils.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | File reading utilities with directory support and token management
3 |
4 | This module provides secure file access functionality for the MCP server.
5 | It implements critical security measures to prevent unauthorized file access
6 | and manages token limits to ensure efficient API usage.
7 |
8 | Key Features:
9 | - Path validation and sandboxing to prevent directory traversal attacks
10 | - Support for both individual files and recursive directory reading
11 | - Token counting and management to stay within API limits
12 | - Automatic file type detection and filtering
13 | - Comprehensive error handling with informative messages
14 |
15 | Security Model:
16 | - All file access is restricted to PROJECT_ROOT and its subdirectories
17 | - Absolute paths are required to prevent ambiguity
18 | - Symbolic links are resolved to ensure they stay within bounds
19 |
20 | CONVERSATION MEMORY INTEGRATION:
21 | This module works with the conversation memory system to support efficient
22 | multi-turn file handling:
23 |
24 | 1. DEDUPLICATION SUPPORT:
25 | - File reading functions are called by conversation-aware tools
26 | - Supports newest-first file prioritization by providing accurate token estimation
27 | - Enables efficient file content caching and token budget management
28 |
29 | 2. TOKEN BUDGET OPTIMIZATION:
30 | - Provides accurate token estimation for file content before reading
31 | - Supports the dual prioritization strategy by enabling precise budget calculations
32 | - Enables tools to make informed decisions about which files to include
33 |
34 | 3. CROSS-TOOL FILE PERSISTENCE:
35 | - File reading results are used across different tools in conversation chains
36 | - Consistent file access patterns support conversation continuation scenarios
37 | - Error handling preserves conversation flow when files become unavailable
38 | """
39 |
40 | import json
41 | import logging
42 | import os
43 | from datetime import datetime, timezone
44 | from pathlib import Path
45 | from typing import Optional
46 |
47 | from .file_types import BINARY_EXTENSIONS, CODE_EXTENSIONS, IMAGE_EXTENSIONS, TEXT_EXTENSIONS
48 | from .security_config import EXCLUDED_DIRS, is_dangerous_path
49 | from .token_utils import DEFAULT_CONTEXT_WINDOW, estimate_tokens
50 |
51 |
52 | def _is_builtin_custom_models_config(path_str: str) -> bool:
53 | """
54 | Check if path points to the server's built-in custom_models.json config file.
55 |
56 | This only matches the server's internal config, not user-specified CUSTOM_MODELS_CONFIG_PATH.
57 | We identify the built-in config by checking if it resolves to the server's conf directory.
58 |
59 | Args:
60 | path_str: Path to check
61 |
62 | Returns:
63 | True if this is the server's built-in custom_models.json config file
64 | """
65 | try:
66 | path = Path(path_str)
67 |
68 | # Get the server root by going up from this file: utils/file_utils.py -> server_root
69 | server_root = Path(__file__).parent.parent
70 | builtin_config = server_root / "conf" / "custom_models.json"
71 |
72 | # Check if the path resolves to the same file as our built-in config
73 | # This handles both relative and absolute paths to the same file
74 | return path.resolve() == builtin_config.resolve()
75 |
76 | except Exception:
77 | # If path resolution fails, it's not our built-in config
78 | return False
79 |
80 |
81 | logger = logging.getLogger(__name__)
82 |
83 |
84 | def is_mcp_directory(path: Path) -> bool:
85 | """
86 | Check if a directory is the MCP server's own directory.
87 |
88 | This prevents the MCP from including its own code when scanning projects
89 | where the MCP has been cloned as a subdirectory.
90 |
91 | Args:
92 | path: Directory path to check
93 |
94 | Returns:
95 | True if this is the MCP server directory or a subdirectory
96 | """
97 | if not path.is_dir():
98 | return False
99 |
100 | # Get the directory where the MCP server is running from
101 | # __file__ is utils/file_utils.py, so parent.parent is the MCP root
102 | mcp_server_dir = Path(__file__).parent.parent.resolve()
103 |
104 | # Check if the given path is the MCP server directory or a subdirectory
105 | try:
106 | path.resolve().relative_to(mcp_server_dir)
107 | logger.info(f"Detected MCP server directory at {path}, will exclude from scanning")
108 | return True
109 | except ValueError:
110 | # Not a subdirectory of MCP server
111 | return False
112 |
113 |
114 | def get_user_home_directory() -> Optional[Path]:
115 | """
116 | Get the user's home directory.
117 |
118 | Returns:
119 | User's home directory path
120 | """
121 | return Path.home()
122 |
123 |
124 | def is_home_directory_root(path: Path) -> bool:
125 | """
126 | Check if the given path is the user's home directory root.
127 |
128 | This prevents scanning the entire home directory which could include
129 | sensitive data and non-project files.
130 |
131 | Args:
132 | path: Directory path to check
133 |
134 | Returns:
135 | True if this is the home directory root
136 | """
137 | user_home = get_user_home_directory()
138 | if not user_home:
139 | return False
140 |
141 | try:
142 | resolved_path = path.resolve()
143 | resolved_home = user_home.resolve()
144 |
145 | # Check if this is exactly the home directory
146 | if resolved_path == resolved_home:
147 | logger.warning(
148 | f"Attempted to scan user home directory root: {path}. Please specify a subdirectory instead."
149 | )
150 | return True
151 |
152 | # Also check common home directory patterns
153 | path_str = str(resolved_path).lower()
154 | home_patterns = [
155 | "/users/", # macOS
156 | "/home/", # Linux
157 | "c:\\users\\", # Windows
158 | "c:/users/", # Windows with forward slashes
159 | ]
160 |
161 | for pattern in home_patterns:
162 | if pattern in path_str:
163 | # Extract the user directory path
164 | # e.g., /Users/fahad or /home/username
165 | parts = path_str.split(pattern)
166 | if len(parts) > 1:
167 | # Get the part after the pattern
168 | after_pattern = parts[1]
169 | # Check if we're at the user's root (no subdirectories)
170 | if "/" not in after_pattern and "\\" not in after_pattern:
171 | logger.warning(
172 | f"Attempted to scan user home directory root: {path}. "
173 | f"Please specify a subdirectory instead."
174 | )
175 | return True
176 |
177 | except Exception as e:
178 | logger.debug(f"Error checking if path is home directory: {e}")
179 |
180 | return False
181 |
182 |
183 | def detect_file_type(file_path: str) -> str:
184 | """
185 | Detect file type for appropriate processing strategy.
186 |
187 | This function is intended for specific file type handling (e.g., image processing,
188 | binary file analysis, or enhanced file filtering).
189 |
190 | Args:
191 | file_path: Path to the file to analyze
192 |
193 | Returns:
194 | str: "text", "binary", or "image"
195 | """
196 | path = Path(file_path)
197 |
198 | # Check extension first (fast)
199 | extension = path.suffix.lower()
200 | if extension in TEXT_EXTENSIONS:
201 | return "text"
202 | elif extension in IMAGE_EXTENSIONS:
203 | return "image"
204 | elif extension in BINARY_EXTENSIONS:
205 | return "binary"
206 |
207 | # Fallback: sample the first bytes and try UTF-8 decoding to distinguish text from binary
208 | # This is helpful for files without extensions or unknown extensions
209 | try:
210 | with open(path, "rb") as f:
211 | chunk = f.read(1024)
212 | # Simple heuristic: if we can decode as UTF-8, likely text
213 | chunk.decode("utf-8")
214 | return "text"
215 | except UnicodeDecodeError:
216 | return "binary"
217 | except (FileNotFoundError, PermissionError) as e:
218 | logger.warning(f"Could not access file {file_path} for type detection: {e}")
219 | return "unknown"
220 |
221 |
222 | def should_add_line_numbers(file_path: str, include_line_numbers: Optional[bool] = None) -> bool:
223 | """
224 | Determine if line numbers should be added to a file.
225 |
226 | Args:
227 | file_path: Path to the file
228 | include_line_numbers: Explicit preference, or None for auto-detection
229 |
230 | Returns:
231 | bool: True if line numbers should be added
232 | """
233 | if include_line_numbers is not None:
234 | return include_line_numbers
235 |
236 | # Default: DO NOT add line numbers
237 | # Tools that want line numbers must explicitly request them
238 | return False
239 |
240 |
241 | def _normalize_line_endings(content: str) -> str:
242 | """
243 | Normalize line endings for consistent line numbering.
244 |
245 | Args:
246 | content: File content with potentially mixed line endings
247 |
248 | Returns:
249 | str: Content with normalized LF line endings
250 | """
251 | # Normalize all line endings to LF for consistent counting
252 | return content.replace("\r\n", "\n").replace("\r", "\n")
253 |
254 |
255 | def _add_line_numbers(content: str) -> str:
256 | """
257 | Add line numbers to text content for precise referencing.
258 |
259 | Args:
260 | content: Text content to number
261 |
262 | Returns:
263 | str: Content with line numbers in format " 45│ actual code line"
264 | Width is computed dynamically from the total line count (minimum 4 digits), so files of any length are supported
265 | """
266 | # Normalize line endings first
267 | normalized_content = _normalize_line_endings(content)
268 | lines = normalized_content.split("\n")
269 |
270 | # Dynamic width allocation based on total line count
271 | # This supports files of any size by computing required width
272 | total_lines = len(lines)
273 | width = len(str(total_lines))
274 | width = max(width, 4) # Minimum padding for readability
275 |
276 | # Format with dynamic width and clear separator
277 | numbered_lines = [f"{i + 1:{width}d}│ {line}" for i, line in enumerate(lines)]
278 |
279 | return "\n".join(numbered_lines)
280 |
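# Editorial example (added for illustration; not part of the source file):
#     _add_line_numbers("alpha\nbeta")
# produces
#        1│ alpha
#        2│ beta
# i.e. a 4-character number column that widens automatically once the file
# grows past 9,999 lines.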
281 |
282 | def resolve_and_validate_path(path_str: str) -> Path:
283 | """
284 | Resolves and validates a path against security policies.
285 |
286 | This function ensures safe file access by:
287 | 1. Requiring absolute paths (no ambiguity)
288 | 2. Resolving symlinks to prevent deception
289 | 3. Blocking access to dangerous system directories
290 |
291 | Args:
292 | path_str: Path string (must be absolute)
293 |
294 | Returns:
295 | Resolved Path object that is safe to access
296 |
297 | Raises:
298 | ValueError: If path is not absolute or otherwise invalid
299 | PermissionError: If path is in a dangerous location
300 | """
301 | # Step 1: Create a Path object
302 | user_path = Path(path_str)
303 |
304 | # Step 2: Security Policy - Require absolute paths
305 | # Relative paths could be interpreted differently depending on working directory
306 | if not user_path.is_absolute():
307 | raise ValueError(f"Relative paths are not supported. Please provide an absolute path.\nReceived: {path_str}")
308 |
309 | # Step 3: Resolve the absolute path (follows symlinks, removes .. and .)
310 | # This is critical for security as it reveals the true destination of symlinks
311 | resolved_path = user_path.resolve()
312 |
313 | # Step 4: Check against dangerous paths
314 | if is_dangerous_path(resolved_path):
315 | logger.warning(f"Access denied - dangerous path: {resolved_path}")
316 | raise PermissionError(f"Access to system directory denied: {path_str}")
317 |
318 | # Step 5: Check if it's the home directory root
319 | if is_home_directory_root(resolved_path):
320 | raise PermissionError(
321 | f"Cannot scan entire home directory: {path_str}\n" f"Please specify a subdirectory within your home folder."
322 | )
323 |
324 | return resolved_path
325 |
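# Editorial example (added for illustration; paths are hypothetical and outcomes
# assume the default security_config rules):
#     resolve_and_validate_path("src/app.py")        -> ValueError (relative path rejected)
#     resolve_and_validate_path("/Users/alice")      -> PermissionError (home directory root)
#     resolve_and_validate_path("/Users/alice/proj") -> Path("/Users/alice/proj"), symlinks resolved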
326 |
327 | def expand_paths(paths: list[str], extensions: Optional[set[str]] = None) -> list[str]:
328 | """
329 | Expand paths to individual files, handling both files and directories.
330 |
331 | This function recursively walks directories to find all matching files.
332 | It automatically filters out hidden files and common non-code directories
333 | like __pycache__ to avoid including generated or system files.
334 |
335 | Args:
336 | paths: List of file or directory paths (must be absolute)
337 | extensions: Optional set of file extensions to include (defaults to CODE_EXTENSIONS)
338 |
339 | Returns:
340 | List of individual file paths, sorted for consistent ordering
341 | """
342 | if extensions is None:
343 | extensions = CODE_EXTENSIONS
344 |
345 | expanded_files = []
346 | seen = set()
347 |
348 | for path in paths:
349 | try:
350 | # Validate each path for security before processing
351 | path_obj = resolve_and_validate_path(path)
352 | except (ValueError, PermissionError):
353 | # Skip invalid paths silently to allow partial success
354 | continue
355 |
356 | if not path_obj.exists():
357 | continue
358 |
359 | # Safety checks for directory scanning
360 | if path_obj.is_dir():
361 | # Check 1: Prevent scanning user's home directory root
362 | if is_home_directory_root(path_obj):
363 | logger.warning(f"Skipping home directory root: {path}. Please specify a project subdirectory instead.")
364 | continue
365 |
366 | # Check 2: Skip if this is the MCP's own directory
367 | if is_mcp_directory(path_obj):
368 | logger.info(
369 | f"Skipping MCP server directory: {path}. The MCP server code is excluded from project scans."
370 | )
371 | continue
372 |
373 | if path_obj.is_file():
374 | # Add file directly
375 | if str(path_obj) not in seen:
376 | expanded_files.append(str(path_obj))
377 | seen.add(str(path_obj))
378 |
379 | elif path_obj.is_dir():
380 | # Walk directory recursively to find all files
381 | for root, dirs, files in os.walk(path_obj):
382 | # Filter directories in-place to skip hidden and excluded directories
383 | # This prevents descending into .git, .venv, __pycache__, node_modules, etc.
384 | original_dirs = dirs[:]
385 | dirs[:] = []
386 | for d in original_dirs:
387 | # Skip hidden directories
388 | if d.startswith("."):
389 | continue
390 | # Skip excluded directories
391 | if d in EXCLUDED_DIRS:
392 | continue
393 | # Skip MCP directories found during traversal
394 | dir_path = Path(root) / d
395 | if is_mcp_directory(dir_path):
396 | logger.debug(f"Skipping MCP directory during traversal: {dir_path}")
397 | continue
398 | dirs.append(d)
399 |
400 | for file in files:
401 | # Skip hidden files (e.g., .DS_Store, .gitignore)
402 | if file.startswith("."):
403 | continue
404 |
405 | file_path = Path(root) / file
406 |
407 | # Filter by extension if specified
408 | if not extensions or file_path.suffix.lower() in extensions:
409 | full_path = str(file_path)
410 | # Use set to prevent duplicates
411 | if full_path not in seen:
412 | expanded_files.append(full_path)
413 | seen.add(full_path)
414 |
415 | # Sort for consistent ordering across different runs
416 | # This makes output predictable and easier to debug
417 | expanded_files.sort()
418 | return expanded_files
419 |
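# Editorial usage sketch (added for illustration; the directory is hypothetical):
#     expand_paths(["/Users/alice/proj"], extensions={".py"})
# walks the tree, skips hidden directories, EXCLUDED_DIRS entries and any nested
# MCP server checkout, and returns a sorted, de-duplicated list of absolute .py paths.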
420 |
421 | def read_file_content(
422 | file_path: str, max_size: int = 1_000_000, *, include_line_numbers: Optional[bool] = None
423 | ) -> tuple[str, int]:
424 | """
425 | Read a single file and format it for inclusion in AI prompts.
426 |
427 | This function handles various error conditions gracefully and always
428 | returns formatted content, even for errors. This ensures the AI model
429 | gets context about what files were attempted but couldn't be read.
430 |
431 | Args:
432 | file_path: Path to file (must be absolute)
433 | max_size: Maximum file size to read (default 1MB to prevent memory issues)
434 | include_line_numbers: Whether to add line numbers. If None, auto-detects based on file type
435 |
436 | Returns:
437 | Tuple of (formatted_content, estimated_tokens)
438 | Content is wrapped with clear delimiters for AI parsing
439 | """
440 | logger.debug(f"[FILES] read_file_content called for: {file_path}")
441 | try:
442 | # Validate path security before any file operations
443 | path = resolve_and_validate_path(file_path)
444 | logger.debug(f"[FILES] Path validated and resolved: {path}")
445 | except (ValueError, PermissionError) as e:
446 | # Return error in a format that provides context to the AI
447 | logger.debug(f"[FILES] Path validation failed for {file_path}: {type(e).__name__}: {e}")
448 | error_msg = str(e)
449 | content = f"\n--- ERROR ACCESSING FILE: {file_path} ---\nError: {error_msg}\n--- END FILE ---\n"
450 | tokens = estimate_tokens(content)
451 | logger.debug(f"[FILES] Returning error content for {file_path}: {tokens} tokens")
452 | return content, tokens
453 |
454 | try:
455 | # Validate file existence and type
456 | if not path.exists():
457 | logger.debug(f"[FILES] File does not exist: {file_path}")
458 | content = f"\n--- FILE NOT FOUND: {file_path} ---\nError: File does not exist\n--- END FILE ---\n"
459 | return content, estimate_tokens(content)
460 |
461 | if not path.is_file():
462 | logger.debug(f"[FILES] Path is not a file: {file_path}")
463 | content = f"\n--- NOT A FILE: {file_path} ---\nError: Path is not a file\n--- END FILE ---\n"
464 | return content, estimate_tokens(content)
465 |
466 | # Check file size to prevent memory exhaustion
467 | stat_result = path.stat()
468 | file_size = stat_result.st_size
469 | logger.debug(f"[FILES] File size for {file_path}: {file_size:,} bytes")
470 | if file_size > max_size:
471 | logger.debug(f"[FILES] File too large: {file_path} ({file_size:,} > {max_size:,} bytes)")
472 | modified_at = datetime.fromtimestamp(stat_result.st_mtime, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
473 | content = (
474 | f"\n--- FILE TOO LARGE: {file_path} (Last modified: {modified_at}) ---\n"
475 | f"File size: {file_size:,} bytes (max: {max_size:,})\n"
476 | "--- END FILE ---\n"
477 | )
478 | return content, estimate_tokens(content)
479 |
480 | # Determine if we should add line numbers
481 | add_line_numbers = should_add_line_numbers(file_path, include_line_numbers)
482 | logger.debug(f"[FILES] Line numbers for {file_path}: {'enabled' if add_line_numbers else 'disabled'}")
483 |
484 | # Read the file with UTF-8 encoding, replacing invalid characters
485 | # This ensures we can handle files with mixed encodings
486 | logger.debug(f"[FILES] Reading file content for {file_path}")
487 | with open(path, encoding="utf-8", errors="replace") as f:
488 | file_content = f.read()
489 |
490 | logger.debug(f"[FILES] Successfully read {len(file_content)} characters from {file_path}")
491 |
492 | # Add line numbers if requested or auto-detected
493 | if add_line_numbers:
494 | file_content = _add_line_numbers(file_content)
495 | logger.debug(f"[FILES] Added line numbers to {file_path}")
496 | else:
497 | # Still normalize line endings for consistency
498 | file_content = _normalize_line_endings(file_content)
499 |
500 | # Format with clear delimiters that help the AI understand file boundaries
501 | # Using consistent markers makes it easier for the model to parse
502 | # NOTE: These markers ("--- BEGIN FILE: ... ---") are distinct from git diff markers
503 | # ("--- BEGIN DIFF: ... ---") to allow AI to distinguish between complete file content
504 | # vs. partial diff content when files appear in both sections
505 | modified_at = datetime.fromtimestamp(stat_result.st_mtime, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
506 | formatted = (
507 | f"\n--- BEGIN FILE: {file_path} (Last modified: {modified_at}) ---\n"
508 | f"{file_content}\n"
509 | f"--- END FILE: {file_path} ---\n"
510 | )
511 | tokens = estimate_tokens(formatted)
512 | logger.debug(f"[FILES] Formatted content for {file_path}: {len(formatted)} chars, {tokens} tokens")
513 | return formatted, tokens
514 |
515 | except Exception as e:
516 | logger.debug(f"[FILES] Exception reading file {file_path}: {type(e).__name__}: {e}")
517 | content = f"\n--- ERROR READING FILE: {file_path} ---\nError: {str(e)}\n--- END FILE ---\n"
518 | tokens = estimate_tokens(content)
519 | logger.debug(f"[FILES] Returning error content for {file_path}: {tokens} tokens")
520 | return content, tokens
521 |
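# Editorial usage sketch (added for illustration; the path is hypothetical):
#     text, tokens = read_file_content("/Users/alice/proj/app.py", include_line_numbers=True)
# On success, `text` is wrapped in "--- BEGIN FILE: ... ---" / "--- END FILE: ... ---"
# markers with the last-modified timestamp; on failure it still returns a formatted
# error block so the model knows which file could not be read.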
522 |
523 | def read_files(
524 | file_paths: list[str],
525 | code: Optional[str] = None,
526 | max_tokens: Optional[int] = None,
527 | reserve_tokens: int = 50_000,
528 | *,
529 | include_line_numbers: bool = False,
530 | ) -> str:
531 | """
532 | Read multiple files and optional direct code with smart token management.
533 |
534 | This function implements intelligent token budgeting to maximize the amount
535 | of relevant content that can be included in an AI prompt while staying
536 | within token limits. It prioritizes direct code and reads files until
537 | the token budget is exhausted.
538 |
539 | Args:
540 | file_paths: List of file or directory paths (absolute paths required)
541 | code: Optional direct code to include (prioritized over files)
542 | max_tokens: Maximum tokens to use (defaults to DEFAULT_CONTEXT_WINDOW)
543 | reserve_tokens: Tokens to reserve for prompt and response (default 50K)
544 | include_line_numbers: Whether to add line numbers to file content
545 |
546 | Returns:
547 | str: All file contents formatted for AI consumption
548 | """
549 | if max_tokens is None:
550 | max_tokens = DEFAULT_CONTEXT_WINDOW
551 |
552 | logger.debug(f"[FILES] read_files called with {len(file_paths)} paths")
553 | logger.debug(
554 | f"[FILES] Token budget: max={max_tokens:,}, reserve={reserve_tokens:,}, available={max_tokens - reserve_tokens:,}"
555 | )
556 |
557 | content_parts = []
558 | total_tokens = 0
559 | available_tokens = max_tokens - reserve_tokens
560 |
561 | files_skipped = []
562 |
563 | # Priority 1: Handle direct code if provided
564 | # Direct code is prioritized because it's explicitly provided by the user
565 | if code:
566 | formatted_code = f"\n--- BEGIN DIRECT CODE ---\n{code}\n--- END DIRECT CODE ---\n"
567 | code_tokens = estimate_tokens(formatted_code)
568 |
569 | if code_tokens <= available_tokens:
570 | content_parts.append(formatted_code)
571 | total_tokens += code_tokens
572 | available_tokens -= code_tokens
573 |
574 | # Priority 2: Process file paths
575 | if file_paths:
576 | # Expand directories to get all individual files
577 | logger.debug(f"[FILES] Expanding {len(file_paths)} file paths")
578 | all_files = expand_paths(file_paths)
579 | logger.debug(f"[FILES] After expansion: {len(all_files)} individual files")
580 |
581 | if not all_files and file_paths:
582 | # No files found but paths were provided
583 | logger.debug("[FILES] No files found from provided paths")
584 | content_parts.append(f"\n--- NO FILES FOUND ---\nProvided paths: {', '.join(file_paths)}\n--- END ---\n")
585 | else:
586 | # Read files sequentially until token limit is reached
587 | logger.debug(f"[FILES] Reading {len(all_files)} files with token budget {available_tokens:,}")
588 | for i, file_path in enumerate(all_files):
589 | if total_tokens >= available_tokens:
590 | logger.debug(f"[FILES] Token budget exhausted, skipping remaining {len(all_files) - i} files")
591 | files_skipped.extend(all_files[i:])
592 | break
593 |
594 | file_content, file_tokens = read_file_content(file_path, include_line_numbers=include_line_numbers)
595 | logger.debug(f"[FILES] File {file_path}: {file_tokens:,} tokens")
596 |
597 | # Check if adding this file would exceed limit
598 | if total_tokens + file_tokens <= available_tokens:
599 | content_parts.append(file_content)
600 | total_tokens += file_tokens
601 | logger.debug(f"[FILES] Added file {file_path}, total tokens: {total_tokens:,}")
602 | else:
603 | # File too large for remaining budget
604 | logger.debug(
605 | f"[FILES] File {file_path} too large for remaining budget ({file_tokens:,} tokens, {available_tokens - total_tokens:,} remaining)"
606 | )
607 | files_skipped.append(file_path)
608 |
609 | # Add informative note about skipped files to help users understand
610 | # what was omitted and why
611 | if files_skipped:
612 | logger.debug(f"[FILES] {len(files_skipped)} files skipped due to token limits")
613 | skip_note = "\n\n--- SKIPPED FILES (TOKEN LIMIT) ---\n"
614 | skip_note += f"Total skipped: {len(files_skipped)}\n"
615 | # Show first 10 skipped files as examples
616 | for _i, file_path in enumerate(files_skipped[:10]):
617 | skip_note += f" - {file_path}\n"
618 | if len(files_skipped) > 10:
619 | skip_note += f" ... and {len(files_skipped) - 10} more\n"
620 | skip_note += "--- END SKIPPED FILES ---\n"
621 | content_parts.append(skip_note)
622 |
623 | result = "\n\n".join(content_parts) if content_parts else ""
624 | logger.debug(f"[FILES] read_files complete: {len(result)} chars, {total_tokens:,} tokens used")
625 | return result
626 |
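# Editorial usage sketch (added for illustration; paths and budget are hypothetical):
#     prompt = read_files(
#         ["/Users/alice/proj/src", "/Users/alice/proj/README.md"],
#         max_tokens=200_000,
#         reserve_tokens=50_000,
#     )
# Direct `code` (when given) is embedded first, files follow until the budget is
# spent, and a "SKIPPED FILES (TOKEN LIMIT)" note lists anything omitted.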
627 |
628 | def estimate_file_tokens(file_path: str) -> int:
629 | """
630 | Estimate tokens for a file using file-type aware ratios.
631 |
632 | Args:
633 | file_path: Path to the file
634 |
635 | Returns:
636 | Estimated token count for the file
637 | """
638 | try:
639 | if not os.path.exists(file_path) or not os.path.isfile(file_path):
640 | return 0
641 |
642 | file_size = os.path.getsize(file_path)
643 |
644 | # Get the appropriate ratio for this file type
645 | from .file_types import get_token_estimation_ratio
646 |
647 | ratio = get_token_estimation_ratio(file_path)
648 |
649 | return int(file_size / ratio)
650 | except Exception:
651 | return 0
652 |
653 |
654 | def check_files_size_limit(files: list[str], max_tokens: int, threshold_percent: float = 1.0) -> tuple[bool, int, int]:
655 | """
656 | Check if a list of files would exceed token limits.
657 |
658 | Args:
659 | files: List of file paths to check
660 | max_tokens: Maximum allowed tokens
661 | threshold_percent: Percentage of max_tokens to use as threshold (0.0-1.0)
662 |
663 | Returns:
664 | Tuple of (within_limit, total_estimated_tokens, file_count)
665 | """
666 | if not files:
667 | return True, 0, 0
668 |
669 | total_estimated_tokens = 0
670 | file_count = 0
671 | threshold = int(max_tokens * threshold_percent)
672 |
673 | for file_path in files:
674 | try:
675 | estimated_tokens = estimate_file_tokens(file_path)
676 | total_estimated_tokens += estimated_tokens
677 | if estimated_tokens > 0: # Only count accessible files
678 | file_count += 1
679 | except Exception:
680 | # Skip files that can't be accessed for size check
681 | continue
682 |
683 | within_limit = total_estimated_tokens <= threshold
684 | return within_limit, total_estimated_tokens, file_count
685 |
686 |
687 | def read_json_file(file_path: str) -> Optional[dict]:
688 | """
689 | Read and parse a JSON file with proper error handling.
690 |
691 | Args:
692 | file_path: Path to the JSON file
693 |
694 | Returns:
695 | Parsed JSON data as dict, or None if file doesn't exist or invalid
696 | """
697 | try:
698 | if not os.path.exists(file_path):
699 | return None
700 |
701 | with open(file_path, encoding="utf-8") as f:
702 | return json.load(f)
703 | except (json.JSONDecodeError, OSError):
704 | return None
705 |
706 |
707 | def write_json_file(file_path: str, data: dict, indent: int = 2) -> bool:
708 | """
709 | Write data to a JSON file with proper formatting.
710 |
711 | Args:
712 | file_path: Path to write the JSON file
713 | data: Dictionary data to serialize
714 | indent: JSON indentation level
715 |
716 | Returns:
717 | True if successful, False otherwise
718 | """
719 | try:
720 | os.makedirs(os.path.dirname(file_path) or ".", exist_ok=True)  # handle bare filenames with no directory component
721 |
722 | with open(file_path, "w", encoding="utf-8") as f:
723 | json.dump(data, f, indent=indent, ensure_ascii=False)
724 | return True
725 | except (OSError, TypeError):
726 | return False
727 |
728 |
729 | def get_file_size(file_path: str) -> int:
730 | """
731 | Get file size in bytes with proper error handling.
732 |
733 | Args:
734 | file_path: Path to the file
735 |
736 | Returns:
737 | File size in bytes, or 0 if file doesn't exist or error
738 | """
739 | try:
740 | if os.path.exists(file_path) and os.path.isfile(file_path):
741 | return os.path.getsize(file_path)
742 | return 0
743 | except OSError:
744 | return 0
745 |
746 |
747 | def ensure_directory_exists(file_path: str) -> bool:
748 | """
749 | Ensure the parent directory of a file path exists.
750 |
751 | Args:
752 | file_path: Path to file (directory will be created for parent)
753 |
754 | Returns:
755 | True if directory exists or was created, False on error
756 | """
757 | try:
758 | directory = os.path.dirname(file_path)
759 | if directory:
760 | os.makedirs(directory, exist_ok=True)
761 | return True
762 | except OSError:
763 | return False
764 |
765 |
766 | def is_text_file(file_path: str) -> bool:
767 | """
768 | Check if a file is likely a text file based on extension and content.
769 |
770 | Args:
771 | file_path: Path to the file
772 |
773 | Returns:
774 | True if file appears to be text, False otherwise
775 | """
776 | from .file_types import is_text_file as check_text_type
777 |
778 | return check_text_type(file_path)
779 |
780 |
781 | def read_file_safely(file_path: str, max_size: int = 10 * 1024 * 1024) -> Optional[str]:
782 | """
783 | Read a file with size limits and encoding handling.
784 |
785 | Args:
786 | file_path: Path to the file
787 | max_size: Maximum file size in bytes (default 10MB)
788 |
789 | Returns:
790 | File content as string, or None if file too large or unreadable
791 | """
792 | try:
793 | if not os.path.exists(file_path) or not os.path.isfile(file_path):
794 | return None
795 |
796 | file_size = os.path.getsize(file_path)
797 | if file_size > max_size:
798 | return None
799 |
800 | with open(file_path, encoding="utf-8", errors="ignore") as f:
801 | return f.read()
802 | except OSError:
803 | return None
804 |
805 |
806 | def check_total_file_size(files: list[str], model_name: str) -> Optional[dict]:
807 | """
808 | Check if total file sizes would exceed token threshold before embedding.
809 |
810 | IMPORTANT: This performs STRICT REJECTION at the MCP boundary.
811 | No partial inclusion - either all files fit or request is rejected.
812 | This forces the CLI to make better file selection decisions.
813 |
814 | This function MUST be called with the effective model name (after resolution).
815 | It should never receive 'auto' or None - model resolution happens earlier.
816 |
817 | Args:
818 | files: List of file paths to check
819 | model_name: The resolved model name for context-aware thresholds (required)
820 |
821 | Returns:
822 | Dict with `code_too_large` response if too large, None if acceptable
823 | """
824 | if not files:
825 | return None
826 |
827 | # Validate we have a proper model name (not auto or None)
828 | if not model_name or model_name.lower() == "auto":
829 | raise ValueError(
830 | f"check_total_file_size called with unresolved model: '{model_name}'. "
831 | "Model must be resolved before file size checking."
832 | )
833 |
834 | logger.info(f"File size check: Using model '{model_name}' for token limit calculation")
835 |
836 | from utils.model_context import ModelContext
837 |
838 | model_context = ModelContext(model_name)
839 | token_allocation = model_context.calculate_token_allocation()
840 |
841 | # Dynamic threshold based on model capacity
842 | context_window = token_allocation.total_tokens
843 | if context_window >= 1_000_000: # Gemini-class models
844 | threshold_percent = 0.8 # Can be more generous
845 | elif context_window >= 500_000: # Mid-range models
846 | threshold_percent = 0.7 # Moderate
847 | else: # OpenAI-class models (200K)
848 | threshold_percent = 0.6 # Conservative
849 |
850 | max_file_tokens = int(token_allocation.file_tokens * threshold_percent)
851 |
852 | # Use centralized file size checking (threshold already applied to max_file_tokens)
853 | within_limit, total_estimated_tokens, file_count = check_files_size_limit(files, max_file_tokens)
854 |
855 | if not within_limit:
856 | return {
857 | "status": "code_too_large",
858 | "content": (
859 | f"The selected files are too large for analysis "
860 | f"(estimated {total_estimated_tokens:,} tokens, limit {max_file_tokens:,}). "
861 | f"Please select fewer, more specific files that are most relevant "
862 | f"to your question, then invoke the tool again."
863 | ),
864 | "content_type": "text",
865 | "metadata": {
866 | "total_estimated_tokens": total_estimated_tokens,
867 | "limit": max_file_tokens,
868 | "file_count": file_count,
869 | "threshold_percent": threshold_percent,
870 | "model_context_window": context_window,
871 | "model_name": model_name,
872 | "instructions": "Reduce file selection and try again - all files must fit within budget. If this persists, please use a model with a larger context window where available.",
873 | },
874 | }
875 |
876 | return None # Proceed with ALL files
877 |
```
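To make the dynamic threshold in `check_total_file_size` concrete, the following minimal sketch reproduces the tiering on its own; the percentages mirror the code above, while the helper name and the sample context-window values are illustrative.

```python
# Minimal sketch of the context-window based file-budget tiers used by
# check_total_file_size. The helper name and sample values are illustrative.
def file_budget_fraction(context_window: int) -> float:
    if context_window >= 1_000_000:   # Gemini-class models
        return 0.8
    if context_window >= 500_000:     # mid-range models
        return 0.7
    return 0.6                        # smaller (~200K) context windows

for window in (1_048_576, 500_000, 200_000):
    print(window, file_budget_fraction(window))
# 1048576 0.8
# 500000 0.7
# 200000 0.6
```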