This is page 17 of 19. Use http://codebase.md/beehiveinnovations/gemini-mcp-server?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── commands
│ │ └── fix-github-issue.md
│ └── settings.json
├── .coveragerc
├── .dockerignore
├── .env.example
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ ├── documentation.yml
│ │ ├── feature_request.yml
│ │ └── tool_addition.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── docker-pr.yml
│ ├── docker-release.yml
│ ├── semantic-pr.yml
│ ├── semantic-release.yml
│ └── test.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CHANGELOG.md
├── claude_config_example.json
├── CLAUDE.md
├── clink
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── claude.py
│ │ ├── codex.py
│ │ └── gemini.py
│ ├── constants.py
│ ├── models.py
│ ├── parsers
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── claude.py
│ │ ├── codex.py
│ │ └── gemini.py
│ └── registry.py
├── code_quality_checks.ps1
├── code_quality_checks.sh
├── communication_simulator_test.py
├── conf
│ ├── __init__.py
│ ├── azure_models.json
│ ├── cli_clients
│ │ ├── claude.json
│ │ ├── codex.json
│ │ └── gemini.json
│ ├── custom_models.json
│ ├── dial_models.json
│ ├── gemini_models.json
│ ├── openai_models.json
│ ├── openrouter_models.json
│ └── xai_models.json
├── config.py
├── docker
│ ├── README.md
│ └── scripts
│ ├── build.ps1
│ ├── build.sh
│ ├── deploy.ps1
│ ├── deploy.sh
│ └── healthcheck.py
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── adding_providers.md
│ ├── adding_tools.md
│ ├── advanced-usage.md
│ ├── ai_banter.md
│ ├── ai-collaboration.md
│ ├── azure_openai.md
│ ├── configuration.md
│ ├── context-revival.md
│ ├── contributions.md
│ ├── custom_models.md
│ ├── docker-deployment.md
│ ├── gemini-setup.md
│ ├── getting-started.md
│ ├── index.md
│ ├── locale-configuration.md
│ ├── logging.md
│ ├── model_ranking.md
│ ├── testing.md
│ ├── tools
│ │ ├── analyze.md
│ │ ├── apilookup.md
│ │ ├── challenge.md
│ │ ├── chat.md
│ │ ├── clink.md
│ │ ├── codereview.md
│ │ ├── consensus.md
│ │ ├── debug.md
│ │ ├── docgen.md
│ │ ├── listmodels.md
│ │ ├── planner.md
│ │ ├── precommit.md
│ │ ├── refactor.md
│ │ ├── secaudit.md
│ │ ├── testgen.md
│ │ ├── thinkdeep.md
│ │ ├── tracer.md
│ │ └── version.md
│ ├── troubleshooting.md
│ ├── vcr-testing.md
│ └── wsl-setup.md
├── examples
│ ├── claude_config_macos.json
│ └── claude_config_wsl.json
├── LICENSE
├── providers
│ ├── __init__.py
│ ├── azure_openai.py
│ ├── base.py
│ ├── custom.py
│ ├── dial.py
│ ├── gemini.py
│ ├── openai_compatible.py
│ ├── openai.py
│ ├── openrouter.py
│ ├── registries
│ │ ├── __init__.py
│ │ ├── azure.py
│ │ ├── base.py
│ │ ├── custom.py
│ │ ├── dial.py
│ │ ├── gemini.py
│ │ ├── openai.py
│ │ ├── openrouter.py
│ │ └── xai.py
│ ├── registry_provider_mixin.py
│ ├── registry.py
│ ├── shared
│ │ ├── __init__.py
│ │ ├── model_capabilities.py
│ │ ├── model_response.py
│ │ ├── provider_type.py
│ │ └── temperature.py
│ └── xai.py
├── pyproject.toml
├── pytest.ini
├── README.md
├── requirements-dev.txt
├── requirements.txt
├── run_integration_tests.ps1
├── run_integration_tests.sh
├── run-server.ps1
├── run-server.sh
├── scripts
│ └── sync_version.py
├── server.py
├── simulator_tests
│ ├── __init__.py
│ ├── base_test.py
│ ├── conversation_base_test.py
│ ├── log_utils.py
│ ├── test_analyze_validation.py
│ ├── test_basic_conversation.py
│ ├── test_chat_simple_validation.py
│ ├── test_codereview_validation.py
│ ├── test_consensus_conversation.py
│ ├── test_consensus_three_models.py
│ ├── test_consensus_workflow_accurate.py
│ ├── test_content_validation.py
│ ├── test_conversation_chain_validation.py
│ ├── test_cross_tool_comprehensive.py
│ ├── test_cross_tool_continuation.py
│ ├── test_debug_certain_confidence.py
│ ├── test_debug_validation.py
│ ├── test_line_number_validation.py
│ ├── test_logs_validation.py
│ ├── test_model_thinking_config.py
│ ├── test_o3_model_selection.py
│ ├── test_o3_pro_expensive.py
│ ├── test_ollama_custom_url.py
│ ├── test_openrouter_fallback.py
│ ├── test_openrouter_models.py
│ ├── test_per_tool_deduplication.py
│ ├── test_planner_continuation_history.py
│ ├── test_planner_validation_old.py
│ ├── test_planner_validation.py
│ ├── test_precommitworkflow_validation.py
│ ├── test_prompt_size_limit_bug.py
│ ├── test_refactor_validation.py
│ ├── test_secaudit_validation.py
│ ├── test_testgen_validation.py
│ ├── test_thinkdeep_validation.py
│ ├── test_token_allocation_validation.py
│ ├── test_vision_capability.py
│ └── test_xai_models.py
├── systemprompts
│ ├── __init__.py
│ ├── analyze_prompt.py
│ ├── chat_prompt.py
│ ├── clink
│ │ ├── codex_codereviewer.txt
│ │ ├── default_codereviewer.txt
│ │ ├── default_planner.txt
│ │ └── default.txt
│ ├── codereview_prompt.py
│ ├── consensus_prompt.py
│ ├── debug_prompt.py
│ ├── docgen_prompt.py
│ ├── generate_code_prompt.py
│ ├── planner_prompt.py
│ ├── precommit_prompt.py
│ ├── refactor_prompt.py
│ ├── secaudit_prompt.py
│ ├── testgen_prompt.py
│ ├── thinkdeep_prompt.py
│ └── tracer_prompt.py
├── tests
│ ├── __init__.py
│ ├── CASSETTE_MAINTENANCE.md
│ ├── conftest.py
│ ├── gemini_cassettes
│ │ ├── chat_codegen
│ │ │ └── gemini25_pro_calculator
│ │ │ └── mldev.json
│ │ ├── chat_cross
│ │ │ └── step1_gemini25_flash_number
│ │ │ └── mldev.json
│ │ └── consensus
│ │ └── step2_gemini25_flash_against
│ │ └── mldev.json
│ ├── http_transport_recorder.py
│ ├── mock_helpers.py
│ ├── openai_cassettes
│ │ ├── chat_cross_step2_gpt5_reminder.json
│ │ ├── chat_gpt5_continuation.json
│ │ ├── chat_gpt5_moon_distance.json
│ │ ├── consensus_step1_gpt5_for.json
│ │ └── o3_pro_basic_math.json
│ ├── pii_sanitizer.py
│ ├── sanitize_cassettes.py
│ ├── test_alias_target_restrictions.py
│ ├── test_auto_mode_comprehensive.py
│ ├── test_auto_mode_custom_provider_only.py
│ ├── test_auto_mode_model_listing.py
│ ├── test_auto_mode_provider_selection.py
│ ├── test_auto_mode.py
│ ├── test_auto_model_planner_fix.py
│ ├── test_azure_openai_provider.py
│ ├── test_buggy_behavior_prevention.py
│ ├── test_cassette_semantic_matching.py
│ ├── test_challenge.py
│ ├── test_chat_codegen_integration.py
│ ├── test_chat_cross_model_continuation.py
│ ├── test_chat_openai_integration.py
│ ├── test_chat_simple.py
│ ├── test_clink_claude_agent.py
│ ├── test_clink_claude_parser.py
│ ├── test_clink_codex_agent.py
│ ├── test_clink_gemini_agent.py
│ ├── test_clink_gemini_parser.py
│ ├── test_clink_integration.py
│ ├── test_clink_parsers.py
│ ├── test_clink_tool.py
│ ├── test_collaboration.py
│ ├── test_config.py
│ ├── test_consensus_integration.py
│ ├── test_consensus_schema.py
│ ├── test_consensus.py
│ ├── test_conversation_continuation_integration.py
│ ├── test_conversation_field_mapping.py
│ ├── test_conversation_file_features.py
│ ├── test_conversation_memory.py
│ ├── test_conversation_missing_files.py
│ ├── test_custom_openai_temperature_fix.py
│ ├── test_custom_provider.py
│ ├── test_debug.py
│ ├── test_deploy_scripts.py
│ ├── test_dial_provider.py
│ ├── test_directory_expansion_tracking.py
│ ├── test_disabled_tools.py
│ ├── test_docker_claude_desktop_integration.py
│ ├── test_docker_config_complete.py
│ ├── test_docker_healthcheck.py
│ ├── test_docker_implementation.py
│ ├── test_docker_mcp_validation.py
│ ├── test_docker_security.py
│ ├── test_docker_volume_persistence.py
│ ├── test_file_protection.py
│ ├── test_gemini_token_usage.py
│ ├── test_image_support_integration.py
│ ├── test_image_validation.py
│ ├── test_integration_utf8.py
│ ├── test_intelligent_fallback.py
│ ├── test_issue_245_simple.py
│ ├── test_large_prompt_handling.py
│ ├── test_line_numbers_integration.py
│ ├── test_listmodels_restrictions.py
│ ├── test_listmodels.py
│ ├── test_mcp_error_handling.py
│ ├── test_model_enumeration.py
│ ├── test_model_metadata_continuation.py
│ ├── test_model_resolution_bug.py
│ ├── test_model_restrictions.py
│ ├── test_o3_pro_output_text_fix.py
│ ├── test_o3_temperature_fix_simple.py
│ ├── test_openai_compatible_token_usage.py
│ ├── test_openai_provider.py
│ ├── test_openrouter_provider.py
│ ├── test_openrouter_registry.py
│ ├── test_parse_model_option.py
│ ├── test_per_tool_model_defaults.py
│ ├── test_pii_sanitizer.py
│ ├── test_pip_detection_fix.py
│ ├── test_planner.py
│ ├── test_precommit_workflow.py
│ ├── test_prompt_regression.py
│ ├── test_prompt_size_limit_bug_fix.py
│ ├── test_provider_retry_logic.py
│ ├── test_provider_routing_bugs.py
│ ├── test_provider_utf8.py
│ ├── test_providers.py
│ ├── test_rate_limit_patterns.py
│ ├── test_refactor.py
│ ├── test_secaudit.py
│ ├── test_server.py
│ ├── test_supported_models_aliases.py
│ ├── test_thinking_modes.py
│ ├── test_tools.py
│ ├── test_tracer.py
│ ├── test_utf8_localization.py
│ ├── test_utils.py
│ ├── test_uvx_resource_packaging.py
│ ├── test_uvx_support.py
│ ├── test_workflow_file_embedding.py
│ ├── test_workflow_metadata.py
│ ├── test_workflow_prompt_size_validation_simple.py
│ ├── test_workflow_utf8.py
│ ├── test_xai_provider.py
│ ├── transport_helpers.py
│ └── triangle.png
├── tools
│ ├── __init__.py
│ ├── analyze.py
│ ├── apilookup.py
│ ├── challenge.py
│ ├── chat.py
│ ├── clink.py
│ ├── codereview.py
│ ├── consensus.py
│ ├── debug.py
│ ├── docgen.py
│ ├── listmodels.py
│ ├── models.py
│ ├── planner.py
│ ├── precommit.py
│ ├── refactor.py
│ ├── secaudit.py
│ ├── shared
│ │ ├── __init__.py
│ │ ├── base_models.py
│ │ ├── base_tool.py
│ │ ├── exceptions.py
│ │ └── schema_builders.py
│ ├── simple
│ │ ├── __init__.py
│ │ └── base.py
│ ├── testgen.py
│ ├── thinkdeep.py
│ ├── tracer.py
│ ├── version.py
│ └── workflow
│ ├── __init__.py
│ ├── base.py
│ ├── schema_builders.py
│ └── workflow_mixin.py
├── utils
│ ├── __init__.py
│ ├── client_info.py
│ ├── conversation_memory.py
│ ├── env.py
│ ├── file_types.py
│ ├── file_utils.py
│ ├── image_utils.py
│ ├── model_context.py
│ ├── model_restrictions.py
│ ├── security_config.py
│ ├── storage_backend.py
│ └── token_utils.py
└── zen-mcp-server
```
# Files
--------------------------------------------------------------------------------
/utils/conversation_memory.py:
--------------------------------------------------------------------------------
```python
"""
Conversation Memory for AI-to-AI Multi-turn Discussions
This module provides conversation persistence and context reconstruction for
stateless MCP (Model Context Protocol) environments. It enables multi-turn
conversations between the agent and downstream models by storing conversation
state in memory across independent request cycles.
CRITICAL ARCHITECTURAL REQUIREMENT:
This conversation memory system is designed for PERSISTENT MCP SERVER PROCESSES.
It uses in-memory storage that persists only within a single Python process.
⚠️ IMPORTANT: This system will NOT work correctly if MCP tool calls are made
as separate subprocess invocations (each subprocess starts with empty memory).
WORKING SCENARIO: Claude Desktop with persistent MCP server process
FAILING SCENARIO: Simulator tests calling server.py as individual subprocesses
Root cause of test failures: Each subprocess call loses the conversation
state from previous calls because memory is process-specific, not shared
across subprocess boundaries.
ARCHITECTURE OVERVIEW:
The MCP protocol is inherently stateless - each tool request is independent
with no memory of previous interactions. This module bridges that gap by:
1. Creating persistent conversation threads with unique UUIDs
2. Storing complete conversation context (turns, files, metadata) in memory
3. Reconstructing conversation history when tools are called with continuation_id
4. Supporting cross-tool continuation - seamlessly switch between different tools
while maintaining full conversation context and file references
CROSS-TOOL CONTINUATION:
A conversation started with one tool (e.g., 'analyze') can be continued with
any other tool (e.g., 'codereview', 'debug', 'chat') using the same continuation_id.
The second tool will have access to:
- All previous conversation turns and responses
- File context from previous tools (preserved in conversation history)
- Original thread metadata and timing information
- Accumulated knowledge from the entire conversation
Key Features:
- UUID-based conversation thread identification with security validation
- Turn-by-turn conversation history storage with tool attribution
- Cross-tool continuation support - switch tools while preserving context
- File context preservation - files shared in earlier turns remain accessible
- NEWEST-FIRST FILE PRIORITIZATION - when the same file appears in multiple turns,
references from newer turns take precedence over older ones. This ensures the
most recent file context is preserved when token limits require exclusions.
- Automatic turn limiting (MAX_CONVERSATION_TURNS, default 50 turns) to prevent runaway conversations
- Context reconstruction for stateless request continuity
- In-memory persistence with automatic expiration (3 hour TTL)
- Thread-safe operations for concurrent access
- Graceful degradation when storage is unavailable
DUAL PRIORITIZATION STRATEGY (Files & Conversations):
The conversation memory system implements sophisticated prioritization for both files and
conversation turns, using a consistent "newest-first" approach during collection but
presenting information in the optimal format for LLM consumption:
FILE PRIORITIZATION (Newest-First Throughout):
1. When collecting files across conversation turns, the system walks BACKWARDS through
turns (newest to oldest) and builds a unique file list
2. If the same file path appears in multiple turns, only the reference from the
NEWEST turn is kept in the final list
3. This "newest-first" ordering is preserved throughout the entire pipeline:
- get_conversation_file_list() establishes the order
- build_conversation_history() maintains it during token budgeting
- When token limits are hit, OLDER files are excluded first
4. This strategy works across conversation chains - files from newer turns in ANY
thread take precedence over files from older turns in ANY thread
CONVERSATION TURN PRIORITIZATION (Newest-First Collection, Chronological Presentation):
1. COLLECTION PHASE: Processes turns newest-to-oldest to prioritize recent context
- When token budget is tight, OLDER turns are excluded first
- Ensures most contextually relevant recent exchanges are preserved
2. PRESENTATION PHASE: Reverses collected turns to chronological order (oldest-first)
- LLM sees natural conversation flow: "Turn 1 → Turn 2 → Turn 3..."
- Maintains proper sequential understanding while preserving recency prioritization
This dual approach ensures optimal context preservation (newest-first) with natural
conversation flow (chronological) for maximum LLM comprehension and relevance.
USAGE EXAMPLE:
1. Tool A creates thread: create_thread("analyze", request_data) → returns UUID
2. Tool A adds response: add_turn(UUID, "assistant", response, files=[...], tool_name="analyze")
3. Tool B continues thread: get_thread(UUID) → retrieves full context
4. Tool B sees conversation history via build_conversation_history()
5. Tool B adds its response: add_turn(UUID, "assistant", response, tool_name="codereview")
DUAL STRATEGY EXAMPLE:
Conversation has 5 turns, token budget allows only 3 turns:
Collection Phase (Newest-First Priority):
- Evaluates: Turn 5 → Turn 4 → Turn 3 → Turn 2 → Turn 1
- Includes: Turn 5, Turn 4, Turn 3 (newest 3 fit in budget)
- Excludes: Turn 2, Turn 1 (oldest, dropped due to token limits)
Presentation Phase (Chronological Order):
- LLM sees: "--- Turn 3 (Agent) ---", "--- Turn 4 (Model) ---", "--- Turn 5 (Agent) ---"
- Natural conversation flow maintained despite prioritizing recent context
This enables true AI-to-AI collaboration across the entire tool ecosystem with optimal
context preservation and natural conversation understanding.
"""
import logging
import os
import uuid
from datetime import datetime, timezone
from typing import Any, Optional
from pydantic import BaseModel
from utils.env import get_env
logger = logging.getLogger(__name__)
# Configuration constants
def _positive_int_from_env(name: str, default: int, unit: str) -> int:
    """
    Read a strictly positive integer setting from the environment.

    Args:
        name: Environment variable to read via get_env()
        default: Value used when the variable is unset, empty, not an integer, or <= 0
        unit: Human-readable unit ("turns", "hours") used only in the warning text

    Returns:
        int: The parsed value, or ``default`` when the configured value is unusable
    """
    try:
        # An unset or empty variable falls back to the default before parsing
        raw_value = (get_env(name, str(default)) or str(default)).strip()
        parsed = int(raw_value)
    except ValueError:
        logger.warning(f"Invalid {name} value ('{get_env(name)}'), using default of {default} {unit}")
        return default

    if parsed <= 0:
        logger.warning(f"Invalid {name} value ({parsed}), using default of {default} {unit}")
        return default
    return parsed


# Maximum number of turns a single thread may hold; defaults to 50 turns
MAX_CONVERSATION_TURNS = _positive_int_from_env("MAX_CONVERSATION_TURNS", 50, "turns")

# Conversation timeout in hours; defaults to 3 hours
CONVERSATION_TIMEOUT_HOURS = _positive_int_from_env("CONVERSATION_TIMEOUT_HOURS", 3, "hours")
CONVERSATION_TIMEOUT_SECONDS = CONVERSATION_TIMEOUT_HOURS * 3600
class ConversationTurn(BaseModel):
    """
    One message within a conversation thread.

    A turn stores the message text together with the bookkeeping that later
    tools need to pick the conversation up again: who spoke, when, which
    files/images were referenced, and which tool/model produced it.

    Only ``role``, ``content`` and ``timestamp`` are required; every other
    field defaults to None.
    """

    # Speaker of this turn: "user" (Agent request) or "assistant" (model response)
    role: str
    # Message or response text
    content: str
    # Creation time; add_turn() stores a UTC ISO-8601 string here
    timestamp: str
    # File paths referenced in this turn
    files: Optional[list[str]] = None
    # Image paths referenced in this turn
    images: Optional[list[str]] = None
    # Tool that produced this turn (for cross-tool tracking)
    tool_name: Optional[str] = None
    # Model provider, e.g. "google" or "openai"
    model_provider: Optional[str] = None
    # Specific model, e.g. "gemini-2.5-flash" or "o3-mini"
    model_name: Optional[str] = None
    # Extra model-specific details, e.g. thinking mode or token usage
    model_metadata: Optional[dict[str, Any]] = None
