#
tokens: 49034/50000 49/353 files (page 2/19)
lines: off (toggle) GitHub
raw markdown copy
This is page 2 of 19. Use http://codebase.md/beehiveinnovations/gemini-mcp-server?page={x} to view the full context.

# Directory Structure

```
├── .claude
│   ├── commands
│   │   └── fix-github-issue.md
│   └── settings.json
├── .coveragerc
├── .dockerignore
├── .env.example
├── .gitattributes
├── .github
│   ├── FUNDING.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.yml
│   │   ├── config.yml
│   │   ├── documentation.yml
│   │   ├── feature_request.yml
│   │   └── tool_addition.yml
│   ├── pull_request_template.md
│   └── workflows
│       ├── docker-pr.yml
│       ├── docker-release.yml
│       ├── semantic-pr.yml
│       ├── semantic-release.yml
│       └── test.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CHANGELOG.md
├── claude_config_example.json
├── CLAUDE.md
├── clink
│   ├── __init__.py
│   ├── agents
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── claude.py
│   │   ├── codex.py
│   │   └── gemini.py
│   ├── constants.py
│   ├── models.py
│   ├── parsers
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── claude.py
│   │   ├── codex.py
│   │   └── gemini.py
│   └── registry.py
├── code_quality_checks.ps1
├── code_quality_checks.sh
├── communication_simulator_test.py
├── conf
│   ├── __init__.py
│   ├── azure_models.json
│   ├── cli_clients
│   │   ├── claude.json
│   │   ├── codex.json
│   │   └── gemini.json
│   ├── custom_models.json
│   ├── dial_models.json
│   ├── gemini_models.json
│   ├── openai_models.json
│   ├── openrouter_models.json
│   └── xai_models.json
├── config.py
├── docker
│   ├── README.md
│   └── scripts
│       ├── build.ps1
│       ├── build.sh
│       ├── deploy.ps1
│       ├── deploy.sh
│       └── healthcheck.py
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── adding_providers.md
│   ├── adding_tools.md
│   ├── advanced-usage.md
│   ├── ai_banter.md
│   ├── ai-collaboration.md
│   ├── azure_openai.md
│   ├── configuration.md
│   ├── context-revival.md
│   ├── contributions.md
│   ├── custom_models.md
│   ├── docker-deployment.md
│   ├── gemini-setup.md
│   ├── getting-started.md
│   ├── index.md
│   ├── locale-configuration.md
│   ├── logging.md
│   ├── model_ranking.md
│   ├── testing.md
│   ├── tools
│   │   ├── analyze.md
│   │   ├── apilookup.md
│   │   ├── challenge.md
│   │   ├── chat.md
│   │   ├── clink.md
│   │   ├── codereview.md
│   │   ├── consensus.md
│   │   ├── debug.md
│   │   ├── docgen.md
│   │   ├── listmodels.md
│   │   ├── planner.md
│   │   ├── precommit.md
│   │   ├── refactor.md
│   │   ├── secaudit.md
│   │   ├── testgen.md
│   │   ├── thinkdeep.md
│   │   ├── tracer.md
│   │   └── version.md
│   ├── troubleshooting.md
│   ├── vcr-testing.md
│   └── wsl-setup.md
├── examples
│   ├── claude_config_macos.json
│   └── claude_config_wsl.json
├── LICENSE
├── providers
│   ├── __init__.py
│   ├── azure_openai.py
│   ├── base.py
│   ├── custom.py
│   ├── dial.py
│   ├── gemini.py
│   ├── openai_compatible.py
│   ├── openai.py
│   ├── openrouter.py
│   ├── registries
│   │   ├── __init__.py
│   │   ├── azure.py
│   │   ├── base.py
│   │   ├── custom.py
│   │   ├── dial.py
│   │   ├── gemini.py
│   │   ├── openai.py
│   │   ├── openrouter.py
│   │   └── xai.py
│   ├── registry_provider_mixin.py
│   ├── registry.py
│   ├── shared
│   │   ├── __init__.py
│   │   ├── model_capabilities.py
│   │   ├── model_response.py
│   │   ├── provider_type.py
│   │   └── temperature.py
│   └── xai.py
├── pyproject.toml
├── pytest.ini
├── README.md
├── requirements-dev.txt
├── requirements.txt
├── run_integration_tests.ps1
├── run_integration_tests.sh
├── run-server.ps1
├── run-server.sh
├── scripts
│   └── sync_version.py
├── server.py
├── simulator_tests
│   ├── __init__.py
│   ├── base_test.py
│   ├── conversation_base_test.py
│   ├── log_utils.py
│   ├── test_analyze_validation.py
│   ├── test_basic_conversation.py
│   ├── test_chat_simple_validation.py
│   ├── test_codereview_validation.py
│   ├── test_consensus_conversation.py
│   ├── test_consensus_three_models.py
│   ├── test_consensus_workflow_accurate.py
│   ├── test_content_validation.py
│   ├── test_conversation_chain_validation.py
│   ├── test_cross_tool_comprehensive.py
│   ├── test_cross_tool_continuation.py
│   ├── test_debug_certain_confidence.py
│   ├── test_debug_validation.py
│   ├── test_line_number_validation.py
│   ├── test_logs_validation.py
│   ├── test_model_thinking_config.py
│   ├── test_o3_model_selection.py
│   ├── test_o3_pro_expensive.py
│   ├── test_ollama_custom_url.py
│   ├── test_openrouter_fallback.py
│   ├── test_openrouter_models.py
│   ├── test_per_tool_deduplication.py
│   ├── test_planner_continuation_history.py
│   ├── test_planner_validation_old.py
│   ├── test_planner_validation.py
│   ├── test_precommitworkflow_validation.py
│   ├── test_prompt_size_limit_bug.py
│   ├── test_refactor_validation.py
│   ├── test_secaudit_validation.py
│   ├── test_testgen_validation.py
│   ├── test_thinkdeep_validation.py
│   ├── test_token_allocation_validation.py
│   ├── test_vision_capability.py
│   └── test_xai_models.py
├── systemprompts
│   ├── __init__.py
│   ├── analyze_prompt.py
│   ├── chat_prompt.py
│   ├── clink
│   │   ├── codex_codereviewer.txt
│   │   ├── default_codereviewer.txt
│   │   ├── default_planner.txt
│   │   └── default.txt
│   ├── codereview_prompt.py
│   ├── consensus_prompt.py
│   ├── debug_prompt.py
│   ├── docgen_prompt.py
│   ├── generate_code_prompt.py
│   ├── planner_prompt.py
│   ├── precommit_prompt.py
│   ├── refactor_prompt.py
│   ├── secaudit_prompt.py
│   ├── testgen_prompt.py
│   ├── thinkdeep_prompt.py
│   └── tracer_prompt.py
├── tests
│   ├── __init__.py
│   ├── CASSETTE_MAINTENANCE.md
│   ├── conftest.py
│   ├── gemini_cassettes
│   │   ├── chat_codegen
│   │   │   └── gemini25_pro_calculator
│   │   │       └── mldev.json
│   │   ├── chat_cross
│   │   │   └── step1_gemini25_flash_number
│   │   │       └── mldev.json
│   │   └── consensus
│   │       └── step2_gemini25_flash_against
│   │           └── mldev.json
│   ├── http_transport_recorder.py
│   ├── mock_helpers.py
│   ├── openai_cassettes
│   │   ├── chat_cross_step2_gpt5_reminder.json
│   │   ├── chat_gpt5_continuation.json
│   │   ├── chat_gpt5_moon_distance.json
│   │   ├── consensus_step1_gpt5_for.json
│   │   └── o3_pro_basic_math.json
│   ├── pii_sanitizer.py
│   ├── sanitize_cassettes.py
│   ├── test_alias_target_restrictions.py
│   ├── test_auto_mode_comprehensive.py
│   ├── test_auto_mode_custom_provider_only.py
│   ├── test_auto_mode_model_listing.py
│   ├── test_auto_mode_provider_selection.py
│   ├── test_auto_mode.py
│   ├── test_auto_model_planner_fix.py
│   ├── test_azure_openai_provider.py
│   ├── test_buggy_behavior_prevention.py
│   ├── test_cassette_semantic_matching.py
│   ├── test_challenge.py
│   ├── test_chat_codegen_integration.py
│   ├── test_chat_cross_model_continuation.py
│   ├── test_chat_openai_integration.py
│   ├── test_chat_simple.py
│   ├── test_clink_claude_agent.py
│   ├── test_clink_claude_parser.py
│   ├── test_clink_codex_agent.py
│   ├── test_clink_gemini_agent.py
│   ├── test_clink_gemini_parser.py
│   ├── test_clink_integration.py
│   ├── test_clink_parsers.py
│   ├── test_clink_tool.py
│   ├── test_collaboration.py
│   ├── test_config.py
│   ├── test_consensus_integration.py
│   ├── test_consensus_schema.py
│   ├── test_consensus.py
│   ├── test_conversation_continuation_integration.py
│   ├── test_conversation_field_mapping.py
│   ├── test_conversation_file_features.py
│   ├── test_conversation_memory.py
│   ├── test_conversation_missing_files.py
│   ├── test_custom_openai_temperature_fix.py
│   ├── test_custom_provider.py
│   ├── test_debug.py
│   ├── test_deploy_scripts.py
│   ├── test_dial_provider.py
│   ├── test_directory_expansion_tracking.py
│   ├── test_disabled_tools.py
│   ├── test_docker_claude_desktop_integration.py
│   ├── test_docker_config_complete.py
│   ├── test_docker_healthcheck.py
│   ├── test_docker_implementation.py
│   ├── test_docker_mcp_validation.py
│   ├── test_docker_security.py
│   ├── test_docker_volume_persistence.py
│   ├── test_file_protection.py
│   ├── test_gemini_token_usage.py
│   ├── test_image_support_integration.py
│   ├── test_image_validation.py
│   ├── test_integration_utf8.py
│   ├── test_intelligent_fallback.py
│   ├── test_issue_245_simple.py
│   ├── test_large_prompt_handling.py
│   ├── test_line_numbers_integration.py
│   ├── test_listmodels_restrictions.py
│   ├── test_listmodels.py
│   ├── test_mcp_error_handling.py
│   ├── test_model_enumeration.py
│   ├── test_model_metadata_continuation.py
│   ├── test_model_resolution_bug.py
│   ├── test_model_restrictions.py
│   ├── test_o3_pro_output_text_fix.py
│   ├── test_o3_temperature_fix_simple.py
│   ├── test_openai_compatible_token_usage.py
│   ├── test_openai_provider.py
│   ├── test_openrouter_provider.py
│   ├── test_openrouter_registry.py
│   ├── test_parse_model_option.py
│   ├── test_per_tool_model_defaults.py
│   ├── test_pii_sanitizer.py
│   ├── test_pip_detection_fix.py
│   ├── test_planner.py
│   ├── test_precommit_workflow.py
│   ├── test_prompt_regression.py
│   ├── test_prompt_size_limit_bug_fix.py
│   ├── test_provider_retry_logic.py
│   ├── test_provider_routing_bugs.py
│   ├── test_provider_utf8.py
│   ├── test_providers.py
│   ├── test_rate_limit_patterns.py
│   ├── test_refactor.py
│   ├── test_secaudit.py
│   ├── test_server.py
│   ├── test_supported_models_aliases.py
│   ├── test_thinking_modes.py
│   ├── test_tools.py
│   ├── test_tracer.py
│   ├── test_utf8_localization.py
│   ├── test_utils.py
│   ├── test_uvx_resource_packaging.py
│   ├── test_uvx_support.py
│   ├── test_workflow_file_embedding.py
│   ├── test_workflow_metadata.py
│   ├── test_workflow_prompt_size_validation_simple.py
│   ├── test_workflow_utf8.py
│   ├── test_xai_provider.py
│   ├── transport_helpers.py
│   └── triangle.png
├── tools
│   ├── __init__.py
│   ├── analyze.py
│   ├── apilookup.py
│   ├── challenge.py
│   ├── chat.py
│   ├── clink.py
│   ├── codereview.py
│   ├── consensus.py
│   ├── debug.py
│   ├── docgen.py
│   ├── listmodels.py
│   ├── models.py
│   ├── planner.py
│   ├── precommit.py
│   ├── refactor.py
│   ├── secaudit.py
│   ├── shared
│   │   ├── __init__.py
│   │   ├── base_models.py
│   │   ├── base_tool.py
│   │   ├── exceptions.py
│   │   └── schema_builders.py
│   ├── simple
│   │   ├── __init__.py
│   │   └── base.py
│   ├── testgen.py
│   ├── thinkdeep.py
│   ├── tracer.py
│   ├── version.py
│   └── workflow
│       ├── __init__.py
│       ├── base.py
│       ├── schema_builders.py
│       └── workflow_mixin.py
├── utils
│   ├── __init__.py
│   ├── client_info.py
│   ├── conversation_memory.py
│   ├── env.py
│   ├── file_types.py
│   ├── file_utils.py
│   ├── image_utils.py
│   ├── model_context.py
│   ├── model_restrictions.py
│   ├── security_config.py
│   ├── storage_backend.py
│   └── token_utils.py
└── zen-mcp-server
```

# Files

--------------------------------------------------------------------------------
/providers/xai.py:
--------------------------------------------------------------------------------

```python
"""X.AI (GROK) model provider implementation."""

import logging
from typing import TYPE_CHECKING, ClassVar, Optional

if TYPE_CHECKING:
    from tools.models import ToolModelCategory

from .openai_compatible import OpenAICompatibleProvider
from .registries.xai import XAIModelRegistry
from .registry_provider_mixin import RegistryBackedProviderMixin
from .shared import ModelCapabilities, ProviderType

logger = logging.getLogger(__name__)


class XAIModelProvider(RegistryBackedProviderMixin, OpenAICompatibleProvider):
    """Integration for X.AI's GROK models exposed over an OpenAI-style API.

    Publishes capability metadata for the officially supported deployments and
    maps tool-category preferences to the appropriate GROK model.
    """

    FRIENDLY_NAME = "X.AI"

    REGISTRY_CLASS = XAIModelRegistry
    MODEL_CAPABILITIES: ClassVar[dict[str, ModelCapabilities]] = {}

    def __init__(self, api_key: str, **kwargs):
        """Initialize X.AI provider with API key."""
        # Set X.AI base URL
        kwargs.setdefault("base_url", "https://api.x.ai/v1")
        self._ensure_registry()
        super().__init__(api_key, **kwargs)
        self._invalidate_capability_cache()

    def get_provider_type(self) -> ProviderType:
        """Get the provider type."""
        return ProviderType.XAI

    def get_preferred_model(self, category: "ToolModelCategory", allowed_models: list[str]) -> Optional[str]:
        """Get XAI's preferred model for a given category from allowed models.

        Args:
            category: The tool category requiring a model
            allowed_models: Pre-filtered list of models allowed by restrictions

        Returns:
            Preferred model name or None
        """
        from tools.models import ToolModelCategory

        if not allowed_models:
            return None

        if category == ToolModelCategory.EXTENDED_REASONING:
            # Prefer GROK-4 for advanced reasoning with thinking mode
            if "grok-4" in allowed_models:
                return "grok-4"
            elif "grok-3" in allowed_models:
                return "grok-3"
            # Fall back to any available model
            return allowed_models[0]

        elif category == ToolModelCategory.FAST_RESPONSE:
            # Prefer GROK-3-Fast for speed, then GROK-4
            if "grok-3-fast" in allowed_models:
                return "grok-3-fast"
            elif "grok-4" in allowed_models:
                return "grok-4"
            # Fall back to any available model
            return allowed_models[0]

        else:  # BALANCED or default
            # Prefer GROK-4 for balanced use (best overall capabilities)
            if "grok-4" in allowed_models:
                return "grok-4"
            elif "grok-3" in allowed_models:
                return "grok-3"
            elif "grok-3-fast" in allowed_models:
                return "grok-3-fast"
            # Fall back to any available model
            return allowed_models[0]


# Load registry data at import time
XAIModelProvider._ensure_registry()

```

--------------------------------------------------------------------------------
/utils/image_utils.py:
--------------------------------------------------------------------------------

```python
"""Utility helpers for validating image inputs."""

import base64
import binascii
import os
from collections.abc import Iterable

from utils.file_types import IMAGES, get_image_mime_type

DEFAULT_MAX_IMAGE_SIZE_MB = 20.0

__all__ = ["DEFAULT_MAX_IMAGE_SIZE_MB", "validate_image"]


def _valid_mime_types() -> Iterable[str]:
    """Return the MIME types permitted by the IMAGES whitelist."""
    return (get_image_mime_type(ext) for ext in IMAGES)


def validate_image(image_path: str, max_size_mb: float = None) -> tuple[bytes, str]:
    """Validate a user-supplied image path or data URL.

    Args:
        image_path: Either a filesystem path or a data URL.
        max_size_mb: Optional size limit (defaults to ``DEFAULT_MAX_IMAGE_SIZE_MB``).

    Returns:
        A tuple ``(image_bytes, mime_type)`` ready for upstream providers.

    Raises:
        ValueError: When the image is missing, malformed, or exceeds limits.
    """
    if max_size_mb is None:
        max_size_mb = DEFAULT_MAX_IMAGE_SIZE_MB

    if image_path.startswith("data:"):
        return _validate_data_url(image_path, max_size_mb)

    return _validate_file_path(image_path, max_size_mb)


def _validate_data_url(image_data_url: str, max_size_mb: float) -> tuple[bytes, str]:
    """Validate a data URL and return image bytes plus MIME type."""
    try:
        header, data = image_data_url.split(",", 1)
        mime_type = header.split(";")[0].split(":")[1]
    except (ValueError, IndexError) as exc:
        raise ValueError(f"Invalid data URL format: {exc}")

    valid_mime_types = list(_valid_mime_types())
    if mime_type not in valid_mime_types:
        raise ValueError(
            "Unsupported image type: {mime}. Supported types: {supported}".format(
                mime=mime_type, supported=", ".join(valid_mime_types)
            )
        )

    try:
        image_bytes = base64.b64decode(data)
    except binascii.Error as exc:
        raise ValueError(f"Invalid base64 data: {exc}")

    _validate_size(image_bytes, max_size_mb)
    return image_bytes, mime_type


def _validate_file_path(file_path: str, max_size_mb: float) -> tuple[bytes, str]:
    """Validate an image loaded from the filesystem."""
    try:
        with open(file_path, "rb") as handle:
            image_bytes = handle.read()
    except FileNotFoundError:
        raise ValueError(f"Image file not found: {file_path}")
    except OSError as exc:
        raise ValueError(f"Failed to read image file: {exc}")

    ext = os.path.splitext(file_path)[1].lower()
    if ext not in IMAGES:
        raise ValueError(
            "Unsupported image format: {ext}. Supported formats: {supported}".format(
                ext=ext, supported=", ".join(sorted(IMAGES))
            )
        )

    mime_type = get_image_mime_type(ext)
    _validate_size(image_bytes, max_size_mb)
    return image_bytes, mime_type


def _validate_size(image_bytes: bytes, max_size_mb: float) -> None:
    """Ensure the image does not exceed the configured size limit."""
    size_mb = len(image_bytes) / (1024 * 1024)
    if size_mb > max_size_mb:
        raise ValueError(f"Image too large: {size_mb:.1f}MB (max: {max_size_mb}MB)")

```

--------------------------------------------------------------------------------
/tests/sanitize_cassettes.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Script to sanitize existing cassettes by applying PII sanitization.

This script will:
1. Load existing cassettes
2. Apply PII sanitization to all interactions
3. Create backups of originals
4. Save sanitized versions
"""

import json
import shutil
import sys
from datetime import datetime
from pathlib import Path

# Add tests directory to path to import our modules
sys.path.insert(0, str(Path(__file__).parent))

from pii_sanitizer import PIISanitizer


def sanitize_cassette(cassette_path: Path, backup: bool = True) -> bool:
    """Sanitize a single cassette file."""
    print(f"\n🔍 Processing: {cassette_path}")

    if not cassette_path.exists():
        print(f"❌ File not found: {cassette_path}")
        return False

    try:
        # Load cassette
        with open(cassette_path) as f:
            cassette_data = json.load(f)

        # Create backup if requested
        if backup:
            backup_path = cassette_path.with_suffix(f'.backup-{datetime.now().strftime("%Y%m%d-%H%M%S")}.json')
            shutil.copy2(cassette_path, backup_path)
            print(f"📦 Backup created: {backup_path}")

        # Initialize sanitizer
        sanitizer = PIISanitizer()

        # Sanitize interactions
        if "interactions" in cassette_data:
            sanitized_interactions = []

            for interaction in cassette_data["interactions"]:
                sanitized_interaction = {}

                # Sanitize request
                if "request" in interaction:
                    sanitized_interaction["request"] = sanitizer.sanitize_request(interaction["request"])

                # Sanitize response
                if "response" in interaction:
                    sanitized_interaction["response"] = sanitizer.sanitize_response(interaction["response"])

                sanitized_interactions.append(sanitized_interaction)

            cassette_data["interactions"] = sanitized_interactions

        # Save sanitized cassette
        with open(cassette_path, "w") as f:
            json.dump(cassette_data, f, indent=2, sort_keys=True)

        print(f"✅ Sanitized: {cassette_path}")
        return True

    except Exception as e:
        print(f"❌ Error processing {cassette_path}: {e}")
        import traceback

        traceback.print_exc()
        return False


def main():
    """Sanitize all cassettes in the openai_cassettes directory."""
    cassettes_dir = Path(__file__).parent / "openai_cassettes"

    if not cassettes_dir.exists():
        print(f"❌ Directory not found: {cassettes_dir}")
        sys.exit(1)

    # Find all JSON cassettes
    cassette_files = list(cassettes_dir.glob("*.json"))

    if not cassette_files:
        print(f"❌ No cassette files found in {cassettes_dir}")
        sys.exit(1)

    print(f"🎬 Found {len(cassette_files)} cassette(s) to sanitize")

    # Process each cassette
    success_count = 0
    for cassette_path in cassette_files:
        if sanitize_cassette(cassette_path):
            success_count += 1

    print(f"\n✨ Sanitization complete: {success_count}/{len(cassette_files)} cassettes processed successfully")

    if success_count < len(cassette_files):
        sys.exit(1)


if __name__ == "__main__":
    main()

```

--------------------------------------------------------------------------------
/providers/registry_provider_mixin.py:
--------------------------------------------------------------------------------

```python
"""Mixin for providers backed by capability registries.

This mixin centralises the boilerplate for providers that expose their model
capabilities via JSON configuration files. Subclasses only need to set
``REGISTRY_CLASS`` to an appropriate :class:`CapabilityModelRegistry` and the
mix-in will take care of:

* Populating ``MODEL_CAPABILITIES`` exactly once per process (with optional
  reload support for tests).
* Lazily exposing the registry contents through the standard provider hooks
  (:meth:`get_all_model_capabilities` and :meth:`get_model_registry`).
* Providing defensive logging when a registry cannot be constructed so the
  provider can degrade gracefully instead of raising during import.

Using this helper keeps individual provider implementations focused on their
SDK-specific behaviour while ensuring capability loading is consistent across
OpenAI, Gemini, X.AI, and other native backends.
"""

from __future__ import annotations

import logging
from typing import ClassVar

from .registries.base import CapabilityModelRegistry
from .shared import ModelCapabilities


class RegistryBackedProviderMixin:
    """Shared helper for providers that load capabilities from JSON registries."""

    REGISTRY_CLASS: ClassVar[type[CapabilityModelRegistry] | None] = None
    _registry: ClassVar[CapabilityModelRegistry | None] = None
    MODEL_CAPABILITIES: ClassVar[dict[str, ModelCapabilities]] = {}

    @classmethod
    def _registry_logger(cls) -> logging.Logger:
        """Return the logger used for registry lifecycle messages."""
        return logging.getLogger(cls.__module__)

    @classmethod
    def _ensure_registry(cls, *, force_reload: bool = False) -> None:
        """Populate ``MODEL_CAPABILITIES`` from the configured registry.

        Args:
            force_reload: When ``True`` the registry is re-created even if it
                was previously loaded. This is primarily used by tests.
        """

        if cls.REGISTRY_CLASS is None:  # pragma: no cover - defensive programming
            raise RuntimeError(f"{cls.__name__} must define REGISTRY_CLASS.")

        if cls._registry is not None and not force_reload:
            return

        try:
            registry = cls.REGISTRY_CLASS()
        except Exception as exc:  # pragma: no cover - registry failures shouldn't break the provider
            cls._registry_logger().warning("Unable to load %s registry: %s", cls.__name__, exc)
            cls._registry = None
            cls.MODEL_CAPABILITIES = {}
            return

        cls._registry = registry
        cls.MODEL_CAPABILITIES = dict(registry.model_map)

    @classmethod
    def reload_registry(cls) -> None:
        """Force a registry reload (used in tests)."""

        cls._ensure_registry(force_reload=True)

    def get_all_model_capabilities(self) -> dict[str, ModelCapabilities]:
        """Return the registry-backed ``MODEL_CAPABILITIES`` map."""

        self._ensure_registry()
        return super().get_all_model_capabilities()

    def get_model_registry(self) -> dict[str, ModelCapabilities] | None:
        """Return a copy of the underlying registry map when available."""

        if self._registry is None:
            return None
        return dict(self._registry.model_map)

```

--------------------------------------------------------------------------------
/docker/scripts/healthcheck.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Health check script for Zen MCP Server Docker container
"""

