This is page 2 of 2. Use http://codebase.md/montevive/penpot-mcp?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .editorconfig
├── .flake8
├── .github
│ ├── dependabot.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.md
│ │ └── feature_request.md
│ ├── PULL_REQUEST_TEMPLATE.md
│ ├── SETUP_CICD.md
│ └── workflows
│ ├── ci.yml
│ ├── code-quality.yml
│ ├── publish.yml
│ └── version-bump.yml
├── .gitignore
├── .pre-commit-config.yaml
├── .vscode
│ └── launch.json
├── CHANGELOG.md
├── CLAUDE_INTEGRATION.md
├── CLAUDE.md
├── CONTRIBUTING.md
├── env.example
├── fix-lint-deps.sh
├── images
│ └── penpot-mcp.png
├── lint.py
├── LINTING.md
├── Makefile
├── penpot_mcp
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ └── penpot_api.py
│ ├── resources
│ │ ├── penpot-schema.json
│ │ └── penpot-tree-schema.json
│ ├── server
│ │ ├── __init__.py
│ │ ├── client.py
│ │ └── mcp_server.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── cli
│ │ │ ├── __init__.py
│ │ │ ├── tree_cmd.py
│ │ │ └── validate_cmd.py
│ │ └── penpot_tree.py
│ └── utils
│ ├── __init__.py
│ ├── cache.py
│ ├── config.py
│ └── http_server.py
├── pyproject.toml
├── README.md
├── SECURITY.md
├── test_credentials.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── test_cache.py
│ ├── test_config.py
│ ├── test_mcp_server.py
│ └── test_penpot_tree.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/penpot_mcp/api/penpot_api.py:
--------------------------------------------------------------------------------
```python
import argparse
import json
import os
from typing import Any, Dict, List, Optional, Union
import requests
from dotenv import load_dotenv
class CloudFlareError(Exception):
    """Raised when CloudFlare protection blocks a request to the Penpot API.

    Attributes:
        status_code: HTTP status code of the blocked response, if known.
        response_text: Raw body of the blocked response, if available.
    """

    def __init__(self, message: str, status_code: Optional[int] = None,
                 response_text: Optional[str] = None):
        super().__init__(message)
        self.status_code = status_code
        self.response_text = response_text

    def __str__(self):
        # Prefix so logs and callers can immediately identify this failure class.
        return f"CloudFlare Protection Error: {super().__str__()}"
class PenpotAPIError(Exception):
    """General exception for Penpot API errors.

    Attributes:
        status_code: HTTP status code of the failing response, if known.
        response_text: Raw body of the failing response, if available.
        is_cloudflare: True when the failure was identified as a CloudFlare block.
    """

    def __init__(self, message: str, status_code: Optional[int] = None,
                 response_text: Optional[str] = None, is_cloudflare: bool = False):
        super().__init__(message)
        self.status_code = status_code
        self.response_text = response_text
        self.is_cloudflare = is_cloudflare
class PenpotAPI:
    """HTTP client for the Penpot API.

    Supports both token-header and cookie-based ("auth-token") authentication
    and both plain JSON and Transit+JSON request formats.
    """

    def __init__(
            self,
            base_url: Optional[str] = None,
            debug: bool = False,
            email: Optional[str] = None,
            password: Optional[str] = None):
        # Load environment variables if not already loaded
        load_dotenv()
        # Use base_url from parameters if provided, otherwise from environment,
        # fallback to default URL
        self.base_url = base_url or os.getenv("PENPOT_API_URL", "https://design.penpot.app/api")
        self.session = requests.Session()
        self.access_token = None  # populated after a successful login
        self.debug = debug
        self.email = email or os.getenv("PENPOT_USERNAME")
        self.password = password or os.getenv("PENPOT_PASSWORD")
        self.profile_id = None  # extracted during login_for_export
        # Set default headers - we'll use different headers at request time
        # based on the required content type (JSON vs Transit+JSON)
        self.session.headers.update({
            "Accept": "application/json, application/transit+json",
            "Content-Type": "application/json",
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        })
def _is_cloudflare_error(self, response: requests.Response) -> bool:
"""Check if the response indicates a CloudFlare error."""
# Check for CloudFlare-specific indicators
cloudflare_indicators = [
'cloudflare',
'cf-ray',
'attention required',
'checking your browser',
'challenge',
'ddos protection',
'security check',
'cf-browser-verification',
'cf-challenge-running',
'please wait while we are checking your browser',
'enable cookies and reload the page',
'this process is automatic'
]
# Check response headers for CloudFlare
server_header = response.headers.get('server', '').lower()
cf_ray = response.headers.get('cf-ray')
if 'cloudflare' in server_header or cf_ray:
return True
# Check response content for CloudFlare indicators
try:
response_text = response.text.lower()
for indicator in cloudflare_indicators:
if indicator in response_text:
return True
except:
# If we can't read the response text, don't assume it's CloudFlare
pass
# Check for specific status codes that might indicate CloudFlare blocks
if response.status_code in [403, 429, 503]:
# Additional check for CloudFlare-specific error pages
try:
response_text = response.text.lower()
if any(['cloudflare' in response_text, 'cf-ray' in response_text, 'attention required' in response_text]):
return True
except:
pass
return False
def _create_cloudflare_error_message(self, response: requests.Response) -> str:
"""Create a user-friendly CloudFlare error message."""
base_message = (
"CloudFlare protection has blocked this request. This is common on penpot.app. "
"To resolve this issue:\\n\\n"
"1. Open your web browser and navigate to https://design.penpot.app\\n"
"2. Log in to your Penpot account\\n"
"3. Complete any CloudFlare human verification challenges if prompted\\n"
"4. Once verified, try your request again\\n\\n"
"The verification typically lasts for a period of time, after which you may need to repeat the process."
)
if response.status_code:
return f"{base_message}\\n\\nHTTP Status: {response.status_code}"
return base_message
def set_access_token(self, token: str):
    """Remember *token* and attach it to the session for both auth styles.

    Cookie-based endpoints read the "auth-token" cookie, while header-based
    ones read a token-style Authorization header; set both so either works.
    """
    self.access_token = token
    self.session.cookies.set("auth-token", token)
    self.session.headers.update({"Authorization": f"Token {token}"})
def login_with_password(
        self,
        email: Optional[str] = None,
        password: Optional[str] = None) -> str:
    """
    Login with email and password to get an auth token.

    This method uses the same cookie-based auth approach as the export methods.

    Args:
        email: Email for Penpot account (if None, will use stored email or PENPOT_USERNAME env var)
        password: Password for Penpot account (if None, will use stored password or PENPOT_PASSWORD env var)

    Returns:
        Auth token for API calls
    """
    # Use the export authentication which also extracts profile ID
    token = self.login_for_export(email, password)
    self.set_access_token(token)
    # Profile ID is now extracted during login_for_export, no need to call get_profile
    if self.debug and self.profile_id:
        print(f"\nProfile ID available: {self.profile_id}")
    return token
def get_profile(self) -> Dict[str, Any]:
    """
    Get profile information for the current authenticated user.

    Also caches the profile ID on the instance for later export calls.

    Returns:
        Dictionary containing profile information, including the profile ID
    """
    url = f"{self.base_url}/rpc/command/get-profile"
    payload = {}  # No parameters needed
    # Plain JSON is sufficient for this endpoint; no Transit conversion.
    response = self._make_authenticated_request('post', url, json=payload, use_transit=False)
    # Parse and normalize the response
    data = response.json()
    normalized_data = self._normalize_transit_response(data)
    if self.debug:
        print("\nProfile data retrieved:")
        print(json.dumps(normalized_data, indent=2)[:200] + "...")
    # Store profile ID for later use
    if 'id' in normalized_data:
        self.profile_id = normalized_data['id']
        if self.debug:
            print(f"\nStored profile ID: {self.profile_id}")
    return normalized_data
def login_for_export(self, email: Optional[str] = None, password: Optional[str] = None) -> str:
    """
    Login with email and password to get an auth token for export operations.

    This is required for export operations which use a different authentication
    mechanism than the standard API access token.

    Args:
        email: Email for Penpot account (if None, will use stored email or PENPOT_USERNAME env var)
        password: Password for Penpot account (if None, will use stored password or PENPOT_PASSWORD env var)

    Returns:
        Auth token extracted from cookies

    Raises:
        ValueError: If credentials are missing or no auth token can be found.
    """
    # Use parameters if provided, else use instance variables, else check environment variables
    email = email or self.email or os.getenv("PENPOT_USERNAME")
    password = password or self.password or os.getenv("PENPOT_PASSWORD")
    if not email or not password:
        raise ValueError(
            "Email and password are required for export authentication. "
            "Please provide them as parameters or set PENPOT_USERNAME and "
            "PENPOT_PASSWORD environment variables."
        )
    url = f"{self.base_url}/rpc/command/login-with-password"
    # Use Transit+JSON format ("~:" marks Transit keyword keys)
    payload = {
        "~:email": email,
        "~:password": password
    }
    if self.debug:
        print("\nLogin request payload (Transit+JSON format):")
        # Redact the password before echoing the payload.
        print(json.dumps(payload, indent=2).replace(password, "********"))
    # Create a new session just for this request
    login_session = requests.Session()
    # Set headers
    headers = {
        "Content-Type": "application/transit+json",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    }
    response = login_session.post(url, json=payload, headers=headers)
    if self.debug and response.status_code != 200:
        print(f"\nError response: {response.status_code}")
        print(f"Response text: {response.text}")
    response.raise_for_status()
    # Extract profile ID from response (best-effort; failures are non-fatal)
    try:
        # The response is in Transit+JSON array format
        data = response.json()
        if isinstance(data, list):
            # Convert Transit array (alternating key/value entries) to dict
            transit_dict = {}
            i = 1  # Skip the "^ " marker
            while i < len(data) - 1:
                key = data[i]
                value = data[i + 1]
                transit_dict[key] = value
                i += 2
            # Extract profile ID
            if "~:id" in transit_dict:
                profile_id = transit_dict["~:id"]
                # Remove the ~u prefix for UUID
                if isinstance(profile_id, str) and profile_id.startswith("~u"):
                    profile_id = profile_id[2:]
                self.profile_id = profile_id
                if self.debug:
                    print(f"\nExtracted profile ID from login response: {profile_id}")
    except Exception as e:
        if self.debug:
            print(f"\nCouldn't extract profile ID from response: {e}")
    # Also try to extract profile ID from auth-data cookie
    if not self.profile_id:
        for cookie in login_session.cookies:
            if cookie.name == "auth-data":
                # Cookie value is like: "profile-id=7ae66c33-6ede-81e2-8006-6a1b4dce3d2b"
                if "profile-id=" in cookie.value:
                    profile_id = cookie.value.split("profile-id=")[1].split(";")[0].strip('"')
                    self.profile_id = profile_id
                    if self.debug:
                        print(f"\nExtracted profile ID from auth-data cookie: {profile_id}")
                break
    # Extract auth token from cookies
    if 'Set-Cookie' in response.headers:
        if self.debug:
            print("\nSet-Cookie header found")
        for cookie in login_session.cookies:
            if cookie.name == "auth-token":
                if self.debug:
                    print(f"\nAuth token extracted from cookies: {cookie.value[:10]}...")
                return cookie.value
        raise ValueError("Auth token not found in response cookies")
    else:
        # Try to extract from response JSON if available
        try:
            data = response.json()
            if 'auth-token' in data:
                return data['auth-token']
        except Exception:
            pass
        # If we reached here, we couldn't find the token
        raise ValueError("Auth token not found in response cookies or JSON body")
def _make_authenticated_request(self, method: str, url: str, retry_auth: bool = True, **kwargs) -> requests.Response:
    """
    Make an authenticated request, handling re-auth if needed.

    This internal method handles lazy authentication when a request
    fails due to authentication issues, using the same cookie-based
    approach as the export methods.

    Args:
        method: HTTP method (post, get, etc.)
        url: URL to make the request to
        retry_auth: If True, re-login once and retry on a 401/403 response
        **kwargs: Additional arguments to pass to requests
            (also accepts use_transit=True/False to control payload format)

    Returns:
        The response object

    Raises:
        CloudFlareError: If the response looks like a CloudFlare block page.
        requests.HTTPError: For other non-2xx responses.
    """
    # If we don't have a token yet but have credentials, login first
    if not self.access_token and self.email and self.password:
        if self.debug:
            print("\nNo access token set, logging in with credentials...")
        self.login_with_password()
    # Set up headers (pop them out of kwargs so they aren't passed twice)
    headers = kwargs.get('headers', {})
    if 'headers' in kwargs:
        del kwargs['headers']
    # Use Transit+JSON format for API calls (required by Penpot)
    use_transit = kwargs.pop('use_transit', True)
    if use_transit:
        headers['Content-Type'] = 'application/transit+json'
        headers['Accept'] = 'application/transit+json'
        # Convert payload to Transit+JSON format if present
        if 'json' in kwargs and kwargs['json']:
            payload = kwargs['json']
            # Only transform if not already in Transit format
            if not any(isinstance(k, str) and k.startswith('~:') for k in payload.keys()):
                transit_payload = {}
                # Add cmd if not present
                if 'cmd' not in payload and '~:cmd' not in payload:
                    # Extract command from URL
                    cmd = url.split('/')[-1]
                    transit_payload['~:cmd'] = f"~:{cmd}"
                # Convert standard JSON to Transit+JSON format
                for key, value in payload.items():
                    # Skip command if already added
                    if key == 'cmd':
                        continue
                    transit_key = f"~:{key}" if not key.startswith('~:') else key
                    # Handle special UUID conversion for IDs
                    # (heuristic: long dashed strings are treated as UUIDs)
                    if isinstance(value, str) and ('-' in value) and len(value) > 30:
                        transit_value = f"~u{value}"
                    else:
                        transit_value = value
                    transit_payload[transit_key] = transit_value
                if self.debug:
                    print("\nConverted payload to Transit+JSON format:")
                    print(f"Original: {payload}")
                    print(f"Transit: {transit_payload}")
                kwargs['json'] = transit_payload
    else:
        headers['Content-Type'] = 'application/json'
        headers['Accept'] = 'application/json'
    # Ensure the Authorization header is set if we have a token
    if self.access_token:
        headers['Authorization'] = f"Token {self.access_token}"
    # Combine with session headers (request headers win on conflict)
    combined_headers = {**self.session.headers, **headers}
    # Make the request
    try:
        response = getattr(self.session, method)(url, headers=combined_headers, **kwargs)
        if self.debug:
            print(f"\nRequest to: {url}")
            print(f"Method: {method}")
            print(f"Headers: {combined_headers}")
            if 'json' in kwargs:
                print(f"Payload: {json.dumps(kwargs['json'], indent=2)}")
            print(f"Response status: {response.status_code}")
        response.raise_for_status()
        return response
    except requests.HTTPError as e:
        # Check for CloudFlare errors first
        if self._is_cloudflare_error(e.response):
            cloudflare_message = self._create_cloudflare_error_message(e.response)
            raise CloudFlareError(cloudflare_message, e.response.status_code, e.response.text)
        # Handle authentication errors
        if e.response.status_code in (401, 403) and self.email and self.password and retry_auth:
            # Special case: don't retry auth for get-profile to avoid infinite loops
            if url.endswith('/get-profile'):
                raise
            if self.debug:
                print("\nAuthentication failed. Trying to re-login...")
            # Re-login and update token
            self.login_with_password()
            # Update headers with new token
            headers['Authorization'] = f"Token {self.access_token}"
            combined_headers = {**self.session.headers, **headers}
            # Retry the request with the new token (but don't retry auth again)
            response = getattr(self.session, method)(url, headers=combined_headers, **kwargs)
            response.raise_for_status()
            return response
        else:
            # Re-raise other errors
            raise
    except requests.RequestException as e:
        # Handle other request exceptions (connection errors, timeouts, etc.)
        # Check if we have a response to analyze
        if hasattr(e, 'response') and e.response is not None:
            if self._is_cloudflare_error(e.response):
                cloudflare_message = self._create_cloudflare_error_message(e.response)
                raise CloudFlareError(cloudflare_message, e.response.status_code, e.response.text)
        # Re-raise if not a CloudFlare error
        raise
def _normalize_transit_response(self, data: Union[Dict, List, Any]) -> Union[Dict, List, Any]:
"""
Normalize a Transit+JSON response to a more usable format.
This recursively processes the response data, handling special Transit types
like UUIDs, keywords, and nested structures.
Args:
data: The data to normalize, can be a dict, list, or other value
Returns:
Normalized data
"""
if isinstance(data, dict):
# Normalize dictionary
result = {}
for key, value in data.items():
# Convert transit keywords in keys (~:key -> key)
norm_key = key.replace(
'~:', '') if isinstance(
key, str) and key.startswith('~:') else key
# Recursively normalize values
result[norm_key] = self._normalize_transit_response(value)
return result
elif isinstance(data, list):
# Normalize list items
return [self._normalize_transit_response(item) for item in data]
elif isinstance(data, str) and data.startswith('~u'):
# Convert Transit UUIDs (~u123-456 -> 123-456)
return data[2:]
else:
# Return other types as-is
return data
def list_projects(self) -> Dict[str, Any]:
    """
    List all available projects for the authenticated user.

    Returns:
        Dictionary containing project information
    """
    url = f"{self.base_url}/rpc/command/get-all-projects"
    payload = {}  # No parameters required
    response = self._make_authenticated_request('post', url, json=payload, use_transit=False)
    if self.debug:
        content_type = response.headers.get('Content-Type', '')
        print(f"\nResponse content type: {content_type}")
        print(f"Response preview: {response.text[:100]}...")
    # Parse JSON
    data = response.json()
    if self.debug:
        print("\nData preview:")
        print(json.dumps(data, indent=2)[:200] + "...")
    return data
def get_project(self, project_id: str) -> Optional[Dict[str, Any]]:
    """Return the project whose id matches *project_id*, or None.

    The API exposes no single-project endpoint here, so the full project
    listing is fetched and filtered client-side.

    Args:
        project_id: The ID of the project to retrieve.

    Returns:
        The matching project dictionary, or None when no project matches.
    """
    matches = (p for p in self.list_projects() if p.get('id') == project_id)
    return next(matches, None)
def get_project_files(self, project_id: str) -> List[Dict[str, Any]]:
    """
    Get all files for a specific project.

    Args:
        project_id: The ID of the project

    Returns:
        List of file information dictionaries
    """
    url = f"{self.base_url}/rpc/command/get-project-files"
    payload = {
        "project-id": project_id
    }
    response = self._make_authenticated_request('post', url, json=payload, use_transit=False)
    # Parse JSON
    files = response.json()
    return files
def get_file(self, file_id: str, save_data: bool = False,
             save_raw_response: bool = False) -> Dict[str, Any]:
    """
    Get details for a specific file.

    Args:
        file_id: The ID of the file to retrieve
        save_data: Whether to write the parsed data to "<file_id>.json"
        save_raw_response: Whether to write the raw response body to
            "<file_id>_raw_response.json"

    Returns:
        Dictionary containing file information
    """
    url = f"{self.base_url}/rpc/command/get-file"
    payload = {
        "id": file_id,
    }
    response = self._make_authenticated_request('post', url, json=payload, use_transit=False)

    # Save raw response if requested
    if save_raw_response:
        raw_filename = f"{file_id}_raw_response.json"
        with open(raw_filename, 'w') as f:
            f.write(response.text)
        if self.debug:
            print(f"\nSaved raw response to {raw_filename}")

    # Parse JSON
    data = response.json()

    # Save normalized data if requested
    if save_data:
        filename = f"{file_id}.json"
        with open(filename, 'w') as f:
            json.dump(data, f, indent=2)
        if self.debug:
            # Fix: report the path actually written instead of a placeholder.
            print(f"\nSaved file data to {filename}")

    return data
def create_export(self, file_id: str, page_id: str, object_id: str,
                  export_type: str = "png", scale: int = 1,
                  email: Optional[str] = None, password: Optional[str] = None,
                  profile_id: Optional[str] = None):
    """
    Create an export job for a Penpot object.

    Args:
        file_id: The file ID
        page_id: The page ID
        object_id: The object ID to export
        export_type: Type of export (png, svg, pdf)
        scale: Scale factor for the export
        email: Email for authentication (if different from instance)
        password: Password for authentication (if different from instance)
        profile_id: Optional profile ID (if not provided, will be fetched automatically)

    Returns:
        Export resource ID

    Raises:
        ValueError: If no profile ID is available or the response lacks a resource ID.
    """
    # This uses the cookie auth approach, which requires login
    token = self.login_for_export(email, password)
    # If profile_id is not provided, get it from instance variable
    if not profile_id:
        profile_id = self.profile_id
    if not profile_id:
        raise ValueError("Profile ID not available. It should be automatically extracted during login.")
    # Build the URL for export creation
    url = f"{self.base_url}/export"
    # Set up the data for the export (Transit+JSON: "~:" keyword keys, "~u" UUIDs)
    payload = {
        "~:wait": True,
        "~:exports": [
            {"~:type": f"~:{export_type}",
             "~:suffix": "",
             "~:scale": scale,
             "~:page-id": f"~u{page_id}",
             "~:file-id": f"~u{file_id}",
             "~:name": "",
             "~:object-id": f"~u{object_id}"}
        ],
        "~:profile-id": f"~u{profile_id}",
        "~:cmd": "~:export-shapes"
    }
    if self.debug:
        print("\nCreating export with parameters:")
        print(json.dumps(payload, indent=2))
    # Create a session with the auth token
    export_session = requests.Session()
    export_session.cookies.set("auth-token", token)
    headers = {
        "Content-Type": "application/transit+json",
        "Accept": "application/transit+json",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    }
    # Make the request
    response = export_session.post(url, json=payload, headers=headers)
    if self.debug and response.status_code != 200:
        print(f"\nError response: {response.status_code}")
        print(f"Response text: {response.text}")
    response.raise_for_status()
    # Parse the response
    data = response.json()
    if self.debug:
        print("\nExport created successfully")
        print(f"Response: {json.dumps(data, indent=2)}")
    # Extract and return the resource ID
    resource_id = data.get("~:id")
    if not resource_id:
        raise ValueError("Resource ID not found in response")
    return resource_id
def get_export_resource(self,
                        resource_id: str,
                        save_to_file: Optional[str] = None,
                        email: Optional[str] = None,
                        password: Optional[str] = None) -> Union[bytes, str]:
    """
    Download an export resource by ID.

    Args:
        resource_id: The resource ID from create_export
        save_to_file: Path to save the file (if None, returns the content);
            may be a directory, in which case a filename is derived from the
            response headers or the resource ID
        email: Email for authentication (if different from instance)
        password: Password for authentication (if different from instance)

    Returns:
        Either the file content as bytes, or the path to the saved file
    """
    # This uses the cookie auth approach, which requires login
    token = self.login_for_export(email, password)
    # Build the URL for the resource
    url = f"{self.base_url}/export"
    payload = {
        "~:wait": False,
        "~:cmd": "~:get-resource",
        "~:id": resource_id
    }
    headers = {
        "Content-Type": "application/transit+json",
        "Accept": "*/*",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    }
    if self.debug:
        print(f"\nFetching export resource: {url}")
    # Create a session with the auth token
    export_session = requests.Session()
    export_session.cookies.set("auth-token", token)
    # Make the request
    response = export_session.post(url, json=payload, headers=headers)
    if self.debug and response.status_code != 200:
        print(f"\nError response: {response.status_code}")
        print(f"Response headers: {response.headers}")
    response.raise_for_status()
    # Get the content type
    content_type = response.headers.get('Content-Type', '')
    if self.debug:
        print(f"\nResource fetched successfully")
        print(f"Content-Type: {content_type}")
        print(f"Content length: {len(response.content)} bytes")
    # Determine filename if saving to file
    if save_to_file:
        if os.path.isdir(save_to_file):
            # If save_to_file is a directory, we need to figure out the filename
            filename = None
            # Try to get filename from Content-Disposition header
            content_disp = response.headers.get('Content-Disposition', '')
            if 'filename=' in content_disp:
                filename = content_disp.split('filename=')[1].strip('"\'')
            # If we couldn't get a filename, use the resource_id with an extension
            if not filename:
                ext = content_type.split('/')[-1].split(';')[0]
                if ext in ('jpeg', 'png', 'pdf', 'svg+xml'):
                    if ext == 'svg+xml':
                        ext = 'svg'
                    filename = f"{resource_id}.{ext}"
                else:
                    filename = f"{resource_id}"
            save_path = os.path.join(save_to_file, filename)
        else:
            # Use the provided path directly
            save_path = save_to_file
        # Ensure the directory exists
        os.makedirs(os.path.dirname(os.path.abspath(save_path)), exist_ok=True)
        # Save the content to file
        with open(save_path, 'wb') as f:
            f.write(response.content)
        if self.debug:
            print(f"\nSaved resource to {save_path}")
        return save_path
    else:
        # Return the content
        return response.content
def export_and_download(self, file_id: str, page_id: str, object_id: str,
                        save_to_file: Optional[str] = None, export_type: str = "png",
                        scale: int = 1, name: str = "Board", suffix: str = "",
                        email: Optional[str] = None, password: Optional[str] = None,
                        profile_id: Optional[str] = None) -> Union[bytes, str]:
    """Create an export job and immediately download its result.

    Convenience wrapper chaining create_export and get_export_resource.

    Args:
        file_id: The file ID
        page_id: The page ID
        object_id: The object ID to export
        save_to_file: Path to save the file (if None, returns the content)
        export_type: Type of export (png, svg, pdf)
        scale: Scale factor for the export
        name: Name for the export
        suffix: Suffix to add to the export name
        email: Email for authentication (if different from instance)
        password: Password for authentication (if different from instance)
        profile_id: Optional profile ID (if not provided, will be fetched automatically)

    Returns:
        Either the file content as bytes, or the path to the saved file
    """
    # Step 1: queue the export job and obtain its resource handle.
    resource = self.create_export(
        file_id=file_id,
        page_id=page_id,
        object_id=object_id,
        export_type=export_type,
        scale=scale,
        email=email,
        password=password,
        profile_id=profile_id,
    )
    # Step 2: fetch (and optionally persist) the produced resource.
    return self.get_export_resource(
        resource_id=resource,
        save_to_file=save_to_file,
        email=email,
        password=password,
    )
def extract_components(self, file_data: Dict[str, Any]) -> Dict[str, Any]:
    """Extract and normalize component information from file data.

    Args:
        file_data: The file data as returned by get_file.

    Returns:
        Dictionary with a single "components" key mapping component id to
        its normalized record.
    """
    index = file_data.get('data', {}).get('componentsIndex', {})
    # Normalize every indexed component, defaulting missing fields and
    # falling back to the containing file's id when fileId is absent.
    extracted = {
        component_id: {
            'id': component_id,
            'name': entry.get('name', 'Unnamed'),
            'path': entry.get('path', []),
            'shape': entry.get('shape', ''),
            'fileId': entry.get('fileId', file_data.get('id')),
            'created': entry.get('created'),
            'modified': entry.get('modified'),
        }
        for component_id, entry in index.items()
    }
    return {'components': extracted}
def analyze_file_structure(self, file_data: Dict[str, Any]) -> Dict[str, Any]:
    """Summarize the structure of a Penpot file.

    Args:
        file_data: The file data as returned by get_file.

    Returns:
        Dictionary with page/object/component/color/typography counts, a
        per-type object breakdown, and the file's name and id.
    """
    data = file_data.get('data', {})
    pages = data.get('pagesIndex', {})

    # Tally objects across all pages, grouped by their "type" field.
    object_types: Dict[str, int] = {}
    total_objects = 0
    for page_data in pages.values():
        page_objects = page_data.get('objects', {})
        total_objects += len(page_objects)
        for obj in page_objects.values():
            kind = obj.get('type', 'unknown')
            object_types[kind] = object_types.get(kind, 0) + 1

    return {
        'pageCount': len(pages),
        'objectCount': total_objects,
        'objectTypes': object_types,
        'componentCount': len(data.get('componentsIndex', {})),
        'colorCount': len(data.get('colorsIndex', {})),
        'typographyCount': len(data.get('typographiesIndex', {})),
        'fileName': file_data.get('name', 'Unknown'),
        'fileId': file_data.get('id'),
    }
def main():
    """Command-line entry point for the Penpot API tool."""
    # Set up argument parser
    parser = argparse.ArgumentParser(description='Penpot API Tool')
    parser.add_argument('--debug', action='store_true', help='Enable debug output')
    # Create subparsers for different commands
    subparsers = parser.add_subparsers(dest='command', help='Command to run')
    # List projects command
    list_parser = subparsers.add_parser('list-projects', help='List all projects')
    # Get project command
    project_parser = subparsers.add_parser('get-project', help='Get project details')
    project_parser.add_argument('--id', required=True, help='Project ID')
    # List files command
    files_parser = subparsers.add_parser('list-files', help='List files in a project')
    files_parser.add_argument('--project-id', required=True, help='Project ID')
    # Get file command
    file_parser = subparsers.add_parser('get-file', help='Get file details')
    file_parser.add_argument('--file-id', required=True, help='File ID')
    file_parser.add_argument('--save', action='store_true', help='Save file data to JSON')
    # Export command
    export_parser = subparsers.add_parser('export', help='Export an object')
    export_parser.add_argument(
        '--profile-id',
        required=False,
        help='Profile ID (optional, will be fetched automatically if not provided)')
    export_parser.add_argument('--file-id', required=True, help='File ID')
    export_parser.add_argument('--page-id', required=True, help='Page ID')
    export_parser.add_argument('--object-id', required=True, help='Object ID')
    export_parser.add_argument(
        '--type',
        default='png',
        choices=[
            'png',
            'svg',
            'pdf'],
        help='Export type')
    export_parser.add_argument('--scale', type=int, default=1, help='Scale factor')
    export_parser.add_argument('--output', required=True, help='Output file path')
    # Parse arguments
    args = parser.parse_args()
    # Create API client (credentials are read from the environment by PenpotAPI)
    api = PenpotAPI(debug=args.debug)
    # Handle different commands
    if args.command == 'list-projects':
        projects = api.list_projects()
        print(f"Found {len(projects)} projects:")
        for project in projects:
            print(f"- {project.get('name')} - {project.get('teamName')} (ID: {project.get('id')})")
    elif args.command == 'get-project':
        project = api.get_project(args.id)
        if project:
            print(f"Project: {project.get('name')}")
            print(json.dumps(project, indent=2))
        else:
            print(f"Project not found: {args.id}")
    elif args.command == 'list-files':
        files = api.get_project_files(args.project_id)
        print(f"Found {len(files)} files:")
        for file in files:
            print(f"- {file.get('name')} (ID: {file.get('id')})")
    elif args.command == 'get-file':
        file_data = api.get_file(args.file_id, save_data=args.save)
        print(f"File: {file_data.get('name')}")
        if args.save:
            print(f"Data saved to {args.file_id}.json")
        else:
            print("File metadata:")
            print(json.dumps({k: v for k, v in file_data.items() if k != 'data'}, indent=2))
    elif args.command == 'export':
        output_path = api.export_and_download(
            file_id=args.file_id,
            page_id=args.page_id,
            object_id=args.object_id,
            export_type=args.type,
            scale=args.scale,
            save_to_file=args.output,
            profile_id=args.profile_id
        )
        print(f"Exported to: {output_path}")
    else:
        # No (or unknown) subcommand: show usage.
        parser.print_help()


if __name__ == '__main__':
    main()
```
--------------------------------------------------------------------------------
/tests/test_mcp_server.py:
--------------------------------------------------------------------------------
```python
"""Tests for the MCP server module."""
import hashlib
import json
import os
from unittest.mock import MagicMock, mock_open, patch
import pytest
import yaml
from penpot_mcp.server.mcp_server import PenpotMCPServer, create_server
def test_server_initialization():
    """Test server initialization."""
    server = PenpotMCPServer(name="Test Server", test_mode=True)

    # The server must expose its MCP instance and API client...
    assert server.mcp is not None
    assert server.api is not None
    # ...as well as the registration hooks and the run entry point.
    for attr in ('_register_resources', '_register_tools', 'run'):
        assert hasattr(server, attr)
def test_server_info_resource():
    """Test the server_info resource handler function directly."""
    # FastMCP does not expose the registered resource, so rebuild the
    # handler exactly as mcp_server.py defines it.
    def server_info():
        from penpot_mcp.utils import config
        return {
            "status": "online",
            "name": "Penpot MCP Server",
            "description": "Model Context Provider for Penpot",
            "api_url": config.PENPOT_API_URL
        }

    result = server_info()

    # The handler must report an online status and the expected metadata keys.
    assert isinstance(result, dict)
    assert "status" in result
    assert result["status"] == "online"
    for key in ("name", "description", "api_url"):
        assert key in result
def test_list_projects_tool_handler(mock_penpot_api):
"""Test the list_projects tool handler directly."""
# Create a callable that matches what would be registered
def list_projects():
try:
projects = mock_penpot_api.list_projects()
return {"projects": projects}
except Exception as e:
return {"error": str(e)}
# Call the handler
result = list_projects()
# Check the result
assert isinstance(result, dict)
assert "projects" in result
assert len(result["projects"]) == 2
assert result["projects"][0]["id"] == "project1"
assert result["projects"][1]["id"] == "project2"
# Verify API was called
mock_penpot_api.list_projects.assert_called_once()
def test_get_project_files_tool_handler(mock_penpot_api):
"""Test the get_project_files tool handler directly."""
# Create a callable that matches what would be registered
def get_project_files(project_id):
try:
files = mock_penpot_api.get_project_files(project_id)
return {"files": files}
except Exception as e:
return {"error": str(e)}
# Call the handler with a project ID
result = get_project_files("project1")
# Check the result
assert isinstance(result, dict)
assert "files" in result
assert len(result["files"]) == 2
assert result["files"][0]["id"] == "file1"
assert result["files"][1]["id"] == "file2"
# Verify API was called with correct parameters
mock_penpot_api.get_project_files.assert_called_once_with("project1")
def test_get_file_tool_handler(mock_penpot_api):
"""Test the get_file tool handler directly."""
# Create a callable that matches what would be registered
def get_file(file_id):
try:
file_data = mock_penpot_api.get_file(file_id=file_id)
return file_data
except Exception as e:
return {"error": str(e)}
# Call the handler with a file ID
result = get_file("file1")
# Check the result
assert isinstance(result, dict)
assert result["id"] == "file1"
assert result["name"] == "Test File"
assert "data" in result
assert "pages" in result["data"]
# Verify API was called with correct parameters
mock_penpot_api.get_file.assert_called_once_with(file_id="file1")
@patch('os.path.join')
@patch('builtins.open', new_callable=mock_open, read_data='{"test": "schema"}')
def test_penpot_schema_resource_handler(mock_file_open, mock_join):
    """Test the schema resource handler directly."""
    # Pin the joined path so the open() call is deterministic.
    mock_join.return_value = '/mock/path/to/penpot-schema.json'

    # Re-create the handler body as registered in mcp_server.py.
    def penpot_schema():
        from penpot_mcp.utils import config
        schema_path = os.path.join(config.RESOURCES_PATH, 'penpot-schema.json')
        try:
            with open(schema_path, 'r') as f:
                return json.load(f)
        except Exception as e:
            return {"error": f"Failed to load schema: {str(e)}"}

    loaded = penpot_schema()

    # The mocked file content must come back verbatim.
    assert isinstance(loaded, dict)
    assert loaded.get("test") == "schema"

    mock_file_open.assert_called_once_with('/mock/path/to/penpot-schema.json', 'r')
@patch('os.path.join')
@patch('builtins.open', new_callable=mock_open, read_data='{"test": "tree-schema"}')
def test_penpot_tree_schema_resource_handler(mock_file_open, mock_join):
    """Test the tree schema resource handler directly."""
    # Pin the joined path so the open() call is deterministic.
    mock_join.return_value = '/mock/path/to/penpot-tree-schema.json'

    # Re-create the handler body as registered in mcp_server.py.
    def penpot_tree_schema():
        from penpot_mcp.utils import config
        schema_path = os.path.join(config.RESOURCES_PATH, 'penpot-tree-schema.json')
        try:
            with open(schema_path, 'r') as f:
                return json.load(f)
        except Exception as e:
            return {"error": f"Failed to load tree schema: {str(e)}"}

    loaded = penpot_tree_schema()

    # The mocked file content must come back verbatim.
    assert isinstance(loaded, dict)
    assert loaded.get("test") == "tree-schema"

    mock_file_open.assert_called_once_with('/mock/path/to/penpot-tree-schema.json', 'r')
def test_create_server():
    """Test the create_server function."""
    with patch('penpot_mcp.server.mcp_server.PenpotMCPServer') as server_cls:
        instance = MagicMock()
        server_cls.return_value = instance
        # Simulate running under pytest so create_server enables test mode.
        with patch('penpot_mcp.server.mcp_server.sys.modules', {'pytest': True}):
            built = create_server()
            server_cls.assert_called_once_with(test_mode=True)
            assert built == instance
@patch('penpot_mcp.tools.penpot_tree.get_object_subtree_with_fields')
def test_get_object_tree_basic(mock_get_subtree, mock_penpot_api):
"""Test the get_object_tree tool handler with basic parameters."""
# Setup the mock get_object_subtree_with_fields function
mock_get_subtree.return_value = {
"tree": {
"id": "obj1",
"type": "frame",
"name": "Test Object",
"children": []
},
"page_id": "page1"
}
# Setup the export_object mock for the included image
export_object_mock = MagicMock()
export_object_mock.return_value = MagicMock(data=b'test_image_data', format='png')
# Create a callable that matches what would be registered
def get_object_tree(
file_id: str,
object_id: str,
fields: list, # Now required parameter
depth: int = -1,
format: str = "json"
):
try:
# Get the file data
file_data = mock_penpot_api.get_file(file_id=file_id)
# Use the mocked utility function
result = mock_get_subtree(
file_data,
object_id,
include_fields=fields,
depth=depth
)
# Check if an error occurred
if "error" in result:
return result
# Extract the tree and page_id
simplified_tree = result["tree"]
page_id = result["page_id"]
# Prepare the result dictionary
final_result = {"tree": simplified_tree}
# Always include image (no longer optional)
try:
image = export_object_mock(
file_id=file_id,
page_id=page_id,
object_id=object_id
)
# New format: URI-based instead of base64 data
image_id = hashlib.md5(f"{file_id}:{object_id}".encode()).hexdigest()
image_uri = f"render_component://{image_id}"
final_result["image"] = {
"uri": image_uri,
"format": image.format if hasattr(image, 'format') else "png"
}
except Exception as e:
final_result["image_error"] = str(e)
# Format the tree as YAML if requested
if format.lower() == "yaml":
try:
# Convert the entire result to YAML, including the image if present
yaml_result = yaml.dump(final_result, default_flow_style=False, sort_keys=False)
return {"yaml_result": yaml_result}
except Exception as e:
return {"format_error": f"Error formatting as YAML: {str(e)}"}
# Return the JSON format result
return final_result
except Exception as e:
return {"error": str(e)}
# Call the handler with basic parameters - fields is now required
result = get_object_tree(
file_id="file1",
object_id="obj1",
fields=["id", "type", "name"] # Required parameter
)
# Check the result
assert isinstance(result, dict)
assert "tree" in result
assert result["tree"]["id"] == "obj1"
assert result["tree"]["type"] == "frame"
assert result["tree"]["name"] == "Test Object"
# Check that image is always included
assert "image" in result
assert "uri" in result["image"]
assert result["image"]["uri"].startswith("render_component://")
assert result["image"]["format"] == "png"
# Verify mocks were called with correct parameters
mock_penpot_api.get_file.assert_called_once_with(file_id="file1")
mock_get_subtree.assert_called_once_with(
mock_penpot_api.get_file.return_value,
"obj1",
include_fields=["id", "type", "name"],
depth=-1
)
@patch('penpot_mcp.tools.penpot_tree.get_object_subtree_with_fields')
def test_get_object_tree_with_fields_and_depth(mock_get_subtree, mock_penpot_api):
"""Test the get_object_tree tool handler with custom field list and depth."""
# Setup the mock get_object_subtree_with_fields function
mock_get_subtree.return_value = {
"tree": {
"id": "obj1",
"name": "Test Object", # Only id and name fields included
"children": []
},
"page_id": "page1"
}
# Setup the export_object mock for the included image
export_object_mock = MagicMock()
export_object_mock.return_value = MagicMock(data=b'test_image_data', format='png')
# Create a callable that matches what would be registered
def get_object_tree(
file_id: str,
object_id: str,
fields: list, # Now required parameter
depth: int = -1,
format: str = "json"
):
try:
# Get the file data
file_data = mock_penpot_api.get_file(file_id=file_id)
# Use the mocked utility function
result = mock_get_subtree(
file_data,
object_id,
include_fields=fields,
depth=depth
)
# Extract the tree and page_id
simplified_tree = result["tree"]
page_id = result["page_id"]
# Prepare the result dictionary
final_result = {"tree": simplified_tree}
# Always include image (no longer optional)
try:
image = export_object_mock(
file_id=file_id,
page_id=page_id,
object_id=object_id
)
# New format: URI-based instead of base64 data
image_id = hashlib.md5(f"{file_id}:{object_id}".encode()).hexdigest()
image_uri = f"render_component://{image_id}"
final_result["image"] = {
"uri": image_uri,
"format": image.format if hasattr(image, 'format') else "png"
}
except Exception as e:
final_result["image_error"] = str(e)
# Format the tree as YAML if requested
if format.lower() == "yaml":
try:
# Convert the entire result to YAML, including the image if present
yaml_result = yaml.dump(final_result, default_flow_style=False, sort_keys=False)
return {"yaml_result": yaml_result}
except Exception as e:
return {"format_error": f"Error formatting as YAML: {str(e)}"}
# Return the JSON format result
return final_result
except Exception as e:
return {"error": str(e)}
# Call the handler with custom fields and depth
result = get_object_tree(
file_id="file1",
object_id="obj1",
fields=["id", "name"], # Updated parameter name
depth=2
)
# Check the result
assert isinstance(result, dict)
assert "tree" in result
assert result["tree"]["id"] == "obj1"
assert result["tree"]["name"] == "Test Object"
assert "type" not in result["tree"] # Type field should not be included
# Check that image is always included
assert "image" in result
assert "uri" in result["image"]
assert result["image"]["uri"].startswith("render_component://")
assert result["image"]["format"] == "png"
# Verify mocks were called with correct parameters
mock_penpot_api.get_file.assert_called_once_with(file_id="file1")
mock_get_subtree.assert_called_once_with(
mock_penpot_api.get_file.return_value,
"obj1",
include_fields=["id", "name"],
depth=2
)
@patch('penpot_mcp.tools.penpot_tree.get_object_subtree_with_fields')
def test_get_object_tree_with_yaml_format(mock_get_subtree, mock_penpot_api):
"""Test the get_object_tree tool handler with YAML format output."""
# Setup the mock get_object_subtree_with_fields function
mock_get_subtree.return_value = {
"tree": {
"id": "obj1",
"type": "frame",
"name": "Test Object",
"children": [
{
"id": "child1",
"type": "text",
"name": "Child Text"
}
]
},
"page_id": "page1"
}
# Setup the export_object mock for the included image
export_object_mock = MagicMock()
export_object_mock.return_value = MagicMock(data=b'test_image_data', format='png')
# Create a callable that matches what would be registered
def get_object_tree(
file_id: str,
object_id: str,
fields: list, # Now required parameter
depth: int = -1,
format: str = "json"
):
try:
# Get the file data
file_data = mock_penpot_api.get_file(file_id=file_id)
# Use the mocked utility function
result = mock_get_subtree(
file_data,
object_id,
include_fields=fields,
depth=depth
)
# Extract the tree and page_id
simplified_tree = result["tree"]
page_id = result["page_id"]
# Prepare the result dictionary
final_result = {"tree": simplified_tree}
# Always include image (no longer optional)
try:
image = export_object_mock(
file_id=file_id,
page_id=page_id,
object_id=object_id
)
# New format: URI-based instead of base64 data
image_id = hashlib.md5(f"{file_id}:{object_id}".encode()).hexdigest()
image_uri = f"render_component://{image_id}"
final_result["image"] = {
"uri": image_uri,
"format": image.format if hasattr(image, 'format') else "png"
}
except Exception as e:
final_result["image_error"] = str(e)
# Format the tree as YAML if requested
if format.lower() == "yaml":
try:
# Convert the entire result to YAML, including the image if present
yaml_result = yaml.dump(final_result, default_flow_style=False, sort_keys=False)
return {"yaml_result": yaml_result}
except Exception as e:
return {"format_error": f"Error formatting as YAML: {str(e)}"}
# Return the JSON format result
return final_result
except Exception as e:
return {"error": str(e)}
# Call the handler with YAML format - fields is now required
result = get_object_tree(
file_id="file1",
object_id="obj1",
fields=["id", "type", "name"], # Required parameter
format="yaml"
)
# Check the result
assert isinstance(result, dict)
assert "yaml_result" in result
assert "tree" not in result # Should not contain the tree field
# Verify the YAML content matches the expected tree structure
parsed_yaml = yaml.safe_load(result["yaml_result"])
assert "tree" in parsed_yaml
assert parsed_yaml["tree"]["id"] == "obj1"
assert parsed_yaml["tree"]["type"] == "frame"
assert parsed_yaml["tree"]["name"] == "Test Object"
assert isinstance(parsed_yaml["tree"]["children"], list)
assert parsed_yaml["tree"]["children"][0]["id"] == "child1"
# Check that image is included in YAML
assert "image" in parsed_yaml
assert "uri" in parsed_yaml["image"]
assert parsed_yaml["image"]["uri"].startswith("render_component://")
assert parsed_yaml["image"]["format"] == "png"
# Verify mocks were called with correct parameters
mock_penpot_api.get_file.assert_called_once_with(file_id="file1")
mock_get_subtree.assert_called_once_with(
mock_penpot_api.get_file.return_value,
"obj1",
include_fields=["id", "type", "name"],
depth=-1
)
@patch('penpot_mcp.tools.penpot_tree.get_object_subtree_with_fields')
def test_get_object_tree_with_include_image(mock_get_subtree, mock_penpot_api):
"""Test the get_object_tree tool handler with image inclusion (always included now)."""
# Setup the mock get_object_subtree_with_fields function
mock_get_subtree.return_value = {
"tree": {
"id": "obj1",
"type": "frame",
"name": "Test Object",
"children": []
},
"page_id": "page1"
}
# Setup the export_object mock for the included image
export_object_mock = MagicMock()
export_object_mock.return_value = MagicMock(data=b'test_image_data', format='png')
# Create a callable that matches what would be registered
def get_object_tree(
file_id: str,
object_id: str,
fields: list, # Now required parameter
depth: int = -1,
format: str = "json"
):
try:
# Get the file data
file_data = mock_penpot_api.get_file(file_id=file_id)
# Use the mocked utility function
result = mock_get_subtree(
file_data,
object_id,
include_fields=fields,
depth=depth
)
# Extract the tree and page_id
simplified_tree = result["tree"]
page_id = result["page_id"]
# Prepare the result dictionary
final_result = {"tree": simplified_tree}
# Always include image (no longer optional)
try:
image = export_object_mock(
file_id=file_id,
page_id=page_id,
object_id=object_id
)
# New format: URI-based instead of base64 data
image_id = hashlib.md5(f"{file_id}:{object_id}".encode()).hexdigest()
image_uri = f"render_component://{image_id}"
final_result["image"] = {
"uri": image_uri,
"format": image.format if hasattr(image, 'format') else "png"
}
except Exception as e:
final_result["image_error"] = str(e)
# Format the tree as YAML if requested
if format.lower() == "yaml":
try:
# Convert the entire result to YAML, including the image if present
yaml_result = yaml.dump(final_result, default_flow_style=False, sort_keys=False)
return {"yaml_result": yaml_result}
except Exception as e:
return {"format_error": f"Error formatting as YAML: {str(e)}"}
# Return the JSON format result
return final_result
except Exception as e:
return {"error": str(e)}
# Call the handler with required fields parameter
result = get_object_tree(
file_id="file1",
object_id="obj1",
fields=["id", "type", "name"] # Updated parameter name
)
# Check the result
assert isinstance(result, dict)
assert "tree" in result
assert result["tree"]["id"] == "obj1"
assert result["tree"]["type"] == "frame"
assert result["tree"]["name"] == "Test Object"
# Check that image is always included
assert "image" in result
assert "uri" in result["image"]
assert result["image"]["uri"].startswith("render_component://")
assert result["image"]["format"] == "png"
# Verify mocks were called with correct parameters
mock_penpot_api.get_file.assert_called_once_with(file_id="file1")
mock_get_subtree.assert_called_once_with(
mock_penpot_api.get_file.return_value,
"obj1",
include_fields=["id", "type", "name"],
depth=-1
)
@patch('penpot_mcp.tools.penpot_tree.get_object_subtree_with_fields')
def test_get_object_tree_with_yaml_and_image(mock_get_subtree, mock_penpot_api):
"""Test the get_object_tree tool handler with YAML format and image inclusion (always included now)."""
# Setup the mock get_object_subtree_with_fields function
mock_get_subtree.return_value = {
"tree": {
"id": "obj1",
"type": "frame",
"name": "Test Object",
"children": []
},
"page_id": "page1"
}
# Setup the export_object mock for the included image
export_object_mock = MagicMock()
export_object_mock.return_value = MagicMock(data=b'test_image_data', format='png')
# Create a callable that matches what would be registered
def get_object_tree(
file_id: str,
object_id: str,
fields: list, # Now required parameter
depth: int = -1,
format: str = "json"
):
try:
# Get the file data
file_data = mock_penpot_api.get_file(file_id=file_id)
# Use the mocked utility function
result = mock_get_subtree(
file_data,
object_id,
include_fields=fields,
depth=depth
)
# Extract the tree and page_id
simplified_tree = result["tree"]
page_id = result["page_id"]
# Prepare the result dictionary
final_result = {"tree": simplified_tree}
# Always include image (no longer optional)
try:
image = export_object_mock(
file_id=file_id,
page_id=page_id,
object_id=object_id
)
# New format: URI-based instead of base64 data
image_id = hashlib.md5(f"{file_id}:{object_id}".encode()).hexdigest()
image_uri = f"render_component://{image_id}"
final_result["image"] = {
"uri": image_uri,
"format": image.format if hasattr(image, 'format') else "png"
}
except Exception as e:
final_result["image_error"] = str(e)
# Format the tree as YAML if requested
if format.lower() == "yaml":
try:
# Convert the entire result to YAML, including the image if present
yaml_result = yaml.dump(final_result, default_flow_style=False, sort_keys=False)
return {"yaml_result": yaml_result}
except Exception as e:
return {"format_error": f"Error formatting as YAML: {str(e)}"}
# Return the JSON format result
return final_result
except Exception as e:
return {"error": str(e)}
# Call the handler with required fields parameter and YAML format
result = get_object_tree(
file_id="file1",
object_id="obj1",
fields=["id", "type", "name"], # Updated parameter name
format="yaml"
)
# Check the result
assert isinstance(result, dict)
assert "yaml_result" in result
assert "tree" not in result # Should not contain the tree field directly
# Verify the YAML content contains both tree and image with URI
parsed_yaml = yaml.safe_load(result["yaml_result"])
assert "tree" in parsed_yaml
assert parsed_yaml["tree"]["id"] == "obj1"
assert parsed_yaml["tree"]["type"] == "frame"
assert parsed_yaml["tree"]["name"] == "Test Object"
assert "image" in parsed_yaml
assert "uri" in parsed_yaml["image"]
# Verify the URI format in the YAML
assert parsed_yaml["image"]["uri"].startswith("render_component://")
assert parsed_yaml["image"]["format"] == "png"
# Verify mocks were called with correct parameters
mock_penpot_api.get_file.assert_called_once_with(file_id="file1")
mock_get_subtree.assert_called_once_with(
mock_penpot_api.get_file.return_value,
"obj1",
include_fields=["id", "type", "name"],
depth=-1
)
def test_rendered_component_resource():
    """Test the rendered component resource handler."""
    server = PenpotMCPServer(test_mode=True)
    mock_image = MagicMock()
    mock_image.format = "png"
    # Seed the in-memory render cache with one known component.
    server.rendered_components = {"test_component_id": mock_image}

    # The registered handler is hard to reach through FastMCP, so we
    # re-implement it here from the mcp_server.py source.
    def get_rendered_component(component_id: str):
        if component_id in server.rendered_components:
            return server.rendered_components[component_id]
        raise Exception(f"Component with ID {component_id} not found")

    # A known id resolves to the cached image.
    assert get_rendered_component("test_component_id") == mock_image
    # An unknown id raises with a descriptive message.
    with pytest.raises(Exception) as excinfo:
        get_rendered_component("invalid_id")
    assert "not found" in str(excinfo.value)
def test_search_object_basic(mock_penpot_api):
"""Test the search_object tool basic functionality."""
# Mock the file contents with more detailed mock data
mock_file_data = {
"id": "file1",
"name": "Test File",
"pagesIndex": {
"page1": {
"id": "page1",
"name": "Page 1",
"objects": {
"obj1": {"id": "obj1", "name": "Button Component", "type": "frame"},
"obj2": {"id": "obj2", "name": "Header Text", "type": "text"},
"obj3": {"id": "obj3", "name": "Button Label", "type": "text"}
}
},
"page2": {
"id": "page2",
"name": "Page 2",
"objects": {
"obj4": {"id": "obj4", "name": "Footer Button", "type": "frame"},
"obj5": {"id": "obj5", "name": "Copyright Text", "type": "text"}
}
}
}
}
# Override the get_file return value for this test
mock_penpot_api.get_file.return_value = mock_file_data
# Create a function to simulate the search_object tool
def get_cached_file(file_id):
# Call the mock API to ensure it's tracked for assertions
return mock_penpot_api.get_file(file_id=file_id)
def search_object(file_id: str, query: str):
try:
# Get the file data using cache
file_data = get_cached_file(file_id)
if "error" in file_data:
return file_data
# Create case-insensitive pattern for matching
import re
pattern = re.compile(query, re.IGNORECASE)
# Store matching objects
matches = []
# Search through each page in the file
for page_id, page_data in file_data.get('pagesIndex', {}).items():
page_name = page_data.get('name', 'Unnamed')
# Search through objects in this page
for obj_id, obj_data in page_data.get('objects', {}).items():
obj_name = obj_data.get('name', '')
# Check if the name contains the query (case-insensitive)
if pattern.search(obj_name):
matches.append({
'id': obj_id,
'name': obj_name,
'page_id': page_id,
'page_name': page_name,
'object_type': obj_data.get('type', 'unknown')
})
return {'objects': matches}
except Exception as e:
return {"error": str(e)}
# Test searching for "button" (should find 3 objects)
result = search_object("file1", "button")
assert "objects" in result
assert len(result["objects"]) == 3
# Check the first match
button_matches = [obj for obj in result["objects"] if "Button Component" == obj["name"]]
assert len(button_matches) == 1
assert button_matches[0]["id"] == "obj1"
assert button_matches[0]["page_id"] == "page1"
assert button_matches[0]["page_name"] == "Page 1"
assert button_matches[0]["object_type"] == "frame"
# Check that it found objects across pages
footer_button_matches = [obj for obj in result["objects"] if "Footer Button" == obj["name"]]
assert len(footer_button_matches) == 1
assert footer_button_matches[0]["page_id"] == "page2"
# Verify API was called with correct parameters
mock_penpot_api.get_file.assert_called_with(file_id="file1")
def test_search_object_case_insensitive(mock_penpot_api):
"""Test the search_object tool with case-insensitive search."""
# Mock the file contents with more detailed mock data
mock_file_data = {
"id": "file1",
"name": "Test File",
"pagesIndex": {
"page1": {
"id": "page1",
"name": "Page 1",
"objects": {
"obj1": {"id": "obj1", "name": "Button Component", "type": "frame"},
"obj2": {"id": "obj2", "name": "HEADER TEXT", "type": "text"},
"obj3": {"id": "obj3", "name": "button Label", "type": "text"}
}
}
}
}
# Override the get_file return value for this test
mock_penpot_api.get_file.return_value = mock_file_data
# Create a function to simulate the search_object tool
def get_cached_file(file_id):
# Call the mock API to ensure it's tracked for assertions
return mock_penpot_api.get_file(file_id=file_id)
def search_object(file_id: str, query: str):
try:
# Get the file data using cache
file_data = get_cached_file(file_id)
if "error" in file_data:
return file_data
# Create case-insensitive pattern for matching
import re
pattern = re.compile(query, re.IGNORECASE)
# Store matching objects
matches = []
# Search through each page in the file
for page_id, page_data in file_data.get('pagesIndex', {}).items():
page_name = page_data.get('name', 'Unnamed')
# Search through objects in this page
for obj_id, obj_data in page_data.get('objects', {}).items():
obj_name = obj_data.get('name', '')
# Check if the name contains the query (case-insensitive)
if pattern.search(obj_name):
matches.append({
'id': obj_id,
'name': obj_name,
'page_id': page_id,
'page_name': page_name,
'object_type': obj_data.get('type', 'unknown')
})
return {'objects': matches}
except Exception as e:
return {"error": str(e)}
# Test with lowercase query for uppercase text
result = search_object("file1", "header")
assert "objects" in result
assert len(result["objects"]) == 1
assert result["objects"][0]["name"] == "HEADER TEXT"
# Test with uppercase query for lowercase text
result = search_object("file1", "BUTTON")
assert "objects" in result
assert len(result["objects"]) == 2
# Check mixed case matching
button_matches = sorted([obj["name"] for obj in result["objects"]])
assert button_matches == ["Button Component", "button Label"]
# Verify API was called
mock_penpot_api.get_file.assert_called_with(file_id="file1")
def test_search_object_no_matches(mock_penpot_api):
"""Test the search_object tool when no matches are found."""
# Mock the file contents
mock_file_data = {
"id": "file1",
"name": "Test File",
"pagesIndex": {
"page1": {
"id": "page1",
"name": "Page 1",
"objects": {
"obj1": {"id": "obj1", "name": "Button Component", "type": "frame"},
"obj2": {"id": "obj2", "name": "Header Text", "type": "text"}
}
}
}
}
# Override the get_file return value for this test
mock_penpot_api.get_file.return_value = mock_file_data
# Create a function to simulate the search_object tool
def get_cached_file(file_id):
# Call the mock API to ensure it's tracked for assertions
return mock_penpot_api.get_file(file_id=file_id)
def search_object(file_id: str, query: str):
try:
# Get the file data using cache
file_data = get_cached_file(file_id)
if "error" in file_data:
return file_data
# Create case-insensitive pattern for matching
import re
pattern = re.compile(query, re.IGNORECASE)
# Store matching objects
matches = []
# Search through each page in the file
for page_id, page_data in file_data.get('pagesIndex', {}).items():
page_name = page_data.get('name', 'Unnamed')
# Search through objects in this page
for obj_id, obj_data in page_data.get('objects', {}).items():
obj_name = obj_data.get('name', '')
# Check if the name contains the query (case-insensitive)
if pattern.search(obj_name):
matches.append({
'id': obj_id,
'name': obj_name,
'page_id': page_id,
'page_name': page_name,
'object_type': obj_data.get('type', 'unknown')
})
return {'objects': matches}
except Exception as e:
return {"error": str(e)}
# Test with a query that won't match anything
result = search_object("file1", "nonexistent")
assert "objects" in result
assert len(result["objects"]) == 0 # Empty array
# Verify API was called
mock_penpot_api.get_file.assert_called_with(file_id="file1")
def test_search_object_error_handling(mock_penpot_api):
"""Test the search_object tool error handling."""
# Make the API throw an exception
mock_penpot_api.get_file.side_effect = Exception("API error")
def get_cached_file(file_id):
try:
return mock_penpot_api.get_file(file_id=file_id)
except Exception as e:
return {"error": str(e)}
def search_object(file_id: str, query: str):
try:
# Get the file data using cache
file_data = get_cached_file(file_id)
if "error" in file_data:
return file_data
# Create case-insensitive pattern for matching
import re
pattern = re.compile(query, re.IGNORECASE)
# Store matching objects
matches = []
# Search through each page in the file
for page_id, page_data in file_data.get('pagesIndex', {}).items():
page_name = page_data.get('name', 'Unnamed')
# Search through objects in this page
for obj_id, obj_data in page_data.get('objects', {}).items():
obj_name = obj_data.get('name', '')
# Check if the name contains the query (case-insensitive)
if pattern.search(obj_name):
matches.append({
'id': obj_id,
'name': obj_name,
'page_id': page_id,
'page_name': page_name,
'object_type': obj_data.get('type', 'unknown')
})
return {'objects': matches}
except Exception as e:
return {"error": str(e)}
# Test with error from API
result = search_object("file1", "button")
assert "error" in result
assert "API error" in result["error"]
# Verify API was called
mock_penpot_api.get_file.assert_called_with(file_id="file1")
```
--------------------------------------------------------------------------------
/tests/test_penpot_tree.py:
--------------------------------------------------------------------------------
```python
"""Tests for the penpot_tree module."""
import re
from unittest.mock import MagicMock, patch
import pytest
from anytree import Node, RenderTree
from penpot_mcp.tools.penpot_tree import (
build_tree,
convert_node_to_dict,
export_tree_to_dot,
find_object_in_tree,
find_page_containing_object,
get_object_subtree,
get_object_subtree_with_fields,
print_tree,
)
@pytest.fixture
def sample_penpot_data():
    """Create sample Penpot file data for testing.

    Shaped like a Penpot file payload: a 'components' map plus a
    'pagesIndex' map whose pages each hold an 'objects' map keyed by
    object id; parent/child links are expressed through 'parentId'.
    """
    return {
        'components': {
            # comp2 has no annotation on purpose, to exercise the None path.
            'comp1': {'name': 'Button', 'annotation': 'Primary button'},
            'comp2': {'name': 'Card', 'annotation': None}
        },
        'pagesIndex': {
            'page1': {
                'name': 'Home Page',
                'objects': {
                    # Root frame: the other objects chain up to it via parentId.
                    '00000000-0000-0000-0000-000000000000': {
                        'type': 'frame',
                        'name': 'Root Frame',
                    },
                    'obj1': {
                        'type': 'frame',
                        'name': 'Header',
                        'parentId': '00000000-0000-0000-0000-000000000000'
                    },
                    'obj2': {
                        'type': 'text',
                        'name': 'Title',
                        'parentId': 'obj1'
                    },
                    # obj3 is a component instance referencing comp1.
                    'obj3': {
                        'type': 'frame',
                        'name': 'Button Instance',
                        'parentId': 'obj1',
                        'componentId': 'comp1'
                    }
                }
            },
            'page2': {
                'name': 'About Page',
                'objects': {
                    '00000000-0000-0000-0000-000000000000': {
                        'type': 'frame',
                        'name': 'Root Frame',
                    },
                    'obj4': {
                        'type': 'frame',
                        'name': 'Content',
                        'parentId': '00000000-0000-0000-0000-000000000000'
                    },
                    'obj5': {
                        'type': 'image',
                        'name': 'Logo',
                        'parentId': 'obj4'
                    }
                }
            }
        }
    }
@pytest.fixture
def sample_tree(sample_penpot_data):
    """Create a sample tree from the sample data."""
    # Re-built per test from the sample_penpot_data fixture.
    return build_tree(sample_penpot_data)
def test_build_tree(sample_penpot_data, sample_tree):
    """Test building a tree from Penpot file data.

    Verifies the synthetic root, the components section, the page nodes,
    and the parent/child plus component-reference wiring of page objects.
    """
    # Check that the root is created
    assert sample_tree.name.startswith("SYNTHETIC-ROOT")
    # Check components section
    components_node = None
    for child in sample_tree.children:
        if "components (section)" in child.name:
            components_node = child
            break
    assert components_node is not None
    assert len(components_node.children) == 2
    # Check pages are created
    page_nodes = [child for child in sample_tree.children if "(page)" in child.name]
    assert len(page_nodes) == 2
    # Check objects within pages
    for page_node in page_nodes:
        if "Home Page" in page_node.name:
            # Check that objects are created under the page
            assert len(page_node.descendants) == 4  # Root frame + 3 objects
            # Check parent-child relationships. Iterating an anytree
            # RenderTree yields (pre, fill, node) rows, so node[2] is the
            # actual tree node being inspected.
            for node in RenderTree(page_node):
                if hasattr(node[2], 'obj_id') and node[2].obj_id == 'obj2':
                    assert node[2].parent.obj_id == 'obj1'
                elif hasattr(node[2], 'obj_id') and node[2].obj_id == 'obj3':
                    assert node[2].parent.obj_id == 'obj1'
                    # obj3 is a component instance: it must carry the
                    # component reference and its annotation text.
                    assert hasattr(node[2], 'componentRef')
                    assert node[2].componentRef == 'comp1'
                    assert hasattr(node[2], 'componentAnnotation')
                    assert node[2].componentAnnotation == 'Primary button'
def test_print_tree(sample_tree, capsys):
    """Test printing the tree to console."""
    print_tree(sample_tree)
    output = capsys.readouterr().out

    # Pages, components, typed object labels, and component references
    # must all appear in the rendered output.
    for expected in (
        "Home Page",
        "About Page",
        "comp1 (component) - Button",
        "comp2 (component) - Card",
        "(frame) - Header",
        "(text) - Title",
        "refs component: comp1",
        "Note: Primary button",
    ):
        assert expected in output
def test_print_tree_with_filter(sample_tree, capsys):
    """Verify that filtering limits output to matches and their ancestors."""
    print_tree(sample_tree, filter_pattern="title")
    output = capsys.readouterr().out

    # The matching node plus every ancestor is printed, marked as MATCH.
    for fragment in ("Title", "Header", "Home Page", "MATCH"):
        assert fragment in output

    # Unrelated siblings and pages must be omitted entirely.
    for fragment in ("Logo", "About Page"):
        assert fragment not in output
@patch('anytree.exporter.DotExporter.to_picture')
def test_export_tree_to_dot(mock_to_picture, sample_tree):
    """Verify export_tree_to_dot delegates to DotExporter and reports success."""
    success = export_tree_to_dot(sample_tree, "test_output.png")
    # The mocked exporter must have been invoked, and success signalled.
    assert mock_to_picture.called
    assert success is True
@patch('anytree.exporter.DotExporter.to_picture', side_effect=Exception("Test exception"))
def test_export_tree_to_dot_exception(mock_to_picture, sample_tree, capsys):
    """Verify graceful failure when the DOT exporter raises."""
    success = export_tree_to_dot(sample_tree, "test_output.png")
    # A failed export is signalled via a False return value...
    assert success is False
    # ...and a helpful warning is printed for the user.
    output = capsys.readouterr().out
    assert "Warning: Could not export" in output
    assert "Make sure Graphviz is installed" in output
def test_find_page_containing_object(sample_penpot_data):
    """Verify page lookup by object id across pages and for missing ids."""
    # Objects that exist resolve to their containing page.
    assert find_page_containing_object(sample_penpot_data, 'obj2') == 'page1'
    # An object on the second page resolves there, not to page1.
    assert find_page_containing_object(sample_penpot_data, 'obj5') == 'page2'
    # Unknown object ids yield None rather than raising.
    assert find_page_containing_object(sample_penpot_data, 'nonexistent') is None
def test_find_object_in_tree(sample_tree):
    """Verify locating objects in the tree by their id."""
    # An existing object is returned as a dict carrying its attributes.
    found = find_object_in_tree(sample_tree, 'obj3')
    assert found is not None
    expected = {
        'id': 'obj3',
        'type': 'frame',
        'name': 'Button Instance',
        'componentRef': 'comp1',
    }
    for key, value in expected.items():
        assert key in found
        assert found[key] == value

    # A missing id returns None rather than raising.
    assert find_object_in_tree(sample_tree, 'nonexistent') is None
def test_convert_node_to_dict():
    """Verify that convert_node_to_dict serializes a node hierarchy."""

    def _make_node(label, parent=None, **attrs):
        # Build an anytree Node and attach arbitrary extra attributes.
        node = Node(label, parent=parent)
        for key, value in attrs.items():
            setattr(node, key, value)
        return node

    root = _make_node(
        "root", obj_id="root_id", obj_type="frame", obj_name="Root Frame")
    _make_node(
        "child1", parent=root,
        obj_id="child1_id", obj_type="text", obj_name="Child 1")
    _make_node(
        "child2", parent=root,
        obj_id="child2_id", obj_type="frame", obj_name="Child 2",
        componentRef="comp1", componentAnnotation="Test component")

    result = convert_node_to_dict(root)

    # Root attributes are carried over verbatim.
    assert result['id'] == 'root_id'
    assert result['type'] == 'frame'
    assert result['name'] == 'Root Frame'
    assert len(result['children']) == 2

    # Both children are present, identified by their ids.
    children_by_id = {child['id']: child for child in result['children']}
    assert 'child1_id' in children_by_id
    assert 'child2_id' in children_by_id

    # The component reference metadata survives the conversion.
    second_child = children_by_id['child2_id']
    assert 'componentRef' in second_child
    assert second_child['componentRef'] == 'comp1'
    assert 'componentAnnotation' in second_child
    assert second_child['componentAnnotation'] == 'Test component'
def test_get_object_subtree(sample_penpot_data):
    """Verify retrieval of a simplified subtree for a given object."""
    file_data = {'data': sample_penpot_data}

    # Existing objects return a tree rooted at the object plus its page id.
    result = get_object_subtree(file_data, 'obj1')
    assert 'error' not in result
    assert 'tree' in result
    tree = result['tree']
    assert tree['id'] == 'obj1'
    assert tree['name'] == 'Header'
    assert result['page_id'] == 'page1'

    # Missing objects produce an error payload instead of raising.
    result = get_object_subtree(file_data, 'nonexistent')
    assert 'error' in result
    assert 'not found' in result['error']
def test_circular_reference_handling(sample_penpot_data):
    """Verify the tree builder copes with circular parent references."""
    # Inject two objects that point at each other as parents.
    objects = sample_penpot_data['pagesIndex']['page1']['objects']
    objects['obj6'] = {
        'type': 'frame',
        'name': 'Circular Parent',
        'parentId': 'obj7'
    }
    objects['obj7'] = {
        'type': 'frame',
        'name': 'Circular Child',
        'parentId': 'obj6'
    }

    # Building the tree must not raise or loop forever despite the cycle.
    tree = build_tree(sample_penpot_data)

    # Locate the home page node that hosts the cyclic objects.
    page_node = next(
        (child for child in tree.children
         if "(page)" in child.name and "Home Page" in child.name),
        None,
    )
    assert page_node is not None

    # Both cyclic objects should have been attached somewhere under the page.
    cyclic_ids = {'obj6', 'obj7'}
    circular_nodes = [
        node for _pre, _fill, node in RenderTree(page_node)
        if getattr(node, 'obj_id', None) in cyclic_ids
    ]
    assert len(circular_nodes) == 2
def test_get_object_subtree_with_fields(sample_penpot_data):
    """Verify field filtering and depth limiting of object subtrees."""
    file_data = {'data': sample_penpot_data}

    # Without filtering, every field of the object is included.
    result = get_object_subtree_with_fields(file_data, 'obj1')
    assert 'error' not in result
    assert 'tree' in result
    tree = result['tree']
    assert tree['id'] == 'obj1'
    assert tree['name'] == 'Header'
    assert tree['type'] == 'frame'
    assert 'parentId' in tree
    assert len(tree['children']) == 2

    # With include_fields, only the requested fields (plus id) survive.
    result = get_object_subtree_with_fields(
        file_data, 'obj1', include_fields=['name', 'type'])
    assert 'error' not in result
    assert 'tree' in result
    tree = result['tree']
    assert tree['id'] == 'obj1'  # id is always included
    assert tree['name'] == 'Header'
    assert tree['type'] == 'frame'
    assert 'parentId' not in tree  # should be filtered out
    assert len(tree['children']) == 2

    # depth=0 returns just the object itself, without children.
    result = get_object_subtree_with_fields(file_data, 'obj1', depth=0)
    assert 'error' not in result
    assert 'tree' in result
    assert result['tree']['id'] == 'obj1'
    assert 'children' not in result['tree']

    # Unknown ids report an error payload.
    result = get_object_subtree_with_fields(file_data, 'nonexistent')
    assert 'error' in result
    assert 'not found' in result['error']
def test_get_object_subtree_with_fields_deep_hierarchy():
    """Test getting a filtered subtree for an object with multiple levels of nesting.

    The fixture below models a four-level page hierarchy
    (root frame -> main container -> sections -> leaf items) so that the
    ``depth`` and ``include_fields`` parameters of
    get_object_subtree_with_fields can be exercised meaningfully, along
    with component references, layout properties, text content and
    applied tokens.
    """
    # Create a more complex nested structure for testing depth parameter
    file_data = {
        'data': {
            # Library components; 'comp2' is referenced by the card instances below.
            'components': {
                'comp1': {
                    'id': 'comp1',
                    'name': 'Button',
                    'path': '/Components/Button',
                    'modifiedAt': '2023-01-01T12:00:00Z',
                    'mainInstanceId': 'main-button-instance',
                    'mainInstancePage': 'page1',
                    'annotation': 'Primary button'
                },
                'comp2': {
                    'id': 'comp2',
                    'name': 'Card',
                    'path': '/Components/Card',
                    'modifiedAt': '2023-01-02T12:00:00Z',
                    'mainInstanceId': 'main-card-instance',
                    'mainInstancePage': 'page1',
                    'annotation': 'Content card'
                }
            },
            # Shared color assets (not directly asserted on; present for realism).
            'colors': {
                'color1': {
                    'path': '/Colors/Primary',
                    'color': '#3366FF',
                    'name': 'Primary Blue',
                    'modifiedAt': '2023-01-01T10:00:00Z',
                    'opacity': 1,
                    'id': 'color1'
                },
                'color2': {
                    'path': '/Colors/Secondary',
                    'color': '#FF6633',
                    'name': 'Secondary Orange',
                    'modifiedAt': '2023-01-01T10:30:00Z',
                    'opacity': 1,
                    'id': 'color2'
                }
            },
            # Typography asset referenced via 'appliedTokens' on text objects.
            'typographies': {
                'typo1': {
                    'lineHeight': '1.5',
                    'path': '/Typography/Heading',
                    'fontStyle': 'normal',
                    'textTransform': 'none',
                    'fontId': 'font1',
                    'fontSize': '24px',
                    'fontWeight': '600',
                    'name': 'Heading',
                    'modifiedAt': '2023-01-01T11:00:00Z',
                    'fontVariantId': 'var1',
                    'id': 'typo1',
                    'letterSpacing': '0',
                    'fontFamily': 'Inter'
                }
            },
            'pagesIndex': {
                'page1': {
                    'id': 'page1',
                    'name': 'Complex Page',
                    'options': {
                        'background': '#FFFFFF',
                        'grids': []
                    },
                    'objects': {
                        # Root frame (level 0)
                        '00000000-0000-0000-0000-000000000000': {
                            'id': '00000000-0000-0000-0000-000000000000',
                            'type': 'frame',
                            'name': 'Root Frame',
                            'width': 1920,
                            'height': 1080,
                            'x': 0,
                            'y': 0,
                            'rotation': 0,
                            'selrect': {
                                'x': 0,
                                'y': 0,
                                'width': 1920,
                                'height': 1080,
                                'x1': 0,
                                'y1': 0,
                                'x2': 1920,
                                'y2': 1080
                            },
                            'fills': [
                                {
                                    'fillColor': '#FFFFFF',
                                    'fillOpacity': 1
                                }
                            ],
                            'layout': 'flex',
                            'layoutFlexDir': 'column',
                            'layoutAlignItems': 'center',
                            'layoutJustifyContent': 'start'
                        },
                        # Main container (level 1) - the subject of Tests 1-4 below
                        'main-container': {
                            'id': 'main-container',
                            'type': 'frame',
                            'name': 'Main Container',
                            'parentId': '00000000-0000-0000-0000-000000000000',
                            'width': 1200,
                            'height': 800,
                            'x': 360,
                            'y': 140,
                            'rotation': 0,
                            'selrect': {
                                'x': 360,
                                'y': 140,
                                'width': 1200,
                                'height': 800,
                                'x1': 360,
                                'y1': 140,
                                'x2': 1560,
                                'y2': 940
                            },
                            'fills': [
                                {
                                    'fillColor': '#F5F5F5',
                                    'fillOpacity': 1
                                }
                            ],
                            'strokes': [
                                {
                                    'strokeStyle': 'solid',
                                    'strokeAlignment': 'center',
                                    'strokeWidth': 1,
                                    'strokeColor': '#E0E0E0',
                                    'strokeOpacity': 1
                                }
                            ],
                            'layout': 'flex',
                            'layoutFlexDir': 'column',
                            'layoutAlignItems': 'stretch',
                            'layoutJustifyContent': 'start',
                            'layoutGap': {
                                'row-gap': '0px',
                                'column-gap': '0px'
                            },
                            'layoutPadding': {
                                'padding-top': '0px',
                                'padding-right': '0px',
                                'padding-bottom': '0px',
                                'padding-left': '0px'
                            },
                            'constraintsH': 'center',
                            'constraintsV': 'center'
                        },
                        # Header section (level 2)
                        'header-section': {
                            'id': 'header-section',
                            'type': 'frame',
                            'name': 'Header Section',
                            'parentId': 'main-container',
                            'width': 1200,
                            'height': 100,
                            'x': 0,
                            'y': 0,
                            'rotation': 0,
                            'fills': [
                                {
                                    'fillColor': '#FFFFFF',
                                    'fillOpacity': 1
                                }
                            ],
                            'strokes': [
                                {
                                    'strokeStyle': 'solid',
                                    'strokeAlignment': 'bottom',
                                    'strokeWidth': 1,
                                    'strokeColor': '#EEEEEE',
                                    'strokeOpacity': 1
                                }
                            ],
                            'layout': 'flex',
                            'layoutFlexDir': 'row',
                            'layoutAlignItems': 'center',
                            'layoutJustifyContent': 'space-between',
                            'layoutPadding': {
                                'padding-top': '20px',
                                'padding-right': '30px',
                                'padding-bottom': '20px',
                                'padding-left': '30px'
                            },
                            'constraintsH': 'stretch',
                            'constraintsV': 'top'
                        },
                        # Logo in header (level 3)
                        'logo': {
                            'id': 'logo',
                            'type': 'frame',
                            'name': 'Logo',
                            'parentId': 'header-section',
                            'width': 60,
                            'height': 60,
                            'x': 30,
                            'y': 20,
                            'rotation': 0,
                            'fills': [
                                {
                                    'fillColor': '#3366FF',
                                    'fillOpacity': 1
                                }
                            ],
                            'r1': 8,
                            'r2': 8,
                            'r3': 8,
                            'r4': 8,
                            'constraintsH': 'left',
                            'constraintsV': 'center'
                        },
                        # Navigation menu (level 3)
                        'nav-menu': {
                            'id': 'nav-menu',
                            'type': 'frame',
                            'name': 'Navigation Menu',
                            'parentId': 'header-section',
                            'width': 600,
                            'height': 60,
                            'x': 300,
                            'y': 20,
                            'rotation': 0,
                            'layout': 'flex',
                            'layoutFlexDir': 'row',
                            'layoutAlignItems': 'center',
                            'layoutJustifyContent': 'center',
                            'layoutGap': {
                                'row-gap': '0px',
                                'column-gap': '20px'
                            },
                            'constraintsH': 'center',
                            'constraintsV': 'center'
                        },
                        # Menu items (level 4)
                        'menu-item-1': {
                            'id': 'menu-item-1',
                            'type': 'text',
                            'name': 'Home',
                            'parentId': 'nav-menu',
                            'width': 100,
                            'height': 40,
                            'x': 0,
                            'y': 10,
                            'rotation': 0,
                            'content': {
                                'type': 'root',
                                'children': [
                                    {
                                        'type': 'paragraph',
                                        'children': [
                                            {
                                                'type': 'text',
                                                'text': 'Home'
                                            }
                                        ]
                                    }
                                ]
                            },
                            'fills': [
                                {
                                    'fillColor': '#333333',
                                    'fillOpacity': 1
                                }
                            ],
                            'appliedTokens': {
                                'typography': 'typo1'
                            },
                            'constraintsH': 'start',
                            'constraintsV': 'center'
                        },
                        'menu-item-2': {
                            'id': 'menu-item-2',
                            'type': 'text',
                            'name': 'Products',
                            'parentId': 'nav-menu',
                            'width': 100,
                            'height': 40,
                            'x': 120,
                            'y': 10,
                            'rotation': 0,
                            'content': {
                                'type': 'root',
                                'children': [
                                    {
                                        'type': 'paragraph',
                                        'children': [
                                            {
                                                'type': 'text',
                                                'text': 'Products'
                                            }
                                        ]
                                    }
                                ]
                            },
                            'fills': [
                                {
                                    'fillColor': '#333333',
                                    'fillOpacity': 1
                                }
                            ]
                        },
                        'menu-item-3': {
                            'id': 'menu-item-3',
                            'type': 'text',
                            'name': 'About',
                            'parentId': 'nav-menu',
                            'width': 100,
                            'height': 40,
                            'x': 240,
                            'y': 10,
                            'rotation': 0,
                            'content': {
                                'type': 'root',
                                'children': [
                                    {
                                        'type': 'paragraph',
                                        'children': [
                                            {
                                                'type': 'text',
                                                'text': 'About'
                                            }
                                        ]
                                    }
                                ]
                            },
                            'fills': [
                                {
                                    'fillColor': '#333333',
                                    'fillOpacity': 1
                                }
                            ]
                        },
                        # Content section (level 2)
                        'content-section': {
                            'id': 'content-section',
                            'type': 'frame',
                            'name': 'Content Section',
                            'parentId': 'main-container',
                            'width': 1200,
                            'height': 700,
                            'x': 0,
                            'y': 100,
                            'rotation': 0,
                            'layout': 'flex',
                            'layoutFlexDir': 'column',
                            'layoutAlignItems': 'stretch',
                            'layoutJustifyContent': 'start',
                            'layoutGap': {
                                'row-gap': '0px',
                                'column-gap': '0px'
                            },
                            'constraintsH': 'stretch',
                            'constraintsV': 'top'
                        },
                        # Hero (level 3)
                        'hero': {
                            'id': 'hero',
                            'type': 'frame',
                            'name': 'Hero Section',
                            'parentId': 'content-section',
                            'width': 1200,
                            'height': 400,
                            'x': 0,
                            'y': 0,
                            'rotation': 0,
                            'fills': [
                                {
                                    'fillColor': '#F0F7FF',
                                    'fillOpacity': 1
                                }
                            ],
                            'layout': 'flex',
                            'layoutFlexDir': 'column',
                            'layoutAlignItems': 'center',
                            'layoutJustifyContent': 'center',
                            'layoutPadding': {
                                'padding-top': '40px',
                                'padding-right': '40px',
                                'padding-bottom': '40px',
                                'padding-left': '40px'
                            },
                            'constraintsH': 'stretch',
                            'constraintsV': 'top'
                        },
                        # Hero title (level 4) - used by the content/token tests
                        'hero-title': {
                            'id': 'hero-title',
                            'type': 'text',
                            'name': 'Welcome Title',
                            'parentId': 'hero',
                            'width': 600,
                            'height': 80,
                            'x': 300,
                            'y': 140,
                            'rotation': 0,
                            'content': {
                                'type': 'root',
                                'children': [
                                    {
                                        'type': 'paragraph',
                                        'children': [
                                            {
                                                'type': 'text',
                                                'text': 'Welcome to our Platform'
                                            }
                                        ]
                                    }
                                ]
                            },
                            'fills': [
                                {
                                    'fillColor': '#333333',
                                    'fillOpacity': 1
                                }
                            ],
                            'appliedTokens': {
                                'typography': 'typo1'
                            },
                            'constraintsH': 'center',
                            'constraintsV': 'center'
                        },
                        # Cards container (level 3)
                        'cards-container': {
                            'id': 'cards-container',
                            'type': 'frame',
                            'name': 'Cards Container',
                            'parentId': 'content-section',
                            'width': 1200,
                            'height': 300,
                            'x': 0,
                            'y': 400,
                            'rotation': 0,
                            'layout': 'flex',
                            'layoutFlexDir': 'row',
                            'layoutAlignItems': 'center',
                            'layoutJustifyContent': 'space-around',
                            'layoutPadding': {
                                'padding-top': '25px',
                                'padding-right': '25px',
                                'padding-bottom': '25px',
                                'padding-left': '25px'
                            },
                            'constraintsH': 'stretch',
                            'constraintsV': 'top'
                        },
                        # Card instances (level 4) - each references component 'comp2'
                        'card-1': {
                            'id': 'card-1',
                            'type': 'frame',
                            'name': 'Card 1',
                            'parentId': 'cards-container',
                            'width': 300,
                            'height': 250,
                            'x': 50,
                            'y': 25,
                            'rotation': 0,
                            'componentId': 'comp2',
                            'fills': [
                                {
                                    'fillColor': '#FFFFFF',
                                    'fillOpacity': 1
                                }
                            ],
                            'strokes': [
                                {
                                    'strokeStyle': 'solid',
                                    'strokeAlignment': 'center',
                                    'strokeWidth': 1,
                                    'strokeColor': '#EEEEEE',
                                    'strokeOpacity': 1
                                }
                            ],
                            'r1': 8,
                            'r2': 8,
                            'r3': 8,
                            'r4': 8,
                            'layout': 'flex',
                            'layoutFlexDir': 'column',
                            'layoutAlignItems': 'center',
                            'layoutJustifyContent': 'start',
                            'layoutPadding': {
                                'padding-top': '20px',
                                'padding-right': '20px',
                                'padding-bottom': '20px',
                                'padding-left': '20px'
                            },
                            'constraintsH': 'center',
                            'constraintsV': 'center'
                        },
                        'card-2': {
                            'id': 'card-2',
                            'type': 'frame',
                            'name': 'Card 2',
                            'parentId': 'cards-container',
                            'width': 300,
                            'height': 250,
                            'x': 450,
                            'y': 25,
                            'rotation': 0,
                            'componentId': 'comp2',
                            'fills': [
                                {
                                    'fillColor': '#FFFFFF',
                                    'fillOpacity': 1
                                }
                            ],
                            'strokes': [
                                {
                                    'strokeStyle': 'solid',
                                    'strokeAlignment': 'center',
                                    'strokeWidth': 1,
                                    'strokeColor': '#EEEEEE',
                                    'strokeOpacity': 1
                                }
                            ],
                            'r1': 8,
                            'r2': 8,
                            'r3': 8,
                            'r4': 8
                        },
                        'card-3': {
                            'id': 'card-3',
                            'type': 'frame',
                            'name': 'Card 3',
                            'parentId': 'cards-container',
                            'width': 300,
                            'height': 250,
                            'x': 850,
                            'y': 25,
                            'rotation': 0,
                            'componentId': 'comp2',
                            'fills': [
                                {
                                    'fillColor': '#FFFFFF',
                                    'fillOpacity': 1
                                }
                            ],
                            'strokes': [
                                {
                                    'strokeStyle': 'solid',
                                    'strokeAlignment': 'center',
                                    'strokeWidth': 1,
                                    'strokeColor': '#EEEEEE',
                                    'strokeOpacity': 1
                                }
                            ],
                            'r1': 8,
                            'r2': 8,
                            'r3': 8,
                            'r4': 8
                        }
                    }
                }
            },
            'id': 'file1',
            'pages': ['page1'],
            # Design-token library (sets, themes, active themes).
            'tokensLib': {
                'sets': {
                    'S-colors': {
                        'name': 'Colors',
                        'description': 'Color tokens',
                        'modifiedAt': '2023-01-01T09:00:00Z',
                        'tokens': {
                            'primary': {
                                'name': 'Primary',
                                'type': 'color',
                                'value': '#3366FF',
                                'description': 'Primary color',
                                'modifiedAt': '2023-01-01T09:00:00Z'
                            },
                            'secondary': {
                                'name': 'Secondary',
                                'type': 'color',
                                'value': '#FF6633',
                                'description': 'Secondary color',
                                'modifiedAt': '2023-01-01T09:00:00Z'
                            }
                        }
                    }
                },
                'themes': {
                    'default': {
                        'light': {
                            'name': 'Light',
                            'group': 'Default',
                            'description': 'Light theme',
                            'isSource': True,
                            'id': 'theme1',
                            'modifiedAt': '2023-01-01T09:30:00Z',
                            'sets': ['S-colors']
                        }
                    }
                },
                'activeThemes': ['light']
            }
        }
    }
    # Test 1: Full tree at maximum depth (default)
    result = get_object_subtree_with_fields(file_data, 'main-container')
    assert 'error' not in result
    assert result['tree']['id'] == 'main-container'
    assert result['tree']['name'] == 'Main Container'
    assert result['tree']['type'] == 'frame'
    # Verify first level children exist (header and content sections)
    children_names = [child['name'] for child in result['tree']['children']]
    assert 'Header Section' in children_names
    assert 'Content Section' in children_names
    # Verify second level children exist (deep nesting)
    header_section = next(child for child in result['tree']['children'] if child['name'] == 'Header Section')
    logo_in_header = next((child for child in header_section['children'] if child['name'] == 'Logo'), None)
    assert logo_in_header is not None
    nav_menu = next((child for child in header_section['children'] if child['name'] == 'Navigation Menu'), None)
    assert nav_menu is not None
    # Check if level 4 elements (menu items) exist
    menu_items = [child for child in nav_menu['children']]
    assert len(menu_items) == 3
    menu_item_names = [item['name'] for item in menu_items]
    assert 'Home' in menu_item_names
    assert 'Products' in menu_item_names
    assert 'About' in menu_item_names
    # Test 2: Depth = 1 (main container and its immediate children only)
    result = get_object_subtree_with_fields(file_data, 'main-container', depth=1)
    assert 'error' not in result
    assert result['tree']['id'] == 'main-container'
    assert 'children' in result['tree']
    # Should have header and content sections but no deeper elements
    children_names = [child['name'] for child in result['tree']['children']]
    assert 'Header Section' in children_names
    assert 'Content Section' in children_names
    # Verify no grandchildren are included
    header_section = next(child for child in result['tree']['children'] if child['name'] == 'Header Section')
    assert 'children' not in header_section
    # Test 3: Depth = 2 (main container, its children, and grandchildren)
    result = get_object_subtree_with_fields(file_data, 'main-container', depth=2)
    assert 'error' not in result
    # Should have header and content sections
    header_section = next(child for child in result['tree']['children'] if child['name'] == 'Header Section')
    content_section = next(child for child in result['tree']['children'] if child['name'] == 'Content Section')
    # Header section should have logo and nav menu but no menu items
    assert 'children' in header_section
    nav_menu = next((child for child in header_section['children'] if child['name'] == 'Navigation Menu'), None)
    assert nav_menu is not None
    assert 'children' not in nav_menu
    # Test 4: Field filtering with selective depth
    result = get_object_subtree_with_fields(
        file_data,
        'main-container',
        include_fields=['name', 'type'],
        depth=2
    )
    assert 'error' not in result
    # Main container should have only specified fields plus id
    assert set(result['tree'].keys()) == {'id', 'name', 'type', 'children'}
    assert 'width' not in result['tree']
    assert 'height' not in result['tree']
    # Children should also have only the specified fields
    header_section = next(child for child in result['tree']['children'] if child['name'] == 'Header Section')
    assert set(header_section.keys()) == {'id', 'name', 'type', 'children'}
    # Test 5: Testing component references
    result = get_object_subtree_with_fields(file_data, 'cards-container')
    assert 'error' not in result
    # Find the first card
    card = next(child for child in result['tree']['children'] if child['name'] == 'Card 1')
    assert 'componentId' in card
    assert card['componentId'] == 'comp2'  # References the Card component
    # Test 6: Test layout properties in objects
    result = get_object_subtree_with_fields(file_data, 'main-container', include_fields=['layout', 'layoutFlexDir', 'layoutAlignItems', 'layoutJustifyContent'])
    assert 'error' not in result
    assert result['tree']['layout'] == 'flex'
    assert result['tree']['layoutFlexDir'] == 'column'
    assert result['tree']['layoutAlignItems'] == 'stretch'
    assert result['tree']['layoutJustifyContent'] == 'start'
    # Test 7: Test text content structure
    result = get_object_subtree_with_fields(file_data, 'hero-title', include_fields=['content'])
    assert 'error' not in result
    assert result['tree']['content']['type'] == 'root'
    assert len(result['tree']['content']['children']) == 1
    assert result['tree']['content']['children'][0]['type'] == 'paragraph'
    assert result['tree']['content']['children'][0]['children'][0]['text'] == 'Welcome to our Platform'
    # Test 8: Test applied tokens
    result = get_object_subtree_with_fields(file_data, 'hero-title', include_fields=['appliedTokens'])
    assert 'error' not in result
    assert 'appliedTokens' in result['tree']
    assert result['tree']['appliedTokens']['typography'] == 'typo1'
def test_get_object_subtree_with_fields_root_frame():
    """Verify subtree retrieval starting from the page's root frame."""
    root_id = '00000000-0000-0000-0000-000000000000'
    # Minimal page: a root frame (level 0) with one child container (level 1).
    file_data = {
        'data': {
            'pagesIndex': {
                'page1': {
                    'name': 'Complex Page',
                    'objects': {
                        root_id: {
                            'type': 'frame',
                            'name': 'Root Frame',
                            'width': 1920,
                            'height': 1080
                        },
                        'main-container': {
                            'type': 'frame',
                            'name': 'Main Container',
                            'parentId': root_id
                        }
                    }
                }
            }
        }
    }

    # Asking for the root frame should yield it plus its single child.
    result = get_object_subtree_with_fields(file_data, root_id)
    assert 'error' not in result
    tree = result['tree']
    assert tree['id'] == root_id
    assert tree['type'] == 'frame'
    assert 'children' in tree
    assert len(tree['children']) == 1
    assert tree['children'][0]['name'] == 'Main Container'
def test_get_object_subtree_with_fields_circular_reference():
    """Test handling of circular references in object tree.

    Builds a page containing a two-object parent cycle (A <-> B) and a
    self-referencing object (C -> C), then checks that
    get_object_subtree_with_fields terminates and marks the repeated
    node with the ``_circular_reference`` flag instead of recursing
    forever.
    """
    file_data = {
        'data': {
            'pagesIndex': {
                'page1': {
                    'name': 'Test Page',
                    'objects': {
                        # Object A references B as parent
                        'object-a': {
                            'type': 'frame',
                            'name': 'Object A',
                            'parentId': 'object-b'
                        },
                        # Object B references A as parent (circular)
                        'object-b': {
                            'type': 'frame',
                            'name': 'Object B',
                            'parentId': 'object-a'
                        },
                        # Object C references itself as parent
                        'object-c': {
                            'type': 'frame',
                            'name': 'Object C',
                            'parentId': 'object-c'
                        }
                    }
                }
            }
        }
    }

    # Object A: the A <-> B cycle should be cut when A reappears under B.
    result = get_object_subtree_with_fields(file_data, 'object-a')
    assert 'error' not in result
    assert result['tree']['id'] == 'object-a'
    assert 'children' in result['tree']
    assert len(result['tree']['children']) == 1
    child_b = result['tree']['children'][0]
    assert child_b['id'] == 'object-b'
    # The circular reference appears when object-a shows up again as a
    # child of object-b; it must be flagged rather than expanded again.
    assert 'children' in child_b
    assert len(child_b['children']) == 1
    assert child_b['children'][0]['id'] == 'object-a'
    # Fixed: `is True` instead of `== True` (flake8 E712 / PEP 8).
    assert child_b['children'][0]['_circular_reference'] is True

    # Object C: a self-reference is flagged immediately on its own child.
    result = get_object_subtree_with_fields(file_data, 'object-c')
    assert 'error' not in result
    assert result['tree']['id'] == 'object-c'
    assert 'children' in result['tree']
    assert len(result['tree']['children']) == 1
    assert result['tree']['children'][0]['id'] == 'object-c'
    assert result['tree']['children'][0]['_circular_reference'] is True
```