class ThreadContext(BaseModel):
    """
    Stored state of one conversation thread.

    This is the record that create_thread() serializes into storage and
    get_thread() deserializes again. It carries the thread's identity, an
    optional link to a parent thread, its timestamps, the originating tool,
    the accumulated turns and the (filtered) request that opened the thread.
    """

    # Identifier of this thread (create_thread() generates a UUID4 string)
    thread_id: str
    # Identifier of the parent thread when this thread continues a chain
    parent_thread_id: Optional[str] = None
    # Creation time as an ISO timestamp string
    created_at: str
    # Time of the most recent modification as an ISO timestamp string
    last_updated_at: str
    # Tool that opened the thread; kept for attribution
    tool_name: str
    # Conversation turns in the order they were appended
    turns: list[ConversationTurn]
    # Request parameters captured when the thread was created
    initial_context: dict[str, Any]
def get_storage():
    """
    Return the storage backend used for conversation persistence.

    The backend is whatever ``get_storage_backend()`` from the sibling
    ``storage_backend`` module returns. The import happens at call time
    rather than at module import time.

    Returns:
        The storage backend object (presumably the in-memory backend
        described in the module docstring - confirm in storage_backend)
    """
    from .storage_backend import get_storage_backend

    backend = get_storage_backend()
    return backend
def create_thread(tool_name: str, initial_request: dict[str, Any], parent_thread_id: Optional[str] = None) -> str:
    """
    Open a new conversation thread and return its identifier.

    A fresh UUID4 is generated, an empty ThreadContext is built around it and
    the serialized context is written to storage under ``thread:<id>`` with a
    TTL of CONVERSATION_TIMEOUT_SECONDS.

    Args:
        tool_name: Name of the tool opening the thread (e.g., "analyze", "chat")
        initial_request: Request parameters to remember; the keys
            "temperature", "thinking_mode", "model" and "continuation_id"
            are dropped before storing
        parent_thread_id: Optional ID of the thread this one continues

    Returns:
        str: The new thread's UUID, usable as a continuation ID

    Note:
        - The thread starts with no turns; use add_turn() to append them
        - Any tool may continue the thread using the returned UUID
        - parent_thread_id is what get_thread_chain() follows to walk a chain
    """
    new_id = str(uuid.uuid4())
    timestamp = datetime.now(timezone.utc).isoformat()

    # Per-request settings that are deliberately not kept with the thread
    excluded_keys = ("temperature", "thinking_mode", "model", "continuation_id")
    stored_request = {}
    for param_name, param_value in initial_request.items():
        if param_name not in excluded_keys:
            stored_request[param_name] = param_value

    thread = ThreadContext(
        thread_id=new_id,
        parent_thread_id=parent_thread_id,
        created_at=timestamp,
        last_updated_at=timestamp,
        tool_name=tool_name,
        turns=[],
        initial_context=stored_request,
    )

    # Persist with a TTL so abandoned threads do not accumulate forever
    get_storage().setex(f"thread:{new_id}", CONVERSATION_TIMEOUT_SECONDS, thread.model_dump_json())

    logger.debug(f"[THREAD] Created new thread {new_id} with parent {parent_thread_id}")
    return new_id
def get_thread(thread_id: str) -> Optional[ThreadContext]:
    """
    Retrieve thread context from storage

    Looks up the serialized thread stored under ``thread:<thread_id>`` and
    rebuilds it as a ThreadContext. This is the lookup that lets any tool
    resume a conversation started by another.

    Args:
        thread_id: UUID of the conversation thread

    Returns:
        ThreadContext: The stored conversation context if found
        None: If thread_id is empty, is rejected by _is_valid_uuid(), has no
            stored entry, or if loading it raised an exception

    Note:
        - The ID is validated before storage is touched
        - Lookup is best-effort: storage or deserialization errors never
          propagate to the caller; only the exception type is logged (at
          debug level), mirroring how add_turn() reports storage failures
    """
    # Reject empty or malformed IDs up front
    if not thread_id or not _is_valid_uuid(thread_id):
        return None

    try:
        data = get_storage().get(f"thread:{thread_id}")
        if not data:
            return None
        return ThreadContext.model_validate_json(data)
    except Exception as e:
        # Keep the failure non-fatal, but leave a trace for debugging instead of
        # swallowing it silently. Only the exception type is logged so storage
        # details and payload contents stay out of the log.
        logger.debug(f"[THREAD] Failed to load thread {thread_id}: {type(e).__name__}")
        return None
def add_turn(
    thread_id: str,
    role: str,
    content: str,
    files: Optional[list[str]] = None,
    images: Optional[list[str]] = None,
    tool_name: Optional[str] = None,
    model_provider: Optional[str] = None,
    model_name: Optional[str] = None,
    model_metadata: Optional[dict[str, Any]] = None,
) -> bool:
    """
    Append one turn to an existing thread and write the thread back to storage.

    The thread is loaded with get_thread(), a ConversationTurn is built from
    the arguments and appended, ``last_updated_at`` is bumped, and the whole
    thread is stored again with a TTL of CONVERSATION_TIMEOUT_SECONDS.

    Args:
        thread_id: UUID of the conversation thread
        role: "user" (Agent request) or "assistant" (model response)
        content: Message or response text
        files: Optional list of files referenced in this turn
        images: Optional list of images referenced in this turn
        tool_name: Name of the tool adding this turn (for attribution)
        model_provider: Provider used (e.g., "google", "openai")
        model_name: Specific model used (e.g., "gemini-2.5-flash", "o3-mini")
        model_metadata: Additional model info (e.g., thinking mode, token usage)

    Returns:
        bool: True when the updated thread was stored, False when
            - get_thread() returned no thread for thread_id
            - the thread already holds MAX_CONVERSATION_TURNS turns
            - writing the thread back to storage raised an exception
    """
    logger.debug(f"[FLOW] Adding {role} turn to {thread_id} ({tool_name})")

    thread = get_thread(thread_id)
    if not thread:
        logger.debug(f"[FLOW] Thread {thread_id} not found for turn addition")
        return False

    # Enforce the cap so a conversation cannot grow without bound
    if len(thread.turns) >= MAX_CONVERSATION_TURNS:
        logger.debug(f"[FLOW] Thread {thread_id} at max turns ({MAX_CONVERSATION_TURNS})")
        return False

    turn_created_at = datetime.now(timezone.utc).isoformat()
    new_turn = ConversationTurn(
        role=role,
        content=content,
        timestamp=turn_created_at,
        files=files,
        images=images,
        tool_name=tool_name,
        model_provider=model_provider,
        model_name=model_name,
        model_metadata=model_metadata,
    )
    thread.turns.append(new_turn)
    thread.last_updated_at = datetime.now(timezone.utc).isoformat()

    # Writing with setex stores the new content and restarts the TTL
    try:
        get_storage().setex(f"thread:{thread_id}", CONVERSATION_TIMEOUT_SECONDS, thread.model_dump_json())
    except Exception as e:
        logger.debug(f"[FLOW] Failed to save turn to storage: {type(e).__name__}")
        return False
    return True
def get_thread_chain(thread_id: str, max_depth: int = 20) -> list[ThreadContext]:
    """
    Collect a thread and its ancestors by following parent_thread_id links.

    Starting at ``thread_id``, each thread is loaded with get_thread() and the
    walk continues with its parent. The walk stops when there is no parent,
    when ``max_depth`` threads have been collected, when an ID repeats
    (circular chain), or when a thread cannot be loaded.

    Args:
        thread_id: ID of the thread to start from (the newest in the chain)
        max_depth: Upper bound on the number of threads collected

    Returns:
        list[ThreadContext]: The collected threads, oldest first
    """
    newest_first = []
    visited = set()
    cursor = thread_id

    while cursor and len(newest_first) < max_depth:
        # A repeated ID means the parent links form a loop
        if cursor in visited:
            logger.warning(f"[THREAD] Circular reference detected in thread chain at {cursor}")
            break
        visited.add(cursor)

        thread = get_thread(cursor)
        if not thread:
            logger.debug(f"[THREAD] Thread {cursor} not found in chain traversal")
            break

        newest_first.append(thread)
        cursor = thread.parent_thread_id

    # The walk went child -> parent, so flip it for chronological order
    oldest_first = newest_first[::-1]

    logger.debug(f"[THREAD] Retrieved chain of {len(oldest_first)} threads for {thread_id}")
    return oldest_first
def get_conversation_file_list(context: ThreadContext) -> list[str]:
    """
    List every distinct file referenced in a thread, newest reference first.

    Turns are visited from the last one back to the first. Within a turn the
    files are taken in the order they appear in ``turn.files``. A path is
    added the first time it is met and ignored afterwards, so the position of
    each path is decided by the newest turn that mentions it.

    Example:
        Turn 1: files = ["main.py", "utils.py"]
        Turn 2: files = ["test.py"]
        Turn 3: files = ["main.py", "config.py"]

        Result: ["main.py", "config.py", "test.py", "utils.py"]

    Args:
        context: ThreadContext whose turns are scanned

    Returns:
        list[str]: Unique file paths, ordered by newest reference first.
                   Empty list when the thread has no turns or no turn lists files.
    """
    turns = context.turns
    if not turns:
        logger.debug("[FILES] No turns found, returning empty file list")
        return []

    already_listed = set()
    ordered_files = []

    logger.debug(f"[FILES] Collecting files from {len(turns)} turns (newest first)")

    # turn_number is 1-based and counts down, so newer turns are handled first
    for turn_number in range(len(turns), 0, -1):
        turn = turns[turn_number - 1]
        if not turn.files:
            continue

        logger.debug(f"[FILES] Turn {turn_number} has {len(turn.files)} files: {turn.files}")
        for path in turn.files:
            if path in already_listed:
                # A newer turn already contributed this path
                logger.debug(f"[FILES] Skipping duplicate file: {path} (newer version already included)")
                continue
            already_listed.add(path)
            ordered_files.append(path)
            logger.debug(f"[FILES] Added new file: {path} (from turn {turn_number})")

    logger.debug(f"[FILES] Final file list ({len(ordered_files)}): {ordered_files}")
    return ordered_files
def get_conversation_image_list(context: ThreadContext) -> list[str]:
    """
    List every distinct image referenced in a thread, newest reference first.

    Applies the same rule as get_conversation_file_list(), but to
    ``turn.images``: turns are visited from last to first, images within a
    turn keep their listed order, and a path is kept only the first time it
    is met (i.e. from the newest turn that mentions it).

    Example:
        Turn 1: images = ["diagram.png", "flow.jpg"]
        Turn 2: images = ["error.png"]
        Turn 3: images = ["diagram.png", "updated.png"]

        Result: ["diagram.png", "updated.png", "error.png", "flow.jpg"]

    Args:
        context: ThreadContext whose turns are scanned

    Returns:
        list[str]: Unique image paths, ordered by newest reference first.
                   Empty list when the thread has no turns or no turn lists images.
    """
    if not context.turns:
        logger.debug("[IMAGES] No turns found, returning empty image list")
        return []

    collected = []
    known_paths = set()

    logger.debug(f"[IMAGES] Collecting images from {len(context.turns)} turns (newest first)")

    # Pair each turn with its 1-based number, then walk the pairs backwards
    numbered_turns = list(enumerate(context.turns, start=1))
    for turn_number, turn in reversed(numbered_turns):
        if not turn.images:
            continue

        logger.debug(f"[IMAGES] Turn {turn_number} has {len(turn.images)} images: {turn.images}")
        for path in turn.images:
            if path in known_paths:
                # Already taken from a newer turn; the older reference is dropped
                logger.debug(f"[IMAGES] Skipping duplicate image: {path} (newer version already included)")
            else:
                known_paths.add(path)
                collected.append(path)
                logger.debug(f"[IMAGES] Added new image: {path} (from turn {turn_number})")

    logger.debug(f"[IMAGES] Final image list ({len(collected)}): {collected}")
    return collected
def _plan_file_inclusion_by_size(all_files: list[str], max_file_tokens: int) -> tuple[list[str], list[str], int]:
    """
    Decide which files fit into the file-token budget for conversation history.

    Files are considered in the order given; each one is accepted only if its
    estimated token count still fits into what remains of ``max_file_tokens``.
    A file that does not fit is skipped, but later (smaller) files may still be
    accepted. Paths that do not exist, are not regular files, or raise during
    processing are placed in the skip list.

    This planner is used for conversation history building only, not for MCP
    boundary checks.

    Args:
        all_files: Candidate file paths, in priority order
        max_file_tokens: Token budget available for file content

    Returns:
        Tuple of (files_to_include, files_to_skip, estimated_total_tokens)
    """
    if not all_files:
        return [], [], 0

    accepted = []
    rejected = []
    budget_used = 0

    logger.debug(f"[FILES] Planning inclusion for {len(all_files)} files with budget {max_file_tokens:,} tokens")

    for file_path in all_files:
        try:
            from utils.file_utils import estimate_file_tokens

            # Guard: anything that is not an existing regular file is skipped
            if not (os.path.exists(file_path) and os.path.isfile(file_path)):
                rejected.append(file_path)
                if not os.path.exists(file_path):
                    logger.debug(
                        f"[FILES] Skipping {file_path} - file no longer exists (may have been moved/deleted since conversation)"
                    )
                else:
                    logger.debug(f"[FILES] Skipping {file_path} - file not accessible (not a regular file)")
                continue

            # Centralized estimator keeps the numbers consistent with other callers
            estimated_tokens = estimate_file_tokens(file_path)

            # Guard: file would push the running total past the budget
            if budget_used + estimated_tokens > max_file_tokens:
                rejected.append(file_path)
                logger.debug(
                    f"[FILES] Skipping {file_path} - would exceed budget (needs {estimated_tokens:,} tokens)"
                )
                continue

            accepted.append(file_path)
            budget_used += estimated_tokens
            logger.debug(f"[FILES] Including {file_path} - {estimated_tokens:,} tokens (total: {budget_used:,})")
        except Exception as e:
            # Best effort: a file that cannot be inspected is simply left out
            rejected.append(file_path)
            logger.debug(f"[FILES] Skipping {file_path} - error during processing: {type(e).__name__}: {e}")

    logger.debug(
        f"[FILES] Inclusion plan: {len(accepted)} include, {len(rejected)} skip, {budget_used:,} tokens"
    )
    return accepted, rejected, budget_used