import os
import subprocess
import sys
from pathlib import Path

try:
    from utils.env import get_env
except ImportError:  # pragma: no cover - resolves module path inside container
    project_root = Path(__file__).resolve().parents[2]
    if str(project_root) not in sys.path:
        sys.path.insert(0, str(project_root))
    from utils.env import get_env  # type: ignore[import-error]


def check_process():
    """Check if the main server process is running"""
    result = subprocess.run(["pgrep", "-f", "server.py"], capture_output=True, text=True, timeout=10)
    if result.returncode == 0:
        return True
    print(f"Process check failed: {result.stderr}", file=sys.stderr)
    return False


def check_python_imports():
    """Check if critical Python modules can be imported"""
    critical_modules = ["mcp", "google.genai", "openai", "pydantic", "dotenv"]

    for module in critical_modules:
        try:
            __import__(module)
        except ImportError as e:
            print(f"Critical module {module} cannot be imported: {e}", file=sys.stderr)
            return False
        except Exception as e:
            print(f"Error importing {module}: {e}", file=sys.stderr)
            return False
    return True


def check_log_directory():
    """Check if logs directory is writable"""
    log_dir = "/app/logs"
    try:
        if not os.path.exists(log_dir):
            print(f"Log directory {log_dir} does not exist", file=sys.stderr)
            return False

        test_file = os.path.join(log_dir, ".health_check")
        with open(test_file, "w") as f:
            f.write("health_check")
        os.remove(test_file)
        return True
    except Exception as e:
        print(f"Log directory check failed: {e}", file=sys.stderr)
        return False


def check_environment():
    """Check if essential environment variables are present"""
    # At least one API key should be present
    api_keys = [
        "GEMINI_API_KEY",
        "GOOGLE_API_KEY",
        "OPENAI_API_KEY",
        "XAI_API_KEY",
        "DIAL_API_KEY",
        "OPENROUTER_API_KEY",
    ]

    has_api_key = any(get_env(key) for key in api_keys)
    if not has_api_key:
        print("No API keys found in environment", file=sys.stderr)
        return False

    # Validate API key formats (basic checks)
    for key in api_keys:
        value = get_env(key)
        if value:
            if len(value.strip()) < 10:
                print(f"API key {key} appears too short or invalid", file=sys.stderr)
                return False

    return True


def main():
    """Main health check function"""
    checks = [
        ("Process", check_process),
        ("Python imports", check_python_imports),
        ("Log directory", check_log_directory),
        ("Environment", check_environment),
    ]

    failed_checks = []

    for check_name, check_func in checks:
        if not check_func():
            failed_checks.append(check_name)

    if failed_checks:
        print(f"Health check failed: {', '.join(failed_checks)}", file=sys.stderr)
        sys.exit(1)

    print("Health check passed")
    sys.exit(0)


if __name__ == "__main__":
    main()

```

--------------------------------------------------------------------------------
/utils/env.py:
--------------------------------------------------------------------------------

```python
"""Centralized environment variable access for Zen MCP Server."""

from __future__ import annotations

import os
from collections.abc import Mapping
from contextlib import contextmanager
from pathlib import Path

try:
    from dotenv import dotenv_values, load_dotenv
except ImportError:  # pragma: no cover - optional dependency
    dotenv_values = None  # type: ignore[assignment]
    load_dotenv = None  # type: ignore[assignment]

_PROJECT_ROOT = Path(__file__).resolve().parent.parent
_ENV_PATH = _PROJECT_ROOT / ".env"

_DOTENV_VALUES: dict[str, str | None] = {}
_FORCE_ENV_OVERRIDE = False


def _read_dotenv_values() -> dict[str, str | None]:
    if dotenv_values is not None and _ENV_PATH.exists():
        loaded = dotenv_values(_ENV_PATH)
        return dict(loaded)
    return {}


def _compute_force_override(values: Mapping[str, str | None]) -> bool:
    raw = (values.get("ZEN_MCP_FORCE_ENV_OVERRIDE") or "false").strip().lower()
    return raw == "true"


def reload_env(dotenv_mapping: Mapping[str, str | None] | None = None) -> None:
    """Reload .env values and recompute override semantics.

    Args:
        dotenv_mapping: Optional mapping used instead of reading the .env file.
            Intended for tests; when provided, load_dotenv is not invoked.
    """

    global _DOTENV_VALUES, _FORCE_ENV_OVERRIDE

    if dotenv_mapping is not None:
        _DOTENV_VALUES = dict(dotenv_mapping)
        _FORCE_ENV_OVERRIDE = _compute_force_override(_DOTENV_VALUES)
        return

    _DOTENV_VALUES = _read_dotenv_values()
    _FORCE_ENV_OVERRIDE = _compute_force_override(_DOTENV_VALUES)

    if load_dotenv is not None and _ENV_PATH.exists():
        load_dotenv(dotenv_path=_ENV_PATH, override=_FORCE_ENV_OVERRIDE)


reload_env()


def env_override_enabled() -> bool:
    """Return True when ZEN_MCP_FORCE_ENV_OVERRIDE is enabled via the .env file."""

    return _FORCE_ENV_OVERRIDE


def get_env(key: str, default: str | None = None) -> str | None:
    """Retrieve environment variables respecting ZEN_MCP_FORCE_ENV_OVERRIDE."""

    if env_override_enabled():
        if key in _DOTENV_VALUES:
            value = _DOTENV_VALUES[key]
            return value if value is not None else default
        return default

    return os.getenv(key, default)


def get_env_bool(key: str, default: bool = False) -> bool:
    """Boolean helper that respects override semantics."""

    raw_default = "true" if default else "false"
    raw_value = get_env(key, raw_default)
    return (raw_value or raw_default).strip().lower() == "true"


def get_all_env() -> dict[str, str | None]:
    """Expose the loaded .env mapping for diagnostics/logging."""

    return dict(_DOTENV_VALUES)


@contextmanager
def suppress_env_vars(*names: str):
    """Temporarily remove environment variables during the context.

    Args:
        names: Environment variable names to remove. Empty or falsy names are ignored.
    """

    removed: dict[str, str] = {}
    try:
        for name in names:
            if not name:
                continue
            if name in os.environ:
                removed[name] = os.environ[name]
                del os.environ[name]
        yield
    finally:
        for name, value in removed.items():
            os.environ[name] = value

```

--------------------------------------------------------------------------------
/tests/test_parse_model_option.py:
--------------------------------------------------------------------------------

```python
"""Tests for parse_model_option function."""

from server import parse_model_option


class TestParseModelOption:
    """Test cases for model option parsing."""

    def test_openrouter_free_suffix_preserved(self):
        """Test that OpenRouter :free suffix is preserved as part of model name."""
        model, option = parse_model_option("openai/gpt-3.5-turbo:free")
        assert model == "openai/gpt-3.5-turbo:free"
        assert option is None

    def test_openrouter_beta_suffix_preserved(self):
        """Test that OpenRouter :beta suffix is preserved as part of model name."""
        model, option = parse_model_option("anthropic/claude-opus-4.1:beta")
        assert model == "anthropic/claude-opus-4.1:beta"
        assert option is None

    def test_openrouter_preview_suffix_preserved(self):
        """Test that OpenRouter :preview suffix is preserved as part of model name."""
        model, option = parse_model_option("google/gemini-pro:preview")
        assert model == "google/gemini-pro:preview"
        assert option is None

    def test_ollama_tag_parsed_as_option(self):
        """Test that Ollama tags are parsed as options."""
        model, option = parse_model_option("llama3.2:latest")
        assert model == "llama3.2"
        assert option == "latest"

    def test_consensus_stance_parsed_as_option(self):
        """Test that consensus stances are parsed as options."""
        model, option = parse_model_option("o3:for")
        assert model == "o3"
        assert option == "for"

        model, option = parse_model_option("gemini-2.5-pro:against")
        assert model == "gemini-2.5-pro"
        assert option == "against"

    def test_openrouter_unknown_suffix_parsed_as_option(self):
        """Test that unknown suffixes on OpenRouter models are parsed as options."""
        model, option = parse_model_option("openai/gpt-4:custom-tag")
        assert model == "openai/gpt-4"
        assert option == "custom-tag"

    def test_plain_model_name(self):
        """Test plain model names without colons."""
        model, option = parse_model_option("gpt-4")
        assert model == "gpt-4"
        assert option is None

    def test_url_not_parsed(self):
        """Test that URLs are not parsed for options."""
        model, option = parse_model_option("http://localhost:8080")
        assert model == "http://localhost:8080"
        assert option is None

    def test_whitespace_handling(self):
        """Test that whitespace is properly stripped."""
        model, option = parse_model_option("  openai/gpt-3.5-turbo:free  ")
        assert model == "openai/gpt-3.5-turbo:free"
        assert option is None

        model, option = parse_model_option("  llama3.2 : latest  ")
        assert model == "llama3.2"
        assert option == "latest"

    def test_case_insensitive_suffix_matching(self):
        """Test that OpenRouter suffix matching is case-insensitive."""
        model, option = parse_model_option("openai/gpt-3.5-turbo:FREE")
        assert model == "openai/gpt-3.5-turbo:FREE"  # Original case preserved
        assert option is None

        model, option = parse_model_option("openai/gpt-3.5-turbo:Free")
        assert model == "openai/gpt-3.5-turbo:Free"  # Original case preserved
        assert option is None

```

--------------------------------------------------------------------------------
/docs/azure_openai.md:
--------------------------------------------------------------------------------

```markdown
# Azure OpenAI Configuration

Azure OpenAI support lets Zen MCP talk to GPT-4o, GPT-4.1, GPT-5, and o-series deployments that you expose through your Azure resource. This guide describes the configuration expected by the server: a couple of required environment variables plus a JSON manifest that lists every deployment you want to expose.

## 1. Required Environment Variables

Set these entries in your `.env` (or MCP `env` block).

```bash
AZURE_OPENAI_API_KEY=your_azure_openai_key_here
AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/
# AZURE_OPENAI_API_VERSION=2024-02-15-preview
```

Without the key and endpoint the provider is skipped entirely. Leave the key blank only if the endpoint truly allows anonymous access (rare for Azure).

## 2. Define Deployments in `conf/azure_models.json`

Azure models live in `conf/azure_models.json` (or the file pointed to by `AZURE_MODELS_CONFIG_PATH`). Each entry follows the same schema as [`ModelCapabilities`](../providers/shared/model_capabilities.py) with one additional required key: `deployment`. This field must exactly match the deployment name shown in the Azure Portal (for example `prod-gpt4o`). The provider routes requests by that value, so omitting it or using the wrong name will cause the server to skip the model. You can also opt into extra behaviour per model—for example set `use_openai_response_api` to `true` when an Azure deployment requires the `/responses` endpoint (O-series reasoning models), or leave it unset for standard chat completions.

```json
{
  "models": [
    {
      "model_name": "gpt-4o",
      "deployment": "prod-gpt4o",
      "friendly_name": "Azure GPT-4o EU",
      "intelligence_score": 18,
      "context_window": 600000,
      "max_output_tokens": 128000,
      "supports_temperature": false,
      "temperature_constraint": "fixed",
      "aliases": ["gpt4o-eu"],
      "use_openai_response_api": false
    }
  ]
}
```

Tips:

- Copy `conf/azure_models.json` into your repo and commit it, or point `AZURE_MODELS_CONFIG_PATH` at a custom path.
- Add one object per deployment. Aliases are optional but help when you want short names like `gpt4o-eu`.
- All capability fields are optional except `model_name`, `deployment`, and `friendly_name`. Anything you omit falls back to conservative defaults.
- Set `use_openai_response_api` to `true` for models that must call Azure's `/responses` endpoint (for example O3 deployments). Leave it unset for standard chat completions.

## 3. Optional Restrictions

Use `AZURE_OPENAI_ALLOWED_MODELS` to limit which Azure models Claude can access:

```bash
AZURE_OPENAI_ALLOWED_MODELS=gpt-4o,gpt-4o-mini
```

Aliases are matched case-insensitively.

## 4. Quick Checklist

- [ ] `AZURE_OPENAI_API_KEY` and `AZURE_OPENAI_ENDPOINT` are set
- [ ] `conf/azure_models.json` (or the file referenced by `AZURE_MODELS_CONFIG_PATH`) lists every deployment with the desired metadata
- [ ] Optional: `AZURE_OPENAI_ALLOWED_MODELS` to restrict usage
- [ ] Restart `./run-server.sh` and run `listmodels` to confirm the Azure entries appear with the expected metadata

