This is page 13 of 19. Use http://codebase.md/beehiveinnovations/gemini-mcp-server?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── commands
│ │ └── fix-github-issue.md
│ └── settings.json
├── .coveragerc
├── .dockerignore
├── .env.example
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ ├── documentation.yml
│ │ ├── feature_request.yml
│ │ └── tool_addition.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── docker-pr.yml
│ ├── docker-release.yml
│ ├── semantic-pr.yml
│ ├── semantic-release.yml
│ └── test.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CHANGELOG.md
├── claude_config_example.json
├── CLAUDE.md
├── clink
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── claude.py
│ │ ├── codex.py
│ │ └── gemini.py
│ ├── constants.py
│ ├── models.py
│ ├── parsers
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── claude.py
│ │ ├── codex.py
│ │ └── gemini.py
│ └── registry.py
├── code_quality_checks.ps1
├── code_quality_checks.sh
├── communication_simulator_test.py
├── conf
│ ├── __init__.py
│ ├── azure_models.json
│ ├── cli_clients
│ │ ├── claude.json
│ │ ├── codex.json
│ │ └── gemini.json
│ ├── custom_models.json
│ ├── dial_models.json
│ ├── gemini_models.json
│ ├── openai_models.json
│ ├── openrouter_models.json
│ └── xai_models.json
├── config.py
├── docker
│ ├── README.md
│ └── scripts
│ ├── build.ps1
│ ├── build.sh
│ ├── deploy.ps1
│ ├── deploy.sh
│ └── healthcheck.py
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── adding_providers.md
│ ├── adding_tools.md
│ ├── advanced-usage.md
│ ├── ai_banter.md
│ ├── ai-collaboration.md
│ ├── azure_openai.md
│ ├── configuration.md
│ ├── context-revival.md
│ ├── contributions.md
│ ├── custom_models.md
│ ├── docker-deployment.md
│ ├── gemini-setup.md
│ ├── getting-started.md
│ ├── index.md
│ ├── locale-configuration.md
│ ├── logging.md
│ ├── model_ranking.md
│ ├── testing.md
│ ├── tools
│ │ ├── analyze.md
│ │ ├── apilookup.md
│ │ ├── challenge.md
│ │ ├── chat.md
│ │ ├── clink.md
│ │ ├── codereview.md
│ │ ├── consensus.md
│ │ ├── debug.md
│ │ ├── docgen.md
│ │ ├── listmodels.md
│ │ ├── planner.md
│ │ ├── precommit.md
│ │ ├── refactor.md
│ │ ├── secaudit.md
│ │ ├── testgen.md
│ │ ├── thinkdeep.md
│ │ ├── tracer.md
│ │ └── version.md
│ ├── troubleshooting.md
│ ├── vcr-testing.md
│ └── wsl-setup.md
├── examples
│ ├── claude_config_macos.json
│ └── claude_config_wsl.json
├── LICENSE
├── providers
│ ├── __init__.py
│ ├── azure_openai.py
│ ├── base.py
│ ├── custom.py
│ ├── dial.py
│ ├── gemini.py
│ ├── openai_compatible.py
│ ├── openai.py
│ ├── openrouter.py
│ ├── registries
│ │ ├── __init__.py
│ │ ├── azure.py
│ │ ├── base.py
│ │ ├── custom.py
│ │ ├── dial.py
│ │ ├── gemini.py
│ │ ├── openai.py
│ │ ├── openrouter.py
│ │ └── xai.py
│ ├── registry_provider_mixin.py
│ ├── registry.py
│ ├── shared
│ │ ├── __init__.py
│ │ ├── model_capabilities.py
│ │ ├── model_response.py
│ │ ├── provider_type.py
│ │ └── temperature.py
│ └── xai.py
├── pyproject.toml
├── pytest.ini
├── README.md
├── requirements-dev.txt
├── requirements.txt
├── run_integration_tests.ps1
├── run_integration_tests.sh
├── run-server.ps1
├── run-server.sh
├── scripts
│ └── sync_version.py
├── server.py
├── simulator_tests
│ ├── __init__.py
│ ├── base_test.py
│ ├── conversation_base_test.py
│ ├── log_utils.py
│ ├── test_analyze_validation.py
│ ├── test_basic_conversation.py
│ ├── test_chat_simple_validation.py
│ ├── test_codereview_validation.py
│ ├── test_consensus_conversation.py
│ ├── test_consensus_three_models.py
│ ├── test_consensus_workflow_accurate.py
│ ├── test_content_validation.py
│ ├── test_conversation_chain_validation.py
│ ├── test_cross_tool_comprehensive.py
│ ├── test_cross_tool_continuation.py
│ ├── test_debug_certain_confidence.py
│ ├── test_debug_validation.py
│ ├── test_line_number_validation.py
│ ├── test_logs_validation.py
│ ├── test_model_thinking_config.py
│ ├── test_o3_model_selection.py
│ ├── test_o3_pro_expensive.py
│ ├── test_ollama_custom_url.py
│ ├── test_openrouter_fallback.py
│ ├── test_openrouter_models.py
│ ├── test_per_tool_deduplication.py
│ ├── test_planner_continuation_history.py
│ ├── test_planner_validation_old.py
│ ├── test_planner_validation.py
│ ├── test_precommitworkflow_validation.py
│ ├── test_prompt_size_limit_bug.py
│ ├── test_refactor_validation.py
│ ├── test_secaudit_validation.py
│ ├── test_testgen_validation.py
│ ├── test_thinkdeep_validation.py
│ ├── test_token_allocation_validation.py
│ ├── test_vision_capability.py
│ └── test_xai_models.py
├── systemprompts
│ ├── __init__.py
│ ├── analyze_prompt.py
│ ├── chat_prompt.py
│ ├── clink
│ │ ├── codex_codereviewer.txt
│ │ ├── default_codereviewer.txt
│ │ ├── default_planner.txt
│ │ └── default.txt
│ ├── codereview_prompt.py
│ ├── consensus_prompt.py
│ ├── debug_prompt.py
│ ├── docgen_prompt.py
│ ├── generate_code_prompt.py
│ ├── planner_prompt.py
│ ├── precommit_prompt.py
│ ├── refactor_prompt.py
│ ├── secaudit_prompt.py
│ ├── testgen_prompt.py
│ ├── thinkdeep_prompt.py
│ └── tracer_prompt.py
├── tests
│ ├── __init__.py
│ ├── CASSETTE_MAINTENANCE.md
│ ├── conftest.py
│ ├── gemini_cassettes
│ │ ├── chat_codegen
│ │ │ └── gemini25_pro_calculator
│ │ │ └── mldev.json
│ │ ├── chat_cross
│ │ │ └── step1_gemini25_flash_number
│ │ │ └── mldev.json
│ │ └── consensus
│ │ └── step2_gemini25_flash_against
│ │ └── mldev.json
│ ├── http_transport_recorder.py
│ ├── mock_helpers.py
│ ├── openai_cassettes
│ │ ├── chat_cross_step2_gpt5_reminder.json
│ │ ├── chat_gpt5_continuation.json
│ │ ├── chat_gpt5_moon_distance.json
│ │ ├── consensus_step1_gpt5_for.json
│ │ └── o3_pro_basic_math.json
│ ├── pii_sanitizer.py
│ ├── sanitize_cassettes.py
│ ├── test_alias_target_restrictions.py
│ ├── test_auto_mode_comprehensive.py
│ ├── test_auto_mode_custom_provider_only.py
│ ├── test_auto_mode_model_listing.py
│ ├── test_auto_mode_provider_selection.py
│ ├── test_auto_mode.py
│ ├── test_auto_model_planner_fix.py
│ ├── test_azure_openai_provider.py
│ ├── test_buggy_behavior_prevention.py
│ ├── test_cassette_semantic_matching.py
│ ├── test_challenge.py
│ ├── test_chat_codegen_integration.py
│ ├── test_chat_cross_model_continuation.py
│ ├── test_chat_openai_integration.py
│ ├── test_chat_simple.py
│ ├── test_clink_claude_agent.py
│ ├── test_clink_claude_parser.py
│ ├── test_clink_codex_agent.py
│ ├── test_clink_gemini_agent.py
│ ├── test_clink_gemini_parser.py
│ ├── test_clink_integration.py
│ ├── test_clink_parsers.py
│ ├── test_clink_tool.py
│ ├── test_collaboration.py
│ ├── test_config.py
│ ├── test_consensus_integration.py
│ ├── test_consensus_schema.py
│ ├── test_consensus.py
│ ├── test_conversation_continuation_integration.py
│ ├── test_conversation_field_mapping.py
│ ├── test_conversation_file_features.py
│ ├── test_conversation_memory.py
│ ├── test_conversation_missing_files.py
│ ├── test_custom_openai_temperature_fix.py
│ ├── test_custom_provider.py
│ ├── test_debug.py
│ ├── test_deploy_scripts.py
│ ├── test_dial_provider.py
│ ├── test_directory_expansion_tracking.py
│ ├── test_disabled_tools.py
│ ├── test_docker_claude_desktop_integration.py
│ ├── test_docker_config_complete.py
│ ├── test_docker_healthcheck.py
│ ├── test_docker_implementation.py
│ ├── test_docker_mcp_validation.py
│ ├── test_docker_security.py
│ ├── test_docker_volume_persistence.py
│ ├── test_file_protection.py
│ ├── test_gemini_token_usage.py
│ ├── test_image_support_integration.py
│ ├── test_image_validation.py
│ ├── test_integration_utf8.py
│ ├── test_intelligent_fallback.py
│ ├── test_issue_245_simple.py
│ ├── test_large_prompt_handling.py
│ ├── test_line_numbers_integration.py
│ ├── test_listmodels_restrictions.py
│ ├── test_listmodels.py
│ ├── test_mcp_error_handling.py
│ ├── test_model_enumeration.py
│ ├── test_model_metadata_continuation.py
│ ├── test_model_resolution_bug.py
│ ├── test_model_restrictions.py
│ ├── test_o3_pro_output_text_fix.py
│ ├── test_o3_temperature_fix_simple.py
│ ├── test_openai_compatible_token_usage.py
│ ├── test_openai_provider.py
│ ├── test_openrouter_provider.py
│ ├── test_openrouter_registry.py
│ ├── test_parse_model_option.py
│ ├── test_per_tool_model_defaults.py
│ ├── test_pii_sanitizer.py
│ ├── test_pip_detection_fix.py
│ ├── test_planner.py
│ ├── test_precommit_workflow.py
│ ├── test_prompt_regression.py
│ ├── test_prompt_size_limit_bug_fix.py
│ ├── test_provider_retry_logic.py
│ ├── test_provider_routing_bugs.py
│ ├── test_provider_utf8.py
│ ├── test_providers.py
│ ├── test_rate_limit_patterns.py
│ ├── test_refactor.py
│ ├── test_secaudit.py
│ ├── test_server.py
│ ├── test_supported_models_aliases.py
│ ├── test_thinking_modes.py
│ ├── test_tools.py
│ ├── test_tracer.py
│ ├── test_utf8_localization.py
│ ├── test_utils.py
│ ├── test_uvx_resource_packaging.py
│ ├── test_uvx_support.py
│ ├── test_workflow_file_embedding.py
│ ├── test_workflow_metadata.py
│ ├── test_workflow_prompt_size_validation_simple.py
│ ├── test_workflow_utf8.py
│ ├── test_xai_provider.py
│ ├── transport_helpers.py
│ └── triangle.png
├── tools
│ ├── __init__.py
│ ├── analyze.py
│ ├── apilookup.py
│ ├── challenge.py
│ ├── chat.py
│ ├── clink.py
│ ├── codereview.py
│ ├── consensus.py
│ ├── debug.py
│ ├── docgen.py
│ ├── listmodels.py
│ ├── models.py
│ ├── planner.py
│ ├── precommit.py
│ ├── refactor.py
│ ├── secaudit.py
│ ├── shared
│ │ ├── __init__.py
│ │ ├── base_models.py
│ │ ├── base_tool.py
│ │ ├── exceptions.py
│ │ └── schema_builders.py
│ ├── simple
│ │ ├── __init__.py
│ │ └── base.py
│ ├── testgen.py
│ ├── thinkdeep.py
│ ├── tracer.py
│ ├── version.py
│ └── workflow
│ ├── __init__.py
│ ├── base.py
│ ├── schema_builders.py
│ └── workflow_mixin.py
├── utils
│ ├── __init__.py
│ ├── client_info.py
│ ├── conversation_memory.py
│ ├── env.py
│ ├── file_types.py
│ ├── file_utils.py
│ ├── image_utils.py
│ ├── model_context.py
│ ├── model_restrictions.py
│ ├── security_config.py
│ ├── storage_backend.py
│ └── token_utils.py
└── zen-mcp-server
```
# Files
--------------------------------------------------------------------------------
/tools/analyze.py:
--------------------------------------------------------------------------------
```python
"""
AnalyzeWorkflow tool - Step-by-step code analysis with systematic investigation
This tool provides a structured workflow for comprehensive code and file analysis.
It guides the CLI agent through systematic investigation steps with forced pauses between each step
to ensure thorough code examination, pattern identification, and architectural assessment before proceeding.
The tool supports complex analysis scenarios including architectural review, performance analysis,
security assessment, and maintainability evaluation.
Key features:
- Step-by-step analysis workflow with progress tracking
- Context-aware file embedding (references during investigation, full content for analysis)
- Automatic pattern and insight tracking with categorization
- Expert analysis integration with external models
- Support for focused analysis (architecture, performance, security, quality)
- Confidence-based workflow optimization
"""
import logging
from typing import TYPE_CHECKING, Any, Literal, Optional
from pydantic import Field, model_validator
if TYPE_CHECKING:
from tools.models import ToolModelCategory
from config import TEMPERATURE_ANALYTICAL
from systemprompts import ANALYZE_PROMPT
from tools.shared.base_models import WorkflowRequest
from .workflow.base import WorkflowTool
logger = logging.getLogger(__name__)
# Tool-specific field descriptions for analyze workflow
ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS = {
"step": (
"The analysis plan. Step 1: State your strategy, including how you will map the codebase structure, "
"understand business logic, and assess code quality, performance implications, and architectural patterns. "
"Later steps: Report findings and adapt the approach as new insights emerge."
),
"step_number": (
"The index of the current step in the analysis sequence, beginning at 1. Each step should build upon or "
"revise the previous one."
),
"total_steps": (
"Your current estimate for how many steps will be needed to complete the analysis. "
"Adjust as new findings emerge."
),
"next_step_required": (
"Set to true if you plan to continue the investigation with another step. False means you believe the "
"analysis is complete and ready for expert validation."
),
"findings": (
"Summary of discoveries from this step, including architectural patterns, tech stack assessment, scalability characteristics, "
"performance implications, maintainability factors, and strategic improvement opportunities. "
"IMPORTANT: Document both strengths (good patterns, solid architecture) and concerns (tech debt, overengineering, unnecessary complexity). "
"In later steps, confirm or update past findings with additional evidence."
),
"files_checked": (
"List all files examined (absolute paths). Include even ruled-out files to track exploration path."
),
"relevant_files": (
"Subset of files_checked directly relevant to analysis findings (absolute paths). Include files with "
"significant patterns, architectural decisions, or strategic improvement opportunities."
),
"relevant_context": (
"List methods/functions central to analysis findings, in 'ClassName.methodName' or 'functionName' format. "
"Prioritize those demonstrating key patterns, architectural decisions, or improvement opportunities."
),
"images": (
"Optional absolute paths to architecture diagrams or visual references that help with analysis context."
),
"confidence": (
"Your confidence in the analysis: exploring, low, medium, high, very_high, almost_certain, or certain. "
"'certain' indicates the analysis is complete and ready for validation."
),
"analysis_type": "Type of analysis to perform (architecture, performance, security, quality, general)",
"output_format": "How to format the output (summary, detailed, actionable)",
}
class AnalyzeWorkflowRequest(WorkflowRequest):
"""Request model for analyze workflow investigation steps"""
# Required fields for each investigation step
step: str = Field(..., description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["step"])
step_number: int = Field(..., description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["step_number"])
total_steps: int = Field(..., description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["total_steps"])
next_step_required: bool = Field(..., description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["next_step_required"])
# Investigation tracking fields
findings: str = Field(..., description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["findings"])
files_checked: list[str] = Field(
default_factory=list, description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["files_checked"]
)
relevant_files: list[str] = Field(
default_factory=list, description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["relevant_files"]
)
relevant_context: list[str] = Field(
default_factory=list, description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["relevant_context"]
)
# Issues found during analysis (structured with severity)
issues_found: list[dict] = Field(
default_factory=list,
description="Issues or concerns identified during analysis, each with severity level (critical, high, medium, low)",
)
# Optional images for visual context
images: Optional[list[str]] = Field(default=None, description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["images"])
# Analyze-specific fields (only used in step 1 to initialize)
# Note: Use relevant_files field instead of files for consistency across workflow tools
analysis_type: Optional[Literal["architecture", "performance", "security", "quality", "general"]] = Field(
"general", description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["analysis_type"]
)
output_format: Optional[Literal["summary", "detailed", "actionable"]] = Field(
"detailed", description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["output_format"]
)
# Keep thinking_mode from original analyze tool; temperature is inherited from WorkflowRequest
@model_validator(mode="after")
def validate_step_one_requirements(self):
"""Ensure step 1 has required relevant_files."""
if self.step_number == 1:
if not self.relevant_files:
raise ValueError("Step 1 requires 'relevant_files' field to specify files or directories to analyze")
return self
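# Example (hypothetical values; fields inherited from WorkflowRequest are omitted here):
# the validator above only enforces `relevant_files` on the first step, so a step-1
# request must name at least one file or directory to analyze:
#
#     AnalyzeWorkflowRequest(
#         step="Map the module layout and identify architectural patterns.",
#         step_number=1,
#         total_steps=3,
#         next_step_required=True,
#         findings="Initial survey of the codebase.",
#         relevant_files=["/absolute/path/to/project/src"],
#     )
#
# Omitting `relevant_files` at step_number=1 causes pydantic to surface the
# ValueError above as a validation error; later steps may leave the list empty.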
class AnalyzeTool(WorkflowTool):
"""
Analyze workflow tool for step-by-step code analysis and expert validation.
This tool implements a structured analysis workflow that guides users through
methodical investigation steps, ensuring thorough code examination, pattern identification,
and architectural assessment before reaching conclusions. It supports complex analysis scenarios
including architectural review, performance analysis, security assessment, and maintainability evaluation.
"""
def __init__(self):
super().__init__()
self.initial_request = None
self.analysis_config = {}
def get_name(self) -> str:
return "analyze"
def get_description(self) -> str:
return (
"Performs comprehensive code analysis with systematic investigation and expert validation. "
"Use for architecture, performance, maintainability, and pattern analysis. "
"Guides through structured code review and strategic planning."
)
def get_system_prompt(self) -> str:
return ANALYZE_PROMPT
def get_default_temperature(self) -> float:
return TEMPERATURE_ANALYTICAL
def get_model_category(self) -> "ToolModelCategory":
"""Analyze workflow requires thorough analysis and reasoning"""
from tools.models import ToolModelCategory
return ToolModelCategory.EXTENDED_REASONING
def get_workflow_request_model(self):
"""Return the analyze workflow-specific request model."""
return AnalyzeWorkflowRequest
def get_input_schema(self) -> dict[str, Any]:
"""Generate input schema using WorkflowSchemaBuilder with analyze-specific overrides."""
from .workflow.schema_builders import WorkflowSchemaBuilder
# Fields to exclude from analyze workflow (inherited from WorkflowRequest but not used)
excluded_fields = {"hypothesis", "confidence"}
# Analyze workflow-specific field overrides
analyze_field_overrides = {
"step": {
"type": "string",
"description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["step"],
},
"step_number": {
"type": "integer",
"minimum": 1,
"description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["step_number"],
},
"total_steps": {
"type": "integer",
"minimum": 1,
"description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["total_steps"],
},
"next_step_required": {
"type": "boolean",
"description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["next_step_required"],
},
"findings": {
"type": "string",
"description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["findings"],
},
"files_checked": {
"type": "array",
"items": {"type": "string"},
"description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["files_checked"],
},
"relevant_files": {
"type": "array",
"items": {"type": "string"},
"description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["relevant_files"],
},
"confidence": {
"type": "string",
"enum": ["exploring", "low", "medium", "high", "very_high", "almost_certain", "certain"],
"description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["confidence"],
},
"images": {
"type": "array",
"items": {"type": "string"},
"description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["images"],
},
"issues_found": {
"type": "array",
"items": {"type": "object"},
"description": "Issues or concerns identified during analysis, each with severity level (critical, high, medium, low)",
},
"analysis_type": {
"type": "string",
"enum": ["architecture", "performance", "security", "quality", "general"],
"default": "general",
"description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["analysis_type"],
},
"output_format": {
"type": "string",
"enum": ["summary", "detailed", "actionable"],
"default": "detailed",
"description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["output_format"],
},
}
# Use WorkflowSchemaBuilder with analyze-specific tool fields
return WorkflowSchemaBuilder.build_schema(
tool_specific_fields=analyze_field_overrides,
model_field_schema=self.get_model_field_schema(),
auto_mode=self.is_effective_auto_mode(),
tool_name=self.get_name(),
excluded_workflow_fields=list(excluded_fields),
)
def get_required_actions(
self, step_number: int, confidence: str, findings: str, total_steps: int, request=None
) -> list[str]:
"""Define required actions for each investigation phase."""
if step_number == 1:
# Initial analysis investigation tasks
return [
"Read and understand the code files specified for analysis",
"Map the tech stack, frameworks, and overall architecture",
"Identify the main components, modules, and their relationships",
"Understand the business logic and intended functionality",
"Examine architectural patterns and design decisions used",
"Look for strengths, risks, and strategic improvement areas",
]
elif step_number < total_steps:
# Need deeper investigation
return [
"Examine specific architectural patterns and design decisions in detail",
"Analyze scalability characteristics and performance implications",
"Assess maintainability factors: module cohesion, coupling, tech debt",
"Identify security posture and potential systemic vulnerabilities",
"Look for overengineering, unnecessary complexity, or missing abstractions",
"Evaluate how well the architecture serves business and scaling goals",
]
else:
# Close to completion - need final verification
return [
"Verify all significant architectural insights have been documented",
"Confirm strategic improvement opportunities are comprehensively captured",
"Ensure both strengths and risks are properly identified with evidence",
"Validate that findings align with the analysis type and goals specified",
"Check that recommendations are actionable and proportional to the codebase",
"Confirm the analysis provides clear guidance for strategic decisions",
]
def should_call_expert_analysis(self, consolidated_findings, request=None) -> bool:
"""
Always call expert analysis for comprehensive validation.
Analysis benefits from a second opinion to ensure completeness.
"""
# Check if user explicitly requested to skip assistant model
if request and not self.get_request_use_assistant_model(request):
return False
# For analysis, we always want expert validation if we have any meaningful data
return len(consolidated_findings.relevant_files) > 0 or len(consolidated_findings.findings) >= 1
def prepare_expert_analysis_context(self, consolidated_findings) -> str:
"""Prepare context for external model call for final analysis validation."""
context_parts = [
f"=== ANALYSIS REQUEST ===\\n{self.initial_request or 'Code analysis workflow initiated'}\\n=== END REQUEST ==="
]
# Add investigation summary
investigation_summary = self._build_analysis_summary(consolidated_findings)
context_parts.append(
f"\\n=== AGENT'S ANALYSIS INVESTIGATION ===\\n{investigation_summary}\\n=== END INVESTIGATION ==="
)
# Add analysis configuration context if available
if self.analysis_config:
config_text = "\\n".join(f"- {key}: {value}" for key, value in self.analysis_config.items() if value)
context_parts.append(f"\\n=== ANALYSIS CONFIGURATION ===\\n{config_text}\\n=== END CONFIGURATION ===")
# Add relevant code elements if available
if consolidated_findings.relevant_context:
methods_text = "\\n".join(f"- {method}" for method in consolidated_findings.relevant_context)
context_parts.append(f"\\n=== RELEVANT CODE ELEMENTS ===\\n{methods_text}\\n=== END CODE ELEMENTS ===")
# Add assessment evolution if available
if consolidated_findings.hypotheses:
assessments_text = "\\n".join(
f"Step {h['step']}: {h['hypothesis']}" for h in consolidated_findings.hypotheses
)
context_parts.append(f"\\n=== ASSESSMENT EVOLUTION ===\\n{assessments_text}\\n=== END ASSESSMENTS ===")
# Add images if available
if consolidated_findings.images:
images_text = "\\n".join(f"- {img}" for img in consolidated_findings.images)
context_parts.append(
f"\\n=== VISUAL ANALYSIS INFORMATION ===\\n{images_text}\\n=== END VISUAL INFORMATION ==="
)
return "\\n".join(context_parts)
def _build_analysis_summary(self, consolidated_findings) -> str:
"""Prepare a comprehensive summary of the analysis investigation."""
summary_parts = [
"=== SYSTEMATIC ANALYSIS INVESTIGATION SUMMARY ===",
f"Total steps: {len(consolidated_findings.findings)}",
f"Files examined: {len(consolidated_findings.files_checked)}",
f"Relevant files identified: {len(consolidated_findings.relevant_files)}",
f"Code elements analyzed: {len(consolidated_findings.relevant_context)}",
"",
"=== INVESTIGATION PROGRESSION ===",
]
for finding in consolidated_findings.findings:
summary_parts.append(finding)
return "\\n".join(summary_parts)
def should_include_files_in_expert_prompt(self) -> bool:
"""Include files in expert analysis for comprehensive validation."""
return True
def should_embed_system_prompt(self) -> bool:
"""Embed system prompt in expert analysis for proper context."""
return True
def get_expert_thinking_mode(self) -> str:
"""Use high thinking mode for thorough analysis."""
return "high"
def get_expert_analysis_instruction(self) -> str:
"""Get specific instruction for analysis expert validation."""
return (
"Please provide comprehensive analysis validation based on the investigation findings. "
"Focus on identifying any remaining architectural insights, validating the completeness of the analysis, "
"and providing final strategic recommendations following the structured format specified in the system prompt."
)
# Hook method overrides for analyze-specific behavior
def prepare_step_data(self, request) -> dict:
"""
Map analyze-specific fields for internal processing.
"""
step_data = {
"step": request.step,
"step_number": request.step_number,
"findings": request.findings,
"files_checked": request.files_checked,
"relevant_files": request.relevant_files,
"relevant_context": request.relevant_context,
"issues_found": request.issues_found, # Analyze workflow uses issues_found for structured problem tracking
"confidence": "medium", # Fixed value for workflow compatibility
"hypothesis": request.findings, # Map findings to hypothesis for compatibility
"images": request.images or [],
}
return step_data
def should_skip_expert_analysis(self, request, consolidated_findings) -> bool:
"""
Analyze workflow always uses expert analysis for comprehensive validation.
Analysis benefits from a second opinion to ensure completeness and catch
any missed insights or alternative perspectives.
"""
return False
def store_initial_issue(self, step_description: str):
"""Store initial request for expert analysis."""
self.initial_request = step_description
# Override inheritance hooks for analyze-specific behavior
def get_completion_status(self) -> str:
"""Analyze tools use analysis-specific status."""
return "analysis_complete_ready_for_implementation"
def get_completion_data_key(self) -> str:
"""Analyze uses 'complete_analysis' key."""
return "complete_analysis"
def get_final_analysis_from_request(self, request):
"""Analyze tools use 'findings' field."""
return request.findings
def get_confidence_level(self, request) -> str:
"""Analyze tools use fixed confidence for consistency."""
return "medium"
def get_completion_message(self) -> str:
"""Analyze-specific completion message."""
return (
"Analysis complete. You have identified all significant patterns, "
"architectural insights, and strategic opportunities. MANDATORY: Present the user with the complete "
"analysis results organized by strategic impact, and IMMEDIATELY proceed with implementing the "
"highest priority recommendations or provide specific guidance for improvements. Focus on actionable "
"strategic insights."
)
def get_skip_reason(self) -> str:
"""Analyze-specific skip reason."""
return "Completed comprehensive analysis locally"
def get_skip_expert_analysis_status(self) -> str:
"""Analyze-specific expert analysis skip status."""
return "skipped_due_to_complete_analysis"
def prepare_work_summary(self) -> str:
"""Analyze-specific work summary."""
return self._build_analysis_summary(self.consolidated_findings)
def get_completion_next_steps_message(self, expert_analysis_used: bool = False) -> str:
"""
Analyze-specific completion message.
"""
base_message = (
"ANALYSIS IS COMPLETE. You MUST now summarize and present ALL analysis findings organized by "
"strategic impact (Critical → High → Medium → Low), specific architectural insights with code references, "
"and exact recommendations for improvement. Clearly prioritize the top 3 strategic opportunities that need "
"immediate attention. Provide concrete, actionable guidance for each finding—make it easy for a developer "
"to understand exactly what strategic improvements to implement and how to approach them."
)
# Add expert analysis guidance only when expert analysis was actually used
if expert_analysis_used:
expert_guidance = self.get_expert_analysis_guidance()
if expert_guidance:
return f"{base_message}\n\n{expert_guidance}"
return base_message
def get_expert_analysis_guidance(self) -> str:
"""
Provide specific guidance for handling expert analysis in code analysis.
"""
return (
"IMPORTANT: Analysis from an assistant model has been provided above. You MUST thoughtfully evaluate and validate "
"the expert insights rather than treating them as definitive conclusions. Cross-reference the expert "
"analysis with your own systematic investigation, verify that architectural recommendations are "
"appropriate for this codebase's scale and context, and ensure suggested improvements align with "
"the project's goals and constraints. Present a comprehensive synthesis that combines your detailed "
"analysis with validated expert perspectives, clearly distinguishing between patterns you've "
"independently identified and additional strategic insights from expert validation."
)
def get_step_guidance_message(self, request) -> str:
"""
Analyze-specific step guidance with detailed investigation instructions.
"""
step_guidance = self.get_analyze_step_guidance(request.step_number, request)
return step_guidance["next_steps"]
def get_analyze_step_guidance(self, step_number: int, request) -> dict[str, Any]:
"""
Provide step-specific guidance for analyze workflow.
"""
# Generate the next steps instruction based on required actions
required_actions = self.get_required_actions(step_number, "medium", request.findings, request.total_steps)
if step_number == 1:
next_steps = (
f"MANDATORY: DO NOT call the {self.get_name()} tool again immediately. You MUST first examine "
f"the code files thoroughly using appropriate tools. CRITICAL AWARENESS: You need to understand "
f"the architectural patterns, assess scalability and performance characteristics, identify strategic "
f"improvement areas, and look for systemic risks, overengineering, and missing abstractions. "
f"Use file reading tools, code analysis, and systematic examination to gather comprehensive information. "
f"Only call {self.get_name()} again AFTER completing your investigation. When you call "
f"{self.get_name()} next time, use step_number: {step_number + 1} and report specific "
f"files examined, architectural insights found, and strategic assessment discoveries."
)
elif step_number < request.total_steps:
next_steps = (
f"STOP! Do NOT call {self.get_name()} again yet. Based on your findings, you've identified areas that need "
f"deeper analysis. MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\\n"
+ "\\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
+ f"\\n\\nOnly call {self.get_name()} again with step_number: {step_number + 1} AFTER "
+ "completing these analysis tasks."
)
else:
next_steps = (
f"WAIT! Your analysis needs final verification. DO NOT call {self.get_name()} immediately. REQUIRED ACTIONS:\\n"
+ "\\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
+ f"\\n\\nREMEMBER: Ensure you have identified all significant architectural insights and strategic "
f"opportunities across all areas. Document findings with specific file references and "
f"code examples where applicable, then call {self.get_name()} with step_number: {step_number + 1}."
)
return {"next_steps": next_steps}
def customize_workflow_response(self, response_data: dict, request) -> dict:
"""
Customize response to match analyze workflow format.
"""
# Store initial request on first step
if request.step_number == 1:
self.initial_request = request.step
# Store analysis configuration for expert analysis
if request.relevant_files:
self.analysis_config = {
"relevant_files": request.relevant_files,
"analysis_type": request.analysis_type,
"output_format": request.output_format,
}
# Convert generic status names to analyze-specific ones
tool_name = self.get_name()
status_mapping = {
f"{tool_name}_in_progress": "analysis_in_progress",
f"pause_for_{tool_name}": "pause_for_analysis",
f"{tool_name}_required": "analysis_required",
f"{tool_name}_complete": "analysis_complete",
}
if response_data["status"] in status_mapping:
response_data["status"] = status_mapping[response_data["status"]]
# Rename status field to match analyze workflow
if f"{tool_name}_status" in response_data:
response_data["analysis_status"] = response_data.pop(f"{tool_name}_status")
# Add analyze-specific status fields
response_data["analysis_status"]["insights_by_severity"] = {}
for insight in self.consolidated_findings.issues_found:
severity = insight.get("severity", "unknown")
if severity not in response_data["analysis_status"]["insights_by_severity"]:
response_data["analysis_status"]["insights_by_severity"][severity] = 0
response_data["analysis_status"]["insights_by_severity"][severity] += 1
response_data["analysis_status"]["analysis_confidence"] = self.get_request_confidence(request)
# Map complete_analyze to complete_analysis
if f"complete_{tool_name}" in response_data:
response_data["complete_analysis"] = response_data.pop(f"complete_{tool_name}")
# Map the completion flag to match analyze workflow
if f"{tool_name}_complete" in response_data:
response_data["analysis_complete"] = response_data.pop(f"{tool_name}_complete")
return response_data
# Required abstract methods from BaseTool
def get_request_model(self):
"""Return the analyze workflow-specific request model."""
return AnalyzeWorkflowRequest
async def prepare_prompt(self, request) -> str:
"""Not used - workflow tools use execute_workflow()."""
return "" # Workflow tools use execute_workflow() directly
```
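For orientation, a first `analyze` step sent to this tool might look like the sketch below. Field names follow the request model and schema defined above; the path, model name, and findings text are placeholders, and additional inherited fields may be required depending on server configuration.
```python
import json

# Hypothetical first step for the analyze workflow (values are placeholders).
step_one = {
    "step": "Survey the service layer: map modules, entry points, and data flow.",
    "step_number": 1,
    "total_steps": 3,
    "next_step_required": True,
    "findings": "Starting the architectural survey.",
    "relevant_files": ["/absolute/path/to/project/src"],  # required on step 1
    "analysis_type": "architecture",
    "output_format": "actionable",
    "model": "flash",  # model selection mirrors the simulator tests below
}

print(json.dumps(step_one, indent=2))
```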
--------------------------------------------------------------------------------
/simulator_tests/test_planner_validation.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
PlannerWorkflow Tool Validation Test
Tests the planner tool's capabilities using the new workflow architecture.
This validates that the new workflow-based implementation maintains all the
functionality of the original planner tool while following the same workflow
pattern as the debug tool.
"""
import json
from typing import Optional
from .conversation_base_test import ConversationBaseTest
class PlannerValidationTest(ConversationBaseTest):
"""Test planner tool with new workflow architecture"""
@property
def test_name(self) -> str:
return "planner_validation"
@property
def test_description(self) -> str:
return "PlannerWorkflow tool validation with new workflow architecture"
def run_test(self) -> bool:
"""Test planner tool capabilities"""
# Set up the test environment
self.setUp()
try:
self.logger.info("Test: PlannerWorkflow tool validation (new architecture)")
# Test 1: Single planning session with workflow architecture
if not self._test_single_planning_session():
return False
# Test 2: Planning with continuation using workflow
if not self._test_planning_with_continuation():
return False
# Test 3: Complex plan with deep thinking pauses
if not self._test_complex_plan_deep_thinking():
return False
# Test 4: Self-contained completion (no expert analysis)
if not self._test_self_contained_completion():
return False
# Test 5: Branching and revision with workflow
if not self._test_branching_and_revision():
return False
# Test 6: Workflow file context behavior
if not self._test_workflow_file_context():
return False
self.logger.info(" ✅ All planner validation tests passed")
return True
except Exception as e:
self.logger.error(f"PlannerWorkflow validation test failed: {e}")
return False
def _test_single_planning_session(self) -> bool:
"""Test a complete planning session with workflow architecture"""
try:
self.logger.info(" 1.1: Testing single planning session with workflow")
# Step 1: Start planning
self.logger.info(" 1.1.1: Step 1 - Initial planning step")
response1, continuation_id = self.call_mcp_tool(
"planner",
{
"step": "I need to plan a comprehensive API redesign for our legacy system. Let me start by analyzing the current state and identifying key requirements for the new API architecture.",
"step_number": 1,
"total_steps": 4,
"next_step_required": True,
"model": "flash",
},
)
if not response1 or not continuation_id:
self.logger.error("Failed to get initial planning response")
return False
# Parse and validate JSON response
response1_data = self._parse_planner_response(response1)
if not response1_data:
return False
# Validate step 1 response structure - expect pause_for_planner for next_step_required=True
if not self._validate_step_response(response1_data, 1, 4, True, "pause_for_planner"):
return False
# Debug: Log the actual response structure to see what we're getting
self.logger.debug(f"Response structure: {list(response1_data.keys())}")
# Check workflow-specific response structure (more flexible)
status_key = None
for key in response1_data.keys():
if key.endswith("_status"):
status_key = key
break
if not status_key:
self.logger.error(f"Missing workflow status field in response: {list(response1_data.keys())}")
return False
self.logger.debug(f"Found status field: {status_key}")
# Check required_actions for workflow guidance
if not response1_data.get("required_actions"):
self.logger.error("Missing required_actions in workflow response")
return False
self.logger.info(f" ✅ Step 1 successful with workflow, continuation_id: {continuation_id}")
# Step 2: Continue planning
self.logger.info(" 1.1.2: Step 2 - API domain analysis")
response2, _ = self.call_mcp_tool(
"planner",
{
"step": "After analyzing the current API, I can identify three main domains: User Management, Content Management, and Analytics. Let me design the new API structure with RESTful endpoints and proper versioning.",
"step_number": 2,
"total_steps": 4,
"next_step_required": True,
"continuation_id": continuation_id,
"model": "flash",
},
)
if not response2:
self.logger.error("Failed to continue planning to step 2")
return False
response2_data = self._parse_planner_response(response2)
if not self._validate_step_response(response2_data, 2, 4, True, "pause_for_planner"):
return False
# Check step history tracking in workflow (more flexible)
status_key = None
for key in response2_data.keys():
if key.endswith("_status"):
status_key = key
break
if status_key:
workflow_status = response2_data.get(status_key, {})
step_history_length = workflow_status.get("step_history_length", 0)
if step_history_length < 2:
self.logger.error(f"Step history not properly tracked in workflow: {step_history_length}")
return False
self.logger.debug(f"Step history length: {step_history_length}")
else:
self.logger.warning("No workflow status found, skipping step history check")
self.logger.info(" ✅ Step 2 successful with workflow tracking")
# Step 3: Final step - should trigger completion
self.logger.info(" 1.1.3: Step 3 - Final planning step")
response3, _ = self.call_mcp_tool(
"planner",
{
"step": "API redesign plan complete: Phase 1 - User Management API, Phase 2 - Content Management API, Phase 3 - Analytics API. Each phase includes proper authentication, rate limiting, and comprehensive documentation.",
"step_number": 3,
"total_steps": 3, # Adjusted total
"next_step_required": False, # Final step - should complete without expert analysis
"continuation_id": continuation_id,
"model": "flash",
},
)
if not response3:
self.logger.error("Failed to complete planning session")
return False
response3_data = self._parse_planner_response(response3)
if not response3_data:
return False
# Validate final response structure - should be self-contained completion
if response3_data.get("status") != "planner_complete":
self.logger.error(f"Expected status 'planner_complete', got '{response3_data.get('status')}'")
return False
if not response3_data.get("planning_complete"):
self.logger.error("Expected planning_complete=true for final step")
return False
# Should NOT have expert_analysis (self-contained)
if "expert_analysis" in response3_data:
self.logger.error("PlannerWorkflow should be self-contained without expert analysis")
return False
# Check plan_summary exists
if not response3_data.get("plan_summary"):
self.logger.error("Missing plan_summary in final step")
return False
self.logger.info(" ✅ Planning session completed successfully with workflow architecture")
# Store continuation_id for next test
self.api_continuation_id = continuation_id
return True
except Exception as e:
self.logger.error(f"Single planning session test failed: {e}")
return False
def _test_planning_with_continuation(self) -> bool:
"""Test planning continuation with workflow architecture"""
try:
self.logger.info(" 1.2: Testing planning continuation with workflow")
# Use continuation from previous test if available
continuation_id = getattr(self, "api_continuation_id", None)
if not continuation_id:
# Start fresh if no continuation available
self.logger.info(" 1.2.0: Starting fresh planning session")
response0, continuation_id = self.call_mcp_tool(
"planner",
{
"step": "Planning API security strategy",
"step_number": 1,
"total_steps": 2,
"next_step_required": True,
"model": "flash",
},
)
if not response0 or not continuation_id:
self.logger.error("Failed to start fresh planning session")
return False
# Test continuation step
self.logger.info(" 1.2.1: Continue planning session")
response1, _ = self.call_mcp_tool(
"planner",
{
"step": "Building on the API redesign, let me now plan the security implementation with OAuth 2.0, API keys, and rate limiting strategies.",
"step_number": 2,
"total_steps": 2,
"next_step_required": True,
"continuation_id": continuation_id,
"model": "flash",
},
)
if not response1:
self.logger.error("Failed to continue planning")
return False
response1_data = self._parse_planner_response(response1)
if not response1_data:
return False
# Validate continuation behavior
if not self._validate_step_response(response1_data, 2, 2, True, "pause_for_planner"):
return False
# Check that continuation_id is preserved
if response1_data.get("continuation_id") != continuation_id:
self.logger.error("Continuation ID not preserved in workflow")
return False
self.logger.info(" ✅ Planning continuation working with workflow")
return True
except Exception as e:
self.logger.error(f"Planning continuation test failed: {e}")
return False
def _test_complex_plan_deep_thinking(self) -> bool:
"""Test complex plan with deep thinking pauses"""
try:
self.logger.info(" 1.3: Testing complex plan with deep thinking pauses")
# Start complex plan (≥5 steps) - should trigger deep thinking
self.logger.info(" 1.3.1: Step 1 of complex plan (should trigger deep thinking)")
response1, continuation_id = self.call_mcp_tool(
"planner",
{
"step": "I need to plan a complete digital transformation for our enterprise organization, including cloud migration, process automation, and cultural change management.",
"step_number": 1,
"total_steps": 8, # Complex plan ≥5 steps
"next_step_required": True,
"model": "flash",
},
)
if not response1 or not continuation_id:
self.logger.error("Failed to start complex planning")
return False
response1_data = self._parse_planner_response(response1)
if not response1_data:
return False
# Should trigger deep thinking pause for complex plan
if response1_data.get("status") != "pause_for_deep_thinking":
self.logger.error("Expected deep thinking pause for complex plan step 1")
return False
if not response1_data.get("thinking_required"):
self.logger.error("Expected thinking_required=true for complex plan")
return False
# Check required thinking actions
required_thinking = response1_data.get("required_thinking", [])
if len(required_thinking) < 4:
self.logger.error("Expected comprehensive thinking requirements for complex plan")
return False
# Check for deep thinking guidance in next_steps
next_steps = response1_data.get("next_steps", "")
if "MANDATORY" not in next_steps or "deep thinking" not in next_steps.lower():
self.logger.error("Expected mandatory deep thinking guidance")
return False
self.logger.info(" ✅ Complex plan step 1 correctly triggered deep thinking pause")
# Step 2 of complex plan - should also trigger deep thinking
self.logger.info(" 1.3.2: Step 2 of complex plan (should trigger deep thinking)")
response2, _ = self.call_mcp_tool(
"planner",
{
"step": "After deep analysis, I can see this transformation requires three parallel tracks: Technical Infrastructure, Business Process, and Human Capital. Let me design the coordination strategy.",
"step_number": 2,
"total_steps": 8,
"next_step_required": True,
"continuation_id": continuation_id,
"model": "flash",
},
)
if not response2:
self.logger.error("Failed to continue complex planning")
return False
response2_data = self._parse_planner_response(response2)
if not response2_data:
return False
# Step 2 should also trigger deep thinking for complex plans
if response2_data.get("status") != "pause_for_deep_thinking":
self.logger.error("Expected deep thinking pause for complex plan step 2")
return False
self.logger.info(" ✅ Complex plan step 2 correctly triggered deep thinking pause")
# Step 4 of complex plan - should use normal flow (after step 3)
self.logger.info(" 1.3.3: Step 4 of complex plan (should use normal flow)")
response4, _ = self.call_mcp_tool(
"planner",
{
"step": "Now moving to tactical planning: Phase 1 execution details with specific timelines and resource allocation for the technical infrastructure track.",
"step_number": 4,
"total_steps": 8,
"next_step_required": True,
"continuation_id": continuation_id,
"model": "flash",
},
)
if not response4:
self.logger.error("Failed to continue to step 4")
return False
response4_data = self._parse_planner_response(response4)
if not response4_data:
return False
# Step 4 should use normal flow (no more deep thinking pauses)
if response4_data.get("status") != "pause_for_planner":
self.logger.error("Expected normal planning flow for step 4")
return False
if response4_data.get("thinking_required"):
self.logger.error("Step 4 should not require special thinking pause")
return False
self.logger.info(" ✅ Complex plan transitions to normal flow after step 3")
return True
except Exception as e:
self.logger.error(f"Complex plan deep thinking test failed: {e}")
return False
def _test_self_contained_completion(self) -> bool:
"""Test self-contained completion without expert analysis"""
try:
self.logger.info(" 1.4: Testing self-contained completion")
# Simple planning session that should complete without expert analysis
self.logger.info(" 1.4.1: Simple planning session")
response1, continuation_id = self.call_mcp_tool(
"planner",
{
"step": "Planning a simple website redesign with new color scheme and improved navigation.",
"step_number": 1,
"total_steps": 2,
"next_step_required": True,
"model": "flash",
},
)
if not response1 or not continuation_id:
self.logger.error("Failed to start simple planning")
return False
# Final step - should complete without expert analysis
self.logger.info(" 1.4.2: Final step - self-contained completion")
response2, _ = self.call_mcp_tool(
"planner",
{
"step": "Website redesign plan complete: Phase 1 - Update color palette and typography, Phase 2 - Redesign navigation structure and user flows.",
"step_number": 2,
"total_steps": 2,
"next_step_required": False, # Final step
"continuation_id": continuation_id,
"model": "flash",
},
)
if not response2:
self.logger.error("Failed to complete simple planning")
return False
response2_data = self._parse_planner_response(response2)
if not response2_data:
return False
# Validate self-contained completion
if response2_data.get("status") != "planner_complete":
self.logger.error("Expected self-contained completion status")
return False
# Should NOT call expert analysis
if "expert_analysis" in response2_data:
self.logger.error("PlannerWorkflow should not call expert analysis")
return False
# Should have planning_complete flag
if not response2_data.get("planning_complete"):
self.logger.error("Expected planning_complete=true")
return False
# Should have plan_summary
if not response2_data.get("plan_summary"):
self.logger.error("Expected plan_summary in completion")
return False
# Check completion instructions
output = response2_data.get("output", {})
if not output.get("instructions"):
self.logger.error("Missing output instructions for plan presentation")
return False
self.logger.info(" ✅ Self-contained completion working correctly")
return True
except Exception as e:
self.logger.error(f"Self-contained completion test failed: {e}")
return False
def _test_branching_and_revision(self) -> bool:
"""Test branching and revision with workflow architecture"""
try:
self.logger.info(" 1.5: Testing branching and revision with workflow")
# Start planning session for branching test
self.logger.info(" 1.5.1: Start planning for branching test")
response1, continuation_id = self.call_mcp_tool(
"planner",
{
"step": "Planning mobile app development strategy with different technology options to evaluate.",
"step_number": 1,
"total_steps": 4,
"next_step_required": True,
"model": "flash",
},
)
if not response1 or not continuation_id:
self.logger.error("Failed to start branching test")
return False
# Create branch
self.logger.info(" 1.5.2: Create branch for React Native approach")
response2, _ = self.call_mcp_tool(
"planner",
{
"step": "Branch A: React Native approach - cross-platform development with shared codebase, faster development cycle, and consistent UI across platforms.",
"step_number": 2,
"total_steps": 4,
"next_step_required": True,
"is_branch_point": True,
"branch_from_step": 1,
"branch_id": "react-native",
"continuation_id": continuation_id,
"model": "flash",
},
)
if not response2:
self.logger.error("Failed to create branch")
return False
response2_data = self._parse_planner_response(response2)
if not response2_data:
return False
# Validate branching in workflow
metadata = response2_data.get("metadata", {})
if not metadata.get("is_branch_point"):
self.logger.error("Branch point not recorded in workflow")
return False
if metadata.get("branch_id") != "react-native":
self.logger.error("Branch ID not properly recorded")
return False
if "react-native" not in metadata.get("branches", []):
self.logger.error("Branch not added to branches list")
return False
self.logger.info(" ✅ Branching working with workflow architecture")
# Test revision
self.logger.info(" 1.5.3: Test revision capability")
response3, _ = self.call_mcp_tool(
"planner",
{
"step": "Revision of step 2: After consideration, let me revise the React Native approach to include performance optimizations and native module integration for critical features.",
"step_number": 3,
"total_steps": 4,
"next_step_required": True,
"is_step_revision": True,
"revises_step_number": 2,
"continuation_id": continuation_id,
"model": "flash",
},
)
if not response3:
self.logger.error("Failed to create revision")
return False
response3_data = self._parse_planner_response(response3)
if not response3_data:
return False
# Validate revision in workflow
metadata = response3_data.get("metadata", {})
if not metadata.get("is_step_revision"):
self.logger.error("Step revision not recorded in workflow")
return False
if metadata.get("revises_step_number") != 2:
self.logger.error("Revised step number not properly recorded")
return False
self.logger.info(" ✅ Revision working with workflow architecture")
return True
except Exception as e:
self.logger.error(f"Branching and revision test failed: {e}")
return False
def _test_workflow_file_context(self) -> bool:
"""Test workflow file context behavior (should be minimal for planner)"""
try:
self.logger.info(" 1.6: Testing workflow file context behavior")
# Planner typically doesn't use files, but test the workflow handles this correctly
self.logger.info(" 1.6.1: Planning step with no files (normal case)")
response1, continuation_id = self.call_mcp_tool(
"planner",
{
"step": "Planning data architecture for analytics platform.",
"step_number": 1,
"total_steps": 2,
"next_step_required": True,
"model": "flash",
},
)
if not response1 or not continuation_id:
self.logger.error("Failed to start workflow file context test")
return False
response1_data = self._parse_planner_response(response1)
if not response1_data:
return False
# Planner workflow should not have file_context since it doesn't use files
if "file_context" in response1_data:
self.logger.info(" ℹ️ Workflow file context present but should be minimal for planner")
# Final step
self.logger.info(" 1.6.2: Final step (should complete without file embedding)")
response2, _ = self.call_mcp_tool(
"planner",
{
"step": "Data architecture plan complete with data lakes, processing pipelines, and analytics layers.",
"step_number": 2,
"total_steps": 2,
"next_step_required": False,
"continuation_id": continuation_id,
"model": "flash",
},
)
if not response2:
self.logger.error("Failed to complete workflow file context test")
return False
response2_data = self._parse_planner_response(response2)
if not response2_data:
return False
# Final step should complete self-contained
if response2_data.get("status") != "planner_complete":
self.logger.error("Expected self-contained completion for planner workflow")
return False
self.logger.info(" ✅ Workflow file context behavior appropriate for planner")
return True
except Exception as e:
self.logger.error(f"Workflow file context test failed: {e}")
return False
def call_mcp_tool(self, tool_name: str, params: dict) -> tuple[Optional[str], Optional[str]]:
"""Call an MCP tool in-process - override for planner-specific response handling"""
# Use in-process implementation to maintain conversation memory
response_text, _ = self.call_mcp_tool_direct(tool_name, params)
if not response_text:
return None, None
# Extract continuation_id from planner response specifically
continuation_id = self._extract_planner_continuation_id(response_text)
return response_text, continuation_id
def _extract_planner_continuation_id(self, response_text: str) -> Optional[str]:
"""Extract continuation_id from planner response"""
try:
# Parse the response
response_data = json.loads(response_text)
return response_data.get("continuation_id")
except json.JSONDecodeError as e:
self.logger.debug(f"Failed to parse response for planner continuation_id: {e}")
return None
def _parse_planner_response(self, response_text: str) -> dict:
"""Parse planner tool JSON response"""
try:
# Parse the response - it should be direct JSON
return json.loads(response_text)
except json.JSONDecodeError as e:
self.logger.error(f"Failed to parse planner response as JSON: {e}")
self.logger.error(f"Response text: {response_text[:500]}...")
return {}
def _validate_step_response(
self,
response_data: dict,
expected_step: int,
expected_total: int,
expected_next_required: bool,
expected_status: str,
) -> bool:
"""Validate a planner step response structure"""
try:
# Check status
if response_data.get("status") != expected_status:
self.logger.error(f"Expected status '{expected_status}', got '{response_data.get('status')}'")
return False
# Check step number
if response_data.get("step_number") != expected_step:
self.logger.error(f"Expected step_number {expected_step}, got {response_data.get('step_number')}")
return False
# Check total steps
if response_data.get("total_steps") != expected_total:
self.logger.error(f"Expected total_steps {expected_total}, got {response_data.get('total_steps')}")
return False
# Check next_step_required
if response_data.get("next_step_required") != expected_next_required:
self.logger.error(
f"Expected next_step_required {expected_next_required}, got {response_data.get('next_step_required')}"
)
return False
# Check step_content exists
if not response_data.get("step_content"):
self.logger.error("Missing step_content in response")
return False
# Check next_steps guidance
if not response_data.get("next_steps"):
self.logger.error("Missing next_steps guidance in response")
return False
return True
except Exception as e:
self.logger.error(f"Error validating step response: {e}")
return False
```
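The assertions above imply a planner step response shaped roughly like the sketch below. This is a reconstruction from `_validate_step_response` and the surrounding checks, not the exact server payload; real responses also carry a tool-specific `*_status` block and other workflow metadata.
```python
# Illustrative response shape reconstructed from the checks above; values are made up.
sample_planner_step = {
    "status": "pause_for_planner",
    "step_number": 1,
    "total_steps": 4,
    "next_step_required": True,
    "step_content": "Initial planning step text...",
    "next_steps": "Required actions before calling planner again.",
    "continuation_id": "hypothetical-uuid",
    "required_actions": ["Reflect on the plan before continuing."],
}


def looks_like_planner_step(data: dict) -> bool:
    """Loosely mirror the structural checks in _validate_step_response."""
    required = ("status", "step_number", "total_steps", "next_step_required", "step_content", "next_steps")
    return all(data.get(key) is not None for key in required)


print(looks_like_planner_step(sample_planner_step))  # True
```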
--------------------------------------------------------------------------------
/tools/docgen.py:
--------------------------------------------------------------------------------
```python
"""
Documentation Generation tool - Automated code documentation with complexity analysis
This tool provides a structured workflow for adding comprehensive documentation to codebases.
It guides you through systematic code analysis to generate modern documentation with:
- Function/method parameter documentation
- Big O complexity analysis
- Call flow and dependency documentation
- Inline comments for complex logic
- Smart updating of existing documentation
Key features:
- Step-by-step documentation workflow with progress tracking
- Context-aware file embedding (references during analysis, full content for documentation)
- Automatic conversation threading and history preservation
- Expert analysis integration with external models
- Support for multiple programming languages and documentation styles
- Configurable documentation features via parameters
"""
import logging
from typing import TYPE_CHECKING, Any, Optional
from pydantic import Field
if TYPE_CHECKING:
from tools.models import ToolModelCategory
from config import TEMPERATURE_ANALYTICAL
from systemprompts import DOCGEN_PROMPT
from tools.shared.base_models import WorkflowRequest
from .workflow.base import WorkflowTool
logger = logging.getLogger(__name__)
# Tool-specific field descriptions for documentation generation
DOCGEN_FIELD_DESCRIPTIONS = {
"step": (
"Step 1 (Discovery): list every file that needs documentation and record the total. Do not write docs yet. "
"Steps 2+: document exactly one file per step. Never change code logic; log bugs separately. Keep the counters accurate."
),
"step_number": "Current documentation step (starts at 1).",
"total_steps": "1 discovery step + one step per file documented (tracks via `total_files_to_document`).",
"next_step_required": "True while more files still need documentation; False once everything is complete.",
"findings": "Summarize documentation gaps, complexity, call flows, and well-documented areas. Stop and report immediately if you uncover a bug.",
"relevant_files": "Absolute paths for the file(s) you are documenting this step—stick to a single file per step.",
"relevant_context": "Functions or methods needing documentation (e.g. 'Class.method', 'function_name'), especially complex or user-facing areas.",
"num_files_documented": "Count of files finished so far. Increment only when a file is fully documented.",
"total_files_to_document": "Total files identified in discovery; completion requires matching this count.",
"document_complexity": "Include algorithmic complexity (Big O) analysis when True (default).",
"document_flow": "Include call flow/dependency notes when True (default).",
"update_existing": "True (default) to polish inaccurate or outdated docs instead of leaving them untouched.",
"comments_on_complex_logic": "True (default) to add inline comments around non-obvious logic.",
}
class DocgenRequest(WorkflowRequest):
"""Request model for documentation generation steps"""
# Required workflow fields
step: str = Field(..., description=DOCGEN_FIELD_DESCRIPTIONS["step"])
step_number: int = Field(..., description=DOCGEN_FIELD_DESCRIPTIONS["step_number"])
total_steps: int = Field(..., description=DOCGEN_FIELD_DESCRIPTIONS["total_steps"])
next_step_required: bool = Field(..., description=DOCGEN_FIELD_DESCRIPTIONS["next_step_required"])
# Documentation analysis tracking fields
findings: str = Field(..., description=DOCGEN_FIELD_DESCRIPTIONS["findings"])
relevant_files: list[str] = Field(default_factory=list, description=DOCGEN_FIELD_DESCRIPTIONS["relevant_files"])
relevant_context: list[str] = Field(default_factory=list, description=DOCGEN_FIELD_DESCRIPTIONS["relevant_context"])
# Critical completion tracking counters
num_files_documented: int = Field(0, description=DOCGEN_FIELD_DESCRIPTIONS["num_files_documented"])
total_files_to_document: int = Field(0, description=DOCGEN_FIELD_DESCRIPTIONS["total_files_to_document"])
# Documentation generation configuration parameters
document_complexity: Optional[bool] = Field(True, description=DOCGEN_FIELD_DESCRIPTIONS["document_complexity"])
document_flow: Optional[bool] = Field(True, description=DOCGEN_FIELD_DESCRIPTIONS["document_flow"])
update_existing: Optional[bool] = Field(True, description=DOCGEN_FIELD_DESCRIPTIONS["update_existing"])
comments_on_complex_logic: Optional[bool] = Field(
True, description=DOCGEN_FIELD_DESCRIPTIONS["comments_on_complex_logic"]
)
class DocgenTool(WorkflowTool):
"""
Documentation generation tool for automated code documentation with complexity analysis.
This tool implements a structured documentation workflow that guides users through
methodical code analysis to generate comprehensive documentation including:
- Function/method signatures and parameter descriptions
- Algorithmic complexity (Big O) analysis
- Call flow and dependency documentation
- Inline comments for complex logic
- Modern documentation style appropriate for the language/platform
"""
def __init__(self):
super().__init__()
self.initial_request = None
def get_name(self) -> str:
return "docgen"
def get_description(self) -> str:
return (
"Generates comprehensive code documentation with systematic analysis of functions, classes, and complexity. "
"Use for documentation generation, code analysis, complexity assessment, and API documentation. "
"Analyzes code structure and patterns to create thorough documentation."
)
def get_system_prompt(self) -> str:
return DOCGEN_PROMPT
def get_default_temperature(self) -> float:
return TEMPERATURE_ANALYTICAL
def get_model_category(self) -> "ToolModelCategory":
"""Docgen requires analytical and reasoning capabilities"""
from tools.models import ToolModelCategory
return ToolModelCategory.EXTENDED_REASONING
def requires_model(self) -> bool:
"""
Docgen tool doesn't require model resolution at the MCP boundary.
The docgen tool is a self-contained workflow tool that guides the CLI agent through
systematic documentation generation without calling external AI models.
Returns:
bool: False - docgen doesn't need external AI model access
"""
return False
def requires_expert_analysis(self) -> bool:
"""Docgen is self-contained and doesn't need expert analysis."""
return False
def get_workflow_request_model(self):
"""Return the docgen-specific request model."""
return DocgenRequest
def get_tool_fields(self) -> dict[str, dict[str, Any]]:
"""Return the tool-specific fields for docgen."""
return {
"document_complexity": {
"type": "boolean",
"default": True,
"description": DOCGEN_FIELD_DESCRIPTIONS["document_complexity"],
},
"document_flow": {
"type": "boolean",
"default": True,
"description": DOCGEN_FIELD_DESCRIPTIONS["document_flow"],
},
"update_existing": {
"type": "boolean",
"default": True,
"description": DOCGEN_FIELD_DESCRIPTIONS["update_existing"],
},
"comments_on_complex_logic": {
"type": "boolean",
"default": True,
"description": DOCGEN_FIELD_DESCRIPTIONS["comments_on_complex_logic"],
},
"num_files_documented": {
"type": "integer",
"default": 0,
"minimum": 0,
"description": DOCGEN_FIELD_DESCRIPTIONS["num_files_documented"],
},
"total_files_to_document": {
"type": "integer",
"default": 0,
"minimum": 0,
"description": DOCGEN_FIELD_DESCRIPTIONS["total_files_to_document"],
},
}
def get_required_fields(self) -> list[str]:
"""Return additional required fields beyond the standard workflow requirements."""
return [
"document_complexity",
"document_flow",
"update_existing",
"comments_on_complex_logic",
"num_files_documented",
"total_files_to_document",
]
def get_input_schema(self) -> dict[str, Any]:
"""Generate input schema using WorkflowSchemaBuilder with field exclusions."""
from .workflow.schema_builders import WorkflowSchemaBuilder
# Exclude workflow fields that documentation generation doesn't need
excluded_workflow_fields = [
"confidence", # Documentation doesn't use confidence levels
"hypothesis", # Documentation doesn't use hypothesis
"files_checked", # Documentation uses doc_files and doc_methods instead for better tracking
]
# Exclude common fields that documentation generation doesn't need
excluded_common_fields = [
"model", # Documentation doesn't need external model selection
"temperature", # Documentation doesn't need temperature control
"thinking_mode", # Documentation doesn't need thinking mode
"images", # Documentation doesn't use images
]
return WorkflowSchemaBuilder.build_schema(
tool_specific_fields=self.get_tool_fields(),
required_fields=self.get_required_fields(), # Include docgen-specific required fields
model_field_schema=None, # Exclude model field - docgen doesn't need external model selection
auto_mode=False, # Force non-auto mode to prevent model field addition
tool_name=self.get_name(),
excluded_workflow_fields=excluded_workflow_fields,
excluded_common_fields=excluded_common_fields,
)
def get_required_actions(
self, step_number: int, confidence: str, findings: str, total_steps: int, request=None
) -> list[str]:
"""Define required actions for comprehensive documentation analysis with step-by-step file focus."""
if step_number == 1:
# Initial discovery ONLY - no documentation yet
return [
"CRITICAL: DO NOT ALTER ANY CODE LOGIC! Only add documentation (docstrings, comments)",
"Discover ALL files in the current directory (not nested) that need documentation",
"COUNT the exact number of files that need documentation",
"LIST all the files you found that need documentation by name",
"IDENTIFY the programming language(s) to use MODERN documentation style (/// for Objective-C, /** */ for Java/JavaScript, etc.)",
"DO NOT start documenting any files yet - this is discovery phase only",
"Report the total count and file list clearly to the user",
"IMMEDIATELY call docgen step 2 after discovery to begin documentation phase",
"WHEN CALLING DOCGEN step 2: Set total_files_to_document to the exact count you found",
"WHEN CALLING DOCGEN step 2: Set num_files_documented to 0 (haven't started yet)",
]
elif step_number == 2:
# Start documentation phase with first file
return [
"CRITICAL: DO NOT ALTER ANY CODE LOGIC! Only add documentation (docstrings, comments)",
"Choose the FIRST file from your discovered list to start documentation",
"For the chosen file: identify ALL functions, classes, and methods within it",
'USE MODERN documentation style for the programming language (/// for Objective-C, /** */ for Java/JavaScript, """ for Python, etc.)',
"Document ALL functions/methods in the chosen file - don't skip any - DOCUMENTATION ONLY",
"When file is 100% documented, increment num_files_documented from 0 to 1",
"Note any dependencies this file has (what it imports/calls) and what calls into it",
"CRITICAL: If you find ANY bugs/logic errors, STOP documenting and report to user immediately",
"Report which specific functions you documented in this step for accountability",
"Report progress: num_files_documented (1) out of total_files_to_document",
]
elif step_number <= 4:
# Continue with focused file-by-file approach
return [
"CRITICAL: DO NOT ALTER ANY CODE LOGIC! Only add documentation (docstrings, comments)",
"Choose the NEXT undocumented file from your discovered list",
"For the chosen file: identify ALL functions, classes, and methods within it",
"USE MODERN documentation style for the programming language (NEVER use legacy /* */ style for languages with modern alternatives)",
"Document ALL functions/methods in the chosen file - don't skip any - DOCUMENTATION ONLY",
"When file is 100% documented, increment num_files_documented by 1",
"Verify that EVERY function in the current file has proper documentation (no skipping)",
"CRITICAL: If you find ANY bugs/logic errors, STOP documenting and report to user immediately",
"Report specific function names you documented for verification",
"Report progress: current num_files_documented out of total_files_to_document",
]
else:
# Continue systematic file-by-file coverage
return [
"CRITICAL: DO NOT ALTER ANY CODE LOGIC! Only add documentation (docstrings, comments)",
"Check counters: num_files_documented vs total_files_to_document",
"If num_files_documented < total_files_to_document: choose NEXT undocumented file",
"USE MODERN documentation style appropriate for each programming language (NEVER legacy styles)",
"Document every function, method, and class in current file with no exceptions",
"When file is 100% documented, increment num_files_documented by 1",
"CRITICAL: If you find ANY bugs/logic errors, STOP documenting and report to user immediately",
"Report progress: current num_files_documented out of total_files_to_document",
"If num_files_documented < total_files_to_document: RESTART docgen with next step",
"ONLY set next_step_required=false when num_files_documented equals total_files_to_document",
"For nested dependencies: check if functions call into subdirectories and document those too",
"CRITICAL: If ANY bugs/logic errors were found, STOP and ask user before proceeding",
]
def should_call_expert_analysis(self, consolidated_findings, request=None) -> bool:
"""Docgen is self-contained and doesn't need expert analysis."""
return False
def prepare_expert_analysis_context(self, consolidated_findings) -> str:
"""Docgen doesn't use expert analysis."""
return ""
def get_step_guidance(self, step_number: int, confidence: str, request) -> dict[str, Any]:
"""
Provide step-specific guidance for documentation generation workflow.
This method generates docgen-specific guidance used by get_step_guidance_message().
"""
# Generate the next steps instruction based on required actions
# Calculate dynamic total_steps based on files to document
total_files_to_document = self.get_request_total_files_to_document(request)
calculated_total_steps = 1 + total_files_to_document if total_files_to_document > 0 else request.total_steps
required_actions = self.get_required_actions(step_number, confidence, request.findings, calculated_total_steps)
if step_number == 1:
next_steps = (
f"DISCOVERY PHASE ONLY - DO NOT START DOCUMENTING YET!\n"
f"MANDATORY: DO NOT call the {self.get_name()} tool again immediately. You MUST first perform "
f"FILE DISCOVERY step by step. DO NOT DOCUMENT ANYTHING YET. "
f"MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\n"
+ "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
+ f"\n\nCRITICAL: When you call {self.get_name()} step 2, set total_files_to_document to the exact count "
f"of files needing documentation and set num_files_documented to 0 (haven't started documenting yet). "
f"Your total_steps will be automatically calculated as 1 (discovery) + number of files to document. "
f"Step 2 will BEGIN the documentation phase. Report the count clearly and then IMMEDIATELY "
f"proceed to call {self.get_name()} step 2 to start documenting the first file."
)
elif step_number == 2:
next_steps = (
f"DOCUMENTATION PHASE BEGINS! ABSOLUTE RULE: DO NOT ALTER ANY CODE LOGIC! DOCUMENTATION ONLY!\n"
f"START FILE-BY-FILE APPROACH! Focus on ONE file until 100% complete. "
f"MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\n"
+ "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
+ f"\n\nREPORT your progress: which specific functions did you document? Update num_files_documented from 0 to 1 when first file complete. "
f"REPORT counters: current num_files_documented out of total_files_to_document. "
f"CRITICAL: If you found ANY bugs/logic errors, STOP documenting and ask user what to do before continuing. "
f"Do NOT move to a new file until the current one is completely documented. "
f"When ready for step {step_number + 1}, report completed work with updated counters."
)
elif step_number <= 4:
next_steps = (
f"ABSOLUTE RULE: DO NOT ALTER ANY CODE LOGIC! DOCUMENTATION ONLY!\n"
f"CONTINUE FILE-BY-FILE APPROACH! Focus on ONE file until 100% complete. "
f"MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\n"
+ "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
+ f"\n\nREPORT your progress: which specific functions did you document? Update num_files_documented when file complete. "
f"REPORT counters: current num_files_documented out of total_files_to_document. "
f"CRITICAL: If you found ANY bugs/logic errors, STOP documenting and ask user what to do before continuing. "
f"Do NOT move to a new file until the current one is completely documented. "
f"When ready for step {step_number + 1}, report completed work with updated counters."
)
else:
next_steps = (
f"ABSOLUTE RULE: DO NOT ALTER ANY CODE LOGIC! DOCUMENTATION ONLY!\n"
f"CRITICAL: Check if MORE FILES need documentation before finishing! "
f"REQUIRED ACTIONS before calling {self.get_name()} step {step_number + 1}:\n"
+ "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
+ f"\n\nREPORT which functions you documented and update num_files_documented when file complete. "
f"CHECK: If num_files_documented < total_files_to_document, RESTART {self.get_name()} with next step! "
f"CRITICAL: Only set next_step_required=false when num_files_documented equals total_files_to_document! "
f"REPORT counters: current num_files_documented out of total_files_to_document. "
f"CRITICAL: If ANY bugs/logic errors were found during documentation, STOP and ask user before proceeding. "
f"NO recursive {self.get_name()} calls without actual documentation work!"
)
return {"next_steps": next_steps}
# Hook method overrides for docgen-specific behavior
async def handle_work_completion(self, response_data: dict, request, arguments: dict) -> dict:
"""
Override work completion to enforce counter validation.
The docgen tool MUST complete ALL files before finishing. If counters don't match,
force continuation regardless of next_step_required setting.
"""
# CRITICAL VALIDATION: Check if all files have been documented using proper inheritance hooks
num_files_documented = self.get_request_num_files_documented(request)
total_files_to_document = self.get_request_total_files_to_document(request)
if num_files_documented < total_files_to_document:
# Counters don't match - force continuation!
logger.warning(
f"Docgen stopping early: {num_files_documented} < {total_files_to_document}. "
f"Forcing continuation to document remaining files."
)
# Override to continuation mode
response_data["status"] = "documentation_analysis_required"
response_data[f"pause_for_{self.get_name()}"] = True
response_data["next_steps"] = (
f"CRITICAL ERROR: You attempted to finish documentation with only {num_files_documented} "
f"out of {total_files_to_document} files documented! You MUST continue documenting "
f"the remaining {total_files_to_document - num_files_documented} files. "
f"Call {self.get_name()} again with step {request.step_number + 1} and continue documentation "
f"of the next undocumented file. DO NOT set next_step_required=false until ALL files are documented!"
)
return response_data
# If counters match, proceed with normal completion
return await super().handle_work_completion(response_data, request, arguments)
def prepare_step_data(self, request) -> dict:
"""
Prepare docgen-specific step data for processing.
Calculates total_steps dynamically based on number of files to document:
- Step 1: Discovery phase
- Steps 2+: One step per file to document
"""
# Calculate dynamic total_steps based on files to document
total_files_to_document = self.get_request_total_files_to_document(request)
if total_files_to_document > 0:
# Discovery step (1) + one step per file
calculated_total_steps = 1 + total_files_to_document
else:
# Fallback to request total_steps if no file count available
calculated_total_steps = request.total_steps
step_data = {
"step": request.step,
"step_number": request.step_number,
"total_steps": calculated_total_steps, # Use calculated value
"findings": request.findings,
"relevant_files": request.relevant_files,
"relevant_context": request.relevant_context,
"num_files_documented": request.num_files_documented,
"total_files_to_document": request.total_files_to_document,
"issues_found": [], # Docgen uses this for documentation gaps
"confidence": "medium", # Default confidence for docgen
"hypothesis": "systematic_documentation_needed", # Default hypothesis
"images": [], # Docgen doesn't typically use images
# CRITICAL: Include documentation configuration parameters so the model can see them
"document_complexity": request.document_complexity,
"document_flow": request.document_flow,
"update_existing": request.update_existing,
"comments_on_complex_logic": request.comments_on_complex_logic,
}
return step_data
def should_skip_expert_analysis(self, request, consolidated_findings) -> bool:
"""
Docgen tool skips expert analysis when the CLI agent has "certain" confidence.
"""
return request.confidence == "certain" and not request.next_step_required
# Override inheritance hooks for docgen-specific behavior
def get_completion_status(self) -> str:
"""Docgen tools use docgen-specific status."""
return "documentation_analysis_complete"
def get_completion_data_key(self) -> str:
"""Docgen uses 'complete_documentation_analysis' key."""
return "complete_documentation_analysis"
def get_final_analysis_from_request(self, request):
"""Docgen tools use 'hypothesis' field for documentation strategy."""
return request.hypothesis
def get_confidence_level(self, request) -> str:
"""Docgen tools use 'certain' for high confidence."""
return request.confidence or "high"
def get_completion_message(self) -> str:
"""Docgen-specific completion message."""
return (
"Documentation analysis complete with high confidence. You have identified the comprehensive "
"documentation needs and strategy. MANDATORY: Present the user with the documentation plan "
"and IMMEDIATELY proceed with implementing the documentation without requiring further "
"consultation. Focus on the precise documentation improvements needed."
)
def get_skip_reason(self) -> str:
"""Docgen-specific skip reason."""
return "Completed comprehensive documentation analysis locally"
def get_request_relevant_context(self, request) -> list:
"""Get relevant_context for docgen tool."""
try:
return request.relevant_context or []
except AttributeError:
return []
def get_request_num_files_documented(self, request) -> int:
"""Get num_files_documented from request. Override for custom handling."""
try:
return request.num_files_documented or 0
except AttributeError:
return 0
def get_request_total_files_to_document(self, request) -> int:
"""Get total_files_to_document from request. Override for custom handling."""
try:
return request.total_files_to_document or 0
except AttributeError:
return 0
def get_skip_expert_analysis_status(self) -> str:
"""Docgen-specific expert analysis skip status."""
return "skipped_due_to_complete_analysis"
def prepare_work_summary(self) -> str:
"""Docgen-specific work summary."""
try:
return f"Completed {len(self.work_history)} documentation analysis steps"
except AttributeError:
return "Completed documentation analysis"
def get_completion_next_steps_message(self, expert_analysis_used: bool = False) -> str:
"""
Docgen-specific completion message.
"""
return (
"DOCUMENTATION ANALYSIS IS COMPLETE FOR ALL FILES (num_files_documented equals total_files_to_document). "
"MANDATORY FINAL VERIFICATION: Before presenting your summary, you MUST perform a final verification scan. "
"Read through EVERY file you documented and check EVERY function, method, class, and property to confirm "
"it has proper documentation including complexity analysis and call flow information. If ANY items lack "
"documentation, document them immediately before finishing. "
"THEN present a clear summary showing: 1) Final counters: num_files_documented out of total_files_to_document, "
"2) Complete accountability list of ALL files you documented with verification status, "
"3) Detailed list of EVERY function/method you documented in each file (proving complete coverage), "
"4) Any dependency relationships you discovered between files, 5) Recommended documentation improvements with concrete examples including "
"complexity analysis and call flow information. 6) **CRITICAL**: List any bugs or logic issues you found "
"during documentation but did NOT fix - present these to the user and ask what they'd like to do about them. "
"Make it easy for a developer to see the complete documentation status across the entire codebase with full accountability."
)
def get_step_guidance_message(self, request) -> str:
"""
Docgen-specific step guidance with detailed analysis instructions.
"""
step_guidance = self.get_step_guidance(request.step_number, request.confidence, request)
return step_guidance["next_steps"]
def customize_workflow_response(self, response_data: dict, request) -> dict:
"""
Customize response to match docgen tool format.
"""
# Store initial request on first step
if request.step_number == 1:
self.initial_request = request.step
# Convert generic status names to docgen-specific ones
tool_name = self.get_name()
status_mapping = {
f"{tool_name}_in_progress": "documentation_analysis_in_progress",
f"pause_for_{tool_name}": "pause_for_documentation_analysis",
f"{tool_name}_required": "documentation_analysis_required",
f"{tool_name}_complete": "documentation_analysis_complete",
}
if response_data["status"] in status_mapping:
response_data["status"] = status_mapping[response_data["status"]]
# Rename status field to match docgen tool
if f"{tool_name}_status" in response_data:
response_data["documentation_analysis_status"] = response_data.pop(f"{tool_name}_status")
# Add docgen-specific status fields
response_data["documentation_analysis_status"]["documentation_strategies"] = len(
self.consolidated_findings.hypotheses
)
# Rename complete documentation analysis data
if f"complete_{tool_name}" in response_data:
response_data["complete_documentation_analysis"] = response_data.pop(f"complete_{tool_name}")
# Map the completion flag to match docgen tool
if f"{tool_name}_complete" in response_data:
response_data["documentation_analysis_complete"] = response_data.pop(f"{tool_name}_complete")
# Map the required flag to match docgen tool
if f"{tool_name}_required" in response_data:
response_data["documentation_analysis_required"] = response_data.pop(f"{tool_name}_required")
return response_data
# Required abstract methods from BaseTool
def get_request_model(self):
"""Return the docgen-specific request model."""
return DocgenRequest
async def prepare_prompt(self, request) -> str:
"""Not used - workflow tools use execute_workflow()."""
return "" # Workflow tools use execute_workflow() directly
```
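A minimal sketch of how the two-phase counter discipline described above plays out in practice. The field names come from `DOCGEN_FIELD_DESCRIPTIONS` and `DocgenRequest`; the file names, paths, and step text are purely illustrative.

```python
# Illustrative docgen tool arguments across the first two steps (hypothetical values).

# Step 1 (discovery only): count and list files, do not document anything yet.
step_1_args = {
    "step": "Discovery: api.py and cache.py need documentation (2 files total).",
    "step_number": 1,
    "total_steps": 3,            # recalculated by the tool as 1 + total_files_to_document
    "next_step_required": True,
    "findings": "api.py and cache.py lack docstrings; no bugs observed.",
}

# Step 2: document exactly one file; counters now reflect the discovery results.
step_2_args = {
    "step": "Document every function and class in api.py.",
    "step_number": 2,
    "total_steps": 3,
    "next_step_required": True,
    "findings": "api.py: 4 undocumented functions, 1 outdated docstring.",
    "relevant_files": ["/abs/path/api.py"],   # hypothetical absolute path
    "num_files_documented": 0,   # incremented to 1 only once api.py is fully documented
    "total_files_to_document": 2,
}
```

Completion is only accepted when `num_files_documented` equals `total_files_to_document`; otherwise `handle_work_completion` above forces the workflow back into `documentation_analysis_required`.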
--------------------------------------------------------------------------------
/tests/test_conversation_memory.py:
--------------------------------------------------------------------------------
```python
"""
Test suite for conversation memory system
Tests the Redis-based conversation persistence needed for AI-to-AI multi-turn
discussions in stateless MCP environments.
"""
import os
from unittest.mock import Mock, patch
import pytest
from server import get_follow_up_instructions
from utils.conversation_memory import (
CONVERSATION_TIMEOUT_SECONDS,
MAX_CONVERSATION_TURNS,
ConversationTurn,
ThreadContext,
add_turn,
build_conversation_history,
create_thread,
get_thread,
)
class TestConversationMemory:
"""Test the conversation memory system for stateless MCP requests"""
@patch("utils.conversation_memory.get_storage")
def test_create_thread(self, mock_storage):
"""Test creating a new thread"""
mock_client = Mock()
mock_storage.return_value = mock_client
thread_id = create_thread("chat", {"prompt": "Hello", "absolute_file_paths": ["/test.py"]})
assert thread_id is not None
        assert len(thread_id) == 36  # Length of a UUID4 string (36 chars including hyphens)
# Verify Redis was called
mock_client.setex.assert_called_once()
call_args = mock_client.setex.call_args
assert call_args[0][0] == f"thread:{thread_id}" # key
assert call_args[0][1] == CONVERSATION_TIMEOUT_SECONDS # TTL from configuration
@patch("utils.conversation_memory.get_storage")
def test_get_thread_valid(self, mock_storage):
"""Test retrieving an existing thread"""
mock_client = Mock()
mock_storage.return_value = mock_client
test_uuid = "12345678-1234-1234-1234-123456789012"
# Create valid ThreadContext and serialize it
context_obj = ThreadContext(
thread_id=test_uuid,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:01:00Z",
tool_name="chat",
turns=[],
initial_context={"prompt": "test"},
)
mock_client.get.return_value = context_obj.model_dump_json()
context = get_thread(test_uuid)
assert context is not None
assert context.thread_id == test_uuid
assert context.tool_name == "chat"
mock_client.get.assert_called_once_with(f"thread:{test_uuid}")
@patch("utils.conversation_memory.get_storage")
def test_get_thread_invalid_uuid(self, mock_storage):
"""Test handling invalid UUID"""
context = get_thread("invalid-uuid")
assert context is None
@patch("utils.conversation_memory.get_storage")
def test_get_thread_not_found(self, mock_storage):
"""Test handling thread not found"""
mock_client = Mock()
mock_storage.return_value = mock_client
mock_client.get.return_value = None
context = get_thread("12345678-1234-1234-1234-123456789012")
assert context is None
@patch("utils.conversation_memory.get_storage")
def test_add_turn_success(self, mock_storage):
"""Test adding a turn to existing thread"""
mock_client = Mock()
mock_storage.return_value = mock_client
test_uuid = "12345678-1234-1234-1234-123456789012"
# Create valid ThreadContext
context_obj = ThreadContext(
thread_id=test_uuid,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:01:00Z",
tool_name="chat",
turns=[],
initial_context={"prompt": "test"},
)
mock_client.get.return_value = context_obj.model_dump_json()
success = add_turn(test_uuid, "user", "Hello there")
assert success is True
# Verify Redis get and setex were called
mock_client.get.assert_called_once()
mock_client.setex.assert_called_once()
@patch("utils.conversation_memory.get_storage")
def test_add_turn_max_limit(self, mock_storage):
"""Test turn limit enforcement"""
mock_client = Mock()
mock_storage.return_value = mock_client
test_uuid = "12345678-1234-1234-1234-123456789012"
# Create thread with MAX_CONVERSATION_TURNS turns (at limit)
turns = [
ConversationTurn(role="user", content=f"Turn {i}", timestamp="2023-01-01T00:00:00Z")
for i in range(MAX_CONVERSATION_TURNS)
]
context_obj = ThreadContext(
thread_id=test_uuid,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:01:00Z",
tool_name="chat",
turns=turns,
initial_context={"prompt": "test"},
)
mock_client.get.return_value = context_obj.model_dump_json()
success = add_turn(test_uuid, "user", "This should fail")
assert success is False
@patch.dict(os.environ, {"GEMINI_API_KEY": "test-key", "OPENAI_API_KEY": ""}, clear=False)
def test_build_conversation_history(self, project_path):
"""Test building conversation history format with files and speaker identification"""
from providers.registry import ModelProviderRegistry
ModelProviderRegistry.clear_cache()
# Create real test files to test actual file embedding functionality
main_file = project_path / "main.py"
readme_file = project_path / "docs" / "readme.md"
examples_dir = project_path / "examples"
examples_file = examples_dir / "example.py"
# Create directories and files
readme_file.parent.mkdir(parents=True, exist_ok=True)
examples_dir.mkdir(parents=True, exist_ok=True)
main_file.write_text("def main():\n print('Hello world')\n")
readme_file.write_text("# Project Documentation\nThis is a test project.\n")
examples_file.write_text("# Example code\nprint('Example')\n")
test_uuid = "12345678-1234-1234-1234-123456789012"
turns = [
ConversationTurn(
role="user",
content="What is Python?",
timestamp="2023-01-01T00:00:00Z",
files=[str(main_file), str(readme_file)],
),
ConversationTurn(
role="assistant",
content="Python is a programming language",
timestamp="2023-01-01T00:01:00Z",
files=[str(examples_dir)], # Directory will be expanded to files
tool_name="chat",
model_name="gpt-5",
model_provider="openai",
),
]
context = ThreadContext(
thread_id=test_uuid,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:01:00Z",
tool_name="chat",
turns=turns,
initial_context={},
)
history, tokens = build_conversation_history(context, model_context=None)
# Test basic structure
assert "CONVERSATION HISTORY" in history
assert f"Thread: {test_uuid}" in history
assert "Tool: chat" in history
assert f"Turn 2/{MAX_CONVERSATION_TURNS}" in history
# Test speaker identification
assert "--- Turn 1 (Agent) ---" in history
assert "--- Turn 2 (gpt-5 using chat via openai) ---" in history
# Test content
assert "What is Python?" in history
assert "Python is a programming language" in history
# Test file tracking
# Check that the new file embedding section is included
assert "=== FILES REFERENCED IN THIS CONVERSATION ===" in history
assert "The following files have been shared and analyzed during our conversation." in history
# Check that file context from previous turns is included (now shows files used per turn)
assert f"Files used in this turn: {main_file}, {readme_file}" in history
assert f"Files used in this turn: {examples_dir}" in history
# Verify actual file content is embedded
assert "def main():" in history
assert "Hello world" in history
assert "Project Documentation" in history
def test_build_conversation_history_empty(self):
"""Test building history with no turns"""
test_uuid = "12345678-1234-1234-1234-123456789012"
context = ThreadContext(
thread_id=test_uuid,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:00:00Z",
tool_name="chat",
turns=[],
initial_context={},
)
history, tokens = build_conversation_history(context, model_context=None)
assert history == ""
assert tokens == 0
class TestConversationFlow:
"""Test complete conversation flows simulating stateless MCP requests"""
@patch("utils.conversation_memory.get_storage")
def test_complete_conversation_cycle(self, mock_storage):
"""Test a complete 5-turn conversation until limit reached"""
mock_client = Mock()
mock_storage.return_value = mock_client
# Simulate independent MCP request cycles
# REQUEST 1: Initial request creates thread
thread_id = create_thread("chat", {"prompt": "Analyze this code"})
initial_context = ThreadContext(
thread_id=thread_id,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:00:00Z",
tool_name="chat",
turns=[],
initial_context={"prompt": "Analyze this code"},
)
mock_client.get.return_value = initial_context.model_dump_json()
# Add assistant response
success = add_turn(
thread_id,
"assistant",
"Code analysis complete",
)
assert success is True
# REQUEST 2: User responds to follow-up (independent request cycle)
# Simulate retrieving updated context from Redis
context_after_1 = ThreadContext(
thread_id=thread_id,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:01:00Z",
tool_name="chat",
turns=[
ConversationTurn(
role="assistant",
content="Code analysis complete",
timestamp="2023-01-01T00:00:30Z",
)
],
initial_context={"prompt": "Analyze this code"},
)
mock_client.get.return_value = context_after_1.model_dump_json()
success = add_turn(thread_id, "user", "Yes, check error handling")
assert success is True
success = add_turn(thread_id, "assistant", "Error handling reviewed")
assert success is True
# REQUEST 3-5: Continue conversation (simulating independent cycles)
# After turn 3
context_after_3 = ThreadContext(
thread_id=thread_id,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:03:00Z",
tool_name="chat",
turns=[
ConversationTurn(
role="assistant",
content="Code analysis complete",
timestamp="2023-01-01T00:00:30Z",
),
ConversationTurn(role="user", content="Yes, check error handling", timestamp="2023-01-01T00:01:30Z"),
ConversationTurn(
role="assistant",
content="Error handling reviewed",
timestamp="2023-01-01T00:02:30Z",
),
],
initial_context={"prompt": "Analyze this code"},
)
mock_client.get.return_value = context_after_3.model_dump_json()
success = add_turn(thread_id, "user", "Yes, check tests")
assert success is True
success = add_turn(thread_id, "assistant", "Test coverage analyzed")
assert success is True
# REQUEST 6: Try to exceed MAX_CONVERSATION_TURNS limit - should fail
turns_at_limit = [
ConversationTurn(
role="assistant" if i % 2 == 0 else "user", content=f"Turn {i + 1}", timestamp="2023-01-01T00:00:30Z"
)
for i in range(MAX_CONVERSATION_TURNS)
]
context_at_limit = ThreadContext(
thread_id=thread_id,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:05:00Z",
tool_name="chat",
turns=turns_at_limit,
initial_context={"prompt": "Analyze this code"},
)
mock_client.get.return_value = context_at_limit.model_dump_json()
# This should fail - conversation has reached limit
success = add_turn(thread_id, "user", "This should be rejected")
assert success is False # CONVERSATION STOPS HERE
@patch("utils.conversation_memory.get_storage")
def test_invalid_continuation_id_error(self, mock_storage):
"""Test that invalid continuation IDs raise proper error for restart"""
from server import reconstruct_thread_context
mock_client = Mock()
mock_storage.return_value = mock_client
mock_client.get.return_value = None # Thread not found
arguments = {"continuation_id": "invalid-uuid-12345", "prompt": "Continue conversation"}
# Should raise ValueError asking to restart
with pytest.raises(ValueError) as exc_info:
import asyncio
asyncio.run(reconstruct_thread_context(arguments))
error_msg = str(exc_info.value)
assert "Conversation thread 'invalid-uuid-12345' was not found or has expired" in error_msg
assert (
"Please restart the conversation by providing your full question/prompt without the continuation_id"
in error_msg
)
@patch.dict(os.environ, {"GEMINI_API_KEY": "test-key", "OPENAI_API_KEY": ""}, clear=False)
def test_dynamic_max_turns_configuration(self):
"""Test that all functions respect MAX_CONVERSATION_TURNS configuration"""
from providers.registry import ModelProviderRegistry
ModelProviderRegistry.clear_cache()
# This test ensures if we change MAX_CONVERSATION_TURNS, everything updates
# Test with different max values by patching the constant
test_values = [3, 7, 10]
for test_max in test_values:
# Create turns up to the test limit
turns = [
ConversationTurn(role="user", content=f"Turn {i}", timestamp="2023-01-01T00:00:00Z")
for i in range(test_max)
]
# Test history building respects the limit
test_uuid = "12345678-1234-1234-1234-123456789012"
context = ThreadContext(
thread_id=test_uuid,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:00:00Z",
tool_name="chat",
turns=turns,
initial_context={},
)
history, tokens = build_conversation_history(context, model_context=None)
expected_turn_text = f"Turn {test_max}/{MAX_CONVERSATION_TURNS}"
assert expected_turn_text in history
def test_follow_up_instructions_dynamic_behavior(self):
"""Test that follow-up instructions change correctly based on turn count and max setting"""
# Test with default MAX_CONVERSATION_TURNS
max_turns = MAX_CONVERSATION_TURNS
# Test early conversation (should allow follow-ups)
early_instructions = get_follow_up_instructions(0, max_turns)
assert "CONVERSATION CONTINUATION" in early_instructions
assert f"({max_turns - 1} exchanges remaining)" in early_instructions
assert "Feel free to ask clarifying questions" in early_instructions
# Test mid conversation
mid_instructions = get_follow_up_instructions(2, max_turns)
assert "CONVERSATION CONTINUATION" in mid_instructions
assert f"({max_turns - 3} exchanges remaining)" in mid_instructions
assert "Feel free to ask clarifying questions" in mid_instructions
# Test approaching limit (should stop follow-ups)
limit_instructions = get_follow_up_instructions(max_turns - 1, max_turns)
assert "Do NOT include any follow-up questions" in limit_instructions
assert "final exchange" in limit_instructions
# Test at limit
at_limit_instructions = get_follow_up_instructions(max_turns, max_turns)
assert "Do NOT include any follow-up questions" in at_limit_instructions
# Test with custom max_turns to ensure dynamic behavior
custom_max = 3
custom_early = get_follow_up_instructions(0, custom_max)
assert f"({custom_max - 1} exchanges remaining)" in custom_early
custom_limit = get_follow_up_instructions(custom_max - 1, custom_max)
assert "Do NOT include any follow-up questions" in custom_limit
def test_follow_up_instructions_defaults_to_config(self):
"""Test that follow-up instructions use MAX_CONVERSATION_TURNS when max_turns not provided"""
instructions = get_follow_up_instructions(0) # No max_turns parameter
expected_remaining = MAX_CONVERSATION_TURNS - 1
assert f"({expected_remaining} exchanges remaining)" in instructions
@patch("utils.conversation_memory.get_storage")
def test_complete_conversation_with_dynamic_turns(self, mock_storage):
"""Test complete conversation respecting MAX_CONVERSATION_TURNS dynamically"""
mock_client = Mock()
mock_storage.return_value = mock_client
thread_id = create_thread("chat", {"prompt": "Start conversation"})
# Simulate conversation up to MAX_CONVERSATION_TURNS - 1
for turn_num in range(MAX_CONVERSATION_TURNS - 1):
# Mock context with current turns
turns = [
ConversationTurn(
role="user" if i % 2 == 0 else "assistant",
content=f"Turn {i + 1}",
timestamp="2023-01-01T00:00:00Z",
)
for i in range(turn_num)
]
context = ThreadContext(
thread_id=thread_id,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:00:00Z",
tool_name="chat",
turns=turns,
initial_context={"prompt": "Start conversation"},
)
mock_client.get.return_value = context.model_dump_json()
# Should succeed
success = add_turn(thread_id, "user", f"User turn {turn_num + 1}")
assert success is True, f"Turn {turn_num + 1} should succeed"
# Now we should be at the limit - create final context
final_turns = [
ConversationTurn(
role="user" if i % 2 == 0 else "assistant", content=f"Turn {i + 1}", timestamp="2023-01-01T00:00:00Z"
)
for i in range(MAX_CONVERSATION_TURNS)
]
final_context = ThreadContext(
thread_id=thread_id,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:00:00Z",
tool_name="chat",
turns=final_turns,
initial_context={"prompt": "Start conversation"},
)
mock_client.get.return_value = final_context.model_dump_json()
# This should fail - at the limit
success = add_turn(thread_id, "user", "This should fail")
assert success is False, f"Turn {MAX_CONVERSATION_TURNS + 1} should fail"
@patch("utils.conversation_memory.get_storage")
@patch.dict(os.environ, {"GEMINI_API_KEY": "test-key", "OPENAI_API_KEY": ""}, clear=False)
def test_conversation_with_files_and_context_preservation(self, mock_storage):
"""Test complete conversation flow with file tracking and context preservation"""
from providers.registry import ModelProviderRegistry
ModelProviderRegistry.clear_cache()
mock_client = Mock()
mock_storage.return_value = mock_client
# Start conversation with files using a simple tool
thread_id = create_thread("chat", {"prompt": "Analyze this codebase", "absolute_file_paths": ["/project/src/"]})
# Turn 1: Claude provides context with multiple files
initial_context = ThreadContext(
thread_id=thread_id,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:00:00Z",
tool_name="chat",
turns=[],
initial_context={
"prompt": "Analyze this codebase",
"absolute_file_paths": ["/project/src/"],
},
)
mock_client.get.return_value = initial_context.model_dump_json()
# Add Gemini's response
success = add_turn(
thread_id,
"assistant",
"I've analyzed your codebase structure.",
files=["/project/src/main.py", "/project/src/utils.py"],
tool_name="analyze",
model_name="gemini-2.5-flash",
model_provider="google",
)
assert success is True
# Turn 2: Claude responds with different files
context_turn_1 = ThreadContext(
thread_id=thread_id,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:01:00Z",
tool_name="analyze",
turns=[
ConversationTurn(
role="assistant",
content="I've analyzed your codebase structure.",
timestamp="2023-01-01T00:00:30Z",
files=["/project/src/main.py", "/project/src/utils.py"],
tool_name="analyze",
model_name="gemini-2.5-flash",
model_provider="google",
)
],
initial_context={"prompt": "Analyze this codebase", "relevant_files": ["/project/src/"]},
)
mock_client.get.return_value = context_turn_1.model_dump_json()
# User responds with test files
success = add_turn(
thread_id, "user", "Yes, check the test coverage", files=["/project/tests/", "/project/test_main.py"]
)
assert success is True
# Turn 3: Gemini analyzes tests
context_turn_2 = ThreadContext(
thread_id=thread_id,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:02:00Z",
tool_name="analyze",
turns=[
ConversationTurn(
role="assistant",
content="I've analyzed your codebase structure.",
timestamp="2023-01-01T00:00:30Z",
files=["/project/src/main.py", "/project/src/utils.py"],
tool_name="analyze",
),
ConversationTurn(
role="user",
content="Yes, check the test coverage",
timestamp="2023-01-01T00:01:30Z",
files=["/project/tests/", "/project/test_main.py"],
),
],
initial_context={"prompt": "Analyze this codebase", "relevant_files": ["/project/src/"]},
)
mock_client.get.return_value = context_turn_2.model_dump_json()
success = add_turn(
thread_id,
"assistant",
"Test coverage analysis complete. Coverage is 85%.",
files=["/project/tests/test_utils.py", "/project/coverage.html"],
tool_name="analyze",
model_name="gemini-2.5-flash",
model_provider="google",
)
assert success is True
# Build conversation history and verify chronological file preservation
final_context = ThreadContext(
thread_id=thread_id,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:03:00Z",
tool_name="analyze",
turns=[
ConversationTurn(
role="assistant",
content="I've analyzed your codebase structure.",
timestamp="2023-01-01T00:00:30Z",
files=["/project/src/main.py", "/project/src/utils.py"],
tool_name="analyze",
model_name="gemini-2.5-flash",
model_provider="google",
),
ConversationTurn(
role="user",
content="Yes, check the test coverage",
timestamp="2023-01-01T00:01:30Z",
files=["/project/tests/", "/project/test_main.py"],
),
ConversationTurn(
role="assistant",
content="Test coverage analysis complete. Coverage is 85%.",
timestamp="2023-01-01T00:02:30Z",
files=["/project/tests/test_utils.py", "/project/coverage.html"],
tool_name="analyze",
model_name="gemini-2.5-flash",
model_provider="google",
),
],
initial_context={"prompt": "Analyze this codebase", "relevant_files": ["/project/src/"]},
)
history, tokens = build_conversation_history(final_context)
# Verify chronological order and speaker identification
assert "--- Turn 1 (gemini-2.5-flash using analyze via google) ---" in history
assert "--- Turn 2 (Agent) ---" in history
assert "--- Turn 3 (gemini-2.5-flash using analyze via google) ---" in history
# Verify all files are preserved in chronological order
turn_1_files = "Files used in this turn: /project/src/main.py, /project/src/utils.py"
turn_2_files = "Files used in this turn: /project/tests/, /project/test_main.py"
turn_3_files = "Files used in this turn: /project/tests/test_utils.py, /project/coverage.html"
assert turn_1_files in history
assert turn_2_files in history
assert turn_3_files in history
# Verify content
assert "I've analyzed your codebase structure." in history
assert "Yes, check the test coverage" in history
assert "Test coverage analysis complete. Coverage is 85%." in history
# Verify chronological ordering (turn 1 appears before turn 2, etc.)
turn_1_pos = history.find("--- Turn 1 (gemini-2.5-flash using analyze via google) ---")
turn_2_pos = history.find("--- Turn 2 (Agent) ---")
turn_3_pos = history.find("--- Turn 3 (gemini-2.5-flash using analyze via google) ---")
assert turn_1_pos < turn_2_pos < turn_3_pos
@patch("utils.conversation_memory.get_storage")
def test_stateless_request_isolation(self, mock_storage):
"""Test that each request cycle is independent but shares context via Redis"""
mock_client = Mock()
mock_storage.return_value = mock_client
# Simulate two different "processes" accessing same thread
thread_id = "12345678-1234-1234-1234-123456789012"
# Process 1: Creates thread
initial_context = ThreadContext(
thread_id=thread_id,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:00:00Z",
tool_name="thinkdeep",
turns=[],
initial_context={"prompt": "Think about architecture"},
)
mock_client.get.return_value = initial_context.model_dump_json()
success = add_turn(thread_id, "assistant", "Architecture analysis")
assert success is True
# Process 2: Different "request cycle" accesses same thread
context_from_redis = ThreadContext(
thread_id=thread_id,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:01:00Z",
tool_name="thinkdeep",
turns=[
ConversationTurn(
role="assistant",
content="Architecture analysis",
timestamp="2023-01-01T00:00:30Z",
)
],
initial_context={"prompt": "Think about architecture"},
)
mock_client.get.return_value = context_from_redis.model_dump_json()
# Verify context continuity across "processes"
retrieved_context = get_thread(thread_id)
assert retrieved_context is not None
assert len(retrieved_context.turns) == 1
@patch.dict(os.environ, {"GEMINI_API_KEY": "test-key", "OPENAI_API_KEY": ""}, clear=False)
def test_token_limit_optimization_in_conversation_history(self):
"""Test that build_conversation_history efficiently handles token limits"""
import os
import tempfile
from providers.registry import ModelProviderRegistry
ModelProviderRegistry.clear_cache()
from utils.conversation_memory import build_conversation_history
# Create test files with known content sizes
with tempfile.TemporaryDirectory() as temp_dir:
# Create small and large test files
small_file = os.path.join(temp_dir, "small.py")
large_file = os.path.join(temp_dir, "large.py")
small_content = "# Small file\nprint('hello')\n"
large_content = "# Large file\n" + "x = 1\n" * 10000 # Very large file
with open(small_file, "w") as f:
f.write(small_content)
with open(large_file, "w") as f:
f.write(large_content)
# Create context with files that would exceed token limit
context = ThreadContext(
thread_id="test-token-limit",
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:01:00Z",
tool_name="analyze",
turns=[
ConversationTurn(
role="user",
content="Analyze these files",
timestamp="2023-01-01T00:00:30Z",
files=[small_file, large_file], # Large file should be truncated
)
],
initial_context={"prompt": "Analyze code"},
)
# Build conversation history (should handle token limits gracefully)
history, tokens = build_conversation_history(context, model_context=None)
# Verify the history was built successfully
assert "=== CONVERSATION HISTORY" in history
assert "=== FILES REFERENCED IN THIS CONVERSATION ===" in history
# The small file should be included, but large file might be truncated
# At minimum, verify no crashes and history is generated
assert len(history) > 0
# If truncation occurred, there should be a note about it
if "additional file(s) were truncated due to token limit" in history:
assert small_file in history or large_file in history
else:
# Both files fit within limit
assert small_file in history
assert large_file in history
if __name__ == "__main__":
pytest.main([__file__])
```
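The tests above mock `get_storage`, so for orientation here is a sketch of the round trip they exercise, using only the functions imported at the top of the test module. Against a real storage backend the happy path looks roughly like this; the file path and model name are illustrative.

```python
# Sketch only: create a thread, record an assistant turn, then rebuild history.
from utils.conversation_memory import add_turn, build_conversation_history, create_thread, get_thread

thread_id = create_thread("chat", {"prompt": "Analyze this code"})
add_turn(
    thread_id,
    "assistant",
    "Code analysis complete",
    files=["/project/src/main.py"],   # hypothetical path
    tool_name="chat",
    model_name="gemini-2.5-flash",
    model_provider="google",
)

context = get_thread(thread_id)   # None if the thread expired or was never created
if context is not None:
    history, tokens = build_conversation_history(context, model_context=None)
```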
--------------------------------------------------------------------------------
/tests/test_large_prompt_handling.py:
--------------------------------------------------------------------------------
```python
"""
Tests for large prompt handling functionality.
This test module verifies that the MCP server correctly handles
prompts that exceed the 50,000 character limit by requesting
Claude to save them to a file and resend.
"""
import json
import os
import shutil
import tempfile
from unittest.mock import MagicMock, patch
import pytest
from config import MCP_PROMPT_SIZE_LIMIT
from tools.chat import ChatTool
from tools.codereview import CodeReviewTool
from tools.shared.exceptions import ToolExecutionError
# from tools.debug import DebugIssueTool # Commented out - debug tool refactored
class TestLargePromptHandling:
"""Test suite for large prompt handling across all tools."""
def teardown_method(self):
"""Clean up after each test to prevent state pollution."""
# Clear provider registry singleton
from providers.registry import ModelProviderRegistry
ModelProviderRegistry._instance = None
@pytest.fixture
def large_prompt(self):
"""Create a prompt larger than MCP_PROMPT_SIZE_LIMIT characters."""
return "x" * (MCP_PROMPT_SIZE_LIMIT + 1000)
@pytest.fixture
def normal_prompt(self):
"""Create a normal-sized prompt."""
return "This is a normal prompt that should work fine."
@pytest.fixture
def temp_prompt_file(self, large_prompt):
"""Create a temporary prompt.txt file with large content."""
# Create temp file with exact name "prompt.txt"
temp_dir = tempfile.mkdtemp()
file_path = os.path.join(temp_dir, "prompt.txt")
with open(file_path, "w") as f:
f.write(large_prompt)
return file_path
@pytest.mark.asyncio
async def test_chat_large_prompt_detection(self, large_prompt):
"""Test that chat tool detects large prompts."""
tool = ChatTool()
        temp_dir = tempfile.mkdtemp()
try:
with pytest.raises(ToolExecutionError) as exc_info:
await tool.execute({"prompt": large_prompt, "working_directory_absolute_path": temp_dir})
finally:
shutil.rmtree(temp_dir, ignore_errors=True)
output = json.loads(exc_info.value.payload)
assert output["status"] == "resend_prompt"
assert f"{MCP_PROMPT_SIZE_LIMIT:,} characters" in output["content"]
# The prompt size should match the user input since we check at MCP transport boundary before adding internal content
assert output["metadata"]["prompt_size"] == len(large_prompt)
assert output["metadata"]["limit"] == MCP_PROMPT_SIZE_LIMIT
@pytest.mark.asyncio
async def test_chat_normal_prompt_works(self, normal_prompt):
"""Test that chat tool works normally with regular prompts."""
tool = ChatTool()
temp_dir = tempfile.mkdtemp()
# This test runs in the test environment which uses dummy keys
# The chat tool will return an error for dummy keys, which is expected
try:
try:
result = await tool.execute(
{"prompt": normal_prompt, "model": "gemini-2.5-flash", "working_directory_absolute_path": temp_dir}
)
except ToolExecutionError as exc:
output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
else:
assert len(result) == 1
output = json.loads(result[0].text)
finally:
shutil.rmtree(temp_dir, ignore_errors=True)
# Whether provider succeeds or fails, we should not hit the resend_prompt branch
assert output["status"] != "resend_prompt"
@pytest.mark.asyncio
async def test_chat_prompt_file_handling(self):
"""Test that chat tool correctly handles prompt.txt files with reasonable size."""
tool = ChatTool()
# Use a smaller prompt that won't exceed limit when combined with system prompt
reasonable_prompt = "This is a reasonable sized prompt for testing prompt.txt file handling."
# Create a temp file with reasonable content
temp_dir = tempfile.mkdtemp()
temp_prompt_file = os.path.join(temp_dir, "prompt.txt")
with open(temp_prompt_file, "w") as f:
f.write(reasonable_prompt)
try:
try:
result = await tool.execute(
{
"prompt": "",
"absolute_file_paths": [temp_prompt_file],
"model": "gemini-2.5-flash",
"working_directory_absolute_path": temp_dir,
}
)
except ToolExecutionError as exc:
output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
else:
assert len(result) == 1
output = json.loads(result[0].text)
# The test may fail with dummy API keys, which is expected behavior.
# We're mainly testing that the tool processes prompt files correctly without size errors.
assert output["status"] != "resend_prompt"
finally:
# Cleanup
shutil.rmtree(temp_dir)
@pytest.mark.asyncio
async def test_codereview_large_focus(self, large_prompt):
"""Test that codereview tool detects large focus_on field using real integration testing."""
import importlib
import os
tool = CodeReviewTool()
# Save original environment
original_env = {
"OPENAI_API_KEY": os.environ.get("OPENAI_API_KEY"),
"DEFAULT_MODEL": os.environ.get("DEFAULT_MODEL"),
}
try:
# Set up environment for real provider resolution
os.environ["OPENAI_API_KEY"] = "sk-test-key-large-focus-test-not-real"
os.environ["DEFAULT_MODEL"] = "o3-mini"
# Clear other provider keys to isolate to OpenAI
for key in ["GEMINI_API_KEY", "XAI_API_KEY", "OPENROUTER_API_KEY"]:
os.environ.pop(key, None)
# Reload config and clear registry
import config
importlib.reload(config)
from providers.registry import ModelProviderRegistry
ModelProviderRegistry._instance = None
# Test with real provider resolution
try:
args = {
"step": "initial review setup",
"step_number": 1,
"total_steps": 1,
"next_step_required": False,
"findings": "Initial testing",
"relevant_files": ["/some/file.py"],
"files_checked": ["/some/file.py"],
"focus_on": large_prompt,
"prompt": "Test code review for validation purposes",
"model": "o3-mini",
}
try:
result = await tool.execute(args)
except ToolExecutionError as exc:
output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
else:
assert len(result) == 1
output = json.loads(result[0].text)
# The large focus_on may trigger the resend_prompt guard before provider access.
# When the guard does not trigger, auto-mode falls back to provider selection and
# returns an error about the unavailable model. Both behaviors are acceptable for this test.
if output.get("status") == "resend_prompt":
assert output["metadata"]["prompt_size"] == len(large_prompt)
else:
assert output.get("status") == "error"
assert "Model" in output.get("content", "")
except Exception as e:
# If we get an unexpected exception, ensure it's not a mock artifact
error_msg = str(e)
assert "MagicMock" not in error_msg
assert "'<' not supported between instances" not in error_msg
# Should be a real provider error (API, authentication, etc.)
assert any(
phrase in error_msg
for phrase in ["API", "key", "authentication", "provider", "network", "connection"]
)
finally:
# Restore environment
for key, value in original_env.items():
if value is not None:
os.environ[key] = value
else:
os.environ.pop(key, None)
# Reload config and clear registry
importlib.reload(config)
ModelProviderRegistry._instance = None
# NOTE: Precommit test has been removed because the precommit tool has been
# refactored to use a workflow-based pattern instead of accepting simple prompt/path fields.
# The new precommit tool requires workflow fields like: step, step_number, total_steps,
# next_step_required, findings, etc. See simulator_tests/test_precommitworkflow_validation.py
# for comprehensive workflow testing including large prompt handling.
# NOTE: Debug tool tests have been commented out because the debug tool has been
# refactored to use a self-investigation pattern instead of accepting a prompt field.
# The new debug tool requires fields like: step, step_number, total_steps, next_step_required, findings
# and doesn't have the "resend_prompt" functionality for large prompts.
# @pytest.mark.asyncio
# async def test_debug_large_error_description(self, large_prompt):
# """Test that debug tool detects large error_description."""
# tool = DebugIssueTool()
# result = await tool.execute({"prompt": large_prompt})
#
# assert len(result) == 1
# output = json.loads(result[0].text)
# assert output["status"] == "resend_prompt"
# @pytest.mark.asyncio
# async def test_debug_large_error_context(self, large_prompt, normal_prompt):
# """Test that debug tool detects large error_context."""
# tool = DebugIssueTool()
# result = await tool.execute({"prompt": normal_prompt, "error_context": large_prompt})
#
# assert len(result) == 1
# output = json.loads(result[0].text)
# assert output["status"] == "resend_prompt"
# Removed: test_analyze_large_question - workflow tool handles large prompts differently
@pytest.mark.asyncio
async def test_multiple_files_with_prompt_txt(self, temp_prompt_file):
"""Test handling of prompt.txt alongside other files."""
tool = ChatTool()
other_file = "/some/other/file.py"
with (
patch("utils.model_context.ModelContext") as mock_model_context_cls,
patch.object(tool, "handle_prompt_file") as mock_handle_prompt,
patch.object(tool, "_prepare_file_content_for_prompt") as mock_prepare_files,
):
mock_provider = MagicMock()
mock_provider.get_provider_type.return_value = MagicMock(value="google")
mock_provider.generate_content.return_value = MagicMock(
content="Success",
usage={"input_tokens": 10, "output_tokens": 20, "total_tokens": 30},
model_name="gemini-2.5-flash",
metadata={"finish_reason": "STOP"},
)
from utils.model_context import TokenAllocation
mock_model_context = MagicMock()
mock_model_context.model_name = "gemini-2.5-flash"
mock_model_context.provider = mock_provider
mock_model_context.capabilities = MagicMock(supports_extended_thinking=False)
mock_model_context.calculate_token_allocation.return_value = TokenAllocation(
total_tokens=1_000_000,
content_tokens=800_000,
response_tokens=200_000,
file_tokens=320_000,
history_tokens=320_000,
)
mock_model_context_cls.return_value = mock_model_context
# Return the prompt content and updated files list (without prompt.txt)
mock_handle_prompt.return_value = ("Large prompt content from file", [other_file])
# Mock the centralized file preparation method
mock_prepare_files.return_value = ("File content", [other_file])
# Use a small prompt to avoid triggering size limit
await tool.execute(
{
"prompt": "Test prompt",
"absolute_file_paths": [temp_prompt_file, other_file],
"working_directory_absolute_path": os.path.dirname(temp_prompt_file),
}
)
# Verify handle_prompt_file was called with the original files list
mock_handle_prompt.assert_called_once_with([temp_prompt_file, other_file])
# Verify _prepare_file_content_for_prompt was called with the updated files list (without prompt.txt)
mock_prepare_files.assert_called_once()
files_arg = mock_prepare_files.call_args[0][0]
assert len(files_arg) == 1
assert files_arg[0] == other_file
temp_dir = os.path.dirname(temp_prompt_file)
shutil.rmtree(temp_dir)
@pytest.mark.asyncio
async def test_boundary_case_exactly_at_limit(self):
"""Test prompt exactly at MCP_PROMPT_SIZE_LIMIT characters (should pass with the fix)."""
tool = ChatTool()
exact_prompt = "x" * MCP_PROMPT_SIZE_LIMIT
# Mock the model provider to avoid real API calls
with patch.object(tool, "get_model_provider") as mock_get_provider:
mock_provider = MagicMock()
mock_provider.get_provider_type.return_value = MagicMock(value="google")
mock_provider.get_capabilities.return_value = MagicMock(supports_extended_thinking=False)
mock_provider.generate_content.return_value = MagicMock(
content="Response to the large prompt",
usage={"input_tokens": 12000, "output_tokens": 10, "total_tokens": 12010},
model_name="gemini-2.5-flash",
metadata={"finish_reason": "STOP"},
)
mock_get_provider.return_value = mock_provider
# With the fix, this should now pass because we check the MCP transport boundary before adding internal content
temp_dir = tempfile.mkdtemp()
try:
try:
result = await tool.execute({"prompt": exact_prompt, "working_directory_absolute_path": temp_dir})
except ToolExecutionError as exc:
output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
else:
output = json.loads(result[0].text)
finally:
shutil.rmtree(temp_dir, ignore_errors=True)
assert output["status"] != "resend_prompt"
@pytest.mark.asyncio
async def test_boundary_case_just_over_limit(self):
"""Test prompt just over MCP_PROMPT_SIZE_LIMIT characters (should trigger file request)."""
tool = ChatTool()
over_prompt = "x" * (MCP_PROMPT_SIZE_LIMIT + 1)
temp_dir = tempfile.mkdtemp()
try:
try:
result = await tool.execute({"prompt": over_prompt, "working_directory_absolute_path": temp_dir})
except ToolExecutionError as exc:
output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
else:
output = json.loads(result[0].text)
finally:
shutil.rmtree(temp_dir, ignore_errors=True)
assert output["status"] == "resend_prompt"
@pytest.mark.asyncio
async def test_empty_prompt_no_file(self):
"""Test empty prompt without prompt.txt file."""
tool = ChatTool()
with patch.object(tool, "get_model_provider") as mock_get_provider:
mock_provider = MagicMock()
mock_provider.get_provider_type.return_value = MagicMock(value="google")
mock_provider.get_capabilities.return_value = MagicMock(supports_extended_thinking=False)
mock_provider.generate_content.return_value = MagicMock(
content="Success",
usage={"input_tokens": 10, "output_tokens": 20, "total_tokens": 30},
model_name="gemini-2.5-flash",
metadata={"finish_reason": "STOP"},
)
mock_get_provider.return_value = mock_provider
temp_dir = tempfile.mkdtemp()
try:
try:
result = await tool.execute({"prompt": "", "working_directory_absolute_path": temp_dir})
except ToolExecutionError as exc:
output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
else:
output = json.loads(result[0].text)
finally:
shutil.rmtree(temp_dir, ignore_errors=True)
assert output["status"] != "resend_prompt"
@pytest.mark.asyncio
async def test_prompt_file_read_error(self):
"""Test handling when prompt.txt can't be read."""
from tests.mock_helpers import create_mock_provider
tool = ChatTool()
bad_file = "/nonexistent/prompt.txt"
with (
patch.object(tool, "get_model_provider") as mock_get_provider,
patch("utils.model_context.ModelContext") as mock_model_context_class,
):
mock_provider = create_mock_provider(model_name="gemini-2.5-flash", context_window=1_048_576)
mock_provider.generate_content.return_value.content = "Success"
mock_get_provider.return_value = mock_provider
# Mock ModelContext to avoid the comparison issue
from utils.model_context import TokenAllocation
mock_model_context = MagicMock()
mock_model_context.model_name = "gemini-2.5-flash"
mock_model_context.calculate_token_allocation.return_value = TokenAllocation(
total_tokens=1_048_576,
content_tokens=838_861,
response_tokens=209_715,
file_tokens=335_544,
history_tokens=335_544,
)
mock_model_context_class.return_value = mock_model_context
# Should continue with empty prompt when file can't be read
temp_dir = tempfile.mkdtemp()
try:
try:
result = await tool.execute(
{"prompt": "", "absolute_file_paths": [bad_file], "working_directory_absolute_path": temp_dir}
)
except ToolExecutionError as exc:
output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
else:
output = json.loads(result[0].text)
finally:
shutil.rmtree(temp_dir, ignore_errors=True)
assert output["status"] != "resend_prompt"
@pytest.mark.asyncio
async def test_large_file_context_does_not_trigger_mcp_prompt_limit(self, tmp_path):
"""Large context files should not be blocked by MCP prompt limit enforcement."""
from tests.mock_helpers import create_mock_provider
from utils.model_context import TokenAllocation
tool = ChatTool()
# Create a file significantly larger than MCP_PROMPT_SIZE_LIMIT characters
large_content = "A" * (MCP_PROMPT_SIZE_LIMIT * 5)
large_file = tmp_path / "huge_context.txt"
large_file.write_text(large_content)
mock_provider = create_mock_provider(model_name="flash")
class DummyModelContext:
def __init__(self, provider):
self.model_name = "flash"
self._provider = provider
self.capabilities = provider.get_capabilities("flash")
@property
def provider(self):
return self._provider
def calculate_token_allocation(self):
return TokenAllocation(
total_tokens=1_048_576,
content_tokens=838_861,
response_tokens=209_715,
file_tokens=335_544,
history_tokens=335_544,
)
dummy_context = DummyModelContext(mock_provider)
with patch.object(tool, "get_model_provider", return_value=mock_provider):
result = await tool.execute(
{
"prompt": "Summarize the design decisions",
"absolute_file_paths": [str(large_file)],
"model": "flash",
"working_directory_absolute_path": str(tmp_path),
"_model_context": dummy_context,
}
)
output = json.loads(result[0].text)
assert output["status"] != "resend_prompt"
@pytest.mark.asyncio
async def test_mcp_boundary_with_large_internal_context(self):
"""
Critical test: Ensure MCP_PROMPT_SIZE_LIMIT only applies to user input (MCP boundary),
NOT to internal context like conversation history, system prompts, or file content.
This test verifies that even if our internal prompt (with system prompts, history, etc.)
exceeds MCP_PROMPT_SIZE_LIMIT, it should still work as long as the user's input is small.
"""
tool = ChatTool()
# Small user input that should pass MCP boundary check
small_user_prompt = "What is the weather like?"
# Mock a huge conversation history that would exceed MCP limits if incorrectly checked
huge_history = "x" * (MCP_PROMPT_SIZE_LIMIT * 2) # 100K chars = way over 50K limit
temp_dir = tempfile.mkdtemp()
original_prepare_prompt = tool.prepare_prompt
try:
with (
patch.object(tool, "get_model_provider") as mock_get_provider,
patch("utils.model_context.ModelContext") as mock_model_context_class,
):
from tests.mock_helpers import create_mock_provider
from utils.model_context import TokenAllocation
mock_provider = create_mock_provider(model_name="flash")
mock_get_provider.return_value = mock_provider
mock_model_context = MagicMock()
mock_model_context.model_name = "flash"
mock_model_context.provider = mock_provider
mock_model_context.calculate_token_allocation.return_value = TokenAllocation(
total_tokens=1_048_576,
content_tokens=838_861,
response_tokens=209_715,
file_tokens=335_544,
history_tokens=335_544,
)
mock_model_context_class.return_value = mock_model_context
async def mock_prepare_prompt(request):
normal_prompt = await original_prepare_prompt(request)
huge_internal_prompt = f"{normal_prompt}\n\n=== HUGE INTERNAL CONTEXT ===\n{huge_history}"
assert len(huge_internal_prompt) > MCP_PROMPT_SIZE_LIMIT
return huge_internal_prompt
tool.prepare_prompt = mock_prepare_prompt
result = await tool.execute(
{"prompt": small_user_prompt, "model": "flash", "working_directory_absolute_path": temp_dir}
)
output = json.loads(result[0].text)
assert output["status"] != "resend_prompt"
mock_provider.generate_content.assert_called_once()
call_kwargs = mock_provider.generate_content.call_args[1]
actual_prompt = call_kwargs.get("prompt")
assert len(actual_prompt) > MCP_PROMPT_SIZE_LIMIT
assert huge_history in actual_prompt
assert small_user_prompt in actual_prompt
finally:
tool.prepare_prompt = original_prepare_prompt
shutil.rmtree(temp_dir, ignore_errors=True)
@pytest.mark.asyncio
async def test_mcp_boundary_vs_internal_processing_distinction(self):
"""
Test that clearly demonstrates the distinction between:
1. MCP transport boundary (user input - SHOULD be limited)
2. Internal processing (system prompts, files, history - should NOT be limited)
"""
tool = ChatTool()
# Test case 1: Large user input should fail at MCP boundary
large_user_input = "x" * (MCP_PROMPT_SIZE_LIMIT + 1000)
temp_dir = tempfile.mkdtemp()
try:
try:
result = await tool.execute(
{"prompt": large_user_input, "model": "flash", "working_directory_absolute_path": temp_dir}
)
except ToolExecutionError as exc:
output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
else:
output = json.loads(result[0].text)
assert output["status"] == "resend_prompt" # Should fail
assert "too large for MCP's token limits" in output["content"]
# Test case 2: Small user input should succeed even with huge internal processing
small_user_input = "Hello"
try:
result = await tool.execute(
{
"prompt": small_user_input,
"model": "gemini-2.5-flash",
"working_directory_absolute_path": temp_dir,
}
)
except ToolExecutionError as exc:
output = json.loads(exc.payload if hasattr(exc, "payload") else str(exc))
else:
output = json.loads(result[0].text)
# The test will fail with dummy API keys, which is expected behavior
# We're mainly testing that the tool processes small prompts correctly without size errors
assert output["status"] != "resend_prompt"
finally:
shutil.rmtree(temp_dir, ignore_errors=True)
@pytest.mark.asyncio
async def test_continuation_with_huge_conversation_history(self):
"""
Test that continuation calls with huge conversation history work correctly.
This simulates the exact scenario where conversation history builds up and exceeds
MCP_PROMPT_SIZE_LIMIT but should still work since history is internal processing.
"""
tool = ChatTool()
# Small user input for continuation
small_continuation_prompt = "Continue the discussion"
# Mock huge conversation history (simulates many turns of conversation)
# Calculate repetitions needed to exceed MCP_PROMPT_SIZE_LIMIT
base_text = "=== CONVERSATION HISTORY ===\n"
repeat_text = "Previous message content\n"
# Add buffer to ensure we exceed the limit
target_size = MCP_PROMPT_SIZE_LIMIT + 1000
available_space = target_size - len(base_text)
repetitions_needed = (available_space // len(repeat_text)) + 1
huge_conversation_history = base_text + (repeat_text * repetitions_needed)
# Ensure the history exceeds MCP limits
assert len(huge_conversation_history) > MCP_PROMPT_SIZE_LIMIT
temp_dir = tempfile.mkdtemp()
with (
patch.object(tool, "get_model_provider") as mock_get_provider,
patch("utils.model_context.ModelContext") as mock_model_context_class,
):
from tests.mock_helpers import create_mock_provider
mock_provider = create_mock_provider(model_name="flash")
mock_provider.generate_content.return_value.content = "Continuing our conversation..."
mock_get_provider.return_value = mock_provider
# Mock ModelContext to avoid the comparison issue
from utils.model_context import TokenAllocation
mock_model_context = MagicMock()
mock_model_context.model_name = "flash"
mock_model_context.provider = mock_provider
mock_model_context.calculate_token_allocation.return_value = TokenAllocation(
total_tokens=1_048_576,
content_tokens=838_861,
response_tokens=209_715,
file_tokens=335_544,
history_tokens=335_544,
)
mock_model_context_class.return_value = mock_model_context
# Simulate continuation by having the request contain embedded conversation history
# This mimics what server.py does when it embeds conversation history
request_with_history = {
"prompt": f"{huge_conversation_history}\n\n=== CURRENT REQUEST ===\n{small_continuation_prompt}",
"model": "flash",
"continuation_id": "test_thread_123",
"working_directory_absolute_path": temp_dir,
}
# Mock the conversation history embedding to simulate server.py behavior
original_execute = tool.__class__.execute
async def mock_execute_with_history(self, arguments):
# Check if this has continuation_id (simulating server.py logic)
if arguments.get("continuation_id"):
# Simulate the case where conversation history is already embedded in prompt
# by server.py before calling the tool
field_value = arguments.get("prompt", "")
if "=== CONVERSATION HISTORY ===" in field_value:
# Set the flag that history is embedded
self._has_embedded_history = True
# The prompt field contains both history AND user input
# But we should only check the user input part for MCP boundary
# (This is what our fix ensures happens in prepare_prompt)
# Call original execute
return await original_execute(self, arguments)
tool.__class__.execute = mock_execute_with_history
try:
# This should succeed because:
# 1. The actual user input is small (passes MCP boundary check)
# 2. The huge conversation history is internal processing (not subject to MCP limits)
result = await tool.execute(request_with_history)
output = json.loads(result[0].text)
# Should succeed even though total prompt with history is huge
assert output["status"] != "resend_prompt"
assert "Continuing our conversation" in output["content"]
# Verify the model was called with the complete prompt (including huge history)
mock_provider.generate_content.assert_called_once()
call_kwargs = mock_provider.generate_content.call_args[1]
final_prompt = call_kwargs.get("prompt")
# The final prompt should contain both history and user input
assert huge_conversation_history in final_prompt
assert small_continuation_prompt in final_prompt
# And it should be huge (proving we don't limit internal processing)
assert len(final_prompt) > MCP_PROMPT_SIZE_LIMIT
finally:
# Restore original execute method
tool.__class__.execute = original_execute
shutil.rmtree(temp_dir, ignore_errors=True)
if __name__ == "__main__":
pytest.main([__file__, "-v"])
```
--------------------------------------------------------------------------------
/tools/refactor.py:
--------------------------------------------------------------------------------
```python
"""
Refactor tool - Step-by-step refactoring analysis with expert validation
This tool provides a structured workflow for comprehensive code refactoring analysis.
It guides the CLI agent through systematic investigation steps with forced pauses between each step
to ensure thorough code examination, refactoring opportunity identification, and quality
assessment before proceeding. The tool supports complex refactoring scenarios including
code smell detection, decomposition planning, modernization opportunities, and organization improvements.
Key features:
- Step-by-step refactoring investigation workflow with progress tracking
- Context-aware file embedding (references during investigation, full content for analysis)
- Automatic refactoring opportunity tracking with type and severity classification
- Expert analysis integration with external models
- Support for focused refactoring types (codesmells, decompose, modernize, organization)
- Confidence-based workflow optimization with refactor completion tracking
"""
import logging
from typing import TYPE_CHECKING, Any, Literal, Optional
from pydantic import Field, model_validator
if TYPE_CHECKING:
from tools.models import ToolModelCategory
from config import TEMPERATURE_ANALYTICAL
from systemprompts import REFACTOR_PROMPT
from tools.shared.base_models import WorkflowRequest
from .workflow.base import WorkflowTool
logger = logging.getLogger(__name__)
# Tool-specific field descriptions for refactor tool
REFACTOR_FIELD_DESCRIPTIONS = {
"step": (
"The refactoring plan. Step 1: State strategy. Later steps: Report findings. "
"CRITICAL: Examine code for smells, and opportunities for decomposition, modernization, and organization. "
"Use 'relevant_files' for code. FORBIDDEN: Large code snippets."
),
"step_number": (
"The index of the current step in the refactoring investigation sequence, beginning at 1. Each step should "
"build upon or revise the previous one."
),
"total_steps": (
"Your current estimate for how many steps will be needed to complete the refactoring investigation. "
"Adjust as new opportunities emerge."
),
"next_step_required": (
"Set to true if you plan to continue the investigation with another step. False means you believe the "
"refactoring analysis is complete and ready for expert validation."
),
"findings": (
"Summary of discoveries from this step, including code smells and opportunities for decomposition, modernization, or organization. "
"Document both strengths and weaknesses. In later steps, confirm or update past findings."
),
"files_checked": (
"List all files examined (absolute paths). Include even ruled-out files to track exploration path."
),
"relevant_files": (
"Subset of files_checked with code requiring refactoring (absolute paths). Include files with "
"code smells, decomposition needs, or improvement opportunities."
),
"relevant_context": (
"List methods/functions central to refactoring opportunities, in 'ClassName.methodName' or 'functionName' format. "
"Prioritize those with code smells or needing improvement."
),
"issues_found": (
"Refactoring opportunities as dictionaries with 'severity' (critical/high/medium/low), "
"'type' (codesmells/decompose/modernize/organization), and 'description'. "
"Include all improvement opportunities found."
),
"confidence": (
"Your confidence in refactoring analysis: exploring (starting), incomplete (significant work remaining), "
"partial (some opportunities found, more analysis needed), complete (comprehensive analysis finished, "
"all major opportunities identified). "
"WARNING: Use 'complete' ONLY when fully analyzed and can provide recommendations without expert help. "
"'complete' PREVENTS expert validation. Use 'partial' for large files or uncertain analysis."
),
"images": (
"Optional list of absolute paths to architecture diagrams, UI mockups, design documents, or visual references "
"that help with refactoring context. Only include if they materially assist understanding or assessment."
),
"refactor_type": "Type of refactoring analysis to perform (codesmells, decompose, modernize, organization)",
"focus_areas": "Specific areas to focus on (e.g., 'performance', 'readability', 'maintainability', 'security')",
"style_guide_examples": (
"Optional existing code files to use as style/pattern reference (must be FULL absolute paths to real files / "
"folders - DO NOT SHORTEN). These files represent the target coding style and patterns for the project."
),
}
class RefactorRequest(WorkflowRequest):
"""Request model for refactor workflow investigation steps"""
# Required fields for each investigation step
step: str = Field(..., description=REFACTOR_FIELD_DESCRIPTIONS["step"])
step_number: int = Field(..., description=REFACTOR_FIELD_DESCRIPTIONS["step_number"])
total_steps: int = Field(..., description=REFACTOR_FIELD_DESCRIPTIONS["total_steps"])
next_step_required: bool = Field(..., description=REFACTOR_FIELD_DESCRIPTIONS["next_step_required"])
# Investigation tracking fields
findings: str = Field(..., description=REFACTOR_FIELD_DESCRIPTIONS["findings"])
files_checked: list[str] = Field(default_factory=list, description=REFACTOR_FIELD_DESCRIPTIONS["files_checked"])
relevant_files: list[str] = Field(default_factory=list, description=REFACTOR_FIELD_DESCRIPTIONS["relevant_files"])
relevant_context: list[str] = Field(
default_factory=list, description=REFACTOR_FIELD_DESCRIPTIONS["relevant_context"]
)
issues_found: list[dict] = Field(default_factory=list, description=REFACTOR_FIELD_DESCRIPTIONS["issues_found"])
confidence: Optional[Literal["exploring", "incomplete", "partial", "complete"]] = Field(
"incomplete", description=REFACTOR_FIELD_DESCRIPTIONS["confidence"]
)
# Optional images for visual context
images: Optional[list[str]] = Field(default=None, description=REFACTOR_FIELD_DESCRIPTIONS["images"])
# Refactor-specific fields (only used in step 1 to initialize)
refactor_type: Optional[Literal["codesmells", "decompose", "modernize", "organization"]] = Field(
"codesmells", description=REFACTOR_FIELD_DESCRIPTIONS["refactor_type"]
)
focus_areas: Optional[list[str]] = Field(None, description=REFACTOR_FIELD_DESCRIPTIONS["focus_areas"])
style_guide_examples: Optional[list[str]] = Field(
None, description=REFACTOR_FIELD_DESCRIPTIONS["style_guide_examples"]
)
# Override inherited fields to exclude them from schema (except model which needs to be available)
temperature: Optional[float] = Field(default=None, exclude=True)
thinking_mode: Optional[str] = Field(default=None, exclude=True)
@model_validator(mode="after")
def validate_step_one_requirements(self):
"""Ensure step 1 has required relevant_files field."""
if self.step_number == 1 and not self.relevant_files:
raise ValueError(
"Step 1 requires 'relevant_files' field to specify code files or directories to analyze for refactoring"
)
return self
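# Hedged illustration (not part of the original source): a minimal step-1 request, assuming the
# inherited WorkflowRequest fields shown above are the only required ones. The validator above
# rejects step 1 without 'relevant_files'.
#
#     RefactorRequest(
#         step="Survey the payments module for code smells",
#         step_number=1,
#         total_steps=3,
#         next_step_required=True,
#         findings="Starting the refactoring investigation",
#         relevant_files=["/abs/path/to/payments.py"],  # hypothetical path
#         refactor_type="codesmells",
#     )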
class RefactorTool(WorkflowTool):
"""
Refactor tool for step-by-step refactoring analysis and expert validation.
This tool implements a structured refactoring workflow that guides users through
methodical investigation steps, ensuring thorough code examination, refactoring opportunity
identification, and improvement assessment before reaching conclusions. It supports complex
refactoring scenarios including code smell detection, decomposition planning, modernization
opportunities, and organization improvements.
"""
def __init__(self):
super().__init__()
self.initial_request = None
self.refactor_config = {}
def get_name(self) -> str:
return "refactor"
def get_description(self) -> str:
return (
"Analyzes code for refactoring opportunities with systematic investigation. "
"Use for code smell detection, decomposition planning, modernization, and maintainability improvements. "
"Guides through structured analysis with expert validation."
)
def get_system_prompt(self) -> str:
return REFACTOR_PROMPT
def get_default_temperature(self) -> float:
return TEMPERATURE_ANALYTICAL
def get_model_category(self) -> "ToolModelCategory":
"""Refactor workflow requires thorough analysis and reasoning"""
from tools.models import ToolModelCategory
return ToolModelCategory.EXTENDED_REASONING
def get_workflow_request_model(self):
"""Return the refactor workflow-specific request model."""
return RefactorRequest
def get_input_schema(self) -> dict[str, Any]:
"""Generate input schema using WorkflowSchemaBuilder with refactor-specific overrides."""
from .workflow.schema_builders import WorkflowSchemaBuilder
# Refactor workflow-specific field overrides
refactor_field_overrides = {
"step": {
"type": "string",
"description": REFACTOR_FIELD_DESCRIPTIONS["step"],
},
"step_number": {
"type": "integer",
"minimum": 1,
"description": REFACTOR_FIELD_DESCRIPTIONS["step_number"],
},
"total_steps": {
"type": "integer",
"minimum": 1,
"description": REFACTOR_FIELD_DESCRIPTIONS["total_steps"],
},
"next_step_required": {
"type": "boolean",
"description": REFACTOR_FIELD_DESCRIPTIONS["next_step_required"],
},
"findings": {
"type": "string",
"description": REFACTOR_FIELD_DESCRIPTIONS["findings"],
},
"files_checked": {
"type": "array",
"items": {"type": "string"},
"description": REFACTOR_FIELD_DESCRIPTIONS["files_checked"],
},
"relevant_files": {
"type": "array",
"items": {"type": "string"},
"description": REFACTOR_FIELD_DESCRIPTIONS["relevant_files"],
},
"confidence": {
"type": "string",
"enum": ["exploring", "incomplete", "partial", "complete"],
"default": "incomplete",
"description": REFACTOR_FIELD_DESCRIPTIONS["confidence"],
},
"issues_found": {
"type": "array",
"items": {"type": "object"},
"description": REFACTOR_FIELD_DESCRIPTIONS["issues_found"],
},
"images": {
"type": "array",
"items": {"type": "string"},
"description": REFACTOR_FIELD_DESCRIPTIONS["images"],
},
# Refactor-specific fields (for step 1)
# Note: Use relevant_files field instead of files for consistency
"refactor_type": {
"type": "string",
"enum": ["codesmells", "decompose", "modernize", "organization"],
"default": "codesmells",
"description": REFACTOR_FIELD_DESCRIPTIONS["refactor_type"],
},
"focus_areas": {
"type": "array",
"items": {"type": "string"},
"description": REFACTOR_FIELD_DESCRIPTIONS["focus_areas"],
},
"style_guide_examples": {
"type": "array",
"items": {"type": "string"},
"description": REFACTOR_FIELD_DESCRIPTIONS["style_guide_examples"],
},
}
# Use WorkflowSchemaBuilder with refactor-specific tool fields
return WorkflowSchemaBuilder.build_schema(
tool_specific_fields=refactor_field_overrides,
model_field_schema=self.get_model_field_schema(),
auto_mode=self.is_effective_auto_mode(),
tool_name=self.get_name(),
)
def get_required_actions(
self, step_number: int, confidence: str, findings: str, total_steps: int, request=None
) -> list[str]:
"""Define required actions for each investigation phase."""
if step_number == 1:
# Initial refactoring investigation tasks
return [
"Read and understand the code files specified for refactoring analysis",
"Examine the overall structure, architecture, and design patterns used",
"Identify potential code smells: long methods, large classes, duplicate code, complex conditionals",
"Look for decomposition opportunities: oversized components that could be broken down",
"Check for modernization opportunities: outdated patterns, deprecated features, newer language constructs",
"Assess organization: logical grouping, file structure, naming conventions, module boundaries",
"Document specific refactoring opportunities with file locations and line numbers",
]
elif confidence in ["exploring", "incomplete"]:
# Need deeper investigation
return [
"Examine specific code sections you've identified as needing refactoring",
"Analyze code smells in detail: complexity, coupling, cohesion issues",
"Investigate decomposition opportunities: identify natural breaking points for large components",
"Look for modernization possibilities: language features, patterns, libraries that could improve the code",
"Check organization issues: related functionality that could be better grouped or structured",
"Trace dependencies and relationships between components to understand refactoring impact",
"Prioritize refactoring opportunities by impact and effort required",
]
elif confidence == "partial":
# Close to completion - need final verification
return [
"Verify all identified refactoring opportunities have been properly documented with locations",
"Check for any missed opportunities in areas not yet thoroughly examined",
"Confirm that refactoring suggestions align with the specified refactor_type and focus_areas",
"Ensure refactoring opportunities are prioritized by severity and impact",
"Validate that proposed changes would genuinely improve code quality without breaking functionality",
"Double-check that all relevant files and code elements are captured in your analysis",
]
else:
# General investigation needed
return [
"Continue examining the codebase for additional refactoring opportunities",
"Gather more evidence using appropriate code analysis techniques",
"Test your assumptions about code quality and improvement possibilities",
"Look for patterns that confirm or refute your current refactoring assessment",
"Focus on areas that haven't been thoroughly examined for refactoring potential",
]
def should_call_expert_analysis(self, consolidated_findings, request=None) -> bool:
"""
Decide when to call external model based on investigation completeness.
Skip expert analysis when the CLI agent reports complete confidence in its refactoring analysis - trust its judgment.
"""
# Check if user requested to skip assistant model
if request and not self.get_request_use_assistant_model(request):
return False
# Check if refactoring work is complete
if request and request.confidence == "complete":
return False
# Check if we have meaningful investigation data
return (
len(consolidated_findings.relevant_files) > 0
or len(consolidated_findings.findings) >= 2
or len(consolidated_findings.issues_found) > 0
)
def prepare_expert_analysis_context(self, consolidated_findings) -> str:
"""Prepare context for external model call for final refactoring validation."""
context_parts = [
f"=== REFACTORING ANALYSIS REQUEST ===\\n{self.initial_request or 'Refactoring workflow initiated'}\\n=== END REQUEST ==="
]
# Add investigation summary
investigation_summary = self._build_refactoring_summary(consolidated_findings)
context_parts.append(
f"\\n=== AGENT'S REFACTORING INVESTIGATION ===\\n{investigation_summary}\\n=== END INVESTIGATION ==="
)
# Add refactor configuration context if available
if self.refactor_config:
config_text = "\\n".join(f"- {key}: {value}" for key, value in self.refactor_config.items() if value)
context_parts.append(f"\\n=== REFACTOR CONFIGURATION ===\\n{config_text}\\n=== END CONFIGURATION ===")
# Add relevant code elements if available
if consolidated_findings.relevant_context:
methods_text = "\\n".join(f"- {method}" for method in consolidated_findings.relevant_context)
context_parts.append(f"\\n=== RELEVANT CODE ELEMENTS ===\\n{methods_text}\\n=== END CODE ELEMENTS ===")
# Add refactoring opportunities found if available
if consolidated_findings.issues_found:
opportunities_text = "\\n".join(
f"[{issue.get('severity', 'unknown').upper()}] {issue.get('type', 'unknown').upper()}: {issue.get('description', 'No description')}"
for issue in consolidated_findings.issues_found
)
context_parts.append(
f"\\n=== REFACTORING OPPORTUNITIES ===\\n{opportunities_text}\\n=== END OPPORTUNITIES ==="
)
# Add assessment evolution if available
if consolidated_findings.hypotheses:
assessments_text = "\\n".join(
f"Step {h['step']} ({h['confidence']} confidence): {h['hypothesis']}"
for h in consolidated_findings.hypotheses
)
context_parts.append(f"\\n=== ASSESSMENT EVOLUTION ===\\n{assessments_text}\\n=== END ASSESSMENTS ===")
# Add images if available
if consolidated_findings.images:
images_text = "\\n".join(f"- {img}" for img in consolidated_findings.images)
context_parts.append(
f"\\n=== VISUAL REFACTORING INFORMATION ===\\n{images_text}\\n=== END VISUAL INFORMATION ==="
)
return "\\n".join(context_parts)
def _build_refactoring_summary(self, consolidated_findings) -> str:
"""Prepare a comprehensive summary of the refactoring investigation."""
summary_parts = [
"=== SYSTEMATIC REFACTORING INVESTIGATION SUMMARY ===",
f"Total steps: {len(consolidated_findings.findings)}",
f"Files examined: {len(consolidated_findings.files_checked)}",
f"Relevant files identified: {len(consolidated_findings.relevant_files)}",
f"Code elements analyzed: {len(consolidated_findings.relevant_context)}",
f"Refactoring opportunities identified: {len(consolidated_findings.issues_found)}",
"",
"=== INVESTIGATION PROGRESSION ===",
]
for finding in consolidated_findings.findings:
summary_parts.append(finding)
return "\\n".join(summary_parts)
def should_include_files_in_expert_prompt(self) -> bool:
"""Include files in expert analysis for comprehensive refactoring validation."""
return True
def should_embed_system_prompt(self) -> bool:
"""Embed system prompt in expert analysis for proper context."""
return True
def get_expert_thinking_mode(self) -> str:
"""Use high thinking mode for thorough refactoring analysis."""
return "high"
def get_expert_analysis_instruction(self) -> str:
"""Get specific instruction for refactoring expert analysis."""
return (
"Please provide comprehensive refactoring analysis based on the investigation findings. "
"Focus on validating the identified opportunities, ensuring completeness of the analysis, "
"and providing final recommendations for refactoring implementation, following the structured "
"format specified in the system prompt."
)
# Hook method overrides for refactor-specific behavior
def prepare_step_data(self, request) -> dict:
"""
Map refactor workflow-specific fields for internal processing.
"""
step_data = {
"step": request.step,
"step_number": request.step_number,
"findings": request.findings,
"files_checked": request.files_checked,
"relevant_files": request.relevant_files,
"relevant_context": request.relevant_context,
"issues_found": request.issues_found,
"confidence": request.confidence,
"hypothesis": request.findings, # Map findings to hypothesis for compatibility
"images": request.images or [],
}
return step_data
def should_skip_expert_analysis(self, request, consolidated_findings) -> bool:
"""
Refactor workflow skips expert analysis when the CLI agent has "complete" confidence.
"""
return request.confidence == "complete" and not request.next_step_required
def store_initial_issue(self, step_description: str):
"""Store initial request for expert analysis."""
self.initial_request = step_description
# Inheritance hook methods for refactor-specific behavior
# Override inheritance hooks for refactor-specific behavior
def get_completion_status(self) -> str:
"""Refactor tools use refactor-specific status."""
return "refactoring_analysis_complete_ready_for_implementation"
def get_completion_data_key(self) -> str:
"""Refactor uses 'complete_refactoring' key."""
return "complete_refactoring"
def get_final_analysis_from_request(self, request):
"""Refactor tools use 'findings' field."""
return request.findings
def get_confidence_level(self, request) -> str:
"""Refactor tools use 'complete' for high confidence."""
return "complete"
def get_completion_message(self) -> str:
"""Refactor-specific completion message."""
return (
"Refactoring analysis complete with COMPLETE confidence. You have identified all significant "
"refactoring opportunities and provided comprehensive analysis. MANDATORY: Present the user with "
"the complete refactoring results organized by type and severity, and IMMEDIATELY proceed with "
"implementing the highest priority refactoring opportunities or provide specific guidance for "
"improvements. Focus on actionable refactoring steps."
)
def get_skip_reason(self) -> str:
"""Refactor-specific skip reason."""
return "Completed comprehensive refactoring analysis with full confidence locally"
def get_skip_expert_analysis_status(self) -> str:
"""Refactor-specific expert analysis skip status."""
return "skipped_due_to_complete_refactoring_confidence"
def prepare_work_summary(self) -> str:
"""Refactor-specific work summary."""
return self._build_refactoring_summary(self.consolidated_findings)
def get_completion_next_steps_message(self, expert_analysis_used: bool = False) -> str:
"""
Refactor-specific completion message.
Args:
expert_analysis_used: True if expert analysis was successfully executed
"""
base_message = (
"REFACTORING ANALYSIS IS COMPLETE. You MUST now summarize and present ALL refactoring opportunities "
"organized by type (codesmells → decompose → modernize → organization) and severity (Critical → High → "
"Medium → Low), specific code locations with line numbers, and exact recommendations for improvement. "
"Clearly prioritize the top 3 refactoring opportunities that need immediate attention. Provide concrete, "
"actionable guidance for each opportunity—make it easy for a developer to understand exactly what needs "
"to be refactored and how to implement the improvements."
)
# Add expert analysis guidance only when expert analysis was actually used
if expert_analysis_used:
expert_guidance = self.get_expert_analysis_guidance()
if expert_guidance:
return f"{base_message}\n\n{expert_guidance}"
return base_message
def get_expert_analysis_guidance(self) -> str:
"""
Get additional guidance for handling expert analysis results in refactor context.
Returns:
Additional guidance text for validating and using expert analysis findings
"""
return (
"IMPORTANT: Expert refactoring analysis has been provided above. You MUST review "
"the expert's architectural insights and refactoring recommendations. Consider whether "
"the expert's suggestions align with the codebase's evolution trajectory and current "
"team priorities. Pay special attention to any breaking changes, migration complexity, "
"or performance implications highlighted by the expert. Present a balanced view that "
"considers both immediate benefits and long-term maintainability."
)
def get_step_guidance_message(self, request) -> str:
"""
Refactor-specific step guidance with detailed investigation instructions.
"""
step_guidance = self.get_refactor_step_guidance(request.step_number, request.confidence, request)
return step_guidance["next_steps"]
def get_refactor_step_guidance(self, step_number: int, confidence: str, request) -> dict[str, Any]:
"""
Provide step-specific guidance for refactor workflow.
"""
# Generate the next steps instruction based on required actions
required_actions = self.get_required_actions(step_number, confidence, request.findings, request.total_steps)
if step_number == 1:
next_steps = (
f"MANDATORY: DO NOT call the {self.get_name()} tool again immediately. You MUST first examine "
f"the code files thoroughly for refactoring opportunities using appropriate tools. CRITICAL AWARENESS: "
f"You need to identify code smells, decomposition opportunities, modernization possibilities, and "
f"organization improvements across the specified refactor_type. Look for complexity issues, outdated "
f"patterns, oversized components, and structural problems. Use file reading tools, code analysis, and "
f"systematic examination to gather comprehensive refactoring information. Only call {self.get_name()} "
f"again AFTER completing your investigation. When you call {self.get_name()} next time, use "
f"step_number: {step_number + 1} and report specific files examined, refactoring opportunities found, "
f"and improvement assessments discovered."
)
elif confidence in ["exploring", "incomplete"]:
next_steps = (
f"STOP! Do NOT call {self.get_name()} again yet. Based on your findings, you've identified areas that need "
f"deeper refactoring analysis. MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\\n"
+ "\\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
+ f"\\n\\nOnly call {self.get_name()} again with step_number: {step_number + 1} AFTER "
+ "completing these refactoring analysis tasks."
)
elif confidence == "partial":
next_steps = (
f"WAIT! Your refactoring analysis needs final verification. DO NOT call {self.get_name()} immediately. REQUIRED ACTIONS:\\n"
+ "\\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
+ f"\\n\\nREMEMBER: Ensure you have identified all significant refactoring opportunities across all types and "
f"verified the completeness of your analysis. Document opportunities with specific file references and "
f"line numbers where applicable, then call {self.get_name()} with step_number: {step_number + 1}."
)
else:
next_steps = (
f"PAUSE REFACTORING ANALYSIS. Before calling {self.get_name()} step {step_number + 1}, you MUST examine more code thoroughly. "
+ "Required: "
+ ", ".join(required_actions[:2])
+ ". "
+ f"Your next {self.get_name()} call (step_number: {step_number + 1}) must include "
f"NEW evidence from actual refactoring analysis, not just theories. NO recursive {self.get_name()} calls "
f"without investigation work!"
)
return {"next_steps": next_steps}
def customize_workflow_response(self, response_data: dict, request) -> dict:
"""
Customize response to match refactor workflow format.
"""
# Store initial request on first step
if request.step_number == 1:
self.initial_request = request.step
# Store refactor configuration for expert analysis
if request.relevant_files:
self.refactor_config = {
"relevant_files": request.relevant_files,
"refactor_type": request.refactor_type,
"focus_areas": request.focus_areas,
"style_guide_examples": request.style_guide_examples,
}
# Convert generic status names to refactor-specific ones
tool_name = self.get_name()
status_mapping = {
f"{tool_name}_in_progress": "refactoring_analysis_in_progress",
f"pause_for_{tool_name}": "pause_for_refactoring_analysis",
f"{tool_name}_required": "refactoring_analysis_required",
f"{tool_name}_complete": "refactoring_analysis_complete",
}
if response_data["status"] in status_mapping:
response_data["status"] = status_mapping[response_data["status"]]
# Rename status field to match refactor workflow
if f"{tool_name}_status" in response_data:
response_data["refactoring_status"] = response_data.pop(f"{tool_name}_status")
# Add refactor-specific status fields
refactor_types = {}
for issue in self.consolidated_findings.issues_found:
issue_type = issue.get("type", "unknown")
if issue_type not in refactor_types:
refactor_types[issue_type] = 0
refactor_types[issue_type] += 1
response_data["refactoring_status"]["opportunities_by_type"] = refactor_types
response_data["refactoring_status"]["refactor_confidence"] = request.confidence
# Map complete_refactor to complete_refactoring
if f"complete_{tool_name}" in response_data:
response_data["complete_refactoring"] = response_data.pop(f"complete_{tool_name}")
# Map the completion flag to match refactor workflow
if f"{tool_name}_complete" in response_data:
response_data["refactoring_complete"] = response_data.pop(f"{tool_name}_complete")
return response_data
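# Hedged illustration of the mapping above: a generic workflow payload such as
#     {"status": "refactor_in_progress", "refactor_status": {...}}
# would come back as
#     {"status": "refactoring_analysis_in_progress",
#      "refactoring_status": {..., "opportunities_by_type": {"codesmells": 2}, "refactor_confidence": "partial"}}
# (counts and confidence values are hypothetical).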
# Required abstract methods from BaseTool
def get_request_model(self):
"""Return the refactor workflow-specific request model."""
return RefactorRequest
async def prepare_prompt(self, request) -> str:
"""Not used - workflow tools use execute_workflow()."""
return "" # Workflow tools use execute_workflow() directly
```
--------------------------------------------------------------------------------
/utils/file_utils.py:
--------------------------------------------------------------------------------
```python
"""
File reading utilities with directory support and token management
This module provides secure file access functionality for the MCP server.
It implements critical security measures to prevent unauthorized file access
and manages token limits to ensure efficient API usage.
Key Features:
- Path validation and sandboxing to prevent directory traversal attacks
- Support for both individual files and recursive directory reading
- Token counting and management to stay within API limits
- Automatic file type detection and filtering
- Comprehensive error handling with informative messages
Security Model:
- All file access is restricted to PROJECT_ROOT and its subdirectories
- Absolute paths are required to prevent ambiguity
- Symbolic links are resolved to ensure they stay within bounds
CONVERSATION MEMORY INTEGRATION:
This module works with the conversation memory system to support efficient
multi-turn file handling:
1. DEDUPLICATION SUPPORT:
- File reading functions are called by conversation-aware tools
- Supports newest-first file prioritization by providing accurate token estimation
- Enables efficient file content caching and token budget management
2. TOKEN BUDGET OPTIMIZATION:
- Provides accurate token estimation for file content before reading
- Supports the dual prioritization strategy by enabling precise budget calculations
- Enables tools to make informed decisions about which files to include
3. CROSS-TOOL FILE PERSISTENCE:
- File reading results are used across different tools in conversation chains
- Consistent file access patterns support conversation continuation scenarios
- Error handling preserves conversation flow when files become unavailable
"""
import json
import logging
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from .file_types import BINARY_EXTENSIONS, CODE_EXTENSIONS, IMAGE_EXTENSIONS, TEXT_EXTENSIONS
from .security_config import EXCLUDED_DIRS, is_dangerous_path
from .token_utils import DEFAULT_CONTEXT_WINDOW, estimate_tokens
def _is_builtin_custom_models_config(path_str: str) -> bool:
"""
Check if path points to the server's built-in custom_models.json config file.
This only matches the server's internal config, not user-specified CUSTOM_MODELS_CONFIG_PATH.
We identify the built-in config by checking if it resolves to the server's conf directory.
Args:
path_str: Path to check
Returns:
True if this is the server's built-in custom_models.json config file
"""
try:
path = Path(path_str)
# Get the server root by going up from this file: utils/file_utils.py -> server_root
server_root = Path(__file__).parent.parent
builtin_config = server_root / "conf" / "custom_models.json"
# Check if the path resolves to the same file as our built-in config
# This handles both relative and absolute paths to the same file
return path.resolve() == builtin_config.resolve()
except Exception:
# If path resolution fails, it's not our built-in config
return False
logger = logging.getLogger(__name__)
def is_mcp_directory(path: Path) -> bool:
"""
Check if a directory is the MCP server's own directory.
This prevents the MCP from including its own code when scanning projects
where the MCP has been cloned as a subdirectory.
Args:
path: Directory path to check
Returns:
True if this is the MCP server directory or a subdirectory
"""
if not path.is_dir():
return False
# Get the directory where the MCP server is running from
# __file__ is utils/file_utils.py, so parent.parent is the MCP root
mcp_server_dir = Path(__file__).parent.parent.resolve()
# Check if the given path is the MCP server directory or a subdirectory
try:
path.resolve().relative_to(mcp_server_dir)
logger.info(f"Detected MCP server directory at {path}, will exclude from scanning")
return True
except ValueError:
# Not a subdirectory of MCP server
return False
def get_user_home_directory() -> Optional[Path]:
"""
Get the user's home directory.
Returns:
User's home directory path
"""
return Path.home()
def is_home_directory_root(path: Path) -> bool:
"""
Check if the given path is the user's home directory root.
This prevents scanning the entire home directory which could include
sensitive data and non-project files.
Args:
path: Directory path to check
Returns:
True if this is the home directory root
"""
user_home = get_user_home_directory()
if not user_home:
return False
try:
resolved_path = path.resolve()
resolved_home = user_home.resolve()
# Check if this is exactly the home directory
if resolved_path == resolved_home:
logger.warning(
f"Attempted to scan user home directory root: {path}. Please specify a subdirectory instead."
)
return True
# Also check common home directory patterns
path_str = str(resolved_path).lower()
home_patterns = [
"/users/", # macOS
"/home/", # Linux
"c:\\users\\", # Windows
"c:/users/", # Windows with forward slashes
]
for pattern in home_patterns:
if pattern in path_str:
# Extract the user directory path
# e.g., /Users/fahad or /home/username
parts = path_str.split(pattern)
if len(parts) > 1:
# Get the part after the pattern
after_pattern = parts[1]
# Check if we're at the user's root (no subdirectories)
if "/" not in after_pattern and "\\" not in after_pattern:
logger.warning(
f"Attempted to scan user home directory root: {path}. "
f"Please specify a subdirectory instead."
)
return True
except Exception as e:
logger.debug(f"Error checking if path is home directory: {e}")
return False
def detect_file_type(file_path: str) -> str:
"""
Detect file type for appropriate processing strategy.
This function is intended for specific file type handling (e.g., image processing,
binary file analysis, or enhanced file filtering).
Args:
file_path: Path to the file to analyze
Returns:
str: "text", "binary", or "image"
"""
path = Path(file_path)
# Check extension first (fast)
extension = path.suffix.lower()
if extension in TEXT_EXTENSIONS:
return "text"
elif extension in IMAGE_EXTENSIONS:
return "image"
elif extension in BINARY_EXTENSIONS:
return "binary"
# Fallback: check magic bytes for text vs binary
# This is helpful for files without extensions or unknown extensions
try:
with open(path, "rb") as f:
chunk = f.read(1024)
# Simple heuristic: if we can decode as UTF-8, likely text
chunk.decode("utf-8")
return "text"
except UnicodeDecodeError:
return "binary"
except (FileNotFoundError, PermissionError) as e:
logger.warning(f"Could not access file {file_path} for type detection: {e}")
return "unknown"
def should_add_line_numbers(file_path: str, include_line_numbers: Optional[bool] = None) -> bool:
"""
Determine if line numbers should be added to a file.
Args:
file_path: Path to the file
include_line_numbers: Explicit preference, or None for auto-detection
Returns:
bool: True if line numbers should be added
"""
if include_line_numbers is not None:
return include_line_numbers
# Default: DO NOT add line numbers
# Tools that want line numbers must explicitly request them
return False
def _normalize_line_endings(content: str) -> str:
"""
Normalize line endings for consistent line numbering.
Args:
content: File content with potentially mixed line endings
Returns:
str: Content with normalized LF line endings
"""
# Normalize all line endings to LF for consistent counting
return content.replace("\r\n", "\n").replace("\r", "\n")
def _add_line_numbers(content: str) -> str:
"""
Add line numbers to text content for precise referencing.
Args:
content: Text content to number
Returns:
str: Content with line numbers in format "  45│ actual code line"
Width is computed from the total line count (minimum 4), so files of any size are supported
"""
# Normalize line endings first
normalized_content = _normalize_line_endings(content)
lines = normalized_content.split("\n")
# Dynamic width allocation based on total line count
# This supports files of any size by computing required width
total_lines = len(lines)
width = len(str(total_lines))
width = max(width, 4) # Minimum padding for readability
# Format with dynamic width and clear separator
numbered_lines = [f"{i + 1:{width}d}│ {line}" for i, line in enumerate(lines)]
return "\n".join(numbered_lines)
def resolve_and_validate_path(path_str: str) -> Path:
"""
Resolves and validates a path against security policies.
This function ensures safe file access by:
1. Requiring absolute paths (no ambiguity)
2. Resolving symlinks to prevent deception
3. Blocking access to dangerous system directories
Args:
path_str: Path string (must be absolute)
Returns:
Resolved Path object that is safe to access
Raises:
ValueError: If path is not absolute or otherwise invalid
PermissionError: If path is in a dangerous location
"""
# Step 1: Create a Path object
user_path = Path(path_str)
# Step 2: Security Policy - Require absolute paths
# Relative paths could be interpreted differently depending on working directory
if not user_path.is_absolute():
raise ValueError(f"Relative paths are not supported. Please provide an absolute path.\nReceived: {path_str}")
# Step 3: Resolve the absolute path (follows symlinks, removes .. and .)
# This is critical for security as it reveals the true destination of symlinks
resolved_path = user_path.resolve()
# Step 4: Check against dangerous paths
if is_dangerous_path(resolved_path):
logger.warning(f"Access denied - dangerous path: {resolved_path}")
raise PermissionError(f"Access to system directory denied: {path_str}")
# Step 5: Check if it's the home directory root
if is_home_directory_root(resolved_path):
raise PermissionError(
f"Cannot scan entire home directory: {path_str}\n" f"Please specify a subdirectory within your home folder."
)
return resolved_path
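# Hedged behavior sketch (paths are hypothetical):
#     resolve_and_validate_path("relative/path")      # raises ValueError: absolute paths only
#     resolve_and_validate_path("/etc")               # raises PermissionError if flagged dangerous
#     resolve_and_validate_path(str(Path.home()))     # raises PermissionError: home directory root
#     resolve_and_validate_path("/abs/project/a.py")  # returns the resolved Path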
def expand_paths(paths: list[str], extensions: Optional[set[str]] = None) -> list[str]:
"""
Expand paths to individual files, handling both files and directories.
This function recursively walks directories to find all matching files.
It automatically filters out hidden files and common non-code directories
like __pycache__ to avoid including generated or system files.
Args:
paths: List of file or directory paths (must be absolute)
extensions: Optional set of file extensions to include (defaults to CODE_EXTENSIONS)
Returns:
List of individual file paths, sorted for consistent ordering
"""
if extensions is None:
extensions = CODE_EXTENSIONS
expanded_files = []
seen = set()
for path in paths:
try:
# Validate each path for security before processing
path_obj = resolve_and_validate_path(path)
except (ValueError, PermissionError):
# Skip invalid paths silently to allow partial success
continue
if not path_obj.exists():
continue
# Safety checks for directory scanning
if path_obj.is_dir():
# Check 1: Prevent scanning user's home directory root
if is_home_directory_root(path_obj):
logger.warning(f"Skipping home directory root: {path}. Please specify a project subdirectory instead.")
continue
# Check 2: Skip if this is the MCP's own directory
if is_mcp_directory(path_obj):
logger.info(
f"Skipping MCP server directory: {path}. The MCP server code is excluded from project scans."
)
continue
if path_obj.is_file():
# Add file directly
if str(path_obj) not in seen:
expanded_files.append(str(path_obj))
seen.add(str(path_obj))
elif path_obj.is_dir():
# Walk directory recursively to find all files
for root, dirs, files in os.walk(path_obj):
# Filter directories in-place to skip hidden and excluded directories
# This prevents descending into .git, .venv, __pycache__, node_modules, etc.
original_dirs = dirs[:]
dirs[:] = []
for d in original_dirs:
# Skip hidden directories
if d.startswith("."):
continue
# Skip excluded directories
if d in EXCLUDED_DIRS:
continue
# Skip MCP directories found during traversal
dir_path = Path(root) / d
if is_mcp_directory(dir_path):
logger.debug(f"Skipping MCP directory during traversal: {dir_path}")
continue
dirs.append(d)
for file in files:
# Skip hidden files (e.g., .DS_Store, .gitignore)
if file.startswith("."):
continue
file_path = Path(root) / file
# Filter by extension if specified
if not extensions or file_path.suffix.lower() in extensions:
full_path = str(file_path)
# Use set to prevent duplicates
if full_path not in seen:
expanded_files.append(full_path)
seen.add(full_path)
# Sort for consistent ordering across different runs
# This makes output predictable and easier to debug
expanded_files.sort()
return expanded_files
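# Illustrative usage sketch (hypothetical layout; actual results depend on
# CODE_EXTENSIONS and EXCLUDED_DIRS):
#     expand_paths(["/home/user/project/src"])
#     # -> ["/home/user/project/src/app.py", "/home/user/project/src/utils.py", ...]
#     # Hidden files, excluded directories and the MCP server's own directory are skipped.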
def read_file_content(
file_path: str, max_size: int = 1_000_000, *, include_line_numbers: Optional[bool] = None
) -> tuple[str, int]:
"""
Read a single file and format it for inclusion in AI prompts.
This function handles various error conditions gracefully and always
returns formatted content, even on errors, so the AI model still sees
which files were attempted but could not be read.
Args:
file_path: Path to file (must be absolute)
max_size: Maximum file size to read (default 1MB to prevent memory issues)
include_line_numbers: Whether to add line numbers. If None, auto-detects based on file type
Returns:
Tuple of (formatted_content, estimated_tokens)
Content is wrapped with clear delimiters for AI parsing
"""
logger.debug(f"[FILES] read_file_content called for: {file_path}")
try:
# Validate path security before any file operations
path = resolve_and_validate_path(file_path)
logger.debug(f"[FILES] Path validated and resolved: {path}")
except (ValueError, PermissionError) as e:
# Return error in a format that provides context to the AI
logger.debug(f"[FILES] Path validation failed for {file_path}: {type(e).__name__}: {e}")
error_msg = str(e)
content = f"\n--- ERROR ACCESSING FILE: {file_path} ---\nError: {error_msg}\n--- END FILE ---\n"
tokens = estimate_tokens(content)
logger.debug(f"[FILES] Returning error content for {file_path}: {tokens} tokens")
return content, tokens
try:
# Validate file existence and type
if not path.exists():
logger.debug(f"[FILES] File does not exist: {file_path}")
content = f"\n--- FILE NOT FOUND: {file_path} ---\nError: File does not exist\n--- END FILE ---\n"
return content, estimate_tokens(content)
if not path.is_file():
logger.debug(f"[FILES] Path is not a file: {file_path}")
content = f"\n--- NOT A FILE: {file_path} ---\nError: Path is not a file\n--- END FILE ---\n"
return content, estimate_tokens(content)
# Check file size to prevent memory exhaustion
stat_result = path.stat()
file_size = stat_result.st_size
logger.debug(f"[FILES] File size for {file_path}: {file_size:,} bytes")
if file_size > max_size:
logger.debug(f"[FILES] File too large: {file_path} ({file_size:,} > {max_size:,} bytes)")
modified_at = datetime.fromtimestamp(stat_result.st_mtime, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
content = (
f"\n--- FILE TOO LARGE: {file_path} (Last modified: {modified_at}) ---\n"
f"File size: {file_size:,} bytes (max: {max_size:,})\n"
"--- END FILE ---\n"
)
return content, estimate_tokens(content)
# Determine if we should add line numbers
add_line_numbers = should_add_line_numbers(file_path, include_line_numbers)
logger.debug(f"[FILES] Line numbers for {file_path}: {'enabled' if add_line_numbers else 'disabled'}")
# Read the file with UTF-8 encoding, replacing invalid characters
# This ensures we can handle files with mixed encodings
logger.debug(f"[FILES] Reading file content for {file_path}")
with open(path, encoding="utf-8", errors="replace") as f:
file_content = f.read()
logger.debug(f"[FILES] Successfully read {len(file_content)} characters from {file_path}")
# Add line numbers if requested or auto-detected
if add_line_numbers:
file_content = _add_line_numbers(file_content)
logger.debug(f"[FILES] Added line numbers to {file_path}")
else:
# Still normalize line endings for consistency
file_content = _normalize_line_endings(file_content)
# Format with clear delimiters that help the AI understand file boundaries
# Using consistent markers makes it easier for the model to parse
# NOTE: These markers ("--- BEGIN FILE: ... ---") are distinct from git diff markers
# ("--- BEGIN DIFF: ... ---") to allow AI to distinguish between complete file content
# vs. partial diff content when files appear in both sections
modified_at = datetime.fromtimestamp(stat_result.st_mtime, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
formatted = (
f"\n--- BEGIN FILE: {file_path} (Last modified: {modified_at}) ---\n"
f"{file_content}\n"
f"--- END FILE: {file_path} ---\n"
)
tokens = estimate_tokens(formatted)
logger.debug(f"[FILES] Formatted content for {file_path}: {len(formatted)} chars, {tokens} tokens")
return formatted, tokens
except Exception as e:
logger.debug(f"[FILES] Exception reading file {file_path}: {type(e).__name__}: {e}")
content = f"\n--- ERROR READING FILE: {file_path} ---\nError: {str(e)}\n--- END FILE ---\n"
tokens = estimate_tokens(content)
logger.debug(f"[FILES] Returning error content for {file_path}: {tokens} tokens")
return content, tokens
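# Illustrative usage sketch (hypothetical path):
#     content, tokens = read_file_content("/home/user/project/src/app.py")
#     # content is wrapped in "--- BEGIN FILE: ... ---" / "--- END FILE: ... ---"
#     # markers (or an error block if the file cannot be read), and tokens is
#     # the estimated token count for that formatted block.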
def read_files(
file_paths: list[str],
code: Optional[str] = None,
max_tokens: Optional[int] = None,
reserve_tokens: int = 50_000,
*,
include_line_numbers: bool = False,
) -> str:
"""
Read multiple files and optional direct code with smart token management.
This function implements intelligent token budgeting to maximize the amount
of relevant content that can be included in an AI prompt while staying
within token limits. It prioritizes direct code and reads files until
the token budget is exhausted.
Args:
file_paths: List of file or directory paths (absolute paths required)
code: Optional direct code to include (prioritized over files)
max_tokens: Maximum tokens to use (defaults to DEFAULT_CONTEXT_WINDOW)
reserve_tokens: Tokens to reserve for prompt and response (default 50K)
include_line_numbers: Whether to add line numbers to file content
Returns:
str: All file contents formatted for AI consumption
"""
if max_tokens is None:
max_tokens = DEFAULT_CONTEXT_WINDOW
logger.debug(f"[FILES] read_files called with {len(file_paths)} paths")
logger.debug(
f"[FILES] Token budget: max={max_tokens:,}, reserve={reserve_tokens:,}, available={max_tokens - reserve_tokens:,}"
)
content_parts = []
total_tokens = 0
available_tokens = max_tokens - reserve_tokens
files_skipped = []
# Priority 1: Handle direct code if provided
# Direct code is prioritized because it's explicitly provided by the user
if code:
formatted_code = f"\n--- BEGIN DIRECT CODE ---\n{code}\n--- END DIRECT CODE ---\n"
code_tokens = estimate_tokens(formatted_code)
if code_tokens <= available_tokens:
content_parts.append(formatted_code)
total_tokens += code_tokens
available_tokens -= code_tokens
# Priority 2: Process file paths
if file_paths:
# Expand directories to get all individual files
logger.debug(f"[FILES] Expanding {len(file_paths)} file paths")
all_files = expand_paths(file_paths)
logger.debug(f"[FILES] After expansion: {len(all_files)} individual files")
if not all_files and file_paths:
# No files found but paths were provided
logger.debug("[FILES] No files found from provided paths")
content_parts.append(f"\n--- NO FILES FOUND ---\nProvided paths: {', '.join(file_paths)}\n--- END ---\n")
else:
# Read files sequentially until token limit is reached
logger.debug(f"[FILES] Reading {len(all_files)} files with token budget {available_tokens:,}")
for i, file_path in enumerate(all_files):
if total_tokens >= available_tokens:
logger.debug(f"[FILES] Token budget exhausted, skipping remaining {len(all_files) - i} files")
files_skipped.extend(all_files[i:])
break
file_content, file_tokens = read_file_content(file_path, include_line_numbers=include_line_numbers)
logger.debug(f"[FILES] File {file_path}: {file_tokens:,} tokens")
# Check if adding this file would exceed limit
if total_tokens + file_tokens <= available_tokens:
content_parts.append(file_content)
total_tokens += file_tokens
logger.debug(f"[FILES] Added file {file_path}, total tokens: {total_tokens:,}")
else:
# File too large for remaining budget
logger.debug(
f"[FILES] File {file_path} too large for remaining budget ({file_tokens:,} tokens, {available_tokens - total_tokens:,} remaining)"
)
files_skipped.append(file_path)
# Add informative note about skipped files to help users understand
# what was omitted and why
if files_skipped:
logger.debug(f"[FILES] {len(files_skipped)} files skipped due to token limits")
skip_note = "\n\n--- SKIPPED FILES (TOKEN LIMIT) ---\n"
skip_note += f"Total skipped: {len(files_skipped)}\n"
# Show first 10 skipped files as examples
for file_path in files_skipped[:10]:
skip_note += f" - {file_path}\n"
if len(files_skipped) > 10:
skip_note += f" ... and {len(files_skipped) - 10} more\n"
skip_note += "--- END SKIPPED FILES ---\n"
content_parts.append(skip_note)
result = "\n\n".join(content_parts) if content_parts else ""
logger.debug(f"[FILES] read_files complete: {len(result)} chars, {total_tokens:,} tokens used")
return result
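# Illustrative usage sketch (hypothetical paths; budget values are examples only):
#     prompt_files = read_files(
#         ["/home/user/project/src", "/home/user/project/README.md"],
#         max_tokens=200_000,
#         include_line_numbers=True,
#     )
#     # Files that do not fit the remaining budget are listed in a trailing
#     # "--- SKIPPED FILES (TOKEN LIMIT) ---" note instead of being truncated.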
def estimate_file_tokens(file_path: str) -> int:
"""
Estimate tokens for a file using file-type aware ratios.
Args:
file_path: Path to the file
Returns:
Estimated token count for the file
"""
try:
if not os.path.exists(file_path) or not os.path.isfile(file_path):
return 0
file_size = os.path.getsize(file_path)
# Get the appropriate ratio for this file type
from .file_types import get_token_estimation_ratio
ratio = get_token_estimation_ratio(file_path)
return int(file_size / ratio)
except Exception:
return 0
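# Illustrative estimate (assuming get_token_estimation_ratio returns roughly
# 4 bytes per token for this file type): a 40,000-byte source file would be
# estimated at about 10,000 tokens.
#     estimate_file_tokens("/home/user/project/src/app.py")  # -> e.g. 10000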
def check_files_size_limit(files: list[str], max_tokens: int, threshold_percent: float = 1.0) -> tuple[bool, int, int]:
"""
Check if a list of files would exceed token limits.
Args:
files: List of file paths to check
max_tokens: Maximum allowed tokens
threshold_percent: Percentage of max_tokens to use as threshold (0.0-1.0)
Returns:
Tuple of (within_limit, total_estimated_tokens, file_count)
"""
if not files:
return True, 0, 0
total_estimated_tokens = 0
file_count = 0
threshold = int(max_tokens * threshold_percent)
for file_path in files:
try:
estimated_tokens = estimate_file_tokens(file_path)
total_estimated_tokens += estimated_tokens
if estimated_tokens > 0: # Only count accessible files
file_count += 1
except Exception:
# Skip files that can't be accessed for size check
continue
within_limit = total_estimated_tokens <= threshold
return within_limit, total_estimated_tokens, file_count
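# Illustrative usage sketch (hypothetical values):
#     ok, est_tokens, count = check_files_size_limit(
#         ["/home/user/project/src/app.py"], max_tokens=100_000, threshold_percent=0.8
#     )
#     # ok is True only if est_tokens <= 80_000 (80% of max_tokens)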
def read_json_file(file_path: str) -> Optional[dict]:
"""
Read and parse a JSON file with proper error handling.
Args:
file_path: Path to the JSON file
Returns:
Parsed JSON data as dict, or None if file doesn't exist or invalid
"""
try:
if not os.path.exists(file_path):
return None
with open(file_path, encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
return None
def write_json_file(file_path: str, data: dict, indent: int = 2) -> bool:
"""
Write data to a JSON file with proper formatting.
Args:
file_path: Path to write the JSON file
data: Dictionary data to serialize
indent: JSON indentation level
Returns:
True if successful, False otherwise
"""
try:
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=indent, ensure_ascii=False)
return True
except (OSError, TypeError):
return False
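# Illustrative round trip (hypothetical path):
#     write_json_file("/tmp/example/config.json", {"key": "value"})  # -> True
#     read_json_file("/tmp/example/config.json")                     # -> {"key": "value"}
#     read_json_file("/tmp/example/missing.json")                    # -> None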
def get_file_size(file_path: str) -> int:
"""
Get file size in bytes with proper error handling.
Args:
file_path: Path to the file
Returns:
File size in bytes, or 0 if file doesn't exist or error
"""
try:
if os.path.exists(file_path) and os.path.isfile(file_path):
return os.path.getsize(file_path)
return 0
except OSError:
return 0
def ensure_directory_exists(file_path: str) -> bool:
"""
Ensure the parent directory of a file path exists.
Args:
file_path: Path to file (directory will be created for parent)
Returns:
True if directory exists or was created, False on error
"""
try:
directory = os.path.dirname(file_path)
if directory:
os.makedirs(directory, exist_ok=True)
return True
except OSError:
return False
def is_text_file(file_path: str) -> bool:
"""
Check if a file is likely a text file based on extension and content.
Args:
file_path: Path to the file
Returns:
True if file appears to be text, False otherwise
"""
from .file_types import is_text_file as check_text_type
return check_text_type(file_path)
def read_file_safely(file_path: str, max_size: int = 10 * 1024 * 1024) -> Optional[str]:
"""
Read a file with size limits and encoding handling.
Args:
file_path: Path to the file
max_size: Maximum file size in bytes (default 10MB)
Returns:
File content as string, or None if file too large or unreadable
"""
try:
if not os.path.exists(file_path) or not os.path.isfile(file_path):
return None
file_size = os.path.getsize(file_path)
if file_size > max_size:
return None
with open(file_path, encoding="utf-8", errors="ignore") as f:
return f.read()
except OSError:
return None
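# Illustrative usage of the helpers above (hypothetical paths):
#     get_file_size("/home/user/project/README.md")             # size in bytes, 0 on error
#     ensure_directory_exists("/tmp/example/out/report.txt")    # creates /tmp/example/out
#     is_text_file("/home/user/project/src/app.py")             # True if classified as text
#     read_file_safely("/home/user/project/big.log")            # None if over 10MB or unreadable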
def check_total_file_size(files: list[str], model_name: str) -> Optional[dict]:
"""
Check if total file sizes would exceed token threshold before embedding.
IMPORTANT: This performs STRICT REJECTION at the MCP boundary.
No partial inclusion - either all files fit or the request is rejected.
This forces the CLI to make better file selection decisions.
This function MUST be called with the effective model name (after resolution).
It should never receive 'auto' or None - model resolution happens earlier.
Args:
files: List of file paths to check
model_name: The resolved model name for context-aware thresholds (required)
Returns:
Dict with `code_too_large` response if too large, None if acceptable
"""
if not files:
return None
# Validate we have a proper model name (not auto or None)
if not model_name or model_name.lower() == "auto":
raise ValueError(
f"check_total_file_size called with unresolved model: '{model_name}'. "
"Model must be resolved before file size checking."
)
logger.info(f"File size check: Using model '{model_name}' for token limit calculation")
from utils.model_context import ModelContext
model_context = ModelContext(model_name)
token_allocation = model_context.calculate_token_allocation()
# Dynamic threshold based on model capacity
context_window = token_allocation.total_tokens
if context_window >= 1_000_000: # Gemini-class models
threshold_percent = 0.8 # Can be more generous
elif context_window >= 500_000: # Mid-range models
threshold_percent = 0.7 # Moderate
else: # OpenAI-class models (200K)
threshold_percent = 0.6 # Conservative
max_file_tokens = int(token_allocation.file_tokens * threshold_percent)
# Use centralized file size checking (threshold already applied to max_file_tokens)
within_limit, total_estimated_tokens, file_count = check_files_size_limit(files, max_file_tokens)
if not within_limit:
return {
"status": "code_too_large",
"content": (
f"The selected files are too large for analysis "
f"(estimated {total_estimated_tokens:,} tokens, limit {max_file_tokens:,}). "
f"Please select fewer, more specific files that are most relevant "
f"to your question, then invoke the tool again."
),
"content_type": "text",
"metadata": {
"total_estimated_tokens": total_estimated_tokens,
"limit": max_file_tokens,
"file_count": file_count,
"threshold_percent": threshold_percent,
"model_context_window": context_window,
"model_name": model_name,
"instructions": "Reduce file selection and try again - all files must fit within budget. If this persists, please use a model with a larger context window where available.",
},
}
return None # Proceed with ALL files
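# Illustrative usage sketch (model name is a placeholder; it must already be
# resolved, never "auto"):
#     rejection = check_total_file_size(selected_files, "some-resolved-model")
#     if rejection is not None:
#         return rejection  # surface the "code_too_large" response to the caller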
```