This is page 40 of 45. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│ ├── __init__.py
│ ├── advanced_agent_flows_using_unified_memory_system_demo.py
│ ├── advanced_extraction_demo.py
│ ├── advanced_unified_memory_system_demo.py
│ ├── advanced_vector_search_demo.py
│ ├── analytics_reporting_demo.py
│ ├── audio_transcription_demo.py
│ ├── basic_completion_demo.py
│ ├── cache_demo.py
│ ├── claude_integration_demo.py
│ ├── compare_synthesize_demo.py
│ ├── cost_optimization.py
│ ├── data
│ │ ├── sample_event.txt
│ │ ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│ │ └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│ ├── docstring_refiner_demo.py
│ ├── document_conversion_and_processing_demo.py
│ ├── entity_relation_graph_demo.py
│ ├── filesystem_operations_demo.py
│ ├── grok_integration_demo.py
│ ├── local_text_tools_demo.py
│ ├── marqo_fused_search_demo.py
│ ├── measure_model_speeds.py
│ ├── meta_api_demo.py
│ ├── multi_provider_demo.py
│ ├── ollama_integration_demo.py
│ ├── prompt_templates_demo.py
│ ├── python_sandbox_demo.py
│ ├── rag_example.py
│ ├── research_workflow_demo.py
│ ├── sample
│ │ ├── article.txt
│ │ ├── backprop_paper.pdf
│ │ ├── buffett.pdf
│ │ ├── contract_link.txt
│ │ ├── legal_contract.txt
│ │ ├── medical_case.txt
│ │ ├── northwind.db
│ │ ├── research_paper.txt
│ │ ├── sample_data.json
│ │ └── text_classification_samples
│ │ ├── email_classification.txt
│ │ ├── news_samples.txt
│ │ ├── product_reviews.txt
│ │ └── support_tickets.txt
│ ├── sample_docs
│ │ └── downloaded
│ │ └── attention_is_all_you_need.pdf
│ ├── sentiment_analysis_demo.py
│ ├── simple_completion_demo.py
│ ├── single_shot_synthesis_demo.py
│ ├── smart_browser_demo.py
│ ├── sql_database_demo.py
│ ├── sse_client_demo.py
│ ├── test_code_extraction.py
│ ├── test_content_detection.py
│ ├── test_ollama.py
│ ├── text_classification_demo.py
│ ├── text_redline_demo.py
│ ├── tool_composition_examples.py
│ ├── tournament_code_demo.py
│ ├── tournament_text_demo.py
│ ├── unified_memory_system_demo.py
│ ├── vector_search_demo.py
│ ├── web_automation_instruction_packs.py
│ └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│ └── smart_browser_internal
│ ├── locator_cache.db
│ ├── readability.js
│ └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── integration
│ │ ├── __init__.py
│ │ └── test_server.py
│ ├── manual
│ │ ├── test_extraction_advanced.py
│ │ └── test_extraction.py
│ └── unit
│ ├── __init__.py
│ ├── test_cache.py
│ ├── test_providers.py
│ └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│ ├── __init__.py
│ ├── __main__.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── __main__.py
│ │ ├── commands.py
│ │ ├── helpers.py
│ │ └── typer_cli.py
│ ├── clients
│ │ ├── __init__.py
│ │ ├── completion_client.py
│ │ └── rag_client.py
│ ├── config
│ │ └── examples
│ │ └── filesystem_config.yaml
│ ├── config.py
│ ├── constants.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── evaluation
│ │ │ ├── base.py
│ │ │ └── evaluators.py
│ │ ├── providers
│ │ │ ├── __init__.py
│ │ │ ├── anthropic.py
│ │ │ ├── base.py
│ │ │ ├── deepseek.py
│ │ │ ├── gemini.py
│ │ │ ├── grok.py
│ │ │ ├── ollama.py
│ │ │ ├── openai.py
│ │ │ └── openrouter.py
│ │ ├── server.py
│ │ ├── state_store.py
│ │ ├── tournaments
│ │ │ ├── manager.py
│ │ │ ├── tasks.py
│ │ │ └── utils.py
│ │ └── ums_api
│ │ ├── __init__.py
│ │ ├── ums_database.py
│ │ ├── ums_endpoints.py
│ │ ├── ums_models.py
│ │ └── ums_services.py
│ ├── exceptions.py
│ ├── graceful_shutdown.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── analytics
│ │ │ ├── __init__.py
│ │ │ ├── metrics.py
│ │ │ └── reporting.py
│ │ ├── cache
│ │ │ ├── __init__.py
│ │ │ ├── cache_service.py
│ │ │ ├── persistence.py
│ │ │ ├── strategies.py
│ │ │ └── utils.py
│ │ ├── cache.py
│ │ ├── document.py
│ │ ├── knowledge_base
│ │ │ ├── __init__.py
│ │ │ ├── feedback.py
│ │ │ ├── manager.py
│ │ │ ├── rag_engine.py
│ │ │ ├── retriever.py
│ │ │ └── utils.py
│ │ ├── prompts
│ │ │ ├── __init__.py
│ │ │ ├── repository.py
│ │ │ └── templates.py
│ │ ├── prompts.py
│ │ └── vector
│ │ ├── __init__.py
│ │ ├── embeddings.py
│ │ └── vector_service.py
│ ├── tool_token_counter.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── audio_transcription.py
│ │ ├── base.py
│ │ ├── completion.py
│ │ ├── docstring_refiner.py
│ │ ├── document_conversion_and_processing.py
│ │ ├── enhanced-ums-lookbook.html
│ │ ├── entity_relation_graph.py
│ │ ├── excel_spreadsheet_automation.py
│ │ ├── extraction.py
│ │ ├── filesystem.py
│ │ ├── html_to_markdown.py
│ │ ├── local_text_tools.py
│ │ ├── marqo_fused_search.py
│ │ ├── meta_api_tool.py
│ │ ├── ocr_tools.py
│ │ ├── optimization.py
│ │ ├── provider.py
│ │ ├── pyodide_boot_template.html
│ │ ├── python_sandbox.py
│ │ ├── rag.py
│ │ ├── redline-compiled.css
│ │ ├── sentiment_analysis.py
│ │ ├── single_shot_synthesis.py
│ │ ├── smart_browser.py
│ │ ├── sql_databases.py
│ │ ├── text_classification.py
│ │ ├── text_redline_tools.py
│ │ ├── tournament.py
│ │ ├── ums_explorer.html
│ │ └── unified_memory_system.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── async_utils.py
│ │ ├── display.py
│ │ ├── logging
│ │ │ ├── __init__.py
│ │ │ ├── console.py
│ │ │ ├── emojis.py
│ │ │ ├── formatter.py
│ │ │ ├── logger.py
│ │ │ ├── panels.py
│ │ │ ├── progress.py
│ │ │ └── themes.py
│ │ ├── parse_yaml.py
│ │ ├── parsing.py
│ │ ├── security.py
│ │ └── text.py
│ └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/filesystem.py:
--------------------------------------------------------------------------------
```python
1 | """Secure asynchronous filesystem tools for Ultimate MCP Server.
2 |
3 | This module provides secure asynchronous filesystem operations, including reading, writing,
4 | deleting, and manipulating files and directories, with robust security controls to limit access
5 | and optional heuristics to prevent accidental mass deletion/modification by LLMs.
6 | """
7 |
8 | import asyncio
9 | import datetime # Using datetime for standard timestamp representation.
10 | import difflib
11 | import json
12 | import math # For isnan checks
13 | import os
14 | import statistics # For calculating standard deviation in protection heuristics
15 | import time
16 | from fnmatch import fnmatch # Keep sync fnmatch for pattern matching
17 | from typing import Any, AsyncGenerator, Dict, List, Literal, Optional, Set, Tuple, Union, cast
18 |
19 | import aiofiles
20 | import aiofiles.os
21 | from pydantic import BaseModel
22 |
23 | from ultimate_mcp_server.config import FilesystemConfig, GatewayConfig, get_config
24 | from ultimate_mcp_server.exceptions import ToolError, ToolInputError
25 | from ultimate_mcp_server.tools.base import with_error_handling, with_tool_metrics
26 | from ultimate_mcp_server.utils import get_logger
27 |
28 | logger = get_logger("ultimate_mcp_server.tools.filesystem")
29 |
30 |
31 | class ProtectionTriggeredError(ToolError):
32 | """Exception raised when a security protection measure is triggered."""
33 |
34 | def __init__(self, message, protection_type=None, context=None, details=None):
35 | """Initialize the protection triggered error.
36 |
37 | Args:
38 | message: Error message
39 | protection_type: Type of protection triggered (e.g., "deletion_protection", "path_protection")
40 | context: Context information about the protection trigger
41 | details: Additional error details
42 | """
43 | error_details = details or {}
44 | if protection_type:
45 | error_details["protection_type"] = protection_type
46 |
47 | self.context = context or {}
48 | if context:
49 | error_details["context"] = context
50 |
51 | super().__init__(message, error_code="PROTECTION_TRIGGERED", details=error_details)
52 |
53 |
54 | # --- Configuration and Security ---
55 |
56 |
57 | def get_filesystem_config() -> "FilesystemConfig": # Use type hint from config module
58 | """Get filesystem configuration object from the main config."""
59 | cfg: GatewayConfig = get_config()
60 | # Access the validated FilesystemConfig object directly
61 | fs_config = cfg.filesystem
62 | if not fs_config: # Should not happen with default_factory, but check defensively
63 | logger.error(
64 | "Filesystem configuration missing after load. Using defaults.", emoji_key="config"
65 | )
66 | from ultimate_mcp_server.config import (
67 | FilesystemConfig, # Local import to avoid circularity at top level
68 | )
69 |
70 | return FilesystemConfig()
71 | return fs_config
72 |
73 |
74 | def get_protection_config(operation_type: Literal["deletion", "modification"]) -> Dict[str, Any]:
75 | """Get protection settings for a specific operation type as a dictionary."""
76 | fs_config = get_filesystem_config()
77 | protection_attr_name = f"file_{operation_type}_protection"
78 |
79 | if hasattr(fs_config, protection_attr_name):
80 | protection_config_obj = getattr(fs_config, protection_attr_name)
81 | if protection_config_obj and isinstance(
82 | protection_config_obj, BaseModel
83 | ): # Check it's a Pydantic model instance
84 | # Convert the Pydantic model to a dictionary for consistent access
85 | return protection_config_obj.model_dump()
86 | else:
87 | logger.warning(
88 | f"Protection config for '{operation_type}' is not a valid model instance. Using defaults.",
89 | emoji_key="config",
90 | )
91 | else:
92 | logger.warning(
93 | f"Protection config attribute '{protection_attr_name}' not found. Using defaults.",
94 | emoji_key="config",
95 | )
96 |
97 | # Return default dictionary structure if config is missing or invalid
98 | # Fetch defaults from the Pydantic model definition if possible
99 | try:
100 | from ultimate_mcp_server.config import FilesystemProtectionConfig
101 |
102 | return FilesystemProtectionConfig().model_dump()
103 | except ImportError: # Fallback if import fails
104 | return {
105 | "enabled": False,
106 | "max_files_threshold": 100,
107 | "datetime_stddev_threshold_sec": 60 * 60 * 24 * 30,
108 | "file_type_variance_threshold": 5,
109 | "max_stat_errors_pct": 10.0,
110 | }
111 |
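# Illustrative usage sketch (hypothetical caller): the returned dict mirrors
# FilesystemProtectionConfig, so callers index it directly, with the fallback
# defaults above applying when the config section is absent or invalid:
#
#     cfg = get_protection_config("deletion")   # -> file_deletion_protection
#     if cfg["enabled"]:
#         limit = cfg["max_files_threshold"]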
112 |
113 | def get_allowed_directories() -> List[str]:
114 | """Get allowed directories from configuration.
115 |
116 | Reads the configuration, normalizes paths (absolute, resolves symlinks),
117 | and returns a list of unique allowed directory paths. Assumes paths were expanded
118 | during config load.
119 |
120 | Returns:
121 | List of normalized, absolute directory paths that can be accessed.
122 | """
123 | fs_config = get_filesystem_config()
124 | # Access the already expanded list from the validated config object
125 | allowed_config: List[str] = fs_config.allowed_directories
126 |
127 | if not allowed_config:
128 | logger.warning(
129 | "No filesystem directories configured or loaded for access. All operations may be rejected.",
130 | emoji_key="security",
131 | )
132 | return []
133 |
134 | # Paths should already be expanded and absolute from config loading.
135 | # We still need to normalize and ensure uniqueness.
136 | normalized: List[str] = []
137 | for d in allowed_config:
138 | try:
139 | # Basic normalization (separator consistency)
140 | norm_d = os.path.normpath(d)
141 | if norm_d not in normalized: # Avoid duplicates
142 | normalized.append(norm_d)
143 | except Exception as e:
144 | # Log errors during normalization but continue
145 | logger.error(
146 | f"Error normalizing configured allowed directory '{d}': {e}. Skipping.",
147 | emoji_key="config",
148 | )
149 |
150 | if not normalized and allowed_config:
151 | logger.error(
152 | "Filesystem access potentially misconfigured: No valid allowed directories remain after normalization.",
153 | emoji_key="security",
154 | )
155 | elif not normalized:
156 | # Warning about no configured dirs was logged earlier if allowed_config was empty.
157 | pass
158 |
159 | # Debug log the final effective list used by tools
160 | logger.debug(
161 | f"Filesystem tools operating with {len(normalized)} normalized allowed directories",
162 | allowed_directories=normalized,
163 | )
164 | return normalized
165 |
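# Illustrative sketch (hypothetical config values): with
#     filesystem.allowed_directories = ["/srv/project", "/srv/project/../shared"]
# the loop above yields the normalized, de-duplicated list
#     ["/srv/project", "/srv/shared"]
# because os.path.normpath collapses the "..".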
166 |
167 | # --- Path Validation ---
168 | async def validate_path(
169 | path: str,
170 | check_exists: Optional[bool] = None,
171 | check_parent_writable: bool = False,
172 | resolve_symlinks: bool = False,
173 | ) -> str:
174 | """Validate a path for security and accessibility using async I/O.
175 |
176 | Performs several checks:
177 | 1. Ensures path is a non-empty string.
178 | 2. Normalizes the path (expands user, makes absolute, resolves '../').
179 | 3. Checks if the normalized path is within the configured allowed directories.
180 | 4. If check_exists is True, checks if the path exists. If False, checks it does NOT exist.
181 | 5. If resolve_symlinks is True, resolves symbolic links and re-validates the real path against allowed dirs.
182 | 6. If check_exists is False, checks parent directory existence.
183 | 7. If `check_parent_writable` is True and path likely needs creation, checks parent directory write permissions.
184 |
185 | Args:
186 | path: The file or directory path input string to validate.
187 | check_exists: If True, path must exist. If False, path must NOT exist. If None, existence is not checked.
188 | check_parent_writable: If True and path doesn't exist/creation is implied, check if parent dir is writable.
189 | resolve_symlinks: If True, follows symlinks and returns their target path. If False, keeps the symlink path.
190 |
191 | Returns:
192 | The normalized, absolute, validated path string.
193 |
194 | Raises:
195 | ToolInputError: If the path is invalid (format, permissions, existence violation,
196 | outside allowed dirs, symlink issue).
197 | ToolError: For underlying filesystem errors or configuration issues.
198 | """
199 | if not path or not isinstance(path, str):
200 | raise ToolInputError(
201 | "Path must be a non-empty string.",
202 | param_name="path",
203 | provided_value=repr(path), # Use repr for clarity on non-string types
204 | )
205 |
206 | # Path normalization (sync, generally fast)
207 | try:
208 | path_expanded = os.path.expanduser(path)
209 | path_abs = os.path.abspath(path_expanded)
210 | # Normalize '.','..' and separators
211 | normalized_path = os.path.normpath(path_abs)
212 | except Exception as e:
213 | raise ToolInputError(
214 | f"Invalid path format or resolution error: {str(e)}",
215 | param_name="path",
216 | provided_value=path,
217 | ) from e
218 |
219 | # --- Use get_allowed_directories which reads from config ---
220 | allowed_dirs = get_allowed_directories()
221 | if not allowed_dirs:
222 | raise ToolError(
223 | "Filesystem access is disabled: No allowed directories are configured or loadable.",
224 | context={"configured_directories": 0}, # Provide context
225 | )
226 |
227 | # Ensure normalized_path is truly *under* an allowed dir.
228 | is_allowed = False
229 | original_validated_path = normalized_path # Store before symlink resolution
230 | for allowed_dir in allowed_dirs:
231 | # Ensure allowed_dir is also normalized for comparison
232 | norm_allowed_dir = os.path.normpath(allowed_dir)
233 | # Path must be exactly the allowed dir or start with the allowed dir + separator.
234 | if normalized_path == norm_allowed_dir or normalized_path.startswith(
235 | norm_allowed_dir + os.sep
236 | ):
237 | is_allowed = True
238 | break
239 |
240 | if not is_allowed:
241 | logger.warning(
242 | f"Path '{normalized_path}' denied access. Not within allowed directories: {allowed_dirs}",
243 | emoji_key="security",
244 | )
245 | raise ToolInputError(
246 | f"Access denied: Path '{path}' resolves to '{normalized_path}', which is outside the allowed directories.",
247 | param_name="path",
248 | provided_value=path,
249 | # Add context about allowed dirs for debugging? Potentially sensitive.
250 | # context={"allowed": allowed_dirs}
251 | )
252 |
253 | # Filesystem checks using aiofiles.os
254 | current_validated_path = normalized_path # Start with normalized path
255 | is_symlink = False
256 | symlink_target_path = None
257 |
258 | try:
259 | # Use stat with follow_symlinks=False to check the item itself (similar to lstat)
260 | try:
261 | lstat_info = await aiofiles.os.stat(current_validated_path, follow_symlinks=False)
262 | path_exists_locally = True # stat succeeded, so the path entry itself exists
263 | is_symlink = os.path.stat.S_ISLNK(lstat_info.st_mode)
264 |
265 | if is_symlink:
266 | try:
267 | # Get the target for information purposes
268 | symlink_target = await aiofiles.os.readlink(current_validated_path)
269 | symlink_target_path = symlink_target
270 | logger.debug(f"Path '{path}' is a symlink pointing to '{symlink_target}'")
271 | except OSError as link_err:
272 | logger.warning(
273 | f"Error reading symlink target for '{current_validated_path}': {link_err}"
274 | )
275 |
276 | except FileNotFoundError:
277 | path_exists_locally = False
278 | is_symlink = False
279 | except OSError as e:
280 | # Handle other OS errors during stat check
281 | logger.error(
282 | f"OS Error during stat check for '{current_validated_path}': {e}", exc_info=True
283 | )
284 | raise ToolError(
285 | f"Filesystem error checking path status for '{path}': {str(e)}",
286 | context={"path": path, "resolved_path": current_validated_path},
287 | ) from e
288 |
289 | # Resolve symlink if it exists and re-validate
290 | if is_symlink and resolve_symlinks:
291 | try:
292 | # Use synchronous os.path.realpath since aiofiles.os.path doesn't have it
293 | real_path = os.path.realpath(current_validated_path)
294 | real_normalized = os.path.normpath(real_path)
295 | symlink_target_path = real_normalized # noqa: F841
296 |
297 | # Re-check if the *real* resolved path is within allowed directories
298 | is_real_allowed = False
299 | for allowed_dir in allowed_dirs:
300 | norm_allowed_dir = os.path.normpath(allowed_dir)
301 | if real_normalized == norm_allowed_dir or real_normalized.startswith(
302 | norm_allowed_dir + os.sep
303 | ):
304 | is_real_allowed = True
305 | break
306 |
307 | if not is_real_allowed:
308 | raise ToolInputError(
309 | f"Access denied: Path '{path}' is a symbolic link pointing to '{real_normalized}', which is outside allowed directories.",
310 | param_name="path",
311 | provided_value=path,
312 | )
313 |
314 | # If validation passed, use the real path for further checks *about the target*
315 | current_validated_path = real_normalized
316 | # Re-check existence *of the target* - use exists instead of lexists
317 | path_exists = await aiofiles.os.path.exists(current_validated_path)
318 |
319 | except OSError as e:
320 | # Handle errors during realpath resolution (e.g., broken link, permissions)
321 | if isinstance(e, FileNotFoundError):
322 | # Broken link - the link entry exists, but target doesn't
323 | path_exists = False
324 | if check_exists is True:
325 | raise ToolInputError(
326 | f"Required path '{path}' is a symbolic link pointing to a non-existent target ('{original_validated_path}' -> target missing).",
327 | param_name="path",
328 | provided_value=path,
329 | ) from e
330 | # If check_exists is False or None, a broken link might be acceptable depending on the operation.
331 | # Keep current_validated_path as the *link path itself* if the target doesn't exist.
332 | current_validated_path = original_validated_path
333 | else:
334 | raise ToolInputError(
335 | f"Error resolving symbolic link '{path}': {str(e)}",
336 | param_name="path",
337 | provided_value=path,
338 | ) from e
339 | except ToolInputError: # Re-raise specific input errors
340 | raise
341 | except Exception as e: # Catch other unexpected errors during link resolution
342 | raise ToolError(
343 | f"Unexpected error resolving symbolic link for '{path}': {str(e)}",
344 | context={"path": path},
345 | ) from e
346 | else:
347 | # Not a link or not resolving it, so existence check result is based on the initial check
348 | path_exists = path_exists_locally
349 |
350 | # Check existence requirement *after* potential symlink resolution
351 | if check_exists is True and not path_exists:
352 | raise ToolInputError(
353 | f"Required path '{path}' (resolved to '{current_validated_path}') does not exist.",
354 | param_name="path",
355 | provided_value=path,
356 | details={
357 | "path": path,
358 | "resolved_path": current_validated_path,
359 | "error_type": "PATH_NOT_FOUND",
360 | },
361 | )
362 | elif check_exists is False and path_exists:
363 | raise ToolInputError(
364 | f"Path '{path}' (resolved to '{current_validated_path}') already exists, but non-existence was required.",
365 | param_name="path",
366 | provided_value=path,
367 | details={
368 | "path": path,
369 | "resolved_path": current_validated_path,
370 | "error_type": "PATH_ALREADY_EXISTS",
371 | },
372 | )
373 | # else: check_exists is None, or condition met
374 |
375 | # If path doesn't exist and creation is likely (check_exists is False or None), check parent.
376 | parent_dir = os.path.dirname(current_validated_path)
377 | if (
378 | parent_dir and parent_dir != current_validated_path
379 | ): # Check parent_dir is not empty and not the root itself
380 | try:
381 | parent_exists = await aiofiles.os.path.exists(
382 | parent_dir
383 | ) # Check if parent exists first
384 | if parent_exists:
385 | if not await aiofiles.os.path.isdir(parent_dir):
386 | raise ToolInputError(
387 | f"Cannot operate on '{path}': Parent path '{parent_dir}' exists but is not a directory.",
388 | param_name="path",
389 | provided_value=path,
390 | )
391 | # Parent exists and is a directory, check writability if requested
392 | if check_parent_writable:
393 | if not os.access(parent_dir, os.W_OK | os.X_OK):
394 | raise ToolInputError(
395 | f"Cannot operate on '{path}': Parent directory '{parent_dir}' exists but is not writable or accessible.",
396 | param_name="path",
397 | provided_value=path,
398 | )
399 | # else: Parent does NOT exist.
400 | # If check_parent_writable was True, it's okay if parent doesn't exist because makedirs will create it.
401 | # If check_parent_writable was False (or not requested for this scenario), we might still want to error if parent doesn't exist depending on the operation context.
402 | # For create_directory context, non-existence of parent is fine.
403 | except OSError as e:
404 | raise ToolError(
405 | f"Filesystem error checking parent directory '{parent_dir}' for '{path}': {str(e)}",
406 | context={"path": path, "parent": parent_dir},
407 | ) from e
408 |
409 | except OSError as e:
410 | # Catch filesystem errors during async checks like exists, isdir, islink on the primary path
411 | raise ToolError(
412 | f"Filesystem error validating path '{path}': {str(e)}",
413 | context={"path": path, "resolved_path": current_validated_path, "error": str(e)},
414 | ) from e
415 | except ToolInputError: # Re-raise ToolInputErrors from validation logic
416 | raise
417 | except Exception as e:
418 | # Catch unexpected errors during validation logic
419 | logger.error(f"Unexpected error during path validation for {path}: {e}", exc_info=True)
420 | raise ToolError(
421 | f"An unexpected error occurred validating path: {str(e)}", context={"path": path}
422 | ) from e
423 |
424 | # Always return the validated path string
425 | return current_validated_path
426 |
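async def _sketch_validate_path_usage() -> None:
    """Illustrative sketch of the two main validate_path modes; this helper and
    the "/data" paths are hypothetical and assume "/data" is an allowed dir."""
    # Existing file: must be present; resolve symlinks and re-check the target.
    report = await validate_path("/data/report.txt", check_exists=True, resolve_symlinks=True)
    # New file: must NOT exist yet; verify the parent directory is writable.
    target = await validate_path("/data/out/new.txt", check_exists=False, check_parent_writable=True)
    logger.debug(f"validated existing={report}, creatable={target}")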
427 |
428 | # --- Helper Functions ---
429 |
430 |
431 | async def format_file_info(file_path: str, follow_symlinks: bool = False) -> Dict[str, Any]:
432 | """Get detailed file or directory information asynchronously.
433 |
434 | Uses `aiofiles.os.stat` to retrieve metadata.
435 |
436 | Args:
437 | file_path: Path to file or directory (assumed validated).
438 | follow_symlinks: If True, follows symlinks to get info about their targets.
439 | If False, gets info about the symlink itself.
440 |
441 | Returns:
442 | Dictionary with file/directory details (name, path, size, timestamps, type, permissions).
443 | If an OS error occurs during stat, returns a dict containing 'name', 'path', and 'error'.
444 | """
445 | try:
446 | # Use stat results directly where possible to avoid redundant checks
447 | # Use stat with follow_symlinks parameter to control whether we stat the link or the target
448 | stat_info = await aiofiles.os.stat(file_path, follow_symlinks=follow_symlinks)
449 | mode = stat_info.st_mode
450 | is_dir = os.path.stat.S_ISDIR(mode)
451 | is_file = os.path.stat.S_ISREG(mode) # Check for regular file
452 | is_link = os.path.stat.S_ISLNK(mode) # Check if the item stat looked at is a link
453 |
454 | # Use timezone-aware ISO format timestamps for machine readability.
455 | # Handle potential platform differences in ctime availability (fallback to mtime).
456 | try:
457 | # Some systems might not have birthtime (st_birthtime)
458 | # ctime is platform dependent (creation on Windows, metadata change on Unix)
459 | # Use mtime as the most reliable "last modified" timestamp.
460 | # Let's report both ctime and mtime if available.
461 | ctime_ts = stat_info.st_ctime
462 | except AttributeError:
463 | ctime_ts = stat_info.st_mtime # Fallback
464 |
465 | # Ensure timestamps are valid before conversion
466 | def safe_isoformat(timestamp):
467 | try:
468 | # Handle potential negative timestamps or values outside valid range
469 | if timestamp < 0:
470 | return "Invalid Timestamp (Negative)"
471 | # Check against a reasonable range (e.g., year 1 to 9999)
472 | min_ts = datetime.datetime(1, 1, 1, tzinfo=datetime.timezone.utc).timestamp()
473 | max_ts = datetime.datetime(
474 | 9999, 12, 31, 23, 59, 59, tzinfo=datetime.timezone.utc
475 | ).timestamp()
476 | if not (min_ts <= timestamp <= max_ts):
477 | return f"Invalid Timestamp (Out of Range: {timestamp})"
478 |
479 | return datetime.datetime.fromtimestamp(
480 | timestamp, tz=datetime.timezone.utc
481 | ).isoformat()
482 | except (OSError, ValueError) as e: # Handle potential errors like invalid values
483 | logger.warning(
484 | f"Invalid timestamp {timestamp} for {file_path}: {e}", emoji_key="warning"
485 | )
486 | return f"Invalid Timestamp ({type(e).__name__})"
487 |
488 | info = {
489 | "name": os.path.basename(file_path),
490 | "path": file_path,
491 | "size": stat_info.st_size,
492 | "created_os_specific": safe_isoformat(ctime_ts), # Note platform dependency
493 | "modified": safe_isoformat(stat_info.st_mtime),
494 | "accessed": safe_isoformat(stat_info.st_atime),
495 | "is_directory": is_dir,
496 | "is_file": is_file,
497 | "is_symlink": is_link, # Indicate if the path itself is a symlink
498 | # Use S_IMODE for standard permission bits (mode & 0o777)
499 | "permissions": oct(os.path.stat.S_IMODE(mode)),
500 | }
501 |
502 | if is_link or (not follow_symlinks and await aiofiles.os.path.islink(file_path)):
503 | try:
504 | # Attempt to read the link target
505 | link_target = await aiofiles.os.readlink(file_path)
506 | info["symlink_target"] = link_target
507 | except OSError as link_err:
508 | info["symlink_target"] = f"<Error reading link: {link_err}>"
509 |
510 | return info
511 | except OSError as e:
512 | logger.warning(f"Error getting file info for {file_path}: {str(e)}", emoji_key="warning")
513 | # Return consistent error structure, let caller decide severity.
514 | return {
515 | "name": os.path.basename(file_path),
516 | "path": file_path,
517 | "error": f"Failed to get info: {str(e)}",
518 | }
519 | except Exception as e: # Catch unexpected errors
520 | logger.error(
521 | f"Unexpected error getting file info for {file_path}: {e}",
522 | exc_info=True,
523 | emoji_key="error",
524 | )
525 | return {
526 | "name": os.path.basename(file_path),
527 | "path": file_path,
528 | "error": f"An unexpected error occurred: {str(e)}",
529 | }
530 |
531 |
532 | def create_unified_diff(original_content: str, new_content: str, filepath: str) -> str:
533 | """Create a unified diff string between original and new content.
534 |
535 | Args:
536 | original_content: Original file content as a single string.
537 | new_content: New file content as a single string.
538 | filepath: Path to the file (used in the diff header).
539 |
540 | Returns:
541 | Unified diff as a multi-line string, or empty string if no differences.
542 | """
543 | # Normalize line endings for consistent diffing
544 | original_lines = original_content.splitlines()
545 | new_lines = new_content.splitlines()
546 |
547 | # Generate unified diff using difflib (synchronous)
548 | diff_lines = list(
549 | difflib.unified_diff(
550 | original_lines,
551 | new_lines,
552 | fromfile=f"{filepath} (original)", # Label for 'from' file in diff
553 | tofile=f"{filepath} (modified)", # Label for 'to' file in diff
554 | lineterm="", # Keep lines without added newlines by difflib
555 | )
556 | )
557 |
558 | # Return empty string if no changes, otherwise join lines into single string.
559 | return "\n".join(diff_lines) if diff_lines else ""
560 |
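# Doctest-style sketch (hypothetical filename) of the resulting diff format:
#
# >>> print(create_unified_diff("a\nb\n", "a\nc\n", "notes.txt"))
# --- notes.txt (original)
# +++ notes.txt (modified)
# @@ -1,2 +1,2 @@
#  a
# -b
# +c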
561 |
562 | async def read_file_content(filepath: str) -> str:
563 | """Read text file content using async I/O. Assumes UTF-8 encoding.
564 |
565 | Args:
566 | filepath: Path to the file to read (assumed validated).
567 |
568 | Returns:
569 | File content as string.
570 |
571 | Raises:
572 | ToolError: If reading fails or decoding fails. Includes specific context.
573 | """
574 | try:
575 | # Open asynchronously, read with strict UTF-8 decoding.
576 | async with aiofiles.open(filepath, mode="r", encoding="utf-8", errors="strict") as f:
577 | return await f.read()
578 | except UnicodeDecodeError as e:
579 | logger.warning(f"File {filepath} is not valid UTF-8: {e}", emoji_key="warning")
580 | # Provide context about the decoding error.
581 | raise ToolError(
582 | f"File is not valid UTF-8 encoded text: {filepath}. Cannot read as text. Details: {e}",
583 | context={"path": filepath, "encoding": "utf-8", "error_details": str(e)},
584 | ) from e
585 | except OSError as e:
586 | logger.error(f"OS error reading file {filepath}: {e}", emoji_key="error")
587 | raise ToolError(
588 | f"Error reading file: {str(e)}", context={"path": filepath, "errno": e.errno}
589 | ) from e
590 | except Exception as e:
591 | logger.error(
592 | f"Unexpected error reading file {filepath}: {e}", exc_info=True, emoji_key="error"
593 | )
594 | raise ToolError(
595 | f"An unexpected error occurred while reading file: {str(e)}", context={"path": filepath}
596 | ) from e
597 |
598 |
599 | async def read_binary_file_content(filepath: str) -> bytes:
600 | """Read binary file content using async I/O.
601 |
602 | Args:
603 | filepath: Path to the file to read (assumed validated).
604 |
605 | Returns:
606 | File content as bytes.
607 |
608 | Raises:
609 | ToolError: If reading fails.
610 | """
611 | try:
612 | # Open asynchronously in binary read mode ('rb').
613 | async with aiofiles.open(filepath, mode="rb") as f:
614 | return await f.read()
615 | except OSError as e:
616 | logger.error(f"OS error reading binary file {filepath}: {e}", emoji_key="error")
617 | raise ToolError(
618 | f"Error reading binary file: {str(e)}", context={"path": filepath, "errno": e.errno}
619 | ) from e
620 | except Exception as e:
621 | logger.error(
622 | f"Unexpected error reading binary file {filepath}: {e}",
623 | exc_info=True,
624 | emoji_key="error",
625 | )
626 | raise ToolError(
627 | f"An unexpected error occurred while reading binary file: {str(e)}",
628 | context={"path": filepath},
629 | ) from e
630 |
631 |
632 | async def write_file_content(filepath: str, content: Union[str, bytes]) -> None:
633 | """Write text or binary content to a file using async I/O. Creates parent dirs.
634 |
635 | Args:
636 | filepath: Path to the file to write (assumed validated, including parent writability).
637 | content: Content to write (string for text UTF-8, bytes for binary).
638 |
639 | Raises:
640 | ToolError: If writing fails.
641 | TypeError: If content is not str or bytes.
642 | """
643 | # Determine mode and encoding based on content type.
644 | if isinstance(content, str):
645 | mode = "w"
646 | encoding = "utf-8"
647 | data_to_write = content
648 | elif isinstance(content, bytes):
649 | mode = "wb"
650 | encoding = None # No encoding for binary mode
651 | data_to_write = content
652 | else:
653 | raise TypeError("Content to write must be str or bytes")
654 |
655 | try:
656 | # Ensure parent directory exists asynchronously.
657 | parent_dir = os.path.dirname(filepath)
658 | if (
659 | parent_dir and parent_dir != filepath
660 | ): # Check parent_dir is not empty and not the root itself
661 | # Use async makedirs for consistency. exist_ok=True makes it idempotent.
662 | await aiofiles.os.makedirs(parent_dir, exist_ok=True)
663 |
664 | # Open file asynchronously and write content.
665 | async with aiofiles.open(filepath, mode=mode, encoding=encoding) as f:
666 | await f.write(data_to_write)
667 | # await f.flush() # Often not necessary as context manager handles flush/close, but uncomment for critical writes if needed.
668 |
669 | except OSError as e:
670 | logger.error(f"OS error writing file {filepath}: {e}", emoji_key="error")
671 | raise ToolError(
672 | f"Error writing file: {str(e)}", context={"path": filepath, "errno": e.errno}
673 | ) from e
674 | except Exception as e:
675 | logger.error(
676 | f"Unexpected error writing file {filepath}: {e}", exc_info=True, emoji_key="error"
677 | )
678 | raise ToolError(
679 | f"An unexpected error occurred while writing file: {str(e)}", context={"path": filepath}
680 | ) from e
681 |
682 |
683 | async def apply_file_edits(
684 | filepath: str, edits: List[Dict[str, str]], dry_run: bool = False
685 | ) -> Tuple[str, str]:
686 | """Apply a series of text replacements to a file asynchronously.
687 |
688 | Reads the file (UTF-8), applies edits sequentially. If an exact match for
689 | 'oldText' isn't found, it attempts a line-by-line match ignoring leading/trailing
690 | whitespace, preserving the original indentation of the matched block.
691 | Generates a diff and optionally writes back the changes.
692 |
693 | Args:
694 | filepath: Path to the file to edit (assumed validated and is a text file).
695 | edits: List of edit operations. Each dict must have 'oldText' and 'newText' (both strings).
696 | dry_run: If True, calculate changes and diff but do not write to disk.
697 |
698 | Returns:
699 | Tuple of (diff_string, new_content_string). The diff string is empty if no changes occurred.
700 |
701 | Raises:
702 | ToolError: If reading/writing fails.
703 | ToolInputError: If edits are malformed or text specified in 'oldText' cannot be found.
704 | """
705 | # Read original content asynchronously (raises ToolError if fails)
706 | content = await read_file_content(filepath)
707 | original_content = content
708 | current_content = content # Work on a mutable copy
709 |
710 | # Apply each edit sequentially (string operations are sync)
711 | for i, edit in enumerate(edits):
712 | # Validate edit structure
713 | if not isinstance(edit, dict):
714 | raise ToolInputError(
715 | f"Edit #{i + 1} is not a dictionary.",
716 | param_name=f"edits[{i}]",
717 | provided_value=type(edit),
718 | )
719 | old_text = edit.get("oldText")
720 | new_text = edit.get("newText")
721 |
722 | if not isinstance(old_text, str):
723 | raise ToolInputError(
724 | f"Edit #{i + 1} is missing 'oldText' or it's not a string.",
725 | param_name=f"edits[{i}].oldText",
726 | provided_value=edit,
727 | )
728 | if not isinstance(new_text, str):
729 | raise ToolInputError(
730 | f"Edit #{i + 1} is missing 'newText' or it's not a string.",
731 | param_name=f"edits[{i}].newText",
732 | provided_value=edit,
733 | )
734 | # Warn about potentially ambiguous empty oldText
735 | if not old_text:
736 | logger.warning(
737 | f"Edit #{i + 1} has empty 'oldText'. Python's string.replace behavior with empty strings might be unexpected.",
738 | emoji_key="warning",
739 | )
740 |
741 | # Try exact replacement first
742 | if old_text in current_content:
743 | # Replace *all* occurrences of old_text with new_text
744 | current_content = current_content.replace(old_text, new_text)
745 | else:
746 | # Fallback: Try line-by-line matching with stripped whitespace comparison.
747 | old_lines = old_text.splitlines()
748 | # Avoid fallback if old_text was only whitespace but wasn't found exactly
749 | if not any(line.strip() for line in old_lines) and old_text:
750 | logger.warning(
751 | f"Edit #{i + 1}: 'oldText' consists only of whitespace, but was not found exactly. Skipping whitespace-insensitive fallback.",
752 | emoji_key="warning",
753 | )
754 | raise ToolInputError(
755 | f"Could not find exact whitespace text to replace in edit #{i + 1}: '{old_text[:100]}{'...' if len(old_text) > 100 else ''}'",
756 | param_name=f"edits[{i}].oldText",
757 | provided_value=edit,
758 | )
759 |
760 | if not old_lines: # If old_text was empty string and not found, error out.
761 | raise ToolInputError(
762 | f"Could not find empty 'oldText' to replace in edit #{i + 1}.",
763 | param_name=f"edits[{i}].oldText",
764 | provided_value=edit,
765 | )
766 |
767 | content_lines = current_content.splitlines()
768 | found_match = False
769 | line_idx = 0
770 | # Iterate through possible starting lines for the block match
771 | while line_idx <= len(content_lines) - len(old_lines):
772 | # Check if the slice matches ignoring leading/trailing whitespace on each line
773 | is_match = all(
774 | old_lines[j].strip() == content_lines[line_idx + j].strip()
775 | for j in range(len(old_lines))
776 | )
777 |
778 | if is_match:
779 | # Found a match based on content, now replace respecting original indentation.
780 | new_lines = new_text.splitlines()
781 |
782 | # Determine indentation from the *first original line* being replaced.
783 | first_original_line = content_lines[line_idx]
784 | leading_whitespace = first_original_line[
785 | : len(first_original_line) - len(first_original_line.lstrip())
786 | ]
787 |
788 | # Apply this leading whitespace to all *new* lines.
789 | indented_new_lines = (
790 | [leading_whitespace + line for line in new_lines] if new_lines else []
791 | )
792 |
793 | # Replace the slice in the original lines list
794 | content_lines[line_idx : line_idx + len(old_lines)] = indented_new_lines
795 | # Reconstruct content string immediately after modification
796 | current_content = "\n".join(content_lines)
797 | found_match = True
798 | # Stop searching for matches for *this specific edit dict* once one is found and replaced.
799 | # To replace all occurrences, logic would need modification (e.g., restart search or adjust indices).
800 | break
801 |
802 | else:
803 | line_idx += 1 # Move to the next line to check
804 |
805 | if not found_match:
806 | # No match found even with whitespace trimming fallback
807 | raise ToolInputError(
808 | f"Could not find text to replace in edit #{i + 1}. Text searched (approx first 100 chars): '{old_text[:100]}{'...' if len(old_text) > 100 else ''}'. "
809 | f"Exact match failed, and whitespace-insensitive line match also failed.",
810 | param_name=f"edits[{i}].oldText",
811 | provided_value=edit,
812 | )
813 |
814 | # Create diff (sync function call) using original vs final content
815 | diff = create_unified_diff(original_content, current_content, filepath)
816 |
817 | # Write the changes if not a dry run asynchronously
818 | if not dry_run:
819 | # Ensure content is string before writing
820 | await write_file_content(filepath, str(current_content)) # Handles errors internally
821 |
822 | return diff, str(current_content)
823 |
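async def _sketch_apply_file_edits_fallback(path: str) -> None:
    """Illustrative sketch (hypothetical helper and file content): if the file
    holds "    x = 1" / "    y = 2", the multi-line oldText below misses the
    exact substring match (indentation differs), matches line-by-line after
    stripping, and is re-indented on replacement to "    x = 10" / "    y = 20".
    """
    diff, _ = await apply_file_edits(
        path,
        edits=[{"oldText": "x = 1\ny = 2", "newText": "x = 10\ny = 20"}],
        dry_run=True,  # Produce the diff only; nothing is written to disk.
    )
    logger.debug(diff)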
824 |
825 | # --- MCP Formatting Helpers ---
826 |
827 |
828 | def format_mcp_content(text_content: str) -> List[Dict[str, Any]]:
829 | """Format text content into a simple MCP text block.
830 |
831 | Applies length truncation to avoid oversized responses.
832 |
833 | Args:
834 | text_content: Text to format.
835 |
836 | Returns:
837 | List containing a single MCP text block dictionary.
838 | """
839 | # Max length for content block to avoid overwhelming downstream systems.
840 | MAX_LEN = 100000 # Adjust based on LLM/platform constraints.
841 | if len(text_content) > MAX_LEN:
842 | # Truncate in place, appending a note with the original content length.
843 | trunc_msg = f"\n... [Content truncated - {len(text_content)} bytes total]"
844 | text_content = text_content[: MAX_LEN - len(trunc_msg)] + trunc_msg
845 | logger.warning(f"Content length > {MAX_LEN}, truncated.", emoji_key="warning")
846 |
847 | # Standard MCP text block structure.
848 | return [{"type": "text", "text": text_content}]
849 |
850 |
851 | def create_tool_response(content: Any, is_error: bool = False) -> Dict[str, Any]:
852 | """Create a standard tool response dictionary for the Ultimate MCP Server.
853 |
854 | Formats various content types (str, dict, list) into MCP 'content' blocks,
855 | attempting to provide useful representations for common tool outputs.
856 |
857 | Args:
858 | content: The primary result or message from the tool. Can be str, dict,
859 | list (assumed pre-formatted MCP), or other types (converted to str).
860 | is_error: If True, marks the response as an error response using "isError".
861 |
862 | Returns:
863 | A dictionary structured for the Ultimate MCP Server tool response schema.
864 | """
865 | formatted_content: List[Dict[str, Any]]
866 | response: Dict[str, Any] = {} # Initialize response dict
867 |
868 | if is_error:
869 | # --- Error Handling Logic ---
870 | response["success"] = False # Explicitly set success to False for errors
871 | response["isError"] = True # Mark as an error response
872 |
873 | if isinstance(content, dict) and "message" in content:
874 | error_message = content.get("message", "Unknown error")
875 | error_code = content.get("error_code", "TOOL_ERROR")
876 | error_type = content.get("error_type", "ToolError")
877 |
878 | response["error"] = error_message
879 | response["error_code"] = error_code
880 | response["error_type"] = error_type
881 |
882 | # Add details if available
883 | if "details" in content:
884 | response["details"] = content["details"]
885 |
886 | # Format the content text for error display, including context if available
887 | context = content.get("context") # Context might be added by the calling function
888 | if context:
889 | try:
890 | # Try to pretty-print context if it's dict/list
891 | if isinstance(context, (dict, list)):
892 | context_str = json.dumps(context, indent=2, default=str)
893 | else:
894 | context_str = str(context)
895 | error_display_text = f"Error: {error_message}\nContext: {context_str}"
896 | except Exception: # Fallback if context serialization fails
897 | error_display_text = f"Error: {error_message}\nContext: (Could not serialize context: {type(context).__name__})"
898 | else:
899 | error_display_text = f"Error: {error_message}"
900 |
901 | formatted_content = format_mcp_content(error_display_text)
902 |
903 | else: # Handle cases where error content is not a detailed dict
904 | error_message = str(content) # Use string representation of the error content
905 | formatted_content = format_mcp_content(f"Error: {error_message}")
906 | response["error"] = error_message
907 | # Provide default error codes/types if not available
908 | response["error_code"] = "UNKNOWN_ERROR"
909 | response["error_type"] = "UnknownError"
910 |
911 | # Add protectionTriggered flag if applicable (check original error dict if passed)
912 | if isinstance(content, dict) and content.get("protection_triggered"):
913 | response["protectionTriggered"] = True
914 |
915 | response["content"] = formatted_content
916 |
917 | else:
918 | # --- Success Case ---
919 | response["success"] = True  # Explicitly mark the success case
920 |
921 | # --- Success content formatting ---
922 | try:
923 | if isinstance(content, dict):
924 | # Handle specific known dictionary structures first
925 | if (
926 | "files" in content
927 | and isinstance(content.get("files"), list)
928 | and "succeeded" in content
929 | ):
930 | # Format output from read_multiple_files
931 | blocks = []
932 | summary = f"Read {content.get('succeeded', 0)} files successfully, {content.get('failed', 0)} failed."
933 | blocks.append({"type": "text", "text": summary})
934 | for file_result in content.get("files", []):
935 | if isinstance(file_result, dict):
936 | path = file_result.get("path", "Unknown path")
937 | if file_result.get("success") and "content" in file_result:
938 | size_info = (
939 | f" ({file_result.get('size', 'N/A')} bytes)"
940 | if "size" in file_result
941 | else ""
942 | )
943 | binary_info = (
944 | " (Binary Preview)" if file_result.get("is_binary") else ""
945 | )
946 | header = f"--- File: {path}{size_info}{binary_info} ---"
947 | file_content_str = str(file_result["content"])
948 | blocks.extend(format_mcp_content(f"{header}\n{file_content_str}"))
949 | elif "error" in file_result:
950 | header = f"--- File: {path} (Error) ---"
951 | error_msg = str(file_result["error"])
952 | blocks.extend(format_mcp_content(f"{header}\n{error_msg}"))
953 | else:
954 | blocks.extend(
955 | format_mcp_content(
956 | f"--- File: {path} (Unknown status) ---\n{str(file_result)}"
957 | )
958 | )
959 | else:
960 | blocks.extend(
961 | format_mcp_content(
962 | f"--- Invalid entry in results ---\n{str(file_result)}"
963 | )
964 | )
965 | formatted_content = blocks
966 | elif "tree" in content and "path" in content:
967 | # Format output from directory_tree as JSON block
968 | tree_json = json.dumps(
969 | content["tree"], indent=2, ensure_ascii=False, default=str
970 | )
971 | MAX_JSON_LEN = 50000
972 | if len(tree_json) > MAX_JSON_LEN:
973 | trunc_msg = "\n... [Tree JSON truncated]"
974 | tree_json = tree_json[: MAX_JSON_LEN - len(trunc_msg)] + trunc_msg
975 | formatted_content = format_mcp_content(
976 | f"Directory Tree for: {content['path']}\n```json\n{tree_json}\n```"
977 | )
978 | elif "entries" in content and "path" in content:
979 | # Format output from list_directory
980 | list_strs = [f"Directory Listing for: {content['path']}"]
981 | for entry in content.get("entries", []):
982 | if isinstance(entry, dict):
983 | name = entry.get("name", "?")
984 | etype = entry.get("type", "unknown")
985 | size_str = (
986 | f" ({entry.get('size')} bytes)"
987 | if etype == "file" and "size" in entry
988 | else ""
989 | )
990 | error_str = (
991 | f" (Error: {entry.get('error')})" if "error" in entry else ""
992 | )
993 | link_str = (
994 | f" -> {entry.get('symlink_target')}"
995 | if etype == "symlink" and "symlink_target" in entry
996 | else ""
997 | )
998 | list_strs.append(f"- {name} [{etype}]{size_str}{error_str}{link_str}")
999 | else:
1000 | list_strs.append(f"- Invalid entry: {str(entry)}")
1001 | formatted_content = format_mcp_content("\n".join(list_strs))
1002 | elif "matches" in content and "path" in content and "pattern" in content:
1003 | # Format output from search_files
1004 | search_strs = [
1005 | f"Search Results for '{content['pattern']}' in '{content['path']}':"
1006 | ]
1007 | matches = content.get("matches", [])
1008 | if matches:
1009 | for match in matches:
1010 | search_strs.append(f"- {match}")
1011 | else:
1012 | search_strs.append("(No matches found)")
1013 | if "warnings" in content:
1014 | search_strs.append("\nWarnings:")
1015 | search_strs.extend(content["warnings"])
1016 | formatted_content = format_mcp_content("\n".join(search_strs))
1017 | else:
1018 | # Attempt to pretty-print other dictionaries as JSON
1019 | json_content = json.dumps(content, indent=2, ensure_ascii=False, default=str)
1020 | formatted_content = format_mcp_content(f"```json\n{json_content}\n```")
1021 |
1022 | elif isinstance(content, str):
1023 | # Simple string content
1024 | formatted_content = format_mcp_content(content)
1025 | elif isinstance(content, list) and all(
1026 | isinstance(item, dict) and "type" in item for item in content
1027 | ):
1028 | # Assume it's already MCP formatted - pass through directly
1029 | formatted_content = content
1030 | else:
1031 | # Convert anything else to string and format
1032 | formatted_content = format_mcp_content(str(content))
1033 |
1034 | # Ensure formatted_content is valid before adding to response
1035 | if isinstance(formatted_content, list):
1036 | response["content"] = formatted_content
1037 | else:
1038 | # Fallback if formatting somehow failed during success path
1039 | fallback_text = f"Successfully processed, but failed to format response content: {str(formatted_content)}"
1040 | logger.error(fallback_text) # Log this internal error
1041 | response["content"] = format_mcp_content(fallback_text)
1042 | response["warning"] = "Response content formatting failed." # Add a warning field
1043 |
1044 | except (TypeError, ValueError) as json_err: # Catch JSON serialization errors specifically
1045 | logger.warning(
1046 | f"Could not serialize successful dictionary response to JSON: {json_err}",
1047 | exc_info=True,
1048 | emoji_key="warning",
1049 | )
1050 | formatted_content = format_mcp_content(
1051 | f"(Response dictionary could not be formatted as JSON)\n{str(content)}"
1052 | )
1053 | response["content"] = formatted_content
1054 | response["warning"] = "Could not format successful response as JSON."
1055 | except Exception as format_err: # Catch any other formatting errors
1056 | logger.error(
1057 | f"Unexpected error formatting successful response content: {format_err}",
1058 | exc_info=True,
1059 | )
1060 | response["content"] = format_mcp_content(f"Error formatting response: {format_err}")
1061 | response["warning"] = "Unexpected error formatting response content."
1062 |
1063 | return response
1064 |
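# Doctest-style sketch of the two top-level response shapes (reprs abridged):
#
# >>> create_tool_response("done")["success"]
# True
# >>> resp = create_tool_response({"message": "boom"}, is_error=True)
# >>> resp["isError"], resp["error"], resp["content"][0]["text"]
# (True, 'boom', 'Error: boom')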
1065 |
1066 | # --- Protection Heuristics Implementation ---
1067 |
1068 |
1069 | async def _get_minimal_stat(path: str) -> Optional[Tuple[Tuple[float, float], str]]:
1070 | """Helper to get minimal stat info (ctime, mtime, extension) for protection checks."""
1071 | try:
1072 | # Use stat with follow_symlinks=False to get info without following links, as we care about the items being listed
1073 | stat_info = await aiofiles.os.stat(path, follow_symlinks=False)
1074 | # Use mtime and ctime (platform-dependent creation/metadata change time)
1075 | mtime = stat_info.st_mtime
1076 | try:
1077 | ctime = stat_info.st_ctime
1078 | except AttributeError:
1079 | ctime = mtime # Fallback if ctime not available
1080 |
1081 | extension = (
1082 | os.path.splitext(path)[1].lower()
1083 | if not os.path.stat.S_ISDIR(stat_info.st_mode)
1084 | else ".<dir>"
1085 | )
1086 | return ((ctime, mtime), extension)
1087 | except OSError as e:
1088 | logger.debug(f"Stat failed for protection check on {path}: {e}", emoji_key="warning")
1089 | return None # Return None on OS error
1090 |
1091 |
1092 | async def _check_protection_heuristics(
1093 | paths: List[str], operation_type: Literal["deletion", "modification"]
1094 | ) -> None:
1095 | """
1096 | Check if a bulk operation on multiple files triggers safety protection heuristics.
1097 | Thresholds are read via get_protection_config(operation_type), which pulls from the validated config.
1098 | """
1099 | protection_config = get_protection_config(operation_type)
1100 |
1101 | if not protection_config.get("enabled", False):
1102 | return # Protection disabled for this operation type
1103 |
1104 | num_paths = len(paths)
1105 | # --- Read thresholds from the loaded config dictionary ---
1106 | max_files_threshold = protection_config.get("max_files_threshold", 100)
1107 |
1108 | # Only run detailed checks if the number of files exceeds the threshold
1109 | if num_paths <= max_files_threshold:
1110 | return
1111 |
1112 | logger.info(
1113 | f"Performing detailed safety check for {operation_type} of {num_paths} paths (threshold: {max_files_threshold})...",
1114 | emoji_key="security",
1115 | )
1116 |
1117 | # --- Gather Metadata Asynchronously ---
1118 | stat_tasks = [_get_minimal_stat(p) for p in paths]
1119 | stat_results = await asyncio.gather(*stat_tasks)
1120 |
1121 | successful_stats: List[Tuple[Tuple[float, float], str]] = []
1122 | failed_stat_count = 0
1123 | for result in stat_results:
1124 | if result is not None:
1125 | successful_stats.append(result)
1126 | else:
1127 | failed_stat_count += 1
1128 |
1129 | total_attempted = len(paths)
1130 | # --- Read threshold from config dict ---
1131 | max_errors_pct = protection_config.get("max_stat_errors_pct", 10.0)
1132 | if total_attempted > 0 and (failed_stat_count / total_attempted * 100) > max_errors_pct:
1133 | raise ProtectionTriggeredError(
1134 | f"Operation blocked because safety check could not reliably gather file metadata ({failed_stat_count}/{total_attempted} failures).",
1135 | context={"failed_stats": failed_stat_count, "total_paths": total_attempted},
1136 | )
1137 |
1138 | num_valid_stats = len(successful_stats)
1139 | if num_valid_stats < 2:
1140 | logger.info(
1141 | "Protection check skipped: Not enough valid file metadata points obtained.",
1142 | emoji_key="info",
1143 | )
1144 | return
1145 |
1146 | # --- Calculate Heuristics ---
1147 | creation_times = [ts[0] for ts, ext in successful_stats]
1148 | modification_times = [ts[1] for ts, ext in successful_stats]
1149 | extensions = {ext for ts, ext in successful_stats if ext and ext != ".<dir>"}
1150 |
1151 | try:
1152 | ctime_stddev = statistics.pstdev(creation_times) if num_valid_stats > 1 else 0.0
1153 | mtime_stddev = statistics.pstdev(modification_times) if num_valid_stats > 1 else 0.0
1154 | if math.isnan(ctime_stddev):
1155 | ctime_stddev = 0.0
1156 | if math.isnan(mtime_stddev):
1157 | mtime_stddev = 0.0
1158 | except statistics.StatisticsError as e:
1159 | logger.warning(
1160 | f"Could not calculate timestamp standard deviation: {e}", emoji_key="warning"
1161 | )
1162 | ctime_stddev = 0.0
1163 | mtime_stddev = 0.0
1164 |
1165 | num_unique_extensions = len(extensions)
1166 |
1167 | # --- Read thresholds from config dict ---
1168 | dt_threshold = protection_config.get("datetime_stddev_threshold_sec", 60 * 60 * 24 * 30)
1169 | type_threshold = protection_config.get("file_type_variance_threshold", 5)
1170 |
1171 | triggered = False
1172 | reasons = []
1173 |
1174 | if ctime_stddev > dt_threshold:
1175 | triggered = True
1176 | reasons.append(
1177 | f"High variance in creation times (std dev: {ctime_stddev:.2f}s > threshold: {dt_threshold}s)"
1178 | )
1179 | if mtime_stddev > dt_threshold:
1180 | triggered = True
1181 | reasons.append(
1182 | f"High variance in modification times (std dev: {mtime_stddev:.2f}s > threshold: {dt_threshold}s)"
1183 | )
1184 | if num_unique_extensions > type_threshold:
1185 | triggered = True
1186 | reasons.append(
1187 | f"High variance in file types ({num_unique_extensions} unique types > threshold: {type_threshold})"
1188 | )
1189 |
1190 | if triggered:
1191 | reason_str = (
1192 | f"Operation involves a large number of files ({num_paths}) with suspicious characteristics: "
1193 | + "; ".join(reasons)
1194 | )
1195 | logger.warning(f"Protection Triggered! Reason: {reason_str}", emoji_key="security")
1196 | raise ProtectionTriggeredError(
1197 | reason_str,
1198 | protection_type=f"{operation_type}_protection", # Add protection type
1199 | context={ # Use context for structured data
1200 | "num_paths": num_paths,
1201 | "num_valid_stats": num_valid_stats,
1202 | "ctime_stddev_sec": round(ctime_stddev, 2),
1203 | "mtime_stddev_sec": round(mtime_stddev, 2),
1204 | "unique_file_types": num_unique_extensions,
1205 | "threshold_max_files": max_files_threshold,
1206 | "threshold_datetime_stddev_sec": dt_threshold,
1207 | "threshold_file_type_variance": type_threshold,
1208 | "failed_stat_count": failed_stat_count,
1209 | },
1210 | )
1211 | else:
1212 | logger.info(
1213 | f"Safety check passed for {operation_type} of {num_paths} paths.",
1214 | emoji_key="security",
1215 | ctime_stddev=round(ctime_stddev, 2),
1216 | mtime_stddev=round(mtime_stddev, 2),
1217 | unique_types=num_unique_extensions,
1218 | )
1219 |
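# Worked sketch of the heuristic (hypothetical numbers): deleting 150 paths
# exceeds the default max_files_threshold of 100, so metadata is sampled.
# Creation times spread across ~2 years give a pstdev far above the ~30-day
# default (60 * 60 * 24 * 30 = 2,592,000 s) and raise ProtectionTriggeredError,
# while 150 same-day ".log" files (one unique extension) pass all three checks.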
1220 |
1221 | # --- Async Walk and Delete Helpers ---
1222 |
1223 |
1224 | async def async_walk(
1225 | top: str,
1226 | topdown: bool = True,
1227 | onerror: Optional[callable] = None,
1228 | followlinks: bool = False,
1229 | exclude_patterns: Optional[List[str]] = None,
1230 | base_path: Optional[str] = None,
1231 | ) -> AsyncGenerator[Tuple[str, List[str], List[str]], None]:
1232 | """Async version of os.walk using aiofiles.os.scandir. Handles excludes.
1233 |
1234 | Yields (current_dir_path, dir_names, file_names) tuples asynchronously.
1235 | Filters entries based on exclude_patterns matched against relative path from base_path.
1236 | Handles errors during scanning via the onerror callback.
1237 |
1238 | Args:
1239 | top: Root directory path to start walking.
1240 | topdown: Whether to yield the current directory before (True) or after (False) its subdirectories.
1241 | onerror: Optional callable that takes an OSError instance when errors occur during scandir or stat.
1242 | followlinks: If True, recurse into directories pointed to by symlinks.
1243 | exclude_patterns: List of glob-style patterns to exclude files/directories.
1244 | base_path: The original starting path of the walk (used for relative path calculation).
1245 |
1246 | Yields:
1247 | Tuples of (directory_path, list_of_subdir_names, list_of_file_names)
1248 | """
1249 | if base_path is None:
1250 | base_path = top # Keep track of the original root for exclusion matching
1251 |
1252 | dirs: List[str] = []
1253 | nondirs: List[str] = []
1254 | walk_error: Optional[OSError] = None
1255 |
1256 | try:
1257 |         # Use async scandir for efficient directory iteration. aiofiles wraps
1258 |         # os.scandir in a worker thread; awaiting it returns the entries.
1259 |         scandir_it = await aiofiles.os.scandir(top)
1260 |         # Iterate the returned DirEntry objects synchronously.
1261 | for entry in scandir_it:
1262 | try:
1263 | # Calculate relative path for exclusion check
1264 | try:
1265 | # entry.path should be absolute if 'top' is absolute
1266 | rel_path = os.path.relpath(entry.path, base_path)
1267 | except ValueError:
1268 | # Fallback if paths are on different drives (Windows) or other issues
1269 | rel_path = entry.name # Use name as fallback for matching
1270 | logger.debug(
1271 | f"Could not get relative path for {entry.path} from {base_path}, using name '{entry.name}' for exclusion check."
1272 | )
1273 |
1274 | # Check exclusion patterns (case-insensitive matching where appropriate)
1275 | is_excluded = False
1276 | if exclude_patterns:
1277 | norm_rel_path = os.path.normcase(rel_path)
1278 | norm_entry_name = os.path.normcase(entry.name)
1279 | for pattern in exclude_patterns:
1280 | norm_pattern = os.path.normcase(pattern)
1281 | # Match pattern against full relative path OR just the name
1282 | if fnmatch(norm_rel_path, norm_pattern) or fnmatch(
1283 | norm_entry_name, norm_pattern
1284 | ):
1285 | is_excluded = True
1286 | logger.debug(f"Excluding '{entry.path}' due to pattern '{pattern}'")
1287 | break # Stop checking patterns for this entry
1288 | if is_excluded:
1289 | continue # Skip this excluded entry
1290 |
1291 | # Check entry type, handling potential errors using lstat to check link itself
1292 | try:
1293 | # Determine if it's a directory (respecting followlinks for recursion decision later)
1294 | is_entry_dir = entry.is_dir(follow_symlinks=followlinks)
1295 | is_entry_link = entry.is_symlink() # Check if entry *itself* is a link
1296 |
1297 | if is_entry_dir and (not is_entry_link or followlinks):
1298 | # It's a directory, or it's a link to a directory and we follow links
1299 | dirs.append(entry.name)
1300 | else:
1301 | # It's a file, a link we don't follow, or something else
1302 | nondirs.append(entry.name)
1303 |
1304 | except OSError as stat_err:
1305 | # Error determining type (e.g., permissions)
1306 | if onerror is not None:
1307 | onerror(stat_err)
1308 | logger.warning(
1309 | f"Skipping entry '{entry.name}' in {top} due to stat error: {stat_err}",
1310 | emoji_key="warning",
1311 | )
1312 | continue # Skip entry if type cannot be determined
1313 |
1314 | except OSError as entry_proc_err:
1315 | # Error during exclusion check or other processing of the entry itself
1316 | if onerror is not None:
1317 | onerror(entry_proc_err)
1318 | logger.warning(
1319 | f"Error processing entry '{entry.name}' in {top}: {entry_proc_err}",
1320 | emoji_key="warning",
1321 | )
1322 | # Continue processing other entries in the directory
1323 |
1324 | except OSError as err:
1325 | # Error during the initial scandir call itself (e.g., permissions on 'top')
1326 | walk_error = err
1327 | if onerror is not None:
1328 | onerror(err)
1329 | # Stop iteration for this path if scandir failed
1330 |
1331 | # --- Yield results and recurse ---
1332 | if walk_error is None:
1333 | if topdown:
1334 | yield top, dirs, nondirs
1335 |
1336 | # Recurse into subdirectories discovered
1337 | for name in dirs:
1338 | new_path = os.path.join(top, name)
1339 | # Recurse using 'async for' to delegate iteration properly down the recursion
1340 | async for x in async_walk(
1341 | new_path, topdown, onerror, followlinks, exclude_patterns, base_path
1342 | ):
1343 | yield x
1344 |
1345 | if not topdown:
1346 | yield top, dirs, nondirs
1347 |
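# Editor's sketch (hypothetical): counting files under a root with async_walk,
# skipping common noise directories. The exclude patterns shown are examples,
# not defaults defined by this module.
async def _example_count_files(root: str) -> int:
    count = 0
    async for _dirpath, _dirnames, filenames in async_walk(
        root, topdown=True, exclude_patterns=[".git", "__pycache__", "*.pyc"]
    ):
        count += len(filenames)
    return count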
1348 |
1349 | async def _list_paths_recursive(root_path: str) -> List[str]:
1350 | """Recursively list all file paths within a directory using async_walk."""
1351 | paths = []
1352 | try:
1353 | async for dirpath, _dirnames, filenames in async_walk(
1354 | root_path, topdown=True, followlinks=False
1355 | ):
1356 | for filename in filenames:
1357 | paths.append(os.path.join(dirpath, filename))
1358 |         # Only file paths are collected: the deletion-protection heuristics
1359 |         # operate on file metadata, so directory entries are intentionally
1360 |         # omitted (see the commented-out lines below).
1361 | # for dirname in dirnames:
1362 | # paths.append(os.path.join(dirpath, dirname))
1363 | except Exception as e:
1364 | logger.error(
1365 | f"Error listing paths recursively under {root_path}: {e}",
1366 | exc_info=True,
1367 | emoji_key="error",
1368 | )
1369 | # Re-raise as ToolError so the caller knows listing failed
1370 | raise ToolError(
1371 | f"Failed to list contents of directory '{root_path}' for safety check: {e}"
1372 | ) from e
1373 | return paths
1374 |
1375 |
1376 | async def _async_rmtree(path: str):
1377 | """Asynchronously remove a directory and its contents, similar to shutil.rmtree."""
1378 | logger.debug(f"Initiating async rmtree for: {path}", emoji_key="action")
1379 | errors: List[Tuple[str, OSError]] = []
1380 |
1381 | def onerror(os_error: OSError):
1382 | logger.warning(
1383 | f"Error during rmtree operation on {getattr(os_error, 'filename', 'N/A')}: {os_error}",
1384 | emoji_key="warning",
1385 | )
1386 | errors.append((getattr(os_error, "filename", "N/A"), os_error))
1387 |
1388 | try:
1389 | # Walk bottom-up to remove files first, then directories
1390 | async for root, dirs, files in async_walk(
1391 | path, topdown=False, onerror=onerror, followlinks=False
1392 | ):
1393 | # Remove files in the current directory
1394 | for name in files:
1395 | filepath = os.path.join(root, name)
1396 | try:
1397 | logger.debug(f"Removing file: {filepath}", emoji_key="delete")
1398 | await aiofiles.os.remove(filepath)
1399 | except OSError as e:
1400 | onerror(e) # Log error and collect it
1401 |
1402 | # Remove empty subdirectories (should be empty now if walk is bottom-up)
1403 | for name in dirs:
1404 | dirpath = os.path.join(root, name)
1405 | # We only remove dirs listed in the walk *if* they still exist
1406 | # (e.g., link handling might affect this). Re-check existence and type.
1407 | try:
1408 | if await aiofiles.os.path.islink(dirpath):
1409 | # Remove symlink itself if not following links
1410 | logger.debug(
1411 | f"Removing symlink (treated as file): {dirpath}", emoji_key="delete"
1412 | )
1413 | await aiofiles.os.remove(dirpath)
1414 | elif await aiofiles.os.path.isdir(dirpath):
1415 | logger.debug(f"Removing directory: {dirpath}", emoji_key="delete")
1416 | await aiofiles.os.rmdir(dirpath)
1417 | except OSError as e:
1418 | onerror(e) # Log error and collect it
1419 |
1420 | # Finally, remove the top-level directory itself
1421 | try:
1422 | logger.debug(f"Removing root directory: {path}", emoji_key="delete")
1423 | await aiofiles.os.rmdir(path)
1424 | except OSError as e:
1425 | onerror(e)
1426 |
1427 | if errors:
1428 | # Raise a consolidated error if any deletions failed
1429 | error_summary = "; ".join([f"{p}: {e.strerror}" for p, e in errors[:5]])
1430 | if len(errors) > 5:
1431 | error_summary += " ... (more errors)"
1432 | raise ToolError(
1433 | f"Errors occurred during recursive deletion of '{path}': {error_summary}",
1434 | context={"path": path, "num_errors": len(errors)},
1435 | )
1436 |
1437 | except Exception as e:
1438 | logger.error(
1439 | f"Unexpected error during async rmtree of {path}: {e}", exc_info=True, emoji_key="error"
1440 | )
1441 | raise ToolError(
1442 | f"An unexpected error occurred during recursive deletion: {str(e)}",
1443 | context={"path": path},
1444 | ) from e
1445 |
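# Editor's sketch (hypothetical): building a small scratch tree and removing it
# with _async_rmtree. Assumes scratch_root lies inside an allowed directory.
async def _example_rmtree(scratch_root: str) -> None:
    nested = os.path.join(scratch_root, "a", "b")
    await aiofiles.os.makedirs(nested, exist_ok=True)
    async with aiofiles.open(os.path.join(nested, "f.txt"), "w") as f:
        await f.write("temporary contents")
    await _async_rmtree(os.path.join(scratch_root, "a"))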
1446 |
1447 | # --- Tool Functions ---
1448 |
1449 |
1450 | @with_tool_metrics
1451 | @with_error_handling
1452 | async def read_file(path: str) -> Dict[str, Any]:
1453 | """Read a file's content asynchronously, handling text/binary detection.
1454 |
1455 | Validates the path, checks it's a file, attempts to read as UTF-8 text.
1456 | If UTF-8 decoding fails, it reads as binary and provides a hex preview.
1457 |
1458 | Args:
1459 | path: Path to the file to read.
1460 |
1461 | Returns:
1462 | A dictionary formatted as an MCP tool response containing file content
1463 | or an error message.
1464 | """
1465 | start_time = time.monotonic()
1466 | response_content: Any
1467 | is_response_error = False
1468 |
1469 | try:
1470 | # Validate path, ensuring it exists and is accessible. check_exists=True
1471 | validated_path = await validate_path(path, check_exists=True, check_parent_writable=False)
1472 |
1473 | # Ensure it's a file, not a directory or other type.
1474 | if not await aiofiles.os.path.isfile(validated_path):
1475 | # Check if it's a link first before declaring it's not a regular file
1476 | if await aiofiles.os.path.islink(validated_path):
1477 |                 # isfile() follows symlinks, so if it returned False for a link,
1478 |                 # the link's target is not a regular file; report that specifically.
1479 | raise ToolInputError(
1480 | f"Path '{path}' is a symbolic link that points to something that is not a regular file.",
1481 | param_name="path",
1482 | provided_value=path,
1483 | details={
1484 | "path": path,
1485 | "resolved_path": validated_path,
1486 | "error_type": "INVALID_SYMLINK_TARGET",
1487 | },
1488 | )
1489 | elif await aiofiles.os.path.isdir(validated_path):
1490 | raise ToolInputError(
1491 | f"Path '{path}' is a directory, not a file. Use list_directory or directory_tree to view its contents.",
1492 | param_name="path",
1493 | provided_value=path,
1494 | details={
1495 | "path": path,
1496 | "resolved_path": validated_path,
1497 | "error_type": "PATH_IS_DIRECTORY",
1498 | },
1499 | )
1500 | else:
1501 | raise ToolInputError(
1502 | f"Path '{path}' exists but is not a regular file. It may be a special file type (socket, device, etc.).",
1503 | param_name="path",
1504 | provided_value=path,
1505 | details={
1506 | "path": path,
1507 | "resolved_path": validated_path,
1508 | "error_type": "PATH_NOT_REGULAR_FILE",
1509 | },
1510 | )
1511 |
1512 | content: Union[str, bytes]
1513 | is_binary = False
1514 | read_error = None
1515 | file_size = -1 # Default size if stat fails later
1516 |
1517 | # Attempt to read as text first
1518 | try:
1519 | content = await read_file_content(validated_path) # Handles UTF-8 check internally
1520 | except ToolError as text_err:
1521 | # Check if the error was specifically UnicodeDecodeError
1522 | if "not valid UTF-8" in str(text_err):
1523 | logger.warning(
1524 | f"File {path} is not UTF-8 encoded, reading as binary.", emoji_key="warning"
1525 | )
1526 | is_binary = True
1527 | try:
1528 | # Fallback to binary read
1529 | content = await read_binary_file_content(
1530 | validated_path
1531 | ) # Keep raw bytes for now
1532 | except ToolError as bin_err:
1533 | read_error = (
1534 | f"Error reading file as binary after text decode failed: {str(bin_err)}"
1535 | )
1536 | except Exception as bin_e:
1537 | read_error = f"Unexpected error reading file as binary: {str(bin_e)}"
1538 | else:
1539 | # Other OS read error during text read attempt
1540 | read_error = f"Error reading file: {str(text_err)}"
1541 | except Exception as text_e:
1542 | read_error = f"Unexpected error reading file as text: {str(text_e)}"
1543 |
1544 | if read_error:
1545 | # If reading failed entirely (text and binary fallback)
1546 | raise ToolError(read_error, context={"path": validated_path})
1547 |
1548 | # Successfully read content (either text or binary bytes)
1549 | try:
1550 | # Use stat with follow_symlinks=False for consistency
1551 | file_size = (await aiofiles.os.stat(validated_path, follow_symlinks=False)).st_size
1552 | except OSError as stat_err:
1553 | logger.warning(
1554 | f"Could not get size for file {validated_path} after reading: {stat_err}",
1555 | emoji_key="warning",
1556 | )
1557 | # Continue without size info
1558 |
1559 | # Prepare response content string
1560 | basename = os.path.basename(validated_path)
1561 | size_str = f"{file_size} bytes" if file_size >= 0 else "Size unavailable"
1562 |
1563 | if is_binary:
1564 | # Provide a safe representation for binary data
1565 | binary_data = cast(bytes, content)
1566 | hex_preview_len = 200 # Bytes to show as hex
1567 | hex_preview = binary_data[:hex_preview_len].hex(" ") # Use spaces for readability
1568 | preview_msg = f"(showing first {min(hex_preview_len, len(binary_data))} bytes as hex)"
1569 | ellipsis = "..." if len(binary_data) > hex_preview_len else ""
1570 | response_text = (
1571 | f"File: {basename}\n"
1572 | f"Path: {validated_path}\n"
1573 | f"Size: {size_str}\n"
1574 | f"Content: <Binary file detected> {preview_msg}\n"
1575 | f"{hex_preview}{ellipsis}"
1576 | )
1577 | else:
1578 | # Content is already string
1579 | response_text = (
1580 | f"File: {basename}\n"
1581 | f"Path: {validated_path}\n"
1582 | f"Size: {size_str}\n"
1583 | f"Content:\n" # Add newline for separation
1584 | f"{content}"
1585 | )
1586 |
1587 | response_content = response_text
1588 | processing_time = time.monotonic() - start_time
1589 | logger.success(
1590 | f"Successfully read file: {path}",
1591 | emoji_key="file",
1592 | size=file_size,
1593 | time=processing_time,
1594 | is_binary=is_binary,
1595 | )
1596 |
1597 | except (ToolInputError, ToolError) as e:
1598 | logger.error(
1599 | f"Error in read_file for '{path}': {e}",
1600 | emoji_key="error",
1601 | details=getattr(e, "context", None),
1602 | )
1603 | # Return a formatted error response with detailed info
1604 | error_type = e.__class__.__name__
1605 | error_details = getattr(e, "details", {}) or {}
1606 | # Get the context from the error if available (used in base ToolError)
1607 | context = getattr(e, "context", None)
1608 | if context and isinstance(context, dict):
1609 | error_details.update(context)
1610 | # Include error type and code for better error display
1611 | response_content = {
1612 | "message": str(e),
1613 | "error_code": getattr(e, "error_code", "TOOL_ERROR"),
1614 | "error_type": error_type,
1615 | "details": error_details,
1616 | }
1617 | is_response_error = True
1618 | except FileNotFoundError as e:
1619 | # Specific error for file not found
1620 | raise ToolInputError(
1621 | f"File not found: {path}",
1622 | param_name="path",
1623 | provided_value=path,
1624 | details={"errno": e.errno, "error_type": "PATH_NOT_FOUND"},
1625 | ) from e
1626 | except IsADirectoryError as e:
1627 | # Specific error for path being a directory
1628 | raise ToolInputError(
1629 | f"Path is a directory, not a file: {path}",
1630 | param_name="path",
1631 | provided_value=path,
1632 | details={"errno": e.errno, "error_type": "PATH_IS_DIRECTORY"},
1633 | ) from e
1634 | except PermissionError as e:
1635 | # Specific error for permissions
1636 | raise ToolInputError(
1637 | f"Permission denied reading file: {path}",
1638 | param_name="path",
1639 | provided_value=path,
1640 | details={"errno": e.errno, "error_type": "PERMISSION_DENIED"},
1641 | ) from e
1642 | except UnicodeDecodeError as e:
1643 | # This is handled in read_file_content now, but keep check here just in case
1644 | raise ToolError(
1645 | f"File is not valid UTF-8 encoded text: {validated_path}. Details: {e}",
1646 | context={"path": validated_path, "encoding": "utf-8"},
1647 | ) from e
1648 | except OSError as e:
1649 | # General OS error during read
1650 | raise ToolError(
1651 | f"OS error reading file: {str(e)}", context={"path": validated_path, "errno": e.errno}
1652 | ) from e
1653 | except Exception as e:
1654 | logger.error(
1655 | f"Unexpected error in read_file for '{path}': {e}", exc_info=True, emoji_key="error"
1656 | )
1657 | response_content = {
1658 | "message": f"An unexpected error occurred while reading '{path}': {str(e)}",
1659 | "error_code": "UNEXPECTED_ERROR",
1660 | "error_type": type(e).__name__,
1661 | "details": {"error_class": type(e).__name__, "path": path},
1662 | }
1663 | is_response_error = True
1664 |
1665 | # Use create_tool_response for consistent formatting of success/error messages.
1666 | return create_tool_response(response_content, is_error=is_response_error)
1667 |
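# Editor's sketch (hypothetical): the text-first / binary-fallback pattern that
# read_file implements above, condensed to plain aiofiles calls (no path
# validation or response formatting).
async def _example_text_or_hex(path: str) -> str:
    async with aiofiles.open(path, "rb") as f:
        raw = await f.read()
    try:
        return raw.decode("utf-8")
    except UnicodeDecodeError:
        return "<binary> " + raw[:200].hex(" ")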
1668 |
1669 | @with_tool_metrics
1670 | @with_error_handling
1671 | async def read_multiple_files(paths: List[str]) -> Dict[str, Any]:
1672 | """Read the contents of multiple files asynchronously and concurrently.
1673 |
1674 | Validates each path and attempts to read each file (text/binary),
1675 | handling individual errors gracefully.
1676 |
1677 | Args:
1678 | paths: A list of file paths to read.
1679 |
1680 | Returns:
1681 | A dictionary summarizing the results (suitable for create_tool_response),
1682 | including success/failure counts and content/errors for each file.
1683 | The operation itself succeeds unless input validation fails.
1684 | """
1685 | start_time = time.monotonic()
1686 |
1687 | # Input validation
1688 | if not isinstance(paths, list):
1689 | raise ToolInputError(
1690 | "Input must be a list of paths.", param_name="paths", provided_value=type(paths)
1691 | )
1692 | if not paths: # Handle empty list input explicitly
1693 | logger.info("read_multiple_files called with empty list.", emoji_key="info")
1694 | return {
1695 | "files": [],
1696 | "succeeded": 0,
1697 | "failed": 0,
1698 | "success": True,
1699 | "message": "No paths provided to read.",
1700 | }
1701 | if not all(isinstance(p, str) for p in paths):
1702 | invalid_path = next((p for p in paths if not isinstance(p, str)), None)
1703 | raise ToolInputError(
1704 | "All items in the 'paths' list must be strings.",
1705 | param_name="paths",
1706 | provided_value=f"List contains element of type {type(invalid_path)}",
1707 | )
1708 |
1709 | # --- Inner Task Definition ---
1710 | async def read_single_file_task(path: str) -> Dict[str, Any]:
1711 | """Task to read and process a single file for read_multiple_files."""
1712 | task_result: Dict[str, Any] = {"path": path, "success": False} # Initialize result dict
1713 | validated_path: Optional[str] = None
1714 | try:
1715 | validated_path = await validate_path(
1716 | path, check_exists=True, check_parent_writable=False
1717 | )
1718 | task_result["path"] = validated_path # Update path in result if validation succeeds
1719 |
1720 | # check isfile (follows links)
1721 | if not await aiofiles.os.path.isfile(validated_path):
1722 | # Check if link before failing
1723 | if await aiofiles.os.path.islink(validated_path):
1724 | task_result["error"] = "Path is a link, but does not point to a regular file"
1725 | else:
1726 | task_result["error"] = "Path exists but is not a regular file"
1727 | return task_result
1728 |
1729 | read_error = None
1730 | file_size = -1
1731 |
1732 | # Try reading as text (UTF-8)
1733 | try:
1734 | content_str = await read_file_content(validated_path)
1735 | task_result["content"] = content_str
1736 | except ToolError as text_err:
1737 | if "not valid UTF-8" in str(text_err):
1738 | try:
1739 | binary_content = await read_binary_file_content(validated_path)
1740 | # For multi-read, just provide preview directly in result content
1741 | hex_preview_len = 200
1742 | hex_preview = binary_content[:hex_preview_len].hex(" ")
1743 | ellipsis = "..." if len(binary_content) > hex_preview_len else ""
1744 | preview_msg = f"<binary file detected, hex preview (first {min(hex_preview_len, len(binary_content))} bytes)>: {hex_preview}{ellipsis}"
1745 | task_result["content"] = preview_msg
1746 | task_result["is_binary"] = True
1747 | except ToolError as bin_err:
1748 | read_error = f"Error reading as binary: {str(bin_err)}"
1749 | except Exception as bin_e:
1750 | read_error = f"Unexpected error reading as binary: {str(bin_e)}"
1751 | else:
1752 | read_error = f"Error reading file: {str(text_err)}"
1753 | except Exception as text_e:
1754 | read_error = f"Unexpected error reading file as text: {str(text_e)}"
1755 |
1756 | if read_error:
1757 | task_result["error"] = read_error
1758 | return task_result # Mark as failed
1759 |
1760 | # Successfully read content (string or binary preview string)
1761 | task_result["success"] = True
1762 |
1763 | # Try to get size (use stat with follow_symlinks=False for consistency)
1764 | try:
1765 | file_size = (await aiofiles.os.stat(validated_path, follow_symlinks=False)).st_size
1766 | task_result["size"] = file_size
1767 | except OSError as stat_err:
1768 | logger.warning(
1769 | f"Could not get size for {validated_path} in multi-read: {stat_err}",
1770 | emoji_key="warning",
1771 | )
1772 | task_result["warning"] = "Could not retrieve file size."
1773 |
1774 | return task_result
1775 |
1776 | except (ToolInputError, ToolError) as e:
1777 | # Handle validation or specific tool errors for this path
1778 | task_result["error"] = str(e)
1779 | task_result["path"] = (
1780 | validated_path or path
1781 | ) # Use original path if validation failed early
1782 | return task_result
1783 | except Exception as e:
1784 | # Catch unexpected errors during processing of a single file
1785 | logger.error(
1786 | f"Unexpected error reading single file {path} in multi-read: {e}",
1787 | exc_info=True,
1788 | emoji_key="error",
1789 | )
1790 | task_result["error"] = f"Unexpected error: {str(e)}"
1791 | task_result["path"] = validated_path or path
1792 | return task_result
1793 |
1794 | # --- End Inner Task Definition ---
1795 |
1796 | # Execute reads concurrently using asyncio.gather
1797 | results = await asyncio.gather(
1798 | *(read_single_file_task(p) for p in paths), return_exceptions=True
1799 | )
1800 |
1801 | # Process results (handle potential exceptions returned by gather)
1802 | processed_results: List[Dict[str, Any]] = []
1803 | successful_count = 0
1804 | failed_count = 0
1805 |
1806 | for i, res in enumerate(results):
1807 | original_path = paths[i] # Keep track of the requested path
1808 | if isinstance(res, Exception):
1809 | # An unexpected exception occurred *outside* the try/except in the task (unlikely)
1810 | logger.error(
1811 | f"Gather returned exception for path '{original_path}': {res}",
1812 | exc_info=res,
1813 | emoji_key="error",
1814 | )
1815 | processed_results.append(
1816 | {
1817 | "path": original_path,
1818 | "error": f"Internal error during task execution: {res}",
1819 | "success": False,
1820 | }
1821 | )
1822 | failed_count += 1
1823 | elif isinstance(res, dict):
1824 | # Expected dictionary output from our task
1825 | processed_results.append(res)
1826 | if res.get("success"):
1827 | successful_count += 1
1828 | else:
1829 | failed_count += 1
1830 | else:
1831 | # Should not happen if task always returns dict
1832 | logger.error(
1833 | f"Unexpected result type from task for path '{original_path}': {type(res)}",
1834 | emoji_key="error",
1835 | )
1836 | processed_results.append(
1837 | {
1838 | "path": original_path,
1839 | "error": f"Internal error: Unexpected task result type {type(res)}",
1840 | "success": False,
1841 | }
1842 | )
1843 | failed_count += 1
1844 |
1845 | processing_time = time.monotonic() - start_time
1846 | logger.success(
1847 | f"Finished read_multiple_files: {successful_count} succeeded, {failed_count} failed",
1848 | emoji_key="file",
1849 | total_files=len(paths),
1850 | time=processing_time,
1851 | )
1852 |
1853 | # Return a dictionary structure that create_tool_response understands
1854 | return {
1855 | "files": processed_results,
1856 | "succeeded": successful_count,
1857 | "failed": failed_count,
1858 | "success": True, # The overall tool execution was successful (individual files might have failed)
1859 | }
1860 |
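# Editor's sketch (hypothetical): consuming the documented result shape of
# read_multiple_files ("files", "succeeded", "failed").
async def _example_report_reads(paths: List[str]) -> None:
    result = await read_multiple_files(paths)
    print(f"{result['succeeded']} ok, {result['failed']} failed")
    for entry in result["files"]:
        status = "OK" if entry.get("success") else f"FAILED: {entry.get('error')}"
        print(f"  {entry.get('path')}: {status}")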
1861 |
1862 | @with_tool_metrics
1863 | @with_error_handling
1864 | async def get_unique_filepath(path: str) -> Dict[str, Any]:
1865 | """
1866 | Finds an available (non-existent) filepath based on the requested path.
1867 |
1868 |     If the requested path does not already exist, it is returned directly.
1869 | If it exists, it appends counters like '_1', '_2', etc., to the filename stem
1870 | until an unused path is found within the same directory.
1871 |
1872 | Args:
1873 | path: The desired file path.
1874 |
1875 | Returns:
1876 | Dictionary containing the unique, validated, absolute path found.
1877 |
1878 | Raises:
1879 | ToolInputError: If the base path is invalid or outside allowed directories.
1880 | ToolError: If the counter limit is exceeded or filesystem errors occur.
1881 | """
1882 | start_time = time.monotonic()
1883 | MAX_FILENAME_ATTEMPTS = 1000 # Safety limit
1884 |
1885 | try:
1886 | # 1. Validate the *input* path first.
1887 | # check_exists=None because the *final* path might not exist.
1888 | # check_parent_writable=False - we only need read/stat access to check existence.
1889 | # The parent directory's writability should be checked by the *calling* function
1890 | # (like write_file or smart_download's directory creation) before this.
1891 | validated_input_path = await validate_path(
1892 | path, check_exists=None, check_parent_writable=False
1893 | )
1894 | logger.debug(
1895 | f"get_unique_filepath: Validated input path resolves to {validated_input_path}"
1896 | )
1897 |
1898 | # 2. Check if the initial validated path is already available.
1899 | if not await aiofiles.os.path.exists(validated_input_path):
1900 | logger.info(f"Initial path '{validated_input_path}' is available.", emoji_key="file")
1901 | return {
1902 | "path": validated_input_path,
1903 | "attempts": 0,
1904 | "success": True,
1905 | "message": f"Path '{validated_input_path}' is already unique.",
1906 | }
1907 |
1908 | # 3. Path exists, need to find a unique alternative.
1909 | logger.debug(f"Path '{validated_input_path}' exists, finding unique alternative.")
1910 | dirname = os.path.dirname(validated_input_path)
1911 | original_filename = os.path.basename(validated_input_path)
1912 | stem, suffix = os.path.splitext(original_filename)
1913 |
1914 | counter = 1
1915 | while counter <= MAX_FILENAME_ATTEMPTS:
1916 | # Construct new filename candidate
1917 | candidate_filename = f"{stem}_{counter}{suffix}"
1918 | candidate_path = os.path.join(dirname, candidate_filename)
1919 |
1920 | # Check existence asynchronously
1921 | if not await aiofiles.os.path.exists(candidate_path):
1922 | processing_time = time.monotonic() - start_time
1923 | logger.success(
1924 | f"Found unique path '{candidate_path}' after {counter} attempts.",
1925 | emoji_key="file",
1926 | time=processing_time,
1927 | )
1928 | return {
1929 | "path": candidate_path,
1930 | "attempts": counter,
1931 | "success": True,
1932 | "message": f"Found unique path '{candidate_path}'.",
1933 | }
1934 |
1935 | counter += 1
1936 |
1937 | # If loop finishes, we exceeded the limit
1938 | raise ToolError(
1939 | f"Could not find a unique filename based on '{path}' after {MAX_FILENAME_ATTEMPTS} attempts.",
1940 | context={"base_path": validated_input_path, "attempts": MAX_FILENAME_ATTEMPTS},
1941 | )
1942 |
1943 | except OSError as e:
1944 | # Catch errors during exists checks
1945 | logger.error(
1946 | f"Filesystem error finding unique path based on '{path}': {str(e)}",
1947 | exc_info=True,
1948 | emoji_key="error",
1949 | )
1950 | raise ToolError(
1951 | f"Filesystem error checking path existence: {str(e)}",
1952 | context={"path": path, "errno": e.errno},
1953 | ) from e
1954 | except (
1955 | ToolInputError,
1956 | ToolError,
1957 | ): # Re-raise specific errors from validate_path or the counter limit
1958 | raise
1959 | except Exception as e:
1960 | # Catch unexpected errors
1961 | logger.error(
1962 | f"Unexpected error finding unique path for {path}: {e}",
1963 | exc_info=True,
1964 | emoji_key="error",
1965 | )
1966 | raise ToolError(
1967 | f"An unexpected error occurred finding a unique path: {str(e)}", context={"path": path}
1968 | ) from e
1969 |
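# Editor's sketch (hypothetical): the candidate-name scheme used above, e.g.
# "report.txt" -> "report_1.txt", "report_2.txt", ... until a free name is found.
def _example_candidate_names(filename: str, n: int) -> List[str]:
    stem, suffix = os.path.splitext(filename)
    return [f"{stem}_{i}{suffix}" for i in range(1, n + 1)]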
1970 |
1971 | @with_tool_metrics
1972 | @with_error_handling
1973 | async def write_file(path: str, content: Union[str, bytes]) -> Dict[str, Any]:
1974 | """Write content to a file asynchronously (UTF-8 or binary), creating/overwriting.
1975 |
1976 | Ensures the path is valid, within allowed directories, and that the parent
1977 | directory exists and is writable. Fails if the target path exists and is a directory.
1978 |
1979 | Args:
1980 | path: Path to the file to write.
1981 | content: Content to write (string for text UTF-8, bytes for binary).
1982 |
1983 | Returns:
1984 | A dictionary confirming success and providing file details (path, size).
1985 | """
1986 | start_time = time.monotonic()
1987 |
1988 | # Validate content type explicitly at the start.
1989 | if not isinstance(content, (str, bytes)):
1990 | raise ToolInputError(
1991 | "Content to write must be a string or bytes.",
1992 | param_name="content",
1993 | provided_value=type(content),
1994 | )
1995 |
1996 | # Validate path: doesn't need to exist necessarily (check_exists=None), but parent must exist and be writable.
1997 | validated_path = await validate_path(path, check_exists=None, check_parent_writable=True)
1998 |
1999 | # Check if the path exists and is a directory (we shouldn't overwrite a dir with a file).
2000 | # Use exists instead of lexists for this check
2001 | if await aiofiles.os.path.exists(validated_path) and await aiofiles.os.path.isdir(
2002 | validated_path
2003 | ):
2004 | raise ToolInputError(
2005 | f"Cannot write file: Path '{path}' (resolved to '{validated_path}') exists and is a directory.",
2006 | param_name="path",
2007 | provided_value=path,
2008 | )
2009 |
2010 | # write_file_content handles actual writing and parent dir creation
2011 | await write_file_content(validated_path, content) # Can raise ToolError
2012 |
2013 | # Verify write success by getting status and size afterwards
2014 | file_size = -1
2015 | try:
2016 | file_size = (await aiofiles.os.stat(validated_path, follow_symlinks=False)).st_size
2017 | except OSError as e:
2018 | # If stat fails after write seemed to succeed, something is wrong.
2019 | logger.error(
2020 | f"File write appeared successful for {validated_path}, but failed to get status afterwards: {e}",
2021 | emoji_key="error",
2022 | )
2023 | raise ToolError(
2024 | f"File written but failed to verify status afterwards: {str(e)}",
2025 | context={"path": validated_path},
2026 | ) from e
2027 |
2028 | processing_time = time.monotonic() - start_time
2029 | logger.success(
2030 | f"Successfully wrote file: {path}", emoji_key="file", size=file_size, time=processing_time
2031 | )
2032 |
2033 | # Return a structured success response.
2034 | return {
2035 | "message": f"Successfully wrote {file_size} bytes to '{validated_path}'.",
2036 | "path": validated_path,
2037 | "size": file_size,
2038 | "success": True,
2039 | }
2040 |
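# Editor's sketch (hypothetical): pairing get_unique_filepath with write_file
# so an existing target is never clobbered; the download-style use case is an
# assumption, not behavior defined here.
async def _example_write_without_clobber(dest: str, data: bytes) -> str:
    unique = await get_unique_filepath(dest)
    await write_file(unique["path"], data)
    return unique["path"]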
2041 |
2042 | @with_tool_metrics
2043 | @with_error_handling
2044 | async def edit_file(
2045 | path: str, edits: List[Dict[str, str]], dry_run: bool = False
2046 | ) -> Dict[str, Any]:
2047 | """Edit a text file asynchronously by applying string replacements (UTF-8).
2048 |
2049 | Validates path and edits, reads the file, applies changes (with fallbacks for
2050 | whitespace differences), generates a diff, and optionally writes back.
2051 |
2052 | Args:
2053 | path: Path to the file to edit. Must be an existing text file.
2054 | edits: List of edit operations. Each dict needs 'oldText' (str)
2055 | and 'newText' (str).
2056 | dry_run: If True, calculates changes and diff but does not save them.
2057 |
2058 | Returns:
2059 | Dictionary containing the generated diff, success status, path,
2060 | and whether it was a dry run.
2061 | """
2062 | start_time = time.monotonic()
2063 |
2064 | # Validate path must exist and be a file (check_exists=True).
2065 | validated_path = await validate_path(path, check_exists=True, check_parent_writable=False)
2066 | # Check if it's a regular file (follows links)
2067 | if not await aiofiles.os.path.isfile(validated_path):
2068 | if await aiofiles.os.path.islink(validated_path):
2069 | raise ToolInputError(
2070 | f"Path '{path}' (resolved to link '{validated_path}') points to something that is not a regular file.",
2071 | param_name="path",
2072 | provided_value=path,
2073 | )
2074 | else:
2075 | raise ToolInputError(
2076 | f"Path '{path}' (resolved to '{validated_path}') is not a regular file.",
2077 | param_name="path",
2078 | provided_value=path,
2079 | )
2080 |
2081 | # Validate edits structure
2082 | if not isinstance(edits, list):
2083 | raise ToolInputError(
2084 | "Edits parameter must be a list.", param_name="edits", provided_value=type(edits)
2085 | )
2086 | if not edits:
2087 | # Handle empty edits list as a no-op success.
2088 | logger.info(
2089 | f"edit_file called with empty edits list for {path}. No changes will be made.",
2090 | emoji_key="info",
2091 | )
2092 | return {
2093 | "path": validated_path,
2094 | "diff": "No edits provided.",
2095 | "success": True,
2096 | "dry_run": dry_run,
2097 | "message": "No edits were specified.",
2098 | }
2099 | # Deeper validation of each edit dict happens within apply_file_edits
2100 |
2101 | # NOTE: Modification protection is currently NOT applied here.
2102 | # This operates on a single file. Bulk editing would require a different tool
2103 | # where protection heuristics might be applied.
2104 |
2105 | # apply_file_edits handles reading, core editing logic, diffing, conditional writing.
2106 | # Raises ToolInputError/ToolError on failure.
2107 | diff, new_content = await apply_file_edits(validated_path, edits, dry_run)
2108 |
2109 | processing_time = time.monotonic() - start_time
2110 |
2111 | action = "Previewed edits for" if dry_run else "Applied edits to"
2112 | logger.success(
2113 | f"Successfully {action} file: {path}",
2114 | emoji_key="file",
2115 | num_edits=len(edits),
2116 | dry_run=dry_run,
2117 | time=processing_time,
2118 | changes_made=(diff != ""), # Log if actual changes resulted
2119 | )
2120 |
2121 | # Provide clearer messages based on diff content and dry_run status.
2122 | if diff:
2123 | diff_message = diff
2124 | status_message = f"Successfully {'previewed' if dry_run else 'applied'} {len(edits)} edits."
2125 | else: # Edits provided, but resulted in no change to content
2126 | diff_message = "No changes detected after applying edits."
2127 | status_message = f"{len(edits)} edits provided, but resulted in no content changes."
2128 |
2129 | return {
2130 | "path": validated_path,
2131 | "diff": diff_message,
2132 | "success": True,
2133 | "dry_run": dry_run,
2134 | "message": status_message,
2135 | # "new_content": new_content # Optional: return new content, especially for dry runs? Can be large.
2136 | }
2137 |
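# Editor's sketch (hypothetical): previewing an edit with dry_run=True; the
# 'oldText'/'newText' keys follow the docstring above, and the literal values
# are placeholders.
async def _example_preview_edit(path: str) -> str:
    result = await edit_file(
        path,
        edits=[{"oldText": "old_value", "newText": "new_value"}],
        dry_run=True,
    )
    return result["diff"]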
2138 |
2139 | @with_tool_metrics
2140 | @with_error_handling
2141 | async def create_directory(path: str) -> Dict[str, Any]:
2142 | """Create a directory asynchronously, including parent directories (like 'mkdir -p').
2143 |
2144 | Validates the path is allowed and parent is writable. Idempotent: If the directory
2145 | already exists, it succeeds without error. Fails if the path exists but is a file.
2146 |
2147 | Args:
2148 | path: Path to the directory to create.
2149 |
2150 | Returns:
2151 | Dictionary confirming success, path, and whether it was newly created.
2152 | """
2153 | start_time = time.monotonic()
2154 |
2155 |     # Validate path: parent must exist and be writable; the target itself may or may not exist (check_exists=None).
2156 | validated_path = await validate_path(path, check_exists=None, check_parent_writable=True)
2157 |
2158 | created = False
2159 | message = ""
2160 | try:
2161 | # Check existence and type before creating, using exists instead of lexists
2162 | if await aiofiles.os.path.exists(validated_path):
2163 | if await aiofiles.os.path.isdir(validated_path):
2164 | # Directory already exists - idempotent success.
2165 | logger.info(
2166 | f"Directory already exists: {path} (resolved: {validated_path})",
2167 | emoji_key="directory",
2168 | )
2169 | message = f"Directory '{validated_path}' already exists."
2170 | else:
2171 | # Path exists but is not a directory (e.g., a file or symlink)
2172 | raise ToolInputError(
2173 | f"Cannot create directory: Path '{path}' (resolved to '{validated_path}') already exists but is not a directory.",
2174 | param_name="path",
2175 | provided_value=path,
2176 | )
2177 | else:
2178 | # Path does not exist, proceed with creation using async makedirs.
2179 | await aiofiles.os.makedirs(validated_path, exist_ok=True)
2180 | created = True
2181 | logger.success(
2182 | f"Successfully created directory: {path} (resolved: {validated_path})",
2183 | emoji_key="directory",
2184 | )
2185 | message = f"Successfully created directory '{validated_path}'."
2186 |
2187 | except OSError as e:
2188 | # Catch errors during the exists/isdir checks or makedirs call
2189 | logger.error(
2190 | f"Error creating directory '{path}' (resolved: {validated_path}): {e}",
2191 | exc_info=True,
2192 | emoji_key="error",
2193 | )
2194 | raise ToolError(
2195 | f"Error creating directory '{path}': {str(e)}",
2196 | context={"path": validated_path, "errno": e.errno},
2197 | ) from e
2198 | except Exception as e:
2199 | # Catch unexpected errors
2200 | logger.error(
2201 | f"Unexpected error creating directory {path}: {e}", exc_info=True, emoji_key="error"
2202 | )
2203 | raise ToolError(
2204 | f"An unexpected error occurred creating directory: {str(e)}",
2205 | context={"path": validated_path},
2206 | ) from e
2207 |
2208 |     processing_time = time.monotonic() - start_time
2209 |     logger.success(
2210 |         f"create_directory completed for: {path} (created={created})",
2211 |         emoji_key="directory",
2212 |         time=processing_time,
2213 |     )
2214 |
2215 | return {"path": validated_path, "created": created, "success": True, "message": message}
2216 |
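# Editor's sketch (hypothetical): create_directory is idempotent, so a repeat
# call succeeds; only the first call reports created=True.
async def _example_mkdir_twice(path: str) -> Tuple[bool, bool]:
    first = await create_directory(path)
    second = await create_directory(path)
    return first["created"], second["created"]  # expected: (True, False)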
2217 |
2218 | @with_tool_metrics
2219 | @with_error_handling
2220 | async def list_directory(path: str) -> Dict[str, Any]:
2221 | """List files and subdirectories within a given directory asynchronously.
2222 |
2223 | Validates the path exists, is a directory, and is allowed.
2224 | Provides basic info (name, type, size for files, link target for symlinks) for each entry.
2225 |
2226 | Args:
2227 | path: Path to the directory to list.
2228 |
2229 | Returns:
2230 | Dictionary containing the path listed and a list of entries.
2231 | """
2232 | start_time = time.monotonic()
2233 |
2234 | # Validate path exists and is a directory (check_exists=True).
2235 | validated_path = await validate_path(path, check_exists=True, check_parent_writable=False)
2236 | if not await aiofiles.os.path.isdir(validated_path):
2237 | # If it's a link, check if it points to a directory
2238 | if await aiofiles.os.path.islink(validated_path):
2239 | try:
2240 | target_is_dir = await aiofiles.os.path.isdir(
2241 | await aiofiles.os.path.realpath(validated_path)
2242 | )
2243 | if not target_is_dir:
2244 | raise ToolInputError(
2245 | f"Path '{path}' (resolved to link '{validated_path}') points to something that is not a directory.",
2246 | param_name="path",
2247 | provided_value=path,
2248 | )
2249 | # If target is dir, proceed using validated_path (which resolves the link)
2250 | except OSError as e:
2251 | raise ToolError(
2252 | f"Error resolving or checking link target for directory listing: {e}",
2253 | context={"path": validated_path},
2254 | ) from e
2255 | else:
2256 | raise ToolInputError(
2257 | f"Path '{path}' (resolved to '{validated_path}') is not a directory.",
2258 | param_name="path",
2259 | provided_value=path,
2260 | )
2261 |
2262 | entries: List[Dict[str, Any]] = []
2263 | scan_errors: List[str] = []
2264 | try:
2265 | # Use await with scandir rather than async iteration
2266 | entry_list = await aiofiles.os.scandir(validated_path)
2267 | for entry in entry_list:
2268 | entry_info: Dict[str, Any] = {"name": entry.name}
2269 | try:
2270 |                 # DirEntry methods are synchronous but cheap (results are cached); check link status explicitly
2271 | is_link = entry.is_symlink() # Checks if entry *itself* is a link
2272 | entry_info["is_symlink"] = is_link
2273 |
2274 | # Let's be explicit using lstat results for type determination
2275 | try:
2276 | stat_res = entry.stat(follow_symlinks=False) # Use lstat via entry
2277 | mode = stat_res.st_mode
2278 | l_is_dir = os.path.stat.S_ISDIR(mode)
2279 | l_is_file = os.path.stat.S_ISREG(mode)
2280 | l_is_link = os.path.stat.S_ISLNK(mode) # Should match entry.is_symlink() result
2281 |
2282 | if l_is_dir:
2283 | entry_info["type"] = "directory"
2284 | elif l_is_file:
2285 | entry_info["type"] = "file"
2286 | entry_info["size"] = stat_res.st_size # Size of file itself
2287 | elif l_is_link:
2288 | entry_info["type"] = "symlink"
2289 | entry_info["size"] = stat_res.st_size # Size of link itself
2290 | # Optionally try to resolve link target for display
2291 | try:
2292 | target = await aiofiles.os.readlink(entry.path)
2293 | entry_info["symlink_target"] = target
2294 | except OSError as link_err:
2295 | entry_info["error"] = f"Could not read link target: {link_err}"
2296 | else:
2297 | entry_info["type"] = "other" # E.g., socket, fifo
2298 | entry_info["size"] = stat_res.st_size
2299 |
2300 | except OSError as stat_err:
2301 | logger.warning(
2302 | f"Could not lstat entry {entry.path} in list_directory: {stat_err}",
2303 | emoji_key="warning",
2304 | )
2305 | entry_info["type"] = "error"
2306 | entry_info["error"] = f"Could not get info: {stat_err}"
2307 |
2308 | entries.append(entry_info)
2309 |
2310 | except OSError as entry_err:
2311 | # Error processing a specific entry (e.g., permission denied on is_dir/is_file/is_symlink)
2312 | logger.warning(
2313 | f"Could not process directory entry '{entry.name}' in {validated_path}: {entry_err}",
2314 | emoji_key="warning",
2315 | )
2316 | error_message = f"Error processing entry '{entry.name}': {entry_err}"
2317 | scan_errors.append(error_message)
2318 | # Add error entry to the list for visibility
2319 | entries.append({"name": entry.name, "type": "error", "error": str(entry_err)})
2320 |
2321 | # Sort entries: directories, files, symlinks, others, errors; then alphabetically.
2322 | entries.sort(
2323 | key=lambda e: (
2324 | 0
2325 | if e.get("type") == "directory"
2326 | else 1
2327 | if e.get("type") == "file"
2328 | else 2
2329 | if e.get("type") == "symlink"
2330 | else 3
2331 | if e.get("type") == "other"
2332 | else 4, # Errors last
2333 | e.get("name", "").lower(), # Case-insensitive sort by name
2334 | )
2335 | )
2336 |
2337 | except OSError as e:
2338 | # Error during the initial scandir call (e.g., permission denied on the directory itself)
2339 | raise ToolError(
2340 | f"Error listing directory '{path}': {str(e)}",
2341 | context={"path": validated_path, "errno": e.errno},
2342 | ) from e
2343 | except Exception as e:
2344 | # Catch unexpected errors during iteration
2345 | logger.error(
2346 | f"Unexpected error listing directory {path}: {e}", exc_info=True, emoji_key="error"
2347 | )
2348 | raise ToolError(
2349 | f"An unexpected error occurred listing directory: {str(e)}",
2350 | context={"path": validated_path},
2351 | ) from e
2352 |
2353 | processing_time = time.monotonic() - start_time
2354 | logger.success(
2355 | f"Listed directory: {path} ({len(entries)} entries found, {len(scan_errors)} errors)",
2356 | emoji_key="directory",
2357 | time=processing_time,
2358 | )
2359 |
2360 | # Structure result clearly, including warnings if errors occurred on entries.
2361 | result = {
2362 | "path": validated_path,
2363 | "entries": entries,
2364 | "success": True,
2365 | "message": f"Found {len(entries)} entries in '{validated_path}'.",
2366 | }
2367 | if scan_errors:
2368 | warning_summary = f"Encountered {len(scan_errors)} errors processing directory entries."
2369 | result["warnings"] = [warning_summary]
2370 | # Optionally include first few errors: result["error_details"] = scan_errors[:5]
2371 |
2372 | return result
2373 |
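# Editor's sketch (hypothetical): filtering list_directory entries by the
# "type" field populated above.
async def _example_subdirectories(path: str) -> List[str]:
    listing = await list_directory(path)
    return [e["name"] for e in listing["entries"] if e.get("type") == "directory"]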
2374 |
2375 | @with_tool_metrics
2376 | @with_error_handling
2377 | async def directory_tree(
2378 | path: str, max_depth: int = 3, include_size: bool = False
2379 | ) -> Dict[str, Any]:
2380 | """Get a recursive tree view of a directory structure asynchronously.
2381 |
2382 | Args:
2383 | path: Path to the root directory for the tree view.
2384 | max_depth: Maximum recursion depth (-1 for effectively unlimited, capped internally).
2385 | include_size: If True, include file sizes in the tree (requires extra stat calls).
2386 |
2387 | Returns:
2388 | Dictionary containing the path and the hierarchical tree structure.
2389 | """
2390 | start_time = time.monotonic()
2391 | # Internal safety cap for 'unlimited' depth to prevent runaway recursion.
2392 | INTERNAL_DEPTH_CAP = 15
2393 |
2394 | # Validate path exists and is a directory (check_exists=True)
2395 | validated_path = await validate_path(path, check_exists=True)
2396 | if not await aiofiles.os.path.isdir(validated_path):
2397 | # Check if it's a link to a directory
2398 | if await aiofiles.os.path.islink(validated_path):
2399 | try:
2400 | target_is_dir = await aiofiles.os.path.isdir(
2401 | await aiofiles.os.path.realpath(validated_path)
2402 | )
2403 | if not target_is_dir:
2404 | raise ToolInputError(
2405 | f"Path '{path}' (resolved to link '{validated_path}') points to something that is not a directory.",
2406 | param_name="path",
2407 | provided_value=path,
2408 | )
2409 | # proceed with validated_path which resolves link
2410 | except OSError as e:
2411 | raise ToolError(
2412 | f"Error resolving or checking link target for directory tree: {e}",
2413 | context={"path": validated_path},
2414 | ) from e
2415 | else:
2416 | raise ToolInputError(
2417 | f"Path '{path}' (resolved to '{validated_path}') is not a directory.",
2418 | param_name="path",
2419 | provided_value=path,
2420 | )
2421 |
2422 | # Validate max_depth input
2423 | if not isinstance(max_depth, int):
2424 | raise ToolInputError(
2425 | "max_depth must be an integer.", param_name="max_depth", provided_value=max_depth
2426 | )
2427 |
2428 | # Apply internal depth cap if necessary
2429 | actual_max_depth = max_depth
2430 | if max_depth < 0 or max_depth > INTERNAL_DEPTH_CAP:
2431 | if max_depth >= 0: # Only warn if user specified a large number, not for -1
2432 | logger.warning(
2433 | f"Requested max_depth {max_depth} exceeds internal cap {INTERNAL_DEPTH_CAP}. Limiting depth.",
2434 | emoji_key="warning",
2435 | )
2436 | actual_max_depth = INTERNAL_DEPTH_CAP
2437 |
2438 | # --- Recursive Helper ---
2439 | async def build_tree_recursive(current_path: str, current_depth: int) -> List[Dict[str, Any]]:
2440 | """Recursively builds the directory tree structure."""
2441 | if current_depth > actual_max_depth:
2442 | # Return specific marker if depth limit hit, not just empty list.
2443 | return [{"name": f"... (Max depth {actual_max_depth} reached)", "type": "info"}]
2444 |
2445 | children_nodes: List[Dict[str, Any]] = []
2446 | try:
2447 | # Await scandir and then iterate over the returned entries
2448 | entries = await aiofiles.os.scandir(current_path)
2449 | for entry in entries:
2450 | entry_data: Dict[str, Any] = {"name": entry.name}
2451 | try:
2452 | # Use lstat via entry to avoid following links unexpectedly
2453 | stat_res = entry.stat(follow_symlinks=False)
2454 | mode = stat_res.st_mode
2455 | l_is_dir = os.path.stat.S_ISDIR(mode)
2456 | l_is_file = os.path.stat.S_ISREG(mode)
2457 | l_is_link = os.path.stat.S_ISLNK(mode)
2458 |
2459 | if l_is_dir:
2460 | entry_data["type"] = "directory"
2461 | # Recurse asynchronously into subdirectory
2462 | entry_data["children"] = await build_tree_recursive(
2463 | entry.path, current_depth + 1
2464 | )
2465 | elif l_is_file:
2466 | entry_data["type"] = "file"
2467 | if include_size:
2468 | entry_data["size"] = stat_res.st_size
2469 | elif l_is_link:
2470 | entry_data["type"] = "symlink"
2471 | if include_size:
2472 | entry_data["size"] = stat_res.st_size # Size of link itself
2473 | # Optionally resolve link target? Can be noisy. Let's skip for tree view simplicity.
2474 | # try: entry_data["target"] = await aiofiles.os.readlink(entry.path)
2475 | # except OSError: entry_data["target"] = "<Error reading link>"
2476 | else:
2477 | entry_data["type"] = "other"
2478 | if include_size:
2479 | entry_data["size"] = stat_res.st_size
2480 |
2481 | children_nodes.append(entry_data)
2482 |
2483 | except OSError as entry_err:
2484 | # Error processing one entry (e.g., permissions on stat)
2485 | logger.warning(
2486 | f"Could not process entry {entry.path} in tree: {entry_err}",
2487 | emoji_key="warning",
2488 | )
2489 | children_nodes.append(
2490 | {"name": entry.name, "type": "error", "error": str(entry_err)}
2491 | )
2492 |
2493 | # Sort entries at the current level alphabetically by name, type secondary
2494 | children_nodes.sort(
2495 | key=lambda e: (
2496 | 0
2497 | if e.get("type") == "directory"
2498 | else 1
2499 | if e.get("type") == "file"
2500 | else 2
2501 | if e.get("type") == "symlink"
2502 | else 3,
2503 | e.get("name", "").lower(), # Case-insensitive sort
2504 | )
2505 | )
2506 | return children_nodes
2507 |
2508 | except OSError as e:
2509 | # Error scanning the directory itself (e.g., permissions)
2510 | logger.error(
2511 | f"Error scanning directory {current_path} for tree: {str(e)}", emoji_key="error"
2512 | )
2513 | # Return error indicator instead of raising, allows partial trees if root is accessible
2514 | return [{"name": f"... (Error scanning this directory: {e})", "type": "error"}]
2515 | except Exception as e:
2516 | # Unexpected error during scan
2517 | logger.error(
2518 | f"Unexpected error scanning directory {current_path} for tree: {e}",
2519 | exc_info=True,
2520 | emoji_key="error",
2521 | )
2522 | return [{"name": f"... (Unexpected error scanning: {e})", "type": "error"}]
2523 |
2524 | # --- End Recursive Helper ---
2525 |
2526 | # Generate tree starting from the validated path
2527 | tree_structure = await build_tree_recursive(validated_path, 0)
2528 |
2529 | processing_time = time.monotonic() - start_time
2530 | logger.success(
2531 | f"Generated directory tree for: {path}",
2532 | emoji_key="directory",
2533 | max_depth=actual_max_depth, # Log the effective depth
2534 | requested_depth=max_depth,
2535 | include_size=include_size,
2536 | time=processing_time,
2537 | )
2538 |
2539 | # Structure result clearly
2540 | return {
2541 | "path": validated_path,
2542 | "max_depth_reached": actual_max_depth,
2543 | "tree": tree_structure,
2544 | "success": True,
2545 | "message": f"Generated directory tree for '{validated_path}' up to depth {actual_max_depth}.",
2546 | }
2547 |
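# Editor's sketch (hypothetical): flattening the nested "tree" structure
# returned above into indented lines for display.
def _example_render_tree(nodes: List[Dict[str, Any]], indent: int = 0) -> List[str]:
    lines: List[str] = []
    for node in nodes:
        lines.append(" " * indent + str(node.get("name", "?")))
        lines += _example_render_tree(node.get("children", []), indent + 2)
    return lines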
2548 |
2549 | @with_tool_metrics
2550 | @with_error_handling
2551 | async def move_file(
2552 | source: str,
2553 | destination: str,
2554 | overwrite: bool = False, # Default to NOT overwrite for safety
2555 | ) -> Dict[str, Any]:
2556 | """Move or rename a file or directory asynchronously.
2557 |
2558 | Ensures both source and destination paths are within allowed directories.
2559 | Checks for existence and potential conflicts before moving.
2560 |
2561 | Args:
2562 | source: Path to the file or directory to move. Must exist.
2563 | destination: New path for the file or directory. Parent must exist and be writable.
2564 | overwrite: If True, allows overwriting an existing file or *empty* directory
2565 | at the destination. USE WITH CAUTION. Defaults False.
2566 |
2567 | Returns:
2568 | Dictionary confirming the move operation, including source and destination paths.
2569 | """
2570 | start_time = time.monotonic()
2571 |
2572 | # Validate source path (must exist, check_exists=True)
2573 | validated_source = await validate_path(source, check_exists=True)
2574 |
2575 | # Validate destination path (parent must exist and be writable, path itself may or may not exist check_exists=None)
2576 | # Check parent writability *before* checking overwrite logic.
2577 | validated_dest = await validate_path(destination, check_exists=None, check_parent_writable=True)
2578 |
2579 | # Validate overwrite flag type
2580 | if not isinstance(overwrite, bool):
2581 | raise ToolInputError(
2582 | "overwrite parameter must be a boolean (true/false).",
2583 | param_name="overwrite",
2584 | provided_value=type(overwrite),
2585 | )
2586 |
2587 | # NOTE: Modification protection is currently NOT applied here.
2588 | # If overwriting a directory, current logic only allows overwriting an *empty* one via rmdir.
2589 | # Overwriting a file is a single-file modification.
2590 | # A future enhancement could add protection heuristics here if overwriting non-empty dirs was allowed.
2591 |
2592 | try:
2593 | # Check if destination already exists using exists instead of lexists
2594 | dest_exists = await aiofiles.os.path.exists(validated_dest)
2595 | dest_is_dir = False
2596 | if dest_exists:
2597 | dest_is_dir = await aiofiles.os.path.isdir(
2598 | validated_dest
2599 | ) # Check type (follows links if dest is link)
2600 |
2601 | if dest_exists:
2602 | if overwrite:
2603 | # Overwrite requested. Log prominently.
2604 | logger.warning(
2605 | f"Overwrite flag is True. Attempting to replace existing path '{validated_dest}' with '{validated_source}'.",
2606 | emoji_key="warning",
2607 | )
2608 |
2609 | # Check if source and destination types are compatible for overwrite (e.g., cannot replace dir with file easily)
2610 | # Use stat with follow_symlinks=False for source type check as well.
2611 | source_stat = await aiofiles.os.stat(validated_source, follow_symlinks=False)
2612 | is_source_dir = os.path.stat.S_ISDIR(source_stat.st_mode)
2613 |
2614 | # Simple check: Prevent overwriting dir with file or vice-versa.
2615 | # Note: aiofiles.os.rename might handle some cases, but explicit check is safer.
2616 | # This logic assumes we'd remove the destination first.
2617 | if is_source_dir != dest_is_dir:
2618 | # Allow replacing link with dir/file, or dir/file with link? Be cautious.
2619 | # Let's prevent dir/file mismatch for simplicity. Overwriting links is tricky.
2620 | if not await aiofiles.os.path.islink(
2621 | validated_dest
2622 | ): # Only enforce if dest is not a link
2623 | raise ToolInputError(
2624 | f"Cannot overwrite: Source is a {'directory' if is_source_dir else 'file/link'} but destination ('{validated_dest}') exists and is a {'directory' if dest_is_dir else 'file/link'}.",
2625 | param_name="destination",
2626 | provided_value=destination,
2627 | )
2628 |
2629 | # Attempt to remove the existing destination. This is the dangerous part.
2630 | try:
2631 | if dest_is_dir:
2632 | # Use async rmdir - fails if directory is not empty!
2633 | # This prevents accidental recursive deletion via move+overwrite.
2634 | await aiofiles.os.rmdir(validated_dest)
2635 | logger.info(
2636 | f"Removed existing empty directory destination '{validated_dest}' for overwrite.",
2637 | emoji_key="action",
2638 | )
2639 | else:
2640 | # Removes a file or symlink
2641 | await aiofiles.os.remove(validated_dest)
2642 | logger.info(
2643 | f"Removed existing file/link destination '{validated_dest}' for overwrite.",
2644 | emoji_key="action",
2645 | )
2646 | except OSError as remove_err:
2647 | # Handle cases like directory not empty, permission error during removal
2648 | raise ToolError(
2649 | f"Failed to remove existing destination '{validated_dest}' for overwrite: {remove_err}. Check permissions or if directory is empty.",
2650 | context={
2651 | "source": validated_source,
2652 | "destination": validated_dest,
2653 | "errno": remove_err.errno,
2654 | },
2655 | ) from remove_err
2656 |
2657 | else: # Destination exists, and overwrite is False (default)
2658 | raise ToolInputError(
2659 | f"Cannot move: Destination path '{destination}' (resolved to '{validated_dest}') already exists. Use overwrite=True to replace.",
2660 | param_name="destination",
2661 | provided_value=destination,
2662 | )
2663 |
2664 |     # Ensure source and destination do not resolve to the same path after
2665 |     # normalization. aiofiles.os.rename performs its own identity checks,
2666 |     # but catching the trivial case here gives a clearer result.
2667 |     # os.path.samefile would be more robust (it compares inodes), but it is
2668 |     # synchronous and requires both paths to exist, so the final validated
2669 |     # path strings are compared directly instead.
2670 |     try:
2671 |         # Existence after the potential overwrite removal (kept for debugging).
2672 |         dest_exists_after_remove = await aiofiles.os.path.exists(validated_dest)  # noqa: F841
2673 |         # If both inputs resolved to the identical path, there is nothing to
2674 |         # move; report success without performing any operation.
2675 |         if validated_source == validated_dest:
2676 | logger.info(
2677 | f"Source and destination paths resolve to the same location ('{validated_source}'). No move needed.",
2678 | emoji_key="info",
2679 | )
2680 | return {
2681 | "source": validated_source,
2682 | "destination": validated_dest,
2683 | "success": True,
2684 | "message": "Source and destination are the same path. No operation performed.",
2685 | }
2686 | except OSError:
2687 | # Ignore errors from samefile check, rely on rename's internal checks.
2688 | pass
2689 |
2690 | # Attempt the move/rename operation asynchronously
2691 | await aiofiles.os.rename(validated_source, validated_dest)
2692 |
2693 | except OSError as e:
2694 | # Catch errors from exists, rename, remove, rmdir etc.
2695 | logger.error(
2696 | f"Error moving/renaming from '{source}' to '{destination}': {str(e)}",
2697 | exc_info=True,
2698 | emoji_key="error",
2699 | )
2700 | raise ToolError(
2701 | f"Error moving '{source}' to '{destination}': {str(e)}",
2702 | context={"source": validated_source, "destination": validated_dest, "errno": e.errno},
2703 | ) from e
2704 | except (ToolInputError, ToolError, ProtectionTriggeredError): # Re-raise our specific errors
2705 | raise
2706 | except Exception as e:
2707 | # Catch unexpected errors
2708 | logger.error(
2709 | f"Unexpected error moving {source} to {destination}: {e}",
2710 | exc_info=True,
2711 | emoji_key="error",
2712 | )
2713 | raise ToolError(
2714 | f"An unexpected error occurred during move: {str(e)}",
2715 | context={"source": validated_source, "destination": validated_dest},
2716 | ) from e
2717 |
2718 | processing_time = time.monotonic() - start_time
2719 | logger.success(
2720 | f"Moved '{source}' to '{destination}'",
2721 | emoji_key="file",
2722 | time=processing_time,
2723 | overwrite_used=overwrite,
2724 | )
2725 |
2726 | return {
2727 |         "source": validated_source,  # Report the validated (resolved) source path
2728 | "destination": validated_dest,
2729 | "success": True,
2730 | "message": f"Successfully moved '{validated_source}' to '{validated_dest}'.",
2731 | }
2732 |
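# --- Illustrative usage sketch (editor's addition, not part of the original module) ---
# Shows how a caller might invoke the move tool defined above. This assumes the
# enclosing coroutine is named `move_file(source, destination, overwrite=False)`
# (its def appears earlier in the file) and uses hypothetical paths under an
# allowed directory. The function below is defined but never called here.
async def _example_move_usage() -> None:
    result = await move_file(
        source="/allowed/projects/report_draft.txt",
        destination="/allowed/projects/report_final.txt",
        overwrite=False,  # with the default, an existing destination raises ToolInputError
    )
    # On success the tool returns {"source": ..., "destination": ..., "success": True, "message": ...}
    print(result["message"])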
2733 |
2734 | @with_tool_metrics
2735 | @with_error_handling
2736 | async def delete_path(path: str) -> Dict[str, Any]:
2737 | """Delete a file or an entire directory tree asynchronously.
2738 |
2739 | Validates the path exists and is allowed. If the path is a directory,
2740 | applies deletion protection heuristics (if enabled) based on the directory's
2741 | contents before proceeding with recursive deletion.
2742 |
2743 | Args:
2744 | path: Path to the file or directory to delete.
2745 |
2746 | Returns:
2747 | Dictionary confirming the deletion.
2748 | """
2749 | start_time = time.monotonic()
2750 |
2751 | # Validate path exists (check_exists=True), but don't resolve symlinks yet
2752 | # We need to know if the original path is a symlink to handle it properly
2753 | validation_result = await validate_path(path, check_exists=True, resolve_symlinks=False)
2754 |
2755 |     # Check if the path is a symlink; if so, we delete the link itself, not its target
2756 |     is_symlink = await aiofiles.os.path.islink(path)
2757 |     if is_symlink:
2758 |         logger.info(f"Note: Deleting the symlink itself (not its target) at path: {path}")
2759 |     validated_path = validation_result
2760 | try:
2761 | deleted_type = "unknown"
2762 |
2763 | # First check if it's a symlink - we want to handle this separately
2764 | # to avoid accidentally deleting the target
2765 | if is_symlink:
2766 | deleted_type = "symlink"
2767 | logger.info(f"Deleting symlink: {validated_path}", emoji_key="delete")
2768 | await aiofiles.os.remove(validated_path)
2769 |
2770 | # If not a symlink, proceed with regular directory or file deletion
2771 | else:
2772 |             # Not a symlink (handled above), so isdir/isfile may safely follow links.
2773 |             # aiofiles.os.path.isdir/isfile avoid the undocumented os.path.stat alias.
2774 |             is_dir = await aiofiles.os.path.isdir(validated_path)
2775 |             is_file = await aiofiles.os.path.isfile(validated_path)
2777 |
2778 | if is_dir:
2779 | deleted_type = "directory"
2780 | logger.info(f"Attempting to delete directory: {validated_path}", emoji_key="delete")
2781 | # --- Deletion Protection Check ---
2782 | try:
2783 | # List all file paths within the directory for heuristic checks
2784 | contained_file_paths = await _list_paths_recursive(validated_path)
2785 | if contained_file_paths: # Only run check if directory is not empty
2786 | await _check_protection_heuristics(contained_file_paths, "deletion")
2787 | else:
2788 | logger.info(
2789 | f"Directory '{validated_path}' is empty, skipping detailed protection check.",
2790 | emoji_key="info",
2791 | )
2792 | except ProtectionTriggeredError:
2793 | raise # Re-raise the specific error if protection blocked the operation
2794 | except ToolError as list_err:
2795 |                 # If listing contents failed, block deletion for safety.
2796 | raise ToolError(
2797 | f"Could not list directory contents for safety check before deleting '{validated_path}'. Deletion aborted. Reason: {list_err}",
2798 | context={"path": validated_path},
2799 | ) from list_err
2800 | # --- End Protection Check ---
2801 |
2802 | # Protection passed (or disabled/not triggered), proceed with recursive delete
2803 | await _async_rmtree(validated_path)
2804 |
2805 | elif is_file:
2806 | deleted_type = "file"
2807 | logger.info(f"Attempting to delete file: {validated_path}", emoji_key="delete")
2808 | await aiofiles.os.remove(validated_path)
2809 | else:
2810 |                 # Should not happen if the existence check passed during validation, but handle defensively
2811 | raise ToolError(
2812 | f"Cannot delete path '{validated_path}': It is neither a file, directory, nor a symbolic link.",
2813 | context={"path": validated_path},
2814 | )
2815 |
2816 | except OSError as e:
2817 | # Catch errors from remove, rmdir, or during rmtree
2818 | logger.error(
2819 | f"Error deleting path '{path}' (resolved: {validated_path}): {str(e)}",
2820 | exc_info=True,
2821 | emoji_key="error",
2822 | )
2823 | raise ToolError(
2824 | f"Error deleting '{path}': {str(e)}", context={"path": validated_path, "errno": e.errno}
2825 | ) from e
2826 | except (ToolInputError, ToolError, ProtectionTriggeredError): # Re-raise our specific errors
2827 | raise
2828 | except Exception as e:
2829 | # Catch unexpected errors
2830 | logger.error(f"Unexpected error deleting {path}: {e}", exc_info=True, emoji_key="error")
2831 | raise ToolError(
2832 | f"An unexpected error occurred during deletion: {str(e)}",
2833 | context={"path": validated_path},
2834 | ) from e
2835 |
2836 | processing_time = time.monotonic() - start_time
2837 | logger.success(
2838 | f"Successfully deleted {deleted_type}: '{path}' (resolved: {validated_path})",
2839 | emoji_key="delete",
2840 | time=processing_time,
2841 | )
2842 |
2843 | return {
2844 | "path": validated_path,
2845 | "type_deleted": deleted_type,
2846 | "success": True,
2847 | "message": f"Successfully deleted {deleted_type} '{validated_path}'.",
2848 | }
2849 |
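# --- Illustrative usage sketch (editor's addition, not part of the original module) ---
# Demonstrates calling delete_path and distinguishing a protection block from other
# failures. The path is hypothetical; ProtectionTriggeredError is raised when the
# deletion heuristics refuse to remove a directory's contents.
async def _example_delete_usage() -> None:
    try:
        result = await delete_path("/allowed/tmp/old_build_artifacts")
        print(f"Deleted {result['type_deleted']}: {result['path']}")
    except ProtectionTriggeredError as exc:
        # The heuristics blocked a risky recursive deletion; nothing was removed.
        print(f"Deletion blocked by safety heuristics: {exc}")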
2850 |
2851 | @with_tool_metrics
2852 | @with_error_handling
2853 | async def search_files(
2854 | path: str,
2855 | pattern: str,
2856 | case_sensitive: bool = False,
2857 | exclude_patterns: Optional[List[str]] = None,
2858 | search_content: bool = False,
2859 | max_content_bytes: int = 1024 * 1024, # Limit content search size per file (1MB)
2860 | ) -> Dict[str, Any]:
2861 | """Search for files/directories matching a pattern asynchronously and recursively.
2862 |
2863 | Supports filename pattern matching (case-sensitive/insensitive substring)
2864 | and optional searching within file content (UTF-8 text only, limited size).
2865 | Allows exclusion patterns (glob format).
2866 |
2867 | Args:
2868 | path: Directory path to start the search from.
2869 | pattern: Text pattern to find. Used for substring match in names,
2870 | and exact string find in content if search_content=True.
2871 | case_sensitive: If True, matching is case-sensitive. Defaults False.
2872 | exclude_patterns: Optional list of glob-style patterns for paths/names
2873 | to exclude (e.g., ["*.log", ".git/"]). Matched case-insensitively
2874 | on appropriate systems.
2875 | search_content: If True, also search *inside* text files for the pattern.
2876 |             This can be significantly slower and more memory-intensive.
2877 | max_content_bytes: Max bytes to read from each file when search_content=True.
2878 |
2879 | Returns:
2880 | Dictionary with search parameters and a list of matching file/directory paths.
2881 | May include warnings about errors encountered during search.
2882 | """
2883 | start_time = time.monotonic()
2884 |
2885 | # Validate path exists and is a directory (check_exists=True)
2886 | validated_path = await validate_path(path, check_exists=True)
2887 | if not await aiofiles.os.path.isdir(validated_path):
2888 |         # Allow symlinks that resolve to directories
2889 |         if await aiofiles.os.path.islink(validated_path):
2890 |             try:
2891 |                 # os.path.realpath is sync but cheap; realpath is not wrapped by aiofiles.os.path
2892 |                 real_target = os.path.realpath(validated_path)
2893 |                 if not await aiofiles.os.path.isdir(real_target):
2894 | raise ToolInputError(
2895 | f"Path '{path}' (resolved to link '{validated_path}') points to something that is not a directory.",
2896 | param_name="path",
2897 | provided_value=path,
2898 | )
2899 | # proceed with validated_path
2900 | except OSError as e:
2901 | raise ToolError(
2902 | f"Error resolving or checking link target for search: {e}",
2903 | context={"path": validated_path},
2904 | ) from e
2905 | else:
2906 | raise ToolInputError(
2907 | f"Path '{path}' (resolved to '{validated_path}') is not a directory.",
2908 | param_name="path",
2909 | provided_value=path,
2910 | )
2911 |
2912 | # Validate other inputs
2913 | if not isinstance(pattern, str) or not pattern:
2914 | raise ToolInputError(
2915 | "Search pattern must be a non-empty string.",
2916 | param_name="pattern",
2917 | provided_value=pattern,
2918 | )
2919 | if exclude_patterns: # Ensure it's a list of strings if provided
2920 | if not isinstance(exclude_patterns, list):
2921 | raise ToolInputError(
2922 | "Exclude patterns must be a list of strings.",
2923 | param_name="exclude_patterns",
2924 | provided_value=exclude_patterns,
2925 | )
2926 | if not all(isinstance(p, str) for p in exclude_patterns):
2927 | raise ToolInputError(
2928 | "All items in exclude_patterns must be strings.", param_name="exclude_patterns"
2929 | )
2930 | if not isinstance(case_sensitive, bool):
2931 | raise ToolInputError(
2932 | "case_sensitive must be a boolean.",
2933 | param_name="case_sensitive",
2934 | provided_value=case_sensitive,
2935 | )
2936 | if not isinstance(search_content, bool):
2937 | raise ToolInputError(
2938 | "search_content must be a boolean.",
2939 | param_name="search_content",
2940 | provided_value=search_content,
2941 | )
2942 | if not isinstance(max_content_bytes, int) or max_content_bytes < 0:
2943 | raise ToolInputError(
2944 | "max_content_bytes must be a non-negative integer.",
2945 | param_name="max_content_bytes",
2946 | provided_value=max_content_bytes,
2947 | )
2948 |
2949 | search_errors: List[str] = []
2950 | # Using a set to store matched paths avoids duplicates efficiently.
2951 | matched_paths: Set[str] = set()
2952 |
2953 | # Prepare pattern based on case sensitivity
2954 | search_pattern = pattern if case_sensitive else pattern.lower()
2955 |
2956 | # Error handler callback for async_walk
2957 | MAX_REPORTED_ERRORS = 50
2958 |
2959 | def onerror(os_error: OSError):
2960 | """Callback to handle and log errors during file tree walking."""
2961 | err_msg = f"Permission or access error during search near '{getattr(os_error, 'filename', 'N/A')}': {getattr(os_error, 'strerror', str(os_error))}"
2962 | # Limit number of reported errors to avoid flooding logs/results
2963 | if len(search_errors) < MAX_REPORTED_ERRORS:
2964 | logger.warning(err_msg, emoji_key="warning")
2965 | search_errors.append(err_msg)
2966 | elif len(search_errors) == MAX_REPORTED_ERRORS:
2967 | suppress_msg = f"... (Further {type(os_error).__name__} errors suppressed)"
2968 | logger.warning(suppress_msg, emoji_key="warning")
2969 | search_errors.append(suppress_msg)
2970 |
2971 | # --- File Content Search Task (if enabled) ---
2972 | async def check_file_content(filepath: str) -> bool:
2973 | """Reads limited file content and checks for the pattern."""
2974 | try:
2975 | # Read limited chunk, ignore decoding errors for content search robustness
2976 | async with aiofiles.open(filepath, mode="r", encoding="utf-8", errors="ignore") as f:
2977 | content_chunk = await f.read(max_content_bytes)
2978 | # Perform search (case sensitive or insensitive)
2979 | if case_sensitive:
2980 | return pattern in content_chunk
2981 | else:
2982 | return search_pattern in content_chunk.lower() # Compare lower case
2983 | except OSError as read_err:
2984 | # Log content read errors but don't necessarily fail the whole search
2985 | logger.warning(
2986 | f"Could not read content of {filepath} for search: {read_err}", emoji_key="warning"
2987 | )
2988 | onerror(read_err) # Report as a search error
2989 | except Exception as read_unexpected_err:
2990 | logger.error(
2991 | f"Unexpected error reading content of {filepath}: {read_unexpected_err}",
2992 | exc_info=True,
2993 | emoji_key="error",
2994 | )
2995 | # Do not report unexpected errors via onerror, log them fully.
2996 | return False
2997 |
2998 | # --- End File Content Search Task ---
2999 |
3000 | try:
3001 | # Use the async_walk helper for efficient traversal and exclusion handling
3002 |         # followlinks=True so the search also descends into symlinked directories;
3003 |         # exclude patterns apply to paths inside linked directories relative to the base path.
3005 | async for root, dirs, files in async_walk(
3006 | validated_path,
3007 | onerror=onerror,
3008 | exclude_patterns=exclude_patterns,
3009 | base_path=validated_path, # Use original validated path as base for excludes
3010 | followlinks=True,
3011 | ):
3012 | # Check matching directory names
3013 | for dirname in dirs:
3014 | name_to_check = dirname if case_sensitive else dirname.lower()
3015 | if search_pattern in name_to_check:
3016 | match_path = os.path.join(root, dirname)
3017 | matched_paths.add(match_path) # Add to set (handles duplicates)
3018 |
3019 | # Check matching file names OR content
3020 | content_check_tasks = []
3021 | files_to_check_content = []
3022 |
3023 | for filename in files:
3024 | name_to_check = filename if case_sensitive else filename.lower()
3025 | match_path = os.path.join(root, filename)
3026 | name_match = search_pattern in name_to_check
3027 |
3028 |                 if name_match:
3029 |                     matched_paths.add(match_path)  # set.add is idempotent; no membership check needed
3030 |
3031 |                 # Only schedule a content check if the path was not already matched by name
3032 |                 if search_content and match_path not in matched_paths:
3033 |                     files_to_check_content.append(match_path)
3034 |                     # Create the coroutine now; results are gathered concurrently below
3035 |                     content_check_tasks.append(check_file_content(match_path))
3040 |
3041 | # Run content checks concurrently for this directory level
3042 | if content_check_tasks:
3043 | content_results = await asyncio.gather(*content_check_tasks, return_exceptions=True)
3044 | for idx, result in enumerate(content_results):
3045 | file_path_checked = files_to_check_content[idx]
3046 | if isinstance(result, Exception):
3047 | logger.warning(
3048 | f"Error during content check task for {file_path_checked}: {result}",
3049 | emoji_key="error",
3050 | )
3051 | # Errors during check_file_content are logged there, potentially reported via onerror too.
3052 | elif result is True: # Content matched
3053 | matched_paths.add(file_path_checked) # Add content match
3054 |
3055 | except Exception as e:
3056 | # Catch unexpected errors during the async iteration/walk setup itself
3057 | logger.error(
3058 | f"Unexpected error during file search setup or walk in {path}: {e}",
3059 | exc_info=True,
3060 | emoji_key="error",
3061 | )
3062 | raise ToolError(
3063 | f"An unexpected error occurred during search execution: {str(e)}",
3064 | context={"path": path, "pattern": pattern},
3065 | ) from e
3066 |
3067 | processing_time = time.monotonic() - start_time
3068 | # Convert the set of unique matched paths to a sorted list for consistent output.
3069 | unique_matches = sorted(list(matched_paths))
3070 |
3071 | logger.success(
3072 | f"Search for '{pattern}' in {path} completed ({len(unique_matches)} unique matches found)",
3073 | emoji_key="search",
3074 | errors_encountered=len(search_errors),
3075 | time=processing_time,
3076 | case_sensitive=case_sensitive,
3077 | search_content=search_content,
3078 | )
3079 |
3080 | result = {
3081 | "path": validated_path,
3082 | "pattern": pattern,
3083 | "case_sensitive": case_sensitive,
3084 | "search_content": search_content,
3085 | "matches": unique_matches,
3086 | "success": True,
3087 | "message": f"Found {len(unique_matches)} unique matches for '{pattern}' in '{validated_path}'.",
3088 | }
3089 | if search_errors:
3090 | # Add consolidated warning if errors occurred during walk/stat.
3091 | result["warnings"] = search_errors # Include actual errors reported by onerror
3092 | result["message"] += (
3093 | f" Encountered {len(search_errors)} access or read errors during search."
3094 | )
3095 |
3096 | return result
3097 |
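# --- Illustrative usage sketch (editor's addition, not part of the original module) ---
# Name-and-content search under an allowed directory. The paths and patterns are
# hypothetical. Note that search_content=True reads up to max_content_bytes from
# each candidate file, so it can be slow on large trees.
async def _example_search_usage() -> None:
    result = await search_files(
        path="/allowed/projects",
        pattern="TODO",
        case_sensitive=False,
        exclude_patterns=["*.log", ".git/"],
        search_content=True,  # also scan file contents (UTF-8, size-limited)
        max_content_bytes=256 * 1024,  # cap content scanning at 256 KiB per file
    )
    for match in result["matches"]:
        print(match)
    if "warnings" in result:
        print(f"{len(result['warnings'])} access/read errors occurred during the search")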
3098 |
3099 | @with_tool_metrics
3100 | @with_error_handling
3101 | async def get_file_info(path: str) -> Dict[str, Any]:
3102 | """Get detailed metadata about a specific file or directory asynchronously.
3103 |
3104 | Validates the path exists and is allowed. Returns information like size,
3105 | timestamps, type, and permissions. Uses lstat to report info about the path
3106 | itself (including if it's a symlink).
3107 |
3108 | Args:
3109 | path: Path to the file or directory.
3110 |
3111 | Returns:
3112 | Dictionary containing detailed file information, marked with success=True.
3113 | If info retrieval fails after validation, raises ToolError.
3114 | """
3115 | start_time = time.monotonic()
3116 |
3117 | # Validate path (must exist, check_exists=True), but don't resolve symlinks
3118 | # We want to get info about the path as specified by the user
3119 | validation_result = await validate_path(path, check_exists=True, resolve_symlinks=False)
3120 |
3121 | # Check if this is a symlink
3122 | is_symlink = isinstance(validation_result, dict) and validation_result.get("is_symlink")
3123 | if is_symlink:
3124 | validated_path = validation_result.get("symlink_path")
3125 | else:
3126 | validated_path = validation_result
3127 |
3128 | # Get file information asynchronously using the helper
3129 | # Don't follow symlinks - we want info about the path itself
3130 | info = await format_file_info(validated_path, follow_symlinks=False)
3131 |
3132 | # Check if the helper returned an error structure
3133 | if "error" in info:
3134 | # Propagate the error. Since path validation passed, this is likely
3135 | # a transient issue or permission problem reading metadata. Use ToolError.
3136 | raise ToolError(
3137 | f"Failed to get file info for '{validated_path}': {info['error']}",
3138 | context={"path": validated_path},
3139 | )
3140 |
3141 | # Info retrieval successful
3142 | processing_time = time.monotonic() - start_time
3143 | logger.success(
3144 | f"Got file info for: {path} (resolved: {validated_path})",
3145 | emoji_key="file",
3146 | time=processing_time,
3147 | )
3148 |
3149 | # Add success flag and descriptive message to the info dictionary
3150 | info["success"] = True
3151 | info["message"] = f"Successfully retrieved info for '{validated_path}'."
3152 | return info
3153 |
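# --- Illustrative usage sketch (editor's addition, not part of the original module) ---
# Fetches metadata without following symlinks, so a link reports its own info rather
# than its target's. The path is hypothetical; the exact keys of the returned dict
# come from format_file_info and may vary.
async def _example_file_info_usage() -> None:
    info = await get_file_info("/allowed/projects/report_final.txt")
    if info["success"]:
        print(info["message"])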
3154 |
3155 | @with_tool_metrics
3156 | @with_error_handling
3157 | async def list_allowed_directories() -> Dict[str, Any]:
3158 | """List all directories configured as allowed for filesystem access.
3159 |
3160 | This is primarily an administrative/debugging tool. Reads from the loaded config.
3161 |
3162 | Returns:
3163 | Dictionary containing the list of allowed base directory paths.
3164 | """
3165 | start_time = time.monotonic()
3166 |
3167 | # --- Use get_allowed_directories which reads from config ---
3168 | try:
3169 | allowed_dirs = get_allowed_directories()
3170 | except Exception as e:
3171 | # Handle rare errors during config retrieval itself
3172 | raise ToolError(f"Failed to retrieve allowed directories configuration: {e}") from e
3173 |
3174 | processing_time = time.monotonic() - start_time
3175 | logger.success(
3176 | f"Listed {len(allowed_dirs)} allowed directories", emoji_key="config", time=processing_time
3177 | )
3178 |
3179 | return {
3180 | "directories": allowed_dirs,
3181 | "count": len(allowed_dirs),
3182 | "success": True,
3183 | "message": f"Retrieved {len(allowed_dirs)} configured allowed directories.",
3184 | }
3185 |
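# --- Illustrative usage sketch (editor's addition, not part of the original module) ---
# Administrative check of the configured sandbox roots. Useful before calling the
# other tools, since every path argument must resolve under one of these directories.
async def _example_allowed_dirs_usage() -> None:
    result = await list_allowed_directories()
    print(f"{result['count']} allowed directories:")
    for directory in result["directories"]:
        print(f"  - {directory}")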
```