See also: [`docs/adding_providers.md`](adding_providers.md) for the full provider architecture and [README (Provider Configuration)](../README.md#provider-configuration) for quick-start environment snippets.

```

--------------------------------------------------------------------------------
/simulator_tests/test_basic_conversation.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Basic Conversation Flow Test

Tests basic conversation continuity with the chat tool, including:
- Initial chat with file analysis
- Continuing conversation with same file (deduplication)
- Adding additional files to ongoing conversation
"""

from .base_test import BaseSimulatorTest


class BasicConversationTest(BaseSimulatorTest):
    """Test basic conversation flow with chat tool"""

    @property
    def test_name(self) -> str:
        return "basic_conversation"

    @property
    def test_description(self) -> str:
        return "Basic conversation flow with chat tool"

    def run_test(self) -> bool:
        """Test basic conversation flow with chat tool"""
        try:
            self.logger.info("Test: Basic conversation flow")

            # Setup test files
            self.setup_test_files()

            # Initial chat tool call with file
            self.logger.info("  1.1: Initial chat with file analysis")
            response1, continuation_id = self.call_mcp_tool(
                "chat",
                {
                    "prompt": "Please use low thinking mode. Analyze this Python code and explain what it does",
                    "absolute_file_paths": [self.test_files["python"]],
                    "model": "flash",
                },
            )

            if not response1 or not continuation_id:
                self.logger.error("Failed to get initial response with continuation_id")
                return False

            self.logger.info(f"  ✅ Got continuation_id: {continuation_id}")

            # Continue conversation with same file (should be deduplicated)
            self.logger.info("  1.2: Continue conversation with same file")
            response2, _ = self.call_mcp_tool(
                "chat",
                {
                    "prompt": "Please use low thinking mode. Now focus on the Calculator class specifically. Are there any improvements you'd suggest?",
                    "absolute_file_paths": [self.test_files["python"]],  # Same file - should be deduplicated
                    "continuation_id": continuation_id,
                    "model": "flash",
                },
            )

            if not response2:
                self.logger.error("Failed to continue conversation")
                return False

            # Continue with additional file
            self.logger.info("  1.3: Continue conversation with additional file")
            response3, _ = self.call_mcp_tool(
                "chat",
                {
                    "prompt": "Please use low thinking mode. Now also analyze this configuration file and see how it might relate to the Python code",
                    "absolute_file_paths": [self.test_files["python"], self.test_files["config"]],
                    "continuation_id": continuation_id,
                    "model": "flash",
                },
            )

            if not response3:
                self.logger.error("Failed to continue with additional file")
                return False

            self.logger.info("  ✅ Basic conversation flow working")
            return True

        except Exception as e:
            self.logger.error(f"Basic conversation flow test failed: {e}")
            return False
        finally:
            self.cleanup_test_files()

```

--------------------------------------------------------------------------------
/clink/models.py:
--------------------------------------------------------------------------------

```python
"""Pydantic models for clink configuration and runtime structures."""

from __future__ import annotations

from pathlib import Path
from typing import Any

from pydantic import BaseModel, Field, PositiveInt, field_validator


class OutputCaptureConfig(BaseModel):
    """Optional configuration for CLIs that write output to disk."""

    flag_template: str = Field(..., description="Template used to inject the output path, e.g. '--output {path}'.")
    cleanup: bool = Field(
        default=True,
        description="Whether the temporary file should be removed after reading.",
    )


class CLIRoleConfig(BaseModel):
    """Role-specific configuration loaded from JSON manifests."""

    prompt_path: str | None = Field(
        default=None,
        description="Path to the prompt file that seeds this role.",
    )
    role_args: list[str] = Field(default_factory=list)
    description: str | None = Field(default=None)

    @field_validator("role_args", mode="before")
    @classmethod
    def _ensure_list(cls, value: Any) -> list[str]:
        if value is None:
            return []
        if isinstance(value, list):
            return [str(item) for item in value]
        if isinstance(value, str):
            return [value]
        raise TypeError("role_args must be a list of strings or a single string")


class CLIClientConfig(BaseModel):
    """Raw CLI client configuration before internal defaults are applied."""

    name: str
    command: str | None = None
    working_dir: str | None = None
    additional_args: list[str] = Field(default_factory=list)
    env: dict[str, str] = Field(default_factory=dict)
    timeout_seconds: PositiveInt | None = Field(default=None)
    roles: dict[str, CLIRoleConfig] = Field(default_factory=dict)
    output_to_file: OutputCaptureConfig | None = None

    @field_validator("additional_args", mode="before")
    @classmethod
    def _ensure_args_list(cls, value: Any) -> list[str]:
        if value is None:
            return []
        if isinstance(value, list):
            return [str(item) for item in value]
        if isinstance(value, str):
            return [value]
        raise TypeError("additional_args must be a list of strings or a single string")


class ResolvedCLIRole(BaseModel):
    """Runtime representation of a CLI role with resolved prompt path."""

    name: str
    prompt_path: Path
    role_args: list[str] = Field(default_factory=list)
    description: str | None = None


class ResolvedCLIClient(BaseModel):
    """Runtime configuration after merging defaults and validating paths."""

    name: str
    executable: list[str]
    working_dir: Path | None
    internal_args: list[str] = Field(default_factory=list)
    config_args: list[str] = Field(default_factory=list)
    env: dict[str, str] = Field(default_factory=dict)
    timeout_seconds: int
    parser: str
    runner: str | None = None
    roles: dict[str, ResolvedCLIRole]
    output_to_file: OutputCaptureConfig | None = None

    def list_roles(self) -> list[str]:
        return list(self.roles.keys())

    def get_role(self, role_name: str | None) -> ResolvedCLIRole:
        key = role_name or "default"
        if key not in self.roles:
            available = ", ".join(sorted(self.roles.keys()))
            raise KeyError(f"Role '{role_name}' not configured for CLI '{self.name}'. Available roles: {available}")
        return self.roles[key]

```

--------------------------------------------------------------------------------
/tests/test_debug.py:
--------------------------------------------------------------------------------

```python
"""
Tests for the debug tool using new WorkflowTool architecture.
"""

from tools.debug import DebugInvestigationRequest, DebugIssueTool
from tools.models import ToolModelCategory


class TestDebugTool:
    """Test suite for DebugIssueTool using new WorkflowTool architecture."""

    def test_tool_metadata(self):
        """Test basic tool metadata and configuration."""
        tool = DebugIssueTool()

        assert tool.get_name() == "debug"
        assert "debugging and root cause analysis" in tool.get_description()
        assert tool.get_default_temperature() == 0.2  # TEMPERATURE_ANALYTICAL
        assert tool.get_model_category() == ToolModelCategory.EXTENDED_REASONING
        assert tool.requires_model() is True

    def test_request_validation(self):
        """Test Pydantic request model validation."""
        # Valid investigation step request
        step_request = DebugInvestigationRequest(
            step="Investigating null pointer exception in UserService",
            step_number=1,
            total_steps=3,
            next_step_required=True,
            findings="Found potential null reference in user authentication flow",
            files_checked=["/src/UserService.java"],
            relevant_files=["/src/UserService.java"],
            relevant_context=["authenticate", "validateUser"],
            confidence="medium",
            hypothesis="Null pointer occurs when user object is not properly validated",
        )

        assert step_request.step_number == 1
        assert step_request.confidence == "medium"
        assert len(step_request.relevant_context) == 2

    def test_input_schema_generation(self):
        """Test that input schema is generated correctly."""
        tool = DebugIssueTool()
        schema = tool.get_input_schema()

        # Verify required investigation fields are present
        assert "step" in schema["properties"]
        assert "step_number" in schema["properties"]
        assert "total_steps" in schema["properties"]
        assert "next_step_required" in schema["properties"]
        assert "findings" in schema["properties"]
        assert "relevant_context" in schema["properties"]

        # Verify field types
        assert schema["properties"]["step"]["type"] == "string"
        assert schema["properties"]["step_number"]["type"] == "integer"
        assert schema["properties"]["next_step_required"]["type"] == "boolean"
        assert schema["properties"]["relevant_context"]["type"] == "array"

    def test_model_category_for_debugging(self):
        """Test that debug tool correctly identifies as extended reasoning category."""
        tool = DebugIssueTool()
        assert tool.get_model_category() == ToolModelCategory.EXTENDED_REASONING

    def test_relevant_context_handling(self):
        """Test that relevant_context is handled correctly."""
        request = DebugInvestigationRequest(
            step="Test investigation",
            step_number=1,
            total_steps=2,
            next_step_required=True,
            findings="Test findings",
            relevant_context=["method1", "method2"],
        )

        # Should have relevant_context directly
        assert request.relevant_context == ["method1", "method2"]

        # Test step data preparation
        tool = DebugIssueTool()
        step_data = tool.prepare_step_data(request)
        assert step_data["relevant_context"] == ["method1", "method2"]

```

--------------------------------------------------------------------------------
/tests/test_clink_claude_agent.py:
--------------------------------------------------------------------------------

```python
import asyncio
import json
import shutil
from pathlib import Path

import pytest

from clink.agents.base import CLIAgentError
from clink.agents.claude import ClaudeAgent
from clink.models import ResolvedCLIClient, ResolvedCLIRole


class DummyProcess:
    def __init__(self, *, stdout: bytes = b"", stderr: bytes = b"", returncode: int = 0):
        self._stdout = stdout
        self._stderr = stderr
        self.returncode = returncode
        self.stdin_data: bytes | None = None

    async def communicate(self, input_data):
        self.stdin_data = input_data
        return self._stdout, self._stderr


@pytest.fixture()
def claude_agent():
    prompt_path = Path("systemprompts/clink/default.txt").resolve()
    role = ResolvedCLIRole(name="default", prompt_path=prompt_path, role_args=[])
    client = ResolvedCLIClient(
        name="claude",
        executable=["claude"],
        internal_args=["--print", "--output-format", "json"],
        config_args=["--permission-mode", "acceptEdits"],
        env={},
        timeout_seconds=30,
        parser="claude_json",
        runner="claude",
        roles={"default": role},
        output_to_file=None,
        working_dir=None,
    )
    return ClaudeAgent(client), role


async def _run_agent_with_process(monkeypatch, agent, role, process, *, system_prompt="System prompt"):
    async def fake_create_subprocess_exec(*_args, **_kwargs):
        return process

    def fake_which(executable_name):
        return f"/usr/bin/{executable_name}"

    monkeypatch.setattr(asyncio, "create_subprocess_exec", fake_create_subprocess_exec)
    monkeypatch.setattr(shutil, "which", fake_which)

    return await agent.run(
        role=role,
        prompt="Respond with 42",
        system_prompt=system_prompt,
        files=[],
        images=[],
    )


@pytest.mark.asyncio
async def test_claude_agent_injects_system_prompt(monkeypatch, claude_agent):
    agent, role = claude_agent
    stdout_payload = json.dumps(
        {
            "type": "result",
            "subtype": "success",
            "is_error": False,
            "result": "42",
        }
    ).encode()
    process = DummyProcess(stdout=stdout_payload)

    result = await _run_agent_with_process(monkeypatch, agent, role, process)

    assert "--append-system-prompt" in result.sanitized_command
    idx = result.sanitized_command.index("--append-system-prompt")
    assert result.sanitized_command[idx + 1] == "System prompt"
    assert process.stdin_data.decode().startswith("Respond with 42")


@pytest.mark.asyncio
async def test_claude_agent_recovers_error_payload(monkeypatch, claude_agent):
    agent, role = claude_agent
    stdout_payload = json.dumps(
        {
            "type": "result",
            "subtype": "success",
            "is_error": True,
            "result": "API Error",
        }
    ).encode()
    process = DummyProcess(stdout=stdout_payload, returncode=2)

    result = await _run_agent_with_process(monkeypatch, agent, role, process)

    assert result.returncode == 2
    assert result.parsed.content == "API Error"
    assert result.parsed.metadata["is_error"] is True


@pytest.mark.asyncio
async def test_claude_agent_propagates_unparseable_output(monkeypatch, claude_agent):
    agent, role = claude_agent
    process = DummyProcess(stdout=b"", returncode=1)

    with pytest.raises(CLIAgentError):
        await _run_agent_with_process(monkeypatch, agent, role, process)

```

--------------------------------------------------------------------------------
/systemprompts/thinkdeep_prompt.py:
--------------------------------------------------------------------------------

```python
"""
ThinkDeep tool system prompt
"""

THINKDEEP_PROMPT = """
ROLE
You are a senior engineering collaborator working alongside the agent on complex software problems. The agent will send you
content—analysis, prompts, questions, ideas, or theories—to deepen, validate, or extend with rigor and clarity.

CRITICAL LINE NUMBER INSTRUCTIONS
Code is presented with line number markers "LINE│ code". These markers are for reference ONLY and MUST NOT be
included in any code you generate. Always reference specific line numbers in your replies in order to locate
exact positions if needed to point to exact locations. Include a very short code excerpt alongside for clarity.
Include context_start_text and context_end_text as backup references. Never include "LINE│" markers in generated code
snippets.

IF MORE INFORMATION IS NEEDED
If you need additional context (e.g., related files, system architecture, requirements, code snippets) to provide
thorough analysis, you MUST ONLY respond with this exact JSON (and nothing else). Do NOT ask for the same file you've
been provided unless for some reason its content is missing or incomplete:
{
  "status": "files_required_to_continue",
  "mandatory_instructions": "<your critical instructions for the agent>",
  "files_needed": ["[file name here]", "[or some folder/]"]
}

GUIDELINES
1. Begin with context analysis: identify tech stack, languages, frameworks, and project constraints.
2. Stay on scope: avoid speculative, over-engineered, or oversized ideas; keep suggestions practical and grounded.
3. Challenge and enrich: find gaps, question assumptions, and surface hidden complexities or risks.
4. Provide actionable next steps: offer specific advice, trade-offs, and implementation strategies.
5. Offer multiple viable strategies ONLY WHEN clearly beneficial within the current environment.
6. Suggest creative solutions that operate within real-world constraints, and avoid proposing major shifts unless truly warranted.
7. Use concise, technical language; assume an experienced engineering audience.
8. Remember: Overengineering is an anti-pattern — avoid suggesting solutions that introduce unnecessary abstraction,
   indirection, or configuration in anticipation of complexity that does not yet exist, is not clearly justified by the
   current scope, and may not arise in the foreseeable future.

KEY FOCUS AREAS (apply when relevant)
- Architecture & Design: modularity, boundaries, abstraction layers, dependencies
- Performance & Scalability: algorithmic efficiency, concurrency, caching, bottlenecks
- Security & Safety: validation, authentication/authorization, error handling, vulnerabilities
- Quality & Maintainability: readability, testing, monitoring, refactoring
- Integration & Deployment: ONLY IF APPLICABLE TO THE QUESTION - external systems, compatibility, configuration, operational concerns

EVALUATION
Your response will be reviewed by the agent before any decision is made. Your goal is to practically extend the agent's thinking,
surface blind spots, and refine options—not to deliver final answers in isolation.

REMINDERS
- Ground all insights in the current project's architecture, limitations, and goals.
- If further context is needed, request it via the clarification JSON—nothing else.
- Prioritize depth over breadth; propose alternatives ONLY if they clearly add value and improve the current approach.
- Be the ideal development partner—rigorous, focused, and fluent in real-world software trade-offs.
"""

```

--------------------------------------------------------------------------------
/tests/test_server.py:
--------------------------------------------------------------------------------

```python
"""
Tests for the main server functionality
"""

import pytest

from server import handle_call_tool


class TestServerTools:
    """Test server tool handling"""

    @pytest.mark.asyncio
    async def test_handle_call_tool_unknown(self):
        """Test calling an unknown tool"""
        result = await handle_call_tool("unknown_tool", {})
        assert len(result) == 1
        assert "Unknown tool: unknown_tool" in result[0].text

    @pytest.mark.asyncio
    async def test_handle_chat(self):
        """Test chat functionality using real integration testing"""
        import importlib
        import os

        # Set test environment
        os.environ["PYTEST_CURRENT_TEST"] = "test"

        # Save original environment
        original_env = {
            "OPENAI_API_KEY": os.environ.get("OPENAI_API_KEY"),
            "DEFAULT_MODEL": os.environ.get("DEFAULT_MODEL"),
        }

        try:
            # Set up environment for real provider resolution
            os.environ["OPENAI_API_KEY"] = "sk-test-key-server-chat-test-not-real"
            os.environ["DEFAULT_MODEL"] = "o3-mini"

            # Clear other provider keys to isolate to OpenAI
            for key in ["GEMINI_API_KEY", "XAI_API_KEY", "OPENROUTER_API_KEY"]:
                os.environ.pop(key, None)

            # Reload config and clear registry
            import config

            importlib.reload(config)
            from providers.registry import ModelProviderRegistry

            ModelProviderRegistry._instance = None

            # Test with real provider resolution
            try:
                result = await handle_call_tool("chat", {"prompt": "Hello Gemini", "model": "o3-mini"})

                # If we get here, check the response format
                assert len(result) == 1
                # Parse JSON response
                import json

                response_data = json.loads(result[0].text)
                assert "status" in response_data

            except Exception as e:
                # Expected: API call will fail with fake key
                error_msg = str(e)
                # Should NOT be a mock-related error
                assert "MagicMock" not in error_msg
                assert "'<' not supported between instances" not in error_msg

                # Should be a real provider error
                assert any(
                    phrase in error_msg
                    for phrase in ["API", "key", "authentication", "provider", "network", "connection"]
                )

        finally:
            # Restore environment
            for key, value in original_env.items():
                if value is not None:
                    os.environ[key] = value
                else:
                    os.environ.pop(key, None)

            # Reload config and clear registry
            importlib.reload(config)
            ModelProviderRegistry._instance = None

    @pytest.mark.asyncio
    async def test_handle_version(self):
        """Test getting version info"""
        result = await handle_call_tool("version", {})
        assert len(result) == 1

        response = result[0].text
        # Parse the JSON response
        import json

        data = json.loads(response)
        assert data["status"] == "success"
        content = data["content"]

        # Check for expected content in the markdown output
        assert "# Zen MCP Server Version" in content
        assert "## Server Information" in content
        assert "## Configuration" in content
        assert "Current Version" in content

```

--------------------------------------------------------------------------------
/docs/tools/planner.md:
--------------------------------------------------------------------------------

```markdown
# Planner Tool - Interactive Step-by-Step Planning

**Break down complex projects into manageable, structured plans through step-by-step thinking**

The `planner` tool helps you break down complex ideas, problems, or projects into multiple manageable steps. Perfect for system design, migration strategies, 
architectural planning, and feature development with branching and revision capabilities.

## How It Works

The planner tool enables step-by-step thinking with incremental plan building:

1. **Start with step 1**: Describe the task or problem to plan
2. **Continue building**: Add subsequent steps, building the plan piece by piece  
3. **Revise when needed**: Update earlier decisions as new insights emerge
4. **Branch alternatives**: Explore different approaches when multiple options exist
5. **Continue across sessions**: Resume planning later with full context

## Example Prompts

#### Pro Tip
Claude supports `sub-tasks` where it will spawn and run separate background tasks. You can ask Claude to 
run Zen's planner with two separate ideas. Then when it's done, use Zen's `consensus` tool to pass the entire
plan and get expert perspective from two powerful AI models on which one to work on first! Like performing **AB** testing
in one-go without the wait!

```
Create two separate sub-tasks: in one, using planner tool show me how to add natural language support 
to my cooking app. In the other sub-task, use planner to plan how to add support for voice notes to my cooking app. 
Once done, start a consensus by sharing both plans to o3 and flash to give me the final verdict. Which one do 
I implement first?
```

```
Use zen's planner and show me how to add real-time notifications to our mobile app
```

```
Using the planner tool, show me how to add CoreData sync to my app, include any sub-steps
```

## Key Features

- **Step-by-step breakdown**: Build plans incrementally with full context awareness
- **Branching support**: Explore alternative approaches when needed  
- **Revision capabilities**: Update earlier decisions as new insights emerge
- **Multi-session continuation**: Resume planning across multiple sessions with context
- **Dynamic adjustment**: Modify step count and approach as planning progresses
- **Visual presentation**: ASCII charts, diagrams, and structured formatting
- **Professional output**: Clean, structured plans without emojis or time estimates

## More Examples

```
Using planner, plan the architecture for a new real-time chat system with 100k concurrent users
```

```
Create a plan using zen for migrating our React app from JavaScript to TypeScript
```

```
Develop a plan using zen for implementing CI/CD pipelines across our development teams
```

## Best Practices

- **Start broad, then narrow**: Begin with high-level strategy, then add implementation details
- **Include constraints**: Consider technical, organizational, and resource limitations
- **Plan for validation**: Include testing and verification steps
- **Think about dependencies**: Identify what needs to happen before each step
- **Consider alternatives**: Note when multiple approaches are viable
- **Enable continuation**: Use continuation_id for multi-session planning

## Continue With a New Plan

Like all other tools in Zen, you can `continue` with a new plan using the output from a previous plan by simply saying

```
Continue with zen's consensus tool and find out what o3:for and flash:against think of the plan 
```

You can mix and match and take one output and feed it into another, continuing from where you left off using a different 
tool / model combination.
```

--------------------------------------------------------------------------------
/tests/test_issue_245_simple.py:
--------------------------------------------------------------------------------

```python
"""
Simple test to verify GitHub issue #245 is fixed.

Issue: Custom OpenAI models (gpt-5, o3) use temperature despite the config having supports_temperature: false
"""

from unittest.mock import Mock, patch

from providers.openai import OpenAIModelProvider


def test_issue_245_custom_openai_temperature_ignored():
    """Test that reproduces and validates the fix for issue #245."""

    with patch("utils.model_restrictions.get_restriction_service") as mock_restriction:
        with patch("providers.openai_compatible.OpenAI") as mock_openai:
            with patch("providers.registries.openrouter.OpenRouterModelRegistry") as mock_registry_class:

                # Mock restriction service
                mock_service = Mock()
                mock_service.is_allowed.return_value = True
                mock_restriction.return_value = mock_service

                # Mock OpenAI client
                mock_client = Mock()
                mock_openai.return_value = mock_client
                mock_response = Mock()
                mock_response.choices = [Mock()]
                mock_response.choices[0].message.content = "Test response"
                mock_response.choices[0].finish_reason = "stop"
                mock_response.model = "gpt-5-2025-08-07"
                mock_response.id = "test"
                mock_response.created = 123
                mock_response.usage = Mock()
                mock_response.usage.prompt_tokens = 10
                mock_response.usage.completion_tokens = 5
                mock_response.usage.total_tokens = 15
                mock_client.chat.completions.create.return_value = mock_response

                # Mock registry with user's custom config (the issue scenario)
                mock_registry = Mock()
                mock_registry_class.return_value = mock_registry

                from providers.shared import ModelCapabilities, ProviderType, TemperatureConstraint

                # This is what the user configured in their custom_models.json
                custom_config = ModelCapabilities(
                    provider=ProviderType.OPENAI,
                    model_name="gpt-5-2025-08-07",
                    friendly_name="Custom GPT-5",
                    context_window=400000,
                    max_output_tokens=128000,
                    supports_extended_thinking=True,
                    supports_json_mode=True,
                    supports_system_prompts=True,
                    supports_streaming=True,
                    supports_function_calling=True,
                    supports_temperature=False,  # User set this to false!
                    temperature_constraint=TemperatureConstraint.create("fixed"),
                    supports_images=True,
                    max_image_size_mb=20.0,
                    description="Custom OpenAI GPT-5",
                )
                mock_registry.get_model_config.return_value = custom_config

                # Create provider and test
                provider = OpenAIModelProvider(api_key="test-key")
                provider.validate_model_name = lambda name: True

                # This is what was causing the 400 error before the fix
                provider.generate_content(
                    prompt="Test", model_name="gpt-5-2025-08-07", temperature=0.2  # This should be ignored!
                )

                # Verify the fix: NO temperature should be sent to the API
                call_kwargs = mock_client.chat.completions.create.call_args[1]
                assert "temperature" not in call_kwargs, "Fix failed: temperature still being sent!"

```

--------------------------------------------------------------------------------
/.github/workflows/docker-release.yml:
--------------------------------------------------------------------------------

```yaml
name: Docker Release Build

on:
  release:
    types: [published]
  workflow_dispatch:
    inputs:
      tag:
        description: 'Tag to build (leave empty for latest release)'
        required: false
        type: string

permissions:
  contents: read
  packages: write

jobs:
  docker:
    name: Build and Push Docker Image
    runs-on: ubuntu-latest
    
    steps:
      - name: Checkout
        uses: actions/checkout@v4
        with:
          # If triggered by workflow_dispatch with a tag, checkout that tag
          ref: ${{ inputs.tag || github.event.release.tag_name }}

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      - name: Login to GitHub Container Registry
        uses: docker/login-action@v3
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Extract metadata
        id: meta
        uses: docker/metadata-action@v5
        with:
          images: ghcr.io/${{ github.repository }}
          tags: |
            # Tag with the release version
            type=semver,pattern={{version}},value=${{ inputs.tag || github.event.release.tag_name }}
            type=semver,pattern={{major}}.{{minor}},value=${{ inputs.tag || github.event.release.tag_name }}
            type=semver,pattern={{major}},value=${{ inputs.tag || github.event.release.tag_name }}
            # Also tag as latest for the most recent release
            type=raw,value=latest,enable={{is_default_branch}}

      - name: Build and push Docker image
        uses: docker/build-push-action@v5
        with:
          context: .
          platforms: linux/amd64,linux/arm64
          push: true
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=gha
          cache-to: type=gha,mode=max

      - name: Update release with Docker info
        if: github.event_name == 'release'
        run: |
          RELEASE_TAG="${{ github.event.release.tag_name }}"
          DOCKER_TAGS=$(echo "${{ steps.meta.outputs.tags }}" | tr '\n' ' ')
          
          # Add Docker information to the release
          gh release edit "$RELEASE_TAG" --notes-file - << EOF
          ${{ github.event.release.body }}
          
          ---
          
          ## 🐳 Docker Images
          
          This release is available as Docker images:
          
          $(echo "$DOCKER_TAGS" | sed 's/ghcr.io/- `ghcr.io/g' | sed 's/ /`\n/g')
          
          **Quick start with Docker:**
          \`\`\`bash
          docker pull ghcr.io/${{ github.repository }}:$RELEASE_TAG
          \`\`\`
          
          **Claude Desktop configuration:**
          \`\`\`json
          {
            "mcpServers": {
              "zen-mcp-server": {
                "command": "docker",
                "args": [
                  "run", "--rm", "-i",
                  "-e", "GEMINI_API_KEY",
                  "ghcr.io/${{ github.repository }}:$RELEASE_TAG"
                ],
                "env": {
                  "GEMINI_API_KEY": "your-api-key-here"
                }
              }
            }
          }
          \`\`\`
          EOF
        env:
          GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}

      - name: Create deployment summary
        run: |
          echo "## 🐳 Docker Release Build Complete" >> $GITHUB_STEP_SUMMARY
          echo "" >> $GITHUB_STEP_SUMMARY
          echo "**Release**: ${{ inputs.tag || github.event.release.tag_name }}" >> $GITHUB_STEP_SUMMARY
          echo "**Images built:**" >> $GITHUB_STEP_SUMMARY
          echo "\`\`\`" >> $GITHUB_STEP_SUMMARY
          echo "${{ steps.meta.outputs.tags }}" >> $GITHUB_STEP_SUMMARY
          echo "\`\`\`" >> $GITHUB_STEP_SUMMARY
```

--------------------------------------------------------------------------------
/docs/tools/version.md:
--------------------------------------------------------------------------------

```markdown
# Version Tool - Server Information

**Get server version, configuration details, and list of available tools**

The `version` tool provides information about the Zen MCP Server version, configuration details, and system capabilities. This is useful for debugging, understanding server capabilities, and verifying your installation.

## Usage

```
"Get zen to show its version"
```

## Key Features

- **Server version information**: Current version and build details
- **Configuration overview**: Active settings and capabilities
- **Tool inventory**: Complete list of available tools and their status
- **System health**: Basic server status and connectivity verification
- **Debug information**: Helpful details for troubleshooting

## Output Information

The tool provides:

**Version Details:**
- Server version number
- Build timestamp and commit information
- MCP protocol version compatibility
- Python runtime version

**Configuration Summary:**
- Active providers and their status
- Default model configuration
- Feature flags and settings
- Environment configuration overview

**Tool Availability:**
- Complete list of available tools
- Tool version information
- Capability status for each tool

**System Information:**
- Server uptime and status
- Memory and resource usage (if available)
- Conversation memory status
- Server process information

## Example Output

```
🔧 Zen MCP Server Information

📋 Version: 2.15.0
🏗️ Build: 2024-01-15T10:30:00Z (commit: abc123f)
🔌 MCP Protocol: 1.0.0
🐍 Python Runtime: 3.11.7

⚙️ Configuration:
• Default Model: auto
• Providers: Google ✅, OpenAI ✅, Custom ✅
• Conversation Memory: Active ✅
• Web Search: Enabled

🛠️ Available Tools (12):
• chat - General development chat & collaborative thinking
• thinkdeep - Extended reasoning partner  
• consensus - Multi-model perspective gathering
• codereview - Professional code review
• precommit - Pre-commit validation
• debug - Expert debugging assistant
• analyze - Smart file analysis
• refactor - Intelligent code refactoring
• tracer - Static code analysis prompt generator
• testgen - Comprehensive test generation
• listmodels - List available models
• version - Server information

🔍 System Status:
• Server Uptime: 2h 35m
• Memory Storage: Active
• Server Process: Running
```

## When to Use Version Tool

- **Troubleshooting**: When experiencing issues with the server or tools
- **Configuration verification**: To confirm your setup is correct
- **Support requests**: To provide system information when asking for help
- **Update checking**: To verify you're running the latest version
- **Capability discovery**: To understand what features are available

## Debug Information

The version tool can help diagnose common issues:

**Connection Problems:**
- Verify server is running and responsive
- Check MCP protocol compatibility
- Confirm tool availability

**Configuration Issues:**
- Validate provider setup
- Check API key configuration status
- Verify feature enablement

**Performance Troubleshooting:**
- Server uptime and stability
- Resource usage patterns
- Memory storage health

## Tool Parameters

This tool requires no parameters - it provides comprehensive server information automatically.

## Best Practices

- **Include in bug reports**: Always include version output when reporting issues
- **Check after updates**: Verify version information after server updates
- **Monitor system health**: Use periodically to check server status
- **Validate configuration**: Confirm settings match your expectations

## When to Use Version vs Other Tools

- **Use `version`** for: Server diagnostics, configuration verification, troubleshooting
- **Use `listmodels`** for: Model availability and capability information
- **Use other tools** for: Actual development and analysis tasks
- **Use with support**: Essential information for getting help with issues
```

--------------------------------------------------------------------------------
/docs/tools/listmodels.md:
--------------------------------------------------------------------------------

```markdown
# ListModels Tool - List Available Models

**Display all available AI models organized by provider**

The `listmodels` tool shows which providers are configured, available models, their aliases, context windows, and capabilities. This is useful for understanding what models can be used and their characteristics.

## Usage

```
"Use zen to list available models"
```

## Key Features

- **Provider organization**: Shows all configured providers and their status
- **Model capabilities**: Context windows, thinking mode support, and special features
- **Alias mapping**: Shows shorthand names and their full model mappings
- **Configuration status**: Indicates which providers are available based on API keys
- **Context window information**: Helps you choose models based on your content size needs
- **Capability overview**: Understanding which models support extended thinking, vision, etc.

## Output Information

The tool displays:

**Provider Status:**
- Which providers are configured and available
- API key status (without revealing the actual keys)
- Provider priority order

**Model Details:**
- Full model names and their aliases
- Context window sizes (tokens)
- Special capabilities (thinking modes, vision support, etc.)
- Provider-specific features

**Capability Summary:**
- Which models support extended thinking
- Vision-capable models for image analysis
- Models with largest context windows
- Fastest models for quick tasks

## Example Output

```
📋 Available Models by Provider

🔹 Google (Gemini) - ✅ Configured
  • pro (gemini-2.5-pro) - 1M context, thinking modes
  • flash (gemini-2.0-flash-experimental) - 1M context, ultra-fast

🔹 OpenAI - ✅ Configured  
  • o3 (o3) - 200K context, strong reasoning
  • o3-mini (o3-mini) - 200K context, balanced
  • o4-mini (o4-mini) - 200K context, latest reasoning

🔹 Custom/Local - ✅ Configured
  • local-llama (llama3.2) - 128K context, local inference
  • Available at: http://localhost:11434/v1

🔹 OpenRouter - ❌ Not configured
  Set OPENROUTER_API_KEY to enable access to Claude, GPT-4, and more models
```

## When to Use ListModels

- **Model selection**: When you're unsure which models are available
- **Capability checking**: To verify what features each model supports
- **Configuration validation**: To confirm your API keys are working
- **Context planning**: To choose models based on content size requirements
- **Performance optimization**: To select the right model for speed vs quality trade-offs

## Configuration Dependencies

The available models depend on your configuration:

**API Keys Required:**
- `GEMINI_API_KEY` - Enables Gemini Pro and Flash models
- `OPENAI_API_KEY` - Enables OpenAI O3, O4-mini, and GPT models
- `OPENROUTER_API_KEY` - Enables access to multiple providers through OpenRouter
- `CUSTOM_API_URL` - Enables local/custom models (Ollama, vLLM, etc.)

**Model Restrictions:**
If you've set model usage restrictions via environment variables, the tool will show:
- Which models are allowed vs restricted
- Active restriction policies
- How to modify restrictions

## Tool Parameters

This tool requires no parameters - it simply queries the server configuration and displays all available information.

## Best Practices

- **Check before planning**: Use this tool to understand your options before starting complex tasks
- **Verify configuration**: Confirm your API keys are working as expected
- **Choose appropriate models**: Match model capabilities to your specific needs
- **Understand limits**: Be aware of context windows when working with large files

## When to Use ListModels vs Other Tools

- **Use `listmodels`** for: Understanding available options and model capabilities
- **Use `chat`** for: General discussions about which model to use for specific tasks
- **Use `version`** for: Server configuration and version information
- **Use other tools** for: Actual analysis, debugging, or development work
```

--------------------------------------------------------------------------------
/tests/test_gemini_token_usage.py:
--------------------------------------------------------------------------------

```python
"""Tests for Gemini provider token usage extraction."""

import unittest
from unittest.mock import Mock

from providers.gemini import GeminiModelProvider


class TestGeminiTokenUsage(unittest.TestCase):
    """Test Gemini provider token usage handling."""

    def setUp(self):
        """Set up test fixtures."""
        self.provider = GeminiModelProvider("test-key")

    def test_extract_usage_with_valid_tokens(self):
        """Test token extraction with valid token counts."""
        response = Mock()
        response.usage_metadata = Mock()
        response.usage_metadata.prompt_token_count = 100
        response.usage_metadata.candidates_token_count = 50

        usage = self.provider._extract_usage(response)

        self.assertEqual(usage["input_tokens"], 100)
        self.assertEqual(usage["output_tokens"], 50)
        self.assertEqual(usage["total_tokens"], 150)

    def test_extract_usage_with_none_input_tokens(self):
        """Test token extraction when input_tokens is None (regression test for bug)."""
        response = Mock()
        response.usage_metadata = Mock()
        response.usage_metadata.prompt_token_count = None  # This was causing crashes
        response.usage_metadata.candidates_token_count = 50

        usage = self.provider._extract_usage(response)

        # Should not include input_tokens when None
        self.assertNotIn("input_tokens", usage)
        self.assertEqual(usage["output_tokens"], 50)
        # Should not calculate total_tokens when input is None
        self.assertNotIn("total_tokens", usage)

    def test_extract_usage_with_none_output_tokens(self):
        """Test token extraction when output_tokens is None (regression test for bug)."""
        response = Mock()
        response.usage_metadata = Mock()
        response.usage_metadata.prompt_token_count = 100
        response.usage_metadata.candidates_token_count = None  # This was causing crashes

        usage = self.provider._extract_usage(response)

        self.assertEqual(usage["input_tokens"], 100)
        # Should not include output_tokens when None
        self.assertNotIn("output_tokens", usage)
        # Should not calculate total_tokens when output is None
        self.assertNotIn("total_tokens", usage)

    def test_extract_usage_with_both_none_tokens(self):
        """Test token extraction when both token counts are None."""
        response = Mock()
        response.usage_metadata = Mock()
        response.usage_metadata.prompt_token_count = None
        response.usage_metadata.candidates_token_count = None

        usage = self.provider._extract_usage(response)

        # Should return empty dict when all tokens are None
        self.assertEqual(usage, {})

    def test_extract_usage_without_usage_metadata(self):
        """Test token extraction when response has no usage_metadata."""
        response = Mock(spec=[])

        usage = self.provider._extract_usage(response)

        # Should return empty dict
        self.assertEqual(usage, {})

    def test_extract_usage_with_zero_tokens(self):
        """Test token extraction with zero token counts."""
        response = Mock()
        response.usage_metadata = Mock()
        response.usage_metadata.prompt_token_count = 0
        response.usage_metadata.candidates_token_count = 0

        usage = self.provider._extract_usage(response)

        self.assertEqual(usage["input_tokens"], 0)
        self.assertEqual(usage["output_tokens"], 0)
        self.assertEqual(usage["total_tokens"], 0)

    def test_extract_usage_missing_attributes(self):
        """Test token extraction when metadata lacks token count attributes."""
        response = Mock()
        response.usage_metadata = Mock(spec=[])

        usage = self.provider._extract_usage(response)

        # Should return empty dict when attributes are missing
        self.assertEqual(usage, {})


if __name__ == "__main__":
    unittest.main()

```

--------------------------------------------------------------------------------
/systemprompts/chat_prompt.py:
--------------------------------------------------------------------------------

```python
"""
Chat tool system prompt
"""

CHAT_PROMPT = """
You are a senior engineering thought-partner collaborating with another AI agent. Your mission is to brainstorm, validate ideas,
and offer well-reasoned second opinions on technical decisions when they are justified and practical.

CRITICAL LINE NUMBER INSTRUCTIONS
Code is presented with line number markers "LINE│ code". These markers are for reference ONLY and MUST NOT be
included in any code you generate. Always reference specific line numbers in your replies in order to locate
exact positions if needed to point to exact locations. Include a very short code excerpt alongside for clarity.
Include context_start_text and context_end_text as backup references. Never include "LINE│" markers in generated code
snippets.

IF MORE INFORMATION IS NEEDED
If the agent is discussing specific code, functions, or project components that was not given as part of the context,
and you need additional context (e.g., related files, configuration, dependencies, test files) to provide meaningful
collaboration, you MUST respond ONLY with this JSON format (and nothing else). Do NOT ask for the same file you've been
provided unless for some reason its content is missing or incomplete:
{
  "status": "files_required_to_continue",
  "mandatory_instructions": "<your critical instructions for the agent>",
  "files_needed": ["[file name here]", "[or some folder/]"]
}

SCOPE & FOCUS
• Ground every suggestion in the project's current tech stack, languages, frameworks, and constraints.
• Recommend new technologies or patterns ONLY when they provide clearly superior outcomes with minimal added complexity.
• Avoid speculative, over-engineered, or unnecessarily abstract designs that exceed current project goals or needs.
• Keep proposals practical and directly actionable within the existing architecture.
• Overengineering is an anti-pattern — avoid solutions that introduce unnecessary abstraction, indirection, or
  configuration in anticipation of complexity that does not yet exist, is not clearly justified by the current scope,
  and may not arise in the foreseeable future.

COLLABORATION APPROACH
1. Treat the collaborating agent as an equally senior peer. Stay on topic, avoid unnecessary praise or filler because mixing compliments with pushback can blur priorities, and conserve output tokens for substance.
2. Engage deeply with the agent's input – extend, refine, and explore alternatives ONLY WHEN they are well-justified and materially beneficial.
3. Examine edge cases, failure modes, and unintended consequences specific to the code / stack in use.
4. Present balanced perspectives, outlining trade-offs and their implications.
5. Challenge assumptions constructively; when a proposal undermines stated objectives or scope, push back respectfully with clear, goal-aligned reasoning.
6. Provide concrete examples and actionable next steps that fit within scope. Prioritize direct, achievable outcomes.
7. Ask targeted clarifying questions whenever objectives, constraints, or rationale feel ambiguous; do not speculate when details are uncertain.

BRAINSTORMING GUIDELINES
• Offer multiple viable strategies ONLY WHEN clearly beneficial within the current environment.
• Suggest creative solutions that operate within real-world constraints, and avoid proposing major shifts unless truly warranted.
• Surface pitfalls early, particularly those tied to the chosen frameworks, languages, design direction or choice.
• Evaluate scalability, maintainability, and operational realities inside the existing architecture and current
framework.
• Reference industry best practices relevant to the technologies in use.
• Communicate concisely and technically, assuming an experienced engineering audience.

REMEMBER
Act as a peer, not a lecturer. Avoid overcomplicating. Aim for depth over breadth, stay within project boundaries, and help the team
reach sound, actionable decisions.
"""

```

--------------------------------------------------------------------------------
/simulator_tests/test_logs_validation.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Server Logs Validation Test

Validates server logs to confirm file deduplication behavior and
conversation threading is working properly.
"""

from .base_test import BaseSimulatorTest


class LogsValidationTest(BaseSimulatorTest):
    """Validate server logs to confirm file deduplication behavior"""

    @property
    def test_name(self) -> str:
        return "logs_validation"

    @property
    def test_description(self) -> str:
        return "Server logs validation"

    def run_test(self) -> bool:
        """Validate server logs to confirm file deduplication behavior"""
        try:
            self.logger.info("📋 Test: Validating server logs for file deduplication...")

            # Get server logs from log files
            import os

            logs = ""
            log_files = ["logs/mcp_server.log", "logs/mcp_activity.log"]

            for log_file in log_files:
                if os.path.exists(log_file):
                    try:
                        with open(log_file) as f:
                            file_content = f.read()
                            logs += f"\n=== {log_file} ===\n{file_content}\n"
                            self.logger.debug(f"Read {len(file_content)} characters from {log_file}")
                    except Exception as e:
                        self.logger.warning(f"Could not read {log_file}: {e}")
                else:
                    self.logger.warning(f"Log file not found: {log_file}")

            if not logs.strip():
                self.logger.warning("No log content found - server may not have processed any requests yet")
                return False

            # Look for conversation threading patterns that indicate the system is working
            conversation_patterns = [
                "CONVERSATION_RESUME",
                "CONVERSATION_CONTEXT",
                "previous turns loaded",
                "tool embedding",
                "files included",
                "files truncated",
                "already in conversation history",
            ]

            conversation_lines = []
            for line in logs.split("\n"):
                for pattern in conversation_patterns:
                    if pattern.lower() in line.lower():
                        conversation_lines.append(line.strip())
                        break

            # Look for evidence of conversation threading and file handling
            conversation_threading_found = False
            multi_turn_conversations = False

            for line in conversation_lines:
                lower_line = line.lower()
                if "conversation_resume" in lower_line:
                    conversation_threading_found = True
                    self.logger.debug(f"📄 Conversation threading: {line}")
                elif "previous turns loaded" in lower_line:
                    multi_turn_conversations = True
                    self.logger.debug(f"📄 Multi-turn conversation: {line}")
                elif "already in conversation" in lower_line:
                    self.logger.info(f"✅ Found explicit deduplication: {line}")
                    return True

            # Conversation threading with multiple turns is evidence of file deduplication working
            if conversation_threading_found and multi_turn_conversations:
                self.logger.info("✅ Conversation threading with multi-turn context working")
                self.logger.info(
                    "✅ File deduplication working implicitly (files embedded once in conversation history)"
                )
                return True
            elif conversation_threading_found:
                self.logger.info("✅ Conversation threading detected")
                return True
            else:
                self.logger.warning("⚠️  No clear evidence of conversation threading in logs")
                self.logger.debug(f"Found {len(conversation_lines)} conversation-related log lines")
                return False

        except Exception as e:
            self.logger.error(f"Log validation failed: {e}")
            return False

```

--------------------------------------------------------------------------------
/conf/xai_models.json:
--------------------------------------------------------------------------------

```json
{
  "_README": {
    "description": "Model metadata for X.AI (GROK) API access.",
    "documentation": "https://github.com/BeehiveInnovations/zen-mcp-server/blob/main/docs/custom_models.md",
    "usage": "Models listed here are exposed directly through the X.AI provider. Aliases are case-insensitive.",
    "field_notes": "Matches providers/shared/model_capabilities.py.",
    "field_descriptions": {
      "model_name": "The model identifier (e.g., 'grok-4', 'grok-3-fast')",
      "aliases": "Array of short names users can type instead of the full model name",
      "context_window": "Total number of tokens the model can process (input + output combined)",
      "max_output_tokens": "Maximum number of tokens the model can generate in a single response",
      "max_thinking_tokens": "Maximum reasoning/thinking tokens the model will allocate when extended thinking is requested",
      "supports_extended_thinking": "Whether the model supports extended reasoning tokens (currently none do via OpenRouter or custom APIs)",
      "supports_json_mode": "Whether the model can guarantee valid JSON output",
      "supports_function_calling": "Whether the model supports function/tool calling",
      "supports_images": "Whether the model can process images/visual input",
      "max_image_size_mb": "Maximum total size in MB for all images combined (capped at 40MB max for custom models)",
      "supports_temperature": "Whether the model accepts temperature parameter in API calls (set to false for O3/O4 reasoning models)",
      "temperature_constraint": "Type of temperature constraint: 'fixed' (fixed value), 'range' (continuous range), 'discrete' (specific values), or omit for default range",
      "use_openai_response_api": "Set to true when the model must use the /responses endpoint (reasoning models like GPT-5 Pro). Leave false/omit for standard chat completions.",
      "default_reasoning_effort": "Default reasoning effort level for models that support it (e.g., 'low', 'medium', 'high'). Omit if not applicable.",
      "description": "Human-readable description of the model",
      "intelligence_score": "1-20 human rating used as the primary signal for auto-mode model ordering"
    }
  },
  "models": [
    {
      "model_name": "grok-4",
      "friendly_name": "X.AI (Grok 4)",
      "aliases": [
        "grok",
        "grok4",
        "grok-4"
      ],
      "intelligence_score": 16,
      "description": "GROK-4 (256K context) - Frontier multimodal reasoning model with advanced capabilities",
      "context_window": 256000,
      "max_output_tokens": 256000,
      "supports_extended_thinking": true,
      "supports_system_prompts": true,
      "supports_streaming": true,
      "supports_function_calling": true,
      "supports_json_mode": true,
      "supports_images": true,
      "supports_temperature": true,
      "max_image_size_mb": 20.0
    },
    {
      "model_name": "grok-3",
      "friendly_name": "X.AI (Grok 3)",
      "aliases": [
        "grok3"
      ],
      "intelligence_score": 13,
      "description": "GROK-3 (131K context) - Advanced reasoning model from X.AI, excellent for complex analysis",
      "context_window": 131072,
      "max_output_tokens": 131072,
      "supports_extended_thinking": false,
      "supports_system_prompts": true,
      "supports_streaming": true,
      "supports_function_calling": true,
      "supports_json_mode": false,
      "supports_images": false,
      "supports_temperature": true
    },
    {
      "model_name": "grok-3-fast",
      "friendly_name": "X.AI (Grok 3 Fast)",
      "aliases": [
        "grok3fast",
        "grokfast",
        "grok3-fast"
      ],
      "intelligence_score": 12,
      "description": "GROK-3 Fast (131K context) - Higher performance variant, faster processing but more expensive",
      "context_window": 131072,
      "max_output_tokens": 131072,
      "supports_extended_thinking": false,
      "supports_system_prompts": true,
      "supports_streaming": true,
      "supports_function_calling": true,
      "supports_json_mode": false,
      "supports_images": false,
      "supports_temperature": true
    }
  ]
}

```

--------------------------------------------------------------------------------
/tests/test_chat_codegen_integration.py:
--------------------------------------------------------------------------------

```python
"""Integration test for Chat tool code generation with Gemini 2.5 Pro.

This test uses the Google Gemini SDK's built-in record/replay support. To refresh the
cassette, delete the existing JSON file under
``tests/gemini_cassettes/chat_codegen/gemini25_pro_calculator/mldev.json`` and run:

```
GEMINI_API_KEY=<real-key> pytest tests/test_chat_codegen_integration.py::test_chat_codegen_saves_file
```

The test will automatically record a new interaction when the cassette is missing and
the environment variable `GEMINI_API_KEY` is set to a valid key.
"""

from __future__ import annotations

import json
import os
from pathlib import Path

import pytest

from providers.gemini import GeminiModelProvider
from providers.registry import ModelProviderRegistry, ProviderType
from tools.chat import ChatTool

REPLAYS_ROOT = Path(__file__).parent / "gemini_cassettes"
CASSETTE_DIR = REPLAYS_ROOT / "chat_codegen"
CASSETTE_PATH = CASSETTE_DIR / "gemini25_pro_calculator" / "mldev.json"
CASSETTE_REPLAY_ID = "chat_codegen/gemini25_pro_calculator/mldev"


@pytest.mark.asyncio
@pytest.mark.no_mock_provider
async def test_chat_codegen_saves_file(monkeypatch, tmp_path):
    """Ensure Gemini 2.5 Pro responses create zen_generated.code when code is emitted."""

    CASSETTE_PATH.parent.mkdir(parents=True, exist_ok=True)

    recording_mode = not CASSETTE_PATH.exists()
    gemini_key = os.getenv("GEMINI_API_KEY", "")

    if recording_mode:
        if not gemini_key or gemini_key.startswith("dummy"):
            pytest.skip("Cassette missing and GEMINI_API_KEY not configured. Provide a real key to record.")
        client_mode = "record"
    else:
        gemini_key = "dummy-key-for-replay"
        client_mode = "replay"

    with monkeypatch.context() as m:
        m.setenv("GEMINI_API_KEY", gemini_key)
        m.setenv("DEFAULT_MODEL", "auto")
        m.setenv("GOOGLE_ALLOWED_MODELS", "gemini-2.5-pro")
        m.setenv("GOOGLE_GENAI_CLIENT_MODE", client_mode)
        m.setenv("GOOGLE_GENAI_REPLAYS_DIRECTORY", str(REPLAYS_ROOT))
        m.setenv("GOOGLE_GENAI_REPLAY_ID", CASSETTE_REPLAY_ID)

        # Clear other provider keys to avoid unintended routing
        for key in ["OPENAI_API_KEY", "XAI_API_KEY", "OPENROUTER_API_KEY", "CUSTOM_API_KEY"]:
            m.delenv(key, raising=False)

        ModelProviderRegistry.reset_for_testing()
        ModelProviderRegistry.register_provider(ProviderType.GOOGLE, GeminiModelProvider)

        working_dir = tmp_path / "codegen"
        working_dir.mkdir()
        preexisting = working_dir / "zen_generated.code"
        preexisting.write_text("stale contents", encoding="utf-8")

        chat_tool = ChatTool()
        prompt = (
            "Please generate a Python module with functions `add` and `multiply` that perform"
            " basic addition and multiplication. Produce the response using the structured"
            " <GENERATED-CODE> format so the assistant can apply the files directly."
        )

        result = await chat_tool.execute(
            {
                "prompt": prompt,
                "model": "gemini-2.5-pro",
                "working_directory_absolute_path": str(working_dir),
            }
        )

        provider = ModelProviderRegistry.get_provider_for_model("gemini-2.5-pro")
        if provider is not None:
            try:
                provider.client.close()
            except AttributeError:
                pass

        # Reset restriction service cache to avoid leaking allowed-model config
        try:
            from utils import model_restrictions

            model_restrictions._restriction_service = None  # type: ignore[attr-defined]
        except Exception:
            pass

    assert result and result[0].type == "text"
    payload = json.loads(result[0].text)
    assert payload["status"] in {"success", "continuation_available"}

    artifact_path = working_dir / "zen_generated.code"
    assert artifact_path.exists()
    saved = artifact_path.read_text()
    assert "<GENERATED-CODE>" in saved
    assert "<NEWFILE:" in saved
    assert "def add" in saved and "def multiply" in saved
    assert "stale contents" not in saved

    artifact_path.unlink()

```

--------------------------------------------------------------------------------
/docs/vcr-testing.md:
--------------------------------------------------------------------------------

```markdown
# HTTP Transport Recorder for Testing

A custom HTTP recorder for testing expensive API calls (like o3-pro) with real responses.

## Overview

The HTTP Transport Recorder captures and replays HTTP interactions at the transport layer, enabling:
- Cost-efficient testing of expensive APIs (record once, replay forever)
- Deterministic tests with real API responses
- Seamless integration with httpx and OpenAI SDK
- Automatic PII sanitization for secure recordings

## Quick Start

```python
from tests.transport_helpers import inject_transport

# Simple one-line setup with automatic transport injection
def test_expensive_api_call(monkeypatch):
    inject_transport(monkeypatch, "tests/openai_cassettes/my_test.json")
    
    # Make API calls - automatically recorded/replayed with PII sanitization
    result = await chat_tool.execute({"prompt": "2+2?", "model": "o3-pro"})
```

## How It Works

1. **First run** (cassette doesn't exist): Records real API calls
2. **Subsequent runs** (cassette exists): Replays saved responses
3. **Re-record**: Delete cassette file and run again

## Usage in Tests

The `transport_helpers.inject_transport()` function simplifies test setup:

```python
from tests.transport_helpers import inject_transport

async def test_with_recording(monkeypatch):
    # One-line setup - handles all transport injection complexity
    inject_transport(monkeypatch, "tests/openai_cassettes/my_test.json")
    
    # Use API normally - recording/replay happens transparently
    result = await chat_tool.execute({"prompt": "2+2?", "model": "o3-pro"})
```

For manual setup, see `test_o3_pro_output_text_fix.py`.

## Automatic PII Sanitization

All recordings are automatically sanitized to remove sensitive data:

- **API Keys & Tokens**: Bearer tokens, API keys, and auth headers
- **Personal Data**: Email addresses, IP addresses, phone numbers
- **URLs**: Sensitive query parameters and paths
- **Custom Patterns**: Add your own sanitization rules

Sanitization is enabled by default in `RecordingTransport`. To disable:

```python
transport = TransportFactory.create_transport(cassette_path, sanitize=False)
```

## File Structure

```
tests/
├── openai_cassettes/           # Recorded API interactions
│   └── *.json                  # Cassette files
├── http_transport_recorder.py  # Transport implementation
├── pii_sanitizer.py           # Automatic PII sanitization
├── transport_helpers.py       # Simplified transport injection
├── sanitize_cassettes.py      # Batch sanitization script
└── test_o3_pro_output_text_fix.py  # Example usage
```

## Sanitizing Existing Cassettes

Use the `sanitize_cassettes.py` script to clean existing recordings:

```bash
# Sanitize all cassettes (creates backups)
python tests/sanitize_cassettes.py

# Sanitize specific cassette
python tests/sanitize_cassettes.py tests/openai_cassettes/my_test.json

# Skip backup creation
python tests/sanitize_cassettes.py --no-backup
```

The script will:
- Create timestamped backups of original files
- Apply comprehensive PII sanitization
- Preserve JSON structure and functionality

## Cost Management

- **One-time cost**: Initial recording only
- **Zero ongoing cost**: Replays are free
- **CI-friendly**: No API keys needed for replay

## Re-recording

When API changes require new recordings:

```bash
# Delete specific cassette
rm tests/openai_cassettes/my_test.json

# Run test with real API key
python -m pytest tests/test_o3_pro_output_text_fix.py
```

## Implementation Details

- **RecordingTransport**: Captures real HTTP calls with automatic PII sanitization
- **ReplayTransport**: Serves saved responses from cassettes
- **TransportFactory**: Auto-selects mode based on cassette existence
- **PIISanitizer**: Comprehensive sanitization of sensitive data (integrated by default)

**Security Note**: While recordings are automatically sanitized, always review new cassette files before committing. The sanitizer removes known patterns of sensitive data, but domain-specific secrets may need custom rules.

For implementation details, see:
- `tests/http_transport_recorder.py` - Core transport implementation
- `tests/pii_sanitizer.py` - Sanitization patterns and logic
- `tests/transport_helpers.py` - Simplified test integration


```

--------------------------------------------------------------------------------
/utils/storage_backend.py:
--------------------------------------------------------------------------------

```python
"""
In-memory storage backend for conversation threads

This module provides a thread-safe, in-memory alternative to Redis for storing
conversation contexts. It's designed for ephemeral MCP server sessions where
conversations only need to persist during a single Claude session.

⚠️  PROCESS-SPECIFIC STORAGE: This storage is confined to a single Python process.
    Data stored in one process is NOT accessible from other processes or subprocesses.
    This is why simulator tests that run server.py as separate subprocesses cannot
    share conversation state between tool calls.

Key Features:
- Thread-safe operations using locks
- TTL support with automatic expiration
- Background cleanup thread for memory management
- Singleton pattern for consistent state within a single process
- Drop-in replacement for Redis storage (for single-process scenarios)
"""

import logging
import threading
import time
from typing import Optional

from utils.env import get_env

logger = logging.getLogger(__name__)


class InMemoryStorage:
    """Thread-safe in-memory storage for conversation threads"""

    def __init__(self):
        self._store: dict[str, tuple[str, float]] = {}
        self._lock = threading.Lock()
        # Match Redis behavior: cleanup interval based on conversation timeout
        # Run cleanup at 1/10th of timeout interval (e.g., 18 mins for 3 hour timeout)
        timeout_hours = int(get_env("CONVERSATION_TIMEOUT_HOURS", "3") or "3")
        self._cleanup_interval = (timeout_hours * 3600) // 10
        self._cleanup_interval = max(300, self._cleanup_interval)  # Minimum 5 minutes
        self._shutdown = False

        # Start background cleanup thread
        self._cleanup_thread = threading.Thread(target=self._cleanup_worker, daemon=True)
        self._cleanup_thread.start()

        logger.info(
            f"In-memory storage initialized with {timeout_hours}h timeout, cleanup every {self._cleanup_interval//60}m"
        )

    def set_with_ttl(self, key: str, ttl_seconds: int, value: str) -> None:
        """Store value with expiration time"""
        with self._lock:
            expires_at = time.time() + ttl_seconds
            self._store[key] = (value, expires_at)
            logger.debug(f"Stored key {key} with TTL {ttl_seconds}s")

    def get(self, key: str) -> Optional[str]:
        """Retrieve value if not expired"""
        with self._lock:
            if key in self._store:
                value, expires_at = self._store[key]
                if time.time() < expires_at:
                    logger.debug(f"Retrieved key {key}")
                    return value
                else:
                    # Clean up expired entry
                    del self._store[key]
                    logger.debug(f"Key {key} expired and removed")
        return None

    def setex(self, key: str, ttl_seconds: int, value: str) -> None:
        """Redis-compatible setex method"""
        self.set_with_ttl(key, ttl_seconds, value)

    def _cleanup_worker(self):
        """Background thread that periodically cleans up expired entries"""
        while not self._shutdown:
            time.sleep(self._cleanup_interval)
            self._cleanup_expired()

    def _cleanup_expired(self):
        """Remove all expired entries"""
        with self._lock:
            current_time = time.time()
            expired_keys = [k for k, (_, exp) in self._store.items() if exp < current_time]
            for key in expired_keys:
                del self._store[key]

            if expired_keys:
                logger.debug(f"Cleaned up {len(expired_keys)} expired conversation threads")

    def shutdown(self):
        """Graceful shutdown of background thread"""
        self._shutdown = True
        if self._cleanup_thread.is_alive():
            self._cleanup_thread.join(timeout=1)


# Global singleton instance
_storage_instance = None
_storage_lock = threading.Lock()


def get_storage_backend() -> InMemoryStorage:
    """Get the global storage instance (singleton pattern)"""
    global _storage_instance
    if _storage_instance is None:
        with _storage_lock:
            if _storage_instance is None:
                _storage_instance = InMemoryStorage()
                logger.info("Initialized in-memory conversation storage")
    return _storage_instance

```

--------------------------------------------------------------------------------
/.github/workflows/docker-pr.yml:
--------------------------------------------------------------------------------

```yaml
name: PR Docker Build

on:
  pull_request:
    types: [opened, synchronize, reopened, labeled, unlabeled]
    paths:
      - '**.py'
      - 'requirements*.txt'
      - 'pyproject.toml'
      - 'Dockerfile'
      - 'docker-compose.yml'
      - '.dockerignore'

permissions:
  contents: read
  packages: write
  pull-requests: write

jobs:
  docker:
    name: Build Docker Image
    runs-on: ubuntu-latest
    if: |
      github.event.action == 'opened' ||
      github.event.action == 'synchronize' ||
      github.event.action == 'reopened' ||
      contains(github.event.pull_request.labels.*.name, 'docker-build')
    
    steps:
      - name: Checkout
        uses: actions/checkout@v4

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      - name: Login to GitHub Container Registry
        if: github.event.pull_request.head.repo.full_name == github.repository
        uses: docker/login-action@v3
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Extract metadata
        id: meta
        uses: docker/metadata-action@v5
        with:
          images: ghcr.io/${{ github.repository }}
          tags: |
            # PR-specific tag for testing
            type=raw,value=pr-${{ github.event.number }}-${{ github.sha }}
            type=raw,value=pr-${{ github.event.number }}

      - name: Build and push Docker image (internal PRs)
        if: github.event.pull_request.head.repo.full_name == github.repository
        uses: docker/build-push-action@v5
        with:
          context: .
          platforms: linux/amd64,linux/arm64
          push: true
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=gha
          cache-to: type=gha,mode=max

      - name: Build Docker image (fork PRs)
        if: github.event.pull_request.head.repo.full_name != github.repository
        uses: docker/build-push-action@v5
        with:
          context: .
          platforms: linux/amd64,linux/arm64
          push: false
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=gha
          cache-to: type=gha,mode=max

      - name: Add Docker build comment (internal PRs)
        if: github.event.pull_request.head.repo.full_name == github.repository
        uses: marocchino/sticky-pull-request-comment@d2ad0de260ae8b0235ce059e63f2949ba9e05943 # v2.9.3
        with:
          header: docker-build
          message: |
            ## 🐳 Docker Build Complete
            
            **PR**: #${{ github.event.number }} | **Commit**: `${{ github.sha }}`
            
            ```
            ${{ steps.meta.outputs.tags }}
            ```
            
            **Test:** `docker pull ghcr.io/${{ github.repository }}:pr-${{ github.event.number }}`
            
            **Claude config:**
            ```json
            {
              "mcpServers": {
                "zen": {
                  "command": "docker",
                  "args": ["run", "--rm", "-i", "-e", "GEMINI_API_KEY", "ghcr.io/${{ github.repository }}:pr-${{ github.event.number }}"],
                  "env": { "GEMINI_API_KEY": "your-key" }
                }
              }
            }
            ```
            
            💡 Add `docker-build` label to manually trigger builds


      - name: Update job summary (internal PRs)
        if: github.event.pull_request.head.repo.full_name == github.repository
        run: |
          {
            echo "## 🐳 Docker Build Complete"
            echo "**PR**: #${{ github.event.number }} | **Commit**: ${{ github.sha }}"
            echo '```'
            echo "${{ steps.meta.outputs.tags }}"
            echo '```'
          } >> $GITHUB_STEP_SUMMARY

      - name: Update job summary (fork PRs)
        if: github.event.pull_request.head.repo.full_name != github.repository
        run: |
          {
            echo "## 🐳 Docker Build Complete (Build Only)"
            echo "**PR**: #${{ github.event.number }} | **Commit**: ${{ github.sha }}"
            echo "✅ Multi-platform Docker build successful"
            echo "Note: Fork PRs only build (no push) for security"
          } >> $GITHUB_STEP_SUMMARY

```

--------------------------------------------------------------------------------
/clink/parsers/gemini.py:
--------------------------------------------------------------------------------

```python
"""Parser for Gemini CLI JSON output."""

from __future__ import annotations

import json
from typing import Any

from .base import BaseParser, ParsedCLIResponse, ParserError


class GeminiJSONParser(BaseParser):
    """Parse stdout produced by `gemini -o json`."""

    name = "gemini_json"

    def parse(self, stdout: str, stderr: str) -> ParsedCLIResponse:
        if not stdout.strip():
            raise ParserError("Gemini CLI returned empty stdout while JSON output was expected")

        try:
            payload: dict[str, Any] = json.loads(stdout)
        except json.JSONDecodeError as exc:  # pragma: no cover - defensive logging
            raise ParserError(f"Failed to decode Gemini CLI JSON output: {exc}") from exc

        response = payload.get("response")
        response_text = response.strip() if isinstance(response, str) else ""

        metadata: dict[str, Any] = {"raw": payload}

        stats = payload.get("stats")
        if isinstance(stats, dict):
            metadata["stats"] = stats
            models = stats.get("models")
            if isinstance(models, dict) and models:
                model_name = next(iter(models.keys()))
                metadata["model_used"] = model_name
                model_stats = models.get(model_name) or {}
                tokens = model_stats.get("tokens")
                if isinstance(tokens, dict):
                    metadata["token_usage"] = tokens
                api_stats = model_stats.get("api")
                if isinstance(api_stats, dict):
                    metadata["latency_ms"] = api_stats.get("totalLatencyMs")

        if response_text:
            if stderr and stderr.strip():
                metadata["stderr"] = stderr.strip()
            return ParsedCLIResponse(content=response_text, metadata=metadata)

        fallback_message, extra_metadata = self._build_fallback_message(payload, stderr)
        if fallback_message:
            metadata.update(extra_metadata)
            if stderr and stderr.strip():
                metadata["stderr"] = stderr.strip()
            return ParsedCLIResponse(content=fallback_message, metadata=metadata)

        raise ParserError("Gemini CLI response is missing a textual 'response' field")

    def _build_fallback_message(self, payload: dict[str, Any], stderr: str) -> tuple[str | None, dict[str, Any]]:
        """Derive a human friendly message when Gemini returns empty content."""

        stderr_text = stderr.strip() if stderr else ""
        stderr_lower = stderr_text.lower()
        extra_metadata: dict[str, Any] = {"empty_response": True}

        if "429" in stderr_lower or "rate limit" in stderr_lower:
            extra_metadata["rate_limit_status"] = 429
            message = (
                "Gemini request returned no content because the API reported a 429 rate limit. "
                "Retry after reducing the request size or waiting for quota to replenish."
            )
            return message, extra_metadata

        stats = payload.get("stats")
        if isinstance(stats, dict):
            models = stats.get("models")
            if isinstance(models, dict) and models:
                first_model = next(iter(models.values()))
                if isinstance(first_model, dict):
                    api_stats = first_model.get("api")
                    if isinstance(api_stats, dict):
                        total_errors = api_stats.get("totalErrors")
                        total_requests = api_stats.get("totalRequests")
                        if isinstance(total_errors, int) and total_errors > 0:
                            extra_metadata["api_total_errors"] = total_errors
                            if isinstance(total_requests, int):
                                extra_metadata["api_total_requests"] = total_requests
                            message = (
                                "Gemini CLI returned no textual output. The API reported "
                                f"{total_errors} error(s); see stderr for details."
                            )
                            return message, extra_metadata

        if stderr_text:
            message = "Gemini CLI returned no textual output. Raw stderr was preserved for troubleshooting."
            return message, extra_metadata

        return None, extra_metadata

```

--------------------------------------------------------------------------------
/docs/tools/apilookup.md:
--------------------------------------------------------------------------------

```markdown
# API Lookup Tool

The `apilookup` tool ensures you get **current, accurate API/SDK documentation** by forcing the AI to search for the latest information rather than relying on outdated training data. This is especially critical for OS-tied APIs (iOS, macOS, Android, etc.) where the AI's knowledge cutoff may be months or years old.
Most importantly, it does this within in a sub-process / sub-agent, saving you precious tokens within your working context window. 

## Why Use This Tool?

### Without Zen (Using Standard AI)
```
User: "How do I add glass look to a button in Swift?"

AI: [Searches based on training data knowledge cutoff]
    "SwiftUI glass morphism frosted glass effect button iOS 18 2025"

Result: You get outdated APIs for iOS 18, not the iOS 26 effect you're after
```

<div align="center">
    
 [API without Zen](https://github.com/user-attachments/assets/01a79dc9-ad16-4264-9ce1-76a56c3580ee)
 
</div>

### With Zen (Using apilookup)
```
User: "use apilookup how do I add glass look to a button in swift?"

AI: Step 1 - Search: "what is the latest iOS version 2025"
    → Finds: iOS 26 is current

    Step 2 - Search: "iOS 26 SwiftUI glass effect button 2025"
    → Gets current APIs specific to iOS 26

Result: You get the correct, current APIs that work with today's iOS version
```

<div align="center">

[API with Zen](https://github.com/user-attachments/assets/5c847326-4b66-41f7-8f30-f380453dce22)

</div>

## Key Features

### 1. **OS Version Detection** (Critical!)
For any OS-tied request (iOS, macOS, Windows, Android, watchOS, tvOS), `apilookup` **MUST**:
- First search for the current OS version ("what is the latest iOS version 2025")
- **Never** rely on the AI's training data for version numbers
- Only after confirming current version, search for APIs/SDKs for that specific version

### 2. **Authoritative Sources Only**
Prioritizes official documentation:
- Project documentation sites
- GitHub repositories
- Package registries (npm, PyPI, crates.io, Maven Central, etc.)
- Official blogs and release notes

### 3. **Actionable, Concise Results**
- Current version numbers and release dates
- Breaking changes and migration notes
- Code examples and configuration options
- Deprecation warnings and security advisories

## When to Use

- You need current API/SDK documentation or version info
- You're working with OS-specific frameworks (SwiftUI, UIKit, Jetpack Compose, etc.)
- You want to verify which version supports a feature
- You need migration guides or breaking change notes
- You're checking for deprecations or security advisories

## Usage Examples

### OS-Specific APIs
```
use apilookup how do I add glass look to a button in swift?
use apilookup what's the latest way to handle permissions in Android?
use apilookup how do I use the new macOS window management APIs?
```

### Library/Framework Versions
```
use apilookup find the latest Stripe Python SDK version and note any breaking changes since v7
use apilookup what's the current AWS CDK release and list migration steps from v2
use apilookup check the latest React version and any new hooks introduced in 2025
```

### Feature Compatibility
```
use apilookup does the latest TypeScript support decorators natively?
use apilookup what's the current status of Swift async/await on Linux?
```

## How It Works

1. **Receives your query** with API/SDK/framework name
2. **Injects mandatory instructions** that force current-year searches
3. **For OS-tied requests**: Requires two-step search (OS version first, then API)
4. **Returns structured guidance** with instructions for web search
5. **AI executes searches** and provides authoritative, current documentation

## Output Format

The tool returns JSON with:
- `status`: "web_lookup_needed"
- `instructions`: Detailed search strategy and requirements
- `user_prompt`: Your original request

The AI then performs the actual web searches and synthesizes the results into actionable documentation.

## Codex CLI Configuration Reminder

If you use Zen through the Codex CLI, the assistant needs Codex's native web-search tool to fetch current documentation. After adding the Zen MCP entry to `~/.codex/config.toml`, confirm the file also contains:

```toml
[tools]
web_search = true
```

If `[tools]` is missing, append the block manually. Without this flag, `apilookup` will keep requesting web searches that Codex cannot execute, and you'll see repeated attempts at using `curl` incorrectly.

```

--------------------------------------------------------------------------------
/simulator_tests/test_o3_pro_expensive.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
O3-Pro Expensive Model Test

⚠️  WARNING: This test uses o3-pro which is EXTREMELY EXPENSIVE! ⚠️

This test is intentionally NOT added to TEST_REGISTRY to prevent accidental execution.
It can only be run manually using:
    python communication_simulator_test.py --individual o3_pro_expensive

Tests that o3-pro model:
1. Uses the correct /v1/responses endpoint (not /v1/chat/completions)
2. Successfully completes a chat call
3. Returns properly formatted response
"""

from .base_test import BaseSimulatorTest


class O3ProExpensiveTest(BaseSimulatorTest):
    """Test o3-pro model basic functionality - EXPENSIVE, manual only"""

    @property
    def test_name(self) -> str:
        return "o3_pro_expensive"

    @property
    def test_description(self) -> str:
        return "⚠️ EXPENSIVE O3-Pro basic validation (manual only)"

    def run_test(self) -> bool:
        """Test o3-pro model with endpoint verification - EXPENSIVE!"""
        try:
            self.logger.warning("⚠️ ⚠️ ⚠️  EXPENSIVE TEST - O3-PRO COSTS ~$15-60 PER 1K TOKENS! ⚠️ ⚠️ ⚠️")
            self.logger.info("Test: O3-Pro endpoint and functionality test")

            # First, verify we're hitting the right endpoint by checking logs
            self.logger.info("Step 1: Testing o3-pro with chat tool")

            # One simple chat call
            response, tool_result = self.call_mcp_tool(
                "chat",
                {
                    "prompt": "What is 2 + 2?",
                    "model": "o3-pro",
                    "temperature": 1.0,
                },
            )

            if not response:
                self.logger.error("❌ O3-Pro chat call failed - no response")
                if tool_result and "error" in tool_result:
                    error_msg = tool_result["error"]
                    self.logger.error(f"Error details: {error_msg}")
                    # Check if it's the endpoint error we're trying to fix
                    if "v1/responses" in str(error_msg) and "v1/chat/completions" in str(error_msg):
                        self.logger.error(
                            "❌ ENDPOINT BUG DETECTED: o3-pro is trying to use chat/completions instead of responses endpoint!"
                        )
                return False

            # Check the metadata to verify endpoint was used
            if tool_result and isinstance(tool_result, dict):
                metadata = tool_result.get("metadata", {})
                endpoint_used = metadata.get("endpoint", "unknown")

                if endpoint_used == "responses":
                    self.logger.info("✅ Correct endpoint used: /v1/responses")
                else:
                    self.logger.warning(f"⚠️ Endpoint used: {endpoint_used} (expected: responses)")

            # Verify the response content
            if response and "4" in str(response):
                self.logger.info("✅ O3-Pro response is mathematically correct")
            else:
                self.logger.warning(f"⚠️ Unexpected response: {response}")

            self.logger.info("✅ O3-Pro test completed successfully")
            self.logger.warning("💰 Test completed - check your billing!")
            return True

        except Exception as e:
            self.logger.error(f"O3-Pro test failed with exception: {e}")
            # Log the full error for debugging endpoint issues
            import traceback

            self.logger.error(f"Full traceback: {traceback.format_exc()}")
            return False


def main():
    """Run the O3-Pro expensive test"""
    import sys

    print("⚠️ ⚠️ ⚠️  WARNING: This test uses O3-PRO which is EXTREMELY EXPENSIVE! ⚠️ ⚠️ ⚠️")
    print("O3-Pro can cost $15-60 per 1K tokens!")
    print("This is a MINIMAL test but may still cost $5-15!")
    print()

    response = input("Are you absolutely sure you want to run this expensive test? Type 'YES_I_UNDERSTAND_THE_COST': ")
    if response != "YES_I_UNDERSTAND_THE_COST":
        print("❌ Test cancelled")
        sys.exit(1)

    print("💰 Running minimal O3-Pro test...")

    verbose = "--verbose" in sys.argv or "-v" in sys.argv
    test = O3ProExpensiveTest(verbose=verbose)

    success = test.run_test()

    if success:
        print("✅ O3-Pro test completed successfully")
        print("💰 Don't forget to check your billing!")
    else:
        print("❌ O3-Pro test failed")

    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()

```

--------------------------------------------------------------------------------
/tests/test_cassette_semantic_matching.py:
--------------------------------------------------------------------------------

```python
"""
Tests for cassette semantic matching to prevent breaks from prompt changes.

This validates that o3 model cassettes match on semantic content (model + user question)
rather than exact request bodies, preventing cassette breaks when system prompts change.
"""

import hashlib
import json

import pytest

from tests.http_transport_recorder import ReplayTransport


class TestCassetteSemanticMatching:
    """Test that cassette matching is resilient to prompt changes."""

    @pytest.fixture
    def dummy_cassette(self, tmp_path):
        """Create a minimal dummy cassette file."""
        cassette_file = tmp_path / "dummy.json"
        cassette_file.write_text(json.dumps({"interactions": []}))
        return cassette_file

    def test_o3_model_semantic_matching(self, dummy_cassette):
        """Test that o3 models use semantic matching."""
        transport = ReplayTransport(str(dummy_cassette))

        # Two requests with same user question but different system prompts
        request1_body = {
            "model": "o3-pro",
            "reasoning": {"effort": "medium"},
            "input": [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "input_text",
                            "text": "System prompt v1...\n\n=== USER REQUEST ===\nWhat is 2 + 2?\n=== END REQUEST ===\n\nMore instructions...",
                        }
                    ],
                }
            ],
        }

        request2_body = {
            "model": "o3-pro",
            "reasoning": {"effort": "medium"},
            "input": [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "input_text",
                            "text": "System prompt v2 (DIFFERENT)...\n\n=== USER REQUEST ===\nWhat is 2 + 2?\n=== END REQUEST ===\n\nDifferent instructions...",
                        }
                    ],
                }
            ],
        }

        # Extract semantic fields - should be identical
        semantic1 = transport._extract_semantic_fields(request1_body)
        semantic2 = transport._extract_semantic_fields(request2_body)

        assert semantic1 == semantic2, "Semantic fields should match despite different prompts"
        assert semantic1["user_question"] == "What is 2 + 2?"
        assert semantic1["model"] == "o3-pro"
        assert semantic1["reasoning"] == {"effort": "medium"}

        # Generate signatures - should be identical
        content1 = json.dumps(semantic1, sort_keys=True)
        content2 = json.dumps(semantic2, sort_keys=True)
        hash1 = hashlib.md5(content1.encode()).hexdigest()
        hash2 = hashlib.md5(content2.encode()).hexdigest()

        assert hash1 == hash2, "Hashes should match for same semantic content"

    def test_non_o3_model_exact_matching(self, dummy_cassette):
        """Test that non-o3 models still use exact matching."""
        transport = ReplayTransport(str(dummy_cassette))

        request_body = {
            "model": "gpt-4",
            "messages": [{"role": "user", "content": "test"}],
        }

        # Should not use semantic matching
        assert not transport._is_o3_model_request(request_body)

    def test_o3_mini_semantic_matching(self, dummy_cassette):
        """Test that o3-mini also uses semantic matching."""
        transport = ReplayTransport(str(dummy_cassette))

        request_body = {
            "model": "o3-mini",
            "reasoning": {"effort": "low"},
            "input": [
                {
                    "role": "user",
                    "content": [
                        {"type": "input_text", "text": "System...\n\n=== USER REQUEST ===\nTest\n=== END REQUEST ==="}
                    ],
                }
            ],
        }

        assert transport._is_o3_model_request(request_body)
        semantic = transport._extract_semantic_fields(request_body)
        assert semantic["model"] == "o3-mini"
        assert semantic["user_question"] == "Test"

    def test_o3_without_request_markers(self, dummy_cassette):
        """Test o3 requests without REQUEST markers fall back to full text."""
        transport = ReplayTransport(str(dummy_cassette))

        request_body = {
            "model": "o3-pro",
            "reasoning": {"effort": "medium"},
            "input": [{"role": "user", "content": [{"type": "input_text", "text": "Just a simple question"}]}],
        }

        semantic = transport._extract_semantic_fields(request_body)
        assert semantic["user_question"] == "Just a simple question"

