#
tokens: 5773/50000 4/4 files
lines: off (toggle) GitHub
raw markdown copy
# Directory Structure

```
├── bridge_mcp_cutter.py
├── images
│   └── cutterMCP.png
├── LICENSE
├── README.md
├── requirements.txt
└── src
    └── CutterMCPPlugin.py
```

# Files

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0)
[![LinkedIn](https://img.shields.io/badge/LinkedIn-Connect-blue)](https://www.linkedin.com/in/amey-pathak/)

![cutter_MCP_logo](images/cutterMCP.png)


# cutterMCP
cutterMCP is a Model Context Protocol server that allows LLMs to autonomously reverse engineer applications. It exposes numerous tools from core Cutter functionality to MCP clients.

# Features
MCP Server + Cutter Plugin

- Decompile and analyze binaries in Cutter
- Automatically rename methods and data
- List methods, imports, and exports

# Installation

## Prerequisites
- Install [Cutter](https://github.com/rizinorg/cutter)
- Python3
- MCP [SDK](https://github.com/modelcontextprotocol/python-sdk)

## Cutter
First, download the latest release from this repository. This contains the Cutter plugin and Python MCP client. Then, you can directly import the plugin into Cutter.

1. Run Cutter
2. Go to **Edit -> Preferences -> Plugins**
3. Find the plugin directory location
4. Copy `CutterMCPPlugin.py` from the downloaded release and paste it inside the **python** folder
5. Restart Cutter
6. If successful, you’ll see the plugin under **Windows -> Plugins** and a new widget in the bottom panel


## MCP Clients

Theoretically, any MCP client should work with cutterMCP. One example is given below.

## Example 1: Claude Desktop
To set up Claude Desktop as a Cutter MCP client, go to `Claude` -> `Settings` -> `Developer` -> `Edit Config` -> `claude_desktop_config.json` and add the following:

MacOS/Linux :
```json
{
  "mcpServers": {
    "cutter": {
      "command": "python",
      "args": [
        "/ABSOLUTE_PATH_TO/bridge_mcp_cutter.py"
      ]
    }
  }
}
```

Windows :
```json
{
  "mcpServers": {
    "cutter": {
      "command": "python",
      "args": [
        "C:\\ABSOLUTE_PATH_TO\\bridge_mcp_cutter.py"
      ]
    }
  }
}
```

```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
mcp==1.5.0
requests==2.32.3
```

--------------------------------------------------------------------------------
/bridge_mcp_cutter.py:
--------------------------------------------------------------------------------

```python
# Cutter MCP Server using FastMCP

import sys
import requests
import argparse
import logging
from mcp.server.fastmcp import FastMCP

# Base URL of the HTTP server that the Cutter plugin listens on.
DEFAULT_CUTTER_SERVER = "http://127.0.0.1:8000/"
# URL used by safe_get/safe_post; initialised to the default above.
cutter_server_url = DEFAULT_CUTTER_SERVER

logger = logging.getLogger(__name__)

# FastMCP server instance; functions decorated with @mcp.tool() register on it.
mcp = FastMCP("cutter-mcp")

def safe_get(endpoint: str, params: dict | None = None) -> list:
    """
    Send a GET request to the Cutter plugin and return the reply as lines.

    Args:
        endpoint: Path of the plugin endpoint, with or without a leading "/".
        params: Optional query-string parameters.

    Returns:
        The response body split into lines on success. On an HTTP error or a
        failed request, a single-element list holding an error message, so the
        caller always receives a list of strings and never an exception.
    """
    if params is None:
        params = {}
    # The configured base URL ends with "/" by default. Joining it with a
    # second "/" would request "//endpoint", which URL parsers may interpret
    # as a host name with an empty path instead of the intended path.
    url = f"{cutter_server_url.rstrip('/')}/{endpoint.lstrip('/')}"
    try:
        response = requests.get(url, params=params, timeout=5)
        response.encoding = 'utf-8'
        if response.ok:
            return response.text.splitlines()
        return [f"Error {response.status_code}: {response.text.strip()}"]
    except Exception as e:
        # Deliberate best-effort boundary: report the failure as text.
        return [f"Request failed: {str(e)}"]

def safe_post(endpoint: str, data: dict | str) -> str:
    """
    Send a POST request to the Cutter plugin and return the reply as text.

    Args:
        endpoint: Path of the plugin endpoint, with or without a leading "/".
        data: A dict (sent form-encoded) or a string (sent as UTF-8 bytes).

    Returns:
        The stripped response body on success, otherwise an error message.
        Failures are reported as text; no exception reaches the caller.
    """
    try:
        # The configured base URL ends with "/" by default; normalise the join
        # so the request path is "/endpoint" rather than "//endpoint".
        url = f"{cutter_server_url.rstrip('/')}/{endpoint.lstrip('/')}"
        payload = data if isinstance(data, dict) else data.encode("utf-8")
        response = requests.post(url, data=payload, timeout=5)
        response.encoding = 'utf-8'
        if response.ok:
            return response.text.strip()
        return f"Error {response.status_code}: {response.text.strip()}"
    except Exception as e:
        # Deliberate best-effort boundary: report the failure as text.
        return f"Request failed: {str(e)}"

@mcp.tool()
def list_functions(offset: int = 0, limit: int = 100) -> list:
    """
    List the functions in the binary, one page at a time.

    `offset` is the index of the first entry to return and `limit` the
    maximum number of entries.
    """
    page = {"offset": offset, "limit": limit}
    return safe_get("functions", page)

@mcp.tool()
def decompile_function_by_address(address: str) -> str:
    """
    Return the decompiled code of the function at `address` as one string.
    """
    lines = safe_get("decompile", {"addr": address})
    return "\n".join(lines)

@mcp.tool()
def list_segments(offset: int = 0, limit: int = 100) -> list:
    """
    List the memory segments of the program, one page at a time.

    `offset` is the index of the first entry to return and `limit` the
    maximum number of entries.
    """
    page = {"offset": offset, "limit": limit}
    return safe_get("segments", page)

@mcp.tool()
def list_imports(offset: int = 0, limit: int = 100) -> list:
    """
    List the symbols imported by the binary, one page at a time.
    """
    page = {"offset": offset, "limit": limit}
    return safe_get("imports", page)

@mcp.tool()
def list_exports(offset: int = 0, limit: int = 100) -> list:
    """
    List the symbols exported by the binary, one page at a time.
    """
    page = {"offset": offset, "limit": limit}
    return safe_get("exports", page)

# NOTE(review): this was the only endpoint wrapper without @mcp.tool(), which
# left it unreachable from MCP clients. Registered here for consistency with
# the other tools; confirm the omission was not intentional.
@mcp.tool()
def list_data(offset: int = 0, limit: int = 1000) -> list:
    """
    List defined data for each function with pagination.

    `offset` is the index of the first entry to return and `limit` the
    maximum number of entries; both are forwarded to the plugin's `data`
    endpoint.
    """
    return safe_get("data", {"offset": offset, "limit": limit})

@mcp.tool()
def search_functions_by_name(query: str, offset: int = 0, limit: int = 100) -> list:
    """
    Find functions whose name contains `query`, one page at a time.

    An empty `query` is rejected locally with an error entry instead of
    being sent to the plugin.
    """
    if query:
        search = {"query": query, "offset": offset, "limit": limit}
        return safe_get("searchFunctions", search)
    return ["Error: query string is required"]

@mcp.tool()
def rename_function_by_address(function_address: str, new_name: str) -> str:
    """
    Give the function at `function_address` the name `new_name`.
    """
    form = {"address": function_address, "newName": new_name}
    return safe_post("renameFunction", form)

@mcp.tool()
def set_decompiler_comment(address: str, comment: str) -> str:
    """
    Set a comment for a given address in the function pseudocode.

    Returns the plugin's confirmation text, or an error message if the
    request fails.
    """
    # The plugin routes this operation under "/setDecompilerComment"; the
    # snake_case path used previously matched no route and always got a 404.
    return safe_post("setDecompilerComment", {"address": address, "comment": comment})

@mcp.tool()
def list_libraries(offset: int = 0, limit: int = 100) -> list:
    """
    List the shared libraries used by the binary, one page at a time.
    """
    page = {"offset": offset, "limit": limit}
    return safe_get("libraries", page)

@mcp.tool()
def show_headers(offset: int = 0, limit: int = 100) -> list:
    """
    Show the binary's header information, one page of lines at a time.
    """
    page = {"offset": offset, "limit": limit}
    return safe_get("headers", page)

@mcp.tool()
def show_function_detail(address: str) -> str:
    """
    Return details about the function at `address` as one string.
    """
    lines = safe_get("showFunctionDetails", {"addr": address})
    return "\n".join(lines)

@mcp.tool()
def get_function_prototype(address: str) -> str:
    """
    Return the signature of the function at `address` as one string.
    """
    lines = safe_get("getFunctionPrototype", {"addr": address})
    return "\n".join(lines)

@mcp.tool()
def xrefs_to(address: str) -> str:
    """
    Return the code references to `address` as one string.
    """
    lines = safe_get("xrefsTo", {"addr": address})
    return "\n".join(lines)

@mcp.tool()
def disassemble_function(address: str) -> str:
    """
    Return the disassembly of the function at `address` as one string.
    """
    lines = safe_get("disassembleFunction", {"addr": address})
    return "\n".join(lines)

@mcp.tool()
def set_function_prototype(address: str, description: str) -> str:
    """
    Set the signature (prototype) of the function at the given address.

    `description` is the new signature text. Returns the plugin's
    confirmation text, or an error message if the request fails.
    """
    return safe_post("setFunctionPrototype", {"address": address, "description": description})
            
# Start the MCP server only when this file is executed as a script.
if __name__ == "__main__":
    mcp.run()

```

--------------------------------------------------------------------------------
/src/CutterMCPPlugin.py:
--------------------------------------------------------------------------------

```python
import cutter
from http.server import BaseHTTPRequestHandler, HTTPServer
import threading
from datetime import datetime
from PySide6.QtWidgets import (
    QWidget, QVBoxLayout, QLabel,
    QPlainTextEdit, QPushButton
)
from PySide6.QtCore import Qt, QObject, Signal
from urllib.parse import urlparse, parse_qs

class ServerSignals(QObject):
    """Qt signals used to hand text from the plugin and HTTP handler to the UI."""
    # Carries one log line; the dock widget appends it to its log view.
    log_signal = Signal(str)
    # Carries a status string for the UI.
    status_signal = Signal(str)

class MCPDockWidget(cutter.CutterDockWidget):
    """Dock widget showing the HTTP server status line and a read-only log."""

    def __init__(self, parent, signals):
        """
        Build the widget and subscribe to the shared signals.

        Args:
            parent: Parent passed on to CutterDockWidget.
            signals: Object exposing `log_signal` and `status_signal`.
        """
        super().__init__(parent)
        self.setObjectName("MCPDockWidget")
        self.setWindowTitle("HTTP Server")
        self.signals = signals

        container = QWidget()
        layout = QVBoxLayout(container)

        # Initial text; replaced whenever status_signal is emitted.
        self.status_label = QLabel("🟢 HTTP Server: Running (Port 8000)")
        self.status_label.setAlignment(Qt.AlignCenter)
        layout.addWidget(self.status_label)

        self.log_view = QPlainTextEdit()
        self.log_view.setReadOnly(True)
        self.log_view.setPlaceholderText("HTTP Server logs will appear here...")
        layout.addWidget(self.log_view)

        self.setWidget(container)

        self.signals.log_signal.connect(self.log)
        # status_signal was declared but never connected, so the label could
        # not reflect anything other than its hard-coded initial text.
        self.signals.status_signal.connect(self.status_label.setText)

    def log(self, message):
        """Append `message` to the log view, prefixed with the current time."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        self.log_view.appendPlainText(f"[{timestamp}] {message}")

class MCPPlugin(cutter.CutterPlugin):
    """Cutter plugin that serves Cutter command output over a small HTTP API.

    The plugin owns the HTTP server, the thread running it, the signal bundle
    used for logging, and the dock widget that displays the log.
    """

    # The server only needs to be reachable from the local machine. Binding to
    # '' would listen on every network interface and let any host that can
    # reach the port run Cutter commands through this API.
    HOST = "127.0.0.1"
    PORT = 8000

    def __init__(self):
        """Create the signal bundle; the server is started in setupInterface."""
        super().__init__()
        self.server = None
        self.server_thread = None
        self.signals = ServerSignals()
        self.dock_widget = None

    class MCPRequestHandler(BaseHTTPRequestHandler):
        """Translate HTTP requests into Cutter commands; replies are plain text.

        Every `handle_*` method writes a complete HTTP response itself. The
        private helpers hold the logic that the handlers share.
        """

        # Substrings refused in any request value that is interpolated into a
        # Cutter command line: command separator, pipe, command substitution,
        # redirection, temporary seek / iterators, and line breaks.
        # NOTE(review): derived from Rizin's command syntax; confirm the list
        # is complete for the Rizin version bundled with Cutter.
        UNSAFE_TOKENS = (';', '|', '`', '<', '>', '@', '$(', '\n', '\r')

        # Body returned for "/" and for any unknown GET path.
        ENDPOINT_HELP = "\n".join((
            "HTTP Server Endpoints:",
            "GET /functions - List all functions",
            "GET /decompile?addr=ADDR - Decompile function",
            "GET /segments - List memory segments",
            "GET /imports - List imports",
            "GET /exports - List exports",
            "GET /data - List defined data",
            "GET /searchFunctions?query=NAME - Search functions",
            "GET /libraries - List shared libraries",
            "GET /headers - Show header information",
            "GET /showFunctionDetails?addr=ADDR - Show details about function",
            "GET /getFunctionPrototype?addr=ADDR - Get function signature",
            "GET /xrefsTo?addr=ADDR - List code references",
            "GET /disassembleFunction?addr=ADDR - Disassemble function",
            "",
            "POST /renameFunction - Rename a function",
            "POST /setDecompilerComment - Set decompiler comment",
            "POST /setFunctionPrototype - Set function signature",
        ))

        # ------------------------------------------------------------------
        # Shared helpers
        # ------------------------------------------------------------------

        def _log(self, message):
            """Send one line to the plugin's log signal."""
            self.server.parent.signals.log_signal.emit(message)

        def _send_text(self, text):
            """Write a 200 text/plain response whose body is `text` as UTF-8."""
            self.send_response(200)
            self.send_header('Content-type', 'text/plain')
            self.end_headers()
            self.wfile.write(text.encode('utf-8'))

        def _reject_unsafe(self, *values):
            """Reply 400 and return True if any value holds an unsafe token."""
            for value in values:
                if any(token in value for token in self.UNSAFE_TOKENS):
                    self.send_error(400, "Parameter contains characters that are not allowed")
                    return True
            return False

        def _page_bounds(self, query):
            """Return (offset, limit) from the query, or None after replying 400."""
            try:
                offset = int(query.get('offset', [0])[0])
                limit = int(query.get('limit', [100])[0])
            except ValueError:
                self.send_error(400, "offset and limit must be integers")
                return None
            if offset < 0 or limit < 0:
                self.send_error(400, "offset and limit must be non-negative")
                return None
            return offset, limit

        def _form_fields(self, post_data, *names):
            """Return the first value of each named form field ('' if absent)."""
            fields = parse_qs(post_data.decode('utf-8'))
            return tuple(fields.get(name, [''])[0] for name in names)

        def _serve_paginated(self, query, command, noun):
            """Run `command` and reply with the requested slice of its lines."""
            try:
                bounds = self._page_bounds(query)
                if bounds is None:
                    return
                offset, limit = bounds
                page = cutter.cmd(command).splitlines()[offset:offset + limit]
                self._log(f"Served {len(page)} {noun}")
                self._send_text("\n".join(page))
            except Exception as e:
                self.send_error(500, f"Error: {str(e)}")

        def _serve_at_address(self, query, command, log_template):
            """Run `command @ <addr>` for the `addr` parameter and reply with its output."""
            try:
                addr = query.get('addr', [''])[0]
                if not addr:
                    self.send_error(400, "Address parameter is required")
                    return
                if self._reject_unsafe(addr):
                    return
                output = cutter.cmd(f"{command} @ {addr}")
                self._log(log_template.format(addr=addr))
                self._send_text(output)
            except Exception as e:
                self.send_error(500, f"Error: {str(e)}")

        # ------------------------------------------------------------------
        # BaseHTTPRequestHandler hooks
        # ------------------------------------------------------------------

        def log_message(self, format, *args):
            """Route http.server's per-request log line to the plugin log."""
            self._log(format % args)

        def do_GET(self):
            """Dispatch a GET request by path; unknown paths get the help text."""
            parsed = urlparse(self.path)
            query = parse_qs(parsed.query)
            routes = {
                '/functions': self.handle_functions,
                '/decompile': self.handle_decompile,
                '/segments': self.handle_segments,
                '/imports': self.handle_imports,
                '/exports': self.handle_exports,
                '/data': self.handle_data,
                '/searchFunctions': self.handle_search_functions,
                '/libraries': self.handle_libraries,
                '/headers': self.handle_headers,
                '/showFunctionDetails': self.handle_show_function_details,
                '/getFunctionPrototype': self.handle_get_function_prototype,
                '/xrefsTo': self.handle_xrefs_to,
                '/disassembleFunction': self.handle_disassemble_function,
            }
            handler = routes.get(parsed.path)
            if handler is None:
                self.handle_root()
            else:
                handler(query)

        def do_POST(self):
            """Read the request body and dispatch a POST request by path."""
            # A missing header is treated as an empty body; a malformed or
            # negative length is refused instead of raising or blocking on a
            # read with no size limit.
            try:
                content_length = int(self.headers.get('Content-Length', 0))
            except ValueError:
                self.send_error(400, "Invalid Content-Length header")
                return
            if content_length < 0:
                self.send_error(400, "Invalid Content-Length header")
                return
            post_data = self.rfile.read(content_length)

            routes = {
                '/renameFunction': self.handle_rename_function,
                '/setDecompilerComment': self.handle_set_decompiler_comment,
                '/setFunctionPrototype': self.handle_set_function_prototype,
            }
            handler = routes.get(urlparse(self.path).path)
            if handler is None:
                self.send_error(404, "Endpoint not found")
            else:
                handler(post_data)

        # ------------------------------------------------------------------
        # Endpoint handlers
        # ------------------------------------------------------------------

        def handle_root(self):
            """Reply with the list of available endpoints."""
            self._send_text(self.ENDPOINT_HELP)

        def handle_rename_function(self, post_data):
            """Rename the function at form field `address` to `newName`."""
            try:
                function_address, new_name = self._form_fields(post_data, 'address', 'newName')
                if not function_address or not new_name:
                    self.send_error(400, "Both address and newName parameters are required")
                    return
                if self._reject_unsafe(function_address, new_name):
                    return

                cutter.cmd(f"afn {new_name} @ {function_address}")
                self._log(f"Renamed function at {function_address} to {new_name}")
                self._send_text(f"Successfully renamed function at {function_address} to {new_name}")
            except Exception as e:
                self.send_error(500, f"Error renaming function: {str(e)}")

        def handle_set_decompiler_comment(self, post_data):
            """Set the comment from form field `comment` at `address`."""
            try:
                address, comment = self._form_fields(post_data, 'address', 'comment')
                if not address or not comment:
                    self.send_error(400, "Both address and comment parameters are required")
                    return
                if self._reject_unsafe(address, comment):
                    return

                cutter.cmd(f"CCu {comment} @ {address}")
                self._log(f"Set decompiler comment at {address} to: {comment}")
                self._send_text(f"Successfully set decompiler comment at {address}")
            except Exception as e:
                self.send_error(500, f"Error setting decompiler comment: {str(e)}")

        def handle_set_function_prototype(self, post_data):
            """Set the signature from form field `description` at `address`."""
            try:
                address, description = self._form_fields(post_data, 'address', 'description')
                if not address or not description:
                    self.send_error(400, "Both address and description parameters are required")
                    return
                if self._reject_unsafe(address, description):
                    return

                cutter.cmd(f"afs {description} @ {address}")
                self._log(f"Set function signature at {address} to: {description}")
                self._send_text(f"Successfully set function signature at {address}")
            except Exception as e:
                self.send_error(500, f"Error setting function signature: {str(e)}")

        def handle_functions(self, query):
            """Reply with a page of the `aflq` output."""
            self._serve_paginated(query, "aflq", "functions")

        def handle_segments(self, query):
            """Reply with a page of the `iS` output."""
            self._serve_paginated(query, "iS", "segments")

        def handle_imports(self, query):
            """Reply with a page of the `ii` output."""
            self._serve_paginated(query, "ii", "imports")

        def handle_exports(self, query):
            """Reply with a page of the `iE` output."""
            self._serve_paginated(query, "iE", "exports")

        def handle_data(self, query):
            """Reply with a page of the `pd 1000` output."""
            self._serve_paginated(query, "pd 1000", "data items")

        def handle_libraries(self, query):
            """Reply with a page of the `ilq` output."""
            self._serve_paginated(query, "ilq", "libraries")

        def handle_headers(self, query):
            """Reply with a page of the `i;iH` output."""
            self._serve_paginated(query, "i;iH", "headers")

        def handle_search_functions(self, query):
            """Reply with a page of the `afl` lines that match the `query` parameter."""
            try:
                search_term = query.get('query', [''])[0]
                if not search_term:
                    self.send_error(400, "Search term is required")
                    return
                if self._reject_unsafe(search_term):
                    return
                bounds = self._page_bounds(query)
                if bounds is None:
                    return
                offset, limit = bounds

                search_results = cutter.cmd(f"afl~{search_term}").splitlines()
                page = search_results[offset:offset + limit]
                self._log(
                    f"Found {len(search_results)} functions matching '{search_term}', "
                    f"returning {len(page)}"
                )
                self._send_text("\n".join(page))
            except Exception as e:
                self.send_error(500, f"Error searching functions: {str(e)}")

        def handle_decompile(self, query):
            """Reply with the `pdg` output at the `addr` parameter."""
            self._serve_at_address(query, "pdg", "Decompiled function at {addr}")

        def handle_show_function_details(self, query):
            """Reply with the `afi` output at the `addr` parameter."""
            self._serve_at_address(query, "afi", "Served details about function at {addr}")

        def handle_get_function_prototype(self, query):
            """Reply with the `afs` output at the `addr` parameter."""
            self._serve_at_address(query, "afs", "Served signature of function at {addr}")

        def handle_xrefs_to(self, query):
            """Reply with the `axt` output at the `addr` parameter."""
            self._serve_at_address(query, "axt", "Served references of code at {addr}")

        def handle_disassemble_function(self, query):
            """Reply with the `pdf` output at the `addr` parameter."""
            self._serve_at_address(query, "pdf", "Disassembled function at {addr}")

    def setupPlugin(self):
        """No plugin-level setup is needed."""
        pass

    def setupInterface(self, main):
        """Create the dock widget, add it to `main`, and start the server."""
        self.dock_widget = MCPDockWidget(main, self.signals)
        main.addPluginDockWidget(self.dock_widget)
        self.start_server()

    def start_server(self):
        """Bind the HTTP server and run it on a daemon thread.

        Does nothing except log a message when a server already exists. If
        the port cannot be bound, the failure is logged and reported through
        `status_signal`, and no thread is started.
        """
        if self.server is not None:
            self.signals.log_signal.emit("Server is already running!")
            return

        # Bind in the calling thread so that a failure is visible here and
        # self.server is set before anyone can call terminate() or
        # start_server() again.
        try:
            server = HTTPServer((self.HOST, self.PORT), self.MCPRequestHandler)
        except OSError as e:
            self.signals.log_signal.emit(f"Failed to start HTTP Server on port {self.PORT}: {e}")
            self.signals.status_signal.emit(f"🔴 HTTP Server: Failed to start (Port {self.PORT})")
            return
        server.parent = self  # lets request handlers reach self.signals
        self.server = server

        startup_lines = (
            "HTTP Server started at http://localhost:8000",
            "Available endpoints:",
            "GET /functions - List functions",
            "GET /decompile - Decompile function",
            "GET /segments - List memory segments",
            "GET /imports - List imports",
            "GET /exports - List exports",
            "GET /data - List defined data",
            "GET /searchFunctions - Search functions by name",
            "GET /libraries - List libraries",
            "GET /headers - Show headers",
            "GET /showFunctionDetails - Show details about function",
            "GET /getFunctionPrototype - Show signature of function",
            "GET /xrefsTo - List code references",
            "GET /disassembleFunction - Disassemble function",
            "POST /renameFunction - Rename a function",
            "POST /setDecompilerComment - Set decompiler comment",
            "POST /setFunctionPrototype - Set signature of function",
        )
        for line in startup_lines:
            self.signals.log_signal.emit(line)
        self.signals.status_signal.emit(f"🟢 HTTP Server: Running (Port {self.PORT})")

        self.server_thread = threading.Thread(target=server.serve_forever)
        self.server_thread.daemon = True
        self.server_thread.start()

    def terminate(self):
        """Stop the HTTP server, close its socket, and wait for its thread."""
        if self.server:
            self.signals.log_signal.emit("Shutting down HTTP Server...")
            self.server.shutdown()
            self.server.server_close()
            self.server = None
        if self.server_thread and self.server_thread.is_alive():
            self.server_thread.join()
        self.server_thread = None

def create_cutter_plugin():
    """Build and return a new MCPPlugin instance."""
    plugin = MCPPlugin()
    return plugin

```