def build_conversation_history(context: ThreadContext, model_context=None, read_files_func=None) -> tuple[str, int]:
    """
    Build formatted conversation history for tool prompts with embedded file contents.

    Produces a single text block containing a header, the contents of files
    referenced during the conversation (subject to a token budget), the
    conversation turns (subject to a token budget), and closing instructions
    for the model that will continue the thread.

    FILE PRIORITIZATION BEHAVIOR:
    The file list comes from get_conversation_file_list(), and files are planned
    for inclusion in that order by _plan_file_inclusion_by_size(). Files that do
    not fit the budget, no longer exist, or cannot be read are omitted and a note
    is added to the output.

    CONVERSATION CHAIN HANDLING:
    If the thread has a parent_thread_id, the chain returned by get_thread_chain()
    is flattened into one list of turns and a temporary merged ThreadContext is
    used to compute the file list across the whole chain.

    CONVERSATION TURN ORDERING STRATEGY:
    PHASE 1 - COLLECTION (Newest-First for Token Budget):
    - Walks turns from the last index down to the first
    - Stops at the first turn that would exceed the budget, so OLDER turns are
      the ones dropped
    PHASE 2 - PRESENTATION (Chronological for LLM Understanding):
    - Reverses the collected turns back to oldest-first
    - Turn headers keep their original position number ("--- Turn 3 ...")

    TOKEN MANAGEMENT:
    - Budgets come from model_context.calculate_token_allocation()
      (file_tokens for file planning, history_tokens for turns)
    - Everything already placed in the output (header and embedded files) is
      counted against the history budget before any turn is added
    - A note is appended when not all turns could be shown

    Args:
        context: ThreadContext containing the conversation to format
        model_context: ModelContext for token allocation (optional; when omitted a
            ModelContext is built from DEFAULT_MODEL, or from the registry's
            preferred fallback model in auto mode)
        read_files_func: Optional function to read files (primarily for testing).
            When given, it is called with the full file list and its result is
            embedded instead of reading each planned file individually.

    Returns:
        tuple[str, int]: (formatted_conversation_history, total_tokens_used)
        Returns ("", 0) if no conversation turns exist in the context

    Output Format:
        === CONVERSATION HISTORY (CONTINUATION) ===
        Thread: <thread_id>
        Tool: <original_tool_name>
        Turn <current>/<max_allowed>
        You are continuing this conversation thread from where it left off.

        === FILES REFERENCED IN THIS CONVERSATION ===
        The following files have been shared and analyzed during our conversation.
        [NOTE: X files omitted (size constraints, missing files, or access issues)]
        Refer to these when analyzing the context and requests below:

        <embedded_file_contents>

        === END REFERENCED FILES ===

        Previous conversation turns:

        --- Turn 1 (Agent) ---
        Files used in this turn: file1.py, file2.py

        <turn_content>

        --- Turn 2 (gemini-2.5-flash using analyze via google) ---
        Files used in this turn: file3.py

        <turn_content>

        === END CONVERSATION HISTORY ===

        IMPORTANT: You are continuing an existing conversation thread...
        This is turn X of the conversation - use the conversation history above...

    Cross-Tool Collaboration:
        Because the history carries both turns and file contents, a tool that did
        not take part in earlier turns can still continue the thread with context.
    """
    # Get the complete thread chain
    if context.parent_thread_id:
        # This thread has a parent, get the full chain
        chain = get_thread_chain(context.thread_id)

        # Collect all turns from all threads in chain
        all_turns = []
        total_turns = 0

        for thread in chain:
            all_turns.extend(thread.turns)
            total_turns += len(thread.turns)

        # Use centralized file collection logic for consistency across the entire chain.
        # A temporary merged context lets get_conversation_file_list() see every turn
        # of the chain at once instead of only the current thread's turns.
        temp_context = ThreadContext(
            thread_id="merged_chain",
            created_at=context.created_at,
            last_updated_at=context.last_updated_at,
            tool_name=context.tool_name,
            turns=all_turns,  # All turns from entire chain (presumably oldest-first; depends on get_thread_chain order)
            initial_context=context.initial_context,
        )

        all_files = get_conversation_file_list(temp_context)  # File list computed over the entire chain
        logger.debug(f"[THREAD] Built history from {len(chain)} threads with {total_turns} total turns")
    else:
        # Single thread, no parent chain
        all_turns = context.turns
        total_turns = len(context.turns)
        all_files = get_conversation_file_list(context)

    # Nothing to format: callers receive an empty history and zero tokens
    if not all_turns:
        return "", 0

    logger.debug(f"[FILES] Found {len(all_files)} unique files in conversation history")

    # Get model-specific token allocation early (needed for both files and turns)
    if model_context is None:
        # Imported lazily, inside the branch that needs them
        from config import DEFAULT_MODEL, IS_AUTO_MODE
        from utils.model_context import ModelContext

        # In auto mode, use an intelligent fallback model for token calculations
        # since "auto" is not a real model with a provider
        model_name = DEFAULT_MODEL
        if IS_AUTO_MODE and model_name.lower() == "auto":
            # Use intelligent fallback based on available API keys
            from providers.registry import ModelProviderRegistry

            model_name = ModelProviderRegistry.get_preferred_fallback_model()

        model_context = ModelContext(model_name)

    token_allocation = model_context.calculate_token_allocation()
    max_file_tokens = token_allocation.file_tokens
    max_history_tokens = token_allocation.history_tokens

    logger.debug(f"[HISTORY] Using model-specific limits for {model_context.model_name}:")
    logger.debug(f"[HISTORY] Max file tokens: {max_file_tokens:,}")
    logger.debug(f"[HISTORY] Max history tokens: {max_history_tokens:,}")

    # Header block; every later section is appended to this list and joined with "\n" at the end
    history_parts = [
        "=== CONVERSATION HISTORY (CONTINUATION) ===",
        f"Thread: {context.thread_id}",
        f"Tool: {context.tool_name}",  # Original tool that started the conversation
        f"Turn {total_turns}/{MAX_CONVERSATION_TURNS}",
        "You are continuing this conversation thread from where it left off.",
        "",
    ]

    # Embed files referenced in this conversation with size-aware selection
    if all_files:
        logger.debug(f"[FILES] Starting embedding for {len(all_files)} files")

        # Plan file inclusion based on size constraints.
        # The planner walks all_files in order, so whatever priority order
        # get_conversation_file_list() produced decides which files win the budget.
        # NOTE(review): estimated_tokens from the plan is not used below (it is only
        # rebound in the read_files_func fallback branch).
        files_to_include, files_to_skip, estimated_tokens = _plan_file_inclusion_by_size(all_files, max_file_tokens)

        if files_to_skip:
            logger.info(f"[FILES] Excluding {len(files_to_skip)} files from conversation history: {files_to_skip}")
            logger.debug("[FILES] Files excluded for various reasons (size constraints, missing files, access issues)")

        if files_to_include:
            history_parts.extend(
                [
                    "=== FILES REFERENCED IN THIS CONVERSATION ===",
                    "The following files have been shared and analyzed during our conversation.",
                    (
                        # An empty string here becomes a blank line when nothing was skipped
                        ""
                        if not files_to_skip
                        else f"[NOTE: {len(files_to_skip)} files omitted (size constraints, missing files, or access issues)]"
                    ),
                    "Refer to these when analyzing the context and requests below:",
                    "",
                ]
            )

            if read_files_func is None:
                from utils.file_utils import read_file_content

                # Process files for embedding
                file_contents = []
                total_tokens = 0
                files_included = 0

                for file_path in files_to_include:
                    try:
                        logger.debug(f"[FILES] Processing file {file_path}")
                        formatted_content, content_tokens = read_file_content(file_path)
                        if formatted_content:
                            file_contents.append(formatted_content)
                            total_tokens += content_tokens
                            files_included += 1
                            logger.debug(
                                f"File embedded in conversation history: {file_path} ({content_tokens:,} tokens)"
                            )
                        else:
                            logger.debug(f"File skipped (empty content): {file_path}")
                    except Exception as e:
                        # A single unreadable file must not abort history building;
                        # log the most specific reason available and move on.
                        try:
                            if not os.path.exists(file_path):
                                logger.info(
                                    f"File no longer accessible for conversation history: {file_path} - file was moved/deleted since conversation (marking as excluded)"
                                )
                            else:
                                logger.warning(
                                    f"Failed to embed file in conversation history: {file_path} - {type(e).__name__}: {e}"
                                )
                        except Exception:
                            # Fallback if the existence check itself raises
                            logger.warning(
                                f"Failed to embed file in conversation history: {file_path} - {type(e).__name__}: {e}"
                            )
                        continue

                if file_contents:
                    files_content = "".join(file_contents)
                    if files_to_skip:
                        # NOTE(review): the message says the omitted files were "older", but
                        # files_to_skip also contains missing/inaccessible files of any age.
                        files_content += (
                            f"\n[NOTE: {len(files_to_skip)} additional file(s) were omitted due to size constraints, missing files, or access issues. "
                            f"These were older files from earlier conversation turns.]\n"
                        )
                    history_parts.append(files_content)
                    logger.debug(
                        f"Conversation history file embedding complete: {files_included} files embedded, {len(files_to_skip)} omitted, {total_tokens:,} total tokens"
                    )
                else:
                    history_parts.append("(No accessible files found)")
                    logger.debug(f"[FILES] No accessible files found from {len(files_to_include)} planned files")
            else:
                # Caller-supplied reader: it receives the FULL file list (all_files),
                # not just the planned files_to_include
                files_content = read_files_func(all_files)
                if files_content:
                    # Add token validation for the combined file content
                    from utils.token_utils import check_token_limit

                    # NOTE(review): check_token_limit is called without max_file_tokens, so the
                    # limit it enforces may differ from the one quoted in the error message
                    # below - confirm against utils.token_utils.
                    within_limit, estimated_tokens = check_token_limit(files_content)
                    if within_limit:
                        history_parts.append(files_content)
                    else:
                        # Handle token limit exceeded for conversation files
                        error_message = f"ERROR: The total size of files referenced in this conversation has exceeded the context limit and cannot be displayed.\nEstimated tokens: {estimated_tokens}, but limit is {max_file_tokens}."
                        history_parts.append(error_message)
                else:
                    history_parts.append("(No accessible files found)")

            history_parts.extend(
                [
                    "",
                    "=== END REFERENCED FILES ===",
                    "",
                ]
            )

    history_parts.append("Previous conversation turns:")

    # === PHASE 1: COLLECTION (Newest-First for Token Budget) ===
    # Build conversation turns bottom-up (most recent first) to prioritize recent context within token limits
    # This ensures we include as many recent turns as possible within the token budget by excluding
    # OLDER turns first when space runs out, preserving the most contextually relevant exchanges
    turn_entries = []  # (original_index, formatted_turn_content) pairs, newest first until reversed below
    total_turn_tokens = 0
    # Despite the name, this counts every part added so far (header lines as well as embedded files)
    file_embedding_tokens = sum(model_context.estimate_tokens(part) for part in history_parts)

    # CRITICAL: Process turns in REVERSE chronological order (newest to oldest)
    # This prioritization strategy ensures recent context is preserved when token budget is tight
    for idx in range(len(all_turns) - 1, -1, -1):
        turn = all_turns[idx]
        turn_num = idx + 1  # 1-based position of the turn in the full conversation

        # User turns are labelled "Agent"; other turns use the model name when known
        if turn.role == "user":
            role_label = "Agent"
        else:
            role_label = turn.model_name or "Assistant"

        # Build the complete turn content
        turn_parts = []

        # Add turn header with tool attribution for cross-tool tracking
        turn_header = f"\n--- Turn {turn_num} ({role_label}"
        if turn.tool_name:
            turn_header += f" using {turn.tool_name}"

        # Add model info if available; the model name is only repeated when it
        # is not already shown as the role label
        if turn.model_provider:
            provider_descriptor = turn.model_provider
            if turn.model_name and turn.model_name != role_label:
                provider_descriptor += f"/{turn.model_name}"
            turn_header += f" via {provider_descriptor}"
        elif turn.model_name and turn.model_name != role_label:
            turn_header += f" via {turn.model_name}"

        turn_header += ") ---"
        turn_parts.append(turn_header)

        # Get tool-specific formatting if available
        # This includes file references and the actual content
        tool_formatted_content = _get_tool_formatted_content(turn)
        turn_parts.extend(tool_formatted_content)

        # Calculate tokens for this turn
        turn_content = "\n".join(turn_parts)
        turn_tokens = model_context.estimate_tokens(turn_content)

        # Check if adding this turn would exceed history budget
        if file_embedding_tokens + total_turn_tokens + turn_tokens > max_history_tokens:
            # Stop adding turns - we've reached the limit. Older turns are not
            # examined further, even if one of them would be small enough to fit.
            logger.debug(f"[HISTORY] Stopping at turn {turn_num} - would exceed history budget")
            logger.debug(f"[HISTORY] File tokens: {file_embedding_tokens:,}")
            logger.debug(f"[HISTORY] Turn tokens so far: {total_turn_tokens:,}")
            logger.debug(f"[HISTORY] This turn: {turn_tokens:,}")
            logger.debug(f"[HISTORY] Would total: {file_embedding_tokens + total_turn_tokens + turn_tokens:,}")
            logger.debug(f"[HISTORY] Budget: {max_history_tokens:,}")
            break

        # Add this turn to our collection (we'll reverse it later for chronological presentation)
        # The turn number is already baked into turn_content; the stored index is not read again below
        turn_entries.append((idx, turn_content))
        total_turn_tokens += turn_tokens

    # === PHASE 2: PRESENTATION (Chronological for LLM Understanding) ===
    # Reverse the collected turns to restore chronological order (oldest first)
    # This gives the LLM a natural conversation flow: Turn 1 → Turn 2 → Turn 3...
    # while still having prioritized recent turns during the token-constrained collection phase
    turn_entries.reverse()

    # Add the turns in chronological order for natural LLM comprehension
    # The LLM will see: "--- Turn 1 (Agent) ---" followed by "--- Turn 2 (Model) ---" etc.
    for _, turn_content in turn_entries:
        history_parts.append(turn_content)

    # Log what we included
    included_turns = len(turn_entries)
    total_turns = len(all_turns)  # same value as computed above; recomputed for the summary
    if included_turns < total_turns:
        logger.info(f"[HISTORY] Included {included_turns}/{total_turns} turns due to token limit")
        history_parts.append(f"\n[Note: Showing {included_turns} most recent turns out of {total_turns} total]")

    history_parts.extend(
        [
            "",
            "=== END CONVERSATION HISTORY ===",
            "",
            "IMPORTANT: You are continuing an existing conversation thread. Build upon the previous exchanges shown above,",
            "reference earlier points, and maintain consistency with what has been discussed.",
            "",
            "DO NOT repeat or summarize previous analysis, findings, or instructions that are already covered in the",
            "conversation history. Instead, provide only new insights, additional analysis, or direct answers to",
            "the follow-up question / concerns / insights. Assume the user has read the prior conversation.",
            "",
            f"This is turn {len(all_turns) + 1} of the conversation - use the conversation history above to provide a coherent continuation.",
        ]
    )

    # Calculate total tokens for the complete conversation history
    complete_history = "\n".join(history_parts)
    from utils.token_utils import estimate_tokens

    # Note: the returned total uses utils.token_utils.estimate_tokens, while the
    # per-turn budgeting above used model_context.estimate_tokens
    total_conversation_tokens = estimate_tokens(complete_history)

    # Summary log of what was built (counts cover all turns, not only the included ones)
    user_turns = len([t for t in all_turns if t.role == "user"])
    assistant_turns = len([t for t in all_turns if t.role == "assistant"])
    logger.debug(
        f"[FLOW] Built conversation history: {user_turns} user + {assistant_turns} assistant turns, {len(all_files)} files, {total_conversation_tokens:,} tokens"
    )

    return complete_history, total_conversation_tokens
def _get_tool_formatted_content(turn: ConversationTurn) -> list[str]:
    """
    Format a conversation turn, preferring the originating tool's own formatter.

    When the turn names a tool that is registered in ``server.TOOLS``, that
    tool's ``format_conversation_turn`` is used. In every other situation
    (no tool name, unknown tool, formatter missing, or any error while looking
    it up or running it) the default formatting is returned instead.

    Args:
        turn: The conversation turn to format

    Returns:
        list[str]: Formatted content lines for this turn
    """
    # Turns without tool attribution always use the default layout
    if not turn.tool_name:
        return _default_turn_formatting(turn)

    try:
        # Imported at call time to avoid a circular dependency with server
        from server import TOOLS

        registered_tool = TOOLS.get(turn.tool_name)
        if registered_tool:
            try:
                return registered_tool.format_conversation_turn(turn)
            except AttributeError:
                # No usable format_conversation_turn on this tool - use default below
                pass
    except Exception as exc:
        # Formatting problems must never break history building
        logger.debug(f"[HISTORY] Could not get tool-specific formatting for {turn.tool_name}: {exc}")

    return _default_turn_formatting(turn)
def _default_turn_formatting(turn: ConversationTurn) -> list[str]:
    """
    Produce the standard line layout for a conversation turn.

    Used whenever no tool-specific formatter applies. If the turn references
    files, a "Files used in this turn" line and a blank spacer line come
    first; the turn's content is always the final element.

    Args:
        turn: The conversation turn to format

    Returns:
        list[str]: Default formatted content lines
    """
    # Optional file preamble: one summary line plus an empty line for readability
    file_preamble = [f"Files used in this turn: {', '.join(turn.files)}", ""] if turn.files else []
    return [*file_preamble, turn.content]
def _is_valid_uuid(val: str) -> bool:
    """
    Validate UUID format for security

    Ensures thread IDs are valid UUIDs to prevent injection attacks
    and malformed requests. Any value that is not a string (for example
    ``None`` or a number arriving from a malformed request) is reported
    as invalid instead of raising.

    Args:
        val: String to validate as UUID

    Returns:
        bool: True if ``uuid.UUID`` accepts the string, False otherwise
    """
    # uuid.UUID raises TypeError/AttributeError (not ValueError) for non-string
    # input; a validator should answer False for those rather than crash.
    if not isinstance(val, str):
        return False

    try:
        uuid.UUID(val)
    except ValueError:
        return False
    return True
```
--------------------------------------------------------------------------------
/server.py:
--------------------------------------------------------------------------------
```python
"""
Zen MCP Server - Main server implementation
This module implements the core MCP (Model Context Protocol) server that provides
AI-powered tools for code analysis, review, and assistance using multiple AI models.
The server follows the MCP specification to expose various AI tools as callable functions
that can be used by MCP clients (like Claude). Each tool provides specialized functionality
such as code review, debugging, deep thinking, and general chat capabilities.
Key Components:
- MCP Server: Handles protocol communication and tool discovery
- Tool Registry: Maps tool names to their implementations
- Request Handler: Processes incoming tool calls and returns formatted responses
- Configuration: Manages API keys and model settings
The server runs on stdio (standard input/output) and communicates using JSON-RPC messages
as defined by the MCP protocol.
"""
import asyncio
import atexit
import logging
import os
import sys
import time
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import Any, Optional
from mcp.server import Server # noqa: E402
from mcp.server.models import InitializationOptions # noqa: E402
from mcp.server.stdio import stdio_server # noqa: E402
from mcp.types import ( # noqa: E402
GetPromptResult,
Prompt,
PromptMessage,
PromptsCapability,
ServerCapabilities,
TextContent,
Tool,
ToolAnnotations,
ToolsCapability,
)
from config import ( # noqa: E402
DEFAULT_MODEL,
__version__,
)
from tools import ( # noqa: E402
AnalyzeTool,
ChallengeTool,
ChatTool,
CLinkTool,
CodeReviewTool,
ConsensusTool,
DebugIssueTool,
DocgenTool,
ListModelsTool,
LookupTool,
PlannerTool,
PrecommitTool,
RefactorTool,
SecauditTool,
TestGenTool,
ThinkDeepTool,
TracerTool,
VersionTool,
)
from tools.models import ToolOutput # noqa: E402
from tools.shared.exceptions import ToolExecutionError # noqa: E402
from utils.env import env_override_enabled, get_env # noqa: E402
# Configure logging for server operations
# Can be controlled via LOG_LEVEL environment variable (DEBUG, INFO, WARNING, ERROR)
# An unset or empty LOG_LEVEL falls back to "DEBUG"; the value is upper-cased so it
# can be looked up as an attribute of the logging module further down.
log_level = (get_env("LOG_LEVEL", "DEBUG") or "DEBUG").upper()
# Create timezone-aware formatter
class LocalTimeFormatter(logging.Formatter):
    """Formatter whose default timestamp is ``YYYY-MM-DD HH:MM:SS,mmm``.

    The time tuple comes from ``self.converter`` (``time.localtime`` unless a
    subclass or instance overrides it), so timestamps are rendered in the
    machine's local time.
    """

    def formatTime(self, record, datefmt=None):
        """Return the record's creation time as text.

        Args:
            record: The log record being formatted
            datefmt: Optional ``time.strftime`` pattern; when given it is used
                as-is and no milliseconds are appended

        Returns:
            str: The formatted timestamp
        """
        ct = self.converter(record.created)
        if datefmt:
            return time.strftime(datefmt, ct)

        base = time.strftime("%Y-%m-%d %H:%M:%S", ct)
        # Truncate instead of rounding: record.msecs can be fractional (e.g. 999.6),
        # and rounding would print ",1000" instead of a three-digit millisecond field.
        return f"{base},{int(record.msecs):03d}"
# Configure both console and file logging
log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
# Clear any existing handlers first so the handlers added below are the only ones on the root logger
root_logger = logging.getLogger()
root_logger.handlers.clear()
# Create and configure stderr handler explicitly
stderr_handler = logging.StreamHandler(sys.stderr)
# An unrecognised LOG_LEVEL name falls back to INFO
stderr_handler.setLevel(getattr(logging, log_level, logging.INFO))
stderr_handler.setFormatter(LocalTimeFormatter(log_format))
root_logger.addHandler(stderr_handler)
# Note: MCP stdio_server interferes with stderr during tool execution
# All logs are properly written to logs/mcp_server.log for monitoring
# Set root logger level (same fallback to INFO as the handler above)
root_logger.setLevel(getattr(logging, log_level, logging.INFO))
# Add rotating file handler for local log monitoring
try:
    # Create logs directory next to this file
    log_dir = Path(__file__).parent / "logs"
    log_dir.mkdir(exist_ok=True)
    # Main server log with size-based rotation (20MB max per file)
    # This ensures logs don't grow indefinitely and are properly managed
    file_handler = RotatingFileHandler(
        log_dir / "mcp_server.log",
        maxBytes=20 * 1024 * 1024,  # 20MB max file size
        backupCount=5,  # Keep 5 rotated backups (about 120MB at most, including the active file)
        encoding="utf-8",
    )
    file_handler.setLevel(getattr(logging, log_level, logging.INFO))
    file_handler.setFormatter(LocalTimeFormatter(log_format))
    logging.getLogger().addHandler(file_handler)
    # Create a special logger for MCP activity tracking with size-based rotation
    mcp_logger = logging.getLogger("mcp_activity")
    mcp_file_handler = RotatingFileHandler(
        log_dir / "mcp_activity.log",
        maxBytes=10 * 1024 * 1024,  # 10MB max file size
        backupCount=2,  # Keep 2 rotated backups (about 30MB at most, including the active file)
        encoding="utf-8",
    )
    mcp_file_handler.setLevel(logging.INFO)
    mcp_file_handler.setFormatter(LocalTimeFormatter("%(asctime)s - %(message)s"))
    mcp_logger.addHandler(mcp_file_handler)
    mcp_logger.setLevel(logging.INFO)
    # Propagate to the root logger so activity records also reach the root handlers
    # (stderr and mcp_server.log) in addition to mcp_activity.log
    mcp_logger.propagate = True
    # Log setup info directly to root logger since logger isn't defined yet
    logging.info(f"Logging to: {log_dir / 'mcp_server.log'}")
    logging.info(f"Process PID: {os.getpid()}")
except Exception as e:
    # File logging is best effort: the server keeps running with stderr logging only
    print(f"Warning: Could not set up file logging: {e}", file=sys.stderr)
# Module-level logger used by the rest of this file
logger = logging.getLogger(__name__)
# Log ZEN_MCP_FORCE_ENV_OVERRIDE configuration for transparency
if env_override_enabled():
    logger.info("ZEN_MCP_FORCE_ENV_OVERRIDE enabled - .env file values will override system environment variables")
    logger.debug("Environment override prevents conflicts between different AI tools passing cached API keys")
else:
    logger.debug("ZEN_MCP_FORCE_ENV_OVERRIDE disabled - system environment variables take precedence")
# Create the MCP server instance with a unique name identifier
# This name is used by MCP clients to identify and connect to this specific server
server: Server = Server("zen-server")
# Constants for tool filtering: these tool names are kept enabled even when
# they are listed in DISABLED_TOOLS
ESSENTIAL_TOOLS = {"version", "listmodels"}
def parse_disabled_tools_env() -> set[str]:
    """
    Read DISABLED_TOOLS from the environment and split it into tool names.

    The value is treated as a comma-separated list; surrounding whitespace is
    removed from every entry, names are lower-cased, and empty entries are
    dropped.

    Returns:
        Set of lowercase tool names to disable, empty set if none specified
    """
    raw_value = (get_env("DISABLED_TOOLS", "") or "").strip()
    if not raw_value:
        return set()

    # Normalise first, then discard entries that were blank (e.g. "a,,b" or a trailing comma)
    normalised = (entry.strip().lower() for entry in raw_value.split(","))
    return {name for name in normalised if name}
def validate_disabled_tools(disabled_tools: set[str], all_tools: dict[str, Any]) -> None:
    """
    Warn about entries in the disabled list that cannot or will not take effect.

    Two situations are reported, each with a single warning: names of
    essential tools (which cannot be disabled) and names that do not match
    any registered tool. Nothing is modified.

    Args:
        disabled_tools: Set of tool names requested to be disabled
        all_tools: Dictionary of all available tool instances
    """
    protected_names = sorted(disabled_tools.intersection(ESSENTIAL_TOOLS))
    if protected_names:
        logger.warning(f"Cannot disable essential tools: {protected_names}")

    unrecognised_names = sorted(name for name in disabled_tools if name not in all_tools)
    if unrecognised_names:
        logger.warning(f"Unknown tools in DISABLED_TOOLS: {unrecognised_names}")
def apply_tool_filter(all_tools: dict[str, Any], disabled_tools: set[str]) -> dict[str, Any]:
    """
    Build the dictionary of tools that remain enabled.

    A tool is dropped only when it is listed in ``disabled_tools`` and is not
    one of the essential tools; the original registration order is preserved.

    Args:
        all_tools: Dictionary of all available tool instances
        disabled_tools: Set of tool names to disable

    Returns:
        Dictionary containing only enabled tools
    """
    surviving: dict[str, Any] = {}
    for tool_name, tool_instance in all_tools.items():
        # Essential tools ignore the disabled list entirely
        is_blocked = tool_name in disabled_tools and tool_name not in ESSENTIAL_TOOLS
        if is_blocked:
            logger.debug(f"Tool '{tool_name}' disabled via DISABLED_TOOLS")
            continue
        surviving[tool_name] = tool_instance
    return surviving