```

--------------------------------------------------------------------------------
/docs/tools/thinkdeep.md:
--------------------------------------------------------------------------------

```markdown
# ThinkDeep Tool - Extended Reasoning Partner

**Get a second opinion to augment Claude's own extended thinking**

The `thinkdeep` tool provides extended reasoning capabilities, offering a second perspective to augment Claude's analysis. It's designed to challenge assumptions, find edge cases, and provide alternative approaches to complex problems.

## Thinking Mode

**Default is `high` (16,384 tokens) for deep analysis.** Claude will automatically choose the best mode based on complexity - use `low` for quick validations, `medium` for standard problems, `high` for complex issues (default), or `max` for extremely complex challenges requiring deepest analysis.

## Example Prompt

```
Think deeper about my authentication design with pro using max thinking mode and brainstorm to come up 
with the best architecture for my project
```

## Key Features

- **Uses Gemini's specialized thinking models** for enhanced reasoning capabilities
- **Provides a second opinion** on Claude's analysis
- **Challenges assumptions** and identifies edge cases Claude might miss
- **Offers alternative perspectives** and approaches
- **Validates architectural decisions** and design patterns
- **File reference support**: `"Use gemini to think deeper about my API design with reference to api/routes.py"`
- **Image support**: Analyze architectural diagrams, flowcharts, design mockups: `"Think deeper about this system architecture diagram with gemini pro using max thinking mode"`
- **Enhanced Critical Evaluation (v2.10.0)**: After Gemini's analysis, Claude is prompted to critically evaluate the suggestions, consider context and constraints, identify risks, and synthesize a final recommendation - ensuring a balanced, well-considered solution
- **Web search capability**: Automatically identifies areas where current documentation or community solutions would strengthen the analysis and instructs Claude to perform targeted searches

