This is page 2 of 2. Use http://codebase.md/ai-zerolab/mcp-toolbox?lines=true&page={x} to view the full context.
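
If the pages need to be stitched together programmatically, a minimal sketch (hypothetical, not part of the repository; it assumes `httpx`, which this project already depends on, and that the endpoint serves plain markdown) might look like:

```python
# Hypothetical helper: fetch and join both pages of this dump
# using the URL template above.
import httpx

BASE_URL = "http://codebase.md/ai-zerolab/mcp-toolbox"
pages = [
    httpx.get(BASE_URL, params={"lines": "true", "page": page}).text
    for page in (1, 2)
]
full_context = "\n".join(pages)
```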

# Directory Structure

```
├── .github
│   ├── actions
│   │   └── setup-python-env
│   │       └── action.yml
│   └── workflows
│       ├── main.yml
│       ├── on-release-main.yml
│       └── validate-codecov-config.yml
├── .gitignore
├── .pre-commit-config.yaml
├── .vscode
│   └── settings.json
├── codecov.yaml
├── CONTRIBUTING.md
├── Dockerfile
├── docs
│   ├── index.md
│   └── modules.md
├── generate_config_template.py
├── LICENSE
├── llms.txt
├── Makefile
├── mcp_toolbox
│   ├── __init__.py
│   ├── app.py
│   ├── audio
│   │   ├── __init__.py
│   │   └── tools.py
│   ├── cli.py
│   ├── command_line
│   │   ├── __init__.py
│   │   └── tools.py
│   ├── config.py
│   ├── enhance
│   │   ├── __init__.py
│   │   ├── memory.py
│   │   └── tools.py
│   ├── figma
│   │   ├── __init__.py
│   │   └── tools.py
│   ├── file_ops
│   │   ├── __init__.py
│   │   └── tools.py
│   ├── flux
│   │   ├── __init__.py
│   │   ├── api.py
│   │   └── tools.py
│   ├── log.py
│   ├── markitdown
│   │   ├── __init__.py
│   │   └── tools.py
│   ├── web
│   │   ├── __init__.py
│   │   └── tools.py
│   └── xiaoyuzhoufm
│       ├── __init__.py
│       └── tools.py
├── mkdocs.yml
├── pyproject.toml
├── pytest.ini
├── README.md
├── smithery.yaml
├── tests
│   ├── audio
│   │   └── test_audio_tools.py
│   ├── command_line
│   │   └── test_command_line_tools.py
│   ├── enhance
│   │   ├── test_enhance_tools.py
│   │   └── test_memory.py
│   ├── figma
│   │   └── test_figma_tools.py
│   ├── file_ops
│   │   └── test_file_ops_tools.py
│   ├── flux
│   │   └── test_flux_tools.py
│   ├── markitdown
│   │   └── test_markitdown_tools.py
│   ├── mock
│   │   └── figma
│   │       ├── delete_comment.json
│   │       ├── get_comments.json
│   │       ├── get_component.json
│   │       ├── get_file_components.json
│   │       ├── get_file_nodes.json
│   │       ├── get_file_styles.json
│   │       ├── get_file.json
│   │       ├── get_image_fills.json
│   │       ├── get_image.json
│   │       ├── get_project_files.json
│   │       ├── get_style.json
│   │       ├── get_team_component_sets.json
│   │       ├── get_team_components.json
│   │       ├── get_team_projects.json
│   │       ├── get_team_styles.json
│   │       └── post_comment.json
│   ├── web
│   │   └── test_web_tools.py
│   └── xiaoyuzhoufm
│       └── test_xiaoyuzhoufm_tools.py
├── tox.ini
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/mcp_toolbox/figma/tools.py:
--------------------------------------------------------------------------------

```python
  1 | import json
  2 | import time
  3 | from pathlib import Path
  4 | from typing import Annotated, Any
  5 | 
  6 | import httpx
  7 | from pydantic import BaseModel, Field
  8 | 
  9 | from mcp_toolbox.app import mcp
 10 | from mcp_toolbox.config import Config
 11 | 
 12 | 
 13 | # Type definitions for request/response parameters
 14 | class ClientMeta(BaseModel):
 15 |     x: float
 16 |     y: float
 17 |     node_id: str | None = None
 18 |     node_offset: dict[str, float] | None = None
 19 | 
 20 | 
 21 | # API Client
 22 | class FigmaApiClient:
 23 |     BASE_URL = "https://api.figma.com/v1"
 24 | 
 25 |     def __init__(self):
 26 |         self.config = Config()
 27 | 
 28 |     async def get_access_token(self) -> str:
 29 |         if not self.config.figma_api_key:
 30 |             raise ValueError("No Figma API key provided. Set the FIGMA_API_KEY environment variable.")
 31 |         return self.config.figma_api_key
 32 | 
 33 |     async def make_request(self, path: str, method: str = "GET", data: Any = None) -> dict[str, Any]:
 34 |         token = await self.get_access_token()
 35 | 
 36 |         async with httpx.AsyncClient(
 37 |             transport=httpx.AsyncHTTPTransport(retries=3),
 38 |             timeout=30,
 39 |         ) as client:
 40 |             headers = {"X-Figma-Token": token}
 41 |             url = f"{self.BASE_URL}{path}"
 42 | 
 43 |             try:
 44 |                 if method == "GET":
 45 |                     response = await client.get(url, headers=headers)
 46 |                 elif method == "POST":
 47 |                     response = await client.post(url, headers=headers, json=data)
 48 |                 elif method == "DELETE":
 49 |                     response = await client.delete(url, headers=headers)
 50 |                 else:
 51 |                     raise ValueError(f"Unsupported HTTP method: {method}")
 52 | 
 53 |                 response.raise_for_status()
 54 |                 return response.json()
 55 |             except httpx.HTTPStatusError as e:
 56 |                 figma_error = (
 57 |                     e.response.json() if e.response.content else {"status": e.response.status_code, "err": str(e)}
 58 |                 )
 59 |                 raise ValueError(
 60 |                     f"Figma API error: {figma_error.get('err', figma_error.get('message', str(e)))}"
 61 |                 ) from e
 62 |             except httpx.RequestError as e:
 63 |                 raise ValueError(f"Request error: {e!s}") from e
 64 | 
 65 |     def build_query_string(self, params: dict[str, Any]) -> str:
 66 |         # Filter out None values
 67 |         filtered_params = {k: v for k, v in params.items() if v is not None}
 68 | 
 69 |         if not filtered_params:
 70 |             return ""
 71 | 
 72 |         # Convert lists to comma-separated strings
 73 |         for key, value in filtered_params.items():
 74 |             if isinstance(value, list):
 75 |                 filtered_params[key] = ",".join(map(str, value))
 76 | 
 77 |         # Build query string
 78 |         query_parts = [f"{k}={v}" for k, v in filtered_params.items()]
 79 |         return "?" + "&".join(query_parts)
 80 | 
 81 | 
 82 | # Cache Manager
 83 | class CacheManager:
 84 |     def __init__(self):
 85 |         self.config = Config()
 86 |         self.cache_dir = Path(self.config.cache_dir)
 87 |         self.cache_dir.mkdir(parents=True, exist_ok=True)
 88 | 
 89 |     def save_to_cache(self, filename: str, data: Any) -> str:
 90 |         file_path = self.cache_dir / filename
 91 |         with open(file_path, "w") as f:
 92 |             json.dump(data, f, indent=2)
 93 |         return str(file_path)
 94 | 
 95 | 
 96 | # Initialize API client and cache manager
 97 | api_client = FigmaApiClient()
 98 | cache_manager = CacheManager()
 99 | 
100 | 
101 | # Tool implementations
102 | @mcp.tool(description="Get a Figma file by key")
103 | async def figma_get_file(
104 |     file_key: Annotated[str, Field(description="The key of the file to get")],
105 |     version: Annotated[str | None, Field(default=None, description="A specific version ID to get")] = None,
106 |     depth: Annotated[int | None, Field(default=None, description="Depth of nodes to return 1-4")] = None,
107 |     branch_data: Annotated[bool | None, Field(default=None, description="Include branch data if true")] = None,
108 | ) -> dict[str, Any]:
109 |     """Get a Figma file by key."""
110 |     params = {"version": version, "depth": depth, "branch_data": branch_data}
111 | 
112 |     query_string = api_client.build_query_string(params)
113 |     result = await api_client.make_request(f"/files/{file_key}{query_string}")
114 | 
115 |     # Save to cache
116 |     try:
117 |         filename = f"file_{file_key}_{int(time.time() * 1000)}.json"
118 |         file_path = cache_manager.save_to_cache(filename, result)
119 |         return {
120 |             "file_path": file_path,
121 |             "message": "File data saved to local cache. Use this file path to access the complete data.",
122 |         }
123 |     except Exception:
124 |         # If saving to cache fails, return original result
125 |         return result
126 | 
127 | 
128 | @mcp.tool(description="Get specific nodes from a Figma file.")
129 | async def figma_get_file_nodes(
130 |     file_key: Annotated[str, Field(description="The key of the file to get nodes from")],
131 |     node_ids: Annotated[list[str], Field(description="Array of node IDs to get")],
132 |     depth: Annotated[int | None, Field(default=None, description="Depth of nodes to return 1-4")] = None,
133 |     version: Annotated[str | None, Field(default=None, description="A specific version ID to get")] = None,
134 | ) -> dict[str, Any]:
135 |     """Get specific nodes from a Figma file."""
136 |     params = {"ids": node_ids, "depth": depth, "version": version}
137 | 
138 |     query_string = api_client.build_query_string(params)
139 |     result = await api_client.make_request(f"/files/{file_key}/nodes{query_string}")
140 | 
141 |     # Save to cache
142 |     try:
143 |         filename = f"file_nodes_{file_key}_{int(time.time() * 1000)}.json"
144 |         file_path = cache_manager.save_to_cache(filename, result)
145 |         return {
146 |             "file_path": file_path,
147 |             "message": "File nodes data saved to local cache. Use this file path to access the complete data.",
148 |         }
149 |     except Exception:
150 |         # If saving to cache fails, return original result
151 |         return result
152 | 
153 | 
154 | @mcp.tool(description="Get images for nodes in a Figma file.")
155 | async def figma_get_image(
156 |     file_key: Annotated[str, Field(description="The key of the file to get images from")],
157 |     ids: Annotated[list[str], Field(description="Array of node IDs to render")],
158 |     scale: Annotated[float | None, Field(default=None, description="Scale factor to render at 0.01-4")] = None,
159 |     format_type: Annotated[str | None, Field(default=None, description="Image format jpg/png/svg/pdf")] = None,
160 |     svg_include_id: Annotated[bool | None, Field(default=None, description="Include IDs in SVG output")] = None,
161 |     svg_simplify_stroke: Annotated[
162 |         bool | None, Field(default=None, description="Simplify strokes in SVG output")
163 |     ] = None,
164 |     use_absolute_bounds: Annotated[bool | None, Field(default=None, description="Use absolute bounds")] = None,
165 | ) -> dict[str, Any]:
166 |     """Get images for nodes in a Figma file."""
167 |     params = {
168 |         "ids": ids,
169 |         "scale": scale,
170 |         "format": format_type,
171 |         "svg_include_id": svg_include_id,
172 |         "svg_simplify_stroke": svg_simplify_stroke,
173 |         "use_absolute_bounds": use_absolute_bounds,
174 |     }
175 | 
176 |     query_string = api_client.build_query_string(params)
177 |     return await api_client.make_request(f"/images/{file_key}{query_string}")
178 | 
179 | 
180 | @mcp.tool(description="Get URLs for images used in a Figma file.")
181 | async def figma_get_image_fills(
182 |     file_key: Annotated[str, Field(description="The key of the file to get image fills from")],
183 | ) -> dict[str, Any]:
184 |     """Get URLs for images used in a Figma file."""
185 |     return await api_client.make_request(f"/files/{file_key}/images")
186 | 
187 | 
188 | @mcp.tool(description="Get comments on a Figma file.")
189 | async def figma_get_comments(
190 |     file_key: Annotated[str, Field(description="The key of the file to get comments from")],
191 | ) -> dict[str, Any]:
192 |     """Get comments on a Figma file."""
193 |     return await api_client.make_request(f"/files/{file_key}/comments")
194 | 
195 | 
196 | @mcp.tool(description="Post a comment on a Figma file.")
197 | async def figma_post_comment(
198 |     file_key: Annotated[str, Field(description="The key of the file to comment on")],
199 |     message: Annotated[str, Field(description="Comment message text")],
200 |     client_meta: Annotated[
201 |         dict[str, Any] | None, Field(default=None, description="Position of the comment x/y/node_id/node_offset")
202 |     ] = None,
203 |     comment_id: Annotated[str | None, Field(default=None, description="ID of comment to reply to")] = None,
204 | ) -> dict[str, Any]:
205 |     """Post a comment on a Figma file."""
206 |     comment_data = {"message": message}
207 | 
208 |     if client_meta:
209 |         comment_data["client_meta"] = client_meta
210 | 
211 |     if comment_id:
212 |         comment_data["comment_id"] = comment_id
213 | 
214 |     return await api_client.make_request(f"/files/{file_key}/comments", "POST", comment_data)
215 | 
216 | 
217 | @mcp.tool(description="Delete a comment from a Figma file.")
218 | async def figma_delete_comment(
219 |     file_key: Annotated[str, Field(description="The key of the file to delete a comment from")],
220 |     comment_id: Annotated[str, Field(description="ID of the comment to delete")],
221 | ) -> dict[str, Any]:
222 |     """Delete a comment from a Figma file."""
223 |     return await api_client.make_request(f"/files/{file_key}/comments/{comment_id}", "DELETE")
224 | 
225 | 
226 | @mcp.tool(description="Get projects for a team.")
227 | async def figma_get_team_projects(
228 |     team_id: Annotated[str, Field(description="The team ID")],
229 |     page_size: Annotated[int | None, Field(default=None, description="Number of items per page")] = None,
230 |     cursor: Annotated[str | None, Field(default=None, description="Cursor for pagination")] = None,
231 | ) -> dict[str, Any]:
232 |     """Get projects for a team."""
233 |     params = {"page_size": page_size, "cursor": cursor}
234 | 
235 |     query_string = api_client.build_query_string(params)
236 |     return await api_client.make_request(f"/teams/{team_id}/projects{query_string}")
237 | 
238 | 
239 | @mcp.tool(description="Get files for a project.")
240 | async def figma_get_project_files(
241 |     project_id: Annotated[str, Field(description="The project ID")],
242 |     page_size: Annotated[int | None, Field(default=None, description="Number of items per page")] = None,
243 |     cursor: Annotated[str | None, Field(default=None, description="Cursor for pagination")] = None,
244 |     branch_data: Annotated[bool | None, Field(default=None, description="Include branch data if true")] = None,
245 | ) -> dict[str, Any]:
246 |     """Get files for a project."""
247 |     params = {"page_size": page_size, "cursor": cursor, "branch_data": branch_data}
248 | 
249 |     query_string = api_client.build_query_string(params)
250 |     return await api_client.make_request(f"/projects/{project_id}/files{query_string}")
251 | 
252 | 
253 | @mcp.tool(description="Get components for a team.")
254 | async def figma_get_team_components(
255 |     team_id: Annotated[str, Field(description="The team ID")],
256 |     page_size: Annotated[int | None, Field(default=None, description="Number of items per page")] = None,
257 |     cursor: Annotated[str | None, Field(default=None, description="Cursor for pagination")] = None,
258 | ) -> dict[str, Any]:
259 |     """Get components for a team."""
260 |     params = {"page_size": page_size, "cursor": cursor}
261 | 
262 |     query_string = api_client.build_query_string(params)
263 |     return await api_client.make_request(f"/teams/{team_id}/components{query_string}")
264 | 
265 | 
266 | @mcp.tool(description="Get components from a file.")
267 | async def figma_get_file_components(
268 |     file_key: Annotated[str, Field(description="The key of the file to get components from")],
269 | ) -> dict[str, Any]:
270 |     """Get components from a file."""
271 |     return await api_client.make_request(f"/files/{file_key}/components")
272 | 
273 | 
274 | @mcp.tool(description="Get a component by key.")
275 | async def figma_get_component(key: Annotated[str, Field(description="The component key")]) -> dict[str, Any]:
276 |     """Get a component by key."""
277 |     return await api_client.make_request(f"/components/{key}")
278 | 
279 | 
280 | @mcp.tool(description="Get component sets for a team.")
281 | async def figma_get_team_component_sets(
282 |     team_id: Annotated[str, Field(description="The team ID")],
283 |     page_size: Annotated[int | None, Field(default=None, description="Number of items per page")] = None,
284 |     cursor: Annotated[str | None, Field(default=None, description="Cursor for pagination")] = None,
285 | ) -> dict[str, Any]:
286 |     """Get component sets for a team."""
287 |     params = {"page_size": page_size, "cursor": cursor}
288 | 
289 |     query_string = api_client.build_query_string(params)
290 |     return await api_client.make_request(f"/teams/{team_id}/component_sets{query_string}")
291 | 
292 | 
293 | @mcp.tool(description="Get styles for a team.")
294 | async def figma_get_team_styles(
295 |     team_id: Annotated[str, Field(description="The team ID")],
296 |     page_size: Annotated[int | None, Field(default=None, description="Number of items per page")] = None,
297 |     cursor: Annotated[str | None, Field(default=None, description="Cursor for pagination")] = None,
298 | ) -> dict[str, Any]:
299 |     """Get styles for a team."""
300 |     params = {"page_size": page_size, "cursor": cursor}
301 | 
302 |     query_string = api_client.build_query_string(params)
303 |     return await api_client.make_request(f"/teams/{team_id}/styles{query_string}")
304 | 
305 | 
306 | @mcp.tool(description="Get styles from a file.")
307 | async def figma_get_file_styles(
308 |     file_key: Annotated[str, Field(description="The key of the file to get styles from")],
309 | ) -> dict[str, Any]:
310 |     """Get styles from a file."""
311 |     return await api_client.make_request(f"/files/{file_key}/styles")
312 | 
313 | 
314 | @mcp.tool(description="Get a style by key.")
315 | async def figma_get_style(key: Annotated[str, Field(description="The style key")]) -> dict[str, Any]:
316 |     """Get a style by key."""
317 |     return await api_client.make_request(f"/styles/{key}")
318 | 
```
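
For orientation, a minimal sketch (hypothetical, not part of the repository) of driving `FigmaApiClient` directly, assuming `FIGMA_API_KEY` is set in the environment and using `FILE_KEY` as a placeholder:

```python
# Hypothetical standalone driver for the client above; FILE_KEY is a
# placeholder and FIGMA_API_KEY must be set for get_access_token().
import asyncio

from mcp_toolbox.figma.tools import FigmaApiClient


async def main() -> None:
    client = FigmaApiClient()
    # build_query_string drops None values and comma-joins lists
    query = client.build_query_string({"ids": ["1:2", "1:3"], "depth": 2, "version": None})
    assert query == "?ids=1:2,1:3&depth=2"
    data = await client.make_request(f"/files/FILE_KEY/nodes{query}")
    print(data)


asyncio.run(main())
```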

--------------------------------------------------------------------------------
/tests/figma/test_figma_tools.py:
--------------------------------------------------------------------------------

```python
  1 | import json
  2 | from pathlib import Path
  3 | from unittest.mock import patch
  4 | 
  5 | import pytest
  6 | 
  7 | from mcp_toolbox.figma.tools import (
  8 |     CacheManager,
  9 |     FigmaApiClient,
 10 |     figma_delete_comment,
 11 |     figma_get_comments,
 12 |     figma_get_component,
 13 |     figma_get_file,
 14 |     figma_get_file_components,
 15 |     figma_get_file_nodes,
 16 |     figma_get_file_styles,
 17 |     figma_get_image,
 18 |     figma_get_image_fills,
 19 |     figma_get_project_files,
 20 |     figma_get_style,
 21 |     figma_get_team_component_sets,
 22 |     figma_get_team_components,
 23 |     figma_get_team_projects,
 24 |     figma_get_team_styles,
 25 |     figma_post_comment,
 26 | )
 27 | 
 28 | 
 29 | # Helper function to load mock data
 30 | def load_mock_data(filename):
 31 |     mock_dir = Path(__file__).parent.parent / "mock" / "figma"
 32 |     file_path = mock_dir / filename
 33 | 
 34 |     if not file_path.exists():
 35 |         # Create empty mock data if it doesn't exist
 36 |         mock_data = {"mock": "data"}
 37 |         with open(file_path, "w") as f:
 38 |             json.dump(mock_data, f)
 39 | 
 40 |     with open(file_path) as f:
 41 |         return json.load(f)
 42 | 
 43 | 
 44 | # Patch the FigmaApiClient.make_request method
 45 | @pytest.fixture
 46 | def mock_make_request():
 47 |     with patch.object(FigmaApiClient, "make_request") as mock:
 48 | 
 49 |         def side_effect(path, method="GET", data=None):
 50 |             # Extract the tool name from the path
 51 |             parts = path.strip("/").split("/")
 52 | 
 53 |             if len(parts) >= 2 and parts[0] == "files" and parts[1]:
 54 |                 file_key = parts[1]
 55 | 
 56 |                 if len(parts) == 2:
 57 |                     # get_file
 58 |                     return load_mock_data("get_file.json")
 59 |                 elif len(parts) == 3:
 60 |                     if parts[2] == "nodes":
 61 |                         # get_file_nodes
 62 |                         return load_mock_data("get_file_nodes.json")
 63 |                     elif parts[2] == "images":
 64 |                         # get_image_fills
 65 |                         return load_mock_data("get_image_fills.json")
 66 |                     elif parts[2] == "components":
 67 |                         # get_file_components
 68 |                         return load_mock_data("get_file_components.json")
 69 |                     elif parts[2] == "styles":
 70 |                         # get_file_styles
 71 |                         return load_mock_data("get_file_styles.json")
 72 |                     elif parts[2] == "comments":
 73 |                         if method == "GET":
 74 |                             # get_comments
 75 |                             return load_mock_data("get_comments.json")
 76 |                         elif method == "POST":
 77 |                             # post_comment
 78 |                             return load_mock_data("post_comment.json")
 79 |                 elif len(parts) == 4 and parts[2] == "comments":
 80 |                     # delete_comment
 81 |                     return load_mock_data("delete_comment.json")
 82 | 
 83 |             elif parts[0] == "images" and len(parts) >= 2:
 84 |                 # get_image
 85 |                 return load_mock_data("get_image.json")
 86 | 
 87 |             elif parts[0] == "teams" and len(parts) >= 3:
 88 |                 team_id = parts[1]
 89 | 
 90 |                 if parts[2] == "projects":
 91 |                     # get_team_projects
 92 |                     return load_mock_data("get_team_projects.json")
 93 |                 elif parts[2] == "components":
 94 |                     # get_team_components
 95 |                     return load_mock_data("get_team_components.json")
 96 |                 elif parts[2] == "component_sets":
 97 |                     # get_team_component_sets
 98 |                     return load_mock_data("get_team_component_sets.json")
 99 |                 elif parts[2] == "styles":
100 |                     # get_team_styles
101 |                     return load_mock_data("get_team_styles.json")
102 | 
103 |             elif parts[0] == "projects" and len(parts) >= 3:
104 |                 # get_project_files
105 |                 return load_mock_data("get_project_files.json")
106 | 
107 |             elif parts[0] == "components" and len(parts) >= 2:
108 |                 # get_component
109 |                 return load_mock_data("get_component.json")
110 | 
111 |             elif parts[0] == "styles" and len(parts) >= 2:
112 |                 # get_style
113 |                 return load_mock_data("get_style.json")
114 | 
115 |             # Default mock data
116 |             return {"mock": "data"}
117 | 
118 |         mock.side_effect = side_effect
119 |         yield mock
120 | 
121 | 
122 | # Patch the CacheManager.save_to_cache method
123 | @pytest.fixture
124 | def mock_save_to_cache():
125 |     with patch.object(CacheManager, "save_to_cache") as mock:
126 |         mock.return_value = "/mock/path/to/cache/file.json"
127 |         yield mock
128 | 
129 | 
130 | # Test get_file function
131 | @pytest.mark.asyncio
132 | async def test_get_file(mock_make_request, mock_save_to_cache):
133 |     # Test with minimal parameters
134 |     result = await figma_get_file("test_file_key")
135 | 
136 |     # Verify make_request was called with correct parameters
137 |     mock_make_request.assert_called_once_with("/files/test_file_key")
138 | 
139 |     # Verify save_to_cache was called
140 |     mock_save_to_cache.assert_called_once()
141 | 
142 |     # Verify the result contains expected fields
143 |     assert "file_path" in result
144 |     assert "message" in result
145 |     assert result["file_path"] == "/mock/path/to/cache/file.json"
146 | 
147 |     # Reset mocks for next test
148 |     mock_make_request.reset_mock()
149 |     mock_save_to_cache.reset_mock()
150 | 
151 |     # Test with all parameters
152 |     result = await figma_get_file("test_file_key", version="123", depth=2, branch_data=True)
153 | 
154 |     # Verify make_request was called with correct parameters
155 |     mock_make_request.assert_called_once_with("/files/test_file_key?version=123&depth=2&branch_data=True")
156 | 
157 | 
158 | # Test get_file_nodes function
159 | @pytest.mark.asyncio
160 | async def test_get_file_nodes(mock_make_request, mock_save_to_cache):
161 |     # Test with minimal parameters
162 |     result = await figma_get_file_nodes("test_file_key", ["node1", "node2"])
163 | 
164 |     # Verify make_request was called with correct parameters
165 |     mock_make_request.assert_called_once_with("/files/test_file_key/nodes?ids=node1,node2")
166 | 
167 |     # Verify save_to_cache was called
168 |     mock_save_to_cache.assert_called_once()
169 | 
170 |     # Verify the result contains expected fields
171 |     assert "file_path" in result
172 |     assert "message" in result
173 |     assert result["file_path"] == "/mock/path/to/cache/file.json"
174 | 
175 |     # Reset mocks for next test
176 |     mock_make_request.reset_mock()
177 |     mock_save_to_cache.reset_mock()
178 | 
179 |     # Test with all parameters
180 |     result = await figma_get_file_nodes("test_file_key", ["node1", "node2"], depth=2, version="123")
181 | 
182 |     # Verify make_request was called with correct parameters
183 |     mock_make_request.assert_called_once_with("/files/test_file_key/nodes?ids=node1,node2&depth=2&version=123")
184 | 
185 | 
186 | # Test get_image function
187 | @pytest.mark.asyncio
188 | async def test_get_image(mock_make_request):
189 |     # Test with minimal parameters
190 |     result = await figma_get_image("test_file_key", ["node1", "node2"])
191 | 
192 |     # Verify make_request was called with correct parameters
193 |     mock_make_request.assert_called_once_with("/images/test_file_key?ids=node1,node2")
194 | 
195 |     # Reset mock for next test
196 |     mock_make_request.reset_mock()
197 | 
198 |     # Test with all parameters
199 |     result = await figma_get_image(
200 |         "test_file_key",
201 |         ["node1", "node2"],
202 |         scale=2.0,
203 |         format_type="png",
204 |         svg_include_id=True,
205 |         svg_simplify_stroke=True,
206 |         use_absolute_bounds=True,
207 |     )
208 | 
209 |     # Verify make_request was called with correct parameters
210 |     mock_make_request.assert_called_once_with(
211 |         "/images/test_file_key?ids=node1,node2&scale=2.0&format=png&svg_include_id=True&svg_simplify_stroke=True&use_absolute_bounds=True"
212 |     )
213 | 
214 | 
215 | # Test get_image_fills function
216 | @pytest.mark.asyncio
217 | async def test_get_image_fills(mock_make_request):
218 |     result = await figma_get_image_fills("test_file_key")
219 | 
220 |     # Verify make_request was called with correct parameters
221 |     mock_make_request.assert_called_once_with("/files/test_file_key/images")
222 | 
223 | 
224 | # Test get_comments function
225 | @pytest.mark.asyncio
226 | async def test_get_comments(mock_make_request):
227 |     result = await figma_get_comments("test_file_key")
228 | 
229 |     # Verify make_request was called with correct parameters
230 |     mock_make_request.assert_called_once_with("/files/test_file_key/comments")
231 | 
232 | 
233 | # Test post_comment function
234 | @pytest.mark.asyncio
235 | async def test_post_comment(mock_make_request):
236 |     # Test with minimal parameters
237 |     result = await figma_post_comment("test_file_key", "Test comment")
238 | 
239 |     # Verify make_request was called with correct parameters
240 |     mock_make_request.assert_called_once_with("/files/test_file_key/comments", "POST", {"message": "Test comment"})
241 | 
242 |     # Reset mock for next test
243 |     mock_make_request.reset_mock()
244 | 
245 |     # Test with all parameters
246 |     client_meta = {"x": 100, "y": 200, "node_id": "node1", "node_offset": {"x": 10, "y": 20}}
247 | 
248 |     result = await figma_post_comment("test_file_key", "Test comment", client_meta=client_meta, comment_id="comment1")
249 | 
250 |     # Verify make_request was called with correct parameters
251 |     mock_make_request.assert_called_once_with(
252 |         "/files/test_file_key/comments",
253 |         "POST",
254 |         {"message": "Test comment", "client_meta": client_meta, "comment_id": "comment1"},
255 |     )
256 | 
257 | 
258 | # Test delete_comment function
259 | @pytest.mark.asyncio
260 | async def test_delete_comment(mock_make_request):
261 |     result = await figma_delete_comment("test_file_key", "comment1")
262 | 
263 |     # Verify make_request was called with correct parameters
264 |     mock_make_request.assert_called_once_with("/files/test_file_key/comments/comment1", "DELETE")
265 | 
266 | 
267 | # Test get_team_projects function
268 | @pytest.mark.asyncio
269 | async def test_get_team_projects(mock_make_request):
270 |     # Test with minimal parameters
271 |     result = await figma_get_team_projects("team1")
272 | 
273 |     # Verify make_request was called with correct parameters
274 |     mock_make_request.assert_called_once_with("/teams/team1/projects")
275 | 
276 |     # Reset mock for next test
277 |     mock_make_request.reset_mock()
278 | 
279 |     # Test with all parameters
280 |     result = await figma_get_team_projects("team1", page_size=10, cursor="cursor1")
281 | 
282 |     # Verify make_request was called with correct parameters
283 |     mock_make_request.assert_called_once_with("/teams/team1/projects?page_size=10&cursor=cursor1")
284 | 
285 | 
286 | # Test get_project_files function
287 | @pytest.mark.asyncio
288 | async def test_get_project_files(mock_make_request):
289 |     # Test with minimal parameters
290 |     result = await figma_get_project_files("project1")
291 | 
292 |     # Verify make_request was called with correct parameters
293 |     mock_make_request.assert_called_once_with("/projects/project1/files")
294 | 
295 |     # Reset mock for next test
296 |     mock_make_request.reset_mock()
297 | 
298 |     # Test with all parameters
299 |     result = await figma_get_project_files("project1", page_size=10, cursor="cursor1", branch_data=True)
300 | 
301 |     # Verify make_request was called with correct parameters
302 |     mock_make_request.assert_called_once_with("/projects/project1/files?page_size=10&cursor=cursor1&branch_data=True")
303 | 
304 | 
305 | # Test get_team_components function
306 | @pytest.mark.asyncio
307 | async def test_get_team_components(mock_make_request):
308 |     # Test with minimal parameters
309 |     result = await figma_get_team_components("team1")
310 | 
311 |     # Verify make_request was called with correct parameters
312 |     mock_make_request.assert_called_once_with("/teams/team1/components")
313 | 
314 |     # Reset mock for next test
315 |     mock_make_request.reset_mock()
316 | 
317 |     # Test with all parameters
318 |     result = await figma_get_team_components("team1", page_size=10, cursor="cursor1")
319 | 
320 |     # Verify make_request was called with correct parameters
321 |     mock_make_request.assert_called_once_with("/teams/team1/components?page_size=10&cursor=cursor1")
322 | 
323 | 
324 | # Test get_file_components function
325 | @pytest.mark.asyncio
326 | async def test_get_file_components(mock_make_request):
327 |     result = await figma_get_file_components("test_file_key")
328 | 
329 |     # Verify make_request was called with correct parameters
330 |     mock_make_request.assert_called_once_with("/files/test_file_key/components")
331 | 
332 | 
333 | # Test get_component function
334 | @pytest.mark.asyncio
335 | async def test_get_component(mock_make_request):
336 |     result = await figma_get_component("component1")
337 | 
338 |     # Verify make_request was called with correct parameters
339 |     mock_make_request.assert_called_once_with("/components/component1")
340 | 
341 | 
342 | # Test get_team_component_sets function
343 | @pytest.mark.asyncio
344 | async def test_get_team_component_sets(mock_make_request):
345 |     # Test with minimal parameters
346 |     result = await figma_get_team_component_sets("team1")
347 | 
348 |     # Verify make_request was called with correct parameters
349 |     mock_make_request.assert_called_once_with("/teams/team1/component_sets")
350 | 
351 |     # Reset mock for next test
352 |     mock_make_request.reset_mock()
353 | 
354 |     # Test with all parameters
355 |     result = await figma_get_team_component_sets("team1", page_size=10, cursor="cursor1")
356 | 
357 |     # Verify make_request was called with correct parameters
358 |     mock_make_request.assert_called_once_with("/teams/team1/component_sets?page_size=10&cursor=cursor1")
359 | 
360 | 
361 | # Test get_team_styles function
362 | @pytest.mark.asyncio
363 | async def test_get_team_styles(mock_make_request):
364 |     # Test with minimal parameters
365 |     result = await figma_get_team_styles("team1")
366 | 
367 |     # Verify make_request was called with correct parameters
368 |     mock_make_request.assert_called_once_with("/teams/team1/styles")
369 | 
370 |     # Reset mock for next test
371 |     mock_make_request.reset_mock()
372 | 
373 |     # Test with all parameters
374 |     result = await figma_get_team_styles("team1", page_size=10, cursor="cursor1")
375 | 
376 |     # Verify make_request was called with correct parameters
377 |     mock_make_request.assert_called_once_with("/teams/team1/styles?page_size=10&cursor=cursor1")
378 | 
379 | 
380 | # Test get_file_styles function
381 | @pytest.mark.asyncio
382 | async def test_get_file_styles(mock_make_request):
383 |     result = await figma_get_file_styles("test_file_key")
384 | 
385 |     # Verify make_request was called with correct parameters
386 |     mock_make_request.assert_called_once_with("/files/test_file_key/styles")
387 | 
388 | 
389 | # Test get_style function
390 | @pytest.mark.asyncio
391 | async def test_get_style(mock_make_request):
392 |     result = await figma_get_style("style1")
393 | 
394 |     # Verify make_request was called with correct parameters
395 |     mock_make_request.assert_called_once_with("/styles/style1")
396 | 
```
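
The `patch.object` pattern the fixtures above rely on also works outside pytest. A minimal sketch (hypothetical, not part of the repository; on Python 3.8+ patching an async method automatically substitutes an `AsyncMock`):

```python
# Hypothetical standalone demo of the mocking pattern used in the fixtures.
import asyncio
from unittest.mock import patch

from mcp_toolbox.figma.tools import FigmaApiClient, figma_get_comments


async def main() -> None:
    # Patching at the class level means the recorded call args are just
    # the path, exactly as the assertions above expect.
    with patch.object(FigmaApiClient, "make_request") as mock:
        mock.return_value = {"comments": []}
        result = await figma_get_comments("some_file_key")
        mock.assert_called_once_with("/files/some_file_key/comments")
        print(result)  # {'comments': []}


asyncio.run(main())
```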

--------------------------------------------------------------------------------
/mcp_toolbox/file_ops/tools.py:
--------------------------------------------------------------------------------

```python
  1 | """File operations tools for MCP-Toolbox."""
  2 | 
  3 | import re
  4 | import stat
  5 | from datetime import datetime
  6 | from pathlib import Path
  7 | from typing import Annotated, Any
  8 | 
  9 | from pydantic import Field
 10 | 
 11 | from mcp_toolbox.app import mcp
 12 | 
 13 | 
 14 | @mcp.tool(description="Read file content.")
 15 | async def read_file_content(
 16 |     path: Annotated[str, Field(description="Path to the file to read")],
 17 |     encoding: Annotated[str, Field(default="utf-8", description="File encoding")] = "utf-8",
 18 |     chunk_size: Annotated[
 19 |         int,
 20 |         Field(default=1000000, description="Size of each chunk in bytes, default: 1MB"),
 21 |     ] = 1000000,
 22 |     chunk_index: Annotated[int, Field(default=0, description="Index of the chunk to retrieve, 0-based")] = 0,
 23 | ) -> dict[str, Any]:
 24 |     """Read content from a file, with support for chunked reading for large files.
 25 | 
 26 |     Args:
 27 |         path: Path to the file to read
 28 |         encoding: Optional. File encoding (default: utf-8)
 29 |         chunk_size: Optional. Size of each chunk in bytes (default: 1000000, which is about 1MB)
 30 |         chunk_index: Optional. Index of the chunk to retrieve, 0-based (default: 0)
 31 | 
 32 |     Returns:
 33 |         Dictionary containing content and metadata, including chunking information
 34 |     """
 35 |     try:
 36 |         file_path = Path(path).expanduser()
 37 | 
 38 |         if not file_path.exists():
 39 |             return {
 40 |                 "error": f"File not found: {path}",
 41 |                 "content": "",
 42 |                 "success": False,
 43 |             }
 44 | 
 45 |         if not file_path.is_file():
 46 |             return {
 47 |                 "error": f"Path is not a file: {path}",
 48 |                 "content": "",
 49 |                 "success": False,
 50 |             }
 51 | 
 52 |         # Get file stats
 53 |         stats = file_path.stat()
 54 |         file_size = stats.st_size
 55 | 
 56 |         # Calculate total chunks
 57 |         total_chunks = (file_size + chunk_size - 1) // chunk_size  # Ceiling division
 58 | 
 59 |         # Validate chunk_index
 60 |         if chunk_index < 0 or (file_size > 0 and chunk_index >= total_chunks):
 61 |             return {
 62 |                 "error": f"Invalid chunk index: {chunk_index}. Valid range is 0 to {total_chunks - 1}",
 63 |                 "content": "",
 64 |                 "success": False,
 65 |                 "total_chunks": total_chunks,
 66 |                 "file_size": file_size,
 67 |             }
 68 | 
 69 |         # Calculate start and end positions for the chunk
 70 |         start_pos = chunk_index * chunk_size
 71 |         end_pos = min(start_pos + chunk_size, file_size)
 72 |         chunk_actual_size = end_pos - start_pos
 73 | 
 74 |         # Read the specified chunk
 75 |         content = ""
 76 |         with open(file_path, "rb") as f:
 77 |             f.seek(start_pos)
 78 |             chunk_bytes = f.read(chunk_actual_size)
 79 | 
 80 |             try:
 81 |                 # Try a strict decode first so binary data reaches the base64 fallback
 82 |                 content = chunk_bytes.decode(encoding)
 83 |             except UnicodeDecodeError:
 84 |                 # If decoding fails, return base64 encoded binary data
 85 |                 import base64
 86 | 
 87 |                 content = base64.b64encode(chunk_bytes).decode("ascii")
 88 |                 encoding = f"base64 (original: {encoding})"
 89 | 
 90 |         return {
 91 |             "content": content,
 92 |             "size": file_size,
 93 |             "chunk_size": chunk_size,
 94 |             "chunk_index": chunk_index,
 95 |             "chunk_actual_size": chunk_actual_size,
 96 |             "total_chunks": total_chunks,
 97 |             "is_last_chunk": chunk_index == total_chunks - 1,
 98 |             "encoding": encoding,
 99 |             "last_modified": datetime.fromtimestamp(stats.st_mtime).isoformat(),
100 |             "success": True,
101 |         }
102 |     except UnicodeDecodeError:
103 |         return {
104 |             "error": f"Failed to decode file with encoding {encoding}. Try a different encoding.",
105 |             "content": "",
106 |             "success": False,
107 |         }
108 |     except Exception as e:
109 |         return {
110 |             "error": f"Failed to read file: {e!s}",
111 |             "content": "",
112 |             "success": False,
113 |         }
114 | 
115 | 
116 | @mcp.tool(description="Write content to a file.")
117 | async def write_file_content(
118 |     path: Annotated[str, Field(description="Path to the file to write")],
119 |     content: Annotated[str, Field(description="Content to write")],
120 |     encoding: Annotated[str, Field(default="utf-8", description="File encoding")] = "utf-8",
121 |     append: Annotated[bool, Field(default=False, description="Whether to append to the file")] = False,
122 | ) -> dict[str, Any]:
123 |     """Write content to a file.
124 | 
125 |     Args:
126 |         path: Path to the file to write
127 |         content: Content to write to the file
128 |         encoding: Optional. File encoding (default: utf-8)
129 |         append: Optional. Whether to append to the file (default: False)
130 | 
131 |     Returns:
132 |         Dictionary containing success status and metadata
133 |     """
134 |     try:
135 |         file_path = Path(path).expanduser()
136 | 
137 |         # Create parent directories if they don't exist
138 |         file_path.parent.mkdir(parents=True, exist_ok=True)
139 | 
140 |         # Write content to file
141 |         mode = "a" if append else "w"
142 |         with open(file_path, mode, encoding=encoding) as f:
143 |             f.write(content)
144 | 
145 |         # Get file stats
146 |         stats = file_path.stat()
147 | 
148 |         return {
149 |             "path": str(file_path),
150 |             "size": stats.st_size,
151 |             "last_modified": datetime.fromtimestamp(stats.st_mtime).isoformat(),
152 |             "success": True,
153 |         }
154 |     except Exception as e:
155 |         return {
156 |             "error": f"Failed to write file: {e!s}",
157 |             "path": path,
158 |             "success": False,
159 |         }
160 | 
161 | 
162 | @mcp.tool(description="Replace content in a file using regular expressions.")
163 | async def replace_in_file(
164 |     path: Annotated[str, Field(description="Path to the file")],
165 |     pattern: Annotated[
166 |         str,
167 |         Field(
168 |             description="Python regular expression pattern (re module). Supports groups, character classes, quantifiers, etc. Examples: '[a-z]+' for lowercase words, '\\d{3}-\\d{4}' for number patterns. Remember to escape backslashes."
169 |         ),
170 |     ],
171 |     replacement: Annotated[str, Field(description="Replacement string")],
172 |     encoding: Annotated[str, Field(default="utf-8", description="File encoding")] = "utf-8",
173 |     count: Annotated[int, Field(default=0, description="Maximum number of replacements")] = 0,
174 | ) -> dict[str, Any]:
175 |     """Replace content in a file using regular expressions.
176 | 
177 |     Args:
178 |         path: Path to the file
179 |         pattern: Regular expression pattern
180 |         replacement: Replacement string
181 |         encoding: Optional. File encoding (default: utf-8)
182 |         count: Optional. Maximum number of replacements (default: 0, which means all occurrences)
183 | 
184 |     Returns:
185 |         Dictionary containing success status and replacement information
186 |     """
187 |     try:
188 |         file_path = Path(path).expanduser()
189 | 
190 |         if not file_path.exists():
191 |             return {
192 |                 "error": f"File not found: {path}",
193 |                 "success": False,
194 |                 "replacements": 0,
195 |             }
196 | 
197 |         if not file_path.is_file():
198 |             return {
199 |                 "error": f"Path is not a file: {path}",
200 |                 "success": False,
201 |                 "replacements": 0,
202 |             }
203 | 
204 |         # Read file content
205 |         with open(file_path, encoding=encoding) as f:
206 |             content = f.read()
207 | 
208 |         # Compile regex pattern
209 |         try:
210 |             regex = re.compile(pattern)
211 |         except re.error as e:
212 |             return {
213 |                 "error": f"Invalid regular expression: {e!s}",
214 |                 "success": False,
215 |                 "replacements": 0,
216 |             }
217 | 
218 |         # Replace content
219 |         new_content, replacements = regex.subn(replacement, content, count=count)
220 | 
221 |         if replacements > 0:
222 |             # Write updated content back to file
223 |             with open(file_path, "w", encoding=encoding) as f:
224 |                 f.write(new_content)
225 | 
226 |         return {
227 |             "path": str(file_path),
228 |             "replacements": replacements,
229 |             "success": True,
230 |         }
231 |     except UnicodeDecodeError:
232 |         return {
233 |             "error": f"Failed to decode file with encoding {encoding}. Try a different encoding.",
234 |             "success": False,
235 |             "replacements": 0,
236 |         }
237 |     except Exception as e:
238 |         return {
239 |             "error": f"Failed to replace content: {e!s}",
240 |             "success": False,
241 |             "replacements": 0,
242 |         }
243 | 
244 | 
245 | def _format_mode(mode: int) -> str:
246 |     """Format file mode into a string representation.
247 | 
248 |     Args:
249 |         mode: File mode as an integer
250 | 
251 |     Returns:
252 |         String representation of file permissions
253 |     """
254 |     result = ""
255 | 
256 |     # File type
257 |     if stat.S_ISDIR(mode):
258 |         result += "d"
259 |     elif stat.S_ISLNK(mode):
260 |         result += "l"
261 |     else:
262 |         result += "-"
263 | 
264 |     # User permissions
265 |     result += "r" if mode & stat.S_IRUSR else "-"
266 |     result += "w" if mode & stat.S_IWUSR else "-"
267 |     result += "x" if mode & stat.S_IXUSR else "-"
268 | 
269 |     # Group permissions
270 |     result += "r" if mode & stat.S_IRGRP else "-"
271 |     result += "w" if mode & stat.S_IWGRP else "-"
272 |     result += "x" if mode & stat.S_IXGRP else "-"
273 | 
274 |     # Other permissions
275 |     result += "r" if mode & stat.S_IROTH else "-"
276 |     result += "w" if mode & stat.S_IWOTH else "-"
277 |     result += "x" if mode & stat.S_IXOTH else "-"
278 | 
279 |     return result
280 | 
281 | 
282 | def _get_file_info(path: Path) -> dict[str, Any]:
283 |     """Get detailed information about a file or directory.
284 | 
285 |     Args:
286 |         path: Path to the file or directory
287 | 
288 |     Returns:
289 |         Dictionary containing file information
290 |     """
291 |     stats = path.stat()
292 | 
293 |     # Format timestamps
294 |     mtime = datetime.fromtimestamp(stats.st_mtime).isoformat()
295 |     ctime = datetime.fromtimestamp(stats.st_ctime).isoformat()  # st_ctime: metadata-change time on Unix, creation time on Windows
296 |     atime = datetime.fromtimestamp(stats.st_atime).isoformat()
297 | 
298 |     # Get file type
299 |     if path.is_dir():
300 |         file_type = "directory"
301 |     elif path.is_symlink():
302 |         file_type = "symlink"
303 |     else:
304 |         file_type = "file"
305 | 
306 |     # Format size
307 |     size = stats.st_size
308 |     size_str = f"{size} bytes"
309 |     if size >= 1024:
310 |         size_str = f"{size / 1024:.2f} KB"
311 |     if size >= 1024 * 1024:
312 |         size_str = f"{size / (1024 * 1024):.2f} MB"
313 |     if size >= 1024 * 1024 * 1024:
314 |         size_str = f"{size / (1024 * 1024 * 1024):.2f} GB"
315 | 
316 |     return {
317 |         "name": path.name,
318 |         "path": str(path),
319 |         "type": file_type,
320 |         "size": size,
321 |         "size_formatted": size_str,
322 |         "permissions": _format_mode(stats.st_mode),
323 |         "mode": stats.st_mode,
324 |         "owner": stats.st_uid,
325 |         "group": stats.st_gid,
326 |         "created": ctime,
327 |         "modified": mtime,
328 |         "accessed": atime,
329 |     }
330 | 
331 | 
332 | @mcp.tool(description="List directory contents with detailed information.")
333 | async def list_directory(  # noqa: C901
334 |     path: Annotated[str, Field(description="Directory path")],
335 |     recursive: Annotated[bool, Field(default=False, description="Whether to list recursively")] = False,
336 |     max_depth: Annotated[int, Field(default=-1, description="Maximum recursion depth")] = -1,
337 |     include_hidden: Annotated[bool, Field(default=False, description="Whether to include hidden files")] = False,
338 |     ignore_patterns: Annotated[
339 |         list[str] | None,
340 |         Field(
341 |             default=[
342 |                 "node_modules",
343 |                 "dist",
344 |                 "build",
345 |                 "public",
346 |                 "static",
347 |                 ".next",
348 |                 ".git",
349 |                 ".vscode",
350 |                 ".idea",
351 |                 ".DS_Store",
352 |                 ".env",
353 |                 ".venv",
354 |             ],
355 |             description="Glob patterns to ignore (e.g. ['node_modules', '*.tmp'])",
356 |         ),
357 |     ] = None,
358 | ) -> dict[str, Any]:
359 |     """List directory contents with detailed information.
360 | 
361 |     Args:
362 |         path: Directory path
363 |         recursive: Optional. Whether to list recursively (default: False)
364 |         max_depth: Optional. Maximum recursion depth (default: -1, which means no limit)
365 |         include_hidden: Optional. Whether to include hidden files (default: False)
366 |         ignore_patterns: Optional. Glob patterns to ignore (default: ['node_modules', 'dist', 'build', 'public', 'static', '.next', '.git', '.vscode', '.idea', '.DS_Store', '.env', '.venv'])
367 | 
368 |     Returns:
369 |         Dictionary containing directory contents and metadata
370 |     """
371 |     ignore_patterns = (
372 |         ignore_patterns
373 |         if ignore_patterns is not None
374 |         else [
375 |             "node_modules",
376 |             "dist",
377 |             "build",
378 |             "public",
379 |             "static",
380 |             ".next",
381 |             ".git",
382 |             ".vscode",
383 |             ".idea",
384 |             ".DS_Store",
385 |             ".env",
386 |             ".venv",
387 |         ]
388 |     )
389 |     try:
390 |         dir_path = Path(path).expanduser()
391 | 
392 |         if not dir_path.exists():
393 |             return {
394 |                 "error": f"Directory not found: {path}",
395 |                 "entries": [],
396 |                 "success": False,
397 |             }
398 | 
399 |         if not dir_path.is_dir():
400 |             return {
401 |                 "error": f"Path is not a directory: {path}",
402 |                 "entries": [],
403 |                 "success": False,
404 |             }
405 | 
406 |         entries = []
407 | 
408 |         # Import fnmatch for pattern matching
409 |         import fnmatch
410 | 
411 |         def should_ignore(path: Path) -> bool:
412 |             """Check if a path should be ignored based on ignore patterns.
413 | 
414 |             Args:
415 |                 path: Path to check
416 | 
417 |             Returns:
418 |                 True if the path should be ignored, False otherwise
419 |             """
420 |             if not ignore_patterns:
421 |                 return False
422 | 
423 |             return any(fnmatch.fnmatch(path.name, pattern) for pattern in ignore_patterns)
424 | 
425 |         def process_directory(current_path: Path, current_depth: int = 0) -> None:
426 |             """Process a directory and its contents recursively.
427 | 
428 |             Args:
429 |                 current_path: Path to the current directory
430 |                 current_depth: Current recursion depth
431 |             """
432 |             nonlocal entries
433 | 
434 |             # Check if we've reached the maximum depth
435 |             if max_depth >= 0 and current_depth > max_depth:
436 |                 return
437 | 
438 |             try:
439 |                 # List directory contents
440 |                 for item in current_path.iterdir():
441 |                     # Skip hidden files if not included
442 |                     if not include_hidden and item.name.startswith("."):
443 |                         continue
444 | 
445 |                     # Skip ignored patterns
446 |                     if should_ignore(item):
447 |                         continue
448 | 
449 |                     # Get file information
450 |                     file_info = _get_file_info(item)
451 |                     file_info["depth"] = current_depth
452 |                     entries.append(file_info)
453 | 
454 |                     # Recursively process subdirectories
455 |                     if recursive and item.is_dir():
456 |                         process_directory(item, current_depth + 1)
457 |             except PermissionError:
458 |                 # Add an entry indicating permission denied
459 |                 entries.append({
460 |                     "name": current_path.name,
461 |                     "path": str(current_path),
462 |                     "type": "directory",
463 |                     "error": "Permission denied",
464 |                     "depth": current_depth,
465 |                 })
466 | 
467 |         # Start processing from the root directory
468 |         process_directory(dir_path)
469 | 
470 |         return {
471 |             "path": str(dir_path),
472 |             "entries": entries,
473 |             "count": len(entries),
474 |             "success": True,
475 |         }
476 |     except Exception as e:
477 |         return {
478 |             "error": f"Failed to list directory: {e!s}",
479 |             "entries": [],
480 |             "success": False,
481 |         }
482 | 
```
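
Reading a file larger than one chunk means iterating `chunk_index` until `is_last_chunk` is reported. A minimal driver for that protocol (hypothetical, not part of the repository):

```python
# Hypothetical caller of read_file_content that reassembles a whole file
# from its chunks.
import asyncio

from mcp_toolbox.file_ops.tools import read_file_content


async def read_whole_file(path: str, chunk_size: int = 1_000_000) -> str:
    parts: list[str] = []
    index = 0
    while True:
        result = await read_file_content(path, chunk_size=chunk_size, chunk_index=index)
        if not result["success"]:
            raise RuntimeError(result["error"])
        parts.append(result["content"])
        # is_last_chunk is False for an empty file, so also stop on empty content
        if result["is_last_chunk"] or not result["content"]:
            break
        index += 1
    return "".join(parts)


print(asyncio.run(read_whole_file("README.md", chunk_size=4096)))
```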

--------------------------------------------------------------------------------
/tests/file_ops/test_file_ops_tools.py:
--------------------------------------------------------------------------------

```python
  1 | """Tests for file operations tools."""
  2 | 
  3 | import os
  4 | import stat
  5 | import tempfile
  6 | from pathlib import Path
  7 | from unittest.mock import patch
  8 | 
  9 | import pytest
 10 | 
 11 | from mcp_toolbox.file_ops.tools import (
 12 |     _format_mode,
 13 |     _get_file_info,
 14 |     list_directory,
 15 |     read_file_content,
 16 |     replace_in_file,
 17 |     write_file_content,
 18 | )
 19 | 
 20 | 
 21 | @pytest.mark.asyncio
 22 | async def test_read_file_content():
 23 |     """Test reading file content."""
 24 |     # Create a temporary file
 25 |     with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_file:
 26 |         temp_file.write("Test content")
 27 |         temp_path = temp_file.name
 28 | 
 29 |     try:
 30 |         # Test reading the entire file
 31 |         result = await read_file_content(temp_path)
 32 |         assert result["success"] is True
 33 |         assert result["content"] == "Test content"
 34 |         assert "size" in result
 35 |         assert "last_modified" in result
 36 |         assert result["total_chunks"] == 1
 37 |         assert result["chunk_index"] == 0
 38 |         assert result["is_last_chunk"] is True
 39 | 
 40 |         # Test reading a non-existent file
 41 |         result = await read_file_content("/non/existent/file")
 42 |         assert result["success"] is False
 43 |         assert "File not found" in result["error"]
 44 | 
 45 |         # Test reading a directory
 46 |         temp_dir = tempfile.mkdtemp()
 47 |         try:
 48 |             result = await read_file_content(temp_dir)
 49 |             assert result["success"] is False
 50 |             assert "Path is not a file" in result["error"]
 51 |         finally:
 52 |             os.rmdir(temp_dir)
 53 | 
 54 |         # Test reading a file with tilde in path
 55 |         with patch("pathlib.Path.expanduser", return_value=Path(temp_path)) as mock_expanduser:
 56 |             result = await read_file_content("~/test_file.txt")
 57 |             assert result["success"] is True
 58 |             assert result["content"] == "Test content"
 59 |             mock_expanduser.assert_called_once()
 60 | 
 61 |         # Test reading with custom chunk size
 62 |         result = await read_file_content(temp_path, chunk_size=5)
 63 |         assert result["success"] is True
 64 |         assert result["content"] == "Test "
 65 |         assert result["chunk_size"] == 5
 66 |         assert result["chunk_index"] == 0
 67 |         assert result["total_chunks"] == 3  # "Test content" is 12 chars, so 3 chunks of 5 bytes
 68 |         assert result["is_last_chunk"] is False
 69 | 
 70 |         # Test reading second chunk
 71 |         result = await read_file_content(temp_path, chunk_size=5, chunk_index=1)
 72 |         assert result["success"] is True
 73 |         assert result["content"] == "conte"
 74 |         assert result["chunk_index"] == 1
 75 |         assert result["is_last_chunk"] is False
 76 | 
 77 |         # Test reading last chunk
 78 |         result = await read_file_content(temp_path, chunk_size=5, chunk_index=2)
 79 |         assert result["success"] is True
 80 |         assert result["content"] == "nt"
 81 |         assert result["chunk_index"] == 2
 82 |         assert result["is_last_chunk"] is True
 83 |         assert result["chunk_actual_size"] == 2  # Only 2 bytes in the last chunk
 84 | 
 85 |         # Test reading with invalid chunk index
 86 |         result = await read_file_content(temp_path, chunk_size=5, chunk_index=3)
 87 |         assert result["success"] is False
 88 |         assert "Invalid chunk index" in result["error"]
 89 | 
 90 |     finally:
 91 |         # Clean up
 92 |         os.unlink(temp_path)
 93 | 
 94 | 
 95 | @pytest.mark.asyncio
 96 | async def test_write_file_content():
 97 |     """Test writing file content."""
 98 |     # Create a temporary directory
 99 |     with tempfile.TemporaryDirectory() as temp_dir:
100 |         # Test writing to a new file
101 |         file_path = os.path.join(temp_dir, "test_file.txt")
102 |         result = await write_file_content(file_path, "Test content")
103 |         assert result["success"] is True
104 |         assert os.path.exists(file_path)
105 |         with open(file_path) as f:
106 |             assert f.read() == "Test content"
107 | 
108 |         # Test appending to a file
109 |         result = await write_file_content(file_path, " appended", append=True)
110 |         assert result["success"] is True
111 |         with open(file_path) as f:
112 |             assert f.read() == "Test content appended"
113 | 
114 |         # Test writing to a nested path
115 |         nested_path = os.path.join(temp_dir, "nested", "dir", "test_file.txt")
116 |         result = await write_file_content(nested_path, "Nested content")
117 |         assert result["success"] is True
118 |         assert os.path.exists(nested_path)
119 |         with open(nested_path) as f:
120 |             assert f.read() == "Nested content"
121 | 
122 |         # Test writing to a file with tilde in path
123 |         tilde_path = "~/test_file_tilde.txt"
124 |         expanded_path = os.path.join(temp_dir, "test_file_tilde.txt")
125 | 
126 |         with patch("pathlib.Path.expanduser", return_value=Path(expanded_path)) as mock_expanduser:
127 |             result = await write_file_content(tilde_path, "Tilde content")
128 |             assert result["success"] is True
129 |             mock_expanduser.assert_called_once()
130 |             # Verify the file was created at the expanded path
131 |             assert os.path.exists(expanded_path)
132 |             with open(expanded_path) as f:
133 |                 assert f.read() == "Tilde content"
134 | 
135 | 
136 | @pytest.mark.asyncio
137 | async def test_replace_in_file():
138 |     """Test replacing content in a file."""
139 |     # Create a temporary file
140 |     with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_file:
141 |         temp_file.write("Hello world! This is a test.")
142 |         temp_path = temp_file.name
143 | 
144 |     try:
145 |         # Test replacing content
146 |         result = await replace_in_file(temp_path, r"world", "universe")
147 |         assert result["success"] is True
148 |         assert result["replacements"] == 1
149 |         with open(temp_path) as f:
150 |             assert f.read() == "Hello universe! This is a test."
151 | 
152 |         # Test replacing with count
153 |         result = await replace_in_file(temp_path, r"[aeiou]", "X", count=2)
154 |         assert result["success"] is True
155 |         assert result["replacements"] == 2
156 |         with open(temp_path) as f:
157 |             assert f.read() == "HXllX universe! This is a test."
158 | 
159 |         # Test replacing with invalid regex
160 |         result = await replace_in_file(temp_path, r"[unclosed", "X")
161 |         assert result["success"] is False
162 |         assert "Invalid regular expression" in result["error"]
163 | 
164 |         # Test replacing in non-existent file
165 |         result = await replace_in_file("/non/existent/file", r"test", "replacement")
166 |         assert result["success"] is False
167 |         assert "File not found" in result["error"]
168 | 
169 |         # Test replacing in a file with tilde in path
170 |         with patch("pathlib.Path.expanduser", return_value=Path(temp_path)) as mock_expanduser:
171 |             result = await replace_in_file("~/test_file.txt", r"HXllX", "Hello")
172 |             assert result["success"] is True
173 |             assert result["replacements"] == 1
174 |             mock_expanduser.assert_called_once()
175 |             with open(temp_path) as f:
176 |                 assert f.read() == "Hello universe! This is a test."
177 | 
178 |     finally:
179 |         # Clean up
180 |         os.unlink(temp_path)
181 | 
182 | 
183 | def test_format_mode():
184 |     """Test formatting file mode."""
185 |     # Directory with full permissions
186 |     dir_mode = stat.S_IFDIR | 0o777
187 |     assert _format_mode(dir_mode) == "drwxrwxrwx"
188 | 
189 |     # Regular file with read-only permissions
190 |     file_mode = stat.S_IFREG | 0o444
191 |     assert _format_mode(file_mode) == "-r--r--r--"
192 | 
193 |     # Executable file with owner-only permissions
194 |     exec_mode = stat.S_IFREG | 0o700
195 |     assert _format_mode(exec_mode) == "-rwx------"
196 | 
197 |     # Symlink with mixed permissions
198 |     link_mode = stat.S_IFLNK | 0o751
199 |     assert _format_mode(link_mode) == "lrwxr-x--x"
200 | 
201 | 
202 | @pytest.mark.asyncio
203 | async def test_list_directory():
204 |     """Test listing directory contents."""
205 |     # Create a temporary directory structure
206 |     with tempfile.TemporaryDirectory() as temp_dir:
207 |         # Create some files and subdirectories
208 |         file1_path = os.path.join(temp_dir, "file1.txt")
209 |         with open(file1_path, "w") as f:
210 |             f.write("File 1 content")
211 | 
212 |         file2_path = os.path.join(temp_dir, "file2.txt")
213 |         with open(file2_path, "w") as f:
214 |             f.write("File 2 content")
215 | 
216 |         hidden_file_path = os.path.join(temp_dir, ".hidden_file")
217 |         with open(hidden_file_path, "w") as f:
218 |             f.write("Hidden file content")
219 | 
220 |         subdir_path = os.path.join(temp_dir, "subdir")
221 |         os.mkdir(subdir_path)
222 | 
223 |         subfile_path = os.path.join(subdir_path, "subfile.txt")
224 |         with open(subfile_path, "w") as f:
225 |             f.write("Subfile content")
226 | 
227 |         # Create files for testing ignore patterns
228 |         temp_file_path = os.path.join(temp_dir, "temp.tmp")
229 |         with open(temp_file_path, "w") as f:
230 |             f.write("Temporary file content")
231 | 
232 |         node_modules_path = os.path.join(temp_dir, "node_modules")
233 |         os.mkdir(node_modules_path)
234 | 
235 |         node_module_file_path = os.path.join(node_modules_path, "package.json")
236 |         with open(node_module_file_path, "w") as f:
237 |             f.write('{"name": "test-package"}')
238 | 
239 |         cache_file_path = os.path.join(temp_dir, "cache.pyc")
240 |         with open(cache_file_path, "w") as f:
241 |             f.write("Python cache file")
242 | 
243 |         # Test basic directory listing
244 |         result = await list_directory(temp_dir)
245 |         assert result["success"] is True
246 |         assert result["path"] == temp_dir
247 |         assert (
248 |             len(result["entries"]) == 5
249 |         )  # 4 files + 1 directory (node_modules is ignored by default), no hidden files
250 |         assert result["count"] == 5
251 | 
252 |         # Test with hidden files
253 |         result = await list_directory(temp_dir, include_hidden=True)
254 |         assert result["success"] is True
255 |         assert len(result["entries"]) == 6  # 4 files + 1 directory + 1 hidden file (node_modules is ignored by default)
256 |         assert result["count"] == 6
257 | 
258 |         # Test recursive listing with explicit empty ignore patterns to see all files
259 |         result = await list_directory(temp_dir, recursive=True, ignore_patterns=[])
260 |         assert result["success"] is True
261 |         assert len(result["entries"]) == 8  # 4 files + 2 directories + 1 subfile + 1 node_modules file
262 |         assert result["count"] == 8
263 | 
264 |         # Test with max depth (with empty ignore patterns to ensure consistent behavior)
265 |         result = await list_directory(temp_dir, recursive=True, max_depth=0, ignore_patterns=[])
266 |         assert result["success"] is True
267 |         assert len(result["entries"]) == 6  # Only top-level entries
268 |         assert result["count"] == 6
269 | 
270 |         # Test with ignore patterns - single pattern
271 |         result = await list_directory(temp_dir, ignore_patterns=["*.tmp"])
272 |         assert result["success"] is True
273 |         assert len(result["entries"]) == 5  # Excluding temp.tmp
274 |         assert result["count"] == 5
275 |         # Verify temp.tmp is not in the results
276 |         assert not any(entry["name"] == "temp.tmp" for entry in result["entries"])
277 | 
278 |         # Test with ignore patterns - directory pattern
279 |         result = await list_directory(temp_dir, recursive=True, ignore_patterns=["node_modules"])
280 |         assert result["success"] is True
281 |         assert len(result["entries"]) == 6  # Excluding node_modules directory and its contents
282 |         assert result["count"] == 6
283 |         # Verify node_modules is not in the results
284 |         assert not any(entry["name"] == "node_modules" for entry in result["entries"])
285 | 
286 |         # Test with multiple ignore patterns
287 |         result = await list_directory(temp_dir, ignore_patterns=["*.tmp", "*.pyc"])
288 |         assert result["success"] is True
289 |         assert len(result["entries"]) == 4  # Excluding temp.tmp and cache.pyc
290 |         assert result["count"] == 4
291 |         # Verify neither temp.tmp nor cache.pyc are in the results
292 |         assert not any(entry["name"] == "temp.tmp" for entry in result["entries"])
293 |         assert not any(entry["name"] == "cache.pyc" for entry in result["entries"])
294 | 
295 |         # Test default ignore patterns
296 |         # Create directories and files that should be ignored by default
297 |         git_dir_path = os.path.join(temp_dir, ".git")
298 |         os.mkdir(git_dir_path)
299 |         git_file_path = os.path.join(git_dir_path, "config")
300 |         with open(git_file_path, "w") as f:
301 |             f.write("Git config content")
302 | 
303 |         # Create a .DS_Store file that should be ignored by default
304 |         ds_store_path = os.path.join(temp_dir, ".DS_Store")
305 |         with open(ds_store_path, "w") as f:
306 |             f.write("DS_Store content")
307 | 
308 |         # Create a node_modules directory that should be ignored by default
309 |         node_modules_dir = os.path.join(temp_dir, "node_modules")
310 |         os.makedirs(node_modules_dir, exist_ok=True)
311 |         node_modules_file = os.path.join(node_modules_dir, "package.json")
312 |         with open(node_modules_file, "w") as f:
313 |             f.write('{"name": "test-package"}')
314 | 
315 |         # Test with explicit ignore patterns that match the default ones
316 |         result = await list_directory(
317 |             temp_dir, recursive=True, include_hidden=True, ignore_patterns=[".git", ".DS_Store", "node_modules"]
318 |         )
319 |         assert result["success"] is True
320 |         # Should not include .git directory or .DS_Store file due to specified ignore patterns
321 |         assert not any(entry["name"] == ".git" for entry in result["entries"])
322 |         assert not any(entry["name"] == ".DS_Store" for entry in result["entries"])
323 |         assert not any(entry["name"] == "node_modules" for entry in result["entries"])
324 |         assert any(entry["name"] == ".hidden_file" for entry in result["entries"])  # Should include other hidden files
325 | 
326 |         # Test with explicit None for ignore_patterns (should use defaults)
327 |         result = await list_directory(temp_dir, recursive=True, include_hidden=True, ignore_patterns=None)
328 |         assert result["success"] is True
329 |         # Should not include node_modules directory due to default ignore patterns
330 |         assert not any(entry["name"] == "node_modules" for entry in result["entries"])
331 |         # Should not include .git directory due to default ignore patterns
332 |         assert not any(entry["name"] == ".git" for entry in result["entries"])
333 |         # Should not include .DS_Store file due to default ignore patterns
334 |         assert not any(entry["name"] == ".DS_Store" for entry in result["entries"])
335 | 
336 |         # Test with empty list for ignore_patterns (should override defaults)
337 |         result = await list_directory(temp_dir, recursive=True, include_hidden=True, ignore_patterns=[])
338 |         assert result["success"] is True
339 |         # Should include .git directory and .DS_Store file since we're overriding defaults with empty list
340 |         assert any(entry["name"] == ".git" for entry in result["entries"])
341 |         assert any(entry["name"] == ".DS_Store" for entry in result["entries"])
342 | 
343 |         # Test combining ignore patterns with other parameters
344 |         result = await list_directory(
345 |             temp_dir,
346 |             recursive=True,
347 |             include_hidden=True,
348 |             ignore_patterns=["node_modules", "*.tmp", "*.pyc", ".git", ".DS_Store"],
349 |         )
350 |         assert result["success"] is True
351 |         # Should include only .hidden_file, file1.txt, file2.txt, subdir, and subfile.txt
352 |         assert len(result["entries"]) == 5
353 |         assert result["count"] == 5
354 |         # Verify specific files are included/excluded
355 |         assert any(entry["name"] == ".hidden_file" for entry in result["entries"])
356 |         assert any(entry["name"] == "file1.txt" for entry in result["entries"])
357 |         assert any(entry["name"] == "file2.txt" for entry in result["entries"])
358 |         assert any(entry["name"] == "subdir" for entry in result["entries"])
359 |         assert any(entry["name"] == "subfile.txt" for entry in result["entries"])
360 |         # Verify excluded files
361 |         assert not any(entry["name"] == "node_modules" for entry in result["entries"])
362 |         assert not any(entry["name"] == "temp.tmp" for entry in result["entries"])
363 |         assert not any(entry["name"] == "cache.pyc" for entry in result["entries"])
364 |         assert not any(entry["name"] == ".git" for entry in result["entries"])
365 |         assert not any(entry["name"] == ".DS_Store" for entry in result["entries"])
366 | 
367 |         # Test non-existent directory
368 |         result = await list_directory("/non/existent/dir")
369 |         assert result["success"] is False
370 |         assert "Directory not found" in result["error"]
371 | 
372 |         # Test file path instead of directory
373 |         result = await list_directory(file1_path)
374 |         assert result["success"] is False
375 |         assert "Path is not a directory" in result["error"]
376 | 
377 |         # Test directory with tilde in path (with empty ignore patterns to ensure consistent behavior)
378 |         with patch("pathlib.Path.expanduser", return_value=Path(temp_dir)) as mock_expanduser:
379 |             result = await list_directory("~/test_dir", ignore_patterns=[])
380 |             assert result["success"] is True
381 |             assert len(result["entries"]) == 6  # 4 files + 2 directories, no hidden files
382 |             assert result["count"] == 6
383 |             mock_expanduser.assert_called_once()
384 | 
385 | 
386 | def test_get_file_info():
387 |     """Test getting file information."""
388 |     # Create a temporary file
389 |     with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_file:
390 |         temp_file.write("Test content")
391 |         temp_path = temp_file.name
392 | 
393 |     try:
394 |         # Get file info
395 |         file_path = Path(temp_path)
396 |         file_info = _get_file_info(file_path)
397 | 
398 |         # Check basic properties
399 |         assert file_info["name"] == file_path.name
400 |         assert file_info["path"] == str(file_path)
401 |         assert file_info["type"] == "file"
402 |         assert file_info["size"] == len("Test content")
403 |         assert "size_formatted" in file_info
404 |         assert "permissions" in file_info
405 |         assert "mode" in file_info
406 |         assert "owner" in file_info
407 |         assert "group" in file_info
408 |         assert "created" in file_info
409 |         assert "modified" in file_info
410 |         assert "accessed" in file_info
411 | 
412 |     finally:
413 |         # Clean up
414 |         os.unlink(temp_path)
415 | 
```
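
The test file above doubles as a behavioral specification for the `file_ops` tools. The sketches below restate the contracts its assertions pin down; all of them are hypothetical re-implementations for illustration, not the actual `mcp_toolbox.file_ops.tools` code, and every name they introduce is assumed.

The chunking assertions in `test_read_file_content` fix the contract for `read_file_content`: `total_chunks` is the ceiling of file size over `chunk_size`, the final chunk reports its true byte count via `chunk_actual_size`, and an out-of-range `chunk_index` fails rather than returning empty content. A minimal sketch of that contract (the UTF-8 handling is a simplification):

```python
import math
from pathlib import Path


async def read_chunk_sketch(path: str, chunk_size: int = 1024, chunk_index: int = 0) -> dict:
    """Hypothetical re-implementation of the chunking behavior the tests assert."""
    file_path = Path(path).expanduser()
    if not file_path.exists():
        return {"success": False, "error": f"File not found: {file_path}"}
    if not file_path.is_file():
        return {"success": False, "error": f"Path is not a file: {file_path}"}

    size = file_path.stat().st_size
    total_chunks = max(1, math.ceil(size / chunk_size))  # 12 bytes at chunk_size=5 -> 3 chunks
    if chunk_index < 0 or chunk_index >= total_chunks:
        return {"success": False, "error": f"Invalid chunk index {chunk_index} (file has {total_chunks} chunks)"}

    with open(file_path, "rb") as f:
        f.seek(chunk_index * chunk_size)
        data = f.read(chunk_size)

    return {
        "success": True,
        # Simplification: assumes a chunk boundary never splits a multi-byte character.
        "content": data.decode("utf-8"),
        "size": size,
        "chunk_size": chunk_size,
        "chunk_index": chunk_index,
        "chunk_actual_size": len(data),  # 2 for the last chunk of "Test content"
        "total_chunks": total_chunks,
        "is_last_chunk": chunk_index == total_chunks - 1,
        "last_modified": file_path.stat().st_mtime,
    }
```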
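
`test_write_file_content` implies that the writer creates missing parent directories (the `nested/dir` case), supports append mode, and expands `~` before touching the filesystem. A compatible sketch under those assumptions:

```python
from pathlib import Path


async def write_file_sketch(path: str, content: str, append: bool = False) -> dict:
    """Hypothetical writer matching the behavior asserted in test_write_file_content."""
    file_path = Path(path).expanduser()
    file_path.parent.mkdir(parents=True, exist_ok=True)  # covers the nested-path case
    with open(file_path, "a" if append else "w") as f:
        f.write(content)
    return {"success": True, "path": str(file_path)}
```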
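
`test_replace_in_file` implies a thin wrapper over `re.subn`: `count` limits the number of substitutions, a malformed pattern is reported as an error result rather than raised, and the result carries the replacement count. A compatible sketch (again hypothetical, and reading the whole file into memory):

```python
import re
from pathlib import Path


async def replace_in_file_sketch(path: str, pattern: str, replacement: str, count: int = 0) -> dict:
    """Hypothetical regex replace matching the assertions in test_replace_in_file."""
    file_path = Path(path).expanduser()
    if not file_path.is_file():
        return {"success": False, "error": f"File not found: {file_path}"}
    try:
        compiled = re.compile(pattern)
    except re.error as e:
        return {"success": False, "error": f"Invalid regular expression: {e}"}

    text = file_path.read_text()
    new_text, replacements = compiled.subn(replacement, text, count=count)  # count=0 means "replace all"
    file_path.write_text(new_text)
    return {"success": True, "replacements": replacements}
```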
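
`test_format_mode` fixes the expected `ls -l`-style strings exactly. The standard library's `stat.filemode` already produces them, so one plausible implementation is a thin wrapper; the manual triad loop below spells out the same logic (both are sketches and ignore the setuid/setgid/sticky bits, which the tests do not exercise):

```python
import stat


def format_mode_sketch(mode: int) -> str:
    """stat.filemode renders exactly the strings asserted in test_format_mode."""
    return stat.filemode(mode)


def format_mode_manual(mode: int) -> str:
    """Same result built by hand: a type character plus three rwx triads."""
    if stat.S_ISDIR(mode):
        kind = "d"
    elif stat.S_ISLNK(mode):
        kind = "l"
    else:
        kind = "-"
    out = kind
    for shift in (6, 3, 0):  # owner, group, other
        bits = (mode >> shift) & 0o7
        out += "r" if bits & 0o4 else "-"
        out += "w" if bits & 0o2 else "-"
        out += "x" if bits & 0o1 else "-"
    return out


assert format_mode_manual(stat.S_IFDIR | 0o777) == "drwxrwxrwx"
assert format_mode_manual(stat.S_IFLNK | 0o751) == "lrwxr-x--x"
```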
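
`test_list_directory` encodes a three-way contract for `ignore_patterns`: `None` falls back to built-in defaults (the assertions show at least `.git`, `.DS_Store`, and `node_modules`, but not `*.tmp` or `*.pyc`, since the basic listing includes `temp.tmp` and `cache.pyc`), an explicit list replaces the defaults entirely, and `[]` therefore disables filtering. A sketch of that dispatch, with the default list inferred from the assertions (the real tool's defaults may be longer):

```python
from fnmatch import fnmatch

# Inferred from the assertions above; the actual default list may contain more patterns.
DEFAULT_IGNORE_PATTERNS = [".git", ".DS_Store", "node_modules"]


def should_ignore(name: str, ignore_patterns: list[str] | None) -> bool:
    """None -> use the defaults; any explicit list (including []) replaces them."""
    patterns = DEFAULT_IGNORE_PATTERNS if ignore_patterns is None else ignore_patterns
    return any(fnmatch(name, pattern) for pattern in patterns)


assert should_ignore("node_modules", None) is True    # defaults apply
assert should_ignore("node_modules", []) is False     # empty list overrides defaults
assert should_ignore("temp.tmp", ["*.tmp"]) is True   # glob patterns match entry names
assert should_ignore("temp.tmp", None) is False       # *.tmp is not a default
```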
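
Finally, `test_get_file_info` only checks for the presence of keys, but they map naturally onto `os.stat_result` fields. A sketch of a compatible `_get_file_info` (hypothetical; `pwd` and `grp` are POSIX-only, and the real tool likely humanizes `size_formatted` into KB/MB):

```python
import grp
import pwd
import stat
from datetime import datetime
from pathlib import Path


def get_file_info_sketch(file_path: Path) -> dict:
    """Hypothetical implementation producing the keys test_get_file_info checks for."""
    st = file_path.stat()
    return {
        "name": file_path.name,
        "path": str(file_path),
        "type": "directory" if file_path.is_dir() else "file",
        "size": st.st_size,
        "size_formatted": f"{st.st_size} B",
        "permissions": stat.filemode(st.st_mode),
        "mode": oct(st.st_mode),
        "owner": pwd.getpwuid(st.st_uid).pw_name,
        "group": grp.getgrgid(st.st_gid).gr_name,
        "created": datetime.fromtimestamp(st.st_ctime).isoformat(),
        "modified": datetime.fromtimestamp(st.st_mtime).isoformat(),
        "accessed": datetime.fromtimestamp(st.st_atime).isoformat(),
    }
```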