def log_tool_configuration(disabled_tools: set[str], enabled_tools: dict[str, Any]) -> None:
    """
    Log the final tool configuration for visibility.

    With an empty ``disabled_tools`` set a single "all tools enabled" message
    is logged and nothing else. Otherwise essential tool names are removed
    from the requested set before the disabled and active tool names are
    reported in sorted order.

    Args:
        disabled_tools: Set of tool names that were requested to be disabled
        enabled_tools: Dictionary of tools that remain enabled
    """
    if not disabled_tools:
        logger.info("All tools enabled (DISABLED_TOOLS not set)")
        return
    # Essential tools are never actually disabled, so leave them out of the report
    actual_disabled = disabled_tools - ESSENTIAL_TOOLS
    if actual_disabled:
        logger.debug(f"Disabled tools: {sorted(actual_disabled)}")
        logger.info(f"Active tools: {sorted(enabled_tools.keys())}")
def filter_disabled_tools(all_tools: dict[str, Any]) -> dict[str, Any]:
    """
    Apply the DISABLED_TOOLS environment setting to the tool registry.

    When nothing is disabled the input dictionary itself is returned;
    otherwise the request is validated (warnings only) and a filtered
    dictionary is returned. The resulting configuration is logged in both
    cases.

    Args:
        all_tools: Dictionary of all available tool instances

    Returns:
        dict: Filtered dictionary containing only enabled tools
    """
    disabled_tools = parse_disabled_tools_env()

    if disabled_tools:
        validate_disabled_tools(disabled_tools, all_tools)
        active_tools = apply_tool_filter(all_tools, disabled_tools)
    else:
        # No filtering requested: hand back the very same mapping
        active_tools = all_tools

    log_tool_configuration(disabled_tools, active_tools)
    return active_tools
# Initialize the tool registry with all available AI-powered tools
# Each tool provides specialized functionality for different development tasks
# Each tool is instantiated once here, at import time, and looked up by name afterwards
TOOLS = {
    "chat": ChatTool(),  # Interactive development chat and brainstorming
    "clink": CLinkTool(),  # Bridge requests to configured AI CLIs
    "thinkdeep": ThinkDeepTool(),  # Step-by-step deep thinking workflow with expert analysis
    "planner": PlannerTool(),  # Interactive sequential planner using workflow architecture
    "consensus": ConsensusTool(),  # Step-by-step consensus workflow with multi-model analysis
    "codereview": CodeReviewTool(),  # Comprehensive step-by-step code review workflow with expert analysis
    "precommit": PrecommitTool(),  # Step-by-step pre-commit validation workflow
    "debug": DebugIssueTool(),  # Root cause analysis and debugging assistance
    "secaudit": SecauditTool(),  # Comprehensive security audit with OWASP Top 10 and compliance coverage
    "docgen": DocgenTool(),  # Step-by-step documentation generation with complexity analysis
    "analyze": AnalyzeTool(),  # General-purpose file and code analysis
    "refactor": RefactorTool(),  # Step-by-step refactoring analysis workflow with expert validation
    "tracer": TracerTool(),  # Static call path prediction and control flow analysis
    "testgen": TestGenTool(),  # Step-by-step test generation workflow with expert validation
    "challenge": ChallengeTool(),  # Critical challenge prompt wrapper to avoid automatic agreement
    "apilookup": LookupTool(),  # Quick web/API lookup instructions
    "listmodels": ListModelsTool(),  # List all available AI models by provider
    "version": VersionTool(),  # Display server version and system information
}
# Drop tools named in DISABLED_TOOLS (essential tools are always kept)
TOOLS = filter_disabled_tools(TOOLS)
# Rich prompt templates for all tools
# Keyed by tool name (the same keys as TOOLS). Each entry carries:
#   name        - prompt name; note that it differs from the tool key for
#                 "thinkdeep" ("thinkdeeper") and "codereview" ("review")
#   description - short human-readable summary
#   template    - prompt text; may contain {model} / {thinking_mode} placeholders
#                 (presumably filled in by the prompt handler - not visible here)
PROMPT_TEMPLATES = {
    "chat": {
        "name": "chat",
        "description": "Chat and brainstorm ideas",
        "template": "Chat with {model} about this",
    },
    "clink": {
        "name": "clink",
        "description": "Forward a request to a configured AI CLI (e.g., Gemini)",
        "template": "Use clink with cli_name=<cli> to run this prompt",
    },
    "thinkdeep": {
        "name": "thinkdeeper",
        "description": "Step-by-step deep thinking workflow with expert analysis",
        "template": "Start comprehensive deep thinking workflow with {model} using {thinking_mode} thinking mode",
    },
    "planner": {
        "name": "planner",
        "description": "Break down complex ideas, problems, or projects into multiple manageable steps",
        "template": "Create a detailed plan with {model}",
    },
    "consensus": {
        "name": "consensus",
        "description": "Step-by-step consensus workflow with multi-model analysis",
        "template": "Start comprehensive consensus workflow with {model}",
    },
    "codereview": {
        "name": "review",
        "description": "Perform a comprehensive code review",
        "template": "Perform a comprehensive code review with {model}",
    },
    "precommit": {
        "name": "precommit",
        "description": "Step-by-step pre-commit validation workflow",
        "template": "Start comprehensive pre-commit validation workflow with {model}",
    },
    "debug": {
        "name": "debug",
        "description": "Debug an issue or error",
        "template": "Help debug this issue with {model}",
    },
    "secaudit": {
        "name": "secaudit",
        "description": "Comprehensive security audit with OWASP Top 10 coverage",
        "template": "Perform comprehensive security audit with {model}",
    },
    "docgen": {
        "name": "docgen",
        "description": "Generate comprehensive code documentation with complexity analysis",
        "template": "Generate comprehensive documentation with {model}",
    },
    "analyze": {
        "name": "analyze",
        "description": "Analyze files and code structure",
        "template": "Analyze these files with {model}",
    },
    "refactor": {
        "name": "refactor",
        "description": "Refactor and improve code structure",
        "template": "Refactor this code with {model}",
    },
    "tracer": {
        "name": "tracer",
        "description": "Trace code execution paths",
        "template": "Generate tracer analysis with {model}",
    },
    "testgen": {
        "name": "testgen",
        "description": "Generate comprehensive tests",
        "template": "Generate comprehensive tests with {model}",
    },
    "challenge": {
        "name": "challenge",
        "description": "Challenge a statement critically without automatic agreement",
        "template": "Challenge this statement critically",
    },
    "apilookup": {
        "name": "apilookup",
        "description": "Look up the latest API or SDK information",
        "template": "Lookup latest API docs for {model}",
    },
    "listmodels": {
        "name": "listmodels",
        "description": "List available AI models",
        "template": "List all available models",
    },
    "version": {
        "name": "version",
        "description": "Show server version and system information",
        "template": "Show Zen MCP Server version",
    },
}
def configure_providers():
    """
    Configure and validate AI providers based on the available configuration.

    Detection phase: reads the environment for Gemini, OpenAI, Azure OpenAI,
    X.AI, DIAL, OpenRouter and custom-endpoint settings, ignoring the
    placeholder values shipped in the example configuration.

    Registration phase: registers provider classes with ModelProviderRegistry
    in priority order (native APIs, then the custom endpoint, then OpenRouter),
    installs an atexit hook that closes initialized providers, logs any model
    restrictions, and verifies that auto mode still has at least one model.

    Raises:
        ValueError: If no usable provider configuration is found, or if auto
            mode is enabled but restrictions leave no model available.
    """
    # Log environment variable status for debugging
    logger.debug("Checking environment variables for API keys...")
    api_keys_to_check = ["OPENAI_API_KEY", "OPENROUTER_API_KEY", "GEMINI_API_KEY", "XAI_API_KEY", "CUSTOM_API_URL"]
    for key in api_keys_to_check:
        value = get_env(key)
        logger.debug(f"  {key}: {'[PRESENT]' if value else '[MISSING]'}")

    from providers import ModelProviderRegistry
    from providers.azure_openai import AzureOpenAIProvider
    from providers.custom import CustomProvider
    from providers.dial import DIALModelProvider
    from providers.gemini import GeminiModelProvider
    from providers.openai import OpenAIModelProvider
    from providers.openrouter import OpenRouterProvider
    from providers.shared import ProviderType
    from providers.xai import XAIModelProvider
    from utils.model_restrictions import get_restriction_service

    valid_providers = []
    has_native_apis = False
    has_openrouter = False
    has_custom = False

    # Check for Gemini API key
    gemini_key = get_env("GEMINI_API_KEY")
    # Evaluate the "real key, not placeholder" test once so that detection and
    # registration below can never disagree.
    gemini_configured = bool(gemini_key) and gemini_key != "your_gemini_api_key_here"
    if gemini_configured:
        valid_providers.append("Gemini")
        has_native_apis = True
        logger.info("Gemini API key found - Gemini models available")

    # Check for OpenAI API key
    openai_key = get_env("OPENAI_API_KEY")
    openai_configured = bool(openai_key) and openai_key != "your_openai_api_key_here"
    logger.debug(f"OpenAI key check: key={'[PRESENT]' if openai_key else '[MISSING]'}")
    if openai_configured:
        valid_providers.append("OpenAI")
        has_native_apis = True
        logger.info("OpenAI API key found")
    elif not openai_key:
        logger.debug("OpenAI API key not found in environment")
    else:
        logger.debug("OpenAI API key is placeholder value")

    # Check for Azure OpenAI configuration
    azure_key = get_env("AZURE_OPENAI_API_KEY")
    azure_endpoint = get_env("AZURE_OPENAI_ENDPOINT")
    azure_models_available = False
    if azure_key and azure_key != "your_azure_openai_key_here" and azure_endpoint:
        try:
            from providers.registries.azure import AzureModelRegistry

            azure_registry = AzureModelRegistry()
            if azure_registry.list_models():
                valid_providers.append("Azure OpenAI")
                has_native_apis = True
                azure_models_available = True
                logger.info("Azure OpenAI configuration detected")
            else:
                logger.warning(
                    "Azure OpenAI models configuration is empty. Populate conf/azure_models.json or set AZURE_MODELS_CONFIG_PATH."
                )
        except Exception as exc:
            # A broken Azure model config must not prevent other providers from loading
            logger.warning(f"Failed to load Azure OpenAI models: {exc}")

    # Check for X.AI API key
    xai_key = get_env("XAI_API_KEY")
    xai_configured = bool(xai_key) and xai_key != "your_xai_api_key_here"
    if xai_configured:
        valid_providers.append("X.AI (GROK)")
        has_native_apis = True
        logger.info("X.AI API key found - GROK models available")

    # Check for DIAL API key
    dial_key = get_env("DIAL_API_KEY")
    dial_configured = bool(dial_key) and dial_key != "your_dial_api_key_here"
    if dial_configured:
        valid_providers.append("DIAL")
        has_native_apis = True
        logger.info("DIAL API key found - DIAL models available")

    # Check for OpenRouter API key
    openrouter_key = get_env("OPENROUTER_API_KEY")
    logger.debug(f"OpenRouter key check: key={'[PRESENT]' if openrouter_key else '[MISSING]'}")
    if openrouter_key and openrouter_key != "your_openrouter_api_key_here":
        valid_providers.append("OpenRouter")
        has_openrouter = True
        logger.info("OpenRouter API key found - Multiple models available via OpenRouter")
    elif not openrouter_key:
        logger.debug("OpenRouter API key not found in environment")
    else:
        logger.debug("OpenRouter API key is placeholder value")

    # Check for custom API endpoint (Ollama, vLLM, etc.)
    custom_url = get_env("CUSTOM_API_URL")
    if custom_url:
        # IMPORTANT: Always read CUSTOM_API_KEY even if empty
        # - Some providers (vLLM, LM Studio, enterprise APIs) require authentication
        # - Others (Ollama) work without authentication (empty key)
        # - DO NOT remove this variable - it's needed for provider factory function
        custom_key = get_env("CUSTOM_API_KEY", "") or ""  # Default to empty (Ollama doesn't need auth)
        custom_model = get_env("CUSTOM_MODEL_NAME", "llama3.2") or "llama3.2"
        valid_providers.append(f"Custom API ({custom_url})")
        has_custom = True
        logger.info(f"Custom API endpoint found: {custom_url} with model {custom_model}")
        if custom_key:
            logger.debug("Custom API key provided for authentication")
        else:
            logger.debug("No custom API key provided (using unauthenticated access)")

    # Register providers in priority order:
    # 1. Native APIs first (most direct and efficient)
    registered_providers = []

    if has_native_apis:
        if gemini_configured:
            ModelProviderRegistry.register_provider(ProviderType.GOOGLE, GeminiModelProvider)
            registered_providers.append(ProviderType.GOOGLE.value)
            logger.debug(f"Registered provider: {ProviderType.GOOGLE.value}")

        if openai_configured:
            ModelProviderRegistry.register_provider(ProviderType.OPENAI, OpenAIModelProvider)
            registered_providers.append(ProviderType.OPENAI.value)
            logger.debug(f"Registered provider: {ProviderType.OPENAI.value}")

        if azure_models_available:
            ModelProviderRegistry.register_provider(ProviderType.AZURE, AzureOpenAIProvider)
            registered_providers.append(ProviderType.AZURE.value)
            logger.debug(f"Registered provider: {ProviderType.AZURE.value}")

        if xai_configured:
            ModelProviderRegistry.register_provider(ProviderType.XAI, XAIModelProvider)
            registered_providers.append(ProviderType.XAI.value)
            logger.debug(f"Registered provider: {ProviderType.XAI.value}")

        if dial_configured:
            ModelProviderRegistry.register_provider(ProviderType.DIAL, DIALModelProvider)
            registered_providers.append(ProviderType.DIAL.value)
            logger.debug(f"Registered provider: {ProviderType.DIAL.value}")

    # 2. Custom provider second (for local/private models)
    if has_custom:
        # Factory function that creates CustomProvider with proper parameters
        def custom_provider_factory(api_key=None):
            # api_key is CUSTOM_API_KEY (can be empty for Ollama), base_url from CUSTOM_API_URL
            base_url = get_env("CUSTOM_API_URL", "") or ""
            return CustomProvider(api_key=api_key or "", base_url=base_url)  # Use provided API key or empty string

        ModelProviderRegistry.register_provider(ProviderType.CUSTOM, custom_provider_factory)
        registered_providers.append(ProviderType.CUSTOM.value)
        logger.debug(f"Registered provider: {ProviderType.CUSTOM.value}")

    # 3. OpenRouter last (catch-all for everything else)
    if has_openrouter:
        ModelProviderRegistry.register_provider(ProviderType.OPENROUTER, OpenRouterProvider)
        registered_providers.append(ProviderType.OPENROUTER.value)
        logger.debug(f"Registered provider: {ProviderType.OPENROUTER.value}")

    # Log all registered providers
    if registered_providers:
        logger.info(f"Registered providers: {', '.join(registered_providers)}")

    # Require at least one valid provider
    if not valid_providers:
        raise ValueError(
            "At least one API configuration is required. Please set either:\n"
            "- GEMINI_API_KEY for Gemini models\n"
            "- OPENAI_API_KEY for OpenAI models\n"
            "- XAI_API_KEY for X.AI GROK models\n"
            "- DIAL_API_KEY for DIAL models\n"
            "- OPENROUTER_API_KEY for OpenRouter (multiple models)\n"
            "- CUSTOM_API_URL for local models (Ollama, vLLM, etc.)"
        )

    logger.info(f"Available providers: {', '.join(valid_providers)}")

    # Log provider priority
    priority_info = []
    if has_native_apis:
        priority_info.append("Native APIs (Gemini, OpenAI)")
    if has_custom:
        priority_info.append("Custom endpoints")
    if has_openrouter:
        priority_info.append("OpenRouter (catch-all)")

    if len(priority_info) > 1:
        logger.info(f"Provider priority: {' → '.join(priority_info)}")

    # Register cleanup function for providers
    def cleanup_providers():
        """Clean up all registered providers on shutdown."""
        try:
            registry = ModelProviderRegistry()
            initialized = getattr(registry, "_initialized_providers", None)
            if not initialized:
                return
            # Iterate over the provider instances themselves. Iterating .items()
            # yields (provider_type, instance) tuples, which never expose close(),
            # so no provider would ever be closed.
            for provider in list(initialized.values()):
                try:
                    if provider and hasattr(provider, "close"):
                        provider.close()
                except Exception:
                    # Logger might be closed during shutdown
                    pass
        except Exception:
            # Silently ignore any errors during cleanup
            pass

    atexit.register(cleanup_providers)

    # Check and log model restrictions
    restriction_service = get_restriction_service()
    restrictions = restriction_service.get_restriction_summary()

    if restrictions:
        logger.info("Model restrictions configured:")
        for provider_name, allowed_models in restrictions.items():
            if isinstance(allowed_models, list):
                logger.info(f"  {provider_name}: {', '.join(allowed_models)}")
            else:
                logger.info(f"  {provider_name}: {allowed_models}")

        # Validate restrictions against known models
        provider_instances = {}
        provider_types_to_validate = [ProviderType.GOOGLE, ProviderType.OPENAI, ProviderType.XAI, ProviderType.DIAL]
        for provider_type in provider_types_to_validate:
            provider = ModelProviderRegistry.get_provider(provider_type)
            if provider:
                provider_instances[provider_type] = provider

        if provider_instances:
            restriction_service.validate_against_known_models(provider_instances)
    else:
        logger.info("No model restrictions configured - all models allowed")

    # Check if auto mode has any models available after restrictions
    from config import IS_AUTO_MODE

    if IS_AUTO_MODE:
        available_models = ModelProviderRegistry.get_available_models(respect_restrictions=True)
        if not available_models:
            logger.error(
                "Auto mode is enabled but no models are available after applying restrictions. "
                "Please check your OPENAI_ALLOWED_MODELS and GOOGLE_ALLOWED_MODELS settings."
            )
            raise ValueError(
                "No models available for auto mode due to restrictions. "
                "Please adjust your allowed model settings or disable auto mode."
            )
@server.list_tools()
async def handle_list_tools() -> list[Tool]:
    """
    Return every tool in the TOOLS registry as an MCP ``Tool`` description.

    MCP clients call this handler to discover the server's tools. Each entry
    carries the tool's name, description, input JSON schema and, when the tool
    supplies them, its annotations.

    As a side effect the handler makes a best-effort attempt to log which
    client is connected; any failure while doing so is ignored.

    Returns:
        List of Tool objects, one per registered tool.
    """
    logger.debug("MCP client requested tool list")

    # Best-effort client identification; this must never break tool discovery
    try:
        from utils.client_info import format_client_info, get_client_info_from_context

        client_info = get_client_info_from_context(server)
        if client_info:
            logger.info(f"MCP Client Connected: {format_client_info(client_info)}")

            # Mirror the client details into the activity log
            try:
                friendly_name = client_info.get("friendly_name", "CLI Agent")
                raw_name = client_info.get("name", "Unknown")
                version = client_info.get("version", "Unknown")
                logging.getLogger("mcp_activity").info(
                    f"MCP_CLIENT_INFO: {friendly_name} (raw={raw_name} v{version})"
                )
            except Exception:
                pass
    except Exception as e:
        logger.debug(f"Could not log client info during list_tools: {e}")

    def _as_mcp_tool(registered_tool) -> Tool:
        """Convert one registry entry into its MCP Tool description."""
        raw_annotations = registered_tool.get_annotations()
        tool_annotations = ToolAnnotations(**raw_annotations) if raw_annotations else None
        return Tool(
            name=registered_tool.name,
            description=registered_tool.description,
            inputSchema=registered_tool.get_input_schema(),
            annotations=tool_annotations,
        )

    tools = [_as_mcp_tool(registered_tool) for registered_tool in TOOLS.values()]

    # Log cache efficiency info
    openrouter_key_for_cache = get_env("OPENROUTER_API_KEY")
    openrouter_in_use = bool(openrouter_key_for_cache) and openrouter_key_for_cache != "your_openrouter_api_key_here"
    if openrouter_in_use:
        logger.debug("OpenRouter registry cache used efficiently across all tool schemas")

    logger.debug(f"Returning {len(tools)} tools to MCP client")
    return tools