## Tool Parameters

- `prompt`: Your current thinking/analysis to extend and validate (required)
- `model`: auto|pro|flash|flash-2.0|flashlite|o3|o3-mini|o4-mini|gpt4.1|gpt5|gpt5-mini|gpt5-nano (default: server default)
- `problem_context`: Additional context about the problem or goal
- `focus_areas`: Specific aspects to focus on (architecture, performance, security, etc.)
- `files`: Optional file paths or directories for additional context (absolute paths)
- `images`: Optional images for visual analysis (absolute paths)
- `temperature`: Temperature for creative thinking (0-1, default 0.7)
- `thinking_mode`: minimal|low|medium|high|max (default: high, Gemini only)
- `continuation_id`: Continue previous conversations

## Usage Examples

**Architecture Design:**
```
"Think deeper about my microservices authentication strategy with pro using max thinking mode"
```

**With File Context:**
```
"Use gemini to think deeper about my API design with reference to api/routes.py and models/user.py"
```

**Visual Analysis:**
```
"Think deeper about this system architecture diagram with gemini pro - identify potential bottlenecks"
```

**Problem Solving:**
```
"I'm considering using GraphQL vs REST for my API. Think deeper about the trade-offs with o3 using high thinking mode"
```

**Code Review Enhancement:**
```
"Think deeper about the security implications of this authentication code with pro"
```

## Best Practices

- **Provide detailed context**: Share your current thinking, constraints, and objectives
- **Be specific about focus areas**: Mention what aspects need deeper analysis
- **Include relevant files**: Reference code, documentation, or configuration files
- **Use appropriate thinking modes**: Higher modes for complex problems, lower for quick validations
- **Leverage visual context**: Include diagrams or mockups for architectural discussions
- **Build on discussions**: Use continuation to extend previous analyses

## Enhanced Critical Evaluation Process

The `thinkdeep` tool includes a unique two-stage process:

1. **Gemini's Analysis**: Extended reasoning with specialized thinking capabilities
2. **Claude's Critical Evaluation**: Claude reviews Gemini's suggestions, considers:
   - Context and constraints of your specific situation
   - Potential risks and implementation challenges
   - Trade-offs and alternatives
   - Final synthesized recommendation

This ensures you get both deep reasoning and practical, context-aware advice.

## When to Use ThinkDeep vs Other Tools

- **Use `thinkdeep`** for: Extending specific analysis, challenging assumptions, architectural decisions
- **Use `chat`** for: Open-ended brainstorming and general discussions
- **Use `analyze`** for: Understanding existing code without extending analysis
- **Use `codereview`** for: Finding specific bugs and security issues

```

--------------------------------------------------------------------------------
/systemprompts/analyze_prompt.py:
--------------------------------------------------------------------------------

```python
"""
Analyze tool system prompt
"""

ANALYZE_PROMPT = """
ROLE
You are a senior software analyst performing a holistic technical audit of the given code or project. Your mission is
to help engineers understand how a codebase aligns with long-term goals, architectural soundness, scalability,
and maintainability—not just spot routine code-review issues.

CRITICAL LINE NUMBER INSTRUCTIONS
Code is presented with line number markers "LINE│ code". These markers are for reference ONLY and MUST NOT be
included in any code you generate. Always reference specific line numbers in your replies in order to locate
exact positions if needed to point to exact locations. Include a very short code excerpt alongside for clarity.
Include context_start_text and context_end_text as backup references. Never include "LINE│" markers in generated code
snippets.

IF MORE INFORMATION IS NEEDED
If you need additional context (e.g., dependencies, configuration files, test files) to provide complete analysis, you
MUST respond ONLY with this JSON format (and nothing else). Do NOT ask for the same file you've been provided unless
for some reason its content is missing or incomplete:
{
  "status": "files_required_to_continue",
  "mandatory_instructions": "<your critical instructions for the agent>",
  "files_needed": ["[file name here]", "[or some folder/]"]
}

ESCALATE TO A FULL CODEREVIEW IF REQUIRED
If, after thoroughly analysing the question and the provided code, you determine that a comprehensive, code-base–wide
review is essential - e.g., the issue spans multiple modules or exposes a systemic architectural flaw — do not proceed
with partial analysis. Instead, respond ONLY with the JSON below (and nothing else). Clearly state the reason why
you strongly feel this is necessary and ask the agent to inform the user why you're switching to a different tool:
{"status": "full_codereview_required",
 "important": "Please use zen's codereview tool instead",
 "reason": "<brief, specific rationale for escalation>"}

SCOPE & FOCUS
• Understand the code's purpose and architecture and the overall scope and scale of the project
• Identify strengths, risks, and strategic improvement areas that affect future development
• Avoid line-by-line bug hunts or minor style critiques—those are covered by CodeReview
• Recommend practical, proportional changes; no "rip-and-replace" proposals unless the architecture is untenable
• Identify and flag overengineered solutions — excessive abstraction, unnecessary configuration layers, or generic
  frameworks introduced without a clear, current need. These should be called out when they add complexity, slow
  onboarding, or reduce clarity, especially if the anticipated complexity is speculative or unlikely to materialize
  in the foreseeable future.

ANALYSIS STRATEGY
1. Map the tech stack, frameworks, deployment model, and constraints
2. Determine how well current architecture serves stated business and scaling goals
3. Surface systemic risks (tech debt hot-spots, brittle modules, growth bottlenecks)
4. Highlight opportunities for strategic refactors or pattern adoption that yield high ROI
5. Provide clear, actionable insights with just enough detail to guide decision-making

KEY DIMENSIONS (apply as relevant)
• **Architectural Alignment** – layering, domain boundaries, CQRS/eventing, micro-vs-monolith fit
• **Scalability & Performance Trajectory** – data flow, caching strategy, concurrency model
• **Maintainability & Tech Debt** – module cohesion, coupling, code ownership, documentation health
• **Security & Compliance Posture** – systemic exposure points, secrets management, threat surfaces
• **Operational Readiness** – observability, deployment pipeline, rollback/DR strategy
• **Future Proofing** – ease of feature addition, language/version roadmap, community support

DELIVERABLE FORMAT

## Executive Overview
One paragraph summarizing architecture fitness, key risks, and standout strengths.

## Strategic Findings (Ordered by Impact)

### 1. [FINDING NAME]
**Insight:** Very concise statement of what matters and why.
**Evidence:** Specific modules/files/metrics/code illustrating the point.
**Impact:** How this affects scalability, maintainability, or business goals.
**Recommendation:** Actionable next step (e.g., adopt pattern X, consolidate service Y).
**Effort vs. Benefit:** Relative estimate (Low/Medium/High effort; Low/Medium/High payoff).

### 2. [FINDING NAME]
[Repeat format...]

## Quick Wins
Bullet list of low-effort changes offering immediate value.

## Long-Term Roadmap Suggestions
High-level guidance for phased improvements (optional—include only if explicitly requested).

Remember: focus on system-level insights that inform strategic decisions; leave granular bug fixing and style nits to
the codereview tool.
"""

