# Directory Structure

```
├── .gitignore
├── .python-version
├── AbletonMCP_Remote_Script
│   └── __init__.py
├── Dockerfile
├── LICENSE
├── MCP_Server
│   ├── __init__.py
│   └── server.py
├── pyproject.toml
├── README.md
├── smithery.yaml
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.13
```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info

# Virtual environments
.venv

.DS_Store
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# AbletonMCP - Ableton Live Model Context Protocol Integration

AbletonMCP connects Ableton Live to Claude AI through the Model Context Protocol (MCP), allowing Claude to directly interact with and control Ableton Live. This integration enables prompt-assisted music production, track creation, and Live session manipulation.

### Join the Community

Give feedback, get inspired, and build on top of the MCP: [Discord](https://discord.gg/3ZrMyGKnaU). Made by [Siddharth](https://x.com/sidahuj)

## Features

- **Two-way communication**: Connect Claude AI to Ableton Live through a socket-based server
- **Track manipulation**: Create, modify, and manipulate MIDI and audio tracks
- **Instrument and effect selection**: Claude can access and load the right instruments, effects and sounds from Ableton's library
- **Clip creation**: Create and edit MIDI clips with notes
- **Session control**: Start and stop playback, fire clips, and control transport

## Components

The system consists of two main components:

1.
   **Ableton Remote Script** (`AbletonMCP_Remote_Script/__init__.py`): A MIDI Remote Script for Ableton Live that creates a socket server to receive and execute commands
2. **MCP Server** (`MCP_Server/server.py`): A Python server that implements the Model Context Protocol and connects to the Ableton Remote Script

## Installation

### Installing via Smithery

To install Ableton Live Integration for Claude Desktop automatically via [Smithery](https://smithery.ai/server/@ahujasid/ableton-mcp):

```bash
npx -y @smithery/cli install @ahujasid/ableton-mcp --client claude
```

### Prerequisites

- Ableton Live 10 or newer
- Python 3.10 or newer
- [uv package manager](https://astral.sh/uv)

If you're on macOS, install uv with Homebrew:

```
brew install uv
```

Otherwise, install it from [uv's official website](https://docs.astral.sh/uv/getting-started/installation/).

⚠️ Do not proceed before installing uv

### Claude for Desktop Integration

[Follow along with the setup instructions video](https://youtu.be/iJWJqyVuPS8)

1. Go to Claude > Settings > Developer > Edit Config and edit claude_desktop_config.json to include the following:

```json
{
    "mcpServers": {
        "AbletonMCP": {
            "command": "uvx",
            "args": [
                "ableton-mcp"
            ]
        }
    }
}
```

### Cursor Integration

Use uvx to run ableton-mcp without installing it permanently. Go to Cursor Settings > MCP and paste this as a command:

```
uvx ableton-mcp
```

⚠️ Only run one instance of the MCP server (either on Cursor or Claude Desktop), not both

### Installing the Ableton Remote Script

[Follow along with the setup instructions video](https://youtu.be/iJWJqyVuPS8)

1. Download the `AbletonMCP_Remote_Script/__init__.py` file from this repo

2. Locate Ableton's MIDI Remote Scripts directory. The location differs by operating system and Live version.
   **One of these should work; you may have to look around**:

   **For macOS:**
   - Method 1: Go to Applications > Right-click on Ableton Live app → Show Package Contents → Navigate to:
     `Contents/App-Resources/MIDI Remote Scripts/`
   - Method 2: If it's not there in the first method, use the direct path (replace XX with your version number):
     `/Users/[Username]/Library/Preferences/Ableton/Live XX/User Remote Scripts`

   **For Windows:**
   - Method 1:
     `C:\Users\[Username]\AppData\Roaming\Ableton\Live x.x.x\Preferences\User Remote Scripts`
   - Method 2:
     `C:\ProgramData\Ableton\Live XX\Resources\MIDI Remote Scripts\`
   - Method 3:
     `C:\Program Files\Ableton\Live XX\Resources\MIDI Remote Scripts\`

   *Note: Replace XX with your Ableton version number (e.g., 10, 11, 12)*

3. Create a folder called 'AbletonMCP' in the Remote Scripts directory and paste the downloaded '\_\_init\_\_.py' file into it

4. Launch Ableton Live

5. Go to Settings/Preferences → Link, Tempo & MIDI

6. In the Control Surface dropdown, select "AbletonMCP"

7. Set Input and Output to "None"

## Usage

### Starting the Connection

1. Ensure the Ableton Remote Script is loaded in Ableton Live
2. Make sure the MCP server is configured in Claude Desktop or Cursor
3. The connection should be established automatically when you interact with Claude

### Using with Claude

Once the config file has been set up in Claude and the Remote Script is running in Ableton, you will see a hammer icon with tools for the Ableton MCP.
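
To check step 1 of "Starting the Connection" without going through Claude, one option is to probe the port the Remote Script listens on. The sketch below assumes the script's defaults of `localhost` and port `9877` (the `HOST` and `DEFAULT_PORT` constants in `AbletonMCP_Remote_Script/__init__.py`). The helper name `remote_script_is_listening` is hypothetical and not part of this project.

```python
import socket


def remote_script_is_listening(host="localhost", port=9877, timeout=1.0):
    """Return True if a TCP connection to host:port can be opened.

    Hypothetical helper, not part of AbletonMCP. A True result only shows
    that something accepts connections on the port; it does not prove that
    the listener is the AbletonMCP Remote Script.
    """
    try:
        # create_connection resolves the host and connects within the timeout;
        # the context manager closes the socket again right away.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False
```

Calling `remote_script_is_listening()` with Ableton Live closed should return `False`. With the control surface loaded it should return `True`, and the Remote Script will simply log a short-lived client connection.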

## Capabilities

- Get session and track information
- Create and modify MIDI and audio tracks
- Create, edit, and trigger clips
- Control playback
- Load instruments and effects from Ableton's browser
- Add notes to MIDI clips
- Change tempo and other session parameters

## Example Commands

Here are some examples of what you can ask Claude to do:

- "Create an 80s synthwave track" [Demo](https://youtu.be/VH9g66e42XA)
- "Create a Metro Boomin style hip-hop beat"
- "Create a new MIDI track with a synth bass instrument"
- "Add reverb to my drums"
- "Create a 4-bar MIDI clip with a simple melody"
- "Get information about the current Ableton session"
- "Load an 808 drum rack into the selected track"
- "Add a jazz chord progression to the clip in track 1"
- "Set the tempo to 120 BPM"
- "Play the clip in track 2"

## Troubleshooting

- **Connection issues**: Make sure the Ableton Remote Script is loaded and the MCP server is configured in Claude
- **Timeout errors**: Try simplifying your requests or breaking them into smaller steps
- **Have you tried turning it off and on again?**: If you're still having connection errors, try restarting both Claude and Ableton Live

## Technical Details

### Communication Protocol

The system uses a simple JSON-based protocol over TCP sockets:

- Commands are sent as JSON objects with a `type` and optional `params`
- Responses are JSON objects with a `status` and `result` or `message`

### Limitations & Security Considerations

- Complex musical arrangements may need to be broken down into smaller steps
- The tool is designed to work with Ableton's default devices and browser items
- Always save your work before extensive experimentation

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

## Disclaimer

This is a third-party integration and not made by Ableton.
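
The message shapes described under "Communication Protocol" can be sketched as follows. This is a minimal illustration under stated assumptions, not the project's implementation: the helper names `encode_command` and `decode_response` are hypothetical, while the `type`/`params` and `status`/`result`/`message` keys are taken from the description above.

```python
import json


def encode_command(command_type, params=None):
    """Serialize a command as UTF-8 JSON with `type` and `params` keys."""
    # Hypothetical helper: an omitted `params` is sent as an empty object.
    return json.dumps({"type": command_type, "params": params or {}}).encode("utf-8")


def decode_response(raw):
    """Parse a response and return its `result`, raising on an error status."""
    response = json.loads(raw.decode("utf-8"))
    if response.get("status") == "error":
        # Error responses carry a human-readable `message` instead of a result.
        raise RuntimeError(response.get("message", "Unknown error from Ableton"))
    return response.get("result", {})
```

For example, `encode_command("set_tempo", {"tempo": 120.0})` produces the bytes for a tempo change. Messages carry no length prefix or delimiter, so both sides of this project keep reading from the socket until the accumulated buffer parses as complete JSON.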
```

--------------------------------------------------------------------------------
/MCP_Server/__init__.py:
--------------------------------------------------------------------------------

```python
"""Ableton Live integration through the Model Context Protocol."""

__version__ = "0.1.0"

# Expose key classes and functions for easier imports
from .server import AbletonConnection, get_ableton_connection
```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Generated by https://smithery.ai. See: https://smithery.ai/docs/config#dockerfile
FROM python:3.10-alpine

# Install build dependencies
RUN apk add --no-cache gcc musl-dev libffi-dev

WORKDIR /app

# Copy project files
COPY . /app

# Install Python dependencies
RUN pip install --no-cache-dir .

# Expose port if server uses it, although MCP might use stdio

# Command to run the MCP server
CMD ["python", "-m", "MCP_Server.server"]
```

--------------------------------------------------------------------------------
/smithery.yaml:
--------------------------------------------------------------------------------

```yaml
# Smithery configuration file: https://smithery.ai/docs/config#smitheryyaml

startCommand:
  type: stdio
  configSchema:
    # JSON Schema defining the configuration options for the MCP.
    type: object
    properties: {}
  commandFunction:
    # A JS function that produces the CLI command based on the given config to start the MCP on stdio.
    |-
    (config) => ({ command: 'python', args: ['-m', 'MCP_Server.server'] })
  exampleConfig: {}
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "ableton-mcp"
version = "1.0.0"
description = "Ableton Live integration through the Model Context Protocol"
readme = "README.md"
requires-python = ">=3.10"
authors = [
    {name = "Siddharth Ahuja", email = "[email protected]"}
]
license = {text = "MIT"}
classifiers = [
    "Programming Language :: Python :: 3",
    "License :: OSI Approved :: MIT License",
    "Operating System :: OS Independent",
]
dependencies = [
    "mcp[cli]>=1.3.0",
]

[project.scripts]
ableton-mcp = "MCP_Server.server:main"

[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"

[tool.setuptools]
packages = ["MCP_Server"]

[project.urls]
"Homepage" = "https://github.com/ahujasid/ableton-mcp"
"Bug Tracker" = "https://github.com/ahujasid/ableton-mcp/issues"
```

--------------------------------------------------------------------------------
/MCP_Server/server.py:
--------------------------------------------------------------------------------

```python
# ableton_mcp_server.py
from mcp.server.fastmcp import FastMCP, Context
import socket
import json
import logging
from dataclasses import dataclass
from contextlib import asynccontextmanager
from typing import AsyncIterator, Dict, Any, List, Union

# Configure logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("AbletonMCPServer")


@dataclass
class AbletonConnection:
    host: str
    port: int
    sock: socket.socket = None

    def connect(self) -> bool:
        """Connect to the Ableton Remote Script socket server"""
        if self.sock:
            return True

        try:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.connect((self.host, self.port))
            logger.info(f"Connected to Ableton at {self.host}:{self.port}")
            return True
        except Exception as e:
            logger.error(f"Failed to connect to Ableton: {str(e)}")
            self.sock = None
            return False

    def disconnect(self):
        """Disconnect from the Ableton Remote Script"""
        if self.sock:
            try:
                self.sock.close()
            except Exception as e:
                logger.error(f"Error disconnecting from Ableton: {str(e)}")
            finally:
                self.sock = None

    def receive_full_response(self, sock, buffer_size=8192):
        """Receive the complete response, potentially in multiple chunks"""
        chunks = []
        sock.settimeout(15.0)  # Increased timeout for operations that might take longer

        try:
            while True:
                try:
                    chunk = sock.recv(buffer_size)
                    if not chunk:
                        if not chunks:
                            raise Exception("Connection closed before receiving any data")
                        break

                    chunks.append(chunk)

                    # Check if we've received a complete JSON object
                    try:
                        data = b''.join(chunks)
                        json.loads(data.decode('utf-8'))
                        logger.info(f"Received complete response ({len(data)} bytes)")
                        return data
                    except json.JSONDecodeError:
                        # Incomplete JSON, continue receiving
                        continue
                except socket.timeout:
                    logger.warning("Socket timeout during chunked receive")
                    break
                except (ConnectionError, BrokenPipeError, ConnectionResetError) as e:
                    logger.error(f"Socket connection error during receive: {str(e)}")
                    raise
        except Exception as e:
            logger.error(f"Error during receive: {str(e)}")
            raise

        # If we get here, we either timed out or broke out of the loop
        if chunks:
            data = b''.join(chunks)
            logger.info(f"Returning data after receive completion ({len(data)} bytes)")
            try:
                json.loads(data.decode('utf-8'))
                return data
            except json.JSONDecodeError:
                raise Exception("Incomplete JSON response received")
        else:
            raise Exception("No data received")

    def send_command(self, command_type: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
        """Send a command to Ableton and return the response"""
        if not self.sock and not self.connect():
            raise ConnectionError("Not connected to Ableton")

        command = {
            "type": command_type,
            "params": params or {}
        }

        # Check if this is a
        # state-modifying command
        is_modifying_command = command_type in [
            "create_midi_track", "create_audio_track", "set_track_name",
            "create_clip", "add_notes_to_clip", "set_clip_name",
            "set_tempo", "fire_clip", "stop_clip", "set_device_parameter",
            "start_playback", "stop_playback", "load_instrument_or_effect"
        ]

        try:
            logger.info(f"Sending command: {command_type} with params: {params}")

            # Send the command
            self.sock.sendall(json.dumps(command).encode('utf-8'))
            logger.info(f"Command sent, waiting for response...")

            # For state-modifying commands, add a small delay to give Ableton time to process
            if is_modifying_command:
                import time
                time.sleep(0.1)  # 100ms delay

            # Set timeout based on command type
            timeout = 15.0 if is_modifying_command else 10.0
            self.sock.settimeout(timeout)

            # Receive the response
            response_data = self.receive_full_response(self.sock)
            logger.info(f"Received {len(response_data)} bytes of data")

            # Parse the response
            response = json.loads(response_data.decode('utf-8'))
            logger.info(f"Response parsed, status: {response.get('status', 'unknown')}")

            if response.get("status") == "error":
                logger.error(f"Ableton error: {response.get('message')}")
                raise Exception(response.get("message", "Unknown error from Ableton"))

            # For state-modifying commands, add another small delay after receiving response
            if is_modifying_command:
                import time
                time.sleep(0.1)  # 100ms delay

            return response.get("result", {})
        except socket.timeout:
            logger.error("Socket timeout while waiting for response from Ableton")
            self.sock = None
            raise Exception("Timeout waiting for Ableton response")
        except (ConnectionError, BrokenPipeError, ConnectionResetError) as e:
            logger.error(f"Socket connection error: {str(e)}")
            self.sock = None
            raise Exception(f"Connection to Ableton lost: {str(e)}")
        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON response from Ableton: {str(e)}")
            if 'response_data' in locals() and response_data:
                logger.error(f"Raw response (first 200 bytes): {response_data[:200]}")
            self.sock = None
            raise Exception(f"Invalid response from Ableton: {str(e)}")
        except Exception as e:
            logger.error(f"Error communicating with Ableton: {str(e)}")
            self.sock = None
            raise Exception(f"Communication error with Ableton: {str(e)}")


@asynccontextmanager
async def server_lifespan(server: FastMCP) -> AsyncIterator[Dict[str, Any]]:
    """Manage server startup and shutdown lifecycle"""
    try:
        logger.info("AbletonMCP server starting up")

        try:
            ableton = get_ableton_connection()
            logger.info("Successfully connected to Ableton on startup")
        except Exception as e:
            logger.warning(f"Could not connect to Ableton on startup: {str(e)}")
            logger.warning("Make sure the Ableton Remote Script is running")

        yield {}
    finally:
        global _ableton_connection
        if _ableton_connection:
            logger.info("Disconnecting from Ableton on shutdown")
            _ableton_connection.disconnect()
            _ableton_connection = None
        logger.info("AbletonMCP server shut down")


# Create the MCP server with lifespan support
mcp = FastMCP(
    "AbletonMCP",
    description="Ableton Live integration through the Model Context Protocol",
    lifespan=server_lifespan
)

# Global connection for resources
_ableton_connection = None


def get_ableton_connection():
    """Get or create a persistent Ableton connection"""
    global _ableton_connection

    if _ableton_connection is not None:
        try:
            # Test the connection with a simple ping
            # We'll try to send an empty message, which should fail if the connection is dead
            # but won't affect Ableton if it's alive
            _ableton_connection.sock.settimeout(1.0)
            _ableton_connection.sock.sendall(b'')
            return _ableton_connection
        except Exception as e:
            logger.warning(f"Existing connection is no longer valid: {str(e)}")
            try:
                _ableton_connection.disconnect()
            except:
                pass
            _ableton_connection = None

    # Connection doesn't exist or is invalid, create a new one
    if _ableton_connection is None:
        # Try to connect up to 3 times with a short delay between attempts
        max_attempts = 3
        for attempt in range(1, max_attempts + 1):
            try:
                logger.info(f"Connecting to Ableton (attempt {attempt}/{max_attempts})...")
                _ableton_connection = AbletonConnection(host="localhost", port=9877)
                if _ableton_connection.connect():
                    logger.info("Created new persistent connection to Ableton")

                    # Validate connection with a simple command
                    try:
                        # Get session info as a test
                        _ableton_connection.send_command("get_session_info")
                        logger.info("Connection validated successfully")
                        return _ableton_connection
                    except Exception as e:
                        logger.error(f"Connection validation failed: {str(e)}")
                        _ableton_connection.disconnect()
                        _ableton_connection = None
                        # Continue to next attempt
                else:
                    _ableton_connection = None
            except Exception as e:
                logger.error(f"Connection attempt {attempt} failed: {str(e)}")
                if _ableton_connection:
                    _ableton_connection.disconnect()
                    _ableton_connection = None

            # Wait before trying again, but only if we have more attempts left
            if attempt < max_attempts:
                import time
                time.sleep(1.0)

        # If we get here, all connection attempts failed
        if _ableton_connection is None:
            logger.error("Failed to connect to Ableton after multiple attempts")
            raise Exception("Could not connect to Ableton. Make sure the Remote Script is running.")

    return _ableton_connection


# Core Tool endpoints

@mcp.tool()
def get_session_info(ctx: Context) -> str:
    """Get detailed information about the current Ableton session"""
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("get_session_info")
        return json.dumps(result, indent=2)
    except Exception as e:
        logger.error(f"Error getting session info from Ableton: {str(e)}")
        return f"Error getting session info: {str(e)}"


@mcp.tool()
def get_track_info(ctx: Context, track_index: int) -> str:
    """
    Get detailed information about a specific track in Ableton.

    Parameters:
    - track_index: The index of the track to get information about
    """
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("get_track_info", {"track_index": track_index})
        return json.dumps(result, indent=2)
    except Exception as e:
        logger.error(f"Error getting track info from Ableton: {str(e)}")
        return f"Error getting track info: {str(e)}"


@mcp.tool()
def create_midi_track(ctx: Context, index: int = -1) -> str:
    """
    Create a new MIDI track in the Ableton session.

    Parameters:
    - index: The index to insert the track at (-1 = end of list)
    """
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("create_midi_track", {"index": index})
        return f"Created new MIDI track: {result.get('name', 'unknown')}"
    except Exception as e:
        logger.error(f"Error creating MIDI track: {str(e)}")
        return f"Error creating MIDI track: {str(e)}"


@mcp.tool()
def set_track_name(ctx: Context, track_index: int, name: str) -> str:
    """
    Set the name of a track.

    Parameters:
    - track_index: The index of the track to rename
    - name: The new name for the track
    """
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("set_track_name", {"track_index": track_index, "name": name})
        return f"Renamed track to: {result.get('name', name)}"
    except Exception as e:
        logger.error(f"Error setting track name: {str(e)}")
        return f"Error setting track name: {str(e)}"


@mcp.tool()
def create_clip(ctx: Context, track_index: int, clip_index: int, length: float = 4.0) -> str:
    """
    Create a new MIDI clip in the specified track and clip slot.

    Parameters:
    - track_index: The index of the track to create the clip in
    - clip_index: The index of the clip slot to create the clip in
    - length: The length of the clip in beats (default: 4.0)
    """
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("create_clip", {
            "track_index": track_index,
            "clip_index": clip_index,
            "length": length
        })
        return f"Created new clip at track {track_index}, slot {clip_index} with length {length} beats"
    except Exception as e:
        logger.error(f"Error creating clip: {str(e)}")
        return f"Error creating clip: {str(e)}"


@mcp.tool()
def add_notes_to_clip(
    ctx: Context,
    track_index: int,
    clip_index: int,
    notes: List[Dict[str, Union[int, float, bool]]]
) -> str:
    """
    Add MIDI notes to a clip.

    Parameters:
    - track_index: The index of the track containing the clip
    - clip_index: The index of the clip slot containing the clip
    - notes: List of note dictionaries, each with pitch, start_time, duration, velocity, and mute
    """
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("add_notes_to_clip", {
            "track_index": track_index,
            "clip_index": clip_index,
            "notes": notes
        })
        return f"Added {len(notes)} notes to clip at track {track_index}, slot {clip_index}"
    except Exception as e:
        logger.error(f"Error adding notes to clip: {str(e)}")
        return f"Error adding notes to clip: {str(e)}"


@mcp.tool()
def set_clip_name(ctx: Context, track_index: int, clip_index: int, name: str) -> str:
    """
    Set the name of a clip.

    Parameters:
    - track_index: The index of the track containing the clip
    - clip_index: The index of the clip slot containing the clip
    - name: The new name for the clip
    """
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("set_clip_name", {
            "track_index": track_index,
            "clip_index": clip_index,
            "name": name
        })
        return f"Renamed clip at track {track_index}, slot {clip_index} to '{name}'"
    except Exception as e:
        logger.error(f"Error setting clip name: {str(e)}")
        return f"Error setting clip name: {str(e)}"


@mcp.tool()
def set_tempo(ctx: Context, tempo: float) -> str:
    """
    Set the tempo of the Ableton session.

    Parameters:
    - tempo: The new tempo in BPM
    """
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("set_tempo", {"tempo": tempo})
        return f"Set tempo to {tempo} BPM"
    except Exception as e:
        logger.error(f"Error setting tempo: {str(e)}")
        return f"Error setting tempo: {str(e)}"


@mcp.tool()
def load_instrument_or_effect(ctx: Context, track_index: int, uri: str) -> str:
    """
    Load an instrument or effect onto a track using its URI.

    Parameters:
    - track_index: The index of the track to load the instrument on
    - uri: The URI of the instrument or effect to load (e.g., 'query:Synths#Instrument%20Rack:Bass:FileId_5116')
    """
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("load_browser_item", {
            "track_index": track_index,
            "item_uri": uri
        })

        # Check if the instrument was loaded successfully
        if result.get("loaded", False):
            new_devices = result.get("new_devices", [])
            if new_devices:
                return f"Loaded instrument with URI '{uri}' on track {track_index}. New devices: {', '.join(new_devices)}"
            else:
                devices = result.get("devices_after", [])
                return f"Loaded instrument with URI '{uri}' on track {track_index}. Devices on track: {', '.join(devices)}"
        else:
            return f"Failed to load instrument with URI '{uri}'"
    except Exception as e:
        logger.error(f"Error loading instrument by URI: {str(e)}")
        return f"Error loading instrument by URI: {str(e)}"


@mcp.tool()
def fire_clip(ctx: Context, track_index: int, clip_index: int) -> str:
    """
    Start playing a clip.

    Parameters:
    - track_index: The index of the track containing the clip
    - clip_index: The index of the clip slot containing the clip
    """
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("fire_clip", {
            "track_index": track_index,
            "clip_index": clip_index
        })
        return f"Started playing clip at track {track_index}, slot {clip_index}"
    except Exception as e:
        logger.error(f"Error firing clip: {str(e)}")
        return f"Error firing clip: {str(e)}"


@mcp.tool()
def stop_clip(ctx: Context, track_index: int, clip_index: int) -> str:
    """
    Stop playing a clip.

    Parameters:
    - track_index: The index of the track containing the clip
    - clip_index: The index of the clip slot containing the clip
    """
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("stop_clip", {
            "track_index": track_index,
            "clip_index": clip_index
        })
        return f"Stopped clip at track {track_index}, slot {clip_index}"
    except Exception as e:
        logger.error(f"Error stopping clip: {str(e)}")
        return f"Error stopping clip: {str(e)}"


@mcp.tool()
def start_playback(ctx: Context) -> str:
    """Start playing the Ableton session."""
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("start_playback")
        return "Started playback"
    except Exception as e:
        logger.error(f"Error starting playback: {str(e)}")
        return f"Error starting playback: {str(e)}"


@mcp.tool()
def stop_playback(ctx: Context) -> str:
    """Stop playing the Ableton session."""
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("stop_playback")
        return "Stopped playback"
    except Exception as e:
        logger.error(f"Error stopping playback: {str(e)}")
        return f"Error stopping playback: {str(e)}"


@mcp.tool()
def get_browser_tree(ctx: Context, category_type: str = "all") -> str:
    """
    Get a hierarchical tree of browser categories from Ableton.

    Parameters:
    - category_type: Type of categories to get ('all', 'instruments', 'sounds', 'drums', 'audio_effects', 'midi_effects')
    """
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("get_browser_tree", {
            "category_type": category_type
        })

        # Check if we got any categories
        if "available_categories" in result and len(result.get("categories", [])) == 0:
            available_cats = result.get("available_categories", [])
            return (f"No categories found for '{category_type}'. "
                    f"Available browser categories: {', '.join(available_cats)}")

        # Format the tree in a more readable way
        total_folders = result.get("total_folders", 0)
        formatted_output = f"Browser tree for '{category_type}' (showing {total_folders} folders):\n\n"

        def format_tree(item, indent=0):
            output = ""
            if item:
                prefix = " " * indent
                name = item.get("name", "Unknown")
                path = item.get("path", "")
                has_more = item.get("has_more", False)

                # Add this item
                output += f"{prefix}• {name}"
                if path:
                    output += f" (path: {path})"
                if has_more:
                    output += " [...]"
                output += "\n"

                # Add children
                for child in item.get("children", []):
                    output += format_tree(child, indent + 1)
            return output

        # Format each category
        for category in result.get("categories", []):
            formatted_output += format_tree(category)
            formatted_output += "\n"

        return formatted_output
    except Exception as e:
        error_msg = str(e)
        if "Browser is not available" in error_msg:
            logger.error(f"Browser is not available in Ableton: {error_msg}")
            return f"Error: The Ableton browser is not available. Make sure Ableton Live is fully loaded and try again."
        elif "Could not access Live application" in error_msg:
            logger.error(f"Could not access Live application: {error_msg}")
            return f"Error: Could not access the Ableton Live application. Make sure Ableton Live is running and the Remote Script is loaded."
        else:
            logger.error(f"Error getting browser tree: {error_msg}")
            return f"Error getting browser tree: {error_msg}"


@mcp.tool()
def get_browser_items_at_path(ctx: Context, path: str) -> str:
    """
    Get browser items at a specific path in Ableton's browser.

    Parameters:
    - path: Path in the format "category/folder/subfolder"
            where category is one of the available browser categories in Ableton
    """
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("get_browser_items_at_path", {
            "path": path
        })

        # Check if there was an error with available categories
        if "error" in result and "available_categories" in result:
            error = result.get("error", "")
            available_cats = result.get("available_categories", [])
            return (f"Error: {error}\n"
                    f"Available browser categories: {', '.join(available_cats)}")

        return json.dumps(result, indent=2)
    except Exception as e:
        error_msg = str(e)
        if "Browser is not available" in error_msg:
            logger.error(f"Browser is not available in Ableton: {error_msg}")
            return f"Error: The Ableton browser is not available. Make sure Ableton Live is fully loaded and try again."
        elif "Could not access Live application" in error_msg:
            logger.error(f"Could not access Live application: {error_msg}")
            return f"Error: Could not access the Ableton Live application. Make sure Ableton Live is running and the Remote Script is loaded."
        elif "Unknown or unavailable category" in error_msg:
            logger.error(f"Invalid browser category: {error_msg}")
            return f"Error: {error_msg}. Please check the available categories using get_browser_tree."
        elif "Path part" in error_msg and "not found" in error_msg:
            logger.error(f"Path not found: {error_msg}")
            return f"Error: {error_msg}. Please check the path and try again."
        else:
            logger.error(f"Error getting browser items at path: {error_msg}")
            return f"Error getting browser items at path: {error_msg}"


@mcp.tool()
def load_drum_kit(ctx: Context, track_index: int, rack_uri: str, kit_path: str) -> str:
    """
    Load a drum rack and then load a specific drum kit into it.

    Parameters:
    - track_index: The index of the track to load on
    - rack_uri: The URI of the drum rack to load (e.g., 'Drums/Drum Rack')
    - kit_path: Path to the drum kit inside the browser (e.g., 'drums/acoustic/kit1')
    """
    try:
        ableton = get_ableton_connection()

        # Step 1: Load the drum rack
        result = ableton.send_command("load_browser_item", {
            "track_index": track_index,
            "item_uri": rack_uri
        })

        if not result.get("loaded", False):
            return f"Failed to load drum rack with URI '{rack_uri}'"

        # Step 2: Get the drum kit items at the specified path
        kit_result = ableton.send_command("get_browser_items_at_path", {
            "path": kit_path
        })

        if "error" in kit_result:
            return f"Loaded drum rack but failed to find drum kit: {kit_result.get('error')}"

        # Step 3: Find a loadable drum kit
        kit_items = kit_result.get("items", [])
        loadable_kits = [item for item in kit_items if item.get("is_loadable", False)]

        if not loadable_kits:
            return f"Loaded drum rack but no loadable drum kits found at '{kit_path}'"

        # Step 4: Load the first loadable kit
        kit_uri = loadable_kits[0].get("uri")
        load_result = ableton.send_command("load_browser_item", {
            "track_index": track_index,
            "item_uri": kit_uri
        })

        return f"Loaded drum rack and kit '{loadable_kits[0].get('name')}' on track {track_index}"
    except Exception as e:
        logger.error(f"Error loading drum kit: {str(e)}")
        return f"Error loading drum kit: {str(e)}"


# Main execution
def main():
    """Run the MCP server"""
    mcp.run()


if __name__ == "__main__":
    main()
```

--------------------------------------------------------------------------------
/AbletonMCP_Remote_Script/__init__.py:
--------------------------------------------------------------------------------

```python
#
# AbletonMCP/init.py
from __future__ import absolute_import, print_function, unicode_literals

from _Framework.ControlSurface import ControlSurface
import socket
import json
import threading
import time
import traceback

# Change queue import for Python 2
try:
    import Queue as queue  # Python 2
except ImportError:
    import queue  # Python 3

# Constants for socket communication
DEFAULT_PORT = 9877
HOST = "localhost"


def create_instance(c_instance):
    """Create and return the AbletonMCP script instance"""
    return AbletonMCP(c_instance)


class AbletonMCP(ControlSurface):
    """AbletonMCP Remote Script for Ableton Live"""

    def __init__(self, c_instance):
        """Initialize the control surface"""
        ControlSurface.__init__(self, c_instance)
        self.log_message("AbletonMCP Remote Script initializing...")

        # Socket server for communication
        self.server = None
        self.client_threads = []
        self.server_thread = None
        self.running = False

        # Cache the song reference for easier access
        self._song = self.song()

        # Start the socket server
        self.start_server()

        self.log_message("AbletonMCP initialized")

        # Show a message in Ableton
        self.show_message("AbletonMCP: Listening for commands on port " + str(DEFAULT_PORT))

    def disconnect(self):
        """Called when Ableton closes or the control surface is removed"""
        self.log_message("AbletonMCP disconnecting...")
        self.running = False

        # Stop the server
        if self.server:
            try:
                self.server.close()
            except:
                pass

        # Wait for the server thread to exit
        if self.server_thread and self.server_thread.is_alive():
            self.server_thread.join(1.0)

        # Clean up any client threads
        for client_thread in self.client_threads[:]:
            if client_thread.is_alive():
                # We don't join them as they might be stuck
                self.log_message("Client thread still alive during disconnect")

        ControlSurface.disconnect(self)
        self.log_message("AbletonMCP disconnected")

    def start_server(self):
        """Start the socket server in a separate thread"""
        try:
            self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server.bind((HOST, DEFAULT_PORT)) self.server.listen(5) # Allow up to 5 pending connections self.running = True self.server_thread = threading.Thread(target=self._server_thread) self.server_thread.daemon = True self.server_thread.start() self.log_message("Server started on port " + str(DEFAULT_PORT)) except Exception as e: self.log_message("Error starting server: " + str(e)) self.show_message("AbletonMCP: Error starting server - " + str(e)) def _server_thread(self): """Server thread implementation - handles client connections""" try: self.log_message("Server thread started") # Set a timeout to allow regular checking of running flag self.server.settimeout(1.0) while self.running: try: # Accept connections with timeout client, address = self.server.accept() self.log_message("Connection accepted from " + str(address)) self.show_message("AbletonMCP: Client connected") # Handle client in a separate thread client_thread = threading.Thread( target=self._handle_client, args=(client,) ) client_thread.daemon = True client_thread.start() # Keep track of client threads self.client_threads.append(client_thread) # Clean up finished client threads self.client_threads = [t for t in self.client_threads if t.is_alive()] except socket.timeout: # No connection yet, just continue continue except Exception as e: if self.running: # Only log if still running self.log_message("Server accept error: " + str(e)) time.sleep(0.5) self.log_message("Server thread stopped") except Exception as e: self.log_message("Server thread error: " + str(e)) def _handle_client(self, client): """Handle communication with a connected client""" self.log_message("Client handler started") client.settimeout(None) # No timeout for client socket buffer = '' # Changed from b'' to '' for Python 2 try: while self.running: try: # Receive data data = client.recv(8192) if not data: # Client disconnected self.log_message("Client disconnected") break # 
Accumulate data in buffer with explicit encoding/decoding try: # Python 3: data is bytes, decode to string buffer += data.decode('utf-8') except AttributeError: # Python 2: data is already string buffer += data try: # Try to parse command from buffer command = json.loads(buffer) # Removed decode('utf-8') buffer = '' # Clear buffer after successful parse self.log_message("Received command: " + str(command.get("type", "unknown"))) # Process the command and get response response = self._process_command(command) # Send the response with explicit encoding try: # Python 3: encode string to bytes client.sendall(json.dumps(response).encode('utf-8')) except AttributeError: # Python 2: string is already bytes client.sendall(json.dumps(response)) except ValueError: # Incomplete data, wait for more continue except Exception as e: self.log_message("Error handling client data: " + str(e)) self.log_message(traceback.format_exc()) # Send error response if possible error_response = { "status": "error", "message": str(e) } try: # Python 3: encode string to bytes client.sendall(json.dumps(error_response).encode('utf-8')) except AttributeError: # Python 2: string is already bytes client.sendall(json.dumps(error_response)) except: # If we can't send the error, the connection is probably dead break # For serious errors, break the loop if not isinstance(e, ValueError): break except Exception as e: self.log_message("Error in client handler: " + str(e)) finally: try: client.close() except: pass self.log_message("Client handler stopped") def _process_command(self, command): """Process a command from the client and return a response""" command_type = command.get("type", "") params = command.get("params", {}) # Initialize response response = { "status": "success", "result": {} } try: # Route the command to the appropriate handler if command_type == "get_session_info": response["result"] = self._get_session_info() elif command_type == "get_track_info": track_index = params.get("track_index", 
0) response["result"] = self._get_track_info(track_index) # Commands that modify Live's state should be scheduled on the main thread elif command_type in ["create_midi_track", "set_track_name", "create_clip", "add_notes_to_clip", "set_clip_name", "set_tempo", "fire_clip", "stop_clip", "start_playback", "stop_playback", "load_browser_item"]: # Use a thread-safe approach with a response queue response_queue = queue.Queue() # Define a function to execute on the main thread def main_thread_task(): try: result = None if command_type == "create_midi_track": index = params.get("index", -1) result = self._create_midi_track(index) elif command_type == "set_track_name": track_index = params.get("track_index", 0) name = params.get("name", "") result = self._set_track_name(track_index, name) elif command_type == "create_clip": track_index = params.get("track_index", 0) clip_index = params.get("clip_index", 0) length = params.get("length", 4.0) result = self._create_clip(track_index, clip_index, length) elif command_type == "add_notes_to_clip": track_index = params.get("track_index", 0) clip_index = params.get("clip_index", 0) notes = params.get("notes", []) result = self._add_notes_to_clip(track_index, clip_index, notes) elif command_type == "set_clip_name": track_index = params.get("track_index", 0) clip_index = params.get("clip_index", 0) name = params.get("name", "") result = self._set_clip_name(track_index, clip_index, name) elif command_type == "set_tempo": tempo = params.get("tempo", 120.0) result = self._set_tempo(tempo) elif command_type == "fire_clip": track_index = params.get("track_index", 0) clip_index = params.get("clip_index", 0) result = self._fire_clip(track_index, clip_index) elif command_type == "stop_clip": track_index = params.get("track_index", 0) clip_index = params.get("clip_index", 0) result = self._stop_clip(track_index, clip_index) elif command_type == "start_playback": result = self._start_playback() elif command_type == "stop_playback": result = 
self._stop_playback() elif command_type == "load_instrument_or_effect": track_index = params.get("track_index", 0) uri = params.get("uri", "") result = self._load_instrument_or_effect(track_index, uri) elif command_type == "load_browser_item": track_index = params.get("track_index", 0) item_uri = params.get("item_uri", "") result = self._load_browser_item(track_index, item_uri) # Put the result in the queue response_queue.put({"status": "success", "result": result}) except Exception as e: self.log_message("Error in main thread task: " + str(e)) self.log_message(traceback.format_exc()) response_queue.put({"status": "error", "message": str(e)}) # Schedule the task to run on the main thread try: self.schedule_message(0, main_thread_task) except AssertionError: # If we're already on the main thread, execute directly main_thread_task() # Wait for the response with a timeout try: task_response = response_queue.get(timeout=10.0) if task_response.get("status") == "error": response["status"] = "error" response["message"] = task_response.get("message", "Unknown error") else: response["result"] = task_response.get("result", {}) except queue.Empty: response["status"] = "error" response["message"] = "Timeout waiting for operation to complete" elif command_type == "get_browser_item": uri = params.get("uri", None) path = params.get("path", None) response["result"] = self._get_browser_item(uri, path) elif command_type == "get_browser_categories": category_type = params.get("category_type", "all") response["result"] = self._get_browser_categories(category_type) elif command_type == "get_browser_items": path = params.get("path", "") item_type = params.get("item_type", "all") response["result"] = self._get_browser_items(path, item_type) # Add the new browser commands elif command_type == "get_browser_tree": category_type = params.get("category_type", "all") response["result"] = self.get_browser_tree(category_type) elif command_type == "get_browser_items_at_path": path = 
params.get("path", "") response["result"] = self.get_browser_items_at_path(path) else: response["status"] = "error" response["message"] = "Unknown command: " + command_type except Exception as e: self.log_message("Error processing command: " + str(e)) self.log_message(traceback.format_exc()) response["status"] = "error" response["message"] = str(e) return response # Command implementations def _get_session_info(self): """Get information about the current session""" try: result = { "tempo": self._song.tempo, "signature_numerator": self._song.signature_numerator, "signature_denominator": self._song.signature_denominator, "track_count": len(self._song.tracks), "return_track_count": len(self._song.return_tracks), "master_track": { "name": "Master", "volume": self._song.master_track.mixer_device.volume.value, "panning": self._song.master_track.mixer_device.panning.value } } return result except Exception as e: self.log_message("Error getting session info: " + str(e)) raise def _get_track_info(self, track_index): """Get information about a track""" try: if track_index < 0 or track_index >= len(self._song.tracks): raise IndexError("Track index out of range") track = self._song.tracks[track_index] # Get clip slots clip_slots = [] for slot_index, slot in enumerate(track.clip_slots): clip_info = None if slot.has_clip: clip = slot.clip clip_info = { "name": clip.name, "length": clip.length, "is_playing": clip.is_playing, "is_recording": clip.is_recording } clip_slots.append({ "index": slot_index, "has_clip": slot.has_clip, "clip": clip_info }) # Get devices devices = [] for device_index, device in enumerate(track.devices): devices.append({ "index": device_index, "name": device.name, "class_name": device.class_name, "type": self._get_device_type(device) }) result = { "index": track_index, "name": track.name, "is_audio_track": track.has_audio_input, "is_midi_track": track.has_midi_input, "mute": track.mute, "solo": track.solo, "arm": track.arm, "volume": 
track.mixer_device.volume.value, "panning": track.mixer_device.panning.value, "clip_slots": clip_slots, "devices": devices } return result except Exception as e: self.log_message("Error getting track info: " + str(e)) raise def _create_midi_track(self, index): """Create a new MIDI track at the specified index""" try: # Create the track self._song.create_midi_track(index) # Get the new track new_track_index = len(self._song.tracks) - 1 if index == -1 else index new_track = self._song.tracks[new_track_index] result = { "index": new_track_index, "name": new_track.name } return result except Exception as e: self.log_message("Error creating MIDI track: " + str(e)) raise def _set_track_name(self, track_index, name): """Set the name of a track""" try: if track_index < 0 or track_index >= len(self._song.tracks): raise IndexError("Track index out of range") # Set the name track = self._song.tracks[track_index] track.name = name result = { "name": track.name } return result except Exception as e: self.log_message("Error setting track name: " + str(e)) raise def _create_clip(self, track_index, clip_index, length): """Create a new MIDI clip in the specified track and clip slot""" try: if track_index < 0 or track_index >= len(self._song.tracks): raise IndexError("Track index out of range") track = self._song.tracks[track_index] if clip_index < 0 or clip_index >= len(track.clip_slots): raise IndexError("Clip index out of range") clip_slot = track.clip_slots[clip_index] # Check if the clip slot already has a clip if clip_slot.has_clip: raise Exception("Clip slot already has a clip") # Create the clip clip_slot.create_clip(length) result = { "name": clip_slot.clip.name, "length": clip_slot.clip.length } return result except Exception as e: self.log_message("Error creating clip: " + str(e)) raise def _add_notes_to_clip(self, track_index, clip_index, notes): """Add MIDI notes to a clip""" try: if track_index < 0 or track_index >= len(self._song.tracks): raise IndexError("Track index 
out of range") track = self._song.tracks[track_index] if clip_index < 0 or clip_index >= len(track.clip_slots): raise IndexError("Clip index out of range") clip_slot = track.clip_slots[clip_index] if not clip_slot.has_clip: raise Exception("No clip in slot") clip = clip_slot.clip # Convert note data to Live's format live_notes = [] for note in notes: pitch = note.get("pitch", 60) start_time = note.get("start_time", 0.0) duration = note.get("duration", 0.25) velocity = note.get("velocity", 100) mute = note.get("mute", False) live_notes.append((pitch, start_time, duration, velocity, mute)) # Add the notes clip.set_notes(tuple(live_notes)) result = { "note_count": len(notes) } return result except Exception as e: self.log_message("Error adding notes to clip: " + str(e)) raise def _set_clip_name(self, track_index, clip_index, name): """Set the name of a clip""" try: if track_index < 0 or track_index >= len(self._song.tracks): raise IndexError("Track index out of range") track = self._song.tracks[track_index] if clip_index < 0 or clip_index >= len(track.clip_slots): raise IndexError("Clip index out of range") clip_slot = track.clip_slots[clip_index] if not clip_slot.has_clip: raise Exception("No clip in slot") clip = clip_slot.clip clip.name = name result = { "name": clip.name } return result except Exception as e: self.log_message("Error setting clip name: " + str(e)) raise def _set_tempo(self, tempo): """Set the tempo of the session""" try: self._song.tempo = tempo result = { "tempo": self._song.tempo } return result except Exception as e: self.log_message("Error setting tempo: " + str(e)) raise def _fire_clip(self, track_index, clip_index): """Fire a clip""" try: if track_index < 0 or track_index >= len(self._song.tracks): raise IndexError("Track index out of range") track = self._song.tracks[track_index] if clip_index < 0 or clip_index >= len(track.clip_slots): raise IndexError("Clip index out of range") clip_slot = track.clip_slots[clip_index] if not 
clip_slot.has_clip: raise Exception("No clip in slot") clip_slot.fire() result = { "fired": True } return result except Exception as e: self.log_message("Error firing clip: " + str(e)) raise def _stop_clip(self, track_index, clip_index): """Stop a clip""" try: if track_index < 0 or track_index >= len(self._song.tracks): raise IndexError("Track index out of range") track = self._song.tracks[track_index] if clip_index < 0 or clip_index >= len(track.clip_slots): raise IndexError("Clip index out of range") clip_slot = track.clip_slots[clip_index] clip_slot.stop() result = { "stopped": True } return result except Exception as e: self.log_message("Error stopping clip: " + str(e)) raise def _start_playback(self): """Start playing the session""" try: self._song.start_playing() result = { "playing": self._song.is_playing } return result except Exception as e: self.log_message("Error starting playback: " + str(e)) raise def _stop_playback(self): """Stop playing the session""" try: self._song.stop_playing() result = { "playing": self._song.is_playing } return result except Exception as e: self.log_message("Error stopping playback: " + str(e)) raise def _get_browser_item(self, uri, path): """Get a browser item by URI or path""" try: # Access the application's browser instance instead of creating a new one app = self.application() if not app: raise RuntimeError("Could not access Live application") result = { "uri": uri, "path": path, "found": False } # Try to find by URI first if provided if uri: item = self._find_browser_item_by_uri(app.browser, uri) if item: result["found"] = True result["item"] = { "name": item.name, "is_folder": item.is_folder, "is_device": item.is_device, "is_loadable": item.is_loadable, "uri": item.uri } return result # If URI not provided or not found, try by path if path: # Parse the path and navigate to the specified item path_parts = path.split("/") # Determine the root based on the first part current_item = None if path_parts[0].lower() == 
"instruments": current_item = app.browser.instruments elif path_parts[0].lower() == "sounds": current_item = app.browser.sounds elif path_parts[0].lower() == "drums": current_item = app.browser.drums elif path_parts[0].lower() == "audio_effects": current_item = app.browser.audio_effects elif path_parts[0].lower() == "midi_effects": current_item = app.browser.midi_effects else: # Default to instruments if not specified current_item = app.browser.instruments # Don't skip the first part in this case path_parts = ["instruments"] + path_parts # Navigate through the path for i in range(1, len(path_parts)): part = path_parts[i] if not part: # Skip empty parts continue found = False for child in current_item.children: if child.name.lower() == part.lower(): current_item = child found = True break if not found: result["error"] = "Path part '{0}' not found".format(part) return result # Found the item result["found"] = True result["item"] = { "name": current_item.name, "is_folder": current_item.is_folder, "is_device": current_item.is_device, "is_loadable": current_item.is_loadable, "uri": current_item.uri } return result except Exception as e: self.log_message("Error getting browser item: " + str(e)) self.log_message(traceback.format_exc()) raise def _load_browser_item(self, track_index, item_uri): """Load a browser item onto a track by its URI""" try: if track_index < 0 or track_index >= len(self._song.tracks): raise IndexError("Track index out of range") track = self._song.tracks[track_index] # Access the application's browser instance instead of creating a new one app = self.application() # Find the browser item by URI item = self._find_browser_item_by_uri(app.browser, item_uri) if not item: raise ValueError("Browser item with URI '{0}' not found".format(item_uri)) # Select the track self._song.view.selected_track = track # Load the item app.browser.load_item(item) result = { "loaded": True, "item_name": item.name, "track_name": track.name, "uri": item_uri } return result 
except Exception as e: self.log_message("Error loading browser item: {0}".format(str(e))) self.log_message(traceback.format_exc()) raise def _find_browser_item_by_uri(self, browser_or_item, uri, max_depth=10, current_depth=0): """Find a browser item by its URI""" try: # Check if this is the item we're looking for if hasattr(browser_or_item, 'uri') and browser_or_item.uri == uri: return browser_or_item # Stop recursion if we've reached max depth if current_depth >= max_depth: return None # Check if this is a browser with root categories if hasattr(browser_or_item, 'instruments'): # Check all main categories categories = [ browser_or_item.instruments, browser_or_item.sounds, browser_or_item.drums, browser_or_item.audio_effects, browser_or_item.midi_effects ] for category in categories: item = self._find_browser_item_by_uri(category, uri, max_depth, current_depth + 1) if item: return item return None # Check if this item has children if hasattr(browser_or_item, 'children') and browser_or_item.children: for child in browser_or_item.children: item = self._find_browser_item_by_uri(child, uri, max_depth, current_depth + 1) if item: return item return None except Exception as e: self.log_message("Error finding browser item by URI: {0}".format(str(e))) return None # Helper methods def _get_device_type(self, device): """Get the type of a device""" try: # Simple heuristic - in a real implementation you'd look at the device class if device.can_have_drum_pads: return "drum_machine" elif device.can_have_chains: return "rack" elif "instrument" in device.class_display_name.lower(): return "instrument" elif "audio_effect" in device.class_name.lower(): return "audio_effect" elif "midi_effect" in device.class_name.lower(): return "midi_effect" else: return "unknown" except: return "unknown" def get_browser_tree(self, category_type="all"): """ Get a simplified tree of browser categories. Args: category_type: Type of categories to get ('all', 'instruments', 'sounds', etc.) 
Returns: Dictionary with the browser tree structure """ try: # Access the application's browser instance instead of creating a new one app = self.application() if not app: raise RuntimeError("Could not access Live application") # Check if browser is available if not hasattr(app, 'browser') or app.browser is None: raise RuntimeError("Browser is not available in the Live application") # Log available browser attributes to help diagnose issues browser_attrs = [attr for attr in dir(app.browser) if not attr.startswith('_')] self.log_message("Available browser attributes: {0}".format(browser_attrs)) result = { "type": category_type, "categories": [], "available_categories": browser_attrs } # Helper function to process a browser item and its children def process_item(item, depth=0): if not item: return None result = { "name": item.name if hasattr(item, 'name') else "Unknown", "is_folder": hasattr(item, 'children') and bool(item.children), "is_device": hasattr(item, 'is_device') and item.is_device, "is_loadable": hasattr(item, 'is_loadable') and item.is_loadable, "uri": item.uri if hasattr(item, 'uri') else None, "children": [] } return result # Process based on category type and available attributes if (category_type == "all" or category_type == "instruments") and hasattr(app.browser, 'instruments'): try: instruments = process_item(app.browser.instruments) if instruments: instruments["name"] = "Instruments" # Ensure consistent naming result["categories"].append(instruments) except Exception as e: self.log_message("Error processing instruments: {0}".format(str(e))) if (category_type == "all" or category_type == "sounds") and hasattr(app.browser, 'sounds'): try: sounds = process_item(app.browser.sounds) if sounds: sounds["name"] = "Sounds" # Ensure consistent naming result["categories"].append(sounds) except Exception as e: self.log_message("Error processing sounds: {0}".format(str(e))) if (category_type == "all" or category_type == "drums") and hasattr(app.browser, 
'drums'): try: drums = process_item(app.browser.drums) if drums: drums["name"] = "Drums" # Ensure consistent naming result["categories"].append(drums) except Exception as e: self.log_message("Error processing drums: {0}".format(str(e))) if (category_type == "all" or category_type == "audio_effects") and hasattr(app.browser, 'audio_effects'): try: audio_effects = process_item(app.browser.audio_effects) if audio_effects: audio_effects["name"] = "Audio Effects" # Ensure consistent naming result["categories"].append(audio_effects) except Exception as e: self.log_message("Error processing audio_effects: {0}".format(str(e))) if (category_type == "all" or category_type == "midi_effects") and hasattr(app.browser, 'midi_effects'): try: midi_effects = process_item(app.browser.midi_effects) if midi_effects: midi_effects["name"] = "MIDI Effects" result["categories"].append(midi_effects) except Exception as e: self.log_message("Error processing midi_effects: {0}".format(str(e))) # Try to process other potentially available categories for attr in browser_attrs: if attr not in ['instruments', 'sounds', 'drums', 'audio_effects', 'midi_effects'] and \ (category_type == "all" or category_type == attr): try: item = getattr(app.browser, attr) if hasattr(item, 'children') or hasattr(item, 'name'): category = process_item(item) if category: category["name"] = attr.capitalize() result["categories"].append(category) except Exception as e: self.log_message("Error processing {0}: {1}".format(attr, str(e))) self.log_message("Browser tree generated for {0} with {1} root categories".format( category_type, len(result['categories']))) return result except Exception as e: self.log_message("Error getting browser tree: {0}".format(str(e))) self.log_message(traceback.format_exc()) raise def get_browser_items_at_path(self, path): """ Get browser items at a specific path. 
Args: path: Path in the format "category/folder/subfolder" where category is one of: instruments, sounds, drums, audio_effects, midi_effects or any other available browser category Returns: Dictionary with items at the specified path """ try: # Access the application's browser instance instead of creating a new one app = self.application() if not app: raise RuntimeError("Could not access Live application") # Check if browser is available if not hasattr(app, 'browser') or app.browser is None: raise RuntimeError("Browser is not available in the Live application") # Log available browser attributes to help diagnose issues browser_attrs = [attr for attr in dir(app.browser) if not attr.startswith('_')] self.log_message("Available browser attributes: {0}".format(browser_attrs)) # Parse the path path_parts = path.split("/") if not path_parts: raise ValueError("Invalid path") # Determine the root category root_category = path_parts[0].lower() current_item = None # Check standard categories first if root_category == "instruments" and hasattr(app.browser, 'instruments'): current_item = app.browser.instruments elif root_category == "sounds" and hasattr(app.browser, 'sounds'): current_item = app.browser.sounds elif root_category == "drums" and hasattr(app.browser, 'drums'): current_item = app.browser.drums elif root_category == "audio_effects" and hasattr(app.browser, 'audio_effects'): current_item = app.browser.audio_effects elif root_category == "midi_effects" and hasattr(app.browser, 'midi_effects'): current_item = app.browser.midi_effects else: # Try to find the category in other browser attributes found = False for attr in browser_attrs: if attr.lower() == root_category: try: current_item = getattr(app.browser, attr) found = True break except Exception as e: self.log_message("Error accessing browser attribute {0}: {1}".format(attr, str(e))) if not found: # If we still haven't found the category, return available categories return { "path": path, "error": "Unknown or 
unavailable category: {0}".format(root_category), "available_categories": browser_attrs, "items": [] } # Navigate through the path for i in range(1, len(path_parts)): part = path_parts[i] if not part: # Skip empty parts continue if not hasattr(current_item, 'children'): return { "path": path, "error": "Item at '{0}' has no children".format('/'.join(path_parts[:i])), "items": [] } found = False for child in current_item.children: if hasattr(child, 'name') and child.name.lower() == part.lower(): current_item = child found = True break if not found: return { "path": path, "error": "Path part '{0}' not found".format(part), "items": [] } # Get items at the current path items = [] if hasattr(current_item, 'children'): for child in current_item.children: item_info = { "name": child.name if hasattr(child, 'name') else "Unknown", "is_folder": hasattr(child, 'children') and bool(child.children), "is_device": hasattr(child, 'is_device') and child.is_device, "is_loadable": hasattr(child, 'is_loadable') and child.is_loadable, "uri": child.uri if hasattr(child, 'uri') else None } items.append(item_info) result = { "path": path, "name": current_item.name if hasattr(current_item, 'name') else "Unknown", "uri": current_item.uri if hasattr(current_item, 'uri') else None, "is_folder": hasattr(current_item, 'children') and bool(current_item.children), "is_device": hasattr(current_item, 'is_device') and current_item.is_device, "is_loadable": hasattr(current_item, 'is_loadable') and current_item.is_loadable, "items": items } self.log_message("Retrieved {0} items at path: {1}".format(len(items), path)) return result except Exception as e: self.log_message("Error getting browser items at path: {0}".format(str(e))) self.log_message(traceback.format_exc()) raise ```
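
The Remote Script above defines its wire format only implicitly: a client sends one JSON object of the form `{"type": ..., "params": ...}`, and `_handle_client` keeps appending received data to a buffer until `json.loads` succeeds. The following is a minimal sketch of that exchange and is not part of the repository. `build_command` and `feed_chunk` are hypothetical helper names introduced purely for illustration, and the sketch assumes ASCII-only payloads (the default output of `json.dumps`), so a chunk never splits a multi-byte character.

```python
import json


def build_command(command_type, params=None):
    """Encode a command the way the Remote Script expects to receive it.

    Hypothetical helper: the name and signature are assumptions for this sketch.
    """
    message = {"type": command_type, "params": params or {}}
    return json.dumps(message).encode("utf-8")


def feed_chunk(buffer, chunk):
    """Append one received chunk and try to parse the accumulated buffer.

    Returns (command, new_buffer). The command is None while the JSON is
    still incomplete, mirroring the `except ValueError: continue` branch
    in `_handle_client`.
    """
    buffer += chunk.decode("utf-8")
    try:
        command = json.loads(buffer)
    except ValueError:
        # Incomplete JSON so far: keep the buffer and wait for more data.
        return None, buffer
    # A full command was parsed: clear the buffer for the next one.
    return command, ""


# Simulate a command arriving in two separate recv() chunks.
payload = build_command("set_tempo", {"tempo": 128.0})
buffer = ""
first, buffer = feed_chunk(buffer, payload[:10])
second, buffer = feed_chunk(buffer, payload[10:])
```

Because the whole buffer is parsed as a single JSON document, two commands that arrive back to back in the same buffer would not parse (`json.loads` rejects trailing data), so this scheme appears to assume one request per response round trip.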