@server.call_tool()
async def handle_call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
    """
    Dispatch an MCP tool call to the matching entry in the TOOLS registry.

    Processing steps, in order:
        1. Log the call (application log and the "mcp_activity" log).
        2. If ``continuation_id`` is set, replace ``arguments`` with the result
           of ``reconstruct_thread_context`` so the tool sees the earlier
           conversation.
        3. Unknown tool names yield a single "Unknown tool" text response.
        4. Resolve the model at the MCP boundary: take ``model`` from the
           arguments or DEFAULT_MODEL, split off any ``model:option`` suffix,
           and - for tools that require a model - turn "auto" into a concrete
           model and confirm that a provider serves it.
        5. Attach ``_model_context`` and ``_resolved_model_name`` to the
           arguments, check the total size of ``absolute_file_paths`` against
           the resolved model, then execute the tool.

    Tools whose ``requires_model()`` is false are executed right after the
    model string is parsed, without model validation or file size checks.

    Args:
        name: The name of the tool to execute (e.g., "analyze", "chat", "codereview")
        arguments: Arguments to pass to the tool, potentially including:
            - continuation_id: identifier of a conversation thread to resume
            - absolute_file_paths: files whose total size is checked up front
            - model: specific model to use, optionally as "model:option"

    Returns:
        The list produced by the tool's ``execute`` call, or a single
        TextContent reporting an unknown tool.

    Raises:
        ToolExecutionError: If the requested model is not available, or the
            file size check reports a problem.
        Exception: Whatever ``reconstruct_thread_context`` or the tool itself raises.
    """

    def _record_activity(message: str) -> None:
        """Write to the activity log; failures here must never affect the call."""
        try:
            logging.getLogger("mcp_activity").info(message)
        except Exception:
            pass

    logger.info(f"MCP tool call: {name}")
    logger.debug(f"MCP tool arguments: {list(arguments.keys())}")
    _record_activity(f"TOOL_CALL: {name} with {len(arguments)} arguments")

    # Handle thread context reconstruction if continuation_id is present
    continuation_id = arguments.get("continuation_id")
    if continuation_id:
        logger.debug(f"Resuming conversation thread: {continuation_id}")
        logger.debug(
            f"[CONVERSATION_DEBUG] Tool '{name}' resuming thread {continuation_id} with {len(arguments)} arguments"
        )
        logger.debug(f"[CONVERSATION_DEBUG] Original arguments keys: {list(arguments.keys())}")
        _record_activity(f"CONVERSATION_RESUME: {name} resuming thread {continuation_id}")

        arguments = await reconstruct_thread_context(arguments)
        logger.debug(f"[CONVERSATION_DEBUG] After thread reconstruction, arguments keys: {list(arguments.keys())}")
        if "_remaining_tokens" in arguments:
            logger.debug(f"[CONVERSATION_DEBUG] Remaining token budget: {arguments['_remaining_tokens']:,}")

    # Handle unknown tool requests gracefully
    if name not in TOOLS:
        return [TextContent(type="text", text=f"Unknown tool: {name}")]

    logger.info(f"Executing tool '{name}' with {len(arguments)} parameter(s)")
    tool = TOOLS[name]

    # EARLY MODEL RESOLUTION AT MCP BOUNDARY
    # Resolving here gives every tool the same model handling; tools that manage
    # several models themselves (consensus) do their own validation afterwards.
    from providers.registry import ModelProviderRegistry
    from utils.file_utils import check_total_file_size
    from utils.model_context import ModelContext

    # Requested model, falling back to the configured default
    model_name = arguments.get("model") or DEFAULT_MODEL
    logger.debug(f"Initial model for {name}: {model_name}")

    # Split "model:option" into its parts
    model_name, model_option = parse_model_option(model_name)
    if not model_option:
        logger.info(f"Parsed model format - model: '{model_name}'")
    else:
        logger.info(f"Parsed model format - model: '{model_name}', option: '{model_option}'")

    # Tools that do not need a model (e.g., planner) run without a model context
    if not tool.requires_model():
        logger.debug(f"Tool {name} doesn't require model resolution - skipping model validation")
        return await tool.execute(arguments)

    # Auto mode: pick a concrete model suited to the tool's category
    if model_name.lower() == "auto":
        tool_category = tool.get_model_category()
        resolved_model = ModelProviderRegistry.get_preferred_fallback_model(tool_category)
        logger.info(f"Auto mode resolved to {resolved_model} for {name} (category: {tool_category.value})")
        model_name = resolved_model
        # Keep the arguments in sync with the resolved model
        arguments["model"] = model_name

    # Validate model availability at MCP boundary
    provider = ModelProviderRegistry.get_provider_for_model(model_name)
    if not provider:
        # Tell the caller what is available and what would suit this tool
        available_models = list(ModelProviderRegistry.get_available_models(respect_restrictions=True).keys())
        tool_category = tool.get_model_category()
        suggested_model = ModelProviderRegistry.get_preferred_fallback_model(tool_category)

        error_message = (
            f"Model '{model_name}' is not available with current API keys. "
            f"Available models: {', '.join(available_models)}. "
            f"Suggested model for {name}: '{suggested_model}' "
            f"(category: {tool_category.value})"
        )
        error_output = ToolOutput(
            status="error",
            content=error_message,
            content_type="text",
            metadata={"tool_name": name, "requested_model": model_name},
        )
        raise ToolExecutionError(error_output.model_dump_json())

    # Create model context with resolved model and option
    model_context = ModelContext(model_name, model_option)
    arguments["_model_context"] = model_context
    arguments["_resolved_model_name"] = model_name
    logger.debug(
        f"Model context created for {model_name} with {model_context.capabilities.context_window} token capacity"
    )
    if model_option:
        logger.debug(f"Model option stored in context: '{model_option}'")

    # EARLY FILE SIZE VALIDATION AT MCP BOUNDARY
    # Reject oversized inputs before the tool runs, using the resolved model
    argument_files = arguments.get("absolute_file_paths")
    if argument_files:
        logger.debug(f"Checking file sizes for {len(argument_files)} files with model {model_name}")
        file_size_check = check_total_file_size(argument_files, model_name)
        if file_size_check:
            logger.warning(f"File size check failed for {name} with model {model_name}")
            raise ToolExecutionError(ToolOutput(**file_size_check).model_dump_json())

    # Execute tool with pre-resolved model context
    result = await tool.execute(arguments)
    logger.info(f"Tool '{name}' execution completed")
    _record_activity(f"TOOL_COMPLETED: {name}")
    return result
def parse_model_option(model_string: str) -> tuple[str, Optional[str]]:
    """
    Split a ``model:option`` string into ``(model_name, option)``.

    Rules, applied in order:
        - No colon, or a string starting with "http" (a URL): the stripped
          input is returned with no option.
        - OpenRouter-style names (contain "/" and exactly one colon) whose
          suffix is ``free``, ``beta`` or ``preview`` (case-insensitive): the
          suffix is part of the model name, so the stripped input is returned
          with no option.
        - Anything else (Ollama tags such as ``:latest``, consensus stances
          such as ``:for``): split at the first colon and strip both halves.

    Args:
        model_string: String that may contain "model:option" format

    Returns:
        tuple: (model_name, option) where option is None when nothing was split off
    """
    whole = model_string.strip()

    # Nothing to split, or a URL whose colons must be left alone
    if ":" not in model_string or model_string.startswith("http"):
        return whole, None

    base, _, remainder = model_string.partition(":")

    # provider/model:suffix with a known OpenRouter suffix stays in one piece
    openrouter_shaped = "/" in model_string and model_string.count(":") == 1
    if openrouter_shaped and remainder.strip().lower() in ("free", "beta", "preview"):
        return whole, None

    return base.strip(), remainder.strip()
def get_follow_up_instructions(current_turn_count: int, max_turns: Optional[int] = None) -> str:
    """
    Generate dynamic follow-up instructions based on conversation turn count.

    When the conversation is at or past the second-to-last allowed turn, the
    returned text tells the model not to ask follow-up questions. Otherwise it
    explains how to invite the agent to continue via the continuation_id and
    states how many exchanges remain.

    Args:
        current_turn_count: Current number of turns in the conversation
        max_turns: Maximum allowed turns before conversation ends (defaults to
            MAX_CONVERSATION_TURNS from utils.conversation_memory)

    Returns:
        Follow-up instructions to append to the tool prompt
    """
    if max_turns is None:
        # Imported lazily so callers that pass max_turns need no extra module
        from utils.conversation_memory import MAX_CONVERSATION_TURNS

        max_turns = MAX_CONVERSATION_TURNS

    if current_turn_count >= max_turns - 1:
        # We're at or approaching the turn limit - no more follow-ups
        return """
IMPORTANT: This is approaching the final exchange in this conversation thread.
Do NOT include any follow-up questions in your response. Provide your complete
final analysis and recommendations."""

    # Normal follow-up instructions
    remaining_turns = max_turns - current_turn_count - 1
    # NOTE: this is one triple-quoted literal, so adjacent-string continuation
    # quotes (`" "` at line ends/starts) would be emitted verbatim into the
    # prompt; the only quotation marks below are the ones meant to be read.
    return f"""
CONVERSATION CONTINUATION: You can continue this discussion with the agent! ({remaining_turns} exchanges remaining)
Feel free to ask clarifying questions or suggest areas for deeper exploration naturally within your response.
If something needs clarification or you'd benefit from additional context, simply mention it conversationally.
IMPORTANT: When you suggest follow-ups or ask questions, you MUST explicitly instruct the agent to use the continuation_id
to respond. Use clear, direct language based on urgency:
For optional follow-ups: "Please continue this conversation using the continuation_id from this response if you'd
like to explore this further."
For needed responses: "Please respond using the continuation_id from this response - your input is needed to proceed."
For essential/critical responses: "RESPONSE REQUIRED: Please immediately continue using the continuation_id from
this response. Cannot proceed without your clarification/input."
This ensures the agent knows both HOW to maintain the conversation thread AND whether a response is optional,
needed, or essential.
The tool will automatically provide a continuation_id in the structured response that the agent can use in subsequent
tool calls to maintain full conversation context across multiple exchanges.
Remember: Only suggest follow-ups when they would genuinely add value to the discussion, and always instruct
the agent to use the continuation_id when you do."""
async def reconstruct_thread_context(arguments: dict[str, Any]) -> dict[str, Any]:
"""
Reconstruct conversation context for stateless-to-stateful thread continuation.
This is a critical function that transforms the inherently stateless MCP protocol into
stateful multi-turn conversations. It loads persistent conversation state from in-memory
storage and rebuilds complete conversation context using the sophisticated dual prioritization
strategy implemented in the conversation memory system.
CONTEXT RECONSTRUCTION PROCESS:
1. THREAD RETRIEVAL: Loads complete ThreadContext from storage using continuation_id
- Includes all conversation turns with tool attribution
- Preserves file references and cross-tool context
- Handles conversation chains across multiple linked threads
2. CONVERSATION HISTORY BUILDING: Uses build_conversation_history() to create
comprehensive context with intelligent prioritization:
FILE PRIORITIZATION (Newest-First Throughout):
- When same file appears in multiple turns, newest reference wins
- File embedding prioritizes recent versions, excludes older duplicates
- Token budget management ensures most relevant files are preserved
CONVERSATION TURN PRIORITIZATION (Dual Strategy):
- Collection Phase: Processes turns newest-to-oldest for token efficiency
- Presentation Phase: Presents turns chronologically for LLM understanding
- Ensures recent context is preserved when token budget is constrained
3. CONTEXT INJECTION: Embeds reconstructed history into tool request arguments
- Conversation history becomes part of the tool's prompt context
- Files referenced in previous turns are accessible to current tool
- Cross-tool knowledge transfer is seamless and comprehensive
4. TOKEN BUDGET MANAGEMENT: Applies model-specific token allocation
- Balances conversation history vs. file content vs. response space
- Gracefully handles token limits with intelligent exclusion strategies
- Preserves most contextually relevant information within constraints
CROSS-TOOL CONTINUATION SUPPORT:
This function enables seamless handoffs between different tools:
- Analyze tool → Debug tool: Full file context and analysis preserved
- Chat tool → CodeReview tool: Conversation context maintained
- Any tool → Any tool: Complete cross-tool knowledge transfer
ERROR HANDLING & RECOVERY:
- Thread expiration: Provides clear instructions for conversation restart
- Storage unavailability: Graceful degradation with error messaging
- Invalid continuation_id: Security validation and user-friendly errors
Args:
arguments: Original request arguments dictionary containing:
- continuation_id (required): UUID of conversation thread to resume
- Other tool-specific arguments that will be preserved
Returns:
dict[str, Any]: Enhanced arguments dictionary with conversation context:
- Original arguments preserved
- Conversation history embedded in appropriate format for tool consumption
- File context from previous turns made accessible
- Cross-tool knowledge transfer enabled
Raises:
ValueError: When continuation_id is invalid, thread not found, or expired
Includes user-friendly recovery instructions
Performance Characteristics:
- O(1) thread lookup in memory
- O(n) conversation history reconstruction where n = number of turns
- Intelligent token budgeting prevents context window overflow
- Optimized file deduplication minimizes redundant content
Example Usage Flow:
1. CLI: "Continue analyzing the security issues" + continuation_id
2. reconstruct_thread_context() loads previous analyze conversation
3. Debug tool receives full context including previous file analysis
4. Debug tool can reference specific findings from analyze tool
5. Natural cross-tool collaboration without context loss
"""
from utils.conversation_memory import add_turn, build_conversation_history, get_thread
continuation_id = arguments["continuation_id"]
# Get thread context from storage
logger.debug(f"[CONVERSATION_DEBUG] Looking up thread {continuation_id} in storage")
context = get_thread(continuation_id)
if not context:
logger.warning(f"Thread not found: {continuation_id}")
logger.debug(f"[CONVERSATION_DEBUG] Thread {continuation_id} not found in storage or expired")
# Log to activity file for monitoring
try:
mcp_activity_logger = logging.getLogger("mcp_activity")
mcp_activity_logger.info(f"CONVERSATION_ERROR: Thread {continuation_id} not found or expired")
except Exception:
pass
# Return error asking CLI to restart conversation with full context
raise ValueError(
f"Conversation thread '{continuation_id}' was not found or has expired. "
f"This may happen if the conversation was created more than 3 hours ago or if the "
f"server was restarted. "
f"Please restart the conversation by providing your full question/prompt without the "
f"continuation_id parameter. "
f"This will create a new conversation thread that can continue with follow-up exchanges."
)
# Add user's new input to the conversation
user_prompt = arguments.get("prompt", "")
if user_prompt:
# Capture files referenced in this turn
user_files = arguments.get("absolute_file_paths") or []
logger.debug(f"[CONVERSATION_DEBUG] Adding user turn to thread {continuation_id}")
from utils.token_utils import estimate_tokens
user_prompt_tokens = estimate_tokens(user_prompt)
logger.debug(
f"[CONVERSATION_DEBUG] User prompt length: {len(user_prompt)} chars (~{user_prompt_tokens:,} tokens)"
)
logger.debug(f"[CONVERSATION_DEBUG] User files: {user_files}")
success = add_turn(continuation_id, "user", user_prompt, files=user_files)
if not success:
logger.warning(f"Failed to add user turn to thread {continuation_id}")
logger.debug("[CONVERSATION_DEBUG] Failed to add user turn - thread may be at turn limit or expired")
else:
logger.debug(f"[CONVERSATION_DEBUG] Successfully added user turn to thread {continuation_id}")
# Create model context early to use for history building
from utils.model_context import ModelContext
tool = TOOLS.get(context.tool_name)
requires_model = tool.requires_model() if tool else True
# Check if we should use the model from the previous conversation turn
model_from_args = arguments.get("model")
if requires_model and not model_from_args and context.turns:
# Find the last assistant turn to get the model used
for turn in reversed(context.turns):
if turn.role == "assistant" and turn.model_name:
arguments["model"] = turn.model_name
logger.debug(f"[CONVERSATION_DEBUG] Using model from previous turn: {turn.model_name}")
break
# Resolve an effective model for context reconstruction when DEFAULT_MODEL=auto
model_context = arguments.get("_model_context")
if requires_model:
if model_context is None:
try:
model_context = ModelContext.from_arguments(arguments)
arguments.setdefault("_resolved_model_name", model_context.model_name)
except ValueError as exc:
from providers.registry import ModelProviderRegistry
fallback_model = None
if tool is not None:
try:
fallback_model = ModelProviderRegistry.get_preferred_fallback_model(tool.get_model_category())
except Exception as fallback_exc: # pragma: no cover - defensive log
logger.debug(
f"[CONVERSATION_DEBUG] Unable to resolve fallback model for {context.tool_name}: {fallback_exc}"
)
if fallback_model is None:
available_models = ModelProviderRegistry.get_available_model_names()
if available_models:
fallback_model = available_models[0]
if fallback_model is None:
raise
logger.debug(
f"[CONVERSATION_DEBUG] Falling back to model '{fallback_model}' for context reconstruction after error: {exc}"
)
model_context = ModelContext(fallback_model)
arguments["_model_context"] = model_context
arguments["_resolved_model_name"] = fallback_model
from providers.registry import ModelProviderRegistry
provider = ModelProviderRegistry.get_provider_for_model(model_context.model_name)
if provider is None:
fallback_model = None
if tool is not None:
try:
fallback_model = ModelProviderRegistry.get_preferred_fallback_model(tool.get_model_category())
except Exception as fallback_exc: # pragma: no cover - defensive log
logger.debug(
f"[CONVERSATION_DEBUG] Unable to resolve fallback model for {context.tool_name}: {fallback_exc}"
)
if fallback_model is None:
available_models = ModelProviderRegistry.get_available_model_names()
if available_models:
fallback_model = available_models[0]
if fallback_model is None:
raise ValueError(
f"Conversation continuation failed: model '{model_context.model_name}' is not available with current API keys."
)
logger.debug(
f"[CONVERSATION_DEBUG] Model '{model_context.model_name}' unavailable; swapping to '{fallback_model}' for context reconstruction"
)
model_context = ModelContext(fallback_model)
arguments["_model_context"] = model_context
arguments["_resolved_model_name"] = fallback_model
else:
if model_context is None:
from providers.registry import ModelProviderRegistry
fallback_model = None
if tool is not None:
try:
fallback_model = ModelProviderRegistry.get_preferred_fallback_model(tool.get_model_category())
except Exception as fallback_exc: # pragma: no cover - defensive log
logger.debug(
f"[CONVERSATION_DEBUG] Unable to resolve fallback model for {context.tool_name}: {fallback_exc}"
)
if fallback_model is None:
available_models = ModelProviderRegistry.get_available_model_names()
if available_models:
fallback_model = available_models[0]
if fallback_model is None:
raise ValueError(
"Conversation continuation failed: no available models detected for context reconstruction."
)
logger.debug(
f"[CONVERSATION_DEBUG] Using fallback model '{fallback_model}' for context reconstruction of tool without model requirement"
)
model_context = ModelContext(fallback_model)
arguments["_model_context"] = model_context
arguments["_resolved_model_name"] = fallback_model
# Build conversation history with model-specific limits
logger.debug(f"[CONVERSATION_DEBUG] Building conversation history for thread {continuation_id}")
logger.debug(f"[CONVERSATION_DEBUG] Thread has {len(context.turns)} turns, tool: {context.tool_name}")
logger.debug(f"[CONVERSATION_DEBUG] Using model: {model_context.model_name}")
conversation_history, conversation_tokens = build_conversation_history(context, model_context)
logger.debug(f"[CONVERSATION_DEBUG] Conversation history built: {conversation_tokens:,} tokens")
logger.debug(
f"[CONVERSATION_DEBUG] Conversation history length: {len(conversation_history)} chars (~{conversation_tokens:,} tokens)"
)
# Add dynamic follow-up instructions based on turn count
follow_up_instructions = get_follow_up_instructions(len(context.turns))
logger.debug(f"[CONVERSATION_DEBUG] Follow-up instructions added for turn {len(context.turns)}")
# All tools now use standardized 'prompt' field
original_prompt = arguments.get("prompt", "")
logger.debug("[CONVERSATION_DEBUG] Extracting user input from 'prompt' field")
original_prompt_tokens = estimate_tokens(original_prompt) if original_prompt else 0
logger.debug(
f"[CONVERSATION_DEBUG] User input length: {len(original_prompt)} chars (~{original_prompt_tokens:,} tokens)"
)
# Merge original context with new prompt and follow-up instructions
if conversation_history:
enhanced_prompt = (
f"{conversation_history}\n\n=== NEW USER INPUT ===\n{original_prompt}\n\n{follow_up_instructions}"
)
else:
enhanced_prompt = f"{original_prompt}\n\n{follow_up_instructions}"
# Update arguments with enhanced context and remaining token budget
enhanced_arguments = arguments.copy()
# Store the enhanced prompt in the prompt field
enhanced_arguments["prompt"] = enhanced_prompt
# Store the original user prompt separately for size validation
enhanced_arguments["_original_user_prompt"] = original_prompt
logger.debug("[CONVERSATION_DEBUG] Storing enhanced prompt in 'prompt' field")
logger.debug("[CONVERSATION_DEBUG] Storing original user prompt in '_original_user_prompt' field")
# Calculate remaining token budget based on current model
# (model_context was already created above for history building)
token_allocation = model_context.calculate_token_allocation()
# Calculate remaining tokens for files/new content
# History has already consumed some of the content budget
remaining_tokens = token_allocation.content_tokens - conversation_tokens
enhanced_arguments["_remaining_tokens"] = max(0, remaining_tokens) # Ensure non-negative
enhanced_arguments["_model_context"] = model_context # Pass context for use in tools
logger.debug("[CONVERSATION_DEBUG] Token budget calculation:")
logger.debug(f"[CONVERSATION_DEBUG] Model: {model_context.model_name}")
logger.debug(f"[CONVERSATION_DEBUG] Total capacity: {token_allocation.total_tokens:,}")
logger.debug(f"[CONVERSATION_DEBUG] Content allocation: {token_allocation.content_tokens:,}")
logger.debug(f"[CONVERSATION_DEBUG] Conversation tokens: {conversation_tokens:,}")
logger.debug(f"[CONVERSATION_DEBUG] Remaining tokens: {remaining_tokens:,}")
# Merge original context parameters (files, etc.) with new request
if context.initial_context:
logger.debug(f"[CONVERSATION_DEBUG] Merging initial context with {len(context.initial_context)} parameters")
for key, value in context.initial_context.items():
if key not in enhanced_arguments and key not in ["temperature", "thinking_mode", "model"]:
enhanced_arguments[key] = value
logger.debug(f"[CONVERSATION_DEBUG] Merged initial context param: {key}")
logger.info(f"Reconstructed context for thread {continuation_id} (turn {len(context.turns)})")
logger.debug(f"[CONVERSATION_DEBUG] Final enhanced arguments keys: {list(enhanced_arguments.keys())}")
if "absolute_file_paths" in enhanced_arguments:
logger.debug(
f"[CONVERSATION_DEBUG] Final files in enhanced arguments: {enhanced_arguments['absolute_file_paths']}"
)
# Log to activity file for monitoring
try:
mcp_activity_logger = logging.getLogger("mcp_activity")
mcp_activity_logger.info(
f"CONVERSATION_CONTINUATION: Thread {continuation_id} turn {len(context.turns)} - "
f"{len(context.turns)} previous turns loaded"
)
except Exception:
pass
return enhanced_arguments
@server.list_prompts()
async def handle_list_prompts() -> list[Prompt]:
    """
    Build the prompt catalogue advertised to the MCP client.

    Every entry in TOOLS contributes exactly one prompt: the name/description
    from PROMPT_TEMPLATES when the tool has a template, otherwise a generic
    "Use <tool> tool" placeholder. A final "continue" prompt is appended after
    the per-tool prompts.

    Returns:
        List of Prompt objects, one per tool plus the "continue" prompt
    """
    logger.debug("MCP client requested prompt list")

    def _name_and_description(tool_name, tool):
        # Rich template when available, generic fallback otherwise.
        if tool_name in PROMPT_TEMPLATES:
            info = PROMPT_TEMPLATES[tool_name]
            return info["name"], info["description"]
        return tool_name, f"Use {tool.name} tool"

    prompts = []
    for tool_name, tool in TOOLS.items():
        prompt_name, prompt_description = _name_and_description(tool_name, tool)
        # MVP: prompts carry no structured arguments
        prompts.append(Prompt(name=prompt_name, description=prompt_description, arguments=[]))

    # The special "continue" prompt always comes last
    prompts.append(
        Prompt(
            name="continue",
            description="Continue the previous conversation using the chat tool",
            arguments=[],
        )
    )

    logger.debug(f"Returning {len(prompts)} prompts to MCP client")
    return prompts
