# Directory Structure
```
├── .gitignore
├── .python-version
├── addon.py
├── LICENSE
├── main.py
├── pyproject.toml
├── README.md
├── src
│ └── blender_open_mcp
│ ├── __init__.py
│ └── server.py
├── tests
│ ├── test_addon.py
│ └── test_server.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
3.12.11
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# blender-open-mcp
`blender-open-mcp` is an open source project that integrates Blender with local AI models (via [Ollama](https://ollama.com/)) using the Model Context Protocol (MCP). This allows you to control Blender using natural language prompts, leveraging the power of AI to assist with 3D modeling tasks.
## Features
- **Control Blender with Natural Language:** Send prompts to a locally running Ollama model to perform actions in Blender.
- **MCP Integration:** Uses the Model Context Protocol for structured communication between the AI model and Blender.
- **Ollama Support:** Designed to work with Ollama for easy local model management.
- **Blender Add-on:** Includes a Blender add-on to provide a user interface and handle communication with the server.
- **PolyHaven Integration (Optional):** Download and use assets (HDRIs, textures, models) from [PolyHaven](https://polyhaven.com/) directly within Blender via AI prompts.
- **Basic 3D Operations:**
- Get Scene and Object Info
- Create Primitives
- Modify and delete objects
- Apply materials
- **Render Support:** Render images using the tool and retrieve information based on the output.
## Installation
### Prerequisites
1. **Blender:** Blender 3.0 or later. Download from [blender.org](https://www.blender.org/download/).
2. **Ollama:** Install from [ollama.com](https://ollama.com/), following OS-specific instructions.
3. **Python:** Python 3.10 or later.
4. **uv:** Install using `pip install uv`.
5. **Git:** Required for cloning the repository.
### Installation Steps
1. **Clone the Repository:**
```bash
git clone https://github.com/dhakalnirajan/blender-open-mcp.git
cd blender-open-mcp
```
2. **Create and Activate a Virtual Environment (Recommended):**
```bash
uv venv
source .venv/bin/activate # On Linux/macOS
.venv\Scripts\activate # On Windows
```
3. **Install Dependencies:**
```bash
uv pip install -e .
```
4. **Install the Blender Add-on:**
- Open Blender.
- Go to `Edit -> Preferences -> Add-ons`.
- Click `Install...`.
- Select the `addon.py` file from the `blender-open-mcp` directory.
- Enable the "Blender MCP" add-on.
5. **Download an Ollama Model (if not already installed):**
```bash
ollama run llama3.2
```
*(Other models like **`Gemma3`** can also be used.)*
## Setup
1. **Start the Ollama Server:** Ensure Ollama is running in the background.
2. **Start the MCP Server:**
```bash
blender-mcp
```
Or,
```bash
python src/blender_open_mcp/server.py
```
By default, it listens on `http://0.0.0.0:8000`, but you can modify settings:
```bash
blender-mcp --host 127.0.0.1 --port 8001 --ollama-url http://localhost:11434 --ollama-model llama3.2
```
3. **Start the Blender Add-on Server:**
- Open Blender and the 3D Viewport.
- Press `N` to open the sidebar.
- Find the "Blender MCP" panel.
- Click "Start MCP Server".
## Usage
Interact with `blender-open-mcp` using the `mcp` command-line tool:
### Example Commands
- **Basic Prompt:**
```bash
mcp prompt "Hello BlenderMCP!" --host http://localhost:8000
```
- **Get Scene Information:**
```bash
mcp tool get_scene_info --host http://localhost:8000
```
- **Create a Cube:**
```bash
mcp prompt "Create a cube named 'my_cube'." --host http://localhost:8000
```
- **Render an Image:**
```bash
mcp prompt "Render the image." --host http://localhost:8000
```
- **Using PolyHaven (if enabled):**
```bash
mcp prompt "Download a texture from PolyHaven." --host http://localhost:8000
```
## Available Tools
| Tool Name | Description | Parameters |
| -------------------------- | -------------------------------------- | ----------------------------------------------------- |
| `get_scene_info` | Retrieves scene details. | None |
| `get_object_info` | Retrieves information about an object. | `object_name` (str) |
| `create_object` | Creates a 3D object. | `type`, `name`, `location`, `rotation`, `scale` |
| `modify_object` | Modifies an object’s properties. | `name`, `location`, `rotation`, `scale`, `visible` |
| `delete_object` | Deletes an object. | `name` (str) |
| `set_material` | Assigns a material to an object. | `object_name`, `material_name`, `color` |
| `render_image` | Renders an image. | `file_path` (str) |
| `execute_blender_code` | Executes Python code in Blender. | `code` (str) |
| `get_polyhaven_categories` | Lists PolyHaven asset categories. | `asset_type` (str) |
| `search_polyhaven_assets` | Searches PolyHaven assets. | `asset_type`, `categories` |
| `download_polyhaven_asset` | Downloads a PolyHaven asset. | `asset_id`, `asset_type`, `resolution`, `file_format` |
| `set_texture` | Applies a downloaded texture. | `object_name`, `texture_id` |
| `set_ollama_model` | Sets the Ollama model. | `model_name` (str) |
| `set_ollama_url` | Sets the Ollama server URL. | `url` (str) |
| `get_ollama_models` | Lists available Ollama models. | None |
## Troubleshooting
If you encounter issues:
- Ensure Ollama and the `blender-open-mcp` server are running.
- Check Blender’s add-on settings.
- Verify command-line arguments.
- Refer to logs for error details.
For further assistance, visit the [GitHub Issues](https://github.com/dhakalnirajan/blender-open-mcp/issues) page.
---
Happy Blending with AI! 🚀
```
--------------------------------------------------------------------------------
/main.py:
--------------------------------------------------------------------------------
```python
from blender_open_mcp.server import main as server_main
if __name__ == "__main__":
main()
```
--------------------------------------------------------------------------------
/src/blender_open_mcp/__init__.py:
--------------------------------------------------------------------------------
```python
"""Blender integration through the Model Context Protocol."""
__version__ = "0.2.0" # Updated version
# Expose key classes and functions for easier imports
from .server import BlenderConnection, get_blender_connection
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[project]
name = "blender-open-mcp"
version = "0.2.0"
description = "Blender integration with local AI models via MCP and Ollama"
readme = "README.md"
requires-python = ">=3.10"
authors = [
{name = "Nirajan Dhakal"}
]
license = {text = "MIT"}
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
dependencies = [
"mcp[cli]>=1.3.0",
"httpx>=0.24.0",
"ollama>=0.4.7",
"requests",
]
[project.scripts]
blender-open-mcp = "blender_open_mcp.server:main"
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
[tool.setuptools]
package-dir = {"" = "src"}
[project.urls]
"Homepage" = "https://github.com/dhakalnirajan/blender-open-mcp"
"Bug Tracker" = "https://github.com/dhakalnirajan/blender-open-mcp/issues"
```
--------------------------------------------------------------------------------
/tests/test_addon.py:
--------------------------------------------------------------------------------
```python
import sys
import os
import unittest
from unittest.mock import patch, MagicMock
import tempfile
# Mock bpy and its submodules before importing the addon module
bpy_mock = MagicMock()
bpy_mock.props = MagicMock()
sys.modules['bpy'] = bpy_mock
sys.modules['bpy.props'] = bpy_mock.props
# Add the root directory to the path to allow imports
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
# Now we can import the addon
import addon
class TestAddonBugs(unittest.TestCase):
@patch('addon.requests.get')
def test_hdri_temp_file_deleted(self, mock_requests_get):
"""
Test that the temporary file for a downloaded HDRI is deleted.
"""
# 1. Setup mocks
# Mock requests.get to return a successful response
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.content = b"fake hdri data"
mock_requests_get.return_value = mock_response
# Mock the JSON response for the asset files
mock_files_response = MagicMock()
mock_files_response.status_code = 200
mock_files_response.json.return_value = {
"hdri": {
"1k": {
"hdr": {
"url": "http://fake.url/hdri.hdr"
}
}
}
}
# The first call to requests.get is for the files, the second is for the download
mock_requests_get.side_effect = [mock_files_response, mock_response]
# A very basic mock for bpy
addon.bpy.data.images.load.return_value = MagicMock()
addon.bpy.path.abspath.side_effect = lambda x: x # Return the path as is
# 2. Instantiate the server from the addon
server = addon.BlenderMCPServer()
# 3. Call the function
result = server.download_polyhaven_asset(
asset_id="test_hdri",
asset_type="hdris",
resolution="1k",
file_format="hdr"
)
# 4. Assertions
self.assertTrue(result.get("success"))
# Get the path of the temporary file that was created
# The path is passed to bpy.data.images.load
self.assertTrue(addon.bpy.data.images.load.called)
temp_file_path = addon.bpy.data.images.load.call_args[0][0]
# Assert that the temporary file no longer exists
self.assertFalse(os.path.exists(temp_file_path), f"Temporary file was not deleted: {temp_file_path}")
if __name__ == '__main__':
unittest.main()
```
--------------------------------------------------------------------------------
/tests/test_server.py:
--------------------------------------------------------------------------------
```python
import sys
import os
# Add src to the path to allow imports
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../src')))
import unittest
from unittest.mock import patch, MagicMock, AsyncMock
import asyncio
import tempfile
import base64
from mcp.server.fastmcp import Context, Image
# Now import the server module
from blender_open_mcp import server as server_module
class TestServerTools(unittest.TestCase):
def test_set_ollama_url(self):
"""Test the set_ollama_url function."""
ctx = Context()
new_url = "http://localhost:12345"
# Run the async function
result = asyncio.run(server_module.set_ollama_url(ctx, new_url))
self.assertEqual(result, f"Ollama URL set to: {new_url}")
self.assertEqual(server_module._ollama_url, new_url)
def test_render_image_bug(self):
"""
Test the render_image tool to demonstrate the bug.
This test will fail before the fix and pass after.
"""
# Create a mock context object with an add_image method
ctx = MagicMock()
ctx.add_image = MagicMock()
# also mock get_image to return the added image
def get_image():
if ctx.add_image.call_args:
return ctx.add_image.call_args[0][0]
return None
ctx.get_image = get_image
# 1. Create a dummy image file to represent the rendered output
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp_file:
tmp_file.write(b"fake_image_data")
correct_image_path = tmp_file.name
# 2. Mock the Blender connection
mock_blender_conn = MagicMock()
# 3. Configure the mock send_command to return the correct path
# This simulates the behavior of the addon
mock_blender_conn.send_command.return_value = {
"rendered": True,
"output_path": correct_image_path,
"resolution": [1920, 1080]
}
# 4. Patch get_blender_connection to return our mock
with patch('blender_open_mcp.server.get_blender_connection', return_value=mock_blender_conn):
# 5. Call the render_image tool
# The bug is that it uses "render.png" instead of `correct_image_path`
result = asyncio.run(server_module.render_image(ctx, file_path="render.png"))
# 6. Assertions
self.assertEqual(result, "Image Rendered Successfully.")
# Check if the context now has an image
ctx.add_image.assert_called_once()
img = ctx.add_image.call_args[0][0]
self.assertIsInstance(img, Image)
# Verify the image data is correct
with open(correct_image_path, "rb") as f:
expected_data = base64.b64encode(f.read()).decode('utf-8')
self.assertEqual(img.data, expected_data)
# 7. Clean up the dummy file
os.remove(correct_image_path)
if __name__ == '__main__':
unittest.main()
```
--------------------------------------------------------------------------------
/src/blender_open_mcp/server.py:
--------------------------------------------------------------------------------
```python
# server.py
from mcp.server.fastmcp import FastMCP, Context, Image
import socket
import json
import asyncio
import logging
from dataclasses import dataclass, field
from contextlib import asynccontextmanager
from typing import AsyncIterator, Dict, Any, List, Optional
import httpx
from io import BytesIO
import base64
import argparse
import os
from urllib.parse import urlparse
# Configure logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("BlenderMCPServer")
@dataclass
class BlenderConnection:
host: str
port: int
sock: Optional[socket.socket] = None
timeout: float = 15.0 # Added timeout as a property
def __post_init__(self):
if not isinstance(self.host, str):
raise ValueError("Host must be a string")
if not isinstance(self.port, int):
raise ValueError("Port must be an int")
def connect(self) -> bool:
if self.sock:
return True
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.host, self.port))
logger.info(f"Connected to Blender at {self.host}:{self.port}")
self.sock.settimeout(self.timeout) # Set timeout on socket
return True
except Exception as e:
logger.error(f"Failed to connect to Blender: {e!s}")
self.sock = None
return False
def disconnect(self) -> None:
if self.sock:
try:
self.sock.close()
except Exception as e:
logger.error(f"Error disconnecting: {e!s}")
finally:
self.sock = None
def _receive_full_response(self, buffer_size: int = 8192) -> bytes:
"""Receive data with timeout using a loop."""
chunks: List[bytes] = []
timed_out = False
try:
while True:
try:
chunk = self.sock.recv(buffer_size)
if not chunk:
if not chunks:
# Requirement 1b
raise Exception("Connection closed by Blender before any data was sent in this response")
else:
# Requirement 1a
raise Exception("Connection closed by Blender mid-stream with incomplete JSON data")
chunks.append(chunk)
try:
data = b''.join(chunks)
json.loads(data.decode('utf-8')) # Check if it is valid json
logger.debug(f"Received response ({len(data)} bytes)")
return data # Complete JSON received
except json.JSONDecodeError:
# Incomplete JSON, continue receiving
continue
except socket.timeout:
logger.warning("Socket timeout during receive")
timed_out = True # Set flag
break # Stop listening to socket
except (ConnectionError, BrokenPipeError, ConnectionResetError) as e:
logger.error(f"Socket connection error: {e!s}")
self.sock = None
raise # re-raise to outer error handler
# This part is reached if loop is broken by 'break' (only timeout case now)
if timed_out:
if chunks:
data = b''.join(chunks)
# Check if the partial data is valid JSON (it shouldn't be if timeout happened mid-stream)
try:
json.loads(data.decode('utf-8'))
# This case should ideally not be hit if JSON was incomplete,
# but if it's somehow valid, return it.
logger.warning("Timeout occurred, but received data forms valid JSON.")
return data
except json.JSONDecodeError:
# Requirement 2a
raise Exception(f"Incomplete JSON data received before timeout. Received: {data[:200]}")
else:
# Requirement 2b
raise Exception("Timeout waiting for response, no data received.")
# Fallback if loop exited for a reason not covered by explicit raises inside or by timeout logic
# This should ideally not be reached with the current logic.
if chunks: # Should have been handled by "Connection closed by Blender mid-stream..."
data = b''.join(chunks)
logger.warning(f"Exited receive loop unexpectedly with data: {data[:200]}")
raise Exception("Receive loop ended unexpectedly with partial data.")
else: # Should have been handled by "Connection closed by Blender before any data..." or timeout
logger.warning("Exited receive loop unexpectedly with no data.")
raise Exception("Receive loop ended unexpectedly with no data.")
except (ConnectionError, BrokenPipeError, ConnectionResetError) as e:
# This handles connection errors raised from within the loop or if self.sock.recv fails
logger.error(f"Connection error during receive: {e!s}")
self.sock = None # Ensure socket is reset
# Re-raise with a more specific message if needed, or just re-raise
raise Exception(f"Connection to Blender lost during receive: {e!s}")
except Exception as e:
# Catch other exceptions, including our custom ones, and log them
logger.error(f"Error during _receive_full_response: {e!s}")
# If it's not one of the specific connection errors, it might be one of our custom messages
# or another unexpected issue. Re-raise to be handled by send_command.
raise
def send_command(self, command_type: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
if not self.sock and not self.connect():
raise ConnectionError("Not connected")
command = {"type": command_type, "params": params or {}}
try:
logger.info(f"Sending command: {command_type} with params: {params}")
self.sock.sendall(json.dumps(command).encode('utf-8'))
logger.info(f"Command sent, waiting for response...")
response_data = self._receive_full_response()
logger.debug(f"Received response ({len(response_data)} bytes)")
response = json.loads(response_data.decode('utf-8'))
logger.info(f"Response status: {response.get('status', 'unknown')}")
if response.get("status") == "error":
logger.error(f"Blender error: {response.get('message')}")
raise Exception(response.get("message", "Unknown Blender error"))
return response.get("result", {})
except socket.timeout:
logger.error("Socket timeout from Blender")
self.sock = None # reset socket connection
raise Exception("Timeout waiting for Blender - simplify request")
except (ConnectionError, BrokenPipeError, ConnectionResetError) as e:
logger.error(f"Socket connection error: {e!s}")
self.sock = None # reset socket connection
raise Exception(f"Connection to Blender lost: {e!s}")
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON response: {e!s}")
if 'response_data' in locals() and response_data:
logger.error(f"Raw (first 200): {response_data[:200]}")
raise Exception(f"Invalid response from Blender: {e!s}")
except Exception as e:
logger.error(f"Error communicating with Blender: {e!s}")
self.sock = None # reset socket connection
raise Exception(f"Communication error: {e!s}")
@asynccontextmanager
async def server_lifespan(server: FastMCP) -> AsyncIterator[Dict[str, Any]]:
logger.info("BlenderMCP server starting up")
try:
blender = get_blender_connection()
logger.info("Connected to Blender on startup")
except Exception as e:
logger.warning(f"Could not connect to Blender on startup: {e!s}")
logger.warning("Ensure Blender addon is running before using resources")
yield {}
global _blender_connection
if _blender_connection:
logger.info("Disconnecting from Blender on shutdown")
_blender_connection.disconnect()
_blender_connection = None
logger.info("BlenderMCP server shut down")
# Initialize MCP server instance globally
mcp = FastMCP(
"BlenderOpenMCP",
lifespan=server_lifespan
)
_blender_connection = None
_polyhaven_enabled = False
# Default values (will be overridden by command-line arguments)
_ollama_model = ""
_ollama_url = "http://localhost:11434"
def get_blender_connection() -> BlenderConnection:
global _blender_connection, _polyhaven_enabled
if _blender_connection:
try:
result = _blender_connection.send_command("get_polyhaven_status")
_polyhaven_enabled = result.get("enabled", False)
return _blender_connection
except Exception as e:
logger.warning(f"Existing connection invalid: {e!s}")
try:
_blender_connection.disconnect()
except:
pass
_blender_connection = None
if _blender_connection is None:
_blender_connection = BlenderConnection(host="localhost", port=9876)
if not _blender_connection.connect():
logger.error("Failed to connect to Blender")
_blender_connection = None
raise Exception("Could not connect to Blender. Addon running?")
logger.info("Created new persistent connection to Blender")
return _blender_connection
async def query_ollama(prompt: str, context: Optional[List[Dict]] = None, image: Optional[Image] = None) -> str:
global _ollama_model, _ollama_url
payload = {"prompt": prompt, "model": _ollama_model, "format": "json", "stream": False}
if context:
payload["context"] = context
if image:
if image.data:
payload["images"] = [image.data]
elif image.path:
try:
with open(image.path, "rb") as image_file:
encoded_string = base64.b64encode(image_file.read()).decode('utf-8')
payload["images"] = [encoded_string]
except FileNotFoundError:
logger.error(f"Image file not found: {image.path}")
return "Error: Image file not found."
else:
logger.warning("Image without data or path. Ignoring.")
try:
async with httpx.AsyncClient() as client:
response = await client.post(f"{_ollama_url}/api/generate", json=payload, timeout=60.0)
response.raise_for_status() # Raise HTTPStatusError for bad status
response_data = response.json()
logger.debug(f"Raw Ollama response: {response_data}")
if "response" in response_data:
return response_data["response"]
else:
logger.error(f"Unexpected response format: {response_data}")
return "Error: Unexpected response format from Ollama."
except httpx.HTTPStatusError as e:
logger.error(f"Ollama API error: {e.response.status_code} - {e.response.text}")
return f"Error: Ollama API returned: {e.response.status_code}"
except httpx.RequestError as e:
logger.error(f"Ollama API request failed: {e}")
return "Error: Failed to connect to Ollama API."
except Exception as e:
logger.error(f"An unexpected error occurred: {e!s}")
return f"Error: An unexpected error occurred: {e!s}"
@mcp.prompt()
async def base_prompt(context: Context, user_message: str) -> str:
system_message = f"""You are a helpful assistant that controls Blender.
You can use the following tools. Respond in well-formatted, valid JSON:
{mcp.tools_schema()}"""
full_prompt = f"{system_message}\n\n{user_message}"
response = await query_ollama(full_prompt, context.history(), context.get_image())
return response
@mcp.tool()
def get_scene_info(ctx: Context) -> str:
try:
blender = get_blender_connection()
result = blender.send_command("get_scene_info")
return json.dumps(result, indent=2) # Return as a formatted string
except Exception as e:
return f"Error: {e!s}"
@mcp.tool()
def get_object_info(ctx: Context, object_name: str) -> str:
try:
blender = get_blender_connection()
result = blender.send_command("get_object_info", {"name": object_name})
return json.dumps(result, indent=2) # Return as a formatted string
except Exception as e:
return f"Error: {e!s}"
@mcp.tool()
def create_object(
ctx: Context,
type: str = "CUBE",
name: Optional[str] = None,
location: Optional[List[float]] = None,
rotation: Optional[List[float]] = None,
scale: Optional[List[float]] = None
) -> str:
try:
blender = get_blender_connection()
loc, rot, sc = location or [0, 0, 0], rotation or [0, 0, 0], scale or [1, 1, 1]
params = {"type": type, "location": loc, "rotation": rot, "scale": sc}
if name: params["name"] = name
result = blender.send_command("create_object", params)
return f"Created {type} object: {result['name']}"
except Exception as e:
return f"Error: {e!s}"
@mcp.tool()
def modify_object(
ctx: Context,
name: str,
location: Optional[List[float]] = None,
rotation: Optional[List[float]] = None,
scale: Optional[List[float]] = None,
visible: Optional[bool] = None
) -> str:
try:
blender = get_blender_connection()
params = {"name": name}
if location is not None: params["location"] = location
if rotation is not None: params["rotation"] = rotation
if scale is not None: params["scale"] = scale
if visible is not None: params["visible"] = visible
result = blender.send_command("modify_object", params)
return f"Modified object: {result['name']}"
except Exception as e:
return f"Error: {e!s}"
@mcp.tool()
def delete_object(ctx: Context, name: str) -> str:
try:
blender = get_blender_connection()
blender.send_command("delete_object", {"name": name})
return f"Deleted object: {name}"
except Exception as e:
return f"Error: {e!s}"
@mcp.tool()
def set_material(
ctx: Context,
object_name: str,
material_name: Optional[str] = None,
color: Optional[List[float]] = None
) -> str:
try:
blender = get_blender_connection()
params = {"object_name": object_name}
if material_name: params["material_name"] = material_name
if color: params["color"] = color
result = blender.send_command("set_material", params)
return f"Applied material to {object_name}: {result.get('material_name', 'unknown')}"
except Exception as e:
return f"Error: {e!s}"
@mcp.tool()
def execute_blender_code(ctx: Context, code: str) -> str:
try:
blender = get_blender_connection()
result = blender.send_command("execute_code", {"code": code})
return f"Code executed: {result.get('result', '')}"
except Exception as e:
return f"Error: {e!s}"
@mcp.tool()
def get_polyhaven_categories(ctx: Context, asset_type: str = "hdris") -> str:
try:
blender = get_blender_connection()
if not _polyhaven_enabled: return "PolyHaven disabled."
result = blender.send_command("get_polyhaven_categories", {"asset_type": asset_type})
if "error" in result: return f"Error: {result['error']}"
categories = result["categories"]
formatted = f"Categories for {asset_type}:\n" + \
"\n".join(f"- {cat}: {count}" for cat, count in
sorted(categories.items(), key=lambda x: x[1], reverse=True))
return formatted
except Exception as e:
return f"Error: {e!s}"
@mcp.tool()
def search_polyhaven_assets(ctx: Context, asset_type: str = "all", categories: Optional[str] = None) -> str:
try:
blender = get_blender_connection()
result = blender.send_command("search_polyhaven_assets",
{"asset_type": asset_type, "categories": categories})
if "error" in result: return f"Error: {result['error']}"
assets, total, returned = result["assets"], result["total_count"], result["returned_count"]
formatted = f"Found {total} assets" + (f" in: {categories}" if categories else "") + \
f"\nShowing {returned}:\n" + "".join(
f"- {data.get('name', asset_id)} (ID: {asset_id})\n"
f" Type: {['HDRI', 'Texture', 'Model'][data.get('type', 0)]}\n"
f" Categories: {', '.join(data.get('categories', []))}\n"
f" Downloads: {data.get('download_count', 'Unknown')}\n"
for asset_id, data in sorted(assets.items(),
key=lambda x: x[1].get("download_count", 0),
reverse=True))
return formatted
except Exception as e:
return f"Error: {e!s}"
@mcp.tool()
def download_polyhaven_asset(ctx: Context, asset_id: str, asset_type: str,
resolution: str = "1k", file_format: Optional[str] = None) -> str:
try:
blender = get_blender_connection()
result = blender.send_command("download_polyhaven_asset", {
"asset_id": asset_id, "asset_type": asset_type,
"resolution": resolution, "file_format": file_format})
if "error" in result: return f"Error: {result['error']}"
if result.get("success"):
message = result.get("message", "Success")
if asset_type == "hdris": return f"{message}. HDRI set as world."
elif asset_type == "textures":
mat_name, maps = result.get("material", ""), ", ".join(result.get("maps", []))
return f"{message}. Material '{mat_name}' with: {maps}."
elif asset_type == "models": return f"{message}. Model imported."
return message
return f"Failed: {result.get('message', 'Unknown')}"
except Exception as e:
return f"Error: {e!s}"
@mcp.tool()
def set_texture(ctx: Context, object_name: str, texture_id: str) -> str:
try:
blender = get_blender_connection()
result = blender.send_command("set_texture",
{"object_name": object_name, "texture_id": texture_id})
if "error" in result: return f"Error: {result['error']}"
if result.get("success"):
mat_name, maps = result.get("material", ""), ", ".join(result.get("maps", []))
info, nodes = result.get("material_info", {}), result.get("material_info", {}).get("texture_nodes", [])
output = (f"Applied '{texture_id}' to {object_name}.\nMaterial '{mat_name}': {maps}.\n"
f"Nodes: {info.get('has_nodes', False)}\nCount: {info.get('node_count', 0)}\n")
if nodes:
output += "Texture nodes:\n" + "".join(
f"- {node['name']} ({node['image']})\n" +
(" Connections:\n" + "".join(f" {conn}\n" for conn in node['connections'])
if node['connections'] else "")
for node in nodes)
return output
return f"Failed: {result.get('message', 'Unknown')}"
except Exception as e:
return f"Error: {e!s}"
@mcp.tool()
def get_polyhaven_status(ctx: Context) -> str:
try:
blender = get_blender_connection()
result = blender.send_command("get_polyhaven_status")
return result.get("message", "") # Return the message directly
except Exception as e:
return f"Error: {e!s}"
@mcp.tool()
async def set_ollama_model(ctx: Context, model_name: str) -> str:
global _ollama_model
try:
async with httpx.AsyncClient() as client:
response = await client.post(f"{_ollama_url}/api/show",
json={"name": model_name}, timeout=10.0)
if response.status_code == 200:
_ollama_model = model_name
return f"Ollama model set to: {_ollama_model}"
else: return f"Error: Could not find model '{model_name}'."
except Exception as e:
return f"Error: Failed to communicate: {e!s}"
@mcp.tool()
async def set_ollama_url(ctx: Context, url: str) -> str:
global _ollama_url
if not (url.startswith("http://") or url.startswith("https://")):
return "Error: Invalid URL format. Must start with http:// or https://."
_ollama_url = url
return f"Ollama URL set to: {_ollama_url}"
@mcp.tool()
async def get_ollama_models(ctx: Context) -> str:
try:
async with httpx.AsyncClient() as client:
response = await client.get(f"{_ollama_url}/api/tags", timeout=10.0)
response.raise_for_status()
models_data = response.json()
if "models" in models_data:
model_list = [model["name"] for model in models_data["models"]]
return "Available Ollama models:\n" + "\n".join(model_list)
else: return "Error: Unexpected response from Ollama /api/tags."
except httpx.HTTPStatusError as e:
return f"Error: Ollama API error: {e.response.status_code}"
except httpx.RequestError as e:
return "Error: Failed to connect to Ollama API."
except Exception as e:
return f"Error: An unexpected error: {e!s}"
@mcp.tool()
async def render_image(ctx: Context, file_path: str = "render.png") -> str:
try:
blender = get_blender_connection()
result = blender.send_command("render_scene", {"output_path": file_path})
if result and result.get("rendered"):
# Use the actual output path returned from Blender
actual_file_path = result.get("output_path")
if not actual_file_path:
return "Error: Blender rendered but did not return an output path."
try:
with open(actual_file_path, "rb") as image_file:
encoded_string = base64.b64encode(image_file.read()).decode('utf-8')
ctx.add_image(Image(data=encoded_string)) # Add image to the context
return "Image Rendered Successfully."
except FileNotFoundError:
return f"Error: Blender rendered to '{actual_file_path}', but the file was not found by the server."
except Exception as exception:
return f"Blender rendered, but the image could not be read: {exception!s}"
else:
return f"Error: Rendering failed with result: {result}"
except Exception as e:
return f"Error: {e!s}"
def main():
"""Run the MCP server."""
parser = argparse.ArgumentParser(description="BlenderMCP Server")
# Set global variables from command-line arguments
global _ollama_url, _ollama_model
parser.add_argument("--ollama-url", type=str, default=_ollama_url,
help="URL of the Ollama server")
parser.add_argument("--ollama-model", type=str, default=_ollama_model,
help="Default Ollama model to use")
parser.add_argument("--port", type=int, default=8000,
help="Port for the MCP server to listen on")
parser.add_argument("--host", type=str, default="0.0.0.0",
help="Host for the MCP server to listen on")
args = parser.parse_args()
_ollama_url = args.ollama_url
_ollama_model = args.ollama_model
# MCP instance is already created globally
mcp.run(host=args.host, port=args.port)
if __name__ == "__main__":
main()
```
--------------------------------------------------------------------------------
/addon.py:
--------------------------------------------------------------------------------
```python
import bpy
import json
import threading
import socket
import time
import requests
import tempfile
from bpy.props import StringProperty, IntProperty
import traceback
import os
import shutil
bl_info = {
"name": "Blender MCP",
"author": "BlenderMCP",
"version": (0, 2), # Updated version
"blender": (3, 0, 0),
"location": "View3D > Sidebar > BlenderMCP",
"description": "Connect Blender to local AI models via MCP", # Updated description
"category": "Interface",
}
class BlenderMCPServer:
def __init__(self, host='localhost', port=9876):
self.host = host
self.port = port
self.running = False
self.socket = None
self.client = None
self.command_queue = []
self.buffer = b''
def start(self):
self.running = True
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
self.socket.bind((self.host, self.port))
self.socket.listen(1)
self.socket.setblocking(False)
bpy.app.timers.register(self._process_server, persistent=True)
print(f"BlenderMCP server started on {self.host}:{self.port}")
except Exception as e:
print(f"Failed to start server: {str(e)}")
self.stop()
def stop(self):
self.running = False
if hasattr(bpy.app.timers, "unregister"):
if bpy.app.timers.is_registered(self._process_server):
bpy.app.timers.unregister(self._process_server)
if self.socket:
self.socket.close()
if self.client:
self.client.close()
self.socket = None
self.client = None
print("BlenderMCP server stopped")
def _process_server(self):
if not self.running:
return None
try:
if not self.client and self.socket:
try:
self.client, address = self.socket.accept()
self.client.setblocking(False)
print(f"Connected to client: {address}")
except BlockingIOError:
pass
except Exception as e:
print(f"Error accepting connection: {str(e)}")
if self.client:
try:
try:
data = self.client.recv(8192)
if data:
self.buffer += data
try:
command = json.loads(self.buffer.decode('utf-8'))
self.buffer = b''
response = self.execute_command(command)
response_json = json.dumps(response)
self.client.sendall(response_json.encode('utf-8'))
except json.JSONDecodeError:
pass
else:
print("Client disconnected")
self.client.close()
self.client = None
self.buffer = b''
except BlockingIOError:
pass
except Exception as e:
print(f"Error receiving data: {str(e)}")
self.client.close()
self.client = None
self.buffer = b''
except Exception as e:
print(f"Error with client: {str(e)}")
if self.client:
self.client.close()
self.client = None
self.buffer = b''
except Exception as e:
print(f"Server error: {str(e)}")
return 0.1
def execute_command(self, command):
try:
cmd_type = command.get("type")
params = command.get("params", {})
if cmd_type in ["create_object", "modify_object", "delete_object"]:
if not bpy.context.screen or not bpy.context.screen.areas:
return {"status": "error", "message": "Suitable 'VIEW_3D' context not found for command execution."}
view_3d_areas = [area for area in bpy.context.screen.areas if area.type == 'VIEW_3D']
if not view_3d_areas:
return {"status": "error", "message": "Suitable 'VIEW_3D' context not found for command execution."}
override = bpy.context.copy()
override['area'] = view_3d_areas[0]
with bpy.context.temp_override(**override):
return self._execute_command_internal(command)
else:
return self._execute_command_internal(command)
except Exception as e:
print(f"Error executing command: {str(e)}")
traceback.print_exc()
return {"status": "error", "message": str(e)}
def _execute_command_internal(self, command):
cmd_type = command.get("type")
params = command.get("params", {})
if cmd_type == "get_polyhaven_status":
return {"status": "success", "result": self.get_polyhaven_status()}
handlers = {
"get_scene_info": self.get_scene_info,
"create_object": self.create_object,
"modify_object": self.modify_object,
"delete_object": self.delete_object,
"get_object_info": self.get_object_info,
"execute_code": self.execute_code,
"set_material": self.set_material,
"get_polyhaven_status": self.get_polyhaven_status,
"render_scene": self.render_scene
}
if bpy.context.scene.blendermcp_use_polyhaven:
polyhaven_handlers = {
"get_polyhaven_categories": self.get_polyhaven_categories,
"search_polyhaven_assets": self.search_polyhaven_assets,
"download_polyhaven_asset": self.download_polyhaven_asset,
"set_texture": self.set_texture,
}
handlers.update(polyhaven_handlers)
handler = handlers.get(cmd_type)
if handler:
try:
print(f"Executing handler for {cmd_type}")
result = handler(**params)
print(f"Handler execution complete")
return {"status": "success", "result": result}
except Exception as e:
print(f"Error in handler: {str(e)}")
traceback.print_exc()
return {"status": "error", "message": str(e)}
else:
return {"status": "error", "message": f"Unknown command type: {cmd_type}"}
def get_simple_info(self):
return {
"blender_version": ".".join(str(v) for v in bpy.app.version),
"scene_name": bpy.context.scene.name,
"object_count": len(bpy.context.scene.objects)
}
def get_scene_info(self):
try:
print("Getting scene info...")
scene_info = {
"name": bpy.context.scene.name,
"object_count": len(bpy.context.scene.objects),
"objects": [],
"materials_count": len(bpy.data.materials),
}
for i, obj in enumerate(bpy.context.scene.objects):
if i >= 10:
break
obj_info = {
"name": obj.name,
"type": obj.type,
"location": [round(float(obj.location.x), 2),
round(float(obj.location.y), 2),
round(float(obj.location.z), 2)],
}
scene_info["objects"].append(obj_info)
print(f"Scene info collected: {len(scene_info['objects'])} objects")
return scene_info
except Exception as e:
print(f"Error in get_scene_info: {str(e)}")
traceback.print_exc()
return {"error": str(e)}
def render_scene(self, output_path=None, resolution_x=None, resolution_y=None):
"""Render the current scene"""
try:
if resolution_x is not None:
bpy.context.scene.render.resolution_x = int(resolution_x)
if resolution_y is not None:
bpy.context.scene.render.resolution_y = int(resolution_y)
if output_path:
# Use absolute path and ensure directory exists.
output_path = bpy.path.abspath(output_path)
output_dir = os.path.dirname(output_path)
if not os.path.exists(output_dir):
os.makedirs(output_dir)
bpy.context.scene.render.filepath = output_path
else: # If path not given save to a temp dir
output_path = os.path.join(tempfile.gettempdir(),"render.png")
bpy.context.scene.render.filepath = output_path
# Render the scene
bpy.ops.render.render(write_still=True) #Always write still even if no path given
return {
"rendered": True,
"output_path": output_path ,
"resolution": [bpy.context.scene.render.resolution_x, bpy.context.scene.render.resolution_y],
}
except Exception as e:
print(f"Error in render_scene: {str(e)}")
traceback.print_exc()
return {"error": str(e)}
def create_object(self, type="CUBE", name=None, location=(0, 0, 0), rotation=(0, 0, 0), scale=(1, 1, 1)):
bpy.ops.object.select_all(action='DESELECT')
if type == "CUBE":
bpy.ops.mesh.primitive_cube_add(location=location, rotation=rotation, scale=scale)
elif type == "SPHERE":
bpy.ops.mesh.primitive_uv_sphere_add(location=location, rotation=rotation, scale=scale)
elif type == "CYLINDER":
bpy.ops.mesh.primitive_cylinder_add(location=location, rotation=rotation, scale=scale)
elif type == "PLANE":
bpy.ops.mesh.primitive_plane_add(location=location, rotation=rotation, scale=scale)
elif type == "CONE":
bpy.ops.mesh.primitive_cone_add(location=location, rotation=rotation, scale=scale)
elif type == "TORUS":
bpy.ops.mesh.primitive_torus_add(location=location, rotation=rotation, scale=scale)
elif type == "EMPTY":
bpy.ops.object.empty_add(location=location, rotation=rotation)
elif type == "CAMERA":
bpy.ops.object.camera_add(location=location, rotation=rotation)
elif type == "LIGHT":
bpy.ops.object.light_add(type='POINT', location=location, rotation=rotation)
else:
raise ValueError(f"Unsupported object type: {type}")
obj = bpy.context.active_object
if name:
obj.name = name
return {
"name": obj.name,
"type": obj.type,
"location": [obj.location.x, obj.location.y, obj.location.z],
"rotation": [obj.rotation_euler.x, obj.rotation_euler.y, obj.rotation_euler.z],
"scale": [obj.scale.x, obj.scale.y, obj.scale.z],
}
def modify_object(self, name, location=None, rotation=None, scale=None, visible=None):
obj = bpy.data.objects.get(name)
if not obj:
raise ValueError(f"Object not found: {name}")
if location is not None:
obj.location = location
if rotation is not None:
obj.rotation_euler = rotation
if scale is not None:
obj.scale = scale
if visible is not None:
obj.hide_viewport = not visible
obj.hide_render = not visible
return {
"name": obj.name,
"type": obj.type,
"location": [obj.location.x, obj.location.y, obj.location.z],
"rotation": [obj.rotation_euler.x, obj.rotation_euler.y, obj.rotation_euler.z],
"scale": [obj.scale.x, obj.scale.y, obj.scale.z],
"visible": obj.visible_get(),
}
def delete_object(self, name):
obj = bpy.data.objects.get(name)
if not obj:
raise ValueError(f"Object not found: {name}")
obj_name = obj.name
bpy.ops.object.select_all(action='DESELECT')
obj.select_set(True)
bpy.ops.object.delete()
return {"deleted": obj_name}
def get_object_info(self, name):
obj = bpy.data.objects.get(name)
if not obj:
raise ValueError(f"Object not found: {name}")
obj_info = {
"name": obj.name,
"type": obj.type,
"location": [obj.location.x, obj.location.y, obj.location.z],
"rotation": [obj.rotation_euler.x, obj.rotation_euler.y, obj.rotation_euler.z],
"scale": [obj.scale.x, obj.scale.y, obj.scale.z],
"visible": obj.visible_get(),
"materials": [],
}
for slot in obj.material_slots:
if slot.material:
obj_info["materials"].append(slot.material.name)
if obj.type == 'MESH' and obj.data:
mesh = obj.data
obj_info["mesh"] = {
"vertices": len(mesh.vertices),
"edges": len(mesh.edges),
"polygons": len(mesh.polygons),
}
return obj_info
def execute_code(self, code):
try:
namespace = {"bpy": bpy}
exec(code, namespace)
return {"executed": True}
except Exception as e:
raise Exception(f"Code execution error: {str(e)}")
def set_material(self, object_name, material_name=None, create_if_missing=True, color=None):
"""Set or create a material for an object."""
try:
obj = bpy.data.objects.get(object_name)
if not obj:
raise ValueError(f"Object not found: {object_name}")
if not hasattr(obj, 'data') or not hasattr(obj.data, 'materials'):
raise ValueError(f"Object {object_name} cannot accept materials")
if material_name:
mat = bpy.data.materials.get(material_name)
if not mat and create_if_missing:
mat = bpy.data.materials.new(name=material_name)
print(f"Created new material: {material_name}")
else:
mat_name = f"{object_name}_material"
mat = bpy.data.materials.get(mat_name)
if not mat:
mat = bpy.data.materials.new(name=mat_name)
material_name = mat_name
print(f"Using material: {mat_name}")
if mat:
if not mat.use_nodes:
mat.use_nodes = True
principled = mat.node_tree.nodes.get('Principled BSDF')
if not principled:
principled = mat.node_tree.nodes.new('ShaderNodeBsdfPrincipled')
output = mat.node_tree.nodes.get('Material Output')
if not output:
output = mat.node_tree.nodes.new('ShaderNodeOutputMaterial')
if not principled.outputs[0].links:
mat.node_tree.links.new(principled.outputs[0], output.inputs[0])
if color and len(color) >= 3:
principled.inputs['Base Color'].default_value = (
color[0],
color[1],
color[2],
1.0 if len(color) < 4 else color[3]
)
print(f"Set material color to {color}")
if mat:
if not obj.data.materials:
obj.data.materials.append(mat)
else:
obj.data.materials[0] = mat
print(f"Assigned material {mat.name} to object {object_name}")
return {
"status": "success",
"object": object_name,
"material": mat.name,
"color": color if color else None
}
else:
raise ValueError(f"Failed to create or find material: {material_name}")
except Exception as e:
print(f"Error in set_material: {str(e)}")
traceback.print_exc()
return {
"status": "error",
"message": str(e),
"object": object_name,
"material": material_name if 'material_name' in locals() else None
}
def get_polyhaven_categories(self, asset_type):
"""Get categories for a specific asset type from Polyhaven"""
try:
if asset_type not in ["hdris", "textures", "models", "all"]:
return {"error": f"Invalid asset type: {asset_type}. Must be one of: hdris, textures, models, all"}
response = requests.get(f"https://api.polyhaven.com/categories/{asset_type}")
if response.status_code == 200:
return {"categories": response.json()}
else:
return {"error": f"API request failed with status code {response.status_code}"}
except Exception as e:
return {"error": str(e)}
def search_polyhaven_assets(self, asset_type=None, categories=None):
"""Search for assets from Polyhaven with optional filtering"""
try:
url = "https://api.polyhaven.com/assets"
params = {}
if asset_type and asset_type != "all":
if asset_type not in ["hdris", "textures", "models"]:
return {"error": f"Invalid asset type: {asset_type}. Must be one of: hdris, textures, models, all"}
params["type"] = asset_type
if categories:
params["categories"] = categories
response = requests.get(url, params=params)
if response.status_code == 200:
assets = response.json()
limited_assets = {}
for i, (key, value) in enumerate(assets.items()):
if i >= 20:
break
limited_assets[key] = value
return {"assets": limited_assets, "total_count": len(assets), "returned_count": len(limited_assets)}
else:
return {"error": f"API request failed with status code {response.status_code}"}
except Exception as e:
return {"error": str(e)}
def download_polyhaven_asset(self, asset_id, asset_type, resolution="1k", file_format=None):
"""Downloads and imports a PolyHaven asset."""
try:
files_response = requests.get(f"https://api.polyhaven.com/files/{asset_id}")
if files_response.status_code != 200:
return {"error": f"Failed to get asset files: {files_response.status_code}"}
files_data = files_response.json()
if asset_type == "hdris":
if not file_format:
file_format = "hdr"
if "hdri" in files_data and resolution in files_data["hdri"] and file_format in files_data["hdri"][resolution]:
file_info = files_data["hdri"][resolution][file_format]
file_url = file_info["url"]
tmp_path = None
try:
with tempfile.NamedTemporaryFile(suffix=f".{file_format}", delete=False) as tmp_file:
response = requests.get(file_url)
if response.status_code != 200:
return {"error": f"Failed to download HDRI: {response.status_code}"}
tmp_file.write(response.content)
tmp_path = tmp_file.name
if not bpy.data.worlds:
bpy.data.worlds.new("World")
world = bpy.data.worlds[0]
world.use_nodes = True
node_tree = world.node_tree
for node in node_tree.nodes:
node_tree.nodes.remove(node)
tex_coord = node_tree.nodes.new(type='ShaderNodeTexCoord')
tex_coord.location = (-800, 0)
mapping = node_tree.nodes.new(type='ShaderNodeMapping')
mapping.location = (-600, 0)
env_tex = node_tree.nodes.new(type='ShaderNodeTexEnvironment')
env_tex.location = (-400, 0)
env_tex.image = bpy.data.images.load(tmp_path)
if file_format.lower() == 'exr':
try:
env_tex.image.colorspace_settings.name = 'Linear'
except:
env_tex.image.colorspace_settings.name = 'Non-Color'
else:
for color_space in ['Linear', 'Linear Rec.709', 'Non-Color']:
try:
env_tex.image.colorspace_settings.name = color_space
break
except:
continue
background = node_tree.nodes.new(type='ShaderNodeBackground')
background.location = (-200, 0)
output = node_tree.nodes.new(type='ShaderNodeOutputWorld')
output.location = (0, 0)
node_tree.links.new(tex_coord.outputs['Generated'], mapping.inputs['Vector'])
node_tree.links.new(mapping.outputs['Vector'], env_tex.inputs['Vector'])
node_tree.links.new(env_tex.outputs['Color'], background.inputs['Color'])
node_tree.links.new(background.outputs['Background'], output.inputs['Surface'])
bpy.context.scene.world = world
return {
"success": True,
"message": f"HDRI {asset_id} imported successfully",
"image_name": env_tex.image.name
}
except Exception as e:
return {"error": f"Failed to set up HDRI: {str(e)}"}
finally:
if tmp_path and os.path.exists(tmp_path):
os.remove(tmp_path)
else:
return {"error": f"Resolution/format unavailable."}
elif asset_type == "textures":
if not file_format:
file_format = "jpg"
downloaded_maps = {}
try:
for map_type in files_data:
if map_type not in ["blend", "gltf"]:
if resolution in files_data[map_type] and file_format in files_data[map_type][resolution]:
file_info = files_data[map_type][resolution][file_format]
file_url = file_info["url"]
with tempfile.NamedTemporaryFile(suffix=f".{file_format}", delete=False) as tmp_file:
response = requests.get(file_url)
if response.status_code == 200:
tmp_file.write(response.content)
tmp_path = tmp_file.name
image = bpy.data.images.load(tmp_path)
image.name = f"{asset_id}_{map_type}.{file_format}"
image.pack()
if map_type in ['color', 'diffuse', 'albedo']:
try:
image.colorspace_settings.name = 'sRGB'
except:
pass
else:
try:
image.colorspace_settings.name = 'Non-Color'
except:
pass
downloaded_maps[map_type] = image
try:
os.unlink(tmp_path)
except:
pass
if not downloaded_maps:
return {"error": f"No texture maps found."}
mat = bpy.data.materials.new(name=asset_id)
mat.use_nodes = True
nodes = mat.node_tree.nodes
links = mat.node_tree.links
for node in nodes:
nodes.remove(node)
output = nodes.new(type='ShaderNodeOutputMaterial')
output.location = (300, 0)
principled = nodes.new(type='ShaderNodeBsdfPrincipled')
principled.location = (0, 0)
links.new(principled.outputs[0], output.inputs[0])
tex_coord = nodes.new(type='ShaderNodeTexCoord')
tex_coord.location = (-800, 0)
mapping = nodes.new(type='ShaderNodeMapping')
mapping.location = (-600, 0)
mapping.vector_type = 'TEXTURE'
links.new(tex_coord.outputs['UV'], mapping.inputs['Vector'])
x_pos = -400
y_pos = 300
for map_type, image in downloaded_maps.items():
tex_node = nodes.new(type='ShaderNodeTexImage')
tex_node.location = (x_pos, y_pos)
tex_node.image = image
if map_type.lower() in ['color', 'diffuse', 'albedo']:
try:
tex_node.image.colorspace_settings.name = 'sRGB'
except:
pass
else:
try:
tex_node.image.colorspace_settings.name = 'Non-Color'
except:
pass
links.new(mapping.outputs['Vector'], tex_node.inputs['Vector'])
if map_type.lower() in ['color', 'diffuse', 'albedo']:
links.new(tex_node.outputs['Color'], principled.inputs['Base Color'])
elif map_type.lower() in ['roughness', 'rough']:
links.new(tex_node.outputs['Color'], principled.inputs['Roughness'])
elif map_type.lower() in ['metallic', 'metalness', 'metal']:
links.new(tex_node.outputs['Color'], principled.inputs['Metallic'])
elif map_type.lower() in ['normal', 'nor']:
normal_map = nodes.new(type='ShaderNodeNormalMap')
normal_map.location = (x_pos + 200, y_pos)
links.new(tex_node.outputs['Color'], normal_map.inputs['Color'])
links.new(normal_map.outputs['Normal'], principled.inputs['Normal'])
elif map_type in ['displacement', 'disp', 'height']:
disp_node = nodes.new(type='ShaderNodeDisplacement')
disp_node.location = (x_pos + 200, y_pos - 200)
links.new(tex_node.outputs['Color'], disp_node.inputs['Height'])
links.new(disp_node.outputs['Displacement'], output.inputs['Displacement'])
y_pos -= 250
return {
"success": True,
"message": f"Texture {asset_id} imported as material",
"material": mat.name,
"maps": list(downloaded_maps.keys())
}
except Exception as e:
return {"error": f"Failed to process textures: {str(e)}"}
elif asset_type == "models":
if not file_format:
file_format = "gltf"
if file_format in files_data and resolution in files_data[file_format]:
file_info = files_data[file_format][resolution][file_format]
file_url = file_info["url"]
temp_dir = tempfile.mkdtemp()
main_file_path = ""
try:
main_file_name = file_url.split("/")[-1]
main_file_path = os.path.join(temp_dir, main_file_name)
response = requests.get(file_url)
if response.status_code != 200:
return {"error": f"Failed to download model: {response.status_code}"}
with open(main_file_path, "wb") as f:
f.write(response.content)
if "include" in file_info and file_info["include"]:
for include_path, include_info in file_info["include"].items():
include_url = include_info["url"]
include_file_path = os.path.join(temp_dir, include_path)
os.makedirs(os.path.dirname(include_file_path), exist_ok=True)
include_response = requests.get(include_url)
if include_response.status_code == 200:
with open(include_file_path, "wb") as f:
f.write(include_response.content)
else:
print(f"Failed to download included file: {include_path}")
if file_format == "gltf" or file_format == "glb":
bpy.ops.import_scene.gltf(filepath=main_file_path)
elif file_format == "fbx":
bpy.ops.import_scene.fbx(filepath=main_file_path)
elif file_format == "obj":
bpy.ops.import_scene.obj(filepath=main_file_path)
elif file_format == "blend":
with bpy.data.libraries.load(main_file_path, link=False) as (data_from, data_to):
data_to.objects = data_from.objects
for obj in data_to.objects:
if obj is not None:
bpy.context.collection.objects.link(obj)
else:
return {"error": f"Unsupported model format: {file_format}"}
imported_objects = [obj.name for obj in bpy.context.selected_objects]
return {
"success": True,
"message": f"Model {asset_id} imported successfully",
"imported_objects": imported_objects
}
except Exception as e:
return {"error": f"Failed to import model: {str(e)}"}
finally:
try:
shutil.rmtree(temp_dir)
except:
print(f"Failed to clean up: {temp_dir}")
else:
return {"error": f"Format/resolution unavailable."}
else:
return {"error": f"Unsupported asset type: {asset_type}"}
except Exception as e:
return {"error": f"Failed to download asset: {str(e)}"}
def set_texture(self, object_name, texture_id):
"""Apply a previously downloaded Polyhaven texture."""
try:
obj = bpy.data.objects.get(object_name)
if not obj:
return {"error": f"Object not found: {object_name}"}
if not hasattr(obj, 'data') or not hasattr(obj.data, 'materials'):
return {"error": f"Object {object_name} cannot accept materials"}
texture_images = {}
for img in bpy.data.images:
if img.name.startswith(texture_id + "_"):
map_type = img.name.split('_')[-1].split('.')[0]
img.reload()
if map_type.lower() in ['color', 'diffuse', 'albedo']:
try:
img.colorspace_settings.name = 'sRGB'
except:
pass
else:
try:
img.colorspace_settings.name = 'Non-Color'
except:
pass
if not img.packed_file:
img.pack()
texture_images[map_type] = img
print(f"Loaded: {map_type} - {img.name}")
print(f"Size: {img.size[0]}x{img.size[1]}")
print(f"Colorspace: {img.colorspace_settings.name}")
print(f"Format: {img.file_format}")
print(f"Packed: {bool(img.packed_file)}")
if not texture_images:
return {"error": f"No images found for: {texture_id}."}
new_mat_name = f"{texture_id}_material_{object_name}"
existing_mat = bpy.data.materials.get(new_mat_name)
if existing_mat:
bpy.data.materials.remove(existing_mat)
new_mat = bpy.data.materials.new(name=new_mat_name)
new_mat.use_nodes = True
nodes = new_mat.node_tree.nodes
links = new_mat.node_tree.links
nodes.clear()
output = nodes.new(type='ShaderNodeOutputMaterial')
output.location = (600, 0)
principled = nodes.new(type='ShaderNodeBsdfPrincipled')
principled.location = (300, 0)
links.new(principled.outputs[0], output.inputs[0])
tex_coord = nodes.new(type='ShaderNodeTexCoord')
tex_coord.location = (-800, 0)
mapping = nodes.new(type='ShaderNodeMapping')
mapping.location = (-600, 0)
mapping.vector_type = 'TEXTURE'
links.new(tex_coord.outputs['UV'], mapping.inputs['Vector'])
x_pos = -400
y_pos = 300
for map_type, image in texture_images.items():
tex_node = nodes.new(type='ShaderNodeTexImage')
tex_node.location = (x_pos, y_pos)
tex_node.image = image
if map_type.lower() in ['color', 'diffuse', 'albedo']:
try:
tex_node.image.colorspace_settings.name = 'sRGB'
except:
pass
else:
try:
tex_node.image.colorspace_settings.name = 'Non-Color'
except:
pass
links.new(mapping.outputs['Vector'], tex_node.inputs['Vector'])
if map_type.lower() in ['color', 'diffuse', 'albedo']:
links.new(tex_node.outputs['Color'], principled.inputs['Base Color'])
elif map_type.lower() in ['roughness', 'rough']:
links.new(tex_node.outputs['Color'], principled.inputs['Roughness'])
elif map_type.lower() in ['metallic', 'metalness', 'metal']:
links.new(tex_node.outputs['Color'], principled.inputs['Metallic'])
elif map_type.lower() in ['normal', 'nor', 'dx', 'gl']:
normal_map = nodes.new(type='ShaderNodeNormalMap')
normal_map.location = (x_pos + 200, y_pos)
links.new(tex_node.outputs['Color'], normal_map.inputs['Color'])
links.new(normal_map.outputs['Normal'], principled.inputs['Normal'])
elif map_type.lower() in ['displacement', 'disp', 'height']:
disp_node = nodes.new(type='ShaderNodeDisplacement')
disp_node.location = (x_pos + 200, y_pos - 200)
disp_node.inputs['Scale'].default_value = 0.1
links.new(tex_node.outputs['Color'], disp_node.inputs['Height'])
links.new(disp_node.outputs['Displacement'], output.inputs['Displacement'])
y_pos -= 250
texture_nodes = {}
for node in nodes:
if node.type == 'TEX_IMAGE' and node.image:
for map_type, image in texture_images.items():
if node.image == image:
texture_nodes[map_type] = node
break
for map_name in ['color', 'diffuse', 'albedo']:
if map_name in texture_nodes:
links.new(texture_nodes[map_name].outputs['Color'], principled.inputs['Base Color'])
print(f"Connected {map_name} to Base Color")
break
for map_name in ['roughness', 'rough']:
if map_name in texture_nodes:
links.new(texture_nodes[map_name].outputs['Color'], principled.inputs['Roughness'])
print(f"Connected {map_name} to Roughness")
break
for map_name in ['metallic', 'metalness', 'metal']:
if map_name in texture_nodes:
links.new(texture_nodes[map_name].outputs['Color'], principled.inputs['Metallic'])
print(f"Connected {map_name} to Metallic")
break
for map_name in ['gl', 'dx', 'nor']:
if map_name in texture_nodes:
normal_map_node = nodes.new(type='ShaderNodeNormalMap')
normal_map_node.location = (100, 100)
links.new(texture_nodes[map_name].outputs['Color'], normal_map_node.inputs['Color'])
links.new(normal_map_node.outputs['Normal'], principled.inputs['Normal'])
print(f"Connected {map_name} to Normal")
break
for map_name in ['displacement', 'disp', 'height']:
if map_name in texture_nodes:
disp_node = nodes.new(type='ShaderNodeDisplacement')
disp_node.location = (300, -200)
disp_node.inputs['Scale'].default_value = 0.1
links.new(texture_nodes[map_name].outputs['Color'], disp_node.inputs['Height'])
links.new(disp_node.outputs['Displacement'], output.inputs['Displacement'])
print(f"Connected {map_name} to Displacement")
break
if 'arm' in texture_nodes:
separate_rgb = nodes.new(type='ShaderNodeSeparateRGB')
separate_rgb.location = (-200, -100)
links.new(texture_nodes['arm'].outputs['Color'], separate_rgb.inputs['Image'])
if not any(map_name in texture_nodes for map_name in ['roughness', 'rough']):
links.new(separate_rgb.outputs['G'], principled.inputs['Roughness'])
print("Connected ARM.G to Roughness")
if not any(map_name in texture_nodes for map_name in ['metallic', 'metalness', 'metal']):
links.new(separate_rgb.outputs['B'], principled.inputs['Metallic'])
print("Connected ARM.B to Metallic")
base_color_node = None
for map_name in ['color', 'diffuse', 'albedo']:
if map_name in texture_nodes:
base_color_node = texture_nodes[map_name]
break
if base_color_node:
mix_node = nodes.new(type='ShaderNodeMixRGB')
mix_node.location = (100, 200)
mix_node.blend_type = 'MULTIPLY'
mix_node.inputs['Fac'].default_value = 0.8
for link in base_color_node.outputs['Color'].links:
if link.to_socket == principled.inputs['Base Color']:
links.remove(link)
links.new(base_color_node.outputs['Color'], mix_node.inputs[1])
links.new(separate_rgb.outputs['R'], mix_node.inputs[2])
links.new(mix_node.outputs['Color'], principled.inputs['Base Color'])
print("Connected ARM.R to AO mix with Base Color")
if 'ao' in texture_nodes:
base_color_node = None
for map_name in ['color', 'diffuse', 'albedo']:
if map_name in texture_nodes:
base_color_node = texture_nodes[map_name]
break
if base_color_node:
mix_node = nodes.new(type='ShaderNodeMixRGB')
mix_node.location = (100, 200)
mix_node.blend_type = 'MULTIPLY'
mix_node.inputs['Fac'].default_value = 0.8
for link in base_color_node.outputs['Color'].links:
if link.to_socket == principled.inputs['Base Color']:
links.remove(link)
links.new(base_color_node.outputs['Color'], mix_node.inputs[1])
links.new(texture_nodes['ao'].outputs['Color'], mix_node.inputs[2])
links.new(mix_node.outputs['Color'], principled.inputs['Base Color'])
print("Connected AO to mix with Base Color")
while len(obj.data.materials) > 0:
obj.data.materials.pop(index=0)
obj.data.materials.append(new_mat)
bpy.context.view_layer.objects.active = obj
obj.select_set(True)
bpy.context.view_layer.update()
texture_maps = list(texture_images.keys())
material_info = {
"name": new_mat.name,
"has_nodes": new_mat.use_nodes,
"node_count": len(new_mat.node_tree.nodes),
"texture_nodes": []
}
for node in new_mat.node_tree.nodes:
if node.type == 'TEX_IMAGE' and node.image:
connections = []
for output in node.outputs:
for link in output.links:
connections.append(f"{output.name} → {link.to_node.name}.{link.to_socket.name}")
material_info["texture_nodes"].append({
"name": node.name,
"image": node.image.name,
"colorspace": node.image.colorspace_settings.name,
"connections": connections
})
return {
"success": True,
"message": f"Created new material and applied texture {texture_id} to {object_name}",
"material": new_mat.name,
"maps": texture_maps,
"material_info": material_info
}
except Exception as e:
print(f"Error in set_texture: {str(e)}")
traceback.print_exc()
return {"error": f"Failed to apply texture: {str(e)}"}
def get_polyhaven_status(self):
enabled = bpy.context.scene.blendermcp_use_polyhaven
if enabled:
return {"enabled": True, "message": "PolyHaven integration is enabled and ready to use."}
else:
return {
"enabled": False,
"message": """PolyHaven integration is currently disabled. To enable it:
1. In the 3D Viewport, find the BlenderMCP panel in the sidebar (press N if hidden)
2. Check the 'Use assets from Poly Haven' checkbox
3. Restart the connection"""
}
class BLENDERMCP_PT_Panel(bpy.types.Panel):
bl_label = "Blender MCP"
bl_idname = "BLENDERMCP_PT_Panel"
bl_space_type = 'VIEW_3D'
bl_region_type = 'UI'
bl_category = 'BlenderMCP'
def draw(self, context):
layout = self.layout
scene = context.scene
layout.prop(scene, "blendermcp_port")
layout.prop(scene, "blendermcp_use_polyhaven", text="Use assets from Poly Haven")
if not scene.blendermcp_server_running:
layout.operator("blendermcp.start_server", text="Start MCP Server")
else:
layout.operator("blendermcp.stop_server", text="Stop MCP Server")
layout.label(text=f"Running on port {scene.blendermcp_port}")
class BLENDERMCP_OT_StartServer(bpy.types.Operator):
bl_idname = "blendermcp.start_server"
bl_label = "Connect to Local AI" # Updated label
bl_description = "Start the BlenderMCP server to connect with a local AI model" # Updated description
def execute(self, context):
scene = context.scene
if not hasattr(bpy.types, "blendermcp_server") or not bpy.types.blendermcp_server:
bpy.types.blendermcp_server = BlenderMCPServer(port=scene.blendermcp_port)
bpy.types.blendermcp_server.start()
scene.blendermcp_server_running = True
return {'FINISHED'}
class BLENDERMCP_OT_StopServer(bpy.types.Operator):
bl_idname = "blendermcp.stop_server"
bl_label = "Stop the connection" # Updated
bl_description = "Stop Server" # Updated
def execute(self, context):
scene = context.scene
if hasattr(bpy.types, "blendermcp_server") and bpy.types.blendermcp_server:
bpy.types.blendermcp_server.stop()
del bpy.types.blendermcp_server
scene.blendermcp_server_running = False
return {'FINISHED'}
def register():
bpy.types.Scene.blendermcp_port = IntProperty(
name="Port",
description="Port for the BlenderMCP server",
default=9876,
min=1024,
max=65535
)
bpy.types.Scene.blendermcp_server_running = bpy.props.BoolProperty(
name="Server Running",
default=False
)
bpy.types.Scene.blendermcp_use_polyhaven = bpy.props.BoolProperty(
name="Use Poly Haven",
description="Enable Poly Haven asset integration",
default=False
)
bpy.utils.register_class(BLENDERMCP_PT_Panel)
bpy.utils.register_class(BLENDERMCP_OT_StartServer)
bpy.utils.register_class(BLENDERMCP_OT_StopServer)
print("BlenderMCP addon registered")
def unregister():
if hasattr(bpy.types, "blendermcp_server") and bpy.types.blendermcp_server:
bpy.types.blendermcp_server.stop()
del bpy.types.blendermcp_server
bpy.utils.unregister_class(BLENDERMCP_PT_Panel)
bpy.utils.unregister_class(BLENDERMCP_OT_StartServer)
bpy.utils.unregister_class(BLENDERMCP_OT_StopServer)
del bpy.types.Scene.blendermcp_port
del bpy.types.Scene.blendermcp_server_running
del bpy.types.Scene.blendermcp_use_polyhaven
print("BlenderMCP addon unregistered")
if __name__ == "__main__":
register()
```