#
tokens: 2739/50000 5/5 files
lines: off (toggle) GitHub
raw markdown copy
# Directory Structure

```
├── nuke_bridge.py
├── package-lock.json
├── package.json
├── README.md
└── src
    ├── index.js
    └── server.js
```

# Files

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# nuke-mcp-2
nuke-mcp: an MCP (Model Context Protocol) server for Nuke integration.

```

--------------------------------------------------------------------------------
/src/index.js:
--------------------------------------------------------------------------------

```javascript
#!/usr/bin/env node
    import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
    import { server } from './server.js';

    // The stdio transport uses stdout for protocol messages, so the startup
    // banner is written to stderr to keep the message stream clean.
    console.error('Starting Nuke MCP server...');

    // Start receiving messages on stdin and sending messages on stdout
    const transport = new StdioServerTransport();
    await server.connect(transport);

```

--------------------------------------------------------------------------------
/package.json:
--------------------------------------------------------------------------------

```json
{
      "name": "mcp-nuke-server",
      "version": "1.0.0",
      "type": "module",
      "description": "MCP server for Nuke integration",
      "main": "src/index.js",
      "bin": {
        "mcp-nuke-server": "src/index.js"
      },
      "scripts": {
        "start": "node src/index.js",
        "dev": "node --watch src/index.js",
        "inspect": "npx -y @modelcontextprotocol/inspector node src/index.js"
      },
      "dependencies": {
        "@modelcontextprotocol/sdk": "^1.0.0",
        "zod": "^3.22.4"
      }
    }

```

--------------------------------------------------------------------------------
/src/server.js:
--------------------------------------------------------------------------------

```javascript
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
    import { z } from 'zod';
    import { exec, execFile } from 'child_process';
    import { fileURLToPath } from 'url';
    import { promisify } from 'util';

    // Promise-returning wrapper around child_process.exec
    const execAsync = promisify(exec);

    // Identity metadata handed to the MCP server for the Nuke integration
    const serverInfo = {
      name: "Nuke MCP Server",
      version: "1.0.0",
      description: "MCP server for interacting with Nuke"
    };

    // The MCP server instance that the tool registrations attach to
    const server = new McpServer(serverInfo);

    // Promise-returning execFile: the interpreter is launched directly with an
    // argv array, so no shell ever parses the tool arguments.
    const execFileAsync = promisify(execFile);

    // Absolute path of the bridge script, resolved relative to this module (the
    // script lives one directory above src/) instead of the process's current
    // working directory.
    const BRIDGE_SCRIPT_PATH = fileURLToPath(new URL('../nuke_bridge.py', import.meta.url));

    /**
     * Runs the Python bridge script with a command name and JSON-encoded
     * arguments, and converts its output into an MCP tool result.
     *
     * The arguments are passed as a single argv entry (no shell involved), so
     * quotes, backslashes, `$` or backticks inside argument values cannot alter
     * the command that is executed.
     *
     * @param {string} command - Bridge command name (e.g. "createNode").
     * @param {object} [args={}] - Command arguments; serialized with JSON.stringify.
     * @returns {Promise<{content: Array<{type: string, text: string}>, isError?: boolean}>}
     *   A text result holding the bridge's `data` as pretty-printed JSON, or an
     *   error result (`isError: true`) when the process fails, writes to stderr,
     *   reports an `error` field, or prints output that is not valid JSON.
     *   This function never rejects; every failure is turned into an error result.
     */
    async function executeBridge(command, args = {}) {
      try {
        const { stdout, stderr } = await execFileAsync('python', [
          BRIDGE_SCRIPT_PATH,
          command,
          JSON.stringify(args)
        ]);

        // Any stderr output is treated as a failure of the bridge call.
        if (stderr) {
          console.error(`Bridge script error: ${stderr}`);
          return {
            content: [{ type: "text", text: `Error: ${stderr}` }],
            isError: true
          };
        }

        try {
          const result = JSON.parse(stdout);
          if (result.error) {
            return {
              content: [{ type: "text", text: `Error: ${result.error}` }],
              isError: true
            };
          }
          return {
            content: [{ type: "text", text: JSON.stringify(result.data, null, 2) }]
          };
        } catch (parseError) {
          console.error(`Error parsing bridge output: ${parseError.message}`);
          return {
            content: [{ type: "text", text: `Error parsing bridge output: ${parseError.message}\nRaw output: ${stdout}` }],
            isError: true
          };
        }
      } catch (error) {
        // Covers spawn failures (e.g. interpreter not found) and non-zero exits.
        console.error(`Error executing bridge script: ${error.message}`);
        return {
          content: [{ type: "text", text: `Error executing bridge script: ${error.message}` }],
          isError: true
        };
      }
    }

    // 1. createNode tool
    // Registers the createNode tool. McpServer.tool takes the description as the
    // second positional argument (name, description, schema, handler).
    server.tool(
      "createNode",
      "Creates a node of the specified type in a Nuke script",
      {
        nodeType: z.string().describe("Type of node to create (e.g., 'Read', 'Merge2')"),
        name: z.string().optional().describe("Optional name for the node"),
        inputs: z.array(z.string()).optional().describe("Optional array of input node names")
      },
      async ({ nodeType, name, inputs }) => {
        return await executeBridge("createNode", { nodeType, name, inputs });
      }
    );

    // 2. setKnobValue tool
    // Registers the setKnobValue tool. McpServer.tool takes the description as
    // the second positional argument (name, description, schema, handler).
    server.tool(
      "setKnobValue",
      "Sets a knob on the specified node to the provided value",
      {
        nodeName: z.string().describe("Name of the node"),
        knobName: z.string().describe("Name of the knob to set"),
        value: z.union([z.string(), z.number()]).describe("Value to set the knob to")
      },
      async ({ nodeName, knobName, value }) => {
        return await executeBridge("setKnobValue", { nodeName, knobName, value });
      }
    );

    // 3. getNode tool
    // Registers the getNode tool. McpServer.tool takes the description as the
    // second positional argument (name, description, schema, handler).
    server.tool(
      "getNode",
      "Returns basic info about a node (type, knob values, etc.)",
      {
        nodeName: z.string().describe("Name of the node to get information about")
      },
      async ({ nodeName }) => {
        return await executeBridge("getNode", { nodeName });
      }
    );

    // 4. execute tool
    // Registers the execute tool. McpServer.tool takes the description as the
    // second positional argument (name, description, schema, handler).
    server.tool(
      "execute",
      "Renders the specified Write node from start to end frames",
      {
        writeNodeName: z.string().describe("Name of the Write node to render"),
        frameRangeStart: z.number().describe("Start frame for rendering"),
        frameRangeEnd: z.number().describe("End frame for rendering")
      },
      async ({ writeNodeName, frameRangeStart, frameRangeEnd }) => {
        return await executeBridge("execute", { writeNodeName, frameRangeStart, frameRangeEnd });
      }
    );

    export { server };

