This is page 40 of 45. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?lines=true&page={x} to view the full context.

# Directory Structure

```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│   ├── __init__.py
│   ├── advanced_agent_flows_using_unified_memory_system_demo.py
│   ├── advanced_extraction_demo.py
│   ├── advanced_unified_memory_system_demo.py
│   ├── advanced_vector_search_demo.py
│   ├── analytics_reporting_demo.py
│   ├── audio_transcription_demo.py
│   ├── basic_completion_demo.py
│   ├── cache_demo.py
│   ├── claude_integration_demo.py
│   ├── compare_synthesize_demo.py
│   ├── cost_optimization.py
│   ├── data
│   │   ├── sample_event.txt
│   │   ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│   │   └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│   ├── docstring_refiner_demo.py
│   ├── document_conversion_and_processing_demo.py
│   ├── entity_relation_graph_demo.py
│   ├── filesystem_operations_demo.py
│   ├── grok_integration_demo.py
│   ├── local_text_tools_demo.py
│   ├── marqo_fused_search_demo.py
│   ├── measure_model_speeds.py
│   ├── meta_api_demo.py
│   ├── multi_provider_demo.py
│   ├── ollama_integration_demo.py
│   ├── prompt_templates_demo.py
│   ├── python_sandbox_demo.py
│   ├── rag_example.py
│   ├── research_workflow_demo.py
│   ├── sample
│   │   ├── article.txt
│   │   ├── backprop_paper.pdf
│   │   ├── buffett.pdf
│   │   ├── contract_link.txt
│   │   ├── legal_contract.txt
│   │   ├── medical_case.txt
│   │   ├── northwind.db
│   │   ├── research_paper.txt
│   │   ├── sample_data.json
│   │   └── text_classification_samples
│   │       ├── email_classification.txt
│   │       ├── news_samples.txt
│   │       ├── product_reviews.txt
│   │       └── support_tickets.txt
│   ├── sample_docs
│   │   └── downloaded
│   │       └── attention_is_all_you_need.pdf
│   ├── sentiment_analysis_demo.py
│   ├── simple_completion_demo.py
│   ├── single_shot_synthesis_demo.py
│   ├── smart_browser_demo.py
│   ├── sql_database_demo.py
│   ├── sse_client_demo.py
│   ├── test_code_extraction.py
│   ├── test_content_detection.py
│   ├── test_ollama.py
│   ├── text_classification_demo.py
│   ├── text_redline_demo.py
│   ├── tool_composition_examples.py
│   ├── tournament_code_demo.py
│   ├── tournament_text_demo.py
│   ├── unified_memory_system_demo.py
│   ├── vector_search_demo.py
│   ├── web_automation_instruction_packs.py
│   └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│   └── smart_browser_internal
│       ├── locator_cache.db
│       ├── readability.js
│       └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── integration
│   │   ├── __init__.py
│   │   └── test_server.py
│   ├── manual
│   │   ├── test_extraction_advanced.py
│   │   └── test_extraction.py
│   └── unit
│       ├── __init__.py
│       ├── test_cache.py
│       ├── test_providers.py
│       └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│   ├── __init__.py
│   ├── __main__.py
│   ├── cli
│   │   ├── __init__.py
│   │   ├── __main__.py
│   │   ├── commands.py
│   │   ├── helpers.py
│   │   └── typer_cli.py
│   ├── clients
│   │   ├── __init__.py
│   │   ├── completion_client.py
│   │   └── rag_client.py
│   ├── config
│   │   └── examples
│   │       └── filesystem_config.yaml
│   ├── config.py
│   ├── constants.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── evaluation
│   │   │   ├── base.py
│   │   │   └── evaluators.py
│   │   ├── providers
│   │   │   ├── __init__.py
│   │   │   ├── anthropic.py
│   │   │   ├── base.py
│   │   │   ├── deepseek.py
│   │   │   ├── gemini.py
│   │   │   ├── grok.py
│   │   │   ├── ollama.py
│   │   │   ├── openai.py
│   │   │   └── openrouter.py
│   │   ├── server.py
│   │   ├── state_store.py
│   │   ├── tournaments
│   │   │   ├── manager.py
│   │   │   ├── tasks.py
│   │   │   └── utils.py
│   │   └── ums_api
│   │       ├── __init__.py
│   │       ├── ums_database.py
│   │       ├── ums_endpoints.py
│   │       ├── ums_models.py
│   │       └── ums_services.py
│   ├── exceptions.py
│   ├── graceful_shutdown.py
│   ├── services
│   │   ├── __init__.py
│   │   ├── analytics
│   │   │   ├── __init__.py
│   │   │   ├── metrics.py
│   │   │   └── reporting.py
│   │   ├── cache
│   │   │   ├── __init__.py
│   │   │   ├── cache_service.py
│   │   │   ├── persistence.py
│   │   │   ├── strategies.py
│   │   │   └── utils.py
│   │   ├── cache.py
│   │   ├── document.py
│   │   ├── knowledge_base
│   │   │   ├── __init__.py
│   │   │   ├── feedback.py
│   │   │   ├── manager.py
│   │   │   ├── rag_engine.py
│   │   │   ├── retriever.py
│   │   │   └── utils.py
│   │   ├── prompts
│   │   │   ├── __init__.py
│   │   │   ├── repository.py
│   │   │   └── templates.py
│   │   ├── prompts.py
│   │   └── vector
│   │       ├── __init__.py
│   │       ├── embeddings.py
│   │       └── vector_service.py
│   ├── tool_token_counter.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── audio_transcription.py
│   │   ├── base.py
│   │   ├── completion.py
│   │   ├── docstring_refiner.py
│   │   ├── document_conversion_and_processing.py
│   │   ├── enhanced-ums-lookbook.html
│   │   ├── entity_relation_graph.py
│   │   ├── excel_spreadsheet_automation.py
│   │   ├── extraction.py
│   │   ├── filesystem.py
│   │   ├── html_to_markdown.py
│   │   ├── local_text_tools.py
│   │   ├── marqo_fused_search.py
│   │   ├── meta_api_tool.py
│   │   ├── ocr_tools.py
│   │   ├── optimization.py
│   │   ├── provider.py
│   │   ├── pyodide_boot_template.html
│   │   ├── python_sandbox.py
│   │   ├── rag.py
│   │   ├── redline-compiled.css
│   │   ├── sentiment_analysis.py
│   │   ├── single_shot_synthesis.py
│   │   ├── smart_browser.py
│   │   ├── sql_databases.py
│   │   ├── text_classification.py
│   │   ├── text_redline_tools.py
│   │   ├── tournament.py
│   │   ├── ums_explorer.html
│   │   └── unified_memory_system.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── async_utils.py
│   │   ├── display.py
│   │   ├── logging
│   │   │   ├── __init__.py
│   │   │   ├── console.py
│   │   │   ├── emojis.py
│   │   │   ├── formatter.py
│   │   │   ├── logger.py
│   │   │   ├── panels.py
│   │   │   ├── progress.py
│   │   │   └── themes.py
│   │   ├── parse_yaml.py
│   │   ├── parsing.py
│   │   ├── security.py
│   │   └── text.py
│   └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/filesystem.py:
--------------------------------------------------------------------------------

```python
   1 | """Secure asynchronous filesystem tools for Ultimate MCP Server.
   2 | 
   3 | This module provides secure asynchronous filesystem operations, including reading, writing,
   4 | deleting, and manipulating files and directories, with robust security controls to limit access
   5 | and optional heuristics to prevent accidental mass deletion/modification by LLMs.
   6 | """
   7 | 
   8 | import asyncio
   9 | import datetime  # Using datetime for standard timestamp representation.
  10 | import difflib
  11 | import json
  12 | import math  # For isnan checks
  13 | import os
  14 | import statistics  # For calculating standard deviation in protection heuristics
  15 | import time
  16 | from fnmatch import fnmatch  # Keep sync fnmatch for pattern matching
  17 | from typing import Any, AsyncGenerator, Dict, List, Literal, Optional, Set, Tuple, Union, cast
  18 | 
  19 | import aiofiles
  20 | import aiofiles.os
  21 | from pydantic import BaseModel
  22 | 
  23 | from ultimate_mcp_server.config import FilesystemConfig, GatewayConfig, get_config
  24 | from ultimate_mcp_server.exceptions import ToolError, ToolInputError
  25 | from ultimate_mcp_server.tools.base import with_error_handling, with_tool_metrics
  26 | from ultimate_mcp_server.utils import get_logger
  27 | 
  28 | logger = get_logger("ultimate_mcp_server.tools.filesystem")
  29 | 
  30 | 
  31 | class ProtectionTriggeredError(ToolError):
  32 |     """Exception raised when a security protection measure is triggered."""
  33 | 
  34 |     def __init__(self, message, protection_type=None, context=None, details=None):
  35 |         """Initialize the protection triggered error.
  36 | 
  37 |         Args:
  38 |             message: Error message
  39 |             protection_type: Type of protection triggered (e.g., "deletion_protection", "path_protection")
  40 |             context: Context information about the protection trigger
  41 |             details: Additional error details
  42 |         """
  43 |         error_details = details or {}
  44 |         if protection_type:
  45 |             error_details["protection_type"] = protection_type
  46 | 
  47 |         self.context = context or {}
  48 |         if context:
  49 |             error_details["context"] = context
  50 | 
  51 |         super().__init__(message, error_code="PROTECTION_TRIGGERED", details=error_details)
  52 | 
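# Illustrative sketch (not part of the original module): how a mass-deletion
# heuristic might surface this error. The function name and counts are hypothetical.
def _example_trigger_deletion_protection(file_count: int, threshold: int = 100) -> None:
    if file_count > threshold:
        raise ProtectionTriggeredError(
            f"Deletion blocked: {file_count} files exceeds the threshold of {threshold}.",
            protection_type="deletion_protection",
            context={"file_count": file_count, "threshold": threshold},
        )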
  53 | 
  54 | # --- Configuration and Security ---
  55 | 
  56 | 
  57 | def get_filesystem_config() -> "FilesystemConfig":  # Use type hint from config module
  58 |     """Get filesystem configuration object from the main config."""
  59 |     cfg: GatewayConfig = get_config()
  60 |     # Access the validated FilesystemConfig object directly
  61 |     fs_config = cfg.filesystem
  62 |     if not fs_config:  # Should not happen with default_factory, but check defensively
  63 |         logger.error(
  64 |             "Filesystem configuration missing after load. Using defaults.", emoji_key="config"
  65 |         )
  66 |         from ultimate_mcp_server.config import (
  67 |             FilesystemConfig,  # Local import to avoid circularity at top level
  68 |         )
  69 | 
  70 |         return FilesystemConfig()
  71 |     return fs_config
  72 | 
  73 | 
  74 | def get_protection_config(operation_type: Literal["deletion", "modification"]) -> Dict[str, Any]:
  75 |     """Get protection settings for a specific operation type as a dictionary."""
  76 |     fs_config = get_filesystem_config()
  77 |     protection_attr_name = f"file_{operation_type}_protection"
  78 | 
  79 |     if hasattr(fs_config, protection_attr_name):
  80 |         protection_config_obj = getattr(fs_config, protection_attr_name)
  81 |         if protection_config_obj and isinstance(
  82 |             protection_config_obj, BaseModel
  83 |         ):  # Check it's a Pydantic model instance
  84 |             # Convert the Pydantic model to a dictionary for consistent access
  85 |             return protection_config_obj.model_dump()
  86 |         else:
  87 |             logger.warning(
  88 |                 f"Protection config for '{operation_type}' is not a valid model instance. Using defaults.",
  89 |                 emoji_key="config",
  90 |             )
  91 |     else:
  92 |         logger.warning(
  93 |             f"Protection config attribute '{protection_attr_name}' not found. Using defaults.",
  94 |             emoji_key="config",
  95 |         )
  96 | 
  97 |     # Return default dictionary structure if config is missing or invalid
  98 |     # Fetch defaults from the Pydantic model definition if possible
  99 |     try:
 100 |         from ultimate_mcp_server.config import FilesystemProtectionConfig
 101 | 
 102 |         return FilesystemProtectionConfig().model_dump()
 103 |     except ImportError:  # Fallback if import fails
 104 |         return {
 105 |             "enabled": False,
 106 |             "max_files_threshold": 100,
 107 |             "datetime_stddev_threshold_sec": 60 * 60 * 24 * 30,
 108 |             "file_type_variance_threshold": 5,
 109 |             "max_stat_errors_pct": 10.0,
 110 |         }
 111 | 
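# Usage sketch (illustrative only): reading the deletion-protection thresholds as a
# plain dict; the keys mirror the default structure returned above.
def _example_read_deletion_thresholds() -> None:
    protection = get_protection_config("deletion")
    if protection.get("enabled"):
        max_files = protection.get("max_files_threshold", 100)
        print(f"Deletion protection active; batches over {max_files} files are blocked.")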
 112 | 
 113 | def get_allowed_directories() -> List[str]:
 114 |     """Get allowed directories from configuration.
 115 | 
 116 |     Reads the configuration, normalizes paths (absolute, resolves symlinks),
 117 |     and returns a list of unique allowed directory paths. Assumes paths were expanded
 118 |     during config load.
 119 | 
 120 |     Returns:
 121 |         List of normalized, absolute directory paths that can be accessed.
 122 |     """
 123 |     fs_config = get_filesystem_config()
 124 |     # Access the already expanded list from the validated config object
 125 |     allowed_config: List[str] = fs_config.allowed_directories
 126 | 
 127 |     if not allowed_config:
 128 |         logger.warning(
 129 |             "No filesystem directories configured or loaded for access. All operations may be rejected.",
 130 |             emoji_key="security",
 131 |         )
 132 |         return []
 133 | 
 134 |     # Paths should already be expanded and absolute from config loading.
 135 |     # We still need to normalize and ensure uniqueness.
 136 |     normalized: List[str] = []
 137 |     for d in allowed_config:
 138 |         try:
 139 |             # Basic normalization (separator consistency)
 140 |             norm_d = os.path.normpath(d)
 141 |             if norm_d not in normalized:  # Avoid duplicates
 142 |                 normalized.append(norm_d)
 143 |         except Exception as e:
 144 |             # Log errors during normalization but continue
 145 |             logger.error(
 146 |                 f"Error normalizing configured allowed directory '{d}': {e}. Skipping.",
 147 |                 emoji_key="config",
 148 |             )
 149 | 
 150 |     if not normalized and allowed_config:
 151 |         logger.error(
 152 |             "Filesystem access potentially misconfigured: No valid allowed directories remain after normalization.",
 153 |             emoji_key="security",
 154 |         )
 155 |     elif not normalized:
 156 |         # Warning about no configured dirs was logged earlier if allowed_config was empty.
 157 |         pass
 158 | 
 159 |     # Debug log the final effective list used by tools
 160 |     logger.debug(
 161 |         f"Filesystem tools operating with {len(normalized)} normalized allowed directories",
 162 |         allowed_directories=normalized,
 163 |     )
 164 |     return normalized
 165 | 
 166 | 
 167 | # --- Path Validation ---
 168 | async def validate_path(
 169 |     path: str,
 170 |     check_exists: Optional[bool] = None,
 171 |     check_parent_writable: bool = False,
 172 |     resolve_symlinks: bool = False,
 173 | ) -> str:
 174 |     """Validate a path for security and accessibility using async I/O.
 175 | 
 176 |     Performs several checks:
 177 |     1.  Ensures path is a non-empty string.
 178 |     2.  Normalizes the path (expands user, makes absolute, resolves '../').
 179 |     3.  Checks if the normalized path is within the configured allowed directories.
 180 |     4.  If check_exists is True, checks if the path exists. If False, checks it does NOT exist.
 181 |     5.  If resolve_symlinks is True, resolves symbolic links and re-validates the real path against allowed dirs.
 182 |     6.  If check_exists is False, checks parent directory existence.
 183 |     7.  If `check_parent_writable` is True and path likely needs creation, checks parent directory write permissions.
 184 | 
 185 |     Args:
 186 |         path: The file or directory path input string to validate.
 187 |         check_exists: If True, path must exist. If False, path must NOT exist. If None, existence is not checked.
 188 |         check_parent_writable: If True and path doesn't exist/creation is implied, check if parent dir is writable.
 189 |         resolve_symlinks: If True, follows symlinks and returns their target path. If False, keeps the symlink path.
 190 | 
 191 |     Returns:
 192 |         The normalized, absolute, validated path string.
 193 | 
 194 |     Raises:
 195 |         ToolInputError: If the path is invalid (format, permissions, existence violation,
 196 |                         outside allowed dirs, symlink issue).
 197 |         ToolError: For underlying filesystem errors or configuration issues.
 198 |     """
 199 |     if not path or not isinstance(path, str):
 200 |         raise ToolInputError(
 201 |             "Path must be a non-empty string.",
 202 |             param_name="path",
 203 |             provided_value=repr(path),  # Use repr for clarity on non-string types
 204 |         )
 205 | 
 206 |     # Path normalization (sync, generally fast)
 207 |     try:
 208 |         path_expanded = os.path.expanduser(path)
 209 |         path_abs = os.path.abspath(path_expanded)
 210 |         # Normalize '.','..' and separators
 211 |         normalized_path = os.path.normpath(path_abs)
 212 |     except Exception as e:
 213 |         raise ToolInputError(
 214 |             f"Invalid path format or resolution error: {str(e)}",
 215 |             param_name="path",
 216 |             provided_value=path,
 217 |         ) from e
 218 | 
 219 |     # --- Use get_allowed_directories which reads from config ---
 220 |     allowed_dirs = get_allowed_directories()
 221 |     if not allowed_dirs:
 222 |         raise ToolError(
 223 |             "Filesystem access is disabled: No allowed directories are configured or loadable.",
 224 |             context={"configured_directories": 0},  # Provide context
 225 |         )
 226 | 
 227 |     # Ensure normalized_path is truly *under* an allowed dir.
 228 |     is_allowed = False
 229 |     original_validated_path = normalized_path  # Store before symlink resolution
 230 |     for allowed_dir in allowed_dirs:
 231 |         # Ensure allowed_dir is also normalized for comparison
 232 |         norm_allowed_dir = os.path.normpath(allowed_dir)
 233 |         # Path must be exactly the allowed dir or start with the allowed dir + separator.
 234 |         if normalized_path == norm_allowed_dir or normalized_path.startswith(
 235 |             norm_allowed_dir + os.sep
 236 |         ):
 237 |             is_allowed = True
 238 |             break
 239 | 
 240 |     if not is_allowed:
 241 |         logger.warning(
 242 |             f"Path '{normalized_path}' denied access. Not within allowed directories: {allowed_dirs}",
 243 |             emoji_key="security",
 244 |         )
 245 |         raise ToolInputError(
 246 |             f"Access denied: Path '{path}' resolves to '{normalized_path}', which is outside the allowed directories.",
 247 |             param_name="path",
 248 |             provided_value=path,
 249 |             # Add context about allowed dirs for debugging? Potentially sensitive.
 250 |             # context={"allowed": allowed_dirs}
 251 |         )
 252 | 
 253 |     # Filesystem checks using aiofiles.os
 254 |     current_validated_path = normalized_path  # Start with normalized path
 255 |     is_symlink = False
 256 |     symlink_target_path = None
 257 | 
 258 |     try:
 259 |         # Use stat with follow_symlinks=False to check the item itself (similar to lstat)
 260 |         try:
 261 |             lstat_info = await aiofiles.os.stat(current_validated_path, follow_symlinks=False)
 262 |             path_exists_locally = True  # stat succeeded, so the path entry itself exists
 263 |             is_symlink = os.path.stat.S_ISLNK(lstat_info.st_mode)
 264 | 
 265 |             if is_symlink:
 266 |                 try:
 267 |                     # Get the target for information purposes
 268 |                     symlink_target = await aiofiles.os.readlink(current_validated_path)
 269 |                     symlink_target_path = symlink_target
 270 |                     logger.debug(f"Path '{path}' is a symlink pointing to '{symlink_target}'")
 271 |                 except OSError as link_err:
 272 |                     logger.warning(
 273 |                         f"Error reading symlink target for '{current_validated_path}': {link_err}"
 274 |                     )
 275 | 
 276 |         except FileNotFoundError:
 277 |             path_exists_locally = False
 278 |             is_symlink = False
 279 |         except OSError as e:
 280 |             # Handle other OS errors during stat check
 281 |             logger.error(
 282 |                 f"OS Error during stat check for '{current_validated_path}': {e}", exc_info=True
 283 |             )
 284 |             raise ToolError(
 285 |                 f"Filesystem error checking path status for '{path}': {str(e)}",
 286 |                 context={"path": path, "resolved_path": current_validated_path},
 287 |             ) from e
 288 | 
 289 |         # Resolve symlink if it exists and re-validate
 290 |         if is_symlink and resolve_symlinks:
 291 |             try:
 292 |                 # Use synchronous os.path.realpath since aiofiles.os.path doesn't have it
 293 |                 real_path = os.path.realpath(current_validated_path)
 294 |                 real_normalized = os.path.normpath(real_path)
 295 |                 symlink_target_path = real_normalized  # noqa: F841
 296 | 
 297 |                 # Re-check if the *real* resolved path is within allowed directories
 298 |                 is_real_allowed = False
 299 |                 for allowed_dir in allowed_dirs:
 300 |                     norm_allowed_dir = os.path.normpath(allowed_dir)
 301 |                     if real_normalized == norm_allowed_dir or real_normalized.startswith(
 302 |                         norm_allowed_dir + os.sep
 303 |                     ):
 304 |                         is_real_allowed = True
 305 |                         break
 306 | 
 307 |                 if not is_real_allowed:
 308 |                     raise ToolInputError(
 309 |                         f"Access denied: Path '{path}' is a symbolic link pointing to '{real_normalized}', which is outside allowed directories.",
 310 |                         param_name="path",
 311 |                         provided_value=path,
 312 |                     )
 313 | 
 314 |                 # If validation passed, use the real path for further checks *about the target*
 315 |                 current_validated_path = real_normalized
 316 |                 # Re-check existence *of the target* - use exists instead of lexists
 317 |                 path_exists = await aiofiles.os.path.exists(current_validated_path)
 318 | 
 319 |             except OSError as e:
 320 |                 # Handle errors during realpath resolution (e.g., broken link, permissions)
 321 |                 if isinstance(e, FileNotFoundError):
 322 |                     # Broken link - the link entry exists, but target doesn't
 323 |                     path_exists = False
 324 |                     if check_exists is True:
 325 |                         raise ToolInputError(
 326 |                             f"Required path '{path}' is a symbolic link pointing to a non-existent target ('{original_validated_path}' -> target missing).",
 327 |                             param_name="path",
 328 |                             provided_value=path,
 329 |                         ) from e
 330 |                     # If check_exists is False or None, a broken link might be acceptable depending on the operation.
 331 |                     # Keep current_validated_path as the *link path itself* if the target doesn't exist.
 332 |                     current_validated_path = original_validated_path
 333 |                 else:
 334 |                     raise ToolInputError(
 335 |                         f"Error resolving symbolic link '{path}': {str(e)}",
 336 |                         param_name="path",
 337 |                         provided_value=path,
 338 |                     ) from e
 339 |             except ToolInputError:  # Re-raise specific input errors
 340 |                 raise
 341 |             except Exception as e:  # Catch other unexpected errors during link resolution
 342 |                 raise ToolError(
 343 |                     f"Unexpected error resolving symbolic link for '{path}': {str(e)}",
 344 |                     context={"path": path},
 345 |                 ) from e
 346 |         else:
 347 |             # Not a link or not resolving it, so existence check result is based on the initial check
 348 |             path_exists = path_exists_locally
 349 | 
 350 |         # Check existence requirement *after* potential symlink resolution
 351 |         if check_exists is True and not path_exists:
 352 |             raise ToolInputError(
 353 |                 f"Required path '{path}' (resolved to '{current_validated_path}') does not exist.",
 354 |                 param_name="path",
 355 |                 provided_value=path,
 356 |                 details={
 357 |                     "path": path,
 358 |                     "resolved_path": current_validated_path,
 359 |                     "error_type": "PATH_NOT_FOUND",
 360 |                 },
 361 |             )
 362 |         elif check_exists is False and path_exists:
 363 |             raise ToolInputError(
 364 |                 f"Path '{path}' (resolved to '{current_validated_path}') already exists, but non-existence was required.",
 365 |                 param_name="path",
 366 |                 provided_value=path,
 367 |                 details={
 368 |                     "path": path,
 369 |                     "resolved_path": current_validated_path,
 370 |                     "error_type": "PATH_ALREADY_EXISTS",
 371 |                 },
 372 |             )
 373 |         # else: check_exists is None, or condition met
 374 | 
 375 |         # If path doesn't exist and creation is likely (check_exists is False or None), check parent.
 376 |         parent_dir = os.path.dirname(current_validated_path)
 377 |         if (
 378 |             parent_dir and parent_dir != current_validated_path
 379 |         ):  # Check parent_dir is not empty and not the root itself
 380 |             try:
 381 |                 parent_exists = await aiofiles.os.path.exists(
 382 |                     parent_dir
 383 |                 )  # Check if parent exists first
 384 |                 if parent_exists:
 385 |                     if not await aiofiles.os.path.isdir(parent_dir):
 386 |                         raise ToolInputError(
 387 |                             f"Cannot operate on '{path}': Parent path '{parent_dir}' exists but is not a directory.",
 388 |                             param_name="path",
 389 |                             provided_value=path,
 390 |                         )
 391 |                     # Parent exists and is a directory, check writability if requested
 392 |                     if check_parent_writable:
 393 |                         if not os.access(parent_dir, os.W_OK | os.X_OK):
 394 |                             raise ToolInputError(
 395 |                                 f"Cannot operate on '{path}': Parent directory '{parent_dir}' exists but is not writable or accessible.",
 396 |                                 param_name="path",
 397 |                                 provided_value=path,
 398 |                             )
 399 |                 # else: Parent does NOT exist.
 400 |                 # If check_parent_writable was True, it's okay if parent doesn't exist because makedirs will create it.
 401 |                 # If check_parent_writable was False (or not requested for this scenario), we might still want to error if parent doesn't exist depending on the operation context.
 402 |                 # For create_directory context, non-existence of parent is fine.
 403 |             except OSError as e:
 404 |                 raise ToolError(
 405 |                     f"Filesystem error checking parent directory '{parent_dir}' for '{path}': {str(e)}",
 406 |                     context={"path": path, "parent": parent_dir},
 407 |                 ) from e
 408 | 
 409 |     except OSError as e:
 410 |         # Catch filesystem errors during async checks like exists, isdir, islink on the primary path
 411 |         raise ToolError(
 412 |             f"Filesystem error validating path '{path}': {str(e)}",
 413 |             context={"path": path, "resolved_path": current_validated_path, "error": str(e)},
 414 |         ) from e
 415 |     except ToolInputError:  # Re-raise ToolInputErrors from validation logic
 416 |         raise
 417 |     except Exception as e:
 418 |         # Catch unexpected errors during validation logic
 419 |         logger.error(f"Unexpected error during path validation for {path}: {e}", exc_info=True)
 420 |         raise ToolError(
 421 |             f"An unexpected error occurred validating path: {str(e)}", context={"path": path}
 422 |         ) from e
 423 | 
 424 |     # Always return the validated path string
 425 |     return current_validated_path
 426 | 
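# Usage sketch (illustrative only): a typical call sequence. "/data/docs" stands in
# for a hypothetical allowed directory from the filesystem config.
async def _example_validate_before_read() -> str:
    # The path must already exist; any symlink is resolved and re-checked
    # against the allowed directories before it is used.
    safe_path = await validate_path(
        "/data/docs/report.txt", check_exists=True, resolve_symlinks=True
    )
    return await read_file_content(safe_path)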
 427 | 
 428 | # --- Helper Functions ---
 429 | 
 430 | 
 431 | async def format_file_info(file_path: str, follow_symlinks: bool = False) -> Dict[str, Any]:
 432 |     """Get detailed file or directory information asynchronously.
 433 | 
 434 |     Uses `aiofiles.os.stat` to retrieve metadata.
 435 | 
 436 |     Args:
 437 |         file_path: Path to file or directory (assumed validated).
 438 |         follow_symlinks: If True, follows symlinks to get info about their targets.
 439 |                         If False, gets info about the symlink itself.
 440 | 
 441 |     Returns:
 442 |         Dictionary with file/directory details (name, path, size, timestamps, type, permissions).
 443 |         If an OS error occurs during stat, returns a dict containing 'name', 'path', and 'error'.
 444 |     """
 445 |     try:
 446 |         # Use stat results directly where possible to avoid redundant checks
 447 |         # Use stat with follow_symlinks parameter to control whether we stat the link or the target
 448 |         stat_info = await aiofiles.os.stat(file_path, follow_symlinks=follow_symlinks)
 449 |         mode = stat_info.st_mode
 450 |         is_dir = os.path.stat.S_ISDIR(mode)
 451 |         is_file = os.path.stat.S_ISREG(mode)  # Check for regular file
 452 |         is_link = os.path.stat.S_ISLNK(mode)  # Check if the item stat looked at is a link
 453 | 
 454 |         # Use timezone-aware ISO format timestamps for machine readability.
 455 |         # Handle potential platform differences in ctime availability (fallback to mtime).
 456 |         try:
 457 |             # Some systems might not have birthtime (st_birthtime)
 458 |             # ctime is platform dependent (creation on Windows, metadata change on Unix)
 459 |             # Use mtime as the most reliable "last modified" timestamp.
 460 |             # Let's report both ctime and mtime if available.
 461 |             ctime_ts = stat_info.st_ctime
 462 |         except AttributeError:
 463 |             ctime_ts = stat_info.st_mtime  # Fallback
 464 | 
 465 |         # Ensure timestamps are valid before conversion
 466 |         def safe_isoformat(timestamp):
 467 |             try:
 468 |                 # Handle potential negative timestamps or values outside valid range
 469 |                 if timestamp < 0:
 470 |                     return "Invalid Timestamp (Negative)"
 471 |                 # Check against a reasonable range (e.g., year 1 to 9999)
 472 |                 min_ts = datetime.datetime(1, 1, 1, tzinfo=datetime.timezone.utc).timestamp()
 473 |                 max_ts = datetime.datetime(
 474 |                     9999, 12, 31, 23, 59, 59, tzinfo=datetime.timezone.utc
 475 |                 ).timestamp()
 476 |                 if not (min_ts <= timestamp <= max_ts):
 477 |                     return f"Invalid Timestamp (Out of Range: {timestamp})"
 478 | 
 479 |                 return datetime.datetime.fromtimestamp(
 480 |                     timestamp, tz=datetime.timezone.utc
 481 |                 ).isoformat()
 482 |             except (OSError, ValueError) as e:  # Handle potential errors like invalid values
 483 |                 logger.warning(
 484 |                     f"Invalid timestamp {timestamp} for {file_path}: {e}", emoji_key="warning"
 485 |                 )
 486 |                 return f"Invalid Timestamp ({type(e).__name__})"
 487 | 
 488 |         info = {
 489 |             "name": os.path.basename(file_path),
 490 |             "path": file_path,
 491 |             "size": stat_info.st_size,
 492 |             "created_os_specific": safe_isoformat(ctime_ts),  # Note platform dependency
 493 |             "modified": safe_isoformat(stat_info.st_mtime),
 494 |             "accessed": safe_isoformat(stat_info.st_atime),
 495 |             "is_directory": is_dir,
 496 |             "is_file": is_file,
 497 |             "is_symlink": is_link,  # Indicate if the path itself is a symlink
 498 |             # Use S_IMODE for standard permission bits (mode & 0o777)
 499 |             "permissions": oct(os.path.stat.S_IMODE(mode)),
 500 |         }
 501 | 
 502 |         if is_link or (not follow_symlinks and await aiofiles.os.path.islink(file_path)):
 503 |             try:
 504 |                 # Attempt to read the link target
 505 |                 link_target = await aiofiles.os.readlink(file_path)
 506 |                 info["symlink_target"] = link_target
 507 |             except OSError as link_err:
 508 |                 info["symlink_target"] = f"<Error reading link: {link_err}>"
 509 | 
 510 |         return info
 511 |     except OSError as e:
 512 |         logger.warning(f"Error getting file info for {file_path}: {str(e)}", emoji_key="warning")
 513 |         # Return consistent error structure, let caller decide severity.
 514 |         return {
 515 |             "name": os.path.basename(file_path),
 516 |             "path": file_path,
 517 |             "error": f"Failed to get info: {str(e)}",
 518 |         }
 519 |     except Exception as e:  # Catch unexpected errors
 520 |         logger.error(
 521 |             f"Unexpected error getting file info for {file_path}: {e}",
 522 |             exc_info=True,
 523 |             emoji_key="error",
 524 |         )
 525 |         return {
 526 |             "name": os.path.basename(file_path),
 527 |             "path": file_path,
 528 |             "error": f"An unexpected error occurred: {str(e)}",
 529 |         }
 530 | 
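# Illustrative success-shaped result, matching the docstring above (values are
# hypothetical; on stat failure the dict instead carries 'name', 'path', 'error').
_EXAMPLE_FILE_INFO: Dict[str, Any] = {
    "name": "report.txt",
    "path": "/data/docs/report.txt",
    "size": 2048,
    "modified": "2024-01-01T00:00:00+00:00",
    "is_directory": False,
    "is_file": True,
    "is_symlink": False,
    "permissions": "0o644",
}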
 531 | 
 532 | def create_unified_diff(original_content: str, new_content: str, filepath: str) -> str:
 533 |     """Create a unified diff string between original and new content.
 534 | 
 535 |     Args:
 536 |         original_content: Original file content as a single string.
 537 |         new_content: New file content as a single string.
 538 |         filepath: Path to the file (used in the diff header).
 539 | 
 540 |     Returns:
 541 |         Unified diff as a multi-line string, or empty string if no differences.
 542 |     """
 543 |     # Normalize line endings for consistent diffing
 544 |     original_lines = original_content.splitlines()
 545 |     new_lines = new_content.splitlines()
 546 | 
 547 |     # Generate unified diff using difflib (synchronous)
 548 |     diff_lines = list(
 549 |         difflib.unified_diff(
 550 |             original_lines,
 551 |             new_lines,
 552 |             fromfile=f"{filepath} (original)",  # Label for 'from' file in diff
 553 |             tofile=f"{filepath} (modified)",  # Label for 'to' file in diff
 554 |             lineterm="",  # Keep lines without added newlines by difflib
 555 |         )
 556 |     )
 557 | 
 558 |     # Return empty string if no changes, otherwise join lines into single string.
 559 |     return "\n".join(diff_lines) if diff_lines else ""
 560 | 
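# Usage sketch (illustrative only): diffing two small in-memory strings.
def _example_diff() -> str:
    before = "alpha\nbeta\n"
    after = "alpha\ngamma\n"
    # Returns a standard unified diff, or "" when the contents are identical.
    return create_unified_diff(before, after, "example.txt")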
 561 | 
 562 | async def read_file_content(filepath: str) -> str:
 563 |     """Read text file content using async I/O. Assumes UTF-8 encoding.
 564 | 
 565 |     Args:
 566 |         filepath: Path to the file to read (assumed validated).
 567 | 
 568 |     Returns:
 569 |         File content as string.
 570 | 
 571 |     Raises:
 572 |         ToolError: If reading fails or decoding fails. Includes specific context.
 573 |     """
 574 |     try:
 575 |         # Open asynchronously, read with strict UTF-8 decoding.
 576 |         async with aiofiles.open(filepath, mode="r", encoding="utf-8", errors="strict") as f:
 577 |             return await f.read()
 578 |     except UnicodeDecodeError as e:
 579 |         logger.warning(f"File {filepath} is not valid UTF-8: {e}", emoji_key="warning")
 580 |         # Provide context about the decoding error.
 581 |         raise ToolError(
 582 |             f"File is not valid UTF-8 encoded text: {filepath}. Cannot read as text. Details: {e}",
 583 |             context={"path": filepath, "encoding": "utf-8", "error_details": str(e)},
 584 |         ) from e
 585 |     except OSError as e:
 586 |         logger.error(f"OS error reading file {filepath}: {e}", emoji_key="error")
 587 |         raise ToolError(
 588 |             f"Error reading file: {str(e)}", context={"path": filepath, "errno": e.errno}
 589 |         ) from e
 590 |     except Exception as e:
 591 |         logger.error(
 592 |             f"Unexpected error reading file {filepath}: {e}", exc_info=True, emoji_key="error"
 593 |         )
 594 |         raise ToolError(
 595 |             f"An unexpected error occurred while reading file: {str(e)}", context={"path": filepath}
 596 |         ) from e
 597 | 
 598 | 
 599 | async def read_binary_file_content(filepath: str) -> bytes:
 600 |     """Read binary file content using async I/O.
 601 | 
 602 |     Args:
 603 |         filepath: Path to the file to read (assumed validated).
 604 | 
 605 |     Returns:
 606 |         File content as bytes.
 607 | 
 608 |     Raises:
 609 |         ToolError: If reading fails.
 610 |     """
 611 |     try:
 612 |         # Open asynchronously in binary read mode ('rb').
 613 |         async with aiofiles.open(filepath, mode="rb") as f:
 614 |             return await f.read()
 615 |     except OSError as e:
 616 |         logger.error(f"OS error reading binary file {filepath}: {e}", emoji_key="error")
 617 |         raise ToolError(
 618 |             f"Error reading binary file: {str(e)}", context={"path": filepath, "errno": e.errno}
 619 |         ) from e
 620 |     except Exception as e:
 621 |         logger.error(
 622 |             f"Unexpected error reading binary file {filepath}: {e}",
 623 |             exc_info=True,
 624 |             emoji_key="error",
 625 |         )
 626 |         raise ToolError(
 627 |             f"An unexpected error occurred while reading binary file: {str(e)}",
 628 |             context={"path": filepath},
 629 |         ) from e
 630 | 
 631 | 
 632 | async def write_file_content(filepath: str, content: Union[str, bytes]) -> None:
 633 |     """Write text or binary content to a file using async I/O. Creates parent dirs.
 634 | 
 635 |     Args:
 636 |         filepath: Path to the file to write (assumed validated, including parent writability).
 637 |         content: Content to write (string for text UTF-8, bytes for binary).
 638 | 
 639 |     Raises:
 640 |         ToolError: If writing fails.
 641 |         TypeError: If content is not str or bytes.
 642 |     """
 643 |     # Determine mode and encoding based on content type.
 644 |     if isinstance(content, str):
 645 |         mode = "w"
 646 |         encoding = "utf-8"
 647 |         data_to_write = content
 648 |     elif isinstance(content, bytes):
 649 |         mode = "wb"
 650 |         encoding = None  # No encoding for binary mode
 651 |         data_to_write = content
 652 |     else:
 653 |         raise TypeError("Content to write must be str or bytes")
 654 | 
 655 |     try:
 656 |         # Ensure parent directory exists asynchronously.
 657 |         parent_dir = os.path.dirname(filepath)
 658 |         if (
 659 |             parent_dir and parent_dir != filepath
 660 |         ):  # Check parent_dir is not empty and not the root itself
 661 |             # Use async makedirs for consistency. exist_ok=True makes it idempotent.
 662 |             await aiofiles.os.makedirs(parent_dir, exist_ok=True)
 663 | 
 664 |         # Open file asynchronously and write content.
 665 |         async with aiofiles.open(filepath, mode=mode, encoding=encoding) as f:
 666 |             await f.write(data_to_write)
 667 |             # await f.flush() # Often not necessary as context manager handles flush/close, but uncomment for critical writes if needed.
 668 | 
 669 |     except OSError as e:
 670 |         logger.error(f"OS error writing file {filepath}: {e}", emoji_key="error")
 671 |         raise ToolError(
 672 |             f"Error writing file: {str(e)}", context={"path": filepath, "errno": e.errno}
 673 |         ) from e
 674 |     except Exception as e:
 675 |         logger.error(
 676 |             f"Unexpected error writing file {filepath}: {e}", exc_info=True, emoji_key="error"
 677 |         )
 678 |         raise ToolError(
 679 |             f"An unexpected error occurred while writing file: {str(e)}", context={"path": filepath}
 680 |         ) from e
 681 | 
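# Round-trip sketch (illustrative only): writing then re-reading a path that was
# previously validated with validate_path(..., check_exists=False).
async def _example_write_then_read(safe_path: str) -> str:
    await write_file_content(safe_path, "hello world\n")  # str -> text mode "w" (UTF-8)
    await write_file_content(safe_path + ".bin", b"\x00\x01")  # bytes -> binary mode "wb"
    return await read_file_content(safe_path)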
 682 | 
 683 | async def apply_file_edits(
 684 |     filepath: str, edits: List[Dict[str, str]], dry_run: bool = False
 685 | ) -> Tuple[str, str]:
 686 |     """Apply a series of text replacements to a file asynchronously.
 687 | 
 688 |     Reads the file (UTF-8), applies edits sequentially. If an exact match for
 689 |     'oldText' isn't found, it attempts a line-by-line match ignoring leading/trailing
 690 |     whitespace, preserving the original indentation of the matched block.
 691 |     Generates a diff and optionally writes back the changes.
 692 | 
 693 |     Args:
 694 |         filepath: Path to the file to edit (assumed validated and is a text file).
 695 |         edits: List of edit operations. Each dict must have 'oldText' and 'newText' (both strings).
 696 |         dry_run: If True, calculate changes and diff but do not write to disk.
 697 | 
 698 |     Returns:
 699 |         Tuple of (diff_string, new_content_string). The diff string is empty if no changes occurred.
 700 | 
 701 |     Raises:
 702 |         ToolError: If reading/writing fails.
 703 |         ToolInputError: If edits are malformed or text specified in 'oldText' cannot be found.
 704 |     """
 705 |     # Read original content asynchronously (raises ToolError if fails)
 706 |     content = await read_file_content(filepath)
 707 |     original_content = content
 708 |     current_content = content  # Work on a mutable copy
 709 | 
 710 |     # Apply each edit sequentially (string operations are sync)
 711 |     for i, edit in enumerate(edits):
 712 |         # Validate edit structure
 713 |         if not isinstance(edit, dict):
 714 |             raise ToolInputError(
 715 |                 f"Edit #{i + 1} is not a dictionary.",
 716 |                 param_name=f"edits[{i}]",
 717 |                 provided_value=type(edit),
 718 |             )
 719 |         old_text = edit.get("oldText")
 720 |         new_text = edit.get("newText")
 721 | 
 722 |         if not isinstance(old_text, str):
 723 |             raise ToolInputError(
 724 |                 f"Edit #{i + 1} is missing 'oldText' or it's not a string.",
 725 |                 param_name=f"edits[{i}].oldText",
 726 |                 provided_value=edit,
 727 |             )
 728 |         if not isinstance(new_text, str):
 729 |             raise ToolInputError(
 730 |                 f"Edit #{i + 1} is missing 'newText' or it's not a string.",
 731 |                 param_name=f"edits[{i}].newText",
 732 |                 provided_value=edit,
 733 |             )
 734 |         # Warn about potentially ambiguous empty oldText
 735 |         if not old_text:
 736 |             logger.warning(
 737 |                 f"Edit #{i + 1} has empty 'oldText'. Python's string.replace behavior with empty strings might be unexpected.",
 738 |                 emoji_key="warning",
 739 |             )
 740 | 
 741 |         # Try exact replacement first
 742 |         if old_text in current_content:
 743 |             # Replace *all* occurrences of old_text with new_text
 744 |             current_content = current_content.replace(old_text, new_text)
 745 |         else:
 746 |             # Fallback: Try line-by-line matching with stripped whitespace comparison.
 747 |             old_lines = old_text.splitlines()
 748 |             # Avoid fallback if old_text was only whitespace but wasn't found exactly
 749 |             if not any(line.strip() for line in old_lines) and old_text:
 750 |                 logger.warning(
 751 |                     f"Edit #{i + 1}: 'oldText' consists only of whitespace, but was not found exactly. Skipping whitespace-insensitive fallback.",
 752 |                     emoji_key="warning",
 753 |                 )
 754 |                 raise ToolInputError(
 755 |                     f"Could not find exact whitespace text to replace in edit #{i + 1}: '{old_text[:100]}{'...' if len(old_text) > 100 else ''}'",
 756 |                     param_name=f"edits[{i}].oldText",
 757 |                     provided_value=edit,
 758 |                 )
 759 | 
 760 |             if not old_lines:  # If old_text was empty string and not found, error out.
 761 |                 raise ToolInputError(
 762 |                     f"Could not find empty 'oldText' to replace in edit #{i + 1}.",
 763 |                     param_name=f"edits[{i}].oldText",
 764 |                     provided_value=edit,
 765 |                 )
 766 | 
 767 |             content_lines = current_content.splitlines()
 768 |             found_match = False
 769 |             line_idx = 0
 770 |             # Iterate through possible starting lines for the block match
 771 |             while line_idx <= len(content_lines) - len(old_lines):
 772 |                 # Check if the slice matches ignoring leading/trailing whitespace on each line
 773 |                 is_match = all(
 774 |                     old_lines[j].strip() == content_lines[line_idx + j].strip()
 775 |                     for j in range(len(old_lines))
 776 |                 )
 777 | 
 778 |                 if is_match:
 779 |                     # Found a match based on content, now replace respecting original indentation.
 780 |                     new_lines = new_text.splitlines()
 781 | 
 782 |                     # Determine indentation from the *first original line* being replaced.
 783 |                     first_original_line = content_lines[line_idx]
 784 |                     leading_whitespace = first_original_line[
 785 |                         : len(first_original_line) - len(first_original_line.lstrip())
 786 |                     ]
 787 | 
 788 |                     # Apply this leading whitespace to all *new* lines.
 789 |                     indented_new_lines = (
 790 |                         [leading_whitespace + line for line in new_lines] if new_lines else []
 791 |                     )
 792 | 
 793 |                     # Replace the slice in the original lines list
 794 |                     content_lines[line_idx : line_idx + len(old_lines)] = indented_new_lines
 795 |                     # Reconstruct content string immediately after modification
 796 |                     current_content = "\n".join(content_lines)
 797 |                     found_match = True
 798 |                     # Stop searching for matches for *this specific edit dict* once one is found and replaced.
 799 |                     # To replace all occurrences, logic would need modification (e.g., restart search or adjust indices).
 800 |                     break
 801 | 
 802 |                 else:
 803 |                     line_idx += 1  # Move to the next line to check
 804 | 
 805 |             if not found_match:
 806 |                 # No match found even with whitespace trimming fallback
 807 |                 raise ToolInputError(
 808 |                     f"Could not find text to replace in edit #{i + 1}. Text searched (approx first 100 chars): '{old_text[:100]}{'...' if len(old_text) > 100 else ''}'. "
 809 |                     f"Exact match failed, and whitespace-insensitive line match also failed.",
 810 |                     param_name=f"edits[{i}].oldText",
 811 |                     provided_value=edit,
 812 |                 )
 813 | 
 814 |     # Create diff (sync function call) using original vs final content
 815 |     diff = create_unified_diff(original_content, current_content, filepath)
 816 | 
 817 |     # Write the changes if not a dry run asynchronously
 818 |     if not dry_run:
 819 |         # Ensure content is string before writing
 820 |         await write_file_content(filepath, str(current_content))  # Handles errors internally
 821 | 
 822 |     return diff, str(current_content)
 823 | 
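# Usage sketch (illustrative only): a dry-run edit. If 'oldText' is not found
# exactly, the whitespace-insensitive line fallback above is attempted before a
# ToolInputError is raised; dry_run=True computes the diff without writing.
async def _example_dry_run_edit(safe_path: str) -> str:
    edits = [{"oldText": "hello world", "newText": "hello MCP"}]
    diff, _new_content = await apply_file_edits(safe_path, edits, dry_run=True)
    return diff  # empty string when the edits produced no change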
 824 | 
 825 | # --- MCP Formatting Helpers ---
 826 | 
 827 | 
 828 | def format_mcp_content(text_content: str) -> List[Dict[str, Any]]:
 829 |     """Format text content into a simple MCP text block.
 830 | 
 831 |     Applies length truncation to avoid oversized responses.
 832 | 
 833 |     Args:
 834 |         text_content: Text to format.
 835 | 
 836 |     Returns:
 837 |         List containing a single MCP text block dictionary.
 838 |     """
 839 |     # Max length for content block to avoid overwhelming downstream systems.
 840 |     MAX_LEN = 100000  # Adjust based on LLM/platform constraints.
 841 |     if len(text_content) > MAX_LEN:
 842 |         # Note: Truncation happens here, not a placeholder.
 843 |         trunc_msg = f"\n... [Content truncated - {len(text_content)} characters total]"
 844 |         text_content = text_content[: MAX_LEN - len(trunc_msg)] + trunc_msg
 845 |         logger.warning(f"Content length > {MAX_LEN}, truncated.", emoji_key="warning")
 846 | 
 847 |     # Standard MCP text block structure.
 848 |     return [{"type": "text", "text": text_content}]
 849 | 
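# Usage sketch (illustrative only): short content passes through as one text block;
# content beyond MAX_LEN is truncated in place with a trailing marker.
def _example_mcp_block() -> List[Dict[str, Any]]:
    # -> [{"type": "text", "text": "short message"}]
    return format_mcp_content("short message")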
 850 | 
 851 | def create_tool_response(content: Any, is_error: bool = False) -> Dict[str, Any]:
 852 |     """Create a standard tool response dictionary for the Ultimate MCP Server.
 853 | 
 854 |     Formats various content types (str, dict, list) into MCP 'content' blocks,
 855 |     attempting to provide useful representations for common tool outputs.
 856 | 
 857 |     Args:
 858 |         content: The primary result or message from the tool. Can be str, dict,
 859 |                  list (assumed pre-formatted MCP), or other types (converted to str).
 860 |         is_error: If True, marks the response as an error response using "isError".
 861 | 
 862 |     Returns:
 863 |         A dictionary structured for the Ultimate MCP Server tool response schema.
 864 |     """
 865 |     formatted_content: List[Dict[str, Any]]
 866 |     response: Dict[str, Any] = {}  # Initialize response dict
 867 | 
 868 |     if is_error:
 869 |         # --- Error Handling Logic ---
 870 |         response["success"] = False  # Explicitly set success to False for errors
 871 |         response["isError"] = True  # Mark as an error response
 872 | 
 873 |         if isinstance(content, dict) and "message" in content:
 874 |             error_message = content.get("message", "Unknown error")
 875 |             error_code = content.get("error_code", "TOOL_ERROR")
 876 |             error_type = content.get("error_type", "ToolError")
 877 | 
 878 |             response["error"] = error_message
 879 |             response["error_code"] = error_code
 880 |             response["error_type"] = error_type
 881 | 
 882 |             # Add details if available
 883 |             if "details" in content:
 884 |                 response["details"] = content["details"]
 885 | 
 886 |             # Format the content text for error display, including context if available
 887 |             context = content.get("context")  # Context might be added by the calling function
 888 |             if context:
 889 |                 try:
 890 |                     # Try to pretty-print context if it's dict/list
 891 |                     if isinstance(context, (dict, list)):
 892 |                         context_str = json.dumps(context, indent=2, default=str)
 893 |                     else:
 894 |                         context_str = str(context)
 895 |                     error_display_text = f"Error: {error_message}\nContext: {context_str}"
 896 |                 except Exception:  # Fallback if context serialization fails
 897 |                     error_display_text = f"Error: {error_message}\nContext: (Could not serialize context: {type(context).__name__})"
 898 |             else:
 899 |                 error_display_text = f"Error: {error_message}"
 900 | 
 901 |             formatted_content = format_mcp_content(error_display_text)
 902 | 
 903 |         else:  # Handle cases where error content is not a detailed dict
 904 |             error_message = str(content)  # Use string representation of the error content
 905 |             formatted_content = format_mcp_content(f"Error: {error_message}")
 906 |             response["error"] = error_message
 907 |             # Provide default error codes/types if not available
 908 |             response["error_code"] = "UNKNOWN_ERROR"
 909 |             response["error_type"] = "UnknownError"
 910 | 
 911 |         # Add protectionTriggered flag if applicable (check original error dict if passed)
 912 |         if isinstance(content, dict) and content.get("protection_triggered"):
 913 |             response["protectionTriggered"] = True
 914 | 
 915 |         response["content"] = formatted_content
 916 | 
 917 |     else:
 918 |         # --- Success Case ---
 919 |         response["success"] = True  # Explicitly set success to True for successful responses
 920 | 
 921 |         # --- Existing success formatting logic ---
 922 |         try:
 923 |             if isinstance(content, dict):
 924 |                 # Handle specific known dictionary structures first
 925 |                 if (
 926 |                     "files" in content
 927 |                     and isinstance(content.get("files"), list)
 928 |                     and "succeeded" in content
 929 |                 ):
 930 |                     # Format output from read_multiple_files
 931 |                     blocks = []
 932 |                     summary = f"Read {content.get('succeeded', 0)} files successfully, {content.get('failed', 0)} failed."
 933 |                     blocks.append({"type": "text", "text": summary})
 934 |                     for file_result in content.get("files", []):
 935 |                         if isinstance(file_result, dict):
 936 |                             path = file_result.get("path", "Unknown path")
 937 |                             if file_result.get("success") and "content" in file_result:
 938 |                                 size_info = (
 939 |                                     f" ({file_result.get('size', 'N/A')} bytes)"
 940 |                                     if "size" in file_result
 941 |                                     else ""
 942 |                                 )
 943 |                                 binary_info = (
 944 |                                     " (Binary Preview)" if file_result.get("is_binary") else ""
 945 |                                 )
 946 |                                 header = f"--- File: {path}{size_info}{binary_info} ---"
 947 |                                 file_content_str = str(file_result["content"])
 948 |                                 blocks.extend(format_mcp_content(f"{header}\n{file_content_str}"))
 949 |                             elif "error" in file_result:
 950 |                                 header = f"--- File: {path} (Error) ---"
 951 |                                 error_msg = str(file_result["error"])
 952 |                                 blocks.extend(format_mcp_content(f"{header}\n{error_msg}"))
 953 |                             else:
 954 |                                 blocks.extend(
 955 |                                     format_mcp_content(
 956 |                                         f"--- File: {path} (Unknown status) ---\n{str(file_result)}"
 957 |                                     )
 958 |                                 )
 959 |                         else:
 960 |                             blocks.extend(
 961 |                                 format_mcp_content(
 962 |                                     f"--- Invalid entry in results ---\n{str(file_result)}"
 963 |                                 )
 964 |                             )
 965 |                     formatted_content = blocks
 966 |                 elif "tree" in content and "path" in content:
 967 |                     # Format output from directory_tree as JSON block
 968 |                     tree_json = json.dumps(
 969 |                         content["tree"], indent=2, ensure_ascii=False, default=str
 970 |                     )
 971 |                     MAX_JSON_LEN = 50000
 972 |                     if len(tree_json) > MAX_JSON_LEN:
 973 |                         trunc_msg = "\n... [Tree JSON truncated]"
 974 |                         tree_json = tree_json[: MAX_JSON_LEN - len(trunc_msg)] + trunc_msg
 975 |                     formatted_content = format_mcp_content(
 976 |                         f"Directory Tree for: {content['path']}\n```json\n{tree_json}\n```"
 977 |                     )
 978 |                 elif "entries" in content and "path" in content:
 979 |                     # Format output from list_directory
 980 |                     list_strs = [f"Directory Listing for: {content['path']}"]
 981 |                     for entry in content.get("entries", []):
 982 |                         if isinstance(entry, dict):
 983 |                             name = entry.get("name", "?")
 984 |                             etype = entry.get("type", "unknown")
 985 |                             size_str = (
 986 |                                 f" ({entry.get('size')} bytes)"
 987 |                                 if etype == "file" and "size" in entry
 988 |                                 else ""
 989 |                             )
 990 |                             error_str = (
 991 |                                 f" (Error: {entry.get('error')})" if "error" in entry else ""
 992 |                             )
 993 |                             link_str = (
 994 |                                 f" -> {entry.get('symlink_target')}"
 995 |                                 if etype == "symlink" and "symlink_target" in entry
 996 |                                 else ""
 997 |                             )
 998 |                             list_strs.append(f"- {name} [{etype}]{size_str}{error_str}{link_str}")
 999 |                         else:
1000 |                             list_strs.append(f"- Invalid entry: {str(entry)}")
1001 |                     formatted_content = format_mcp_content("\n".join(list_strs))
1002 |                 elif "matches" in content and "path" in content and "pattern" in content:
1003 |                     # Format output from search_files
1004 |                     search_strs = [
1005 |                         f"Search Results for '{content['pattern']}' in '{content['path']}':"
1006 |                     ]
1007 |                     matches = content.get("matches", [])
1008 |                     if matches:
1009 |                         for match in matches:
1010 |                             search_strs.append(f"- {match}")
1011 |                     else:
1012 |                         search_strs.append("(No matches found)")
1013 |                     if "warnings" in content:
1014 |                         search_strs.append("\nWarnings:")
1015 |                         search_strs.extend(content["warnings"])
1016 |                     formatted_content = format_mcp_content("\n".join(search_strs))
1017 |                 else:
1018 |                     # Attempt to pretty-print other dictionaries as JSON
1019 |                     json_content = json.dumps(content, indent=2, ensure_ascii=False, default=str)
1020 |                     formatted_content = format_mcp_content(f"```json\n{json_content}\n```")
1021 | 
1022 |             elif isinstance(content, str):
1023 |                 # Simple string content
1024 |                 formatted_content = format_mcp_content(content)
1025 |             elif isinstance(content, list) and all(
1026 |                 isinstance(item, dict) and "type" in item for item in content
1027 |             ):
1028 |                 # Assume it's already MCP formatted - pass through directly
1029 |                 formatted_content = content
1030 |             else:
1031 |                 # Convert anything else to string and format
1032 |                 formatted_content = format_mcp_content(str(content))
1033 | 
1034 |             # Ensure formatted_content is valid before adding to response
1035 |             if isinstance(formatted_content, list):
1036 |                 response["content"] = formatted_content
1037 |             else:
1038 |                 # Fallback if formatting somehow failed during success path
1039 |                 fallback_text = f"Successfully processed, but failed to format response content: {str(formatted_content)}"
1040 |                 logger.error(fallback_text)  # Log this internal error
1041 |                 response["content"] = format_mcp_content(fallback_text)
1042 |                 response["warning"] = "Response content formatting failed."  # Add a warning field
1043 | 
1044 |         except (TypeError, ValueError) as json_err:  # Catch JSON serialization errors specifically
1045 |             logger.warning(
1046 |                 f"Could not serialize successful dictionary response to JSON: {json_err}",
1047 |                 exc_info=True,
1048 |                 emoji_key="warning",
1049 |             )
1050 |             formatted_content = format_mcp_content(
1051 |                 f"(Response dictionary could not be formatted as JSON)\n{str(content)}"
1052 |             )
1053 |             response["content"] = formatted_content
1054 |             response["warning"] = "Could not format successful response as JSON."
1055 |         except Exception as format_err:  # Catch any other formatting errors
1056 |             logger.error(
1057 |                 f"Unexpected error formatting successful response content: {format_err}",
1058 |                 exc_info=True,
1059 |             )
1060 |             response["content"] = format_mcp_content(f"Error formatting response: {format_err}")
1061 |             response["warning"] = "Unexpected error formatting response content."
1062 | 
1063 |     return response
1064 | 
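# Illustrative usage (hypothetical payloads; behaviour inferred from the
# branches above). create_tool_response normalizes both success and error
# payloads into the MCP response dict assembled in this function:
#
#     ok = create_tool_response({"path": "/tmp/a.txt", "size": 12})
#     # success branch: ok["success"] is True and ok["content"] is a list of
#     # formatted blocks (a plain dict is pretty-printed as a JSON block)
#
#     err = create_tool_response("disk offline", is_error=True)
#     # non-dict error branch: err["error"] == "disk offline",
#     # err["error_code"] == "UNKNOWN_ERROR", err["error_type"] == "UnknownError"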
1065 | 
1066 | # --- Protection Heuristics Implementation ---
1067 | 
1068 | 
1069 | async def _get_minimal_stat(path: str) -> Optional[Tuple[Tuple[float, float], str]]:
1070 |     """Helper to get minimal stat info (ctime, mtime, extension) for protection checks."""
1071 |     try:
1072 |         # Use stat with follow_symlinks=False so we inspect the listed entry itself rather than a symlink target
1073 |         stat_info = await aiofiles.os.stat(path, follow_symlinks=False)
1074 |         # Use mtime and ctime (platform-dependent creation/metadata change time)
1075 |         mtime = stat_info.st_mtime
1076 |         try:
1077 |             ctime = stat_info.st_ctime
1078 |         except AttributeError:
1079 |             ctime = mtime  # Fallback if ctime not available
1080 | 
1081 |         extension = (
1082 |             os.path.splitext(path)[1].lower()
1083 |             if not os.path.stat.S_ISDIR(stat_info.st_mode)  # note: relies on os.path re-exporting the stdlib stat module
1084 |             else ".<dir>"
1085 |         )
1086 |         return ((ctime, mtime), extension)
1087 |     except OSError as e:
1088 |         logger.debug(f"Stat failed for protection check on {path}: {e}", emoji_key="warning")
1089 |         return None  # Return None on OS error
1090 | 
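# Illustrative return shapes for hypothetical paths (values abbreviated):
#
#     await _get_minimal_stat("/data/report.csv")  # ((ctime, mtime), ".csv")
#     await _get_minimal_stat("/data/archive")     # ((ctime, mtime), ".<dir>")
#     await _get_minimal_stat("/data/missing")     # None (stat raised OSError)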
1091 | 
1092 | async def _check_protection_heuristics(
1093 |     paths: List[str], operation_type: Literal["deletion", "modification"]
1094 | ) -> None:
1095 |     """
1096 |     Check if a bulk operation on multiple files triggers safety protection heuristics.
1097 |     (Modified to use get_protection_config which reads from validated config)
1098 |     """
1099 |     protection_config = get_protection_config()
1100 | 
1101 |     if not protection_config.get("enabled", False):
1102 |         return  # Protection disabled for this operation type
1103 | 
1104 |     num_paths = len(paths)
1105 |     # --- Read thresholds from the loaded config dictionary ---
1106 |     max_files_threshold = protection_config.get("max_files_threshold", 100)
1107 | 
1108 |     # Only run detailed checks if the number of files exceeds the threshold
1109 |     if num_paths <= max_files_threshold:
1110 |         return
1111 | 
1112 |     logger.info(
1113 |         f"Performing detailed safety check for {operation_type} of {num_paths} paths (threshold: {max_files_threshold})...",
1114 |         emoji_key="security",
1115 |     )
1116 | 
1117 |     # --- Gather Metadata Asynchronously ---
1118 |     stat_tasks = [_get_minimal_stat(p) for p in paths]
1119 |     stat_results = await asyncio.gather(*stat_tasks)
1120 | 
1121 |     successful_stats: List[Tuple[Tuple[float, float], str]] = []
1122 |     failed_stat_count = 0
1123 |     for result in stat_results:
1124 |         if result is not None:
1125 |             successful_stats.append(result)
1126 |         else:
1127 |             failed_stat_count += 1
1128 | 
1129 |     total_attempted = len(paths)
1130 |     # --- Read threshold from config dict ---
1131 |     max_errors_pct = protection_config.get("max_stat_errors_pct", 10.0)
1132 |     if total_attempted > 0 and (failed_stat_count / total_attempted * 100) > max_errors_pct:
1133 |         raise ProtectionTriggeredError(
1134 |             f"Operation blocked because safety check could not reliably gather file metadata ({failed_stat_count}/{total_attempted} failures).",
1135 |             context={"failed_stats": failed_stat_count, "total_paths": total_attempted},
1136 |         )
1137 | 
1138 |     num_valid_stats = len(successful_stats)
1139 |     if num_valid_stats < 2:
1140 |         logger.info(
1141 |             "Protection check skipped: Not enough valid file metadata points obtained.",
1142 |             emoji_key="info",
1143 |         )
1144 |         return
1145 | 
1146 |     # --- Calculate Heuristics ---
1147 |     creation_times = [ts[0] for ts, ext in successful_stats]
1148 |     modification_times = [ts[1] for ts, ext in successful_stats]
1149 |     extensions = {ext for ts, ext in successful_stats if ext and ext != ".<dir>"}
1150 | 
1151 |     try:
1152 |         ctime_stddev = statistics.pstdev(creation_times) if num_valid_stats > 1 else 0.0
1153 |         mtime_stddev = statistics.pstdev(modification_times) if num_valid_stats > 1 else 0.0
1154 |         if math.isnan(ctime_stddev):
1155 |             ctime_stddev = 0.0
1156 |         if math.isnan(mtime_stddev):
1157 |             mtime_stddev = 0.0
1158 |     except statistics.StatisticsError as e:
1159 |         logger.warning(
1160 |             f"Could not calculate timestamp standard deviation: {e}", emoji_key="warning"
1161 |         )
1162 |         ctime_stddev = 0.0
1163 |         mtime_stddev = 0.0
1164 | 
1165 |     num_unique_extensions = len(extensions)
1166 | 
1167 |     # --- Read thresholds from config dict ---
1168 |     dt_threshold = protection_config.get("datetime_stddev_threshold_sec", 60 * 60 * 24 * 30)
1169 |     type_threshold = protection_config.get("file_type_variance_threshold", 5)
1170 | 
1171 |     triggered = False
1172 |     reasons = []
1173 | 
1174 |     if ctime_stddev > dt_threshold:
1175 |         triggered = True
1176 |         reasons.append(
1177 |             f"High variance in creation times (std dev: {ctime_stddev:.2f}s > threshold: {dt_threshold}s)"
1178 |         )
1179 |     if mtime_stddev > dt_threshold:
1180 |         triggered = True
1181 |         reasons.append(
1182 |             f"High variance in modification times (std dev: {mtime_stddev:.2f}s > threshold: {dt_threshold}s)"
1183 |         )
1184 |     if num_unique_extensions > type_threshold:
1185 |         triggered = True
1186 |         reasons.append(
1187 |             f"High variance in file types ({num_unique_extensions} unique types > threshold: {type_threshold})"
1188 |         )
1189 | 
1190 |     if triggered:
1191 |         reason_str = (
1192 |             f"Operation involves a large number of files ({num_paths}) with suspicious characteristics: "
1193 |             + "; ".join(reasons)
1194 |         )
1195 |         logger.warning(f"Protection Triggered! Reason: {reason_str}", emoji_key="security")
1196 |         raise ProtectionTriggeredError(
1197 |             reason_str,
1198 |             protection_type=f"{operation_type}_protection",  # Add protection type
1199 |             context={  # Use context for structured data
1200 |                 "num_paths": num_paths,
1201 |                 "num_valid_stats": num_valid_stats,
1202 |                 "ctime_stddev_sec": round(ctime_stddev, 2),
1203 |                 "mtime_stddev_sec": round(mtime_stddev, 2),
1204 |                 "unique_file_types": num_unique_extensions,
1205 |                 "threshold_max_files": max_files_threshold,
1206 |                 "threshold_datetime_stddev_sec": dt_threshold,
1207 |                 "threshold_file_type_variance": type_threshold,
1208 |                 "failed_stat_count": failed_stat_count,
1209 |             },
1210 |         )
1211 |     else:
1212 |         logger.info(
1213 |             f"Safety check passed for {operation_type} of {num_paths} paths.",
1214 |             emoji_key="security",
1215 |             ctime_stddev=round(ctime_stddev, 2),
1216 |             mtime_stddev=round(mtime_stddev, 2),
1217 |             unique_types=num_unique_extensions,
1218 |         )
1219 | 
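# A minimal sketch of the timestamp-variance heuristic above, with assumed
# inputs: files created seconds apart have a tiny standard deviation and pass,
# while timestamps spread over weeks exceed the default 30-day threshold.
#
#     import statistics
#     recent = [1_700_000_000 + i for i in range(200)]   # ~1s apart -> passes
#     statistics.pstdev(recent)                          # ~57.7 seconds
#     spread = [1_700_000_000 + i * 604_800 for i in range(200)]  # weekly
#     statistics.pstdev(spread) > 60 * 60 * 24 * 30      # True -> would trigger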
1220 | 
1221 | # --- Async Walk and Delete Helpers ---
1222 | 
1223 | 
1224 | async def async_walk(
1225 |     top: str,
1226 |     topdown: bool = True,
1227 |     onerror: Optional[callable] = None,
1228 |     followlinks: bool = False,
1229 |     exclude_patterns: Optional[List[str]] = None,
1230 |     base_path: Optional[str] = None,
1231 | ) -> AsyncGenerator[Tuple[str, List[str], List[str]], None]:
1232 |     """Async version of os.walk using aiofiles.os.scandir. Handles excludes.
1233 | 
1234 |     Yields (current_dir_path, dir_names, file_names) tuples asynchronously.
1235 |     Filters entries based on exclude_patterns matched against relative path from base_path.
1236 |     Handles errors during scanning via the onerror callback.
1237 | 
1238 |     Args:
1239 |         top: Root directory path to start walking.
1240 |         topdown: Whether to yield the current directory before (True) or after (False) its subdirectories.
1241 |         onerror: Optional callable that takes an OSError instance when errors occur during scandir or stat.
1242 |         followlinks: If True, recurse into directories pointed to by symlinks.
1243 |         exclude_patterns: List of glob-style patterns to exclude files/directories.
1244 |         base_path: The original starting path of the walk (used for relative path calculation).
1245 | 
1246 |     Yields:
1247 |         Tuples of (directory_path, list_of_subdir_names, list_of_file_names)
1248 |     """
1249 |     if base_path is None:
1250 |         base_path = top  # Keep track of the original root for exclusion matching
1251 | 
1252 |     dirs: List[str] = []
1253 |     nondirs: List[str] = []
1254 |     walk_error: Optional[OSError] = None
1255 | 
1256 |     try:
1257 |         # Use async scandir for efficient directory iteration
1258 |         # scandir returns a coroutine that must be awaited to get the entries
1259 |         scandir_it = await aiofiles.os.scandir(top)
1260 |         # Now iterate through the scandir entries (which should be a list or iterable)
1261 |         for entry in scandir_it:
1262 |             try:
1263 |                 # Calculate relative path for exclusion check
1264 |                 try:
1265 |                     # entry.path should be absolute if 'top' is absolute
1266 |                     rel_path = os.path.relpath(entry.path, base_path)
1267 |                 except ValueError:
1268 |                     # Fallback if paths are on different drives (Windows) or other issues
1269 |                     rel_path = entry.name  # Use name as fallback for matching
1270 |                     logger.debug(
1271 |                         f"Could not get relative path for {entry.path} from {base_path}, using name '{entry.name}' for exclusion check."
1272 |                     )
1273 | 
1274 |                 # Check exclusion patterns (case-insensitive matching where appropriate)
1275 |                 is_excluded = False
1276 |                 if exclude_patterns:
1277 |                     norm_rel_path = os.path.normcase(rel_path)
1278 |                     norm_entry_name = os.path.normcase(entry.name)
1279 |                     for pattern in exclude_patterns:
1280 |                         norm_pattern = os.path.normcase(pattern)
1281 |                         # Match pattern against full relative path OR just the name
1282 |                         if fnmatch(norm_rel_path, norm_pattern) or fnmatch(
1283 |                             norm_entry_name, norm_pattern
1284 |                         ):
1285 |                             is_excluded = True
1286 |                             logger.debug(f"Excluding '{entry.path}' due to pattern '{pattern}'")
1287 |                             break  # Stop checking patterns for this entry
1288 |                 if is_excluded:
1289 |                     continue  # Skip this excluded entry
1290 | 
1291 |                 # Check entry type, handling potential errors using lstat to check link itself
1292 |                 try:
1293 |                     # Determine if it's a directory (respecting followlinks for recursion decision later)
1294 |                     is_entry_dir = entry.is_dir(follow_symlinks=followlinks)
1295 |                     is_entry_link = entry.is_symlink()  # Check if entry *itself* is a link
1296 | 
1297 |                     if is_entry_dir and (not is_entry_link or followlinks):
1298 |                         # It's a directory, or it's a link to a directory and we follow links
1299 |                         dirs.append(entry.name)
1300 |                     else:
1301 |                         # It's a file, a link we don't follow, or something else
1302 |                         nondirs.append(entry.name)
1303 | 
1304 |                 except OSError as stat_err:
1305 |                     # Error determining type (e.g., permissions)
1306 |                     if onerror is not None:
1307 |                         onerror(stat_err)
1308 |                     logger.warning(
1309 |                         f"Skipping entry '{entry.name}' in {top} due to stat error: {stat_err}",
1310 |                         emoji_key="warning",
1311 |                     )
1312 |                     continue  # Skip entry if type cannot be determined
1313 | 
1314 |             except OSError as entry_proc_err:
1315 |                 # Error during exclusion check or other processing of the entry itself
1316 |                 if onerror is not None:
1317 |                     onerror(entry_proc_err)
1318 |                 logger.warning(
1319 |                     f"Error processing entry '{entry.name}' in {top}: {entry_proc_err}",
1320 |                     emoji_key="warning",
1321 |                 )
1322 |                 # Continue processing other entries in the directory
1323 | 
1324 |     except OSError as err:
1325 |         # Error during the initial scandir call itself (e.g., permissions on 'top')
1326 |         walk_error = err
1327 |         if onerror is not None:
1328 |             onerror(err)
1329 |         # Stop iteration for this path if scandir failed
1330 | 
1331 |     # --- Yield results and recurse ---
1332 |     if walk_error is None:
1333 |         if topdown:
1334 |             yield top, dirs, nondirs
1335 | 
1336 |         # Recurse into subdirectories discovered
1337 |         for name in dirs:
1338 |             new_path = os.path.join(top, name)
1339 |             # Recurse using 'async for' to delegate iteration properly down the recursion
1340 |             async for x in async_walk(
1341 |                 new_path, topdown, onerror, followlinks, exclude_patterns, base_path
1342 |             ):
1343 |                 yield x
1344 | 
1345 |         if not topdown:
1346 |             yield top, dirs, nondirs
1347 | 
1348 | 
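# Illustrative usage (hypothetical directory): async_walk is consumed like
# os.walk, with exclusion patterns matched against paths relative to the root.
#
#     async for dirpath, dirnames, filenames in async_walk(
#         "/allowed/project", exclude_patterns=["*.pyc", ".git"]
#     ):
#         print(dirpath, len(dirnames), len(filenames))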
1349 | async def _list_paths_recursive(root_path: str) -> List[str]:
1350 |     """Recursively list all file paths within a directory using async_walk."""
1351 |     paths = []
1352 |     try:
1353 |         async for dirpath, _dirnames, filenames in async_walk(
1354 |             root_path, topdown=True, followlinks=False
1355 |         ):
1356 |             for filename in filenames:
1357 |                 paths.append(os.path.join(dirpath, filename))
1358 |             # Only file paths are collected: the deletion heuristics operate on
1359 |             # the files inside a directory, so directory entries themselves are
1360 |             # intentionally omitted to keep the calculation simple.
1361 |             # for dirname in dirnames:
1362 |             #     paths.append(os.path.join(dirpath, dirname))
1363 |     except Exception as e:
1364 |         logger.error(
1365 |             f"Error listing paths recursively under {root_path}: {e}",
1366 |             exc_info=True,
1367 |             emoji_key="error",
1368 |         )
1369 |         # Re-raise as ToolError so the caller knows listing failed
1370 |         raise ToolError(
1371 |             f"Failed to list contents of directory '{root_path}' for safety check: {e}"
1372 |         ) from e
1373 |     return paths
1374 | 
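# Illustrative call (hypothetical root): returns every file path under the
# tree, e.g. ["/data/a.txt", "/data/sub/b.bin"]; raises ToolError if the
# listing itself fails.
#
#     file_paths = await _list_paths_recursive("/data")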
1375 | 
1376 | async def _async_rmtree(path: str):
1377 |     """Asynchronously remove a directory and its contents, similar to shutil.rmtree."""
1378 |     logger.debug(f"Initiating async rmtree for: {path}", emoji_key="action")
1379 |     errors: List[Tuple[str, OSError]] = []
1380 | 
1381 |     def onerror(os_error: OSError):
1382 |         logger.warning(
1383 |             f"Error during rmtree operation on {getattr(os_error, 'filename', 'N/A')}: {os_error}",
1384 |             emoji_key="warning",
1385 |         )
1386 |         errors.append((getattr(os_error, "filename", "N/A"), os_error))
1387 | 
1388 |     try:
1389 |         # Walk bottom-up to remove files first, then directories
1390 |         async for root, dirs, files in async_walk(
1391 |             path, topdown=False, onerror=onerror, followlinks=False
1392 |         ):
1393 |             # Remove files in the current directory
1394 |             for name in files:
1395 |                 filepath = os.path.join(root, name)
1396 |                 try:
1397 |                     logger.debug(f"Removing file: {filepath}", emoji_key="delete")
1398 |                     await aiofiles.os.remove(filepath)
1399 |                 except OSError as e:
1400 |                     onerror(e)  # Log error and collect it
1401 | 
1402 |             # Remove empty subdirectories (should be empty now if walk is bottom-up)
1403 |             for name in dirs:
1404 |                 dirpath = os.path.join(root, name)
1405 |                 # We only remove dirs listed in the walk *if* they still exist
1406 |                 # (e.g., link handling might affect this). Re-check existence and type.
1407 |                 try:
1408 |                     if await aiofiles.os.path.islink(dirpath):
1409 |                         # Remove symlink itself if not following links
1410 |                         logger.debug(
1411 |                             f"Removing symlink (treated as file): {dirpath}", emoji_key="delete"
1412 |                         )
1413 |                         await aiofiles.os.remove(dirpath)
1414 |                     elif await aiofiles.os.path.isdir(dirpath):
1415 |                         logger.debug(f"Removing directory: {dirpath}", emoji_key="delete")
1416 |                         await aiofiles.os.rmdir(dirpath)
1417 |                 except OSError as e:
1418 |                     onerror(e)  # Log error and collect it
1419 | 
1420 |         # Finally, remove the top-level directory itself
1421 |         try:
1422 |             logger.debug(f"Removing root directory: {path}", emoji_key="delete")
1423 |             await aiofiles.os.rmdir(path)
1424 |         except OSError as e:
1425 |             onerror(e)
1426 | 
1427 |         if errors:
1428 |             # Raise a consolidated error if any deletions failed
1429 |             error_summary = "; ".join([f"{p}: {e.strerror}" for p, e in errors[:5]])
1430 |             if len(errors) > 5:
1431 |                 error_summary += " ... (more errors)"
1432 |             raise ToolError(
1433 |                 f"Errors occurred during recursive deletion of '{path}': {error_summary}",
1434 |                 context={"path": path, "num_errors": len(errors)},
1435 |             )
1436 | 
1437 |     except Exception as e:
1438 |         logger.error(
1439 |             f"Unexpected error during async rmtree of {path}: {e}", exc_info=True, emoji_key="error"
1440 |         )
1441 |         raise ToolError(
1442 |             f"An unexpected error occurred during recursive deletion: {str(e)}",
1443 |             context={"path": path},
1444 |         ) from e
1445 | 
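# Illustrative call (hypothetical path): removes the tree bottom-up (files,
# then directories) and raises a consolidated ToolError if any entry could
# not be deleted.
#
#     await _async_rmtree("/allowed/tmp/build_artifacts")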
1446 | 
1447 | # --- Tool Functions ---
1448 | 
1449 | 
1450 | @with_tool_metrics
1451 | @with_error_handling
1452 | async def read_file(path: str) -> Dict[str, Any]:
1453 |     """Read a file's content asynchronously, handling text/binary detection.
1454 | 
1455 |     Validates the path, checks it's a file, attempts to read as UTF-8 text.
1456 |     If UTF-8 decoding fails, it reads as binary and provides a hex preview.
1457 | 
1458 |     Args:
1459 |         path: Path to the file to read.
1460 | 
1461 |     Returns:
1462 |         A dictionary formatted as an MCP tool response containing file content
1463 |         or an error message.
1464 |     """
1465 |     start_time = time.monotonic()
1466 |     response_content: Any
1467 |     is_response_error = False
1468 | 
1469 |     try:
1470 |         # Validate path, ensuring it exists and is accessible. check_exists=True
1471 |         validated_path = await validate_path(path, check_exists=True, check_parent_writable=False)
1472 | 
1473 |         # Ensure it's a file, not a directory or other type.
1474 |         if not await aiofiles.os.path.isfile(validated_path):
1475 |             # Check if it's a link first before declaring it's not a regular file
1476 |             if await aiofiles.os.path.islink(validated_path):
1477 |                 # isfile() follows symlinks, so isfile() returning False for a link
1478 |                 # means the link's target is not a regular file; reject it here.
1479 |                 raise ToolInputError(
1480 |                     f"Path '{path}' is a symbolic link that points to something that is not a regular file.",
1481 |                     param_name="path",
1482 |                     provided_value=path,
1483 |                     details={
1484 |                         "path": path,
1485 |                         "resolved_path": validated_path,
1486 |                         "error_type": "INVALID_SYMLINK_TARGET",
1487 |                     },
1488 |                 )
1489 |             elif await aiofiles.os.path.isdir(validated_path):
1490 |                 raise ToolInputError(
1491 |                     f"Path '{path}' is a directory, not a file. Use list_directory or directory_tree to view its contents.",
1492 |                     param_name="path",
1493 |                     provided_value=path,
1494 |                     details={
1495 |                         "path": path,
1496 |                         "resolved_path": validated_path,
1497 |                         "error_type": "PATH_IS_DIRECTORY",
1498 |                     },
1499 |                 )
1500 |             else:
1501 |                 raise ToolInputError(
1502 |                     f"Path '{path}' exists but is not a regular file. It may be a special file type (socket, device, etc.).",
1503 |                     param_name="path",
1504 |                     provided_value=path,
1505 |                     details={
1506 |                         "path": path,
1507 |                         "resolved_path": validated_path,
1508 |                         "error_type": "PATH_NOT_REGULAR_FILE",
1509 |                     },
1510 |                 )
1511 | 
1512 |         content: Union[str, bytes]
1513 |         is_binary = False
1514 |         read_error = None
1515 |         file_size = -1  # Default size if stat fails later
1516 | 
1517 |         # Attempt to read as text first
1518 |         try:
1519 |             content = await read_file_content(validated_path)  # Handles UTF-8 check internally
1520 |         except ToolError as text_err:
1521 |             # Check if the error was specifically UnicodeDecodeError
1522 |             if "not valid UTF-8" in str(text_err):
1523 |                 logger.warning(
1524 |                     f"File {path} is not UTF-8 encoded, reading as binary.", emoji_key="warning"
1525 |                 )
1526 |                 is_binary = True
1527 |                 try:
1528 |                     # Fallback to binary read
1529 |                     content = await read_binary_file_content(
1530 |                         validated_path
1531 |                     )  # Keep raw bytes for now
1532 |                 except ToolError as bin_err:
1533 |                     read_error = (
1534 |                         f"Error reading file as binary after text decode failed: {str(bin_err)}"
1535 |                     )
1536 |                 except Exception as bin_e:
1537 |                     read_error = f"Unexpected error reading file as binary: {str(bin_e)}"
1538 |             else:
1539 |                 # Other OS read error during text read attempt
1540 |                 read_error = f"Error reading file: {str(text_err)}"
1541 |         except Exception as text_e:
1542 |             read_error = f"Unexpected error reading file as text: {str(text_e)}"
1543 | 
1544 |         if read_error:
1545 |             # If reading failed entirely (text and binary fallback)
1546 |             raise ToolError(read_error, context={"path": validated_path})
1547 | 
1548 |         # Successfully read content (either text or binary bytes)
1549 |         try:
1550 |             # Use stat with follow_symlinks=False for consistency
1551 |             file_size = (await aiofiles.os.stat(validated_path, follow_symlinks=False)).st_size
1552 |         except OSError as stat_err:
1553 |             logger.warning(
1554 |                 f"Could not get size for file {validated_path} after reading: {stat_err}",
1555 |                 emoji_key="warning",
1556 |             )
1557 |             # Continue without size info
1558 | 
1559 |         # Prepare response content string
1560 |         basename = os.path.basename(validated_path)
1561 |         size_str = f"{file_size} bytes" if file_size >= 0 else "Size unavailable"
1562 | 
1563 |         if is_binary:
1564 |             # Provide a safe representation for binary data
1565 |             binary_data = cast(bytes, content)
1566 |             hex_preview_len = 200  # Bytes to show as hex
1567 |             hex_preview = binary_data[:hex_preview_len].hex(" ")  # Use spaces for readability
1568 |             preview_msg = f"(showing first {min(hex_preview_len, len(binary_data))} bytes as hex)"
1569 |             ellipsis = "..." if len(binary_data) > hex_preview_len else ""
1570 |             response_text = (
1571 |                 f"File: {basename}\n"
1572 |                 f"Path: {validated_path}\n"
1573 |                 f"Size: {size_str}\n"
1574 |                 f"Content: <Binary file detected> {preview_msg}\n"
1575 |                 f"{hex_preview}{ellipsis}"
1576 |             )
1577 |         else:
1578 |             # Content is already string
1579 |             response_text = (
1580 |                 f"File: {basename}\n"
1581 |                 f"Path: {validated_path}\n"
1582 |                 f"Size: {size_str}\n"
1583 |                 f"Content:\n"  # Add newline for separation
1584 |                 f"{content}"
1585 |             )
1586 | 
1587 |         response_content = response_text
1588 |         processing_time = time.monotonic() - start_time
1589 |         logger.success(
1590 |             f"Successfully read file: {path}",
1591 |             emoji_key="file",
1592 |             size=file_size,
1593 |             time=processing_time,
1594 |             is_binary=is_binary,
1595 |         )
1596 | 
1597 |     except (ToolInputError, ToolError) as e:
1598 |         logger.error(
1599 |             f"Error in read_file for '{path}': {e}",
1600 |             emoji_key="error",
1601 |             details=getattr(e, "context", None),
1602 |         )
1603 |         # Return a formatted error response with detailed info
1604 |         error_type = e.__class__.__name__
1605 |         error_details = getattr(e, "details", {}) or {}
1606 |         # Get the context from the error if available (used in base ToolError)
1607 |         context = getattr(e, "context", None)
1608 |         if context and isinstance(context, dict):
1609 |             error_details.update(context)
1610 |         # Include error type and code for better error display
1611 |         response_content = {
1612 |             "message": str(e),
1613 |             "error_code": getattr(e, "error_code", "TOOL_ERROR"),
1614 |             "error_type": error_type,
1615 |             "details": error_details,
1616 |         }
1617 |         is_response_error = True
1618 |     except FileNotFoundError as e:
1619 |         # Specific error for file not found
1620 |         raise ToolInputError(
1621 |             f"File not found: {path}",
1622 |             param_name="path",
1623 |             provided_value=path,
1624 |             details={"errno": e.errno, "error_type": "PATH_NOT_FOUND"},
1625 |         ) from e
1626 |     except IsADirectoryError as e:
1627 |         # Specific error for path being a directory
1628 |         raise ToolInputError(
1629 |             f"Path is a directory, not a file: {path}",
1630 |             param_name="path",
1631 |             provided_value=path,
1632 |             details={"errno": e.errno, "error_type": "PATH_IS_DIRECTORY"},
1633 |         ) from e
1634 |     except PermissionError as e:
1635 |         # Specific error for permissions
1636 |         raise ToolInputError(
1637 |             f"Permission denied reading file: {path}",
1638 |             param_name="path",
1639 |             provided_value=path,
1640 |             details={"errno": e.errno, "error_type": "PERMISSION_DENIED"},
1641 |         ) from e
1642 |     except UnicodeDecodeError as e:
1643 |         # This is handled in read_file_content now, but keep check here just in case
1644 |         raise ToolError(
1645 |             f"File is not valid UTF-8 encoded text: {validated_path}. Details: {e}",
1646 |             context={"path": validated_path, "encoding": "utf-8"},
1647 |         ) from e
1648 |     except OSError as e:
1649 |         # General OS error during read
1650 |         raise ToolError(
1651 |             f"OS error reading file: {str(e)}", context={"path": validated_path, "errno": e.errno}
1652 |         ) from e
1653 |     except Exception as e:
1654 |         logger.error(
1655 |             f"Unexpected error in read_file for '{path}': {e}", exc_info=True, emoji_key="error"
1656 |         )
1657 |         response_content = {
1658 |             "message": f"An unexpected error occurred while reading '{path}': {str(e)}",
1659 |             "error_code": "UNEXPECTED_ERROR",
1660 |             "error_type": type(e).__name__,
1661 |             "details": {"error_class": type(e).__name__, "path": path},
1662 |         }
1663 |         is_response_error = True
1664 | 
1665 |     # Use create_tool_response for consistent formatting of success/error messages.
1666 |     return create_tool_response(response_content, is_error=is_response_error)
1667 | 
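# Illustrative call (hypothetical path): the return value is the MCP-style
# dict built by create_tool_response, with plain text for UTF-8 files and a
# hex preview for binary ones.
#
#     resp = await read_file("/allowed/docs/readme.txt")
#     # resp["success"] is True; resp["content"] holds the formatted file text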
1668 | 
1669 | @with_tool_metrics
1670 | @with_error_handling
1671 | async def read_multiple_files(paths: List[str]) -> Dict[str, Any]:
1672 |     """Read the contents of multiple files asynchronously and concurrently.
1673 | 
1674 |     Validates each path and attempts to read each file (text/binary),
1675 |     handling individual errors gracefully.
1676 | 
1677 |     Args:
1678 |         paths: A list of file paths to read.
1679 | 
1680 |     Returns:
1681 |         A dictionary summarizing the results (suitable for create_tool_response),
1682 |         including success/failure counts and content/errors for each file.
1683 |         The operation itself succeeds unless input validation fails.
1684 |     """
1685 |     start_time = time.monotonic()
1686 | 
1687 |     # Input validation
1688 |     if not isinstance(paths, list):
1689 |         raise ToolInputError(
1690 |             "Input must be a list of paths.", param_name="paths", provided_value=type(paths)
1691 |         )
1692 |     if not paths:  # Handle empty list input explicitly
1693 |         logger.info("read_multiple_files called with empty list.", emoji_key="info")
1694 |         return {
1695 |             "files": [],
1696 |             "succeeded": 0,
1697 |             "failed": 0,
1698 |             "success": True,
1699 |             "message": "No paths provided to read.",
1700 |         }
1701 |     if not all(isinstance(p, str) for p in paths):
1702 |         invalid_path = next((p for p in paths if not isinstance(p, str)), None)
1703 |         raise ToolInputError(
1704 |             "All items in the 'paths' list must be strings.",
1705 |             param_name="paths",
1706 |             provided_value=f"List contains element of type {type(invalid_path)}",
1707 |         )
1708 | 
1709 |     # --- Inner Task Definition ---
1710 |     async def read_single_file_task(path: str) -> Dict[str, Any]:
1711 |         """Task to read and process a single file for read_multiple_files."""
1712 |         task_result: Dict[str, Any] = {"path": path, "success": False}  # Initialize result dict
1713 |         validated_path: Optional[str] = None
1714 |         try:
1715 |             validated_path = await validate_path(
1716 |                 path, check_exists=True, check_parent_writable=False
1717 |             )
1718 |             task_result["path"] = validated_path  # Update path in result if validation succeeds
1719 | 
1720 |             # check isfile (follows links)
1721 |             if not await aiofiles.os.path.isfile(validated_path):
1722 |                 # Check if link before failing
1723 |                 if await aiofiles.os.path.islink(validated_path):
1724 |                     task_result["error"] = "Path is a link, but does not point to a regular file"
1725 |                 else:
1726 |                     task_result["error"] = "Path exists but is not a regular file"
1727 |                 return task_result
1728 | 
1729 |             read_error = None
1730 |             file_size = -1
1731 | 
1732 |             # Try reading as text (UTF-8)
1733 |             try:
1734 |                 content_str = await read_file_content(validated_path)
1735 |                 task_result["content"] = content_str
1736 |             except ToolError as text_err:
1737 |                 if "not valid UTF-8" in str(text_err):
1738 |                     try:
1739 |                         binary_content = await read_binary_file_content(validated_path)
1740 |                         # For multi-read, just provide preview directly in result content
1741 |                         hex_preview_len = 200
1742 |                         hex_preview = binary_content[:hex_preview_len].hex(" ")
1743 |                         ellipsis = "..." if len(binary_content) > hex_preview_len else ""
1744 |                         preview_msg = f"<binary file detected, hex preview (first {min(hex_preview_len, len(binary_content))} bytes)>: {hex_preview}{ellipsis}"
1745 |                         task_result["content"] = preview_msg
1746 |                         task_result["is_binary"] = True
1747 |                     except ToolError as bin_err:
1748 |                         read_error = f"Error reading as binary: {str(bin_err)}"
1749 |                     except Exception as bin_e:
1750 |                         read_error = f"Unexpected error reading as binary: {str(bin_e)}"
1751 |                 else:
1752 |                     read_error = f"Error reading file: {str(text_err)}"
1753 |             except Exception as text_e:
1754 |                 read_error = f"Unexpected error reading file as text: {str(text_e)}"
1755 | 
1756 |             if read_error:
1757 |                 task_result["error"] = read_error
1758 |                 return task_result  # Mark as failed
1759 | 
1760 |             # Successfully read content (string or binary preview string)
1761 |             task_result["success"] = True
1762 | 
1763 |             # Try to get size (use stat with follow_symlinks=False for consistency)
1764 |             try:
1765 |                 file_size = (await aiofiles.os.stat(validated_path, follow_symlinks=False)).st_size
1766 |                 task_result["size"] = file_size
1767 |             except OSError as stat_err:
1768 |                 logger.warning(
1769 |                     f"Could not get size for {validated_path} in multi-read: {stat_err}",
1770 |                     emoji_key="warning",
1771 |                 )
1772 |                 task_result["warning"] = "Could not retrieve file size."
1773 | 
1774 |             return task_result
1775 | 
1776 |         except (ToolInputError, ToolError) as e:
1777 |             # Handle validation or specific tool errors for this path
1778 |             task_result["error"] = str(e)
1779 |             task_result["path"] = (
1780 |                 validated_path or path
1781 |             )  # Use original path if validation failed early
1782 |             return task_result
1783 |         except Exception as e:
1784 |             # Catch unexpected errors during processing of a single file
1785 |             logger.error(
1786 |                 f"Unexpected error reading single file {path} in multi-read: {e}",
1787 |                 exc_info=True,
1788 |                 emoji_key="error",
1789 |             )
1790 |             task_result["error"] = f"Unexpected error: {str(e)}"
1791 |             task_result["path"] = validated_path or path
1792 |             return task_result
1793 | 
1794 |     # --- End Inner Task Definition ---
1795 | 
1796 |     # Execute reads concurrently using asyncio.gather
1797 |     results = await asyncio.gather(
1798 |         *(read_single_file_task(p) for p in paths), return_exceptions=True
1799 |     )
1800 | 
1801 |     # Process results (handle potential exceptions returned by gather)
1802 |     processed_results: List[Dict[str, Any]] = []
1803 |     successful_count = 0
1804 |     failed_count = 0
1805 | 
1806 |     for i, res in enumerate(results):
1807 |         original_path = paths[i]  # Keep track of the requested path
1808 |         if isinstance(res, Exception):
1809 |             # An unexpected exception occurred *outside* the try/except in the task (unlikely)
1810 |             logger.error(
1811 |                 f"Gather returned exception for path '{original_path}': {res}",
1812 |                 exc_info=res,
1813 |                 emoji_key="error",
1814 |             )
1815 |             processed_results.append(
1816 |                 {
1817 |                     "path": original_path,
1818 |                     "error": f"Internal error during task execution: {res}",
1819 |                     "success": False,
1820 |                 }
1821 |             )
1822 |             failed_count += 1
1823 |         elif isinstance(res, dict):
1824 |             # Expected dictionary output from our task
1825 |             processed_results.append(res)
1826 |             if res.get("success"):
1827 |                 successful_count += 1
1828 |             else:
1829 |                 failed_count += 1
1830 |         else:
1831 |             # Should not happen if task always returns dict
1832 |             logger.error(
1833 |                 f"Unexpected result type from task for path '{original_path}': {type(res)}",
1834 |                 emoji_key="error",
1835 |             )
1836 |             processed_results.append(
1837 |                 {
1838 |                     "path": original_path,
1839 |                     "error": f"Internal error: Unexpected task result type {type(res)}",
1840 |                     "success": False,
1841 |                 }
1842 |             )
1843 |             failed_count += 1
1844 | 
1845 |     processing_time = time.monotonic() - start_time
1846 |     logger.success(
1847 |         f"Finished read_multiple_files: {successful_count} succeeded, {failed_count} failed",
1848 |         emoji_key="file",
1849 |         total_files=len(paths),
1850 |         time=processing_time,
1851 |     )
1852 | 
1853 |     # Return a dictionary structure that create_tool_response understands
1854 |     return {
1855 |         "files": processed_results,
1856 |         "succeeded": successful_count,
1857 |         "failed": failed_count,
1858 |         "success": True,  # The overall tool execution was successful (individual files might have failed)
1859 |     }
1860 | 
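# Illustrative call (hypothetical paths): per-file failures are reported
# inside the "files" entries rather than failing the whole call.
#
#     result = await read_multiple_files(["/allowed/a.txt", "/allowed/missing"])
#     # result["succeeded"] == 1, result["failed"] == 1, result["success"] is True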
1861 | 
1862 | @with_tool_metrics
1863 | @with_error_handling
1864 | async def get_unique_filepath(path: str) -> Dict[str, Any]:
1865 |     """
1866 |     Finds an available (non-existent) filepath based on the requested path.
1867 | 
1868 |     If the requested path already doesn't exist, it's returned directly.
1869 |     If it exists, it appends counters like '_1', '_2', etc., to the filename stem
1870 |     until an unused path is found within the same directory.
1871 | 
1872 |     Args:
1873 |         path: The desired file path.
1874 | 
1875 |     Returns:
1876 |         Dictionary containing the unique, validated, absolute path found.
1877 | 
1878 |     Raises:
1879 |         ToolInputError: If the base path is invalid or outside allowed directories.
1880 |         ToolError: If the counter limit is exceeded or filesystem errors occur.
1881 |     """
1882 |     start_time = time.monotonic()
1883 |     MAX_FILENAME_ATTEMPTS = 1000  # Safety limit
1884 | 
1885 |     try:
1886 |         # 1. Validate the *input* path first.
1887 |         #    check_exists=None because the *final* path might not exist.
1888 |         #    check_parent_writable=False - we only need read/stat access to check existence.
1889 |         #    The parent directory's writability should be checked by the *calling* function
1890 |         #    (like write_file or smart_download's directory creation) before this.
1891 |         validated_input_path = await validate_path(
1892 |             path, check_exists=None, check_parent_writable=False
1893 |         )
1894 |         logger.debug(
1895 |             f"get_unique_filepath: Validated input path resolves to {validated_input_path}"
1896 |         )
1897 | 
1898 |         # 2. Check if the initial validated path is already available.
1899 |         if not await aiofiles.os.path.exists(validated_input_path):
1900 |             logger.info(f"Initial path '{validated_input_path}' is available.", emoji_key="file")
1901 |             return {
1902 |                 "path": validated_input_path,
1903 |                 "attempts": 0,
1904 |                 "success": True,
1905 |                 "message": f"Path '{validated_input_path}' is already unique.",
1906 |             }
1907 | 
1908 |         # 3. Path exists, need to find a unique alternative.
1909 |         logger.debug(f"Path '{validated_input_path}' exists, finding unique alternative.")
1910 |         dirname = os.path.dirname(validated_input_path)
1911 |         original_filename = os.path.basename(validated_input_path)
1912 |         stem, suffix = os.path.splitext(original_filename)
1913 | 
1914 |         counter = 1
1915 |         while counter <= MAX_FILENAME_ATTEMPTS:
1916 |             # Construct new filename candidate
1917 |             candidate_filename = f"{stem}_{counter}{suffix}"
1918 |             candidate_path = os.path.join(dirname, candidate_filename)
1919 | 
1920 |             # Check existence asynchronously
1921 |             if not await aiofiles.os.path.exists(candidate_path):
1922 |                 processing_time = time.monotonic() - start_time
1923 |                 logger.success(
1924 |                     f"Found unique path '{candidate_path}' after {counter} attempts.",
1925 |                     emoji_key="file",
1926 |                     time=processing_time,
1927 |                 )
1928 |                 return {
1929 |                     "path": candidate_path,
1930 |                     "attempts": counter,
1931 |                     "success": True,
1932 |                     "message": f"Found unique path '{candidate_path}'.",
1933 |                 }
1934 | 
1935 |             counter += 1
1936 | 
1937 |         # If loop finishes, we exceeded the limit
1938 |         raise ToolError(
1939 |             f"Could not find a unique filename based on '{path}' after {MAX_FILENAME_ATTEMPTS} attempts.",
1940 |             context={"base_path": validated_input_path, "attempts": MAX_FILENAME_ATTEMPTS},
1941 |         )
1942 | 
1943 |     except OSError as e:
1944 |         # Catch errors during exists checks
1945 |         logger.error(
1946 |             f"Filesystem error finding unique path based on '{path}': {str(e)}",
1947 |             exc_info=True,
1948 |             emoji_key="error",
1949 |         )
1950 |         raise ToolError(
1951 |             f"Filesystem error checking path existence: {str(e)}",
1952 |             context={"path": path, "errno": e.errno},
1953 |         ) from e
1954 |     except (
1955 |         ToolInputError,
1956 |         ToolError,
1957 |     ):  # Re-raise specific errors from validate_path or the counter limit
1958 |         raise
1959 |     except Exception as e:
1960 |         # Catch unexpected errors
1961 |         logger.error(
1962 |             f"Unexpected error finding unique path for {path}: {e}",
1963 |             exc_info=True,
1964 |             emoji_key="error",
1965 |         )
1966 |         raise ToolError(
1967 |             f"An unexpected error occurred finding a unique path: {str(e)}", context={"path": path}
1968 |         ) from e
1969 | 
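# Illustrative behaviour (hypothetical paths): counters are appended to the
# filename stem until an unused path is found.
#
#     await get_unique_filepath("/allowed/out/report.txt")
#     # if report.txt exists but report_1.txt does not:
#     # -> {"path": "/allowed/out/report_1.txt", "attempts": 1, "success": True, ...}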
1970 | 
1971 | @with_tool_metrics
1972 | @with_error_handling
1973 | async def write_file(path: str, content: Union[str, bytes]) -> Dict[str, Any]:
1974 |     """Write content to a file asynchronously (UTF-8 or binary), creating/overwriting.
1975 | 
1976 |     Ensures the path is valid, within allowed directories, and that the parent
1977 |     directory exists and is writable. Fails if the target path exists and is a directory.
1978 | 
1979 |     Args:
1980 |         path: Path to the file to write.
1981 |         content: Content to write (string for text UTF-8, bytes for binary).
1982 | 
1983 |     Returns:
1984 |         A dictionary confirming success and providing file details (path, size).
1985 |     """
1986 |     start_time = time.monotonic()
1987 | 
1988 |     # Validate content type explicitly at the start.
1989 |     if not isinstance(content, (str, bytes)):
1990 |         raise ToolInputError(
1991 |             "Content to write must be a string or bytes.",
1992 |             param_name="content",
1993 |             provided_value=type(content),
1994 |         )
1995 | 
1996 |     # Validate path: doesn't need to exist necessarily (check_exists=None), but parent must exist and be writable.
1997 |     validated_path = await validate_path(path, check_exists=None, check_parent_writable=True)
1998 | 
1999 |     # Check if the path exists and is a directory (we shouldn't overwrite a dir with a file).
2000 |     # Use exists instead of lexists for this check
2001 |     if await aiofiles.os.path.exists(validated_path) and await aiofiles.os.path.isdir(
2002 |         validated_path
2003 |     ):
2004 |         raise ToolInputError(
2005 |             f"Cannot write file: Path '{path}' (resolved to '{validated_path}') exists and is a directory.",
2006 |             param_name="path",
2007 |             provided_value=path,
2008 |         )
2009 | 
2010 |     # write_file_content handles actual writing and parent dir creation
2011 |     await write_file_content(validated_path, content)  # Can raise ToolError
2012 | 
2013 |     # Verify write success by getting status and size afterwards
2014 |     file_size = -1
2015 |     try:
2016 |         file_size = (await aiofiles.os.stat(validated_path, follow_symlinks=False)).st_size
2017 |     except OSError as e:
2018 |         # If stat fails after write seemed to succeed, something is wrong.
2019 |         logger.error(
2020 |             f"File write appeared successful for {validated_path}, but failed to get status afterwards: {e}",
2021 |             emoji_key="error",
2022 |         )
2023 |         raise ToolError(
2024 |             f"File written but failed to verify status afterwards: {str(e)}",
2025 |             context={"path": validated_path},
2026 |         ) from e
2027 | 
2028 |     processing_time = time.monotonic() - start_time
2029 |     logger.success(
2030 |         f"Successfully wrote file: {path}", emoji_key="file", size=file_size, time=processing_time
2031 |     )
2032 | 
2033 |     # Return a structured success response.
2034 |     return {
2035 |         "message": f"Successfully wrote {file_size} bytes to '{validated_path}'.",
2036 |         "path": validated_path,
2037 |         "size": file_size,
2038 |         "success": True,
2039 |     }
2040 | 
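# Illustrative calls (hypothetical paths): strings are written as UTF-8 text,
# bytes verbatim; the target must not be an existing directory.
#
#     await write_file("/allowed/notes.txt", "hello")
#     await write_file("/allowed/logo.png", png_bytes)  # png_bytes: bytes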
2041 | 
2042 | @with_tool_metrics
2043 | @with_error_handling
2044 | async def edit_file(
2045 |     path: str, edits: List[Dict[str, str]], dry_run: bool = False
2046 | ) -> Dict[str, Any]:
2047 |     """Edit a text file asynchronously by applying string replacements (UTF-8).
2048 | 
2049 |     Validates path and edits, reads the file, applies changes (with fallbacks for
2050 |     whitespace differences), generates a diff, and optionally writes back.
2051 | 
2052 |     Args:
2053 |         path: Path to the file to edit. Must be an existing text file.
2054 |         edits: List of edit operations. Each dict needs 'oldText' (str)
2055 |                and 'newText' (str).
2056 |         dry_run: If True, calculates changes and diff but does not save them.
2057 | 
2058 |     Returns:
2059 |         Dictionary containing the generated diff, success status, path,
2060 |         and whether it was a dry run.
2061 |     """
2062 |     start_time = time.monotonic()
2063 | 
2064 |     # Validate path must exist and be a file (check_exists=True).
2065 |     validated_path = await validate_path(path, check_exists=True, check_parent_writable=False)
2066 |     # Check if it's a regular file (follows links)
2067 |     if not await aiofiles.os.path.isfile(validated_path):
2068 |         if await aiofiles.os.path.islink(validated_path):
2069 |             raise ToolInputError(
2070 |                 f"Path '{path}' (resolved to link '{validated_path}') points to something that is not a regular file.",
2071 |                 param_name="path",
2072 |                 provided_value=path,
2073 |             )
2074 |         else:
2075 |             raise ToolInputError(
2076 |                 f"Path '{path}' (resolved to '{validated_path}') is not a regular file.",
2077 |                 param_name="path",
2078 |                 provided_value=path,
2079 |             )
2080 | 
2081 |     # Validate edits structure
2082 |     if not isinstance(edits, list):
2083 |         raise ToolInputError(
2084 |             "Edits parameter must be a list.", param_name="edits", provided_value=type(edits)
2085 |         )
2086 |     if not edits:
2087 |         # Handle empty edits list as a no-op success.
2088 |         logger.info(
2089 |             f"edit_file called with empty edits list for {path}. No changes will be made.",
2090 |             emoji_key="info",
2091 |         )
2092 |         return {
2093 |             "path": validated_path,
2094 |             "diff": "No edits provided.",
2095 |             "success": True,
2096 |             "dry_run": dry_run,
2097 |             "message": "No edits were specified.",
2098 |         }
2099 |     # Deeper validation of each edit dict happens within apply_file_edits
2100 | 
2101 |     # NOTE: Modification protection is currently NOT applied here.
2102 |     # This operates on a single file. Bulk editing would require a different tool
2103 |     # where protection heuristics might be applied.
2104 | 
2105 |     # apply_file_edits handles reading, core editing logic, diffing, conditional writing.
2106 |     # Raises ToolInputError/ToolError on failure.
2107 |     diff, new_content = await apply_file_edits(validated_path, edits, dry_run)
2108 | 
2109 |     processing_time = time.monotonic() - start_time
2110 | 
2111 |     action = "Previewed edits for" if dry_run else "Applied edits to"
2112 |     logger.success(
2113 |         f"Successfully {action} file: {path}",
2114 |         emoji_key="file",
2115 |         num_edits=len(edits),
2116 |         dry_run=dry_run,
2117 |         time=processing_time,
2118 |         changes_made=(diff != ""),  # Log if actual changes resulted
2119 |     )
2120 | 
2121 |     # Provide clearer messages based on diff content and dry_run status.
2122 |     if diff:
2123 |         diff_message = diff
2124 |         status_message = f"Successfully {'previewed' if dry_run else 'applied'} {len(edits)} edits."
2125 |     else:  # Edits provided, but resulted in no change to content
2126 |         diff_message = "No changes detected after applying edits."
2127 |         status_message = f"{len(edits)} edits provided, but resulted in no content changes."
2128 | 
2129 |     return {
2130 |         "path": validated_path,
2131 |         "diff": diff_message,
2132 |         "success": True,
2133 |         "dry_run": dry_run,
2134 |         "message": status_message,
2135 |         # "new_content": new_content # Optional: return new content, especially for dry runs? Can be large.
2136 |     }
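
# Usage sketch (illustrative; the path and edit strings below are assumptions):
#
#     result = await edit_file(
#         path="/data/allowed/app.py",
#         edits=[{"oldText": "DEBUG = True", "newText": "DEBUG = False"}],
#         dry_run=True,  # preview only; nothing is written
#     )
#     print(result["diff"])  # unified diff of the proposed change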
2137 | 
2138 | 
2139 | @with_tool_metrics
2140 | @with_error_handling
2141 | async def create_directory(path: str) -> Dict[str, Any]:
2142 |     """Create a directory asynchronously, including parent directories (like 'mkdir -p').
2143 | 
2144 |     Validates the path is allowed and parent is writable. Idempotent: If the directory
2145 |     already exists, it succeeds without error. Fails if the path exists but is a file.
2146 | 
2147 |     Args:
2148 |         path: Path to the directory to create.
2149 | 
2150 |     Returns:
2151 |         Dictionary confirming success, path, and whether it was newly created.
2152 |     """
2153 |     start_time = time.monotonic()
2154 | 
2155 |     # Validate path: parent must exist and be writable; check_exists=None means the path itself may or may not exist (both cases handled below).
2156 |     validated_path = await validate_path(path, check_exists=None, check_parent_writable=True)
2157 | 
2158 |     created = False
2159 |     message = ""
2160 |     try:
2161 |         # Check existence and type before creating (exists follows symlinks)
2162 |         if await aiofiles.os.path.exists(validated_path):
2163 |             if await aiofiles.os.path.isdir(validated_path):
2164 |                 # Directory already exists - idempotent success.
2165 |                 logger.info(
2166 |                     f"Directory already exists: {path} (resolved: {validated_path})",
2167 |                     emoji_key="directory",
2168 |                 )
2169 |                 message = f"Directory '{validated_path}' already exists."
2170 |             else:
2171 |                 # Path exists but is not a directory (e.g., a file or symlink)
2172 |                 raise ToolInputError(
2173 |                     f"Cannot create directory: Path '{path}' (resolved to '{validated_path}') already exists but is not a directory.",
2174 |                     param_name="path",
2175 |                     provided_value=path,
2176 |                 )
2177 |         else:
2178 |             # Path does not exist, proceed with creation using async makedirs.
2179 |             await aiofiles.os.makedirs(validated_path, exist_ok=True)
2180 |             created = True
2181 |             logger.success(
2182 |                 f"Successfully created directory: {path} (resolved: {validated_path})",
2183 |                 emoji_key="directory",
2184 |             )
2185 |             message = f"Successfully created directory '{validated_path}'."
2186 | 
2187 |     except OSError as e:
2188 |         # Catch errors during the exists/isdir checks or makedirs call
2189 |         logger.error(
2190 |             f"Error creating directory '{path}' (resolved: {validated_path}): {e}",
2191 |             exc_info=True,
2192 |             emoji_key="error",
2193 |         )
2194 |         raise ToolError(
2195 |             f"Error creating directory '{path}': {str(e)}",
2196 |             context={"path": validated_path, "errno": e.errno},
2197 |         ) from e
2198 |     except Exception as e:
2199 |         # Catch unexpected errors
2200 |         logger.error(
2201 |             f"Unexpected error creating directory {path}: {e}", exc_info=True, emoji_key="error"
2202 |         )
2203 |         raise ToolError(
2204 |             f"An unexpected error occurred creating directory: {str(e)}",
2205 |             context={"path": validated_path},
2206 |         ) from e
2207 | 
2208 |     processing_time = time.monotonic() - start_time
2209 |     action = "Created" if created else "Confirmed existing"  # log accurately for both outcomes
2210 |     logger.success(
2211 |         f"{action} directory: {path} (resolved: {validated_path})",
2212 |         emoji_key="directory", time=processing_time,
2213 |     )
2214 | 
2215 |     return {"path": validated_path, "created": created, "success": True, "message": message}
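
# Usage sketch (illustrative; the nested path below is an assumption):
#
#     result = await create_directory("/data/allowed/project/logs/2024")
#     # Idempotent like `mkdir -p`: result["created"] is True on first call,
#     # False if the directory already existed.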
2216 | 
2217 | 
2218 | @with_tool_metrics
2219 | @with_error_handling
2220 | async def list_directory(path: str) -> Dict[str, Any]:
2221 |     """List files and subdirectories within a given directory asynchronously.
2222 | 
2223 |     Validates the path exists, is a directory, and is allowed.
2224 |     Provides basic info (name, type, size for files, link target for symlinks) for each entry.
2225 | 
2226 |     Args:
2227 |         path: Path to the directory to list.
2228 | 
2229 |     Returns:
2230 |         Dictionary containing the path listed and a list of entries.
2231 |     """
2232 |     start_time = time.monotonic()
2233 | 
2234 |     # Validate path exists and is a directory (check_exists=True).
2235 |     validated_path = await validate_path(path, check_exists=True, check_parent_writable=False)
2236 |     if not await aiofiles.os.path.isdir(validated_path):
2237 |         # If it's a link, check if it points to a directory
2238 |         if await aiofiles.os.path.islink(validated_path):
2239 |             try:
2240 |                 target_is_dir = await aiofiles.os.path.isdir(
2241 |                     await aiofiles.os.path.realpath(validated_path)
2242 |                 )
2243 |                 if not target_is_dir:
2244 |                     raise ToolInputError(
2245 |                         f"Path '{path}' (resolved to link '{validated_path}') points to something that is not a directory.",
2246 |                         param_name="path",
2247 |                         provided_value=path,
2248 |                     )
2249 |                 # If target is dir, proceed using validated_path (which resolves the link)
2250 |             except OSError as e:
2251 |                 raise ToolError(
2252 |                     f"Error resolving or checking link target for directory listing: {e}",
2253 |                     context={"path": validated_path},
2254 |                 ) from e
2255 |         else:
2256 |             raise ToolInputError(
2257 |                 f"Path '{path}' (resolved to '{validated_path}') is not a directory.",
2258 |                 param_name="path",
2259 |                 provided_value=path,
2260 |             )
2261 | 
2262 |     entries: List[Dict[str, Any]] = []
2263 |     scan_errors: List[str] = []
2264 |     try:
2265 |         # Use await with scandir rather than async iteration
2266 |         entry_list = await aiofiles.os.scandir(validated_path)
2267 |         for entry in entry_list:
2268 |             entry_info: Dict[str, Any] = {"name": entry.name}
2269 |             try:
2270 |                 # DirEntry methods are synchronous but cheap (scandir caches type info); check link status explicitly
2271 |                 is_link = entry.is_symlink()  # Checks if entry *itself* is a link
2272 |                 entry_info["is_symlink"] = is_link
2273 | 
2274 |                 # Let's be explicit using lstat results for type determination
2275 |                 try:
2276 |                     stat_res = entry.stat(follow_symlinks=False)  # Use lstat via entry
2277 |                     mode = stat_res.st_mode
2278 |                     l_is_dir = os.path.stat.S_ISDIR(mode)
2279 |                     l_is_file = os.path.stat.S_ISREG(mode)
2280 |                     l_is_link = os.path.stat.S_ISLNK(mode)  # Should match entry.is_symlink() result
2281 | 
2282 |                     if l_is_dir:
2283 |                         entry_info["type"] = "directory"
2284 |                     elif l_is_file:
2285 |                         entry_info["type"] = "file"
2286 |                         entry_info["size"] = stat_res.st_size  # Size of file itself
2287 |                     elif l_is_link:
2288 |                         entry_info["type"] = "symlink"
2289 |                         entry_info["size"] = stat_res.st_size  # Size of link itself
2290 |                         # Optionally try to resolve link target for display
2291 |                         try:
2292 |                             target = await aiofiles.os.readlink(entry.path)
2293 |                             entry_info["symlink_target"] = target
2294 |                         except OSError as link_err:
2295 |                             entry_info["error"] = f"Could not read link target: {link_err}"
2296 |                     else:
2297 |                         entry_info["type"] = "other"  # E.g., socket, fifo
2298 |                         entry_info["size"] = stat_res.st_size
2299 | 
2300 |                 except OSError as stat_err:
2301 |                     logger.warning(
2302 |                         f"Could not lstat entry {entry.path} in list_directory: {stat_err}",
2303 |                         emoji_key="warning",
2304 |                     )
2305 |                     entry_info["type"] = "error"
2306 |                     entry_info["error"] = f"Could not get info: {stat_err}"
2307 | 
2308 |                 entries.append(entry_info)
2309 | 
2310 |             except OSError as entry_err:
2311 |                 # Error processing a specific entry (e.g., permission denied on is_dir/is_file/is_symlink)
2312 |                 logger.warning(
2313 |                     f"Could not process directory entry '{entry.name}' in {validated_path}: {entry_err}",
2314 |                     emoji_key="warning",
2315 |                 )
2316 |                 error_message = f"Error processing entry '{entry.name}': {entry_err}"
2317 |                 scan_errors.append(error_message)
2318 |                 # Add error entry to the list for visibility
2319 |                 entries.append({"name": entry.name, "type": "error", "error": str(entry_err)})
2320 | 
2321 |         # Sort entries: directories, files, symlinks, others, errors; then alphabetically.
2322 |         entries.sort(
2323 |             key=lambda e: (
2324 |                 0
2325 |                 if e.get("type") == "directory"
2326 |                 else 1
2327 |                 if e.get("type") == "file"
2328 |                 else 2
2329 |                 if e.get("type") == "symlink"
2330 |                 else 3
2331 |                 if e.get("type") == "other"
2332 |                 else 4,  # Errors last
2333 |                 e.get("name", "").lower(),  # Case-insensitive sort by name
2334 |             )
2335 |         )
2336 | 
2337 |     except OSError as e:
2338 |         # Error during the initial scandir call (e.g., permission denied on the directory itself)
2339 |         raise ToolError(
2340 |             f"Error listing directory '{path}': {str(e)}",
2341 |             context={"path": validated_path, "errno": e.errno},
2342 |         ) from e
2343 |     except Exception as e:
2344 |         # Catch unexpected errors during iteration
2345 |         logger.error(
2346 |             f"Unexpected error listing directory {path}: {e}", exc_info=True, emoji_key="error"
2347 |         )
2348 |         raise ToolError(
2349 |             f"An unexpected error occurred listing directory: {str(e)}",
2350 |             context={"path": validated_path},
2351 |         ) from e
2352 | 
2353 |     processing_time = time.monotonic() - start_time
2354 |     logger.success(
2355 |         f"Listed directory: {path} ({len(entries)} entries found, {len(scan_errors)} errors)",
2356 |         emoji_key="directory",
2357 |         time=processing_time,
2358 |     )
2359 | 
2360 |     # Structure result clearly, including warnings if errors occurred on entries.
2361 |     result = {
2362 |         "path": validated_path,
2363 |         "entries": entries,
2364 |         "success": True,
2365 |         "message": f"Found {len(entries)} entries in '{validated_path}'.",
2366 |     }
2367 |     if scan_errors:
2368 |         warning_summary = f"Encountered {len(scan_errors)} errors processing directory entries."
2369 |         result["warnings"] = [warning_summary]
2370 |         # Optionally include first few errors: result["error_details"] = scan_errors[:5]
2371 | 
2372 |     return result
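
# Usage sketch (illustrative; the path below is an assumption):
#
#     result = await list_directory("/data/allowed/project")
#     for entry in result["entries"]:
#         print(entry["type"], entry["name"], entry.get("size", ""))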
2373 | 
2374 | 
2375 | @with_tool_metrics
2376 | @with_error_handling
2377 | async def directory_tree(
2378 |     path: str, max_depth: int = 3, include_size: bool = False
2379 | ) -> Dict[str, Any]:
2380 |     """Get a recursive tree view of a directory structure asynchronously.
2381 | 
2382 |     Args:
2383 |         path: Path to the root directory for the tree view.
2384 |         max_depth: Maximum recursion depth (-1 for effectively unlimited, capped internally).
2385 |         include_size: If True, include file sizes in the tree (requires extra stat calls).
2386 | 
2387 |     Returns:
2388 |         Dictionary containing the path and the hierarchical tree structure.
2389 |     """
2390 |     start_time = time.monotonic()
2391 |     # Internal safety cap for 'unlimited' depth to prevent runaway recursion.
2392 |     INTERNAL_DEPTH_CAP = 15
2393 | 
2394 |     # Validate path exists and is a directory (check_exists=True)
2395 |     validated_path = await validate_path(path, check_exists=True)
2396 |     if not await aiofiles.os.path.isdir(validated_path):
2397 |         # Check if it's a link to a directory
2398 |         if await aiofiles.os.path.islink(validated_path):
2399 |             try:
2400 |                 target_is_dir = await aiofiles.os.path.isdir(
2401 |                     await aiofiles.os.path.realpath(validated_path)
2402 |                 )
2403 |                 if not target_is_dir:
2404 |                     raise ToolInputError(
2405 |                         f"Path '{path}' (resolved to link '{validated_path}') points to something that is not a directory.",
2406 |                         param_name="path",
2407 |                         provided_value=path,
2408 |                     )
2409 |                 # proceed with validated_path which resolves link
2410 |             except OSError as e:
2411 |                 raise ToolError(
2412 |                     f"Error resolving or checking link target for directory tree: {e}",
2413 |                     context={"path": validated_path},
2414 |                 ) from e
2415 |         else:
2416 |             raise ToolInputError(
2417 |                 f"Path '{path}' (resolved to '{validated_path}') is not a directory.",
2418 |                 param_name="path",
2419 |                 provided_value=path,
2420 |             )
2421 | 
2422 |     # Validate max_depth input
2423 |     if not isinstance(max_depth, int):
2424 |         raise ToolInputError(
2425 |             "max_depth must be an integer.", param_name="max_depth", provided_value=max_depth
2426 |         )
2427 | 
2428 |     # Apply internal depth cap if necessary
2429 |     actual_max_depth = max_depth
2430 |     if max_depth < 0 or max_depth > INTERNAL_DEPTH_CAP:
2431 |         if max_depth >= 0:  # Only warn if user specified a large number, not for -1
2432 |             logger.warning(
2433 |                 f"Requested max_depth {max_depth} exceeds internal cap {INTERNAL_DEPTH_CAP}. Limiting depth.",
2434 |                 emoji_key="warning",
2435 |             )
2436 |         actual_max_depth = INTERNAL_DEPTH_CAP
2437 | 
2438 |     # --- Recursive Helper ---
2439 |     async def build_tree_recursive(current_path: str, current_depth: int) -> List[Dict[str, Any]]:
2440 |         """Recursively builds the directory tree structure."""
2441 |         if current_depth > actual_max_depth:
2442 |             # Return specific marker if depth limit hit, not just empty list.
2443 |             return [{"name": f"... (Max depth {actual_max_depth} reached)", "type": "info"}]
2444 | 
2445 |         children_nodes: List[Dict[str, Any]] = []
2446 |         try:
2447 |             # Await scandir and then iterate over the returned entries
2448 |             entries = await aiofiles.os.scandir(current_path)
2449 |             for entry in entries:
2450 |                 entry_data: Dict[str, Any] = {"name": entry.name}
2451 |                 try:
2452 |                     # Use lstat via entry to avoid following links unexpectedly
2453 |                     stat_res = entry.stat(follow_symlinks=False)
2454 |                     mode = stat_res.st_mode
2455 |                     l_is_dir = os.path.stat.S_ISDIR(mode)
2456 |                     l_is_file = os.path.stat.S_ISREG(mode)
2457 |                     l_is_link = os.path.stat.S_ISLNK(mode)
2458 | 
2459 |                     if l_is_dir:
2460 |                         entry_data["type"] = "directory"
2461 |                         # Recurse asynchronously into subdirectory
2462 |                         entry_data["children"] = await build_tree_recursive(
2463 |                             entry.path, current_depth + 1
2464 |                         )
2465 |                     elif l_is_file:
2466 |                         entry_data["type"] = "file"
2467 |                         if include_size:
2468 |                             entry_data["size"] = stat_res.st_size
2469 |                     elif l_is_link:
2470 |                         entry_data["type"] = "symlink"
2471 |                         if include_size:
2472 |                             entry_data["size"] = stat_res.st_size  # Size of link itself
2473 |                         # Optionally resolve link target? Can be noisy. Let's skip for tree view simplicity.
2474 |                         # try: entry_data["target"] = await aiofiles.os.readlink(entry.path)
2475 |                         # except OSError: entry_data["target"] = "<Error reading link>"
2476 |                     else:
2477 |                         entry_data["type"] = "other"
2478 |                         if include_size:
2479 |                             entry_data["size"] = stat_res.st_size
2480 | 
2481 |                     children_nodes.append(entry_data)
2482 | 
2483 |                 except OSError as entry_err:
2484 |                     # Error processing one entry (e.g., permissions on stat)
2485 |                     logger.warning(
2486 |                         f"Could not process entry {entry.path} in tree: {entry_err}",
2487 |                         emoji_key="warning",
2488 |                     )
2489 |                     children_nodes.append(
2490 |                         {"name": entry.name, "type": "error", "error": str(entry_err)}
2491 |                     )
2492 | 
2493 |             # Sort entries at the current level: directories, files, symlinks, then others/errors; name as tiebreaker
2494 |             children_nodes.sort(
2495 |                 key=lambda e: (
2496 |                     0
2497 |                     if e.get("type") == "directory"
2498 |                     else 1
2499 |                     if e.get("type") == "file"
2500 |                     else 2
2501 |                     if e.get("type") == "symlink"
2502 |                     else 3,
2503 |                     e.get("name", "").lower(),  # Case-insensitive sort
2504 |                 )
2505 |             )
2506 |             return children_nodes
2507 | 
2508 |         except OSError as e:
2509 |             # Error scanning the directory itself (e.g., permissions)
2510 |             logger.error(
2511 |                 f"Error scanning directory {current_path} for tree: {str(e)}", emoji_key="error"
2512 |             )
2513 |             # Return error indicator instead of raising, allows partial trees if root is accessible
2514 |             return [{"name": f"... (Error scanning this directory: {e})", "type": "error"}]
2515 |         except Exception as e:
2516 |             # Unexpected error during scan
2517 |             logger.error(
2518 |                 f"Unexpected error scanning directory {current_path} for tree: {e}",
2519 |                 exc_info=True,
2520 |                 emoji_key="error",
2521 |             )
2522 |             return [{"name": f"... (Unexpected error scanning: {e})", "type": "error"}]
2523 | 
2524 |     # --- End Recursive Helper ---
2525 | 
2526 |     # Generate tree starting from the validated path
2527 |     tree_structure = await build_tree_recursive(validated_path, 0)
2528 | 
2529 |     processing_time = time.monotonic() - start_time
2530 |     logger.success(
2531 |         f"Generated directory tree for: {path}",
2532 |         emoji_key="directory",
2533 |         max_depth=actual_max_depth,  # Log the effective depth
2534 |         requested_depth=max_depth,
2535 |         include_size=include_size,
2536 |         time=processing_time,
2537 |     )
2538 | 
2539 |     # Structure result clearly
2540 |     return {
2541 |         "path": validated_path,
2542 |         "max_depth_reached": actual_max_depth,
2543 |         "tree": tree_structure,
2544 |         "success": True,
2545 |         "message": f"Generated directory tree for '{validated_path}' up to depth {actual_max_depth}.",
2546 |     }
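
# Usage sketch (illustrative; the path below is an assumption):
#
#     result = await directory_tree("/data/allowed/project", max_depth=2, include_size=True)
#     # result["tree"] is a nested list of {"name", "type", "size"?, "children"?} nodes;
#     # depth is silently capped at the internal limit of 15.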
2547 | 
2548 | 
2549 | @with_tool_metrics
2550 | @with_error_handling
2551 | async def move_file(
2552 |     source: str,
2553 |     destination: str,
2554 |     overwrite: bool = False,  # Default to NOT overwrite for safety
2555 | ) -> Dict[str, Any]:
2556 |     """Move or rename a file or directory asynchronously.
2557 | 
2558 |     Ensures both source and destination paths are within allowed directories.
2559 |     Checks for existence and potential conflicts before moving.
2560 | 
2561 |     Args:
2562 |         source: Path to the file or directory to move. Must exist.
2563 |         destination: New path for the file or directory. Parent must exist and be writable.
2564 |         overwrite: If True, allows overwriting an existing file or *empty* directory
2565 |                    at the destination. USE WITH CAUTION. Defaults False.
2566 | 
2567 |     Returns:
2568 |         Dictionary confirming the move operation, including source and destination paths.
2569 |     """
2570 |     start_time = time.monotonic()
2571 | 
2572 |     # Validate source path (must exist, check_exists=True)
2573 |     validated_source = await validate_path(source, check_exists=True)
2574 | 
2575 |     # Validate destination path (parent must exist and be writable, path itself may or may not exist check_exists=None)
2576 |     # Check parent writability *before* checking overwrite logic.
2577 |     validated_dest = await validate_path(destination, check_exists=None, check_parent_writable=True)
2578 | 
2579 |     # Validate overwrite flag type
2580 |     if not isinstance(overwrite, bool):
2581 |         raise ToolInputError(
2582 |             "overwrite parameter must be a boolean (true/false).",
2583 |             param_name="overwrite",
2584 |             provided_value=type(overwrite),
2585 |         )
2586 | 
2587 |     # NOTE: Modification protection is currently NOT applied here.
2588 |     # If overwriting a directory, current logic only allows overwriting an *empty* one via rmdir.
2589 |     # Overwriting a file is a single-file modification.
2590 |     # A future enhancement could add protection heuristics here if overwriting non-empty dirs was allowed.
2591 | 
2592 |     try:
2593 |         # Check if destination already exists (exists follows symlinks)
2594 |         dest_exists = await aiofiles.os.path.exists(validated_dest)
2595 |         dest_is_dir = False
2596 |         if dest_exists:
2597 |             dest_is_dir = await aiofiles.os.path.isdir(
2598 |                 validated_dest
2599 |             )  # Check type (follows links if dest is link)
2600 | 
2601 |         if dest_exists:
2602 |             if overwrite:
2603 |                 # Overwrite requested. Log prominently.
2604 |                 logger.warning(
2605 |                     f"Overwrite flag is True. Attempting to replace existing path '{validated_dest}' with '{validated_source}'.",
2606 |                     emoji_key="warning",
2607 |                 )
2608 | 
2609 |                 # Check if source and destination types are compatible for overwrite (e.g., cannot replace dir with file easily)
2610 |                 # Use stat with follow_symlinks=False for source type check as well.
2611 |                 source_stat = await aiofiles.os.stat(validated_source, follow_symlinks=False)
2612 |                 is_source_dir = os.path.stat.S_ISDIR(source_stat.st_mode)
2613 | 
2614 |                 # Simple check: Prevent overwriting dir with file or vice-versa.
2615 |                 # Note: aiofiles.os.rename might handle some cases, but explicit check is safer.
2616 |                 # This logic assumes we'd remove the destination first.
2617 |                 if is_source_dir != dest_is_dir:
2618 |                     # Allow replacing link with dir/file, or dir/file with link? Be cautious.
2619 |                     # Let's prevent dir/file mismatch for simplicity. Overwriting links is tricky.
2620 |                     if not await aiofiles.os.path.islink(
2621 |                         validated_dest
2622 |                     ):  # Only enforce if dest is not a link
2623 |                         raise ToolInputError(
2624 |                             f"Cannot overwrite: Source is a {'directory' if is_source_dir else 'file/link'} but destination ('{validated_dest}') exists and is a {'directory' if dest_is_dir else 'file/link'}.",
2625 |                             param_name="destination",
2626 |                             provided_value=destination,
2627 |                         )
2628 | 
2629 |                 # Attempt to remove the existing destination. This is the dangerous part.
2630 |                 try:
2631 |                     if dest_is_dir:
2632 |                         # Use async rmdir - fails if directory is not empty!
2633 |                         # This prevents accidental recursive deletion via move+overwrite.
2634 |                         await aiofiles.os.rmdir(validated_dest)
2635 |                         logger.info(
2636 |                             f"Removed existing empty directory destination '{validated_dest}' for overwrite.",
2637 |                             emoji_key="action",
2638 |                         )
2639 |                     else:
2640 |                         # Removes a file or symlink
2641 |                         await aiofiles.os.remove(validated_dest)
2642 |                         logger.info(
2643 |                             f"Removed existing file/link destination '{validated_dest}' for overwrite.",
2644 |                             emoji_key="action",
2645 |                         )
2646 |                 except OSError as remove_err:
2647 |                     # Handle cases like directory not empty, permission error during removal
2648 |                     raise ToolError(
2649 |                         f"Failed to remove existing destination '{validated_dest}' for overwrite: {remove_err}. Check permissions or if directory is empty.",
2650 |                         context={
2651 |                             "source": validated_source,
2652 |                             "destination": validated_dest,
2653 |                             "errno": remove_err.errno,
2654 |                         },
2655 |                     ) from remove_err
2656 | 
2657 |             else:  # Destination exists, and overwrite is False (default)
2658 |                 raise ToolInputError(
2659 |                     f"Cannot move: Destination path '{destination}' (resolved to '{validated_dest}') already exists. Use overwrite=True to replace.",
2660 |                     param_name="destination",
2661 |                     provided_value=destination,
2662 |                 )
2663 | 
2664 |         # Ensure source and destination are not the same path after normalization/resolution.
2665 |         # os.path.samefile would be more robust (it compares inodes) but requires both paths
2666 |         # to exist, and the destination may have just been removed for overwrite; compare the
2667 |         # resolved path strings instead (aiofiles.os.rename performs its own identity checks).
2668 |         if validated_source == validated_dest:
2669 |             logger.info(
2670 |                 f"Source and destination paths resolve to the same location ('{validated_source}'). No move needed.",
2671 |                 emoji_key="info",
2672 |             )
2673 |             return {
2674 |                 "source": validated_source,
2675 |                 "destination": validated_dest,
2676 |                 "success": True,
2677 |                 "message": "Source and destination are the same path. No operation performed.",
2678 |             }
2689 | 
2690 |         # Attempt the move/rename operation asynchronously
2691 |         await aiofiles.os.rename(validated_source, validated_dest)
2692 | 
2693 |     except OSError as e:
2694 |         # Catch errors from exists, rename, remove, rmdir etc.
2695 |         logger.error(
2696 |             f"Error moving/renaming from '{source}' to '{destination}': {str(e)}",
2697 |             exc_info=True,
2698 |             emoji_key="error",
2699 |         )
2700 |         raise ToolError(
2701 |             f"Error moving '{source}' to '{destination}': {str(e)}",
2702 |             context={"source": validated_source, "destination": validated_dest, "errno": e.errno},
2703 |         ) from e
2704 |     except (ToolInputError, ToolError, ProtectionTriggeredError):  # Re-raise our specific errors
2705 |         raise
2706 |     except Exception as e:
2707 |         # Catch unexpected errors
2708 |         logger.error(
2709 |             f"Unexpected error moving {source} to {destination}: {e}",
2710 |             exc_info=True,
2711 |             emoji_key="error",
2712 |         )
2713 |         raise ToolError(
2714 |             f"An unexpected error occurred during move: {str(e)}",
2715 |             context={"source": validated_source, "destination": validated_dest},
2716 |         ) from e
2717 | 
2718 |     processing_time = time.monotonic() - start_time
2719 |     logger.success(
2720 |         f"Moved '{source}' to '{destination}'",
2721 |         emoji_key="file",
2722 |         time=processing_time,
2723 |         overwrite_used=overwrite,
2724 |     )
2725 | 
2726 |     return {
2727 |         "source": validated_source,  # Report original source for clarity
2728 |         "destination": validated_dest,
2729 |         "success": True,
2730 |         "message": f"Successfully moved '{validated_source}' to '{validated_dest}'.",
2731 |     }
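
# Usage sketch (illustrative; both paths below are assumptions):
#
#     result = await move_file(
#         source="/data/allowed/draft.txt",
#         destination="/data/allowed/final.txt",
#         overwrite=False,  # default: fail rather than replace an existing destination
#     )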
2732 | 
2733 | 
2734 | @with_tool_metrics
2735 | @with_error_handling
2736 | async def delete_path(path: str) -> Dict[str, Any]:
2737 |     """Delete a file or an entire directory tree asynchronously.
2738 | 
2739 |     Validates the path exists and is allowed. If the path is a directory,
2740 |     applies deletion protection heuristics (if enabled) based on the directory's
2741 |     contents before proceeding with recursive deletion.
2742 | 
2743 |     Args:
2744 |         path: Path to the file or directory to delete.
2745 | 
2746 |     Returns:
2747 |         Dictionary confirming the deletion.
2748 |     """
2749 |     start_time = time.monotonic()
2750 | 
2751 |     # Validate path exists (check_exists=True), but don't resolve symlinks yet
2752 |     # We need to know if the original path is a symlink to handle it properly
2753 |     validation_result = await validate_path(path, check_exists=True, resolve_symlinks=False)
2754 | 
2755 |     # Check if the path is a symlink
2756 |     is_symlink = await aiofiles.os.path.islink(path)
2757 |     # If it is a symlink, only the link itself is removed (handled below), never its target.
2758 |     validated_path = validation_result
2759 | 
2760 |     try:
2761 |         deleted_type = "unknown"
2762 | 
2763 |         # First check if it's a symlink - we want to handle this separately
2764 |         # to avoid accidentally deleting the target
2765 |         if is_symlink:
2766 |             deleted_type = "symlink"
2767 |             logger.info(f"Deleting symlink: {validated_path}", emoji_key="delete")
2768 |             await aiofiles.os.remove(validated_path)
2769 | 
2770 |         # If not a symlink, proceed with regular directory or file deletion
2771 |         else:
2772 |             # We need to check if it's a directory or file - use follow_symlinks=True because
2773 |             # we already handled the symlink case above
2774 |             stat_info = await aiofiles.os.stat(validated_path, follow_symlinks=True)
2775 |             is_dir = os.path.stat.S_ISDIR(stat_info.st_mode)
2776 |             is_file = os.path.stat.S_ISREG(stat_info.st_mode)
2777 | 
2778 |             if is_dir:
2779 |                 deleted_type = "directory"
2780 |                 logger.info(f"Attempting to delete directory: {validated_path}", emoji_key="delete")
2781 |                 # --- Deletion Protection Check ---
2782 |                 try:
2783 |                     # List all file paths within the directory for heuristic checks
2784 |                     contained_file_paths = await _list_paths_recursive(validated_path)
2785 |                     if contained_file_paths:  # Only run check if directory is not empty
2786 |                         await _check_protection_heuristics(contained_file_paths, "deletion")
2787 |                     else:
2788 |                         logger.info(
2789 |                             f"Directory '{validated_path}' is empty, skipping detailed protection check.",
2790 |                             emoji_key="info",
2791 |                         )
2792 |                 except ProtectionTriggeredError:
2793 |                     raise  # Re-raise the specific error if protection blocked the operation
2794 |                 except ToolError as list_err:
2795 |                     # If listing contents failed, block deletion for safety?
2796 |                     raise ToolError(
2797 |                         f"Could not list directory contents for safety check before deleting '{validated_path}'. Deletion aborted. Reason: {list_err}",
2798 |                         context={"path": validated_path},
2799 |                     ) from list_err
2800 |                 # --- End Protection Check ---
2801 | 
2802 |                 # Protection passed (or disabled/not triggered), proceed with recursive delete
2803 |                 await _async_rmtree(validated_path)
2804 | 
2805 |             elif is_file:
2806 |                 deleted_type = "file"
2807 |                 logger.info(f"Attempting to delete file: {validated_path}", emoji_key="delete")
2808 |                 await aiofiles.os.remove(validated_path)
2809 |             else:
2810 |             # Should not happen if existence validation passed, but handle defensively
2811 |                 raise ToolError(
2812 |                     f"Cannot delete path '{validated_path}': It is neither a file, directory, nor a symbolic link.",
2813 |                     context={"path": validated_path},
2814 |                 )
2815 | 
2816 |     except OSError as e:
2817 |         # Catch errors from remove, rmdir, or during rmtree
2818 |         logger.error(
2819 |             f"Error deleting path '{path}' (resolved: {validated_path}): {str(e)}",
2820 |             exc_info=True,
2821 |             emoji_key="error",
2822 |         )
2823 |         raise ToolError(
2824 |             f"Error deleting '{path}': {str(e)}", context={"path": validated_path, "errno": e.errno}
2825 |         ) from e
2826 |     except (ToolInputError, ToolError, ProtectionTriggeredError):  # Re-raise our specific errors
2827 |         raise
2828 |     except Exception as e:
2829 |         # Catch unexpected errors
2830 |         logger.error(f"Unexpected error deleting {path}: {e}", exc_info=True, emoji_key="error")
2831 |         raise ToolError(
2832 |             f"An unexpected error occurred during deletion: {str(e)}",
2833 |             context={"path": validated_path},
2834 |         ) from e
2835 | 
2836 |     processing_time = time.monotonic() - start_time
2837 |     logger.success(
2838 |         f"Successfully deleted {deleted_type}: '{path}' (resolved: {validated_path})",
2839 |         emoji_key="delete",
2840 |         time=processing_time,
2841 |     )
2842 | 
2843 |     return {
2844 |         "path": validated_path,
2845 |         "type_deleted": deleted_type,
2846 |         "success": True,
2847 |         "message": f"Successfully deleted {deleted_type} '{validated_path}'.",
2848 |     }
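
# Usage sketch (illustrative; the path below is an assumption):
#
#     result = await delete_path("/data/allowed/tmp_build")
#     # result["type_deleted"] is "file", "directory", or "symlink"; directories are
#     # removed recursively only after the protection heuristics allow it.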
2849 | 
2850 | 
2851 | @with_tool_metrics
2852 | @with_error_handling
2853 | async def search_files(
2854 |     path: str,
2855 |     pattern: str,
2856 |     case_sensitive: bool = False,
2857 |     exclude_patterns: Optional[List[str]] = None,
2858 |     search_content: bool = False,
2859 |     max_content_bytes: int = 1024 * 1024,  # Limit content search size per file (1MB)
2860 | ) -> Dict[str, Any]:
2861 |     """Search for files/directories matching a pattern asynchronously and recursively.
2862 | 
2863 |     Supports filename pattern matching (case-sensitive/insensitive substring)
2864 |     and optional searching within file content (UTF-8 text only, limited size).
2865 |     Allows exclusion patterns (glob format).
2866 | 
2867 |     Args:
2868 |         path: Directory path to start the search from.
2869 |         pattern: Text pattern to find. Used for substring match in names,
2870 |                  and exact string find in content if search_content=True.
2871 |         case_sensitive: If True, matching is case-sensitive. Defaults False.
2872 |         exclude_patterns: Optional list of glob-style patterns for paths/names
2873 |                           to exclude (e.g., ["*.log", ".git/"]). Matched case-insensitively
2874 |                           on appropriate systems.
2875 |         search_content: If True, also search *inside* text files for the pattern.
2876 |                         This can be significantly slower and memory intensive.
2877 |         max_content_bytes: Max bytes to read from each file when search_content=True.
2878 | 
2879 |     Returns:
2880 |         Dictionary with search parameters and a list of matching file/directory paths.
2881 |         May include warnings about errors encountered during search.
2882 |     """
2883 |     start_time = time.monotonic()
2884 | 
2885 |     # Validate path exists and is a directory (check_exists=True)
2886 |     validated_path = await validate_path(path, check_exists=True)
2887 |     if not await aiofiles.os.path.isdir(validated_path):
2888 |         # Check links to dirs
2889 |         if await aiofiles.os.path.islink(validated_path):
2890 |             try:
2891 |                 if not await aiofiles.os.path.isdir(
2892 |                     await aiofiles.os.path.realpath(validated_path)
2893 |                 ):
2894 |                     raise ToolInputError(
2895 |                         f"Path '{path}' (resolved to link '{validated_path}') points to something that is not a directory.",
2896 |                         param_name="path",
2897 |                         provided_value=path,
2898 |                     )
2899 |                 # proceed with validated_path
2900 |             except OSError as e:
2901 |                 raise ToolError(
2902 |                     f"Error resolving or checking link target for search: {e}",
2903 |                     context={"path": validated_path},
2904 |                 ) from e
2905 |         else:
2906 |             raise ToolInputError(
2907 |                 f"Path '{path}' (resolved to '{validated_path}') is not a directory.",
2908 |                 param_name="path",
2909 |                 provided_value=path,
2910 |             )
2911 | 
2912 |     # Validate other inputs
2913 |     if not isinstance(pattern, str) or not pattern:
2914 |         raise ToolInputError(
2915 |             "Search pattern must be a non-empty string.",
2916 |             param_name="pattern",
2917 |             provided_value=pattern,
2918 |         )
2919 |     if exclude_patterns:  # Ensure it's a list of strings if provided
2920 |         if not isinstance(exclude_patterns, list):
2921 |             raise ToolInputError(
2922 |                 "Exclude patterns must be a list of strings.",
2923 |                 param_name="exclude_patterns",
2924 |                 provided_value=exclude_patterns,
2925 |             )
2926 |         if not all(isinstance(p, str) for p in exclude_patterns):
2927 |             raise ToolInputError(
2928 |                 "All items in exclude_patterns must be strings.", param_name="exclude_patterns"
2929 |             )
2930 |     if not isinstance(case_sensitive, bool):
2931 |         raise ToolInputError(
2932 |             "case_sensitive must be a boolean.",
2933 |             param_name="case_sensitive",
2934 |             provided_value=case_sensitive,
2935 |         )
2936 |     if not isinstance(search_content, bool):
2937 |         raise ToolInputError(
2938 |             "search_content must be a boolean.",
2939 |             param_name="search_content",
2940 |             provided_value=search_content,
2941 |         )
2942 |     if not isinstance(max_content_bytes, int) or max_content_bytes < 0:
2943 |         raise ToolInputError(
2944 |             "max_content_bytes must be a non-negative integer.",
2945 |             param_name="max_content_bytes",
2946 |             provided_value=max_content_bytes,
2947 |         )
2948 | 
2949 |     search_errors: List[str] = []
2950 |     # Using a set to store matched paths avoids duplicates efficiently.
2951 |     matched_paths: Set[str] = set()
2952 | 
2953 |     # Prepare pattern based on case sensitivity
2954 |     search_pattern = pattern if case_sensitive else pattern.lower()
2955 | 
2956 |     # Error handler callback for async_walk
2957 |     MAX_REPORTED_ERRORS = 50
2958 | 
2959 |     def onerror(os_error: OSError):
2960 |         """Callback to handle and log errors during file tree walking."""
2961 |         err_msg = f"Permission or access error during search near '{getattr(os_error, 'filename', 'N/A')}': {getattr(os_error, 'strerror', str(os_error))}"
2962 |         # Limit number of reported errors to avoid flooding logs/results
2963 |         if len(search_errors) < MAX_REPORTED_ERRORS:
2964 |             logger.warning(err_msg, emoji_key="warning")
2965 |             search_errors.append(err_msg)
2966 |         elif len(search_errors) == MAX_REPORTED_ERRORS:
2967 |             suppress_msg = f"... (Further {type(os_error).__name__} errors suppressed)"
2968 |             logger.warning(suppress_msg, emoji_key="warning")
2969 |             search_errors.append(suppress_msg)
2970 | 
2971 |     # --- File Content Search Task (if enabled) ---
2972 |     async def check_file_content(filepath: str) -> bool:
2973 |         """Reads limited file content and checks for the pattern."""
2974 |         try:
2975 |             # Read limited chunk, ignore decoding errors for content search robustness
2976 |             async with aiofiles.open(filepath, mode="r", encoding="utf-8", errors="ignore") as f:
2977 |                 content_chunk = await f.read(max_content_bytes)
2978 |             # Perform search (case sensitive or insensitive)
2979 |             if case_sensitive:
2980 |                 return pattern in content_chunk
2981 |             else:
2982 |                 return search_pattern in content_chunk.lower()  # Compare lower case
2983 |         except OSError as read_err:
2984 |             # Log content read errors but don't necessarily fail the whole search
2985 |             logger.warning(
2986 |                 f"Could not read content of {filepath} for search: {read_err}", emoji_key="warning"
2987 |             )
2988 |             onerror(read_err)  # Report as a search error
2989 |         except Exception as read_unexpected_err:
2990 |             logger.error(
2991 |                 f"Unexpected error reading content of {filepath}: {read_unexpected_err}",
2992 |                 exc_info=True,
2993 |                 emoji_key="error",
2994 |             )
2995 |             # Do not report unexpected errors via onerror, log them fully.
2996 |         return False
2997 | 
2998 |     # --- End File Content Search Task ---
2999 | 
3000 |     try:
3001 |         # Use the async_walk helper for efficient traversal and exclusion handling.
3002 |         # followlinks=True so the search also finds matches inside linked directories.
3003 |         # Exclude patterns apply to paths within linked directories relative to the base path.
3005 |         async for root, dirs, files in async_walk(
3006 |             validated_path,
3007 |             onerror=onerror,
3008 |             exclude_patterns=exclude_patterns,
3009 |             base_path=validated_path,  # Use original validated path as base for excludes
3010 |             followlinks=True,
3011 |         ):
3012 |             # Check matching directory names
3013 |             for dirname in dirs:
3014 |                 name_to_check = dirname if case_sensitive else dirname.lower()
3015 |                 if search_pattern in name_to_check:
3016 |                     match_path = os.path.join(root, dirname)
3017 |                     matched_paths.add(match_path)  # Add to set (handles duplicates)
3018 | 
3019 |             # Check matching file names OR content
3020 |             content_check_tasks = []
3021 |             files_to_check_content = []
3022 | 
3023 |             for filename in files:
3024 |                 name_to_check = filename if case_sensitive else filename.lower()
3025 |                 match_path = os.path.join(root, filename)
3026 |                 name_match = search_pattern in name_to_check
3027 | 
3028 |                 if name_match:
3029 |                     # Check if already added via content search in a previous iteration (unlikely but possible)
3030 |                     if match_path not in matched_paths:
3031 |                         matched_paths.add(match_path)  # Add name match
3032 | 
3033 |                 # If searching content AND name didn't already match, schedule content check
3034 |                 # Also check if path isn't already matched from a directory name match higher up.
3035 |                 if search_content and match_path not in matched_paths:
3036 |                     # Avoid queueing files already matched by name (implicitly handled by set)
3037 |                     files_to_check_content.append(match_path)
3038 |                     # Create task but don't await yet, gather below
3039 |                     content_check_tasks.append(check_file_content(match_path))
3040 | 
3041 |             # Run content checks concurrently for this directory level
3042 |             if content_check_tasks:
3043 |                 content_results = await asyncio.gather(*content_check_tasks, return_exceptions=True)
3044 |                 for idx, result in enumerate(content_results):
3045 |                     file_path_checked = files_to_check_content[idx]
3046 |                     if isinstance(result, Exception):
3047 |                         logger.warning(
3048 |                             f"Error during content check task for {file_path_checked}: {result}",
3049 |                             emoji_key="error",
3050 |                         )
3051 |                         # Errors during check_file_content are logged there, potentially reported via onerror too.
3052 |                     elif result is True:  # Content matched
3053 |                         matched_paths.add(file_path_checked)  # Add content match
3054 | 
3055 |     except Exception as e:
3056 |         # Catch unexpected errors during the async iteration/walk setup itself
3057 |         logger.error(
3058 |             f"Unexpected error during file search setup or walk in {path}: {e}",
3059 |             exc_info=True,
3060 |             emoji_key="error",
3061 |         )
3062 |         raise ToolError(
3063 |             f"An unexpected error occurred during search execution: {str(e)}",
3064 |             context={"path": path, "pattern": pattern},
3065 |         ) from e
3066 | 
3067 |     processing_time = time.monotonic() - start_time
3068 |     # Convert the set of unique matched paths to a sorted list for consistent output.
3069 |     unique_matches = sorted(list(matched_paths))
3070 | 
3071 |     logger.success(
3072 |         f"Search for '{pattern}' in {path} completed ({len(unique_matches)} unique matches found)",
3073 |         emoji_key="search",
3074 |         errors_encountered=len(search_errors),
3075 |         time=processing_time,
3076 |         case_sensitive=case_sensitive,
3077 |         search_content=search_content,
3078 |     )
3079 | 
3080 |     result = {
3081 |         "path": validated_path,
3082 |         "pattern": pattern,
3083 |         "case_sensitive": case_sensitive,
3084 |         "search_content": search_content,
3085 |         "matches": unique_matches,
3086 |         "success": True,
3087 |         "message": f"Found {len(unique_matches)} unique matches for '{pattern}' in '{validated_path}'.",
3088 |     }
3089 |     if search_errors:
3090 |         # Add consolidated warning if errors occurred during walk/stat.
3091 |         result["warnings"] = search_errors  # Include actual errors reported by onerror
3092 |         result["message"] += (
3093 |             f" Encountered {len(search_errors)} access or read errors during search."
3094 |         )
3095 | 
3096 |     return result
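
# Usage sketch (illustrative; the path, pattern, and excludes below are assumptions):
#
#     result = await search_files(
#         path="/data/allowed/project",
#         pattern="TODO",
#         search_content=True,          # also scan file contents (slower)
#         exclude_patterns=["*.log", ".git/*"],
#     )
#     print(result["matches"])  # sorted list of matching paths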
3097 | 
3098 | 
3099 | @with_tool_metrics
3100 | @with_error_handling
3101 | async def get_file_info(path: str) -> Dict[str, Any]:
3102 |     """Get detailed metadata about a specific file or directory asynchronously.
3103 | 
3104 |     Validates the path exists and is allowed. Returns information like size,
3105 |     timestamps, type, and permissions. Uses lstat to report info about the path
3106 |     itself (including if it's a symlink).
3107 | 
3108 |     Args:
3109 |         path: Path to the file or directory.
3110 | 
3111 |     Returns:
3112 |         Dictionary containing detailed file information, marked with success=True.
3113 |         If info retrieval fails after validation, raises ToolError.
3114 |     """
3115 |     start_time = time.monotonic()
3116 | 
3117 |     # Validate path (must exist, check_exists=True), but don't resolve symlinks
3118 |     # We want to get info about the path as specified by the user
3119 |     validation_result = await validate_path(path, check_exists=True, resolve_symlinks=False)
3120 | 
3121 |     # Check if this is a symlink
3122 |     is_symlink = isinstance(validation_result, dict) and validation_result.get("is_symlink")
3123 |     if is_symlink:
3124 |         validated_path = validation_result.get("symlink_path")
3125 |     else:
3126 |         validated_path = validation_result
3127 | 
3128 |     # Get file information asynchronously using the helper
3129 |     # Don't follow symlinks - we want info about the path itself
3130 |     info = await format_file_info(validated_path, follow_symlinks=False)
3131 | 
3132 |     # Check if the helper returned an error structure
3133 |     if "error" in info:
3134 |         # Propagate the error. Since path validation passed, this is likely
3135 |         # a transient issue or permission problem reading metadata. Use ToolError.
3136 |         raise ToolError(
3137 |             f"Failed to get file info for '{validated_path}': {info['error']}",
3138 |             context={"path": validated_path},
3139 |         )
3140 | 
3141 |     # Info retrieval successful
3142 |     processing_time = time.monotonic() - start_time
3143 |     logger.success(
3144 |         f"Got file info for: {path} (resolved: {validated_path})",
3145 |         emoji_key="file",
3146 |         time=processing_time,
3147 |     )
3148 | 
3149 |     # Add success flag and descriptive message to the info dictionary
3150 |     info["success"] = True
3151 |     info["message"] = f"Successfully retrieved info for '{validated_path}'."
3152 |     return info
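
# Usage sketch (illustrative; the path below is an assumption):
#
#     info = await get_file_info("/data/allowed/project/README.md")
#     # lstat semantics: for a symlink, this describes the link itself, not its target.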
3153 | 
3154 | 
3155 | @with_tool_metrics
3156 | @with_error_handling
3157 | async def list_allowed_directories() -> Dict[str, Any]:
3158 |     """List all directories configured as allowed for filesystem access.
3159 | 
3160 |     This is primarily an administrative/debugging tool. Reads from the loaded config.
3161 | 
3162 |     Returns:
3163 |         Dictionary containing the list of allowed base directory paths.
3164 |     """
3165 |     start_time = time.monotonic()
3166 | 
3167 |     # --- Use get_allowed_directories which reads from config ---
3168 |     try:
3169 |         allowed_dirs = get_allowed_directories()
3170 |     except Exception as e:
3171 |         # Handle rare errors during config retrieval itself
3172 |         raise ToolError(f"Failed to retrieve allowed directories configuration: {e}") from e
3173 | 
3174 |     processing_time = time.monotonic() - start_time
3175 |     logger.success(
3176 |         f"Listed {len(allowed_dirs)} allowed directories", emoji_key="config", time=processing_time
3177 |     )
3178 | 
3179 |     return {
3180 |         "directories": allowed_dirs,
3181 |         "count": len(allowed_dirs),
3182 |         "success": True,
3183 |         "message": f"Retrieved {len(allowed_dirs)} configured allowed directories.",
3184 |     }
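
# Usage sketch (illustrative; takes no arguments and reads the loaded config):
#
#     result = await list_allowed_directories()
#     print(result["count"], result["directories"])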
3185 | 
```