@server.get_prompt()
async def handle_get_prompt(name: str, arguments: dict[str, Any] = None) -> GetPromptResult:
    """
    Resolve a prompt name to its template and return the text the CLI should send.

    This handler is called when a user invokes a prompt (e.g., /zen:thinkdeeper).
    Name resolution, in order:
      1. "continue" (case-insensitive) maps to the chat tool.
      2. A PROMPT_TEMPLATES entry whose "name" equals ``name``.
      3. A key of TOOLS equal to ``name`` (generic "Use <name>" template).
    Names are matched verbatim; no "tool:model" splitting is performed here.

    Args:
        name: The name of the prompt to execute
        arguments: Optional arguments for the prompt. "model" (default "auto")
            and "thinking_mode" (default "medium") are substituted into the template.

    Returns:
        GetPromptResult with the prompt details and a single user message

    Raises:
        ValueError: If the prompt name is unknown
    """
    logger.debug(f"MCP client requested prompt: {name} with args: {arguments}")

    is_continue = name.lower() == "continue"

    if is_continue:
        # This is "/zen:continue" - use chat tool as default for continuation
        tool_name = "chat"
        template_info = {
            "name": "continue",
            "description": "Continue the previous conversation",
            "template": "Continue the conversation",
        }
        logger.debug("Using /zen:continue - defaulting to chat tool")
    else:
        tool_name = None
        template_info = None

        # Check if it's a known prompt name
        for t_name, t_info in PROMPT_TEMPLATES.items():
            if t_info["name"] == name:
                tool_name = t_name
                template_info = t_info
                break

        # If not found, check if it's a direct tool name
        if not tool_name and name in TOOLS:
            tool_name = name
            template_info = {
                "name": name,
                "description": f"Use {name} tool",
                "template": f"Use {name}",
            }

        if not tool_name:
            logger.error(f"Unknown prompt requested: {name}")
            raise ValueError(f"Unknown prompt: {name}")

    template = template_info.get("template", f"Use {tool_name}")

    # Normalise a missing/empty arguments mapping once instead of re-checking per key
    prompt_options = arguments or {}
    final_model = prompt_options.get("model", "auto")
    prompt_args = {
        "model": final_model,
        "thinking_mode": prompt_options.get("thinking_mode", "medium"),
    }

    logger.debug(f"Using model '{final_model}' for prompt '{name}'")

    # Safely format the template. str.format raises KeyError for unknown named
    # fields, IndexError for positional fields ("{}"/"{0}") and ValueError for
    # unbalanced braces; all of them fall back to the raw template.
    try:
        prompt_text = template.format(**prompt_args)
    except KeyError as e:
        logger.warning(f"Missing template argument {e} for prompt {name}, using raw template")
        prompt_text = template
    except (IndexError, ValueError) as e:
        logger.warning(f"Malformed template for prompt {name} ({e}), using raw template")
        prompt_text = template

    if is_continue:
        # "/zen:continue" case: the instruction text replaces the formatted template
        tool_instruction = (
            f"Continue the previous conversation using the {tool_name} tool. "
            "CRITICAL: You MUST provide the continuation_id from the previous response to maintain conversation context. "
            "Additionally, you should reuse the same model that was used in the previous exchange for consistency, unless "
            "the user specifically asks for a different model name to be used."
        )
    else:
        # Simple prompt case
        tool_instruction = prompt_text

    return GetPromptResult(
        prompt=Prompt(
            name=name,
            description=template_info["description"],
            arguments=[],
        ),
        messages=[
            PromptMessage(
                role="user",
                content={"type": "text", "text": tool_instruction},
            )
        ],
    )
async def main():
    """
    Main entry point for the MCP server.

    Configures the model providers, logs the startup configuration (log level,
    model mode, default ThinkDeep thinking mode, registered tools), builds the
    handshake instructions for the client and then serves requests over the
    stdio transport until server.run() returns.
    """
    # Validate and configure providers based on available API keys
    configure_providers()

    logger.info("Zen MCP Server starting up...")
    logger.info(f"Log level: {log_level}")

    # Note: MCP client info will be logged during the protocol handshake
    # (when handle_list_tools is called)

    from config import IS_AUTO_MODE

    mode_message = (
        "Model mode: AUTO (CLI will select the best model for each task)"
        if IS_AUTO_MODE
        else f"Model mode: Fixed model '{DEFAULT_MODEL}'"
    )
    logger.info(mode_message)

    # Import here to avoid circular imports
    from config import DEFAULT_THINKING_MODE_THINKDEEP

    logger.info(f"Default thinking mode (ThinkDeep): {DEFAULT_THINKING_MODE_THINKDEEP}")
    logger.info(f"Available tools: {list(TOOLS.keys())}")
    logger.info("Server ready - waiting for tool requests...")

    # Both modes share the first sentence; only the "no model mentioned" rule differs.
    named_model_rule = (
        "When the user names a specific model (e.g. 'use chat with gpt5'), send that exact model in the tool call. "
    )
    if IS_AUTO_MODE:
        unnamed_model_rule = "When no model is mentioned, first use the `listmodels` tool from zen to obtain available models to choose the best one from."
    else:
        unnamed_model_rule = f"When no model is mentioned, default to '{DEFAULT_MODEL}'."
    handshake_instructions = named_model_rule + unnamed_model_rule

    # Run the server using stdio transport (standard input/output)
    # This allows the server to be launched by MCP clients as a subprocess
    async with stdio_server() as (read_stream, write_stream):
        advertised = ServerCapabilities(
            tools=ToolsCapability(),  # Advertise tool support capability
            prompts=PromptsCapability(),  # Advertise prompt support capability
        )
        init_options = InitializationOptions(
            server_name="zen",
            server_version=__version__,
            instructions=handshake_instructions,
            capabilities=advertised,
        )
        await server.run(read_stream, write_stream, init_options)
def run():
    """Console script entry point for zen-mcp-server.

    Drives main() to completion on a fresh asyncio event loop. A
    KeyboardInterrupt (Ctrl+C) is treated as a normal shutdown request.
    """
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Graceful shutdown: nothing to clean up here, just stop quietly
        return
# Start the server only when this module is executed directly as a script.
if __name__ == "__main__":
    run()
```
--------------------------------------------------------------------------------
/tools/shared/base_tool.py:
--------------------------------------------------------------------------------
```python
"""
Core Tool Infrastructure for Zen MCP Tools
This module provides the fundamental base class for all tools:
- BaseTool: Abstract base class defining the tool interface
The BaseTool class defines the core contract that tools must implement and provides
common functionality for request validation, error handling, model management,
conversation handling, file processing, and response formatting.
"""
import logging
import os
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, Optional
from mcp.types import TextContent
if TYPE_CHECKING:
from providers.shared import ModelCapabilities
from tools.models import ToolModelCategory
from config import MCP_PROMPT_SIZE_LIMIT
from providers import ModelProvider, ModelProviderRegistry
from utils import estimate_tokens
from utils.conversation_memory import (
ConversationTurn,
get_conversation_file_list,
get_thread,
)
from utils.env import get_env
from utils.file_utils import read_file_content, read_files
# Import models from tools.models for compatibility
try:
from tools.models import SPECIAL_STATUS_MODELS, ContinuationOffer, ToolOutput
except ImportError:
# Fallback in case models haven't been set up yet
SPECIAL_STATUS_MODELS = {}
ContinuationOffer = None
ToolOutput = None
logger = logging.getLogger(__name__)
class BaseTool(ABC):
"""
Abstract base class for all Zen MCP tools.
This class defines the interface that all tools must implement and provides
common functionality for request handling, model creation, and response formatting.
CONVERSATION-AWARE FILE PROCESSING:
This base class implements the sophisticated dual prioritization strategy for
conversation-aware file handling across all tools:
1. FILE DEDUPLICATION WITH NEWEST-FIRST PRIORITY:
- When same file appears in multiple conversation turns, newest reference wins
- Prevents redundant file embedding while preserving most recent file state
- Cross-tool file tracking ensures consistent behavior across analyze → codereview → debug
2. CONVERSATION CONTEXT INTEGRATION:
- All tools receive enhanced prompts with conversation history via reconstruct_thread_context()
- File references from previous turns are preserved and accessible
- Cross-tool knowledge transfer maintains full context without manual file re-specification
3. TOKEN-AWARE FILE EMBEDDING:
- Respects model-specific token allocation budgets from ModelContext
- Prioritizes conversation history, then newest files, then remaining content
- Graceful degradation when token limits are approached
4. STATELESS-TO-STATEFUL BRIDGING:
- Tools operate on stateless MCP requests but access full conversation state
- Conversation memory automatically injected via continuation_id parameter
- Enables natural AI-to-AI collaboration across tool boundaries
To create a new tool:
1. Create a new class that inherits from BaseTool
2. Implement all abstract methods
3. Define a request model that inherits from ToolRequest
4. Register the tool in server.py's TOOLS dictionary
"""
# Class-level cache for OpenRouter registry to avoid multiple loads
_openrouter_registry_cache = None
_custom_registry_cache = None
@classmethod
def _get_openrouter_registry(cls):
    """Return the shared OpenRouter registry, constructing it on first use."""
    # The cache is stored on BaseTool itself (not cls) so that all subclasses share one instance
    cached = BaseTool._openrouter_registry_cache
    if cached is not None:
        return cached

    from providers.registries.openrouter import OpenRouterModelRegistry

    BaseTool._openrouter_registry_cache = OpenRouterModelRegistry()
    logger.debug("Created cached OpenRouter registry instance")
    return BaseTool._openrouter_registry_cache
@classmethod
def _get_custom_registry(cls):
    """Return the shared custom-endpoint registry, constructing it on first use."""
    # Stored on BaseTool itself so the instance is shared by every subclass
    cached = BaseTool._custom_registry_cache
    if cached is not None:
        return cached

    from providers.registries.custom import CustomEndpointModelRegistry

    BaseTool._custom_registry_cache = CustomEndpointModelRegistry()
    logger.debug("Created cached Custom registry instance")
    return BaseTool._custom_registry_cache
def __init__(self):
    """Snapshot the tool's name, description and default temperature as instance attributes."""
    # Cache tool metadata at initialization to avoid repeated calls
    # The getters run in this order; each result is stored before the next getter is called.
    self.name = self.get_name()
    self.description = self.get_description()
    self.default_temperature = self.get_default_temperature()
    # Tool initialization complete
@abstractmethod
def get_name(self) -> str:
    """
    Provide the identifier under which this tool is registered.

    MCP clients invoke the tool by this name, so it has to be unique
    among all registered tools.

    Returns:
        str: The tool's unique name (e.g., "review_code", "analyze")
    """
    ...
@abstractmethod
def get_description(self) -> str:
    """
    Provide the detailed, client-facing description of this tool.

    MCP clients (like Claude / Codex / Gemini) read this text to decide
    when and how to use the tool, so it should be comprehensive and
    include trigger phrases.

    Returns:
        str: Detailed tool description with usage examples
    """
    ...
@abstractmethod
def get_input_schema(self) -> dict[str, Any]:
    """
    Provide the JSON Schema describing this tool's parameters.

    MCP clients use the schema to validate inputs before sending
    requests, so it should match the tool's request model.

    Returns:
        Dict[str, Any]: JSON Schema object defining required and optional parameters
    """
    ...
@abstractmethod
def get_system_prompt(self) -> str:
    """
    Provide the system prompt that configures the AI model's behavior.

    The prompt establishes the context and instructions for how the
    model should approach the task; it is prepended to the user's request.

    Returns:
        str: System prompt with role definition and instructions
    """
    ...
def get_capability_system_prompts(self, capabilities: Optional["ModelCapabilities"]) -> list[str]:
    """Hook for system prompt snippets that depend on model capabilities.

    Subclasses may override this to contribute capability-specific
    instructions (for example, enabling code-generation exports when a
    model advertises support). This base implementation contributes
    nothing and hands back a fresh empty list.

    Args:
        capabilities: The resolved capabilities for the active model.

    Returns:
        List of prompt fragments to append after the base system prompt.
    """
    no_extra_fragments: list[str] = []
    return no_extra_fragments
def _augment_system_prompt_with_capabilities(
self, base_prompt: str, capabilities: Optional["ModelCapabilities"]
) -> str:
"""Merge capability-driven prompt addenda with the base system prompt."""
additions: list[str] = []
if capabilities is not None:
additions = [fragment.strip() for fragment in self.get_capability_system_prompts(capabilities) if fragment]
if not additions:
return base_prompt
addition_text = "\n\n".join(additions)
if not base_prompt:
return addition_text
suffix = "" if base_prompt.endswith("\n\n") else "\n\n"
return f"{base_prompt}{suffix}{addition_text}"
def get_annotations(self) -> Optional[dict[str, Any]]:
    """
    Provide optional behavior hints for this tool.

    Annotations are hints about tool behavior (they are not security-critical)
    that help MCP clients make better decisions about tool usage. The base
    implementation supplies none.

    Returns:
        Optional[dict]: Dictionary with annotation fields like readOnlyHint, destructiveHint, etc.
                       None when the tool has no annotations (the default here).
    """
    annotations = None
    return annotations
def requires_model(self) -> bool:
    """
    Report whether this tool needs AI model access.

    Tools that override execute() to do pure data processing (like planner)
    should return False so model resolution is skipped at the MCP boundary.

    Returns:
        bool: True when AI model access is needed (the default), False for data-only tools
    """
    needs_model = True
    return needs_model
def is_effective_auto_mode(self) -> bool:
    """
    Decide whether schema generation should behave as if auto mode were active.

    The result determines whether the model parameter should be required in
    the tool schema. Auto mode is in effect when DEFAULT_MODEL is "auto"
    (case-insensitive), or when no provider can be found for DEFAULT_MODEL.

    Returns:
        bool: True if model parameter should be required in the schema
    """
    from config import DEFAULT_MODEL
    from providers.registry import ModelProviderRegistry

    # Explicit auto mode
    if DEFAULT_MODEL.lower() == "auto":
        return True

    # Configured model without a provider: fall back to auto mode
    return not ModelProviderRegistry.get_provider_for_model(DEFAULT_MODEL)
def _should_require_model_selection(self, model_name: str) -> bool:
"""
Check if we should require the CLI to select a model at runtime.
This is called during request execution to determine if we need
to return an error asking the CLI to provide a model parameter.
Args:
model_name: The model name from the request or DEFAULT_MODEL
Returns:
bool: True if we should require model selection
"""
# Case 1: Model is explicitly "auto"
if model_name.lower() == "auto":
return True
# Case 2: Requested model is not available
from providers.registry import ModelProviderRegistry
provider = ModelProviderRegistry.get_provider_for_model(model_name)
if not provider:
logger = logging.getLogger(f"tools.{self.name}")
logger.warning(f"Model '{model_name}' is not available with current API keys. Requiring model selection.")
return True
return False
def _get_available_models(self) -> list[str]:
    """
    Get list of models available from enabled providers.

    Starts from ModelProviderRegistry.get_available_model_names(), then adds
    OpenRouter registry aliases when OPENROUTER_API_KEY is set to something
    other than the placeholder value, and custom registry aliases when
    CUSTOM_API_URL is set. A failure while loading either alias registry is
    logged at debug level and otherwise ignored.

    Returns:
        List of unique model names, in first-seen order
    """
    from providers.registry import ModelProviderRegistry

    # dict keys give ordered de-duplication with O(1) membership tests, and
    # building a new container means the list returned by the registry is
    # never mutated in place.
    ordered: dict[str, None] = dict.fromkeys(ModelProviderRegistry.get_available_model_names())

    def _merge_aliases(load_registry, label: str) -> None:
        # Best effort: alias registries are optional extras for the model list.
        try:
            for alias in load_registry().list_aliases():
                ordered.setdefault(alias, None)
        except Exception as e:
            logger.debug(f"Failed to add {label} models to enum: {e}")

    # Add OpenRouter models if OpenRouter is configured
    openrouter_key = get_env("OPENROUTER_API_KEY")
    if openrouter_key and openrouter_key != "your_openrouter_api_key_here":
        _merge_aliases(self._get_openrouter_registry, "OpenRouter")

    # Add custom models if custom API is configured
    custom_url = get_env("CUSTOM_API_URL")
    if custom_url:
        _merge_aliases(self._get_custom_registry, "custom")

    return list(ordered)