```

--------------------------------------------------------------------------------
/tests/test_prompt_size_limit_bug_fix.py:
--------------------------------------------------------------------------------

```python
"""
Test for the prompt size limit bug fix.

This test verifies that SimpleTool correctly validates only the original user prompt
when conversation history is embedded, rather than validating the full enhanced prompt.
"""

from tools.chat import ChatTool
from tools.shared.base_models import ToolRequest


class TestPromptSizeLimitBugFix:
    """Test that the prompt size limit bug is fixed"""

    def test_prompt_size_validation_with_conversation_history(self):
        """Test that prompt size validation uses original prompt when conversation history is embedded"""

        # Create a ChatTool instance
        tool = ChatTool()

        # Simulate a short user prompt (should not trigger size limit)
        short_user_prompt = "Thanks for the help!"

        # Simulate conversation history (large content)
        conversation_history = "=== CONVERSATION HISTORY ===\n" + ("Previous conversation content. " * 5000)

        # Simulate enhanced prompt with conversation history (what server.py creates)
        enhanced_prompt = f"{conversation_history}\n\n=== NEW USER INPUT ===\n{short_user_prompt}"

        # Simulate server.py behavior: store original prompt in _current_arguments
        tool._current_arguments = {
            "prompt": enhanced_prompt,  # Enhanced with history
            "_original_user_prompt": short_user_prompt,  # Original user input (our fix)
            "model": "local-llama",
        }

        # Test the hook method directly
        validation_content = tool.get_prompt_content_for_size_validation(enhanced_prompt)

        # Should return the original short prompt, not the enhanced prompt
        assert validation_content == short_user_prompt
        assert len(validation_content) == len(short_user_prompt)
        assert len(validation_content) < 1000  # Much smaller than enhanced prompt

        # Verify the enhanced prompt would have triggered the bug
        assert len(enhanced_prompt) > 50000  # This would trigger size limit

        # Test that size check passes with the original prompt
        size_check = tool.check_prompt_size(validation_content)
        assert size_check is None  # No size limit error

        # Test that size check would fail with enhanced prompt
        size_check_enhanced = tool.check_prompt_size(enhanced_prompt)
        assert size_check_enhanced is not None  # Would trigger size limit
        assert size_check_enhanced["status"] == "resend_prompt"

    def test_prompt_size_validation_without_original_prompt(self):
        """Test fallback behavior when no original prompt is stored (new conversations)"""

        tool = ChatTool()

        user_content = "Regular prompt without conversation history"

        # No _current_arguments (new conversation scenario)
        tool._current_arguments = None

        # Should fall back to validating the full user content
        validation_content = tool.get_prompt_content_for_size_validation(user_content)
        assert validation_content == user_content

    def test_prompt_size_validation_with_missing_original_prompt(self):
        """Test fallback when _current_arguments exists but no _original_user_prompt"""

        tool = ChatTool()

        user_content = "Regular prompt without conversation history"

        # _current_arguments exists but no _original_user_prompt field
        tool._current_arguments = {
            "prompt": user_content,
            "model": "local-llama",
            # No _original_user_prompt field
        }

        # Should fall back to validating the full user content
        validation_content = tool.get_prompt_content_for_size_validation(user_content)
        assert validation_content == user_content

    def test_base_tool_default_behavior(self):
        """Test that BaseTool's default implementation validates full content"""

        from tools.shared.base_tool import BaseTool

        # Create a minimal tool implementation for testing
        class TestTool(BaseTool):
            def get_name(self) -> str:
                return "test"

            def get_description(self) -> str:
                return "Test tool"

            def get_input_schema(self) -> dict:
                return {}

            def get_request_model(self):
                return ToolRequest

            def get_system_prompt(self) -> str:
                return "Test system prompt"

            async def prepare_prompt(self, request) -> str:
                return "Test prompt"

            async def execute(self, arguments: dict) -> list:
                return []

        tool = TestTool()
        user_content = "Test content"

        # Default implementation should return the same content
        validation_content = tool.get_prompt_content_for_size_validation(user_content)
        assert validation_content == user_content

```

--------------------------------------------------------------------------------
/tests/test_azure_openai_provider.py:
--------------------------------------------------------------------------------

```python
import sys
import types

import pytest

if "openai" not in sys.modules:  # pragma: no cover - test shim for optional dependency
    stub = types.ModuleType("openai")
    stub.AzureOpenAI = object  # Replaced with a mock inside tests
    sys.modules["openai"] = stub

from providers.azure_openai import AzureOpenAIProvider
from providers.shared import ModelCapabilities, ProviderType


class _DummyResponse:
    def __init__(self):
        self.choices = [
            types.SimpleNamespace(
                message=types.SimpleNamespace(content="hello"),
                finish_reason="stop",
            )
        ]
        self.model = "prod-gpt4o"
        self.id = "resp-123"
        self.created = 0
        self.usage = types.SimpleNamespace(
            prompt_tokens=5,
            completion_tokens=3,
            total_tokens=8,
        )


@pytest.fixture
def dummy_azure_client(monkeypatch):
    captured = {}

    class _DummyAzureClient:
        def __init__(self, **kwargs):
            captured["client_kwargs"] = kwargs
            self.chat = types.SimpleNamespace(completions=types.SimpleNamespace(create=self._create_completion))
            self.responses = types.SimpleNamespace(create=self._create_response)

        def _create_completion(self, **kwargs):
            captured["request_kwargs"] = kwargs
            return _DummyResponse()

        def _create_response(self, **kwargs):
            captured["responses_kwargs"] = kwargs
            return _DummyResponse()

    monkeypatch.delenv("AZURE_OPENAI_ALLOWED_MODELS", raising=False)
    monkeypatch.setattr("providers.azure_openai.AzureOpenAI", _DummyAzureClient)
    return captured


def test_generate_content_uses_deployment_mapping(dummy_azure_client):
    provider = AzureOpenAIProvider(
        api_key="key",
        azure_endpoint="https://example.openai.azure.com/",
        deployments={"gpt-4o": "prod-gpt4o"},
    )

    result = provider.generate_content("hello", "gpt-4o")

    assert dummy_azure_client["request_kwargs"]["model"] == "prod-gpt4o"
    assert result.model_name == "gpt-4o"
    assert result.provider == ProviderType.AZURE
    assert provider.validate_model_name("prod-gpt4o")


def test_generate_content_accepts_deployment_alias(dummy_azure_client):
    provider = AzureOpenAIProvider(
        api_key="key",
        azure_endpoint="https://example.openai.azure.com/",
        deployments={"gpt-4o-mini": "mini-deployment"},
    )

    # Calling with the deployment alias should still resolve properly.
    result = provider.generate_content("hi", "mini-deployment")

    assert dummy_azure_client["request_kwargs"]["model"] == "mini-deployment"
    assert result.model_name == "gpt-4o-mini"


def test_client_initialization_uses_endpoint_and_version(dummy_azure_client):
    provider = AzureOpenAIProvider(
        api_key="key",
        azure_endpoint="https://example.openai.azure.com/",
        api_version="2024-03-15-preview",
        deployments={"gpt-4o": "prod"},
    )

    _ = provider.client

    assert dummy_azure_client["client_kwargs"]["azure_endpoint"] == "https://example.openai.azure.com"
    assert dummy_azure_client["client_kwargs"]["api_version"] == "2024-03-15-preview"


def test_deployment_overrides_capabilities(dummy_azure_client):
    provider = AzureOpenAIProvider(
        api_key="key",
        azure_endpoint="https://example.openai.azure.com/",
        deployments={
            "gpt-4o": {
                "deployment": "prod-gpt4o",
                "friendly_name": "Azure GPT-4o EU",
                "intelligence_score": 19,
                "supports_temperature": False,
                "temperature_constraint": "fixed",
            }
        },
    )

    caps = provider.get_capabilities("gpt-4o")
    assert caps.friendly_name == "Azure GPT-4o EU"
    assert caps.intelligence_score == 19
    assert not caps.supports_temperature


def test_registry_configuration_merges_capabilities(dummy_azure_client, monkeypatch):
    def fake_registry_entries(self):
        capability = ModelCapabilities(
            provider=ProviderType.AZURE,
            model_name="gpt-4o",
            friendly_name="Azure GPT-4o Registry",
            context_window=500_000,
            max_output_tokens=128_000,
        )
        return {"gpt-4o": {"deployment": "registry-deployment", "capability": capability}}

    monkeypatch.setattr(AzureOpenAIProvider, "_load_registry_entries", fake_registry_entries)

    provider = AzureOpenAIProvider(
        api_key="key",
        azure_endpoint="https://example.openai.azure.com/",
    )

    # Capability should come from registry
    caps = provider.get_capabilities("gpt-4o")
    assert caps.friendly_name == "Azure GPT-4o Registry"
    assert caps.context_window == 500_000

    # API call should use deployment defined in registry
    provider.generate_content("hello", "gpt-4o")
    assert dummy_azure_client["request_kwargs"]["model"] == "registry-deployment"

```

--------------------------------------------------------------------------------
/tests/test_openai_compatible_token_usage.py:
--------------------------------------------------------------------------------

```python
"""Tests for OpenAI-compatible provider token usage extraction."""

import unittest
from unittest.mock import Mock

from providers.openai_compatible import OpenAICompatibleProvider


class TestOpenAICompatibleTokenUsage(unittest.TestCase):
    """Test OpenAI-compatible provider token usage handling."""

    def setUp(self):
        """Set up test fixtures."""

        # Create a concrete implementation for testing
        class TestProvider(OpenAICompatibleProvider):
            FRIENDLY_NAME = "Test"
            MODEL_CAPABILITIES = {"test-model": {"context_window": 4096}}

            def get_capabilities(self, model_name):
                return Mock()

            def get_provider_type(self):
                return Mock()

            def validate_model_name(self, model_name):
                return True

            def list_models(self, **kwargs):
                return ["test-model"]

        self.provider = TestProvider("test-key")

    def test_extract_usage_with_valid_tokens(self):
        """Test token extraction with valid token counts."""
        response = Mock()
        response.usage = Mock()
        response.usage.prompt_tokens = 100
        response.usage.completion_tokens = 50
        response.usage.total_tokens = 150

        usage = self.provider._extract_usage(response)

        self.assertEqual(usage["input_tokens"], 100)
        self.assertEqual(usage["output_tokens"], 50)
        self.assertEqual(usage["total_tokens"], 150)

    def test_extract_usage_with_none_prompt_tokens(self):
        """Test token extraction when prompt_tokens is None (regression test for bug)."""
        response = Mock()
        response.usage = Mock()
        response.usage.prompt_tokens = None  # This was causing crashes
        response.usage.completion_tokens = 50
        response.usage.total_tokens = None

        usage = self.provider._extract_usage(response)

        # Should default to 0 when None
        self.assertEqual(usage["input_tokens"], 0)
        self.assertEqual(usage["output_tokens"], 50)
        self.assertEqual(usage["total_tokens"], 0)

    def test_extract_usage_with_none_completion_tokens(self):
        """Test token extraction when completion_tokens is None (regression test for bug)."""
        response = Mock()
        response.usage = Mock()
        response.usage.prompt_tokens = 100
        response.usage.completion_tokens = None  # This was causing crashes
        response.usage.total_tokens = None

        usage = self.provider._extract_usage(response)

        self.assertEqual(usage["input_tokens"], 100)
        # Should default to 0 when None
        self.assertEqual(usage["output_tokens"], 0)
        self.assertEqual(usage["total_tokens"], 0)

    def test_extract_usage_with_all_none_tokens(self):
        """Test token extraction when all token counts are None."""
        response = Mock()
        response.usage = Mock()
        response.usage.prompt_tokens = None
        response.usage.completion_tokens = None
        response.usage.total_tokens = None

        usage = self.provider._extract_usage(response)

        # Should default to 0 for all when None
        self.assertEqual(usage["input_tokens"], 0)
        self.assertEqual(usage["output_tokens"], 0)
        self.assertEqual(usage["total_tokens"], 0)

    def test_extract_usage_without_usage(self):
        """Test token extraction when response has no usage."""
        response = Mock(spec=[])  # No usage attribute

        usage = self.provider._extract_usage(response)

        # Should return empty dict
        self.assertEqual(usage, {})

    def test_extract_usage_with_zero_tokens(self):
        """Test token extraction with zero token counts."""
        response = Mock()
        response.usage = Mock()
        response.usage.prompt_tokens = 0
        response.usage.completion_tokens = 0
        response.usage.total_tokens = 0

        usage = self.provider._extract_usage(response)

        self.assertEqual(usage["input_tokens"], 0)
        self.assertEqual(usage["output_tokens"], 0)
        self.assertEqual(usage["total_tokens"], 0)

    def test_alternative_token_format_with_none(self):
        """Test alternative token format (input_tokens/output_tokens) with None values."""
        # This tests the other code path in generate_content_openai_responses
        # Simulate a response with input_tokens/output_tokens attributes that could be None
        response = Mock()
        response.input_tokens = None  # This was causing crashes
        response.output_tokens = 50

        # Test the pattern: getattr(response, "input_tokens", 0) or 0
        input_tokens = getattr(response, "input_tokens", 0) or 0
        output_tokens = getattr(response, "output_tokens", 0) or 0

        # Should not crash and should handle None gracefully
        self.assertEqual(input_tokens, 0)
        self.assertEqual(output_tokens, 50)

        # Test that addition works
        total = input_tokens + output_tokens
        self.assertEqual(total, 50)


if __name__ == "__main__":
    unittest.main()

```

--------------------------------------------------------------------------------
/tests/test_o3_pro_output_text_fix.py:
--------------------------------------------------------------------------------

```python
"""
Tests for o3-pro output_text parsing fix using HTTP transport recording.

This test validates the fix that uses `response.output_text` convenience field
instead of manually parsing `response.output.content[].text`.

Uses HTTP transport recorder to record real o3-pro API responses at the HTTP level while allowing
the OpenAI SDK to create real response objects that we can test.

RECORDING: To record new responses, delete the cassette file and run with real API keys.
"""

import logging
import os
import tempfile
from pathlib import Path
from unittest.mock import patch

import pytest
from dotenv import load_dotenv

from providers import ModelProviderRegistry
from tests.transport_helpers import inject_transport
from tools.chat import ChatTool

logger = logging.getLogger(__name__)

# Load environment variables from .env file
load_dotenv()

# Use absolute path for cassette directory
cassette_dir = Path(__file__).parent / "openai_cassettes"
cassette_dir.mkdir(exist_ok=True)


@pytest.mark.asyncio
class TestO3ProOutputTextFix:
    """Test o3-pro response parsing fix using respx for HTTP recording/replay."""

    def setup_method(self):
        """Set up the test by ensuring clean registry state."""
        # Use the new public API for registry cleanup
        ModelProviderRegistry.reset_for_testing()
        # Provider registration is now handled by inject_transport helper

        # Clear restriction service to ensure it re-reads environment
        # This is necessary because previous tests may have set restrictions
        # that are cached in the singleton
        import utils.model_restrictions

        utils.model_restrictions._restriction_service = None

    def teardown_method(self):
        """Clean up after test to ensure no state pollution."""
        # Use the new public API for registry cleanup
        ModelProviderRegistry.reset_for_testing()

    @pytest.mark.no_mock_provider  # Disable provider mocking for this test
    @patch.dict(os.environ, {"OPENAI_ALLOWED_MODELS": "o3-pro", "LOCALE": ""})
    async def test_o3_pro_uses_output_text_field(self, monkeypatch):
        """Test that o3-pro parsing uses the output_text convenience field via ChatTool."""
        cassette_path = cassette_dir / "o3_pro_basic_math.json"

        # Check if we need to record or replay
        if not cassette_path.exists():
            # Recording mode - check for real API key
            real_api_key = os.getenv("OPENAI_API_KEY", "").strip()
            if not real_api_key or real_api_key.startswith("dummy"):
                pytest.fail(
                    f"Cassette file not found at {cassette_path}. "
                    "To record: Set OPENAI_API_KEY environment variable to a valid key and run this test. "
                    "Note: Recording will make a real API call to OpenAI."
                )
            # Real API key is available, we'll record the cassette
            logger.debug("🎬 Recording mode: Using real API key to record cassette")
        else:
            # Replay mode - use dummy key
            monkeypatch.setenv("OPENAI_API_KEY", "dummy-key-for-replay")
            logger.debug("📼 Replay mode: Using recorded cassette")

        # Simplified transport injection - just one line!
        inject_transport(monkeypatch, cassette_path)

        # Execute ChatTool test with custom transport
        result = await self._execute_chat_tool_test()

        # Verify the response works correctly
        self._verify_chat_tool_response(result)

        # Verify cassette exists
        assert cassette_path.exists()

    async def _execute_chat_tool_test(self):
        """Execute the ChatTool with o3-pro and return the result."""
        chat_tool = ChatTool()
        with tempfile.TemporaryDirectory() as workdir:
            arguments = {
                "prompt": "What is 2 + 2?",
                "model": "o3-pro",
                "temperature": 1.0,
                "working_directory_absolute_path": workdir,
            }

            return await chat_tool.execute(arguments)

    def _verify_chat_tool_response(self, result):
        """Verify the ChatTool response contains expected data."""
        # Basic response validation
        assert result is not None
        assert isinstance(result, list)
        assert len(result) > 0
        assert result[0].type == "text"

        # Parse JSON response
        import json

        response_data = json.loads(result[0].text)

        # Debug log the response
        logger.debug(f"Response data: {json.dumps(response_data, indent=2)}")

        # Verify response structure - no cargo culting
        if response_data["status"] == "error":
            pytest.fail(f"Chat tool returned error: {response_data.get('error', 'Unknown error')}")
        assert response_data["status"] in ["success", "continuation_available"]
        assert "4" in response_data["content"]

        # Verify o3-pro was actually used
        metadata = response_data["metadata"]
        assert metadata["model_used"] == "o3-pro"
        assert metadata["provider_used"] == "openai"

```

--------------------------------------------------------------------------------
/tools/shared/schema_builders.py:
--------------------------------------------------------------------------------

```python
"""
Core schema building functionality for Zen MCP tools.

This module provides base schema generation functionality for simple tools.
Workflow-specific schema building is located in workflow/schema_builders.py
to maintain proper separation of concerns.
"""

from typing import Any

from .base_models import COMMON_FIELD_DESCRIPTIONS


class SchemaBuilder:
    """
    Base schema builder for simple MCP tools.

    This class provides static methods to build consistent schemas for simple tools.
    Workflow tools use WorkflowSchemaBuilder in workflow/schema_builders.py.
    """

    # Common field schemas that can be reused across all tool types
    COMMON_FIELD_SCHEMAS = {
        "temperature": {
            "type": "number",
            "description": COMMON_FIELD_DESCRIPTIONS["temperature"],
            "minimum": 0.0,
            "maximum": 1.0,
        },
        "thinking_mode": {
            "type": "string",
            "enum": ["minimal", "low", "medium", "high", "max"],
            "description": COMMON_FIELD_DESCRIPTIONS["thinking_mode"],
        },
        "continuation_id": {
            "type": "string",
            "description": COMMON_FIELD_DESCRIPTIONS["continuation_id"],
        },
        "images": {
            "type": "array",
            "items": {"type": "string"},
            "description": COMMON_FIELD_DESCRIPTIONS["images"],
        },
    }

    # Simple tool-specific field schemas (workflow tools use relevant_files instead)
    SIMPLE_FIELD_SCHEMAS = {
        "absolute_file_paths": {
            "type": "array",
            "items": {"type": "string"},
            "description": COMMON_FIELD_DESCRIPTIONS["absolute_file_paths"],
        },
    }

    @staticmethod
    def build_schema(
        tool_specific_fields: dict[str, dict[str, Any]] = None,
        required_fields: list[str] = None,
        model_field_schema: dict[str, Any] = None,
        auto_mode: bool = False,
        require_model: bool = False,
    ) -> dict[str, Any]:
        """
        Build complete schema for simple tools.

        Args:
            tool_specific_fields: Additional fields specific to the tool
            required_fields: List of required field names
            model_field_schema: Schema for the model field
            auto_mode: Whether the tool is in auto mode (affects model requirement)

        Returns:
            Complete JSON schema for the tool
        """
        properties = {}

        # Add common fields (temperature, thinking_mode, etc.)
        properties.update(SchemaBuilder.COMMON_FIELD_SCHEMAS)

        # Add simple tool-specific fields (files field for simple tools)
        properties.update(SchemaBuilder.SIMPLE_FIELD_SCHEMAS)

        # Add model field if provided
        if model_field_schema:
            properties["model"] = model_field_schema

        # Add tool-specific fields if provided
        if tool_specific_fields:
            properties.update(tool_specific_fields)

        # Build required fields list
        required = list(required_fields) if required_fields else []
        if (auto_mode or require_model) and "model" not in required:
            required.append("model")

        # Build the complete schema
        schema = {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "type": "object",
            "properties": properties,
            "additionalProperties": False,
        }

        if required:
            schema["required"] = required

        return schema

    @staticmethod
    def get_common_fields() -> dict[str, dict[str, Any]]:
        """Get the standard field schemas for simple tools."""
        return SchemaBuilder.COMMON_FIELD_SCHEMAS.copy()

    @staticmethod
    def create_field_schema(
        field_type: str,
        description: str,
        enum_values: list[str] = None,
        minimum: float = None,
        maximum: float = None,
        items_type: str = None,
        default: Any = None,
    ) -> dict[str, Any]:
        """
        Helper method to create field schemas with common patterns.

        Args:
            field_type: JSON schema type ("string", "number", "array", etc.)
            description: Human-readable description of the field
            enum_values: For enum fields, list of allowed values
            minimum: For numeric fields, minimum value
            maximum: For numeric fields, maximum value
            items_type: For array fields, type of array items
            default: Default value for the field

        Returns:
            JSON schema object for the field
        """
        schema = {
            "type": field_type,
            "description": description,
        }

        if enum_values:
            schema["enum"] = enum_values

        if minimum is not None:
            schema["minimum"] = minimum

        if maximum is not None:
            schema["maximum"] = maximum

        if items_type and field_type == "array":
            schema["items"] = {"type": items_type}

        if default is not None:
            schema["default"] = default

        return schema

```

--------------------------------------------------------------------------------
/docs/testing.md:
--------------------------------------------------------------------------------

```markdown
# Testing Guide

This project includes comprehensive test coverage through unit tests and integration simulator tests.

## Running Tests

### Prerequisites
- Environment set up: `./run-server.sh`
  - Use `./run-server.sh -f` to automatically follow logs after starting

### Unit Tests

Run all unit tests with pytest:
```bash
# Run all tests with verbose output
python -m pytest -xvs

# Run specific test file
python -m pytest tests/test_providers.py -xvs
```

### Simulator Tests

Simulator tests replicate real-world Claude CLI interactions with the standalone MCP server. Unlike unit tests that test isolated functions, simulator tests validate the complete end-to-end flow including:
- Actual MCP protocol communication
- Standalone server interactions
- Multi-turn conversations across tools
- Log output validation

**Important**: Simulator tests require `LOG_LEVEL=DEBUG` in your `.env` file to validate detailed execution logs.

#### Monitoring Logs During Tests

**Important**: The MCP stdio protocol interferes with stderr output during tool execution. Tool execution logs are written to local log files. This is a known limitation of the stdio-based MCP protocol.

To monitor logs during test execution:

```bash
# Start server and automatically follow logs
./run-server.sh -f

# Or manually monitor main server logs (includes all tool execution details)
tail -f -n 500 logs/mcp_server.log

# Monitor MCP activity logs (tool calls and completions)  
tail -f logs/mcp_activity.log

# Check log file sizes (logs rotate at 20MB)
ls -lh logs/mcp_*.log*
```

**Log Rotation**: All log files are configured with automatic rotation at 20MB to prevent disk space issues. The server keeps:
- 10 rotated files for mcp_server.log (200MB total)
- 5 rotated files for mcp_activity.log (100MB total)

**Why logs appear in files**: The MCP stdio_server captures stderr during tool execution to prevent interference with the JSON-RPC protocol communication. This means tool execution logs are written to files rather than displayed in console output.

#### Running All Simulator Tests
```bash
# Run all simulator tests
python communication_simulator_test.py

# Run with verbose output for debugging
python communication_simulator_test.py --verbose

# Keep server logs after tests for inspection
python communication_simulator_test.py --keep-logs
```

#### Running Individual Tests
To run a single simulator test in isolation (useful for debugging or test development):

```bash
# Run a specific test by name
python communication_simulator_test.py --individual basic_conversation

# Examples of available tests:
python communication_simulator_test.py --individual content_validation
python communication_simulator_test.py --individual cross_tool_continuation
python communication_simulator_test.py --individual memory_validation
```

#### Other Options
```bash
# List all available simulator tests with descriptions
python communication_simulator_test.py --list-tests

# Run multiple specific tests (not all)
python communication_simulator_test.py --tests basic_conversation content_validation

```

### Code Quality Checks

Before committing, ensure all linting passes:
```bash
# Run all linting checks
ruff check .
black --check .
isort --check-only .

# Auto-fix issues
ruff check . --fix
black .
isort .
```

## What Each Test Suite Covers

### Unit Tests
Test isolated components and functions:
- **Provider functionality**: Model initialization, API interactions, capability checks
- **Tool operations**: All MCP tools (chat, analyze, debug, etc.)
- **Conversation memory**: Threading, continuation, history management
- **File handling**: Path validation, token limits, deduplication
- **Auto mode**: Model selection logic and fallback behavior

