# beehiveinnovations/gemini-mcp-server
This is page 23 of 25. Use http://codebase.md/beehiveinnovations/gemini-mcp-server?lines=true&page={x} to view the full context.

# Directory Structure

```
├── .claude
│   ├── commands
│   │   └── fix-github-issue.md
│   └── settings.json
├── .coveragerc
├── .dockerignore
├── .env.example
├── .gitattributes
├── .github
│   ├── FUNDING.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.yml
│   │   ├── config.yml
│   │   ├── documentation.yml
│   │   ├── feature_request.yml
│   │   └── tool_addition.yml
│   ├── pull_request_template.md
│   └── workflows
│       ├── docker-pr.yml
│       ├── docker-release.yml
│       ├── semantic-pr.yml
│       ├── semantic-release.yml
│       └── test.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CHANGELOG.md
├── claude_config_example.json
├── CLAUDE.md
├── clink
│   ├── __init__.py
│   ├── agents
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── claude.py
│   │   ├── codex.py
│   │   └── gemini.py
│   ├── constants.py
│   ├── models.py
│   ├── parsers
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── claude.py
│   │   ├── codex.py
│   │   └── gemini.py
│   └── registry.py
├── code_quality_checks.ps1
├── code_quality_checks.sh
├── communication_simulator_test.py
├── conf
│   ├── __init__.py
│   ├── azure_models.json
│   ├── cli_clients
│   │   ├── claude.json
│   │   ├── codex.json
│   │   └── gemini.json
│   ├── custom_models.json
│   ├── dial_models.json
│   ├── gemini_models.json
│   ├── openai_models.json
│   ├── openrouter_models.json
│   └── xai_models.json
├── config.py
├── docker
│   ├── README.md
│   └── scripts
│       ├── build.ps1
│       ├── build.sh
│       ├── deploy.ps1
│       ├── deploy.sh
│       └── healthcheck.py
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── adding_providers.md
│   ├── adding_tools.md
│   ├── advanced-usage.md
│   ├── ai_banter.md
│   ├── ai-collaboration.md
│   ├── azure_openai.md
│   ├── configuration.md
│   ├── context-revival.md
│   ├── contributions.md
│   ├── custom_models.md
│   ├── docker-deployment.md
│   ├── gemini-setup.md
│   ├── getting-started.md
│   ├── index.md
│   ├── locale-configuration.md
│   ├── logging.md
│   ├── model_ranking.md
│   ├── testing.md
│   ├── tools
│   │   ├── analyze.md
│   │   ├── apilookup.md
│   │   ├── challenge.md
│   │   ├── chat.md
│   │   ├── clink.md
│   │   ├── codereview.md
│   │   ├── consensus.md
│   │   ├── debug.md
│   │   ├── docgen.md
│   │   ├── listmodels.md
│   │   ├── planner.md
│   │   ├── precommit.md
│   │   ├── refactor.md
│   │   ├── secaudit.md
│   │   ├── testgen.md
│   │   ├── thinkdeep.md
│   │   ├── tracer.md
│   │   └── version.md
│   ├── troubleshooting.md
│   ├── vcr-testing.md
│   └── wsl-setup.md
├── examples
│   ├── claude_config_macos.json
│   └── claude_config_wsl.json
├── LICENSE
├── providers
│   ├── __init__.py
│   ├── azure_openai.py
│   ├── base.py
│   ├── custom.py
│   ├── dial.py
│   ├── gemini.py
│   ├── openai_compatible.py
│   ├── openai.py
│   ├── openrouter.py
│   ├── registries
│   │   ├── __init__.py
│   │   ├── azure.py
│   │   ├── base.py
│   │   ├── custom.py
│   │   ├── dial.py
│   │   ├── gemini.py
│   │   ├── openai.py
│   │   ├── openrouter.py
│   │   └── xai.py
│   ├── registry_provider_mixin.py
│   ├── registry.py
│   ├── shared
│   │   ├── __init__.py
│   │   ├── model_capabilities.py
│   │   ├── model_response.py
│   │   ├── provider_type.py
│   │   └── temperature.py
│   └── xai.py
├── pyproject.toml
├── pytest.ini
├── README.md
├── requirements-dev.txt
├── requirements.txt
├── run_integration_tests.ps1
├── run_integration_tests.sh
├── run-server.ps1
├── run-server.sh
├── scripts
│   └── sync_version.py
├── server.py
├── simulator_tests
│   ├── __init__.py
│   ├── base_test.py
│   ├── conversation_base_test.py
│   ├── log_utils.py
│   ├── test_analyze_validation.py
│   ├── test_basic_conversation.py
│   ├── test_chat_simple_validation.py
│   ├── test_codereview_validation.py
│   ├── test_consensus_conversation.py
│   ├── test_consensus_three_models.py
│   ├── test_consensus_workflow_accurate.py
│   ├── test_content_validation.py
│   ├── test_conversation_chain_validation.py
│   ├── test_cross_tool_comprehensive.py
│   ├── test_cross_tool_continuation.py
│   ├── test_debug_certain_confidence.py
│   ├── test_debug_validation.py
│   ├── test_line_number_validation.py
│   ├── test_logs_validation.py
│   ├── test_model_thinking_config.py
│   ├── test_o3_model_selection.py
│   ├── test_o3_pro_expensive.py
│   ├── test_ollama_custom_url.py
│   ├── test_openrouter_fallback.py
│   ├── test_openrouter_models.py
│   ├── test_per_tool_deduplication.py
│   ├── test_planner_continuation_history.py
│   ├── test_planner_validation_old.py
│   ├── test_planner_validation.py
│   ├── test_precommitworkflow_validation.py
│   ├── test_prompt_size_limit_bug.py
│   ├── test_refactor_validation.py
│   ├── test_secaudit_validation.py
│   ├── test_testgen_validation.py
│   ├── test_thinkdeep_validation.py
│   ├── test_token_allocation_validation.py
│   ├── test_vision_capability.py
│   └── test_xai_models.py
├── systemprompts
│   ├── __init__.py
│   ├── analyze_prompt.py
│   ├── chat_prompt.py
│   ├── clink
│   │   ├── codex_codereviewer.txt
│   │   ├── default_codereviewer.txt
│   │   ├── default_planner.txt
│   │   └── default.txt
│   ├── codereview_prompt.py
│   ├── consensus_prompt.py
│   ├── debug_prompt.py
│   ├── docgen_prompt.py
│   ├── generate_code_prompt.py
│   ├── planner_prompt.py
│   ├── precommit_prompt.py
│   ├── refactor_prompt.py
│   ├── secaudit_prompt.py
│   ├── testgen_prompt.py
│   ├── thinkdeep_prompt.py
│   └── tracer_prompt.py
├── tests
│   ├── __init__.py
│   ├── CASSETTE_MAINTENANCE.md
│   ├── conftest.py
│   ├── gemini_cassettes
│   │   ├── chat_codegen
│   │   │   └── gemini25_pro_calculator
│   │   │       └── mldev.json
│   │   ├── chat_cross
│   │   │   └── step1_gemini25_flash_number
│   │   │       └── mldev.json
│   │   └── consensus
│   │       └── step2_gemini25_flash_against
│   │           └── mldev.json
│   ├── http_transport_recorder.py
│   ├── mock_helpers.py
│   ├── openai_cassettes
│   │   ├── chat_cross_step2_gpt5_reminder.json
│   │   ├── chat_gpt5_continuation.json
│   │   ├── chat_gpt5_moon_distance.json
│   │   ├── consensus_step1_gpt5_for.json
│   │   └── o3_pro_basic_math.json
│   ├── pii_sanitizer.py
│   ├── sanitize_cassettes.py
│   ├── test_alias_target_restrictions.py
│   ├── test_auto_mode_comprehensive.py
│   ├── test_auto_mode_custom_provider_only.py
│   ├── test_auto_mode_model_listing.py
│   ├── test_auto_mode_provider_selection.py
│   ├── test_auto_mode.py
│   ├── test_auto_model_planner_fix.py
│   ├── test_azure_openai_provider.py
│   ├── test_buggy_behavior_prevention.py
│   ├── test_cassette_semantic_matching.py
│   ├── test_challenge.py
│   ├── test_chat_codegen_integration.py
│   ├── test_chat_cross_model_continuation.py
│   ├── test_chat_openai_integration.py
│   ├── test_chat_simple.py
│   ├── test_clink_claude_agent.py
│   ├── test_clink_claude_parser.py
│   ├── test_clink_codex_agent.py
│   ├── test_clink_gemini_agent.py
│   ├── test_clink_gemini_parser.py
│   ├── test_clink_integration.py
│   ├── test_clink_parsers.py
│   ├── test_clink_tool.py
│   ├── test_collaboration.py
│   ├── test_config.py
│   ├── test_consensus_integration.py
│   ├── test_consensus_schema.py
│   ├── test_consensus.py
│   ├── test_conversation_continuation_integration.py
│   ├── test_conversation_field_mapping.py
│   ├── test_conversation_file_features.py
│   ├── test_conversation_memory.py
│   ├── test_conversation_missing_files.py
│   ├── test_custom_openai_temperature_fix.py
│   ├── test_custom_provider.py
│   ├── test_debug.py
│   ├── test_deploy_scripts.py
│   ├── test_dial_provider.py
│   ├── test_directory_expansion_tracking.py
│   ├── test_disabled_tools.py
│   ├── test_docker_claude_desktop_integration.py
│   ├── test_docker_config_complete.py
│   ├── test_docker_healthcheck.py
│   ├── test_docker_implementation.py
│   ├── test_docker_mcp_validation.py
│   ├── test_docker_security.py
│   ├── test_docker_volume_persistence.py
│   ├── test_file_protection.py
│   ├── test_gemini_token_usage.py
│   ├── test_image_support_integration.py
│   ├── test_image_validation.py
│   ├── test_integration_utf8.py
│   ├── test_intelligent_fallback.py
│   ├── test_issue_245_simple.py
│   ├── test_large_prompt_handling.py
│   ├── test_line_numbers_integration.py
│   ├── test_listmodels_restrictions.py
│   ├── test_listmodels.py
│   ├── test_mcp_error_handling.py
│   ├── test_model_enumeration.py
│   ├── test_model_metadata_continuation.py
│   ├── test_model_resolution_bug.py
│   ├── test_model_restrictions.py
│   ├── test_o3_pro_output_text_fix.py
│   ├── test_o3_temperature_fix_simple.py
│   ├── test_openai_compatible_token_usage.py
│   ├── test_openai_provider.py
│   ├── test_openrouter_provider.py
│   ├── test_openrouter_registry.py
│   ├── test_parse_model_option.py
│   ├── test_per_tool_model_defaults.py
│   ├── test_pii_sanitizer.py
│   ├── test_pip_detection_fix.py
│   ├── test_planner.py
│   ├── test_precommit_workflow.py
│   ├── test_prompt_regression.py
│   ├── test_prompt_size_limit_bug_fix.py
│   ├── test_provider_retry_logic.py
│   ├── test_provider_routing_bugs.py
│   ├── test_provider_utf8.py
│   ├── test_providers.py
│   ├── test_rate_limit_patterns.py
│   ├── test_refactor.py
│   ├── test_secaudit.py
│   ├── test_server.py
│   ├── test_supported_models_aliases.py
│   ├── test_thinking_modes.py
│   ├── test_tools.py
│   ├── test_tracer.py
│   ├── test_utf8_localization.py
│   ├── test_utils.py
│   ├── test_uvx_resource_packaging.py
│   ├── test_uvx_support.py
│   ├── test_workflow_file_embedding.py
│   ├── test_workflow_metadata.py
│   ├── test_workflow_prompt_size_validation_simple.py
│   ├── test_workflow_utf8.py
│   ├── test_xai_provider.py
│   ├── transport_helpers.py
│   └── triangle.png
├── tools
│   ├── __init__.py
│   ├── analyze.py
│   ├── apilookup.py
│   ├── challenge.py
│   ├── chat.py
│   ├── clink.py
│   ├── codereview.py
│   ├── consensus.py
│   ├── debug.py
│   ├── docgen.py
│   ├── listmodels.py
│   ├── models.py
│   ├── planner.py
│   ├── precommit.py
│   ├── refactor.py
│   ├── secaudit.py
│   ├── shared
│   │   ├── __init__.py
│   │   ├── base_models.py
│   │   ├── base_tool.py
│   │   ├── exceptions.py
│   │   └── schema_builders.py
│   ├── simple
│   │   ├── __init__.py
│   │   └── base.py
│   ├── testgen.py
│   ├── thinkdeep.py
│   ├── tracer.py
│   ├── version.py
│   └── workflow
│       ├── __init__.py
│       ├── base.py
│       ├── schema_builders.py
│       └── workflow_mixin.py
├── utils
│   ├── __init__.py
│   ├── client_info.py
│   ├── conversation_memory.py
│   ├── env.py
│   ├── file_types.py
│   ├── file_utils.py
│   ├── image_utils.py
│   ├── model_context.py
│   ├── model_restrictions.py
│   ├── security_config.py
│   ├── storage_backend.py
│   └── token_utils.py
└── zen-mcp-server
```

# Files

--------------------------------------------------------------------------------
/tools/shared/base_tool.py:
--------------------------------------------------------------------------------

```python
   1 | """
   2 | Core Tool Infrastructure for Zen MCP Tools
   3 | 
   4 | This module provides the fundamental base class for all tools:
   5 | - BaseTool: Abstract base class defining the tool interface
   6 | 
   7 | The BaseTool class defines the core contract that tools must implement and provides
   8 | common functionality for request validation, error handling, model management,
   9 | conversation handling, file processing, and response formatting.
  10 | """
  11 | 
  12 | import logging
  13 | import os
  14 | from abc import ABC, abstractmethod
  15 | from typing import TYPE_CHECKING, Any, Optional
  16 | 
  17 | from mcp.types import TextContent
  18 | 
  19 | if TYPE_CHECKING:
  20 |     from providers.shared import ModelCapabilities
  21 |     from tools.models import ToolModelCategory
  22 | 
  23 | from config import MCP_PROMPT_SIZE_LIMIT
  24 | from providers import ModelProvider, ModelProviderRegistry
  25 | from utils import estimate_tokens
  26 | from utils.conversation_memory import (
  27 |     ConversationTurn,
  28 |     get_conversation_file_list,
  29 |     get_thread,
  30 | )
  31 | from utils.env import get_env
  32 | from utils.file_utils import read_file_content, read_files
  33 | 
  34 | # Import models from tools.models for compatibility
  35 | try:
  36 |     from tools.models import SPECIAL_STATUS_MODELS, ContinuationOffer, ToolOutput
  37 | except ImportError:
  38 |     # Fallback in case models haven't been set up yet
  39 |     SPECIAL_STATUS_MODELS = {}
  40 |     ContinuationOffer = None
  41 |     ToolOutput = None
  42 | 
  43 | logger = logging.getLogger(__name__)
  44 | 
  45 | 
  46 | class BaseTool(ABC):
  47 |     """
  48 |     Abstract base class for all Zen MCP tools.
  49 | 
  50 |     This class defines the interface that all tools must implement and provides
  51 |     common functionality for request handling, model creation, and response formatting.
  52 | 
  53 |     CONVERSATION-AWARE FILE PROCESSING:
  54 |     This base class implements the sophisticated dual prioritization strategy for
  55 |     conversation-aware file handling across all tools:
  56 | 
  57 |     1. FILE DEDUPLICATION WITH NEWEST-FIRST PRIORITY:
  58 |        - When same file appears in multiple conversation turns, newest reference wins
  59 |        - Prevents redundant file embedding while preserving most recent file state
  60 |        - Cross-tool file tracking ensures consistent behavior across analyze → codereview → debug
  61 | 
  62 |     2. CONVERSATION CONTEXT INTEGRATION:
  63 |        - All tools receive enhanced prompts with conversation history via reconstruct_thread_context()
  64 |        - File references from previous turns are preserved and accessible
  65 |        - Cross-tool knowledge transfer maintains full context without manual file re-specification
  66 | 
  67 |     3. TOKEN-AWARE FILE EMBEDDING:
  68 |        - Respects model-specific token allocation budgets from ModelContext
  69 |        - Prioritizes conversation history, then newest files, then remaining content
  70 |        - Graceful degradation when token limits are approached
  71 | 
  72 |     4. STATELESS-TO-STATEFUL BRIDGING:
  73 |        - Tools operate on stateless MCP requests but access full conversation state
  74 |        - Conversation memory automatically injected via continuation_id parameter
  75 |        - Enables natural AI-to-AI collaboration across tool boundaries
  76 | 
  77 |     To create a new tool:
  78 |     1. Create a new class that inherits from BaseTool
  79 |     2. Implement all abstract methods
  80 |     3. Define a request model that inherits from ToolRequest
  81 |     4. Register the tool in server.py's TOOLS dictionary
  82 |     """
  83 | 
  84 |     # Class-level cache for OpenRouter registry to avoid multiple loads
  85 |     _openrouter_registry_cache = None
  86 |     _custom_registry_cache = None
  87 | 
  88 |     @classmethod
  89 |     def _get_openrouter_registry(cls):
  90 |         """Get cached OpenRouter registry instance, creating if needed."""
  91 |         # Use BaseTool class directly to ensure cache is shared across all subclasses
  92 |         if BaseTool._openrouter_registry_cache is None:
  93 |             from providers.registries.openrouter import OpenRouterModelRegistry
  94 | 
  95 |             BaseTool._openrouter_registry_cache = OpenRouterModelRegistry()
  96 |             logger.debug("Created cached OpenRouter registry instance")
  97 |         return BaseTool._openrouter_registry_cache
  98 | 
  99 |     @classmethod
 100 |     def _get_custom_registry(cls):
 101 |         """Get cached custom-endpoint registry instance."""
 102 |         if BaseTool._custom_registry_cache is None:
 103 |             from providers.registries.custom import CustomEndpointModelRegistry
 104 | 
 105 |             BaseTool._custom_registry_cache = CustomEndpointModelRegistry()
 106 |             logger.debug("Created cached Custom registry instance")
 107 |         return BaseTool._custom_registry_cache
 108 | 
 109 |     def __init__(self):
 110 |         # Cache tool metadata at initialization to avoid repeated calls
 111 |         self.name = self.get_name()
 112 |         self.description = self.get_description()
 113 |         self.default_temperature = self.get_default_temperature()
 114 |         # Tool initialization complete
 115 | 
 116 |     @abstractmethod
 117 |     def get_name(self) -> str:
 118 |         """
 119 |         Return the unique name identifier for this tool.
 120 | 
 121 |         This name is used by MCP clients to invoke the tool and must be
 122 |         unique across all registered tools.
 123 | 
 124 |         Returns:
 125 |             str: The tool's unique name (e.g., "review_code", "analyze")
 126 |         """
 127 |         pass
 128 | 
 129 |     @abstractmethod
 130 |     def get_description(self) -> str:
 131 |         """
 132 |         Return a detailed description of what this tool does.
 133 | 
 134 |         This description is shown to MCP clients (like Claude / Codex / Gemini) to help them
 135 |         understand when and how to use the tool. It should be comprehensive
 136 |         and include trigger phrases.
 137 | 
 138 |         Returns:
 139 |             str: Detailed tool description with usage examples
 140 |         """
 141 |         pass
 142 | 
 143 |     @abstractmethod
 144 |     def get_input_schema(self) -> dict[str, Any]:
 145 |         """
 146 |         Return the JSON Schema that defines this tool's parameters.
 147 | 
 148 |         This schema is used by MCP clients to validate inputs before
 149 |         sending requests. It should match the tool's request model.
 150 | 
 151 |         Returns:
 152 |             Dict[str, Any]: JSON Schema object defining required and optional parameters
 153 |         """
 154 |         pass
 155 | 
 156 |     @abstractmethod
 157 |     def get_system_prompt(self) -> str:
 158 |         """
 159 |         Return the system prompt that configures the AI model's behavior.
 160 | 
 161 |         This prompt sets the context and instructions for how the model
 162 |         should approach the task. It's prepended to the user's request.
 163 | 
 164 |         Returns:
 165 |             str: System prompt with role definition and instructions
 166 |         """
 167 |         pass
 168 | 
 169 |     def get_capability_system_prompts(self, capabilities: Optional["ModelCapabilities"]) -> list[str]:
 170 |         """Return additional system prompt snippets gated on model capabilities.
 171 | 
 172 |         Subclasses can override this hook to append capability-specific
 173 |         instructions (for example, enabling code-generation exports when a
 174 |         model advertises support). The default implementation returns an empty
 175 |         list so no extra instructions are appended.
 176 | 
 177 |         Args:
 178 |             capabilities: The resolved capabilities for the active model.
 179 | 
 180 |         Returns:
 181 |             List of prompt fragments to append after the base system prompt.
 182 |         """
 183 | 
 184 |         return []
 185 | 
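    # Illustrative sketch only (not part of this module): a subclass that wants to
    # unlock extra instructions when the active model advertises code generation
    # could override the hook above roughly like this:
    #
    #     def get_capability_system_prompts(self, capabilities):
    #         if capabilities and getattr(capabilities, "allow_code_generation", False):
    #             return ["You may emit complete, ready-to-save code files when asked."]
    #         return []
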
 186 |     def _augment_system_prompt_with_capabilities(
 187 |         self, base_prompt: str, capabilities: Optional["ModelCapabilities"]
 188 |     ) -> str:
 189 |         """Merge capability-driven prompt addenda with the base system prompt."""
 190 | 
 191 |         additions: list[str] = []
 192 |         if capabilities is not None:
 193 |             additions = [fragment.strip() for fragment in self.get_capability_system_prompts(capabilities) if fragment]
 194 | 
 195 |         if not additions:
 196 |             return base_prompt
 197 | 
 198 |         addition_text = "\n\n".join(additions)
 199 |         if not base_prompt:
 200 |             return addition_text
 201 | 
 202 |         suffix = "" if base_prompt.endswith("\n\n") else "\n\n"
 203 |         return f"{base_prompt}{suffix}{addition_text}"
 204 | 
 205 |     def get_annotations(self) -> Optional[dict[str, Any]]:
 206 |         """
 207 |         Return optional annotations for this tool.
 208 | 
 209 |         Annotations provide hints about tool behavior without being security-critical.
 210 |         They help MCP clients make better decisions about tool usage.
 211 | 
 212 |         Returns:
 213 |             Optional[dict]: Dictionary with annotation fields like readOnlyHint, destructiveHint, etc.
 214 |                            Returns None if no annotations are needed.
 215 |         """
 216 |         return None
 217 | 
 218 |     def requires_model(self) -> bool:
 219 |         """
 220 |         Return whether this tool requires AI model access.
 221 | 
 222 |         Tools that override execute() to do pure data processing (like planner)
 223 |         should return False to skip model resolution at the MCP boundary.
 224 | 
 225 |         Returns:
 226 |             bool: True if tool needs AI model access (default), False for data-only tools
 227 |         """
 228 |         return True
 229 | 
 230 |     def is_effective_auto_mode(self) -> bool:
 231 |         """
 232 |         Check if we're in effective auto mode for schema generation.
 233 | 
 234 |         This determines whether the model parameter should be required in the tool schema.
 235 |         Used at initialization time when schemas are generated.
 236 | 
 237 |         Returns:
 238 |             bool: True if model parameter should be required in the schema
 239 |         """
 240 |         from config import DEFAULT_MODEL
 241 |         from providers.registry import ModelProviderRegistry
 242 | 
 243 |         # Case 1: Explicit auto mode
 244 |         if DEFAULT_MODEL.lower() == "auto":
 245 |             return True
 246 | 
 247 |         # Case 2: Model not available (fallback to auto mode)
 248 |         if DEFAULT_MODEL.lower() != "auto":
 249 |             provider = ModelProviderRegistry.get_provider_for_model(DEFAULT_MODEL)
 250 |             if not provider:
 251 |                 return True
 252 | 
 253 |         return False
 254 | 
 255 |     def _should_require_model_selection(self, model_name: str) -> bool:
 256 |         """
 257 |         Check if we should require the CLI to select a model at runtime.
 258 | 
 259 |         This is called during request execution to determine if we need
 260 |         to return an error asking the CLI to provide a model parameter.
 261 | 
 262 |         Args:
 263 |             model_name: The model name from the request or DEFAULT_MODEL
 264 | 
 265 |         Returns:
 266 |             bool: True if we should require model selection
 267 |         """
 268 |         # Case 1: Model is explicitly "auto"
 269 |         if model_name.lower() == "auto":
 270 |             return True
 271 | 
 272 |         # Case 2: Requested model is not available
 273 |         from providers.registry import ModelProviderRegistry
 274 | 
 275 |         provider = ModelProviderRegistry.get_provider_for_model(model_name)
 276 |         if not provider:
 277 |             logger = logging.getLogger(f"tools.{self.name}")
 278 |             logger.warning(f"Model '{model_name}' is not available with current API keys. Requiring model selection.")
 279 |             return True
 280 | 
 281 |         return False
 282 | 
 283 |     def _get_available_models(self) -> list[str]:
 284 |         """
 285 |         Get list of models available from enabled providers.
 286 | 
 287 |         Only returns models from providers that have valid API keys configured.
 288 |         This fixes the namespace collision bug where models from disabled providers
 289 |         were shown to the CLI, causing routing conflicts.
 290 | 
 291 |         Returns:
 292 |             List of model names from enabled providers only
 293 |         """
 294 |         from providers.registry import ModelProviderRegistry
 295 | 
 296 |         # Get models from enabled providers only (those with valid API keys)
 297 |         all_models = ModelProviderRegistry.get_available_model_names()
 298 | 
 299 |         # Add OpenRouter models if OpenRouter is configured
 300 |         openrouter_key = get_env("OPENROUTER_API_KEY")
 301 |         if openrouter_key and openrouter_key != "your_openrouter_api_key_here":
 302 |             try:
 303 |                 registry = self._get_openrouter_registry()
 304 |                 # Add all aliases from the registry (includes OpenRouter cloud models)
 305 |                 for alias in registry.list_aliases():
 306 |                     if alias not in all_models:
 307 |                         all_models.append(alias)
 308 |             except Exception as e:
 309 |                 import logging
 310 | 
 311 |                 logging.debug(f"Failed to add OpenRouter models to enum: {e}")
 312 | 
 313 |         # Add custom models if custom API is configured
 314 |         custom_url = get_env("CUSTOM_API_URL")
 315 |         if custom_url:
 316 |             try:
 317 |                 registry = self._get_custom_registry()
 318 |                 for alias in registry.list_aliases():
 319 |                     if alias not in all_models:
 320 |                         all_models.append(alias)
 321 |             except Exception as e:
 322 |                 import logging
 323 | 
 324 |                 logging.debug(f"Failed to add custom models to enum: {e}")
 325 | 
 326 |         # Remove duplicates while preserving order
 327 |         seen = set()
 328 |         unique_models = []
 329 |         for model in all_models:
 330 |             if model not in seen:
 331 |                 seen.add(model)
 332 |                 unique_models.append(model)
 333 | 
 334 |         return unique_models
 335 | 
 336 |     def _format_available_models_list(self) -> str:
 337 |         """Return a human-friendly list of available models or guidance when none found."""
 338 | 
 339 |         summaries, total, has_restrictions = self._get_ranked_model_summaries()
 340 |         if not summaries:
 341 |             return (
 342 |                 "No models detected. Configure provider credentials or set DEFAULT_MODEL to a valid option. "
 343 |                 "If the user requested a specific model, respond with this notice instead of substituting another model."
 344 |             )
 345 |         display = "; ".join(summaries)
 346 |         remainder = total - len(summaries)
 347 |         if remainder > 0:
 348 |             display = f"{display}; +{remainder} more (use the `listmodels` tool for the full roster)"
 349 |         return display
 350 | 
 351 |     @staticmethod
 352 |     def _format_context_window(tokens: int) -> Optional[str]:
 353 |         """Convert a raw context window into a short display string."""
 354 | 
 355 |         if not tokens or tokens <= 0:
 356 |             return None
 357 | 
 358 |         if tokens >= 1_000_000:
 359 |             if tokens % 1_000_000 == 0:
 360 |                 return f"{tokens // 1_000_000}M ctx"
 361 |             return f"{tokens / 1_000_000:.1f}M ctx"
 362 | 
 363 |         if tokens >= 1_000:
 364 |             if tokens % 1_000 == 0:
 365 |                 return f"{tokens // 1_000}K ctx"
 366 |             return f"{tokens / 1_000:.1f}K ctx"
 367 | 
 368 |         return f"{tokens} ctx"
 369 | 
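    # Worked examples for the formatting above (derived from the logic, not taken
    # from the original file): 1_000_000 -> "1M ctx", 1_048_576 -> "1.0M ctx",
    # 200_000 -> "200K ctx", 32_768 -> "32.8K ctx", 512 -> "512 ctx".
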
 370 |     def _collect_ranked_capabilities(self) -> list[tuple[int, str, Any]]:
 371 |         """Gather available model capabilities sorted by capability rank."""
 372 | 
 373 |         from providers.registry import ModelProviderRegistry
 374 | 
 375 |         ranked: list[tuple[int, str, Any]] = []
 376 |         available = ModelProviderRegistry.get_available_models(respect_restrictions=True)
 377 | 
 378 |         for model_name, provider_type in available.items():
 379 |             provider = ModelProviderRegistry.get_provider(provider_type)
 380 |             if not provider:
 381 |                 continue
 382 | 
 383 |             try:
 384 |                 capabilities = provider.get_capabilities(model_name)
 385 |             except ValueError:
 386 |                 continue
 387 | 
 388 |             rank = capabilities.get_effective_capability_rank()
 389 |             ranked.append((rank, model_name, capabilities))
 390 | 
 391 |         ranked.sort(key=lambda item: (-item[0], item[1]))
 392 |         return ranked
 393 | 
 394 |     @staticmethod
 395 |     def _normalize_model_identifier(name: str) -> str:
 396 |         """Normalize model names for deduplication across providers."""
 397 | 
 398 |         normalized = name.lower()
 399 |         if ":" in normalized:
 400 |             normalized = normalized.split(":", 1)[0]
 401 |         if "/" in normalized:
 402 |             normalized = normalized.split("/", 1)[-1]
 403 |         return normalized
 404 | 
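    # Worked examples for the normalization above (hypothetical model names):
    # "OpenAI/GPT-4.1:free" -> "gpt-4.1" (the ":" suffix is dropped, then the
    # provider prefix before "/" is dropped); "gemini-2.5-pro" -> "gemini-2.5-pro".
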
 405 |     def _get_ranked_model_summaries(self, limit: int = 5) -> tuple[list[str], int, bool]:
 406 |         """Return formatted, ranked model summaries and restriction status."""
 407 | 
 408 |         ranked = self._collect_ranked_capabilities()
 409 | 
 410 |         # Build allowlist map (provider -> lowercase names) when restrictions are active
 411 |         allowed_map: dict[Any, set[str]] = {}
 412 |         try:
 413 |             from utils.model_restrictions import get_restriction_service
 414 | 
 415 |             restriction_service = get_restriction_service()
 416 |             if restriction_service:
 417 |                 from providers.shared import ProviderType
 418 | 
 419 |                 for provider_type in ProviderType:
 420 |                     allowed = restriction_service.get_allowed_models(provider_type)
 421 |                     if allowed:
 422 |                         allowed_map[provider_type] = {name.lower() for name in allowed if name}
 423 |         except Exception:
 424 |             allowed_map = {}
 425 | 
 426 |         filtered: list[tuple[int, str, Any]] = []
 427 |         seen_normalized: set[str] = set()
 428 | 
 429 |         for rank, model_name, capabilities in ranked:
 430 |             canonical_name = getattr(capabilities, "model_name", model_name)
 431 |             canonical_lower = canonical_name.lower()
 432 |             alias_lower = model_name.lower()
 433 |             provider_type = getattr(capabilities, "provider", None)
 434 | 
 435 |             if allowed_map:
 436 |                 if provider_type not in allowed_map:
 437 |                     continue
 438 |                 allowed_set = allowed_map[provider_type]
 439 |                 if canonical_lower not in allowed_set and alias_lower not in allowed_set:
 440 |                     continue
 441 | 
 442 |             normalized = self._normalize_model_identifier(canonical_name)
 443 |             if normalized in seen_normalized:
 444 |                 continue
 445 | 
 446 |             seen_normalized.add(normalized)
 447 |             filtered.append((rank, canonical_name, capabilities))
 448 | 
 449 |         summaries: list[str] = []
 450 |         for rank, canonical_name, capabilities in filtered[:limit]:
 451 |             details: list[str] = []
 452 | 
 453 |             context_str = self._format_context_window(capabilities.context_window)
 454 |             if context_str:
 455 |                 details.append(context_str)
 456 | 
 457 |             if capabilities.supports_extended_thinking:
 458 |                 details.append("thinking")
 459 | 
 460 |             if capabilities.allow_code_generation:
 461 |                 details.append("code-gen")
 462 | 
 463 |             base = f"{canonical_name} (score {rank}"
 464 |             if details:
 465 |                 base = f"{base}, {', '.join(details)}"
 466 |             summaries.append(f"{base})")
 467 | 
 468 |         return summaries, len(filtered), bool(allowed_map)
 469 | 
 470 |     def _get_restriction_note(self) -> Optional[str]:
 471 |         """Return a string describing active per-provider allowlists, if any."""
 472 | 
 473 |         env_labels = {
 474 |             "OPENAI_ALLOWED_MODELS": "OpenAI",
 475 |             "GOOGLE_ALLOWED_MODELS": "Google",
 476 |             "XAI_ALLOWED_MODELS": "X.AI",
 477 |             "OPENROUTER_ALLOWED_MODELS": "OpenRouter",
 478 |             "DIAL_ALLOWED_MODELS": "DIAL",
 479 |         }
 480 | 
 481 |         notes: list[str] = []
 482 |         for env_var, label in env_labels.items():
 483 |             raw = get_env(env_var)
 484 |             if not raw:
 485 |                 continue
 486 | 
 487 |             models = sorted({token.strip() for token in raw.split(",") if token.strip()})
 488 |             if not models:
 489 |                 continue
 490 | 
 491 |             notes.append(f"{label}: {', '.join(models)}")
 492 | 
 493 |         if not notes:
 494 |             return None
 495 | 
 496 |         return "Policy allows only → " + "; ".join(notes)
 497 | 
 498 |     def _build_model_unavailable_message(self, model_name: str) -> str:
 499 |         """Compose a consistent error message for unavailable model scenarios."""
 500 | 
 501 |         tool_category = self.get_model_category()
 502 |         suggested_model = ModelProviderRegistry.get_preferred_fallback_model(tool_category)
 503 |         available_models_text = self._format_available_models_list()
 504 | 
 505 |         return (
 506 |             f"Model '{model_name}' is not available with current API keys. "
 507 |             f"Available models: {available_models_text}. "
 508 |             f"Suggested model for {self.get_name()}: '{suggested_model}' "
 509 |             f"(category: {tool_category.value}). If the user explicitly requested a model, you MUST use that exact name or report this error back—do not substitute another model."
 510 |         )
 511 | 
 512 |     def _build_auto_mode_required_message(self) -> str:
 513 |         """Compose the auto-mode prompt when an explicit model selection is required."""
 514 | 
 515 |         tool_category = self.get_model_category()
 516 |         suggested_model = ModelProviderRegistry.get_preferred_fallback_model(tool_category)
 517 |         available_models_text = self._format_available_models_list()
 518 | 
 519 |         return (
 520 |             "Model parameter is required in auto mode. "
 521 |             f"Available models: {available_models_text}. "
 522 |             f"Suggested model for {self.get_name()}: '{suggested_model}' "
 523 |             f"(category: {tool_category.value}). When the user names a model, relay that exact name—never swap in another option."
 524 |         )
 525 | 
 526 |     def get_model_field_schema(self) -> dict[str, Any]:
 527 |         """
 528 |         Generate the model field schema based on auto mode configuration.
 529 | 
 530 |         When auto mode is enabled, the model parameter becomes required
 531 |         and includes detailed descriptions of each model's capabilities.
 532 | 
 533 |         Returns:
 534 |             Dict containing the model field JSON schema
 535 |         """
 536 | 
 537 |         from config import DEFAULT_MODEL
 538 | 
 539 |         # Use the centralized effective auto mode check
 540 |         if self.is_effective_auto_mode():
 541 |             description = (
 542 |                 "Currently in auto model selection mode. CRITICAL: When the user names a model, you MUST use that exact name unless the server rejects it. "
 543 |                 "If no model is provided, you may use the `listmodels` tool to review options and select an appropriate match."
 544 |             )
 545 |             summaries, total, restricted = self._get_ranked_model_summaries()
 546 |             remainder = max(0, total - len(summaries))
 547 |             if summaries:
 548 |                 top_line = "; ".join(summaries)
 549 |                 if remainder > 0:
 550 |                     label = "Allowed models" if restricted else "Top models"
 551 |                     top_line = f"{label}: {top_line}; +{remainder} more via `listmodels`."
 552 |                 else:
 553 |                     label = "Allowed models" if restricted else "Top models"
 554 |                     top_line = f"{label}: {top_line}."
 555 |                 description = f"{description} {top_line}"
 556 | 
 557 |             restriction_note = self._get_restriction_note()
 558 |             if restriction_note and (remainder > 0 or not summaries):
 559 |                 description = f"{description} {restriction_note}."
 560 |             return {
 561 |                 "type": "string",
 562 |                 "description": description,
 563 |             }
 564 | 
 565 |         description = (
 566 |             f"The default model is '{DEFAULT_MODEL}'. Override only when the user explicitly requests a different model, and use that exact name. "
 567 |             "If the requested model fails validation, surface the server error instead of substituting another model. When unsure, use the `listmodels` tool for details."
 568 |         )
 569 |         summaries, total, restricted = self._get_ranked_model_summaries()
 570 |         remainder = max(0, total - len(summaries))
 571 |         if summaries:
 572 |             top_line = "; ".join(summaries)
 573 |             if remainder > 0:
 574 |                 label = "Allowed models" if restricted else "Preferred alternatives"
 575 |                 top_line = f"{label}: {top_line}; +{remainder} more via `listmodels`."
 576 |             else:
 577 |                 label = "Allowed models" if restricted else "Preferred alternatives"
 578 |                 top_line = f"{label}: {top_line}."
 579 |             description = f"{description} {top_line}"
 580 | 
 581 |         restriction_note = self._get_restriction_note()
 582 |         if restriction_note and (remainder > 0 or not summaries):
 583 |             description = f"{description} {restriction_note}."
 584 | 
 585 |         return {
 586 |             "type": "string",
 587 |             "description": description,
 588 |         }
 589 | 
 590 |     def get_default_temperature(self) -> float:
 591 |         """
 592 |         Return the default temperature setting for this tool.
 593 | 
 594 |         Override this method to set tool-specific temperature defaults.
 595 |         Lower values (0.0-0.3) for analytical tasks, higher (0.7-1.0) for creative tasks.
 596 | 
 597 |         Returns:
 598 |             float: Default temperature between 0.0 and 1.0
 599 |         """
 600 |         return 0.5
 601 | 
 602 |     def wants_line_numbers_by_default(self) -> bool:
 603 |         """
 604 |         Return whether this tool wants line numbers added to code files by default.
 605 | 
 606 |         By default, ALL tools get line numbers for precise code references.
 607 |         Line numbers are essential for accurate communication about code locations.
 608 | 
 609 |         Returns:
 610 |             bool: True if line numbers should be added by default for this tool
 611 |         """
 612 |         return True  # All tools get line numbers by default for consistency
 613 | 
 614 |     def get_default_thinking_mode(self) -> str:
 615 |         """
 616 |         Return the default thinking mode for this tool.
 617 | 
 618 |         Thinking mode controls computational budget for reasoning.
 619 |         Override for tools that need more or less reasoning depth.
 620 | 
 621 |         Returns:
 622 |             str: One of "minimal", "low", "medium", "high", "max"
 623 |         """
 624 |         return "medium"  # Default to medium thinking for better reasoning
 625 | 
 626 |     def get_model_category(self) -> "ToolModelCategory":
 627 |         """
 628 |         Return the model category for this tool.
 629 | 
 630 |         Model category influences which model is selected in auto mode.
 631 |         Override to specify whether your tool needs extended reasoning,
 632 |         fast response, or balanced capabilities.
 633 | 
 634 |         Returns:
 635 |             ToolModelCategory: Category that influences model selection
 636 |         """
 637 |         from tools.models import ToolModelCategory
 638 | 
 639 |         return ToolModelCategory.BALANCED
 640 | 
 641 |     @abstractmethod
 642 |     def get_request_model(self):
 643 |         """
 644 |         Return the Pydantic model class used for validating requests.
 645 | 
 646 |         This model should inherit from ToolRequest and define all
 647 |         parameters specific to this tool.
 648 | 
 649 |         Returns:
 650 |             Type[ToolRequest]: The request model class
 651 |         """
 652 |         pass
 653 | 
 654 |     def validate_file_paths(self, request) -> Optional[str]:
 655 |         """
 656 |         Validate that all file paths in the request are absolute.
 657 | 
 658 |         This is a critical security function that prevents path traversal attacks
 659 |         and ensures all file access is properly controlled. All file paths must
 660 |         be absolute to avoid ambiguity and security issues.
 661 | 
 662 |         Args:
 663 |             request: The validated request object
 664 | 
 665 |         Returns:
 666 |             Optional[str]: Error message if validation fails, None if all paths are valid
 667 |         """
 668 |         # Only validate files/paths if they exist in the request
 669 |         file_fields = [
 670 |             "absolute_file_paths",
 671 |             "file",
 672 |             "path",
 673 |             "directory",
 674 |             "notebooks",
 675 |             "test_examples",
 676 |             "style_guide_examples",
 677 |             "files_checked",
 678 |             "relevant_files",
 679 |         ]
 680 | 
 681 |         for field_name in file_fields:
 682 |             if hasattr(request, field_name):
 683 |                 field_value = getattr(request, field_name)
 684 |                 if field_value is None:
 685 |                     continue
 686 | 
 687 |                 # Handle both single paths and lists of paths
 688 |                 paths_to_check = field_value if isinstance(field_value, list) else [field_value]
 689 | 
 690 |                 for path in paths_to_check:
 691 |                     if path and not os.path.isabs(path):
 692 |                         return f"All file paths must be FULL absolute paths. Invalid path: '{path}'"
 693 | 
 694 |         return None
 695 | 
 696 |     def _validate_token_limit(self, content: str, content_type: str = "Content") -> None:
 697 |         """
 698 |         Validate that user-provided content doesn't exceed the MCP prompt size limit.
 699 | 
 700 |         This enforcement is strictly for text crossing the MCP transport boundary
 701 |         (i.e., user input). Internal prompt construction may exceed this size and is
 702 |         governed by model-specific token limits.
 703 | 
 704 |         Args:
 705 |             content: The user-originated content to validate
 706 |             content_type: Description of the content type for error messages
 707 | 
 708 |         Raises:
 709 |             ValueError: If content exceeds the character size limit
 710 |         """
 711 |         if not content:
 712 |             logger.debug(f"{self.name} tool {content_type.lower()} validation skipped (no content)")
 713 |             return
 714 | 
 715 |         char_count = len(content)
 716 |         if char_count > MCP_PROMPT_SIZE_LIMIT:
 717 |             token_estimate = estimate_tokens(content)
 718 |             error_msg = (
 719 |                 f"{char_count:,} characters (~{token_estimate:,} tokens). "
 720 |                 f"Maximum is {MCP_PROMPT_SIZE_LIMIT:,} characters."
 721 |             )
 722 |             logger.error(f"{self.name} tool {content_type.lower()} validation failed: {error_msg}")
 723 |             raise ValueError(f"{content_type} too large: {error_msg}")
 724 | 
 725 |         token_estimate = estimate_tokens(content)
 726 |         logger.debug(
 727 |             f"{self.name} tool {content_type.lower()} validation passed: "
 728 |             f"{char_count:,} characters (~{token_estimate:,} tokens)"
 729 |         )
 730 | 
 731 |     def get_model_provider(self, model_name: str) -> ModelProvider:
 732 |         """
 733 |         Get the appropriate model provider for the given model name.
 734 | 
 735 |         This method performs runtime validation to ensure the requested model
 736 |         is actually available with the current API key configuration.
 737 | 
 738 |         Args:
 739 |             model_name: Name of the model to get provider for
 740 | 
 741 |         Returns:
 742 |             ModelProvider: The provider instance for the model
 743 | 
 744 |         Raises:
 745 |             ValueError: If the model is not available or provider not found
 746 |         """
 747 |         try:
 748 |             provider = ModelProviderRegistry.get_provider_for_model(model_name)
 749 |             if not provider:
 750 |                 logger.error(f"No provider found for model '{model_name}' in {self.name} tool")
 751 |                 raise ValueError(self._build_model_unavailable_message(model_name))
 752 | 
 753 |             return provider
 754 |         except Exception as e:
 755 |             logger.error(f"Failed to get provider for model '{model_name}' in {self.name} tool: {e}")
 756 |             raise
 757 | 
 758 |     # === CONVERSATION AND FILE HANDLING METHODS ===
 759 | 
 760 |     def get_conversation_embedded_files(self, continuation_id: Optional[str]) -> list[str]:
 761 |         """
 762 |         Get list of files already embedded in conversation history.
 763 | 
 764 |         This method returns the list of files that have already been embedded
 765 |         in the conversation history for a given continuation thread. Tools can
 766 |         use this to avoid re-embedding files that are already available in the
 767 |         conversation context.
 768 | 
 769 |         Args:
 770 |             continuation_id: Thread continuation ID, or None for new conversations
 771 | 
 772 |         Returns:
 773 |             list[str]: List of file paths already embedded in conversation history
 774 |         """
 775 |         if not continuation_id:
 776 |             # New conversation, no files embedded yet
 777 |             return []
 778 | 
 779 |         thread_context = get_thread(continuation_id)
 780 |         if not thread_context:
 781 |             # Thread not found, no files embedded
 782 |             return []
 783 | 
 784 |         embedded_files = get_conversation_file_list(thread_context)
 785 |         logger.debug(f"[FILES] {self.name}: Found {len(embedded_files)} embedded files")
 786 |         return embedded_files
 787 | 
 788 |     def filter_new_files(self, requested_files: list[str], continuation_id: Optional[str]) -> list[str]:
 789 |         """
 790 |         Filter out files that are already embedded in conversation history.
 791 | 
 792 |         This method prevents duplicate file embeddings by filtering out files that have
 793 |         already been embedded in the conversation history. This optimizes token usage
 794 |         while ensuring tools still have logical access to all requested files through
 795 |         conversation history references.
 796 | 
 797 |         Args:
 798 |             requested_files: List of files requested for current tool execution
 799 |             continuation_id: Thread continuation ID, or None for new conversations
 800 | 
 801 |         Returns:
 802 |             list[str]: List of files that need to be embedded (not already in history)
 803 |         """
 804 |         logger.debug(f"[FILES] {self.name}: Filtering {len(requested_files)} requested files")
 805 | 
 806 |         if not continuation_id:
 807 |             # New conversation, all files are new
 808 |             logger.debug(f"[FILES] {self.name}: New conversation, all {len(requested_files)} files are new")
 809 |             return requested_files
 810 | 
 811 |         try:
 812 |             embedded_files = set(self.get_conversation_embedded_files(continuation_id))
 813 |             logger.debug(f"[FILES] {self.name}: Found {len(embedded_files)} embedded files in conversation")
 814 | 
 815 |             # Safety check: If no files are marked as embedded but we have a continuation_id,
 816 |             # this might indicate an issue with conversation history. Be conservative.
 817 |             if not embedded_files:
 818 |                 logger.debug(f"{self.name} tool: No files found in conversation history for thread {continuation_id}")
 819 |                 logger.debug(
 820 |                     f"[FILES] {self.name}: No embedded files found, returning all {len(requested_files)} requested files"
 821 |                 )
 822 |                 return requested_files
 823 | 
 824 |             # Return only files that haven't been embedded yet
 825 |             new_files = [f for f in requested_files if f not in embedded_files]
 826 |             logger.debug(
 827 |                 f"[FILES] {self.name}: After filtering: {len(new_files)} new files, {len(requested_files) - len(new_files)} already embedded"
 828 |             )
 829 |             logger.debug(f"[FILES] {self.name}: New files to embed: {new_files}")
 830 | 
 831 |             # Log filtering results for debugging
 832 |             if len(new_files) < len(requested_files):
 833 |                 skipped = [f for f in requested_files if f in embedded_files]
 834 |                 logger.debug(
 835 |                     f"{self.name} tool: Filtering {len(skipped)} files already in conversation history: {', '.join(skipped)}"
 836 |                 )
 837 |                 logger.debug(f"[FILES] {self.name}: Skipped (already embedded): {skipped}")
 838 | 
 839 |             return new_files
 840 | 
 841 |         except Exception as e:
 842 |             # If there's any issue with conversation history lookup, be conservative
 843 |             # and include all files rather than risk losing access to needed files
 844 |             logger.warning(f"{self.name} tool: Error checking conversation history for {continuation_id}: {e}")
 845 |             logger.warning(f"{self.name} tool: Including all requested files as fallback")
 846 |             logger.debug(
 847 |                 f"[FILES] {self.name}: Exception in filter_new_files, returning all {len(requested_files)} files as fallback"
 848 |             )
 849 |             return requested_files
 850 | 
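    # Worked example for the filtering above (hypothetical paths): with
    # requested_files=["/src/a.py", "/src/b.py"] and a conversation that already
    # embedded "/src/a.py", only ["/src/b.py"] is returned for fresh embedding.
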
 851 |     def format_conversation_turn(self, turn: ConversationTurn) -> list[str]:
 852 |         """
 853 |         Format a conversation turn for display in conversation history.
 854 | 
 855 |         Tools can override this to provide custom formatting for their responses
 856 |         while maintaining the standard structure for cross-tool compatibility.
 857 | 
 858 |         This method is called by build_conversation_history when reconstructing
 859 |         conversation context, allowing each tool to control how its responses
 860 |         appear in subsequent conversation turns.
 861 | 
 862 |         Args:
 863 |             turn: The conversation turn to format (from utils.conversation_memory)
 864 | 
 865 |         Returns:
 866 |             list[str]: Lines of formatted content for this turn
 867 | 
 868 |         Example:
 869 |             Default implementation returns:
 870 |             ["Files used in this turn: file1.py, file2.py", "", "Response content..."]
 871 | 
 872 |             Tools can override to add custom sections, formatting, or metadata display.
 873 |         """
 874 |         parts = []
 875 | 
 876 |         # Add files context if present
 877 |         if turn.files:
 878 |             parts.append(f"Files used in this turn: {', '.join(turn.files)}")
 879 |             parts.append("")  # Empty line for readability
 880 | 
 881 |         # Add the actual content
 882 |         parts.append(turn.content)
 883 | 
 884 |         return parts
 885 | 
 886 |     def handle_prompt_file(self, files: Optional[list[str]]) -> tuple[Optional[str], Optional[list[str]]]:
 887 |         """
 888 |         Check for and handle prompt.txt in the absolute file paths list.
 889 | 
 890 |         If prompt.txt is found, reads its content and removes it from the files list.
 891 |         This file is treated specially as the main prompt, not as an embedded file.
 892 | 
 893 |         This mechanism allows us to work around MCP's ~25K token limit by having
 894 |         the CLI save large prompts to a file, effectively using the file transfer
 895 |         mechanism to bypass token constraints while preserving response capacity.
 896 | 
 897 |         Args:
 898 |             files: List of absolute file paths (will be translated for current environment)
 899 | 
 900 |         Returns:
 901 |             tuple: (prompt_content, updated_files_list)
 902 |         """
 903 |         if not files:
 904 |             return None, files
 905 | 
 906 |         prompt_content = None
 907 |         updated_files = []
 908 | 
 909 |         for file_path in files:
 910 | 
 911 |             # Check if the filename is exactly "prompt.txt"
 912 |             # This ensures we don't match files like "myprompt.txt" or "prompt.txt.bak"
 913 |             if os.path.basename(file_path) == "prompt.txt":
 914 |                 try:
 915 |                     # Read prompt.txt content and extract just the text
 916 |                     content, _ = read_file_content(file_path)
 917 |                     # Extract the content between the file markers
 918 |                     if "--- BEGIN FILE:" in content and "--- END FILE:" in content:
 919 |                         lines = content.split("\n")
 920 |                         in_content = False
 921 |                         content_lines = []
 922 |                         for line in lines:
 923 |                             if line.startswith("--- BEGIN FILE:"):
 924 |                                 in_content = True
 925 |                                 continue
 926 |                             elif line.startswith("--- END FILE:"):
 927 |                                 break
 928 |                             elif in_content:
 929 |                                 content_lines.append(line)
 930 |                         prompt_content = "\n".join(content_lines)
 931 |                     else:
 932 |                         # Fallback: if it's already raw content (from tests or direct input)
 933 |                         # and doesn't have error markers, use it directly
 934 |                         if not content.startswith("\n--- ERROR"):
 935 |                             prompt_content = content
 936 |                         else:
 937 |                             prompt_content = None
 938 |                 except Exception:
 939 |                     # If we can't read the file, we'll just skip it
 940 |                     # The error will be handled elsewhere
 941 |                     pass
 942 |             else:
 943 |                 # Keep the original path in the files list (will be translated later by read_files)
 944 |                 updated_files.append(file_path)
 945 | 
 946 |         return prompt_content, updated_files if updated_files else None
 947 | 
 948 |     def get_prompt_content_for_size_validation(self, user_content: str) -> str:
 949 |         """
 950 |         Get the content that should be validated for MCP prompt size limits.
 951 | 
 952 |         This hook method allows tools to specify what content should be checked
 953 |         against the MCP transport size limit. By default, it returns the user content,
 954 |         but can be overridden to exclude conversation history when needed.
 955 | 
 956 |         Args:
 957 |             user_content: The user content that would normally be validated
 958 | 
 959 |         Returns:
 960 |             The content that should actually be validated for size limits
 961 |         """
 962 |         # Default implementation: validate the full user content
 963 |         return user_content
 964 | 
 965 |     def check_prompt_size(self, text: str) -> Optional[dict[str, Any]]:
 966 |         """
 967 |         Check if USER INPUT text is too large for MCP transport boundary.
 968 | 
 969 |         IMPORTANT: This method should ONLY be used to validate user input that crosses
 970 |         the CLI ↔ MCP Server transport boundary. It should NOT be used to limit
 971 |         internal MCP Server operations.
 972 | 
 973 |         Args:
 974 |             text: The user input text to check (NOT internal prompt content)
 975 | 
 976 |         Returns:
 977 |             Optional[Dict[str, Any]]: Response asking for file handling if too large, None otherwise
 978 |         """
 979 |         if text and len(text) > MCP_PROMPT_SIZE_LIMIT:
 980 |             return {
 981 |                 "status": "resend_prompt",
 982 |                 "content": (
 983 |                     f"MANDATORY ACTION REQUIRED: The prompt is too large for the MCP transport limit (>{MCP_PROMPT_SIZE_LIMIT:,} characters). "
 984 |                     "YOU MUST IMMEDIATELY save the prompt text to a temporary file named 'prompt.txt' in the working directory. "
 985 |                     "DO NOT attempt to shorten or modify the prompt. SAVE IT AS-IS to 'prompt.txt'. "
 986 |                     "Then resend the request, passing the absolute file path to 'prompt.txt' as part of the tool call, "
 987 |                     "along with any other files you wish to share as context. Leave the prompt text itself empty or very brief in the new request. "
 988 |                     "This is the ONLY way to handle large prompts - you MUST follow these exact steps."
 989 |                 ),
 990 |                 "content_type": "text",
 991 |                 "metadata": {
 992 |                     "prompt_size": len(text),
 993 |                     "limit": MCP_PROMPT_SIZE_LIMIT,
 994 |                     "instructions": "MANDATORY: Save prompt to 'prompt.txt' in current folder and provide full path when recalling this tool.",
 995 |                 },
 996 |             }
 997 |         return None
 998 | 
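    # --- Editor's illustrative sketch (not part of the original source) ---
    # Typical use of check_prompt_size() before building a prompt; user_text is
    # hypothetical:
    #
    #   resend = self.check_prompt_size(user_text)
    #   if resend is not None:
    #       # Too large for the CLI <-> MCP transport boundary: return this
    #       # payload so the CLI saves the text to prompt.txt and retries with
    #       # the file path instead of inline text.
    #       return resend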
 999 |     def _prepare_file_content_for_prompt(
1000 |         self,
1001 |         request_files: list[str],
1002 |         continuation_id: Optional[str],
1003 |         context_description: str = "New files",
1004 |         max_tokens: Optional[int] = None,
1005 |         reserve_tokens: int = 1_000,
1006 |         remaining_budget: Optional[int] = None,
1007 |         arguments: Optional[dict] = None,
1008 |         model_context: Optional[Any] = None,
1009 |     ) -> tuple[str, list[str]]:
1010 |         """
1011 |         Centralized file processing implementing dual prioritization strategy.
1012 | 
1013 |         This method is the heart of conversation-aware file processing across all tools.
1014 | 
1015 |         Args:
1016 |             request_files: List of files requested for current tool execution
1017 |             continuation_id: Thread continuation ID, or None for new conversations
1018 |             context_description: Description for token limit validation (e.g. "Code", "New files")
1019 |             max_tokens: Maximum tokens to use (defaults to remaining budget or model-specific content allocation)
1020 |             reserve_tokens: Tokens to reserve for additional prompt content (default 1K)
1021 |             remaining_budget: Remaining token budget after conversation history (from server.py)
1022 |             arguments: Original tool arguments (used to extract _remaining_tokens if available)
1023 |             model_context: Model context object with all model information including token allocation
1024 | 
1025 |         Returns:
1026 |             tuple[str, list[str]]: (formatted_file_content, actually_processed_files)
1027 |                 - formatted_file_content: Formatted file content string ready for prompt inclusion
1028 |                 - actually_processed_files: List of individual file paths that were actually read and embedded
1029 |                   (directories are expanded to individual files)
1030 |         """
1031 |         if not request_files:
1032 |             return "", []
1033 | 
1034 |         # Extract remaining budget from arguments if available
1035 |         if remaining_budget is None:
1036 |             # Use provided arguments or fall back to stored arguments from execute()
1037 |             args_to_use = arguments or getattr(self, "_current_arguments", {})
1038 |             remaining_budget = args_to_use.get("_remaining_tokens")
1039 | 
1040 |         # Use remaining budget if provided, otherwise fall back to max_tokens or model-specific default
1041 |         if remaining_budget is not None:
1042 |             effective_max_tokens = remaining_budget - reserve_tokens
1043 |         elif max_tokens is not None:
1044 |             effective_max_tokens = max_tokens - reserve_tokens
1045 |         else:
1046 |             # Use model_context for token allocation
1047 |             if not model_context:
1048 |                 # Try to get from stored attributes as fallback
1049 |                 model_context = getattr(self, "_model_context", None)
1050 |                 if not model_context:
1051 |                     logger.error(
1052 |                         f"[FILES] {self.name}: _prepare_file_content_for_prompt called without model_context. "
1053 |                         "This indicates an incorrect call sequence in the tool's implementation."
1054 |                     )
1055 |                     raise RuntimeError("Model context not provided for file preparation.")
1056 | 
1057 |             # This is now the single source of truth for token allocation.
1058 |             try:
1059 |                 token_allocation = model_context.calculate_token_allocation()
1060 |                 # Standardize on `file_tokens` for consistency and correctness.
1061 |                 effective_max_tokens = token_allocation.file_tokens - reserve_tokens
1062 |                 logger.debug(
1063 |                     f"[FILES] {self.name}: Using model context for {model_context.model_name}: "
1064 |                     f"{token_allocation.file_tokens:,} file tokens from {token_allocation.total_tokens:,} total"
1065 |                 )
1066 |             except Exception as e:
1067 |                 logger.error(
1068 |                     f"[FILES] {self.name}: Failed to calculate token allocation from model context: {e}", exc_info=True
1069 |                 )
1070 |                 # If the context exists but calculation fails, we still need to prevent a crash.
1071 |                 # A loud error is logged, and we fall back to a safe default.
1072 |                 effective_max_tokens = 100_000 - reserve_tokens
1073 | 
1074 |         # Ensure we have a reasonable minimum budget
1075 |         effective_max_tokens = max(1000, effective_max_tokens)
1076 | 
1077 |         files_to_embed = self.filter_new_files(request_files, continuation_id)
1078 |         logger.debug(f"[FILES] {self.name}: Will embed {len(files_to_embed)} files after filtering")
1079 | 
1080 |         # Log the specific files for debugging/testing
1081 |         if files_to_embed:
1082 |             logger.info(
1083 |                 f"[FILE_PROCESSING] {self.name} tool will embed new files: {', '.join([os.path.basename(f) for f in files_to_embed])}"
1084 |             )
1085 |         else:
1086 |             logger.info(
1087 |                 f"[FILE_PROCESSING] {self.name} tool: No new files to embed (all files already in conversation history)"
1088 |             )
1089 | 
1090 |         content_parts = []
1091 |         actually_processed_files = []
1092 | 
1093 |         # Read content of new files only
1094 |         if files_to_embed:
1095 |             logger.debug(f"{self.name} tool embedding {len(files_to_embed)} new files: {', '.join(files_to_embed)}")
1096 |             logger.debug(
1097 |                 f"[FILES] {self.name}: Starting file embedding with token budget {effective_max_tokens + reserve_tokens:,}"
1098 |             )
1099 |             try:
1100 |                 # Before calling read_files, expand directories to get individual file paths
1101 |                 from utils.file_utils import expand_paths
1102 | 
1103 |                 expanded_files = expand_paths(files_to_embed)
1104 |                 logger.debug(
1105 |                     f"[FILES] {self.name}: Expanded {len(files_to_embed)} paths to {len(expanded_files)} individual files"
1106 |                 )
1107 | 
1108 |                 file_content = read_files(
1109 |                     files_to_embed,
1110 |                     max_tokens=effective_max_tokens + reserve_tokens,
1111 |                     reserve_tokens=reserve_tokens,
1112 |                     include_line_numbers=self.wants_line_numbers_by_default(),
1113 |                 )
1114 |                 # Note: No need to validate against MCP_PROMPT_SIZE_LIMIT here
1115 |                 # read_files already handles token-aware truncation based on model's capabilities
1116 |                 content_parts.append(file_content)
1117 | 
1118 |                 # Track the expanded files as actually processed
1119 |                 actually_processed_files.extend(expanded_files)
1120 | 
1121 |                 # Estimate tokens for debug logging
1122 |                 from utils.token_utils import estimate_tokens
1123 | 
1124 |                 content_tokens = estimate_tokens(file_content)
1125 |                 logger.debug(
1126 |                     f"{self.name} tool successfully embedded {len(files_to_embed)} files ({content_tokens:,} tokens)"
1127 |                 )
1128 |                 logger.debug(f"[FILES] {self.name}: Successfully embedded files - {content_tokens:,} tokens used")
1129 |                 logger.debug(
1130 |                     f"[FILES] {self.name}: Actually processed {len(actually_processed_files)} individual files"
1131 |                 )
1132 |             except Exception as e:
1133 |                 logger.error(f"{self.name} tool failed to embed files {files_to_embed}: {type(e).__name__}: {e}")
1134 |                 logger.debug(f"[FILES] {self.name}: File embedding failed - {type(e).__name__}: {e}")
1135 |                 raise
1136 |         else:
1137 |             logger.debug(f"[FILES] {self.name}: No files to embed after filtering")
1138 | 
1139 |         # Generate note about files already in conversation history
1140 |         if continuation_id and len(files_to_embed) < len(request_files):
1141 |             embedded_files = self.get_conversation_embedded_files(continuation_id)
1142 |             skipped_files = [f for f in request_files if f in embedded_files]
1143 |             if skipped_files:
1144 |                 logger.debug(
1145 |                     f"{self.name} tool skipping {len(skipped_files)} files already in conversation history: {', '.join(skipped_files)}"
1146 |                 )
1147 |                 logger.debug(f"[FILES] {self.name}: Adding note about {len(skipped_files)} skipped files")
1148 |                 if content_parts:
1149 |                     content_parts.append("\n\n")
1150 |                 note_lines = [
1151 |                     "--- NOTE: Additional files referenced in conversation history ---",
1152 |                     "The following files are already available in our conversation context:",
1153 |                     "\n".join(f"  - {f}" for f in skipped_files),
1154 |                     "--- END NOTE ---",
1155 |                 ]
1156 |                 content_parts.append("\n".join(note_lines))
1157 |             else:
1158 |                 logger.debug(f"[FILES] {self.name}: No skipped files to note")
1159 | 
1160 |         result = "".join(content_parts) if content_parts else ""
1161 |         logger.debug(
1162 |             f"[FILES] {self.name}: _prepare_file_content_for_prompt returning {len(result)} chars, {len(actually_processed_files)} processed files"
1163 |         )
1164 |         return result, actually_processed_files
1165 | 
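    # --- Editor's illustrative sketch (not part of the original source) ---
    # File token budget precedence implemented above, highest priority first:
    #
    #   1. remaining_budget (or arguments["_remaining_tokens"])   - reserve_tokens
    #   2. max_tokens                                              - reserve_tokens
    #   3. model_context.calculate_token_allocation().file_tokens  - reserve_tokens
    #   4. 100_000 fallback when allocation fails                  - reserve_tokens
    #
    # The result is always clamped to a minimum of 1,000 tokens.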
1166 |     def get_websearch_instruction(self, tool_specific: Optional[str] = None) -> str:
1167 |         """
1168 |         Generate standardized web search instruction.
1169 | 
1170 |         Args:
1171 |             tool_specific: Optional tool-specific search guidance
1172 | 
1173 |         Returns:
1174 |             str: Web search instruction to append to prompt
1175 |         """
1176 | 
1177 |         base_instruction = """
1178 | 
1179 | WEB SEARCH CAPABILITY: You can request the calling agent to perform web searches to enhance your analysis with current information!
1180 | 
1181 | IMPORTANT: When you identify areas where web searches would significantly improve your response (such as checking current documentation, finding recent solutions, verifying best practices, or gathering community insights), you MUST explicitly instruct the agent to perform specific web searches and then respond back using the continuation_id from this response to continue the analysis.
1182 | 
1183 | Use clear, direct language based on the value of the search:
1184 | 
1185 | For valuable supplementary information: "Please perform a web search on '[specific topic/query]' and then continue this analysis using the continuation_id from this response if you find relevant information."
1186 | 
1187 | For important missing information: "Please search for '[specific topic/query]' and respond back with the findings using the continuation_id from this response - this information is needed to provide a complete analysis."
1188 | 
1189 | For critical/essential information: "SEARCH REQUIRED: Please immediately perform a web search on '[specific topic/query]' and respond back with the results using the continuation_id from this response. Cannot provide accurate analysis without this current information."
1190 | 
1191 | This ensures you get the most current and comprehensive information while maintaining conversation context through the continuation_id."""
1192 | 
1193 |         if tool_specific:
1194 |             return f"""{base_instruction}
1195 | 
1196 | {tool_specific}
1197 | 
1198 | When recommending searches, be specific about what information you need and why it would improve your analysis."""
1199 | 
1200 |         # Default instruction for all tools
1201 |         return f"""{base_instruction}
1202 | 
1203 | Consider requesting searches for:
1204 | - Current documentation and API references
1205 | - Recent best practices and patterns
1206 | - Known issues and community solutions
1207 | - Framework updates and compatibility
1208 | - Security advisories and patches
1209 | - Performance benchmarks and optimizations
1210 | 
1211 | When recommending searches, be specific about what information you need and why it would improve your analysis. Always remember to instruct the agent to use the continuation_id from this response when providing search results."""
1212 | 
1213 |     def get_language_instruction(self) -> str:
1214 |         """
1215 |         Generate language instruction based on LOCALE configuration.
1216 | 
1217 |         Returns:
1218 |             str: Language instruction to prepend to prompt, or empty string if
1219 |                  no locale set
1220 |         """
1221 |         # Read LOCALE directly from environment to support dynamic changes
1222 |         # Tests can monkeypatch LOCALE via the environment helper (or .env when override is enforced)
1223 | 
1224 |         locale = (get_env("LOCALE", "") or "").strip()
1225 | 
1226 |         if not locale:
1227 |             return ""
1228 | 
1229 |         # Simple language instruction
1230 |         return f"Always respond in {locale}.\n\n"
1231 | 
1232 |     # === ABSTRACT METHODS FOR SIMPLE TOOLS ===
1233 | 
1234 |     @abstractmethod
1235 |     async def prepare_prompt(self, request) -> str:
1236 |         """
1237 |         Prepare the complete prompt for the AI model.
1238 | 
1239 |         This method should construct the full prompt by combining:
1240 |         - System prompt from get_system_prompt()
1241 |         - File content from _prepare_file_content_for_prompt()
1242 |         - Conversation history from reconstruct_thread_context()
1243 |         - User's request and any tool-specific context
1244 | 
1245 |         Args:
1246 |             request: The validated request object
1247 | 
1248 |         Returns:
1249 |             str: Complete prompt ready for the AI model
1250 |         """
1251 |         pass
1252 | 
1253 |     def format_response(self, response: str, request, model_info: dict = None) -> str:
1254 |         """
1255 |         Format the AI model's response for the user.
1256 | 
1257 |         This method allows tools to post-process the model's response,
1258 |         adding structure, validation, or additional context.
1259 | 
1260 |         The default implementation returns the response unchanged.
1261 |         Tools can override this method to add custom formatting.
1262 | 
1263 |         Args:
1264 |             response: Raw response from the AI model
1265 |             request: The original request object
1266 |             model_info: Optional model information and metadata
1267 | 
1268 |         Returns:
1269 |             str: Formatted response ready for the user
1270 |         """
1271 |         return response
1272 | 
1273 |     # === IMPLEMENTATION METHODS ===
1274 |     # These are inherited from the current base.py for now to maintain
1275 |     # compatibility during the migration.
1276 | 
1277 |     async def execute(self, arguments: dict[str, Any]) -> list[TextContent]:
1278 |         """Execute the tool - will be inherited from existing base.py for now."""
1279 |         # This will be implemented by importing from the current base.py
1280 |         # for backward compatibility during the migration
1281 |         raise NotImplementedError("Subclasses must implement execute method")
1282 | 
1283 |     def _should_require_model_selection(self, model_name: str) -> bool:
1284 |         """
1285 |         Check if we should require the CLI to select a model at runtime.
1286 | 
1287 |         This is called during request execution to determine if we need
1288 |         to return an error asking the CLI to provide a model parameter.
1289 | 
1290 |         Args:
1291 |             model_name: The model name from the request or DEFAULT_MODEL
1292 | 
1293 |         Returns:
1294 |             bool: True if we should require model selection
1295 |         """
1296 |         # Case 1: Model is explicitly "auto"
1297 |         if model_name.lower() == "auto":
1298 |             return True
1299 | 
1300 |         # Case 2: Requested model is not available
1301 |         from providers.registry import ModelProviderRegistry
1302 | 
1303 |         provider = ModelProviderRegistry.get_provider_for_model(model_name)
1304 |         if not provider:
1305 |             logger.warning(f"Model '{model_name}' is not available with current API keys. Requiring model selection.")
1306 |             return True
1307 | 
1308 |         return False
1309 | 
1310 |     def _get_available_models(self) -> list[str]:
1311 |         """
1312 |         Get list of models available from enabled providers.
1313 | 
1314 |         Only returns models from providers that have valid API keys configured.
1315 |         This fixes the namespace collision bug where models from disabled providers
1316 |         were shown to the CLI, causing routing conflicts.
1317 | 
1318 |         Returns:
1319 |             List of model names from enabled providers only
1320 |         """
1321 |         from providers.registry import ModelProviderRegistry
1322 | 
1323 |         # Get models from enabled providers only (those with valid API keys)
1324 |         all_models = ModelProviderRegistry.get_available_model_names()
1325 | 
1326 |         # Add OpenRouter models and their aliases when OpenRouter is configured
1327 |         openrouter_key = get_env("OPENROUTER_API_KEY")
1328 |         if openrouter_key and openrouter_key != "your_openrouter_api_key_here":
1329 |             try:
1330 |                 registry = self._get_openrouter_registry()
1331 | 
1332 |                 for alias in registry.list_aliases():
1333 |                     if alias not in all_models:
1334 |                         all_models.append(alias)
1335 |             except Exception as exc:  # pragma: no cover - logged for observability
1336 |                 import logging
1337 | 
1338 |                 logging.debug(f"Failed to add OpenRouter models to enum: {exc}")
1339 | 
1340 |         # Add custom models (and their aliases) when a custom endpoint is available
1341 |         custom_url = get_env("CUSTOM_API_URL")
1342 |         if custom_url:
1343 |             try:
1344 |                 registry = self._get_custom_registry()
1345 |                 for alias in registry.list_aliases():
1346 |                     if alias not in all_models:
1347 |                         all_models.append(alias)
1348 |             except Exception as exc:  # pragma: no cover - logged for observability
1349 |                 import logging
1350 | 
1351 |                 logging.debug(f"Failed to add custom models to enum: {exc}")
1352 | 
1353 |         # Remove duplicates while preserving insertion order
1354 |         seen: set[str] = set()
1355 |         unique_models: list[str] = []
1356 |         for model in all_models:
1357 |             if model not in seen:
1358 |                 seen.add(model)
1359 |                 unique_models.append(model)
1360 | 
1361 |         return unique_models
1362 | 
1363 |     def _resolve_model_context(self, arguments: dict, request) -> tuple[str, Any]:
1364 |         """
1365 |         Resolve model context and name using centralized logic.
1366 | 
1367 |         This method extracts the model resolution logic from execute() so it can be
1368 |         reused by tools that override execute() (like debug tool) without duplicating code.
1369 | 
1370 |         Args:
1371 |             arguments: Dictionary of arguments from the MCP client
1372 |             request: The validated request object
1373 | 
1374 |         Returns:
1375 |             tuple[str, ModelContext]: (resolved_model_name, model_context)
1376 | 
1377 |         Raises:
1378 |             ValueError: If model resolution fails or model selection is required
1379 |         """
1380 |         # MODEL RESOLUTION NOW HAPPENS AT MCP BOUNDARY
1381 |         # Extract pre-resolved model context from server.py
1382 |         model_context = arguments.get("_model_context")
1383 |         resolved_model_name = arguments.get("_resolved_model_name")
1384 | 
1385 |         if model_context and resolved_model_name:
1386 |             # Model was already resolved at MCP boundary
1387 |             model_name = resolved_model_name
1388 |             logger.debug(f"Using pre-resolved model '{model_name}' from MCP boundary")
1389 |         else:
1390 |             # Fallback for direct execute calls
1391 |             model_name = getattr(request, "model", None)
1392 |             if not model_name:
1393 |                 from config import DEFAULT_MODEL
1394 | 
1395 |                 model_name = DEFAULT_MODEL
1396 |             logger.debug(f"Using fallback model resolution for '{model_name}' (test mode)")
1397 | 
1398 |             # For tests: Check if we should require model selection (auto mode)
1399 |             if self._should_require_model_selection(model_name):
1400 |                 # Build error message based on why selection is required
1401 |                 if model_name.lower() == "auto":
1402 |                     error_message = self._build_auto_mode_required_message()
1403 |                 else:
1404 |                     error_message = self._build_model_unavailable_message(model_name)
1405 |                 raise ValueError(error_message)
1406 | 
1407 |             # Create model context for tests
1408 |             from utils.model_context import ModelContext
1409 | 
1410 |             model_context = ModelContext(model_name)
1411 | 
1412 |         return model_name, model_context
1413 | 
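    # --- Editor's illustrative sketch (not part of the original source) ---
    # The two resolution paths handled above; argument values are hypothetical:
    #
    #   # Normal path: server.py resolved the model at the MCP boundary
    #   name, ctx = self._resolve_model_context(
    #       {"_model_context": ctx_from_server, "_resolved_model_name": "gemini-2.5-flash"},
    #       request,
    #   )
    #
    #   # Fallback path (direct execute calls / tests): falls back to
    #   # request.model or DEFAULT_MODEL and raises ValueError when model
    #   # selection is required (e.g. model == "auto").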
1414 |     def validate_and_correct_temperature(self, temperature: float, model_context: Any) -> tuple[float, list[str]]:
1415 |         """
1416 |         Validate and correct temperature for the specified model.
1417 | 
1418 |         This method ensures that the temperature value is within the valid range
1419 |         for the specific model being used. Different models have different temperature
1420 |         constraints (e.g., o1 models require temperature=1.0, GPT models support 0-2).
1421 | 
1422 |         Args:
1423 |             temperature: Temperature value to validate
1424 |             model_context: Model context object containing model name, provider, and capabilities
1425 | 
1426 |         Returns:
1427 |             Tuple of (corrected_temperature, warning_messages)
1428 |         """
1429 |         try:
1430 |             # Use model context capabilities directly - clean OOP approach
1431 |             capabilities = model_context.capabilities
1432 |             constraint = capabilities.temperature_constraint
1433 | 
1434 |             warnings = []
1435 |             if not constraint.validate(temperature):
1436 |                 corrected = constraint.get_corrected_value(temperature)
1437 |                 warning = (
1438 |                     f"Temperature {temperature} invalid for {model_context.model_name}. "
1439 |                     f"{constraint.get_description()}. Using {corrected} instead."
1440 |                 )
1441 |                 warnings.append(warning)
1442 |                 return corrected, warnings
1443 | 
1444 |             return temperature, warnings
1445 | 
1446 |         except Exception as e:
1447 |             # If validation fails for any reason, use the original temperature
1448 |             # and log a warning (but don't fail the request)
1449 |             logger.warning(f"Temperature validation failed for {model_context.model_name}: {e}")
1450 |             return temperature, [f"Temperature validation failed: {e}"]
1451 | 
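    # --- Editor's illustrative sketch (not part of the original source) ---
    # Example outcome for a model whose constraint only accepts temperature=1.0
    # (values are hypothetical):
    #
    #   corrected, warnings = self.validate_and_correct_temperature(0.2, model_context)
    #   # corrected -> 1.0
    #   # warnings  -> ["Temperature 0.2 invalid for <model>. ... Using 1.0 instead."]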
1452 |     def _validate_image_limits(
1453 |         self, images: Optional[list[str]], model_context: Optional[Any] = None, continuation_id: Optional[str] = None
1454 |     ) -> Optional[dict]:
1455 |         """
1456 |         Validate image size and count against model capabilities.
1457 | 
1458 |         This performs strict validation to ensure we don't exceed model-specific
1459 |         image limits. Uses capability-based validation with actual model
1460 |         configuration rather than hard-coded limits.
1461 | 
1462 |         Args:
1463 |             images: List of image paths/data URLs to validate
1464 |             model_context: Model context object containing model name, provider, and capabilities
1465 |             continuation_id: Optional continuation ID for conversation context
1466 | 
1467 |         Returns:
1468 |             Optional[dict]: Error response if validation fails, None if valid
1469 |         """
1470 |         if not images:
1471 |             return None
1472 | 
1473 |         # Import here to avoid circular imports
1474 |         import base64
1475 |         from pathlib import Path
1476 | 
1477 |         if not model_context:
1478 |             # Get from tool's stored context as fallback
1479 |             model_context = getattr(self, "_model_context", None)
1480 |             if not model_context:
1481 |                 logger.warning("No model context available for image validation")
1482 |                 return None
1483 | 
1484 |         try:
1485 |             # Use model context capabilities directly - clean OOP approach
1486 |             capabilities = model_context.capabilities
1487 |             model_name = model_context.model_name
1488 |         except Exception as e:
1489 |             logger.warning(f"Failed to get capabilities from model_context for image validation: {e}")
1490 |             # Generic error response when capabilities cannot be accessed
1491 |             model_name = getattr(model_context, "model_name", "unknown")
1492 |             return {
1493 |                 "status": "error",
1494 |                 "content": self._build_model_unavailable_message(model_name),
1495 |                 "content_type": "text",
1496 |                 "metadata": {
1497 |                     "error_type": "validation_error",
1498 |                     "model_name": model_name,
1499 |                     "supports_images": None,  # Unknown since model capabilities unavailable
1500 |                     "image_count": len(images) if images else 0,
1501 |                 },
1502 |             }
1503 | 
1504 |         # Check if model supports images
1505 |         if not capabilities.supports_images:
1506 |             return {
1507 |                 "status": "error",
1508 |                 "content": (
1509 |                     f"Image support not available: Model '{model_name}' does not support image processing. "
1510 |                     f"Please use a vision-capable model such as 'gemini-2.5-flash', 'o3', "
1511 |                     f"or 'claude-opus-4.1' for image analysis tasks."
1512 |                 ),
1513 |                 "content_type": "text",
1514 |                 "metadata": {
1515 |                     "error_type": "validation_error",
1516 |                     "model_name": model_name,
1517 |                     "supports_images": False,
1518 |                     "image_count": len(images),
1519 |                 },
1520 |             }
1521 | 
1522 |         # Get model image limits from capabilities
1523 |         max_images = 5  # Default max number of images
1524 |         max_size_mb = capabilities.max_image_size_mb
1525 | 
1526 |         # Check image count
1527 |         if len(images) > max_images:
1528 |             return {
1529 |                 "status": "error",
1530 |                 "content": (
1531 |                     f"Too many images: Model '{model_name}' supports a maximum of {max_images} images, "
1532 |                     f"but {len(images)} were provided. Please reduce the number of images."
1533 |                 ),
1534 |                 "content_type": "text",
1535 |                 "metadata": {
1536 |                     "error_type": "validation_error",
1537 |                     "model_name": model_name,
1538 |                     "image_count": len(images),
1539 |                     "max_images": max_images,
1540 |                 },
1541 |             }
1542 | 
1543 |         # Calculate total size of all images
1544 |         total_size_mb = 0.0
1545 |         for image_path in images:
1546 |             try:
1547 |                 if image_path.startswith("data:image/"):
1548 |                     # Handle data URL: data:image/png;base64,iVBORw0...
1549 |                     _, data = image_path.split(",", 1)
1550 |                     # Base64 encoding increases size by ~33%, so decode to get actual size
1551 |                     actual_size = len(base64.b64decode(data))
1552 |                     total_size_mb += actual_size / (1024 * 1024)
1553 |                 else:
1554 |                     # Handle file path
1555 |                     path = Path(image_path)
1556 |                     if path.exists():
1557 |                         file_size = path.stat().st_size
1558 |                         total_size_mb += file_size / (1024 * 1024)
1559 |                     else:
1560 |                         logger.warning(f"Image file not found: {image_path}")
1561 |                         # Assume a reasonable size for missing files to avoid breaking validation
1562 |                         total_size_mb += 1.0  # 1MB assumption
1563 |             except Exception as e:
1564 |                 logger.warning(f"Failed to get size for image {image_path}: {e}")
1565 |                 # Assume a reasonable size for problematic files
1566 |                 total_size_mb += 1.0  # 1MB assumption
1567 | 
1568 |         # Apply 40MB cap for custom models if needed
1569 |         effective_limit_mb = max_size_mb
1570 |         try:
1571 |             from providers.shared import ProviderType
1572 | 
1573 |             # ModelCapabilities dataclass has provider field defined
1574 |             if capabilities.provider == ProviderType.CUSTOM:
1575 |                 effective_limit_mb = min(max_size_mb, 40.0)
1576 |         except Exception:
1577 |             pass
1578 | 
1579 |         # Validate against size limit
1580 |         if total_size_mb > effective_limit_mb:
1581 |             return {
1582 |                 "status": "error",
1583 |                 "content": (
1584 |                     f"Image size limit exceeded: Model '{model_name}' supports maximum {effective_limit_mb:.1f}MB "
1585 |                     f"for all images combined, but {total_size_mb:.1f}MB was provided. "
1586 |                     f"Please reduce image sizes or count and try again."
1587 |                 ),
1588 |                 "content_type": "text",
1589 |                 "metadata": {
1590 |                     "error_type": "validation_error",
1591 |                     "model_name": model_name,
1592 |                     "total_size_mb": round(total_size_mb, 2),
1593 |                     "limit_mb": round(effective_limit_mb, 2),
1594 |                     "image_count": len(images),
1595 |                     "supports_images": True,
1596 |                 },
1597 |             }
1598 | 
1599 |         # All validations passed
1600 |         logger.debug(f"Image validation passed: {len(images)} images, {total_size_mb:.1f}MB total")
1601 |         return None
1602 | 
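    # --- Editor's illustrative sketch (not part of the original source) ---
    # How a data-URL image is measured above: the base64 payload is decoded so
    # the ~33% encoding overhead does not count against the size limit.
    #
    #   _, payload = data_url.split(",", 1)   # data_url = "data:image/png;base64,..."
    #   size_mb = len(base64.b64decode(payload)) / (1024 * 1024)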
1603 |     def _parse_response(self, raw_text: str, request, model_info: Optional[dict] = None):
1604 |         """Parse response - will be inherited for now."""
1605 |         # Implementation inherited from current base.py
1606 |         raise NotImplementedError("Subclasses must implement _parse_response method")
1607 | 
```
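
Editor's note: the sketch below (not part of the repository) shows the large-prompt handoff that `check_prompt_size()` enforces in the listing above. The `first_call` helper and the way the tool instance is obtained are assumptions; the `prompt.txt` convention, the `resend_prompt` status, and `MCP_PROMPT_SIZE_LIMIT` come from the source.

```python
# Hypothetical sketch of the large-prompt handoff enforced by check_prompt_size().
# "tool" is assumed to be a concrete subclass instance of the base tool above.
from config import MCP_PROMPT_SIZE_LIMIT


def first_call(tool, user_text: str):
    """Return the 'resend_prompt' payload if the text exceeds the MCP limit."""
    resend = tool.check_prompt_size(user_text)
    if resend is not None:
        # The CLI is now expected to save user_text verbatim to prompt.txt and
        # call the tool again with that file's absolute path in `files`; the
        # prompt-file handler at the top of this listing then extracts the text
        # and drops prompt.txt from the files to embed.
        assert resend["metadata"]["limit"] == MCP_PROMPT_SIZE_LIMIT
        return resend
    return None
```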

--------------------------------------------------------------------------------
/tools/workflow/workflow_mixin.py:
--------------------------------------------------------------------------------

```python
   1 | """
   2 | Workflow Mixin for Zen MCP Tools
   3 | 
   4 | This module provides a sophisticated workflow-based pattern that enables tools to
   5 | perform multi-step work with structured findings and expert analysis.
   6 | 
   7 | Key Components:
   8 | - BaseWorkflowMixin: Abstract base class providing comprehensive workflow functionality
   9 | 
  10 | The workflow pattern enables tools like debug, precommit, and codereview to perform
  11 | systematic multi-step work with pause/resume capabilities, context-aware file embedding,
  12 | and seamless integration with external AI models for expert analysis.
  13 | 
  14 | Features:
  15 | - Multi-step workflow orchestration with pause/resume
  16 | - Context-aware file embedding optimization
  17 | - Expert analysis integration with token budgeting
  18 | - Conversation memory and threading support
  19 | - Proper inheritance-based architecture (no hasattr/getattr)
  20 | - Comprehensive type annotations for IDE support
  21 | """
  22 | 
  23 | import json
  24 | import logging
  25 | import os
  26 | import re
  27 | from abc import ABC, abstractmethod
  28 | from typing import Any, Optional
  29 | 
  30 | from mcp.types import TextContent
  31 | 
  32 | from config import MCP_PROMPT_SIZE_LIMIT
  33 | from utils.conversation_memory import add_turn, create_thread
  34 | 
  35 | from ..shared.base_models import ConsolidatedFindings
  36 | from ..shared.exceptions import ToolExecutionError
  37 | 
  38 | logger = logging.getLogger(__name__)
  39 | 
  40 | 
  41 | class BaseWorkflowMixin(ABC):
  42 |     """
  43 |     Abstract base class providing guided workflow functionality for tools.
  44 | 
  45 |     This class implements a sophisticated workflow pattern where the CLI performs
  46 |     systematic local work before calling external models for expert analysis.
  47 |     Tools can inherit from this class to gain comprehensive workflow capabilities.
  48 | 
  49 |     Architecture:
  50 |     - Uses proper inheritance patterns instead of hasattr/getattr
  51 |     - Provides hook methods with default implementations
  52 |     - Requires abstract methods to be implemented by subclasses
  53 |     - Fully type-annotated for excellent IDE support
  54 | 
  55 |     Context-Aware File Embedding:
  56 |     - Intermediate steps: Only reference file names (saves the CLI's context)
  57 |     - Final steps: Embed full file content for expert analysis
  58 |     - Integrates with existing token budgeting infrastructure
  59 | 
  60 |     Requirements:
  61 |     This class expects to be used with BaseTool and requires implementation of:
  62 |     - get_model_provider(model_name)
  63 |     - _resolve_model_context(arguments, request)
  64 |     - get_system_prompt()
  65 |     - get_default_temperature()
  66 |     - _prepare_file_content_for_prompt()
  67 |     """
  68 | 
  69 |     def __init__(self) -> None:
  70 |         super().__init__()
  71 |         self.work_history: list[dict[str, Any]] = []
  72 |         self.consolidated_findings: ConsolidatedFindings = ConsolidatedFindings()
  73 |         self.initial_request: Optional[str] = None
  74 | 
  75 |     # ================================================================================
  76 |     # Abstract Methods - Required Implementation by BaseTool or Subclasses
  77 |     # ================================================================================
  78 | 
  79 |     @abstractmethod
  80 |     def get_name(self) -> str:
  81 |         """Return the name of this tool. Usually provided by BaseTool."""
  82 |         pass
  83 | 
  84 |     @abstractmethod
  85 |     def get_workflow_request_model(self) -> type:
  86 |         """Return the request model class for this workflow tool."""
  87 |         pass
  88 | 
  89 |     @abstractmethod
  90 |     def get_system_prompt(self) -> str:
  91 |         """Return the system prompt for this tool. Usually provided by BaseTool."""
  92 |         pass
  93 | 
  94 |     @abstractmethod
  95 |     def get_language_instruction(self) -> str:
  96 |         """Return the language instruction for localization. Usually provided by BaseTool."""
  97 |         pass
  98 | 
  99 |     @abstractmethod
 100 |     def get_default_temperature(self) -> float:
 101 |         """Return the default temperature for this tool. Usually provided by BaseTool."""
 102 |         pass
 103 | 
 104 |     @abstractmethod
 105 |     def get_model_provider(self, model_name: str) -> Any:
 106 |         """Get model provider for the given model. Usually provided by BaseTool."""
 107 |         pass
 108 | 
 109 |     @abstractmethod
 110 |     def _resolve_model_context(self, arguments: dict[str, Any], request: Any) -> tuple[str, Any]:
 111 |         """Resolve model context from arguments. Usually provided by BaseTool."""
 112 |         pass
 113 | 
 114 |     @abstractmethod
 115 |     def _prepare_file_content_for_prompt(
 116 |         self,
 117 |         request_files: list[str],
 118 |         continuation_id: Optional[str],
 119 |         context_description: str = "New files",
 120 |         max_tokens: Optional[int] = None,
 121 |         reserve_tokens: int = 1_000,
 122 |         remaining_budget: Optional[int] = None,
 123 |         arguments: Optional[dict[str, Any]] = None,
 124 |         model_context: Optional[Any] = None,
 125 |     ) -> tuple[str, list[str]]:
 126 |         """Prepare file content for prompts. Usually provided by BaseTool."""
 127 |         pass
 128 | 
 129 |     # ================================================================================
 130 |     # Abstract Methods - Tool-Specific Implementation Required
 131 |     # ================================================================================
 132 | 
 133 |     @abstractmethod
 134 |     def get_work_steps(self, request: Any) -> list[str]:
 135 |         """Define tool-specific work steps and criteria"""
 136 |         pass
 137 | 
 138 |     @abstractmethod
 139 |     def get_required_actions(
 140 |         self, step_number: int, confidence: str, findings: str, total_steps: int, request=None
 141 |     ) -> list[str]:
 142 |         """Define required actions for each work phase.
 143 | 
 144 |         Args:
 145 |             step_number: Current step (1-based)
 146 |             confidence: Current confidence level (exploring, low, medium, high, certain)
 147 |             findings: Current findings text
 148 |             total_steps: Total estimated steps for this work
 149 |             request: Optional request object for continuation-aware decisions
 150 | 
 151 |         Returns:
 152 |             List of specific actions the CLI should take before calling tool again
 153 |         """
 154 |         pass
 155 | 
 156 |     # ================================================================================
 157 |     # Hook Methods - Default Implementations with Override Capability
 158 |     # ================================================================================
 159 | 
 160 |     def should_call_expert_analysis(self, consolidated_findings: ConsolidatedFindings, request=None) -> bool:
 161 |         """
 162 |         Decide when to call external model based on tool-specific criteria.
 163 | 
 164 |         Default implementation for tools that don't use expert analysis.
 165 |         Override this for tools that do use expert analysis.
 166 | 
 167 |         Args:
 168 |             consolidated_findings: Findings from workflow steps
 169 |             request: Current request object (optional for backwards compatibility)
 170 |         """
 171 |         if not self.requires_expert_analysis():
 172 |             return False
 173 | 
 174 |         # Check if user requested to skip assistant model
 175 |         if request and not self.get_request_use_assistant_model(request):
 176 |             return False
 177 | 
 178 |         # Default logic for tools that support expert analysis
 179 |         return (
 180 |             len(consolidated_findings.relevant_files) > 0
 181 |             or len(consolidated_findings.findings) >= 2
 182 |             or len(consolidated_findings.issues_found) > 0
 183 |         )
 184 | 
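    # --- Editor's illustrative sketch (not part of the original source) ---
    # A hypothetical override for a tool that only wants expert analysis once at
    # least one issue has been recorded:
    #
    #   def should_call_expert_analysis(self, consolidated_findings, request=None):
    #       if request and not self.get_request_use_assistant_model(request):
    #           return False
    #       return len(consolidated_findings.issues_found) > 0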
 185 |     def prepare_expert_analysis_context(self, consolidated_findings: ConsolidatedFindings) -> str:
 186 |         """
 187 |         Prepare context for external model call.
 188 | 
 189 |         Default implementation for tools that don't use expert analysis.
 190 |         Override this for tools that do use expert analysis.
 191 |         """
 192 |         if not self.requires_expert_analysis():
 193 |             return ""
 194 | 
 195 |         # Default context preparation
 196 |         context_parts = [
 197 |             f"=== {self.get_name().upper()} WORK SUMMARY ===",
 198 |             f"Total steps: {len(consolidated_findings.findings)}",
 199 |             f"Files examined: {len(consolidated_findings.files_checked)}",
 200 |             f"Relevant files: {len(consolidated_findings.relevant_files)}",
 201 |             "",
 202 |             "=== WORK PROGRESSION ===",
 203 |         ]
 204 | 
 205 |         for finding in consolidated_findings.findings:
 206 |             context_parts.append(finding)
 207 | 
 208 |         return "\n".join(context_parts)
 209 | 
 210 |     def requires_expert_analysis(self) -> bool:
 211 |         """
 212 |         Override this to completely disable expert analysis for the tool.
 213 | 
 214 |         Returns True if the tool supports expert analysis (default).
 215 |         Returns False if the tool is self-contained (like planner).
 216 |         """
 217 |         return True
 218 | 
 219 |     def should_include_files_in_expert_prompt(self) -> bool:
 220 |         """
 221 |         Whether to include file content in the expert analysis prompt.
 222 |         Override this to return True if your tool needs files in the prompt.
 223 |         """
 224 |         return False
 225 | 
 226 |     def should_embed_system_prompt(self) -> bool:
 227 |         """
 228 |         Whether to embed the system prompt in the main prompt.
 229 |         Override this to return True if your tool needs the system prompt embedded.
 230 |         """
 231 |         return False
 232 | 
 233 |     def get_expert_thinking_mode(self) -> str:
 234 |         """
 235 |         Get the thinking mode for expert analysis.
 236 |         Override this to customize the thinking mode.
 237 |         """
 238 |         return "high"
 239 | 
 240 |     def get_request_temperature(self, request) -> float:
 241 |         """Get temperature from request. Override for custom temperature handling."""
 242 |         try:
 243 |             return request.temperature if request.temperature is not None else self.get_default_temperature()
 244 |         except AttributeError:
 245 |             return self.get_default_temperature()
 246 | 
 247 |     def get_validated_temperature(self, request, model_context: Any) -> tuple[float, list[str]]:
 248 |         """
 249 |         Get temperature from request and validate it against model constraints.
 250 | 
 251 |         This is a convenience method that combines temperature extraction and validation
 252 |         for workflow tools. It ensures temperature is within valid range for the model.
 253 | 
 254 |         Args:
 255 |             request: The request object containing temperature
 256 |             model_context: Model context object containing model info
 257 | 
 258 |         Returns:
 259 |             Tuple of (validated_temperature, warning_messages)
 260 |         """
 261 |         temperature = self.get_request_temperature(request)
 262 |         return self.validate_and_correct_temperature(temperature, model_context)
 263 | 
 264 |     def get_request_thinking_mode(self, request) -> str:
 265 |         """Get thinking mode from request. Override for custom thinking mode handling."""
 266 |         try:
 267 |             return request.thinking_mode if request.thinking_mode is not None else self.get_expert_thinking_mode()
 268 |         except AttributeError:
 269 |             return self.get_expert_thinking_mode()
 270 | 
 271 |     def get_expert_analysis_instruction(self) -> str:
 272 |         """
 273 |         Get the instruction to append after the expert context.
 274 |         Override this to provide tool-specific instructions.
 275 |         """
 276 |         return "Please provide expert analysis based on the investigation findings."
 277 | 
 278 |     def get_request_use_assistant_model(self, request) -> bool:
 279 |         """
 280 |         Get use_assistant_model from request. Override for custom assistant model handling.
 281 | 
 282 |         Args:
 283 |             request: Current request object
 284 | 
 285 |         Returns:
 286 |             True if assistant model should be used, False otherwise
 287 |         """
 288 |         try:
 289 |             return request.use_assistant_model if request.use_assistant_model is not None else True
 290 |         except AttributeError:
 291 |             return True
 292 | 
 293 |     def get_step_guidance_message(self, request) -> str:
 294 |         """
 295 |         Get step guidance message. Override for tool-specific guidance.
 296 |         Default implementation uses required actions.
 297 |         """
 298 |         required_actions = self.get_required_actions(
 299 |             request.step_number, self.get_request_confidence(request), request.findings, request.total_steps, request
 300 |         )
 301 | 
 302 |         next_step_number = request.step_number + 1
 303 |         return (
 304 |             f"MANDATORY: DO NOT call the {self.get_name()} tool again immediately. "
 305 |             f"You MUST first work using appropriate tools. "
 306 |             f"REQUIRED ACTIONS before calling {self.get_name()} step {next_step_number}:\n"
 307 |             + "\n".join(f"{i + 1}. {action}" for i, action in enumerate(required_actions))
 308 |             + f"\n\nOnly call {self.get_name()} again with step_number: {next_step_number} "
 309 |             f"AFTER completing this work."
 310 |         )
 311 | 
 312 |     def _prepare_files_for_expert_analysis(self) -> str:
 313 |         """
 314 |         Prepare file content for expert analysis.
 315 | 
 316 |         EXPERT ANALYSIS REQUIRES ACTUAL FILE CONTENT:
 317 |         Expert analysis needs actual file content of all unique files marked as relevant
 318 |         throughout the workflow, regardless of conversation history optimization.
 319 | 
 320 |         SIMPLIFIED LOGIC:
 321 |         Expert analysis gets all unique files from relevant_files across the entire workflow.
 322 |         This includes:
 323 |         - Current step's relevant_files (consolidated_findings.relevant_files)
 324 |         - Plus any additional relevant_files from conversation history (if continued workflow)
 325 | 
 326 |         This ensures expert analysis has complete context without including irrelevant files.
 327 |         """
 328 |         all_relevant_files = set()
 329 | 
 330 |         # 1. Get files from current consolidated relevant_files
 331 |         all_relevant_files.update(self.consolidated_findings.relevant_files)
 332 | 
 333 |         # 2. Get additional relevant_files from conversation history (if continued workflow)
 334 |         try:
 335 |             current_arguments = self.get_current_arguments()
 336 |             if current_arguments:
 337 |                 continuation_id = current_arguments.get("continuation_id")
 338 | 
 339 |                 if continuation_id:
 340 |                     from utils.conversation_memory import get_conversation_file_list, get_thread
 341 | 
 342 |                     thread_context = get_thread(continuation_id)
 343 |                     if thread_context:
 344 |                         # Get all files from conversation (these were relevant_files in previous steps)
 345 |                         conversation_files = get_conversation_file_list(thread_context)
 346 |                         all_relevant_files.update(conversation_files)
 347 |                         logger.debug(
 348 |                             f"[WORKFLOW_FILES] {self.get_name()}: Added {len(conversation_files)} files from conversation history"
 349 |                         )
 350 |         except Exception as e:
 351 |             logger.warning(f"[WORKFLOW_FILES] {self.get_name()}: Could not get conversation files: {e}")
 352 | 
 353 |         # Convert to list and remove any empty/None values
 354 |         files_for_expert = [f for f in all_relevant_files if f and f.strip()]
 355 | 
 356 |         if not files_for_expert:
 357 |             logger.debug(f"[WORKFLOW_FILES] {self.get_name()}: No relevant files found for expert analysis")
 358 |             return ""
 359 | 
 360 |         # Expert analysis needs actual file content, bypassing conversation optimization
 361 |         try:
 362 |             file_content, processed_files = self._force_embed_files_for_expert_analysis(files_for_expert)
 363 | 
 364 |             logger.info(
 365 |                 f"[WORKFLOW_FILES] {self.get_name()}: Prepared {len(processed_files)} unique relevant files for expert analysis "
 366 |                 f"(from {len(self.consolidated_findings.relevant_files)} current relevant files)"
 367 |             )
 368 | 
 369 |             return file_content
 370 | 
 371 |         except Exception as e:
 372 |             logger.error(f"[WORKFLOW_FILES] {self.get_name()}: Failed to prepare files for expert analysis: {e}")
 373 |             return ""
 374 | 
 375 |     def _force_embed_files_for_expert_analysis(self, files: list[str]) -> tuple[str, list[str]]:
 376 |         """
 377 |         Force embed files for expert analysis, bypassing conversation history filtering.
 378 | 
 379 |         Expert analysis has different requirements than normal workflow steps:
 380 |         - Normal steps: Optimize tokens by skipping files in conversation history
 381 |         - Expert analysis: Needs actual file content regardless of conversation history
 382 | 
 383 |         Args:
 384 |             files: List of file paths to embed
 385 | 
 386 |         Returns:
 387 |             tuple[str, list[str]]: (file_content, processed_files)
 388 |         """
 389 |         # Use read_files directly with token budgeting, bypassing filter_new_files
 390 |         from utils.file_utils import expand_paths, read_files
 391 | 
 392 |         # Get token budget for files
 393 |         current_model_context = self.get_current_model_context()
 394 |         if current_model_context:
 395 |             try:
 396 |                 token_allocation = current_model_context.calculate_token_allocation()
 397 |                 max_tokens = token_allocation.file_tokens
 398 |                 logger.debug(
 399 |                     f"[WORKFLOW_FILES] {self.get_name()}: Using {max_tokens:,} tokens for expert analysis files"
 400 |                 )
 401 |             except Exception as e:
 402 |                 logger.warning(f"[WORKFLOW_FILES] {self.get_name()}: Failed to get token allocation: {e}")
 403 |                 max_tokens = 100_000  # Fallback
 404 |         else:
 405 |             max_tokens = 100_000  # Fallback
 406 | 
 407 |         # Read files directly without conversation history filtering
 408 |         logger.debug(f"[WORKFLOW_FILES] {self.get_name()}: Force embedding {len(files)} files for expert analysis")
 409 |         file_content = read_files(
 410 |             files,
 411 |             max_tokens=max_tokens,
 412 |             reserve_tokens=1000,
 413 |             include_line_numbers=self.wants_line_numbers_by_default(),
 414 |         )
 415 | 
 416 |         # Expand paths to get individual files for tracking
 417 |         processed_files = expand_paths(files)
 418 | 
 419 |         logger.debug(
 420 |             f"[WORKFLOW_FILES] {self.get_name()}: Expert analysis embedding: {len(processed_files)} files, "
 421 |             f"{len(file_content):,} characters"
 422 |         )
 423 | 
 424 |         return file_content, processed_files
 425 | 
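    # --- Editor's note (not part of the original source) ---
    # Contrast with _prepare_file_content_for_prompt(): that path filters out
    # files already present in conversation history, while this one re-reads
    # every relevant file so the expert model always sees the actual content.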
 426 |     def wants_line_numbers_by_default(self) -> bool:
 427 |         """
 428 |         Whether this tool wants line numbers in file content by default.
 429 |         Override this to customize line number behavior.
 430 |         """
 431 |         return True  # Most workflow tools benefit from line numbers for analysis
 432 | 
 433 |     def _add_files_to_expert_context(self, expert_context: str, file_content: str) -> str:
 434 |         """
 435 |         Add file content to the expert context.
 436 |         Override this to customize how files are added to the context.
 437 |         """
 438 |         return f"{expert_context}\n\n=== ESSENTIAL FILES ===\n{file_content}\n=== END ESSENTIAL FILES ==="
 439 | 
 440 |     # ================================================================================
 441 |     # Context-Aware File Embedding - Core Implementation
 442 |     # ================================================================================
 443 | 
 444 |     def _handle_workflow_file_context(self, request: Any, arguments: dict[str, Any]) -> None:
 445 |         """
 446 |         Handle file context appropriately based on workflow phase.
 447 | 
 448 |         CONTEXT-AWARE FILE EMBEDDING STRATEGY:
 449 |         1. Intermediate steps + continuation: Only reference file names (save the CLI's context)
 450 |         2. Final step: Embed full file content for expert analysis
 451 |         3. Expert analysis: Always embed relevant files with token budgeting
 452 | 
 453 |         This prevents wasting the CLI's limited context on intermediate steps while ensuring
 454 |         the final expert analysis has complete file context.
 455 |         """
 456 |         continuation_id = self.get_request_continuation_id(request)
 457 |         is_final_step = not self.get_request_next_step_required(request)
 458 |         step_number = self.get_request_step_number(request)
 459 | 
 460 |         # Extract model context for token budgeting
 461 |         model_context = arguments.get("_model_context")
 462 |         self._model_context = model_context
 463 | 
 464 |         # Clear any previous file context to ensure clean state
 465 |         self._embedded_file_content = ""
 466 |         self._file_reference_note = ""
 467 |         self._actually_processed_files = []
 468 | 
 469 |         # Determine if we should embed files or just reference them
 470 |         should_embed_files = self._should_embed_files_in_workflow_step(step_number, continuation_id, is_final_step)
 471 | 
 472 |         if should_embed_files:
 473 |             # Final step or expert analysis - embed full file content
 474 |             logger.debug(f"[WORKFLOW_FILES] {self.get_name()}: Embedding files for final step/expert analysis")
 475 |             self._embed_workflow_files(request, arguments)
 476 |         else:
 477 |             # Intermediate step with continuation - only reference file names
 478 |             logger.debug(f"[WORKFLOW_FILES] {self.get_name()}: Only referencing file names for intermediate step")
 479 |             self._reference_workflow_files(request)
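     |
     |         # The state recorded above (_embedded_file_content, _file_reference_note,
     |         # _actually_processed_files) is surfaced later as the "file_context" block in
     |         # build_base_response() / customize_workflow_response().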
 480 | 
 481 |     def _should_embed_files_in_workflow_step(
 482 |         self, step_number: int, continuation_id: Optional[str], is_final_step: bool
 483 |     ) -> bool:
 484 |         """
 485 |         Determine whether to embed file content based on workflow context.
 486 | 
 487 |         CORRECT LOGIC:
 488 |         - NEVER embed files when the CLI is getting the next step (next_step_required=True)
 489 |         - ONLY embed files when sending to external model (next_step_required=False)
 490 | 
 491 |         Args:
 492 |             step_number: Current step number
 493 |             continuation_id: Thread continuation ID (None for new conversations)
 494 |             is_final_step: Whether this is the final step (next_step_required == False)
 495 | 
 496 |         Returns:
 497 |             bool: True if files should be embedded, False if only referenced
 498 |         """
 499 |         # RULE 1: Final steps (no more steps needed) - embed files for expert analysis
 500 |         if is_final_step:
 501 |             logger.debug("[WORKFLOW_FILES] Final step - will embed files for expert analysis")
 502 |             return True
 503 | 
 504 |         # RULE 2: Any intermediate step (more steps needed) - NEVER embed files
 505 |         # This includes:
 506 |         # - New conversations with next_step_required=True
 507 |         # - Steps with continuation_id and next_step_required=True
 508 |         logger.debug("[WORKFLOW_FILES] Intermediate step (more work needed) - will only reference files")
 509 |         return False
 510 | 
 511 |     def _embed_workflow_files(self, request: Any, arguments: dict[str, Any]) -> None:
 512 |         """
 513 |         Embed full file content for final steps and expert analysis.
 514 |         Uses the same token-budgeting approach as the existing debug tool.
 515 |         """
 516 |         # Use relevant_files as the standard field for workflow tools
 517 |         request_files = self.get_request_relevant_files(request)
 518 |         if not request_files:
 519 |             logger.debug(f"[WORKFLOW_FILES] {self.get_name()}: No relevant_files to embed")
 520 |             return
 521 | 
 522 |         try:
 523 |             # Model context should already be available from early validation, but may have been deferred for tests
 524 |             current_model_context = self.get_current_model_context()
 525 |             if not current_model_context:
 526 |                 # Try to resolve model context now (deferred from early validation)
 527 |                 try:
 528 |                     model_name, model_context = self._resolve_model_context(arguments, request)
 529 |                     self._model_context = model_context
 530 |                     self._current_model_name = model_name
 531 |                 except Exception as e:
 532 |                     logger.error(f"[WORKFLOW_FILES] {self.get_name()}: Failed to resolve model context: {e}")
 533 |                     # Create fallback model context (preserves existing test behavior)
 534 |                     from utils.model_context import ModelContext
 535 | 
 536 |                     model_name = self.get_request_model_name(request)
 537 |                     self._model_context = ModelContext(model_name)
 538 |                     self._current_model_name = model_name
 539 | 
 540 |             # Use the same file preparation logic as BaseTool with token budgeting
 541 |             continuation_id = self.get_request_continuation_id(request)
 542 |             remaining_tokens = arguments.get("_remaining_tokens")
 543 | 
 544 |             file_content, processed_files = self._prepare_file_content_for_prompt(
 545 |                 request_files,
 546 |                 continuation_id,
 547 |                 "Workflow files for analysis",
 548 |                 remaining_budget=remaining_tokens,
 549 |                 arguments=arguments,
 550 |                 model_context=self._model_context,
 551 |             )
 552 | 
 553 |             # Store for use in expert analysis
 554 |             self._embedded_file_content = file_content
 555 |             self._actually_processed_files = processed_files
 556 | 
 557 |             logger.info(
 558 |                 f"[WORKFLOW_FILES] {self.get_name()}: Embedded {len(processed_files)} relevant_files for final analysis"
 559 |             )
 560 | 
 561 |         except Exception as e:
 562 |             logger.error(f"[WORKFLOW_FILES] {self.get_name()}: Failed to embed files: {e}")
 563 |             # Continue without file embedding rather than failing
 564 |             self._embedded_file_content = ""
 565 |             self._actually_processed_files = []
 566 | 
 567 |     def _reference_workflow_files(self, request: Any) -> None:
 568 |         """
 569 |         Reference file names without embedding content for intermediate steps.
 570 |         Saves the CLI's context while still providing file awareness.
 571 |         """
 572 |         # Workflow tools use relevant_files, not files
 573 |         request_files = self.get_request_relevant_files(request)
 574 |         logger.debug(
 575 |             f"[WORKFLOW_FILES] {self.get_name()}: _reference_workflow_files called with {len(request_files)} relevant_files"
 576 |         )
 577 | 
 578 |         if not request_files:
 579 |             logger.debug(f"[WORKFLOW_FILES] {self.get_name()}: No files to reference, skipping")
 580 |             return
 581 | 
 582 |         # Store file references for conversation context
 583 |         self._referenced_files = request_files
 584 | 
 585 |         # Create a simple reference note
 586 |         file_names = [os.path.basename(f) for f in request_files]
 587 |         reference_note = f"Files referenced in this step: {', '.join(file_names)}\n"
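     |         # e.g. "Files referenced in this step: server.py, config.py\n" (hypothetical names)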
 588 | 
 589 |         self._file_reference_note = reference_note
 590 |         logger.debug(f"[WORKFLOW_FILES] {self.get_name()}: Set _file_reference_note: {self._file_reference_note}")
 591 | 
 592 |         logger.info(
 593 |             f"[WORKFLOW_FILES] {self.get_name()}: Referenced {len(request_files)} files without embedding content"
 594 |         )
 595 | 
 596 |     # ================================================================================
 597 |     # Main Workflow Orchestration
 598 |     # ================================================================================
 599 | 
 600 |     async def execute_workflow(self, arguments: dict[str, Any]) -> list[TextContent]:
 601 |         """
 602 |         Main workflow orchestration following debug tool pattern.
 603 | 
 604 |         Comprehensive workflow implementation that handles all common patterns:
 605 |         1. Request validation and step management
 606 |         2. Continuation and backtracking support
 607 |         3. Step data processing and consolidation
 608 |         4. Tool-specific field mapping and customization
 609 |         5. Completion logic with optional expert analysis
 610 |         6. Generic "certain confidence" handling
 611 |         7. Step guidance and required actions
 612 |         8. Conversation memory integration
 613 |         """
 614 |         from mcp.types import TextContent
 615 | 
 616 |         try:
 617 |             # Store arguments for access by helper methods
 618 |             self._current_arguments = arguments
 619 | 
 620 |             # Validate request using tool-specific model
 621 |             request = self.get_workflow_request_model()(**arguments)
 622 | 
 623 |             # Validate step field size (basic validation for workflow instructions)
 624 |             # If step is too large, user should use shorter instructions and put details in files
 625 |             step_content = request.step
 626 |             if step_content and len(step_content) > MCP_PROMPT_SIZE_LIMIT:
 627 |                 from tools.models import ToolOutput
 628 | 
 629 |                 error_output = ToolOutput(
 630 |                     status="resend_prompt",
 631 |                     content="Step instructions are too long. Please use shorter instructions and provide detailed context via file paths instead.",
 632 |                     content_type="text",
 633 |                     metadata={"prompt_size": len(step_content), "limit": MCP_PROMPT_SIZE_LIMIT},
 634 |                 )
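     |                 # The "MCP_SIZE_CHECK:" prefix is a sentinel: the generic exception handler
     |                 # at the end of this method strips it and re-raises the serialized ToolOutput
     |                 # as a ToolExecutionError.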
 635 |                 raise ValueError(f"MCP_SIZE_CHECK:{error_output.model_dump_json()}")
 636 | 
 637 |             # Validate file paths for security (same as base tool)
 638 |             # Use try/except instead of hasattr as per coding standards
 639 |             try:
 640 |                 path_error = self.validate_file_paths(request)
 641 |                 if path_error:
 642 |                     from tools.models import ToolOutput
 643 | 
 644 |                     error_output = ToolOutput(
 645 |                         status="error",
 646 |                         content=path_error,
 647 |                         content_type="text",
 648 |                     )
 649 |                     logger.error("Path validation failed for %s: %s", self.get_name(), path_error)
 650 |                     raise ToolExecutionError(error_output.model_dump_json())
 651 |             except AttributeError:
 652 |                 # validate_file_paths method not available - skip validation
 653 |                 pass
 654 | 
 655 |             # Try to validate model availability early for production scenarios.
 656 |             # For tests, defer model validation until later so that mocks can work.
 657 |             try:
 658 |                 model_name, model_context = self._resolve_model_context(arguments, request)
 659 |                 # Store for later use
 660 |                 self._current_model_name = model_name
 661 |                 self._model_context = model_context
 662 |             except ValueError as e:
 663 |                 # Model resolution failed - in production this would be an error,
 664 |                 # but for tests we defer to allow mocks to handle model resolution
 665 |                 logger.debug(f"Early model validation failed, deferring to later: {e}")
 666 |                 self._current_model_name = None
 667 |                 self._model_context = None
 668 | 
 669 |             # Handle continuation
 670 |             continuation_id = request.continuation_id
 671 | 
 672 |             # Restore workflow state on continuation
 673 |             if continuation_id:
 674 |                 from utils.conversation_memory import get_thread
 675 | 
 676 |                 thread = get_thread(continuation_id)
 677 |                 if thread and thread.turns:
 678 |                     # Find the most recent assistant turn from this tool with workflow state
 679 |                     for turn in reversed(thread.turns):
 680 |                         if turn.role == "assistant" and turn.tool_name == self.get_name() and turn.model_metadata:
 681 |                             state = turn.model_metadata
 682 |                             if isinstance(state, dict) and "work_history" in state:
 683 |                                 self.work_history = state.get("work_history", [])
 684 |                                 self.initial_request = state.get("initial_request")
 685 |                                 # Rebuild consolidated findings from restored history
 686 |                                 self._reprocess_consolidated_findings()
 687 |                                 logger.debug(
 688 |                                     f"[{self.get_name()}] Restored workflow state with {len(self.work_history)} history items"
 689 |                                 )
 690 |                                 break  # State restored, exit loop
 691 | 
 692 |             # Adjust total steps if needed
 693 |             if request.step_number > request.total_steps:
 694 |                 request.total_steps = request.step_number
 695 | 
 696 |             # Create thread for first step
 697 |             if not continuation_id and request.step_number == 1:
 698 |                 clean_args = {k: v for k, v in arguments.items() if k not in ["_model_context", "_resolved_model_name"]}
 699 |                 continuation_id = create_thread(self.get_name(), clean_args)
 700 |                 self.initial_request = request.step
 701 |                 # Allow tools to store initial description for expert analysis
 702 |                 self.store_initial_issue(request.step)
 703 | 
 704 |             # Process work step - allow tools to customize field mapping
 705 |             step_data = self.prepare_step_data(request)
 706 | 
 707 |             # Store in history
 708 |             self.work_history.append(step_data)
 709 | 
 710 |             # Update consolidated findings
 711 |             self._update_consolidated_findings(step_data)
 712 | 
 713 |             # Handle file context appropriately based on workflow phase
 714 |             self._handle_workflow_file_context(request, arguments)
 715 | 
 716 |             # Build response with tool-specific customization
 717 |             response_data = self.build_base_response(request, continuation_id)
 718 | 
 719 |             # If work is complete, handle completion logic
 720 |             if not request.next_step_required:
 721 |                 response_data = await self.handle_work_completion(response_data, request, arguments)
 722 |             else:
 723 |                 # Force the CLI to do the required work before calling the tool again
 724 |                 response_data = self.handle_work_continuation(response_data, request)
 725 | 
 726 |             # Allow tools to customize the final response
 727 |             response_data = self.customize_workflow_response(response_data, request)
 728 | 
 729 |             # Add metadata (provider_used and model_used) to workflow response
 730 |             self._add_workflow_metadata(response_data, arguments)
 731 | 
 732 |             # Store in conversation memory
 733 |             if continuation_id:
 734 |                 self.store_conversation_turn(continuation_id, response_data, request)
 735 | 
 736 |             return [TextContent(type="text", text=json.dumps(response_data, indent=2, ensure_ascii=False))]
 737 | 
 738 |         except ToolExecutionError:
 739 |             raise
 740 |         except Exception as e:
 741 |             if str(e).startswith("MCP_SIZE_CHECK:"):
 742 |                 payload = str(e)[len("MCP_SIZE_CHECK:") :]
 743 |                 raise ToolExecutionError(payload)
 744 | 
 745 |             logger.error(f"Error in {self.get_name()} work: {e}", exc_info=True)
 746 |             error_data = {
 747 |                 "status": f"{self.get_name()}_failed",
 748 |                 "error": str(e),
 749 |                 "step_number": arguments.get("step_number", 0),
 750 |             }
 751 | 
 752 |             # Add metadata to error responses too
 753 |             self._add_workflow_metadata(error_data, arguments)
 754 | 
 755 |             raise ToolExecutionError(json.dumps(error_data, indent=2, ensure_ascii=False)) from e
 756 | 
 757 |     # Hook methods for tool customization
 758 | 
 759 |     def prepare_step_data(self, request) -> dict:
 760 |         """
 761 |         Prepare step data from request. Tools can override to customize field mapping.
 762 |         """
 763 |         step_data = {
 764 |             "step": request.step,
 765 |             "step_number": request.step_number,
 766 |             "findings": request.findings,
 767 |             "files_checked": self.get_request_files_checked(request),
 768 |             "relevant_files": self.get_request_relevant_files(request),
 769 |             "relevant_context": self.get_request_relevant_context(request),
 770 |             "issues_found": self.get_request_issues_found(request),
 771 |             "confidence": self.get_request_confidence(request),
 772 |             "hypothesis": self.get_request_hypothesis(request),
 773 |             "images": self.get_request_images(request),
 774 |         }
 775 |         return step_data
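     |
     |     # Illustrative override (hypothetical subclass, not part of this codebase): a tool
     |     # with its own request fields can remap them onto the standard step_data keys:
     |     #
     |     #     def prepare_step_data(self, request) -> dict:
     |     #         step_data = super().prepare_step_data(request)
     |     #         step_data["hypothesis"] = request.root_cause  # assumed tool-specific field
     |     #         return step_data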
 776 | 
 777 |     def build_base_response(self, request, continuation_id: str = None) -> dict:
 778 |         """
 779 |         Build the base response structure. Tools can override for custom response fields.
 780 |         """
 781 |         response_data = {
 782 |             "status": f"{self.get_name()}_in_progress",
 783 |             "step_number": request.step_number,
 784 |             "total_steps": request.total_steps,
 785 |             "next_step_required": request.next_step_required,
 786 |             f"{self.get_name()}_status": {
 787 |                 "files_checked": len(self.consolidated_findings.files_checked),
 788 |                 "relevant_files": len(self.consolidated_findings.relevant_files),
 789 |                 "relevant_context": len(self.consolidated_findings.relevant_context),
 790 |                 "issues_found": len(self.consolidated_findings.issues_found),
 791 |                 "images_collected": len(self.consolidated_findings.images),
 792 |                 "current_confidence": self.get_request_confidence(request),
 793 |             },
 794 |         }
 795 | 
 796 |         if continuation_id:
 797 |             response_data["continuation_id"] = continuation_id
 798 | 
 799 |         # Add file context information based on workflow phase
 800 |         embedded_content = self.get_embedded_file_content()
 801 |         reference_note = self.get_file_reference_note()
 802 |         processed_files = self.get_actually_processed_files()
 803 | 
 804 |         logger.debug(
 805 |             f"[WORKFLOW_FILES] {self.get_name()}: Building response - has embedded_content: {bool(embedded_content)}, has reference_note: {bool(reference_note)}"
 806 |         )
 807 | 
 808 |         # Prioritize embedded content over references for final steps
 809 |         if embedded_content:
 810 |             # Final step - include embedded file information
 811 |             logger.debug(f"[WORKFLOW_FILES] {self.get_name()}: Adding fully_embedded file context")
 812 |             response_data["file_context"] = {
 813 |                 "type": "fully_embedded",
 814 |                 "files_embedded": len(processed_files),
 815 |                 "context_optimization": "Full file content embedded for expert analysis",
 816 |             }
 817 |         elif reference_note:
 818 |             # Intermediate step - include file reference note
 819 |             logger.debug(f"[WORKFLOW_FILES] {self.get_name()}: Adding reference_only file context")
 820 |             response_data["file_context"] = {
 821 |                 "type": "reference_only",
 822 |                 "note": reference_note,
 823 |                 "context_optimization": "Files referenced but not embedded to preserve the context window",
 824 |             }
 825 | 
 826 |         return response_data
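     |
     |     # "file_context" therefore takes one of two shapes: "fully_embedded" (final step,
     |     # content embedded for expert analysis) or "reference_only" (intermediate step,
     |     # names only). customize_workflow_response() applies the same logic to any response
     |     # that reaches it without a file_context entry.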
 827 | 
 828 |     def should_skip_expert_analysis(self, request, consolidated_findings) -> bool:
 829 |         """
 830 |         Determine if expert analysis should be skipped due to high certainty.
 831 | 
 832 |         Default: False (always call expert analysis)
 833 |         Override in tools like debug to check for "certain" confidence.
 834 |         """
 835 |         return False
 836 | 
 837 |     def handle_completion_without_expert_analysis(self, request, consolidated_findings) -> dict:
 838 |         """
 839 |         Handle completion when skipping expert analysis.
 840 | 
 841 |         Tools can override this for custom high-confidence completion handling.
 842 |         Default implementation provides generic response.
 843 |         """
 844 |         work_summary = self.prepare_work_summary()
 845 |         continuation_id = self.get_request_continuation_id(request)
 846 | 
 847 |         response_data = {
 848 |             "status": self.get_completion_status(),
 849 |             f"complete_{self.get_name()}": {
 850 |                 "initial_request": self.get_initial_request(request.step),
 851 |                 "steps_taken": len(consolidated_findings.findings),
 852 |                 "files_examined": list(consolidated_findings.files_checked),
 853 |                 "relevant_files": list(consolidated_findings.relevant_files),
 854 |                 "relevant_context": list(consolidated_findings.relevant_context),
 855 |                 "work_summary": work_summary,
 856 |                 "final_analysis": self.get_final_analysis_from_request(request),
 857 |                 "confidence_level": self.get_confidence_level(request),
 858 |             },
 859 |             "next_steps": self.get_completion_message(),
 860 |             "skip_expert_analysis": True,
 861 |             "expert_analysis": {
 862 |                 "status": self.get_skip_expert_analysis_status(),
 863 |                 "reason": self.get_skip_reason(),
 864 |             },
 865 |         }
 866 | 
 867 |         if continuation_id:
 868 |             response_data["continuation_id"] = continuation_id
 869 | 
 870 |         return response_data
 871 | 
 872 |     # ================================================================================
 873 |     # Inheritance Hook Methods - Replace hasattr/getattr Anti-patterns
 874 |     # ================================================================================
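     |     # Each accessor below reads an optional field from the request model inside a
     |     # try/except AttributeError and falls back to a sensible default, instead of
     |     # probing with hasattr()/getattr().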
 875 | 
 876 |     def get_request_confidence(self, request: Any) -> str:
 877 |         """Get confidence from request. Override for custom confidence handling."""
 878 |         try:
 879 |             return request.confidence or "low"
 880 |         except AttributeError:
 881 |             return "low"
 882 | 
 883 |     def get_request_relevant_context(self, request: Any) -> list[str]:
 884 |         """Get relevant context from request. Override for custom field mapping."""
 885 |         try:
 886 |             return request.relevant_context or []
 887 |         except AttributeError:
 888 |             return []
 889 | 
 890 |     def get_request_issues_found(self, request: Any) -> list[str]:
 891 |         """Get issues found from request. Override for custom field mapping."""
 892 |         try:
 893 |             return request.issues_found or []
 894 |         except AttributeError:
 895 |             return []
 896 | 
 897 |     def get_request_hypothesis(self, request: Any) -> Optional[str]:
 898 |         """Get hypothesis from request. Override for custom field mapping."""
 899 |         try:
 900 |             return request.hypothesis
 901 |         except AttributeError:
 902 |             return None
 903 | 
 904 |     def get_request_images(self, request: Any) -> list[str]:
 905 |         """Get images from request. Override for custom field mapping."""
 906 |         try:
 907 |             return request.images or []
 908 |         except AttributeError:
 909 |             return []
 910 | 
 911 |     # File Context Access Methods
 912 | 
 913 |     def get_embedded_file_content(self) -> str:
 914 |         """Get embedded file content. Returns empty string if not available."""
 915 |         try:
 916 |             return self._embedded_file_content or ""
 917 |         except AttributeError:
 918 |             return ""
 919 | 
 920 |     def get_file_reference_note(self) -> str:
 921 |         """Get file reference note. Returns empty string if not available."""
 922 |         try:
 923 |             return self._file_reference_note or ""
 924 |         except AttributeError:
 925 |             return ""
 926 | 
 927 |     def get_actually_processed_files(self) -> list[str]:
 928 |         """Get list of actually processed files. Returns empty list if not available."""
 929 |         try:
 930 |             return self._actually_processed_files or []
 931 |         except AttributeError:
 932 |             return []
 933 | 
 934 |     def get_current_model_context(self):
 935 |         """Get current model context. Returns None if not available."""
 936 |         try:
 937 |             return self._model_context
 938 |         except AttributeError:
 939 |             return None
 940 | 
 941 |     def get_request_model_name(self, request: Any) -> str:
 942 |         """Get model name from request. Override for custom model handling."""
 943 |         try:
 944 |             return request.model or "flash"
 945 |         except AttributeError:
 946 |             return "flash"
 947 | 
 948 |     def get_request_continuation_id(self, request: Any) -> Optional[str]:
 949 |         """Get continuation ID from request. Override for custom continuation handling."""
 950 |         try:
 951 |             return request.continuation_id
 952 |         except AttributeError:
 953 |             return None
 954 | 
 955 |     def get_request_next_step_required(self, request: Any) -> bool:
 956 |         """Get next step required from request. Override for custom step handling."""
 957 |         try:
 958 |             return request.next_step_required
 959 |         except AttributeError:
 960 |             return True
 961 | 
 962 |     def get_request_step_number(self, request: Any) -> int:
 963 |         """Get step number from request. Override for custom step handling."""
 964 |         try:
 965 |             return request.step_number or 1
 966 |         except AttributeError:
 967 |             return 1
 968 | 
 969 |     def get_request_relevant_files(self, request: Any) -> list[str]:
 970 |         """Get relevant files from request. Override for custom file handling."""
 971 |         try:
 972 |             return request.relevant_files or []
 973 |         except AttributeError:
 974 |             return []
 975 | 
 976 |     def get_request_files_checked(self, request: Any) -> list[str]:
 977 |         """Get files checked from request. Override for custom file handling."""
 978 |         try:
 979 |             return request.files_checked or []
 980 |         except AttributeError:
 981 |             return []
 982 | 
 983 |     def get_current_arguments(self) -> dict[str, Any]:
 984 |         """Get current arguments. Returns empty dict if not available."""
 985 |         try:
 986 |             return self._current_arguments or {}
 987 |         except AttributeError:
 988 |             return {}
 989 | 
 990 |     def store_initial_issue(self, step_description: str):
 991 |         """Store initial issue description. Override for custom storage."""
 992 |         # Default implementation - tools can override to store differently
 993 |         self.initial_issue = step_description
 994 | 
 995 |     def get_initial_request(self, fallback_step: str) -> str:
 996 |         """Get initial request description. Override for custom retrieval."""
 997 |         try:
 998 |             return self.initial_request or fallback_step
 999 |         except AttributeError:
1000 |             return fallback_step
1001 | 
1002 |     # Default implementations for inheritance hooks
1003 | 
1004 |     def prepare_work_summary(self) -> str:
1005 |         """Prepare work summary. Override for custom implementation."""
1006 |         return f"Completed {len(self.consolidated_findings.findings)} work steps"
1007 | 
1008 |     def get_completion_status(self) -> str:
1009 |         """Get completion status. Override for tool-specific status."""
1010 |         return "high_confidence_completion"
1011 | 
1012 |     def get_final_analysis_from_request(self, request):
1013 |         """Extract final analysis from request. Override for tool-specific fields."""
1014 |         return self.get_request_hypothesis(request)
1015 | 
1016 |     def get_confidence_level(self, request) -> str:
1017 |         """Get confidence level. Override for tool-specific confidence handling."""
1018 |         return self.get_request_confidence(request) or "high"
1019 | 
1020 |     def get_completion_message(self) -> str:
1021 |         """Get completion message. Override for tool-specific messaging."""
1022 |         return (
1023 |             f"{self.get_name().capitalize()} complete with high confidence. Present results "
1024 |             "and proceed with implementation without requiring further consultation."
1025 |         )
1026 | 
1027 |     def get_skip_reason(self) -> str:
1028 |         """Get reason for skipping expert analysis. Override for tool-specific reasons."""
1029 |         return f"{self.get_name()} completed with sufficient confidence"
1030 | 
1031 |     def get_skip_expert_analysis_status(self) -> str:
1032 |         """Get status for skipped expert analysis. Override for tool-specific status."""
1033 |         return "skipped_by_tool_design"
1034 | 
1035 |     def get_completion_next_steps_message(self, expert_analysis_used: bool = False) -> str:
1036 |         """
1037 |         Get the message to show when work is complete.
1038 |         Tools can override for custom messaging.
1039 | 
1040 |         Args:
1041 |             expert_analysis_used: True if expert analysis was successfully executed
1042 |         """
1043 |         base_message = (
1044 |             f"{self.get_name().upper()} IS COMPLETE. You MUST now summarize and present ALL key findings, confirmed "
1045 |             "hypotheses, and exact recommended solutions. Clearly identify the most likely root cause and "
1046 |             "provide concrete, actionable implementation guidance. Highlight affected code paths and display "
1047 |             "reasoning that led to this conclusion—make it easy for a developer to understand exactly where "
1048 |             "the problem lies."
1049 |         )
1050 | 
1051 |         # Add expert analysis guidance only when expert analysis was actually used
1052 |         if expert_analysis_used:
1053 |             expert_guidance = self.get_expert_analysis_guidance()
1054 |             if expert_guidance:
1055 |                 return f"{base_message}\n\n{expert_guidance}"
1056 | 
1057 |         return base_message
1058 | 
1059 |     def get_expert_analysis_guidance(self) -> str:
1060 |         """
1061 |         Get additional guidance for handling expert analysis results.
1062 | 
1063 |         Subclasses can override this to provide specific instructions about how
1064 |         to validate and use expert analysis findings. Returns empty string by default.
1065 | 
1066 |         When expert analysis is called, this guidance will be:
1067 |         1. Appended to the completion next steps message
1068 |         2. Added as "important_considerations" field in the response data
1069 | 
1070 |         Example implementation:
1071 |         ```python
1072 |         def get_expert_analysis_guidance(self) -> str:
1073 |             return (
1074 |                 "IMPORTANT: Expert analysis provided above. You MUST validate "
1075 |                 "the expert findings rather than accepting them blindly. "
1076 |                 "Cross-reference with your own investigation and ensure "
1077 |                 "recommendations align with the codebase context."
1078 |             )
1079 |         ```
1080 | 
1081 |         Returns:
1082 |             Additional guidance text or empty string if no guidance needed
1083 |         """
1084 |         return ""
1085 | 
1086 |     def customize_workflow_response(self, response_data: dict, request) -> dict:
1087 |         """
1088 |         Allow tools to customize the workflow response before returning.
1089 | 
1090 |         Tools can override this to add tool-specific fields, modify status names,
1091 |         customize field mapping, etc. Default implementation returns unchanged.
1092 |         """
1093 |         # Ensure file context information is preserved in all response paths
1094 |         if not response_data.get("file_context"):
1095 |             embedded_content = self.get_embedded_file_content()
1096 |             reference_note = self.get_file_reference_note()
1097 |             processed_files = self.get_actually_processed_files()
1098 | 
1099 |             # Prioritize embedded content over references for final steps
1100 |             if embedded_content:
1101 |                 response_data["file_context"] = {
1102 |                     "type": "fully_embedded",
1103 |                     "files_embedded": len(processed_files),
1104 |                     "context_optimization": "Full file content embedded for expert analysis",
1105 |                 }
1106 |             elif reference_note:
1107 |                 response_data["file_context"] = {
1108 |                     "type": "reference_only",
1109 |                     "note": reference_note,
1110 |                     "context_optimization": "Files referenced but not embedded to preserve the context window",
1111 |                 }
1112 | 
1113 |         return response_data
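     |
     |     # Illustrative override (hypothetical subclass): add a tool-specific field while
     |     # keeping the base file-context handling:
     |     #
     |     #     def customize_workflow_response(self, response_data: dict, request) -> dict:
     |     #         response_data["validation_required"] = True  # assumed custom field
     |     #         return super().customize_workflow_response(response_data, request)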
1114 | 
1115 |     def store_conversation_turn(self, continuation_id: str, response_data: dict, request):
1116 |         """
1117 |         Store the conversation turn. Tools can override for custom memory storage.
1118 |         """
1119 |         # CRITICAL: Extract clean content for conversation history (exclude internal workflow metadata)
1120 |         clean_content = self._extract_clean_workflow_content_for_history(response_data)
1121 | 
1122 |         # Serialize workflow state for persistence across stateless tool calls
1123 |         workflow_state = {"work_history": self.work_history, "initial_request": getattr(self, "initial_request", None)}
1124 | 
1125 |         add_turn(
1126 |             thread_id=continuation_id,
1127 |             role="assistant",
1128 |             content=clean_content,  # Use cleaned content instead of full response_data
1129 |             tool_name=self.get_name(),
1130 |             files=self.get_request_relevant_files(request),
1131 |             images=self.get_request_images(request),
1132 |             model_metadata=workflow_state,  # Persist the state
1133 |         )
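     |         # The model_metadata stored here is what execute_workflow() reads back on a
     |         # continuation to restore work_history (see the state-restoration loop above).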
1134 | 
1135 |     def _add_workflow_metadata(self, response_data: dict, arguments: dict[str, Any]) -> None:
1136 |         """
1137 |         Add metadata (provider_used and model_used) to workflow response.
1138 | 
1139 |         This ensures workflow tools have the same metadata as regular tools,
1140 |         making it consistent across all tool types for tracking which provider
1141 |         and model were used for the response.
1142 | 
1143 |         Args:
1144 |             response_data: The response data dictionary to modify
1145 |             arguments: The original arguments containing model context
1146 |         """
1147 |         try:
1148 |             # Get model information from arguments (set by server.py)
1149 |             resolved_model_name = arguments.get("_resolved_model_name")
1150 |             model_context = arguments.get("_model_context")
1151 | 
1152 |             if resolved_model_name and model_context:
1153 |                 # Extract provider information from model context
1154 |                 provider = model_context.provider
1155 |                 provider_name = provider.get_provider_type().value if provider else "unknown"
1156 | 
1157 |                 # Create metadata dictionary
1158 |                 metadata = {
1159 |                     "tool_name": self.get_name(),
1160 |                     "model_used": resolved_model_name,
1161 |                     "provider_used": provider_name,
1162 |                 }
1163 | 
1164 |                 # Preserve existing metadata and add workflow metadata
1165 |                 if "metadata" not in response_data:
1166 |                     response_data["metadata"] = {}
1167 |                 response_data["metadata"].update(metadata)
1168 | 
1169 |                 logger.debug(
1170 |                     f"[WORKFLOW_METADATA] {self.get_name()}: Added metadata - "
1171 |                     f"model: {resolved_model_name}, provider: {provider_name}"
1172 |                 )
1173 |             else:
1174 |                 # Fallback - try to get model info from request
1175 |                 request = self.get_workflow_request_model()(**arguments)
1176 |                 model_name = self.get_request_model_name(request)
1177 | 
1178 |                 # Basic metadata without provider info
1179 |                 metadata = {
1180 |                     "tool_name": self.get_name(),
1181 |                     "model_used": model_name,
1182 |                     "provider_used": "unknown",
1183 |                 }
1184 | 
1185 |                 # Preserve existing metadata and add workflow metadata
1186 |                 if "metadata" not in response_data:
1187 |                     response_data["metadata"] = {}
1188 |                 response_data["metadata"].update(metadata)
1189 | 
1190 |                 logger.debug(
1191 |                     f"[WORKFLOW_METADATA] {self.get_name()}: Added fallback metadata - "
1192 |                     f"model: {model_name}, provider: unknown"
1193 |                 )
1194 | 
1195 |         except Exception as e:
1196 |             # Don't fail the workflow if metadata addition fails
1197 |             logger.warning(f"[WORKFLOW_METADATA] {self.get_name()}: Failed to add metadata: {e}")
1198 |             # Still add basic metadata with tool name
1199 |             response_data["metadata"] = {"tool_name": self.get_name()}
1200 | 
1201 |     def _extract_clean_workflow_content_for_history(self, response_data: dict) -> str:
1202 |         """
1203 |         Extract clean content from workflow response suitable for conversation history.
1204 | 
1205 |         This method removes internal workflow metadata, continuation offers, and
1206 |         status information that should not appear when the conversation is
1207 |         reconstructed for expert models or other tools.
1208 | 
1209 |         Args:
1210 |             response_data: The full workflow response data
1211 | 
1212 |         Returns:
1213 |             str: Clean content suitable for conversation history storage
1214 |         """
1215 |         # Create a clean copy with only essential content for conversation history
1216 |         clean_data = {}
1217 | 
1218 |         # Include core content if present
1219 |         if "content" in response_data:
1220 |             clean_data["content"] = response_data["content"]
1221 | 
1222 |         # Include expert analysis if present (but clean it)
1223 |         if "expert_analysis" in response_data:
1224 |             expert_analysis = response_data["expert_analysis"]
1225 |             if isinstance(expert_analysis, dict):
1226 |                 # Only include the actual analysis content, not metadata
1227 |                 clean_expert = {}
1228 |                 if "raw_analysis" in expert_analysis:
1229 |                     clean_expert["analysis"] = expert_analysis["raw_analysis"]
1230 |                 elif "content" in expert_analysis:
1231 |                     clean_expert["analysis"] = expert_analysis["content"]
1232 |                 if clean_expert:
1233 |                     clean_data["expert_analysis"] = clean_expert
1234 | 
1235 |         # Include findings/issues if present (core workflow output)
1236 |         if "complete_analysis" in response_data:
1237 |             complete_analysis = response_data["complete_analysis"]
1238 |             if isinstance(complete_analysis, dict):
1239 |                 clean_complete = {}
1240 |                 # Include essential analysis data without internal metadata
1241 |                 for key in ["findings", "issues_found", "relevant_context", "insights"]:
1242 |                     if key in complete_analysis:
1243 |                         clean_complete[key] = complete_analysis[key]
1244 |                 if clean_complete:
1245 |                     clean_data["analysis_summary"] = clean_complete
1246 | 
1247 |         # Include step information for context but remove internal workflow metadata
1248 |         if "step_number" in response_data:
1249 |             clean_data["step_info"] = {
1250 |                 "step": response_data.get("step", ""),
1251 |                 "step_number": response_data.get("step_number", 1),
1252 |                 "total_steps": response_data.get("total_steps", 1),
1253 |             }
1254 | 
1255 |         # Exclude problematic fields that should never appear in conversation history:
1256 |         # - continuation_id (confuses LLMs with old IDs)
1257 |         # - status (internal workflow state)
1258 |         # - next_step_required (internal control flow)
1259 |         # - analysis_status (internal tracking)
1260 |         # - file_context (internal optimization info)
1261 |         # - required_actions (internal workflow instructions)
1262 | 
1263 |         return json.dumps(clean_data, indent=2, ensure_ascii=False)
1264 | 
1265 |     # Core workflow logic methods
1266 | 
1267 |     async def handle_work_completion(self, response_data: dict, request, arguments: dict) -> dict:
1268 |         """
1269 |         Handle work completion logic - expert analysis decision and response building.
1270 |         """
1271 |         response_data[f"{self.get_name()}_complete"] = True
1272 | 
1273 |         # Check if tool wants to skip expert analysis due to high certainty
1274 |         if self.should_skip_expert_analysis(request, self.consolidated_findings):
1275 |             # Handle completion without expert analysis
1276 |             completion_response = self.handle_completion_without_expert_analysis(request, self.consolidated_findings)
1277 |             response_data.update(completion_response)
1278 |         elif self.requires_expert_analysis() and self.should_call_expert_analysis(self.consolidated_findings, request):
1279 |             # Standard expert analysis path
1280 |             response_data["status"] = "calling_expert_analysis"
1281 | 
1282 |             # Call expert analysis
1283 |             expert_analysis = await self._call_expert_analysis(arguments, request)
1284 |             response_data["expert_analysis"] = expert_analysis
1285 | 
1286 |             # Handle special expert analysis statuses
1287 |             if isinstance(expert_analysis, dict) and expert_analysis.get("status") in [
1288 |                 "files_required_to_continue",
1289 |                 "investigation_paused",
1290 |                 "refactoring_paused",
1291 |             ]:
1292 |                 # Promote the special status to the main response
1293 |                 special_status = expert_analysis["status"]
1294 |                 response_data["status"] = special_status
1295 |                 response_data["content"] = expert_analysis.get(
1296 |                     "raw_analysis", json.dumps(expert_analysis, ensure_ascii=False)
1297 |                 )
1298 |                 del response_data["expert_analysis"]
1299 | 
1300 |                 # Update next steps for special status
1301 |                 if special_status == "files_required_to_continue":
1302 |                     response_data["next_steps"] = "Provide the requested files and continue the analysis."
1303 |                 else:
1304 |                     response_data["next_steps"] = expert_analysis.get(
1305 |                         "next_steps", "Continue based on expert analysis."
1306 |                     )
1307 |             elif isinstance(expert_analysis, dict) and expert_analysis.get("status") == "analysis_error":
1308 |                 # Expert analysis failed - promote error status
1309 |                 response_data["status"] = "error"
1310 |                 response_data["content"] = expert_analysis.get("error", "Expert analysis failed")
1311 |                 response_data["content_type"] = "text"
1312 |                 del response_data["expert_analysis"]
1313 |             else:
1314 |                 # Expert analysis was successfully executed - include expert guidance
1315 |                 response_data["next_steps"] = self.get_completion_next_steps_message(expert_analysis_used=True)
1316 | 
1317 |                 # Add expert analysis guidance as important considerations
1318 |                 expert_guidance = self.get_expert_analysis_guidance()
1319 |                 if expert_guidance:
1320 |                     response_data["important_considerations"] = expert_guidance
1321 | 
1322 |             # Prepare complete work summary
1323 |             work_summary = self._prepare_work_summary()
1324 |             response_data[f"complete_{self.get_name()}"] = {
1325 |                 "initial_request": self.get_initial_request(request.step),
1326 |                 "steps_taken": len(self.work_history),
1327 |                 "files_examined": list(self.consolidated_findings.files_checked),
1328 |                 "relevant_files": list(self.consolidated_findings.relevant_files),
1329 |                 "relevant_context": list(self.consolidated_findings.relevant_context),
1330 |                 "issues_found": self.consolidated_findings.issues_found,
1331 |                 "work_summary": work_summary,
1332 |             }
1333 |         else:
1334 |             # Tool doesn't require expert analysis or local work was sufficient
1335 |             if not self.requires_expert_analysis():
1336 |                 # Tool is self-contained (like planner)
1337 |                 response_data["status"] = f"{self.get_name()}_complete"
1338 |                 response_data["next_steps"] = (
1339 |                     f"{self.get_name().capitalize()} work complete. Present results to the user."
1340 |                 )
1341 |             else:
1342 |                 # Local work was sufficient for tools that support expert analysis
1343 |                 response_data["status"] = "local_work_complete"
1344 |                 response_data["next_steps"] = (
1345 |                     f"Local {self.get_name()} complete with sufficient confidence. Present findings "
1346 |                     "and recommendations to the user based on the work results."
1347 |                 )
1348 | 
1349 |         return response_data
1350 | 
1351 |     def handle_work_continuation(self, response_data: dict, request) -> dict:
1352 |         """
1353 |         Handle work continuation - force pause and provide guidance.
1354 |         """
1355 |         response_data["status"] = f"pause_for_{self.get_name()}"
1356 |         response_data[f"{self.get_name()}_required"] = True
1357 | 
1358 |         # Get tool-specific required actions
1359 |         required_actions = self.get_required_actions(
1360 |             request.step_number, self.get_request_confidence(request), request.findings, request.total_steps, request
1361 |         )
1362 |         response_data["required_actions"] = required_actions
1363 | 
1364 |         # Generate step guidance
1365 |         response_data["next_steps"] = self.get_step_guidance_message(request)
1366 | 
1367 |         return response_data
1368 | 
1369 |     def _update_consolidated_findings(self, step_data: dict):
1370 |         """Update consolidated findings with new step data"""
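     |         # files_checked, relevant_files and relevant_context are merged with set-style
     |         # update(); findings, hypotheses, issues_found and images are appended in order.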
1371 |         self.consolidated_findings.files_checked.update(step_data.get("files_checked", []))
1372 |         self.consolidated_findings.relevant_files.update(step_data.get("relevant_files", []))
1373 |         self.consolidated_findings.relevant_context.update(step_data.get("relevant_context", []))
1374 |         self.consolidated_findings.findings.append(f"Step {step_data['step_number']}: {step_data['findings']}")
1375 |         if step_data.get("hypothesis"):
1376 |             self.consolidated_findings.hypotheses.append(
1377 |                 {
1378 |                     "step": step_data["step_number"],
1379 |                     "hypothesis": step_data["hypothesis"],
1380 |                     "confidence": step_data["confidence"],
1381 |                 }
1382 |             )
1383 |         if step_data.get("issues_found"):
1384 |             self.consolidated_findings.issues_found.extend(step_data["issues_found"])
1385 |         if step_data.get("images"):
1386 |             self.consolidated_findings.images.extend(step_data["images"])
1387 |         # Update confidence to latest value from this step
1388 |         if step_data.get("confidence"):
1389 |             self.consolidated_findings.confidence = step_data["confidence"]
1390 | 
1391 |     def _reprocess_consolidated_findings(self):
1392 |         """Reprocess consolidated findings after backtracking"""
1393 |         self.consolidated_findings = ConsolidatedFindings()
1394 |         for step in self.work_history:
1395 |             self._update_consolidated_findings(step)
1396 | 
1397 |     def _prepare_work_summary(self) -> str:
1398 |         """Prepare a comprehensive summary of the work"""
1399 |         summary_parts = [
1400 |             f"=== {self.get_name().upper()} WORK SUMMARY ===",
1401 |             f"Total steps: {len(self.work_history)}",
1402 |             f"Files examined: {len(self.consolidated_findings.files_checked)}",
1403 |             f"Relevant files identified: {len(self.consolidated_findings.relevant_files)}",
1404 |             f"Methods/functions involved: {len(self.consolidated_findings.relevant_context)}",
1405 |             f"Issues found: {len(self.consolidated_findings.issues_found)}",
1406 |             "",
1407 |             "=== WORK PROGRESSION ===",
1408 |         ]
1409 | 
1410 |         for finding in self.consolidated_findings.findings:
1411 |             summary_parts.append(finding)
1412 | 
1413 |         if self.consolidated_findings.hypotheses:
1414 |             summary_parts.extend(
1415 |                 [
1416 |                     "",
1417 |                     "=== HYPOTHESIS EVOLUTION ===",
1418 |                 ]
1419 |             )
1420 |             for hyp in self.consolidated_findings.hypotheses:
1421 |                 summary_parts.append(f"Step {hyp['step']} ({hyp['confidence']} confidence): {hyp['hypothesis']}")
1422 | 
1423 |         if self.consolidated_findings.issues_found:
1424 |             summary_parts.extend(
1425 |                 [
1426 |                     "",
1427 |                     "=== ISSUES IDENTIFIED ===",
1428 |                 ]
1429 |             )
1430 |             for issue in self.consolidated_findings.issues_found:
1431 |                 severity = issue.get("severity", "unknown")
1432 |                 description = issue.get("description", "No description")
1433 |                 summary_parts.append(f"[{severity.upper()}] {description}")
1434 | 
1435 |         return "\n".join(summary_parts)
1436 | 
1437 |     async def _call_expert_analysis(self, arguments: dict, request) -> dict:
1438 |         """Call external model for expert analysis"""
1439 |         try:
1440 |             # Model context should already be resolved by early validation; handle the fallback for tests
1441 |             if not self._model_context:
1442 |                 # Try to resolve model context for expert analysis (deferred from early validation)
1443 |                 try:
1444 |                     model_name, model_context = self._resolve_model_context(arguments, request)
1445 |                     self._model_context = model_context
1446 |                     self._current_model_name = model_name
1447 |                 except Exception as e:
1448 |                     logger.error(f"Failed to resolve model context for expert analysis: {e}")
1449 |                     # Use request model as fallback (preserves existing test behavior)
1450 |                     model_name = self.get_request_model_name(request)
1451 |                     from utils.model_context import ModelContext
1452 | 
1453 |                     model_context = ModelContext(model_name)
1454 |                     self._model_context = model_context
1455 |                     self._current_model_name = model_name
1456 |             else:
1457 |                 model_name = self._current_model_name
1458 | 
1459 |             provider = self._model_context.provider
1460 | 
1461 |             # Prepare expert analysis context
1462 |             expert_context = self.prepare_expert_analysis_context(self.consolidated_findings)
1463 | 
1464 |             # Check if tool wants to include files in prompt
1465 |             if self.should_include_files_in_expert_prompt():
1466 |                 file_content = self._prepare_files_for_expert_analysis()
1467 |                 if file_content:
1468 |                     expert_context = self._add_files_to_expert_context(expert_context, file_content)
1469 | 
1470 |             # Get system prompt for this tool with localization support
1471 |             base_system_prompt = self.get_system_prompt()
1472 |             capability_augmented_prompt = self._augment_system_prompt_with_capabilities(
1473 |                 base_system_prompt, getattr(self._model_context, "capabilities", None)
1474 |             )
1475 |             language_instruction = self.get_language_instruction()
1476 |             system_prompt = language_instruction + capability_augmented_prompt
1477 | 
1478 |             # Check if tool wants system prompt embedded in main prompt
1479 |             if self.should_embed_system_prompt():
1480 |                 prompt = f"{system_prompt}\n\n{expert_context}\n\n{self.get_expert_analysis_instruction()}"
1481 |                 system_prompt = ""  # Clear it since we embedded it
1482 |             else:
1483 |                 prompt = expert_context
1484 | 
1485 |             # Validate temperature against model constraints
1486 |             validated_temperature, temp_warnings = self.get_validated_temperature(request, self._model_context)
1487 | 
1488 |             # Log any temperature corrections
1489 |             for warning in temp_warnings:
1490 |                 logger.warning(warning)
1491 | 
1492 |             # Generate AI response - use request parameters if available
1493 |             model_response = provider.generate_content(
1494 |                 prompt=prompt,
1495 |                 model_name=model_name,
1496 |                 system_prompt=system_prompt,
1497 |                 temperature=validated_temperature,
1498 |                 thinking_mode=self.get_request_thinking_mode(request),
1499 |                 images=list(set(self.consolidated_findings.images)) if self.consolidated_findings.images else None,
1500 |             )
1501 | 
1502 |             if model_response.content:
1503 |                 content = model_response.content.strip()
1504 | 
1505 |                 # Try to extract JSON from markdown code blocks if present
1506 |                 if "```json" in content or "```" in content:
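     |                     # Non-greedy match: only the first fenced block is extracted, and any
     |                     # surrounding prose is dropped before JSON parsing is attempted.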
1507 |                     json_match = re.search(r"```(?:json)?\s*(.*?)\s*```", content, re.DOTALL)
1508 |                     if json_match:
1509 |                         content = json_match.group(1).strip()
1510 | 
1511 |                 try:
1512 |                     # Try to parse as JSON
1513 |                     analysis_result = json.loads(content)
1514 |                     return analysis_result
1515 |                 except json.JSONDecodeError as e:
1516 |                     # Log the parse error with more details but don't fail
1517 |                     logger.info(
1518 |                         f"[{self.get_name()}] Expert analysis returned non-JSON response (this is OK for smaller models). "
1519 |                         f"Parse error: {str(e)}. Response length: {len(model_response.content)} chars."
1520 |                     )
1521 |                     logger.debug(f"First 500 chars of response: {model_response.content[:500]!r}")
1522 | 
1523 |                     # Still return the analysis as plain text - this is valid
1524 |                     return {
1525 |                         "status": "analysis_complete",
1526 |                         "raw_analysis": model_response.content,
1527 |                         "format": "text",  # Indicate it's plain text, not an error
1528 |                         "note": "Analysis provided in plain text format",
1529 |                     }
1530 |             else:
1531 |                 return {"error": "No response from model", "status": "empty_response"}
1532 | 
1533 |         except Exception as e:
1534 |             logger.error(f"Error calling expert analysis: {e}", exc_info=True)
1535 |             return {"error": str(e), "status": "analysis_error"}
1536 | 
1537 |     def _process_work_step(self, step_data: dict):
1538 |         """
1539 |         Process a single work step and update internal state.
1540 | 
1541 |         This method is useful for testing and manual step processing.
1542 |         It adds the step to work history and updates consolidated findings.
1543 | 
1544 |         Args:
1545 |             step_data: Dictionary containing step information including:
1546 |                       step, step_number, findings, files_checked, etc.
1547 |         """
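        # Illustrative usage in a test (keys as documented above, values are placeholders):
        #   self._process_work_step({"step": "...", "step_number": 1,
        #                            "findings": "...", "files_checked": []})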
1548 |         # Store in history
1549 |         self.work_history.append(step_data)
1550 | 
1551 |         # Update consolidated findings
1552 |         self._update_consolidated_findings(step_data)
1553 | 
1554 |     # Common execute method for workflow-based tools
1555 | 
1556 |     async def execute(self, arguments: dict[str, Any]) -> list[TextContent]:
1557 |         """
1558 |         Common execute logic for workflow-based tools.
1559 | 
1560 |         This method provides common validation and delegates to execute_workflow.
1561 |         Tools that need custom execute logic can override this method.
1562 |         """
1563 |         try:
1564 |             # Common validation
1565 |             if not arguments:
1566 |                 error_data = {"status": "error", "content": "No arguments provided"}
1567 |                 # Add basic metadata even for validation errors
1568 |                 error_data["metadata"] = {"tool_name": self.get_name()}
1569 |                 raise ToolExecutionError(json.dumps(error_data, ensure_ascii=False))
1570 | 
1571 |             # Delegate to execute_workflow
1572 |             return await self.execute_workflow(arguments)
1573 | 
1574 |         except ToolExecutionError:
1575 |             raise
1576 |         except Exception as e:
1577 |             logger.error(f"Error in {self.get_name()} tool execution: {e}", exc_info=True)
1578 |             error_data = {
1579 |                 "status": "error",
1580 |                 "content": f"Error in {self.get_name()}: {str(e)}",
1581 |             }
1582 |             self._add_workflow_metadata(error_data, arguments)  # Add metadata to error responses
1583 |             raise ToolExecutionError(json.dumps(error_data, ensure_ascii=False)) from e
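            # Both failure paths raise ToolExecutionError whose message is a JSON
            # envelope (status/content plus metadata), so callers can parse a
            # structured error instead of a bare string.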
1584 | 
1585 |     # Default implementations for methods that workflow-based tools typically don't need
1586 | 
1587 |     async def prepare_prompt(self, request) -> str:
1588 |         """
1589 |         Base implementation for workflow tools - compatible with BaseTool signature.
1590 | 
1591 |         Workflow tools typically don't need to return a prompt since they handle
1592 |         their own prompt preparation internally through the workflow execution.
1593 | 
1594 |         Args:
1595 |             request: The validated request object
1596 | 
1597 |         Returns:
1598 |             Empty string since workflow tools manage prompts internally
1599 |         """
1600 |         # Workflow tools handle their prompts internally during workflow execution
1601 |         return ""
1602 | 
1603 |     def format_response(self, response: str, request, model_info=None):
1604 |         """
1605 |         Workflow tools handle their own response formatting.
1606 |         The BaseWorkflowMixin formats responses internally.
1607 |         """
1608 |         return response
1609 | 
```
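The part of this mixin most worth internalising when writing a new workflow tool is the tolerant reply handling in `_call_expert_analysis`: strip an optional Markdown code fence, attempt `json.loads`, and fall back to a plain-text payload rather than treating prose as a failure. A minimal standalone sketch of that pattern, assuming nothing beyond the standard library (the function name `parse_expert_reply` and its exact return shape are illustrative, not part of the codebase):

```python
import json
import re


def parse_expert_reply(raw: str) -> dict:
    """Parse a model reply as JSON, degrading gracefully to plain text."""
    content = raw.strip()
    # Unwrap an optional Markdown code fence (with or without a "json" tag).
    match = re.search(r"```(?:json)?\s*(.*?)\s*```", content, re.DOTALL)
    if match:
        content = match.group(1).strip()
    try:
        return json.loads(content)
    except json.JSONDecodeError:
        # Smaller models often answer in prose; treat that as a valid result.
        return {"status": "analysis_complete", "raw_analysis": raw, "format": "text"}
```

A fenced JSON reply comes back as the parsed dict; a prose reply comes back under `raw_analysis` with `format: "text"`, which is how the mixin keeps smaller models usable without special-casing them upstream.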
Page 23/25