def _format_available_models_list(self) -> str:
"""Return a human-friendly list of available models or guidance when none found."""
summaries, total, has_restrictions = self._get_ranked_model_summaries()
if not summaries:
return (
"No models detected. Configure provider credentials or set DEFAULT_MODEL to a valid option. "
"If the user requested a specific model, respond with this notice instead of substituting another model."
)
display = "; ".join(summaries)
remainder = total - len(summaries)
if remainder > 0:
display = f"{display}; +{remainder} more (use the `listmodels` tool for the full roster)"
return display
@staticmethod
def _format_context_window(tokens: int) -> Optional[str]:
"""Convert a raw context window into a short display string."""
if not tokens or tokens <= 0:
return None
if tokens >= 1_000_000:
if tokens % 1_000_000 == 0:
return f"{tokens // 1_000_000}M ctx"
return f"{tokens / 1_000_000:.1f}M ctx"
if tokens >= 1_000:
if tokens % 1_000 == 0:
return f"{tokens // 1_000}K ctx"
return f"{tokens / 1_000:.1f}K ctx"
return f"{tokens} ctx"
def _collect_ranked_capabilities(self) -> list[tuple[int, str, Any]]:
    """Collect (rank, model name, capabilities) for available models, best rank first.

    Models whose provider cannot be resolved, or whose provider raises
    ValueError from get_capabilities(), are left out. Ties on rank are
    ordered by model name.
    """
    from providers.registry import ModelProviderRegistry

    entries: list[tuple[int, str, Any]] = []
    for model_name, provider_type in ModelProviderRegistry.get_available_models(respect_restrictions=True).items():
        provider = ModelProviderRegistry.get_provider(provider_type)
        if provider:
            try:
                capabilities = provider.get_capabilities(model_name)
            except ValueError:
                continue
            entries.append((capabilities.get_effective_capability_rank(), model_name, capabilities))

    # Descending rank, then ascending name
    return sorted(entries, key=lambda entry: (-entry[0], entry[1]))
@staticmethod
def _normalize_model_identifier(name: str) -> str:
"""Normalize model names for deduplication across providers."""
normalized = name.lower()
if ":" in normalized:
normalized = normalized.split(":", 1)[0]
if "/" in normalized:
normalized = normalized.split("/", 1)[-1]
return normalized
def _get_ranked_model_summaries(self, limit: int = 5) -> tuple[list[str], int, bool]:
"""Return formatted, ranked model summaries and restriction status."""
ranked = self._collect_ranked_capabilities()
# Build allowlist map (provider -> lowercase names) when restrictions are active
allowed_map: dict[Any, set[str]] = {}
try:
from utils.model_restrictions import get_restriction_service
restriction_service = get_restriction_service()
if restriction_service:
from providers.shared import ProviderType
for provider_type in ProviderType:
allowed = restriction_service.get_allowed_models(provider_type)
if allowed:
allowed_map[provider_type] = {name.lower() for name in allowed if name}
except Exception:
allowed_map = {}
filtered: list[tuple[int, str, Any]] = []
seen_normalized: set[str] = set()
for rank, model_name, capabilities in ranked:
canonical_name = getattr(capabilities, "model_name", model_name)
canonical_lower = canonical_name.lower()
alias_lower = model_name.lower()
provider_type = getattr(capabilities, "provider", None)
if allowed_map:
if provider_type not in allowed_map:
continue
allowed_set = allowed_map[provider_type]
if canonical_lower not in allowed_set and alias_lower not in allowed_set:
continue
normalized = self._normalize_model_identifier(canonical_name)
if normalized in seen_normalized:
continue
seen_normalized.add(normalized)
filtered.append((rank, canonical_name, capabilities))
summaries: list[str] = []
for rank, canonical_name, capabilities in filtered[:limit]:
details: list[str] = []
context_str = self._format_context_window(capabilities.context_window)
if context_str:
details.append(context_str)
if capabilities.supports_extended_thinking:
details.append("thinking")
if capabilities.allow_code_generation:
details.append("code-gen")
base = f"{canonical_name} (score {rank}"
if details:
base = f"{base}, {', '.join(details)}"
summaries.append(f"{base})")
return summaries, len(filtered), bool(allowed_map)
def _get_restriction_note(self) -> Optional[str]:
    """Summarise the per-provider *_ALLOWED_MODELS settings, or return None when none are set.

    Each variable is read through get_env() and parsed as a comma-separated
    list; entries are trimmed, deduplicated and sorted.
    """
    allowlist_vars = (
        ("OPENAI_ALLOWED_MODELS", "OpenAI"),
        ("GOOGLE_ALLOWED_MODELS", "Google"),
        ("XAI_ALLOWED_MODELS", "X.AI"),
        ("OPENROUTER_ALLOWED_MODELS", "OpenRouter"),
        ("DIAL_ALLOWED_MODELS", "DIAL"),
    )

    notes: list[str] = []
    for env_var, label in allowlist_vars:
        raw = get_env(env_var)
        models = sorted({token.strip() for token in raw.split(",") if token.strip()}) if raw else []
        if models:
            notes.append(f"{label}: {', '.join(models)}")

    return ("Policy allows only → " + "; ".join(notes)) if notes else None
def _build_model_unavailable_message(self, model_name: str) -> str:
    """Build the error text returned when the requested model cannot be served.

    The message names the unavailable model, lists the available models and
    suggests the registry's preferred fallback for this tool's category.
    """
    tool_category = self.get_model_category()
    suggested_model = ModelProviderRegistry.get_preferred_fallback_model(tool_category)
    available_models_text = self._format_available_models_list()

    sentences = [
        f"Model '{model_name}' is not available with current API keys.",
        f"Available models: {available_models_text}.",
        f"Suggested model for {self.get_name()}: '{suggested_model}'",
        f"(category: {tool_category.value}). If the user explicitly requested a model, you MUST use that exact name or report this error back—do not substitute another model.",
    ]
    return " ".join(sentences)
def _build_auto_mode_required_message(self) -> str:
    """Build the text returned when auto mode needs an explicit model choice.

    The message lists the available models and suggests the registry's
    preferred fallback for this tool's category.
    """
    tool_category = self.get_model_category()
    suggested_model = ModelProviderRegistry.get_preferred_fallback_model(tool_category)
    available_models_text = self._format_available_models_list()

    sentences = [
        "Model parameter is required in auto mode.",
        f"Available models: {available_models_text}.",
        f"Suggested model for {self.get_name()}: '{suggested_model}'",
        f"(category: {tool_category.value}). When the user names a model, relay that exact name—never swap in another option.",
    ]
    return " ".join(sentences)
def get_model_field_schema(self) -> dict[str, Any]:
    """
    Build the JSON schema for the ``model`` field.

    The description opens with auto-mode guidance when
    is_effective_auto_mode() is true and with default-model guidance
    otherwise. In both cases it is followed by the ranked model summaries
    (when any exist) and, if the summaries are truncated or absent, the
    restriction note.

    Returns:
        Dict containing the model field JSON schema
    """
    from config import DEFAULT_MODEL

    # Only the opening guidance and the unrestricted list label depend on the mode
    if self.is_effective_auto_mode():
        description = (
            "Currently in auto model selection mode. CRITICAL: When the user names a model, you MUST use that exact name unless the server rejects it. "
            "If no model is provided, you may use the `listmodels` tool to review options and select an appropriate match."
        )
        unrestricted_label = "Top models"
    else:
        description = (
            f"The default model is '{DEFAULT_MODEL}'. Override only when the user explicitly requests a different model, and use that exact name. "
            "If the requested model fails validation, surface the server error instead of substituting another model. When unsure, use the `listmodels` tool for details."
        )
        unrestricted_label = "Preferred alternatives"

    summaries, total, restricted = self._get_ranked_model_summaries()
    remainder = max(0, total - len(summaries))

    if summaries:
        label = "Allowed models" if restricted else unrestricted_label
        listing = "; ".join(summaries)
        ending = f"; +{remainder} more via `listmodels`." if remainder > 0 else "."
        description = f"{description} {label}: {listing}{ending}"

    restriction_note = self._get_restriction_note()
    if restriction_note and (remainder > 0 or not summaries):
        description = f"{description} {restriction_note}."

    return {
        "type": "string",
        "description": description,
    }
def get_default_temperature(self) -> float:
    """
    Provide the temperature used when a request does not specify one.

    Override to set a tool-specific default: lower values (0.0-0.3) for
    analytical tasks, higher (0.7-1.0) for creative tasks.

    Returns:
        float: Default temperature between 0.0 and 1.0 (0.5 in this base implementation)
    """
    balanced_temperature = 0.5
    return balanced_temperature
def wants_line_numbers_by_default(self) -> bool:
    """
    Report whether code files should get line numbers by default for this tool.

    The base implementation opts every tool in, so that code locations can
    be referenced precisely and consistently.

    Returns:
        bool: True if line numbers should be added by default for this tool
    """
    # All tools get line numbers by default for consistency
    line_numbers_enabled = True
    return line_numbers_enabled
def get_default_thinking_mode(self) -> str:
    """
    Provide the default thinking mode for this tool.

    The thinking mode controls the computational budget for reasoning.
    Override for tools that need more or less reasoning depth.

    Returns:
        str: One of "minimal", "low", "medium", "high", "max" ("medium" in this base implementation)
    """
    # Medium is the baseline chosen for better reasoning
    baseline_mode = "medium"
    return baseline_mode
def get_model_category(self) -> "ToolModelCategory":
    """
    Provide the model category for this tool.

    The category influences which model is picked in auto mode. Override to
    declare that a tool needs extended reasoning, fast responses, or balanced
    capabilities.

    Returns:
        ToolModelCategory: Category that influences model selection
    """
    # Imported lazily inside the method, mirroring the original placement.
    from tools.models import ToolModelCategory

    balanced_category = ToolModelCategory.BALANCED
    return balanced_category
@abstractmethod
def get_request_model(self):
    """
    Provide the Pydantic model class that validates this tool's requests.

    Implementations return a model that inherits from ToolRequest and
    declares every parameter specific to the tool.

    Returns:
        Type[ToolRequest]: The request model class
    """
    ...
def validate_file_paths(self, request) -> Optional[str]:
    """
    Check that every file path carried by the request is absolute.

    A fixed set of request attributes is inspected. Each may hold a single
    path or a list of paths. Attributes that are missing or None are ignored,
    as are empty path values. The first relative path found ends the check.

    Args:
        request: The validated request object

    Returns:
        Optional[str]: Error message naming the first relative path, or None
        if all paths are absolute
    """
    # Request attributes that may carry one path or a list of paths.
    path_attributes = (
        "absolute_file_paths",
        "file",
        "path",
        "directory",
        "notebooks",
        "test_examples",
        "style_guide_examples",
        "files_checked",
        "relevant_files",
    )

    for attribute in path_attributes:
        if not hasattr(request, attribute):
            continue

        value = getattr(request, attribute)
        if value is None:
            continue

        # Normalise a single path into a one-element list.
        candidates = value if isinstance(value, list) else [value]
        offender = next(
            (candidate for candidate in candidates if candidate and not os.path.isabs(candidate)),
            None,
        )
        if offender is not None:
            return f"All file paths must be FULL absolute paths. Invalid path: '{offender}'"

    return None
def _validate_token_limit(self, content: str, content_type: str = "Content") -> None:
    """
    Reject user-provided content that exceeds the MCP prompt size limit.

    The check compares the character count of the content against
    MCP_PROMPT_SIZE_LIMIT. It is meant for text crossing the MCP transport
    boundary (user input), not for internally constructed prompts.

    Args:
        content: The user-originated content to validate
        content_type: Description of the content type for log and error messages

    Raises:
        ValueError: If content exceeds the character size limit
    """
    tool_name = self.name
    label = content_type.lower()

    if not content:
        logger.debug(f"{tool_name} tool {label} validation skipped (no content)")
        return

    char_count = len(content)
    token_estimate = estimate_tokens(content)

    # Within the limit: record the size and accept the content.
    if char_count <= MCP_PROMPT_SIZE_LIMIT:
        logger.debug(
            f"{tool_name} tool {label} validation passed: " f"{char_count:,} characters (~{token_estimate:,} tokens)"
        )
        return

    error_msg = (
        f"{char_count:,} characters (~{token_estimate:,} tokens). "
        f"Maximum is {MCP_PROMPT_SIZE_LIMIT:,} characters."
    )
    logger.error(f"{tool_name} tool {label} validation failed: {error_msg}")
    raise ValueError(f"{content_type} too large: {error_msg}")
def get_model_provider(self, model_name: str) -> ModelProvider:
    """
    Get the model provider for the given model name.

    The provider is looked up through ModelProviderRegistry at call time.
    A failed lookup and a missing provider are each logged exactly once.

    Args:
        model_name: Name of the model to get provider for

    Returns:
        ModelProvider: The provider instance for the model

    Raises:
        ValueError: If no provider is found for the model
        Exception: Any error raised by the registry lookup is logged and re-raised
    """
    # Keep the try body limited to the registry call so the "no provider"
    # ValueError below is not caught and logged a second time.
    try:
        provider = ModelProviderRegistry.get_provider_for_model(model_name)
    except Exception as e:
        logger.error(f"Failed to get provider for model '{model_name}' in {self.name} tool: {e}")
        raise

    if not provider:
        logger.error(f"No provider found for model '{model_name}' in {self.name} tool")
        raise ValueError(self._build_model_unavailable_message(model_name))

    return provider
# === CONVERSATION AND FILE HANDLING METHODS ===
def get_conversation_embedded_files(self, continuation_id: Optional[str]) -> list[str]:
    """
    List the files already embedded in a conversation's history.

    Tools use this to avoid re-embedding files that the continuation thread
    already carries in its context.

    Args:
        continuation_id: Thread continuation ID, or None for new conversations

    Returns:
        list[str]: File paths already embedded in conversation history; empty
        for a new conversation or when the thread cannot be found
    """
    # No ID means a new conversation; an unknown ID yields no thread either.
    thread = get_thread(continuation_id) if continuation_id else None
    if not thread:
        return []

    already_embedded = get_conversation_file_list(thread)
    logger.debug(f"[FILES] {self.name}: Found {len(already_embedded)} embedded files")
    return already_embedded
def filter_new_files(self, requested_files: list[str], continuation_id: Optional[str]) -> list[str]:
    """
    Drop requested files that the conversation history already embeds.

    This avoids embedding the same file twice while the files stay reachable
    through the conversation history. When the history is empty or cannot be
    read, every requested file is returned so nothing is lost.

    Args:
        requested_files: List of files requested for current tool execution
        continuation_id: Thread continuation ID, or None for new conversations

    Returns:
        list[str]: List of files that need to be embedded (not already in history)
    """
    tool = self.name
    requested_count = len(requested_files)
    logger.debug(f"[FILES] {tool}: Filtering {requested_count} requested files")

    if not continuation_id:
        # A new conversation has no history, so nothing can be filtered out.
        logger.debug(f"[FILES] {tool}: New conversation, all {requested_count} files are new")
        return requested_files

    try:
        known = set(self.get_conversation_embedded_files(continuation_id))
        logger.debug(f"[FILES] {tool}: Found {len(known)} embedded files in conversation")

        if not known:
            # A continuation without recorded files may point at a history
            # problem; be conservative and embed everything.
            logger.debug(f"{tool} tool: No files found in conversation history for thread {continuation_id}")
            logger.debug(f"[FILES] {tool}: No embedded files found, returning all {requested_count} requested files")
            return requested_files

        # Split the request into fresh files and ones already in history.
        fresh: list[str] = []
        skipped: list[str] = []
        for candidate in requested_files:
            (skipped if candidate in known else fresh).append(candidate)

        logger.debug(
            f"[FILES] {tool}: After filtering: {len(fresh)} new files, {requested_count - len(fresh)} already embedded"
        )
        logger.debug(f"[FILES] {tool}: New files to embed: {fresh}")

        if skipped:
            logger.debug(
                f"{tool} tool: Filtering {len(skipped)} files already in conversation history: {', '.join(skipped)}"
            )
            logger.debug(f"[FILES] {tool}: Skipped (already embedded): {skipped}")

        return fresh
    except Exception as e:
        # Any problem reading the history falls back to embedding every
        # requested file rather than risking a missing one.
        logger.warning(f"{tool} tool: Error checking conversation history for {continuation_id}: {e}")
        logger.warning(f"{tool} tool: Including all requested files as fallback")
        logger.debug(
            f"[FILES] {tool}: Exception in filter_new_files, returning all {requested_count} files as fallback"
        )
        return requested_files
def format_conversation_turn(self, turn: ConversationTurn) -> list[str]:
    """
    Render one conversation turn as lines for the conversation history.

    Tools may override this to customise how their responses appear in later
    turns while keeping the shared structure.

    Args:
        turn: The conversation turn to format

    Returns:
        list[str]: Lines of formatted content for this turn

    Example:
        With files, the default implementation returns:
            ["Files used in this turn: file1.py, file2.py", "", "Response content..."]
        Without files, only the content line is returned.
    """
    # Optional header: the files line followed by an empty separator line.
    header = [f"Files used in this turn: {', '.join(turn.files)}", ""] if turn.files else []
    return [*header, turn.content]
def handle_prompt_file(self, files: Optional[list[str]]) -> tuple[Optional[str], Optional[list[str]]]:
    """
    Pull a `prompt.txt` entry out of the file list and return its text.

    A file whose basename is exactly "prompt.txt" is treated as the main
    prompt rather than as an embedded file: its content is read and the entry
    is removed from the returned list. This lets a caller pass a large prompt
    as a file instead of inline text.

    Reading is best-effort. If `prompt.txt` cannot be read or parsed, the
    failure is logged, the entry is still removed, and no prompt content is
    returned for it.

    Args:
        files: List of absolute file paths

    Returns:
        tuple: (prompt_content, updated_files_list). The list is None when no
        other files remain. When `files` is empty or None it is returned as-is.
    """
    if not files:
        return None, files

    prompt_content = None
    updated_files = []

    for file_path in files:
        # Only an exact "prompt.txt" basename qualifies, so names such as
        # "myprompt.txt" or "prompt.txt.bak" stay in the regular file list.
        if os.path.basename(file_path) != "prompt.txt":
            updated_files.append(file_path)
            continue

        try:
            content, _ = read_file_content(file_path)

            if "--- BEGIN FILE:" in content and "--- END FILE:" in content:
                # Keep only the lines between the first BEGIN marker and the
                # first END marker that follows it.
                in_content = False
                content_lines = []
                for line in content.split("\n"):
                    if line.startswith("--- BEGIN FILE:"):
                        in_content = True
                        continue
                    if line.startswith("--- END FILE:"):
                        break
                    if in_content:
                        content_lines.append(line)
                prompt_content = "\n".join(content_lines)
            elif content.startswith("\n--- ERROR"):
                # The reader returned an error marker instead of file text.
                prompt_content = None
            else:
                # Raw content without markers (e.g. tests or direct input).
                prompt_content = content
        except Exception as exc:
            # Best-effort: a broken prompt file must not abort the request,
            # but the failure is logged instead of being dropped silently.
            logger.warning(f"Failed to read prompt file '{file_path}': {type(exc).__name__}: {exc}")

    return prompt_content, updated_files if updated_files else None
def get_prompt_content_for_size_validation(self, user_content: str) -> str:
    """
    Select the content to check against the MCP prompt size limit.

    This is an override hook. The base implementation validates the user
    content unchanged; a tool can override it to exclude parts such as
    conversation history from the size check.

    Args:
        user_content: The user content that would normally be validated

    Returns:
        The content that should actually be validated for size limits
    """
    # Base behaviour: the full user content is what gets validated.
    content_to_validate = user_content
    return content_to_validate
def check_prompt_size(self, text: str) -> Optional[dict[str, Any]]:
    """
    Check whether USER INPUT text exceeds the MCP transport size limit.

    IMPORTANT: Use this only for user input crossing the CLI / MCP Server
    transport boundary, never to limit internal MCP Server operations.

    Args:
        text: The user input text to check (NOT internal prompt content)

    Returns:
        Optional[Dict[str, Any]]: A "resend_prompt" response asking the caller
        to pass the prompt as a file if the text is too large, None otherwise
    """
    # Empty input and anything within the limit needs no special handling.
    if not text or len(text) <= MCP_PROMPT_SIZE_LIMIT:
        return None

    resend_instructions = (
        f"MANDATORY ACTION REQUIRED: The prompt is too large for MCP's token limits (>{MCP_PROMPT_SIZE_LIMIT:,} characters). "
        "YOU MUST IMMEDIATELY save the prompt text to a temporary file named 'prompt.txt' in the working directory. "
        "DO NOT attempt to shorten or modify the prompt. SAVE IT AS-IS to 'prompt.txt'. "
        "Then resend the request, passing the absolute file path to 'prompt.txt' as part of the tool call, "
        "along with any other files you wish to share as context. Leave the prompt text itself empty or very brief in the new request. "
        "This is the ONLY way to handle large prompts - you MUST follow these exact steps."
    )
    size_details = {
        "prompt_size": len(text),
        "limit": MCP_PROMPT_SIZE_LIMIT,
        "instructions": "MANDATORY: Save prompt to 'prompt.txt' in current folder and provide full path when recalling this tool.",
    }
    return {
        "status": "resend_prompt",
        "content": resend_instructions,
        "content_type": "text",
        "metadata": size_details,
    }
