#
tokens: 12400/50000 7/7 files
lines: on (toggle) GitHub
raw markdown copy reset
# Directory Structure

```
├── .github
│   └── workflows
│       ├── publish.yml
│       └── release.yaml
├── .gitignore
├── LICENSE
├── main.py
├── pyproject.toml
├── README.md
├── src
│   └── server.py
└── succeed.jpg
```

# Files

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
 1 | # Python
 2 | __pycache__/
 3 | *.py[cod]
 4 | *$py.class
 5 | *.so
 6 | .Python
 7 | build/
 8 | develop-eggs/
 9 | dist/
10 | downloads/
11 | eggs/
12 | .eggs/
13 | lib/
14 | lib64/
15 | parts/
16 | sdist/
17 | var/
18 | wheels/
19 | *.egg-info/
20 | .installed.cfg
21 | *.egg
22 | 
23 | # Virtual Environment
24 | venv/
25 | ENV/
26 | env/
27 | .env
28 | .venv
29 | env.bak/
30 | venv.bak/
31 | 
32 | # IDE specific files
33 | .idea/
34 | .vscode/
35 | *.swp
36 | *.swo
37 | .DS_Store
38 | *.sublime-workspace
39 | *.sublime-project
40 | 
41 | # Testing
42 | .tox/
43 | .coverage
44 | .coverage.*
45 | .cache
46 | nosetests.xml
47 | coverage.xml
48 | *.cover
49 | .hypothesis/
50 | .pytest_cache/
51 | htmlcov/
52 | 
53 | # Distribution / packaging
54 | .Python
55 | *.manifest
56 | *.spec
57 | pip-log.txt
58 | pip-delete-this-directory.txt
59 | 
60 | # Jupyter Notebook
61 | .ipynb_checkpoints
62 | 
63 | # pyenv
64 | .python-version
65 | 
66 | # mypy
67 | .mypy_cache/
68 | .dmypy.json
69 | dmypy.json
70 | 
71 | # Logs and databases
72 | *.log
73 | *.sqlite
74 | *.db
75 | 
76 | uv.lock
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
 1 | # Tripo MCP Server
 2 | 
 3 | Tripo MCP provides an interface between AI assistants and [Tripo AI](https://www.tripo3d.ai) via [Model Context Protocol (MCP)](https://github.com/anthropics/anthropic-cookbook/tree/main/mcp). 
 4 | 
 5 | > **Note:** This project is in alpha. Currently, it supports Tripo Blender Addon integration.
 6 | 
 7 | ## Current Features
 8 | 
 9 | - Generate 3D asset from natural language using Tripo's API and import to Blender
10 | - Compatible with Claude and other MCP-enabled AI assistants
11 | 
12 | ## Quick Start
13 | 
14 | ### Prerequisites
15 | - Python 3.10+
16 | - [Blender](https://www.blender.org/download/)
17 | - [Tripo AI Blender Addon](https://www.tripo3d.ai/app/home)
18 | - Claude for Desktop or Cursor IDE
19 | 
20 | ### Installation
21 | 
22 | 1. Install Tripo AI Blender Addon from [Tripo AI's website](https://www.tripo3d.ai/app/home)
23 | 
24 | 2. Configure the MCP server in Claude Desktop or Cursor.
25 | 
26 |     * `pip install uv`
27 |     * set mcp in cursor
28 |     ```json
29 |     {
30 |       "mcpServers": {
31 |         "tripo-mcp": {
32 |           "command": "uvx",
33 |           "args": [
34 |             "tripo-mcp"
35 |           ]
36 |         }
37 |       }
38 |     }
39 |     ```
40 | 
41 |     * Then you will get a green dot like this:
42 |       ![img](succeed.jpg)
43 | 
44 | ### Usage
45 | 
46 | 1. Enable Tripo AI Blender Addon and start blender mcp server.
47 | 
48 | 2. Chat using cursor or claude. E.g., "Generate a 3D model of a futuristic chair".
49 | 
50 | ## Acknowledgements
51 | 
52 | - **[Tripo AI](https://www.tripo3d.ai)**
53 | - **[blender-mcp](https://github.com/ahujasid/blender-mcp)** by [Siddharth Ahuja](https://github.com/ahujasid)
54 | 
55 | **Special Thanks**  
56 | Special thanks to Siddharth Ahuja for the blender-mcp project, which provided inspiring ideas for MCP + 3D.
57 | 
```

--------------------------------------------------------------------------------
/main.py:
--------------------------------------------------------------------------------

```python
1 | from src.server import main as server_main
2 | 
3 | 
4 | if __name__ == "__main__":
5 |     """Entry point for the Tripo MCP package"""
6 |     server_main()
```

--------------------------------------------------------------------------------
/.github/workflows/publish.yml:
--------------------------------------------------------------------------------

```yaml
 1 | name: Publish to PyPI
 2 | 
 3 | on:
 4 |   release:
 5 |     types: [published]
 6 | 
 7 | jobs:
 8 |   deploy:
 9 |     runs-on: ubuntu-latest
10 |     steps:
11 |     - uses: actions/checkout@v4
12 |     - name: Set up Python
13 |       uses: actions/setup-python@v4
14 |       with:
15 |         python-version: '3.10'
16 |     - name: Install dependencies
17 |       run: |
18 |         python -m pip install --upgrade pip
19 |         pip install build
20 |     - name: Build package
21 |       run: python -m build
22 |     - name: Publish package
23 |       uses: pypa/[email protected]
24 |       with:
25 |         password: ${{ secrets.PYPI_API_TOKEN }} 
```

--------------------------------------------------------------------------------
/.github/workflows/release.yaml:
--------------------------------------------------------------------------------

```yaml
 1 | name: Publish to PyPI
 2 | 
 3 | on:
 4 |   release:
 5 |     types: [created]
 6 | 
 7 | jobs:
 8 |   deploy:
 9 |     runs-on: ubuntu-latest
10 |     steps:
11 |     - uses: actions/checkout@v3
12 |     
13 |     - name: Set up Python
14 |       uses: actions/setup-python@v4
15 |       with:
16 |         python-version: '3.x'
17 |     
18 |     - name: Install dependencies
19 |       run: |
20 |         python -m pip install --upgrade pip
21 |         pip install build twine
22 |     
23 |     - name: Build package
24 |       run: python -m build
25 |     
26 |     - name: Publish to PyPI
27 |       env:
28 |         TWINE_USERNAME: __token__
29 |         TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN }}
30 |       run: |
31 |         python -m twine upload dist/* 
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
 1 | [project]
 2 | name = "tripo-mcp"
 3 | version = "0.1.2"
 4 | description = "MCP (Model Control Protocol) integration for Tripo"
 5 | authors = [
 6 |     {name = "Allen Dang", email = "[email protected]"},
 7 |     {name = "pookiefoof", email = "[email protected]"},
 8 |     {name = "Ding Liang", email = "[email protected]"},
 9 | ]
10 | license = {text = "MIT"}
11 | readme = "README.md"
12 | requires-python = ">=3.10"
13 | keywords = ["mcp", "blender", "3d", "automation"]
14 | classifiers = [
15 |     "Development Status :: 3 - Alpha",
16 |     "Intended Audience :: Developers",
17 |     "License :: OSI Approved :: MIT License",
18 |     "Programming Language :: Python :: 3",
19 |     "Programming Language :: Python :: 3.10",
20 | ]
21 | 
22 | dependencies = [
23 |     "tripo3d>=0.2.0",
24 |     "mcp[cli]>=1.4.1",
25 | ]
26 | 
27 | [project.optional-dependencies]
28 | dev = [
29 | ]
30 | 
31 | [project.scripts]
32 | tripo-mcp = "server:main"
33 | 
34 | [build-system]
35 | requires = ["setuptools>=61.0", "wheel"]
36 | build-backend = "setuptools.build_meta"
37 | 
38 | [tool.setuptools]
39 | package-dir = {"" = "src"}
40 | 
```

--------------------------------------------------------------------------------
/src/server.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | MIT License
  3 | 
  4 | Copyright (c) 2025 Siddharth Ahuja
  5 | Copyright (c) 2025 for additions
  6 | 
  7 | Permission is hereby granted, free of charge, to any person obtaining a copy
  8 | of this software and associated documentation files (the "Software"), to deal
  9 | in the Software without restriction, including without limitation the rights
 10 | to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 11 | copies of the Software, and to permit persons to whom the Software is
 12 | furnished to do so, subject to the following conditions:
 13 | 
 14 | The above copyright notice and this permission notice shall be included in all
 15 | copies or substantial portions of the Software.
 16 | 
 17 | THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 18 | IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 19 | FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 20 | AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 21 | LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 22 | OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 23 | SOFTWARE.
 24 | 
 25 | This file is based on work by Siddharth Ahuja, with additional contributions
 26 | for Tripo MCP functionality.
 27 | """
 28 | 
 29 | from mcp.server.fastmcp import FastMCP, Context
 30 | import sys
 31 | from pathlib import Path
 32 | from typing import Dict, Any
 33 | import socket
 34 | import json
 35 | import logging
 36 | from dataclasses import dataclass
 37 | from contextlib import asynccontextmanager
 38 | from typing import AsyncIterator, Dict, Any, List
 39 | 
 40 | # Configure logging
 41 | logging.basicConfig(
 42 |     level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
 43 | )
 44 | logger = logging.getLogger("BlenderMCPServer")
 45 | 
 46 | sys.path.insert(0, str(Path(__file__).parent.parent))
 47 | 
 48 | from tripo3d import TripoClient, TaskStatus
 49 | 
 50 | 
 51 | @dataclass
 52 | class BlenderConnection:
 53 |     host: str
 54 |     port: int
 55 |     sock: socket.socket = (
 56 |         None  # Changed from 'socket' to 'sock' to avoid naming conflict
 57 |     )
 58 | 
 59 |     def connect(self) -> bool:
 60 |         """Connect to the Blender addon socket server"""
 61 |         if self.sock:
 62 |             return True
 63 | 
 64 |         try:
 65 |             self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 66 |             self.sock.connect((self.host, self.port))
 67 |             logger.info(f"Connected to Blender at {self.host}:{self.port}")
 68 |             return True
 69 |         except Exception as e:
 70 |             logger.error(f"Failed to connect to Blender: {str(e)}")
 71 |             self.sock = None
 72 |             return False
 73 | 
 74 |     def disconnect(self):
 75 |         """Disconnect from the Blender addon"""
 76 |         if self.sock:
 77 |             try:
 78 |                 self.sock.close()
 79 |             except Exception as e:
 80 |                 logger.error(f"Error disconnecting from Blender: {str(e)}")
 81 |             finally:
 82 |                 self.sock = None
 83 | 
 84 |     def receive_full_response(self, sock, buffer_size=8192):
 85 |         """Receive the complete response, potentially in multiple chunks"""
 86 |         chunks = []
 87 |         # Use a consistent timeout value that matches the addon's timeout
 88 |         sock.settimeout(15.0)  # Match the addon's timeout
 89 | 
 90 |         try:
 91 |             while True:
 92 |                 try:
 93 |                     chunk = sock.recv(buffer_size)
 94 |                     if not chunk:
 95 |                         # If we get an empty chunk, the connection might be closed
 96 |                         if (
 97 |                             not chunks
 98 |                         ):  # If we haven't received anything yet, this is an error
 99 |                             raise Exception(
100 |                                 "Connection closed before receiving any data"
101 |                             )
102 |                         break
103 | 
104 |                     chunks.append(chunk)
105 | 
106 |                     # Check if we've received a complete JSON object
107 |                     try:
108 |                         data = b"".join(chunks)
109 |                         json.loads(data.decode("utf-8"))
110 |                         # If we get here, it parsed successfully
111 |                         logger.info(f"Received complete response ({len(data)} bytes)")
112 |                         return data
113 |                     except json.JSONDecodeError:
114 |                         # Incomplete JSON, continue receiving
115 |                         continue
116 |                 except socket.timeout:
117 |                     # If we hit a timeout during receiving, break the loop and try to use what we have
118 |                     logger.warning("Socket timeout during chunked receive")
119 |                     break
120 |                 except (ConnectionError, BrokenPipeError, ConnectionResetError) as e:
121 |                     logger.error(f"Socket connection error during receive: {str(e)}")
122 |                     raise  # Re-raise to be handled by the caller
123 |         except socket.timeout:
124 |             logger.warning("Socket timeout during chunked receive")
125 |         except Exception as e:
126 |             logger.error(f"Error during receive: {str(e)}")
127 |             raise
128 | 
129 |         # If we get here, we either timed out or broke out of the loop
130 |         # Try to use what we have
131 |         if chunks:
132 |             data = b"".join(chunks)
133 |             logger.info(f"Returning data after receive completion ({len(data)} bytes)")
134 |             try:
135 |                 # Try to parse what we have
136 |                 json.loads(data.decode("utf-8"))
137 |                 return data
138 |             except json.JSONDecodeError:
139 |                 # If we can't parse it, it's incomplete
140 |                 raise Exception("Incomplete JSON response received")
141 |         else:
142 |             raise Exception("No data received")
143 | 
144 |     def send_command(
145 |         self, command_type: str, params: Dict[str, Any] = None
146 |     ) -> Dict[str, Any]:
147 |         """Send a command to Blender and return the response"""
148 |         if not self.sock and not self.connect():
149 |             raise ConnectionError("Not connected to Blender")
150 | 
151 |         command = {"type": command_type, "params": params or {}}
152 | 
153 |         try:
154 |             # Log the command being sent
155 |             logger.info(f"Sending command: {command_type} with params: {params}")
156 | 
157 |             # Send the command
158 |             self.sock.sendall(json.dumps(command).encode("utf-8"))
159 |             logger.info(f"Command sent, waiting for response...")
160 | 
161 |             # Set a timeout for receiving - use the same timeout as in receive_full_response
162 |             self.sock.settimeout(15.0)  # Match the addon's timeout
163 | 
164 |             # Receive the response using the improved receive_full_response method
165 |             response_data = self.receive_full_response(self.sock)
166 |             logger.info(f"Received {len(response_data)} bytes of data")
167 | 
168 |             response = json.loads(response_data.decode("utf-8"))
169 |             logger.info(f"Response parsed, status: {response.get('status', 'unknown')}")
170 | 
171 |             if response.get("status") == "error":
172 |                 logger.error(f"Blender error: {response.get('message')}")
173 |                 raise Exception(response.get("message", "Unknown error from Blender"))
174 | 
175 |             return response.get("result", {})
176 |         except socket.timeout:
177 |             logger.error("Socket timeout while waiting for response from Blender")
178 |             # Don't try to reconnect here - let the get_blender_connection handle reconnection
179 |             # Just invalidate the current socket so it will be recreated next time
180 |             self.sock = None
181 |             raise Exception(
182 |                 "Timeout waiting for Blender response - try simplifying your request"
183 |             )
184 |         except (ConnectionError, BrokenPipeError, ConnectionResetError) as e:
185 |             logger.error(f"Socket connection error: {str(e)}")
186 |             self.sock = None
187 |             raise Exception(f"Connection to Blender lost: {str(e)}")
188 |         except json.JSONDecodeError as e:
189 |             logger.error(f"Invalid JSON response from Blender: {str(e)}")
190 |             # Try to log what was received
191 |             if "response_data" in locals() and response_data:
192 |                 logger.error(f"Raw response (first 200 bytes): {response_data[:200]}")
193 |             raise Exception(f"Invalid response from Blender: {str(e)}")
194 |         except Exception as e:
195 |             logger.error(f"Error communicating with Blender: {str(e)}")
196 |             # Don't try to reconnect here - let the get_blender_connection handle reconnection
197 |             self.sock = None
198 |             raise Exception(f"Communication error with Blender: {str(e)}")
199 | 
200 | 
201 | @asynccontextmanager
202 | async def server_lifespan(server: FastMCP) -> AsyncIterator[Dict[str, Any]]:
203 |     """Manage server startup and shutdown lifecycle"""
204 |     # We don't need to create a connection here since we're using the global connection
205 |     # for resources and tools
206 | 
207 |     try:
208 |         # Just log that we're starting up
209 |         logger.info("Blender server starting up")
210 | 
211 |         # Try to connect to Blender on startup to verify it's available
212 |         try:
213 |             # This will initialize the global connection if needed
214 |             blender = get_blender_connection()
215 |             logger.info("Successfully connected to Blender on startup")
216 |         except Exception as e:
217 |             logger.warning(f"Could not connect to Blender on startup: {str(e)}")
218 |             logger.warning(
219 |                 "Make sure the Blender addon is running before using Blender resources or tools"
220 |             )
221 | 
222 |         # Return an empty context - we're using the global connection
223 |         yield {}
224 |     finally:
225 |         # Clean up the global connection on shutdown
226 |         global _blender_connection
227 |         if _blender_connection:
228 |             logger.info("Disconnecting from Blender on shutdown")
229 |             _blender_connection.disconnect()
230 |             _blender_connection = None
231 |         logger.info("BlenderMCP server shut down")
232 | 
233 | 
234 | mcp = FastMCP(
235 |     name="Tripo MCP",
236 |     instructions="MCP for Tripo Blender addon",
237 |     lifespan=server_lifespan,
238 |     # host="127.0.0.1",
239 |     # port=8392,
240 | )
241 | 
242 | _blender_connection = None
243 | _polyhaven_enabled = False  # Add this global variable
244 | _tripo_apikey = ""
245 | 
246 | 
247 | def get_blender_connection():
248 |     """Get or create a persistent Blender connection"""
249 |     global _blender_connection, _polyhaven_enabled, _tripo_apikey  # Add _polyhaven_enabled to globals
250 | 
251 |     # If we have an existing connection, check if it's still valid
252 |     if _blender_connection is not None:
253 |         try:
254 |             # First check if PolyHaven is enabled by sending a ping command
255 |             result = _blender_connection.send_command("get_tripo_apikey")
256 |             _tripo_apikey = result.get("api_key", "")
257 |             result = _blender_connection.send_command("get_polyhaven_status")
258 |             # Store the PolyHaven status globally
259 |             _polyhaven_enabled = result.get("enabled", False)
260 | 
261 |             return _blender_connection
262 |         except Exception as e:
263 |             # Connection is dead, close it and create a new one
264 |             logger.warning(f"Existing connection is no longer valid: {str(e)}")
265 |             try:
266 |                 _blender_connection.disconnect()
267 |             except:
268 |                 pass
269 |             _blender_connection = None
270 | 
271 |     # Create a new connection if needed
272 |     if _blender_connection is None:
273 |         _blender_connection = BlenderConnection(host="localhost", port=9876)
274 |         if not _blender_connection.connect():
275 |             logger.error("Failed to connect to Blender")
276 |             _blender_connection = None
277 |             raise Exception(
278 |                 "Could not connect to Blender. Make sure the Blender addon is running."
279 |             )
280 |         logger.info("Created new persistent connection to Blender")
281 | 
282 |     return _blender_connection
283 | 
284 | 
285 | @mcp.tool()
286 | def get_scene_info(ctx: Context) -> str:
287 |     """Get detailed information about the current Blender scene"""
288 |     try:
289 |         blender = get_blender_connection()
290 |         result = blender.send_command("get_scene_info")
291 | 
292 |         # Just return the JSON representation of what Blender sent us
293 |         return json.dumps(result, indent=2)
294 |     except Exception as e:
295 |         logger.error(f"Error getting scene info from Blender: {str(e)}")
296 |         return f"Error getting scene info: {str(e)}"
297 | 
298 | 
299 | @mcp.tool()
300 | def get_object_info(ctx: Context, object_name: str) -> str:
301 |     """
302 |     Get detailed information about a specific object in the Blender scene.
303 | 
304 |     Parameters:
305 |     - object_name: The name of the object to get information about
306 |     """
307 |     try:
308 |         blender = get_blender_connection()
309 |         result = blender.send_command("get_object_info", {"name": object_name})
310 | 
311 |         # Just return the JSON representation of what Blender sent us
312 |         return json.dumps(result, indent=2)
313 |     except Exception as e:
314 |         logger.error(f"Error getting object info from Blender: {str(e)}")
315 |         return f"Error getting object info: {str(e)}"
316 | 
317 | 
318 | @mcp.tool()
319 | def create_object(
320 |     ctx: Context,
321 |     type: str = "CUBE",
322 |     name: str = None,
323 |     location: List[float] = None,
324 |     rotation: List[float] = None,
325 |     scale: List[float] = None,
326 | ) -> str:
327 |     """
328 |     Create a new object in the Blender scene.
329 | 
330 |     Parameters:
331 |     - type: Object type (CUBE, SPHERE, CYLINDER, PLANE, CONE, TORUS, EMPTY, CAMERA, LIGHT)
332 |     - name: Optional name for the object
333 |     - location: Optional [x, y, z] location coordinates
334 |     - rotation: Optional [x, y, z] rotation in radians
335 |     - scale: Optional [x, y, z] scale factors
336 |     """
337 |     try:
338 |         # Get the global connection
339 |         blender = get_blender_connection()
340 | 
341 |         # Set default values for missing parameters
342 |         loc = location or [0, 0, 0]
343 |         rot = rotation or [0, 0, 0]
344 |         sc = scale or [1, 1, 1]
345 | 
346 |         params = {"type": type, "location": loc, "rotation": rot, "scale": sc}
347 | 
348 |         if name:
349 |             params["name"] = name
350 | 
351 |         result = blender.send_command("create_object", params)
352 |         return f"Created {type} object: {result['name']}"
353 |     except Exception as e:
354 |         logger.error(f"Error creating object: {str(e)}")
355 |         return f"Error creating object: {str(e)}"
356 | 
357 | 
358 | @mcp.tool()
359 | def modify_object(
360 |     ctx: Context,
361 |     name: str,
362 |     location: List[float] = None,
363 |     rotation: List[float] = None,
364 |     scale: List[float] = None,
365 |     visible: bool = None,
366 | ) -> str:
367 |     """
368 |     Modify an existing object in the Blender scene.
369 | 
370 |     Parameters:
371 |     - name: Name of the object to modify
372 |     - location: Optional [x, y, z] location coordinates
373 |     - rotation: Optional [x, y, z] rotation in radians
374 |     - scale: Optional [x, y, z] scale factors
375 |     - visible: Optional boolean to set visibility
376 |     """
377 |     try:
378 |         # Get the global connection
379 |         blender = get_blender_connection()
380 | 
381 |         params = {"name": name}
382 | 
383 |         if location is not None:
384 |             params["location"] = location
385 |         if rotation is not None:
386 |             params["rotation"] = rotation
387 |         if scale is not None:
388 |             params["scale"] = scale
389 |         if visible is not None:
390 |             params["visible"] = visible
391 | 
392 |         result = blender.send_command("modify_object", params)
393 |         return f"Modified object: {result['name']}"
394 |     except Exception as e:
395 |         logger.error(f"Error modifying object: {str(e)}")
396 |         return f"Error modifying object: {str(e)}"
397 | 
398 | 
399 | @mcp.tool()
400 | def delete_object(ctx: Context, name: str) -> str:
401 |     """
402 |     Delete an object from the Blender scene.
403 | 
404 |     Parameters:
405 |     - name: Name of the object to delete
406 |     """
407 |     try:
408 |         # Get the global connection
409 |         blender = get_blender_connection()
410 | 
411 |         result = blender.send_command("delete_object", {"name": name})
412 |         return f"Deleted object: {name}"
413 |     except Exception as e:
414 |         logger.error(f"Error deleting object: {str(e)}")
415 |         return f"Error deleting object: {str(e)}"
416 | 
417 | 
418 | @mcp.tool()
419 | def set_material(
420 |     ctx: Context, object_name: str, material_name: str = None, color: List[float] = None
421 | ) -> str:
422 |     """
423 |     Set or create a material for an object.
424 | 
425 |     Parameters:
426 |     - object_name: Name of the object to apply the material to
427 |     - material_name: Optional name of the material to use or create
428 |     - color: Optional [R, G, B] color values (0.0-1.0)
429 |     """
430 |     try:
431 |         # Get the global connection
432 |         blender = get_blender_connection()
433 | 
434 |         params = {"object_name": object_name}
435 | 
436 |         if material_name:
437 |             params["material_name"] = material_name
438 |         if color:
439 |             params["color"] = color
440 | 
441 |         result = blender.send_command("set_material", params)
442 |         return f"Applied material to {object_name}: {result.get('material_name', 'unknown')}"
443 |     except Exception as e:
444 |         logger.error(f"Error setting material: {str(e)}")
445 |         return f"Error setting material: {str(e)}"
446 | 
447 | 
448 | @mcp.tool()
449 | def execute_blender_code(ctx: Context, code: str) -> str:
450 |     """
451 |     Execute arbitrary Python code in Blender.
452 | 
453 |     Parameters:
454 |     - code: The Python code to execute
455 |     """
456 |     try:
457 |         # Get the global connection
458 |         blender = get_blender_connection()
459 | 
460 |         result = blender.send_command("execute_code", {"code": code})
461 |         return f"Code executed successfully: {result.get('result', '')}"
462 |     except Exception as e:
463 |         logger.error(f"Error executing code: {str(e)}")
464 |         return f"Error executing code: {str(e)}"
465 | 
466 | 
467 | @mcp.tool()
468 | def get_polyhaven_categories(ctx: Context, asset_type: str = "hdris") -> str:
469 |     """
470 |     Get a list of categories for a specific asset type on Polyhaven.
471 | 
472 |     Parameters:
473 |     - asset_type: The type of asset to get categories for (hdris, textures, models, all)
474 |     """
475 |     try:
476 |         blender = get_blender_connection()
477 |         if not _polyhaven_enabled:
478 |             return "PolyHaven integration is disabled. Select it in the sidebar in BlenderMCP, then run it again."
479 |         result = blender.send_command(
480 |             "get_polyhaven_categories", {"asset_type": asset_type}
481 |         )
482 | 
483 |         if "error" in result:
484 |             return f"Error: {result['error']}"
485 | 
486 |         # Format the categories in a more readable way
487 |         categories = result["categories"]
488 |         formatted_output = f"Categories for {asset_type}:\n\n"
489 | 
490 |         # Sort categories by count (descending)
491 |         sorted_categories = sorted(categories.items(), key=lambda x: x[1], reverse=True)
492 | 
493 |         for category, count in sorted_categories:
494 |             formatted_output += f"- {category}: {count} assets\n"
495 | 
496 |         return formatted_output
497 |     except Exception as e:
498 |         logger.error(f"Error getting Polyhaven categories: {str(e)}")
499 |         return f"Error getting Polyhaven categories: {str(e)}"
500 | 
501 | 
502 | @mcp.tool()
503 | def search_polyhaven_assets(
504 |     ctx: Context, asset_type: str = "all", categories: str = None
505 | ) -> str:
506 |     """
507 |     Search for assets on Polyhaven with optional filtering.
508 | 
509 |     Parameters:
510 |     - asset_type: Type of assets to search for (hdris, textures, models, all)
511 |     - categories: Optional comma-separated list of categories to filter by
512 | 
513 |     Returns a list of matching assets with basic information.
514 |     """
515 |     try:
516 |         blender = get_blender_connection()
517 |         result = blender.send_command(
518 |             "search_polyhaven_assets",
519 |             {"asset_type": asset_type, "categories": categories},
520 |         )
521 | 
522 |         if "error" in result:
523 |             return f"Error: {result['error']}"
524 | 
525 |         # Format the assets in a more readable way
526 |         assets = result["assets"]
527 |         total_count = result["total_count"]
528 |         returned_count = result["returned_count"]
529 | 
530 |         formatted_output = f"Found {total_count} assets"
531 |         if categories:
532 |             formatted_output += f" in categories: {categories}"
533 |         formatted_output += f"\nShowing {returned_count} assets:\n\n"
534 | 
535 |         # Sort assets by download count (popularity)
536 |         sorted_assets = sorted(
537 |             assets.items(), key=lambda x: x[1].get("download_count", 0), reverse=True
538 |         )
539 | 
540 |         for asset_id, asset_data in sorted_assets:
541 |             formatted_output += (
542 |                 f"- {asset_data.get('name', asset_id)} (ID: {asset_id})\n"
543 |             )
544 |             formatted_output += (
545 |                 f"  Type: {['HDRI', 'Texture', 'Model'][asset_data.get('type', 0)]}\n"
546 |             )
547 |             formatted_output += (
548 |                 f"  Categories: {', '.join(asset_data.get('categories', []))}\n"
549 |             )
550 |             formatted_output += (
551 |                 f"  Downloads: {asset_data.get('download_count', 'Unknown')}\n\n"
552 |             )
553 | 
554 |         return formatted_output
555 |     except Exception as e:
556 |         logger.error(f"Error searching Polyhaven assets: {str(e)}")
557 |         return f"Error searching Polyhaven assets: {str(e)}"
558 | 
559 | 
560 | @mcp.tool()
561 | def download_polyhaven_asset(
562 |     ctx: Context,
563 |     asset_id: str,
564 |     asset_type: str,
565 |     resolution: str = "1k",
566 |     file_format: str = None,
567 | ) -> str:
568 |     """
569 |     Download and import a Polyhaven asset into Blender.
570 | 
571 |     Parameters:
572 |     - asset_id: The ID of the asset to download
573 |     - asset_type: The type of asset (hdris, textures, models)
574 |     - resolution: The resolution to download (e.g., 1k, 2k, 4k)
575 |     - file_format: Optional file format (e.g., hdr, exr for HDRIs; jpg, png for textures; gltf, fbx for models)
576 | 
577 |     Returns a message indicating success or failure.
578 |     """
579 |     try:
580 |         blender = get_blender_connection()
581 |         result = blender.send_command(
582 |             "download_polyhaven_asset",
583 |             {
584 |                 "asset_id": asset_id,
585 |                 "asset_type": asset_type,
586 |                 "resolution": resolution,
587 |                 "file_format": file_format,
588 |             },
589 |         )
590 | 
591 |         if "error" in result:
592 |             return f"Error: {result['error']}"
593 | 
594 |         if result.get("success"):
595 |             message = result.get(
596 |                 "message", "Asset downloaded and imported successfully"
597 |             )
598 | 
599 |             # Add additional information based on asset type
600 |             if asset_type == "hdris":
601 |                 return f"{message}. The HDRI has been set as the world environment."
602 |             elif asset_type == "textures":
603 |                 material_name = result.get("material", "")
604 |                 maps = ", ".join(result.get("maps", []))
605 |                 return (
606 |                     f"{message}. Created material '{material_name}' with maps: {maps}."
607 |                 )
608 |             elif asset_type == "models":
609 |                 return f"{message}. The model has been imported into the current scene."
610 |             else:
611 |                 return message
612 |         else:
613 |             return f"Failed to download asset: {result.get('message', 'Unknown error')}"
614 |     except Exception as e:
615 |         logger.error(f"Error downloading Polyhaven asset: {str(e)}")
616 |         return f"Error downloading Polyhaven asset: {str(e)}"
617 | 
618 | 
619 | @mcp.tool()
620 | def set_texture(ctx: Context, object_name: str, texture_id: str) -> str:
621 |     """
622 |     Apply a previously downloaded Polyhaven texture to an object.
623 | 
624 |     Parameters:
625 |     - object_name: Name of the object to apply the texture to
626 |     - texture_id: ID of the Polyhaven texture to apply (must be downloaded first)
627 | 
628 |     Returns a message indicating success or failure.
629 |     """
630 |     try:
631 |         # Get the global connection
632 |         blender = get_blender_connection()
633 | 
634 |         result = blender.send_command(
635 |             "set_texture", {"object_name": object_name, "texture_id": texture_id}
636 |         )
637 | 
638 |         if "error" in result:
639 |             return f"Error: {result['error']}"
640 | 
641 |         if result.get("success"):
642 |             material_name = result.get("material", "")
643 |             maps = ", ".join(result.get("maps", []))
644 | 
645 |             # Add detailed material info
646 |             material_info = result.get("material_info", {})
647 |             node_count = material_info.get("node_count", 0)
648 |             has_nodes = material_info.get("has_nodes", False)
649 |             texture_nodes = material_info.get("texture_nodes", [])
650 | 
651 |             output = f"Successfully applied texture '{texture_id}' to {object_name}.\n"
652 |             output += f"Using material '{material_name}' with maps: {maps}.\n\n"
653 |             output += f"Material has nodes: {has_nodes}\n"
654 |             output += f"Total node count: {node_count}\n\n"
655 | 
656 |             if texture_nodes:
657 |                 output += "Texture nodes:\n"
658 |                 for node in texture_nodes:
659 |                     output += f"- {node['name']} using image: {node['image']}\n"
660 |                     if node["connections"]:
661 |                         output += "  Connections:\n"
662 |                         for conn in node["connections"]:
663 |                             output += f"    {conn}\n"
664 |             else:
665 |                 output += "No texture nodes found in the material.\n"
666 | 
667 |             return output
668 |         else:
669 |             return f"Failed to apply texture: {result.get('message', 'Unknown error')}"
670 |     except Exception as e:
671 |         logger.error(f"Error applying texture: {str(e)}")
672 |         return f"Error applying texture: {str(e)}"
673 | 
674 | 
675 | @mcp.tool()
676 | def get_polyhaven_status(ctx: Context) -> str:
677 |     """
678 |     Check if PolyHaven integration is enabled in Blender.
679 |     Returns a message indicating whether PolyHaven features are available.
680 |     """
681 |     try:
682 |         blender = get_blender_connection()
683 |         result = blender.send_command("get_polyhaven_status")
684 |         enabled = result.get("enabled", False)
685 |         message = result.get("message", "")
686 | 
687 |         return message
688 |     except Exception as e:
689 |         logger.error(f"Error checking PolyHaven status: {str(e)}")
690 |         return f"Error checking PolyHaven status: {str(e)}"
691 | 
692 | 
693 | @mcp.prompt()
694 | def asset_creation_strategy() -> str:
695 |     """Defines the preferred strategy for creating assets in Blender"""
696 |     return """When creating 3D content in Blender, always start by checking if PolyHaven is available:
697 | 
698 |     0. Before anything, always check the scene from get_scene_info()
699 |     1. First use get_polyhaven_status() to verify if PolyHaven integration is enabled.
700 | 
701 |     2. If PolyHaven is enabled:
702 |        - For objects/models: Use download_polyhaven_asset() with asset_type="models"
703 |        - For materials/textures: Use download_polyhaven_asset() with asset_type="textures"
704 |        - For environment lighting: Use download_polyhaven_asset() with asset_type="hdris"
705 | 
706 |     3. If PolyHaven is disabled or when falling back to basic tools:
707 |        - create_object() for basic primitives (CUBE, SPHERE, CYLINDER, etc.)
708 |        - set_material() for basic colors and materials
709 | 
710 |     Only fall back to basic creation tools when:
711 |     - PolyHaven is disabled
712 |     - A simple primitive is explicitly requested
713 |     - No suitable PolyHaven asset exists
714 |     - The task specifically requires a basic material/color
715 |     """
716 | 
717 | 
718 | @mcp.tool()
719 | async def create_3d_model_from_text(
720 |     describe_the_look_of_object: str, face_limit: int = -1
721 | ) -> Dict[str, Any]:
722 |     """
723 |     Create a 3D model from a text description using the Tripo API.
724 | 
725 |     IMPORTANT: This tool initiates a 3D model generation task but does NOT wait for completion.
726 |     After calling this tool, you MUST repeatedly call the get_task_status tool with the returned
727 |     task_id until the task status is SUCCESS or a terminal error state.
728 | 
729 |     Typical workflow:
730 |     1. Call create_3d_model_from_text to start the task
731 |     2. Get the task_id from the response
732 |     3. Call get_task_status with the task_id
733 |     4. If status is not SUCCESS, wait a moment and call get_task_status again
734 |     5. Repeat until status is SUCCESS or a terminal error state
735 |     6. When status is SUCCESS, use the pbr_model_url from the response
736 | 
737 |     Args:
738 |         describe_the_look_of_object: A detailed description of the object to generate.
739 |         face_limit: The maximum number of faces in the model.
740 |         auto_size: Whether to automatically size the model.
741 | 
742 |     Returns:
743 |         A dictionary containing the task ID and instructions for checking the status.
744 |     """
745 |     # Get the API key from environment variable
746 |     # api_key = os.environ.get("TRIPO_API_KEY")
747 |     get_blender_connection()
748 |     blender = get_blender_connection()
749 |     api_key = _tripo_apikey
750 |     if not api_key:
751 |         raise ValueError(
752 |             "It may take some more time to correctly re-enter the tripo function. You can first check the scene and then try the tripo function again later"
753 |         )
754 | 
755 |     # Create the Tripo client
756 |     async with TripoClient(api_key=api_key) as client:
757 |         # Create a text-to-model task
758 |         task_id = await client.text_to_model(
759 |             prompt=describe_the_look_of_object,
760 |             face_limit=face_limit,
761 |         )
762 | 
763 |         # Get initial task status
764 |         task = await client.get_task(task_id)
765 | 
766 |         # Return immediately with task ID and status
767 |         return {
768 |             "task_id": task_id,
769 |             "status": str(task.status),
770 |             "progress": task.progress,
771 |             "message": "Task created successfully. The 3D model generation is in progress.",
772 |             "next_step": "You MUST now call get_task_status with this task_id to check progress.",
773 |             "important_note": "3D model generation takes 3-5 minutes. You need to repeatedly call get_task_status until completion.",
774 |             "workflow": [
775 |                 "1. You've completed this step by calling create_3d_model_from_text",
776 |                 "2. Now call get_task_status with task_id: " + task_id,
777 |                 "3. If status is not SUCCESS, wait and call get_task_status again",
778 |                 "4. When status is SUCCESS, use the pbr_model_url from the response",
779 |             ],
780 |         }
781 | 
782 | 
783 | @mcp.tool()
784 | async def create_3d_model_from_image(
785 |     image: str, face_limit: int = -1
786 | ) -> Dict[str, Any]:
787 |     """
788 |     Create a 3D model from an image using the Tripo API.
789 | 
790 |     IMPORTANT: This tool initiates a 3D model generation task but does NOT wait for completion.
791 |     After calling this tool, you MUST repeatedly call the get_task_status tool with the returned
792 |     task_id until the task status is SUCCESS or a terminal error state.
793 | 
794 |     Typical workflow:
795 |     1. Call create_3d_model_from_image to start the task
796 |     2. Get the task_id from the response
797 |     3. Call get_task_status with the task_id
798 |     4. If status is not SUCCESS, wait a moment and call get_task_status again
799 |     5. Repeat until status is SUCCESS or a terminal error state
800 |     6. When status is SUCCESS, use the pbr_model_url from the response
801 | 
802 |     Args:
803 |         image: The local path or url to the image file.
804 |         face_limit: The maximum number of faces in the model.
805 |         auto_size: Whether to automatically size the model.
806 | 
807 |     Returns:
808 |         A dictionary containing the task ID and instructions for checking the status.
809 |     """
810 |     # Get the API key from environment variable
811 |     # api_key = os.environ.get("TRIPO_API_KEY")
812 |     get_blender_connection()
813 |     api_key = _tripo_apikey
814 |     if not api_key:
815 |         raise ValueError(
816 |             "It may take some more time to correctly re-enter the tripo function. You can first check the scene and then try the tripo function again later"
817 |         )
818 | 
819 |     # Create the Tripo client
820 |     async with TripoClient(api_key=api_key) as client:
821 |         # Create a text-to-model task
822 |         task_id = await client.image_to_model(
823 |             image=image,
824 |             face_limit=face_limit,
825 |         )
826 | 
827 |         # Get initial task status
828 |         task = await client.get_task(task_id)
829 | 
830 |         # Return immediately with task ID and status
831 |         return {
832 |             "task_id": task_id,
833 |             "status": str(task.status),
834 |             "progress": task.progress,
835 |             "message": "Task created successfully. The 3D model generation is in progress.",
836 |             "next_step": "You MUST now call get_task_status with this task_id to check progress.",
837 |             "important_note": "3D model generation takes 3-5 minutes. You need to repeatedly call get_task_status until completion.",
838 |             "workflow": [
839 |                 "1. You've completed this step by calling create_3d_model_from_image",
840 |                 "2. Now call get_task_status with task_id: " + task_id,
841 |                 "3. If status is not SUCCESS, wait and call get_task_status again",
842 |                 "4. When status is SUCCESS, use the pbr_model_url from the response",
843 |             ],
844 |         }
845 | 
846 | 
847 | @mcp.tool()
848 | def import_tripo_glb_model(ctx: Context, glb_url: str) -> str:
849 |     """
850 |     Import a GLB model from URL into Blender scene
851 | 
852 |     Parameters:
853 |     - glb_url: Download URL of the GLB model file
854 | 
855 |     Returns:
856 |     Result message of the import operation
857 |     """
858 |     try:
859 |         blender = get_blender_connection()
860 |         result = blender.send_command("import_tripo_glb_model", {"url": glb_url})
861 | 
862 |         if "error" in result:
863 |             return f"Import failed: {result['error']}"
864 | 
865 |         if result.get("status") == "success":
866 |             output = ["Successfully imported models:"]
867 |             for model in result.get("models", []):
868 |                 dim = model["dimensions"]
869 |                 output.append(
870 |                     f"• {model['name']} | Dimensions: "
871 |                     f"{dim['x']} x {dim['y']} x {dim['z']} meters"
872 |                 )
873 | 
874 |             if not output:
875 |                 output.append("No models found in imported file")
876 | 
877 |             return "\n".join(output)
878 |         else:
879 |             return f"Import failed: {result.get('message', 'Unknown error')}"
880 | 
881 |     except Exception as e:
882 |         logger.error(f"Error importing GLB model: {str(e)}")
883 |         return f"GLB model import failed: {str(e)}"
884 | 
885 | 
886 | @mcp.tool()
887 | async def get_task_status(task_id: str) -> Dict[str, Any]:
888 |     """
889 |     Get the status of a 3D model generation task.
890 | 
891 |     IMPORTANT: This tool checks the status of a task started by create_3d_model_from_text.
892 |     You may need to call this tool MULTIPLE TIMES until the task completes.
893 | 
894 |     Typical workflow:
895 |     1. Call this tool with the task_id from create_3d_model_from_text
896 |     2. Check the status in the response:
897 |        - If status is SUCCESS, the task is complete and you can use the pbr_model_url
898 |        - If status is FAILED, CANCELLED, BANNED, or EXPIRED, the task failed
899 |        - If status is anything else, the task is still in progress
900 |     3. If the task is still in progress, wait a moment and call this tool again
901 | 
902 |     Args:
903 |         task_id: The ID of the task to check (obtained from create_3d_model_from_text).
904 | 
905 |     Returns:
906 |         A dictionary containing the task status and other information.
907 |     """
908 |     # Get the API key from environment variable
909 |     # api_key = os.environ.get("TRIPO_API_KEY")
910 |     get_blender_connection()
911 |     api_key = _tripo_apikey
912 |     if not api_key:
913 |         raise ValueError(
914 |             "It may take some more time to correctly re-enter the tripo function. You can first check the scene and then try the tripo function again later"
915 |         )
916 | 
917 |     # Create the Tripo client
918 |     async with TripoClient(api_key=api_key) as client:
919 |         # Get task status
920 |         task = await client.get_task(task_id)
921 | 
922 |         # Ensure task is not None
923 |         if task is None:
924 |             raise ValueError(
925 |                 f"Failed to retrieve task information for task ID: {task_id}"
926 |             )
927 | 
928 |         # Create result dictionary
929 |         result = {
930 |             "task_id": task_id,
931 |             "status": str(task.status),
932 |             "progress": task.progress,
933 |         }
934 | 
935 |         # Add output fields if task is successful and output is available
936 |         if task.status == TaskStatus.SUCCESS and task.output:
937 |             result.update(
938 |                 {
939 |                     "base_model_url": task.output.base_model,
940 |                     "model_url": task.output.model,
941 |                     "pbr_model_url": task.output.pbr_model,
942 |                     "rendered_image_url": task.output.rendered_image,
943 |                     "message": "Task completed successfully! You can now use the pbr_model_url.",
944 |                     "next_step": "Use the pbr_model_url to access the 3D model, download it through import_tripo_glb_model tool",
945 |                 }
946 |             )
947 | 
948 |             if not task.output.pbr_model:
949 |                 result["warning"] = (
950 |                     "Model generated but PBR model URL is not available."
951 |                 )
952 |         elif task.status == TaskStatus.SUCCESS:
953 |             result["message"] = (
954 |                 "Task completed successfully but no output data is available."
955 |             )
956 |             result["next_step"] = (
957 |                 "Try creating a new model with a different description."
958 |             )
959 |         elif task.status in (
960 |             TaskStatus.FAILED,
961 |             TaskStatus.CANCELLED,
962 |             TaskStatus.BANNED,
963 |             TaskStatus.EXPIRED,
964 |         ):
965 |             result["message"] = f"Task failed with status: {task.status}"
966 |             result["next_step"] = (
967 |                 "Try creating a new model with a different description."
968 |             )
969 |         else:
970 |             result["message"] = (
971 |                 f"Task is still in progress. Current status: {task.status}, Progress: {task.progress}%"
972 |             )
973 |             result["next_step"] = (
974 |                 "IMPORTANT: You must call get_task_status again with this task_id to continue checking progress."
975 |             )
976 |             result["wait_message"] = (
977 |                 "3D model generation typically takes 3-5 minutes. Please be patient and keep checking."
978 |             )
979 | 
980 |         return result
981 | 
982 | 
983 | def main():
984 |     # mcp.run("sse")
985 |     mcp.run(transport="stdio")
986 | 
987 | 
988 | if __name__ == "__main__":
989 |     main()
990 | 
```