# Directory Structure ``` ├── .github │ └── workflows │ ├── publish.yml │ └── release.yaml ├── .gitignore ├── LICENSE ├── main.py ├── pyproject.toml ├── README.md ├── src │ └── server.py └── succeed.jpg ``` # Files -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` 1 | # Python 2 | __pycache__/ 3 | *.py[cod] 4 | *$py.class 5 | *.so 6 | .Python 7 | build/ 8 | develop-eggs/ 9 | dist/ 10 | downloads/ 11 | eggs/ 12 | .eggs/ 13 | lib/ 14 | lib64/ 15 | parts/ 16 | sdist/ 17 | var/ 18 | wheels/ 19 | *.egg-info/ 20 | .installed.cfg 21 | *.egg 22 | 23 | # Virtual Environment 24 | venv/ 25 | ENV/ 26 | env/ 27 | .env 28 | .venv 29 | env.bak/ 30 | venv.bak/ 31 | 32 | # IDE specific files 33 | .idea/ 34 | .vscode/ 35 | *.swp 36 | *.swo 37 | .DS_Store 38 | *.sublime-workspace 39 | *.sublime-project 40 | 41 | # Testing 42 | .tox/ 43 | .coverage 44 | .coverage.* 45 | .cache 46 | nosetests.xml 47 | coverage.xml 48 | *.cover 49 | .hypothesis/ 50 | .pytest_cache/ 51 | htmlcov/ 52 | 53 | # Distribution / packaging 54 | .Python 55 | *.manifest 56 | *.spec 57 | pip-log.txt 58 | pip-delete-this-directory.txt 59 | 60 | # Jupyter Notebook 61 | .ipynb_checkpoints 62 | 63 | # pyenv 64 | .python-version 65 | 66 | # mypy 67 | .mypy_cache/ 68 | .dmypy.json 69 | dmypy.json 70 | 71 | # Logs and databases 72 | *.log 73 | *.sqlite 74 | *.db 75 | 76 | uv.lock ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown 1 | # Tripo MCP Server 2 | 3 | Tripo MCP provides an interface between AI assistants and [Tripo AI](https://www.tripo3d.ai) via [Model Context Protocol (MCP)](https://github.com/anthropics/anthropic-cookbook/tree/main/mcp). 4 | 5 | > **Note:** This project is in alpha. Currently, it supports Tripo Blender Addon integration. 
6 | 7 | ## Current Features 8 | 9 | - Generate 3D assets from natural language using Tripo's API and import them into Blender 10 | - Compatible with Claude and other MCP-enabled AI assistants 11 | 12 | ## Quick Start 13 | 14 | ### Prerequisites 15 | - Python 3.10+ 16 | - [Blender](https://www.blender.org/download/) 17 | - [Tripo AI Blender Addon](https://www.tripo3d.ai/app/home) 18 | - Claude for Desktop or Cursor IDE 19 | 20 | ### Installation 21 | 22 | 1. Install the Tripo AI Blender Addon from [Tripo AI's website](https://www.tripo3d.ai/app/home) 23 | 24 | 2. Configure the MCP server in Claude Desktop or Cursor. 25 | 26 | * `pip install uv` 27 | * Add the following MCP server entry to your Cursor configuration: 28 | ```json 29 | { 30 | "mcpServers": { 31 | "tripo-mcp": { 32 | "command": "uvx", 33 | "args": [ 34 | "tripo-mcp" 35 | ] 36 | } 37 | } 38 | } 39 | ``` 40 | 41 | * Then you will get a green dot like this: 42 |  43 | 44 | ### Usage 45 | 46 | 1. Enable the Tripo AI Blender Addon and start the Blender MCP server. 47 | 48 | 2. Chat using Cursor or Claude. E.g., "Generate a 3D model of a futuristic chair". 49 | 50 | ## Acknowledgements 51 | 52 | - **[Tripo AI](https://www.tripo3d.ai)** 53 | - **[blender-mcp](https://github.com/ahujasid/blender-mcp)** by [Siddharth Ahuja](https://github.com/ahujasid) 54 | 55 | **Special Thanks** 56 | Special thanks to Siddharth Ahuja for the blender-mcp project, which provided inspiring ideas for MCP + 3D. 
57 | ``` -------------------------------------------------------------------------------- /main.py: -------------------------------------------------------------------------------- ```python 1 | from src.server import main as server_main 2 | 3 | 4 | if __name__ == "__main__": 5 | """Entry point for the Tripo MCP package""" 6 | server_main() ``` -------------------------------------------------------------------------------- /.github/workflows/publish.yml: -------------------------------------------------------------------------------- ```yaml 1 | name: Publish to PyPI 2 | 3 | on: 4 | release: 5 | types: [published] 6 | 7 | jobs: 8 | deploy: 9 | runs-on: ubuntu-latest 10 | steps: 11 | - uses: actions/checkout@v4 12 | - name: Set up Python 13 | uses: actions/setup-python@v4 14 | with: 15 | python-version: '3.10' 16 | - name: Install dependencies 17 | run: | 18 | python -m pip install --upgrade pip 19 | pip install build 20 | - name: Build package 21 | run: python -m build 22 | - name: Publish package 23 | uses: pypa/[email protected] 24 | with: 25 | password: ${{ secrets.PYPI_API_TOKEN }} ``` -------------------------------------------------------------------------------- /.github/workflows/release.yaml: -------------------------------------------------------------------------------- ```yaml 1 | name: Publish to PyPI 2 | 3 | on: 4 | release: 5 | types: [created] 6 | 7 | jobs: 8 | deploy: 9 | runs-on: ubuntu-latest 10 | steps: 11 | - uses: actions/checkout@v3 12 | 13 | - name: Set up Python 14 | uses: actions/setup-python@v4 15 | with: 16 | python-version: '3.x' 17 | 18 | - name: Install dependencies 19 | run: | 20 | python -m pip install --upgrade pip 21 | pip install build twine 22 | 23 | - name: Build package 24 | run: python -m build 25 | 26 | - name: Publish to PyPI 27 | env: 28 | TWINE_USERNAME: __token__ 29 | TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN }} 30 | run: | 31 | python -m twine upload dist/* ``` 
-------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml 1 | [project] 2 | name = "tripo-mcp" 3 | version = "0.1.2" 4 | description = "MCP (Model Context Protocol) integration for Tripo" 5 | authors = [ 6 | {name = "Allen Dang", email = "[email protected]"}, 7 | {name = "pookiefoof", email = "[email protected]"}, 8 | {name = "Ding Liang", email = "[email protected]"}, 9 | ] 10 | license = {text = "MIT"} 11 | readme = "README.md" 12 | requires-python = ">=3.10" 13 | keywords = ["mcp", "blender", "3d", "automation"] 14 | classifiers = [ 15 | "Development Status :: 3 - Alpha", 16 | "Intended Audience :: Developers", 17 | "License :: OSI Approved :: MIT License", 18 | "Programming Language :: Python :: 3", 19 | "Programming Language :: Python :: 3.10", 20 | ] 21 | 22 | dependencies = [ 23 | "tripo3d>=0.2.0", 24 | "mcp[cli]>=1.4.1", 25 | ] 26 | 27 | [project.optional-dependencies] 28 | dev = [ 29 | ] 30 | 31 | [project.scripts] 32 | tripo-mcp = "server:main" 33 | 34 | [build-system] 35 | requires = ["setuptools>=61.0", "wheel"] 36 | build-backend = "setuptools.build_meta" 37 | 38 | [tool.setuptools] 39 | package-dir = {"" = "src"} 40 | ``` -------------------------------------------------------------------------------- /src/server.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | MIT License 3 | 4 | Copyright (c) 2025 Siddharth Ahuja 5 | Copyright (c) 2025 for additions 6 | 7 | Permission is hereby granted, free of charge, to any person obtaining a copy 8 | of this software and associated documentation files (the "Software"), to deal 9 | in the Software without restriction, including without limitation the rights 10 | to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 11 | copies of the Software, and to permit persons to whom the Software is 12 | furnished to do 
so, subject to the following conditions: 13 | 14 | The above copyright notice and this permission notice shall be included in all 15 | copies or substantial portions of the Software. 16 | 17 | THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 18 | IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 19 | FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 20 | AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 21 | LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 22 | OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 23 | SOFTWARE. 24 | 25 | This file is based on work by Siddharth Ahuja, with additional contributions 26 | for Tripo MCP functionality. 27 | """ 28 | 29 | from mcp.server.fastmcp import FastMCP, Context 30 | import sys 31 | from pathlib import Path 32 | from typing import Dict, Any 33 | import socket 34 | import json 35 | import logging 36 | from dataclasses import dataclass 37 | from contextlib import asynccontextmanager 38 | from typing import AsyncIterator, Dict, Any, List 39 | 40 | # Configure logging 41 | logging.basicConfig( 42 | level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" 43 | ) 44 | logger = logging.getLogger("BlenderMCPServer") 45 | 46 | sys.path.insert(0, str(Path(__file__).parent.parent)) 47 | 48 | from tripo3d import TripoClient, TaskStatus 49 | 50 | 51 | @dataclass 52 | class BlenderConnection: 53 | host: str 54 | port: int 55 | sock: socket.socket = ( 56 | None # Changed from 'socket' to 'sock' to avoid naming conflict 57 | ) 58 | 59 | def connect(self) -> bool: 60 | """Connect to the Blender addon socket server""" 61 | if self.sock: 62 | return True 63 | 64 | try: 65 | self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 66 | self.sock.connect((self.host, self.port)) 67 | logger.info(f"Connected to Blender at {self.host}:{self.port}") 68 | return 
True 69 | except Exception as e: 70 | logger.error(f"Failed to connect to Blender: {str(e)}") 71 | self.sock = None 72 | return False 73 | 74 | def disconnect(self): 75 | """Disconnect from the Blender addon""" 76 | if self.sock: 77 | try: 78 | self.sock.close() 79 | except Exception as e: 80 | logger.error(f"Error disconnecting from Blender: {str(e)}") 81 | finally: 82 | self.sock = None 83 | 84 | def receive_full_response(self, sock, buffer_size=8192): 85 | """Receive the complete response, potentially in multiple chunks""" 86 | chunks = [] 87 | # Use a consistent timeout value that matches the addon's timeout 88 | sock.settimeout(15.0) # Match the addon's timeout 89 | 90 | try: 91 | while True: 92 | try: 93 | chunk = sock.recv(buffer_size) 94 | if not chunk: 95 | # If we get an empty chunk, the connection might be closed 96 | if ( 97 | not chunks 98 | ): # If we haven't received anything yet, this is an error 99 | raise Exception( 100 | "Connection closed before receiving any data" 101 | ) 102 | break 103 | 104 | chunks.append(chunk) 105 | 106 | # Check if we've received a complete JSON object 107 | try: 108 | data = b"".join(chunks) 109 | json.loads(data.decode("utf-8")) 110 | # If we get here, it parsed successfully 111 | logger.info(f"Received complete response ({len(data)} bytes)") 112 | return data 113 | except json.JSONDecodeError: 114 | # Incomplete JSON, continue receiving 115 | continue 116 | except socket.timeout: 117 | # If we hit a timeout during receiving, break the loop and try to use what we have 118 | logger.warning("Socket timeout during chunked receive") 119 | break 120 | except (ConnectionError, BrokenPipeError, ConnectionResetError) as e: 121 | logger.error(f"Socket connection error during receive: {str(e)}") 122 | raise # Re-raise to be handled by the caller 123 | except socket.timeout: 124 | logger.warning("Socket timeout during chunked receive") 125 | except Exception as e: 126 | logger.error(f"Error during receive: {str(e)}") 127 | raise 
128 | 129 | # If we get here, we either timed out or broke out of the loop 130 | # Try to use what we have 131 | if chunks: 132 | data = b"".join(chunks) 133 | logger.info(f"Returning data after receive completion ({len(data)} bytes)") 134 | try: 135 | # Try to parse what we have 136 | json.loads(data.decode("utf-8")) 137 | return data 138 | except json.JSONDecodeError: 139 | # If we can't parse it, it's incomplete 140 | raise Exception("Incomplete JSON response received") 141 | else: 142 | raise Exception("No data received") 143 | 144 | def send_command( 145 | self, command_type: str, params: Dict[str, Any] = None 146 | ) -> Dict[str, Any]: 147 | """Send a command to Blender and return the response""" 148 | if not self.sock and not self.connect(): 149 | raise ConnectionError("Not connected to Blender") 150 | 151 | command = {"type": command_type, "params": params or {}} 152 | 153 | try: 154 | # Log the command being sent 155 | logger.info(f"Sending command: {command_type} with params: {params}") 156 | 157 | # Send the command 158 | self.sock.sendall(json.dumps(command).encode("utf-8")) 159 | logger.info(f"Command sent, waiting for response...") 160 | 161 | # Set a timeout for receiving - use the same timeout as in receive_full_response 162 | self.sock.settimeout(15.0) # Match the addon's timeout 163 | 164 | # Receive the response using the improved receive_full_response method 165 | response_data = self.receive_full_response(self.sock) 166 | logger.info(f"Received {len(response_data)} bytes of data") 167 | 168 | response = json.loads(response_data.decode("utf-8")) 169 | logger.info(f"Response parsed, status: {response.get('status', 'unknown')}") 170 | 171 | if response.get("status") == "error": 172 | logger.error(f"Blender error: {response.get('message')}") 173 | raise Exception(response.get("message", "Unknown error from Blender")) 174 | 175 | return response.get("result", {}) 176 | except socket.timeout: 177 | logger.error("Socket timeout while waiting for 
response from Blender") 178 | # Don't try to reconnect here - let the get_blender_connection handle reconnection 179 | # Just invalidate the current socket so it will be recreated next time 180 | self.sock = None 181 | raise Exception( 182 | "Timeout waiting for Blender response - try simplifying your request" 183 | ) 184 | except (ConnectionError, BrokenPipeError, ConnectionResetError) as e: 185 | logger.error(f"Socket connection error: {str(e)}") 186 | self.sock = None 187 | raise Exception(f"Connection to Blender lost: {str(e)}") 188 | except json.JSONDecodeError as e: 189 | logger.error(f"Invalid JSON response from Blender: {str(e)}") 190 | # Try to log what was received 191 | if "response_data" in locals() and response_data: 192 | logger.error(f"Raw response (first 200 bytes): {response_data[:200]}") 193 | raise Exception(f"Invalid response from Blender: {str(e)}") 194 | except Exception as e: 195 | logger.error(f"Error communicating with Blender: {str(e)}") 196 | # Don't try to reconnect here - let the get_blender_connection handle reconnection 197 | self.sock = None 198 | raise Exception(f"Communication error with Blender: {str(e)}") 199 | 200 | 201 | @asynccontextmanager 202 | async def server_lifespan(server: FastMCP) -> AsyncIterator[Dict[str, Any]]: 203 | """Manage server startup and shutdown lifecycle""" 204 | # We don't need to create a connection here since we're using the global connection 205 | # for resources and tools 206 | 207 | try: 208 | # Just log that we're starting up 209 | logger.info("Blender server starting up") 210 | 211 | # Try to connect to Blender on startup to verify it's available 212 | try: 213 | # This will initialize the global connection if needed 214 | blender = get_blender_connection() 215 | logger.info("Successfully connected to Blender on startup") 216 | except Exception as e: 217 | logger.warning(f"Could not connect to Blender on startup: {str(e)}") 218 | logger.warning( 219 | "Make sure the Blender addon is running before 
using Blender resources or tools" 220 | ) 221 | 222 | # Return an empty context - we're using the global connection 223 | yield {} 224 | finally: 225 | # Clean up the global connection on shutdown 226 | global _blender_connection 227 | if _blender_connection: 228 | logger.info("Disconnecting from Blender on shutdown") 229 | _blender_connection.disconnect() 230 | _blender_connection = None 231 | logger.info("BlenderMCP server shut down") 232 | 233 | 234 | mcp = FastMCP( 235 | name="Tripo MCP", 236 | instructions="MCP for Tripo Blender addon", 237 | lifespan=server_lifespan, 238 | # host="127.0.0.1", 239 | # port=8392, 240 | ) 241 | 242 | _blender_connection = None 243 | _polyhaven_enabled = False # Add this global variable 244 | _tripo_apikey = "" 245 | 246 | 247 | def get_blender_connection(): 248 | """Get or create a persistent Blender connection""" 249 | global _blender_connection, _polyhaven_enabled, _tripo_apikey # Add _polyhaven_enabled to globals 250 | 251 | # If we have an existing connection, check if it's still valid 252 | if _blender_connection is not None: 253 | try: 254 | # First check if PolyHaven is enabled by sending a ping command 255 | result = _blender_connection.send_command("get_tripo_apikey") 256 | _tripo_apikey = result.get("api_key", "") 257 | result = _blender_connection.send_command("get_polyhaven_status") 258 | # Store the PolyHaven status globally 259 | _polyhaven_enabled = result.get("enabled", False) 260 | 261 | return _blender_connection 262 | except Exception as e: 263 | # Connection is dead, close it and create a new one 264 | logger.warning(f"Existing connection is no longer valid: {str(e)}") 265 | try: 266 | _blender_connection.disconnect() 267 | except: 268 | pass 269 | _blender_connection = None 270 | 271 | # Create a new connection if needed 272 | if _blender_connection is None: 273 | _blender_connection = BlenderConnection(host="localhost", port=9876) 274 | if not _blender_connection.connect(): 275 | logger.error("Failed to 
connect to Blender") 276 | _blender_connection = None 277 | raise Exception( 278 | "Could not connect to Blender. Make sure the Blender addon is running." 279 | ) 280 | logger.info("Created new persistent connection to Blender") 281 | 282 | return _blender_connection 283 | 284 | 285 | @mcp.tool() 286 | def get_scene_info(ctx: Context) -> str: 287 | """Get detailed information about the current Blender scene""" 288 | try: 289 | blender = get_blender_connection() 290 | result = blender.send_command("get_scene_info") 291 | 292 | # Just return the JSON representation of what Blender sent us 293 | return json.dumps(result, indent=2) 294 | except Exception as e: 295 | logger.error(f"Error getting scene info from Blender: {str(e)}") 296 | return f"Error getting scene info: {str(e)}" 297 | 298 | 299 | @mcp.tool() 300 | def get_object_info(ctx: Context, object_name: str) -> str: 301 | """ 302 | Get detailed information about a specific object in the Blender scene. 303 | 304 | Parameters: 305 | - object_name: The name of the object to get information about 306 | """ 307 | try: 308 | blender = get_blender_connection() 309 | result = blender.send_command("get_object_info", {"name": object_name}) 310 | 311 | # Just return the JSON representation of what Blender sent us 312 | return json.dumps(result, indent=2) 313 | except Exception as e: 314 | logger.error(f"Error getting object info from Blender: {str(e)}") 315 | return f"Error getting object info: {str(e)}" 316 | 317 | 318 | @mcp.tool() 319 | def create_object( 320 | ctx: Context, 321 | type: str = "CUBE", 322 | name: str = None, 323 | location: List[float] = None, 324 | rotation: List[float] = None, 325 | scale: List[float] = None, 326 | ) -> str: 327 | """ 328 | Create a new object in the Blender scene. 
329 | 330 | Parameters: 331 | - type: Object type (CUBE, SPHERE, CYLINDER, PLANE, CONE, TORUS, EMPTY, CAMERA, LIGHT) 332 | - name: Optional name for the object 333 | - location: Optional [x, y, z] location coordinates 334 | - rotation: Optional [x, y, z] rotation in radians 335 | - scale: Optional [x, y, z] scale factors 336 | """ 337 | try: 338 | # Get the global connection 339 | blender = get_blender_connection() 340 | 341 | # Set default values for missing parameters 342 | loc = location or [0, 0, 0] 343 | rot = rotation or [0, 0, 0] 344 | sc = scale or [1, 1, 1] 345 | 346 | params = {"type": type, "location": loc, "rotation": rot, "scale": sc} 347 | 348 | if name: 349 | params["name"] = name 350 | 351 | result = blender.send_command("create_object", params) 352 | return f"Created {type} object: {result['name']}" 353 | except Exception as e: 354 | logger.error(f"Error creating object: {str(e)}") 355 | return f"Error creating object: {str(e)}" 356 | 357 | 358 | @mcp.tool() 359 | def modify_object( 360 | ctx: Context, 361 | name: str, 362 | location: List[float] = None, 363 | rotation: List[float] = None, 364 | scale: List[float] = None, 365 | visible: bool = None, 366 | ) -> str: 367 | """ 368 | Modify an existing object in the Blender scene. 
369 | 370 | Parameters: 371 | - name: Name of the object to modify 372 | - location: Optional [x, y, z] location coordinates 373 | - rotation: Optional [x, y, z] rotation in radians 374 | - scale: Optional [x, y, z] scale factors 375 | - visible: Optional boolean to set visibility 376 | """ 377 | try: 378 | # Get the global connection 379 | blender = get_blender_connection() 380 | 381 | params = {"name": name} 382 | 383 | if location is not None: 384 | params["location"] = location 385 | if rotation is not None: 386 | params["rotation"] = rotation 387 | if scale is not None: 388 | params["scale"] = scale 389 | if visible is not None: 390 | params["visible"] = visible 391 | 392 | result = blender.send_command("modify_object", params) 393 | return f"Modified object: {result['name']}" 394 | except Exception as e: 395 | logger.error(f"Error modifying object: {str(e)}") 396 | return f"Error modifying object: {str(e)}" 397 | 398 | 399 | @mcp.tool() 400 | def delete_object(ctx: Context, name: str) -> str: 401 | """ 402 | Delete an object from the Blender scene. 403 | 404 | Parameters: 405 | - name: Name of the object to delete 406 | """ 407 | try: 408 | # Get the global connection 409 | blender = get_blender_connection() 410 | 411 | result = blender.send_command("delete_object", {"name": name}) 412 | return f"Deleted object: {name}" 413 | except Exception as e: 414 | logger.error(f"Error deleting object: {str(e)}") 415 | return f"Error deleting object: {str(e)}" 416 | 417 | 418 | @mcp.tool() 419 | def set_material( 420 | ctx: Context, object_name: str, material_name: str = None, color: List[float] = None 421 | ) -> str: 422 | """ 423 | Set or create a material for an object. 
424 | 425 | Parameters: 426 | - object_name: Name of the object to apply the material to 427 | - material_name: Optional name of the material to use or create 428 | - color: Optional [R, G, B] color values (0.0-1.0) 429 | """ 430 | try: 431 | # Get the global connection 432 | blender = get_blender_connection() 433 | 434 | params = {"object_name": object_name} 435 | 436 | if material_name: 437 | params["material_name"] = material_name 438 | if color: 439 | params["color"] = color 440 | 441 | result = blender.send_command("set_material", params) 442 | return f"Applied material to {object_name}: {result.get('material_name', 'unknown')}" 443 | except Exception as e: 444 | logger.error(f"Error setting material: {str(e)}") 445 | return f"Error setting material: {str(e)}" 446 | 447 | 448 | @mcp.tool() 449 | def execute_blender_code(ctx: Context, code: str) -> str: 450 | """ 451 | Execute arbitrary Python code in Blender. 452 | 453 | Parameters: 454 | - code: The Python code to execute 455 | """ 456 | try: 457 | # Get the global connection 458 | blender = get_blender_connection() 459 | 460 | result = blender.send_command("execute_code", {"code": code}) 461 | return f"Code executed successfully: {result.get('result', '')}" 462 | except Exception as e: 463 | logger.error(f"Error executing code: {str(e)}") 464 | return f"Error executing code: {str(e)}" 465 | 466 | 467 | @mcp.tool() 468 | def get_polyhaven_categories(ctx: Context, asset_type: str = "hdris") -> str: 469 | """ 470 | Get a list of categories for a specific asset type on Polyhaven. 471 | 472 | Parameters: 473 | - asset_type: The type of asset to get categories for (hdris, textures, models, all) 474 | """ 475 | try: 476 | blender = get_blender_connection() 477 | if not _polyhaven_enabled: 478 | return "PolyHaven integration is disabled. Select it in the sidebar in BlenderMCP, then run it again." 
479 | result = blender.send_command( 480 | "get_polyhaven_categories", {"asset_type": asset_type} 481 | ) 482 | 483 | if "error" in result: 484 | return f"Error: {result['error']}" 485 | 486 | # Format the categories in a more readable way 487 | categories = result["categories"] 488 | formatted_output = f"Categories for {asset_type}:\n\n" 489 | 490 | # Sort categories by count (descending) 491 | sorted_categories = sorted(categories.items(), key=lambda x: x[1], reverse=True) 492 | 493 | for category, count in sorted_categories: 494 | formatted_output += f"- {category}: {count} assets\n" 495 | 496 | return formatted_output 497 | except Exception as e: 498 | logger.error(f"Error getting Polyhaven categories: {str(e)}") 499 | return f"Error getting Polyhaven categories: {str(e)}" 500 | 501 | 502 | @mcp.tool() 503 | def search_polyhaven_assets( 504 | ctx: Context, asset_type: str = "all", categories: str = None 505 | ) -> str: 506 | """ 507 | Search for assets on Polyhaven with optional filtering. 508 | 509 | Parameters: 510 | - asset_type: Type of assets to search for (hdris, textures, models, all) 511 | - categories: Optional comma-separated list of categories to filter by 512 | 513 | Returns a list of matching assets with basic information. 
514 | """ 515 | try: 516 | blender = get_blender_connection() 517 | result = blender.send_command( 518 | "search_polyhaven_assets", 519 | {"asset_type": asset_type, "categories": categories}, 520 | ) 521 | 522 | if "error" in result: 523 | return f"Error: {result['error']}" 524 | 525 | # Format the assets in a more readable way 526 | assets = result["assets"] 527 | total_count = result["total_count"] 528 | returned_count = result["returned_count"] 529 | 530 | formatted_output = f"Found {total_count} assets" 531 | if categories: 532 | formatted_output += f" in categories: {categories}" 533 | formatted_output += f"\nShowing {returned_count} assets:\n\n" 534 | 535 | # Sort assets by download count (popularity) 536 | sorted_assets = sorted( 537 | assets.items(), key=lambda x: x[1].get("download_count", 0), reverse=True 538 | ) 539 | 540 | for asset_id, asset_data in sorted_assets: 541 | formatted_output += ( 542 | f"- {asset_data.get('name', asset_id)} (ID: {asset_id})\n" 543 | ) 544 | formatted_output += ( 545 | f" Type: {['HDRI', 'Texture', 'Model'][asset_data.get('type', 0)]}\n" 546 | ) 547 | formatted_output += ( 548 | f" Categories: {', '.join(asset_data.get('categories', []))}\n" 549 | ) 550 | formatted_output += ( 551 | f" Downloads: {asset_data.get('download_count', 'Unknown')}\n\n" 552 | ) 553 | 554 | return formatted_output 555 | except Exception as e: 556 | logger.error(f"Error searching Polyhaven assets: {str(e)}") 557 | return f"Error searching Polyhaven assets: {str(e)}" 558 | 559 | 560 | @mcp.tool() 561 | def download_polyhaven_asset( 562 | ctx: Context, 563 | asset_id: str, 564 | asset_type: str, 565 | resolution: str = "1k", 566 | file_format: str = None, 567 | ) -> str: 568 | """ 569 | Download and import a Polyhaven asset into Blender. 
570 | 571 | Parameters: 572 | - asset_id: The ID of the asset to download 573 | - asset_type: The type of asset (hdris, textures, models) 574 | - resolution: The resolution to download (e.g., 1k, 2k, 4k) 575 | - file_format: Optional file format (e.g., hdr, exr for HDRIs; jpg, png for textures; gltf, fbx for models) 576 | 577 | Returns a message indicating success or failure. 578 | """ 579 | try: 580 | blender = get_blender_connection() 581 | result = blender.send_command( 582 | "download_polyhaven_asset", 583 | { 584 | "asset_id": asset_id, 585 | "asset_type": asset_type, 586 | "resolution": resolution, 587 | "file_format": file_format, 588 | }, 589 | ) 590 | 591 | if "error" in result: 592 | return f"Error: {result['error']}" 593 | 594 | if result.get("success"): 595 | message = result.get( 596 | "message", "Asset downloaded and imported successfully" 597 | ) 598 | 599 | # Add additional information based on asset type 600 | if asset_type == "hdris": 601 | return f"{message}. The HDRI has been set as the world environment." 602 | elif asset_type == "textures": 603 | material_name = result.get("material", "") 604 | maps = ", ".join(result.get("maps", [])) 605 | return ( 606 | f"{message}. Created material '{material_name}' with maps: {maps}." 607 | ) 608 | elif asset_type == "models": 609 | return f"{message}. The model has been imported into the current scene." 610 | else: 611 | return message 612 | else: 613 | return f"Failed to download asset: {result.get('message', 'Unknown error')}" 614 | except Exception as e: 615 | logger.error(f"Error downloading Polyhaven asset: {str(e)}") 616 | return f"Error downloading Polyhaven asset: {str(e)}" 617 | 618 | 619 | @mcp.tool() 620 | def set_texture(ctx: Context, object_name: str, texture_id: str) -> str: 621 | """ 622 | Apply a previously downloaded Polyhaven texture to an object. 
623 | 624 | Parameters: 625 | - object_name: Name of the object to apply the texture to 626 | - texture_id: ID of the Polyhaven texture to apply (must be downloaded first) 627 | 628 | Returns a message indicating success or failure. 629 | """ 630 | try: 631 | # Get the global connection 632 | blender = get_blender_connection() 633 | 634 | result = blender.send_command( 635 | "set_texture", {"object_name": object_name, "texture_id": texture_id} 636 | ) 637 | 638 | if "error" in result: 639 | return f"Error: {result['error']}" 640 | 641 | if result.get("success"): 642 | material_name = result.get("material", "") 643 | maps = ", ".join(result.get("maps", [])) 644 | 645 | # Add detailed material info 646 | material_info = result.get("material_info", {}) 647 | node_count = material_info.get("node_count", 0) 648 | has_nodes = material_info.get("has_nodes", False) 649 | texture_nodes = material_info.get("texture_nodes", []) 650 | 651 | output = f"Successfully applied texture '{texture_id}' to {object_name}.\n" 652 | output += f"Using material '{material_name}' with maps: {maps}.\n\n" 653 | output += f"Material has nodes: {has_nodes}\n" 654 | output += f"Total node count: {node_count}\n\n" 655 | 656 | if texture_nodes: 657 | output += "Texture nodes:\n" 658 | for node in texture_nodes: 659 | output += f"- {node['name']} using image: {node['image']}\n" 660 | if node["connections"]: 661 | output += " Connections:\n" 662 | for conn in node["connections"]: 663 | output += f" {conn}\n" 664 | else: 665 | output += "No texture nodes found in the material.\n" 666 | 667 | return output 668 | else: 669 | return f"Failed to apply texture: {result.get('message', 'Unknown error')}" 670 | except Exception as e: 671 | logger.error(f"Error applying texture: {str(e)}") 672 | return f"Error applying texture: {str(e)}" 673 | 674 | 675 | @mcp.tool() 676 | def get_polyhaven_status(ctx: Context) -> str: 677 | """ 678 | Check if PolyHaven integration is enabled in Blender. 
679 | Returns a message indicating whether PolyHaven features are available. 680 | """ 681 | try: 682 | blender = get_blender_connection() 683 | result = blender.send_command("get_polyhaven_status") 684 | enabled = result.get("enabled", False) 685 | message = result.get("message", "") 686 | 687 | return message 688 | except Exception as e: 689 | logger.error(f"Error checking PolyHaven status: {str(e)}") 690 | return f"Error checking PolyHaven status: {str(e)}" 691 | 692 | 693 | @mcp.prompt() 694 | def asset_creation_strategy() -> str: 695 | """Defines the preferred strategy for creating assets in Blender""" 696 | return """When creating 3D content in Blender, always start by checking if PolyHaven is available: 697 | 698 | 0. Before anything, always check the scene from get_scene_info() 699 | 1. First use get_polyhaven_status() to verify if PolyHaven integration is enabled. 700 | 701 | 2. If PolyHaven is enabled: 702 | - For objects/models: Use download_polyhaven_asset() with asset_type="models" 703 | - For materials/textures: Use download_polyhaven_asset() with asset_type="textures" 704 | - For environment lighting: Use download_polyhaven_asset() with asset_type="hdris" 705 | 706 | 3. If PolyHaven is disabled or when falling back to basic tools: 707 | - create_object() for basic primitives (CUBE, SPHERE, CYLINDER, etc.) 708 | - set_material() for basic colors and materials 709 | 710 | Only fall back to basic creation tools when: 711 | - PolyHaven is disabled 712 | - A simple primitive is explicitly requested 713 | - No suitable PolyHaven asset exists 714 | - The task specifically requires a basic material/color 715 | """ 716 | 717 | 718 | @mcp.tool() 719 | async def create_3d_model_from_text( 720 | describe_the_look_of_object: str, face_limit: int = -1 721 | ) -> Dict[str, Any]: 722 | """ 723 | Create a 3D model from a text description using the Tripo API. 724 | 725 | IMPORTANT: This tool initiates a 3D model generation task but does NOT wait for completion. 
726 | After calling this tool, you MUST repeatedly call the get_task_status tool with the returned 727 | task_id until the task status is SUCCESS or a terminal error state. 728 | 729 | Typical workflow: 730 | 1. Call create_3d_model_from_text to start the task 731 | 2. Get the task_id from the response 732 | 3. Call get_task_status with the task_id 733 | 4. If status is not SUCCESS, wait a moment and call get_task_status again 734 | 5. Repeat until status is SUCCESS or a terminal error state 735 | 6. When status is SUCCESS, use the pbr_model_url from the response 736 | 737 | Args: 738 | describe_the_look_of_object: A detailed description of the object to generate. 739 | face_limit: The maximum number of faces in the model. 740 | auto_size: Whether to automatically size the model. 741 | 742 | Returns: 743 | A dictionary containing the task ID and instructions for checking the status. 744 | """ 745 | # Get the API key from environment variable 746 | # api_key = os.environ.get("TRIPO_API_KEY") 747 | get_blender_connection() 748 | blender = get_blender_connection() 749 | api_key = _tripo_apikey 750 | if not api_key: 751 | raise ValueError( 752 | "It may take some more time to correctly re-enter the tripo function. You can first check the scene and then try the tripo function again later" 753 | ) 754 | 755 | # Create the Tripo client 756 | async with TripoClient(api_key=api_key) as client: 757 | # Create a text-to-model task 758 | task_id = await client.text_to_model( 759 | prompt=describe_the_look_of_object, 760 | face_limit=face_limit, 761 | ) 762 | 763 | # Get initial task status 764 | task = await client.get_task(task_id) 765 | 766 | # Return immediately with task ID and status 767 | return { 768 | "task_id": task_id, 769 | "status": str(task.status), 770 | "progress": task.progress, 771 | "message": "Task created successfully. 
The 3D model generation is in progress.", 772 | "next_step": "You MUST now call get_task_status with this task_id to check progress.", 773 | "important_note": "3D model generation takes 3-5 minutes. You need to repeatedly call get_task_status until completion.", 774 | "workflow": [ 775 | "1. You've completed this step by calling create_3d_model_from_text", 776 | "2. Now call get_task_status with task_id: " + task_id, 777 | "3. If status is not SUCCESS, wait and call get_task_status again", 778 | "4. When status is SUCCESS, use the pbr_model_url from the response", 779 | ], 780 | } 781 | 782 | 783 | @mcp.tool() 784 | async def create_3d_model_from_image( 785 | image: str, face_limit: int = -1 786 | ) -> Dict[str, Any]: 787 | """ 788 | Create a 3D model from an image using the Tripo API. 789 | 790 | IMPORTANT: This tool initiates a 3D model generation task but does NOT wait for completion. 791 | After calling this tool, you MUST repeatedly call the get_task_status tool with the returned 792 | task_id until the task status is SUCCESS or a terminal error state. 793 | 794 | Typical workflow: 795 | 1. Call create_3d_model_from_image to start the task 796 | 2. Get the task_id from the response 797 | 3. Call get_task_status with the task_id 798 | 4. If status is not SUCCESS, wait a moment and call get_task_status again 799 | 5. Repeat until status is SUCCESS or a terminal error state 800 | 6. When status is SUCCESS, use the pbr_model_url from the response 801 | 802 | Args: 803 | image: The local path or url to the image file. 804 | face_limit: The maximum number of faces in the model. 805 | auto_size: Whether to automatically size the model. 806 | 807 | Returns: 808 | A dictionary containing the task ID and instructions for checking the status. 
809 | """ 810 | # Get the API key from environment variable 811 | # api_key = os.environ.get("TRIPO_API_KEY") 812 | get_blender_connection() 813 | api_key = _tripo_apikey 814 | if not api_key: 815 | raise ValueError( 816 | "It may take some more time to correctly re-enter the tripo function. You can first check the scene and then try the tripo function again later" 817 | ) 818 | 819 | # Create the Tripo client 820 | async with TripoClient(api_key=api_key) as client: 821 | # Create a text-to-model task 822 | task_id = await client.image_to_model( 823 | image=image, 824 | face_limit=face_limit, 825 | ) 826 | 827 | # Get initial task status 828 | task = await client.get_task(task_id) 829 | 830 | # Return immediately with task ID and status 831 | return { 832 | "task_id": task_id, 833 | "status": str(task.status), 834 | "progress": task.progress, 835 | "message": "Task created successfully. The 3D model generation is in progress.", 836 | "next_step": "You MUST now call get_task_status with this task_id to check progress.", 837 | "important_note": "3D model generation takes 3-5 minutes. You need to repeatedly call get_task_status until completion.", 838 | "workflow": [ 839 | "1. You've completed this step by calling create_3d_model_from_image", 840 | "2. Now call get_task_status with task_id: " + task_id, 841 | "3. If status is not SUCCESS, wait and call get_task_status again", 842 | "4. 
When status is SUCCESS, use the pbr_model_url from the response", 843 | ], 844 | } 845 | 846 | 847 | @mcp.tool() 848 | def import_tripo_glb_model(ctx: Context, glb_url: str) -> str: 849 | """ 850 | Import a GLB model from URL into Blender scene 851 | 852 | Parameters: 853 | - glb_url: Download URL of the GLB model file 854 | 855 | Returns: 856 | Result message of the import operation 857 | """ 858 | try: 859 | blender = get_blender_connection() 860 | result = blender.send_command("import_tripo_glb_model", {"url": glb_url}) 861 | 862 | if "error" in result: 863 | return f"Import failed: {result['error']}" 864 | 865 | if result.get("status") == "success": 866 | output = ["Successfully imported models:"] 867 | for model in result.get("models", []): 868 | dim = model["dimensions"] 869 | output.append( 870 | f"• {model['name']} | Dimensions: " 871 | f"{dim['x']} x {dim['y']} x {dim['z']} meters" 872 | ) 873 | 874 | if not output: 875 | output.append("No models found in imported file") 876 | 877 | return "\n".join(output) 878 | else: 879 | return f"Import failed: {result.get('message', 'Unknown error')}" 880 | 881 | except Exception as e: 882 | logger.error(f"Error importing GLB model: {str(e)}") 883 | return f"GLB model import failed: {str(e)}" 884 | 885 | 886 | @mcp.tool() 887 | async def get_task_status(task_id: str) -> Dict[str, Any]: 888 | """ 889 | Get the status of a 3D model generation task. 890 | 891 | IMPORTANT: This tool checks the status of a task started by create_3d_model_from_text. 892 | You may need to call this tool MULTIPLE TIMES until the task completes. 893 | 894 | Typical workflow: 895 | 1. Call this tool with the task_id from create_3d_model_from_text 896 | 2. Check the status in the response: 897 | - If status is SUCCESS, the task is complete and you can use the pbr_model_url 898 | - If status is FAILED, CANCELLED, BANNED, or EXPIRED, the task failed 899 | - If status is anything else, the task is still in progress 900 | 3. 
If the task is still in progress, wait a moment and call this tool again 901 | 902 | Args: 903 | task_id: The ID of the task to check (obtained from create_3d_model_from_text). 904 | 905 | Returns: 906 | A dictionary containing the task status and other information. 907 | """ 908 | # Get the API key from environment variable 909 | # api_key = os.environ.get("TRIPO_API_KEY") 910 | get_blender_connection() 911 | api_key = _tripo_apikey 912 | if not api_key: 913 | raise ValueError( 914 | "It may take some more time to correctly re-enter the tripo function. You can first check the scene and then try the tripo function again later" 915 | ) 916 | 917 | # Create the Tripo client 918 | async with TripoClient(api_key=api_key) as client: 919 | # Get task status 920 | task = await client.get_task(task_id) 921 | 922 | # Ensure task is not None 923 | if task is None: 924 | raise ValueError( 925 | f"Failed to retrieve task information for task ID: {task_id}" 926 | ) 927 | 928 | # Create result dictionary 929 | result = { 930 | "task_id": task_id, 931 | "status": str(task.status), 932 | "progress": task.progress, 933 | } 934 | 935 | # Add output fields if task is successful and output is available 936 | if task.status == TaskStatus.SUCCESS and task.output: 937 | result.update( 938 | { 939 | "base_model_url": task.output.base_model, 940 | "model_url": task.output.model, 941 | "pbr_model_url": task.output.pbr_model, 942 | "rendered_image_url": task.output.rendered_image, 943 | "message": "Task completed successfully! You can now use the pbr_model_url.", 944 | "next_step": "Use the pbr_model_url to access the 3D model, download it through import_tripo_glb_model tool", 945 | } 946 | ) 947 | 948 | if not task.output.pbr_model: 949 | result["warning"] = ( 950 | "Model generated but PBR model URL is not available." 951 | ) 952 | elif task.status == TaskStatus.SUCCESS: 953 | result["message"] = ( 954 | "Task completed successfully but no output data is available." 
955 | ) 956 | result["next_step"] = ( 957 | "Try creating a new model with a different description." 958 | ) 959 | elif task.status in ( 960 | TaskStatus.FAILED, 961 | TaskStatus.CANCELLED, 962 | TaskStatus.BANNED, 963 | TaskStatus.EXPIRED, 964 | ): 965 | result["message"] = f"Task failed with status: {task.status}" 966 | result["next_step"] = ( 967 | "Try creating a new model with a different description." 968 | ) 969 | else: 970 | result["message"] = ( 971 | f"Task is still in progress. Current status: {task.status}, Progress: {task.progress}%" 972 | ) 973 | result["next_step"] = ( 974 | "IMPORTANT: You must call get_task_status again with this task_id to continue checking progress." 975 | ) 976 | result["wait_message"] = ( 977 | "3D model generation typically takes 3-5 minutes. Please be patient and keep checking." 978 | ) 979 | 980 | return result 981 | 982 | 983 | def main(): 984 | # mcp.run("sse") 985 | mcp.run(transport="stdio") 986 | 987 | 988 | if __name__ == "__main__": 989 | main() 990 | ```