This is page 23 of 25. Use http://codebase.md/beehiveinnovations/gemini-mcp-server?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── commands
│ │ └── fix-github-issue.md
│ └── settings.json
├── .coveragerc
├── .dockerignore
├── .env.example
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ ├── documentation.yml
│ │ ├── feature_request.yml
│ │ └── tool_addition.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── docker-pr.yml
│ ├── docker-release.yml
│ ├── semantic-pr.yml
│ ├── semantic-release.yml
│ └── test.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CHANGELOG.md
├── claude_config_example.json
├── CLAUDE.md
├── clink
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── claude.py
│ │ ├── codex.py
│ │ └── gemini.py
│ ├── constants.py
│ ├── models.py
│ ├── parsers
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── claude.py
│ │ ├── codex.py
│ │ └── gemini.py
│ └── registry.py
├── code_quality_checks.ps1
├── code_quality_checks.sh
├── communication_simulator_test.py
├── conf
│ ├── __init__.py
│ ├── azure_models.json
│ ├── cli_clients
│ │ ├── claude.json
│ │ ├── codex.json
│ │ └── gemini.json
│ ├── custom_models.json
│ ├── dial_models.json
│ ├── gemini_models.json
│ ├── openai_models.json
│ ├── openrouter_models.json
│ └── xai_models.json
├── config.py
├── docker
│ ├── README.md
│ └── scripts
│ ├── build.ps1
│ ├── build.sh
│ ├── deploy.ps1
│ ├── deploy.sh
│ └── healthcheck.py
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── adding_providers.md
│ ├── adding_tools.md
│ ├── advanced-usage.md
│ ├── ai_banter.md
│ ├── ai-collaboration.md
│ ├── azure_openai.md
│ ├── configuration.md
│ ├── context-revival.md
│ ├── contributions.md
│ ├── custom_models.md
│ ├── docker-deployment.md
│ ├── gemini-setup.md
│ ├── getting-started.md
│ ├── index.md
│ ├── locale-configuration.md
│ ├── logging.md
│ ├── model_ranking.md
│ ├── testing.md
│ ├── tools
│ │ ├── analyze.md
│ │ ├── apilookup.md
│ │ ├── challenge.md
│ │ ├── chat.md
│ │ ├── clink.md
│ │ ├── codereview.md
│ │ ├── consensus.md
│ │ ├── debug.md
│ │ ├── docgen.md
│ │ ├── listmodels.md
│ │ ├── planner.md
│ │ ├── precommit.md
│ │ ├── refactor.md
│ │ ├── secaudit.md
│ │ ├── testgen.md
│ │ ├── thinkdeep.md
│ │ ├── tracer.md
│ │ └── version.md
│ ├── troubleshooting.md
│ ├── vcr-testing.md
│ └── wsl-setup.md
├── examples
│ ├── claude_config_macos.json
│ └── claude_config_wsl.json
├── LICENSE
├── providers
│ ├── __init__.py
│ ├── azure_openai.py
│ ├── base.py
│ ├── custom.py
│ ├── dial.py
│ ├── gemini.py
│ ├── openai_compatible.py
│ ├── openai.py
│ ├── openrouter.py
│ ├── registries
│ │ ├── __init__.py
│ │ ├── azure.py
│ │ ├── base.py
│ │ ├── custom.py
│ │ ├── dial.py
│ │ ├── gemini.py
│ │ ├── openai.py
│ │ ├── openrouter.py
│ │ └── xai.py
│ ├── registry_provider_mixin.py
│ ├── registry.py
│ ├── shared
│ │ ├── __init__.py
│ │ ├── model_capabilities.py
│ │ ├── model_response.py
│ │ ├── provider_type.py
│ │ └── temperature.py
│ └── xai.py
├── pyproject.toml
├── pytest.ini
├── README.md
├── requirements-dev.txt
├── requirements.txt
├── run_integration_tests.ps1
├── run_integration_tests.sh
├── run-server.ps1
├── run-server.sh
├── scripts
│ └── sync_version.py
├── server.py
├── simulator_tests
│ ├── __init__.py
│ ├── base_test.py
│ ├── conversation_base_test.py
│ ├── log_utils.py
│ ├── test_analyze_validation.py
│ ├── test_basic_conversation.py
│ ├── test_chat_simple_validation.py
│ ├── test_codereview_validation.py
│ ├── test_consensus_conversation.py
│ ├── test_consensus_three_models.py
│ ├── test_consensus_workflow_accurate.py
│ ├── test_content_validation.py
│ ├── test_conversation_chain_validation.py
│ ├── test_cross_tool_comprehensive.py
│ ├── test_cross_tool_continuation.py
│ ├── test_debug_certain_confidence.py
│ ├── test_debug_validation.py
│ ├── test_line_number_validation.py
│ ├── test_logs_validation.py
│ ├── test_model_thinking_config.py
│ ├── test_o3_model_selection.py
│ ├── test_o3_pro_expensive.py
│ ├── test_ollama_custom_url.py
│ ├── test_openrouter_fallback.py
│ ├── test_openrouter_models.py
│ ├── test_per_tool_deduplication.py
│ ├── test_planner_continuation_history.py
│ ├── test_planner_validation_old.py
│ ├── test_planner_validation.py
│ ├── test_precommitworkflow_validation.py
│ ├── test_prompt_size_limit_bug.py
│ ├── test_refactor_validation.py
│ ├── test_secaudit_validation.py
│ ├── test_testgen_validation.py
│ ├── test_thinkdeep_validation.py
│ ├── test_token_allocation_validation.py
│ ├── test_vision_capability.py
│ └── test_xai_models.py
├── systemprompts
│ ├── __init__.py
│ ├── analyze_prompt.py
│ ├── chat_prompt.py
│ ├── clink
│ │ ├── codex_codereviewer.txt
│ │ ├── default_codereviewer.txt
│ │ ├── default_planner.txt
│ │ └── default.txt
│ ├── codereview_prompt.py
│ ├── consensus_prompt.py
│ ├── debug_prompt.py
│ ├── docgen_prompt.py
│ ├── generate_code_prompt.py
│ ├── planner_prompt.py
│ ├── precommit_prompt.py
│ ├── refactor_prompt.py
│ ├── secaudit_prompt.py
│ ├── testgen_prompt.py
│ ├── thinkdeep_prompt.py
│ └── tracer_prompt.py
├── tests
│ ├── __init__.py
│ ├── CASSETTE_MAINTENANCE.md
│ ├── conftest.py
│ ├── gemini_cassettes
│ │ ├── chat_codegen
│ │ │ └── gemini25_pro_calculator
│ │ │ └── mldev.json
│ │ ├── chat_cross
│ │ │ └── step1_gemini25_flash_number
│ │ │ └── mldev.json
│ │ └── consensus
│ │ └── step2_gemini25_flash_against
│ │ └── mldev.json
│ ├── http_transport_recorder.py
│ ├── mock_helpers.py
│ ├── openai_cassettes
│ │ ├── chat_cross_step2_gpt5_reminder.json
│ │ ├── chat_gpt5_continuation.json
│ │ ├── chat_gpt5_moon_distance.json
│ │ ├── consensus_step1_gpt5_for.json
│ │ └── o3_pro_basic_math.json
│ ├── pii_sanitizer.py
│ ├── sanitize_cassettes.py
│ ├── test_alias_target_restrictions.py
│ ├── test_auto_mode_comprehensive.py
│ ├── test_auto_mode_custom_provider_only.py
│ ├── test_auto_mode_model_listing.py
│ ├── test_auto_mode_provider_selection.py
│ ├── test_auto_mode.py
│ ├── test_auto_model_planner_fix.py
│ ├── test_azure_openai_provider.py
│ ├── test_buggy_behavior_prevention.py
│ ├── test_cassette_semantic_matching.py
│ ├── test_challenge.py
│ ├── test_chat_codegen_integration.py
│ ├── test_chat_cross_model_continuation.py
│ ├── test_chat_openai_integration.py
│ ├── test_chat_simple.py
│ ├── test_clink_claude_agent.py
│ ├── test_clink_claude_parser.py
│ ├── test_clink_codex_agent.py
│ ├── test_clink_gemini_agent.py
│ ├── test_clink_gemini_parser.py
│ ├── test_clink_integration.py
│ ├── test_clink_parsers.py
│ ├── test_clink_tool.py
│ ├── test_collaboration.py
│ ├── test_config.py
│ ├── test_consensus_integration.py
│ ├── test_consensus_schema.py
│ ├── test_consensus.py
│ ├── test_conversation_continuation_integration.py
│ ├── test_conversation_field_mapping.py
│ ├── test_conversation_file_features.py
│ ├── test_conversation_memory.py
│ ├── test_conversation_missing_files.py
│ ├── test_custom_openai_temperature_fix.py
│ ├── test_custom_provider.py
│ ├── test_debug.py
│ ├── test_deploy_scripts.py
│ ├── test_dial_provider.py
│ ├── test_directory_expansion_tracking.py
│ ├── test_disabled_tools.py
│ ├── test_docker_claude_desktop_integration.py
│ ├── test_docker_config_complete.py
│ ├── test_docker_healthcheck.py
│ ├── test_docker_implementation.py
│ ├── test_docker_mcp_validation.py
│ ├── test_docker_security.py
│ ├── test_docker_volume_persistence.py
│ ├── test_file_protection.py
│ ├── test_gemini_token_usage.py
│ ├── test_image_support_integration.py
│ ├── test_image_validation.py
│ ├── test_integration_utf8.py
│ ├── test_intelligent_fallback.py
│ ├── test_issue_245_simple.py
│ ├── test_large_prompt_handling.py
│ ├── test_line_numbers_integration.py
│ ├── test_listmodels_restrictions.py
│ ├── test_listmodels.py
│ ├── test_mcp_error_handling.py
│ ├── test_model_enumeration.py
│ ├── test_model_metadata_continuation.py
│ ├── test_model_resolution_bug.py
│ ├── test_model_restrictions.py
│ ├── test_o3_pro_output_text_fix.py
│ ├── test_o3_temperature_fix_simple.py
│ ├── test_openai_compatible_token_usage.py
│ ├── test_openai_provider.py
│ ├── test_openrouter_provider.py
│ ├── test_openrouter_registry.py
│ ├── test_parse_model_option.py
│ ├── test_per_tool_model_defaults.py
│ ├── test_pii_sanitizer.py
│ ├── test_pip_detection_fix.py
│ ├── test_planner.py
│ ├── test_precommit_workflow.py
│ ├── test_prompt_regression.py
│ ├── test_prompt_size_limit_bug_fix.py
│ ├── test_provider_retry_logic.py
│ ├── test_provider_routing_bugs.py
│ ├── test_provider_utf8.py
│ ├── test_providers.py
│ ├── test_rate_limit_patterns.py
│ ├── test_refactor.py
│ ├── test_secaudit.py
│ ├── test_server.py
│ ├── test_supported_models_aliases.py
│ ├── test_thinking_modes.py
│ ├── test_tools.py
│ ├── test_tracer.py
│ ├── test_utf8_localization.py
│ ├── test_utils.py
│ ├── test_uvx_resource_packaging.py
│ ├── test_uvx_support.py
│ ├── test_workflow_file_embedding.py
│ ├── test_workflow_metadata.py
│ ├── test_workflow_prompt_size_validation_simple.py
│ ├── test_workflow_utf8.py
│ ├── test_xai_provider.py
│ ├── transport_helpers.py
│ └── triangle.png
├── tools
│ ├── __init__.py
│ ├── analyze.py
│ ├── apilookup.py
│ ├── challenge.py
│ ├── chat.py
│ ├── clink.py
│ ├── codereview.py
│ ├── consensus.py
│ ├── debug.py
│ ├── docgen.py
│ ├── listmodels.py
│ ├── models.py
│ ├── planner.py
│ ├── precommit.py
│ ├── refactor.py
│ ├── secaudit.py
│ ├── shared
│ │ ├── __init__.py
│ │ ├── base_models.py
│ │ ├── base_tool.py
│ │ ├── exceptions.py
│ │ └── schema_builders.py
│ ├── simple
│ │ ├── __init__.py
│ │ └── base.py
│ ├── testgen.py
│ ├── thinkdeep.py
│ ├── tracer.py
│ ├── version.py
│ └── workflow
│ ├── __init__.py
│ ├── base.py
│ ├── schema_builders.py
│ └── workflow_mixin.py
├── utils
│ ├── __init__.py
│ ├── client_info.py
│ ├── conversation_memory.py
│ ├── env.py
│ ├── file_types.py
│ ├── file_utils.py
│ ├── image_utils.py
│ ├── model_context.py
│ ├── model_restrictions.py
│ ├── security_config.py
│ ├── storage_backend.py
│ └── token_utils.py
└── zen-mcp-server
```
# Files
--------------------------------------------------------------------------------
/tools/shared/base_tool.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Core Tool Infrastructure for Zen MCP Tools
3 |
4 | This module provides the fundamental base class for all tools:
5 | - BaseTool: Abstract base class defining the tool interface
6 |
7 | The BaseTool class defines the core contract that tools must implement and provides
8 | common functionality for request validation, error handling, model management,
9 | conversation handling, file processing, and response formatting.
10 | """
11 |
12 | import logging
13 | import os
14 | from abc import ABC, abstractmethod
15 | from typing import TYPE_CHECKING, Any, Optional
16 |
17 | from mcp.types import TextContent
18 |
19 | if TYPE_CHECKING:
20 | from providers.shared import ModelCapabilities
21 | from tools.models import ToolModelCategory
22 |
23 | from config import MCP_PROMPT_SIZE_LIMIT
24 | from providers import ModelProvider, ModelProviderRegistry
25 | from utils import estimate_tokens
26 | from utils.conversation_memory import (
27 | ConversationTurn,
28 | get_conversation_file_list,
29 | get_thread,
30 | )
31 | from utils.env import get_env
32 | from utils.file_utils import read_file_content, read_files
33 |
34 | # Import models from tools.models for compatibility
35 | try:
36 | from tools.models import SPECIAL_STATUS_MODELS, ContinuationOffer, ToolOutput
37 | except ImportError:
38 | # Fallback in case models haven't been set up yet
39 | SPECIAL_STATUS_MODELS = {}
40 | ContinuationOffer = None
41 | ToolOutput = None
42 |
43 | logger = logging.getLogger(__name__)
44 |
45 |
46 | class BaseTool(ABC):
47 | """
48 | Abstract base class for all Zen MCP tools.
49 |
50 | This class defines the interface that all tools must implement and provides
51 | common functionality for request handling, model creation, and response formatting.
52 |
53 | CONVERSATION-AWARE FILE PROCESSING:
54 | This base class implements the sophisticated dual prioritization strategy for
55 | conversation-aware file handling across all tools:
56 |
57 | 1. FILE DEDUPLICATION WITH NEWEST-FIRST PRIORITY:
58 | - When same file appears in multiple conversation turns, newest reference wins
59 | - Prevents redundant file embedding while preserving most recent file state
60 | - Cross-tool file tracking ensures consistent behavior across analyze → codereview → debug
61 |
62 | 2. CONVERSATION CONTEXT INTEGRATION:
63 | - All tools receive enhanced prompts with conversation history via reconstruct_thread_context()
64 | - File references from previous turns are preserved and accessible
65 | - Cross-tool knowledge transfer maintains full context without manual file re-specification
66 |
67 | 3. TOKEN-AWARE FILE EMBEDDING:
68 | - Respects model-specific token allocation budgets from ModelContext
69 | - Prioritizes conversation history, then newest files, then remaining content
70 | - Graceful degradation when token limits are approached
71 |
72 | 4. STATELESS-TO-STATEFUL BRIDGING:
73 | - Tools operate on stateless MCP requests but access full conversation state
74 | - Conversation memory automatically injected via continuation_id parameter
75 | - Enables natural AI-to-AI collaboration across tool boundaries
76 |
77 | To create a new tool:
78 | 1. Create a new class that inherits from BaseTool
79 | 2. Implement all abstract methods
80 | 3. Define a request model that inherits from ToolRequest
81 | 4. Register the tool in server.py's TOOLS dictionary
82 | """
83 |
84 | # Class-level cache for OpenRouter registry to avoid multiple loads
85 | _openrouter_registry_cache = None
86 | _custom_registry_cache = None
87 |
@classmethod
def _get_openrouter_registry(cls):
    """Return the shared OpenRouter model registry, building it on first use.

    The instance is stored on ``BaseTool`` itself (not on ``cls``) so that
    every subclass reads and writes the same cache slot.
    """
    cached = BaseTool._openrouter_registry_cache
    if cached is not None:
        return cached

    # Imported lazily so the registry module is only loaded when needed.
    from providers.registries.openrouter import OpenRouterModelRegistry

    BaseTool._openrouter_registry_cache = OpenRouterModelRegistry()
    logger.debug("Created cached OpenRouter registry instance")
    return BaseTool._openrouter_registry_cache
98 |
@classmethod
def _get_custom_registry(cls):
    """Return the shared custom-endpoint model registry, building it on first use.

    The cache slot lives on ``BaseTool`` so all subclasses share one instance.
    """
    cached = BaseTool._custom_registry_cache
    if cached is not None:
        return cached

    # Imported lazily so the registry module is only loaded when needed.
    from providers.registries.custom import CustomEndpointModelRegistry

    BaseTool._custom_registry_cache = CustomEndpointModelRegistry()
    logger.debug("Created cached Custom registry instance")
    return BaseTool._custom_registry_cache
108 |
109 | def __init__(self):
110 | # Cache tool metadata at initialization to avoid repeated calls
111 | self.name = self.get_name()
112 | self.description = self.get_description()
113 | self.default_temperature = self.get_default_temperature()
114 | # Tool initialization complete
115 |
@abstractmethod
def get_name(self) -> str:
    """Return this tool's unique identifier.

    MCP clients invoke the tool by this name, so it must not collide with
    the name of any other registered tool.

    Returns:
        str: The unique tool name, for example "review_code" or "analyze".
    """
128 |
@abstractmethod
def get_description(self) -> str:
    """Return the full, client-facing description of this tool.

    MCP clients (such as Claude / Codex / Gemini) read this text to decide
    when and how to call the tool, so it should be comprehensive and
    include trigger phrases.

    Returns:
        str: Detailed tool description with usage examples.
    """
142 |
@abstractmethod
def get_input_schema(self) -> dict[str, Any]:
    """Return the JSON Schema describing this tool's parameters.

    MCP clients validate their inputs against this schema before sending a
    request, so it should stay in step with the tool's request model.

    Returns:
        dict[str, Any]: JSON Schema object listing required and optional
        parameters.
    """
155 |
@abstractmethod
def get_system_prompt(self) -> str:
    """Return the system prompt that frames the model's behaviour for this tool.

    The prompt supplies the role definition and instructions for the task
    and is prepended to the user's request.

    Returns:
        str: System prompt with role definition and instructions.
    """
168 |
def get_capability_system_prompts(self, capabilities: Optional["ModelCapabilities"]) -> list[str]:
    """Hook for extra system-prompt fragments that depend on model capabilities.

    The base implementation contributes nothing. Subclasses may override it
    to add capability-specific instructions (for example, enabling
    code-generation exports when the model advertises support).

    Args:
        capabilities: The resolved capabilities for the active model.

    Returns:
        Prompt fragments to append after the base system prompt; an empty
        list by default.
    """
    return []
185 |
186 | def _augment_system_prompt_with_capabilities(
187 | self, base_prompt: str, capabilities: Optional["ModelCapabilities"]
188 | ) -> str:
189 | """Merge capability-driven prompt addenda with the base system prompt."""
190 |
191 | additions: list[str] = []
192 | if capabilities is not None:
193 | additions = [fragment.strip() for fragment in self.get_capability_system_prompts(capabilities) if fragment]
194 |
195 | if not additions:
196 | return base_prompt
197 |
198 | addition_text = "\n\n".join(additions)
199 | if not base_prompt:
200 | return addition_text
201 |
202 | suffix = "" if base_prompt.endswith("\n\n") else "\n\n"
203 | return f"{base_prompt}{suffix}{addition_text}"
204 |
def get_annotations(self) -> Optional[dict[str, Any]]:
    """Return optional behaviour hints for MCP clients.

    Annotations are advisory only and are not security-critical; they help
    clients make better decisions about tool usage. The base implementation
    declares none.

    Returns:
        Optional[dict]: Annotation fields such as readOnlyHint or
        destructiveHint, or None when the tool has no annotations.
    """
    return None
217 |
def requires_model(self) -> bool:
    """Tell the MCP boundary whether this tool needs an AI model.

    Tools that override execute() to do pure data processing (like planner)
    should return False so model resolution is skipped for them.

    Returns:
        bool: True (the default) when model access is needed, False for
        data-only tools.
    """
    return True
229 |
def is_effective_auto_mode(self) -> bool:
    """Decide whether schemas should treat the model parameter as required.

    Auto mode is "effective" either when DEFAULT_MODEL is literally "auto",
    or when no provider can serve the configured DEFAULT_MODEL.

    Returns:
        bool: True if the model parameter should be required in the schema.
    """
    from config import DEFAULT_MODEL
    from providers.registry import ModelProviderRegistry

    # Explicit auto mode.
    if DEFAULT_MODEL.lower() == "auto":
        return True

    # Implicit auto mode: the configured default has no provider behind it.
    provider = ModelProviderRegistry.get_provider_for_model(DEFAULT_MODEL)
    return not provider
254 |
255 | def _should_require_model_selection(self, model_name: str) -> bool:
256 | """
257 | Check if we should require the CLI to select a model at runtime.
258 |
259 | This is called during request execution to determine if we need
260 | to return an error asking the CLI to provide a model parameter.
261 |
262 | Args:
263 | model_name: The model name from the request or DEFAULT_MODEL
264 |
265 | Returns:
266 | bool: True if we should require model selection
267 | """
268 | # Case 1: Model is explicitly "auto"
269 | if model_name.lower() == "auto":
270 | return True
271 |
272 | # Case 2: Requested model is not available
273 | from providers.registry import ModelProviderRegistry
274 |
275 | provider = ModelProviderRegistry.get_provider_for_model(model_name)
276 | if not provider:
277 | logger = logging.getLogger(f"tools.{self.name}")
278 | logger.warning(f"Model '{model_name}' is not available with current API keys. Requiring model selection.")
279 | return True
280 |
281 | return False
282 |
def _get_available_models(self) -> list[str]:
    """Return the de-duplicated model names offered by enabled providers.

    The result starts with the names reported by ``ModelProviderRegistry``
    and is then extended with OpenRouter aliases (when ``OPENROUTER_API_KEY``
    is set to something other than the placeholder value) and custom-endpoint
    aliases (when ``CUSTOM_API_URL`` is set). First-seen order is preserved.

    The names are collected into a fresh list, so the list object returned
    by the registry is never mutated, and membership is tracked with a set
    so each name is checked in constant time.

    Returns:
        List of unique model names, in first-seen order.
    """
    from providers.registry import ModelProviderRegistry

    unique_models: list[str] = []
    seen: set[str] = set()

    def _extend(names) -> None:
        """Append each not-yet-seen name, keeping encounter order."""
        for name in names:
            if name not in seen:
                seen.add(name)
                unique_models.append(name)

    # Models from enabled providers only (those with valid API keys).
    _extend(ModelProviderRegistry.get_available_model_names())

    # OpenRouter aliases, if OpenRouter is configured with a real key.
    openrouter_key = get_env("OPENROUTER_API_KEY")
    if openrouter_key and openrouter_key != "your_openrouter_api_key_here":
        try:
            _extend(self._get_openrouter_registry().list_aliases())
        except Exception as e:
            # Best effort: a broken registry must not prevent listing models.
            logger.debug("Failed to add OpenRouter models to enum: %s", e)

    # Custom-endpoint aliases, if a custom API URL is configured.
    custom_url = get_env("CUSTOM_API_URL")
    if custom_url:
        try:
            _extend(self._get_custom_registry().list_aliases())
        except Exception as e:
            # Best effort, same as above.
            logger.debug("Failed to add custom models to enum: %s", e)

    return unique_models
335 |
336 | def _format_available_models_list(self) -> str:
337 | """Return a human-friendly list of available models or guidance when none found."""
338 |
339 | summaries, total, has_restrictions = self._get_ranked_model_summaries()
340 | if not summaries:
341 | return (
342 | "No models detected. Configure provider credentials or set DEFAULT_MODEL to a valid option. "
343 | "If the user requested a specific model, respond with this notice instead of substituting another model."
344 | )
345 | display = "; ".join(summaries)
346 | remainder = total - len(summaries)
347 | if remainder > 0:
348 | display = f"{display}; +{remainder} more (use the `listmodels` tool for the full roster)"
349 | return display
350 |
351 | @staticmethod
352 | def _format_context_window(tokens: int) -> Optional[str]:
353 | """Convert a raw context window into a short display string."""
354 |
355 | if not tokens or tokens <= 0:
356 | return None
357 |
358 | if tokens >= 1_000_000:
359 | if tokens % 1_000_000 == 0:
360 | return f"{tokens // 1_000_000}M ctx"
361 | return f"{tokens / 1_000_000:.1f}M ctx"
362 |
363 | if tokens >= 1_000:
364 | if tokens % 1_000 == 0:
365 | return f"{tokens // 1_000}K ctx"
366 | return f"{tokens / 1_000:.1f}K ctx"
367 |
368 | return f"{tokens} ctx"
369 |
def _collect_ranked_capabilities(self) -> list[tuple[int, str, Any]]:
    """Collect (rank, model name, capabilities) for available models, best rank first.

    Models whose provider cannot be resolved, or whose provider raises
    ``ValueError`` from ``get_capabilities``, are skipped.

    Returns:
        Tuples sorted by descending rank, with ties broken by model name in
        ascending order.
    """
    from providers.registry import ModelProviderRegistry

    entries: list[tuple[int, str, Any]] = []
    catalogue = ModelProviderRegistry.get_available_models(respect_restrictions=True)

    for name, kind in catalogue.items():
        provider = ModelProviderRegistry.get_provider(kind)
        if provider:
            try:
                caps = provider.get_capabilities(name)
            except ValueError:
                continue
            entries.append((caps.get_effective_capability_rank(), name, caps))

    return sorted(entries, key=lambda entry: (-entry[0], entry[1]))
393 |
394 | @staticmethod
395 | def _normalize_model_identifier(name: str) -> str:
396 | """Normalize model names for deduplication across providers."""
397 |
398 | normalized = name.lower()
399 | if ":" in normalized:
400 | normalized = normalized.split(":", 1)[0]
401 | if "/" in normalized:
402 | normalized = normalized.split("/", 1)[-1]
403 | return normalized
404 |
def _get_ranked_model_summaries(self, limit: int = 5) -> tuple[list[str], int, bool]:
    """Return formatted, ranked model summaries and restriction status.

    Args:
        limit: Maximum number of summaries to format.

    Returns:
        A tuple ``(summaries, total, restricted)`` where ``summaries`` holds
        at most ``limit`` strings such as
        ``"name (score 90, 1M ctx, thinking, code-gen)"``, ``total`` is the
        number of models left after allowlist filtering and de-duplication,
        and ``restricted`` is True when at least one provider allowlist was
        found.
    """
    ranked = self._collect_ranked_capabilities()

    # Build allowlist map (provider -> lowercase names) when restrictions are active
    allowed_map: dict[Any, set[str]] = {}
    try:
        from utils.model_restrictions import get_restriction_service

        restriction_service = get_restriction_service()
        if restriction_service:
            from providers.shared import ProviderType

            for provider_type in ProviderType:
                allowed = restriction_service.get_allowed_models(provider_type)
                if allowed:
                    allowed_map[provider_type] = {name.lower() for name in allowed if name}
    except Exception:
        # Best effort: fall back to "no allowlists", but leave a trace so the
        # failure is diagnosable instead of being swallowed silently.
        logger.debug("Could not load model restrictions; listing models without allowlists", exc_info=True)
        allowed_map = {}

    filtered: list[tuple[int, str, Any]] = []
    seen_normalized: set[str] = set()

    for rank, model_name, capabilities in ranked:
        canonical_name = getattr(capabilities, "model_name", model_name)
        canonical_lower = canonical_name.lower()
        alias_lower = model_name.lower()
        provider_type = getattr(capabilities, "provider", None)

        if allowed_map:
            # With any allowlist active, providers without one are excluded.
            if provider_type not in allowed_map:
                continue
            allowed_set = allowed_map[provider_type]
            if canonical_lower not in allowed_set and alias_lower not in allowed_set:
                continue

        # Input is already sorted, so the first occurrence is the one kept.
        normalized = self._normalize_model_identifier(canonical_name)
        if normalized in seen_normalized:
            continue

        seen_normalized.add(normalized)
        filtered.append((rank, canonical_name, capabilities))

    summaries: list[str] = []
    for rank, canonical_name, capabilities in filtered[:limit]:
        details: list[str] = []

        context_str = self._format_context_window(capabilities.context_window)
        if context_str:
            details.append(context_str)

        if capabilities.supports_extended_thinking:
            details.append("thinking")

        if capabilities.allow_code_generation:
            details.append("code-gen")

        base = f"{canonical_name} (score {rank}"
        if details:
            base = f"{base}, {', '.join(details)}"
        summaries.append(f"{base})")

    return summaries, len(filtered), bool(allowed_map)
469 |
def _get_restriction_note(self) -> Optional[str]:
    """Summarise the per-provider allowlist environment variables, if any are set.

    Each variable is read with ``get_env`` and split on commas; entries are
    stripped, de-duplicated and sorted.

    Returns:
        A string of the form "Policy allows only → Label: a, b; Label: c",
        or None when no variable yields any model names.
    """
    providers_by_env = (
        ("OPENAI_ALLOWED_MODELS", "OpenAI"),
        ("GOOGLE_ALLOWED_MODELS", "Google"),
        ("XAI_ALLOWED_MODELS", "X.AI"),
        ("OPENROUTER_ALLOWED_MODELS", "OpenRouter"),
        ("DIAL_ALLOWED_MODELS", "DIAL"),
    )

    sections: list[str] = []
    for env_name, provider_label in providers_by_env:
        configured = get_env(env_name)
        if configured:
            allowed = sorted({piece.strip() for piece in configured.split(",") if piece.strip()})
            if allowed:
                sections.append(f"{provider_label}: {', '.join(allowed)}")

    return ("Policy allows only → " + "; ".join(sections)) if sections else None
497 |
def _build_model_unavailable_message(self, model_name: str) -> str:
    """Build the error text returned when the requested model cannot be used.

    The message names the rejected model, lists the available models,
    suggests the registry's preferred fallback for this tool's category, and
    instructs the caller not to substitute a different model.

    Args:
        model_name: The model name that was requested.
    """
    category = self.get_model_category()
    fallback = ModelProviderRegistry.get_preferred_fallback_model(category)
    roster = self._format_available_models_list()
    tool_name = self.get_name()

    sentences = [
        f"Model '{model_name}' is not available with current API keys. ",
        f"Available models: {roster}. ",
        f"Suggested model for {tool_name}: '{fallback}' ",
        f"(category: {category.value}). If the user explicitly requested a model, you MUST use that exact name or report this error back—do not substitute another model.",
    ]
    return "".join(sentences)
511 |
def _build_auto_mode_required_message(self) -> str:
    """Build the prompt returned when auto mode needs an explicit model choice.

    The message lists the available models, suggests the registry's
    preferred fallback for this tool's category, and instructs the caller to
    relay any user-named model exactly.
    """
    category = self.get_model_category()
    fallback = ModelProviderRegistry.get_preferred_fallback_model(category)
    roster = self._format_available_models_list()
    tool_name = self.get_name()

    sentences = [
        "Model parameter is required in auto mode. ",
        f"Available models: {roster}. ",
        f"Suggested model for {tool_name}: '{fallback}' ",
        f"(category: {category.value}). When the user names a model, relay that exact name—never swap in another option.",
    ]
    return "".join(sentences)
525 |
def get_model_field_schema(self) -> dict[str, Any]:
    """Build the JSON schema entry for the ``model`` parameter.

    The description opens with guidance that depends on whether effective
    auto mode is active, then appends a ranked model summary and, in some
    cases, the allowlist note. Both modes share the same assembly steps and
    differ only in the opening text and the label used when no restrictions
    are active ("Top models" versus "Preferred alternatives").

    Returns:
        Dict with "type" set to "string" and the assembled "description".
    """
    from config import DEFAULT_MODEL

    # Use the centralized effective auto mode check
    if self.is_effective_auto_mode():
        intro = (
            "Currently in auto model selection mode. CRITICAL: When the user names a model, you MUST use that exact name unless the server rejects it. "
            "If no model is provided, you may use the `listmodels` tool to review options and select an appropriate match."
        )
        unrestricted_label = "Top models"
    else:
        intro = (
            f"The default model is '{DEFAULT_MODEL}'. Override only when the user explicitly requests a different model, and use that exact name. "
            "If the requested model fails validation, surface the server error instead of substituting another model. When unsure, use the `listmodels` tool for details."
        )
        unrestricted_label = "Preferred alternatives"

    summaries, total, restricted = self._get_ranked_model_summaries()
    remainder = max(0, total - len(summaries))

    pieces = [intro]
    if summaries:
        label = "Allowed models" if restricted else unrestricted_label
        ending = f"; +{remainder} more via `listmodels`." if remainder > 0 else "."
        pieces.append(f"{label}: {'; '.join(summaries)}{ending}")

    # The allowlist note is added only when the summary is truncated or absent.
    restriction_note = self._get_restriction_note()
    if restriction_note and (remainder > 0 or not summaries):
        pieces.append(f"{restriction_note}.")

    return {
        "type": "string",
        "description": " ".join(pieces),
    }
589 |
def get_default_temperature(self) -> float:
    """
    Report the temperature this tool uses by default.

    Subclasses may override this to pick a tool-specific value: lower
    settings (0.0-0.3) suit analytical work, higher settings (0.7-1.0)
    suit creative work.

    Returns:
        float: The default temperature; this base implementation returns 0.5
    """
    balanced_default = 0.5
    return balanced_default
601 |
def wants_line_numbers_by_default(self) -> bool:
    """
    Tell callers whether code files should carry line numbers by default.

    The base implementation opts every tool in so that references to code
    locations stay precise; a subclass may override this to opt out.

    Returns:
        bool: True when line numbers should be added by default
    """
    # Every tool gets line numbers unless it overrides this hook.
    line_numbers_enabled = True
    return line_numbers_enabled
613 |
def get_default_thinking_mode(self) -> str:
    """
    Report the thinking mode this tool uses by default.

    The thinking mode governs how much computational budget is spent on
    reasoning. Tools that need deeper or shallower reasoning can override
    this method.

    Returns:
        str: One of "minimal", "low", "medium", "high", "max"
    """
    # "medium" is the base default, chosen for better reasoning.
    default_mode = "medium"
    return default_mode
625 |
def get_model_category(self) -> "ToolModelCategory":
    """
    Report which model category this tool belongs to.

    The category feeds into model selection when running in auto mode.
    Override this in a subclass to request extended reasoning, fast
    responses, or balanced capabilities.

    Returns:
        ToolModelCategory: The category used to steer model selection;
        the base implementation returns the balanced category
    """
    # Imported at call time rather than at module import time.
    from tools.models import ToolModelCategory

    default_category = ToolModelCategory.BALANCED
    return default_category
640 |
@abstractmethod
def get_request_model(self):
    """
    Provide the Pydantic model class that validates this tool's requests.

    Implementations should return a model that inherits from ToolRequest
    and declares every parameter specific to the tool.

    Returns:
        Type[ToolRequest]: The request model class
    """
    ...
653 |
def validate_file_paths(self, request) -> Optional[str]:
    """
    Check that every file path carried by the request is absolute.

    A fixed set of path-bearing attributes is inspected. Attributes that are
    missing or None are ignored, and each remaining value is treated either
    as a list of paths or as a single path. Empty entries are skipped, and
    the first relative path found ends the scan.

    Args:
        request: The validated request object

    Returns:
        Optional[str]: An error message naming the first non-absolute path,
        or None when every inspected path is absolute
    """
    path_bearing_fields = (
        "absolute_file_paths",
        "file",
        "path",
        "directory",
        "notebooks",
        "test_examples",
        "style_guide_examples",
        "files_checked",
        "relevant_files",
    )

    for field_name in path_bearing_fields:
        # A missing attribute and an explicit None are both "nothing to check".
        field_value = getattr(request, field_name, None)
        if field_value is None:
            continue

        # Normalise a single path into a one-element list.
        if isinstance(field_value, list):
            candidates = field_value
        else:
            candidates = [field_value]

        offender = next(
            (entry for entry in candidates if entry and not os.path.isabs(entry)),
            None,
        )
        if offender is not None:
            return f"All file paths must be FULL absolute paths. Invalid path: '{offender}'"

    return None
695 |
def _validate_token_limit(self, content: str, content_type: str = "Content") -> None:
    """
    Reject user-provided content that is larger than the MCP prompt size limit.

    The check applies only to text crossing the MCP transport boundary
    (i.e., user input). Prompts assembled internally may be larger and are
    governed by model-specific token limits instead.

    Args:
        content: The user-originated content to validate
        content_type: Label for the content, used in log and error messages

    Raises:
        ValueError: If the content has more characters than the limit allows
    """
    label = content_type.lower()

    if not content:
        logger.debug(f"{self.name} tool {label} validation skipped (no content)")
        return

    char_count = len(content)
    # The token estimate is reported on both the pass and the fail path.
    token_estimate = estimate_tokens(content)

    if char_count <= MCP_PROMPT_SIZE_LIMIT:
        logger.debug(
            f"{self.name} tool {label} validation passed: " f"{char_count:,} characters (~{token_estimate:,} tokens)"
        )
        return

    error_msg = (
        f"{char_count:,} characters (~{token_estimate:,} tokens). "
        f"Maximum is {MCP_PROMPT_SIZE_LIMIT:,} characters."
    )
    logger.error(f"{self.name} tool {label} validation failed: {error_msg}")
    raise ValueError(f"{content_type} too large: {error_msg}")
730 |
def get_model_provider(self, model_name: str) -> ModelProvider:
    """
    Look up the provider that serves the given model.

    The lookup goes through the provider registry at call time, so it
    reflects which models are actually usable with the current API key
    configuration.

    Args:
        model_name: Name of the model to get provider for

    Returns:
        ModelProvider: The provider instance for the model

    Raises:
        ValueError: If the registry has no provider for the model
    """
    try:
        resolved = ModelProviderRegistry.get_provider_for_model(model_name)
        if resolved:
            return resolved

        logger.error(f"No provider found for model '{model_name}' in {self.name} tool")
        raise ValueError(self._build_model_unavailable_message(model_name))
    except Exception as e:
        # Any failure, including the ValueError raised above, is logged here and re-raised.
        logger.error(f"Failed to get provider for model '{model_name}' in {self.name} tool: {e}")
        raise
757 |
758 | # === CONVERSATION AND FILE HANDLING METHODS ===
759 |
def get_conversation_embedded_files(self, continuation_id: Optional[str]) -> list[str]:
    """
    List the files already embedded in a conversation thread's history.

    Tools can use the result to avoid embedding a file again when it is
    already available in the conversation context.

    Args:
        continuation_id: Thread continuation ID, or None for new conversations

    Returns:
        list[str]: File paths already embedded in the thread's history; empty
        for a new conversation or when the thread cannot be found
    """
    # Without a continuation id there is no thread to look up.
    thread_context = get_thread(continuation_id) if continuation_id else None
    if not thread_context:
        return []

    already_embedded = get_conversation_file_list(thread_context)
    logger.debug(f"[FILES] {self.name}: Found {len(already_embedded)} embedded files")
    return already_embedded
787 |
def filter_new_files(self, requested_files: list[str], continuation_id: Optional[str]) -> list[str]:
    """
    Drop requested files that the conversation history already contains.

    Files embedded in an earlier turn of the same thread are removed, which
    avoids duplicate embeddings and saves tokens. Any failure while looking
    up the history falls back to returning every requested file.

    Args:
        requested_files: List of files requested for current tool execution
        continuation_id: Thread continuation ID, or None for new conversations

    Returns:
        list[str]: Files that still need to be embedded (not already in history)
    """
    requested_count = len(requested_files)
    logger.debug(f"[FILES] {self.name}: Filtering {requested_count} requested files")

    if not continuation_id:
        # A new conversation has no history, so nothing can be filtered out.
        logger.debug(f"[FILES] {self.name}: New conversation, all {requested_count} files are new")
        return requested_files

    try:
        known = set(self.get_conversation_embedded_files(continuation_id))
        logger.debug(f"[FILES] {self.name}: Found {len(known)} embedded files in conversation")

        if not known:
            # A continuation with no recorded files may indicate a history problem;
            # be conservative and keep everything.
            logger.debug(f"{self.name} tool: No files found in conversation history for thread {continuation_id}")
            logger.debug(
                f"[FILES] {self.name}: No embedded files found, returning all {requested_count} requested files"
            )
            return requested_files

        # Split the request into not-yet-embedded and already-embedded files in one pass.
        fresh: list[str] = []
        already: list[str] = []
        for candidate in requested_files:
            (already if candidate in known else fresh).append(candidate)

        logger.debug(
            f"[FILES] {self.name}: After filtering: {len(fresh)} new files, {len(already)} already embedded"
        )
        logger.debug(f"[FILES] {self.name}: New files to embed: {fresh}")

        if already:
            logger.debug(
                f"{self.name} tool: Filtering {len(already)} files already in conversation history: {', '.join(already)}"
            )
            logger.debug(f"[FILES] {self.name}: Skipped (already embedded): {already}")

        return fresh

    except Exception as e:
        # Prefer embedding too much over losing access to a file the tool needs.
        logger.warning(f"{self.name} tool: Error checking conversation history for {continuation_id}: {e}")
        logger.warning(f"{self.name} tool: Including all requested files as fallback")
        logger.debug(
            f"[FILES] {self.name}: Exception in filter_new_files, returning all {len(requested_files)} files as fallback"
        )
        return requested_files
850 |
def format_conversation_turn(self, turn: ConversationTurn) -> list[str]:
    """
    Render one conversation turn as lines for the conversation history.

    Subclasses may override this to add custom sections, formatting, or
    metadata, while keeping the standard structure that other tools rely on.
    The docstring of the original implementation states that
    build_conversation_history calls this while reconstructing context.

    Args:
        turn: The conversation turn to format (from utils.conversation_memory)

    Returns:
        list[str]: Lines of formatted content for this turn

    Example:
        For a turn with files, the default implementation returns:
        ["Files used in this turn: file1.py, file2.py", "", "Response content..."]
    """
    # Optional header naming the files, followed by a blank separator line.
    header = [f"Files used in this turn: {', '.join(turn.files)}", ""] if turn.files else []

    # The turn's content always comes last.
    return [*header, turn.content]
885 |
def handle_prompt_file(self, files: Optional[list[str]]) -> tuple[Optional[str], Optional[list[str]]]:
    """
    Pull a prompt.txt entry out of the file list and return its text.

    A path whose basename is exactly "prompt.txt" is treated as the main
    prompt and not as an embedded file: its content is read and the path is
    removed from the returned list. This lets the CLI pass a large prompt as
    a file and so work around MCP's ~25K token limit.

    Reading is best-effort. If prompt.txt cannot be read or parsed, the
    failure is logged as a warning and the path is still removed from the
    list. If several prompt.txt paths are present, the last one processed
    decides the returned prompt.

    Args:
        files: List of absolute file paths (will be translated for current environment)

    Returns:
        tuple: (prompt_content, updated_files_list). prompt_content is None
        when no prompt.txt was read. updated_files_list is None when no other
        files remain, or the input itself when the input was empty or None.
    """
    if not files:
        return None, files

    prompt_content = None
    updated_files = []

    for file_path in files:
        # Exact basename match only, so "myprompt.txt" and "prompt.txt.bak" stay regular files.
        if os.path.basename(file_path) != "prompt.txt":
            # Keep the original path in the files list (will be translated later by read_files)
            updated_files.append(file_path)
            continue

        try:
            content, _ = read_file_content(file_path)

            if "--- BEGIN FILE:" in content and "--- END FILE:" in content:
                # Keep only the lines between the first BEGIN marker and the next END marker.
                in_content = False
                content_lines = []
                for line in content.split("\n"):
                    if line.startswith("--- BEGIN FILE:"):
                        in_content = True
                        continue
                    if line.startswith("--- END FILE:"):
                        break
                    if in_content:
                        content_lines.append(line)
                prompt_content = "\n".join(content_lines)
            elif not content.startswith("\n--- ERROR"):
                # Raw content without markers (from tests or direct input) is used as-is.
                prompt_content = content
            else:
                # An error marker was returned in place of the file's text.
                prompt_content = None
        except Exception as exc:
            # Stay best-effort, but leave a trace: a silently dropped prompt is hard to diagnose.
            logger.warning(f"{self.name} tool: Failed to read prompt file '{file_path}': {type(exc).__name__}: {exc}")

    return prompt_content, updated_files if updated_files else None
947 |
def get_prompt_content_for_size_validation(self, user_content: str) -> str:
    """
    Choose which content is measured against the MCP prompt size limit.

    This is an override hook. The base implementation hands back the user
    content untouched; a tool can override it, for example to leave
    conversation history out of the size check.

    Args:
        user_content: The user content that would normally be validated

    Returns:
        The content that should actually be validated for size limits
    """
    # Base behaviour: the full user content is what gets validated.
    content_to_validate = user_content
    return content_to_validate
964 |
def check_prompt_size(self, text: str) -> Optional[dict[str, Any]]:
    """
    Decide whether USER INPUT text is too large for the MCP transport boundary.

    IMPORTANT: Use this only for user input that crosses the CLI ↔ MCP Server
    transport boundary. It must NOT be used to limit internal MCP Server
    operations.

    Args:
        text: The user input text to check (NOT internal prompt content)

    Returns:
        Optional[Dict[str, Any]]: A "resend_prompt" response asking the caller to
        pass the prompt as a file when the text is too long, None otherwise
    """
    # Empty input and input within the limit need no special handling.
    if not text or len(text) <= MCP_PROMPT_SIZE_LIMIT:
        return None

    resend_instructions = (
        f"MANDATORY ACTION REQUIRED: The prompt is too large for MCP's token limits (>{MCP_PROMPT_SIZE_LIMIT:,} characters). "
        "YOU MUST IMMEDIATELY save the prompt text to a temporary file named 'prompt.txt' in the working directory. "
        "DO NOT attempt to shorten or modify the prompt. SAVE IT AS-IS to 'prompt.txt'. "
        "Then resend the request, passing the absolute file path to 'prompt.txt' as part of the tool call, "
        "along with any other files you wish to share as context. Leave the prompt text itself empty or very brief in the new request. "
        "This is the ONLY way to handle large prompts - you MUST follow these exact steps."
    )
    size_metadata = {
        "prompt_size": len(text),
        "limit": MCP_PROMPT_SIZE_LIMIT,
        "instructions": "MANDATORY: Save prompt to 'prompt.txt' in current folder and provide full path when recalling this tool.",
    }
    return {
        "status": "resend_prompt",
        "content": resend_instructions,
        "content_type": "text",
        "metadata": size_metadata,
    }
998 |
def _prepare_file_content_for_prompt(
    self,
    request_files: list[str],
    continuation_id: Optional[str],
    context_description: str = "New files",
    max_tokens: Optional[int] = None,
    reserve_tokens: int = 1_000,
    remaining_budget: Optional[int] = None,
    arguments: Optional[dict] = None,
    model_context: Optional[Any] = None,
) -> tuple[str, list[str]]:
    """
    Centralized file processing implementing dual prioritization strategy.

    This method is the heart of conversation-aware file processing across all tools.

    The token budget is chosen in this order: an explicit or argument-supplied
    remaining budget, then `max_tokens`, then the allocation computed from the
    model context. Whatever the source, `reserve_tokens` is subtracted and the
    result is raised to at least 1000. Files already embedded in the thread's
    history are not read again; they are listed in a trailing note instead.

    Args:
        request_files: List of files requested for current tool execution
        continuation_id: Thread continuation ID, or None for new conversations
        context_description: Description for token limit validation (e.g. "Code", "New files");
            not referenced in the body of this method
        max_tokens: Maximum tokens to use (defaults to remaining budget or model-specific content allocation)
        reserve_tokens: Tokens to reserve for additional prompt content (default 1K)
        remaining_budget: Remaining token budget after conversation history (from server.py)
        arguments: Original tool arguments (used to extract _remaining_tokens if available)
        model_context: Model context object with all model information including token allocation

    Returns:
        tuple[str, list[str]]: (formatted_file_content, actually_processed_files)
            - formatted_file_content: Formatted file content string ready for prompt inclusion
            - actually_processed_files: List of individual file paths that were actually read and embedded
              (directories are expanded to individual files)

    Raises:
        RuntimeError: If a model context is needed to size the budget but none was
            passed in or stored on the instance as `_model_context`
        Exception: Any error raised while expanding or reading the files is logged
            and re-raised unchanged
    """
    if not request_files:
        return "", []

    # Extract remaining budget from arguments if available
    if remaining_budget is None:
        # Use provided arguments or fall back to stored arguments from execute()
        args_to_use = arguments or getattr(self, "_current_arguments", {})
        remaining_budget = args_to_use.get("_remaining_tokens")

    # Use remaining budget if provided, otherwise fall back to max_tokens or model-specific default
    if remaining_budget is not None:
        effective_max_tokens = remaining_budget - reserve_tokens
    elif max_tokens is not None:
        effective_max_tokens = max_tokens - reserve_tokens
    else:
        # Use model_context for token allocation
        if not model_context:
            # Try to get from stored attributes as fallback
            model_context = getattr(self, "_model_context", None)
            if not model_context:
                logger.error(
                    f"[FILES] {self.name}: _prepare_file_content_for_prompt called without model_context. "
                    "This indicates an incorrect call sequence in the tool's implementation."
                )
                raise RuntimeError("Model context not provided for file preparation.")

        # This is now the single source of truth for token allocation.
        try:
            token_allocation = model_context.calculate_token_allocation()
            # Standardize on `file_tokens` for consistency and correctness.
            effective_max_tokens = token_allocation.file_tokens - reserve_tokens
            logger.debug(
                f"[FILES] {self.name}: Using model context for {model_context.model_name}: "
                f"{token_allocation.file_tokens:,} file tokens from {token_allocation.total_tokens:,} total"
            )
        except Exception as e:
            logger.error(
                f"[FILES] {self.name}: Failed to calculate token allocation from model context: {e}", exc_info=True
            )
            # If the context exists but calculation fails, we still need to prevent a crash.
            # A loud error is logged, and we fall back to a safe default.
            effective_max_tokens = 100_000 - reserve_tokens

    # Ensure we have a reasonable minimum budget
    # (this also covers a negative result when reserve_tokens exceeds the available budget)
    effective_max_tokens = max(1000, effective_max_tokens)

    # Drop files that the conversation history already contains
    files_to_embed = self.filter_new_files(request_files, continuation_id)
    logger.debug(f"[FILES] {self.name}: Will embed {len(files_to_embed)} files after filtering")

    # Log the specific files for debugging/testing
    if files_to_embed:
        logger.info(
            f"[FILE_PROCESSING] {self.name} tool will embed new files: {', '.join([os.path.basename(f) for f in files_to_embed])}"
        )
    else:
        logger.info(
            f"[FILE_PROCESSING] {self.name} tool: No new files to embed (all files already in conversation history)"
        )

    content_parts = []
    actually_processed_files = []

    # Read content of new files only
    if files_to_embed:
        logger.debug(f"{self.name} tool embedding {len(files_to_embed)} new files: {', '.join(files_to_embed)}")
        logger.debug(
            f"[FILES] {self.name}: Starting file embedding with token budget {effective_max_tokens + reserve_tokens:,}"
        )
        try:
            # Before calling read_files, expand directories to get individual file paths
            from utils.file_utils import expand_paths

            expanded_files = expand_paths(files_to_embed)
            logger.debug(
                f"[FILES] {self.name}: Expanded {len(files_to_embed)} paths to {len(expanded_files)} individual files"
            )

            # read_files receives the unexpanded paths; the reserve is added back to
            # max_tokens here and passed separately as reserve_tokens.
            file_content = read_files(
                files_to_embed,
                max_tokens=effective_max_tokens + reserve_tokens,
                reserve_tokens=reserve_tokens,
                include_line_numbers=self.wants_line_numbers_by_default(),
            )
            # Note: No need to validate against MCP_PROMPT_SIZE_LIMIT here
            # read_files already handles token-aware truncation based on model's capabilities
            content_parts.append(file_content)

            # Track the expanded files as actually processed
            # NOTE(review): this records every expanded path, including any that read_files
            # may have dropped for budget reasons - confirm this is intended.
            actually_processed_files.extend(expanded_files)

            # Estimate tokens for debug logging
            from utils.token_utils import estimate_tokens

            content_tokens = estimate_tokens(file_content)
            logger.debug(
                f"{self.name} tool successfully embedded {len(files_to_embed)} files ({content_tokens:,} tokens)"
            )
            logger.debug(f"[FILES] {self.name}: Successfully embedded files - {content_tokens:,} tokens used")
            logger.debug(
                f"[FILES] {self.name}: Actually processed {len(actually_processed_files)} individual files"
            )
        except Exception as e:
            logger.error(f"{self.name} tool failed to embed files {files_to_embed}: {type(e).__name__}: {e}")
            logger.debug(f"[FILES] {self.name}: File embedding failed - {type(e).__name__}: {e}")
            raise
    else:
        logger.debug(f"[FILES] {self.name}: No files to embed after filtering")

    # Generate note about files already in conversation history
    if continuation_id and len(files_to_embed) < len(request_files):
        embedded_files = self.get_conversation_embedded_files(continuation_id)
        skipped_files = [f for f in request_files if f in embedded_files]
        if skipped_files:
            logger.debug(
                f"{self.name} tool skipping {len(skipped_files)} files already in conversation history: {', '.join(skipped_files)}"
            )
            logger.debug(f"[FILES] {self.name}: Adding note about {len(skipped_files)} skipped files")
            # Separate the note from any file content that precedes it
            if content_parts:
                content_parts.append("\n\n")
            note_lines = [
                "--- NOTE: Additional files referenced in conversation history ---",
                "The following files are already available in our conversation context:",
                "\n".join(f" - {f}" for f in skipped_files),
                "--- END NOTE ---",
            ]
            content_parts.append("\n".join(note_lines))
        else:
            logger.debug(f"[FILES] {self.name}: No skipped files to note")

    result = "".join(content_parts) if content_parts else ""
    logger.debug(
        f"[FILES] {self.name}: _prepare_file_content_for_prompt returning {len(result)} chars, {len(actually_processed_files)} processed files"
    )
    return result, actually_processed_files
1165 |
def get_websearch_instruction(self, tool_specific: Optional[str] = None) -> str:
    """
    Build the standard web-search instruction appended to prompts.

    The text always opens with the shared description of the web-search
    capability. After that comes either the caller's tool-specific guidance
    or a default list of topics worth searching for, and then a closing
    reminder to be specific.

    Args:
        tool_specific: Optional tool-specific search guidance

    Returns:
        str: Web search instruction to append to prompt
    """

    base_instruction = """

WEB SEARCH CAPABILITY: You can request the calling agent to perform web searches to enhance your analysis with current information!

IMPORTANT: When you identify areas where web searches would significantly improve your response (such as checking current documentation, finding recent solutions, verifying best practices, or gathering community insights), you MUST explicitly instruct the agent to perform specific web searches and then respond back using the continuation_id from this response to continue the analysis.

Use clear, direct language based on the value of the search:

For valuable supplementary information: "Please perform a web search on '[specific topic/query]' and then continue this analysis using the continuation_id from this response if you find relevant information."

For important missing information: "Please search for '[specific topic/query]' and respond back with the findings using the continuation_id from this response - this information is needed to provide a complete analysis."

For critical/essential information: "SEARCH REQUIRED: Please immediately perform a web search on '[specific topic/query]' and respond back with the results using the continuation_id from this response. Cannot provide accurate analysis without this current information."

This ensures you get the most current and comprehensive information while maintaining conversation context through the continuation_id."""

    # Closing sentence shared by both variants.
    closing = (
        "When recommending searches, be specific about what information you need "
        "and why it would improve your analysis."
    )

    if tool_specific:
        return f"{base_instruction}\n\n{tool_specific}\n\n{closing}"

    # Default instruction for all tools
    default_topics = (
        "Current documentation and API references",
        "Recent best practices and patterns",
        "Known issues and community solutions",
        "Framework updates and compatibility",
        "Security advisories and patches",
        "Performance benchmarks and optimizations",
    )
    bullet_list = "\n".join(f"- {topic}" for topic in default_topics)
    continuation_reminder = (
        "Always remember to instruct agent to use the continuation_id from this response "
        "when providing search results."
    )
    return (
        f"{base_instruction}\n\n"
        f"Consider requesting searches for:\n{bullet_list}\n\n"
        f"{closing} {continuation_reminder}"
    )
1212 |
def get_language_instruction(self) -> str:
    """
    Build the response-language instruction from the LOCALE setting.

    LOCALE is read through the environment helper on every call, so changes
    made at runtime are picked up. Tests can monkeypatch it via that helper
    (or via .env when override is enforced).

    Returns:
        str: Language instruction to prepend to prompt, or empty string if
        no locale set
    """
    configured = get_env("LOCALE", "")
    # Treat a missing value and a whitespace-only value as "no locale".
    locale = (configured or "").strip()

    return f"Always respond in {locale}.\n\n" if locale else ""
1231 |
1232 | # === ABSTRACT METHODS FOR SIMPLE TOOLS ===
1233 |
@abstractmethod
async def prepare_prompt(self, request) -> str:
    """
    Assemble the complete prompt that will be sent to the AI model.

    Implementations should build the full prompt by combining:
    - System prompt from get_system_prompt()
    - File content from _prepare_file_content_for_prompt()
    - Conversation history from reconstruct_thread_context()
    - User's request and any tool-specific context

    Args:
        request: The validated request object

    Returns:
        str: Complete prompt ready for the AI model
    """
    ...
1252 |
def format_response(self, response: str, request, model_info: dict = None) -> str:
    """
    Post-process the AI model's response before it is returned to the user.

    This hook lets a tool add structure, validation, or extra context to the
    model output. The base implementation is a pass-through: `request` and
    `model_info` are accepted but not used.

    Args:
        response: Raw response from the AI model
        request: The original request object
        model_info: Optional model information and metadata

    Returns:
        str: Formatted response ready for the user
    """
    # Base behaviour: hand the model output back untouched.
    formatted = response
    return formatted
1272 |
1273 | # === IMPLEMENTATION METHODS ===
1274 | # These will be provided in a full implementation but are inherited from current base.py
1275 | # for now to maintain compatibility.
1276 |
async def execute(self, arguments: dict[str, Any]) -> list[TextContent]:
    """Execute the tool - will be inherited from existing base.py for now.

    The base version is a placeholder that always raises.

    Args:
        arguments: Dictionary of arguments from the MCP client

    Raises:
        NotImplementedError: Always; subclasses must supply the implementation
    """
    # The real implementation is meant to come from the current base.py,
    # kept for backward compatibility during the migration.
    message = "Subclasses must implement execute method"
    raise NotImplementedError(message)
1282 |
1283 | def _should_require_model_selection(self, model_name: str) -> bool:
1284 | """
1285 | Check if we should require the CLI to select a model at runtime.
1286 |
1287 | This is called during request execution to determine if we need
1288 | to return an error asking the CLI to provide a model parameter.
1289 |
1290 | Args:
1291 | model_name: The model name from the request or DEFAULT_MODEL
1292 |
1293 | Returns:
1294 | bool: True if we should require model selection
1295 | """
1296 | # Case 1: Model is explicitly "auto"
1297 | if model_name.lower() == "auto":
1298 | return True
1299 |
1300 | # Case 2: Requested model is not available
1301 | from providers.registry import ModelProviderRegistry
1302 |
1303 | provider = ModelProviderRegistry.get_provider_for_model(model_name)
1304 | if not provider:
1305 | logger.warning(f"Model '{model_name}' is not available with current API keys. Requiring model selection.")
1306 | return True
1307 |
1308 | return False
1309 |
def _get_available_models(self) -> list[str]:
    """
    Get list of models available from enabled providers.

    Only returns models from providers that have valid API keys configured.
    This fixes the namespace collision bug where models from disabled providers
    were shown to the CLI, causing routing conflicts.

    The result starts with the names reported by the provider registry. The
    OpenRouter aliases follow when OPENROUTER_API_KEY is set to something
    other than the placeholder value, and then the custom-endpoint aliases
    when CUSTOM_API_URL is set. Duplicates are dropped and first-seen order
    is kept. If an alias registry fails, the failure is logged at debug level
    and the aliases collected up to that point are kept.

    Returns:
        List of model names from enabled providers only; always a new list,
        so the list returned by the registry is never modified
    """
    from providers.registry import ModelProviderRegistry

    # A dict keeps insertion order, so it serves as an ordered set: one pass gives
    # O(1) membership checks and de-duplication, and copying the names here avoids
    # appending to the list object the registry returned.
    ordered: dict[str, None] = dict.fromkeys(ModelProviderRegistry.get_available_model_names())

    # Add OpenRouter models and their aliases when OpenRouter is configured
    openrouter_key = get_env("OPENROUTER_API_KEY")
    if openrouter_key and openrouter_key != "your_openrouter_api_key_here":
        try:
            for alias in self._get_openrouter_registry().list_aliases():
                ordered.setdefault(alias, None)
        except Exception as exc:  # pragma: no cover - logged for observability
            # Use the module logger, consistent with the rest of this class.
            logger.debug(f"Failed to add OpenRouter models to enum: {exc}")

    # Add custom models (and their aliases) when a custom endpoint is available
    if get_env("CUSTOM_API_URL"):
        try:
            for alias in self._get_custom_registry().list_aliases():
                ordered.setdefault(alias, None)
        except Exception as exc:  # pragma: no cover - logged for observability
            logger.debug(f"Failed to add custom models to enum: {exc}")

    return list(ordered)
1362 |
def _resolve_model_context(self, arguments: dict, request) -> tuple[str, Any]:
    """
    Resolve the model name and model context for a request.

    Kept separate from execute() so that tools overriding execute() can reuse the
    same resolution logic.

    Resolution order:
    1. If ``arguments`` carries both ``_model_context`` and ``_resolved_model_name``,
       those pre-resolved values are returned unchanged.
    2. Otherwise the model name is taken from ``request.model`` (or ``DEFAULT_MODEL``
       when that is missing/empty), checked with ``_should_require_model_selection``,
       and wrapped in a new ``ModelContext``.

    Args:
        arguments: Dictionary of arguments from the MCP client
        request: The validated request object

    Returns:
        tuple[str, ModelContext]: (resolved_model_name, model_context)

    Raises:
        ValueError: If model selection is required (auto mode or unavailable model)
    """
    pre_resolved_context = arguments.get("_model_context")
    pre_resolved_name = arguments.get("_resolved_model_name")

    # Fast path: both values were supplied by the caller
    if pre_resolved_context and pre_resolved_name:
        model_name = pre_resolved_name
        logger.debug(f"Using pre-resolved model '{model_name}' from MCP boundary")
        return model_name, pre_resolved_context

    # Fallback path for direct execute calls
    model_name = getattr(request, "model", None)
    if not model_name:
        from config import DEFAULT_MODEL

        model_name = DEFAULT_MODEL
    logger.debug(f"Using fallback model resolution for '{model_name}' (test mode)")

    if self._should_require_model_selection(model_name):
        # The message depends on why selection is required
        if model_name.lower() == "auto":
            raise ValueError(self._build_auto_mode_required_message())
        raise ValueError(self._build_model_unavailable_message(model_name))

    from utils.model_context import ModelContext

    return model_name, ModelContext(model_name)
1413 |
def validate_and_correct_temperature(self, temperature: float, model_context: Any) -> tuple[float, list[str]]:
    """
    Validate and correct temperature for the specified model.

    The constraint is read from ``model_context.capabilities.temperature_constraint``.
    When the constraint rejects the value, the constraint's corrected value is
    returned together with a single warning message describing the change.

    This method never raises: if anything goes wrong while validating (including a
    missing or incomplete ``model_context``), the original temperature is returned
    with a warning message and the failure is logged.

    Args:
        temperature: Temperature value to validate
        model_context: Model context object exposing ``capabilities`` and ``model_name``

    Returns:
        Tuple of (corrected_temperature, warning_messages)
    """
    try:
        constraint = model_context.capabilities.temperature_constraint

        if constraint.validate(temperature):
            return temperature, []

        corrected = constraint.get_corrected_value(temperature)
        warning = (
            f"Temperature {temperature} invalid for {model_context.model_name}. "
            f"{constraint.get_description()}. Using {corrected} instead."
        )
        return corrected, [warning]

    except Exception as e:
        # Look the name up defensively: the failure may be that model_context is
        # None or has no model_name, and this handler must not raise itself.
        model_name = getattr(model_context, "model_name", "unknown")
        logger.warning(f"Temperature validation failed for {model_name}: {e}")
        return temperature, [f"Temperature validation failed: {e}"]
1451 |
1452 | def _validate_image_limits(
1453 | self, images: Optional[list[str]], model_context: Optional[Any] = None, continuation_id: Optional[str] = None
1454 | ) -> Optional[dict]:
1455 | """
1456 | Validate image size and count against model capabilities.
1457 |
1458 | This performs strict validation to ensure we don't exceed model-specific
1459 | image limits. Uses capability-based validation with actual model
1460 | configuration rather than hard-coded limits.
1461 |
1462 | Args:
1463 | images: List of image paths/data URLs to validate
1464 | model_context: Model context object containing model name, provider, and capabilities
1465 | continuation_id: Optional continuation ID for conversation context
1466 |
1467 | Returns:
1468 | Optional[dict]: Error response if validation fails, None if valid
1469 | """
1470 | if not images:
1471 | return None
1472 |
1473 | # Import here to avoid circular imports
1474 | import base64
1475 | from pathlib import Path
1476 |
1477 | if not model_context:
1478 | # Get from tool's stored context as fallback
1479 | model_context = getattr(self, "_model_context", None)
1480 | if not model_context:
1481 | logger.warning("No model context available for image validation")
1482 | return None
1483 |
1484 | try:
1485 | # Use model context capabilities directly - clean OOP approach
1486 | capabilities = model_context.capabilities
1487 | model_name = model_context.model_name
1488 | except Exception as e:
1489 | logger.warning(f"Failed to get capabilities from model_context for image validation: {e}")
1490 | # Generic error response when capabilities cannot be accessed
1491 | model_name = getattr(model_context, "model_name", "unknown")
1492 | return {
1493 | "status": "error",
1494 | "content": self._build_model_unavailable_message(model_name),
1495 | "content_type": "text",
1496 | "metadata": {
1497 | "error_type": "validation_error",
1498 | "model_name": model_name,
1499 | "supports_images": None, # Unknown since model capabilities unavailable
1500 | "image_count": len(images) if images else 0,
1501 | },
1502 | }
1503 |
1504 | # Check if model supports images
1505 | if not capabilities.supports_images:
1506 | return {
1507 | "status": "error",
1508 | "content": (
1509 | f"Image support not available: Model '{model_name}' does not support image processing. "
1510 | f"Please use a vision-capable model such as 'gemini-2.5-flash', 'o3', "
1511 | f"or 'claude-opus-4.1' for image analysis tasks."
1512 | ),
1513 | "content_type": "text",
1514 | "metadata": {
1515 | "error_type": "validation_error",
1516 | "model_name": model_name,
1517 | "supports_images": False,
1518 | "image_count": len(images),
1519 | },
1520 | }
1521 |
1522 | # Get model image limits from capabilities
1523 | max_images = 5 # Default max number of images
1524 | max_size_mb = capabilities.max_image_size_mb
1525 |
1526 | # Check image count
1527 | if len(images) > max_images:
1528 | return {
1529 | "status": "error",
1530 | "content": (
1531 | f"Too many images: Model '{model_name}' supports a maximum of {max_images} images, "
1532 | f"but {len(images)} were provided. Please reduce the number of images."
1533 | ),
1534 | "content_type": "text",
1535 | "metadata": {
1536 | "error_type": "validation_error",
1537 | "model_name": model_name,
1538 | "image_count": len(images),
1539 | "max_images": max_images,
1540 | },
1541 | }
1542 |
1543 | # Calculate total size of all images
1544 | total_size_mb = 0.0
1545 | for image_path in images:
1546 | try:
1547 | if image_path.startswith("data:image/"):
1548 | # Handle data URL: data:image/png;base64,iVBORw0...
1549 | _, data = image_path.split(",", 1)
1550 | # Base64 encoding increases size by ~33%, so decode to get actual size
1551 | actual_size = len(base64.b64decode(data))
1552 | total_size_mb += actual_size / (1024 * 1024)
1553 | else:
1554 | # Handle file path
1555 | path = Path(image_path)
1556 | if path.exists():
1557 | file_size = path.stat().st_size
1558 | total_size_mb += file_size / (1024 * 1024)
1559 | else:
1560 | logger.warning(f"Image file not found: {image_path}")
1561 | # Assume a reasonable size for missing files to avoid breaking validation
1562 | total_size_mb += 1.0 # 1MB assumption
1563 | except Exception as e:
1564 | logger.warning(f"Failed to get size for image {image_path}: {e}")
1565 | # Assume a reasonable size for problematic files
1566 | total_size_mb += 1.0 # 1MB assumption
1567 |
1568 | # Apply 40MB cap for custom models if needed
1569 | effective_limit_mb = max_size_mb
1570 | try:
1571 | from providers.shared import ProviderType
1572 |
1573 | # ModelCapabilities dataclass has provider field defined
1574 | if capabilities.provider == ProviderType.CUSTOM:
1575 | effective_limit_mb = min(max_size_mb, 40.0)
1576 | except Exception:
1577 | pass
1578 |
1579 | # Validate against size limit
1580 | if total_size_mb > effective_limit_mb:
1581 | return {
1582 | "status": "error",
1583 | "content": (
1584 | f"Image size limit exceeded: Model '{model_name}' supports maximum {effective_limit_mb:.1f}MB "
1585 | f"for all images combined, but {total_size_mb:.1f}MB was provided. "
1586 | f"Please reduce image sizes or count and try again."
1587 | ),
1588 | "content_type": "text",
1589 | "metadata": {
1590 | "error_type": "validation_error",
1591 | "model_name": model_name,
1592 | "total_size_mb": round(total_size_mb, 2),
1593 | "limit_mb": round(effective_limit_mb, 2),
1594 | "image_count": len(images),
1595 | "supports_images": True,
1596 | },
1597 | }
1598 |
1599 | # All validations passed
1600 | logger.debug(f"Image validation passed: {len(images)} images, {total_size_mb:.1f}MB total")
1601 | return None
1602 |
1603 | def _parse_response(self, raw_text: str, request, model_info: Optional[dict] = None):
1604 | """Parse response - will be inherited for now."""
1605 | # Implementation inherited from current base.py
1606 | raise NotImplementedError("Subclasses must implement _parse_response method")
1607 |
```
--------------------------------------------------------------------------------
/tools/workflow/workflow_mixin.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Workflow Mixin for Zen MCP Tools
3 |
4 | This module provides a sophisticated workflow-based pattern that enables tools to
5 | perform multi-step work with structured findings and expert analysis.
6 |
7 | Key Components:
8 | - BaseWorkflowMixin: Abstract base class providing comprehensive workflow functionality
9 |
10 | The workflow pattern enables tools like debug, precommit, and codereview to perform
11 | systematic multi-step work with pause/resume capabilities, context-aware file embedding,
12 | and seamless integration with external AI models for expert analysis.
13 |
14 | Features:
15 | - Multi-step workflow orchestration with pause/resume
16 | - Context-aware file embedding optimization
17 | - Expert analysis integration with token budgeting
18 | - Conversation memory and threading support
19 | - Proper inheritance-based architecture (no hasattr/getattr)
20 | - Comprehensive type annotations for IDE support
21 | """
22 |
23 | import json
24 | import logging
25 | import os
26 | import re
27 | from abc import ABC, abstractmethod
28 | from typing import Any, Optional
29 |
30 | from mcp.types import TextContent
31 |
32 | from config import MCP_PROMPT_SIZE_LIMIT
33 | from utils.conversation_memory import add_turn, create_thread
34 |
35 | from ..shared.base_models import ConsolidatedFindings
36 | from ..shared.exceptions import ToolExecutionError
37 |
38 | logger = logging.getLogger(__name__)
39 |
40 |
41 | class BaseWorkflowMixin(ABC):
42 | """
43 | Abstract base class providing guided workflow functionality for tools.
44 |
45 | This class implements a sophisticated workflow pattern where the CLI performs
46 | systematic local work before calling external models for expert analysis.
47 | Tools can inherit from this class to gain comprehensive workflow capabilities.
48 |
49 | Architecture:
50 | - Uses proper inheritance patterns instead of hasattr/getattr
51 | - Provides hook methods with default implementations
52 | - Requires abstract methods to be implemented by subclasses
53 | - Fully type-annotated for excellent IDE support
54 |
55 | Context-Aware File Embedding:
56 | - Intermediate steps: Only reference file names (saves the CLI's context)
57 | - Final steps: Embed full file content for expert analysis
58 | - Integrates with existing token budgeting infrastructure
59 |
60 | Requirements:
61 | This class expects to be used with BaseTool and requires implementation of:
62 | - get_model_provider(model_name)
63 | - _resolve_model_context(arguments, request)
64 | - get_system_prompt()
65 | - get_default_temperature()
66 | - _prepare_file_content_for_prompt()
67 | """
68 |
def __init__(self) -> None:
    """Run the next initializer in the MRO, then set up empty per-instance workflow state."""
    super().__init__()
    # Step records accumulated by the workflow; starts empty
    self.work_history: list[dict[str, Any]] = []
    # Aggregated findings container; starts as a fresh, default-constructed instance
    self.consolidated_findings: ConsolidatedFindings = ConsolidatedFindings()
    # Not known at construction time
    self.initial_request: Optional[str] = None
74 |
75 | # ================================================================================
76 | # Abstract Methods - Required Implementation by BaseTool or Subclasses
77 | # ================================================================================
78 |
@abstractmethod
def get_name(self) -> str:
    """Name of this tool (abstract; usually supplied by BaseTool)."""
83 |
@abstractmethod
def get_workflow_request_model(self) -> type:
    """Request model class used by this workflow tool (abstract)."""
88 |
@abstractmethod
def get_system_prompt(self) -> str:
    """System prompt for this tool (abstract; usually supplied by BaseTool)."""
93 |
@abstractmethod
def get_language_instruction(self) -> str:
    """Language instruction used for localization (abstract; usually supplied by BaseTool)."""
98 |
@abstractmethod
def get_default_temperature(self) -> float:
    """Default temperature for this tool (abstract; usually supplied by BaseTool)."""
103 |
@abstractmethod
def get_model_provider(self, model_name: str) -> Any:
    """Model provider for ``model_name`` (abstract; usually supplied by BaseTool)."""
108 |
109 | @abstractmethod
110 | def _resolve_model_context(self, arguments: dict[str, Any], request: Any) -> tuple[str, Any]:
111 | """Resolve model context from arguments. Usually provided by BaseTool."""
112 | pass
113 |
114 | @abstractmethod
115 | def _prepare_file_content_for_prompt(
116 | self,
117 | request_files: list[str],
118 | continuation_id: Optional[str],
119 | context_description: str = "New files",
120 | max_tokens: Optional[int] = None,
121 | reserve_tokens: int = 1_000,
122 | remaining_budget: Optional[int] = None,
123 | arguments: Optional[dict[str, Any]] = None,
124 | model_context: Optional[Any] = None,
125 | ) -> tuple[str, list[str]]:
126 | """Prepare file content for prompts. Usually provided by BaseTool."""
127 | pass
128 |
129 | # ================================================================================
130 | # Abstract Methods - Tool-Specific Implementation Required
131 | # ================================================================================
132 |
@abstractmethod
def get_work_steps(self, request: Any) -> list[str]:
    """Tool-specific work steps and criteria (abstract; each tool defines its own)."""
137 |
@abstractmethod
def get_required_actions(
    self, step_number: int, confidence: str, findings: str, total_steps: int, request=None
) -> list[str]:
    """List the actions required in the current work phase (abstract).

    Args:
        step_number: Current step (1-based)
        confidence: Current confidence level (exploring, low, medium, high, certain)
        findings: Current findings text
        total_steps: Total estimated steps for this work
        request: Optional request object for continuation-aware decisions

    Returns:
        Specific actions the CLI should take before calling the tool again
    """
155 |
156 | # ================================================================================
157 | # Hook Methods - Default Implementations with Override Capability
158 | # ================================================================================
159 |
def should_call_expert_analysis(self, consolidated_findings: ConsolidatedFindings, request=None) -> bool:
    """
    Decide whether the external model should be called.

    Returns False when the tool does not support expert analysis or when a request
    is given and it opts out of the assistant model. Otherwise returns True as soon
    as any of these holds: at least one relevant file, at least two findings, or at
    least one issue found. Override for tool-specific criteria.

    Args:
        consolidated_findings: Findings from workflow steps
        request: Current request object (optional for backwards compatibility)
    """
    if not self.requires_expert_analysis():
        return False

    # Respect a request that asks to skip the assistant model
    if request and not self.get_request_use_assistant_model(request):
        return False

    # Default criteria, evaluated in order and stopping at the first match
    if len(consolidated_findings.relevant_files) > 0:
        return True
    if len(consolidated_findings.findings) >= 2:
        return True
    return len(consolidated_findings.issues_found) > 0
184 |
def prepare_expert_analysis_context(self, consolidated_findings: ConsolidatedFindings) -> str:
    """
    Build the text context handed to the external model.

    Returns an empty string when the tool does not support expert analysis.
    Otherwise returns a newline-joined summary header (counts of findings, files
    checked and relevant files) followed by every finding in order. Override for
    tool-specific context.
    """
    if not self.requires_expert_analysis():
        return ""

    summary_lines = [
        f"=== {self.get_name().upper()} WORK SUMMARY ===",
        f"Total steps: {len(consolidated_findings.findings)}",
        f"Files examined: {len(consolidated_findings.files_checked)}",
        f"Relevant files: {len(consolidated_findings.relevant_files)}",
        "",
        "=== WORK PROGRESSION ===",
    ]
    summary_lines.extend(consolidated_findings.findings)

    return "\n".join(summary_lines)
209 |
def requires_expert_analysis(self) -> bool:
    """
    Report whether this tool supports expert analysis.

    The default is True. Override and return False to disable expert analysis
    entirely for a self-contained tool (like planner).
    """
    return True
218 |
def should_include_files_in_expert_prompt(self) -> bool:
    """
    Report whether file content belongs in the expert analysis prompt.

    The default is False; override and return True if the tool needs files in the prompt.
    """
    return False
225 |
def should_embed_system_prompt(self) -> bool:
    """
    Report whether the system prompt should be embedded in the main prompt.

    The default is False; override and return True if the tool needs it embedded.
    """
    return False
232 |
def get_expert_thinking_mode(self) -> str:
    """
    Thinking mode used for expert analysis.

    The default is "high"; override to customize.
    """
    return "high"
239 |
def get_request_temperature(self, request) -> float:
    """
    Get temperature from request. Override for custom temperature handling.

    Returns ``request.temperature`` when the attribute exists and is not None;
    otherwise returns ``self.get_default_temperature()``.

    Only the attribute read is guarded against AttributeError, so an
    AttributeError raised inside ``get_default_temperature()`` propagates on the
    first call instead of being swallowed and triggering a second call.
    """
    try:
        temperature = request.temperature
    except AttributeError:
        # Request type has no temperature field at all
        temperature = None

    if temperature is None:
        return self.get_default_temperature()
    return temperature
246 |
def get_validated_temperature(self, request, model_context: Any) -> tuple[float, list[str]]:
    """
    Extract the request temperature and validate it against the model's constraints.

    Convenience wrapper for workflow tools: the value from
    ``get_request_temperature`` is passed straight to
    ``validate_and_correct_temperature`` and that result is returned as-is.

    Args:
        request: The request object containing temperature
        model_context: Model context object containing model info

    Returns:
        Tuple of (validated_temperature, warning_messages)
    """
    return self.validate_and_correct_temperature(self.get_request_temperature(request), model_context)
263 |
def get_request_thinking_mode(self, request) -> str:
    """
    Get thinking mode from request. Override for custom thinking mode handling.

    Returns ``request.thinking_mode`` when the attribute exists and is not None;
    otherwise returns ``self.get_expert_thinking_mode()``.

    Only the attribute read is guarded against AttributeError, so an
    AttributeError raised inside ``get_expert_thinking_mode()`` propagates on the
    first call instead of being swallowed and triggering a second call.
    """
    try:
        thinking_mode = request.thinking_mode
    except AttributeError:
        # Request type has no thinking_mode field at all
        thinking_mode = None

    if thinking_mode is None:
        return self.get_expert_thinking_mode()
    return thinking_mode
270 |
def get_expert_analysis_instruction(self) -> str:
    """
    Instruction text appended after the expert context.

    Override to provide a tool-specific instruction.
    """
    return "Please provide expert analysis based on the investigation findings."
277 |
def get_request_use_assistant_model(self, request) -> bool:
    """
    Read ``use_assistant_model`` from the request. Override for custom handling.

    A missing attribute or a value of None both mean "use the assistant model".

    Args:
        request: Current request object

    Returns:
        The request's ``use_assistant_model`` value, or True when it is absent or None
    """
    requested = getattr(request, "use_assistant_model", None)
    if requested is None:
        return True
    return requested
292 |
def get_step_guidance_message(self, request) -> str:
    """
    Build the guidance message for the step that follows the current one.

    The default implementation asks ``get_required_actions`` for the actions of
    the current step and renders them as a 1-based numbered list inside a fixed
    instruction template. Override for tool-specific guidance.
    """
    confidence = self.get_request_confidence(request)
    required_actions = self.get_required_actions(
        request.step_number, confidence, request.findings, request.total_steps, request
    )

    next_step_number = request.step_number + 1
    numbered_actions = "\n".join(f"{i + 1}. {action}" for i, action in enumerate(required_actions))

    preamble = (
        f"MANDATORY: DO NOT call the {self.get_name()} tool again immediately. "
        f"You MUST first work using appropriate tools. "
        f"REQUIRED ACTIONS before calling {self.get_name()} step {next_step_number}:\n"
    )
    closing = (
        f"\n\nOnly call {self.get_name()} again with step_number: {next_step_number} "
        f"AFTER completing this work."
    )
    return preamble + numbered_actions + closing
311 |
def _prepare_files_for_expert_analysis(self) -> str:
    """
    Prepare file content for expert analysis.

    Expert analysis receives the content of every unique file marked as relevant
    across the workflow:
    - the current ``consolidated_findings.relevant_files``
    - plus, when the current arguments carry a ``continuation_id`` whose thread
      can be loaded, the files listed for that conversation

    Empty/blank entries are dropped and the remaining paths are sorted, so the
    embed order is the same on every run rather than following set iteration
    order (which matters once the token budget cannot fit every file).

    Returns:
        The embedded file content, or an empty string when there are no relevant
        files or embedding fails (failures are logged, not raised).
    """
    # 1. Files from the current consolidated relevant_files
    candidate_files = list(self.consolidated_findings.relevant_files)

    # 2. Additional files from conversation history (if continued workflow)
    try:
        current_arguments = self.get_current_arguments()
        continuation_id = current_arguments.get("continuation_id") if current_arguments else None

        if continuation_id:
            from utils.conversation_memory import get_conversation_file_list, get_thread

            thread_context = get_thread(continuation_id)
            if thread_context:
                conversation_files = get_conversation_file_list(thread_context)
                candidate_files.extend(conversation_files)
                logger.debug(
                    f"[WORKFLOW_FILES] {self.get_name()}: Added {len(conversation_files)} files from conversation history"
                )
    except Exception as e:
        logger.warning(f"[WORKFLOW_FILES] {self.get_name()}: Could not get conversation files: {e}")

    # De-duplicate, drop empty/None values, and fix the order
    files_for_expert = sorted({f for f in candidate_files if f and f.strip()})

    if not files_for_expert:
        logger.debug(f"[WORKFLOW_FILES] {self.get_name()}: No relevant files found for expert analysis")
        return ""

    # Expert analysis needs actual file content, bypassing conversation optimization
    try:
        file_content, processed_files = self._force_embed_files_for_expert_analysis(files_for_expert)

        logger.info(
            f"[WORKFLOW_FILES] {self.get_name()}: Prepared {len(processed_files)} unique relevant files for expert analysis "
            f"(from {len(self.consolidated_findings.relevant_files)} current relevant files)"
        )

        return file_content

    except Exception as e:
        logger.error(f"[WORKFLOW_FILES] {self.get_name()}: Failed to prepare files for expert analysis: {e}")
        return ""
374 |
def _force_embed_files_for_expert_analysis(self, files: list[str]) -> tuple[str, list[str]]:
    """
    Embed files for expert analysis without conversation-history filtering.

    Unlike normal workflow steps, which skip files already present in the
    conversation history to save tokens, expert analysis always reads the given
    files. The token budget comes from the current model context's
    ``calculate_token_allocation().file_tokens``; when no model context is
    available or the allocation fails, 100,000 tokens are used instead.

    Args:
        files: List of file paths to embed

    Returns:
        tuple[str, list[str]]: (file_content, processed_files), where
        processed_files is the result of ``expand_paths(files)``
    """
    # read_files is used directly so that filter_new_files is bypassed
    from utils.file_utils import expand_paths, read_files

    fallback_budget = 100_000

    model_ctx = self.get_current_model_context()
    if not model_ctx:
        max_tokens = fallback_budget
    else:
        try:
            max_tokens = model_ctx.calculate_token_allocation().file_tokens
            logger.debug(
                f"[WORKFLOW_FILES] {self.get_name()}: Using {max_tokens:,} tokens for expert analysis files"
            )
        except Exception as e:
            logger.warning(f"[WORKFLOW_FILES] {self.get_name()}: Failed to get token allocation: {e}")
            max_tokens = fallback_budget

    logger.debug(f"[WORKFLOW_FILES] {self.get_name()}: Force embedding {len(files)} files for expert analysis")
    read_options = {
        "max_tokens": max_tokens,
        "reserve_tokens": 1000,
        "include_line_numbers": self.wants_line_numbers_by_default(),
    }
    file_content = read_files(files, **read_options)

    # Expanded paths are returned to the caller for tracking
    processed_files = expand_paths(files)

    logger.debug(
        f"[WORKFLOW_FILES] {self.get_name()}: Expert analysis embedding: {len(processed_files)} files, "
        f"{len(file_content):,} characters"
    )

    return file_content, processed_files
425 |
def wants_line_numbers_by_default(self) -> bool:
    """
    Report whether file content should include line numbers by default.

    The default is True; override to customize line number behavior.
    """
    return True
432 |
433 | def _add_files_to_expert_context(self, expert_context: str, file_content: str) -> str:
434 | """
435 | Add file content to the expert context.
436 | Override this to customize how files are added to the context.
437 | """
438 | return f"{expert_context}\n\n=== ESSENTIAL FILES ===\n{file_content}\n=== END ESSENTIAL FILES ==="
439 |
440 | # ================================================================================
441 | # Context-Aware File Embedding - Core Implementation
442 | # ================================================================================
443 |
def _handle_workflow_file_context(self, request: Any, arguments: dict[str, Any]) -> None:
    """
    Set up file context for the current workflow step.

    Stores ``arguments["_model_context"]`` on the instance, resets the embedded
    file content, file reference note and processed-file list, then either embeds
    the workflow files or only references them, as decided by
    ``_should_embed_files_in_workflow_step``.

    Strategy:
    1. Intermediate steps: only reference file names (saves the CLI's context)
    2. Final step: embed full file content for expert analysis
    """
    continuation_id = self.get_request_continuation_id(request)
    is_final_step = not self.get_request_next_step_required(request)
    step_number = self.get_request_step_number(request)

    # Keep the model context around for token budgeting
    self._model_context = arguments.get("_model_context")

    # Start every step from a clean file-context state
    self._embedded_file_content = ""
    self._file_reference_note = ""
    self._actually_processed_files = []

    if self._should_embed_files_in_workflow_step(step_number, continuation_id, is_final_step):
        logger.debug(f"[WORKFLOW_FILES] {self.get_name()}: Embedding files for final step/expert analysis")
        self._embed_workflow_files(request, arguments)
        return

    logger.debug(f"[WORKFLOW_FILES] {self.get_name()}: Only referencing file names for intermediate step")
    self._reference_workflow_files(request)
480 |
def _should_embed_files_in_workflow_step(
    self, step_number: int, continuation_id: Optional[str], is_final_step: bool
) -> bool:
    """
    Decide whether file content is embedded or only referenced for this step.

    The decision depends solely on ``is_final_step``: files are embedded on the
    final step (next_step_required == False) and never on an intermediate step,
    whether or not a continuation_id is present.

    Args:
        step_number: Current step number (not used in the decision)
        continuation_id: Thread continuation ID (not used in the decision)
        is_final_step: Whether this is the final step (next_step_required == False)

    Returns:
        bool: True if files should be embedded, False if only referenced
    """
    if not is_final_step:
        # More steps are coming: new conversations and continuations alike only reference files
        logger.debug("[WORKFLOW_FILES] Intermediate step (more work needed) - will only reference files")
        return False

    logger.debug("[WORKFLOW_FILES] Final step - will embed files for expert analysis")
    return True
510 |
def _embed_workflow_files(self, request: Any, arguments: dict[str, Any]) -> None:
    """
    Prepare full file content for the request's relevant files and cache it on the instance.

    On success ``_embedded_file_content`` and ``_actually_processed_files`` hold the
    prepared content and the list of processed files. Any failure is logged and both
    attributes are reset to empty values so the workflow can continue without files.

    Args:
        request: Workflow request whose relevant files should be embedded
        arguments: Raw tool arguments; ``_remaining_tokens`` (when present) is used as the budget
    """
    # Workflow tools standardise on relevant_files as the file field
    files_to_embed = self.get_request_relevant_files(request)
    if not files_to_embed:
        logger.debug(f"[WORKFLOW_FILES] {self.get_name()}: No relevant_files to embed")
        return

    try:
        # The model context is normally set during early validation; it may have been deferred
        if not self.get_current_model_context():
            try:
                resolved_name, resolved_context = self._resolve_model_context(arguments, request)
                self._model_context = resolved_context
                self._current_model_name = resolved_name
            except Exception as resolve_error:
                logger.error(f"[WORKFLOW_FILES] {self.get_name()}: Failed to resolve model context: {resolve_error}")
                # Build a fallback context from the requested model name (keeps existing test behavior)
                from utils.model_context import ModelContext

                fallback_name = self.get_request_model_name(request)
                self._model_context = ModelContext(fallback_name)
                self._current_model_name = fallback_name

        # Same token-budgeted preparation path that BaseTool uses
        thread_id = self.get_request_continuation_id(request)
        token_budget = arguments.get("_remaining_tokens")
        prepared = self._prepare_file_content_for_prompt(
            files_to_embed,
            thread_id,
            "Workflow files for analysis",
            remaining_budget=token_budget,
            arguments=arguments,
            model_context=self._model_context,
        )
        embedded_text, embedded_files = prepared

        # Cached for the expert analysis stage
        self._embedded_file_content = embedded_text
        self._actually_processed_files = embedded_files

        logger.info(
            f"[WORKFLOW_FILES] {self.get_name()}: Embedded {len(embedded_files)} relevant_files for final analysis"
        )

    except Exception as embed_error:
        logger.error(f"[WORKFLOW_FILES] {self.get_name()}: Failed to embed files: {embed_error}")
        # Deliberately best-effort: carry on with no embedded files instead of failing the step
        self._embedded_file_content = ""
        self._actually_processed_files = []
566 |
def _reference_workflow_files(self, request: Any) -> None:
    """
    Record the request's relevant files by name only, without loading their content.

    Used for intermediate steps. Stores the full paths in ``_referenced_files`` and a
    one-line note listing the base names in ``_file_reference_note``. Nothing is stored
    when the request has no relevant files.

    Args:
        request: Workflow request providing the relevant files
    """
    # Workflow tools use relevant_files, not files
    paths = self.get_request_relevant_files(request)
    logger.debug(
        f"[WORKFLOW_FILES] {self.get_name()}: _reference_workflow_files called with {len(paths)} relevant_files"
    )

    if not paths:
        logger.debug(f"[WORKFLOW_FILES] {self.get_name()}: No files to reference, skipping")
        return

    # Keep the full paths for conversation context
    self._referenced_files = paths

    # The note itself only lists base names
    joined_names = ", ".join(os.path.basename(path) for path in paths)
    self._file_reference_note = f"Files referenced in this step: {joined_names}\n"
    logger.debug(f"[WORKFLOW_FILES] {self.get_name()}: Set _file_reference_note: {self._file_reference_note}")

    logger.info(f"[WORKFLOW_FILES] {self.get_name()}: Referenced {len(paths)} files without embedding content")
595 |
596 | # ================================================================================
597 | # Main Workflow Orchestration
598 | # ================================================================================
599 |
async def execute_workflow(self, arguments: dict[str, Any]) -> list[TextContent]:
    """
    Main workflow orchestration following debug tool pattern.

    Comprehensive workflow implementation that handles all common patterns:
    1. Request validation and step management
    2. Continuation and backtracking support
    3. Step data processing and consolidation
    4. Tool-specific field mapping and customization
    5. Completion logic with optional expert analysis
    6. Generic "certain confidence" handling
    7. Step guidance and required actions
    8. Conversation memory integration

    Args:
        arguments: Raw tool arguments; validated against the model returned by
            get_workflow_request_model().

    Returns:
        A single-element list holding a TextContent whose text is the JSON-encoded
        response data.

    Raises:
        ToolExecutionError: If the step text exceeds MCP_PROMPT_SIZE_LIMIT, if
            validate_file_paths() reports an error, or if any other exception occurs
            (in which case the error details are JSON-encoded into the exception).
    """
    from mcp.types import TextContent

    try:
        # Store arguments for access by helper methods
        self._current_arguments = arguments

        # Validate request using tool-specific model
        request = self.get_workflow_request_model()(**arguments)

        # Validate step field size (basic validation for workflow instructions)
        # If step is too large, user should use shorter instructions and put details in files
        step_content = request.step
        if step_content and len(step_content) > MCP_PROMPT_SIZE_LIMIT:
            from tools.models import ToolOutput

            error_output = ToolOutput(
                status="resend_prompt",
                content="Step instructions are too long. Please use shorter instructions and provide detailed context via file paths instead.",
                content_type="text",
                metadata={"prompt_size": len(step_content), "limit": MCP_PROMPT_SIZE_LIMIT},
            )
            # Raised as ValueError with a marker prefix; the outer handler below strips the
            # prefix and re-raises the payload as ToolExecutionError.
            raise ValueError(f"MCP_SIZE_CHECK:{error_output.model_dump_json()}")

        # Validate file paths for security (same as base tool)
        # Use try/except instead of hasattr as per coding standards
        # NOTE(review): this also swallows an AttributeError raised inside
        # validate_file_paths itself, not only a missing method - confirm that is intended.
        try:
            path_error = self.validate_file_paths(request)
            if path_error:
                from tools.models import ToolOutput

                error_output = ToolOutput(
                    status="error",
                    content=path_error,
                    content_type="text",
                )
                logger.error("Path validation failed for %s: %s", self.get_name(), path_error)
                raise ToolExecutionError(error_output.model_dump_json())
        except AttributeError:
            # validate_file_paths method not available - skip validation
            pass

        # Try to validate model availability early for production scenarios
        # For tests, defer model validation to later to allow mocks to work
        try:
            model_name, model_context = self._resolve_model_context(arguments, request)
            # Store for later use
            self._current_model_name = model_name
            self._model_context = model_context
        except ValueError as e:
            # Model resolution failed - in production this would be an error,
            # but for tests we defer to allow mocks to handle model resolution
            logger.debug(f"Early model validation failed, deferring to later: {e}")
            self._current_model_name = None
            self._model_context = None

        # Handle continuation
        continuation_id = request.continuation_id

        # Restore workflow state on continuation
        if continuation_id:
            from utils.conversation_memory import get_thread

            thread = get_thread(continuation_id)
            if thread and thread.turns:
                # Find the most recent assistant turn from this tool with workflow state
                for turn in reversed(thread.turns):
                    if turn.role == "assistant" and turn.tool_name == self.get_name() and turn.model_metadata:
                        state = turn.model_metadata
                        if isinstance(state, dict) and "work_history" in state:
                            self.work_history = state.get("work_history", [])
                            self.initial_request = state.get("initial_request")
                            # Rebuild consolidated findings from restored history
                            self._reprocess_consolidated_findings()
                            logger.debug(
                                f"[{self.get_name()}] Restored workflow state with {len(self.work_history)} history items"
                            )
                            break  # State restored, exit loop

        # Adjust total steps if needed
        if request.step_number > request.total_steps:
            request.total_steps = request.step_number

        # Create thread for first step
        if not continuation_id and request.step_number == 1:
            # Internal keys are dropped before the arguments are handed to create_thread
            clean_args = {k: v for k, v in arguments.items() if k not in ["_model_context", "_resolved_model_name"]}
            continuation_id = create_thread(self.get_name(), clean_args)
            self.initial_request = request.step
            # Allow tools to store initial description for expert analysis
            self.store_initial_issue(request.step)

        # Process work step - allow tools to customize field mapping
        step_data = self.prepare_step_data(request)

        # Store in history
        self.work_history.append(step_data)

        # Update consolidated findings
        self._update_consolidated_findings(step_data)

        # Handle file context appropriately based on workflow phase
        self._handle_workflow_file_context(request, arguments)

        # Build response with tool-specific customization
        response_data = self.build_base_response(request, continuation_id)

        # If work is complete, handle completion logic
        if not request.next_step_required:
            response_data = await self.handle_work_completion(response_data, request, arguments)
        else:
            # Force CLI to work before calling tool again
            response_data = self.handle_work_continuation(response_data, request)

        # Allow tools to customize the final response
        response_data = self.customize_workflow_response(response_data, request)

        # Add metadata (provider_used and model_used) to workflow response
        self._add_workflow_metadata(response_data, arguments)

        # Store in conversation memory
        if continuation_id:
            self.store_conversation_turn(continuation_id, response_data, request)

        return [TextContent(type="text", text=json.dumps(response_data, indent=2, ensure_ascii=False))]

    except ToolExecutionError:
        # Already in the final error shape - propagate untouched
        raise
    except Exception as e:
        # Size-check failures carry their ToolOutput JSON after the marker prefix
        if str(e).startswith("MCP_SIZE_CHECK:"):
            payload = str(e)[len("MCP_SIZE_CHECK:") :]
            raise ToolExecutionError(payload)

        logger.error(f"Error in {self.get_name()} work: {e}", exc_info=True)
        error_data = {
            "status": f"{self.get_name()}_failed",
            "error": str(e),
            "step_number": arguments.get("step_number", 0),
        }

        # Add metadata to error responses too
        self._add_workflow_metadata(error_data, arguments)

        raise ToolExecutionError(json.dumps(error_data, indent=2, ensure_ascii=False)) from e
756 |
757 | # Hook methods for tool customization
758 |
def prepare_step_data(self, request) -> dict:
    """
    Map a workflow request onto the step record kept in the work history.

    Plain fields (step, step_number, findings) are read directly from the request;
    every other field goes through its ``get_request_*`` hook. Tools can override
    this method to customize the field mapping.
    """
    return {
        "step": request.step,
        "step_number": request.step_number,
        "findings": request.findings,
        "files_checked": self.get_request_files_checked(request),
        "relevant_files": self.get_request_relevant_files(request),
        "relevant_context": self.get_request_relevant_context(request),
        "issues_found": self.get_request_issues_found(request),
        "confidence": self.get_request_confidence(request),
        "hypothesis": self.get_request_hypothesis(request),
        "images": self.get_request_images(request),
    }
776 |
def build_base_response(self, request, continuation_id: str = None) -> dict:
    """
    Assemble the base response for a workflow step.

    The response carries the in-progress status, step counters, a per-tool status
    block with sizes of the consolidated findings, the continuation id (when given)
    and a ``file_context`` entry describing whether files were embedded or only
    referenced. Tools can override this for custom response fields.
    """
    tool = self.get_name()
    findings = self.consolidated_findings

    response = {
        "status": f"{tool}_in_progress",
        "step_number": request.step_number,
        "total_steps": request.total_steps,
        "next_step_required": request.next_step_required,
        f"{tool}_status": {
            "files_checked": len(findings.files_checked),
            "relevant_files": len(findings.relevant_files),
            "relevant_context": len(findings.relevant_context),
            "issues_found": len(findings.issues_found),
            "images_collected": len(findings.images),
            "current_confidence": self.get_request_confidence(request),
        },
    }

    if continuation_id:
        response["continuation_id"] = continuation_id

    # File context depends on which phase prepared the files (embedded vs. referenced)
    embedded = self.get_embedded_file_content()
    note = self.get_file_reference_note()
    embedded_files = self.get_actually_processed_files()

    logger.debug(
        f"[WORKFLOW_FILES] {tool}: Building response - has embedded_content: {bool(embedded)}, has reference_note: {bool(note)}"
    )

    # Embedded content wins over a reference note
    if embedded:
        logger.debug(f"[WORKFLOW_FILES] {tool}: Adding fully_embedded file context")
        response["file_context"] = {
            "type": "fully_embedded",
            "files_embedded": len(embedded_files),
            "context_optimization": "Full file content embedded for expert analysis",
        }
    elif note:
        logger.debug(f"[WORKFLOW_FILES] {tool}: Adding reference_only file context")
        response["file_context"] = {
            "type": "reference_only",
            "note": note,
            "context_optimization": "Files referenced but not embedded to preserve the context window",
        }

    return response
827 |
def should_skip_expert_analysis(self, request, consolidated_findings) -> bool:
    """
    Tell the workflow whether the expert analysis call may be skipped.

    The base implementation never skips and ignores both arguments. Tools such as
    debug override this to skip when confidence is "certain".
    """
    return False
836 |
def handle_completion_without_expert_analysis(self, request, consolidated_findings) -> dict:
    """
    Build the completion response used when expert analysis is skipped.

    The generic response summarizes the consolidated findings and marks the expert
    analysis as skipped. Tools can override this for custom high-confidence
    completion handling.
    """
    summary = self.prepare_work_summary()
    thread_id = self.get_request_continuation_id(request)

    status = self.get_completion_status()
    completion_key = f"complete_{self.get_name()}"
    completion_details = {
        "initial_request": self.get_initial_request(request.step),
        "steps_taken": len(consolidated_findings.findings),
        "files_examined": list(consolidated_findings.files_checked),
        "relevant_files": list(consolidated_findings.relevant_files),
        "relevant_context": list(consolidated_findings.relevant_context),
        "work_summary": summary,
        "final_analysis": self.get_final_analysis_from_request(request),
        "confidence_level": self.get_confidence_level(request),
    }

    response = {
        "status": status,
        completion_key: completion_details,
        "next_steps": self.get_completion_message(),
        "skip_expert_analysis": True,
        "expert_analysis": {
            "status": self.get_skip_expert_analysis_status(),
            "reason": self.get_skip_reason(),
        },
    }

    if thread_id:
        response["continuation_id"] = thread_id

    return response
871 |
872 | # ================================================================================
873 | # Inheritance Hook Methods - Replace hasattr/getattr Anti-patterns
874 | # ================================================================================
875 |
def get_request_confidence(self, request: Any) -> str:
    """Return the request's confidence, or "low" when it is missing or empty. Override for custom handling."""
    try:
        confidence = request.confidence
    except AttributeError:
        return "low"
    return confidence or "low"
882 |
def get_request_relevant_context(self, request: Any) -> list[str]:
    """Return the request's relevant context, or an empty list when missing or empty. Override for custom mapping."""
    try:
        context = request.relevant_context
    except AttributeError:
        return []
    return context or []
889 |
def get_request_issues_found(self, request: Any) -> list[str]:
    """Return the request's issues, or an empty list when missing or empty. Override for custom mapping."""
    try:
        issues = request.issues_found
    except AttributeError:
        return []
    return issues or []
896 |
def get_request_hypothesis(self, request: Any) -> Optional[str]:
    """Return the request's hypothesis unchanged, or None when the field is absent. Override for custom mapping."""
    try:
        hypothesis = request.hypothesis
    except AttributeError:
        hypothesis = None
    return hypothesis
903 |
def get_request_images(self, request: Any) -> list[str]:
    """Return the request's images, or an empty list when missing or empty. Override for custom mapping."""
    try:
        images = request.images
    except AttributeError:
        return []
    return images or []
910 |
911 | # File Context Access Methods
912 |
def get_embedded_file_content(self) -> str:
    """Return the cached embedded file content, or an empty string when none has been stored."""
    try:
        content = self._embedded_file_content
    except AttributeError:
        return ""
    return content or ""
919 |
def get_file_reference_note(self) -> str:
    """Return the stored file reference note, or an empty string when none has been stored."""
    try:
        note = self._file_reference_note
    except AttributeError:
        return ""
    return note or ""
926 |
def get_actually_processed_files(self) -> list[str]:
    """Return the files recorded as processed during embedding, or an empty list when none are stored."""
    try:
        processed = self._actually_processed_files
    except AttributeError:
        return []
    return processed or []
933 |
def get_current_model_context(self):
    """Return the stored model context unchanged, or None when the attribute has not been set."""
    try:
        context = self._model_context
    except AttributeError:
        context = None
    return context
940 |
def get_request_model_name(self, request: Any) -> str:
    """Return the request's model name, or "flash" when it is missing or empty. Override for custom handling."""
    try:
        model = request.model
    except AttributeError:
        return "flash"
    return model or "flash"
947 |
def get_request_continuation_id(self, request: Any) -> Optional[str]:
    """Return the request's continuation ID unchanged, or None when the field is absent. Override for custom handling."""
    try:
        thread_id = request.continuation_id
    except AttributeError:
        thread_id = None
    return thread_id
954 |
def get_request_next_step_required(self, request: Any) -> bool:
    """Return the request's next_step_required flag unchanged, or True when the field is absent. Override for custom handling."""
    try:
        more_steps = request.next_step_required
    except AttributeError:
        more_steps = True
    return more_steps
961 |
def get_request_step_number(self, request: Any) -> int:
    """Return the request's step number, or 1 when it is missing or falsy. Override for custom handling."""
    try:
        number = request.step_number
    except AttributeError:
        return 1
    return number or 1
968 |
def get_request_relevant_files(self, request: Any) -> list[str]:
    """Return the request's relevant files, or an empty list when missing or empty. Override for custom handling."""
    try:
        files = request.relevant_files
    except AttributeError:
        return []
    return files or []
975 |
def get_request_files_checked(self, request: Any) -> list[str]:
    """Return the request's checked files, or an empty list when missing or empty. Override for custom handling."""
    try:
        checked = request.files_checked
    except AttributeError:
        return []
    return checked or []
982 |
def get_current_arguments(self) -> dict[str, Any]:
    """Return the arguments stored for the current call, or an empty dict when none are stored."""
    try:
        stored = self._current_arguments
    except AttributeError:
        return {}
    return stored or {}
989 |
def store_initial_issue(self, step_description: str):
    """
    Remember the initial issue description on the instance as ``initial_issue``.

    Tools can override this to store the description differently.
    """
    self.initial_issue = step_description
994 |
def get_initial_request(self, fallback_step: str) -> str:
    """Return the stored initial request, or ``fallback_step`` when none is stored or it is empty. Override for custom retrieval."""
    try:
        stored = self.initial_request
    except AttributeError:
        return fallback_step
    return stored or fallback_step
1001 |
1002 | # Default implementations for inheritance hooks
1003 |
def prepare_work_summary(self) -> str:
    """Summarize the work as a count of consolidated findings entries. Override for custom implementation."""
    step_count = len(self.consolidated_findings.findings)
    return f"Completed {step_count} work steps"
1007 |
def get_completion_status(self) -> str:
    """Return the status string reported when completing without expert analysis. Override for tool-specific status."""
    return "high_confidence_completion"
1011 |
def get_final_analysis_from_request(self, request):
    """Return the final analysis for the request; by default this is its hypothesis. Override for tool-specific fields."""
    final_analysis = self.get_request_hypothesis(request)
    return final_analysis
1015 |
def get_confidence_level(self, request) -> str:
    """Return the request's confidence, or "high" when the confidence hook yields an empty value. Override for tool-specific handling."""
    confidence = self.get_request_confidence(request)
    if confidence:
        return confidence
    return "high"
1019 |
def get_completion_message(self) -> str:
    """Return the next-steps message for a high-confidence completion, prefixed with the capitalized tool name. Override for tool-specific messaging."""
    title = self.get_name().capitalize()
    instructions = (
        " complete with high confidence. Present results "
        "and proceed with implementation without requiring further consultation."
    )
    return f"{title}{instructions}"
1026 |
def get_skip_reason(self) -> str:
    """Return the reason reported when expert analysis is skipped, prefixed with the tool name. Override for tool-specific reasons."""
    tool = self.get_name()
    return f"{tool} completed with sufficient confidence"
1030 |
def get_skip_expert_analysis_status(self) -> str:
    """Return the status string recorded when expert analysis is skipped. Override for tool-specific status."""
    return "skipped_by_tool_design"
1034 |
def get_completion_next_steps_message(self, expert_analysis_used: bool = False) -> str:
    """
    Build the instruction shown to the caller once the work is complete.

    Tools can override for custom messaging.

    Args:
        expert_analysis_used: True if expert analysis was successfully executed

    Returns:
        The base completion message; when expert analysis was used and the tool
        supplies non-empty expert guidance, that guidance is appended after a blank line.
    """
    completion_text = (
        f"{self.get_name().upper()} IS COMPLETE. You MUST now summarize and present ALL key findings, confirmed "
        "hypotheses, and exact recommended solutions. Clearly identify the most likely root cause and "
        "provide concrete, actionable implementation guidance. Highlight affected code paths and display "
        "reasoning that led to this conclusion—make it easy for a developer to understand exactly where "
        "the problem lies."
    )

    # Guidance is only relevant when expert analysis actually ran
    if not expert_analysis_used:
        return completion_text

    guidance = self.get_expert_analysis_guidance()
    if not guidance:
        return completion_text

    return f"{completion_text}\n\n{guidance}"
1058 |
def get_expert_analysis_guidance(self) -> str:
    """
    Provide extra instructions on how expert analysis results should be treated.

    The base implementation supplies no guidance. Subclasses can override this to
    tell the caller how to validate and apply the expert findings.

    When expert analysis has run, non-empty guidance is:
    1. Appended to the completion next steps message
    2. Added as "important_considerations" field in the response data

    Example override:
    ```python
    def get_expert_analysis_guidance(self) -> str:
        return (
            "IMPORTANT: Expert analysis provided above. You MUST validate "
            "the expert findings rather than accepting them blindly. "
            "Cross-reference with your own investigation and ensure "
            "recommendations align with the codebase context."
        )
    ```

    Returns:
        Guidance text, or an empty string when there is nothing to add
    """
    return ""
1085 |
def customize_workflow_response(self, response_data: dict, request) -> dict:
    """
    Final customization hook applied to the workflow response before it is returned.

    Tools can override this to add tool-specific fields, modify status names,
    customize field mapping, etc. The default only backfills ``file_context`` when
    the response does not already carry a truthy one, and returns the same dict.
    """
    # A response that already describes its file context is left untouched
    if response_data.get("file_context"):
        return response_data

    embedded = self.get_embedded_file_content()
    note = self.get_file_reference_note()
    embedded_files = self.get_actually_processed_files()

    # Embedded content wins over a reference note
    if embedded:
        response_data["file_context"] = {
            "type": "fully_embedded",
            "files_embedded": len(embedded_files),
            "context_optimization": "Full file content embedded for expert analysis",
        }
    elif note:
        response_data["file_context"] = {
            "type": "reference_only",
            "note": note,
            "context_optimization": "Files referenced but not embedded to preserve the context window",
        }

    return response_data
1114 |
def store_conversation_turn(self, continuation_id: str, response_data: dict, request):
    """
    Persist this step as an assistant turn on the conversation thread.

    The stored content is the cleaned form of ``response_data`` (internal workflow
    metadata removed), and the work history plus initial request are attached as
    ``model_metadata`` so a later call can restore them. Tools can override this
    for custom memory storage.
    """
    # CRITICAL: conversation history must not contain internal workflow metadata
    history_content = self._extract_clean_workflow_content_for_history(response_data)

    # State to persist across stateless tool calls; the initial request may never have been set
    history = self.work_history
    try:
        first_request = self.initial_request
    except AttributeError:
        first_request = None
    persisted_state = {"work_history": history, "initial_request": first_request}

    add_turn(
        thread_id=continuation_id,
        role="assistant",
        content=history_content,  # cleaned content, not the full response_data
        tool_name=self.get_name(),
        files=self.get_request_relevant_files(request),
        images=self.get_request_images(request),
        model_metadata=persisted_state,
    )
1134 |
def _add_workflow_metadata(self, response_data: dict, arguments: dict[str, Any]) -> None:
    """
    Add metadata (tool_name, model_used and provider_used) to workflow response.

    This ensures workflow tools have the same metadata as regular tools,
    making it consistent across all tool types for tracking which provider
    and model were used for the response.

    The model and provider come from the ``_resolved_model_name`` / ``_model_context``
    arguments when both are present; otherwise the model name is read from the
    validated request and the provider is reported as "unknown". Existing entries in
    ``response_data["metadata"]`` are kept in every path, including the failure path,
    where only ``tool_name`` is added.

    Args:
        response_data: The response data dictionary to modify (updated in place)
        arguments: The original arguments containing model context
    """
    try:
        # Get model information from arguments (set by server.py)
        resolved_model_name = arguments.get("_resolved_model_name")
        model_context = arguments.get("_model_context")

        if resolved_model_name and model_context:
            # Extract provider information from model context
            provider = model_context.provider
            provider_name = provider.get_provider_type().value if provider else "unknown"
            model_name = resolved_model_name
            log_label = "metadata"
        else:
            # Fallback - try to get model info from request (no provider info available)
            request = self.get_workflow_request_model()(**arguments)
            model_name = self.get_request_model_name(request)
            provider_name = "unknown"
            log_label = "fallback metadata"

        metadata = {
            "tool_name": self.get_name(),
            "model_used": model_name,
            "provider_used": provider_name,
        }

        # Preserve existing metadata and add workflow metadata
        if "metadata" not in response_data:
            response_data["metadata"] = {}
        response_data["metadata"].update(metadata)

        logger.debug(
            f"[WORKFLOW_METADATA] {self.get_name()}: Added {log_label} - "
            f"model: {model_name}, provider: {provider_name}"
        )

    except Exception as e:
        # Don't fail the workflow if metadata addition fails
        logger.warning(f"[WORKFLOW_METADATA] {self.get_name()}: Failed to add metadata: {e}")
        # Still add the tool name, but keep whatever metadata is already there;
        # only a missing or non-dict value is replaced.
        existing_metadata = response_data.get("metadata")
        if isinstance(existing_metadata, dict):
            existing_metadata["tool_name"] = self.get_name()
        else:
            response_data["metadata"] = {"tool_name": self.get_name()}
1200 |
def _extract_clean_workflow_content_for_history(self, response_data: dict) -> str:
    """
    Reduce a workflow response to the parts worth keeping in conversation history.

    Only an allow-list of fields is copied: the core content, the expert analysis
    text, selected keys of ``complete_analysis`` and basic step information.
    Everything else is left out so it does not appear when the conversation is
    reconstructed for expert models or other tools.

    Args:
        response_data: The full workflow response data

    Returns:
        str: JSON text (indent=2, non-ASCII preserved) of the cleaned content
    """
    history_entry = {}

    # Core content
    if "content" in response_data:
        history_entry["content"] = response_data["content"]

    # Expert analysis: keep only the analysis text, preferring raw_analysis over content
    expert = response_data.get("expert_analysis")
    if isinstance(expert, dict):
        for text_key in ("raw_analysis", "content"):
            if text_key in expert:
                history_entry["expert_analysis"] = {"analysis": expert[text_key]}
                break

    # Findings/issues (core workflow output) without internal metadata
    completed = response_data.get("complete_analysis")
    if isinstance(completed, dict):
        kept_keys = ("findings", "issues_found", "relevant_context", "insights")
        summary = {key: completed[key] for key in kept_keys if key in completed}
        if summary:
            history_entry["analysis_summary"] = summary

    # Step information for context
    if "step_number" in response_data:
        history_entry["step_info"] = {
            "step": response_data.get("step", ""),
            "step_number": response_data.get("step_number", 1),
            "total_steps": response_data.get("total_steps", 1),
        }

    # Fields deliberately never copied into conversation history:
    # - continuation_id (confuses LLMs with old IDs)
    # - status (internal workflow state)
    # - next_step_required (internal control flow)
    # - analysis_status (internal tracking)
    # - file_context (internal optimization info)
    # - required_actions (internal workflow instructions)

    return json.dumps(history_entry, indent=2, ensure_ascii=False)
1264 |
1265 | # Core workflow logic methods
1266 |
async def handle_work_completion(self, response_data: dict, request, arguments: dict) -> dict:
    """
    Handle work completion logic - expert analysis decision and response building.

    Marks the work as complete in ``response_data`` and then follows one of three paths:

    1. The tool opts out via ``should_skip_expert_analysis``: the result of
       ``handle_completion_without_expert_analysis`` is merged into the response.
    2. Expert analysis is required and warranted: the expert model is called, its
       outcome is either promoted to the top level (pause / failure statuses) or
       accompanied by completion guidance, and a summary of the whole workflow
       is attached under ``complete_<tool name>``.
    3. Otherwise: the response is marked ``<tool name>_complete`` (tool never uses
       expert analysis) or ``local_work_complete`` (local work was sufficient).

    Args:
        response_data: Response dictionary built so far; it is mutated in place.
        request: Request object for the current step (``request.step`` is read).
        arguments: Raw tool arguments, forwarded to the expert analysis call.

    Returns:
        The same ``response_data`` dictionary, updated for the chosen path.
    """
    tool_name = self.get_name()
    response_data[f"{tool_name}_complete"] = True

    # Statuses with which the expert asks the caller to act before continuing
    pause_statuses = ("files_required_to_continue", "investigation_paused", "refactoring_paused")
    # Statuses with which _call_expert_analysis reports that no analysis was produced.
    # "empty_response" is included so that a model returning nothing is not presented
    # to the caller as a successful expert analysis.
    failure_statuses = ("analysis_error", "empty_response")

    # Check if tool wants to skip expert analysis due to high certainty
    if self.should_skip_expert_analysis(request, self.consolidated_findings):
        # Handle completion without expert analysis
        completion_response = self.handle_completion_without_expert_analysis(request, self.consolidated_findings)
        response_data.update(completion_response)
    elif self.requires_expert_analysis() and self.should_call_expert_analysis(self.consolidated_findings, request):
        # Standard expert analysis path
        response_data["status"] = "calling_expert_analysis"

        # Call expert analysis
        expert_analysis = await self._call_expert_analysis(arguments, request)
        response_data["expert_analysis"] = expert_analysis

        # Only dict results carry a status; anything else is treated as a plain success
        expert_status = expert_analysis.get("status") if isinstance(expert_analysis, dict) else None

        if expert_status in pause_statuses:
            # Promote the special status to the main response
            response_data["status"] = expert_status
            response_data["content"] = expert_analysis.get(
                "raw_analysis", json.dumps(expert_analysis, ensure_ascii=False)
            )
            del response_data["expert_analysis"]

            # Update next steps for special status
            if expert_status == "files_required_to_continue":
                response_data["next_steps"] = "Provide the requested files and continue the analysis."
            else:
                response_data["next_steps"] = expert_analysis.get(
                    "next_steps", "Continue based on expert analysis."
                )
        elif expert_status in failure_statuses:
            # Expert analysis failed or produced nothing - promote error status
            response_data["status"] = "error"
            response_data["content"] = expert_analysis.get("error", "Expert analysis failed")
            response_data["content_type"] = "text"
            del response_data["expert_analysis"]
        else:
            # Expert analysis was successfully executed - include expert guidance
            response_data["next_steps"] = self.get_completion_next_steps_message(expert_analysis_used=True)

            # Add expert analysis guidance as important considerations
            expert_guidance = self.get_expert_analysis_guidance()
            if expert_guidance:
                response_data["important_considerations"] = expert_guidance

        # Prepare complete work summary (attached whatever the expert outcome was)
        work_summary = self._prepare_work_summary()
        response_data[f"complete_{tool_name}"] = {
            "initial_request": self.get_initial_request(request.step),
            "steps_taken": len(self.work_history),
            "files_examined": list(self.consolidated_findings.files_checked),
            "relevant_files": list(self.consolidated_findings.relevant_files),
            "relevant_context": list(self.consolidated_findings.relevant_context),
            "issues_found": self.consolidated_findings.issues_found,
            "work_summary": work_summary,
        }
    elif not self.requires_expert_analysis():
        # Tool is self-contained (like planner)
        response_data["status"] = f"{tool_name}_complete"
        response_data["next_steps"] = f"{tool_name.capitalize()} work complete. Present results to the user."
    else:
        # Local work was sufficient for tools that support expert analysis
        response_data["status"] = "local_work_complete"
        response_data["next_steps"] = (
            f"Local {tool_name} complete with sufficient confidence. Present findings "
            "and recommendations to the user based on the work results."
        )

    return response_data
1350 |
def handle_work_continuation(self, response_data: dict, request) -> dict:
    """
    Build the response for a step that is not the last one.

    The response is switched to a ``pause_for_<tool name>`` status and is given
    the tool-specific required actions plus the guidance message for the step.

    Args:
        response_data: Response dictionary built so far; it is mutated in place.
        request: Request object for the current step.

    Returns:
        The same ``response_data`` dictionary.
    """
    tool_name = self.get_name()

    # Signal that the caller has to do more work before calling the tool again
    response_data["status"] = f"pause_for_{tool_name}"
    response_data[f"{tool_name}_required"] = True

    # Tool-specific list of actions for this step
    response_data["required_actions"] = self.get_required_actions(
        request.step_number,
        self.get_request_confidence(request),
        request.findings,
        request.total_steps,
        request,
    )

    # Human-readable guidance for the next step
    response_data["next_steps"] = self.get_step_guidance_message(request)

    return response_data
1368 |
def _update_consolidated_findings(self, step_data: dict):
    """
    Update consolidated findings with new step data.

    Args:
        step_data: Data for one step. ``step_number`` and ``findings`` are
            required. ``files_checked``, ``relevant_files``, ``relevant_context``,
            ``hypothesis``, ``confidence``, ``issues_found`` and ``images`` are
            optional.

    Raises:
        KeyError: If ``step_number`` or ``findings`` is missing. The keys are read
            before anything is modified, so a malformed step leaves the
            consolidated findings untouched.
    """
    # Read the mandatory keys first so a malformed step cannot leave a half-applied update
    step_number = step_data["step_number"]
    step_findings = step_data["findings"]

    consolidated = self.consolidated_findings

    # "or ()" also tolerates optional keys that are present but set to None
    consolidated.files_checked.update(step_data.get("files_checked") or ())
    consolidated.relevant_files.update(step_data.get("relevant_files") or ())
    consolidated.relevant_context.update(step_data.get("relevant_context") or ())
    consolidated.findings.append(f"Step {step_number}: {step_findings}")

    if step_data.get("hypothesis"):
        consolidated.hypotheses.append(
            {
                "step": step_number,
                "hypothesis": step_data["hypothesis"],
                # Confidence is optional (see below), so a hypothesis without one must not fail
                "confidence": step_data.get("confidence", "unknown"),
            }
        )
    if step_data.get("issues_found"):
        consolidated.issues_found.extend(step_data["issues_found"])
    if step_data.get("images"):
        consolidated.images.extend(step_data["images"])

    # Update confidence to latest value from this step
    latest_confidence = step_data.get("confidence")
    if latest_confidence:
        consolidated.confidence = latest_confidence
1390 |
def _reprocess_consolidated_findings(self):
    """
    Rebuild the consolidated findings from scratch.

    Replaces ``self.consolidated_findings`` with a fresh ``ConsolidatedFindings``
    and replays every entry of ``self.work_history`` into it, in order.
    Intended for use after backtracking.
    """
    # Start from an empty aggregate; the replay below fills it again
    self.consolidated_findings = ConsolidatedFindings()
    for recorded_step in self.work_history:
        self._update_consolidated_findings(recorded_step)
1396 |
def _prepare_work_summary(self) -> str:
    """
    Prepare a comprehensive summary of the work.

    The summary is plain text made of a header with counts, the recorded
    per-step findings, and - only when present - the recorded hypotheses and
    the issues that were identified.

    Returns:
        The summary lines joined with newlines.
    """
    consolidated = self.consolidated_findings

    summary_parts = [
        f"=== {self.get_name().upper()} WORK SUMMARY ===",
        f"Total steps: {len(self.work_history)}",
        f"Files examined: {len(consolidated.files_checked)}",
        f"Relevant files identified: {len(consolidated.relevant_files)}",
        f"Methods/functions involved: {len(consolidated.relevant_context)}",
        f"Issues found: {len(consolidated.issues_found)}",
        "",
        "=== WORK PROGRESSION ===",
    ]
    summary_parts.extend(consolidated.findings)

    if consolidated.hypotheses:
        summary_parts.extend(["", "=== HYPOTHESIS EVOLUTION ==="])
        summary_parts.extend(
            f"Step {hyp['step']} ({hyp['confidence']} confidence): {hyp['hypothesis']}"
            for hyp in consolidated.hypotheses
        )

    if consolidated.issues_found:
        summary_parts.extend(["", "=== ISSUES IDENTIFIED ==="])
        for issue in consolidated.issues_found:
            # A severity that is missing, None or not a string must not break the summary
            severity = str(issue.get("severity") or "unknown")
            description = issue.get("description", "No description")
            summary_parts.append(f"[{severity.upper()}] {description}")

    return "\n".join(summary_parts)
1436 |
async def _call_expert_analysis(self, arguments: dict, request) -> dict:
    """
    Call external model for expert analysis.

    Builds the expert prompt from the consolidated findings (optionally with
    file contents and an embedded system prompt), sends it to the provider of
    the resolved model context and interprets the reply.

    Args:
        arguments: Raw tool arguments; used to resolve the model context when it
            has not been resolved yet.
        request: Request object for the current step.

    Returns:
        One of:
        - the JSON object parsed from the reply (optionally taken from a fenced
          code block),
        - ``{"status": "analysis_complete", "format": "text", ...}`` with the raw
          reply when it is not a JSON object,
        - ``{"status": "empty_response", "error": ...}`` when the reply is empty,
        - ``{"status": "analysis_error", "error": ...}`` when anything raised.
    """
    try:
        # Model context should be resolved from early validation, but handle fallback for tests
        if not self._model_context:
            # Try to resolve model context for expert analysis (deferred from early validation)
            try:
                model_name, model_context = self._resolve_model_context(arguments, request)
                self._model_context = model_context
                self._current_model_name = model_name
            except Exception as e:
                logger.error(f"Failed to resolve model context for expert analysis: {e}")
                # Use request model as fallback (preserves existing test behavior)
                model_name = self.get_request_model_name(request)
                from utils.model_context import ModelContext

                model_context = ModelContext(model_name)
                self._model_context = model_context
                self._current_model_name = model_name
        else:
            model_name = self._current_model_name

        provider = self._model_context.provider

        # Prepare expert analysis context
        expert_context = self.prepare_expert_analysis_context(self.consolidated_findings)

        # Check if tool wants to include files in prompt
        if self.should_include_files_in_expert_prompt():
            file_content = self._prepare_files_for_expert_analysis()
            if file_content:
                expert_context = self._add_files_to_expert_context(expert_context, file_content)

        # Get system prompt for this tool with localization support
        base_system_prompt = self.get_system_prompt()
        capability_augmented_prompt = self._augment_system_prompt_with_capabilities(
            base_system_prompt, getattr(self._model_context, "capabilities", None)
        )
        language_instruction = self.get_language_instruction()
        system_prompt = language_instruction + capability_augmented_prompt

        # Check if tool wants system prompt embedded in main prompt
        if self.should_embed_system_prompt():
            prompt = f"{system_prompt}\n\n{expert_context}\n\n{self.get_expert_analysis_instruction()}"
            system_prompt = ""  # Clear it since we embedded it
        else:
            prompt = expert_context

        # Validate temperature against model constraints
        validated_temperature, temp_warnings = self.get_validated_temperature(request, self._model_context)

        # Log any temperature corrections
        for warning in temp_warnings:
            logger.warning(warning)

        # De-duplicate images while keeping the order in which they were collected;
        # going through set() would make the order depend on string hashing.
        collected_images = self.consolidated_findings.images
        unique_images = list(dict.fromkeys(collected_images)) if collected_images else None

        # Generate AI response - use request parameters if available
        # NOTE(review): generate_content is called without await; if it performs blocking
        # I/O it will hold up the event loop for the duration of the call - confirm.
        model_response = provider.generate_content(
            prompt=prompt,
            model_name=model_name,
            system_prompt=system_prompt,
            temperature=validated_temperature,
            thinking_mode=self.get_request_thinking_mode(request),
            images=unique_images,
        )

        raw_content = model_response.content
        if not raw_content:
            return {"error": "No response from model", "status": "empty_response"}

        content = raw_content.strip()

        # Try to extract JSON from markdown code blocks if present
        if "```" in content:
            json_match = re.search(r"```(?:json)?\s*(.*?)\s*```", content, re.DOTALL)
            if json_match:
                content = json_match.group(1).strip()

        try:
            # Try to parse as JSON
            analysis_result = json.loads(content)
        except json.JSONDecodeError as e:
            parse_problem = str(e)
        else:
            if isinstance(analysis_result, dict):
                return analysis_result
            # Valid JSON that is not an object (list, number, ...) cannot be used as an
            # analysis dict, so it is handed back as plain text like any non-JSON reply.
            parse_problem = f"expected a JSON object but got {type(analysis_result).__name__}"

        # Log the parse problem with more details but don't fail
        logger.info(
            f"[{self.get_name()}] Expert analysis returned non-JSON response (this is OK for smaller models). "
            f"Parse error: {parse_problem}. Response length: {len(raw_content)} chars."
        )
        logger.debug(f"First 500 chars of response: {raw_content[:500]!r}")

        # Still return the analysis as plain text - this is valid
        return {
            "status": "analysis_complete",
            "raw_analysis": raw_content,
            "format": "text",  # Indicate it's plain text, not an error
            "note": "Analysis provided in plain text format",
        }

    except Exception as e:
        logger.error(f"Error calling expert analysis: {e}", exc_info=True)
        return {"error": str(e), "status": "analysis_error"}
1536 |
def _process_work_step(self, step_data: dict):
    """
    Record one work step and fold it into the consolidated findings.

    Handy for tests and for feeding steps in manually: the step is appended to
    ``self.work_history`` and then passed to ``_update_consolidated_findings``.

    Args:
        step_data: Dictionary describing the step (step, step_number, findings,
            files_checked, etc.).
    """
    # History first, aggregate second
    history = self.work_history
    history.append(step_data)

    self._update_consolidated_findings(step_data)
1553 |
1554 | # Common execute method for workflow-based tools
1555 |
async def execute(self, arguments: dict[str, Any]) -> list[TextContent]:
    """
    Shared entry point for workflow-based tools.

    Rejects an empty argument payload and otherwise delegates to
    ``execute_workflow``. A ``ToolExecutionError`` passes through untouched; any
    other ``Exception`` is logged and re-raised as a ``ToolExecutionError`` whose
    message is a JSON error payload. Tools that need custom execute logic can
    override this method.
    """
    try:
        if arguments:
            # Normal path: the workflow implementation does the real work
            return await self.execute_workflow(arguments)

        # Nothing to work with - report it, with basic metadata even for validation errors
        missing_args_error = {
            "status": "error",
            "content": "No arguments provided",
            "metadata": {"tool_name": self.get_name()},
        }
        raise ToolExecutionError(json.dumps(missing_args_error, ensure_ascii=False))

    except ToolExecutionError:
        # Already in the shape callers expect
        raise
    except Exception as exc:
        logger.error(f"Error in {self.get_name()} tool execution: {exc}", exc_info=True)
        failure = {"status": "error", "content": f"Error in {self.get_name()}: {str(exc)}"}
        # Add metadata to error responses
        self._add_workflow_metadata(failure, arguments)
        raise ToolExecutionError(json.dumps(failure, ensure_ascii=False)) from exc
1584 |
1585 | # Default implementations for methods that workflow-based tools typically don't need
1586 |
async def prepare_prompt(self, request) -> str:
    """
    Default prompt hook for workflow tools, kept compatible with the BaseTool signature.

    Workflow tools assemble their prompts internally while the workflow runs,
    so there is nothing to hand back here.

    Args:
        request: The validated request object (unused)

    Returns:
        Always an empty string
    """
    no_prompt = ""  # prompts are produced during workflow execution instead
    return no_prompt
1602 |
def format_response(self, response: str, request, model_info=None):
    """
    Pass the response through unchanged.

    Workflow tools format their responses internally (in the workflow mixin),
    so ``request`` and ``model_info`` are accepted only for signature
    compatibility and are ignored.
    """
    # No post-processing at this level
    return response
1609 |
```