This is page 2 of 2. Use http://codebase.md/ai-zerolab/mcp-toolbox?lines=true&page={x} to view the full context. # Directory Structure ``` ├── .github │ ├── actions │ │ └── setup-python-env │ │ └── action.yml │ └── workflows │ ├── main.yml │ ├── on-release-main.yml │ └── validate-codecov-config.yml ├── .gitignore ├── .pre-commit-config.yaml ├── .vscode │ └── settings.json ├── codecov.yaml ├── CONTRIBUTING.md ├── Dockerfile ├── docs │ ├── index.md │ └── modules.md ├── generate_config_template.py ├── LICENSE ├── llms.txt ├── Makefile ├── mcp_toolbox │ ├── __init__.py │ ├── app.py │ ├── audio │ │ ├── __init__.py │ │ └── tools.py │ ├── cli.py │ ├── command_line │ │ ├── __init__.py │ │ └── tools.py │ ├── config.py │ ├── enhance │ │ ├── __init__.py │ │ ├── memory.py │ │ └── tools.py │ ├── figma │ │ ├── __init__.py │ │ └── tools.py │ ├── file_ops │ │ ├── __init__.py │ │ └── tools.py │ ├── flux │ │ ├── __init__.py │ │ ├── api.py │ │ └── tools.py │ ├── log.py │ ├── markitdown │ │ ├── __init__.py │ │ └── tools.py │ ├── web │ │ ├── __init__.py │ │ └── tools.py │ └── xiaoyuzhoufm │ ├── __init__.py │ └── tools.py ├── mkdocs.yml ├── pyproject.toml ├── pytest.ini ├── README.md ├── smithery.yaml ├── tests │ ├── audio │ │ └── test_audio_tools.py │ ├── command_line │ │ └── test_command_line_tools.py │ ├── enhance │ │ ├── test_enhance_tools.py │ │ └── test_memory.py │ ├── figma │ │ └── test_figma_tools.py │ ├── file_ops │ │ └── test_file_ops_tools.py │ ├── flux │ │ └── test_flux_tools.py │ ├── markitdown │ │ └── test_markitdown_tools.py │ ├── mock │ │ └── figma │ │ ├── delete_comment.json │ │ ├── get_comments.json │ │ ├── get_component.json │ │ ├── get_file_components.json │ │ ├── get_file_nodes.json │ │ ├── get_file_styles.json │ │ ├── get_file.json │ │ ├── get_image_fills.json │ │ ├── get_image.json │ │ ├── get_project_files.json │ │ ├── get_style.json │ │ ├── get_team_component_sets.json │ │ ├── get_team_components.json │ │ ├── get_team_projects.json │ │ ├── get_team_styles.json │ │ └── post_comment.json │ ├── web │ │ └── test_web_tools.py │ └── xiaoyuzhoufm │ └── test_xiaoyuzhoufm_tools.py ├── tox.ini └── uv.lock ``` # Files -------------------------------------------------------------------------------- /mcp_toolbox/figma/tools.py: -------------------------------------------------------------------------------- ```python 1 | import json 2 | import time 3 | from pathlib import Path 4 | from typing import Annotated, Any 5 | 6 | import httpx 7 | from pydantic import BaseModel, Field 8 | 9 | from mcp_toolbox.app import mcp 10 | from mcp_toolbox.config import Config 11 | 12 | 13 | # Type definitions for request/response parameters 14 | class ClientMeta(BaseModel): 15 | x: float 16 | y: float 17 | node_id: str | None = None 18 | node_offset: dict[str, float] | None = None 19 | 20 | 21 | # API Client 22 | class FigmaApiClient: 23 | BASE_URL = "https://api.figma.com/v1" 24 | 25 | def __init__(self): 26 | self.config = Config() 27 | 28 | async def get_access_token(self) -> str: 29 | if not self.config.figma_api_key: 30 | raise ValueError("No Figma API key provided. Set the FIGMA_API_KEY environment variable.") 31 | return self.config.figma_api_key 32 | 33 | async def make_request(self, path: str, method: str = "GET", data: Any = None) -> dict[str, Any]: 34 | token = await self.get_access_token() 35 | 36 | async with httpx.AsyncClient( 37 | transport=httpx.AsyncHTTPTransport(retries=3), 38 | timeout=30, 39 | ) as client: 40 | headers = {"X-Figma-Token": token} 41 | url = f"{self.BASE_URL}{path}" 42 | 43 | try: 44 | if method == "GET": 45 | response = await client.get(url, headers=headers) 46 | elif method == "POST": 47 | response = await client.post(url, headers=headers, json=data) 48 | elif method == "DELETE": 49 | response = await client.delete(url, headers=headers) 50 | else: 51 | raise ValueError(f"Unsupported HTTP method: {method}") 52 | 53 | response.raise_for_status() 54 | return response.json() 55 | except httpx.HTTPStatusError as e: 56 | figma_error = ( 57 | e.response.json() if e.response.content else {"status": e.response.status_code, "err": str(e)} 58 | ) 59 | raise ValueError( 60 | f"Figma API error: {figma_error.get('err', figma_error.get('message', str(e)))}" 61 | ) from e 62 | except httpx.RequestError as e: 63 | raise ValueError(f"Request error: {e!s}") from e 64 | 65 | def build_query_string(self, params: dict[str, Any]) -> str: 66 | # Filter out None values 67 | filtered_params = {k: v for k, v in params.items() if v is not None} 68 | 69 | if not filtered_params: 70 | return "" 71 | 72 | # Convert lists to comma-separated strings 73 | for key, value in filtered_params.items(): 74 | if isinstance(value, list): 75 | filtered_params[key] = ",".join(map(str, value)) 76 | 77 | # Build query string 78 | query_parts = [f"{k}={v}" for k, v in filtered_params.items()] 79 | return "?" + "&".join(query_parts) 80 | 81 | 82 | # Cache Manager 83 | class CacheManager: 84 | def __init__(self): 85 | self.config = Config() 86 | self.cache_dir = Path(self.config.cache_dir) 87 | self.cache_dir.mkdir(parents=True, exist_ok=True) 88 | 89 | def save_to_cache(self, filename: str, data: Any) -> str: 90 | file_path = self.cache_dir / filename 91 | with open(file_path, "w") as f: 92 | json.dump(data, f, indent=2) 93 | return str(file_path) 94 | 95 | 96 | # Initialize API client and cache manager 97 | api_client = FigmaApiClient() 98 | cache_manager = CacheManager() 99 | 100 | 101 | # Tool implementations 102 | @mcp.tool(description="Get a Figma file by key") 103 | async def figma_get_file( 104 | file_key: Annotated[str, Field(description="The key of the file to get")], 105 | version: Annotated[str | None, Field(default=None, description="A specific version ID to get")] = None, 106 | depth: Annotated[int | None, Field(default=None, description="Depth of nodes to return 1-4")] = None, 107 | branch_data: Annotated[bool | None, Field(default=None, description="Include branch data if true")] = None, 108 | ) -> dict[str, Any]: 109 | """Get a Figma file by key.""" 110 | params = {"version": version, "depth": depth, "branch_data": branch_data} 111 | 112 | query_string = api_client.build_query_string(params) 113 | result = await api_client.make_request(f"/files/{file_key}{query_string}") 114 | 115 | # Save to cache 116 | try: 117 | filename = f"file_{file_key}_{int(time.time() * 1000)}.json" 118 | file_path = cache_manager.save_to_cache(filename, result) 119 | return { 120 | "file_path": file_path, 121 | "message": "File data saved to local cache. Use this file path to access the complete data.", 122 | } 123 | except Exception: 124 | # If saving to cache fails, return original result 125 | return result 126 | 127 | 128 | @mcp.tool(description="Get specific nodes from a Figma file.") 129 | async def figma_get_file_nodes( 130 | file_key: Annotated[str, Field(description="The key of the file to get nodes from")], 131 | node_ids: Annotated[list[str], Field(description="Array of node IDs to get")], 132 | depth: Annotated[int | None, Field(default=None, description="Depth of nodes to return 1-4")] = None, 133 | version: Annotated[str | None, Field(default=None, description="A specific version ID to get")] = None, 134 | ) -> dict[str, Any]: 135 | """Get specific nodes from a Figma file.""" 136 | params = {"ids": node_ids, "depth": depth, "version": version} 137 | 138 | query_string = api_client.build_query_string(params) 139 | result = await api_client.make_request(f"/files/{file_key}/nodes{query_string}") 140 | 141 | # Save to cache 142 | try: 143 | filename = f"file_nodes_{file_key}_{int(time.time() * 1000)}.json" 144 | file_path = cache_manager.save_to_cache(filename, result) 145 | return { 146 | "file_path": file_path, 147 | "message": "File nodes data saved to local cache. Use this file path to access the complete data.", 148 | } 149 | except Exception: 150 | # If saving to cache fails, return original result 151 | return result 152 | 153 | 154 | @mcp.tool(description="Get images for nodes in a Figma file.") 155 | async def figma_get_image( 156 | file_key: Annotated[str, Field(description="The key of the file to get images from")], 157 | ids: Annotated[list[str], Field(description="Array of node IDs to render")], 158 | scale: Annotated[float | None, Field(default=None, description="Scale factor to render at 0.01-4")] = None, 159 | format_type: Annotated[str | None, Field(default=None, description="Image format jpg/png/svg/pdf")] = None, 160 | svg_include_id: Annotated[bool | None, Field(default=None, description="Include IDs in SVG output")] = None, 161 | svg_simplify_stroke: Annotated[ 162 | bool | None, Field(default=None, description="Simplify strokes in SVG output") 163 | ] = None, 164 | use_absolute_bounds: Annotated[bool | None, Field(default=None, description="Use absolute bounds")] = None, 165 | ) -> dict[str, Any]: 166 | """Get images for nodes in a Figma file.""" 167 | params = { 168 | "ids": ids, 169 | "scale": scale, 170 | "format": format_type, 171 | "svg_include_id": svg_include_id, 172 | "svg_simplify_stroke": svg_simplify_stroke, 173 | "use_absolute_bounds": use_absolute_bounds, 174 | } 175 | 176 | query_string = api_client.build_query_string(params) 177 | return await api_client.make_request(f"/images/{file_key}{query_string}") 178 | 179 | 180 | @mcp.tool(description="Get URLs for images used in a Figma file.") 181 | async def figma_get_image_fills( 182 | file_key: Annotated[str, Field(description="The key of the file to get image fills from")], 183 | ) -> dict[str, Any]: 184 | """Get URLs for images used in a Figma file.""" 185 | return await api_client.make_request(f"/files/{file_key}/images") 186 | 187 | 188 | @mcp.tool(description="Get comments on a Figma file.") 189 | async def figma_get_comments( 190 | file_key: Annotated[str, Field(description="The key of the file to get comments from")], 191 | ) -> dict[str, Any]: 192 | """Get comments on a Figma file.""" 193 | return await api_client.make_request(f"/files/{file_key}/comments") 194 | 195 | 196 | @mcp.tool(description="Post a comment on a Figma file.") 197 | async def figma_post_comment( 198 | file_key: Annotated[str, Field(description="The key of the file to comment on")], 199 | message: Annotated[str, Field(description="Comment message text")], 200 | client_meta: Annotated[ 201 | dict[str, Any] | None, Field(default=None, description="Position of the comment x/y/node_id/node_offset") 202 | ] = None, 203 | comment_id: Annotated[str | None, Field(default=None, description="ID of comment to reply to")] = None, 204 | ) -> dict[str, Any]: 205 | """Post a comment on a Figma file.""" 206 | comment_data = {"message": message} 207 | 208 | if client_meta: 209 | comment_data["client_meta"] = client_meta 210 | 211 | if comment_id: 212 | comment_data["comment_id"] = comment_id 213 | 214 | return await api_client.make_request(f"/files/{file_key}/comments", "POST", comment_data) 215 | 216 | 217 | @mcp.tool(description="Delete a comment from a Figma file.") 218 | async def figma_delete_comment( 219 | file_key: Annotated[str, Field(description="The key of the file to delete a comment from")], 220 | comment_id: Annotated[str, Field(description="ID of the comment to delete")], 221 | ) -> dict[str, Any]: 222 | """Delete a comment from a Figma file.""" 223 | return await api_client.make_request(f"/files/{file_key}/comments/{comment_id}", "DELETE") 224 | 225 | 226 | @mcp.tool(description="Get projects for a team.") 227 | async def figma_get_team_projects( 228 | team_id: Annotated[str, Field(description="The team ID")], 229 | page_size: Annotated[int | None, Field(default=None, description="Number of items per page")] = None, 230 | cursor: Annotated[str | None, Field(default=None, description="Cursor for pagination")] = None, 231 | ) -> dict[str, Any]: 232 | """Get projects for a team.""" 233 | params = {"page_size": page_size, "cursor": cursor} 234 | 235 | query_string = api_client.build_query_string(params) 236 | return await api_client.make_request(f"/teams/{team_id}/projects{query_string}") 237 | 238 | 239 | @mcp.tool(description="Get files for a project.") 240 | async def figma_get_project_files( 241 | project_id: Annotated[str, Field(description="The project ID")], 242 | page_size: Annotated[int | None, Field(default=None, description="Number of items per page")] = None, 243 | cursor: Annotated[str | None, Field(default=None, description="Cursor for pagination")] = None, 244 | branch_data: Annotated[bool | None, Field(default=None, description="Include branch data if true")] = None, 245 | ) -> dict[str, Any]: 246 | """Get files for a project.""" 247 | params = {"page_size": page_size, "cursor": cursor, "branch_data": branch_data} 248 | 249 | query_string = api_client.build_query_string(params) 250 | return await api_client.make_request(f"/projects/{project_id}/files{query_string}") 251 | 252 | 253 | @mcp.tool(description="Get components for a team.") 254 | async def figma_get_team_components( 255 | team_id: Annotated[str, Field(description="The team ID")], 256 | page_size: Annotated[int | None, Field(default=None, description="Number of items per page")] = None, 257 | cursor: Annotated[str | None, Field(default=None, description="Cursor for pagination")] = None, 258 | ) -> dict[str, Any]: 259 | """Get components for a team.""" 260 | params = {"page_size": page_size, "cursor": cursor} 261 | 262 | query_string = api_client.build_query_string(params) 263 | return await api_client.make_request(f"/teams/{team_id}/components{query_string}") 264 | 265 | 266 | @mcp.tool(description="Get components from a file.") 267 | async def figma_get_file_components( 268 | file_key: Annotated[str, Field(description="The key of the file to get components from")], 269 | ) -> dict[str, Any]: 270 | """Get components from a file.""" 271 | return await api_client.make_request(f"/files/{file_key}/components") 272 | 273 | 274 | @mcp.tool(description="Get a component by key.") 275 | async def figma_get_component(key: Annotated[str, Field(description="The component key")]) -> dict[str, Any]: 276 | """Get a component by key.""" 277 | return await api_client.make_request(f"/components/{key}") 278 | 279 | 280 | @mcp.tool(description="Get component sets for a team.") 281 | async def figma_get_team_component_sets( 282 | team_id: Annotated[str, Field(description="The team ID")], 283 | page_size: Annotated[int | None, Field(default=None, description="Number of items per page")] = None, 284 | cursor: Annotated[str | None, Field(default=None, description="Cursor for pagination")] = None, 285 | ) -> dict[str, Any]: 286 | """Get component sets for a team.""" 287 | params = {"page_size": page_size, "cursor": cursor} 288 | 289 | query_string = api_client.build_query_string(params) 290 | return await api_client.make_request(f"/teams/{team_id}/component_sets{query_string}") 291 | 292 | 293 | @mcp.tool(description="Get styles for a team.") 294 | async def figma_get_team_styles( 295 | team_id: Annotated[str, Field(description="The team ID")], 296 | page_size: Annotated[int | None, Field(default=None, description="Number of items per page")] = None, 297 | cursor: Annotated[str | None, Field(default=None, description="Cursor for pagination")] = None, 298 | ) -> dict[str, Any]: 299 | """Get styles for a team.""" 300 | params = {"page_size": page_size, "cursor": cursor} 301 | 302 | query_string = api_client.build_query_string(params) 303 | return await api_client.make_request(f"/teams/{team_id}/styles{query_string}") 304 | 305 | 306 | @mcp.tool(description="Get styles from a file.") 307 | async def figma_get_file_styles( 308 | file_key: Annotated[str, Field(description="The key of the file to get styles from")], 309 | ) -> dict[str, Any]: 310 | """Get styles from a file.""" 311 | return await api_client.make_request(f"/files/{file_key}/styles") 312 | 313 | 314 | @mcp.tool(description="Get a style by key.") 315 | async def figma_get_style(key: Annotated[str, Field(description="The style key")]) -> dict[str, Any]: 316 | """Get a style by key.""" 317 | return await api_client.make_request(f"/styles/{key}") 318 | ``` -------------------------------------------------------------------------------- /tests/figma/test_figma_tools.py: -------------------------------------------------------------------------------- ```python 1 | import json 2 | from pathlib import Path 3 | from unittest.mock import patch 4 | 5 | import pytest 6 | 7 | from mcp_toolbox.figma.tools import ( 8 | CacheManager, 9 | FigmaApiClient, 10 | figma_delete_comment, 11 | figma_get_comments, 12 | figma_get_component, 13 | figma_get_file, 14 | figma_get_file_components, 15 | figma_get_file_nodes, 16 | figma_get_file_styles, 17 | figma_get_image, 18 | figma_get_image_fills, 19 | figma_get_project_files, 20 | figma_get_style, 21 | figma_get_team_component_sets, 22 | figma_get_team_components, 23 | figma_get_team_projects, 24 | figma_get_team_styles, 25 | figma_post_comment, 26 | ) 27 | 28 | 29 | # Helper function to load mock data 30 | def load_mock_data(filename): 31 | mock_dir = Path(__file__).parent.parent / "mock" / "figma" 32 | file_path = mock_dir / filename 33 | 34 | if not file_path.exists(): 35 | # Create empty mock data if it doesn't exist 36 | mock_data = {"mock": "data"} 37 | with open(file_path, "w") as f: 38 | json.dump(mock_data, f) 39 | 40 | with open(file_path) as f: 41 | return json.load(f) 42 | 43 | 44 | # Patch the FigmaApiClient.make_request method 45 | @pytest.fixture 46 | def mock_make_request(): 47 | with patch.object(FigmaApiClient, "make_request") as mock: 48 | 49 | def side_effect(path, method="GET", data=None): 50 | # Extract the tool name from the path 51 | parts = path.strip("/").split("/") 52 | 53 | if len(parts) >= 2 and parts[0] == "files" and parts[1]: 54 | file_key = parts[1] 55 | 56 | if len(parts) == 2: 57 | # get_file 58 | return load_mock_data("get_file.json") 59 | elif len(parts) == 3: 60 | if parts[2] == "nodes": 61 | # get_file_nodes 62 | return load_mock_data("get_file_nodes.json") 63 | elif parts[2] == "images": 64 | # get_image_fills 65 | return load_mock_data("get_image_fills.json") 66 | elif parts[2] == "components": 67 | # get_file_components 68 | return load_mock_data("get_file_components.json") 69 | elif parts[2] == "styles": 70 | # get_file_styles 71 | return load_mock_data("get_file_styles.json") 72 | elif parts[2] == "comments": 73 | if method == "GET": 74 | # get_comments 75 | return load_mock_data("get_comments.json") 76 | elif method == "POST": 77 | # post_comment 78 | return load_mock_data("post_comment.json") 79 | elif len(parts) == 4 and parts[2] == "comments": 80 | # delete_comment 81 | return load_mock_data("delete_comment.json") 82 | 83 | elif parts[0] == "images" and len(parts) >= 2: 84 | # get_image 85 | return load_mock_data("get_image.json") 86 | 87 | elif parts[0] == "teams" and len(parts) >= 3: 88 | team_id = parts[1] 89 | 90 | if parts[2] == "projects": 91 | # get_team_projects 92 | return load_mock_data("get_team_projects.json") 93 | elif parts[2] == "components": 94 | # get_team_components 95 | return load_mock_data("get_team_components.json") 96 | elif parts[2] == "component_sets": 97 | # get_team_component_sets 98 | return load_mock_data("get_team_component_sets.json") 99 | elif parts[2] == "styles": 100 | # get_team_styles 101 | return load_mock_data("get_team_styles.json") 102 | 103 | elif parts[0] == "projects" and len(parts) >= 3: 104 | # get_project_files 105 | return load_mock_data("get_project_files.json") 106 | 107 | elif parts[0] == "components" and len(parts) >= 2: 108 | # get_component 109 | return load_mock_data("get_component.json") 110 | 111 | elif parts[0] == "styles" and len(parts) >= 2: 112 | # get_style 113 | return load_mock_data("get_style.json") 114 | 115 | # Default mock data 116 | return {"mock": "data"} 117 | 118 | mock.side_effect = side_effect 119 | yield mock 120 | 121 | 122 | # Patch the CacheManager.save_to_cache method 123 | @pytest.fixture 124 | def mock_save_to_cache(): 125 | with patch.object(CacheManager, "save_to_cache") as mock: 126 | mock.return_value = "/mock/path/to/cache/file.json" 127 | yield mock 128 | 129 | 130 | # Test get_file function 131 | @pytest.mark.asyncio 132 | async def test_get_file(mock_make_request, mock_save_to_cache): 133 | # Test with minimal parameters 134 | result = await figma_get_file("test_file_key") 135 | 136 | # Verify make_request was called with correct parameters 137 | mock_make_request.assert_called_once_with("/files/test_file_key") 138 | 139 | # Verify save_to_cache was called 140 | mock_save_to_cache.assert_called_once() 141 | 142 | # Verify the result contains expected fields 143 | assert "file_path" in result 144 | assert "message" in result 145 | assert result["file_path"] == "/mock/path/to/cache/file.json" 146 | 147 | # Reset mocks for next test 148 | mock_make_request.reset_mock() 149 | mock_save_to_cache.reset_mock() 150 | 151 | # Test with all parameters 152 | result = await figma_get_file("test_file_key", version="123", depth=2, branch_data=True) 153 | 154 | # Verify make_request was called with correct parameters 155 | mock_make_request.assert_called_once_with("/files/test_file_key?version=123&depth=2&branch_data=True") 156 | 157 | 158 | # Test get_file_nodes function 159 | @pytest.mark.asyncio 160 | async def test_get_file_nodes(mock_make_request, mock_save_to_cache): 161 | # Test with minimal parameters 162 | result = await figma_get_file_nodes("test_file_key", ["node1", "node2"]) 163 | 164 | # Verify make_request was called with correct parameters 165 | mock_make_request.assert_called_once_with("/files/test_file_key/nodes?ids=node1,node2") 166 | 167 | # Verify save_to_cache was called 168 | mock_save_to_cache.assert_called_once() 169 | 170 | # Verify the result contains expected fields 171 | assert "file_path" in result 172 | assert "message" in result 173 | assert result["file_path"] == "/mock/path/to/cache/file.json" 174 | 175 | # Reset mocks for next test 176 | mock_make_request.reset_mock() 177 | mock_save_to_cache.reset_mock() 178 | 179 | # Test with all parameters 180 | result = await figma_get_file_nodes("test_file_key", ["node1", "node2"], depth=2, version="123") 181 | 182 | # Verify make_request was called with correct parameters 183 | mock_make_request.assert_called_once_with("/files/test_file_key/nodes?ids=node1,node2&depth=2&version=123") 184 | 185 | 186 | # Test get_image function 187 | @pytest.mark.asyncio 188 | async def test_get_image(mock_make_request): 189 | # Test with minimal parameters 190 | result = await figma_get_image("test_file_key", ["node1", "node2"]) 191 | 192 | # Verify make_request was called with correct parameters 193 | mock_make_request.assert_called_once_with("/images/test_file_key?ids=node1,node2") 194 | 195 | # Reset mock for next test 196 | mock_make_request.reset_mock() 197 | 198 | # Test with all parameters 199 | result = await figma_get_image( 200 | "test_file_key", 201 | ["node1", "node2"], 202 | scale=2.0, 203 | format_type="png", 204 | svg_include_id=True, 205 | svg_simplify_stroke=True, 206 | use_absolute_bounds=True, 207 | ) 208 | 209 | # Verify make_request was called with correct parameters 210 | mock_make_request.assert_called_once_with( 211 | "/images/test_file_key?ids=node1,node2&scale=2.0&format=png&svg_include_id=True&svg_simplify_stroke=True&use_absolute_bounds=True" 212 | ) 213 | 214 | 215 | # Test get_image_fills function 216 | @pytest.mark.asyncio 217 | async def test_get_image_fills(mock_make_request): 218 | result = await figma_get_image_fills("test_file_key") 219 | 220 | # Verify make_request was called with correct parameters 221 | mock_make_request.assert_called_once_with("/files/test_file_key/images") 222 | 223 | 224 | # Test get_comments function 225 | @pytest.mark.asyncio 226 | async def test_get_comments(mock_make_request): 227 | result = await figma_get_comments("test_file_key") 228 | 229 | # Verify make_request was called with correct parameters 230 | mock_make_request.assert_called_once_with("/files/test_file_key/comments") 231 | 232 | 233 | # Test post_comment function 234 | @pytest.mark.asyncio 235 | async def test_post_comment(mock_make_request): 236 | # Test with minimal parameters 237 | result = await figma_post_comment("test_file_key", "Test comment") 238 | 239 | # Verify make_request was called with correct parameters 240 | mock_make_request.assert_called_once_with("/files/test_file_key/comments", "POST", {"message": "Test comment"}) 241 | 242 | # Reset mock for next test 243 | mock_make_request.reset_mock() 244 | 245 | # Test with all parameters 246 | client_meta = {"x": 100, "y": 200, "node_id": "node1", "node_offset": {"x": 10, "y": 20}} 247 | 248 | result = await figma_post_comment("test_file_key", "Test comment", client_meta=client_meta, comment_id="comment1") 249 | 250 | # Verify make_request was called with correct parameters 251 | mock_make_request.assert_called_once_with( 252 | "/files/test_file_key/comments", 253 | "POST", 254 | {"message": "Test comment", "client_meta": client_meta, "comment_id": "comment1"}, 255 | ) 256 | 257 | 258 | # Test delete_comment function 259 | @pytest.mark.asyncio 260 | async def test_delete_comment(mock_make_request): 261 | result = await figma_delete_comment("test_file_key", "comment1") 262 | 263 | # Verify make_request was called with correct parameters 264 | mock_make_request.assert_called_once_with("/files/test_file_key/comments/comment1", "DELETE") 265 | 266 | 267 | # Test get_team_projects function 268 | @pytest.mark.asyncio 269 | async def test_get_team_projects(mock_make_request): 270 | # Test with minimal parameters 271 | result = await figma_get_team_projects("team1") 272 | 273 | # Verify make_request was called with correct parameters 274 | mock_make_request.assert_called_once_with("/teams/team1/projects") 275 | 276 | # Reset mock for next test 277 | mock_make_request.reset_mock() 278 | 279 | # Test with all parameters 280 | result = await figma_get_team_projects("team1", page_size=10, cursor="cursor1") 281 | 282 | # Verify make_request was called with correct parameters 283 | mock_make_request.assert_called_once_with("/teams/team1/projects?page_size=10&cursor=cursor1") 284 | 285 | 286 | # Test get_project_files function 287 | @pytest.mark.asyncio 288 | async def test_get_project_files(mock_make_request): 289 | # Test with minimal parameters 290 | result = await figma_get_project_files("project1") 291 | 292 | # Verify make_request was called with correct parameters 293 | mock_make_request.assert_called_once_with("/projects/project1/files") 294 | 295 | # Reset mock for next test 296 | mock_make_request.reset_mock() 297 | 298 | # Test with all parameters 299 | result = await figma_get_project_files("project1", page_size=10, cursor="cursor1", branch_data=True) 300 | 301 | # Verify make_request was called with correct parameters 302 | mock_make_request.assert_called_once_with("/projects/project1/files?page_size=10&cursor=cursor1&branch_data=True") 303 | 304 | 305 | # Test get_team_components function 306 | @pytest.mark.asyncio 307 | async def test_get_team_components(mock_make_request): 308 | # Test with minimal parameters 309 | result = await figma_get_team_components("team1") 310 | 311 | # Verify make_request was called with correct parameters 312 | mock_make_request.assert_called_once_with("/teams/team1/components") 313 | 314 | # Reset mock for next test 315 | mock_make_request.reset_mock() 316 | 317 | # Test with all parameters 318 | result = await figma_get_team_components("team1", page_size=10, cursor="cursor1") 319 | 320 | # Verify make_request was called with correct parameters 321 | mock_make_request.assert_called_once_with("/teams/team1/components?page_size=10&cursor=cursor1") 322 | 323 | 324 | # Test get_file_components function 325 | @pytest.mark.asyncio 326 | async def test_get_file_components(mock_make_request): 327 | result = await figma_get_file_components("test_file_key") 328 | 329 | # Verify make_request was called with correct parameters 330 | mock_make_request.assert_called_once_with("/files/test_file_key/components") 331 | 332 | 333 | # Test get_component function 334 | @pytest.mark.asyncio 335 | async def test_get_component(mock_make_request): 336 | result = await figma_get_component("component1") 337 | 338 | # Verify make_request was called with correct parameters 339 | mock_make_request.assert_called_once_with("/components/component1") 340 | 341 | 342 | # Test get_team_component_sets function 343 | @pytest.mark.asyncio 344 | async def test_get_team_component_sets(mock_make_request): 345 | # Test with minimal parameters 346 | result = await figma_get_team_component_sets("team1") 347 | 348 | # Verify make_request was called with correct parameters 349 | mock_make_request.assert_called_once_with("/teams/team1/component_sets") 350 | 351 | # Reset mock for next test 352 | mock_make_request.reset_mock() 353 | 354 | # Test with all parameters 355 | result = await figma_get_team_component_sets("team1", page_size=10, cursor="cursor1") 356 | 357 | # Verify make_request was called with correct parameters 358 | mock_make_request.assert_called_once_with("/teams/team1/component_sets?page_size=10&cursor=cursor1") 359 | 360 | 361 | # Test get_team_styles function 362 | @pytest.mark.asyncio 363 | async def test_get_team_styles(mock_make_request): 364 | # Test with minimal parameters 365 | result = await figma_get_team_styles("team1") 366 | 367 | # Verify make_request was called with correct parameters 368 | mock_make_request.assert_called_once_with("/teams/team1/styles") 369 | 370 | # Reset mock for next test 371 | mock_make_request.reset_mock() 372 | 373 | # Test with all parameters 374 | result = await figma_get_team_styles("team1", page_size=10, cursor="cursor1") 375 | 376 | # Verify make_request was called with correct parameters 377 | mock_make_request.assert_called_once_with("/teams/team1/styles?page_size=10&cursor=cursor1") 378 | 379 | 380 | # Test get_file_styles function 381 | @pytest.mark.asyncio 382 | async def test_get_file_styles(mock_make_request): 383 | result = await figma_get_file_styles("test_file_key") 384 | 385 | # Verify make_request was called with correct parameters 386 | mock_make_request.assert_called_once_with("/files/test_file_key/styles") 387 | 388 | 389 | # Test get_style function 390 | @pytest.mark.asyncio 391 | async def test_get_style(mock_make_request): 392 | result = await figma_get_style("style1") 393 | 394 | # Verify make_request was called with correct parameters 395 | mock_make_request.assert_called_once_with("/styles/style1") 396 | ``` -------------------------------------------------------------------------------- /mcp_toolbox/file_ops/tools.py: -------------------------------------------------------------------------------- ```python 1 | """File operations tools for MCP-Toolbox.""" 2 | 3 | import re 4 | import stat 5 | from datetime import datetime 6 | from pathlib import Path 7 | from typing import Annotated, Any 8 | 9 | from pydantic import Field 10 | 11 | from mcp_toolbox.app import mcp 12 | 13 | 14 | @mcp.tool(description="Read file content.") 15 | async def read_file_content( 16 | path: Annotated[str, Field(description="Path to the file to read")], 17 | encoding: Annotated[str, Field(default="utf-8", description="File encoding")] = "utf-8", 18 | chunk_size: Annotated[ 19 | int, 20 | Field(default=1000000, description="Size of each chunk in bytes, default: 1MB"), 21 | ] = 1000000, 22 | chunk_index: Annotated[int, Field(default=0, description="Index of the chunk to retrieve, 0-based")] = 0, 23 | ) -> dict[str, Any]: 24 | """Read content from a file, with support for chunked reading for large files. 25 | 26 | Args: 27 | path: Path to the file to read 28 | encoding: Optional. File encoding (default: utf-8) 29 | chunk_size: Optional. Size of each chunk in bytes (default: 1000000, which about 1MB) 30 | chunk_index: Optional. Index of the chunk to retrieve, 0-based (default: 0) 31 | 32 | Returns: 33 | Dictionary containing content and metadata, including chunking information 34 | """ 35 | try: 36 | file_path = Path(path).expanduser() 37 | 38 | if not file_path.exists(): 39 | return { 40 | "error": f"File not found: {path}", 41 | "content": "", 42 | "success": False, 43 | } 44 | 45 | if not file_path.is_file(): 46 | return { 47 | "error": f"Path is not a file: {path}", 48 | "content": "", 49 | "success": False, 50 | } 51 | 52 | # Get file stats 53 | stats = file_path.stat() 54 | file_size = stats.st_size 55 | 56 | # Calculate total chunks 57 | total_chunks = (file_size + chunk_size - 1) // chunk_size # Ceiling division 58 | 59 | # Validate chunk_index 60 | if chunk_index < 0 or (file_size > 0 and chunk_index >= total_chunks): 61 | return { 62 | "error": f"Invalid chunk index: {chunk_index}. Valid range is 0 to {total_chunks - 1}", 63 | "content": "", 64 | "success": False, 65 | "total_chunks": total_chunks, 66 | "file_size": file_size, 67 | } 68 | 69 | # Calculate start and end positions for the chunk 70 | start_pos = chunk_index * chunk_size 71 | end_pos = min(start_pos + chunk_size, file_size) 72 | chunk_actual_size = end_pos - start_pos 73 | 74 | # Read the specified chunk 75 | content = "" 76 | with open(file_path, "rb") as f: 77 | f.seek(start_pos) 78 | chunk_bytes = f.read(chunk_actual_size) 79 | 80 | try: 81 | # Try to decode as text 82 | content = chunk_bytes.decode(encoding, errors="replace") 83 | except UnicodeDecodeError: 84 | # If decoding fails, return base64 encoded binary data 85 | import base64 86 | 87 | content = base64.b64encode(chunk_bytes).decode("ascii") 88 | encoding = f"base64 (original: {encoding})" 89 | 90 | return { 91 | "content": content, 92 | "size": file_size, 93 | "chunk_size": chunk_size, 94 | "chunk_index": chunk_index, 95 | "chunk_actual_size": chunk_actual_size, 96 | "total_chunks": total_chunks, 97 | "is_last_chunk": chunk_index == total_chunks - 1, 98 | "encoding": encoding, 99 | "last_modified": datetime.fromtimestamp(stats.st_mtime).isoformat(), 100 | "success": True, 101 | } 102 | except UnicodeDecodeError: 103 | return { 104 | "error": f"Failed to decode file with encoding {encoding}. Try a different encoding.", 105 | "content": "", 106 | "success": False, 107 | } 108 | except Exception as e: 109 | return { 110 | "error": f"Failed to read file: {e!s}", 111 | "content": "", 112 | "success": False, 113 | } 114 | 115 | 116 | @mcp.tool(description="Write content to a file.") 117 | async def write_file_content( 118 | path: Annotated[str, Field(description="Path to the file to write")], 119 | content: Annotated[str, Field(description="Content to write")], 120 | encoding: Annotated[str, Field(default="utf-8", description="File encoding")] = "utf-8", 121 | append: Annotated[bool, Field(default=False, description="Whether to append to the file")] = False, 122 | ) -> dict[str, Any]: 123 | """Write content to a file. 124 | 125 | Args: 126 | path: Path to the file to write 127 | content: Content to write to the file 128 | encoding: Optional. File encoding (default: utf-8) 129 | append: Optional. Whether to append to the file (default: False) 130 | 131 | Returns: 132 | Dictionary containing success status and metadata 133 | """ 134 | try: 135 | file_path = Path(path).expanduser() 136 | 137 | # Create parent directories if they don't exist 138 | file_path.parent.mkdir(parents=True, exist_ok=True) 139 | 140 | # Write content to file 141 | mode = "a" if append else "w" 142 | with open(file_path, mode, encoding=encoding) as f: 143 | f.write(content) 144 | 145 | # Get file stats 146 | stats = file_path.stat() 147 | 148 | return { 149 | "path": str(file_path), 150 | "size": stats.st_size, 151 | "last_modified": datetime.fromtimestamp(stats.st_mtime).isoformat(), 152 | "success": True, 153 | } 154 | except Exception as e: 155 | return { 156 | "error": f"Failed to write file: {e!s}", 157 | "path": path, 158 | "success": False, 159 | } 160 | 161 | 162 | @mcp.tool(description="Replace content in a file using regular expressions.") 163 | async def replace_in_file( 164 | path: Annotated[str, Field(description="Path to the file")], 165 | pattern: Annotated[ 166 | str, 167 | Field( 168 | description="Python regular expression pattern (re module). Supports groups, character classes, quantifiers, etc. Examples: '[a-z]+' for lowercase words, '\\d{3}-\\d{4}' for number patterns. Remember to escape backslashes." 169 | ), 170 | ], 171 | replacement: Annotated[str, Field(description="Replacement string")], 172 | encoding: Annotated[str, Field(default="utf-8", description="File encoding")] = "utf-8", 173 | count: Annotated[int, Field(default=0, description="Maximum number of replacements")] = 0, 174 | ) -> dict[str, Any]: 175 | """Replace content in a file using regular expressions. 176 | 177 | Args: 178 | path: Path to the file 179 | pattern: Regular expression pattern 180 | replacement: Replacement string 181 | encoding: Optional. File encoding (default: utf-8) 182 | count: Optional. Maximum number of replacements (default: 0, which means all occurrences) 183 | 184 | Returns: 185 | Dictionary containing success status and replacement information 186 | """ 187 | try: 188 | file_path = Path(path).expanduser() 189 | 190 | if not file_path.exists(): 191 | return { 192 | "error": f"File not found: {path}", 193 | "success": False, 194 | "replacements": 0, 195 | } 196 | 197 | if not file_path.is_file(): 198 | return { 199 | "error": f"Path is not a file: {path}", 200 | "success": False, 201 | "replacements": 0, 202 | } 203 | 204 | # Read file content 205 | with open(file_path, encoding=encoding) as f: 206 | content = f.read() 207 | 208 | # Compile regex pattern 209 | try: 210 | regex = re.compile(pattern) 211 | except re.error as e: 212 | return { 213 | "error": f"Invalid regular expression: {e!s}", 214 | "success": False, 215 | "replacements": 0, 216 | } 217 | 218 | # Replace content 219 | new_content, replacements = regex.subn(replacement, content, count=count) 220 | 221 | if replacements > 0: 222 | # Write updated content back to file 223 | with open(file_path, "w", encoding=encoding) as f: 224 | f.write(new_content) 225 | 226 | return { 227 | "path": str(file_path), 228 | "replacements": replacements, 229 | "success": True, 230 | } 231 | except UnicodeDecodeError: 232 | return { 233 | "error": f"Failed to decode file with encoding {encoding}. Try a different encoding.", 234 | "success": False, 235 | "replacements": 0, 236 | } 237 | except Exception as e: 238 | return { 239 | "error": f"Failed to replace content: {e!s}", 240 | "success": False, 241 | "replacements": 0, 242 | } 243 | 244 | 245 | def _format_mode(mode: int) -> str: 246 | """Format file mode into a string representation. 247 | 248 | Args: 249 | mode: File mode as an integer 250 | 251 | Returns: 252 | String representation of file permissions 253 | """ 254 | result = "" 255 | 256 | # File type 257 | if stat.S_ISDIR(mode): 258 | result += "d" 259 | elif stat.S_ISLNK(mode): 260 | result += "l" 261 | else: 262 | result += "-" 263 | 264 | # User permissions 265 | result += "r" if mode & stat.S_IRUSR else "-" 266 | result += "w" if mode & stat.S_IWUSR else "-" 267 | result += "x" if mode & stat.S_IXUSR else "-" 268 | 269 | # Group permissions 270 | result += "r" if mode & stat.S_IRGRP else "-" 271 | result += "w" if mode & stat.S_IWGRP else "-" 272 | result += "x" if mode & stat.S_IXGRP else "-" 273 | 274 | # Other permissions 275 | result += "r" if mode & stat.S_IROTH else "-" 276 | result += "w" if mode & stat.S_IWOTH else "-" 277 | result += "x" if mode & stat.S_IXOTH else "-" 278 | 279 | return result 280 | 281 | 282 | def _get_file_info(path: Path) -> dict[str, Any]: 283 | """Get detailed information about a file or directory. 284 | 285 | Args: 286 | path: Path to the file or directory 287 | 288 | Returns: 289 | Dictionary containing file information 290 | """ 291 | stats = path.stat() 292 | 293 | # Format timestamps 294 | mtime = datetime.fromtimestamp(stats.st_mtime).isoformat() 295 | ctime = datetime.fromtimestamp(stats.st_ctime).isoformat() 296 | atime = datetime.fromtimestamp(stats.st_atime).isoformat() 297 | 298 | # Get file type 299 | if path.is_dir(): 300 | file_type = "directory" 301 | elif path.is_symlink(): 302 | file_type = "symlink" 303 | else: 304 | file_type = "file" 305 | 306 | # Format size 307 | size = stats.st_size 308 | size_str = f"{size} bytes" 309 | if size >= 1024: 310 | size_str = f"{size / 1024:.2f} KB" 311 | if size >= 1024 * 1024: 312 | size_str = f"{size / (1024 * 1024):.2f} MB" 313 | if size >= 1024 * 1024 * 1024: 314 | size_str = f"{size / (1024 * 1024 * 1024):.2f} GB" 315 | 316 | return { 317 | "name": path.name, 318 | "path": str(path), 319 | "type": file_type, 320 | "size": size, 321 | "size_formatted": size_str, 322 | "permissions": _format_mode(stats.st_mode), 323 | "mode": stats.st_mode, 324 | "owner": stats.st_uid, 325 | "group": stats.st_gid, 326 | "created": ctime, 327 | "modified": mtime, 328 | "accessed": atime, 329 | } 330 | 331 | 332 | @mcp.tool(description="List directory contents with detailed information.") 333 | async def list_directory( # noqa: C901 334 | path: Annotated[str, Field(description="Directory path")], 335 | recursive: Annotated[bool, Field(default=False, description="Whether to list recursively")] = False, 336 | max_depth: Annotated[int, Field(default=-1, description="Maximum recursion depth")] = -1, 337 | include_hidden: Annotated[bool, Field(default=False, description="Whether to include hidden files")] = False, 338 | ignore_patterns: Annotated[ 339 | list[str] | None, 340 | Field( 341 | default=[ 342 | "node_modules", 343 | "dist", 344 | "build", 345 | "public", 346 | "static", 347 | ".next", 348 | ".git", 349 | ".vscode", 350 | ".idea", 351 | ".DS_Store", 352 | ".env", 353 | ".venv", 354 | ], 355 | description="Glob patterns to ignore (e.g. ['node_modules', '*.tmp'])", 356 | ), 357 | ] = None, 358 | ) -> dict[str, Any]: 359 | """List directory contents with detailed information. 360 | 361 | Args: 362 | path: Directory path 363 | recursive: Optional. Whether to list recursively (default: False) 364 | max_depth: Optional. Maximum recursion depth (default: -1, which means no limit) 365 | include_hidden: Optional. Whether to include hidden files (default: False) 366 | ignore_patterns: Optional. Glob patterns to ignore (default: ['node_modules', 'dist', 'build', 'public', 'static', '.next', '.git', '.vscode', '.idea', '.DS_Store', '.env', '.venv']) 367 | 368 | Returns: 369 | Dictionary containing directory contents and metadata 370 | """ 371 | ignore_patterns = ( 372 | ignore_patterns 373 | if ignore_patterns is not None 374 | else [ 375 | "node_modules", 376 | "dist", 377 | "build", 378 | "public", 379 | "static", 380 | ".next", 381 | ".git", 382 | ".vscode", 383 | ".idea", 384 | ".DS_Store", 385 | ".env", 386 | ".venv", 387 | ] 388 | ) 389 | try: 390 | dir_path = Path(path).expanduser() 391 | 392 | if not dir_path.exists(): 393 | return { 394 | "error": f"Directory not found: {path}", 395 | "entries": [], 396 | "success": False, 397 | } 398 | 399 | if not dir_path.is_dir(): 400 | return { 401 | "error": f"Path is not a directory: {path}", 402 | "entries": [], 403 | "success": False, 404 | } 405 | 406 | entries = [] 407 | 408 | # Import fnmatch for pattern matching 409 | import fnmatch 410 | 411 | def should_ignore(path: Path) -> bool: 412 | """Check if a path should be ignored based on ignore patterns. 413 | 414 | Args: 415 | path: Path to check 416 | 417 | Returns: 418 | True if the path should be ignored, False otherwise 419 | """ 420 | if not ignore_patterns: 421 | return False 422 | 423 | return any(fnmatch.fnmatch(path.name, pattern) for pattern in ignore_patterns) 424 | 425 | def process_directory(current_path: Path, current_depth: int = 0) -> None: 426 | """Process a directory and its contents recursively. 427 | 428 | Args: 429 | current_path: Path to the current directory 430 | current_depth: Current recursion depth 431 | """ 432 | nonlocal entries 433 | 434 | # Check if we've reached the maximum depth 435 | if max_depth >= 0 and current_depth > max_depth: 436 | return 437 | 438 | try: 439 | # List directory contents 440 | for item in current_path.iterdir(): 441 | # Skip hidden files if not included 442 | if not include_hidden and item.name.startswith("."): 443 | continue 444 | 445 | # Skip ignored patterns 446 | if should_ignore(item): 447 | continue 448 | 449 | # Get file information 450 | file_info = _get_file_info(item) 451 | file_info["depth"] = current_depth 452 | entries.append(file_info) 453 | 454 | # Recursively process subdirectories 455 | if recursive and item.is_dir(): 456 | process_directory(item, current_depth + 1) 457 | except PermissionError: 458 | # Add an entry indicating permission denied 459 | entries.append({ 460 | "name": current_path.name, 461 | "path": str(current_path), 462 | "type": "directory", 463 | "error": "Permission denied", 464 | "depth": current_depth, 465 | }) 466 | 467 | # Start processing from the root directory 468 | process_directory(dir_path) 469 | 470 | return { 471 | "path": str(dir_path), 472 | "entries": entries, 473 | "count": len(entries), 474 | "success": True, 475 | } 476 | except Exception as e: 477 | return { 478 | "error": f"Failed to list directory: {e!s}", 479 | "entries": [], 480 | "success": False, 481 | } 482 | ``` -------------------------------------------------------------------------------- /tests/file_ops/test_file_ops_tools.py: -------------------------------------------------------------------------------- ```python 1 | """Tests for file operations tools.""" 2 | 3 | import os 4 | import stat 5 | import tempfile 6 | from pathlib import Path 7 | from unittest.mock import patch 8 | 9 | import pytest 10 | 11 | from mcp_toolbox.file_ops.tools import ( 12 | _format_mode, 13 | _get_file_info, 14 | list_directory, 15 | read_file_content, 16 | replace_in_file, 17 | write_file_content, 18 | ) 19 | 20 | 21 | @pytest.mark.asyncio 22 | async def test_read_file_content(): 23 | """Test reading file content.""" 24 | # Create a temporary file 25 | with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_file: 26 | temp_file.write("Test content") 27 | temp_path = temp_file.name 28 | 29 | try: 30 | # Test reading the entire file 31 | result = await read_file_content(temp_path) 32 | assert result["success"] is True 33 | assert result["content"] == "Test content" 34 | assert "size" in result 35 | assert "last_modified" in result 36 | assert result["total_chunks"] == 1 37 | assert result["chunk_index"] == 0 38 | assert result["is_last_chunk"] is True 39 | 40 | # Test reading a non-existent file 41 | result = await read_file_content("/non/existent/file") 42 | assert result["success"] is False 43 | assert "File not found" in result["error"] 44 | 45 | # Test reading a directory 46 | temp_dir = tempfile.mkdtemp() 47 | try: 48 | result = await read_file_content(temp_dir) 49 | assert result["success"] is False 50 | assert "Path is not a file" in result["error"] 51 | finally: 52 | os.rmdir(temp_dir) 53 | 54 | # Test reading a file with tilde in path 55 | with patch("pathlib.Path.expanduser", return_value=Path(temp_path)) as mock_expanduser: 56 | result = await read_file_content("~/test_file.txt") 57 | assert result["success"] is True 58 | assert result["content"] == "Test content" 59 | mock_expanduser.assert_called_once() 60 | 61 | # Test reading with custom chunk size 62 | result = await read_file_content(temp_path, chunk_size=5) 63 | assert result["success"] is True 64 | assert result["content"] == "Test " 65 | assert result["chunk_size"] == 5 66 | assert result["chunk_index"] == 0 67 | assert result["total_chunks"] == 3 # "Test content" is 12 chars, so 3 chunks of 5 bytes 68 | assert result["is_last_chunk"] is False 69 | 70 | # Test reading second chunk 71 | result = await read_file_content(temp_path, chunk_size=5, chunk_index=1) 72 | assert result["success"] is True 73 | assert result["content"] == "conte" 74 | assert result["chunk_index"] == 1 75 | assert result["is_last_chunk"] is False 76 | 77 | # Test reading last chunk 78 | result = await read_file_content(temp_path, chunk_size=5, chunk_index=2) 79 | assert result["success"] is True 80 | assert result["content"] == "nt" 81 | assert result["chunk_index"] == 2 82 | assert result["is_last_chunk"] is True 83 | assert result["chunk_actual_size"] == 2 # Only 2 bytes in the last chunk 84 | 85 | # Test reading with invalid chunk index 86 | result = await read_file_content(temp_path, chunk_size=5, chunk_index=3) 87 | assert result["success"] is False 88 | assert "Invalid chunk index" in result["error"] 89 | 90 | finally: 91 | # Clean up 92 | os.unlink(temp_path) 93 | 94 | 95 | @pytest.mark.asyncio 96 | async def test_write_file_content(): 97 | """Test writing file content.""" 98 | # Create a temporary directory 99 | with tempfile.TemporaryDirectory() as temp_dir: 100 | # Test writing to a new file 101 | file_path = os.path.join(temp_dir, "test_file.txt") 102 | result = await write_file_content(file_path, "Test content") 103 | assert result["success"] is True 104 | assert os.path.exists(file_path) 105 | with open(file_path) as f: 106 | assert f.read() == "Test content" 107 | 108 | # Test appending to a file 109 | result = await write_file_content(file_path, " appended", append=True) 110 | assert result["success"] is True 111 | with open(file_path) as f: 112 | assert f.read() == "Test content appended" 113 | 114 | # Test writing to a nested path 115 | nested_path = os.path.join(temp_dir, "nested", "dir", "test_file.txt") 116 | result = await write_file_content(nested_path, "Nested content") 117 | assert result["success"] is True 118 | assert os.path.exists(nested_path) 119 | with open(nested_path) as f: 120 | assert f.read() == "Nested content" 121 | 122 | # Test writing to a file with tilde in path 123 | tilde_path = "~/test_file_tilde.txt" 124 | expanded_path = os.path.join(temp_dir, "test_file_tilde.txt") 125 | 126 | with patch("pathlib.Path.expanduser", return_value=Path(expanded_path)) as mock_expanduser: 127 | result = await write_file_content(tilde_path, "Tilde content") 128 | assert result["success"] is True 129 | mock_expanduser.assert_called_once() 130 | # Verify the file was created at the expanded path 131 | assert os.path.exists(expanded_path) 132 | with open(expanded_path) as f: 133 | assert f.read() == "Tilde content" 134 | 135 | 136 | @pytest.mark.asyncio 137 | async def test_replace_in_file(): 138 | """Test replacing content in a file.""" 139 | # Create a temporary file 140 | with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_file: 141 | temp_file.write("Hello world! This is a test.") 142 | temp_path = temp_file.name 143 | 144 | try: 145 | # Test replacing content 146 | result = await replace_in_file(temp_path, r"world", "universe") 147 | assert result["success"] is True 148 | assert result["replacements"] == 1 149 | with open(temp_path) as f: 150 | assert f.read() == "Hello universe! This is a test." 151 | 152 | # Test replacing with count 153 | result = await replace_in_file(temp_path, r"[aeiou]", "X", count=2) 154 | assert result["success"] is True 155 | assert result["replacements"] == 2 156 | with open(temp_path) as f: 157 | assert f.read() == "HXllX universe! This is a test." 158 | 159 | # Test replacing with invalid regex 160 | result = await replace_in_file(temp_path, r"[unclosed", "X") 161 | assert result["success"] is False 162 | assert "Invalid regular expression" in result["error"] 163 | 164 | # Test replacing in non-existent file 165 | result = await replace_in_file("/non/existent/file", r"test", "replacement") 166 | assert result["success"] is False 167 | assert "File not found" in result["error"] 168 | 169 | # Test replacing in a file with tilde in path 170 | with patch("pathlib.Path.expanduser", return_value=Path(temp_path)) as mock_expanduser: 171 | result = await replace_in_file("~/test_file.txt", r"HXllX", "Hello") 172 | assert result["success"] is True 173 | assert result["replacements"] == 1 174 | mock_expanduser.assert_called_once() 175 | with open(temp_path) as f: 176 | assert f.read() == "Hello universe! This is a test." 177 | 178 | finally: 179 | # Clean up 180 | os.unlink(temp_path) 181 | 182 | 183 | def test_format_mode(): 184 | """Test formatting file mode.""" 185 | # Directory with full permissions 186 | dir_mode = stat.S_IFDIR | 0o777 187 | assert _format_mode(dir_mode) == "drwxrwxrwx" 188 | 189 | # Regular file with read-only permissions 190 | file_mode = stat.S_IFREG | 0o444 191 | assert _format_mode(file_mode) == "-r--r--r--" 192 | 193 | # Executable file with owner-only permissions 194 | exec_mode = stat.S_IFREG | 0o700 195 | assert _format_mode(exec_mode) == "-rwx------" 196 | 197 | # Symlink with mixed permissions 198 | link_mode = stat.S_IFLNK | 0o751 199 | assert _format_mode(link_mode) == "lrwxr-x--x" 200 | 201 | 202 | @pytest.mark.asyncio 203 | async def test_list_directory(): 204 | """Test listing directory contents.""" 205 | # Create a temporary directory structure 206 | with tempfile.TemporaryDirectory() as temp_dir: 207 | # Create some files and subdirectories 208 | file1_path = os.path.join(temp_dir, "file1.txt") 209 | with open(file1_path, "w") as f: 210 | f.write("File 1 content") 211 | 212 | file2_path = os.path.join(temp_dir, "file2.txt") 213 | with open(file2_path, "w") as f: 214 | f.write("File 2 content") 215 | 216 | hidden_file_path = os.path.join(temp_dir, ".hidden_file") 217 | with open(hidden_file_path, "w") as f: 218 | f.write("Hidden file content") 219 | 220 | subdir_path = os.path.join(temp_dir, "subdir") 221 | os.mkdir(subdir_path) 222 | 223 | subfile_path = os.path.join(subdir_path, "subfile.txt") 224 | with open(subfile_path, "w") as f: 225 | f.write("Subfile content") 226 | 227 | # Create files for testing ignore patterns 228 | temp_file_path = os.path.join(temp_dir, "temp.tmp") 229 | with open(temp_file_path, "w") as f: 230 | f.write("Temporary file content") 231 | 232 | node_modules_path = os.path.join(temp_dir, "node_modules") 233 | os.mkdir(node_modules_path) 234 | 235 | node_module_file_path = os.path.join(node_modules_path, "package.json") 236 | with open(node_module_file_path, "w") as f: 237 | f.write('{"name": "test-package"}') 238 | 239 | cache_file_path = os.path.join(temp_dir, "cache.pyc") 240 | with open(cache_file_path, "w") as f: 241 | f.write("Python cache file") 242 | 243 | # Test basic directory listing 244 | result = await list_directory(temp_dir) 245 | assert result["success"] is True 246 | assert result["path"] == temp_dir 247 | assert ( 248 | len(result["entries"]) == 5 249 | ) # 4 files + 1 directory (node_modules is ignored by default), no hidden files 250 | assert result["count"] == 5 251 | 252 | # Test with hidden files 253 | result = await list_directory(temp_dir, include_hidden=True) 254 | assert result["success"] is True 255 | assert len(result["entries"]) == 6 # 4 files + 1 directory + 1 hidden file (node_modules is ignored by default) 256 | assert result["count"] == 6 257 | 258 | # Test recursive listing with explicit empty ignore patterns to see all files 259 | result = await list_directory(temp_dir, recursive=True, ignore_patterns=[]) 260 | assert result["success"] is True 261 | assert len(result["entries"]) == 8 # 4 files + 2 directories + 1 subfile + 1 node_module file 262 | assert result["count"] == 8 263 | 264 | # Test with max depth (with empty ignore patterns to ensure consistent behavior) 265 | result = await list_directory(temp_dir, recursive=True, max_depth=0, ignore_patterns=[]) 266 | assert result["success"] is True 267 | assert len(result["entries"]) == 6 # Only top-level entries 268 | assert result["count"] == 6 269 | 270 | # Test with ignore patterns - single pattern 271 | result = await list_directory(temp_dir, ignore_patterns=["*.tmp"]) 272 | assert result["success"] is True 273 | assert len(result["entries"]) == 5 # Excluding temp.tmp 274 | assert result["count"] == 5 275 | # Verify temp.tmp is not in the results 276 | assert not any(entry["name"] == "temp.tmp" for entry in result["entries"]) 277 | 278 | # Test with ignore patterns - directory pattern 279 | result = await list_directory(temp_dir, recursive=True, ignore_patterns=["node_modules"]) 280 | assert result["success"] is True 281 | assert len(result["entries"]) == 6 # Excluding node_modules directory and its contents 282 | assert result["count"] == 6 283 | # Verify node_modules is not in the results 284 | assert not any(entry["name"] == "node_modules" for entry in result["entries"]) 285 | 286 | # Test with multiple ignore patterns 287 | result = await list_directory(temp_dir, ignore_patterns=["*.tmp", "*.pyc"]) 288 | assert result["success"] is True 289 | assert len(result["entries"]) == 4 # Excluding temp.tmp and cache.pyc 290 | assert result["count"] == 4 291 | # Verify neither temp.tmp nor cache.pyc are in the results 292 | assert not any(entry["name"] == "temp.tmp" for entry in result["entries"]) 293 | assert not any(entry["name"] == "cache.pyc" for entry in result["entries"]) 294 | 295 | # Test default ignore patterns 296 | # Create directories and files that should be ignored by default 297 | git_dir_path = os.path.join(temp_dir, ".git") 298 | os.mkdir(git_dir_path) 299 | git_file_path = os.path.join(git_dir_path, "config") 300 | with open(git_file_path, "w") as f: 301 | f.write("Git config content") 302 | 303 | # Create a .DS_Store file that should be ignored by default 304 | ds_store_path = os.path.join(temp_dir, ".DS_Store") 305 | with open(ds_store_path, "w") as f: 306 | f.write("DS_Store content") 307 | 308 | # Create a node_modules directory that should be ignored by default 309 | node_modules_dir = os.path.join(temp_dir, "node_modules") 310 | os.makedirs(node_modules_dir, exist_ok=True) 311 | node_modules_file = os.path.join(node_modules_dir, "package.json") 312 | with open(node_modules_file, "w") as f: 313 | f.write('{"name": "test-package"}') 314 | 315 | # Test with explicit ignore patterns that match the default ones 316 | result = await list_directory( 317 | temp_dir, recursive=True, include_hidden=True, ignore_patterns=[".git", ".DS_Store", "node_modules"] 318 | ) 319 | assert result["success"] is True 320 | # Should not include .git directory or .DS_Store file due to specified ignore patterns 321 | assert not any(entry["name"] == ".git" for entry in result["entries"]) 322 | assert not any(entry["name"] == ".DS_Store" for entry in result["entries"]) 323 | assert not any(entry["name"] == "node_modules" for entry in result["entries"]) 324 | assert any(entry["name"] == ".hidden_file" for entry in result["entries"]) # Should include other hidden files 325 | 326 | # Test with explicit None for ignore_patterns (should use defaults) 327 | result = await list_directory(temp_dir, recursive=True, include_hidden=True, ignore_patterns=None) 328 | assert result["success"] is True 329 | # Should not include node_modules directory due to default ignore patterns 330 | assert not any(entry["name"] == "node_modules" for entry in result["entries"]) 331 | # Should not include .git directory due to default ignore patterns 332 | assert not any(entry["name"] == ".git" for entry in result["entries"]) 333 | # Should not include .DS_Store file due to default ignore patterns 334 | assert not any(entry["name"] == ".DS_Store" for entry in result["entries"]) 335 | 336 | # Test with empty list for ignore_patterns (should override defaults) 337 | result = await list_directory(temp_dir, recursive=True, include_hidden=True, ignore_patterns=[]) 338 | assert result["success"] is True 339 | # Should include .git directory and .DS_Store file since we're overriding defaults with empty list 340 | assert any(entry["name"] == ".git" for entry in result["entries"]) 341 | assert any(entry["name"] == ".DS_Store" for entry in result["entries"]) 342 | 343 | # Test combining ignore patterns with other parameters 344 | result = await list_directory( 345 | temp_dir, 346 | recursive=True, 347 | include_hidden=True, 348 | ignore_patterns=["node_modules", "*.tmp", "*.pyc", ".git", ".DS_Store"], 349 | ) 350 | assert result["success"] is True 351 | # Should include only .hidden_file, file1.txt, file2.txt, subdir, and subfile.txt 352 | assert len(result["entries"]) == 5 353 | assert result["count"] == 5 354 | # Verify specific files are included/excluded 355 | assert any(entry["name"] == ".hidden_file" for entry in result["entries"]) 356 | assert any(entry["name"] == "file1.txt" for entry in result["entries"]) 357 | assert any(entry["name"] == "file2.txt" for entry in result["entries"]) 358 | assert any(entry["name"] == "subdir" for entry in result["entries"]) 359 | assert any(entry["name"] == "subfile.txt" for entry in result["entries"]) 360 | # Verify excluded files 361 | assert not any(entry["name"] == "node_modules" for entry in result["entries"]) 362 | assert not any(entry["name"] == "temp.tmp" for entry in result["entries"]) 363 | assert not any(entry["name"] == "cache.pyc" for entry in result["entries"]) 364 | assert not any(entry["name"] == ".git" for entry in result["entries"]) 365 | assert not any(entry["name"] == ".DS_Store" for entry in result["entries"]) 366 | 367 | # Test non-existent directory 368 | result = await list_directory("/non/existent/dir") 369 | assert result["success"] is False 370 | assert "Directory not found" in result["error"] 371 | 372 | # Test file path instead of directory 373 | result = await list_directory(file1_path) 374 | assert result["success"] is False 375 | assert "Path is not a directory" in result["error"] 376 | 377 | # Test directory with tilde in path (with empty ignore patterns to ensure consistent behavior) 378 | with patch("pathlib.Path.expanduser", return_value=Path(temp_dir)) as mock_expanduser: 379 | result = await list_directory("~/test_dir", ignore_patterns=[]) 380 | assert result["success"] is True 381 | assert len(result["entries"]) == 6 # 4 files + 2 directories, no hidden files 382 | assert result["count"] == 6 383 | mock_expanduser.assert_called_once() 384 | 385 | 386 | def test_get_file_info(): 387 | """Test getting file information.""" 388 | # Create a temporary file 389 | with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_file: 390 | temp_file.write("Test content") 391 | temp_path = temp_file.name 392 | 393 | try: 394 | # Get file info 395 | file_path = Path(temp_path) 396 | file_info = _get_file_info(file_path) 397 | 398 | # Check basic properties 399 | assert file_info["name"] == file_path.name 400 | assert file_info["path"] == str(file_path) 401 | assert file_info["type"] == "file" 402 | assert file_info["size"] == len("Test content") 403 | assert "size_formatted" in file_info 404 | assert "permissions" in file_info 405 | assert "mode" in file_info 406 | assert "owner" in file_info 407 | assert "group" in file_info 408 | assert "created" in file_info 409 | assert "modified" in file_info 410 | assert "accessed" in file_info 411 | 412 | finally: 413 | # Clean up 414 | os.unlink(temp_path) 415 | ```