### HTTP Recording/Replay Tests (HTTP Transport Recorder)
Tests for expensive API calls (like o3-pro) use custom recording/replay:
- **Real API validation**: Tests against actual provider responses
- **Cost efficiency**: Record once, replay forever
- **Provider compatibility**: Validates fixes against real APIs
- Uses HTTP Transport Recorder for httpx-based API calls
- See [HTTP Recording/Replay Testing Guide](./vcr-testing.md) for details

### Simulator Tests
Validate real-world usage scenarios by simulating actual Claude prompts:
- **Basic conversations**: Multi-turn chat functionality with real prompts
- **Cross-tool continuation**: Context preservation across different tools
- **File deduplication**: Efficient handling of repeated file references
- **Model selection**: Proper routing to configured providers
- **Token allocation**: Context window management in practice
- **Redis validation**: Conversation persistence and retrieval

## Contributing

For detailed contribution guidelines, testing requirements, and code quality standards, please see our [Contributing Guide](./contributions.md).

### Quick Testing Reference

```bash
# Run quality checks
./code_quality_checks.sh

# Run unit tests
python -m pytest -xvs

# Run simulator tests (for tool changes)
python communication_simulator_test.py
```

Remember: All tests must pass before submitting a PR. See the [Contributing Guide](./contributions.md) for complete requirements.
```

--------------------------------------------------------------------------------
/clink/parsers/claude.py:
--------------------------------------------------------------------------------

```python
"""Parser for Claude CLI JSON output."""

from __future__ import annotations

import json
from typing import Any

from .base import BaseParser, ParsedCLIResponse, ParserError


class ClaudeJSONParser(BaseParser):
    """Parse stdout produced by `claude --output-format json`."""

    name = "claude_json"

    def parse(self, stdout: str, stderr: str) -> ParsedCLIResponse:
        if not stdout.strip():
            raise ParserError("Claude CLI returned empty stdout while JSON output was expected")

        try:
            loaded = json.loads(stdout)
        except json.JSONDecodeError as exc:  # pragma: no cover - defensive logging
            raise ParserError(f"Failed to decode Claude CLI JSON output: {exc}") from exc

        events: list[dict[str, Any]] | None = None
        assistant_entry: dict[str, Any] | None = None

        if isinstance(loaded, dict):
            payload: dict[str, Any] = loaded
        elif isinstance(loaded, list):
            events = [item for item in loaded if isinstance(item, dict)]
            result_entry = next(
                (item for item in events if item.get("type") == "result" or "result" in item),
                None,
            )
            assistant_entry = next(
                (item for item in reversed(events) if item.get("type") == "assistant"),
                None,
            )
            payload = result_entry or assistant_entry or (events[-1] if events else {})
            if not payload:
                raise ParserError("Claude CLI JSON array did not contain any parsable objects")
        else:
            raise ParserError("Claude CLI returned unexpected JSON payload")

        metadata = self._build_metadata(payload, stderr)
        if events is not None:
            metadata["raw_events"] = events
            metadata["raw"] = loaded

        result = payload.get("result")
        content: str = ""
        if isinstance(result, str):
            content = result.strip()
        elif isinstance(result, list):
            # Some CLI flows may emit a list of strings; join them conservatively.
            joined = [part.strip() for part in result if isinstance(part, str) and part.strip()]
            content = "\n".join(joined)

        if content:
            return ParsedCLIResponse(content=content, metadata=metadata)

        message = self._extract_message(payload)
        if message is None and assistant_entry and assistant_entry is not payload:
            message = self._extract_message(assistant_entry)
        if message:
            return ParsedCLIResponse(content=message, metadata=metadata)

        stderr_text = stderr.strip()
        if stderr_text:
            metadata.setdefault("stderr", stderr_text)
            return ParsedCLIResponse(
                content="Claude CLI returned no textual result. Raw stderr was preserved for troubleshooting.",
                metadata=metadata,
            )

        raise ParserError("Claude CLI response did not contain a textual result")

    def _build_metadata(self, payload: dict[str, Any], stderr: str) -> dict[str, Any]:
        metadata: dict[str, Any] = {
            "raw": payload,
            "is_error": bool(payload.get("is_error")),
        }

        type_field = payload.get("type")
        if isinstance(type_field, str):
            metadata["type"] = type_field
        subtype_field = payload.get("subtype")
        if isinstance(subtype_field, str):
            metadata["subtype"] = subtype_field

        duration_ms = payload.get("duration_ms")
        if isinstance(duration_ms, (int, float)):
            metadata["duration_ms"] = duration_ms
        api_duration = payload.get("duration_api_ms")
        if isinstance(api_duration, (int, float)):
            metadata["duration_api_ms"] = api_duration

        usage = payload.get("usage")
        if isinstance(usage, dict):
            metadata["usage"] = usage

        model_usage = payload.get("modelUsage")
        if isinstance(model_usage, dict) and model_usage:
            metadata["model_usage"] = model_usage
            first_model = next(iter(model_usage.keys()))
            metadata["model_used"] = first_model

        permission_denials = payload.get("permission_denials")
        if isinstance(permission_denials, list) and permission_denials:
            metadata["permission_denials"] = permission_denials

        session_id = payload.get("session_id")
        if isinstance(session_id, str) and session_id:
            metadata["session_id"] = session_id
        uuid_field = payload.get("uuid")
        if isinstance(uuid_field, str) and uuid_field:
            metadata["uuid"] = uuid_field

        stderr_text = stderr.strip()
        if stderr_text:
            metadata.setdefault("stderr", stderr_text)

        return metadata

    def _extract_message(self, payload: dict[str, Any]) -> str | None:
        message = payload.get("message")
        if isinstance(message, str) and message.strip():
            return message.strip()

        error_field = payload.get("error")
        if isinstance(error_field, dict):
            error_message = error_field.get("message")
            if isinstance(error_message, str) and error_message.strip():
                return error_message.strip()

        return None

```

--------------------------------------------------------------------------------
/docs/locale-configuration.md:
--------------------------------------------------------------------------------

```markdown
# Locale Configuration for Zen MCP Server

This guide explains how to configure and use the localization feature to customize the language of responses from MCP tools.

## Overview

The localization feature allows you to specify the language in which MCP tools should respond, while maintaining their analytical capabilities. This is especially useful for non-English speakers who want to receive answers in their native language.

## Configuration

### 1. Environment Variable

Set the language using the `LOCALE` environment variable in your `.env` file:

```bash
# In your .env file
LOCALE=fr-FR
```

### 2. Supported Languages

You can use any standard language code. Examples:

- `fr-FR` - French (France)
- `en-US` - English (United States)
- `zh-CN` - Chinese (Simplified)
- `zh-TW` - Chinese (Traditional)
- `ja-JP` - Japanese
- `ko-KR` - Korean
- `es-ES` - Spanish (Spain)
- `de-DE` - German (Germany)
- `it-IT` - Italian (Italy)
- `pt-PT` - Portuguese (Portugal)
- `ru-RU` - Russian (Russia)
- `ar-SA` - Arabic (Saudi Arabia)

### 3. Default Behavior

If no language is specified (`LOCALE` is empty or unset), tools will default to English.

## Technical Implementation

### Architecture

Localization is implemented in the `BaseTool` class in `tools/shared/base_tool.py`. All tools inherit this feature automatically.

### `get_language_instruction()` Method

```python
def get_language_instruction(self) -> str:
    """
    Generate language instruction based on LOCALE configuration.
    Returns:
        str: Language instruction to prepend to prompt, or empty string if no locale set
    """
    import os

    locale = os.getenv("LOCALE", "").strip()

    if not locale:
        return ""

    return f"Always respond in {locale}.\n\n"
```

### Integration in Tool Execution

The language instruction is automatically prepended to the system prompt of each tool:

```python
# In tools/simple/base.py
base_system_prompt = self.get_system_prompt()
language_instruction = self.get_language_instruction()
system_prompt = language_instruction + base_system_prompt
```

## Usage

### 1. Basic Setup

1. Edit your `.env` file:
   ```bash
   LOCALE=fr-FR
   ```
2. Restart the MCP server:
   ```bash
   ./run-server.sh
   ```
3. Use any tool – responses will be in the specified language.

### 2. Example

**Before (default English):**
```
Tool: chat
Input: "Explain how to use Python dictionaries"
Output: "Python dictionaries are key-value pairs that allow you to store and organize data..."
```

**After (with LOCALE=fr-FR):**
```
Tool: chat
Input: "Explain how to use Python dictionaries"
Output: "Les dictionnaires Python sont des paires clé-valeur qui permettent de stocker et d'organiser des données..."
```

### 3. Affected Tools

All MCP tools are affected by this configuration:

- `chat` – General conversation
- `codereview` – Code review
- `analyze` – Code analysis
- `debug` – Debugging
- `refactor` – Refactoring
- `thinkdeep` – Deep thinking
- `consensus` – Model consensus
- And all other tools...

## Best Practices

### 1. Language Choice
- Use standard language codes (ISO 639-1 with ISO 3166-1 country codes)
- Be specific with regional variants if needed (e.g., `zh-CN` vs `zh-TW`)

### 2. Consistency
- Use the same language setting across your team for consistency
- Document the chosen language in your team documentation

### 3. Testing
- Test the configuration with different tools to ensure consistency

## Troubleshooting

### Issue: Language does not change
**Solution:**
1. Check that the `LOCALE` variable is correctly set in `.env`
2. Fully restart the MCP server
3. Ensure there are no extra spaces in the value

### Issue: Partially translated responses
**Explanation:**
- AI models may sometimes mix languages
- This depends on the multilingual capabilities of the model used
- Technical terms may remain in English

### Issue: Configuration errors
**Solution:**
1. Check the syntax of your `.env` file
2. Make sure there are no quotes around the value

## Advanced Customization

### Customizing the Language Instruction

To customize the language instruction, modify the `get_language_instruction()` method in `tools/shared/base_tool.py`:

```python
def get_language_instruction(self) -> str:
    import os

    locale = os.getenv("LOCALE", "").strip()

    if not locale:
        return ""
    # Custom instruction
    return f"Always respond in {locale} and use a professional tone.\n\n"
```

### Per-Tool Customization

You can also override the method in specific tools for custom behavior:

```python
class MyCustomTool(SimpleTool):
    def get_language_instruction(self) -> str:
        import os

        locale = os.getenv("LOCALE", "").strip()

        if locale == "fr-FR":
            return "Respond in French with precise technical vocabulary.\n\n"
        elif locale == "zh-CN":
            return "请用中文回答,使用专业术语。\n\n"
        else:
            return super().get_language_instruction()
```

## Integration with Other Features

Localization works with all other MCP server features:

- **Conversation threading** – Multilingual conversations are supported
- **File processing** – File analysis is in the specified language
- **Web search** – Search instructions remain functional
- **Model selection** – Works with all supported models

```

--------------------------------------------------------------------------------
/conf/gemini_models.json:
--------------------------------------------------------------------------------

```json
{
  "_README": {
    "description": "Model metadata for Google's Gemini API access.",
    "documentation": "https://github.com/BeehiveInnovations/zen-mcp-server/blob/main/docs/custom_models.md",
    "usage": "Models listed here are exposed directly through the Gemini provider. Aliases are case-insensitive.",
    "field_notes": "Matches providers/shared/model_capabilities.py.",
    "field_descriptions": {
      "model_name": "The model identifier (e.g., 'gemini-2.5-pro', 'gemini-2.0-flash')",
      "aliases": "Array of short names users can type instead of the full model name",
      "context_window": "Total number of tokens the model can process (input + output combined)",
      "max_output_tokens": "Maximum number of tokens the model can generate in a single response",
      "max_thinking_tokens": "Maximum reasoning/thinking tokens the model will allocate when extended thinking is requested",
      "supports_extended_thinking": "Whether the model supports extended reasoning tokens (currently none do via OpenRouter or custom APIs)",
      "supports_json_mode": "Whether the model can guarantee valid JSON output",
      "supports_function_calling": "Whether the model supports function/tool calling",
      "supports_images": "Whether the model can process images/visual input",
      "max_image_size_mb": "Maximum total size in MB for all images combined (capped at 40MB max for custom models)",
      "supports_temperature": "Whether the model accepts temperature parameter in API calls (set to false for O3/O4 reasoning models)",
      "temperature_constraint": "Type of temperature constraint: 'fixed' (fixed value), 'range' (continuous range), 'discrete' (specific values), or omit for default range",
      "use_openai_response_api": "Set to true when the model must use the /responses endpoint (reasoning models like GPT-5 Pro). Leave false/omit for standard chat completions.",
      "default_reasoning_effort": "Default reasoning effort level for models that support it (e.g., 'low', 'medium', 'high'). Omit if not applicable.",
      "description": "Human-readable description of the model",
      "intelligence_score": "1-20 human rating used as the primary signal for auto-mode model ordering",
      "allow_code_generation": "Whether this model can generate and suggest fully working code - complete with functions, files, and detailed implementation instructions - for your AI tool to use right away. Only set this to 'true' for a model more capable than the AI model / CLI you're currently using."
    }
  },
  "models": [
    {
      "model_name": "gemini-2.5-pro",
      "friendly_name": "Gemini (Pro 2.5)",
      "aliases": [
        "pro",
        "gemini pro",
        "gemini-pro"
      ],
      "intelligence_score": 18,
      "description": "Deep reasoning + thinking mode (1M context) - Complex problems, architecture, deep analysis",
      "context_window": 1048576,
      "max_output_tokens": 65536,
      "max_thinking_tokens": 32768,
      "supports_extended_thinking": true,
      "supports_system_prompts": true,
      "supports_streaming": true,
      "supports_function_calling": true,
      "supports_json_mode": true,
      "supports_images": true,
      "supports_temperature": true,
      "allow_code_generation": true,
      "max_image_size_mb": 32.0
    },
    {
      "model_name": "gemini-2.0-flash",
      "friendly_name": "Gemini (Flash 2.0)",
      "aliases": [
        "flash-2.0",
        "flash2"
      ],
      "intelligence_score": 9,
      "description": "Gemini 2.0 Flash (1M context) - Latest fast model with experimental thinking, supports audio/video input",
      "context_window": 1048576,
      "max_output_tokens": 65536,
      "max_thinking_tokens": 24576,
      "supports_extended_thinking": true,
      "supports_system_prompts": true,
      "supports_streaming": true,
      "supports_function_calling": true,
      "supports_json_mode": true,
      "supports_images": true,
      "supports_temperature": true,
      "max_image_size_mb": 20.0
    },
    {
      "model_name": "gemini-2.0-flash-lite",
      "friendly_name": "Gemini (Flash Lite 2.0)",
      "aliases": [
        "flashlite",
        "flash-lite"
      ],
      "intelligence_score": 7,
      "description": "Gemini 2.0 Flash Lite (1M context) - Lightweight fast model, text-only",
      "context_window": 1048576,
      "max_output_tokens": 65536,
      "supports_extended_thinking": false,
      "supports_system_prompts": true,
      "supports_streaming": true,
      "supports_function_calling": true,
      "supports_json_mode": true,
      "supports_images": false,
      "supports_temperature": true
    },
    {
      "model_name": "gemini-2.5-flash",
      "friendly_name": "Gemini (Flash 2.5)",
      "aliases": [
        "flash",
        "flash2.5"
      ],
      "intelligence_score": 10,
      "description": "Ultra-fast (1M context) - Quick analysis, simple queries, rapid iterations",
      "context_window": 1048576,
      "max_output_tokens": 65536,
      "max_thinking_tokens": 24576,
      "supports_extended_thinking": true,
      "supports_system_prompts": true,
      "supports_streaming": true,
      "supports_function_calling": true,
      "supports_json_mode": true,
      "supports_images": true,
      "supports_temperature": true,
      "max_image_size_mb": 20.0
    }
  ]
}

```

--------------------------------------------------------------------------------
/tests/test_uvx_resource_packaging.py:
--------------------------------------------------------------------------------

```python
"""Tests for uvx path resolution functionality."""

import json
import tempfile
from pathlib import Path
from unittest.mock import patch

from providers.registries.openrouter import OpenRouterModelRegistry


class TestUvxPathResolution:
    """Test uvx path resolution for OpenRouter model registry."""

    def test_normal_operation(self):
        """Test that normal operation works in development environment."""
        registry = OpenRouterModelRegistry()
        assert len(registry.list_models()) > 0
        assert len(registry.list_aliases()) > 0

    def test_config_path_resolution(self):
        """Test that the config path resolution finds the config file in multiple locations."""
        # Check that the config file exists in the development location
        config_file = Path(__file__).parent.parent / "conf" / "openrouter_models.json"
        assert config_file.exists(), "Config file should exist in conf/openrouter_models.json"

        # Test that a registry can find and use the config
        registry = OpenRouterModelRegistry()

        # When using resources, config_path is None; when using file system, it should exist
        if registry.use_resources:
            assert registry.config_path is None, "When using resources, config_path should be None"
        else:
            assert registry.config_path.exists(), "When using file system, config path should exist"

        assert len(registry.list_models()) > 0, "Registry should load models from config"

    def test_explicit_config_path_override(self):
        """Test that explicit config path works correctly."""
        config_path = Path(__file__).parent.parent / "conf" / "openrouter_models.json"

        registry = OpenRouterModelRegistry(config_path=str(config_path))

        # Should use the provided file path
        assert registry.config_path == config_path
        assert len(registry.list_models()) > 0

    def test_environment_variable_override(self):
        """Test that CUSTOM_MODELS_CONFIG_PATH environment variable works."""
        config_path = Path(__file__).parent.parent / "conf" / "openrouter_models.json"

        with patch.dict("os.environ", {"OPENROUTER_MODELS_CONFIG_PATH": str(config_path)}):
            registry = OpenRouterModelRegistry()

            # Should use environment path
            assert registry.config_path == config_path
            assert len(registry.list_models()) > 0

    @patch("providers.registries.base.importlib.resources.files")
    def test_multiple_path_fallback(self, mock_files):
        """Test that file-system fallback works when resource loading fails."""
        mock_files.side_effect = Exception("Resource loading failed")

        with tempfile.TemporaryDirectory() as tmpdir:
            temp_dir = Path(tmpdir)
            conf_dir = temp_dir / "conf"
            conf_dir.mkdir(parents=True, exist_ok=True)
            config_path = conf_dir / "openrouter_models.json"
            config_path.write_text(
                json.dumps(
                    {
                        "models": [
                            {
                                "model_name": "test/model",
                                "aliases": ["testalias"],
                                "context_window": 1024,
                                "max_output_tokens": 512,
                            }
                        ]
                    },
                    indent=2,
                )
            )

            original_exists = Path.exists

            def fake_exists(path_self):
                if str(path_self).endswith("conf/openrouter_models.json") and path_self != config_path:
                    return False
                if path_self == config_path:
                    return True
                return original_exists(path_self)

            with patch("pathlib.Path.cwd", return_value=temp_dir), patch("pathlib.Path.exists", fake_exists):
                registry = OpenRouterModelRegistry()

            assert not registry.use_resources
            assert registry.config_path == config_path
            assert "test/model" in registry.list_models()

    def test_missing_config_handling(self):
        """Test behavior when config file is missing."""
        # Use a non-existent path
        with patch.dict("os.environ", {}, clear=True):
            registry = OpenRouterModelRegistry(config_path="/nonexistent/path/config.json")

        # Should gracefully handle missing config
        assert len(registry.list_models()) == 0
        assert len(registry.list_aliases()) == 0

    def test_resource_loading_success(self):
        """Test successful resource loading via importlib.resources."""
        # Just test that the registry works normally in our environment
        # This validates the resource loading mechanism indirectly
        registry = OpenRouterModelRegistry()

        # Should load successfully using either resources or file system fallback
        assert len(registry.list_models()) > 0
        assert len(registry.list_aliases()) > 0

    def test_use_resources_attribute(self):
        """Test that the use_resources attribute is properly set."""
        registry = OpenRouterModelRegistry()

        # Should have the use_resources attribute
        assert hasattr(registry, "use_resources")
        assert isinstance(registry.use_resources, bool)

```

--------------------------------------------------------------------------------
/tests/test_listmodels.py:
--------------------------------------------------------------------------------

```python
"""Tests for the ListModels tool"""

import json
import os
from unittest.mock import patch

import pytest
from mcp.types import TextContent

from tools.listmodels import ListModelsTool


class TestListModelsTool:
    """Test the ListModels tool functionality"""

    @pytest.fixture
    def tool(self):
        """Create a ListModelsTool instance"""
        return ListModelsTool()

    def test_tool_metadata(self, tool):
        """Test tool has correct metadata"""
        assert tool.name == "listmodels"
        assert "model providers" in tool.description
        assert tool.get_request_model().__name__ == "ToolRequest"

    @pytest.mark.asyncio
    async def test_execute_with_no_providers(self, tool):
        """Test listing models with no providers configured"""
        with patch.dict(os.environ, {}, clear=True):
            # Set auto mode
            os.environ["DEFAULT_MODEL"] = "auto"

            result = await tool.execute({})

            assert len(result) == 1
            assert isinstance(result[0], TextContent)

            # Parse JSON response
            response = json.loads(result[0].text)
            assert response["status"] == "success"

            content = response["content"]

            # Check that providers show as not configured
            assert "Google Gemini ❌" in content
            assert "OpenAI ❌" in content
            assert "X.AI (Grok) ❌" in content
            assert "OpenRouter ❌" in content
            assert "Custom/Local API ❌" in content

            # Check summary shows 0 configured
            assert "**Configured Providers**: 0" in content

    @pytest.mark.asyncio
    async def test_execute_with_gemini_configured(self, tool):
        """Test listing models with Gemini configured"""
        env_vars = {"GEMINI_API_KEY": "test-key", "DEFAULT_MODEL": "auto"}

        with patch.dict(os.environ, env_vars, clear=True):
            result = await tool.execute({})

            response = json.loads(result[0].text)
            content = response["content"]

            # Check Gemini shows as configured
            assert "Google Gemini ✅" in content
            assert "`flash` → `gemini-2.5-flash`" in content
            assert "`pro` → `gemini-2.5-pro`" in content
            assert "1M context" in content
            assert "Supports structured code generation" in content

            # Check summary
            assert "**Configured Providers**: 1" in content

    @pytest.mark.asyncio
    async def test_execute_with_multiple_providers(self, tool):
        """Test listing models with multiple providers configured"""
        env_vars = {
            "GEMINI_API_KEY": "test-key",
            "OPENAI_API_KEY": "test-key",
            "XAI_API_KEY": "test-key",
            "DEFAULT_MODEL": "auto",
        }

        with patch.dict(os.environ, env_vars, clear=True):
            result = await tool.execute({})

            response = json.loads(result[0].text)
            content = response["content"]

            # Check all show as configured
            assert "Google Gemini ✅" in content
            assert "OpenAI ✅" in content
            assert "X.AI (Grok) ✅" in content

            # Check models are listed
            assert "`o3`" in content
            assert "`grok`" in content

            # Check summary
            assert "**Configured Providers**: 3" in content

    @pytest.mark.asyncio
    async def test_execute_with_openrouter(self, tool):
        """Test listing models with OpenRouter configured"""
        env_vars = {"OPENROUTER_API_KEY": "test-key", "DEFAULT_MODEL": "auto"}

        with patch.dict(os.environ, env_vars, clear=True):
            result = await tool.execute({})

            response = json.loads(result[0].text)
            content = response["content"]

            # Check OpenRouter shows as configured
            assert "OpenRouter ✅" in content
            assert "Access to multiple cloud AI providers" in content

            # Should show some models (mocked registry will have some)
            assert "Available Models" in content

    @pytest.mark.asyncio
    async def test_execute_with_custom_api(self, tool):
        """Test listing models with custom API configured"""
        env_vars = {"CUSTOM_API_URL": "http://localhost:11434", "DEFAULT_MODEL": "auto"}

        with patch.dict(os.environ, env_vars, clear=True):
            result = await tool.execute({})

            response = json.loads(result[0].text)
            content = response["content"]

            # Check Custom API shows as configured
            assert "Custom/Local API ✅" in content
            assert "http://localhost:11434" in content
            assert "Local models via Ollama" in content

    @pytest.mark.asyncio
    async def test_output_includes_usage_tips(self, tool):
        """Test that output includes helpful usage tips"""
        result = await tool.execute({})

        response = json.loads(result[0].text)
        content = response["content"]

        # Check for usage tips
        assert "**Usage Tips**:" in content
        assert "Use model aliases" in content
        assert "auto mode" in content

    def test_model_category(self, tool):
        """Test that tool uses FAST_RESPONSE category"""
        from tools.models import ToolModelCategory

        assert tool.get_model_category() == ToolModelCategory.FAST_RESPONSE

```

--------------------------------------------------------------------------------
/providers/openai.py:
--------------------------------------------------------------------------------

```python
"""OpenAI model provider implementation."""

import logging
from typing import TYPE_CHECKING, ClassVar, Optional

if TYPE_CHECKING:
    from tools.models import ToolModelCategory

from .openai_compatible import OpenAICompatibleProvider
from .registries.openai import OpenAIModelRegistry
from .registry_provider_mixin import RegistryBackedProviderMixin
from .shared import ModelCapabilities, ProviderType

logger = logging.getLogger(__name__)