```

--------------------------------------------------------------------------------
/nuke_bridge.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
    import sys
    import json
    import traceback

    # Function to handle response formatting
    def respond(success=True, data=None, error=None):
        """Print one JSON object describing the outcome of a command.

        The object always carries "success" and "data"; an "error" key is
        added only when `error` is truthy.
        """
        payload = dict(success=success, data=data)
        if error:
            payload.update(error=error)
        print(json.dumps(payload))

    # Main function to process commands
    def main():
        """Read a command and optional JSON arguments from sys.argv and dispatch.

        argv[1] is the command name and argv[2], when present, is a JSON
        document with its arguments. Every outcome, including failures, is
        reported through respond(); nothing is raised for ordinary errors.
        """
        argv = sys.argv
        if len(argv) < 2:
            respond(False, None, "No command specified")
            return

        command = argv[1]

        # Arguments default to an empty mapping when none were supplied
        args = {}
        if len(argv) > 2:
            try:
                args = json.loads(argv[2])
            except json.JSONDecodeError as e:
                respond(False, None, f"Invalid JSON arguments: {str(e)}")
                return

        try:
            # Imported lazily so the checks above still work outside of Nuke
            try:
                import nuke
            except ImportError:
                respond(False, None, "Failed to import nuke module. Make sure this script is run within Nuke.")
                return

            # Command name -> implementation
            handlers = {
                "createNode": create_node,
                "setKnobValue": set_knob_value,
                "getNode": get_node,
                "execute": execute,
            }
            handler = handlers.get(command)
            if handler is None:
                respond(False, None, f"Unknown command: {command}")
            else:
                handler(args, nuke)
        except Exception as e:
            respond(False, None, f"Error executing command: {str(e)}\n{traceback.format_exc()}")

    # Command implementations
    def create_node(args, nuke):
        """Create a node, optionally rename it and wire up its inputs.

        Args:
            args: Mapping with a required "nodeType" and optional "name" and
                "inputs" (list of existing node names, connected in order to
                input 0, 1, ...). A missing or null "inputs" means no inputs.
            nuke: The imported nuke module (or an object with the same
                toNode/createNode interface).

        Reports the new node's name, class and position through respond(), or
        an error message if a parameter is missing, an input node cannot be
        found, or Nuke raises.
        """
        if "nodeType" not in args:
            respond(False, None, "nodeType parameter is required")
            return

        node_type = args["nodeType"]
        name = args.get("name")
        # "inputs" may be absent or an explicit JSON null; both mean "no inputs"
        input_names = args.get("inputs") or []

        try:
            # Resolve every input before creating anything, so an unknown
            # input name does not leave a new, partly connected node behind.
            input_nodes = []
            for input_name in input_names:
                input_node = nuke.toNode(input_name)
                # Explicit None check: the outcome should not depend on how
                # node objects evaluate in a boolean context.
                if input_node is None:
                    respond(False, None, f"Input node not found: {input_name}")
                    return
                input_nodes.append(input_node)

            # Create the node
            node = nuke.createNode(node_type, inpanel=False)

            # Set name if provided
            if name:
                node["name"].setValue(name)

            # Connect the resolved inputs in the order they were given
            for i, input_node in enumerate(input_nodes):
                node.setInput(i, input_node)

            # Return node info
            node_info = {
                "name": node.name(),
                "type": node.Class(),
                "position": {"x": node.xpos(), "y": node.ypos()}
            }
            respond(True, node_info)
        except Exception as e:
            respond(False, None, f"Error creating node: {str(e)}")

    def set_knob_value(args, nuke):
        """Set one knob on a named node and report the outcome via respond().

        Requires "nodeName", "knobName" and "value" in `args`; a missing key,
        an unknown node or an unknown knob each yield an error response.
        """
        # Checked in this order so the first missing key decides the message
        required = (
            ("nodeName", "nodeName parameter is required"),
            ("knobName", "knobName parameter is required"),
            ("value", "value parameter is required"),
        )
        for key, message in required:
            if key not in args:
                respond(False, None, message)
                return

        node_name, knob_name, value = args["nodeName"], args["knobName"], args["value"]

        try:
            node = nuke.toNode(node_name)
            if not node:
                respond(False, None, f"Node not found: {node_name}")
            elif knob_name not in node.knobs():
                respond(False, None, f"Knob not found: {knob_name}")
            else:
                node[knob_name].setValue(value)
                summary = {"node": node_name, "knob": knob_name, "value": value}
                respond(True, summary)
        except Exception as e:
            respond(False, None, f"Error setting knob value: {str(e)}")

    def get_node(args, nuke):
        """Report a node's name, class, position and simple knob values.

        Args:
            args: Mapping with a required "nodeName".
            nuke: The imported nuke module (or an object exposing toNode).

        Only knobs whose Class() is one of the listed simple knob classes are
        included. A knob that raises while being read is skipped, so a single
        bad knob does not fail the whole request. The result, or an error
        message, is emitted through respond().
        """
        if "nodeName" not in args:
            respond(False, None, "nodeName parameter is required")
            return

        node_name = args["nodeName"]

        # Knob classes whose value() is reported
        reported_knob_classes = (
            "String_Knob", "File_Knob", "Text_Knob",
            "Int_Knob", "Double_Knob", "Boolean_Knob", "XY_Knob",
        )

        try:
            node = nuke.toNode(node_name)
            # Explicit None check: the outcome should not depend on how node
            # objects evaluate in a boolean context.
            if node is None:
                respond(False, None, f"Node not found: {node_name}")
                return

            # Collect basic node info
            node_info = {
                "name": node.name(),
                "type": node.Class(),
                "position": {"x": node.xpos(), "y": node.ypos()},
                "knobs": {}
            }

            # Collect knob values (only for the knob classes listed above)
            for knob_name in node.knobs():
                try:
                    knob = node[knob_name]
                    if knob.Class() in reported_knob_classes:
                        node_info["knobs"][knob_name] = knob.value()
                except Exception:
                    # Best effort: skip a knob that cannot be read. Only
                    # Exception is caught, so KeyboardInterrupt and SystemExit
                    # still propagate.
                    continue

            respond(True, node_info)
        except Exception as e:
            respond(False, None, f"Error getting node info: {str(e)}")

    def execute(args, nuke):
        """Render a Write node over a frame range and report via respond().

        Requires "writeNodeName", "frameRangeStart" and "frameRangeEnd" in
        `args`. The frame bounds are converted with int() for the render call,
        while the response echoes the values exactly as they were received.
        """
        # Checked in this order so the first missing key decides the message
        for key, message in (
            ("writeNodeName", "writeNodeName parameter is required"),
            ("frameRangeStart", "frameRangeStart parameter is required"),
            ("frameRangeEnd", "frameRangeEnd parameter is required"),
        ):
            if key not in args:
                respond(False, None, message)
                return

        target_name = args["writeNodeName"]
        first_frame = args["frameRangeStart"]
        last_frame = args["frameRangeEnd"]

        try:
            target = nuke.toNode(target_name)
            if not target:
                respond(False, None, f"Write node not found: {target_name}")
                return

            if target.Class() != "Write":
                respond(False, None, f"Node {target_name} is not a Write node")
                return

            # Render the requested range
            nuke.execute(target, int(first_frame), int(last_frame))

            summary = {
                "node": target_name,
                "rendered": True,
                "frameRange": {"start": first_frame, "end": last_frame},
                "outputPath": target["file"].value()
            }
            respond(True, summary)
        except Exception as e:
            respond(False, None, f"Error executing write node: {str(e)}")

    # Run the command dispatcher only when this file is executed as a script,
    # not when it is imported as a module.
    if __name__ == "__main__":
        main()

```