# Directory Structure
```
├── .gitignore
├── .python-version
├── AbletonMCP_Remote_Script
│   └── __init__.py
├── Dockerfile
├── LICENSE
├── MCP_Server
│   ├── __init__.py
│   └── server.py
├── pyproject.toml
├── README.md
├── smithery.yaml
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
3.13
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
.DS_Store
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# AbletonMCP - Ableton Live Model Context Protocol Integration
AbletonMCP connects Ableton Live to Claude AI through the Model Context Protocol (MCP), allowing Claude to directly interact with and control Ableton Live. This integration enables prompt-assisted music production, track creation, and Live session manipulation.
### Join the Community
Give feedback, get inspired, and build on top of the MCP: [Discord](https://discord.gg/3ZrMyGKnaU). Made by [Siddharth](https://x.com/sidahuj)
## Features
- **Two-way communication**: Connect Claude AI to Ableton Live through a socket-based server
- **Track manipulation**: Create, modify, and manipulate MIDI and audio tracks
- **Instrument and effect selection**: Claude can access and load the right instruments, effects and sounds from Ableton's library
- **Clip creation**: Create and edit MIDI clips with notes
- **Session control**: Start and stop playback, fire clips, and control transport
## Components
The system consists of two main components:
1. **Ableton Remote Script** (`AbletonMCP_Remote_Script/__init__.py`): A MIDI Remote Script for Ableton Live that creates a socket server to receive and execute commands
2. **MCP Server** (`MCP_Server/server.py`): A Python server that implements the Model Context Protocol and connects to the Ableton Remote Script
## Installation
### Installing via Smithery
To install Ableton Live Integration for Claude Desktop automatically via [Smithery](https://smithery.ai/server/@ahujasid/ableton-mcp):
```bash
npx -y @smithery/cli install @ahujasid/ableton-mcp --client claude
```
### Prerequisites
- Ableton Live 10 or newer
- Python 3.10 or newer (`pyproject.toml` declares `requires-python = ">=3.10"`)
- [uv package manager](https://astral.sh/uv)
If you're on Mac, please install uv as:
```
brew install uv
```
Otherwise, install from [uv's official website](https://docs.astral.sh/uv/getting-started/installation/)
⚠️ Do not proceed before installing uv
### Claude for Desktop Integration
[Follow along with the setup instructions video](https://youtu.be/iJWJqyVuPS8)
1. Go to Claude > Settings > Developer > Edit Config and edit `claude_desktop_config.json` to include the following:
```json
{
    "mcpServers": {
        "AbletonMCP": {
            "command": "uvx",
            "args": [
                "ableton-mcp"
            ]
        }
    }
}
```
### Cursor Integration
Run ableton-mcp without installing it permanently through uvx. Go to Cursor Settings > MCP and paste this as a command:
```
uvx ableton-mcp
```
⚠️ Only run one instance of the MCP server (either on Cursor or Claude Desktop), not both
### Installing the Ableton Remote Script
[Follow along with the setup instructions video](https://youtu.be/iJWJqyVuPS8)
1. Download the `AbletonMCP_Remote_Script/__init__.py` file from this repo
2. Locate Ableton's MIDI Remote Scripts directory. The location differs by OS and Live version. **One of these should work, you might have to look**:
**For macOS:**
- Method 1: Go to Applications > Right-click on Ableton Live app → Show Package Contents → Navigate to:
`Contents/App-Resources/MIDI Remote Scripts/`
- Method 2: If it's not there in the first method, use the direct path (replace XX with your version number):
`/Users/[Username]/Library/Preferences/Ableton/Live XX/User Remote Scripts`
**For Windows:**
- Method 1:
`C:\Users\[Username]\AppData\Roaming\Ableton\Live x.x.x\Preferences\User Remote Scripts`
- Method 2:
`C:\ProgramData\Ableton\Live XX\Resources\MIDI Remote Scripts\`
- Method 3:
`C:\Program Files\Ableton\Live XX\Resources\MIDI Remote Scripts\`
*Note: Replace XX with your Ableton version number (e.g., 10, 11, 12)*
3. Create a folder called `AbletonMCP` in the Remote Scripts directory and paste the downloaded `__init__.py` file into it
4. Launch Ableton Live
5. Go to Settings/Preferences → Link, Tempo & MIDI
6. In the Control Surface dropdown, select "AbletonMCP"
7. Set Input and Output to "None"
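If you prefer to script the copy step, here is a minimal sketch. It is not part of this repo: `install_remote_script` is a hypothetical helper, and you still have to pass it the Remote Scripts directory that applies to your OS and Live version.

```python
import shutil
from pathlib import Path


def install_remote_script(init_file, remote_scripts_dir):
    """Copy __init__.py into <remote_scripts_dir>/AbletonMCP and return the new path.

    Hypothetical helper for illustration; both arguments are supplied by you.
    """
    source = Path(init_file)
    if source.name != "__init__.py":
        raise ValueError("expected the Remote Script's __init__.py file")

    scripts_dir = Path(remote_scripts_dir)
    if not scripts_dir.is_dir():
        raise FileNotFoundError(f"Remote Scripts directory not found: {scripts_dir}")

    # Create the AbletonMCP folder if it does not exist yet
    target_dir = scripts_dir / "AbletonMCP"
    target_dir.mkdir(exist_ok=True)

    # Copy the file, keeping its name so Live can import the package
    return Path(shutil.copy2(source, target_dir / "__init__.py"))
```

Restart Ableton Live after copying so the new control surface shows up in the dropdown.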
## Usage
### Starting the Connection
1. Ensure the Ableton Remote Script is loaded in Ableton Live
2. Make sure the MCP server is configured in Claude Desktop or Cursor
3. The connection should be established automatically when you interact with Claude
### Using with Claude
Once the config file has been set on Claude, and the remote script is running in Ableton, you will see a hammer icon with tools for the Ableton MCP.
## Capabilities
- Get session and track information
- Create and modify MIDI and audio tracks
- Create, edit, and trigger clips
- Control playback
- Load instruments and effects from Ableton's browser
- Add notes to MIDI clips
- Change tempo and other session parameters
## Example Commands
Here are some examples of what you can ask Claude to do:
- "Create an 80s synthwave track" [Demo](https://youtu.be/VH9g66e42XA)
- "Create a Metro Boomin style hip-hop beat"
- "Create a new MIDI track with a synth bass instrument"
- "Add reverb to my drums"
- "Create a 4-bar MIDI clip with a simple melody"
- "Get information about the current Ableton session"
- "Load an 808 drum rack into the selected track"
- "Add a jazz chord progression to the clip in track 1"
- "Set the tempo to 120 BPM"
- "Play the clip in track 2"
## Troubleshooting
- **Connection issues**: Make sure the Ableton Remote Script is loaded, and the MCP server is configured on Claude
- **Timeout errors**: Try simplifying your requests or breaking them into smaller steps
- **Have you tried turning it off and on again?**: If you're still having connection errors, try restarting both Claude and Ableton Live
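To tell a Remote Script problem apart from an MCP configuration problem, you can check whether anything is listening on the Remote Script's port. This is a minimal sketch: `remote_script_is_listening` is a hypothetical helper, and the defaults (`localhost`, port `9877`) are the values used in `MCP_Server/server.py`.

```python
import socket


def remote_script_is_listening(host="localhost", port=9877, timeout=1.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # Open and immediately close a connection; no command is sent
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Refused, timed out, or unreachable
        return False


if __name__ == "__main__":
    if remote_script_is_listening():
        print("Something is listening on port 9877")
    else:
        print("Nothing is listening on port 9877; is the Remote Script loaded?")
```

A `True` result only shows that the port accepts connections, not that the listener is the AbletonMCP Remote Script.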
## Technical Details
### Communication Protocol
The system uses a simple JSON-based protocol over TCP sockets:
- Commands are sent as JSON objects with a `type` and optional `params`
- Responses are JSON objects with a `status` and `result` or `message`
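The message shapes described above can be sketched in a few lines of Python. This is an illustration, not the server's actual code: `encode_command` and `decode_response` are hypothetical names, and the only status value assumed here is `"error"`, which is the one `MCP_Server/server.py` checks for.

```python
import json


def encode_command(command_type, params=None):
    """Serialize a command as UTF-8 JSON with a `type` and a `params` object."""
    return json.dumps({"type": command_type, "params": params or {}}).encode("utf-8")


def decode_response(data):
    """Return the `result` of a response, or raise if its `status` is "error"."""
    response = json.loads(data.decode("utf-8"))
    if response.get("status") == "error":
        raise RuntimeError(response.get("message", "Unknown error from Ableton"))
    return response.get("result", {})
```

Messages carry no length prefix or delimiter, so a receiver has to keep reading from the socket until the accumulated bytes parse as a complete JSON object.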
### Limitations & Security Considerations
- Creating complex musical arrangements might need to be broken down into smaller steps
- The tool is designed to work with Ableton's default devices and browser items
- Always save your work before extensive experimentation
## Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
## Disclaimer
This is a third-party integration and not made by Ableton.
```
--------------------------------------------------------------------------------
/MCP_Server/__init__.py:
--------------------------------------------------------------------------------
```python
"""Ableton Live integration through the Model Context Protocol."""
__version__ = "0.1.0"
# Expose key classes and functions for easier imports
from .server import AbletonConnection, get_ableton_connection
```
--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------
```dockerfile
# Generated by https://smithery.ai. See: https://smithery.ai/docs/config#dockerfile
FROM python:3.10-alpine
# Install build dependencies
RUN apk add --no-cache gcc musl-dev libffi-dev
WORKDIR /app
# Copy project files
COPY . /app
# Install Python dependencies
RUN pip install --no-cache-dir .
# Expose port if server uses it, although MCP might use stdio
# Command to run the MCP server
CMD ["python", "-m", "MCP_Server.server"]
```
--------------------------------------------------------------------------------
/smithery.yaml:
--------------------------------------------------------------------------------
```yaml
# Smithery configuration file: https://smithery.ai/docs/config#smitheryyaml

startCommand:
  type: stdio
  configSchema:
    # JSON Schema defining the configuration options for the MCP.
    type: object
    properties: {}
  commandFunction:
    # A JS function that produces the CLI command based on the given config to start the MCP on stdio.
    |-
    (config) => ({
      command: 'python',
      args: ['-m', 'MCP_Server.server']
    })
  exampleConfig: {}
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[project]
name = "ableton-mcp"
version = "1.0.0"
description = "Ableton Live integration through the Model Context Protocol"
readme = "README.md"
requires-python = ">=3.10"
authors = [
    {name = "Siddharth Ahuja", email = "[email protected]"}
]
license = {text = "MIT"}
classifiers = [
    "Programming Language :: Python :: 3",
    "License :: OSI Approved :: MIT License",
    "Operating System :: OS Independent",
]
dependencies = [
    "mcp[cli]>=1.3.0",
]
[project.scripts]
ableton-mcp = "MCP_Server.server:main"
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
[tool.setuptools]
packages = ["MCP_Server"]
[project.urls]
"Homepage" = "https://github.com/ahujasid/ableton-mcp"
"Bug Tracker" = "https://github.com/ahujasid/ableton-mcp/issues"
```
--------------------------------------------------------------------------------
/MCP_Server/server.py:
--------------------------------------------------------------------------------
```python
# ableton_mcp_server.py
from mcp.server.fastmcp import FastMCP, Context
import socket
import json
import logging
from dataclasses import dataclass
from contextlib import asynccontextmanager
from typing import AsyncIterator, Dict, Any, List, Union
# Configure logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("AbletonMCPServer")
@dataclass
class AbletonConnection:
    host: str
    port: int
    sock: socket.socket = None

    def connect(self) -> bool:
        """Connect to the Ableton Remote Script socket server"""
        if self.sock:
            return True

        try:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.connect((self.host, self.port))
            logger.info(f"Connected to Ableton at {self.host}:{self.port}")
            return True
        except Exception as e:
            logger.error(f"Failed to connect to Ableton: {str(e)}")
            self.sock = None
            return False

    def disconnect(self):
        """Disconnect from the Ableton Remote Script"""
        if self.sock:
            try:
                self.sock.close()
            except Exception as e:
                logger.error(f"Error disconnecting from Ableton: {str(e)}")
            finally:
                self.sock = None

    def receive_full_response(self, sock, buffer_size=8192):
        """Receive the complete response, potentially in multiple chunks"""
        chunks = []
        # Increased timeout for operations that might take longer.
        # Note: this replaces any timeout the caller set on the socket.
        sock.settimeout(15.0)

        try:
            while True:
                try:
                    chunk = sock.recv(buffer_size)
                    if not chunk:
                        if not chunks:
                            raise Exception("Connection closed before receiving any data")
                        break

                    chunks.append(chunk)

                    # Check if we've received a complete JSON object
                    try:
                        data = b''.join(chunks)
                        json.loads(data.decode('utf-8'))
                        logger.info(f"Received complete response ({len(data)} bytes)")
                        return data
                    except (json.JSONDecodeError, UnicodeDecodeError):
                        # Incomplete JSON (or a multi-byte character split
                        # across chunks), continue receiving
                        continue
                except socket.timeout:
                    logger.warning("Socket timeout during chunked receive")
                    break
                except (ConnectionError, BrokenPipeError, ConnectionResetError) as e:
                    logger.error(f"Socket connection error during receive: {str(e)}")
                    raise
        except Exception as e:
            logger.error(f"Error during receive: {str(e)}")
            raise

        # If we get here, we either timed out or broke out of the loop
        if chunks:
            data = b''.join(chunks)
            logger.info(f"Returning data after receive completion ({len(data)} bytes)")
            try:
                json.loads(data.decode('utf-8'))
                return data
            except (json.JSONDecodeError, UnicodeDecodeError):
                raise Exception("Incomplete JSON response received")
        else:
            raise Exception("No data received")

    def send_command(self, command_type: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
        """Send a command to Ableton and return the response"""
        import time

        if not self.sock and not self.connect():
            raise ConnectionError("Not connected to Ableton")

        command = {
            "type": command_type,
            "params": params or {}
        }

        # Check if this is a state-modifying command
        is_modifying_command = command_type in [
            "create_midi_track", "create_audio_track", "set_track_name",
            "create_clip", "add_notes_to_clip", "set_clip_name",
            "set_tempo", "fire_clip", "stop_clip", "set_device_parameter",
            "start_playback", "stop_playback", "load_instrument_or_effect"
        ]

        try:
            logger.info(f"Sending command: {command_type} with params: {params}")

            # Send the command
            self.sock.sendall(json.dumps(command).encode('utf-8'))
            logger.info("Command sent, waiting for response...")

            # For state-modifying commands, add a small delay to give Ableton time to process
            if is_modifying_command:
                time.sleep(0.1)  # 100ms delay

            # Set timeout based on command type
            timeout = 15.0 if is_modifying_command else 10.0
            self.sock.settimeout(timeout)

            # Receive the response
            response_data = self.receive_full_response(self.sock)
            logger.info(f"Received {len(response_data)} bytes of data")

            # Parse the response
            response = json.loads(response_data.decode('utf-8'))
            logger.info(f"Response parsed, status: {response.get('status', 'unknown')}")

            if response.get("status") == "error":
                logger.error(f"Ableton error: {response.get('message')}")
                raise Exception(response.get("message", "Unknown error from Ableton"))

            # For state-modifying commands, add another small delay after receiving response
            if is_modifying_command:
                time.sleep(0.1)  # 100ms delay

            return response.get("result", {})
        except socket.timeout:
            logger.error("Socket timeout while waiting for response from Ableton")
            self.sock = None
            raise Exception("Timeout waiting for Ableton response")
        except (ConnectionError, BrokenPipeError, ConnectionResetError) as e:
            logger.error(f"Socket connection error: {str(e)}")
            self.sock = None
            raise Exception(f"Connection to Ableton lost: {str(e)}")
        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON response from Ableton: {str(e)}")
            if 'response_data' in locals() and response_data:
                logger.error(f"Raw response (first 200 bytes): {response_data[:200]}")
            self.sock = None
            raise Exception(f"Invalid response from Ableton: {str(e)}")
        except Exception as e:
            logger.error(f"Error communicating with Ableton: {str(e)}")
            self.sock = None
            raise Exception(f"Communication error with Ableton: {str(e)}")

@asynccontextmanager
async def server_lifespan(server: FastMCP) -> AsyncIterator[Dict[str, Any]]:
    """Manage server startup and shutdown lifecycle"""
    global _ableton_connection

    try:
        logger.info("AbletonMCP server starting up")

        try:
            get_ableton_connection()
            logger.info("Successfully connected to Ableton on startup")
        except Exception as e:
            logger.warning(f"Could not connect to Ableton on startup: {str(e)}")
            logger.warning("Make sure the Ableton Remote Script is running")

        yield {}
    finally:
        if _ableton_connection:
            logger.info("Disconnecting from Ableton on shutdown")
            _ableton_connection.disconnect()
            _ableton_connection = None
        logger.info("AbletonMCP server shut down")

# Create the MCP server with lifespan support
mcp = FastMCP(
    "AbletonMCP",
    description="Ableton Live integration through the Model Context Protocol",
    lifespan=server_lifespan
)
# Global connection for resources
_ableton_connection = None
def get_ableton_connection():
    """Get or create a persistent Ableton connection"""
    import time

    global _ableton_connection

    if _ableton_connection is not None:
        try:
            # Cheap sanity check on the existing socket. A zero-length send
            # won't affect Ableton, but it is only a weak test: it fails if
            # the socket is missing or closed locally, and may still succeed
            # when the peer has gone away.
            _ableton_connection.sock.settimeout(1.0)
            _ableton_connection.sock.sendall(b'')
            return _ableton_connection
        except Exception as e:
            logger.warning(f"Existing connection is no longer valid: {str(e)}")
            try:
                _ableton_connection.disconnect()
            except Exception:
                pass
            _ableton_connection = None

    # Connection doesn't exist or is invalid, create a new one
    if _ableton_connection is None:
        # Try to connect up to 3 times with a short delay between attempts
        max_attempts = 3
        for attempt in range(1, max_attempts + 1):
            try:
                logger.info(f"Connecting to Ableton (attempt {attempt}/{max_attempts})...")
                _ableton_connection = AbletonConnection(host="localhost", port=9877)
                if _ableton_connection.connect():
                    logger.info("Created new persistent connection to Ableton")

                    # Validate connection with a simple command
                    try:
                        # Get session info as a test
                        _ableton_connection.send_command("get_session_info")
                        logger.info("Connection validated successfully")
                        return _ableton_connection
                    except Exception as e:
                        logger.error(f"Connection validation failed: {str(e)}")
                        _ableton_connection.disconnect()
                        _ableton_connection = None
                        # Continue to next attempt
                else:
                    _ableton_connection = None
            except Exception as e:
                logger.error(f"Connection attempt {attempt} failed: {str(e)}")
                if _ableton_connection:
                    _ableton_connection.disconnect()
                    _ableton_connection = None

            # Wait before trying again, but only if we have more attempts left
            if attempt < max_attempts:
                time.sleep(1.0)

        # If we get here, all connection attempts failed
        if _ableton_connection is None:
            logger.error("Failed to connect to Ableton after multiple attempts")
            raise Exception("Could not connect to Ableton. Make sure the Remote Script is running.")

    return _ableton_connection

# Core Tool endpoints

@mcp.tool()
def get_session_info(ctx: Context) -> str:
    """Get detailed information about the current Ableton session"""
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("get_session_info")
        return json.dumps(result, indent=2)
    except Exception as e:
        logger.error(f"Error getting session info from Ableton: {str(e)}")
        return f"Error getting session info: {str(e)}"

@mcp.tool()
def get_track_info(ctx: Context, track_index: int) -> str:
    """
    Get detailed information about a specific track in Ableton.

    Parameters:
    - track_index: The index of the track to get information about
    """
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("get_track_info", {"track_index": track_index})
        return json.dumps(result, indent=2)
    except Exception as e:
        logger.error(f"Error getting track info from Ableton: {str(e)}")
        return f"Error getting track info: {str(e)}"

@mcp.tool()
def create_midi_track(ctx: Context, index: int = -1) -> str:
    """
    Create a new MIDI track in the Ableton session.

    Parameters:
    - index: The index to insert the track at (-1 = end of list)
    """
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("create_midi_track", {"index": index})
        return f"Created new MIDI track: {result.get('name', 'unknown')}"
    except Exception as e:
        logger.error(f"Error creating MIDI track: {str(e)}")
        return f"Error creating MIDI track: {str(e)}"

@mcp.tool()
def set_track_name(ctx: Context, track_index: int, name: str) -> str:
    """
    Set the name of a track.

    Parameters:
    - track_index: The index of the track to rename
    - name: The new name for the track
    """
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("set_track_name", {"track_index": track_index, "name": name})
        return f"Renamed track to: {result.get('name', name)}"
    except Exception as e:
        logger.error(f"Error setting track name: {str(e)}")
        return f"Error setting track name: {str(e)}"

@mcp.tool()
def create_clip(ctx: Context, track_index: int, clip_index: int, length: float = 4.0) -> str:
    """
    Create a new MIDI clip in the specified track and clip slot.

    Parameters:
    - track_index: The index of the track to create the clip in
    - clip_index: The index of the clip slot to create the clip in
    - length: The length of the clip in beats (default: 4.0)
    """
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("create_clip", {
            "track_index": track_index,
            "clip_index": clip_index,
            "length": length
        })
        return f"Created new clip at track {track_index}, slot {clip_index} with length {length} beats"
    except Exception as e:
        logger.error(f"Error creating clip: {str(e)}")
        return f"Error creating clip: {str(e)}"

@mcp.tool()
def add_notes_to_clip(
    ctx: Context,
    track_index: int,
    clip_index: int,
    notes: List[Dict[str, Union[int, float, bool]]]
) -> str:
    """
    Add MIDI notes to a clip.

    Parameters:
    - track_index: The index of the track containing the clip
    - clip_index: The index of the clip slot containing the clip
    - notes: List of note dictionaries, each with pitch, start_time, duration, velocity, and mute
    """
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("add_notes_to_clip", {
            "track_index": track_index,
            "clip_index": clip_index,
            "notes": notes
        })
        return f"Added {len(notes)} notes to clip at track {track_index}, slot {clip_index}"
    except Exception as e:
        logger.error(f"Error adding notes to clip: {str(e)}")
        return f"Error adding notes to clip: {str(e)}"

@mcp.tool()
def set_clip_name(ctx: Context, track_index: int, clip_index: int, name: str) -> str:
    """
    Set the name of a clip.

    Parameters:
    - track_index: The index of the track containing the clip
    - clip_index: The index of the clip slot containing the clip
    - name: The new name for the clip
    """
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("set_clip_name", {
            "track_index": track_index,
            "clip_index": clip_index,
            "name": name
        })
        return f"Renamed clip at track {track_index}, slot {clip_index} to '{name}'"
    except Exception as e:
        logger.error(f"Error setting clip name: {str(e)}")
        return f"Error setting clip name: {str(e)}"

@mcp.tool()
def set_tempo(ctx: Context, tempo: float) -> str:
    """
    Set the tempo of the Ableton session.

    Parameters:
    - tempo: The new tempo in BPM
    """
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("set_tempo", {"tempo": tempo})
        return f"Set tempo to {tempo} BPM"
    except Exception as e:
        logger.error(f"Error setting tempo: {str(e)}")
        return f"Error setting tempo: {str(e)}"

@mcp.tool()
def load_instrument_or_effect(ctx: Context, track_index: int, uri: str) -> str:
    """
    Load an instrument or effect onto a track using its URI.

    Parameters:
    - track_index: The index of the track to load the instrument on
    - uri: The URI of the instrument or effect to load (e.g., 'query:Synths#Instrument%20Rack:Bass:FileId_5116')
    """
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("load_browser_item", {
            "track_index": track_index,
            "item_uri": uri
        })

        # Check if the instrument was loaded successfully
        if result.get("loaded", False):
            new_devices = result.get("new_devices", [])
            if new_devices:
                return f"Loaded instrument with URI '{uri}' on track {track_index}. New devices: {', '.join(new_devices)}"
            else:
                devices = result.get("devices_after", [])
                return f"Loaded instrument with URI '{uri}' on track {track_index}. Devices on track: {', '.join(devices)}"
        else:
            return f"Failed to load instrument with URI '{uri}'"
    except Exception as e:
        logger.error(f"Error loading instrument by URI: {str(e)}")
        return f"Error loading instrument by URI: {str(e)}"

@mcp.tool()
def fire_clip(ctx: Context, track_index: int, clip_index: int) -> str:
    """
    Start playing a clip.

    Parameters:
    - track_index: The index of the track containing the clip
    - clip_index: The index of the clip slot containing the clip
    """
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("fire_clip", {
            "track_index": track_index,
            "clip_index": clip_index
        })
        return f"Started playing clip at track {track_index}, slot {clip_index}"
    except Exception as e:
        logger.error(f"Error firing clip: {str(e)}")
        return f"Error firing clip: {str(e)}"

@mcp.tool()
def stop_clip(ctx: Context, track_index: int, clip_index: int) -> str:
    """
    Stop playing a clip.

    Parameters:
    - track_index: The index of the track containing the clip
    - clip_index: The index of the clip slot containing the clip
    """
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("stop_clip", {
            "track_index": track_index,
            "clip_index": clip_index
        })
        return f"Stopped clip at track {track_index}, slot {clip_index}"
    except Exception as e:
        logger.error(f"Error stopping clip: {str(e)}")
        return f"Error stopping clip: {str(e)}"

@mcp.tool()
def start_playback(ctx: Context) -> str:
    """Start playing the Ableton session."""
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("start_playback")
        return "Started playback"
    except Exception as e:
        logger.error(f"Error starting playback: {str(e)}")
        return f"Error starting playback: {str(e)}"

@mcp.tool()
def stop_playback(ctx: Context) -> str:
    """Stop playing the Ableton session."""
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("stop_playback")
        return "Stopped playback"
    except Exception as e:
        logger.error(f"Error stopping playback: {str(e)}")
        return f"Error stopping playback: {str(e)}"

@mcp.tool()
def get_browser_tree(ctx: Context, category_type: str = "all") -> str:
    """
    Get a hierarchical tree of browser categories from Ableton.

    Parameters:
    - category_type: Type of categories to get ('all', 'instruments', 'sounds', 'drums', 'audio_effects', 'midi_effects')
    """
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("get_browser_tree", {
            "category_type": category_type
        })

        # Check if we got any categories
        if "available_categories" in result and len(result.get("categories", [])) == 0:
            available_cats = result.get("available_categories", [])
            return (f"No categories found for '{category_type}'. "
                    f"Available browser categories: {', '.join(available_cats)}")

        # Format the tree in a more readable way
        total_folders = result.get("total_folders", 0)
        formatted_output = f"Browser tree for '{category_type}' (showing {total_folders} folders):\n\n"

        def format_tree(item, indent=0):
            output = ""
            if item:
                prefix = " " * indent
                name = item.get("name", "Unknown")
                path = item.get("path", "")
                has_more = item.get("has_more", False)

                # Add this item
                output += f"{prefix}• {name}"
                if path:
                    output += f" (path: {path})"
                if has_more:
                    output += " [...]"
                output += "\n"

                # Add children
                for child in item.get("children", []):
                    output += format_tree(child, indent + 1)
            return output

        # Format each category
        for category in result.get("categories", []):
            formatted_output += format_tree(category)
            formatted_output += "\n"

        return formatted_output
    except Exception as e:
        error_msg = str(e)
        if "Browser is not available" in error_msg:
            logger.error(f"Browser is not available in Ableton: {error_msg}")
            return "Error: The Ableton browser is not available. Make sure Ableton Live is fully loaded and try again."
        elif "Could not access Live application" in error_msg:
            logger.error(f"Could not access Live application: {error_msg}")
            return "Error: Could not access the Ableton Live application. Make sure Ableton Live is running and the Remote Script is loaded."
        else:
            logger.error(f"Error getting browser tree: {error_msg}")
            return f"Error getting browser tree: {error_msg}"

@mcp.tool()
def get_browser_items_at_path(ctx: Context, path: str) -> str:
    """
    Get browser items at a specific path in Ableton's browser.

    Parameters:
    - path: Path in the format "category/folder/subfolder"
            where category is one of the available browser categories in Ableton
    """
    try:
        ableton = get_ableton_connection()
        result = ableton.send_command("get_browser_items_at_path", {
            "path": path
        })

        # Check if there was an error with available categories
        if "error" in result and "available_categories" in result:
            error = result.get("error", "")
            available_cats = result.get("available_categories", [])
            return (f"Error: {error}\n"
                    f"Available browser categories: {', '.join(available_cats)}")

        return json.dumps(result, indent=2)
    except Exception as e:
        error_msg = str(e)
        if "Browser is not available" in error_msg:
            logger.error(f"Browser is not available in Ableton: {error_msg}")
            return "Error: The Ableton browser is not available. Make sure Ableton Live is fully loaded and try again."
        elif "Could not access Live application" in error_msg:
            logger.error(f"Could not access Live application: {error_msg}")
            return "Error: Could not access the Ableton Live application. Make sure Ableton Live is running and the Remote Script is loaded."
        elif "Unknown or unavailable category" in error_msg:
            logger.error(f"Invalid browser category: {error_msg}")
            return f"Error: {error_msg}. Please check the available categories using get_browser_tree."
        elif "Path part" in error_msg and "not found" in error_msg:
            logger.error(f"Path not found: {error_msg}")
            return f"Error: {error_msg}. Please check the path and try again."
        else:
            logger.error(f"Error getting browser items at path: {error_msg}")
            return f"Error getting browser items at path: {error_msg}"

@mcp.tool()
def load_drum_kit(ctx: Context, track_index: int, rack_uri: str, kit_path: str) -> str:
    """
    Load a drum rack and then load a specific drum kit into it.

    Parameters:
    - track_index: The index of the track to load on
    - rack_uri: The URI of the drum rack to load (e.g., 'Drums/Drum Rack')
    - kit_path: Path to the drum kit inside the browser (e.g., 'drums/acoustic/kit1')
    """
    try:
        ableton = get_ableton_connection()

        # Step 1: Load the drum rack
        result = ableton.send_command("load_browser_item", {
            "track_index": track_index,
            "item_uri": rack_uri
        })

        if not result.get("loaded", False):
            return f"Failed to load drum rack with URI '{rack_uri}'"

        # Step 2: Get the drum kit items at the specified path
        kit_result = ableton.send_command("get_browser_items_at_path", {
            "path": kit_path
        })

        if "error" in kit_result:
            return f"Loaded drum rack but failed to find drum kit: {kit_result.get('error')}"

        # Step 3: Find a loadable drum kit
        kit_items = kit_result.get("items", [])
        loadable_kits = [item for item in kit_items if item.get("is_loadable", False)]

        if not loadable_kits:
            return f"Loaded drum rack but no loadable drum kits found at '{kit_path}'"

        # Step 4: Load the first loadable kit
        kit_uri = loadable_kits[0].get("uri")
        load_result = ableton.send_command("load_browser_item", {
            "track_index": track_index,
            "item_uri": kit_uri
        })

        return f"Loaded drum rack and kit '{loadable_kits[0].get('name')}' on track {track_index}"
    except Exception as e:
        logger.error(f"Error loading drum kit: {str(e)}")
        return f"Error loading drum kit: {str(e)}"

# Main execution
def main():
    """Run the MCP server"""
    mcp.run()

if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/AbletonMCP_Remote_Script/__init__.py:
--------------------------------------------------------------------------------
```python
# AbletonMCP/__init__.py
from __future__ import absolute_import, print_function, unicode_literals

from _Framework.ControlSurface import ControlSurface
import socket
import json
import threading
import time
import traceback

# Change queue import for Python 2
try:
    import Queue as queue  # Python 2
except ImportError:
    import queue  # Python 3

# Constants for socket communication
DEFAULT_PORT = 9877
HOST = "localhost"

def create_instance(c_instance):
    """Create and return the AbletonMCP script instance"""
    return AbletonMCP(c_instance)

class AbletonMCP(ControlSurface):
    """AbletonMCP Remote Script for Ableton Live"""
    def __init__(self, c_instance):
        """Initialize the control surface"""
        ControlSurface.__init__(self, c_instance)
        self.log_message("AbletonMCP Remote Script initializing...")
        # Socket server for communication
        self.server = None
        self.client_threads = []
        self.server_thread = None
        self.running = False
        # Cache the song reference for easier access
        self._song = self.song()
        # Start the socket server
        self.start_server()
        self.log_message("AbletonMCP initialized")
        # Show a message in Ableton
        self.show_message("AbletonMCP: Listening for commands on port " + str(DEFAULT_PORT))
    def disconnect(self):
        """Called when Ableton closes or the control surface is removed"""
        self.log_message("AbletonMCP disconnecting...")
        self.running = False
        # Stop the server
        if self.server:
            try:
                self.server.close()
            except Exception:
                pass
        # Wait for the server thread to exit
        if self.server_thread and self.server_thread.is_alive():
            self.server_thread.join(1.0)
        # Clean up any client threads
        for client_thread in self.client_threads[:]:
            if client_thread.is_alive():
                # We don't join them as they might be stuck
                self.log_message("Client thread still alive during disconnect")
        ControlSurface.disconnect(self)
        self.log_message("AbletonMCP disconnected")
    def start_server(self):
        """Start the socket server in a separate thread"""
        try:
            self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server.bind((HOST, DEFAULT_PORT))
            self.server.listen(5)  # Allow up to 5 pending connections
            self.running = True
            self.server_thread = threading.Thread(target=self._server_thread)
            self.server_thread.daemon = True
            self.server_thread.start()
            self.log_message("Server started on port " + str(DEFAULT_PORT))
        except Exception as e:
            self.log_message("Error starting server: " + str(e))
            self.show_message("AbletonMCP: Error starting server - " + str(e))
    def _server_thread(self):
        """Server thread implementation - handles client connections"""
        try:
            self.log_message("Server thread started")
            # Set a timeout to allow regular checking of running flag
            self.server.settimeout(1.0)
            while self.running:
                try:
                    # Accept connections with timeout
                    client, address = self.server.accept()
                    self.log_message("Connection accepted from " + str(address))
                    self.show_message("AbletonMCP: Client connected")
                    # Handle client in a separate thread
                    client_thread = threading.Thread(
                        target=self._handle_client,
                        args=(client,)
                    )
                    client_thread.daemon = True
                    client_thread.start()
                    # Keep track of client threads
                    self.client_threads.append(client_thread)
                    # Clean up finished client threads
                    self.client_threads = [t for t in self.client_threads if t.is_alive()]
                except socket.timeout:
                    # No connection yet, just continue
                    continue
                except Exception as e:
                    if self.running:  # Only log if still running
                        self.log_message("Server accept error: " + str(e))
                    time.sleep(0.5)
            self.log_message("Server thread stopped")
        except Exception as e:
            self.log_message("Server thread error: " + str(e))
    def _handle_client(self, client):
        """Handle communication with a connected client"""
        self.log_message("Client handler started")
        client.settimeout(None)  # No timeout for client socket
        buffer = ''  # Text buffer, so Python 2 and Python 3 share one code path
        try:
            while self.running:
                try:
                    # Receive data
                    data = client.recv(8192)
                    if not data:
                        # Client disconnected
                        self.log_message("Client disconnected")
                        break
                    # Accumulate data in buffer with explicit encoding/decoding
                    try:
                        # Python 3: data is bytes, decode to string
                        buffer += data.decode('utf-8')
                    except AttributeError:
                        # Python 2: data is already string
                        buffer += data
                    try:
                        # Try to parse command from buffer (already text, no decode needed)
                        command = json.loads(buffer)
                        buffer = ''  # Clear buffer after successful parse
                        self.log_message("Received command: " + str(command.get("type", "unknown")))
                        # Process the command and get response
                        response = self._process_command(command)
                        # Send the response with explicit encoding
                        try:
                            # Python 3: encode string to bytes
                            client.sendall(json.dumps(response).encode('utf-8'))
                        except AttributeError:
                            # Python 2: string is already bytes
                            client.sendall(json.dumps(response))
                    except ValueError:
                        # Incomplete data, wait for more
                        continue
                except Exception as e:
                    self.log_message("Error handling client data: " + str(e))
                    self.log_message(traceback.format_exc())
                    # Send error response if possible
                    error_response = {
                        "status": "error",
                        "message": str(e)
                    }
                    try:
                        # Python 3: encode string to bytes
                        client.sendall(json.dumps(error_response).encode('utf-8'))
                    except AttributeError:
                        # Python 2: string is already bytes
                        client.sendall(json.dumps(error_response))
                    except Exception:
                        # If we can't send the error, the connection is probably dead
                        break
                    # For serious errors, break the loop
                    if not isinstance(e, ValueError):
                        break
        except Exception as e:
            self.log_message("Error in client handler: " + str(e))
        finally:
            try:
                client.close()
            except Exception:
                pass
            self.log_message("Client handler stopped")
    def _process_command(self, command):
        """Process a command from the client and return a response"""
        command_type = command.get("type", "")
        params = command.get("params", {})
        # Initialize response
        response = {
            "status": "success",
            "result": {}
        }
        try:
            # Route the command to the appropriate handler
            if command_type == "get_session_info":
                response["result"] = self._get_session_info()
            elif command_type == "get_track_info":
                track_index = params.get("track_index", 0)
                response["result"] = self._get_track_info(track_index)
            # Commands that modify Live's state should be scheduled on the main thread
            elif command_type in ["create_midi_track", "set_track_name",
                                  "create_clip", "add_notes_to_clip", "set_clip_name",
                                  "set_tempo", "fire_clip", "stop_clip",
                                  "start_playback", "stop_playback", "load_browser_item"]:
                # Use a thread-safe approach with a response queue
                response_queue = queue.Queue()
                # Define a function to execute on the main thread
                def main_thread_task():
                    try:
                        result = None
                        if command_type == "create_midi_track":
                            index = params.get("index", -1)
                            result = self._create_midi_track(index)
                        elif command_type == "set_track_name":
                            track_index = params.get("track_index", 0)
                            name = params.get("name", "")
                            result = self._set_track_name(track_index, name)
                        elif command_type == "create_clip":
                            track_index = params.get("track_index", 0)
                            clip_index = params.get("clip_index", 0)
                            length = params.get("length", 4.0)
                            result = self._create_clip(track_index, clip_index, length)
                        elif command_type == "add_notes_to_clip":
                            track_index = params.get("track_index", 0)
                            clip_index = params.get("clip_index", 0)
                            notes = params.get("notes", [])
                            result = self._add_notes_to_clip(track_index, clip_index, notes)
                        elif command_type == "set_clip_name":
                            track_index = params.get("track_index", 0)
                            clip_index = params.get("clip_index", 0)
                            name = params.get("name", "")
                            result = self._set_clip_name(track_index, clip_index, name)
                        elif command_type == "set_tempo":
                            tempo = params.get("tempo", 120.0)
                            result = self._set_tempo(tempo)
                        elif command_type == "fire_clip":
                            track_index = params.get("track_index", 0)
                            clip_index = params.get("clip_index", 0)
                            result = self._fire_clip(track_index, clip_index)
                        elif command_type == "stop_clip":
                            track_index = params.get("track_index", 0)
                            clip_index = params.get("clip_index", 0)
                            result = self._stop_clip(track_index, clip_index)
                        elif command_type == "start_playback":
                            result = self._start_playback()
                        elif command_type == "stop_playback":
                            result = self._stop_playback()
                        # Note: unreachable as written, since this type is not in the list
                        # above and _load_instrument_or_effect is not defined in this file
                        elif command_type == "load_instrument_or_effect":
                            track_index = params.get("track_index", 0)
                            uri = params.get("uri", "")
                            result = self._load_instrument_or_effect(track_index, uri)
                        elif command_type == "load_browser_item":
                            track_index = params.get("track_index", 0)
                            item_uri = params.get("item_uri", "")
                            result = self._load_browser_item(track_index, item_uri)
                        # Put the result in the queue
                        response_queue.put({"status": "success", "result": result})
                    except Exception as e:
                        self.log_message("Error in main thread task: " + str(e))
                        self.log_message(traceback.format_exc())
                        response_queue.put({"status": "error", "message": str(e)})
                # Schedule the task to run on the main thread
                try:
                    self.schedule_message(0, main_thread_task)
                except AssertionError:
                    # If we're already on the main thread, execute directly
                    main_thread_task()
                # Wait for the response with a timeout
                try:
                    task_response = response_queue.get(timeout=10.0)
                    if task_response.get("status") == "error":
                        response["status"] = "error"
                        response["message"] = task_response.get("message", "Unknown error")
                    else:
                        response["result"] = task_response.get("result", {})
                except queue.Empty:
                    response["status"] = "error"
                    response["message"] = "Timeout waiting for operation to complete"
            elif command_type == "get_browser_item":
                uri = params.get("uri", None)
                path = params.get("path", None)
                response["result"] = self._get_browser_item(uri, path)
            # Note: _get_browser_categories and _get_browser_items are not defined in
            # this file, so the next two branches currently produce an error response
            elif command_type == "get_browser_categories":
                category_type = params.get("category_type", "all")
                response["result"] = self._get_browser_categories(category_type)
            elif command_type == "get_browser_items":
                path = params.get("path", "")
                item_type = params.get("item_type", "all")
                response["result"] = self._get_browser_items(path, item_type)
            # Browser tree and path-based listing commands
            elif command_type == "get_browser_tree":
                category_type = params.get("category_type", "all")
                response["result"] = self.get_browser_tree(category_type)
            elif command_type == "get_browser_items_at_path":
                path = params.get("path", "")
                response["result"] = self.get_browser_items_at_path(path)
            else:
                response["status"] = "error"
                response["message"] = "Unknown command: " + command_type
        except Exception as e:
            self.log_message("Error processing command: " + str(e))
            self.log_message(traceback.format_exc())
            response["status"] = "error"
            response["message"] = str(e)
        return response
    # Command implementations
    def _get_session_info(self):
        """Get information about the current session"""
        try:
            result = {
                "tempo": self._song.tempo,
                "signature_numerator": self._song.signature_numerator,
                "signature_denominator": self._song.signature_denominator,
                "track_count": len(self._song.tracks),
                "return_track_count": len(self._song.return_tracks),
                "master_track": {
                    "name": "Master",
                    "volume": self._song.master_track.mixer_device.volume.value,
                    "panning": self._song.master_track.mixer_device.panning.value
                }
            }
            return result
        except Exception as e:
            self.log_message("Error getting session info: " + str(e))
            raise
    def _get_track_info(self, track_index):
        """Get information about a track"""
        try:
            if track_index < 0 or track_index >= len(self._song.tracks):
                raise IndexError("Track index out of range")
            track = self._song.tracks[track_index]
            # Get clip slots
            clip_slots = []
            for slot_index, slot in enumerate(track.clip_slots):
                clip_info = None
                if slot.has_clip:
                    clip = slot.clip
                    clip_info = {
                        "name": clip.name,
                        "length": clip.length,
                        "is_playing": clip.is_playing,
                        "is_recording": clip.is_recording
                    }
                clip_slots.append({
                    "index": slot_index,
                    "has_clip": slot.has_clip,
                    "clip": clip_info
                })
            # Get devices
            devices = []
            for device_index, device in enumerate(track.devices):
                devices.append({
                    "index": device_index,
                    "name": device.name,
                    "class_name": device.class_name,
                    "type": self._get_device_type(device)
                })
            result = {
                "index": track_index,
                "name": track.name,
                "is_audio_track": track.has_audio_input,
                "is_midi_track": track.has_midi_input,
                "mute": track.mute,
                "solo": track.solo,
                "arm": track.arm,
                "volume": track.mixer_device.volume.value,
                "panning": track.mixer_device.panning.value,
                "clip_slots": clip_slots,
                "devices": devices
            }
            return result
        except Exception as e:
            self.log_message("Error getting track info: " + str(e))
            raise
    def _create_midi_track(self, index):
        """Create a new MIDI track at the specified index"""
        try:
            # Create the track
            self._song.create_midi_track(index)
            # Get the new track
            new_track_index = len(self._song.tracks) - 1 if index == -1 else index
            new_track = self._song.tracks[new_track_index]
            result = {
                "index": new_track_index,
                "name": new_track.name
            }
            return result
        except Exception as e:
            self.log_message("Error creating MIDI track: " + str(e))
            raise
    def _set_track_name(self, track_index, name):
        """Set the name of a track"""
        try:
            if track_index < 0 or track_index >= len(self._song.tracks):
                raise IndexError("Track index out of range")
            # Set the name
            track = self._song.tracks[track_index]
            track.name = name
            result = {
                "name": track.name
            }
            return result
        except Exception as e:
            self.log_message("Error setting track name: " + str(e))
            raise
    def _create_clip(self, track_index, clip_index, length):
        """Create a new MIDI clip in the specified track and clip slot"""
        try:
            if track_index < 0 or track_index >= len(self._song.tracks):
                raise IndexError("Track index out of range")
            track = self._song.tracks[track_index]
            if clip_index < 0 or clip_index >= len(track.clip_slots):
                raise IndexError("Clip index out of range")
            clip_slot = track.clip_slots[clip_index]
            # Check if the clip slot already has a clip
            if clip_slot.has_clip:
                raise Exception("Clip slot already has a clip")
            # Create the clip
            clip_slot.create_clip(length)
            result = {
                "name": clip_slot.clip.name,
                "length": clip_slot.clip.length
            }
            return result
        except Exception as e:
            self.log_message("Error creating clip: " + str(e))
            raise
    def _add_notes_to_clip(self, track_index, clip_index, notes):
        """Add MIDI notes to a clip"""
        try:
            if track_index < 0 or track_index >= len(self._song.tracks):
                raise IndexError("Track index out of range")
            track = self._song.tracks[track_index]
            if clip_index < 0 or clip_index >= len(track.clip_slots):
                raise IndexError("Clip index out of range")
            clip_slot = track.clip_slots[clip_index]
            if not clip_slot.has_clip:
                raise Exception("No clip in slot")
            clip = clip_slot.clip
            # Convert note data to Live's format
            live_notes = []
            for note in notes:
                pitch = note.get("pitch", 60)
                start_time = note.get("start_time", 0.0)
                duration = note.get("duration", 0.25)
                velocity = note.get("velocity", 100)
                mute = note.get("mute", False)
                live_notes.append((pitch, start_time, duration, velocity, mute))
            # Add the notes
            clip.set_notes(tuple(live_notes))
            result = {
                "note_count": len(notes)
            }
            return result
        except Exception as e:
            self.log_message("Error adding notes to clip: " + str(e))
            raise
    def _set_clip_name(self, track_index, clip_index, name):
        """Set the name of a clip"""
        try:
            if track_index < 0 or track_index >= len(self._song.tracks):
                raise IndexError("Track index out of range")
            track = self._song.tracks[track_index]
            if clip_index < 0 or clip_index >= len(track.clip_slots):
                raise IndexError("Clip index out of range")
            clip_slot = track.clip_slots[clip_index]
            if not clip_slot.has_clip:
                raise Exception("No clip in slot")
            clip = clip_slot.clip
            clip.name = name
            result = {
                "name": clip.name
            }
            return result
        except Exception as e:
            self.log_message("Error setting clip name: " + str(e))
            raise
    def _set_tempo(self, tempo):
        """Set the tempo of the session"""
        try:
            self._song.tempo = tempo
            result = {
                "tempo": self._song.tempo
            }
            return result
        except Exception as e:
            self.log_message("Error setting tempo: " + str(e))
            raise
    def _fire_clip(self, track_index, clip_index):
        """Fire a clip"""
        try:
            if track_index < 0 or track_index >= len(self._song.tracks):
                raise IndexError("Track index out of range")
            track = self._song.tracks[track_index]
            if clip_index < 0 or clip_index >= len(track.clip_slots):
                raise IndexError("Clip index out of range")
            clip_slot = track.clip_slots[clip_index]
            if not clip_slot.has_clip:
                raise Exception("No clip in slot")
            clip_slot.fire()
            result = {
                "fired": True
            }
            return result
        except Exception as e:
            self.log_message("Error firing clip: " + str(e))
            raise
    def _stop_clip(self, track_index, clip_index):
        """Stop a clip"""
        try:
            if track_index < 0 or track_index >= len(self._song.tracks):
                raise IndexError("Track index out of range")
            track = self._song.tracks[track_index]
            if clip_index < 0 or clip_index >= len(track.clip_slots):
                raise IndexError("Clip index out of range")
            clip_slot = track.clip_slots[clip_index]
            clip_slot.stop()
            result = {
                "stopped": True
            }
            return result
        except Exception as e:
            self.log_message("Error stopping clip: " + str(e))
            raise
    def _start_playback(self):
        """Start playing the session"""
        try:
            self._song.start_playing()
            result = {
                "playing": self._song.is_playing
            }
            return result
        except Exception as e:
            self.log_message("Error starting playback: " + str(e))
            raise
    def _stop_playback(self):
        """Stop playing the session"""
        try:
            self._song.stop_playing()
            result = {
                "playing": self._song.is_playing
            }
            return result
        except Exception as e:
            self.log_message("Error stopping playback: " + str(e))
            raise
    def _get_browser_item(self, uri, path):
        """Get a browser item by URI or path"""
        try:
            # Access the application's browser instance instead of creating a new one
            app = self.application()
            if not app:
                raise RuntimeError("Could not access Live application")
            result = {
                "uri": uri,
                "path": path,
                "found": False
            }
            # Try to find by URI first if provided
            if uri:
                item = self._find_browser_item_by_uri(app.browser, uri)
                if item:
                    result["found"] = True
                    result["item"] = {
                        "name": item.name,
                        "is_folder": item.is_folder,
                        "is_device": item.is_device,
                        "is_loadable": item.is_loadable,
                        "uri": item.uri
                    }
                    return result
            # If URI not provided or not found, try by path
            if path:
                # Parse the path and navigate to the specified item
                path_parts = path.split("/")
                # Determine the root based on the first part
                current_item = None
                if path_parts[0].lower() == "instruments":
                    current_item = app.browser.instruments
                elif path_parts[0].lower() == "sounds":
                    current_item = app.browser.sounds
                elif path_parts[0].lower() == "drums":
                    current_item = app.browser.drums
                elif path_parts[0].lower() == "audio_effects":
                    current_item = app.browser.audio_effects
                elif path_parts[0].lower() == "midi_effects":
                    current_item = app.browser.midi_effects
                else:
                    # Default to instruments if not specified
                    current_item = app.browser.instruments
                    # Don't skip the first part in this case
                    path_parts = ["instruments"] + path_parts
                # Navigate through the path
                for i in range(1, len(path_parts)):
                    part = path_parts[i]
                    if not part:  # Skip empty parts
                        continue
                    found = False
                    for child in current_item.children:
                        if child.name.lower() == part.lower():
                            current_item = child
                            found = True
                            break
                    if not found:
                        result["error"] = "Path part '{0}' not found".format(part)
                        return result
                # Found the item
                result["found"] = True
                result["item"] = {
                    "name": current_item.name,
                    "is_folder": current_item.is_folder,
                    "is_device": current_item.is_device,
                    "is_loadable": current_item.is_loadable,
                    "uri": current_item.uri
                }
            return result
        except Exception as e:
            self.log_message("Error getting browser item: " + str(e))
            self.log_message(traceback.format_exc())
            raise
    def _load_browser_item(self, track_index, item_uri):
        """Load a browser item onto a track by its URI"""
        try:
            if track_index < 0 or track_index >= len(self._song.tracks):
                raise IndexError("Track index out of range")
            track = self._song.tracks[track_index]
            # Access the application's browser instance instead of creating a new one
            app = self.application()
            # Find the browser item by URI
            item = self._find_browser_item_by_uri(app.browser, item_uri)
            if not item:
                raise ValueError("Browser item with URI '{0}' not found".format(item_uri))
            # Select the track
            self._song.view.selected_track = track
            # Load the item
            app.browser.load_item(item)
            result = {
                "loaded": True,
                "item_name": item.name,
                "track_name": track.name,
                "uri": item_uri
            }
            return result
        except Exception as e:
            self.log_message("Error loading browser item: {0}".format(str(e)))
            self.log_message(traceback.format_exc())
            raise
    def _find_browser_item_by_uri(self, browser_or_item, uri, max_depth=10, current_depth=0):
        """Find a browser item by its URI"""
        try:
            # Check if this is the item we're looking for
            if hasattr(browser_or_item, 'uri') and browser_or_item.uri == uri:
                return browser_or_item
            # Stop recursion if we've reached max depth
            if current_depth >= max_depth:
                return None
            # Check if this is a browser with root categories
            if hasattr(browser_or_item, 'instruments'):
                # Check all main categories
                categories = [
                    browser_or_item.instruments,
                    browser_or_item.sounds,
                    browser_or_item.drums,
                    browser_or_item.audio_effects,
                    browser_or_item.midi_effects
                ]
                for category in categories:
                    item = self._find_browser_item_by_uri(category, uri, max_depth, current_depth + 1)
                    if item:
                        return item
                return None
            # Check if this item has children
            if hasattr(browser_or_item, 'children') and browser_or_item.children:
                for child in browser_or_item.children:
                    item = self._find_browser_item_by_uri(child, uri, max_depth, current_depth + 1)
                    if item:
                        return item
            return None
        except Exception as e:
            self.log_message("Error finding browser item by URI: {0}".format(str(e)))
            return None
    # Helper methods
    def _get_device_type(self, device):
        """Get the type of a device"""
        try:
            # Simple heuristic - in a real implementation you'd look at the device class
            if device.can_have_drum_pads:
                return "drum_machine"
            elif device.can_have_chains:
                return "rack"
            elif "instrument" in device.class_display_name.lower():
                return "instrument"
            elif "audio_effect" in device.class_name.lower():
                return "audio_effect"
            elif "midi_effect" in device.class_name.lower():
                return "midi_effect"
            else:
                return "unknown"
        except Exception:
            return "unknown"
    def get_browser_tree(self, category_type="all"):
        """
        Get a simplified tree of browser categories.

        Args:
            category_type: Type of categories to get ('all', 'instruments', 'sounds', etc.)

        Returns:
            Dictionary with the browser tree structure
        """
        try:
            # Access the application's browser instance instead of creating a new one
            app = self.application()
            if not app:
                raise RuntimeError("Could not access Live application")
            # Check if browser is available
            if not hasattr(app, 'browser') or app.browser is None:
                raise RuntimeError("Browser is not available in the Live application")
            # Log available browser attributes to help diagnose issues
            browser_attrs = [attr for attr in dir(app.browser) if not attr.startswith('_')]
            self.log_message("Available browser attributes: {0}".format(browser_attrs))
            result = {
                "type": category_type,
                "categories": [],
                "available_categories": browser_attrs
            }
            # Helper that summarizes one browser item (children are not traversed)
            def process_item(item, depth=0):
                if not item:
                    return None
                item_info = {
                    "name": item.name if hasattr(item, 'name') else "Unknown",
                    "is_folder": hasattr(item, 'children') and bool(item.children),
                    "is_device": hasattr(item, 'is_device') and item.is_device,
                    "is_loadable": hasattr(item, 'is_loadable') and item.is_loadable,
                    "uri": item.uri if hasattr(item, 'uri') else None,
                    "children": []
                }
                return item_info
            # Process based on category type and available attributes
            if (category_type == "all" or category_type == "instruments") and hasattr(app.browser, 'instruments'):
                try:
                    instruments = process_item(app.browser.instruments)
                    if instruments:
                        instruments["name"] = "Instruments"  # Ensure consistent naming
                        result["categories"].append(instruments)
                except Exception as e:
                    self.log_message("Error processing instruments: {0}".format(str(e)))
            if (category_type == "all" or category_type == "sounds") and hasattr(app.browser, 'sounds'):
                try:
                    sounds = process_item(app.browser.sounds)
                    if sounds:
                        sounds["name"] = "Sounds"  # Ensure consistent naming
                        result["categories"].append(sounds)
                except Exception as e:
                    self.log_message("Error processing sounds: {0}".format(str(e)))
            if (category_type == "all" or category_type == "drums") and hasattr(app.browser, 'drums'):
                try:
                    drums = process_item(app.browser.drums)
                    if drums:
                        drums["name"] = "Drums"  # Ensure consistent naming
                        result["categories"].append(drums)
                except Exception as e:
                    self.log_message("Error processing drums: {0}".format(str(e)))
            if (category_type == "all" or category_type == "audio_effects") and hasattr(app.browser, 'audio_effects'):
                try:
                    audio_effects = process_item(app.browser.audio_effects)
                    if audio_effects:
                        audio_effects["name"] = "Audio Effects"  # Ensure consistent naming
                        result["categories"].append(audio_effects)
                except Exception as e:
                    self.log_message("Error processing audio_effects: {0}".format(str(e)))
            if (category_type == "all" or category_type == "midi_effects") and hasattr(app.browser, 'midi_effects'):
                try:
                    midi_effects = process_item(app.browser.midi_effects)
                    if midi_effects:
                        midi_effects["name"] = "MIDI Effects"  # Ensure consistent naming
                        result["categories"].append(midi_effects)
                except Exception as e:
                    self.log_message("Error processing midi_effects: {0}".format(str(e)))
            # Try to process other potentially available categories
            for attr in browser_attrs:
                if attr not in ['instruments', 'sounds', 'drums', 'audio_effects', 'midi_effects'] and \
                        (category_type == "all" or category_type == attr):
                    try:
                        item = getattr(app.browser, attr)
                        if hasattr(item, 'children') or hasattr(item, 'name'):
                            category = process_item(item)
                            if category:
                                category["name"] = attr.capitalize()
                                result["categories"].append(category)
                    except Exception as e:
                        self.log_message("Error processing {0}: {1}".format(attr, str(e)))
            self.log_message("Browser tree generated for {0} with {1} root categories".format(
                category_type, len(result['categories'])))
            return result
        except Exception as e:
            self.log_message("Error getting browser tree: {0}".format(str(e)))
            self.log_message(traceback.format_exc())
            raise
    def get_browser_items_at_path(self, path):
        """
        Get browser items at a specific path.

        Args:
            path: Path in the format "category/folder/subfolder"
                where category is one of: instruments, sounds, drums, audio_effects, midi_effects
                or any other available browser category

        Returns:
            Dictionary with items at the specified path
        """
        try:
            # Access the application's browser instance instead of creating a new one
            app = self.application()
            if not app:
                raise RuntimeError("Could not access Live application")
            # Check if browser is available
            if not hasattr(app, 'browser') or app.browser is None:
                raise RuntimeError("Browser is not available in the Live application")
            # Log available browser attributes to help diagnose issues
            browser_attrs = [attr for attr in dir(app.browser) if not attr.startswith('_')]
            self.log_message("Available browser attributes: {0}".format(browser_attrs))
            # Parse the path
            path_parts = path.split("/")
            if not path_parts:
                raise ValueError("Invalid path")
            # Determine the root category
            root_category = path_parts[0].lower()
            current_item = None
            # Check standard categories first
            if root_category == "instruments" and hasattr(app.browser, 'instruments'):
                current_item = app.browser.instruments
            elif root_category == "sounds" and hasattr(app.browser, 'sounds'):
                current_item = app.browser.sounds
            elif root_category == "drums" and hasattr(app.browser, 'drums'):
                current_item = app.browser.drums
            elif root_category == "audio_effects" and hasattr(app.browser, 'audio_effects'):
                current_item = app.browser.audio_effects
            elif root_category == "midi_effects" and hasattr(app.browser, 'midi_effects'):
                current_item = app.browser.midi_effects
            else:
                # Try to find the category in other browser attributes
                found = False
                for attr in browser_attrs:
                    if attr.lower() == root_category:
                        try:
                            current_item = getattr(app.browser, attr)
                            found = True
                            break
                        except Exception as e:
                            self.log_message("Error accessing browser attribute {0}: {1}".format(attr, str(e)))
                if not found:
                    # If we still haven't found the category, return available categories
                    return {
                        "path": path,
                        "error": "Unknown or unavailable category: {0}".format(root_category),
                        "available_categories": browser_attrs,
                        "items": []
                    }
            # Navigate through the path
            for i in range(1, len(path_parts)):
                part = path_parts[i]
                if not part:  # Skip empty parts
                    continue
                if not hasattr(current_item, 'children'):
                    return {
                        "path": path,
                        "error": "Item at '{0}' has no children".format('/'.join(path_parts[:i])),
                        "items": []
                    }
                found = False
                for child in current_item.children:
                    if hasattr(child, 'name') and child.name.lower() == part.lower():
                        current_item = child
                        found = True
                        break
                if not found:
                    return {
                        "path": path,
                        "error": "Path part '{0}' not found".format(part),
                        "items": []
                    }
            # Get items at the current path
            items = []
            if hasattr(current_item, 'children'):
                for child in current_item.children:
                    item_info = {
                        "name": child.name if hasattr(child, 'name') else "Unknown",
                        "is_folder": hasattr(child, 'children') and bool(child.children),
                        "is_device": hasattr(child, 'is_device') and child.is_device,
                        "is_loadable": hasattr(child, 'is_loadable') and child.is_loadable,
                        "uri": child.uri if hasattr(child, 'uri') else None
                    }
                    items.append(item_info)
            result = {
                "path": path,
                "name": current_item.name if hasattr(current_item, 'name') else "Unknown",
                "uri": current_item.uri if hasattr(current_item, 'uri') else None,
                "is_folder": hasattr(current_item, 'children') and bool(current_item.children),
                "is_device": hasattr(current_item, 'is_device') and current_item.is_device,
                "is_loadable": hasattr(current_item, 'is_loadable') and current_item.is_loadable,
                "items": items
            }
            self.log_message("Retrieved {0} items at path: {1}".format(len(items), path))
            return result
        except Exception as e:
            self.log_message("Error getting browser items at path: {0}".format(str(e)))
            self.log_message(traceback.format_exc())
            raise
```
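`_process_command` hands state-changing commands to Live's main thread through `schedule_message` and then waits on a `queue.Queue` for the outcome. The same handoff can be tried outside Live with a stand-in scheduler. In this sketch, `run_on_main_thread` and its `schedule` parameter are hypothetical names introduced for illustration: `schedule` plays the role of `self.schedule_message(0, ...)`, and the result dictionaries follow the shapes used in the script.

```python
import queue
import threading


def run_on_main_thread(schedule, task, timeout=10.0):
    """Hand `task` to `schedule` and block until it reports back.

    `schedule` is an assumed callable that runs its argument later,
    standing in for ControlSurface.schedule_message(0, callback).
    """
    response_queue = queue.Queue()

    def wrapper():
        # Runs on the scheduler's thread; always reports exactly one outcome.
        try:
            response_queue.put({"status": "success", "result": task()})
        except Exception as exc:
            response_queue.put({"status": "error", "message": str(exc)})

    schedule(wrapper)
    try:
        return response_queue.get(timeout=timeout)
    except queue.Empty:
        # The scheduler never ran the task within the allowed time.
        return {"status": "error",
                "message": "Timeout waiting for operation to complete"}


if __name__ == "__main__":
    # A plain thread stands in for Live's main thread in this demo.
    def schedule(callback):
        threading.Thread(target=callback).start()

    print(run_on_main_thread(schedule, lambda: {"tempo": 120.0}))
```

Putting the result on a queue rather than returning it directly is what lets the socket handler thread wait safely: the caller blocks on `get` while the task runs on a different thread, and the timeout keeps a stalled scheduler from hanging the client connection indefinitely.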