class OpenAIModelProvider(RegistryBackedProviderMixin, OpenAICompatibleProvider):
    """Implementation that talks to api.openai.com using rich model metadata.

    In addition to the built-in catalogue, the provider can surface models
    defined in ``conf/custom_models.json`` (for organisations running their own
    OpenAI-compatible gateways) while still respecting restriction policies.
    """

    REGISTRY_CLASS = OpenAIModelRegistry
    MODEL_CAPABILITIES: ClassVar[dict[str, ModelCapabilities]] = {}

    def __init__(self, api_key: str, **kwargs):
        """Initialize OpenAI provider with API key."""
        self._ensure_registry()
        # Set default OpenAI base URL, allow override for regions/custom endpoints
        kwargs.setdefault("base_url", "https://api.openai.com/v1")
        super().__init__(api_key, **kwargs)
        self._invalidate_capability_cache()

    # ------------------------------------------------------------------
    # Capability surface
    # ------------------------------------------------------------------

    def _lookup_capabilities(
        self,
        canonical_name: str,
        requested_name: Optional[str] = None,
    ) -> Optional[ModelCapabilities]:
        """Look up OpenAI capabilities from built-ins or the custom registry."""

        self._ensure_registry()
        builtin = super()._lookup_capabilities(canonical_name, requested_name)
        if builtin is not None:
            return builtin

        try:
            from .registries.openrouter import OpenRouterModelRegistry

            registry = OpenRouterModelRegistry()
            config = registry.get_model_config(canonical_name)

            if config and config.provider == ProviderType.OPENAI:
                return config

        except Exception as exc:  # pragma: no cover - registry failures are non-critical
            logger.debug(f"Could not resolve custom OpenAI model '{canonical_name}': {exc}")

        return None

    def _finalise_capabilities(
        self,
        capabilities: ModelCapabilities,
        canonical_name: str,
        requested_name: str,
    ) -> ModelCapabilities:
        """Ensure registry-sourced models report the correct provider type."""

        if capabilities.provider != ProviderType.OPENAI:
            capabilities.provider = ProviderType.OPENAI
        return capabilities

    def _raise_unsupported_model(self, model_name: str) -> None:
        raise ValueError(f"Unsupported OpenAI model: {model_name}")

    # ------------------------------------------------------------------
    # Provider identity
    # ------------------------------------------------------------------

    def get_provider_type(self) -> ProviderType:
        """Get the provider type."""
        return ProviderType.OPENAI

    # ------------------------------------------------------------------
    # Provider preferences
    # ------------------------------------------------------------------

    def get_preferred_model(self, category: "ToolModelCategory", allowed_models: list[str]) -> Optional[str]:
        """Get OpenAI's preferred model for a given category from allowed models.

        Args:
            category: The tool category requiring a model
            allowed_models: Pre-filtered list of models allowed by restrictions

        Returns:
            Preferred model name or None
        """
        from tools.models import ToolModelCategory

        if not allowed_models:
            return None

        # Helper to find first available from preference list
        def find_first(preferences: list[str]) -> Optional[str]:
            """Return first available model from preference list."""
            for model in preferences:
                if model in allowed_models:
                    return model
            return None

        if category == ToolModelCategory.EXTENDED_REASONING:
            # Prefer models with extended thinking support
            # GPT-5-Codex first for coding tasks
            preferred = find_first(["gpt-5-codex", "gpt-5-pro", "o3", "o3-pro", "gpt-5"])
            return preferred if preferred else allowed_models[0]

        elif category == ToolModelCategory.FAST_RESPONSE:
            # Prefer fast, cost-efficient models
            # GPT-5 models for speed, GPT-5-Codex after (premium pricing but cached)
            preferred = find_first(["gpt-5", "gpt-5-mini", "gpt-5-codex", "o4-mini", "o3-mini"])
            return preferred if preferred else allowed_models[0]

        else:  # BALANCED or default
            # Prefer balanced performance/cost models
            # Include GPT-5-Codex for coding workflows
            preferred = find_first(["gpt-5", "gpt-5-codex", "gpt-5-pro", "gpt-5-mini", "o4-mini", "o3-mini"])
            return preferred if preferred else allowed_models[0]


# Load registry data at import time so dependent providers (Azure) can reuse it
OpenAIModelProvider._ensure_registry()

```

--------------------------------------------------------------------------------
/simulator_tests/__init__.py:
--------------------------------------------------------------------------------

```python
"""
Communication Simulator Tests Package

This package contains individual test modules for the Zen MCP Communication Simulator.
Each test is in its own file for better organization and maintainability.
"""

from .base_test import BaseSimulatorTest
from .test_analyze_validation import AnalyzeValidationTest
from .test_basic_conversation import BasicConversationTest
from .test_chat_simple_validation import ChatSimpleValidationTest
from .test_codereview_validation import CodeReviewValidationTest
from .test_consensus_conversation import TestConsensusConversation
from .test_consensus_three_models import TestConsensusThreeModels
from .test_consensus_workflow_accurate import TestConsensusWorkflowAccurate
from .test_content_validation import ContentValidationTest
from .test_conversation_chain_validation import ConversationChainValidationTest
from .test_cross_tool_comprehensive import CrossToolComprehensiveTest
from .test_cross_tool_continuation import CrossToolContinuationTest
from .test_debug_certain_confidence import DebugCertainConfidenceTest
from .test_debug_validation import DebugValidationTest
from .test_line_number_validation import LineNumberValidationTest
from .test_logs_validation import LogsValidationTest
from .test_model_thinking_config import TestModelThinkingConfig
from .test_o3_model_selection import O3ModelSelectionTest
from .test_o3_pro_expensive import O3ProExpensiveTest
from .test_ollama_custom_url import OllamaCustomUrlTest
from .test_openrouter_fallback import OpenRouterFallbackTest
from .test_openrouter_models import OpenRouterModelsTest
from .test_per_tool_deduplication import PerToolDeduplicationTest
from .test_planner_continuation_history import PlannerContinuationHistoryTest
from .test_planner_validation import PlannerValidationTest
from .test_precommitworkflow_validation import PrecommitWorkflowValidationTest
from .test_prompt_size_limit_bug import PromptSizeLimitBugTest

# Redis validation test removed - no longer needed for standalone server
from .test_refactor_validation import RefactorValidationTest
from .test_secaudit_validation import SecauditValidationTest
from .test_testgen_validation import TestGenValidationTest
from .test_thinkdeep_validation import ThinkDeepWorkflowValidationTest
from .test_token_allocation_validation import TokenAllocationValidationTest
from .test_vision_capability import VisionCapabilityTest
from .test_xai_models import XAIModelsTest

# Test registry for dynamic loading
TEST_REGISTRY = {
    "basic_conversation": BasicConversationTest,
    "chat_validation": ChatSimpleValidationTest,
    "codereview_validation": CodeReviewValidationTest,
    "content_validation": ContentValidationTest,
    "per_tool_deduplication": PerToolDeduplicationTest,
    "cross_tool_continuation": CrossToolContinuationTest,
    "cross_tool_comprehensive": CrossToolComprehensiveTest,
    "line_number_validation": LineNumberValidationTest,
    "logs_validation": LogsValidationTest,
    # "redis_validation": RedisValidationTest,  # Removed - no longer needed for standalone server
    "model_thinking_config": TestModelThinkingConfig,
    "o3_model_selection": O3ModelSelectionTest,
    "ollama_custom_url": OllamaCustomUrlTest,
    "openrouter_fallback": OpenRouterFallbackTest,
    "openrouter_models": OpenRouterModelsTest,
    "planner_validation": PlannerValidationTest,
    "planner_continuation_history": PlannerContinuationHistoryTest,
    "precommit_validation": PrecommitWorkflowValidationTest,
    "token_allocation_validation": TokenAllocationValidationTest,
    "testgen_validation": TestGenValidationTest,
    "thinkdeep_validation": ThinkDeepWorkflowValidationTest,
    "refactor_validation": RefactorValidationTest,
    "secaudit_validation": SecauditValidationTest,
    "debug_validation": DebugValidationTest,
    "debug_certain_confidence": DebugCertainConfidenceTest,
    "conversation_chain_validation": ConversationChainValidationTest,
    "vision_capability": VisionCapabilityTest,
    "xai_models": XAIModelsTest,
    "consensus_conversation": TestConsensusConversation,
    "consensus_workflow_accurate": TestConsensusWorkflowAccurate,
    "consensus_three_models": TestConsensusThreeModels,
    "analyze_validation": AnalyzeValidationTest,
    "prompt_size_limit_bug": PromptSizeLimitBugTest,
    # "o3_pro_expensive": O3ProExpensiveTest,  # COMMENTED OUT - too expensive to run by default
}

__all__ = [
    "BaseSimulatorTest",
    "BasicConversationTest",
    "ChatSimpleValidationTest",
    "CodeReviewValidationTest",
    "ContentValidationTest",
    "PerToolDeduplicationTest",
    "CrossToolContinuationTest",
    "CrossToolComprehensiveTest",
    "LineNumberValidationTest",
    "LogsValidationTest",
    "TestModelThinkingConfig",
    "O3ModelSelectionTest",
    "O3ProExpensiveTest",
    "OllamaCustomUrlTest",
    "OpenRouterFallbackTest",
    "OpenRouterModelsTest",
    "PlannerValidationTest",
    "PlannerContinuationHistoryTest",
    "PrecommitWorkflowValidationTest",
    "TokenAllocationValidationTest",
    "TestGenValidationTest",
    "ThinkDeepWorkflowValidationTest",
    "RefactorValidationTest",
    "SecauditValidationTest",
    "DebugValidationTest",
    "DebugCertainConfidenceTest",
    "ConversationChainValidationTest",
    "VisionCapabilityTest",
    "XAIModelsTest",
    "TestConsensusConversation",
    "TestConsensusWorkflowAccurate",
    "TestConsensusThreeModels",
    "AnalyzeValidationTest",
    "PromptSizeLimitBugTest",
    "TEST_REGISTRY",
]

```

--------------------------------------------------------------------------------
/tests/test_model_resolution_bug.py:
--------------------------------------------------------------------------------

```python
"""
Test to reproduce and fix the OpenRouter model name resolution bug.

This test specifically targets the bug where:
1. User specifies "gemini" in consensus tool
2. System incorrectly resolves to "gemini-2.5-pro" instead of "google/gemini-2.5-pro"
3. OpenRouter API returns "gemini-2.5-pro is not a valid model ID"
"""

from unittest.mock import Mock, patch

from providers.openrouter import OpenRouterProvider
from providers.shared import ProviderType
from tools.consensus import ConsensusTool


class TestModelResolutionBug:
    """Test cases for the OpenRouter model name resolution bug."""

    def setup_method(self):
        """Setup test environment."""
        self.consensus_tool = ConsensusTool()

    def test_openrouter_registry_resolves_gemini_alias(self):
        """Test that OpenRouter registry properly resolves 'gemini' to 'google/gemini-2.5-pro'."""
        # Test the registry directly
        provider = OpenRouterProvider("test_key")

        # Test alias resolution
        resolved_model_name = provider._resolve_model_name("gemini")
        assert (
            resolved_model_name == "google/gemini-2.5-pro"
        ), f"Expected 'google/gemini-2.5-pro', got '{resolved_model_name}'"

        # Test that it also works with 'pro' alias
        resolved_pro = provider._resolve_model_name("pro")
        assert resolved_pro == "google/gemini-2.5-pro", f"Expected 'google/gemini-2.5-pro', got '{resolved_pro}'"

    # DELETED: test_provider_registry_returns_openrouter_for_gemini
    # This test had a flawed mock setup - it mocked get_provider() but called get_provider_for_model().
    # The test was trying to verify OpenRouter model resolution functionality that is already
    # comprehensively tested in working OpenRouter provider tests.

    @patch.dict("os.environ", {"OPENROUTER_API_KEY": "test_key"}, clear=False)
    def test_consensus_tool_model_resolution_bug_reproduction(self):
        """Test that the new consensus workflow tool properly handles OpenRouter model resolution."""
        import asyncio

        # Create a mock OpenRouter provider that tracks what model names it receives
        mock_provider = Mock(spec=OpenRouterProvider)
        mock_provider.get_provider_type.return_value = ProviderType.OPENROUTER

        # Mock response for successful generation
        mock_response = Mock()
        mock_response.content = "Test response"
        mock_response.usage = None
        mock_provider.generate_content.return_value = mock_response

        # Track the model name passed to generate_content
        received_model_names = []

        def track_generate_content(*args, **kwargs):
            received_model_names.append(kwargs.get("model_name", args[1] if len(args) > 1 else "unknown"))
            return mock_response

        mock_provider.generate_content.side_effect = track_generate_content

        # Mock the get_model_provider to return our mock
        with patch.object(self.consensus_tool, "get_model_provider", return_value=mock_provider):
            # Set initial prompt
            self.consensus_tool.initial_prompt = "Test prompt"

            # Create a mock request
            request = Mock()
            request.relevant_files = []
            request.continuation_id = None
            request.images = None

            # Test model consultation directly
            result = asyncio.run(self.consensus_tool._consult_model({"model": "gemini", "stance": "neutral"}, request))

            # Verify that generate_content was called
            assert len(received_model_names) == 1

            # The consensus tool should pass the original alias "gemini"
            # The OpenRouter provider should resolve it internally
            received_model = received_model_names[0]
            print(f"Model name passed to provider: {received_model}")

            assert received_model == "gemini", f"Expected 'gemini' to be passed to provider, got '{received_model}'"

            # Verify the result structure
            assert result["model"] == "gemini"
            assert result["status"] == "success"

    def test_bug_reproduction_with_malformed_model_name(self):
        """Test what happens when 'gemini-2.5-pro' (malformed) is passed to OpenRouter."""
        provider = OpenRouterProvider("test_key")

        # This should NOT resolve because 'gemini-2.5-pro' is not in the OpenRouter registry
        resolved = provider._resolve_model_name("gemini-2.5-pro")

        # The bug: this returns "gemini-2.5-pro" as-is instead of resolving to proper name
        # This is what causes the OpenRouter API to fail
        assert resolved == "gemini-2.5-pro", f"Expected fallback to 'gemini-2.5-pro', got '{resolved}'"

        # Verify the registry doesn't have this malformed name
        config = provider._registry.resolve("gemini-2.5-pro")
        assert config is None, "Registry should not contain 'gemini-2.5-pro' - only 'google/gemini-2.5-pro'"


if __name__ == "__main__":
    # Run the tests
    test = TestModelResolutionBug()
    test.setup_method()

    print("Testing OpenRouter registry resolution...")
    test.test_openrouter_registry_resolves_gemini_alias()
    print("✅ Registry resolves aliases correctly")

    print("\nTesting malformed model name handling...")
    test.test_bug_reproduction_with_malformed_model_name()
    print("✅ Confirmed: malformed names fall through as-is")

    print("\nConsensus tool test completed successfully.")

    print("\nAll tests completed. The bug is fixed.")

```

--------------------------------------------------------------------------------
/tests/test_pip_detection_fix.py:
--------------------------------------------------------------------------------

```python
"""Tests for pip detection fix in run-server.sh script.

This test file ensures our pip detection improvements work correctly
and don't break existing functionality.
"""

import os
import subprocess
import tempfile
from pathlib import Path

import pytest


class TestPipDetectionFix:
    """Test cases for issue #188: PIP is available but not recognized."""

    def test_run_server_script_syntax_valid(self):
        """Test that run-server.sh has valid bash syntax."""
        result = subprocess.run(["bash", "-n", "./run-server.sh"], capture_output=True, text=True)
        assert result.returncode == 0, f"Syntax error in run-server.sh: {result.stderr}"

    def test_run_server_has_proper_shebang(self):
        """Test that run-server.sh starts with proper shebang."""
        content = Path("./run-server.sh").read_text()
        assert content.startswith("#!/bin/bash"), "Script missing proper bash shebang"

    def test_critical_functions_exist(self):
        """Test that all critical functions are defined in the script."""
        content = Path("./run-server.sh").read_text()
        critical_functions = ["find_python", "setup_environment", "setup_venv", "install_dependencies", "bootstrap_pip"]

        for func in critical_functions:
            assert f"{func}()" in content, f"Critical function {func}() not found in script"

    def test_pip_detection_consistency_issue(self):
        """Test the specific issue: pip works in setup_venv but fails in install_dependencies.

        This test verifies that our fix ensures consistent Python executable paths.
        """
        # Test that the get_venv_python_path function now returns absolute paths
        content = Path("./run-server.sh").read_text()

        # Check that get_venv_python_path includes our absolute path conversion logic
        assert "abs_venv_path" in content, "get_venv_python_path should use absolute paths"
        assert 'cd "$(dirname' in content, "Should convert to absolute path"

        # Test successful completion - our fix should make the script more robust
        result = subprocess.run(["bash", "-n", "./run-server.sh"], capture_output=True, text=True)
        assert result.returncode == 0, "Script should have valid syntax after our fix"

    def test_pip_detection_with_non_interactive_shell(self):
        """Test pip detection works in non-interactive shell environments.

        This addresses the contributor's suggestion about non-interactive shells
        not sourcing ~/.bashrc where pip PATH might be defined.
        """
        # Test case for Git Bash on Windows and non-interactive Linux shells
        with tempfile.TemporaryDirectory() as temp_dir:
            # Create mock virtual environment structure
            venv_path = Path(temp_dir) / ".zen_venv"
            bin_path = venv_path / "bin"
            bin_path.mkdir(parents=True)

            # Create mock python executable
            python_exe = bin_path / "python"
            python_exe.write_text("#!/bin/bash\necho 'Python 3.12.3'\n")
            python_exe.chmod(0o755)

            # Create mock pip executable
            pip_exe = bin_path / "pip"
            pip_exe.write_text("#!/bin/bash\necho 'pip 23.0.1'\n")
            pip_exe.chmod(0o755)

            # Test that we can detect pip using explicit paths (not PATH)
            assert python_exe.exists(), "Mock python executable should exist"
            assert pip_exe.exists(), "Mock pip executable should exist"
            assert python_exe.is_file(), "Python should be a file"
            assert pip_exe.is_file(), "Pip should be a file"

    def test_enhanced_diagnostic_messages_included(self):
        """Test that our enhanced diagnostic messages are included in the script.

        Verify that the script contains the enhanced error diagnostics we added.
        """
        content = Path("./run-server.sh").read_text()

        # Check that enhanced diagnostic information is present in the script
        expected_diagnostic_patterns = [
            "Enhanced diagnostic information for debugging",
            "Diagnostic information:",
            "Python executable:",
            "Python executable exists:",
            "Python executable permissions:",
            "Virtual environment path:",
            "Virtual environment exists:",
            "Final diagnostic information:",
        ]

        for pattern in expected_diagnostic_patterns:
            assert pattern in content, f"Enhanced diagnostic pattern '{pattern}' should be in script"

    def test_setup_env_file_does_not_create_bsd_backup(self, tmp_path):
        """Ensure setup_env_file avoids creating .env'' artifacts (BSD sed behavior)."""
        script_path = Path("./run-server.sh").resolve()

        # Prepare temp workspace with example env
        env_example = Path(".env.example").read_text()
        target_example = tmp_path / ".env.example"
        target_example.write_text(env_example)

        # Run setup_env_file inside isolated shell session
        command = f"""
        set -e
        cd "{tmp_path}"
        source "{script_path}"
        setup_env_file
        """
        env = os.environ.copy()
        subprocess.run(["bash", "-lc", command], check=True, env=env, text=True)

        artifacts = {p.name for p in tmp_path.glob(".env*")}
        assert ".env''" not in artifacts, "setup_env_file should not create BSD sed backup artifacts"
        assert ".env" in artifacts, ".env should be created from .env.example"


if __name__ == "__main__":
    pytest.main([__file__, "-v"])

```

--------------------------------------------------------------------------------
/tests/test_disabled_tools.py:
--------------------------------------------------------------------------------

```python
"""Tests for DISABLED_TOOLS environment variable functionality."""

import logging
import os
from unittest.mock import patch

import pytest

from server import (
    apply_tool_filter,
    parse_disabled_tools_env,
    validate_disabled_tools,
)


# Mock the tool classes since we're testing the filtering logic
class MockTool:
    def __init__(self, name):
        self.name = name


class TestDisabledTools:
    """Test suite for DISABLED_TOOLS functionality."""

    def test_parse_disabled_tools_empty(self):
        """Empty string returns empty set (no tools disabled)."""
        with patch.dict(os.environ, {"DISABLED_TOOLS": ""}):
            assert parse_disabled_tools_env() == set()

    def test_parse_disabled_tools_not_set(self):
        """Unset variable returns empty set."""
        with patch.dict(os.environ, {}, clear=True):
            # Ensure DISABLED_TOOLS is not in environment
            if "DISABLED_TOOLS" in os.environ:
                del os.environ["DISABLED_TOOLS"]
            assert parse_disabled_tools_env() == set()

    def test_parse_disabled_tools_single(self):
        """Single tool name parsed correctly."""
        with patch.dict(os.environ, {"DISABLED_TOOLS": "debug"}):
            assert parse_disabled_tools_env() == {"debug"}

    def test_parse_disabled_tools_multiple(self):
        """Multiple tools with spaces parsed correctly."""
        with patch.dict(os.environ, {"DISABLED_TOOLS": "debug, analyze, refactor"}):
            assert parse_disabled_tools_env() == {"debug", "analyze", "refactor"}

    def test_parse_disabled_tools_extra_spaces(self):
        """Extra spaces and empty items handled correctly."""
        with patch.dict(os.environ, {"DISABLED_TOOLS": " debug , , analyze ,  "}):
            assert parse_disabled_tools_env() == {"debug", "analyze"}

    def test_parse_disabled_tools_duplicates(self):
        """Duplicate entries handled correctly (set removes duplicates)."""
        with patch.dict(os.environ, {"DISABLED_TOOLS": "debug,analyze,debug"}):
            assert parse_disabled_tools_env() == {"debug", "analyze"}

    def test_tool_filtering_logic(self):
        """Test the complete filtering logic using the actual server functions."""
        # Simulate ALL_TOOLS
        ALL_TOOLS = {
            "chat": MockTool("chat"),
            "debug": MockTool("debug"),
            "analyze": MockTool("analyze"),
            "version": MockTool("version"),
            "listmodels": MockTool("listmodels"),
        }

        # Test case 1: No tools disabled
        disabled_tools = set()
        enabled_tools = apply_tool_filter(ALL_TOOLS, disabled_tools)

        assert len(enabled_tools) == 5  # All tools included
        assert set(enabled_tools.keys()) == set(ALL_TOOLS.keys())

        # Test case 2: Disable some regular tools
        disabled_tools = {"debug", "analyze"}
        enabled_tools = apply_tool_filter(ALL_TOOLS, disabled_tools)

        assert len(enabled_tools) == 3  # chat, version, listmodels
        assert "debug" not in enabled_tools
        assert "analyze" not in enabled_tools
        assert "chat" in enabled_tools
        assert "version" in enabled_tools
        assert "listmodels" in enabled_tools

        # Test case 3: Attempt to disable essential tools
        disabled_tools = {"version", "chat"}
        enabled_tools = apply_tool_filter(ALL_TOOLS, disabled_tools)

        assert "version" in enabled_tools  # Essential tool not disabled
        assert "chat" not in enabled_tools  # Regular tool disabled
        assert "listmodels" in enabled_tools  # Essential tool included

    def test_unknown_tools_warning(self, caplog):
        """Test that unknown tool names generate appropriate warnings."""
        ALL_TOOLS = {
            "chat": MockTool("chat"),
            "debug": MockTool("debug"),
            "analyze": MockTool("analyze"),
            "version": MockTool("version"),
            "listmodels": MockTool("listmodels"),
        }
        disabled_tools = {"chat", "unknown_tool", "another_unknown"}

        with caplog.at_level(logging.WARNING):
            validate_disabled_tools(disabled_tools, ALL_TOOLS)
            assert "Unknown tools in DISABLED_TOOLS: ['another_unknown', 'unknown_tool']" in caplog.text

    def test_essential_tools_warning(self, caplog):
        """Test warning when trying to disable essential tools."""
        ALL_TOOLS = {
            "chat": MockTool("chat"),
            "debug": MockTool("debug"),
            "analyze": MockTool("analyze"),
            "version": MockTool("version"),
            "listmodels": MockTool("listmodels"),
        }
        disabled_tools = {"version", "chat", "debug"}

        with caplog.at_level(logging.WARNING):
            validate_disabled_tools(disabled_tools, ALL_TOOLS)
            assert "Cannot disable essential tools: ['version']" in caplog.text

    @pytest.mark.parametrize(
        "env_value,expected",
        [
            ("", set()),  # Empty string
            ("   ", set()),  # Only spaces
            (",,,", set()),  # Only commas
            ("chat", {"chat"}),  # Single tool
            ("chat,debug", {"chat", "debug"}),  # Multiple tools
            ("chat, debug, analyze", {"chat", "debug", "analyze"}),  # With spaces
            ("chat,debug,chat", {"chat", "debug"}),  # Duplicates
        ],
    )
    def test_parse_disabled_tools_parametrized(self, env_value, expected):
        """Parametrized tests for various input formats."""
        with patch.dict(os.environ, {"DISABLED_TOOLS": env_value}):
            assert parse_disabled_tools_env() == expected

```
Page 2/19FirstPrevNextLast