This is page 2 of 2. Use http://codebase.md/montevive/penpot-mcp?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .editorconfig
├── .flake8
├── .github
│ ├── dependabot.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.md
│ │ └── feature_request.md
│ ├── PULL_REQUEST_TEMPLATE.md
│ ├── SETUP_CICD.md
│ └── workflows
│ ├── ci.yml
│ ├── code-quality.yml
│ ├── publish.yml
│ └── version-bump.yml
├── .gitignore
├── .pre-commit-config.yaml
├── .vscode
│ └── launch.json
├── CHANGELOG.md
├── CLAUDE_INTEGRATION.md
├── CLAUDE.md
├── CONTRIBUTING.md
├── env.example
├── fix-lint-deps.sh
├── images
│ └── penpot-mcp.png
├── lint.py
├── LINTING.md
├── Makefile
├── penpot_mcp
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ └── penpot_api.py
│ ├── resources
│ │ ├── penpot-schema.json
│ │ └── penpot-tree-schema.json
│ ├── server
│ │ ├── __init__.py
│ │ ├── client.py
│ │ └── mcp_server.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── cli
│ │ │ ├── __init__.py
│ │ │ ├── tree_cmd.py
│ │ │ └── validate_cmd.py
│ │ └── penpot_tree.py
│ └── utils
│ ├── __init__.py
│ ├── cache.py
│ ├── config.py
│ └── http_server.py
├── pyproject.toml
├── README.md
├── SECURITY.md
├── test_credentials.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── test_cache.py
│ ├── test_config.py
│ ├── test_mcp_server.py
│ └── test_penpot_tree.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/penpot_mcp/tools/penpot_tree.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Tool for building and visualizing the structure of Penpot files as a tree.
3 |
4 | This module provides functionality to parse Penpot file data and generate
5 | a tree representation, which can be displayed or exported.
6 | """
7 |
8 | import re
9 | from typing import Any, Dict, List, Optional, Union
10 |
11 | from anytree import Node, RenderTree
12 | from anytree.exporter import DotExporter
13 |
14 |
15 | def build_tree(data: Dict[str, Any]) -> Node:
16 | """
17 | Build a tree representation of Penpot file data.
18 |
19 | Args:
20 | data: The Penpot file data
21 |
22 | Returns:
23 | The root node of the tree
24 | """
25 | # Create nodes dictionary with ID as key
26 | nodes = {}
27 |
28 | # Create a synthetic root node with a special ID that won't conflict
29 | synthetic_root_id = "SYNTHETIC-ROOT"
30 | root = Node(f"{synthetic_root_id} (root) - Root")
31 | nodes[synthetic_root_id] = root
32 |
33 | # Add components section
34 |     components_node = Node("components (section) - Components", parent=root)
35 |
36 | # Store component annotations for later reference
37 | component_annotations = {}
38 |
39 | # Process components
40 | for comp_id, comp_data in data.get('components', {}).items():
41 | comp_name = comp_data.get('name', 'Unnamed')
42 | comp_node = Node(f"{comp_id} (component) - {comp_name}", parent=components_node)
43 | nodes[comp_id] = comp_node
44 |
45 | # Store annotation if present
46 | if 'annotation' in comp_data and comp_data['annotation']:
47 | component_annotations[comp_id] = comp_data['annotation']
48 |
49 | # First pass: create all page nodes
50 | for page_id, page_data in data.get('pagesIndex', {}).items():
51 | # Create page node
52 | page_name = page_data.get('name', 'Unnamed')
53 | page_node = Node(f"{page_id} (page) - {page_name}", parent=root)
54 | nodes[page_id] = page_node
55 |
56 | # Second pass: process each page and its objects
57 | for page_id, page_data in data.get('pagesIndex', {}).items():
58 | page_name = page_data.get('name', 'Unnamed')
59 |
60 | # Create a page-specific dictionary for objects to avoid ID collisions
61 | page_nodes = {}
62 |
63 | # First, create all object nodes for this page
64 | for obj_id, obj_data in page_data.get('objects', {}).items():
65 | obj_type = obj_data.get('type', 'unknown')
66 | obj_name = obj_data.get('name', 'Unnamed')
67 |
68 |             # The page-specific dictionary below keeps object IDs scoped to
69 |             # this page, so the raw object ID is safe to use as the key.
70 |
71 | node = Node(f"{obj_id} ({obj_type}) - {obj_name}")
72 | page_nodes[obj_id] = node # Store with original ID for this page's lookup
73 |
74 | # Store additional properties for filtering
75 | node.obj_type = obj_type
76 | node.obj_name = obj_name
77 | node.obj_id = obj_id
78 |
79 | # Add component reference if this is a component instance
80 | if 'componentId' in obj_data and obj_data['componentId'] in nodes:
81 | comp_ref = obj_data['componentId']
82 | node.componentRef = comp_ref
83 |
84 | # If this component has an annotation, store it
85 | if comp_ref in component_annotations:
86 | node.componentAnnotation = component_annotations[comp_ref]
87 |
88 | # Identify the all-zeros root frame for this page
89 | all_zeros_id = "00000000-0000-0000-0000-000000000000"
90 | page_root_frame = None
91 |
92 | # First, find and connect the all-zeros root frame if it exists
93 | if all_zeros_id in page_data.get('objects', {}):
94 | page_root_frame = page_nodes[all_zeros_id]
95 | page_root_frame.parent = nodes[page_id]
96 |
97 | # Then build parent-child relationships for this page
98 | for obj_id, obj_data in page_data.get('objects', {}).items():
99 | # Skip the all-zeros root frame as we already processed it
100 | if obj_id == all_zeros_id:
101 | continue
102 |
103 | parent_id = obj_data.get('parentId')
104 |
105 | # Skip if parent ID is the same as object ID (circular reference)
106 | if parent_id and parent_id == obj_id:
107 |                 print(f"Warning: Object {obj_id} references itself as "
108 |                       "parent. Attaching to page instead.")
109 | page_nodes[obj_id].parent = nodes[page_id]
110 | elif parent_id and parent_id in page_nodes:
111 | # Check for circular references in the node hierarchy
112 | is_circular = False
113 | check_node = page_nodes[parent_id]
114 | while check_node.parent is not None:
115 | if hasattr(check_node.parent, 'obj_id') and check_node.parent.obj_id == obj_id:
116 | is_circular = True
117 | break
118 | check_node = check_node.parent
119 |
120 | if is_circular:
121 |                     print("Warning: Circular reference detected for "
122 |                           f"{obj_id}. Attaching to page instead.")
123 | page_nodes[obj_id].parent = nodes[page_id]
124 | else:
125 | page_nodes[obj_id].parent = page_nodes[parent_id]
126 | else:
127 | # If no parent or parent not found, connect to the all-zeros root frame if it exists,
128 | # otherwise connect to the page
129 | if page_root_frame:
130 | page_nodes[obj_id].parent = page_root_frame
131 | else:
132 | page_nodes[obj_id].parent = nodes[page_id]
133 |
134 | return root
135 |
136 |
137 | def print_tree(root: Node, filter_pattern: Optional[str] = None) -> None:
138 | """
139 | Print a tree representation to the console, with optional filtering.
140 |
141 | Args:
142 | root: The root node of the tree
143 | filter_pattern: Optional regex pattern to filter nodes
144 | """
145 | matched_nodes = []
146 |
147 | # Apply filtering
148 | if filter_pattern:
149 | # Find all nodes that match the filter
150 | pattern = re.compile(filter_pattern, re.IGNORECASE)
151 |
152 | # Helper function to check if a node matches the filter
153 | def matches_filter(node):
154 | if not hasattr(node, 'obj_type') and not hasattr(node, 'obj_name'):
155 | return False # Root node or section nodes
156 |
157 |             # Match against the object's type, name, or ID
158 |             return bool(
159 |                 pattern.search(node.obj_type)
160 |                 or pattern.search(node.obj_name)
161 |                 or pattern.search(node.obj_id)
162 |             )
163 |
164 | # Find all matching nodes and their paths to root
165 | for pre, _, node in RenderTree(root):
166 | if matches_filter(node):
167 | matched_nodes.append(node)
168 |
169 | # If we found matches, only print these nodes and their ancestors
170 | if matched_nodes:
171 | print(f"Filtered results matching '{filter_pattern}':")
172 |
173 | # Build a set of all nodes to show (matching nodes and their ancestors)
174 | nodes_to_show = set()
175 | for node in matched_nodes:
176 | # Add the node and all its ancestors
177 | current = node
178 | while current is not None:
179 | nodes_to_show.add(current)
180 | current = current.parent
181 |
182 | # Print the filtered tree
183 | for pre, _, node in RenderTree(root):
184 | if node in nodes_to_show:
185 | node_name = node.name
186 | if hasattr(node, 'componentRef'):
187 | comp_ref_str = f" (refs component: {node.componentRef}"
188 | if hasattr(node, 'componentAnnotation'):
189 | comp_ref_str += f" - Note: {node.componentAnnotation}"
190 | comp_ref_str += ")"
191 | node_name += comp_ref_str
192 |
193 | # Highlight matched nodes
194 | if node in matched_nodes:
195 | print(f"{pre}{node_name} <-- MATCH")
196 | else:
197 | print(f"{pre}{node_name}")
198 |
199 | print(f"\nFound {len(matched_nodes)} matching objects.")
200 | return
201 |
202 | # If no filter or no matches, print the entire tree
203 | for pre, _, node in RenderTree(root):
204 | node_name = node.name
205 | if hasattr(node, 'componentRef'):
206 | comp_ref_str = f" (refs component: {node.componentRef}"
207 | if hasattr(node, 'componentAnnotation'):
208 | comp_ref_str += f" - Note: {node.componentAnnotation}"
209 | comp_ref_str += ")"
210 | node_name += comp_ref_str
211 | print(f"{pre}{node_name}")
212 |
213 |
214 | def export_tree_to_dot(root: Node, output_file: str, filter_pattern: Optional[str] = None) -> bool:
215 | """
216 | Export the tree to a DOT file (Graphviz format).
217 |
218 | Args:
219 | root: The root node of the tree
220 | output_file: Path to save the exported file
221 | filter_pattern: Optional regex pattern to filter nodes
222 |
223 | Returns:
224 | True if successful, False otherwise
225 | """
226 | try:
227 | # If filtering, we may want to only export the filtered tree
228 | if filter_pattern:
229 | # TODO: Implement filtered export
230 | pass
231 |
232 | DotExporter(root).to_picture(output_file)
233 | print(f"Tree exported to {output_file}")
234 | return True
235 | except Exception as e:
236 | print(f"Warning: Could not export to {output_file}: {e}")
237 | print("Make sure Graphviz is installed: https://graphviz.org/download/")
238 | return False
239 |
240 |
241 | def find_page_containing_object(content: Dict[str, Any], object_id: str) -> Optional[str]:
242 | """
243 | Find which page contains the specified object.
244 |
245 | Args:
246 | content: The Penpot file content
247 | object_id: The ID of the object to find
248 |
249 | Returns:
250 | The page ID containing the object, or None if not found
251 | """
252 | # Helper function to recursively search for an object in the hierarchy
253 | def find_object_in_hierarchy(objects_dict, target_id):
254 | # Check if the object is directly in the dictionary
255 | if target_id in objects_dict:
256 | return True
257 |
258 | # Check if the object is a child of any object in the dictionary
259 |         for obj_data in objects_dict.values():
260 | # Look for objects that have shapes (children)
261 | if "shapes" in obj_data and target_id in obj_data["shapes"]:
262 | return True
263 |
264 | # Check in children elements if any
265 | if "children" in obj_data:
266 | child_objects = {child["id"]: child for child in obj_data["children"]}
267 | if find_object_in_hierarchy(child_objects, target_id):
268 | return True
269 |
270 | return False
271 |
272 | # Check each page
273 | for page_id, page_data in content.get('pagesIndex', {}).items():
274 | objects_dict = page_data.get('objects', {})
275 | if find_object_in_hierarchy(objects_dict, object_id):
276 | return page_id
277 |
278 | return None
279 |
280 |
281 | def find_object_in_tree(tree: Node, target_id: str) -> Optional[Dict[str, Any]]:
282 | """
283 | Find an object in the tree by its ID and return its subtree as a dictionary.
284 |
285 | Args:
286 | tree: The root node of the tree
287 | target_id: The ID of the object to find
288 |
289 | Returns:
290 | Dictionary representation of the object's subtree, or None if not found
291 | """
292 | # Helper function to search in a node's children
293 | def find_object_in_children(node, target_id):
294 | for child in node.children:
295 | if hasattr(child, 'obj_id') and child.obj_id == target_id:
296 | return convert_node_to_dict(child)
297 |
298 | result = find_object_in_children(child, target_id)
299 | if result:
300 | return result
301 | return None
302 |
303 | # Iterate through the tree's children
304 | for child in tree.children:
305 | # Check if this is a page node (contains "(page)" in its name)
306 | if "(page)" in child.name:
307 | # Check all objects under this page
308 | for obj in child.children:
309 | if hasattr(obj, 'obj_id') and obj.obj_id == target_id:
310 | return convert_node_to_dict(obj)
311 |
312 | # Check children recursively
313 | result = find_object_in_children(obj, target_id)
314 | if result:
315 | return result
316 | return None
317 |
318 |
319 | def convert_node_to_dict(node: Node) -> Dict[str, Any]:
320 | """
321 | Convert an anytree.Node to a dictionary format for API response.
322 |
323 | Args:
324 | node: The node to convert
325 |
326 | Returns:
327 | Dictionary representation of the node and its subtree
328 | """
329 | result = {
330 | 'id': node.obj_id if hasattr(node, 'obj_id') else None,
331 | 'type': node.obj_type if hasattr(node, 'obj_type') else None,
332 | 'name': node.obj_name if hasattr(node, 'obj_name') else None,
333 | 'children': []
334 | }
335 |
336 | # Add component reference if available
337 | if hasattr(node, 'componentRef'):
338 | result['componentRef'] = node.componentRef
339 |
340 | # Add component annotation if available
341 | if hasattr(node, 'componentAnnotation'):
342 | result['componentAnnotation'] = node.componentAnnotation
343 |
344 | # Recursively add children
345 | for child in node.children:
346 | result['children'].append(convert_node_to_dict(child))
347 |
348 | return result
349 |
350 |
351 | def get_object_subtree(file_data: Dict[str, Any], object_id: str) -> Dict[str, Union[Dict, str]]:
352 | """
353 | Get a simplified tree representation of an object and its children.
354 |
355 | Args:
356 | file_data: The Penpot file data
357 | object_id: The ID of the object to get the tree for
358 |
359 | Returns:
360 | Dictionary containing the simplified tree or an error message
361 | """
362 | try:
363 | # Get the content from file data
364 | content = file_data.get('data')
365 |
366 | # Find which page contains the object
367 | page_id = find_page_containing_object(content, object_id)
368 |
369 | if not page_id:
370 | return {"error": f"Object {object_id} not found in file"}
371 |
372 | # Build the full tree
373 | full_tree = build_tree(content)
374 |
375 | # Find the object in the full tree and extract its subtree
376 | simplified_tree = find_object_in_tree(full_tree, object_id)
377 |
378 | if not simplified_tree:
379 | return {"error": f"Object {object_id} not found in tree structure"}
380 |
381 | return {
382 | "tree": simplified_tree,
383 | "page_id": page_id
384 | }
385 | except Exception as e:
386 | return {"error": str(e)}
387 |
388 |
389 | def get_object_subtree_with_fields(file_data: Dict[str, Any], object_id: str,
390 | include_fields: Optional[List[str]] = None,
391 | depth: int = -1) -> Dict[str, Any]:
392 | """
393 | Get a filtered tree representation of an object with only specified fields.
394 |
395 | This function finds an object in the Penpot file data and returns a subtree
396 | with the object as the root, including only the specified fields and limiting
397 | the depth of the tree if requested.
398 |
399 | Args:
400 | file_data: The Penpot file data
401 | object_id: The ID of the object to get the tree for
402 | include_fields: List of field names to include in the output (None means include all)
403 | depth: Maximum depth of the tree (-1 means no limit)
404 |
405 | Returns:
406 | Dictionary containing the filtered tree or an error message
407 | """
408 | try:
409 | # Get the content from file data
410 | content = file_data.get('data', file_data)
411 |
412 | # Find which page contains the object
413 | page_id = find_page_containing_object(content, object_id)
414 |
415 | if not page_id:
416 | return {"error": f"Object {object_id} not found in file"}
417 |
418 | # Get the page data
419 | page_data = content.get('pagesIndex', {}).get(page_id, {})
420 | objects_dict = page_data.get('objects', {})
421 |
422 | # Check if the object exists in this page
423 | if object_id not in objects_dict:
424 | return {"error": f"Object {object_id} not found in page {page_id}"}
425 |
426 | # Track visited nodes to prevent infinite loops
427 | visited = set()
428 |
429 | # Function to recursively build the filtered object tree
430 | def build_filtered_object_tree(obj_id: str, current_depth: int = 0):
431 | if obj_id not in objects_dict:
432 | return None
433 |
434 | # Check for circular reference
435 | if obj_id in visited:
436 | # Return a placeholder to indicate circular reference
437 | return {
438 | 'id': obj_id,
439 | 'name': objects_dict[obj_id].get('name', 'Unnamed'),
440 | 'type': objects_dict[obj_id].get('type', 'unknown'),
441 | '_circular_reference': True
442 | }
443 |
444 | # Mark this object as visited
445 | visited.add(obj_id)
446 |
447 | obj_data = objects_dict[obj_id]
448 |
449 | # Create a new dict with only the requested fields or all fields if None
450 | if include_fields is None:
451 | filtered_obj = obj_data.copy()
452 | else:
453 | filtered_obj = {field: obj_data[field] for field in include_fields if field in obj_data}
454 |
455 | # Always include the id field
456 | filtered_obj['id'] = obj_id
457 |
458 | # If depth limit reached, don't process children
459 | if depth != -1 and current_depth >= depth:
460 | # Remove from visited before returning
461 | visited.remove(obj_id)
462 | return filtered_obj
463 |
464 | # Find all children of this object
465 | children = []
466 | for child_id, child_data in objects_dict.items():
467 | if child_data.get('parentId') == obj_id:
468 | child_tree = build_filtered_object_tree(child_id, current_depth + 1)
469 | if child_tree:
470 | children.append(child_tree)
471 |
472 | # Add children field only if we have children
473 | if children:
474 | filtered_obj['children'] = children
475 |
476 | # Remove from visited after processing
477 | visited.remove(obj_id)
478 |
479 | return filtered_obj
480 |
481 | # Build the filtered tree starting from the requested object
482 | object_tree = build_filtered_object_tree(object_id)
483 |
484 | if not object_tree:
485 | return {"error": f"Failed to build object tree for {object_id}"}
486 |
487 | return {
488 | "tree": object_tree,
489 | "page_id": page_id
490 | }
491 |
492 | except Exception as e:
493 | return {"error": str(e)}
494 |
```
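
A minimal usage sketch for this module; the file data below is a hand-rolled stand-in for a real Penpot payload, so the IDs, names, and structure are invented for illustration:

```python
# Sketch: build and inspect a tree from invented Penpot-style data.
from penpot_mcp.tools.penpot_tree import (
    build_tree,
    get_object_subtree_with_fields,
    print_tree,
)

ROOT_FRAME = "00000000-0000-0000-0000-000000000000"
fake_data = {
    'components': {},
    'pagesIndex': {
        'page-1': {
            'name': 'Home',
            'objects': {
                ROOT_FRAME: {'type': 'frame', 'name': 'Root Frame'},
                'btn-1': {'type': 'rect', 'name': 'Login Button',
                          'parentId': ROOT_FRAME},
            },
        },
    },
}

root = build_tree(fake_data)
print_tree(root, filter_pattern='button')  # case-insensitive regex filter

# Restrict the subtree to 'name' and 'type' fields ('id' is always included)
result = get_object_subtree_with_fields(
    {'data': fake_data}, 'btn-1', include_fields=['name', 'type'])
print(result['tree'], result['page_id'])
```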
--------------------------------------------------------------------------------
/penpot_mcp/server/mcp_server.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Main MCP server implementation for Penpot.
3 |
4 | This module defines the MCP server with resources and tools for interacting with
5 | the Penpot design platform.
6 | """
7 |
8 | import argparse
9 | import hashlib
10 | import json
11 | import os
12 | import re
13 | import sys
14 | from typing import Dict, List, Optional
15 |
16 | from mcp.server.fastmcp import FastMCP, Image
17 |
18 | from penpot_mcp.api.penpot_api import CloudFlareError, PenpotAPI, PenpotAPIError
19 | from penpot_mcp.tools.penpot_tree import get_object_subtree_with_fields
20 | from penpot_mcp.utils import config
21 | from penpot_mcp.utils.cache import MemoryCache
22 | from penpot_mcp.utils.http_server import ImageServer
23 |
24 |
25 | class PenpotMCPServer:
26 | """Penpot MCP Server implementation."""
27 |
28 | def __init__(self, name="Penpot MCP Server", test_mode=False):
29 | """
30 | Initialize the Penpot MCP Server.
31 |
32 | Args:
33 | name: Server name
34 | test_mode: If True, certain features like HTTP server will be disabled for testing
35 | """
36 | # Initialize the MCP server
37 | self.mcp = FastMCP(name, instructions="""
38 | I can help you generate code from your Penpot UI designs. My primary aim is to convert Penpot design components into functional code.
39 |
40 | The typical workflow for code generation from Penpot designs is:
41 |
42 | 1. List your projects using 'list_projects' to find the project containing your designs
43 | 2. List files within the project using 'get_project_files' to locate the specific design file
44 | 3. Search for the target component within the file using 'search_object' to find the component you want to convert
45 | 4. Retrieve the Penpot tree schema using 'penpot_tree_schema' to understand which fields are available in the object tree
46 | 5. Get a cropped version of the object tree with a screenshot using 'get_object_tree' to see the component structure and visual representation
47 | 6. Get the full screenshot of the object using 'get_rendered_component' for detailed visual reference
48 |
49 | For complex designs, you may need multiple iterations of 'get_object_tree' and 'get_rendered_component' due to LLM context limits.
50 |
51 | Use the resources to access schemas, cached files, and rendered objects (screenshots) as needed.
52 |
53 | Let me know which Penpot design you'd like to convert to code, and I'll guide you through the process!
54 | """)
55 |
56 | # Initialize the Penpot API
57 | self.api = PenpotAPI(
58 | base_url=config.PENPOT_API_URL,
59 | debug=config.DEBUG
60 | )
61 |
62 | # Initialize memory cache
63 | self.file_cache = MemoryCache(ttl_seconds=600) # 10 minutes
64 |
65 | # Storage for rendered component images
66 | self.rendered_components: Dict[str, Image] = {}
67 |
68 | # Initialize HTTP server for images if enabled and not in test mode
69 | self.image_server = None
70 | self.image_server_url = None
71 |
72 | # Detect if running in a test environment
73 | is_test_env = test_mode or 'pytest' in sys.modules
74 |
75 | if config.ENABLE_HTTP_SERVER and not is_test_env:
76 | try:
77 | self.image_server = ImageServer(
78 | host=config.HTTP_SERVER_HOST,
79 | port=config.HTTP_SERVER_PORT
80 | )
81 | # Start the server and get the URL with actual port assigned
82 | self.image_server_url = self.image_server.start()
83 | print(f"Image server started at {self.image_server_url}")
84 | except Exception as e:
85 | print(f"Warning: Failed to start image server: {str(e)}")
86 |
87 | # Register resources and tools
88 | if config.RESOURCES_AS_TOOLS:
89 | self._register_resources(resources_only=True)
90 | self._register_tools(include_resource_tools=True)
91 | else:
92 | self._register_resources(resources_only=False)
93 | self._register_tools(include_resource_tools=False)
94 |
95 | def _handle_api_error(self, e: Exception) -> dict:
96 | """Handle API errors and return user-friendly error messages."""
97 | if isinstance(e, CloudFlareError):
98 | return {
99 | "error": "CloudFlare Protection",
100 | "message": str(e),
101 | "error_type": "cloudflare_protection",
102 | "instructions": [
103 | "Open your web browser and navigate to https://design.penpot.app",
104 | "Log in to your Penpot account",
105 | "Complete any CloudFlare human verification challenges if prompted",
106 | "Once verified, try your request again"
107 | ]
108 | }
109 | elif isinstance(e, PenpotAPIError):
110 | return {
111 | "error": "Penpot API Error",
112 | "message": str(e),
113 | "error_type": "api_error",
114 | "status_code": getattr(e, 'status_code', None)
115 | }
116 | else:
117 | return {"error": str(e)}
118 |
119 | def _register_resources(self, resources_only=False):
120 | """Register all MCP resources. If resources_only is True, only register server://info as a resource."""
121 | @self.mcp.resource("server://info")
122 | def server_info() -> dict:
123 | """Provide information about the server."""
124 | info = {
125 | "status": "online",
126 | "name": "Penpot MCP Server",
127 |                 "description": "Model Context Protocol server for Penpot",
128 | "api_url": config.PENPOT_API_URL
129 | }
130 |
131 | if self.image_server and self.image_server.is_running:
132 | info["image_server"] = self.image_server_url
133 |
134 | return info
135 | if resources_only:
136 | return
137 | @self.mcp.resource("penpot://schema", mime_type="application/schema+json")
138 | def penpot_schema() -> dict:
139 | """Provide the Penpot API schema as JSON."""
140 | schema_path = os.path.join(config.RESOURCES_PATH, 'penpot-schema.json')
141 | try:
142 | with open(schema_path, 'r') as f:
143 | return json.load(f)
144 | except Exception as e:
145 | return {"error": f"Failed to load schema: {str(e)}"}
146 | @self.mcp.resource("penpot://tree-schema", mime_type="application/schema+json")
147 | def penpot_tree_schema() -> dict:
148 | """Provide the Penpot object tree schema as JSON."""
149 | schema_path = os.path.join(config.RESOURCES_PATH, 'penpot-tree-schema.json')
150 | try:
151 | with open(schema_path, 'r') as f:
152 | return json.load(f)
153 | except Exception as e:
154 | return {"error": f"Failed to load tree schema: {str(e)}"}
155 | @self.mcp.resource("rendered-component://{component_id}", mime_type="image/png")
156 | def get_rendered_component(component_id: str) -> Image:
157 | """Return a rendered component image by its ID."""
158 | if component_id in self.rendered_components:
159 | return self.rendered_components[component_id]
160 | raise Exception(f"Component with ID {component_id} not found")
161 | @self.mcp.resource("penpot://cached-files")
162 | def get_cached_files() -> dict:
163 | """List all files currently stored in the cache."""
164 | return self.file_cache.get_all_cached_files()
165 |
166 | def _register_tools(self, include_resource_tools=False):
167 | """Register all MCP tools. If include_resource_tools is True, also register resource logic as tools."""
168 | @self.mcp.tool()
169 | def list_projects() -> dict:
170 | """Retrieve a list of all available Penpot projects."""
171 | try:
172 | projects = self.api.list_projects()
173 | return {"projects": projects}
174 | except Exception as e:
175 | return self._handle_api_error(e)
176 | @self.mcp.tool()
177 | def get_project_files(project_id: str) -> dict:
178 | """Get all files contained within a specific Penpot project.
179 |
180 | Args:
181 | project_id: The ID of the Penpot project
182 | """
183 | try:
184 | files = self.api.get_project_files(project_id)
185 | return {"files": files}
186 | except Exception as e:
187 | return self._handle_api_error(e)
188 | def get_cached_file(file_id: str) -> dict:
189 | """Internal helper to retrieve a file, using cache if available.
190 |
191 | Args:
192 | file_id: The ID of the Penpot file
193 | """
194 | cached_data = self.file_cache.get(file_id)
195 | if cached_data is not None:
196 | return cached_data
197 | try:
198 | file_data = self.api.get_file(file_id=file_id)
199 | self.file_cache.set(file_id, file_data)
200 | return file_data
201 | except Exception as e:
202 | return self._handle_api_error(e)
203 | @self.mcp.tool()
204 | def get_file(file_id: str) -> dict:
205 | """Retrieve a Penpot file by its ID and cache it. Don't use this tool for code generation, use 'get_object_tree' instead.
206 |
207 | Args:
208 | file_id: The ID of the Penpot file
209 | """
210 | try:
211 | file_data = self.api.get_file(file_id=file_id)
212 | self.file_cache.set(file_id, file_data)
213 | return file_data
214 | except Exception as e:
215 | return self._handle_api_error(e)
216 | @self.mcp.tool()
217 | def export_object(
218 | file_id: str,
219 | page_id: str,
220 | object_id: str,
221 | export_type: str = "png",
222 | scale: int = 1) -> Image:
223 | """Export a Penpot design object as an image.
224 |
225 | Args:
226 | file_id: The ID of the Penpot file
227 | page_id: The ID of the page containing the object
228 | object_id: The ID of the object to export
229 | export_type: Image format (png, svg, etc.)
230 | scale: Scale factor for the exported image
231 | """
232 | temp_filename = None
233 | try:
234 | import tempfile
235 | temp_dir = tempfile.gettempdir()
236 | temp_filename = os.path.join(temp_dir, f"{object_id}.{export_type}")
237 | output_path = self.api.export_and_download(
238 | file_id=file_id,
239 | page_id=page_id,
240 | object_id=object_id,
241 | export_type=export_type,
242 | scale=scale,
243 | save_to_file=temp_filename
244 | )
245 | with open(output_path, "rb") as f:
246 | file_content = f.read()
247 |
248 | image = Image(data=file_content, format=export_type)
249 |
250 | # If HTTP server is enabled, add the image to the server
251 | if self.image_server and self.image_server.is_running:
252 | image_id = hashlib.md5(f"{file_id}:{page_id}:{object_id}".encode()).hexdigest()
253 | # Use the current image_server_url to ensure the correct port
254 | image_url = self.image_server.add_image(image_id, file_content, export_type)
255 | # Add HTTP URL to the image metadata
256 | image.http_url = image_url
257 |
258 | return image
259 | except Exception as e:
260 | if isinstance(e, CloudFlareError):
261 | raise Exception(f"CloudFlare Protection: {str(e)}")
262 | else:
263 | raise Exception(f"Export failed: {str(e)}")
264 | finally:
265 | if temp_filename and os.path.exists(temp_filename):
266 | try:
267 | os.remove(temp_filename)
268 | except Exception as e:
269 | print(f"Warning: Failed to delete temporary file {temp_filename}: {str(e)}")
270 | @self.mcp.tool()
271 | def get_object_tree(
272 | file_id: str,
273 | object_id: str,
274 | fields: List[str],
275 | depth: int = -1,
276 | format: str = "json"
277 | ) -> dict:
278 |             """Get the object tree structure for a Penpot object ("tree" field), together with a rendered screenshot of the object ("image.mcp_uri" field).
279 | Args:
280 | file_id: The ID of the Penpot file
281 | object_id: The ID of the object to retrieve
282 | fields: Specific fields to include in the tree (call "penpot_tree_schema" resource/tool for available fields)
283 | depth: How deep to traverse the object tree (-1 for full depth)
284 | format: Output format ('json' or 'yaml')
285 | """
286 | try:
287 | file_data = get_cached_file(file_id)
288 | if "error" in file_data:
289 | return file_data
290 | result = get_object_subtree_with_fields(
291 | file_data,
292 | object_id,
293 | include_fields=fields,
294 | depth=depth
295 | )
296 | if "error" in result:
297 | return result
298 | simplified_tree = result["tree"]
299 | page_id = result["page_id"]
300 | final_result = {"tree": simplified_tree}
301 |
302 | try:
303 | image = export_object(
304 | file_id=file_id,
305 | page_id=page_id,
306 | object_id=object_id
307 | )
308 | image_id = hashlib.md5(f"{file_id}:{object_id}".encode()).hexdigest()
309 | self.rendered_components[image_id] = image
310 |
311 | # Image URI preferences:
312 | # 1. HTTP server URL if available
313 | # 2. Fallback to MCP resource URI
314 |                 image_uri = f"rendered-component://{image_id}"
315 | if hasattr(image, 'http_url'):
316 | final_result["image"] = {
317 | "uri": image.http_url,
318 | "mcp_uri": image_uri,
319 | "format": image.format if hasattr(image, 'format') else "png"
320 | }
321 | else:
322 | final_result["image"] = {
323 | "uri": image_uri,
324 | "format": image.format if hasattr(image, 'format') else "png"
325 | }
326 | except Exception as e:
327 | final_result["image_error"] = str(e)
328 | if format.lower() == "yaml":
329 | try:
330 | import yaml
331 | yaml_result = yaml.dump(final_result, default_flow_style=False, sort_keys=False)
332 | return {"yaml_result": yaml_result}
333 | except ImportError:
334 | return {"format_error": "YAML format requested but PyYAML package is not installed"}
335 | except Exception as e:
336 | return {"format_error": f"Error formatting as YAML: {str(e)}"}
337 | return final_result
338 | except Exception as e:
339 | return self._handle_api_error(e)
340 | @self.mcp.tool()
341 | def search_object(file_id: str, query: str) -> dict:
342 | """Search for objects within a Penpot file by name.
343 |
344 | Args:
345 | file_id: The ID of the Penpot file to search in
346 | query: Search string (supports regex patterns)
347 | """
348 | try:
349 | file_data = get_cached_file(file_id)
350 | if "error" in file_data:
351 | return file_data
352 | pattern = re.compile(query, re.IGNORECASE)
353 | matches = []
354 | data = file_data.get('data', {})
355 | for page_id, page_data in data.get('pagesIndex', {}).items():
356 | page_name = page_data.get('name', 'Unnamed')
357 | for obj_id, obj_data in page_data.get('objects', {}).items():
358 | obj_name = obj_data.get('name', '')
359 | if pattern.search(obj_name):
360 | matches.append({
361 | 'id': obj_id,
362 | 'name': obj_name,
363 | 'page_id': page_id,
364 | 'page_name': page_name,
365 | 'object_type': obj_data.get('type', 'unknown')
366 | })
367 | return {'objects': matches}
368 | except Exception as e:
369 | return self._handle_api_error(e)
370 | if include_resource_tools:
371 | @self.mcp.tool()
372 | def penpot_schema() -> dict:
373 | """Provide the Penpot API schema as JSON."""
374 | schema_path = os.path.join(config.RESOURCES_PATH, 'penpot-schema.json')
375 | try:
376 | with open(schema_path, 'r') as f:
377 | return json.load(f)
378 | except Exception as e:
379 | return {"error": f"Failed to load schema: {str(e)}"}
380 | @self.mcp.tool()
381 | def penpot_tree_schema() -> dict:
382 | """Provide the Penpot object tree schema as JSON."""
383 | schema_path = os.path.join(config.RESOURCES_PATH, 'penpot-tree-schema.json')
384 | try:
385 | with open(schema_path, 'r') as f:
386 | return json.load(f)
387 | except Exception as e:
388 | return {"error": f"Failed to load tree schema: {str(e)}"}
389 | @self.mcp.tool()
390 | def get_rendered_component(component_id: str) -> Image:
391 | """Return a rendered component image by its ID."""
392 | if component_id in self.rendered_components:
393 | return self.rendered_components[component_id]
394 | raise Exception(f"Component with ID {component_id} not found")
395 | @self.mcp.tool()
396 | def get_cached_files() -> dict:
397 | """List all files currently stored in the cache."""
398 | return self.file_cache.get_all_cached_files()
399 |
400 | def run(self, port=None, debug=None, mode=None):
401 | """
402 | Run the MCP server.
403 |
404 | Args:
405 | port: Port to run on (overrides config) - only used in 'sse' mode
406 | debug: Debug mode (overrides config)
407 | mode: MCP mode ('stdio' or 'sse', overrides config)
408 | """
409 | # Use provided values or fall back to config
410 | debug = debug if debug is not None else config.DEBUG
411 |
412 | # Get mode from parameter, environment variable, or default to stdio
413 | mode = mode or os.environ.get('MODE', 'stdio')
414 |
415 | # Validate mode
416 | if mode not in ['stdio', 'sse']:
417 | print(f"Invalid mode: {mode}. Using stdio mode.")
418 | mode = 'stdio'
419 |
420 | if mode == 'sse':
421 | print(f"Starting Penpot MCP Server on port {port} (debug={debug}, mode={mode})")
422 | else:
423 | print(f"Starting Penpot MCP Server (debug={debug}, mode={mode})")
424 |
425 | # Start HTTP server if enabled and not already running
426 | if config.ENABLE_HTTP_SERVER and self.image_server and not self.image_server.is_running:
427 | try:
428 | self.image_server_url = self.image_server.start()
429 | except Exception as e:
430 | print(f"Warning: Failed to start image server: {str(e)}")
431 |
432 | self.mcp.run(mode)
433 |
434 |
435 | def create_server():
436 | """Create and configure a new server instance."""
437 | # Detect if running in a test environment
438 | is_test_env = 'pytest' in sys.modules
439 | return PenpotMCPServer(test_mode=is_test_env)
440 |
441 |
442 | # Create a global server instance with a standard name for the MCP tool
443 | server = create_server()
444 |
445 |
446 | def main():
447 | """Entry point for the console script."""
448 | parser = argparse.ArgumentParser(description='Run the Penpot MCP Server')
449 | parser.add_argument('--port', type=int, help='Port to run on')
450 | parser.add_argument('--debug', action='store_true', help='Enable debug mode')
451 | parser.add_argument('--mode', choices=['stdio', 'sse'], default=os.environ.get('MODE', 'stdio'),
452 | help='MCP mode (stdio or sse)')
453 |
454 | args = parser.parse_args()
455 | server.run(port=args.port, debug=args.debug, mode=args.mode)
456 |
457 |
458 | if __name__ == "__main__":
459 | main()
460 |
```
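
A minimal sketch of starting the server programmatically rather than through the `main()` console entry point; it assumes the usual environment configuration (e.g. `PENPOT_API_URL`, `PENPOT_USERNAME`, `PENPOT_PASSWORD`) consumed by `penpot_mcp.utils.config` and `penpot_api.py` is already in place:

```python
# Sketch: run the Penpot MCP server in stdio mode (the default transport).
# Credentials and the API URL are read from the environment / config module.
from penpot_mcp.server.mcp_server import PenpotMCPServer

server = PenpotMCPServer(name="Penpot MCP (dev)")
server.run(mode="stdio")  # or mode="sse" to serve over HTTP
```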
--------------------------------------------------------------------------------
/penpot_mcp/api/penpot_api.py:
--------------------------------------------------------------------------------
```python
1 | import argparse
2 | import json
3 | import os
4 | from typing import Any, Dict, List, Optional, Union
5 |
6 | import requests
7 | from dotenv import load_dotenv
8 |
9 |
10 | class CloudFlareError(Exception):
11 | """Exception raised when CloudFlare protection blocks the request."""
12 |
13 | def __init__(self, message: str, status_code: int = None, response_text: str = None):
14 | super().__init__(message)
15 | self.status_code = status_code
16 | self.response_text = response_text
17 |
18 | def __str__(self):
19 | return f"CloudFlare Protection Error: {super().__str__()}"
20 |
21 |
22 | class PenpotAPIError(Exception):
23 | """General exception for Penpot API errors."""
24 |
25 | def __init__(self, message: str, status_code: int = None, response_text: str = None, is_cloudflare: bool = False):
26 | super().__init__(message)
27 | self.status_code = status_code
28 | self.response_text = response_text
29 | self.is_cloudflare = is_cloudflare
30 |
31 |
32 | class PenpotAPI:
33 | def __init__(
34 | self,
35 | base_url: str = None,
36 | debug: bool = False,
37 | email: Optional[str] = None,
38 | password: Optional[str] = None):
39 | # Load environment variables if not already loaded
40 | load_dotenv()
41 |
42 | # Use base_url from parameters if provided, otherwise from environment,
43 | # fallback to default URL
44 | self.base_url = base_url or os.getenv("PENPOT_API_URL", "https://design.penpot.app/api")
45 | self.session = requests.Session()
46 | self.access_token = None
47 | self.debug = debug
48 | self.email = email or os.getenv("PENPOT_USERNAME")
49 | self.password = password or os.getenv("PENPOT_PASSWORD")
50 | self.profile_id = None
51 |
52 | # Set default headers - we'll use different headers at request time
53 | # based on the required content type (JSON vs Transit+JSON)
54 | self.session.headers.update({
55 | "Accept": "application/json, application/transit+json",
56 | "Content-Type": "application/json",
57 | "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
58 | })
59 |
60 | def _is_cloudflare_error(self, response: requests.Response) -> bool:
61 | """Check if the response indicates a CloudFlare error."""
62 | # Check for CloudFlare-specific indicators
63 | cloudflare_indicators = [
64 | 'cloudflare',
65 | 'cf-ray',
66 | 'attention required',
67 | 'checking your browser',
68 | 'challenge',
69 | 'ddos protection',
70 | 'security check',
71 | 'cf-browser-verification',
72 | 'cf-challenge-running',
73 | 'please wait while we are checking your browser',
74 | 'enable cookies and reload the page',
75 | 'this process is automatic'
76 | ]
77 |
78 | # Check response headers for CloudFlare
79 | server_header = response.headers.get('server', '').lower()
80 | cf_ray = response.headers.get('cf-ray')
81 |
82 | if 'cloudflare' in server_header or cf_ray:
83 | return True
84 |
85 | # Check response content for CloudFlare indicators
86 | try:
87 | response_text = response.text.lower()
88 | for indicator in cloudflare_indicators:
89 | if indicator in response_text:
90 | return True
91 |         except Exception:
92 | # If we can't read the response text, don't assume it's CloudFlare
93 | pass
94 |
95 | # Check for specific status codes that might indicate CloudFlare blocks
96 | if response.status_code in [403, 429, 503]:
97 | # Additional check for CloudFlare-specific error pages
98 | try:
99 | response_text = response.text.lower()
100 | if any(['cloudflare' in response_text, 'cf-ray' in response_text, 'attention required' in response_text]):
101 | return True
102 |             except Exception:
103 | pass
104 |
105 | return False
106 |
107 | def _create_cloudflare_error_message(self, response: requests.Response) -> str:
108 | """Create a user-friendly CloudFlare error message."""
109 | base_message = (
110 | "CloudFlare protection has blocked this request. This is common on penpot.app. "
111 |             "To resolve this issue:\n\n"
112 |             "1. Open your web browser and navigate to https://design.penpot.app\n"
113 |             "2. Log in to your Penpot account\n"
114 |             "3. Complete any CloudFlare human verification challenges if prompted\n"
115 |             "4. Once verified, try your request again\n\n"
116 | "The verification typically lasts for a period of time, after which you may need to repeat the process."
117 | )
118 |
119 | if response.status_code:
120 |             return f"{base_message}\n\nHTTP Status: {response.status_code}"
121 |
122 | return base_message
123 |
124 | def set_access_token(self, token: str):
125 | """Set the auth token for authentication."""
126 | self.access_token = token
127 | # For cookie-based auth, set the auth-token cookie
128 | self.session.cookies.set("auth-token", token)
129 | # Also set Authorization header for APIs that use it
130 | self.session.headers.update({
131 | "Authorization": f"Token {token}"
132 | })
133 |
134 | def login_with_password(
135 | self,
136 | email: Optional[str] = None,
137 | password: Optional[str] = None) -> str:
138 | """
139 | Login with email and password to get an auth token.
140 |
141 | This method uses the same cookie-based auth approach as the export methods.
142 |
143 | Args:
144 | email: Email for Penpot account (if None, will use stored email or PENPOT_USERNAME env var)
145 | password: Password for Penpot account (if None, will use stored password or PENPOT_PASSWORD env var)
146 |
147 | Returns:
148 | Auth token for API calls
149 | """
150 | # Use the export authentication which also extracts profile ID
151 | token = self.login_for_export(email, password)
152 | self.set_access_token(token)
153 | # Profile ID is now extracted during login_for_export, no need to call get_profile
154 | if self.debug and self.profile_id:
155 | print(f"\nProfile ID available: {self.profile_id}")
156 | return token
157 |
158 | def get_profile(self) -> Dict[str, Any]:
159 | """
160 | Get profile information for the current authenticated user.
161 |
162 | Returns:
163 | Dictionary containing profile information, including the profile ID
164 | """
165 | url = f"{self.base_url}/rpc/command/get-profile"
166 |
167 | payload = {} # No parameters needed
168 |
169 | response = self._make_authenticated_request('post', url, json=payload, use_transit=False)
170 |
171 | # Parse and normalize the response
172 | data = response.json()
173 | normalized_data = self._normalize_transit_response(data)
174 |
175 | if self.debug:
176 | print("\nProfile data retrieved:")
177 | print(json.dumps(normalized_data, indent=2)[:200] + "...")
178 |
179 | # Store profile ID for later use
180 | if 'id' in normalized_data:
181 | self.profile_id = normalized_data['id']
182 | if self.debug:
183 | print(f"\nStored profile ID: {self.profile_id}")
184 |
185 | return normalized_data
186 |
187 | def login_for_export(self, email: Optional[str] = None, password: Optional[str] = None) -> str:
188 | """
189 | Login with email and password to get an auth token for export operations.
190 |
191 | This is required for export operations which use a different authentication
192 | mechanism than the standard API access token.
193 |
194 | Args:
195 | email: Email for Penpot account (if None, will use stored email or PENPOT_USERNAME env var)
196 | password: Password for Penpot account (if None, will use stored password or PENPOT_PASSWORD env var)
197 |
198 | Returns:
199 | Auth token extracted from cookies
200 | """
201 | # Use parameters if provided, else use instance variables, else check environment variables
202 | email = email or self.email or os.getenv("PENPOT_USERNAME")
203 | password = password or self.password or os.getenv("PENPOT_PASSWORD")
204 |
205 | if not email or not password:
206 | raise ValueError(
207 | "Email and password are required for export authentication. "
208 | "Please provide them as parameters or set PENPOT_USERNAME and "
209 | "PENPOT_PASSWORD environment variables."
210 | )
211 |
212 | url = f"{self.base_url}/rpc/command/login-with-password"
213 |
214 | # Use Transit+JSON format
215 | payload = {
216 | "~:email": email,
217 | "~:password": password
218 | }
219 |
220 | if self.debug:
221 | print("\nLogin request payload (Transit+JSON format):")
222 | print(json.dumps(payload, indent=2).replace(password, "********"))
223 |
224 | # Create a new session just for this request
225 | login_session = requests.Session()
226 |
227 | # Set headers
228 | headers = {
229 | "Content-Type": "application/transit+json",
230 | "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
231 | }
232 |
233 | response = login_session.post(url, json=payload, headers=headers)
234 | if self.debug and response.status_code != 200:
235 | print(f"\nError response: {response.status_code}")
236 | print(f"Response text: {response.text}")
237 | response.raise_for_status()
238 |
239 | # Extract profile ID from response
240 | try:
241 | # The response is in Transit+JSON array format
242 | data = response.json()
243 | if isinstance(data, list):
244 | # Convert Transit array to dict
245 | transit_dict = {}
246 | i = 1 # Skip the "^ " marker
247 | while i < len(data) - 1:
248 | key = data[i]
249 | value = data[i + 1]
250 | transit_dict[key] = value
251 | i += 2
252 |
253 | # Extract profile ID
254 | if "~:id" in transit_dict:
255 | profile_id = transit_dict["~:id"]
256 | # Remove the ~u prefix for UUID
257 | if isinstance(profile_id, str) and profile_id.startswith("~u"):
258 | profile_id = profile_id[2:]
259 | self.profile_id = profile_id
260 | if self.debug:
261 | print(f"\nExtracted profile ID from login response: {profile_id}")
262 | except Exception as e:
263 | if self.debug:
264 | print(f"\nCouldn't extract profile ID from response: {e}")
265 |
266 | # Also try to extract profile ID from auth-data cookie
267 | if not self.profile_id:
268 | for cookie in login_session.cookies:
269 | if cookie.name == "auth-data":
270 | # Cookie value is like: "profile-id=7ae66c33-6ede-81e2-8006-6a1b4dce3d2b"
271 | if "profile-id=" in cookie.value:
272 | profile_id = cookie.value.split("profile-id=")[1].split(";")[0].strip('"')
273 | self.profile_id = profile_id
274 | if self.debug:
275 | print(f"\nExtracted profile ID from auth-data cookie: {profile_id}")
276 | break
277 |
278 | # Extract auth token from cookies
279 | if 'Set-Cookie' in response.headers:
280 | if self.debug:
281 | print("\nSet-Cookie header found")
282 |
283 | for cookie in login_session.cookies:
284 | if cookie.name == "auth-token":
285 | if self.debug:
286 | print(f"\nAuth token extracted from cookies: {cookie.value[:10]}...")
287 | return cookie.value
288 |
289 | raise ValueError("Auth token not found in response cookies")
290 | else:
291 | # Try to extract from response JSON if available
292 | try:
293 | data = response.json()
294 | if 'auth-token' in data:
295 | return data['auth-token']
296 | except Exception:
297 | pass
298 |
299 | # If we reached here, we couldn't find the token
300 | raise ValueError("Auth token not found in response cookies or JSON body")
301 |
302 | def _make_authenticated_request(self, method: str, url: str, retry_auth: bool = True, **kwargs) -> requests.Response:
303 | """
304 | Make an authenticated request, handling re-auth if needed.
305 |
306 | This internal method handles lazy authentication when a request
307 | fails due to authentication issues, using the same cookie-based
308 | approach as the export methods.
309 |
310 | Args:
311 | method: HTTP method (post, get, etc.)
312 | url: URL to make the request to
313 |             retry_auth: If True, retry once after re-login on 401/403 (**kwargs are passed through to requests)
314 |
315 | Returns:
316 | The response object
317 | """
318 | # If we don't have a token yet but have credentials, login first
319 | if not self.access_token and self.email and self.password:
320 | if self.debug:
321 | print("\nNo access token set, logging in with credentials...")
322 | self.login_with_password()
323 |
324 | # Set up headers
325 | headers = kwargs.get('headers', {})
326 | if 'headers' in kwargs:
327 | del kwargs['headers']
328 |
329 | # Use Transit+JSON format for API calls (required by Penpot)
330 | use_transit = kwargs.pop('use_transit', True)
331 |
332 | if use_transit:
333 | headers['Content-Type'] = 'application/transit+json'
334 | headers['Accept'] = 'application/transit+json'
335 |
336 | # Convert payload to Transit+JSON format if present
337 | if 'json' in kwargs and kwargs['json']:
338 | payload = kwargs['json']
339 |
340 | # Only transform if not already in Transit format
341 | if not any(isinstance(k, str) and k.startswith('~:') for k in payload.keys()):
342 | transit_payload = {}
343 |
344 | # Add cmd if not present
345 | if 'cmd' not in payload and '~:cmd' not in payload:
346 | # Extract command from URL
347 | cmd = url.split('/')[-1]
348 | transit_payload['~:cmd'] = f"~:{cmd}"
349 |
350 | # Convert standard JSON to Transit+JSON format
351 | for key, value in payload.items():
352 | # Skip command if already added
353 | if key == 'cmd':
354 | continue
355 |
356 | transit_key = f"~:{key}" if not key.startswith('~:') else key
357 |
358 | # Handle special UUID conversion for IDs
359 | if isinstance(value, str) and ('-' in value) and len(value) > 30:
360 | transit_value = f"~u{value}"
361 | else:
362 | transit_value = value
363 |
364 | transit_payload[transit_key] = transit_value
365 |
366 | if self.debug:
367 | print("\nConverted payload to Transit+JSON format:")
368 | print(f"Original: {payload}")
369 | print(f"Transit: {transit_payload}")
370 |
371 | kwargs['json'] = transit_payload
372 | else:
373 | headers['Content-Type'] = 'application/json'
374 | headers['Accept'] = 'application/json'
375 |
376 | # Ensure the Authorization header is set if we have a token
377 | if self.access_token:
378 | headers['Authorization'] = f"Token {self.access_token}"
379 |
380 | # Combine with session headers
381 | combined_headers = {**self.session.headers, **headers}
382 |
383 | # Make the request
384 | try:
385 | response = getattr(self.session, method)(url, headers=combined_headers, **kwargs)
386 |
387 | if self.debug:
388 | print(f"\nRequest to: {url}")
389 | print(f"Method: {method}")
390 | print(f"Headers: {combined_headers}")
391 | if 'json' in kwargs:
392 | print(f"Payload: {json.dumps(kwargs['json'], indent=2)}")
393 | print(f"Response status: {response.status_code}")
394 |
395 | response.raise_for_status()
396 | return response
397 |
398 | except requests.HTTPError as e:
399 | # Check for CloudFlare errors first
400 | if self._is_cloudflare_error(e.response):
401 | cloudflare_message = self._create_cloudflare_error_message(e.response)
402 | raise CloudFlareError(cloudflare_message, e.response.status_code, e.response.text)
403 |
404 | # Handle authentication errors
405 | if e.response.status_code in (401, 403) and self.email and self.password and retry_auth:
406 | # Special case: don't retry auth for get-profile to avoid infinite loops
407 | if url.endswith('/get-profile'):
408 | raise
409 |
410 | if self.debug:
411 | print("\nAuthentication failed. Trying to re-login...")
412 |
413 | # Re-login and update token
414 | self.login_with_password()
415 |
416 | # Update headers with new token
417 | headers['Authorization'] = f"Token {self.access_token}"
418 | combined_headers = {**self.session.headers, **headers}
419 |
420 | # Retry the request with the new token (but don't retry auth again)
421 | response = getattr(self.session, method)(url, headers=combined_headers, **kwargs)
422 | response.raise_for_status()
423 | return response
424 | else:
425 | # Re-raise other errors
426 | raise
427 | except requests.RequestException as e:
428 | # Handle other request exceptions (connection errors, timeouts, etc.)
429 | # Check if we have a response to analyze
430 | if hasattr(e, 'response') and e.response is not None:
431 | if self._is_cloudflare_error(e.response):
432 | cloudflare_message = self._create_cloudflare_error_message(e.response)
433 | raise CloudFlareError(cloudflare_message, e.response.status_code, e.response.text)
434 | # Re-raise if not a CloudFlare error
435 | raise
436 |
437 | def _normalize_transit_response(self, data: Union[Dict, List, Any]) -> Union[Dict, List, Any]:
438 | """
439 | Normalize a Transit+JSON response to a more usable format.
440 |
441 | This recursively processes the response data, handling special Transit types
442 | like UUIDs, keywords, and nested structures.
443 |
444 | Args:
445 | data: The data to normalize, can be a dict, list, or other value
446 |
447 | Returns:
448 | Normalized data
449 | """
450 | if isinstance(data, dict):
451 | # Normalize dictionary
452 | result = {}
453 | for key, value in data.items():
454 | # Convert transit keywords in keys (~:key -> key)
455 |                 norm_key = (key.replace('~:', '')
456 |                             if isinstance(key, str) and key.startswith('~:')
457 |                             else key)
458 | # Recursively normalize values
459 | result[norm_key] = self._normalize_transit_response(value)
460 | return result
461 | elif isinstance(data, list):
462 | # Normalize list items
463 | return [self._normalize_transit_response(item) for item in data]
464 | elif isinstance(data, str) and data.startswith('~u'):
465 | # Convert Transit UUIDs (~u123-456 -> 123-456)
466 | return data[2:]
467 | else:
468 | # Return other types as-is
469 | return data
470 |
471 |     def list_projects(self) -> List[Dict[str, Any]]:
472 | """
473 | List all available projects for the authenticated user.
474 |
475 | Returns:
476 |             List of project dictionaries for the authenticated user
477 | """
478 | url = f"{self.base_url}/rpc/command/get-all-projects"
479 |
480 | payload = {} # No parameters required
481 |
482 | response = self._make_authenticated_request('post', url, json=payload, use_transit=False)
483 |
484 | if self.debug:
485 | content_type = response.headers.get('Content-Type', '')
486 | print(f"\nResponse content type: {content_type}")
487 | print(f"Response preview: {response.text[:100]}...")
488 |
489 | # Parse JSON
490 | data = response.json()
491 |
492 | if self.debug:
493 | print("\nData preview:")
494 | print(json.dumps(data, indent=2)[:200] + "...")
495 |
496 | return data
497 |
498 | def get_project(self, project_id: str) -> Optional[Dict[str, Any]]:
499 | """
500 | Get details for a specific project.
501 |
502 | Args:
503 | project_id: The ID of the project to retrieve
504 |
505 | Returns:
506 | Dictionary containing project information
507 | """
508 | # First get all projects
509 | projects = self.list_projects()
510 |
511 | # Find the specific project by ID
512 | for project in projects:
513 | if project.get('id') == project_id:
514 | return project
515 |
516 | return None
517 |
518 | def get_project_files(self, project_id: str) -> List[Dict[str, Any]]:
519 | """
520 | Get all files for a specific project.
521 |
522 | Args:
523 | project_id: The ID of the project
524 |
525 | Returns:
526 | List of file information dictionaries
527 | """
528 | url = f"{self.base_url}/rpc/command/get-project-files"
529 |
530 | payload = {
531 | "project-id": project_id
532 | }
533 |
534 | response = self._make_authenticated_request('post', url, json=payload, use_transit=False)
535 |
536 | # Parse JSON
537 | files = response.json()
538 | return files
539 |
540 | def get_file(self, file_id: str, save_data: bool = False,
541 | save_raw_response: bool = False) -> Dict[str, Any]:
542 | """
543 | Get details for a specific file.
544 |
545 | Args:
546 | file_id: The ID of the file to retrieve
547 |             save_data: Whether to save the parsed file data to
548 |                 "<file_id>.json" in the working directory
549 |             save_raw_response: Whether to save the raw response text to
550 |                 "<file_id>_raw_response.json"
551 |
552 | Returns:
553 | Dictionary containing file information
554 | """
555 | url = f"{self.base_url}/rpc/command/get-file"
556 |
557 | payload = {
558 | "id": file_id,
559 | }
560 |
561 | response = self._make_authenticated_request('post', url, json=payload, use_transit=False)
562 |
563 | # Save raw response if requested
564 | if save_raw_response:
565 | raw_filename = f"{file_id}_raw_response.json"
566 | with open(raw_filename, 'w') as f:
567 | f.write(response.text)
568 | if self.debug:
569 | print(f"\nSaved raw response to {raw_filename}")
570 |
571 | # Parse JSON
572 | data = response.json()
573 |
574 | # Save normalized data if requested
575 | if save_data:
576 | filename = f"{file_id}.json"
577 | with open(filename, 'w') as f:
578 | json.dump(data, f, indent=2)
579 | if self.debug:
580 | print(f"\nSaved file data to {filename}")
581 |
582 | return data
583 |
584 |     def create_export(self, file_id: str, page_id: str, object_id: str,
585 |                       export_type: str = "png", scale: int = 1,
586 |                       name: str = "", suffix: str = "", email: Optional[str] = None,
587 |                       password: Optional[str] = None, profile_id: Optional[str] = None):
588 | """
589 | Create an export job for a Penpot object.
590 |
591 | Args:
592 | file_id: The file ID
593 | page_id: The page ID
594 | object_id: The object ID to export
595 | export_type: Type of export (png, svg, pdf)
596 | scale: Scale factor for the export
597 | name: Name for the export
598 | suffix: Suffix to add to the export name
599 | email: Email for authentication (if different from instance)
600 | password: Password for authentication (if different from instance)
601 | profile_id: Optional profile ID (if not provided, will be fetched automatically)
602 |
603 | Returns:
604 | Export resource ID
605 | """
606 | # This uses the cookie auth approach, which requires login
607 | token = self.login_for_export(email, password)
608 |
609 | # If profile_id is not provided, get it from instance variable
610 | if not profile_id:
611 | profile_id = self.profile_id
612 |
613 | if not profile_id:
614 | raise ValueError("Profile ID not available. It should be automatically extracted during login.")
615 |
616 | # Build the URL for export creation
617 | url = f"{self.base_url}/export"
618 |
619 | # Set up the data for the export
620 | payload = {
621 | "~:wait": True,
622 | "~:exports": [
623 | {"~:type": f"~:{export_type}",
624 |              "~:suffix": suffix,
625 | "~:scale": scale,
626 | "~:page-id": f"~u{page_id}",
627 | "~:file-id": f"~u{file_id}",
628 |              "~:name": name,
629 | "~:object-id": f"~u{object_id}"}
630 | ],
631 | "~:profile-id": f"~u{profile_id}",
632 | "~:cmd": "~:export-shapes"
633 | }
634 |
635 | if self.debug:
636 | print("\nCreating export with parameters:")
637 | print(json.dumps(payload, indent=2))
638 |
639 | # Create a session with the auth token
640 | export_session = requests.Session()
641 | export_session.cookies.set("auth-token", token)
642 |
643 | headers = {
644 | "Content-Type": "application/transit+json",
645 | "Accept": "application/transit+json",
646 | "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
647 | }
648 |
649 | # Make the request
650 | response = export_session.post(url, json=payload, headers=headers)
651 |
652 | if self.debug and response.status_code != 200:
653 | print(f"\nError response: {response.status_code}")
654 | print(f"Response text: {response.text}")
655 |
656 | response.raise_for_status()
657 |
658 | # Parse the response
659 | data = response.json()
660 |
661 | if self.debug:
662 | print("\nExport created successfully")
663 | print(f"Response: {json.dumps(data, indent=2)}")
664 |
665 | # Extract and return the resource ID
666 | resource_id = data.get("~:id")
667 | if not resource_id:
668 | raise ValueError("Resource ID not found in response")
669 |
670 | return resource_id
671 |
672 | def get_export_resource(self,
673 | resource_id: str,
674 | save_to_file: Optional[str] = None,
675 | email: Optional[str] = None,
676 | password: Optional[str] = None) -> Union[bytes,
677 | str]:
678 | """
679 | Download an export resource by ID.
680 |
681 | Args:
682 | resource_id: The resource ID from create_export
683 | save_to_file: Path to save the file (if None, returns the content)
684 | email: Email for authentication (if different from instance)
685 | password: Password for authentication (if different from instance)
686 |
687 | Returns:
688 | Either the file content as bytes, or the path to the saved file
689 | """
690 | # This uses the cookie auth approach, which requires login
691 | token = self.login_for_export(email, password)
692 |
693 | # Build the URL for the resource
694 | url = f"{self.base_url}/export"
695 |
696 | payload = {
697 | "~:wait": False,
698 | "~:cmd": "~:get-resource",
699 | "~:id": resource_id
700 | }
701 | headers = {
702 | "Content-Type": "application/transit+json",
703 | "Accept": "*/*",
704 | "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
705 | }
706 | if self.debug:
707 | print(f"\nFetching export resource: {url}")
708 |
709 | # Create a session with the auth token
710 | export_session = requests.Session()
711 | export_session.cookies.set("auth-token", token)
712 |
713 | # Make the request
714 | response = export_session.post(url, json=payload, headers=headers)
715 |
716 | if self.debug and response.status_code != 200:
717 | print(f"\nError response: {response.status_code}")
718 | print(f"Response headers: {response.headers}")
719 |
720 | response.raise_for_status()
721 |
722 | # Get the content type
723 | content_type = response.headers.get('Content-Type', '')
724 |
725 | if self.debug:
726 |             print("\nResource fetched successfully")
727 | print(f"Content-Type: {content_type}")
728 | print(f"Content length: {len(response.content)} bytes")
729 |
730 | # Determine filename if saving to file
731 | if save_to_file:
732 | if os.path.isdir(save_to_file):
733 | # If save_to_file is a directory, we need to figure out the filename
734 | filename = None
735 |
736 | # Try to get filename from Content-Disposition header
737 | content_disp = response.headers.get('Content-Disposition', '')
738 | if 'filename=' in content_disp:
739 | filename = content_disp.split('filename=')[1].strip('"\'')
740 |
741 | # If we couldn't get a filename, use the resource_id with an extension
742 | if not filename:
743 | ext = content_type.split('/')[-1].split(';')[0]
744 | if ext in ('jpeg', 'png', 'pdf', 'svg+xml'):
745 | if ext == 'svg+xml':
746 | ext = 'svg'
747 | filename = f"{resource_id}.{ext}"
748 | else:
749 | filename = f"{resource_id}"
750 |
751 | save_path = os.path.join(save_to_file, filename)
752 | else:
753 | # Use the provided path directly
754 | save_path = save_to_file
755 |
756 | # Ensure the directory exists
757 | os.makedirs(os.path.dirname(os.path.abspath(save_path)), exist_ok=True)
758 |
759 | # Save the content to file
760 | with open(save_path, 'wb') as f:
761 | f.write(response.content)
762 |
763 | if self.debug:
764 | print(f"\nSaved resource to {save_path}")
765 |
766 | return save_path
767 | else:
768 | # Return the content
769 | return response.content
770 |
771 | def export_and_download(self, file_id: str, page_id: str, object_id: str,
772 | save_to_file: Optional[str] = None, export_type: str = "png",
773 | scale: int = 1, name: str = "Board", suffix: str = "",
774 | email: Optional[str] = None, password: Optional[str] = None,
775 | profile_id: Optional[str] = None) -> Union[bytes, str]:
776 | """
777 | Create and download an export in one step.
778 |
779 | This is a convenience method that combines create_export and get_export_resource.
780 |
781 | Args:
782 | file_id: The file ID
783 | page_id: The page ID
784 | object_id: The object ID to export
785 | save_to_file: Path to save the file (if None, returns the content)
786 | export_type: Type of export (png, svg, pdf)
787 | scale: Scale factor for the export
788 | name: Name for the export
789 | suffix: Suffix to add to the export name
790 | email: Email for authentication (if different from instance)
791 | password: Password for authentication (if different from instance)
792 | profile_id: Optional profile ID (if not provided, will be fetched automatically)
793 |
794 | Returns:
795 | Either the file content as bytes, or the path to the saved file
796 | """
797 | # Create the export
798 | resource_id = self.create_export(
799 | file_id=file_id,
800 | page_id=page_id,
801 | object_id=object_id,
802 | export_type=export_type,
803 | scale=scale,
804 |             name=name, suffix=suffix,
805 |             email=email, password=password,
806 | profile_id=profile_id
807 | )
808 |
809 | # Download the resource
810 | return self.get_export_resource(
811 | resource_id=resource_id,
812 | save_to_file=save_to_file,
813 | email=email,
814 | password=password
815 | )
816 |
817 | def extract_components(self, file_data: Dict[str, Any]) -> Dict[str, Any]:
818 | """
819 | Extract components from file data.
820 |
821 | This processes a file's data to extract and normalize component information.
822 |
823 | Args:
824 | file_data: The file data from get_file
825 |
826 | Returns:
827 | Dictionary containing components information
828 | """
829 | components = {}
830 | components_index = file_data.get('data', {}).get('componentsIndex', {})
831 |
832 | for component_id, component_data in components_index.items():
833 | # Extract basic component info
834 | component = {
835 | 'id': component_id,
836 | 'name': component_data.get('name', 'Unnamed'),
837 | 'path': component_data.get('path', []),
838 | 'shape': component_data.get('shape', ''),
839 | 'fileId': component_data.get('fileId', file_data.get('id')),
840 | 'created': component_data.get('created'),
841 | 'modified': component_data.get('modified')
842 | }
843 |
844 | # Add the component to our collection
845 | components[component_id] = component
846 |
847 | return {'components': components}
848 |
849 | def analyze_file_structure(self, file_data: Dict[str, Any]) -> Dict[str, Any]:
850 | """
851 | Analyze file structure and return summary information.
852 |
853 | Args:
854 | file_data: The file data from get_file
855 |
856 | Returns:
857 | Dictionary containing analysis information
858 | """
859 | data = file_data.get('data', {})
860 |
861 | # Count pages
862 | pages = data.get('pagesIndex', {})
863 | page_count = len(pages)
864 |
865 | # Count objects by type
866 | object_types = {}
867 | total_objects = 0
868 |
869 | for page_id, page_data in pages.items():
870 | objects = page_data.get('objects', {})
871 | total_objects += len(objects)
872 |
873 | for obj_id, obj_data in objects.items():
874 | obj_type = obj_data.get('type', 'unknown')
875 | object_types[obj_type] = object_types.get(obj_type, 0) + 1
876 |
877 | # Count components
878 | components = data.get('componentsIndex', {})
879 | component_count = len(components)
880 |
881 | # Count colors, typographies, etc.
882 | colors = data.get('colorsIndex', {})
883 | color_count = len(colors)
884 |
885 | typographies = data.get('typographiesIndex', {})
886 | typography_count = len(typographies)
887 |
888 | return {
889 | 'pageCount': page_count,
890 | 'objectCount': total_objects,
891 | 'objectTypes': object_types,
892 | 'componentCount': component_count,
893 | 'colorCount': color_count,
894 | 'typographyCount': typography_count,
895 | 'fileName': file_data.get('name', 'Unknown'),
896 | 'fileId': file_data.get('id')
897 | }
898 |
899 |
900 | def main():
901 | # Set up argument parser
902 | parser = argparse.ArgumentParser(description='Penpot API Tool')
903 | parser.add_argument('--debug', action='store_true', help='Enable debug output')
904 |
905 | # Create subparsers for different commands
906 | subparsers = parser.add_subparsers(dest='command', help='Command to run')
907 |
908 | # List projects command
909 |     subparsers.add_parser('list-projects', help='List all projects')
910 |
911 | # Get project command
912 | project_parser = subparsers.add_parser('get-project', help='Get project details')
913 | project_parser.add_argument('--id', required=True, help='Project ID')
914 |
915 | # List files command
916 | files_parser = subparsers.add_parser('list-files', help='List files in a project')
917 | files_parser.add_argument('--project-id', required=True, help='Project ID')
918 |
919 | # Get file command
920 | file_parser = subparsers.add_parser('get-file', help='Get file details')
921 | file_parser.add_argument('--file-id', required=True, help='File ID')
922 | file_parser.add_argument('--save', action='store_true', help='Save file data to JSON')
923 |
924 | # Export command
925 | export_parser = subparsers.add_parser('export', help='Export an object')
926 | export_parser.add_argument(
927 | '--profile-id',
928 | required=False,
929 | help='Profile ID (optional, will be fetched automatically if not provided)')
930 | export_parser.add_argument('--file-id', required=True, help='File ID')
931 | export_parser.add_argument('--page-id', required=True, help='Page ID')
932 | export_parser.add_argument('--object-id', required=True, help='Object ID')
933 | export_parser.add_argument(
934 | '--type',
935 | default='png',
936 | choices=[
937 | 'png',
938 | 'svg',
939 | 'pdf'],
940 | help='Export type')
941 | export_parser.add_argument('--scale', type=int, default=1, help='Scale factor')
942 | export_parser.add_argument('--output', required=True, help='Output file path')
943 |
944 | # Parse arguments
945 | args = parser.parse_args()
946 |
947 | # Create API client
948 | api = PenpotAPI(debug=args.debug)
949 |
950 | # Handle different commands
951 | if args.command == 'list-projects':
952 | projects = api.list_projects()
953 | print(f"Found {len(projects)} projects:")
954 | for project in projects:
955 | print(f"- {project.get('name')} - {project.get('teamName')} (ID: {project.get('id')})")
956 |
957 | elif args.command == 'get-project':
958 | project = api.get_project(args.id)
959 | if project:
960 | print(f"Project: {project.get('name')}")
961 | print(json.dumps(project, indent=2))
962 | else:
963 | print(f"Project not found: {args.id}")
964 |
965 | elif args.command == 'list-files':
966 | files = api.get_project_files(args.project_id)
967 | print(f"Found {len(files)} files:")
968 | for file in files:
969 | print(f"- {file.get('name')} (ID: {file.get('id')})")
970 |
971 | elif args.command == 'get-file':
972 | file_data = api.get_file(args.file_id, save_data=args.save)
973 | print(f"File: {file_data.get('name')}")
974 | if args.save:
975 | print(f"Data saved to {args.file_id}.json")
976 | else:
977 | print("File metadata:")
978 | print(json.dumps({k: v for k, v in file_data.items() if k != 'data'}, indent=2))
979 |
980 | elif args.command == 'export':
981 | output_path = api.export_and_download(
982 | file_id=args.file_id,
983 | page_id=args.page_id,
984 | object_id=args.object_id,
985 | export_type=args.type,
986 | scale=args.scale,
987 | save_to_file=args.output,
988 | profile_id=args.profile_id
989 | )
990 | print(f"Exported to: {output_path}")
991 | else:
992 | parser.print_help()
993 |
994 |
995 | if __name__ == '__main__':
996 | main()
997 |
```
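A minimal usage sketch of the export flow above (hypothetical IDs; assumes credentials are picked up from the environment, as when `main()` constructs the client with only `debug`):

```python
from penpot_mcp.api.penpot_api import PenpotAPI

api = PenpotAPI(debug=True)

# One step: create_export() followed by get_export_resource()
path = api.export_and_download(
    file_id="c7f4a8e0-...",    # hypothetical placeholder IDs
    page_id="a1b2c3d4-...",
    object_id="e5f6a7b8-...",
    export_type="png",
    scale=2,
    save_to_file="./exports",  # a directory: the filename is derived from the response
)
print(f"Exported to {path}")
```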
--------------------------------------------------------------------------------
/tests/test_mcp_server.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for the MCP server module."""
2 |
3 | import hashlib
4 | import json
5 | import os
6 | from unittest.mock import MagicMock, mock_open, patch
7 |
8 | import pytest
9 | import yaml
10 |
11 | from penpot_mcp.server.mcp_server import PenpotMCPServer, create_server
12 |
13 |
14 | def test_server_initialization():
15 | """Test server initialization."""
16 | server = PenpotMCPServer(name="Test Server", test_mode=True)
17 |
18 | # Check that the server has the expected properties
19 | assert server.mcp is not None
20 | assert server.api is not None
21 | assert hasattr(server, '_register_resources')
22 | assert hasattr(server, '_register_tools')
23 | assert hasattr(server, 'run')
24 |
25 |
26 | def test_server_info_resource():
27 | """Test the server_info resource handler function directly."""
28 | # Since we can't easily access the registered resource from FastMCP,
29 | # we'll implement it here based on the implementation in mcp_server.py
30 | def server_info():
31 | from penpot_mcp.utils import config
32 | return {
33 | "status": "online",
34 | "name": "Penpot MCP Server",
35 | "description": "Model Context Provider for Penpot",
36 | "api_url": config.PENPOT_API_URL
37 | }
38 |
39 | # Call the function
40 | result = server_info()
41 |
42 | # Check the result
43 | assert isinstance(result, dict)
44 | assert "status" in result
45 | assert result["status"] == "online"
46 | assert "name" in result
47 | assert "description" in result
48 | assert "api_url" in result
49 |
50 |
51 | def test_list_projects_tool_handler(mock_penpot_api):
52 | """Test the list_projects tool handler directly."""
53 | # Create a callable that matches what would be registered
54 | def list_projects():
55 | try:
56 | projects = mock_penpot_api.list_projects()
57 | return {"projects": projects}
58 | except Exception as e:
59 | return {"error": str(e)}
60 |
61 | # Call the handler
62 | result = list_projects()
63 |
64 | # Check the result
65 | assert isinstance(result, dict)
66 | assert "projects" in result
67 | assert len(result["projects"]) == 2
68 | assert result["projects"][0]["id"] == "project1"
69 | assert result["projects"][1]["id"] == "project2"
70 |
71 | # Verify API was called
72 | mock_penpot_api.list_projects.assert_called_once()
73 |
74 |
75 | def test_get_project_files_tool_handler(mock_penpot_api):
76 | """Test the get_project_files tool handler directly."""
77 | # Create a callable that matches what would be registered
78 | def get_project_files(project_id):
79 | try:
80 | files = mock_penpot_api.get_project_files(project_id)
81 | return {"files": files}
82 | except Exception as e:
83 | return {"error": str(e)}
84 |
85 | # Call the handler with a project ID
86 | result = get_project_files("project1")
87 |
88 | # Check the result
89 | assert isinstance(result, dict)
90 | assert "files" in result
91 | assert len(result["files"]) == 2
92 | assert result["files"][0]["id"] == "file1"
93 | assert result["files"][1]["id"] == "file2"
94 |
95 | # Verify API was called with correct parameters
96 | mock_penpot_api.get_project_files.assert_called_once_with("project1")
97 |
98 |
99 | def test_get_file_tool_handler(mock_penpot_api):
100 | """Test the get_file tool handler directly."""
101 | # Create a callable that matches what would be registered
102 | def get_file(file_id):
103 | try:
104 | file_data = mock_penpot_api.get_file(file_id=file_id)
105 | return file_data
106 | except Exception as e:
107 | return {"error": str(e)}
108 |
109 | # Call the handler with a file ID
110 | result = get_file("file1")
111 |
112 | # Check the result
113 | assert isinstance(result, dict)
114 | assert result["id"] == "file1"
115 | assert result["name"] == "Test File"
116 | assert "data" in result
117 | assert "pages" in result["data"]
118 |
119 | # Verify API was called with correct parameters
120 | mock_penpot_api.get_file.assert_called_once_with(file_id="file1")
121 |
122 |
123 | @patch('os.path.join')
124 | @patch('builtins.open', new_callable=mock_open, read_data='{"test": "schema"}')
125 | def test_penpot_schema_resource_handler(mock_file_open, mock_join):
126 | """Test the schema resource handler directly."""
127 | # Setup the mock join to return a predictable path
128 | mock_join.return_value = '/mock/path/to/penpot-schema.json'
129 |
130 | # Create a callable that matches what would be registered
131 | def penpot_schema():
132 | from penpot_mcp.utils import config
133 | schema_path = os.path.join(config.RESOURCES_PATH, 'penpot-schema.json')
134 | try:
135 | with open(schema_path, 'r') as f:
136 | return json.load(f)
137 | except Exception as e:
138 | return {"error": f"Failed to load schema: {str(e)}"}
139 |
140 | # Call the handler
141 | result = penpot_schema()
142 |
143 | # Check result matches our mocked file content
144 | assert isinstance(result, dict)
145 | assert "test" in result
146 | assert result["test"] == "schema"
147 |
148 | # Verify file was opened
149 | mock_file_open.assert_called_once_with('/mock/path/to/penpot-schema.json', 'r')
150 |
151 |
152 | @patch('os.path.join')
153 | @patch('builtins.open', new_callable=mock_open, read_data='{"test": "tree-schema"}')
154 | def test_penpot_tree_schema_resource_handler(mock_file_open, mock_join):
155 | """Test the tree schema resource handler directly."""
156 | # Setup the mock join to return a predictable path
157 | mock_join.return_value = '/mock/path/to/penpot-tree-schema.json'
158 |
159 | # Create a callable that matches what would be registered
160 | def penpot_tree_schema():
161 | from penpot_mcp.utils import config
162 | schema_path = os.path.join(config.RESOURCES_PATH, 'penpot-tree-schema.json')
163 | try:
164 | with open(schema_path, 'r') as f:
165 | return json.load(f)
166 | except Exception as e:
167 | return {"error": f"Failed to load tree schema: {str(e)}"}
168 |
169 | # Call the handler
170 | result = penpot_tree_schema()
171 |
172 | # Check result matches our mocked file content
173 | assert isinstance(result, dict)
174 | assert "test" in result
175 | assert result["test"] == "tree-schema"
176 |
177 | # Verify file was opened
178 | mock_file_open.assert_called_once_with('/mock/path/to/penpot-tree-schema.json', 'r')
179 |
180 |
181 | def test_create_server():
182 | """Test the create_server function."""
183 | with patch('penpot_mcp.server.mcp_server.PenpotMCPServer') as mock_server_class:
184 | mock_server_instance = MagicMock()
185 | mock_server_class.return_value = mock_server_instance
186 |
187 | # Test that create_server passes test_mode=True when in test environment
188 | with patch('penpot_mcp.server.mcp_server.sys.modules', {'pytest': True}):
189 | server = create_server()
190 | mock_server_class.assert_called_once_with(test_mode=True)
191 | assert server == mock_server_instance
192 |
193 |
194 | @patch('penpot_mcp.tools.penpot_tree.get_object_subtree_with_fields')
195 | def test_get_object_tree_basic(mock_get_subtree, mock_penpot_api):
196 | """Test the get_object_tree tool handler with basic parameters."""
197 | # Setup the mock get_object_subtree_with_fields function
198 | mock_get_subtree.return_value = {
199 | "tree": {
200 | "id": "obj1",
201 | "type": "frame",
202 | "name": "Test Object",
203 | "children": []
204 | },
205 | "page_id": "page1"
206 | }
207 |
208 | # Setup the export_object mock for the included image
209 | export_object_mock = MagicMock()
210 | export_object_mock.return_value = MagicMock(data=b'test_image_data', format='png')
211 |
212 | # Create a callable that matches what would be registered
213 | def get_object_tree(
214 | file_id: str,
215 | object_id: str,
216 | fields: list, # Now required parameter
217 | depth: int = -1,
218 | format: str = "json"
219 | ):
220 | try:
221 | # Get the file data
222 | file_data = mock_penpot_api.get_file(file_id=file_id)
223 |
224 | # Use the mocked utility function
225 | result = mock_get_subtree(
226 | file_data,
227 | object_id,
228 | include_fields=fields,
229 | depth=depth
230 | )
231 |
232 | # Check if an error occurred
233 | if "error" in result:
234 | return result
235 |
236 | # Extract the tree and page_id
237 | simplified_tree = result["tree"]
238 | page_id = result["page_id"]
239 |
240 | # Prepare the result dictionary
241 | final_result = {"tree": simplified_tree}
242 |
243 | # Always include image (no longer optional)
244 | try:
245 | image = export_object_mock(
246 | file_id=file_id,
247 | page_id=page_id,
248 | object_id=object_id
249 | )
250 | # New format: URI-based instead of base64 data
251 | image_id = hashlib.md5(f"{file_id}:{object_id}".encode()).hexdigest()
252 | image_uri = f"render_component://{image_id}"
253 | final_result["image"] = {
254 | "uri": image_uri,
255 | "format": image.format if hasattr(image, 'format') else "png"
256 | }
257 | except Exception as e:
258 | final_result["image_error"] = str(e)
259 |
260 | # Format the tree as YAML if requested
261 | if format.lower() == "yaml":
262 | try:
263 | # Convert the entire result to YAML, including the image if present
264 | yaml_result = yaml.dump(final_result, default_flow_style=False, sort_keys=False)
265 | return {"yaml_result": yaml_result}
266 | except Exception as e:
267 | return {"format_error": f"Error formatting as YAML: {str(e)}"}
268 |
269 | # Return the JSON format result
270 | return final_result
271 | except Exception as e:
272 | return {"error": str(e)}
273 |
274 | # Call the handler with basic parameters - fields is now required
275 | result = get_object_tree(
276 | file_id="file1",
277 | object_id="obj1",
278 | fields=["id", "type", "name"] # Required parameter
279 | )
280 |
281 | # Check the result
282 | assert isinstance(result, dict)
283 | assert "tree" in result
284 | assert result["tree"]["id"] == "obj1"
285 | assert result["tree"]["type"] == "frame"
286 | assert result["tree"]["name"] == "Test Object"
287 |
288 | # Check that image is always included
289 | assert "image" in result
290 | assert "uri" in result["image"]
291 | assert result["image"]["uri"].startswith("render_component://")
292 | assert result["image"]["format"] == "png"
293 |
294 | # Verify mocks were called with correct parameters
295 | mock_penpot_api.get_file.assert_called_once_with(file_id="file1")
296 | mock_get_subtree.assert_called_once_with(
297 | mock_penpot_api.get_file.return_value,
298 | "obj1",
299 | include_fields=["id", "type", "name"],
300 | depth=-1
301 | )
302 |
303 |
304 | @patch('penpot_mcp.tools.penpot_tree.get_object_subtree_with_fields')
305 | def test_get_object_tree_with_fields_and_depth(mock_get_subtree, mock_penpot_api):
306 | """Test the get_object_tree tool handler with custom field list and depth."""
307 | # Setup the mock get_object_subtree_with_fields function
308 | mock_get_subtree.return_value = {
309 | "tree": {
310 | "id": "obj1",
311 | "name": "Test Object", # Only id and name fields included
312 | "children": []
313 | },
314 | "page_id": "page1"
315 | }
316 |
317 | # Setup the export_object mock for the included image
318 | export_object_mock = MagicMock()
319 | export_object_mock.return_value = MagicMock(data=b'test_image_data', format='png')
320 |
321 | # Create a callable that matches what would be registered
322 | def get_object_tree(
323 | file_id: str,
324 | object_id: str,
325 | fields: list, # Now required parameter
326 | depth: int = -1,
327 | format: str = "json"
328 | ):
329 | try:
330 | # Get the file data
331 | file_data = mock_penpot_api.get_file(file_id=file_id)
332 |
333 | # Use the mocked utility function
334 | result = mock_get_subtree(
335 | file_data,
336 | object_id,
337 | include_fields=fields,
338 | depth=depth
339 | )
340 |
341 | # Extract the tree and page_id
342 | simplified_tree = result["tree"]
343 | page_id = result["page_id"]
344 |
345 | # Prepare the result dictionary
346 | final_result = {"tree": simplified_tree}
347 |
348 | # Always include image (no longer optional)
349 | try:
350 | image = export_object_mock(
351 | file_id=file_id,
352 | page_id=page_id,
353 | object_id=object_id
354 | )
355 | # New format: URI-based instead of base64 data
356 | image_id = hashlib.md5(f"{file_id}:{object_id}".encode()).hexdigest()
357 | image_uri = f"render_component://{image_id}"
358 | final_result["image"] = {
359 | "uri": image_uri,
360 | "format": image.format if hasattr(image, 'format') else "png"
361 | }
362 | except Exception as e:
363 | final_result["image_error"] = str(e)
364 |
365 | # Format the tree as YAML if requested
366 | if format.lower() == "yaml":
367 | try:
368 | # Convert the entire result to YAML, including the image if present
369 | yaml_result = yaml.dump(final_result, default_flow_style=False, sort_keys=False)
370 | return {"yaml_result": yaml_result}
371 | except Exception as e:
372 | return {"format_error": f"Error formatting as YAML: {str(e)}"}
373 |
374 | # Return the JSON format result
375 | return final_result
376 | except Exception as e:
377 | return {"error": str(e)}
378 |
379 | # Call the handler with custom fields and depth
380 | result = get_object_tree(
381 | file_id="file1",
382 | object_id="obj1",
383 | fields=["id", "name"], # Updated parameter name
384 | depth=2
385 | )
386 |
387 | # Check the result
388 | assert isinstance(result, dict)
389 | assert "tree" in result
390 | assert result["tree"]["id"] == "obj1"
391 | assert result["tree"]["name"] == "Test Object"
392 | assert "type" not in result["tree"] # Type field should not be included
393 |
394 | # Check that image is always included
395 | assert "image" in result
396 | assert "uri" in result["image"]
397 | assert result["image"]["uri"].startswith("render_component://")
398 | assert result["image"]["format"] == "png"
399 |
400 | # Verify mocks were called with correct parameters
401 | mock_penpot_api.get_file.assert_called_once_with(file_id="file1")
402 | mock_get_subtree.assert_called_once_with(
403 | mock_penpot_api.get_file.return_value,
404 | "obj1",
405 | include_fields=["id", "name"],
406 | depth=2
407 | )
408 |
409 |
410 | @patch('penpot_mcp.tools.penpot_tree.get_object_subtree_with_fields')
411 | def test_get_object_tree_with_yaml_format(mock_get_subtree, mock_penpot_api):
412 | """Test the get_object_tree tool handler with YAML format output."""
413 | # Setup the mock get_object_subtree_with_fields function
414 | mock_get_subtree.return_value = {
415 | "tree": {
416 | "id": "obj1",
417 | "type": "frame",
418 | "name": "Test Object",
419 | "children": [
420 | {
421 | "id": "child1",
422 | "type": "text",
423 | "name": "Child Text"
424 | }
425 | ]
426 | },
427 | "page_id": "page1"
428 | }
429 |
430 | # Setup the export_object mock for the included image
431 | export_object_mock = MagicMock()
432 | export_object_mock.return_value = MagicMock(data=b'test_image_data', format='png')
433 |
434 | # Create a callable that matches what would be registered
435 | def get_object_tree(
436 | file_id: str,
437 | object_id: str,
438 | fields: list, # Now required parameter
439 | depth: int = -1,
440 | format: str = "json"
441 | ):
442 | try:
443 | # Get the file data
444 | file_data = mock_penpot_api.get_file(file_id=file_id)
445 |
446 | # Use the mocked utility function
447 | result = mock_get_subtree(
448 | file_data,
449 | object_id,
450 | include_fields=fields,
451 | depth=depth
452 | )
453 |
454 | # Extract the tree and page_id
455 | simplified_tree = result["tree"]
456 | page_id = result["page_id"]
457 |
458 | # Prepare the result dictionary
459 | final_result = {"tree": simplified_tree}
460 |
461 | # Always include image (no longer optional)
462 | try:
463 | image = export_object_mock(
464 | file_id=file_id,
465 | page_id=page_id,
466 | object_id=object_id
467 | )
468 | # New format: URI-based instead of base64 data
469 | image_id = hashlib.md5(f"{file_id}:{object_id}".encode()).hexdigest()
470 | image_uri = f"render_component://{image_id}"
471 | final_result["image"] = {
472 | "uri": image_uri,
473 | "format": image.format if hasattr(image, 'format') else "png"
474 | }
475 | except Exception as e:
476 | final_result["image_error"] = str(e)
477 |
478 | # Format the tree as YAML if requested
479 | if format.lower() == "yaml":
480 | try:
481 | # Convert the entire result to YAML, including the image if present
482 | yaml_result = yaml.dump(final_result, default_flow_style=False, sort_keys=False)
483 | return {"yaml_result": yaml_result}
484 | except Exception as e:
485 | return {"format_error": f"Error formatting as YAML: {str(e)}"}
486 |
487 | # Return the JSON format result
488 | return final_result
489 | except Exception as e:
490 | return {"error": str(e)}
491 |
492 | # Call the handler with YAML format - fields is now required
493 | result = get_object_tree(
494 | file_id="file1",
495 | object_id="obj1",
496 | fields=["id", "type", "name"], # Required parameter
497 | format="yaml"
498 | )
499 |
500 | # Check the result
501 | assert isinstance(result, dict)
502 | assert "yaml_result" in result
503 | assert "tree" not in result # Should not contain the tree field
504 |
505 | # Verify the YAML content matches the expected tree structure
506 | parsed_yaml = yaml.safe_load(result["yaml_result"])
507 | assert "tree" in parsed_yaml
508 | assert parsed_yaml["tree"]["id"] == "obj1"
509 | assert parsed_yaml["tree"]["type"] == "frame"
510 | assert parsed_yaml["tree"]["name"] == "Test Object"
511 | assert isinstance(parsed_yaml["tree"]["children"], list)
512 | assert parsed_yaml["tree"]["children"][0]["id"] == "child1"
513 |
514 | # Check that image is included in YAML
515 | assert "image" in parsed_yaml
516 | assert "uri" in parsed_yaml["image"]
517 | assert parsed_yaml["image"]["uri"].startswith("render_component://")
518 | assert parsed_yaml["image"]["format"] == "png"
519 |
520 | # Verify mocks were called with correct parameters
521 | mock_penpot_api.get_file.assert_called_once_with(file_id="file1")
522 | mock_get_subtree.assert_called_once_with(
523 | mock_penpot_api.get_file.return_value,
524 | "obj1",
525 | include_fields=["id", "type", "name"],
526 | depth=-1
527 | )
528 |
529 |
530 | @patch('penpot_mcp.tools.penpot_tree.get_object_subtree_with_fields')
531 | def test_get_object_tree_with_include_image(mock_get_subtree, mock_penpot_api):
532 | """Test the get_object_tree tool handler with image inclusion (always included now)."""
533 | # Setup the mock get_object_subtree_with_fields function
534 | mock_get_subtree.return_value = {
535 | "tree": {
536 | "id": "obj1",
537 | "type": "frame",
538 | "name": "Test Object",
539 | "children": []
540 | },
541 | "page_id": "page1"
542 | }
543 |
544 | # Setup the export_object mock for the included image
545 | export_object_mock = MagicMock()
546 | export_object_mock.return_value = MagicMock(data=b'test_image_data', format='png')
547 |
548 | # Create a callable that matches what would be registered
549 | def get_object_tree(
550 | file_id: str,
551 | object_id: str,
552 | fields: list, # Now required parameter
553 | depth: int = -1,
554 | format: str = "json"
555 | ):
556 | try:
557 | # Get the file data
558 | file_data = mock_penpot_api.get_file(file_id=file_id)
559 |
560 | # Use the mocked utility function
561 | result = mock_get_subtree(
562 | file_data,
563 | object_id,
564 | include_fields=fields,
565 | depth=depth
566 | )
567 |
568 | # Extract the tree and page_id
569 | simplified_tree = result["tree"]
570 | page_id = result["page_id"]
571 |
572 | # Prepare the result dictionary
573 | final_result = {"tree": simplified_tree}
574 |
575 | # Always include image (no longer optional)
576 | try:
577 | image = export_object_mock(
578 | file_id=file_id,
579 | page_id=page_id,
580 | object_id=object_id
581 | )
582 | # New format: URI-based instead of base64 data
583 | image_id = hashlib.md5(f"{file_id}:{object_id}".encode()).hexdigest()
584 | image_uri = f"render_component://{image_id}"
585 | final_result["image"] = {
586 | "uri": image_uri,
587 | "format": image.format if hasattr(image, 'format') else "png"
588 | }
589 | except Exception as e:
590 | final_result["image_error"] = str(e)
591 |
592 | # Format the tree as YAML if requested
593 | if format.lower() == "yaml":
594 | try:
595 | # Convert the entire result to YAML, including the image if present
596 | yaml_result = yaml.dump(final_result, default_flow_style=False, sort_keys=False)
597 | return {"yaml_result": yaml_result}
598 | except Exception as e:
599 | return {"format_error": f"Error formatting as YAML: {str(e)}"}
600 |
601 | # Return the JSON format result
602 | return final_result
603 | except Exception as e:
604 | return {"error": str(e)}
605 |
606 | # Call the handler with required fields parameter
607 | result = get_object_tree(
608 | file_id="file1",
609 | object_id="obj1",
610 | fields=["id", "type", "name"] # Updated parameter name
611 | )
612 |
613 | # Check the result
614 | assert isinstance(result, dict)
615 | assert "tree" in result
616 | assert result["tree"]["id"] == "obj1"
617 | assert result["tree"]["type"] == "frame"
618 | assert result["tree"]["name"] == "Test Object"
619 |
620 | # Check that image is always included
621 | assert "image" in result
622 | assert "uri" in result["image"]
623 | assert result["image"]["uri"].startswith("render_component://")
624 | assert result["image"]["format"] == "png"
625 |
626 | # Verify mocks were called with correct parameters
627 | mock_penpot_api.get_file.assert_called_once_with(file_id="file1")
628 | mock_get_subtree.assert_called_once_with(
629 | mock_penpot_api.get_file.return_value,
630 | "obj1",
631 | include_fields=["id", "type", "name"],
632 | depth=-1
633 | )
634 |
635 |
636 | @patch('penpot_mcp.tools.penpot_tree.get_object_subtree_with_fields')
637 | def test_get_object_tree_with_yaml_and_image(mock_get_subtree, mock_penpot_api):
638 | """Test the get_object_tree tool handler with YAML format and image inclusion (always included now)."""
639 | # Setup the mock get_object_subtree_with_fields function
640 | mock_get_subtree.return_value = {
641 | "tree": {
642 | "id": "obj1",
643 | "type": "frame",
644 | "name": "Test Object",
645 | "children": []
646 | },
647 | "page_id": "page1"
648 | }
649 |
650 | # Setup the export_object mock for the included image
651 | export_object_mock = MagicMock()
652 | export_object_mock.return_value = MagicMock(data=b'test_image_data', format='png')
653 |
654 | # Create a callable that matches what would be registered
655 | def get_object_tree(
656 | file_id: str,
657 | object_id: str,
658 | fields: list, # Now required parameter
659 | depth: int = -1,
660 | format: str = "json"
661 | ):
662 | try:
663 | # Get the file data
664 | file_data = mock_penpot_api.get_file(file_id=file_id)
665 |
666 | # Use the mocked utility function
667 | result = mock_get_subtree(
668 | file_data,
669 | object_id,
670 | include_fields=fields,
671 | depth=depth
672 | )
673 |
674 | # Extract the tree and page_id
675 | simplified_tree = result["tree"]
676 | page_id = result["page_id"]
677 |
678 | # Prepare the result dictionary
679 | final_result = {"tree": simplified_tree}
680 |
681 | # Always include image (no longer optional)
682 | try:
683 | image = export_object_mock(
684 | file_id=file_id,
685 | page_id=page_id,
686 | object_id=object_id
687 | )
688 | # New format: URI-based instead of base64 data
689 | image_id = hashlib.md5(f"{file_id}:{object_id}".encode()).hexdigest()
690 | image_uri = f"render_component://{image_id}"
691 | final_result["image"] = {
692 | "uri": image_uri,
693 | "format": image.format if hasattr(image, 'format') else "png"
694 | }
695 | except Exception as e:
696 | final_result["image_error"] = str(e)
697 |
698 | # Format the tree as YAML if requested
699 | if format.lower() == "yaml":
700 | try:
701 | # Convert the entire result to YAML, including the image if present
702 | yaml_result = yaml.dump(final_result, default_flow_style=False, sort_keys=False)
703 | return {"yaml_result": yaml_result}
704 | except Exception as e:
705 | return {"format_error": f"Error formatting as YAML: {str(e)}"}
706 |
707 | # Return the JSON format result
708 | return final_result
709 | except Exception as e:
710 | return {"error": str(e)}
711 |
712 | # Call the handler with required fields parameter and YAML format
713 | result = get_object_tree(
714 | file_id="file1",
715 | object_id="obj1",
716 | fields=["id", "type", "name"], # Updated parameter name
717 | format="yaml"
718 | )
719 |
720 | # Check the result
721 | assert isinstance(result, dict)
722 | assert "yaml_result" in result
723 | assert "tree" not in result # Should not contain the tree field directly
724 |
725 | # Verify the YAML content contains both tree and image with URI
726 | parsed_yaml = yaml.safe_load(result["yaml_result"])
727 | assert "tree" in parsed_yaml
728 | assert parsed_yaml["tree"]["id"] == "obj1"
729 | assert parsed_yaml["tree"]["type"] == "frame"
730 | assert parsed_yaml["tree"]["name"] == "Test Object"
731 | assert "image" in parsed_yaml
732 | assert "uri" in parsed_yaml["image"]
733 |
734 | # Verify the URI format in the YAML
735 | assert parsed_yaml["image"]["uri"].startswith("render_component://")
736 | assert parsed_yaml["image"]["format"] == "png"
737 |
738 | # Verify mocks were called with correct parameters
739 | mock_penpot_api.get_file.assert_called_once_with(file_id="file1")
740 | mock_get_subtree.assert_called_once_with(
741 | mock_penpot_api.get_file.return_value,
742 | "obj1",
743 | include_fields=["id", "type", "name"],
744 | depth=-1
745 | )
746 |
747 |
748 | def test_rendered_component_resource():
749 | """Test the rendered component resource handler."""
750 | server = PenpotMCPServer(test_mode=True)
751 |
752 | component_id = "test_component_id"
753 | mock_image = MagicMock()
754 | mock_image.format = "png"
755 |
756 | # Mock the rendered_components dictionary
757 | server.rendered_components = {component_id: mock_image}
758 |
759 | # Get the resource handler function dynamically (this is tricky in real usage)
760 | # For testing, we'll implement the function directly based on the code
761 | def get_rendered_component(component_id: str):
762 | if component_id in server.rendered_components:
763 | return server.rendered_components[component_id]
764 | raise Exception(f"Component with ID {component_id} not found")
765 |
766 | # Test with a valid component ID
767 | result = get_rendered_component(component_id)
768 | assert result == mock_image
769 |
770 | # Test with an invalid component ID
771 | with pytest.raises(Exception) as excinfo:
772 | get_rendered_component("invalid_id")
773 | assert "not found" in str(excinfo.value)
774 |
775 |
776 | def test_search_object_basic(mock_penpot_api):
777 | """Test the search_object tool basic functionality."""
778 | # Mock the file contents with more detailed mock data
779 | mock_file_data = {
780 | "id": "file1",
781 | "name": "Test File",
782 | "pagesIndex": {
783 | "page1": {
784 | "id": "page1",
785 | "name": "Page 1",
786 | "objects": {
787 | "obj1": {"id": "obj1", "name": "Button Component", "type": "frame"},
788 | "obj2": {"id": "obj2", "name": "Header Text", "type": "text"},
789 | "obj3": {"id": "obj3", "name": "Button Label", "type": "text"}
790 | }
791 | },
792 | "page2": {
793 | "id": "page2",
794 | "name": "Page 2",
795 | "objects": {
796 | "obj4": {"id": "obj4", "name": "Footer Button", "type": "frame"},
797 | "obj5": {"id": "obj5", "name": "Copyright Text", "type": "text"}
798 | }
799 | }
800 | }
801 | }
802 |
803 | # Override the get_file return value for this test
804 | mock_penpot_api.get_file.return_value = mock_file_data
805 |
806 | # Create a function to simulate the search_object tool
807 | def get_cached_file(file_id):
808 | # Call the mock API to ensure it's tracked for assertions
809 | return mock_penpot_api.get_file(file_id=file_id)
810 |
811 | def search_object(file_id: str, query: str):
812 | try:
813 | # Get the file data using cache
814 | file_data = get_cached_file(file_id)
815 | if "error" in file_data:
816 | return file_data
817 |
818 | # Create case-insensitive pattern for matching
819 | import re
820 | pattern = re.compile(query, re.IGNORECASE)
821 |
822 | # Store matching objects
823 | matches = []
824 |
825 | # Search through each page in the file
826 | for page_id, page_data in file_data.get('pagesIndex', {}).items():
827 | page_name = page_data.get('name', 'Unnamed')
828 |
829 | # Search through objects in this page
830 | for obj_id, obj_data in page_data.get('objects', {}).items():
831 | obj_name = obj_data.get('name', '')
832 |
833 | # Check if the name contains the query (case-insensitive)
834 | if pattern.search(obj_name):
835 | matches.append({
836 | 'id': obj_id,
837 | 'name': obj_name,
838 | 'page_id': page_id,
839 | 'page_name': page_name,
840 | 'object_type': obj_data.get('type', 'unknown')
841 | })
842 |
843 | return {'objects': matches}
844 | except Exception as e:
845 | return {"error": str(e)}
846 |
847 | # Test searching for "button" (should find 3 objects)
848 | result = search_object("file1", "button")
849 | assert "objects" in result
850 | assert len(result["objects"]) == 3
851 |
852 | # Check the first match
853 | button_matches = [obj for obj in result["objects"] if "Button Component" == obj["name"]]
854 | assert len(button_matches) == 1
855 | assert button_matches[0]["id"] == "obj1"
856 | assert button_matches[0]["page_id"] == "page1"
857 | assert button_matches[0]["page_name"] == "Page 1"
858 | assert button_matches[0]["object_type"] == "frame"
859 |
860 | # Check that it found objects across pages
861 | footer_button_matches = [obj for obj in result["objects"] if "Footer Button" == obj["name"]]
862 | assert len(footer_button_matches) == 1
863 | assert footer_button_matches[0]["page_id"] == "page2"
864 |
865 | # Verify API was called with correct parameters
866 | mock_penpot_api.get_file.assert_called_with(file_id="file1")
867 |
868 |
869 | def test_search_object_case_insensitive(mock_penpot_api):
870 | """Test the search_object tool with case-insensitive search."""
871 | # Mock the file contents with more detailed mock data
872 | mock_file_data = {
873 | "id": "file1",
874 | "name": "Test File",
875 | "pagesIndex": {
876 | "page1": {
877 | "id": "page1",
878 | "name": "Page 1",
879 | "objects": {
880 | "obj1": {"id": "obj1", "name": "Button Component", "type": "frame"},
881 | "obj2": {"id": "obj2", "name": "HEADER TEXT", "type": "text"},
882 | "obj3": {"id": "obj3", "name": "button Label", "type": "text"}
883 | }
884 | }
885 | }
886 | }
887 |
888 | # Override the get_file return value for this test
889 | mock_penpot_api.get_file.return_value = mock_file_data
890 |
891 | # Create a function to simulate the search_object tool
892 | def get_cached_file(file_id):
893 | # Call the mock API to ensure it's tracked for assertions
894 | return mock_penpot_api.get_file(file_id=file_id)
895 |
896 | def search_object(file_id: str, query: str):
897 | try:
898 | # Get the file data using cache
899 | file_data = get_cached_file(file_id)
900 | if "error" in file_data:
901 | return file_data
902 |
903 | # Create case-insensitive pattern for matching
904 | import re
905 | pattern = re.compile(query, re.IGNORECASE)
906 |
907 | # Store matching objects
908 | matches = []
909 |
910 | # Search through each page in the file
911 | for page_id, page_data in file_data.get('pagesIndex', {}).items():
912 | page_name = page_data.get('name', 'Unnamed')
913 |
914 | # Search through objects in this page
915 | for obj_id, obj_data in page_data.get('objects', {}).items():
916 | obj_name = obj_data.get('name', '')
917 |
918 | # Check if the name contains the query (case-insensitive)
919 | if pattern.search(obj_name):
920 | matches.append({
921 | 'id': obj_id,
922 | 'name': obj_name,
923 | 'page_id': page_id,
924 | 'page_name': page_name,
925 | 'object_type': obj_data.get('type', 'unknown')
926 | })
927 |
928 | return {'objects': matches}
929 | except Exception as e:
930 | return {"error": str(e)}
931 |
932 | # Test with lowercase query for uppercase text
933 | result = search_object("file1", "header")
934 | assert "objects" in result
935 | assert len(result["objects"]) == 1
936 | assert result["objects"][0]["name"] == "HEADER TEXT"
937 |
938 | # Test with uppercase query for lowercase text
939 | result = search_object("file1", "BUTTON")
940 | assert "objects" in result
941 | assert len(result["objects"]) == 2
942 |
943 | # Check mixed case matching
944 | button_matches = sorted([obj["name"] for obj in result["objects"]])
945 | assert button_matches == ["Button Component", "button Label"]
946 |
947 | # Verify API was called
948 | mock_penpot_api.get_file.assert_called_with(file_id="file1")
949 |
950 |
951 | def test_search_object_no_matches(mock_penpot_api):
952 | """Test the search_object tool when no matches are found."""
953 | # Mock the file contents
954 | mock_file_data = {
955 | "id": "file1",
956 | "name": "Test File",
957 | "pagesIndex": {
958 | "page1": {
959 | "id": "page1",
960 | "name": "Page 1",
961 | "objects": {
962 | "obj1": {"id": "obj1", "name": "Button Component", "type": "frame"},
963 | "obj2": {"id": "obj2", "name": "Header Text", "type": "text"}
964 | }
965 | }
966 | }
967 | }
968 |
969 | # Override the get_file return value for this test
970 | mock_penpot_api.get_file.return_value = mock_file_data
971 |
972 | # Create a function to simulate the search_object tool
973 | def get_cached_file(file_id):
974 | # Call the mock API to ensure it's tracked for assertions
975 | return mock_penpot_api.get_file(file_id=file_id)
976 |
977 | def search_object(file_id: str, query: str):
978 | try:
979 | # Get the file data using cache
980 | file_data = get_cached_file(file_id)
981 | if "error" in file_data:
982 | return file_data
983 |
984 | # Create case-insensitive pattern for matching
985 | import re
986 | pattern = re.compile(query, re.IGNORECASE)
987 |
988 | # Store matching objects
989 | matches = []
990 |
991 | # Search through each page in the file
992 | for page_id, page_data in file_data.get('pagesIndex', {}).items():
993 | page_name = page_data.get('name', 'Unnamed')
994 |
995 | # Search through objects in this page
996 | for obj_id, obj_data in page_data.get('objects', {}).items():
997 | obj_name = obj_data.get('name', '')
998 |
999 | # Check if the name contains the query (case-insensitive)
1000 | if pattern.search(obj_name):
1001 | matches.append({
1002 | 'id': obj_id,
1003 | 'name': obj_name,
1004 | 'page_id': page_id,
1005 | 'page_name': page_name,
1006 | 'object_type': obj_data.get('type', 'unknown')
1007 | })
1008 |
1009 | return {'objects': matches}
1010 | except Exception as e:
1011 | return {"error": str(e)}
1012 |
1013 | # Test with a query that won't match anything
1014 | result = search_object("file1", "nonexistent")
1015 | assert "objects" in result
1016 | assert len(result["objects"]) == 0 # Empty array
1017 |
1018 | # Verify API was called
1019 | mock_penpot_api.get_file.assert_called_with(file_id="file1")
1020 |
1021 |
1022 | def test_search_object_error_handling(mock_penpot_api):
1023 | """Test the search_object tool error handling."""
1024 | # Make the API throw an exception
1025 | mock_penpot_api.get_file.side_effect = Exception("API error")
1026 |
1027 | def get_cached_file(file_id):
1028 | try:
1029 | return mock_penpot_api.get_file(file_id=file_id)
1030 | except Exception as e:
1031 | return {"error": str(e)}
1032 |
1033 | def search_object(file_id: str, query: str):
1034 | try:
1035 | # Get the file data using cache
1036 | file_data = get_cached_file(file_id)
1037 | if "error" in file_data:
1038 | return file_data
1039 |
1040 | # Create case-insensitive pattern for matching
1041 | import re
1042 | pattern = re.compile(query, re.IGNORECASE)
1043 |
1044 | # Store matching objects
1045 | matches = []
1046 |
1047 | # Search through each page in the file
1048 | for page_id, page_data in file_data.get('pagesIndex', {}).items():
1049 | page_name = page_data.get('name', 'Unnamed')
1050 |
1051 | # Search through objects in this page
1052 | for obj_id, obj_data in page_data.get('objects', {}).items():
1053 | obj_name = obj_data.get('name', '')
1054 |
1055 | # Check if the name contains the query (case-insensitive)
1056 | if pattern.search(obj_name):
1057 | matches.append({
1058 | 'id': obj_id,
1059 | 'name': obj_name,
1060 | 'page_id': page_id,
1061 | 'page_name': page_name,
1062 | 'object_type': obj_data.get('type', 'unknown')
1063 | })
1064 |
1065 | return {'objects': matches}
1066 | except Exception as e:
1067 | return {"error": str(e)}
1068 |
1069 | # Test with error from API
1070 | result = search_object("file1", "button")
1071 | assert "error" in result
1072 | assert "API error" in result["error"]
1073 |
1074 | # Verify API was called
1075 | mock_penpot_api.get_file.assert_called_with(file_id="file1")
1076 |
```
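The tests above rely on a `mock_penpot_api` fixture defined in `tests/conftest.py` (not shown on this page). A minimal sketch of what that fixture plausibly provides, inferred from the assertions above; the real fixture may differ:

```python
# Hypothetical reconstruction of the conftest.py fixture used above.
from unittest.mock import MagicMock

import pytest


@pytest.fixture
def mock_penpot_api():
    api = MagicMock()
    # Return shapes inferred from the assertions in test_mcp_server.py
    api.list_projects.return_value = [
        {"id": "project1", "name": "Project 1"},
        {"id": "project2", "name": "Project 2"},
    ]
    api.get_project_files.return_value = [
        {"id": "file1", "name": "File 1"},
        {"id": "file2", "name": "File 2"},
    ]
    api.get_file.return_value = {
        "id": "file1",
        "name": "Test File",
        "data": {"pages": ["page1"]},
    }
    return api
```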
--------------------------------------------------------------------------------
/tests/test_penpot_tree.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for the penpot_tree module."""
2 |
3 | import re
4 | from unittest.mock import MagicMock, patch
5 |
6 | import pytest
7 | from anytree import Node, RenderTree
8 |
9 | from penpot_mcp.tools.penpot_tree import (
10 | build_tree,
11 | convert_node_to_dict,
12 | export_tree_to_dot,
13 | find_object_in_tree,
14 | find_page_containing_object,
15 | get_object_subtree,
16 | get_object_subtree_with_fields,
17 | print_tree,
18 | )
19 |
20 |
21 | @pytest.fixture
22 | def sample_penpot_data():
23 | """Create sample Penpot file data for testing."""
24 | return {
25 | 'components': {
26 | 'comp1': {'name': 'Button', 'annotation': 'Primary button'},
27 | 'comp2': {'name': 'Card', 'annotation': None}
28 | },
29 | 'pagesIndex': {
30 | 'page1': {
31 | 'name': 'Home Page',
32 | 'objects': {
33 | '00000000-0000-0000-0000-000000000000': {
34 | 'type': 'frame',
35 | 'name': 'Root Frame',
36 | },
37 | 'obj1': {
38 | 'type': 'frame',
39 | 'name': 'Header',
40 | 'parentId': '00000000-0000-0000-0000-000000000000'
41 | },
42 | 'obj2': {
43 | 'type': 'text',
44 | 'name': 'Title',
45 | 'parentId': 'obj1'
46 | },
47 | 'obj3': {
48 | 'type': 'frame',
49 | 'name': 'Button Instance',
50 | 'parentId': 'obj1',
51 | 'componentId': 'comp1'
52 | }
53 | }
54 | },
55 | 'page2': {
56 | 'name': 'About Page',
57 | 'objects': {
58 | '00000000-0000-0000-0000-000000000000': {
59 | 'type': 'frame',
60 | 'name': 'Root Frame',
61 | },
62 | 'obj4': {
63 | 'type': 'frame',
64 | 'name': 'Content',
65 | 'parentId': '00000000-0000-0000-0000-000000000000'
66 | },
67 | 'obj5': {
68 | 'type': 'image',
69 | 'name': 'Logo',
70 | 'parentId': 'obj4'
71 | }
72 | }
73 | }
74 | }
75 | }
76 |
77 |
78 | @pytest.fixture
79 | def sample_tree(sample_penpot_data):
80 | """Create a sample tree from the sample data."""
81 | return build_tree(sample_penpot_data)
82 |
83 |
84 | def test_build_tree(sample_penpot_data, sample_tree):
85 | """Test building a tree from Penpot file data."""
86 | # Check that the root is created
87 | assert sample_tree.name.startswith("SYNTHETIC-ROOT")
88 |
89 | # Check components section
90 | components_node = None
91 | for child in sample_tree.children:
92 | if "components (section)" in child.name:
93 | components_node = child
94 | break
95 |
96 | assert components_node is not None
97 | assert len(components_node.children) == 2
98 |
99 | # Check pages are created
100 | page_nodes = [child for child in sample_tree.children if "(page)" in child.name]
101 | assert len(page_nodes) == 2
102 |
103 | # Check objects within pages
104 | for page_node in page_nodes:
105 | if "Home Page" in page_node.name:
106 | # Check that objects are created under the page
107 | assert len(page_node.descendants) == 4 # Root frame + 3 objects
108 |
109 | # Check parent-child relationships
110 |             for _pre, _fill, node in RenderTree(page_node):
111 |                 if hasattr(node, 'obj_id') and node.obj_id == 'obj2':
112 |                     assert node.parent.obj_id == 'obj1'
113 |                 elif hasattr(node, 'obj_id') and node.obj_id == 'obj3':
114 |                     assert node.parent.obj_id == 'obj1'
115 |                     assert hasattr(node, 'componentRef')
116 |                     assert node.componentRef == 'comp1'
117 |                     assert hasattr(node, 'componentAnnotation')
118 |                     assert node.componentAnnotation == 'Primary button'
119 |
120 |
121 | def test_print_tree(sample_tree, capsys):
122 | """Test printing the tree to console."""
123 | print_tree(sample_tree)
124 | captured = capsys.readouterr()
125 |
126 | # Check that all pages and components are in the output
127 | assert "Home Page" in captured.out
128 | assert "About Page" in captured.out
129 | assert "comp1 (component) - Button" in captured.out
130 | assert "comp2 (component) - Card" in captured.out
131 |
132 | # Check that object types and names are displayed
133 | assert "(frame) - Header" in captured.out
134 | assert "(text) - Title" in captured.out
135 |
136 | # Check that component references are shown
137 | assert "refs component: comp1" in captured.out
138 | assert "Note: Primary button" in captured.out
139 |
140 |
141 | def test_print_tree_with_filter(sample_tree, capsys):
142 | """Test printing the tree with a filter applied."""
143 | print_tree(sample_tree, filter_pattern="title")
144 | captured = capsys.readouterr()
145 |
146 | # Check that only the matching node and its ancestors are shown
147 | assert "Title" in captured.out
148 | assert "Header" in captured.out
149 | assert "Home Page" in captured.out
150 | assert "MATCH" in captured.out
151 |
152 | # Check that non-matching nodes are not included
153 | assert "Logo" not in captured.out
154 | assert "About Page" not in captured.out
155 |
156 |
157 | @patch('anytree.exporter.DotExporter.to_picture')
158 | def test_export_tree_to_dot(mock_to_picture, sample_tree):
159 | """Test exporting the tree to a DOT file."""
160 | result = export_tree_to_dot(sample_tree, "test_output.png")
161 |
162 | # Check that the exporter was called
163 | assert mock_to_picture.called
164 | assert result is True
165 |
166 |
167 | @patch('anytree.exporter.DotExporter.to_picture', side_effect=Exception("Test exception"))
168 | def test_export_tree_to_dot_exception(mock_to_picture, sample_tree, capsys):
169 | """Test handling exceptions when exporting the tree."""
170 | result = export_tree_to_dot(sample_tree, "test_output.png")
171 |
172 | # Check that the function returns False on error
173 | assert result is False
174 |
175 | # Check that an error message is displayed
176 | captured = capsys.readouterr()
177 | assert "Warning: Could not export" in captured.out
178 | assert "Make sure Graphviz is installed" in captured.out
179 |
180 |
181 | def test_find_page_containing_object(sample_penpot_data):
182 | """Test finding which page contains a specific object."""
183 | # Test finding an object that exists
184 | page_id = find_page_containing_object(sample_penpot_data, 'obj2')
185 | assert page_id == 'page1'
186 |
187 | # Test finding an object in another page
188 | page_id = find_page_containing_object(sample_penpot_data, 'obj5')
189 | assert page_id == 'page2'
190 |
191 | # Test finding an object that doesn't exist
192 | page_id = find_page_containing_object(sample_penpot_data, 'nonexistent')
193 | assert page_id is None
194 |
195 |
196 | def test_find_object_in_tree(sample_tree):
197 | """Test finding an object in the tree by its ID."""
198 | # Test finding an object that exists
199 | obj_dict = find_object_in_tree(sample_tree, 'obj3')
200 | assert obj_dict is not None
201 | assert obj_dict['id'] == 'obj3'
202 | assert obj_dict['type'] == 'frame'
203 | assert obj_dict['name'] == 'Button Instance'
204 | assert 'componentRef' in obj_dict
205 | assert obj_dict['componentRef'] == 'comp1'
206 |
207 | # Test finding an object that doesn't exist
208 | obj_dict = find_object_in_tree(sample_tree, 'nonexistent')
209 | assert obj_dict is None
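    # The returned dict mirrors the shape that convert_node_to_dict produces
    # (see the next test): id/type/name plus any component metadata.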
210 |
211 |
212 | def test_convert_node_to_dict():
213 | """Test converting a Node to a dictionary."""
214 | # Create a test node with children and attributes
215 | root = Node("root")
216 | root.obj_id = "root_id"
217 | root.obj_type = "frame"
218 | root.obj_name = "Root Frame"
219 |
220 | child1 = Node("child1", parent=root)
221 | child1.obj_id = "child1_id"
222 | child1.obj_type = "text"
223 | child1.obj_name = "Child 1"
224 |
225 | child2 = Node("child2", parent=root)
226 | child2.obj_id = "child2_id"
227 | child2.obj_type = "frame"
228 | child2.obj_name = "Child 2"
229 | child2.componentRef = "comp1"
230 | child2.componentAnnotation = "Test component"
231 |
232 | # Convert to dictionary
233 | result = convert_node_to_dict(root)
234 |
235 | # Check the result
236 | assert result['id'] == 'root_id'
237 | assert result['type'] == 'frame'
238 | assert result['name'] == 'Root Frame'
239 | assert len(result['children']) == 2
240 |
241 | # Check children
242 | child_ids = [child['id'] for child in result['children']]
243 | assert 'child1_id' in child_ids
244 | assert 'child2_id' in child_ids
245 |
246 | # Check component reference
247 | for child in result['children']:
248 | if child['id'] == 'child2_id':
249 | assert 'componentRef' in child
250 | assert child['componentRef'] == 'comp1'
251 | assert 'componentAnnotation' in child
252 | assert child['componentAnnotation'] == 'Test component'
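    # Taken together, the expected shape is roughly:
    # {'id': 'root_id', 'type': 'frame', 'name': 'Root Frame',
    #  'children': [{'id': 'child1_id', ...},
    #               {'id': 'child2_id', 'componentRef': 'comp1', ...}]}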
253 |
254 |
255 | def test_get_object_subtree(sample_penpot_data):
256 | """Test getting a simplified tree for an object."""
257 | file_data = {'data': sample_penpot_data}
258 |
259 | # Test getting a subtree for an existing object
260 | result = get_object_subtree(file_data, 'obj1')
261 | assert 'error' not in result
262 | assert 'tree' in result
263 | assert result['tree']['id'] == 'obj1'
264 | assert result['tree']['name'] == 'Header'
265 | assert result['page_id'] == 'page1'
266 |
267 | # Test getting a subtree for a non-existent object
268 | result = get_object_subtree(file_data, 'nonexistent')
269 | assert 'error' in result
270 | assert 'not found' in result['error']
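    # Lookups for unknown IDs report failure via an 'error' key rather than
    # raising, so callers can branch on the result instead of catching exceptions.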
271 |
272 |
273 | def test_circular_reference_handling(sample_penpot_data):
274 | """Test handling of circular references in the tree structure."""
275 | # Create a circular reference
276 | sample_penpot_data['pagesIndex']['page1']['objects']['obj6'] = {
277 | 'type': 'frame',
278 | 'name': 'Circular Parent',
279 | 'parentId': 'obj7'
280 | }
281 | sample_penpot_data['pagesIndex']['page1']['objects']['obj7'] = {
282 | 'type': 'frame',
283 | 'name': 'Circular Child',
284 | 'parentId': 'obj6'
285 | }
286 |
287 | # Build tree with circular reference
288 | tree = build_tree(sample_penpot_data)
289 |
290 | # The tree should be built without errors
291 | # Check that the circular reference objects are attached to the page
292 | page_node = None
293 | for child in tree.children:
294 | if "(page)" in child.name and "Home Page" in child.name:
295 | page_node = child
296 | break
297 |
298 | assert page_node is not None
299 |
300 | # Find the circular reference objects
301 | circular_nodes = []
302 |     for _pre, _fill, node in RenderTree(page_node):
303 |         if hasattr(node, 'obj_id') and node.obj_id in ['obj6', 'obj7']:
304 |             circular_nodes.append(node)
305 |
306 |     # Check that the cycle was broken and both objects still landed under the page
307 | assert len(circular_nodes) == 2
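    # In other words, build_tree breaks the obj6 <-> obj7 cycle instead of
    # recursing forever, and both nodes still end up under the page subtree.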
308 |
309 |
310 | def test_get_object_subtree_with_fields(sample_penpot_data):
311 | """Test getting a filtered subtree for an object with specific fields."""
312 | file_data = {'data': sample_penpot_data}
313 |
314 | # Test with no field filtering (include all fields)
315 | result = get_object_subtree_with_fields(file_data, 'obj1')
316 | assert 'error' not in result
317 | assert 'tree' in result
318 | assert result['tree']['id'] == 'obj1'
319 | assert result['tree']['name'] == 'Header'
320 | assert result['tree']['type'] == 'frame'
321 | assert 'parentId' in result['tree']
322 | assert len(result['tree']['children']) == 2
323 |
324 | # Test with field filtering
325 | result = get_object_subtree_with_fields(file_data, 'obj1', include_fields=['name', 'type'])
326 | assert 'error' not in result
327 | assert 'tree' in result
328 | assert result['tree']['id'] == 'obj1' # id is always included
329 | assert result['tree']['name'] == 'Header'
330 | assert result['tree']['type'] == 'frame'
331 | assert 'parentId' not in result['tree'] # should be filtered out
332 | assert len(result['tree']['children']) == 2
333 |
334 | # Test with depth limiting (depth=0, only the object itself)
335 | result = get_object_subtree_with_fields(file_data, 'obj1', depth=0)
336 | assert 'error' not in result
337 | assert 'tree' in result
338 | assert result['tree']['id'] == 'obj1'
339 | assert 'children' not in result['tree'] # No children at depth 0
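    # Depth semantics exercised here and in the deep-hierarchy test below:
    # depth=0 returns only the object itself, depth=N includes N levels of
    # children, and omitting depth returns the full subtree.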
340 |
341 | # Test for an object that doesn't exist
342 | result = get_object_subtree_with_fields(file_data, 'nonexistent')
343 | assert 'error' in result
344 | assert 'not found' in result['error']
345 |
346 | def test_get_object_subtree_with_fields_deep_hierarchy():
347 | """Test getting a filtered subtree for an object with multiple levels of nesting."""
348 | # Create a more complex nested structure for testing depth parameter
349 | file_data = {
350 | 'data': {
351 | 'components': {
352 | 'comp1': {
353 | 'id': 'comp1',
354 | 'name': 'Button',
355 | 'path': '/Components/Button',
356 | 'modifiedAt': '2023-01-01T12:00:00Z',
357 | 'mainInstanceId': 'main-button-instance',
358 | 'mainInstancePage': 'page1',
359 | 'annotation': 'Primary button'
360 | },
361 | 'comp2': {
362 | 'id': 'comp2',
363 | 'name': 'Card',
364 | 'path': '/Components/Card',
365 | 'modifiedAt': '2023-01-02T12:00:00Z',
366 | 'mainInstanceId': 'main-card-instance',
367 | 'mainInstancePage': 'page1',
368 | 'annotation': 'Content card'
369 | }
370 | },
371 | 'colors': {
372 | 'color1': {
373 | 'path': '/Colors/Primary',
374 | 'color': '#3366FF',
375 | 'name': 'Primary Blue',
376 | 'modifiedAt': '2023-01-01T10:00:00Z',
377 | 'opacity': 1,
378 | 'id': 'color1'
379 | },
380 | 'color2': {
381 | 'path': '/Colors/Secondary',
382 | 'color': '#FF6633',
383 | 'name': 'Secondary Orange',
384 | 'modifiedAt': '2023-01-01T10:30:00Z',
385 | 'opacity': 1,
386 | 'id': 'color2'
387 | }
388 | },
389 | 'typographies': {
390 | 'typo1': {
391 | 'lineHeight': '1.5',
392 | 'path': '/Typography/Heading',
393 | 'fontStyle': 'normal',
394 | 'textTransform': 'none',
395 | 'fontId': 'font1',
396 | 'fontSize': '24px',
397 | 'fontWeight': '600',
398 | 'name': 'Heading',
399 | 'modifiedAt': '2023-01-01T11:00:00Z',
400 | 'fontVariantId': 'var1',
401 | 'id': 'typo1',
402 | 'letterSpacing': '0',
403 | 'fontFamily': 'Inter'
404 | }
405 | },
406 | 'pagesIndex': {
407 | 'page1': {
408 | 'id': 'page1',
409 | 'name': 'Complex Page',
410 | 'options': {
411 | 'background': '#FFFFFF',
412 | 'grids': []
413 | },
414 | 'objects': {
415 | # Root frame (level 0)
416 | '00000000-0000-0000-0000-000000000000': {
417 | 'id': '00000000-0000-0000-0000-000000000000',
418 | 'type': 'frame',
419 | 'name': 'Root Frame',
420 | 'width': 1920,
421 | 'height': 1080,
422 | 'x': 0,
423 | 'y': 0,
424 | 'rotation': 0,
425 | 'selrect': {
426 | 'x': 0,
427 | 'y': 0,
428 | 'width': 1920,
429 | 'height': 1080,
430 | 'x1': 0,
431 | 'y1': 0,
432 | 'x2': 1920,
433 | 'y2': 1080
434 | },
435 | 'fills': [
436 | {
437 | 'fillColor': '#FFFFFF',
438 | 'fillOpacity': 1
439 | }
440 | ],
441 | 'layout': 'flex',
442 | 'layoutFlexDir': 'column',
443 | 'layoutAlignItems': 'center',
444 | 'layoutJustifyContent': 'start'
445 | },
446 | # Main container (level 1)
447 | 'main-container': {
448 | 'id': 'main-container',
449 | 'type': 'frame',
450 | 'name': 'Main Container',
451 | 'parentId': '00000000-0000-0000-0000-000000000000',
452 | 'width': 1200,
453 | 'height': 800,
454 | 'x': 360,
455 | 'y': 140,
456 | 'rotation': 0,
457 | 'selrect': {
458 | 'x': 360,
459 | 'y': 140,
460 | 'width': 1200,
461 | 'height': 800,
462 | 'x1': 360,
463 | 'y1': 140,
464 | 'x2': 1560,
465 | 'y2': 940
466 | },
467 | 'fills': [
468 | {
469 | 'fillColor': '#F5F5F5',
470 | 'fillOpacity': 1
471 | }
472 | ],
473 | 'strokes': [
474 | {
475 | 'strokeStyle': 'solid',
476 | 'strokeAlignment': 'center',
477 | 'strokeWidth': 1,
478 | 'strokeColor': '#E0E0E0',
479 | 'strokeOpacity': 1
480 | }
481 | ],
482 | 'layout': 'flex',
483 | 'layoutFlexDir': 'column',
484 | 'layoutAlignItems': 'stretch',
485 | 'layoutJustifyContent': 'start',
486 | 'layoutGap': {
487 | 'row-gap': '0px',
488 | 'column-gap': '0px'
489 | },
490 | 'layoutPadding': {
491 | 'padding-top': '0px',
492 | 'padding-right': '0px',
493 | 'padding-bottom': '0px',
494 | 'padding-left': '0px'
495 | },
496 | 'constraintsH': 'center',
497 | 'constraintsV': 'center'
498 | },
499 | # Header section (level 2)
500 | 'header-section': {
501 | 'id': 'header-section',
502 | 'type': 'frame',
503 | 'name': 'Header Section',
504 | 'parentId': 'main-container',
505 | 'width': 1200,
506 | 'height': 100,
507 | 'x': 0,
508 | 'y': 0,
509 | 'rotation': 0,
510 | 'fills': [
511 | {
512 | 'fillColor': '#FFFFFF',
513 | 'fillOpacity': 1
514 | }
515 | ],
516 | 'strokes': [
517 | {
518 | 'strokeStyle': 'solid',
519 | 'strokeAlignment': 'bottom',
520 | 'strokeWidth': 1,
521 | 'strokeColor': '#EEEEEE',
522 | 'strokeOpacity': 1
523 | }
524 | ],
525 | 'layout': 'flex',
526 | 'layoutFlexDir': 'row',
527 | 'layoutAlignItems': 'center',
528 | 'layoutJustifyContent': 'space-between',
529 | 'layoutPadding': {
530 | 'padding-top': '20px',
531 | 'padding-right': '30px',
532 | 'padding-bottom': '20px',
533 | 'padding-left': '30px'
534 | },
535 | 'constraintsH': 'stretch',
536 | 'constraintsV': 'top'
537 | },
538 | # Logo in header (level 3)
539 | 'logo': {
540 | 'id': 'logo',
541 | 'type': 'frame',
542 | 'name': 'Logo',
543 | 'parentId': 'header-section',
544 | 'width': 60,
545 | 'height': 60,
546 | 'x': 30,
547 | 'y': 20,
548 | 'rotation': 0,
549 | 'fills': [
550 | {
551 | 'fillColor': '#3366FF',
552 | 'fillOpacity': 1
553 | }
554 | ],
555 | 'r1': 8,
556 | 'r2': 8,
557 | 'r3': 8,
558 | 'r4': 8,
559 | 'constraintsH': 'left',
560 | 'constraintsV': 'center'
561 | },
562 | # Navigation menu (level 3)
563 | 'nav-menu': {
564 | 'id': 'nav-menu',
565 | 'type': 'frame',
566 | 'name': 'Navigation Menu',
567 | 'parentId': 'header-section',
568 | 'width': 600,
569 | 'height': 60,
570 | 'x': 300,
571 | 'y': 20,
572 | 'rotation': 0,
573 | 'layout': 'flex',
574 | 'layoutFlexDir': 'row',
575 | 'layoutAlignItems': 'center',
576 | 'layoutJustifyContent': 'center',
577 | 'layoutGap': {
578 | 'row-gap': '0px',
579 | 'column-gap': '20px'
580 | },
581 | 'constraintsH': 'center',
582 | 'constraintsV': 'center'
583 | },
584 | # Menu items (level 4)
585 | 'menu-item-1': {
586 | 'id': 'menu-item-1',
587 | 'type': 'text',
588 | 'name': 'Home',
589 | 'parentId': 'nav-menu',
590 | 'width': 100,
591 | 'height': 40,
592 | 'x': 0,
593 | 'y': 10,
594 | 'rotation': 0,
595 | 'content': {
596 | 'type': 'root',
597 | 'children': [
598 | {
599 | 'type': 'paragraph',
600 | 'children': [
601 | {
602 | 'type': 'text',
603 | 'text': 'Home'
604 | }
605 | ]
606 | }
607 | ]
608 | },
609 | 'fills': [
610 | {
611 | 'fillColor': '#333333',
612 | 'fillOpacity': 1
613 | }
614 | ],
615 | 'appliedTokens': {
616 | 'typography': 'typo1'
617 | },
618 | 'constraintsH': 'start',
619 | 'constraintsV': 'center'
620 | },
621 | 'menu-item-2': {
622 | 'id': 'menu-item-2',
623 | 'type': 'text',
624 | 'name': 'Products',
625 | 'parentId': 'nav-menu',
626 | 'width': 100,
627 | 'height': 40,
628 | 'x': 120,
629 | 'y': 10,
630 | 'rotation': 0,
631 | 'content': {
632 | 'type': 'root',
633 | 'children': [
634 | {
635 | 'type': 'paragraph',
636 | 'children': [
637 | {
638 | 'type': 'text',
639 | 'text': 'Products'
640 | }
641 | ]
642 | }
643 | ]
644 | },
645 | 'fills': [
646 | {
647 | 'fillColor': '#333333',
648 | 'fillOpacity': 1
649 | }
650 | ]
651 | },
652 | 'menu-item-3': {
653 | 'id': 'menu-item-3',
654 | 'type': 'text',
655 | 'name': 'About',
656 | 'parentId': 'nav-menu',
657 | 'width': 100,
658 | 'height': 40,
659 | 'x': 240,
660 | 'y': 10,
661 | 'rotation': 0,
662 | 'content': {
663 | 'type': 'root',
664 | 'children': [
665 | {
666 | 'type': 'paragraph',
667 | 'children': [
668 | {
669 | 'type': 'text',
670 | 'text': 'About'
671 | }
672 | ]
673 | }
674 | ]
675 | },
676 | 'fills': [
677 | {
678 | 'fillColor': '#333333',
679 | 'fillOpacity': 1
680 | }
681 | ]
682 | },
683 | # Content section (level 2)
684 | 'content-section': {
685 | 'id': 'content-section',
686 | 'type': 'frame',
687 | 'name': 'Content Section',
688 | 'parentId': 'main-container',
689 | 'width': 1200,
690 | 'height': 700,
691 | 'x': 0,
692 | 'y': 100,
693 | 'rotation': 0,
694 | 'layout': 'flex',
695 | 'layoutFlexDir': 'column',
696 | 'layoutAlignItems': 'stretch',
697 | 'layoutJustifyContent': 'start',
698 | 'layoutGap': {
699 | 'row-gap': '0px',
700 | 'column-gap': '0px'
701 | },
702 | 'constraintsH': 'stretch',
703 | 'constraintsV': 'top'
704 | },
705 | # Hero (level 3)
706 | 'hero': {
707 | 'id': 'hero',
708 | 'type': 'frame',
709 | 'name': 'Hero Section',
710 | 'parentId': 'content-section',
711 | 'width': 1200,
712 | 'height': 400,
713 | 'x': 0,
714 | 'y': 0,
715 | 'rotation': 0,
716 | 'fills': [
717 | {
718 | 'fillColor': '#F0F7FF',
719 | 'fillOpacity': 1
720 | }
721 | ],
722 | 'layout': 'flex',
723 | 'layoutFlexDir': 'column',
724 | 'layoutAlignItems': 'center',
725 | 'layoutJustifyContent': 'center',
726 | 'layoutPadding': {
727 | 'padding-top': '40px',
728 | 'padding-right': '40px',
729 | 'padding-bottom': '40px',
730 | 'padding-left': '40px'
731 | },
732 | 'constraintsH': 'stretch',
733 | 'constraintsV': 'top'
734 | },
735 | # Hero title (level 4)
736 | 'hero-title': {
737 | 'id': 'hero-title',
738 | 'type': 'text',
739 | 'name': 'Welcome Title',
740 | 'parentId': 'hero',
741 | 'width': 600,
742 | 'height': 80,
743 | 'x': 300,
744 | 'y': 140,
745 | 'rotation': 0,
746 | 'content': {
747 | 'type': 'root',
748 | 'children': [
749 | {
750 | 'type': 'paragraph',
751 | 'children': [
752 | {
753 | 'type': 'text',
754 | 'text': 'Welcome to our Platform'
755 | }
756 | ]
757 | }
758 | ]
759 | },
760 | 'fills': [
761 | {
762 | 'fillColor': '#333333',
763 | 'fillOpacity': 1
764 | }
765 | ],
766 | 'appliedTokens': {
767 | 'typography': 'typo1'
768 | },
769 | 'constraintsH': 'center',
770 | 'constraintsV': 'center'
771 | },
772 | # Cards container (level 3)
773 | 'cards-container': {
774 | 'id': 'cards-container',
775 | 'type': 'frame',
776 | 'name': 'Cards Container',
777 | 'parentId': 'content-section',
778 | 'width': 1200,
779 | 'height': 300,
780 | 'x': 0,
781 | 'y': 400,
782 | 'rotation': 0,
783 | 'layout': 'flex',
784 | 'layoutFlexDir': 'row',
785 | 'layoutAlignItems': 'center',
786 | 'layoutJustifyContent': 'space-around',
787 | 'layoutPadding': {
788 | 'padding-top': '25px',
789 | 'padding-right': '25px',
790 | 'padding-bottom': '25px',
791 | 'padding-left': '25px'
792 | },
793 | 'constraintsH': 'stretch',
794 | 'constraintsV': 'top'
795 | },
796 | # Card instances (level 4)
797 | 'card-1': {
798 | 'id': 'card-1',
799 | 'type': 'frame',
800 | 'name': 'Card 1',
801 | 'parentId': 'cards-container',
802 | 'width': 300,
803 | 'height': 250,
804 | 'x': 50,
805 | 'y': 25,
806 | 'rotation': 0,
807 | 'componentId': 'comp2',
808 | 'fills': [
809 | {
810 | 'fillColor': '#FFFFFF',
811 | 'fillOpacity': 1
812 | }
813 | ],
814 | 'strokes': [
815 | {
816 | 'strokeStyle': 'solid',
817 | 'strokeAlignment': 'center',
818 | 'strokeWidth': 1,
819 | 'strokeColor': '#EEEEEE',
820 | 'strokeOpacity': 1
821 | }
822 | ],
823 | 'r1': 8,
824 | 'r2': 8,
825 | 'r3': 8,
826 | 'r4': 8,
827 | 'layout': 'flex',
828 | 'layoutFlexDir': 'column',
829 | 'layoutAlignItems': 'center',
830 | 'layoutJustifyContent': 'start',
831 | 'layoutPadding': {
832 | 'padding-top': '20px',
833 | 'padding-right': '20px',
834 | 'padding-bottom': '20px',
835 | 'padding-left': '20px'
836 | },
837 | 'constraintsH': 'center',
838 | 'constraintsV': 'center'
839 | },
840 | 'card-2': {
841 | 'id': 'card-2',
842 | 'type': 'frame',
843 | 'name': 'Card 2',
844 | 'parentId': 'cards-container',
845 | 'width': 300,
846 | 'height': 250,
847 | 'x': 450,
848 | 'y': 25,
849 | 'rotation': 0,
850 | 'componentId': 'comp2',
851 | 'fills': [
852 | {
853 | 'fillColor': '#FFFFFF',
854 | 'fillOpacity': 1
855 | }
856 | ],
857 | 'strokes': [
858 | {
859 | 'strokeStyle': 'solid',
860 | 'strokeAlignment': 'center',
861 | 'strokeWidth': 1,
862 | 'strokeColor': '#EEEEEE',
863 | 'strokeOpacity': 1
864 | }
865 | ],
866 | 'r1': 8,
867 | 'r2': 8,
868 | 'r3': 8,
869 | 'r4': 8
870 | },
871 | 'card-3': {
872 | 'id': 'card-3',
873 | 'type': 'frame',
874 | 'name': 'Card 3',
875 | 'parentId': 'cards-container',
876 | 'width': 300,
877 | 'height': 250,
878 | 'x': 850,
879 | 'y': 25,
880 | 'rotation': 0,
881 | 'componentId': 'comp2',
882 | 'fills': [
883 | {
884 | 'fillColor': '#FFFFFF',
885 | 'fillOpacity': 1
886 | }
887 | ],
888 | 'strokes': [
889 | {
890 | 'strokeStyle': 'solid',
891 | 'strokeAlignment': 'center',
892 | 'strokeWidth': 1,
893 | 'strokeColor': '#EEEEEE',
894 | 'strokeOpacity': 1
895 | }
896 | ],
897 | 'r1': 8,
898 | 'r2': 8,
899 | 'r3': 8,
900 | 'r4': 8
901 | }
902 | }
903 | }
904 | },
905 | 'id': 'file1',
906 | 'pages': ['page1'],
907 | 'tokensLib': {
908 | 'sets': {
909 | 'S-colors': {
910 | 'name': 'Colors',
911 | 'description': 'Color tokens',
912 | 'modifiedAt': '2023-01-01T09:00:00Z',
913 | 'tokens': {
914 | 'primary': {
915 | 'name': 'Primary',
916 | 'type': 'color',
917 | 'value': '#3366FF',
918 | 'description': 'Primary color',
919 | 'modifiedAt': '2023-01-01T09:00:00Z'
920 | },
921 | 'secondary': {
922 | 'name': 'Secondary',
923 | 'type': 'color',
924 | 'value': '#FF6633',
925 | 'description': 'Secondary color',
926 | 'modifiedAt': '2023-01-01T09:00:00Z'
927 | }
928 | }
929 | }
930 | },
931 | 'themes': {
932 | 'default': {
933 | 'light': {
934 | 'name': 'Light',
935 | 'group': 'Default',
936 | 'description': 'Light theme',
937 | 'isSource': True,
938 | 'id': 'theme1',
939 | 'modifiedAt': '2023-01-01T09:30:00Z',
940 | 'sets': ['S-colors']
941 | }
942 | }
943 | },
944 | 'activeThemes': ['light']
945 | }
946 | }
947 | }
948 |
949 | # Test 1: Full tree at maximum depth (default)
950 | result = get_object_subtree_with_fields(file_data, 'main-container')
951 | assert 'error' not in result
952 | assert result['tree']['id'] == 'main-container'
953 | assert result['tree']['name'] == 'Main Container'
954 | assert result['tree']['type'] == 'frame'
955 |
956 | # Verify first level children exist (header and content sections)
957 | children_names = [child['name'] for child in result['tree']['children']]
958 | assert 'Header Section' in children_names
959 | assert 'Content Section' in children_names
960 |
961 | # Verify second level children exist (deep nesting)
962 | header_section = next(child for child in result['tree']['children'] if child['name'] == 'Header Section')
963 | logo_in_header = next((child for child in header_section['children'] if child['name'] == 'Logo'), None)
964 | assert logo_in_header is not None
965 |
966 | nav_menu = next((child for child in header_section['children'] if child['name'] == 'Navigation Menu'), None)
967 | assert nav_menu is not None
968 |
969 | # Check if level 4 elements (menu items) exist
970 |     menu_items = list(nav_menu['children'])
971 | assert len(menu_items) == 3
972 | menu_item_names = [item['name'] for item in menu_items]
973 | assert 'Home' in menu_item_names
974 | assert 'Products' in menu_item_names
975 | assert 'About' in menu_item_names
976 |
977 | # Test 2: Depth = 1 (main container and its immediate children only)
978 | result = get_object_subtree_with_fields(file_data, 'main-container', depth=1)
979 | assert 'error' not in result
980 | assert result['tree']['id'] == 'main-container'
981 | assert 'children' in result['tree']
982 |
983 | # Should have header and content sections but no deeper elements
984 | children_names = [child['name'] for child in result['tree']['children']]
985 | assert 'Header Section' in children_names
986 | assert 'Content Section' in children_names
987 |
988 | # Verify no grandchildren are included
989 | header_section = next(child for child in result['tree']['children'] if child['name'] == 'Header Section')
990 | assert 'children' not in header_section
991 |
992 | # Test 3: Depth = 2 (main container, its children, and grandchildren)
993 | result = get_object_subtree_with_fields(file_data, 'main-container', depth=2)
994 | assert 'error' not in result
995 |
996 | # Should have header and content sections
997 | header_section = next(child for child in result['tree']['children'] if child['name'] == 'Header Section')
998 | content_section = next(child for child in result['tree']['children'] if child['name'] == 'Content Section')
999 |
1000 | # Header section should have logo and nav menu but no menu items
1001 | assert 'children' in header_section
1002 | nav_menu = next((child for child in header_section['children'] if child['name'] == 'Navigation Menu'), None)
1003 | assert nav_menu is not None
1004 | assert 'children' not in nav_menu
1005 |
1006 | # Test 4: Field filtering with selective depth
1007 | result = get_object_subtree_with_fields(
1008 | file_data,
1009 | 'main-container',
1010 | include_fields=['name', 'type'],
1011 | depth=2
1012 | )
1013 | assert 'error' not in result
1014 |
1015 | # Main container should have only specified fields plus id
1016 | assert set(result['tree'].keys()) == {'id', 'name', 'type', 'children'}
1017 | assert 'width' not in result['tree']
1018 | assert 'height' not in result['tree']
1019 |
1020 | # Children should also have only the specified fields
1021 | header_section = next(child for child in result['tree']['children'] if child['name'] == 'Header Section')
1022 | assert set(header_section.keys()) == {'id', 'name', 'type', 'children'}
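    # Note that 'id' and 'children' survive field filtering even when not
    # requested, so the returned tree stays navigable.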
1023 |
1024 | # Test 5: Testing component references
1025 | result = get_object_subtree_with_fields(file_data, 'cards-container')
1026 | assert 'error' not in result
1027 |
1028 | # Find the first card
1029 | card = next(child for child in result['tree']['children'] if child['name'] == 'Card 1')
1030 | assert 'componentId' in card
1031 | assert card['componentId'] == 'comp2' # References the Card component
1032 |
1033 | # Test 6: Test layout properties in objects
1034 | result = get_object_subtree_with_fields(file_data, 'main-container', include_fields=['layout', 'layoutFlexDir', 'layoutAlignItems', 'layoutJustifyContent'])
1035 | assert 'error' not in result
1036 | assert result['tree']['layout'] == 'flex'
1037 | assert result['tree']['layoutFlexDir'] == 'column'
1038 | assert result['tree']['layoutAlignItems'] == 'stretch'
1039 | assert result['tree']['layoutJustifyContent'] == 'start'
1040 |
1041 | # Test 7: Test text content structure
1042 | result = get_object_subtree_with_fields(file_data, 'hero-title', include_fields=['content'])
1043 | assert 'error' not in result
1044 | assert result['tree']['content']['type'] == 'root'
1045 | assert len(result['tree']['content']['children']) == 1
1046 | assert result['tree']['content']['children'][0]['type'] == 'paragraph'
1047 | assert result['tree']['content']['children'][0]['children'][0]['text'] == 'Welcome to our Platform'
1048 |
1049 | # Test 8: Test applied tokens
1050 | result = get_object_subtree_with_fields(file_data, 'hero-title', include_fields=['appliedTokens'])
1051 | assert 'error' not in result
1052 | assert 'appliedTokens' in result['tree']
1053 | assert result['tree']['appliedTokens']['typography'] == 'typo1'
1054 |
1055 | def test_get_object_subtree_with_fields_root_frame():
1056 | """Test getting a filtered subtree starting from the root frame."""
1057 | # Use same complex nested structure from the previous test
1058 | file_data = {
1059 | 'data': {
1060 | 'pagesIndex': {
1061 | 'page1': {
1062 | 'name': 'Complex Page',
1063 | 'objects': {
1064 | # Root frame (level 0)
1065 | '00000000-0000-0000-0000-000000000000': {
1066 | 'type': 'frame',
1067 | 'name': 'Root Frame',
1068 | 'width': 1920,
1069 | 'height': 1080
1070 | },
1071 | # Main container (level 1)
1072 | 'main-container': {
1073 | 'type': 'frame',
1074 | 'name': 'Main Container',
1075 | 'parentId': '00000000-0000-0000-0000-000000000000'
1076 | }
1077 | }
1078 | }
1079 | }
1080 | }
1081 | }
1082 |
1083 | # Test getting the root frame
1084 | result = get_object_subtree_with_fields(file_data, '00000000-0000-0000-0000-000000000000')
1085 | assert 'error' not in result
1086 | assert result['tree']['id'] == '00000000-0000-0000-0000-000000000000'
1087 | assert result['tree']['type'] == 'frame'
1088 | assert 'children' in result['tree']
1089 | assert len(result['tree']['children']) == 1
1090 | assert result['tree']['children'][0]['name'] == 'Main Container'
1091 |
1092 |
1093 | def test_get_object_subtree_with_fields_circular_reference():
1094 | """Test handling of circular references in object tree."""
1095 | file_data = {
1096 | 'data': {
1097 | 'pagesIndex': {
1098 | 'page1': {
1099 | 'name': 'Test Page',
1100 | 'objects': {
1101 | # Object A references B as parent
1102 | 'object-a': {
1103 | 'type': 'frame',
1104 | 'name': 'Object A',
1105 | 'parentId': 'object-b'
1106 | },
1107 | # Object B references A as parent (circular)
1108 | 'object-b': {
1109 | 'type': 'frame',
1110 | 'name': 'Object B',
1111 | 'parentId': 'object-a'
1112 | },
1113 | # Object C references itself as parent
1114 | 'object-c': {
1115 | 'type': 'frame',
1116 | 'name': 'Object C',
1117 | 'parentId': 'object-c'
1118 | }
1119 | }
1120 | }
1121 | }
1122 | }
1123 | }
1124 |
1125 | # Test getting object A - should handle circular reference with B
1126 | result = get_object_subtree_with_fields(file_data, 'object-a')
1127 | assert 'error' not in result
1128 | assert result['tree']['id'] == 'object-a'
1129 | assert 'children' in result['tree']
1130 | # Check that object-b appears as a child
1131 | assert len(result['tree']['children']) == 1
1132 | assert result['tree']['children'][0]['id'] == 'object-b'
1133 |     # The circular reference is marked when object-a reappears as a child of object-b
1134 |     assert 'children' in result['tree']['children'][0]
1135 |     assert len(result['tree']['children'][0]['children']) == 1
1136 |     assert result['tree']['children'][0]['children'][0]['id'] == 'object-a'
1137 |     assert result['tree']['children'][0]['children'][0]['_circular_reference'] is True
1138 |
1139 | # Test getting object C - should handle self-reference
1140 | result = get_object_subtree_with_fields(file_data, 'object-c')
1141 | assert 'error' not in result
1142 | assert result['tree']['id'] == 'object-c'
1143 | assert 'children' in result['tree']
1144 | # Check that object-c appears as its own child with circular reference marker
1145 | assert len(result['tree']['children']) == 1
1146 | assert result['tree']['children'][0]['id'] == 'object-c'
1147 |     assert result['tree']['children'][0]['_circular_reference'] is True
```
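For orientation, here is a minimal usage sketch of the helpers these tests exercise. It assumes the functions behave exactly as asserted above; the payload shape mirrors the test fixtures and is not taken from a real Penpot file:

```python
from penpot_mcp.tools.penpot_tree import (
    build_tree,
    get_object_subtree_with_fields,
    print_tree,
)

# Hypothetical file payload in the same shape the fixtures construct.
file_data = {
    'data': {
        'pagesIndex': {
            'page1': {
                'name': 'Demo Page',
                'objects': {
                    'root': {'type': 'frame', 'name': 'Root'},
                    'title': {'type': 'text', 'name': 'Title', 'parentId': 'root'},
                },
            }
        }
    }
}

# build_tree takes the inner 'data' dict; the subtree helpers take the wrapper.
tree = build_tree(file_data['data'])
print_tree(tree, filter_pattern='title')  # highlights matches with "MATCH"

result = get_object_subtree_with_fields(
    file_data, 'root', include_fields=['name', 'type'], depth=1
)
assert result['tree']['children'][0]['name'] == 'Title'
```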