This is page 30 of 35. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│   ├── __init__.py
│   ├── advanced_agent_flows_using_unified_memory_system_demo.py
│   ├── advanced_extraction_demo.py
│   ├── advanced_unified_memory_system_demo.py
│   ├── advanced_vector_search_demo.py
│   ├── analytics_reporting_demo.py
│   ├── audio_transcription_demo.py
│   ├── basic_completion_demo.py
│   ├── cache_demo.py
│   ├── claude_integration_demo.py
│   ├── compare_synthesize_demo.py
│   ├── cost_optimization.py
│   ├── data
│   │   ├── sample_event.txt
│   │   ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│   │   └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│   ├── docstring_refiner_demo.py
│   ├── document_conversion_and_processing_demo.py
│   ├── entity_relation_graph_demo.py
│   ├── filesystem_operations_demo.py
│   ├── grok_integration_demo.py
│   ├── local_text_tools_demo.py
│   ├── marqo_fused_search_demo.py
│   ├── measure_model_speeds.py
│   ├── meta_api_demo.py
│   ├── multi_provider_demo.py
│   ├── ollama_integration_demo.py
│   ├── prompt_templates_demo.py
│   ├── python_sandbox_demo.py
│   ├── rag_example.py
│   ├── research_workflow_demo.py
│   ├── sample
│   │   ├── article.txt
│   │   ├── backprop_paper.pdf
│   │   ├── buffett.pdf
│   │   ├── contract_link.txt
│   │   ├── legal_contract.txt
│   │   ├── medical_case.txt
│   │   ├── northwind.db
│   │   ├── research_paper.txt
│   │   ├── sample_data.json
│   │   └── text_classification_samples
│   │       ├── email_classification.txt
│   │       ├── news_samples.txt
│   │       ├── product_reviews.txt
│   │       └── support_tickets.txt
│   ├── sample_docs
│   │   └── downloaded
│   │       └── attention_is_all_you_need.pdf
│   ├── sentiment_analysis_demo.py
│   ├── simple_completion_demo.py
│   ├── single_shot_synthesis_demo.py
│   ├── smart_browser_demo.py
│   ├── sql_database_demo.py
│   ├── sse_client_demo.py
│   ├── test_code_extraction.py
│   ├── test_content_detection.py
│   ├── test_ollama.py
│   ├── text_classification_demo.py
│   ├── text_redline_demo.py
│   ├── tool_composition_examples.py
│   ├── tournament_code_demo.py
│   ├── tournament_text_demo.py
│   ├── unified_memory_system_demo.py
│   ├── vector_search_demo.py
│   ├── web_automation_instruction_packs.py
│   └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│   └── smart_browser_internal
│       ├── locator_cache.db
│       ├── readability.js
│       └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── integration
│   │   ├── __init__.py
│   │   └── test_server.py
│   ├── manual
│   │   ├── test_extraction_advanced.py
│   │   └── test_extraction.py
│   └── unit
│       ├── __init__.py
│       ├── test_cache.py
│       ├── test_providers.py
│       └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│   ├── __init__.py
│   ├── __main__.py
│   ├── cli
│   │   ├── __init__.py
│   │   ├── __main__.py
│   │   ├── commands.py
│   │   ├── helpers.py
│   │   └── typer_cli.py
│   ├── clients
│   │   ├── __init__.py
│   │   ├── completion_client.py
│   │   └── rag_client.py
│   ├── config
│   │   └── examples
│   │       └── filesystem_config.yaml
│   ├── config.py
│   ├── constants.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── evaluation
│   │   │   ├── base.py
│   │   │   └── evaluators.py
│   │   ├── providers
│   │   │   ├── __init__.py
│   │   │   ├── anthropic.py
│   │   │   ├── base.py
│   │   │   ├── deepseek.py
│   │   │   ├── gemini.py
│   │   │   ├── grok.py
│   │   │   ├── ollama.py
│   │   │   ├── openai.py
│   │   │   └── openrouter.py
│   │   ├── server.py
│   │   ├── state_store.py
│   │   ├── tournaments
│   │   │   ├── manager.py
│   │   │   ├── tasks.py
│   │   │   └── utils.py
│   │   └── ums_api
│   │       ├── __init__.py
│   │       ├── ums_database.py
│   │       ├── ums_endpoints.py
│   │       ├── ums_models.py
│   │       └── ums_services.py
│   ├── exceptions.py
│   ├── graceful_shutdown.py
│   ├── services
│   │   ├── __init__.py
│   │   ├── analytics
│   │   │   ├── __init__.py
│   │   │   ├── metrics.py
│   │   │   └── reporting.py
│   │   ├── cache
│   │   │   ├── __init__.py
│   │   │   ├── cache_service.py
│   │   │   ├── persistence.py
│   │   │   ├── strategies.py
│   │   │   └── utils.py
│   │   ├── cache.py
│   │   ├── document.py
│   │   ├── knowledge_base
│   │   │   ├── __init__.py
│   │   │   ├── feedback.py
│   │   │   ├── manager.py
│   │   │   ├── rag_engine.py
│   │   │   ├── retriever.py
│   │   │   └── utils.py
│   │   ├── prompts
│   │   │   ├── __init__.py
│   │   │   ├── repository.py
│   │   │   └── templates.py
│   │   ├── prompts.py
│   │   └── vector
│   │       ├── __init__.py
│   │       ├── embeddings.py
│   │       └── vector_service.py
│   ├── tool_token_counter.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── audio_transcription.py
│   │   ├── base.py
│   │   ├── completion.py
│   │   ├── docstring_refiner.py
│   │   ├── document_conversion_and_processing.py
│   │   ├── enhanced-ums-lookbook.html
│   │   ├── entity_relation_graph.py
│   │   ├── excel_spreadsheet_automation.py
│   │   ├── extraction.py
│   │   ├── filesystem.py
│   │   ├── html_to_markdown.py
│   │   ├── local_text_tools.py
│   │   ├── marqo_fused_search.py
│   │   ├── meta_api_tool.py
│   │   ├── ocr_tools.py
│   │   ├── optimization.py
│   │   ├── provider.py
│   │   ├── pyodide_boot_template.html
│   │   ├── python_sandbox.py
│   │   ├── rag.py
│   │   ├── redline-compiled.css
│   │   ├── sentiment_analysis.py
│   │   ├── single_shot_synthesis.py
│   │   ├── smart_browser.py
│   │   ├── sql_databases.py
│   │   ├── text_classification.py
│   │   ├── text_redline_tools.py
│   │   ├── tournament.py
│   │   ├── ums_explorer.html
│   │   └── unified_memory_system.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── async_utils.py
│   │   ├── display.py
│   │   ├── logging
│   │   │   ├── __init__.py
│   │   │   ├── console.py
│   │   │   ├── emojis.py
│   │   │   ├── formatter.py
│   │   │   ├── logger.py
│   │   │   ├── panels.py
│   │   │   ├── progress.py
│   │   │   └── themes.py
│   │   ├── parse_yaml.py
│   │   ├── parsing.py
│   │   ├── security.py
│   │   └── text.py
│   └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/filesystem.py:
--------------------------------------------------------------------------------

```python
"""Secure asynchronous filesystem tools for Ultimate MCP Server.

This module provides secure asynchronous filesystem operations, including reading, writing,
deleting, and manipulating files and directories, with robust security controls to limit access
and optional heuristics to prevent accidental mass deletion/modification by LLMs.
"""

import asyncio
import datetime  # Using datetime for standard timestamp representation.
import difflib
import json
import math  # For isnan checks
import os
import statistics  # For calculating standard deviation in protection heuristics
import time
from fnmatch import fnmatch  # Keep sync fnmatch for pattern matching
from typing import Any, AsyncGenerator, Dict, List, Literal, Optional, Set, Tuple, Union, cast

import aiofiles
import aiofiles.os
from pydantic import BaseModel

from ultimate_mcp_server.config import FilesystemConfig, GatewayConfig, get_config
from ultimate_mcp_server.exceptions import ToolError, ToolInputError
from ultimate_mcp_server.tools.base import with_error_handling, with_tool_metrics
from ultimate_mcp_server.utils import get_logger

logger = get_logger("ultimate_mcp_server.tools.filesystem")


class ProtectionTriggeredError(ToolError):
    """Exception raised when a security protection measure is triggered."""

    def __init__(self, message, protection_type=None, context=None, details=None):
        """Initialize the protection triggered error.

        Args:
            message: Error message
            protection_type: Type of protection triggered (e.g., "deletion_protection", "path_protection")
            context: Context information about the protection trigger
            details: Additional error details
        """
        error_details = details or {}
        if protection_type:
            error_details["protection_type"] = protection_type

        self.context = context or {}
        if context:
            error_details["context"] = context

        super().__init__(message, error_code="PROTECTION_TRIGGERED", details=error_details)

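# --- Illustrative sketch (editor's addition, not part of the original module) ---
# Minimal example of raising ProtectionTriggeredError with a protection type and a
# context payload; the numbers below are hypothetical.
def _example_raise_protection_error() -> None:
    raise ProtectionTriggeredError(
        "Bulk deletion blocked by safety heuristics.",
        protection_type="deletion_protection",
        context={"num_paths": 250, "max_files_threshold": 100},
    )
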

# --- Configuration and Security ---


def get_filesystem_config() -> "FilesystemConfig":  # Use type hint from config module
    """Get filesystem configuration object from the main config."""
    cfg: GatewayConfig = get_config()
    # Access the validated FilesystemConfig object directly
    fs_config = cfg.filesystem
    if not fs_config:  # Should not happen with default_factory, but check defensively
        logger.error(
            "Filesystem configuration missing after load. Using defaults.", emoji_key="config"
        )
        # FilesystemConfig is already imported at module level; fall back to its defaults.
        return FilesystemConfig()
    return fs_config


def get_protection_config(operation_type: Literal["deletion", "modification"]) -> Dict[str, Any]:
    """Get protection settings for a specific operation type as a dictionary."""
    fs_config = get_filesystem_config()
    protection_attr_name = f"file_{operation_type}_protection"

    if hasattr(fs_config, protection_attr_name):
        protection_config_obj = getattr(fs_config, protection_attr_name)
        if protection_config_obj and isinstance(
            protection_config_obj, BaseModel
        ):  # Check it's a Pydantic model instance
            # Convert the Pydantic model to a dictionary for consistent access
            return protection_config_obj.model_dump()
        else:
            logger.warning(
                f"Protection config for '{operation_type}' is not a valid model instance. Using defaults.",
                emoji_key="config",
            )
    else:
        logger.warning(
            f"Protection config attribute '{protection_attr_name}' not found. Using defaults.",
            emoji_key="config",
        )

    # Return default dictionary structure if config is missing or invalid
    # Fetch defaults from the Pydantic model definition if possible
    try:
        from ultimate_mcp_server.config import FilesystemProtectionConfig

        return FilesystemProtectionConfig().model_dump()
    except ImportError:  # Fallback if import fails
        return {
            "enabled": False,
            "max_files_threshold": 100,
            "datetime_stddev_threshold_sec": 60 * 60 * 24 * 30,
            "file_type_variance_threshold": 5,
            "max_stat_errors_pct": 10.0,
        }

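# --- Illustrative sketch (editor's addition, not part of the original module) ---
# How a caller might read the deletion-protection thresholds as a plain dict; the
# keys mirror FilesystemProtectionConfig and the fallback defaults above.
def _example_read_deletion_thresholds() -> Dict[str, Any]:
    cfg = get_protection_config("deletion")
    return {
        "enabled": cfg.get("enabled", False),
        "max_files_threshold": cfg.get("max_files_threshold", 100),
    }
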

def get_allowed_directories() -> List[str]:
    """Get allowed directories from configuration.

    Reads the configuration, normalizes paths (absolute, resolves symlinks),
    and returns a list of unique allowed directory paths. Assumes paths were expanded
    during config load.

    Returns:
        List of normalized, absolute directory paths that can be accessed.
    """
    fs_config = get_filesystem_config()
    # Access the already expanded list from the validated config object
    allowed_config: List[str] = fs_config.allowed_directories

    if not allowed_config:
        logger.warning(
            "No filesystem directories configured or loaded for access. All operations may be rejected.",
            emoji_key="security",
        )
        return []

    # Paths should already be expanded and absolute from config loading.
    # We still need to normalize and ensure uniqueness.
    normalized: List[str] = []
    for d in allowed_config:
        try:
            # Basic normalization (separator consistency)
            norm_d = os.path.normpath(d)
            if norm_d not in normalized:  # Avoid duplicates
                normalized.append(norm_d)
        except Exception as e:
            # Log errors during normalization but continue
            logger.error(
                f"Error normalizing configured allowed directory '{d}': {e}. Skipping.",
                emoji_key="config",
            )

    if not normalized and allowed_config:
        logger.error(
            "Filesystem access potentially misconfigured: No valid allowed directories remain after normalization.",
            emoji_key="security",
        )
    elif not normalized:
        # Warning about no configured dirs was logged earlier if allowed_config was empty.
        pass

    # Debug log the final effective list used by tools
    logger.debug(
        f"Filesystem tools operating with {len(normalized)} normalized allowed directories",
        allowed_directories=normalized,
    )
    return normalized


# --- Path Validation ---
async def validate_path(
    path: str,
    check_exists: Optional[bool] = None,
    check_parent_writable: bool = False,
    resolve_symlinks: bool = False,
) -> str:
    """Validate a path for security and accessibility using async I/O.

    Performs several checks:
    1.  Ensures path is a non-empty string.
    2.  Normalizes the path (expands user, makes absolute, resolves '../').
    3.  Checks if the normalized path is within the configured allowed directories.
    4.  If check_exists is True, checks if the path exists. If False, checks it does NOT exist.
    5.  If resolve_symlinks is True, resolves symbolic links and re-validates the real path against allowed dirs.
    6.  If check_exists is False, checks parent directory existence.
    7.  If `check_parent_writable` is True and path likely needs creation, checks parent directory write permissions.

    Args:
        path: The file or directory path input string to validate.
        check_exists: If True, path must exist. If False, path must NOT exist. If None, existence is not checked.
        check_parent_writable: If True and path doesn't exist/creation is implied, check if parent dir is writable.
        resolve_symlinks: If True, follows symlinks and returns their target path. If False, keeps the symlink path.

    Returns:
        The normalized, absolute, validated path string.

    Raises:
        ToolInputError: If the path is invalid (format, permissions, existence violation,
                        outside allowed dirs, symlink issue).
        ToolError: For underlying filesystem errors or configuration issues.
    """
    if not path or not isinstance(path, str):
        raise ToolInputError(
            "Path must be a non-empty string.",
            param_name="path",
            provided_value=repr(path),  # Use repr for clarity on non-string types
        )

    # Path normalization (sync, generally fast)
    try:
        path_expanded = os.path.expanduser(path)
        path_abs = os.path.abspath(path_expanded)
        # Normalize '.','..' and separators
        normalized_path = os.path.normpath(path_abs)
    except Exception as e:
        raise ToolInputError(
            f"Invalid path format or resolution error: {str(e)}",
            param_name="path",
            provided_value=path,
        ) from e

    # --- Use get_allowed_directories which reads from config ---
    allowed_dirs = get_allowed_directories()
    if not allowed_dirs:
        raise ToolError(
            "Filesystem access is disabled: No allowed directories are configured or loadable.",
            context={"configured_directories": 0},  # Provide context
        )

    # Ensure normalized_path is truly *under* an allowed dir.
    is_allowed = False
    original_validated_path = normalized_path  # Store before symlink resolution
    for allowed_dir in allowed_dirs:
        # Ensure allowed_dir is also normalized for comparison
        norm_allowed_dir = os.path.normpath(allowed_dir)
        # Path must be exactly the allowed dir or start with the allowed dir + separator.
        if normalized_path == norm_allowed_dir or normalized_path.startswith(
            norm_allowed_dir + os.sep
        ):
            is_allowed = True
            break

    if not is_allowed:
        logger.warning(
            f"Path '{normalized_path}' denied access. Not within allowed directories: {allowed_dirs}",
            emoji_key="security",
        )
        raise ToolInputError(
            f"Access denied: Path '{path}' resolves to '{normalized_path}', which is outside the allowed directories.",
            param_name="path",
            provided_value=path,
            # Add context about allowed dirs for debugging? Potentially sensitive.
            # context={"allowed": allowed_dirs}
        )

    # Filesystem checks using aiofiles.os
    current_validated_path = normalized_path  # Start with normalized path
    is_symlink = False
    symlink_target_path = None

    try:
        # Use stat with follow_symlinks=False to check the item itself (similar to lstat)
        try:
            lstat_info = await aiofiles.os.stat(current_validated_path, follow_symlinks=False)
            path_exists_locally = True  # stat succeeded, so the path entry itself exists
            is_symlink = os.path.stat.S_ISLNK(lstat_info.st_mode)

            if is_symlink:
                try:
                    # Get the target for information purposes
                    symlink_target = await aiofiles.os.readlink(current_validated_path)
                    symlink_target_path = symlink_target
                    logger.debug(f"Path '{path}' is a symlink pointing to '{symlink_target}'")
                except OSError as link_err:
                    logger.warning(
                        f"Error reading symlink target for '{current_validated_path}': {link_err}"
                    )

        except FileNotFoundError:
            path_exists_locally = False
            is_symlink = False
        except OSError as e:
            # Handle other OS errors during stat check
            logger.error(
                f"OS Error during stat check for '{current_validated_path}': {e}", exc_info=True
            )
            raise ToolError(
                f"Filesystem error checking path status for '{path}': {str(e)}",
                context={"path": path, "resolved_path": current_validated_path},
            ) from e

        # Resolve symlink if it exists and re-validate
        if is_symlink and resolve_symlinks:
            try:
                # Use synchronous os.path.realpath since aiofiles.os.path doesn't have it
                real_path = os.path.realpath(current_validated_path)
                real_normalized = os.path.normpath(real_path)
                symlink_target_path = real_normalized  # noqa: F841

                # Re-check if the *real* resolved path is within allowed directories
                is_real_allowed = False
                for allowed_dir in allowed_dirs:
                    norm_allowed_dir = os.path.normpath(allowed_dir)
                    if real_normalized == norm_allowed_dir or real_normalized.startswith(
                        norm_allowed_dir + os.sep
                    ):
                        is_real_allowed = True
                        break

                if not is_real_allowed:
                    raise ToolInputError(
                        f"Access denied: Path '{path}' is a symbolic link pointing to '{real_normalized}', which is outside allowed directories.",
                        param_name="path",
                        provided_value=path,
                    )

                # If validation passed, use the real path for further checks *about the target*
                current_validated_path = real_normalized
                # Re-check existence *of the target* - use exists instead of lexists
                path_exists = await aiofiles.os.path.exists(current_validated_path)

            except OSError as e:
                # Handle errors during realpath resolution (e.g., broken link, permissions)
                if isinstance(e, FileNotFoundError):
                    # Broken link - the link entry exists, but target doesn't
                    path_exists = False
                    if check_exists is True:
                        raise ToolInputError(
                            f"Required path '{path}' is a symbolic link pointing to a non-existent target ('{original_validated_path}' -> target missing).",
                            param_name="path",
                            provided_value=path,
                        ) from e
                    # If check_exists is False or None, a broken link might be acceptable depending on the operation.
                    # Keep current_validated_path as the *link path itself* if the target doesn't exist.
                    current_validated_path = original_validated_path
                else:
                    raise ToolInputError(
                        f"Error resolving symbolic link '{path}': {str(e)}",
                        param_name="path",
                        provided_value=path,
                    ) from e
            except ToolInputError:  # Re-raise specific input errors
                raise
            except Exception as e:  # Catch other unexpected errors during link resolution
                raise ToolError(
                    f"Unexpected error resolving symbolic link for '{path}': {str(e)}",
                    context={"path": path},
                ) from e
        else:
            # Not a link or not resolving it, so existence check result is based on the initial check
            path_exists = path_exists_locally

        # Check existence requirement *after* potential symlink resolution
        if check_exists is True and not path_exists:
            raise ToolInputError(
                f"Required path '{path}' (resolved to '{current_validated_path}') does not exist.",
                param_name="path",
                provided_value=path,
                details={
                    "path": path,
                    "resolved_path": current_validated_path,
                    "error_type": "PATH_NOT_FOUND",
                },
            )
        elif check_exists is False and path_exists:
            raise ToolInputError(
                f"Path '{path}' (resolved to '{current_validated_path}') already exists, but non-existence was required.",
                param_name="path",
                provided_value=path,
                details={
                    "path": path,
                    "resolved_path": current_validated_path,
                    "error_type": "PATH_ALREADY_EXISTS",
                },
            )
        # else: check_exists is None, or condition met

        # Check the parent directory: if it exists it must be a directory, and optionally writable.
        parent_dir = os.path.dirname(current_validated_path)
        if (
            parent_dir and parent_dir != current_validated_path
        ):  # Check parent_dir is not empty and not the root itself
            try:
                parent_exists = await aiofiles.os.path.exists(
                    parent_dir
                )  # Check if parent exists first
                if parent_exists:
                    if not await aiofiles.os.path.isdir(parent_dir):
                        raise ToolInputError(
                            f"Cannot operate on '{path}': Parent path '{parent_dir}' exists but is not a directory.",
                            param_name="path",
                            provided_value=path,
                        )
                    # Parent exists and is a directory, check writability if requested
                    if check_parent_writable:
                        if not os.access(parent_dir, os.W_OK | os.X_OK):
                            raise ToolInputError(
                                f"Cannot operate on '{path}': Parent directory '{parent_dir}' exists but is not writable or accessible.",
                                param_name="path",
                                provided_value=path,
                            )
                # else: Parent does NOT exist.
                # If check_parent_writable was True, it's okay if parent doesn't exist because makedirs will create it.
                # If check_parent_writable was False (or not requested for this scenario), we might still want to error if parent doesn't exist depending on the operation context.
                # For create_directory context, non-existence of parent is fine.
            except OSError as e:
                raise ToolError(
                    f"Filesystem error checking parent directory '{parent_dir}' for '{path}': {str(e)}",
                    context={"path": path, "parent": parent_dir},
                ) from e

    except OSError as e:
        # Catch filesystem errors during async checks like exists, isdir, islink on the primary path
        raise ToolError(
            f"Filesystem error validating path '{path}': {str(e)}",
            context={"path": path, "resolved_path": current_validated_path, "error": str(e)},
        ) from e
    except ToolInputError:  # Re-raise ToolInputErrors from validation logic
        raise
    except Exception as e:
        # Catch unexpected errors during validation logic
        logger.error(f"Unexpected error during path validation for {path}: {e}", exc_info=True)
        raise ToolError(
            f"An unexpected error occurred validating path: {str(e)}", context={"path": path}
        ) from e

    # Always return the validated path string
    return current_validated_path

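# --- Illustrative usage sketch (editor's addition, not part of the original module) ---
# Validates a path for a file that should not exist yet; the "/srv/data" prefix is
# hypothetical and would need to appear in the configured allowed_directories.
async def _example_validate_new_file() -> str:
    return await validate_path(
        "/srv/data/reports/summary.txt",
        check_exists=False,           # the target file must not already exist
        check_parent_writable=True,   # if the parent exists, it must be writable
    )
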

# --- Helper Functions ---


async def format_file_info(file_path: str, follow_symlinks: bool = False) -> Dict[str, Any]:
    """Get detailed file or directory information asynchronously.

    Uses `aiofiles.os.stat` to retrieve metadata.

    Args:
        file_path: Path to file or directory (assumed validated).
        follow_symlinks: If True, follows symlinks to get info about their targets.
                        If False, gets info about the symlink itself.

    Returns:
        Dictionary with file/directory details (name, path, size, timestamps, type, permissions).
        If an OS error occurs during stat, returns a dict containing 'name', 'path', and 'error'.
    """
    try:
        # Use stat results directly where possible to avoid redundant checks
        # Use stat with follow_symlinks parameter to control whether we stat the link or the target
        stat_info = await aiofiles.os.stat(file_path, follow_symlinks=follow_symlinks)
        mode = stat_info.st_mode
        is_dir = os.path.stat.S_ISDIR(mode)
        is_file = os.path.stat.S_ISREG(mode)  # Check for regular file
        is_link = os.path.stat.S_ISLNK(mode)  # Check if the item stat looked at is a link

        # Use timezone-aware ISO format timestamps for machine readability.
        # Handle potential platform differences in ctime availability (fallback to mtime).
        try:
            # Some systems might not have birthtime (st_birthtime)
            # ctime is platform dependent (creation on Windows, metadata change on Unix)
            # Use mtime as the most reliable "last modified" timestamp.
            # Let's report both ctime and mtime if available.
            ctime_ts = stat_info.st_ctime
        except AttributeError:
            ctime_ts = stat_info.st_mtime  # Fallback

        # Ensure timestamps are valid before conversion
        def safe_isoformat(timestamp):
            try:
                # Handle potential negative timestamps or values outside valid range
                if timestamp < 0:
                    return "Invalid Timestamp (Negative)"
                # Check against a reasonable range (e.g., year 1 to 9999)
                min_ts = datetime.datetime(1, 1, 1, tzinfo=datetime.timezone.utc).timestamp()
                max_ts = datetime.datetime(
                    9999, 12, 31, 23, 59, 59, tzinfo=datetime.timezone.utc
                ).timestamp()
                if not (min_ts <= timestamp <= max_ts):
                    return f"Invalid Timestamp (Out of Range: {timestamp})"

                return datetime.datetime.fromtimestamp(
                    timestamp, tz=datetime.timezone.utc
                ).isoformat()
            except (OSError, ValueError) as e:  # Handle potential errors like invalid values
                logger.warning(
                    f"Invalid timestamp {timestamp} for {file_path}: {e}", emoji_key="warning"
                )
                return f"Invalid Timestamp ({type(e).__name__})"

        info = {
            "name": os.path.basename(file_path),
            "path": file_path,
            "size": stat_info.st_size,
            "created_os_specific": safe_isoformat(ctime_ts),  # Note platform dependency
            "modified": safe_isoformat(stat_info.st_mtime),
            "accessed": safe_isoformat(stat_info.st_atime),
            "is_directory": is_dir,
            "is_file": is_file,
            "is_symlink": is_link,  # Indicate if the path itself is a symlink
            # Use S_IMODE for standard permission bits (mode & 0o777)
            "permissions": oct(os.path.stat.S_IMODE(mode)),
        }

        if is_link or (not follow_symlinks and await aiofiles.os.path.islink(file_path)):
            try:
                # Attempt to read the link target
                link_target = await aiofiles.os.readlink(file_path)
                info["symlink_target"] = link_target
            except OSError as link_err:
                info["symlink_target"] = f"<Error reading link: {link_err}>"

        return info
    except OSError as e:
        logger.warning(f"Error getting file info for {file_path}: {str(e)}", emoji_key="warning")
        # Return consistent error structure, let caller decide severity.
        return {
            "name": os.path.basename(file_path),
            "path": file_path,
            "error": f"Failed to get info: {str(e)}",
        }
    except Exception as e:  # Catch unexpected errors
        logger.error(
            f"Unexpected error getting file info for {file_path}: {e}",
            exc_info=True,
            emoji_key="error",
        )
        return {
            "name": os.path.basename(file_path),
            "path": file_path,
            "error": f"An unexpected error occurred: {str(e)}",
        }


def create_unified_diff(original_content: str, new_content: str, filepath: str) -> str:
    """Create a unified diff string between original and new content.

    Args:
        original_content: Original file content as a single string.
        new_content: New file content as a single string.
        filepath: Path to the file (used in the diff header).

    Returns:
        Unified diff as a multi-line string, or empty string if no differences.
    """
    # Normalize line endings for consistent diffing
    original_lines = original_content.splitlines()
    new_lines = new_content.splitlines()

    # Generate unified diff using difflib (synchronous)
    diff_lines = list(
        difflib.unified_diff(
            original_lines,
            new_lines,
            fromfile=f"{filepath} (original)",  # Label for 'from' file in diff
            tofile=f"{filepath} (modified)",  # Label for 'to' file in diff
            lineterm="",  # Keep lines without added newlines by difflib
        )
    )

    # Return empty string if no changes, otherwise join lines into single string.
    return "\n".join(diff_lines) if diff_lines else ""

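# --- Illustrative sketch (editor's addition, not part of the original module) ---
# Shows the unified diff produced for a one-line change; the header labels the two
# sides as "example.txt (original)" and "example.txt (modified)".
def _example_unified_diff() -> str:
    before = "alpha\nbeta\ngamma\n"
    after = "alpha\nBETA\ngamma\n"
    return create_unified_diff(before, after, "example.txt")
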

async def read_file_content(filepath: str) -> str:
    """Read text file content using async I/O. Assumes UTF-8 encoding.

    Args:
        filepath: Path to the file to read (assumed validated).

    Returns:
        File content as string.

    Raises:
        ToolError: If reading fails or decoding fails. Includes specific context.
    """
    try:
        # Open asynchronously, read with strict UTF-8 decoding.
        async with aiofiles.open(filepath, mode="r", encoding="utf-8", errors="strict") as f:
            return await f.read()
    except UnicodeDecodeError as e:
        logger.warning(f"File {filepath} is not valid UTF-8: {e}", emoji_key="warning")
        # Provide context about the decoding error.
        raise ToolError(
            f"File is not valid UTF-8 encoded text: {filepath}. Cannot read as text. Details: {e}",
            context={"path": filepath, "encoding": "utf-8", "error_details": str(e)},
        ) from e
    except OSError as e:
        logger.error(f"OS error reading file {filepath}: {e}", emoji_key="error")
        raise ToolError(
            f"Error reading file: {str(e)}", context={"path": filepath, "errno": e.errno}
        ) from e
    except Exception as e:
        logger.error(
            f"Unexpected error reading file {filepath}: {e}", exc_info=True, emoji_key="error"
        )
        raise ToolError(
            f"An unexpected error occurred while reading file: {str(e)}", context={"path": filepath}
        ) from e


async def read_binary_file_content(filepath: str) -> bytes:
    """Read binary file content using async I/O.

    Args:
        filepath: Path to the file to read (assumed validated).

    Returns:
        File content as bytes.

    Raises:
        ToolError: If reading fails.
    """
    try:
        # Open asynchronously in binary read mode ('rb').
        async with aiofiles.open(filepath, mode="rb") as f:
            return await f.read()
    except OSError as e:
        logger.error(f"OS error reading binary file {filepath}: {e}", emoji_key="error")
        raise ToolError(
            f"Error reading binary file: {str(e)}", context={"path": filepath, "errno": e.errno}
        ) from e
    except Exception as e:
        logger.error(
            f"Unexpected error reading binary file {filepath}: {e}",
            exc_info=True,
            emoji_key="error",
        )
        raise ToolError(
            f"An unexpected error occurred while reading binary file: {str(e)}",
            context={"path": filepath},
        ) from e


async def write_file_content(filepath: str, content: Union[str, bytes]) -> None:
    """Write text or binary content to a file using async I/O. Creates parent dirs.

    Args:
        filepath: Path to the file to write (assumed validated, including parent writability).
        content: Content to write (string for text UTF-8, bytes for binary).

    Raises:
        ToolError: If writing fails.
        TypeError: If content is not str or bytes.
    """
    # Determine mode and encoding based on content type.
    if isinstance(content, str):
        mode = "w"
        encoding = "utf-8"
        data_to_write = content
    elif isinstance(content, bytes):
        mode = "wb"
        encoding = None  # No encoding for binary mode
        data_to_write = content
    else:
        raise TypeError("Content to write must be str or bytes")

    try:
        # Ensure parent directory exists asynchronously.
        parent_dir = os.path.dirname(filepath)
        if (
            parent_dir and parent_dir != filepath
        ):  # Check parent_dir is not empty and not the root itself
            # Use async makedirs for consistency. exist_ok=True makes it idempotent.
            await aiofiles.os.makedirs(parent_dir, exist_ok=True)

        # Open file asynchronously and write content.
        async with aiofiles.open(filepath, mode=mode, encoding=encoding) as f:
            await f.write(data_to_write)
            # await f.flush() # Often not necessary as context manager handles flush/close, but uncomment for critical writes if needed.

    except OSError as e:
        logger.error(f"OS error writing file {filepath}: {e}", emoji_key="error")
        raise ToolError(
            f"Error writing file: {str(e)}", context={"path": filepath, "errno": e.errno}
        ) from e
    except Exception as e:
        logger.error(
            f"Unexpected error writing file {filepath}: {e}", exc_info=True, emoji_key="error"
        )
        raise ToolError(
            f"An unexpected error occurred while writing file: {str(e)}", context={"path": filepath}
        ) from e


async def apply_file_edits(
    filepath: str, edits: List[Dict[str, str]], dry_run: bool = False
) -> Tuple[str, str]:
    """Apply a series of text replacements to a file asynchronously.

    Reads the file (UTF-8), applies edits sequentially. If an exact match for
    'oldText' isn't found, it attempts a line-by-line match ignoring leading/trailing
    whitespace, preserving the original indentation of the matched block.
    Generates a diff and optionally writes back the changes.

    Args:
        filepath: Path to the file to edit (assumed validated and is a text file).
        edits: List of edit operations. Each dict must have 'oldText' and 'newText' (both strings).
        dry_run: If True, calculate changes and diff but do not write to disk.

    Returns:
        Tuple of (diff_string, new_content_string). The diff string is empty if no changes occurred.

    Raises:
        ToolError: If reading/writing fails.
        ToolInputError: If edits are malformed or text specified in 'oldText' cannot be found.
    """
    # Read original content asynchronously (raises ToolError if fails)
    content = await read_file_content(filepath)
    original_content = content
    current_content = content  # Work on a mutable copy

    # Apply each edit sequentially (string operations are sync)
    for i, edit in enumerate(edits):
        # Validate edit structure
        if not isinstance(edit, dict):
            raise ToolInputError(
                f"Edit #{i + 1} is not a dictionary.",
                param_name=f"edits[{i}]",
                provided_value=type(edit),
            )
        old_text = edit.get("oldText")
        new_text = edit.get("newText")

        if not isinstance(old_text, str):
            raise ToolInputError(
                f"Edit #{i + 1} is missing 'oldText' or it's not a string.",
                param_name=f"edits[{i}].oldText",
                provided_value=edit,
            )
        if not isinstance(new_text, str):
            raise ToolInputError(
                f"Edit #{i + 1} is missing 'newText' or it's not a string.",
                param_name=f"edits[{i}].newText",
                provided_value=edit,
            )
        # Warn about potentially ambiguous empty oldText
        if not old_text:
            logger.warning(
                f"Edit #{i + 1} has empty 'oldText'. Python's string.replace behavior with empty strings might be unexpected.",
                emoji_key="warning",
            )

        # Try exact replacement first
        if old_text in current_content:
            # Replace *all* occurrences of old_text with new_text
            current_content = current_content.replace(old_text, new_text)
        else:
            # Fallback: Try line-by-line matching with stripped whitespace comparison.
            old_lines = old_text.splitlines()
            # Avoid fallback if old_text was only whitespace but wasn't found exactly
            if not any(line.strip() for line in old_lines) and old_text:
                logger.warning(
                    f"Edit #{i + 1}: 'oldText' consists only of whitespace, but was not found exactly. Skipping whitespace-insensitive fallback.",
                    emoji_key="warning",
                )
                raise ToolInputError(
                    f"Could not find exact whitespace text to replace in edit #{i + 1}: '{old_text[:100]}{'...' if len(old_text) > 100 else ''}'",
                    param_name=f"edits[{i}].oldText",
                    provided_value=edit,
                )

            if not old_lines:  # If old_text was empty string and not found, error out.
                raise ToolInputError(
                    f"Could not find empty 'oldText' to replace in edit #{i + 1}.",
                    param_name=f"edits[{i}].oldText",
                    provided_value=edit,
                )

            content_lines = current_content.splitlines()
            found_match = False
            line_idx = 0
            # Iterate through possible starting lines for the block match
            while line_idx <= len(content_lines) - len(old_lines):
                # Check if the slice matches ignoring leading/trailing whitespace on each line
                is_match = all(
                    old_lines[j].strip() == content_lines[line_idx + j].strip()
                    for j in range(len(old_lines))
                )

                if is_match:
                    # Found a match based on content, now replace respecting original indentation.
                    new_lines = new_text.splitlines()

                    # Determine indentation from the *first original line* being replaced.
                    first_original_line = content_lines[line_idx]
                    leading_whitespace = first_original_line[
                        : len(first_original_line) - len(first_original_line.lstrip())
                    ]

                    # Apply this leading whitespace to all *new* lines.
                    indented_new_lines = (
                        [leading_whitespace + line for line in new_lines] if new_lines else []
                    )

                    # Replace the slice in the original lines list
                    content_lines[line_idx : line_idx + len(old_lines)] = indented_new_lines
                    # Reconstruct content string immediately after modification
                    current_content = "\n".join(content_lines)
                    found_match = True
                    # Stop searching for matches for *this specific edit dict* once one is found and replaced.
                    # To replace all occurrences, logic would need modification (e.g., restart search or adjust indices).
                    break

                else:
                    line_idx += 1  # Move to the next line to check

            if not found_match:
                # No match found even with whitespace trimming fallback
                raise ToolInputError(
                    f"Could not find text to replace in edit #{i + 1}. Text searched (approx first 100 chars): '{old_text[:100]}{'...' if len(old_text) > 100 else ''}'. "
                    f"Exact match failed, and whitespace-insensitive line match also failed.",
                    param_name=f"edits[{i}].oldText",
                    provided_value=edit,
                )

    # Create diff (sync function call) using original vs final content
    diff = create_unified_diff(original_content, current_content, filepath)

    # Write the changes if not a dry run asynchronously
    if not dry_run:
        # Ensure content is string before writing
        await write_file_content(filepath, str(current_content))  # Handles errors internally

    return diff, str(current_content)

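# --- Illustrative sketch (editor's addition, not part of the original module) ---
# Shows the expected shape of the `edits` argument (dicts with 'oldText'/'newText');
# dry_run=True returns the diff without writing anything to disk. The file path and
# edit strings are hypothetical.
async def _example_dry_run_edit(filepath: str) -> str:
    edits = [
        {"oldText": "DEBUG = True", "newText": "DEBUG = False"},
    ]
    diff, _new_content = await apply_file_edits(filepath, edits, dry_run=True)
    return diff
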

# --- MCP Formatting Helpers ---


def format_mcp_content(text_content: str) -> List[Dict[str, Any]]:
    """Format text content into a simple MCP text block.

    Applies length truncation to avoid oversized responses.

    Args:
        text_content: Text to format.

    Returns:
        List containing a single MCP text block dictionary.
    """
    # Max length for content block to avoid overwhelming downstream systems.
    MAX_LEN = 100000  # Adjust based on LLM/platform constraints.
    if len(text_content) > MAX_LEN:
        # Note: Truncation happens here, not a placeholder.
        trunc_msg = f"\n... [Content truncated - {len(text_content)} characters total]"
        text_content = text_content[: MAX_LEN - len(trunc_msg)] + trunc_msg
        logger.warning(f"Content length > {MAX_LEN}, truncated.", emoji_key="warning")

    # Standard MCP text block structure.
    return [{"type": "text", "text": text_content}]


def create_tool_response(content: Any, is_error: bool = False) -> Dict[str, Any]:
    """Create a standard tool response dictionary for the Ultimate MCP Server.

    Formats various content types (str, dict, list) into MCP 'content' blocks,
    attempting to provide useful representations for common tool outputs.

    Args:
        content: The primary result or message from the tool. Can be str, dict,
                 list (assumed pre-formatted MCP), or other types (converted to str).
        is_error: If True, marks the response as an error response using "isError".

    Returns:
        A dictionary structured for the Ultimate MCP Server tool response schema.
    """
    formatted_content: List[Dict[str, Any]]
    response: Dict[str, Any] = {}  # Initialize response dict

    if is_error:
        # --- Error Handling Logic ---
        response["success"] = False  # Explicitly set success to False for errors
        response["isError"] = True  # Mark as an error response

        if isinstance(content, dict) and "message" in content:
            error_message = content.get("message", "Unknown error")
            error_code = content.get("error_code", "TOOL_ERROR")
            error_type = content.get("error_type", "ToolError")

            response["error"] = error_message
            response["error_code"] = error_code
            response["error_type"] = error_type

            # Add details if available
            if "details" in content:
                response["details"] = content["details"]

            # Format the content text for error display, including context if available
            context = content.get("context")  # Context might be added by the calling function
            if context:
                try:
                    # Try to pretty-print context if it's dict/list
                    if isinstance(context, (dict, list)):
                        context_str = json.dumps(context, indent=2, default=str)
                    else:
                        context_str = str(context)
                    error_display_text = f"Error: {error_message}\nContext: {context_str}"
                except Exception:  # Fallback if context serialization fails
                    error_display_text = f"Error: {error_message}\nContext: (Could not serialize context: {type(context).__name__})"
            else:
                error_display_text = f"Error: {error_message}"

            formatted_content = format_mcp_content(error_display_text)

        else:  # Handle cases where error content is not a detailed dict
            error_message = str(content)  # Use string representation of the error content
            formatted_content = format_mcp_content(f"Error: {error_message}")
            response["error"] = error_message
            # Provide default error codes/types if not available
            response["error_code"] = "UNKNOWN_ERROR"
            response["error_type"] = "UnknownError"

        # Add protectionTriggered flag if applicable (check original error dict if passed)
        if isinstance(content, dict) and content.get("protection_triggered"):
            response["protectionTriggered"] = True

        response["content"] = formatted_content

    else:
        # --- Success Case ---
        response["success"] = True  # <<< THIS WAS THE MISSING LINE

        # --- Existing success formatting logic ---
        try:
            if isinstance(content, dict):
                # Handle specific known dictionary structures first
                if (
                    "files" in content
                    and isinstance(content.get("files"), list)
                    and "succeeded" in content
                ):
                    # Format output from read_multiple_files
                    blocks = []
                    summary = f"Read {content.get('succeeded', 0)} files successfully, {content.get('failed', 0)} failed."
                    blocks.append({"type": "text", "text": summary})
                    for file_result in content.get("files", []):
                        if isinstance(file_result, dict):
                            path = file_result.get("path", "Unknown path")
                            if file_result.get("success") and "content" in file_result:
                                size_info = (
                                    f" ({file_result.get('size', 'N/A')} bytes)"
                                    if "size" in file_result
                                    else ""
                                )
                                binary_info = (
                                    " (Binary Preview)" if file_result.get("is_binary") else ""
                                )
                                header = f"--- File: {path}{size_info}{binary_info} ---"
                                file_content_str = str(file_result["content"])
                                blocks.extend(format_mcp_content(f"{header}\n{file_content_str}"))
                            elif "error" in file_result:
                                header = f"--- File: {path} (Error) ---"
                                error_msg = str(file_result["error"])
                                blocks.extend(format_mcp_content(f"{header}\n{error_msg}"))
                            else:
                                blocks.extend(
                                    format_mcp_content(
                                        f"--- File: {path} (Unknown status) ---\n{str(file_result)}"
                                    )
                                )
                        else:
                            blocks.extend(
                                format_mcp_content(
                                    f"--- Invalid entry in results ---\n{str(file_result)}"
                                )
                            )
                    formatted_content = blocks
                elif "tree" in content and "path" in content:
                    # Format output from directory_tree as JSON block
                    tree_json = json.dumps(
                        content["tree"], indent=2, ensure_ascii=False, default=str
                    )
                    MAX_JSON_LEN = 50000
                    if len(tree_json) > MAX_JSON_LEN:
                        trunc_msg = "\n... [Tree JSON truncated]"
                        tree_json = tree_json[: MAX_JSON_LEN - len(trunc_msg)] + trunc_msg
                    formatted_content = format_mcp_content(
                        f"Directory Tree for: {content['path']}\n```json\n{tree_json}\n```"
                    )
                elif "entries" in content and "path" in content:
                    # Format output from list_directory
                    list_strs = [f"Directory Listing for: {content['path']}"]
                    for entry in content.get("entries", []):
                        if isinstance(entry, dict):
                            name = entry.get("name", "?")
                            etype = entry.get("type", "unknown")
                            size_str = (
                                f" ({entry.get('size')} bytes)"
                                if etype == "file" and "size" in entry
                                else ""
                            )
                            error_str = (
                                f" (Error: {entry.get('error')})" if "error" in entry else ""
                            )
                            link_str = (
                                f" -> {entry.get('symlink_target')}"
                                if etype == "symlink" and "symlink_target" in entry
                                else ""
                            )
                            list_strs.append(f"- {name} [{etype}]{size_str}{error_str}{link_str}")
                        else:
                            list_strs.append(f"- Invalid entry: {str(entry)}")
                    formatted_content = format_mcp_content("\n".join(list_strs))
                elif "matches" in content and "path" in content and "pattern" in content:
                    # Format output from search_files
                    search_strs = [
                        f"Search Results for '{content['pattern']}' in '{content['path']}':"
                    ]
                    matches = content.get("matches", [])
                    if matches:
                        for match in matches:
                            search_strs.append(f"- {match}")
                    else:
                        search_strs.append("(No matches found)")
                    if "warnings" in content:
                        search_strs.append("\nWarnings:")
                        search_strs.extend(content["warnings"])
                    formatted_content = format_mcp_content("\n".join(search_strs))
                else:
                    # Attempt to pretty-print other dictionaries as JSON
                    json_content = json.dumps(content, indent=2, ensure_ascii=False, default=str)
                    formatted_content = format_mcp_content(f"```json\n{json_content}\n```")

            elif isinstance(content, str):
                # Simple string content
                formatted_content = format_mcp_content(content)
            elif isinstance(content, list) and all(
                isinstance(item, dict) and "type" in item for item in content
            ):
                # Assume it's already MCP formatted - pass through directly
                formatted_content = content
            else:
                # Convert anything else to string and format
                formatted_content = format_mcp_content(str(content))

            # Ensure formatted_content is valid before adding to response
            if isinstance(formatted_content, list):
                response["content"] = formatted_content
            else:
                # Fallback if formatting somehow failed during success path
                fallback_text = f"Successfully processed, but failed to format response content: {str(formatted_content)}"
                logger.error(fallback_text)  # Log this internal error
                response["content"] = format_mcp_content(fallback_text)
                response["warning"] = "Response content formatting failed."  # Add a warning field

        except (TypeError, ValueError) as json_err:  # Catch JSON serialization errors specifically
            logger.warning(
                f"Could not serialize successful dictionary response to JSON: {json_err}",
                exc_info=True,
                emoji_key="warning",
            )
            formatted_content = format_mcp_content(
                f"(Response dictionary could not be formatted as JSON)\n{str(content)}"
            )
            response["content"] = formatted_content
            response["warning"] = "Could not format successful response as JSON."
        except Exception as format_err:  # Catch any other formatting errors
            logger.error(
                f"Unexpected error formatting successful response content: {format_err}",
                exc_info=True,
            )
            response["content"] = format_mcp_content(f"Error formatting response: {format_err}")
            response["warning"] = "Unexpected error formatting response content."

    return response

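# --- Illustrative sketch (editor's addition, not part of the original module) ---
# Two minimal response shapes: a success response built from a plain string, and an
# error response built from a dict carrying a message and error code (hypothetical values).
def _example_tool_responses() -> Tuple[Dict[str, Any], Dict[str, Any]]:
    ok = create_tool_response("File written successfully.")
    err = create_tool_response(
        {"message": "Path is outside the allowed directories", "error_code": "ACCESS_DENIED"},
        is_error=True,
    )
    return ok, err
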

# --- Protection Heuristics Implementation ---


async def _get_minimal_stat(path: str) -> Optional[Tuple[Tuple[float, float], str]]:
    """Helper to get minimal stat info (ctime, mtime, extension) for protection checks."""
    try:
        # Use stat with follow_symlinks=False to get info without following links, as we care about the items being listed
        stat_info = await aiofiles.os.stat(path, follow_symlinks=False)
        # Use mtime and ctime (platform-dependent creation/metadata change time)
        mtime = stat_info.st_mtime
        try:
            ctime = stat_info.st_ctime
        except AttributeError:
            ctime = mtime  # Fallback if ctime not available

        extension = (
            os.path.splitext(path)[1].lower()
            if not os.path.stat.S_ISDIR(stat_info.st_mode)
            else ".<dir>"
        )
        return ((ctime, mtime), extension)
    except OSError as e:
        logger.debug(f"Stat failed for protection check on {path}: {e}", emoji_key="warning")
        return None  # Return None on OS error


async def _check_protection_heuristics(
    paths: List[str], operation_type: Literal["deletion", "modification"]
) -> None:
    """
    Check whether a bulk operation on multiple files triggers the safety protection heuristics.
    Thresholds are read via get_protection_config(), which reflects the validated configuration.

    Raises ProtectionTriggeredError if the operation exceeds the configured file-count threshold
    and the files show suspiciously high variance in timestamps or file types, or if file
    metadata could not be gathered reliably enough to perform the check.
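
    Example:
        Illustrative only (assumes an async context and already-validated paths):

            await _check_protection_heuristics(paths_to_delete, "deletion")
            # Returns None when the operation looks safe; raises ProtectionTriggeredError otherwise.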
    """
    protection_config = get_protection_config()

    if not protection_config.get("enabled", False):
        return  # Protection disabled for this operation type

    num_paths = len(paths)
    # --- Read thresholds from the loaded config dictionary ---
    max_files_threshold = protection_config.get("max_files_threshold", 100)

    # Only run detailed checks if the number of files exceeds the threshold
    if num_paths <= max_files_threshold:
        return

    logger.info(
        f"Performing detailed safety check for {operation_type} of {num_paths} paths (threshold: {max_files_threshold})...",
        emoji_key="security",
    )

    # --- Gather Metadata Asynchronously ---
    stat_tasks = [_get_minimal_stat(p) for p in paths]
    stat_results = await asyncio.gather(*stat_tasks)

    successful_stats: List[Tuple[Tuple[float, float], str]] = []
    failed_stat_count = 0
    for result in stat_results:
        if result is not None:
            successful_stats.append(result)
        else:
            failed_stat_count += 1

    total_attempted = len(paths)
    # --- Read threshold from config dict ---
    max_errors_pct = protection_config.get("max_stat_errors_pct", 10.0)
    if total_attempted > 0 and (failed_stat_count / total_attempted * 100) > max_errors_pct:
        raise ProtectionTriggeredError(
            f"Operation blocked because safety check could not reliably gather file metadata ({failed_stat_count}/{total_attempted} failures).",
            context={"failed_stats": failed_stat_count, "total_paths": total_attempted},
        )

    num_valid_stats = len(successful_stats)
    if num_valid_stats < 2:
        logger.info(
            "Protection check skipped: Not enough valid file metadata points obtained.",
            emoji_key="info",
        )
        return

    # --- Calculate Heuristics ---
    creation_times = [ts[0] for ts, ext in successful_stats]
    modification_times = [ts[1] for ts, ext in successful_stats]
    extensions = {ext for ts, ext in successful_stats if ext and ext != ".<dir>"}

    try:
        ctime_stddev = statistics.pstdev(creation_times) if num_valid_stats > 1 else 0.0
        mtime_stddev = statistics.pstdev(modification_times) if num_valid_stats > 1 else 0.0
        if math.isnan(ctime_stddev):
            ctime_stddev = 0.0
        if math.isnan(mtime_stddev):
            mtime_stddev = 0.0
    except statistics.StatisticsError as e:
        logger.warning(
            f"Could not calculate timestamp standard deviation: {e}", emoji_key="warning"
        )
        ctime_stddev = 0.0
        mtime_stddev = 0.0

    num_unique_extensions = len(extensions)

    # --- Read thresholds from config dict ---
    dt_threshold = protection_config.get("datetime_stddev_threshold_sec", 60 * 60 * 24 * 30)
    type_threshold = protection_config.get("file_type_variance_threshold", 5)

    triggered = False
    reasons = []

    if ctime_stddev > dt_threshold:
        triggered = True
        reasons.append(
            f"High variance in creation times (std dev: {ctime_stddev:.2f}s > threshold: {dt_threshold}s)"
        )
    if mtime_stddev > dt_threshold:
        triggered = True
        reasons.append(
            f"High variance in modification times (std dev: {mtime_stddev:.2f}s > threshold: {dt_threshold}s)"
        )
    if num_unique_extensions > type_threshold:
        triggered = True
        reasons.append(
            f"High variance in file types ({num_unique_extensions} unique types > threshold: {type_threshold})"
        )

    if triggered:
        reason_str = (
            f"Operation involves a large number of files ({num_paths}) with suspicious characteristics: "
            + "; ".join(reasons)
        )
        logger.warning(f"Protection Triggered! Reason: {reason_str}", emoji_key="security")
        raise ProtectionTriggeredError(
            reason_str,
            protection_type=f"{operation_type}_protection",  # Add protection type
            context={  # Use context for structured data
                "num_paths": num_paths,
                "num_valid_stats": num_valid_stats,
                "ctime_stddev_sec": round(ctime_stddev, 2),
                "mtime_stddev_sec": round(mtime_stddev, 2),
                "unique_file_types": num_unique_extensions,
                "threshold_max_files": max_files_threshold,
                "threshold_datetime_stddev_sec": dt_threshold,
                "threshold_file_type_variance": type_threshold,
                "failed_stat_count": failed_stat_count,
            },
        )
    else:
        logger.info(
            f"Safety check passed for {operation_type} of {num_paths} paths.",
            emoji_key="security",
            ctime_stddev=round(ctime_stddev, 2),
            mtime_stddev=round(mtime_stddev, 2),
            unique_types=num_unique_extensions,
        )


# --- Async Walk and Delete Helpers ---


async def async_walk(
    top: str,
    topdown: bool = True,
    onerror: Optional[callable] = None,
    followlinks: bool = False,
    exclude_patterns: Optional[List[str]] = None,
    base_path: Optional[str] = None,
) -> AsyncGenerator[Tuple[str, List[str], List[str]], None]:
    """Async version of os.walk using aiofiles.os.scandir. Handles excludes.

    Yields (current_dir_path, dir_names, file_names) tuples asynchronously.
    Filters entries based on exclude_patterns matched against relative path from base_path.
    Handles errors during scanning via the onerror callback.

    Args:
        top: Root directory path to start walking.
        topdown: Whether to yield the current directory before (True) or after (False) its subdirectories.
        onerror: Optional callable that takes an OSError instance when errors occur during scandir or stat.
        followlinks: If True, recurse into directories pointed to by symlinks.
        exclude_patterns: List of glob-style patterns to exclude files/directories.
        base_path: The original starting path of the walk (used for relative path calculation).

    Yields:
        Tuples of (directory_path, list_of_subdir_names, list_of_file_names)
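
    Example:
        A minimal sketch (assumes '/allowed/dir' is accessible in an async context):

            async for dirpath, dirnames, filenames in async_walk(
                "/allowed/dir", exclude_patterns=["*.tmp", ".git"]
            ):
                for name in filenames:
                    print(os.path.join(dirpath, name))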
    """
    if base_path is None:
        base_path = top  # Keep track of the original root for exclusion matching

    dirs: List[str] = []
    nondirs: List[str] = []
    walk_error: Optional[OSError] = None

    try:
        # Use async scandir for efficient directory iteration.
        # aiofiles.os.scandir must be awaited; the result is then iterated synchronously.
        scandir_it = await aiofiles.os.scandir(top)
        # Iterate through the returned directory entries
        for entry in scandir_it:
            try:
                # Calculate relative path for exclusion check
                try:
                    # entry.path should be absolute if 'top' is absolute
                    rel_path = os.path.relpath(entry.path, base_path)
                except ValueError:
                    # Fallback if paths are on different drives (Windows) or other issues
                    rel_path = entry.name  # Use name as fallback for matching
                    logger.debug(
                        f"Could not get relative path for {entry.path} from {base_path}, using name '{entry.name}' for exclusion check."
                    )

                # Check exclusion patterns (case-insensitive matching where appropriate)
                is_excluded = False
                if exclude_patterns:
                    norm_rel_path = os.path.normcase(rel_path)
                    norm_entry_name = os.path.normcase(entry.name)
                    for pattern in exclude_patterns:
                        norm_pattern = os.path.normcase(pattern)
                        # Match pattern against full relative path OR just the name
                        if fnmatch(norm_rel_path, norm_pattern) or fnmatch(
                            norm_entry_name, norm_pattern
                        ):
                            is_excluded = True
                            logger.debug(f"Excluding '{entry.path}' due to pattern '{pattern}'")
                            break  # Stop checking patterns for this entry
                if is_excluded:
                    continue  # Skip this excluded entry

                # Check entry type, handling potential errors using lstat to check link itself
                try:
                    # Determine if it's a directory (respecting followlinks for recursion decision later)
                    is_entry_dir = entry.is_dir(follow_symlinks=followlinks)
                    is_entry_link = entry.is_symlink()  # Check if entry *itself* is a link

                    if is_entry_dir and (not is_entry_link or followlinks):
                        # It's a directory, or it's a link to a directory and we follow links
                        dirs.append(entry.name)
                    else:
                        # It's a file, a link we don't follow, or something else
                        nondirs.append(entry.name)

                except OSError as stat_err:
                    # Error determining type (e.g., permissions)
                    if onerror is not None:
                        onerror(stat_err)
                    logger.warning(
                        f"Skipping entry '{entry.name}' in {top} due to stat error: {stat_err}",
                        emoji_key="warning",
                    )
                    continue  # Skip entry if type cannot be determined

            except OSError as entry_proc_err:
                # Error during exclusion check or other processing of the entry itself
                if onerror is not None:
                    onerror(entry_proc_err)
                logger.warning(
                    f"Error processing entry '{entry.name}' in {top}: {entry_proc_err}",
                    emoji_key="warning",
                )
                # Continue processing other entries in the directory

    except OSError as err:
        # Error during the initial scandir call itself (e.g., permissions on 'top')
        walk_error = err
        if onerror is not None:
            onerror(err)
        # Stop iteration for this path if scandir failed

    # --- Yield results and recurse ---
    if walk_error is None:
        if topdown:
            yield top, dirs, nondirs

        # Recurse into subdirectories discovered
        for name in dirs:
            new_path = os.path.join(top, name)
            # Recurse using 'async for' to delegate iteration properly down the recursion
            async for x in async_walk(
                new_path, topdown, onerror, followlinks, exclude_patterns, base_path
            ):
                yield x

        if not topdown:
            yield top, dirs, nondirs


async def _list_paths_recursive(root_path: str) -> List[str]:
    """Recursively list all file paths within a directory using async_walk."""
    paths = []
    try:
        async for dirpath, _dirnames, filenames in async_walk(
            root_path, topdown=True, followlinks=False
        ):
            for filename in filenames:
                paths.append(os.path.join(dirpath, filename))
            # Only file paths are collected: the deletion-protection heuristics operate on the
            # files inside the tree, so directory paths themselves are intentionally omitted.
    except Exception as e:
        logger.error(
            f"Error listing paths recursively under {root_path}: {e}",
            exc_info=True,
            emoji_key="error",
        )
        # Re-raise as ToolError so the caller knows listing failed
        raise ToolError(
            f"Failed to list contents of directory '{root_path}' for safety check: {e}"
        ) from e
    return paths


async def _async_rmtree(path: str):
    """Asynchronously remove a directory and its contents, similar to shutil.rmtree."""
    logger.debug(f"Initiating async rmtree for: {path}", emoji_key="action")
    errors: List[Tuple[str, OSError]] = []

    def onerror(os_error: OSError):
        logger.warning(
            f"Error during rmtree operation on {getattr(os_error, 'filename', 'N/A')}: {os_error}",
            emoji_key="warning",
        )
        errors.append((getattr(os_error, "filename", "N/A"), os_error))

    try:
        # Walk bottom-up to remove files first, then directories
        async for root, dirs, files in async_walk(
            path, topdown=False, onerror=onerror, followlinks=False
        ):
            # Remove files in the current directory
            for name in files:
                filepath = os.path.join(root, name)
                try:
                    logger.debug(f"Removing file: {filepath}", emoji_key="delete")
                    await aiofiles.os.remove(filepath)
                except OSError as e:
                    onerror(e)  # Log error and collect it

            # Remove subdirectories (now empty, since the walk is bottom-up and files were removed first)
            for name in dirs:
                dirpath = os.path.join(root, name)
                # We only remove dirs listed in the walk *if* they still exist
                # (e.g., link handling might affect this). Re-check existence and type.
                try:
                    if await aiofiles.os.path.islink(dirpath):
                        # Remove symlink itself if not following links
                        logger.debug(
                            f"Removing symlink (treated as file): {dirpath}", emoji_key="delete"
                        )
                        await aiofiles.os.remove(dirpath)
                    elif await aiofiles.os.path.isdir(dirpath):
                        logger.debug(f"Removing directory: {dirpath}", emoji_key="delete")
                        await aiofiles.os.rmdir(dirpath)
                except OSError as e:
                    onerror(e)  # Log error and collect it

        # Finally, remove the top-level directory itself
        try:
            logger.debug(f"Removing root directory: {path}", emoji_key="delete")
            await aiofiles.os.rmdir(path)
        except OSError as e:
            onerror(e)

        if errors:
            # Raise a consolidated error if any deletions failed
            error_summary = "; ".join([f"{p}: {e.strerror}" for p, e in errors[:5]])
            if len(errors) > 5:
                error_summary += " ... (more errors)"
            raise ToolError(
                f"Errors occurred during recursive deletion of '{path}': {error_summary}",
                context={"path": path, "num_errors": len(errors)},
            )

    except Exception as e:
        logger.error(
            f"Unexpected error during async rmtree of {path}: {e}", exc_info=True, emoji_key="error"
        )
        raise ToolError(
            f"An unexpected error occurred during recursive deletion: {str(e)}",
            context={"path": path},
        ) from e


# --- Tool Functions ---


@with_tool_metrics
@with_error_handling
async def read_file(path: str) -> Dict[str, Any]:
    """Read a file's content asynchronously, handling text/binary detection.

    Validates the path, checks it's a file, attempts to read as UTF-8 text.
    If UTF-8 decoding fails, it reads as binary and provides a hex preview.

    Args:
        path: Path to the file to read.

    Returns:
        A dictionary formatted as an MCP tool response containing file content
        or an error message.
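
    Example:
        A minimal sketch (assumes an async context and a path inside an allowed directory):

            result = await read_file("/allowed/docs/report.txt")
            # result["content"] holds the MCP-formatted text describing the file (or error details)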
    """
    start_time = time.monotonic()
    response_content: Any
    is_response_error = False

    try:
        # Validate path, ensuring it exists and is accessible. check_exists=True
        validated_path = await validate_path(path, check_exists=True, check_parent_writable=False)

        # Ensure it's a file, not a directory or other type.
        if not await aiofiles.os.path.isfile(validated_path):
            # Check if it's a link first before declaring it's not a regular file
            if await aiofiles.os.path.islink(validated_path):
                # isfile() follows symlinks, so if it returned False for a link,
                # the link's target is not a regular file; report that specifically.
                raise ToolInputError(
                    f"Path '{path}' is a symbolic link that points to something that is not a regular file.",
                    param_name="path",
                    provided_value=path,
                    details={
                        "path": path,
                        "resolved_path": validated_path,
                        "error_type": "INVALID_SYMLINK_TARGET",
                    },
                )
            elif await aiofiles.os.path.isdir(validated_path):
                raise ToolInputError(
                    f"Path '{path}' is a directory, not a file. Use list_directory or directory_tree to view its contents.",
                    param_name="path",
                    provided_value=path,
                    details={
                        "path": path,
                        "resolved_path": validated_path,
                        "error_type": "PATH_IS_DIRECTORY",
                    },
                )
            else:
                raise ToolInputError(
                    f"Path '{path}' exists but is not a regular file. It may be a special file type (socket, device, etc.).",
                    param_name="path",
                    provided_value=path,
                    details={
                        "path": path,
                        "resolved_path": validated_path,
                        "error_type": "PATH_NOT_REGULAR_FILE",
                    },
                )

        content: Union[str, bytes]
        is_binary = False
        read_error = None
        file_size = -1  # Default size if stat fails later

        # Attempt to read as text first
        try:
            content = await read_file_content(validated_path)  # Handles UTF-8 check internally
        except ToolError as text_err:
            # Check if the error was specifically UnicodeDecodeError
            if "not valid UTF-8" in str(text_err):
                logger.warning(
                    f"File {path} is not UTF-8 encoded, reading as binary.", emoji_key="warning"
                )
                is_binary = True
                try:
                    # Fallback to binary read
                    content = await read_binary_file_content(
                        validated_path
                    )  # Keep raw bytes for now
                except ToolError as bin_err:
                    read_error = (
                        f"Error reading file as binary after text decode failed: {str(bin_err)}"
                    )
                except Exception as bin_e:
                    read_error = f"Unexpected error reading file as binary: {str(bin_e)}"
            else:
                # Other OS read error during text read attempt
                read_error = f"Error reading file: {str(text_err)}"
        except Exception as text_e:
            read_error = f"Unexpected error reading file as text: {str(text_e)}"

        if read_error:
            # If reading failed entirely (text and binary fallback)
            raise ToolError(read_error, context={"path": validated_path})

        # Successfully read content (either text or binary bytes)
        try:
            # Use stat with follow_symlinks=False for consistency
            file_size = (await aiofiles.os.stat(validated_path, follow_symlinks=False)).st_size
        except OSError as stat_err:
            logger.warning(
                f"Could not get size for file {validated_path} after reading: {stat_err}",
                emoji_key="warning",
            )
            # Continue without size info

        # Prepare response content string
        basename = os.path.basename(validated_path)
        size_str = f"{file_size} bytes" if file_size >= 0 else "Size unavailable"

        if is_binary:
            # Provide a safe representation for binary data
            binary_data = cast(bytes, content)
            hex_preview_len = 200  # Bytes to show as hex
            hex_preview = binary_data[:hex_preview_len].hex(" ")  # Use spaces for readability
            preview_msg = f"(showing first {min(hex_preview_len, len(binary_data))} bytes as hex)"
            ellipsis = "..." if len(binary_data) > hex_preview_len else ""
            response_text = (
                f"File: {basename}\n"
                f"Path: {validated_path}\n"
                f"Size: {size_str}\n"
                f"Content: <Binary file detected> {preview_msg}\n"
                f"{hex_preview}{ellipsis}"
            )
        else:
            # Content is already string
            response_text = (
                f"File: {basename}\n"
                f"Path: {validated_path}\n"
                f"Size: {size_str}\n"
                f"Content:\n"  # Add newline for separation
                f"{content}"
            )

        response_content = response_text
        processing_time = time.monotonic() - start_time
        logger.success(
            f"Successfully read file: {path}",
            emoji_key="file",
            size=file_size,
            time=processing_time,
            is_binary=is_binary,
        )

    except (ToolInputError, ToolError) as e:
        logger.error(
            f"Error in read_file for '{path}': {e}",
            emoji_key="error",
            details=getattr(e, "context", None),
        )
        # Return a formatted error response with detailed info
        error_type = e.__class__.__name__
        error_details = getattr(e, "details", {}) or {}
        # Get the context from the error if available (used in base ToolError)
        context = getattr(e, "context", None)
        if context and isinstance(context, dict):
            error_details.update(context)
        # Include error type and code for better error display
        response_content = {
            "message": str(e),
            "error_code": getattr(e, "error_code", "TOOL_ERROR"),
            "error_type": error_type,
            "details": error_details,
        }
        is_response_error = True
    except FileNotFoundError as e:
        # Specific error for file not found
        raise ToolInputError(
            f"File not found: {path}",
            param_name="path",
            provided_value=path,
            details={"errno": e.errno, "error_type": "PATH_NOT_FOUND"},
        ) from e
    except IsADirectoryError as e:
        # Specific error for path being a directory
        raise ToolInputError(
            f"Path is a directory, not a file: {path}",
            param_name="path",
            provided_value=path,
            details={"errno": e.errno, "error_type": "PATH_IS_DIRECTORY"},
        ) from e
    except PermissionError as e:
        # Specific error for permissions
        raise ToolInputError(
            f"Permission denied reading file: {path}",
            param_name="path",
            provided_value=path,
            details={"errno": e.errno, "error_type": "PERMISSION_DENIED"},
        ) from e
    except UnicodeDecodeError as e:
        # This is handled in read_file_content now, but keep check here just in case
        raise ToolError(
            f"File is not valid UTF-8 encoded text: {validated_path}. Details: {e}",
            context={"path": validated_path, "encoding": "utf-8"},
        ) from e
    except OSError as e:
        # General OS error during read
        raise ToolError(
            f"OS error reading file: {str(e)}", context={"path": validated_path, "errno": e.errno}
        ) from e
    except Exception as e:
        logger.error(
            f"Unexpected error in read_file for '{path}': {e}", exc_info=True, emoji_key="error"
        )
        response_content = {
            "message": f"An unexpected error occurred while reading '{path}': {str(e)}",
            "error_code": "UNEXPECTED_ERROR",
            "error_type": type(e).__name__,
            "details": {"error_class": type(e).__name__, "path": path},
        }
        is_response_error = True

    # Use create_tool_response for consistent formatting of success/error messages.
    return create_tool_response(response_content, is_error=is_response_error)


@with_tool_metrics
@with_error_handling
async def read_multiple_files(paths: List[str]) -> Dict[str, Any]:
    """Read the contents of multiple files asynchronously and concurrently.

    Validates each path and attempts to read each file (text/binary),
    handling individual errors gracefully.

    Args:
        paths: A list of file paths to read.

    Returns:
        A dictionary summarizing the results (suitable for create_tool_response),
        including success/failure counts and content/errors for each file.
        The operation itself succeeds unless input validation fails.
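
    Example:
        A minimal sketch (paths are assumed to lie inside allowed directories):

            result = await read_multiple_files(["/allowed/a.txt", "/allowed/b.bin"])
            # result["succeeded"] / result["failed"] summarize the per-file outcomes;
            # each item in result["files"] carries "path", "success", and "content" or "error".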
    """
    start_time = time.monotonic()

    # Input validation
    if not isinstance(paths, list):
        raise ToolInputError(
            "Input must be a list of paths.", param_name="paths", provided_value=type(paths)
        )
    if not paths:  # Handle empty list input explicitly
        logger.info("read_multiple_files called with empty list.", emoji_key="info")
        return {
            "files": [],
            "succeeded": 0,
            "failed": 0,
            "success": True,
            "message": "No paths provided to read.",
        }
    if not all(isinstance(p, str) for p in paths):
        invalid_path = next((p for p in paths if not isinstance(p, str)), None)
        raise ToolInputError(
            "All items in the 'paths' list must be strings.",
            param_name="paths",
            provided_value=f"List contains element of type {type(invalid_path)}",
        )

    # --- Inner Task Definition ---
    async def read_single_file_task(path: str) -> Dict[str, Any]:
        """Task to read and process a single file for read_multiple_files."""
        task_result: Dict[str, Any] = {"path": path, "success": False}  # Initialize result dict
        validated_path: Optional[str] = None
        try:
            validated_path = await validate_path(
                path, check_exists=True, check_parent_writable=False
            )
            task_result["path"] = validated_path  # Update path in result if validation succeeds

            # check isfile (follows links)
            if not await aiofiles.os.path.isfile(validated_path):
                # Check if link before failing
                if await aiofiles.os.path.islink(validated_path):
                    task_result["error"] = "Path is a link, but does not point to a regular file"
                else:
                    task_result["error"] = "Path exists but is not a regular file"
                return task_result

            read_error = None
            file_size = -1

            # Try reading as text (UTF-8)
            try:
                content_str = await read_file_content(validated_path)
                task_result["content"] = content_str
            except ToolError as text_err:
                if "not valid UTF-8" in str(text_err):
                    try:
                        binary_content = await read_binary_file_content(validated_path)
                        # For multi-read, just provide preview directly in result content
                        hex_preview_len = 200
                        hex_preview = binary_content[:hex_preview_len].hex(" ")
                        ellipsis = "..." if len(binary_content) > hex_preview_len else ""
                        preview_msg = f"<binary file detected, hex preview (first {min(hex_preview_len, len(binary_content))} bytes)>: {hex_preview}{ellipsis}"
                        task_result["content"] = preview_msg
                        task_result["is_binary"] = True
                    except ToolError as bin_err:
                        read_error = f"Error reading as binary: {str(bin_err)}"
                    except Exception as bin_e:
                        read_error = f"Unexpected error reading as binary: {str(bin_e)}"
                else:
                    read_error = f"Error reading file: {str(text_err)}"
            except Exception as text_e:
                read_error = f"Unexpected error reading file as text: {str(text_e)}"

            if read_error:
                task_result["error"] = read_error
                return task_result  # Mark as failed

            # Successfully read content (string or binary preview string)
            task_result["success"] = True

            # Try to get size (use stat with follow_symlinks=False for consistency)
            try:
                file_size = (await aiofiles.os.stat(validated_path, follow_symlinks=False)).st_size
                task_result["size"] = file_size
            except OSError as stat_err:
                logger.warning(
                    f"Could not get size for {validated_path} in multi-read: {stat_err}",
                    emoji_key="warning",
                )
                task_result["warning"] = "Could not retrieve file size."

            return task_result

        except (ToolInputError, ToolError) as e:
            # Handle validation or specific tool errors for this path
            task_result["error"] = str(e)
            task_result["path"] = (
                validated_path or path
            )  # Use original path if validation failed early
            return task_result
        except Exception as e:
            # Catch unexpected errors during processing of a single file
            logger.error(
                f"Unexpected error reading single file {path} in multi-read: {e}",
                exc_info=True,
                emoji_key="error",
            )
            task_result["error"] = f"Unexpected error: {str(e)}"
            task_result["path"] = validated_path or path
            return task_result

    # --- End Inner Task Definition ---

    # Execute reads concurrently using asyncio.gather
    results = await asyncio.gather(
        *(read_single_file_task(p) for p in paths), return_exceptions=True
    )

    # Process results (handle potential exceptions returned by gather)
    processed_results: List[Dict[str, Any]] = []
    successful_count = 0
    failed_count = 0

    for i, res in enumerate(results):
        original_path = paths[i]  # Keep track of the requested path
        if isinstance(res, Exception):
            # An unexpected exception occurred *outside* the try/except in the task (unlikely)
            logger.error(
                f"Gather returned exception for path '{original_path}': {res}",
                exc_info=res,
                emoji_key="error",
            )
            processed_results.append(
                {
                    "path": original_path,
                    "error": f"Internal error during task execution: {res}",
                    "success": False,
                }
            )
            failed_count += 1
        elif isinstance(res, dict):
            # Expected dictionary output from our task
            processed_results.append(res)
            if res.get("success"):
                successful_count += 1
            else:
                failed_count += 1
        else:
            # Should not happen if task always returns dict
            logger.error(
                f"Unexpected result type from task for path '{original_path}': {type(res)}",
                emoji_key="error",
            )
            processed_results.append(
                {
                    "path": original_path,
                    "error": f"Internal error: Unexpected task result type {type(res)}",
                    "success": False,
                }
            )
            failed_count += 1

    processing_time = time.monotonic() - start_time
    logger.success(
        f"Finished read_multiple_files: {successful_count} succeeded, {failed_count} failed",
        emoji_key="file",
        total_files=len(paths),
        time=processing_time,
    )

    # Return a dictionary structure that create_tool_response understands
    return {
        "files": processed_results,
        "succeeded": successful_count,
        "failed": failed_count,
        "success": True,  # The overall tool execution was successful (individual files might have failed)
    }


@with_tool_metrics
@with_error_handling
async def get_unique_filepath(path: str) -> Dict[str, Any]:
    """
    Finds an available (non-existent) filepath based on the requested path.

    If the requested path already doesn't exist, it's returned directly.
    If it exists, it appends counters like '_1', '_2', etc., to the filename stem
    until an unused path is found within the same directory.

    Args:
        path: The desired file path.

    Returns:
        Dictionary containing the unique, validated, absolute path found.

    Raises:
        ToolInputError: If the base path is invalid or outside allowed directories.
        ToolError: If the counter limit is exceeded or filesystem errors occur.
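
    Example:
        A minimal sketch (assumes '/allowed/out/report.txt' already exists):

            result = await get_unique_filepath("/allowed/out/report.txt")
            # result["path"] might be '/allowed/out/report_1.txt'; result["attempts"] counts the probes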
    """
    start_time = time.monotonic()
    MAX_FILENAME_ATTEMPTS = 1000  # Safety limit

    try:
        # 1. Validate the *input* path first.
        #    check_exists=None because the *final* path might not exist.
        #    check_parent_writable=False - we only need read/stat access to check existence.
        #    The parent directory's writability should be checked by the *calling* function
        #    (like write_file or smart_download's directory creation) before this.
        validated_input_path = await validate_path(
            path, check_exists=None, check_parent_writable=False
        )
        logger.debug(
            f"get_unique_filepath: Validated input path resolves to {validated_input_path}"
        )

        # 2. Check if the initial validated path is already available.
        if not await aiofiles.os.path.exists(validated_input_path):
            logger.info(f"Initial path '{validated_input_path}' is available.", emoji_key="file")
            return {
                "path": validated_input_path,
                "attempts": 0,
                "success": True,
                "message": f"Path '{validated_input_path}' is already unique.",
            }

        # 3. Path exists, need to find a unique alternative.
        logger.debug(f"Path '{validated_input_path}' exists, finding unique alternative.")
        dirname = os.path.dirname(validated_input_path)
        original_filename = os.path.basename(validated_input_path)
        stem, suffix = os.path.splitext(original_filename)

        counter = 1
        while counter <= MAX_FILENAME_ATTEMPTS:
            # Construct new filename candidate
            candidate_filename = f"{stem}_{counter}{suffix}"
            candidate_path = os.path.join(dirname, candidate_filename)

            # Check existence asynchronously
            if not await aiofiles.os.path.exists(candidate_path):
                processing_time = time.monotonic() - start_time
                logger.success(
                    f"Found unique path '{candidate_path}' after {counter} attempts.",
                    emoji_key="file",
                    time=processing_time,
                )
                return {
                    "path": candidate_path,
                    "attempts": counter,
                    "success": True,
                    "message": f"Found unique path '{candidate_path}'.",
                }

            counter += 1

        # If loop finishes, we exceeded the limit
        raise ToolError(
            f"Could not find a unique filename based on '{path}' after {MAX_FILENAME_ATTEMPTS} attempts.",
            context={"base_path": validated_input_path, "attempts": MAX_FILENAME_ATTEMPTS},
        )

    except OSError as e:
        # Catch errors during exists checks
        logger.error(
            f"Filesystem error finding unique path based on '{path}': {str(e)}",
            exc_info=True,
            emoji_key="error",
        )
        raise ToolError(
            f"Filesystem error checking path existence: {str(e)}",
            context={"path": path, "errno": e.errno},
        ) from e
    except (
        ToolInputError,
        ToolError,
    ):  # Re-raise specific errors from validate_path or the counter limit
        raise
    except Exception as e:
        # Catch unexpected errors
        logger.error(
            f"Unexpected error finding unique path for {path}: {e}",
            exc_info=True,
            emoji_key="error",
        )
        raise ToolError(
            f"An unexpected error occurred finding a unique path: {str(e)}", context={"path": path}
        ) from e


@with_tool_metrics
@with_error_handling
async def write_file(path: str, content: Union[str, bytes]) -> Dict[str, Any]:
    """Write content to a file asynchronously (UTF-8 or binary), creating/overwriting.

    Ensures the path is valid, within allowed directories, and that the parent
    directory exists and is writable. Fails if the target path exists and is a directory.

    Args:
        path: Path to the file to write.
        content: Content to write (string for text UTF-8, bytes for binary).

    Returns:
        A dictionary confirming success and providing file details (path, size).
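
    Example:
        A minimal sketch (assumes the parent directory is allowed and writable):

            result = await write_file("/allowed/out/notes.txt", "first draft")
            # result["size"] reports the number of bytes written to result["path"]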
    """
    start_time = time.monotonic()

    # Validate content type explicitly at the start.
    if not isinstance(content, (str, bytes)):
        raise ToolInputError(
            "Content to write must be a string or bytes.",
            param_name="content",
            provided_value=type(content),
        )

    # Validate path: doesn't need to exist necessarily (check_exists=None), but parent must exist and be writable.
    validated_path = await validate_path(path, check_exists=None, check_parent_writable=True)

    # Check if the path exists and is a directory (we shouldn't overwrite a dir with a file).
    # Use exists instead of lexists for this check
    if await aiofiles.os.path.exists(validated_path) and await aiofiles.os.path.isdir(
        validated_path
    ):
        raise ToolInputError(
            f"Cannot write file: Path '{path}' (resolved to '{validated_path}') exists and is a directory.",
            param_name="path",
            provided_value=path,
        )

    # write_file_content handles actual writing and parent dir creation
    await write_file_content(validated_path, content)  # Can raise ToolError

    # Verify write success by getting status and size afterwards
    file_size = -1
    try:
        file_size = (await aiofiles.os.stat(validated_path, follow_symlinks=False)).st_size
    except OSError as e:
        # If stat fails after write seemed to succeed, something is wrong.
        logger.error(
            f"File write appeared successful for {validated_path}, but failed to get status afterwards: {e}",
            emoji_key="error",
        )
        raise ToolError(
            f"File written but failed to verify status afterwards: {str(e)}",
            context={"path": validated_path},
        ) from e

    processing_time = time.monotonic() - start_time
    logger.success(
        f"Successfully wrote file: {path}", emoji_key="file", size=file_size, time=processing_time
    )

    # Return a structured success response.
    return {
        "message": f"Successfully wrote {file_size} bytes to '{validated_path}'.",
        "path": validated_path,
        "size": file_size,
        "success": True,
    }


@with_tool_metrics
@with_error_handling
async def edit_file(
    path: str, edits: List[Dict[str, str]], dry_run: bool = False
) -> Dict[str, Any]:
    """Edit a text file asynchronously by applying string replacements (UTF-8).

    Validates path and edits, reads the file, applies changes (with fallbacks for
    whitespace differences), generates a diff, and optionally writes back.

    Args:
        path: Path to the file to edit. Must be an existing text file.
        edits: List of edit operations. Each dict needs 'oldText' (str)
               and 'newText' (str).
        dry_run: If True, calculates changes and diff but does not save them.

    Returns:
        Dictionary containing the generated diff, success status, path,
        and whether it was a dry run.
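
    Example:
        A minimal sketch (assumes '/allowed/config.ini' is an existing UTF-8 text file):

            result = await edit_file(
                "/allowed/config.ini",
                edits=[{"oldText": "debug = false", "newText": "debug = true"}],
                dry_run=True,
            )
            # result["diff"] shows the generated diff; nothing is written while dry_run=True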
    """
    start_time = time.monotonic()

    # Validate path must exist and be a file (check_exists=True).
    validated_path = await validate_path(path, check_exists=True, check_parent_writable=False)
    # Check if it's a regular file (follows links)
    if not await aiofiles.os.path.isfile(validated_path):
        if await aiofiles.os.path.islink(validated_path):
            raise ToolInputError(
                f"Path '{path}' (resolved to link '{validated_path}') points to something that is not a regular file.",
                param_name="path",
                provided_value=path,
            )
        else:
            raise ToolInputError(
                f"Path '{path}' (resolved to '{validated_path}') is not a regular file.",
                param_name="path",
                provided_value=path,
            )

    # Validate edits structure
    if not isinstance(edits, list):
        raise ToolInputError(
            "Edits parameter must be a list.", param_name="edits", provided_value=type(edits)
        )
    if not edits:
        # Handle empty edits list as a no-op success.
        logger.info(
            f"edit_file called with empty edits list for {path}. No changes will be made.",
            emoji_key="info",
        )
        return {
            "path": validated_path,
            "diff": "No edits provided.",
            "success": True,
            "dry_run": dry_run,
            "message": "No edits were specified.",
        }
    # Deeper validation of each edit dict happens within apply_file_edits

    # NOTE: Modification protection is currently NOT applied here.
    # This operates on a single file. Bulk editing would require a different tool
    # where protection heuristics might be applied.

    # apply_file_edits handles reading, core editing logic, diffing, conditional writing.
    # Raises ToolInputError/ToolError on failure.
    diff, new_content = await apply_file_edits(validated_path, edits, dry_run)

    processing_time = time.monotonic() - start_time

    action = "Previewed edits for" if dry_run else "Applied edits to"
    logger.success(
        f"Successfully {action} file: {path}",
        emoji_key="file",
        num_edits=len(edits),
        dry_run=dry_run,
        time=processing_time,
        changes_made=(diff != ""),  # Log if actual changes resulted
    )

    # Provide clearer messages based on diff content and dry_run status.
    if diff:
        diff_message = diff
        status_message = f"Successfully {'previewed' if dry_run else 'applied'} {len(edits)} edits."
    else:  # Edits provided, but resulted in no change to content
        diff_message = "No changes detected after applying edits."
        status_message = f"{len(edits)} edits provided, but resulted in no content changes."

    return {
        "path": validated_path,
        "diff": diff_message,
        "success": True,
        "dry_run": dry_run,
        "message": status_message,
        # "new_content": new_content # Optional: return new content, especially for dry runs? Can be large.
    }


@with_tool_metrics
@with_error_handling
async def create_directory(path: str) -> Dict[str, Any]:
    """Create a directory asynchronously, including parent directories (like 'mkdir -p').

    Validates the path is allowed and parent is writable. Idempotent: If the directory
    already exists, it succeeds without error. Fails if the path exists but is a file.

    Args:
        path: Path to the directory to create.

    Returns:
        Dictionary confirming success, path, and whether it was newly created.
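
    Example:
        A minimal sketch (assumes '/allowed/data' lies within an allowed directory):

            result = await create_directory("/allowed/data/2024/reports")
            # result["created"] is True when the directory was newly created, False if it already existed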
    """
    start_time = time.monotonic()

    # Validate path: the parent must exist and be writable; the path itself may or may not exist yet (check_exists=None).
    validated_path = await validate_path(path, check_exists=None, check_parent_writable=True)

    created = False
    message = ""
    try:
        # Check existence and type before creating, using exists instead of lexists
        if await aiofiles.os.path.exists(validated_path):
            if await aiofiles.os.path.isdir(validated_path):
                # Directory already exists - idempotent success.
                logger.info(
                    f"Directory already exists: {path} (resolved: {validated_path})",
                    emoji_key="directory",
                )
                message = f"Directory '{validated_path}' already exists."
            else:
                # Path exists but is not a directory (e.g., a file or symlink)
                raise ToolInputError(
                    f"Cannot create directory: Path '{path}' (resolved to '{validated_path}') already exists but is not a directory.",
                    param_name="path",
                    provided_value=path,
                )
        else:
            # Path does not exist, proceed with creation using async makedirs.
            await aiofiles.os.makedirs(validated_path, exist_ok=True)
            created = True
            logger.success(
                f"Successfully created directory: {path} (resolved: {validated_path})",
                emoji_key="directory",
            )
            message = f"Successfully created directory '{validated_path}'."

    except OSError as e:
        # Catch errors during the exists/isdir checks or makedirs call
        logger.error(
            f"Error creating directory '{path}' (resolved: {validated_path}): {e}",
            exc_info=True,
            emoji_key="error",
        )
        raise ToolError(
            f"Error creating directory '{path}': {str(e)}",
            context={"path": validated_path, "errno": e.errno},
        ) from e
    except Exception as e:
        # Catch unexpected errors
        logger.error(
            f"Unexpected error creating directory {path}: {e}", exc_info=True, emoji_key="error"
        )
        raise ToolError(
            f"An unexpected error occurred creating directory: {str(e)}",
            context={"path": validated_path},
        ) from e

    processing_time = time.monotonic() - start_time
    logger.success(
        f"create_directory completed for: {path} (resolved: {validated_path})",
        emoji_key="directory",
        created=created,
        time=processing_time,
    )

    return {"path": validated_path, "created": created, "success": True, "message": message}


@with_tool_metrics
@with_error_handling
async def list_directory(path: str) -> Dict[str, Any]:
    """List files and subdirectories within a given directory asynchronously.

    Validates the path exists, is a directory, and is allowed.
    Provides basic info (name, type, size for files, link target for symlinks) for each entry.

    Args:
        path: Path to the directory to list.

    Returns:
        Dictionary containing the path listed and a list of entries.
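
    Example:
        A minimal sketch (assumes '/allowed/data' exists and is a directory):

            result = await list_directory("/allowed/data")
            for entry in result["entries"]:
                print(entry["name"], entry.get("type"), entry.get("size"))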
    """
    start_time = time.monotonic()

    # Validate path exists and is a directory (check_exists=True).
    validated_path = await validate_path(path, check_exists=True, check_parent_writable=False)
    if not await aiofiles.os.path.isdir(validated_path):
        # If it's a link, check if it points to a directory
        if await aiofiles.os.path.islink(validated_path):
            try:
                target_is_dir = await aiofiles.os.path.isdir(
                    await aiofiles.os.path.realpath(validated_path)
                )
                if not target_is_dir:
                    raise ToolInputError(
                        f"Path '{path}' (resolved to link '{validated_path}') points to something that is not a directory.",
                        param_name="path",
                        provided_value=path,
                    )
                # If target is dir, proceed using validated_path (which resolves the link)
            except OSError as e:
                raise ToolError(
                    f"Error resolving or checking link target for directory listing: {e}",
                    context={"path": validated_path},
                ) from e
        else:
            raise ToolInputError(
                f"Path '{path}' (resolved to '{validated_path}') is not a directory.",
                param_name="path",
                provided_value=path,
            )

    entries: List[Dict[str, Any]] = []
    scan_errors: List[str] = []
    try:
        # Use await with scandir rather than async iteration
        entry_list = await aiofiles.os.scandir(validated_path)
        for entry in entry_list:
            entry_info: Dict[str, Any] = {"name": entry.name}
            try:
                # DirEntry methods are synchronous but reuse data cached by scandir; check link status explicitly
                is_link = entry.is_symlink()  # Checks if entry *itself* is a link
                entry_info["is_symlink"] = is_link

                # Determine the entry type explicitly from lstat results (no symlink following)
                try:
                    stat_res = entry.stat(follow_symlinks=False)  # Use lstat via entry
                    mode = stat_res.st_mode
                    l_is_dir = os.path.stat.S_ISDIR(mode)
                    l_is_file = os.path.stat.S_ISREG(mode)
                    l_is_link = os.path.stat.S_ISLNK(mode)  # Should match entry.is_symlink() result

                    if l_is_dir:
                        entry_info["type"] = "directory"
                    elif l_is_file:
                        entry_info["type"] = "file"
                        entry_info["size"] = stat_res.st_size  # Size of file itself
                    elif l_is_link:
                        entry_info["type"] = "symlink"
                        entry_info["size"] = stat_res.st_size  # Size of link itself
                        # Optionally try to resolve link target for display
                        try:
                            target = await aiofiles.os.readlink(entry.path)
                            entry_info["symlink_target"] = target
                        except OSError as link_err:
                            entry_info["error"] = f"Could not read link target: {link_err}"
                    else:
                        entry_info["type"] = "other"  # E.g., socket, fifo
                        entry_info["size"] = stat_res.st_size

                except OSError as stat_err:
                    logger.warning(
                        f"Could not lstat entry {entry.path} in list_directory: {stat_err}",
                        emoji_key="warning",
                    )
                    entry_info["type"] = "error"
                    entry_info["error"] = f"Could not get info: {stat_err}"

                entries.append(entry_info)

            except OSError as entry_err:
                # Error processing a specific entry (e.g., permission denied on is_dir/is_file/is_symlink)
                logger.warning(
                    f"Could not process directory entry '{entry.name}' in {validated_path}: {entry_err}",
                    emoji_key="warning",
                )
                error_message = f"Error processing entry '{entry.name}': {entry_err}"
                scan_errors.append(error_message)
                # Add error entry to the list for visibility
                entries.append({"name": entry.name, "type": "error", "error": str(entry_err)})

        # Sort entries: directories, files, symlinks, others, errors; then alphabetically.
        entries.sort(
            key=lambda e: (
                0
                if e.get("type") == "directory"
                else 1
                if e.get("type") == "file"
                else 2
                if e.get("type") == "symlink"
                else 3
                if e.get("type") == "other"
                else 4,  # Errors last
                e.get("name", "").lower(),  # Case-insensitive sort by name
            )
        )

    except OSError as e:
        # Error during the initial scandir call (e.g., permission denied on the directory itself)
        raise ToolError(
            f"Error listing directory '{path}': {str(e)}",
            context={"path": validated_path, "errno": e.errno},
        ) from e
    except Exception as e:
        # Catch unexpected errors during iteration
        logger.error(
            f"Unexpected error listing directory {path}: {e}", exc_info=True, emoji_key="error"
        )
        raise ToolError(
            f"An unexpected error occurred listing directory: {str(e)}",
            context={"path": validated_path},
        ) from e

    processing_time = time.monotonic() - start_time
    logger.success(
        f"Listed directory: {path} ({len(entries)} entries found, {len(scan_errors)} errors)",
        emoji_key="directory",
        time=processing_time,
    )

    # Structure result clearly, including warnings if errors occurred on entries.
    result = {
        "path": validated_path,
        "entries": entries,
        "success": True,
        "message": f"Found {len(entries)} entries in '{validated_path}'.",
    }
    if scan_errors:
        warning_summary = f"Encountered {len(scan_errors)} errors processing directory entries."
        result["warnings"] = [warning_summary]
        # Optionally include first few errors: result["error_details"] = scan_errors[:5]

    return result


@with_tool_metrics
@with_error_handling
async def directory_tree(
    path: str, max_depth: int = 3, include_size: bool = False
) -> Dict[str, Any]:
    """Get a recursive tree view of a directory structure asynchronously.

    Args:
        path: Path to the root directory for the tree view.
        max_depth: Maximum recursion depth (-1 for effectively unlimited, capped internally).
        include_size: If True, include file sizes in the tree (requires extra stat calls).

    Returns:
        Dictionary containing the path and the hierarchical tree structure.
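
    Example:
        A minimal sketch (assumes '/allowed/project' exists and is a directory):

            result = await directory_tree("/allowed/project", max_depth=2, include_size=True)
            # The returned dictionary holds the resolved path plus a nested tree of
            # {"name", "type", ...} nodes, with "children" lists under each directory node.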
    """
    start_time = time.monotonic()
    # Internal safety cap for 'unlimited' depth to prevent runaway recursion.
    INTERNAL_DEPTH_CAP = 15

    # Validate path exists and is a directory (check_exists=True)
    validated_path = await validate_path(path, check_exists=True)
    if not await aiofiles.os.path.isdir(validated_path):
        # Check if it's a link to a directory
        if await aiofiles.os.path.islink(validated_path):
            try:
                target_is_dir = await aiofiles.os.path.isdir(
                    await aiofiles.os.path.realpath(validated_path)
                )
                if not target_is_dir:
                    raise ToolInputError(
                        f"Path '{path}' (resolved to link '{validated_path}') points to something that is not a directory.",
                        param_name="path",
                        provided_value=path,
                    )
                # proceed with validated_path which resolves link
            except OSError as e:
                raise ToolError(
                    f"Error resolving or checking link target for directory tree: {e}",
                    context={"path": validated_path},
                ) from e
        else:
            raise ToolInputError(
                f"Path '{path}' (resolved to '{validated_path}') is not a directory.",
                param_name="path",
                provided_value=path,
            )

    # Validate max_depth input
    if not isinstance(max_depth, int):
        raise ToolInputError(
            "max_depth must be an integer.", param_name="max_depth", provided_value=max_depth
        )

    # Apply internal depth cap if necessary
    actual_max_depth = max_depth
    if max_depth < 0 or max_depth > INTERNAL_DEPTH_CAP:
        if max_depth >= 0:  # Only warn if user specified a large number, not for -1
            logger.warning(
                f"Requested max_depth {max_depth} exceeds internal cap {INTERNAL_DEPTH_CAP}. Limiting depth.",
                emoji_key="warning",
            )
        actual_max_depth = INTERNAL_DEPTH_CAP
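
    # Effective depth examples (illustrative, derived from the cap logic above):
    #   max_depth=-1 -> capped at INTERNAL_DEPTH_CAP (15), no warning logged
    #   max_depth=3  -> 3, used as requested
    #   max_depth=99 -> capped at 15, warning logged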

    # --- Recursive Helper ---
    async def build_tree_recursive(current_path: str, current_depth: int) -> List[Dict[str, Any]]:
        """Recursively builds the directory tree structure."""
        if current_depth > actual_max_depth:
            # Return specific marker if depth limit hit, not just empty list.
            return [{"name": f"... (Max depth {actual_max_depth} reached)", "type": "info"}]

        children_nodes: List[Dict[str, Any]] = []
        try:
            # Await scandir and then iterate over the returned entries
            entries = await aiofiles.os.scandir(current_path)
            for entry in entries:
                entry_data: Dict[str, Any] = {"name": entry.name}
                try:
                    # Use lstat via entry to avoid following links unexpectedly
                    stat_res = entry.stat(follow_symlinks=False)
                    mode = stat_res.st_mode
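                    # (os.path.stat is the standard 'stat' module, reachable as an attribute because
                    # posixpath/ntpath import it; the S_IS* checks below classify the lstat result.)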
                    l_is_dir = os.path.stat.S_ISDIR(mode)
                    l_is_file = os.path.stat.S_ISREG(mode)
                    l_is_link = os.path.stat.S_ISLNK(mode)

                    if l_is_dir:
                        entry_data["type"] = "directory"
                        # Recurse asynchronously into subdirectory
                        entry_data["children"] = await build_tree_recursive(
                            entry.path, current_depth + 1
                        )
                    elif l_is_file:
                        entry_data["type"] = "file"
                        if include_size:
                            entry_data["size"] = stat_res.st_size
                    elif l_is_link:
                        entry_data["type"] = "symlink"
                        if include_size:
                            entry_data["size"] = stat_res.st_size  # Size of link itself
                        # Optionally resolve link target? Can be noisy. Let's skip for tree view simplicity.
                        # try: entry_data["target"] = await aiofiles.os.readlink(entry.path)
                        # except OSError: entry_data["target"] = "<Error reading link>"
                    else:
                        entry_data["type"] = "other"
                        if include_size:
                            entry_data["size"] = stat_res.st_size

                    children_nodes.append(entry_data)

                except OSError as entry_err:
                    # Error processing one entry (e.g., permissions on stat)
                    logger.warning(
                        f"Could not process entry {entry.path} in tree: {entry_err}",
                        emoji_key="warning",
                    )
                    children_nodes.append(
                        {"name": entry.name, "type": "error", "error": str(entry_err)}
                    )

            # Sort entries at the current level: directories first, then files, symlinks, and others;
            # case-insensitive alphabetical order within each type.
            children_nodes.sort(
                key=lambda e: (
                    0
                    if e.get("type") == "directory"
                    else 1
                    if e.get("type") == "file"
                    else 2
                    if e.get("type") == "symlink"
                    else 3,
                    e.get("name", "").lower(),  # Case-insensitive sort
                )
            )
            return children_nodes

        except OSError as e:
            # Error scanning the directory itself (e.g., permissions)
            logger.error(
                f"Error scanning directory {current_path} for tree: {str(e)}", emoji_key="error"
            )
            # Return error indicator instead of raising, allows partial trees if root is accessible
            return [{"name": f"... (Error scanning this directory: {e})", "type": "error"}]
        except Exception as e:
            # Unexpected error during scan
            logger.error(
                f"Unexpected error scanning directory {current_path} for tree: {e}",
                exc_info=True,
                emoji_key="error",
            )
            return [{"name": f"... (Unexpected error scanning: {e})", "type": "error"}]

    # --- End Recursive Helper ---

    # Generate tree starting from the validated path
    tree_structure = await build_tree_recursive(validated_path, 0)

    processing_time = time.monotonic() - start_time
    logger.success(
        f"Generated directory tree for: {path}",
        emoji_key="directory",
        max_depth=actual_max_depth,  # Log the effective depth
        requested_depth=max_depth,
        include_size=include_size,
        time=processing_time,
    )

    # Structure result clearly
    return {
        "path": validated_path,
        "max_depth_reached": actual_max_depth,
        "tree": tree_structure,
        "success": True,
        "message": f"Generated directory tree for '{validated_path}' up to depth {actual_max_depth}.",
    }
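
# Illustrative usage sketch (not part of this module; the exact invocation path depends on how
# the MCP server registers these tools, and "/data/projects" is a hypothetical path):
#
#   result = await directory_tree("/data/projects", max_depth=2, include_size=True)
#   for node in result["tree"]:
#       print(node.get("type"), node["name"])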


@with_tool_metrics
@with_error_handling
async def move_file(
    source: str,
    destination: str,
    overwrite: bool = False,  # Default to NOT overwrite for safety
) -> Dict[str, Any]:
    """Move or rename a file or directory asynchronously.

    Ensures both source and destination paths are within allowed directories.
    Checks for existence and potential conflicts before moving.

    Args:
        source: Path to the file or directory to move. Must exist.
        destination: New path for the file or directory. Parent must exist and be writable.
        overwrite: If True, allows overwriting an existing file or *empty* directory
                   at the destination. USE WITH CAUTION. Defaults False.

    Returns:
        Dictionary confirming the move operation, including source and destination paths.
    """
    start_time = time.monotonic()

    # Validate source path (must exist, check_exists=True)
    validated_source = await validate_path(source, check_exists=True)

    # Validate destination path (parent must exist and be writable, path itself may or may not exist check_exists=None)
    # Check parent writability *before* checking overwrite logic.
    validated_dest = await validate_path(destination, check_exists=None, check_parent_writable=True)

    # Validate overwrite flag type
    if not isinstance(overwrite, bool):
        raise ToolInputError(
            "overwrite parameter must be a boolean (true/false).",
            param_name="overwrite",
            provided_value=type(overwrite),
        )

    # NOTE: Modification protection is currently NOT applied here.
    # If overwriting a directory, current logic only allows overwriting an *empty* one via rmdir.
    # Overwriting a file is a single-file modification.
    # A future enhancement could add protection heuristics here if overwriting non-empty dirs was allowed.

    try:
        # Check if destination already exists (os.path.exists follows symlinks, so a broken
        # symlink at the destination is treated as non-existent here)
        dest_exists = await aiofiles.os.path.exists(validated_dest)
        dest_is_dir = False
        if dest_exists:
            dest_is_dir = await aiofiles.os.path.isdir(
                validated_dest
            )  # Check type (follows links if dest is link)

        if dest_exists:
            if overwrite:
                # Overwrite requested. Log prominently.
                logger.warning(
                    f"Overwrite flag is True. Attempting to replace existing path '{validated_dest}' with '{validated_source}'.",
                    emoji_key="warning",
                )

                # Check if source and destination types are compatible for overwrite (e.g., cannot replace dir with file easily)
                # Use stat with follow_symlinks=False for source type check as well.
                source_stat = await aiofiles.os.stat(validated_source, follow_symlinks=False)
                is_source_dir = os.path.stat.S_ISDIR(source_stat.st_mode)

                # Simple check: Prevent overwriting dir with file or vice-versa.
                # Note: aiofiles.os.rename might handle some cases, but explicit check is safer.
                # This logic assumes we'd remove the destination first.
                if is_source_dir != dest_is_dir:
                    # Allow replacing link with dir/file, or dir/file with link? Be cautious.
                    # Let's prevent dir/file mismatch for simplicity. Overwriting links is tricky.
                    if not await aiofiles.os.path.islink(
                        validated_dest
                    ):  # Only enforce if dest is not a link
                        raise ToolInputError(
                            f"Cannot overwrite: Source is a {'directory' if is_source_dir else 'file/link'} but destination ('{validated_dest}') exists and is a {'directory' if dest_is_dir else 'file/link'}.",
                            param_name="destination",
                            provided_value=destination,
                        )

                # Attempt to remove the existing destination. This is the dangerous part.
                try:
                    if dest_is_dir:
                        # Use async rmdir - fails if directory is not empty!
                        # This prevents accidental recursive deletion via move+overwrite.
                        await aiofiles.os.rmdir(validated_dest)
                        logger.info(
                            f"Removed existing empty directory destination '{validated_dest}' for overwrite.",
                            emoji_key="action",
                        )
                    else:
                        # Removes a file or symlink
                        await aiofiles.os.remove(validated_dest)
                        logger.info(
                            f"Removed existing file/link destination '{validated_dest}' for overwrite.",
                            emoji_key="action",
                        )
                except OSError as remove_err:
                    # Handle cases like directory not empty, permission error during removal
                    raise ToolError(
                        f"Failed to remove existing destination '{validated_dest}' for overwrite: {remove_err}. Check permissions or if directory is empty.",
                        context={
                            "source": validated_source,
                            "destination": validated_dest,
                            "errno": remove_err.errno,
                        },
                    ) from remove_err

            else:  # Destination exists, and overwrite is False (default)
                raise ToolInputError(
                    f"Cannot move: Destination path '{destination}' (resolved to '{validated_dest}') already exists. Use overwrite=True to replace.",
                    param_name="destination",
                    provided_value=destination,
                )

        # Ensure source and destination are not the same path after normalization/resolution.
        # Note: aiofiles.os.rename performs this check internally too, but an explicit check is clearer.
        # os.path.samefile (same-inode check) would be more robust than string comparison, but it is
        # synchronous and requires both paths to exist, so we simply compare the final validated paths.
        try:
            # Source exists. Does the destination still exist after a potential overwrite removal?
            dest_exists_after_remove = await aiofiles.os.path.exists(validated_dest)  # noqa: F841
            # If the resolved paths are identical there is nothing to do; if they differ but still
            # refer to the same file, aiofiles.os.rename will surface that on its own.
            if validated_source == validated_dest:
                logger.info(
                    f"Source and destination paths resolve to the same location ('{validated_source}'). No move needed.",
                    emoji_key="info",
                )
                return {
                    "source": validated_source,
                    "destination": validated_dest,
                    "success": True,
                    "message": "Source and destination are the same path. No operation performed.",
                }
        except OSError:
            # Ignore errors from the existence check above; rely on rename's own error handling below.
            pass

        # Attempt the move/rename operation asynchronously
        await aiofiles.os.rename(validated_source, validated_dest)

    except OSError as e:
        # Catch errors from exists, rename, remove, rmdir etc.
        logger.error(
            f"Error moving/renaming from '{source}' to '{destination}': {str(e)}",
            exc_info=True,
            emoji_key="error",
        )
        raise ToolError(
            f"Error moving '{source}' to '{destination}': {str(e)}",
            context={"source": validated_source, "destination": validated_dest, "errno": e.errno},
        ) from e
    except (ToolInputError, ToolError, ProtectionTriggeredError):  # Re-raise our specific errors
        raise
    except Exception as e:
        # Catch unexpected errors
        logger.error(
            f"Unexpected error moving {source} to {destination}: {e}",
            exc_info=True,
            emoji_key="error",
        )
        raise ToolError(
            f"An unexpected error occurred during move: {str(e)}",
            context={"source": validated_source, "destination": validated_dest},
        ) from e

    processing_time = time.monotonic() - start_time
    logger.success(
        f"Moved '{source}' to '{destination}'",
        emoji_key="file",
        time=processing_time,
        overwrite_used=overwrite,
    )

    return {
        "source": validated_source,  # Report original source for clarity
        "destination": validated_dest,
        "success": True,
        "message": f"Successfully moved '{validated_source}' to '{validated_dest}'.",
    }
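
# Illustrative usage sketch (hypothetical paths; not part of this module):
#
#   # Fails with ToolInputError if report_final.txt already exists...
#   await move_file("/data/report_draft.txt", "/data/report_final.txt")
#   # ...unless overwrite=True, which removes an existing file or *empty* directory first.
#   await move_file("/data/report_draft.txt", "/data/report_final.txt", overwrite=True)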


@with_tool_metrics
@with_error_handling
async def delete_path(path: str) -> Dict[str, Any]:
    """Delete a file or an entire directory tree asynchronously.

    Validates the path exists and is allowed. If the path is a directory,
    applies deletion protection heuristics (if enabled) based on the directory's
    contents before proceeding with recursive deletion.

    Args:
        path: Path to the file or directory to delete.

    Returns:
        Dictionary confirming the deletion.
    """
    start_time = time.monotonic()

    # Validate path exists (check_exists=True), but don't resolve symlinks yet
    # We need to know if the original path is a symlink to handle it properly
    validation_result = await validate_path(path, check_exists=True, resolve_symlinks=False)

    # Check if the path is a symlink
    is_symlink = await aiofiles.os.path.islink(path)
    if is_symlink:
        logger.info(f"Note: Deleting the symlink itself (not its target) at path: {path}")
    validated_path = validation_result

    try:
        deleted_type = "unknown"

        # First check if it's a symlink - we want to handle this separately
        # to avoid accidentally deleting the target
        if is_symlink:
            deleted_type = "symlink"
            logger.info(f"Deleting symlink: {validated_path}", emoji_key="delete")
            await aiofiles.os.remove(validated_path)

        # If not a symlink, proceed with regular directory or file deletion
        else:
            # We need to check if it's a directory or file - use follow_symlinks=True because
            # we already handled the symlink case above
            stat_info = await aiofiles.os.stat(validated_path, follow_symlinks=True)
            is_dir = os.path.stat.S_ISDIR(stat_info.st_mode)
            is_file = os.path.stat.S_ISREG(stat_info.st_mode)

            if is_dir:
                deleted_type = "directory"
                logger.info(f"Attempting to delete directory: {validated_path}", emoji_key="delete")
                # --- Deletion Protection Check ---
                try:
                    # List all file paths within the directory for heuristic checks
                    contained_file_paths = await _list_paths_recursive(validated_path)
                    if contained_file_paths:  # Only run check if directory is not empty
                        await _check_protection_heuristics(contained_file_paths, "deletion")
                    else:
                        logger.info(
                            f"Directory '{validated_path}' is empty, skipping detailed protection check.",
                            emoji_key="info",
                        )
                except ProtectionTriggeredError:
                    raise  # Re-raise the specific error if protection blocked the operation
                except ToolError as list_err:
                    # If listing contents failed, block deletion for safety?
                    raise ToolError(
                        f"Could not list directory contents for safety check before deleting '{validated_path}'. Deletion aborted. Reason: {list_err}",
                        context={"path": validated_path},
                    ) from list_err
                # --- End Protection Check ---

                # Protection passed (or disabled/not triggered), proceed with recursive delete
                await _async_rmtree(validated_path)

            elif is_file:
                deleted_type = "file"
                logger.info(f"Attempting to delete file: {validated_path}", emoji_key="delete")
                await aiofiles.os.remove(validated_path)
            else:
                # Should not happen if the existence validation above passed, but handle defensively
                raise ToolError(
                    f"Cannot delete path '{validated_path}': It is neither a file, directory, nor a symbolic link.",
                    context={"path": validated_path},
                )

    except OSError as e:
        # Catch errors from remove, rmdir, or during rmtree
        logger.error(
            f"Error deleting path '{path}' (resolved: {validated_path}): {str(e)}",
            exc_info=True,
            emoji_key="error",
        )
        raise ToolError(
            f"Error deleting '{path}': {str(e)}", context={"path": validated_path, "errno": e.errno}
        ) from e
    except (ToolInputError, ToolError, ProtectionTriggeredError):  # Re-raise our specific errors
        raise
    except Exception as e:
        # Catch unexpected errors
        logger.error(f"Unexpected error deleting {path}: {e}", exc_info=True, emoji_key="error")
        raise ToolError(
            f"An unexpected error occurred during deletion: {str(e)}",
            context={"path": validated_path},
        ) from e

    processing_time = time.monotonic() - start_time
    logger.success(
        f"Successfully deleted {deleted_type}: '{path}' (resolved: {validated_path})",
        emoji_key="delete",
        time=processing_time,
    )

    return {
        "path": validated_path,
        "type_deleted": deleted_type,
        "success": True,
        "message": f"Successfully deleted {deleted_type} '{validated_path}'.",
    }
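
# Illustrative usage sketch (hypothetical path; not part of this module). Directory deletions run
# the protection heuristics first and may raise ProtectionTriggeredError instead of deleting:
#
#   outcome = await delete_path("/data/tmp_build")
#   print(outcome["type_deleted"], outcome["message"])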


@with_tool_metrics
@with_error_handling
async def search_files(
    path: str,
    pattern: str,
    case_sensitive: bool = False,
    exclude_patterns: Optional[List[str]] = None,
    search_content: bool = False,
    max_content_bytes: int = 1024 * 1024,  # Limit content search size per file (1MB)
) -> Dict[str, Any]:
    """Search for files/directories matching a pattern asynchronously and recursively.

    Supports filename pattern matching (case-sensitive/insensitive substring)
    and optional searching within file content (UTF-8 text only, limited size).
    Allows exclusion patterns (glob format).

    Args:
        path: Directory path to start the search from.
        pattern: Text pattern to find. Used for substring match in names,
                 and exact string find in content if search_content=True.
        case_sensitive: If True, matching is case-sensitive. Defaults False.
        exclude_patterns: Optional list of glob-style patterns for paths/names
                          to exclude (e.g., ["*.log", ".git/"]). Matched case-insensitively
                          on appropriate systems.
        search_content: If True, also search *inside* text files for the pattern.
                        This can be significantly slower and memory intensive.
        max_content_bytes: Max bytes to read from each file when search_content=True.

    Returns:
        Dictionary with search parameters and a list of matching file/directory paths.
        May include warnings about errors encountered during search.
    """
    start_time = time.monotonic()

    # Validate path exists and is a directory (check_exists=True)
    validated_path = await validate_path(path, check_exists=True)
    if not await aiofiles.os.path.isdir(validated_path):
        # Check links to dirs
        if await aiofiles.os.path.islink(validated_path):
            try:
                if not await aiofiles.os.path.isdir(
                    await aiofiles.os.path.realpath(validated_path)
                ):
                    raise ToolInputError(
                        f"Path '{path}' (resolved to link '{validated_path}') points to something that is not a directory.",
                        param_name="path",
                        provided_value=path,
                    )
                # proceed with validated_path
            except OSError as e:
                raise ToolError(
                    f"Error resolving or checking link target for search: {e}",
                    context={"path": validated_path},
                ) from e
        else:
            raise ToolInputError(
                f"Path '{path}' (resolved to '{validated_path}') is not a directory.",
                param_name="path",
                provided_value=path,
            )

    # Validate other inputs
    if not isinstance(pattern, str) or not pattern:
        raise ToolInputError(
            "Search pattern must be a non-empty string.",
            param_name="pattern",
            provided_value=pattern,
        )
    if exclude_patterns:  # Ensure it's a list of strings if provided
        if not isinstance(exclude_patterns, list):
            raise ToolInputError(
                "Exclude patterns must be a list of strings.",
                param_name="exclude_patterns",
                provided_value=exclude_patterns,
            )
        if not all(isinstance(p, str) for p in exclude_patterns):
            raise ToolInputError(
                "All items in exclude_patterns must be strings.", param_name="exclude_patterns"
            )
    if not isinstance(case_sensitive, bool):
        raise ToolInputError(
            "case_sensitive must be a boolean.",
            param_name="case_sensitive",
            provided_value=case_sensitive,
        )
    if not isinstance(search_content, bool):
        raise ToolInputError(
            "search_content must be a boolean.",
            param_name="search_content",
            provided_value=search_content,
        )
    if not isinstance(max_content_bytes, int) or max_content_bytes < 0:
        raise ToolInputError(
            "max_content_bytes must be a non-negative integer.",
            param_name="max_content_bytes",
            provided_value=max_content_bytes,
        )

    search_errors: List[str] = []
    # Using a set to store matched paths avoids duplicates efficiently.
    matched_paths: Set[str] = set()

    # Prepare pattern based on case sensitivity
    search_pattern = pattern if case_sensitive else pattern.lower()

    # Error handler callback for async_walk
    MAX_REPORTED_ERRORS = 50

    def onerror(os_error: OSError):
        """Callback to handle and log errors during file tree walking."""
        err_msg = f"Permission or access error during search near '{getattr(os_error, 'filename', 'N/A')}': {getattr(os_error, 'strerror', str(os_error))}"
        # Limit number of reported errors to avoid flooding logs/results
        if len(search_errors) < MAX_REPORTED_ERRORS:
            logger.warning(err_msg, emoji_key="warning")
            search_errors.append(err_msg)
        elif len(search_errors) == MAX_REPORTED_ERRORS:
            suppress_msg = f"... (Further {type(os_error).__name__} errors suppressed)"
            logger.warning(suppress_msg, emoji_key="warning")
            search_errors.append(suppress_msg)

    # --- File Content Search Task (if enabled) ---
    async def check_file_content(filepath: str) -> bool:
        """Reads limited file content and checks for the pattern."""
        try:
            # Read limited chunk, ignore decoding errors for content search robustness
            async with aiofiles.open(filepath, mode="r", encoding="utf-8", errors="ignore") as f:
                content_chunk = await f.read(max_content_bytes)
            # Perform search (case sensitive or insensitive)
            if case_sensitive:
                return pattern in content_chunk
            else:
                return search_pattern in content_chunk.lower()  # Compare lower case
        except OSError as read_err:
            # Log content read errors but don't necessarily fail the whole search
            logger.warning(
                f"Could not read content of {filepath} for search: {read_err}", emoji_key="warning"
            )
            onerror(read_err)  # Report as a search error
        except Exception as read_unexpected_err:
            logger.error(
                f"Unexpected error reading content of {filepath}: {read_unexpected_err}",
                exc_info=True,
                emoji_key="error",
            )
            # Do not report unexpected errors via onerror, log them fully.
        return False

    # --- End File Content Search Task ---

    try:
        # Use the async_walk helper for efficient traversal and exclusion handling
        # followlinks=True: Search should probably follow links to find matches within linked dirs too.
        # Exclude patterns will apply to paths within the linked directories relative to the base path.
        # Since we've fixed async_walk to use await and iterate properly, this should work as expected
        async for root, dirs, files in async_walk(
            validated_path,
            onerror=onerror,
            exclude_patterns=exclude_patterns,
            base_path=validated_path,  # Use original validated path as base for excludes
            followlinks=True,
        ):
            # Check matching directory names
            for dirname in dirs:
                name_to_check = dirname if case_sensitive else dirname.lower()
                if search_pattern in name_to_check:
                    match_path = os.path.join(root, dirname)
                    matched_paths.add(match_path)  # Add to set (handles duplicates)

            # Check matching file names OR content
            content_check_tasks = []
            files_to_check_content = []

            for filename in files:
                name_to_check = filename if case_sensitive else filename.lower()
                match_path = os.path.join(root, filename)
                name_match = search_pattern in name_to_check

                if name_match:
                    # Check if already added via content search in a previous iteration (unlikely but possible)
                    if match_path not in matched_paths:
                        matched_paths.add(match_path)  # Add name match

                # If searching content AND name didn't already match, schedule content check
                # Also check if path isn't already matched from a directory name match higher up.
                if search_content and match_path not in matched_paths:
                    # Avoid queueing files already matched by name (implicitly handled by set)
                    files_to_check_content.append(match_path)
                    # Create task but don't await yet, gather below
                    content_check_tasks.append(check_file_content(match_path))

            # Run content checks concurrently for this directory level
            if content_check_tasks:
                content_results = await asyncio.gather(*content_check_tasks, return_exceptions=True)
                for idx, result in enumerate(content_results):
                    file_path_checked = files_to_check_content[idx]
                    if isinstance(result, Exception):
                        logger.warning(
                            f"Error during content check task for {file_path_checked}: {result}",
                            emoji_key="error",
                        )
                        # Errors during check_file_content are logged there, potentially reported via onerror too.
                    elif result is True:  # Content matched
                        matched_paths.add(file_path_checked)  # Add content match

    except Exception as e:
        # Catch unexpected errors during the async iteration/walk setup itself
        logger.error(
            f"Unexpected error during file search setup or walk in {path}: {e}",
            exc_info=True,
            emoji_key="error",
        )
        raise ToolError(
            f"An unexpected error occurred during search execution: {str(e)}",
            context={"path": path, "pattern": pattern},
        ) from e

    processing_time = time.monotonic() - start_time
    # Convert the set of unique matched paths to a sorted list for consistent output.
    unique_matches = sorted(list(matched_paths))

    logger.success(
        f"Search for '{pattern}' in {path} completed ({len(unique_matches)} unique matches found)",
        emoji_key="search",
        errors_encountered=len(search_errors),
        time=processing_time,
        case_sensitive=case_sensitive,
        search_content=search_content,
    )

    result = {
        "path": validated_path,
        "pattern": pattern,
        "case_sensitive": case_sensitive,
        "search_content": search_content,
        "matches": unique_matches,
        "success": True,
        "message": f"Found {len(unique_matches)} unique matches for '{pattern}' in '{validated_path}'.",
    }
    if search_errors:
        # Add consolidated warning if errors occurred during walk/stat.
        result["warnings"] = search_errors  # Include actual errors reported by onerror
        result["message"] += (
            f" Encountered {len(search_errors)} access or read errors during search."
        )

    return result
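
# Illustrative usage sketch (hypothetical path and pattern; not part of this module):
#
#   hits = await search_files(
#       "/data/projects", "TODO",
#       exclude_patterns=["*.log", ".git/"],
#       search_content=True,  # also scans up to max_content_bytes of each text file
#   )
#   for match in hits["matches"]:
#       print(match)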


@with_tool_metrics
@with_error_handling
async def get_file_info(path: str) -> Dict[str, Any]:
    """Get detailed metadata about a specific file or directory asynchronously.

    Validates the path exists and is allowed. Returns information like size,
    timestamps, type, and permissions. Uses lstat to report info about the path
    itself (including if it's a symlink).

    Args:
        path: Path to the file or directory.

    Returns:
        Dictionary containing detailed file information, marked with success=True.
        If info retrieval fails after validation, raises ToolError.
    """
    start_time = time.monotonic()

    # Validate path (must exist, check_exists=True), but don't resolve symlinks
    # We want to get info about the path as specified by the user
    validation_result = await validate_path(path, check_exists=True, resolve_symlinks=False)

    # Check if this is a symlink
    is_symlink = isinstance(validation_result, dict) and validation_result.get("is_symlink")
    if is_symlink:
        validated_path = validation_result.get("symlink_path")
    else:
        validated_path = validation_result

    # Get file information asynchronously using the helper
    # Don't follow symlinks - we want info about the path itself
    info = await format_file_info(validated_path, follow_symlinks=False)

    # Check if the helper returned an error structure
    if "error" in info:
        # Propagate the error. Since path validation passed, this is likely
        # a transient issue or permission problem reading metadata. Use ToolError.
        raise ToolError(
            f"Failed to get file info for '{validated_path}': {info['error']}",
            context={"path": validated_path},
        )

    # Info retrieval successful
    processing_time = time.monotonic() - start_time
    logger.success(
        f"Got file info for: {path} (resolved: {validated_path})",
        emoji_key="file",
        time=processing_time,
    )

    # Add success flag and descriptive message to the info dictionary
    info["success"] = True
    info["message"] = f"Successfully retrieved info for '{validated_path}'."
    return info
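
# Illustrative usage sketch (hypothetical path; not part of this module):
#
#   info = await get_file_info("/data/projects/README.md")
#   if info["success"]:
#       print(info["message"])  # plus whatever metadata format_file_info provides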


@with_tool_metrics
@with_error_handling
async def list_allowed_directories() -> Dict[str, Any]:
    """List all directories configured as allowed for filesystem access.

    This is primarily an administrative/debugging tool. Reads from the loaded config.

    Returns:
        Dictionary containing the list of allowed base directory paths.
    """
    start_time = time.monotonic()

    # --- Use get_allowed_directories which reads from config ---
    try:
        allowed_dirs = get_allowed_directories()
    except Exception as e:
        # Handle rare errors during config retrieval itself
        raise ToolError(f"Failed to retrieve allowed directories configuration: {e}") from e

    processing_time = time.monotonic() - start_time
    logger.success(
        f"Listed {len(allowed_dirs)} allowed directories", emoji_key="config", time=processing_time
    )

    return {
        "directories": allowed_dirs,
        "count": len(allowed_dirs),
        "success": True,
        "message": f"Retrieved {len(allowed_dirs)} configured allowed directories.",
    }
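
# Illustrative usage sketch (not part of this module):
#
#   allowed = await list_allowed_directories()
#   print(allowed["count"], allowed["directories"])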

```