def _prepare_file_content_for_prompt(
    self,
    request_files: list[str],
    continuation_id: Optional[str],
    context_description: str = "New files",
    max_tokens: Optional[int] = None,
    reserve_tokens: int = 1_000,
    remaining_budget: Optional[int] = None,
    arguments: Optional[dict] = None,
    model_context: Optional[Any] = None,
) -> tuple[str, list[str]]:
    """
    Read the requested files for prompt inclusion, skipping ones already in history.

    The token budget is chosen in this order: `remaining_budget` (explicit, or
    the `_remaining_tokens` entry of the tool arguments), then `max_tokens`,
    then the file allocation computed by the model context. Files already
    embedded in the conversation are not read again; instead a short note
    listing them is appended to the returned content.

    Args:
        request_files: List of files requested for current tool execution
        continuation_id: Thread continuation ID, or None for new conversations
        context_description: Description of the content; not used by this implementation
        max_tokens: Maximum tokens to use when no remaining budget is known
        reserve_tokens: Tokens to reserve for additional prompt content (default 1K)
        remaining_budget: Remaining token budget after conversation history
        arguments: Original tool arguments (used to look up `_remaining_tokens`)
        model_context: Model context object used for token allocation as a last resort

    Returns:
        tuple[str, list[str]]: (formatted_file_content, actually_processed_files)
            - formatted_file_content: File content string ready for prompt inclusion
            - actually_processed_files: Individual file paths passed to the reader
              (directories are expanded to individual files)

    Raises:
        RuntimeError: If no budget is given and no model context is available
        Exception: Any error raised while reading the files is logged and re-raised
    """
    if not request_files:
        return "", []

    # Extract remaining budget from arguments if available
    if remaining_budget is None:
        # Prefer the provided arguments, then those stored on the instance.
        # The trailing `or {}` also covers a stored attribute that is None,
        # which would otherwise fail on `.get`.
        args_to_use = arguments or getattr(self, "_current_arguments", None) or {}
        remaining_budget = args_to_use.get("_remaining_tokens")

    # Use remaining budget if provided, otherwise fall back to max_tokens or model-specific default
    if remaining_budget is not None:
        effective_max_tokens = remaining_budget - reserve_tokens
    elif max_tokens is not None:
        effective_max_tokens = max_tokens - reserve_tokens
    else:
        # Use model_context for token allocation
        if not model_context:
            # Try to get from stored attributes as fallback
            model_context = getattr(self, "_model_context", None)
            if not model_context:
                logger.error(
                    f"[FILES] {self.name}: _prepare_file_content_for_prompt called without model_context. "
                    "This indicates an incorrect call sequence in the tool's implementation."
                )
                raise RuntimeError("Model context not provided for file preparation.")

        try:
            token_allocation = model_context.calculate_token_allocation()
            effective_max_tokens = token_allocation.file_tokens - reserve_tokens
            logger.debug(
                f"[FILES] {self.name}: Using model context for {model_context.model_name}: "
                f"{token_allocation.file_tokens:,} file tokens from {token_allocation.total_tokens:,} total"
            )
        except Exception as e:
            logger.error(
                f"[FILES] {self.name}: Failed to calculate token allocation from model context: {e}", exc_info=True
            )
            # The context exists but the calculation failed: log loudly and
            # continue with a fixed fallback budget instead of crashing.
            effective_max_tokens = 100_000 - reserve_tokens

    # Ensure we have a reasonable minimum budget
    effective_max_tokens = max(1000, effective_max_tokens)

    files_to_embed = self.filter_new_files(request_files, continuation_id)
    logger.debug(f"[FILES] {self.name}: Will embed {len(files_to_embed)} files after filtering")

    # Log the specific files for debugging/testing
    if files_to_embed:
        logger.info(
            f"[FILE_PROCESSING] {self.name} tool will embed new files: {', '.join([os.path.basename(f) for f in files_to_embed])}"
        )
    else:
        logger.info(
            f"[FILE_PROCESSING] {self.name} tool: No new files to embed (all files already in conversation history)"
        )

    content_parts = []
    actually_processed_files = []

    # Read content of new files only
    if files_to_embed:
        logger.debug(f"{self.name} tool embedding {len(files_to_embed)} new files: {', '.join(files_to_embed)}")
        logger.debug(
            f"[FILES] {self.name}: Starting file embedding with token budget {effective_max_tokens + reserve_tokens:,}"
        )
        try:
            # Expand directories first so the individual files can be reported
            from utils.file_utils import expand_paths

            expanded_files = expand_paths(files_to_embed)
            logger.debug(
                f"[FILES] {self.name}: Expanded {len(files_to_embed)} paths to {len(expanded_files)} individual files"
            )

            file_content = read_files(
                files_to_embed,
                max_tokens=effective_max_tokens + reserve_tokens,
                reserve_tokens=reserve_tokens,
                include_line_numbers=self.wants_line_numbers_by_default(),
            )
            # No MCP_PROMPT_SIZE_LIMIT check here: that limit applies to user
            # input only, and read_files is given the token budget directly.
            content_parts.append(file_content)

            # Track the expanded files as actually processed
            actually_processed_files.extend(expanded_files)

            # Estimate tokens for debug logging
            from utils.token_utils import estimate_tokens

            content_tokens = estimate_tokens(file_content)
            logger.debug(
                f"{self.name} tool successfully embedded {len(files_to_embed)} files ({content_tokens:,} tokens)"
            )
            logger.debug(f"[FILES] {self.name}: Successfully embedded files - {content_tokens:,} tokens used")
            logger.debug(
                f"[FILES] {self.name}: Actually processed {len(actually_processed_files)} individual files"
            )
        except Exception as e:
            logger.error(f"{self.name} tool failed to embed files {files_to_embed}: {type(e).__name__}: {e}")
            logger.debug(f"[FILES] {self.name}: File embedding failed - {type(e).__name__}: {e}")
            raise
    else:
        logger.debug(f"[FILES] {self.name}: No files to embed after filtering")

    # Generate note about files already in conversation history
    if continuation_id and len(files_to_embed) < len(request_files):
        # A set keeps the membership test below constant-time per file.
        embedded_files = set(self.get_conversation_embedded_files(continuation_id))
        skipped_files = [f for f in request_files if f in embedded_files]
        if skipped_files:
            logger.debug(
                f"{self.name} tool skipping {len(skipped_files)} files already in conversation history: {', '.join(skipped_files)}"
            )
            logger.debug(f"[FILES] {self.name}: Adding note about {len(skipped_files)} skipped files")
            if content_parts:
                content_parts.append("\n\n")
            note_lines = [
                "--- NOTE: Additional files referenced in conversation history ---",
                "The following files are already available in our conversation context:",
                "\n".join(f" - {f}" for f in skipped_files),
                "--- END NOTE ---",
            ]
            content_parts.append("\n".join(note_lines))
        else:
            logger.debug(f"[FILES] {self.name}: No skipped files to note")

    result = "".join(content_parts)
    logger.debug(
        f"[FILES] {self.name}: _prepare_file_content_for_prompt returning {len(result)} chars, {len(actually_processed_files)} processed files"
    )
    return result, actually_processed_files
def get_websearch_instruction(self, tool_specific: Optional[str] = None) -> str:
    """
    Build the standard web search instruction appended to prompts.

    The shared base text is followed either by the tool-specific guidance or,
    when none is given, by a default list of search topics.

    Args:
        tool_specific: Optional tool-specific search guidance

    Returns:
        str: Web search instruction to append to prompt
    """
    base_instruction = """
WEB SEARCH CAPABILITY: You can request the calling agent to perform web searches to enhance your analysis with current information!
IMPORTANT: When you identify areas where web searches would significantly improve your response (such as checking current documentation, finding recent solutions, verifying best practices, or gathering community insights), you MUST explicitly instruct the agent to perform specific web searches and then respond back using the continuation_id from this response to continue the analysis.
Use clear, direct language based on the value of the search:
For valuable supplementary information: "Please perform a web search on '[specific topic/query]' and then continue this analysis using the continuation_id from this response if you find relevant information."
For important missing information: "Please search for '[specific topic/query]' and respond back with the findings using the continuation_id from this response - this information is needed to provide a complete analysis."
For critical/essential information: "SEARCH REQUIRED: Please immediately perform a web search on '[specific topic/query]' and respond back with the results using the continuation_id from this response. Cannot provide accurate analysis without this current information."
This ensures you get the most current and comprehensive information while maintaining conversation context through the continuation_id."""

    if tool_specific:
        # Tool-provided guidance replaces the default topic list.
        closing = (
            f"\n{tool_specific}\n"
            "When recommending searches, be specific about what information you need and why it would improve your analysis."
        )
    else:
        # Default guidance for all tools.
        closing = (
            "\nConsider requesting searches for:\n"
            "- Current documentation and API references\n"
            "- Recent best practices and patterns\n"
            "- Known issues and community solutions\n"
            "- Framework updates and compatibility\n"
            "- Security advisories and patches\n"
            "- Performance benchmarks and optimizations\n"
            "When recommending searches, be specific about what information you need and why it would improve your analysis. "
            "Always remember to instruct agent to use the continuation_id from this response when providing search results."
        )

    return f"{base_instruction}{closing}"
def get_language_instruction(self) -> str:
    """
    Build the response-language instruction from the LOCALE setting.

    LOCALE is looked up through get_env on every call, so a changed value is
    picked up without restarting.

    Returns:
        str: Language instruction to prepend to prompt, or empty string if
        no locale set
    """
    configured = get_env("LOCALE", "")
    # A missing value and a whitespace-only value both count as "not set".
    locale = (configured or "").strip()
    return f"Always respond in {locale}.\n\n" if locale else ""
# === ABSTRACT METHODS FOR SIMPLE TOOLS ===
@abstractmethod
async def prepare_prompt(self, request) -> str:
    """
    Build the complete prompt for the AI model.

    Implementations are expected to combine:
    - System prompt from get_system_prompt()
    - File content from _prepare_file_content_for_prompt()
    - Conversation history from reconstruct_thread_context()
    - User's request and any tool-specific context

    Args:
        request: The validated request object

    Returns:
        str: Complete prompt ready for the AI model
    """
    ...
def format_response(self, response: str, request, model_info: dict = None) -> str:
    """
    Post-process the AI model's response before it is returned to the user.

    This is an override hook for adding structure, validation, or extra
    context. The base implementation hands the response back unchanged and
    ignores `request` and `model_info`.

    Args:
        response: Raw response from the AI model
        request: The original request object
        model_info: Optional model information and metadata

    Returns:
        str: Formatted response ready for the user
    """
    # Base behaviour is a pass-through.
    formatted = response
    return formatted
# === IMPLEMENTATION METHODS ===
# These will be provided in a full implementation but are inherited from current base.py
# for now to maintain compatibility.
async def execute(self, arguments: dict[str, Any]) -> list[TextContent]:
    """
    Execute the tool.

    The base implementation is a placeholder that always raises; concrete
    tools are expected to supply the real behaviour.

    Raises:
        NotImplementedError: Always, in this base implementation
    """
    message = "Subclasses must implement execute method"
    raise NotImplementedError(message)
def _should_require_model_selection(self, model_name: str) -> bool:
    """
    Decide whether the CLI must be asked to pick a model at runtime.

    Selection is required when the model is "auto" (case-insensitive) or
    when the provider registry has no provider for the requested model.

    Args:
        model_name: The model name from the request or DEFAULT_MODEL

    Returns:
        bool: True if we should require model selection
    """
    # Explicit auto mode always needs a concrete choice.
    if model_name.lower() == "auto":
        return True

    from providers.registry import ModelProviderRegistry

    # A model with a registered provider can be used as requested.
    if ModelProviderRegistry.get_provider_for_model(model_name):
        return False

    logger.warning(f"Model '{model_name}' is not available with current API keys. Requiring model selection.")
    return True
def _get_available_models(self) -> list[str]:
    """
    Get the list of model names available from enabled providers.

    Starts from the names reported by ModelProviderRegistry, then adds the
    OpenRouter aliases when an OpenRouter API key is configured and the
    custom-registry aliases when CUSTOM_API_URL is set. A failure to load
    either alias registry is logged and does not prevent the remaining names
    from being returned.

    Returns:
        List of unique model names, in first-seen order
    """
    from providers.registry import ModelProviderRegistry

    # Copy so that extending the list below never mutates the registry's own list.
    candidates = list(ModelProviderRegistry.get_available_model_names())

    # Add OpenRouter models and their aliases when OpenRouter is configured
    # (the example placeholder key counts as "not configured").
    openrouter_key = get_env("OPENROUTER_API_KEY")
    if openrouter_key and openrouter_key != "your_openrouter_api_key_here":
        try:
            candidates.extend(self._get_openrouter_registry().list_aliases())
        except Exception as exc:  # pragma: no cover - logged for observability
            logger.debug(f"Failed to add OpenRouter models to enum: {exc}")

    # Add custom models (and their aliases) when a custom endpoint is available
    if get_env("CUSTOM_API_URL"):
        try:
            candidates.extend(self._get_custom_registry().list_aliases())
        except Exception as exc:  # pragma: no cover - logged for observability
            logger.debug(f"Failed to add custom models to enum: {exc}")

    # dict.fromkeys removes duplicates in one pass while keeping insertion order.
    return list(dict.fromkeys(candidates))
def _resolve_model_context(self, arguments: dict, request) -> tuple[str, Any]:
    """
    Resolve the model name and model context for a request.

    When the arguments already carry `_model_context` and
    `_resolved_model_name`, those are used as-is. Otherwise the model is
    taken from the request (or DEFAULT_MODEL) and a new ModelContext is built,
    unless that model requires an explicit selection.

    Args:
        arguments: Dictionary of arguments from the MCP client
        request: The validated request object

    Returns:
        tuple[str, ModelContext]: (resolved_model_name, model_context)

    Raises:
        ValueError: If model selection is required (auto mode or unavailable model)
    """
    pre_resolved_context = arguments.get("_model_context")
    pre_resolved_name = arguments.get("_resolved_model_name")

    # Fast path: the model was already resolved before the tool was called.
    if pre_resolved_context and pre_resolved_name:
        logger.debug(f"Using pre-resolved model '{pre_resolved_name}' from MCP boundary")
        return pre_resolved_name, pre_resolved_context

    # Fallback for direct execute calls.
    chosen_model = getattr(request, "model", None)
    if not chosen_model:
        from config import DEFAULT_MODEL

        chosen_model = DEFAULT_MODEL
    logger.debug(f"Using fallback model resolution for '{chosen_model}' (test mode)")

    if self._should_require_model_selection(chosen_model):
        # The message depends on why a selection is required.
        wants_auto = chosen_model.lower() == "auto"
        error_message = (
            self._build_auto_mode_required_message()
            if wants_auto
            else self._build_model_unavailable_message(chosen_model)
        )
        raise ValueError(error_message)

    from utils.model_context import ModelContext

    return chosen_model, ModelContext(chosen_model)
def validate_and_correct_temperature(self, temperature: float, model_context: Any) -> tuple[float, list[str]]:
    """
    Validate the temperature for the given model and correct it if needed.

    The model's temperature constraint (from `model_context.capabilities`)
    decides whether the value is acceptable and supplies the corrected value
    when it is not. Validation never fails the request: if anything goes
    wrong, the original temperature is returned together with a warning.

    Args:
        temperature: Temperature value to validate
        model_context: Model context object exposing `capabilities` and `model_name`

    Returns:
        Tuple of (corrected_temperature, warning_messages)
    """
    try:
        constraint = model_context.capabilities.temperature_constraint

        if constraint.validate(temperature):
            return temperature, []

        corrected = constraint.get_corrected_value(temperature)
        warning = (
            f"Temperature {temperature} invalid for {model_context.model_name}. "
            f"{constraint.get_description()}. Using {corrected} instead."
        )
        return corrected, [warning]
    except Exception as e:
        # The context itself may be what is broken (e.g. None), so its name
        # is read defensively; otherwise this handler would raise as well.
        model_name = getattr(model_context, "model_name", "unknown")
        logger.warning(f"Temperature validation failed for {model_name}: {e}")
        return temperature, [f"Temperature validation failed: {e}"]
def _validate_image_limits(
    self, images: Optional[list[str]], model_context: Optional[Any] = None, continuation_id: Optional[str] = None
) -> Optional[dict]:
    """
    Validate image count and combined size against the model's capabilities.

    Checks run in this order: the model must support images, at most five
    images may be supplied, and their combined size must stay within the
    model's `max_image_size_mb` (capped at 40MB for custom providers). Images
    whose size cannot be determined are counted as 1MB each.

    Args:
        images: List of image paths/data URLs to validate
        model_context: Model context object exposing `capabilities` and `model_name`;
            falls back to the instance's stored `_model_context`
        continuation_id: Optional continuation ID; not used by this implementation

    Returns:
        Optional[dict]: Error response if validation fails, None if valid or
        if there is nothing to validate against
    """
    if not images:
        return None

    import base64
    from pathlib import Path

    bytes_per_mb = 1024 * 1024
    image_total = len(images)

    def _rejection(message: str, name: str, **details) -> dict:
        """Build the error payload shared by every failed check."""
        return {
            "status": "error",
            "content": message,
            "content_type": "text",
            "metadata": {"error_type": "validation_error", "model_name": name, **details},
        }

    def _size_in_mb(image_ref: str) -> float:
        """Measure one image: decoded payload for data URLs, file size for paths."""
        if image_ref.startswith("data:image/"):
            # Data URL such as data:image/png;base64,iVBORw0... - decode the
            # payload so the real byte size is measured, not the base64 text.
            _, payload = image_ref.split(",", 1)
            return len(base64.b64decode(payload)) / bytes_per_mb

        candidate = Path(image_ref)
        if candidate.exists():
            return candidate.stat().st_size / bytes_per_mb

        logger.warning(f"Image file not found: {image_ref}")
        # Missing files are assumed to be 1MB so validation can continue.
        return 1.0

    # Use the explicit context, or the one stored on the tool as a fallback.
    context = model_context or getattr(self, "_model_context", None)
    if not context:
        logger.warning("No model context available for image validation")
        return None

    try:
        capabilities = context.capabilities
        model_name = context.model_name
    except Exception as e:
        logger.warning(f"Failed to get capabilities from model_context for image validation: {e}")
        # Capabilities are unavailable, so image support is unknown.
        model_name = getattr(context, "model_name", "unknown")
        return _rejection(
            self._build_model_unavailable_message(model_name),
            model_name,
            supports_images=None,
            image_count=image_total,
        )

    if not capabilities.supports_images:
        return _rejection(
            (
                f"Image support not available: Model '{model_name}' does not support image processing. "
                f"Please use a vision-capable model such as 'gemini-2.5-flash', 'o3', "
                f"or 'claude-opus-4.1' for image analysis tasks."
            ),
            model_name,
            supports_images=False,
            image_count=image_total,
        )

    max_images = 5  # Fixed cap on the number of images
    max_size_mb = capabilities.max_image_size_mb

    if image_total > max_images:
        return _rejection(
            (
                f"Too many images: Model '{model_name}' supports a maximum of {max_images} images, "
                f"but {image_total} were provided. Please reduce the number of images."
            ),
            model_name,
            image_count=image_total,
            max_images=max_images,
        )

    # Add up the size of all images.
    total_size_mb = 0.0
    for image_ref in images:
        try:
            total_size_mb += _size_in_mb(image_ref)
        except Exception as e:
            logger.warning(f"Failed to get size for image {image_ref}: {e}")
            # Unmeasurable images are assumed to be 1MB as well.
            total_size_mb += 1.0

    # Custom providers are capped at 40MB regardless of the declared limit.
    effective_limit_mb = max_size_mb
    try:
        from providers.shared import ProviderType

        if capabilities.provider == ProviderType.CUSTOM:
            effective_limit_mb = min(max_size_mb, 40.0)
    except Exception:
        pass

    if total_size_mb > effective_limit_mb:
        return _rejection(
            (
                f"Image size limit exceeded: Model '{model_name}' supports maximum {effective_limit_mb:.1f}MB "
                f"for all images combined, but {total_size_mb:.1f}MB was provided. "
                f"Please reduce image sizes or count and try again."
            ),
            model_name,
            total_size_mb=round(total_size_mb, 2),
            limit_mb=round(effective_limit_mb, 2),
            image_count=image_total,
            supports_images=True,
        )

    # All validations passed
    logger.debug(f"Image validation passed: {image_total} images, {total_size_mb:.1f}MB total")
    return None
def _parse_response(self, raw_text: str, request, model_info: Optional[dict] = None):
    """
    Parse the raw model response.

    The base implementation is a placeholder that always raises; concrete
    tools are expected to supply the real parsing.

    Raises:
        NotImplementedError: Always, in this base implementation
    """
    message = "Subclasses must implement _parse_response method"
    raise NotImplementedError(message)
```