# Directory Structure
```
├── .github
│ └── workflows
│ ├── publish.yml
│ └── release.yaml
├── .gitignore
├── LICENSE
├── main.py
├── pyproject.toml
├── README.md
├── src
│ └── server.py
└── succeed.jpg
```
# Files
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
1 | # Python
2 | __pycache__/
3 | *.py[cod]
4 | *$py.class
5 | *.so
6 | .Python
7 | build/
8 | develop-eggs/
9 | dist/
10 | downloads/
11 | eggs/
12 | .eggs/
13 | lib/
14 | lib64/
15 | parts/
16 | sdist/
17 | var/
18 | wheels/
19 | *.egg-info/
20 | .installed.cfg
21 | *.egg
22 |
23 | # Virtual Environment
24 | venv/
25 | ENV/
26 | env/
27 | .env
28 | .venv
29 | env.bak/
30 | venv.bak/
31 |
32 | # IDE specific files
33 | .idea/
34 | .vscode/
35 | *.swp
36 | *.swo
37 | .DS_Store
38 | *.sublime-workspace
39 | *.sublime-project
40 |
41 | # Testing
42 | .tox/
43 | .coverage
44 | .coverage.*
45 | .cache
46 | nosetests.xml
47 | coverage.xml
48 | *.cover
49 | .hypothesis/
50 | .pytest_cache/
51 | htmlcov/
52 |
53 | # Distribution / packaging
54 | .Python
55 | *.manifest
56 | *.spec
57 | pip-log.txt
58 | pip-delete-this-directory.txt
59 |
60 | # Jupyter Notebook
61 | .ipynb_checkpoints
62 |
63 | # pyenv
64 | .python-version
65 |
66 | # mypy
67 | .mypy_cache/
68 | .dmypy.json
69 | dmypy.json
70 |
71 | # Logs and databases
72 | *.log
73 | *.sqlite
74 | *.db
75 |
76 | uv.lock
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
1 | # Tripo MCP Server
2 |
3 | Tripo MCP provides an interface between AI assistants and [Tripo AI](https://www.tripo3d.ai) via [Model Context Protocol (MCP)](https://github.com/anthropics/anthropic-cookbook/tree/main/mcp).
4 |
5 | > **Note:** This project is in alpha. Currently, it supports Tripo Blender Addon integration.
6 |
7 | ## Current Features
8 |
9 | - Generate 3D assets from natural-language prompts using Tripo's API and import them into Blender
10 | - Compatible with Claude and other MCP-enabled AI assistants
11 |
12 | ## Quick Start
13 |
14 | ### Prerequisites
15 | - Python 3.10+
16 | - [Blender](https://www.blender.org/download/)
17 | - [Tripo AI Blender Addon](https://www.tripo3d.ai/app/home)
18 | - Claude for Desktop or Cursor IDE
19 |
20 | ### Installation
21 |
22 | 1. Install Tripo AI Blender Addon from [Tripo AI's website](https://www.tripo3d.ai/app/home)
23 |
24 | 2. Configure the MCP server in Claude Desktop or Cursor.
25 |
26 | * `pip install uv`
27 | * Add the following MCP server entry to your Cursor MCP configuration:
28 | ```json
29 | {
30 | "mcpServers": {
31 | "tripo-mcp": {
32 | "command": "uvx",
33 | "args": [
34 | "tripo-mcp"
35 | ]
36 | }
37 | }
38 | }
39 | ```
40 |
41 | * Then you will get a green dot like this:
42 | ![Green status dot shown when the MCP server is connected](succeed.jpg)
43 |
44 | ### Usage
45 |
46 | 1. Enable the Tripo AI Blender Addon and start the Blender MCP server.
47 |
48 | 2. Chat using Cursor or Claude, e.g., "Generate a 3D model of a futuristic chair".
49 |
50 | ## Acknowledgements
51 |
52 | - **[Tripo AI](https://www.tripo3d.ai)**
53 | - **[blender-mcp](https://github.com/ahujasid/blender-mcp)** by [Siddharth Ahuja](https://github.com/ahujasid)
54 |
55 | **Special Thanks**
56 | Special thanks to Siddharth Ahuja for the blender-mcp project, which provided inspiring ideas for MCP + 3D.
57 |
```
--------------------------------------------------------------------------------
/main.py:
--------------------------------------------------------------------------------
```python
from src.server import main as server_main


# Entry point for the Tripo MCP package: start the server only when this file
# is executed as a script, never on import.
if __name__ == "__main__":
    server_main()
```
--------------------------------------------------------------------------------
/.github/workflows/publish.yml:
--------------------------------------------------------------------------------
```yaml
1 | name: Publish to PyPI
2 |
3 | on:
4 | release:
5 | types: [published]
6 |
7 | jobs:
8 | deploy:
9 | runs-on: ubuntu-latest
10 | steps:
11 | - uses: actions/checkout@v4
12 | - name: Set up Python
13 | uses: actions/setup-python@v4
14 | with:
15 | python-version: '3.10'
16 | - name: Install dependencies
17 | run: |
18 | python -m pip install --upgrade pip
19 | pip install build
20 | - name: Build package
21 | run: python -m build
22 | - name: Publish package
23 | uses: pypa/gh-action-pypi-publish@release/v1
24 | with:
25 | password: ${{ secrets.PYPI_API_TOKEN }}
```
--------------------------------------------------------------------------------
/.github/workflows/release.yaml:
--------------------------------------------------------------------------------
```yaml
1 | name: Publish to PyPI
2 |
3 | on:
4 | release:
5 | types: [created]
6 |
7 | jobs:
8 | deploy:
9 | runs-on: ubuntu-latest
10 | steps:
11 | - uses: actions/checkout@v3
12 |
13 | - name: Set up Python
14 | uses: actions/setup-python@v4
15 | with:
16 | python-version: '3.x'
17 |
18 | - name: Install dependencies
19 | run: |
20 | python -m pip install --upgrade pip
21 | pip install build twine
22 |
23 | - name: Build package
24 | run: python -m build
25 |
26 | - name: Publish to PyPI
27 | env:
28 | TWINE_USERNAME: __token__
29 | TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN }}
30 | run: |
31 | python -m twine upload dist/*
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
1 | [project]
2 | name = "tripo-mcp"
3 | version = "0.1.2"
4 | description = "MCP (Model Context Protocol) integration for Tripo"
5 | authors = [
6 | {name = "Allen Dang", email = "[email protected]"},
7 | {name = "pookiefoof", email = "[email protected]"},
8 | {name = "Ding Liang", email = "[email protected]"},
9 | ]
10 | license = {text = "MIT"}
11 | readme = "README.md"
12 | requires-python = ">=3.10"
13 | keywords = ["mcp", "blender", "3d", "automation"]
14 | classifiers = [
15 | "Development Status :: 3 - Alpha",
16 | "Intended Audience :: Developers",
17 | "License :: OSI Approved :: MIT License",
18 | "Programming Language :: Python :: 3",
19 | "Programming Language :: Python :: 3.10",
20 | ]
21 |
22 | dependencies = [
23 | "tripo3d>=0.2.0",
24 | "mcp[cli]>=1.4.1",
25 | ]
26 |
27 | [project.optional-dependencies]
28 | dev = [
29 | ]
30 |
31 | [project.scripts]
32 | tripo-mcp = "server:main"
33 |
34 | [build-system]
35 | requires = ["setuptools>=61.0", "wheel"]
36 | build-backend = "setuptools.build_meta"
37 |
38 | [tool.setuptools]
39 | package-dir = {"" = "src"}
40 |
```
--------------------------------------------------------------------------------
/src/server.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | MIT License
3 |
4 | Copyright (c) 2025 Siddharth Ahuja
5 | Copyright (c) 2025 for additions
6 |
7 | Permission is hereby granted, free of charge, to any person obtaining a copy
8 | of this software and associated documentation files (the "Software"), to deal
9 | in the Software without restriction, including without limitation the rights
10 | to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 | copies of the Software, and to permit persons to whom the Software is
12 | furnished to do so, subject to the following conditions:
13 |
14 | The above copyright notice and this permission notice shall be included in all
15 | copies or substantial portions of the Software.
16 |
17 | THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 | IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 | FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 | AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 | LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 | OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23 | SOFTWARE.
24 |
25 | This file is based on work by Siddharth Ahuja, with additional contributions
26 | for Tripo MCP functionality.
27 | """
28 |
29 | from mcp.server.fastmcp import FastMCP, Context
30 | import sys
31 | from pathlib import Path
32 | from typing import Dict, Any
33 | import socket
34 | import json
35 | import logging
36 | from dataclasses import dataclass
37 | from contextlib import asynccontextmanager
38 | from typing import AsyncIterator, Dict, Any, List
39 |
40 | # Configure logging
41 | logging.basicConfig(
42 | level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
43 | )
44 | logger = logging.getLogger("BlenderMCPServer")
45 |
46 | sys.path.insert(0, str(Path(__file__).parent.parent))
47 |
48 | from tripo3d import TripoClient, TaskStatus
49 |
50 |
@dataclass
class BlenderConnection:
    """TCP client for the JSON command socket exposed by the Blender addon.

    A command is sent as one JSON object (``{"type": ..., "params": ...}``)
    and the reply is read until the accumulated bytes parse as JSON.
    """

    host: str
    port: int
    # Live socket, or None while disconnected.  Named 'sock' (not 'socket')
    # to avoid clashing with the socket module.
    sock: socket.socket = None

    def connect(self) -> bool:
        """Connect to the Blender addon socket server.

        Returns:
            True when a socket is (already) open, False when connecting failed.
        """
        if self.sock:
            return True

        candidate = None
        try:
            candidate = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            candidate.connect((self.host, self.port))
        except Exception as e:
            logger.error(f"Failed to connect to Blender: {str(e)}")
            # Release the half-open socket instead of leaking its descriptor.
            if candidate is not None:
                try:
                    candidate.close()
                except OSError:
                    pass
            self.sock = None
            return False

        self.sock = candidate
        logger.info(f"Connected to Blender at {self.host}:{self.port}")
        return True

    def disconnect(self):
        """Close the socket if one is open; close errors are logged, not raised."""
        if self.sock:
            try:
                self.sock.close()
            except Exception as e:
                logger.error(f"Error disconnecting from Blender: {str(e)}")
            finally:
                self.sock = None

    def receive_full_response(self, sock, buffer_size=8192):
        """Receive the complete response, potentially in multiple chunks.

        Parameters:
        - sock: Socket-like object providing ``settimeout`` and ``recv``
        - buffer_size: Maximum number of bytes requested per ``recv`` call

        Returns:
            The raw bytes of the first complete JSON document received.

        Raises:
            Exception: if the peer closes before sending data, if nothing was
                received, or if the data received is not complete JSON.
            ConnectionError: re-raised when the socket fails while receiving.
        """
        chunks = []
        sock.settimeout(15.0)  # Match the addon's timeout

        try:
            while True:
                try:
                    chunk = sock.recv(buffer_size)
                except socket.timeout:
                    # Stop reading and try to use whatever has arrived so far.
                    logger.warning("Socket timeout during chunked receive")
                    break
                except (ConnectionError, BrokenPipeError, ConnectionResetError) as e:
                    logger.error(f"Socket connection error during receive: {str(e)}")
                    raise  # Re-raise to be handled by the caller

                if not chunk:
                    # An empty read means the peer closed the connection.
                    if not chunks:
                        raise Exception("Connection closed before receiving any data")
                    break

                chunks.append(chunk)
                data = b"".join(chunks)
                try:
                    json.loads(data.decode("utf-8"))
                except (json.JSONDecodeError, UnicodeDecodeError):
                    # Incomplete JSON, or a multi-byte UTF-8 character split
                    # across two chunks: keep receiving.
                    continue
                logger.info(f"Received complete response ({len(data)} bytes)")
                return data
        except Exception as e:
            logger.error(f"Error during receive: {str(e)}")
            raise

        # The loop ended on a timeout or a closed connection.
        if not chunks:
            raise Exception("No data received")

        data = b"".join(chunks)
        logger.info(f"Returning data after receive completion ({len(data)} bytes)")
        try:
            json.loads(data.decode("utf-8"))
        except (json.JSONDecodeError, UnicodeDecodeError):
            raise Exception("Incomplete JSON response received")
        return data

    def send_command(
        self, command_type: str, params: Dict[str, Any] = None
    ) -> Dict[str, Any]:
        """Send a command to Blender and return the response.

        Parameters:
        - command_type: Value sent as the command's "type"
        - params: Optional command parameters; an empty dict is sent when omitted

        Returns:
            The "result" member of the reply, or an empty dict when absent.

        Raises:
            ConnectionError: if no connection to Blender can be established.
            Exception: on timeout, connection loss, invalid JSON, or when the
                reply has status "error".
        """
        if not self.sock and not self.connect():
            raise ConnectionError("Not connected to Blender")

        command = {"type": command_type, "params": params or {}}
        response_data = None

        try:
            logger.info(f"Sending command: {command_type} with params: {params}")

            self.sock.sendall(json.dumps(command).encode("utf-8"))
            logger.info("Command sent, waiting for response...")

            # Same timeout as in receive_full_response
            self.sock.settimeout(15.0)  # Match the addon's timeout

            response_data = self.receive_full_response(self.sock)
            logger.info(f"Received {len(response_data)} bytes of data")

            response = json.loads(response_data.decode("utf-8"))
            logger.info(f"Response parsed, status: {response.get('status', 'unknown')}")

            if response.get("status") == "error":
                logger.error(f"Blender error: {response.get('message')}")
                raise Exception(response.get("message", "Unknown error from Blender"))

            return response.get("result", {})
        except socket.timeout as e:
            logger.error("Socket timeout while waiting for response from Blender")
            # No reconnect here; close and invalidate the socket so the next
            # call opens a fresh one.
            self.disconnect()
            raise Exception(
                "Timeout waiting for Blender response - try simplifying your request"
            ) from e
        except (ConnectionError, BrokenPipeError, ConnectionResetError) as e:
            logger.error(f"Socket connection error: {str(e)}")
            self.disconnect()
            raise Exception(f"Connection to Blender lost: {str(e)}") from e
        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON response from Blender: {str(e)}")
            if response_data:
                logger.error(f"Raw response (first 200 bytes): {response_data[:200]}")
            raise Exception(f"Invalid response from Blender: {str(e)}") from e
        except Exception as e:
            logger.error(f"Error communicating with Blender: {str(e)}")
            # Close rather than merely forget the socket so it is not leaked.
            self.disconnect()
            raise Exception(f"Communication error with Blender: {str(e)}") from e
199 |
200 |
@asynccontextmanager
async def server_lifespan(server: FastMCP) -> AsyncIterator[Dict[str, Any]]:
    """Manage server startup and shutdown lifecycle.

    Startup probes Blender through the shared global connection (a failure is
    only logged as a warning), then an empty context is yielded.  Shutdown
    disconnects and clears the global connection if one exists.
    """
    global _blender_connection

    try:
        logger.info("Blender server starting up")

        # Probe Blender once so an unavailable addon is reported early; the
        # call also initialises the global connection when it succeeds.
        try:
            get_blender_connection()
            logger.info("Successfully connected to Blender on startup")
        except Exception as startup_error:
            logger.warning(
                f"Could not connect to Blender on startup: {str(startup_error)}"
            )
            logger.warning(
                "Make sure the Blender addon is running before using Blender resources or tools"
            )

        # Tools use the global connection, so the lifespan context is empty.
        yield {}
    finally:
        if _blender_connection:
            logger.info("Disconnecting from Blender on shutdown")
            _blender_connection.disconnect()
            _blender_connection = None
        logger.info("BlenderMCP server shut down")
232 |
233 |
# Process-wide state shared by get_blender_connection() and the tools:
# the persistent Blender connection, the last PolyHaven flag reported by
# Blender, and the last Tripo API key reported by Blender.
_blender_connection = None
_polyhaven_enabled = False
_tripo_apikey = ""

# The FastMCP application; host and port are not passed here.
mcp = FastMCP(
    name="Tripo MCP",
    instructions="MCP for Tripo Blender addon",
    lifespan=server_lifespan,
)
245 |
246 |
def _refresh_blender_state(connection) -> None:
    """Ask Blender for the Tripo API key and PolyHaven flag and cache both globally."""
    global _polyhaven_enabled, _tripo_apikey

    result = connection.send_command("get_tripo_apikey")
    _tripo_apikey = result.get("api_key", "")
    result = connection.send_command("get_polyhaven_status")
    _polyhaven_enabled = result.get("enabled", False)


def get_blender_connection():
    """Get or create a persistent Blender connection.

    An existing connection is validated by re-reading the Tripo API key and
    the PolyHaven status from Blender; if that fails it is discarded and a new
    connection is opened.  A newly opened connection also attempts the same
    read, so the cached API key is available after the very first call.

    Returns:
        The shared BlenderConnection instance.

    Raises:
        Exception: if a new connection to Blender cannot be established.
    """
    global _blender_connection

    if _blender_connection is not None:
        try:
            # These round trips double as a liveness check.
            _refresh_blender_state(_blender_connection)
            return _blender_connection
        except Exception as e:
            # Connection is dead, close it and create a new one
            logger.warning(f"Existing connection is no longer valid: {str(e)}")
            try:
                _blender_connection.disconnect()
            except Exception as close_error:
                logger.debug(f"Ignoring error while dropping connection: {close_error}")
            _blender_connection = None

    connection = BlenderConnection(host="localhost", port=9876)
    if not connection.connect():
        logger.error("Failed to connect to Blender")
        raise Exception(
            "Could not connect to Blender. Make sure the Blender addon is running."
        )
    _blender_connection = connection
    logger.info("Created new persistent connection to Blender")

    # Best effort: a new connection is still returned when the settings
    # cannot be read, exactly as before this read was added.
    try:
        _refresh_blender_state(connection)
    except Exception as e:
        logger.warning(f"Could not read Tripo/PolyHaven settings from Blender: {str(e)}")

    return _blender_connection
283 |
284 |
@mcp.tool()
def get_scene_info(ctx: Context) -> str:
    """Get detailed information about the current Blender scene"""
    # Failures are logged and reported as a plain error string, never raised.
    try:
        scene = get_blender_connection().send_command("get_scene_info")
        # Pass Blender's answer through unchanged, pretty-printed as JSON.
        return json.dumps(scene, indent=2)
    except Exception as exc:
        logger.error(f"Error getting scene info from Blender: {exc}")
        return f"Error getting scene info: {exc}"
297 |
298 |
@mcp.tool()
def get_object_info(ctx: Context, object_name: str) -> str:
    """
    Get detailed information about a specific object in the Blender scene.

    Parameters:
    - object_name: The name of the object to get information about
    """
    # Failures are logged and reported as a plain error string, never raised.
    try:
        connection = get_blender_connection()
        details = connection.send_command("get_object_info", {"name": object_name})
        # Pass Blender's answer through unchanged, pretty-printed as JSON.
        return json.dumps(details, indent=2)
    except Exception as exc:
        logger.error(f"Error getting object info from Blender: {exc}")
        return f"Error getting object info: {exc}"
316 |
317 |
@mcp.tool()
def create_object(
    ctx: Context,
    type: str = "CUBE",
    name: str = None,
    location: List[float] = None,
    rotation: List[float] = None,
    scale: List[float] = None,
) -> str:
    """
    Create a new object in the Blender scene.

    Parameters:
    - type: Object type (CUBE, SPHERE, CYLINDER, PLANE, CONE, TORUS, EMPTY, CAMERA, LIGHT)
    - name: Optional name for the object
    - location: Optional [x, y, z] location coordinates
    - rotation: Optional [x, y, z] rotation in radians
    - scale: Optional [x, y, z] scale factors
    """
    try:
        connection = get_blender_connection()

        # Missing (or empty) transforms fall back to origin, no rotation and
        # unit scale.
        payload = {
            "type": type,
            "location": location or [0, 0, 0],
            "rotation": rotation or [0, 0, 0],
            "scale": scale or [1, 1, 1],
        }
        if name:
            payload["name"] = name

        created = connection.send_command("create_object", payload)
        return f"Created {type} object: {created['name']}"
    except Exception as exc:
        logger.error(f"Error creating object: {exc}")
        return f"Error creating object: {exc}"
356 |
357 |
@mcp.tool()
def modify_object(
    ctx: Context,
    name: str,
    location: List[float] = None,
    rotation: List[float] = None,
    scale: List[float] = None,
    visible: bool = None,
) -> str:
    """
    Modify an existing object in the Blender scene.

    Parameters:
    - name: Name of the object to modify
    - location: Optional [x, y, z] location coordinates
    - rotation: Optional [x, y, z] rotation in radians
    - scale: Optional [x, y, z] scale factors
    - visible: Optional boolean to set visibility
    """
    try:
        connection = get_blender_connection()

        # Only properties the caller actually supplied are sent, so an
        # omitted one is left out of the command entirely.
        optional_fields = (
            ("location", location),
            ("rotation", rotation),
            ("scale", scale),
            ("visible", visible),
        )
        payload = {"name": name}
        for key, value in optional_fields:
            if value is not None:
                payload[key] = value

        modified = connection.send_command("modify_object", payload)
        return f"Modified object: {modified['name']}"
    except Exception as exc:
        logger.error(f"Error modifying object: {exc}")
        return f"Error modifying object: {exc}"
397 |
398 |
@mcp.tool()
def delete_object(ctx: Context, name: str) -> str:
    """
    Delete an object from the Blender scene.

    Parameters:
    - name: Name of the object to delete
    """
    try:
        # The reply payload is not needed; success is the absence of an error.
        get_blender_connection().send_command("delete_object", {"name": name})
        return f"Deleted object: {name}"
    except Exception as exc:
        logger.error(f"Error deleting object: {exc}")
        return f"Error deleting object: {exc}"
416 |
417 |
@mcp.tool()
def set_material(
    ctx: Context, object_name: str, material_name: str = None, color: List[float] = None
) -> str:
    """
    Set or create a material for an object.

    Parameters:
    - object_name: Name of the object to apply the material to
    - material_name: Optional name of the material to use or create
    - color: Optional [R, G, B] color values (0.0-1.0)
    """
    try:
        connection = get_blender_connection()

        # Falsy options (None, empty string, empty list) are not sent.
        payload = {"object_name": object_name}
        for key, value in (("material_name", material_name), ("color", color)):
            if value:
                payload[key] = value

        applied = connection.send_command("set_material", payload)
        return f"Applied material to {object_name}: {applied.get('material_name', 'unknown')}"
    except Exception as exc:
        logger.error(f"Error setting material: {exc}")
        return f"Error setting material: {exc}"
446 |
447 |
@mcp.tool()
def execute_blender_code(ctx: Context, code: str) -> str:
    """
    Execute arbitrary Python code in Blender.

    Parameters:
    - code: The Python code to execute
    """
    # NOTE(review): the code string is forwarded to Blender as-is; this tool
    # performs no validation of it.
    try:
        connection = get_blender_connection()
        outcome = connection.send_command("execute_code", {"code": code})
        return f"Code executed successfully: {outcome.get('result', '')}"
    except Exception as exc:
        logger.error(f"Error executing code: {exc}")
        return f"Error executing code: {exc}"
465 |
466 |
@mcp.tool()
def get_polyhaven_categories(ctx: Context, asset_type: str = "hdris") -> str:
    """
    Get a list of categories for a specific asset type on Polyhaven.

    Parameters:
    - asset_type: The type of asset to get categories for (hdris, textures, models, all)
    """
    try:
        connection = get_blender_connection()
        # The flag read here is the one cached by get_blender_connection().
        if not _polyhaven_enabled:
            return "PolyHaven integration is disabled. Select it in the sidebar in BlenderMCP, then run it again."

        reply = connection.send_command(
            "get_polyhaven_categories", {"asset_type": asset_type}
        )
        if "error" in reply:
            return f"Error: {reply['error']}"

        # One line per category, largest asset count first.
        by_count = sorted(
            reply["categories"].items(), key=lambda item: item[1], reverse=True
        )
        pieces = [f"Categories for {asset_type}:\n\n"]
        pieces.extend(f"- {category}: {count} assets\n" for category, count in by_count)
        return "".join(pieces)
    except Exception as exc:
        logger.error(f"Error getting Polyhaven categories: {exc}")
        return f"Error getting Polyhaven categories: {exc}"
500 |
501 |
# Labels for the numeric asset type codes, indexed by code.
_POLYHAVEN_TYPE_LABELS = ("HDRI", "Texture", "Model")


def _polyhaven_type_label(type_code) -> str:
    """Return the label for a numeric asset type, or "Unknown" for any other value."""
    if isinstance(type_code, int) and 0 <= type_code < len(_POLYHAVEN_TYPE_LABELS):
        return _POLYHAVEN_TYPE_LABELS[type_code]
    return "Unknown"


@mcp.tool()
def search_polyhaven_assets(
    ctx: Context, asset_type: str = "all", categories: str = None
) -> str:
    """
    Search for assets on Polyhaven with optional filtering.

    Parameters:
    - asset_type: Type of assets to search for (hdris, textures, models, all)
    - categories: Optional comma-separated list of categories to filter by

    Returns a list of matching assets with basic information.
    """
    try:
        blender = get_blender_connection()
        result = blender.send_command(
            "search_polyhaven_assets",
            {"asset_type": asset_type, "categories": categories},
        )

        if "error" in result:
            return f"Error: {result['error']}"

        assets = result["assets"]
        total_count = result["total_count"]
        returned_count = result["returned_count"]

        header = f"Found {total_count} assets"
        if categories:
            header += f" in categories: {categories}"
        parts = [header, f"\nShowing {returned_count} assets:\n\n"]

        # Sort assets by download count (popularity)
        sorted_assets = sorted(
            assets.items(), key=lambda x: x[1].get("download_count", 0), reverse=True
        )

        for asset_id, asset_data in sorted_assets:
            # An unexpected type code is labelled "Unknown" rather than
            # raising and discarding the whole result list.
            type_label = _polyhaven_type_label(asset_data.get("type", 0))
            parts.append(f"- {asset_data.get('name', asset_id)} (ID: {asset_id})\n")
            parts.append(f" Type: {type_label}\n")
            parts.append(
                f" Categories: {', '.join(asset_data.get('categories', []))}\n"
            )
            parts.append(
                f" Downloads: {asset_data.get('download_count', 'Unknown')}\n\n"
            )

        return "".join(parts)
    except Exception as e:
        logger.error(f"Error searching Polyhaven assets: {str(e)}")
        return f"Error searching Polyhaven assets: {str(e)}"
558 |
559 |
@mcp.tool()
def download_polyhaven_asset(
    ctx: Context,
    asset_id: str,
    asset_type: str,
    resolution: str = "1k",
    file_format: str = None,
) -> str:
    """
    Download and import a Polyhaven asset into Blender.

    Parameters:
    - asset_id: The ID of the asset to download
    - asset_type: The type of asset (hdris, textures, models)
    - resolution: The resolution to download (e.g., 1k, 2k, 4k)
    - file_format: Optional file format (e.g., hdr, exr for HDRIs; jpg, png for textures; gltf, fbx for models)

    Returns a message indicating success or failure.
    """
    try:
        request = {
            "asset_id": asset_id,
            "asset_type": asset_type,
            "resolution": resolution,
            "file_format": file_format,
        }
        reply = get_blender_connection().send_command(
            "download_polyhaven_asset", request
        )

        # Guard clauses: explicit error first, then a missing success flag.
        if "error" in reply:
            return f"Error: {reply['error']}"
        if not reply.get("success"):
            return f"Failed to download asset: {reply.get('message', 'Unknown error')}"

        message = reply.get("message", "Asset downloaded and imported successfully")

        # Append a type-specific note describing what was done with the asset.
        if asset_type == "hdris":
            return f"{message}. The HDRI has been set as the world environment."
        if asset_type == "textures":
            material_name = reply.get("material", "")
            maps = ", ".join(reply.get("maps", []))
            return f"{message}. Created material '{material_name}' with maps: {maps}."
        if asset_type == "models":
            return f"{message}. The model has been imported into the current scene."
        return message
    except Exception as exc:
        logger.error(f"Error downloading Polyhaven asset: {exc}")
        return f"Error downloading Polyhaven asset: {exc}"
617 |
618 |
@mcp.tool()
def set_texture(ctx: Context, object_name: str, texture_id: str) -> str:
    """
    Apply a previously downloaded Polyhaven texture to an object.

    Parameters:
    - object_name: Name of the object to apply the texture to
    - texture_id: ID of the Polyhaven texture to apply (must be downloaded first)

    Returns a message indicating success or failure.
    """
    try:
        reply = get_blender_connection().send_command(
            "set_texture", {"object_name": object_name, "texture_id": texture_id}
        )

        # Guard clauses: explicit error first, then a missing success flag.
        if "error" in reply:
            return f"Error: {reply['error']}"
        if not reply.get("success"):
            return f"Failed to apply texture: {reply.get('message', 'Unknown error')}"

        material_name = reply.get("material", "")
        maps = ", ".join(reply.get("maps", []))

        # Material details reported by Blender, with defaults for absent keys.
        material_info = reply.get("material_info", {})
        node_count = material_info.get("node_count", 0)
        has_nodes = material_info.get("has_nodes", False)
        texture_nodes = material_info.get("texture_nodes", [])

        report = [
            f"Successfully applied texture '{texture_id}' to {object_name}.\n",
            f"Using material '{material_name}' with maps: {maps}.\n\n",
            f"Material has nodes: {has_nodes}\n",
            f"Total node count: {node_count}\n\n",
        ]

        if not texture_nodes:
            report.append("No texture nodes found in the material.\n")
        else:
            report.append("Texture nodes:\n")
            for node in texture_nodes:
                report.append(f"- {node['name']} using image: {node['image']}\n")
                if node["connections"]:
                    report.append(" Connections:\n")
                    report.extend(f" {conn}\n" for conn in node["connections"])

        return "".join(report)
    except Exception as exc:
        logger.error(f"Error applying texture: {exc}")
        return f"Error applying texture: {exc}"
673 |
674 |
@mcp.tool()
def get_polyhaven_status(ctx: Context) -> str:
    """
    Check if PolyHaven integration is enabled in Blender.
    Returns a message indicating whether PolyHaven features are available.
    """
    try:
        status = get_blender_connection().send_command("get_polyhaven_status")
        # Only the message from Blender is returned; it is empty when the
        # reply carries none.
        return status.get("message", "")
    except Exception as exc:
        logger.error(f"Error checking PolyHaven status: {exc}")
        return f"Error checking PolyHaven status: {exc}"
691 |
692 |
@mcp.prompt()
def asset_creation_strategy() -> str:
    """Defines the preferred strategy for creating assets in Blender"""
    # The prompt is assembled line by line; the trailing empty entry keeps the
    # final newline of the text.
    guidance_lines = (
        "When creating 3D content in Blender, always start by checking if PolyHaven is available:",
        "",
        "0. Before anything, always check the scene from get_scene_info()",
        "1. First use get_polyhaven_status() to verify if PolyHaven integration is enabled.",
        "",
        "2. If PolyHaven is enabled:",
        '- For objects/models: Use download_polyhaven_asset() with asset_type="models"',
        '- For materials/textures: Use download_polyhaven_asset() with asset_type="textures"',
        '- For environment lighting: Use download_polyhaven_asset() with asset_type="hdris"',
        "",
        "3. If PolyHaven is disabled or when falling back to basic tools:",
        "- create_object() for basic primitives (CUBE, SPHERE, CYLINDER, etc.)",
        "- set_material() for basic colors and materials",
        "",
        "Only fall back to basic creation tools when:",
        "- PolyHaven is disabled",
        "- A simple primitive is explicitly requested",
        "- No suitable PolyHaven asset exists",
        "- The task specifically requires a basic material/color",
        "",
    )
    return "\n".join(guidance_lines)
716 |
717 |
@mcp.tool()
async def create_3d_model_from_text(
    describe_the_look_of_object: str, face_limit: int = -1
) -> Dict[str, Any]:
    """
    Create a 3D model from a text description using the Tripo API.

    IMPORTANT: This tool initiates a 3D model generation task but does NOT wait for completion.
    After calling this tool, you MUST repeatedly call the get_task_status tool with the returned
    task_id until the task status is SUCCESS or a terminal error state.

    Typical workflow:
    1. Call create_3d_model_from_text to start the task
    2. Get the task_id from the response
    3. Call get_task_status with the task_id
    4. If status is not SUCCESS, wait a moment and call get_task_status again
    5. Repeat until status is SUCCESS or a terminal error state
    6. When status is SUCCESS, use the pbr_model_url from the response

    Args:
        describe_the_look_of_object: A detailed description of the object to generate.
        face_limit: The maximum number of faces in the model. Forwarded
            unchanged to the Tripo client (default -1).

    Returns:
        A dictionary containing the task ID, the initial status/progress and
        instructions for checking the status.

    Raises:
        ValueError: If no Tripo API key is currently available.
    """
    # Called once for its side effects; the connection object itself is not
    # needed here.
    # NOTE(review): presumably this is what populates the module-level
    # _tripo_apikey -- confirm against get_blender_connection().
    get_blender_connection()

    # The key is read from module state, not from the environment.
    api_key = _tripo_apikey
    if not api_key:
        raise ValueError(
            "It may take some more time to correctly re-enter the tripo function. You can first check the scene and then try the tripo function again later"
        )

    # Create the Tripo client
    async with TripoClient(api_key=api_key) as client:
        # Create a text-to-model task
        task_id = await client.text_to_model(
            prompt=describe_the_look_of_object,
            face_limit=face_limit,
        )

        # Get initial task status
        task = await client.get_task(task_id)

        # Return immediately with task ID and status; the caller polls
        # get_task_status for completion.
        return {
            "task_id": task_id,
            "status": str(task.status),
            "progress": task.progress,
            "message": "Task created successfully. The 3D model generation is in progress.",
            "next_step": "You MUST now call get_task_status with this task_id to check progress.",
            "important_note": "3D model generation takes 3-5 minutes. You need to repeatedly call get_task_status until completion.",
            "workflow": [
                "1. You've completed this step by calling create_3d_model_from_text",
                "2. Now call get_task_status with task_id: " + task_id,
                "3. If status is not SUCCESS, wait and call get_task_status again",
                "4. When status is SUCCESS, use the pbr_model_url from the response",
            ],
        }
781 |
782 |
@mcp.tool()
async def create_3d_model_from_image(
    image: str, face_limit: int = -1
) -> Dict[str, Any]:
    """
    Start a Tripo image-to-model generation task and return right away.

    IMPORTANT: This tool only submits the task; it does NOT wait for the model
    to be generated. After calling it you MUST keep calling get_task_status
    with the returned task_id until the status is SUCCESS or a terminal error
    state.

    Typical workflow:
    1. Call create_3d_model_from_image to start the task
    2. Read the task_id from the response
    3. Call get_task_status with that task_id
    4. If the status is not SUCCESS, wait a moment and call get_task_status again
    5. Repeat until the status is SUCCESS or a terminal error state
    6. On SUCCESS, use the pbr_model_url from the get_task_status response

    Args:
        image: The local path or url to the image file.
        face_limit: The maximum number of faces in the model. Forwarded
            unchanged to the Tripo client (default -1).

    Returns:
        A dictionary with the task ID, its initial status and progress, and
        follow-up instructions for polling.

    Raises:
        ValueError: If no Tripo API key is currently available.
    """
    # Invoked for its side effects only; the return value is discarded.
    # NOTE(review): presumably this refreshes the module-level _tripo_apikey
    # -- confirm against get_blender_connection().
    get_blender_connection()

    key = _tripo_apikey
    if not key:
        raise ValueError(
            "It may take some more time to correctly re-enter the tripo function. You can first check the scene and then try the tripo function again later"
        )

    async with TripoClient(api_key=key) as tripo:
        # Submit the image-to-model job, then take one status snapshot.
        task_id = await tripo.image_to_model(image=image, face_limit=face_limit)
        snapshot = await tripo.get_task(task_id)

        polling_steps = [
            "1. You've completed this step by calling create_3d_model_from_image",
            "2. Now call get_task_status with task_id: " + task_id,
            "3. If status is not SUCCESS, wait and call get_task_status again",
            "4. When status is SUCCESS, use the pbr_model_url from the response",
        ]

        response: Dict[str, Any] = {
            "task_id": task_id,
            "status": str(snapshot.status),
            "progress": snapshot.progress,
            "message": "Task created successfully. The 3D model generation is in progress.",
            "next_step": "You MUST now call get_task_status with this task_id to check progress.",
            "important_note": "3D model generation takes 3-5 minutes. You need to repeatedly call get_task_status until completion.",
            "workflow": polling_steps,
        }
        return response
845 |
846 |
@mcp.tool()
def import_tripo_glb_model(ctx: Context, glb_url: str) -> str:
    """
    Import a GLB model from URL into Blender scene

    Parameters:
    - ctx: MCP request context (not read by this tool)
    - glb_url: Download URL of the GLB model file

    Returns:
    Result message of the import operation: one line per imported model with
    its dimensions, "No models found in imported file" when the reply lists
    no models, or a failure description.
    """
    try:
        blender = get_blender_connection()
        result = blender.send_command("import_tripo_glb_model", {"url": glb_url})

        if "error" in result:
            return f"Import failed: {result['error']}"

        if result.get("status") != "success":
            return f"Import failed: {result.get('message', 'Unknown error')}"

        # Check the model list itself: the output list always starts with a
        # header line, so testing it for emptiness could never detect this case.
        models = result.get("models") or []
        if not models:
            return "No models found in imported file"

        output = ["Successfully imported models:"]
        for model in models:
            dim = model["dimensions"]
            output.append(
                f"• {model['name']} | Dimensions: "
                f"{dim['x']} x {dim['y']} x {dim['z']} meters"
            )

        return "\n".join(output)

    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        logger.error(f"Error importing GLB model: {str(e)}")
        return f"GLB model import failed: {str(e)}"
884 |
885 |
@mcp.tool()
async def get_task_status(task_id: str) -> Dict[str, Any]:
    """
    Report the current state of a 3D model generation task.

    IMPORTANT: Use this tool to poll a task started by
    create_3d_model_from_text or create_3d_model_from_image. You may need to
    call it MULTIPLE TIMES until the task completes.

    Typical workflow:
    1. Call this tool with the task_id returned by the creation tool
    2. Inspect the status in the response:
       - SUCCESS: the task is complete and the pbr_model_url can be used
       - FAILED, CANCELLED, BANNED or EXPIRED: the task failed
       - anything else: the task is still in progress
    3. While the task is in progress, wait a moment and call this tool again

    Args:
        task_id: The ID of the task to check.

    Returns:
        A dictionary with task_id, status and progress, plus model URLs on
        success and guidance messages describing what to do next.

    Raises:
        ValueError: If no Tripo API key is currently available, or if the
            client returns no task for the given ID.
    """
    # Invoked for its side effects only; the return value is discarded.
    # NOTE(review): presumably this refreshes the module-level _tripo_apikey
    # -- confirm against get_blender_connection().
    get_blender_connection()

    api_key = _tripo_apikey
    if not api_key:
        raise ValueError(
            "It may take some more time to correctly re-enter the tripo function. You can first check the scene and then try the tripo function again later"
        )

    async with TripoClient(api_key=api_key) as client:
        task = await client.get_task(task_id)
        if task is None:
            raise ValueError(
                f"Failed to retrieve task information for task ID: {task_id}"
            )

        # Fields present in every response, whatever the task state.
        result = {
            "task_id": task_id,
            "status": str(task.status),
            "progress": task.progress,
        }

        if task.status == TaskStatus.SUCCESS:
            if task.output:
                # Finished with downloadable artefacts.
                result["base_model_url"] = task.output.base_model
                result["model_url"] = task.output.model
                result["pbr_model_url"] = task.output.pbr_model
                result["rendered_image_url"] = task.output.rendered_image
                result["message"] = (
                    "Task completed successfully! You can now use the pbr_model_url."
                )
                result["next_step"] = (
                    "Use the pbr_model_url to access the 3D model, download it through import_tripo_glb_model tool"
                )
                if not task.output.pbr_model:
                    result["warning"] = (
                        "Model generated but PBR model URL is not available."
                    )
            else:
                # Finished, but the service returned nothing usable.
                result["message"] = (
                    "Task completed successfully but no output data is available."
                )
                result["next_step"] = (
                    "Try creating a new model with a different description."
                )
        elif task.status in (
            TaskStatus.FAILED,
            TaskStatus.CANCELLED,
            TaskStatus.BANNED,
            TaskStatus.EXPIRED,
        ):
            # Terminal failure: polling again will not help.
            result["message"] = f"Task failed with status: {task.status}"
            result["next_step"] = (
                "Try creating a new model with a different description."
            )
        else:
            # Any other status is treated as still running.
            result["message"] = (
                f"Task is still in progress. Current status: {task.status}, Progress: {task.progress}%"
            )
            result["next_step"] = (
                "IMPORTANT: You must call get_task_status again with this task_id to continue checking progress."
            )
            result["wait_message"] = (
                "3D model generation typically takes 3-5 minutes. Please be patient and keep checking."
            )

        return result
981 |
982 |
def main():
    """Run the MCP server using the stdio transport."""
    # Alternative transport, currently disabled: mcp.run("sse")
    mcp.run(transport="stdio")


# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()
990 |
```